mirror of
https://github.com/coder/coder.git
synced 2025-07-09 11:45:56 +00:00
fix: Trigger workspace event after agent timeout seconds (#5141)
Fixes #5116
This commit is contained in:
committed by
GitHub
parent
e94b27bce4
commit
2b6c229e4e
@ -14,6 +14,8 @@ import (
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/tabbed/pqtype"
|
||||
"golang.org/x/exp/maps"
|
||||
"golang.org/x/exp/slices"
|
||||
"golang.org/x/xerrors"
|
||||
protobuf "google.golang.org/protobuf/proto"
|
||||
|
||||
@ -631,14 +633,58 @@ func (server *Server) CompleteJob(ctx context.Context, completed *proto.Complete
|
||||
if err != nil {
|
||||
return xerrors.Errorf("update workspace build: %w", err)
|
||||
}
|
||||
|
||||
agentTimeouts := make(map[time.Duration]bool) // A set of agent timeouts.
|
||||
// This could be a bulk insert to improve performance.
|
||||
for _, protoResource := range jobType.WorkspaceBuild.Resources {
|
||||
for _, protoAgent := range protoResource.Agents {
|
||||
dur := time.Duration(protoAgent.GetConnectionTimeoutSeconds()) * time.Second
|
||||
agentTimeouts[dur] = true
|
||||
}
|
||||
err = InsertWorkspaceResource(ctx, db, job.ID, workspaceBuild.Transition, protoResource, telemetrySnapshot)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("insert provisioner job: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// On start, we want to ensure that workspace agent timeout statuses
|
||||
// are propagated. This method is simple and does not protect against
|
||||
// notifying in edge cases like when a workspace is stopped soon
|
||||
// after being started.
|
||||
//
|
||||
// Agent timeouts could be minutes apart, resulting in an unresponsive
|
||||
// experience, so we'll notify once after each unique timeout duration.
|
||||
if !input.DryRun && workspaceBuild.Transition == database.WorkspaceTransitionStart && len(agentTimeouts) > 0 {
|
||||
timeouts := maps.Keys(agentTimeouts)
|
||||
slices.Sort(timeouts)
|
||||
|
||||
var updates []<-chan time.Time
|
||||
for _, d := range timeouts {
|
||||
server.Logger.Debug(ctx, "triggering workspace notification after agent timeout",
|
||||
slog.F("workspace_build_id", workspaceBuild.ID),
|
||||
slog.F("timeout", d),
|
||||
)
|
||||
// Agents are inserted with `database.Now()`, so this triggers a
|
||||
// workspace event approximately after created + timeout seconds.
|
||||
updates = append(updates, time.After(d))
|
||||
}
|
||||
go func() {
|
||||
for _, wait := range updates {
|
||||
// Wait for the next potential timeout to occur. Note that we
|
||||
// can't listen on the context here because we will hang around
|
||||
// after this function has returned. The server also doesn't
|
||||
// have a shutdown signal we can listen to.
|
||||
<-wait
|
||||
if err := server.Pubsub.Publish(codersdk.WorkspaceNotifyChannel(workspaceBuild.WorkspaceID), []byte{}); err != nil {
|
||||
server.Logger.Error(ctx, "workspace notification after agent timeout failed",
|
||||
slog.F("workspace_build_id", workspaceBuild.ID),
|
||||
slog.Error(err),
|
||||
)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
if workspaceBuild.Transition != database.WorkspaceTransitionDelete {
|
||||
// Nothing more to do unless the workspace is being deleted.
|
||||
return nil
|
||||
|
@ -1357,6 +1357,7 @@ func TestWorkspaceWatcher(t *testing.T) {
|
||||
Auth: &proto.Agent_Token{
|
||||
Token: authToken,
|
||||
},
|
||||
ConnectionTimeoutSeconds: 1,
|
||||
}},
|
||||
}},
|
||||
},
|
||||
@ -1372,58 +1373,64 @@ func TestWorkspaceWatcher(t *testing.T) {
|
||||
|
||||
wc, err := client.WatchWorkspace(ctx, workspace.ID)
|
||||
require.NoError(t, err)
|
||||
wait := func() {
|
||||
|
||||
// Wait events are easier to debug with timestamped logs.
|
||||
logger := slogtest.Make(t, nil).Named(t.Name()).Leveled(slog.LevelDebug)
|
||||
wait := func(event string) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
t.Fail()
|
||||
require.FailNow(t, "timed out waiting for event", event)
|
||||
case <-wc:
|
||||
logger.Info(ctx, "done waiting for event", slog.F("event", event))
|
||||
}
|
||||
}
|
||||
|
||||
coderdtest.CreateWorkspaceBuild(t, client, workspace, database.WorkspaceTransitionStart)
|
||||
// the workspace build being created
|
||||
wait()
|
||||
// the workspace build being acquired
|
||||
wait()
|
||||
// the workspace build completing
|
||||
wait()
|
||||
wait("workspace build being created")
|
||||
wait("workspace build being acquired")
|
||||
wait("workspace build completing")
|
||||
|
||||
// Unfortunately, this will add ~1s to the test due to the granularity
|
||||
// of agent timeout seconds. However, if we don't do this we won't know
|
||||
// which trigger we received when waiting for connection.
|
||||
//
|
||||
// Note that the first timeout is from `coderdtest.CreateWorkspace` and
|
||||
// the second is from `coderdtest.CreateWorkspaceBuild`.
|
||||
wait("agent timeout after create")
|
||||
wait("agent timeout after start")
|
||||
|
||||
agentClient := codersdk.New(client.URL)
|
||||
agentClient.SetSessionToken(authToken)
|
||||
agentCloser := agent.New(agent.Options{
|
||||
Client: agentClient,
|
||||
Logger: slogtest.Make(t, nil).Named("agent").Leveled(slog.LevelDebug),
|
||||
Logger: logger.Named("agent"),
|
||||
})
|
||||
defer func() {
|
||||
_ = agentCloser.Close()
|
||||
}()
|
||||
|
||||
// the agent connected
|
||||
wait()
|
||||
wait("agent connected")
|
||||
agentCloser.Close()
|
||||
// the agent disconnected
|
||||
wait()
|
||||
wait("agent disconnected")
|
||||
|
||||
closeFunc.Close()
|
||||
build := coderdtest.CreateWorkspaceBuild(t, client, workspace, database.WorkspaceTransitionStart)
|
||||
// First is for the workspace build itself
|
||||
wait()
|
||||
wait("first is for the workspace build itself")
|
||||
err = client.CancelWorkspaceBuild(ctx, build.ID)
|
||||
require.NoError(t, err)
|
||||
// Second is for the build cancel
|
||||
wait()
|
||||
wait("second is for the build cancel")
|
||||
|
||||
err = client.UpdateWorkspace(ctx, workspace.ID, codersdk.UpdateWorkspaceRequest{
|
||||
Name: "another",
|
||||
})
|
||||
require.NoError(t, err)
|
||||
wait()
|
||||
wait("update workspace name")
|
||||
|
||||
err = client.UpdateActiveTemplateVersion(ctx, template.ID, codersdk.UpdateActiveTemplateVersion{
|
||||
ID: template.ActiveVersionID,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
wait()
|
||||
wait("update active template version")
|
||||
|
||||
cancel()
|
||||
}
|
||||
|
Reference in New Issue
Block a user