feat: unified tracing between coderd<->provisionerd (#7370)

This commit is contained in:
Colin Adler
2023-05-03 18:02:35 -05:00
committed by GitHub
parent 3368b8b65f
commit 8bd9f9c351
28 changed files with 592 additions and 331 deletions

View File

@ -236,6 +236,9 @@ func New(options *Options) *API {
if options.SSHConfig.HostnamePrefix == "" {
options.SSHConfig.HostnamePrefix = "coder."
}
if options.TracerProvider == nil {
options.TracerProvider = trace.NewNoopTracerProvider()
}
if options.SetUserGroups == nil {
options.SetUserGroups = func(ctx context.Context, _ database.Store, id uuid.UUID, groups []string) error {
options.Logger.Warn(ctx, "attempted to assign OIDC groups without enterprise license",
@ -898,6 +901,7 @@ func compressHandler(h http.Handler) http.Handler {
// CreateInMemoryProvisionerDaemon is an in-memory connection to a provisionerd.
// Useful when starting coderd and provisionerd in the same process.
func (api *API) CreateInMemoryProvisionerDaemon(ctx context.Context, debounce time.Duration) (client proto.DRPCProvisionerDaemonClient, err error) {
tracer := api.TracerProvider.Tracer(tracing.TracerName)
clientSession, serverSession := provisionersdk.MemTransportPipe()
defer func() {
if err != nil {
@ -937,6 +941,7 @@ func (api *API) CreateInMemoryProvisionerDaemon(ctx context.Context, debounce ti
Provisioners: daemon.Provisioners,
GitAuthConfigs: api.GitAuthConfigs,
Telemetry: api.Telemetry,
Tracer: tracer,
Tags: tags,
QuotaCommitter: &api.QuotaCommitter,
Auditor: &api.Auditor,
@ -947,14 +952,16 @@ func (api *API) CreateInMemoryProvisionerDaemon(ctx context.Context, debounce ti
if err != nil {
return nil, err
}
server := drpcserver.NewWithOptions(mux, drpcserver.Options{
Log: func(err error) {
if xerrors.Is(err, io.EOF) {
return
}
api.Logger.Debug(ctx, "drpc server error", slog.Error(err))
server := drpcserver.NewWithOptions(&tracing.DRPCHandler{Handler: mux},
drpcserver.Options{
Log: func(err error) {
if xerrors.Is(err, io.EOF) {
return
}
api.Logger.Debug(ctx, "drpc server error", slog.Error(err))
},
},
})
)
go func() {
err := server.Serve(ctx, serverSession)
if err != nil && !xerrors.Is(err, io.EOF) {

View File

@ -346,7 +346,8 @@ CREATE TABLE provisioner_jobs (
worker_id uuid,
file_id uuid NOT NULL,
tags jsonb DEFAULT '{"scope": "organization"}'::jsonb NOT NULL,
error_code text
error_code text,
trace_metadata jsonb
);
CREATE TABLE replicas (

View File

@ -0,0 +1 @@
ALTER TABLE provisioner_jobs DROP COLUMN trace_metadata;

View File

@ -0,0 +1 @@
ALTER TABLE provisioner_jobs ADD COLUMN trace_metadata jsonb;

View File

@ -1380,6 +1380,7 @@ type ProvisionerJob struct {
FileID uuid.UUID `db:"file_id" json:"file_id"`
Tags dbtype.StringMap `db:"tags" json:"tags"`
ErrorCode sql.NullString `db:"error_code" json:"error_code"`
TraceMetadata pqtype.NullRawMessage `db:"trace_metadata" json:"trace_metadata"`
}
type ProvisionerJobLog struct {

View File

@ -2499,7 +2499,7 @@ WHERE
SKIP LOCKED
LIMIT
1
) RETURNING id, created_at, updated_at, started_at, canceled_at, completed_at, error, organization_id, initiator_id, provisioner, storage_method, type, input, worker_id, file_id, tags, error_code
) RETURNING id, created_at, updated_at, started_at, canceled_at, completed_at, error, organization_id, initiator_id, provisioner, storage_method, type, input, worker_id, file_id, tags, error_code, trace_metadata
`
type AcquireProvisionerJobParams struct {
@ -2541,13 +2541,14 @@ func (q *sqlQuerier) AcquireProvisionerJob(ctx context.Context, arg AcquireProvi
&i.FileID,
&i.Tags,
&i.ErrorCode,
&i.TraceMetadata,
)
return i, err
}
const getProvisionerJobByID = `-- name: GetProvisionerJobByID :one
SELECT
id, created_at, updated_at, started_at, canceled_at, completed_at, error, organization_id, initiator_id, provisioner, storage_method, type, input, worker_id, file_id, tags, error_code
id, created_at, updated_at, started_at, canceled_at, completed_at, error, organization_id, initiator_id, provisioner, storage_method, type, input, worker_id, file_id, tags, error_code, trace_metadata
FROM
provisioner_jobs
WHERE
@ -2575,13 +2576,14 @@ func (q *sqlQuerier) GetProvisionerJobByID(ctx context.Context, id uuid.UUID) (P
&i.FileID,
&i.Tags,
&i.ErrorCode,
&i.TraceMetadata,
)
return i, err
}
const getProvisionerJobsByIDs = `-- name: GetProvisionerJobsByIDs :many
SELECT
id, created_at, updated_at, started_at, canceled_at, completed_at, error, organization_id, initiator_id, provisioner, storage_method, type, input, worker_id, file_id, tags, error_code
id, created_at, updated_at, started_at, canceled_at, completed_at, error, organization_id, initiator_id, provisioner, storage_method, type, input, worker_id, file_id, tags, error_code, trace_metadata
FROM
provisioner_jobs
WHERE
@ -2615,6 +2617,7 @@ func (q *sqlQuerier) GetProvisionerJobsByIDs(ctx context.Context, ids []uuid.UUI
&i.FileID,
&i.Tags,
&i.ErrorCode,
&i.TraceMetadata,
); err != nil {
return nil, err
}
@ -2630,7 +2633,7 @@ func (q *sqlQuerier) GetProvisionerJobsByIDs(ctx context.Context, ids []uuid.UUI
}
const getProvisionerJobsCreatedAfter = `-- name: GetProvisionerJobsCreatedAfter :many
SELECT id, created_at, updated_at, started_at, canceled_at, completed_at, error, organization_id, initiator_id, provisioner, storage_method, type, input, worker_id, file_id, tags, error_code FROM provisioner_jobs WHERE created_at > $1
SELECT id, created_at, updated_at, started_at, canceled_at, completed_at, error, organization_id, initiator_id, provisioner, storage_method, type, input, worker_id, file_id, tags, error_code, trace_metadata FROM provisioner_jobs WHERE created_at > $1
`
func (q *sqlQuerier) GetProvisionerJobsCreatedAfter(ctx context.Context, createdAt time.Time) ([]ProvisionerJob, error) {
@ -2660,6 +2663,7 @@ func (q *sqlQuerier) GetProvisionerJobsCreatedAfter(ctx context.Context, created
&i.FileID,
&i.Tags,
&i.ErrorCode,
&i.TraceMetadata,
); err != nil {
return nil, err
}
@ -2687,10 +2691,11 @@ INSERT INTO
file_id,
"type",
"input",
tags
tags,
trace_metadata
)
VALUES
($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) RETURNING id, created_at, updated_at, started_at, canceled_at, completed_at, error, organization_id, initiator_id, provisioner, storage_method, type, input, worker_id, file_id, tags, error_code
($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) RETURNING id, created_at, updated_at, started_at, canceled_at, completed_at, error, organization_id, initiator_id, provisioner, storage_method, type, input, worker_id, file_id, tags, error_code, trace_metadata
`
type InsertProvisionerJobParams struct {
@ -2705,6 +2710,7 @@ type InsertProvisionerJobParams struct {
Type ProvisionerJobType `db:"type" json:"type"`
Input json.RawMessage `db:"input" json:"input"`
Tags dbtype.StringMap `db:"tags" json:"tags"`
TraceMetadata pqtype.NullRawMessage `db:"trace_metadata" json:"trace_metadata"`
}
func (q *sqlQuerier) InsertProvisionerJob(ctx context.Context, arg InsertProvisionerJobParams) (ProvisionerJob, error) {
@ -2720,6 +2726,7 @@ func (q *sqlQuerier) InsertProvisionerJob(ctx context.Context, arg InsertProvisi
arg.Type,
arg.Input,
arg.Tags,
arg.TraceMetadata,
)
var i ProvisionerJob
err := row.Scan(
@ -2740,6 +2747,7 @@ func (q *sqlQuerier) InsertProvisionerJob(ctx context.Context, arg InsertProvisi
&i.FileID,
&i.Tags,
&i.ErrorCode,
&i.TraceMetadata,
)
return i, err
}

View File

@ -63,10 +63,11 @@ INSERT INTO
file_id,
"type",
"input",
tags
tags,
trace_metadata
)
VALUES
($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) RETURNING *;
($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) RETURNING *;
-- name: UpdateProvisionerJobByID :exec
UPDATE

View File

@ -17,6 +17,8 @@ import (
"github.com/google/uuid"
"github.com/tabbed/pqtype"
semconv "go.opentelemetry.io/otel/semconv/v1.14.0"
"go.opentelemetry.io/otel/trace"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
"golang.org/x/oauth2"
@ -33,6 +35,7 @@ import (
"github.com/coder/coder/coderd/parameter"
"github.com/coder/coder/coderd/schedule"
"github.com/coder/coder/coderd/telemetry"
"github.com/coder/coder/coderd/tracing"
"github.com/coder/coder/codersdk"
"github.com/coder/coder/provisioner"
"github.com/coder/coder/provisionerd/proto"
@ -55,6 +58,7 @@ type Server struct {
Database database.Store
Pubsub database.Pubsub
Telemetry telemetry.Reporter
Tracer trace.Tracer
QuotaCommitter *atomic.Pointer[proto.QuotaCommitter]
Auditor *atomic.Pointer[audit.Auditor]
TemplateScheduleStore *atomic.Pointer[schedule.TemplateScheduleStore]
@ -129,12 +133,22 @@ func (server *Server) AcquireJob(ctx context.Context, _ *proto.Empty) (*proto.Ac
return nil, failJob(fmt.Sprintf("get user: %s", err))
}
protoJob := &proto.AcquiredJob{
JobId: job.ID.String(),
CreatedAt: job.CreatedAt.UnixMilli(),
Provisioner: string(job.Provisioner),
UserName: user.Username,
jobTraceMetadata := map[string]string{}
if job.TraceMetadata.Valid {
err := json.Unmarshal(job.TraceMetadata.RawMessage, &jobTraceMetadata)
if err != nil {
return nil, failJob(fmt.Sprintf("unmarshal metadata: %s", err))
}
}
protoJob := &proto.AcquiredJob{
JobId: job.ID.String(),
CreatedAt: job.CreatedAt.UnixMilli(),
Provisioner: string(job.Provisioner),
UserName: user.Username,
TraceMetadata: jobTraceMetadata,
}
switch job.Type {
case database.ProvisionerJobTypeWorkspaceBuild:
var input WorkspaceProvisionJob
@ -411,6 +425,9 @@ func (server *Server) includeLastVariableValues(ctx context.Context, templateVer
}
func (server *Server) CommitQuota(ctx context.Context, request *proto.CommitQuotaRequest) (*proto.CommitQuotaResponse, error) {
ctx, span := server.startTrace(ctx, tracing.FuncName())
defer span.End()
//nolint:gocritic // Provisionerd has specific authz rules.
ctx = dbauthz.AsProvisionerd(ctx)
jobID, err := uuid.Parse(request.JobId)
@ -442,6 +459,9 @@ func (server *Server) CommitQuota(ctx context.Context, request *proto.CommitQuot
}
func (server *Server) UpdateJob(ctx context.Context, request *proto.UpdateJobRequest) (*proto.UpdateJobResponse, error) {
ctx, span := server.startTrace(ctx, tracing.FuncName())
defer span.End()
//nolint:gocritic // Provisionerd has specific authz rules.
ctx = dbauthz.AsProvisionerd(ctx)
parsedID, err := uuid.Parse(request.JobId)
@ -671,6 +691,9 @@ func (server *Server) UpdateJob(ctx context.Context, request *proto.UpdateJobReq
}
func (server *Server) FailJob(ctx context.Context, failJob *proto.FailedJob) (*proto.Empty, error) {
ctx, span := server.startTrace(ctx, tracing.FuncName())
defer span.End()
//nolint:gocritic // Provisionerd has specific authz rules.
ctx = dbauthz.AsProvisionerd(ctx)
jobID, err := uuid.Parse(failJob.JobId)
@ -822,6 +845,9 @@ func (server *Server) FailJob(ctx context.Context, failJob *proto.FailedJob) (*p
//
//nolint:gocyclo
func (server *Server) CompleteJob(ctx context.Context, completed *proto.CompletedJob) (*proto.Empty, error) {
ctx, span := server.startTrace(ctx, tracing.FuncName())
defer span.End()
//nolint:gocritic // Provisionerd has specific authz rules.
ctx = dbauthz.AsProvisionerd(ctx)
jobID, err := uuid.Parse(completed.JobId)
@ -1192,6 +1218,12 @@ func (server *Server) CompleteJob(ctx context.Context, completed *proto.Complete
return &proto.Empty{}, nil
}
func (server *Server) startTrace(ctx context.Context, name string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
return server.Tracer.Start(ctx, name, append(opts, trace.WithAttributes(
semconv.ServiceNameKey.String("coderd.provisionerd"),
))...)
}
func InsertWorkspaceResource(ctx context.Context, db database.Store, jobID uuid.UUID, transition database.WorkspaceTransition, protoResource *sdkproto.Resource, snapshot *telemetry.Snapshot) error {
resource, err := db.InsertWorkspaceResource(ctx, database.InsertWorkspaceResourceParams{
ID: uuid.New(),

View File

@ -11,6 +11,7 @@ import (
"github.com/google/uuid"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/trace"
"golang.org/x/oauth2"
"cdr.dev/slog/sloggers/slogtest"
@ -59,6 +60,7 @@ func TestAcquireJob(t *testing.T) {
AcquireJobDebounce: time.Hour,
Auditor: mockAuditor(),
TemplateScheduleStore: testTemplateScheduleStore(),
Tracer: trace.NewNoopTracerProvider().Tracer("noop"),
}
job, err := srv.AcquireJob(context.Background(), nil)
require.NoError(t, err)
@ -1202,6 +1204,7 @@ func setup(t *testing.T, ignoreLogErrors bool) *provisionerdserver.Server {
Telemetry: telemetry.NewNoop(),
Auditor: mockAuditor(),
TemplateScheduleStore: testTemplateScheduleStore(),
Tracer: trace.NewNoopTracerProvider().Tracer("noop"),
}
}

View File

@ -13,6 +13,7 @@ import (
"github.com/go-chi/chi/v5"
"github.com/google/uuid"
"github.com/moby/moby/pkg/namesgenerator"
"github.com/tabbed/pqtype"
"golang.org/x/xerrors"
"cdr.dev/slog"
@ -25,6 +26,7 @@ import (
"github.com/coder/coder/coderd/parameter"
"github.com/coder/coder/coderd/provisionerdserver"
"github.com/coder/coder/coderd/rbac"
"github.com/coder/coder/coderd/tracing"
"github.com/coder/coder/codersdk"
"github.com/coder/coder/examples"
sdkproto "github.com/coder/coder/provisionersdk/proto"
@ -574,6 +576,15 @@ func (api *API) postTemplateVersionDryRun(rw http.ResponseWriter, r *http.Reques
return
}
metadataRaw, err := json.Marshal(tracing.MetadataFromContext(ctx))
if err != nil {
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error unmarshalling metadata.",
Detail: err.Error(),
})
return
}
// Create a dry-run job
jobID := uuid.New()
provisionerJob, err := api.Database.InsertProvisionerJob(ctx, database.InsertProvisionerJobParams{
@ -589,6 +600,10 @@ func (api *API) postTemplateVersionDryRun(rw http.ResponseWriter, r *http.Reques
Input: input,
// Copy tags from the previous run.
Tags: job.Tags,
TraceMetadata: pqtype.NullRawMessage{
Valid: true,
RawMessage: metadataRaw,
},
})
if err != nil {
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
@ -1408,6 +1423,10 @@ func (api *API) postTemplateVersionsByOrganization(rw http.ResponseWriter, r *ht
if err != nil {
return xerrors.Errorf("marshal job input: %w", err)
}
traceMetadataRaw, err := json.Marshal(tracing.MetadataFromContext(ctx))
if err != nil {
return xerrors.Errorf("marshal job metadata: %w", err)
}
provisionerJob, err = tx.InsertProvisionerJob(ctx, database.InsertProvisionerJobParams{
ID: jobID,
@ -1421,6 +1440,10 @@ func (api *API) postTemplateVersionsByOrganization(rw http.ResponseWriter, r *ht
Type: database.ProvisionerJobTypeTemplateVersionImport,
Input: jobInput,
Tags: tags,
TraceMetadata: pqtype.NullRawMessage{
Valid: true,
RawMessage: traceMetadataRaw,
},
})
if err != nil {
return xerrors.Errorf("insert provisioner job: %w", err)

54
coderd/tracing/drpc.go Normal file
View File

@ -0,0 +1,54 @@
package tracing
import (
"context"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
"storj.io/drpc"
"storj.io/drpc/drpcmetadata"
)
type DRPCHandler struct {
Handler drpc.Handler
}
func (t *DRPCHandler) HandleRPC(stream drpc.Stream, rpc string) error {
metadata, ok := drpcmetadata.Get(stream.Context())
if ok {
ctx := otel.GetTextMapPropagator().Extract(stream.Context(), propagation.MapCarrier(metadata))
stream = &drpcStreamWrapper{Stream: stream, ctx: ctx}
}
return t.Handler.HandleRPC(stream, rpc)
}
type drpcStreamWrapper struct {
drpc.Stream
ctx context.Context
}
func (s *drpcStreamWrapper) Context() context.Context { return s.ctx }
type DRPCConn struct {
drpc.Conn
}
// Invoke implements drpc.Conn's Invoke method with tracing information injected into the context.
func (c *DRPCConn) Invoke(ctx context.Context, rpc string, enc drpc.Encoding, in drpc.Message, out drpc.Message) (err error) {
return c.Conn.Invoke(c.addMetadata(ctx), rpc, enc, in, out)
}
// NewStream implements drpc.Conn's NewStream method with tracing information injected into the context.
func (c *DRPCConn) NewStream(ctx context.Context, rpc string, enc drpc.Encoding) (_ drpc.Stream, err error) {
return c.Conn.NewStream(c.addMetadata(ctx), rpc, enc)
}
// addMetadata propagates the headers into a map that we inject into drpc metadata so they are
// sent across the wire for the server to get.
func (*DRPCConn) addMetadata(ctx context.Context) context.Context {
metadata := make(map[string]string)
otel.GetTextMapPropagator().Inject(ctx, propagation.MapCarrier(metadata))
return drpcmetadata.AddPairs(ctx, metadata)
}

View File

@ -5,6 +5,8 @@ import (
"runtime"
"strings"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
)
@ -41,3 +43,13 @@ func RunWithoutSpan(ctx context.Context, fn func(ctx context.Context)) {
ctx = trace.ContextWithSpan(ctx, NoopSpan)
fn(ctx)
}
func MetadataFromContext(ctx context.Context) map[string]string {
metadata := make(map[string]string)
otel.GetTextMapPropagator().Inject(ctx, propagation.MapCarrier(metadata))
return metadata
}
func MetadataToContext(ctx context.Context, metadata map[string]string) context.Context {
return otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(metadata))
}

View File

@ -12,6 +12,7 @@ import (
"github.com/go-chi/chi/v5"
"github.com/google/uuid"
"github.com/tabbed/pqtype"
"golang.org/x/exp/slices"
"golang.org/x/xerrors"
@ -21,6 +22,7 @@ import (
"github.com/coder/coder/coderd/httpmw"
"github.com/coder/coder/coderd/provisionerdserver"
"github.com/coder/coder/coderd/rbac"
"github.com/coder/coder/coderd/tracing"
"github.com/coder/coder/codersdk"
)
@ -605,6 +607,11 @@ func (api *API) postWorkspaceBuilds(rw http.ResponseWriter, r *http.Request) {
if err != nil {
return xerrors.Errorf("marshal provision job: %w", err)
}
traceMetadataRaw, err := json.Marshal(tracing.MetadataFromContext(ctx))
if err != nil {
return xerrors.Errorf("marshal metadata: %w", err)
}
provisionerJob, err = db.InsertProvisionerJob(ctx, database.InsertProvisionerJobParams{
ID: uuid.New(),
CreatedAt: database.Now(),
@ -617,6 +624,10 @@ func (api *API) postWorkspaceBuilds(rw http.ResponseWriter, r *http.Request) {
FileID: templateVersionJob.FileID,
Input: input,
Tags: tags,
TraceMetadata: pqtype.NullRawMessage{
Valid: true,
RawMessage: traceMetadataRaw,
},
})
if err != nil {
return xerrors.Errorf("insert provisioner job: %w", err)

View File

@ -13,6 +13,7 @@ import (
"github.com/go-chi/chi/v5"
"github.com/google/uuid"
"github.com/tabbed/pqtype"
"golang.org/x/xerrors"
"cdr.dev/slog"
@ -25,6 +26,7 @@ import (
"github.com/coder/coder/coderd/schedule"
"github.com/coder/coder/coderd/searchquery"
"github.com/coder/coder/coderd/telemetry"
"github.com/coder/coder/coderd/tracing"
"github.com/coder/coder/coderd/util/ptr"
"github.com/coder/coder/codersdk"
)
@ -514,6 +516,10 @@ func (api *API) postWorkspacesByOrganization(rw http.ResponseWriter, r *http.Req
if err != nil {
return xerrors.Errorf("marshal provision job: %w", err)
}
traceMetadataRaw, err := json.Marshal(tracing.MetadataFromContext(ctx))
if err != nil {
return xerrors.Errorf("marshal metadata: %w", err)
}
provisionerJob, err = db.InsertProvisionerJob(ctx, database.InsertProvisionerJobParams{
ID: uuid.New(),
CreatedAt: now,
@ -526,6 +532,10 @@ func (api *API) postWorkspacesByOrganization(rw http.ResponseWriter, r *http.Req
FileID: templateVersionJob.FileID,
Input: input,
Tags: tags,
TraceMetadata: pqtype.NullRawMessage{
Valid: true,
RawMessage: traceMetadataRaw,
},
})
if err != nil {
return xerrors.Errorf("insert provisioner job: %w", err)