mirror of
https://github.com/coder/coder.git
synced 2025-07-03 16:13:58 +00:00
* feat: Add app support This adds apps as a property to a workspace agent. The resource is added to the Terraform provider here: https://github.com/coder/terraform-provider-coder/pull/17 Apps will be opened in the dashboard or via the CLI with `coder open <name>`. If `command` is specified, a terminal will appear locally and in the web. If `target` is specified, the browser will open to an exposed instance of that target. * Compare fields in apps test * Update Terraform provider to use relative path * Add some basic structure for routing * chore: Remove interface from coderd and lift API surface Abstracting coderd into an interface added misdirection because the interface was never intended to be fulfilled outside of a single implementation. This lifts the abstraction, and attaches all handlers to a root struct named `*coderd.API`. * Add basic proxy logic * Add proxying based on path * Add app proxying for wildcards * Add wsconncache * fix: Race when writing to a closed pipe This is such an intermittent race it's difficult to track, but regardless this is an improvement to the code. * fix: Race when writing to a closed pipe This is such an intermittent race it's difficult to track, but regardless this is an improvement to the code. * fix: Race when writing to a closed pipe This is such an intermittent race it's difficult to track, but regardless this is an improvement to the code. * fix: Race when writing to a closed pipe This is such an intermittent race it's difficult to track, but regardless this is an improvement to the code. * Add workspace route proxying endpoint - Makes the workspace conn cache concurrency-safe - Reduces unnecessary open checks in `peer.Channel` - Fixes the use of a temporary context when dialing a workspace agent * Add embed errors * chore: Refactor site to improve testing It was difficult to develop this package due to the embed build tag being mandatory on the tests. The logic to test doesn't require any embedded files. * Add test for error handler * Remove unused access url * Add RBAC tests * Fix dial agent syntax * Fix linting errors * Fix gen * Fix icon required * Adjust migration number * Fix proxy error status code * Fix empty db lookup
163 lines
4.1 KiB
Go
163 lines
4.1 KiB
Go
// Package wsconncache caches workspace agent connections by UUID.
|
|
package wsconncache
|
|
|
|
import (
|
|
"context"
|
|
"net/http"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"go.uber.org/atomic"
|
|
"golang.org/x/sync/singleflight"
|
|
"golang.org/x/xerrors"
|
|
|
|
"github.com/coder/coder/agent"
|
|
)
|
|
|
|
// New creates a new workspace connection cache that closes
|
|
// connections after the inactive timeout provided.
|
|
//
|
|
// Agent connections are cached due to WebRTC negotiation
|
|
// taking a few hundred milliseconds.
|
|
func New(dialer Dialer, inactiveTimeout time.Duration) *Cache {
|
|
if inactiveTimeout == 0 {
|
|
inactiveTimeout = 5 * time.Minute
|
|
}
|
|
return &Cache{
|
|
closed: make(chan struct{}),
|
|
dialer: dialer,
|
|
inactiveTimeout: inactiveTimeout,
|
|
}
|
|
}
|
|
|
|
// Dialer creates a new agent connection by ID.
|
|
type Dialer func(r *http.Request, id uuid.UUID) (*agent.Conn, error)
|
|
|
|
// Conn wraps an agent connection with a reusable HTTP transport.
|
|
type Conn struct {
|
|
*agent.Conn
|
|
|
|
locks atomic.Uint64
|
|
timeoutMutex sync.Mutex
|
|
timeout *time.Timer
|
|
timeoutCancel context.CancelFunc
|
|
transport *http.Transport
|
|
}
|
|
|
|
func (c *Conn) HTTPTransport() *http.Transport {
|
|
return c.transport
|
|
}
|
|
|
|
// CloseWithError ends the HTTP transport if exists, and closes the agent.
|
|
func (c *Conn) CloseWithError(err error) error {
|
|
if c.transport != nil {
|
|
c.transport.CloseIdleConnections()
|
|
}
|
|
c.timeoutMutex.Lock()
|
|
defer c.timeoutMutex.Unlock()
|
|
if c.timeout != nil {
|
|
c.timeout.Stop()
|
|
}
|
|
return c.Conn.CloseWithError(err)
|
|
}
|
|
|
|
type Cache struct {
|
|
closed chan struct{}
|
|
closeMutex sync.Mutex
|
|
closeGroup sync.WaitGroup
|
|
connGroup singleflight.Group
|
|
connMap sync.Map
|
|
dialer Dialer
|
|
inactiveTimeout time.Duration
|
|
}
|
|
|
|
// Acquire gets or establishes a connection with the dialer using the ID provided.
|
|
// If a connection is in-progress, that connection or error will be returned.
|
|
//
|
|
// The returned function is used to release a lock on the connection. Once zero
|
|
// locks exist on a connection, the inactive timeout will begin to tick down.
|
|
// After the time expires, the connection will be cleared from the cache.
|
|
func (c *Cache) Acquire(r *http.Request, id uuid.UUID) (*Conn, func(), error) {
|
|
rawConn, found := c.connMap.Load(id.String())
|
|
// If the connection isn't found, establish a new one!
|
|
if !found {
|
|
var err error
|
|
// A singleflight group is used to allow for concurrent requests to the
|
|
// same identifier to resolve.
|
|
rawConn, err, _ = c.connGroup.Do(id.String(), func() (interface{}, error) {
|
|
agentConn, err := c.dialer(r, id)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("dial: %w", err)
|
|
}
|
|
timeoutCtx, timeoutCancelFunc := context.WithCancel(context.Background())
|
|
defaultTransport, valid := http.DefaultTransport.(*http.Transport)
|
|
if !valid {
|
|
panic("dev error: default transport is the wrong type")
|
|
}
|
|
transport := defaultTransport.Clone()
|
|
transport.DialContext = agentConn.DialContext
|
|
conn := &Conn{
|
|
Conn: agentConn,
|
|
timeoutCancel: timeoutCancelFunc,
|
|
transport: transport,
|
|
}
|
|
c.closeMutex.Lock()
|
|
c.closeGroup.Add(1)
|
|
c.closeMutex.Unlock()
|
|
go func() {
|
|
defer c.closeGroup.Done()
|
|
var err error
|
|
select {
|
|
case <-timeoutCtx.Done():
|
|
err = xerrors.New("cache timeout")
|
|
case <-c.closed:
|
|
err = xerrors.New("cache closed")
|
|
case <-conn.Closed():
|
|
}
|
|
|
|
c.connMap.Delete(id.String())
|
|
c.connGroup.Forget(id.String())
|
|
_ = conn.CloseWithError(err)
|
|
}()
|
|
return conn, nil
|
|
})
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
c.connMap.Store(id.String(), rawConn)
|
|
}
|
|
|
|
conn, _ := rawConn.(*Conn)
|
|
conn.timeoutMutex.Lock()
|
|
defer conn.timeoutMutex.Unlock()
|
|
if conn.timeout != nil {
|
|
conn.timeout.Stop()
|
|
}
|
|
conn.locks.Inc()
|
|
return conn, func() {
|
|
conn.timeoutMutex.Lock()
|
|
defer conn.timeoutMutex.Unlock()
|
|
if conn.timeout != nil {
|
|
conn.timeout.Stop()
|
|
}
|
|
conn.locks.Dec()
|
|
if conn.locks.Load() == 0 {
|
|
conn.timeout = time.AfterFunc(c.inactiveTimeout, conn.timeoutCancel)
|
|
}
|
|
}, nil
|
|
}
|
|
|
|
func (c *Cache) Close() error {
|
|
c.closeMutex.Lock()
|
|
defer c.closeMutex.Unlock()
|
|
select {
|
|
case <-c.closed:
|
|
return nil
|
|
default:
|
|
}
|
|
close(c.closed)
|
|
c.closeGroup.Wait()
|
|
return nil
|
|
}
|