Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
kellen committed Sep 6, 2024
1 parent 078519c commit 2207f5d
Show file tree
Hide file tree
Showing 16 changed files with 564 additions and 9 deletions.
64 changes: 57 additions & 7 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ val kantanCodecsVersion = "0.5.3"
val kantanCsvVersion = "0.7.0"
val kryoVersion = "4.0.3"
val magnoliaVersion = "1.1.10"
val magnolifyVersion = "0.7.4"
// NOTE(review): the line below is presumably the added side of the diff and the "0.7.4" line
// above the removed side — confirm. The new value pins a timestamped SNAPSHOT build; replace
// it with a released magnolify version before this change leaves WIP.
val magnolifyVersion = "0.7.4-47-aa9f05b-20240903T173705Z-SNAPSHOT"
val metricsVersion = "4.2.27"
val munitVersion = "1.0.1"
val neo4jDriverVersion = "4.4.18"
Expand Down Expand Up @@ -697,6 +697,7 @@ lazy val `scio-bom` = project
`scio-extra`,
`scio-google-cloud-platform`,
`scio-grpc`,
`scio-iceberg`,
`scio-jdbc`,
`scio-macros`,
`scio-neo4j`,
Expand Down Expand Up @@ -1171,6 +1172,25 @@ lazy val `scio-grpc` = project
)
)

// Iceberg add-on module, rooted at the scio-iceberg directory.
lazy val `scio-iceberg` = project
.in(file("scio-iceberg"))
.dependsOn(
// compile against scio-core; this module's tests also see scio-core's test classes
`scio-core` % "compile;test->test"
)
.settings(commonSettings)
.settings(
description := "Scio add-on for Iceberg",
libraryDependencies ++= Seq(
// compile
"org.apache.beam" % "beam-sdks-java-core" % beamVersion,
"org.apache.beam" % "beam-sdks-java-managed" % beamVersion,
// TODO add iceberg as test source
// NOTE(review): until the TODO above is resolved, the iceberg IO artifact is a compile-scope dependency
"org.apache.beam" % "beam-sdks-java-io-iceberg" % beamVersion,
"com.spotify" %% "magnolify-beam" % magnolifyVersion,
// test
// (no test-only dependencies are declared yet)
)
)

lazy val `scio-jdbc` = project
.in(file("scio-jdbc"))
.dependsOn(
Expand Down Expand Up @@ -1340,17 +1360,18 @@ lazy val `scio-examples` = project
.enablePlugins(NoPublishPlugin)
.disablePlugins(ScalafixPlugin)
.dependsOn(
`scio-core` % "compile->test",
`scio-avro` % "compile->test",
`scio-core` % "compile->test",
`scio-elasticsearch8`,
`scio-extra`,
`scio-google-cloud-platform`,
`scio-iceberg`,
`scio-jdbc`,
`scio-extra`,
`scio-elasticsearch8`,
`scio-neo4j`,
`scio-tensorflow`,
`scio-smb`,
`scio-parquet`,
`scio-redis`,
`scio-parquet`
`scio-smb`,
`scio-tensorflow`
)
.settings(commonSettings)
.settings(soccoSettings)
Expand Down Expand Up @@ -1422,6 +1443,35 @@ lazy val `scio-examples` = project
"org.apache.beam" % "beam-sdks-java-extensions-sql" % beamVersion,
"org.apache.beam" % "beam-sdks-java-io-google-cloud-platform" % beamVersion,
"org.apache.beam" % "beam-sdks-java-io-jdbc" % beamVersion,
"org.apache.beam" % "beam-sdks-java-io-iceberg" % beamVersion,
// no
"org.apache.iceberg" % "iceberg-hive-metastore" % "1.4.2",
"org.apache.hive.hcatalog" % "hive-hcatalog-core" % "3.1.3",
/*
def hive_version = "3.1.3"
def iceberg_version = "1.4.2"
testRuntimeOnly library.java.snake_yaml
testRuntimeOnly library.java.bigdataoss_gcs_connector
testRuntimeOnly library.java.hadoop_client
// needed to set up the test environment
testImplementation "org.apache.iceberg:iceberg-common:$iceberg_version"
testImplementation "org.apache.iceberg:iceberg-core:$iceberg_version"
testImplementation "org.assertj:assertj-core:3.11.1"
testImplementation library.java.junit
// needed to set up test Hive Metastore and run tests
testImplementation ("org.apache.iceberg:iceberg-hive-metastore:$iceberg_version")
testImplementation project(path: ":sdks:java:io:iceberg:hive:exec", configuration: "shadow")
testRuntimeOnly ("org.apache.hive.hcatalog:hive-hcatalog-core:$hive_version") {
exclude group: "org.apache.hive", module: "hive-exec"
exclude group: "org.apache.parquet", module: "parquet-hadoop-bundle"
}
testImplementation "org.apache.iceberg:iceberg-parquet:$iceberg_version"
testImplementation "org.apache.parquet:parquet-column:1.12.0"
*/

// no
"org.apache.hadoop" % "hadoop-common" % hadoopVersion,
"org.apache.httpcomponents" % "httpcore" % httpCoreVersion,
"org.apache.parquet" % "parquet-column" % parquetVersion,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions
import org.apache.beam.sdk.io.fs.ResourceId
import org.apache.beam.sdk.transforms.windowing.{BoundedWindow, PaneInfo}

/**
 * Single-method factory for a `FilenamePolicy`.
 *
 * Annotated with `@FunctionalInterface` to document that `apply` is the only abstract member.
 */
@FunctionalInterface
trait FilenamePolicySupplier {
/**
 * @param path
 *   path the policy is created for (presumably the output location — confirm against callers)
 * @param suffix
 *   suffix the policy is created for (presumably the file suffix — confirm against callers)
 * @return
 *   the `FilenamePolicy` for the given path and suffix
 */
def apply(path: String, suffix: String): FilenamePolicy
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2024 Spotify AB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.spotify.scio.examples.extra

import com.spotify.scio.ContextAndArgs
import com.spotify.scio.iceberg._
import magnolify.beam._
import magnolify.beam.logical.nanos._
import org.joda.time.Instant

// ## Iceberg IO example

// Usage:

// `sbt "runMain com.spotify.scio.examples.extra.IcebergExample
// --project=[PROJECT] --runner=DataflowRunner --region=[REGION NAME]"`
// FIXME
/**
 * Reads `FileDownloads` records through `sc.iceberg` and prints them with `debug`.
 *
 * The table identifiers and the catalog `uri` / `warehouse` values are still empty placeholders.
 */
object IcebergExample {
  /*
  --------------------------------------------------------------------------------------------->
  partition | row(__PARTITIONTIME timestamp(6)) >
  record_count | BigDecimal >
  file_count | BigDecimal >
  total_size | BigDecimal >
  data | row(timestamp row(min timestamp(6), max timestamp(6), null_count BigDecimal, nan_count BigDecimal), country_code row(min varchar, max varchar, null_count BigDecimal, nan_count BigDecimal), url row(min varchar, max varchar, null_count BigDecimal, nan_count BigDecimal), project row(min varchar, max varchar, null_count BigDecimal, nan_count BigDecimal), tls_protocol
   */

  // Only a subset of the columns listed above is modelled: the counters plus `data.timestamp`.
  case class FileDownloads(
    record_count: BigDecimal,
    file_count: BigDecimal,
    total_size: BigDecimal,
    data: Data
  )
  case class Data(timestamp: Timestamp)
  case class Timestamp(min: Instant, max: Instant, null_count: BigDecimal, nan_count: BigDecimal)

  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    // Hive catalog settings; `uri` and `warehouse` are placeholders to be filled in
    val catalogProperties = Map(
      "type" -> "hive",
      "uri" -> "",
      "warehouse" -> ""
    )

    // TODO quoted things don't work via iceberg/hive
    val downloads = sc.iceberg[FileDownloads]("", "", catalogProperties)
    downloads.debug(prefix = "FileDownload: ")

    sc.run()
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package com.spotify.scio.examples.extra

import com.spotify.scio.ContextAndArgs
import com.spotify.scio.coders.Coder
import com.spotify.scio.managed._
import com.spotify.scio.values.SCollection
import magnolify.beam._
import org.apache.beam.sdk.managed.Managed
import org.apache.beam.sdk.values.Row

// ## Managed IO example

// Usage:

// `sbt "runMain com.spotify.scio.examples.extra.ManagedExample
// --project=[PROJECT] --runner=DataflowRunner --region=[REGION NAME]
// --table=[TABLE] --catalogName=[CATALOG] --catalogType=[CATALOG TYPE]
// --catalogUri=[CATALOG URI] --catalogWarehouse=[CATALOG WAREHOUSE]
// --output=[OUTPUT PATH]"`
/**
 * Reads `Record`s from Iceberg through Beam's managed IO, increments field `a`, and writes the
 * result back using the same managed configuration.
 */
object ManagedExample {

  case class Record(a: Int, b: String)

  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    // command-line arguments are looked up in the order: table, catalog name, catalog properties
    val table = args("table")
    val catalogName = args("catalogName")
    val catalogProperties = Map(
      "type" -> args("catalogType"),
      "uri" -> args("catalogUri"),
      "warehouse" -> args("catalogWarehouse")
    )

    // nested configuration shared by the read and the write below
    val config: Map[String, Object] = Map(
      "table" -> table,
      "catalog_name" -> catalogName,
      "catalog_properties" -> catalogProperties
    )

    // converter between Record and Beam Row, together with the derived schema
    val rt = RowType[Record]
    // Row needs a coder built from that schema; make it available implicitly
    implicit val recordRowCoder: Coder[Row] = Coder.row(rt.schema)

    // source: Row instances read from Iceberg using the Record-derived schema
    val rows = sc.managed(Managed.ICEBERG, rt.schema, config)
    // Row => Record
    val records: SCollection[Record] = rows.map(rt.apply)

    records
      .map(record => record.copy(a = record.a + 1))
      // Record => Row
      .map(rt.apply)
      // sink: write the Row instances to Iceberg
      .saveAsManaged(Managed.ICEBERG, config)

    sc.run()
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2024 Spotify AB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.spotify.scio.iceberg

import com.spotify.scio.ScioContext
import com.spotify.scio.coders.Coder
import com.spotify.scio.io.{EmptyTapOf, ScioIO, Tap, TapT}
import com.spotify.scio.values.SCollection
import magnolify.beam.RowType
import org.apache.beam.sdk.managed.Managed
import com.spotify.scio.managed.ManagedIO
import org.apache.beam.sdk.values.Row

/**
 * [[ScioIO]] for Iceberg exposing elements of type `T`.
 *
 * Reading and writing are delegated to a [[ManagedIO]] created with `Managed.ICEBERG`; elements
 * are converted to and from Beam [[Row]]s with the [[RowType]] instance for `T`.
 *
 * @param config
 *   configuration map handed unchanged to the delegate [[ManagedIO]]
 */
final case class IcebergIO[T: RowType: Coder](config: Map[String, AnyRef]) extends ScioIO[T] {
  override type ReadP = IcebergIO.ReadParam
  override type WriteP = IcebergIO.WriteParam
  override val tapT: TapT.Aux[T, Nothing] = EmptyTapOf[T]

  // Row <-> T converter supplied by the RowType context bound
  private lazy val rowType: RowType[T] = implicitly[RowType[T]]
  // delegate that does the actual IO on Row instances
  private lazy val managedIO: ManagedIO = ManagedIO(Managed.ICEBERG, config)
  // coder for the intermediate Rows, built from the schema of T
  private lazy implicit val rowCoder: Coder[Row] = Coder.row(rowType.schema)

  /** Reads Rows through the delegate and converts each one to `T`. */
  override protected def read(sc: ScioContext, params: IcebergIO.ReadParam): SCollection[T] = {
    val rows = managedIO.readWithContext(sc, params)
    rows.map(rowType.from)
  }

  /** Converts each `T` to a Row and writes the Rows through the delegate. */
  override protected def write(data: SCollection[T], params: IcebergIO.WriteParam): Tap[tapT.T] = {
    val rows: SCollection[Row] = data.transform(_.map(rowType.to))
    val closedTap = managedIO.writeWithContext(rows, params)
    closedTap.underlying
  }

  /** Returns whatever tap the delegate provides for the given read parameters. */
  override def tap(read: IcebergIO.ReadParam): Tap[tapT.T] =
    managedIO.tap(read)
}

/** Parameter types for the Iceberg IO; both are plain aliases of the `ManagedIO` parameter types. */
object IcebergIO {
// read parameters: same type as ManagedIO.ReadParam
type ReadParam = ManagedIO.ReadParam
// write parameters: same type as ManagedIO.WriteParam
type WriteParam = ManagedIO.WriteParam
}
28 changes: 28 additions & 0 deletions scio-iceberg/src/main/scala/com/spotify/scio/iceberg/package.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2024 Spotify AB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.spotify.scio

import com.spotify.scio.iceberg.syntax.{ScioContextSyntax, SCollectionSyntax}

/**
 * Iceberg IO APIs.
 *
 * The package object mixes in `ScioContextSyntax` and `SCollectionSyntax` from
 * `com.spotify.scio.iceberg.syntax`, so a single wildcard import brings their members into scope:
 *
 * {{{
 * import com.spotify.scio.iceberg._
 * }}}
 */
package object iceberg extends ScioContextSyntax with SCollectionSyntax
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2024 Spotify AB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.spotify.scio.iceberg.syntax

import com.spotify.scio.coders.Coder
import com.spotify.scio.iceberg.IcebergIO
import com.spotify.scio.io.ClosedTap
import com.spotify.scio.managed.ManagedIO
import com.spotify.scio.values.SCollection
import magnolify.beam.RowType

/**
 * Iceberg write operations for an [[SCollection]] whose element type has a [[RowType]] and a
 * [[Coder]].
 */
class IcebergSCollectionSyntax[T: RowType: Coder](self: SCollection[T]) {

  /**
   * Writes this collection with an [[IcebergIO]] built from `config`, using default
   * `ManagedIO.WriteParam()` write parameters.
   *
   * @param config
   *   configuration map passed to [[IcebergIO]]
   */
  def saveAsIceberg(config: Map[String, AnyRef]): ClosedTap[Nothing] = {
    val io = IcebergIO[T](config)
    self.write(io)(ManagedIO.WriteParam())
  }
}

/** Mix-in providing the implicit conversion that adds Iceberg methods to [[SCollection]]. */
trait SCollectionSyntax {

  /** Wraps `self` in an [[IcebergSCollectionSyntax]] so that `saveAsIceberg` is available on it. */
  implicit def icebergSCollectionSyntax[T: RowType: Coder](
    self: SCollection[T]
  ): IcebergSCollectionSyntax[T] =
    new IcebergSCollectionSyntax[T](self)
}
Loading

0 comments on commit 2207f5d

Please sign in to comment.