mirror of https://github.com/coder/coder.git, synced 2025-07-15 22:20:27 +00:00
feat: integrate new agentexec pkg (#15609)
- Integrates the `agentexec` pkg into the agent and removes the legacy system of iterating over the process tree. Adds linting rules to help catch future improper uses of `exec.Command` in the package.
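To illustrate the idea behind centralizing command creation, here is a minimal sketch of a single exec choke point that a lint rule could then enforce package-wide. The package and function names below are hypothetical and are not the actual `agentexec` API.

```go
// Hypothetical sketch of a centralized exec wrapper; not the real agentexec API.
// The point is that agent code builds commands through one package, so a lint
// rule can flag direct uses of exec.Command elsewhere.
package execwrap

import (
	"context"
	"os/exec"
)

// CommandContext is the single place where agent commands are constructed,
// so policy (environment, working directory, priority) can be applied
// consistently before the command is returned.
func CommandContext(ctx context.Context, name string, args ...string) *exec.Cmd {
	cmd := exec.CommandContext(ctx, name, args...)
	// Policy hooks would go here.
	return cmd
}
```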
agent/agent.go (268 lines changed)
@@ -12,8 +12,6 @@ import (
	"os"
	"os/user"
	"path/filepath"
	"runtime"
	"runtime/debug"
	"sort"
	"strconv"
	"strings"

@@ -35,7 +33,6 @@ import (
	"tailscale.com/util/clientmetric"

	"cdr.dev/slog"
	"github.com/coder/coder/v2/agent/agentproc"
	"github.com/coder/coder/v2/agent/agentscripts"
	"github.com/coder/coder/v2/agent/agentssh"
	"github.com/coder/coder/v2/agent/proto"

@@ -82,12 +79,7 @@ type Options struct {
	PrometheusRegistry           *prometheus.Registry
	ReportMetadataInterval       time.Duration
	ServiceBannerRefreshInterval time.Duration
	Syscaller                    agentproc.Syscaller
	// ModifiedProcesses is used for testing process priority management.
	ModifiedProcesses chan []*agentproc.Process
	// ProcessManagementTick is used for testing process priority management.
	ProcessManagementTick <-chan time.Time
	BlockFileTransfer     bool
}

type Client interface {

@@ -147,10 +139,6 @@ func New(options Options) Agent {
		prometheusRegistry = prometheus.NewRegistry()
	}

	if options.Syscaller == nil {
		options.Syscaller = agentproc.NewSyscaller()
	}

	hardCtx, hardCancel := context.WithCancel(context.Background())
	gracefulCtx, gracefulCancel := context.WithCancel(hardCtx)
	a := &agent{

@@ -178,9 +166,6 @@ func New(options Options) Agent {
		announcementBannersRefreshInterval: options.ServiceBannerRefreshInterval,
		sshMaxTimeout:                      options.SSHMaxTimeout,
		subsystems:                         options.Subsystems,
		syscaller:                          options.Syscaller,
		modifiedProcs:                      options.ModifiedProcesses,
		processManagementTick:              options.ProcessManagementTick,
		logSender:                          agentsdk.NewLogSender(options.Logger),
		blockFileTransfer:                  options.BlockFileTransfer,

@@ -253,13 +238,7 @@ type agent struct {
	prometheusRegistry *prometheus.Registry
	// metrics are prometheus registered metrics that will be collected and
	// labeled in Coder with the agent + workspace.
	metrics   *agentMetrics
	syscaller agentproc.Syscaller

	// modifiedProcs is used for testing process priority management.
	modifiedProcs chan []*agentproc.Process
	// processManagementTick is used for testing process priority management.
	processManagementTick <-chan time.Time
}

func (a *agent) TailnetConn() *tailnet.Conn {

@@ -308,8 +287,6 @@ func (a *agent) init() {
// may be happening, but regardless after the intermittent
// failure, you'll want the agent to reconnect.
func (a *agent) runLoop() {
	go a.manageProcessPriorityUntilGracefulShutdown()

	// need to keep retrying up to the hardCtx so that we can send graceful shutdown-related
	// messages.
	ctx := a.hardCtx
@@ -1443,162 +1420,6 @@ func (a *agent) Collect(ctx context.Context, networkStats map[netlogtype.Connect
	return stats
}

var prioritizedProcs = []string{"coder agent"}

func (a *agent) manageProcessPriorityUntilGracefulShutdown() {
	// process priority can stop as soon as we are gracefully shutting down
	ctx := a.gracefulCtx
	defer func() {
		if r := recover(); r != nil {
			a.logger.Critical(ctx, "recovered from panic",
				slog.F("panic", r),
				slog.F("stack", string(debug.Stack())),
			)
		}
	}()

	if val := a.environmentVariables[EnvProcPrioMgmt]; val == "" || runtime.GOOS != "linux" {
		a.logger.Debug(ctx, "process priority not enabled, agent will not manage process niceness/oom_score_adj ",
			slog.F("env_var", EnvProcPrioMgmt),
			slog.F("value", val),
			slog.F("goos", runtime.GOOS),
		)
		return
	}

	if a.processManagementTick == nil {
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		a.processManagementTick = ticker.C
	}

	oomScore := unsetOOMScore
	if scoreStr, ok := a.environmentVariables[EnvProcOOMScore]; ok {
		score, err := strconv.Atoi(strings.TrimSpace(scoreStr))
		if err == nil && score >= -1000 && score <= 1000 {
			oomScore = score
		} else {
			a.logger.Error(ctx, "invalid oom score",
				slog.F("min_value", -1000),
				slog.F("max_value", 1000),
				slog.F("value", scoreStr),
			)
		}
	}

	debouncer := &logDebouncer{
		logger:   a.logger,
		messages: map[string]time.Time{},
		interval: time.Minute,
	}

	for {
		procs, err := a.manageProcessPriority(ctx, debouncer, oomScore)
		// Avoid spamming the logs too often.
		if err != nil {
			debouncer.Error(ctx, "manage process priority",
				slog.Error(err),
			)
		}
		if a.modifiedProcs != nil {
			a.modifiedProcs <- procs
		}

		select {
		case <-a.processManagementTick:
		case <-ctx.Done():
			return
		}
	}
}

// unsetOOMScore is set to an invalid OOM score to imply an unset value.
const unsetOOMScore = 1001

func (a *agent) manageProcessPriority(ctx context.Context, debouncer *logDebouncer, oomScore int) ([]*agentproc.Process, error) {
	const (
		niceness = 10
	)

	// We fetch the agent score each time because it's possible someone updates the
	// value after it is started.
	agentScore, err := a.getAgentOOMScore()
	if err != nil {
		agentScore = unsetOOMScore
	}
	if oomScore == unsetOOMScore && agentScore != unsetOOMScore {
		// If the child score has not been explicitly specified we should
		// set it to a score relative to the agent score.
		oomScore = childOOMScore(agentScore)
	}

	procs, err := agentproc.List(a.filesystem, a.syscaller)
	if err != nil {
		return nil, xerrors.Errorf("list: %w", err)
	}

	modProcs := []*agentproc.Process{}

	for _, proc := range procs {
		containsFn := func(e string) bool {
			contains := strings.Contains(proc.Cmd(), e)
			return contains
		}

		// If the process is prioritized we should adjust
		// it's oom_score_adj and avoid lowering its niceness.
		if slices.ContainsFunc(prioritizedProcs, containsFn) {
			continue
		}

		score, niceErr := proc.Niceness(a.syscaller)
		if niceErr != nil && !isBenignProcessErr(niceErr) {
			debouncer.Warn(ctx, "unable to get proc niceness",
				slog.F("cmd", proc.Cmd()),
				slog.F("pid", proc.PID),
				slog.Error(niceErr),
			)
		}

		// We only want processes that don't have a nice value set
		// so we don't override user nice values.
		// Getpriority actually returns priority for the nice value
		// which is niceness + 20, so here 20 = a niceness of 0 (aka unset).
		if score != 20 {
			// We don't log here since it can get spammy
			continue
		}

		if niceErr == nil {
			err := proc.SetNiceness(a.syscaller, niceness)
			if err != nil && !isBenignProcessErr(err) {
				debouncer.Warn(ctx, "unable to set proc niceness",
					slog.F("cmd", proc.Cmd()),
					slog.F("pid", proc.PID),
					slog.F("niceness", niceness),
					slog.Error(err),
				)
			}
		}

		// If the oom score is valid and it's not already set and isn't a custom value set by another process then it's ok to update it.
		if oomScore != unsetOOMScore && oomScore != proc.OOMScoreAdj && !isCustomOOMScore(agentScore, proc) {
			oomScoreStr := strconv.Itoa(oomScore)
			err := afero.WriteFile(a.filesystem, fmt.Sprintf("/proc/%d/oom_score_adj", proc.PID), []byte(oomScoreStr), 0o644)
			if err != nil && !isBenignProcessErr(err) {
				debouncer.Warn(ctx, "unable to set oom_score_adj",
					slog.F("cmd", proc.Cmd()),
					slog.F("pid", proc.PID),
					slog.F("score", oomScoreStr),
					slog.Error(err),
				)
			}
		}
		modProcs = append(modProcs, proc)
	}
	return modProcs, nil
}

// isClosed returns whether the API is closed or not.
func (a *agent) isClosed() bool {
	return a.hardCtx.Err() != nil
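For context on the hunk above: on Linux the removed loop adjusted each tracked process by writing its score to /proc/<pid>/oom_score_adj through the afero filesystem abstraction, as the deleted manageProcessPriority shows. Below is a small standalone sketch of that proc-file mechanism; the helper names are illustrative and not part of the agent.

```go
// Standalone sketch of the /proc/<pid>/oom_score_adj read/write mechanism
// the removed code relied on. Helper names here are illustrative only.
package main

import (
	"fmt"
	"os"
	"strconv"
	"strings"

	"github.com/spf13/afero"
)

// readOOMScoreAdj returns the current oom_score_adj for a PID.
func readOOMScoreAdj(fs afero.Fs, pid int) (int, error) {
	b, err := afero.ReadFile(fs, fmt.Sprintf("/proc/%d/oom_score_adj", pid))
	if err != nil {
		return 0, err
	}
	return strconv.Atoi(strings.TrimSpace(string(b)))
}

// writeOOMScoreAdj sets oom_score_adj for a PID; valid values are -1000..1000.
func writeOOMScoreAdj(fs afero.Fs, pid int, score int) error {
	return afero.WriteFile(fs,
		fmt.Sprintf("/proc/%d/oom_score_adj", pid),
		[]byte(strconv.Itoa(score)), 0o644)
}

func main() {
	fs := afero.NewOsFs()
	// Read this process's own score, the same value the agent read from
	// /proc/self/oom_score_adj.
	score, err := readOOMScoreAdj(fs, os.Getpid())
	if err != nil {
		fmt.Fprintln(os.Stderr, "read oom_score_adj:", err)
		return
	}
	fmt.Println("current oom_score_adj:", score)
}
```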
@@ -1992,88 +1813,3 @@ func PrometheusMetricsHandler(prometheusRegistry *prometheus.Registry, logger sl
		}
	})
}

// childOOMScore returns the oom_score_adj for a child process. It is based
// on the oom_score_adj of the agent process.
func childOOMScore(agentScore int) int {
	// If the agent has a negative oom_score_adj, we set the child to 0
	// so it's treated like every other process.
	if agentScore < 0 {
		return 0
	}

	// If the agent is already almost at the maximum then set it to the max.
	if agentScore >= 998 {
		return 1000
	}

	// If the agent oom_score_adj is >=0, we set the child to slightly
	// less than the maximum. If users want a different score they set it
	// directly.
	return 998
}

func (a *agent) getAgentOOMScore() (int, error) {
	scoreStr, err := afero.ReadFile(a.filesystem, "/proc/self/oom_score_adj")
	if err != nil {
		return 0, xerrors.Errorf("read file: %w", err)
	}

	score, err := strconv.Atoi(strings.TrimSpace(string(scoreStr)))
	if err != nil {
		return 0, xerrors.Errorf("parse int: %w", err)
	}

	return score, nil
}

// isCustomOOMScore checks to see if the oom_score_adj is not a value that would
// originate from an agent-spawned process.
func isCustomOOMScore(agentScore int, process *agentproc.Process) bool {
	score := process.OOMScoreAdj
	return agentScore != score && score != 1000 && score != 0 && score != 998
}

// logDebouncer skips writing a log for a particular message if
// it's been emitted within the given interval duration.
// It's a shoddy implementation used in one spot that should be replaced at
// some point.
type logDebouncer struct {
	logger   slog.Logger
	messages map[string]time.Time
	interval time.Duration
}

func (l *logDebouncer) Warn(ctx context.Context, msg string, fields ...any) {
	l.log(ctx, slog.LevelWarn, msg, fields...)
}

func (l *logDebouncer) Error(ctx context.Context, msg string, fields ...any) {
	l.log(ctx, slog.LevelError, msg, fields...)
}

func (l *logDebouncer) log(ctx context.Context, level slog.Level, msg string, fields ...any) {
	// This (bad) implementation assumes you wouldn't reuse the same msg
	// for different levels.
	if last, ok := l.messages[msg]; ok && time.Since(last) < l.interval {
		return
	}
	switch level {
	case slog.LevelWarn:
		l.logger.Warn(ctx, msg, fields...)
	case slog.LevelError:
		l.logger.Error(ctx, msg, fields...)
	}
	l.messages[msg] = time.Now()
}

func isBenignProcessErr(err error) bool {
	return err != nil &&
		(xerrors.Is(err, os.ErrNotExist) ||
			xerrors.Is(err, os.ErrPermission) ||
			isNoSuchProcessErr(err))
}

func isNoSuchProcessErr(err error) bool {
	return err != nil && strings.Contains(err.Error(), "no such process")
}
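To make the removed childOOMScore policy concrete, here is the same mapping restated as a self-contained snippet with a few sample inputs; it simply mirrors the deleted helper for illustration.

```go
// Illustration of the childOOMScore mapping removed above: children of an
// agent with a negative score get 0, children of an agent at or near the
// maximum get 1000, and everything else gets 998.
package main

import "fmt"

func childOOMScore(agentScore int) int {
	if agentScore < 0 {
		return 0
	}
	if agentScore >= 998 {
		return 1000
	}
	return 998
}

func main() {
	for _, agentScore := range []int{-500, 0, 500, 999} {
		fmt.Printf("agent oom_score_adj %5d -> child oom_score_adj %d\n",
			agentScore, childOOMScore(agentScore))
	}
}
```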