From cbee5de31ef9f635e40a8e03e53a957adc6159c2 Mon Sep 17 00:00:00 2001 From: Argonus <9743549+Argonus@users.noreply.github.com> Date: Thu, 18 Apr 2024 15:13:06 +0100 Subject: [PATCH] Add List Offsets API via Kayrock & refactor describe groups No breaking changes to the existing API; we extend it with the ability to pass opts to requests. --- .credo.exs | 1 - .formatter.exs | 3 +- lib/kafka_ex/consumer_group.ex | 3 +- lib/kafka_ex/gen_consumer/supervisor.ex | 4 +- lib/kafka_ex/new/client.ex | 136 ++++++-------- lib/kafka_ex/new/client/request_builder.ex | 44 ++++- lib/kafka_ex/new/client/response_parser.ex | 19 +- lib/kafka_ex/new/client/state.ex | 19 +- lib/kafka_ex/new/client_compatibility.ex | 3 +- lib/kafka_ex/new/kafka_ex_api.ex | 71 +++---- .../describe_groups/default_request_impl.ex | 5 +- .../new/protocols/kayrock/list_offsets.ex | 25 +++ .../protocols/kayrock/list_offsets/shared.ex | 40 ++++ .../kayrock/list_offsets/v0_request_impl.ex | 27 +++ .../kayrock/list_offsets/v0_response_impl.ex | 26 +++ .../kayrock/list_offsets/v1_request_impl.ex | 19 ++ .../kayrock/list_offsets/v1_response_impl.ex | 22 +++ .../kayrock/list_offsets/v2_request_impl.ex | 24 +++ .../kayrock/list_offsets/v2_response_impl.ex | 22 +++ .../new/protocols/kayrock_protocol.ex | 18 +- lib/kafka_ex/new/structs/error.ex | 25 +++ lib/kafka_ex/new/structs/offset.ex | 38 ++++ lib/kafka_ex/protocol.ex | 3 +- lib/kafka_ex/protocol/api_versions.ex | 7 +- lib/kafka_ex/protocol/common.ex | 3 +- lib/kafka_ex/protocol/consumer_metadata.ex | 5 +- lib/kafka_ex/protocol/create_topics.ex | 3 +- lib/kafka_ex/protocol/delete_topics.ex | 3 +- lib/kafka_ex/protocol/fetch.ex | 4 +- lib/kafka_ex/protocol/heartbeat.ex | 5 +- lib/kafka_ex/protocol/join_group.ex | 7 +- lib/kafka_ex/protocol/leave_group.ex | 4 +- lib/kafka_ex/protocol/metadata.ex | 3 +- lib/kafka_ex/protocol/offset.ex | 7 +- lib/kafka_ex/protocol/offset_commit.ex | 13 +- lib/kafka_ex/protocol/offset_fetch.ex | 9 +- lib/kafka_ex/protocol/produce.ex | 3 +- lib/kafka_ex/protocol/sync_group.ex | 16 +- lib/kafka_ex/server.ex | 4 +- lib/kafka_ex/types.ex | 39 ++++ mix.exs | 2 +- .../consumer_group_implementation_test.exs | 4 +- test/integration/new_client_test.exs | 45 ++++- test/kafka_ex/compression_test.exs | 12 +- .../new/client/request_builder_test.exs | 66 ++++++- .../kayrock/describe_groups/request_test.exs | 6 +- .../kayrock/describe_groups_test.exs | 23 ++- .../kayrock/list_offsets/request_test.exs | 177 ++++++++++++++++++ .../kayrock/list_offsets/response_test.exs | 126 +++++++++++++ .../protocol/consumer_metadata_test.exs | 3 +- test/kafka_ex/protocol/delete_topics_test.exs | 8 +- test/kafka_ex/protocol/fetch_test.exs | 95 +++++----- test/kafka_ex/protocol/metadata_test.exs | 8 +- test/kafka_ex/protocol/offset_commit_test.exs | 4 +- test/kafka_ex/protocol/offset_fetch_test.exs | 8 +- test/kafka_ex/protocol/offset_test.exs | 7 +- test/kafka_ex/protocol/produce_test.exs | 50 +++-- test/support/integration_helpers.ex | 29 +++ 58 files changed, 1058 insertions(+), 347 deletions(-) create mode 100644 lib/kafka_ex/new/protocols/kayrock/list_offsets.ex create mode 100644 lib/kafka_ex/new/protocols/kayrock/list_offsets/shared.ex create mode 100644 lib/kafka_ex/new/protocols/kayrock/list_offsets/v0_request_impl.ex create mode 100644 lib/kafka_ex/new/protocols/kayrock/list_offsets/v0_response_impl.ex create mode 100644 lib/kafka_ex/new/protocols/kayrock/list_offsets/v1_request_impl.ex create mode 100644
lib/kafka_ex/new/protocols/kayrock/list_offsets/v1_response_impl.ex create mode 100644 lib/kafka_ex/new/protocols/kayrock/list_offsets/v2_request_impl.ex create mode 100644 lib/kafka_ex/new/protocols/kayrock/list_offsets/v2_response_impl.ex create mode 100644 lib/kafka_ex/new/structs/error.ex create mode 100644 lib/kafka_ex/new/structs/offset.ex create mode 100644 lib/kafka_ex/types.ex create mode 100644 test/kafka_ex/new/protocols/kayrock/list_offsets/request_test.exs create mode 100644 test/kafka_ex/new/protocols/kayrock/list_offsets/response_test.exs create mode 100644 test/support/integration_helpers.ex diff --git a/.credo.exs b/.credo.exs index 43c89f2f..276fbac0 100644 --- a/.credo.exs +++ b/.credo.exs @@ -109,7 +109,6 @@ {Credo.Check.Warning.ExpensiveEmptyEnumCheck}, {Credo.Check.Warning.IExPry}, {Credo.Check.Warning.IoInspect}, - {Credo.Check.Warning.LazyLogging}, {Credo.Check.Warning.OperationOnSameValues}, {Credo.Check.Warning.OperationWithConstantResult}, {Credo.Check.Warning.UnusedEnumOperation}, diff --git a/.formatter.exs b/.formatter.exs index 525446d4..76d930ab 100644 --- a/.formatter.exs +++ b/.formatter.exs @@ -1,4 +1,5 @@ # Used by "mix format" [ - inputs: ["mix.exs", "{config,lib,test}/**/*.{ex,exs}"] + inputs: ["mix.exs", "{config,lib,test}/**/*.{ex,exs}"], + line_length: 120 ] diff --git a/lib/kafka_ex/consumer_group.ex b/lib/kafka_ex/consumer_group.ex index 05ce8b39..d8b2acb6 100644 --- a/lib/kafka_ex/consumer_group.ex +++ b/lib/kafka_ex/consumer_group.ex @@ -369,8 +369,7 @@ defmodule KafkaEx.ConsumerGroup do opts = Keyword.put(opts, :supervisor_pid, self()) children = [ - {KafkaEx.ConsumerGroup.Manager, - {{gen_consumer_module, consumer_module}, group_name, topics, opts}} + {KafkaEx.ConsumerGroup.Manager, {{gen_consumer_module, consumer_module}, group_name, topics, opts}} ] Supervisor.init(children, diff --git a/lib/kafka_ex/gen_consumer/supervisor.ex b/lib/kafka_ex/gen_consumer/supervisor.ex index e47bf036..97bd6433 100644 --- a/lib/kafka_ex/gen_consumer/supervisor.ex +++ b/lib/kafka_ex/gen_consumer/supervisor.ex @@ -65,9 +65,7 @@ defmodule KafkaEx.GenConsumer.Supervisor do child_spec_builder = fn topic, partition -> %{ id: gen_consumer_module, - start: - {gen_consumer_module, :start_link, - [consumer_module, group_name, topic, partition, opts]} + start: {gen_consumer_module, :start_link, [consumer_module, group_name, topic, partition, opts]} } end diff --git a/lib/kafka_ex/new/client.ex b/lib/kafka_ex/new/client.ex index 3587e5d5..99facf5d 100644 --- a/lib/kafka_ex/new/client.ex +++ b/lib/kafka_ex/new/client.ex @@ -54,17 +54,8 @@ defmodule KafkaEx.New.Client do KafkaEx.New.Structs.NodeSelector.t(), pos_integer | nil ) :: {:ok, term} | {:error, term} - def send_request( - server, - request, - node_selector, - timeout \\ nil - ) do - GenServer.call( - server, - {:kayrock_request, request, node_selector}, - timeout_val(timeout) - ) + def send_request(server, request, node_selector, timeout \\ nil) do + GenServer.call(server, {:kayrock_request, request, node_selector}, timeout_val(timeout)) end require Logger @@ -165,11 +156,19 @@ defmodule KafkaEx.New.Client do {:reply, {:ok, topic_metadata}, updated_state} end - def handle_call({:describe_groups, [consumer_group_name]}, _from, state) do - if KafkaEx.valid_consumer_group?(consumer_group_name) do - {response, updated_state} = describe_group_request(consumer_group_name, state) + def handle_call({:list_offsets, [{topic, partitions_data}], opts}, _from, state) do + case list_offset_request({topic, partitions_data}, opts, 
state) do + {:error, error} -> {:reply, {:error, error}, state} + {result, updated_state} -> {:reply, result, updated_state} + end + end - {:reply, response, updated_state} + def handle_call({:describe_groups, [consumer_group_name], opts}, _from, state) do + if KafkaEx.valid_consumer_group?(consumer_group_name) do + case describe_group_request(consumer_group_name, opts, state) do + {:error, error} -> {:reply, {:error, error}, state} + {result, updated_state} -> {:reply, result, updated_state} + end else {:reply, {:error, :invalid_consumer_group}, state} end @@ -257,66 +256,66 @@ defmodule KafkaEx.New.Client do end end - defp describe_group_request(consumer_group_name, state) do + # ---------------------------------------------------------------------------------------------------- + defp describe_group_request(consumer_group_name, opts, state) do node_selector = NodeSelector.consumer_group(consumer_group_name) + req_data = [{:group_names, [consumer_group_name]} | opts] + + case RequestBuilder.describe_groups_request(req_data, state) do + {:ok, request} -> handle_describe_group_request(request, node_selector, state) + {:error, error} -> {:error, error} + end + end + + defp list_offset_request({topic, partitions_data}, opts, state) do + [%{partition_num: partition_num}] = partitions_data + node_selector = NodeSelector.topic_partition(topic, partition_num) + req_data = [{:topics, [{topic, partitions_data}]} | opts] + + case RequestBuilder.lists_offset_request(req_data, state) do + {:ok, request} -> handle_lists_offsets_request(request, node_selector, state) + {:error, error} -> {:error, error} + end + end - [consumer_group_name] - |> RequestBuilder.describe_groups_request(state) - |> handle_describe_group_request(node_selector, state) + # ---------------------------------------------------------------------------------------------------- + defp handle_describe_group_request(request, node_selector, state) do + handle_request_with_retry(request, &ResponseParser.describe_groups_response/1, node_selector, state) end - defp handle_describe_group_request( - _, - _, - _, - retry_count \\ @retry_count, - _last_error \\ nil - ) + defp handle_lists_offsets_request(request, node_selector, state) do + handle_request_with_retry(request, &ResponseParser.list_offsets_response/1, node_selector, state) + end - defp handle_describe_group_request(_, _, state, 0, last_error) do + # ---------------------------------------------------------------------------------------------------- + defp handle_request_with_retry(_, _, _, _, retry_count \\ @retry_count, _last_error \\ nil) + + defp handle_request_with_retry(_, _, _, state, 0, last_error) do {{:error, last_error}, state} end - defp handle_describe_group_request( - request, - node_selector, - state, - retry_count, - _last_error - ) do + defp handle_request_with_retry(request, parser_fn, node_selector, state, retry_count, _last_error) do case kayrock_network_request(request, node_selector, state) do {{:ok, response}, state_out} -> - case ResponseParser.describe_groups_response(response) do - {:ok, consumer_groups} -> - {{:ok, consumer_groups}, state_out} + case parser_fn.(response) do + {:ok, result} -> + {{:ok, result}, state_out} {:error, [error | _]} -> - Logger.warn( - "Unable to fetch consumer group metadata for #{inspect(request.group_ids)}" - ) - - handle_describe_group_request( - request, - node_selector, - state, - retry_count - 1, - error - ) + request_name = request.__struct__ + Logger.warning("Unable to send request #{inspect(request_name)}, failed with 
error #{inspect(error)}") + handle_request_with_retry(request, parser_fn, node_selector, state, retry_count - 1, error) end {_, _state_out} -> - Logger.warn("Unable to fetch consumer group metadata for #{inspect(request.group_ids)}") - - handle_describe_group_request( - request, - node_selector, - state, - retry_count - 1, - :unknown - ) + request_name = request.__struct__ + Logger.warning("Unable to send request #{inspect(request_name)}, failed with error unknown") + error = KafkaEx.New.Structs.Error.build(:unknown, %{}) + handle_request_with_retry(request, parser_fn, node_selector, state, retry_count - 1, error) end end + # ---------------------------------------------------------------------------------------------------- defp maybe_connect_broker(broker, state) do case Broker.connected?(broker) do true -> @@ -575,12 +574,7 @@ defmodule KafkaEx.New.Client do {ok_or_error, response, state_out} end - defp kayrock_network_request( - request, - node_selector, - state, - network_timeout \\ nil - ) do + defp kayrock_network_request(request, node_selector, state, network_timeout \\ nil) do # produce request have an acks field and if this is 0 then we do not want to # wait for a response from the broker synchronous = @@ -590,14 +584,7 @@ defmodule KafkaEx.New.Client do end network_timeout = config_sync_timeout(network_timeout) - - {send_request, updated_state} = - get_send_request_function( - node_selector, - state, - network_timeout, - synchronous - ) + {send_request, updated_state} = get_send_request_function(node_selector, state, network_timeout, synchronous) case send_request do :no_broker -> @@ -661,12 +648,7 @@ defmodule KafkaEx.New.Client do network_timeout, synchronous ) do - {broker, updated_state} = - broker_for_partition_with_update( - state, - topic, - partition - ) + {broker, updated_state} = broker_for_partition_with_update(state, topic, partition) if broker do if synchronous do diff --git a/lib/kafka_ex/new/client/request_builder.ex b/lib/kafka_ex/new/client/request_builder.ex index 8e43d58d..a3951cb9 100644 --- a/lib/kafka_ex/new/client/request_builder.ex +++ b/lib/kafka_ex/new/client/request_builder.ex @@ -11,7 +11,8 @@ defmodule KafkaEx.New.Client.RequestBuilder do ) @default_api_version %{ - describe_groups: 1 + describe_groups: 1, + list_offsets: 1 } alias KafkaEx.New.Client.State @@ -19,16 +20,45 @@ defmodule KafkaEx.New.Client.RequestBuilder do @doc """ Builds request for Describe Groups API """ - @spec describe_groups_request([binary], State.t()) :: term - def describe_groups_request(group_names, state) do - api_version = get_api_version(state, :describe_groups) + @spec describe_groups_request(Keyword.t(), State.t()) :: {:ok, term} | {:error, :api_version_no_supported} + def describe_groups_request(request_opts, state) do + case get_api_version(state, :describe_groups, request_opts) do + {:ok, api_version} -> + group_names = Keyword.fetch!(request_opts, :group_names) + req = @protocol.build_request(:describe_groups, api_version, group_names: group_names) + {:ok, req} - @protocol.build_request(:describe_groups, api_version, group_names: group_names) + {:error, error_code} -> + {:error, error_code} + end + end + + @doc """ + Builds request for List Offsets API + """ + @spec lists_offset_request(Keyword.t(), State.t()) :: {:ok, term} | {:error, :api_version_no_supported} + def lists_offset_request(request_opts, state) do + case get_api_version(state, :list_offsets, request_opts) do + {:ok, api_version} -> + topics = Keyword.fetch!(request_opts, :topics) + req = 
@protocol.build_request(:list_offsets, api_version, topics: topics) + {:ok, req} + + {:error, error_code} -> + {:error, error_code} + end end # ----------------------------------------------------------------------------- - defp get_api_version(state, request_type) do + defp get_api_version(state, request_type, request_opts) do default = Map.fetch!(@default_api_version, request_type) - State.max_supported_api_version(state, request_type, default) + requested_version = Keyword.get(request_opts, :api_version, default) + max_supported = State.max_supported_api_version(state, request_type, default) + + if requested_version > max_supported do + {:error, :api_version_no_supported} + else + {:ok, requested_version} + end end end diff --git a/lib/kafka_ex/new/client/response_parser.ex b/lib/kafka_ex/new/client/response_parser.ex index 80394d33..842be20c 100644 --- a/lib/kafka_ex/new/client/response_parser.ex +++ b/lib/kafka_ex/new/client/response_parser.ex @@ -4,19 +4,24 @@ defmodule KafkaEx.New.Client.ResponseParser do It's main decision point which protocol to use for parsing response """ alias KafkaEx.New.Structs.ConsumerGroup + alias KafkaEx.New.Structs.Error + alias KafkaEx.New.Structs.Offset - @protocol Application.compile_env( - :kafka_ex, - :protocol, - KafkaEx.New.Protocols.KayrockProtocol - ) + @protocol Application.compile_env(:kafka_ex, :protocol, KafkaEx.New.Protocols.KayrockProtocol) @doc """ Parses response for Describe Groups API """ - @spec describe_groups_response(term) :: - {:ok, [ConsumerGroup.t()]} | {:error, term} + @spec describe_groups_response(term) :: {:ok, [ConsumerGroup.t()]} | {:error, term} def describe_groups_response(response) do @protocol.parse_response(:describe_groups, response) end + + @doc """ + Parses response for List Offsets API + """ + @spec list_offsets_response(term) :: {:ok, [Offset.t()]} | {:error, Error.t()} + def list_offsets_response(response) do + @protocol.parse_response(:list_offsets, response) + end end diff --git a/lib/kafka_ex/new/client/state.ex b/lib/kafka_ex/new/client/state.ex index fe8a2c52..3a1b291d 100644 --- a/lib/kafka_ex/new/client/state.ex +++ b/lib/kafka_ex/new/client/state.ex @@ -52,12 +52,8 @@ defmodule KafkaEx.New.Client.State do %{state | correlation_id: cid + 1} end - def select_broker( - %__MODULE__{cluster_metadata: cluster_metadata}, - selector - ) do - with {:ok, node_id} <- - ClusterMetadata.select_node(cluster_metadata, selector), + def select_broker(%__MODULE__{cluster_metadata: cluster_metadata}, selector) do + with {:ok, node_id} <- ClusterMetadata.select_node(cluster_metadata, selector), broker <- ClusterMetadata.broker_by_node_id(cluster_metadata, node_id) do {:ok, broker} else @@ -130,12 +126,11 @@ defmodule KafkaEx.New.Client.State do %{state | api_versions: api_versions} end - def max_supported_api_version( - %__MODULE__{api_versions: api_versions}, - api, - default - ) - when is_atom(api) do + @doc """ + Returns the max supported API version for a request, based on cached values in state. + Currently supports Kayrock metadata schema only.
+ """ + def max_supported_api_version(%__MODULE__{api_versions: api_versions}, api, default) when is_atom(api) do api_key = Kayrock.KafkaSchemaMetadata.api_key(api) {_, max_kayrock_version} = Kayrock.KafkaSchemaMetadata.version_range(api) diff --git a/lib/kafka_ex/new/client_compatibility.ex b/lib/kafka_ex/new/client_compatibility.ex index 45239b08..c407b0bb 100644 --- a/lib/kafka_ex/new/client_compatibility.ex +++ b/lib/kafka_ex/new/client_compatibility.ex @@ -258,8 +258,7 @@ defmodule KafkaEx.New.ClientCompatibility do case response do {:ok, resp} -> - {:reply, Adapter.delete_topics_response(resp), - State.remove_topics(updated_state, topics)} + {:reply, Adapter.delete_topics_response(resp), State.remove_topics(updated_state, topics)} _ -> {:reply, response, updated_state} diff --git a/lib/kafka_ex/new/kafka_ex_api.ex b/lib/kafka_ex/new/kafka_ex_api.ex index 1d6354d1..11987d0c 100644 --- a/lib/kafka_ex/new/kafka_ex_api.ex +++ b/lib/kafka_ex/new/kafka_ex_api.ex @@ -13,47 +13,37 @@ defmodule KafkaEx.New.KafkaExAPI do ``` """ - alias KafkaEx.New.Client alias KafkaEx.New.Structs.ClusterMetadata alias KafkaEx.New.Structs.ConsumerGroup alias KafkaEx.New.Structs.Topic - alias KafkaEx.New.Structs.NodeSelector + alias KafkaEx.New.Structs.Offset @type node_id :: non_neg_integer - @type topic_name :: binary - @type partition_id :: non_neg_integer - @type consumer_group_name :: binary - @type offset :: non_neg_integer + + @type topic_name :: KafkaEx.Types.topic() + @type partition_id :: KafkaEx.Types.partition() + @type consumer_group_name :: KafkaEx.Types.consumer_group_name() + @type offset_val :: KafkaEx.Types.offset() + @type timestamp :: KafkaEx.Types.timestamp() + @type error_atom :: atom @type client :: GenServer.server() @type correlation_id :: non_neg_integer + @type opts :: Keyword.t() @doc """ Fetch the latest offset for a given partition """ - @spec latest_offset(client, topic_name, partition_id) :: - {:error, error_atom} | {:ok, offset} - def latest_offset(client, topic, partition) do - request = %Kayrock.ListOffsets.V1.Request{ - replica_id: -1, - topics: [ - %{topic: topic, partitions: [%{partition: partition, timestamp: -1}]} - ] - } - - {:ok, resp} = - Client.send_request( - client, - request, - NodeSelector.topic_partition(topic, partition) - ) - - [topic_resp] = resp.responses - [%{error_code: error_code, offset: offset}] = topic_resp.partition_responses - - case error_code do - 0 -> {:ok, offset} - _ -> {:error, Kayrock.ErrorCode.code_to_atom(error_code)} + @spec latest_offset(client, topic_name, partition_id) :: {:error, error_atom} | {:ok, offset_val} + @spec latest_offset(client, topic_name, partition_id, opts) :: {:error, error_atom} | {:ok, offset_val} + def latest_offset(client, topic, partition, opts \\ []) do + opts = Keyword.merge([api_version: 1], opts) + partition = %{partition_num: partition, timestamp: -1} + + case GenServer.call(client, {:list_offsets, [{topic, [partition]}], opts}) do + {:ok, [offset_struct]} -> {:ok, offset_struct.offset} + {:error, %{error: error_atom}} -> {:error, error_atom} + {:error, error_atom} -> {:error, error_atom} end end @@ -62,10 +52,25 @@ defmodule KafkaEx.New.KafkaExAPI do We support only one consumer group per request for now, as we don't group requests by group coordinator. 
""" - @spec describe_group(client, Keyword.t()) :: - {:ok, ConsumerGroup.t()} | {:error, any} - def describe_group(client, consumer_group_name) do - case GenServer.call(client, {:describe_groups, [consumer_group_name]}) do + @spec describe_group(client, consumer_group_name) :: {:ok, ConsumerGroup.t()} | {:error, any} + @spec describe_group(client, consumer_group_name, opts) :: {:ok, ConsumerGroup.t()} | {:error, any} + def describe_group(client, consumer_group_name, opts \\ []) do + case GenServer.call(client, {:describe_groups, [consumer_group_name], opts}) do + {:ok, [group]} -> {:ok, group} + {:error, error} -> {:error, error} + end + end + + @doc """ + Returns list of Offsets per topic per partition. + We support only one topic partition pair for now, as we don't request by leader. + """ + @spec list_offsets(client, {topic_name, [{partition_id, timestamp}]}) :: {:ok, list(Offset.t())} | {:error, any} + @spec list_offsets(client, {topic_name, [{partition_id, timestamp}]}, opts) :: {:ok, list(Offset.t())} | {:error, any} + def list_offsets(client, {topic, [{partition, timestamp}]}, opts \\ []) do + partition = %{partition_num: partition, timestamp: timestamp} + + case GenServer.call(client, {:list_offsets, [{topic, [partition]}], opts}) do {:ok, [group]} -> {:ok, group} {:error, error} -> {:error, error} end diff --git a/lib/kafka_ex/new/protocols/kayrock/describe_groups/default_request_impl.ex b/lib/kafka_ex/new/protocols/kayrock/describe_groups/default_request_impl.ex index 089e6a2e..74481cd2 100644 --- a/lib/kafka_ex/new/protocols/kayrock/describe_groups/default_request_impl.ex +++ b/lib/kafka_ex/new/protocols/kayrock/describe_groups/default_request_impl.ex @@ -1,6 +1,7 @@ defimpl KafkaEx.New.Protocols.Kayrock.DescribeGroups.Request, for: [Kayrock.DescribeGroups.V0.Request, Kayrock.DescribeGroups.V1.Request] do - def build_request(request_template, consumer_group_names) do - Map.put(request_template, :group_ids, consumer_group_names) + def build_request(request_template, opts) do + group_ids = Keyword.fetch!(opts, :group_names) + Map.put(request_template, :group_ids, group_ids) end end diff --git a/lib/kafka_ex/new/protocols/kayrock/list_offsets.ex b/lib/kafka_ex/new/protocols/kayrock/list_offsets.ex new file mode 100644 index 00000000..5b0120e5 --- /dev/null +++ b/lib/kafka_ex/new/protocols/kayrock/list_offsets.ex @@ -0,0 +1,25 @@ +defprotocol KafkaEx.New.Protocols.Kayrock.ListOffsets do + @moduledoc """ + This module handles List Offsets request & response parsing. + Request is built using Kayrock protocol, response is parsed to + native KafkaEx structs. 
+ """ + + defprotocol Request do + @moduledoc """ + This protocol is used to build Lists Offsets request + """ + @spec build_request(t(), Keyword.t()) :: t() + def build_request(request, opts) + end + + defprotocol Response do + @moduledoc """ + This protocol is used to parse Lists Offsets response + """ + alias KafkaEx.New.Structs.Offset + + @spec parse_response(t()) :: {:ok, [Offset.t()]} | {:error, term} + def parse_response(response) + end +end diff --git a/lib/kafka_ex/new/protocols/kayrock/list_offsets/shared.ex b/lib/kafka_ex/new/protocols/kayrock/list_offsets/shared.ex new file mode 100644 index 00000000..2a3156ba --- /dev/null +++ b/lib/kafka_ex/new/protocols/kayrock/list_offsets/shared.ex @@ -0,0 +1,40 @@ +defmodule KafkaEx.New.Protocols.ListOffsets.Shared do + @moduledoc false + + alias KafkaEx.New.Structs.Error, as: ErrorStruct + + # ------------------------------------------------------------------------------ + def build_response(offsets) when is_list(offsets), do: {:ok, offsets} + + def build_response({error_code, topic, partition}) do + error = ErrorStruct.build(error_code, %{topic: topic, partition: partition}) + {:error, error} + end + + def fail_fast_iterate_topics(topics_data, parser_fn) do + Enum.reduce_while(topics_data, [], fn response, acc -> + case parser_fn.(response.topic, response.partition_responses) do + {:ok, result} -> {:cont, merge_acc(result, acc)} + {:error, error_data} -> {:halt, error_data} + end + end) + end + + # ------------------------------------------------------------------------------ + def fail_fast_iterate_partitions(partitions_data, topic, parser_fn) do + partitions_data + |> Enum.reduce_while([], fn datum, acc -> + case parser_fn.(topic, datum) do + {:ok, value} -> {:cont, merge_acc(value, acc)} + {:error, error_data} -> {:halt, error_data} + end + end) + |> case do + results when is_list(results) -> {:ok, results} + error_data -> {:error, error_data} + end + end + + defp merge_acc(result, acc) when is_list(result), do: result ++ acc + defp merge_acc(result, acc), do: [result | acc] +end diff --git a/lib/kafka_ex/new/protocols/kayrock/list_offsets/v0_request_impl.ex b/lib/kafka_ex/new/protocols/kayrock/list_offsets/v0_request_impl.ex new file mode 100644 index 00000000..2b762e02 --- /dev/null +++ b/lib/kafka_ex/new/protocols/kayrock/list_offsets/v0_request_impl.ex @@ -0,0 +1,27 @@ +defimpl KafkaEx.New.Protocols.Kayrock.ListOffsets.Request, for: Kayrock.ListOffsets.V0.Request do + def build_request(request_template, opts) do + replica_id = Keyword.get(opts, :replica_id, -1) + offset_num = Keyword.get(opts, :offset_num, 1) + + topics = + opts + |> Keyword.fetch!(:topics) + |> Enum.map(fn {topic, partitions} -> + %{ + topic: topic, + partitions: + Enum.map(partitions, fn partition_data -> + %{ + partition: partition_data.partition_num, + timestamp: partition_data.timestamp, + max_num_offsets: offset_num + } + end) + } + end) + + request_template + |> Map.put(:replica_id, replica_id) + |> Map.put(:topics, topics) + end +end diff --git a/lib/kafka_ex/new/protocols/kayrock/list_offsets/v0_response_impl.ex b/lib/kafka_ex/new/protocols/kayrock/list_offsets/v0_response_impl.ex new file mode 100644 index 00000000..249e5db2 --- /dev/null +++ b/lib/kafka_ex/new/protocols/kayrock/list_offsets/v0_response_impl.ex @@ -0,0 +1,26 @@ +defimpl KafkaEx.New.Protocols.Kayrock.ListOffsets.Response, for: Kayrock.ListOffsets.V0.Response do + import KafkaEx.New.Protocols.ListOffsets.Shared, + only: [build_response: 1, fail_fast_iterate_topics: 2, 
fail_fast_iterate_partitions: 3] + + def parse_response(%{responses: responses}) do + responses + |> fail_fast_iterate_topics(&parse_partition_responses/2) + |> build_response() + end + + defp parse_partition_responses(topic, partition_responses) do + fail_fast_iterate_partitions(partition_responses, topic, &build_offset/2) + end + + defp build_offset(topic, %{partition: partition, error_code: 0, offsets: []}) do + {:ok, KafkaEx.New.Structs.Offset.from_list_offset(topic, partition, 0)} + end + + defp build_offset(topic, %{partition: partition, error_code: 0, offsets: [offset | _]}) do + {:ok, KafkaEx.New.Structs.Offset.from_list_offset(topic, partition, offset)} + end + + defp build_offset(topic, %{error_code: error_code, partition: partition}) do + {:error, {error_code, topic, partition}} + end +end diff --git a/lib/kafka_ex/new/protocols/kayrock/list_offsets/v1_request_impl.ex b/lib/kafka_ex/new/protocols/kayrock/list_offsets/v1_request_impl.ex new file mode 100644 index 00000000..a2f0a297 --- /dev/null +++ b/lib/kafka_ex/new/protocols/kayrock/list_offsets/v1_request_impl.ex @@ -0,0 +1,19 @@ +defimpl KafkaEx.New.Protocols.Kayrock.ListOffsets.Request, for: [Kayrock.ListOffsets.V1.Request] do + def build_request(request_template, opts) do + replica_id = Keyword.get(opts, :replica_id, -1) + + topics = + opts + |> Keyword.fetch!(:topics) + |> Enum.map(fn {topic, partitions} -> + %{ + topic: topic, + partitions: Enum.map(partitions, &%{partition: &1.partition_num, timestamp: &1.timestamp}) + } + end) + + request_template + |> Map.put(:replica_id, replica_id) + |> Map.put(:topics, topics) + end +end diff --git a/lib/kafka_ex/new/protocols/kayrock/list_offsets/v1_response_impl.ex b/lib/kafka_ex/new/protocols/kayrock/list_offsets/v1_response_impl.ex new file mode 100644 index 00000000..cf00252d --- /dev/null +++ b/lib/kafka_ex/new/protocols/kayrock/list_offsets/v1_response_impl.ex @@ -0,0 +1,22 @@ +defimpl KafkaEx.New.Protocols.Kayrock.ListOffsets.Response, for: Kayrock.ListOffsets.V1.Response do + import KafkaEx.New.Protocols.ListOffsets.Shared, + only: [build_response: 1, fail_fast_iterate_topics: 2, fail_fast_iterate_partitions: 3] + + def parse_response(%{responses: responses}) do + responses + |> fail_fast_iterate_topics(&parse_partition_responses/2) + |> build_response() + end + + defp parse_partition_responses(topic, partition_responses) do + fail_fast_iterate_partitions(partition_responses, topic, &build_offset/2) + end + + defp build_offset(topic, %{error_code: 0, partition: p, offset: o}) do + {:ok, KafkaEx.New.Structs.Offset.from_list_offset(topic, p, o)} + end + + defp build_offset(topic, %{error_code: error_code, partition: partition}) do + {:error, {error_code, topic, partition}} + end +end diff --git a/lib/kafka_ex/new/protocols/kayrock/list_offsets/v2_request_impl.ex b/lib/kafka_ex/new/protocols/kayrock/list_offsets/v2_request_impl.ex new file mode 100644 index 00000000..8c8b8a10 --- /dev/null +++ b/lib/kafka_ex/new/protocols/kayrock/list_offsets/v2_request_impl.ex @@ -0,0 +1,24 @@ +defimpl KafkaEx.New.Protocols.Kayrock.ListOffsets.Request, for: [Kayrock.ListOffsets.V2.Request] do + @isolation_level %{read_uncommitted: 0, read_committed: 1} + + def build_request(request_template, opts) do + replica_id = Keyword.get(opts, :replica_id, -1) + isolation_level = Keyword.get(opts, :isolation_level) + kafka_isolation_level = Map.get(@isolation_level, isolation_level, 0) + + topics = + opts + |> Keyword.fetch!(:topics) + |> Enum.map(fn {topic, partitions} -> + %{ + topic: topic,
partitions: Enum.map(partitions, &%{partition: &1.partition_num, timestamp: &1.timestamp}) + } + end) + + request_template + |> Map.put(:replica_id, replica_id) + |> Map.put(:isolation_level, kafka_isolation_level) + |> Map.put(:topics, topics) + end +end diff --git a/lib/kafka_ex/new/protocols/kayrock/list_offsets/v2_response_impl.ex b/lib/kafka_ex/new/protocols/kayrock/list_offsets/v2_response_impl.ex new file mode 100644 index 00000000..22f8331f --- /dev/null +++ b/lib/kafka_ex/new/protocols/kayrock/list_offsets/v2_response_impl.ex @@ -0,0 +1,22 @@ +defimpl KafkaEx.New.Protocols.Kayrock.ListOffsets.Response, for: Kayrock.ListOffsets.V2.Response do + import KafkaEx.New.Protocols.ListOffsets.Shared, + only: [build_response: 1, fail_fast_iterate_topics: 2, fail_fast_iterate_partitions: 3] + + def parse_response(%{responses: responses}) do + responses + |> fail_fast_iterate_topics(&parse_partition_responses/2) + |> build_response() + end + + defp parse_partition_responses(topic, partition_responses) do + fail_fast_iterate_partitions(partition_responses, topic, &build_offset/2) + end + + defp build_offset(topic, %{error_code: 0, partition: p, offset: o, timestamp: t}) do + {:ok, KafkaEx.New.Structs.Offset.from_list_offset(topic, p, o, t)} + end + + defp build_offset(topic, %{error_code: error_code, partition: partition}) do + {:error, {error_code, topic, partition}} + end +end diff --git a/lib/kafka_ex/new/protocols/kayrock_protocol.ex b/lib/kafka_ex/new/protocols/kayrock_protocol.ex index a348aa6a..d94a1102 100644 --- a/lib/kafka_ex/new/protocols/kayrock_protocol.ex +++ b/lib/kafka_ex/new/protocols/kayrock_protocol.ex @@ -10,23 +10,31 @@ defmodule KafkaEx.New.Protocols.KayrockProtocol do # ----------------------------------------------------------------------------- @doc """ - Builds request for Describe Groups API + Builds kayrock request based on type, api version and opts """ @impl KafkaEx.New.Client.Protocol def build_request(:describe_groups, api_version, opts) do - group_names = Keyword.fetch!(opts, :group_names) - api_version |> Kayrock.DescribeGroups.get_request_struct() - |> KayrockProtocol.DescribeGroups.Request.build_request(group_names) + |> KayrockProtocol.DescribeGroups.Request.build_request(opts) + end + + def build_request(:list_offsets, api_version, opts) do + api_version + |> Kayrock.ListOffsets.get_request_struct() + |> KayrockProtocol.ListOffsets.Request.build_request(opts) end # ----------------------------------------------------------------------------- @doc """ - Parses response for Describe Groups API + Parses response based on request type and response """ @impl KafkaEx.New.Client.Protocol def parse_response(:describe_groups, response) do KayrockProtocol.DescribeGroups.Response.parse_response(response) end + + def parse_response(:list_offsets, response) do + KayrockProtocol.ListOffsets.Response.parse_response(response) + end end diff --git a/lib/kafka_ex/new/structs/error.ex b/lib/kafka_ex/new/structs/error.ex new file mode 100644 index 00000000..3fc98a3b --- /dev/null +++ b/lib/kafka_ex/new/structs/error.ex @@ -0,0 +1,25 @@ +defmodule KafkaEx.New.Structs.Error do + @moduledoc """ + This module represents kafka error as a struct with code and metadata + """ + defstruct error: nil, metadata: %{} + + @type error_code :: KafkaEx.Types.error_code() | atom + + @type t :: %__MODULE__{error: atom | nil, metadata: term} + + @spec build(error_code, term) :: __MODULE__.t() + def build(error_code, metadata) when is_integer(error_code) do + %__MODULE__{ + error: 
Kayrock.ErrorCode.code_to_atom(error_code), + metadata: metadata + } + end + + def build(error, metadata) when is_atom(error) do + %__MODULE__{ + error: error, + metadata: metadata + } + end +end diff --git a/lib/kafka_ex/new/structs/offset.ex b/lib/kafka_ex/new/structs/offset.ex new file mode 100644 index 00000000..3d8c10a3 --- /dev/null +++ b/lib/kafka_ex/new/structs/offset.ex @@ -0,0 +1,38 @@ +defmodule KafkaEx.New.Structs.Offset do + @moduledoc """ + This module represents Offset value coming from Kafka + """ + defstruct [:topic, :partition, :offset, :timestamp] + + @type topic :: KafkaEx.Types.topic() + @type partition :: KafkaEx.Types.partition() + @type offset :: KafkaEx.Types.offset() + @type timestamp :: KafkaEx.Types.timestamp() + + @type t :: %__MODULE__{ + topic: topic(), + partition: partition(), + offset: offset(), + timestamp: timestamp() | nil + } + + @spec from_list_offset(topic, partition, offset) :: __MODULE__.t() + def from_list_offset(topic, partition, offset) do + %__MODULE__{ + topic: topic, + partition: partition, + offset: offset, + timestamp: nil + } + end + + @spec from_list_offset(topic, partition, offset, timestamp) :: __MODULE__.t() + def from_list_offset(topic, partition, offset, timestamp) do + %__MODULE__{ + topic: topic, + partition: partition, + offset: offset, + timestamp: timestamp + } + end +end diff --git a/lib/kafka_ex/protocol.ex b/lib/kafka_ex/protocol.ex index 064ce7cd..9e275d38 100644 --- a/lib/kafka_ex/protocol.ex +++ b/lib/kafka_ex/protocol.ex @@ -34,8 +34,7 @@ defmodule KafkaEx.Protocol do client_id, api_version \\ @default_api_version ) do - <> + <> end @error_map %{ diff --git a/lib/kafka_ex/protocol/api_versions.ex b/lib/kafka_ex/protocol/api_versions.ex index 9f7ac586..c962fab2 100644 --- a/lib/kafka_ex/protocol/api_versions.ex +++ b/lib/kafka_ex/protocol/api_versions.ex @@ -62,8 +62,7 @@ defmodule KafkaEx.Protocol.ApiVersions do def parse_response(binary, this_api_version \\ @default_this_api_version) def parse_response( - <<_correlation_id::32-signed, error_code::16-signed, api_versions_count::32-signed, - rest::binary>>, + <<_correlation_id::32-signed, error_code::16-signed, api_versions_count::32-signed, rest::binary>>, this_api_version ) do %{ @@ -99,9 +98,7 @@ defmodule KafkaEx.Protocol.ApiVersions do end end - defp parse_one_api_version( - <> - ) do + defp parse_one_api_version(<>) do {%ApiVersion{ api_key: api_key, min_version: min_version, diff --git a/lib/kafka_ex/protocol/common.ex b/lib/kafka_ex/protocol/common.ex index 7725fc8b..2cd0f1b0 100644 --- a/lib/kafka_ex/protocol/common.ex +++ b/lib/kafka_ex/protocol/common.ex @@ -17,8 +17,7 @@ defmodule KafkaEx.Protocol.Common do def parse_topics( topics_size, - <>, + <>, mod ) do struct_module = Module.concat(mod, Response) diff --git a/lib/kafka_ex/protocol/consumer_metadata.ex b/lib/kafka_ex/protocol/consumer_metadata.ex index aa81880c..e778c0d2 100644 --- a/lib/kafka_ex/protocol/consumer_metadata.ex +++ b/lib/kafka_ex/protocol/consumer_metadata.ex @@ -47,9 +47,8 @@ defmodule KafkaEx.Protocol.ConsumerMetadata do @spec parse_response(binary) :: Response.t() def parse_response( - <<_corr_id::32-signed, error_code::16-signed, coord_id::32-signed, - coord_host_size::16-signed, coord_host::size(coord_host_size)-binary, - coord_port::32-signed, _::binary>> + <<_corr_id::32-signed, error_code::16-signed, coord_id::32-signed, coord_host_size::16-signed, + coord_host::size(coord_host_size)-binary, coord_port::32-signed, _::binary>> ) do %Response{ coordinator_id: coord_id, diff --git 
a/lib/kafka_ex/protocol/create_topics.ex b/lib/kafka_ex/protocol/create_topics.ex index bde40e7c..2bd284f7 100644 --- a/lib/kafka_ex/protocol/create_topics.ex +++ b/lib/kafka_ex/protocol/create_topics.ex @@ -147,8 +147,7 @@ defmodule KafkaEx.Protocol.CreateTopics do defp parse_topic_errors( topic_errors_count, - <> + <> ) do [ %TopicError{ diff --git a/lib/kafka_ex/protocol/delete_topics.ex b/lib/kafka_ex/protocol/delete_topics.ex index 0e960999..b512359e 100644 --- a/lib/kafka_ex/protocol/delete_topics.ex +++ b/lib/kafka_ex/protocol/delete_topics.ex @@ -80,8 +80,7 @@ defmodule KafkaEx.Protocol.DeleteTopics do defp parse_topic_errors( topic_errors_count, - <> + <> ) do [ %TopicError{ diff --git a/lib/kafka_ex/protocol/fetch.ex b/lib/kafka_ex/protocol/fetch.ex index a896ee90..f6142208 100644 --- a/lib/kafka_ex/protocol/fetch.ex +++ b/lib/kafka_ex/protocol/fetch.ex @@ -106,8 +106,8 @@ defmodule KafkaEx.Protocol.Fetch do def parse_partitions( partitions_size, - <>, + <>, partitions, topic ) do diff --git a/lib/kafka_ex/protocol/heartbeat.ex b/lib/kafka_ex/protocol/heartbeat.ex index f6f3dd72..43180557 100644 --- a/lib/kafka_ex/protocol/heartbeat.ex +++ b/lib/kafka_ex/protocol/heartbeat.ex @@ -26,9 +26,8 @@ defmodule KafkaEx.Protocol.Heartbeat do def create_request(correlation_id, client_id, request) do [ KafkaEx.Protocol.create_request(:heartbeat, correlation_id, client_id), - <> + <> ] end diff --git a/lib/kafka_ex/protocol/join_group.ex b/lib/kafka_ex/protocol/join_group.ex index b07a2711..1988424c 100644 --- a/lib/kafka_ex/protocol/join_group.ex +++ b/lib/kafka_ex/protocol/join_group.ex @@ -74,10 +74,9 @@ defmodule KafkaEx.Protocol.JoinGroup do @spec parse_response(binary) :: Response.t() def parse_response( - <<_correlation_id::32-signed, error_code::16-signed, generation_id::32-signed, - protocol_len::16-signed, _protocol::size(protocol_len)-binary, leader_len::16-signed, - leader::size(leader_len)-binary, member_id_len::16-signed, - member_id::size(member_id_len)-binary, members_size::32-signed, rest::binary>> + <<_correlation_id::32-signed, error_code::16-signed, generation_id::32-signed, protocol_len::16-signed, + _protocol::size(protocol_len)-binary, leader_len::16-signed, leader::size(leader_len)-binary, + member_id_len::16-signed, member_id::size(member_id_len)-binary, members_size::32-signed, rest::binary>> ) do members = parse_members(members_size, rest, []) diff --git a/lib/kafka_ex/protocol/leave_group.ex b/lib/kafka_ex/protocol/leave_group.ex index bfe4435b..91af1840 100644 --- a/lib/kafka_ex/protocol/leave_group.ex +++ b/lib/kafka_ex/protocol/leave_group.ex @@ -25,8 +25,8 @@ defmodule KafkaEx.Protocol.LeaveGroup do def create_request(correlation_id, client_id, request) do [ KafkaEx.Protocol.create_request(:leave_group, correlation_id, client_id), - <> + <> ] end diff --git a/lib/kafka_ex/protocol/metadata.ex b/lib/kafka_ex/protocol/metadata.ex index b9dd22e5..c5100ab9 100644 --- a/lib/kafka_ex/protocol/metadata.ex +++ b/lib/kafka_ex/protocol/metadata.ex @@ -230,8 +230,7 @@ defmodule KafkaEx.Protocol.Metadata do defp parse_brokers( 0, brokers_size, - <>, + <>, brokers ) do parse_brokers(0, brokers_size - 1, rest, [ diff --git a/lib/kafka_ex/protocol/offset.ex b/lib/kafka_ex/protocol/offset.ex index fff72ed2..62b0bf38 100644 --- a/lib/kafka_ex/protocol/offset.ex +++ b/lib/kafka_ex/protocol/offset.ex @@ -37,8 +37,8 @@ defmodule KafkaEx.Protocol.Offset do def create_request(correlation_id, client_id, topic, partition, time) do [ KafkaEx.Protocol.create_request(:offset, 
correlation_id, client_id), - <<-1::32-signed, 1::32-signed, byte_size(topic)::16-signed, topic::binary, 1::32-signed, - partition::32-signed, parse_time(time)::64, 1::32>> + <<-1::32-signed, 1::32-signed, byte_size(topic)::16-signed, topic::binary, 1::32-signed, partition::32-signed, + parse_time(time)::64, 1::32>> ] end @@ -62,8 +62,7 @@ defmodule KafkaEx.Protocol.Offset do defp parse_topics( topics_size, - <> + <> ) do {partitions, topics_data} = parse_partitions(partitions_size, rest) diff --git a/lib/kafka_ex/protocol/offset_commit.ex b/lib/kafka_ex/protocol/offset_commit.ex index 50e8a786..7954e76e 100644 --- a/lib/kafka_ex/protocol/offset_commit.ex +++ b/lib/kafka_ex/protocol/offset_commit.ex @@ -40,12 +40,10 @@ defmodule KafkaEx.Protocol.OffsetCommit do def create_request(correlation_id, client_id, offset_commit_request) do [ Protocol.create_request(:offset_commit, correlation_id, client_id), - <> + <> ] end @@ -58,8 +56,7 @@ defmodule KafkaEx.Protocol.OffsetCommit do defp parse_topics( topic_count, - <> + <> ) do {partitions, topics_data} = parse_partitions(partitions_count, rest, []) diff --git a/lib/kafka_ex/protocol/offset_fetch.ex b/lib/kafka_ex/protocol/offset_fetch.ex index 4578206d..8980fd54 100644 --- a/lib/kafka_ex/protocol/offset_fetch.ex +++ b/lib/kafka_ex/protocol/offset_fetch.ex @@ -51,9 +51,8 @@ defmodule KafkaEx.Protocol.OffsetFetch do def create_request(correlation_id, client_id, offset_fetch_request) do [ KafkaEx.Protocol.create_request(:offset_fetch, correlation_id, client_id), - <> ] end @@ -66,8 +65,8 @@ defmodule KafkaEx.Protocol.OffsetFetch do def parse_partitions( partitions_size, - <>, + <>, partitions, topic ) do diff --git a/lib/kafka_ex/protocol/produce.ex b/lib/kafka_ex/protocol/produce.ex index 02fd4fb1..45fe3703 100644 --- a/lib/kafka_ex/protocol/produce.ex +++ b/lib/kafka_ex/protocol/produce.ex @@ -84,8 +84,7 @@ defmodule KafkaEx.Protocol.Produce do [ KafkaEx.Protocol.create_request(:produce, correlation_id, client_id), <>, - <>, + <>, message_set ] end diff --git a/lib/kafka_ex/protocol/sync_group.ex b/lib/kafka_ex/protocol/sync_group.ex index 079dcd5e..5b323e65 100644 --- a/lib/kafka_ex/protocol/sync_group.ex +++ b/lib/kafka_ex/protocol/sync_group.ex @@ -41,9 +41,8 @@ defmodule KafkaEx.Protocol.SyncGroup do def create_request(correlation_id, client_id, %Request{} = request) do [ KafkaEx.Protocol.create_request(:sync_group, correlation_id, client_id), - <> ] end @@ -75,8 +74,8 @@ defmodule KafkaEx.Protocol.SyncGroup do 0::32-signed >> - <> + <> end defp topic_assignment_data([], acc), do: acc @@ -98,9 +97,7 @@ defmodule KafkaEx.Protocol.SyncGroup do defp parse_member_assignment(<<>>), do: [] - defp parse_member_assignment( - <<@member_assignment_version::16-signed, assignments_size::32-signed, rest::binary>> - ) do + defp parse_member_assignment(<<@member_assignment_version::16-signed, assignments_size::32-signed, rest::binary>>) do parse_assignments(assignments_size, rest, []) end @@ -108,8 +105,7 @@ defmodule KafkaEx.Protocol.SyncGroup do defp parse_assignments( size, - <>, + <>, assignments ) do {partitions, rest} = parse_partitions(partition_len, rest, []) diff --git a/lib/kafka_ex/server.ex b/lib/kafka_ex/server.ex index 8daf84ea..ce6cfded 100644 --- a/lib/kafka_ex/server.ex +++ b/lib/kafka_ex/server.ex @@ -879,9 +879,7 @@ defmodule KafkaEx.Server do module.parse_response(response) rescue _ -> - Logger.error( - "Failed to parse a response from the server: #{inspect(response)}" - ) + Logger.error("Failed to parse a response from the server: 
#{inspect(response)}") Kernel.reraise( "Parse error during #{inspect(module)}.parse_response. Couldn't parse: #{inspect(response)}", diff --git a/lib/kafka_ex/types.ex b/lib/kafka_ex/types.ex new file mode 100644 index 00000000..61df8fde --- /dev/null +++ b/lib/kafka_ex/types.ex @@ -0,0 +1,39 @@ +defmodule KafkaEx.Types do + @moduledoc """ + KafkaEx.Types + This module contains basic types shared between KafkaEx modules + """ + + @typedoc """ + Integer representing error code returned by kafka + 0 means no error, any other value matches some error + """ + @type error_code :: integer + + @typedoc """ + Topic name UTF-8 Encoded + """ + @type topic :: String.t() + + @typedoc """ + Integer representing a partition number in topic + """ + @type partition :: non_neg_integer + + @typedoc """ + Integer representing an offset of a given message + Unique per topic/partition pair + """ + @type offset :: non_neg_integer + + @typedoc """ + Integer representing a kafka timestamp value. + It's a number of milliseconds since Unix epoch. + """ + @type timestamp :: integer + + @typedoc """ + Consumer group name UTF-8 encoded + """ + @type consumer_group_name :: String.t() +end diff --git a/mix.exs b/mix.exs index 0f5342e7..e6e9b368 100644 --- a/mix.exs +++ b/mix.exs @@ -3,7 +3,7 @@ defmodule KafkaEx.Mixfile do use Mix.Project @source_url "https://github.com/kafkaex/kafka_ex" - @version "0.14.0-rc.1" + @version "0.15.0-dev" def project do [ diff --git a/test/integration/consumer_group_implementation_test.exs b/test/integration/consumer_group_implementation_test.exs index d3f5a39e..ba12cc26 100644 --- a/test/integration/consumer_group_implementation_test.exs +++ b/test/integration/consumer_group_implementation_test.exs @@ -184,9 +184,7 @@ defmodule KafkaEx.ConsumerGroupImplementationTest do { :ok, - consumer_group_pid1: consumer_group_pid1, - consumer_group_pid2: consumer_group_pid2, - ports_before: ports_before + consumer_group_pid1: consumer_group_pid1, consumer_group_pid2: consumer_group_pid2, ports_before: ports_before } end diff --git a/test/integration/new_client_test.exs b/test/integration/new_client_test.exs index f4556bc4..ab309a84 100644 --- a/test/integration/new_client_test.exs +++ b/test/integration/new_client_test.exs @@ -1,6 +1,7 @@ defmodule KafkaEx.New.Client.Test do use ExUnit.Case import KafkaEx.TestHelpers + import KafkaEx.IntegrationHelpers alias KafkaEx.New.Client @@ -17,7 +18,6 @@ defmodule KafkaEx.New.Client.Test do setup do {:ok, args} = KafkaEx.build_worker_options([]) - {:ok, pid} = Client.start_link(args, :no_name) {:ok, %{client: pid}} @@ -38,7 +38,7 @@ defmodule KafkaEx.New.Client.Test do } do join_to_group(client, topic, consumer_group) - {:ok, [group_metadata]} = GenServer.call(client, {:describe_groups, [consumer_group]}) + {:ok, [group_metadata]} = GenServer.call(client, {:describe_groups, [consumer_group], []}) assert group_metadata.group_id == consumer_group assert group_metadata.protocol_type == "consumer" @@ -47,13 +47,51 @@ defmodule KafkaEx.New.Client.Test do end test "returns dead when consumer group does not exist", %{client: client} do - {:ok, [group_metadata]} = GenServer.call(client, {:describe_groups, ["non-existing-group"]}) + {:ok, [group_metadata]} = GenServer.call(client, {:describe_groups, ["non-existing-group"], []}) assert group_metadata.group_id == "non-existing-group" assert group_metadata.state == "Dead" end end + describe "list_offsets/1" do + test "list offsets for empty topic", %{client: client} do + topic_name = generate_random_string() + _ = 
create_topic(client, topic_name) + partition = [%{partition_num: 0, timestamp: -1}] + + {:ok, [offset]} = GenServer.call(client, {:list_offsets, [{topic_name, partition}], []}) + + assert offset.topic == topic_name + assert offset.partition == 0 + assert offset.offset == 0 + end + + test "lists offsets for topic with existing messages", %{client: client} do + topic_name = generate_random_string() + _ = create_topic(client, topic_name) + _ = partition_produce(client, topic_name, "value", 0) + partition = [%{partition_num: 0, timestamp: -1}] + + {:ok, [offset]} = GenServer.call(client, {:list_offsets, [{topic_name, partition}], []}) + + assert offset.topic == topic_name + assert offset.partition == 0 + assert offset.offset == 1 + end + + test "returns error when opts are invalid", %{client: client} do + topic_name = generate_random_string() + _ = create_topic(client, topic_name) + _ = partition_produce(client, topic_name, "value", 0) + partition = [%{partition_num: 0, timestamp: -1}] + + {:error, error} = GenServer.call(client, {:list_offsets, [{topic_name, partition}], [api_version: 5]}) + + assert error == :api_version_no_supported + end + end + test "update metadata", %{client: client} do {:ok, updated_metadata} = GenServer.call(client, :update_metadata) %ClusterMetadata{topics: topics} = updated_metadata @@ -84,7 +122,6 @@ defmodule KafkaEx.New.Client.Test do ) %Kayrock.ListOffsets.V1.Response{responses: [main_resp]} = resp - [%{error_code: error_code, offset: offset}] = main_resp.partition_responses assert error_code == 0 diff --git a/test/kafka_ex/compression_test.exs b/test/kafka_ex/compression_test.exs index bd2708d1..a4aeabef 100644 --- a/test/kafka_ex/compression_test.exs +++ b/test/kafka_ex/compression_test.exs @@ -3,15 +3,13 @@ defmodule KafkaEx.CompressionTest do test "snappy decompression works with chunked messages" do data = - <<130, 83, 78, 65, 80, 80, 89, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 14, 12, 44, 0, 0, 0, 0, - 0, 0, 0, 37, 0, 0, 3, 246, 0, 0, 0, 75, 246, 7, 92, 10, 44, 16, 236, 0, 0, 255, 255, 255, - 255, 0, 0, 3, 232, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 254, 10, 0, 254, 10, 0, 254, - 10, 0, 254, 10, 0, 254, 10, 0, 254, 10, 0, 254, 10, 0, 254, 10, 0, 254, 10, 0, 254, 10, 0, - 254, 10, 0, 254, 10, 0, 254, 10, 0, 254, 10, 0, 254, 10, 0, 118, 10, 0>> + <<130, 83, 78, 65, 80, 80, 89, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 14, 12, 44, 0, 0, 0, 0, 0, 0, 0, 37, 0, 0, 3, + 246, 0, 0, 0, 75, 246, 7, 92, 10, 44, 16, 236, 0, 0, 255, 255, 255, 255, 0, 0, 3, 232, 65, 66, 67, 68, 69, 70, + 71, 72, 73, 74, 254, 10, 0, 254, 10, 0, 254, 10, 0, 254, 10, 0, 254, 10, 0, 254, 10, 0, 254, 10, 0, 254, 10, 0, + 254, 10, 0, 254, 10, 0, 254, 10, 0, 254, 10, 0, 254, 10, 0, 254, 10, 0, 254, 10, 0, 118, 10, 0>> expected = - <<0, 0, 0, 0, 0, 0, 0, 37, 0, 0, 3, 246, 10, 44, 16, 236, 0, 0, 255, 255, 255, 255, 0, 0, 3, - 232>> <> + <<0, 0, 0, 0, 0, 0, 0, 37, 0, 0, 3, 246, 10, 44, 16, 236, 0, 0, 255, 255, 255, 255, 0, 0, 3, 232>> <> String.duplicate("ABCDEFGHIJ", 100) ## enable :snappyer module, and test it diff --git a/test/kafka_ex/new/client/request_builder_test.exs b/test/kafka_ex/new/client/request_builder_test.exs index 0f06ca92..bf159080 100644 --- a/test/kafka_ex/new/client/request_builder_test.exs +++ b/test/kafka_ex/new/client/request_builder_test.exs @@ -5,16 +5,74 @@ defmodule KafkaEx.New.Client.RequestBuilderTest do describe "describe_groups_request/2" do test "returns request for DescribeGroups API" do - state = %KafkaEx.New.Client.State{api_versions: %{describe_groups: 1}} + state = 
%KafkaEx.New.Client.State{api_versions: %{15 => {0, 1}}} group_names = ["group1", "group2"] - expected_request = %Kayrock.DescribeGroups.V1.Request{ - group_ids: group_names + expected_request = %Kayrock.DescribeGroups.V1.Request{group_ids: group_names} + + {:ok, request} = RequestBuilder.describe_groups_request([group_names: group_names], state) + + assert expected_request == request + end + + test "returns request with custom API version" do + state = %KafkaEx.New.Client.State{api_versions: %{15 => {0, 1}}} + group_names = ["group1", "group2"] + + expected_request = %Kayrock.DescribeGroups.V0.Request{group_ids: group_names} + + {:ok, request} = RequestBuilder.describe_groups_request([group_names: group_names, api_version: 0], state) + + assert expected_request == request + end + + test "returns error when api version is not supported" do + state = %KafkaEx.New.Client.State{api_versions: %{15 => {0, 1}}} + group_names = ["group1", "group2"] + + {:error, error_value} = RequestBuilder.describe_groups_request([group_names: group_names, api_version: 3], state) + + assert error_value == :api_version_no_supported + end + end + + describe "lists_offset_request/2" do + test "returns request for ListOffsets API" do + state = %KafkaEx.New.Client.State{api_versions: %{2 => {0, 2}}} + topic_data = [{"test-topic", [%{partition_num: 1, timestamp: -1}]}] + + {:ok, request} = RequestBuilder.lists_offset_request([topics: topic_data], state) + + expected_request = %Kayrock.ListOffsets.V1.Request{ + replica_id: -1, + topics: [%{partitions: [%{timestamp: -1, partition: 1}], topic: "test-topic"}] } - request = RequestBuilder.describe_groups_request(group_names, state) + assert expected_request == request + end + + test "returns request with custom API version" do + state = %KafkaEx.New.Client.State{api_versions: %{2 => {0, 2}}} + topic_data = [{"test-topic", [%{partition_num: 1, timestamp: -1}]}] + + {:ok, request} = RequestBuilder.lists_offset_request([topics: topic_data, api_version: 2], state) + + expected_request = %Kayrock.ListOffsets.V2.Request{ + replica_id: -1, + isolation_level: 0, + topics: [%{partitions: [%{timestamp: -1, partition: 1}], topic: "test-topic"}] } assert expected_request == request end + + test "returns error when api version is not supported" do + state = %KafkaEx.New.Client.State{api_versions: %{2 => {0, 2}}} + topic_data = [{"test-topic", [%{partition_num: 1, timestamp: -1}]}] + + {:error, error_value} = RequestBuilder.lists_offset_request([topics: topic_data, api_version: 3], state) + + assert error_value == :api_version_no_supported + end end end diff --git a/test/kafka_ex/new/protocols/kayrock/describe_groups/request_test.exs b/test/kafka_ex/new/protocols/kayrock/describe_groups/request_test.exs index 18a900c4..7b8507c6 100644 --- a/test/kafka_ex/new/protocols/kayrock/describe_groups/request_test.exs +++ b/test/kafka_ex/new/protocols/kayrock/describe_groups/request_test.exs @@ -13,8 +13,7 @@ defmodule KafkaEx.New.Protocols.Kayrock.DescribeGroups.RequestTest do expected_request = %V0.Request{group_ids: groups} - assert expected_request == - DescribeGroupsRequest.build_request(%V0.Request{}, groups) + assert expected_request == DescribeGroupsRequest.build_request(%V0.Request{}, group_names: groups) end test "for api version 1 - builds describe group request" do @@ -22,8 +21,7 @@ expected_request = %V1.Request{group_ids: groups} - assert expected_request == - DescribeGroupsRequest.build_request(%V1.Request{}, groups) + assert expected_request == DescribeGroupsRequest.build_request(%V1.Request{}, group_names:
groups) end end end diff --git a/test/kafka_ex/new/protocols/kayrock/describe_groups_test.exs b/test/kafka_ex/new/protocols/kayrock/describe_groups_test.exs index eb07b7a5..ebcc0772 100644 --- a/test/kafka_ex/new/protocols/kayrock/describe_groups_test.exs +++ b/test/kafka_ex/new/protocols/kayrock/describe_groups_test.exs @@ -12,7 +12,7 @@ defmodule KafkaEx.New.Protocols.Kayrock.DescribeGroupsTest do assert KayrockDescribeGroups.Request.build_request( %V0.Request{}, - consumer_group_names + group_names: consumer_group_names ) == expected_request end end @@ -59,17 +59,16 @@ client_id: "client_id", client_host: "client_host", member_metadata: "member_metadata", - member_assignment: - %KafkaEx.New.Structs.ConsumerGroup.Member.MemberAssignment{ - version: 0, - user_data: "user_data", - partition_assignments: [ - %KafkaEx.New.Structs.ConsumerGroup.Member.MemberAssignment.PartitionAssignment{ - topic: "test-topic", - partitions: [1, 2, 3] - } - ] - } + member_assignment: %KafkaEx.New.Structs.ConsumerGroup.Member.MemberAssignment{ + version: 0, + user_data: "user_data", + partition_assignments: [ + %KafkaEx.New.Structs.ConsumerGroup.Member.MemberAssignment.PartitionAssignment{ + topic: "test-topic", + partitions: [1, 2, 3] + } + ] + } } ] } diff --git a/test/kafka_ex/new/protocols/kayrock/list_offsets/request_test.exs b/test/kafka_ex/new/protocols/kayrock/list_offsets/request_test.exs new file mode 100644 index 00000000..865831db --- /dev/null +++ b/test/kafka_ex/new/protocols/kayrock/list_offsets/request_test.exs @@ -0,0 +1,177 @@ +defmodule KafkaEx.New.Protocols.Kayrock.ListOffsets.RequestTest do + use ExUnit.Case, async: true + + alias KafkaEx.New.Protocols.Kayrock.ListOffsets.Request, as: ListOffsetsRequest + + alias Kayrock.ListOffsets.V0 + alias Kayrock.ListOffsets.V1 + alias Kayrock.ListOffsets.V2 + + @partitions_data [ + %{partition_num: 1, timestamp: -1}, + %{partition_num: 2, timestamp: -1}, + %{partition_num: 3, timestamp: -1} + ] + + describe "build_request/2" do + test "for v0 - it builds a list offsets request with defaults" do + topics = [{"test_topic", @partitions_data}] + + request = ListOffsetsRequest.build_request(%V0.Request{}, topics: topics) + + assert request |> attach_client_data() |> V0.Request.serialize() + + assert request == %V0.Request{ + client_id: nil, + correlation_id: nil, + replica_id: -1, + topics: [ + %{ + topic: "test_topic", + partitions: [ + %{partition: 1, timestamp: -1, max_num_offsets: 1}, + %{partition: 2, timestamp: -1, max_num_offsets: 1}, + %{partition: 3, timestamp: -1, max_num_offsets: 1} + ] + } + ] + } + end + + test "for v0 - it builds a list offsets request with custom options" do + partitions_data = Enum.map(@partitions_data, &Map.put(&1, :timestamp, 100)) + topics = [{"test_topic", partitions_data}] + + request = ListOffsetsRequest.build_request(%V0.Request{}, topics: topics, replica_id: 2) + + assert request |> attach_client_data() |> V0.Request.serialize() + + assert request == %V0.Request{ + client_id: nil, + correlation_id: nil, + replica_id: 2, + topics: [ + %{ + topic: "test_topic", + partitions: [ + %{partition: 1, timestamp: 100, max_num_offsets: 1}, + %{partition: 2, timestamp: 100, max_num_offsets: 1}, + %{partition: 3, timestamp: 100, max_num_offsets: 1} + ] + } + ] + } + end + + test "for v1 - it builds a list offsets request with defaults" do + topics = [{"test_topic", @partitions_data}] + + request = ListOffsetsRequest.build_request(%V1.Request{}, topics: topics) + + assert
diff --git a/test/kafka_ex/new/protocols/kayrock/list_offsets/response_test.exs b/test/kafka_ex/new/protocols/kayrock/list_offsets/response_test.exs
new file mode 100644
index 00000000..4d14cfc1
--- /dev/null
+++ b/test/kafka_ex/new/protocols/kayrock/list_offsets/response_test.exs
@@ -0,0 +1,126 @@
+defmodule KafkaEx.New.Protocols.Kayrock.ListOffsets.ResponseTest do
+  use ExUnit.Case, async: true
+
+  alias KafkaEx.New.Protocols.Kayrock.ListOffsets.Response, as: ListOffsetsResponse
+
+  alias Kayrock.ListOffsets.V0
+  alias Kayrock.ListOffsets.V1
+  alias Kayrock.ListOffsets.V2
+
+  describe "parse_response/1" do
+    @expected_offset %KafkaEx.New.Structs.Offset{
+      topic: "test-topic",
+      partition: 1,
+      offset: 0,
+      timestamp: nil
+    }
+
+    test "for api version 0 - returns response if all partitions succeeded" do
+      response = %V0.Response{
+        responses: [
+          %{
+            topic: "test-topic",
+            partition_responses: [%{partition: 1, error_code: 0, offsets: [0]}]
+          }
+        ],
+        correlation_id: 3
+      }
+
+      assert {:ok, [@expected_offset]} == ListOffsetsResponse.parse_response(response)
+    end
+
+    test "for api version 0 - returns error if any partition failed" do
+      response = %V0.Response{
+        responses: [
+          %{
+            topic: "test-topic",
+            partition_responses: [%{partition: 0, error_code: 11, offsets: []}]
+          }
+        ],
+        correlation_id: 3
+      }
+
+      expected_error = %KafkaEx.New.Structs.Error{
+        error: :stale_controller_epoch,
+        metadata: %{partition: 0, topic: "test-topic"}
+      }
+
+      assert {:error, expected_error} == ListOffsetsResponse.parse_response(response)
+    end
+
+    test "for api version 1 - returns response if all partitions succeeded" do
+      response = %V1.Response{
+        responses: [
+          %{
+            topic: "test-topic",
+            partition_responses: [
+              %{offset: 0, partition: 1, error_code: 0}
+            ]
+          }
+        ],
+        correlation_id: 3
+      }
+
+      assert {:ok, [@expected_offset]} == ListOffsetsResponse.parse_response(response)
+    end
+
+    test "for api version 1 - returns error if any partition failed" do
+      response = %V1.Response{
+        responses: [
+          %{
+            topic: "test-topic",
+            partition_responses: [
+              %{offset: 0, partition: 1, error_code: 1, timestamp: 1_000_000_000_000}
+            ]
+          }
+        ],
+        correlation_id: 3
+      }
+
+      expected_error = %KafkaEx.New.Structs.Error{
+        error: :offset_out_of_range,
+        metadata: %{partition: 1, topic: "test-topic"}
+      }
+
+      assert {:error, expected_error} == ListOffsetsResponse.parse_response(response)
+    end
+
+    test "for api version 2 - returns response if all partitions succeeded" do
+      response = %V2.Response{
+        responses: [
+          %{
+            topic: "test-topic",
+            partition_responses: [
+              %{offset: 0, partition: 1, error_code: 0, timestamp: 1_000_000_000_000}
+            ]
+          }
+        ],
+        correlation_id: 3
+      }
+
+      result_with_timestamp = Map.put(@expected_offset, :timestamp, 1_000_000_000_000)
+      assert {:ok, [result_with_timestamp]} == ListOffsetsResponse.parse_response(response)
+    end
+
+    test "for api version 2 - returns error if any partition failed" do
+      response = %V2.Response{
+        responses: [
+          %{
+            topic: "test-topic",
+            partition_responses: [
+              %{offset: 0, partition: 1, error_code: 1}
+            ]
+          }
+        ],
+        correlation_id: 3
+      }
+
+      expected_error = %KafkaEx.New.Structs.Error{
+        error: :offset_out_of_range,
+        metadata: %{partition: 1, topic: "test-topic"}
+      }
+
+      assert {:error, expected_error} == ListOffsetsResponse.parse_response(response)
+    end
+  end
+end
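The response tests establish the parser contract: `parse_response/1` returns `{:ok, [%KafkaEx.New.Structs.Offset{}, ...]}` only when every partition response carries `error_code: 0`, and otherwise `{:error, %KafkaEx.New.Structs.Error{}}` with the failing topic and partition recorded in `metadata`. A sketch of consuming that contract; the `OffsetHelper` module is hypothetical and not part of this patch:

    defmodule OffsetHelper do
      # Hypothetical consumer of the parser contract tested above.
      alias KafkaEx.New.Protocols.Kayrock.ListOffsets.Response, as: ListOffsetsResponse
      alias KafkaEx.New.Structs.{Error, Offset}

      # Returns {:ok, offset} for the first partition, or {:error, reason, meta}
      # identifying the topic/partition that failed.
      def latest_offset(kayrock_response) do
        case ListOffsetsResponse.parse_response(kayrock_response) do
          {:ok, [%Offset{offset: offset} | _]} -> {:ok, offset}
          {:error, %Error{error: reason, metadata: meta}} -> {:error, reason, meta}
        end
      end
    end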
"test-topic", + partition_responses: [%{partition: 0, error_code: 11, offsets: []}] + } + ], + correlation_id: 3 + } + + expected_error = %KafkaEx.New.Structs.Error{ + error: :stale_controller_epoch, + metadata: %{partition: 0, topic: "test-topic"} + } + + assert {:error, expected_error} == ListOffsetsResponse.parse_response(response) + end + + test "for api version 1 - returns response if all groups succeeded" do + response = %V1.Response{ + responses: [ + %{ + topic: "test-topic", + partition_responses: [ + %{offset: 0, partition: 1, error_code: 0} + ] + } + ], + correlation_id: 3 + } + + assert {:ok, [@expected_offset]} == ListOffsetsResponse.parse_response(response) + end + + test "for api version 1 - returns error if any group failed" do + response = %V1.Response{ + responses: [ + %{ + topic: "test-topic", + partition_responses: [ + %{offset: 0, partition: 1, error_code: 1, timestamps: 1_000_000_000_000} + ] + } + ], + correlation_id: 3 + } + + expected_error = %KafkaEx.New.Structs.Error{ + error: :offset_out_of_range, + metadata: %{partition: 1, topic: "test-topic"} + } + + assert {:error, expected_error} == ListOffsetsResponse.parse_response(response) + end + + test "for api version 2 - returns response if all groups succeeded" do + response = %V2.Response{ + responses: [ + %{ + topic: "test-topic", + partition_responses: [ + %{offset: 0, partition: 1, error_code: 0, timestamp: 1_000_000_000_000} + ] + } + ], + correlation_id: 3 + } + + result_with_timestamp = Map.put(@expected_offset, :timestamp, 1_000_000_000_000) + assert {:ok, [result_with_timestamp]} == ListOffsetsResponse.parse_response(response) + end + + test "for api version 2 - returns error if any group failed" do + response = %V2.Response{ + responses: [ + %{ + topic: "test-topic", + partition_responses: [ + %{offset: 0, partition: 1, error_code: 1} + ] + } + ], + correlation_id: 3 + } + + expected_error = %KafkaEx.New.Structs.Error{ + error: :offset_out_of_range, + metadata: %{partition: 1, topic: "test-topic"} + } + + assert {:error, expected_error} == ListOffsetsResponse.parse_response(response) + end + end +end diff --git a/test/kafka_ex/protocol/consumer_metadata_test.exs b/test/kafka_ex/protocol/consumer_metadata_test.exs index a9febd2d..768fe471 100644 --- a/test/kafka_ex/protocol/consumer_metadata_test.exs +++ b/test/kafka_ex/protocol/consumer_metadata_test.exs @@ -9,8 +9,7 @@ defmodule KafkaEx.Protocol.ConsumerMetadata.Test do test "parse_response correctly parses a valid response" do response = - <<0, 0, 156, 65, 0, 0, 0, 0, 192, 6, 0, 14, 49, 57, 50, 46, 49, 54, 56, 46, 53, 57, 46, 49, - 48, 51, 0, 0, 192, 6>> + <<0, 0, 156, 65, 0, 0, 0, 0, 192, 6, 0, 14, 49, 57, 50, 46, 49, 54, 56, 46, 53, 57, 46, 49, 48, 51, 0, 0, 192, 6>> assert KafkaEx.Protocol.ConsumerMetadata.parse_response(response) == %KafkaEx.Protocol.ConsumerMetadata.Response{ diff --git a/test/kafka_ex/protocol/delete_topics_test.exs b/test/kafka_ex/protocol/delete_topics_test.exs index 32924731..f67b6c7b 100644 --- a/test/kafka_ex/protocol/delete_topics_test.exs +++ b/test/kafka_ex/protocol/delete_topics_test.exs @@ -4,8 +4,8 @@ defmodule KafkaEx.Protocol.DeleteTopicsTest do describe "create_request/4" do test "creates a request to delete a single topic" do expected_request = - <<20::16, 0::16, 999::32, 13::16, "the-client-id"::binary, 1::32-signed, 6::16, - "topic1"::binary, 100::32-signed>> + <<20::16, 0::16, 999::32, 13::16, "the-client-id"::binary, 1::32-signed, 6::16, "topic1"::binary, + 100::32-signed>> delete_request = 
%KafkaEx.Protocol.DeleteTopics.Request{ topics: ["topic1"], @@ -25,8 +25,8 @@ defmodule KafkaEx.Protocol.DeleteTopicsTest do test "creates a request to delete a multiple topic" do expected_response = - <<20::16, 0::16, 999::32, 13::16, "the-client-id"::binary, 3::32-signed, 6::16, - "topic3"::binary, 6::16, "topic2"::binary, 6::16, "topic1"::binary, 100::32-signed>> + <<20::16, 0::16, 999::32, 13::16, "the-client-id"::binary, 3::32-signed, 6::16, "topic3"::binary, 6::16, + "topic2"::binary, 6::16, "topic1"::binary, 100::32-signed>> delete_request = %KafkaEx.Protocol.DeleteTopics.Request{ topics: ["topic1", "topic2", "topic3"], diff --git a/test/kafka_ex/protocol/fetch_test.exs b/test/kafka_ex/protocol/fetch_test.exs index c4140027..83572364 100644 --- a/test/kafka_ex/protocol/fetch_test.exs +++ b/test/kafka_ex/protocol/fetch_test.exs @@ -4,8 +4,8 @@ defmodule KafkaEx.Protocol.Fetch.Test do test "create_request creates a valid fetch request" do good_request = - <<1::16, 0::16, 1::32, 3::16, "foo"::binary, -1::32, 10::32, 1::32, 1::32, 3::16, - "bar"::binary, 1::32, 0::32, 1::64, 10000::32>> + <<1::16, 0::16, 1::32, 3::16, "foo"::binary, -1::32, 10::32, 1::32, 1::32, 3::16, "bar"::binary, 1::32, 0::32, + 1::64, 10000::32>> fetch_request = %KafkaEx.Protocol.Fetch.Request{ correlation_id: 1, @@ -29,8 +29,8 @@ defmodule KafkaEx.Protocol.Fetch.Test do partition = 0 response = - <<0::32, 1::32, 3::16, topic::binary, 1::32, 0::32, 0::16, 10::64, 32::32, 1::64, 20::32, - 0::32, 0::8, 0::8, 3::32, key::binary, 3::32, value::binary>> + <<0::32, 1::32, 3::16, topic::binary, 1::32, 0::32, 0::16, 10::64, 32::32, 1::64, 20::32, 0::32, 0::8, 0::8, + 3::32, key::binary, 3::32, value::binary>> expected_response = [ %KafkaEx.Protocol.Fetch.Response{ @@ -62,12 +62,11 @@ defmodule KafkaEx.Protocol.Fetch.Test do test "parse_response correctly parses a response with excess bytes" do response = - <<0, 0, 0, 1, 0, 0, 0, 1, 0, 4, 102, 111, 111, 100, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 0, 56, 0, 0, 0, 100, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 17, 254, 46, 107, 157, 0, - 0, 255, 255, 255, 255, 0, 0, 0, 3, 104, 101, 121, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 17, - 254, 46, 107, 157, 0, 0, 255, 255, 255, 255, 0, 0, 0, 3, 104, 101, 121, 0, 0, 0, 0, 0, 0, - 0, 2, 0, 0, 0, 17, 254, 46, 107, 157, 0, 0, 255, 255, 255, 255, 0, 0, 0, 3, 104, 101, 121, - 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 17, 254>> + <<0, 0, 0, 1, 0, 0, 0, 1, 0, 4, 102, 111, 111, 100, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 56, 0, 0, + 0, 100, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 17, 254, 46, 107, 157, 0, 0, 255, 255, 255, 255, 0, 0, 0, 3, 104, 101, + 121, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 17, 254, 46, 107, 157, 0, 0, 255, 255, 255, 255, 0, 0, 0, 3, 104, 101, + 121, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 17, 254, 46, 107, 157, 0, 0, 255, 255, 255, 255, 0, 0, 0, 3, 104, 101, + 121, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 17, 254>> expected_response = [ %KafkaEx.Protocol.Fetch.Response{ @@ -120,8 +119,8 @@ defmodule KafkaEx.Protocol.Fetch.Test do value = "bar" response = - <<0::32, 1::32, 3::16, topic::binary, 1::32, 0::32, 0::16, 10::64, 29::32, 1::64, 17::32, - 0::32, 0::8, 0::8, -1::32, 3::32, value::binary>> + <<0::32, 1::32, 3::16, topic::binary, 1::32, 0::32, 0::16, 10::64, 29::32, 1::64, 17::32, 0::32, 0::8, 0::8, + -1::32, 3::32, value::binary>> expected_response = [ %KafkaEx.Protocol.Fetch.Response{ @@ -156,8 +155,8 @@ defmodule KafkaEx.Protocol.Fetch.Test do key = "foo" response = - <<0::32, 1::32, 3::16, topic::binary, 1::32, 0::32, 0::16, 10::64, 29::32, 
1::64, 17::32, - 0::32, 0::8, 0::8, 3::32, key::binary, -1::32>> + <<0::32, 1::32, 3::16, topic::binary, 1::32, 0::32, 0::16, 10::64, 29::32, 1::64, 17::32, 0::32, 0::8, 0::8, + 3::32, key::binary, -1::32>> expected_response = [ %KafkaEx.Protocol.Fetch.Response{ @@ -189,9 +188,8 @@ defmodule KafkaEx.Protocol.Fetch.Test do test "parse_response correctly parses a valid response with multiple messages" do response = - <<0::32, 1::32, 3::16, "foo"::binary, 1::32, 0::32, 0::16, 10::64, 58::32, 1::64, 17::32, - 0::32, 0::8, 0::8, -1::32, 3::32, "bar"::binary, 2::64, 17::32, 0::32, 0::8, 0::8, -1::32, - 3::32, "baz"::binary>> + <<0::32, 1::32, 3::16, "foo"::binary, 1::32, 0::32, 0::16, 10::64, 58::32, 1::64, 17::32, 0::32, 0::8, 0::8, + -1::32, 3::32, "bar"::binary, 2::64, 17::32, 0::32, 0::8, 0::8, -1::32, 3::32, "baz"::binary>> expected_response = [ %KafkaEx.Protocol.Fetch.Response{ @@ -234,9 +232,9 @@ defmodule KafkaEx.Protocol.Fetch.Test do topic = "foo" response = - <<0::32, 1::32, 3::16, topic::binary, 2::32, 0::32, 0::16, 10::64, 29::32, 1::64, 17::32, - 0::32, 0::8, 0::8, -1::32, 3::32, "bar"::binary, 1::32, 0::16, 10::64, 29::32, 1::64, - 17::32, 0::32, 0::8, 0::8, -1::32, 3::32, "baz"::binary>> + <<0::32, 1::32, 3::16, topic::binary, 2::32, 0::32, 0::16, 10::64, 29::32, 1::64, 17::32, 0::32, 0::8, 0::8, + -1::32, 3::32, "bar"::binary, 1::32, 0::16, 10::64, 29::32, 1::64, 17::32, 0::32, 0::8, 0::8, -1::32, 3::32, + "baz"::binary>> expected_response = [ %KafkaEx.Protocol.Fetch.Response{ @@ -285,9 +283,9 @@ defmodule KafkaEx.Protocol.Fetch.Test do test "parse_response correctly parses a valid response with multiple topics" do response = - <<0::32, 2::32, 3::16, "bar"::binary, 1::32, 0::32, 0::16, 10::64, 29::32, 1::64, 17::32, - 0::32, 0::8, 0::8, -1::32, 3::32, "foo"::binary, 3::16, "baz"::binary, 1::32, 0::32, - 0::16, 10::64, 29::32, 1::64, 17::32, 0::32, 0::8, 0::8, -1::32, 3::32, "bar"::binary>> + <<0::32, 2::32, 3::16, "bar"::binary, 1::32, 0::32, 0::16, 10::64, 29::32, 1::64, 17::32, 0::32, 0::8, 0::8, + -1::32, 3::32, "foo"::binary, 3::16, "baz"::binary, 1::32, 0::32, 0::16, 10::64, 29::32, 1::64, 17::32, 0::32, + 0::8, 0::8, -1::32, 3::32, "bar"::binary>> expected_response = [ %KafkaEx.Protocol.Fetch.Response{ @@ -341,11 +339,10 @@ defmodule KafkaEx.Protocol.Fetch.Test do test "parse_response correctly parses a valid response with a gzip-encoded message" do response = - <<0, 0, 0, 4, 0, 0, 0, 1, 0, 9, 103, 122, 105, 112, 95, 116, 101, 115, 116, 0, 0, 0, 1, 0, - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 74, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 62, - 38, 244, 178, 37, 0, 1, 255, 255, 255, 255, 0, 0, 0, 48, 31, 139, 8, 0, 0, 0, 0, 0, 0, 0, - 99, 96, 128, 3, 169, 101, 15, 206, 246, 50, 48, 252, 7, 2, 32, 143, 167, 36, 181, 184, 68, - 33, 55, 181, 184, 56, 49, 61, 21, 0, 10, 31, 112, 82, 38, 0, 0, 0>> + <<0, 0, 0, 4, 0, 0, 0, 1, 0, 9, 103, 122, 105, 112, 95, 116, 101, 115, 116, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 1, 0, 0, 0, 74, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 62, 38, 244, 178, 37, 0, 1, 255, 255, 255, 255, 0, + 0, 0, 48, 31, 139, 8, 0, 0, 0, 0, 0, 0, 0, 99, 96, 128, 3, 169, 101, 15, 206, 246, 50, 48, 252, 7, 2, 32, 143, + 167, 36, 181, 184, 68, 33, 55, 181, 184, 56, 49, 61, 21, 0, 10, 31, 112, 82, 38, 0, 0, 0>> topic = "gzip_test" @@ -379,16 +376,14 @@ defmodule KafkaEx.Protocol.Fetch.Test do test "parse_response correctly parses a valid response with batched gzip-encoded messages" do response = - <<0, 0, 0, 3, 0, 0, 0, 1, 0, 15, 103, 122, 105, 112, 95, 98, 97, 
116, 99, 104, 95, 116, 101, - 115, 116, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4, 0, 0, 0, 180, 0, 0, 0, 0, - 0, 0, 0, 1, 0, 0, 0, 74, 112, 213, 163, 157, 0, 1, 255, 255, 255, 255, 0, 0, 0, 60, 31, - 139, 8, 0, 0, 0, 0, 0, 0, 0, 99, 96, 128, 3, 169, 119, 54, 19, 103, 51, 48, 252, 7, 2, 32, - 143, 39, 41, 177, 36, 57, 67, 161, 36, 181, 184, 68, 193, 16, 170, 130, 17, 164, 170, 220, - 244, 128, 34, 86, 85, 70, 0, 83, 29, 3, 53, 76, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, - 82, 59, 149, 134, 225, 0, 1, 255, 255, 255, 255, 0, 0, 0, 68, 31, 139, 8, 0, 0, 0, 0, 0, - 0, 0, 99, 96, 0, 3, 38, 32, 150, 59, 147, 154, 199, 4, 230, 177, 100, 167, 86, 26, 2, 105, - 158, 164, 196, 146, 228, 12, 133, 146, 212, 226, 18, 5, 67, 136, 66, 6, 102, 144, 74, 182, - 111, 41, 54, 112, 149, 70, 104, 42, 141, 0, 135, 95, 114, 164, 84, 0, 0, 0>> + <<0, 0, 0, 3, 0, 0, 0, 1, 0, 15, 103, 122, 105, 112, 95, 98, 97, 116, 99, 104, 95, 116, 101, 115, 116, 0, 0, 0, 1, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4, 0, 0, 0, 180, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 74, 112, 213, 163, 157, + 0, 1, 255, 255, 255, 255, 0, 0, 0, 60, 31, 139, 8, 0, 0, 0, 0, 0, 0, 0, 99, 96, 128, 3, 169, 119, 54, 19, 103, + 51, 48, 252, 7, 2, 32, 143, 39, 41, 177, 36, 57, 67, 161, 36, 181, 184, 68, 193, 16, 170, 130, 17, 164, 170, + 220, 244, 128, 34, 86, 85, 70, 0, 83, 29, 3, 53, 76, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 82, 59, 149, 134, + 225, 0, 1, 255, 255, 255, 255, 0, 0, 0, 68, 31, 139, 8, 0, 0, 0, 0, 0, 0, 0, 99, 96, 0, 3, 38, 32, 150, 59, 147, + 154, 199, 4, 230, 177, 100, 167, 86, 26, 2, 105, 158, 164, 196, 146, 228, 12, 133, 146, 212, 226, 18, 5, 67, + 136, 66, 6, 102, 144, 74, 182, 111, 41, 54, 112, 149, 70, 104, 42, 141, 0, 135, 95, 114, 164, 84, 0, 0, 0>> topic = "gzip_batch_test" partition_id = 0 @@ -453,12 +448,11 @@ defmodule KafkaEx.Protocol.Fetch.Test do test "parse_response correctly parses a valid response with a snappy-encoded message" do response = - <<0, 0, 0, 8, 0, 0, 0, 1, 0, 11, 115, 110, 97, 112, 112, 121, 95, 116, 101, 115, 116, 0, 0, - 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 83, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, - 0, 71, 183, 227, 95, 48, 0, 2, 255, 255, 255, 255, 0, 0, 0, 57, 130, 83, 78, 65, 80, 80, - 89, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 37, 38, 0, 0, 9, 1, 120, 1, 0, 0, 0, 26, 166, 224, - 205, 141, 0, 0, 255, 255, 255, 255, 0, 0, 0, 12, 116, 101, 115, 116, 32, 109, 101, 115, - 115, 97, 103, 101>> + <<0, 0, 0, 8, 0, 0, 0, 1, 0, 11, 115, 110, 97, 112, 112, 121, 95, 116, 101, 115, 116, 0, 0, 0, 1, 0, 0, 0, 1, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 83, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 71, 183, 227, 95, 48, 0, 2, 255, 255, + 255, 255, 0, 0, 0, 57, 130, 83, 78, 65, 80, 80, 89, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 37, 38, 0, 0, 9, 1, 120, + 1, 0, 0, 0, 26, 166, 224, 205, 141, 0, 0, 255, 255, 255, 255, 0, 0, 0, 12, 116, 101, 115, 116, 32, 109, 101, + 115, 115, 97, 103, 101>> value = "test message" @@ -496,13 +490,12 @@ defmodule KafkaEx.Protocol.Fetch.Test do partition_id = 0 response = - <<0, 0, 0, 14, 0, 0, 0, 1, 0, 17, 115, 110, 97, 112, 112, 121, 95, 98, 97, 116, 99, 104, 95, - 116, 101, 115, 116, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 105, 0, - 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 93, 70, 199, 142, 116, 0, 2, 255, 255, 255, 255, 0, 0, 0, - 79, 130, 83, 78, 65, 80, 80, 89, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 59, 84, 0, 0, 25, 1, - 16, 30, 204, 101, 110, 2, 5, 15, 76, 4, 107, 101, 121, 49, 0, 0, 0, 12, 98, 97, 116, 99, - 104, 32, 116, 101, 115, 116, 32, 1, 16, 1, 1, 32, 1, 0, 
0, 0, 30, 6, 246, 100, 60, 1, 13, - 5, 42, 0, 50, 58, 42, 0, 0, 50>> + <<0, 0, 0, 14, 0, 0, 0, 1, 0, 17, 115, 110, 97, 112, 112, 121, 95, 98, 97, 116, 99, 104, 95, 116, 101, 115, 116, + 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 105, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 93, 70, + 199, 142, 116, 0, 2, 255, 255, 255, 255, 0, 0, 0, 79, 130, 83, 78, 65, 80, 80, 89, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, + 0, 0, 59, 84, 0, 0, 25, 1, 16, 30, 204, 101, 110, 2, 5, 15, 76, 4, 107, 101, 121, 49, 0, 0, 0, 12, 98, 97, 116, + 99, 104, 32, 116, 101, 115, 116, 32, 1, 16, 1, 1, 32, 1, 0, 0, 0, 30, 6, 246, 100, 60, 1, 13, 5, 42, 0, 50, 58, + 42, 0, 0, 50>> topic = "snappy_batch_test" diff --git a/test/kafka_ex/protocol/metadata_test.exs b/test/kafka_ex/protocol/metadata_test.exs index ca2c77a8..070b23c6 100644 --- a/test/kafka_ex/protocol/metadata_test.exs +++ b/test/kafka_ex/protocol/metadata_test.exs @@ -16,8 +16,8 @@ defmodule KafkaEx.Protocol.Metadata.Test do test "create_request with a multiple topics creates a valid metadata request" do good_request = - <<3::16, 0::16, 1::32, 3::16, "foo"::binary, 3::32, 3::16, "bar"::binary, 3::16, - "baz"::binary, 4::16, "food"::binary>> + <<3::16, 0::16, 1::32, 3::16, "foo"::binary, 3::32, 3::16, "bar"::binary, 3::16, "baz"::binary, 4::16, + "food"::binary>> request = KafkaEx.Protocol.Metadata.create_request(1, "foo", ["bar", "baz", "food"]) @@ -26,8 +26,8 @@ defmodule KafkaEx.Protocol.Metadata.Test do test "parse_response correctly parses a valid response" do response = - <<0::32, 1::32, 0::32, 3::16, "foo"::binary, 9092::32, 1::32, 0::16, 3::16, "bar"::binary, - 1::32, 0::16, 0::32, 0::32, 0::32, 1::32, 0::32>> + <<0::32, 1::32, 0::32, 3::16, "foo"::binary, 9092::32, 1::32, 0::16, 3::16, "bar"::binary, 1::32, 0::16, 0::32, + 0::32, 0::32, 1::32, 0::32>> expected_response = %KafkaEx.Protocol.Metadata.Response{ brokers: [ diff --git a/test/kafka_ex/protocol/offset_commit_test.exs b/test/kafka_ex/protocol/offset_commit_test.exs index 2f55e142..4587d9b1 100644 --- a/test/kafka_ex/protocol/offset_commit_test.exs +++ b/test/kafka_ex/protocol/offset_commit_test.exs @@ -14,8 +14,8 @@ defmodule KafkaEx.Protocol.OffsetCommit.Test do } good_request = - <<8::16, 0::16, corr_id::32, byte_size(client_id)::16, client_id::binary, 3::16, "bar", - 1::32, 3::16, "foo", 1::32, 0::32, 10::64, 3::16, "baz">> + <<8::16, 0::16, corr_id::32, byte_size(client_id)::16, client_id::binary, 3::16, "bar", 1::32, 3::16, "foo", + 1::32, 0::32, 10::64, 3::16, "baz">> request = KafkaEx.Protocol.OffsetCommit.create_request( diff --git a/test/kafka_ex/protocol/offset_fetch_test.exs b/test/kafka_ex/protocol/offset_fetch_test.exs index c56cfde2..b470e565 100644 --- a/test/kafka_ex/protocol/offset_fetch_test.exs +++ b/test/kafka_ex/protocol/offset_fetch_test.exs @@ -12,8 +12,8 @@ defmodule KafkaEx.Protocol.OffsetFetch.Test do } good_request = - <<9::16, 0::16, 3::32, 8::16, "kafka_ex"::binary, 3::16, "bar"::binary, 1::32, 3::16, - "foo"::binary, 1::32, 0::32>> + <<9::16, 0::16, 3::32, 8::16, "kafka_ex"::binary, 3::16, "bar"::binary, 1::32, 3::16, "foo"::binary, 1::32, + 0::32>> request = KafkaEx.Protocol.OffsetFetch.create_request( @@ -27,8 +27,8 @@ defmodule KafkaEx.Protocol.OffsetFetch.Test do test "parse_response correctly parses a valid response" do response = - <<0, 0, 156, 66, 0, 0, 0, 1, 0, 4, 102, 111, 111, 100, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 9, 0, 0, 0, 0>> + <<0, 0, 156, 66, 0, 0, 0, 1, 0, 4, 102, 111, 111, 100, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 9, 0, 0, 0, 
+ 0>> assert KafkaEx.Protocol.OffsetFetch.parse_response(response) == [ %KafkaEx.Protocol.OffsetFetch.Response{ diff --git a/test/kafka_ex/protocol/offset_test.exs b/test/kafka_ex/protocol/offset_test.exs index 505a12f8..bd5d61d1 100644 --- a/test/kafka_ex/protocol/offset_test.exs +++ b/test/kafka_ex/protocol/offset_test.exs @@ -33,8 +33,7 @@ defmodule KafkaEx.Protocol.Offset.Test do test "parse_response correctly parses a valid response with multiple partitions" do response = - <<0::32, 1::32, 3::16, "bar"::binary, 2::32, 0::32, 0::16, 1::32, 10::64, 1::32, 0::16, - 1::32, 20::64>> + <<0::32, 1::32, 3::16, "bar"::binary, 2::32, 0::32, 0::16, 1::32, 10::64, 1::32, 0::16, 1::32, 20::64>> expected_response = [ %KafkaEx.Protocol.Offset.Response{ @@ -51,8 +50,8 @@ defmodule KafkaEx.Protocol.Offset.Test do test "parse_response correctly parses a valid response with multiple topics" do response = - <<0::32, 2::32, 3::16, "bar"::binary, 1::32, 0::32, 0::16, 1::32, 10::64, 3::16, - "baz"::binary, 1::32, 0::32, 0::16, 1::32, 20::64>> + <<0::32, 2::32, 3::16, "bar"::binary, 1::32, 0::32, 0::16, 1::32, 10::64, 3::16, "baz"::binary, 1::32, 0::32, + 0::16, 1::32, 20::64>> expected_response = [ %KafkaEx.Protocol.Offset.Response{ diff --git a/test/kafka_ex/protocol/produce_test.exs b/test/kafka_ex/protocol/produce_test.exs index d0feddd9..bb7cb553 100644 --- a/test/kafka_ex/protocol/produce_test.exs +++ b/test/kafka_ex/protocol/produce_test.exs @@ -3,9 +3,9 @@ defmodule KafkaEx.Protocol.Produce.Test do test "create_request creates a valid payload" do expected_request = - <<0, 0, 0, 0, 0, 0, 0, 1, 0, 3, 102, 111, 111, 0, 1, 0, 0, 0, 10, 0, 0, 0, 1, 0, 4, 102, - 111, 111, 100, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 29, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 17, - 106, 86, 37, 142, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3, 104, 101, 121>> + <<0, 0, 0, 0, 0, 0, 0, 1, 0, 3, 102, 111, 111, 0, 1, 0, 0, 0, 10, 0, 0, 0, 1, 0, 4, 102, 111, 111, 100, 0, 0, 0, + 1, 0, 0, 0, 0, 0, 0, 0, 29, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 17, 106, 86, 37, 142, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3, + 104, 101, 121>> request = KafkaEx.Protocol.Produce.create_request( @@ -28,11 +28,10 @@ defmodule KafkaEx.Protocol.Produce.Test do test "create_request correctly batches multiple request messages" do expected_request = - <<0, 0, 0, 0, 0, 0, 0, 1, 0, 3, 102, 111, 111, 0, 1, 0, 0, 0, 10, 0, 0, 0, 1, 0, 4, 102, - 111, 111, 100, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 88, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 17, - 106, 86, 37, 142, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3, 104, 101, 121, 0, 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 16, 225, 27, 42, 82, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 104, 105, 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 19, 119, 44, 195, 207, 0, 0, 0, 0, 0, 0, 0, 0, 0, 5, 104, 101, 108, 108, 111>> + <<0, 0, 0, 0, 0, 0, 0, 1, 0, 3, 102, 111, 111, 0, 1, 0, 0, 0, 10, 0, 0, 0, 1, 0, 4, 102, 111, 111, 100, 0, 0, 0, + 1, 0, 0, 0, 0, 0, 0, 0, 88, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 17, 106, 86, 37, 142, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3, + 104, 101, 121, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 16, 225, 27, 42, 82, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 104, 105, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 19, 119, 44, 195, 207, 0, 0, 0, 0, 0, 0, 0, 0, 0, 5, 104, 101, 108, 108, 111>> request = KafkaEx.Protocol.Produce.create_request( @@ -57,14 +56,12 @@ defmodule KafkaEx.Protocol.Produce.Test do test "create_request correctly encodes messages with gzip" do expected_request = - <<0, 0, 0, 0, 0, 0, 0, 1, 0, 23, 99, 111, 109, 112, 114, 101, 115, 115, 105, 111, 110, 95, - 99, 108, 105, 101, 110, 116, 95, 116, 101, 115, 116, 0, 1, 0, 0, 0, 10, 
0, 0, 0, 1, 0, 16, - 99, 111, 109, 112, 114, 101, 115, 115, 101, 100, 95, 116, 111, 112, 105, 99, 0, 0, 0, 1, - 0, 0, 0, 0, 0, 0, 0, 86, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 74, 79, - 44, 46, 209, 0, 1, 255, - 255, 255, 255, 0, 0, 0, 60, 31, 139, 8, 0, 0, 0, 0, 0, 0, 3, 99, 96, 128, 3, 153, 135, - 115, 4, 255, 131, 89, 172, 217, 169, 149, 10, 137, 64, 6, 103, 110, 106, 113, 113, 98, - 122, 42, 152, 3, 87, 199, 242, 37, 117, 30, 66, 93, 18, 178, 186, 36, 0, 127, 205, 212, - 97, 80, 0, 0, 0>> + <<0, 0, 0, 0, 0, 0, 0, 1, 0, 23, 99, 111, 109, 112, 114, 101, 115, 115, 105, 111, 110, 95, 99, 108, 105, 101, 110, + 116, 95, 116, 101, 115, 116, 0, 1, 0, 0, 0, 10, 0, 0, 0, 1, 0, 16, 99, 111, 109, 112, 114, 101, 115, 115, 101, + 100, 95, 116, 111, 112, 105, 99, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 86, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 74, 79, + 44, 46, 209, 0, 1, 255, 255, 255, 255, 0, 0, 0, 60, 31, 139, 8, 0, 0, 0, 0, 0, 0, 3, 99, 96, 128, 3, 153, 135, + 115, 4, 255, 131, 89, 172, 217, 169, 149, 10, 137, 64, 6, 103, 110, 106, 113, 113, 98, 122, 42, 152, 3, 87, 199, + 242, 37, 117, 30, 66, 93, 18, 178, 186, 36, 0, 127, 205, 212, 97, 80, 0, 0, 0>> client_id = "compression_client_test" topic = "compressed_topic" @@ -101,8 +98,7 @@ post_crc_header_size = 10 <> = request + post_crc_header::binary-size(post_crc_header_size), compressed_message_set::binary>> = request <> + <<0, 0, 0, 0, 0, 0, 0, 1, 0, 23, 99, 111, 109, 112, 114, 101, 115, 115, 105, 111, 110, 95, 99, 108, 105, 101, 110, + 116, 95, 116, 101, 115, 116, 0, 1, 0, 0, 0, 10, 0, 0, 0, 1, 0, 16, 99, 111, 109, 112, 114, 101, 115, 115, 101, + 100, 95, 116, 111, 112, 105, 99, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 86, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 74, 67, + 64, 33, 66, 0, 2, 255, 255, 255, 255, 0, 0, 0, 60, 80, 0, 0, 25, 1, 16, 28, 225, 156, 17, 255, 5, 15, 64, 5, + 107, 101, 121, 32, 97, 0, 0, 0, 9, 109, 101, 115, 115, 97, 103, 101, 5, 13, 17, 1, 16, 28, 4, 244, 101, 158, 5, + 13, 5, 40, 52, 98, 0, 0, 0, 9, 109, 101, 115, 115, 97, 103, 101, 32, 98>> produce = %KafkaEx.Protocol.Produce.Request{ topic: "compressed_topic", @@ -167,8 +161,8 @@ test "parse_response correctly parses a valid response with multiple topics and partitions" do response = - <<0::32, 2::32, 3::16, "bar"::binary, 2::32, 0::32, 0::16, 10::64, 1::32, 0::16, 20::64, - 3::16, "baz"::binary, 2::32, 0::32, 0::16, 30::64, 1::32, 0::16, 40::64>> + <<0::32, 2::32, 3::16, "bar"::binary, 2::32, 0::32, 0::16, 10::64, 1::32, 0::16, 20::64, 3::16, "baz"::binary, + 2::32, 0::32, 0::16, 30::64, 1::32, 0::16, 40::64>> expected_response = [ %KafkaEx.Protocol.Produce.Response{
diff --git a/test/support/integration_helpers.ex b/test/support/integration_helpers.ex
new file mode 100644
index 00000000..cf33c11f
--- /dev/null
+++ b/test/support/integration_helpers.ex
@@ -0,0 +1,29 @@
+defmodule KafkaEx.IntegrationHelpers do
+  @moduledoc false
+
+  @doc """
+  Creates a basic topic
+  """
+  def create_topic(client, topic_name, opts \\ []) do
+    KafkaEx.create_topics(
+      [
+        %{
+          topic: topic_name,
+          num_partitions: Keyword.get(opts, :partitions, 1),
+          replication_factor: 1,
+          replica_assignment: [],
+          config_entries: %{}
+        }
+      ],
+      timeout: 10_000,
+      worker_name: client
+    )
+  end
+
+  @doc """
+  Produces a message to the given topic and partition
+  """
+  def partition_produce(client, topic_name, message, partition) do
+    KafkaEx.produce(topic_name, partition, message, worker_name: client)
+  end
+end
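`KafkaEx.IntegrationHelpers` gives the integration suite a single place to create topics and seed messages. A sketch of the intended call pattern; the `:test_client` worker name and topic below are illustrative and assume the suite's usual setup has already started a client:

    defmodule KafkaEx.SomeIntegrationTest do
      use ExUnit.Case
      import KafkaEx.IntegrationHelpers

      test "seeds a topic before asserting on offsets" do
        # Assumes a KafkaEx worker registered as :test_client by the setup block.
        create_topic(:test_client, "list-offsets-smoke", partitions: 3)
        partition_produce(:test_client, "list-offsets-smoke", "hello", 0)
      end
    end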