From aaa9e2c57096dd79170736048bb9ebce9f7cf8c7 Mon Sep 17 00:00:00 2001
From: Silviu Caragea
Date: Tue, 4 Jun 2019 12:29:04 +0300
Subject: [PATCH] Updated librdkafka

---
 CHANGELOG.md          |  7 +++++++
 CONFIGURATION.md      | 39 ++++++++++++++++++++-------------------
 build_deps.sh         |  2 +-
 include/erlkaf.hrl    | 13 ++++++-------
 src/erlkaf.app.src    |  2 +-
 src/erlkaf_config.erl | 18 ++++++++++--------
 6 files changed, 45 insertions(+), 36 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 90c4970..65a2f2a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,12 @@
 ### Changelog:
 
+##### v1.1.7
+
+- Based on librdkafka v1.0.1
+- Removed configs: `queuing_strategy`, `offset_store_method`, `reconnect_backoff_jitter_ms`
+- Added new configs: `reconnect_backoff_ms`, `reconnect_backoff_max_ms`, `max_poll_interval_ms`, `enable_idempotence`, `enable_gapless_guarantee`
+- `get_stats` decodes JSON to maps instead of proplists
+
 ##### v1.1.6
 
 - Fixed memory leaks on the consumer
diff --git a/CONFIGURATION.md b/CONFIGURATION.md
index 3e4e359..ac7d9c4 100644
--- a/CONFIGURATION.md
+++ b/CONFIGURATION.md
@@ -4,16 +4,16 @@ C/P legend: C = Consumer, P = Producer, * = both
 
 Property                                 | C/P | Range              | Default       | Description
 -----------------------------------------|-----|--------------------|--------------:|--------------------------
-debug | * | generic, broker, topic, metadata, feature, queue, msg, protocol, cgrp, security, fetch, interceptor, plugin, consumer, admin, all | undefined | A comma-separated list of debug contexts to enable. Detailed Producer debugging: broker,topic,msg. Consumer: consumer,cgrp,topic,fetch
+debug | * | generic, broker, topic, metadata, feature, queue, msg, protocol, cgrp, security, fetch, interceptor, plugin, consumer, admin, eos, all | undefined | A comma-separated list of debug contexts to enable. Detailed Producer debugging: broker,topic,msg. Consumer: consumer,cgrp,topic,fetch
 client_id | * | | rdkafka | Client identifier
 bootstrap_servers | * | | | Initial list of brokers host:port separated by comma
 message_max_bytes | * | 1000 .. 1000000000 | 1000000 | Maximum transmit message size.
 message_copy_max_bytes | * | 0 .. 1000000000 | 65535 | Maximum size for message to be copied to buffer. Messages larger than this will be passed by reference (zero-copy) at the expense of larger iovecs
-receive_message_max_bytes | * | 1000 .. 2147483647 | 100000000 | Maximum receive message size. This serves as a safety precaution to avoid memory exhaustion in case of protocol hickups. The value should be at least `fetch_message_max_bytes` * number of partitions consumed from + messaging overhead (e.g. 200000 bytes)
-max_in_flight | * | 1 .. 1000000 | 1000000 | Maximum number of in-flight requests the client will send. This setting applies per broker connection
+receive_message_max_bytes | * | 1000 .. 2147483647 | 100000000 | Maximum Kafka protocol response message size. This serves as a safety precaution to avoid memory exhaustion in case of protocol hiccups. This value must be at least `fetch_max_bytes` + 512 to allow for protocol overhead; the value is adjusted automatically unless the configuration property is explicitly set.
+max_in_flight | * | 1 .. 1000000 | 1000000 | Maximum number of in-flight requests per broker connection. This is a generic property applied to all broker communication; however, it is primarily relevant to produce requests. In particular, note that other mechanisms limit the number of outstanding consumer fetch requests per broker to one.
 metadata_request_timeout_ms | * | 10 .. 900000 | 60000 | Non-topic request timeout in milliseconds. This is for metadata requests
 topic_metadata_refresh_interval_ms | * | -1 .. 3600000 | 300000 | Topic metadata refresh interval in milliseconds. The metadata is automatically refreshed on error and connect. Use -1 to disable the intervalled refresh
-metadata_max_age_ms | * | 1 .. 86400000 | -1 | Metadata cache max age. Defaults to `topic_metadata_refresh_interval_ms` * 3
+metadata_max_age_ms | * | 1 .. 86400000 | 900000 | Metadata cache max age. Defaults to `topic_metadata_refresh_interval_ms` * 3
 topic_metadata_refresh_fast_interval_ms | * | 1 .. 60000 | 250 | When a topic loses its leader a new metadata request will be enqueued with this initial interval, exponentially increasing until the topic metadata has been refreshed. This is used to recover quickly from transitioning leader brokers
 topic_metadata_refresh_sparse | * | true, false | true | Sparse metadata requests (consumes less network bandwidth)
 topic_blacklist | * | | | Topic blacklist, a comma-separated list of regular expressions for matching topic names that should be ignored in broker metadata information as if the topics did not exist
@@ -25,14 +25,15 @@ socket_nagle_disable | * | true, false | fa
 socket_max_fails | * | 0 .. 1000000 | 1 | Disconnect from broker when this number of send failures (e.g., timed out requests) is reached. Disable with 0. NOTE: The connection is automatically re-established
 broker_address_ttl | * | 0 .. 86400000 | 1000 | How long to cache the broker address resolving results (milliseconds)
 broker_address_family | * | any, v4, v6 | any | Allowed broker IP address families: any, v4, v6
-reconnect_backoff_jitter_ms | * | 0 .. 3600000 | 500 | Throttle broker reconnection attempts by this value +-50%
+reconnect_backoff_ms | * | 0 .. 3600000 | 100 | The initial time to wait before reconnecting to a broker after the connection has been closed. The time is increased exponentially until `reconnect_backoff_max_ms` is reached. -25% to +50% jitter is applied to each reconnect backoff. A value of 0 disables the backoff and reconnects immediately.
+reconnect_backoff_max_ms | * | 0 .. 3600000 | 10000 | The maximum time to wait before reconnecting to a broker after the connection has been closed.
 statistics_interval_ms | * | 0 .. 86400000 | 0 | Statistics emit interval. The application also needs to register a stats callback using `stats_callback` config. The granularity is 1000ms. A value of 0 disables statistics.
 stats_callback | * | module or fun/2 | undefined| A callback where stats are sent
 log_level | * | 0 .. 7 | 6 | Logging level (syslog(3) levels)
 log_connection_close | * | true, false | true | Log broker disconnects. It might be useful to turn this off when interacting with 0.9 brokers with an aggressive `connection_max_idle_ms` value.
-api_version_request | * | true, false | false | Request broker's supported API versions to adjust functionality to available protocol features. If set to false, or the ApiVersionRequest fails, the fallback version `broker_version_fallback` will be used. **NOTE**: Depends on broker version >=0.10.0. If the request is not supported by (an older) broker the `broker_version_fallback` fallback is used
-api_version_fallback_ms | * | 0 .. 604800000 | 1200000 | Dictates how long the `broker_version_fallback` fallback is used in the case the ApiVersionRequest fails. **NOTE**: The ApiVersionRequest is only issued when a new connection to the broker is made (such as after an upgrade)
-broker_version_fallback | * | | 0.9.0 | Older broker versions (<0.10.0) provides no way for a client to query for supported protocol features (ApiVersionRequest, see `api_version_request`) making it impossible for the client to know what features it may use. As a workaround a user may set this property to the expected broker version and the client will automatically adjust its feature set accordingly if the ApiVersionRequest fails (or is disabled). The fallback broker version will be used for `api_version_fallback_ms`. Valid values are: 0.9.0, 0.8.2, 0.8.1, 0.8.0.
+api_version_request | * | true, false | false | Request broker's supported API versions to adjust functionality to available protocol features. If set to false, or the ApiVersionRequest fails, the fallback version `broker_version_fallback` will be used. **NOTE**: Depends on broker version >=0.10.0. If the request is not supported by (an older) broker the `broker_version_fallback` fallback is used
+api_version_fallback_ms | * | 0 .. 604800000 | 0 | Dictates how long the `broker_version_fallback` fallback is used in the case the ApiVersionRequest fails. **NOTE**: The ApiVersionRequest is only issued when a new connection to the broker is made (such as after an upgrade)
+broker_version_fallback | * | | 0.10.0 | Older broker versions (<0.10.0) provide no way for a client to query for supported protocol features (ApiVersionRequest, see `api_version_request`) making it impossible for the client to know what features it may use. As a workaround a user may set this property to the expected broker version and the client will automatically adjust its feature set accordingly if the ApiVersionRequest fails (or is disabled). The fallback broker version will be used for `api_version_fallback_ms`. Valid values are: 0.9.0, 0.8.2, 0.8.1, 0.8.0.
 security.protocol | * | plaintext, ssl, sasl_plaintext, sasl_ssl | plaintext | Protocol used to communicate with brokers
 ssl_cipher_suites | * | | | A cipher suite is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. See manual page for `ciphers(1)` and `SSL_CTX_set_cipher_list(3)
 ssl_curves_list | * | | | The supported-curves extension in the TLS ClientHello message specifies the curves (standard/named, or 'explicit' GF(2^k) or GF(p)) the client is willing to have the server use. See manual page for `SSL_CTX_set1_curves_list(3)`. OpenSSL >= 1.0.2 required.
@@ -52,10 +53,11 @@ sasl_kerberos_keytab | * | |
 sasl_kerberos_min_time_before_relogin | * | 1 .. 86400000 | 60000 | Minimum time in milliseconds between key refresh attempts.
 sasl_username | * | | | SASL username for use with the PLAIN and SASL-SCRAM-.. mechanisms
 sasl_password | * | | | SASL password for use with the PLAIN and SASL-SCRAM-.. mechanism
-partition_assignment_strategy | * | | range, roundrobin | Name of partition assignment strategy to use when elected group leader assigns partitions to group members
-session_timeout_ms | * | 1 .. 3600000 | 30000 | Client group session and failure detection timeout
-heartbeat_interval_ms | * | 1 .. 3600000 | 1000 | Group session keepalive heartbeat interval
-coordinator_query_interval_ms | * | 1 .. 3600000 | 600000 | How often to query for the current client group coordinator. If the currently assigned coordinator is down the configured query interval will be divided by ten to more quickly recover in case of coordinator reassignment
+partition_assignment_strategy | C | | range, roundrobin | Name of partition assignment strategy to use when elected group leader assigns partitions to group members
+session_timeout_ms | C | 1 .. 3600000 | 30000 | Client group session and failure detection timeout
+heartbeat_interval_ms | C | 1 .. 3600000 | 1000 | Group session keepalive heartbeat interval
+coordinator_query_interval_ms | C | 1 .. 3600000 | 600000 | How often to query for the current client group coordinator. If the currently assigned coordinator is down the configured query interval will be divided by ten to more quickly recover in case of coordinator reassignment
+max_poll_interval_ms | C | 1 .. 86400000 | 300000 | Maximum allowed time between calls to consume messages (e.g., rd_kafka_consumer_poll()) for high-level consumers. If this interval is exceeded the consumer is considered failed and the group will rebalance in order to reassign the partitions to another consumer group member. Warning: Offset commits may not be possible at this point. The interval is checked two times per second. See KIP-62 for more information.
 auto_commit_interval_ms | C | 0 .. 86400000 | 5000 | The frequency in milliseconds that the consumer offsets are committed (written) to offset storage. (0 = disable). This setting is used by the high-level consumer
 queued_min_messages | C | 1 .. 10000000 | 100000 | Minimum number of messages per topic+partition in the local consumer queue
 queued_max_messages_kbytes | C | 1 .. 1000000000 | 1048576 | Maximum number of kilobytes per topic+partition in the local consumer queue. This value may be overshot by `fetch_message_max_bytes`
@@ -64,15 +66,16 @@ fetch_message_max_bytes | C | 1 .. 1000000000 | 1048576
 fetch_max_bytes | C | 0 .. 2147483135 | 52428800 | Maximum amount of data the broker shall return for a Fetch request. Messages are fetched in batches by the consumer and if the first message batch in the first non-empty partition of the Fetch request is larger than this value, then the message batch will still be returned to ensure the consumer can make progress. The maximum message batch size accepted by the broker is defined via `message_max_bytes` (broker config) or `max_message_bytes` (broker topic config). `fetch_max_bytes` is automatically adjusted upwards to be at least `message_max_bytes` (consumer config).
 fetch_min_bytes | C | 1 .. 100000000 | 1 | Minimum number of bytes the broker responds with. If fetch.wait.max.ms expires the accumulated data will be sent to the client regardless of this setting
 fetch_error_backoff_ms | C | 0 .. 300000 | 500 | How long to postpone the next fetch request for a topic+partition in case of a fetch error
-offset_store_method | C | none, file, broker | broker | Offset commit store method: 'file' - local file store (offset.store.path, et.al), 'broker' - broker commit store (requires Apache Kafka 0.8.2 or later on the broker)
 check_crcs | C | true, false | false | Verify CRC32 of consumed messages, ensuring no on-the-wire or on-disk corruption to the messages occurred. This check comes at slightly increased CPU usage
-queue_buffering_max_messages | P | 1 .. 10000000 | 100000 | Maximum number of messages allowed on the producer queue
+enable_idempotence | P | true, false | false | When set to `true`, the producer will ensure that messages are successfully produced exactly once and in the original produce order. The following configuration properties are adjusted automatically (if not modified by the user) when idempotence is enabled: `max_in_flight_requests_per_connection=5` (must be less than or equal to 5), `retries=INT32_MAX` (must be greater than 0), `acks=all`, `queuing_strategy=fifo`. Producer instantiation will fail if user-supplied configuration is incompatible.
+enable_gapless_guarantee | P | true, false | false | When set to `true`, any error that could result in a gap in the produced message series when a batch of messages fails will raise a fatal error (ERR__GAPLESS_GUARANTEE) and stop the producer. Requires `enable_idempotence=true`.
+queue_buffering_max_messages | P | 1 .. 10000000 | 100000 | Maximum number of messages allowed on the producer queue. This queue is shared by all topics and partitions.
 queue_buffering_max_kbytes | P | 1 .. 2097151 | 1048576 | Maximum total message size sum allowed on the producer queue. This queue is shared by all topics and partitions. This property has higher priority than `queue_buffering_max_messages`.
 queue_buffering_max_ms | P | 0 .. 900000 | 0 | Maximum time, in milliseconds, for buffering data on the producer queue
 queue_buffering_overflow_strategy | P | local_disk_queue, block_calling_process, drop_records | local_disk_queue| What strategy to pick in case the memory queue is full: write messages on a local disk queue and send them in kafka when the in memory queue have enough space, block calling process until memory queue has enough space or drop the messages
 message_send_max_retries | P | 0 .. 10000000 | 2 | How many times to retry sending a failing MessageSet. **Note:** retrying may cause reordering
 retry_backoff_ms | P | 1 .. 300000 | 100 | The backoff time in milliseconds before retrying a message send
-compression_codec | P | none, gzip, snappy, lz4 | none | compression codec to use for compressing message sets. This is the default value for all topics, may be overriden by the topic configuration property `compression_codec`
+compression_codec | P | none, gzip, snappy, lz4, zstd | none | Compression codec to use for compressing message sets. This is the default value for all topics, may be overridden by the topic configuration property `compression_codec`
 batch_num_messages | P | 1 .. 1000000 | 10000 | Maximum number of messages batched in one MessageSet. The total MessageSet size is also limited by `message_max_bytes`
 delivery_report_only_error | P | true, false | false | Only provide delivery reports for failed messages
 delivery_report_callback | P | module or fun/2 | undefined| A callback where delivery reports are sent (`erlkaf_producer_callbacks` behaviour)
@@ -83,16 +86,14 @@ C/P legend: C = Consumer, P = Producer, * = both
 
 Property                                 | C/P | Range           | Default       | Description
 -----------------------------------------|-----|-----------------|--------------:|--------------------------
-request_required_acks | P | -1 .. 1000 | 1 | This field indicates how many acknowledgements the leader broker must receive from ISR brokers before responding to the request: *0*=Broker does not send any response/ack to client, *1*=Only the leader broker will need to ack the message, *-1* or *all*=broker will block until message is committed by all in sync replicas (ISRs) or broker's `in.sync.replicas` setting before sending response.
+request_required_acks | P | -1 .. 1000 | -1 | This field indicates how many acknowledgements the leader broker must receive from ISR brokers before responding to the request: *0*=Broker does not send any response/ack to client, *1*=Only the leader broker will need to ack the message, *-1* or *all*=broker will block until message is committed by all in sync replicas (ISRs) or broker's `in.sync.replicas` setting before sending response.
 request_timeout_ms | P | 1 .. 900000 | 5000 | The ack timeout of the producer request in milliseconds. This value is only enforced by the broker and relies on `request_required_acks` being != 0
 message_timeout_ms | P | 0 .. 900000 | 300000 | Local message timeout. This value is only enforced locally and limits the time a produced message waits for successful delivery. A time of 0 is infinite
-queuing_strategy | P | fifo, lifo | fifo | Producer queuing strategy. FIFO preserves produce ordering, while LIFO prioritizes new messages. WARNING: `lifo` is experimental and subject to change or removal.
-compression_codec | P | none, gzip, snappy, lz4, inherit | inherit | Compression codec to use for compressing message sets
+compression_codec | P | none, gzip, snappy, lz4, inherit, zstd | inherit | Compression codec to use for compressing message sets
 compression_level | P | -1 .. 12 | -1 | Compression level parameter for algorithm selected by configuration property `compression_codec`. Higher values will result in better compression at the cost of more CPU usage. Usable range is algorithm-dependent: [0-9] for gzip; [0-12] for lz4; only 0 for snappy; -1 = codec-dependent default compression level.
 auto_commit_interval_ms | C | 10 .. 86400000 | 60000 | The frequency in milliseconds that the consumer offsets are committed (written) to offset storage. This setting is used by the low-level legacy consumer
 auto_offset_reset | C | smallest, earliest, beginning, largest, latest, end, error | largest | Action to take when there is no initial offset in offset store or the desired offset is out of range: 'smallest','earliest' - automatically reset the offset to the smallest offset, 'largest','latest' - automatically reset the offset to the largest offset, 'error' - trigger an error.
 partitioner | P | random,consistent,consistent_random, murmur2, murmur2_random | consistent_random | Partitioner: `random` - random distribution, `consistent` - CRC32 hash of key (Empty and NULL keys are mapped to single partition), `consistent_random` - CRC32 hash of key (Empty and NULL keys are randomly partitioned), `murmur2` - Java Producer compatible Murmur2 hash of key (NULL keys are mapped to single partition), `murmur2_random` - Java Producer compatible Murmur2 hash of key (NULL keys are randomly partitioned. This is functionally equivalent to the default partitioner in the Java Producer.).
 offset_store_path | C | | . | Path to local file for storing offsets. If the path is a directory a filename will be automatically generated in that directory based on the topic and partition
 offset_store_sync_interval_ms | C | -1 .. 86400000 | -1 | fsync() interval for the offset file, in milliseconds. Use -1 to disable syncing, and 0 for immediate sync after each write
-offset_store_method | C | file, broker | broker | Offset commit store method: 'file' - local file store (offset.store.path, et.al), 'broker' - broker commit store (requires "group.id" to be configured and Apache Kafka 0.8.2 or later on the broker.)
 dispatch_mode | C | one_by_one, {batch, integer()} | one_by_one | This field indicates how messages are dispatched to the message handler. One message per callback call (default) or multiple messages up to the specified size (batch). If the batch approach is used the consumer will mark as processed the last offset form the batch.
diff --git a/build_deps.sh b/build_deps.sh
index 4579f14..f11ab75 100755
--- a/build_deps.sh
+++ b/build_deps.sh
@@ -10,7 +10,7 @@ fi
 
 REPO=https://github.com/edenhill/librdkafka.git
 BRANCH=master
-REV=849c066b559950b02e37a69256f0cb7b04381d0e
+REV=8681f884020e880a4c6cda3cfc672f0669e1f38e
 
 function fail_check
 {
diff --git a/include/erlkaf.hrl b/include/erlkaf.hrl
index 0747262..d4b4ba5 100644
--- a/include/erlkaf.hrl
+++ b/include/erlkaf.hrl
@@ -13,11 +13,9 @@
 -type partition_assignment_strategy() :: range |roundrobin.
 -type compression_codec() :: none | gzip | snappy | lz4 | inherit.
 -type offset_reset() :: smallest | earliest | beginning | largest | latest.
--type offset_store_method() :: file | broker.
 -type ip_family() :: any| v4| v6.
 -type security_protocol() :: plaintext | ssl | sasl_plaintext | sasl_ssl.
 -type overflow_strategy() :: local_disk_queue | block_calling_process | drop_records.
--type queuing_strategy() :: fifo | lifo.
 -type partitioner() :: random|consistent|consistent_random|murmur2|murmur2_random.
 
 -type headers() :: undefined | proplists:proplist() | maps:map().
@@ -25,15 +23,13 @@
     {request_required_acks, integer()} |
     {request_timeout_ms, integer()} |
     {message_timeout_ms, integer()} |
-    {queuing_strategy, queuing_strategy()} |
     {compression_codec, compression_codec()} |
     {compression_level, integer()} |
     {auto_commit_interval_ms, integer()} |
     {auto_offset_reset, offset_reset()} |
     {partitioner, partitioner()} |
     {offset_store_path, binary()} |
-    {offset_store_sync_interval_ms, integer()} |
-    {offset_store_method, offset_store_method()}.
+    {offset_store_sync_interval_ms, integer()}.
 
 -type client_option() ::
     {client_id, binary()} |
@@ -56,7 +52,8 @@
     {socket_max_fails, integer()} |
     {broker_address_ttl, integer()} |
     {broker_address_family, ip_family()} |
-    {reconnect_backoff_jitter_ms, integer()} |
+    {reconnect_backoff_ms, integer()} |
+    {reconnect_backoff_max_ms, integer()} |
     {statistics_interval_ms, integer()} |
     {log_level, integer()} |
     {log_connection_close, boolean()} |
@@ -86,6 +83,7 @@
     {partition_assignment_strategy, partition_assignment_strategy()} |
     {heartbeat_interval_ms, integer()} |
     {coordinator_query_interval_ms, integer()} |
+    {max_poll_interval_ms, integer()} |
     {auto_commit_interval_ms, integer()} |
     {queued_min_messages, integer()} |
     {queued_max_messages_kbytes, integer()} |
@@ -94,8 +92,9 @@
     {fetch_max_bytes, integer()} |
     {fetch_min_bytes, integer()} |
     {fetch_error_backoff_ms, integer()} |
-    {offset_store_method, offset_store_method()} |
     {check_crcs, boolean()} |
+    {enable_idempotence, boolean()} |
+    {enable_gapless_guarantee, boolean()} |
     {queue_buffering_max_messages, integer()} |
     {queue_buffering_max_kbytes, integer()} |
     {queue_buffering_max_ms, integer()} |
diff --git a/src/erlkaf.app.src b/src/erlkaf.app.src
index 117c773..2f5d8eb 100644
--- a/src/erlkaf.app.src
+++ b/src/erlkaf.app.src
@@ -2,7 +2,7 @@
     {description, "erlkaf - Erlang Kafka library based on librdkafka"},
     {licenses, ["MIT"]},
     {links,[{"Github","https://github.com/silviucpp/erlkaf"}]},
-    {vsn, "1.1.6"},
+    {vsn, "1.1.7"},
     {registered, []},
     {applications, [
         kernel,
diff --git a/src/erlkaf_config.erl b/src/erlkaf_config.erl
index 5cb14ca..81ecce0 100644
--- a/src/erlkaf_config.erl
+++ b/src/erlkaf_config.erl
@@ -64,8 +64,6 @@ to_librdkafka_topic_config(request_timeout_ms, V) ->
     {<<"request.timeout.ms">>, erlkaf_utils:to_binary(V)};
 to_librdkafka_topic_config(message_timeout_ms, V) ->
     {<<"message.timeout.ms">>, erlkaf_utils:to_binary(V)};
-to_librdkafka_topic_config(queuing_strategy, V) ->
-    {<<"queuing.strategy">>, erlkaf_utils:to_binary(V)};
 to_librdkafka_topic_config(partitioner, V) ->
     {<<"partitioner">>, erlkaf_utils:to_binary(V)};
 to_librdkafka_topic_config(compression_codec, V) ->
@@ -80,8 +78,6 @@ to_librdkafka_topic_config(offset_store_path, V) ->
     {<<"offset.store.path">>, erlkaf_utils:to_binary(V)};
 to_librdkafka_topic_config(offset_store_sync_interval_ms, V) ->
     {<<"offset.store.sync.interval.ms">>, erlkaf_utils:to_binary(V)};
-to_librdkafka_topic_config(offset_store_method, V) ->
-    {<<"offset.store.method">>, erlkaf_utils:to_binary(V)};
 to_librdkafka_topic_config(K, V) ->
     throw({error, {options, {K, V}}}).
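For reference, a minimal sketch of the new client options in use (illustrative only, not part of the patch; it assumes the `erlkaf:create_producer/2` API from the project README, and the client id and broker address are placeholders):

```erlang
ok = erlkaf:start(),

ProducerConfig = [
    {bootstrap_servers, <<"broker1:9092">>},  % placeholder address
    {reconnect_backoff_ms, 100},              % initial reconnect backoff
    {reconnect_backoff_max_ms, 10000},        % cap for the exponential backoff
    {enable_idempotence, true},               % exactly-once, in-order produce
    {enable_gapless_guarantee, true}          % fatal error instead of message gaps
],
ok = erlkaf:create_producer(client_producer, ProducerConfig).
```

Per the CONFIGURATION.md table above, `enable_gapless_guarantee` requires `enable_idempotence=true`.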
@@ -147,8 +143,10 @@ to_librdkafka_config(broker_address_ttl, V) ->
     {<<"broker.address.ttl">>, erlkaf_utils:to_binary(V)};
 to_librdkafka_config(broker_address_family, V) ->
     {<<"broker.address.family">>, erlkaf_utils:to_binary(V)};
-to_librdkafka_config(reconnect_backoff_jitter_ms, V) ->
-    {<<"reconnect.backoff.jitter.ms">>, erlkaf_utils:to_binary(V)};
+to_librdkafka_config(reconnect_backoff_ms, V) ->
+    {<<"reconnect.backoff.ms">>, erlkaf_utils:to_binary(V)};
+to_librdkafka_config(reconnect_backoff_max_ms, V) ->
+    {<<"reconnect.backoff.max.ms">>, erlkaf_utils:to_binary(V)};
 to_librdkafka_config(statistics_interval_ms, V) ->
     {<<"statistics.interval.ms">>, erlkaf_utils:to_binary(V)};
 to_librdkafka_config(log_level, V) ->
@@ -207,6 +205,8 @@ to_librdkafka_config(heartbeat_interval_ms, V) ->
     {<<"heartbeat.interval.ms">>, erlkaf_utils:to_binary(V)};
 to_librdkafka_config(coordinator_query_interval_ms, V) ->
     {<<"coordinator.query.interval.ms">>, erlkaf_utils:to_binary(V)};
+to_librdkafka_config(max_poll_interval_ms, V) ->
+    {<<"max.poll.interval.ms">>, erlkaf_utils:to_binary(V)};
 to_librdkafka_config(auto_commit_interval_ms, V) ->
     {<<"auto.commit.interval.ms">>, erlkaf_utils:to_binary(V)};
 to_librdkafka_config(queued_min_messages, V) ->
@@ -223,10 +223,12 @@ to_librdkafka_config(fetch_min_bytes, V) ->
     {<<"fetch.min.bytes">>, erlkaf_utils:to_binary(V)};
 to_librdkafka_config(fetch_error_backoff_ms, V) ->
     {<<"fetch.error.backoff.ms">>, erlkaf_utils:to_binary(V)};
-to_librdkafka_config(offset_store_method, V) ->
-    {<<"offset.store.method">>, erlkaf_utils:to_binary(V)};
 to_librdkafka_config(check_crcs, V) ->
     {<<"check.crcs">>, erlkaf_utils:to_binary(V)};
+to_librdkafka_config(enable_idempotence, V) ->
+    {<<"enable.idempotence">>, erlkaf_utils:to_binary(V)};
+to_librdkafka_config(enable_gapless_guarantee, V) ->
+    {<<"enable.gapless.guarantee">>, erlkaf_utils:to_binary(V)};
 to_librdkafka_config(queue_buffering_max_messages, V) ->
     {<<"queue.buffering.max.messages">>, erlkaf_utils:to_binary(V)};
 to_librdkafka_config(queue_buffering_max_kbytes, V) ->
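For reference, the clauses above only rename options: e.g. `{max_poll_interval_ms, 600000}` is handed to librdkafka as `{<<"max.poll.interval.ms">>, <<"600000">>}`. And with stats now decoded to maps (see the v1.1.7 changelog entry), a `stats_callback` fun/2 would read fields via `maps:get/3`. A sketch, assuming the callback receives the decoded map and using `txmsgs` from librdkafka's statistics JSON; broker address and interval are placeholders:

```erlang
StatsCallback = fun(ClientId, Stats) when is_map(Stats) ->
    %% total messages transmitted so far; default to 0 if the field is absent
    Sent = maps:get(<<"txmsgs">>, Stats, 0),
    io:format("~p sent ~p messages~n", [ClientId, Sent])
end,

ClientConfig = [
    {bootstrap_servers, <<"broker1:9092">>},  % placeholder address
    {statistics_interval_ms, 5000},           % emit stats every 5 seconds
    {stats_callback, StatsCallback}
].
```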