Skip to content

Commit

Permalink
Add additional layer to parse protocols & add request builder
Browse files Browse the repository at this point in the history
  • Loading branch information
Argonus committed Oct 8, 2023
1 parent 01af8f4 commit b8b6654
Show file tree
Hide file tree
Showing 7 changed files with 160 additions and 4 deletions.
36 changes: 36 additions & 0 deletions lib/kafka_ex/new/client/request_builder.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
defmodule KafkaEx.New.Client.RequestBuilder do
@moduledoc """
This module is used to build request for KafkaEx.New.Client.
It's main decision point which protocol to use for building request and what
is required version.
"""
@protocol Application.get_env(

Check warning on line 7 in lib/kafka_ex/new/client/request_builder.ex

View workflow job for this annotation

GitHub Actions / runner / Test (1.14, 25.2)

Application.get_env/3 is discouraged in the module body, use Application.compile_env/3 instead
:kafka_ex,
:protocol,
KafkaEx.New.Protocols.KayrockProtocol
)

@default_api_version %{
describe_groups: 1
}

alias KafkaEx.New.Client.State

@doc """
Builds request for Describe Groups API
"""
@spec describe_groups_request([binary], State.t()) :: term
def describe_groups_request(group_names, state) do
api_version = get_api_version(state, :describe_groups)

@protocol.build_request(:describe_groups, api_version,
group_names: group_names
)
end

# -----------------------------------------------------------------------------
defp get_api_version(state, request_type) do
default = Map.fetch!(@default_api_version, request_type)
State.max_supported_api_version(state, request_type, default)
end
end
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
defprotocol KafkaEx.New.Protocols.DescribeGroups do
defprotocol KafkaEx.New.Protocols.Kayrock.DescribeGroups do

Check warning on line 1 in lib/kafka_ex/new/protocols/kayrock/describe_groups.ex

View workflow job for this annotation

GitHub Actions / runner / Test (1.14, 25.2)

protocols must define at least one function, but none was defined
@moduledoc """
This module handles Describe Groups request & response handling & parsing
This module handles Describe Groups request & response parsing.
Request is built using Kayrock protocol, response is parsed to
native KafkaEx structs.
"""

defprotocol Request do
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defimpl KafkaEx.New.Protocols.DescribeGroups.Request,
defimpl KafkaEx.New.Protocols.Kayrock.DescribeGroups.Request,
for: [Kayrock.DescribeGroups.V0.Request, Kayrock.DescribeGroups.V1.Request] do
def build_request(request_template, consumer_group_names) do
Map.put(request_template, :group_ids, consumer_group_names)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defimpl KafkaEx.New.Protocols.DescribeGroups.Response,
defimpl KafkaEx.New.Protocols.Kayrock.DescribeGroups.Response,
for: [Kayrock.DescribeGroups.V0.Response, Kayrock.DescribeGroups.V1.Response] do
def parse_response(%{groups: groups}) do
case Enum.filter(groups, &(&1.error_code != 0)) do
Expand Down
20 changes: 20 additions & 0 deletions lib/kafka_ex/new/protocols/kayrock_protocol.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
defmodule KafkaEx.New.Protocols.KayrockProtocol do
@moduledoc """
This module handles Kayrock request & response handling & parsing.
Once Kafka Ex v1.0 is released, this module will be renamed to KayrockProtocol
and will become a separated package.
"""

alias KafkaEx.New.Protocols.Kayrock, as: KayrockProtocol

@doc """
Builds request for Describe Groups API
"""
def build_request(:describe_groups, api_version, opts) do
group_names = Keyword.fetch!(opts, :group_names)

api_version
|> Kayrock.DescribeGroups.get_request_struct()
|> KayrockProtocol.DescribeGroups.Request.build_request(group_names)
end
end
17 changes: 17 additions & 0 deletions test/kafka_ex/new/client/request_builder_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
defmodule KafkaEx.New.Client.RequestBuilderTest do
use ExUnit.Case, async: true
alias KafkaEx.New.Client.RequestBuilder

describe "describe_groups_request/2" do
test "returns request for DescribeGroups API" do
state = %KafkaEx.New.Client.State{api_versions: %{describe_groups: 1}}
group_names = ["group1", "group2"]

assert RequestBuilder.describe_groups_request(group_names, state) ==
{:ok,
%KafkaEx.New.Protocols.DescribeGroups.Request{

Check failure on line 12 in test/kafka_ex/new/client/request_builder_test.exs

View workflow job for this annotation

GitHub Actions / runner / Test (1.14, 25.2)

** (CompileError) test/kafka_ex/new/client/request_builder_test.exs:12: KafkaEx.New.Protocols.DescribeGroups.Request.__struct__/1 is undefined, cannot expand struct KafkaEx.New.Protocols.DescribeGroups.Request. Make sure the struct name is correct. If the struct name exists and is correct but it still cannot be found, you likely have cyclic module usage in your code

Check failure on line 12 in test/kafka_ex/new/client/request_builder_test.exs

View workflow job for this annotation

GitHub Actions / runner / Test (1.13, 24.3)

** (CompileError) test/kafka_ex/new/client/request_builder_test.exs:12: KafkaEx.New.Protocols.DescribeGroups.Request.__struct__/1 is undefined, cannot expand struct KafkaEx.New.Protocols.DescribeGroups.Request. Make sure the struct name is correct. If the struct name exists and is correct but it still cannot be found, you likely have cyclic module usage in your code

Check failure on line 12 in test/kafka_ex/new/client/request_builder_test.exs

View workflow job for this annotation

GitHub Actions / runner / Test (1.8, 20.3)

** (CompileError) test/kafka_ex/new/client/request_builder_test.exs:12: KafkaEx.New.Protocols.DescribeGroups.Request.__struct__/1 is undefined, cannot expand struct KafkaEx.New.Protocols.DescribeGroups.Request

Check failure on line 12 in test/kafka_ex/new/client/request_builder_test.exs

View workflow job for this annotation

GitHub Actions / runner / Test (1.11, 21.3)

** (CompileError) test/kafka_ex/new/client/request_builder_test.exs:12: KafkaEx.New.Protocols.DescribeGroups.Request.__struct__/1 is undefined, cannot expand struct KafkaEx.New.Protocols.DescribeGroups.Request. Make sure the struct name is correct. If the struct name exists and is correct but it still cannot be found, you likely have cyclic module usage in your code
group_names: group_names
}}
end
end
end
81 changes: 81 additions & 0 deletions test/kafka_ex/new/protocols/kayrock/describe_groups_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
defmodule KafkaEx.New.Protocols.Kayrock.DescribeGroupsTest do
use ExUnit.Case, async: true

alias KafkaEx.New.Protocols.Kayrock.DescribeGroups, as: KayrockDescribeGroups

alias Kayrock.DescribeGroups.V0

describe "build_request/2" do
test "builds request for Describe Groups API" do
api_version = 0
consumer_group_names = ["test-group"]
expected_request = %V0.Request{group_ids: groups}

assert KayrockDescribeGroups.build_request(
api_version,
consumer_group_names
) == expected_request
end
end

describe "build_response/1" do
test "for api version 0 - returns response if all groups succeeded" do
response = %V0.Response{
groups: [
%{
group_id: "succeeded",
error_code: 0,
state: "stable",
protocol_type: "protocol_type",
protocol: "protocol",
members: [
%{
member_id: "member_id",
client_id: "client_id",
client_host: "client_host",
member_metadata: "member_metadata",
member_assignment: %{
version: 0,
user_data: "user_data",
partition_assignments: [
%{topic: "test-topic", partitions: [1, 2, 3]}
]
}
}
]
}
]
}

assert {:ok,
[
%KafkaEx.New.ConsumerGroup{
group_id: "succeeded",
state: "stable",
protocol_type: "protocol_type",
protocol: "protocol",
members: [
%KafkaEx.New.ConsumerGroup.Member{
member_id: "member_id",
client_id: "client_id",
client_host: "client_host",
member_metadata: "member_metadata",
member_assignment:
%KafkaEx.New.ConsumerGroup.Member.MemberAssignment{
version: 0,
user_data: "user_data",
partition_assignments: [
%KafkaEx.New.ConsumerGroup.Member.MemberAssignment.PartitionAssignment{
topic: "test-topic",
partitions: [1, 2, 3]
}
]
}
}
]
}
]} ==
DescribeGroups.Response.parse_response(response)
end
end
end

0 comments on commit b8b6654

Please sign in to comment.