feat: add agent timings (#14713)

* feat: begin impl of agent script timings

* feat: add job_id and display_name to script timings

* fix: increment migration number

* fix: rename migrations from 251 to 254

* test: get tests compiling

* fix: appease the linter

* fix: get tests passing again

* fix: drop column from correct table

* test: add fixture for agent script timings

* fix: typo

* fix: use job id used in provisioner job timings

* fix: increment migration number

* test: behaviour of script runner

* test: rewrite test

* test: does exit 1 script break things?

* test: rewrite test again

* fix: revert change

Not sure how this came to be, I do not recall manually changing
these files.

* fix: let code breathe

* fix: wrap errors

* fix: justify nolint

* fix: swap require.Equal argument order

* fix: add mutex operations

* feat: add 'ran_on_start' and 'blocked_login' fields

* fix: update testdata fixture

* fix: refer to agent_id instead of job_id in timings

* fix: JobID -> AgentID in dbauthz_test

* fix: add 'id' to scripts, make timing refer to script id

* fix: fix broken tests and convert bug

* fix: update testdata fixtures

* fix: update testdata fixtures again

* feat: capture stage and if script timed out

* fix: update migration number

* test: add test for script api

* fix: fake db query

* fix: use UTC time

* fix: ensure r.scriptComplete is not nil

* fix: move err check to right after call

* fix: uppercase sql

* fix: use dbtime.Now()

* fix: debug log on r.scriptCompleted being nil

* fix: ensure correct rbac permissions

* chore: remove DisplayName

* fix: get tests passing

* fix: remove space in sql up

* docs: document ExecuteOption

* fix: drop 'RETURNING' from sql

* chore: remove 'display_name' from timing table

* fix: testdata fixture

* fix: put r.scriptCompleted call in goroutine

* fix: track goroutine for test + use separate context for reporting

* fix: appease linter, handle trackCommandGoroutine error

* fix: resolve race condition

* feat: replace timed_out column with status column

* test: update testdata fixture

* fix: apply suggestions from review

* revert: linter changes
This commit is contained in:
Danielle Maywood
2024-09-24 10:51:49 +01:00
committed by GitHub
parent b8944074c4
commit ae522c558d
43 changed files with 1367 additions and 232 deletions

View File

@ -19,10 +19,13 @@ import (
"github.com/spf13/afero"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
"google.golang.org/protobuf/types/known/timestamppb"
"cdr.dev/slog"
"github.com/coder/coder/v2/agent/agentssh"
"github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/coderd/database/dbtime"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/agentsdk"
)
@ -75,18 +78,21 @@ func New(opts Options) *Runner {
}
}
type ScriptCompletedFunc func(context.Context, *proto.WorkspaceAgentScriptCompletedRequest) (*proto.WorkspaceAgentScriptCompletedResponse, error)
type Runner struct {
Options
cronCtx context.Context
cronCtxCancel context.CancelFunc
cmdCloseWait sync.WaitGroup
closed chan struct{}
closeMutex sync.Mutex
cron *cron.Cron
initialized atomic.Bool
scripts []codersdk.WorkspaceAgentScript
dataDir string
cronCtx context.Context
cronCtxCancel context.CancelFunc
cmdCloseWait sync.WaitGroup
closed chan struct{}
closeMutex sync.Mutex
cron *cron.Cron
initialized atomic.Bool
scripts []codersdk.WorkspaceAgentScript
dataDir string
scriptCompleted ScriptCompletedFunc
// scriptsExecuted includes all scripts executed by the workspace agent. Agents
// execute startup scripts, and scripts on a cron schedule. Both will increment
@ -116,12 +122,13 @@ func (r *Runner) RegisterMetrics(reg prometheus.Registerer) {
// Init initializes the runner with the provided scripts.
// It also schedules any scripts that have a schedule.
// This function must be called before Execute.
func (r *Runner) Init(scripts []codersdk.WorkspaceAgentScript) error {
func (r *Runner) Init(scripts []codersdk.WorkspaceAgentScript, scriptCompleted ScriptCompletedFunc) error {
if r.initialized.Load() {
return xerrors.New("init: already initialized")
}
r.initialized.Store(true)
r.scripts = scripts
r.scriptCompleted = scriptCompleted
r.Logger.Info(r.cronCtx, "initializing agent scripts", slog.F("script_count", len(scripts)), slog.F("log_dir", r.LogDir))
err := r.Filesystem.MkdirAll(r.ScriptBinDir(), 0o700)
@ -135,7 +142,7 @@ func (r *Runner) Init(scripts []codersdk.WorkspaceAgentScript) error {
}
script := script
_, err := r.cron.AddFunc(script.Cron, func() {
err := r.trackRun(r.cronCtx, script)
err := r.trackRun(r.cronCtx, script, ExecuteCronScripts)
if err != nil {
r.Logger.Warn(context.Background(), "run agent script on schedule", slog.Error(err))
}
@ -172,22 +179,33 @@ func (r *Runner) StartCron() {
}
}
// ExecuteOption describes what scripts we want to execute.
type ExecuteOption int
// ExecuteOption enums.
const (
ExecuteAllScripts ExecuteOption = iota
ExecuteStartScripts
ExecuteStopScripts
ExecuteCronScripts
)
// Execute runs a set of scripts according to a filter.
func (r *Runner) Execute(ctx context.Context, filter func(script codersdk.WorkspaceAgentScript) bool) error {
if filter == nil {
// Execute em' all!
filter = func(script codersdk.WorkspaceAgentScript) bool {
return true
}
}
func (r *Runner) Execute(ctx context.Context, option ExecuteOption) error {
var eg errgroup.Group
for _, script := range r.scripts {
if !filter(script) {
runScript := (option == ExecuteStartScripts && script.RunOnStart) ||
(option == ExecuteStopScripts && script.RunOnStop) ||
(option == ExecuteCronScripts && script.Cron != "") ||
option == ExecuteAllScripts
if !runScript {
continue
}
script := script
eg.Go(func() error {
err := r.trackRun(ctx, script)
err := r.trackRun(ctx, script, option)
if err != nil {
return xerrors.Errorf("run agent script %q: %w", script.LogSourceID, err)
}
@ -198,8 +216,8 @@ func (r *Runner) Execute(ctx context.Context, filter func(script codersdk.Worksp
}
// trackRun wraps "run" with metrics.
func (r *Runner) trackRun(ctx context.Context, script codersdk.WorkspaceAgentScript) error {
err := r.run(ctx, script)
func (r *Runner) trackRun(ctx context.Context, script codersdk.WorkspaceAgentScript, option ExecuteOption) error {
err := r.run(ctx, script, option)
if err != nil {
r.scriptsExecuted.WithLabelValues("false").Add(1)
} else {
@ -212,7 +230,7 @@ func (r *Runner) trackRun(ctx context.Context, script codersdk.WorkspaceAgentScr
// If the timeout is exceeded, the process is sent an interrupt signal.
// If the process does not exit after a few seconds, it is forcefully killed.
// This function immediately returns after a timeout, and does not wait for the process to exit.
func (r *Runner) run(ctx context.Context, script codersdk.WorkspaceAgentScript) error {
func (r *Runner) run(ctx context.Context, script codersdk.WorkspaceAgentScript, option ExecuteOption) error {
logPath := script.LogPath
if logPath == "" {
logPath = fmt.Sprintf("coder-script-%s.log", script.LogSourceID)
@ -299,9 +317,9 @@ func (r *Runner) run(ctx context.Context, script codersdk.WorkspaceAgentScript)
cmd.Stdout = io.MultiWriter(fileWriter, infoW)
cmd.Stderr = io.MultiWriter(fileWriter, errW)
start := time.Now()
start := dbtime.Now()
defer func() {
end := time.Now()
end := dbtime.Now()
execTime := end.Sub(start)
exitCode := 0
if err != nil {
@ -314,6 +332,60 @@ func (r *Runner) run(ctx context.Context, script codersdk.WorkspaceAgentScript)
} else {
logger.Info(ctx, fmt.Sprintf("%s script completed", logPath), slog.F("execution_time", execTime), slog.F("exit_code", exitCode))
}
if r.scriptCompleted == nil {
logger.Debug(ctx, "r.scriptCompleted unexpectedly nil")
return
}
// We want to check this outside of the goroutine to avoid a race condition
timedOut := errors.Is(err, ErrTimeout)
pipesLeftOpen := errors.Is(err, ErrOutputPipesOpen)
err = r.trackCommandGoroutine(func() {
var stage proto.Timing_Stage
switch option {
case ExecuteStartScripts:
stage = proto.Timing_START
case ExecuteStopScripts:
stage = proto.Timing_STOP
case ExecuteCronScripts:
stage = proto.Timing_CRON
}
var status proto.Timing_Status
switch {
case timedOut:
status = proto.Timing_TIMED_OUT
case pipesLeftOpen:
status = proto.Timing_PIPES_LEFT_OPEN
case exitCode != 0:
status = proto.Timing_EXIT_FAILURE
default:
status = proto.Timing_OK
}
reportTimeout := 30 * time.Second
reportCtx, cancel := context.WithTimeout(context.Background(), reportTimeout)
defer cancel()
_, err := r.scriptCompleted(reportCtx, &proto.WorkspaceAgentScriptCompletedRequest{
Timing: &proto.Timing{
ScriptId: script.ID[:],
Start: timestamppb.New(start),
End: timestamppb.New(end),
ExitCode: int32(exitCode),
Stage: stage,
Status: status,
},
})
if err != nil {
logger.Error(ctx, fmt.Sprintf("reporting script completed: %s", err.Error()))
}
})
if err != nil {
logger.Error(ctx, fmt.Sprintf("reporting script completed: track command goroutine: %s", err.Error()))
}
}()
err = cmd.Start()