
Snowflake Support #5500

Open
turb opened this issue Sep 20, 2024 · 4 comments
Labels
enhancement New feature or request io

Comments


turb commented Sep 20, 2024

Hello here,

Apache Beam has Snowflake support, so it's possible to use it with:

  import com.spotify.scio.coders.CoderMaterializer
  import com.spotify.scio.values.SCollection
  import org.apache.beam.sdk.io.snowflake.SnowflakeIO

  val thingMapper = new SnowflakeIO.CsvMapper[Thing] {
    override def mapRow(parts: Array[String]): Thing =
      Thing(Typed1(parts(0)), Typed2(parts(1)), ...)
  }

    val datasource = SnowflakeIO.DataSourceConfiguration.create()
      .withUsernamePasswordAuth(
        args("snowflake-user"), args("snowflake-password"))
      .withServerName("redacted.snowflakecomputing.com")
      .withDatabase("redacted_db")
      .withRole("redacted_role")
      .withWarehouse("redacted_warehouse")
      .withSchema("redacted_schema")

    val read = SnowflakeIO.read()
      .withDataSourceConfiguration(datasource)
      .fromQuery("SELECT COLUMN1, COLUMN2... FROM THINGS")
      .withStagingBucketName("gs://paths") // here for Google Cloud
      .withStorageIntegrationName("redacted_integration")
      .withCsvMapper(thingMapper)
      .withCoder(CoderMaterializer.beam(sc, Thing.coder))

    val things: SCollection[Thing] = sc.customInput("things", read)

However, a proper scio integration would be great. I suppose deriving a SnowflakeIO.CsvMapper for Thing would first need a PR in magnolify?

I can work on it.
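For illustration, a scio-level helper wrapping the Beam transform might look something like the sketch below. All names and the signature here are invented for the sake of discussion, not an existing API; the real shape would be decided in the PR:

```scala
import com.spotify.scio.ScioContext
import com.spotify.scio.coders.{Coder, CoderMaterializer}
import com.spotify.scio.values.SCollection
import org.apache.beam.sdk.io.snowflake.SnowflakeIO

// Hypothetical convenience wrapper around SnowflakeIO.read()
def snowflakeQuery[T: Coder](
  sc: ScioContext,
  query: String,
  config: SnowflakeIO.DataSourceConfiguration,
  stagingBucket: String,
  storageIntegration: String,
  csvMapper: SnowflakeIO.CsvMapper[T]
): SCollection[T] = {
  val read = SnowflakeIO
    .read[T]()
    .withDataSourceConfiguration(config)
    .fromQuery(query)
    .withStagingBucketName(stagingBucket)
    .withStorageIntegrationName(storageIntegration)
    .withCsvMapper(csvMapper)
    .withCoder(CoderMaterializer.beam(sc, Coder[T]))
  sc.customInput("snowflake-read", read)
}
```

This would hide the staging/coder plumbing from callers; only the query, connection config, and row mapper would remain user-facing.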

@RustedBones RustedBones added enhancement New feature or request io labels Sep 20, 2024

RustedBones commented Sep 20, 2024

For CSV, we've actually relied on the kantan library. It looks possible to implement the IO simply with a RowDecoder.

We'd be very happy to see a new contribution from your side!


turb commented Sep 20, 2024

> For CSV, we've actually relied on the kantan library. It looks possible to implement the IO simply with a RowDecoder.

The parser is implemented on the Beam side, using opencsv: only the downstream mapper can be specified. So it would need a PR on Beam to allow specifying another parser.


RustedBones commented Sep 20, 2024

I meant to leverage the decoding part of kantan, with something like:

  val thingMapper = new SnowflakeIO.CsvMapper[Thing] {
    override def mapRow(parts: Array[String]): Thing =
      implicitly[RowDecoder[Thing]].unsafeDecode(parts.toSeq)
  }
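For completeness, a self-contained sketch of where that RowDecoder would come from, assuming kantan.csv's generic module for automatic case-class derivation (the Thing fields here are made up for illustration):

```scala
import kantan.csv.RowDecoder
import kantan.csv.generic._ // automatic RowDecoder derivation for case classes

// Placeholder record type standing in for the real Thing
case class Thing(name: String, count: Int)

// Derived by kantan.csv from the case class shape
val decoder: RowDecoder[Thing] = implicitly[RowDecoder[Thing]]

// unsafeDecode throws on malformed rows; decode returns an Either instead
val thing: Thing = decoder.unsafeDecode(Seq("widget", "3"))
```

Note that unsafeDecode throws on bad input, which may actually be the right behavior inside a CsvMapper, since Beam surfaces the exception per element.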

turb added a commit to turb/scio that referenced this issue Sep 25, 2024

turb commented Sep 25, 2024

@RustedBones opened #5502

turb added a commit to turb/scio that referenced this issue Sep 25, 2024