From f33fd378bb2c59103d69e3dabefbbc04d4e9f408 Mon Sep 17 00:00:00 2001 From: Konstantin Demin Date: Mon, 1 Jul 2024 12:23:25 +0300 Subject: [PATCH] remove cassandra --- CODEOWNERS | 1 - command/commands.go | 2 - physical/cassandra/cassandra.go | 366 --------------------------- physical/cassandra/cassandra_test.go | 60 ----- 4 files changed, 429 deletions(-) delete mode 100644 physical/cassandra/cassandra.go delete mode 100644 physical/cassandra/cassandra_test.go diff --git a/CODEOWNERS b/CODEOWNERS index 04ebb5831..c86a4e0aa 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -10,7 +10,6 @@ /builtin/credential/okta/ @hashicorp/vault-ecosystem # Secrets engines (pki, ssh, totp and transit omitted) -/builtin/logical/cassandra/ @hashicorp/vault-ecosystem /builtin/logical/consul/ @hashicorp/vault-ecosystem /builtin/logical/database/ @hashicorp/vault-ecosystem /builtin/logical/mysql/ @hashicorp/vault-ecosystem diff --git a/command/commands.go b/command/commands.go index 874322e77..625be1e72 100644 --- a/command/commands.go +++ b/command/commands.go @@ -42,7 +42,6 @@ import ( logicalDb "github.com/hashicorp/vault/builtin/logical/database" physAerospike "github.com/hashicorp/vault/physical/aerospike" - physCassandra "github.com/hashicorp/vault/physical/cassandra" physCockroachDB "github.com/hashicorp/vault/physical/cockroachdb" physConsul "github.com/hashicorp/vault/physical/consul" physFoundationDB "github.com/hashicorp/vault/physical/foundationdb" @@ -171,7 +170,6 @@ var ( physicalBackends = map[string]physical.Factory{ "aerospike": physAerospike.NewAerospikeBackend, - "cassandra": physCassandra.NewCassandraBackend, "cockroachdb": physCockroachDB.NewCockroachDBBackend, "consul": physConsul.NewConsulBackend, "file_transactional": physFile.NewTransactionalFileBackend, diff --git a/physical/cassandra/cassandra.go b/physical/cassandra/cassandra.go deleted file mode 100644 index 1c592ee33..000000000 --- a/physical/cassandra/cassandra.go +++ /dev/null @@ -1,366 +0,0 @@ -// Copyright (c) HashiCorp, Inc. -// SPDX-License-Identifier: BUSL-1.1 - -package cassandra - -import ( - "context" - "crypto/tls" - "fmt" - "io/ioutil" - "net" - "strconv" - "strings" - "time" - - metrics "github.com/armon/go-metrics" - "github.com/gocql/gocql" - log "github.com/hashicorp/go-hclog" - "github.com/hashicorp/vault/sdk/helper/certutil" - "github.com/hashicorp/vault/sdk/physical" -) - -// CassandraBackend is a physical backend that stores data in Cassandra. -type CassandraBackend struct { - sess *gocql.Session - table string - - logger log.Logger -} - -// Verify CassandraBackend satisfies the correct interfaces -var _ physical.Backend = (*CassandraBackend)(nil) - -// NewCassandraBackend constructs a Cassandra backend using a pre-existing -// keyspace and table. -func NewCassandraBackend(conf map[string]string, logger log.Logger) (physical.Backend, error) { - splitArray := func(v string) []string { - return strings.FieldsFunc(v, func(r rune) bool { - return r == ',' - }) - } - - var ( - hosts = splitArray(conf["hosts"]) - port = 9042 - explicitPort = false - keyspace = conf["keyspace"] - table = conf["table"] - consistency = gocql.LocalQuorum - ) - - if len(hosts) == 0 { - hosts = []string{"localhost"} - } - for i, hp := range hosts { - h, ps, err := net.SplitHostPort(hp) - if err != nil { - continue - } - p, err := strconv.Atoi(ps) - if err != nil { - return nil, err - } - - if explicitPort && p != port { - return nil, fmt.Errorf("all hosts must have the same port") - } - hosts[i], port = h, p - explicitPort = true - } - - if keyspace == "" { - keyspace = "vault" - } - if table == "" { - table = "entries" - } - if cs, ok := conf["consistency"]; ok { - switch cs { - case "ANY": - consistency = gocql.Any - case "ONE": - consistency = gocql.One - case "TWO": - consistency = gocql.Two - case "THREE": - consistency = gocql.Three - case "QUORUM": - consistency = gocql.Quorum - case "ALL": - consistency = gocql.All - case "LOCAL_QUORUM": - consistency = gocql.LocalQuorum - case "EACH_QUORUM": - consistency = gocql.EachQuorum - case "LOCAL_ONE": - consistency = gocql.LocalOne - default: - return nil, fmt.Errorf("'consistency' must be one of {ANY, ONE, TWO, THREE, QUORUM, ALL, LOCAL_QUORUM, EACH_QUORUM, LOCAL_ONE}") - } - } - - connectStart := time.Now() - cluster := gocql.NewCluster(hosts...) - cluster.Port = port - cluster.Keyspace = keyspace - - if retryCountStr, ok := conf["simple_retry_policy_retries"]; ok { - retryCount, err := strconv.Atoi(retryCountStr) - if err != nil || retryCount <= 0 { - return nil, fmt.Errorf("'simple_retry_policy_retries' must be a positive integer") - } - cluster.RetryPolicy = &gocql.SimpleRetryPolicy{NumRetries: retryCount} - } - - cluster.ProtoVersion = 2 - if protoVersionStr, ok := conf["protocol_version"]; ok { - protoVersion, err := strconv.Atoi(protoVersionStr) - if err != nil { - return nil, fmt.Errorf("'protocol_version' must be an integer") - } - cluster.ProtoVersion = protoVersion - } - - if username, ok := conf["username"]; ok { - if cluster.ProtoVersion < 2 { - return nil, fmt.Errorf("authentication is not supported with protocol version < 2") - } - authenticator := gocql.PasswordAuthenticator{Username: username} - if password, ok := conf["password"]; ok { - authenticator.Password = password - } - cluster.Authenticator = authenticator - } - - if initialConnectionTimeoutStr, ok := conf["initial_connection_timeout"]; ok { - initialConnectionTimeout, err := strconv.Atoi(initialConnectionTimeoutStr) - if err != nil || initialConnectionTimeout <= 0 { - return nil, fmt.Errorf("'initial_connection_timeout' must be a positive integer") - } - cluster.ConnectTimeout = time.Duration(initialConnectionTimeout) * time.Second - } - - if connTimeoutStr, ok := conf["connection_timeout"]; ok { - connectionTimeout, err := strconv.Atoi(connTimeoutStr) - if err != nil || connectionTimeout <= 0 { - return nil, fmt.Errorf("'connection_timeout' must be a positive integer") - } - cluster.Timeout = time.Duration(connectionTimeout) * time.Second - } - - if err := setupCassandraTLS(conf, cluster); err != nil { - return nil, err - } - - sess, err := cluster.CreateSession() - if err != nil { - return nil, err - } - metrics.MeasureSince([]string{"cassandra", "connect"}, connectStart) - sess.SetConsistency(consistency) - - impl := &CassandraBackend{ - sess: sess, - table: table, - logger: logger, - } - return impl, nil -} - -func setupCassandraTLS(conf map[string]string, cluster *gocql.ClusterConfig) error { - tlsOnStr, ok := conf["tls"] - if !ok { - return nil - } - - tlsOn, err := strconv.Atoi(tlsOnStr) - if err != nil { - return fmt.Errorf("'tls' must be an integer (0 or 1)") - } - - if tlsOn == 0 { - return nil - } - - tlsConfig := &tls.Config{} - if pemBundlePath, ok := conf["pem_bundle_file"]; ok { - pemBundleData, err := ioutil.ReadFile(pemBundlePath) - if err != nil { - return fmt.Errorf("error reading pem bundle from %q: %w", pemBundlePath, err) - } - pemBundle, err := certutil.ParsePEMBundle(string(pemBundleData)) - if err != nil { - return fmt.Errorf("error parsing 'pem_bundle': %w", err) - } - tlsConfig, err = pemBundle.GetTLSConfig(certutil.TLSClient) - if err != nil { - return err - } - } else if pemJSONPath, ok := conf["pem_json_file"]; ok { - pemJSONData, err := ioutil.ReadFile(pemJSONPath) - if err != nil { - return fmt.Errorf("error reading json bundle from %q: %w", pemJSONPath, err) - } - pemJSON, err := certutil.ParsePKIJSON([]byte(pemJSONData)) - if err != nil { - return err - } - tlsConfig, err = pemJSON.GetTLSConfig(certutil.TLSClient) - if err != nil { - return err - } - } - - if tlsSkipVerifyStr, ok := conf["tls_skip_verify"]; ok { - tlsSkipVerify, err := strconv.Atoi(tlsSkipVerifyStr) - if err != nil { - return fmt.Errorf("'tls_skip_verify' must be an integer (0 or 1)") - } - if tlsSkipVerify == 0 { - tlsConfig.InsecureSkipVerify = false - } else { - tlsConfig.InsecureSkipVerify = true - } - } - - if tlsMinVersion, ok := conf["tls_min_version"]; ok { - switch tlsMinVersion { - case "tls10": - tlsConfig.MinVersion = tls.VersionTLS10 - case "tls11": - tlsConfig.MinVersion = tls.VersionTLS11 - case "tls12": - tlsConfig.MinVersion = tls.VersionTLS12 - case "tls13": - tlsConfig.MinVersion = tls.VersionTLS13 - default: - return fmt.Errorf("'tls_min_version' must be one of `tls10`, `tls11`, `tls12` or `tls13`") - } - } - - cluster.SslOpts = &gocql.SslOptions{ - Config: tlsConfig, - EnableHostVerification: !tlsConfig.InsecureSkipVerify, - } - return nil -} - -// bucketName sanitises a bucket name for Cassandra -func (c *CassandraBackend) bucketName(name string) string { - if name == "" { - name = "." - } - return strings.TrimRight(name, "/") -} - -// bucket returns all the prefix buckets the key should be stored at -func (c *CassandraBackend) buckets(key string) []string { - vals := append([]string{""}, physical.Prefixes(key)...) - for i, v := range vals { - vals[i] = c.bucketName(v) - } - return vals -} - -// bucket returns the most specific bucket for the key -func (c *CassandraBackend) bucket(key string) string { - bs := c.buckets(key) - return bs[len(bs)-1] -} - -// Put is used to insert or update an entry -func (c *CassandraBackend) Put(ctx context.Context, entry *physical.Entry) error { - defer metrics.MeasureSince([]string{"cassandra", "put"}, time.Now()) - - // Execute inserts to each key prefix simultaneously - stmt := fmt.Sprintf(`INSERT INTO "%s" (bucket, key, value) VALUES (?, ?, ?)`, c.table) - buckets := c.buckets(entry.Key) - results := make(chan error, len(buckets)) - for i, _bucket := range buckets { - go func(i int, bucket string) { - var value []byte - if i == len(buckets)-1 { - // Only store the full value if this is the leaf bucket where the entry will actually be read - // otherwise this write is just to allow for list operations - value = entry.Value - } - results <- c.sess.Query(stmt, bucket, entry.Key, value).Exec() - }(i, _bucket) - } - for i := 0; i < len(buckets); i++ { - if err := <-results; err != nil { - return err - } - } - return nil -} - -// Get is used to fetch an entry -func (c *CassandraBackend) Get(ctx context.Context, key string) (*physical.Entry, error) { - defer metrics.MeasureSince([]string{"cassandra", "get"}, time.Now()) - - v := []byte(nil) - stmt := fmt.Sprintf(`SELECT value FROM "%s" WHERE bucket = ? AND key = ? LIMIT 1`, c.table) - q := c.sess.Query(stmt, c.bucket(key), key) - if err := q.Scan(&v); err != nil { - if err == gocql.ErrNotFound { - return nil, nil - } - return nil, err - } - - return &physical.Entry{ - Key: key, - Value: v, - }, nil -} - -// Delete is used to permanently delete an entry -func (c *CassandraBackend) Delete(ctx context.Context, key string) error { - defer metrics.MeasureSince([]string{"cassandra", "delete"}, time.Now()) - - stmt := fmt.Sprintf(`DELETE FROM "%s" WHERE bucket = ? AND key = ?`, c.table) - buckets := c.buckets(key) - results := make(chan error, len(buckets)) - - for _, bucket := range buckets { - go func(bucket string) { - results <- c.sess.Query(stmt, bucket, key).Exec() - }(bucket) - } - - for i := 0; i < len(buckets); i++ { - if err := <-results; err != nil { - return err - } - } - return nil -} - -// List is used ot list all the keys under a given -// prefix, up to the next prefix. -func (c *CassandraBackend) List(ctx context.Context, prefix string) ([]string, error) { - defer metrics.MeasureSince([]string{"cassandra", "list"}, time.Now()) - - stmt := fmt.Sprintf(`SELECT key FROM "%s" WHERE bucket = ?`, c.table) - q := c.sess.Query(stmt, c.bucketName(prefix)) - iter := q.Iter() - k, keys := "", []string{} - for iter.Scan(&k) { - // Only return the next "component" (with a trailing slash if it has children) - k = strings.TrimPrefix(k, prefix) - if parts := strings.SplitN(k, "/", 2); len(parts) > 1 { - k = parts[0] + "/" - } else { - k = parts[0] - } - - // Deduplicate; this works because the keys are sorted - if len(keys) > 0 && keys[len(keys)-1] == k { - continue - } - keys = append(keys, k) - } - return keys, iter.Close() -} diff --git a/physical/cassandra/cassandra_test.go b/physical/cassandra/cassandra_test.go deleted file mode 100644 index 3370c3947..000000000 --- a/physical/cassandra/cassandra_test.go +++ /dev/null @@ -1,60 +0,0 @@ -// Copyright (c) HashiCorp, Inc. -// SPDX-License-Identifier: BUSL-1.1 - -package cassandra - -import ( - "os" - "reflect" - "testing" - - log "github.com/hashicorp/go-hclog" - "github.com/hashicorp/vault/helper/testhelpers/cassandra" - "github.com/hashicorp/vault/sdk/helper/logging" - "github.com/hashicorp/vault/sdk/physical" -) - -func TestCassandraBackend(t *testing.T) { - if testing.Short() { - t.Skipf("skipping in short mode") - } - if os.Getenv("VAULT_CI_GO_TEST_RACE") != "" { - t.Skip("skipping race test in CI pending https://github.com/gocql/gocql/pull/1474") - } - - host, cleanup := cassandra.PrepareTestContainer(t) - defer cleanup() - - // Run vault tests - logger := logging.NewVaultLogger(log.Debug) - b, err := NewCassandraBackend(map[string]string{ - "hosts": host.ConnectionURL(), - "protocol_version": "3", - "connection_timeout": "5", - "initial_connection_timeout": "5", - "simple_retry_policy_retries": "3", - }, logger) - if err != nil { - t.Fatalf("Failed to create new backend: %v", err) - } - - physical.ExerciseBackend(t, b) - physical.ExerciseBackend_ListPrefix(t, b) -} - -func TestCassandraBackendBuckets(t *testing.T) { - expectations := map[string][]string{ - "": {"."}, - "a": {"."}, - "a/b": {".", "a"}, - "a/b/c/d/e": {".", "a", "a/b", "a/b/c", "a/b/c/d"}, - } - - b := &CassandraBackend{} - for input, expected := range expectations { - actual := b.buckets(input) - if !reflect.DeepEqual(actual, expected) { - t.Errorf("bad: %v expected: %v", actual, expected) - } - } -}