Skip to content

Commit

Permalink
Add keepalive pings for websocket proxy
Browse files Browse the repository at this point in the history
  • Loading branch information
tulir committed Aug 20, 2023
1 parent 3e317e2 commit 3ce0250
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 11 deletions.
59 changes: 53 additions & 6 deletions cmd/bbctl/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ const defaultReconnectBackoff = 2 * time.Second
const maxReconnectBackoff = 2 * time.Minute
const reconnectBackoffReset = 5 * time.Minute

func runAppserviceWebsocket(ctx *cli.Context, as *appservice.AppService) {
func runAppserviceWebsocket(ctx context.Context, doneCallback func(), as *appservice.AppService) {
defer doneCallback()
reconnectBackoff := defaultReconnectBackoff
lastDisconnect := time.Now()
for {
Expand Down Expand Up @@ -190,6 +191,49 @@ func prepareAppserviceWebsocketProxy(ctx *cli.Context, as *appservice.AppService
_ = as.SetHomeserverURL(GetEnvConfig(ctx).HungryAddress)
}

type wsPingData struct {
Timestamp int64 `json:"timestamp"`
}

func keepaliveAppserviceWebsocket(ctx context.Context, doneCallback func(), as *appservice.AppService) {
log := as.Log.With().Str("component", "websocket pinger").Logger()
defer doneCallback()
ticker := time.NewTicker(3 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ticker.C:
case <-ctx.Done():
return
}
if !as.HasWebsocket() {
log.Debug().Msg("Not pinging: websocket not connected")
continue
}
var resp wsPingData
start := time.Now()
err := as.RequestWebsocket(ctx, &appservice.WebsocketRequest{
Command: "ping",
Data: &wsPingData{Timestamp: time.Now().UnixMilli()},
}, &resp)
if ctx.Err() != nil {
return
}
duration := time.Since(start)
if err != nil {
log.Warn().Err(err).Dur("duration", duration).Msg("Websocket ping returned error")
as.StopWebsocket(fmt.Errorf("websocket ping returned error in %s: %w", duration, err))
} else {
serverTs := time.UnixMilli(resp.Timestamp)
log.Debug().
Dur("duration", duration).
Dur("req_duration", serverTs.Sub(start)).
Dur("resp_duration", time.Since(serverTs)).
Msg("Websocket ping returned success")
}
}
}

func proxyAppserviceWebsocket(ctx *cli.Context) error {
regPath := ctx.String("registration")
reg, err := appservice.LoadRegistration(regPath)
Expand All @@ -207,14 +251,17 @@ func proxyAppserviceWebsocket(ctx *cli.Context) error {

c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)

wsCtx, cancel := context.WithCancel(ctx.Context)
var wg sync.WaitGroup
wg.Add(1)
go func() {
runAppserviceWebsocket(ctx, as)
wg.Done()
}()
wg.Add(2)
go runAppserviceWebsocket(wsCtx, wg.Done, as)
go keepaliveAppserviceWebsocket(wsCtx, wg.Done, as)

<-c

fmt.Println()
cancel()
as.Log.Info().Msg("Interrupt received, stopping...")
as.StopWebsocket(appservice.ErrWebsocketManualStop)
wg.Wait()
Expand Down
17 changes: 12 additions & 5 deletions cmd/bbctl/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,19 +215,23 @@ func runBridge(ctx *cli.Context) error {
}
var as *appservice.AppService
var wg sync.WaitGroup
var cancelWS context.CancelFunc
wsProxyClosed := make(chan struct{})
if needsWebsocketProxy {
wg.Add(1)
wg.Add(2)
log.Printf("Starting websocket proxy")
as = appservice.Create()
as.Registration = cfg.Registration
as.HomeserverDomain = "beeper.local"
prepareAppserviceWebsocketProxy(ctx, as)
go func() {
runAppserviceWebsocket(ctx, as)
close(wsProxyClosed)
var wsCtx context.Context
wsCtx, cancelWS = context.WithCancel(ctx.Context)
defer cancelWS()
go runAppserviceWebsocket(wsCtx, func() {
wg.Done()
}()
close(wsProxyClosed)
}, as)
go keepaliveAppserviceWebsocket(wsCtx, wg.Done, as)
}

log.Printf("Starting [cyan]%s[reset]", cfg.BridgeType)
Expand Down Expand Up @@ -271,6 +275,9 @@ func runBridge(ctx *cli.Context) error {
if as != nil && as.StopWebsocket != nil {
as.StopWebsocket(appservice.ErrWebsocketManualStop)
}
if cancelWS != nil {
cancelWS()
}
if err != nil {
return err
}
Expand Down

0 comments on commit 3ce0250

Please sign in to comment.