1
0

Sync up Agent and API's renewers. (#7733)

* Sync up Agent and API's renewers.

This introduces a new type, LifetimeWatcher, which can handle both
renewable and non-renewable secrets, modeled after the version in Agent.
It allows the user to select behavior, with the new style being the
default when calling Start(), and old style if using the legacy Renew()
call.

No tests have been modified (except for reflect issues) and no other
code has been modified to make sure the changes are backwards
compatible.

Once this is accepted I'll pull the Agent version out.

* Move compat flags to NewRenewer

* Port agent to shared lifetime watcher lib
This commit is contained in:
Jeff Mitchell 2019-10-28 20:28:59 -04:00 committed by Brian Kassouf
parent eb1f426285
commit 79ae63e9ae
8 changed files with 428 additions and 768 deletions

View File

@ -5,6 +5,7 @@ go 1.12
replace github.com/hashicorp/vault/sdk => ../sdk
require (
github.com/go-test/deep v1.0.2-0.20181118220953-042da051cf31
github.com/hashicorp/errwrap v1.0.0
github.com/hashicorp/go-cleanhttp v0.5.1
github.com/hashicorp/go-multierror v1.0.0

View File

@ -10,6 +10,7 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
github.com/go-ldap/ldap v3.0.2+incompatible/go.mod h1:qfd9rJvER9Q0/D/Sqn1DfHRoBp40uXYvFoEVrNEPqRc=
github.com/go-test/deep v1.0.2-0.20181118220953-042da051cf31 h1:28FVBuwkwowZMjbA7M0wXsI6t3PYulRTMio3SO+eKCM=
github.com/go-test/deep v1.0.2-0.20181118220953-042da051cf31/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
@ -37,7 +38,6 @@ github.com/hashicorp/go-rootcerts v1.0.1/go.mod h1:pqUvnprVnM5bf7AOirdbb01K4ccR3
github.com/hashicorp/go-sockaddr v1.0.2 h1:ztczhD1jLxIRjVejw8gFomI1BQZOe2WoVOu0SyteCQc=
github.com/hashicorp/go-sockaddr v1.0.2/go.mod h1:rB4wwRAUzs07qva3c5SdrY/NEtAUjGlgmH/UkBUC97A=
github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/go-uuid v1.0.2-0.20191001231223-f32f5fe8d6a8/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/go-version v1.1.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=

384
api/lifetime_watcher.go Normal file
View File

@ -0,0 +1,384 @@
package api
import (
"errors"
"math/rand"
"sync"
"time"
)
var (
ErrLifetimeWatcherMissingInput = errors.New("missing input")
ErrLifetimeWatcherMissingSecret = errors.New("missing secret")
ErrLifetimeWatcherNotRenewable = errors.New("secret is not renewable")
ErrLifetimeWatcherNoSecretData = errors.New("returned empty secret data")
// Deprecated; kept for compatibility
ErrRenewerMissingInput = errors.New("missing input to renewer")
ErrRenewerMissingSecret = errors.New("missing secret to renew")
ErrRenewerNotRenewable = errors.New("secret is not renewable")
ErrRenewerNoSecretData = errors.New("returned empty secret data")
// DefaultLifetimeWatcherRenewBuffer is the default size of the buffer for renew
// messages on the channel.
DefaultLifetimeWatcherRenewBuffer = 5
// Deprecated: kept for backwards compatibility
DefaultRenewerRenewBuffer = 5
)
type RenewBehavior uint
const (
// RenewBehaviorIgnoreErrors means we will attempt to keep renewing until
// we hit the lifetime threshold. It also ignores errors stemming from
// passing a non-renewable lease in. In practice, this means you simply
// reauthenticate/refetch credentials when the watcher exits. This is the
// default.
RenewBehaviorIgnoreErrors RenewBehavior = iota
// RenewBehaviorRenewDisabled turns off renewal attempts entirely. This
// allows you to simply watch lifetime and have the watcher return at a
// reasonable threshold without actually making Vault calls.
RenewBehaviorRenewDisabled
// RenewBehaviorErrorOnErrors is the "legacy" behavior which always exits
// on some kind of error
RenewBehaviorErrorOnErrors
)
// LifetimeWatcher is a process for watching lifetime of a secret.
//
// watcher, err := client.NewLifetimeWatcher(&LifetimeWatcherInput{
// Secret: mySecret,
// })
// go watcher.Start()
// defer watcher.Stop()
//
// for {
// select {
// case err := <-watcher.DoneCh():
// if err != nil {
// log.Fatal(err)
// }
//
// // Renewal is now over
// case renewal := <-watcher.RenewCh():
// log.Printf("Successfully renewed: %#v", renewal)
// }
// }
//
//
// `DoneCh` will return if renewal fails, or if the remaining lease duration is
// under a built-in threshold and either renewing is not extending it or
// renewing is disabled. In both cases, the caller should attempt a re-read of
// the secret. Clients should check the return value of the channel to see if
// renewal was successful.
type LifetimeWatcher struct {
l sync.Mutex
client *Client
secret *Secret
grace time.Duration
random *rand.Rand
increment int
doneCh chan error
renewCh chan *RenewOutput
renewBehavior RenewBehavior
stopped bool
stopCh chan struct{}
errLifetimeWatcherNotRenewable error
errLifetimeWatcherNoSecretData error
}
// LifetimeWatcherInput is used as input to the renew function.
type LifetimeWatcherInput struct {
// Secret is the secret to renew
Secret *Secret
// DEPRECATED: this does not do anything.
Grace time.Duration
// Rand is the randomizer to use for underlying randomization. If not
// provided, one will be generated and seeded automatically. If provided, it
// is assumed to have already been seeded.
Rand *rand.Rand
// RenewBuffer is the size of the buffered channel where renew messages are
// dispatched.
RenewBuffer int
// The new TTL, in seconds, that should be set on the lease. The TTL set
// here may or may not be honored by the vault server, based on Vault
// configuration or any associated max TTL values.
Increment int
// RenewBehavior controls what happens when a renewal errors or the
// passed-in secret is not renewable.
RenewBehavior RenewBehavior
}
// RenewOutput is the metadata returned to the client (if it's listening) to
// renew messages.
type RenewOutput struct {
// RenewedAt is the timestamp when the renewal took place (UTC).
RenewedAt time.Time
// Secret is the underlying renewal data. It's the same struct as all data
// that is returned from Vault, but since this is renewal data, it will not
// usually include the secret itself.
Secret *Secret
}
// NewLifetimeWatcher creates a new renewer from the given input.
func (c *Client) NewLifetimeWatcher(i *LifetimeWatcherInput) (*LifetimeWatcher, error) {
if i == nil {
return nil, ErrLifetimeWatcherMissingInput
}
secret := i.Secret
if secret == nil {
return nil, ErrLifetimeWatcherMissingSecret
}
random := i.Rand
if random == nil {
random = rand.New(rand.NewSource(int64(time.Now().Nanosecond())))
}
renewBuffer := i.RenewBuffer
if renewBuffer == 0 {
renewBuffer = DefaultLifetimeWatcherRenewBuffer
}
return &LifetimeWatcher{
client: c,
secret: secret,
increment: i.Increment,
random: random,
doneCh: make(chan error, 1),
renewCh: make(chan *RenewOutput, renewBuffer),
renewBehavior: i.RenewBehavior,
stopped: false,
stopCh: make(chan struct{}),
errLifetimeWatcherNotRenewable: ErrLifetimeWatcherNotRenewable,
errLifetimeWatcherNoSecretData: ErrLifetimeWatcherNoSecretData,
}, nil
}
// Deprecated: exists only for backwards compatibility. Calls
// NewLifetimeWatcher, and sets compatibility flags.
func (c *Client) NewRenewer(i *LifetimeWatcherInput) (*LifetimeWatcher, error) {
if i == nil {
return nil, ErrRenewerMissingInput
}
secret := i.Secret
if secret == nil {
return nil, ErrRenewerMissingSecret
}
renewer, err := c.NewLifetimeWatcher(i)
if err != nil {
return nil, err
}
renewer.renewBehavior = RenewBehaviorErrorOnErrors
renewer.errLifetimeWatcherNotRenewable = ErrRenewerNotRenewable
renewer.errLifetimeWatcherNoSecretData = ErrRenewerNoSecretData
return renewer, err
}
// DoneCh returns the channel where the renewer will publish when renewal stops.
// If there is an error, this will be an error.
func (r *LifetimeWatcher) DoneCh() <-chan error {
return r.doneCh
}
// RenewCh is a channel that receives a message when a successful renewal takes
// place and includes metadata about the renewal.
func (r *LifetimeWatcher) RenewCh() <-chan *RenewOutput {
return r.renewCh
}
// Stop stops the renewer.
func (r *LifetimeWatcher) Stop() {
r.l.Lock()
defer r.l.Unlock()
if !r.stopped {
close(r.stopCh)
r.stopped = true
}
}
// Start starts a background process for watching the lifetime of this secret.
// If renewal is enabled, when the secret has auth data, this attempts to renew
// the auth (token); When the secret has a lease, this attempts to renew the
// lease.
func (r *LifetimeWatcher) Start() {
r.doneCh <- r.doRenew()
}
// Renew is for comnpatibility with the legacy api.Renewer. Calling Renew
// simply chains to Start.
func (r *LifetimeWatcher) Renew() {
r.Start()
}
// renewAuth is a helper for renewing authentication.
func (r *LifetimeWatcher) doRenew() error {
var nonRenewable bool
var tokenMode bool
var initLeaseDuration int
var credString string
var renewFunc func(string, int) (*Secret, error)
switch {
case r.secret.Auth != nil:
tokenMode = true
nonRenewable = !r.secret.Auth.Renewable
initLeaseDuration = r.secret.Auth.LeaseDuration
credString = r.secret.Auth.ClientToken
renewFunc = r.client.Auth().Token().RenewTokenAsSelf
default:
nonRenewable = !r.secret.Renewable
initLeaseDuration = r.secret.LeaseDuration
credString = r.secret.LeaseID
renewFunc = r.client.Sys().Renew
}
if credString == "" ||
(nonRenewable && r.renewBehavior == RenewBehaviorErrorOnErrors) {
return r.errLifetimeWatcherNotRenewable
}
initialTime := time.Now()
priorDuration := time.Duration(initLeaseDuration) * time.Second
r.calculateGrace(priorDuration)
for {
// Check if we are stopped.
select {
case <-r.stopCh:
return nil
default:
}
var leaseDuration time.Duration
fallbackLeaseDuration := initialTime.Add(priorDuration).Sub(time.Now())
switch {
case nonRenewable || r.renewBehavior == RenewBehaviorRenewDisabled:
// Can't or won't renew, just keep the same expiration so we exit
// when it's reauthentication time
leaseDuration = fallbackLeaseDuration
default:
// Renew the token
renewal, err := renewFunc(credString, r.increment)
if err != nil || renewal == nil || (tokenMode && renewal.Auth == nil) {
if r.renewBehavior == RenewBehaviorErrorOnErrors {
if err != nil {
return err
}
if renewal == nil || (tokenMode && renewal.Auth == nil) {
return r.errLifetimeWatcherNoSecretData
}
}
leaseDuration = fallbackLeaseDuration
break
}
// Push a message that a renewal took place.
select {
case r.renewCh <- &RenewOutput{time.Now().UTC(), renewal}:
default:
}
// Possibly error if we are not renewable
if ((tokenMode && !renewal.Auth.Renewable) || (!tokenMode && !renewal.Renewable)) &&
r.renewBehavior == RenewBehaviorErrorOnErrors {
return r.errLifetimeWatcherNotRenewable
}
// Grab the lease duration
newDuration := renewal.LeaseDuration
if tokenMode {
newDuration = renewal.Auth.LeaseDuration
}
leaseDuration = time.Duration(newDuration) * time.Second
}
// We keep evaluating a new grace period so long as the lease is
// extending. Once it stops extending, we've hit the max and need to
// rely on the grace duration.
if leaseDuration > priorDuration {
r.calculateGrace(leaseDuration)
}
priorDuration = leaseDuration
// The sleep duration is set to 2/3 of the current lease duration plus
// 1/3 of the current grace period, which adds jitter.
sleepDuration := time.Duration(float64(leaseDuration.Nanoseconds())*2/3 + float64(r.grace.Nanoseconds())/3)
// If we are within grace, return now; or, if the amount of time we
// would sleep would land us in the grace period. This helps with short
// tokens; for example, you don't want a current lease duration of 4
// seconds, a grace period of 3 seconds, and end up sleeping for more
// than three of those seconds and having a very small budget of time
// to renew.
if leaseDuration <= r.grace || leaseDuration-sleepDuration <= r.grace {
return nil
}
select {
case <-r.stopCh:
return nil
case <-time.After(sleepDuration):
continue
}
}
}
// sleepDuration calculates the time to sleep given the base lease duration. The
// base is the resulting lease duration. It will be reduced to 1/3 and
// multiplied by a random float between 0.0 and 1.0. This extra randomness
// prevents multiple clients from all trying to renew simultaneously.
func (r *LifetimeWatcher) sleepDuration(base time.Duration) time.Duration {
sleep := float64(base)
// Renew at 1/3 the remaining lease. This will give us an opportunity to retry
// at least one more time should the first renewal fail.
sleep = sleep / 3.0
// Use a randomness so many clients do not hit Vault simultaneously.
sleep = sleep * (r.random.Float64() + 1) / 2.0
return time.Duration(sleep)
}
// calculateGrace calculates the grace period based on a reasonable set of
// assumptions given the total lease time; it also adds some jitter to not have
// clients be in sync.
func (r *LifetimeWatcher) calculateGrace(leaseDuration time.Duration) {
if leaseDuration == 0 {
r.grace = 0
return
}
leaseNanos := float64(leaseDuration.Nanoseconds())
jitterMax := 0.1 * leaseNanos
// For a given lease duration, we want to allow 80-90% of that to elapse,
// so the remaining amount is the grace period
r.grace = time.Duration(jitterMax) + time.Duration(uint64(r.random.Int63())%uint64(jitterMax))
}
type Renewer = LifetimeWatcher
type RenewerInput = LifetimeWatcherInput

View File

@ -1,349 +0,0 @@
package api
import (
"errors"
"math/rand"
"sync"
"time"
)
var (
ErrRenewerMissingInput = errors.New("missing input to renewer")
ErrRenewerMissingSecret = errors.New("missing secret to renew")
ErrRenewerNotRenewable = errors.New("secret is not renewable")
ErrRenewerNoSecretData = errors.New("returned empty secret data")
// DefaultRenewerRenewBuffer is the default size of the buffer for renew
// messages on the channel.
DefaultRenewerRenewBuffer = 5
)
// Renewer is a process for renewing a secret.
//
// renewer, err := client.NewRenewer(&RenewerInput{
// Secret: mySecret,
// })
// go renewer.Renew()
// defer renewer.Stop()
//
// for {
// select {
// case err := <-renewer.DoneCh():
// if err != nil {
// log.Fatal(err)
// }
//
// // Renewal is now over
// case renewal := <-renewer.RenewCh():
// log.Printf("Successfully renewed: %#v", renewal)
// }
// }
//
//
// The `DoneCh` will return if renewal fails or if the remaining lease duration
// after a renewal is less than or equal to the grace (in number of seconds). In
// both cases, the caller should attempt a re-read of the secret. Clients should
// check the return value of the channel to see if renewal was successful.
type Renewer struct {
l sync.Mutex
client *Client
secret *Secret
grace time.Duration
random *rand.Rand
increment int
doneCh chan error
renewCh chan *RenewOutput
stopped bool
stopCh chan struct{}
}
// RenewerInput is used as input to the renew function.
type RenewerInput struct {
// Secret is the secret to renew
Secret *Secret
// DEPRECATED: this does not do anything.
Grace time.Duration
// Rand is the randomizer to use for underlying randomization. If not
// provided, one will be generated and seeded automatically. If provided, it
// is assumed to have already been seeded.
Rand *rand.Rand
// RenewBuffer is the size of the buffered channel where renew messages are
// dispatched.
RenewBuffer int
// The new TTL, in seconds, that should be set on the lease. The TTL set
// here may or may not be honored by the vault server, based on Vault
// configuration or any associated max TTL values.
Increment int
}
// RenewOutput is the metadata returned to the client (if it's listening) to
// renew messages.
type RenewOutput struct {
// RenewedAt is the timestamp when the renewal took place (UTC).
RenewedAt time.Time
// Secret is the underlying renewal data. It's the same struct as all data
// that is returned from Vault, but since this is renewal data, it will not
// usually include the secret itself.
Secret *Secret
}
// NewRenewer creates a new renewer from the given input.
func (c *Client) NewRenewer(i *RenewerInput) (*Renewer, error) {
if i == nil {
return nil, ErrRenewerMissingInput
}
secret := i.Secret
if secret == nil {
return nil, ErrRenewerMissingSecret
}
random := i.Rand
if random == nil {
random = rand.New(rand.NewSource(int64(time.Now().Nanosecond())))
}
renewBuffer := i.RenewBuffer
if renewBuffer == 0 {
renewBuffer = DefaultRenewerRenewBuffer
}
return &Renewer{
client: c,
secret: secret,
increment: i.Increment,
random: random,
doneCh: make(chan error, 1),
renewCh: make(chan *RenewOutput, renewBuffer),
stopped: false,
stopCh: make(chan struct{}),
}, nil
}
// DoneCh returns the channel where the renewer will publish when renewal stops.
// If there is an error, this will be an error.
func (r *Renewer) DoneCh() <-chan error {
return r.doneCh
}
// RenewCh is a channel that receives a message when a successful renewal takes
// place and includes metadata about the renewal.
func (r *Renewer) RenewCh() <-chan *RenewOutput {
return r.renewCh
}
// Stop stops the renewer.
func (r *Renewer) Stop() {
r.l.Lock()
if !r.stopped {
close(r.stopCh)
r.stopped = true
}
r.l.Unlock()
}
// Renew starts a background process for renewing this secret. When the secret
// has auth data, this attempts to renew the auth (token). When the secret has
// a lease, this attempts to renew the lease.
func (r *Renewer) Renew() {
var result error
if r.secret.Auth != nil {
result = r.renewAuth()
} else {
result = r.renewLease()
}
r.doneCh <- result
}
// renewAuth is a helper for renewing authentication.
func (r *Renewer) renewAuth() error {
if !r.secret.Auth.Renewable || r.secret.Auth.ClientToken == "" {
return ErrRenewerNotRenewable
}
priorDuration := time.Duration(r.secret.Auth.LeaseDuration) * time.Second
r.calculateGrace(priorDuration)
client, token := r.client, r.secret.Auth.ClientToken
for {
// Check if we are stopped.
select {
case <-r.stopCh:
return nil
default:
}
// Renew the auth.
renewal, err := client.Auth().Token().RenewTokenAsSelf(token, r.increment)
if err != nil {
return err
}
// Push a message that a renewal took place.
select {
case r.renewCh <- &RenewOutput{time.Now().UTC(), renewal}:
default:
}
// Somehow, sometimes, this happens.
if renewal == nil || renewal.Auth == nil {
return ErrRenewerNoSecretData
}
// Do nothing if we are not renewable
if !renewal.Auth.Renewable {
return ErrRenewerNotRenewable
}
// Grab the lease duration
leaseDuration := time.Duration(renewal.Auth.LeaseDuration) * time.Second
// We keep evaluating a new grace period so long as the lease is
// extending. Once it stops extending, we've hit the max and need to
// rely on the grace duration.
if leaseDuration > priorDuration {
r.calculateGrace(leaseDuration)
}
priorDuration = leaseDuration
// The sleep duration is set to 2/3 of the current lease duration plus
// 1/3 of the current grace period, which adds jitter.
sleepDuration := time.Duration(float64(leaseDuration.Nanoseconds())*2/3 + float64(r.grace.Nanoseconds())/3)
// If we are within grace, return now; or, if the amount of time we
// would sleep would land us in the grace period. This helps with short
// tokens; for example, you don't want a current lease duration of 4
// seconds, a grace period of 3 seconds, and end up sleeping for more
// than three of those seconds and having a very small budget of time
// to renew.
if leaseDuration <= r.grace || leaseDuration-sleepDuration <= r.grace {
return nil
}
select {
case <-r.stopCh:
return nil
case <-time.After(sleepDuration):
continue
}
}
}
// renewLease is a helper for renewing a lease.
func (r *Renewer) renewLease() error {
if !r.secret.Renewable || r.secret.LeaseID == "" {
return ErrRenewerNotRenewable
}
priorDuration := time.Duration(r.secret.LeaseDuration) * time.Second
r.calculateGrace(priorDuration)
client, leaseID := r.client, r.secret.LeaseID
for {
// Check if we are stopped.
select {
case <-r.stopCh:
return nil
default:
}
// Renew the lease.
renewal, err := client.Sys().Renew(leaseID, r.increment)
if err != nil {
return err
}
// Push a message that a renewal took place.
select {
case r.renewCh <- &RenewOutput{time.Now().UTC(), renewal}:
default:
}
// Somehow, sometimes, this happens.
if renewal == nil {
return ErrRenewerNoSecretData
}
// Do nothing if we are not renewable
if !renewal.Renewable {
return ErrRenewerNotRenewable
}
// Grab the lease duration
leaseDuration := time.Duration(renewal.LeaseDuration) * time.Second
// We keep evaluating a new grace period so long as the lease is
// extending. Once it stops extending, we've hit the max and need to
// rely on the grace duration.
if leaseDuration > priorDuration {
r.calculateGrace(leaseDuration)
}
priorDuration = leaseDuration
// The sleep duration is set to 2/3 of the current lease duration plus
// 1/3 of the current grace period, which adds jitter.
sleepDuration := time.Duration(float64(leaseDuration.Nanoseconds())*2/3 + float64(r.grace.Nanoseconds())/3)
// If we are within grace, return now; or, if the amount of time we
// would sleep would land us in the grace period. This helps with short
// tokens; for example, you don't want a current lease duration of 4
// seconds, a grace period of 3 seconds, and end up sleeping for more
// than three of those seconds and having a very small budget of time
// to renew.
if leaseDuration <= r.grace || leaseDuration-sleepDuration <= r.grace {
return nil
}
select {
case <-r.stopCh:
return nil
case <-time.After(sleepDuration):
continue
}
}
}
// sleepDuration calculates the time to sleep given the base lease duration. The
// base is the resulting lease duration. It will be reduced to 1/3 and
// multiplied by a random float between 0.0 and 1.0. This extra randomness
// prevents multiple clients from all trying to renew simultaneously.
func (r *Renewer) sleepDuration(base time.Duration) time.Duration {
sleep := float64(base)
// Renew at 1/3 the remaining lease. This will give us an opportunity to retry
// at least one more time should the first renewal fail.
sleep = sleep / 3.0
// Use a randomness so many clients do not hit Vault simultaneously.
sleep = sleep * (r.random.Float64() + 1) / 2.0
return time.Duration(sleep)
}
// calculateGrace calculates the grace period based on a reasonable set of
// assumptions given the total lease time; it also adds some jitter to not have
// clients be in sync.
func (r *Renewer) calculateGrace(leaseDuration time.Duration) {
if leaseDuration == 0 {
r.grace = 0
return
}
leaseNanos := float64(leaseDuration.Nanoseconds())
jitterMax := 0.1 * leaseNanos
// For a given lease duration, we want to allow 80-90% of that to elapse,
// so the remaining amount is the grace period
r.grace = time.Duration(jitterMax) + time.Duration(uint64(r.random.Int63())%uint64(jitterMax))
}

View File

@ -1,8 +1,9 @@
package api
import (
"reflect"
"testing"
"github.com/go-test/deep"
)
func TestRenewer_NewRenewer(t *testing.T) {
@ -63,8 +64,8 @@ func TestRenewer_NewRenewer(t *testing.T) {
v.renewCh = nil
v.stopCh = nil
if !reflect.DeepEqual(tc.e, v) {
t.Errorf("not equal\nexp: %#v\nact: %#v", tc.e, v)
if diff := deep.Equal(tc.e, v); diff != nil {
t.Error(diff)
}
})
}

View File

@ -1,378 +0,0 @@
package agentint
import (
"errors"
"math/rand"
"sync"
"time"
"github.com/hashicorp/vault/api"
)
var (
ErrRenewerMissingInput = errors.New("missing input to renewer")
ErrRenewerMissingSecret = errors.New("missing secret to renew")
ErrRenewerNotRenewable = errors.New("secret is not renewable")
ErrRenewerNoSecretData = errors.New("returned empty secret data")
// DefaultRenewerRenewBuffer is the default size of the buffer for renew
// messages on the channel.
DefaultRenewerRenewBuffer = 5
)
// Renewer is a process for renewing a secret.
//
// renewer, err := client.NewRenewer(&RenewerInput{
// Secret: mySecret,
// })
// go renewer.Renew()
// defer renewer.Stop()
//
// for {
// select {
// case err := <-renewer.DoneCh():
// if err != nil {
// log.Fatal(err)
// }
//
// // Renewal is now over
// case renewal := <-renewer.RenewCh():
// log.Printf("Successfully renewed: %#v", renewal)
// }
// }
//
//
// The `DoneCh` will return if renewal fails or if the remaining lease duration
// after a renewal is less than or equal to the grace (in number of seconds).
// In both cases, the caller should attempt a re-read of the secret or
// reauthenticate to get a new token. Clients should check the return value of
// the channel to see if renewal was successful.
type Renewer struct {
l sync.Mutex
client *api.Client
secret *api.Secret
grace time.Duration
random *rand.Rand
increment int
doneCh chan error
renewCh chan *renewOutput
stopped bool
stopCh chan struct{}
}
// RenewerInput is used as input to the renew function.
type RenewerInput struct {
// Secret is the secret to renew
Secret *api.Secret
// DEPRECATED: this does not do anything.
Grace time.Duration
// Rand is the randomizer to use for underlying randomization. If not
// provided, one will be generated and seeded automatically. If provided, it
// is assumed to have already been seeded.
Rand *rand.Rand
// RenewBuffer is the size of the buffered channel where renew messages are
// dispatched.
RenewBuffer int
// The new TTL, in seconds, that should be set on the lease. The TTL set
// here may or may not be honored by the vault server, based on Vault
// configuration or any associated max TTL values.
Increment int
}
// renewOutput is the metadata returned to the client (if it's listening) to
// renew messages.
type renewOutput struct {
// RenewedAt is the timestamp when the renewal took place (UTC).
RenewedAt time.Time
// Secret is the underlying renewal data. It's the same struct as all data
// that is returned from Vault, but since this is renewal data, it will not
// usually include the secret itself.
Secret *api.Secret
}
// NewRenewer creates a new Renewer from the given input.
func NewRenewer(c *api.Client, i *RenewerInput) (*Renewer, error) {
if i == nil {
return nil, ErrRenewerMissingInput
}
secret := i.Secret
if secret == nil {
return nil, ErrRenewerMissingSecret
}
random := i.Rand
if random == nil {
random = rand.New(rand.NewSource(int64(time.Now().Nanosecond())))
}
renewBuffer := i.RenewBuffer
if renewBuffer == 0 {
renewBuffer = DefaultRenewerRenewBuffer
}
return &Renewer{
client: c,
secret: secret,
increment: i.Increment,
random: random,
doneCh: make(chan error, 1),
renewCh: make(chan *renewOutput, renewBuffer),
stopped: false,
stopCh: make(chan struct{}),
}, nil
}
// DoneCh returns the channel where the Renewer will publish when renewal stops.
// If there is an error, this will be an error.
func (r *Renewer) DoneCh() <-chan error {
return r.doneCh
}
// RenewCh is a channel that receives a message when a successful renewal takes
// place and includes metadata about the renewal.
func (r *Renewer) RenewCh() <-chan *renewOutput {
return r.renewCh
}
// Stop stops the Renewer.
func (r *Renewer) Stop() {
r.l.Lock()
if !r.stopped {
close(r.stopCh)
r.stopped = true
}
r.l.Unlock()
}
// Renew starts a background process for renewing this secret. When the secret
// has auth data, this attempts to renew the auth (token). When the secret has
// a lease, this attempts to renew the lease.
func (r *Renewer) Renew() {
var result error
if r.secret.Auth != nil {
result = r.renewAuth()
} else {
result = r.renewLease()
}
r.doneCh <- result
}
// renewAuth is a helper for renewing authentication.
func (r *Renewer) renewAuth() error {
if r.secret.Auth.ClientToken == "" {
return ErrRenewerNotRenewable
}
nonRenewable := !r.secret.Auth.Renewable
initialTime := time.Now()
priorDuration := time.Duration(r.secret.Auth.LeaseDuration) * time.Second
r.calculateGrace(priorDuration)
client, token := r.client, r.secret.Auth.ClientToken
for {
// Check if we are stopped.
select {
case <-r.stopCh:
return nil
default:
}
var leaseDuration time.Duration
switch {
case nonRenewable:
// Can't renew, just keep same expiration so we exit when it's
// reauthentication time
leaseDuration = initialTime.Add(priorDuration).Sub(time.Now())
// Push a message that a renewal took place.
default:
renewal, err := client.Auth().Token().RenewTokenAsSelf(token, r.increment)
if err != nil {
return err
}
// Push a message that a renewal took place.
select {
case r.renewCh <- &renewOutput{time.Now().UTC(), renewal}:
default:
}
// Somehow, sometimes, this happens.
if renewal == nil || renewal.Auth == nil {
return ErrRenewerNoSecretData
}
// Do nothing if we are not renewable
if !renewal.Auth.Renewable {
return ErrRenewerNotRenewable
}
// Grab the lease duration
leaseDuration = time.Duration(renewal.Auth.LeaseDuration) * time.Second
}
// We keep evaluating a new grace period so long as the lease is
// extending. Once it stops extending, we've hit the max and need to
// rely on the grace duration.
if leaseDuration > priorDuration {
r.calculateGrace(leaseDuration)
}
priorDuration = leaseDuration
// The sleep duration is set to 2/3 of the current lease duration plus
// 1/3 of the current grace period, which adds jitter.
sleepDuration := time.Duration(float64(leaseDuration.Nanoseconds())*2/3 + float64(r.grace.Nanoseconds())/3)
// If we are within grace, return now; or, if the amount of time we
// would sleep would land us in the grace period. This helps with short
// tokens; for example, you don't want a current lease duration of 4
// seconds, a grace period of 3 seconds, and end up sleeping for more
// than three of those seconds and having a very small budget of time
// to renew.
if leaseDuration <= r.grace || leaseDuration-sleepDuration <= r.grace {
return nil
}
select {
case <-r.stopCh:
return nil
case <-time.After(sleepDuration):
continue
}
}
}
// renewLease is a helper for renewing a lease.
func (r *Renewer) renewLease() error {
if r.secret.LeaseID == "" {
return ErrRenewerNotRenewable
}
nonRenewable := !r.secret.Renewable
initialTime := time.Now()
priorDuration := time.Duration(r.secret.LeaseDuration) * time.Second
r.calculateGrace(priorDuration)
client, leaseID := r.client, r.secret.LeaseID
for {
// Check if we are stopped.
select {
case <-r.stopCh:
return nil
default:
}
var leaseDuration time.Duration
switch {
case nonRenewable:
// Can't renew, just keep same expiration so we exit when it's
// reread time
leaseDuration = initialTime.Add(priorDuration).Sub(time.Now())
default:
// Renew the lease.
renewal, err := client.Sys().Renew(leaseID, r.increment)
if err != nil {
return err
}
// Push a message that a renewal took place.
select {
case r.renewCh <- &renewOutput{time.Now().UTC(), renewal}:
default:
}
// Somehow, sometimes, this happens.
if renewal == nil {
return ErrRenewerNoSecretData
}
// Do nothing if we are not renewable
if !renewal.Renewable {
return ErrRenewerNotRenewable
}
// Grab the lease duration
leaseDuration = time.Duration(renewal.LeaseDuration) * time.Second
}
// We keep evaluating a new grace period so long as the lease is
// extending. Once it stops extending, we've hit the max and need to
// rely on the grace duration.
if leaseDuration > priorDuration {
r.calculateGrace(leaseDuration)
}
priorDuration = leaseDuration
// The sleep duration is set to 2/3 of the current lease duration plus
// 1/3 of the current grace period, which adds jitter.
sleepDuration := time.Duration(float64(leaseDuration.Nanoseconds())*2/3 + float64(r.grace.Nanoseconds())/3)
// If we are within grace, return now; or, if the amount of time we
// would sleep would land us in the grace period. This helps with short
// tokens; for example, you don't want a current lease duration of 4
// seconds, a grace period of 3 seconds, and end up sleeping for more
// than three of those seconds and having a very small budget of time
// to renew.
if leaseDuration <= r.grace || leaseDuration-sleepDuration <= r.grace {
return nil
}
select {
case <-r.stopCh:
return nil
case <-time.After(sleepDuration):
continue
}
}
}
// sleepDuration calculates the time to sleep given the base lease duration. The
// base is the resulting lease duration. It will be reduced to 1/3 and
// multiplied by a random float between 0.0 and 1.0. This extra randomness
// prevents multiple clients from all trying to renew simultaneously.
func (r *Renewer) sleepDuration(base time.Duration) time.Duration {
sleep := float64(base)
// Renew at 1/3 the remaining lease. This will give us an opportunity to retry
// at least one more time should the first renewal fail.
sleep = sleep / 3.0
// Use a randomness so many clients do not hit Vault simultaneously.
sleep = sleep * (r.random.Float64() + 1) / 2.0
return time.Duration(sleep)
}
// calculateGrace calculates the grace period based on a reasonable set of
// assumptions given the total lease time; it also adds some jitter to not have
// clients be in sync.
func (r *Renewer) calculateGrace(leaseDuration time.Duration) {
if leaseDuration == 0 {
r.grace = 0
return
}
leaseNanos := float64(leaseDuration.Nanoseconds())
jitterMax := 0.1 * leaseNanos
// For a given lease duration, we want to allow 80-90% of that to elapse,
// so the remaining amount is the grace period
r.grace = time.Duration(jitterMax) + time.Duration(uint64(r.random.Int63())%uint64(jitterMax))
}

View File

@ -7,7 +7,6 @@ import (
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/vault/api"
"github.com/hashicorp/vault/command/agent/agentint"
"github.com/hashicorp/vault/sdk/helper/jsonutil"
)
@ -106,7 +105,7 @@ func (ah *AuthHandler) Run(ctx context.Context, am AuthMethod) {
credCh = make(chan struct{})
}
var renewer *agentint.Renewer
var watcher *api.LifetimeWatcher
for {
select {
@ -205,44 +204,44 @@ func (ah *AuthHandler) Run(ctx context.Context, am AuthMethod) {
am.CredSuccess()
}
if renewer != nil {
renewer.Stop()
if watcher != nil {
watcher.Stop()
}
renewer, err = agentint.NewRenewer(ah.client, &agentint.RenewerInput{
watcher, err = ah.client.NewLifetimeWatcher(&api.LifetimeWatcherInput{
Secret: secret,
})
if err != nil {
ah.logger.Error("error creating renewer, backing off and retrying", "error", err, "backoff", backoff.Seconds())
ah.logger.Error("error creating lifetime watcher, backing off and retrying", "error", err, "backoff", backoff.Seconds())
backoffOrQuit(ctx, backoff)
continue
}
// Start the renewal process
ah.logger.Info("starting renewal process")
go renewer.Renew()
go watcher.Renew()
RenewerLoop:
LifetimeWatcherLoop:
for {
select {
case <-ctx.Done():
ah.logger.Info("shutdown triggered, stopping renewer")
renewer.Stop()
break RenewerLoop
ah.logger.Info("shutdown triggered, stopping lifetime watcher")
watcher.Stop()
break LifetimeWatcherLoop
case err := <-renewer.DoneCh():
ah.logger.Info("renewer done channel triggered")
case err := <-watcher.DoneCh():
ah.logger.Info("lifetime watcher done channel triggered")
if err != nil {
ah.logger.Error("error renewing token", "error", err)
}
break RenewerLoop
break LifetimeWatcherLoop
case <-renewer.RenewCh():
case <-watcher.RenewCh():
ah.logger.Info("renewed auth token")
case <-credCh:
ah.logger.Info("auth method found new credentials, re-authenticating")
break RenewerLoop
break LifetimeWatcherLoop
}
}
}

View File

@ -18,7 +18,6 @@ import (
"github.com/hashicorp/errwrap"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/vault/api"
"github.com/hashicorp/vault/command/agent/agentint"
cachememdb "github.com/hashicorp/vault/command/agent/cache/cachememdb"
"github.com/hashicorp/vault/helper/namespace"
nshelper "github.com/hashicorp/vault/helper/namespace"
@ -343,10 +342,10 @@ func (c *LeaseCache) Send(ctx context.Context, req *SendRequest) (*SendResponse,
// Set the index's Response
index.Response = respBytes.Bytes()
// Store the index ID in the renewer context
// Store the index ID in the lifetimewatcher context
renewCtx := context.WithValue(renewCtxInfo.Ctx, contextIndexID, index.ID)
// Store the renewer context in the index
// Store the lifetime watcher context in the index
index.RenewCtxInfo = &cachememdb.ContextInfo{
Ctx: renewCtx,
CancelFunc: renewCtxInfo.CancelFunc,
@ -389,23 +388,23 @@ func (c *LeaseCache) startRenewing(ctx context.Context, index *cachememdb.Index,
client, err := c.client.Clone()
if err != nil {
c.logger.Error("failed to create API client in the renewer", "error", err)
c.logger.Error("failed to create API client in the lifetime watcher", "error", err)
return
}
client.SetToken(req.Token)
client.SetHeaders(req.Request.Header)
renewer, err := agentint.NewRenewer(client, &agentint.RenewerInput{
watcher, err := client.NewLifetimeWatcher(&api.LifetimeWatcherInput{
Secret: secret,
})
if err != nil {
c.logger.Error("failed to create secret renewer", "error", err)
c.logger.Error("failed to create secret lifetime watcher", "error", err)
return
}
c.logger.Debug("initiating renewal", "method", req.Request.Method, "path", req.Request.URL.Path)
go renewer.Renew()
defer renewer.Stop()
go watcher.Start()
defer watcher.Stop()
for {
select {
@ -413,9 +412,9 @@ func (c *LeaseCache) startRenewing(ctx context.Context, index *cachememdb.Index,
// This is the case which captures context cancellations from token
// and leases. Since all the contexts are derived from the agent's
// context, this will also cover the shutdown scenario.
c.logger.Debug("context cancelled; stopping renewer", "path", req.Request.URL.Path)
c.logger.Debug("context cancelled; stopping lifetime watcher", "path", req.Request.URL.Path)
return
case err := <-renewer.DoneCh():
case err := <-watcher.DoneCh():
// This case covers renewal completion and renewal errors
if err != nil {
c.logger.Error("failed to renew secret", "error", err)
@ -423,7 +422,7 @@ func (c *LeaseCache) startRenewing(ctx context.Context, index *cachememdb.Index,
}
c.logger.Debug("renewal halted; evicting from cache", "path", req.Request.URL.Path)
return
case <-renewer.RenewCh():
case <-watcher.RenewCh():
c.logger.Debug("secret renewed", "path", req.Request.URL.Path)
case <-index.RenewCtxInfo.DoneCh:
// This case indicates the renewal process to shutdown and evict
@ -522,7 +521,7 @@ func (c *LeaseCache) handleCacheClear(ctx context.Context, in *cacheClearInput)
}
// Find all the cached entries which has the given request path and
// cancel the contexts of all the respective renewers
// cancel the contexts of all the respective lifetime watchers
indexes, err := c.db.GetByPrefix(cachememdb.IndexNameRequestPath, in.Namespace, in.RequestPath)
if err != nil {
return err
@ -554,7 +553,8 @@ func (c *LeaseCache) handleCacheClear(ctx context.Context, in *cacheClearInput)
return errors.New("token accessor not provided")
}
// Get the cached index and cancel the corresponding renewer context
// Get the cached index and cancel the corresponding lifetime watcher
// context
index, err := c.db.Get(cachememdb.IndexNameTokenAccessor, in.TokenAccessor)
if err != nil {
return err
@ -572,7 +572,8 @@ func (c *LeaseCache) handleCacheClear(ctx context.Context, in *cacheClearInput)
return errors.New("lease not provided")
}
// Get the cached index and cancel the corresponding renewer context
// Get the cached index and cancel the corresponding lifetime watcher
// context
index, err := c.db.Get(cachememdb.IndexNameLease, in.Lease)
if err != nil {
return err
@ -698,7 +699,8 @@ func (c *LeaseCache) handleRevocationRequest(ctx context.Context, req *SendReque
return false, fmt.Errorf("expected token in the request body to be string")
}
// Kill the renewers of all the leases attached to the revoked token
// Kill the lifetime watchers of all the leases attached to the revoked
// token
indexes, err := c.db.GetByPrefix(cachememdb.IndexNameLeaseToken, token)
if err != nil {
return false, err
@ -707,7 +709,7 @@ func (c *LeaseCache) handleRevocationRequest(ctx context.Context, req *SendReque
index.RenewCtxInfo.CancelFunc()
}
// Kill the renewer of the revoked token
// Kill the lifetime watchers of the revoked token
index, err := c.db.Get(cachememdb.IndexNameToken, token)
if err != nil {
return false, err
@ -716,9 +718,9 @@ func (c *LeaseCache) handleRevocationRequest(ctx context.Context, req *SendReque
return true, nil
}
// Indicate the renewer goroutine for this index to return. This will
// not affect the child tokens because the context is not getting
// cancelled.
// Indicate the lifetime watcher goroutine for this index to return.
// This will not affect the child tokens because the context is not
// getting cancelled.
close(index.RenewCtxInfo.DoneCh)
// Clear the parent references of the revoked token in the entries
@ -763,7 +765,7 @@ func (c *LeaseCache) handleRevocationRequest(ctx context.Context, req *SendReque
// Trim the URL path to get the request path prefix
prefix := strings.TrimPrefix(path, vaultPathLeaseRevokeForce)
// Get all the cache indexes that use the request path containing the
// prefix and cancel the renewer context of each.
// prefix and cancel the lifetime watcher context of each.
indexes, err := c.db.GetByPrefix(cachememdb.IndexNameLease, prefix)
if err != nil {
return false, err
@ -782,7 +784,7 @@ func (c *LeaseCache) handleRevocationRequest(ctx context.Context, req *SendReque
// Trim the URL path to get the request path prefix
prefix := strings.TrimPrefix(path, vaultPathLeaseRevokePrefix)
// Get all the cache indexes that use the request path containing the
// prefix and cancel the renewer context of each.
// prefix and cancel the lifetime watcher context of each.
indexes, err := c.db.GetByPrefix(cachememdb.IndexNameLease, prefix)
if err != nil {
return false, err