feat(coderd/database/dbpurge): retain most recent agent build logs (#14460)

Updates `DeleteOldWorkspaceAgentLogs` to:
- Retain logs for the most recent build regardless of age,
- Delete logs for agents that never connected and were created before
   the log-deletion cutoff, while still retaining the logs of the most recent build.
This commit is contained in:
Cian Johnston
2024-08-30 17:39:09 +01:00
committed by GitHub
parent 10c958bba1
commit 0f8251be41
6 changed files with 354 additions and 131 deletions

View File

@@ -11,6 +11,7 @@ import (
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbauthz"
"github.com/coder/coder/v2/coderd/database/dbtime"
"github.com/coder/quartz"
)
@@ -30,7 +31,8 @@ func New(ctx context.Context, logger slog.Logger, db database.Store, clk quartz.
//nolint:gocritic // The system purges old db records without user input.
ctx = dbauthz.AsSystemRestricted(ctx)
ticker := clk.NewTicker(time.Nanosecond)
// Start the ticker with the initial delay.
ticker := clk.NewTicker(delay)
doTick := func(start time.Time) {
defer ticker.Reset(delay)
// Start a transaction to grab advisory lock, we don't want to run
@@ -47,7 +49,8 @@ func New(ctx context.Context, logger slog.Logger, db database.Store, clk quartz.
return nil
}
if err := tx.DeleteOldWorkspaceAgentLogs(ctx, start.Add(-maxAgentLogAge)); err != nil {
deleteOldWorkspaceAgentLogsBefore := start.Add(-maxAgentLogAge)
if err := tx.DeleteOldWorkspaceAgentLogs(ctx, deleteOldWorkspaceAgentLogsBefore); err != nil {
return xerrors.Errorf("failed to delete old workspace agent logs: %w", err)
}
if err := tx.DeleteOldWorkspaceAgentStats(ctx); err != nil {
@@ -72,13 +75,15 @@ func New(ctx context.Context, logger slog.Logger, db database.Store, clk quartz.
go func() {
defer close(closed)
defer ticker.Stop()
// Force an initial tick.
doTick(dbtime.Time(clk.Now()).UTC())
for {
select {
case <-ctx.Done():
return
case tick := <-ticker.C:
ticker.Stop()
doTick(tick)
doTick(dbtime.Time(tick).UTC())
}
}
}()

View File

@@ -11,6 +11,7 @@ import (
"time"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"golang.org/x/exp/slices"
@@ -42,20 +43,11 @@ func TestPurge(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
defer cancel()
clk := quartz.NewMock(t)
// We want to make sure dbpurge is actually started so that this test is meaningful.
trapStop := clk.Trap().TickerStop()
clk := quartz.NewMock(t)
done := awaitDoTick(ctx, t, clk)
purger := dbpurge.New(context.Background(), slogtest.Make(t, nil), dbmem.New(), clk)
// Wait for the initial nanosecond tick.
clk.Advance(time.Nanosecond).MustWait(ctx)
// Wait for ticker.Stop call that happens in the goroutine.
trapStop.MustWait(ctx).Release()
// Stop the trap now to avoid blocking further.
trapStop.Close()
<-done // wait for doTick() to run.
require.NoError(t, purger.Close())
}
@@ -181,7 +173,15 @@ func containsWorkspaceAgentStat(stats []database.GetWorkspaceAgentStatsRow, need
//nolint:paralleltest // It uses LockIDDBPurge.
func TestDeleteOldWorkspaceAgentLogs(t *testing.T) {
db, _ := dbtestutil.NewDB(t)
ctx := testutil.Context(t, testutil.WaitShort)
clk := quartz.NewMock(t)
now := dbtime.Now()
threshold := now.Add(-7 * 24 * time.Hour)
beforeThreshold := threshold.Add(-24 * time.Hour)
afterThreshold := threshold.Add(24 * time.Hour)
clk.Set(now).MustWait(ctx)
db, _ := dbtestutil.NewDB(t, dbtestutil.WithDumpOnFailure())
org := dbgen.Organization(t, db, database.Organization{})
user := dbgen.User(t, db, database.User{})
_ = dbgen.OrganizationMember(t, db, database.OrganizationMember{UserID: user.ID, OrganizationID: org.ID})
@@ -189,95 +189,191 @@ func TestDeleteOldWorkspaceAgentLogs(t *testing.T) {
tmpl := dbgen.Template(t, db, database.Template{OrganizationID: org.ID, ActiveVersionID: tv.ID, CreatedBy: user.ID})
logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true})
now := dbtime.Now()
//nolint:paralleltest // It uses LockIDDBPurge.
t.Run("AgentHasNotConnectedSinceWeek_LogsExpired", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
defer cancel()
clk := quartz.NewMock(t)
clk.Set(now).MustWait(ctx)
// Given the following:
// After dbpurge completes, the ticker is reset. Trap this call.
trapReset := clk.Trap().TickerReset()
defer trapReset.Close()
// Workspace A was built twice before the threshold, and never connected on
// either attempt.
wsA := dbgen.Workspace(t, db, database.Workspace{Name: "a", OwnerID: user.ID, OrganizationID: org.ID, TemplateID: tmpl.ID})
wbA1 := mustCreateWorkspaceBuild(t, db, org, tv, wsA.ID, beforeThreshold, 1)
wbA2 := mustCreateWorkspaceBuild(t, db, org, tv, wsA.ID, beforeThreshold, 2)
agentA1 := mustCreateAgent(t, db, wbA1)
agentA2 := mustCreateAgent(t, db, wbA2)
mustCreateAgentLogs(ctx, t, db, agentA1, nil, "agent a1 logs should be deleted")
mustCreateAgentLogs(ctx, t, db, agentA2, nil, "agent a2 logs should be retained")
// given: an agent with logs older than threshold
agent := mustCreateAgentWithLogs(ctx, t, db, user, org, tmpl, tv, now.Add(-8*24*time.Hour), t.Name())
// Workspace B was built twice before the threshold.
wsB := dbgen.Workspace(t, db, database.Workspace{Name: "b", OwnerID: user.ID, OrganizationID: org.ID, TemplateID: tmpl.ID})
wbB1 := mustCreateWorkspaceBuild(t, db, org, tv, wsB.ID, beforeThreshold, 1)
wbB2 := mustCreateWorkspaceBuild(t, db, org, tv, wsB.ID, beforeThreshold, 2)
agentB1 := mustCreateAgent(t, db, wbB1)
agentB2 := mustCreateAgent(t, db, wbB2)
mustCreateAgentLogs(ctx, t, db, agentB1, &beforeThreshold, "agent b1 logs should be deleted")
mustCreateAgentLogs(ctx, t, db, agentB2, &beforeThreshold, "agent b2 logs should be retained")
// when dbpurge runs
closer := dbpurge.New(ctx, logger, db, clk)
defer closer.Close()
// Wait for the initial nanosecond tick.
clk.Advance(time.Nanosecond).MustWait(ctx)
// Workspace C was built once before the threshold, and once after.
wsC := dbgen.Workspace(t, db, database.Workspace{Name: "c", OwnerID: user.ID, OrganizationID: org.ID, TemplateID: tmpl.ID})
wbC1 := mustCreateWorkspaceBuild(t, db, org, tv, wsC.ID, beforeThreshold, 1)
wbC2 := mustCreateWorkspaceBuild(t, db, org, tv, wsC.ID, afterThreshold, 2)
agentC1 := mustCreateAgent(t, db, wbC1)
agentC2 := mustCreateAgent(t, db, wbC2)
mustCreateAgentLogs(ctx, t, db, agentC1, &beforeThreshold, "agent c1 logs should be deleted")
mustCreateAgentLogs(ctx, t, db, agentC2, &afterThreshold, "agent c2 logs should be retained")
trapReset.MustWait(ctx).Release() // Wait for ticker.Reset()
d, w := clk.AdvanceNext()
require.Equal(t, 10*time.Minute, d)
// Workspace D was built twice after the threshold.
wsD := dbgen.Workspace(t, db, database.Workspace{Name: "d", OwnerID: user.ID, OrganizationID: org.ID, TemplateID: tmpl.ID})
wbD1 := mustCreateWorkspaceBuild(t, db, org, tv, wsD.ID, afterThreshold, 1)
wbD2 := mustCreateWorkspaceBuild(t, db, org, tv, wsD.ID, afterThreshold, 2)
agentD1 := mustCreateAgent(t, db, wbD1)
agentD2 := mustCreateAgent(t, db, wbD2)
mustCreateAgentLogs(ctx, t, db, agentD1, &afterThreshold, "agent d1 logs should be retained")
mustCreateAgentLogs(ctx, t, db, agentD2, &afterThreshold, "agent d2 logs should be retained")
closer.Close() // doTick() has now run.
w.MustWait(ctx)
// Workspace E was built once after threshold but never connected.
wsE := dbgen.Workspace(t, db, database.Workspace{Name: "e", OwnerID: user.ID, OrganizationID: org.ID, TemplateID: tmpl.ID})
wbE1 := mustCreateWorkspaceBuild(t, db, org, tv, wsE.ID, beforeThreshold, 1)
agentE1 := mustCreateAgent(t, db, wbE1)
mustCreateAgentLogs(ctx, t, db, agentE1, nil, "agent e1 logs should be retained")
// then the logs should be gone
agentLogs, err := db.GetWorkspaceAgentLogsAfter(ctx, database.GetWorkspaceAgentLogsAfterParams{
AgentID: agent,
CreatedAfter: 0,
})
require.NoError(t, err)
require.Empty(t, agentLogs, "expected agent logs to be empty")
})
// when dbpurge runs
//nolint:paralleltest // It uses LockIDDBPurge.
t.Run("AgentConnectedSixDaysAgo_LogsValid", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
defer cancel()
clk := quartz.NewMock(t)
clk.Set(now).MustWait(ctx)
// After dbpurge completes, the ticker is reset. Trap this call.
// After dbpurge completes, the ticker is reset. Trap this call.
trapReset := clk.Trap().TickerReset()
defer trapReset.Close()
done := awaitDoTick(ctx, t, clk)
closer := dbpurge.New(ctx, logger, db, clk)
defer closer.Close()
<-done // doTick() has now run.
// given: an agent with logs newer than threshold
agent := mustCreateAgentWithLogs(ctx, t, db, user, org, tmpl, tv, now.Add(-6*24*time.Hour), t.Name())
// then logs related to the following agents should be deleted:
// Agent A1 never connected, was created before the threshold, and is not the
// latest build.
assertNoWorkspaceAgentLogs(ctx, t, db, agentA1.ID)
// Agent B1 is not the latest build and the logs are from before threshold.
assertNoWorkspaceAgentLogs(ctx, t, db, agentB1.ID)
// Agent C1 is not the latest build and the logs are from before threshold.
assertNoWorkspaceAgentLogs(ctx, t, db, agentC1.ID)
// when dbpurge runs
closer := dbpurge.New(ctx, logger, db, clk)
defer closer.Close()
// Wait for the initial nanosecond tick.
clk.Advance(time.Nanosecond).MustWait(ctx)
trapReset.MustWait(ctx).Release() // Wait for ticker.Reset()
d, w := clk.AdvanceNext()
require.Equal(t, 10*time.Minute, d)
closer.Close() // doTick() has now run.
w.MustWait(ctx)
// then the logs should still be there
agentLogs, err := db.GetWorkspaceAgentLogsAfter(ctx, database.GetWorkspaceAgentLogsAfterParams{
AgentID: agent,
})
require.NoError(t, err)
require.NotEmpty(t, agentLogs)
for _, al := range agentLogs {
require.Equal(t, t.Name(), al.Output)
}
})
// then logs related to the following agents should be retained:
// Agent A2 is the latest build.
assertWorkspaceAgentLogs(ctx, t, db, agentA2.ID, "agent a2 logs should be retained")
// Agent B2 is the latest build.
assertWorkspaceAgentLogs(ctx, t, db, agentB2.ID, "agent b2 logs should be retained")
// Agent C2 is the latest build.
assertWorkspaceAgentLogs(ctx, t, db, agentC2.ID, "agent c2 logs should be retained")
// Agents D1, D2, and E1 are all after threshold.
assertWorkspaceAgentLogs(ctx, t, db, agentD1.ID, "agent d1 logs should be retained")
assertWorkspaceAgentLogs(ctx, t, db, agentD2.ID, "agent d2 logs should be retained")
assertWorkspaceAgentLogs(ctx, t, db, agentE1.ID, "agent e1 logs should be retained")
}
func mustCreateAgentWithLogs(ctx context.Context, t *testing.T, db database.Store, user database.User, org database.Organization, tmpl database.Template, tv database.TemplateVersion, agentLastConnectedAt time.Time, output string) uuid.UUID {
agent := mustCreateAgent(t, db, user, org, tmpl, tv)
// awaitDoTick installs traps on the mock clock's Now, TickerStop, and
// TickerReset calls and starts a goroutine that follows one pass through
// them: the Now() call, the ticker reset that comes after it, an advance to
// the next clock event (asserted to be 10 minutes away), and finally the
// ticker stop.
//
// The returned channel is closed when that goroutine exits, which includes
// the early exit taken when the 10-minute assertion fails. Callers receive
// from the channel to block until the pass has been observed.
func awaitDoTick(ctx context.Context, t *testing.T, clk *quartz.Mock) chan struct{} {
t.Helper()
ch := make(chan struct{})
// All three traps are registered before the goroutine starts.
trapNow := clk.Trap().Now()
trapStop := clk.Trap().TickerStop()
trapReset := clk.Trap().TickerReset()
go func() {
// Deferred calls run in reverse order: the three traps are closed
// first and ch is closed last.
defer close(ch)
defer trapReset.Close()
defer trapStop.Close()
defer trapNow.Close()
// Wait for the initial tick signified by a call to Now().
trapNow.MustWait(ctx).Release()
// doTick runs here. Wait for the next
// ticker reset event that signifies it's completed.
trapReset.MustWait(ctx).Release()
// Ensure that the next tick happens in 10 minutes from start.
d, w := clk.AdvanceNext()
if !assert.Equal(t, 10*time.Minute, d) {
// Stop following the sequence; the deferred calls still close ch.
return
}
w.MustWait(ctx)
// Wait for the ticker stop event.
trapStop.MustWait(ctx).Release()
}()
// NOTE(review): the next three lines reference names (db, agent,
// agentLastConnectedAt) that this function never declares and leave a
// composite literal unclosed; they look like removed-side lines from the
// rendered diff rather than part of this function — confirm.
err := db.UpdateWorkspaceAgentConnectionByID(ctx, database.UpdateWorkspaceAgentConnectionByIDParams{
ID: agent.ID,
LastConnectedAt: sql.NullTime{Time: agentLastConnectedAt, Valid: true},
return ch
}
// assertNoWorkspaceAgentLogs fetches the logs stored for agentID, starting
// from CreatedAfter 0, and asserts that none were returned. A query error is
// checked with require; the emptiness check uses assert.
func assertNoWorkspaceAgentLogs(ctx context.Context, t *testing.T, db database.Store, agentID uuid.UUID) {
t.Helper()
agentLogs, err := db.GetWorkspaceAgentLogsAfter(ctx, database.GetWorkspaceAgentLogsAfterParams{
AgentID: agentID,
CreatedAfter: 0,
})
require.NoError(t, err)
// NOTE(review): the next line opens an InsertWorkspaceAgentLogs call that
// is never closed; it looks like a removed-side line from the rendered
// diff rather than part of this helper — confirm.
_, err = db.InsertWorkspaceAgentLogs(ctx, database.InsertWorkspaceAgentLogsParams{
assert.Empty(t, agentLogs)
}
// assertWorkspaceAgentLogs fetches the logs stored for agentID, starting from
// CreatedAfter 0, and asserts that at least one entry exists and that the
// Output of every returned entry equals msg.
func assertWorkspaceAgentLogs(ctx context.Context, t *testing.T, db database.Store, agentID uuid.UUID, msg string) {
	t.Helper()

	query := database.GetWorkspaceAgentLogsAfterParams{
		AgentID:      agentID,
		CreatedAfter: 0,
	}
	logs, err := db.GetWorkspaceAgentLogsAfter(ctx, query)
	require.NoError(t, err)

	assert.NotEmpty(t, logs)
	// Check each entry individually so one mismatch does not hide the others.
	for i := range logs {
		assert.Equal(t, msg, logs[i].Output)
	}
}
// mustCreateWorkspaceBuild uses dbgen to create a workspace-build provisioner
// job and then a start-transition workspace build with build number n for
// workspace wsID, passing createdAt as the creation time of both. It requires
// that the returned build reports the same creation instant (compared in UTC)
// and returns that build.
func mustCreateWorkspaceBuild(t *testing.T, db database.Store, org database.Organization, tv database.TemplateVersion, wsID uuid.UUID, createdAt time.Time, n int32) database.WorkspaceBuild {
	t.Helper()

	jobSeed := database.ProvisionerJob{
		CreatedAt:      createdAt,
		OrganizationID: org.ID,
		Type:           database.ProvisionerJobTypeWorkspaceBuild,
		Provisioner:    database.ProvisionerTypeEcho,
		StorageMethod:  database.ProvisionerStorageMethodFile,
	}
	provisionerJob := dbgen.ProvisionerJob(t, db, nil, jobSeed)

	// The build is linked to the job created above through its ID.
	buildSeed := database.WorkspaceBuild{
		CreatedAt:         createdAt,
		WorkspaceID:       wsID,
		JobID:             provisionerJob.ID,
		TemplateVersionID: tv.ID,
		Transition:        database.WorkspaceTransitionStart,
		Reason:            database.BuildReasonInitiator,
		BuildNumber:       n,
	}
	build := dbgen.WorkspaceBuild(t, db, buildSeed)

	wantCreatedAt := createdAt.UTC()
	gotCreatedAt := build.CreatedAt.UTC()
	require.Equal(t, wantCreatedAt, gotCreatedAt)
	return build
}
// mustCreateAgent uses dbgen to create a start-transition workspace resource
// for the job of build wb and an agent attached to that resource. The agent is
// named after the owning workspace's name followed by the build number, takes
// its creation time from wb, and has all three connection timestamps left
// unset. It requires that the returned agent reports the same creation
// instant as wb (compared in UTC) and returns that agent.
func mustCreateAgent(t *testing.T, db database.Store, wb database.WorkspaceBuild) database.WorkspaceAgent {
	t.Helper()

	res := dbgen.WorkspaceResource(t, db, database.WorkspaceResource{
		JobID:      wb.JobID,
		Transition: database.WorkspaceTransitionStart,
		CreatedAt:  wb.CreatedAt,
	})

	// The workspace is looked up only for its name, which forms the agent name.
	workspace, err := db.GetWorkspaceByID(context.Background(), wb.WorkspaceID)
	require.NoError(t, err)
	agentName := fmt.Sprintf("%s%d", workspace.Name, wb.BuildNumber)

	// Zero-valued NullTime: the connection fields are passed as not set.
	var unset sql.NullTime
	agent := dbgen.WorkspaceAgent(t, db, database.WorkspaceAgent{
		Name:             agentName,
		ResourceID:       res.ID,
		CreatedAt:        wb.CreatedAt,
		FirstConnectedAt: unset,
		DisconnectedAt:   unset,
		LastConnectedAt:  unset,
	})

	wantCreatedAt := wb.CreatedAt.UTC()
	gotCreatedAt := agent.CreatedAt.UTC()
	require.Equal(t, wantCreatedAt, gotCreatedAt)
	return agent
}
func mustCreateAgentLogs(ctx context.Context, t *testing.T, db database.Store, agent database.WorkspaceAgent, agentLastConnectedAt *time.Time, output string) {
t.Helper()
if agentLastConnectedAt != nil {
require.NoError(t, db.UpdateWorkspaceAgentConnectionByID(ctx, database.UpdateWorkspaceAgentConnectionByIDParams{
ID: agent.ID,
LastConnectedAt: sql.NullTime{Time: *agentLastConnectedAt, Valid: true},
}))
}
_, err := db.InsertWorkspaceAgentLogs(ctx, database.InsertWorkspaceAgentLogsParams{
AgentID: agent.ID,
CreatedAt: agentLastConnectedAt,
CreatedAt: agent.CreatedAt,
Output: []string{output},
Level: []database.LogLevel{database.LogLevelDebug},
})
@@ -287,33 +383,7 @@ func mustCreateAgentWithLogs(ctx context.Context, t *testing.T, db database.Stor
AgentID: agent.ID,
})
require.NoError(t, err)
require.NotZero(t, agentLogs, "agent logs must be present")
return agent.ID
}
func mustCreateAgent(t *testing.T, db database.Store, user database.User, org database.Organization, tmpl database.Template, tv database.TemplateVersion) database.WorkspaceAgent {
workspace := dbgen.Workspace(t, db, database.Workspace{OwnerID: user.ID, OrganizationID: org.ID, TemplateID: tmpl.ID})
job := dbgen.ProvisionerJob(t, db, nil, database.ProvisionerJob{
OrganizationID: org.ID,
Type: database.ProvisionerJobTypeWorkspaceBuild,
Provisioner: database.ProvisionerTypeEcho,
StorageMethod: database.ProvisionerStorageMethodFile,
})
_ = dbgen.WorkspaceBuild(t, db, database.WorkspaceBuild{
WorkspaceID: workspace.ID,
JobID: job.ID,
TemplateVersionID: tv.ID,
Transition: database.WorkspaceTransitionStart,
Reason: database.BuildReasonInitiator,
})
resource := dbgen.WorkspaceResource(t, db, database.WorkspaceResource{
JobID: job.ID,
Transition: database.WorkspaceTransitionStart,
})
return dbgen.WorkspaceAgent(t, db, database.WorkspaceAgent{
ResourceID: resource.ID,
})
require.NotEmpty(t, agentLogs, "agent logs must be present")
}
//nolint:paralleltest // It uses LockIDDBPurge.