Skip to content

Commit

Permalink
Fix minor actions
Browse files Browse the repository at this point in the history
  • Loading branch information
Argonus committed Oct 16, 2024
1 parent ad3c022 commit 82ad81e
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 47 deletions.
50 changes: 36 additions & 14 deletions lib/kafka_ex.ex
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ defmodule KafkaEx do
def describe_group(consumer_group_name, opts \\ []) do
worker_name = Keyword.get(opts, :worker_name, Config.default_worker())

case Server.call(worker_name, {:describe_groups, [consumer_group_name]}) do
case Server.call(worker_name, {:describe_groups, [consumer_group_name], opts}) do
{:ok, [group]} -> {:ok, group}
{:error, error} -> {:error, error}
end
Expand Down Expand Up @@ -227,8 +227,9 @@ defmodule KafkaEx do
"""
@spec latest_offset(binary, integer, atom | pid) ::
[OffsetResponse.t()] | :topic_not_found
def latest_offset(topic, partition, name \\ Config.default_worker()),
do: offset(topic, partition, :latest, name)
def latest_offset(topic, partition, name \\ Config.default_worker()) do
offset(topic, partition, :latest, name)
end

@doc """
Get the offset of the earliest message still persistent in Kafka
Expand All @@ -240,10 +241,10 @@ defmodule KafkaEx do
[%KafkaEx.Protocol.Offset.Response{partition_offsets: [%{error_code: 0, offset: [0], partition: 0}], topic: "foo"}]
```
"""
@spec earliest_offset(binary, integer, atom | pid) ::
[OffsetResponse.t()] | :topic_not_found
def earliest_offset(topic, partition, name \\ Config.default_worker()),
do: offset(topic, partition, :earliest, name)
@spec earliest_offset(binary, integer, atom | pid) :: [OffsetResponse.t()] | :topic_not_found
def earliest_offset(topic, partition, name \\ Config.default_worker()) do
offset(topic, partition, :earliest, name)
end

@doc """
Get the offset of the message sent at the specified date/time
Expand All @@ -255,14 +256,14 @@ defmodule KafkaEx do
[%KafkaEx.Protocol.Offset.Response{partition_offsets: [%{error_code: 0, offset: [256], partition: 0}], topic: "foo"}]
```
"""
@spec offset(
binary,
number,
:calendar.datetime() | :earliest | :latest,
atom | pid
) :: [OffsetResponse.t()] | :topic_not_found
@type valid_timestamp :: :earliest | :latest | :calendar.datetime()
@spec offset(binary, number, valid_timestamp, atom | pid) :: [OffsetResponse.t()] | :topic_not_found
def offset(topic, partition, time, name \\ Config.default_worker()) do
Server.call(name, {:offset, topic, partition, time})
case Server.call(name, {:offset, topic, partition, time}) do
{:ok, response} -> parse_offset_value(response)
{:error, :topic_not_found} -> :topic_not_found
result -> result
end
end

@wait_time 10
Expand Down Expand Up @@ -812,4 +813,25 @@ defmodule KafkaEx do
end
end
end

# -------------------------------------------------------------------
# Backwards compatibility
# -------------------------------------------------------------------
defp parse_offset_value(
[%KafkaEx.New.Structs.Offset{topic: topic, partition_offsets: partition_offsets} | _] = offsets

Check warning on line 821 in lib/kafka_ex.ex

View workflow job for this annotation

GitHub Actions / runner / Test (1.12, 22.3)

variable "partition_offsets" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 821 in lib/kafka_ex.ex

View workflow job for this annotation

GitHub Actions / runner / Test (1.12, 22.3)

variable "topic" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 821 in lib/kafka_ex.ex

View workflow job for this annotation

GitHub Actions / runner / Test (1.14, 24.3)

variable "partition_offsets" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 821 in lib/kafka_ex.ex

View workflow job for this annotation

GitHub Actions / runner / Test (1.14, 24.3)

variable "topic" is unused (if the variable is not meant to be used, prefix it with an underscore)
) do
Enum.map(offsets, fn offset ->
%OffsetResponse{
topic: offset.topic,
partition_offsets:
Enum.map(offset.partition_offsets, fn value ->
%{
partition: value.partition,
error_code: value.error_code,
offset: [value.offset]
}
end)
}
end)
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ defimpl KafkaEx.New.Protocols.Kayrock.ListOffsets.Response, for: Kayrock.ListOff
end

defp build_offset(topic, %{partition: partition, error_code: 0, offsets: []}) do
data = %{partition: partition, offset: 0}
data = %{partition: partition, offset: 0, error_code: :no_error}
{:ok, KafkaEx.New.Structs.Offset.from_list_offset(topic, [data])}
end

defp build_offset(topic, %{partition: partition, error_code: 0, offsets: [offset | _]}) do
data = %{partition: partition, offset: offset}
data = %{partition: partition, offset: offset, error_code: :no_error}
{:ok, KafkaEx.New.Structs.Offset.from_list_offset(topic, [data])}
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ defimpl KafkaEx.New.Protocols.Kayrock.ListOffsets.Response, for: Kayrock.ListOff
end

defp build_offset(topic, %{error_code: 0, partition: p, offset: o}) do
data = %{partition: p, offset: o}
data = %{partition: p, offset: o, error_code: :no_error}
{:ok, KafkaEx.New.Structs.Offset.from_list_offset(topic, [data])}
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ defimpl KafkaEx.New.Protocols.Kayrock.ListOffsets.Response, for: Kayrock.ListOff
end

defp build_offset(topic, %{error_code: 0, partition: p, offset: o, timestamp: t}) do
data = %{partition: p, offset: o, timestamp: t}
data = %{partition: p, offset: o, timestamp: t, error_code: :no_error}
{:ok, KafkaEx.New.Structs.Offset.from_list_offset(topic, [data])}
end

Expand Down
12 changes: 8 additions & 4 deletions lib/kafka_ex/new/structs/offset/partition_offset.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@ defmodule KafkaEx.New.Structs.Offset.PartitionOffset do
@moduledoc """
This module represents Offset value for a specific partition
"""
defstruct [:partition, :offset, :timestamp]
defstruct [:partition, :offset, :error_code, :timestamp]

@type partition :: KafkaEx.Types.partition()
@type offset :: KafkaEx.Types.offset()
@type timestamp :: KafkaEx.Types.timestamp()
@type error_code :: KafkaEx.Types.error_code()

@type partition_response :: %{
required(:partition) => partition,
required(:error_code) => error_code,
required(:offset) => offset,
optional(:timestamp) => timestamp
}
Expand All @@ -24,12 +26,14 @@ defmodule KafkaEx.New.Structs.Offset.PartitionOffset do
For backward compatibility with kafka_ex, we will replace this nil values with -1
"""
@spec build(partition_response) :: __MODULE__.t()
def build(%{partition: p, offset: o, timestamp: t}), do: do_build(p, o, t)
def build(%{partition: p, offset: o}), do: do_build(p, o, -1)
def build(%{partition: p, offset: o, error_code: e, timestamp: t}), do: do_build(p, o, e, t)
def build(%{partition: p, offset: o, error_code: e}), do: do_build(p, o, e, -1)
def build(%{partition: p, offset: o}), do: do_build(p, o, :no_error, -1)

defp do_build(partition, offset, timestamp) do
defp do_build(partition, offset, error_code, timestamp) do
%__MODULE__{
partition: partition,
error_code: error_code,
offset: offset,
timestamp: timestamp
}
Expand Down
30 changes: 5 additions & 25 deletions test/integration/kayrock/compatibility_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,7 @@ defmodule KafkaEx.KayrockCompatibilityTest do
{:ok, %{consumer_group: consumer_group, topic: topic}}
end

test "with new client - returns group metadata", %{
client: client,
consumer_group: consumer_group,
topic: topic
} do
test "with new client - returns group metadata", %{client: client, consumer_group: consumer_group, topic: topic} do
join_to_group(client, topic, consumer_group)

{:ok, group_metadata} = KafkaExAPI.describe_group(client, consumer_group)
Expand All @@ -57,11 +53,7 @@ defmodule KafkaEx.KayrockCompatibilityTest do
assert length(group_metadata.members) == 1
end

test "with old client - returns group metadata", %{
client: client,
consumer_group: consumer_group,
topic: topic
} do
test "with old client - returns group metadata", %{client: client, consumer_group: consumer_group, topic: topic} do
join_to_group(client, topic, consumer_group)

{:ok, group_metadata} = KafkaEx.describe_group(consumer_group, worker_name: client)
Expand Down Expand Up @@ -109,21 +101,11 @@ defmodule KafkaEx.KayrockCompatibilityTest do
end

test "produce/4 without an acq required returns :ok", %{client: client} do
assert KafkaEx.produce("food", 0, "hey",
worker_name: client,
required_acks: 0
) == :ok
assert KafkaEx.produce("food", 0, "hey", worker_name: client, required_acks: 0) == :ok
end

test "produce/4 with ack required returns an ack", %{client: client} do
{:ok, offset} =
KafkaEx.produce(
"food",
0,
"hey",
worker_name: client,
required_acks: 1
)
{:ok, offset} = KafkaEx.produce("food",0,"hey",worker_name: client,required_acks: 1)

assert is_integer(offset)
refute offset == nil
Expand Down Expand Up @@ -306,9 +288,7 @@ defmodule KafkaEx.KayrockCompatibilityTest do
assert offset == 0
end

test "latest_offset retrieves offset of 0 for non-existing topic", %{
client: client
} do
test "latest_offset retrieves offset of 0 for non-existing topic", %{client: client} do
random_string = KafkaEx.TestHelpers.generate_random_string()

{:ok, produce_offset} =
Expand Down

0 comments on commit 82ad81e

Please sign in to comment.