feat: Add graceful exits to provisionerd (#372)

* ci: Update DataDog GitHub branch to fallback to GITHUB_REF

This was detecting branches, but not our "main" branch before.
Hopefully this fixes it!

* Add basic Terraform Provider

* Rename post files to upload

* Add tests for resources

* Skip instance identity test

* Add tests for ensuring agent get's passed through properly

* Fix linting errors

* Add echo path

* Fix agent authentication

* fix: Convert all jobs to use a common resource and agent type

This enables a consistent API for project import and provisioned resources.

* Add "coder_workspace" data source

* feat: Remove magical parameters from being injected

This is a much cleaner abstraction. Explicitly declaring the user
parameters for each provisioner makes for significantly simpler
testing.

* feat: Add graceful exits to provisionerd

Terraform (or other provisioners) may need to cleanup state, or
cancel actions before exit. This adds the ability to gracefully
exit provisionerd.

* Fix cancel error check
This commit is contained in:
Kyle Carberry
2022-02-28 12:40:49 -06:00
committed by GitHub
parent e5c95552cd
commit 9d2803e07a
13 changed files with 1091 additions and 537 deletions

View File

@ -103,7 +103,7 @@ func TestProvisionerd(t *testing.T) {
}, nil
},
updateJob: noopUpdateJob,
cancelJob: func(ctx context.Context, job *proto.CancelledJob) (*proto.Empty, error) {
failJob: func(ctx context.Context, job *proto.FailedJob) (*proto.Empty, error) {
close(completeChan)
return &proto.Empty{}, nil
},
@ -144,7 +144,7 @@ func TestProvisionerd(t *testing.T) {
}, nil
},
updateJob: noopUpdateJob,
cancelJob: func(ctx context.Context, job *proto.CancelledJob) (*proto.Empty, error) {
failJob: func(ctx context.Context, job *proto.FailedJob) (*proto.Empty, error) {
close(completeChan)
return &proto.Empty{}, nil
},
@ -179,7 +179,7 @@ func TestProvisionerd(t *testing.T) {
close(completeChan)
return &proto.UpdateJobResponse{}, nil
},
cancelJob: func(ctx context.Context, job *proto.CancelledJob) (*proto.Empty, error) {
failJob: func(ctx context.Context, job *proto.FailedJob) (*proto.Empty, error) {
return &proto.Empty{}, nil
},
}), nil
@ -362,6 +362,127 @@ func TestProvisionerd(t *testing.T) {
require.True(t, didComplete.Load())
require.NoError(t, closer.Close())
})
t.Run("WorkspaceProvisionFailComplete", func(t *testing.T) {
t.Parallel()
var (
didFail atomic.Bool
didAcquireJob atomic.Bool
)
completeChan := make(chan struct{})
closer := createProvisionerd(t, func(ctx context.Context) (proto.DRPCProvisionerDaemonClient, error) {
return createProvisionerDaemonClient(t, provisionerDaemonTestServer{
acquireJob: func(ctx context.Context, _ *proto.Empty) (*proto.AcquiredJob, error) {
if didAcquireJob.Load() {
close(completeChan)
return &proto.AcquiredJob{}, nil
}
didAcquireJob.Store(true)
return &proto.AcquiredJob{
JobId: "test",
Provisioner: "someprovisioner",
ProjectSourceArchive: createTar(t, map[string]string{
"test.txt": "content",
}),
Type: &proto.AcquiredJob_WorkspaceProvision_{
WorkspaceProvision: &proto.AcquiredJob_WorkspaceProvision{
Metadata: &sdkproto.Provision_Metadata{},
},
},
}, nil
},
failJob: func(ctx context.Context, job *proto.FailedJob) (*proto.Empty, error) {
didFail.Store(true)
return &proto.Empty{}, nil
},
}), nil
}, provisionerd.Provisioners{
"someprovisioner": createProvisionerClient(t, provisionerTestServer{
provision: func(request *sdkproto.Provision_Request, stream sdkproto.DRPCProvisioner_ProvisionStream) error {
return stream.Send(&sdkproto.Provision_Response{
Type: &sdkproto.Provision_Response_Complete{
Complete: &sdkproto.Provision_Complete{
Error: "some error",
},
},
})
},
}),
})
<-completeChan
require.True(t, didFail.Load())
require.NoError(t, closer.Close())
})
t.Run("Shutdown", func(t *testing.T) {
t.Parallel()
updateChan := make(chan struct{})
completeChan := make(chan struct{})
shutdownCtx, shutdownCtxCancel := context.WithCancel(context.Background())
defer shutdownCtxCancel()
server := createProvisionerd(t, func(ctx context.Context) (proto.DRPCProvisionerDaemonClient, error) {
return createProvisionerDaemonClient(t, provisionerDaemonTestServer{
acquireJob: func(ctx context.Context, _ *proto.Empty) (*proto.AcquiredJob, error) {
return &proto.AcquiredJob{
JobId: "test",
Provisioner: "someprovisioner",
ProjectSourceArchive: createTar(t, map[string]string{
"test.txt": "content",
}),
Type: &proto.AcquiredJob_WorkspaceProvision_{
WorkspaceProvision: &proto.AcquiredJob_WorkspaceProvision{
Metadata: &sdkproto.Provision_Metadata{},
},
},
}, nil
},
updateJob: func(ctx context.Context, update *proto.UpdateJobRequest) (*proto.UpdateJobResponse, error) {
if len(update.Logs) > 0 {
// Close on a log so we know when the job is in progress!
close(updateChan)
}
return &proto.UpdateJobResponse{}, nil
},
failJob: func(ctx context.Context, job *proto.FailedJob) (*proto.Empty, error) {
close(completeChan)
return &proto.Empty{}, nil
},
}), nil
}, provisionerd.Provisioners{
"someprovisioner": createProvisionerClient(t, provisionerTestServer{
shutdown: func(_ context.Context, _ *sdkproto.Empty) (*sdkproto.Empty, error) {
shutdownCtxCancel()
return &sdkproto.Empty{}, nil
},
provision: func(request *sdkproto.Provision_Request, stream sdkproto.DRPCProvisioner_ProvisionStream) error {
err := stream.Send(&sdkproto.Provision_Response{
Type: &sdkproto.Provision_Response_Log{
Log: &sdkproto.Log{
Level: sdkproto.LogLevel_DEBUG,
Output: "in progress",
},
},
})
require.NoError(t, err)
<-shutdownCtx.Done()
err = stream.Send(&sdkproto.Provision_Response{
Type: &sdkproto.Provision_Response_Complete{
Complete: &sdkproto.Provision_Complete{
Error: "some error",
},
},
})
require.NoError(t, err)
return nil
},
}),
})
<-updateChan
err := server.Shutdown(context.Background())
require.NoError(t, err)
<-completeChan
require.NoError(t, server.Close())
})
}
// Creates an in-memory tar of the files provided.
@ -385,8 +506,8 @@ func createTar(t *testing.T, files map[string]string) []byte {
}
// Creates a provisionerd implementation with the provided dialer and provisioners.
func createProvisionerd(t *testing.T, dialer provisionerd.Dialer, provisioners provisionerd.Provisioners) io.Closer {
closer := provisionerd.New(dialer, &provisionerd.Options{
func createProvisionerd(t *testing.T, dialer provisionerd.Dialer, provisioners provisionerd.Provisioners) *provisionerd.Server {
server := provisionerd.New(dialer, &provisionerd.Options{
Logger: slogtest.Make(t, nil).Named("provisionerd").Leveled(slog.LevelDebug),
PollInterval: 50 * time.Millisecond,
UpdateInterval: 50 * time.Millisecond,
@ -394,9 +515,9 @@ func createProvisionerd(t *testing.T, dialer provisionerd.Dialer, provisioners p
WorkDirectory: t.TempDir(),
})
t.Cleanup(func() {
_ = closer.Close()
_ = server.Close()
})
return closer
return server
}
// Creates a provisionerd protobuf client that's connected
@ -440,10 +561,15 @@ func createProvisionerClient(t *testing.T, server provisionerTestServer) sdkprot
}
type provisionerTestServer struct {
shutdown func(_ context.Context, _ *sdkproto.Empty) (*sdkproto.Empty, error)
parse func(request *sdkproto.Parse_Request, stream sdkproto.DRPCProvisioner_ParseStream) error
provision func(request *sdkproto.Provision_Request, stream sdkproto.DRPCProvisioner_ProvisionStream) error
}
func (p *provisionerTestServer) Shutdown(ctx context.Context, empty *sdkproto.Empty) (*sdkproto.Empty, error) {
return p.shutdown(ctx, empty)
}
func (p *provisionerTestServer) Parse(request *sdkproto.Parse_Request, stream sdkproto.DRPCProvisioner_ParseStream) error {
return p.parse(request, stream)
}
@ -457,7 +583,7 @@ func (p *provisionerTestServer) Provision(request *sdkproto.Provision_Request, s
type provisionerDaemonTestServer struct {
acquireJob func(ctx context.Context, _ *proto.Empty) (*proto.AcquiredJob, error)
updateJob func(ctx context.Context, update *proto.UpdateJobRequest) (*proto.UpdateJobResponse, error)
cancelJob func(ctx context.Context, job *proto.CancelledJob) (*proto.Empty, error)
failJob func(ctx context.Context, job *proto.FailedJob) (*proto.Empty, error)
completeJob func(ctx context.Context, job *proto.CompletedJob) (*proto.Empty, error)
}
@ -469,8 +595,8 @@ func (p *provisionerDaemonTestServer) UpdateJob(ctx context.Context, update *pro
return p.updateJob(ctx, update)
}
func (p *provisionerDaemonTestServer) CancelJob(ctx context.Context, job *proto.CancelledJob) (*proto.Empty, error) {
return p.cancelJob(ctx, job)
func (p *provisionerDaemonTestServer) FailJob(ctx context.Context, job *proto.FailedJob) (*proto.Empty, error) {
return p.failJob(ctx, job)
}
func (p *provisionerDaemonTestServer) CompleteJob(ctx context.Context, job *proto.CompletedJob) (*proto.Empty, error) {