Skip to content

Commit

Permalink
Merge pull request #36 from HeavyHorst/sqs
Browse files Browse the repository at this point in the history
sqs: empty buffer on Stop
  • Loading branch information
matryer authored Aug 10, 2017
2 parents 21ba7ec + 17facbc commit 888b717
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 1 deletion.
4 changes: 3 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ before_install:
- wget https://github.com/nats-io/nats-streaming-server/releases/download/v0.5.0/nats-streaming-server-v0.5.0-linux-amd64.zip
- unzip -d gnatsd -j nats-streaming-server-v0.5.0-linux-amd64.zip
- ./gnatsd/nats-streaming-server &
# give the queues some time to start.
- sleep 5

before_script:
- go vet ./...
Expand All @@ -27,4 +29,4 @@ services:
- redis

script:
- go test -v -race ./...
- go test -v -timeout 20s -race ./...
9 changes: 9 additions & 0 deletions queues/sqs/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (

// Transport is a vice.Transport for Amazon's SQS
type Transport struct {
wg *sync.WaitGroup

sm sync.Mutex
sendChans map[string]chan []byte

Expand All @@ -31,6 +33,7 @@ type Transport struct {
// New returns a new transport
func New() *Transport {
return &Transport{
wg: &sync.WaitGroup{},
sendChans: make(map[string]chan []byte),
receiveChans: make(map[string]chan []byte),
errChan: make(chan error, 10),
Expand Down Expand Up @@ -161,10 +164,15 @@ func (t *Transport) makePublisher(name string) (chan []byte, error) {

ch := make(chan []byte, 1024)

t.wg.Add(1)
go func() {
defer t.wg.Done()
for {
select {
case <-t.stopPubChan:
if len(ch) != 0 {
continue
}
return

case msg := <-ch:
Expand Down Expand Up @@ -194,6 +202,7 @@ func (t *Transport) ErrChan() <-chan error {
func (t *Transport) Stop() {
close(t.stopSubChan)
close(t.stopPubChan)
t.wg.Wait()
close(t.stopchan)
}

Expand Down

0 comments on commit 888b717

Please sign in to comment.