fix: tolerate non-json lines in provisionerd logs (#5006)

Co-authored-by: Dean Sheather <dean@deansheather.com>
This commit is contained in:
Colin Adler
2022-11-10 13:26:57 -06:00
committed by GitHub
parent a25deb939b
commit 8c8344ca13
3 changed files with 81 additions and 78 deletions

View File

@ -18,6 +18,7 @@ import (
tfjson "github.com/hashicorp/terraform-json" tfjson "github.com/hashicorp/terraform-json"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"cdr.dev/slog"
"github.com/coder/coder/provisionersdk/proto" "github.com/coder/coder/provisionersdk/proto"
) )
@ -171,10 +172,12 @@ func versionFromBinaryPath(ctx context.Context, binaryPath string) (*version.Ver
return version.NewVersion(vj.Version) return version.NewVersion(vj.Version)
} }
func (e executor) init(ctx, killCtx context.Context, logr logger) error { func (e executor) init(ctx, killCtx context.Context, logr logSink) error {
outWriter, doneOut := logWriter(logr, proto.LogLevel_DEBUG) outWriter, doneOut := logWriter(logr, proto.LogLevel_DEBUG)
errWriter, doneErr := logWriter(logr, proto.LogLevel_ERROR) errWriter, doneErr := logWriter(logr, proto.LogLevel_ERROR)
defer func() { defer func() {
_ = outWriter.Close()
_ = errWriter.Close()
<-doneOut <-doneOut
<-doneErr <-doneErr
}() }()
@ -201,7 +204,7 @@ func (e executor) init(ctx, killCtx context.Context, logr logger) error {
} }
// revive:disable-next-line:flag-parameter // revive:disable-next-line:flag-parameter
func (e executor) plan(ctx, killCtx context.Context, env, vars []string, logr logger, destroy bool) (*proto.Provision_Response, error) { func (e executor) plan(ctx, killCtx context.Context, env, vars []string, logr logSink, destroy bool) (*proto.Provision_Response, error) {
planfilePath := filepath.Join(e.workdir, "terraform.tfplan") planfilePath := filepath.Join(e.workdir, "terraform.tfplan")
args := []string{ args := []string{
"plan", "plan",
@ -221,6 +224,8 @@ func (e executor) plan(ctx, killCtx context.Context, env, vars []string, logr lo
outWriter, doneOut := provisionLogWriter(logr) outWriter, doneOut := provisionLogWriter(logr)
errWriter, doneErr := logWriter(logr, proto.LogLevel_ERROR) errWriter, doneErr := logWriter(logr, proto.LogLevel_ERROR)
defer func() { defer func() {
_ = outWriter.Close()
_ = errWriter.Close()
<-doneOut <-doneOut
<-doneErr <-doneErr
}() }()
@ -287,7 +292,7 @@ func (e executor) graph(ctx, killCtx context.Context) (string, error) {
} }
// revive:disable-next-line:flag-parameter // revive:disable-next-line:flag-parameter
func (e executor) apply(ctx, killCtx context.Context, env, vars []string, logr logger, destroy bool, func (e executor) apply(ctx, killCtx context.Context, env, vars []string, logr logSink, destroy bool,
) (*proto.Provision_Response, error) { ) (*proto.Provision_Response, error) {
args := []string{ args := []string{
"apply", "apply",
@ -307,6 +312,8 @@ func (e executor) apply(ctx, killCtx context.Context, env, vars []string, logr l
outWriter, doneOut := provisionLogWriter(logr) outWriter, doneOut := provisionLogWriter(logr)
errWriter, doneErr := logWriter(logr, proto.LogLevel_ERROR) errWriter, doneErr := logWriter(logr, proto.LogLevel_ERROR)
defer func() { defer func() {
_ = outWriter.Close()
_ = errWriter.Close()
<-doneOut <-doneOut
<-doneErr <-doneErr
}() }()
@ -380,86 +387,104 @@ func interruptCommandOnCancel(ctx, killCtx context.Context, cmd *exec.Cmd) {
}() }()
} }
type logger interface { type logSink interface {
Log(*proto.Log) error Log(*proto.Log)
} }
type streamLogger struct { type streamLogSink struct {
// Any errors writing to the stream will be logged to logger.
logger slog.Logger
stream proto.DRPCProvisioner_ProvisionStream stream proto.DRPCProvisioner_ProvisionStream
} }
func (s streamLogger) Log(l *proto.Log) error { var _ logSink = streamLogSink{}
return s.stream.Send(&proto.Provision_Response{
func (s streamLogSink) Log(l *proto.Log) {
err := s.stream.Send(&proto.Provision_Response{
Type: &proto.Provision_Response_Log{ Type: &proto.Provision_Response_Log{
Log: l, Log: l,
}, },
}) })
if err != nil {
s.logger.Warn(context.Background(), "write log to stream",
slog.F("level", l.Level.String()),
slog.F("message", l.Output),
slog.Error(err),
)
}
} }
// logWriter creates a WriteCloser that will log each line of text at the given level. The WriteCloser must be closed // logWriter creates a WriteCloser that will log each line of text at the given level. The WriteCloser must be closed
// by the caller to end logging, after which the returned channel will be closed to indicate that logging of the written // by the caller to end logging, after which the returned channel will be closed to indicate that logging of the written
// data has finished. Failure to close the WriteCloser will leak a goroutine. // data has finished. Failure to close the WriteCloser will leak a goroutine.
func logWriter(logr logger, level proto.LogLevel) (io.WriteCloser, <-chan any) { func logWriter(sink logSink, level proto.LogLevel) (io.WriteCloser, <-chan any) {
r, w := io.Pipe() r, w := io.Pipe()
done := make(chan any) done := make(chan any)
go readAndLog(logr, r, done, level) go readAndLog(sink, r, done, level)
return w, done return w, done
} }
func readAndLog(logr logger, r io.Reader, done chan<- any, level proto.LogLevel) { func readAndLog(sink logSink, r io.Reader, done chan<- any, level proto.LogLevel) {
defer close(done) defer close(done)
scanner := bufio.NewScanner(r) scanner := bufio.NewScanner(r)
for scanner.Scan() { for scanner.Scan() {
err := logr.Log(&proto.Log{Level: level, Output: scanner.Text()}) sink.Log(&proto.Log{Level: level, Output: scanner.Text()})
if err != nil {
// Not much we can do. We can't log because logging is itself breaking!
return
}
} }
} }
// provisionLogWriter creates a WriteCloser that will log each JSON formatted terraform log. The WriteCloser must be // provisionLogWriter creates a WriteCloser that will log each JSON formatted terraform log. The WriteCloser must be
// closed by the caller to end logging, after which the returned channel will be closed to indicate that logging of the // closed by the caller to end logging, after which the returned channel will be closed to indicate that logging of the
// written data has finished. Failure to close the WriteCloser will leak a goroutine. // written data has finished. Failure to close the WriteCloser will leak a goroutine.
func provisionLogWriter(logr logger) (io.WriteCloser, <-chan any) { func provisionLogWriter(sink logSink) (io.WriteCloser, <-chan any) {
r, w := io.Pipe() r, w := io.Pipe()
done := make(chan any) done := make(chan any)
go provisionReadAndLog(logr, r, done) go provisionReadAndLog(sink, r, done)
return w, done return w, done
} }
func provisionReadAndLog(logr logger, reader io.Reader, done chan<- any) { func provisionReadAndLog(sink logSink, r io.Reader, done chan<- any) {
defer close(done) defer close(done)
decoder := json.NewDecoder(reader) scanner := bufio.NewScanner(r)
for { for scanner.Scan() {
var log terraformProvisionLog var log terraformProvisionLog
err := decoder.Decode(&log) err := json.Unmarshal(scanner.Bytes(), &log)
if err != nil { if err != nil {
return // Sometimes terraform doesn't log JSON, even though we asked it to.
// The terraform maintainers have said on the issue tracker that
// they don't guarantee that non-JSON lines won't get printed.
// https://github.com/hashicorp/terraform/issues/29252#issuecomment-887710001
//
// > I think as a practical matter it isn't possible for us to
// > promise that the output will always be entirely JSON, because
// > there's plenty of code that runs before command line arguments
// > are parsed and thus before we even know we're in JSON mode.
// > Given that, I'd suggest writing code that consumes streaming
// > JSON output from Terraform in such a way that it can tolerate
// > the output not having JSON in it at all.
//
// Log lines such as:
// - Acquiring state lock. This may take a few moments...
// - Releasing state lock. This may take a few moments...
if strings.TrimSpace(scanner.Text()) == "" {
continue
} }
logLevel := convertTerraformLogLevel(log.Level, logr) log.Level = "info"
log.Message = scanner.Text()
err = logr.Log(&proto.Log{Level: logLevel, Output: log.Message})
if err != nil {
// Not much we can do. We can't log because logging is itself breaking!
return
} }
logLevel := convertTerraformLogLevel(log.Level, sink)
sink.Log(&proto.Log{Level: logLevel, Output: log.Message})
// If the diagnostic is provided, let's provide a bit more info!
if log.Diagnostic == nil { if log.Diagnostic == nil {
continue continue
} }
logLevel = convertTerraformLogLevel(log.Diagnostic.Severity, sink)
// If the diagnostic is provided, let's provide a bit more info! sink.Log(&proto.Log{Level: logLevel, Output: log.Diagnostic.Detail})
logLevel = convertTerraformLogLevel(log.Diagnostic.Severity, logr)
err = logr.Log(&proto.Log{Level: logLevel, Output: log.Diagnostic.Detail})
if err != nil {
// Not much we can do. We can't log because logging is itself breaking!
return
}
} }
} }
func convertTerraformLogLevel(logLevel string, logr logger) proto.LogLevel { func convertTerraformLogLevel(logLevel string, sink logSink) proto.LogLevel {
switch strings.ToLower(logLevel) { switch strings.ToLower(logLevel) {
case "trace": case "trace":
return proto.LogLevel_TRACE return proto.LogLevel_TRACE
@ -472,7 +497,7 @@ func convertTerraformLogLevel(logLevel string, logr logger) proto.LogLevel {
case "error": case "error":
return proto.LogLevel_ERROR return proto.LogLevel_ERROR
default: default:
_ = logr.Log(&proto.Log{ sink.Log(&proto.Log{
Level: proto.LogLevel_WARN, Level: proto.LogLevel_WARN,
Output: fmt.Sprintf("unable to convert log level %s", logLevel), Output: fmt.Sprintf("unable to convert log level %s", logLevel),
}) })

View File

@ -4,25 +4,24 @@ import (
"testing" "testing"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"golang.org/x/xerrors"
"github.com/coder/coder/provisionersdk/proto" "github.com/coder/coder/provisionersdk/proto"
) )
type mockLogger struct { type mockLogger struct {
logs []*proto.Log logs []*proto.Log
retVal error
} }
func (m *mockLogger) Log(l *proto.Log) error { var _ logSink = &mockLogger{}
func (m *mockLogger) Log(l *proto.Log) {
m.logs = append(m.logs, l) m.logs = append(m.logs, l)
return m.retVal
} }
func TestLogWriter_Mainline(t *testing.T) { func TestLogWriter_Mainline(t *testing.T) {
t.Parallel() t.Parallel()
logr := &mockLogger{retVal: nil} logr := &mockLogger{}
writer, doneLogging := logWriter(logr, proto.LogLevel_INFO) writer, doneLogging := logWriter(logr, proto.LogLevel_INFO)
_, err := writer.Write([]byte(`Sitting in an English garden _, err := writer.Write([]byte(`Sitting in an English garden
@ -40,23 +39,5 @@ From standing in the English rain`))
{Level: proto.LogLevel_INFO, Output: "If the sun don't come you get a tan"}, {Level: proto.LogLevel_INFO, Output: "If the sun don't come you get a tan"},
{Level: proto.LogLevel_INFO, Output: "From standing in the English rain"}, {Level: proto.LogLevel_INFO, Output: "From standing in the English rain"},
} }
require.Equal(t, logr.logs, expected) require.Equal(t, expected, logr.logs)
}
func TestLogWriter_SendError(t *testing.T) {
t.Parallel()
logr := &mockLogger{retVal: xerrors.New("Goo goo g'joob")}
writer, doneLogging := logWriter(logr, proto.LogLevel_INFO)
_, err := writer.Write([]byte(`Sitting in an English garden
Waiting for the sun
If the sun don't come you get a tan
From standing in the English rain`))
require.NoError(t, err)
err = writer.Close()
require.NoError(t, err)
<-doneLogging
expected := []*proto.Log{{Level: proto.LogLevel_INFO, Output: "Sitting in an English garden"}}
require.Equal(t, logr.logs, expected)
} }

View File

@ -69,16 +69,17 @@ func (s *server) Provision(stream proto.DRPCProvisioner_ProvisionStream) error {
} }
}() }()
logr := streamLogger{stream: stream} sink := streamLogSink{
logger: s.logger.Named("execution_logs"),
stream: stream,
}
start := request.GetStart() start := request.GetStart()
e := s.executor(start.Directory) e := s.executor(start.Directory)
if err = e.checkMinVersion(ctx); err != nil { if err = e.checkMinVersion(ctx); err != nil {
return err return err
} }
if err = logTerraformEnvVars(logr); err != nil { logTerraformEnvVars(sink)
return err
}
statefilePath := filepath.Join(start.Directory, "terraform.tfstate") statefilePath := filepath.Join(start.Directory, "terraform.tfstate")
if len(start.State) > 0 { if len(start.State) > 0 {
@ -111,7 +112,7 @@ func (s *server) Provision(stream proto.DRPCProvisioner_ProvisionStream) error {
} }
s.logger.Debug(ctx, "running initialization") s.logger.Debug(ctx, "running initialization")
err = e.init(ctx, killCtx, logr) err = e.init(ctx, killCtx, sink)
if err != nil { if err != nil {
if ctx.Err() != nil { if ctx.Err() != nil {
return stream.Send(&proto.Provision_Response{ return stream.Send(&proto.Provision_Response{
@ -136,10 +137,10 @@ func (s *server) Provision(stream proto.DRPCProvisioner_ProvisionStream) error {
} }
var resp *proto.Provision_Response var resp *proto.Provision_Response
if start.DryRun { if start.DryRun {
resp, err = e.plan(ctx, killCtx, env, vars, logr, resp, err = e.plan(ctx, killCtx, env, vars, sink,
start.Metadata.WorkspaceTransition == proto.WorkspaceTransition_DESTROY) start.Metadata.WorkspaceTransition == proto.WorkspaceTransition_DESTROY)
} else { } else {
resp, err = e.apply(ctx, killCtx, env, vars, logr, resp, err = e.apply(ctx, killCtx, env, vars, sink,
start.Metadata.WorkspaceTransition == proto.WorkspaceTransition_DESTROY) start.Metadata.WorkspaceTransition == proto.WorkspaceTransition_DESTROY)
} }
if err != nil { if err != nil {
@ -231,7 +232,7 @@ var (
} }
) )
func logTerraformEnvVars(logr logger) error { func logTerraformEnvVars(sink logSink) {
env := safeEnviron() env := safeEnviron()
for _, e := range env { for _, e := range env {
if strings.HasPrefix(e, "TF_") { if strings.HasPrefix(e, "TF_") {
@ -242,14 +243,10 @@ func logTerraformEnvVars(logr logger) error {
if !tfEnvSafeToPrint[parts[0]] { if !tfEnvSafeToPrint[parts[0]] {
parts[1] = "<value redacted>" parts[1] = "<value redacted>"
} }
err := logr.Log(&proto.Log{ sink.Log(&proto.Log{
Level: proto.LogLevel_WARN, Level: proto.LogLevel_WARN,
Output: fmt.Sprintf("terraform environment variable: %s=%s", parts[0], parts[1]), Output: fmt.Sprintf("terraform environment variable: %s=%s", parts[0], parts[1]),
}) })
if err != nil {
return err
} }
} }
}
return nil
} }