Skip to content

Commit

Permalink
[Maintenance] Use native formatter settings
Browse files Browse the repository at this point in the history
With todays screens size, there is no need to enforce 80 chars line lenght.
Let's use native mix format settings
  • Loading branch information
Argonus committed Oct 8, 2023
1 parent 4ca6c43 commit 57e6bce
Show file tree
Hide file tree
Showing 63 changed files with 297 additions and 497 deletions.
3 changes: 1 addition & 2 deletions .formatter.exs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
# Used by "mix format"
[
inputs: ["mix.exs", "{config,lib,test}/**/*.{ex,exs}"],
line_length: 80
inputs: ["mix.exs", "{config,lib,test}/**/*.{ex,exs}"]
]
3 changes: 1 addition & 2 deletions lib/kafka_ex.ex
Original file line number Diff line number Diff line change
Expand Up @@ -607,8 +607,7 @@ defmodule KafkaEx do
KafkaEx.worker_setting() | {:server_impl, module}
]) :: GenServer.on_start()
def start_link_worker(name, worker_init \\ []) do
{server_impl, worker_init} =
Keyword.pop(worker_init, :server_impl, Config.server_impl())
{server_impl, worker_init} = Keyword.pop(worker_init, :server_impl, Config.server_impl())

{:ok, full_worker_init} = build_worker_options(worker_init)
server_impl.start_link(full_worker_init, name)
Expand Down
6 changes: 2 additions & 4 deletions lib/kafka_ex/consumer_group/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,7 @@ defmodule KafkaEx.ConsumerGroup.Manager do

GenServer.start_link(
__MODULE__,
{{gen_consumer_module, consumer_module}, group_name, topics,
consumer_opts},
{{gen_consumer_module, consumer_module}, group_name, topics, consumer_opts},
gen_server_opts
)
end
Expand Down Expand Up @@ -396,8 +395,7 @@ defmodule KafkaEx.ConsumerGroup.Manager do
member_id: member_id
}

leave_group_response =
KafkaEx.leave_group(leave_request, worker_name: worker_name)
leave_group_response = KafkaEx.leave_group(leave_request, worker_name: worker_name)

case leave_group_response do
%{error_code: :no_error} ->
Expand Down
7 changes: 2 additions & 5 deletions lib/kafka_ex/default_partitioner.ex
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,7 @@ defmodule KafkaEx.DefaultPartitioner do
assign_partition_with_key(request, metadata, key)

{:error, reason} ->
Logger.warn(
"#{__MODULE__}: couldn't assign partition due to #{inspect(reason)}"
)
Logger.warn("#{__MODULE__}: couldn't assign partition due to #{inspect(reason)}")

assign_partition_randomly(request, metadata)
end
Expand Down Expand Up @@ -70,8 +68,7 @@ defmodule KafkaEx.DefaultPartitioner do
) do
hash = Murmur.umurmur2(key)

partitions_count =
metadata |> MetadataResponse.partitions_for_topic(topic) |> length()
partitions_count = metadata |> MetadataResponse.partitions_for_topic(topic) |> length()

partition_id = rem(hash, partitions_count)
%{request | partition: partition_id}
Expand Down
6 changes: 2 additions & 4 deletions lib/kafka_ex/gen_consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -461,8 +461,7 @@ defmodule KafkaEx.GenConsumer do
options
) :: GenServer.on_start()
def start_link(consumer_module, group_name, topic, partition, opts \\ []) do
{server_opts, consumer_opts} =
Keyword.split(opts, [:debug, :name, :timeout, :spawn_opt])
{server_opts, consumer_opts} = Keyword.split(opts, [:debug, :name, :timeout, :spawn_opt])

GenServer.start_link(
__MODULE__,
Expand Down Expand Up @@ -563,8 +562,7 @@ defmodule KafkaEx.GenConsumer do

given_fetch_options = Keyword.get(opts, :fetch_options, [])

fetch_options =
Keyword.merge(default_fetch_options, given_fetch_options)
fetch_options = Keyword.merge(default_fetch_options, given_fetch_options)

state = %State{
consumer_module: consumer_module,
Expand Down
7 changes: 2 additions & 5 deletions lib/kafka_ex/legacy_partitioner.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@ defmodule KafkaEx.LegacyPartitioner do
assign_partition_with_key(request, metadata, key)

{:error, reason} ->
Logger.warn(
"#{__MODULE__}: couldn't assign partition due to #{inspect(reason)}"
)
Logger.warn("#{__MODULE__}: couldn't assign partition due to #{inspect(reason)}")

assign_partition_randomly(request, metadata)
end
Expand Down Expand Up @@ -71,8 +69,7 @@ defmodule KafkaEx.LegacyPartitioner do
) do
hash = Murmur.umurmur2_legacy(key)

partitions_count =
metadata |> MetadataResponse.partitions_for_topic(topic) |> length()
partitions_count = metadata |> MetadataResponse.partitions_for_topic(topic) |> length()

partition_id = rem(hash, partitions_count)
%{request | partition: partition_id}
Expand Down
3 changes: 1 addition & 2 deletions lib/kafka_ex/network_client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ defmodule KafkaEx.NetworkClient do
:ok ->
case Socket.recv(socket, 0, timeout) do
{:ok, data} ->
:ok =
Socket.setopts(socket, [:binary, {:packet, 4}, {:active, true}])
:ok = Socket.setopts(socket, [:binary, {:packet, 4}, {:active, true}])

data

Expand Down
27 changes: 9 additions & 18 deletions lib/kafka_ex/new/adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,7 @@ defmodule KafkaEx.New.Adapter do
| _
]
}) do
{message_set, last_offset} =
kayrock_message_set_to_kafka_ex(record_set, topic, partition)
{message_set, last_offset} = kayrock_message_set_to_kafka_ex(record_set, topic, partition)

{[
%FetchResponse{
Expand Down Expand Up @@ -267,8 +266,7 @@ defmodule KafkaEx.New.Adapter do
group_id: request.group_name,
generation_id: request.generation_id,
member_id: request.member_id,
group_assignment:
Enum.map(request.assignments, &kafka_ex_group_assignment_to_kayrock/1)
group_assignment: Enum.map(request.assignments, &kafka_ex_group_assignment_to_kayrock/1)
}, request.group_name}
end

Expand All @@ -280,8 +278,7 @@ defmodule KafkaEx.New.Adapter do
}) do
%SyncGroupResponse{
error_code: Kayrock.ErrorCode.code_to_atom(error_code),
assignments:
Enum.map(partition_assignments, fn p -> {p.topic, p.partitions} end)
assignments: Enum.map(partition_assignments, fn p -> {p.topic, p.partitions} end)
}
end

Expand Down Expand Up @@ -313,8 +310,7 @@ defmodule KafkaEx.New.Adapter do
def create_topics_request(requests, timeout) do
%Kayrock.CreateTopics.V0.Request{
timeout: timeout,
create_topic_requests:
Enum.map(requests, &kafka_ex_to_kayrock_create_topics/1)
create_topic_requests: Enum.map(requests, &kafka_ex_to_kayrock_create_topics/1)
}
end

Expand Down Expand Up @@ -373,11 +369,9 @@ defmodule KafkaEx.New.Adapter do
end

def offset_fetch_request(offset_fetch_request, client_consumer_group) do
consumer_group =
offset_fetch_request.consumer_group || client_consumer_group
consumer_group = offset_fetch_request.consumer_group || client_consumer_group

request =
Kayrock.OffsetFetch.get_request_struct(offset_fetch_request.api_version)
request = Kayrock.OffsetFetch.get_request_struct(offset_fetch_request.api_version)

{%{
request
Expand Down Expand Up @@ -422,11 +416,9 @@ defmodule KafkaEx.New.Adapter do
end

def offset_commit_request(offset_commit_request, client_consumer_group) do
consumer_group =
offset_commit_request.consumer_group || client_consumer_group
consumer_group = offset_commit_request.consumer_group || client_consumer_group

request =
Kayrock.OffsetCommit.get_request_struct(offset_commit_request.api_version)
request = Kayrock.OffsetCommit.get_request_struct(offset_commit_request.api_version)

request = %{
request
Expand Down Expand Up @@ -518,8 +510,7 @@ defmodule KafkaEx.New.Adapter do
topic: request.topic,
num_partitions: request.num_partitions,
replication_factor: request.replication_factor,
replica_assignment:
Enum.map(request.replica_assignment, &Map.from_struct/1),
replica_assignment: Enum.map(request.replica_assignment, &Map.from_struct/1),
config_entries:
Enum.map(request.config_entries, fn ce ->
%{config_name: ce.config_name, config_value: ce.config_value}
Expand Down
21 changes: 7 additions & 14 deletions lib/kafka_ex/new/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,7 @@ defmodule KafkaEx.New.Client do
Kernel.reraise(e, System.stacktrace())
end

{:ok, _} =
:timer.send_interval(state.metadata_update_interval, :update_metadata)
{:ok, _} = :timer.send_interval(state.metadata_update_interval, :update_metadata)

{:ok, state}
end
Expand Down Expand Up @@ -159,15 +158,13 @@ defmodule KafkaEx.New.Client do
end

def handle_call({:topic_metadata, topics, allow_topic_creation}, _from, state) do
{topic_metadata, updated_state} =
fetch_topics_metadata(state, topics, allow_topic_creation)
{topic_metadata, updated_state} = fetch_topics_metadata(state, topics, allow_topic_creation)

{:reply, {:ok, topic_metadata}, updated_state}
end

def handle_call({:kayrock_request, request, node_selector}, _from, state) do
{response, updated_state} =
kayrock_network_request(request, node_selector, state)
{response, updated_state} = kayrock_network_request(request, node_selector, state)

{:reply, response, updated_state}
end
Expand Down Expand Up @@ -220,8 +217,7 @@ defmodule KafkaEx.New.Client do
updated_state

_ ->
new_cluster_metadata =
ClusterMetadata.from_metadata_v1_response(response)
new_cluster_metadata = ClusterMetadata.from_metadata_v1_response(response)

{updated_cluster_metadata, brokers_to_close} =
ClusterMetadata.merge_brokers(
Expand Down Expand Up @@ -345,8 +341,7 @@ defmodule KafkaEx.New.Client do
end

_ ->
message =
"Unable to fetch metadata from any brokers. Timeout is #{sync_timeout}."
message = "Unable to fetch metadata from any brokers. Timeout is #{sync_timeout}."

Logger.log(:error, message)
raise message
Expand Down Expand Up @@ -549,8 +544,7 @@ defmodule KafkaEx.New.Client do
end

defp run_client_request(
%{client_id: client_id, correlation_id: correlation_id} =
client_request,
%{client_id: client_id, correlation_id: correlation_id} = client_request,
send_request,
synchronous
)
Expand Down Expand Up @@ -694,8 +688,7 @@ defmodule KafkaEx.New.Client do

topic_metadata = State.topics_metadata(updated_state, topics)

{topic_metadata,
%{updated_state | allow_auto_topic_creation: allow_auto_topic_creation}}
{topic_metadata, %{updated_state | allow_auto_topic_creation: allow_auto_topic_creation}}
end

defp close_broker_by_socket(state, socket) do
Expand Down
6 changes: 2 additions & 4 deletions lib/kafka_ex/new/client/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ defmodule KafkaEx.New.Client.State do
:consumer_group_update_interval,
@default_consumer_group_update_interval
),
allow_auto_topic_creation:
Keyword.get(args, :allow_auto_topic_creation, true),
allow_auto_topic_creation: Keyword.get(args, :allow_auto_topic_creation, true),
use_ssl: Keyword.get(args, :use_ssl, false),
ssl_options: Keyword.get(args, :ssl_options, []),
consumer_group_for_auto_commit: Keyword.get(args, :consumer_group)
Expand Down Expand Up @@ -99,8 +98,7 @@ defmodule KafkaEx.New.Client.State do
) do
%{
state
| cluster_metadata:
ClusterMetadata.remove_topics(cluster_metadata, topics)
| cluster_metadata: ClusterMetadata.remove_topics(cluster_metadata, topics)
}
end

Expand Down
9 changes: 3 additions & 6 deletions lib/kafka_ex/new/client_compatibility.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ defmodule KafkaEx.New.ClientCompatibility do
def handle_call({:metadata, topic}, _from, state) do
updated_state = update_metadata(state, [topic])

{:reply, Adapter.metadata_response(updated_state.cluster_metadata),
updated_state}
{:reply, Adapter.metadata_response(updated_state.cluster_metadata), updated_state}
end

def handle_call({:offset, topic, partition, time}, _from, state) do
Expand Down Expand Up @@ -141,8 +140,7 @@ defmodule KafkaEx.New.ClientCompatibility do
{response, updated_state}
end

{:reply, response,
%{state_out | allow_auto_topic_creation: allow_auto_topic_creation}}
{:reply, response, %{state_out | allow_auto_topic_creation: allow_auto_topic_creation}}
end

def handle_call({:join_group, request, network_timeout}, _from, state) do
Expand Down Expand Up @@ -339,8 +337,7 @@ defmodule KafkaEx.New.ClientCompatibility do
state

_ ->
{_, updated_state} =
fetch_topics_metadata(state, topics, allow_topic_creation)
{_, updated_state} = fetch_topics_metadata(state, topics, allow_topic_creation)

updated_state
end
Expand Down
6 changes: 2 additions & 4 deletions lib/kafka_ex/new/structs/cluster_metadata.ex
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,7 @@ defmodule KafkaEx.New.Structs.ClusterMetadata do
brokers =
metadata.brokers
|> Enum.into(%{}, fn broker_metadata ->
%{host: host, port: port, node_id: node_id, rack: rack} =
broker_metadata
%{host: host, port: port, node_id: node_id, rack: rack} = broker_metadata

{node_id, %Broker{host: host, port: port, node_id: node_id, rack: rack}}
end)
Expand Down Expand Up @@ -240,8 +239,7 @@ defmodule KafkaEx.New.Structs.ClusterMetadata do
KafkaExAPI.node_id()
) :: t
def put_consumer_group_coordinator(
%__MODULE__{consumer_group_coordinators: consumer_group_coordinators} =
cluster_metadata,
%__MODULE__{consumer_group_coordinators: consumer_group_coordinators} = cluster_metadata,
consumer_group,
coordinator_node_id
) do
Expand Down
3 changes: 1 addition & 2 deletions lib/kafka_ex/new/structs/topic.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ defmodule KafkaEx.New.Structs.Topic do
end
)

partitions =
Enum.map(partition_metadata, &Partition.from_partition_metadata/1)
partitions = Enum.map(partition_metadata, &Partition.from_partition_metadata/1)

%__MODULE__{
name: name,
Expand Down
4 changes: 2 additions & 2 deletions lib/kafka_ex/protocol.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ defmodule KafkaEx.Protocol do
client_id,
api_version \\ @default_api_version
) do
<<api_key(type)::16, api_version::16, correlation_id::32,
byte_size(client_id)::16, client_id::binary>>
<<api_key(type)::16, api_version::16, correlation_id::32, byte_size(client_id)::16,
client_id::binary>>
end

@error_map %{
Expand Down
7 changes: 3 additions & 4 deletions lib/kafka_ex/protocol/api_versions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ defmodule KafkaEx.Protocol.ApiVersions do
def parse_response(binary, this_api_version \\ @default_this_api_version)

def parse_response(
<<_correlation_id::32-signed, error_code::16-signed,
api_versions_count::32-signed, rest::binary>>,
<<_correlation_id::32-signed, error_code::16-signed, api_versions_count::32-signed,
rest::binary>>,
this_api_version
) do
%{
Expand Down Expand Up @@ -100,8 +100,7 @@ defmodule KafkaEx.Protocol.ApiVersions do
end

defp parse_one_api_version(
<<api_key::16-signed, min_version::16-signed, max_version::16-signed,
rest::binary>>
<<api_key::16-signed, min_version::16-signed, max_version::16-signed, rest::binary>>
) do
{%ApiVersion{
api_key: api_key,
Expand Down
7 changes: 3 additions & 4 deletions lib/kafka_ex/protocol/common.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@ defmodule KafkaEx.Protocol.Common do

def parse_topics(
topics_size,
<<topic_size::16-signed, topic::size(topic_size)-binary,
partitions_size::32-signed, rest::binary>>,
<<topic_size::16-signed, topic::size(topic_size)-binary, partitions_size::32-signed,
rest::binary>>,
mod
) do
struct_module = Module.concat(mod, Response)

{partitions, topics_data} =
mod.parse_partitions(partitions_size, rest, [], topic)
{partitions, topics_data} = mod.parse_partitions(partitions_size, rest, [], topic)

[
%{
Expand Down
Loading

0 comments on commit 57e6bce

Please sign in to comment.