diff --git a/.gitignore b/.gitignore
index 20a46bf..029ea61 100644
--- a/.gitignore
+++ b/.gitignore
@@ -16,3 +16,7 @@ project/plugins/project/
 .scala_dependencies
 .worksheet
 .idea
+
+# DB properties
+src/main/resources/database_properties.txt
+database_properties.txt
diff --git a/.travis.yml b/.travis.yml
new file mode 100644
index 0000000..12fa179
--- /dev/null
+++ b/.travis.yml
@@ -0,0 +1,9 @@
+language: scala
+
+jdk: oraclejdk8
+
+scala:
+  - 2.12.8
+
+script:
+  - sbt ++$TRAVIS_SCALA_VERSION test
\ No newline at end of file
diff --git a/README.md b/README.md
index e477405..020e9e8 100644
--- a/README.md
+++ b/README.md
@@ -1,13 +1,63 @@
+[![Build Status](https://travis-ci.com/qbicsoftware/spark-benchmark-cli.svg?branch=development)](https://travis-ci.com/qbicsoftware/spark-benchmark-cli)
+
 # spark-benchmark-cli
-Here, various expensive queries and operations for spark benchmarking are defined.
+A tool for submitting SQL queries to a Spark cluster. Various benchmarking statistics will be calculated.
+Currently, MariaDB is supported out of the box.
 
-# Building
+## Building
 SBT assembly plugin is configured. Hence, in the project root:
 ```bash
 sbt assembly
 ```
 will build the fat jar. The result will be written to /target/$scala-version/$name-assembly-$version.jar
-Run it as usual:
-```bash
-java -jar $name-assembly-$version.jar
-```
+
+## Running
+```bash
+java -jar spark-benchmark-cli-assembly-$version.jar
+```
+
+## Usage
+### Options
+```bash
+Usage: Benchmark [-h] -q[=<sqlQuery>] -c=<configFilePath>
+Benchmark Tool for evaluating the performance of a Spark Cluster. Run custom
+SQL Queries inside Spark!
+  -s, --spark                 run with spark support
+  -l, --local                 run spark in local mode - requires the -s option to be in effect
+  -t, --table[=<table>]       table to execute the SQL query in, mandatory if running with spark support
+  -d, --driver[=<databaseDriver>]
+                              driver to access the database, e.g. org.mariadb.jdbc.Driver, mandatory if running with spark support
+  -q, --query[=<sqlQuery>]    SQL query to execute
+  -c, --config=<configFilePath>
+                              database config file path
+  -h, --help                  display a help message
+```
+Required parameters are:
+```
+-c, --config=<configFilePath>
+-q, --query
+```
+Queries are optionally interactive.
+You can either use `-q` to get a prompt for your query or supply the full query up front: `-q[=<query>]`.
+
+## Spark
+A query can be submitted to Spark via:
+```bash
+/spark/bin/spark-submit --master spark://spark-master:7077 \
+/opt/spark-apps/spark-benchmark-cli-assembly-0.1.jar -s -d org.mariadb.jdbc.Driver -c /opt/spark-data/database_properties.txt -t <table> -q "<query>"
+```
+
+## Example Query
+```bash
+/spark/bin/spark-submit --master spark://spark-master:7077 \
+/opt/spark-apps/spark-benchmark-cli-assembly-0.1.jar -s -d org.mariadb.jdbc.Driver -c /opt/spark-data/database_properties.txt -t Consequence -q "SELECT id FROM Consequence"
+```
+
+## Tests
+Run tests inside the sbt console from the root project directory using:
+```bash
+test
+```
+
+## Known issues
+Due to a bug involving the MariaDB connector and Spark, mariadb in the jdbc URL has to be replaced with mysql.
+Please refer to: https://github.com/qbicsoftware/spark-benchmark-cli/issues/9
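Note that the workaround from the Known issues section only changes the URL scheme; the MariaDB driver itself stays in place. A hypothetical example (host, port, and database name are placeholders, not values from this PR):

```
# may fail from Spark due to issue #9:
jdbc:mariadb://db-host:3306/benchmark_db

# workaround - same database, mysql scheme:
jdbc:mysql://db-host:3306/benchmark_db
```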
-q <"query"> +``` + +## Example Query +```bash +/spark/bin/spark-submit --master spark://spark-master:7077 \ +/opt/spark-apps/spark-benchmark-cli-assembly-0.1.jar -s -d org.mariadb.jdbc.Driver -c /opt/spark-data/database_properties.txt -t Consequence -q "SELECT id FROM Consequence" +``` + +## Tests +Run tests inside the sbt console from the root project directory using: +```bash +test +``` + +## Known issues +Due to a bug in the MariaDB connector and Spark, mariadb in the jdbc URL has to be replaced with mysql. +Please refer to: https://github.com/qbicsoftware/spark-benchmark-cli/issues/9 . diff --git a/build.sbt b/build.sbt index b0e0885..d92178b 100644 --- a/build.sbt +++ b/build.sbt @@ -2,4 +2,23 @@ name := "spark-benchmark-cli" version := "0.1" -scalaVersion := "2.12.8" \ No newline at end of file +scalaVersion := "2.12.8" + +libraryDependencies ++= Seq( + "org.scalactic" %% "scalactic" % "3.0.5", + "org.scalatest" %% "scalatest" % "3.0.5" % "test", + "info.picocli" % "picocli" % "3.9.6", + "org.mariadb.jdbc" % "mariadb-java-client" % "2.4.1", + "org.apache.spark" %% "spark-core" % "2.4.1", + "org.apache.spark" %% "spark-sql" % "2.4.1", + "org.apache.spark" %% "spark-mllib" % "2.4.1", + "org.apache.spark" %% "spark-streaming" % "2.4.1", + "com.typesafe.scala-logging" %% "scala-logging" % "3.9.2" +) + +// see: https://github.com/sbt/sbt-assembly#merge-strategy +assemblyMergeStrategy in assembly := { + case "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister" => MergeStrategy.concat // workaround to resolve building issues with the jdbc connector + case PathList("META-INF", xs @ _*) => MergeStrategy.discard + case x => MergeStrategy.first +} diff --git a/src/main/resources/log4j.properties b/src/main/resources/log4j.properties new file mode 100644 index 0000000..b10ba2b --- /dev/null +++ b/src/main/resources/log4j.properties @@ -0,0 +1,5 @@ +log4j.rootLogger=INFO, stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.Target=System.out +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n \ No newline at end of file diff --git a/src/main/scala/Main.scala b/src/main/scala/Main.scala new file mode 100644 index 0000000..ece718c --- /dev/null +++ b/src/main/scala/Main.scala @@ -0,0 +1,114 @@ +import io.cli.CommandLineParser +import io.parser.PropertiesParser +import java.sql.DriverManager + +import org.apache.spark.sql.SparkSession +import java.util.Properties + +import com.typesafe.scalalogging.Logger + +object Main { + + val LOG = Logger("Spark Benchmark CLI") + + def main(args: Array[String]) { + // parse commandline parameters, get database properties + val commandLineParser = new CommandLineParser() + val commandLineParameters = commandLineParser.parseCommandLineParameters(args) + val databaseProperties = PropertiesParser.readPropertiesFile(commandLineParameters.configFilePath) + + try { + if (commandLineParameters.sparkSupport) { + LOG.info("Enabling Spark support") + + if (databaseProperties.jdbcURL.contains("mariadb")) { + LOG.warn("________________________________________________________________________________________________") + LOG.warn("Please note that specifying mariadb in the jdbc URL can lead to corrupt MariaDB access.") + LOG.warn("Replace mariadb with mysql in your jdbc URL if you are running into issues.") + LOG.warn("More information can be found here: https://github.com/qbicsoftware/spark-benchmark-cli/issues/9") + 
LOG.warn("________________________________________________________________________________________________") + } + + val spark = + if (commandLineParameters.localMode) { + LOG.info("Running Spark in local mode!") + SparkSession + .builder() + .appName("Spark Benchmark CLI") + .config("spark.master", "local") + .config("spark.driver.extraClassPath", "/opt/spark-apps/spark-apps/mariadb-java-client-2.4.1.jar") + .getOrCreate() + } else { + SparkSession + .builder() + .appName("Spark Benchmark CLI") + // .config("spark.some.config.option", "some-value") + .getOrCreate() + } + // For implicit conversions like converting RDDs to DataFrames + import spark.implicits._ + + Class.forName("org.mariadb.jdbc.Driver") + + // test database connection + try { + val connection = DriverManager.getConnection(databaseProperties.jdbcURL, databaseProperties.user, databaseProperties.password) + if (!connection.isClosed) { + LOG.error("Connection to the database did not close automatically when performing a connection test!") + } + } catch { + case e: Exception => LOG.error("Something went wrong when attempting to connect to the database. %s", e.getMessage) + } + + // Spark likes working with properties, hence we create a properties object + val connectionProperties = new Properties() + connectionProperties.put("user", s"${databaseProperties.user}") + connectionProperties.put("password", s"${databaseProperties.password}") + connectionProperties.put("driver", s"${commandLineParameters.databaseDriver}") + + val table = spark.read.jdbc(databaseProperties.jdbcURL, commandLineParameters.table, connectionProperties) + table.printSchema() + table.show() + + // NOTE + // Spark requires a View of a table to allow for SQL queries + // CreateOrReplaceTempView will create a temporary view of the table in memory. It is not persistent at this moment but you can run sql query on top of that. + // If you want to save it you can either persist or use saveAsTable to save. + table.createOrReplaceTempView(commandLineParameters.table) + + val result = spark.sql(commandLineParameters.sqlQuery) + result.show() + } else { + // RUNNING OUTSIDE SPARK + LOG.info("Spark support has been disabled!") + + // connect + // Class.forName("org.mariadb.jdbc.Driver") + val connection = DriverManager.getConnection(databaseProperties.jdbcURL, databaseProperties.user, databaseProperties.password) + connection.isClosed + + // execute query + val statement = connection.createStatement() + val rs = statement.executeQuery(commandLineParameters.sqlQuery) + + // print results + val list = Iterator.from(0).takeWhile(_ => rs.next()).map(_ => rs.getString(1)).toList + println(list) + + // while (rs.next()) { + // println(rs.getString(1)) //or rs.getString("column name"); + // } + + connection.isClosed + } + } catch { + case eie: ExceptionInInitializerError => { + LOG.error("__________________________________________________________________________________________") + LOG.error("Hadoop support is possibly wrongly configured or missing! " + eie.getException.getMessage) + LOG.error("__________________________________________________________________________________________") + } + case e: Exception => LOG.error("Hadoop support is possibly wrongly configured or missing! 
" + e.getMessage) + } + + } +} diff --git a/src/main/scala/core/DatabaseProperties.scala b/src/main/scala/core/DatabaseProperties.scala new file mode 100644 index 0000000..debb2a4 --- /dev/null +++ b/src/main/scala/core/DatabaseProperties.scala @@ -0,0 +1,9 @@ +package core + +case class DatabaseProperties(jdbcURL: String, + user: String, + password: String, + port: String = "3306", + databaseName: String) + + diff --git a/src/main/scala/example/CubeCalculator.scala b/src/main/scala/example/CubeCalculator.scala new file mode 100644 index 0000000..5405686 --- /dev/null +++ b/src/main/scala/example/CubeCalculator.scala @@ -0,0 +1,7 @@ +package example + +object CubeCalculator { + def cube(x: Int): Int = { + x * x * x + } +} \ No newline at end of file diff --git a/src/main/scala/example/Main.scala b/src/main/scala/example/Main.scala deleted file mode 100644 index b70b631..0000000 --- a/src/main/scala/example/Main.scala +++ /dev/null @@ -1,6 +0,0 @@ -package example - -object Main extends App { - val ages = Seq(42, 75, 29, 64) - println(s"The oldest person is ${ages.max}") -} diff --git a/src/main/scala/io/cli/CommandLineOptions.scala b/src/main/scala/io/cli/CommandLineOptions.scala new file mode 100644 index 0000000..e3523b2 --- /dev/null +++ b/src/main/scala/io/cli/CommandLineOptions.scala @@ -0,0 +1,41 @@ +package io.cli + +import picocli.CommandLine.{Command, Option} + +@Command(name = "Benchmark", version = Array("1.0"), + description = Array("@|bold Benchmark Tool|@ for evaluating the performance of a @|red Spark Cluster|@. Run custom SQL Queries inside Spark!")) +class CommandLineOptions { + + @Option(names = Array("-s", "--spark"), + description = Array("Spark support")) + var sparkSupport = false + + @Option(names = Array("-l", "--local"), + description = Array("Spark in local mode")) + var localMode = false + + @Option(names = Array("-d", "--driver"), + description = Array("Database driver name")) + var databaseDriver = "" + + @Option(names = Array("-c", "--config"), + description = Array("Database config file path."), + required = true) + var configFilePath = "" + + @Option(names = Array("-t", "--table"), + description = Array("Table to run query on. 
diff --git a/src/test/scala/example/CubeCalculatorTest.scala b/src/test/scala/example/CubeCalculatorTest.scala
new file mode 100644
index 0000000..7b39e8b
--- /dev/null
+++ b/src/test/scala/example/CubeCalculatorTest.scala
@@ -0,0 +1,9 @@
+package example
+
+import org.scalatest.FunSuite
+
+class CubeCalculatorTest extends FunSuite {
+  test("CubeCalculator.cube") {
+    assert(CubeCalculator.cube(3) === 27)
+  }
+}
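A companion test for PropertiesParser in the same FunSuite style could look as follows. This is a sketch only, under the file format assumed above; PropertiesParserTest and its fixture values are illustrative and not part of this PR:

```scala
package io.parser

import java.io.{File, PrintWriter}

import org.scalatest.FunSuite

class PropertiesParserTest extends FunSuite {
  test("PropertiesParser.readPropertiesFile") {
    // write a temporary config file in the assumed format
    val file = File.createTempFile("database_properties", ".txt")
    file.deleteOnExit()
    val writer = new PrintWriter(file)
    writer.write(
      """host: "jdbc:mysql://localhost"
        |port: "3306"
        |name: "benchmark_db"
        |password: "secret"
        |user: "benchmark"
        |""".stripMargin)
    writer.close()

    val props = PropertiesParser.readPropertiesFile(file.getAbsolutePath)
    assert(props.jdbcURL === "jdbc:mysql://localhost")
    assert(props.port === "3306")
    assert(props.databaseName === "benchmark_db")
    assert(props.password === "secret")
    assert(props.user === "benchmark")
  }
}
```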