Fixes #13 Add an ATK example
kkasravi committed Nov 6, 2015
1 parent 3abca27 commit 482b4b6
Showing 8 changed files with 252 additions and 18 deletions.
@@ -0,0 +1,34 @@
package io.gearpump.examples.atk_pipeline

import io.gearpump.Message
import io.gearpump.cluster.UserConfig
import io.gearpump.streaming.dsl.plan.OpTranslator.HandlerTask
import io.gearpump.streaming.task.{StartTime, TaskContext}

class ATKTask(taskContext: TaskContext, userConf: UserConfig) extends HandlerTask[Scoring](taskContext, userConf) {
  val TAR = "trustedanalytics.scoring-engine.archive-tar"

  override def onStart(startTime: StartTime): Unit = {
    LOG.info("onStart")
    // Fetch the scoring-engine archive location from the user config and
    // load the model before any messages arrive.
    val tar = userConf.getString(TAR).get
    handler.load(tar)
  }

  override def onNext(msg: Message): Unit = {
    LOG.info("onNext")
    // Each message carries a batch of records to score.
    val vectors = msg.msg.asInstanceOf[Seq[Array[String]]]
    // score returns Option[Future[Seq[Any]]]: when a model is loaded and the
    // scoring future completes successfully, emit the scores downstream.
    handler.score(vectors).foreach(future => {
      future.onComplete(result => {
        result.map(scores => {
          taskContext.output(new Message(scores))
        })
      })
    })
  }

}

object ATKTask {
  // Implicit task class picked up by Processor[HandlerTask, Scoring] in PipeLine.
  implicit val atkTask = classOf[ATKTask]
}
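For a sense of how `HandlerTask` drives this: `handler` is the `Scoring` instance the task wraps, so the future-completion path in `onNext` can be exercised against a stub. A minimal sketch, assuming only the `Scoring` trait added in this commit (the stub class is illustrative, not part of the commit):

package io.gearpump.examples.atk_pipeline

import scala.concurrent.Future

// Hypothetical stub handler: echoes its input back as the "scores", so
// ATKTask.onNext can be exercised without a real ATK model archive.
class EchoScoring extends Scoring {
  override def load(tar: String): Unit = ()  // skip the HDFS model download
  override def score(vector: Seq[Array[String]]): Option[Future[Seq[Any]]] =
    Some(Future.successful(vector))
}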
@@ -0,0 +1,13 @@
package io.gearpump.examples.atk_pipeline

import scala.concurrent.Future

class KMeans extends Scoring {

  // Delegate to the ATK model loaded by Scoring.load; returns None until
  // a model has been loaded.
  override def score(vector: Seq[Array[String]]): Option[Future[Seq[Any]]] = {
    model.map(m => m.score(vector))
  }

}
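The `Option[Future[...]]` contract reads as: `None` until a model is loaded, otherwise a future of the scored records. A short usage sketch under that assumption (the archive path is the default from the pipeline options below; a reachable HDFS is assumed):

import scala.concurrent.ExecutionContext.Implicits.global

val kmeans = new KMeans
kmeans.load("/user/gearpump/atk/kmeans.tar")  // default model path from PipeLine
kmeans.score(Seq(Array("1.0", "2.0"))) match {
  case Some(future) => future.foreach(scores => println(s"scores: $scores"))
  case None         => println("no model loaded")
}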
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.gearpump.examples.atk_pipeline

import akka.actor.ActorSystem
import com.typesafe.config.{ConfigFactory, ConfigRenderOptions}
import io.gearpump.cluster.UserConfig
import io.gearpump.cluster.client.ClientContext
import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
import io.gearpump.streaming.dsl.plan.OpTranslator.{HandlerTask, SourceTask}
import io.gearpump.streaming.kafka.{KafkaSource, KafkaStorageFactory}
import io.gearpump.streaming.source.DataSource
import io.gearpump.streaming.{Processor, StreamApplication}
import io.gearpump.tap.TapJsonConfig
import io.gearpump.util.Graph._
import io.gearpump.util.{AkkaApp, Graph, LogUtil}
import org.slf4j.Logger

object PipeLine extends AkkaApp with ArgumentsParser {
  private val LOG: Logger = LogUtil.getLogger(getClass)

  override val options: Array[(String, CLIOption[Any])] = Array(
    "models" -> CLIOption[String]("<models found in hdfs>", required = false, defaultValue = Some("/user/gearpump/atk/kmeans.tar")),
    "randomforest" -> CLIOption[String]("<tar file location in hdfs>", required = false, defaultValue = Some("/user/gearpump/atk/randomforest.tar")),
    "hbase" -> CLIOption[String]("<hbase instance>", required = false, defaultValue = Some("hbase")),
    "kafka" -> CLIOption[String]("<kafka instance>", required = false, defaultValue = Some("kafka")),
    "zookeeper" -> CLIOption[String]("<zookeeper instance>", required = false, defaultValue = Some("zookeeper")),
    "table" -> CLIOption[String]("<hbase table>", required = false, defaultValue = Some("gp_tap_table")),
    "topic" -> CLIOption[String]("<kafka topic>", required = false, defaultValue = Some("gp_tap_topic"))
  )

  def application(config: ParseResult, system: ActorSystem): StreamApplication = {
    import ATKTask._
    import SourceTask._
    implicit val actorSystem = system
    val TAR = "trustedanalytics.scoring-engine.archive-tar"
    // Pass the model archive location (the "models" option) through to ATKTask.
    val appConfig = UserConfig.empty.withString(TAR, config.getString("models"))
    val conf = ConfigFactory.load
    val services = conf.root.withOnlyKey("VCAP_SERVICES").render(ConfigRenderOptions.defaults().setJson(true))
    val tjc = new TapJsonConfig(services)
    val hbaseconfig = tjc.getHBase(config.getString("hbase"))
    val kafkaconfig = tjc.getKafkaConfig(config.getString("kafka"))
    val zookeeperconfig = tjc.getZookeeperConfig(config.getString("zookeeper"))
    val topic = config.getString("topic")
    val table = config.getString("table")
    val zookeepers = zookeeperconfig.get("zookeepers")
    val brokers = kafkaconfig.get("brokers")
    val offsetStorageFactory = new KafkaStorageFactory(zookeepers, brokers)
    val source = new KafkaSource(topic, zookeepers, offsetStorageFactory)
    val kafka = Processor[HandlerTask, DataSource](source, 1, "KafkaSource", UserConfig.empty)
    val kmeans = Processor[HandlerTask, Scoring](new KMeans, 1, "ATK", appConfig)
    val app = StreamApplication("ATKPipeline", Graph(
      kafka ~> kmeans
    ), UserConfig.empty)
    app
  }

  override def main(akkaConf: Config, args: Array[String]): Unit = {
    val config = parse(args)
    val context = ClientContext(akkaConf)
    context.submit(application(config, context.system))
    context.close()
  }

}
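Two details worth calling out in `application`: `Graph._` brings the `~>` edge builder into scope so the DAG is declared inline, and `appConfig` is what carries the model-archive path from the `models` option down to `ATKTask.onStart`. A toy sketch of the edge DSL in isolation (string nodes as stand-ins for the processors; assumes `~>` accepts arbitrary node values):

import io.gearpump.util.Graph
import io.gearpump.util.Graph._

// Two nodes, one directed edge: the same shape as kafka ~> kmeans above.
val dag = Graph("kafka" ~> "kmeans")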

@@ -0,0 +1,29 @@
package io.gearpump.examples.atk_pipeline

import java.io.File
import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.trustedanalytics.atk.model.publish.format.ModelPublishFormat
import org.trustedanalytics.atk.scoring.interfaces.Model

import scala.concurrent.Future

trait Scoring extends java.io.Serializable {

  var model: Option[Model] = None

  // Copy the published model archive from HDFS to the local filesystem and
  // deserialize it with ATK's ModelPublishFormat.
  def load(tar: String): Unit = {
    val pt = new Path(tar)
    val uri = new URI(tar)
    val hdfsFileSystem: org.apache.hadoop.fs.FileSystem = org.apache.hadoop.fs.FileSystem.get(uri, new Configuration())
    val tempFilePath = "/tmp/kmeans.tar"
    val local = new Path(tempFilePath)
    hdfsFileSystem.copyToLocalFile(false, pt, local)
    model = Option(ModelPublishFormat.read(new File(tempFilePath), Thread.currentThread().getContextClassLoader))
  }

  def score(vector: Seq[Array[String]]): Option[Future[Seq[Any]]]

}
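One caveat in `load`: the local copy always lands at `/tmp/kmeans.tar`, so two models loading on the same host would clobber each other. A hedged alternative sketch using a unique temp file per call (not what this commit does):

import java.io.File
import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch: copy the archive to a per-call temp file instead of a fixed name.
def fetchToLocal(tar: String): File = {
  val local = File.createTempFile("scoring-model-", ".tar")
  val fs = FileSystem.get(new URI(tar), new Configuration())
  fs.copyToLocalFile(false, new Path(tar), new Path(local.getAbsolutePath))
  local
}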
@@ -23,8 +23,9 @@ import com.typesafe.config.ConfigFactory
 import io.gearpump.cluster.UserConfig
 import io.gearpump.cluster.client.ClientContext
 import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
+import io.gearpump.streaming.dsl.plan.OpTranslator.{HandlerTask, SourceTask}
 import io.gearpump.streaming.kafka.{KafkaSource, KafkaStorageFactory}
-import io.gearpump.streaming.source.DataSourceProcessor
+import io.gearpump.streaming.source.DataSource
 import io.gearpump.streaming.{Processor, StreamApplication}
 import io.gearpump.util.Graph._
 import io.gearpump.util.{AkkaApp, Graph, LogUtil}
@@ -44,6 +45,7 @@ object PipeLine extends AkkaApp with ArgumentsParser {
   )

   def application(config: ParseResult, system: ActorSystem): StreamApplication = {
+    import SourceTask._
     implicit val actorSystem = system
     import Messages._
     val pipelineString =
@@ -75,7 +77,7 @@ object PipeLine extends AkkaApp with ArgumentsParser {

     val offsetStorageFactory = new KafkaStorageFactory(zookeepers, brokers)
     val source = new KafkaSource(topic, zookeepers, offsetStorageFactory)
-    val kafka = DataSourceProcessor(source, 1)
+    val kafka = Processor[HandlerTask, DataSource](source, 1, "KafkaSource", UserConfig.empty)
     val cpuProcessor = Processor[CpuProcessor](processors, "CpuProcessor")
     val memoryProcessor = Processor[MemoryProcessor](processors, "MemoryProcessor")
     val cpuPersistor = Processor[CpuPersistor](persistors, "CpuPersistor")
@@ -23,8 +23,9 @@ import io.gearpump.cluster.UserConfig
 import io.gearpump.cluster.client.ClientContext
 import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
 import io.gearpump.partitioner.ShufflePartitioner
+import io.gearpump.streaming.dsl.plan.OpTranslator.{HandlerTask, SourceTask}
 import io.gearpump.streaming.kafka.{KafkaSource, KafkaStorageFactory}
-import io.gearpump.streaming.source.DataSourceProcessor
+import io.gearpump.streaming.source.DataSource
 import io.gearpump.streaming.{Processor, StreamApplication}
 import io.gearpump.util.Graph._
 import io.gearpump.util.{AkkaApp, Graph, LogUtil}
@@ -54,6 +55,7 @@ object PipeLine extends AkkaApp with ArgumentsParser {
   )

   def application(config: ParseResult, system: ActorSystem): StreamApplication = {
+    import SourceTask._
     implicit val actorSystem = system
     val readerNum = config.getInt("reader")
     val scorerNum = config.getInt("scorer")
@@ -67,7 +69,7 @@ object PipeLine extends AkkaApp with ArgumentsParser {

     val partitioner = new ShufflePartitioner()
     val source = new KafkaSource(topic, zookeepers, offsetStorageFactory)
-    val reader = DataSourceProcessor(source, readerNum)
+    val reader = Processor[HandlerTask, DataSource](source, readerNum, "KafkaSource", UserConfig.empty)
     val scorer = Processor[ScoringTask](scorerNum)
     val writer = Processor[ParquetWriterTask](writerNum)
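Both pipelines above make the same substitution: the `DataSourceProcessor(source, n)` helper gives way to the generic `Processor` factory, with the task type, handler, parallelism, description, and per-task config spelled out. A minimal sketch of that call shape in isolation (the `SourceTask._` import mirrors the diff and presumably supplies the implicit task binding, the way `object ATKTask` does for `ATKTask`):

import io.gearpump.cluster.UserConfig
import io.gearpump.streaming.Processor
import io.gearpump.streaming.dsl.plan.OpTranslator.{HandlerTask, SourceTask}
import io.gearpump.streaming.source.DataSource
import SourceTask._

// (source handler, parallelism, description, per-task config)
def kafkaProcessor(source: DataSource, parallelism: Int) =
  Processor[HandlerTask, DataSource](source, parallelism, "KafkaSource", UserConfig.empty)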
73 changes: 71 additions & 2 deletions project/Build.scala
@@ -21,6 +21,7 @@ object Build extends sbt.Build {
   val travis_deploy = taskKey[Unit]("use this after sbt assembly packArchive, it will rename the package so that travis deploy can find the package.")

   val akkaVersion = "2.3.12"
+  val atkVersion = "0.4.3-master-SNAPSHOT"
   val clouderaVersion = "2.6.0-cdh5.4.2"
   val clouderaHBaseVersion = "1.0.0-cdh5.4.2"
   val gearpumpVersion = "0.6.2-SNAPSHOT"
@@ -48,7 +49,8 @@
     "bintray/non" at "http://dl.bintray.com/non/maven",
     "cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos",
     "clockfly" at "http://dl.bintray.com/clockfly/maven",
-    "local maven" at "file://"+Path.userHome.absolutePath+"/.m2/repository"
+    "tap" at "https://maven.trustedanalytics.org/content/repositories/snapshots",
+    "localmaven" at "file://"+Path.userHome.absolutePath+"/.m2/repository"
   ),
   addCompilerPlugin("org.scalamacros" % "paradise" % "2.1.0-M5" cross CrossVersion.full)
 ) ++
@@ -133,7 +135,7 @@ object Build extends sbt.Build {
       new File(packagePath).renameTo(new File(target))
     }
   )
-  ).aggregate(kafka_hdfs_pipeline, kafka_hbase_pipeline, tap_pipeline)
+  ).aggregate(kafka_hdfs_pipeline, kafka_hbase_pipeline, atk_pipeline, tap_pipeline)

   lazy val kafka_hdfs_pipeline = Project(
     id = "gearpump-kafka-hdfs-pipeline",
@@ -268,6 +270,73 @@
     )
   )

+  lazy val atk_pipeline = Project(
+    id = "gearpump-atk-pipeline",
+    base = file("atk-pipeline"),
+    settings = commonSettings ++ myAssemblySettings ++
+      Seq(
+        mergeStrategy in assembly := {
+          case PathList("META-INF", "maven", "org.slf4j", "slf4j-api", ps) if ps.startsWith("pom") => MergeStrategy.discard
+          case x =>
+            val oldStrategy = (mergeStrategy in assembly).value
+            oldStrategy(x)
+        },
+        libraryDependencies ++= Seq(
+          "com.lihaoyi" %% "upickle" % upickleVersion,
+          "com.github.intel-hadoop" %% "gearpump-core" % gearpumpVersion % "provided"
+            exclude("org.fusesource.leveldbjni", "leveldbjni-all"),
+          "com.github.intel-hadoop" %% "gearpump-core" % gearpumpVersion % "test" classifier "tests",
+          "com.github.intel-hadoop" %% "gearpump-streaming" % gearpumpVersion % "provided"
+            exclude("org.fusesource.leveldbjni", "leveldbjni-all"),
+          "com.github.intel-hadoop" %% "gearpump-streaming" % gearpumpVersion % "test" classifier "tests",
+          "com.github.intel-hadoop" %% "gearpump-external-kafka" % gearpumpVersion
+            exclude("org.fusesource.leveldbjni", "leveldbjni-all"),
+          "org.trustedanalytics.gearpump" % "config-tools" % gearpumpTapVersion
+            exclude("org.fusesource.leveldbjni", "leveldbjni-all")
+            exclude("org.apache.htrace", "htrace-core")
+            exclude("commons-beanutils", "commons-beanutils-core")
+            exclude("commons-beanutils", "commons-beanutils"),
+          "org.trustedanalytics.atk" % "model-publish-format" % atkVersion,
+          "org.scala-lang.modules" %% "scala-parser-combinators" % "1.0.2",
+          "com.julianpeeters" % "avro-scala-macro-annotations_2.11" % "0.9.0",
+          "org.apache.hadoop" % "hadoop-hdfs" % clouderaVersion
+            exclude("org.fusesource.leveldbjni", "leveldbjni-all")
+            exclude("org.mortbay.jetty", "jetty-util")
+            exclude("org.mortbay.jetty", "jetty")
+            exclude("org.apache.htrace", "htrace-core")
+            exclude("tomcat", "jasper-runtime"),
+          "org.apache.hadoop" % "hadoop-yarn-api" % clouderaVersion
+            exclude("org.fusesource.leveldbjni", "leveldbjni-all")
+            exclude("com.google.guava", "guava")
+            exclude("com.google.protobuf", "protobuf-java")
+            exclude("commons-lang", "commons-lang")
+            exclude("org.apache.htrace", "htrace-core")
+            exclude("commons-logging", "commons-logging")
+            exclude("org.apache.hadoop", "hadoop-annotations"),
+          "org.apache.hadoop" % "hadoop-yarn-client" % clouderaVersion
+            exclude("org.fusesource.leveldbjni", "leveldbjni-all")
+            exclude("com.google.guava", "guava")
+            exclude("com.sun.jersey", "jersey-client")
+            exclude("commons-cli", "commons-cli")
+            exclude("commons-lang", "commons-lang")
+            exclude("commons-logging", "commons-logging")
+            exclude("org.apache.htrace", "htrace-core")
+            exclude("log4j", "log4j")
+            exclude("org.apache.hadoop", "hadoop-annotations")
+            exclude("org.mortbay.jetty", "jetty-util")
+            exclude("org.apache.hadoop", "hadoop-yarn-api")
+            exclude("org.apache.hadoop", "hadoop-yarn-common"),
+          "com.typesafe.akka" %% "akka-testkit" % akkaVersion % "test",
+          "org.scalatest" %% "scalatest" % scalaTestVersion % "test",
+          "org.scalacheck" %% "scalacheck" % scalaCheckVersion % "test",
+          "org.mockito" % "mockito-core" % mockitoVersion % "test",
+          "junit" % "junit" % junitVersion % "test"
+        ) ++ hadoopDependency,
+        mainClass in (Compile, packageBin) := Some("io.gearpump.examples.atk_pipeline.PipeLine"),
+        target in assembly := baseDirectory.value.getParentFile / "target" / scalaVersionMajor
+      )
+  )
+
   lazy val tap_pipeline = Project(
     id = "gearpump-tap-pipeline",
     base = file("tap-pipeline"),
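A note on scopes in the dependency list above: the gearpump artifacts are `provided` because the cluster supplies them on the runtime classpath, while the ATK scoring dependencies are compile-scope and therefore bundled by `assembly`. The pattern in isolation (version strings per the vals defined above):

// "provided": compiled against, but left out of the assembled fat jar.
libraryDependencies += "com.github.intel-hadoop" %% "gearpump-core" % "0.6.2-SNAPSHOT" % "provided"

// Compile scope: bundled into the jar the cluster receives.
libraryDependencies += "org.trustedanalytics.atk" % "model-publish-format" % "0.4.3-master-SNAPSHOT"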
@@ -24,10 +24,11 @@ import io.gearpump.cluster.UserConfig
 import io.gearpump.cluster.client.ClientContext
 import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
 import io.gearpump.external.hbase.HBaseSink
-import io.gearpump.streaming.StreamApplication
+import io.gearpump.streaming.dsl.plan.OpTranslator.HandlerTask
 import io.gearpump.streaming.kafka.{KafkaSource, KafkaStorageFactory}
-import io.gearpump.streaming.sink.DataSinkProcessor
-import io.gearpump.streaming.source.DataSourceProcessor
+import io.gearpump.streaming.sink.DataSink
+import io.gearpump.streaming.source.DataSource
+import io.gearpump.streaming.{Processor, StreamApplication}
 import io.gearpump.tap.TapJsonConfig
 import io.gearpump.util.Graph._
 import io.gearpump.util.{AkkaApp, Graph, LogUtil}
@@ -39,6 +40,7 @@ object PipeLine extends AkkaApp with ArgumentsParser {
   override val options: Array[(String, CLIOption[Any])] = Array(
     "hbase" -> CLIOption[String]("<hbase instance>", required = false, defaultValue = Some("hbase")),
     "kafka" -> CLIOption[String]("<kafka instance>", required = false, defaultValue = Some("kafka")),
+    "zookeeper" -> CLIOption[String]("<zookeeper instance>", required = false, defaultValue = Some("zookeeper")),
     "table" -> CLIOption[String]("<hbase table>", required = false, defaultValue = Some("gp_tap_table")),
     "topic" -> CLIOption[String]("<kafka topic>", required = false, defaultValue = Some("gp_tap_topic"))
   )
@@ -50,17 +52,16 @@ object PipeLine extends AkkaApp with ArgumentsParser {
     val services = conf.root.withOnlyKey("VCAP_SERVICES").render(ConfigRenderOptions.defaults().setJson(true))
     val tjc = new TapJsonConfig(services)
     val hbaseconfig = tjc.getHBase(config.getString("hbase"))
-    //val kafkaconfig = tjc.getKafka(config.getString("hbase"))
-    val kafkaconfig = Map(
-      "zookeepers" -> "10.10.10.46:9092,10.10.10.164:9092,10.10.10.236:9092",
-      "brokers" -> "10.10.10.46:2181,10.10.10.236:2181,10.10.10.164:2181/kafka"
-    )
+    val kafkaconfig = tjc.getKafkaConfig(config.getString("kafka"))
+    val zookeeperconfig = tjc.getZookeeperConfig(config.getString("zookeeper"))
     val topic = config.getString("topic")
     val table = config.getString("table")
-    val zookeepers = kafkaconfig.get("zookeepers").get
-    val brokers = kafkaconfig.get("brokers").get
-    val source = DataSourceProcessor(new KafkaSource(topic, zookeepers, new KafkaStorageFactory(zookeepers, brokers)), 1)
-    val sink = DataSinkProcessor(new HBaseSink(table, hbaseconfig), 1)
+    val zookeepers = zookeeperconfig.get("zookeepers")
+    val brokers = kafkaconfig.get("brokers")
+    val offsetStorageFactory = new KafkaStorageFactory(zookeepers, brokers)
+    val source = new KafkaSource(topic, zookeepers, offsetStorageFactory)
+    val kafka = Processor[HandlerTask, DataSource](source, 1, "KafkaSource", UserConfig.empty)
+    val sink = Processor[HandlerTask, DataSink](new HBaseSink(table, hbaseconfig), 1, "HBaseSink", UserConfig.empty)
     val app = StreamApplication("TAPPipeline", Graph(
-      source ~> sink
+      kafka ~> sink
     ), UserConfig.empty)
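With this change the hard-coded broker and ZooKeeper addresses are gone from the TAP pipeline; both endpoints now come from the `VCAP_SERVICES` entry that TAP injects into the environment. The extraction step in isolation (same calls as in the diff):

import com.typesafe.config.{ConfigFactory, ConfigRenderOptions}

// Render only the VCAP_SERVICES subtree of the loaded config back to JSON;
// TapJsonConfig then resolves the named service instances out of it.
val conf = ConfigFactory.load
val services = conf.root.withOnlyKey("VCAP_SERVICES")
  .render(ConfigRenderOptions.defaults().setJson(true))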
