package tailnet

import (
	"context"
	"fmt"
	"io"
	"maps"
	"math"
	"net/netip"
	"slices"
	"strings"
	"sync"
	"time"

	"github.com/google/uuid"
	"golang.org/x/xerrors"
	"storj.io/drpc"
	"storj.io/drpc/drpcerr"
	"tailscale.com/tailcfg"
	"tailscale.com/util/dnsname"

	"cdr.dev/slog"
	"github.com/coder/coder/v2/coderd/util/ptr"
	"github.com/coder/coder/v2/codersdk"
	"github.com/coder/coder/v2/tailnet/proto"
	"github.com/coder/quartz"
	"github.com/coder/retry"
)

// A Controller connects to the tailnet control plane, and then uses the control protocols to
// program a tailnet.Conn in production (in test it could be an interface simulating the Conn). It
// delegates this task to sub-controllers responsible for the main areas of the tailnet control
// protocol: coordination, DERP map updates, resume tokens, telemetry, and workspace updates.
type Controller struct {
	Dialer               ControlProtocolDialer
	CoordCtrl            CoordinationController
	DERPCtrl             DERPController
	ResumeTokenCtrl      ResumeTokenController
	TelemetryCtrl        TelemetryController
	WorkspaceUpdatesCtrl WorkspaceUpdatesController

	ctx               context.Context
	gracefulCtx       context.Context
	cancelGracefulCtx context.CancelFunc
	logger            slog.Logger
	closedCh          chan struct{}

	// Testing only
	clock           quartz.Clock
	gracefulTimeout time.Duration
}

type CloserWaiter interface {
	Close(context.Context) error
	Wait() <-chan error
}

// CoordinatorClient is an abstraction of the Coordinator's control protocol interface from the
// perspective of a protocol client (i.e. the Coder Agent is also a client of this interface).
type CoordinatorClient interface {
	Close() error
	Send(*proto.CoordinateRequest) error
	Recv() (*proto.CoordinateResponse, error)
}

// A CoordinationController accepts connections to the control plane, and handles the Coordination
// protocol on behalf of some Coordinatee (tailnet.Conn in production). This is the "glue" code
// between them.
type CoordinationController interface {
	New(CoordinatorClient) CloserWaiter
}

// DERPClient is an abstraction of the stream of DERPMap updates from the control plane.
type DERPClient interface {
	Close() error
	Recv() (*tailcfg.DERPMap, error)
}

// A DERPController accepts connections to the control plane, and handles the DERPMap updates
// delivered over them by programming the data plane (tailnet.Conn or some test interface).
type DERPController interface {
	New(DERPClient) CloserWaiter
}

type ResumeTokenClient interface {
	RefreshResumeToken(ctx context.Context, in *proto.RefreshResumeTokenRequest) (*proto.RefreshResumeTokenResponse, error)
}

type ResumeTokenController interface {
	New(ResumeTokenClient) CloserWaiter
	Token() (string, bool)
}

type TelemetryClient interface {
	PostTelemetry(ctx context.Context, in *proto.TelemetryRequest) (*proto.TelemetryResponse, error)
}

type TelemetryController interface {
	New(TelemetryClient)
}

type WorkspaceUpdatesClient interface {
	Close() error
	Recv() (*proto.WorkspaceUpdate, error)
}

type WorkspaceUpdatesController interface {
	New(WorkspaceUpdatesClient) CloserWaiter
}

// DNSHostsSetter is something that you can set a mapping of DNS names to IPs on. It's the subset
// of the tailnet.Conn that we use to configure DNS records.
type DNSHostsSetter interface {
	SetDNSHosts(hosts map[dnsname.FQDN][]netip.Addr) error
}

// UpdatesHandler is anything that expects a stream of workspace update diffs.
type UpdatesHandler interface {
	Update(WorkspaceUpdate) error
}

// ControlProtocolClients represents an abstract interface to the tailnet control plane via a set
// of protocol clients. The Closer should close all the clients (e.g. by closing the underlying
// connection).
type ControlProtocolClients struct {
	Closer           io.Closer
	Coordinator      CoordinatorClient
	DERP             DERPClient
	ResumeToken      ResumeTokenClient
	Telemetry        TelemetryClient
	WorkspaceUpdates WorkspaceUpdatesClient
}

type ControlProtocolDialer interface {
	// Dial connects to the tailnet control plane and returns clients for the different control
	// sub-protocols (coordination, DERP maps, resume tokens, and telemetry). If the
	// ResumeTokenController is not nil, the dialer should query for a resume token and use it to
	// dial, if available.
	Dial(ctx context.Context, r ResumeTokenController) (ControlProtocolClients, error)
}
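
// A minimal sketch of a custom ControlProtocolDialer, for illustration only;
// staticDialer is hypothetical and not part of this package. Real dialers
// (e.g. in codersdk) also negotiate the Tailnet API version over a websocket
// and attach any resume token from r to the dial request.
//
//	type staticDialer struct {
//		clients ControlProtocolClients
//	}
//
//	func (d staticDialer) Dial(ctx context.Context, r ResumeTokenController) (ControlProtocolClients, error) {
//		if r != nil {
//			if token, ok := r.Token(); ok {
//				_ = token // a real dialer would include this in the dial request
//			}
//		}
//		return d.clients, ctx.Err()
//	}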

// BasicCoordinationController handles the basic coordination operations common to all types of
// tailnet consumers:
//
// 1. sending local node updates to the Coordinator
// 2. receiving peer node updates and programming them into the Coordinatee (e.g. tailnet.Conn)
// 3. (optionally) sending ReadyForHandshake acknowledgements for peer updates.
//
// It is designed to be used on its own, or composed into more advanced CoordinationControllers.
type BasicCoordinationController struct {
	Logger      slog.Logger
	Coordinatee Coordinatee
	SendAcks    bool
}
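
// A minimal usage sketch (illustrative; conn and coordClient are assumed to
// be an existing tailnet.Conn and a connected CoordinatorClient):
//
//	ctrl := &BasicCoordinationController{Logger: logger, Coordinatee: conn}
//	coordination := ctrl.New(coordClient)
//	err := <-coordination.Wait() // blocks until the coordination fails or is closed
//	_ = coordination.Close(context.Background())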

// New satisfies the method on the CoordinationController interface
func (c *BasicCoordinationController) New(client CoordinatorClient) CloserWaiter {
	return c.NewCoordination(client)
}

// NewCoordination creates a BasicCoordination
func (c *BasicCoordinationController) NewCoordination(client CoordinatorClient) *BasicCoordination {
	b := &BasicCoordination{
		logger:       c.Logger,
		errChan:      make(chan error, 1),
		coordinatee:  c.Coordinatee,
		Client:       client,
		respLoopDone: make(chan struct{}),
		sendAcks:     c.SendAcks,
	}

	c.Coordinatee.SetNodeCallback(func(node *Node) {
		pn, err := NodeToProto(node)
		if err != nil {
			b.logger.Critical(context.Background(), "failed to convert node", slog.Error(err))
			b.SendErr(err)
			return
		}
		b.Lock()
		defer b.Unlock()
		if b.closed {
			b.logger.Debug(context.Background(), "ignored node update because coordination is closed")
			return
		}
		err = b.Client.Send(&proto.CoordinateRequest{UpdateSelf: &proto.CoordinateRequest_UpdateSelf{Node: pn}})
		if err != nil {
			b.SendErr(xerrors.Errorf("write: %w", err))
		}
	})
	go b.respLoop()

	return b
}

// BasicCoordination handles:
//
// 1. Sending local node updates to the control plane
// 2. Reading remote updates from the control plane and programming them into the Coordinatee.
//
// It does *not* handle adding any Tunnels, but these can be handled by composing
// BasicCoordinationController with a more advanced controller.
type BasicCoordination struct {
	sync.Mutex
	closed       bool
	errChan      chan error
	coordinatee  Coordinatee
	logger       slog.Logger
	Client       CoordinatorClient
	respLoopDone chan struct{}
	sendAcks     bool
}

// Close the coordination gracefully. If the context expires before the remote API server has hung
// up on us, we forcibly close the Client connection.
func (c *BasicCoordination) Close(ctx context.Context) (retErr error) {
	c.Lock()
	defer c.Unlock()
	if c.closed {
		return nil
	}
	c.closed = true
	defer func() {
		// We shouldn't just close the protocol right away, because the way dRPC streams work is
		// that if you close them, that could take effect immediately, even before the Disconnect
		// message is processed. Coordinators are supposed to hang up on us once they get a
		// Disconnect message, so we should wait around for that until the context expires.
		select {
		case <-c.respLoopDone:
			c.logger.Debug(ctx, "responses closed after disconnect")
			return
		case <-ctx.Done():
			c.logger.Warn(ctx, "context expired while waiting for coordinate responses to close")
		}
		// forcefully close the stream
		protoErr := c.Client.Close()
		<-c.respLoopDone
		if retErr == nil {
			retErr = protoErr
		}
	}()
	err := c.Client.Send(&proto.CoordinateRequest{Disconnect: &proto.CoordinateRequest_Disconnect{}})
	if err != nil && !xerrors.Is(err, io.EOF) {
		// Coordinator RPC hangs up when it gets disconnect, so EOF is expected.
		return xerrors.Errorf("send disconnect: %w", err)
	}
	c.logger.Debug(context.Background(), "sent disconnect")
	return nil
}
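
// A graceful-shutdown sketch (illustrative): give the Coordinator one second
// to hang up after our Disconnect message before force-closing the stream.
//
//	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
//	defer cancel()
//	if err := coordination.Close(ctx); err != nil {
//		logger.Warn(ctx, "failed to close coordination", slog.Error(err))
//	}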

// Wait for the Coordination to complete
func (c *BasicCoordination) Wait() <-chan error {
	return c.errChan
}

// SendErr is not part of the CloserWaiter interface, and is intended to be called internally, or
// by Controllers that use BasicCoordinationController in composition. It triggers Wait() to
// report the error if an error has not already been reported.
func (c *BasicCoordination) SendErr(err error) {
	select {
	case c.errChan <- err:
	default:
	}
}

func (c *BasicCoordination) respLoop() {
	defer func() {
		cErr := c.Client.Close()
		if cErr != nil {
			c.logger.Debug(context.Background(),
				"failed to close coordinate client after respLoop exit", slog.Error(cErr))
		}
		c.coordinatee.SetAllPeersLost()
		close(c.respLoopDone)
	}()
	for {
		resp, err := c.Client.Recv()
		if err != nil {
			c.logger.Debug(context.Background(),
				"failed to read from protocol", slog.Error(err))
			c.SendErr(xerrors.Errorf("read: %w", err))
			return
		}

		err = c.coordinatee.UpdatePeers(resp.GetPeerUpdates())
		if err != nil {
			c.logger.Debug(context.Background(), "failed to update peers", slog.Error(err))
			c.SendErr(xerrors.Errorf("update peers: %w", err))
			return
		}

		// Only send ReadyForHandshake acks if we are configured to do so (i.e. as an agent).
		if c.sendAcks {
			// Send an ack back for all received NODE peers. This could
			// potentially be smarter to only send an ACK once per client,
			// but there's nothing currently stopping clients from reusing
			// IDs.
			rfh := []*proto.CoordinateRequest_ReadyForHandshake{}
			for _, peer := range resp.GetPeerUpdates() {
				if peer.Kind != proto.CoordinateResponse_PeerUpdate_NODE {
					continue
				}

				rfh = append(rfh, &proto.CoordinateRequest_ReadyForHandshake{Id: peer.Id})
			}
			if len(rfh) > 0 {
				err := c.Client.Send(&proto.CoordinateRequest{
					ReadyForHandshake: rfh,
				})
				if err != nil {
					c.logger.Debug(context.Background(),
						"failed to send ready for handshake", slog.Error(err))
					c.SendErr(xerrors.Errorf("send: %w", err))
					return
				}
			}
		}
	}
}

type TunnelSrcCoordController struct {
	*BasicCoordinationController

	mu           sync.Mutex
	dests        map[uuid.UUID]struct{}
	coordination *BasicCoordination
}

// NewTunnelSrcCoordController creates a CoordinationController for peers that are exclusively
// tunnel sources (that is, they create tunnels; e.g. Coder clients, not workspaces).
func NewTunnelSrcCoordController(
	logger slog.Logger, coordinatee Coordinatee,
) *TunnelSrcCoordController {
	return &TunnelSrcCoordController{
		BasicCoordinationController: &BasicCoordinationController{
			Logger:      logger,
			Coordinatee: coordinatee,
			SendAcks:    false,
		},
		dests: make(map[uuid.UUID]struct{}),
	}
}
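
// A usage sketch (illustrative; agentID and client are assumed to exist).
// Destinations survive reconnects because New replays AddTunnel for every
// known destination on each new CoordinatorClient:
//
//	ctrl := NewTunnelSrcCoordController(logger, conn)
//	ctrl.AddDestination(agentID) // remembered even before a client connects
//	cw := ctrl.New(client)       // replays AddTunnel for agentID
//	ctrl.RemoveDestination(agentID)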

func (c *TunnelSrcCoordController) New(client CoordinatorClient) CloserWaiter {
	c.mu.Lock()
	defer c.mu.Unlock()
	b := c.BasicCoordinationController.NewCoordination(client)
	c.coordination = b
	// resync destinations on reconnect
	for dest := range c.dests {
		err := client.Send(&proto.CoordinateRequest{
			AddTunnel: &proto.CoordinateRequest_Tunnel{Id: UUIDToByteSlice(dest)},
		})
		if err != nil {
			b.SendErr(err)
			c.coordination = nil
			cErr := client.Close()
			if cErr != nil {
				c.Logger.Debug(
					context.Background(),
					"failed to close coordinator client after add tunnel failure",
					slog.Error(cErr),
				)
			}
			break
		}
	}
	return b
}

func (c *TunnelSrcCoordController) AddDestination(dest uuid.UUID) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.Coordinatee.SetTunnelDestination(dest) // this prepares us for an ack
	c.dests[dest] = struct{}{}
	if c.coordination == nil {
		return
	}
	err := c.coordination.Client.Send(
		&proto.CoordinateRequest{
			AddTunnel: &proto.CoordinateRequest_Tunnel{Id: UUIDToByteSlice(dest)},
		})
	if err != nil {
		c.coordination.SendErr(err)
		cErr := c.coordination.Client.Close() // close the client so we don't gracefully disconnect
		if cErr != nil {
			c.Logger.Debug(context.Background(),
				"failed to close coordinator client after add tunnel failure",
				slog.Error(cErr))
		}
		c.coordination = nil
	}
}

func (c *TunnelSrcCoordController) RemoveDestination(dest uuid.UUID) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.dests, dest)
	if c.coordination == nil {
		return
	}
	err := c.coordination.Client.Send(
		&proto.CoordinateRequest{
			RemoveTunnel: &proto.CoordinateRequest_Tunnel{Id: UUIDToByteSlice(dest)},
		})
	if err != nil {
		c.coordination.SendErr(err)
		cErr := c.coordination.Client.Close() // close the client so we don't gracefully disconnect
		if cErr != nil {
			c.Logger.Debug(context.Background(),
				"failed to close coordinator client after remove tunnel failure",
				slog.Error(cErr))
		}
		c.coordination = nil
	}
}

func (c *TunnelSrcCoordController) SyncDestinations(destinations []uuid.UUID) {
	c.mu.Lock()
	defer c.mu.Unlock()
	toAdd := make(map[uuid.UUID]struct{})
	toRemove := maps.Clone(c.dests)
	all := make(map[uuid.UUID]struct{})
	for _, dest := range destinations {
		all[dest] = struct{}{}
		delete(toRemove, dest)
		if _, ok := c.dests[dest]; !ok {
			toAdd[dest] = struct{}{}
		}
	}
	c.dests = all
	if c.coordination == nil {
		return
	}
	var err error
	defer func() {
		if err != nil {
			c.coordination.SendErr(err)
			cErr := c.coordination.Client.Close() // don't gracefully disconnect
			if cErr != nil {
				c.Logger.Debug(context.Background(),
					"failed to close coordinator client during sync destinations",
					slog.Error(cErr))
			}
			c.coordination = nil
		}
	}()
	for dest := range toAdd {
		c.Coordinatee.SetTunnelDestination(dest)
		err = c.coordination.Client.Send(
			&proto.CoordinateRequest{
				AddTunnel: &proto.CoordinateRequest_Tunnel{Id: UUIDToByteSlice(dest)},
			})
		if err != nil {
			return
		}
	}
	for dest := range toRemove {
		err = c.coordination.Client.Send(
			&proto.CoordinateRequest{
				RemoveTunnel: &proto.CoordinateRequest_Tunnel{Id: UUIDToByteSlice(dest)},
			})
		if err != nil {
			return
		}
	}
}
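
// SyncDestinations computes a set difference against the currently tracked
// destinations, so callers can pass the full desired set each time. A small
// illustration (hypothetical IDs a and b):
//
//	ctrl.AddDestination(a)                   // tracked: {a}
//	ctrl.SyncDestinations([]uuid.UUID{a, b}) // sends AddTunnel(b) only
//	ctrl.SyncDestinations([]uuid.UUID{b})    // sends RemoveTunnel(a) only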

// NewAgentCoordinationController creates a CoordinationController for Coder Agents, which never
// create tunnels and always send ReadyForHandshake acknowledgements.
func NewAgentCoordinationController(
	logger slog.Logger, coordinatee Coordinatee,
) CoordinationController {
	return &BasicCoordinationController{
		Logger:      logger,
		Coordinatee: coordinatee,
		SendAcks:    true,
	}
}

type inMemoryCoordClient struct {
	sync.Mutex
	ctx    context.Context
	cancel context.CancelFunc
	closed bool
	logger slog.Logger
	resps  <-chan *proto.CoordinateResponse
	reqs   chan<- *proto.CoordinateRequest
}

func (c *inMemoryCoordClient) Close() error {
	c.cancel()
	c.Lock()
	defer c.Unlock()
	if c.closed {
		return nil
	}
	c.closed = true
	close(c.reqs)
	return nil
}

func (c *inMemoryCoordClient) Send(request *proto.CoordinateRequest) error {
	c.Lock()
	defer c.Unlock()
	if c.closed {
		return drpc.ClosedError.New("in-memory coordinator client closed")
	}
	select {
	case c.reqs <- request:
		return nil
	case <-c.ctx.Done():
		return drpc.ClosedError.New("in-memory coordinator client closed")
	}
}

func (c *inMemoryCoordClient) Recv() (*proto.CoordinateResponse, error) {
	select {
	case resp, ok := <-c.resps:
		if ok {
			return resp, nil
		}
		// response from Coordinator was closed, so close the send direction as well, so that the
		// Coordinator won't be waiting for us while shutting down.
		_ = c.Close()
		return nil, io.EOF
	case <-c.ctx.Done():
		return nil, drpc.ClosedError.New("in-memory coord client closed")
	}
}

// NewInMemoryCoordinatorClient creates a coordination client that uses channels to connect to a
// local Coordinator. (The typical alternative is a DRPC-based client.)
func NewInMemoryCoordinatorClient(
	logger slog.Logger,
	clientID uuid.UUID,
	auth CoordinateeAuth,
	coordinator Coordinator,
) CoordinatorClient {
	logger = logger.With(slog.F("client_id", clientID))
	c := &inMemoryCoordClient{logger: logger}
	c.ctx, c.cancel = context.WithCancel(context.Background())

	// use the background context since we will depend exclusively on closing the req channel to
	// tell the coordinator we are done.
	c.reqs, c.resps = coordinator.Coordinate(context.Background(),
		clientID, fmt.Sprintf("inmemory%s", clientID),
		auth,
	)
	return c
}
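
// A test-oriented sketch (illustrative; clientID, auth, and the coordinator
// wiring are placeholders, and auth is any CoordinateeAuth implementation):
//
//	coord := NewCoordinator(logger) // an in-process Coordinator
//	client := NewInMemoryCoordinatorClient(logger, clientID, auth, coord)
//	defer client.Close()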

type DERPMapSetter interface {
	SetDERPMap(derpMap *tailcfg.DERPMap)
}

type basicDERPController struct {
	logger slog.Logger
	setter DERPMapSetter
}

func (b *basicDERPController) New(client DERPClient) CloserWaiter {
	l := &derpSetLoop{
		logger:       b.logger,
		setter:       b.setter,
		client:       client,
		errChan:      make(chan error, 1),
		recvLoopDone: make(chan struct{}),
	}
	go l.recvLoop()
	return l
}

func NewBasicDERPController(logger slog.Logger, setter DERPMapSetter) DERPController {
	return &basicDERPController{
		logger: logger,
		setter: setter,
	}
}
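
// A minimal DERPMapSetter sketch for tests (illustrative; recordingSetter is
// hypothetical, and tailnet.Conn is the production setter). It just records
// the last map it was given:
//
//	type recordingSetter struct {
//		mu   sync.Mutex
//		last *tailcfg.DERPMap
//	}
//
//	func (s *recordingSetter) SetDERPMap(dm *tailcfg.DERPMap) {
//		s.mu.Lock()
//		defer s.mu.Unlock()
//		s.last = dm
//	}
//
//	ctrl := NewBasicDERPController(logger, &recordingSetter{})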

type derpSetLoop struct {
	logger slog.Logger
	setter DERPMapSetter
	client DERPClient

	sync.Mutex
	closed       bool
	errChan      chan error
	recvLoopDone chan struct{}
}

func (l *derpSetLoop) Close(ctx context.Context) error {
	l.Lock()
	defer l.Unlock()
	if l.closed {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-l.recvLoopDone:
			return nil
		}
	}
	l.closed = true
	cErr := l.client.Close()
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-l.recvLoopDone:
		return cErr
	}
}

func (l *derpSetLoop) Wait() <-chan error {
	return l.errChan
}

func (l *derpSetLoop) recvLoop() {
	defer close(l.recvLoopDone)
	for {
		dm, err := l.client.Recv()
		if err != nil {
			l.logger.Debug(context.Background(), "failed to receive DERP message", slog.Error(err))
			select {
			case l.errChan <- err:
			default:
			}
			return
		}
		l.logger.Debug(context.Background(), "got new DERP Map", slog.F("derp_map", dm))
		l.setter.SetDERPMap(dm)
	}
}

type BasicTelemetryController struct {
	logger slog.Logger

	sync.Mutex
	client      TelemetryClient
	unavailable bool
}

func (b *BasicTelemetryController) New(client TelemetryClient) {
	b.Lock()
	defer b.Unlock()
	b.client = client
	b.unavailable = false
	b.logger.Debug(context.Background(), "new telemetry client connected to controller")
}

func (b *BasicTelemetryController) SendTelemetryEvent(event *proto.TelemetryEvent) {
	b.Lock()
	if b.client == nil {
		b.Unlock()
		b.logger.Debug(context.Background(),
			"telemetry event dropped; no client", slog.F("event", event))
		return
	}
	if b.unavailable {
		b.Unlock()
		b.logger.Debug(context.Background(),
			"telemetry event dropped; unavailable", slog.F("event", event))
		return
	}
	client := b.client
	b.Unlock()
	unavailable := sendTelemetry(b.logger, client, event)
	if unavailable {
		b.Lock()
		defer b.Unlock()
		if b.client == client {
			b.unavailable = true
		}
	}
}

func NewBasicTelemetryController(logger slog.Logger) *BasicTelemetryController {
	return &BasicTelemetryController{logger: logger}
}
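
// A usage sketch (illustrative): the controller is safe to use before a
// client connects; events are simply dropped until New is called, and again
// once the server reports the telemetry RPC as unimplemented.
//
//	telemetry := NewBasicTelemetryController(logger)
//	telemetry.New(clients.Telemetry) // typically done by Controller.Run
//	telemetry.SendTelemetryEvent(&proto.TelemetryEvent{})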

var (
	_ TelemetrySink       = &BasicTelemetryController{}
	_ TelemetryController = &BasicTelemetryController{}
)

func sendTelemetry(
	logger slog.Logger, client TelemetryClient, event *proto.TelemetryEvent,
) (
	unavailable bool,
) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	_, err := client.PostTelemetry(ctx, &proto.TelemetryRequest{
		Events: []*proto.TelemetryEvent{event},
	})
	if IsDRPCUnimplementedError(err) {
		logger.Debug(
			context.Background(),
			"attempted to send telemetry to a server that doesn't support it",
			slog.Error(err),
		)
		return true
	} else if err != nil {
		logger.Warn(
			context.Background(),
			"failed to post telemetry event",
			slog.F("event", event), slog.Error(err),
		)
	}
	return false
}

// IsDRPCUnimplementedError returns true if the error indicates the RPC called is not implemented
// by the server.
func IsDRPCUnimplementedError(err error) bool {
	return drpcerr.Code(err) == drpcerr.Unimplemented ||
		drpc.ProtocolError.Has(err) &&
			strings.Contains(err.Error(), "unknown rpc: ")
}
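
// Callers use IsDRPCUnimplementedError to degrade gracefully when talking to
// older servers, e.g. (illustrative):
//
//	_, err := client.RefreshResumeToken(ctx, &proto.RefreshResumeTokenRequest{})
//	if IsDRPCUnimplementedError(err) {
//		// Older server: stop refreshing, but don't treat this as fatal.
//		return nil
//	}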

type basicResumeTokenController struct {
	logger slog.Logger

	sync.Mutex
	token     *proto.RefreshResumeTokenResponse
	refresher *basicResumeTokenRefresher

	// for testing
	clock quartz.Clock
}

func (b *basicResumeTokenController) New(client ResumeTokenClient) CloserWaiter {
	b.Lock()
	defer b.Unlock()
	if b.refresher != nil {
		cErr := b.refresher.Close(context.Background())
		if cErr != nil {
			b.logger.Debug(context.Background(), "failed to close previous refresher", slog.Error(cErr))
		}
	}
	b.refresher = newBasicResumeTokenRefresher(b.logger, b.clock, b, client)
	return b.refresher
}

func (b *basicResumeTokenController) Token() (string, bool) {
	b.Lock()
	defer b.Unlock()
	if b.token == nil {
		return "", false
	}
	if b.token.ExpiresAt.AsTime().Before(b.clock.Now()) {
		return "", false
	}
	return b.token.Token, true
}

func NewBasicResumeTokenController(logger slog.Logger, clock quartz.Clock) ResumeTokenController {
	return &basicResumeTokenController{
		logger: logger,
		clock:  clock,
	}
}
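
// A dial-side sketch (illustrative): a dialer consults the controller for a
// cached, unexpired token before connecting.
//
//	resumeCtrl := NewBasicResumeTokenController(logger, quartz.NewReal())
//	if token, ok := resumeCtrl.Token(); ok {
//		_ = token // e.g. append the token as a query parameter on the dial URL
//	}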

type basicResumeTokenRefresher struct {
	logger slog.Logger
	ctx    context.Context
	cancel context.CancelFunc
	ctrl   *basicResumeTokenController
	client ResumeTokenClient
	errCh  chan error

	sync.Mutex
	closed bool
	timer  *quartz.Timer
}

func (r *basicResumeTokenRefresher) Close(_ context.Context) error {
	r.cancel()
	r.Lock()
	defer r.Unlock()
	if r.closed {
		return nil
	}
	r.closed = true
	r.timer.Stop()
	select {
	case r.errCh <- nil:
	default: // already have an error
	}
	return nil
}

func (r *basicResumeTokenRefresher) Wait() <-chan error {
	return r.errCh
}

const never time.Duration = math.MaxInt64

func newBasicResumeTokenRefresher(
	logger slog.Logger, clock quartz.Clock,
	ctrl *basicResumeTokenController, client ResumeTokenClient,
) *basicResumeTokenRefresher {
	r := &basicResumeTokenRefresher{
		logger: logger,
		ctrl:   ctrl,
		client: client,
		errCh:  make(chan error, 1),
	}
	r.ctx, r.cancel = context.WithCancel(context.Background())
	r.timer = clock.AfterFunc(never, r.refresh, "basicResumeTokenRefresher")
	go r.refresh()
	return r
}

func (r *basicResumeTokenRefresher) refresh() {
	if r.ctx.Err() != nil {
		return // context done, no need to refresh
	}
	res, err := r.client.RefreshResumeToken(r.ctx, &proto.RefreshResumeTokenRequest{})
	if xerrors.Is(err, context.Canceled) || xerrors.Is(err, context.DeadlineExceeded) {
		// these can only come from being closed, no need to log
		select {
		case r.errCh <- nil:
		default: // already have an error
		}
		return
	}
	if IsDRPCUnimplementedError(err) {
		r.logger.Info(r.ctx, "resume token is not supported by the server")
		select {
		case r.errCh <- nil:
		default: // already have an error
		}
		return
	} else if err != nil {
		r.logger.Error(r.ctx, "error refreshing coordinator resume token", slog.Error(err))
		select {
		case r.errCh <- err:
		default: // already have an error
		}
		return
	}
	r.logger.Debug(r.ctx, "refreshed coordinator resume token",
		slog.F("expires_at", res.GetExpiresAt()),
		slog.F("refresh_in", res.GetRefreshIn()),
	)
	r.ctrl.Lock()
	if r.ctrl.refresher == r { // don't overwrite if we're not the current refresher
		r.ctrl.token = res
	} else {
		r.logger.Debug(context.Background(), "not writing token because we have a new client")
	}
	r.ctrl.Unlock()
	dur := res.RefreshIn.AsDuration()
	if dur <= 0 {
		// A sensible delay to refresh again.
		dur = 30 * time.Minute
	}
	r.Lock()
	defer r.Unlock()
	if r.closed {
		return
	}
	r.timer.Reset(dur, "basicResumeTokenRefresher", "refresh")
}

type TunnelAllWorkspaceUpdatesController struct {
	coordCtrl     *TunnelSrcCoordController
	dnsHostSetter DNSHostsSetter
	updateHandler UpdatesHandler
	ownerUsername string
	logger        slog.Logger

	mu      sync.Mutex
	updater *tunnelUpdater
}

type Workspace struct {
	ID     uuid.UUID
	Name   string
	Status proto.Workspace_Status

	ownerUsername string
	agents        map[uuid.UUID]*Agent
}

// updateDNSNames updates the DNS names for all agents in the workspace.
// DNS hosts must be all lowercase, or the resolver won't be able to find them.
// Usernames are globally unique & case-insensitive.
// Workspace names are unique per-user & case-insensitive.
// Agent names are unique per-workspace & case-insensitive.
func (w *Workspace) updateDNSNames() error {
	wsName := strings.ToLower(w.Name)
	username := strings.ToLower(w.ownerUsername)
	for id, a := range w.agents {
		agentName := strings.ToLower(a.Name)
		names := make(map[dnsname.FQDN][]netip.Addr)
		// TODO: technically, DNS labels cannot start with numbers, but the rules are often not
		// strictly enforced.
		fqdn, err := dnsname.ToFQDN(fmt.Sprintf("%s.%s.me.coder.", agentName, wsName))
		if err != nil {
			return err
		}
		names[fqdn] = []netip.Addr{CoderServicePrefix.AddrFromUUID(a.ID)}
		fqdn, err = dnsname.ToFQDN(fmt.Sprintf("%s.%s.%s.coder.", agentName, wsName, username))
		if err != nil {
			return err
		}
		names[fqdn] = []netip.Addr{CoderServicePrefix.AddrFromUUID(a.ID)}
		if len(w.agents) == 1 {
			fqdn, err := dnsname.ToFQDN(fmt.Sprintf("%s.coder.", wsName))
			if err != nil {
				return err
			}
			for _, a := range w.agents {
				names[fqdn] = []netip.Addr{CoderServicePrefix.AddrFromUUID(a.ID)}
			}
		}
		a.Hosts = names
		w.agents[id] = a
	}
	return nil
}
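
// For example (assuming owner "alice", workspace "dev", agent "main", and the
// workspace having exactly one agent), updateDNSNames produces:
//
//	main.dev.me.coder.    -> the agent's tailnet address
//	main.dev.alice.coder. -> the agent's tailnet address
//	dev.coder.            -> the agent's tailnet address (single-agent shorthand)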

type Agent struct {
	ID          uuid.UUID
	Name        string
	WorkspaceID uuid.UUID
	Hosts       map[dnsname.FQDN][]netip.Addr
}

func (a *Agent) Clone() Agent {
	hosts := make(map[dnsname.FQDN][]netip.Addr, len(a.Hosts))
	for k, v := range a.Hosts {
		hosts[k] = slices.Clone(v)
	}
	return Agent{
		ID:          a.ID,
		Name:        a.Name,
		WorkspaceID: a.WorkspaceID,
		Hosts:       hosts,
	}
}

func (t *TunnelAllWorkspaceUpdatesController) New(client WorkspaceUpdatesClient) CloserWaiter {
	t.mu.Lock()
	defer t.mu.Unlock()
	updater := &tunnelUpdater{
		client:         client,
		errChan:        make(chan error, 1),
		logger:         t.logger,
		coordCtrl:      t.coordCtrl,
		dnsHostsSetter: t.dnsHostSetter,
		updateHandler:  t.updateHandler,
		ownerUsername:  t.ownerUsername,
		recvLoopDone:   make(chan struct{}),
		workspaces:     make(map[uuid.UUID]*Workspace),
	}
	t.updater = updater
	go t.updater.recvLoop()
	return t.updater
}

func (t *TunnelAllWorkspaceUpdatesController) CurrentState() (WorkspaceUpdate, error) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.updater == nil {
		return WorkspaceUpdate{}, xerrors.New("no updater")
	}
	t.updater.Lock()
	defer t.updater.Unlock()
	out := WorkspaceUpdate{
		UpsertedWorkspaces: make([]*Workspace, 0, len(t.updater.workspaces)),
		UpsertedAgents:     make([]*Agent, 0, len(t.updater.workspaces)),
		DeletedWorkspaces:  make([]*Workspace, 0),
		DeletedAgents:      make([]*Agent, 0),
	}
	for _, w := range t.updater.workspaces {
		out.UpsertedWorkspaces = append(out.UpsertedWorkspaces, &Workspace{
			ID:     w.ID,
			Name:   w.Name,
			Status: w.Status,
		})
		for _, a := range w.agents {
			out.UpsertedAgents = append(out.UpsertedAgents, ptr.Ref(a.Clone()))
		}
	}
	return out, nil
}

type tunnelUpdater struct {
	errChan        chan error
	logger         slog.Logger
	client         WorkspaceUpdatesClient
	coordCtrl      *TunnelSrcCoordController
	dnsHostsSetter DNSHostsSetter
	updateHandler  UpdatesHandler
	ownerUsername  string
	recvLoopDone   chan struct{}

	sync.Mutex
	workspaces map[uuid.UUID]*Workspace
	closed     bool
}

func (t *tunnelUpdater) Close(ctx context.Context) error {
	t.Lock()
	defer t.Unlock()
	if t.closed {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-t.recvLoopDone:
			return nil
		}
	}
	t.closed = true
	cErr := t.client.Close()
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-t.recvLoopDone:
		return cErr
	}
}

func (t *tunnelUpdater) Wait() <-chan error {
	return t.errChan
}

func (t *tunnelUpdater) recvLoop() {
	t.logger.Debug(context.Background(), "tunnel updater recvLoop started")
	defer t.logger.Debug(context.Background(), "tunnel updater recvLoop done")
	defer close(t.recvLoopDone)
	for {
		update, err := t.client.Recv()
		if err != nil {
			t.logger.Debug(context.Background(), "failed to receive workspace Update", slog.Error(err))
			select {
			case t.errChan <- err:
			default:
			}
			return
		}
		t.logger.Debug(context.Background(), "got workspace update",
			slog.F("workspace_update", update),
		)
		err = t.handleUpdate(update)
		if err != nil {
			t.logger.Critical(context.Background(), "failed to handle workspace Update", slog.Error(err))
			cErr := t.client.Close()
			if cErr != nil {
				t.logger.Warn(context.Background(), "failed to close client", slog.Error(cErr))
			}
			select {
			case t.errChan <- err:
			default:
			}
			return
		}
	}
}

type WorkspaceUpdate struct {
	UpsertedWorkspaces []*Workspace
	UpsertedAgents     []*Agent
	DeletedWorkspaces  []*Workspace
	DeletedAgents      []*Agent
}

func (w *WorkspaceUpdate) Clone() WorkspaceUpdate {
	clone := WorkspaceUpdate{
		UpsertedWorkspaces: make([]*Workspace, len(w.UpsertedWorkspaces)),
		UpsertedAgents:     make([]*Agent, len(w.UpsertedAgents)),
		DeletedWorkspaces:  make([]*Workspace, len(w.DeletedWorkspaces)),
		DeletedAgents:      make([]*Agent, len(w.DeletedAgents)),
	}
	for i, ws := range w.UpsertedWorkspaces {
		clone.UpsertedWorkspaces[i] = &Workspace{
			ID:     ws.ID,
			Name:   ws.Name,
			Status: ws.Status,
		}
	}
	for i, a := range w.UpsertedAgents {
		clone.UpsertedAgents[i] = ptr.Ref(a.Clone())
	}
	for i, ws := range w.DeletedWorkspaces {
		clone.DeletedWorkspaces[i] = &Workspace{
			ID:     ws.ID,
			Name:   ws.Name,
			Status: ws.Status,
		}
	}
	for i, a := range w.DeletedAgents {
		clone.DeletedAgents[i] = ptr.Ref(a.Clone())
	}
	return clone
}

func (t *tunnelUpdater) handleUpdate(update *proto.WorkspaceUpdate) error {
	t.Lock()
	defer t.Unlock()

	currentUpdate := WorkspaceUpdate{
		UpsertedWorkspaces: []*Workspace{},
		UpsertedAgents:     []*Agent{},
		DeletedWorkspaces:  []*Workspace{},
		DeletedAgents:      []*Agent{},
	}

	for _, uw := range update.UpsertedWorkspaces {
		workspaceID, err := uuid.FromBytes(uw.Id)
		if err != nil {
			return xerrors.Errorf("failed to parse workspace ID: %w", err)
		}
		w := &Workspace{
			ID:            workspaceID,
			Name:          uw.Name,
			Status:        uw.Status,
			ownerUsername: t.ownerUsername,
			agents:        make(map[uuid.UUID]*Agent),
		}
		t.upsertWorkspaceLocked(w)
		currentUpdate.UpsertedWorkspaces = append(currentUpdate.UpsertedWorkspaces, w)
	}

	// delete agents before deleting workspaces, since the agents have workspace ID references
	for _, da := range update.DeletedAgents {
		agentID, err := uuid.FromBytes(da.Id)
		if err != nil {
			return xerrors.Errorf("failed to parse agent ID: %w", err)
		}
		workspaceID, err := uuid.FromBytes(da.WorkspaceId)
		if err != nil {
			return xerrors.Errorf("failed to parse workspace ID: %w", err)
		}
		deletedAgent, err := t.deleteAgentLocked(workspaceID, agentID)
		if err != nil {
			return xerrors.Errorf("failed to delete agent: %w", err)
		}
		currentUpdate.DeletedAgents = append(currentUpdate.DeletedAgents, deletedAgent)
	}
	for _, dw := range update.DeletedWorkspaces {
		workspaceID, err := uuid.FromBytes(dw.Id)
		if err != nil {
			return xerrors.Errorf("failed to parse workspace ID: %w", err)
		}
		deletedWorkspace, err := t.deleteWorkspaceLocked(workspaceID)
		if err != nil {
			return xerrors.Errorf("failed to delete workspace: %w", err)
		}
		currentUpdate.DeletedWorkspaces = append(currentUpdate.DeletedWorkspaces, deletedWorkspace)
	}

	// upsert agents last, after all workspaces have been added and deleted, since agents reference
	// workspace ID.
	for _, ua := range update.UpsertedAgents {
		agentID, err := uuid.FromBytes(ua.Id)
		if err != nil {
			return xerrors.Errorf("failed to parse agent ID: %w", err)
		}
		workspaceID, err := uuid.FromBytes(ua.WorkspaceId)
		if err != nil {
			return xerrors.Errorf("failed to parse workspace ID: %w", err)
		}
		a := &Agent{Name: ua.Name, ID: agentID, WorkspaceID: workspaceID}
		err = t.upsertAgentLocked(workspaceID, a)
		if err != nil {
			return xerrors.Errorf("failed to upsert agent: %w", err)
		}
		currentUpdate.UpsertedAgents = append(currentUpdate.UpsertedAgents, a)
	}
	allAgents := t.allAgentIDsLocked()
	t.coordCtrl.SyncDestinations(allAgents)
	dnsNames := t.updateDNSNamesLocked()
	if t.dnsHostsSetter != nil {
		t.logger.Debug(context.Background(), "updating dns hosts")
		err := t.dnsHostsSetter.SetDNSHosts(dnsNames)
		if err != nil {
			return xerrors.Errorf("failed to set DNS hosts: %w", err)
		}
	} else {
		t.logger.Debug(context.Background(), "skipping setting DNS names because we have no setter")
	}
	if t.updateHandler != nil {
		t.logger.Debug(context.Background(), "calling update handler")
		err := t.updateHandler.Update(currentUpdate.Clone())
		if err != nil {
			t.logger.Error(context.Background(), "failed to call update handler", slog.Error(err))
		}
	}
	return nil
}

func (t *tunnelUpdater) upsertWorkspaceLocked(w *Workspace) *Workspace {
	old, ok := t.workspaces[w.ID]
	if !ok {
		t.workspaces[w.ID] = w
		return w
	}
	old.Name = w.Name
	old.Status = w.Status
	old.ownerUsername = w.ownerUsername
	return w
}

func (t *tunnelUpdater) deleteWorkspaceLocked(id uuid.UUID) (*Workspace, error) {
	w, ok := t.workspaces[id]
	if !ok {
		return nil, xerrors.Errorf("workspace %s not found", id)
	}
	delete(t.workspaces, id)
	return w, nil
}

func (t *tunnelUpdater) upsertAgentLocked(workspaceID uuid.UUID, a *Agent) error {
	w, ok := t.workspaces[workspaceID]
	if !ok {
		return xerrors.Errorf("workspace %s not found", workspaceID)
	}
	w.agents[a.ID] = a
	return nil
}

func (t *tunnelUpdater) deleteAgentLocked(workspaceID, id uuid.UUID) (*Agent, error) {
	w, ok := t.workspaces[workspaceID]
	if !ok {
		return nil, xerrors.Errorf("workspace %s not found", workspaceID)
	}
	a, ok := w.agents[id]
	if !ok {
		return nil, xerrors.Errorf("agent %s not found in workspace %s", id, workspaceID)
	}
	delete(w.agents, id)
	return a, nil
}

func (t *tunnelUpdater) allAgentIDsLocked() []uuid.UUID {
	out := make([]uuid.UUID, 0, len(t.workspaces))
	for _, w := range t.workspaces {
		for id := range w.agents {
			out = append(out, id)
		}
	}
	return out
}

// updateDNSNamesLocked updates the DNS names for all workspaces in the tunnelUpdater.
// t.Mutex must be held.
func (t *tunnelUpdater) updateDNSNamesLocked() map[dnsname.FQDN][]netip.Addr {
	names := make(map[dnsname.FQDN][]netip.Addr)
	for _, w := range t.workspaces {
		err := w.updateDNSNames()
		if err != nil {
			// This should never happen in production, because converting the FQDN only fails
			// if names are too long, and we put strict length limits on agent, workspace, and user
			// names.
			t.logger.Critical(context.Background(),
				"failed to include DNS name(s)",
				slog.F("workspace_id", w.ID),
				slog.Error(err))
		}
		for _, a := range w.agents {
			for name, addrs := range a.Hosts {
				names[name] = addrs
			}
		}
	}
	return names
}

type TunnelAllOption func(t *TunnelAllWorkspaceUpdatesController)

// WithDNS configures the TunnelAllWorkspaceUpdatesController to set DNS names for all workspaces
// and agents it learns about.
func WithDNS(d DNSHostsSetter, ownerUsername string) TunnelAllOption {
	return func(t *TunnelAllWorkspaceUpdatesController) {
		t.dnsHostSetter = d
		t.ownerUsername = ownerUsername
	}
}

func WithHandler(h UpdatesHandler) TunnelAllOption {
	return func(t *TunnelAllWorkspaceUpdatesController) {
		t.updateHandler = h
	}
}

// NewTunnelAllWorkspaceUpdatesController creates a WorkspaceUpdatesController that creates tunnels
// (via the TunnelSrcCoordController) to all agents received over the WorkspaceUpdates RPC. If a
// DNSHostsSetter is provided, it also programs DNS hosts based on the agent and workspace names.
func NewTunnelAllWorkspaceUpdatesController(
	logger slog.Logger, c *TunnelSrcCoordController, opts ...TunnelAllOption,
) *TunnelAllWorkspaceUpdatesController {
	t := &TunnelAllWorkspaceUpdatesController{logger: logger, coordCtrl: c}
	for _, opt := range opts {
		opt(t)
	}
	return t
}
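
// A wiring sketch (illustrative; conn is an existing tailnet.Conn, which also
// serves as the DNSHostsSetter, and handler implements UpdatesHandler):
//
//	coordCtrl := NewTunnelSrcCoordController(logger, conn)
//	updatesCtrl := NewTunnelAllWorkspaceUpdatesController(
//		logger, coordCtrl,
//		WithDNS(conn, "alice"),
//		WithHandler(handler),
//	)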

// NewController creates a new Controller without running it
func NewController(logger slog.Logger, dialer ControlProtocolDialer, opts ...ControllerOpt) *Controller {
	c := &Controller{
		logger:          logger,
		clock:           quartz.NewReal(),
		gracefulTimeout: time.Second,
		Dialer:          dialer,
		closedCh:        make(chan struct{}),
	}
	for _, opt := range opts {
		opt(c)
	}
	return c
}
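
// A full wiring sketch (illustrative; dialer is any ControlProtocolDialer and
// conn is an existing tailnet.Conn). Run returns immediately; use Closed() to
// wait for shutdown after canceling the context.
//
//	ctrl := NewController(logger, dialer)
//	ctrl.CoordCtrl = NewTunnelSrcCoordController(logger, conn)
//	ctrl.DERPCtrl = NewBasicDERPController(logger, conn)
//	ctrl.TelemetryCtrl = NewBasicTelemetryController(logger)
//	ctrl.ResumeTokenCtrl = NewBasicResumeTokenController(logger, quartz.NewReal())
//	ctx, cancel := context.WithCancel(context.Background())
//	ctrl.Run(ctx)
//	// ... later, to shut down ...
//	cancel()
//	<-ctrl.Closed()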

type ControllerOpt func(*Controller)

func WithTestClock(clock quartz.Clock) ControllerOpt {
	return func(c *Controller) {
		c.clock = clock
	}
}

func WithGracefulTimeout(timeout time.Duration) ControllerOpt {
	return func(c *Controller) {
		c.gracefulTimeout = timeout
	}
}

// manageGracefulTimeout allows the gracefulContext to last longer than the main context
// to allow a graceful disconnect.
func (c *Controller) manageGracefulTimeout() {
	defer c.cancelGracefulCtx()
	<-c.ctx.Done()
	timer := c.clock.NewTimer(c.gracefulTimeout, "tailnetAPIClient", "gracefulTimeout")
	defer timer.Stop()
	select {
	case <-c.closedCh:
	case <-timer.C:
	}
}

// Run dials the API and uses it with the provided controllers.
func (c *Controller) Run(ctx context.Context) {
	c.ctx = ctx
	c.gracefulCtx, c.cancelGracefulCtx = context.WithCancel(context.Background())
	go c.manageGracefulTimeout()
	go func() {
		defer close(c.closedCh)
		// Sadly retry doesn't support quartz.Clock yet so this is not
		// influenced by the configured clock.
		for retrier := retry.New(50*time.Millisecond, 10*time.Second); retrier.Wait(c.ctx); {
			// Check the context again before dialing, since `retrier.Wait()` could return true
			// if the delay is 0, even if the context was canceled. This ensures we don't redial
			// after a graceful shutdown.
			if c.ctx.Err() != nil {
				return
			}

			tailnetClients, err := c.Dialer.Dial(c.ctx, c.ResumeTokenCtrl)
			if err != nil {
				if xerrors.Is(err, context.Canceled) || xerrors.Is(err, context.DeadlineExceeded) {
					return
				}
				errF := slog.Error(err)
				var sdkErr *codersdk.Error
				if xerrors.As(err, &sdkErr) {
					errF = slog.Error(sdkErr)
				}
				c.logger.Error(c.ctx, "failed to dial tailnet v2+ API", errF)
				continue
			}
			c.logger.Debug(c.ctx, "obtained tailnet API v2+ client")
			err = c.precheckClientsAndControllers(tailnetClients)
			if err != nil {
				c.logger.Critical(c.ctx, "failed precheck", slog.Error(err))
				_ = tailnetClients.Closer.Close()
				continue
			}
			retrier.Reset()
			c.runControllersOnce(tailnetClients)
			c.logger.Debug(c.ctx, "tailnet API v2+ connection lost")
		}
	}()
}

// precheckClientsAndControllers checks that the set of clients we got is compatible with the
// configured controllers. These checks will fail if the dialer is incompatible with the set of
// controllers, or not configured correctly with respect to Tailnet API version.
func (c *Controller) precheckClientsAndControllers(clients ControlProtocolClients) error {
	if clients.Coordinator == nil && c.CoordCtrl != nil {
		return xerrors.New("missing Coordinator client; have controller")
	}
	if clients.DERP == nil && c.DERPCtrl != nil {
		return xerrors.New("missing DERPMap client; have controller")
	}
	if clients.WorkspaceUpdates == nil && c.WorkspaceUpdatesCtrl != nil {
		return xerrors.New("missing WorkspaceUpdates client; have controller")
	}

	// Telemetry and ResumeToken support is considered optional, but the clients must be present
	// so that we can call the functions and get an "unimplemented" error.
	if clients.ResumeToken == nil && c.ResumeTokenCtrl != nil {
		return xerrors.New("missing ResumeToken client; have controller")
	}
	if clients.Telemetry == nil && c.TelemetryCtrl != nil {
		return xerrors.New("missing Telemetry client; have controller")
	}
	return nil
}

// runControllersOnce uses the provided clients to call into the controllers once. It is combined
// into one function so that a problem with one tears down the other and triggers a retry (if
// appropriate). We typically multiplex all RPCs over the same websocket, so we want them to share
// the same fate.
func (c *Controller) runControllersOnce(clients ControlProtocolClients) {
	// clients.Closer.Close should nominally be idempotent, but let's not press our luck
	closeOnce := sync.Once{}
	closeClients := func() {
		closeOnce.Do(func() {
			closeErr := clients.Closer.Close()
			if closeErr != nil &&
				!xerrors.Is(closeErr, io.EOF) &&
				!xerrors.Is(closeErr, context.Canceled) &&
				!xerrors.Is(closeErr, context.DeadlineExceeded) {
				c.logger.Error(c.ctx, "error closing tailnet clients", slog.Error(closeErr))
			}
		})
	}
	defer closeClients()

	if c.TelemetryCtrl != nil {
		c.TelemetryCtrl.New(clients.Telemetry) // synchronous, doesn't need a goroutine
	}

	wg := sync.WaitGroup{}

	if c.CoordCtrl != nil {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.coordinate(clients.Coordinator)
			if c.ctx.Err() == nil {
				// Main context is still active, but our coordination exited, due to some error.
				// Close down all the rest of the clients so we'll exit and retry.
				closeClients()
			}
		}()
	}
	if c.DERPCtrl != nil {
		wg.Add(1)
		go func() {
			defer wg.Done()
			dErr := c.derpMap(clients.DERP)
			if dErr != nil && c.ctx.Err() == nil {
				// The main context is still active, meaning that we want the tailnet data plane to stay
				// up, even though we hit some error getting DERP maps on the control plane. That means
				// we do NOT want to gracefully disconnect on the coordinate() routine. So, we'll just
				// close the underlying connection. This will trigger a retry of the control plane in
				// run().
				closeClients()
			}
		}()
	}
	if c.WorkspaceUpdatesCtrl != nil {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.workspaceUpdates(clients.WorkspaceUpdates)
			if c.ctx.Err() == nil {
				// Main context is still active, but our workspace updates stream exited, due to
				// some error. Close down all the rest of the clients so we'll exit and retry.
				closeClients()
			}
		}()
	}

	// Refresh token is a little different, in that we don't want its controller to hold open the
	// connection on its own. So we keep it separate from the other wait group, and cancel its
	// context as soon as the other routines exit.
	refreshTokenCtx, refreshTokenCancel := context.WithCancel(c.ctx)
	refreshTokenDone := make(chan struct{})
	defer func() {
		<-refreshTokenDone
	}()
	defer refreshTokenCancel()
	go func() {
		defer close(refreshTokenDone)
		if c.ResumeTokenCtrl != nil {
			c.refreshToken(refreshTokenCtx, clients.ResumeToken)
		}
	}()

	wg.Wait()
}

func (c *Controller) coordinate(client CoordinatorClient) {
	defer func() {
		cErr := client.Close()
		if cErr != nil {
			c.logger.Debug(c.ctx, "error closing Coordinate RPC", slog.Error(cErr))
		}
	}()
	coordination := c.CoordCtrl.New(client)
	c.logger.Debug(c.ctx, "serving coordinator")
	select {
	case <-c.ctx.Done():
		c.logger.Debug(c.ctx, "main context canceled; do graceful disconnect")
		crdErr := coordination.Close(c.gracefulCtx)
		if crdErr != nil {
			c.logger.Warn(c.ctx, "failed to close remote coordination", slog.Error(crdErr))
		}
	case err := <-coordination.Wait():
		if err != nil &&
			!xerrors.Is(err, io.EOF) &&
			!xerrors.Is(err, context.Canceled) &&
			!xerrors.Is(err, context.DeadlineExceeded) {
			c.logger.Error(c.ctx, "remote coordination error", slog.Error(err))
		}
	}
}

func (c *Controller) derpMap(client DERPClient) error {
	defer func() {
		cErr := client.Close()
		if cErr != nil {
			c.logger.Debug(c.ctx, "error closing StreamDERPMaps RPC", slog.Error(cErr))
		}
	}()
	cw := c.DERPCtrl.New(client)
	select {
	case <-c.ctx.Done():
		cErr := client.Close()
		if cErr != nil {
			c.logger.Warn(c.ctx, "failed to close StreamDERPMaps RPC", slog.Error(cErr))
		}
		return nil
	case err := <-cw.Wait():
		if xerrors.Is(err, context.Canceled) || xerrors.Is(err, context.DeadlineExceeded) {
			return nil
		}
		if err != nil && !xerrors.Is(err, io.EOF) {
			c.logger.Error(c.ctx, "error receiving DERP Map", slog.Error(err))
		}
		return err
	}
}

func (c *Controller) workspaceUpdates(client WorkspaceUpdatesClient) {
	defer func() {
		c.logger.Debug(c.ctx, "exiting workspaceUpdates control routine")
		cErr := client.Close()
		if cErr != nil {
			c.logger.Debug(c.ctx, "error closing WorkspaceUpdates RPC", slog.Error(cErr))
		}
	}()
	cw := c.WorkspaceUpdatesCtrl.New(client)
	select {
	case <-c.ctx.Done():
		c.logger.Debug(c.ctx, "workspaceUpdates: context done")
		return
	case err := <-cw.Wait():
		c.logger.Debug(c.ctx, "workspaceUpdates: wait done")
		if err != nil &&
			!xerrors.Is(err, io.EOF) &&
			!xerrors.Is(err, context.Canceled) &&
			!xerrors.Is(err, context.DeadlineExceeded) {
			c.logger.Error(c.ctx, "workspace updates stream error", slog.Error(err))
		}
	}
}

func (c *Controller) refreshToken(ctx context.Context, client ResumeTokenClient) {
	cw := c.ResumeTokenCtrl.New(client)
	go func() {
		<-ctx.Done()
		cErr := cw.Close(c.ctx)
		if cErr != nil {
			c.logger.Error(c.ctx, "error closing token refresher", slog.Error(cErr))
		}
	}()

	err := <-cw.Wait()
	if err != nil && !xerrors.Is(err, context.Canceled) && !xerrors.Is(err, context.DeadlineExceeded) {
		c.logger.Error(c.ctx, "error receiving refresh token", slog.Error(err))
	}
}

func (c *Controller) Closed() <-chan struct{} {
	return c.closedCh
}