Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure sequential execution of connection state callbacks #656

Merged
merged 1 commit into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 3 additions & 10 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@

chanCandidate chan Candidate
chanCandidatePair chan *CandidatePair
chanState chan ConnectionState
stateNotifier *connectionStateNotifier

loggerFactory logging.LoggerFactory
log logging.LeveledLogger
Expand Down Expand Up @@ -227,7 +227,6 @@

after()

close(a.chanState)
close(a.chanCandidate)
close(a.chanCandidatePair)
close(a.taskLoopDone)
Expand Down Expand Up @@ -278,7 +277,6 @@

a := &Agent{
chanTask: make(chan task),
chanState: make(chan ConnectionState),
chanCandidate: make(chan Candidate),
chanCandidatePair: make(chan *CandidatePair),
tieBreaker: globalMathRandomGenerator.Uint64(),
Expand Down Expand Up @@ -322,6 +320,7 @@

disableActiveTCP: config.DisableActiveTCP,
}
a.stateNotifier = &connectionStateNotifier{NotificationFunc: a.onConnectionStateChange}

Check warning on line 323 in agent.go

View check run for this annotation

Codecov / codecov/patch

agent.go#L323

Added line #L323 was not covered by tests

if a.net == nil {
a.net, err = stdnet.NewNet()
Expand Down Expand Up @@ -369,7 +368,6 @@
// Blocking one by the other one causes deadlock.
// Hence, we call handlers from independent Goroutines.
go a.candidatePairRoutine()
go a.connectionStateRoutine()
go a.candidateRoutine()

// Restart is also used to initialize the agent for the first time
Expand Down Expand Up @@ -503,12 +501,7 @@

a.log.Infof("Setting new connection state: %s", newState)
a.connectionState = newState

// Call handler after finishing current task since we may be holding the agent lock
// and the handler may also require it
a.afterRun(func(_ context.Context) {
a.chanState <- newState
})
a.stateNotifier.Enqueue(newState)

Check warning on line 504 in agent.go

View check run for this annotation

Codecov / codecov/patch

agent.go#L504

Added line #L504 was not covered by tests
}
}

Expand Down
34 changes: 31 additions & 3 deletions agent_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

package ice

import "sync"

// OnConnectionStateChange sets a handler that is fired when the connection state changes
func (a *Agent) OnConnectionStateChange(f func(ConnectionState)) error {
a.onConnectionStateChangeHdlr.Store(f)
Expand Down Expand Up @@ -47,9 +49,35 @@ func (a *Agent) candidatePairRoutine() {
}
}

func (a *Agent) connectionStateRoutine() {
for s := range a.chanState {
go a.onConnectionStateChange(s)
type connectionStateNotifier struct {
sync.Mutex
states []ConnectionState
running bool
NotificationFunc func(ConnectionState)
}

func (c *connectionStateNotifier) Enqueue(s ConnectionState) {
c.Lock()
defer c.Unlock()
c.states = append(c.states, s)
if !c.running {
c.running = true
go c.notify()
}
}

func (c *connectionStateNotifier) notify() {
for {
c.Lock()
if len(c.states) == 0 {
c.running = false
c.Unlock()
return
}
s := c.states[0]
c.states = c.states[1:]
c.Unlock()
c.NotificationFunc(s)
}
}

Expand Down
71 changes: 71 additions & 0 deletions agent_handlers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

package ice

import (
"testing"
"time"

"github.com/pion/transport/v3/test"
)

func TestConnectionStateNotifier(t *testing.T) {
t.Run("TestManyUpdates", func(t *testing.T) {
report := test.CheckRoutines(t)
defer report()
updates := make(chan struct{}, 1)
c := &connectionStateNotifier{
NotificationFunc: func(_ ConnectionState) {
updates <- struct{}{}
},
}
// Enqueue all updates upfront to ensure that it
// doesn't block
for i := 0; i < 10000; i++ {
c.Enqueue(ConnectionStateNew)
}
done := make(chan struct{})
go func() {
for i := 0; i < 10000; i++ {
<-updates
}
select {
case <-updates:
t.Errorf("received more updates than expected")
case <-time.After(1 * time.Second):
}
close(done)
}()
<-done
})
t.Run("TestUpdateOrdering", func(t *testing.T) {
report := test.CheckRoutines(t)
defer report()
updates := make(chan ConnectionState)
c := &connectionStateNotifier{
NotificationFunc: func(cs ConnectionState) {
updates <- cs
},
}
done := make(chan struct{})
go func() {
for i := 0; i < 10000; i++ {
x := <-updates
if x != ConnectionState(i) {
t.Errorf("expected %d got %d", x, i)
}
}
select {
case <-updates:
t.Errorf("received more updates than expected")
case <-time.After(1 * time.Second):
}
close(done)
}()
for i := 0; i < 10000; i++ {
c.Enqueue(ConnectionState(i))
}
<-done
})
}
Loading