Skip to content

Commit

Permalink
Fixes gearpump#13 Add an ATK example
Browse files Browse the repository at this point in the history
  • Loading branch information
kkasravi committed Sep 24, 2015
1 parent ca59322 commit 23146cc
Show file tree
Hide file tree
Showing 2 changed files with 203 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.gearpump.examples.atk_pipeline

import java.io.{File, FileInputStream, FileOutputStream}
import java.net.URI

import akka.actor.ActorSystem
import io.gearpump.Message
import io.gearpump.cluster.UserConfig
import io.gearpump.cluster.client.ClientContext
import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
import io.gearpump.streaming.dsl.CollectionDataSource
import io.gearpump.streaming.source.DataSourceProcessor
import io.gearpump.streaming.task.{StartTime, TaskContext, TypedTask}
import io.gearpump.streaming.{Processor, StreamApplication}
import io.gearpump.util.Graph._
import io.gearpump.util.{AkkaApp, Graph, LogUtil}
import org.apache.commons.compress.archivers.tar.{TarArchiveEntry, TarArchiveInputStream}
import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.slf4j.Logger

/**
 * Fetches a model archive (a tar file) from HDFS, unpacks it under `/tmp/` and records
 * what it found in the public fields below.
 *
 * Field meaning after [[load]] has run (each stays `null` if no matching entry was seen;
 * when several entries match, the last one wins):
 *  - `archiveName`: name of the entry containing ".jar", cut off just before ".jar"
 *  - `modelName`: content of the entry whose name contains "modelname", with "\n" removed
 *  - `ModelBytesFileName`: local path (`/tmp/` + entry name) of any other entry
 */
class KMeansDataProcessor extends java.io.Serializable {
  private val LOG: Logger = LogUtil.getLogger(getClass)
  var archiveName: String = null
  var modelName: String = null
  var ModelBytesFileName: String = null

  // This will be replaced by an ATK utility object that loads a tar from HDFS and returns a Model.
  /**
   * Copies the tar at `tar` from HDFS to `/tmp/kmeans.tar`, extracts every regular file
   * entry into `/tmp/` and updates `archiveName`, `modelName` and `ModelBytesFileName`.
   *
   * @param tar location of the tar archive; used both as the file system URI and as the path
   * @throws java.io.IOException if an entry is truncated, too large to buffer in memory,
   *                             or would be written outside `/tmp/`
   */
  def load(tar: String): Unit = {
    LOG.info(s"ATKDataProcessor load tar=$tar")
    val tempFilePath = "/tmp/kmeans.tar"
    val tmpPath = "/tmp/"

    val hdfsFileSystem: org.apache.hadoop.fs.FileSystem =
      org.apache.hadoop.fs.FileSystem.get(new URI(tar), new Configuration())
    hdfsFileSystem.copyToLocalFile(false, new Path(tar), new Path(tempFilePath))

    // Canonical form of the extraction root, used to reject entries that escape it.
    val extractionRoot = new File(tmpPath).getCanonicalPath + File.separator

    val tarStream = new TarArchiveInputStream(new FileInputStream(new File(tempFilePath)))
    try {
      Iterator
        .continually(tarStream.getNextTarEntry)
        .takeWhile(_ != null)
        // Directory entries carry no content and cannot be opened as a file.
        .filterNot(_.isDirectory)
        .foreach { entry =>
          val entryName: String = entry.getName
          val target = new File(tmpPath + entryName)
          // Guard against "../" style names writing outside the extraction root.
          if (!target.getCanonicalPath.startsWith(extractionRoot)) {
            throw new java.io.IOException(
              s"Tar entry '$entryName' would be extracted outside $tmpPath")
          }

          val content = readEntry(tarStream, entry)
          writeFile(target, content)

          if (entryName.contains(".jar")) {
            archiveName = entryName.substring(0, entryName.indexOf(".jar"))
          } else if (entryName.contains("modelname")) {
            // Decode with a fixed charset so the result does not depend on the JVM default.
            val text = new String(content, java.nio.charset.StandardCharsets.UTF_8)
            modelName = text.replaceAll("\n", "")
          } else {
            ModelBytesFileName = tmpPath + entryName
          }
        }
    } finally {
      // Always release the archive stream, even when extraction fails part-way.
      tarStream.close()
    }
  }

  /**
   * Reads the complete content of the current tar entry.
   *
   * A single `read` call may return fewer bytes than requested, so this loops until the
   * buffer is full or the entry ends.
   */
  private def readEntry(in: TarArchiveInputStream, entry: TarArchiveEntry): Array[Byte] = {
    val size = entry.getSize
    if (size > Int.MaxValue) {
      throw new java.io.IOException(
        s"Tar entry '${entry.getName}' is too large to buffer in memory ($size bytes)")
    }
    val content = new Array[Byte](size.toInt)
    var offset = 0
    var endReached = false
    while (offset < content.length && !endReached) {
      val count = in.read(content, offset, content.length - offset)
      if (count < 0) endReached = true else offset += count
    }
    if (offset < content.length) {
      throw new java.io.EOFException(
        s"Tar entry '${entry.getName}' ended after $offset of ${content.length} bytes")
    }
    content
  }

  /** Writes `content` to `target`, creating missing parent directories and closing the stream. */
  private def writeFile(target: File, content: Array[Byte]): Unit = {
    Option(target.getParentFile).foreach(_.mkdirs())
    val out = new FileOutputStream(target)
    try {
      IOUtils.write(content, out)
    } finally {
      out.close()
    }
  }

  /** Called for every incoming message; currently only logs. */
  def next: Unit = {
    LOG.info("ATKDataProcessor next")
  }

  /** Called when the owning task stops; currently only logs. */
  def stop: Unit = {
    LOG.info("ATKDataProcessor stop")
  }
}

/**
 * Task wrapper around a [[KMeansDataProcessor]].
 *
 * Each lifecycle callback is forwarded to `typedTask` (supplied by the `TypedTask` base
 * class, which is not defined in this file):
 *  - start: loads the archive whose location is stored under [[KMeansTask.TAR]], if present
 *  - message: calls `next`
 *  - stop: calls `stop`
 *
 * @param taskContext context handed through to `TypedTask`
 * @param userConf    user configuration; read for the [[KMeansTask.TAR]] entry
 */
class KMeansTask(taskContext: TaskContext, userConf: UserConfig)
  extends TypedTask[KMeansDataProcessor](taskContext, userConf) {

  /** Loads the configured tar archive; does nothing when no location is configured. */
  override def onStart(startTime: StartTime): Unit = {
    for (tarLocation <- userConf.getString(KMeansTask.TAR)) {
      typedTask.load(tarLocation)
    }
  }

  /** Forwards to the processor; the message itself is not inspected here. */
  override def onNext(msg: Message): Unit = typedTask.next

  /** Forwards the stop notification to the processor. */
  override def onStop(): Unit = typedTask.stop
}

/** Constants used by the [[KMeansTask]] class. */
object KMeansTask {
  /** Configuration key holding the location of the model tar archive. */
  val TAR: String = "trustedanalytics.scoring-engine.archive-tar"
}

/**
 * Command-line entry point that builds the "ATKPipeline" streaming application and
 * submits it through a [[ClientContext]].
 *
 * Supported option:
 *  - `tar`: location of the model tar file in HDFS (optional, defaults to
 *    `/user/gearpump/atk/kmeans.tar`)
 */
object PipeLine extends AkkaApp with ArgumentsParser {
  private val LOG: Logger = LogUtil.getLogger(getClass)

  override val options: Array[(String, CLIOption[Any])] = Array(
    "tar" -> CLIOption[String](
      "<tar file location in hdfs>",
      required = false,
      defaultValue = Some("/user/gearpump/atk/kmeans.tar"))
  )

  /**
   * Builds the application graph: a fixed three-element collection source feeding the
   * "ATK" processor.
   *
   * @param config parsed command-line arguments; the `tar` option is read
   * @param system actor system, made available implicitly to the builders used below
   * @return the application, carrying the tar location under [[KMeansTask.TAR]]
   */
  def application(config: ParseResult, system: ActorSystem): StreamApplication = {
    import KMeansTask._
    implicit val actorSystem: ActorSystem = system
    // NOTE(review): presumably picked up implicitly by Processor(...) below - confirm.
    implicit val atkTask: Class[KMeansTask] = classOf[KMeansTask]

    val tar = config.getString("tar")
    val appConfig = UserConfig.empty.withString(TAR, tar)
    val source = DataSourceProcessor(
      new CollectionDataSource[String](Seq("one", "two", "three")), 1)
    val atk = Processor(classOf[KMeansDataProcessor], 1, "ATK", appConfig)

    StreamApplication("ATKPipeline", Graph(source ~> atk), appConfig)
  }

  /**
   * Parses the arguments, submits the application and closes the client context.
   *
   * The context is closed in a `finally` block so it is released even when building or
   * submitting the application throws.
   */
  override def main(akkaConf: Config, args: Array[String]): Unit = {
    val config = parse(args)
    val context = ClientContext(akkaConf)
    try {
      val appId = context.submit(application(config, context.system))
      LOG.info(s"Submitted ATKPipeline, appId=$appId")
    } finally {
      context.close()
    }
  }

}

62 changes: 61 additions & 1 deletion project/Build.scala
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ object Build extends sbt.Build {
new File(packagePath).renameTo(new File(target))
}
)
).aggregate(kafka_hdfs_pipeline, kafka_hbase_pipeline)
).aggregate(kafka_hdfs_pipeline, kafka_hbase_pipeline, atk_pipeline)

lazy val kafka_hdfs_pipeline = Project(
id = "gearpump-kafka-hdfs-pipeline",
Expand Down Expand Up @@ -266,4 +266,64 @@ object Build extends sbt.Build {
)
)

// sbt sub-project for the ATK pipeline example; its base directory is "atk-pipeline".
// commonSettings, myAssemblySettings, hadoopDependency and the *Version values are
// defined elsewhere in this build file.
lazy val atk_pipeline = Project(
id = "gearpump-atk-pipeline",
base = file("atk-pipeline"),
settings = commonSettings ++ myAssemblySettings ++
Seq(
// Assembly merge rule: discard files under META-INF/maven/org.slf4j/slf4j-api whose
// name starts with "pom"; every other path falls back to the previously defined strategy.
mergeStrategy in assembly := {
case PathList("META-INF", "maven","org.slf4j","slf4j-api", ps) if ps.startsWith("pom") => MergeStrategy.discard
case x =>
val oldStrategy = (mergeStrategy in assembly).value
oldStrategy(x)
},
libraryDependencies ++= Seq(
"com.lihaoyi" %% "upickle" % upickleVersion,
// gearpump-core and gearpump-streaming: "provided" scope for the main artifacts, plus
// their "tests" classifier artifacts in "test" scope. leveldbjni-all is excluded.
"com.github.intel-hadoop" %% "gearpump-core" % gearpumpVersion % "provided"
exclude("org.fusesource.leveldbjni", "leveldbjni-all"),
"com.github.intel-hadoop" %% "gearpump-core" % gearpumpVersion % "test" classifier "tests",
"com.github.intel-hadoop" %% "gearpump-streaming" % gearpumpVersion % "provided"
exclude("org.fusesource.leveldbjni", "leveldbjni-all"),
"com.github.intel-hadoop" %% "gearpump-streaming" % gearpumpVersion % "test" classifier "tests",
// NOTE(review): no scope qualifier here, unlike core/streaming above - confirm this
// dependency is meant to be in the default scope for this example.
"com.github.intel-hadoop" %% "gearpump-external-kafka" % gearpumpVersion
exclude("org.fusesource.leveldbjni", "leveldbjni-all"),
"org.scala-lang.modules" %% "scala-parser-combinators" % "1.0.2",
// NOTE(review): the Scala binary suffix "_2.11" is hard-coded with "%" instead of
// using "%%" - verify it matches the Scala version this build compiles with.
"com.julianpeeters" % "avro-scala-macro-annotations_2.11" % "0.9.0",
// Hadoop artifacts at clouderaVersion, each with an explicit list of excluded
// transitive dependencies.
"org.apache.hadoop" % "hadoop-hdfs" % clouderaVersion
exclude("org.fusesource.leveldbjni", "leveldbjni-all")
exclude("org.mortbay.jetty", "jetty-util")
exclude("org.mortbay.jetty", "jetty")
exclude("org.apache.htrace", "htrace-core")
exclude("tomcat", "jasper-runtime"),
"org.apache.hadoop" % "hadoop-yarn-api" % clouderaVersion
exclude("org.fusesource.leveldbjni", "leveldbjni-all")
exclude("com.google.guava", "guava")
exclude("com.google.protobuf", "protobuf-java")
exclude("commons-lang", "commons-lang")
exclude("org.apache.htrace", "htrace-core")
exclude("commons-logging", "commons-logging")
exclude("org.apache.hadoop", "hadoop-annotations"),
"org.apache.hadoop" % "hadoop-yarn-client" % clouderaVersion
exclude("org.fusesource.leveldbjni", "leveldbjni-all")
exclude("com.google.guava", "guava")
exclude("com.sun.jersey", "jersey-client")
exclude("commons-cli", "commons-cli")
exclude("commons-lang", "commons-lang")
exclude("commons-logging", "commons-logging")
exclude("org.apache.htrace", "htrace-core")
exclude("log4j", "log4j")
exclude("org.apache.hadoop", "hadoop-annotations")
exclude("org.mortbay.jetty", "jetty-util")
exclude("org.apache.hadoop", "hadoop-yarn-api")
exclude("org.apache.hadoop", "hadoop-yarn-common"),
// Test-scope libraries.
"com.typesafe.akka" %% "akka-testkit" % akkaVersion % "test",
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"org.scalacheck" %% "scalacheck" % scalaCheckVersion % "test",
"org.mockito" % "mockito-core" % mockitoVersion % "test",
"junit" % "junit" % junitVersion % "test"
) ++ hadoopDependency,
// Main class recorded for the packaged jar: the PipeLine object of this example.
mainClass in (Compile, packageBin) := Some("io.gearpump.examples.atk_pipeline.PipeLine"),
// Assembly output goes to <parent of this project's base directory>/target/<scalaVersionMajor>.
target in assembly := baseDirectory.value.getParentFile / "target" / scalaVersionMajor
)
)
}

0 comments on commit 23146cc

Please sign in to comment.