remove etcd
This commit is contained in:
parent
eae51feabb
commit
19af08af57
@ -46,7 +46,6 @@ import (
|
|||||||
physCockroachDB "github.com/hashicorp/vault/physical/cockroachdb"
|
physCockroachDB "github.com/hashicorp/vault/physical/cockroachdb"
|
||||||
physConsul "github.com/hashicorp/vault/physical/consul"
|
physConsul "github.com/hashicorp/vault/physical/consul"
|
||||||
physCouchDB "github.com/hashicorp/vault/physical/couchdb"
|
physCouchDB "github.com/hashicorp/vault/physical/couchdb"
|
||||||
physEtcd "github.com/hashicorp/vault/physical/etcd"
|
|
||||||
physFoundationDB "github.com/hashicorp/vault/physical/foundationdb"
|
physFoundationDB "github.com/hashicorp/vault/physical/foundationdb"
|
||||||
physMySQL "github.com/hashicorp/vault/physical/mysql"
|
physMySQL "github.com/hashicorp/vault/physical/mysql"
|
||||||
physOCI "github.com/hashicorp/vault/physical/oci"
|
physOCI "github.com/hashicorp/vault/physical/oci"
|
||||||
@ -178,7 +177,6 @@ var (
|
|||||||
"consul": physConsul.NewConsulBackend,
|
"consul": physConsul.NewConsulBackend,
|
||||||
"couchdb_transactional": physCouchDB.NewTransactionalCouchDBBackend,
|
"couchdb_transactional": physCouchDB.NewTransactionalCouchDBBackend,
|
||||||
"couchdb": physCouchDB.NewCouchDBBackend,
|
"couchdb": physCouchDB.NewCouchDBBackend,
|
||||||
"etcd": physEtcd.NewEtcdBackend,
|
|
||||||
"file_transactional": physFile.NewTransactionalFileBackend,
|
"file_transactional": physFile.NewTransactionalFileBackend,
|
||||||
"file": physFile.NewFileBackend,
|
"file": physFile.NewFileBackend,
|
||||||
"foundationdb": physFoundationDB.NewFDBBackend,
|
"foundationdb": physFoundationDB.NewFDBBackend,
|
||||||
|
3
go.mod
3
go.mod
@ -159,8 +159,6 @@ require (
|
|||||||
github.com/shirou/gopsutil/v3 v3.22.6
|
github.com/shirou/gopsutil/v3 v3.22.6
|
||||||
github.com/stretchr/testify v1.8.4
|
github.com/stretchr/testify v1.8.4
|
||||||
go.etcd.io/bbolt v1.3.7
|
go.etcd.io/bbolt v1.3.7
|
||||||
go.etcd.io/etcd/client/pkg/v3 v3.5.7
|
|
||||||
go.etcd.io/etcd/client/v2 v2.305.5
|
|
||||||
go.etcd.io/etcd/client/v3 v3.5.7
|
go.etcd.io/etcd/client/v3 v3.5.7
|
||||||
go.opentelemetry.io/otel v1.22.0
|
go.opentelemetry.io/otel v1.22.0
|
||||||
go.opentelemetry.io/otel/sdk v1.22.0
|
go.opentelemetry.io/otel/sdk v1.22.0
|
||||||
@ -364,6 +362,7 @@ require (
|
|||||||
github.com/yusufpapurcu/wmi v1.2.2 // indirect
|
github.com/yusufpapurcu/wmi v1.2.2 // indirect
|
||||||
github.com/zclconf/go-cty v1.12.1 // indirect
|
github.com/zclconf/go-cty v1.12.1 // indirect
|
||||||
go.etcd.io/etcd/api/v3 v3.5.7 // indirect
|
go.etcd.io/etcd/api/v3 v3.5.7 // indirect
|
||||||
|
go.etcd.io/etcd/client/pkg/v3 v3.5.7 // indirect
|
||||||
go.mongodb.org/mongo-driver v1.11.6 // indirect
|
go.mongodb.org/mongo-driver v1.11.6 // indirect
|
||||||
go.opencensus.io v0.24.0 // indirect
|
go.opencensus.io v0.24.0 // indirect
|
||||||
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0 // indirect
|
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0 // indirect
|
||||||
|
1
go.sum
1
go.sum
@ -2907,7 +2907,6 @@ go.etcd.io/etcd/client/pkg/v3 v3.5.5/go.mod h1:ggrwbk069qxpKPq8/FKkQ3Xq9y39kbFR4
|
|||||||
go.etcd.io/etcd/client/pkg/v3 v3.5.7 h1:y3kf5Gbp4e4q7egZdn5T7W9TSHUvkClN6u+Rq9mEOmg=
|
go.etcd.io/etcd/client/pkg/v3 v3.5.7 h1:y3kf5Gbp4e4q7egZdn5T7W9TSHUvkClN6u+Rq9mEOmg=
|
||||||
go.etcd.io/etcd/client/pkg/v3 v3.5.7/go.mod h1:o0Abi1MK86iad3YrWhgUsbGx1pmTS+hrORWc2CamuhY=
|
go.etcd.io/etcd/client/pkg/v3 v3.5.7/go.mod h1:o0Abi1MK86iad3YrWhgUsbGx1pmTS+hrORWc2CamuhY=
|
||||||
go.etcd.io/etcd/client/v2 v2.305.0/go.mod h1:h9puh54ZTgAKtEbut2oe9P4L/oqKCVB6xsXlzd7alYQ=
|
go.etcd.io/etcd/client/v2 v2.305.0/go.mod h1:h9puh54ZTgAKtEbut2oe9P4L/oqKCVB6xsXlzd7alYQ=
|
||||||
go.etcd.io/etcd/client/v2 v2.305.5 h1:DktRP60//JJpnPC0VBymAN/7V71GHMdjDCBt4ZPXDjI=
|
|
||||||
go.etcd.io/etcd/client/v2 v2.305.5/go.mod h1:zQjKllfqfBVyVStbt4FaosoX2iYd8fV/GRy/PbowgP4=
|
go.etcd.io/etcd/client/v2 v2.305.5/go.mod h1:zQjKllfqfBVyVStbt4FaosoX2iYd8fV/GRy/PbowgP4=
|
||||||
go.etcd.io/etcd/client/v3 v3.5.0/go.mod h1:AIKXXVX/DQXtfTEqBryiLTUXwON+GuvO6Z7lLS/oTh0=
|
go.etcd.io/etcd/client/v3 v3.5.0/go.mod h1:AIKXXVX/DQXtfTEqBryiLTUXwON+GuvO6Z7lLS/oTh0=
|
||||||
go.etcd.io/etcd/client/v3 v3.5.5/go.mod h1:aApjR4WGlSumpnJ2kloS75h6aHUmAyaPLjHMxpc7E7c=
|
go.etcd.io/etcd/client/v3 v3.5.5/go.mod h1:aApjR4WGlSumpnJ2kloS75h6aHUmAyaPLjHMxpc7E7c=
|
||||||
|
@ -1,90 +0,0 @@
|
|||||||
// Copyright (c) HashiCorp, Inc.
|
|
||||||
// SPDX-License-Identifier: BUSL-1.1
|
|
||||||
|
|
||||||
package etcd
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"net/url"
|
|
||||||
"os"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/hashicorp/vault/sdk/helper/docker"
|
|
||||||
clientv3 "go.etcd.io/etcd/client/v3"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Config struct {
|
|
||||||
docker.ServiceURL
|
|
||||||
}
|
|
||||||
|
|
||||||
// PrepareTestContainer creates etcd docker container. If environment variabe
|
|
||||||
// ETCD_ADDR is set, the tests are executed against specified address and etcd
|
|
||||||
// container is not launched.
|
|
||||||
func PrepareTestContainer(t *testing.T, version string) (func(), *Config) {
|
|
||||||
if addr := os.Getenv("ETCD_ADDR"); addr != "" {
|
|
||||||
url, err := docker.NewServiceURLParse(addr)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
return func() {}, &Config{ServiceURL: *url}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check https://github.com/etcd-io/etcd/releases for latest releases.
|
|
||||||
runner, err := docker.NewServiceRunner(docker.RunOptions{
|
|
||||||
ContainerName: "etcd",
|
|
||||||
ImageRepo: "gcr.io/etcd-development/etcd",
|
|
||||||
ImageTag: version,
|
|
||||||
Cmd: []string{
|
|
||||||
"/usr/local/bin/etcd",
|
|
||||||
"--name", "s1",
|
|
||||||
"--listen-client-urls", "http://0.0.0.0:2379",
|
|
||||||
"--advertise-client-urls", "http://0.0.0.0:2379",
|
|
||||||
"--listen-peer-urls", "http://0.0.0.0:2380",
|
|
||||||
"--initial-advertise-peer-urls", "http://0.0.0.0:2380",
|
|
||||||
"--initial-cluster", "s1=http://0.0.0.0:2380",
|
|
||||||
"--initial-cluster-token", "tkn",
|
|
||||||
"--initial-cluster-state", "new",
|
|
||||||
"--log-level", "info",
|
|
||||||
"--logger", "zap",
|
|
||||||
"--log-outputs", "stderr",
|
|
||||||
},
|
|
||||||
Ports: []string{"2379/tcp"},
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Could not start docker etcd container: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
svc, err := runner.StartService(context.Background(), func(ctx context.Context, host string, port int) (docker.ServiceConfig, error) {
|
|
||||||
address := fmt.Sprintf("%s:%d", host, port)
|
|
||||||
s := docker.NewServiceURL(url.URL{
|
|
||||||
Scheme: "http",
|
|
||||||
Host: address,
|
|
||||||
})
|
|
||||||
|
|
||||||
client, err := clientv3.New(clientv3.Config{
|
|
||||||
Endpoints: []string{address},
|
|
||||||
DialTimeout: 2 * time.Minute,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("could not connect to etcd container: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Enable authentication for the tests.
|
|
||||||
client.RoleAdd(ctx, "root")
|
|
||||||
client.UserAdd(ctx, "root", "insecure")
|
|
||||||
client.UserGrantRole(ctx, "root", "root")
|
|
||||||
client.AuthEnable(ctx)
|
|
||||||
client.Close()
|
|
||||||
|
|
||||||
return &Config{
|
|
||||||
ServiceURL: *s,
|
|
||||||
}, nil
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Could not start docker etcd container: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return svc.Cleanup, svc.Config.(*Config)
|
|
||||||
}
|
|
@ -1,92 +0,0 @@
|
|||||||
// Copyright (c) HashiCorp, Inc.
|
|
||||||
// SPDX-License-Identifier: BUSL-1.1
|
|
||||||
|
|
||||||
package etcd
|
|
||||||
|
|
||||||
import (
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"net/url"
|
|
||||||
"os"
|
|
||||||
"strings"
|
|
||||||
|
|
||||||
log "github.com/hashicorp/go-hclog"
|
|
||||||
"github.com/hashicorp/vault/sdk/physical"
|
|
||||||
"go.etcd.io/etcd/client/v2"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
EtcdMultipleBootstrapError = errors.New("client setup failed: multiple discovery or bootstrap flags specified, use either \"address\" or \"discovery_srv\"")
|
|
||||||
EtcdAddressError = errors.New("client setup failed: address must be valid URL (ex. 'scheme://host:port')")
|
|
||||||
EtcdLockHeldError = errors.New("lock already held")
|
|
||||||
EtcdLockNotHeldError = errors.New("lock not held")
|
|
||||||
EtcdVersionUnknown = errors.New("etcd: unknown API version")
|
|
||||||
)
|
|
||||||
|
|
||||||
// NewEtcdBackend constructs a etcd backend using a given machine address.
|
|
||||||
func NewEtcdBackend(conf map[string]string, logger log.Logger) (physical.Backend, error) {
|
|
||||||
var (
|
|
||||||
apiVersion string
|
|
||||||
ok bool
|
|
||||||
)
|
|
||||||
|
|
||||||
if apiVersion, ok = conf["etcd_api"]; !ok {
|
|
||||||
apiVersion = os.Getenv("ETCD_API")
|
|
||||||
}
|
|
||||||
|
|
||||||
if apiVersion == "" {
|
|
||||||
apiVersion = "v3"
|
|
||||||
}
|
|
||||||
|
|
||||||
switch apiVersion {
|
|
||||||
case "3", "etcd3", "v3":
|
|
||||||
return newEtcd3Backend(conf, logger)
|
|
||||||
default:
|
|
||||||
return nil, EtcdVersionUnknown
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Retrieves the config option in order of priority:
|
|
||||||
// 1. The named environment variable if it exist
|
|
||||||
// 2. The key in the config map
|
|
||||||
func getEtcdOption(conf map[string]string, confKey, envVar string) (string, bool) {
|
|
||||||
confVal, inConf := conf[confKey]
|
|
||||||
envVal, inEnv := os.LookupEnv(envVar)
|
|
||||||
if inEnv {
|
|
||||||
return envVal, true
|
|
||||||
}
|
|
||||||
return confVal, inConf
|
|
||||||
}
|
|
||||||
|
|
||||||
func getEtcdEndpoints(conf map[string]string) ([]string, error) {
|
|
||||||
address, staticBootstrap := getEtcdOption(conf, "address", "ETCD_ADDR")
|
|
||||||
domain, useSrv := getEtcdOption(conf, "discovery_srv", "ETCD_DISCOVERY_SRV")
|
|
||||||
if useSrv && staticBootstrap {
|
|
||||||
return nil, EtcdMultipleBootstrapError
|
|
||||||
}
|
|
||||||
|
|
||||||
if staticBootstrap {
|
|
||||||
endpoints := strings.Split(address, ",")
|
|
||||||
// Verify that the machines are valid URLs
|
|
||||||
for _, e := range endpoints {
|
|
||||||
u, urlErr := url.Parse(e)
|
|
||||||
if urlErr != nil || u.Scheme == "" {
|
|
||||||
return nil, EtcdAddressError
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return endpoints, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if useSrv {
|
|
||||||
srvName, _ := getEtcdOption(conf, "discovery_srv_name", "ETCD_DISCOVERY_SRV_NAME")
|
|
||||||
discoverer := client.NewSRVDiscover()
|
|
||||||
endpoints, err := discoverer.Discover(domain, srvName)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to discover etcd endpoints through SRV discovery: %w", err)
|
|
||||||
}
|
|
||||||
return endpoints, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Set a default endpoints list if no option was set
|
|
||||||
return []string{"http://127.0.0.1:2379"}, nil
|
|
||||||
}
|
|
@ -1,383 +0,0 @@
|
|||||||
// Copyright (c) HashiCorp, Inc.
|
|
||||||
// SPDX-License-Identifier: BUSL-1.1
|
|
||||||
|
|
||||||
package etcd
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"os"
|
|
||||||
"path"
|
|
||||||
"strconv"
|
|
||||||
"strings"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/armon/go-metrics"
|
|
||||||
log "github.com/hashicorp/go-hclog"
|
|
||||||
"github.com/hashicorp/go-secure-stdlib/parseutil"
|
|
||||||
"github.com/hashicorp/go-secure-stdlib/strutil"
|
|
||||||
"github.com/hashicorp/vault/sdk/physical"
|
|
||||||
"go.etcd.io/etcd/client/pkg/v3/transport"
|
|
||||||
clientv3 "go.etcd.io/etcd/client/v3"
|
|
||||||
"go.etcd.io/etcd/client/v3/concurrency"
|
|
||||||
)
|
|
||||||
|
|
||||||
// EtcdBackend is a physical backend that stores data at specific
|
|
||||||
// prefix within etcd. It is used for most production situations as
|
|
||||||
// it allows Vault to run on multiple machines in a highly-available manner.
|
|
||||||
type EtcdBackend struct {
|
|
||||||
logger log.Logger
|
|
||||||
path string
|
|
||||||
haEnabled bool
|
|
||||||
lockTimeout time.Duration
|
|
||||||
requestTimeout time.Duration
|
|
||||||
|
|
||||||
permitPool *physical.PermitPool
|
|
||||||
|
|
||||||
etcd *clientv3.Client
|
|
||||||
}
|
|
||||||
|
|
||||||
// Verify EtcdBackend satisfies the correct interfaces
|
|
||||||
var (
|
|
||||||
_ physical.Backend = (*EtcdBackend)(nil)
|
|
||||||
_ physical.HABackend = (*EtcdBackend)(nil)
|
|
||||||
_ physical.Lock = (*EtcdLock)(nil)
|
|
||||||
)
|
|
||||||
|
|
||||||
// newEtcd3Backend constructs a etcd3 backend.
|
|
||||||
func newEtcd3Backend(conf map[string]string, logger log.Logger) (physical.Backend, error) {
|
|
||||||
// Get the etcd path form the configuration.
|
|
||||||
path, ok := conf["path"]
|
|
||||||
if !ok {
|
|
||||||
path = "/vault"
|
|
||||||
}
|
|
||||||
|
|
||||||
// Ensure path is prefixed.
|
|
||||||
if !strings.HasPrefix(path, "/") {
|
|
||||||
path = "/" + path
|
|
||||||
}
|
|
||||||
|
|
||||||
endpoints, err := getEtcdEndpoints(conf)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
cfg := clientv3.Config{
|
|
||||||
Endpoints: endpoints,
|
|
||||||
}
|
|
||||||
|
|
||||||
haEnabled := os.Getenv("ETCD_HA_ENABLED")
|
|
||||||
if haEnabled == "" {
|
|
||||||
haEnabled = conf["ha_enabled"]
|
|
||||||
}
|
|
||||||
if haEnabled == "" {
|
|
||||||
haEnabled = "false"
|
|
||||||
}
|
|
||||||
haEnabledBool, err := strconv.ParseBool(haEnabled)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("value [%v] of 'ha_enabled' could not be understood", haEnabled)
|
|
||||||
}
|
|
||||||
|
|
||||||
cert, hasCert := conf["tls_cert_file"]
|
|
||||||
key, hasKey := conf["tls_key_file"]
|
|
||||||
ca, hasCa := conf["tls_ca_file"]
|
|
||||||
if (hasCert && hasKey) || hasCa {
|
|
||||||
tls := transport.TLSInfo{
|
|
||||||
TrustedCAFile: ca,
|
|
||||||
CertFile: cert,
|
|
||||||
KeyFile: key,
|
|
||||||
}
|
|
||||||
|
|
||||||
tlscfg, err := tls.ClientConfig()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
cfg.TLS = tlscfg
|
|
||||||
}
|
|
||||||
|
|
||||||
// Set credentials.
|
|
||||||
username := os.Getenv("ETCD_USERNAME")
|
|
||||||
if username == "" {
|
|
||||||
username, _ = conf["username"]
|
|
||||||
}
|
|
||||||
|
|
||||||
password := os.Getenv("ETCD_PASSWORD")
|
|
||||||
if password == "" {
|
|
||||||
password, _ = conf["password"]
|
|
||||||
}
|
|
||||||
|
|
||||||
if username != "" && password != "" {
|
|
||||||
cfg.Username = username
|
|
||||||
cfg.Password = password
|
|
||||||
}
|
|
||||||
|
|
||||||
if maxReceive, ok := conf["max_receive_size"]; ok {
|
|
||||||
// grpc converts this to uint32 internally, so parse as that to avoid passing invalid values
|
|
||||||
val, err := strconv.ParseUint(maxReceive, 10, 32)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("value of 'max_receive_size' (%v) could not be understood: %w", maxReceive, err)
|
|
||||||
}
|
|
||||||
cfg.MaxCallRecvMsgSize = int(val)
|
|
||||||
}
|
|
||||||
|
|
||||||
etcd, err := clientv3.New(cfg)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
sReqTimeout := conf["request_timeout"]
|
|
||||||
if sReqTimeout == "" {
|
|
||||||
// etcd3 default request timeout is set to 5s. It should be long enough
|
|
||||||
// for most cases, even with internal retry.
|
|
||||||
sReqTimeout = "5s"
|
|
||||||
}
|
|
||||||
reqTimeout, err := parseutil.ParseDurationSecond(sReqTimeout)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("value [%v] of 'request_timeout' could not be understood: %w", sReqTimeout, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
ssync, ok := conf["sync"]
|
|
||||||
if !ok {
|
|
||||||
ssync = "true"
|
|
||||||
}
|
|
||||||
sync, err := strconv.ParseBool(ssync)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("value of 'sync' (%v) could not be understood: %w", ssync, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if sync {
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), reqTimeout)
|
|
||||||
err := etcd.Sync(ctx)
|
|
||||||
cancel()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
sLock := conf["lock_timeout"]
|
|
||||||
if sLock == "" {
|
|
||||||
// etcd3 default lease duration is 60s. set to 15s for faster recovery.
|
|
||||||
sLock = "15s"
|
|
||||||
}
|
|
||||||
lock, err := parseutil.ParseDurationSecond(sLock)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("value [%v] of 'lock_timeout' could not be understood: %w", sLock, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return &EtcdBackend{
|
|
||||||
path: path,
|
|
||||||
etcd: etcd,
|
|
||||||
permitPool: physical.NewPermitPool(physical.DefaultParallelOperations),
|
|
||||||
logger: logger,
|
|
||||||
haEnabled: haEnabledBool,
|
|
||||||
lockTimeout: lock,
|
|
||||||
requestTimeout: reqTimeout,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *EtcdBackend) Put(ctx context.Context, entry *physical.Entry) error {
|
|
||||||
defer metrics.MeasureSince([]string{"etcd", "put"}, time.Now())
|
|
||||||
|
|
||||||
c.permitPool.Acquire()
|
|
||||||
defer c.permitPool.Release()
|
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), c.requestTimeout)
|
|
||||||
defer cancel()
|
|
||||||
_, err := c.etcd.Put(ctx, path.Join(c.path, entry.Key), string(entry.Value))
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *EtcdBackend) Get(ctx context.Context, key string) (*physical.Entry, error) {
|
|
||||||
defer metrics.MeasureSince([]string{"etcd", "get"}, time.Now())
|
|
||||||
|
|
||||||
c.permitPool.Acquire()
|
|
||||||
defer c.permitPool.Release()
|
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), c.requestTimeout)
|
|
||||||
defer cancel()
|
|
||||||
resp, err := c.etcd.Get(ctx, path.Join(c.path, key))
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(resp.Kvs) == 0 {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
if len(resp.Kvs) > 1 {
|
|
||||||
return nil, errors.New("unexpected number of keys from a get request")
|
|
||||||
}
|
|
||||||
return &physical.Entry{
|
|
||||||
Key: key,
|
|
||||||
Value: resp.Kvs[0].Value,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *EtcdBackend) Delete(ctx context.Context, key string) error {
|
|
||||||
defer metrics.MeasureSince([]string{"etcd", "delete"}, time.Now())
|
|
||||||
|
|
||||||
c.permitPool.Acquire()
|
|
||||||
defer c.permitPool.Release()
|
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), c.requestTimeout)
|
|
||||||
defer cancel()
|
|
||||||
_, err := c.etcd.Delete(ctx, path.Join(c.path, key))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *EtcdBackend) List(ctx context.Context, prefix string) ([]string, error) {
|
|
||||||
defer metrics.MeasureSince([]string{"etcd", "list"}, time.Now())
|
|
||||||
|
|
||||||
c.permitPool.Acquire()
|
|
||||||
defer c.permitPool.Release()
|
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), c.requestTimeout)
|
|
||||||
defer cancel()
|
|
||||||
prefix = path.Join(c.path, prefix) + "/"
|
|
||||||
resp, err := c.etcd.Get(ctx, prefix, clientv3.WithPrefix(), clientv3.WithKeysOnly())
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
keys := []string{}
|
|
||||||
for _, kv := range resp.Kvs {
|
|
||||||
key := strings.TrimPrefix(string(kv.Key), prefix)
|
|
||||||
key = strings.TrimPrefix(key, "/")
|
|
||||||
|
|
||||||
if len(key) == 0 {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if i := strings.Index(key, "/"); i == -1 {
|
|
||||||
keys = append(keys, key)
|
|
||||||
} else if i != -1 {
|
|
||||||
keys = strutil.AppendIfMissing(keys, key[:i+1])
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return keys, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *EtcdBackend) HAEnabled() bool {
|
|
||||||
return e.haEnabled
|
|
||||||
}
|
|
||||||
|
|
||||||
// EtcdLock implements a lock using and etcd backend.
|
|
||||||
type EtcdLock struct {
|
|
||||||
lock sync.Mutex
|
|
||||||
held bool
|
|
||||||
timeout time.Duration
|
|
||||||
requestTimeout time.Duration
|
|
||||||
|
|
||||||
etcdSession *concurrency.Session
|
|
||||||
etcdMu *concurrency.Mutex
|
|
||||||
|
|
||||||
prefix string
|
|
||||||
value string
|
|
||||||
|
|
||||||
etcd *clientv3.Client
|
|
||||||
}
|
|
||||||
|
|
||||||
// Lock is used for mutual exclusion based on the given key.
|
|
||||||
func (c *EtcdBackend) LockWith(key, value string) (physical.Lock, error) {
|
|
||||||
p := path.Join(c.path, key)
|
|
||||||
return &EtcdLock{
|
|
||||||
prefix: p,
|
|
||||||
value: value,
|
|
||||||
etcd: c.etcd,
|
|
||||||
timeout: c.lockTimeout,
|
|
||||||
requestTimeout: c.requestTimeout,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *EtcdLock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) {
|
|
||||||
c.lock.Lock()
|
|
||||||
defer c.lock.Unlock()
|
|
||||||
|
|
||||||
if c.etcdMu == nil {
|
|
||||||
if err := c.initMu(); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if c.held {
|
|
||||||
return nil, EtcdLockHeldError
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case _, ok := <-c.etcdSession.Done():
|
|
||||||
if !ok {
|
|
||||||
// The session's done channel is closed, so the session is over,
|
|
||||||
// and we need a new lock with a new session.
|
|
||||||
if err := c.initMu(); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
|
||||||
go func() {
|
|
||||||
<-stopCh
|
|
||||||
cancel()
|
|
||||||
}()
|
|
||||||
if err := c.etcdMu.Lock(ctx); err != nil {
|
|
||||||
if err == context.Canceled {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
pctx, cancel := context.WithTimeout(context.Background(), c.requestTimeout)
|
|
||||||
defer cancel()
|
|
||||||
if _, err := c.etcd.Put(pctx, c.etcdMu.Key(), c.value, clientv3.WithLease(c.etcdSession.Lease())); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
c.held = true
|
|
||||||
|
|
||||||
return c.etcdSession.Done(), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *EtcdLock) Unlock() error {
|
|
||||||
c.lock.Lock()
|
|
||||||
defer c.lock.Unlock()
|
|
||||||
|
|
||||||
if !c.held {
|
|
||||||
return EtcdLockNotHeldError
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), c.requestTimeout)
|
|
||||||
defer cancel()
|
|
||||||
return c.etcdMu.Unlock(ctx)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *EtcdLock) Value() (bool, string, error) {
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), c.requestTimeout)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
resp, err := c.etcd.Get(ctx,
|
|
||||||
c.prefix, clientv3.WithPrefix(),
|
|
||||||
clientv3.WithSort(clientv3.SortByCreateRevision, clientv3.SortAscend))
|
|
||||||
if err != nil {
|
|
||||||
return false, "", err
|
|
||||||
}
|
|
||||||
if len(resp.Kvs) == 0 {
|
|
||||||
return false, "", nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return true, string(resp.Kvs[0].Value), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *EtcdLock) initMu() error {
|
|
||||||
session, err := concurrency.NewSession(c.etcd, concurrency.WithTTL(int(c.timeout.Seconds())))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
c.etcdSession = session
|
|
||||||
c.etcdMu = concurrency.NewMutex(session, c.prefix)
|
|
||||||
return nil
|
|
||||||
}
|
|
@ -1,46 +0,0 @@
|
|||||||
// Copyright (c) HashiCorp, Inc.
|
|
||||||
// SPDX-License-Identifier: BUSL-1.1
|
|
||||||
|
|
||||||
package etcd
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
log "github.com/hashicorp/go-hclog"
|
|
||||||
"github.com/hashicorp/vault/helper/testhelpers/etcd"
|
|
||||||
"github.com/hashicorp/vault/sdk/helper/logging"
|
|
||||||
"github.com/hashicorp/vault/sdk/physical"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestEtcd3Backend(t *testing.T) {
|
|
||||||
cleanup, config := etcd.PrepareTestContainer(t, "v3.5.0")
|
|
||||||
defer cleanup()
|
|
||||||
|
|
||||||
logger := logging.NewVaultLogger(log.Debug)
|
|
||||||
configMap := map[string]string{
|
|
||||||
"address": config.URL().String(),
|
|
||||||
"path": fmt.Sprintf("/vault-%d", time.Now().Unix()),
|
|
||||||
"etcd_api": "3",
|
|
||||||
"username": "root",
|
|
||||||
"password": "insecure",
|
|
||||||
|
|
||||||
// Syncing advertised client urls should be disabled since docker port mapping confuses the client.
|
|
||||||
"sync": "false",
|
|
||||||
}
|
|
||||||
|
|
||||||
b, err := NewEtcdBackend(configMap, logger)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("err: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
b2, err := NewEtcdBackend(configMap, logger)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("err: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
physical.ExerciseBackend(t, b)
|
|
||||||
physical.ExerciseBackend_ListPrefix(t, b)
|
|
||||||
physical.ExerciseHABackend(t, b.(physical.HABackend), b2.(physical.HABackend))
|
|
||||||
}
|
|
Loading…
Reference in New Issue
Block a user