Skip to content

Commit

Permalink
Fix examples
Browse files Browse the repository at this point in the history
  • Loading branch information
Silviu Caragea committed May 6, 2020
1 parent 3b6a402 commit 236afa3
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 19 deletions.
6 changes: 3 additions & 3 deletions README.MD
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ Example of a configuration file (for `sys.config`):
{erlkaf, [

{global_client_options, [
{bootstrap_servers, "broker1.com:9092,broker2.com:9092"},
{bootstrap_servers, <<"broker1.com:9092,broker2.com:9092">>},
]},

{clients, [
Expand Down Expand Up @@ -136,7 +136,7 @@ create_producer() ->
erlkaf:start(),

ProducerConfig = [
{bootstrap_servers, "broker1:9092"},
{bootstrap_servers, <<"broker1:9092">>},
{delivery_report_only_error, false},
{delivery_report_callback, ?MODULE}
],
Expand Down Expand Up @@ -209,7 +209,7 @@ create_consumer() ->
erlkaf:start(),
ClientId = client_consumer,
GroupId = <<"erlkaf_consumer">>,
ClientCfg = [{bootstrap_servers, "broker1:9092"}],
ClientCfg = [{bootstrap_servers, <<"broker1:9092">>}],
TopicConf = [{auto_offset_reset, smallest}],
ok = erlkaf:create_consumer_group(ClientId, GroupId, ?TOPICS, ClientCfg, TopicConf).

Expand Down
8 changes: 7 additions & 1 deletion test/benchmark_consumer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,13 @@ start(erlkaf) ->
ConsumerConfig = [{bootstrap_servers, <<?BROKER_HOST/binary, ":", (integer_to_binary(?BROKER_PORT))/binary>>}],
TopicConf = [{auto_offset_reset, smallest}],
GroupId = <<"erlkaf_consumer_benchmark">>,
ok = erlkaf:create_consumer_group(client_consumer, GroupId, [?TOPIC], ConsumerConfig, TopicConf, ?MODULE, []);
ConsumerTopics = [
{?TOPIC, [
{callback_module, ?MODULE}
]}
],

ok = erlkaf:create_consumer_group(client_consumer, GroupId, ConsumerTopics, ConsumerConfig, TopicConf);
_ ->
ok
end;
Expand Down
20 changes: 11 additions & 9 deletions test/sys.config
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

{erlkaf, [
{global_client_options, [
{bootstrap_servers, "172.17.33.123:9092,172.17.33.124:9092,172.17.33.125:9092"},
{bootstrap_servers, <<"172.17.33.123:9092,172.17.33.124:9092,172.17.33.125:9092">>},
{socket_keepalive_enable, true},
{queue_buffering_overflow_strategy, local_disk_queue}
]},
Expand All @@ -51,17 +51,19 @@
{type, consumer},

{group_id, <<"erlkaf_consumer">>},
{callback_module, test_consumer},
{callback_args, []},
{topics, [<<"benchmark">>]},
{topic_options, [
{auto_offset_reset, smallest},
{dispatch_mode, one_by_one}
{topics, [
{<<"benchmark">>, [
{callback_module, test_consumer},
{callback_args, []},
{dispatch_mode, one_by_one}
]}
]},

{client_options, [
{topic_options, [
{auto_offset_reset, smallest}
]},

]}
{client_options, []}
]}
]}
]}
Expand Down
7 changes: 6 additions & 1 deletion test/test_consume_produce.erl
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,13 @@ create(BootstrapServers) ->
GroupId = atom_to_binary(?MODULE, latin1),
ConsumerConfig = append_bootstrap(BootstrapServers, []),
TopicConf = [{auto_offset_reset, smallest}],
ConsumerTopics = [
{?TOPIC, [
{callback_module, ?MODULE}
]}
],

ok = erlkaf:create_consumer_group(client_consumer, GroupId, [?TOPIC], ConsumerConfig, TopicConf, ?MODULE, []),
ok = erlkaf:create_consumer_group(client_consumer, GroupId, ConsumerTopics, ConsumerConfig, TopicConf),

% create producer

Expand Down
13 changes: 9 additions & 4 deletions test/test_consumer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@

-include("erlkaf.hrl").

-define(TOPICS, [<<"benchmark">>]).
-define(TOPICS, [
{<<"benchmark">>, [
{callback_module, ?MODULE},
{callback_args, []}, % default [] (you can skip it)
{dispatch_mode, one_by_one} % default one_by_one (you can skip it)
]}
]).

-export([
create_consumer/0,
Expand All @@ -20,14 +26,13 @@ create_consumer() ->
GroupId = <<"erlkaf_consumer">>,

ClientConfig = [
{bootstrap_servers, "172.17.3.163:9092"}
{bootstrap_servers, <<"172.17.33.123:9092">>}
],

TopicConf = [
{auto_offset_reset, smallest}
],

ok = erlkaf:create_consumer_group(client_consumer, GroupId, ?TOPICS, ClientConfig, TopicConf, ?MODULE, []).
ok = erlkaf:create_consumer_group(client_consumer, GroupId, ?TOPICS, ClientConfig, TopicConf).

init(Topic, Partition, Offset, Args) ->
io:format("init topic: ~p partition: ~p offset: ~p args: ~p ~n", [
Expand Down
2 changes: 1 addition & 1 deletion test/test_producer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ create_producer() ->
erlkaf:start(),

ProducerConfig = [
{bootstrap_servers, "172.17.3.163:9092"},
{bootstrap_servers, <<"172.17.33.123:9092">>},
{delivery_report_only_error, true},
{delivery_report_callback, ?MODULE}
],
Expand Down

0 comments on commit 236afa3

Please sign in to comment.