From 65a8d135a41c6071844e673df3f895e62689213d Mon Sep 17 00:00:00 2001 From: Argonus Date: Sat, 18 Mar 2023 14:30:26 +0100 Subject: [PATCH 01/11] [Kayrock Preparation] Unify file nesting between different dirs --- lib/kafka_ex/new/{ => structs}/broker.ex | 10 ++- .../new/{ => structs}/cluster_metadata.ex | 10 ++- .../new/{ => structs}/node_selector.ex | 25 ++++-- lib/kafka_ex/new/{ => structs}/partition.ex | 7 +- lib/kafka_ex/new/{ => structs}/topic.ex | 7 +- test/{ => kafka_ex}/compression_test.exs | 4 +- .../protocol/consumer_metadata_test.exs | 0 .../protocol/delete_topics_test.exs | 0 test/{ => kafka_ex}/protocol/fetch_test.exs | 0 .../protocol/heartbeat_test.exs | 0 .../protocol/join_group_test.exs | 0 .../protocol/leave_group_test.exs | 0 .../{ => kafka_ex}/protocol/metadata_test.exs | 0 .../protocol/offset_commit_test.exs | 0 .../protocol/offset_fetch_test.exs | 0 test/{ => kafka_ex}/protocol/offset_test.exs | 0 test/{ => kafka_ex}/protocol/produce_test.exs | 0 .../protocol/sync_group_test.exs | 0 test/{ => kafka_ex}/socket_test.exs | 81 ------------------- 19 files changed, 45 insertions(+), 99 deletions(-) rename lib/kafka_ex/new/{ => structs}/broker.ex (68%) rename lib/kafka_ex/new/{ => structs}/cluster_metadata.ex (98%) rename lib/kafka_ex/new/{ => structs}/node_selector.ex (70%) rename lib/kafka_ex/new/{ => structs}/partition.ex (74%) rename lib/kafka_ex/new/{ => structs}/topic.ex (81%) rename test/{ => kafka_ex}/compression_test.exs (94%) rename test/{ => kafka_ex}/protocol/consumer_metadata_test.exs (100%) rename test/{ => kafka_ex}/protocol/delete_topics_test.exs (100%) rename test/{ => kafka_ex}/protocol/fetch_test.exs (100%) rename test/{ => kafka_ex}/protocol/heartbeat_test.exs (100%) rename test/{ => kafka_ex}/protocol/join_group_test.exs (100%) rename test/{ => kafka_ex}/protocol/leave_group_test.exs (100%) rename test/{ => kafka_ex}/protocol/metadata_test.exs (100%) rename test/{ => kafka_ex}/protocol/offset_commit_test.exs (100%) rename test/{ => kafka_ex}/protocol/offset_fetch_test.exs (100%) rename test/{ => kafka_ex}/protocol/offset_test.exs (100%) rename test/{ => kafka_ex}/protocol/produce_test.exs (100%) rename test/{ => kafka_ex}/protocol/sync_group_test.exs (100%) rename test/{ => kafka_ex}/socket_test.exs (59%) diff --git a/lib/kafka_ex/new/broker.ex b/lib/kafka_ex/new/structs/broker.ex similarity index 68% rename from lib/kafka_ex/new/broker.ex rename to lib/kafka_ex/new/structs/broker.ex index 70a17283..67810f00 100644 --- a/lib/kafka_ex/new/broker.ex +++ b/lib/kafka_ex/new/structs/broker.ex @@ -1,6 +1,6 @@ defmodule KafkaEx.New.Broker do @moduledoc """ - Encapsulates what we know about a broker + Encapsulates what we know about a broker and our connection """ alias KafkaEx.Socket @@ -11,7 +11,13 @@ defmodule KafkaEx.New.Broker do socket: nil, rack: nil - @type t :: %__MODULE__{} + @type t :: %__MODULE__{ + node_id: non_neg_integer, + host: binary, + port: non_neg_integer, + socket: Socket.t() | nil, + rack: binary + } @doc false def put_socket(%__MODULE__{} = broker, socket), do: %{broker | socket: socket} diff --git a/lib/kafka_ex/new/cluster_metadata.ex b/lib/kafka_ex/new/structs/cluster_metadata.ex similarity index 98% rename from lib/kafka_ex/new/cluster_metadata.ex rename to lib/kafka_ex/new/structs/cluster_metadata.ex index 925a51ee..a43182ad 100644 --- a/lib/kafka_ex/new/cluster_metadata.ex +++ b/lib/kafka_ex/new/structs/cluster_metadata.ex @@ -231,8 +231,9 @@ defmodule KafkaEx.New.ClusterMetadata do %{cluster_metadata | brokers: updated_brokers} 
end - @doc false - # update a consumer group coordinator node id + @doc """ + update a consumer group coordinator node id + """ @spec put_consumer_group_coordinator( t, KafkaExAPI.consumer_group_name(), @@ -255,8 +256,9 @@ defmodule KafkaEx.New.ClusterMetadata do } end - @doc false - # remove the given topics (e.g., when they are deleted) + @doc """ + remove the given topics (e.g., when they are deleted) + """ @spec remove_topics(t, [KafkaExAPI.topic_name()]) :: t def remove_topics( %__MODULE__{topics: topics} = cluster_metadata, diff --git a/lib/kafka_ex/new/node_selector.ex b/lib/kafka_ex/new/structs/node_selector.ex similarity index 70% rename from lib/kafka_ex/new/node_selector.ex rename to lib/kafka_ex/new/structs/node_selector.ex index 00d4a2ad..62e7dd28 100644 --- a/lib/kafka_ex/new/node_selector.ex +++ b/lib/kafka_ex/new/structs/node_selector.ex @@ -4,7 +4,6 @@ defmodule KafkaEx.New.NodeSelector do """ alias KafkaEx.New.KafkaExAPI - alias KafkaEx.New.NodeSelector defstruct strategy: nil, node_id: nil, @@ -12,33 +11,43 @@ defmodule KafkaEx.New.NodeSelector do partition: nil, consumer_group_name: nil - @type t :: %__MODULE__{} + @type valid_strategy :: + :node_id + | :random + | :first_available + | :controller + | :topic_partition + | :consumer_group + @type t :: %__MODULE__{ + strategy: valid_strategy, + node_id: non_neg_integer + } @doc """ Select a specific node """ @spec node_id(KafkaExAPI.node_id()) :: t def node_id(node_id) when is_integer(node_id) do - %NodeSelector{strategy: :node_id, node_id: node_id} + %__MODULE__{strategy: :node_id, node_id: node_id} end @doc """ Select a random node """ @spec random :: t - def random, do: %NodeSelector{strategy: :random} + def random, do: %__MODULE__{strategy: :random} @doc """ Select first available node """ @spec first_available :: t - def first_available, do: %NodeSelector{strategy: :first_available} + def first_available, do: %__MODULE__{strategy: :first_available} @doc """ Select the cluster's controller node """ @spec controller :: t - def controller, do: %NodeSelector{strategy: :controller} + def controller, do: %__MODULE__{strategy: :controller} @doc """ Select the controller for the given topic and partition @@ -46,7 +55,7 @@ defmodule KafkaEx.New.NodeSelector do @spec topic_partition(KafkaExAPI.topic_name(), KafkaExAPI.partition_id()) :: t def topic_partition(topic, partition) when is_binary(topic) and is_integer(partition) do - %NodeSelector{ + %__MODULE__{ strategy: :topic_partition, topic: topic, partition: partition @@ -58,7 +67,7 @@ defmodule KafkaEx.New.NodeSelector do """ @spec consumer_group(KafkaExAPI.consumer_group_name()) :: t def consumer_group(consumer_group_name) when is_binary(consumer_group_name) do - %NodeSelector{ + %__MODULE__{ strategy: :consumer_group, consumer_group_name: consumer_group_name } diff --git a/lib/kafka_ex/new/partition.ex b/lib/kafka_ex/new/structs/partition.ex similarity index 74% rename from lib/kafka_ex/new/partition.ex rename to lib/kafka_ex/new/structs/partition.ex index 468acb3f..7bf8bdd0 100644 --- a/lib/kafka_ex/new/partition.ex +++ b/lib/kafka_ex/new/structs/partition.ex @@ -5,7 +5,12 @@ defmodule KafkaEx.New.Partition do defstruct partition_id: nil, leader: -1, replicas: [], isr: [] - @type t :: %__MODULE__{} + @type t :: %__MODULE__{ + partition_id: integer, + leader: integer, + replicas: list(integer), + isr: list(integer) + } @doc false def from_partition_metadata(%{ diff --git a/lib/kafka_ex/new/topic.ex b/lib/kafka_ex/new/structs/topic.ex similarity index 81% rename from 
lib/kafka_ex/new/topic.ex rename to lib/kafka_ex/new/structs/topic.ex index e39bc1e6..27eb4233 100644 --- a/lib/kafka_ex/new/topic.ex +++ b/lib/kafka_ex/new/structs/topic.ex @@ -10,7 +10,12 @@ defmodule KafkaEx.New.Topic do is_internal: false, partitions: [] - @type t :: %__MODULE__{} + @type t :: %__MODULE__{ + name: String.t(), + partition_leaders: %{integer() => integer()}, + is_internal: boolean(), + partitions: [Partition.t()] + } @doc false def from_topic_metadata(%{ diff --git a/test/compression_test.exs b/test/kafka_ex/compression_test.exs similarity index 94% rename from test/compression_test.exs rename to test/kafka_ex/compression_test.exs index 227dea5f..ad2c33a4 100644 --- a/test/compression_test.exs +++ b/test/kafka_ex/compression_test.exs @@ -1,5 +1,5 @@ -defmodule CompressionTest do - use ExUnit.Case +defmodule KafkaEx.CompressionTest do + use ExUnit.Case, async: true test "snappy decompression works with chunked messages" do data = diff --git a/test/protocol/consumer_metadata_test.exs b/test/kafka_ex/protocol/consumer_metadata_test.exs similarity index 100% rename from test/protocol/consumer_metadata_test.exs rename to test/kafka_ex/protocol/consumer_metadata_test.exs diff --git a/test/protocol/delete_topics_test.exs b/test/kafka_ex/protocol/delete_topics_test.exs similarity index 100% rename from test/protocol/delete_topics_test.exs rename to test/kafka_ex/protocol/delete_topics_test.exs diff --git a/test/protocol/fetch_test.exs b/test/kafka_ex/protocol/fetch_test.exs similarity index 100% rename from test/protocol/fetch_test.exs rename to test/kafka_ex/protocol/fetch_test.exs diff --git a/test/protocol/heartbeat_test.exs b/test/kafka_ex/protocol/heartbeat_test.exs similarity index 100% rename from test/protocol/heartbeat_test.exs rename to test/kafka_ex/protocol/heartbeat_test.exs diff --git a/test/protocol/join_group_test.exs b/test/kafka_ex/protocol/join_group_test.exs similarity index 100% rename from test/protocol/join_group_test.exs rename to test/kafka_ex/protocol/join_group_test.exs diff --git a/test/protocol/leave_group_test.exs b/test/kafka_ex/protocol/leave_group_test.exs similarity index 100% rename from test/protocol/leave_group_test.exs rename to test/kafka_ex/protocol/leave_group_test.exs diff --git a/test/protocol/metadata_test.exs b/test/kafka_ex/protocol/metadata_test.exs similarity index 100% rename from test/protocol/metadata_test.exs rename to test/kafka_ex/protocol/metadata_test.exs diff --git a/test/protocol/offset_commit_test.exs b/test/kafka_ex/protocol/offset_commit_test.exs similarity index 100% rename from test/protocol/offset_commit_test.exs rename to test/kafka_ex/protocol/offset_commit_test.exs diff --git a/test/protocol/offset_fetch_test.exs b/test/kafka_ex/protocol/offset_fetch_test.exs similarity index 100% rename from test/protocol/offset_fetch_test.exs rename to test/kafka_ex/protocol/offset_fetch_test.exs diff --git a/test/protocol/offset_test.exs b/test/kafka_ex/protocol/offset_test.exs similarity index 100% rename from test/protocol/offset_test.exs rename to test/kafka_ex/protocol/offset_test.exs diff --git a/test/protocol/produce_test.exs b/test/kafka_ex/protocol/produce_test.exs similarity index 100% rename from test/protocol/produce_test.exs rename to test/kafka_ex/protocol/produce_test.exs diff --git a/test/protocol/sync_group_test.exs b/test/kafka_ex/protocol/sync_group_test.exs similarity index 100% rename from test/protocol/sync_group_test.exs rename to test/kafka_ex/protocol/sync_group_test.exs diff --git 
a/test/socket_test.exs b/test/kafka_ex/socket_test.exs similarity index 59% rename from test/socket_test.exs rename to test/kafka_ex/socket_test.exs index b4ff1dfe..126ede1a 100644 --- a/test/socket_test.exs +++ b/test/kafka_ex/socket_test.exs @@ -1,87 +1,6 @@ defmodule KafkaEx.Socket.Test do use ExUnit.Case, async: false - defmodule Server do - def start(port) do - {:ok, listen_socket} = - :gen_tcp.listen(port, [ - :binary, - {:active, false}, - {:reuseaddr, true}, - {:packet, 0} - ]) - - spawn_link(fn -> listen(listen_socket) end) - end - - defp listen(socket) do - {:ok, conn} = :gen_tcp.accept(socket) - spawn_link(fn -> recv(conn) end) - listen(socket) - end - - defp recv(conn) do - case :gen_tcp.recv(conn, 0) do - {:ok, data} -> - :ok = :gen_tcp.send(conn, data) - - {:error, :closed} -> - :ok - end - end - end - - defmodule SSLServer do - def start(port) do - {:ok, listen_socket} = - :ssl.listen(port, [ - :binary, - {:verify, :verify_none}, - {:active, false}, - {:reuseaddr, true}, - {:packet, 0}, - {:certfile, 'test/fixtures/server.crt'}, - {:keyfile, 'test/fixtures/server.key'} - ]) - - spawn_link(fn -> listen(listen_socket) end) - end - - defp listen(socket) do - case :ssl.transport_accept(socket) do - {:ok, conn} -> - if otp_version_21_plus?() do - {:ok, _socket} = :ssl.handshake(conn) - else - :ok = :ssl.ssl_accept(conn) - end - - pid = spawn_link(fn -> recv(conn) end) - :ssl.controlling_process(socket, pid) - - _ -> - :ok - end - - listen(socket) - end - - defp recv(conn) do - case :ssl.recv(conn, 0) do - {:ok, data} -> - :ok = :ssl.send(conn, data) - - {:error, :closed} -> - :ok - end - end - - defp otp_version_21_plus? do - {version, _} = System.otp_release() |> Float.parse() - version >= 21 - end - end - setup_all do :ssl.start() end From d5bd1c320b1a5996e4de0390555827caa3a776a7 Mon Sep 17 00:00:00 2001 From: Argonus Date: Sat, 18 Mar 2023 14:32:13 +0100 Subject: [PATCH 02/11] [Kayrock Preparation] Add unit tests for structs --- test/kafka_ex/new/structs/broker_test.exs | 86 ++++++ .../new/structs/cluster_metadata_test.exs | 270 ++++++++++++++++++ .../new/structs/node_selector_test.exs | 83 ++++++ test/kafka_ex/new/structs/partiton_test.exs | 43 +++ test/kafka_ex/new/structs/topic_test.exs | 52 ++++ 5 files changed, 534 insertions(+) create mode 100644 test/kafka_ex/new/structs/broker_test.exs create mode 100644 test/kafka_ex/new/structs/cluster_metadata_test.exs create mode 100644 test/kafka_ex/new/structs/node_selector_test.exs create mode 100644 test/kafka_ex/new/structs/partiton_test.exs create mode 100644 test/kafka_ex/new/structs/topic_test.exs diff --git a/test/kafka_ex/new/structs/broker_test.exs b/test/kafka_ex/new/structs/broker_test.exs new file mode 100644 index 00000000..29d94d34 --- /dev/null +++ b/test/kafka_ex/new/structs/broker_test.exs @@ -0,0 +1,86 @@ +defmodule KafkaEx.New.BrokerTest do + use ExUnit.Case, async: true + + alias KafkaEx.New.Broker + + setup do + pid = Server.start(3040) + + {:ok, socket} = + KafkaEx.Socket.create('localhost', 3040, [:binary, {:packet, 0}], false) + + on_exit(fn -> + KafkaEx.Socket.close(socket) + Process.exit(pid, :normal) + end) + + {:ok, [socket: socket]} + end + + describe "connect_broker/1" do + @tag skip: true + test "connects broker via socket" do + end + end + + describe "put_socket/2" do + test "nullify broker socket" do + broker = %Broker{socket: %KafkaEx.Socket{}} |> Broker.put_socket(nil) + + assert is_nil(broker.socket) + end + + test "updates broker socket to new one" do + socket = %KafkaEx.Socket{} + broker = 
%Broker{socket: nil} |> Broker.put_socket(socket) + + assert broker.socket == socket + end + end + + describe "connected?/1" do + test "returns false if socket is nil" do + broker = %Broker{socket: nil} + + refute Broker.connected?(broker) + end + + test "returns false if socket is not connected" do + broker = %Broker{socket: nil} + + refute Broker.connected?(broker) + end + + test "returns true if socket is connected", %{socket: socket} do + broker = %Broker{socket: socket} + + assert Broker.connected?(broker) + end + end + + describe "has_socket?/1" do + test "returns false if broker doesn't have a socket" do + broker = %Broker{socket: nil} + socket = %KafkaEx.Socket{} + + refute Broker.has_socket?(broker, socket) + end + + test "returns false if broker has different socket", %{socket: socket_one} do + {:ok, socket_two} = + KafkaEx.Socket.create('localhost', 3040, [:binary, {:packet, 0}], false) + + broker = %Broker{socket: nil} |> Broker.put_socket(socket_one) + + refute Broker.has_socket?(broker, socket_two.socket) + KafkaEx.Socket.close(socket_two) + end + + test "returns true if broker has same socket", %{socket: socket} do + socket = %KafkaEx.Socket{} + broker = %Broker{socket: nil} |> Broker.put_socket(socket) + + assert Broker.has_socket?(broker, socket.socket) + end + end +end diff --git a/test/kafka_ex/new/structs/cluster_metadata_test.exs b/test/kafka_ex/new/structs/cluster_metadata_test.exs new file mode 100644 index 00000000..ff773503 --- /dev/null +++ b/test/kafka_ex/new/structs/cluster_metadata_test.exs @@ -0,0 +1,270 @@ +defmodule KafkaEx.New.ClusterMetadataTest do + use ExUnit.Case, async: true + alias KafkaEx.New.ClusterMetadata + + describe "from_metadata_v1_response/1" do + @tag skip: true + test "builds cluster metadata based on v1 response" do + end + end + + describe "known_topics/1" do + test "return list of all known topics" do + topic = %KafkaEx.New.Topic{name: "test-topic"} + cluster = %ClusterMetadata{topics: %{topic.name => topic}} + + assert ClusterMetadata.known_topics(cluster) == ["test-topic"] + end + end + + describe "topics_metadata/1" do + test "returns metadata for topics we've asked for" do + topic_1 = %KafkaEx.New.Topic{name: "test-topic-one"} + topic_2 = %KafkaEx.New.Topic{name: "test-topic-two"} + + cluster = %ClusterMetadata{ + topics: %{ + topic_1.name => topic_1, + topic_2.name => topic_2 + } + } + + assert ClusterMetadata.topics_metadata(cluster, ["test-topic-one"]) == [ + topic_1 + ] + end + end + + describe "brokers/1" do + test "returns list of brokers" do + broker = %KafkaEx.New.Broker{node_id: 1} + cluster = %ClusterMetadata{brokers: %{1 => broker}} + + assert ClusterMetadata.brokers(cluster) == [broker] + end + end + + describe "select_node/2" do + test "returns random node" do + broker = %KafkaEx.New.Broker{node_id: 1} + node_selector = KafkaEx.New.NodeSelector.random() + cluster = %ClusterMetadata{brokers: %{1 => broker}} + + assert ClusterMetadata.select_node(cluster, node_selector) == + {:ok, broker.node_id} + end + + test "returns controller node" do + broker_1 = %KafkaEx.New.Broker{node_id: 1} + broker_2 = %KafkaEx.New.Broker{node_id: 2} + node_selector = KafkaEx.New.NodeSelector.controller() + + cluster = %ClusterMetadata{ + controller_id: 1, + brokers: %{1 => broker_1, 2 => broker_2} + } + + assert ClusterMetadata.select_node(cluster, node_selector) == + {:ok, broker_1.node_id} + end + + test "returns node based on node_id" do + broker_1 = %KafkaEx.New.Broker{node_id: 1} + broker_2 = %KafkaEx.New.Broker{node_id: 2} + node_selector 
= KafkaEx.New.NodeSelector.node_id(2) + + cluster = %ClusterMetadata{ + controller_id: 1, + brokers: %{1 => broker_1, 2 => broker_2} + } + + assert ClusterMetadata.select_node(cluster, node_selector) == + {:ok, broker_2.node_id} + end + + test "returns error when node does not exist" do + broker_1 = %KafkaEx.New.Broker{node_id: 1} + broker_2 = %KafkaEx.New.Broker{node_id: 2} + node_selector = KafkaEx.New.NodeSelector.node_id(3) + + cluster = %ClusterMetadata{ + controller_id: 1, + brokers: %{1 => broker_1, 2 => broker_2} + } + + assert ClusterMetadata.select_node(cluster, node_selector) == + {:error, :no_such_node} + end + + test "returns node based on topic & partition id" do + topic_one = + KafkaEx.New.Topic.from_topic_metadata(%{ + topic: "topic-one", + is_internal: false, + partition_metadata: [ + %{error_code: 0, partition: 0, leader: 123, replicas: [], isr: []} + ] + }) + + topic_two = + KafkaEx.New.Topic.from_topic_metadata(%{ + topic: "topic-two", + is_internal: false, + partition_metadata: [ + %{error_code: 0, partition: 0, leader: 321, replicas: [], isr: []} + ] + }) + + node_selector = KafkaEx.New.NodeSelector.topic_partition("topic-one", 0) + + cluster = %ClusterMetadata{ + topics: %{ + topic_one.name => topic_one, + topic_two.name => topic_two + } + } + + assert ClusterMetadata.select_node(cluster, node_selector) == {:ok, 123} + end + + test "returns error when topic does not exist" do + topic = + KafkaEx.New.Topic.from_topic_metadata(%{ + topic: "topic-one", + is_internal: false, + partition_metadata: [ + %{error_code: 0, partition: 0, leader: 123, replicas: [], isr: []} + ] + }) + + node_selector = KafkaEx.New.NodeSelector.topic_partition("topic-two", 0) + + cluster = %ClusterMetadata{ + topics: %{ + topic.name => topic + } + } + + assert ClusterMetadata.select_node(cluster, node_selector) == + {:error, :no_such_topic} + end + + test "returns error when partition does not exist" do + topic = + KafkaEx.New.Topic.from_topic_metadata(%{ + topic: "topic-one", + is_internal: false, + partition_metadata: [ + %{error_code: 0, partition: 0, leader: 123, replicas: [], isr: []} + ] + }) + + node_selector = KafkaEx.New.NodeSelector.topic_partition("topic-one", 1) + + cluster = %ClusterMetadata{ + topics: %{ + topic.name => topic + } + } + + assert ClusterMetadata.select_node(cluster, node_selector) == + {:error, :no_such_partition} + end + + test "returns node based on consumer group name" do + node_selector = + KafkaEx.New.NodeSelector.consumer_group("consumer-group-one") + + cluster = %ClusterMetadata{ + consumer_group_coordinators: %{ + "consumer-group-one" => 1, + "consumer-group-two" => 2 + } + } + + assert ClusterMetadata.select_node(cluster, node_selector) == {:ok, 1} + end + + test "returns error when consumer group does not exist" do + node_selector = + KafkaEx.New.NodeSelector.consumer_group("consumer-group-three") + + cluster = %ClusterMetadata{ + consumer_group_coordinators: %{ + "consumer-group-one" => 1, + "consumer-group-two" => 2 + } + } + + assert ClusterMetadata.select_node(cluster, node_selector) == + {:error, :no_such_consumer_group} + end + end + + describe "merge_brokers/2" do + end + + describe "broker_by_node_id/1" do + test "returns broker by its node id" do + broker = %KafkaEx.New.Broker{node_id: 1} + cluster = %ClusterMetadata{brokers: %{1 => broker}} + + assert ClusterMetadata.broker_by_node_id(cluster, 1) == broker + end + + test "returns nil when broker is not found" do + broker = %KafkaEx.New.Broker{node_id: 1} + cluster = %ClusterMetadata{brokers: %{1 => 
broker}} + + refute ClusterMetadata.broker_by_node_id(cluster, 2) + end + end + + describe "update_brokers/2" do + test "updates brokers based on given function" do + socket = %KafkaEx.Socket{} + broker = %KafkaEx.New.Broker{node_id: 1, socket: socket} + cluster = %ClusterMetadata{brokers: %{1 => broker}} + + updated_cluster = + ClusterMetadata.update_brokers(cluster, fn broker_to_update -> + KafkaEx.New.Broker.put_socket(broker_to_update, nil) + end) + + updated_broker = ClusterMetadata.broker_by_node_id(updated_cluster, 1) + refute updated_broker.socket + end + end + + describe "remove_topics/2" do + test "removes topic based on its name" do + topic_one = + KafkaEx.New.Topic.from_topic_metadata(%{ + topic: "topic-one", + is_internal: false, + partition_metadata: [ + %{error_code: 0, partition: 0, leader: 123, replicas: [], isr: []} + ] + }) + + topic_two = + KafkaEx.New.Topic.from_topic_metadata(%{ + topic: "topic-two", + is_internal: false, + partition_metadata: [ + %{error_code: 0, partition: 0, leader: 321, replicas: [], isr: []} + ] + }) + + cluster = %ClusterMetadata{ + topics: %{ + topic_one.name => topic_one, + topic_two.name => topic_two + } + } + + updated_cluster = ClusterMetadata.remove_topics(cluster, ["topic-two"]) + assert ClusterMetadata.known_topics(updated_cluster) == ["topic-one"] + end + end +end diff --git a/test/kafka_ex/new/structs/node_selector_test.exs b/test/kafka_ex/new/structs/node_selector_test.exs new file mode 100644 index 00000000..b0a012b3 --- /dev/null +++ b/test/kafka_ex/new/structs/node_selector_test.exs @@ -0,0 +1,83 @@ +defmodule KafkaEx.New.NodeSelectorTest do + use ExUnit.Case, async: true + + alias KafkaEx.New.NodeSelector + + describe "node_id/1" do + test "builds selector based on node id strategy" do + selector = NodeSelector.node_id(123) + + assert selector.strategy == :node_id + assert selector.node_id == 123 + refute selector.topic + refute selector.partition + refute selector.consumer_group_name + end + + test "raises error when node id is invalid" do + assert_raise FunctionClauseError, fn -> + NodeSelector.node_id("invalid data") + end + end + end + + describe "random/0" do + test "returns selector with random strategy" do + selector = NodeSelector.random() + + assert selector.strategy == :random + refute selector.node_id + refute selector.topic + refute selector.partition + refute selector.consumer_group_name + end + end + + describe "first_available/0" do + test "returns selector with first_available strategy" do + selector = NodeSelector.first_available() + + assert selector.strategy == :first_available + refute selector.node_id + refute selector.topic + refute selector.partition + refute selector.consumer_group_name + end + end + + describe "controller/0" do + test "returns selector with controller strategy" do + selector = NodeSelector.controller() + + assert selector.strategy == :controller + refute selector.node_id + refute selector.topic + refute selector.partition + refute selector.consumer_group_name + end + end + + describe "topic_partition/2" do + test "returns selector with topic_partition strategy" do + selector = NodeSelector.topic_partition("topic-name", 123) + + assert selector.strategy == :topic_partition + refute selector.node_id + assert selector.topic == "topic-name" + assert selector.partition == 123 + refute selector.consumer_group_name + end + end + + describe "consumer_group/1" do + test "returns selector with consumer_group strategy" do + selector = NodeSelector.consumer_group("consumer-group-name") + + assert
selector.strategy == :consumer_group + refute selector.node_id + refute selector.topic + refute selector.partition + assert selector.consumer_group_name == "consumer-group-name" + end + end +end diff --git a/test/kafka_ex/new/structs/partiton_test.exs b/test/kafka_ex/new/structs/partiton_test.exs new file mode 100644 index 00000000..20ee78e2 --- /dev/null +++ b/test/kafka_ex/new/structs/partiton_test.exs @@ -0,0 +1,43 @@ +defmodule KafkaEx.New.PartitionTest do + use ExUnit.Case, async: true + + alias KafkaEx.New.Partition + + describe "from_partition_metadata/1" do + setup do + metadata = %{ + error_code: 0, + partition: 1, + leader: 2, + replicas: [123], + isr: [21] + } + + {:ok, %{metadata: metadata}} + end + + test "sets partition id", %{metadata: metadata} do + partition = Partition.from_partition_metadata(metadata) + + assert partition.partition_id == 1 + end + + test "sets partition leader", %{metadata: metadata} do + partition = Partition.from_partition_metadata(metadata) + + assert partition.leader == 2 + end + + test "sets partition replicas", %{metadata: metadata} do + partition = Partition.from_partition_metadata(metadata) + + assert partition.replicas == [123] + end + + test "sets partition isr", %{metadata: metadata} do + partition = Partition.from_partition_metadata(metadata) + + assert partition.isr == [21] + end + end +end diff --git a/test/kafka_ex/new/structs/topic_test.exs b/test/kafka_ex/new/structs/topic_test.exs new file mode 100644 index 00000000..d7a04d40 --- /dev/null +++ b/test/kafka_ex/new/structs/topic_test.exs @@ -0,0 +1,52 @@ +defmodule KafkaEx.New.TopicTest do + use ExUnit.Case, async: true + + alias KafkaEx.New.Topic + + describe "from_topic_metadata/1" do + setup do + metadata = %{ + topic: "test-topic", + is_internal: false, + partition_metadata: [ + %{error_code: 0, partition: 123, leader: 321, replicas: [], isr: []} + ] + } + + {:ok, %{metadata: metadata}} + end + + test "sets topic name from metadata", %{metadata: metadata} do + topic = Topic.from_topic_metadata(metadata) + + assert topic.name == "test-topic" + end + + test "sets is_internal from metadata", %{metadata: metadata} do + topic = Topic.from_topic_metadata(metadata) + + assert topic.is_internal == false + end + + test "sets only valid partition leaders from metadata", %{ + metadata: metadata + } do + topic = Topic.from_topic_metadata(metadata) + + assert topic.partition_leaders == %{123 => 321} + end + + test "sets partitions from metadata", %{metadata: metadata} do + topic = Topic.from_topic_metadata(metadata) + + assert topic.partitions == [ + %KafkaEx.New.Partition{ + partition_id: 123, + leader: 321, + replicas: [], + isr: [] + } + ] + end + end +end From 89c26a3b5fe70d18fa848e69ca49392583bdbf08 Mon Sep 17 00:00:00 2001 From: Argonus Date: Sat, 18 Mar 2023 14:34:33 +0100 Subject: [PATCH 03/11] [Kayrock Preparation] Add NetworkClient behaviour --- lib/kafka_ex/network_client.ex | 21 +++++------ lib/kafka_ex/network_client/behaviour.ex | 47 ++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 11 deletions(-) create mode 100644 lib/kafka_ex/network_client/behaviour.ex diff --git a/lib/kafka_ex/network_client.ex b/lib/kafka_ex/network_client.ex index f38f3cc6..cd460e17 100644 --- a/lib/kafka_ex/network_client.ex +++ b/lib/kafka_ex/network_client.ex @@ -1,12 +1,13 @@ defmodule KafkaEx.NetworkClient do + @moduledoc """ + KafkaEx implementation of Client used to connect to Kafka Broker + """ + @behaviour KafkaEx.NetworkClient.Behaviour + require Logger - alias KafkaEx.New - alias
KafkaEx.Protocol.Metadata.Broker alias KafkaEx.Socket - @moduledoc false - @spec create_socket(binary, non_neg_integer, KafkaEx.ssl_options(), boolean) :: - nil | Socket.t() + @impl true def create_socket(host, port, ssl_options \\ [], use_ssl \\ false) do case Socket.create( format_host(host), @@ -32,12 +33,11 @@ defmodule KafkaEx.NetworkClient do end end - @spec close_socket(nil | Socket.t()) :: :ok + @impl true def close_socket(nil), do: :ok def close_socket(socket), do: Socket.close(socket) - @spec send_async_request(Broker.t() | New.Broker.t(), iodata) :: - :ok | {:error, :closed | :inet.posix()} + @impl true def send_async_request(broker, data) do socket = broker.socket @@ -55,8 +55,7 @@ defmodule KafkaEx.NetworkClient do end end - @spec send_sync_request(Broker.t() | New.Broker.t(), iodata, timeout) :: - iodata | {:error, any()} + @impl true def send_sync_request(%{:socket => socket} = broker, data, timeout) do :ok = Socket.setopts(socket, [:binary, {:packet, 4}, {:active, false}]) @@ -99,7 +98,7 @@ defmodule KafkaEx.NetworkClient do {:error, :no_broker} end - @spec format_host(binary) :: [char] | :inet.ip_address() + @impl true def format_host(host) do case Regex.scan(~r/^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})$/, host) do [match_data] = [[_, _, _, _, _]] -> diff --git a/lib/kafka_ex/network_client/behaviour.ex b/lib/kafka_ex/network_client/behaviour.ex new file mode 100644 index 00000000..f8d71e06 --- /dev/null +++ b/lib/kafka_ex/network_client/behaviour.ex @@ -0,0 +1,47 @@ +defmodule KafkaEx.NetworkClient.Behaviour do + @moduledoc """ + Behaviour for any network client. + Created mainly to allow mocking request & responses in unit tests. + """ + + @type host :: binary + @type host_port :: non_neg_integer + @type use_ssl :: boolean + @type kafka_ex_socket :: KafkaEx.Socket.t() + @type kafka_ex_broker :: + KafkaEx.Protocol.Metadata.Broker.t() | KafkaEx.New.Broker.t() + @type request_data :: iodata + @type response_data :: iodata + + @doc """ + Creates a socket to the given host and port. + """ + @callback create_socket(host, host_port) :: kafka_ex_socket | nil + @callback create_socket(host, host_port, KafkaEx.ssl_options()) :: + kafka_ex_socket | nil + @callback create_socket(host, host_port, KafkaEx.ssl_options(), use_ssl) :: + kafka_ex_socket | nil + + @doc """ + Close socket, if socket is nil, do nothing. + """ + @callback close_socket(kafka_ex_socket | nil) :: :ok + + @doc """ + Send request asynchronously to broker. + """ + @callback send_async_request(kafka_ex_broker, request_data) :: + :ok | {:error, :closed | :inet.posix()} + + @doc """ + Send request synchronously to broker. + """ + @callback send_sync_request(kafka_ex_broker, iodata, timeout) :: + response_data | {:error, any()} + + @doc """ + Returns the host in Erlang IP address format if the host is an IP address. + Otherwise, returns the host as a char list. 
+ """ + @callback format_host(binary) :: [char] | :inet.ip_address() +end From 07fba217e3c2e6256bbcc9e2b97c0ec165cf8660 Mon Sep 17 00:00:00 2001 From: Argonus Date: Sat, 18 Mar 2023 14:39:01 +0100 Subject: [PATCH 04/11] [Kayrock Preparation] Add hammox & first typed checks --- mix.exs | 8 ++- mix.lock | 6 ++- test/integration/new_client_test.exs | 11 ++++- test/kafka_ex/network_client_test.exs | 70 +++++++++++++++++++++++++++ test/network_client_test.exs | 20 -------- test/support/mocks.ex | 1 + test/test_helper.exs | 1 + 7 files changed, 94 insertions(+), 23 deletions(-) create mode 100644 test/kafka_ex/network_client_test.exs delete mode 100644 test/network_client_test.exs create mode 100644 test/support/mocks.ex diff --git a/mix.exs b/mix.exs index defa9b1a..edb6f03b 100644 --- a/mix.exs +++ b/mix.exs @@ -10,6 +10,7 @@ defmodule KafkaEx.Mixfile do app: :kafka_ex, version: @version, elixir: "~> 1.8", + elixirc_paths: elixirc_paths(Mix.env()), dialyzer: [ plt_add_deps: :transitive, plt_add_apps: [:ssl], @@ -47,12 +48,14 @@ defmodule KafkaEx.Mixfile do end defp deps do - main_deps = [ + [ {:kayrock, "~> 0.1.12"}, {:credo, "~> 1.1", only: [:dev, :test], runtime: false}, + {:ex_doc, "~> 0.23", only: :dev, runtime: false}, {:dialyxir, "~> 1.0", only: :dev, runtime: false}, {:excoveralls, "~> 0.7", only: :test, runtime: false}, {:ex_doc, "~> 0.23", only: :dev, runtime: false}, + {:hammox, "~> 0.5.0", only: :test}, {:snappy, git: "https://github.com/fdmanana/snappy-erlang-nif", only: [:dev, :test]}, {:snappyer, "~> 1.2", only: [:dev, :test]} @@ -63,6 +66,9 @@ defmodule KafkaEx.Mixfile do "Kafka client for Elixir/Erlang." end + defp elixirc_paths(:test), do: ["lib", "test/support"] + defp elixirc_paths(_), do: ["lib"] + defp package do [ maintainers: ["Abejide Ayodele", "Dan Swain", "Jack Lund", "Joshua Scott"], diff --git a/mix.lock b/mix.lock index 2f5cab45..9dcceb17 100644 --- a/mix.lock +++ b/mix.lock @@ -11,19 +11,23 @@ "ex_doc": {:hex, :ex_doc, "0.28.3", "6eea2f69995f5fba94cd6dd398df369fe4e777a47cd887714a0976930615c9e6", [:mix], [{:earmark_parser, "~> 1.4.19", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "05387a6a2655b5f9820f3f627450ed20b4325c25977b2ee69bed90af6688e718"}, "excoveralls": {:hex, :excoveralls, "0.12.1", "a553c59f6850d0aff3770e4729515762ba7c8e41eedde03208182a8dc9d0ce07", [:mix], [{:hackney, "~> 1.0", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "5c1f717066a299b1b732249e736c5da96bb4120d1e55dc2e6f442d251e18a812"}, "hackney": {:hex, :hackney, "1.15.2", "07e33c794f8f8964ee86cebec1a8ed88db5070e52e904b8f12209773c1036085", [:rebar3], [{:certifi, "2.5.1", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "6.0.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "1.0.1", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~>1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "1.1.5", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm", "e0100f8ef7d1124222c11ad362c857d3df7cb5f4204054f9f0f4a728666591fc"}, + "hammox": {:hex, :hammox, "0.5.0", "e621c7832a2226cd5ef4b20d16adc825d12735fd40c43e01527995a180823ca5", [:mix], [{:mox, "~> 1.0", [hex: :mox, repo: "hexpm", optional: false]}, {:ordinal, "~> 0.1", [hex: :ordinal, repo: "hexpm", optional: false]}], "hexpm", 
"15bf108989b894e87ef6778a2950025399bc8d69f344f319247b22531e32de2f"}, "idna": {:hex, :idna, "6.0.0", "689c46cbcdf3524c44d5f3dde8001f364cd7608a99556d8fbd8239a5798d4c10", [:rebar3], [{:unicode_util_compat, "0.4.1", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "4bdd305eb64e18b0273864920695cb18d7a2021f31a11b9c5fbcd9a253f936e2"}, "jason": {:hex, :jason, "1.1.2", "b03dedea67a99223a2eaf9f1264ce37154564de899fd3d8b9a21b1a6fd64afe7", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fdf843bca858203ae1de16da2ee206f53416bbda5dc8c9e78f43243de4bc3afe"}, - "kayrock": {:hex, :kayrock, "0.1.12", "6c7d840808bf771cd2cd5b650e583aaa7c13309a79008295da0f98a2de7d2fb3", [:mix], [{:connection, "~>1.0.4", [hex: :connection, repo: "hexpm", optional: false]}, {:crc32cer, "~>0.1.3", [hex: :crc32cer, repo: "hexpm", optional: false]}, {:varint, "~>1.2.0", [hex: :varint, repo: "hexpm", optional: false]}], "hexpm", "5693d225deeea6448d0e84dcf4b1ecd99bad8426e88144c649be6bf8cc0c830a"}, + "kayrock": {:hex, :kayrock, "0.1.12", "6c7d840808bf771cd2cd5b650e583aaa7c13309a79008295da0f98a2de7d2fb3", [:mix], [{:connection, "~> 1.0.4", [hex: :connection, repo: "hexpm", optional: false]}, {:crc32cer, "~> 0.1.3", [hex: :crc32cer, repo: "hexpm", optional: false]}, {:varint, "~> 1.2.0", [hex: :varint, repo: "hexpm", optional: false]}], "hexpm", "5693d225deeea6448d0e84dcf4b1ecd99bad8426e88144c649be6bf8cc0c830a"}, "makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"}, "makeup_elixir": {:hex, :makeup_elixir, "0.16.0", "f8c570a0d33f8039513fbccaf7108c5d750f47d8defd44088371191b76492b0b", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "28b2cbdc13960a46ae9a8858c4bebdec3c9a6d7b4b9e7f4ed1502f8159f338e7"}, "makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"}, "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"}, "mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"}, + "mox": {:hex, :mox, "1.0.2", "dc2057289ac478b35760ba74165b4b3f402f68803dd5aecd3bfd19c183815d64", [:mix], [], "hexpm", "f9864921b3aaf763c8741b5b8e6f908f44566f1e427b2630e89e9a73b981fef2"}, "nimble_parsec": {:hex, :nimble_parsec, "1.2.3", "244836e6e3f1200c7f30cb56733fd808744eca61fd182f731eac4af635cc6d0b", [:mix], [], "hexpm", "c8d789e39b9131acf7b99291e93dae60ab48ef14a7ee9d58c6964f59efb570b0"}, + "ordinal": {:hex, :ordinal, "0.2.0", "d3eda0cb04ee1f0ca0aae37bf2cf56c28adce345fe56a75659031b6068275191", [:mix], [], "hexpm", "defca8f10dee9f03a090ed929a595303252700a9a73096b6f2f8d88341690d65"}, "parse_trans": {:hex, :parse_trans, "3.3.0", "09765507a3c7590a784615cfd421d101aec25098d50b89d7aa1d66646bc571c1", [:rebar3], [], "hexpm", 
"17ef63abde837ad30680ea7f857dd9e7ced9476cdd7b0394432af4bfc241b960"}, "snappy": {:git, "https://github.com/fdmanana/snappy-erlang-nif", "e8907ee8e37cfa07d933a070669a88798082c3d7", []}, "snappyer": {:hex, :snappyer, "1.2.8", "201ce9067a33c71a6a5087c0c3a49a010b17112d461e6df696c722dcb6d0934a", [:rebar3], [], "hexpm", "35518e79a28548b56d8fd6aee2f565f12f51c2d3d053f9cfa817c83be88c4f3d"}, "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.5", "6eaf7ad16cb568bb01753dbbd7a95ff8b91c7979482b95f38443fe2c8852a79b", [:make, :mix, :rebar3], [], "hexpm", "13104d7897e38ed7f044c4de953a6c28597d1c952075eb2e328bc6d6f2bfc496"}, + "telemetry": {:hex, :telemetry, "1.1.0", "a589817034a27eab11144ad24d5c0f9fab1f58173274b1e9bae7074af9cbee51", [:rebar3], [], "hexpm", "b727b2a1f75614774cff2d7565b64d0dfa5bd52ba517f16543e6fc7efcc0df48"}, "unicode_util_compat": {:hex, :unicode_util_compat, "0.4.1", "d869e4c68901dd9531385bb0c8c40444ebf624e60b6962d95952775cac5e90cd", [:rebar3], [], "hexpm", "1d1848c40487cdb0b30e8ed975e34e025860c02e419cb615d255849f3427439d"}, "varint": {:hex, :varint, "1.2.0", "61bffd9dcc2d5242d59f75694506b4d4013bb103f6a23e34b94f89cebb0c1ab3", [:mix], [], "hexpm", "d94941ed8b9d1a5fdede9103a5e52035bd0aaf35081d44e67713a36799927e47"}, } diff --git a/test/integration/new_client_test.exs b/test/integration/new_client_test.exs index 009b594c..f51327d8 100644 --- a/test/integration/new_client_test.exs +++ b/test/integration/new_client_test.exs @@ -55,11 +55,20 @@ defmodule KafkaEx.New.Client.Test do %Kayrock.ListOffsets.V1.Response{responses: responses} = resp [main_resp] = responses + [%{error_code: error_code, offset: offset}] = + main_resp.partition_responses + + %Kayrock.ListOffsets.V1.Response{responses: responses} = resp + [main_resp] = responses + [%{error_code: error_code, offset: offset}] = main_resp.partition_responses assert error_code == 0 - {:ok, latest_offset} = KafkaExAPI.latest_offset(client, topic, partition) + + {:ok, latest_offset} = + KafkaExAPI.latest_offset(client, topic, partition) + assert latest_offset == offset end end diff --git a/test/kafka_ex/network_client_test.exs b/test/kafka_ex/network_client_test.exs new file mode 100644 index 00000000..a7254c2d --- /dev/null +++ b/test/kafka_ex/network_client_test.exs @@ -0,0 +1,70 @@ +defmodule KafkaEx.NetworkClientTest do + use ExUnit.Case, async: true + + use Hammox.Protect, + module: KafkaEx.NetworkClient, + behaviour: KafkaEx.NetworkClient.Behaviour + + alias KafkaEx.NetworkClient + + describe "close_socket/1" do + test "closes the socket" do + {:ok, socket} = + :gen_tcp.listen(3001, [ + :binary, + {:active, false}, + {:reuseaddr, true}, + {:packet, 0} + ]) + + kafka_ex_socket = %KafkaEx.Socket{socket: socket} + + assert :ok == close_socket(kafka_ex_socket) + assert {:error, :closed} == :gen_tcp.send(socket, <<>>) + end + + test "does not fail if socket is nil" do + assert :ok == close_socket(nil) + end + end + + describe "create_socket/3" do + setup do + pid = Server.start(3040) + + on_exit(fn -> + Process.exit(pid, :normal) + end) + end + + test "creates a socket" do + kafka_ex_socket = create_socket("localhost", 3040, [], false) + + assert kafka_ex_socket.socket + assert kafka_ex_socket.ssl == false + end + + test "returns nil if socket creation fails" do + assert nil == create_socket("localhost", 3002, [], true) + end + end + + describe "format_host/1" do + test "format_host returns Erlang IP address format if IP address string is specified" do + assert {100, 20, 3, 4} == format_host("100.20.3.4") + end + + test "format_host returns the char 
list version of the string passed in if host is not IP address" do + assert 'host' == format_host("host") + end + + test "format_host handles hosts with embedded digits correctly" do + assert 'host0' == format_host("host0") + end + + test "format_host correct handles hosts embedded with ip address" do + assert 'ip.10.4.1.11' == format_host("ip.10.4.1.11") + assert 'ip-10-4-1-11' == format_host("ip-10-4-1-11") + end + end +end diff --git a/test/network_client_test.exs b/test/network_client_test.exs deleted file mode 100644 index dbce0526..00000000 --- a/test/network_client_test.exs +++ /dev/null @@ -1,20 +0,0 @@ -defmodule KafkaEx.NetworkClient.Test do - use ExUnit.Case - - test "format_host returns Erlang IP address format if IP address string is specified" do - assert {100, 20, 3, 4} == KafkaEx.NetworkClient.format_host("100.20.3.4") - end - - test "format_host returns the char list version of the string passed in if host is not IP address" do - assert 'host' == KafkaEx.NetworkClient.format_host("host") - end - - test "format_host handles hosts with embedded digits correctly" do - assert 'host0' == KafkaEx.NetworkClient.format_host("host0") - end - - test "format_host correct handles hosts embedded with ip address" do - assert 'ip.10.4.1.11' == KafkaEx.NetworkClient.format_host("ip.10.4.1.11") - assert 'ip-10-4-1-11' == KafkaEx.NetworkClient.format_host("ip-10-4-1-11") - end -end diff --git a/test/support/mocks.ex b/test/support/mocks.ex new file mode 100644 index 00000000..9066e96d --- /dev/null +++ b/test/support/mocks.ex @@ -0,0 +1 @@ +Hammox.defmock(KafkaEx.NetworkClientMock, for: KafkaEx.NetworkClient.Behaviour) diff --git a/test/test_helper.exs b/test/test_helper.exs index ee2ee16d..bf078b33 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -1,4 +1,5 @@ ExUnit.start() +{:ok, _} = Application.ensure_all_started(:hammox) ExUnit.configure( timeout: 120 * 1000, From caaa045ca4fad39ee33438b1acf3393e6a943bec Mon Sep 17 00:00:00 2001 From: Argonus Date: Tue, 9 May 2023 16:38:51 +0200 Subject: [PATCH 05/11] [Kayrock Preparation] Add Server & SSL Server to test support --- test/kafka_ex/socket_test.exs | 4 +-- test/support/server.ex | 29 ++++++++++++++++++++ test/support/sll_server.ex | 50 +++++++++++++++++++++++++++++++++++ 3 files changed, 81 insertions(+), 2 deletions(-) create mode 100644 test/support/server.ex create mode 100644 test/support/sll_server.ex diff --git a/test/kafka_ex/socket_test.exs b/test/kafka_ex/socket_test.exs index 126ede1a..6232a864 100644 --- a/test/kafka_ex/socket_test.exs +++ b/test/kafka_ex/socket_test.exs @@ -7,7 +7,7 @@ defmodule KafkaEx.Socket.Test do describe "without SSL socket" do setup do - Server.start(3040) + KafkaEx.TestSupport.Server.start(3040) {:ok, [port: 3040]} end @@ -56,7 +56,7 @@ defmodule KafkaEx.Socket.Test do describe "with ssl socket" do setup do - SSLServer.start(3030) + KafkaEx.TestSupport.SSLServer.start(3030) {:ok, [ssl_port: 3030]} end diff --git a/test/support/server.ex b/test/support/server.ex new file mode 100644 index 00000000..ab0439a0 --- /dev/null +++ b/test/support/server.ex @@ -0,0 +1,29 @@ +defmodule KafkaEx.TestSupport.Server do + def start(port) do + {:ok, listen_socket} = + :gen_tcp.listen(port, [ + :binary, + {:active, false}, + {:reuseaddr, true}, + {:packet, 0} + ]) + + spawn_link(fn -> listen(listen_socket) end) + end + + defp listen(socket) do + {:ok, conn} = :gen_tcp.accept(socket) + spawn_link(fn -> recv(conn) end) + listen(socket) + end + + defp recv(conn) do + case :gen_tcp.recv(conn, 0) do + {:ok, 
data} -> + :ok = :gen_tcp.send(conn, data) + + {:error, :closed} -> + :ok + end + end +end diff --git a/test/support/sll_server.ex b/test/support/sll_server.ex new file mode 100644 index 00000000..89a7f89a --- /dev/null +++ b/test/support/sll_server.ex @@ -0,0 +1,50 @@ +defmodule KafkaEx.TestSupport.SSLServer do + def start(port) do + {:ok, listen_socket} = + :ssl.listen(port, [ + :binary, + {:verify, :verify_none}, + {:active, false}, + {:reuseaddr, true}, + {:packet, 0}, + {:certfile, 'test/fixtures/server.crt'}, + {:keyfile, 'test/fixtures/server.key'} + ]) + + spawn_link(fn -> listen(listen_socket) end) + end + + defp listen(socket) do + case :ssl.transport_accept(socket) do + {:ok, conn} -> + if otp_version_21_plus?() do + {:ok, _socket} = :ssl.handshake(conn) + else + :ok = :ssl.ssl_accept(conn) + end + + pid = spawn_link(fn -> recv(conn) end) + :ssl.controlling_process(socket, pid) + + _ -> + :ok + end + + listen(socket) + end + + defp recv(conn) do + case :ssl.recv(conn, 0) do + {:ok, data} -> + :ok = :ssl.send(conn, data) + + {:error, :closed} -> + :ok + end + end + + defp otp_version_21_plus? do + {version, _} = System.otp_release() |> Float.parse() + version >= 21 + end +end From 1ff796577831e268529f3b6e745114b60b4de5f2 Mon Sep 17 00:00:00 2001 From: Argonus Date: Tue, 9 May 2023 16:43:01 +0200 Subject: [PATCH 06/11] [Kayrock Preparations] Mix format & Fix tests --- test/integration/new_client_test.exs | 13 ++----------- test/kafka_ex/compression_test.exs | 4 +++- test/kafka_ex/network_client_test.exs | 4 +--- test/kafka_ex/new/structs/broker_test.exs | 3 +-- 4 files changed, 7 insertions(+), 17 deletions(-) diff --git a/test/integration/new_client_test.exs b/test/integration/new_client_test.exs index f51327d8..ea814822 100644 --- a/test/integration/new_client_test.exs +++ b/test/integration/new_client_test.exs @@ -52,23 +52,14 @@ defmodule KafkaEx.New.Client.Test do NodeSelector.topic_partition(topic, partition) ) - %Kayrock.ListOffsets.V1.Response{responses: responses} = resp - [main_resp] = responses - - [%{error_code: error_code, offset: offset}] = - main_resp.partition_responses - - %Kayrock.ListOffsets.V1.Response{responses: responses} = resp - [main_resp] = responses + %Kayrock.ListOffsets.V1.Response{responses: [main_resp]} = resp [%{error_code: error_code, offset: offset}] = main_resp.partition_responses assert error_code == 0 - {:ok, latest_offset} = - KafkaExAPI.latest_offset(client, topic, partition) - + {:ok, latest_offset} = KafkaExAPI.latest_offset(client, topic, partition) assert latest_offset == offset end end diff --git a/test/kafka_ex/compression_test.exs b/test/kafka_ex/compression_test.exs index ad2c33a4..c0516b35 100644 --- a/test/kafka_ex/compression_test.exs +++ b/test/kafka_ex/compression_test.exs @@ -12,7 +12,9 @@ defmodule KafkaEx.CompressionTest do expected = <<0, 0, 0, 0, 0, 0, 0, 37, 0, 0, 3, 246, 10, 44, 16, 236, 0, 0, 255, 255, - 255, 255, 0, 0, 3, 232>> <> String.duplicate("ABCDEFGHIJ", 100) + 255, 255, 0, 0, 3, + 232>> <> + String.duplicate("ABCDEFGHIJ", 100) ## enable :snappy module, and test it Application.put_env(:kafka_ex, :snappy_module, :snappy) diff --git a/test/kafka_ex/network_client_test.exs b/test/kafka_ex/network_client_test.exs index a7254c2d..c9ee89cc 100644 --- a/test/kafka_ex/network_client_test.exs +++ b/test/kafka_ex/network_client_test.exs @@ -5,8 +5,6 @@ defmodule KafkaEx.NetworkClientTest do module: KafkaEx.NetworkClient, behaviour: KafkaEx.NetworkClient.Behaviour - alias KafkaEx.NetworkClient - describe "close_socket/1" do 
test "closes the socket" do {:ok, socket} = @@ -30,7 +28,7 @@ defmodule KafkaEx.NetworkClientTest do describe "create_socket/3" do setup do - pid = Server.start(3040) + pid = KafkaEx.TestSupport.Server.start(3040) on_exit(fn -> Process.exit(pid, :normal) diff --git a/test/kafka_ex/new/structs/broker_test.exs b/test/kafka_ex/new/structs/broker_test.exs index 29d94d34..c2d06859 100644 --- a/test/kafka_ex/new/structs/broker_test.exs +++ b/test/kafka_ex/new/structs/broker_test.exs @@ -4,7 +4,7 @@ defmodule KafkaEx.New.BrokerTest do alias KafkaEx.New.Broker setup do - pid = Server.start(3040) + pid = KafkaEx.TestSupport.Server.start(3040) {:ok, socket} = KafkaEx.Socket.create('localhost', 3040, [:binary, {:packet, 0}], false) @@ -77,7 +77,6 @@ defmodule KafkaEx.New.BrokerTest do end test "returns true if broker has same socket", %{socket: socket} do - socket = %KafkaEx.Socket{} broker = %Broker{socket: nil} |> Broker.put_socket(socket) assert Broker.has_socket?(broker, socket.socket) From 5644d1d678c5ada62cf54b14bf4ab3f3c2c0b9d8 Mon Sep 17 00:00:00 2001 From: Argonus Date: Tue, 9 May 2023 17:01:42 +0200 Subject: [PATCH 07/11] [Kayrock Preparation] Satisfy dialyzer --- lib/kafka_ex/new/structs/node_selector.ex | 18 +++++++++++------- test/kafka_ex/new/structs/broker_test.exs | 2 +- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/lib/kafka_ex/new/structs/node_selector.ex b/lib/kafka_ex/new/structs/node_selector.ex index 62e7dd28..1b5ef8c3 100644 --- a/lib/kafka_ex/new/structs/node_selector.ex +++ b/lib/kafka_ex/new/structs/node_selector.ex @@ -20,13 +20,16 @@ defmodule KafkaEx.New.NodeSelector do | :consumer_group @type t :: %__MODULE__{ strategy: valid_strategy, - node_id: non_neg_integer + node_id: non_neg_integer | nil, + topic: KafkaExAPI.topic_name() | nil, + partition: KafkaExAPI.partition_id() | nil, + consumer_group_name: KafkaExAPI.consumer_group_name() | nil } @doc """ Select a specific node """ - @spec node_id(KafkaExAPI.node_id()) :: t + @spec node_id(KafkaExAPI.node_id()) :: __MODULE__.t() def node_id(node_id) when is_integer(node_id) do %__MODULE__{strategy: :node_id, node_id: node_id} end @@ -34,25 +37,26 @@ defmodule KafkaEx.New.NodeSelector do @doc """ Select a random node """ - @spec random :: t + @spec random :: __MODULE__.t() def random, do: %__MODULE__{strategy: :random} @doc """ Select first available node """ - @spec first_available :: t + @spec first_available :: __MODULE__.t() def first_available, do: %__MODULE__{strategy: :first_available} @doc """ Select the cluster's controller node """ - @spec controller :: t + @spec controller :: __MODULE__.t() def controller, do: %__MODULE__{strategy: :controller} @doc """ Select the controller for the given topic and partition """ - @spec topic_partition(KafkaExAPI.topic_name(), KafkaExAPI.partition_id()) :: t + @spec topic_partition(KafkaExAPI.topic_name(), KafkaExAPI.partition_id()) :: + __MODULE__.t() def topic_partition(topic, partition) when is_binary(topic) and is_integer(partition) do %__MODULE__{ @@ -65,7 +69,7 @@ defmodule KafkaEx.New.NodeSelector do @doc """ Select the controller for the given consumer group """ - @spec consumer_group(KafkaExAPI.consumer_group_name()) :: t + @spec consumer_group(KafkaExAPI.consumer_group_name()) :: __MODULE__.t() def consumer_group(consumer_group_name) when is_binary(consumer_group_name) do %__MODULE__{ strategy: :consumer_group, diff --git a/test/kafka_ex/new/structs/broker_test.exs b/test/kafka_ex/new/structs/broker_test.exs index c2d06859..6c31937d 100644 --- 
a/test/kafka_ex/new/structs/broker_test.exs +++ b/test/kafka_ex/new/structs/broker_test.exs @@ -1,5 +1,5 @@ defmodule KafkaEx.New.BrokerTest do - use ExUnit.Case, async: true + use ExUnit.Case, async: false alias KafkaEx.New.Broker From 51bb98eafdb36cee35f66a0935bd3b70bc32ab43 Mon Sep 17 00:00:00 2001 From: Argonus Date: Tue, 9 May 2023 17:27:45 +0200 Subject: [PATCH 08/11] Add dirty hack to support types for old versions of elixir --- lib/kafka_ex/network_client/behaviour.ex | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/lib/kafka_ex/network_client/behaviour.ex b/lib/kafka_ex/network_client/behaviour.ex index f8d71e06..3aa47d45 100644 --- a/lib/kafka_ex/network_client/behaviour.ex +++ b/lib/kafka_ex/network_client/behaviour.ex @@ -7,12 +7,19 @@ defmodule KafkaEx.NetworkClient.Behaviour do @type host :: binary @type host_port :: non_neg_integer @type use_ssl :: boolean - @type kafka_ex_socket :: KafkaEx.Socket.t() @type kafka_ex_broker :: KafkaEx.Protocol.Metadata.Broker.t() | KafkaEx.New.Broker.t() @type request_data :: iodata @type response_data :: iodata + # Dirty hack to allow mocking of socket in unit tests for elixir 1.8 + # We should remove this when we drop support for elixir 1.8 + if Version.match?(System.version(), ">= 1.10.0") do + @type kafka_ex_socket :: KafkaEx.Socket.t() + else + @type kafka_ex_socket :: any + end + @doc """ Creates a socket to the given host and port. """ From 07f265ee09ef9277ee2e276af432aefb5d4f19e8 Mon Sep 17 00:00:00 2001 From: Piotr Rybarczyk Date: Sat, 3 Jun 2023 08:37:35 +0200 Subject: [PATCH 09/11] Apply suggestions from code review Co-authored-by: Joshua Scott --- lib/kafka_ex/new/structs/node_selector.ex | 2 +- mix.exs | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/kafka_ex/new/structs/node_selector.ex b/lib/kafka_ex/new/structs/node_selector.ex index 1b5ef8c3..c93af7fd 100644 --- a/lib/kafka_ex/new/structs/node_selector.ex +++ b/lib/kafka_ex/new/structs/node_selector.ex @@ -29,7 +29,7 @@ defmodule KafkaEx.New.NodeSelector do @doc """ Select a specific node """ - @spec node_id(KafkaExAPI.node_id()) :: __MODULE__.t() + @spec node_id(KafkaExAPI.node_id()) :: t() def node_id(node_id) when is_integer(node_id) do %__MODULE__{strategy: :node_id, node_id: node_id} end diff --git a/mix.exs b/mix.exs index edb6f03b..54a6ef76 100644 --- a/mix.exs +++ b/mix.exs @@ -51,7 +51,6 @@ defmodule KafkaEx.Mixfile do [ {:kayrock, "~> 0.1.12"}, {:credo, "~> 1.1", only: [:dev, :test], runtime: false}, - {:ex_doc, "~> 0.23", only: :dev, runtime: false}, {:dialyxir, "~> 1.0", only: :dev, runtime: false}, {:excoveralls, "~> 0.7", only: :test, runtime: false}, {:ex_doc, "~> 0.23", only: :dev, runtime: false}, From 2110b9fb606bda5a40d9ed98e8776dab36eaa752 Mon Sep 17 00:00:00 2001 From: Argonus Date: Thu, 3 Aug 2023 13:17:11 +0200 Subject: [PATCH 10/11] [KP-p1] Apply suggestion from CR Fix module name to be same as path --- lib/kafka_ex/network_client/behaviour.ex | 2 +- lib/kafka_ex/new/client.ex | 8 +-- lib/kafka_ex/new/client/state.ex | 2 +- lib/kafka_ex/new/client_compatibility.ex | 2 +- lib/kafka_ex/new/kafka_ex_api.ex | 6 +- lib/kafka_ex/new/structs/broker.ex | 2 +- lib/kafka_ex/new/structs/cluster_metadata.ex | 10 +-- lib/kafka_ex/new/structs/node_selector.ex | 2 +- lib/kafka_ex/new/structs/partition.ex | 2 +- lib/kafka_ex/new/structs/topic.ex | 4 +- .../kayrock/compatibility_test.exs | 2 +- test/integration/new_client_test.exs | 6 +- test/kafka_ex/new/structs/broker_test.exs | 4 +- 
.../new/structs/cluster_metadata_test.exs | 67 ++++++++++--------- .../new/structs/node_selector_test.exs | 4 +- test/kafka_ex/new/structs/partiton_test.exs | 4 +- test/kafka_ex/new/structs/topic_test.exs | 6 +- test/test_helper.exs | 2 +- 18 files changed, 69 insertions(+), 66 deletions(-) diff --git a/lib/kafka_ex/network_client/behaviour.ex b/lib/kafka_ex/network_client/behaviour.ex index 3aa47d45..427c1773 100644 --- a/lib/kafka_ex/network_client/behaviour.ex +++ b/lib/kafka_ex/network_client/behaviour.ex @@ -8,7 +8,7 @@ defmodule KafkaEx.NetworkClient.Behaviour do @type host_port :: non_neg_integer @type use_ssl :: boolean @type kafka_ex_broker :: - KafkaEx.Protocol.Metadata.Broker.t() | KafkaEx.New.Broker.t() + KafkaEx.Protocol.Metadata.Broker.t() | KafkaEx.New.Structs.Broker.t() @type request_data :: iodata @type response_data :: iodata diff --git a/lib/kafka_ex/new/client.ex b/lib/kafka_ex/new/client.ex index c6b17923..50d12885 100644 --- a/lib/kafka_ex/new/client.ex +++ b/lib/kafka_ex/new/client.ex @@ -14,9 +14,9 @@ defmodule KafkaEx.New.Client do alias KafkaEx.Config alias KafkaEx.NetworkClient - alias KafkaEx.New.Broker - alias KafkaEx.New.ClusterMetadata - alias KafkaEx.New.NodeSelector + alias KafkaEx.New.Structs.Broker + alias KafkaEx.New.Structs.ClusterMetadata + alias KafkaEx.New.Structs.NodeSelector alias KafkaEx.New.Client.State @@ -49,7 +49,7 @@ defmodule KafkaEx.New.Client do @spec send_request( KafkaEx.New.KafkaExAPI.client(), map, - KafkaEx.New.NodeSelector.t(), + KafkaEx.New.Structs.NodeSelector.t(), pos_integer | nil ) :: {:ok, term} | {:error, term} def send_request( diff --git a/lib/kafka_ex/new/client/state.ex b/lib/kafka_ex/new/client/state.ex index b8821b07..61116655 100644 --- a/lib/kafka_ex/new/client/state.ex +++ b/lib/kafka_ex/new/client/state.ex @@ -3,7 +3,7 @@ defmodule KafkaEx.New.Client.State do # state struct for New.Client - alias KafkaEx.New.ClusterMetadata + alias KafkaEx.New.Structs.ClusterMetadata defstruct( bootstrap_uris: [], diff --git a/lib/kafka_ex/new/client_compatibility.ex b/lib/kafka_ex/new/client_compatibility.ex index d43e6fb6..5574c449 100644 --- a/lib/kafka_ex/new/client_compatibility.ex +++ b/lib/kafka_ex/new/client_compatibility.ex @@ -7,7 +7,7 @@ defmodule KafkaEx.New.ClientCompatibility do # module can be removed once we drop compatibility for the old API alias KafkaEx.New.Adapter - alias KafkaEx.New.NodeSelector + alias KafkaEx.New.Structs.NodeSelector alias KafkaEx.New.Client.State diff --git a/lib/kafka_ex/new/kafka_ex_api.ex b/lib/kafka_ex/new/kafka_ex_api.ex index 50076f46..937c6547 100644 --- a/lib/kafka_ex/new/kafka_ex_api.ex +++ b/lib/kafka_ex/new/kafka_ex_api.ex @@ -14,9 +14,9 @@ defmodule KafkaEx.New.KafkaExAPI do """ alias KafkaEx.New.Client - alias KafkaEx.New.ClusterMetadata - alias KafkaEx.New.Topic - alias KafkaEx.New.NodeSelector + alias KafkaEx.New.Structs.ClusterMetadata + alias KafkaEx.New.Structs.Topic + alias KafkaEx.New.Structs.NodeSelector @type node_id :: non_neg_integer @type topic_name :: binary diff --git a/lib/kafka_ex/new/structs/broker.ex b/lib/kafka_ex/new/structs/broker.ex index 67810f00..f9d2fc50 100644 --- a/lib/kafka_ex/new/structs/broker.ex +++ b/lib/kafka_ex/new/structs/broker.ex @@ -1,4 +1,4 @@ -defmodule KafkaEx.New.Broker do +defmodule KafkaEx.New.Structs.Broker do @moduledoc """ Encapsulates what we know about a broker and our connection """ diff --git a/lib/kafka_ex/new/structs/cluster_metadata.ex b/lib/kafka_ex/new/structs/cluster_metadata.ex index a43182ad..d2b276d7 100644 --- 
a/lib/kafka_ex/new/structs/cluster_metadata.ex +++ b/lib/kafka_ex/new/structs/cluster_metadata.ex @@ -1,4 +1,4 @@ -defmodule KafkaEx.New.ClusterMetadata do +defmodule KafkaEx.New.Structs.ClusterMetadata do @moduledoc """ Encapsulates what we know about the state of a Kafka broker cluster @@ -6,10 +6,10 @@ defmodule KafkaEx.New.ClusterMetadata do functions may be useful for extracting metadata information """ - alias KafkaEx.New.Broker - alias KafkaEx.New.Topic + alias KafkaEx.New.Structs.Broker + alias KafkaEx.New.Structs.Topic alias KafkaEx.New.KafkaExAPI - alias KafkaEx.New.NodeSelector + alias KafkaEx.New.Structs.NodeSelector defstruct brokers: %{}, controller_id: nil, @@ -64,7 +64,7 @@ defmodule KafkaEx.New.ClusterMetadata do Note this will not update the metadata, only select a node given the current metadata. - See `t:KafkaEx.New.NodeSelector.t/0` + See `t:KafkaEx.New.Structs.NodeSelector.t/0` """ @spec select_node(t, NodeSelector.t()) :: {:ok, KafkaExAPI.node_id()} | {:error, node_select_error} diff --git a/lib/kafka_ex/new/structs/node_selector.ex b/lib/kafka_ex/new/structs/node_selector.ex index c93af7fd..750a8362 100644 --- a/lib/kafka_ex/new/structs/node_selector.ex +++ b/lib/kafka_ex/new/structs/node_selector.ex @@ -1,4 +1,4 @@ -defmodule KafkaEx.New.NodeSelector do +defmodule KafkaEx.New.Structs.NodeSelector do @moduledoc """ Defines node selector functions and macros """ diff --git a/lib/kafka_ex/new/structs/partition.ex b/lib/kafka_ex/new/structs/partition.ex index 7bf8bdd0..57555c22 100644 --- a/lib/kafka_ex/new/structs/partition.ex +++ b/lib/kafka_ex/new/structs/partition.ex @@ -1,4 +1,4 @@ -defmodule KafkaEx.New.Partition do +defmodule KafkaEx.New.Structs.Partition do @moduledoc """ Encapsulates what we know about a partition """ diff --git a/lib/kafka_ex/new/structs/topic.ex b/lib/kafka_ex/new/structs/topic.ex index 27eb4233..4457e62e 100644 --- a/lib/kafka_ex/new/structs/topic.ex +++ b/lib/kafka_ex/new/structs/topic.ex @@ -1,9 +1,9 @@ -defmodule KafkaEx.New.Topic do +defmodule KafkaEx.New.Structs.Topic do @moduledoc """ Encapsulates what we know about a topic """ - alias KafkaEx.New.Partition + alias KafkaEx.New.Structs.Partition defstruct name: nil, partition_leaders: %{}, diff --git a/test/integration/kayrock/compatibility_test.exs b/test/integration/kayrock/compatibility_test.exs index 5f945505..f81f5a9d 100644 --- a/test/integration/kayrock/compatibility_test.exs +++ b/test/integration/kayrock/compatibility_test.exs @@ -16,7 +16,7 @@ defmodule KafkaEx.KayrockCompatibilityTest do alias KafkaEx.Protocol.Metadata.TopicMetadata alias KafkaEx.Protocol.Offset.Response, as: OffsetResponse - alias KafkaEx.New.ClusterMetadata + alias KafkaEx.New.Structs.ClusterMetadata alias KafkaEx.New.KafkaExAPI setup do diff --git a/test/integration/new_client_test.exs b/test/integration/new_client_test.exs index ea814822..33168413 100644 --- a/test/integration/new_client_test.exs +++ b/test/integration/new_client_test.exs @@ -3,10 +3,10 @@ defmodule KafkaEx.New.Client.Test do alias KafkaEx.New.Client - alias KafkaEx.New.ClusterMetadata + alias KafkaEx.New.Structs.ClusterMetadata alias KafkaEx.New.KafkaExAPI - alias KafkaEx.New.Topic - alias KafkaEx.New.NodeSelector + alias KafkaEx.New.Structs.Topic + alias KafkaEx.New.Structs.NodeSelector alias Kayrock.RecordBatch alias Kayrock.RecordBatch.Record diff --git a/test/kafka_ex/new/structs/broker_test.exs b/test/kafka_ex/new/structs/broker_test.exs index 6c31937d..ca45a9d9 100644 --- a/test/kafka_ex/new/structs/broker_test.exs +++ 
b/test/kafka_ex/new/structs/broker_test.exs @@ -1,7 +1,7 @@ -defmodule KafkaEx.New.BrokerTest do +defmodule KafkaEx.New.Structs.BrokerTest do use ExUnit.Case, async: false - alias KafkaEx.New.Broker + alias KafkaEx.New.Structs.Broker setup do pid = KafkaEx.TestSupport.Server.start(3040) diff --git a/test/kafka_ex/new/structs/cluster_metadata_test.exs b/test/kafka_ex/new/structs/cluster_metadata_test.exs index ff773503..93017c7a 100644 --- a/test/kafka_ex/new/structs/cluster_metadata_test.exs +++ b/test/kafka_ex/new/structs/cluster_metadata_test.exs @@ -1,6 +1,6 @@ -defmodule KafkaEx.New.ClusterMetadataTest do +defmodule KafkaEx.New.Structs.ClusterMetadataTest do use ExUnit.Case, async: true - alias KafkaEx.New.ClusterMetadata + alias KafkaEx.New.Structs.ClusterMetadata describe "from_metadata_v1_response/1" do @tag skip: true @@ -10,7 +10,7 @@ defmodule KafkaEx.New.ClusterMetadataTest do describe "known_topics/1" do test "return list of all known topics" do - topic = %KafkaEx.New.Topic{name: "test-topic"} + topic = %KafkaEx.New.Structs.Topic{name: "test-topic"} cluster = %ClusterMetadata{topics: %{topic.name => topic}} assert ClusterMetadata.known_topics(cluster) == ["test-topic"] @@ -19,8 +19,8 @@ defmodule KafkaEx.New.ClusterMetadataTest do describe "topics_metadata/1" do test "returns metadata for topics we've asked for" do - topic_1 = %KafkaEx.New.Topic{name: "test-topic-one"} - topic_2 = %KafkaEx.New.Topic{name: "test-topic-two"} + topic_1 = %KafkaEx.New.Structs.Topic{name: "test-topic-one"} + topic_2 = %KafkaEx.New.Structs.Topic{name: "test-topic-two"} cluster = %ClusterMetadata{ topics: %{ @@ -37,7 +37,7 @@ defmodule KafkaEx.New.ClusterMetadataTest do describe "brokers/1" do test "returns list of brokers" do - broker = %KafkaEx.New.Broker{node_id: 1} + broker = %KafkaEx.New.Structs.Broker{node_id: 1} cluster = %ClusterMetadata{brokers: %{1 => broker}} assert ClusterMetadata.brokers(cluster) == [broker] @@ -46,8 +46,8 @@ defmodule KafkaEx.New.ClusterMetadataTest do describe "select_node/2" do test "returns random node" do - broker = %KafkaEx.New.Broker{node_id: 1} - node_selector = KafkaEx.New.NodeSelector.random() + broker = %KafkaEx.New.Structs.Broker{node_id: 1} + node_selector = KafkaEx.New.Structs.NodeSelector.random() cluster = %ClusterMetadata{brokers: %{1 => broker}} assert ClusterMetadata.select_node(cluster, node_selector) == @@ -55,9 +55,9 @@ defmodule KafkaEx.New.ClusterMetadataTest do end test "returns controller node" do - broker_1 = %KafkaEx.New.Broker{node_id: 1} - broker_2 = %KafkaEx.New.Broker{node_id: 2} - node_selector = KafkaEx.New.NodeSelector.controller() + broker_1 = %KafkaEx.New.Structs.Broker{node_id: 1} + broker_2 = %KafkaEx.New.Structs.Broker{node_id: 2} + node_selector = KafkaEx.New.Structs.NodeSelector.controller() cluster = %ClusterMetadata{ controller_id: 1, @@ -69,9 +69,9 @@ defmodule KafkaEx.New.ClusterMetadataTest do end test "returns node based on node_id" do - broker_1 = %KafkaEx.New.Broker{node_id: 1} - broker_2 = %KafkaEx.New.Broker{node_id: 2} - node_selector = KafkaEx.New.NodeSelector.node_id(2) + broker_1 = %KafkaEx.New.Structs.Broker{node_id: 1} + broker_2 = %KafkaEx.New.Structs.Broker{node_id: 2} + node_selector = KafkaEx.New.Structs.NodeSelector.node_id(2) cluster = %ClusterMetadata{ controller_id: 1, @@ -83,9 +83,9 @@ defmodule KafkaEx.New.ClusterMetadataTest do end test "returns error when node does not exist" do - broker_1 = %KafkaEx.New.Broker{node_id: 1} - broker_2 = %KafkaEx.New.Broker{node_id: 2} - node_selector = 
KafkaEx.New.NodeSelector.node_id(3) + broker_1 = %KafkaEx.New.Structs.Broker{node_id: 1} + broker_2 = %KafkaEx.New.Structs.Broker{node_id: 2} + node_selector = KafkaEx.New.Structs.NodeSelector.node_id(3) cluster = %ClusterMetadata{ controller_id: 1, @@ -98,7 +98,7 @@ defmodule KafkaEx.New.ClusterMetadataTest do test "returns node based on topic & partition id" do topic_one = - KafkaEx.New.Topic.from_topic_metadata(%{ + KafkaEx.New.Structs.Topic.from_topic_metadata(%{ topic: "topic-one", is_internal: false, partition_metadata: [ @@ -107,7 +107,7 @@ defmodule KafkaEx.New.ClusterMetadataTest do }) topic_two = - KafkaEx.New.Topic.from_topic_metadata(%{ + KafkaEx.New.Structs.Topic.from_topic_metadata(%{ topic: "topic-two", is_internal: false, partition_metadata: [ @@ -115,7 +115,8 @@ defmodule KafkaEx.New.ClusterMetadataTest do ] }) - node_selector = KafkaEx.New.NodeSelector.topic_partition("topic-one", 0) + node_selector = + KafkaEx.New.Structs.NodeSelector.topic_partition("topic-one", 0) cluster = %ClusterMetadata{ topics: %{ @@ -129,7 +130,7 @@ defmodule KafkaEx.New.ClusterMetadataTest do test "returns error when topic does not exist" do topic = - KafkaEx.New.Topic.from_topic_metadata(%{ + KafkaEx.New.Structs.Topic.from_topic_metadata(%{ topic: "topic-one", is_internal: false, partition_metadata: [ @@ -137,7 +138,8 @@ defmodule KafkaEx.New.ClusterMetadataTest do ] }) - node_selector = KafkaEx.New.NodeSelector.topic_partition("topic-two", 0) + node_selector = + KafkaEx.New.Structs.NodeSelector.topic_partition("topic-two", 0) cluster = %ClusterMetadata{ topics: %{ @@ -151,7 +153,7 @@ defmodule KafkaEx.New.ClusterMetadataTest do test "returns error when partition does not exist" do topic = - KafkaEx.New.Topic.from_topic_metadata(%{ + KafkaEx.New.Structs.Topic.from_topic_metadata(%{ topic: "topic-one", is_internal: false, partition_metadata: [ @@ -159,7 +161,8 @@ defmodule KafkaEx.New.ClusterMetadataTest do ] }) - node_selector = KafkaEx.New.NodeSelector.topic_partition("topic-one", 1) + node_selector = + KafkaEx.New.Structs.NodeSelector.topic_partition("topic-one", 1) cluster = %ClusterMetadata{ topics: %{ @@ -173,7 +176,7 @@ defmodule KafkaEx.New.ClusterMetadataTest do test "returns node based on consumer group name" do node_selector = - KafkaEx.New.NodeSelector.consumer_group("consumer-group-one") + KafkaEx.New.Structs.NodeSelector.consumer_group("consumer-group-one") cluster = %ClusterMetadata{ consumer_group_coordinators: %{ @@ -187,7 +190,7 @@ defmodule KafkaEx.New.ClusterMetadataTest do test "returns error when consumer group does not exist" do node_selector = - KafkaEx.New.NodeSelector.consumer_group("consumer-group-three") + KafkaEx.New.Structs.NodeSelector.consumer_group("consumer-group-three") cluster = %ClusterMetadata{ consumer_group_coordinators: %{ @@ -206,14 +209,14 @@ defmodule KafkaEx.New.ClusterMetadataTest do describe "broker_by_node_id/1" do test "returns broker by its node id" do - broker = %KafkaEx.New.Broker{node_id: 1} + broker = %KafkaEx.New.Structs.Broker{node_id: 1} cluster = %ClusterMetadata{brokers: %{1 => broker}} assert ClusterMetadata.broker_by_node_id(cluster, 1) == broker end test "returns nil when broker is not found" do - broker = %KafkaEx.New.Broker{node_id: 1} + broker = %KafkaEx.New.Structs.Broker{node_id: 1} cluster = %ClusterMetadata{brokers: %{1 => broker}} refute ClusterMetadata.broker_by_node_id(cluster, 2) @@ -223,12 +226,12 @@ defmodule KafkaEx.New.ClusterMetadataTest do describe "update_brokers/2" do test "updates brokers based on given function" 
do socket = %KafkaEx.Socket{} - broker = %KafkaEx.New.Broker{node_id: 1, socket: socket} + broker = %KafkaEx.New.Structs.Broker{node_id: 1, socket: socket} cluster = %ClusterMetadata{brokers: %{1 => broker}} updated_cluster = ClusterMetadata.update_brokers(cluster, fn broker_to_update -> - KafkaEx.New.Broker.put_socket(broker_to_update, nil) + KafkaEx.New.Structs.Broker.put_socket(broker_to_update, nil) end) updated_broker = ClusterMetadata.broker_by_node_id(updated_cluster, 1) @@ -239,7 +242,7 @@ defmodule KafkaEx.New.ClusterMetadataTest do describe "remove_topics/2" do test "test removes topic based on its name" do topic_one = - KafkaEx.New.Topic.from_topic_metadata(%{ + KafkaEx.New.Structs.Topic.from_topic_metadata(%{ topic: "topic-one", is_internal: false, partition_metadata: [ @@ -248,7 +251,7 @@ defmodule KafkaEx.New.ClusterMetadataTest do }) topic_two = - KafkaEx.New.Topic.from_topic_metadata(%{ + KafkaEx.New.Structs.Topic.from_topic_metadata(%{ topic: "topic-two", is_internal: false, partition_metadata: [ diff --git a/test/kafka_ex/new/structs/node_selector_test.exs b/test/kafka_ex/new/structs/node_selector_test.exs index b0a012b3..1234a7a3 100644 --- a/test/kafka_ex/new/structs/node_selector_test.exs +++ b/test/kafka_ex/new/structs/node_selector_test.exs @@ -1,7 +1,7 @@ -defmodule KafkaEx.New.NodeSelectorTest do +defmodule KafkaEx.New.Structs.NodeSelectorTest do use ExUnit.Case, async: true - alias KafkaEx.New.NodeSelector + alias KafkaEx.New.Structs.NodeSelector describe "node_id/1" do test "build selector based on node id strategy" do diff --git a/test/kafka_ex/new/structs/partiton_test.exs b/test/kafka_ex/new/structs/partiton_test.exs index 20ee78e2..97476520 100644 --- a/test/kafka_ex/new/structs/partiton_test.exs +++ b/test/kafka_ex/new/structs/partiton_test.exs @@ -1,7 +1,7 @@ -defmodule KafkaEx.New.PartitionTest do +defmodule KafkaEx.New.Structs.PartitionTest do use ExUnit.Case, async: true - alias KafkaEx.New.Partition + alias KafkaEx.New.Structs.Partition describe "from_partition_metadata/1" do setup do diff --git a/test/kafka_ex/new/structs/topic_test.exs b/test/kafka_ex/new/structs/topic_test.exs index d7a04d40..8187699c 100644 --- a/test/kafka_ex/new/structs/topic_test.exs +++ b/test/kafka_ex/new/structs/topic_test.exs @@ -1,7 +1,7 @@ -defmodule KafkaEx.New.TopicTest do +defmodule KafkaEx.New.Structs.TopicTest do use ExUnit.Case, async: true - alias KafkaEx.New.Topic + alias KafkaEx.New.Structs.Topic describe "from_topic_metadata/1" do setup do @@ -40,7 +40,7 @@ defmodule KafkaEx.New.TopicTest do topic = Topic.from_topic_metadata(metadata) assert topic.partitions == [ - %KafkaEx.New.Partition{ + %KafkaEx.New.Structs.Partition{ partition_id: 123, leader: 321, replicas: [], diff --git a/test/test_helper.exs b/test/test_helper.exs index bf078b33..9a5a1f04 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -15,7 +15,7 @@ ExUnit.configure( defmodule TestHelper do alias KafkaEx.New.Client - alias KafkaEx.New.NodeSelector + alias KafkaEx.New.Structs.NodeSelector require Logger def generate_random_string(string_length \\ 20) do From 1fc35af6708aa916742b6bd2d5eea8eed6158e5d Mon Sep 17 00:00:00 2001 From: Argonus Date: Thu, 3 Aug 2023 13:25:49 +0200 Subject: [PATCH 11/11] Fix Github Action --- .github/workflows/test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index a4e666ea..d6950324 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -86,7 +86,7 @@ jobs: steps: 
- uses: actions/checkout@v2 - name: Setup elixir & erlang environment - uses: actions/setup-elixir@v1 + uses: erlef/setup-beam@v1 with: elixir-version: ${{matrix.pair.elixir}} # Define the elixir version [required] otp-version: ${{matrix.pair.otp}} # Define the OTP version [required]
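
For reference, the step configured by this last hunk ends up looking roughly like the sketch below once the patch is applied. The erlef/setup-beam@v1 action, the step names, and the matrix.pair inputs are taken from the diff above; the job name, the runs-on value, the example Elixir/OTP pair, and the exact indentation are assumptions added here for context only.

    jobs:
      test:                                        # job name assumed for illustration
        runs-on: ubuntu-latest                     # runner assumed for illustration
        strategy:
          matrix:
            pair:
              - { elixir: "1.10.4", otp: "23.3" }  # illustrative versions only
        steps:
          - uses: actions/checkout@v2
          - name: Setup elixir & erlang environment
            uses: erlef/setup-beam@v1              # successor to the deprecated actions/setup-elixir
            with:
              elixir-version: ${{matrix.pair.elixir}} # Define the elixir version [required]
              otp-version: ${{matrix.pair.otp}} # Define the OTP version [required]

Because erlef/setup-beam accepts the same elixir-version and otp-version inputs that actions/setup-elixir did, only the uses: line needs to change; the checkout step and the existing matrix wiring carry over unchanged.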