From 57e6bce0154c56e32a10dd091cd7d4b32c206f07 Mon Sep 17 00:00:00 2001 From: Argonus Date: Sun, 8 Oct 2023 08:43:27 +0200 Subject: [PATCH] [Maintenance] Use native formatter settings With today's screen sizes, there is no need to enforce an 80-character line length. Let's use the native mix format settings. --- .formatter.exs | 3 +- lib/kafka_ex.ex | 3 +- lib/kafka_ex/consumer_group/manager.ex | 6 +- lib/kafka_ex/default_partitioner.ex | 7 +- lib/kafka_ex/gen_consumer.ex | 6 +- lib/kafka_ex/legacy_partitioner.ex | 7 +- lib/kafka_ex/network_client.ex | 3 +- lib/kafka_ex/new/adapter.ex | 27 ++--- lib/kafka_ex/new/client.ex | 21 ++-- lib/kafka_ex/new/client/state.ex | 6 +- lib/kafka_ex/new/client_compatibility.ex | 9 +- lib/kafka_ex/new/structs/cluster_metadata.ex | 6 +- lib/kafka_ex/new/structs/topic.ex | 3 +- lib/kafka_ex/protocol.ex | 4 +- lib/kafka_ex/protocol/api_versions.ex | 7 +- lib/kafka_ex/protocol/common.ex | 7 +- lib/kafka_ex/protocol/create_topics.ex | 6 +- lib/kafka_ex/protocol/delete_topics.ex | 3 +- lib/kafka_ex/protocol/fetch.ex | 18 +-- lib/kafka_ex/protocol/heartbeat.ex | 4 +- lib/kafka_ex/protocol/join_group.ex | 16 +-- lib/kafka_ex/protocol/metadata.ex | 18 ++- lib/kafka_ex/protocol/offset.ex | 20 ++-- lib/kafka_ex/protocol/offset_commit.ex | 17 +-- lib/kafka_ex/protocol/offset_fetch.ex | 13 +- lib/kafka_ex/protocol/produce.ex | 19 ++- lib/kafka_ex/protocol/sync_group.ex | 21 ++-- lib/kafka_ex/server.ex | 17 +-- lib/kafka_ex/server_0_p_10_and_later.ex | 9 +- lib/kafka_ex/server_0_p_8_p_0.ex | 10 +- lib/kafka_ex/server_0_p_8_p_2.ex | 15 +-- lib/kafka_ex/server_0_p_9_p_0.ex | 9 +- mix.exs | 3 +- .../consumer_group_implementation_test.exs | 18 +-- test/integration/consumer_group_test.exs | 18 +-- test/integration/integration_test.exs | 24 ++-- .../compatibility_0_p_10_and_later_test.exs | 3 +- .../kayrock/compatibility_0_p_8_p_0_test.exs | 3 +- .../kayrock/compatibility_0_p_9_p_0_test.exs | 3 +- ...ity_consumer_group_implementation_test.exs | 12 +- .../compatibility_consumer_group_test.exs | 6 +- .../kayrock/compatibility_test.exs | 21 ++-- test/integration/kayrock/offset_test.exs | 3 +- .../integration/kayrock/record_batch_test.exs | 3 +- test/integration/kayrock/timestamp_test.exs | 3 +- test/integration/new_client_test.exs | 12 +- test/integration/server0_p_9_p_0_test.exs | 12 +- test/kafka_ex/compression_test.exs | 14 +-- test/kafka_ex/default_partitioner_test.exs | 9 +- test/kafka_ex/legacy_partitioner_test.exs | 9 +- test/kafka_ex/new/structs/broker_test.exs | 6 +- .../new/structs/cluster_metadata_test.exs | 15 +-- .../protocol/consumer_metadata_test.exs | 4 +- test/kafka_ex/protocol/delete_topics_test.exs | 9 +- test/kafka_ex/protocol/fetch_test.exs | 112 ++++++++---------- test/kafka_ex/protocol/leave_group_test.exs | 3 +- test/kafka_ex/protocol/metadata_test.exs | 14 +-- test/kafka_ex/protocol/offset_commit_test.exs | 7 +- test/kafka_ex/protocol/offset_fetch_test.exs | 8 +- test/kafka_ex/protocol/offset_test.exs | 15 +-- test/kafka_ex/protocol/produce_test.exs | 73 +++++------- test/kafka_ex/protocol/sync_group_test.exs | 6 +- test/test_helper.exs | 6 +- 63 files changed, 297 insertions(+), 497 deletions(-) diff --git a/.formatter.exs b/.formatter.exs index b5de7f5a..525446d4 100644 --- a/.formatter.exs +++ b/.formatter.exs @@ -1,5 +1,4 @@ # Used by "mix format" [ - inputs: ["mix.exs", "{config,lib,test}/**/*.{ex,exs}"], - line_length: 80 + inputs: ["mix.exs", "{config,lib,test}/**/*.{ex,exs}"] ] diff --git a/lib/kafka_ex.ex b/lib/kafka_ex.ex index ff687779..ead61ca4 100644 --- 
a/lib/kafka_ex.ex +++ b/lib/kafka_ex.ex @@ -607,8 +607,7 @@ defmodule KafkaEx do KafkaEx.worker_setting() | {:server_impl, module} ]) :: GenServer.on_start() def start_link_worker(name, worker_init \\ []) do - {server_impl, worker_init} = - Keyword.pop(worker_init, :server_impl, Config.server_impl()) + {server_impl, worker_init} = Keyword.pop(worker_init, :server_impl, Config.server_impl()) {:ok, full_worker_init} = build_worker_options(worker_init) server_impl.start_link(full_worker_init, name) diff --git a/lib/kafka_ex/consumer_group/manager.ex b/lib/kafka_ex/consumer_group/manager.ex index 3cbf4873..06af7887 100644 --- a/lib/kafka_ex/consumer_group/manager.ex +++ b/lib/kafka_ex/consumer_group/manager.ex @@ -70,8 +70,7 @@ defmodule KafkaEx.ConsumerGroup.Manager do GenServer.start_link( __MODULE__, - {{gen_consumer_module, consumer_module}, group_name, topics, - consumer_opts}, + {{gen_consumer_module, consumer_module}, group_name, topics, consumer_opts}, gen_server_opts ) end @@ -396,8 +395,7 @@ defmodule KafkaEx.ConsumerGroup.Manager do member_id: member_id } - leave_group_response = - KafkaEx.leave_group(leave_request, worker_name: worker_name) + leave_group_response = KafkaEx.leave_group(leave_request, worker_name: worker_name) case leave_group_response do %{error_code: :no_error} -> diff --git a/lib/kafka_ex/default_partitioner.ex b/lib/kafka_ex/default_partitioner.ex index 3b61ecea..4b002b8c 100644 --- a/lib/kafka_ex/default_partitioner.ex +++ b/lib/kafka_ex/default_partitioner.ex @@ -33,9 +33,7 @@ defmodule KafkaEx.DefaultPartitioner do assign_partition_with_key(request, metadata, key) {:error, reason} -> - Logger.warn( - "#{__MODULE__}: couldn't assign partition due to #{inspect(reason)}" - ) + Logger.warn("#{__MODULE__}: couldn't assign partition due to #{inspect(reason)}") assign_partition_randomly(request, metadata) end @@ -70,8 +68,7 @@ defmodule KafkaEx.DefaultPartitioner do ) do hash = Murmur.umurmur2(key) - partitions_count = - metadata |> MetadataResponse.partitions_for_topic(topic) |> length() + partitions_count = metadata |> MetadataResponse.partitions_for_topic(topic) |> length() partition_id = rem(hash, partitions_count) %{request | partition: partition_id} diff --git a/lib/kafka_ex/gen_consumer.ex b/lib/kafka_ex/gen_consumer.ex index 7a67ff88..d17d31b9 100644 --- a/lib/kafka_ex/gen_consumer.ex +++ b/lib/kafka_ex/gen_consumer.ex @@ -461,8 +461,7 @@ defmodule KafkaEx.GenConsumer do options ) :: GenServer.on_start() def start_link(consumer_module, group_name, topic, partition, opts \\ []) do - {server_opts, consumer_opts} = - Keyword.split(opts, [:debug, :name, :timeout, :spawn_opt]) + {server_opts, consumer_opts} = Keyword.split(opts, [:debug, :name, :timeout, :spawn_opt]) GenServer.start_link( __MODULE__, @@ -563,8 +562,7 @@ defmodule KafkaEx.GenConsumer do given_fetch_options = Keyword.get(opts, :fetch_options, []) - fetch_options = - Keyword.merge(default_fetch_options, given_fetch_options) + fetch_options = Keyword.merge(default_fetch_options, given_fetch_options) state = %State{ consumer_module: consumer_module, diff --git a/lib/kafka_ex/legacy_partitioner.ex b/lib/kafka_ex/legacy_partitioner.ex index 65843e59..c3a3f5f6 100644 --- a/lib/kafka_ex/legacy_partitioner.ex +++ b/lib/kafka_ex/legacy_partitioner.ex @@ -34,9 +34,7 @@ defmodule KafkaEx.LegacyPartitioner do assign_partition_with_key(request, metadata, key) {:error, reason} -> - Logger.warn( - "#{__MODULE__}: couldn't assign partition due to #{inspect(reason)}" - ) + Logger.warn("#{__MODULE__}: couldn't assign 
partition due to #{inspect(reason)}") assign_partition_randomly(request, metadata) end @@ -71,8 +69,7 @@ defmodule KafkaEx.LegacyPartitioner do ) do hash = Murmur.umurmur2_legacy(key) - partitions_count = - metadata |> MetadataResponse.partitions_for_topic(topic) |> length() + partitions_count = metadata |> MetadataResponse.partitions_for_topic(topic) |> length() partition_id = rem(hash, partitions_count) %{request | partition: partition_id} diff --git a/lib/kafka_ex/network_client.ex b/lib/kafka_ex/network_client.ex index cd460e17..0c4d2d07 100644 --- a/lib/kafka_ex/network_client.ex +++ b/lib/kafka_ex/network_client.ex @@ -64,8 +64,7 @@ defmodule KafkaEx.NetworkClient do :ok -> case Socket.recv(socket, 0, timeout) do {:ok, data} -> - :ok = - Socket.setopts(socket, [:binary, {:packet, 4}, {:active, true}]) + :ok = Socket.setopts(socket, [:binary, {:packet, 4}, {:active, true}]) data diff --git a/lib/kafka_ex/new/adapter.ex b/lib/kafka_ex/new/adapter.ex index 2afa7dc7..391013f2 100644 --- a/lib/kafka_ex/new/adapter.ex +++ b/lib/kafka_ex/new/adapter.ex @@ -199,8 +199,7 @@ defmodule KafkaEx.New.Adapter do | _ ] }) do - {message_set, last_offset} = - kayrock_message_set_to_kafka_ex(record_set, topic, partition) + {message_set, last_offset} = kayrock_message_set_to_kafka_ex(record_set, topic, partition) {[ %FetchResponse{ @@ -267,8 +266,7 @@ defmodule KafkaEx.New.Adapter do group_id: request.group_name, generation_id: request.generation_id, member_id: request.member_id, - group_assignment: - Enum.map(request.assignments, &kafka_ex_group_assignment_to_kayrock/1) + group_assignment: Enum.map(request.assignments, &kafka_ex_group_assignment_to_kayrock/1) }, request.group_name} end @@ -280,8 +278,7 @@ defmodule KafkaEx.New.Adapter do }) do %SyncGroupResponse{ error_code: Kayrock.ErrorCode.code_to_atom(error_code), - assignments: - Enum.map(partition_assignments, fn p -> {p.topic, p.partitions} end) + assignments: Enum.map(partition_assignments, fn p -> {p.topic, p.partitions} end) } end @@ -313,8 +310,7 @@ defmodule KafkaEx.New.Adapter do def create_topics_request(requests, timeout) do %Kayrock.CreateTopics.V0.Request{ timeout: timeout, - create_topic_requests: - Enum.map(requests, &kafka_ex_to_kayrock_create_topics/1) + create_topic_requests: Enum.map(requests, &kafka_ex_to_kayrock_create_topics/1) } end @@ -373,11 +369,9 @@ defmodule KafkaEx.New.Adapter do end def offset_fetch_request(offset_fetch_request, client_consumer_group) do - consumer_group = - offset_fetch_request.consumer_group || client_consumer_group + consumer_group = offset_fetch_request.consumer_group || client_consumer_group - request = - Kayrock.OffsetFetch.get_request_struct(offset_fetch_request.api_version) + request = Kayrock.OffsetFetch.get_request_struct(offset_fetch_request.api_version) {%{ request @@ -422,11 +416,9 @@ defmodule KafkaEx.New.Adapter do end def offset_commit_request(offset_commit_request, client_consumer_group) do - consumer_group = - offset_commit_request.consumer_group || client_consumer_group + consumer_group = offset_commit_request.consumer_group || client_consumer_group - request = - Kayrock.OffsetCommit.get_request_struct(offset_commit_request.api_version) + request = Kayrock.OffsetCommit.get_request_struct(offset_commit_request.api_version) request = %{ request @@ -518,8 +510,7 @@ defmodule KafkaEx.New.Adapter do topic: request.topic, num_partitions: request.num_partitions, replication_factor: request.replication_factor, - replica_assignment: - Enum.map(request.replica_assignment, &Map.from_struct/1), 
+ replica_assignment: Enum.map(request.replica_assignment, &Map.from_struct/1), config_entries: Enum.map(request.config_entries, fn ce -> %{config_name: ce.config_name, config_value: ce.config_value} diff --git a/lib/kafka_ex/new/client.ex b/lib/kafka_ex/new/client.ex index 50d12885..85600f0d 100644 --- a/lib/kafka_ex/new/client.ex +++ b/lib/kafka_ex/new/client.ex @@ -126,8 +126,7 @@ defmodule KafkaEx.New.Client do Kernel.reraise(e, System.stacktrace()) end - {:ok, _} = - :timer.send_interval(state.metadata_update_interval, :update_metadata) + {:ok, _} = :timer.send_interval(state.metadata_update_interval, :update_metadata) {:ok, state} end @@ -159,15 +158,13 @@ defmodule KafkaEx.New.Client do end def handle_call({:topic_metadata, topics, allow_topic_creation}, _from, state) do - {topic_metadata, updated_state} = - fetch_topics_metadata(state, topics, allow_topic_creation) + {topic_metadata, updated_state} = fetch_topics_metadata(state, topics, allow_topic_creation) {:reply, {:ok, topic_metadata}, updated_state} end def handle_call({:kayrock_request, request, node_selector}, _from, state) do - {response, updated_state} = - kayrock_network_request(request, node_selector, state) + {response, updated_state} = kayrock_network_request(request, node_selector, state) {:reply, response, updated_state} end @@ -220,8 +217,7 @@ defmodule KafkaEx.New.Client do updated_state _ -> - new_cluster_metadata = - ClusterMetadata.from_metadata_v1_response(response) + new_cluster_metadata = ClusterMetadata.from_metadata_v1_response(response) {updated_cluster_metadata, brokers_to_close} = ClusterMetadata.merge_brokers( @@ -345,8 +341,7 @@ defmodule KafkaEx.New.Client do end _ -> - message = - "Unable to fetch metadata from any brokers. Timeout is #{sync_timeout}." + message = "Unable to fetch metadata from any brokers. Timeout is #{sync_timeout}." 
Logger.log(:error, message) raise message @@ -549,8 +544,7 @@ defmodule KafkaEx.New.Client do end defp run_client_request( - %{client_id: client_id, correlation_id: correlation_id} = - client_request, + %{client_id: client_id, correlation_id: correlation_id} = client_request, send_request, synchronous ) @@ -694,8 +688,7 @@ defmodule KafkaEx.New.Client do topic_metadata = State.topics_metadata(updated_state, topics) - {topic_metadata, - %{updated_state | allow_auto_topic_creation: allow_auto_topic_creation}} + {topic_metadata, %{updated_state | allow_auto_topic_creation: allow_auto_topic_creation}} end defp close_broker_by_socket(state, socket) do diff --git a/lib/kafka_ex/new/client/state.ex b/lib/kafka_ex/new/client/state.ex index 61116655..fe8a2c52 100644 --- a/lib/kafka_ex/new/client/state.ex +++ b/lib/kafka_ex/new/client/state.ex @@ -41,8 +41,7 @@ defmodule KafkaEx.New.Client.State do :consumer_group_update_interval, @default_consumer_group_update_interval ), - allow_auto_topic_creation: - Keyword.get(args, :allow_auto_topic_creation, true), + allow_auto_topic_creation: Keyword.get(args, :allow_auto_topic_creation, true), use_ssl: Keyword.get(args, :use_ssl, false), ssl_options: Keyword.get(args, :ssl_options, []), consumer_group_for_auto_commit: Keyword.get(args, :consumer_group) @@ -99,8 +98,7 @@ defmodule KafkaEx.New.Client.State do ) do %{ state - | cluster_metadata: - ClusterMetadata.remove_topics(cluster_metadata, topics) + | cluster_metadata: ClusterMetadata.remove_topics(cluster_metadata, topics) } end diff --git a/lib/kafka_ex/new/client_compatibility.ex b/lib/kafka_ex/new/client_compatibility.ex index 5574c449..45239b08 100644 --- a/lib/kafka_ex/new/client_compatibility.ex +++ b/lib/kafka_ex/new/client_compatibility.ex @@ -29,8 +29,7 @@ defmodule KafkaEx.New.ClientCompatibility do def handle_call({:metadata, topic}, _from, state) do updated_state = update_metadata(state, [topic]) - {:reply, Adapter.metadata_response(updated_state.cluster_metadata), - updated_state} + {:reply, Adapter.metadata_response(updated_state.cluster_metadata), updated_state} end def handle_call({:offset, topic, partition, time}, _from, state) do @@ -141,8 +140,7 @@ defmodule KafkaEx.New.ClientCompatibility do {response, updated_state} end - {:reply, response, - %{state_out | allow_auto_topic_creation: allow_auto_topic_creation}} + {:reply, response, %{state_out | allow_auto_topic_creation: allow_auto_topic_creation}} end def handle_call({:join_group, request, network_timeout}, _from, state) do @@ -339,8 +337,7 @@ defmodule KafkaEx.New.ClientCompatibility do state _ -> - {_, updated_state} = - fetch_topics_metadata(state, topics, allow_topic_creation) + {_, updated_state} = fetch_topics_metadata(state, topics, allow_topic_creation) updated_state end diff --git a/lib/kafka_ex/new/structs/cluster_metadata.ex b/lib/kafka_ex/new/structs/cluster_metadata.ex index d2b276d7..17d9da58 100644 --- a/lib/kafka_ex/new/structs/cluster_metadata.ex +++ b/lib/kafka_ex/new/structs/cluster_metadata.ex @@ -146,8 +146,7 @@ defmodule KafkaEx.New.Structs.ClusterMetadata do brokers = metadata.brokers |> Enum.into(%{}, fn broker_metadata -> - %{host: host, port: port, node_id: node_id, rack: rack} = - broker_metadata + %{host: host, port: port, node_id: node_id, rack: rack} = broker_metadata {node_id, %Broker{host: host, port: port, node_id: node_id, rack: rack}} end) @@ -240,8 +239,7 @@ defmodule KafkaEx.New.Structs.ClusterMetadata do KafkaExAPI.node_id() ) :: t def put_consumer_group_coordinator( - 
%__MODULE__{consumer_group_coordinators: consumer_group_coordinators} = - cluster_metadata, + %__MODULE__{consumer_group_coordinators: consumer_group_coordinators} = cluster_metadata, consumer_group, coordinator_node_id ) do diff --git a/lib/kafka_ex/new/structs/topic.ex b/lib/kafka_ex/new/structs/topic.ex index 4457e62e..459c9bdf 100644 --- a/lib/kafka_ex/new/structs/topic.ex +++ b/lib/kafka_ex/new/structs/topic.ex @@ -32,8 +32,7 @@ defmodule KafkaEx.New.Structs.Topic do end ) - partitions = - Enum.map(partition_metadata, &Partition.from_partition_metadata/1) + partitions = Enum.map(partition_metadata, &Partition.from_partition_metadata/1) %__MODULE__{ name: name, diff --git a/lib/kafka_ex/protocol.ex b/lib/kafka_ex/protocol.ex index a3a0ed44..064ce7cd 100644 --- a/lib/kafka_ex/protocol.ex +++ b/lib/kafka_ex/protocol.ex @@ -34,8 +34,8 @@ defmodule KafkaEx.Protocol do client_id, api_version \\ @default_api_version ) do - <> + <> end @error_map %{ diff --git a/lib/kafka_ex/protocol/api_versions.ex b/lib/kafka_ex/protocol/api_versions.ex index fe447a6a..9f7ac586 100644 --- a/lib/kafka_ex/protocol/api_versions.ex +++ b/lib/kafka_ex/protocol/api_versions.ex @@ -62,8 +62,8 @@ defmodule KafkaEx.Protocol.ApiVersions do def parse_response(binary, this_api_version \\ @default_this_api_version) def parse_response( - <<_correlation_id::32-signed, error_code::16-signed, - api_versions_count::32-signed, rest::binary>>, + <<_correlation_id::32-signed, error_code::16-signed, api_versions_count::32-signed, + rest::binary>>, this_api_version ) do %{ @@ -100,8 +100,7 @@ defmodule KafkaEx.Protocol.ApiVersions do end defp parse_one_api_version( - <> + <> ) do {%ApiVersion{ api_key: api_key, diff --git a/lib/kafka_ex/protocol/common.ex b/lib/kafka_ex/protocol/common.ex index 7f4d6e22..7725fc8b 100644 --- a/lib/kafka_ex/protocol/common.ex +++ b/lib/kafka_ex/protocol/common.ex @@ -17,14 +17,13 @@ defmodule KafkaEx.Protocol.Common do def parse_topics( topics_size, - <>, + <>, mod ) do struct_module = Module.concat(mod, Response) - {partitions, topics_data} = - mod.parse_partitions(partitions_size, rest, [], topic) + {partitions, topics_data} = mod.parse_partitions(partitions_size, rest, [], topic) [ %{ diff --git a/lib/kafka_ex/protocol/create_topics.ex b/lib/kafka_ex/protocol/create_topics.ex index ad00fbfc..bde40e7c 100644 --- a/lib/kafka_ex/protocol/create_topics.ex +++ b/lib/kafka_ex/protocol/create_topics.ex @@ -106,8 +106,7 @@ defmodule KafkaEx.Protocol.CreateTopics do @spec encode_topic_request(TopicRequest.t()) :: binary defp encode_topic_request(request) do encode_string(request.topic) <> - <> <> + <> <> encode_replica_assignments(request.replica_assignment) <> encode_config_entries(request.config_entries) end @@ -135,8 +134,7 @@ defmodule KafkaEx.Protocol.CreateTopics do @spec parse_response(binary, integer) :: [] | Response.t() def parse_response( - <<_correlation_id::32-signed, topic_errors_count::32-signed, - topic_errors::binary>>, + <<_correlation_id::32-signed, topic_errors_count::32-signed, topic_errors::binary>>, 0 ) do %Response{ diff --git a/lib/kafka_ex/protocol/delete_topics.ex b/lib/kafka_ex/protocol/delete_topics.ex index 1908d2d1..0e960999 100644 --- a/lib/kafka_ex/protocol/delete_topics.ex +++ b/lib/kafka_ex/protocol/delete_topics.ex @@ -67,8 +67,7 @@ defmodule KafkaEx.Protocol.DeleteTopics do @spec parse_response(binary, integer) :: [] | Response.t() def parse_response( - <<_correlation_id::32-signed, topic_errors_count::32-signed, - topic_errors::binary>>, + 
<<_correlation_id::32-signed, topic_errors_count::32-signed, topic_errors::binary>>, 0 ) do %Response{ diff --git a/lib/kafka_ex/protocol/fetch.ex b/lib/kafka_ex/protocol/fetch.ex index 6c27ae49..a896ee90 100644 --- a/lib/kafka_ex/protocol/fetch.ex +++ b/lib/kafka_ex/protocol/fetch.ex @@ -98,9 +98,7 @@ defmodule KafkaEx.Protocol.Fetch do ] end - def parse_response( - <<_correlation_id::32-signed, topics_size::32-signed, rest::binary>> - ) do + def parse_response(<<_correlation_id::32-signed, topics_size::32-signed, rest::binary>>) do parse_topics(topics_size, rest, __MODULE__) end @@ -108,14 +106,12 @@ defmodule KafkaEx.Protocol.Fetch do def parse_partitions( partitions_size, - <>, + <>, partitions, topic ) do - {:ok, message_set, last_offset} = - parse_message_set([], msg_set_data, topic, partition) + {:ok, message_set, last_offset} = parse_message_set([], msg_set_data, topic, partition) parse_partitions( partitions_size - 1, @@ -140,8 +136,7 @@ defmodule KafkaEx.Protocol.Fetch do defp parse_message_set( list, - <>, + <>, topic, partition ) do @@ -200,8 +195,7 @@ defmodule KafkaEx.Protocol.Fetch do <<-1::32-signed, value_size::32, value::size(value_size)-binary>> = rest decompressed = Compression.decompress(attributes, value) - {:ok, msg_set, _offset} = - parse_message_set([], decompressed, topic, partition) + {:ok, msg_set, _offset} = parse_message_set([], decompressed, topic, partition) {:ok, msg_set} end diff --git a/lib/kafka_ex/protocol/heartbeat.ex b/lib/kafka_ex/protocol/heartbeat.ex index cb1c941e..f6f3dd72 100644 --- a/lib/kafka_ex/protocol/heartbeat.ex +++ b/lib/kafka_ex/protocol/heartbeat.ex @@ -27,8 +27,8 @@ defmodule KafkaEx.Protocol.Heartbeat do [ KafkaEx.Protocol.create_request(:heartbeat, correlation_id, client_id), <> + request.generation_id::32-signed, byte_size(request.member_id)::16-signed, + request.member_id::binary>> ] end diff --git a/lib/kafka_ex/protocol/join_group.ex b/lib/kafka_ex/protocol/join_group.ex index c1894f19..b07a2711 100644 --- a/lib/kafka_ex/protocol/join_group.ex +++ b/lib/kafka_ex/protocol/join_group.ex @@ -47,8 +47,7 @@ defmodule KafkaEx.Protocol.JoinGroup do @spec create_request(integer, binary, Request.t()) :: iodata def create_request(correlation_id, client_id, %Request{} = join_group_req) do metadata = [ - <<@metadata_version::16-signed, - length(join_group_req.topics)::32-signed>>, + <<@metadata_version::16-signed, length(join_group_req.topics)::32-signed>>, topic_data(join_group_req.topics), <<0::32-signed>> ] @@ -75,12 +74,10 @@ defmodule KafkaEx.Protocol.JoinGroup do @spec parse_response(binary) :: Response.t() def parse_response( - <<_correlation_id::32-signed, error_code::16-signed, - generation_id::32-signed, protocol_len::16-signed, - _protocol::size(protocol_len)-binary, leader_len::16-signed, + <<_correlation_id::32-signed, error_code::16-signed, generation_id::32-signed, + protocol_len::16-signed, _protocol::size(protocol_len)-binary, leader_len::16-signed, leader::size(leader_len)-binary, member_id_len::16-signed, - member_id::size(member_id_len)-binary, members_size::32-signed, - rest::binary>> + member_id::size(member_id_len)-binary, members_size::32-signed, rest::binary>> ) do members = parse_members(members_size, rest, []) @@ -97,9 +94,8 @@ defmodule KafkaEx.Protocol.JoinGroup do defp parse_members( size, - <>, + <>, members ) do parse_members(size - 1, rest, [member | members]) diff --git a/lib/kafka_ex/protocol/metadata.ex b/lib/kafka_ex/protocol/metadata.ex index 3d731ece..b9dd22e5 100644 --- 
a/lib/kafka_ex/protocol/metadata.ex +++ b/lib/kafka_ex/protocol/metadata.ex @@ -230,8 +230,8 @@ defmodule KafkaEx.Protocol.Metadata do defp parse_brokers( 0, brokers_size, - <>, + <>, brokers ) do parse_brokers(0, brokers_size - 1, rest, [ @@ -283,12 +283,10 @@ defmodule KafkaEx.Protocol.Metadata do defp parse_topic_metadatas( topic_metadatas_size, - <> + <> ) do - {partition_metadatas, rest} = - parse_partition_metadatas(partition_metadatas_size, [], rest) + {partition_metadatas, rest} = parse_partition_metadatas(partition_metadatas_size, [], rest) [ %TopicMetadata{ @@ -314,8 +312,7 @@ defmodule KafkaEx.Protocol.Metadata do rest::binary >> ) do - {partition_metadatas, rest} = - parse_partition_metadatas(partition_metadatas_size, [], rest) + {partition_metadatas, rest} = parse_partition_metadatas(partition_metadatas_size, [], rest) [ %TopicMetadata{ @@ -334,8 +331,7 @@ defmodule KafkaEx.Protocol.Metadata do defp parse_partition_metadatas( partition_metadatas_size, partition_metadatas, - <> + <> ) do {replicas, rest} = parse_replicas(rest) {isrs, rest} = parse_isrs(rest) diff --git a/lib/kafka_ex/protocol/offset.ex b/lib/kafka_ex/protocol/offset.ex index 74b1f982..fff72ed2 100644 --- a/lib/kafka_ex/protocol/offset.ex +++ b/lib/kafka_ex/protocol/offset.ex @@ -37,15 +37,13 @@ defmodule KafkaEx.Protocol.Offset do def create_request(correlation_id, client_id, topic, partition, time) do [ KafkaEx.Protocol.create_request(:offset, correlation_id, client_id), - <<-1::32-signed, 1::32-signed, byte_size(topic)::16-signed, topic::binary, - 1::32-signed, partition::32-signed, parse_time(time)::64, 1::32>> + <<-1::32-signed, 1::32-signed, byte_size(topic)::16-signed, topic::binary, 1::32-signed, + partition::32-signed, parse_time(time)::64, 1::32>> ] end - def parse_response( - <<_correlation_id::32-signed, num_topics::32-signed, rest::binary>> - ), - do: parse_topics(num_topics, rest) + def parse_response(<<_correlation_id::32-signed, num_topics::32-signed, rest::binary>>), + do: parse_topics(num_topics, rest) @spec parse_time(:latest | :earliest | :calendar.datetime()) :: integer def parse_time(:latest), do: -1 @@ -55,8 +53,7 @@ defmodule KafkaEx.Protocol.Offset do def parse_time(time) do current_time_in_seconds = :calendar.datetime_to_gregorian_seconds(time) - unix_epoch_in_seconds = - :calendar.datetime_to_gregorian_seconds({{1970, 1, 1}, {0, 0, 0}}) + unix_epoch_in_seconds = :calendar.datetime_to_gregorian_seconds({{1970, 1, 1}, {0, 0, 0}}) (current_time_in_seconds - unix_epoch_in_seconds) * 1000 end @@ -65,8 +62,8 @@ defmodule KafkaEx.Protocol.Offset do defp parse_topics( topics_size, - <> + <> ) do {partitions, topics_data} = parse_partitions(partitions_size, rest) @@ -82,8 +79,7 @@ defmodule KafkaEx.Protocol.Offset do defp parse_partitions( partitions_size, - <>, + <>, partitions ) do {offsets, rest} = parse_offsets(offsets_size, rest) diff --git a/lib/kafka_ex/protocol/offset_commit.ex b/lib/kafka_ex/protocol/offset_commit.ex index a361fdcd..50e8a786 100644 --- a/lib/kafka_ex/protocol/offset_commit.ex +++ b/lib/kafka_ex/protocol/offset_commit.ex @@ -42,20 +42,15 @@ defmodule KafkaEx.Protocol.OffsetCommit do Protocol.create_request(:offset_commit, correlation_id, client_id), <> ] end @spec parse_response(binary) :: [] | [Response.t()] - def parse_response( - <<_correlation_id::32-signed, topics_count::32-signed, - topics_data::binary>> - ) do + def parse_response(<<_correlation_id::32-signed, topics_count::32-signed, topics_data::binary>>) do parse_topics(topics_count, topics_data) end @@ -63,8 
+58,8 @@ defmodule KafkaEx.Protocol.OffsetCommit do defp parse_topics( topic_count, - <> + <> ) do {partitions, topics_data} = parse_partitions(partitions_count, rest, []) diff --git a/lib/kafka_ex/protocol/offset_fetch.ex b/lib/kafka_ex/protocol/offset_fetch.ex index 31b7e451..4578206d 100644 --- a/lib/kafka_ex/protocol/offset_fetch.ex +++ b/lib/kafka_ex/protocol/offset_fetch.ex @@ -53,16 +53,12 @@ defmodule KafkaEx.Protocol.OffsetFetch do KafkaEx.Protocol.create_request(:offset_fetch, correlation_id, client_id), <> + byte_size(offset_fetch_request.topic)::16-signed, offset_fetch_request.topic::binary, + 1::32-signed, offset_fetch_request.partition::32>> ] end - def parse_response( - <<_correlation_id::32-signed, topics_size::32-signed, - topics_data::binary>> - ) do + def parse_response(<<_correlation_id::32-signed, topics_size::32-signed, topics_data::binary>>) do parse_topics(topics_size, topics_data, __MODULE__) end @@ -71,8 +67,7 @@ defmodule KafkaEx.Protocol.OffsetFetch do def parse_partitions( partitions_size, <>, + metadata::size(metadata_size)-binary, error_code::16-signed, rest::binary>>, partitions, topic ) do diff --git a/lib/kafka_ex/protocol/produce.ex b/lib/kafka_ex/protocol/produce.ex index 92beb534..02fd4fb1 100644 --- a/lib/kafka_ex/protocol/produce.ex +++ b/lib/kafka_ex/protocol/produce.ex @@ -84,16 +84,14 @@ defmodule KafkaEx.Protocol.Produce do [ KafkaEx.Protocol.create_request(:produce, correlation_id, client_id), <>, - <>, + <>, message_set ] end - def parse_response( - <<_correlation_id::32-signed, num_topics::32-signed, rest::binary>> - ), - do: parse_topics(num_topics, rest, __MODULE__) + def parse_response(<<_correlation_id::32-signed, num_topics::32-signed, rest::binary>>), + do: parse_topics(num_topics, rest, __MODULE__) def parse_response(unknown), do: unknown @@ -106,13 +104,11 @@ defmodule KafkaEx.Protocol.Produce do defp create_message_set(messages, compression_type) do {message_set, _} = create_message_set(messages, :none) - {compressed_message_set, attribute} = - Compression.compress(compression_type, message_set) + {compressed_message_set, attribute} = Compression.compress(compression_type, message_set) {message, msize} = create_message(compressed_message_set, nil, attribute) - {[<<0::64-signed>>, <>, message], - @int64_size + @int32_size + msize} + {[<<0::64-signed>>, <>, message], @int64_size + @int32_size + msize} end defp create_message_set_uncompressed([ @@ -145,8 +141,7 @@ defmodule KafkaEx.Protocol.Produce do def parse_partitions( partitions_size, - <>, + <>, partitions, topic ) do diff --git a/lib/kafka_ex/protocol/sync_group.ex b/lib/kafka_ex/protocol/sync_group.ex index ee70be2d..079dcd5e 100644 --- a/lib/kafka_ex/protocol/sync_group.ex +++ b/lib/kafka_ex/protocol/sync_group.ex @@ -42,17 +42,15 @@ defmodule KafkaEx.Protocol.SyncGroup do [ KafkaEx.Protocol.create_request(:sync_group, correlation_id, client_id), <> ] end @spec parse_response(binary) :: Response.t() def parse_response( - <<_correlation_id::32-signed, error_code::16-signed, - member_assignment_len::32-signed, + <<_correlation_id::32-signed, error_code::16-signed, member_assignment_len::32-signed, member_assignment::size(member_assignment_len)-binary>> ) do %Response{ @@ -78,8 +76,7 @@ defmodule KafkaEx.Protocol.SyncGroup do >> <> + byte_size(assignment_bytes_for_member)::32-signed, assignment_bytes_for_member::binary>> end defp topic_assignment_data([], acc), do: acc @@ -88,8 +85,7 @@ defmodule KafkaEx.Protocol.SyncGroup do do: topic_assignment_data(t, acc <> 
partition_assignment_data(h)) defp partition_assignment_data({topic_name, partition_ids}) do - <> end @@ -103,8 +99,7 @@ defmodule KafkaEx.Protocol.SyncGroup do defp parse_member_assignment(<<>>), do: [] defp parse_member_assignment( - <<@member_assignment_version::16-signed, assignments_size::32-signed, - rest::binary>> + <<@member_assignment_version::16-signed, assignments_size::32-signed, rest::binary>> ) do parse_assignments(assignments_size, rest, []) end @@ -113,8 +108,8 @@ defmodule KafkaEx.Protocol.SyncGroup do defp parse_assignments( size, - <>, + <>, assignments ) do {partitions, rest} = parse_partitions(partition_len, rest, []) diff --git a/lib/kafka_ex/server.ex b/lib/kafka_ex/server.ex index 304a2fbc..640ea478 100644 --- a/lib/kafka_ex/server.ex +++ b/lib/kafka_ex/server.ex @@ -394,8 +394,7 @@ defmodule KafkaEx.Server do ) do correlation_id = state.correlation_id + 1 - produce_request = - default_partitioner().assign_partition(produce_request, metadata) + produce_request = default_partitioner().assign_partition(produce_request, metadata) produce_request_data = try do @@ -446,8 +445,7 @@ defmodule KafkaEx.Server do state.api_versions ) - state = - update_metadata(%{state | correlation_id: retrieved_corr_id}) + state = update_metadata(%{state | correlation_id: retrieved_corr_id}) { MetadataResponse.broker_for_topic( @@ -607,9 +605,7 @@ defmodule KafkaEx.Server do metadata_brokers = metadata.brokers - |> Enum.map( - &%{&1 | is_controller: &1.node_id == metadata.controller_id} - ) + |> Enum.map(&%{&1 | is_controller: &1.node_id == metadata.controller_id}) brokers = state.brokers @@ -728,8 +724,7 @@ defmodule KafkaEx.Server do ) end else - message = - "Unable to fetch metadata from any brokers. Timeout is #{sync_timeout}." + message = "Unable to fetch metadata from any brokers. Timeout is #{sync_timeout}." Logger.log(:error, message) raise message @@ -993,9 +988,7 @@ defmodule KafkaEx.Server do Application.get_env(:kafka_ex, :partitioner, KafkaEx.DefaultPartitioner) end - defp increment_state_correlation_id( - %_{correlation_id: correlation_id} = state - ) do + defp increment_state_correlation_id(%_{correlation_id: correlation_id} = state) do %{state | correlation_id: correlation_id + 1} end end diff --git a/lib/kafka_ex/server_0_p_10_and_later.ex b/lib/kafka_ex/server_0_p_10_and_later.ex index bc1f25dc..62d7b931 100644 --- a/lib/kafka_ex/server_0_p_10_and_later.ex +++ b/lib/kafka_ex/server_0_p_10_and_later.ex @@ -141,8 +141,7 @@ defmodule KafkaEx.Server0P10AndLater do # Get the initial "real" broker list and start a regular refresh cycle. 
state = update_metadata(state) - {:ok, _} = - :timer.send_interval(state.metadata_update_interval, :update_metadata) + {:ok, _} = :timer.send_interval(state.metadata_update_interval, :update_metadata) state = if consumer_group?(state) do @@ -294,11 +293,9 @@ defmodule KafkaEx.Server0P10AndLater do nil -> {_, updated_state} = update_consumer_metadata(state) - default_broker = - if use_first_as_default, do: hd(state.brokers), else: nil + default_broker = if use_first_as_default, do: hd(state.brokers), else: nil - {broker_for_consumer_group(updated_state) || default_broker, - updated_state} + {broker_for_consumer_group(updated_state) || default_broker, updated_state} broker -> {broker, state} diff --git a/lib/kafka_ex/server_0_p_8_p_0.ex b/lib/kafka_ex/server_0_p_8_p_0.ex index 34e4655a..9a5e44c8 100644 --- a/lib/kafka_ex/server_0_p_8_p_0.ex +++ b/lib/kafka_ex/server_0_p_8_p_0.ex @@ -66,10 +66,7 @@ defmodule KafkaEx.Server0P8P0 do do: raise("Consumer Group is not supported in 0.8.0 version of Kafka") def kafka_server_consumer_group_metadata(_state), - do: - raise( - "Consumer Group Metadata is not supported in 0.8.0 version of Kafka" - ) + do: raise("Consumer Group Metadata is not supported in 0.8.0 version of Kafka") def kafka_server_join_group(_, _, _state), do: raise("Join Group is not supported in 0.8.0 version of Kafka") @@ -84,10 +81,7 @@ defmodule KafkaEx.Server0P8P0 do do: raise("Heartbeat is not supported in 0.8.0 version of Kafka") def kafka_server_update_consumer_metadata(_state), - do: - raise( - "Consumer Group Metadata is not supported in 0.8.0 version of Kafka" - ) + do: raise("Consumer Group Metadata is not supported in 0.8.0 version of Kafka") def kafka_server_api_versions(_state), do: raise("ApiVersions is not supported in 0.8.0 version of Kafka") diff --git a/lib/kafka_ex/server_0_p_8_p_2.ex b/lib/kafka_ex/server_0_p_8_p_2.ex index 7a1b808d..4bdf2105 100644 --- a/lib/kafka_ex/server_0_p_8_p_2.ex +++ b/lib/kafka_ex/server_0_p_8_p_2.ex @@ -98,8 +98,7 @@ defmodule KafkaEx.Server0P8P2 do # Get the initial "real" broker list and start a regular refresh cycle. 
state = update_metadata(state) - {:ok, _} = - :timer.send_interval(state.metadata_update_interval, :update_metadata) + {:ok, _} = :timer.send_interval(state.metadata_update_interval, :update_metadata) if consumer_group?(state) do # If we are using consumer groups then initialize the state and start the update cycle @@ -233,8 +232,7 @@ defmodule KafkaEx.Server0P8P2 do end def update_consumer_metadata( - %State{consumer_group: consumer_group, correlation_id: correlation_id} = - state, + %State{consumer_group: consumer_group, correlation_id: correlation_id} = state, retry, _error_code ) do @@ -307,8 +305,7 @@ defmodule KafkaEx.Server0P8P2 do # if the request has a specific consumer group, use that # otherwise use the worker's consumer group - consumer_group = - offset_commit_request.consumer_group || state.consumer_group + consumer_group = offset_commit_request.consumer_group || state.consumer_group offset_commit_request = %{ offset_commit_request @@ -354,11 +351,9 @@ defmodule KafkaEx.Server0P8P2 do nil -> {_, updated_state} = update_consumer_metadata(state) - default_broker = - if use_first_as_default, do: hd(state.brokers), else: nil + default_broker = if use_first_as_default, do: hd(state.brokers), else: nil - {broker_for_consumer_group(updated_state) || default_broker, - updated_state} + {broker_for_consumer_group(updated_state) || default_broker, updated_state} broker -> {broker, state} diff --git a/lib/kafka_ex/server_0_p_9_p_0.ex b/lib/kafka_ex/server_0_p_9_p_0.ex index 9b039a88..b778e39a 100644 --- a/lib/kafka_ex/server_0_p_9_p_0.ex +++ b/lib/kafka_ex/server_0_p_9_p_0.ex @@ -126,8 +126,7 @@ defmodule KafkaEx.Server0P9P0 do # Get the initial "real" broker list and start a regular refresh cycle. state = update_metadata(state) - {:ok, _} = - :timer.send_interval(state.metadata_update_interval, :update_metadata) + {:ok, _} = :timer.send_interval(state.metadata_update_interval, :update_metadata) state = if consumer_group?(state) do @@ -269,11 +268,9 @@ defmodule KafkaEx.Server0P9P0 do nil -> {_, updated_state} = update_consumer_metadata(state) - default_broker = - if use_first_as_default, do: hd(state.brokers), else: nil + default_broker = if use_first_as_default, do: hd(state.brokers), else: nil - {broker_for_consumer_group(updated_state) || default_broker, - updated_state} + {broker_for_consumer_group(updated_state) || default_broker, updated_state} broker -> {broker, state} diff --git a/mix.exs b/mix.exs index 54a6ef76..8479e6ab 100644 --- a/mix.exs +++ b/mix.exs @@ -55,8 +55,7 @@ defmodule KafkaEx.Mixfile do {:excoveralls, "~> 0.7", only: :test, runtime: false}, {:ex_doc, "~> 0.23", only: :dev, runtime: false}, {:hammox, "~> 0.5.0", only: :test}, - {:snappy, - git: "https://github.com/fdmanana/snappy-erlang-nif", only: [:dev, :test]}, + {:snappy, git: "https://github.com/fdmanana/snappy-erlang-nif", only: [:dev, :test]}, {:snappyer, "~> 1.2", only: [:dev, :test]} ] end diff --git a/test/integration/consumer_group_implementation_test.exs b/test/integration/consumer_group_implementation_test.exs index 09b049e1..a2970b25 100644 --- a/test/integration/consumer_group_implementation_test.exs +++ b/test/integration/consumer_group_implementation_test.exs @@ -235,8 +235,7 @@ defmodule KafkaEx.ConsumerGroupImplementationTest do Enum.into(assignments2, MapSet.new()) ) - consumer1_pid = - ConsumerGroup.consumer_supervisor_pid(context[:consumer_group_pid1]) + consumer1_pid = ConsumerGroup.consumer_supervisor_pid(context[:consumer_group_pid1]) consumer1_assignments = consumer1_pid @@ -246,8 +245,7 
@@ defmodule KafkaEx.ConsumerGroupImplementationTest do assert consumer1_assignments == Enum.sort(assignments1) - consumer2_pid = - ConsumerGroup.consumer_supervisor_pid(context[:consumer_group_pid2]) + consumer2_pid = ConsumerGroup.consumer_supervisor_pid(context[:consumer_group_pid2]) consumer2_assignments = consumer2_pid @@ -370,8 +368,7 @@ defmodule KafkaEx.ConsumerGroupImplementationTest do end test "handle_cast and handle_info calls", context do - consumer_group_pid = - ConsumerGroup.consumer_supervisor_pid(context[:consumer_group_pid1]) + consumer_group_pid = ConsumerGroup.consumer_supervisor_pid(context[:consumer_group_pid1]) consumer_pids = GenConsumer.Supervisor.child_pids(consumer_group_pid) @@ -398,8 +395,7 @@ defmodule KafkaEx.ConsumerGroupImplementationTest do end test "handle call stop returns from callbacks", context do - consumer_group_pid = - ConsumerGroup.consumer_supervisor_pid(context[:consumer_group_pid1]) + consumer_group_pid = ConsumerGroup.consumer_supervisor_pid(context[:consumer_group_pid1]) [c1, c2] = GenConsumer.Supervisor.child_pids(consumer_group_pid) assert :foo = GenConsumer.call(c1, {:stop, :foo}) @@ -416,8 +412,7 @@ defmodule KafkaEx.ConsumerGroupImplementationTest do end test "handle cast stop returns from callbacks", context do - consumer_group_pid = - ConsumerGroup.consumer_supervisor_pid(context[:consumer_group_pid1]) + consumer_group_pid = ConsumerGroup.consumer_supervisor_pid(context[:consumer_group_pid1]) [c1, _c2] = GenConsumer.Supervisor.child_pids(consumer_group_pid) GenConsumer.cast(c1, :stop) @@ -433,8 +428,7 @@ defmodule KafkaEx.ConsumerGroupImplementationTest do end test "handle info stop returns from callbacks", context do - consumer_group_pid = - ConsumerGroup.consumer_supervisor_pid(context[:consumer_group_pid1]) + consumer_group_pid = ConsumerGroup.consumer_supervisor_pid(context[:consumer_group_pid1]) [c1, _c2] = GenConsumer.Supervisor.child_pids(consumer_group_pid) send(c1, :stop) diff --git a/test/integration/consumer_group_test.exs b/test/integration/consumer_group_test.exs index cde96195..993f3edc 100644 --- a/test/integration/consumer_group_test.exs +++ b/test/integration/consumer_group_test.exs @@ -17,8 +17,7 @@ defmodule KafkaEx.ConsumerGroup.Test do end test "create_worker allows us to disable the consumer group" do - {:ok, pid} = - KafkaEx.create_worker(:barney, consumer_group: :no_consumer_group) + {:ok, pid} = KafkaEx.create_worker(:barney, consumer_group: :no_consumer_group) consumer_group = :sys.get_state(pid).consumer_group assert consumer_group == :no_consumer_group @@ -88,8 +87,7 @@ defmodule KafkaEx.ConsumerGroup.Test do end test "create_worker allows us to provide a consumer group" do - {:ok, pid} = - KafkaEx.create_worker(:bah, consumer_group: "my_consumer_group") + {:ok, pid} = KafkaEx.create_worker(:bah, consumer_group: "my_consumer_group") consumer_group = :sys.get_state(pid).consumer_group @@ -104,8 +102,7 @@ defmodule KafkaEx.ConsumerGroup.Test do consumer_group_update_interval: 10 ) - consumer_group_update_interval = - :sys.get_state(pid).consumer_group_update_interval + consumer_group_update_interval = :sys.get_state(pid).consumer_group_update_interval assert consumer_group_update_interval == 10 end @@ -113,8 +110,7 @@ defmodule KafkaEx.ConsumerGroup.Test do test "create_worker provides a default consumer_group_update_interval of '30000'" do {:ok, pid} = KafkaEx.create_worker(:de, uris: uris()) - consumer_group_update_interval = - :sys.get_state(pid).consumer_group_update_interval + 
consumer_group_update_interval = :sys.get_state(pid).consumer_group_update_interval assert consumer_group_update_interval == 30000 end @@ -127,8 +123,7 @@ defmodule KafkaEx.ConsumerGroup.Test do end test "create_worker takes a consumer_group option and sets that as the consumer_group of the worker" do - {:ok, pid} = - KafkaEx.create_worker(:joe, uris: uris(), consumer_group: "foo") + {:ok, pid} = KafkaEx.create_worker(:joe, uris: uris(), consumer_group: "foo") consumer_group = :sys.get_state(pid).consumer_group @@ -357,8 +352,7 @@ defmodule KafkaEx.ConsumerGroup.Test do }) |> hd - offset_fetch_response_offset = - offset_fetch_response.partitions |> hd |> Map.get(:offset) + offset_fetch_response_offset = offset_fetch_response.partitions |> hd |> Map.get(:offset) refute offset == offset_fetch_response_offset end diff --git a/test/integration/integration_test.exs b/test/integration/integration_test.exs index a7c9e364..086265cd 100644 --- a/test/integration/integration_test.exs +++ b/test/integration/integration_test.exs @@ -210,8 +210,7 @@ defmodule KafkaEx.Integration.Test do end ) - random_topic_metadata = - Enum.find(metadata.topic_metadatas, &(&1.topic == random_string)) + random_topic_metadata = Enum.find(metadata.topic_metadatas, &(&1.topic == random_string)) refute random_topic_metadata.partition_metadatas == [] @@ -223,8 +222,7 @@ defmodule KafkaEx.Integration.Test do pid = Process.whereis(Config.default_worker()) metadata = :sys.get_state(pid).metadata - random_topic_metadata = - Enum.find(metadata.topic_metadatas, &(&1.topic == random_string)) + random_topic_metadata = Enum.find(metadata.topic_metadatas, &(&1.topic == random_string)) refute random_topic_metadata.partition_metadatas == [] @@ -278,8 +276,7 @@ defmodule KafkaEx.Integration.Test do messages: [%Proto.Produce.Message{value: "hey foo"}] }) - fetch_response = - KafkaEx.fetch(topic_name, 0, offset: 0, auto_commit: false) |> hd + fetch_response = KafkaEx.fetch(topic_name, 0, offset: 0, auto_commit: false) |> hd message = fetch_response.partitions |> hd |> Map.get(:message_set) |> hd @@ -364,8 +361,7 @@ defmodule KafkaEx.Integration.Test do messages: [%Proto.Produce.Message{value: "foo"}] }) - [offset_response] = - TestHelper.wait_for_any(fn -> KafkaEx.latest_offset(random_string, 0) end) + [offset_response] = TestHelper.wait_for_any(fn -> KafkaEx.latest_offset(random_string, 0) end) offset = offset_response.partition_offsets |> hd |> Map.get(:offset) |> hd @@ -390,11 +386,9 @@ defmodule KafkaEx.Integration.Test do {:ok, offset} = KafkaEx.produce(produce_request) - fetch_response = - KafkaEx.fetch(random_string, 0, offset: 0, auto_commit: false) |> hd + fetch_response = KafkaEx.fetch(random_string, 0, offset: 0, auto_commit: false) |> hd - [got_message1, got_message2] = - fetch_response.partitions |> hd |> Map.get(:message_set) + [got_message1, got_message2] = fetch_response.partitions |> hd |> Map.get(:message_set) assert got_message1.key == message1.key assert got_message1.value == message1.value @@ -421,11 +415,9 @@ defmodule KafkaEx.Integration.Test do {:ok, offset} = KafkaEx.produce(produce_request) - fetch_response = - KafkaEx.fetch(random_string, 0, offset: 0, auto_commit: false) |> hd + fetch_response = KafkaEx.fetch(random_string, 0, offset: 0, auto_commit: false) |> hd - [got_message1, got_message2] = - fetch_response.partitions |> hd |> Map.get(:message_set) + [got_message1, got_message2] = fetch_response.partitions |> hd |> Map.get(:message_set) assert got_message1.key == message1.key assert got_message1.value == 
message1.value diff --git a/test/integration/kayrock/compatibility_0_p_10_and_later_test.exs b/test/integration/kayrock/compatibility_0_p_10_and_later_test.exs index c69bd39d..db0d3fe1 100644 --- a/test/integration/kayrock/compatibility_0_p_10_and_later_test.exs +++ b/test/integration/kayrock/compatibility_0_p_10_and_later_test.exs @@ -16,8 +16,7 @@ defmodule KafkaEx.KayrockCompatibility0p10AndLaterTest do alias KafkaEx.New.KafkaExAPI setup do - {:ok, pid} = - KafkaEx.start_link_worker(:no_name, server_impl: KafkaEx.New.Client) + {:ok, pid} = KafkaEx.start_link_worker(:no_name, server_impl: KafkaEx.New.Client) {:ok, %{client: pid}} end diff --git a/test/integration/kayrock/compatibility_0_p_8_p_0_test.exs b/test/integration/kayrock/compatibility_0_p_8_p_0_test.exs index 8525d670..f5ad92af 100644 --- a/test/integration/kayrock/compatibility_0_p_8_p_0_test.exs +++ b/test/integration/kayrock/compatibility_0_p_8_p_0_test.exs @@ -15,8 +15,7 @@ defmodule KafkaEx.KayrockCompatibility0p8p0Test do @topic "test0p8p0" setup do - {:ok, pid} = - KafkaEx.start_link_worker(:no_name, server_impl: KafkaEx.New.Client) + {:ok, pid} = KafkaEx.start_link_worker(:no_name, server_impl: KafkaEx.New.Client) {:ok, %{client: pid}} end diff --git a/test/integration/kayrock/compatibility_0_p_9_p_0_test.exs b/test/integration/kayrock/compatibility_0_p_9_p_0_test.exs index 45d73625..df472690 100644 --- a/test/integration/kayrock/compatibility_0_p_9_p_0_test.exs +++ b/test/integration/kayrock/compatibility_0_p_9_p_0_test.exs @@ -18,8 +18,7 @@ defmodule KafkaEx.KayrockCompatibility0p9p0Test do alias KafkaEx.Protocol.SyncGroup.Request, as: SyncGroupRequest setup do - {:ok, pid} = - KafkaEx.start_link_worker(:no_name, server_impl: KafkaEx.New.Client) + {:ok, pid} = KafkaEx.start_link_worker(:no_name, server_impl: KafkaEx.New.Client) {:ok, %{client: pid}} end diff --git a/test/integration/kayrock/compatibility_consumer_group_implementation_test.exs b/test/integration/kayrock/compatibility_consumer_group_implementation_test.exs index 92cbe256..66be66f9 100644 --- a/test/integration/kayrock/compatibility_consumer_group_implementation_test.exs +++ b/test/integration/kayrock/compatibility_consumer_group_implementation_test.exs @@ -163,16 +163,14 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupImplementationTest do setup do {:ok, _} = TestPartitioner.start_link() - {:ok, client_pid} = - KafkaEx.start_link_worker(:no_name, server_impl: KafkaEx.New.Client) + {:ok, client_pid} = KafkaEx.start_link_worker(:no_name, server_impl: KafkaEx.New.Client) # the client will die on its own, so don't count that ports_before = num_open_ports() topic_name = "#{@topic_name_prefix}#{:rand.uniform(2_000_000)}" - {:ok, topic_name} = - TestHelper.ensure_append_timestamp_topic(client_pid, topic_name) + {:ok, topic_name} = TestHelper.ensure_append_timestamp_topic(client_pid, topic_name) {:ok, consumer_group_pid1} = ConsumerGroup.start_link( @@ -261,8 +259,7 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupImplementationTest do Enum.into(assignments2, MapSet.new()) ) - consumer1_pid = - ConsumerGroup.consumer_supervisor_pid(context[:consumer_group_pid1]) + consumer1_pid = ConsumerGroup.consumer_supervisor_pid(context[:consumer_group_pid1]) consumer1_assignments = consumer1_pid @@ -272,8 +269,7 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupImplementationTest do assert consumer1_assignments == Enum.sort(assignments1) - consumer2_pid = - ConsumerGroup.consumer_supervisor_pid(context[:consumer_group_pid2]) + consumer2_pid = 
ConsumerGroup.consumer_supervisor_pid(context[:consumer_group_pid2]) consumer2_assignments = consumer2_pid diff --git a/test/integration/kayrock/compatibility_consumer_group_test.exs b/test/integration/kayrock/compatibility_consumer_group_test.exs index 81c46956..7d7fe030 100644 --- a/test/integration/kayrock/compatibility_consumer_group_test.exs +++ b/test/integration/kayrock/compatibility_consumer_group_test.exs @@ -17,8 +17,7 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupTest do alias KafkaEx.New.KafkaExAPI setup do - {:ok, pid} = - KafkaEx.start_link_worker(:no_name, server_impl: KafkaEx.New.Client) + {:ok, pid} = KafkaEx.start_link_worker(:no_name, server_impl: KafkaEx.New.Client) {:ok, %{client: pid}} end @@ -241,8 +240,7 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupTest do }) |> hd - offset_fetch_response_offset = - offset_fetch_response.partitions |> hd |> Map.get(:offset) + offset_fetch_response_offset = offset_fetch_response.partitions |> hd |> Map.get(:offset) refute offset == offset_fetch_response_offset end diff --git a/test/integration/kayrock/compatibility_test.exs b/test/integration/kayrock/compatibility_test.exs index f81f5a9d..adf050c7 100644 --- a/test/integration/kayrock/compatibility_test.exs +++ b/test/integration/kayrock/compatibility_test.exs @@ -20,8 +20,7 @@ defmodule KafkaEx.KayrockCompatibilityTest do alias KafkaEx.New.KafkaExAPI setup do - {:ok, pid} = - KafkaEx.start_link_worker(:no_name, server_impl: KafkaEx.New.Client) + {:ok, pid} = KafkaEx.start_link_worker(:no_name, server_impl: KafkaEx.New.Client) {:ok, %{client: pid}} end @@ -48,8 +47,7 @@ defmodule KafkaEx.KayrockCompatibilityTest do test "get metadata", %{client: client} do metadata = KafkaEx.metadata(worker_name: client, topic: "test0p8p0") - %MetadataResponse{topic_metadatas: topic_metadatas, brokers: brokers} = - metadata + %MetadataResponse{topic_metadatas: topic_metadatas, brokers: brokers} = metadata assert is_list(topic_metadatas) [topic_metadata | _] = topic_metadatas @@ -65,8 +63,7 @@ defmodule KafkaEx.KayrockCompatibilityTest do resp = KafkaEx.offset(topic, 0, :earliest, client) - [%OffsetResponse{topic: ^topic, partition_offsets: [partition_offsets]}] = - resp + [%OffsetResponse{topic: ^topic, partition_offsets: [partition_offsets]}] = resp %{error_code: :no_error, offset: [offset], partition: 0} = partition_offsets assert offset >= 0 @@ -344,8 +341,7 @@ defmodule KafkaEx.KayrockCompatibilityTest do ) |> hd - [got_message1, got_message2] = - fetch_response.partitions |> hd |> Map.get(:message_set) + [got_message1, got_message2] = fetch_response.partitions |> hd |> Map.get(:message_set) assert got_message1.key == message1.key assert got_message1.value == message1.value @@ -380,8 +376,7 @@ defmodule KafkaEx.KayrockCompatibilityTest do ) |> hd - [got_message1, got_message2] = - fetch_response.partitions |> hd |> Map.get(:message_set) + [got_message1, got_message2] = fetch_response.partitions |> hd |> Map.get(:message_set) assert got_message1.key == message1.key assert got_message1.value == message1.value @@ -408,8 +403,7 @@ defmodule KafkaEx.KayrockCompatibilityTest do {:ok, offset} = KafkaEx.produce(produce_request, worker_name: client) - fetch_response = - KafkaEx.fetch(topic, 0, offset: offset, worker_name: client) |> hd + fetch_response = KafkaEx.fetch(topic, 0, offset: offset, worker_name: client) |> hd [got_message] = fetch_response.partitions |> hd |> Map.get(:message_set) @@ -436,8 +430,7 @@ defmodule KafkaEx.KayrockCompatibilityTest do {:ok, offset} = 
     KafkaEx.produce(produce_request, worker_name: client)

-    fetch_response =
-      KafkaEx.fetch(topic, 0, offset: offset, worker_name: client) |> hd
+    fetch_response = KafkaEx.fetch(topic, 0, offset: offset, worker_name: client) |> hd

     [got_message] = fetch_response.partitions |> hd |> Map.get(:message_set)

diff --git a/test/integration/kayrock/offset_test.exs b/test/integration/kayrock/offset_test.exs
index bfebd40e..9df47510 100644
--- a/test/integration/kayrock/offset_test.exs
+++ b/test/integration/kayrock/offset_test.exs
@@ -11,8 +11,7 @@ defmodule KafkaEx.KayrockOffsetTest do
   @moduletag :new_client

   setup do
-    {:ok, pid} =
-      KafkaEx.start_link_worker(:no_name, server_impl: KafkaEx.New.Client)
+    {:ok, pid} = KafkaEx.start_link_worker(:no_name, server_impl: KafkaEx.New.Client)

     {:ok, %{client: pid}}
   end
diff --git a/test/integration/kayrock/record_batch_test.exs b/test/integration/kayrock/record_batch_test.exs
index bd7cc722..08a35eb7 100644
--- a/test/integration/kayrock/record_batch_test.exs
+++ b/test/integration/kayrock/record_batch_test.exs
@@ -8,8 +8,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do
   @moduletag :new_client

   setup do
-    {:ok, pid} =
-      KafkaEx.start_link_worker(:no_name, server_impl: KafkaEx.New.Client)
+    {:ok, pid} = KafkaEx.start_link_worker(:no_name, server_impl: KafkaEx.New.Client)

     {:ok, %{client: pid}}
   end
diff --git a/test/integration/kayrock/timestamp_test.exs b/test/integration/kayrock/timestamp_test.exs
index 0c66448e..a6a45ede 100644
--- a/test/integration/kayrock/timestamp_test.exs
+++ b/test/integration/kayrock/timestamp_test.exs
@@ -12,8 +12,7 @@ defmodule KafkaEx.KayrockTimestampTest do
   @moduletag :new_client

   setup do
-    {:ok, pid} =
-      KafkaEx.start_link_worker(:no_name, server_impl: KafkaEx.New.Client)
+    {:ok, pid} = KafkaEx.start_link_worker(:no_name, server_impl: KafkaEx.New.Client)

     {:ok, %{client: pid}}
   end
diff --git a/test/integration/new_client_test.exs b/test/integration/new_client_test.exs
index 33168413..821bea0b 100644
--- a/test/integration/new_client_test.exs
+++ b/test/integration/new_client_test.exs
@@ -28,8 +28,7 @@ defmodule KafkaEx.New.Client.Test do
     # we don't fetch any topics on startup
     assert topics == %{}

-    {:ok, [topic_metadata]} =
-      GenServer.call(client, {:topic_metadata, ["test0p8p0"], false})
+    {:ok, [topic_metadata]} = GenServer.call(client, {:topic_metadata, ["test0p8p0"], false})

     assert %Topic{name: "test0p8p0"} = topic_metadata
   end
@@ -54,8 +53,7 @@ defmodule KafkaEx.New.Client.Test do

     %Kayrock.ListOffsets.V1.Response{responses: [main_resp]} = resp

-    [%{error_code: error_code, offset: offset}] =
-      main_resp.partition_responses
+    [%{error_code: error_code, offset: offset}] = main_resp.partition_responses

     assert error_code == 0

@@ -95,8 +93,7 @@ defmodule KafkaEx.New.Client.Test do
     %Kayrock.Produce.V1.Response{responses: [topic_response]} = response
     assert topic_response.topic == topic

-    [%{partition: ^partition, error_code: error_code}] =
-      topic_response.partition_responses
+    [%{partition: ^partition, error_code: error_code}] = topic_response.partition_responses

     assert error_code == 0

@@ -151,8 +148,7 @@ defmodule KafkaEx.New.Client.Test do
     %Kayrock.Produce.V1.Response{responses: [topic_response]} = response
     assert topic_response.topic == topic

-    [%{partition: ^partition, error_code: error_code}] =
-      topic_response.partition_responses
+    [%{partition: ^partition, error_code: error_code}] = topic_response.partition_responses

     assert error_code == 0

diff --git a/test/integration/server0_p_9_p_0_test.exs b/test/integration/server0_p_9_p_0_test.exs
index b5da44bc..a50c4d7c 100644
--- a/test/integration/server0_p_9_p_0_test.exs
+++ b/test/integration/server0_p_9_p_0_test.exs
@@ -25,8 +25,7 @@ defmodule KafkaEx.Server0P9P0.Test do
       session_timeout: 6000
     }

-    answer =
-      KafkaEx.join_group(request, worker_name: :join_group, timeout: 10000)
+    answer = KafkaEx.join_group(request, worker_name: :join_group, timeout: 10000)

     assert answer.error_code == :no_error
     assert answer.generation_id == 1
@@ -52,8 +51,7 @@ defmodule KafkaEx.Server0P9P0.Test do
       session_timeout: 6000
     }

-    answer =
-      KafkaEx.join_group(request, worker_name: :sync_group, timeout: 10000)
+    answer = KafkaEx.join_group(request, worker_name: :sync_group, timeout: 10000)

     assert answer.error_code == :no_error

@@ -95,8 +93,7 @@ defmodule KafkaEx.Server0P9P0.Test do
       session_timeout: 6000
     }

-    answer =
-      KafkaEx.join_group(request, worker_name: :leave_group, timeout: 10000)
+    answer = KafkaEx.join_group(request, worker_name: :leave_group, timeout: 10000)

     assert answer.error_code == :no_error

@@ -128,8 +125,7 @@ defmodule KafkaEx.Server0P9P0.Test do
       session_timeout: 6000
     }

-    answer =
-      KafkaEx.join_group(request, worker_name: :heartbeat, timeout: 10000)
+    answer = KafkaEx.join_group(request, worker_name: :heartbeat, timeout: 10000)

     assert answer.error_code == :no_error

diff --git a/test/kafka_ex/compression_test.exs b/test/kafka_ex/compression_test.exs
index c0516b35..4dfb67ed 100644
--- a/test/kafka_ex/compression_test.exs
+++ b/test/kafka_ex/compression_test.exs
@@ -3,16 +3,14 @@ defmodule KafkaEx.CompressionTest do

   test "snappy decompression works with chunked messages" do
     data =
-      <<130, 83, 78, 65, 80, 80, 89, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 14, 12,
-        44, 0, 0, 0, 0, 0, 0, 0, 37, 0, 0, 3, 246, 0, 0, 0, 75, 246, 7, 92, 10,
-        44, 16, 236, 0, 0, 255, 255, 255, 255, 0, 0, 3, 232, 65, 66, 67, 68, 69,
-        70, 71, 72, 73, 74, 254, 10, 0, 254, 10, 0, 254, 10, 0, 254, 10, 0, 254,
-        10, 0, 254, 10, 0, 254, 10, 0, 254, 10, 0, 254, 10, 0, 254, 10, 0, 254,
-        10, 0, 254, 10, 0, 254, 10, 0, 254, 10, 0, 254, 10, 0, 118, 10, 0>>
+      <<130, 83, 78, 65, 80, 80, 89, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 14, 12, 44, 0, 0, 0, 0,
+        0, 0, 0, 37, 0, 0, 3, 246, 0, 0, 0, 75, 246, 7, 92, 10, 44, 16, 236, 0, 0, 255, 255, 255,
+        255, 0, 0, 3, 232, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 254, 10, 0, 254, 10, 0, 254,
+        10, 0, 254, 10, 0, 254, 10, 0, 254, 10, 0, 254, 10, 0, 254, 10, 0, 254, 10, 0, 254, 10, 0,
+        254, 10, 0, 254, 10, 0, 254, 10, 0, 254, 10, 0, 254, 10, 0, 118, 10, 0>>

     expected =
-      <<0, 0, 0, 0, 0, 0, 0, 37, 0, 0, 3, 246, 10, 44, 16, 236, 0, 0, 255, 255,
-        255, 255, 0, 0, 3,
+      <<0, 0, 0, 0, 0, 0, 0, 37, 0, 0, 3, 246, 10, 44, 16, 236, 0, 0, 255, 255, 255, 255, 0, 0, 3,
         232>> <> String.duplicate("ABCDEFGHIJ", 100)

diff --git a/test/kafka_ex/default_partitioner_test.exs b/test/kafka_ex/default_partitioner_test.exs
index fb6ec7a7..c6d6304a 100644
--- a/test/kafka_ex/default_partitioner_test.exs
+++ b/test/kafka_ex/default_partitioner_test.exs
@@ -48,8 +48,7 @@ defmodule KafkaEx.DefaultPartitionerTest do
       ]
     }

-    %{partition: partition} =
-      DefaultPartitioner.assign_partition(request, metadata(5))
+    %{partition: partition} = DefaultPartitioner.assign_partition(request, metadata(5))

     assert partition >= 0 and partition < 5
   end
@@ -74,11 +73,9 @@ defmodule KafkaEx.DefaultPartitionerTest do
       ]
     }

-    %{partition: 1} =
-      DefaultPartitioner.assign_partition(second_request, metadata(5))
+    %{partition: 1} = DefaultPartitioner.assign_partition(second_request, metadata(5))

-    %{partition: 5} =
-      DefaultPartitioner.assign_partition(second_request, metadata(6))
+    %{partition: 5} = DefaultPartitioner.assign_partition(second_request, metadata(6))
   end

   test "produce request with inconsistent keys" do
diff --git a/test/kafka_ex/legacy_partitioner_test.exs b/test/kafka_ex/legacy_partitioner_test.exs
index 55afb573..e95f743d 100644
--- a/test/kafka_ex/legacy_partitioner_test.exs
+++ b/test/kafka_ex/legacy_partitioner_test.exs
@@ -48,8 +48,7 @@ defmodule KafkaEx.LegacyPartitionerTest do
       ]
     }

-    %{partition: partition} =
-      LegacyPartitioner.assign_partition(request, metadata(5))
+    %{partition: partition} = LegacyPartitioner.assign_partition(request, metadata(5))

     assert partition >= 0 and partition < 5
   end
@@ -74,11 +73,9 @@ defmodule KafkaEx.LegacyPartitionerTest do
       ]
     }

-    %{partition: 1} =
-      LegacyPartitioner.assign_partition(second_request, metadata(5))
+    %{partition: 1} = LegacyPartitioner.assign_partition(second_request, metadata(5))

-    %{partition: 5} =
-      LegacyPartitioner.assign_partition(second_request, metadata(6))
+    %{partition: 5} = LegacyPartitioner.assign_partition(second_request, metadata(6))
   end

   test "produce request with inconsistent keys" do
diff --git a/test/kafka_ex/new/structs/broker_test.exs b/test/kafka_ex/new/structs/broker_test.exs
index ca45a9d9..0ecac58e 100644
--- a/test/kafka_ex/new/structs/broker_test.exs
+++ b/test/kafka_ex/new/structs/broker_test.exs
@@ -6,8 +6,7 @@ defmodule KafkaEx.New.Structs.BrokerTest do
   setup do
     pid = KafkaEx.TestSupport.Server.start(3040)

-    {:ok, socket} =
-      KafkaEx.Socket.create('localhost', 3040, [:binary, {:packet, 0}], false)
+    {:ok, socket} = KafkaEx.Socket.create('localhost', 3040, [:binary, {:packet, 0}], false)

     on_exit(fn ->
       KafkaEx.Socket.close(socket)
@@ -67,8 +66,7 @@ defmodule KafkaEx.New.Structs.BrokerTest do
     end

     test "returns false if broker has different socket", %{socket: socket_one} do
-      {:ok, socket_two} =
-        KafkaEx.Socket.create('localhost', 3040, [:binary, {:packet, 0}], false)
+      {:ok, socket_two} = KafkaEx.Socket.create('localhost', 3040, [:binary, {:packet, 0}], false)

       broker = %Broker{socket: nil} |> Broker.put_socket(socket_one)

diff --git a/test/kafka_ex/new/structs/cluster_metadata_test.exs b/test/kafka_ex/new/structs/cluster_metadata_test.exs
index 93017c7a..fd73e191 100644
--- a/test/kafka_ex/new/structs/cluster_metadata_test.exs
+++ b/test/kafka_ex/new/structs/cluster_metadata_test.exs
@@ -115,8 +115,7 @@ defmodule KafkaEx.New.Structs.ClusterMetadataTest do
       ]
     })

-    node_selector =
-      KafkaEx.New.Structs.NodeSelector.topic_partition("topic-one", 0)
+    node_selector = KafkaEx.New.Structs.NodeSelector.topic_partition("topic-one", 0)

     cluster = %ClusterMetadata{
       topics: %{
@@ -138,8 +137,7 @@ defmodule KafkaEx.New.Structs.ClusterMetadataTest do
       ]
     })

-    node_selector =
-      KafkaEx.New.Structs.NodeSelector.topic_partition("topic-two", 0)
+    node_selector = KafkaEx.New.Structs.NodeSelector.topic_partition("topic-two", 0)

     cluster = %ClusterMetadata{
       topics: %{
@@ -161,8 +159,7 @@ defmodule KafkaEx.New.Structs.ClusterMetadataTest do
       ]
     })

-    node_selector =
-      KafkaEx.New.Structs.NodeSelector.topic_partition("topic-one", 1)
+    node_selector = KafkaEx.New.Structs.NodeSelector.topic_partition("topic-one", 1)

     cluster = %ClusterMetadata{
       topics: %{
@@ -175,8 +172,7 @@ defmodule KafkaEx.New.Structs.ClusterMetadataTest do
   end

   test "returns node based on consumer group name" do
-    node_selector =
-      KafkaEx.New.Structs.NodeSelector.consumer_group("consumer-group-one")
+    node_selector = KafkaEx.New.Structs.NodeSelector.consumer_group("consumer-group-one")

     cluster = %ClusterMetadata{
       consumer_group_coordinators: %{
@@ -189,8 +185,7 @@ defmodule KafkaEx.New.Structs.ClusterMetadataTest do
   end

   test "returns error when consumer group does not exist" do
-    node_selector =
-      KafkaEx.New.Structs.NodeSelector.consumer_group("consumer-group-three")
+    node_selector = KafkaEx.New.Structs.NodeSelector.consumer_group("consumer-group-three")

     cluster = %ClusterMetadata{
       consumer_group_coordinators: %{
diff --git a/test/kafka_ex/protocol/consumer_metadata_test.exs b/test/kafka_ex/protocol/consumer_metadata_test.exs
index 299a7804..a9febd2d 100644
--- a/test/kafka_ex/protocol/consumer_metadata_test.exs
+++ b/test/kafka_ex/protocol/consumer_metadata_test.exs
@@ -9,8 +9,8 @@ defmodule KafkaEx.Protocol.ConsumerMetadata.Test do

   test "parse_response correctly parses a valid response" do
     response =
-      <<0, 0, 156, 65, 0, 0, 0, 0, 192, 6, 0, 14, 49, 57, 50, 46, 49, 54, 56,
-        46, 53, 57, 46, 49, 48, 51, 0, 0, 192, 6>>
+      <<0, 0, 156, 65, 0, 0, 0, 0, 192, 6, 0, 14, 49, 57, 50, 46, 49, 54, 56, 46, 53, 57, 46, 49,
+        48, 51, 0, 0, 192, 6>>

     assert KafkaEx.Protocol.ConsumerMetadata.parse_response(response) ==
              %KafkaEx.Protocol.ConsumerMetadata.Response{
diff --git a/test/kafka_ex/protocol/delete_topics_test.exs b/test/kafka_ex/protocol/delete_topics_test.exs
index aa60e418..32924731 100644
--- a/test/kafka_ex/protocol/delete_topics_test.exs
+++ b/test/kafka_ex/protocol/delete_topics_test.exs
@@ -4,8 +4,8 @@ defmodule KafkaEx.Protocol.DeleteTopicsTest do
   describe "create_request/4" do
     test "creates a request to delete a single topic" do
       expected_request =
-        <<20::16, 0::16, 999::32, 13::16, "the-client-id"::binary, 1::32-signed,
-          6::16, "topic1"::binary, 100::32-signed>>
+        <<20::16, 0::16, 999::32, 13::16, "the-client-id"::binary, 1::32-signed, 6::16,
+          "topic1"::binary, 100::32-signed>>

       delete_request = %KafkaEx.Protocol.DeleteTopics.Request{
         topics: ["topic1"],
@@ -25,9 +25,8 @@ defmodule KafkaEx.Protocol.DeleteTopicsTest do

     test "creates a request to delete a multiple topic" do
       expected_response =
-        <<20::16, 0::16, 999::32, 13::16, "the-client-id"::binary, 3::32-signed,
-          6::16, "topic3"::binary, 6::16, "topic2"::binary, 6::16,
-          "topic1"::binary, 100::32-signed>>
+        <<20::16, 0::16, 999::32, 13::16, "the-client-id"::binary, 3::32-signed, 6::16,
+          "topic3"::binary, 6::16, "topic2"::binary, 6::16, "topic1"::binary, 100::32-signed>>

       delete_request = %KafkaEx.Protocol.DeleteTopics.Request{
         topics: ["topic1", "topic2", "topic3"],
diff --git a/test/kafka_ex/protocol/fetch_test.exs b/test/kafka_ex/protocol/fetch_test.exs
index 1fc72a5e..c4140027 100644
--- a/test/kafka_ex/protocol/fetch_test.exs
+++ b/test/kafka_ex/protocol/fetch_test.exs
@@ -4,8 +4,8 @@ defmodule KafkaEx.Protocol.Fetch.Test do

   test "create_request creates a valid fetch request" do
     good_request =
-      <<1::16, 0::16, 1::32, 3::16, "foo"::binary, -1::32, 10::32, 1::32, 1::32,
-        3::16, "bar"::binary, 1::32, 0::32, 1::64, 10000::32>>
+      <<1::16, 0::16, 1::32, 3::16, "foo"::binary, -1::32, 10::32, 1::32, 1::32, 3::16,
+        "bar"::binary, 1::32, 0::32, 1::64, 10000::32>>

     fetch_request = %KafkaEx.Protocol.Fetch.Request{
       correlation_id: 1,
@@ -29,9 +29,8 @@ defmodule KafkaEx.Protocol.Fetch.Test do
     partition = 0

     response =
-      <<0::32, 1::32, 3::16, topic::binary, 1::32, 0::32, 0::16, 10::64, 32::32,
-        1::64, 20::32, 0::32, 0::8, 0::8, 3::32, key::binary, 3::32,
-        value::binary>>
+      <<0::32, 1::32, 3::16, topic::binary, 1::32, 0::32, 0::16, 10::64, 32::32, 1::64, 20::32,
+        0::32, 0::8, 0::8, 3::32, key::binary, 3::32, value::binary>>

     expected_response = [
       %KafkaEx.Protocol.Fetch.Response{
@@ -63,13 +62,12 @@ defmodule KafkaEx.Protocol.Fetch.Test do

   test "parse_response correctly parses a response with excess bytes" do
     response =
-      <<0, 0, 0, 1, 0, 0, 0, 1, 0, 4, 102, 111, 111, 100, 0, 0, 0, 1, 0, 0, 0,
-        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 56, 0, 0, 0, 100, 0, 0, 0, 0, 0, 0, 0, 0,
-        0, 0, 0, 17, 254, 46, 107, 157, 0, 0, 255, 255, 255, 255, 0, 0, 0, 3,
-        104, 101, 121, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 17, 254, 46, 107, 157,
-        0, 0, 255, 255, 255, 255, 0, 0, 0, 3, 104, 101, 121, 0, 0, 0, 0, 0, 0,
-        0, 2, 0, 0, 0, 17, 254, 46, 107, 157, 0, 0, 255, 255, 255, 255, 0, 0, 0,
-        3, 104, 101, 121, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 17, 254>>
+      <<0, 0, 0, 1, 0, 0, 0, 1, 0, 4, 102, 111, 111, 100, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+        0, 0, 0, 0, 56, 0, 0, 0, 100, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 17, 254, 46, 107, 157, 0,
+        0, 255, 255, 255, 255, 0, 0, 0, 3, 104, 101, 121, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 17,
+        254, 46, 107, 157, 0, 0, 255, 255, 255, 255, 0, 0, 0, 3, 104, 101, 121, 0, 0, 0, 0, 0, 0,
+        0, 2, 0, 0, 0, 17, 254, 46, 107, 157, 0, 0, 255, 255, 255, 255, 0, 0, 0, 3, 104, 101, 121,
+        0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 17, 254>>

     expected_response = [
       %KafkaEx.Protocol.Fetch.Response{
@@ -122,8 +120,8 @@ defmodule KafkaEx.Protocol.Fetch.Test do
     value = "bar"

     response =
-      <<0::32, 1::32, 3::16, topic::binary, 1::32, 0::32, 0::16, 10::64, 29::32,
-        1::64, 17::32, 0::32, 0::8, 0::8, -1::32, 3::32, value::binary>>
+      <<0::32, 1::32, 3::16, topic::binary, 1::32, 0::32, 0::16, 10::64, 29::32, 1::64, 17::32,
+        0::32, 0::8, 0::8, -1::32, 3::32, value::binary>>

     expected_response = [
       %KafkaEx.Protocol.Fetch.Response{
@@ -158,8 +156,8 @@ defmodule KafkaEx.Protocol.Fetch.Test do
     key = "foo"

     response =
-      <<0::32, 1::32, 3::16, topic::binary, 1::32, 0::32, 0::16, 10::64, 29::32,
-        1::64, 17::32, 0::32, 0::8, 0::8, 3::32, key::binary, -1::32>>
+      <<0::32, 1::32, 3::16, topic::binary, 1::32, 0::32, 0::16, 10::64, 29::32, 1::64, 17::32,
+        0::32, 0::8, 0::8, 3::32, key::binary, -1::32>>

     expected_response = [
       %KafkaEx.Protocol.Fetch.Response{
@@ -191,9 +189,9 @@ defmodule KafkaEx.Protocol.Fetch.Test do

   test "parse_response correctly parses a valid response with multiple messages" do
     response =
-      <<0::32, 1::32, 3::16, "foo"::binary, 1::32, 0::32, 0::16, 10::64, 58::32,
-        1::64, 17::32, 0::32, 0::8, 0::8, -1::32, 3::32, "bar"::binary, 2::64,
-        17::32, 0::32, 0::8, 0::8, -1::32, 3::32, "baz"::binary>>
+      <<0::32, 1::32, 3::16, "foo"::binary, 1::32, 0::32, 0::16, 10::64, 58::32, 1::64, 17::32,
+        0::32, 0::8, 0::8, -1::32, 3::32, "bar"::binary, 2::64, 17::32, 0::32, 0::8, 0::8, -1::32,
+        3::32, "baz"::binary>>

     expected_response = [
       %KafkaEx.Protocol.Fetch.Response{
@@ -236,10 +234,9 @@ defmodule KafkaEx.Protocol.Fetch.Test do
     topic = "foo"

     response =
-      <<0::32, 1::32, 3::16, topic::binary, 2::32, 0::32, 0::16, 10::64, 29::32,
-        1::64, 17::32, 0::32, 0::8, 0::8, -1::32, 3::32, "bar"::binary, 1::32,
-        0::16, 10::64, 29::32, 1::64, 17::32, 0::32, 0::8, 0::8, -1::32, 3::32,
-        "baz"::binary>>
+      <<0::32, 1::32, 3::16, topic::binary, 2::32, 0::32, 0::16, 10::64, 29::32, 1::64, 17::32,
+        0::32, 0::8, 0::8, -1::32, 3::32, "bar"::binary, 1::32, 0::16, 10::64, 29::32, 1::64,
+        17::32, 0::32, 0::8, 0::8, -1::32, 3::32, "baz"::binary>>

     expected_response = [
       %KafkaEx.Protocol.Fetch.Response{
@@ -288,10 +285,9 @@ defmodule KafkaEx.Protocol.Fetch.Test do

   test "parse_response correctly parses a valid response with multiple topics" do
     response =
-      <<0::32, 2::32, 3::16, "bar"::binary, 1::32, 0::32, 0::16, 10::64, 29::32,
-        1::64, 17::32, 0::32, 0::8, 0::8, -1::32, 3::32, "foo"::binary, 3::16,
-        "baz"::binary, 1::32, 0::32, 0::16, 10::64, 29::32, 1::64, 17::32,
-        0::32, 0::8, 0::8, -1::32, 3::32, "bar"::binary>>
+      <<0::32, 2::32, 3::16, "bar"::binary, 1::32, 0::32, 0::16, 10::64, 29::32, 1::64, 17::32,
+        0::32, 0::8, 0::8, -1::32, 3::32, "foo"::binary, 3::16, "baz"::binary, 1::32, 0::32,
+        0::16, 10::64, 29::32, 1::64, 17::32, 0::32, 0::8, 0::8, -1::32, 3::32, "bar"::binary>>

     expected_response = [
       %KafkaEx.Protocol.Fetch.Response{
@@ -345,12 +341,11 @@ defmodule KafkaEx.Protocol.Fetch.Test do

   test "parse_response correctly parses a valid response with a gzip-encoded message" do
     response =
-      <<0, 0, 0, 4, 0, 0, 0, 1, 0, 9, 103, 122, 105, 112, 95, 116, 101, 115,
-        116, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 74,
-        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 62, 38, 244, 178, 37, 0, 1, 255, 255,
-        255, 255, 0, 0, 0, 48, 31, 139, 8, 0, 0, 0, 0, 0, 0, 0, 99, 96, 128, 3,
-        169, 101, 15, 206, 246, 50, 48, 252, 7, 2, 32, 143, 167, 36, 181, 184,
-        68, 33, 55, 181, 184, 56, 49, 61, 21, 0, 10, 31, 112, 82, 38, 0, 0, 0>>
+      <<0, 0, 0, 4, 0, 0, 0, 1, 0, 9, 103, 122, 105, 112, 95, 116, 101, 115, 116, 0, 0, 0, 1, 0,
+        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 74, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 62,
+        38, 244, 178, 37, 0, 1, 255, 255, 255, 255, 0, 0, 0, 48, 31, 139, 8, 0, 0, 0, 0, 0, 0, 0,
+        99, 96, 128, 3, 169, 101, 15, 206, 246, 50, 48, 252, 7, 2, 32, 143, 167, 36, 181, 184, 68,
+        33, 55, 181, 184, 56, 49, 61, 21, 0, 10, 31, 112, 82, 38, 0, 0, 0>>

     topic = "gzip_test"

@@ -384,19 +379,16 @@ defmodule KafkaEx.Protocol.Fetch.Test do

   test "parse_response correctly parses a valid response with batched gzip-encoded messages" do
     response =
-      <<0, 0, 0, 3, 0, 0, 0, 1, 0, 15, 103, 122, 105, 112, 95, 98, 97, 116, 99,
-        104, 95, 116, 101, 115, 116, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
-        0, 0, 0, 4, 0, 0, 0, 180, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 74, 112, 213,
-        163, 157, 0, 1, 255, 255, 255, 255, 0, 0, 0, 60, 31, 139, 8, 0, 0, 0, 0,
-        0, 0, 0, 99, 96, 128, 3, 169, 119, 54, 19, 103, 51, 48, 252, 7, 2, 32,
-        143, 39, 41, 177, 36, 57, 67, 161, 36, 181, 184, 68, 193, 16, 170, 130,
-        17, 164, 170, 220, 244, 128, 34, 86, 85, 70, 0, 83, 29, 3, 53, 76, 0, 0,
-        0, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 82, 59, 149, 134, 225, 0, 1, 255,
-        255, 255, 255, 0, 0, 0, 68, 31, 139, 8, 0, 0, 0, 0, 0, 0, 0, 99, 96, 0,
-        3, 38, 32, 150, 59, 147, 154, 199, 4, 230, 177, 100, 167, 86, 26, 2,
-        105, 158, 164, 196, 146, 228, 12, 133, 146, 212, 226, 18, 5, 67, 136,
-        66, 6, 102, 144, 74, 182, 111, 41, 54, 112, 149, 70, 104, 42, 141, 0,
-        135, 95, 114, 164, 84, 0, 0, 0>>
+      <<0, 0, 0, 3, 0, 0, 0, 1, 0, 15, 103, 122, 105, 112, 95, 98, 97, 116, 99, 104, 95, 116, 101,
+        115, 116, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4, 0, 0, 0, 180, 0, 0, 0, 0,
+        0, 0, 0, 1, 0, 0, 0, 74, 112, 213, 163, 157, 0, 1, 255, 255, 255, 255, 0, 0, 0, 60, 31,
+        139, 8, 0, 0, 0, 0, 0, 0, 0, 99, 96, 128, 3, 169, 119, 54, 19, 103, 51, 48, 252, 7, 2, 32,
+        143, 39, 41, 177, 36, 57, 67, 161, 36, 181, 184, 68, 193, 16, 170, 130, 17, 164, 170, 220,
+        244, 128, 34, 86, 85, 70, 0, 83, 29, 3, 53, 76, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0,
+        82, 59, 149, 134, 225, 0, 1, 255, 255, 255, 255, 0, 0, 0, 68, 31, 139, 8, 0, 0, 0, 0, 0,
+        0, 0, 99, 96, 0, 3, 38, 32, 150, 59, 147, 154, 199, 4, 230, 177, 100, 167, 86, 26, 2, 105,
+        158, 164, 196, 146, 228, 12, 133, 146, 212, 226, 18, 5, 67, 136, 66, 6, 102, 144, 74, 182,
+        111, 41, 54, 112, 149, 70, 104, 42, 141, 0, 135, 95, 114, 164, 84, 0, 0, 0>>

     topic = "gzip_batch_test"
     partition_id = 0
@@ -461,13 +453,12 @@ defmodule KafkaEx.Protocol.Fetch.Test do

   test "parse_response correctly parses a valid response with a snappy-encoded message" do
     response =
-      <<0, 0, 0, 8, 0, 0, 0, 1, 0, 11, 115, 110, 97, 112, 112, 121, 95, 116,
-        101, 115, 116, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0,
-        0, 0, 83, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 71, 183, 227, 95, 48, 0, 2,
-        255, 255, 255, 255, 0, 0, 0, 57, 130, 83, 78, 65, 80, 80, 89, 0, 0, 0,
-        0, 1, 0, 0, 0, 1, 0, 0, 0, 37, 38, 0, 0, 9, 1, 120, 1, 0, 0, 0, 26, 166,
-        224, 205, 141, 0, 0, 255, 255, 255, 255, 0, 0, 0, 12, 116, 101, 115,
-        116, 32, 109, 101, 115, 115, 97, 103, 101>>
+      <<0, 0, 0, 8, 0, 0, 0, 1, 0, 11, 115, 110, 97, 112, 112, 121, 95, 116, 101, 115, 116, 0, 0,
+        0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 83, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0,
+        0, 71, 183, 227, 95, 48, 0, 2, 255, 255, 255, 255, 0, 0, 0, 57, 130, 83, 78, 65, 80, 80,
+        89, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 37, 38, 0, 0, 9, 1, 120, 1, 0, 0, 0, 26, 166, 224,
+        205, 141, 0, 0, 255, 255, 255, 255, 0, 0, 0, 12, 116, 101, 115, 116, 32, 109, 101, 115,
+        115, 97, 103, 101>>

     value = "test message"

@@ -505,14 +496,13 @@ defmodule KafkaEx.Protocol.Fetch.Test do
     partition_id = 0

     response =
-      <<0, 0, 0, 14, 0, 0, 0, 1, 0, 17, 115, 110, 97, 112, 112, 121, 95, 98, 97,
-        116, 99, 104, 95, 116, 101, 115, 116, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0,
-        0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 105, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 93,
-        70, 199, 142, 116, 0, 2, 255, 255, 255, 255, 0, 0, 0, 79, 130, 83, 78,
-        65, 80, 80, 89, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 59, 84, 0, 0, 25, 1,
-        16, 30, 204, 101, 110, 2, 5, 15, 76, 4, 107, 101, 121, 49, 0, 0, 0, 12,
-        98, 97, 116, 99, 104, 32, 116, 101, 115, 116, 32, 1, 16, 1, 1, 32, 1, 0,
-        0, 0, 30, 6, 246, 100, 60, 1, 13, 5, 42, 0, 50, 58, 42, 0, 0, 50>>
+      <<0, 0, 0, 14, 0, 0, 0, 1, 0, 17, 115, 110, 97, 112, 112, 121, 95, 98, 97, 116, 99, 104, 95,
+        116, 101, 115, 116, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 105, 0,
+        0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 93, 70, 199, 142, 116, 0, 2, 255, 255, 255, 255, 0, 0, 0,
+        79, 130, 83, 78, 65, 80, 80, 89, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 59, 84, 0, 0, 25, 1,
+        16, 30, 204, 101, 110, 2, 5, 15, 76, 4, 107, 101, 121, 49, 0, 0, 0, 12, 98, 97, 116, 99,
+        104, 32, 116, 101, 115, 116, 32, 1, 16, 1, 1, 32, 1, 0, 0, 0, 30, 6, 246, 100, 60, 1, 13,
+        5, 42, 0, 50, 58, 42, 0, 0, 50>>

     topic = "snappy_batch_test"

diff --git a/test/kafka_ex/protocol/leave_group_test.exs b/test/kafka_ex/protocol/leave_group_test.exs
index e9803fde..38e04153 100644
--- a/test/kafka_ex/protocol/leave_group_test.exs
+++ b/test/kafka_ex/protocol/leave_group_test.exs
@@ -22,8 +22,7 @@ defmodule KafkaEx.Protocol.LeaveGroup.Test do
       member_id: "member"
     }

-    request =
-      KafkaEx.Protocol.LeaveGroup.create_request(42, "client_id", leave_request)
+    request = KafkaEx.Protocol.LeaveGroup.create_request(42, "client_id", leave_request)

     assert :erlang.iolist_to_binary(request) == good_request
   end
diff --git a/test/kafka_ex/protocol/metadata_test.exs b/test/kafka_ex/protocol/metadata_test.exs
index c7adfd7f..ca2c77a8 100644
--- a/test/kafka_ex/protocol/metadata_test.exs
+++ b/test/kafka_ex/protocol/metadata_test.exs
@@ -8,8 +8,7 @@ defmodule KafkaEx.Protocol.Metadata.Test do
   end

   test "create_request with a single topic creates a valid metadata request" do
-    good_request =
-      <<3::16, 0::16, 1::32, 3::16, "foo"::binary, 1::32, 3::16, "bar"::binary>>
+    good_request = <<3::16, 0::16, 1::32, 3::16, "foo"::binary, 1::32, 3::16, "bar"::binary>>

     request = KafkaEx.Protocol.Metadata.create_request(1, "foo", ["bar"])
     assert :erlang.iolist_to_binary(request) == good_request
@@ -17,19 +16,18 @@ defmodule KafkaEx.Protocol.Metadata.Test do

   test "create_request with a multiple topics creates a valid metadata request" do
     good_request =
-      <<3::16, 0::16, 1::32, 3::16, "foo"::binary, 3::32, 3::16, "bar"::binary,
-        3::16, "baz"::binary, 4::16, "food"::binary>>
+      <<3::16, 0::16, 1::32, 3::16, "foo"::binary, 3::32, 3::16, "bar"::binary, 3::16,
+        "baz"::binary, 4::16, "food"::binary>>

-    request =
-      KafkaEx.Protocol.Metadata.create_request(1, "foo", ["bar", "baz", "food"])
+    request = KafkaEx.Protocol.Metadata.create_request(1, "foo", ["bar", "baz", "food"])

     assert :erlang.iolist_to_binary(request) == good_request
   end

   test "parse_response correctly parses a valid response" do
     response =
-      <<0::32, 1::32, 0::32, 3::16, "foo"::binary, 9092::32, 1::32, 0::16,
-        3::16, "bar"::binary, 1::32, 0::16, 0::32, 0::32, 0::32, 1::32, 0::32>>
+      <<0::32, 1::32, 0::32, 3::16, "foo"::binary, 9092::32, 1::32, 0::16, 3::16, "bar"::binary,
+        1::32, 0::16, 0::32, 0::32, 0::32, 1::32, 0::32>>

     expected_response = %KafkaEx.Protocol.Metadata.Response{
       brokers: [
diff --git a/test/kafka_ex/protocol/offset_commit_test.exs b/test/kafka_ex/protocol/offset_commit_test.exs
index 26a2da87..2f55e142 100644
--- a/test/kafka_ex/protocol/offset_commit_test.exs
+++ b/test/kafka_ex/protocol/offset_commit_test.exs
@@ -14,8 +14,8 @@ defmodule KafkaEx.Protocol.OffsetCommit.Test do
     }

     good_request =
-      <<8::16, 0::16, corr_id::32, byte_size(client_id)::16, client_id::binary,
-        3::16, "bar", 1::32, 3::16, "foo", 1::32, 0::32, 10::64, 3::16, "baz">>
+      <<8::16, 0::16, corr_id::32, byte_size(client_id)::16, client_id::binary, 3::16, "bar",
+        1::32, 3::16, "foo", 1::32, 0::32, 10::64, 3::16, "baz">>

     request =
       KafkaEx.Protocol.OffsetCommit.create_request(
@@ -29,8 +29,7 @@ defmodule KafkaEx.Protocol.OffsetCommit.Test do

   test "parse_response correctly parses a valid response" do
     response =
-      <<0, 0, 156, 64, 0, 0, 0, 1, 0, 4, 102, 111, 111, 100, 0, 0, 0, 1, 0, 0,
-        0, 0, 0, 0>>
+      <<0, 0, 156, 64, 0, 0, 0, 1, 0, 4, 102, 111, 111, 100, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0>>

     assert KafkaEx.Protocol.OffsetCommit.parse_response(response) == [
              %KafkaEx.Protocol.OffsetCommit.Response{
diff --git a/test/kafka_ex/protocol/offset_fetch_test.exs b/test/kafka_ex/protocol/offset_fetch_test.exs
index e1781def..c56cfde2 100644
--- a/test/kafka_ex/protocol/offset_fetch_test.exs
+++ b/test/kafka_ex/protocol/offset_fetch_test.exs
@@ -12,8 +12,8 @@ defmodule KafkaEx.Protocol.OffsetFetch.Test do
     }

     good_request =
-      <<9::16, 0::16, 3::32, 8::16, "kafka_ex"::binary, 3::16, "bar"::binary,
-        1::32, 3::16, "foo"::binary, 1::32, 0::32>>
+      <<9::16, 0::16, 3::32, 8::16, "kafka_ex"::binary, 3::16, "bar"::binary, 1::32, 3::16,
+        "foo"::binary, 1::32, 0::32>>

     request =
       KafkaEx.Protocol.OffsetFetch.create_request(
@@ -27,8 +27,8 @@ defmodule KafkaEx.Protocol.OffsetFetch.Test do

   test "parse_response correctly parses a valid response" do
     response =
-      <<0, 0, 156, 66, 0, 0, 0, 1, 0, 4, 102, 111, 111, 100, 0, 0, 0, 1, 0, 0,
-        0, 0, 0, 0, 0, 0, 0, 0, 0, 9, 0, 0, 0, 0>>
+      <<0, 0, 156, 66, 0, 0, 0, 1, 0, 4, 102, 111, 111, 100, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0,
+        0, 0, 0, 9, 0, 0, 0, 0>>

     assert KafkaEx.Protocol.OffsetFetch.parse_response(response) == [
              %KafkaEx.Protocol.OffsetFetch.Response{
diff --git a/test/kafka_ex/protocol/offset_test.exs b/test/kafka_ex/protocol/offset_test.exs
index 25707a96..7f0cf244 100644
--- a/test/kafka_ex/protocol/offset_test.exs
+++ b/test/kafka_ex/protocol/offset_test.exs
@@ -2,8 +2,7 @@ defmodule KafkaEx.Protocol.Offset.Test do
   use ExUnit.Case, async: true

   test "parse_response correctly parses a valid response with an offset" do
-    response =
-      <<0::32, 1::32, 3::16, "bar"::binary, 1::32, 0::32, 0::16, 1::32, 10::64>>
+    response = <<0::32, 1::32, 3::16, "bar"::binary, 1::32, 0::32, 0::16, 1::32, 10::64>>

     expected_response = [
       %KafkaEx.Protocol.Offset.Response{
@@ -18,9 +17,7 @@ defmodule KafkaEx.Protocol.Offset.Test do
   end

   test "parse_response correctly parses a valid response with multiple offsets" do
-    response =
-      <<0::32, 1::32, 3::16, "bar"::binary, 1::32, 0::32, 0::16, 2::32, 10::64,
-        20::64>>
+    response = <<0::32, 1::32, 3::16, "bar"::binary, 1::32, 0::32, 0::16, 2::32, 10::64, 20::64>>

     expected_response = [
       %KafkaEx.Protocol.Offset.Response{
@@ -36,8 +33,8 @@ defmodule KafkaEx.Protocol.Offset.Test do

   test "parse_response correctly parses a valid response with multiple partitions" do
     response =
-      <<0::32, 1::32, 3::16, "bar"::binary, 2::32, 0::32, 0::16, 1::32, 10::64,
-        1::32, 0::16, 1::32, 20::64>>
+      <<0::32, 1::32, 3::16, "bar"::binary, 2::32, 0::32, 0::16, 1::32, 10::64, 1::32, 0::16,
+        1::32, 20::64>>

     expected_response = [
       %KafkaEx.Protocol.Offset.Response{
@@ -54,8 +51,8 @@ defmodule KafkaEx.Protocol.Offset.Test do

   test "parse_response correctly parses a valid response with multiple topics" do
     response =
-      <<0::32, 2::32, 3::16, "bar"::binary, 1::32, 0::32, 0::16, 1::32, 10::64,
-        3::16, "baz"::binary, 1::32, 0::32, 0::16, 1::32, 20::64>>
+      <<0::32, 2::32, 3::16, "bar"::binary, 1::32, 0::32, 0::16, 1::32, 10::64, 3::16,
+        "baz"::binary, 1::32, 0::32, 0::16, 1::32, 20::64>>

     expected_response = [
       %KafkaEx.Protocol.Offset.Response{
diff --git a/test/kafka_ex/protocol/produce_test.exs b/test/kafka_ex/protocol/produce_test.exs
index 61777413..d0feddd9 100644
--- a/test/kafka_ex/protocol/produce_test.exs
+++ b/test/kafka_ex/protocol/produce_test.exs
@@ -3,10 +3,9 @@ defmodule KafkaEx.Protocol.Produce.Test do

   test "create_request creates a valid payload" do
     expected_request =
-      <<0, 0, 0, 0, 0, 0, 0, 1, 0, 3, 102, 111, 111, 0, 1, 0, 0, 0, 10, 0, 0, 0,
-        1, 0, 4, 102, 111, 111, 100, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 29, 0, 0,
-        0, 0, 0, 0, 0, 0, 0, 0, 0, 17, 106, 86, 37, 142, 0, 0, 0, 0, 0, 0, 0, 0,
-        0, 3, 104, 101, 121>>
+      <<0, 0, 0, 0, 0, 0, 0, 1, 0, 3, 102, 111, 111, 0, 1, 0, 0, 0, 10, 0, 0, 0, 1, 0, 4, 102,
+        111, 111, 100, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 29, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 17,
+        106, 86, 37, 142, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3, 104, 101, 121>>

     request =
       KafkaEx.Protocol.Produce.create_request(
@@ -29,13 +28,11 @@ defmodule KafkaEx.Protocol.Produce.Test do

   test "create_request correctly batches multiple request messages" do
     expected_request =
-      <<0, 0, 0, 0, 0, 0, 0, 1, 0, 3, 102, 111, 111, 0, 1, 0, 0, 0, 10, 0, 0, 0,
-        1, 0, 4, 102, 111, 111, 100, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 88, 0, 0,
-        0, 0, 0, 0, 0, 0, 0, 0, 0, 17, 106, 86, 37, 142, 0, 0, 0, 0, 0, 0, 0, 0,
-        0, 3, 104, 101, 121, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 16, 225, 27, 42,
-        82, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 104, 105, 0, 0, 0, 0, 0, 0, 0, 0, 0,
-        0, 0, 19, 119, 44, 195, 207, 0, 0, 0, 0, 0, 0, 0, 0, 0, 5, 104, 101,
-        108, 108, 111>>
+      <<0, 0, 0, 0, 0, 0, 0, 1, 0, 3, 102, 111, 111, 0, 1, 0, 0, 0, 10, 0, 0, 0, 1, 0, 4, 102,
+        111, 111, 100, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 88, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 17,
+        106, 86, 37, 142, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3, 104, 101, 121, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+        0, 0, 16, 225, 27, 42, 82, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 104, 105, 0, 0, 0, 0, 0, 0, 0, 0,
+        0, 0, 0, 19, 119, 44, 195, 207, 0, 0, 0, 0, 0, 0, 0, 0, 0, 5, 104, 101, 108, 108, 111>>

     request =
       KafkaEx.Protocol.Produce.create_request(
@@ -60,15 +57,14 @@ defmodule KafkaEx.Protocol.Produce.Test do

   test "create_request correctly encodes messages with gzip" do
     expected_request =
-      <<0, 0, 0, 0, 0, 0, 0, 1, 0, 23, 99, 111, 109, 112, 114, 101, 115, 115,
-        105, 111, 110, 95, 99, 108, 105, 101, 110, 116, 95, 116, 101, 115, 116,
-        0, 1, 0, 0, 0, 10, 0, 0, 0, 1, 0, 16, 99, 111, 109, 112, 114, 101, 115,
-        115, 101, 100, 95, 116, 111, 112, 105, 99, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0,
-        0, 86, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 74, 79, 44, 46, 209, 0, 1, 255,
-        255, 255, 255, 0, 0, 0, 60, 31, 139, 8, 0, 0, 0, 0, 0, 0, 3, 99, 96,
-        128, 3, 153, 135, 115, 4, 255, 131, 89, 172, 217, 169, 149, 10, 137, 64,
-        6, 103, 110, 106, 113, 113, 98, 122, 42, 152, 3, 87, 199, 242, 37, 117,
-        30, 66, 93, 18, 178, 186, 36, 0, 127, 205, 212, 97, 80, 0, 0, 0>>
+      <<0, 0, 0, 0, 0, 0, 0, 1, 0, 23, 99, 111, 109, 112, 114, 101, 115, 115, 105, 111, 110, 95,
+        99, 108, 105, 101, 110, 116, 95, 116, 101, 115, 116, 0, 1, 0, 0, 0, 10, 0, 0, 0, 1, 0, 16,
+        99, 111, 109, 112, 114, 101, 115, 115, 101, 100, 95, 116, 111, 112, 105, 99, 0, 0, 0, 1,
+        0, 0, 0, 0, 0, 0, 0, 86, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 74, 79, 44, 46, 209, 0, 1, 255,
+        255, 255, 255, 0, 0, 0, 60, 31, 139, 8, 0, 0, 0, 0, 0, 0, 3, 99, 96, 128, 3, 153, 135,
+        115, 4, 255, 131, 89, 172, 217, 169, 149, 10, 137, 64, 6, 103, 110, 106, 113, 113, 98,
+        122, 42, 152, 3, 87, 199, 242, 37, 117, 30, 66, 93, 18, 178, 186, 36, 0, 127, 205, 212,
+        97, 80, 0, 0, 0>>

     client_id = "compression_client_test"
     topic = "compressed_topic"
@@ -87,8 +83,7 @@ defmodule KafkaEx.Protocol.Produce.Test do
       messages: messages
     }

-    iolist_request =
-      KafkaEx.Protocol.Produce.create_request(1, client_id, produce)
+    iolist_request = KafkaEx.Protocol.Produce.create_request(1, client_id, produce)

     request = :erlang.iolist_to_binary(iolist_request)

@@ -105,13 +100,11 @@ defmodule KafkaEx.Protocol.Produce.Test do
     # should be the same.
     post_crc_header_size = 10

-    <> = request

-    <> = expected_request

@@ -120,23 +113,21 @@ defmodule KafkaEx.Protocol.Produce.Test do

     decompressed_message_set = :zlib.gunzip(compressed_message_set)

-    expect_decompressed_message_set =
-      :zlib.gunzip(expect_compressed_message_set)
+    expect_decompressed_message_set = :zlib.gunzip(expect_compressed_message_set)

     assert decompressed_message_set == expect_decompressed_message_set
   end

   test "create_request correctly encodes messages with snappy" do
     expected_request =
-      <<0, 0, 0, 0, 0, 0, 0, 1, 0, 23, 99, 111, 109, 112, 114, 101, 115, 115,
-        105, 111, 110, 95, 99, 108, 105, 101, 110, 116, 95, 116, 101, 115, 116,
-        0, 1, 0, 0, 0, 10, 0, 0, 0, 1, 0, 16, 99, 111, 109, 112, 114, 101, 115,
-        115, 101, 100, 95, 116, 111, 112, 105, 99, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0,
-        0, 86, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 74, 67, 64, 33, 66, 0, 2, 255,
-        255, 255, 255, 0, 0, 0, 60, 80, 0, 0, 25, 1, 16, 28, 225, 156, 17, 255,
-        5, 15, 64, 5, 107, 101, 121, 32, 97, 0, 0, 0, 9, 109, 101, 115, 115, 97,
-        103, 101, 5, 13, 17, 1, 16, 28, 4, 244, 101, 158, 5, 13, 5, 40, 52, 98,
-        0, 0, 0, 9, 109, 101, 115, 115, 97, 103, 101, 32, 98>>
+      <<0, 0, 0, 0, 0, 0, 0, 1, 0, 23, 99, 111, 109, 112, 114, 101, 115, 115, 105, 111, 110, 95,
+        99, 108, 105, 101, 110, 116, 95, 116, 101, 115, 116, 0, 1, 0, 0, 0, 10, 0, 0, 0, 1, 0, 16,
+        99, 111, 109, 112, 114, 101, 115, 115, 101, 100, 95, 116, 111, 112, 105, 99, 0, 0, 0, 1,
+        0, 0, 0, 0, 0, 0, 0, 86, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 74, 67, 64, 33, 66, 0, 2, 255,
+        255, 255, 255, 0, 0, 0, 60, 80, 0, 0, 25, 1, 16, 28, 225, 156, 17, 255, 5, 15, 64, 5, 107,
+        101, 121, 32, 97, 0, 0, 0, 9, 109, 101, 115, 115, 97, 103, 101, 5, 13, 17, 1, 16, 28, 4,
+        244, 101, 158, 5, 13, 5, 40, 52, 98, 0, 0, 0, 9, 109, 101, 115, 115, 97, 103, 101, 32,
+        98>>

     produce = %KafkaEx.Protocol.Produce.Request{
       topic: "compressed_topic",
@@ -161,8 +152,7 @@ defmodule KafkaEx.Protocol.Produce.Test do
   end

   test "parse_response correctly parses a valid response with single topic and partition" do
-    response =
-      <<0::32, 1::32, 3::16, "bar"::binary, 1::32, 0::32, 0::16, 10::64>>
+    response = <<0::32, 1::32, 3::16, "bar"::binary, 1::32, 0::32, 0::16, 10::64>>

     expected_response = [
       %KafkaEx.Protocol.Produce.Response{
@@ -177,9 +167,8 @@ defmodule KafkaEx.Protocol.Produce.Test do

   test "parse_response correctly parses a valid response with multiple topics and partitions" do
     response =
-      <<0::32, 2::32, 3::16, "bar"::binary, 2::32, 0::32, 0::16, 10::64, 1::32,
-        0::16, 20::64, 3::16, "baz"::binary, 2::32, 0::32, 0::16, 30::64, 1::32,
-        0::16, 40::64>>
+      <<0::32, 2::32, 3::16, "bar"::binary, 2::32, 0::32, 0::16, 10::64, 1::32, 0::16, 20::64,
+        3::16, "baz"::binary, 2::32, 0::32, 0::16, 30::64, 1::32, 0::16, 40::64>>

     expected_response = [
       %KafkaEx.Protocol.Produce.Response{
diff --git a/test/kafka_ex/protocol/sync_group_test.exs b/test/kafka_ex/protocol/sync_group_test.exs
index d30b0725..e1802379 100644
--- a/test/kafka_ex/protocol/sync_group_test.exs
+++ b/test/kafka_ex/protocol/sync_group_test.exs
@@ -74,15 +74,13 @@ defmodule KafkaEx.Protocol.SyncGroup.Test do
       ]
     }

-    request =
-      KafkaEx.Protocol.SyncGroup.create_request(42, "client_id", sync_request)
+    request = KafkaEx.Protocol.SyncGroup.create_request(42, "client_id", sync_request)

     assert :erlang.iolist_to_binary(request) == good_request
   end

   test "parse success response correctly" do
-    member_assignment =
-      <<0::16, 1::32, 6::16, "topic1", 3::32, 1::32, 3::32, 5::32>>
+    member_assignment = <<0::16, 1::32, 6::16, "topic1", 3::32, 1::32, 3::32, 5::32>>

     response = <<
       # CorrelationId
diff --git a/test/test_helper.exs b/test/test_helper.exs
index 9a5a1f04..7b134f62 100644
--- a/test/test_helper.exs
+++ b/test/test_helper.exs
@@ -154,8 +154,7 @@ defmodule TestHelper do
   end

   defp first_partition_offset(response) do
-    [%KafkaEx.Protocol.Offset.Response{partition_offsets: partition_offsets}] =
-      response
+    [%KafkaEx.Protocol.Offset.Response{partition_offsets: partition_offsets}] = response

     first_partition = hd(partition_offsets)
     first_partition.offset |> hd
@@ -183,8 +182,7 @@ defmodule TestHelper do
   end

   defp wait_for_accum(value_getter, acc, min_length, dwell, max_tries) do
-    value =
-      wait_for_value(value_getter, fn v -> length(v) > 0 end, dwell, max_tries)
+    value = wait_for_value(value_getter, fn v -> length(v) > 0 end, dwell, max_tries)

     wait_for_accum(value_getter, acc ++ value, min_length, dwell, max_tries)
   end