diff --git a/src/main/scala/org/zouzias/spark/lucenerdd/LuceneRDDKryoRegistrator.scala b/src/main/scala/org/zouzias/spark/lucenerdd/LuceneRDDKryoRegistrator.scala
index 0f7a6e54..06cb51cd 100644
--- a/src/main/scala/org/zouzias/spark/lucenerdd/LuceneRDDKryoRegistrator.scala
+++ b/src/main/scala/org/zouzias/spark/lucenerdd/LuceneRDDKryoRegistrator.scala
@@ -20,8 +20,7 @@ import com.twitter.algebird.TopK
 import com.twitter.chill.Kryo
 import org.apache.spark.SparkConf
 import org.apache.spark.serializer.{KryoRegistrator, KryoSerializer}
-import org.zouzias.spark.lucenerdd.facets.FacetedLuceneRDD
-import org.zouzias.spark.lucenerdd.models.{SparkDoc, SparkFacetResult, SparkScoreDoc}
+import org.zouzias.spark.lucenerdd.models.{SparkDoc, SparkScoreDoc}
 import org.zouzias.spark.lucenerdd.partition.LuceneRDDPartition
 import org.zouzias.spark.lucenerdd.response.{LuceneRDDResponse, LuceneRDDResponsePartition}
 import org.zouzias.spark.lucenerdd.testing.{FavoriteCaseClass, Person}
@@ -30,7 +29,6 @@ class LuceneRDDKryoRegistrator extends KryoRegistrator {
   def registerClasses(kryo: Kryo): Unit = {
     kryo.register(classOf[LuceneRDD[_]])
     kryo.register(classOf[LuceneRDDPartition[_]])
-    kryo.register(classOf[FacetedLuceneRDD[_]])
     kryo.register(classOf[SparkDoc])
     kryo.register(classOf[Number])
     kryo.register(classOf[java.lang.Double])
@@ -57,7 +55,6 @@ class LuceneRDDKryoRegistrator extends KryoRegistrator {
     kryo.register(classOf[scala.collection.immutable.Set$EmptySet$])
     kryo.register(classOf[scala.collection.immutable.Map[_, _]])
     kryo.register(classOf[Array[scala.collection.immutable.Map[_, _]]])
-    kryo.register(classOf[SparkFacetResult])
     kryo.register(classOf[SparkScoreDoc])
     kryo.register(classOf[LuceneRDDResponse])
     kryo.register(classOf[LuceneRDDResponsePartition])
diff --git a/src/main/scala/org/zouzias/spark/lucenerdd/aggregate/SparkFacetResultMonoid.scala b/src/main/scala/org/zouzias/spark/lucenerdd/aggregate/SparkFacetResultMonoid.scala
deleted file mode 100644
index 3de0cb7e..00000000
--- a/src/main/scala/org/zouzias/spark/lucenerdd/aggregate/SparkFacetResultMonoid.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.zouzias.spark.lucenerdd.aggregate
-
-import com.twitter.algebird.MapMonoid
-import org.zouzias.spark.lucenerdd.models.SparkFacetResult
-
-/**
- * Monoid used to aggregate faceted results [[SparkFacetResult]]
- * from the executors to the driver
- */
-object SparkFacetResultMonoid extends Serializable {
-
-  private lazy val facetMonoid = new MapMonoid[String, Long]()
-
-  def zero(facetName: String): SparkFacetResult = SparkFacetResult(facetName, facetMonoid.zero)
-
-  def plus(l: SparkFacetResult, r: SparkFacetResult): SparkFacetResult = {
-    require(l.facetName == r.facetName) // Check if summing same facets
-    SparkFacetResult(l.facetName, facetMonoid.plus(l.facets, r.facets))
-  }
-}
diff --git a/src/main/scala/org/zouzias/spark/lucenerdd/facets/package.scala b/src/main/scala/org/zouzias/spark/lucenerdd/facets/package.scala
deleted file mode 100644
index e2d6cf2a..00000000
--- a/src/main/scala/org/zouzias/spark/lucenerdd/facets/package.scala
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.zouzias.spark.lucenerdd
-
-import org.apache.lucene.document._
-import org.apache.lucene.facet.FacetField
-import org.apache.spark.sql.Row
-
-import scala.reflect.ClassTag
-
-/**
- * Contains implicit conversion to [[org.apache.lucene.document.Document]]
- * which prepares the index for faceted search as well.
- */
-package object facets {
-
-  private val Stored = Field.Store.YES
-  private val DefaultFieldName = "_1"
-
-  /**
-   * Adds extra field on index with suffix [[FacetedLuceneRDD.FacetTextFieldSuffix]]
-   * This fiels is used on faceted queries
-   *
-   * @param doc Input document
-   * @param fieldName Field name
-   * @param fieldValue Field value to be indexed
-   */
-  private def addTextFacetField(doc: Document, fieldName: String, fieldValue: String): Unit = {
-    if ( fieldValue.nonEmpty) { // Issues with empty strings on facets
-      doc.add(new FacetField(s"${fieldName}${FacetedLuceneRDD.FacetTextFieldSuffix}",
-        fieldValue))
-    }
-  }
-
-  implicit def intToDocument(v: Int): Document = {
-    val doc = new Document
-    doc.add(new IntPoint(DefaultFieldName, v))
-    addTextFacetField(doc, DefaultFieldName, v.toString)
-    doc
-  }
-
-  implicit def longToDocument(v: Long): Document = {
-    val doc = new Document
-    doc.add(new LongPoint(DefaultFieldName, v))
-    addTextFacetField(doc, DefaultFieldName, v.toString)
-    doc
-  }
-
-  implicit def doubleToDocument(v: Double): Document = {
-    val doc = new Document
-    doc.add(new DoublePoint(DefaultFieldName, v))
-    addTextFacetField(doc, DefaultFieldName, v.toString)
-    doc
-  }
-
-  implicit def floatToDocument(v: Float): Document = {
-    val doc = new Document
-    doc.add(new FloatPoint(DefaultFieldName, v))
-    addTextFacetField(doc, DefaultFieldName, v.toString)
-    doc
-  }
-
-  implicit def stringToDocument(s: String): Document = {
-    val doc = new Document
-    doc.add(new TextField(DefaultFieldName, s, Stored))
-    addTextFacetField(doc, DefaultFieldName, s)
-    doc
-  }
-
-  private def tupleTypeToDocument[T: ClassTag](doc: Document, index: Int, s: T): Document = {
-    typeToDocument(doc, s"_${index}", s)
-  }
-
-  def typeToDocument[T: ClassTag](doc: Document, fName: String, s: T): Document = {
-    s match {
-      case x: String =>
-        doc.add(new TextField(fName, x, Stored))
-        addTextFacetField(doc, fName, x)
-      case x: Long =>
-        doc.add(new LongPoint(fName, x))
-        doc.add(new StoredField(fName, x))
-        doc.add(new NumericDocValuesField(s"${fName} ${FacetedLuceneRDD.FacetNumericFieldSuffix}",
-          x))
-      case x: Int =>
-        doc.add(new IntPoint(fName, x))
-        doc.add(new StoredField(fName, x))
-        doc.add(new NumericDocValuesField(s"${fName}${FacetedLuceneRDD.FacetNumericFieldSuffix}",
-          x.toLong))
-      case x: Float =>
-        doc.add(new FloatPoint(fName, x))
-        doc.add(new StoredField(fName, x))
-        doc.add(new FloatDocValuesField(s"${fName}${FacetedLuceneRDD.FacetNumericFieldSuffix}",
-          x))
-      case x: Double =>
-        doc.add(new DoublePoint(fName, x))
-        doc.add(new StoredField(fName, x))
-        doc.add(new DoubleDocValuesField(s"${fName}${FacetedLuceneRDD.FacetNumericFieldSuffix}",
-          x))
-    }
-    doc
-  }
-
-  implicit def iterablePrimitiveToDocument[T: ClassTag](iter: Iterable[T]): Document = {
-    val doc = new Document
-    iter.foreach( item => tupleTypeToDocument(doc, 1, item))
-    doc
-  }
-
-  implicit def mapToDocument[T: ClassTag](map: Map[String, T]): Document = {
-    val doc = new Document
-    map.foreach{ case (key, value) =>
-      typeToDocument(doc, key, value)
-    }
-    doc
-  }
-
-  /**
-   * Implicit conversion for all product types, such as case classes and Tuples
-   * @param s
-   * @tparam T
-   * @return
-   */
-  implicit def productTypeToDocument[T <: Product : ClassTag](s: T): Document = {
-    val doc = new Document
-
-    val fieldNames = s.getClass.getDeclaredFields.map(_.getName).toIterator
-    val fieldValues = s.productIterator
-    fieldValues.zip(fieldNames).foreach{ case (elem, fieldName) =>
-      typeToDocument(doc, fieldName, elem)
-    }
-
-    doc
-  }
-
-  /**
-   * Implicit conversion for Spark Row: used for DataFrame
-   * @param row
-   * @return
-   */
-  implicit def sparkRowToDocument(row: Row): Document = {
-    val doc = new Document
-
-    val fieldNames = row.schema.fieldNames
-    fieldNames.foreach{ case fieldName =>
-      val index = row.fieldIndex(fieldName)
-      typeToDocument(doc, fieldName, row.get(index))
-    }
-
-    doc
-  }
-}
diff --git a/src/main/scala/org/zouzias/spark/lucenerdd/models/SparkFacetResult.scala b/src/main/scala/org/zouzias/spark/lucenerdd/models/SparkFacetResult.scala
deleted file mode 100644
index 8d953b69..00000000
--- a/src/main/scala/org/zouzias/spark/lucenerdd/models/SparkFacetResult.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.zouzias.spark.lucenerdd.models
-
-import org.apache.lucene.facet.FacetResult
-
-case class SparkFacetResult(facetName: String, facets: Map[String, Long]) {
-
-  /**
-   * Return facet counts sorted descending
-   * @return Sequence of (facet value, facet counts)
-   */
-  def sortedFacets(): Seq[(String, Long)] = {
-    facets.toSeq.sortBy[Long](x => -x._2)
-  }
-}
-
-
-object SparkFacetResult extends Serializable {
-
-  /**
-   * Convert [[org.apache.lucene.facet.FacetResult]]
-   * to [[org.zouzias.spark.lucenerdd.models.SparkFacetResult]]
-   *
-   * @param facetName name of facet
-   * @param facetResult input facet results
-   * @return
-   */
-  def apply(facetName: String, facetResult: FacetResult): SparkFacetResult = {
-    val facetResultOpt = Option(facetResult)
-    facetResultOpt match {
-      case Some(fctResult) =>
-        val map = fctResult.labelValues
-          .map(labelValue => (labelValue.label, labelValue.value.longValue()))
-          .toMap[String, Long]
-        SparkFacetResult(facetName, map)
-      case _ => SparkFacetResult(facetName, Map.empty[String, Long])
-    }
-  }
-}
diff --git a/src/main/scala/org/zouzias/spark/lucenerdd/partition/AbstractLuceneRDDPartition.scala b/src/main/scala/org/zouzias/spark/lucenerdd/partition/AbstractLuceneRDDPartition.scala
index 3a43c91d..fd0652de 100644
--- a/src/main/scala/org/zouzias/spark/lucenerdd/partition/AbstractLuceneRDDPartition.scala
+++ b/src/main/scala/org/zouzias/spark/lucenerdd/partition/AbstractLuceneRDDPartition.scala
@@ -18,7 +18,7 @@ package org.zouzias.spark.lucenerdd.partition
 
 import org.apache.lucene.search.{BooleanClause, Query}
 import org.zouzias.spark.lucenerdd.models.indexstats.IndexStatistics
-import org.zouzias.spark.lucenerdd.models.{SparkFacetResult, TermVectorEntry}
+import org.zouzias.spark.lucenerdd.models.TermVectorEntry
 import org.zouzias.spark.lucenerdd.response.LuceneRDDResponsePartition
 
 import scala.reflect.ClassTag
@@ -81,15 +81,6 @@ private[lucenerdd] abstract class AbstractLuceneRDDPartition[T] extends Serializ
   def queries(searchString: Iterable[String], topK: Int)
   : Iterable[(String, LuceneRDDResponsePartition)]
 
-  /**
-   * Generic Lucene faceted Query using QueryParser
-   * @param searchString Lucene query string, i.e., textField:hello*
-   * @param topK Number of facets to return
-   * @return
-   */
-  def facetQuery(searchString: String, facetField: String, topK: Int)
-  : SparkFacetResult
-
   /**
   * Term Query
   * @param fieldName Name of field
diff --git a/src/main/scala/org/zouzias/spark/lucenerdd/partition/LuceneRDDPartition.scala b/src/main/scala/org/zouzias/spark/lucenerdd/partition/LuceneRDDPartition.scala
index 8becb27a..c2b965ab 100644
--- a/src/main/scala/org/zouzias/spark/lucenerdd/partition/LuceneRDDPartition.scala
+++ b/src/main/scala/org/zouzias/spark/lucenerdd/partition/LuceneRDDPartition.scala
@@ -19,18 +19,15 @@ package org.zouzias.spark.lucenerdd.partition
 import org.apache.lucene.analysis.Analyzer
 import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper
 import org.apache.lucene.document._
-import org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyReader
 import org.apache.lucene.index.{DirectoryReader, IndexReader}
 import org.apache.lucene.search._
 import org.joda.time.DateTime
-import org.zouzias.spark.lucenerdd.facets.FacetedLuceneRDD
 import org.zouzias.spark.lucenerdd.models.indexstats.{FieldStatistics, IndexStatistics}
-import org.zouzias.spark.lucenerdd.models.{SparkFacetResult, TermVectorEntry}
+import org.zouzias.spark.lucenerdd.models.TermVectorEntry
 import org.zouzias.spark.lucenerdd.query.{LuceneQueryHelpers, SimilarityConfigurable}
 import org.zouzias.spark.lucenerdd.response.LuceneRDDResponsePartition
-import org.zouzias.spark.lucenerdd.store.IndexWithTaxonomyWriter
 import org.zouzias.spark.lucenerdd.LuceneRDD
-import scala.collection.JavaConverters._
+import org.zouzias.spark.lucenerdd.store.IndexWritable
 
 import scala.reflect.{ClassTag, _}
 import scala.collection.mutable.ArrayBuffer
@@ -64,7 +61,7 @@ private[lucenerdd] class LuceneRDDPartition[T]
   (implicit docConversion: T => Document,
    override implicit val kTag: ClassTag[T])
   extends AbstractLuceneRDDPartition[T]
-  with IndexWithTaxonomyWriter
+  with IndexWritable
   with SimilarityConfigurable {
 
   logInfo(s"[partId=${partitionId}] Partition is created...")
@@ -92,7 +89,7 @@ private[lucenerdd] class LuceneRDDPartition[T]
   iterIndex.foreach { case elem =>
     // (implicitly) convert type T to Lucene document
     val doc = docConversion(elem)
-    indexWriter.addDocument(FacetsConfig.build(taxoWriter, doc))
+    indexWriter.addDocument(doc)
   }
   private val endTime = new DateTime(System.currentTimeMillis())
   logInfo(s"[partId=${partitionId}]Indexing process completed at ${endTime}...")
@@ -106,7 +103,6 @@ private[lucenerdd] class LuceneRDDPartition[T]
   logDebug(s"[partId=${partitionId}]Instantiating index/facet readers")
   private val indexReader = DirectoryReader.open(IndexDir)
   private lazy val indexSearcher = initializeIndexSearcher(indexReader)
-  private val taxoReader = new DirectoryTaxonomyReader(TaxonomyDir)
   logDebug(s"[partId=${partitionId}]Index readers instantiated successfully")
   logInfo(s"[partId=${partitionId}]Indexed ${size} documents")
 
@@ -199,15 +195,6 @@ private[lucenerdd] class LuceneRDDPartition[T]
     LuceneRDDResponsePartition(results.toIterator)
   }
 
-  override def facetQuery(searchString: String,
-                          facetField: String,
-                          topK: Int): SparkFacetResult = {
-    LuceneQueryHelpers.facetedTextSearch(indexSearcher, taxoReader, FacetsConfig,
-      searchString,
-      facetField + FacetedLuceneRDD.FacetTextFieldSuffix,
-      topK, QueryAnalyzer)
-  }
-
   override def moreLikeThis(fieldName: String,
                             query: String, minTermFreq: Int,
                             minDocFreq: Int, topK: Int)
   : LuceneRDDResponsePartition = {
diff --git a/src/main/scala/org/zouzias/spark/lucenerdd/query/LuceneQueryHelpers.scala b/src/main/scala/org/zouzias/spark/lucenerdd/query/LuceneQueryHelpers.scala
index a998ef38..e957cdd9 100644
--- a/src/main/scala/org/zouzias/spark/lucenerdd/query/LuceneQueryHelpers.scala
+++ b/src/main/scala/org/zouzias/spark/lucenerdd/query/LuceneQueryHelpers.scala
@@ -22,15 +22,11 @@ import org.apache.lucene.analysis.Analyzer
 import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper
 import org.apache.lucene.analysis.tokenattributes.CharTermAttribute
 import org.apache.lucene.document.Document
-import org.apache.lucene.facet.{FacetsCollector, FacetsConfig}
-import org.apache.lucene.facet.sortedset.SortedSetDocValuesFacetCounts
-import org.apache.lucene.facet.taxonomy.{FastTaxonomyFacetCounts, TaxonomyReader}
 import org.apache.lucene.index.Term
 import org.apache.lucene.queries.mlt.MoreLikeThis
 import org.apache.lucene.queryparser.classic.QueryParser
 import org.apache.lucene.search._
-import org.zouzias.spark.lucenerdd.aggregate.SparkFacetResultMonoid
-import org.zouzias.spark.lucenerdd.models.{SparkFacetResult, SparkScoreDoc}
+import org.zouzias.spark.lucenerdd.models.SparkScoreDoc
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
@@ -126,38 +122,6 @@ object LuceneQueryHelpers extends Serializable {
     indexSearcher.search(query, topK).scoreDocs.map(SparkScoreDoc(indexSearcher, _))
   }
 
-  /**
-   * Faceted search using [[SortedSetDocValuesFacetCounts]]
-   *
-   * @param indexSearcher Index searcher
-   * @param taxoReader taxonomy reader used for faceted search
-   * @param searchString Lucene search query string
-   * @param facetField Facet field name
-   * @param topK Number of returned documents
-   * @return
-   */
-  def facetedTextSearch(indexSearcher: IndexSearcher,
-                        taxoReader: TaxonomyReader,
-                        facetsConfig: FacetsConfig,
-                        searchString: String,
-                        facetField: String,
-                        topK: Int, analyzer: Analyzer): SparkFacetResult = {
-    // Prepare the query
-    val queryParser = new QueryParser(QueryParserDefaultField, analyzer)
-    val q: Query = queryParser.parse(searchString)
-
-    // Collect the facets
-    val fc = new FacetsCollector()
-    FacetsCollector.search(indexSearcher, q, topK, fc)
-    val facets = Option(new FastTaxonomyFacetCounts(taxoReader, facetsConfig, fc))
-
-    // Present the facets
-    facets match {
-      case Some(fcts) => SparkFacetResult(facetField, fcts.getTopChildren(topK, facetField))
-      case None => SparkFacetResultMonoid.zero(facetField)
-    }
-  }
-
   /**
   * Returns total number of lucene documents
   *
diff --git a/src/main/scala/org/zouzias/spark/lucenerdd/spatial/shape/ShapeLuceneRDDKryoRegistrator.scala b/src/main/scala/org/zouzias/spark/lucenerdd/spatial/shape/ShapeLuceneRDDKryoRegistrator.scala
index eb2b5269..420c91eb 100644
--- a/src/main/scala/org/zouzias/spark/lucenerdd/spatial/shape/ShapeLuceneRDDKryoRegistrator.scala
+++ b/src/main/scala/org/zouzias/spark/lucenerdd/spatial/shape/ShapeLuceneRDDKryoRegistrator.scala
@@ -22,7 +22,7 @@ import org.apache.spark.SparkConf
 import org.apache.spark.serializer.{KryoRegistrator, KryoSerializer}
 import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
 import org.apache.spark.sql.types._
-import org.zouzias.spark.lucenerdd.models.{SparkDoc, SparkFacetResult, SparkScoreDoc}
+import org.zouzias.spark.lucenerdd.models.{SparkDoc, SparkScoreDoc}
 import org.zouzias.spark.lucenerdd.spatial.shape.partition.ShapeLuceneRDDPartition
 
 
@@ -73,7 +73,6 @@ class ShapeLuceneRDDKryoRegistrator extends KryoRegistrator {
     kryo.register(classOf[scala.collection.immutable.Set$EmptySet$])
     kryo.register(classOf[scala.collection.immutable.Map[_, _]])
     kryo.register(classOf[Array[scala.collection.immutable.Map[_, _]]])
-    kryo.register(classOf[SparkFacetResult])
     kryo.register(classOf[SparkScoreDoc])
     kryo.register(classOf[TopK[_]])
 
diff --git a/src/main/scala/org/zouzias/spark/lucenerdd/spatial/shape/partition/ShapeLuceneRDDPartition.scala b/src/main/scala/org/zouzias/spark/lucenerdd/spatial/shape/partition/ShapeLuceneRDDPartition.scala
index 7d06f537..8a8d604d 100644
--- a/src/main/scala/org/zouzias/spark/lucenerdd/spatial/shape/partition/ShapeLuceneRDDPartition.scala
+++ b/src/main/scala/org/zouzias/spark/lucenerdd/spatial/shape/partition/ShapeLuceneRDDPartition.scala
@@ -30,8 +30,7 @@ import org.zouzias.spark.lucenerdd.query.LuceneQueryHelpers
 import org.zouzias.spark.lucenerdd.response.LuceneRDDResponsePartition
 import org.zouzias.spark.lucenerdd.spatial.shape.ShapeLuceneRDD.PointType
 import org.zouzias.spark.lucenerdd.spatial.shape.strategies.SpatialStrategy
-import org.zouzias.spark.lucenerdd.store.IndexWithTaxonomyWriter
-import scala.collection.JavaConverters._
+import org.zouzias.spark.lucenerdd.store.IndexWritable
 
 import scala.reflect._
 
@@ -46,7 +45,7 @@ private[shape] class ShapeLuceneRDDPartition[K, V]
   (implicit shapeConversion: K => Shape,
    docConversion: V => Document)
   extends AbstractShapeLuceneRDDPartition[K, V]
-  with IndexWithTaxonomyWriter
+  with IndexWritable
   with SpatialStrategy {
 
   override def indexAnalyzer(): Analyzer = getAnalyzer(Some(indexAnalyzerName))
@@ -83,7 +82,7 @@ private[shape] class ShapeLuceneRDDPartition[K, V]
     val doc = docConversion(value)
     val shape = shapeConversion(key)
     val docWithLocation = decorateWithLocation(doc, Seq(shape))
-    indexWriter.addDocument(FacetsConfig.build(taxoWriter, docWithLocation))
+    indexWriter.addDocument(docWithLocation)
   }
   private val endTime = new DateTime(System.currentTimeMillis())
   logInfo(s"Indexing process completed at ${endTime}...")
diff --git a/src/main/scala/org/zouzias/spark/lucenerdd/store/IndexStorable.scala b/src/main/scala/org/zouzias/spark/lucenerdd/store/IndexStorable.scala
index c7e4f292..f8062b50 100644
--- a/src/main/scala/org/zouzias/spark/lucenerdd/store/IndexStorable.scala
+++ b/src/main/scala/org/zouzias/spark/lucenerdd/store/IndexStorable.scala
@@ -18,7 +18,6 @@ package org.zouzias.spark.lucenerdd.store
 
 import java.nio.file.{Files, Path}
 
-import org.apache.lucene.facet.FacetsConfig
 import org.apache.lucene.store._
 import org.zouzias.spark.lucenerdd.config.Configurable
 import org.zouzias.spark.lucenerdd.logging.Logging
@@ -35,8 +34,6 @@ trait IndexStorable extends Configurable
   with AutoCloseable
   with Logging {
 
-  protected lazy val FacetsConfig = new FacetsConfig()
-
   private val IndexStoreKey = "lucenerdd.index.store.mode"
 
   private val tmpJavaDir = System.getProperty("java.io.tmpdir")
@@ -46,14 +43,7 @@ trait IndexStorable extends Configurable
 
   private val indexDir = Files.createTempDirectory(indexDirName)
 
-  private val taxonomyDirName =
-    s"taxonomyDirectory-${System.currentTimeMillis()}.${Thread.currentThread().getId}"
-
-  private val taxonomyDir = Files.createTempDirectory(taxonomyDirName)
-
-  protected val IndexDir = storageMode(indexDir)
-
-  protected val TaxonomyDir = storageMode(taxonomyDir)
+  protected val IndexDir: Directory = storageMode(indexDir)
 
   /**
   * Select Lucene index storage implementation based on config
@@ -110,6 +100,5 @@ trait IndexStorable extends Configurable
 
   override def close(): Unit = {
     IndexDir.close()
-    TaxonomyDir.close()
   }
 }
diff --git a/src/main/scala/org/zouzias/spark/lucenerdd/store/IndexWithTaxonomyWriter.scala b/src/main/scala/org/zouzias/spark/lucenerdd/store/IndexWritable.scala
similarity index 87%
rename from src/main/scala/org/zouzias/spark/lucenerdd/store/IndexWithTaxonomyWriter.scala
rename to src/main/scala/org/zouzias/spark/lucenerdd/store/IndexWritable.scala
index 0a6674e0..b6b8883e 100644
--- a/src/main/scala/org/zouzias/spark/lucenerdd/store/IndexWithTaxonomyWriter.scala
+++ b/src/main/scala/org/zouzias/spark/lucenerdd/store/IndexWritable.scala
@@ -24,9 +24,9 @@ import org.apache.lucene.index.{IndexWriter, IndexWriterConfig}
 import org.zouzias.spark.lucenerdd.analyzers.AnalyzerConfigurable
 
 /**
- * Index and Taxonomy Writer used for facet queries
+ * Index writer
  */
-trait IndexWithTaxonomyWriter extends IndexStorable
+trait IndexWritable extends IndexStorable
   with AnalyzerConfigurable {
 
   protected def indexAnalyzer(): Analyzer
@@ -37,12 +37,8 @@ trait IndexWithTaxonomyWriter extends IndexStorable
       new IndexWriterConfig(indexPerFieldAnalyzer())
         .setOpenMode(OpenMode.CREATE))
 
-  protected lazy val taxoWriter = new DirectoryTaxonomyWriter(TaxonomyDir)
-
   protected def closeAllWriters(): Unit = {
     indexWriter.commit()
-    taxoWriter.commit()
-    taxoWriter.close()
     indexWriter.close()
   }
 }
diff --git a/src/test/scala/org/zouzias/spark/lucenerdd/facets/FacetedLuceneRDDFacetSpec.scala b/src/test/scala/org/zouzias/spark/lucenerdd/facets/FacetedLuceneRDDFacetSpec.scala
deleted file mode 100644
index b26fe3f5..00000000
--- a/src/test/scala/org/zouzias/spark/lucenerdd/facets/FacetedLuceneRDDFacetSpec.scala
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.zouzias.spark.lucenerdd.facets
-
-import com.holdenkarau.spark.testing.SharedSparkContext
-import org.apache.spark.SparkConf
-import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
-import org.zouzias.spark.lucenerdd.{LuceneRDD, LuceneRDDKryoRegistrator}
-
-class FacetedLuceneRDDFacetSpec extends FlatSpec
-  with Matchers
-  with BeforeAndAfterEach
-  with SharedSparkContext {
-
-  override val conf = LuceneRDDKryoRegistrator.registerKryoClasses(new SparkConf().
-    setMaster("local[*]").
-    setAppName("test").
-    set("spark.ui.enabled", "false").
-    set("spark.app.id", appID))
-
-  // Check if sequence is sorted in descending order
-  def sortedDesc(seq : Seq[Long]) : Boolean = {
-    if (seq.isEmpty) true else seq.zip(seq.tail).forall(x => x._1 >= x._2)
-  }
-
-  "FacetedLuceneRDD.facetQuery" should "compute facets correctly" in {
-    val words = Array("aaa", "aaa", "aaa", "aaa", "bb", "bb", "bb", "cc", "cc")
-    val rdd = sc.parallelize(words)
-    val luceneRDD = FacetedLuceneRDD(rdd)
-    val facetResults = luceneRDD.facetQuery("*:*", "_1")._2
-
-    facetResults.facets.size should equal (3)
-    facetResults.facets.contains("aaa") should equal (true)
-    facetResults.facets.get("aaa")
-      .foreach(value => value should equal (4))
-
-    luceneRDD.close()
-  }
-
-  "FacetedLuceneRDD.facetQuery" should "compute facets correctly with ints" in {
-    val words = Array(10, 10, 10, 10, 22, 22, 22, 33, 33)
-    val rdd = sc.parallelize(words)
-    val luceneRDD = FacetedLuceneRDD(rdd)
-    val facetResults = luceneRDD.facetQuery("*:*", "_1")._2
-
-    facetResults.facets.size should equal (3)
-    facetResults.facets.contains("10") should equal (true)
-    facetResults.facets.contains("22") should equal (true)
-    facetResults.facets.contains("33") should equal (true)
-    facetResults.facets.get("10").foreach(value => value should equal (4))
-    facetResults.facets.get("33").foreach(value => value should equal (2))
-
-    luceneRDD.close()
-  }
-
-  "FacetedLuceneRDD.facetQuery" should "compute facets correctly with doubles" in {
-    val words = Array(10.5D, 10.5D, 10.5D, 10.5D, 22.2D, 22.2D, 22.2D, 33.2D, 33.2D)
-    val rdd = sc.parallelize(words)
-    val luceneRDD = FacetedLuceneRDD(rdd)
-    val facetResults = luceneRDD.facetQuery("*:*", "_1")._2
-
-    facetResults.facets.size should equal (3)
-    facetResults.facets.contains("10.5") should equal (true)
-    facetResults.facets.contains("22.2") should equal (true)
-    facetResults.facets.contains("33.2") should equal (true)
-    facetResults.facets.get("10.5").foreach(value => value should equal (4))
-    facetResults.facets.get("33.2").foreach(value => value should equal (2))
-
-    luceneRDD.close()
-  }
-
-  "FacetedLuceneRDD.facetQueries" should "compute facets correctly" in {
-    val words = Array("aaa", "aaa", "aaa", "aaa", "bb", "bb", "bb", "cc", "cc")
-    val rdd = sc.parallelize(words)
-    val luceneRDD = FacetedLuceneRDD(rdd)
-
-    val facetResults = luceneRDD.facetQueries("*:*", Seq("_1"))._2
-
-    facetResults.contains("_1") should equal(true)
-    facetResults.foreach(_._2.facets.size should equal (3))
-    facetResults.foreach(_._2.facets.contains("aaa") should equal (true))
-    facetResults.foreach(_._2.facets.get("aaa").foreach(value => value should equal (4)))
-
-    luceneRDD.close()
-  }
-
-  "FacetedLuceneRDD.sortedFacets" should "return facets sorted by decreasing order" in {
-    val words = Array("aaa", "aaa", "aaa", "aaa", "bb", "bb", "bb", "cc", "cc")
-    val rdd = sc.parallelize(words)
-    val luceneRDD = FacetedLuceneRDD(rdd)
-
-    val sortedFacetCounts = luceneRDD.facetQuery("*:*", "_1")._2.sortedFacets().map(_._2)
-    sortedDesc(sortedFacetCounts) should equal(true)
-
-    luceneRDD.close()
-  }
-
-  "FacetedLuceneRDD.facetQuery" should "compute facets with prefix search" in {
-    val words = Array("aaa", "aaa", "aaa", "aaa", "bb", "bb", "bb", "cc", "cc")
-    val rdd = sc.parallelize(words)
-    val luceneRDD = FacetedLuceneRDD(rdd)
-    val results = luceneRDD.facetQuery("_1:aa*", "_1")
-    val facetResults = results._2
-
-    facetResults.facets.size should equal (1)
-    facetResults.facets.contains("aaa") should equal (true)
-    facetResults.facets.get("aaa")
-      .foreach(value => value should equal (4))
-
-    luceneRDD.close()
-  }
-
-  "FacetedLuceneRDD.facetQuery" should "compute facets with term search" in {
-    val words = Array("aaa", "aaa", "aaa", "aaa", "aaaa", "bb", "bb", "bb", "cc", "cc")
-    val rdd = sc.parallelize(words)
-    val luceneRDD = FacetedLuceneRDD(rdd)
-    val results = luceneRDD.facetQuery("_1:aaa", "_1")
-    val facetResults = results._2
-
-    facetResults.facets.size should equal (1)
-    facetResults.facets.contains("aaa") should equal (true)
-    facetResults.facets.contains("bb") should equal (false)
-    facetResults.facets.contains("cc") should equal (false)
-    facetResults.facets.get("aaa") should equal (Some(4))
-
-    val resultsB = luceneRDD.facetQuery("_1:bb", "_1")
-    val facetResultsB = resultsB._2
-
-    facetResultsB.facets.contains("bb") should equal (true)
-    facetResultsB.facets.get("bb") should equal (Some(3))
-
-    luceneRDD.close()
-  }
-
-  "FacetedLuceneRDD.facetQuery" should "compute facets with term search in Tuple2" in {
-    val words = Array(("aaa", "aaa1"), ("aaa", "aaa2"), ("aaa", "aaa3"), ("aaa", "aaa3"),
-      ("aaaa", "aaa3"), ("bb", "cc1"), ("bb", "cc1"), ("bb", "cc1"), ("cc", "cc2"), ("cc", "cc2"))
-    val rdd = sc.parallelize(words)
-    val luceneRDD = FacetedLuceneRDD(rdd)
-    val results = luceneRDD.facetQuery("_1:aaa", "_2")
-    val facetResults = results._2
-
-    facetResults.facets.size should equal (3)
-    facetResults.facets.contains("aaa1") should equal (true)
-    facetResults.facets.contains("aaa2") should equal (true)
-    facetResults.facets.contains("aaa3") should equal (true)
-    facetResults.facets.get("aaa1") should equal (Some(1))
-    facetResults.facets.get("aaa2") should equal (Some(1))
-    facetResults.facets.get("aaa3") should equal (Some(2))
-
-    luceneRDD.close()
-  }
-
-  "FacetedLuceneRDD.version" should "return project sbt build information" in {
-    val map = LuceneRDD.version()
-    map.contains("name") should equal(true)
-    map.contains("builtAtMillis") should equal(true)
-    map.contains("scalaVersion") should equal(true)
-    map.contains("version") should equal(true)
-    map.contains("sbtVersion") should equal(true)
-    map.contains("builtAtString") should equal(true)
-  }
-
-}
diff --git a/src/test/scala/org/zouzias/spark/lucenerdd/facets/FacetedLuceneRDDImplicitsSpec.scala b/src/test/scala/org/zouzias/spark/lucenerdd/facets/FacetedLuceneRDDImplicitsSpec.scala
deleted file mode 100644
index e4348292..00000000
--- a/src/test/scala/org/zouzias/spark/lucenerdd/facets/FacetedLuceneRDDImplicitsSpec.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.zouzias.spark.lucenerdd.facets
-
-import com.holdenkarau.spark.testing.SharedSparkContext
-import org.apache.spark.SparkConf
-import org.apache.spark.sql.SparkSession
-import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
-import org.zouzias.spark.lucenerdd.testing.FavoriteCaseClass
-import org.zouzias.spark.lucenerdd.{LuceneRDD, LuceneRDDKryoRegistrator}
-
-class FacetedLuceneRDDImplicitsSpec extends FlatSpec
-  with Matchers
-  with BeforeAndAfterEach
-  with SharedSparkContext {
-
-  var luceneRDD: LuceneRDD[_] = _
-
-
-  override val conf = LuceneRDDKryoRegistrator.registerKryoClasses(new SparkConf().
-    setMaster("local[*]").
-    setAppName("test").
-    set("spark.ui.enabled", "false").
-    set("spark.app.id", appID))
-
-  override def afterEach() {
-    luceneRDD.close()
-  }
-
-
-  val elem = Array("fear", "death", "water", "fire", "house")
-    .zipWithIndex.map{ case (str, index) =>
-    FavoriteCaseClass(str, index, 10L, 12.3F, s"${str}@gmail.com")}
-
-
-  "FacetedLuceneRDD(case class).count" should "return correct number of elements" in {
-    val rdd = sc.parallelize(elem)
-    val spark = SparkSession.builder().getOrCreate()
-    import spark.implicits._
-    val df = rdd.toDF()
-    luceneRDD = FacetedLuceneRDD(df)
-    luceneRDD.count should equal (elem.size)
-  }
-
-  "FacetedLuceneRDD(case class).fields" should "return all fields" in {
-    val rdd = sc.parallelize(elem)
-    val spark = SparkSession.builder().getOrCreate()
-    import spark.implicits._
-    val df = rdd.toDF()
-    luceneRDD = FacetedLuceneRDD(df)
-
-    luceneRDD.fields().size should equal(5)
-    luceneRDD.fields().contains("name") should equal(true)
-    luceneRDD.fields().contains("age") should equal(true)
-    luceneRDD.fields().contains("myLong") should equal(true)
-    luceneRDD.fields().contains("myFloat") should equal(true)
-    luceneRDD.fields().contains("email") should equal(true)
-  }
-
-  "FacetedLuceneRDD(case class).termQuery" should "correctly search with TermQueries" in {
-    val rdd = sc.parallelize(elem)
-    val spark = SparkSession.builder().getOrCreate()
-    import spark.implicits._
-    val df = rdd.toDF()
-    luceneRDD = FacetedLuceneRDD(df)
-
-    val results = luceneRDD.termQuery("name", "water")
-    results.count() should equal(1)
-  }
-}
diff --git a/src/test/scala/org/zouzias/spark/lucenerdd/query/LuceneQueryHelpersSpec.scala b/src/test/scala/org/zouzias/spark/lucenerdd/query/LuceneQueryHelpersSpec.scala
index ce201d99..bd854c7a 100644
--- a/src/test/scala/org/zouzias/spark/lucenerdd/query/LuceneQueryHelpersSpec.scala
+++ b/src/test/scala/org/zouzias/spark/lucenerdd/query/LuceneQueryHelpersSpec.scala
@@ -20,24 +20,20 @@ import org.apache.lucene.analysis.Analyzer
 import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper
 import org.apache.lucene.document.Field.Store
 import org.apache.lucene.document._
-import org.apache.lucene.facet.FacetField
-import org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyReader
 import org.apache.lucene.index.DirectoryReader
 import org.apache.lucene.search.IndexSearcher
 import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
-import org.zouzias.spark.lucenerdd.facets.FacetedLuceneRDD
-import org.zouzias.spark.lucenerdd.store.IndexWithTaxonomyWriter
-import scala.collection.JavaConverters._
+import org.zouzias.spark.lucenerdd.store.IndexWritable
 
 import scala.io.Source
 
 class LuceneQueryHelpersSpec extends FlatSpec
-  with IndexWithTaxonomyWriter
+  with IndexWritable
   with Matchers
   with BeforeAndAfterEach {
 
   // Load cities
-  val countries = Source.fromFile("src/test/resources/countries.txt").getLines()
+  val countries: Seq[String] = Source.fromFile("src/test/resources/countries.txt").getLines()
     .map(_.toLowerCase()).toSeq
 
   val indexAnalyzerPerField: Map[String, String] = Map("name"
@@ -45,7 +41,7 @@ class LuceneQueryHelpersSpec extends FlatSpec
 
   private val MaxFacetValue: Int = 10
 
-  override def indexAnalyzer: Analyzer = getAnalyzer(Some("en"))
+  override def indexAnalyzer(): Analyzer = getAnalyzer(Some("en"))
 
   override def indexPerFieldAnalyzer(): PerFieldAnalyzerWrapper = {
     val analyzerPerField: Map[String, Analyzer] = indexAnalyzerPerField
@@ -55,27 +51,22 @@ class LuceneQueryHelpersSpec extends FlatSpec
 
   countries.zipWithIndex.foreach { case (elem, index) =>
     val doc = convertToDoc(index % MaxFacetValue, elem)
-    indexWriter.addDocument(FacetsConfig.build(taxoWriter, doc))
+    indexWriter.addDocument(doc)
   }
 
   indexWriter.commit()
-  taxoWriter.close()
   indexWriter.close()
 
 
  private val indexReader = DirectoryReader.open(IndexDir)
  private val indexSearcher = new IndexSearcher(indexReader)
-  private lazy val taxoReader = new DirectoryTaxonomyReader(TaxonomyDir)
 
-  private lazy val TestFacetName = s"_2${FacetedLuceneRDD.FacetTextFieldSuffix}"
 
  def convertToDoc(pos: Int, text: String): Document = {
    val doc = new Document()
    doc.add(new StringField("_1", text, Store.YES))
-    doc.add(new FacetField(s"_1${FacetedLuceneRDD.FacetTextFieldSuffix}", text))
    doc.add(new IntPoint("_2", pos))
    doc.add(new StoredField("_2", pos))
-    doc.add(new FacetField(TestFacetName, pos.toString))
    doc
  }
 
@@ -87,14 +78,6 @@ class LuceneQueryHelpersSpec extends FlatSpec
     LuceneQueryHelpers.totalDocs(indexSearcher) should equal (countries.size)
   }
 
-  "LuceneQueryHelpers.facetedTextSearch" should "return correct facet counts" in {
-    val facets = LuceneQueryHelpers.facetedTextSearch(indexSearcher, taxoReader,
-      FacetsConfig, "*:*", TestFacetName, 100, indexAnalyzer)
-
-    facets.facetName should equal(TestFacetName)
-    facets.facets.size should equal(MaxFacetValue)
-  }
-
   "LuceneQueryHelpers.termQuery" should "return correct documents" in {
     val greece = "greece"
     val topDocs = LuceneQueryHelpers.termQuery(indexSearcher, "_1", greece, 100)