Skip to content

Commit

Permalink
runner: replace websocket package
Browse files Browse the repository at this point in the history
  • Loading branch information
ucirello committed Nov 6, 2023
1 parent 24dca9a commit 837be2a
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 29 deletions.
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@ require (
github.com/buildkite/terminal-to-html v3.2.0+incompatible
github.com/fsnotify/fsnotify v1.7.0
github.com/google/go-cmp v0.6.0
github.com/gorilla/websocket v1.5.1
github.com/urfave/cli v1.22.14
nhooyr.io/websocket v1.8.10
)

require (
github.com/cpuguy83/go-md2man/v2 v2.0.3 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.14.0 // indirect
)
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nos
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY=
github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
Expand All @@ -28,12 +26,12 @@ github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcU
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/urfave/cli v1.22.14 h1:ebbhrRiGK2i4naQJr+1Xj92HXZCrK7MsyTS/ob3HnAk=
github.com/urfave/cli v1.22.14/go.mod h1:X0eDS6pD6Exaclxm99NJ3FiCDRED7vIHpx2mDOHLvkA=
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q=
golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
nhooyr.io/websocket v1.8.10 h1:mv4p+MnGrLDcPlBoWsvPP7XCzTYMXP9F9eIGoKbgx7Q=
nhooyr.io/websocket v1.8.10/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c=
26 changes: 10 additions & 16 deletions run_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package main

import (
"context"
"encoding/json"
"fmt"
"log"
Expand All @@ -24,8 +25,8 @@ import (
"time"

"cirello.io/runner/runner"
"github.com/gorilla/websocket"
cli "github.com/urfave/cli"
"nhooyr.io/websocket"
)

func logs() cli.Command {
Expand All @@ -38,8 +39,8 @@ func logs() cli.Command {
},
},
Action: func(c *cli.Context) error {
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
defer stop()

u := url.URL{Scheme: "ws", Host: c.GlobalString("service-discovery"), Path: "/logs"}
if filter := c.String("filter"); filter != "" {
Expand All @@ -50,17 +51,17 @@ func logs() cli.Command {
log.Printf("connecting to %s", u.String())

follow := func() error {
ws, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
ws, _, err := websocket.Dial(ctx, u.String(), nil)
if err != nil {
return fmt.Errorf("cannot dial to service discovery endpoint: %v s", err)
}
defer ws.Close()
defer ws.CloseNow()

done := make(chan struct{})
go func() {
defer close(done)
for {
_, message, err := ws.ReadMessage()
_, message, err := ws.Read(ctx)
if err != nil {
log.Println("read:", err)
return
Expand All @@ -77,16 +78,9 @@ func logs() cli.Command {
select {
case <-done:
return nil
case <-interrupt:
case <-ctx.Done():
log.Println("interrupt")
err := ws.WriteMessage(
websocket.CloseMessage,
websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""),
)
if err != nil {
log.Println("write close:", err)
return nil
}
ws.Close(websocket.StatusNormalClosure, "")
select {
case <-done:
case <-time.After(time.Second):
Expand All @@ -98,7 +92,7 @@ func logs() cli.Command {
var err error
for {
select {
case <-interrupt:
case <-ctx.Done():
return err
default:
err = follow()
Expand Down
4 changes: 2 additions & 2 deletions runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,12 +479,12 @@ func (r *Runner) startProcess(ctx context.Context, sv *ProcessType, procCount, p

stderrPipe, err := c.StderrPipe()
if err != nil {
fmt.Fprintln(pw, "cannot open stderr pipe", procName, cmd)
fmt.Fprintln(pw, "cannot open stderr pipe", procName, cmd, err)
continue
}
stdoutPipe, err := c.StdoutPipe()
if err != nil {
fmt.Fprintln(pw, "cannot open stdout pipe", procName, cmd)
fmt.Fprintln(pw, "cannot open stdout pipe", procName, cmd, err)
continue
}

Expand Down
9 changes: 4 additions & 5 deletions runner/web.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

oversight "cirello.io/oversight/easy"
terminal "github.com/buildkite/terminal-to-html"
"github.com/gorilla/websocket"
"nhooyr.io/websocket"
)

func (r *Runner) subscribeLogFwd() <-chan LogMessage {
Expand Down Expand Up @@ -93,13 +93,12 @@ func (r *Runner) serveWeb(ctx context.Context) error {
mode := req.URL.Query().Get("mode")
stream := r.subscribeLogFwd()
defer r.unsubscribeLogFwd(stream)
upgrader := websocket.Upgrader{}
c, err := upgrader.Upgrade(w, req, nil)
c, err := websocket.Accept(w, req, nil)
if err != nil {
log.Print("upgrade:", err)
return
}
defer c.Close()
defer c.CloseNow()
for msg := range stream {
if filter != "" && !strings.Contains(msg.Name, filter) && !strings.Contains(msg.Line, filter) {
continue
Expand All @@ -112,7 +111,7 @@ func (r *Runner) serveWeb(ctx context.Context) error {
log.Println("encode:", err)
break
}
if err = c.WriteMessage(websocket.TextMessage, b); err != nil {
if err = c.Write(req.Context(), websocket.MessageText, b); err != nil {
log.Println("write:", err)
break
}
Expand Down

0 comments on commit 837be2a

Please sign in to comment.