From e97cd7ba4bfa61b2ecd216caf0566eb9505683be Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Wed, 1 Apr 2020 15:28:46 +0200 Subject: [PATCH 1/2] Wait for sync group response as long as session timeout TODO: should really introduce rebalance timeout instead --- changelog.md | 2 ++ src/brod_group_coordinator.erl | 16 +++++++++------- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/changelog.md b/changelog.md index 6e82de46..8d5e9ccf 100644 --- a/changelog.md +++ b/changelog.md @@ -199,3 +199,5 @@ * Stop supporting rebar * Update kafka_protocol dependency to 2.3.6 * Add new brod_group_coordinator:update_topics API +* 3.10.1 + * Wait as long as session timeout for sync response diff --git a/src/brod_group_coordinator.erl b/src/brod_group_coordinator.erl index a6e5d398..8c7f5f11 100644 --- a/src/brod_group_coordinator.erl +++ b/src/brod_group_coordinator.erl @@ -612,12 +612,13 @@ join_group(#state{ groupId = GroupId {ok, State}. -spec sync_group(state()) -> {ok, state()}. -sync_group(#state{ groupId = GroupId - , generationId = GenerationId - , memberId = MemberId - , connection = Connection - , member_pid = MemberPid - , member_module = MemberModule +sync_group(#state{ groupId = GroupId + , generationId = GenerationId + , memberId = MemberId + , connection = Connection + , member_pid = MemberPid + , member_module = MemberModule + , session_timeout_seconds = SessionTimeoutSec } = State) -> ReqBody = [ {group_id, GroupId} @@ -625,9 +626,10 @@ sync_group(#state{ groupId = GroupId , {member_id, MemberId} , {group_assignment, assign_partitions(State)} ], + SessionTimeout = timer:seconds(SessionTimeoutSec), SyncReq = brod_kafka_request:sync_group(Connection, ReqBody), %% send sync group request and wait for response - RspBody = send_sync(Connection, SyncReq), + RspBody = send_sync(Connection, SyncReq, SessionTimeout), %% get my partition assignments Assignment = kpro:find(member_assignment, RspBody), TopicAssignments = get_topic_assignments(State, Assignment), From 94d9ad08ef5df4a97e7b1d41b4356e0adab0490e Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Wed, 1 Apr 2020 16:41:00 +0200 Subject: [PATCH 2/2] fix default value doc --- src/brod_group_coordinator.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/brod_group_coordinator.erl b/src/brod_group_coordinator.erl index 8c7f5f11..0a76830c 100644 --- a/src/brod_group_coordinator.erl +++ b/src/brod_group_coordinator.erl @@ -179,7 +179,7 @@ %% partitions. %% %% -%%
  • `session_timeout_seconds' (optional, default = 10) +%%
  • `session_timeout_seconds' (optional, default = 30) %% %% Time in seconds for the group coordinator broker to consider a member %% 'down' if no heartbeat or any kind of requests received from a broker @@ -187,7 +187,7 @@ %% A group member may also consider the coordinator broker 'down' if no %% heartbeat response response received in the past N seconds.
  • %% -%%
  • `heartbeat_rate_seconds' (optional, default = 2) +%%
  • `heartbeat_rate_seconds' (optional, default = 5) %% %% Time in seconds for the member to 'ping' the group coordinator. %% OBS: Care should be taken when picking the number, on one hand, we do