Skip to content

Commit

Permalink
Make TestActivityLog_MultipleFragmentsAndSegments timeout on its own (h…
Browse files Browse the repository at this point in the history
…ashicorp#11490)

* The main driver for this change was to make the read from a.newFragmentCh timeout quickly rather than waiting for the test timeout (much longer).  While testing the change I observed a panic during shutdown, but it was swallowed and moreover there was no stack trace so it wasn't obvious.  I'm hoping we can get rid of the recover, so I fixed the issue in the activitylog tests that needed it.
  • Loading branch information
ncabatoff committed May 6, 2021
1 parent e5f86dc commit 42c2ed6
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 22 deletions.
17 changes: 10 additions & 7 deletions vault/activity_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,6 @@ func (a *ActivityLog) getLastEntitySegmentNumber(ctx context.Context, startTime
func (a *ActivityLog) WalkEntitySegments(ctx context.Context,
startTime time.Time,
walkFn func(*activity.EntityActivityLog)) error {

basePath := activityEntityBasePath + fmt.Sprint(startTime.Unix()) + "/"
pathList, err := a.view.List(ctx, basePath)
if err != nil {
Expand Down Expand Up @@ -486,7 +485,6 @@ func (a *ActivityLog) WalkEntitySegments(ctx context.Context,
func (a *ActivityLog) WalkTokenSegments(ctx context.Context,
startTime time.Time,
walkFn func(*activity.TokenCount)) error {

basePath := activityTokenBasePath + fmt.Sprint(startTime.Unix()) + "/"
pathList, err := a.view.List(ctx, basePath)
if err != nil {
Expand Down Expand Up @@ -995,7 +993,7 @@ func (c *Core) setupActivityLog(ctx context.Context, wg *sync.WaitGroup) error {

// stopActivityLog removes the ActivityLog from Core
// and frees any resources.
func (c *Core) stopActivityLog() error {
func (c *Core) stopActivityLog() {
if c.tokenStore != nil {
c.tokenStore.SetActivityLog(nil)
}
Expand All @@ -1007,7 +1005,6 @@ func (c *Core) stopActivityLog() error {
}

c.activityLog = nil
return nil
}

func (a *ActivityLog) StartOfNextMonth() time.Time {
Expand Down Expand Up @@ -1526,14 +1523,20 @@ func (a *ActivityLog) precomputedQueryWorker() error {

// Cancel the context if activity log is shut down.
// This will cause the next storage operation to fail.
go func() {
a.l.RLock()
// doneCh is modified in some tests, so we don't want to access that member
// without a lock, but we don't want to hold the lock for the entire lifetime
// of this goroutine. Passing the channel to the goroutine works here because
// no tests depend on us accessing the new doneCh after modifying the field.
go func(done chan struct{}) {
select {
case <-a.doneCh:
case <-done:
cancel()
case <-ctx.Done():
break
}
}()
}(a.doneCh)
a.l.RUnlock()

// Load the intent log
rawIntentLog, err := a.view.Get(ctx, activityIntentLogKey)
Expand Down
17 changes: 15 additions & 2 deletions vault/activity_log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,11 @@ func TestActivityLog_MultipleFragmentsAndSegments(t *testing.T) {

// Stop timers for test purposes
close(a.doneCh)
defer func() {
a.l.Lock()
a.doneCh = make(chan struct{}, 1)
a.l.Unlock()
}()

startTimestamp := a.GetStartTimestamp()
path0 := fmt.Sprintf("sys/counters/activity/log/entity/%d/0", startTimestamp)
Expand Down Expand Up @@ -516,7 +521,11 @@ func TestActivityLog_MultipleFragmentsAndSegments(t *testing.T) {
a.receivedFragment(fragment1)
a.receivedFragment(fragment2)

<-a.newFragmentCh
select {
case <-a.newFragmentCh:
case <-time.After(time.Minute):
t.Fatal("timed out waiting for new fragment")
}

err = a.saveCurrentSegmentToStorage(context.Background(), false)
if err != nil {
Expand Down Expand Up @@ -1376,6 +1385,9 @@ func TestActivityLog_refreshFromStoredLogWithBackgroundLoadingCancelled(t *testi

var wg sync.WaitGroup
close(a.doneCh)
defer func() {
a.doneCh = make(chan struct{}, 1)
}()

err := a.refreshFromStoredLog(context.Background(), &wg, time.Now().UTC())
if err != nil {
Expand Down Expand Up @@ -2249,7 +2261,8 @@ func TestActivityLog_PrecomputeCancel(t *testing.T) {

// This will block if the shutdown didn't work.
go func() {
a.precomputedQueryWorker()
// We expect this to error because of BlockingInmemStorage
_ = a.precomputedQueryWorker()
close(done)
}()

Expand Down
4 changes: 1 addition & 3 deletions vault/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -2154,9 +2154,7 @@ func (c *Core) preSeal() error {
if err := c.stopExpiration(); err != nil {
result = multierror.Append(result, errwrap.Wrapf("error stopping expiration: {{err}}", err))
}
if err := c.stopActivityLog(); err != nil {
result = multierror.Append(result, errwrap.Wrapf("error stopping activity log: {{err}}", err))
}
c.stopActivityLog()
if err := c.teardownCredentials(context.Background()); err != nil {
result = multierror.Append(result, errwrap.Wrapf("error tearing down credentials: {{err}}", err))
}
Expand Down
5 changes: 3 additions & 2 deletions vault/expiration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,8 @@ func benchmarkExpirationBackend(b *testing.B, physicalBackend physical.Backend,
}

func TestExpiration_Restore(t *testing.T) {
exp := mockExpiration(t)
c, _, _ := TestCoreUnsealed(t)
exp := c.expiration
noop := &NoopBackend{}
_, barrier, _ := mockBarrier(t)
view := NewBarrierView(barrier, "logical/")
Expand Down Expand Up @@ -601,7 +602,7 @@ func TestExpiration_Restore(t *testing.T) {
}

// Stop everything
err = exp.Stop()
err = c.stopExpiration()
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down
7 changes: 0 additions & 7 deletions vault/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,11 +359,6 @@ func testCoreUnsealed(t testing.T, core *Core) (*Core, [][]byte, string) {
testCoreAddSecretMount(t, core, token)

t.Cleanup(func() {
defer func() {
if r := recover(); r != nil {
t.Log("panic closing core during cleanup", "panic", r)
}
}()
core.Shutdown()
})
return core, keys, token
Expand Down Expand Up @@ -1871,7 +1866,6 @@ func (testCluster *TestCluster) newCore(t testing.T, idx int, coreConfig *CoreCo
func (testCluster *TestCluster) setupClusterListener(
t testing.T, idx int, core *Core, coreConfig *CoreConfig,
opts *TestClusterOptions, listeners []*TestListener, handler http.Handler) {

if coreConfig.ClusterAddr == "" {
return
}
Expand Down Expand Up @@ -2058,7 +2052,6 @@ func (tc *TestCluster) initCores(t testing.T, opts *TestClusterOptions, addAudit
func (testCluster *TestCluster) getAPIClient(
t testing.T, opts *TestClusterOptions,
port int, tlsConfig *tls.Config) *api.Client {

transport := cleanhttp.DefaultPooledTransport()
transport.TLSClientConfig = tlsConfig.Clone()
if err := http2.ConfigureTransport(transport); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion vault/token_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1150,7 +1150,7 @@ func TestTokenStore_CreateLookup_ExpirationInRestoreMode(t *testing.T) {
t.Fatalf("err: %v", err)
}

err = ts.expiration.Stop()
err = c.stopExpiration()
if err != nil {
t.Fatal(err)
}
Expand Down

0 comments on commit 42c2ed6

Please sign in to comment.