Merge pull request #10 from qbicsoftware/development
[0.1] Release
Zethson authored May 14, 2019
2 parents 4836612 + b2dbe04 commit 395446d
Showing 13 changed files with 329 additions and 11 deletions.
4 changes: 4 additions & 0 deletions .gitignore
@@ -16,3 +16,7 @@ project/plugins/project/
.scala_dependencies
.worksheet
.idea

# DB properties
src/main/resources/database_properties.txt
database_properties.txt
9 changes: 9 additions & 0 deletions .travis.yml
@@ -0,0 +1,9 @@
language: scala

jdk: oraclejdk8

scala:
- 2.12.8

script:
- sbt ++$TRAVIS_SCALA_VERSION test
58 changes: 54 additions & 4 deletions README.md
@@ -1,13 +1,63 @@
[![Build Status](https://travis-ci.com/qbicsoftware/spark-benchmark-cli.svg?branch=development)](https://travis-ci.com/qbicsoftware/spark-benchmark-cli)

# spark-benchmark-cli
A tool for submitting SQL queries to a Spark cluster and collecting benchmarking statistics on them. Currently, MariaDB is supported out of the box.

## Building
The SBT assembly plugin is configured, so running the following in the project root:
```bash
sbt assembly
```
will build the fat jar. The result is written to /target/$scala-version/$name-assembly-$version.jar.

## Running
Run it as usual:
```bash
java -jar spark-benchmark-cli-assembly-$version.jar
```

## Usage
### Options
```bash
java -jar $name-assembly-$version.jar
Usage: Benchmark [-h] -q[=<sqlQuery>] -c=<configFilePath>
Benchmark Tool for evaluating the performance of a Spark Cluster. Run custom
SQL Queries inside Spark!
-s, --spark run with spark support
-l, --local run spark in local mode - requires -s option to be in effect
-t, --table[=<table>] table to execute SQL query in, mandatory if running with spark support
-d, --driver[=<driver>] driver to access Database, e.g. org.mariadb.jdbc.Driver, mandatory if running with spark support
-q, --query[=<sqlQuery>] SQL query to execute
-c, --config[=<configFilePath>]
database config file path
-h, --help display a help message
```
Required parameters are:
```
-c, --config=<configFilePath>
-q, --query
```
Queries can optionally be entered interactively.
Use ```-q``` without a value to get a prompt for your query, or supply the full query directly: ```-q[=<sqlQuery>]```.
## Spark
A query can be submitted to Spark via:
```bash
/spark/bin/spark-submit --master spark://spark-master:7077 \
/opt/spark-apps/spark-benchmark-cli-assembly-0.1.jar -s -d org.mariadb.jdbc.Driver -c /opt/spark-data/database_properties.txt -t <table> -q "<query>"
```
## Example Query
```bash
/spark/bin/spark-submit --master spark://spark-master:7077 \
/opt/spark-apps/spark-benchmark-cli-assembly-0.1.jar -s -d org.mariadb.jdbc.Driver -c /opt/spark-data/database_properties.txt -t Consequence -q "SELECT id FROM Consequence"
```
## Tests
Run tests **inside the sbt console** from the project root using:
```bash
test
```
## Known issues
Due to a bug in the interplay between the MariaDB connector and Spark, `mariadb` in the JDBC URL has to be replaced with `mysql`.
Please refer to https://github.com/qbicsoftware/spark-benchmark-cli/issues/9.
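A minimal sketch of the workaround (not part of this commit; `rewriteForSpark` is a hypothetical helper):
```scala
// Hypothetical helper illustrating the workaround from issue #9: Spark's JDBC layer
// misbehaves with "jdbc:mariadb:" URLs, so we hand it a "jdbc:mysql:" URL instead;
// the MariaDB driver still serves the connection.
def rewriteForSpark(jdbcURL: String): String =
  jdbcURL.replaceFirst("^jdbc:mariadb:", "jdbc:mysql:")

// rewriteForSpark("jdbc:mariadb://localhost:3306/benchmark")
//   == "jdbc:mysql://localhost:3306/benchmark"
```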
21 changes: 20 additions & 1 deletion build.sbt
@@ -2,4 +2,23 @@ name := "spark-benchmark-cli"

version := "0.1"

scalaVersion := "2.12.8"

libraryDependencies ++= Seq(
"org.scalactic" %% "scalactic" % "3.0.5",
"org.scalatest" %% "scalatest" % "3.0.5" % "test",
"info.picocli" % "picocli" % "3.9.6",
"org.mariadb.jdbc" % "mariadb-java-client" % "2.4.1",
"org.apache.spark" %% "spark-core" % "2.4.1",
"org.apache.spark" %% "spark-sql" % "2.4.1",
"org.apache.spark" %% "spark-mllib" % "2.4.1",
"org.apache.spark" %% "spark-streaming" % "2.4.1",
"com.typesafe.scala-logging" %% "scala-logging" % "3.9.2"
)

// see: https://github.com/sbt/sbt-assembly#merge-strategy
assemblyMergeStrategy in assembly := {
case "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister" => MergeStrategy.concat // workaround to resolve building issues with the jdbc connector
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case x => MergeStrategy.first
}
5 changes: 5 additions & 0 deletions src/main/resources/log4j.properties
@@ -0,0 +1,5 @@
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
114 changes: 114 additions & 0 deletions src/main/scala/Main.scala
@@ -0,0 +1,114 @@
import io.cli.CommandLineParser
import io.parser.PropertiesParser
import java.sql.DriverManager

import org.apache.spark.sql.SparkSession
import java.util.Properties

import com.typesafe.scalalogging.Logger

object Main {

  val LOG = Logger("Spark Benchmark CLI")

  def main(args: Array[String]): Unit = {
    // parse command line parameters, read the database properties
    val commandLineParser = new CommandLineParser()
    val commandLineParameters = commandLineParser.parseCommandLineParameters(args)
    val databaseProperties = PropertiesParser.readPropertiesFile(commandLineParameters.configFilePath)

    try {
      if (commandLineParameters.sparkSupport) {
        LOG.info("Enabling Spark support")

        if (databaseProperties.jdbcURL.contains("mariadb")) {
          LOG.warn("________________________________________________________________________________________________")
          LOG.warn("Please note that specifying mariadb in the jdbc URL can lead to corrupt MariaDB access.")
          LOG.warn("Replace mariadb with mysql in your jdbc URL if you are running into issues.")
          LOG.warn("More information can be found here: https://github.com/qbicsoftware/spark-benchmark-cli/issues/9")
          LOG.warn("________________________________________________________________________________________________")
        }

        val spark =
          if (commandLineParameters.localMode) {
            LOG.info("Running Spark in local mode!")
            SparkSession
              .builder()
              .appName("Spark Benchmark CLI")
              .config("spark.master", "local")
              .config("spark.driver.extraClassPath", "/opt/spark-apps/spark-apps/mariadb-java-client-2.4.1.jar")
              .getOrCreate()
          } else {
            SparkSession
              .builder()
              .appName("Spark Benchmark CLI")
              .getOrCreate()
          }
        // for implicit conversions like converting RDDs to DataFrames
        import spark.implicits._

        Class.forName("org.mariadb.jdbc.Driver")

        // test the database connection before handing the URL to Spark
        try {
          val connection = DriverManager.getConnection(databaseProperties.jdbcURL, databaseProperties.user, databaseProperties.password)
          if (connection.isClosed) {
            LOG.error("Could not establish a test connection to the database!")
          }
          connection.close()
        } catch {
          case e: Exception => LOG.error("Something went wrong when attempting to connect to the database: " + e.getMessage)
        }

        // Spark likes working with properties, hence we create a properties object
        val connectionProperties = new Properties()
        connectionProperties.put("user", s"${databaseProperties.user}")
        connectionProperties.put("password", s"${databaseProperties.password}")
        connectionProperties.put("driver", s"${commandLineParameters.databaseDriver}")

        val table = spark.read.jdbc(databaseProperties.jdbcURL, commandLineParameters.table, connectionProperties)
        table.printSchema()
        table.show()

        // NOTE
        // Spark requires a view of a table to allow for SQL queries.
        // createOrReplaceTempView creates a temporary, in-memory view of the table: it is not
        // persistent, but SQL queries can be run on top of it. To keep the data around, either
        // persist the DataFrame or write it out with saveAsTable.
        table.createOrReplaceTempView(commandLineParameters.table)

        val result = spark.sql(commandLineParameters.sqlQuery)
        result.show()
      } else {
        // running outside Spark
        LOG.info("Spark support has been disabled!")

        // connect
        val connection = DriverManager.getConnection(databaseProperties.jdbcURL, databaseProperties.user, databaseProperties.password)

        // execute the query
        val statement = connection.createStatement()
        val rs = statement.executeQuery(commandLineParameters.sqlQuery)

        // print the first column of every result row
        val results = Iterator.from(0).takeWhile(_ => rs.next()).map(_ => rs.getString(1)).toList
        println(results)

        connection.close()
      }
    } catch {
      case eie: ExceptionInInitializerError =>
        LOG.error("__________________________________________________________________________________________")
        LOG.error("Hadoop support is possibly wrongly configured or missing! " + eie.getException.getMessage)
        LOG.error("__________________________________________________________________________________________")
      case e: Exception => LOG.error("Hadoop support is possibly wrongly configured or missing! " + e.getMessage)
    }
  }
}
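The NOTE in Main.scala mentions persisting the temporary view. A minimal sketch of what that could look like (not part of this commit; the table name is an assumption, and saveAsTable additionally requires a configured warehouse or Hive support):
```scala
// Hypothetical follow-up to Main.scala above: keep the JDBC table around
// instead of re-reading it from the database for every query.
table.persist()                            // cache the DataFrame for repeated queries
table.write.saveAsTable("benchmark_input") // "benchmark_input" is an assumed, illustrative name
```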
9 changes: 9 additions & 0 deletions src/main/scala/core/DatabaseProperties.scala
@@ -0,0 +1,9 @@
package core

case class DatabaseProperties(jdbcURL: String,
                              user: String,
                              password: String,
                              port: String = "3306",
                              databaseName: String)
7 changes: 7 additions & 0 deletions src/main/scala/example/CubeCalculator.scala
@@ -0,0 +1,7 @@
package example

object CubeCalculator {
  def cube(x: Int): Int = {
    x * x * x
  }
}
6 changes: 0 additions & 6 deletions src/main/scala/example/Main.scala

This file was deleted.

41 changes: 41 additions & 0 deletions src/main/scala/io/cli/CommandLineOptions.scala
@@ -0,0 +1,41 @@
package io.cli

import picocli.CommandLine.{Command, Option}

@Command(name = "Benchmark", version = Array("1.0"),
  description = Array("@|bold Benchmark Tool|@ for evaluating the performance of a @|red Spark Cluster|@. Run custom SQL Queries inside Spark!"))
class CommandLineOptions {

  @Option(names = Array("-s", "--spark"),
    description = Array("Spark support"))
  var sparkSupport = false

  @Option(names = Array("-l", "--local"),
    description = Array("Spark in local mode"))
  var localMode = false

  @Option(names = Array("-d", "--driver"),
    description = Array("Database driver name"))
  var databaseDriver = ""

  @Option(names = Array("-c", "--config"),
    description = Array("Database config file path."),
    required = true)
  var configFilePath = ""

  @Option(names = Array("-t", "--table"),
    description = Array("Table to run query on. Required if using Spark."))
  var table = ""

  @Option(names = Array("-q", "--query"),
    description = Array("SQL query to execute."),
    arity = "0..1",
    interactive = true,
    required = true)
  var sqlQuery = ""

  @Option(names = Array("-h", "--help"),
    description = Array("Display a help message."),
    usageHelp = true)
  var helpRequested = false
}
24 changes: 24 additions & 0 deletions src/main/scala/io/cli/CommandLineParser.scala
@@ -0,0 +1,24 @@
package io.cli

import picocli.CommandLine

class CommandLineParser {

  def parseCommandLineParameters(args: Array[String]): CommandLineOptions = {
    if (args.isEmpty) {
      CommandLine.usage(new CommandLineOptions, System.out)
      System.exit(0)
    }

    val commandLineOptions = new CommandLineOptions()
    new CommandLine(commandLineOptions).parse(args: _*)

    if (commandLineOptions.helpRequested) {
      CommandLine.usage(new CommandLineOptions, System.out)
      System.exit(0)
    }

    commandLineOptions
  }
}
33 changes: 33 additions & 0 deletions src/main/scala/io/parser/PropertiesParser.scala
@@ -0,0 +1,33 @@
package io.parser

import core.DatabaseProperties

import scala.io.Source

object PropertiesParser {

  def readPropertiesFile(propertiesFileName: String): DatabaseProperties = {
    val lines = Source.fromFile(propertiesFileName).getLines.toList
    val split = for (line <- lines) yield line.split(":")

    // TODO: replace this ad-hoc, order-dependent parsing with a proper properties reader.
    // The host entry is a JDBC URL, so it contains two further colons that split(":") broke
    // apart; the parentheses ensure quotes are stripped from the whole reassembled URL.
    var host = (split.head(1) + ":" + split.head(2) + ":" + split.head(3))
      .filterNot((x: Char) => x.equals('\"'))
    host = host.replaceAll("\\s", "")
    val port = split(1)(1)
      .filterNot((x: Char) => x.equals('\"'))
      .filterNot((x: Char) => x.isWhitespace)
    val name = split(2)(1)
      .filterNot((x: Char) => x.equals('\"'))
      .filterNot((x: Char) => x.isWhitespace)
    val password = split(3)(1)
      .filterNot((x: Char) => x.equals('\"'))
      .filterNot((x: Char) => x.isWhitespace)
    val user = split(4)(1)
      .filterNot((x: Char) => x.equals('\"'))
      .filterNot((x: Char) => x.isWhitespace)

    DatabaseProperties(host, user, password, port, name)
  }
}
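For reference, the file layout readPropertiesFile assumes, reconstructed from the parsing code above (line order: host, port, name, password, user). The values are placeholders; the real database_properties.txt is deliberately not committed (see .gitignore):
```
host: "jdbc:mysql://localhost"
port: "3306"
name: "benchmark_db"
password: "secret"
user: "benchmark_user"
```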
9 changes: 9 additions & 0 deletions src/test/scala/example/CubeCalculatorTest.scala
@@ -0,0 +1,9 @@
package example

import org.scalatest.FunSuite

class CubeCalculatorTest extends FunSuite {
  test("CubeCalculator.cube") {
    assert(CubeCalculator.cube(3) === 27)
  }
}
