From a1369cb5fcb2700a18743d6a7d7f82c1e044bf8e Mon Sep 17 00:00:00 2001 From: Kam Kasravi Date: Wed, 23 Sep 2015 05:53:00 -0700 Subject: [PATCH] Fixes #13 Add an ATK example --- .../examples/atk_pipeline/ATKTask.scala | 34 ++++++++++ .../examples/atk_pipeline/KMeans.scala | 13 ++++ .../examples/atk_pipeline/PipeLine.scala | 64 +++++++++++++++++ .../examples/atk_pipeline/Scoring.scala | 29 ++++++++ .../kafka_hbase_pipeline/PipeLine.scala | 6 +- .../kafka_hdfs_pipeline/PipeLine.scala | 6 +- project/Build.scala | 68 ++++++++++++++++++- .../examples/tap_pipeline/PipeLine.scala | 13 ++-- 8 files changed, 222 insertions(+), 11 deletions(-) create mode 100644 atk-pipeline/src/main/scala/io/gearpump/examples/atk_pipeline/ATKTask.scala create mode 100644 atk-pipeline/src/main/scala/io/gearpump/examples/atk_pipeline/KMeans.scala create mode 100644 atk-pipeline/src/main/scala/io/gearpump/examples/atk_pipeline/PipeLine.scala create mode 100644 atk-pipeline/src/main/scala/io/gearpump/examples/atk_pipeline/Scoring.scala diff --git a/atk-pipeline/src/main/scala/io/gearpump/examples/atk_pipeline/ATKTask.scala b/atk-pipeline/src/main/scala/io/gearpump/examples/atk_pipeline/ATKTask.scala new file mode 100644 index 0000000..3e7944b --- /dev/null +++ b/atk-pipeline/src/main/scala/io/gearpump/examples/atk_pipeline/ATKTask.scala @@ -0,0 +1,34 @@ +package io.gearpump.examples.atk_pipeline + +import io.gearpump.Message +import io.gearpump.cluster.UserConfig +import io.gearpump.streaming.dsl.plan.OpTranslator.HandlerTask +import io.gearpump.streaming.task.{StartTime, TaskContext} + +class ATKTask(taskContext: TaskContext, userConf: UserConfig) extends HandlerTask[Scoring](taskContext, userConf) { + val TAR = "trustedanalytics.scoring-engine.archive-tar" + + override def onStart(startTime : StartTime) : Unit = { + LOG.info("onStart") + val tar = userConf.getString(TAR).get + handler.load(tar) + } + + override def onNext(msg : Message) : Unit = { + LOG.info("onNext") + val seq = msg.msg.asInstanceOf[Seq[Array[String]]] + val score = handler.score(seq) + score.foreach(result => { + result.onComplete(seq => { + seq.map(seq => { + taskContext.output(new Message(seq)) + }) + }) + }) + } + +} + +object ATKTask { + implicit val atkTask = classOf[ATKTask] +} diff --git a/atk-pipeline/src/main/scala/io/gearpump/examples/atk_pipeline/KMeans.scala b/atk-pipeline/src/main/scala/io/gearpump/examples/atk_pipeline/KMeans.scala new file mode 100644 index 0000000..716fdc7 --- /dev/null +++ b/atk-pipeline/src/main/scala/io/gearpump/examples/atk_pipeline/KMeans.scala @@ -0,0 +1,13 @@ +package io.gearpump.examples.atk_pipeline + +import scala.concurrent.Future + +class KMeans extends Scoring { + + override def score(vector: Seq[Array[String]]): Option[Future[Seq[Any]]] = { + model.map(model => { + model.score(vector) + }) + } + +} diff --git a/atk-pipeline/src/main/scala/io/gearpump/examples/atk_pipeline/PipeLine.scala b/atk-pipeline/src/main/scala/io/gearpump/examples/atk_pipeline/PipeLine.scala new file mode 100644 index 0000000..46e4478 --- /dev/null +++ b/atk-pipeline/src/main/scala/io/gearpump/examples/atk_pipeline/PipeLine.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.gearpump.examples.atk_pipeline + +import akka.actor.ActorSystem +import io.gearpump.cluster.UserConfig +import io.gearpump.cluster.client.ClientContext +import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} +import io.gearpump.streaming.dsl.CollectionDataSource +import io.gearpump.streaming.dsl.plan.OpTranslator.{HandlerTask, SourceTask} +import io.gearpump.streaming.source.DataSource +import io.gearpump.streaming.{Processor, StreamApplication} +import io.gearpump.util.Graph._ +import io.gearpump.util.{AkkaApp, Graph, LogUtil} +import org.slf4j.Logger + + +object PipeLine extends AkkaApp with ArgumentsParser { + private val LOG: Logger = LogUtil.getLogger(getClass) + + override val options: Array[(String, CLIOption[Any])] = Array( + "models"-> CLIOption[String]("", required = false, defaultValue = Some("/user/gearpump/atk/kmeans.tar")), + "randomforest"-> CLIOption[String]("", required = false, defaultValue = Some("/user/gearpump/atk/randomforest.tar")) + ) + + def application(config: ParseResult, system: ActorSystem): StreamApplication = { + import ATKTask._ + import SourceTask._ + implicit val actorSystem = system + val TAR = "trustedanalytics.scoring-engine.archive-tar" + val appConfig = UserConfig.empty.withString(TAR, config.getString("tar")) + val sourceProcessor = Processor[HandlerTask,DataSource](new CollectionDataSource[String](Seq("one","two","three")), 1, "Source", UserConfig.empty) + val kmeansProcessor = Processor[HandlerTask,Scoring](new KMeans, 1, "ATK", appConfig) + val app = StreamApplication("ATKPipeline", Graph( + sourceProcessor ~> kmeansProcessor + ), UserConfig.empty) + app + } + + override def main(akkaConf: Config, args: Array[String]): Unit = { + val config = parse(args) + val context = ClientContext(akkaConf) + val appId = context.submit(application(config, context.system)) + context.close() + } + +} + diff --git a/atk-pipeline/src/main/scala/io/gearpump/examples/atk_pipeline/Scoring.scala b/atk-pipeline/src/main/scala/io/gearpump/examples/atk_pipeline/Scoring.scala new file mode 100644 index 0000000..2f24956 --- /dev/null +++ b/atk-pipeline/src/main/scala/io/gearpump/examples/atk_pipeline/Scoring.scala @@ -0,0 +1,29 @@ +package io.gearpump.examples.atk_pipeline + +import java.io.File +import java.net.URI + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.trustedanalytics.atk.model.publish.format.ModelPublishFormat +import org.trustedanalytics.atk.scoring.interfaces.Model + +import scala.concurrent.Future + +trait Scoring extends java.io.Serializable { + + var model: Option[Model] = None + + def load(tar: String): Unit = { + val pt = new Path(tar) + val uri = new URI(tar) + val hdfsFileSystem: org.apache.hadoop.fs.FileSystem = org.apache.hadoop.fs.FileSystem.get(uri, new Configuration()) + val tempFilePath = "/tmp/kmeans.tar" + val local = new Path(tempFilePath) + hdfsFileSystem.copyToLocalFile(false, pt, local) + model = Option(ModelPublishFormat.read(new File(tempFilePath), Thread.currentThread().getContextClassLoader)) + } + + def score(vector: Seq[Array[String]]): Option[Future[Seq[Any]]] + +} diff --git a/kafka-hbase-pipeline/src/main/scala/io/gearpump/examples/kafka_hbase_pipeline/PipeLine.scala b/kafka-hbase-pipeline/src/main/scala/io/gearpump/examples/kafka_hbase_pipeline/PipeLine.scala index daef567..e07526a 100644 --- a/kafka-hbase-pipeline/src/main/scala/io/gearpump/examples/kafka_hbase_pipeline/PipeLine.scala +++ b/kafka-hbase-pipeline/src/main/scala/io/gearpump/examples/kafka_hbase_pipeline/PipeLine.scala @@ -23,8 +23,9 @@ import com.typesafe.config.ConfigFactory import io.gearpump.cluster.UserConfig import io.gearpump.cluster.client.ClientContext import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} +import io.gearpump.streaming.dsl.plan.OpTranslator.{HandlerTask, SourceTask} import io.gearpump.streaming.kafka.{KafkaSource, KafkaStorageFactory} -import io.gearpump.streaming.source.DataSourceProcessor +import io.gearpump.streaming.source.DataSource import io.gearpump.streaming.{Processor, StreamApplication} import io.gearpump.util.Graph._ import io.gearpump.util.{AkkaApp, Graph, LogUtil} @@ -44,6 +45,7 @@ object PipeLine extends AkkaApp with ArgumentsParser { ) def application(config: ParseResult, system: ActorSystem): StreamApplication = { + import SourceTask._ implicit val actorSystem = system import Messages._ val pipelineString = @@ -75,7 +77,7 @@ object PipeLine extends AkkaApp with ArgumentsParser { val offsetStorageFactory = new KafkaStorageFactory(zookeepers, brokers) val source = new KafkaSource(topic, zookeepers, offsetStorageFactory) - val kafka = DataSourceProcessor(source, 1) + val kafka = Processor[HandlerTask,DataSource](source, 1, "KafkaSource", UserConfig.empty) val cpuProcessor = Processor[CpuProcessor](processors, "CpuProcessor") val memoryProcessor = Processor[MemoryProcessor](processors, "MemoryProcessor") val cpuPersistor = Processor[CpuPersistor](persistors, "CpuPersistor") diff --git a/kafka-hdfs-pipeline/src/main/scala/io/gearpump/examples/kafka_hdfs_pipeline/PipeLine.scala b/kafka-hdfs-pipeline/src/main/scala/io/gearpump/examples/kafka_hdfs_pipeline/PipeLine.scala index 7de9a88..49ed013 100644 --- a/kafka-hdfs-pipeline/src/main/scala/io/gearpump/examples/kafka_hdfs_pipeline/PipeLine.scala +++ b/kafka-hdfs-pipeline/src/main/scala/io/gearpump/examples/kafka_hdfs_pipeline/PipeLine.scala @@ -23,8 +23,9 @@ import io.gearpump.cluster.UserConfig import io.gearpump.cluster.client.ClientContext import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} import io.gearpump.partitioner.ShufflePartitioner +import io.gearpump.streaming.dsl.plan.OpTranslator.{HandlerTask, SourceTask} import io.gearpump.streaming.kafka.{KafkaSource, KafkaStorageFactory} -import io.gearpump.streaming.source.DataSourceProcessor +import io.gearpump.streaming.source.DataSource import io.gearpump.streaming.{Processor, StreamApplication} import io.gearpump.util.Graph._ import io.gearpump.util.{AkkaApp, Graph, LogUtil} @@ -54,6 +55,7 @@ object PipeLine extends AkkaApp with ArgumentsParser { ) def application(config: ParseResult, system: ActorSystem): StreamApplication = { + import SourceTask._ implicit val actorSystem = system val readerNum = config.getInt("reader") val scorerNum = config.getInt("scorer") @@ -67,7 +69,7 @@ object PipeLine extends AkkaApp with ArgumentsParser { val partitioner = new ShufflePartitioner() val source = new KafkaSource(topic, zookeepers, offsetStorageFactory) - val reader = DataSourceProcessor(source, readerNum) + val reader = Processor[HandlerTask,DataSource](source, readerNum, "KafkaSource", UserConfig.empty) val scorer = Processor[ScoringTask](scorerNum) val writer = Processor[ParquetWriterTask](writerNum) diff --git a/project/Build.scala b/project/Build.scala index 8a4d6f8..17fb76e 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -21,6 +21,7 @@ object Build extends sbt.Build { val travis_deploy = taskKey[Unit]("use this after sbt assembly packArchive, it will rename the package so that travis deploy can find the package.") val akkaVersion = "2.3.12" + val atkVersion = "0.4.3-master-SNAPSHOT" val clouderaVersion = "2.6.0-cdh5.4.2" val clouderaHBaseVersion = "1.0.0-cdh5.4.2" val gearpumpVersion = "0.6.2-SNAPSHOT" @@ -48,7 +49,8 @@ object Build extends sbt.Build { "bintray/non" at "http://dl.bintray.com/non/maven", "cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos", "clockfly" at "http://dl.bintray.com/clockfly/maven", - "local maven" at "file://"+Path.userHome.absolutePath+"/.m2/repository" + "tap" at "https://maven.trustedanalytics.org/content/repositories/snapshots", + "localmaven" at "file://"+Path.userHome.absolutePath+"/.m2/repository" ), addCompilerPlugin("org.scalamacros" % "paradise" % "2.1.0-M5" cross CrossVersion.full) ) ++ @@ -133,7 +135,7 @@ object Build extends sbt.Build { new File(packagePath).renameTo(new File(target)) } ) - ).aggregate(kafka_hdfs_pipeline, kafka_hbase_pipeline, tap_pipeline) + ).aggregate(kafka_hdfs_pipeline, kafka_hbase_pipeline, atk_pipeline, tap_pipeline) lazy val kafka_hdfs_pipeline = Project( id = "gearpump-kafka-hdfs-pipeline", @@ -268,6 +270,68 @@ object Build extends sbt.Build { ) ) + lazy val atk_pipeline = Project( + id = "gearpump-atk-pipeline", + base = file("atk-pipeline"), + settings = commonSettings ++ myAssemblySettings ++ + Seq( + mergeStrategy in assembly := { + case PathList("META-INF", "maven","org.slf4j","slf4j-api", ps) if ps.startsWith("pom") => MergeStrategy.discard + case x => + val oldStrategy = (mergeStrategy in assembly).value + oldStrategy(x) + }, + libraryDependencies ++= Seq( + "com.lihaoyi" %% "upickle" % upickleVersion, + "com.github.intel-hadoop" %% "gearpump-core" % gearpumpVersion % "provided" + exclude("org.fusesource.leveldbjni", "leveldbjni-all"), + "com.github.intel-hadoop" %% "gearpump-core" % gearpumpVersion % "test" classifier "tests", + "com.github.intel-hadoop" %% "gearpump-streaming" % gearpumpVersion % "provided" + exclude("org.fusesource.leveldbjni", "leveldbjni-all"), + "com.github.intel-hadoop" %% "gearpump-streaming" % gearpumpVersion % "test" classifier "tests", + "com.github.intel-hadoop" %% "gearpump-external-kafka" % gearpumpVersion + exclude("org.fusesource.leveldbjni", "leveldbjni-all"), + "org.trustedanalytics.atk" % "model-publish-format" % atkVersion, + "org.scala-lang.modules" %% "scala-parser-combinators" % "1.0.2", + "com.julianpeeters" % "avro-scala-macro-annotations_2.11" % "0.9.0", + "org.apache.hadoop" % "hadoop-hdfs" % clouderaVersion + exclude("org.fusesource.leveldbjni", "leveldbjni-all") + exclude("org.mortbay.jetty", "jetty-util") + exclude("org.mortbay.jetty", "jetty") + exclude("org.apache.htrace", "htrace-core") + exclude("tomcat", "jasper-runtime"), + "org.apache.hadoop" % "hadoop-yarn-api" % clouderaVersion + exclude("org.fusesource.leveldbjni", "leveldbjni-all") + exclude("com.google.guava", "guava") + exclude("com.google.protobuf", "protobuf-java") + exclude("commons-lang", "commons-lang") + exclude("org.apache.htrace", "htrace-core") + exclude("commons-logging", "commons-logging") + exclude("org.apache.hadoop", "hadoop-annotations"), + "org.apache.hadoop" % "hadoop-yarn-client" % clouderaVersion + exclude("org.fusesource.leveldbjni", "leveldbjni-all") + exclude("com.google.guava", "guava") + exclude("com.sun.jersey", "jersey-client") + exclude("commons-cli", "commons-cli") + exclude("commons-lang", "commons-lang") + exclude("commons-logging", "commons-logging") + exclude("org.apache.htrace", "htrace-core") + exclude("log4j", "log4j") + exclude("org.apache.hadoop", "hadoop-annotations") + exclude("org.mortbay.jetty", "jetty-util") + exclude("org.apache.hadoop", "hadoop-yarn-api") + exclude("org.apache.hadoop", "hadoop-yarn-common"), + "com.typesafe.akka" %% "akka-testkit" % akkaVersion % "test", + "org.scalatest" %% "scalatest" % scalaTestVersion % "test", + "org.scalacheck" %% "scalacheck" % scalaCheckVersion % "test", + "org.mockito" % "mockito-core" % mockitoVersion % "test", + "junit" % "junit" % junitVersion % "test" + ) ++ hadoopDependency, + mainClass in (Compile, packageBin) := Some("io.gearpump.examples.atk_pipeline.PipeLine"), + target in assembly := baseDirectory.value.getParentFile / "target" / scalaVersionMajor + ) + ) + lazy val tap_pipeline = Project( id = "gearpump-tap-pipeline", base = file("tap-pipeline"), diff --git a/tap-pipeline/src/main/scala/io/gearpump/examples/tap_pipeline/PipeLine.scala b/tap-pipeline/src/main/scala/io/gearpump/examples/tap_pipeline/PipeLine.scala index 74610db..5b7d93f 100644 --- a/tap-pipeline/src/main/scala/io/gearpump/examples/tap_pipeline/PipeLine.scala +++ b/tap-pipeline/src/main/scala/io/gearpump/examples/tap_pipeline/PipeLine.scala @@ -24,10 +24,11 @@ import io.gearpump.cluster.UserConfig import io.gearpump.cluster.client.ClientContext import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} import io.gearpump.external.hbase.HBaseSink -import io.gearpump.streaming.StreamApplication +import io.gearpump.streaming.dsl.plan.OpTranslator.HandlerTask import io.gearpump.streaming.kafka.{KafkaSource, KafkaStorageFactory} -import io.gearpump.streaming.sink.DataSinkProcessor -import io.gearpump.streaming.source.DataSourceProcessor +import io.gearpump.streaming.sink.DataSink +import io.gearpump.streaming.source.DataSource +import io.gearpump.streaming.{Processor, StreamApplication} import io.gearpump.tap.TapJsonConfig import io.gearpump.util.Graph._ import io.gearpump.util.{AkkaApp, Graph, LogUtil} @@ -59,8 +60,10 @@ object PipeLine extends AkkaApp with ArgumentsParser { val table = config.getString("table") val zookeepers = kafkaconfig.get("zookeepers").get val brokers = kafkaconfig.get("brokers").get - val source = DataSourceProcessor(new KafkaSource(topic, zookeepers,new KafkaStorageFactory(zookeepers, brokers)), 1) - val sink = DataSinkProcessor(new HBaseSink(table, hbaseconfig), 1) + val offsetStorageFactory = new KafkaStorageFactory(zookeepers, brokers) + val source = new KafkaSource(topic, zookeepers, offsetStorageFactory) + val kafka = Processor[HandlerTask,DataSource](source, 1, "KafkaSource", UserConfig.empty) + val sink = Processor[HandlerTask,DataSink](new HBaseSink(table, hbaseconfig), 1, "HBaseSink", UserConfig.empty) val app = StreamApplication("TAPPipeline", Graph( source ~> sink ), UserConfig.empty)