From 8c8344ca13c294566084ba6cc6bdcf8c36d72eef Mon Sep 17 00:00:00 2001 From: Colin Adler Date: Thu, 10 Nov 2022 13:26:57 -0600 Subject: [PATCH] fix: tolerate non-json lines in provisionerd logs (#5006) Co-authored-by: Dean Sheather --- provisioner/terraform/executor.go | 105 +++++++++++------- .../terraform/executor_internal_test.go | 31 +----- provisioner/terraform/provision.go | 23 ++-- 3 files changed, 81 insertions(+), 78 deletions(-) diff --git a/provisioner/terraform/executor.go b/provisioner/terraform/executor.go index b52b230459..db1dde3819 100644 --- a/provisioner/terraform/executor.go +++ b/provisioner/terraform/executor.go @@ -18,6 +18,7 @@ import ( tfjson "github.com/hashicorp/terraform-json" "golang.org/x/xerrors" + "cdr.dev/slog" "github.com/coder/coder/provisionersdk/proto" ) @@ -171,10 +172,12 @@ func versionFromBinaryPath(ctx context.Context, binaryPath string) (*version.Ver return version.NewVersion(vj.Version) } -func (e executor) init(ctx, killCtx context.Context, logr logger) error { +func (e executor) init(ctx, killCtx context.Context, logr logSink) error { outWriter, doneOut := logWriter(logr, proto.LogLevel_DEBUG) errWriter, doneErr := logWriter(logr, proto.LogLevel_ERROR) defer func() { + _ = outWriter.Close() + _ = errWriter.Close() <-doneOut <-doneErr }() @@ -201,7 +204,7 @@ func (e executor) init(ctx, killCtx context.Context, logr logger) error { } // revive:disable-next-line:flag-parameter -func (e executor) plan(ctx, killCtx context.Context, env, vars []string, logr logger, destroy bool) (*proto.Provision_Response, error) { +func (e executor) plan(ctx, killCtx context.Context, env, vars []string, logr logSink, destroy bool) (*proto.Provision_Response, error) { planfilePath := filepath.Join(e.workdir, "terraform.tfplan") args := []string{ "plan", @@ -221,6 +224,8 @@ func (e executor) plan(ctx, killCtx context.Context, env, vars []string, logr lo outWriter, doneOut := provisionLogWriter(logr) errWriter, doneErr := logWriter(logr, proto.LogLevel_ERROR) defer func() { + _ = outWriter.Close() + _ = errWriter.Close() <-doneOut <-doneErr }() @@ -287,7 +292,7 @@ func (e executor) graph(ctx, killCtx context.Context) (string, error) { } // revive:disable-next-line:flag-parameter -func (e executor) apply(ctx, killCtx context.Context, env, vars []string, logr logger, destroy bool, +func (e executor) apply(ctx, killCtx context.Context, env, vars []string, logr logSink, destroy bool, ) (*proto.Provision_Response, error) { args := []string{ "apply", @@ -307,6 +312,8 @@ func (e executor) apply(ctx, killCtx context.Context, env, vars []string, logr l outWriter, doneOut := provisionLogWriter(logr) errWriter, doneErr := logWriter(logr, proto.LogLevel_ERROR) defer func() { + _ = outWriter.Close() + _ = errWriter.Close() <-doneOut <-doneErr }() @@ -380,86 +387,104 @@ func interruptCommandOnCancel(ctx, killCtx context.Context, cmd *exec.Cmd) { }() } -type logger interface { - Log(*proto.Log) error +type logSink interface { + Log(*proto.Log) } -type streamLogger struct { +type streamLogSink struct { + // Any errors writing to the stream will be logged to logger. + logger slog.Logger stream proto.DRPCProvisioner_ProvisionStream } -func (s streamLogger) Log(l *proto.Log) error { - return s.stream.Send(&proto.Provision_Response{ +var _ logSink = streamLogSink{} + +func (s streamLogSink) Log(l *proto.Log) { + err := s.stream.Send(&proto.Provision_Response{ Type: &proto.Provision_Response_Log{ Log: l, }, }) + if err != nil { + s.logger.Warn(context.Background(), "write log to stream", + slog.F("level", l.Level.String()), + slog.F("message", l.Output), + slog.Error(err), + ) + } } // logWriter creates a WriteCloser that will log each line of text at the given level. The WriteCloser must be closed // by the caller to end logging, after which the returned channel will be closed to indicate that logging of the written // data has finished. Failure to close the WriteCloser will leak a goroutine. -func logWriter(logr logger, level proto.LogLevel) (io.WriteCloser, <-chan any) { +func logWriter(sink logSink, level proto.LogLevel) (io.WriteCloser, <-chan any) { r, w := io.Pipe() done := make(chan any) - go readAndLog(logr, r, done, level) + go readAndLog(sink, r, done, level) return w, done } -func readAndLog(logr logger, r io.Reader, done chan<- any, level proto.LogLevel) { +func readAndLog(sink logSink, r io.Reader, done chan<- any, level proto.LogLevel) { defer close(done) scanner := bufio.NewScanner(r) for scanner.Scan() { - err := logr.Log(&proto.Log{Level: level, Output: scanner.Text()}) - if err != nil { - // Not much we can do. We can't log because logging is itself breaking! - return - } + sink.Log(&proto.Log{Level: level, Output: scanner.Text()}) } } // provisionLogWriter creates a WriteCloser that will log each JSON formatted terraform log. The WriteCloser must be // closed by the caller to end logging, after which the returned channel will be closed to indicate that logging of the // written data has finished. Failure to close the WriteCloser will leak a goroutine. -func provisionLogWriter(logr logger) (io.WriteCloser, <-chan any) { +func provisionLogWriter(sink logSink) (io.WriteCloser, <-chan any) { r, w := io.Pipe() done := make(chan any) - go provisionReadAndLog(logr, r, done) + go provisionReadAndLog(sink, r, done) return w, done } -func provisionReadAndLog(logr logger, reader io.Reader, done chan<- any) { +func provisionReadAndLog(sink logSink, r io.Reader, done chan<- any) { defer close(done) - decoder := json.NewDecoder(reader) - for { + scanner := bufio.NewScanner(r) + for scanner.Scan() { var log terraformProvisionLog - err := decoder.Decode(&log) + err := json.Unmarshal(scanner.Bytes(), &log) if err != nil { - return - } - logLevel := convertTerraformLogLevel(log.Level, logr) - - err = logr.Log(&proto.Log{Level: logLevel, Output: log.Message}) - if err != nil { - // Not much we can do. We can't log because logging is itself breaking! - return + // Sometimes terraform doesn't log JSON, even though we asked it to. + // The terraform maintainers have said on the issue tracker that + // they don't guarantee that non-JSON lines won't get printed. + // https://github.com/hashicorp/terraform/issues/29252#issuecomment-887710001 + // + // > I think as a practical matter it isn't possible for us to + // > promise that the output will always be entirely JSON, because + // > there's plenty of code that runs before command line arguments + // > are parsed and thus before we even know we're in JSON mode. + // > Given that, I'd suggest writing code that consumes streaming + // > JSON output from Terraform in such a way that it can tolerate + // > the output not having JSON in it at all. + // + // Log lines such as: + // - Acquiring state lock. This may take a few moments... + // - Releasing state lock. This may take a few moments... + if strings.TrimSpace(scanner.Text()) == "" { + continue + } + log.Level = "info" + log.Message = scanner.Text() } + logLevel := convertTerraformLogLevel(log.Level, sink) + sink.Log(&proto.Log{Level: logLevel, Output: log.Message}) + + // If the diagnostic is provided, let's provide a bit more info! if log.Diagnostic == nil { continue } - - // If the diagnostic is provided, let's provide a bit more info! - logLevel = convertTerraformLogLevel(log.Diagnostic.Severity, logr) - err = logr.Log(&proto.Log{Level: logLevel, Output: log.Diagnostic.Detail}) - if err != nil { - // Not much we can do. We can't log because logging is itself breaking! - return - } + logLevel = convertTerraformLogLevel(log.Diagnostic.Severity, sink) + sink.Log(&proto.Log{Level: logLevel, Output: log.Diagnostic.Detail}) } } -func convertTerraformLogLevel(logLevel string, logr logger) proto.LogLevel { +func convertTerraformLogLevel(logLevel string, sink logSink) proto.LogLevel { switch strings.ToLower(logLevel) { case "trace": return proto.LogLevel_TRACE @@ -472,7 +497,7 @@ func convertTerraformLogLevel(logLevel string, logr logger) proto.LogLevel { case "error": return proto.LogLevel_ERROR default: - _ = logr.Log(&proto.Log{ + sink.Log(&proto.Log{ Level: proto.LogLevel_WARN, Output: fmt.Sprintf("unable to convert log level %s", logLevel), }) diff --git a/provisioner/terraform/executor_internal_test.go b/provisioner/terraform/executor_internal_test.go index e23a13e354..7c9d6d6f08 100644 --- a/provisioner/terraform/executor_internal_test.go +++ b/provisioner/terraform/executor_internal_test.go @@ -4,25 +4,24 @@ import ( "testing" "github.com/stretchr/testify/require" - "golang.org/x/xerrors" "github.com/coder/coder/provisionersdk/proto" ) type mockLogger struct { - logs []*proto.Log - retVal error + logs []*proto.Log } -func (m *mockLogger) Log(l *proto.Log) error { +var _ logSink = &mockLogger{} + +func (m *mockLogger) Log(l *proto.Log) { m.logs = append(m.logs, l) - return m.retVal } func TestLogWriter_Mainline(t *testing.T) { t.Parallel() - logr := &mockLogger{retVal: nil} + logr := &mockLogger{} writer, doneLogging := logWriter(logr, proto.LogLevel_INFO) _, err := writer.Write([]byte(`Sitting in an English garden @@ -40,23 +39,5 @@ From standing in the English rain`)) {Level: proto.LogLevel_INFO, Output: "If the sun don't come you get a tan"}, {Level: proto.LogLevel_INFO, Output: "From standing in the English rain"}, } - require.Equal(t, logr.logs, expected) -} - -func TestLogWriter_SendError(t *testing.T) { - t.Parallel() - - logr := &mockLogger{retVal: xerrors.New("Goo goo g'joob")} - writer, doneLogging := logWriter(logr, proto.LogLevel_INFO) - - _, err := writer.Write([]byte(`Sitting in an English garden -Waiting for the sun -If the sun don't come you get a tan -From standing in the English rain`)) - require.NoError(t, err) - err = writer.Close() - require.NoError(t, err) - <-doneLogging - expected := []*proto.Log{{Level: proto.LogLevel_INFO, Output: "Sitting in an English garden"}} - require.Equal(t, logr.logs, expected) + require.Equal(t, expected, logr.logs) } diff --git a/provisioner/terraform/provision.go b/provisioner/terraform/provision.go index fd61d96773..713f8766d1 100644 --- a/provisioner/terraform/provision.go +++ b/provisioner/terraform/provision.go @@ -69,16 +69,17 @@ func (s *server) Provision(stream proto.DRPCProvisioner_ProvisionStream) error { } }() - logr := streamLogger{stream: stream} + sink := streamLogSink{ + logger: s.logger.Named("execution_logs"), + stream: stream, + } start := request.GetStart() e := s.executor(start.Directory) if err = e.checkMinVersion(ctx); err != nil { return err } - if err = logTerraformEnvVars(logr); err != nil { - return err - } + logTerraformEnvVars(sink) statefilePath := filepath.Join(start.Directory, "terraform.tfstate") if len(start.State) > 0 { @@ -111,7 +112,7 @@ func (s *server) Provision(stream proto.DRPCProvisioner_ProvisionStream) error { } s.logger.Debug(ctx, "running initialization") - err = e.init(ctx, killCtx, logr) + err = e.init(ctx, killCtx, sink) if err != nil { if ctx.Err() != nil { return stream.Send(&proto.Provision_Response{ @@ -136,10 +137,10 @@ func (s *server) Provision(stream proto.DRPCProvisioner_ProvisionStream) error { } var resp *proto.Provision_Response if start.DryRun { - resp, err = e.plan(ctx, killCtx, env, vars, logr, + resp, err = e.plan(ctx, killCtx, env, vars, sink, start.Metadata.WorkspaceTransition == proto.WorkspaceTransition_DESTROY) } else { - resp, err = e.apply(ctx, killCtx, env, vars, logr, + resp, err = e.apply(ctx, killCtx, env, vars, sink, start.Metadata.WorkspaceTransition == proto.WorkspaceTransition_DESTROY) } if err != nil { @@ -231,7 +232,7 @@ var ( } ) -func logTerraformEnvVars(logr logger) error { +func logTerraformEnvVars(sink logSink) { env := safeEnviron() for _, e := range env { if strings.HasPrefix(e, "TF_") { @@ -242,14 +243,10 @@ func logTerraformEnvVars(logr logger) error { if !tfEnvSafeToPrint[parts[0]] { parts[1] = "" } - err := logr.Log(&proto.Log{ + sink.Log(&proto.Log{ Level: proto.LogLevel_WARN, Output: fmt.Sprintf("terraform environment variable: %s=%s", parts[0], parts[1]), }) - if err != nil { - return err - } } } - return nil }