remove cassandra
This commit is contained in:
parent
37d5afd3ed
commit
f33fd378bb
@ -10,7 +10,6 @@
|
|||||||
/builtin/credential/okta/ @hashicorp/vault-ecosystem
|
/builtin/credential/okta/ @hashicorp/vault-ecosystem
|
||||||
|
|
||||||
# Secrets engines (pki, ssh, totp and transit omitted)
|
# Secrets engines (pki, ssh, totp and transit omitted)
|
||||||
/builtin/logical/cassandra/ @hashicorp/vault-ecosystem
|
|
||||||
/builtin/logical/consul/ @hashicorp/vault-ecosystem
|
/builtin/logical/consul/ @hashicorp/vault-ecosystem
|
||||||
/builtin/logical/database/ @hashicorp/vault-ecosystem
|
/builtin/logical/database/ @hashicorp/vault-ecosystem
|
||||||
/builtin/logical/mysql/ @hashicorp/vault-ecosystem
|
/builtin/logical/mysql/ @hashicorp/vault-ecosystem
|
||||||
|
@ -42,7 +42,6 @@ import (
|
|||||||
logicalDb "github.com/hashicorp/vault/builtin/logical/database"
|
logicalDb "github.com/hashicorp/vault/builtin/logical/database"
|
||||||
|
|
||||||
physAerospike "github.com/hashicorp/vault/physical/aerospike"
|
physAerospike "github.com/hashicorp/vault/physical/aerospike"
|
||||||
physCassandra "github.com/hashicorp/vault/physical/cassandra"
|
|
||||||
physCockroachDB "github.com/hashicorp/vault/physical/cockroachdb"
|
physCockroachDB "github.com/hashicorp/vault/physical/cockroachdb"
|
||||||
physConsul "github.com/hashicorp/vault/physical/consul"
|
physConsul "github.com/hashicorp/vault/physical/consul"
|
||||||
physFoundationDB "github.com/hashicorp/vault/physical/foundationdb"
|
physFoundationDB "github.com/hashicorp/vault/physical/foundationdb"
|
||||||
@ -171,7 +170,6 @@ var (
|
|||||||
|
|
||||||
physicalBackends = map[string]physical.Factory{
|
physicalBackends = map[string]physical.Factory{
|
||||||
"aerospike": physAerospike.NewAerospikeBackend,
|
"aerospike": physAerospike.NewAerospikeBackend,
|
||||||
"cassandra": physCassandra.NewCassandraBackend,
|
|
||||||
"cockroachdb": physCockroachDB.NewCockroachDBBackend,
|
"cockroachdb": physCockroachDB.NewCockroachDBBackend,
|
||||||
"consul": physConsul.NewConsulBackend,
|
"consul": physConsul.NewConsulBackend,
|
||||||
"file_transactional": physFile.NewTransactionalFileBackend,
|
"file_transactional": physFile.NewTransactionalFileBackend,
|
||||||
|
@ -1,366 +0,0 @@
|
|||||||
// Copyright (c) HashiCorp, Inc.
|
|
||||||
// SPDX-License-Identifier: BUSL-1.1
|
|
||||||
|
|
||||||
package cassandra
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"crypto/tls"
|
|
||||||
"fmt"
|
|
||||||
"io/ioutil"
|
|
||||||
"net"
|
|
||||||
"strconv"
|
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
metrics "github.com/armon/go-metrics"
|
|
||||||
"github.com/gocql/gocql"
|
|
||||||
log "github.com/hashicorp/go-hclog"
|
|
||||||
"github.com/hashicorp/vault/sdk/helper/certutil"
|
|
||||||
"github.com/hashicorp/vault/sdk/physical"
|
|
||||||
)
|
|
||||||
|
|
||||||
// CassandraBackend is a physical backend that stores data in Cassandra.
|
|
||||||
type CassandraBackend struct {
|
|
||||||
sess *gocql.Session
|
|
||||||
table string
|
|
||||||
|
|
||||||
logger log.Logger
|
|
||||||
}
|
|
||||||
|
|
||||||
// Verify CassandraBackend satisfies the correct interfaces
|
|
||||||
var _ physical.Backend = (*CassandraBackend)(nil)
|
|
||||||
|
|
||||||
// NewCassandraBackend constructs a Cassandra backend using a pre-existing
|
|
||||||
// keyspace and table.
|
|
||||||
func NewCassandraBackend(conf map[string]string, logger log.Logger) (physical.Backend, error) {
|
|
||||||
splitArray := func(v string) []string {
|
|
||||||
return strings.FieldsFunc(v, func(r rune) bool {
|
|
||||||
return r == ','
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
var (
|
|
||||||
hosts = splitArray(conf["hosts"])
|
|
||||||
port = 9042
|
|
||||||
explicitPort = false
|
|
||||||
keyspace = conf["keyspace"]
|
|
||||||
table = conf["table"]
|
|
||||||
consistency = gocql.LocalQuorum
|
|
||||||
)
|
|
||||||
|
|
||||||
if len(hosts) == 0 {
|
|
||||||
hosts = []string{"localhost"}
|
|
||||||
}
|
|
||||||
for i, hp := range hosts {
|
|
||||||
h, ps, err := net.SplitHostPort(hp)
|
|
||||||
if err != nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
p, err := strconv.Atoi(ps)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if explicitPort && p != port {
|
|
||||||
return nil, fmt.Errorf("all hosts must have the same port")
|
|
||||||
}
|
|
||||||
hosts[i], port = h, p
|
|
||||||
explicitPort = true
|
|
||||||
}
|
|
||||||
|
|
||||||
if keyspace == "" {
|
|
||||||
keyspace = "vault"
|
|
||||||
}
|
|
||||||
if table == "" {
|
|
||||||
table = "entries"
|
|
||||||
}
|
|
||||||
if cs, ok := conf["consistency"]; ok {
|
|
||||||
switch cs {
|
|
||||||
case "ANY":
|
|
||||||
consistency = gocql.Any
|
|
||||||
case "ONE":
|
|
||||||
consistency = gocql.One
|
|
||||||
case "TWO":
|
|
||||||
consistency = gocql.Two
|
|
||||||
case "THREE":
|
|
||||||
consistency = gocql.Three
|
|
||||||
case "QUORUM":
|
|
||||||
consistency = gocql.Quorum
|
|
||||||
case "ALL":
|
|
||||||
consistency = gocql.All
|
|
||||||
case "LOCAL_QUORUM":
|
|
||||||
consistency = gocql.LocalQuorum
|
|
||||||
case "EACH_QUORUM":
|
|
||||||
consistency = gocql.EachQuorum
|
|
||||||
case "LOCAL_ONE":
|
|
||||||
consistency = gocql.LocalOne
|
|
||||||
default:
|
|
||||||
return nil, fmt.Errorf("'consistency' must be one of {ANY, ONE, TWO, THREE, QUORUM, ALL, LOCAL_QUORUM, EACH_QUORUM, LOCAL_ONE}")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
connectStart := time.Now()
|
|
||||||
cluster := gocql.NewCluster(hosts...)
|
|
||||||
cluster.Port = port
|
|
||||||
cluster.Keyspace = keyspace
|
|
||||||
|
|
||||||
if retryCountStr, ok := conf["simple_retry_policy_retries"]; ok {
|
|
||||||
retryCount, err := strconv.Atoi(retryCountStr)
|
|
||||||
if err != nil || retryCount <= 0 {
|
|
||||||
return nil, fmt.Errorf("'simple_retry_policy_retries' must be a positive integer")
|
|
||||||
}
|
|
||||||
cluster.RetryPolicy = &gocql.SimpleRetryPolicy{NumRetries: retryCount}
|
|
||||||
}
|
|
||||||
|
|
||||||
cluster.ProtoVersion = 2
|
|
||||||
if protoVersionStr, ok := conf["protocol_version"]; ok {
|
|
||||||
protoVersion, err := strconv.Atoi(protoVersionStr)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("'protocol_version' must be an integer")
|
|
||||||
}
|
|
||||||
cluster.ProtoVersion = protoVersion
|
|
||||||
}
|
|
||||||
|
|
||||||
if username, ok := conf["username"]; ok {
|
|
||||||
if cluster.ProtoVersion < 2 {
|
|
||||||
return nil, fmt.Errorf("authentication is not supported with protocol version < 2")
|
|
||||||
}
|
|
||||||
authenticator := gocql.PasswordAuthenticator{Username: username}
|
|
||||||
if password, ok := conf["password"]; ok {
|
|
||||||
authenticator.Password = password
|
|
||||||
}
|
|
||||||
cluster.Authenticator = authenticator
|
|
||||||
}
|
|
||||||
|
|
||||||
if initialConnectionTimeoutStr, ok := conf["initial_connection_timeout"]; ok {
|
|
||||||
initialConnectionTimeout, err := strconv.Atoi(initialConnectionTimeoutStr)
|
|
||||||
if err != nil || initialConnectionTimeout <= 0 {
|
|
||||||
return nil, fmt.Errorf("'initial_connection_timeout' must be a positive integer")
|
|
||||||
}
|
|
||||||
cluster.ConnectTimeout = time.Duration(initialConnectionTimeout) * time.Second
|
|
||||||
}
|
|
||||||
|
|
||||||
if connTimeoutStr, ok := conf["connection_timeout"]; ok {
|
|
||||||
connectionTimeout, err := strconv.Atoi(connTimeoutStr)
|
|
||||||
if err != nil || connectionTimeout <= 0 {
|
|
||||||
return nil, fmt.Errorf("'connection_timeout' must be a positive integer")
|
|
||||||
}
|
|
||||||
cluster.Timeout = time.Duration(connectionTimeout) * time.Second
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := setupCassandraTLS(conf, cluster); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
sess, err := cluster.CreateSession()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
metrics.MeasureSince([]string{"cassandra", "connect"}, connectStart)
|
|
||||||
sess.SetConsistency(consistency)
|
|
||||||
|
|
||||||
impl := &CassandraBackend{
|
|
||||||
sess: sess,
|
|
||||||
table: table,
|
|
||||||
logger: logger,
|
|
||||||
}
|
|
||||||
return impl, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func setupCassandraTLS(conf map[string]string, cluster *gocql.ClusterConfig) error {
|
|
||||||
tlsOnStr, ok := conf["tls"]
|
|
||||||
if !ok {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
tlsOn, err := strconv.Atoi(tlsOnStr)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("'tls' must be an integer (0 or 1)")
|
|
||||||
}
|
|
||||||
|
|
||||||
if tlsOn == 0 {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
tlsConfig := &tls.Config{}
|
|
||||||
if pemBundlePath, ok := conf["pem_bundle_file"]; ok {
|
|
||||||
pemBundleData, err := ioutil.ReadFile(pemBundlePath)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("error reading pem bundle from %q: %w", pemBundlePath, err)
|
|
||||||
}
|
|
||||||
pemBundle, err := certutil.ParsePEMBundle(string(pemBundleData))
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("error parsing 'pem_bundle': %w", err)
|
|
||||||
}
|
|
||||||
tlsConfig, err = pemBundle.GetTLSConfig(certutil.TLSClient)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
} else if pemJSONPath, ok := conf["pem_json_file"]; ok {
|
|
||||||
pemJSONData, err := ioutil.ReadFile(pemJSONPath)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("error reading json bundle from %q: %w", pemJSONPath, err)
|
|
||||||
}
|
|
||||||
pemJSON, err := certutil.ParsePKIJSON([]byte(pemJSONData))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
tlsConfig, err = pemJSON.GetTLSConfig(certutil.TLSClient)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if tlsSkipVerifyStr, ok := conf["tls_skip_verify"]; ok {
|
|
||||||
tlsSkipVerify, err := strconv.Atoi(tlsSkipVerifyStr)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("'tls_skip_verify' must be an integer (0 or 1)")
|
|
||||||
}
|
|
||||||
if tlsSkipVerify == 0 {
|
|
||||||
tlsConfig.InsecureSkipVerify = false
|
|
||||||
} else {
|
|
||||||
tlsConfig.InsecureSkipVerify = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if tlsMinVersion, ok := conf["tls_min_version"]; ok {
|
|
||||||
switch tlsMinVersion {
|
|
||||||
case "tls10":
|
|
||||||
tlsConfig.MinVersion = tls.VersionTLS10
|
|
||||||
case "tls11":
|
|
||||||
tlsConfig.MinVersion = tls.VersionTLS11
|
|
||||||
case "tls12":
|
|
||||||
tlsConfig.MinVersion = tls.VersionTLS12
|
|
||||||
case "tls13":
|
|
||||||
tlsConfig.MinVersion = tls.VersionTLS13
|
|
||||||
default:
|
|
||||||
return fmt.Errorf("'tls_min_version' must be one of `tls10`, `tls11`, `tls12` or `tls13`")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
cluster.SslOpts = &gocql.SslOptions{
|
|
||||||
Config: tlsConfig,
|
|
||||||
EnableHostVerification: !tlsConfig.InsecureSkipVerify,
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// bucketName sanitises a bucket name for Cassandra
|
|
||||||
func (c *CassandraBackend) bucketName(name string) string {
|
|
||||||
if name == "" {
|
|
||||||
name = "."
|
|
||||||
}
|
|
||||||
return strings.TrimRight(name, "/")
|
|
||||||
}
|
|
||||||
|
|
||||||
// bucket returns all the prefix buckets the key should be stored at
|
|
||||||
func (c *CassandraBackend) buckets(key string) []string {
|
|
||||||
vals := append([]string{""}, physical.Prefixes(key)...)
|
|
||||||
for i, v := range vals {
|
|
||||||
vals[i] = c.bucketName(v)
|
|
||||||
}
|
|
||||||
return vals
|
|
||||||
}
|
|
||||||
|
|
||||||
// bucket returns the most specific bucket for the key
|
|
||||||
func (c *CassandraBackend) bucket(key string) string {
|
|
||||||
bs := c.buckets(key)
|
|
||||||
return bs[len(bs)-1]
|
|
||||||
}
|
|
||||||
|
|
||||||
// Put is used to insert or update an entry
|
|
||||||
func (c *CassandraBackend) Put(ctx context.Context, entry *physical.Entry) error {
|
|
||||||
defer metrics.MeasureSince([]string{"cassandra", "put"}, time.Now())
|
|
||||||
|
|
||||||
// Execute inserts to each key prefix simultaneously
|
|
||||||
stmt := fmt.Sprintf(`INSERT INTO "%s" (bucket, key, value) VALUES (?, ?, ?)`, c.table)
|
|
||||||
buckets := c.buckets(entry.Key)
|
|
||||||
results := make(chan error, len(buckets))
|
|
||||||
for i, _bucket := range buckets {
|
|
||||||
go func(i int, bucket string) {
|
|
||||||
var value []byte
|
|
||||||
if i == len(buckets)-1 {
|
|
||||||
// Only store the full value if this is the leaf bucket where the entry will actually be read
|
|
||||||
// otherwise this write is just to allow for list operations
|
|
||||||
value = entry.Value
|
|
||||||
}
|
|
||||||
results <- c.sess.Query(stmt, bucket, entry.Key, value).Exec()
|
|
||||||
}(i, _bucket)
|
|
||||||
}
|
|
||||||
for i := 0; i < len(buckets); i++ {
|
|
||||||
if err := <-results; err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get is used to fetch an entry
|
|
||||||
func (c *CassandraBackend) Get(ctx context.Context, key string) (*physical.Entry, error) {
|
|
||||||
defer metrics.MeasureSince([]string{"cassandra", "get"}, time.Now())
|
|
||||||
|
|
||||||
v := []byte(nil)
|
|
||||||
stmt := fmt.Sprintf(`SELECT value FROM "%s" WHERE bucket = ? AND key = ? LIMIT 1`, c.table)
|
|
||||||
q := c.sess.Query(stmt, c.bucket(key), key)
|
|
||||||
if err := q.Scan(&v); err != nil {
|
|
||||||
if err == gocql.ErrNotFound {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return &physical.Entry{
|
|
||||||
Key: key,
|
|
||||||
Value: v,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Delete is used to permanently delete an entry
|
|
||||||
func (c *CassandraBackend) Delete(ctx context.Context, key string) error {
|
|
||||||
defer metrics.MeasureSince([]string{"cassandra", "delete"}, time.Now())
|
|
||||||
|
|
||||||
stmt := fmt.Sprintf(`DELETE FROM "%s" WHERE bucket = ? AND key = ?`, c.table)
|
|
||||||
buckets := c.buckets(key)
|
|
||||||
results := make(chan error, len(buckets))
|
|
||||||
|
|
||||||
for _, bucket := range buckets {
|
|
||||||
go func(bucket string) {
|
|
||||||
results <- c.sess.Query(stmt, bucket, key).Exec()
|
|
||||||
}(bucket)
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := 0; i < len(buckets); i++ {
|
|
||||||
if err := <-results; err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// List is used ot list all the keys under a given
|
|
||||||
// prefix, up to the next prefix.
|
|
||||||
func (c *CassandraBackend) List(ctx context.Context, prefix string) ([]string, error) {
|
|
||||||
defer metrics.MeasureSince([]string{"cassandra", "list"}, time.Now())
|
|
||||||
|
|
||||||
stmt := fmt.Sprintf(`SELECT key FROM "%s" WHERE bucket = ?`, c.table)
|
|
||||||
q := c.sess.Query(stmt, c.bucketName(prefix))
|
|
||||||
iter := q.Iter()
|
|
||||||
k, keys := "", []string{}
|
|
||||||
for iter.Scan(&k) {
|
|
||||||
// Only return the next "component" (with a trailing slash if it has children)
|
|
||||||
k = strings.TrimPrefix(k, prefix)
|
|
||||||
if parts := strings.SplitN(k, "/", 2); len(parts) > 1 {
|
|
||||||
k = parts[0] + "/"
|
|
||||||
} else {
|
|
||||||
k = parts[0]
|
|
||||||
}
|
|
||||||
|
|
||||||
// Deduplicate; this works because the keys are sorted
|
|
||||||
if len(keys) > 0 && keys[len(keys)-1] == k {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
keys = append(keys, k)
|
|
||||||
}
|
|
||||||
return keys, iter.Close()
|
|
||||||
}
|
|
@ -1,60 +0,0 @@
|
|||||||
// Copyright (c) HashiCorp, Inc.
|
|
||||||
// SPDX-License-Identifier: BUSL-1.1
|
|
||||||
|
|
||||||
package cassandra
|
|
||||||
|
|
||||||
import (
|
|
||||||
"os"
|
|
||||||
"reflect"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
log "github.com/hashicorp/go-hclog"
|
|
||||||
"github.com/hashicorp/vault/helper/testhelpers/cassandra"
|
|
||||||
"github.com/hashicorp/vault/sdk/helper/logging"
|
|
||||||
"github.com/hashicorp/vault/sdk/physical"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestCassandraBackend(t *testing.T) {
|
|
||||||
if testing.Short() {
|
|
||||||
t.Skipf("skipping in short mode")
|
|
||||||
}
|
|
||||||
if os.Getenv("VAULT_CI_GO_TEST_RACE") != "" {
|
|
||||||
t.Skip("skipping race test in CI pending https://github.com/gocql/gocql/pull/1474")
|
|
||||||
}
|
|
||||||
|
|
||||||
host, cleanup := cassandra.PrepareTestContainer(t)
|
|
||||||
defer cleanup()
|
|
||||||
|
|
||||||
// Run vault tests
|
|
||||||
logger := logging.NewVaultLogger(log.Debug)
|
|
||||||
b, err := NewCassandraBackend(map[string]string{
|
|
||||||
"hosts": host.ConnectionURL(),
|
|
||||||
"protocol_version": "3",
|
|
||||||
"connection_timeout": "5",
|
|
||||||
"initial_connection_timeout": "5",
|
|
||||||
"simple_retry_policy_retries": "3",
|
|
||||||
}, logger)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to create new backend: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
physical.ExerciseBackend(t, b)
|
|
||||||
physical.ExerciseBackend_ListPrefix(t, b)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestCassandraBackendBuckets(t *testing.T) {
|
|
||||||
expectations := map[string][]string{
|
|
||||||
"": {"."},
|
|
||||||
"a": {"."},
|
|
||||||
"a/b": {".", "a"},
|
|
||||||
"a/b/c/d/e": {".", "a", "a/b", "a/b/c", "a/b/c/d"},
|
|
||||||
}
|
|
||||||
|
|
||||||
b := &CassandraBackend{}
|
|
||||||
for input, expected := range expectations {
|
|
||||||
actual := b.buckets(input)
|
|
||||||
if !reflect.DeepEqual(actual, expected) {
|
|
||||||
t.Errorf("bad: %v expected: %v", actual, expected)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue
Block a user