Mirror of https://github.com/coder/coder.git (synced 2025-07-09 11:45:56 +00:00)
feat: add provisionerd prometheus metrics (#4909)
@@ -29,6 +29,7 @@ import (
 	"github.com/google/go-github/v43/github"
 	"github.com/google/uuid"
 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/collectors"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
 	"github.com/spf13/afero"
 	"github.com/spf13/cobra"
@@ -358,6 +359,7 @@ func Server(vip *viper.Viper, newAPI func(context.Context, *coderd.Options) (*co
 			AgentStatsRefreshInterval: cfg.AgentStatRefreshInterval.Value,
 			Experimental:              ExperimentalEnabled(cmd),
 			DeploymentConfig:          cfg,
+			PrometheusRegistry:        prometheus.NewRegistry(),
 		}
 		if tlsConfig != nil {
 			options.TLSCertificates = tlsConfig.Certificates
@@ -505,21 +507,25 @@ func Server(vip *viper.Viper, newAPI func(context.Context, *coderd.Options) (*co
 			defer serveHandler(ctx, logger, nil, cfg.Pprof.Address.Value, "pprof")()
 		}
 		if cfg.Prometheus.Enable.Value {
-			options.PrometheusRegisterer = prometheus.DefaultRegisterer
-			closeUsersFunc, err := prometheusmetrics.ActiveUsers(ctx, options.PrometheusRegisterer, options.Database, 0)
+			options.PrometheusRegistry.MustRegister(collectors.NewGoCollector())
+			options.PrometheusRegistry.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
+
+			closeUsersFunc, err := prometheusmetrics.ActiveUsers(ctx, options.PrometheusRegistry, options.Database, 0)
 			if err != nil {
 				return xerrors.Errorf("register active users prometheus metric: %w", err)
 			}
 			defer closeUsersFunc()
 
-			closeWorkspacesFunc, err := prometheusmetrics.Workspaces(ctx, options.PrometheusRegisterer, options.Database, 0)
+			closeWorkspacesFunc, err := prometheusmetrics.Workspaces(ctx, options.PrometheusRegistry, options.Database, 0)
 			if err != nil {
 				return xerrors.Errorf("register workspaces prometheus metric: %w", err)
 			}
 			defer closeWorkspacesFunc()
 
 			//nolint:revive
-			defer serveHandler(ctx, logger, promhttp.Handler(), cfg.Prometheus.Address.Value, "prometheus")()
+			defer serveHandler(ctx, logger, promhttp.InstrumentMetricHandler(
+				options.PrometheusRegistry, promhttp.HandlerFor(options.PrometheusRegistry, promhttp.HandlerOpts{}),
+			), cfg.Prometheus.Address.Value, "prometheus")()
 		}
 
 		// We use a separate coderAPICloser so the Enterprise API
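
Note: a minimal standalone sketch (assumed names, not part of this diff) of the serving pattern used above. promhttp.HandlerFor exports only what is registered on the dedicated registry, and promhttp.InstrumentMetricHandler layers the handler's own scrape metrics onto that same registry.

package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/collectors"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	// Dedicated registry instead of prometheus.DefaultRegisterer, so the
	// endpoint exports only collectors registered here.
	reg := prometheus.NewRegistry()
	reg.MustRegister(collectors.NewGoCollector())
	reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))

	// InstrumentMetricHandler records scrape counts and in-flight scrapes of
	// the handler itself on the same registry.
	handler := promhttp.InstrumentMetricHandler(
		reg, promhttp.HandlerFor(reg, promhttp.HandlerOpts{}),
	)
	http.Handle("/metrics", handler)
	_ = http.ListenAndServe("127.0.0.1:2112", nil) // address is illustrative
}
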
@@ -555,8 +561,9 @@ func Server(vip *viper.Viper, newAPI func(context.Context, *coderd.Options) (*co
 				_ = daemon.Close()
 			}
 		}()
+		provisionerdMetrics := provisionerd.NewMetrics(options.PrometheusRegistry)
 		for i := 0; i < cfg.ProvisionerDaemons.Value; i++ {
-			daemon, err := newProvisionerDaemon(ctx, coderAPI, logger, cfg.CacheDirectory.Value, errCh, false)
+			daemon, err := newProvisionerDaemon(ctx, coderAPI, provisionerdMetrics, logger, cfg.CacheDirectory.Value, errCh, false)
 			if err != nil {
 				return xerrors.Errorf("create provisioner daemon: %w", err)
 			}
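
Note: the Metrics value is built once and shared by every daemon in the loop; registering equivalent collectors on one registry a second time is rejected. A small standalone sketch (hypothetical metric name) of what a duplicate registration returns:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	reg := prometheus.NewRegistry()
	reg.MustRegister(prometheus.NewGauge(prometheus.GaugeOpts{Name: "jobs_current"}))

	// A second collector with an identical descriptor is rejected;
	// MustRegister would panic here, hence one shared Metrics per registry.
	err := reg.Register(prometheus.NewGauge(prometheus.GaugeOpts{Name: "jobs_current"}))
	fmt.Println(err) // duplicate metrics collector registration attempted
}
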
@@ -823,6 +830,7 @@ func shutdownWithTimeout(shutdown func(context.Context) error, timeout time.Dura
 func newProvisionerDaemon(
 	ctx context.Context,
 	coderAPI *coderd.API,
+	metrics provisionerd.Metrics,
 	logger slog.Logger,
 	cacheDir string,
 	errCh chan error,
@@ -899,7 +907,8 @@ func newProvisionerDaemon(
 		UpdateInterval:      500 * time.Millisecond,
 		Provisioners:        provisioners,
 		WorkDirectory:       tempDir,
-		Tracer:              coderAPI.TracerProvider,
+		TracerProvider:      coderAPI.TracerProvider,
+		Metrics:             &metrics,
 	}), nil
 }
@@ -78,7 +78,7 @@ type Options struct {
 	GoogleTokenValidator *idtoken.Validator
 	GithubOAuth2Config   *GithubOAuth2Config
 	OIDCConfig           *OIDCConfig
-	PrometheusRegisterer prometheus.Registerer
+	PrometheusRegistry   *prometheus.Registry
 	SecureAuthCookie     bool
 	SSHKeygenAlgorithm   gitsshkey.Algorithm
 	Telemetry            telemetry.Reporter
@@ -132,8 +132,8 @@ func New(options *Options) *API {
 	if options.Authorizer == nil {
 		options.Authorizer = rbac.NewAuthorizer()
 	}
-	if options.PrometheusRegisterer == nil {
-		options.PrometheusRegisterer = prometheus.NewRegistry()
+	if options.PrometheusRegistry == nil {
+		options.PrometheusRegistry = prometheus.NewRegistry()
 	}
 	if options.TailnetCoordinator == nil {
 		options.TailnetCoordinator = tailnet.NewCoordinator()
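
Note: the option moves from the prometheus.Registerer interface to the concrete *prometheus.Registry, which satisfies both Registerer (collectors register on it) and Gatherer (the /metrics handler reads from it). A quick standalone illustration, not from this diff:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	reg := prometheus.NewRegistry()

	// Both interface views are available from the one concrete value.
	var registerer prometheus.Registerer = reg
	var gatherer prometheus.Gatherer = reg

	registerer.MustRegister(prometheus.NewCounter(prometheus.CounterOpts{Name: "example_total"}))

	families, err := gatherer.Gather()
	if err != nil {
		panic(err)
	}
	fmt.Println("families:", len(families)) // 1
}
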
@@ -204,7 +204,7 @@ func New(options *Options) *API {
 		httpmw.Recover(api.Logger),
 		httpmw.ExtractRealIP(api.RealIPConfig),
 		httpmw.Logger(api.Logger),
-		httpmw.Prometheus(options.PrometheusRegisterer),
+		httpmw.Prometheus(options.PrometheusRegistry),
 		// handleSubdomainApplications checks if the first subdomain is a valid
 		// app URL. If it is, it will serve that application.
 		api.handleSubdomainApplications(
@@ -38,6 +38,7 @@ func ActiveUsers(ctx context.Context, registerer prometheus.Registerer, db datab
 			return
 		case <-ticker.C:
 		}
+
 		apiKeys, err := db.GetAPIKeysLastUsedAfter(ctx, database.Now().Add(-1*time.Hour))
 		if err != nil {
 			continue
@@ -82,6 +83,7 @@ func Workspaces(ctx context.Context, registerer prometheus.Registerer, db databa
 			return
 		case <-ticker.C:
 		}
+
 		builds, err := db.GetLatestWorkspaceBuilds(ctx)
 		if err != nil {
 			continue
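
Note: ActiveUsers and Workspaces both follow a ticker-driven collector shape: register a gauge, refresh it in a goroutine, and return a function the caller defers to stop collection. A generic sketch of that pattern (hypothetical helper, not the coder implementation):

package main

import (
	"context"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// backgroundGauge is a hypothetical helper mirroring the pattern above.
func backgroundGauge(ctx context.Context, reg prometheus.Registerer, sample func() float64, interval time.Duration) (func(), error) {
	gauge := prometheus.NewGauge(prometheus.GaugeOpts{Name: "example_active_items"})
	if err := reg.Register(gauge); err != nil {
		return nil, err
	}

	ctx, cancel := context.WithCancel(ctx)
	ticker := time.NewTicker(interval)
	go func() {
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
			}
			gauge.Set(sample())
		}
	}()
	return cancel, nil
}

func main() {
	stop, err := backgroundGauge(context.Background(), prometheus.NewRegistry(), func() float64 { return 42 }, time.Second)
	if err != nil {
		panic(err)
	}
	defer stop()
	time.Sleep(2 * time.Second) // let the gauge refresh a couple of times
}
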
@@ -11,6 +11,8 @@ import (
 	"time"
 
 	"github.com/hashicorp/yamux"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/spf13/afero"
 	"go.opentelemetry.io/otel/attribute"
 	semconv "go.opentelemetry.io/otel/semconv/v1.11.0"
@@ -43,7 +45,8 @@ type Provisioners map[string]sdkproto.DRPCProvisionerClient
 type Options struct {
 	Filesystem     afero.Fs
 	Logger         slog.Logger
-	Tracer         trace.TracerProvider
+	TracerProvider trace.TracerProvider
+	Metrics        *Metrics
 
 	ForceCancelInterval time.Duration
 	UpdateInterval      time.Duration
@@ -66,14 +69,19 @@ func New(clientDialer Dialer, opts *Options) *Server {
 	if opts.Filesystem == nil {
 		opts.Filesystem = afero.NewOsFs()
 	}
-	if opts.Tracer == nil {
-		opts.Tracer = trace.NewNoopTracerProvider()
+	if opts.TracerProvider == nil {
+		opts.TracerProvider = trace.NewNoopTracerProvider()
 	}
+	if opts.Metrics == nil {
+		reg := prometheus.NewRegistry()
+		mets := NewMetrics(reg)
+		opts.Metrics = &mets
+	}
 
 	ctx, ctxCancel := context.WithCancel(context.Background())
 	daemon := &Server{
 		opts:         opts,
-		tracer:       opts.Tracer.Tracer(tracing.TracerName),
+		tracer:       opts.TracerProvider.Tracer(tracing.TracerName),
 		clientDialer: clientDialer,
@@ -103,6 +111,42 @@ type Server struct {
 	activeJob *runner.Runner
 }
 
+type Metrics struct {
+	Runner runner.Metrics
+}
+
+func NewMetrics(reg prometheus.Registerer) Metrics {
+	auto := promauto.With(reg)
+	durationToFloatMs := func(d time.Duration) float64 {
+		return float64(d.Milliseconds())
+	}
+
+	return Metrics{
+		Runner: runner.Metrics{
+			ConcurrentJobs: auto.NewGaugeVec(prometheus.GaugeOpts{
+				Namespace: "coderd",
+				Subsystem: "provisionerd",
+				Name:      "jobs_current",
+			}, []string{"provisioner"}),
+			JobTimings: auto.NewHistogramVec(prometheus.HistogramOpts{
+				Namespace: "coderd",
+				Subsystem: "provisionerd",
+				Name:      "job_timings_ms",
+				Buckets: []float64{
+					durationToFloatMs(1 * time.Second),
+					durationToFloatMs(10 * time.Second),
+					durationToFloatMs(30 * time.Second),
+					durationToFloatMs(1 * time.Minute),
+					durationToFloatMs(5 * time.Minute),
+					durationToFloatMs(10 * time.Minute),
+					durationToFloatMs(30 * time.Minute),
+					durationToFloatMs(1 * time.Hour),
+				},
+			}, []string{"provisioner", "status"}),
+		},
+	}
+}
+
 // Connect establishes a connection to coderd.
 func (p *Server) connect(ctx context.Context) {
 	// An exponential back-off occurs when the connection is failing to dial.
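
Note: NewMetrics uses promauto.With(reg), which returns a factory whose constructors register each new collector on reg and panic if registration fails. A standalone sketch of that behaviour using one of the metric names above:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

func main() {
	reg := prometheus.NewRegistry()
	factory := promauto.With(reg)

	// Creating the vector already registers it on reg.
	jobs := factory.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "coderd",
		Subsystem: "provisionerd",
		Name:      "jobs_current",
	}, []string{"provisioner"})
	jobs.WithLabelValues("terraform").Inc()

	families, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	for _, fam := range families {
		fmt.Println(fam.GetName()) // coderd_provisionerd_jobs_current
	}
}
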
@@ -282,6 +326,7 @@ func (p *Server) acquireJob(ctx context.Context) {
 		p.opts.UpdateInterval,
 		p.opts.ForceCancelInterval,
 		p.tracer,
+		p.opts.Metrics.Runner,
 	)
 
 	go p.activeJob.Run()
@@ -16,6 +16,7 @@ import (
 	"time"
 
 	"github.com/google/uuid"
+	"github.com/prometheus/client_golang/prometheus"
 	"github.com/spf13/afero"
 	"go.opentelemetry.io/otel/codes"
 	semconv "go.opentelemetry.io/otel/semconv/v1.11.0"
@@ -34,6 +35,7 @@ const (
 
 type Runner struct {
 	tracer   trace.Tracer
+	metrics  Metrics
 	job      *proto.AcquiredJob
 	sender   JobUpdater
 	logger   slog.Logger
@@ -65,6 +67,12 @@ type Runner struct {
 	okToSend bool
 }
 
+type Metrics struct {
+	ConcurrentJobs *prometheus.GaugeVec
+	// JobTimings also counts the total amount of jobs.
+	JobTimings *prometheus.HistogramVec
+}
+
 type JobUpdater interface {
 	UpdateJob(ctx context.Context, in *proto.UpdateJobRequest) (*proto.UpdateJobResponse, error)
 	FailJob(ctx context.Context, in *proto.FailedJob) error
@@ -82,6 +90,7 @@ func NewRunner(
 	updateInterval time.Duration,
 	forceCancelInterval time.Duration,
 	tracer trace.Tracer,
+	metrics Metrics,
 ) *Runner {
 	m := new(sync.Mutex)
 
@@ -91,6 +100,7 @@ func NewRunner(
 
 	return &Runner{
 		tracer:       tracer,
+		metrics:      metrics,
 		job:          job,
 		sender:       updater,
 		logger:       logger.With(slog.F("job_id", job.JobId)),
@@ -120,9 +130,22 @@ func NewRunner(
 // that goroutine on the context passed into Fail(), and it marks okToSend false to signal us here
 // that this function should not also send a terminal message.
 func (r *Runner) Run() {
+	start := time.Now()
 	ctx, span := r.startTrace(r.notStopped, tracing.FuncName())
 	defer span.End()
 
+	concurrentGauge := r.metrics.ConcurrentJobs.WithLabelValues(r.job.Provisioner)
+	concurrentGauge.Inc()
+	defer func() {
+		status := "success"
+		if r.failedJob != nil {
+			status = "failed"
+		}
+
+		concurrentGauge.Dec()
+		r.metrics.JobTimings.WithLabelValues(r.job.Provisioner, status).Observe(float64(time.Since(start).Milliseconds()))
+	}()
+
 	r.mutex.Lock()
 	defer r.mutex.Unlock()
 	defer r.stop()
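
Note: the Run instrumentation above follows a common shape: bump a per-provisioner in-flight gauge on entry, then in a deferred func decrement it and observe the elapsed milliseconds labelled by final status. A standalone sketch of the same shape (hypothetical names, not Runner itself):

package main

import (
	"errors"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

var (
	registry   = prometheus.NewRegistry()
	inFlight   = promauto.With(registry).NewGaugeVec(prometheus.GaugeOpts{Name: "jobs_current"}, []string{"provisioner"})
	jobTimings = promauto.With(registry).NewHistogramVec(prometheus.HistogramOpts{Name: "job_timings_ms"}, []string{"provisioner", "status"})
)

// runJob mirrors the shape of Runner.Run above.
func runJob(provisioner string, work func() error) {
	start := time.Now()
	gauge := inFlight.WithLabelValues(provisioner)
	gauge.Inc()

	var failed bool
	defer func() {
		status := "success"
		if failed {
			status = "failed"
		}
		gauge.Dec()
		jobTimings.WithLabelValues(provisioner, status).Observe(float64(time.Since(start).Milliseconds()))
	}()

	failed = work() != nil
}

func main() {
	runJob("terraform", func() error { return nil })
	runJob("terraform", func() error { return errors.New("boom") })
}
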