diff --git a/camus-etl-kafka/src/main/java/com/linkedin/camus/workallocater/BaseAllocator.java b/camus-etl-kafka/src/main/java/com/linkedin/camus/workallocater/BaseAllocator.java
index 2a918b5d8..ff910dd70 100644
--- a/camus-etl-kafka/src/main/java/com/linkedin/camus/workallocater/BaseAllocator.java
+++ b/camus-etl-kafka/src/main/java/com/linkedin/camus/workallocater/BaseAllocator.java
@@ -14,6 +14,7 @@
 public class BaseAllocator extends WorkAllocator {
 
+  protected final static int DEFAULT_MAP_TASKS_COUNT = 30;
   protected Properties props;
@@ -40,7 +41,7 @@ public int compare(CamusRequest o1, CamusRequest o2) {
   @Override
   public List<InputSplit> allocateWork(List<CamusRequest> requests, JobContext context) throws IOException {
-    int numTasks = context.getConfiguration().getInt("mapred.map.tasks", 30);
+    int numTasks = getMapTasksCount(context);
 
     reverseSortRequests(requests);
@@ -73,4 +74,8 @@ protected EtlSplit getSmallestMultiSplit(List<InputSplit> kafkaETLSplits) throws
     return smallest;
   }
 
+  protected int getMapTasksCount(JobContext context) {
+    return context.getConfiguration().getInt("mapred.map.tasks", DEFAULT_MAP_TASKS_COUNT);
+  }
+
 }
diff --git a/camus-etl-kafka/src/main/java/com/linkedin/camus/workallocater/RoundRobinAllocator.java b/camus-etl-kafka/src/main/java/com/linkedin/camus/workallocater/RoundRobinAllocator.java
new file mode 100644
index 000000000..8ca25da57
--- /dev/null
+++ b/camus-etl-kafka/src/main/java/com/linkedin/camus/workallocater/RoundRobinAllocator.java
@@ -0,0 +1,43 @@
+package com.linkedin.camus.workallocater;
+
+import com.google.common.collect.Lists;
+import com.linkedin.camus.etl.kafka.mapred.EtlSplit;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+
+import java.io.IOException;
+import java.util.List;
+
+public class RoundRobinAllocator extends BaseAllocator {
+
+  /**
+   * Creates splits using a round-robin algorithm: requests are dealt
+   * one at a time across the splits, so split sizes differ by at most
+   * one request.
+   *
+   * @param requests sorted by topic requests
+   * @param context camus job context
+   * @return splits
+   * @throws IOException if a split cannot be built
+   */
+  @Override
+  public List<InputSplit> allocateWork(List<CamusRequest> requests, JobContext context) throws IOException {
+    // Never create more splits than requests; an empty request list yields no splits.
+    int numTasks = Math.min(getMapTasksCount(context), requests.size());
+    List<InputSplit> kafkaETLSplits = Lists.newArrayListWithCapacity(numTasks);
+
+    for (int i = 0; i < numTasks; i++) {
+      kafkaETLSplits.add(new EtlSplit());
+    }
+
+    int currentSplitIndex = 0;
+    for (CamusRequest request : requests) {
+      if (currentSplitIndex >= kafkaETLSplits.size()) {
+        currentSplitIndex = 0;
+      }
+      ((EtlSplit) kafkaETLSplits.get(currentSplitIndex)).addRequest(request);
+      currentSplitIndex++;
+    }
+    return kafkaETLSplits;
+  }
+}
diff --git a/camus-etl-kafka/src/main/java/com/linkedin/camus/workallocater/TopicGroupingAllocator.java b/camus-etl-kafka/src/main/java/com/linkedin/camus/workallocater/TopicGroupingAllocator.java
index 5c426b7d3..bf73120d9 100644
--- a/camus-etl-kafka/src/main/java/com/linkedin/camus/workallocater/TopicGroupingAllocator.java
+++ b/camus-etl-kafka/src/main/java/com/linkedin/camus/workallocater/TopicGroupingAllocator.java
@@ -22,7 +22,7 @@ public class TopicGroupingAllocator extends BaseAllocator {
   @Override
   public List<InputSplit> allocateWork(List<CamusRequest> requests, JobContext context) throws IOException {
-    int numTasks = context.getConfiguration().getInt("mapred.map.tasks", 30);
+    int numTasks = getMapTasksCount(context);
 
     List<InputSplit> kafkaETLSplits = new ArrayList<InputSplit>();
     for (int i = 0; i < numTasks; i++) {
diff --git a/camus-etl-kafka/src/test/java/com/linkedin/camus/workallocater/RoundRobinAllocatorTest.java b/camus-etl-kafka/src/test/java/com/linkedin/camus/workallocater/RoundRobinAllocatorTest.java
new file mode 100644
index 000000000..a4bf320bb
--- /dev/null
+++ b/camus-etl-kafka/src/test/java/com/linkedin/camus/workallocater/RoundRobinAllocatorTest.java
@@ -0,0 +1,149 @@
+package com.linkedin.camus.workallocater;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.linkedin.camus.etl.kafka.common.EtlRequest;
+import com.linkedin.camus.etl.kafka.mapred.EtlSplit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class RoundRobinAllocatorTest {
+  private WorkAllocator allocator = new RoundRobinAllocator();
+  private JobContext context;
+  private Configuration configuration;
+  private int mappersCount = 5;
+
+  @Before
+  public void setUp() {
+    configuration = EasyMock.createMock(Configuration.class);
+    EasyMock.expect(configuration.getInt(EasyMock.anyString(), EasyMock.anyInt())).andReturn(mappersCount).anyTimes();
+
+    context = EasyMock.createMock(JobContext.class);
+    EasyMock.expect(context.getConfiguration()).andReturn(configuration).anyTimes();
+
+    EasyMock.replay(configuration, context);
+  }
+
+  @Test
+  public void testOneSplit() throws Exception {
+    int topicsCount = 1;
+    int partitionsPerTopic = 1;
+    int expectedSplits = 1;
+
+    performTest(topicsCount, partitionsPerTopic, expectedSplits);
+  }
+
+  @Test
+  public void testSplitPerPartition() throws Exception {
+    int topicsCount = 1;
+    int partitionsPerTopic = 3;
+    int expectedSplits = 3;
+
+    performTest(topicsCount, partitionsPerTopic, expectedSplits);
+  }
+
+  @Test
+  public void testNotMoreMappers() throws Exception {
+    int topicsCount = 3;
+    int partitionsPerTopic = 3;
+    int expectedSplits = mappersCount;
+
+    performTest(topicsCount, partitionsPerTopic, expectedSplits);
+  }
+
+  @Test
+  public void testSplitPerTopic() throws Exception {
+    int topicsCount = 3;
+    int partitionsPerTopic = 1;
+    int expectedSplits = 3;
+
+    performTest(topicsCount, partitionsPerTopic, expectedSplits);
+  }
+
+  @Test
+  public void testRoundRobiness() throws Exception {
+    int topicsCount = 3;
+    int partitionsPerTopic = mappersCount;
+    int expectedSplits = mappersCount;
+
+    List<InputSplit> splits = performTest(topicsCount, partitionsPerTopic, expectedSplits);
+
+    for (InputSplit split : splits) {
+      Set<String> topics = Sets.newHashSet();
+      EtlSplit etlSplit = (EtlSplit) split;
+      while (etlSplit.getNumRequests() > 0) {
+        CamusRequest request = etlSplit.popRequest();
+        topics.add(request.getTopic());
+      }
+      assertEquals("Each split must contain one partition for each topic", topicsCount, topics.size());
+    }
+  }
+
+  @Test
+  public void testDifferenceBetweenSplits() throws Exception {
+    int topicsCount = 42;
+    int partitionsPerTopic = 5545;
+    int expectedSplits = mappersCount;
+
+    performTest(topicsCount, partitionsPerTopic, expectedSplits);
+  }
+
+  private List<InputSplit> performTest(int topicsCount, int partitionsPerTopic, int expectedSplitsCount) throws Exception {
+    List<CamusRequest> requests = generateRequests(topicsCount, partitionsPerTopic);
+    List<InputSplit> splits = allocator.allocateWork(requests, context);
+
+    assertEquals("Must create exact count of splits", expectedSplitsCount, splits.size());
+    assertTrue("Max difference between splits must not be more than one", 1 >= calculateMaxDifferenceInSplitSize(splits));
+    assertEquals("Must not skip partitions", topicsCount * partitionsPerTopic, calculateRequestsInSplits(splits));
+
+    return splits;
+  }
+
+  private List<CamusRequest> generateRequests(int topicsCount, int partitions) {
+    List<CamusRequest> requests = Lists.newArrayListWithCapacity(topicsCount * partitions);
+
+    for (int topicNum = 0; topicNum < topicsCount; topicNum++) {
+      for (int partNum = 0; partNum < partitions; partNum++) {
+        EtlRequest request = new EtlRequest(context, "some_topic_" + topicNum, "some_leader_id", partNum);
+        requests.add(request);
+      }
+    }
+    return requests;
+  }
+
+  private int calculateRequestsInSplits(List<InputSplit> splits) throws IOException, InterruptedException {
+    long requests = 0;
+    for (InputSplit split : splits) {
+      requests += ((EtlSplit) split).getNumRequests();
+    }
+    return (int) requests;
+  }
+
+  private int calculateMaxDifferenceInSplitSize(List<InputSplit> splits) throws IOException, InterruptedException {
+    long min = Integer.MAX_VALUE;
+    long max = Integer.MIN_VALUE;
+
+    for (InputSplit split : splits) {
+      // Compare request counts, not InputSplit.getLength() byte lengths:
+      // round-robin balances by request count, and the two metrics differ.
+      long numRequests = ((EtlSplit) split).getNumRequests();
+      if (numRequests > max) {
+        max = numRequests;
+      }
+      if (numRequests < min) {
+        min = numRequests;
+      }
+    }
+    return (int) (max - min);
+  }
+}