mirror of
https://github.com/coder/coder.git
synced 2025-07-06 15:41:45 +00:00
chore: add derpserver to wsproxy, add proxies to derpmap (#7311)
This commit is contained in:
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"database/sql"
|
||||
"flag"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
@ -14,6 +15,7 @@ import (
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"cdr.dev/slog"
|
||||
"github.com/coder/coder/buildinfo"
|
||||
agpl "github.com/coder/coder/coderd"
|
||||
"github.com/coder/coder/coderd/audit"
|
||||
"github.com/coder/coder/coderd/database"
|
||||
@ -25,6 +27,7 @@ import (
|
||||
"github.com/coder/coder/codersdk"
|
||||
"github.com/coder/coder/cryptorand"
|
||||
"github.com/coder/coder/enterprise/coderd/proxyhealth"
|
||||
"github.com/coder/coder/enterprise/replicasync"
|
||||
"github.com/coder/coder/enterprise/wsproxy/wsproxysdk"
|
||||
)
|
||||
|
||||
@ -347,10 +350,13 @@ func (api *API) postWorkspaceProxy(rw http.ResponseWriter, r *http.Request) {
|
||||
DisplayName: req.DisplayName,
|
||||
Icon: req.Icon,
|
||||
TokenHashedSecret: hashedSecret[:],
|
||||
CreatedAt: database.Now(),
|
||||
UpdatedAt: database.Now(),
|
||||
// Enabled by default, but will be disabled on register if the proxy has
|
||||
// it disabled.
|
||||
DerpEnabled: true,
|
||||
CreatedAt: database.Now(),
|
||||
UpdatedAt: database.Now(),
|
||||
})
|
||||
if database.IsUniqueViolation(err) {
|
||||
if database.IsUniqueViolation(err, database.UniqueWorkspaceProxiesLowerNameIndex) {
|
||||
httpapi.Write(ctx, rw, http.StatusConflict, codersdk.Response{
|
||||
Message: fmt.Sprintf("Workspace proxy with name %q already exists.", req.Name),
|
||||
})
|
||||
@ -489,13 +495,17 @@ func (api *API) workspaceProxyIssueSignedAppToken(rw http.ResponseWriter, r *htt
|
||||
// in the database and returns a signed token that can be used to authenticate
|
||||
// tokens.
|
||||
//
|
||||
// This is called periodically by the proxy in the background (every 30s per
|
||||
// replica) to ensure that the proxy is still registered and the corresponding
|
||||
// replica table entry is refreshed.
|
||||
//
|
||||
// @Summary Register workspace proxy
|
||||
// @ID register-workspace-proxy
|
||||
// @Security CoderSessionToken
|
||||
// @Accept json
|
||||
// @Produce json
|
||||
// @Tags Enterprise
|
||||
// @Param request body wsproxysdk.RegisterWorkspaceProxyRequest true "Issue signed app token request"
|
||||
// @Param request body wsproxysdk.RegisterWorkspaceProxyRequest true "Register workspace proxy request"
|
||||
// @Success 201 {object} wsproxysdk.RegisterWorkspaceProxyResponse
|
||||
// @Router /workspaceproxies/me/register [post]
|
||||
// @x-apidocgen {"skip": true}
|
||||
@ -523,6 +533,17 @@ func (api *API) workspaceProxyRegister(rw http.ResponseWriter, r *http.Request)
|
||||
return
|
||||
}
|
||||
|
||||
// Version check should be forced in non-dev builds and when running in
|
||||
// tests.
|
||||
shouldForceVersion := !buildinfo.IsDev() || flag.Lookup("test.v") != nil
|
||||
if shouldForceVersion && req.Version != buildinfo.Version() {
|
||||
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
|
||||
Message: "Version mismatch.",
|
||||
Detail: fmt.Sprintf("Proxy version %q does not match primary server version %q", req.Version, buildinfo.Version()),
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
if err := validateProxyURL(req.AccessURL); err != nil {
|
||||
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
|
||||
Message: "URL is invalid.",
|
||||
@ -541,11 +562,80 @@ func (api *API) workspaceProxyRegister(rw http.ResponseWriter, r *http.Request)
|
||||
}
|
||||
}
|
||||
|
||||
_, err := api.Database.RegisterWorkspaceProxy(ctx, database.RegisterWorkspaceProxyParams{
|
||||
ID: proxy.ID,
|
||||
Url: req.AccessURL,
|
||||
WildcardHostname: req.WildcardHostname,
|
||||
})
|
||||
if req.ReplicaID == uuid.Nil {
|
||||
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
|
||||
Message: "Replica ID is invalid.",
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
startingRegionID, _ := getProxyDERPStartingRegionID(api.Options.BaseDERPMap)
|
||||
regionID := int32(startingRegionID) + proxy.RegionID
|
||||
|
||||
err := api.Database.InTx(func(db database.Store) error {
|
||||
// First, update the proxy's values in the database.
|
||||
_, err := db.RegisterWorkspaceProxy(ctx, database.RegisterWorkspaceProxyParams{
|
||||
ID: proxy.ID,
|
||||
Url: req.AccessURL,
|
||||
DerpEnabled: req.DerpEnabled,
|
||||
WildcardHostname: req.WildcardHostname,
|
||||
})
|
||||
if err != nil {
|
||||
return xerrors.Errorf("register workspace proxy: %w", err)
|
||||
}
|
||||
|
||||
// Second, find the replica that corresponds to this proxy and refresh
|
||||
// it if it exists. If it doesn't exist, create it.
|
||||
now := time.Now()
|
||||
replica, err := db.GetReplicaByID(ctx, req.ReplicaID)
|
||||
if err == nil {
|
||||
// Replica exists, update it.
|
||||
if replica.StoppedAt.Valid && !replica.StartedAt.IsZero() {
|
||||
// If the replica deregistered, it shouldn't be able to
|
||||
// re-register before restarting.
|
||||
// TODO: sadly this results in 500 when it should be 400
|
||||
return xerrors.Errorf("replica %s is marked stopped", replica.ID)
|
||||
}
|
||||
|
||||
replica, err = db.UpdateReplica(ctx, database.UpdateReplicaParams{
|
||||
ID: replica.ID,
|
||||
UpdatedAt: now,
|
||||
StartedAt: replica.StartedAt,
|
||||
StoppedAt: replica.StoppedAt,
|
||||
RelayAddress: req.ReplicaRelayAddress,
|
||||
RegionID: regionID,
|
||||
Hostname: req.ReplicaHostname,
|
||||
Version: req.Version,
|
||||
Error: req.ReplicaError,
|
||||
DatabaseLatency: 0,
|
||||
Primary: false,
|
||||
})
|
||||
if err != nil {
|
||||
return xerrors.Errorf("update replica: %w", err)
|
||||
}
|
||||
} else if xerrors.Is(err, sql.ErrNoRows) {
|
||||
// Replica doesn't exist, create it.
|
||||
replica, err = db.InsertReplica(ctx, database.InsertReplicaParams{
|
||||
ID: req.ReplicaID,
|
||||
CreatedAt: now,
|
||||
StartedAt: now,
|
||||
UpdatedAt: now,
|
||||
Hostname: req.ReplicaHostname,
|
||||
RegionID: regionID,
|
||||
RelayAddress: req.ReplicaRelayAddress,
|
||||
Version: req.Version,
|
||||
DatabaseLatency: 0,
|
||||
Primary: false,
|
||||
})
|
||||
if err != nil {
|
||||
return xerrors.Errorf("insert replica: %w", err)
|
||||
}
|
||||
} else if err != nil {
|
||||
return xerrors.Errorf("get replica: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}, nil)
|
||||
if httpapi.Is404Error(err) {
|
||||
httpapi.ResourceNotFound(rw)
|
||||
return
|
||||
@ -555,39 +645,112 @@ func (api *API) workspaceProxyRegister(rw http.ResponseWriter, r *http.Request)
|
||||
return
|
||||
}
|
||||
|
||||
// Update replica sync and notify all other replicas to update their
|
||||
// replica list.
|
||||
err = api.replicaManager.PublishUpdate()
|
||||
if err != nil {
|
||||
httpapi.InternalServerError(rw, err)
|
||||
return
|
||||
}
|
||||
replicaUpdateCtx, replicaUpdateCancel := context.WithTimeout(ctx, 5*time.Second)
|
||||
defer replicaUpdateCancel()
|
||||
err = api.replicaManager.UpdateNow(replicaUpdateCtx)
|
||||
if err != nil {
|
||||
httpapi.InternalServerError(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
// Find sibling regions to respond with for derpmesh.
|
||||
siblings := api.replicaManager.InRegion(regionID)
|
||||
siblingsRes := make([]codersdk.Replica, 0, len(siblings))
|
||||
for _, replica := range siblings {
|
||||
if replica.ID == req.ReplicaID {
|
||||
continue
|
||||
}
|
||||
siblingsRes = append(siblingsRes, convertReplica(replica))
|
||||
}
|
||||
|
||||
// aReq.New = updatedProxy
|
||||
httpapi.Write(ctx, rw, http.StatusCreated, wsproxysdk.RegisterWorkspaceProxyResponse{
|
||||
AppSecurityKey: api.AppSecurityKey.String(),
|
||||
AppSecurityKey: api.AppSecurityKey.String(),
|
||||
DERPMeshKey: api.DERPServer.MeshKey(),
|
||||
DERPRegionID: regionID,
|
||||
SiblingReplicas: siblingsRes,
|
||||
})
|
||||
|
||||
go api.forceWorkspaceProxyHealthUpdate(api.ctx)
|
||||
}
|
||||
|
||||
// workspaceProxyGoingAway is used to tell coderd that the workspace proxy is
|
||||
// shutting down and going away. The main purpose of this function is for the
|
||||
// health status of the workspace proxy to be more quickly updated when we know
|
||||
// that the proxy is going to be unhealthy. This does not delete the workspace
|
||||
// or cause any other side effects.
|
||||
// If the workspace proxy comes back online, even without a register, it will
|
||||
// be found healthy again by the normal checks.
|
||||
// @Summary Workspace proxy going away
|
||||
// @ID workspace-proxy-going-away
|
||||
// @Summary Deregister workspace proxy
|
||||
// @ID deregister-workspace-proxy
|
||||
// @Security CoderSessionToken
|
||||
// @Produce json
|
||||
// @Accept json
|
||||
// @Tags Enterprise
|
||||
// @Success 201 {object} codersdk.Response
|
||||
// @Router /workspaceproxies/me/goingaway [post]
|
||||
// @Param request body wsproxysdk.DeregisterWorkspaceProxyRequest true "Deregister workspace proxy request"
|
||||
// @Success 204
|
||||
// @Router /workspaceproxies/me/deregister [post]
|
||||
// @x-apidocgen {"skip": true}
|
||||
func (api *API) workspaceProxyGoingAway(rw http.ResponseWriter, r *http.Request) {
|
||||
func (api *API) workspaceProxyDeregister(rw http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
|
||||
// Force a health update to happen immediately. The proxy should
|
||||
// not return a successful response if it is going away.
|
||||
go api.forceWorkspaceProxyHealthUpdate(api.ctx)
|
||||
var req wsproxysdk.DeregisterWorkspaceProxyRequest
|
||||
if !httpapi.Read(ctx, rw, r, &req) {
|
||||
return
|
||||
}
|
||||
|
||||
httpapi.Write(ctx, rw, http.StatusOK, codersdk.Response{
|
||||
Message: "OK",
|
||||
})
|
||||
err := api.Database.InTx(func(db database.Store) error {
|
||||
now := time.Now()
|
||||
replica, err := db.GetReplicaByID(ctx, req.ReplicaID)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("get replica: %w", err)
|
||||
}
|
||||
|
||||
if replica.StoppedAt.Valid && !replica.StartedAt.IsZero() {
|
||||
// TODO: sadly this results in 500 when it should be 400
|
||||
return xerrors.Errorf("replica %s is already marked stopped", replica.ID)
|
||||
}
|
||||
|
||||
replica, err = db.UpdateReplica(ctx, database.UpdateReplicaParams{
|
||||
ID: replica.ID,
|
||||
UpdatedAt: now,
|
||||
StartedAt: replica.StartedAt,
|
||||
StoppedAt: sql.NullTime{
|
||||
Valid: true,
|
||||
Time: now,
|
||||
},
|
||||
RelayAddress: replica.RelayAddress,
|
||||
RegionID: replica.RegionID,
|
||||
Hostname: replica.Hostname,
|
||||
Version: replica.Version,
|
||||
Error: replica.Error,
|
||||
DatabaseLatency: replica.DatabaseLatency,
|
||||
Primary: replica.Primary,
|
||||
})
|
||||
if err != nil {
|
||||
return xerrors.Errorf("update replica: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}, nil)
|
||||
if httpapi.Is404Error(err) {
|
||||
httpapi.ResourceNotFound(rw)
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
httpapi.InternalServerError(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
// Publish a replicasync event with a nil ID so every replica (yes, even the
|
||||
// current replica) will refresh its replicas list.
|
||||
err = api.Pubsub.Publish(replicasync.PubsubEvent, []byte(uuid.Nil.String()))
|
||||
if err != nil {
|
||||
httpapi.InternalServerError(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
rw.WriteHeader(http.StatusNoContent)
|
||||
go api.forceWorkspaceProxyHealthUpdate(api.ctx)
|
||||
}
|
||||
|
||||
// reconnectingPTYSignedToken issues a signed app token for use when connecting
|
||||
@ -670,7 +833,8 @@ func (api *API) reconnectingPTYSignedToken(rw http.ResponseWriter, r *http.Reque
|
||||
},
|
||||
SessionToken: httpmw.APITokenFromRequest(r),
|
||||
// The following fields aren't required as long as the request is authed
|
||||
// with a valid API key.
|
||||
// with a valid API key, which we know since this endpoint is protected
|
||||
// by auth middleware already.
|
||||
PathAppBaseURL: "",
|
||||
AppHostname: "",
|
||||
// The following fields are empty for terminal apps.
|
||||
@ -733,10 +897,11 @@ func convertProxy(p database.WorkspaceProxy, status proxyhealth.ProxyStatus) cod
|
||||
status.Status = proxyhealth.Unknown
|
||||
}
|
||||
return codersdk.WorkspaceProxy{
|
||||
Region: convertRegion(p, status),
|
||||
CreatedAt: p.CreatedAt,
|
||||
UpdatedAt: p.UpdatedAt,
|
||||
Deleted: p.Deleted,
|
||||
Region: convertRegion(p, status),
|
||||
DerpEnabled: p.DerpEnabled,
|
||||
CreatedAt: p.CreatedAt,
|
||||
UpdatedAt: p.UpdatedAt,
|
||||
Deleted: p.Deleted,
|
||||
Status: codersdk.WorkspaceProxyStatus{
|
||||
Status: codersdk.ProxyHealthStatus(status.Status),
|
||||
Report: status.Report,
|
||||
|
Reference in New Issue
Block a user