Merge pull request #15 from kkasravi/issue_#14
Fixes #14 Upgrade to GearPump 0.6.4.1
kkasravi committed Sep 18, 2015
2 parents d8bbead + 13bc35e commit ca59322
Showing 12 changed files with 96 additions and 102 deletions.
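Most of the diff below is mechanical: the examples move from the org.apache.gearpump namespace to io.gearpump, and each entry point switches from Scala's App trait to GearPump's AkkaApp, which hands the parsed Akka config to ClientContext explicitly. A minimal sketch of the new entry-point shape under the 0.6.x signatures shown in this diff (buildApp is a hypothetical stand-in for the examples' application methods):

import akka.actor.ActorSystem
import com.typesafe.config.Config
import io.gearpump.cluster.client.ClientContext
import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
import io.gearpump.streaming.StreamApplication
import io.gearpump.util.AkkaApp

object ExamplePipeLine extends AkkaApp with ArgumentsParser {
  override val options: Array[(String, CLIOption[Any])] = Array.empty

  // Hypothetical stand-in for the application(config, system) builders in this diff.
  def buildApp(config: ParseResult, system: ActorSystem): StreamApplication = ???

  override def main(akkaConf: Config, args: Array[String]): Unit = {
    val config = parse(args)              // CLI parsing comes from ArgumentsParser
    val context = ClientContext(akkaConf) // the context now takes the Akka config explicitly
    val appId = context.submit(buildApp(config, context.system)) // appId identifies the submitted app
    context.close()
  }
}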
@@ -16,20 +16,21 @@
* limitations under the License.
*/

package org.apache.gearpump.examples.kafka_hbase_pipeline
package io.gearpump.examples.kafka_hbase_pipeline

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.cluster.client.ClientContext
import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
import org.apache.gearpump.streaming.kafka.{KafkaSource, KafkaStorageFactory}
import org.apache.gearpump.streaming.source.DataSourceProcessor
import org.apache.gearpump.streaming.{Processor, StreamApplication}
import org.apache.gearpump.util.Graph._
import org.apache.gearpump.util.{Graph, LogUtil}
import io.gearpump.cluster.UserConfig
import io.gearpump.cluster.client.ClientContext
import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
import io.gearpump.streaming.kafka.{KafkaSource, KafkaStorageFactory}
import io.gearpump.streaming.source.DataSourceProcessor
import io.gearpump.streaming.{Processor, StreamApplication}
import io.gearpump.util.Graph._
import io.gearpump.util.{AkkaApp, Graph, LogUtil}
import org.slf4j.Logger

object PipeLine extends App with ArgumentsParser {
object PipeLine extends AkkaApp with ArgumentsParser {
private val LOG: Logger = LogUtil.getLogger(getClass)
val PROCESSORS = "pipeline.processors"
val PERSISTORS = "pipeline.persistors"
@@ -42,7 +43,8 @@ object PipeLine extends App with ArgumentsParser {
"zookeepers" -> CLIOption[String]("<zookeepers>", required = false, defaultValue = Some("10.10.10.46:2181,10.10.10.236:2181,10.10.10.164:2181/kafka"))
)

def application(config: ParseResult): StreamApplication = {
def application(config: ParseResult, system: ActorSystem): StreamApplication = {
implicit val actorSystem = system
import Messages._
val pipelineString =
"""
@@ -85,10 +87,11 @@
app
}

val config = parse(args)
val context = ClientContext()
implicit val system = context.system
val appId = context.submit(application(config))
context.close()
override def main(akkaConf: Config, args: Array[String]): Unit = {
val config = parse(args)
val context = ClientContext(akkaConf)
val appId = context.submit(application(config, context.system))
context.close()
}

}
@@ -16,18 +16,18 @@
* limitations under the License.
*/

package org.apache.gearpump.examples.kafka_hbase_pipeline
package io.gearpump.examples.kafka_hbase_pipeline

import com.typesafe.config.Config
import org.apache.gearpump._
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.examples.kafka_hbase_pipeline.Messages._
import org.apache.gearpump.external.hbase.HBaseSink
import org.apache.gearpump.external.hbase.HBaseSink._
import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
import org.apache.gearpump.util.LogUtil
import io.gearpump._
import io.gearpump.cluster.UserConfig
import io.gearpump.examples.kafka_hbase_pipeline.Messages._
import io.gearpump.external.hbase.HBaseSink
import io.gearpump.external.hbase.HBaseSink._
import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
import io.gearpump.util.LogUtil
import org.slf4j.Logger
import upickle._
import upickle.default._

import scala.language.implicitConversions
import scala.util.Try
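The import change above from upickle._ to upickle.default._ (repeated in the test sources below) tracks the upickle bump from 0.2.8 to 0.3.4 in Build.scala: from 0.3 onward the stock read/write pickling API lives in upickle.default. A tiny illustrative sketch, with a hypothetical case class standing in for this project's Messages types:

import upickle.default._

// Hypothetical payload type, used only for illustration.
case class Reading(sensor: String, value: Double)

object UpickleRoundTrip extends App {
  val json = write(Reading("temp-1", 21.5)) // e.g. {"sensor":"temp-1","value":21.5}
  val back = read[Reading](json)            // picklers are derived automatically in upickle 0.3.x
  println(s"$json -> $back")
}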
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.examples.kafka_hbase_pipeline
package io.gearpump.examples.kafka_hbase_pipeline

//Todo
class PipeLineSpec {
@@ -15,23 +15,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.examples.kafka_hbase_pipeline
package io.gearpump.examples.kafka_hbase_pipeline

import com.typesafe.config.ConfigFactory
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.examples.kafka_hbase_pipeline.Messages.{Body, Datum, Envelope, _}
import org.apache.gearpump.external.hbase.HBaseSink
import org.apache.gearpump.streaming.MockUtil
import org.apache.gearpump.streaming.task.StartTime
import org.apache.gearpump.util.LogUtil
import io.gearpump.Message
import io.gearpump.cluster.UserConfig
import io.gearpump.examples.kafka_hbase_pipeline.Messages.{Body, Datum, Envelope, _}
import io.gearpump.external.hbase.HBaseSink
import io.gearpump.streaming.MockUtil
import io.gearpump.streaming.task.StartTime
import io.gearpump.util.LogUtil
import org.mockito.Mockito
import org.mockito.Mockito._
import org.mockito.mock.SerializableMode
import org.scalatest.prop.PropertyChecks
import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
import org.slf4j.Logger
import upickle._
import upickle.default._

object Processors {
val LOG: Logger = LogUtil.getLogger(getClass)
@@ -15,13 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.examples.kafka_hdfs_pipeline
package io.gearpump.examples.kafka_hdfs_pipeline

import org.apache.avro.Schema
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.examples.kafka_hdfs_pipeline.ParquetWriterTask._
import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
import io.gearpump.Message
import io.gearpump.cluster.UserConfig
import io.gearpump.examples.kafka_hdfs_pipeline.ParquetWriterTask._
import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.parquet.avro.AvroParquetWriter
@@ -15,22 +15,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.examples.kafka_hdfs_pipeline
package io.gearpump.examples.kafka_hdfs_pipeline

import akka.actor.ActorSystem
import com.julianpeeters.avro.annotations._
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.cluster.client.ClientContext
import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
import org.apache.gearpump.partitioner.ShufflePartitioner
import org.apache.gearpump.streaming.kafka.{KafkaSource, KafkaStorageFactory}
import org.apache.gearpump.streaming.source.DataSourceProcessor
import org.apache.gearpump.streaming.{Processor, StreamApplication}
import org.apache.gearpump.util.Graph._
import org.apache.gearpump.util.{Graph, LogUtil}
import io.gearpump.cluster.UserConfig
import io.gearpump.cluster.client.ClientContext
import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
import io.gearpump.partitioner.ShufflePartitioner
import io.gearpump.streaming.kafka.{KafkaSource, KafkaStorageFactory}
import io.gearpump.streaming.source.DataSourceProcessor
import io.gearpump.streaming.{Processor, StreamApplication}
import io.gearpump.util.Graph._
import io.gearpump.util.{AkkaApp, Graph, LogUtil}
import org.slf4j.Logger

import scala.util.Try

case class SpaceShuttleMessage(id: String, on: String, body: String)

/**
@@ -41,7 +40,7 @@ case class SpaceShuttleMessage(id: String, on: String, body: String)
@AvroRecord
case class SpaceShuttleRecord(var ts: Long, var anomaly: Double)

object PipeLine extends App with ArgumentsParser {
object PipeLine extends AkkaApp with ArgumentsParser {
private val LOG: Logger = LogUtil.getLogger(getClass)

override val options: Array[(String, CLIOption[Any])] = Array(
@@ -54,10 +53,8 @@ object PipeLine extends App with ArgumentsParser {
"zookeepers" -> CLIOption[String]("<zookeepers>", required = false, defaultValue = Some("10.10.10.46:2181,10.10.10.236:2181,10.10.10.164:2181/kafka"))
)

val context = ClientContext()
implicit val system = context.system

def application(context: ClientContext, config: ParseResult): Unit = {
def application(config: ParseResult, system: ActorSystem): StreamApplication = {
implicit val actorSystem = system
val readerNum = config.getInt("reader")
val scorerNum = config.getInt("scorer")
val writerNum = config.getInt("writer")
@@ -76,14 +73,13 @@

val dag = Graph(reader ~ partitioner ~> scorer ~ partitioner ~> writer)
val app = StreamApplication("KafkaHdfsPipeLine", dag, appConfig)
app
}

context.submit(app)
override def main(akkaConf: Config, args: Array[String]): Unit = {
val config = parse(args)
val context = ClientContext(akkaConf)
val appId = context.submit(application(config, context.system))
context.close()
}

Try({
application(context, parse(args))
}).failed.foreach(throwable => {
LOG.error("Application Failed", throwable)
})
}
@@ -15,18 +15,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.examples.kafka_hdfs_pipeline
package io.gearpump.examples.kafka_hdfs_pipeline

import akka.io.IO
import akka.pattern.ask
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.task.{Task, TaskContext}
import org.apache.gearpump.util.Constants
import io.gearpump.Message
import io.gearpump.cluster.UserConfig
import io.gearpump.streaming.task.{Task, TaskContext}
import io.gearpump.util.Constants
import spray.can.Http
import spray.http.HttpMethods._
import spray.http.{HttpRequest, HttpResponse, Uri}
import upickle._
import upickle.default._

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
@@ -1,10 +1,10 @@
package org.apache.gearpump.examples.kafka_hdfs_pipeline
package io.gearpump.examples.kafka_hdfs_pipeline

import akka.actor.ActorSystem
import org.apache.avro.Schema
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.MockUtil
import io.gearpump.Message
import io.gearpump.cluster.UserConfig
import io.gearpump.streaming.MockUtil
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.parquet.avro.{AvroParquetReader, AvroParquetWriter}
@@ -15,14 +15,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.examples.kafka_hdfs_pipeline
package io.gearpump.examples.kafka_hdfs_pipeline

import akka.actor.ActorSystem
import org.apache.gearpump._
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
import org.apache.gearpump.streaming.transaction.api.TimeReplayableSource
import org.apache.gearpump.util.LogUtil
import io.gearpump._
import io.gearpump.cluster.UserConfig
import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
import io.gearpump.streaming.transaction.api.TimeReplayableSource
import io.gearpump.util.LogUtil
import org.scalatest.prop.PropertyChecks
import org.scalatest.{BeforeAndAfterAll, Matchers, PropSpec}
import org.slf4j.Logger
21 changes: 6 additions & 15 deletions project/Build.scala
@@ -20,17 +20,16 @@ object Build extends sbt.Build {
*/
val travis_deploy = taskKey[Unit]("use this after sbt assembly packArchive, it will rename the package so that travis deploy can find the package.")

val akkaVersion = "2.3.6"
val akkaVersion = "2.3.12"
val clouderaVersion = "2.6.0-cdh5.4.2"
val clouderaHBaseVersion = "1.0.0-cdh5.4.2"
val gearpumpVersion = "0.4.1"
val hadoopVersion = "2.6.0"
val gearpumpVersion = "0.6.2-SNAPSHOT"
val junitVersion = "4.12"
val kafkaVersion = "0.8.2.1"
val mockitoVersion = "1.10.17"
val parquetVersion = "1.7.0"
val sprayVersion = "1.3.2"
val upickleVersion = "0.2.8"
val upickleVersion = "0.3.4"

val scalaVersionMajor = "scala-2.11"
val scalaVersionNumber = "2.11.5"
@@ -121,7 +120,7 @@
)

lazy val root = Project(
id = "gearpump",
id = "gearpump-examples",
base = file("."),
settings = commonSettings ++
Seq(
@@ -151,8 +150,6 @@
"com.github.intel-hadoop" %% "gearpump-core" % gearpumpVersion % "provided"
exclude("org.fusesource.leveldbjni", "leveldbjni-all"),
"com.github.intel-hadoop" %% "gearpump-core" % gearpumpVersion % "test" classifier "tests",
"com.github.intel-hadoop" %% "gearpump-daemon" % gearpumpVersion % "provided"
exclude("org.fusesource.leveldbjni", "leveldbjni-all"),
"com.github.intel-hadoop" %% "gearpump-streaming" % gearpumpVersion % "provided"
exclude("org.fusesource.leveldbjni", "leveldbjni-all"),
"com.github.intel-hadoop" %% "gearpump-streaming" % gearpumpVersion % "test" classifier "tests",
@@ -195,7 +192,7 @@
"org.mockito" % "mockito-core" % mockitoVersion % "test",
"junit" % "junit" % junitVersion % "test"
) ++ hadoopDependency,
mainClass in (Compile, packageBin) := Some("org.apache.gearpump.examples.kafka_hdfs_pipeline.PipeLine"),
mainClass in (Compile, packageBin) := Some("io.gearpump.examples.kafka_hdfs_pipeline.PipeLine"),
target in assembly := baseDirectory.value.getParentFile / "target" / scalaVersionMajor
)
)
@@ -218,16 +215,10 @@
exclude("org.fusesource.leveldbjni", "leveldbjni-all")
exclude("org.apache.htrace", "htrace-core"),
"com.github.intel-hadoop" %% "gearpump-core" % gearpumpVersion % "test" classifier "tests",
"com.github.intel-hadoop" %% "gearpump-daemon" % gearpumpVersion % "provided"
exclude("org.fusesource.leveldbjni", "leveldbjni-all")
exclude("org.apache.htrace", "htrace-core"),
"com.github.intel-hadoop" %% "gearpump-streaming" % gearpumpVersion % "provided"
exclude("org.fusesource.leveldbjni", "leveldbjni-all")
exclude("org.apache.htrace", "htrace-core"),
"com.github.intel-hadoop" %% "gearpump-streaming" % gearpumpVersion % "test" classifier "tests",
"com.github.intel-hadoop" %% "gearpump-experiments-dsl" % gearpumpVersion % "provided"
exclude("org.fusesource.leveldbjni", "leveldbjni-all")
exclude("org.apache.htrace", "htrace-core"),
"com.github.intel-hadoop" %% "gearpump-external-kafka" % gearpumpVersion
exclude("org.fusesource.leveldbjni", "leveldbjni-all")
exclude("org.apache.htrace", "htrace-core"),
@@ -270,7 +261,7 @@
"org.mockito" % "mockito-core" % mockitoVersion % "test",
"junit" % "junit" % junitVersion % "test"
) ++ hadoopDependency,
mainClass in (Compile, packageBin) := Some("org.apache.gearpump.examples.kafka_hbase_pipeline.PipeLine"),
mainClass in (Compile, packageBin) := Some("io.gearpump.examples.kafka_hbase_pipeline.PipeLine"),
target in assembly := baseDirectory.value.getParentFile / "target" / scalaVersionMajor
)
)
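Condensed, the dependency side of the Build.scala changes above amounts to a smaller set of provided GearPump modules per example project: gearpump-daemon and gearpump-experiments-dsl are dropped and the remaining modules keep their leveldbjni exclusion. A rough sketch (the upickle coordinates are the usual com.lihaoyi ones and are an assumption here, since that line is not shown in this diff):

libraryDependencies ++= Seq(
  "com.github.intel-hadoop" %% "gearpump-core" % gearpumpVersion % "provided"
    exclude("org.fusesource.leveldbjni", "leveldbjni-all"),
  "com.github.intel-hadoop" %% "gearpump-streaming" % gearpumpVersion % "provided"
    exclude("org.fusesource.leveldbjni", "leveldbjni-all"),
  "com.github.intel-hadoop" %% "gearpump-external-kafka" % gearpumpVersion
    exclude("org.fusesource.leveldbjni", "leveldbjni-all"),
  "com.lihaoyi" %% "upickle" % upickleVersion // assumed coordinates
)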
16 changes: 10 additions & 6 deletions project/plugins.sbt
@@ -1,10 +1,14 @@
addSbtPlugin("org.scala-js" % "sbt-scalajs" % "0.6.2")
resolvers += Resolver.url("fvunicorn", url("http://dl.bintray.com/fvunicorn/sbt-plugins"))(Resolver.ivyStylePatterns)

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")
resolvers += Resolver.url("sbt-plugin", url("http://dl.bintray.com/sbt/sbt-plugin-releases"))(Resolver.ivyStylePatterns)

resolvers += Classpaths.sbtPluginReleases

addSbtPlugin("org.scala-js" % "sbt-scalajs" % "0.6.4")

resolvers += "fvunicorn" at "http://dl.bintray.com/fvunicorn/sbt-plugins"
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")

addSbtPlugin("io.gearpump.sbt" % "sbt-pack" % "0.7.4")
addSbtPlugin("io.gearpump.sbt" % "sbt-pack" % "0.7.6")

addSbtPlugin("de.johoop" % "jacoco4sbt" % "2.1.6")

@@ -16,8 +20,8 @@ addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0")

addSbtPlugin("com.github.gseitz" % "sbt-release" % "0.8.5")

resolvers += Classpaths.sbtPluginReleases

addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.1.0")

addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "3.0.0")

addSbtPlugin("com.eed3si9n" % "sbt-unidoc" % "0.3.3")
2 changes: 1 addition & 1 deletion version.sbt
@@ -1 +1 @@
version in ThisBuild := "0.1-SNAPSHOT"
version in ThisBuild := "0.2-SNAPSHOT"
