Skip to content

Commit

Permalink
Updated librdkafka
Browse files Browse the repository at this point in the history
  • Loading branch information
Silviu Caragea committed Jun 4, 2019
1 parent 973cc46 commit aaa9e2c
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 36 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
### Changelog:

##### v1.1.7

- Based on librdkafka v1.0.1
- Removed configs: `queuing_strategy`, `offset_store_method`, `reconnect_backoff_jitter_ms`
- Added new configs: `reconnect_backoff_ms`, `reconnect_backoff_max_ms`, `max_poll_interval_ms`, `enable_idempotence`, `enable_gapless_guarantee`
- `get_stats` decodes json to maps instead of proplists

##### v1.1.6

- Fixed memory leaks on the consumer
Expand Down
39 changes: 20 additions & 19 deletions CONFIGURATION.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion build_deps.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ fi

REPO=https://github.com/edenhill/librdkafka.git
BRANCH=master
REV=849c066b559950b02e37a69256f0cb7b04381d0e
REV=8681f884020e880a4c6cda3cfc672f0669e1f38e

function fail_check
{
Expand Down
13 changes: 6 additions & 7 deletions include/erlkaf.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,23 @@
-type partition_assignment_strategy() :: range |roundrobin.
-type compression_codec() :: none | gzip | snappy | lz4 | inherit.
-type offset_reset() :: smallest | earliest | beginning | largest | latest.
-type offset_store_method() :: file | broker.
-type ip_family() :: any| v4| v6.
-type security_protocol() :: plaintext | ssl | sasl_plaintext | sasl_ssl.
-type overflow_strategy() :: local_disk_queue | block_calling_process | drop_records.
-type queuing_strategy() :: fifo | lifo.
-type partitioner() :: random|consistent|consistent_random|murmur2|murmur2_random.
-type headers() :: undefined | proplists:proplist() | maps:map().

-type topic_option() ::
{request_required_acks, integer()} |
{request_timeout_ms, integer()} |
{message_timeout_ms, integer()} |
{queuing_strategy, queuing_strategy()} |
{compression_codec, compression_codec()} |
{compression_level, integer()} |
{auto_commit_interval_ms, integer()} |
{auto_offset_reset, offset_reset()} |
{partitioner, partitioner()} |
{offset_store_path, binary()} |
{offset_store_sync_interval_ms, integer()} |
{offset_store_method, offset_store_method()}.
{offset_store_sync_interval_ms, integer()}.

-type client_option() ::
{client_id, binary()} |
Expand All @@ -56,7 +52,8 @@
{socket_max_fails, integer()} |
{broker_address_ttl, integer()} |
{broker_address_family, ip_family()} |
{reconnect_backoff_jitter_ms, integer()} |
{reconnect_backoff_ms, integer()} |
{reconnect_backoff_max_ms, integer()} |
{statistics_interval_ms, integer()} |
{log_level, integer()} |
{log_connection_close, boolean()} |
Expand Down Expand Up @@ -86,6 +83,7 @@
{partition_assignment_strategy, partition_assignment_strategy()} |
{heartbeat_interval_ms, integer()} |
{coordinator_query_interval_ms, integer()} |
{max_poll_interval_ms, integer()} |
{auto_commit_interval_ms, integer()} |
{queued_min_messages, integer()} |
{queued_max_messages_kbytes, integer()} |
Expand All @@ -94,8 +92,9 @@
{fetch_max_bytes, integer()} |
{fetch_min_bytes, integer()} |
{fetch_error_backoff_ms, integer()} |
{offset_store_method, offset_store_method()} |
{check_crcs, boolean()} |
{enable_idempotence, boolean()} |
{enable_gapless_guarantee, boolean()} |
{queue_buffering_max_messages, integer()} |
{queue_buffering_max_kbytes, integer()} |
{queue_buffering_max_ms, integer()} |
Expand Down
2 changes: 1 addition & 1 deletion src/erlkaf.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
{description, "erlkaf - Erlang Kafka library based on librdkafka"},
{licenses, ["MIT"]},
{links,[{"Github","https://github.com/silviucpp/erlkaf"}]},
{vsn, "1.1.6"},
{vsn, "1.1.7"},
{registered, []},
{applications, [
kernel,
Expand Down
18 changes: 10 additions & 8 deletions src/erlkaf_config.erl
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ to_librdkafka_topic_config(request_timeout_ms, V) ->
{<<"request.timeout.ms">>, erlkaf_utils:to_binary(V)};
to_librdkafka_topic_config(message_timeout_ms, V) ->
{<<"message.timeout.ms">>, erlkaf_utils:to_binary(V)};
to_librdkafka_topic_config(queuing_strategy, V) ->
{<<"queuing.strategy">>, erlkaf_utils:to_binary(V)};
to_librdkafka_topic_config(partitioner, V) ->
{<<"partitioner">>, erlkaf_utils:to_binary(V)};
to_librdkafka_topic_config(compression_codec, V) ->
Expand All @@ -80,8 +78,6 @@ to_librdkafka_topic_config(offset_store_path, V) ->
{<<"offset.store.path">>, erlkaf_utils:to_binary(V)};
to_librdkafka_topic_config(offset_store_sync_interval_ms, V) ->
{<<"offset.store.sync.interval.ms">>, erlkaf_utils:to_binary(V)};
to_librdkafka_topic_config(offset_store_method, V) ->
{<<"offset.store.method">>, erlkaf_utils:to_binary(V)};
to_librdkafka_topic_config(K, V) ->
throw({error, {options, {K, V}}}).

Expand Down Expand Up @@ -147,8 +143,10 @@ to_librdkafka_config(broker_address_ttl, V) ->
{<<"broker.address.ttl">>, erlkaf_utils:to_binary(V)};
to_librdkafka_config(broker_address_family, V) ->
{<<"broker.address.family">>, erlkaf_utils:to_binary(V)};
to_librdkafka_config(reconnect_backoff_jitter_ms, V) ->
{<<"reconnect.backoff.jitter.ms">>, erlkaf_utils:to_binary(V)};
to_librdkafka_config(reconnect_backoff_ms, V) ->
{<<"reconnect.backoff.ms">>, erlkaf_utils:to_binary(V)};
to_librdkafka_config(reconnect_backoff_max_ms, V) ->
{<<"reconnect.backoff.max.ms">>, erlkaf_utils:to_binary(V)};
to_librdkafka_config(statistics_interval_ms, V) ->
{<<"statistics.interval.ms">>, erlkaf_utils:to_binary(V)};
to_librdkafka_config(log_level, V) ->
Expand Down Expand Up @@ -207,6 +205,8 @@ to_librdkafka_config(heartbeat_interval_ms, V) ->
{<<"heartbeat.interval.ms">>, erlkaf_utils:to_binary(V)};
to_librdkafka_config(coordinator_query_interval_ms, V) ->
{<<"coordinator.query.interval.ms">>, erlkaf_utils:to_binary(V)};
to_librdkafka_config(max_poll_interval_ms, V) ->
{<<"max.poll.interval.ms">>, erlkaf_utils:to_binary(V)};
to_librdkafka_config(auto_commit_interval_ms, V) ->
{<<"auto.commit.interval.ms">>, erlkaf_utils:to_binary(V)};
to_librdkafka_config(queued_min_messages, V) ->
Expand All @@ -223,10 +223,12 @@ to_librdkafka_config(fetch_min_bytes, V) ->
{<<"fetch.min.bytes">>, erlkaf_utils:to_binary(V)};
to_librdkafka_config(fetch_error_backoff_ms, V) ->
{<<"fetch.error.backoff.ms">>, erlkaf_utils:to_binary(V)};
to_librdkafka_config(offset_store_method, V) ->
{<<"offset.store.method">>, erlkaf_utils:to_binary(V)};
to_librdkafka_config(check_crcs, V) ->
{<<"check.crcs">>, erlkaf_utils:to_binary(V)};
to_librdkafka_config(enable_idempotence, V) ->
{<<"enable.idempotence">>, erlkaf_utils:to_binary(V)};
to_librdkafka_config(enable_gapless_guarantee, V) ->
{<<"enable.gapless.guarantee">>, erlkaf_utils:to_binary(V)};
to_librdkafka_config(queue_buffering_max_messages, V) ->
{<<"queue.buffering.max.messages">>, erlkaf_utils:to_binary(V)};
to_librdkafka_config(queue_buffering_max_kbytes, V) ->
Expand Down

0 comments on commit aaa9e2c

Please sign in to comment.