Skip to content

Commit

Permalink
Merge pull request #489 from Argonus/native-formatter
Browse files Browse the repository at this point in the history
[Maintenance] Use native formatter settings
  • Loading branch information
Argonus authored Nov 10, 2023
2 parents 74109be + 57e6bce commit db3e4d1
Show file tree
Hide file tree
Showing 63 changed files with 297 additions and 497 deletions.
3 changes: 1 addition & 2 deletions .formatter.exs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
# Used by "mix format"
[
inputs: ["mix.exs", "{config,lib,test}/**/*.{ex,exs}"],
line_length: 80
inputs: ["mix.exs", "{config,lib,test}/**/*.{ex,exs}"]
]
3 changes: 1 addition & 2 deletions lib/kafka_ex.ex
Original file line number Diff line number Diff line change
Expand Up @@ -607,8 +607,7 @@ defmodule KafkaEx do
KafkaEx.worker_setting() | {:server_impl, module}
]) :: GenServer.on_start()
def start_link_worker(name, worker_init \\ []) do
{server_impl, worker_init} =
Keyword.pop(worker_init, :server_impl, Config.server_impl())
{server_impl, worker_init} = Keyword.pop(worker_init, :server_impl, Config.server_impl())

{:ok, full_worker_init} = build_worker_options(worker_init)
server_impl.start_link(full_worker_init, name)
Expand Down
6 changes: 2 additions & 4 deletions lib/kafka_ex/consumer_group/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,7 @@ defmodule KafkaEx.ConsumerGroup.Manager do

GenServer.start_link(
__MODULE__,
{{gen_consumer_module, consumer_module}, group_name, topics,
consumer_opts},
{{gen_consumer_module, consumer_module}, group_name, topics, consumer_opts},
gen_server_opts
)
end
Expand Down Expand Up @@ -396,8 +395,7 @@ defmodule KafkaEx.ConsumerGroup.Manager do
member_id: member_id
}

leave_group_response =
KafkaEx.leave_group(leave_request, worker_name: worker_name)
leave_group_response = KafkaEx.leave_group(leave_request, worker_name: worker_name)

case leave_group_response do
%{error_code: :no_error} ->
Expand Down
7 changes: 2 additions & 5 deletions lib/kafka_ex/default_partitioner.ex
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,7 @@ defmodule KafkaEx.DefaultPartitioner do
assign_partition_with_key(request, metadata, key)

{:error, reason} ->
Logger.warn(
"#{__MODULE__}: couldn't assign partition due to #{inspect(reason)}"
)
Logger.warn("#{__MODULE__}: couldn't assign partition due to #{inspect(reason)}")

assign_partition_randomly(request, metadata)
end
Expand Down Expand Up @@ -70,8 +68,7 @@ defmodule KafkaEx.DefaultPartitioner do
) do
hash = Murmur.umurmur2(key)

partitions_count =
metadata |> MetadataResponse.partitions_for_topic(topic) |> length()
partitions_count = metadata |> MetadataResponse.partitions_for_topic(topic) |> length()

partition_id = rem(hash, partitions_count)
%{request | partition: partition_id}
Expand Down
6 changes: 2 additions & 4 deletions lib/kafka_ex/gen_consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -461,8 +461,7 @@ defmodule KafkaEx.GenConsumer do
options
) :: GenServer.on_start()
def start_link(consumer_module, group_name, topic, partition, opts \\ []) do
{server_opts, consumer_opts} =
Keyword.split(opts, [:debug, :name, :timeout, :spawn_opt])
{server_opts, consumer_opts} = Keyword.split(opts, [:debug, :name, :timeout, :spawn_opt])

GenServer.start_link(
__MODULE__,
Expand Down Expand Up @@ -563,8 +562,7 @@ defmodule KafkaEx.GenConsumer do

given_fetch_options = Keyword.get(opts, :fetch_options, [])

fetch_options =
Keyword.merge(default_fetch_options, given_fetch_options)
fetch_options = Keyword.merge(default_fetch_options, given_fetch_options)

state = %State{
consumer_module: consumer_module,
Expand Down
7 changes: 2 additions & 5 deletions lib/kafka_ex/legacy_partitioner.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@ defmodule KafkaEx.LegacyPartitioner do
assign_partition_with_key(request, metadata, key)

{:error, reason} ->
Logger.warn(
"#{__MODULE__}: couldn't assign partition due to #{inspect(reason)}"
)
Logger.warn("#{__MODULE__}: couldn't assign partition due to #{inspect(reason)}")

assign_partition_randomly(request, metadata)
end
Expand Down Expand Up @@ -71,8 +69,7 @@ defmodule KafkaEx.LegacyPartitioner do
) do
hash = Murmur.umurmur2_legacy(key)

partitions_count =
metadata |> MetadataResponse.partitions_for_topic(topic) |> length()
partitions_count = metadata |> MetadataResponse.partitions_for_topic(topic) |> length()

partition_id = rem(hash, partitions_count)
%{request | partition: partition_id}
Expand Down
3 changes: 1 addition & 2 deletions lib/kafka_ex/network_client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ defmodule KafkaEx.NetworkClient do
:ok ->
case Socket.recv(socket, 0, timeout) do
{:ok, data} ->
:ok =
Socket.setopts(socket, [:binary, {:packet, 4}, {:active, true}])
:ok = Socket.setopts(socket, [:binary, {:packet, 4}, {:active, true}])

data

Expand Down
27 changes: 9 additions & 18 deletions lib/kafka_ex/new/adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,7 @@ defmodule KafkaEx.New.Adapter do
| _
]
}) do
{message_set, last_offset} =
kayrock_message_set_to_kafka_ex(record_set, topic, partition)
{message_set, last_offset} = kayrock_message_set_to_kafka_ex(record_set, topic, partition)

{[
%FetchResponse{
Expand Down Expand Up @@ -267,8 +266,7 @@ defmodule KafkaEx.New.Adapter do
group_id: request.group_name,
generation_id: request.generation_id,
member_id: request.member_id,
group_assignment:
Enum.map(request.assignments, &kafka_ex_group_assignment_to_kayrock/1)
group_assignment: Enum.map(request.assignments, &kafka_ex_group_assignment_to_kayrock/1)
}, request.group_name}
end

Expand All @@ -280,8 +278,7 @@ defmodule KafkaEx.New.Adapter do
}) do
%SyncGroupResponse{
error_code: Kayrock.ErrorCode.code_to_atom(error_code),
assignments:
Enum.map(partition_assignments, fn p -> {p.topic, p.partitions} end)
assignments: Enum.map(partition_assignments, fn p -> {p.topic, p.partitions} end)
}
end

Expand Down Expand Up @@ -313,8 +310,7 @@ defmodule KafkaEx.New.Adapter do
def create_topics_request(requests, timeout) do
%Kayrock.CreateTopics.V0.Request{
timeout: timeout,
create_topic_requests:
Enum.map(requests, &kafka_ex_to_kayrock_create_topics/1)
create_topic_requests: Enum.map(requests, &kafka_ex_to_kayrock_create_topics/1)
}
end

Expand Down Expand Up @@ -373,11 +369,9 @@ defmodule KafkaEx.New.Adapter do
end

def offset_fetch_request(offset_fetch_request, client_consumer_group) do
consumer_group =
offset_fetch_request.consumer_group || client_consumer_group
consumer_group = offset_fetch_request.consumer_group || client_consumer_group

request =
Kayrock.OffsetFetch.get_request_struct(offset_fetch_request.api_version)
request = Kayrock.OffsetFetch.get_request_struct(offset_fetch_request.api_version)

{%{
request
Expand Down Expand Up @@ -422,11 +416,9 @@ defmodule KafkaEx.New.Adapter do
end

def offset_commit_request(offset_commit_request, client_consumer_group) do
consumer_group =
offset_commit_request.consumer_group || client_consumer_group
consumer_group = offset_commit_request.consumer_group || client_consumer_group

request =
Kayrock.OffsetCommit.get_request_struct(offset_commit_request.api_version)
request = Kayrock.OffsetCommit.get_request_struct(offset_commit_request.api_version)

request = %{
request
Expand Down Expand Up @@ -518,8 +510,7 @@ defmodule KafkaEx.New.Adapter do
topic: request.topic,
num_partitions: request.num_partitions,
replication_factor: request.replication_factor,
replica_assignment:
Enum.map(request.replica_assignment, &Map.from_struct/1),
replica_assignment: Enum.map(request.replica_assignment, &Map.from_struct/1),
config_entries:
Enum.map(request.config_entries, fn ce ->
%{config_name: ce.config_name, config_value: ce.config_value}
Expand Down
21 changes: 7 additions & 14 deletions lib/kafka_ex/new/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,7 @@ defmodule KafkaEx.New.Client do
Kernel.reraise(e, System.stacktrace())
end

{:ok, _} =
:timer.send_interval(state.metadata_update_interval, :update_metadata)
{:ok, _} = :timer.send_interval(state.metadata_update_interval, :update_metadata)

{:ok, state}
end
Expand Down Expand Up @@ -159,15 +158,13 @@ defmodule KafkaEx.New.Client do
end

def handle_call({:topic_metadata, topics, allow_topic_creation}, _from, state) do
{topic_metadata, updated_state} =
fetch_topics_metadata(state, topics, allow_topic_creation)
{topic_metadata, updated_state} = fetch_topics_metadata(state, topics, allow_topic_creation)

{:reply, {:ok, topic_metadata}, updated_state}
end

def handle_call({:kayrock_request, request, node_selector}, _from, state) do
{response, updated_state} =
kayrock_network_request(request, node_selector, state)
{response, updated_state} = kayrock_network_request(request, node_selector, state)

{:reply, response, updated_state}
end
Expand Down Expand Up @@ -220,8 +217,7 @@ defmodule KafkaEx.New.Client do
updated_state

_ ->
new_cluster_metadata =
ClusterMetadata.from_metadata_v1_response(response)
new_cluster_metadata = ClusterMetadata.from_metadata_v1_response(response)

{updated_cluster_metadata, brokers_to_close} =
ClusterMetadata.merge_brokers(
Expand Down Expand Up @@ -345,8 +341,7 @@ defmodule KafkaEx.New.Client do
end

_ ->
message =
"Unable to fetch metadata from any brokers. Timeout is #{sync_timeout}."
message = "Unable to fetch metadata from any brokers. Timeout is #{sync_timeout}."

Logger.log(:error, message)
raise message
Expand Down Expand Up @@ -549,8 +544,7 @@ defmodule KafkaEx.New.Client do
end

defp run_client_request(
%{client_id: client_id, correlation_id: correlation_id} =
client_request,
%{client_id: client_id, correlation_id: correlation_id} = client_request,
send_request,
synchronous
)
Expand Down Expand Up @@ -694,8 +688,7 @@ defmodule KafkaEx.New.Client do

topic_metadata = State.topics_metadata(updated_state, topics)

{topic_metadata,
%{updated_state | allow_auto_topic_creation: allow_auto_topic_creation}}
{topic_metadata, %{updated_state | allow_auto_topic_creation: allow_auto_topic_creation}}
end

defp close_broker_by_socket(state, socket) do
Expand Down
6 changes: 2 additions & 4 deletions lib/kafka_ex/new/client/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ defmodule KafkaEx.New.Client.State do
:consumer_group_update_interval,
@default_consumer_group_update_interval
),
allow_auto_topic_creation:
Keyword.get(args, :allow_auto_topic_creation, true),
allow_auto_topic_creation: Keyword.get(args, :allow_auto_topic_creation, true),
use_ssl: Keyword.get(args, :use_ssl, false),
ssl_options: Keyword.get(args, :ssl_options, []),
consumer_group_for_auto_commit: Keyword.get(args, :consumer_group)
Expand Down Expand Up @@ -99,8 +98,7 @@ defmodule KafkaEx.New.Client.State do
) do
%{
state
| cluster_metadata:
ClusterMetadata.remove_topics(cluster_metadata, topics)
| cluster_metadata: ClusterMetadata.remove_topics(cluster_metadata, topics)
}
end

Expand Down
9 changes: 3 additions & 6 deletions lib/kafka_ex/new/client_compatibility.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ defmodule KafkaEx.New.ClientCompatibility do
def handle_call({:metadata, topic}, _from, state) do
updated_state = update_metadata(state, [topic])

{:reply, Adapter.metadata_response(updated_state.cluster_metadata),
updated_state}
{:reply, Adapter.metadata_response(updated_state.cluster_metadata), updated_state}
end

def handle_call({:offset, topic, partition, time}, _from, state) do
Expand Down Expand Up @@ -141,8 +140,7 @@ defmodule KafkaEx.New.ClientCompatibility do
{response, updated_state}
end

{:reply, response,
%{state_out | allow_auto_topic_creation: allow_auto_topic_creation}}
{:reply, response, %{state_out | allow_auto_topic_creation: allow_auto_topic_creation}}
end

def handle_call({:join_group, request, network_timeout}, _from, state) do
Expand Down Expand Up @@ -339,8 +337,7 @@ defmodule KafkaEx.New.ClientCompatibility do
state

_ ->
{_, updated_state} =
fetch_topics_metadata(state, topics, allow_topic_creation)
{_, updated_state} = fetch_topics_metadata(state, topics, allow_topic_creation)

updated_state
end
Expand Down
6 changes: 2 additions & 4 deletions lib/kafka_ex/new/structs/cluster_metadata.ex
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,7 @@ defmodule KafkaEx.New.Structs.ClusterMetadata do
brokers =
metadata.brokers
|> Enum.into(%{}, fn broker_metadata ->
%{host: host, port: port, node_id: node_id, rack: rack} =
broker_metadata
%{host: host, port: port, node_id: node_id, rack: rack} = broker_metadata

{node_id, %Broker{host: host, port: port, node_id: node_id, rack: rack}}
end)
Expand Down Expand Up @@ -240,8 +239,7 @@ defmodule KafkaEx.New.Structs.ClusterMetadata do
KafkaExAPI.node_id()
) :: t
def put_consumer_group_coordinator(
%__MODULE__{consumer_group_coordinators: consumer_group_coordinators} =
cluster_metadata,
%__MODULE__{consumer_group_coordinators: consumer_group_coordinators} = cluster_metadata,
consumer_group,
coordinator_node_id
) do
Expand Down
3 changes: 1 addition & 2 deletions lib/kafka_ex/new/structs/topic.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ defmodule KafkaEx.New.Structs.Topic do
end
)

partitions =
Enum.map(partition_metadata, &Partition.from_partition_metadata/1)
partitions = Enum.map(partition_metadata, &Partition.from_partition_metadata/1)

%__MODULE__{
name: name,
Expand Down
4 changes: 2 additions & 2 deletions lib/kafka_ex/protocol.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ defmodule KafkaEx.Protocol do
client_id,
api_version \\ @default_api_version
) do
<<api_key(type)::16, api_version::16, correlation_id::32,
byte_size(client_id)::16, client_id::binary>>
<<api_key(type)::16, api_version::16, correlation_id::32, byte_size(client_id)::16,
client_id::binary>>
end

@error_map %{
Expand Down
7 changes: 3 additions & 4 deletions lib/kafka_ex/protocol/api_versions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ defmodule KafkaEx.Protocol.ApiVersions do
def parse_response(binary, this_api_version \\ @default_this_api_version)

def parse_response(
<<_correlation_id::32-signed, error_code::16-signed,
api_versions_count::32-signed, rest::binary>>,
<<_correlation_id::32-signed, error_code::16-signed, api_versions_count::32-signed,
rest::binary>>,
this_api_version
) do
%{
Expand Down Expand Up @@ -100,8 +100,7 @@ defmodule KafkaEx.Protocol.ApiVersions do
end

defp parse_one_api_version(
<<api_key::16-signed, min_version::16-signed, max_version::16-signed,
rest::binary>>
<<api_key::16-signed, min_version::16-signed, max_version::16-signed, rest::binary>>
) do
{%ApiVersion{
api_key: api_key,
Expand Down
7 changes: 3 additions & 4 deletions lib/kafka_ex/protocol/common.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@ defmodule KafkaEx.Protocol.Common do

def parse_topics(
topics_size,
<<topic_size::16-signed, topic::size(topic_size)-binary,
partitions_size::32-signed, rest::binary>>,
<<topic_size::16-signed, topic::size(topic_size)-binary, partitions_size::32-signed,
rest::binary>>,
mod
) do
struct_module = Module.concat(mod, Response)

{partitions, topics_data} =
mod.parse_partitions(partitions_size, rest, [], topic)
{partitions, topics_data} = mod.parse_partitions(partitions_size, rest, [], topic)

[
%{
Expand Down
Loading

0 comments on commit db3e4d1

Please sign in to comment.