diff --git a/.gitignore b/.gitignore
index 819dc185..079be671 100644
--- a/.gitignore
+++ b/.gitignore
@@ -23,3 +23,6 @@ project/plugins/project/
 # Scala-IDE specific
 .scala_dependencies
 .worksheet
+.bsp
+
+src/main/scala/com/*
\ No newline at end of file
diff --git a/build.sbt b/build.sbt
index 3b67fd6c..a566c49e 100644
--- a/build.sbt
+++ b/build.sbt
@@ -123,12 +123,37 @@ libraryDependencies ++= Seq(
 )
 
 libraryDependencies ++= Seq(
-  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
-  "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
-  "org.apache.spark" %% "spark-mllib" % sparkVersion % "provided",
-  "com.holdenkarau" %% "spark-testing-base" % s"3.5.1_1.5.3" % "test" intransitive(),
-  "org.scala-lang" % "scala-library" % scalaVersion.value % "compile"
+  "org.apache.spark" %% "spark-core" % sparkVersion,
+  "org.apache.spark" %% "spark-sql" % sparkVersion,
+  "org.apache.spark" %% "spark-mllib" % sparkVersion,
+  "com.holdenkarau" %% "spark-testing-base" % s"3.5.0_1.4.7" % "test" intransitive(),
+  "org.scala-lang" % "scala-library" % scalaVersion.value
 )
+// https://mvnrepository.com/artifact/software.amazon.awssdk/s3
+libraryDependencies += "software.amazon.awssdk" % "s3" % "2.25.23"
+
+libraryDependencies += "com.upplication" % "s3fs" % "2.2.2"
+
+// https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10
+libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.5.1"
+
+
+// https://mvnrepository.com/artifact/org.apache.spark/spark-hive
+libraryDependencies += "org.apache.spark" %% "spark-hive" % "3.5.1"
+
+// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming
+libraryDependencies += "org.apache.spark" %% "spark-streaming" % "3.5.1"
+
+// https://mvnrepository.com/artifact/org.scala-lang/scala-reflect
+libraryDependencies += "org.scala-lang" % "scala-reflect" % "2.12.19"
+
+// https://mvnrepository.com/artifact/com.typesafe.scala-logging/scala-logging
+libraryDependencies += "com.typesafe.scala-logging" %% "scala-logging" % "3.9.5"
+
+// https://mvnrepository.com/artifact/org.typelevel/cats-kernel
+libraryDependencies += "org.typelevel" %% "cats-kernel" % "2.10.0"
+
+libraryDependencies += "org.zouzias" %% "spark-lucenerdd" % "0.4.0"
 
 // Read version in code from build.sbt
 lazy val root = (project in file(".")).
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
deleted file mode 100644
index 355e9642..00000000
--- a/scalastyle-config.xml
+++ /dev/null
@@ -1,298 +0,0 @@
[scalastyle-config.xml (298 lines) is deleted by this patch. It was the project's "Scalastyle standard configuration": checks banning FunSuite (tests must extend org.apache.spark.SparkFunSuite), println, @VisibleForTesting, Runtime.getRuntime.addShutdownHook, mutable.SynchronizedBuffer, Class.forName and scala.collection.JavaConversions, plus whitespace and import-order rules and size limits (800-line files, 30-character method names, 10 parameters, 50 methods per type). The XML markup of the deleted lines did not survive extraction, so they are not reproduced here.]
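A note on the build.sbt changes above: the entries inside the Seq(...) block resolve Spark through the shared sparkVersion value, while the newly added spark-sql-kafka-0-10, spark-hive and spark-streaming lines hard-code "3.5.1", so the two can drift apart. A possible tightening, shown only as a sketch (it assumes sparkVersion is still defined earlier in build.sbt, as the existing Seq(...) block implies; it is not part of the patch):

// Sketch: keep the extra Spark modules on the shared sparkVersion value
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion,
  "org.apache.spark" %% "spark-hive" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion
)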
diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf
index 40728f25..6df16b07 100644
--- a/src/main/resources/reference.conf
+++ b/src/main/resources/reference.conf
@@ -25,6 +25,9 @@ lucenerdd {
   // Use 'disk' to store the index in Java's temp directory
   // Otherwise the index will be stored in memory
   // Do not use memory, see http://lucene.apache.org/core/7_5_0/core/org/apache/lucene/store/RAMDirectory.html
+  // store.mode = "s3"
+  // store.s3.index.bucket = "lucene-kashyap"
+  // store.s3.taxonomy.bucket = "lucene-kashyap-taxonomy"
   store.mode = "disk"
   store.mode = ${?LUCENERDD_INDEX_STORE_MODE}
 
diff --git a/src/main/scala/org/zouzias/spark/lucenerdd/LuceneRDD.scala b/src/main/scala/org/zouzias/spark/lucenerdd/LuceneRDD.scala
index 0813e398..b6acb007 100644
--- a/src/main/scala/org/zouzias/spark/lucenerdd/LuceneRDD.scala
+++ b/src/main/scala/org/zouzias/spark/lucenerdd/LuceneRDD.scala
@@ -393,11 +393,12 @@ object LuceneRDD extends Versionable
     queryAnalyzer: String,
     similarity: String,
     indexAnalyzerPerField: Map[String, String],
-    queryAnalyzerPerField: Map[String, String])
+    queryAnalyzerPerField: Map[String, String],
+    isReadOnly: Boolean = false)
    (implicit conv: T => Document): LuceneRDD[T] = {
     val partitions = elems.mapPartitionsWithIndex[AbstractLuceneRDDPartition[T]](
       (partId, iter) => Iterator(LuceneRDDPartition(iter, partId, indexAnalyzer, queryAnalyzer,
-        similarity, indexAnalyzerPerField, queryAnalyzerPerField)),
+        similarity, indexAnalyzerPerField, queryAnalyzerPerField, isReadOnly)),
       preservesPartitioning = true)
     new LuceneRDD[T](partitions, indexAnalyzer, queryAnalyzer,
       indexAnalyzerPerField, queryAnalyzerPerField, similarity)
@@ -496,6 +497,24 @@ object LuceneRDD extends Versionable
       Map.empty[String, String])
   }
 
+  /**
+   * Constructor with the default index/query analyzers and Lucene similarity that also exposes the read-only preference
+   * @param dataFrame Input DataFrame
+   * @param isReadOnly When true, partitions skip indexing the input and only open the already existing index
+   * @return
+   */
+  def apply(dataFrame: DataFrame, isReadOnly: Boolean)
+  : LuceneRDD[Row] = {
+    apply[Row](dataFrame.rdd,
+      getOrElseEn(IndexAnalyzerConfigName),
+      getOrElseEn(QueryAnalyzerConfigName),
+      getOrElseClassic(),
+      Map.empty[String, String],
+      Map.empty[String, String],
+      isReadOnly
+    )
+  }
+
   /**
    * Entity linkage between two [[DataFrame]] by blocking / filtering
    * on one or more columns.
@@ -551,7 +570,8 @@ object LuceneRDD extends Versionable
         luceneRDDParams.queryAnalyzer,
         luceneRDDParams.similarity,
         luceneRDDParams.indexAnalyzerPerField,
-        luceneRDDParams.queryAnalyzerPerField)
+        luceneRDDParams.queryAnalyzerPerField,
+        luceneRDDParams.isReadOnly)
 
       // Multi-query lucene index
       qs.map(q => (q, lucenePart.query(rowToQuery(q), topK).results.toArray))
@@ -603,7 +623,8 @@ object LuceneRDD extends Versionable
         luceneRDDParams.queryAnalyzer,
         luceneRDDParams.similarity,
         luceneRDDParams.indexAnalyzerPerField,
-        luceneRDDParams.queryAnalyzerPerField)
+        luceneRDDParams.queryAnalyzerPerField,
+        luceneRDDParams.isReadOnly)
 
       // Multi-query lucene index
       iterQueries.map(q => (q, lucenePart.query(rowToQuery(q), topK).results.toArray))
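The LuceneRDD changes above let a caller build an RDD over a DataFrame without re-indexing it: the new apply(dataFrame, isReadOnly) overload threads the flag down to every partition, and (as the LuceneRDDPartition hunk further below shows) a read-only partition skips the indexing loop entirely. A minimal usage sketch, assuming the S3 store mode is enabled via configuration and the configured bucket already holds an index written by an earlier non-read-only run; the DataFrame contents and the query are illustrative only:

import org.apache.spark.sql.SparkSession
import org.zouzias.spark.lucenerdd._

val spark = SparkSession.builder().appName("lucenerdd-readonly-example").getOrCreate()
import spark.implicits._

val cities = Seq(("Athens", "GR"), ("Berlin", "DE")).toDF("city", "country")

// First run: index the DataFrame as usual (the read-only flag defaults to false)
val indexed = LuceneRDD(cities)
val hits = indexed.termQuery("city", "athens", 10)

// Later run against the same persisted index: partitions skip indexing entirely
val readOnly = LuceneRDD(cities, isReadOnly = true)
val hitsAgain = readOnly.termQuery("city", "athens", 10)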
diff --git a/src/main/scala/org/zouzias/spark/lucenerdd/config/LuceneRDDParams.scala b/src/main/scala/org/zouzias/spark/lucenerdd/config/LuceneRDDParams.scala
index 06925878..7e7e1e84 100644
--- a/src/main/scala/org/zouzias/spark/lucenerdd/config/LuceneRDDParams.scala
+++ b/src/main/scala/org/zouzias/spark/lucenerdd/config/LuceneRDDParams.scala
@@ -31,7 +31,8 @@ case class LuceneRDDParams(indexAnalyzer: String,
                            queryAnalyzer: String,
                            similarity: String,
                            indexAnalyzerPerField: Map[String, String],
-                           queryAnalyzerPerField: Map[String, String]) extends Serializable
+                           queryAnalyzerPerField: Map[String, String],
+                           isReadOnly: Boolean) extends Serializable
 
 object LuceneRDDParams extends AnalyzerConfigurable with SimilarityConfigurable {
 
@@ -40,6 +41,7 @@ object LuceneRDDParams extends AnalyzerConfigurable with SimilarityConfigurable
     getOrElseEn(QueryAnalyzerConfigName),
     getOrElseClassic(),
     Map.empty[String, String],
-    Map.empty[String, String])
+    Map.empty[String, String],
+    false)
   }
 }
diff --git a/src/main/scala/org/zouzias/spark/lucenerdd/partition/LuceneRDDPartition.scala b/src/main/scala/org/zouzias/spark/lucenerdd/partition/LuceneRDDPartition.scala
index bbbf8906..62b213e7 100644
--- a/src/main/scala/org/zouzias/spark/lucenerdd/partition/LuceneRDDPartition.scala
+++ b/src/main/scala/org/zouzias/spark/lucenerdd/partition/LuceneRDDPartition.scala
@@ -59,7 +59,8 @@ private[lucenerdd] class LuceneRDDPartition[T]
    private val queryAnalyzerName: String,
    private val similarityName: String,
    private val indexAnalyzerPerField: Map[String, String],
-   private val queryAnalyzerPerField: Map[String, String])
+   private val queryAnalyzerPerField: Map[String, String],
+   private val isReadOnly: Boolean = false)
   (implicit docConversion: T => Document,
    override implicit val kTag: ClassTag[T])
   extends AbstractLuceneRDDPartition[T]
@@ -86,21 +87,23 @@ private[lucenerdd] class LuceneRDDPartition[T]
 
   private val (iterOriginal, iterIndex) = iter.duplicate
 
-  private val startTime = new DateTime(System.currentTimeMillis())
-  logInfo(s"[partId=${partitionId}]Indexing process initiated at ${startTime}...")
-  iterIndex.foreach { case elem =>
-    // (implicitly) convert type T to Lucene document
-    val doc = docConversion(elem)
-    indexWriter.addDocument(FacetsConfig.build(taxoWriter, doc))
+  if (!isReadOnly) {
+    val startTime = new DateTime(System.currentTimeMillis())
+    logInfo(s"[partId=${partitionId}]Indexing process initiated at ${startTime}...")
+    iterIndex.foreach { case elem =>
+      // (implicitly) convert type T to Lucene document
+      val doc = docConversion(elem)
+      indexWriter.addDocument(FacetsConfig.build(taxoWriter, doc))
+    }
+    val endTime = new DateTime(System.currentTimeMillis())
+    logInfo(s"[partId=${partitionId}]Indexing process completed at ${endTime}...")
+    logInfo(s"[partId=${partitionId}]Indexing process took ${(endTime.getMillis
+      - startTime.getMillis) / 1000} seconds...")
+
+    // Close the indexWriter and taxonomyWriter (for faceted search)
+    closeAllWriters()
+    logDebug(s"[partId=${partitionId}]Closing index writers...")
   }
-  private val endTime = new DateTime(System.currentTimeMillis())
-  logInfo(s"[partId=${partitionId}]Indexing process completed at ${endTime}...")
-  logInfo(s"[partId=${partitionId}]Indexing process took ${(endTime.getMillis -
-    startTime.getMillis) / 1000} seconds...")
-
-  // Close the indexWriter and taxonomyWriter (for faceted search)
-  closeAllWriters()
-  logDebug(s"[partId=${partitionId}]Closing index writers...")
 
   logDebug(s"[partId=${partitionId}]Instantiating index/facet readers")
   private val indexReader = DirectoryReader.open(IndexDir)
@@ -282,11 +285,12 @@ object LuceneRDDPartition {
      queryAnalyzerName: String,
      similarityName: String,
      indexAnalyzerPerField: Map[String, String] = Map.empty,
-     queryAnalyzerPerField: Map[String, String] = Map.empty)
+     queryAnalyzerPerField: Map[String, String] = Map.empty,
+     isReadOnly: Boolean = false)
     (implicit docConversion: T => Document)
   : LuceneRDDPartition[T] = {
     new LuceneRDDPartition[T](iter, partitionId, indexAnalyzerName,
       queryAnalyzerName, similarityName, indexAnalyzerPerField,
-      queryAnalyzerPerField)(docConversion, classTag[T])
+      queryAnalyzerPerField, isReadOnly)(docConversion, classTag[T])
   }
 }
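Condensed, the LuceneRDDPartition change above means the read-only flag gates the whole write path of a partition: no documents are added, the writers are never populated or closed there, and the readers created immediately afterwards open whatever the configured directory already contains. A paraphrase of the guarded block (not the literal patched code):

// Paraphrase of the block introduced above
if (!isReadOnly) {
  iterIndex.foreach { elem =>
    indexWriter.addDocument(FacetsConfig.build(taxoWriter, docConversion(elem)))
  }
  closeAllWriters()
}
// In both cases the DirectoryReader/facet readers are opened right after this block.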
diff --git a/src/main/scala/org/zouzias/spark/lucenerdd/store/IndexStorable.scala b/src/main/scala/org/zouzias/spark/lucenerdd/store/IndexStorable.scala
index 766ef71d..621d7f44 100644
--- a/src/main/scala/org/zouzias/spark/lucenerdd/store/IndexStorable.scala
+++ b/src/main/scala/org/zouzias/spark/lucenerdd/store/IndexStorable.scala
@@ -16,13 +16,18 @@
  */
 package org.zouzias.spark.lucenerdd.store
 
-import java.nio.file.{Files, Path}
+import com.erudika.lucene.store.s3.{S3Directory, S3FileSystemStore}
+import com.upplication.s3fs.S3FileSystem
+import java.nio.file.{FileSystems, Files, Path}
 import org.apache.lucene.facet.FacetsConfig
 import org.apache.lucene.store._
 import org.zouzias.spark.lucenerdd.config.Configurable
 import org.apache.spark.internal.Logging
 
+import java.net.URI
+
+import java.util
+
 /**
  * Storage of a Lucene index Directory
  *
@@ -39,17 +44,59 @@ trait IndexStorable extends Configurable
 
   private val IndexStoreKey = "lucenerdd.index.store.mode"
 
-  private val tmpJavaDir = System.getProperty("java.io.tmpdir")
-
   private val indexDirName =
     s"indexDirectory.${System.currentTimeMillis()}.${Thread.currentThread().getId}"
 
-  private val indexDir = Files.createTempDirectory(indexDirName)
+
+  private var indexDir = Files.createTempDirectory(indexDirName)
+  if (Config.hasPath(IndexStoreKey)) {
+    val storageMode = Config.getString(IndexStoreKey)
+
+    storageMode match {
+      case "disk" => {
+        val tmpJavaDir = System.getProperty("java.io.tmpdir")
+        logInfo(s"Config parameter ${IndexStoreKey} is set to 'disk'")
+        logInfo("Lucene index will be stored on disk")
+        logInfo(s"Index disk location ${tmpJavaDir}")
+        indexDir = Files.createTempDirectory(indexDirName)
+      }
+
+      case "s3" => {
+        val indexS3FileSystem = S3FileSystemStore.getS3FileSystem
+        val bucketName = Config.getString("lucenerdd.index.store.s3.index.bucket")
+        logInfo(s"Config parameter ${IndexStoreKey} is set to 'S3'")
+        logInfo("Lucene index will be stored in S3")
+        logInfo(s"Index S3 Bucket location ${bucketName}")
+
+        indexDir = indexS3FileSystem.getPath(bucketName)
+      }
+    }
+  }
 
   private val taxonomyDirName =
-      s"taxonomyDirectory-${System.currentTimeMillis()}.${Thread.currentThread().getId}"
+    s"taxonomyDirectory-${System.currentTimeMillis()}.${Thread.currentThread().getId}"
+  private var taxonomyDir = Files.createTempDirectory(taxonomyDirName)
+
+  if (Config.hasPath(IndexStoreKey)) {
+    val storageMode = Config.getString(IndexStoreKey)
+    storageMode match {
+      case "disk" => {
+        logInfo(s"Config parameter ${IndexStoreKey} is set to 'disk'")
+        logInfo("Lucene taxonomy will be stored on disk")
+        logInfo(s"Taxonomy disk location ${taxonomyDirName}")
+        taxonomyDir = Files.createTempDirectory(taxonomyDirName)
+      }
 
-  private val taxonomyDir = Files.createTempDirectory(taxonomyDirName)
+      case "s3" => {
+        val taxonomyS3FileSystem = S3FileSystemStore.getTaxonomyS3FileSystem
+        val bucketName = Config.getString("lucenerdd.index.store.s3.taxonomy.bucket")
+        logInfo(s"Config parameter ${IndexStoreKey} is set to 'S3'")
+        logInfo("Lucene taxonomy will be stored in S3")
+        logInfo(s"Taxonomy S3 Bucket location ${bucketName}")
+        taxonomyDir = taxonomyS3FileSystem.getPath(bucketName)
+      }
+    }
+  }
 
   protected val IndexDir = storageMode(indexDir)
@@ -70,10 +117,15 @@ trait IndexStorable extends Configurable
       case "disk" => {
         logInfo(s"Config parameter ${IndexStoreKey} is set to 'disk'")
         logInfo("Lucene index will be storage in disk")
-        logInfo(s"Index disk location ${tmpJavaDir}")
         // directoryPath.toFile.deleteOnExit() // Delete on exit
         new MMapDirectory(directoryPath, new SingleInstanceLockFactory)
       }
+      case "s3" =>
+        logInfo(s"Config parameter ${IndexStoreKey} is set to 's3'")
+        logInfo(s"Bucket Name: ${directoryPath.toString}")
+        val bucket: String = directoryPath.getFileSystem.asInstanceOf[S3FileSystem].getKey
+        val path: String = directoryPath.toString
+        new S3Directory(bucket, path)
       case ow => {
         logInfo(s"Config parameter ${IndexStoreKey} is set to ${ow}")
         logInfo("Lucene index will be storage in memory (default)")
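The IndexStorable changes above read three configuration keys: lucenerdd.index.store.mode picks the backend, and when it is "s3" the index and taxonomy directories are resolved from lucenerdd.index.store.s3.index.bucket and lucenerdd.index.store.s3.taxonomy.bucket. A sketch of an application.conf that enables the S3 backend, assuming the usual Typesafe Config behaviour of application.conf overriding the bundled reference.conf; bucket names are placeholders and AWS credentials/region are assumed to come from the environment picked up by the S3 client:

lucenerdd {
  index {
    store {
      // "disk" is the shipped default; values other than "disk"/"s3" fall back to the in-memory directory
      mode = "s3"

      s3 {
        index.bucket = "my-lucene-index-bucket"
        taxonomy.bucket = "my-lucene-taxonomy-bucket"
      }
    }
  }
}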
diff --git a/src/main/scala/org/zouzias/spark/lucenerdd/versioning/Versionable.scala b/src/main/scala/org/zouzias/spark/lucenerdd/versioning/Versionable.scala
index 1ff18a4d..989a635a 100644
--- a/src/main/scala/org/zouzias/spark/lucenerdd/versioning/Versionable.scala
+++ b/src/main/scala/org/zouzias/spark/lucenerdd/versioning/Versionable.scala
@@ -27,6 +27,7 @@ trait Versionable {
    */
   def version(): Map[String, Any] = {
     // BuildInfo is automatically generated using sbt plugin `sbt-buildinfo`
-    org.zouzias.spark.lucenerdd.BuildInfo.toMap
+//    org.zouzias.spark.lucenerdd.BuildInfo.toMap
+    Map("version" -> "0.0.1")
   }
 }