Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Kayrock Preparations] Structs and unit tests #483

Merged
merged 11 commits into from
Sep 7, 2023
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ jobs:
steps:
- uses: actions/checkout@v2
- name: Setup elixir & erlang environment
uses: actions/setup-elixir@v1
uses: erlef/setup-beam@v1
with:
elixir-version: ${{matrix.pair.elixir}} # Define the elixir version [required]
otp-version: ${{matrix.pair.otp}} # Define the OTP version [required]
Expand Down
21 changes: 10 additions & 11 deletions lib/kafka_ex/network_client.ex
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
defmodule KafkaEx.NetworkClient do
@moduledoc """
KafkaEx implementation of Client used to connect to Kafka Broker
"""
@behaviour KafkaEx.NetworkClient.Behaviour

require Logger
alias KafkaEx.New
alias KafkaEx.Protocol.Metadata.Broker
alias KafkaEx.Socket

@moduledoc false
@spec create_socket(binary, non_neg_integer, KafkaEx.ssl_options(), boolean) ::
nil | Socket.t()
@impl true
def create_socket(host, port, ssl_options \\ [], use_ssl \\ false) do
case Socket.create(
format_host(host),
Expand All @@ -32,12 +33,11 @@ defmodule KafkaEx.NetworkClient do
end
end

@spec close_socket(nil | Socket.t()) :: :ok
@impl true
def close_socket(nil), do: :ok
def close_socket(socket), do: Socket.close(socket)

@spec send_async_request(Broker.t() | New.Broker.t(), iodata) ::
:ok | {:error, :closed | :inet.posix()}
@impl true
def send_async_request(broker, data) do
socket = broker.socket

Expand All @@ -55,8 +55,7 @@ defmodule KafkaEx.NetworkClient do
end
end

@spec send_sync_request(Broker.t() | New.Broker.t(), iodata, timeout) ::
iodata | {:error, any()}
@impl true
def send_sync_request(%{:socket => socket} = broker, data, timeout) do
:ok = Socket.setopts(socket, [:binary, {:packet, 4}, {:active, false}])

Expand Down Expand Up @@ -99,7 +98,7 @@ defmodule KafkaEx.NetworkClient do
{:error, :no_broker}
end

@spec format_host(binary) :: [char] | :inet.ip_address()
@impl true
def format_host(host) do
case Regex.scan(~r/^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})$/, host) do
[match_data] = [[_, _, _, _, _]] ->
Expand Down
54 changes: 54 additions & 0 deletions lib/kafka_ex/network_client/behaviour.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
defmodule KafkaEx.NetworkClient.Behaviour do
@moduledoc """
Behaviour for any network client.
Created mainly to allow mocking request & responses in unit tests.
"""

@type host :: binary
@type host_port :: non_neg_integer
@type use_ssl :: boolean
@type kafka_ex_broker ::
KafkaEx.Protocol.Metadata.Broker.t() | KafkaEx.New.Structs.Broker.t()
@type request_data :: iodata
@type response_data :: iodata

# Dirty hack to allow mocking of socket in unit tests for elixir 1.8
# We should remove this when we drop support for elixir 1.8
if Version.match?(System.version(), ">= 1.10.0") do
@type kafka_ex_socket :: KafkaEx.Socket.t()
else
@type kafka_ex_socket :: any
end

@doc """
Creates a socket to the given host and port.
"""
@callback create_socket(host, host_port) :: kafka_ex_socket | nil
@callback create_socket(host, host_port, KafkaEx.ssl_options()) ::
kafka_ex_socket | nil
@callback create_socket(host, host_port, KafkaEx.ssl_options(), use_ssl) ::
kafka_ex_socket | nil

@doc """
Close socket, if socket is nil, do nothing.
"""
@callback close_socket(kafka_ex_socket | nil) :: :ok

@doc """
Send request asynchronously to broker.
"""
@callback send_async_request(kafka_ex_broker, request_data) ::
:ok | {:error, :closed | :inet.posix()}

@doc """
Send request synchronously to broker.
"""
@callback send_sync_request(kafka_ex_broker, iodata, timeout) ::
response_data | {:error, any()}

@doc """
Returns the host in Erlang IP address format if the host is an IP address.
Otherwise, returns the host as a char list.
"""
@callback format_host(binary) :: [char] | :inet.ip_address()
end
8 changes: 4 additions & 4 deletions lib/kafka_ex/new/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ defmodule KafkaEx.New.Client do
alias KafkaEx.Config
alias KafkaEx.NetworkClient

alias KafkaEx.New.Broker
alias KafkaEx.New.ClusterMetadata
alias KafkaEx.New.NodeSelector
alias KafkaEx.New.Structs.Broker
alias KafkaEx.New.Structs.ClusterMetadata
alias KafkaEx.New.Structs.NodeSelector

alias KafkaEx.New.Client.State

Expand Down Expand Up @@ -49,7 +49,7 @@ defmodule KafkaEx.New.Client do
@spec send_request(
KafkaEx.New.KafkaExAPI.client(),
map,
KafkaEx.New.NodeSelector.t(),
KafkaEx.New.Structs.NodeSelector.t(),
pos_integer | nil
) :: {:ok, term} | {:error, term}
def send_request(
Expand Down
2 changes: 1 addition & 1 deletion lib/kafka_ex/new/client/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ defmodule KafkaEx.New.Client.State do

# state struct for New.Client

alias KafkaEx.New.ClusterMetadata
alias KafkaEx.New.Structs.ClusterMetadata

defstruct(
bootstrap_uris: [],
Expand Down
2 changes: 1 addition & 1 deletion lib/kafka_ex/new/client_compatibility.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ defmodule KafkaEx.New.ClientCompatibility do
# module can be removed once we drop compatibility for the old API

alias KafkaEx.New.Adapter
alias KafkaEx.New.NodeSelector
alias KafkaEx.New.Structs.NodeSelector

alias KafkaEx.New.Client.State

Expand Down
6 changes: 3 additions & 3 deletions lib/kafka_ex/new/kafka_ex_api.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ defmodule KafkaEx.New.KafkaExAPI do
"""

alias KafkaEx.New.Client
alias KafkaEx.New.ClusterMetadata
alias KafkaEx.New.Topic
alias KafkaEx.New.NodeSelector
alias KafkaEx.New.Structs.ClusterMetadata
alias KafkaEx.New.Structs.Topic
alias KafkaEx.New.Structs.NodeSelector

@type node_id :: non_neg_integer
@type topic_name :: binary
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
defmodule KafkaEx.New.Broker do
defmodule KafkaEx.New.Structs.Broker do
@moduledoc """
Encapsulates what we know about a broker
Encapsulates what we know about a broker and our connection
"""

alias KafkaEx.Socket
Expand All @@ -11,7 +11,13 @@ defmodule KafkaEx.New.Broker do
socket: nil,
rack: nil

@type t :: %__MODULE__{}
@type t :: %__MODULE__{
node_id: non_neg_integer,
host: binary,
port: non_neg_integer,
socket: Socket.t() | nil,
rack: binary
}

@doc false
def put_socket(%__MODULE__{} = broker, socket), do: %{broker | socket: socket}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
defmodule KafkaEx.New.ClusterMetadata do
defmodule KafkaEx.New.Structs.ClusterMetadata do
@moduledoc """
Encapsulates what we know about the state of a Kafka broker cluster

This module is mainly used internally in New.Client, but some of its
functions may be useful for extracting metadata information
"""

alias KafkaEx.New.Broker
alias KafkaEx.New.Topic
alias KafkaEx.New.Structs.Broker
alias KafkaEx.New.Structs.Topic
alias KafkaEx.New.KafkaExAPI
alias KafkaEx.New.NodeSelector
alias KafkaEx.New.Structs.NodeSelector

defstruct brokers: %{},
controller_id: nil,
Expand Down Expand Up @@ -64,7 +64,7 @@ defmodule KafkaEx.New.ClusterMetadata do

Note this will not update the metadata, only select a node given the current metadata.

See `t:KafkaEx.New.NodeSelector.t/0`
See `t:KafkaEx.New.Structs.NodeSelector.t/0`
"""
@spec select_node(t, NodeSelector.t()) ::
{:ok, KafkaExAPI.node_id()} | {:error, node_select_error}
Expand Down Expand Up @@ -231,8 +231,9 @@ defmodule KafkaEx.New.ClusterMetadata do
%{cluster_metadata | brokers: updated_brokers}
end

@doc false
# update a consumer group coordinator node id
@doc """
update a consumer group coordinator node id
"""
@spec put_consumer_group_coordinator(
t,
KafkaExAPI.consumer_group_name(),
Expand All @@ -255,8 +256,9 @@ defmodule KafkaEx.New.ClusterMetadata do
}
end

@doc false
# remove the given topics (e.g., when they are deleted)
@doc """
remove the given topics (e.g., when they are deleted)
"""
@spec remove_topics(t, [KafkaExAPI.topic_name()]) :: t
def remove_topics(
%__MODULE__{topics: topics} = cluster_metadata,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,52 +1,65 @@
defmodule KafkaEx.New.NodeSelector do
defmodule KafkaEx.New.Structs.NodeSelector do
@moduledoc """
Defines node selector functions and macros
"""

alias KafkaEx.New.KafkaExAPI
alias KafkaEx.New.NodeSelector

defstruct strategy: nil,
node_id: nil,
topic: nil,
partition: nil,
consumer_group_name: nil

@type t :: %__MODULE__{}
@type valid_strategy ::
:node_id
| :random
| :first_available
| :controller
| :topic_partition
| :consumer_group
@type t :: %__MODULE__{
strategy: valid_strategy,
node_id: non_neg_integer | nil,
topic: KafkaExAPI.topic_name() | nil,
partition: KafkaExAPI.partition_id() | nil,
consumer_group_name: KafkaExAPI.consumer_group_name() | nil
}

@doc """
Select a specific node
"""
@spec node_id(KafkaExAPI.node_id()) :: t
@spec node_id(KafkaExAPI.node_id()) :: t()
def node_id(node_id) when is_integer(node_id) do
%NodeSelector{strategy: :node_id, node_id: node_id}
%__MODULE__{strategy: :node_id, node_id: node_id}
end

@doc """
Select a random node
"""
@spec random :: t
def random, do: %NodeSelector{strategy: :random}
@spec random :: __MODULE__.t()
def random, do: %__MODULE__{strategy: :random}

@doc """
Select first available node
"""
@spec first_available :: t
def first_available, do: %NodeSelector{strategy: :first_available}
@spec first_available :: __MODULE__.t()
def first_available, do: %__MODULE__{strategy: :first_available}

@doc """
Select the cluster's controller node
"""
@spec controller :: t
def controller, do: %NodeSelector{strategy: :controller}
@spec controller :: __MODULE__.t()
def controller, do: %__MODULE__{strategy: :controller}

@doc """
Select the controller for the given topic and partition
"""
@spec topic_partition(KafkaExAPI.topic_name(), KafkaExAPI.partition_id()) :: t
@spec topic_partition(KafkaExAPI.topic_name(), KafkaExAPI.partition_id()) ::
__MODULE__.t()
def topic_partition(topic, partition)
when is_binary(topic) and is_integer(partition) do
%NodeSelector{
%__MODULE__{
strategy: :topic_partition,
topic: topic,
partition: partition
Expand All @@ -56,9 +69,9 @@ defmodule KafkaEx.New.NodeSelector do
@doc """
Select the controller for the given consumer group
"""
@spec consumer_group(KafkaExAPI.consumer_group_name()) :: t
@spec consumer_group(KafkaExAPI.consumer_group_name()) :: __MODULE__.t()
def consumer_group(consumer_group_name) when is_binary(consumer_group_name) do
%NodeSelector{
%__MODULE__{
strategy: :consumer_group,
consumer_group_name: consumer_group_name
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
defmodule KafkaEx.New.Partition do
defmodule KafkaEx.New.Structs.Partition do
@moduledoc """
Encapsulates what we know about a partition
"""

defstruct partition_id: nil, leader: -1, replicas: [], isr: []

@type t :: %__MODULE__{}
@type t :: %__MODULE__{
partition_id: integer,
leader: integer,
replicas: list(integer),
isr: list(integer)
}

@doc false
def from_partition_metadata(%{
Expand Down
11 changes: 8 additions & 3 deletions lib/kafka_ex/new/topic.ex → lib/kafka_ex/new/structs/topic.ex
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
defmodule KafkaEx.New.Topic do
defmodule KafkaEx.New.Structs.Topic do
@moduledoc """
Encapsulates what we know about a topic
"""

alias KafkaEx.New.Partition
alias KafkaEx.New.Structs.Partition

defstruct name: nil,
partition_leaders: %{},
is_internal: false,
partitions: []

@type t :: %__MODULE__{}
@type t :: %__MODULE__{
name: String.t(),
partition_leaders: %{integer() => integer()},
is_internal: boolean(),
partitions: [Partition.t()]
}

@doc false
def from_topic_metadata(%{
Expand Down
Loading
Loading