diff --git a/cloudevents-server/go.mod b/cloudevents-server/go.mod index 18b65b21..1adddf16 100644 --- a/cloudevents-server/go.mod +++ b/cloudevents-server/go.mod @@ -17,9 +17,10 @@ require ( ) require ( - github.com/bytedance/sonic/loader v0.1.1 // indirect - github.com/cloudwego/base64x v0.1.4 // indirect - github.com/cloudwego/iasm v0.2.0 // indirect + github.com/rogpeppe/go-internal v1.10.0 // indirect + github.com/xdg-go/pbkdf2 v1.0.0 // indirect + github.com/xdg-go/scram v1.1.2 // indirect + github.com/xdg-go/stringprep v1.0.4 // indirect ) require ( @@ -35,8 +36,11 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/blendle/zapdriver v1.3.1 // indirect github.com/bytedance/sonic v1.11.6 // indirect + github.com/bytedance/sonic/loader v0.1.1 // indirect github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/cloudwego/base64x v0.1.4 // indirect + github.com/cloudwego/iasm v0.2.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/emicklei/go-restful v2.15.0+incompatible // indirect github.com/evanphx/json-patch/v5 v5.6.0 // indirect @@ -72,6 +76,7 @@ require ( github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/kelseyhightower/envconfig v1.4.0 // indirect + github.com/klauspost/compress v1.17.9 // indirect github.com/klauspost/cpuid/v2 v2.2.7 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/leodido/go-urn v1.4.0 // indirect @@ -85,13 +90,14 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pelletier/go-toml/v2 v2.2.2 // indirect + github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_golang v1.11.1 // indirect github.com/prometheus/client_model v0.4.0 // indirect github.com/prometheus/common v0.32.1 // indirect github.com/prometheus/procfs v0.6.0 // indirect github.com/prometheus/statsd_exporter v0.21.0 // indirect - github.com/rogpeppe/go-internal v1.10.0 // indirect + github.com/segmentio/kafka-go v0.4.47 github.com/shopspring/decimal v1.2.0 // indirect github.com/spf13/cast v1.3.1 // indirect github.com/spf13/pflag v1.0.5 // indirect @@ -99,18 +105,18 @@ require ( github.com/ugorji/go/codec v1.2.12 // indirect github.com/zclconf/go-cty v1.8.0 // indirect go.opencensus.io v0.24.0 // indirect - go.uber.org/atomic v1.9.0 // indirect - go.uber.org/multierr v1.9.0 // indirect - go.uber.org/zap v1.24.0 // indirect + go.uber.org/atomic v1.11.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + go.uber.org/zap v1.27.0 // indirect golang.org/x/arch v0.8.0 // indirect - golang.org/x/crypto v0.23.0 // indirect - golang.org/x/mod v0.15.0 // indirect - golang.org/x/net v0.25.0 // indirect + golang.org/x/crypto v0.26.0 // indirect + golang.org/x/mod v0.17.0 // indirect + golang.org/x/net v0.28.0 // indirect golang.org/x/oauth2 v0.15.0 // indirect - golang.org/x/sync v0.6.0 // indirect - golang.org/x/sys v0.20.0 // indirect - golang.org/x/term v0.20.0 // indirect - golang.org/x/text v0.15.0 // indirect + golang.org/x/sync v0.8.0 // indirect + golang.org/x/sys v0.23.0 // indirect + golang.org/x/term v0.23.0 // indirect + golang.org/x/text v0.17.0 // indirect golang.org/x/time v0.5.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect diff --git a/cloudevents-server/go.sum b/cloudevents-server/go.sum index ad513166..c043400c 100644 --- a/cloudevents-server/go.sum +++ b/cloudevents-server/go.sum @@ -77,8 +77,6 @@ github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kd github.com/apparentlymart/go-textseg/v13 v13.0.0 h1:Y+KvPE1NYz0xl601PVImeQfFyEy6iT90AvPUL1NNfNw= github.com/apparentlymart/go-textseg/v13 v13.0.0/go.mod h1:ZK2fH7c4NqDTLtiYLvIkEghdlcqw7yxLeM89kiTRPUo= github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY= -github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= -github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= @@ -317,6 +315,9 @@ github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dv github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= +github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM= github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= @@ -389,6 +390,9 @@ github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1y github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= +github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= +github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -430,6 +434,8 @@ github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncj github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8= github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= +github.com/segmentio/kafka-go v0.4.47 h1:IqziR4pA3vrZq7YdRxaT3w1/5fvIH5qpCwstUanQQB0= +github.com/segmentio/kafka-go v0.4.47/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg= github.com/sergi/go-diff v1.0.0 h1:Kpca3qRNrduNnOQeazBd0ysaKrUJiIuISHxogkT9RPQ= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/shopspring/decimal v1.2.0 h1:abSATXmQEYyShuxI4/vyW3tV1MrKAJzCZ/0zLUXYbsQ= @@ -470,6 +476,12 @@ github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6Kllzaw github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/vmihailenco/msgpack/v4 v4.3.12/go.mod h1:gborTTJjAo/GWTqqRjrLCn9pgNN+NXzzngzBKDPIqw4= github.com/vmihailenco/tagparser v0.1.1/go.mod h1:OeAg3pn3UbLjkWt+rN9oFYB6u/cQgqMEUPoW2WPyhdI= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= +github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -488,16 +500,16 @@ go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= -go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= -go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= -go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= -go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= +go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= +go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= -go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI= -go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= -go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= -go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= golang.org/x/arch v0.8.0 h1:3wRIsP3pM4yUptoR96otTUOXI367OS0+c9eeRi9doIc= golang.org/x/arch v0.8.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys= @@ -511,8 +523,9 @@ golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.3.0/go.mod h1:hebNnKkNXi2UzZN1eVRvBB7co0a+JxK6XbPiWVs/3J4= -golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI= -golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= +golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= +golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw= +golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -548,8 +561,9 @@ golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= -golang.org/x/mod v0.15.0 h1:SernR4v+D55NyBH2QiEQrlBAnj1ECL6AGrA5+dPaMY8= -golang.org/x/mod v0.15.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA= +golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -594,8 +608,11 @@ golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20211209124913-491a49abca63/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY= -golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= -golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= +golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -623,8 +640,9 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= -golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -684,15 +702,20 @@ golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= -golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM= +golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc= -golang.org/x/term v0.20.0 h1:VnkxpohqXaOBYJtBmEppKUG6mXpi+4O6purfc2+sMhw= -golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= +golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= +golang.org/x/term v0.23.0 h1:F6D4vR+EHoL9/sWAWgAR1H2DcHr4PareCbAaCo1RpuU= +golang.org/x/term v0.23.0/go.mod h1:DgV24QBUrK6jhZXl+20l6UWznPlwAHm1Q1mGHtydmSk= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -702,9 +725,13 @@ golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= -golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= +golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -762,8 +789,9 @@ golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4f golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= -golang.org/x/tools v0.18.0 h1:k8NLag8AGHnn+PHbl7g43CtqZAwG60vZkLqgyZgIHgQ= -golang.org/x/tools v0.18.0/go.mod h1:GL7B4CwcLLeo59yx/9UWWuNOW1n3VZ4f5axWfML7Lcg= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= +golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg= +golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/cloudevents-server/handlers.go b/cloudevents-server/handlers.go index f897a1d4..3ed70b45 100644 --- a/cloudevents-server/handlers.go +++ b/cloudevents-server/handlers.go @@ -29,13 +29,12 @@ func newEventsHandlerFunc(cfg *config.Config) gin.HandlerFunc { log.Fatal().Err(err).Msg("Failed to create protocol") } - handler, err := newCloudEventsHandler(cfg) + handler, err := handler.NewEventProducer(cfg.Kafka) if err != nil { - log.Fatal().Err(err).Msg("failed to create cloudevents handler") + log.Fatal().Err(err).Msg("failed to create broker handler") } - log.Debug().Any("types", handler.SupportEventTypes()).Msgf("registered event handlers") - h, err := cloudevents.NewHTTPReceiveHandler(nil, p, handler.Handle) + h, err := cloudevents.NewHTTPReceiveHandler(nil, p, handler.HandleCloudEvent) if err != nil { log.Fatal().Err(err).Msg("failed to create handler") } diff --git a/cloudevents-server/main.go b/cloudevents-server/main.go index e22bbc80..4e970943 100644 --- a/cloudevents-server/main.go +++ b/cloudevents-server/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "flag" "net/http" @@ -9,6 +10,7 @@ import ( "github.com/rs/zerolog/log" "github.com/PingCAP-QE/ee-apps/cloudevents-server/pkg/config" + "github.com/PingCAP-QE/ee-apps/cloudevents-server/pkg/events/handler" ) func main() { @@ -44,8 +46,21 @@ func main() { _ = r.SetTrustedProxies(nil) setRouters(r, cfg) - log.Info().Str("address", serveAddr).Msg("server started.") + hd, err := newCloudEventsHandler(cfg) + if err != nil { + log.Fatal().Err(err).Msg("failed to create cloudevents handler") + } + log.Debug().Any("types", hd.SupportEventTypes()).Msgf("registered event handlers") + + cg, err := handler.NewEventConsumerGroup(cfg.Kafka, hd) + if err != nil { + log.Fatal().Err(err).Msg("failed to create consumer group") + } + defer cg.Close() + go cg.Start(context.Background()) + + log.Info().Str("address", serveAddr).Msg("server started.") if err := http.ListenAndServe(serveAddr, r); err != nil { log.Fatal().Err(err).Send() } diff --git a/cloudevents-server/pkg/config/config.go b/cloudevents-server/pkg/config/config.go index 723dd9c8..dc940983 100644 --- a/cloudevents-server/pkg/config/config.go +++ b/cloudevents-server/pkg/config/config.go @@ -8,6 +8,8 @@ import ( "regexp" "gopkg.in/yaml.v3" + + "github.com/PingCAP-QE/ee-apps/cloudevents-server/pkg/kafka" ) type Store struct { @@ -50,14 +52,25 @@ type Tekton struct { FailedStepTailLines int `yaml:"failed_step_tail_lines,omitempty" json:"failed_step_tail_lines,omitempty"` } +type Kafka struct { + Brokers []string `yaml:"brokers,omitempty" json:"brokers,omitempty"` + ClientID string `yaml:"client_id,omitempty" json:"client_id,omitempty"` + Authentication kafka.Authentication `yaml:"authentication,omitempty" json:"authentication,omitempty"` + Producer kafka.Producer `yaml:"producer,omitempty" json:"producer,omitempty"` + Consumer kafka.Consumer `yaml:"consumer,omitempty" json:"consumer,omitempty"` +} + +type TiBuild struct { + ResultSinkURL string `yaml:"result_sink_url,omitempty" json:"result_sink_url,omitempty"` + TriggerSinkURL string `yaml:"trigger_sink_url,omitempty" json:"trigger_sink_url,omitempty"` +} + type Config struct { Store Store `yaml:"store,omitempty" json:"store,omitempty"` Lark LarkBotApp `yaml:"lark,omitempty" json:"lark,omitempty"` Tekton Tekton `yaml:"tekton,omitempty" json:"tekton,omitempty"` - TiBuild struct { - ResultSinkURL string `yaml:"result_sink_url,omitempty" json:"result_sink_url,omitempty"` - TriggerSinkURL string `yaml:"trigger_sink_url,omitempty" json:"trigger_sink_url,omitempty"` - } `yaml:"tibuild,omitempty" json:"tibuild,omitempty"` + TiBuild TiBuild `yaml:"tibuild,omitempty" json:"tibuild,omitempty"` + Kafka Kafka `yaml:"kafka,omitempty" json:"kafka,omitempty"` } func (c *Config) LoadFromFile(file string) error { diff --git a/cloudevents-server/pkg/events/handler/kafka.go b/cloudevents-server/pkg/events/handler/kafka.go new file mode 100644 index 00000000..7983a394 --- /dev/null +++ b/cloudevents-server/pkg/events/handler/kafka.go @@ -0,0 +1,172 @@ +package handler + +import ( + "context" + "os" + "os/signal" + "sync" + "syscall" + + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/rs/zerolog/log" + kafka "github.com/segmentio/kafka-go" + + "github.com/PingCAP-QE/ee-apps/cloudevents-server/pkg/config" + skakfa "github.com/PingCAP-QE/ee-apps/cloudevents-server/pkg/kafka" +) + +func NewEventProducer(cfg config.Kafka) (*EventProducer, error) { + writer, err := skakfa.NewWriter(cfg.Authentication, cfg.Brokers, "", cfg.ClientID) + if err != nil { + return nil, err + } + + return &EventProducer{ + writer: writer, + unknowEventTopic: cfg.Producer.DefaultTopic, + topicMapping: cfg.Producer.TopicMapping, + }, nil +} + +func NewEventConsumer(cfg config.Kafka, topic string, hander EventHandler) (*EventConsumer, error) { + reader, err := skakfa.NewReader(cfg.Authentication, cfg.Brokers, topic, cfg.Consumer.GroupID, cfg.ClientID) + if err != nil { + return nil, err + } + + return &EventConsumer{ + reader: reader, + handler: hander, + }, nil +} + +func NewEventConsumerGroup(cfg config.Kafka, hander EventHandler) (EventConsumerGroup, error) { + consumerGroup := make(EventConsumerGroup) + for _, topic := range cfg.Consumer.TopicMapping { + if consumerGroup[topic] != nil { + continue + } + consumer, err := NewEventConsumer(cfg, topic, hander) + if err != nil { + return nil, err + } + + consumerGroup[topic] = consumer + } + + return consumerGroup, nil +} + +// EventProducer is the main structure for our event broker +type EventProducer struct { + writer *kafka.Writer + unknowEventTopic string + topicMapping map[string]string // Map event type to Kafka topic +} + +func (eb *EventProducer) HandleCloudEvent(ctx context.Context, event cloudevents.Event) cloudevents.Result { + eventType := event.Type() + topic, ok := eb.topicMapping[eventType] + + // Use default topic if not found in mapping + if !ok { + log.Debug().Str("event-type", eventType).Msg("No topic found for event type, using default topic") + topic = eb.unknowEventTopic + } + + cloudEventBytes, err := event.MarshalJSON() + if err != nil { + log.Err(err).Msg("error marshalling Cloud Event") + return cloudevents.ResultNACK + } + + message := kafka.Message{ + Topic: topic, + Key: []byte(event.ID()), + Value: cloudEventBytes, + } + + err = eb.writer.WriteMessages(ctx, message) + if err != nil { + log.Err(err).Str("topic", topic).Str("ce-id", event.ID()).Msg("error writing message to Kafka") + return err + } + + log.Debug().Str("topic", topic).Str("ce-id", event.ID()).Msg("message written to Kafka") + return cloudevents.ResultACK +} + +type EventConsumerGroup map[string]*EventConsumer + +func (ecs EventConsumerGroup) Close() { + for _, ec := range ecs { + if ec != nil { + ec.Close() + } + } +} + +func (ecs EventConsumerGroup) Start(ctx context.Context) { + wg := new(sync.WaitGroup) + for _, ec := range ecs { + if ec != nil { + wg.Add(1) + go func(c *EventConsumer) { + c.Start(ctx) + wg.Done() + }(ec) + } + } + wg.Wait() +} + +type EventConsumer struct { + reader *kafka.Reader + writer *kafka.Writer // used for ack and put into dead letter queue. + handler EventHandler + faultTopic string // dead letter topic +} + +// consumer workers +func (ec *EventConsumer) Start(ctx context.Context) error { + sigterm := make(chan os.Signal, 1) + signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM) + + defer ec.Close() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-sigterm: + // When SIGTERM received, try to flush remaining messages + // and exit gracefully + return nil + default: + m, err := ec.reader.ReadMessage(ctx) + if err != nil { + return err + } + + var event cloudevents.Event + err = event.UnmarshalJSON(m.Value) + if err != nil { + return err + } + + result := ec.handler.Handle(event) + if !cloudevents.IsACK(result) { + ec.writer.WriteMessages(ctx, kafka.Message{Topic: ec.faultTopic, Key: m.Key, Value: m.Value}) + } + } + } +} + +func (ec *EventConsumer) Close() { + if ec.reader != nil { + ec.reader.Close() + } + if ec.writer != nil { + ec.writer.Close() + } +} diff --git a/cloudevents-server/pkg/kafka/auth.go b/cloudevents-server/pkg/kafka/auth.go new file mode 100644 index 00000000..220b0a82 --- /dev/null +++ b/cloudevents-server/pkg/kafka/auth.go @@ -0,0 +1,100 @@ +package kafka + +import ( + "crypto/tls" + "time" + + "github.com/segmentio/kafka-go" + "github.com/segmentio/kafka-go/sasl" + "github.com/segmentio/kafka-go/sasl/plain" + "github.com/segmentio/kafka-go/sasl/scram" +) + +type Authentication struct { + // Mechanism is the name of the SASL mechanism. + // Possible values: "", "PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512" (defaults to ""). + Mechanism string `yaml:"mechanism,omitempty" json:"mechanism,omitempty"` + // User is the authentication username for SASL/* authentication + User string `yaml:"user,omitempty" json:"user,omitempty"` + // User is the authentication password for SASL/* authentication + Password string `yaml:"password,omitempty" json:"password,omitempty"` +} + +// GetMechanism returns a SASL mechanism based on the provided mechanism type, username, and password. +// +// Parameters: +// - mechanismType: a string representing the type of SASL mechanism to use. +// - username: a string representing the username for authentication. +// - password: a string representing the password for authentication. +// +// Returns: +// - sasl.Mechanism: a SASL mechanism that can be used for authentication. +// - error: an error if the mechanism type is not supported or if there is an issue creating the mechanism. +func GetMechanism(auth Authentication) (sasl.Mechanism, error) { + switch auth.Mechanism { + case "PLAIN": + return plain.Mechanism{Username: auth.User, Password: auth.Password}, nil + case scram.SHA256.Name(): + return scram.Mechanism(scram.SHA256, auth.User, auth.Password) + case scram.SHA512.Name(): + return scram.Mechanism(scram.SHA512, auth.User, auth.Password) + } + + return nil, nil +} + +// NewDialer creates a new kafka dialer with the specified mechanism and client ID. +// +// The mechanism parameter specifies the SASL mechanism to use for authentication. +// The clientID parameter specifies the client ID to use for the dialer. +// Returns a pointer to a kafka.Dialer and an error. +func NewDialer(mechanism sasl.Mechanism, clientID string) (*kafka.Dialer, error) { + dialer := &kafka.Dialer{ + Timeout: 10 * time.Second, + DualStack: true, + SASLMechanism: mechanism, + ClientID: clientID, + } + if mechanism != nil && mechanism.Name() != "PLAIN" { + dialer.TLS = &tls.Config{MinVersion: tls.VersionTLS12} + } + + return dialer, nil +} + +// NewReaderConfig creates a new kafka reader configuration. +// +// Parameters: +// - brokers: a slice of strings representing the list of kafka brokers. +// - topics: a slice of strings representing the list of kafka topics. +// - dialer: a kafka.Dialer used for connection. +// +// Returns: +// - kafka.ReaderConfig: a kafka reader configuration. +func NewReaderConfig(brokers, topics []string, dialer *kafka.Dialer) kafka.ReaderConfig { + ret := kafka.ReaderConfig{ + Brokers: brokers, + Dialer: dialer, + } + + if len(topics) == 1 { + ret.Topic = topics[0] + } + if len(topics) > 1 { + ret.GroupTopics = topics + } + + return ret +} + +// NewWriterConfig returns a kafka.WriterConfig with the specified brokers, topic, and dialer. +// +// brokers is a list of Kafka brokers to connect to, topic is the Kafka topic to write to, and dialer is the dialer to use for connections. +// kafka.WriterConfig +func NewWriterConfig(brokers []string, topic string, dialer *kafka.Dialer) kafka.WriterConfig { + return kafka.WriterConfig{ + Brokers: brokers, + Dialer: dialer, + Topic: topic, + } +} diff --git a/cloudevents-server/pkg/kafka/consumer.go b/cloudevents-server/pkg/kafka/consumer.go new file mode 100644 index 00000000..c5abb711 --- /dev/null +++ b/cloudevents-server/pkg/kafka/consumer.go @@ -0,0 +1,26 @@ +package kafka + +import ( + "github.com/rs/zerolog/log" + kafka "github.com/segmentio/kafka-go" +) + +func NewReader(auth Authentication, brokers []string, topic, consumerGroupID, clientID string) (*kafka.Reader, error) { + mechanism, err := GetMechanism(auth) + if err != nil { + return nil, err + } + dialer, err := NewDialer(mechanism, clientID) + if err != nil { + return nil, err + } + + readerConfig := NewReaderConfig(brokers, []string{topic}, dialer) + readerConfig.GroupID = consumerGroupID + readerConfig.Logger = kafka.LoggerFunc(log.Printf) + readerConfig.ErrorLogger = kafka.LoggerFunc(func(msg string, keysAndValues ...interface{}) { + log.Error().Msgf(msg, keysAndValues...) + }) + + return kafka.NewReader(readerConfig), nil +} diff --git a/cloudevents-server/pkg/kafka/producer.go b/cloudevents-server/pkg/kafka/producer.go new file mode 100644 index 00000000..394447b1 --- /dev/null +++ b/cloudevents-server/pkg/kafka/producer.go @@ -0,0 +1,35 @@ +package kafka + +import ( + "github.com/rs/zerolog/log" + kafka "github.com/segmentio/kafka-go" +) + +func NewWriter(auth Authentication, brokers []string, topic, clientID string) (*kafka.Writer, error) { + mechanism, err := GetMechanism(auth) + if err != nil { + return nil, err + } + dialer, err := NewDialer(mechanism, clientID) + if err != nil { + return nil, err + } + + writeConfig := kafka.WriterConfig{ + Brokers: brokers, + Async: true, + Logger: kafka.LoggerFunc(log.Printf), + ErrorLogger: kafka.LoggerFunc(func(msg string, keysAndValues ...interface{}) { + log.Error().Msgf(msg, keysAndValues...) + }), + Balancer: &kafka.Hash{}, + Dialer: dialer, + Topic: topic, + } + + writer := kafka.NewWriter(writeConfig) + // create topic if not exist + writer.AllowAutoTopicCreation = true + + return writer, nil +} diff --git a/cloudevents-server/pkg/kafka/types.go b/cloudevents-server/pkg/kafka/types.go new file mode 100644 index 00000000..859a1833 --- /dev/null +++ b/cloudevents-server/pkg/kafka/types.go @@ -0,0 +1,11 @@ +package kafka + +type Producer struct { + TopicMapping map[string]string `yaml:"topic_mapping,omitempty" json:"topic_mapping,omitempty"` // event type to topic. + DefaultTopic string `yaml:"default_topic,omitempty" json:"default_topic,omitempty"` +} + +type Consumer struct { + GroupID string `yaml:"group_id,omitempty" json:"group_id,omitempty"` + TopicMapping map[string]string `yaml:"topic_mapping,omitempty" json:"topic_mapping,omitempty"` // event type to topic. +}