Skip to content

Commit

Permalink
Fix naming
Browse files Browse the repository at this point in the history
  • Loading branch information
kaplanbar committed Aug 20, 2023
1 parent a01d8b5 commit 5c24d51
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 21 deletions.
71 changes: 53 additions & 18 deletions pekko/src/main/scala/anorm/PekkoStream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,64 @@

package anorm

import java.sql.Connection

import scala.util.control.NonFatal
import org.apache.pekko.stream.scaladsl.Source

import java.sql.Connection
import scala.concurrent.{ Future, Promise }

import org.apache.pekko.stream.Materializer
import org.apache.pekko.stream.scaladsl.{ Source }
import scala.util.control.NonFatal

/**
* Anorm companion for the [[http://doc.akka.io/docs/akka/2.4.4/scala/stream/]].
* Anorm companion for the Pekko Streams.
*
* @define materialization It materializes a [[scala.concurrent.Future]] of [[scala.Int]] containing the number of rows read from the source upon completion, and a possible exception if row parsing failed.
* @define sqlParam the SQL query
* @define materializerParam the stream materializer
* @define connectionParam the JDBC connection, which must not be closed until the source is materialized.
* @define columnAliaserParam the column aliaser
*/
object PekkoStream {

/**
* Returns the rows parsed from the `sql` query as a reactive source.
*
* $materialization
*
* @tparam T the type of the result elements
* @param sql $sqlParam
* @param parser the result (row) parser
* @param as $columnAliaserParam
* @param connection $connectionParam
*
* {{{
* import java.sql.Connection
*
* import scala.concurrent.Future
*
* import org.apache.pekko.stream.scaladsl.Source
*
* import anorm._
*
* def resultSource(implicit m: Materializer, con: Connection): Source[String, Future[Int]] = PekkoStream.source(SQL"SELECT * FROM Test", SqlParser.scalar[String], ColumnAliaser.empty)
* }}}
*/
@SuppressWarnings(Array("UnusedMethodParameter"))
def source[T](sql: => Sql, parser: RowParser[T], as: ColumnAliaser)(implicit
con: Connection
): Source[T, Future[Int]] = Source.fromGraph(new ResultSource[T](con, sql, as, parser))

/**
* Returns the rows parsed from the `sql` query as a reactive source.
*
* $materialization
*
* @tparam T the type of the result elements
* @param sql $sqlParam
* @param parser the result (row) parser
* @param connection $connectionParam
*/
@SuppressWarnings(Array("UnusedMethodParameter"))
def source[T](sql: => Sql, parser: RowParser[T])(implicit con: Connection): Source[T, Future[Int]] =
source[T](sql, parser, ColumnAliaser.empty)

/**
* Returns the result rows from the `sql` query as an enumerator.
* This is equivalent to `source[Row](sql, RowParser.successful, as)`.
Expand All @@ -32,13 +70,10 @@ object PekkoStream {
*
* @param sql $sqlParam
* @param as $columnAliaserParam
* @param m $materializerParam
* @param connection $connectionParam
*/
def source(sql: => Sql, as: ColumnAliaser)(implicit
m: Materializer,
connnection: Connection
): Source[Row, Future[Int]] = source(sql, RowParser.successful, as)
def source(sql: => Sql, as: ColumnAliaser)(implicit connection: Connection): Source[Row, Future[Int]] =
source(sql, RowParser.successful, as)

/**
* Returns the result rows from the `sql` query as an enumerator.
Expand All @@ -48,18 +83,18 @@ object PekkoStream {
* $materialization
*
* @param sql $sqlParam
* @param m $materializerParam
* @param connection $connectionParam
*/
def source(sql: => Sql)(implicit m: Materializer, connnection: Connection): Source[Row, Future[Int]] =
def source(sql: => Sql)(implicit connnection: Connection): Source[Row, Future[Int]] =
source(sql, RowParser.successful, ColumnAliaser.empty)

// Internal stages

import scala.util.{ Failure, Success }
import java.sql.ResultSet
import org.apache.pekko.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, OutHandler }
import org.apache.pekko.stream.{ Attributes, Outlet, SourceShape }
import org.apache.pekko.stream.stage.{ GraphStageWithMaterializedValue, GraphStageLogic, OutHandler }

import java.sql.ResultSet
import scala.util.{ Failure, Success }

private[anorm] class ResultSource[T](connection: Connection, sql: Sql, as: ColumnAliaser, parser: RowParser[T])
extends GraphStageWithMaterializedValue[SourceShape[T], Future[Int]] {
Expand Down
2 changes: 1 addition & 1 deletion pekko/src/test/scala-2.13+/PekkoCompat.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@

package anorm

private[anorm] object AkkaCompat {
private[anorm] object PekkoCompat {
type Seq[T] = _root_.scala.collection.immutable.Seq[T]
}
2 changes: 1 addition & 1 deletion pekko/src/test/scala-2.13-/PekkoCompat.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@

package anorm

private[anorm] object AkkaCompat {
private[anorm] object PekkoCompat {
type Seq[T] = _root_.scala.collection.Seq[T]
}
2 changes: 1 addition & 1 deletion pekko/src/test/scala/anorm/PekkoStreamSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ final class PekkoStreamSpec(implicit ee: ExecutionEnv) extends org.specs2.mutabl
def unsafeStatement(
connection: Connection,
generatedColumn: String,
generatedColumns: AkkaCompat.Seq[String]
generatedColumns: PekkoCompat.Seq[String]
): PreparedStatement = ???

def unsafeStatement(connection: Connection, getGeneratedKeys: Boolean): PreparedStatement =
Expand Down

0 comments on commit 5c24d51

Please sign in to comment.