Skip to content

Commit

Permalink
Revert "Cleanly close agent goroutines"
Browse files Browse the repository at this point in the history
This reverts commit 822108b.
  • Loading branch information
Sean-Der committed Jul 15, 2024
1 parent a4fdbb3 commit 32a077e
Show file tree
Hide file tree
Showing 4 changed files with 5 additions and 58 deletions.
9 changes: 3 additions & 6 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,9 +323,9 @@ func NewAgent(config *AgentConfig) (*Agent, error) { //nolint:gocognit

userBindingRequestHandler: config.BindingRequestHandler,
}
a.connectionStateNotifier = &handlerNotifier{connectionStateFunc: a.onConnectionStateChange, done: make(chan struct{})}
a.candidateNotifier = &handlerNotifier{candidateFunc: a.onCandidate, done: make(chan struct{})}
a.selectedCandidatePairNotifier = &handlerNotifier{candidatePairFunc: a.onSelectedCandidatePairChange, done: make(chan struct{})}
a.connectionStateNotifier = &handlerNotifier{connectionStateFunc: a.onConnectionStateChange}
a.candidateNotifier = &handlerNotifier{candidateFunc: a.onCandidate}
a.selectedCandidatePairNotifier = &handlerNotifier{candidatePairFunc: a.onSelectedCandidatePairChange}

if a.net == nil {
a.net, err = stdnet.NewNet()
Expand Down Expand Up @@ -931,9 +931,6 @@ func (a *Agent) Close() error {

close(a.done)
<-a.taskLoopDone
a.connectionStateNotifier.Close()
a.candidateNotifier.Close()
a.selectedCandidatePairNotifier.Close()
return nil
}

Expand Down
45 changes: 1 addition & 44 deletions agent_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ func (a *Agent) onConnectionStateChange(s ConnectionState) {

type handlerNotifier struct {
sync.Mutex
running bool
notifiers sync.WaitGroup
running bool

connectionStates []ConnectionState
connectionStateFunc func(ConnectionState)
Expand All @@ -56,38 +55,13 @@ type handlerNotifier struct {

selectedCandidatePairs []*CandidatePair
candidatePairFunc func(*CandidatePair)

// State for closing
done chan struct{}
}

func (h *handlerNotifier) Close() {
h.Lock()

select {
case <-h.done:
h.Unlock()
return
default:
}
close(h.done)
h.Unlock()

h.notifiers.Wait()
}

func (h *handlerNotifier) EnqueueConnectionState(s ConnectionState) {
h.Lock()
defer h.Unlock()

select {
case <-h.done:
return
default:
}

notify := func() {
defer h.notifiers.Done()
for {
h.Lock()
if len(h.connectionStates) == 0 {
Expand All @@ -105,7 +79,6 @@ func (h *handlerNotifier) EnqueueConnectionState(s ConnectionState) {
h.connectionStates = append(h.connectionStates, s)
if !h.running {
h.running = true
h.notifiers.Add(1)
go notify()
}
}
Expand All @@ -114,14 +87,7 @@ func (h *handlerNotifier) EnqueueCandidate(c Candidate) {
h.Lock()
defer h.Unlock()

select {
case <-h.done:
return
default:
}

notify := func() {
defer h.notifiers.Done()
for {
h.Lock()
if len(h.candidates) == 0 {
Expand All @@ -139,7 +105,6 @@ func (h *handlerNotifier) EnqueueCandidate(c Candidate) {
h.candidates = append(h.candidates, c)
if !h.running {
h.running = true
h.notifiers.Add(1)
go notify()
}
}
Expand All @@ -148,14 +113,7 @@ func (h *handlerNotifier) EnqueueSelectedCandidatePair(p *CandidatePair) {
h.Lock()
defer h.Unlock()

select {
case <-h.done:
return
default:
}

notify := func() {
defer h.notifiers.Done()
for {
h.Lock()
if len(h.selectedCandidatePairs) == 0 {
Expand All @@ -173,7 +131,6 @@ func (h *handlerNotifier) EnqueueSelectedCandidatePair(p *CandidatePair) {
h.selectedCandidatePairs = append(h.selectedCandidatePairs, p)
if !h.running {
h.running = true
h.notifiers.Add(1)
go notify()
}
}
4 changes: 0 additions & 4 deletions agent_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ func TestConnectionStateNotifier(t *testing.T) {
connectionStateFunc: func(_ ConnectionState) {
updates <- struct{}{}
},
done: make(chan struct{}),
}
// Enqueue all updates upfront to ensure that it
// doesn't block
Expand All @@ -39,7 +38,6 @@ func TestConnectionStateNotifier(t *testing.T) {
close(done)
}()
<-done
c.Close()
})
t.Run("TestUpdateOrdering", func(t *testing.T) {
report := test.CheckRoutines(t)
Expand All @@ -49,7 +47,6 @@ func TestConnectionStateNotifier(t *testing.T) {
connectionStateFunc: func(cs ConnectionState) {
updates <- cs
},
done: make(chan struct{}),
}
done := make(chan struct{})
go func() {
Expand All @@ -70,6 +67,5 @@ func TestConnectionStateNotifier(t *testing.T) {
c.EnqueueConnectionState(ConnectionState(i))
}
<-done
c.Close()
})
}
5 changes: 1 addition & 4 deletions agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1423,12 +1423,11 @@ func TestCloseInConnectionStateCallback(t *testing.T) {

isClosed := make(chan interface{})
isConnected := make(chan interface{})
connectionStateConnectedSeen := make(chan interface{})
err = aAgent.OnConnectionStateChange(func(c ConnectionState) {
switch c {
case ConnectionStateConnected:
<-isConnected
close(connectionStateConnectedSeen)
assert.NoError(t, aAgent.Close())
case ConnectionStateClosed:
close(isClosed)
default:
Expand All @@ -1440,8 +1439,6 @@ func TestCloseInConnectionStateCallback(t *testing.T) {

connect(aAgent, bAgent)
close(isConnected)
<-connectionStateConnectedSeen
require.NoError(t, aAgent.Close())

<-isClosed
assert.NoError(t, bAgent.Close())
Expand Down

0 comments on commit 32a077e

Please sign in to comment.