forked from ververica/lab-flink-repository-analytics
-
Notifications
You must be signed in to change notification settings - Fork 1
/
FlinkPullRequestsToKafka.java
114 lines (100 loc) · 4.51 KB
/
FlinkPullRequestsToKafka.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package com.ververica.platform;
import static com.ververica.platform.Utils.localDateTimeToInstant;
import static org.apache.flink.table.api.Expressions.$;
import com.ververica.platform.entities.PullRequest;
import com.ververica.platform.io.source.GithubPullRequestSource;
import java.time.Instant;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
* Flink job that reads pull requests issued to the apache/flink Github repository (using the Github
* API) and writes their metadata to Kafka.
*/
public class FlinkPullRequestsToKafka {

  /** GitHub repository (owner/name) whose pull requests are ingested. */
  public static final String APACHE_FLINK_REPOSITORY = "apache/flink";

  /**
   * Entry point: reads pull-request metadata from the GitHub API and writes it to a Kafka topic in
   * avro-confluent format via the Table API.
   *
   * <p>Recognized arguments (see code for defaults): {@code --kafka-server}, {@code --kafka-topic},
   * {@code --kafka-security-protocol}, {@code --kafka-sasl-mechanism},
   * {@code --kafka-sasl-jaas-config}, {@code --sr-url} (required), {@code --sr-cred-source},
   * {@code --sr-user-info} (required), {@code --poll-interval-ms}, {@code --start-date}.
   */
  public static void main(String[] args) {
    ParameterTool params = ParameterTool.fromArgs(args);

    // Sink
    String kafkaServer = params.get("kafka-server", "kafka.vvp.svc");
    String kafkaTopic = params.get("kafka-topic", "flink_pulls");
    // SASL/security options are optional; when null the corresponding DDL line is omitted.
    String kafkaSecurityProtocol = params.get("kafka-security-protocol", null);
    String kafkaSaslMechanism = params.get("kafka-sasl-mechanism", null);
    String kafkaSaslJaasConfig = params.get("kafka-sasl-jaas-config", null);
    String cfltRegistryURL = params.getRequired("sr-url");
    String cfltRegistryCredSource = params.get("sr-cred-source", "USER_INFO");
    String cfltRegistryUserInfo = params.getRequired("sr-user-info");

    // Source
    long delayBetweenQueries = params.getLong("poll-interval-ms", 10_000L);
    String startDateString = params.get("start-date", "");

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
    env.getConfig().enableObjectReuse();

    // Renamed from `commits`: this stream carries pull requests, not commits.
    DataStream<PullRequest> pullRequests =
        env.addSource(getGithubPullRequestSource(delayBetweenQueries, startDateString))
            .name("flink-pulls-source")
            .uid("flink-pulls-source");

    tableEnv.executeSql(
        buildPullsTableDdl(
            kafkaTopic,
            kafkaServer,
            kafkaSecurityProtocol,
            kafkaSaslMechanism,
            kafkaSaslJaasConfig,
            cfltRegistryURL,
            cfltRegistryCredSource,
            cfltRegistryUserInfo));

    tableEnv
        .fromDataStream(
            pullRequests,
            $("closedAt"),
            $("commentsCount"),
            $("createdAt"),
            $("creator"),
            $("creatorEmail"),
            $("description"),
            $("labels"),
            $("mergeCommit"),
            $("mergedAt"),
            $("number"),
            $("state"),
            $("title"),
            $("updatedAt"))
        .executeInsert("pulls");
  }

  /**
   * Builds the {@code CREATE TABLE} DDL for the Kafka-backed {@code pulls} sink table.
   *
   * <p>Optional security-related properties are only emitted when the corresponding argument is
   * non-null. NOTE(review): values are interpolated into the DDL verbatim; they come from
   * operator-supplied CLI arguments, not untrusted input — keep it that way.
   *
   * @param kafkaTopic destination Kafka topic
   * @param kafkaServer Kafka bootstrap servers
   * @param kafkaSecurityProtocol optional {@code properties.security.protocol}, may be null
   * @param kafkaSaslMechanism optional {@code properties.sasl.mechanism}, may be null
   * @param kafkaSaslJaasConfig optional {@code properties.sasl.jaas.config}, may be null
   * @param cfltRegistryURL Confluent schema-registry URL
   * @param cfltRegistryCredSource schema-registry credentials source (e.g. {@code USER_INFO})
   * @param cfltRegistryUserInfo schema-registry basic-auth user info
   * @return the complete DDL statement
   */
  private static String buildPullsTableDdl(
      String kafkaTopic,
      String kafkaServer,
      String kafkaSecurityProtocol,
      String kafkaSaslMechanism,
      String kafkaSaslJaasConfig,
      String cfltRegistryURL,
      String cfltRegistryCredSource,
      String cfltRegistryUserInfo) {
    StringBuilder ddl =
        new StringBuilder()
            .append("CREATE TABLE `pulls` (\n")
            .append("`closedAt` TIMESTAMP(3),\n")
            .append("`commentsCount` INT,\n")
            .append("`createdAt` TIMESTAMP(3),\n")
            .append("`creator` STRING,\n")
            .append("`creatorEmail` STRING,\n")
            .append("`description` STRING,\n")
            .append("`labels` ARRAY<STRING>,\n")
            .append("`mergeCommit` STRING,\n")
            .append("`mergedAt` TIMESTAMP(3),\n")
            .append("`number` INT,\n")
            .append("`state` STRING,\n")
            .append("`title` STRING,\n")
            .append("`updatedAt` TIMESTAMP(3)\n")
            .append(") WITH (\n")
            .append("'connector' = 'kafka',\n")
            .append("'topic' = '")
            .append(kafkaTopic)
            .append("',\n")
            .append("'properties.bootstrap.servers' = '")
            .append(kafkaServer)
            .append("',\n");
    if (kafkaSecurityProtocol != null) {
      ddl.append("'properties.security.protocol' = '").append(kafkaSecurityProtocol).append("',\n");
    }
    if (kafkaSaslMechanism != null) {
      ddl.append("'properties.sasl.mechanism' = '").append(kafkaSaslMechanism).append("',\n");
    }
    if (kafkaSaslJaasConfig != null) {
      ddl.append("'properties.sasl.jaas.config' = '").append(kafkaSaslJaasConfig).append("',\n");
    }
    return ddl.append("'format' = 'avro-confluent',\n")
        .append("'avro-confluent.url' = '")
        .append(cfltRegistryURL)
        .append("',\n")
        .append("'avro-confluent.basic-auth.credentials-source' = '")
        .append(cfltRegistryCredSource)
        .append("',\n")
        .append("'avro-confluent.basic-auth.user-info' = '")
        .append(cfltRegistryUserInfo)
        .append("'\n")
        .append(")")
        .toString();
  }

  /**
   * Creates the GitHub pull-request source for {@link #APACHE_FLINK_REPOSITORY}.
   *
   * @param delayBetweenQueries pause between successive GitHub API polls, in milliseconds
   * @param startDateString flexible date string parsed by {@code Utils.parseFlexibleDate}; marks
   *     the earliest pull request to fetch
   * @return configured {@link GithubPullRequestSource}
   */
  private static GithubPullRequestSource getGithubPullRequestSource(
      final long delayBetweenQueries, final String startDateString) {
    Instant startDate = localDateTimeToInstant(Utils.parseFlexibleDate(startDateString));
    return new GithubPullRequestSource(APACHE_FLINK_REPOSITORY, startDate, delayBetweenQueries);
  }
}