feat: Add speedtest command for tailnet (#3874)

This commit is contained in:
Kyle Carberry
2022-09-05 17:15:49 -05:00
committed by GitHub
parent 38825b9ab4
commit 1254e7a902
9 changed files with 199 additions and 8 deletions

View File

@ -29,6 +29,7 @@ import (
"go.uber.org/atomic" "go.uber.org/atomic"
gossh "golang.org/x/crypto/ssh" gossh "golang.org/x/crypto/ssh"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"tailscale.com/net/speedtest"
"tailscale.com/tailcfg" "tailscale.com/tailcfg"
"cdr.dev/slog" "cdr.dev/slog"
@ -58,6 +59,7 @@ var (
tailnetIP = netip.MustParseAddr("fd7a:115c:a1e0:49d6:b259:b7ac:b1b2:48f4") tailnetIP = netip.MustParseAddr("fd7a:115c:a1e0:49d6:b259:b7ac:b1b2:48f4")
tailnetSSHPort = 1 tailnetSSHPort = 1
tailnetReconnectingPTYPort = 2 tailnetReconnectingPTYPort = 2
tailnetSpeedtestPort = 3
) )
type Options struct { type Options struct {
@ -256,6 +258,23 @@ func (a *agent) runTailnet(ctx context.Context, derpMap *tailcfg.DERPMap) {
go a.handleReconnectingPTY(ctx, msg, conn) go a.handleReconnectingPTY(ctx, msg, conn)
} }
}() }()
speedtestListener, err := a.network.Listen("tcp", ":"+strconv.Itoa(tailnetSpeedtestPort))
if err != nil {
a.logger.Critical(ctx, "listen for speedtest", slog.Error(err))
return
}
go func() {
for {
conn, err := speedtestListener.Accept()
if err != nil {
a.logger.Debug(ctx, "speedtest listener failed", slog.Error(err))
return
}
go func() {
_ = speedtest.ServeConn(conn)
}()
}
}()
} }
// runCoordinator listens for nodes and updates the self-node as it changes. // runCoordinator listens for nodes and updates the self-node as it changes.

View File

@ -19,6 +19,7 @@ import (
"time" "time"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"tailscale.com/net/speedtest"
"tailscale.com/tailcfg" "tailscale.com/tailcfg"
scp "github.com/bramvdbogaerde/go-scp" scp "github.com/bramvdbogaerde/go-scp"
@ -547,6 +548,21 @@ func TestAgent(t *testing.T) {
return err == nil return err == nil
}, testutil.WaitMedium, testutil.IntervalFast) }, testutil.WaitMedium, testutil.IntervalFast)
}) })
t.Run("Speedtest", func(t *testing.T) {
t.Parallel()
if testing.Short() {
t.Skip("The minimum duration for a speedtest is hardcoded in Tailscale to 5s!")
}
derpMap := tailnettest.RunDERPAndSTUN(t)
conn, _ := setupAgent(t, agent.Metadata{
DERPMap: derpMap,
}, 0)
defer conn.Close()
res, err := conn.Speedtest(speedtest.Upload, speedtest.MinDuration)
require.NoError(t, err)
t.Logf("%.2f MBits/s", res[len(res)-1].MBitsPerSecond())
})
} }
func setupSSHCommand(t *testing.T, beforeArgs []string, afterArgs []string) *exec.Cmd { func setupSSHCommand(t *testing.T, beforeArgs []string, afterArgs []string) *exec.Cmd {

View File

@ -16,6 +16,7 @@ import (
"golang.org/x/crypto/ssh" "golang.org/x/crypto/ssh"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"tailscale.com/ipn/ipnstate" "tailscale.com/ipn/ipnstate"
"tailscale.com/net/speedtest"
"tailscale.com/tailcfg" "tailscale.com/tailcfg"
"github.com/coder/coder/peer" "github.com/coder/coder/peer"
@ -39,6 +40,7 @@ type Conn interface {
CloseWithError(err error) error CloseWithError(err error) error
ReconnectingPTY(id string, height, width uint16, command string) (net.Conn, error) ReconnectingPTY(id string, height, width uint16, command string) (net.Conn, error)
SSH() (net.Conn, error) SSH() (net.Conn, error)
Speedtest(direction speedtest.Direction, duration time.Duration) ([]speedtest.Result, error)
SSHClient() (*ssh.Client, error) SSHClient() (*ssh.Client, error)
DialContext(ctx context.Context, network string, addr string) (net.Conn, error) DialContext(ctx context.Context, network string, addr string) (net.Conn, error)
} }
@ -77,6 +79,10 @@ func (c *WebRTCConn) SSH() (net.Conn, error) {
return channel.NetConn(), nil return channel.NetConn(), nil
} }
func (*WebRTCConn) Speedtest(_ speedtest.Direction, _ time.Duration) ([]speedtest.Result, error) {
return nil, xerrors.New("not implemented")
}
// SSHClient calls SSH to create a client that uses a weak cipher // SSHClient calls SSH to create a client that uses a weak cipher
// for high throughput. // for high throughput.
func (c *WebRTCConn) SSHClient() (*ssh.Client, error) { func (c *WebRTCConn) SSHClient() (*ssh.Client, error) {
@ -227,6 +233,18 @@ func (c *TailnetConn) SSHClient() (*ssh.Client, error) {
return ssh.NewClient(sshConn, channels, requests), nil return ssh.NewClient(sshConn, channels, requests), nil
} }
func (c *TailnetConn) Speedtest(direction speedtest.Direction, duration time.Duration) ([]speedtest.Result, error) {
speedConn, err := c.DialContextTCP(context.Background(), netip.AddrPortFrom(tailnetIP, uint16(tailnetSpeedtestPort)))
if err != nil {
return nil, xerrors.Errorf("dial speedtest: %w", err)
}
results, err := speedtest.RunClientWithConn(direction, duration, speedConn)
if err != nil {
return nil, xerrors.Errorf("run speedtest: %w", err)
}
return results, err
}
func (c *TailnetConn) DialContext(ctx context.Context, network string, addr string) (net.Conn, error) { func (c *TailnetConn) DialContext(ctx context.Context, network string, addr string) (net.Conn, error) {
_, rawPort, _ := net.SplitHostPort(addr) _, rawPort, _ := net.SplitHostPort(addr)
port, _ := strconv.Atoi(rawPort) port, _ := strconv.Atoi(rawPort)

View File

@ -78,6 +78,7 @@ func Core() []*cobra.Command {
schedules(), schedules(),
show(), show(),
ssh(), ssh(),
speedtest(),
start(), start(),
state(), state(),
stop(), stop(),

91
cli/speedtest.go Normal file
View File

@ -0,0 +1,91 @@
package cli
import (
"context"
"fmt"
"time"
"cdr.dev/slog"
"github.com/coder/coder/cli/cliflag"
"github.com/coder/coder/cli/cliui"
"github.com/coder/coder/codersdk"
"github.com/jedib0t/go-pretty/v6/table"
"github.com/spf13/cobra"
"golang.org/x/xerrors"
tsspeedtest "tailscale.com/net/speedtest"
)
func speedtest() *cobra.Command {
var (
reverse bool
timeStr string
)
cmd := &cobra.Command{
Annotations: workspaceCommand,
Use: "speedtest <workspace>",
Short: "Run a speed test from your machine to the workspace.",
RunE: func(cmd *cobra.Command, args []string) error {
ctx, cancel := context.WithCancel(cmd.Context())
defer cancel()
dur, err := time.ParseDuration(timeStr)
if err != nil {
return err
}
client, err := CreateClient(cmd)
if err != nil {
return xerrors.Errorf("create codersdk client: %w", err)
}
workspace, workspaceAgent, err := getWorkspaceAndAgent(ctx, cmd, client, codersdk.Me, args[0], false)
if err != nil {
return err
}
err = cliui.Agent(ctx, cmd.ErrOrStderr(), cliui.AgentOptions{
WorkspaceName: workspace.Name,
Fetch: func(ctx context.Context) (codersdk.WorkspaceAgent, error) {
return client.WorkspaceAgent(ctx, workspaceAgent.ID)
},
})
if err != nil {
return xerrors.Errorf("await agent: %w", err)
}
conn, err := client.DialWorkspaceAgentTailnet(ctx, slog.Logger{}, workspaceAgent.ID)
if err != nil {
return err
}
defer conn.Close()
_, _ = conn.Ping()
dir := tsspeedtest.Download
if reverse {
dir = tsspeedtest.Upload
}
cmd.Printf("Starting a %ds %s test...\n", int(dur.Seconds()), dir)
results, err := conn.Speedtest(dir, dur)
if err != nil {
return err
}
tableWriter := cliui.Table()
tableWriter.AppendHeader(table.Row{"Interval", "Transfer", "Bandwidth"})
for _, r := range results {
if r.Total {
tableWriter.AppendSeparator()
}
tableWriter.AppendRow(table.Row{
fmt.Sprintf("%.2f-%.2f sec", r.IntervalStart.Seconds(), r.IntervalEnd.Seconds()),
fmt.Sprintf("%.4f MBits", r.MegaBits()),
fmt.Sprintf("%.4f Mbits/sec", r.MBitsPerSecond()),
})
}
_, err = fmt.Fprintln(cmd.OutOrStdout(), tableWriter.Render())
return err
},
}
cliflag.BoolVarP(cmd.Flags(), &reverse, "reverse", "r", "", false,
"Specifies whether to run in reverse mode where the client receives and the server sends.")
cliflag.StringVarP(cmd.Flags(), &timeStr, "time", "t", "", "5s",
"Specifies the duration to monitor traffic.")
return cmd
}

46
cli/speedtest_test.go Normal file
View File

@ -0,0 +1,46 @@
package cli_test
import (
"context"
"testing"
"cdr.dev/slog/sloggers/slogtest"
"github.com/coder/coder/agent"
"github.com/coder/coder/cli/clitest"
"github.com/coder/coder/coderd/coderdtest"
"github.com/coder/coder/codersdk"
"github.com/coder/coder/pty/ptytest"
"github.com/coder/coder/testutil"
"github.com/stretchr/testify/assert"
)
func TestSpeedtest(t *testing.T) {
t.Parallel()
if testing.Short() {
t.Skip("This test takes a minimum of 5ms per a hardcoded value in Tailscale!")
}
client, workspace, agentToken := setupWorkspaceForAgent(t)
agentClient := codersdk.New(client.URL)
agentClient.SessionToken = agentToken
agentCloser := agent.New(agent.Options{
FetchMetadata: agentClient.WorkspaceAgentMetadata,
WebRTCDialer: agentClient.ListenWorkspaceAgent,
CoordinatorDialer: agentClient.ListenWorkspaceAgentTailnet,
Logger: slogtest.Make(t, nil).Named("agent"),
})
defer agentCloser.Close()
coderdtest.AwaitWorkspaceAgents(t, client, workspace.LatestBuild.ID)
cmd, root := clitest.New(t, "speedtest", workspace.Name)
clitest.SetupConfig(t, client, root)
pty := ptytest.New(t)
cmd.SetOut(pty.Output())
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
defer cancel()
cmdDone := tGo(t, func() {
err := cmd.ExecuteContext(ctx)
assert.NoError(t, err)
})
<-cmdDone
}

View File

@ -31,7 +31,7 @@ import (
"github.com/coder/coder/testutil" "github.com/coder/coder/testutil"
) )
func setupWorkspaceForSSH(t *testing.T) (*codersdk.Client, codersdk.Workspace, string) { func setupWorkspaceForAgent(t *testing.T) (*codersdk.Client, codersdk.Workspace, string) {
t.Helper() t.Helper()
client := coderdtest.New(t, &coderdtest.Options{IncludeProvisionerDaemon: true}) client := coderdtest.New(t, &coderdtest.Options{IncludeProvisionerDaemon: true})
user := coderdtest.CreateFirstUser(t, client) user := coderdtest.CreateFirstUser(t, client)
@ -69,7 +69,7 @@ func TestSSH(t *testing.T) {
t.Run("ImmediateExit", func(t *testing.T) { t.Run("ImmediateExit", func(t *testing.T) {
t.Parallel() t.Parallel()
client, workspace, agentToken := setupWorkspaceForSSH(t) client, workspace, agentToken := setupWorkspaceForAgent(t)
cmd, root := clitest.New(t, "ssh", workspace.Name) cmd, root := clitest.New(t, "ssh", workspace.Name)
clitest.SetupConfig(t, client, root) clitest.SetupConfig(t, client, root)
pty := ptytest.New(t) pty := ptytest.New(t)
@ -104,7 +104,7 @@ func TestSSH(t *testing.T) {
}) })
t.Run("Stdio", func(t *testing.T) { t.Run("Stdio", func(t *testing.T) {
t.Parallel() t.Parallel()
client, workspace, agentToken := setupWorkspaceForSSH(t) client, workspace, agentToken := setupWorkspaceForAgent(t)
_, _ = tGoContext(t, func(ctx context.Context) { _, _ = tGoContext(t, func(ctx context.Context) {
// Run this async so the SSH command has to wait for // Run this async so the SSH command has to wait for
// the build and agent to connect! // the build and agent to connect!
@ -175,7 +175,7 @@ func TestSSH(t *testing.T) {
t.Parallel() t.Parallel()
client, workspace, agentToken := setupWorkspaceForSSH(t) client, workspace, agentToken := setupWorkspaceForAgent(t)
agentClient := codersdk.New(client.URL) agentClient := codersdk.New(client.URL)
agentClient.SessionToken = agentToken agentClient.SessionToken = agentToken

4
go.mod
View File

@ -49,7 +49,7 @@ replace github.com/tcnksm/go-httpstat => github.com/kylecarbs/go-httpstat v0.0.0
// There are a few minor changes we make to Tailscale that we're slowly upstreaming. Compare here: // There are a few minor changes we make to Tailscale that we're slowly upstreaming. Compare here:
// https://github.com/tailscale/tailscale/compare/main...coder:tailscale:main // https://github.com/tailscale/tailscale/compare/main...coder:tailscale:main
replace tailscale.com => github.com/coder/tailscale v1.1.1-0.20220902164407-ae46caa65076 replace tailscale.com => github.com/coder/tailscale v1.1.1-0.20220905194158-291661887d25
require ( require (
cdr.dev/slog v1.4.2-0.20220525200111-18dce5c2cd5f cdr.dev/slog v1.4.2-0.20220525200111-18dce5c2cd5f
@ -157,7 +157,7 @@ require (
k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9 k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9
nhooyr.io/websocket v1.8.7 nhooyr.io/websocket v1.8.7
storj.io/drpc v0.0.33-0.20220622181519-9206537a4db7 storj.io/drpc v0.0.33-0.20220622181519-9206537a4db7
tailscale.com v1.26.2 tailscale.com v1.30.0
) )
require ( require (

4
go.sum
View File

@ -352,8 +352,8 @@ github.com/coder/glog v1.0.1-0.20220322161911-7365fe7f2cd1 h1:UqBrPWSYvRI2s5RtOu
github.com/coder/glog v1.0.1-0.20220322161911-7365fe7f2cd1/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0LWV4= github.com/coder/glog v1.0.1-0.20220322161911-7365fe7f2cd1/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0LWV4=
github.com/coder/retry v1.3.0 h1:5lAAwt/2Cm6lVmnfBY7sOMXcBOwcwJhmV5QGSELIVWY= github.com/coder/retry v1.3.0 h1:5lAAwt/2Cm6lVmnfBY7sOMXcBOwcwJhmV5QGSELIVWY=
github.com/coder/retry v1.3.0/go.mod h1:tXuRgZgWjUnU5LZPT4lJh4ew2elUhexhlnXzrJWdyFY= github.com/coder/retry v1.3.0/go.mod h1:tXuRgZgWjUnU5LZPT4lJh4ew2elUhexhlnXzrJWdyFY=
github.com/coder/tailscale v1.1.1-0.20220902164407-ae46caa65076 h1:PITEtBolloXfTMGSkL1hQSPBMT4+YJFUgjRQl5osB5k= github.com/coder/tailscale v1.1.1-0.20220905194158-291661887d25 h1:XOloZLgDkAmVBVYXSQBLY+a/Vd2c+dWRBMKNJMWSAWo=
github.com/coder/tailscale v1.1.1-0.20220902164407-ae46caa65076/go.mod h1:MO+tWkQp2YIF3KBnnej/mQvgYccRS5Xk/IrEpZ4Z3BU= github.com/coder/tailscale v1.1.1-0.20220905194158-291661887d25/go.mod h1:MO+tWkQp2YIF3KBnnej/mQvgYccRS5Xk/IrEpZ4Z3BU=
github.com/coder/wireguard-go/tun/netstack v0.0.0-20220823170024-a78136eb0cab h1:9yEvRWXXfyKzXu8AqywCi+tFZAoqCy4wVcsXwuvZNMc= github.com/coder/wireguard-go/tun/netstack v0.0.0-20220823170024-a78136eb0cab h1:9yEvRWXXfyKzXu8AqywCi+tFZAoqCy4wVcsXwuvZNMc=
github.com/coder/wireguard-go/tun/netstack v0.0.0-20220823170024-a78136eb0cab/go.mod h1:TCJ66NtXh3urJotTdoYQOHHkyE899vOQl5TuF+WLSes= github.com/coder/wireguard-go/tun/netstack v0.0.0-20220823170024-a78136eb0cab/go.mod h1:TCJ66NtXh3urJotTdoYQOHHkyE899vOQl5TuF+WLSes=
github.com/containerd/aufs v0.0.0-20200908144142-dab0cbea06f4/go.mod h1:nukgQABAEopAHvB6j7cnP5zJ+/3aVcE7hCYqvIwAHyE= github.com/containerd/aufs v0.0.0-20200908144142-dab0cbea06f4/go.mod h1:nukgQABAEopAHvB6j7cnP5zJ+/3aVcE7hCYqvIwAHyE=