diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/io/UnboundedDataSource.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/io/UnboundedDataSource.java index 6cc65a8f..8522c0ed 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/io/UnboundedDataSource.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/io/UnboundedDataSource.java @@ -30,7 +30,11 @@ @Audience(Audience.Type.EXECUTOR) public interface UnboundedDataSource extends DataSource { - /** @return a list of all partitions of this source */ + /** + * Error should be reported via throwing RuntimeException + * (Do not return empty list on error) + * @return a list of all partitions of this source + * */ List> getPartitions(); @Override diff --git a/euphoria-kafka/src/main/java/cz/seznam/euphoria/kafka/KafkaSource.java b/euphoria-kafka/src/main/java/cz/seznam/euphoria/kafka/KafkaSource.java index 33f41922..126a2cc8 100644 --- a/euphoria-kafka/src/main/java/cz/seznam/euphoria/kafka/KafkaSource.java +++ b/euphoria-kafka/src/main/java/cz/seznam/euphoria/kafka/KafkaSource.java @@ -15,6 +15,7 @@ */ package cz.seznam.euphoria.kafka; +import cz.seznam.euphoria.shadow.com.google.common.annotations.VisibleForTesting; import cz.seznam.euphoria.shadow.com.google.common.collect.AbstractIterator; import cz.seznam.euphoria.shadow.com.google.common.collect.Lists; import cz.seznam.euphoria.core.client.io.UnboundedDataSource; @@ -206,7 +207,7 @@ public List, Long>> getPartitions() { stopReadingAtStamp); } } - try (Consumer c = KafkaUtils.newConsumer( + try (Consumer c = newConsumer( brokerList, "euphoria.partition-probe-" + UUID.randomUUID().toString(), config)) { @@ -219,20 +220,36 @@ public List, Long>> getPartitions() { throw new RuntimeException(e); } List ps = c.partitionsFor(topicId); + + if (ps.isEmpty()) { + throw new IllegalStateException("No kafka partitions found for topic " + topicId); + } + final long stopAtStamp = stopReadingAtStamp; final long defaultOffsetTimestamp = offsetTimestamp; - return ps.stream().map(p -> - // ~ FIXME a leader might not be available (check p.leader().id() == -1) - // ... fail in this situation - new KafkaPartition( - brokerList, topicId, p.partition(), - config, - offs.getOrDefault(p.partition(), defaultOffsetTimestamp), - stopAtStamp)) - .collect(Collectors.toList()); + return ps.stream() + .map((PartitionInfo p) -> { + if (p.leader().id() == -1) { + throw new IllegalStateException("Leader not available"); + } + + return new KafkaPartition( + brokerList, topicId, p.partition(), + config, + offs.getOrDefault(p.partition(), defaultOffsetTimestamp), + stopAtStamp); + }) + .collect(Collectors.toList()); } } + @VisibleForTesting + Consumer newConsumer(String brokerList, @Nullable String groupId, @Nullable Settings config) { + return KafkaUtils.newConsumer( + brokerList, "euphoria.partition-probe-" + UUID.randomUUID().toString(), + config); + } + @Override public boolean isBounded() { return false; diff --git a/euphoria-kafka/src/test/java/cz/seznam/euphoria/kafka/KafkaSourceTest.java b/euphoria-kafka/src/test/java/cz/seznam/euphoria/kafka/KafkaSourceTest.java new file mode 100644 index 00000000..f9e0d789 --- /dev/null +++ b/euphoria-kafka/src/test/java/cz/seznam/euphoria/kafka/KafkaSourceTest.java @@ -0,0 +1,66 @@ +/* + * Copyright 2016-2018 Seznam.cz, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.seznam.euphoria.kafka; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.junit.Test; +import org.junit.experimental.categories.Categories; + +import java.util.Collections; +import java.util.List; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +public class KafkaSourceTest { + + /** + * Test if where there are no partitions + * an IllegalStateException is expected + */ + @Test(expected = IllegalStateException.class) + @SuppressWarnings("unchecked") + public void testNoPartitions() { + tryGetPartitions(Collections.emptyList()); + } + + @SuppressWarnings("unchecked") + public void testPartitions() { + Node leaderNode = new Node(1, "localhost", 3333); + PartitionInfo pi = new PartitionInfo("topic", 0, leaderNode, null, null); + tryGetPartitions(Collections.singletonList(pi)); + } + + @Test(expected = IllegalStateException.class) + @SuppressWarnings("unchecked") + public void testNoLeader() { + PartitionInfo pi = new PartitionInfo("topic", 0, Node.noNode(), null, null); + tryGetPartitions(Collections.singletonList(pi)); + } + + public void tryGetPartitions(List partitions) { + KafkaSource source = mock(KafkaSource.class); + Consumer consumer = mock(Consumer.class); + when(consumer.partitionsFor(any(String.class))).thenReturn(partitions); + when(source.newConsumer(any(), any(), any())).thenReturn(consumer); + when(source.getPartitions()).thenCallRealMethod(); + source.getPartitions(); + } +} \ No newline at end of file