feat: implement agent process management (#9461)

- An opt-in feature has been added to the agent to allow
   deprioritizing non coder-related processes for CPU by setting their
   niceness level to 10.
- Opting in to the feature requires setting CODER_PROC_PRIO_MGMT to a non-empty value.
This commit is contained in:
Jon Ayers
2023-09-14 19:45:05 -05:00
committed by GitHub
parent 79d4179123
commit 7311ffbd9d
13 changed files with 856 additions and 1 deletions

View File

@ -15,6 +15,8 @@ import (
"os/exec"
"os/user"
"path/filepath"
"runtime"
"runtime/debug"
"sort"
"strconv"
"strings"
@ -34,6 +36,7 @@ import (
"tailscale.com/types/netlogtype"
"cdr.dev/slog"
"github.com/coder/coder/v2/agent/agentproc"
"github.com/coder/coder/v2/agent/agentssh"
"github.com/coder/coder/v2/agent/reconnectingpty"
"github.com/coder/coder/v2/buildinfo"
@ -51,6 +54,10 @@ const (
ProtocolDial = "dial"
)
// EnvProcPrioMgmt determines whether we attempt to manage
// process CPU and OOM Killer priority.
const EnvProcPrioMgmt = "CODER_PROC_PRIO_MGMT"
type Options struct {
Filesystem afero.Fs
LogDir string
@ -68,6 +75,11 @@ type Options struct {
PrometheusRegistry *prometheus.Registry
ReportMetadataInterval time.Duration
ServiceBannerRefreshInterval time.Duration
Syscaller agentproc.Syscaller
// ModifiedProcesses is used for testing process priority management.
ModifiedProcesses chan []*agentproc.Process
// ProcessManagementTick is used for testing process priority management.
ProcessManagementTick <-chan time.Time
}
type Client interface {
@ -120,6 +132,10 @@ func New(options Options) Agent {
prometheusRegistry = prometheus.NewRegistry()
}
if options.Syscaller == nil {
options.Syscaller = agentproc.NewSyscaller()
}
ctx, cancelFunc := context.WithCancel(context.Background())
a := &agent{
tailnetListenPort: options.TailnetListenPort,
@ -143,6 +159,9 @@ func New(options Options) Agent {
sshMaxTimeout: options.SSHMaxTimeout,
subsystems: options.Subsystems,
addresses: options.Addresses,
syscaller: options.Syscaller,
modifiedProcs: options.ModifiedProcesses,
processManagementTick: options.ProcessManagementTick,
prometheusRegistry: prometheusRegistry,
metrics: newAgentMetrics(prometheusRegistry),
@ -197,6 +216,12 @@ type agent struct {
prometheusRegistry *prometheus.Registry
metrics *agentMetrics
syscaller agentproc.Syscaller
// modifiedProcs is used for testing process priority management.
modifiedProcs chan []*agentproc.Process
// processManagementTick is used for testing process priority management.
processManagementTick <-chan time.Time
}
func (a *agent) TailnetConn() *tailnet.Conn {
@ -225,6 +250,7 @@ func (a *agent) runLoop(ctx context.Context) {
go a.reportLifecycleLoop(ctx)
go a.reportMetadataLoop(ctx)
go a.fetchServiceBannerLoop(ctx)
go a.manageProcessPriorityLoop(ctx)
for retrier := retry.New(100*time.Millisecond, 10*time.Second); retrier.Wait(ctx); {
a.logger.Info(ctx, "connecting to coderd")
@ -1253,6 +1279,119 @@ func (a *agent) startReportingConnectionStats(ctx context.Context) {
}
}
var prioritizedProcs = []string{"coder agent"}
func (a *agent) manageProcessPriorityLoop(ctx context.Context) {
defer func() {
if r := recover(); r != nil {
a.logger.Critical(ctx, "recovered from panic",
slog.F("panic", r),
slog.F("stack", string(debug.Stack())),
)
}
}()
if val := a.envVars[EnvProcPrioMgmt]; val == "" || runtime.GOOS != "linux" {
a.logger.Debug(ctx, "process priority not enabled, agent will not manage process niceness/oom_score_adj ",
slog.F("env_var", EnvProcPrioMgmt),
slog.F("value", val),
slog.F("goos", runtime.GOOS),
)
return
}
if a.processManagementTick == nil {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
a.processManagementTick = ticker.C
}
for {
procs, err := a.manageProcessPriority(ctx)
if err != nil {
a.logger.Error(ctx, "manage process priority",
slog.Error(err),
)
}
if a.modifiedProcs != nil {
a.modifiedProcs <- procs
}
select {
case <-a.processManagementTick:
case <-ctx.Done():
return
}
}
}
func (a *agent) manageProcessPriority(ctx context.Context) ([]*agentproc.Process, error) {
const (
niceness = 10
)
procs, err := agentproc.List(a.filesystem, a.syscaller)
if err != nil {
return nil, xerrors.Errorf("list: %w", err)
}
var (
modProcs = []*agentproc.Process{}
logger slog.Logger
)
for _, proc := range procs {
logger = a.logger.With(
slog.F("cmd", proc.Cmd()),
slog.F("pid", proc.PID),
)
containsFn := func(e string) bool {
contains := strings.Contains(proc.Cmd(), e)
return contains
}
// If the process is prioritized we should adjust
// it's oom_score_adj and avoid lowering its niceness.
if slices.ContainsFunc[[]string, string](prioritizedProcs, containsFn) {
continue
}
score, err := proc.Niceness(a.syscaller)
if err != nil {
logger.Warn(ctx, "unable to get proc niceness",
slog.Error(err),
)
continue
}
// We only want processes that don't have a nice value set
// so we don't override user nice values.
// Getpriority actually returns priority for the nice value
// which is niceness + 20, so here 20 = a niceness of 0 (aka unset).
if score != 20 {
if score != niceness {
logger.Debug(ctx, "skipping process due to custom niceness",
slog.F("niceness", score),
)
}
continue
}
err = proc.SetNiceness(a.syscaller, niceness)
if err != nil {
logger.Warn(ctx, "unable to set proc niceness",
slog.F("niceness", niceness),
slog.Error(err),
)
continue
}
modProcs = append(modProcs, proc)
}
return modProcs, nil
}
// isClosed returns whether the API is closed or not.
func (a *agent) isClosed() bool {
select {