[Maintenance] Use native formatter settings #489

Merged · 1 commit · Nov 10, 2023
3 changes: 1 addition & 2 deletions .formatter.exs
@@ -1,5 +1,4 @@
 # Used by "mix format"
 [
-  inputs: ["mix.exs", "{config,lib,test}/**/*.{ex,exs}"],
-  line_length: 80
+  inputs: ["mix.exs", "{config,lib,test}/**/*.{ex,exs}"]
 ]
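With the `line_length: 80` override removed, `mix format` falls back to its native default of 98 columns (the default since the formatter was introduced; treat the exact number as version-dependent), and every remaining hunk in this PR is simply the re-run formatter re-wrapping lines that no longer need to break at 80. A before/after sketch of the effect, using the first hunk below:

    # At line_length: 80 the formatter had to split the assignment:
    {server_impl, worker_init} =
      Keyword.pop(worker_init, :server_impl, Config.server_impl())

    # At the native default (98 columns) it fits on one line:
    {server_impl, worker_init} = Keyword.pop(worker_init, :server_impl, Config.server_impl())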
3 changes: 1 addition & 2 deletions lib/kafka_ex.ex
@@ -607,8 +607,7 @@
           KafkaEx.worker_setting() | {:server_impl, module}
         ]) :: GenServer.on_start()
   def start_link_worker(name, worker_init \\ []) do
-    {server_impl, worker_init} =
-      Keyword.pop(worker_init, :server_impl, Config.server_impl())
+    {server_impl, worker_init} = Keyword.pop(worker_init, :server_impl, Config.server_impl())

     {:ok, full_worker_init} = build_worker_options(worker_init)
     server_impl.start_link(full_worker_init, name)
@@ -644,7 +643,7 @@
     end
   end

   defp current_offset(
⚠️ Check warning on line 646 in lib/kafka_ex.ex — "default values for the optional arguments in current_offset/6 are never used" (reported by GitHub Actions jobs Test (1.13, 24.3), Test (1.14, 25.2), and Static Code Analysis (1.12.3, 24.3.4); the Test (1.8, 20.3) job words it as "default arguments in current_offset/6 are never used")
          supplied_offset,
          partition,
          topic,
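For readers unfamiliar with that compiler warning: Elixir emits it when a function declares `\\` default arguments but every call site supplies all of them, making the defaults dead code. A minimal reproduction, in a hypothetical module that is not part of this PR:

    defmodule Example do
      def run, do: pad("abc", 5)

      # Compiling warns: "default values for the optional arguments
      # in pad/2 are never used" — the only caller always passes both
      # arguments, so the default of 10 can never apply.
      defp pad(string, width \\ 10) do
        String.pad_leading(string, width)
      end
    end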
6 changes: 2 additions & 4 deletions lib/kafka_ex/consumer_group/manager.ex
@@ -70,8 +70,7 @@ defmodule KafkaEx.ConsumerGroup.Manager do

     GenServer.start_link(
       __MODULE__,
-      {{gen_consumer_module, consumer_module}, group_name, topics,
-       consumer_opts},
+      {{gen_consumer_module, consumer_module}, group_name, topics, consumer_opts},
       gen_server_opts
     )
   end
@@ -396,8 +395,7 @@ defmodule KafkaEx.ConsumerGroup.Manager do
         member_id: member_id
       }

-    leave_group_response =
-      KafkaEx.leave_group(leave_request, worker_name: worker_name)
+    leave_group_response = KafkaEx.leave_group(leave_request, worker_name: worker_name)

     case leave_group_response do
       %{error_code: :no_error} ->
7 changes: 2 additions & 5 deletions lib/kafka_ex/default_partitioner.ex
@@ -33,9 +33,7 @@ defmodule KafkaEx.DefaultPartitioner do
         assign_partition_with_key(request, metadata, key)

       {:error, reason} ->
-        Logger.warn(
-          "#{__MODULE__}: couldn't assign partition due to #{inspect(reason)}"
-        )
+        Logger.warn("#{__MODULE__}: couldn't assign partition due to #{inspect(reason)}")

         assign_partition_randomly(request, metadata)
     end
@@ -70,8 +68,7 @@ defmodule KafkaEx.DefaultPartitioner do
   ) do
     hash = Murmur.umurmur2(key)

-    partitions_count =
-      metadata |> MetadataResponse.partitions_for_topic(topic) |> length()
+    partitions_count = metadata |> MetadataResponse.partitions_for_topic(topic) |> length()

     partition_id = rem(hash, partitions_count)
     %{request | partition: partition_id}
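The hunk above is the heart of key-based partitioning: hash the message key, then take the hash modulo the partition count, so a given key always routes to the same partition. A rough sketch with invented values (`Murmur.umurmur2/1` is the same function from the `murmur` package used above):

    key = "user-42"
    partitions_count = 12

    # Deterministic: every message produced with this key maps to the
    # same partition, preserving per-key ordering.
    partition_id = rem(Murmur.umurmur2(key), partitions_count)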
6 changes: 2 additions & 4 deletions lib/kafka_ex/gen_consumer.ex
@@ -461,8 +461,7 @@ defmodule KafkaEx.GenConsumer do
           options
         ) :: GenServer.on_start()
   def start_link(consumer_module, group_name, topic, partition, opts \\ []) do
-    {server_opts, consumer_opts} =
-      Keyword.split(opts, [:debug, :name, :timeout, :spawn_opt])
+    {server_opts, consumer_opts} = Keyword.split(opts, [:debug, :name, :timeout, :spawn_opt])

     GenServer.start_link(
       __MODULE__,
@@ -563,8 +562,7 @@

     given_fetch_options = Keyword.get(opts, :fetch_options, [])

-    fetch_options =
-      Keyword.merge(default_fetch_options, given_fetch_options)
+    fetch_options = Keyword.merge(default_fetch_options, given_fetch_options)

     state = %State{
       consumer_module: consumer_module,
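A quick illustration of the option plumbing in those two hunks, with made-up options that are not from this repo: `Keyword.split/2` separates the GenServer-specific options from the consumer's own, and `Keyword.merge/2` lets caller-supplied fetch options win over the defaults.

    opts = [name: MyConsumer, commit_interval: 5_000]
    {server_opts, consumer_opts} = Keyword.split(opts, [:debug, :name, :timeout, :spawn_opt])
    # server_opts   => [name: MyConsumer]
    # consumer_opts => [commit_interval: 5_000]

    fetch_options = Keyword.merge([wait_time: 10_000], [wait_time: 0])
    # => [wait_time: 0] — the caller's value overrides the default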
7 changes: 2 additions & 5 deletions lib/kafka_ex/legacy_partitioner.ex
@@ -34,9 +34,7 @@ defmodule KafkaEx.LegacyPartitioner do
         assign_partition_with_key(request, metadata, key)

       {:error, reason} ->
-        Logger.warn(
-          "#{__MODULE__}: couldn't assign partition due to #{inspect(reason)}"
-        )
+        Logger.warn("#{__MODULE__}: couldn't assign partition due to #{inspect(reason)}")

         assign_partition_randomly(request, metadata)
     end
@@ -71,8 +69,7 @@ defmodule KafkaEx.LegacyPartitioner do
   ) do
     hash = Murmur.umurmur2_legacy(key)

-    partitions_count =
-      metadata |> MetadataResponse.partitions_for_topic(topic) |> length()
+    partitions_count = metadata |> MetadataResponse.partitions_for_topic(topic) |> length()

     partition_id = rem(hash, partitions_count)
     %{request | partition: partition_id}
3 changes: 1 addition & 2 deletions lib/kafka_ex/network_client.ex
@@ -64,8 +64,7 @@ defmodule KafkaEx.NetworkClient do
       :ok ->
         case Socket.recv(socket, 0, timeout) do
           {:ok, data} ->
-            :ok =
-              Socket.setopts(socket, [:binary, {:packet, 4}, {:active, true}])
+            :ok = Socket.setopts(socket, [:binary, {:packet, 4}, {:active, true}])

             data

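A note on the socket options in that hunk: `{:packet, 4}` makes the Erlang socket layer frame every message with a 4-byte big-endian length prefix — which matches Kafka's wire framing — and `{:active, true}` delivers subsequent frames to the owning process as messages. A toy sketch of the framing that option automates (hand-rolled, not from this repo):

    payload = "hello"
    framed = <<byte_size(payload)::32, payload::binary>>
    # => <<0, 0, 0, 5, "hello">> — the prefix {:packet, 4} adds on send
    #    and strips on receive.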
27 changes: 9 additions & 18 deletions lib/kafka_ex/new/adapter.ex
@@ -199,8 +199,7 @@ defmodule KafkaEx.New.Adapter do
           | _
         ]
       }) do
-    {message_set, last_offset} =
-      kayrock_message_set_to_kafka_ex(record_set, topic, partition)
+    {message_set, last_offset} = kayrock_message_set_to_kafka_ex(record_set, topic, partition)

     {[
        %FetchResponse{
@@ -267,8 +266,7 @@
       group_id: request.group_name,
       generation_id: request.generation_id,
       member_id: request.member_id,
-      group_assignment:
-        Enum.map(request.assignments, &kafka_ex_group_assignment_to_kayrock/1)
+      group_assignment: Enum.map(request.assignments, &kafka_ex_group_assignment_to_kayrock/1)
     }, request.group_name}
   end

@@ -280,8 +278,7 @@
       }) do
     %SyncGroupResponse{
       error_code: Kayrock.ErrorCode.code_to_atom(error_code),
-      assignments:
-        Enum.map(partition_assignments, fn p -> {p.topic, p.partitions} end)
+      assignments: Enum.map(partition_assignments, fn p -> {p.topic, p.partitions} end)
     }
   end

@@ -313,8 +310,7 @@
   def create_topics_request(requests, timeout) do
     %Kayrock.CreateTopics.V0.Request{
       timeout: timeout,
-      create_topic_requests:
-        Enum.map(requests, &kafka_ex_to_kayrock_create_topics/1)
+      create_topic_requests: Enum.map(requests, &kafka_ex_to_kayrock_create_topics/1)
     }
   end

@@ -373,11 +369,9 @@
   end

   def offset_fetch_request(offset_fetch_request, client_consumer_group) do
-    consumer_group =
-      offset_fetch_request.consumer_group || client_consumer_group
+    consumer_group = offset_fetch_request.consumer_group || client_consumer_group

-    request =
-      Kayrock.OffsetFetch.get_request_struct(offset_fetch_request.api_version)
+    request = Kayrock.OffsetFetch.get_request_struct(offset_fetch_request.api_version)

     {%{
        request
@@ -422,11 +416,9 @@
   end

   def offset_commit_request(offset_commit_request, client_consumer_group) do
-    consumer_group =
-      offset_commit_request.consumer_group || client_consumer_group
+    consumer_group = offset_commit_request.consumer_group || client_consumer_group

-    request =
-      Kayrock.OffsetCommit.get_request_struct(offset_commit_request.api_version)
+    request = Kayrock.OffsetCommit.get_request_struct(offset_commit_request.api_version)

     request = %{
       request
@@ -518,8 +510,7 @@
       topic: request.topic,
       num_partitions: request.num_partitions,
       replication_factor: request.replication_factor,
-      replica_assignment:
-        Enum.map(request.replica_assignment, &Map.from_struct/1),
+      replica_assignment: Enum.map(request.replica_assignment, &Map.from_struct/1),
       config_entries:
         Enum.map(request.config_entries, fn ce ->
           %{config_name: ce.config_name, config_value: ce.config_value}
21 changes: 7 additions & 14 deletions lib/kafka_ex/new/client.ex
@@ -126,8 +126,7 @@ defmodule KafkaEx.New.Client do
       Kernel.reraise(e, System.stacktrace())
     end

-    {:ok, _} =
-      :timer.send_interval(state.metadata_update_interval, :update_metadata)
+    {:ok, _} = :timer.send_interval(state.metadata_update_interval, :update_metadata)

     {:ok, state}
   end
@@ -159,15 +158,13 @@
   end

   def handle_call({:topic_metadata, topics, allow_topic_creation}, _from, state) do
-    {topic_metadata, updated_state} =
-      fetch_topics_metadata(state, topics, allow_topic_creation)
+    {topic_metadata, updated_state} = fetch_topics_metadata(state, topics, allow_topic_creation)

     {:reply, {:ok, topic_metadata}, updated_state}
   end

   def handle_call({:kayrock_request, request, node_selector}, _from, state) do
-    {response, updated_state} =
-      kayrock_network_request(request, node_selector, state)
+    {response, updated_state} = kayrock_network_request(request, node_selector, state)

     {:reply, response, updated_state}
   end
@@ -220,8 +217,7 @@
         updated_state

       _ ->
-        new_cluster_metadata =
-          ClusterMetadata.from_metadata_v1_response(response)
+        new_cluster_metadata = ClusterMetadata.from_metadata_v1_response(response)

         {updated_cluster_metadata, brokers_to_close} =
           ClusterMetadata.merge_brokers(
@@ -345,8 +341,7 @@
       end

       _ ->
-        message =
-          "Unable to fetch metadata from any brokers. Timeout is #{sync_timeout}."
+        message = "Unable to fetch metadata from any brokers. Timeout is #{sync_timeout}."

         Logger.log(:error, message)
         raise message
@@ -549,8 +544,7 @@
   end

   defp run_client_request(
-         %{client_id: client_id, correlation_id: correlation_id} =
-           client_request,
+         %{client_id: client_id, correlation_id: correlation_id} = client_request,
          send_request,
          synchronous
        )
@@ -694,8 +688,7 @@

     topic_metadata = State.topics_metadata(updated_state, topics)

-    {topic_metadata,
-     %{updated_state | allow_auto_topic_creation: allow_auto_topic_creation}}
+    {topic_metadata, %{updated_state | allow_auto_topic_creation: allow_auto_topic_creation}}
   end

   defp close_broker_by_socket(state, socket) do
6 changes: 2 additions & 4 deletions lib/kafka_ex/new/client/state.ex
@@ -41,8 +41,7 @@ defmodule KafkaEx.New.Client.State do
           :consumer_group_update_interval,
           @default_consumer_group_update_interval
         ),
-      allow_auto_topic_creation:
-        Keyword.get(args, :allow_auto_topic_creation, true),
+      allow_auto_topic_creation: Keyword.get(args, :allow_auto_topic_creation, true),
       use_ssl: Keyword.get(args, :use_ssl, false),
       ssl_options: Keyword.get(args, :ssl_options, []),
       consumer_group_for_auto_commit: Keyword.get(args, :consumer_group)
@@ -99,8 +98,7 @@
       ) do
     %{
       state
-      | cluster_metadata:
-          ClusterMetadata.remove_topics(cluster_metadata, topics)
+      | cluster_metadata: ClusterMetadata.remove_topics(cluster_metadata, topics)
     }
   end

9 changes: 3 additions & 6 deletions lib/kafka_ex/new/client_compatibility.ex
@@ -29,8 +29,7 @@ defmodule KafkaEx.New.ClientCompatibility do
   def handle_call({:metadata, topic}, _from, state) do
     updated_state = update_metadata(state, [topic])

-    {:reply, Adapter.metadata_response(updated_state.cluster_metadata),
-     updated_state}
+    {:reply, Adapter.metadata_response(updated_state.cluster_metadata), updated_state}
   end

   def handle_call({:offset, topic, partition, time}, _from, state) do
@@ -141,8 +140,7 @@
       {response, updated_state}
     end

-    {:reply, response,
-     %{state_out | allow_auto_topic_creation: allow_auto_topic_creation}}
+    {:reply, response, %{state_out | allow_auto_topic_creation: allow_auto_topic_creation}}
   end

   def handle_call({:join_group, request, network_timeout}, _from, state) do
@@ -339,8 +337,7 @@
         state

       _ ->
-        {_, updated_state} =
-          fetch_topics_metadata(state, topics, allow_topic_creation)
+        {_, updated_state} = fetch_topics_metadata(state, topics, allow_topic_creation)

         updated_state
     end
6 changes: 2 additions & 4 deletions lib/kafka_ex/new/structs/cluster_metadata.ex
@@ -146,8 +146,7 @@ defmodule KafkaEx.New.Structs.ClusterMetadata do
     brokers =
       metadata.brokers
       |> Enum.into(%{}, fn broker_metadata ->
-        %{host: host, port: port, node_id: node_id, rack: rack} =
-          broker_metadata
+        %{host: host, port: port, node_id: node_id, rack: rack} = broker_metadata

         {node_id, %Broker{host: host, port: port, node_id: node_id, rack: rack}}
       end)
@@ -240,8 +239,7 @@
           KafkaExAPI.node_id()
         ) :: t
   def put_consumer_group_coordinator(
-        %__MODULE__{consumer_group_coordinators: consumer_group_coordinators} =
-          cluster_metadata,
+        %__MODULE__{consumer_group_coordinators: consumer_group_coordinators} = cluster_metadata,
         consumer_group,
         coordinator_node_id
       ) do
3 changes: 1 addition & 2 deletions lib/kafka_ex/new/structs/topic.ex
@@ -32,8 +32,7 @@ defmodule KafkaEx.New.Structs.Topic do
         end
       )

-    partitions =
-      Enum.map(partition_metadata, &Partition.from_partition_metadata/1)
+    partitions = Enum.map(partition_metadata, &Partition.from_partition_metadata/1)

     %__MODULE__{
       name: name,
4 changes: 2 additions & 2 deletions lib/kafka_ex/protocol.ex
@@ -34,8 +34,8 @@ defmodule KafkaEx.Protocol do
         client_id,
         api_version \\ @default_api_version
       ) do
-    <<api_key(type)::16, api_version::16, correlation_id::32,
-      byte_size(client_id)::16, client_id::binary>>
+    <<api_key(type)::16, api_version::16, correlation_id::32, byte_size(client_id)::16,
+      client_id::binary>>
   end

   @error_map %{
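The binary being re-wrapped in that hunk is the standard Kafka request header: 16-bit api key, 16-bit api version, 32-bit correlation id, then a length-prefixed client id. A hand-rolled illustration with made-up values (api key 3 is Metadata in the Kafka protocol):

    api_key = 3
    api_version = 0
    correlation_id = 12
    client_id = "kafka_ex"

    header =
      <<api_key::16, api_version::16, correlation_id::32,
        byte_size(client_id)::16, client_id::binary>>

    byte_size(header)
    # => 18 (2 + 2 + 4 + 2 fixed bytes, plus the 8-byte client id)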
7 changes: 3 additions & 4 deletions lib/kafka_ex/protocol/api_versions.ex
@@ -62,8 +62,8 @@ defmodule KafkaEx.Protocol.ApiVersions do
   def parse_response(binary, this_api_version \\ @default_this_api_version)

   def parse_response(
-        <<_correlation_id::32-signed, error_code::16-signed,
-          api_versions_count::32-signed, rest::binary>>,
+        <<_correlation_id::32-signed, error_code::16-signed, api_versions_count::32-signed,
+          rest::binary>>,
         this_api_version
       ) do
     %{
@@ -100,8 +100,7 @@
   end

   defp parse_one_api_version(
-         <<api_key::16-signed, min_version::16-signed, max_version::16-signed,
-           rest::binary>>
+         <<api_key::16-signed, min_version::16-signed, max_version::16-signed, rest::binary>>
       ) do
     {%ApiVersion{
        api_key: api_key,
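Those function heads pattern-match the response straight off the wire. A minimal sketch of the same technique on a hand-built binary (values invented):

    binary = <<1::16-signed, 0::16-signed, 7::16-signed, "rest"::binary>>

    # Each ::16-signed segment consumes two bytes; rest::binary takes
    # whatever remains, ready for the next parsing step.
    <<api_key::16-signed, min_version::16-signed, max_version::16-signed, rest::binary>> = binary
    {api_key, min_version, max_version, rest}
    # => {1, 0, 7, "rest"}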
7 changes: 3 additions & 4 deletions lib/kafka_ex/protocol/common.ex
@@ -17,14 +17,13 @@ defmodule KafkaEx.Protocol.Common do

   def parse_topics(
         topics_size,
-        <<topic_size::16-signed, topic::size(topic_size)-binary,
-          partitions_size::32-signed, rest::binary>>,
+        <<topic_size::16-signed, topic::size(topic_size)-binary, partitions_size::32-signed,
+          rest::binary>>,
         mod
       ) do
     struct_module = Module.concat(mod, Response)

-    {partitions, topics_data} =
-      mod.parse_partitions(partitions_size, rest, [], topic)
+    {partitions, topics_data} = mod.parse_partitions(partitions_size, rest, [], topic)

     [
       %{