feat(loader): support kafka as datasource (#506)
liuxiaocs7 authored Oct 7, 2023
1 parent aa5f739 commit bfabb14
Showing 24 changed files with 1,168 additions and 3 deletions.
1 change: 1 addition & 0 deletions .github/workflows/loader-ci.yml
@@ -64,6 +64,7 @@ jobs:
mvn test -P file
mvn test -P hdfs
mvn test -P jdbc
mvn test -P kafka
- name: Upload coverage to Codecov
uses: codecov/[email protected]
23 changes: 23 additions & 0 deletions hugegraph-loader/pom.xml
@@ -51,6 +51,7 @@
<mysql.connector.version>8.0.28</mysql.connector.version>
<postgres.version>42.4.1</postgres.version>
<mssql.jdbc.version>7.2.0.jre8</mssql.jdbc.version>
<kafka.testcontainer.version>1.19.0</kafka.testcontainer.version>
</properties>

<dependencies>
@@ -515,6 +516,17 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>${kafka.testcontainer.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<profiles>
@@ -615,6 +627,17 @@
</plugins>
</build>
</profile>
<profile>
<id>kafka</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<properties>
<source_type>kafka</source_type>
<store_path>/files</store_path>
<test-classes>**/KafkaLoadTest.java</test-classes>
</properties>
</profile>
</profiles>

<build>
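An aside for readers (not part of this commit): the new test-scoped org.testcontainers:kafka dependency and the kafka Maven profile suggest that KafkaLoadTest runs against a throwaway broker managed by Testcontainers. A minimal sketch of that pattern, assuming Docker is available locally; the image tag and class name here are illustrative, not taken from the diff:

import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

public class KafkaBrokerSketch {
    public static void main(String[] args) {
        // Image name/tag is an assumption for illustration; pin whatever the real test uses
        try (KafkaContainer kafka =
                     new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"))) {
            kafka.start();
            // This address is what a KafkaSource's bootstrap server would point at in a test
            System.out.println("bootstrap.servers = " + kafka.getBootstrapServers());
        }
    }
}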
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
@@ -239,8 +240,11 @@ private void loadStruct(InputStruct struct, InputReader reader) {
try {
// Read next line from data source
if (reader.hasNext()) {
lines.add(reader.next());
metrics.increaseReadSuccess();
Line next = reader.next();
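// A reader such as KafkaReader may return null when a poll yields no data; only count non-null lines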
if (Objects.nonNull(next)) {
lines.add(next);
metrics.increaseReadSuccess();
}
} else {
finished = true;
}
@@ -81,4 +81,13 @@ public final class Constants {
public static final String LOAD_DATA_PARSE_SUFFIX = "parse";
public static final String LOAD_DATA_SER_SUFFIX = "ser";
public static final String LOAD_DATA_INSERT_SUFFIX = "insert";

public static final long KAFKA_SESSION_TIMEOUT = 30000;
public static final long KAFKA_AUTO_COMMIT_INTERVAL = 1000;
public static final String KAFKA_AUTO_COMMIT = "true";
public static final String KAFKA_EARLIEST_OFFSET = "earliest";
public static final String KAFKA_LATEST_OFFSET = "latest";
public static final long KAFKA_POLL_DURATION = 1000;
public static final long KAFKA_POLL_GAP_INTERVAL = 1000;

}
@@ -17,18 +17,23 @@

package org.apache.hugegraph.loader.reader;

import java.util.List;

import org.apache.commons.lang.NotImplementedException;
import org.apache.hugegraph.loader.constant.AutoCloseableIterator;
import org.apache.hugegraph.loader.exception.InitException;
import org.apache.hugegraph.loader.executor.LoadContext;
import org.apache.hugegraph.loader.mapping.InputStruct;
import org.apache.hugegraph.loader.reader.file.LocalFileReader;
import org.apache.hugegraph.loader.reader.hdfs.HDFSFileReader;
import org.apache.hugegraph.loader.reader.jdbc.JDBCReader;
import org.apache.hugegraph.loader.reader.kafka.KafkaReader;
import org.apache.hugegraph.loader.reader.line.Line;
import org.apache.hugegraph.loader.source.InputSource;
import org.apache.hugegraph.loader.source.file.FileSource;
import org.apache.hugegraph.loader.source.hdfs.HDFSSource;
import org.apache.hugegraph.loader.source.jdbc.JDBCSource;
import org.apache.hugegraph.loader.source.kafka.KafkaSource;

/**
* Responsible for continuously reading the next batch of data lines
@@ -51,9 +56,15 @@ static InputReader create(InputSource source) {
return new HDFSFileReader((HDFSSource) source);
case JDBC:
return new JDBCReader((JDBCSource) source);
case KAFKA:
return new KafkaReader((KafkaSource) source);
default:
throw new AssertionError(String.format("Unsupported input source '%s'",
source.type()));
}
}

default List<InputReader> split() {
throw new NotImplementedException("Multiple readers are not supported");
}
}
@@ -0,0 +1,172 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hugegraph.loader.reader.kafka;

import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Properties;
import java.util.Queue;

import org.apache.hugegraph.loader.constant.Constants;
import org.apache.hugegraph.loader.exception.InitException;
import org.apache.hugegraph.loader.executor.LoadContext;
import org.apache.hugegraph.loader.mapping.InputStruct;
import org.apache.hugegraph.loader.parser.CsvLineParser;
import org.apache.hugegraph.loader.parser.JsonLineParser;
import org.apache.hugegraph.loader.parser.LineParser;
import org.apache.hugegraph.loader.parser.TextLineParser;
import org.apache.hugegraph.loader.reader.AbstractReader;
import org.apache.hugegraph.loader.reader.line.Line;
import org.apache.hugegraph.loader.source.file.FileFormat;
import org.apache.hugegraph.loader.source.kafka.KafkaSource;
import org.apache.hugegraph.util.Log;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;

import com.google.common.collect.ImmutableList;

import lombok.SneakyThrows;

public class KafkaReader extends AbstractReader {

private static final Logger LOG = Log.logger(KafkaReader.class);

private final KafkaSource source;

private final LineParser parser;
private Queue<String> batch;

private static final String BASE_CONSUMER_GROUP = "kafka-reader-base";
private final KafkaConsumer<String, String> dataConsumer;
private final boolean earlyStop;
private boolean emptyPoll;

public KafkaReader(KafkaSource source) {
this.source = source;

this.dataConsumer = createKafkaConsumer();
this.parser = createLineParser();
this.earlyStop = source.isEarlyStop();
}

@Override
public void init(LoadContext context,
InputStruct struct) throws InitException {
this.progress(context, struct);
}

@Override
public void confirmOffset() {
// Do Nothing
}

@Override
public void close() {
this.dataConsumer.close();
}

@Override
public boolean hasNext() {
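// With earlyStop enabled, reading ends after the first empty poll; otherwise the reader streams indefinitely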
return !this.earlyStop || !this.emptyPoll;
}

@Override
public Line next() {
if (batch == null || batch.size() == 0) {
batch = nextBatch();
}

String rawValue = batch.poll();
if (rawValue != null) {
return this.parser.parse(this.source.header(), rawValue);
} else {
this.emptyPoll = true;
}

return null;
}

private int getKafkaTopicPartitionCount() {
Properties props = new Properties();
props.put("bootstrap.servers", this.source.getBootstrapServer());
props.put("group.id", BASE_CONSUMER_GROUP);

KafkaConsumer<?, ?> consumer = new KafkaConsumer<>(props);
int count = consumer.partitionsFor(this.source.getTopic()).size();
consumer.close();

return count;
}

private KafkaConsumer<String, String> createKafkaConsumer() {
Properties props = new Properties();
props.put("bootstrap.servers", this.source.getBootstrapServer());
props.put("max.poll.records", this.source.getBatchSize());
props.put("group.id", this.source.getGroup());
props.put("enable.auto.commit", Constants.KAFKA_AUTO_COMMIT);
props.put("auto.commit.interval.ms", String.valueOf(Constants.KAFKA_AUTO_COMMIT_INTERVAL));
props.put("session.timeout.ms", String.valueOf(Constants.KAFKA_SESSION_TIMEOUT));
if (this.source.isFromBeginning()) {
props.put("auto.offset.reset", Constants.KAFKA_EARLIEST_OFFSET);
} else {
props.put("auto.offset.reset", Constants.KAFKA_LATEST_OFFSET);
}
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(ImmutableList.of(this.source.getTopic()));
return consumer;
}

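// Poll one batch of raw record values; back off briefly when the poll comes back empty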
@SneakyThrows
private Deque<String> nextBatch() {
ConsumerRecords<String, String> records =
dataConsumer.poll(Duration.ofMillis(Constants.KAFKA_POLL_DURATION));
Deque<String> queue = new ArrayDeque<>(records.count());
if (records.count() == 0) {
Thread.sleep(Constants.KAFKA_POLL_GAP_INTERVAL);
} else {
for (ConsumerRecord<String, String> record : records) {
queue.add(record.value());
}
}

return queue;
}

private LineParser createLineParser() {
FileFormat format = source.getFormat();
switch (format) {
case CSV:
return new CsvLineParser();
case TEXT:
return new TextLineParser(source.getDelimiter());
case JSON:
return new JsonLineParser();
default:
throw new AssertionError(String.format(
"Unsupported file format '%s' of source '%s'",
format, source));
}
}
}
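A usage aside (not part of this commit): records in the source topic must already be in the format the mapping declares, since KafkaReader hands each record value to a CSV, TEXT, or JSON LineParser (see createLineParser above). A minimal sketch of seeding a topic with JSON lines via the standard producer API; the broker address, topic name, and sample record are placeholders:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SeedTopicSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // One line of data per record; KafkaReader parses each record value with its LineParser
            producer.send(new ProducerRecord<>("loader-topic",
                                               "{\"name\": \"marko\", \"age\": 29, \"city\": \"Beijing\"}"));
            producer.flush();
        }
    }
}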
@@ -19,6 +19,7 @@

import java.io.IOException;

import org.apache.hugegraph.loader.source.kafka.KafkaSource;
import org.apache.hugegraph.loader.util.JsonUtil;
import org.apache.hugegraph.loader.source.InputSource;
import org.apache.hugegraph.loader.source.SourceType;
@@ -66,6 +67,8 @@ private static InputSource readInputSource(JsonNode node) {
.toUpperCase());
objectNode.replace(FIELD_VENDOR, vendorNode);
return JsonUtil.convert(node, JDBCSource.class);
case KAFKA:
return JsonUtil.convert(node, KafkaSource.class);
default:
throw new AssertionError(String.format("Unsupported input source '%s'", type));
}
@@ -23,5 +23,7 @@ public enum SourceType {

HDFS,

JDBC
JDBC,

KAFKA
}