Skip to content

Commit

Permalink
Merge pull request #265 from seznam/slechta/issue-262
Browse files Browse the repository at this point in the history
Slechta/issue 262
  • Loading branch information
je-ik authored Feb 13, 2018
2 parents 88f365f + c9f7afa commit 9163b2e
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@
@Audience(Audience.Type.EXECUTOR)
public interface UnboundedDataSource<T, OFFSET extends Serializable> extends DataSource<T> {

/** @return a list of all partitions of this source */
/**
* Error should be reported via throwing RuntimeException
* (Do not return empty list on error)
* @return a list of all partitions of this source
* */
List<UnboundedPartition<T, OFFSET>> getPartitions();

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package cz.seznam.euphoria.kafka;

import cz.seznam.euphoria.shadow.com.google.common.annotations.VisibleForTesting;
import cz.seznam.euphoria.shadow.com.google.common.collect.AbstractIterator;
import cz.seznam.euphoria.shadow.com.google.common.collect.Lists;
import cz.seznam.euphoria.core.client.io.UnboundedDataSource;
Expand Down Expand Up @@ -206,7 +207,7 @@ public List<UnboundedPartition<Pair<byte[], byte[]>, Long>> getPartitions() {
stopReadingAtStamp);
}
}
try (Consumer<?, ?> c = KafkaUtils.newConsumer(
try (Consumer<?, ?> c = newConsumer(
brokerList, "euphoria.partition-probe-" + UUID.randomUUID().toString(),
config)) {

Expand All @@ -219,20 +220,36 @@ public List<UnboundedPartition<Pair<byte[], byte[]>, Long>> getPartitions() {
throw new RuntimeException(e);
}
List<PartitionInfo> ps = c.partitionsFor(topicId);

if (ps.isEmpty()) {
throw new IllegalStateException("No kafka partitions found for topic " + topicId);
}

final long stopAtStamp = stopReadingAtStamp;
final long defaultOffsetTimestamp = offsetTimestamp;
return ps.stream().map(p ->
// ~ FIXME a leader might not be available (check p.leader().id() == -1)
// ... fail in this situation
new KafkaPartition(
brokerList, topicId, p.partition(),
config,
offs.getOrDefault(p.partition(), defaultOffsetTimestamp),
stopAtStamp))
.collect(Collectors.toList());
return ps.stream()
.map((PartitionInfo p) -> {
if (p.leader().id() == -1) {
throw new IllegalStateException("Leader not available");
}

return new KafkaPartition(
brokerList, topicId, p.partition(),
config,
offs.getOrDefault(p.partition(), defaultOffsetTimestamp),
stopAtStamp);
})
.collect(Collectors.toList());
}
}

@VisibleForTesting
Consumer<byte[], byte[]> newConsumer(String brokerList, @Nullable String groupId, @Nullable Settings config) {
return KafkaUtils.newConsumer(
brokerList, "euphoria.partition-probe-" + UUID.randomUUID().toString(),
config);
}

@Override
public boolean isBounded() {
return false;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2016-2018 Seznam.cz, a.s.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cz.seznam.euphoria.kafka;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.junit.Test;
import org.junit.experimental.categories.Categories;

import java.util.Collections;
import java.util.List;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;


public class KafkaSourceTest {

/**
* Test if where there are no partitions
* an IllegalStateException is expected
*/
@Test(expected = IllegalStateException.class)
@SuppressWarnings("unchecked")
public void testNoPartitions() {
tryGetPartitions(Collections.emptyList());
}

@SuppressWarnings("unchecked")
public void testPartitions() {
Node leaderNode = new Node(1, "localhost", 3333);
PartitionInfo pi = new PartitionInfo("topic", 0, leaderNode, null, null);
tryGetPartitions(Collections.singletonList(pi));
}

@Test(expected = IllegalStateException.class)
@SuppressWarnings("unchecked")
public void testNoLeader() {
PartitionInfo pi = new PartitionInfo("topic", 0, Node.noNode(), null, null);
tryGetPartitions(Collections.singletonList(pi));
}

public void tryGetPartitions(List<PartitionInfo> partitions) {
KafkaSource source = mock(KafkaSource.class);
Consumer<byte[], byte[]> consumer = mock(Consumer.class);
when(consumer.partitionsFor(any(String.class))).thenReturn(partitions);
when(source.newConsumer(any(), any(), any())).thenReturn(consumer);
when(source.getPartitions()).thenCallRealMethod();
source.getPartitions();
}
}

0 comments on commit 9163b2e

Please sign in to comment.