diff --git a/docker-compose-arm.yml b/docker-compose-arm.yml new file mode 100644 index 00000000..ecaf5c1a --- /dev/null +++ b/docker-compose-arm.yml @@ -0,0 +1,94 @@ +version: '3.9' + +services: + zookeeper: + image: confluentinc/cp-zookeeper:7.0.4.arm64 + restart: unless-stopped + ports: + - '32181:32181' + environment: + ZOOKEEPER_CLIENT_PORT: 32181 + ZOOKEEPER_TICK_TIME: 2000 + healthcheck: + test: "zookeeper-shell 127.0.01:32181 ls /" + interval: 10s + timeout: 10s + retries: 5 + + kafka-1: + image: confluentinc/cp-kafka:7.0.4.arm64 + ports: + - '9092:9092' + depends_on: + zookeeper: + condition: service_healthy + env_file: docker-compose-kafka.env + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181 + KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-1:29092,EXTERNAL://localhost:9092 + healthcheck: + test: kafka-topics --bootstrap-server kafka-1:29092 --list + interval: 30s + timeout: 10s + retries: 4 + volumes: + - ./ssl/kafka.server.keystore.jks:/etc/kafka/secrets/kafka.server.keystore.jks:ro,z + - ./ssl/kafka.server.truststore.jks:/etc/kafka/secrets/kafka.server.truststore.jks:ro,z + + kafka-2: + image: confluentinc/cp-kafka:7.0.4.arm64 + ports: + - '9093:9093' + depends_on: + zookeeper: + condition: service_healthy + env_file: docker-compose-kafka.env + environment: + KAFKA_BROKER_ID: 2 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181 + KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-2:29093,EXTERNAL://localhost:9093 + healthcheck: + test: kafka-topics --bootstrap-server kafka-2:29093 --list + interval: 30s + timeout: 10s + retries: 4 + volumes: + - ./ssl/kafka.server.keystore.jks:/etc/kafka/secrets/kafka.server.keystore.jks:ro,z + - ./ssl/kafka.server.truststore.jks:/etc/kafka/secrets/kafka.server.truststore.jks:ro,z + + kafka-3: + image: confluentinc/cp-kafka:7.0.4.arm64 + ports: + - '9094:9094' + depends_on: + zookeeper: + condition: service_healthy + env_file: docker-compose-kafka.env + environment: + KAFKA_BROKER_ID: 3 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181 + KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-3:29094,EXTERNAL://localhost:9094 + healthcheck: + test: kafka-topics --bootstrap-server kafka-2:29093 --list + interval: 30s + timeout: 10s + retries: 4 + volumes: + - ./ssl/kafka.server.keystore.jks:/etc/kafka/secrets/kafka.server.keystore.jks:ro,z + - ./ssl/kafka.server.truststore.jks:/etc/kafka/secrets/kafka.server.truststore.jks:ro,z + + kafka_setup: + image: confluentinc/cp-kafka:7.0.4.arm64 + depends_on: + zookeeper: + condition: service_healthy + kafka-1: + condition: service_healthy + kafka-2: + condition: service_healthy + kafka-3: + condition: service_healthy + command: "bash -c 'kafka-topics --zookeeper zookeeper:32181 --create --if-not-exists --partitions 4 --replication-factor 2 --topic consumer_group_implementation_test && \ + kafka-topics --zookeeper zookeeper:32181 --create --if-not-exists --partitions 4 --replication-factor 2 --topic test0p8p0 && \ + kafka-topics --zookeeper zookeeper:32181 --list'" diff --git a/lib/kafka_ex.ex b/lib/kafka_ex.ex index ff687779..01138d3e 100644 --- a/lib/kafka_ex.ex +++ b/lib/kafka_ex.ex @@ -123,6 +123,22 @@ defmodule KafkaEx do Server.call(worker, :consumer_group) end + @doc """ + Sends a request to describe a group identified by its name. + We support only one consumer group per request for now, as we don't + group requests by group coordinator. + This is a new client implementation, and is not compatible with the old clients + """ + @spec describe_group(binary, Keyword.t()) :: {:ok, any} | {:error, any} + def describe_group(consumer_group_name, opts \\ []) do + worker_name = Keyword.get(opts, :worker_name, Config.default_worker()) + + case Server.call(worker_name, {:describe_groups, [consumer_group_name]}) do + {:ok, [group]} -> {:ok, group} + {:error, error} -> {:error, error} + end + end + @doc """ Sends a request to join a consumer group. """ diff --git a/lib/kafka_ex/new/client.ex b/lib/kafka_ex/new/client.ex index cc7f2cdd..e0a862d9 100644 --- a/lib/kafka_ex/new/client.ex +++ b/lib/kafka_ex/new/client.ex @@ -292,14 +292,12 @@ defmodule KafkaEx.New.Client do case kayrock_network_request(request, node_selector, state) do {{:ok, response}, state_out} -> case ResponseParser.describe_groups_response(response) do - {:ok, [consumer_group]} -> - {{:ok, consumer_group}, state_out} + {:ok, consumer_groups} -> + {{:ok, consumer_groups}, state_out} {:error, [error | _]} -> - consumer_group = request.groups[0] - Logger.warn( - "Unable to fetch consumer group metadata for #{consumer_group.group_id}" + "Unable to fetch consumer group metadata for #{inspect(request.group_ids)}" ) handle_describe_group_request( @@ -312,10 +310,8 @@ defmodule KafkaEx.New.Client do end {_, _state_out} -> - consumer_group = request.groups[0] - Logger.warn( - "Unable to fetch consumer group metadata for #{consumer_group.group_id}" + "Unable to fetch consumer group metadata for #{inspect(request.group_ids)}" ) handle_describe_group_request( diff --git a/lib/kafka_ex/new/kafka_ex_api.ex b/lib/kafka_ex/new/kafka_ex_api.ex index 937c6547..1d6354d1 100644 --- a/lib/kafka_ex/new/kafka_ex_api.ex +++ b/lib/kafka_ex/new/kafka_ex_api.ex @@ -15,6 +15,7 @@ defmodule KafkaEx.New.KafkaExAPI do alias KafkaEx.New.Client alias KafkaEx.New.Structs.ClusterMetadata + alias KafkaEx.New.Structs.ConsumerGroup alias KafkaEx.New.Structs.Topic alias KafkaEx.New.Structs.NodeSelector @@ -56,6 +57,20 @@ defmodule KafkaEx.New.KafkaExAPI do end end + @doc """ + Sends a request to describe a group identified by its name. + We support only one consumer group per request for now, as we don't + group requests by group coordinator. + """ + @spec describe_group(client, Keyword.t()) :: + {:ok, ConsumerGroup.t()} | {:error, any} + def describe_group(client, consumer_group_name) do + case GenServer.call(client, {:describe_groups, [consumer_group_name]}) do + {:ok, [group]} -> {:ok, group} + {:error, error} -> {:error, error} + end + end + @doc """ Get topic metadata for the given topics @@ -73,7 +88,7 @@ defmodule KafkaEx.New.KafkaExAPI do Returns the cluster metadata from the given client """ @spec cluster_metadata(client) :: {:ok, ClusterMetadata.t()} - def(cluster_metadata(client)) do + def cluster_metadata(client) do GenServer.call(client, :cluster_metadata) end diff --git a/test/integration/kayrock/compatibility_streaming_test.exs b/test/integration/kayrock/compatibility_streaming_test.exs index 7f6957fc..e42d2114 100644 --- a/test/integration/kayrock/compatibility_streaming_test.exs +++ b/test/integration/kayrock/compatibility_streaming_test.exs @@ -22,7 +22,8 @@ defmodule KafkaEx.KayrockCompatibilityStreamingTest do partition = 0 consumer_group = "streamers" - {:ok, topic} = KafkaEx.TestHelpers.ensure_append_timestamp_topic(client, topic) + {:ok, topic} = + KafkaEx.TestHelpers.ensure_append_timestamp_topic(client, topic) KafkaEx.produce(topic, partition, "foo 1", api_version: 3) KafkaEx.produce(topic, partition, "foo 2", api_version: 3) @@ -41,7 +42,7 @@ defmodule KafkaEx.KayrockCompatibilityStreamingTest do } ) - TestHelper.wait_for(fn -> + KafkaEx.TestHelpers.wait_for(fn -> length(Enum.take(stream, 3)) == 3 end) @@ -81,7 +82,8 @@ defmodule KafkaEx.KayrockCompatibilityStreamingTest do topic_name = "kayrock_stream_with_empty_log" consumer_group = "streamers_with_empty_log" - {:ok, topic} = KafkaEx.TestHelpers.ensure_append_timestamp_topic(client, topic_name) + {:ok, topic} = + KafkaEx.TestHelpers.ensure_append_timestamp_topic(client, topic_name) {:ok, agent} = Agent.start(fn -> [] end) diff --git a/test/integration/kayrock/compatibility_test.exs b/test/integration/kayrock/compatibility_test.exs index 077bba9b..0318697c 100644 --- a/test/integration/kayrock/compatibility_test.exs +++ b/test/integration/kayrock/compatibility_test.exs @@ -4,8 +4,8 @@ defmodule KafkaEx.KayrockCompatibilityTest do These mostly come from the original integration_test.exs file """ - use ExUnit.Case + import KafkaEx.TestHelpers @moduletag :new_client @@ -35,6 +35,46 @@ defmodule KafkaEx.KayrockCompatibilityTest do assert Process.alive?(pid) end + describe "describe_groups/1" do + setup do + consumer_group = generate_random_string() + topic = "new_client_implementation" + + {:ok, %{consumer_group: consumer_group, topic: topic}} + end + + test "with new client - returns group metadata", %{ + client: client, + consumer_group: consumer_group, + topic: topic + } do + join_to_group(client, topic, consumer_group) + + {:ok, group_metadata} = KafkaExAPI.describe_group(client, consumer_group) + + assert group_metadata.group_id == consumer_group + assert group_metadata.protocol_type == "consumer" + assert group_metadata.protocol == "" + assert length(group_metadata.members) == 1 + end + + test "with old client - returns group metadata", %{ + client: client, + consumer_group: consumer_group, + topic: topic + } do + join_to_group(client, topic, consumer_group) + + {:ok, group_metadata} = + KafkaEx.describe_group(consumer_group, worker_name: client) + + assert group_metadata.group_id == consumer_group + assert group_metadata.protocol_type == "consumer" + assert group_metadata.protocol == "" + assert length(group_metadata.members) == 1 + end + end + test "worker updates metadata after specified interval" do {:ok, args} = KafkaEx.build_worker_options(metadata_update_interval: 100) {:ok, pid} = Client.start_link(args, :no_name) @@ -622,4 +662,16 @@ defmodule KafkaEx.KayrockCompatibilityTest do topic = KafkaEx.TestHelpers.generate_random_string() :ok = KafkaEx.produce(topic, nil, "hello", worker_name: client) end + + # ----------------------------------------------------------------------------- + defp join_to_group(client, topic, consumer_group) do + request = %KafkaEx.Protocol.JoinGroup.Request{ + group_name: consumer_group, + member_id: "", + topics: [topic], + session_timeout: 6000 + } + + KafkaEx.join_group(request, worker_name: client, timeout: 10000) + end end diff --git a/test/integration/new_client_test.exs b/test/integration/new_client_test.exs index 15ae7b32..d92beb23 100644 --- a/test/integration/new_client_test.exs +++ b/test/integration/new_client_test.exs @@ -38,7 +38,7 @@ defmodule KafkaEx.New.Client.Test do } do join_to_group(client, topic, consumer_group) - {:ok, group_metadata} = + {:ok, [group_metadata]} = GenServer.call(client, {:describe_groups, [consumer_group]}) assert group_metadata.group_id == consumer_group @@ -48,7 +48,7 @@ defmodule KafkaEx.New.Client.Test do end test "returns dead when consumer group does not exist", %{client: client} do - {:ok, group_metadata} = + {:ok, [group_metadata]} = GenServer.call(client, {:describe_groups, ["non-existing-group"]}) assert group_metadata.group_id == "non-existing-group"