feat: use agent v2 API to update app health (#11889)

Use the Agent v2 API to update App Health
This commit is contained in:
Spike Curtis
2024-01-30 11:35:12 +04:00
committed by GitHub
parent 2599850e54
commit 0fc177203e
5 changed files with 91 additions and 51 deletions

View File

@ -91,7 +91,6 @@ type Client interface {
Listen(ctx context.Context) (drpc.Conn, error) Listen(ctx context.Context) (drpc.Conn, error)
ReportStats(ctx context.Context, log slog.Logger, statsChan <-chan *agentsdk.Stats, setInterval func(time.Duration)) (io.Closer, error) ReportStats(ctx context.Context, log slog.Logger, statsChan <-chan *agentsdk.Stats, setInterval func(time.Duration)) (io.Closer, error)
PostLifecycle(ctx context.Context, state agentsdk.PostLifecycleRequest) error PostLifecycle(ctx context.Context, state agentsdk.PostLifecycleRequest) error
PostAppHealth(ctx context.Context, req agentsdk.PostAppHealthsRequest) error
PostMetadata(ctx context.Context, req agentsdk.PostMetadataRequest) error PostMetadata(ctx context.Context, req agentsdk.PostMetadataRequest) error
PatchLogs(ctx context.Context, req agentsdk.PatchLogs) error PatchLogs(ctx context.Context, req agentsdk.PatchLogs) error
RewriteDERPMap(derpMap *tailcfg.DERPMap) RewriteDERPMap(derpMap *tailcfg.DERPMap)
@ -808,7 +807,7 @@ func (a *agent) run(ctx context.Context) error {
appReporterCtx, appReporterCtxCancel := context.WithCancel(ctx) appReporterCtx, appReporterCtxCancel := context.WithCancel(ctx)
defer appReporterCtxCancel() defer appReporterCtxCancel()
go NewWorkspaceAppHealthReporter( go NewWorkspaceAppHealthReporter(
a.logger, manifest.Apps, a.client.PostAppHealth)(appReporterCtx) a.logger, manifest.Apps, agentsdk.AppHealthPoster(aAPI))(appReporterCtx)
a.closeMutex.Lock() a.closeMutex.Lock()
network := a.network network := a.network

View File

@ -167,11 +167,6 @@ func (c *Client) PostLifecycle(ctx context.Context, req agentsdk.PostLifecycleRe
return nil return nil
} }
func (c *Client) PostAppHealth(ctx context.Context, req agentsdk.PostAppHealthsRequest) error {
c.logger.Debug(ctx, "post app health", slog.F("req", req))
return nil
}
func (c *Client) GetStartup() <-chan *agentproto.Startup { func (c *Client) GetStartup() <-chan *agentproto.Startup {
return c.fakeAgentAPI.startupCh return c.fakeAgentAPI.startupCh
} }
@ -279,9 +274,9 @@ func (*FakeAgentAPI) UpdateLifecycle(context.Context, *agentproto.UpdateLifecycl
panic("implement me") panic("implement me")
} }
func (*FakeAgentAPI) BatchUpdateAppHealths(context.Context, *agentproto.BatchUpdateAppHealthRequest) (*agentproto.BatchUpdateAppHealthResponse, error) { func (f *FakeAgentAPI) BatchUpdateAppHealths(ctx context.Context, req *agentproto.BatchUpdateAppHealthRequest) (*agentproto.BatchUpdateAppHealthResponse, error) {
// TODO implement me f.logger.Debug(ctx, "batch update app health", slog.F("req", req))
panic("implement me") return &agentproto.BatchUpdateAppHealthResponse{}, nil
} }
func (f *FakeAgentAPI) UpdateStartup(_ context.Context, req *agentproto.UpdateStartupRequest) (*agentproto.Startup, error) { func (f *FakeAgentAPI) UpdateStartup(_ context.Context, req *agentproto.UpdateStartupRequest) (*agentproto.Startup, error) {

View File

@ -500,7 +500,14 @@ func TestWorkspaceAgentTailnetDirectDisabled(t *testing.T) {
// Verify that the manifest has DisableDirectConnections set to true. // Verify that the manifest has DisableDirectConnections set to true.
agentClient := agentsdk.New(client.URL) agentClient := agentsdk.New(client.URL)
agentClient.SetSessionToken(r.AgentToken) agentClient.SetSessionToken(r.AgentToken)
manifest := requireGetManifest(ctx, t, agentClient) rpc, err := agentClient.Listen(ctx)
require.NoError(t, err)
defer func() {
cErr := rpc.Close()
require.NoError(t, cErr)
}()
aAPI := agentproto.NewDRPCAgentClient(rpc)
manifest := requireGetManifest(ctx, t, aAPI)
require.True(t, manifest.DisableDirectConnections) require.True(t, manifest.DisableDirectConnections)
_ = agenttest.New(t, client.URL, r.AgentToken) _ = agenttest.New(t, client.URL, r.AgentToken)
@ -823,46 +830,63 @@ func TestWorkspaceAgentAppHealth(t *testing.T) {
agentClient := agentsdk.New(client.URL) agentClient := agentsdk.New(client.URL)
agentClient.SetSessionToken(r.AgentToken) agentClient.SetSessionToken(r.AgentToken)
conn, err := agentClient.Listen(ctx)
require.NoError(t, err)
defer func() {
cErr := conn.Close()
require.NoError(t, cErr)
}()
aAPI := agentproto.NewDRPCAgentClient(conn)
manifest := requireGetManifest(ctx, t, agentClient) manifest := requireGetManifest(ctx, t, aAPI)
require.EqualValues(t, codersdk.WorkspaceAppHealthDisabled, manifest.Apps[0].Health) require.EqualValues(t, codersdk.WorkspaceAppHealthDisabled, manifest.Apps[0].Health)
require.EqualValues(t, codersdk.WorkspaceAppHealthInitializing, manifest.Apps[1].Health) require.EqualValues(t, codersdk.WorkspaceAppHealthInitializing, manifest.Apps[1].Health)
err := agentClient.PostAppHealth(ctx, agentsdk.PostAppHealthsRequest{})
require.Error(t, err)
// empty // empty
err = agentClient.PostAppHealth(ctx, agentsdk.PostAppHealthsRequest{}) _, err = aAPI.BatchUpdateAppHealths(ctx, &agentproto.BatchUpdateAppHealthRequest{})
require.Error(t, err) require.NoError(t, err)
// healthcheck disabled // healthcheck disabled
err = agentClient.PostAppHealth(ctx, agentsdk.PostAppHealthsRequest{ _, err = aAPI.BatchUpdateAppHealths(ctx, &agentproto.BatchUpdateAppHealthRequest{
Healths: map[uuid.UUID]codersdk.WorkspaceAppHealth{ Updates: []*agentproto.BatchUpdateAppHealthRequest_HealthUpdate{
manifest.Apps[0].ID: codersdk.WorkspaceAppHealthInitializing, {
Id: manifest.Apps[0].ID[:],
Health: agentproto.AppHealth_INITIALIZING,
},
}, },
}) })
require.Error(t, err) require.Error(t, err)
// invalid value // invalid value
err = agentClient.PostAppHealth(ctx, agentsdk.PostAppHealthsRequest{ _, err = aAPI.BatchUpdateAppHealths(ctx, &agentproto.BatchUpdateAppHealthRequest{
Healths: map[uuid.UUID]codersdk.WorkspaceAppHealth{ Updates: []*agentproto.BatchUpdateAppHealthRequest_HealthUpdate{
manifest.Apps[1].ID: codersdk.WorkspaceAppHealth("bad-value"), {
Id: manifest.Apps[1].ID[:],
Health: 99,
},
}, },
}) })
require.Error(t, err) require.Error(t, err)
// update to healthy // update to healthy
err = agentClient.PostAppHealth(ctx, agentsdk.PostAppHealthsRequest{ _, err = aAPI.BatchUpdateAppHealths(ctx, &agentproto.BatchUpdateAppHealthRequest{
Healths: map[uuid.UUID]codersdk.WorkspaceAppHealth{ Updates: []*agentproto.BatchUpdateAppHealthRequest_HealthUpdate{
manifest.Apps[1].ID: codersdk.WorkspaceAppHealthHealthy, {
Id: manifest.Apps[1].ID[:],
Health: agentproto.AppHealth_HEALTHY,
},
}, },
}) })
require.NoError(t, err) require.NoError(t, err)
manifest = requireGetManifest(ctx, t, agentClient) manifest = requireGetManifest(ctx, t, aAPI)
require.EqualValues(t, codersdk.WorkspaceAppHealthHealthy, manifest.Apps[1].Health) require.EqualValues(t, codersdk.WorkspaceAppHealthHealthy, manifest.Apps[1].Health)
// update to unhealthy // update to unhealthy
err = agentClient.PostAppHealth(ctx, agentsdk.PostAppHealthsRequest{ _, err = aAPI.BatchUpdateAppHealths(ctx, &agentproto.BatchUpdateAppHealthRequest{
Healths: map[uuid.UUID]codersdk.WorkspaceAppHealth{ Updates: []*agentproto.BatchUpdateAppHealthRequest_HealthUpdate{
manifest.Apps[1].ID: codersdk.WorkspaceAppHealthUnhealthy, {
Id: manifest.Apps[1].ID[:],
Health: agentproto.AppHealth_UNHEALTHY,
},
}, },
}) })
require.NoError(t, err) require.NoError(t, err)
manifest = requireGetManifest(ctx, t, agentClient) manifest = requireGetManifest(ctx, t, aAPI)
require.EqualValues(t, codersdk.WorkspaceAppHealthUnhealthy, manifest.Apps[1].Health) require.EqualValues(t, codersdk.WorkspaceAppHealthUnhealthy, manifest.Apps[1].Health)
} }
@ -1105,8 +1129,15 @@ func TestWorkspaceAgent_Metadata(t *testing.T) {
agentClient.SetSessionToken(r.AgentToken) agentClient.SetSessionToken(r.AgentToken)
ctx := testutil.Context(t, testutil.WaitMedium) ctx := testutil.Context(t, testutil.WaitMedium)
conn, err := agentClient.Listen(ctx)
require.NoError(t, err)
defer func() {
cErr := conn.Close()
require.NoError(t, cErr)
}()
aAPI := agentproto.NewDRPCAgentClient(conn)
manifest := requireGetManifest(ctx, t, agentClient) manifest := requireGetManifest(ctx, t, aAPI)
// Verify manifest API response. // Verify manifest API response.
require.Equal(t, workspace.ID, manifest.WorkspaceID) require.Equal(t, workspace.ID, manifest.WorkspaceID)
@ -1276,8 +1307,15 @@ func TestWorkspaceAgent_Metadata_CatchMemoryLeak(t *testing.T) {
agentClient.SetSessionToken(r.AgentToken) agentClient.SetSessionToken(r.AgentToken)
ctx, cancel := context.WithCancel(testutil.Context(t, testutil.WaitSuperLong)) ctx, cancel := context.WithCancel(testutil.Context(t, testutil.WaitSuperLong))
conn, err := agentClient.Listen(ctx)
require.NoError(t, err)
defer func() {
cErr := conn.Close()
require.NoError(t, cErr)
}()
aAPI := agentproto.NewDRPCAgentClient(conn)
manifest := requireGetManifest(ctx, t, agentClient) manifest := requireGetManifest(ctx, t, aAPI)
post := func(ctx context.Context, key, value string) error { post := func(ctx context.Context, key, value string) error {
return agentClient.PostMetadata(ctx, agentsdk.PostMetadataRequest{ return agentClient.PostMetadata(ctx, agentsdk.PostMetadataRequest{
@ -1622,14 +1660,7 @@ func TestWorkspaceAgentExternalAuthListen(t *testing.T) {
}) })
} }
func requireGetManifest(ctx context.Context, t testing.TB, client agent.Client) agentsdk.Manifest { func requireGetManifest(ctx context.Context, t testing.TB, aAPI agentproto.DRPCAgentClient) agentsdk.Manifest {
conn, err := client.Listen(ctx)
require.NoError(t, err)
defer func() {
cErr := conn.Close()
require.NoError(t, cErr)
}()
aAPI := agentproto.NewDRPCAgentClient(conn)
mp, err := aAPI.GetManifest(ctx, &agentproto.GetManifestRequest{}) mp, err := aAPI.GetManifest(ctx, &agentproto.GetManifestRequest{})
require.NoError(t, err) require.NoError(t, err)
manifest, err := agentsdk.ManifestFromProto(mp) manifest, err := agentsdk.ManifestFromProto(mp)

View File

@ -231,18 +231,18 @@ type PostAppHealthsRequest struct {
Healths map[uuid.UUID]codersdk.WorkspaceAppHealth Healths map[uuid.UUID]codersdk.WorkspaceAppHealth
} }
// PostAppHealth updates the workspace agent app health status. func AppHealthPoster(aAPI proto.DRPCAgentClient) func(ctx context.Context, req PostAppHealthsRequest) error {
func (c *Client) PostAppHealth(ctx context.Context, req PostAppHealthsRequest) error { return func(ctx context.Context, req PostAppHealthsRequest) error {
res, err := c.SDK.Request(ctx, http.MethodPost, "/api/v2/workspaceagents/me/app-health", req) pReq, err := ProtoFromAppHealthsRequest(req)
if err != nil { if err != nil {
return err return xerrors.Errorf("convert AppHealthsRequest: %w", err)
}
_, err = aAPI.BatchUpdateAppHealths(ctx, pReq)
if err != nil {
return xerrors.Errorf("batch update app healths: %w", err)
}
return nil
} }
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return codersdk.ReadBodyAsError(res)
}
return nil
} }
// AuthenticateResponse is returned when an instance ID // AuthenticateResponse is returned when an instance ID

View File

@ -278,3 +278,18 @@ func ProtoFromSubsystems(ss []codersdk.AgentSubsystem) ([]proto.Startup_Subsyste
} }
return ret, nil return ret, nil
} }
func ProtoFromAppHealthsRequest(req PostAppHealthsRequest) (*proto.BatchUpdateAppHealthRequest, error) {
pReq := &proto.BatchUpdateAppHealthRequest{}
for id, h := range req.Healths {
hp, ok := proto.AppHealth_value[strings.ToUpper(string(h))]
if !ok {
return nil, xerrors.Errorf("unknown app health: %s", h)
}
pReq.Updates = append(pReq.Updates, &proto.BatchUpdateAppHealthRequest_HealthUpdate{
Id: id[:],
Health: proto.AppHealth(hp),
})
}
return pReq, nil
}