diff --git a/benches/bench_goworker/main.go b/benches/bench_goworker/main.go deleted file mode 100644 index faf41356..00000000 --- a/benches/bench_goworker/main.go +++ /dev/null @@ -1,122 +0,0 @@ -package main - -import ( - "fmt" - "os" - "sync/atomic" - "time" - - "github.com/benmanns/goworker" - "github.com/gocraft/health" - "github.com/gomodule/redigo/redis" -) - -func myJob(queue string, args ...interface{}) error { - atomic.AddInt64(&totcount, 1) - //fmt.Println("job! ", queue) - return nil -} - -var namespace = "bench_test" -var pool = newPool(":6379") - -// go run *.go -queues="myqueue,myqueue2,myqueue3,myqueue4,myqueue5" -namespace="bench_test:" -concurrency=50 -use-nuber -func main() { - - stream := health.NewStream().AddSink(&health.WriterSink{os.Stdout}) - stream.Event("wat") - cleanKeyspace() - - queues := []string{"myqueue", "myqueue2", "myqueue3", "myqueue4", "myqueue5"} - numJobs := 100000 / len(queues) - - job := stream.NewJob("enqueue_all") - for _, q := range queues { - enqueueJobs(q, numJobs) - } - job.Complete(health.Success) - - goworker.Register("MyClass", myJob) - - go monitor() - - // Blocks until process is told to exit via unix signal - goworker.Work() -} - -var totcount int64 - -func monitor() { - t := time.Tick(1 * time.Second) - - curT := 0 - c1 := int64(0) - c2 := int64(0) - prev := int64(0) - -DALOOP: - for range t { - curT++ - v := atomic.AddInt64(&totcount, 0) - fmt.Printf("after %d seconds, count is %d\n", curT, v) - if curT == 1 { - c1 = v - } else if curT == 3 { - c2 = v - } - if v == prev { - break DALOOP - } - prev = v - } - - fmt.Println("Jobs/sec: ", float64(c2-c1)/2.0) - os.Exit(0) -} - -func enqueueJobs(queue string, count int) { - conn := pool.Get() - defer conn.Close() - - for i := 0; i < count; i++ { - //workers.Enqueue(queue, "Foo", []int{i}) - conn.Do("RPUSH", "bench_test:queue:"+queue, `{"class":"MyClass","args":[]}`) - } -} - -func cleanKeyspace() { - conn := pool.Get() - defer conn.Close() - - keys, err := redis.Strings(conn.Do("KEYS", namespace+"*")) - if err != nil { - panic("could not get keys: " + err.Error()) - } - for _, k := range keys { - //fmt.Println("deleting ", k) - if _, err := conn.Do("DEL", k); err != nil { - panic("could not del: " + err.Error()) - } - } -} - -func newPool(addr string) *redis.Pool { - return &redis.Pool{ - MaxActive: 3, - MaxIdle: 3, - IdleTimeout: 240 * time.Second, - Dial: func() (redis.Conn, error) { - c, err := redis.Dial("tcp", addr) - if err != nil { - return nil, err - } - return c, nil - //return redis.NewLoggingConn(c, log.New(os.Stdout, "", 0), "redis"), err - }, - Wait: true, - //TestOnBorrow: func(c redis.Conn, t time.Time) error { - // _, err := c.Do("PING") - // return err - //}, - } -} diff --git a/benches/bench_goworkers/main.go b/benches/bench_goworkers/main.go deleted file mode 100644 index 6e51df7c..00000000 --- a/benches/bench_goworkers/main.go +++ /dev/null @@ -1,129 +0,0 @@ -package main - -import ( - "fmt" - "os" - "sync/atomic" - "time" - - "github.com/gocraft/health" - "github.com/gomodule/redigo/redis" - "github.com/jrallison/go-workers" -) - -func myJob(m *workers.Msg) { - atomic.AddInt64(&totcount, 1) -} - -var namespace = "bench_test" -var pool = newPool(":6379") - -func main() { - - stream := health.NewStream().AddSink(&health.WriterSink{os.Stdout}) - stream.Event("wat") - cleanKeyspace() - - workers.Configure(map[string]string{ - // location of redis instance - "server": "localhost:6379", - // instance of the database - "database": "0", - // number of connections to keep open with redis - "pool": "10", - // unique process id for this instance of workers (for proper recovery of inprogress jobs on crash) - "process": "1", - "namespace": namespace, - }) - workers.Middleware = &workers.Middlewares{} - - queues := []string{"myqueue", "myqueue2", "myqueue3", "myqueue4", "myqueue5"} - numJobs := 100000 / len(queues) - - job := stream.NewJob("enqueue_all") - for _, q := range queues { - enqueueJobs(q, numJobs) - } - job.Complete(health.Success) - - for _, q := range queues { - workers.Process(q, myJob, 10) - } - - go monitor() - - // Blocks until process is told to exit via unix signal - workers.Run() -} - -var totcount int64 - -func monitor() { - t := time.Tick(1 * time.Second) - - curT := 0 - c1 := int64(0) - c2 := int64(0) - prev := int64(0) - -DALOOP: - for range t { - curT++ - v := atomic.AddInt64(&totcount, 0) - fmt.Printf("after %d seconds, count is %d\n", curT, v) - if curT == 1 { - c1 = v - } else if curT == 3 { - c2 = v - } - if v == prev { - break DALOOP - } - } - - fmt.Println("Jobs/sec: ", float64(c2-c1)/2.0) - os.Exit(0) -} - -func enqueueJobs(queue string, count int) { - for i := 0; i < count; i++ { - workers.Enqueue(queue, "Foo", []int{i}) - } -} - -func cleanKeyspace() { - conn := pool.Get() - defer conn.Close() - - keys, err := redis.Strings(conn.Do("KEYS", namespace+"*")) - if err != nil { - panic("could not get keys: " + err.Error()) - } - for _, k := range keys { - //fmt.Println("deleting ", k) - if _, err := conn.Do("DEL", k); err != nil { - panic("could not del: " + err.Error()) - } - } -} - -func newPool(addr string) *redis.Pool { - return &redis.Pool{ - MaxActive: 3, - MaxIdle: 3, - IdleTimeout: 240 * time.Second, - Dial: func() (redis.Conn, error) { - c, err := redis.Dial("tcp", addr) - if err != nil { - return nil, err - } - return c, nil - //return redis.NewLoggingConn(c, log.New(os.Stdout, "", 0), "redis"), err - }, - Wait: true, - //TestOnBorrow: func(c redis.Conn, t time.Time) error { - // _, err := c.Do("PING") - // return err - //}, - } -} diff --git a/benches/bench_jobs/main.go b/benches/bench_jobs/main.go deleted file mode 100644 index 7676348c..00000000 --- a/benches/bench_jobs/main.go +++ /dev/null @@ -1,138 +0,0 @@ -package main - -import ( - "fmt" - "os" - "sync/atomic" - "time" - - "github.com/albrow/jobs" - "github.com/gocraft/health" - "github.com/gomodule/redigo/redis" -) - -var namespace = "jobs" -var pool = newPool(":6379") - -func epsilonHandler(i int) error { - atomic.AddInt64(&totcount, 1) - return nil -} - -func main() { - stream := health.NewStream().AddSink(&health.WriterSink{os.Stdout}) - cleanKeyspace() - - queueNames := []string{"myqueue", "myqueue2", "myqueue3", "myqueue4", "myqueue5"} - queues := []*jobs.Type{} - - for _, qn := range queueNames { - q, err := jobs.RegisterType(qn, 3, epsilonHandler) - if err != nil { - panic(err) - } - queues = append(queues, q) - } - - job := stream.NewJob("enqueue_all") - - numJobs := 40000 / len(queues) - for _, q := range queues { - for i := 0; i < numJobs; i++ { - _, err := q.Schedule(100, time.Now(), i) - if err != nil { - panic(err) - } - } - } - - job.Complete(health.Success) - - go monitor() - - job = stream.NewJob("run_all") - pool, err := jobs.NewPool(&jobs.PoolConfig{ - // NumWorkers: 1000, - // BatchSize: 3000, - }) - if err != nil { - panic(err) - } - defer func() { - pool.Close() - if err := pool.Wait(); err != nil { - panic(err) - } - }() - if err := pool.Start(); err != nil { - panic(err) - } - job.Complete(health.Success) - select {} -} - -var totcount int64 - -func monitor() { - t := time.Tick(1 * time.Second) - - curT := 0 - c1 := int64(0) - c2 := int64(0) - prev := int64(0) - -DALOOP: - for range t { - curT++ - v := atomic.AddInt64(&totcount, 0) - fmt.Printf("after %d seconds, count is %d\n", curT, v) - if curT == 1 { - c1 = v - } else if curT == 3 { - c2 = v - } - if v == prev { - break DALOOP - } - prev = v - } - fmt.Println("Jobs/sec: ", float64(c2-c1)/2.0) - os.Exit(0) -} - -func cleanKeyspace() { - conn := pool.Get() - defer conn.Close() - - keys, err := redis.Strings(conn.Do("KEYS", namespace+"*")) - if err != nil { - panic("could not get keys: " + err.Error()) - } - for _, k := range keys { - //fmt.Println("deleting ", k) - if _, err := conn.Do("DEL", k); err != nil { - panic("could not del: " + err.Error()) - } - } -} - -func newPool(addr string) *redis.Pool { - return &redis.Pool{ - MaxActive: 20, - MaxIdle: 20, - IdleTimeout: 240 * time.Second, - Dial: func() (redis.Conn, error) { - c, err := redis.Dial("tcp", addr) - if err != nil { - return nil, err - } - return c, nil - //return redis.NewLoggingConn(c, log.New(os.Stdout, "", 0), "redis"), err - }, - Wait: true, - //TestOnBorrow: func(c redis.Conn, t time.Time) error { - // _, err := c.Do("PING") - // return err - //}, - } -} diff --git a/go.mod b/go.mod index a16ce294..58fe3633 100644 --- a/go.mod +++ b/go.mod @@ -3,25 +3,11 @@ module github.com/gocraft/work go 1.14 require ( - github.com/albrow/jobs v0.4.2 - github.com/benmanns/goworker v0.1.3 - github.com/bitly/go-simplejson v0.5.0 // indirect - github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect github.com/braintree/manners v0.0.0-20160418043613-82a8879fc5fd - github.com/cihub/seelog v0.0.0-20170130134532-f561c5e57575 // indirect - github.com/customerio/gospec v0.0.0-20130710230057-a5cc0e48aa39 // indirect - github.com/dchest/uniuri v0.0.0-20200228104902-7aecb25e1fe5 // indirect - github.com/dustin/go-humanize v1.0.0 // indirect - github.com/garyburd/redigo v1.6.0 // indirect github.com/gocraft/health v0.0.0-20170925182251-8675af27fef0 github.com/gocraft/web v0.0.0-20190207150652-9707327fb69b - github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b // indirect - github.com/gomodule/redigo v2.0.0+incompatible - github.com/jrallison/go-workers v0.0.0-20180112190529-dbf81d0b75bb - github.com/kr/pretty v0.2.0 // indirect - github.com/orfjackal/nanospec.go v0.0.0-20120727230329-de4694c1d701 // indirect + github.com/gomodule/redigo v1.8.2 + github.com/google/uuid v1.1.2 github.com/robfig/cron/v3 v3.0.1 - github.com/stretchr/testify v1.5.1 - github.com/youtube/vitess v2.1.1+incompatible // indirect - golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 // indirect + github.com/stretchr/testify v1.6.1 ) diff --git a/go.sum b/go.sum index 5bfe9d86..679a2dfa 100644 --- a/go.sum +++ b/go.sum @@ -1,42 +1,15 @@ -github.com/albrow/jobs v0.4.2 h1:AhhNNgtnOz3h+Grt6uuRJP+uj/AVq+ZhIBY8Mzkf4TM= -github.com/albrow/jobs v0.4.2/go.mod h1:e4sWh7D1DxPbpxrzJhNo/cMARAljpTYF/osgh2j3+r8= -github.com/benmanns/goworker v0.1.3 h1:ekwn7WiKsn8oUOKfbHDqsA6g5bXz/uEZ9AdnKgtAECY= -github.com/benmanns/goworker v0.1.3/go.mod h1:Gj3m7lTyCswE3+Kta7c79CMOmm5rHJmj2qh/GAmojJ4= -github.com/bitly/go-simplejson v0.5.0 h1:6IH+V8/tVMab511d5bn4M7EwGXZf9Hj6i2xSwkNEM+Y= -github.com/bitly/go-simplejson v0.5.0/go.mod h1:cXHtHw4XUPsvGaxgjIAn8PhEWG9NfngEKAMDJEczWVA= -github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY= -github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= github.com/braintree/manners v0.0.0-20160418043613-82a8879fc5fd h1:ePesaBzdTmoMQjwqRCLP2jY+jjWMBpwws/LEQdt1fMM= github.com/braintree/manners v0.0.0-20160418043613-82a8879fc5fd/go.mod h1:TNehV1AhBwtT7Bd+rh8G6MoGDbBLNs/sKdk3nvr4Yzg= -github.com/cihub/seelog v0.0.0-20170130134532-f561c5e57575 h1:kHaBemcxl8o/pQ5VM1c8PVE1PubbNx3mjUr09OqWGCs= -github.com/cihub/seelog v0.0.0-20170130134532-f561c5e57575/go.mod h1:9d6lWj8KzO/fd/NrVaLscBKmPigpZpn5YawRPw+e3Yo= -github.com/customerio/gospec v0.0.0-20130710230057-a5cc0e48aa39 h1:O0YTztXI3XeJXlFhSo4wNb0VBVqSgT+hi/CjNWKvMnY= -github.com/customerio/gospec v0.0.0-20130710230057-a5cc0e48aa39/go.mod h1:OzYUFhPuL2JbjwFwrv6CZs23uBawekc6OZs+g19F0mY= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dchest/uniuri v0.0.0-20200228104902-7aecb25e1fe5 h1:RAV05c0xOkJ3dZGS0JFybxFKZ2WMLabgx3uXnd7rpGs= -github.com/dchest/uniuri v0.0.0-20200228104902-7aecb25e1fe5/go.mod h1:GgB8SF9nRG+GqaDtLcwJZsQFhcogVCJ79j4EdT0c2V4= -github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= -github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= -github.com/garyburd/redigo v1.6.0 h1:0VruCpn7yAIIu7pWVClQC8wxCJEcG3nyzpMSHKi1PQc= -github.com/garyburd/redigo v1.6.0/go.mod h1:NR3MbYisc3/PwhQ00EMzDiPmrwpPxAn5GI05/YaO1SY= github.com/gocraft/health v0.0.0-20170925182251-8675af27fef0 h1:pKjeDsx7HGGbjr7VGI1HksxDJqSjaGED3cSw9GeSI98= github.com/gocraft/health v0.0.0-20170925182251-8675af27fef0/go.mod h1:rWibcVfwbUxi/QXW84U7vNTcIcZFd6miwbt8ritxh/Y= github.com/gocraft/web v0.0.0-20190207150652-9707327fb69b h1:g2Qcs0B+vOQE1L3a7WQ/JUUSzJnHbTz14qkJSqEWcF4= github.com/gocraft/web v0.0.0-20190207150652-9707327fb69b/go.mod h1:Ag7UMbZNGrnHwaXPJOUKJIVgx4QOWMOWZngrvsN6qak= -github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= -github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= -github.com/gomodule/redigo v2.0.0+incompatible h1:K/R+8tc58AaqLkqG2Ol3Qk+DR/TlNuhuh457pBFPtt0= -github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4= -github.com/jrallison/go-workers v0.0.0-20180112190529-dbf81d0b75bb h1:y9LFhCM3gwK94Xz9/h7GcSVLteky9pFHEkP04AqQupA= -github.com/jrallison/go-workers v0.0.0-20180112190529-dbf81d0b75bb/go.mod h1:ziQRRNHCWZe0wVNzF8y8kCWpso0VMpqHJjB19DSenbE= -github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs= -github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= -github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= -github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/orfjackal/nanospec.go v0.0.0-20120727230329-de4694c1d701 h1:yOXfzNV7qkZ3nf2NPqy4BMzlCmnQzIEbI1vuqKb2FkQ= -github.com/orfjackal/nanospec.go v0.0.0-20120727230329-de4694c1d701/go.mod h1:VtBIF1XX0c1nKkeAPk8i4aXkYopqQgfDqolHUIHPwNI= +github.com/gomodule/redigo v1.8.2 h1:H5XSIre1MB5NbPYFp+i1NBbb5qN1W8Y8YAQoAYbkm8k= +github.com/gomodule/redigo v1.8.2/go.mod h1:P9dn9mFrCBvWhGE1wpxx6fgq7BAeLBk+UUUzlpkBYO0= +github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= +github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= @@ -45,15 +18,11 @@ github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= -github.com/youtube/vitess v2.1.1+incompatible h1:SE+P7DNX/jw5RHFs5CHRhZQjq402EJFCD33JhzQMdDw= -github.com/youtube/vitess v2.1.1+incompatible/go.mod h1:hpMim5/30F1r+0P8GGtB29d0gWHr0IZ5unS+CG0zMx8= -golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 h1:Jcxah/M+oLZ/R4/z5RzfPzGbPXnVDPkEDtf2JnuxN+U= -golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= -golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=