Skip to content

Commit

Permalink
Added Concurrency option
Browse files Browse the repository at this point in the history
  • Loading branch information
sonnes committed Aug 18, 2023
1 parent 3db3066 commit 0444906
Show file tree
Hide file tree
Showing 9 changed files with 85 additions and 57 deletions.
3 changes: 3 additions & 0 deletions examples/xload/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ require (
)

require (
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/cast v1.5.1 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
13 changes: 12 additions & 1 deletion examples/xload/go.sum
Original file line number Diff line number Diff line change
@@ -1,15 +1,26 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/frankban/quicktest v1.14.4 h1:g2rn0vABPOOXmZUj+vbmUp0lPoXEMuhTpIluN0XL9UY=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/gotidy/ptr v1.4.0 h1:7++suUs+HNHMnyz6/AW3SE+4EnBhupPSQTSI7QNijVc=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo=
github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0=
github.com/spf13/cast v1.5.1 h1:R+kOtfhWQE6TVQzY+4D7wJLBgkdVasCEFxSUBYBYIlA=
github.com/spf13/cast v1.5.1/go.mod h1:b9PdjNptOpzXr7Rq1q9gJML/2cdGQAo69NKzQ10KN48=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI=
go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
77 changes: 26 additions & 51 deletions xload/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,12 @@ package xload
import (
"context"
"reflect"
"sync"
)

// LoadAsync loads values concurrently by calling Loader in parallel.
// Number of goroutines can be controlled with xload.Concurrency.
func LoadAsync(ctx context.Context, v any, opts ...Option) error {
o := newOptions(opts...)

return processAsync(ctx, v, o.tagName, o.loader)
}
"github.com/sourcegraph/conc/pool"
)

//nolint:funlen,nestif
func processAsync(ctx context.Context, obj any, tagKey string, loader Loader) error {
func processAsync(ctx context.Context, obj any, o *options, loader Loader) error {
v := reflect.ValueOf(obj)

if v.Kind() != reflect.Ptr {
Expand All @@ -29,9 +22,7 @@ func processAsync(ctx context.Context, obj any, tagKey string, loader Loader) er

typ := value.Type()

var wg sync.WaitGroup

errors := make([]error, 0)
p := pool.New().WithErrors().WithMaxGoroutines(o.concurrency)

for i := 0; i < typ.NumField(); i++ {
fTyp := typ.Field(i)
Expand All @@ -42,7 +33,7 @@ func processAsync(ctx context.Context, obj any, tagKey string, loader Loader) er
continue
}

tag := fTyp.Tag.Get(tagKey)
tag := fTyp.Tag.Get(o.tagName)

if tag == "-" {
continue
Expand Down Expand Up @@ -88,36 +79,32 @@ func processAsync(ctx context.Context, obj any, tagKey string, loader Loader) er
// if the struct has a key, load it
// and set the value to the struct
if meta.name != "" && hasDecoder(fVal) {
loadAndSet := func(original reflect.Value, fVal reflect.Value, isNilStructPtr bool) {
defer wg.Done()

loadAndSet := func(original reflect.Value, fVal reflect.Value, isNilStructPtr bool) error {
val, err := loader.Load(ctx, meta.name)
if err != nil {
errors = append(errors, err)

return
return err
}

if val == "" && meta.required {
errors = append(errors, ErrRequired)

return
return ErrRequired
}

if ok, err := decode(fVal, val); ok {
if err != nil {
errors = append(errors, err)

return
return err
}

setNilStructPtr(original, fVal, isNilStructPtr)
}

return nil
}

wg.Add(1)
original := value.Field(i)

go loadAndSet(value.Field(i), fVal, isNilStructPtr)
p.Go(func() error {
return loadAndSet(original, fVal, isNilStructPtr)
})

continue
}
Expand All @@ -127,7 +114,7 @@ func processAsync(ctx context.Context, obj any, tagKey string, loader Loader) er
pld = PrefixLoader(meta.prefix, loader)
}

err := processAsync(ctx, fVal.Interface(), tagKey, pld)
err := processAsync(ctx, fVal.Interface(), o, pld)
if err != nil {
return err
}
Expand All @@ -141,42 +128,30 @@ func processAsync(ctx context.Context, obj any, tagKey string, loader Loader) er
return ErrInvalidPrefix
}

loadAndSet := func(fVal reflect.Value) {
defer wg.Done()

loadAndSet := func(fVal reflect.Value) error {
// lookup value
val, err := loader.Load(ctx, meta.name)
if err != nil {
errors = append(errors, err)

return
return err
}

if val == "" && meta.required {
errors = append(errors, ErrRequired)

return
return ErrRequired
}

// set value
err = setVal(fVal, val, meta)
if err != nil {
errors = append(errors, err)

return
return err
}
}

wg.Add(1)

go loadAndSet(fVal)
}

wg.Wait()
return nil
}

if len(errors) > 0 {
return errors[0]
p.Go(func() error {
return loadAndSet(fVal)
})
}

return nil
return p.Wait()
}
3 changes: 3 additions & 0 deletions xload/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ go 1.20

require (
github.com/gotidy/ptr v1.4.0
github.com/sourcegraph/conc v0.3.0
github.com/spf13/cast v1.5.1
github.com/stretchr/testify v1.8.4
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
11 changes: 10 additions & 1 deletion xload/go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/frankban/quicktest v1.14.4 h1:g2rn0vABPOOXmZUj+vbmUp0lPoXEMuhTpIluN0XL9UY=
Expand All @@ -9,11 +10,19 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo=
github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0=
github.com/spf13/cast v1.5.1 h1:R+kOtfhWQE6TVQzY+4D7wJLBgkdVasCEFxSUBYBYIlA=
github.com/spf13/cast v1.5.1/go.mod h1:b9PdjNptOpzXr7Rq1q9gJML/2cdGQAo69NKzQ10KN48=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI=
go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
4 changes: 4 additions & 0 deletions xload/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ const (
func Load(ctx context.Context, v any, opts ...Option) error {
o := newOptions(opts...)

if o.concurrency > 1 {
return processAsync(ctx, v, o, o.loader)
}

return process(ctx, v, o.tagName, o.loader)
}

Expand Down
17 changes: 14 additions & 3 deletions xload/load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -674,9 +674,7 @@ func runTestcases(t *testing.T, testcases []testcase) {
for _, tc := range testcases {
tc := tc

t.Run(tc.name, func(t *testing.T) {
t.Parallel()

t.Run("Load_"+tc.name, func(t *testing.T) {
err := Load(context.Background(), tc.input, WithLoader(tc.loader))
if tc.err != nil {
assert.Error(t, err)
Expand All @@ -688,5 +686,18 @@ func runTestcases(t *testing.T, testcases []testcase) {
require.NoError(t, err)
assert.EqualValues(t, tc.want, tc.input)
})

t.Run("LoadAsync_"+tc.name, func(t *testing.T) {
err := Load(context.Background(), tc.input, Concurrency(5), WithLoader(tc.loader))
if tc.err != nil {
assert.Error(t, err)
assert.ErrorContains(t, err, tc.err.Error())

return
}

require.NoError(t, err)
assert.EqualValues(t, tc.want, tc.input)
})
}
}
3 changes: 3 additions & 0 deletions xload/providers/yaml/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,8 @@ require github.com/stretchr/testify v1.8.4
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/cast v1.5.1 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
)
11 changes: 10 additions & 1 deletion xload/providers/yaml/go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/frankban/quicktest v1.14.4 h1:g2rn0vABPOOXmZUj+vbmUp0lPoXEMuhTpIluN0XL9UY=
Expand All @@ -8,11 +9,19 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo=
github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0=
github.com/spf13/cast v1.5.1 h1:R+kOtfhWQE6TVQzY+4D7wJLBgkdVasCEFxSUBYBYIlA=
github.com/spf13/cast v1.5.1/go.mod h1:b9PdjNptOpzXr7Rq1q9gJML/2cdGQAo69NKzQ10KN48=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI=
go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

0 comments on commit 0444906

Please sign in to comment.