
Releases: ArroyoSystems/arroyo

V0.9.0

17 Jan 16:47

These release notes are also available on the Arroyo blog.

Arroyo 0.9.0 introduces async UDFs, which allow users to use databases, services, and models from within their pipelines. It also brings support for joining update tables, more control over bad data handling, a redesigned connection profile editor, and more.

For this release, we are thrilled to welcome two new contributors to the Arroyo project:

Thanks to all our contributors for this release:

Async UDFs

User-defined functions (UDFs) and user-defined aggregate functions (UDAFs) allow you to extend Arroyo with custom logic. New in Arroyo 0.9 is support for what we call async UDFs.

Existing (sync) UDFs are expected to implement simple, computational logic. Common use cases include parsing custom formats, implementing functions that are not natively supported, or implementing custom business logic that would be difficult to express in SQL. Because they are run synchronously with processing, if they take too long to run they can block the entire pipeline.

This isn't a hypothetical concern. A UDF that takes 10ms to run will be limited to processing just 100 events per second per subtask!

And there are many use cases where you want to run logic that is not a simple, stateless computation. You may need to do point lookups in a database to enrich events, make an API call to another service, or even perform inference against an AI model.

Async UDFs allow you to do all of these things, without blocking the pipeline. Async UDFs are defined as a Rust async fn, supporting non-blocking IO. Then within the Arroyo runtime, many instances of the UDF can be run in parallel, with a configurable concurrency limit.

What does this look like? Let's take an example of a UDF that enriches web events
with GeoIP data by calling a GeoIP service. First, we define the UDF:

/*
[dependencies]
reqwest = { version = "0.11.23", features = ["json"] }
serde_json = "1"

[udfs]
async_max_concurrency = 1000
*/

pub async fn get_city(ip: String) -> Option<String> {
    let body: serde_json::Value =
        reqwest::get(format!("http://geoip-service:8000/{ip}"))
            .await
            .ok()?
            .json()
            .await
            .ok()?;

    body.pointer("/names/en")
        .and_then(|t| t.as_str())
        .map(|t| t.to_string())
}

Then we can use this UDF in a query, for example this one that finds the most common cities in the last 15 minutes:

create view cities as
select get_city(logs.ip) as city
from logs;

SELECT * FROM (
    SELECT *, ROW_NUMBER() OVER (
        PARTITION BY window
        ORDER BY count DESC) as row_num
    FROM (SELECT count(*) as count,
        city,
        hop(interval '5 seconds', interval '15 minutes') as window
            FROM cities
            WHERE city IS NOT NULL
            group by city, window)) WHERE row_num <= 5;

Async UDFs support several configuration options that control their behavior, including the maximum number of concurrent requests, timeouts, and whether event order should be preserved.

They also support defining a Context struct which is passed in to each invocation of the UDF. This allows you to do setup for expensive operations (like setting up a Database connection pool) just once, and share the result with all invocations.

For example, we can create a client for Postgres like this:

use async_trait::async_trait;
use tokio::sync::RwLock;
use tokio_postgres::{Client, NoTls};

pub struct Context {
    client: RwLock<Option<Client>>,
}

impl Context {
    pub fn new() -> Self {
        Self {
            client: RwLock::new(None),
        }
    }
}

#[async_trait]
impl arroyo_types::UdfContext for Context {
    async fn init(&self) {
        let conn_str = "host=localhost user=dbuser password=dbpassword dbname=my_db";

        let (client, connection) = tokio_postgres::connect(conn_str, NoTls).await.unwrap();

        let mut c = self.client.write().await;
        *c = Some(client);

        tokio::spawn(async move {
            if let Err(e) = connection.await {
                println!("connection error: {}", e);
            }
        });
    }
}

See the docs for full details.

Joining Update tables

Arroyo has two semantics for streaming SQL, one based on event-time watermarks and another that we call "Update Tables," which allow for incremental computation of most analytical SQL constructs. However, previously there was a restriction on joins, which were only supported for the watermark semantic.

Now that restriction is gone, and update tables can be joined. Hurray!

For example, this query counts the number of page views per user and, when a transaction comes in, joins it against that user's accumulated count:

CREATE VIEW page_view_counts as
SELECT count(*) as count, user_id as user_id
FROM page_views
GROUP BY user_id;


SELECT T.user_id, amount, P.count as page_views
FROM transactions T
LEFT JOIN page_view_counts P on T.user_id = P.user_id;

Bad Data handling

It happens to the best of us. You've carefully built out your data architecture, you perform rigorous code reviews, and you diligently monitor your rollouts.

And yet it happens: somehow, invalid data got into your Kafka topic. In Arroyo 0.9, you can now configure the source to drop bad data instead of causing a processing failure.

This behavior can be configured via the Web UI or with the bad_data option when creating a source in SQL; a SQL example follows the list below. Two options are currently available:

  • fail (default, and behavior in Arroyo 0.8) causes the pipeline to restart when bad data is encountered

  • drop causes the data to be dropped, while logging an error message to the console and incrementing the arroyo_worker_deserialization_errors metric

  • Add control for deserialization error behavior by @jbeisen in #443

  • Add bad data option to create connection form by @jbeisen in #452
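As a concrete sketch (the topic, columns, and broker address here are hypothetical, not from these notes), a Kafka source that drops undeserializable records instead of failing might be declared like this:

CREATE TABLE events (
    id BIGINT,
    value TEXT
) WITH (
    connector = 'kafka',
    type = 'source',
    topic = 'events',
    bootstrap_servers = 'localhost:9092',
    format = 'json',
    bad_data = 'drop'
);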

Environment variable substitution

Some connectors need to be configured with authentication details or other secret data. Today in Arroyo those secrets are stored in the configuration database (postgres), which is not very secure.

Now in Arroyo 0.9, we're introducing environment-variable substitution for sensitive configuration fields.

This feature allows you to use double curly braces ({{ }}) to reference environment variables, which are substituted in at run time. For example, if you have an environment variable called WEBHOOK_SECRET, you can include it as a header in a webhook sink like this:

Authentication: Basic {{ WEBHOOK_SECRET }}

Env variable substitution can be used for both connections created via the Web UI and those created directly in SQL, like:

CREATE TABLE slack (
    value TEXT
) WITH (
    connector = 'webhook',
    endpoint = 'https://hooks.slack.com/services/{{ SLACK_KEY }}',
    method = 'POST',
    headers = 'Content-Type:application/json',
    format = 'json'
);
  • Support environment variable substitution by @jbeisen in #433

Confluent Cloud connector

Confluent is the company founded by the creators of Apache Kafka. They provide a cloud-native distribution of Kafka and a serverless Kafka platform. While Arroyo has always supported reading and writing from Kafka, Arroyo 0.9 makes it even easier to integrate with your Confluent Cloud topics via the new Confluent Cloud connector.

See the complete integration guide to get started.

Connection Profile UI

Connection Profiles encapsulate common configuration that is shared across multiple connection tables. For example, a single Kafka cluster may have many topics that you would like to consume from or produce to with Arroyo.

We've upgraded the process of creating and managing these profiles. It's now possible to view and delete existing connection profiles, and the whole UI has gotten a little spiffier.

We're also now validating that the profile is good (like ensuring we can talk to Kafka with the provided address and credentials), so you don't discover you mistyped something at the end of the connection creation process.

  • Redesign cluster profile UI and add v...

v0.8.1

08 Dec 23:07

0.8.1 is a patch release, with several bug fixes over 0.8.0. It is a drop-in upgrade for clusters running 0.8.0.

Full Changelog: v0.8.0...v0.8.1

v0.8.0

29 Nov 06:12

These release notes are also available on our blog

The Arroyo team is pleased to announce the release of Arroyo 0.8.0. This release includes a number of new connectors, including a FileSystem source, Delta Lake sink, and a Redis sink. There are also other new features like Avro support, global UDFs, and more.

Arroyo is an open-source stream processing engine that allows anyone to build correct, reliable, and scalable real-time data pipelines using SQL.

Read on for more, and check out our docs for full details on existing and new features.

Thanks to all our contributors for this release:

FileSystem source

Arroyo 0.5 added a high-performance FileSystem sink, capable of writing JSON and Parquet files to local or remote filesystems and object stores like S3. Now in 0.8, we're adding a corresponding FileSystem source.

The FileSystem source can be configured with a directory (on a local filesystem, or a remote object store like S3). It will read all files in that directory and emit them as records for processing, with support for JSON and Parquet files and a variety of compression codecs.

Files are read in parallel, enabling a very high rate of ingestion.

A FileSystem source can be created in SQL like this:

CREATE TABLE logs (
    id BIGINT,
    time TIMESTAMP,
    host TEXT,
    status INT
) WITH (
    connector = 'filesystem',
    type = 'source',
    path = 's3://my-bucket/inputs/events',
    compression_format = 'gzip',
    format = 'json'
);

select * from logs;

In the next release, we will be improving our support for bootstrapping using the FileSystem source, which is the process of creating the initial state of the pipeline from historical data.

For example, you may have a window over 30 days of data. If you start reading today from Kafka, that will take 30 days to fully fill the window. But if you have the historical data available on S3, as is common in many modern data architectures, you can read from both the FileSystem and Kafka sources in parallel with a SQL union.
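As a rough sketch of that pattern (the bucket, topic, and schema here are hypothetical), the historical and live sources can be declared separately and combined with union all:

CREATE TABLE historical_events (
    id BIGINT,
    time TIMESTAMP,
    host TEXT,
    status INT
) WITH (
    connector = 'filesystem',
    type = 'source',
    path = 's3://my-bucket/historical/events',
    format = 'json'
);

CREATE TABLE live_events (
    id BIGINT,
    time TIMESTAMP,
    host TEXT,
    status INT
) WITH (
    connector = 'kafka',
    type = 'source',
    topic = 'events',
    bootstrap_servers = 'localhost:9092',
    format = 'json'
);

SELECT * FROM historical_events
UNION ALL
SELECT * FROM live_events;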

Full details on the FileSystem source are available in the docs.

Thanks to @rcjmurillo for this major contribution!

Delta Lake Sink

Delta Lake is a popular open-source storage format, with support for ACID transactions, schema enforcement, time travel, and more. It's a great choice for modern data lakes and supported by many popular query engines.

In Arroyo 0.8 we've enhanced our existing FileSystem sink to support Delta Lake, allowing you to write data transactionally to Delta tables.

With Arroyo, a high-performance Kafka to Delta Lake pipeline can be created in SQL as easily as

CREATE TABLE events (
    id BIGINT,
    time TIMESTAMP,
    host TEXT,
    status INT
) WITH (
    connector = 'kafka',
    type = 'source',
    topic = 'events',
    format = 'json'
);

CREATE TABLE deltatable (
    id BIGINT,
    time TIMESTAMP,
    host TEXT,
    status INT
) WITH (
    connector = 'delta',
    path = 's3://arroyo-deltalake/delta_uuid_remote_checkpoints',
    format = 'parquet',
    'filename.strategy' = 'uuid'
);

INSERT INTO deltatable
SELECT * FROM events;

See the Delta Lake docs for more details.

Redis Sink

For applications that rely on real-time data, Redis is a popular choice for a fast, in-memory state store. Now Arroyo can sink its results directly to Redis, providing a great option for serving state.

Redis supports a variety of data structures. In the initial release of the Redis sink, we support writing to String tables, Lists, and Hashes. Keys can be constructed dynamically from columns in the record.

For example, to write results to a Redis String table, you can use a SQL statement like

CREATE TABLE redis (
    user_id TEXT,
    count INT
) WITH (
    connector = 'redis',
    type = 'sink',
    format = 'json',
    'address' = 'redis://localhost:6379',
    target = 'string',
    'target.key_prefix' = 'counts.',
    'target.key_column' = 'user_id'
);

INSERT INTO redis
SELECT user_id, count(*)
FROM events
GROUP BY user_id, hop(interval '5 seconds', interval '1 hour');

This will write JSON-encoded values to keys like counts.fred for the user fred.

Writes are performed efficiently using Redis pipelines and can achieve throughputs of millions of writes per second on a single Redis node.

Both standalone and cluster modes are supported.

See the Redis connector docs for more details.

Avro

Arroyo today supports JSON, Parquet, and custom string formats. New in 0.8 is support for Avro, including deep integration with Confluent Schema Registry.

Avro-encoded Kafka sources can be easily created with the Confluent Schema Registry via the Arroyo UI or API.

From SQL, those sources can be queried like any other:

select * from pizza_orders;

When reading from schema registry sources, Arroyo will use the schema registry to decode the data with full support for schema evolution.

It's also possible to specify your schema directly, although without a schema registry you will not be able to evolve your schemas.

Avro sinks are now supported as well, allowing you to write Avro-encoded data to Kafka and other Avro-compatible systems. When using the Schema Registry, Arroyo will automatically register new schemas as they are encountered.

create table output with (
    connector = 'kafka',
    type = 'sink',
    bootstrap_servers = 'localhost:9092',
    'schema_registry.endpoint' =
      'http://localhost:8081',
    format = 'avro',
    'avro.confluent_schema_registry' = 'true',
    'topic' = 'outputs'
);

insert into output
select * from source;

See the Avro format docs for more details.

Schema Registry improvements

Arroyo supports Confluent Schema Registry for loading and storing schemas of data read and written to Kafka.

We've made several improvements to our schema registry support, in addition to adding Avro support (see above).

  • It's now possible to write JSON data to Kafka using a pre-existing schema that has been registered with the schema registry. This provides a more efficient way to write data to Kafka for use with Kafka Connect and other systems that rely on having a schema (previously Arroyo supported an option to embed the schema in the message, which is less efficient). See the sketch after this list.
  • We now support using schema registries that require authentication, for example Confluent Cloud.
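A minimal sketch of such a JSON sink, patterned on the Avro example above; the 'json.confluent_schema_registry' option name is an assumption based on that example, so check the format docs for the exact spelling:

create table json_output with (
    connector = 'kafka',
    type = 'sink',
    bootstrap_servers = 'localhost:9092',
    'schema_registry.endpoint' = 'http://localhost:8081',
    format = 'json',
    -- assumed option, mirroring 'avro.confluent_schema_registry' above
    'json.confluent_schema_registry' = 'true',
    topic = 'outputs'
);

insert into json_output
select * from source;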

See the Kafka connector docs for more details.

  • Implement json sink for schema registry, use connection types for sinks, add auth for schema registry by @jacksonrnewhouse in #416

UDFs

UDFs (user-defined functions) and UDAFs (user-defined aggregate functions) have become a widely-used feature of Arroyo, allowing users to extend Arroyo with custom logic written in Rust.

Arroyo 0.8 includes a number of improvements to UDFs, making them easier to use and more powerful.

Global UDFs

In earlier versions of Arroyo, UDFs could be defined within a single pipeline, but could not be easily shared. It's common to have a set of UDFs that are useful across an organization, for example to parse custom formats or implement business logic.

Now in 0.8, UDFs can be defined globally, and used across multiple pipelines. Along with this, we've also completely reworked the UDF editing experience.

Custom UDF dependencies

UDFs can now depend on a custom set of external Rust crates (libraries) by specifying them as a special comment in the UDF definition. Previously, UDFs had a few built-in dependencies (including serde_json and regex) but now any Rust crate can be used.

For example, the access_log_parser crate provides a comprehensive suite of web server log parsers. It's now possible to write a UDF that uses this crate to parse access logs and extract fields from them.

/*
[dependencies]
access_log_parser = "0.8"
serde_json = "1"
*/

use access_log_parser::{parse, AccessLogError, LogType, LogEntry};
use serde_json::json;
use std::time::{UNIX_EPOCH, Duration};
...

v0.7.1

24 Oct 18:19

0.7.1 is a patch release that includes one fix over v0.7.0:

  • Use udfs crate for compilation in addition to checking (#382 by @mwylde)

v0.7.0

17 Oct 16:36

These release notes are also available on our blog.

The Arroyo team is excited to announce the release of Arroyo 0.7.0, the latest version of our open-source stream processing engine. This release includes a number of new features, including custom partitioning for the filesystem sink, message framing and unnest support, unions, state compaction, and more. Our focus on quality also continues, with a more sophisticated correctness test suite that can now test checkpointing and restoration.

Read on for more, and check out our docs for full details on existing and new features.

Thanks to all our contributors for this release:

What's next

With the 0.7 release out, we're already working on the next one. Arroyo 0.8 is targeted for mid-November, and will include a number of new features, including support for Avro, Delta Lake integration, a FileSystem source, saved UDFs, and more. We've also been working on a new distributed state backend, which will allow Arroyo to scale to multi-TB state sizes while maintaining fast restarts and frequent checkpoints.

Anything you'd like to see? Let us know on Discord!

Now on to the details.


Custom partitioning for FileSystem sink

Arroyo 0.5 added a high-performance, transactional filesystem sink which enables
ingestion into data warehouses on S3. The initial version did not support custom partitioning of data, so records were written to a single file per subtask (with time and size-based rollovers).

In many cases, you will get better query performance if you partition your data by a field (like an event type) or by time.

Arroyo 0.7 introduces support for field-based and time-based partitioning, allowing you to optimize your data layout according to your query patterns.

For example you can now create a table like this:

CREATE TABLE file_sink (
    time TIMESTAMP,
    event_type TEXT,
    user_id TEXT,
    count INT
) WITH (
    connector = 'filesystem',
    format = 'parquet',
    path = 's3://arroyo-events/realtime',
    rollover_seconds = '3600',
    time_partition_pattern = 'year=%Y/month=%m/day=%d/hour=%H',
    partition_fields = 'event_type'
);

This will write data to a path like s3://arroyo-events/realtime/year=2023/month=10/day=19/hour=10/event_type=login.

See all of the available options in the docs.

Message Framing

Arroyo now supports defining a framing strategy for messages. This allows users to customize how messages read off of a source are split into records for processing, where previously there was always a one-to-one mapping. This is particularly useful for sources which do not have a built-in framing strategy, such as HTTP APIs.

As an example, Arroyo can now directly consume metrics from a prometheus-compatible application—without using Prometheus! Here's a query that polls a prometheus endpoint (for a node exporter) and computes the CPU usage over a 1 minute sliding window:

create table raw_metrics (
    value TEXT,
    parsed TEXT generated always as (parse_prom(value))
) WITH (
    connector = 'polling_http',
    endpoint = 'http://localhost:9100/metrics',
    format = 'raw_string',
    framing = 'newline',
    emit_behavior = 'changed',
    poll_interval_ms = '1000'
);

create table metrics as
    select extract_json_string(parsed, '$.name') as name,
        cast(extract_json_string(parsed, '$.value') as float) as value,
        get_first_json_object(parsed, '$.labels') as labels
    from raw_metrics;

create table cpu as
select
    extract_json_string(labels, '$.cpu') as cpu,
    extract_json_string(labels, '$.mode') as mode,
    value
from metrics
where name = 'node_cpu_seconds_total';

select sum(usage) from (
    select rate(value) as usage, cpu, mode,
        hop(interval '2 seconds', interval '60 seconds') as window
    from cpu
    where mode = 'user' or mode = 'system'
    group by cpu, mode, window);

This relies on the following UDFs:

fn parse_prom(s: String) -> Option<String> {
    let regex = regex::Regex::new(r"(?P<metric_name>\w+)\{(?P<labels>[^}]+)\}\s+(?P<metric_value>[\d.]+)").unwrap();
    let label_regex = regex::Regex::new(r##"(?P<label>[^,]+)="(?P<value>[^"]+)""##).unwrap();

    let captures = regex.captures(&s)?;

    let name = captures.name("metric_name").unwrap().as_str();
    let labels = captures.name("labels").unwrap().as_str();
    let value = captures.name("metric_value").unwrap().as_str();

    let labels: std::collections::HashMap<String, String> = label_regex.captures_iter(&labels)
        .map(|capture| (
            capture.name("label").unwrap().as_str().to_string(),
            capture.name("value").unwrap().as_str().to_string()
        ))
        .collect();


    Some(serde_json::json!({
        "name": name,
        "labels": labels,
        "value": value
    }).to_string())
}

fn rate(values: Vec<f32>) -> Option<f32> {
    let start = values.first()?;
    let end = values.last()?;

    Some((end - start) / 60.0)
}

Currently we support framing via newlines (specified as framing = 'newline' in SQL), although we plan to add support for other framing strategies in the future.

See the format docs for more.

  • Add support for framing in message deserialization by @mwylde in #339

SQL unnest

While framing allows you to split a single message into multiple records, this can only be applied at the source and for fairly simple framing rules. For other use cases you may want to do some computation or parsing that produces an array, and then unroll that array into multiple records.

This is what the new unnest operator does. It takes a column of type Array and produces a new record for each element in the array. This is often useful for dealing with JSON data, particularly from web APIs.

For example, the Github API doesn't provide a websocket feed of events, but it does provide a REST API endpoint. We can use the polling_http connector along with unnest to turn that into a stream:

CREATE TABLE raw_events (
    value TEXT
) WITH (
    connector = 'polling_http',
    endpoint = 'https://api.github.com/networks/arroyosystems/events',
    poll_interval_ms = '5000',
    emit_behavior = 'changed',
    headers = 'User-Agent:arroyo/0.7',
    format = 'json',
    'json.unstructured' = 'true'
);

create table events AS (
    select
        extract_json_string(event, '$.id') as id,
        extract_json_string(event, '$.type') as type,
        extract_json_string(event, '$.actor.login') as login,
        extract_json_string(event, '$.repo.name') as repo
    FROM
        (select unnest(extract_json(value, '$[*]'))
            as event from raw_events));

select concat(type, ' from ', login, ' in ', repo) FROM (
    select distinct(id), type, login, repo
    from events
);

SQL union

The union operator allows you to combine the results of multiple queries into a single stream. This is often useful for combining similar data from multiple sources (for example, two kafka streams); it can also be very useful for bootstrapping, which is the process of processing historical data and then switching to a live stream.

For example, we can use union to combine two Kafka topics like this:

create table topic1 (
    value TEXT
) WITH (
    connector = 'kafka',
    topic = 'topic1',
    type = 'source',
    bootstrap_servers = 'localhost:9092',
    format = 'raw_string'
);

create table topic2 (
    value TEXT
) WITH (
    connector = 'kafka',
    topic = 'topic2',
    type = 'source',
    bootstrap_servers = 'localhost:9092',
    format = 'raw_string'
);

select value from topic1
union all select value from topic2;

State compaction

Arroyo pipelines are stateful; operators like windows and joins need to remember things in order to aggregate across time, and sources and sinks need to remember what data they've already read or written to provide exactly-once semantics. While state is stored in memory for processing, it also needs to be written durably so that it can be recovered in the event of a failure. Today we write this state to local disk or to a remote object store like S3 as Parquet files.

This pro...


V0.6.0

14 Sep 19:58
268f562

These release notes are also available on our blog.

Arroyo 0.6 is focused on quality. After a break-neck pace of new features in the past few releases, we're taking a step back to focus on testing, correctness, and stability. I'm particularly excited about our new SQL test suite, which has uncovered a number of bugs and edge cases in our SQL support.

That said, it wouldn't be a new Arroyo release without some new features! We've added support for Google Cloud Storage (GCS) and improved our Helm chart so that Arroyo now runs well on Google Kubernetes Engine (GKE). We've also included a couple of new connectors (a polling HTTP source and a webhook sink) that help Arroyo fit into companies that are earlier in their streaming journey. And we've shipped the first version of our UDAF support, which allows you to write your own SQL aggregate functions in Rust.

Thanks to all our contributors for this release:

What's next

Now that 0.6 is out the door, we're already hard at work on the next release, planned for mid-October. We're working on a number of exciting features and improvements. We're adding support for accumulator-based UDAFs, which will allow for efficient incremental calculations of aggregates. We're also working on improvements to our checkpoint storage to support compaction and more efficient deletes. And we'll be releasing a Postgres connector to allow directly writing to Postgres tables without having to use Debezium.

Anything you'd like to see? Let us know on Discord!

Now on to the details.


Features

GCS/GKE

Arroyo has long supported Amazon S3 as a storage backend for storing pipeline artifacts and checkpoints. With this release we're greatly expanding our storage options, with support for Google Cloud Storage (GCS) and alternative S3-compatible systems like Minio and Localstack.

We've also made the way storage is configured more flexible and consistent, via two environment variables: CHECKPOINT_URL and ARTIFACT_URL. These variables can take a variety of URL forms, like:

  • s3://my-bucket
  • s3::https://my-custom-s3:1234/my-bucket
  • https://s3.us-east-1.amazonaws.com/my-bucket
  • file:///my/local/filesystem
  • gs://my-gcs-bucket

See the docs for how to configure Arroyo on GKE.

  • Add support for GCS and minio/localstack by @mwylde in #296

User-defined aggregate functions

SQL Aggregate functions allow you to compute summary statistics over a group of rows, like SUM, AVG, and COUNT. Arroyo 0.6 adds initial support for user-defined aggregate functions (UDAFs) which allow you to write your own aggregate functions in Rust.

For example, Arroyo doesn't include an aggregate function for computing Exponential Moving Average (EMA), but now you can write your own:

fn ema(data: Vec<f64>) -> f64 {
    const alpha: f64 = 0.1;
    let mut ema = data[0] as f64;

    for i in 1..data.len() {
        let ema_value = alpha * data[i] as f64 + (1.0 - alpha) * ema;
        ema = ema_value;
    }

    ema
}
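A UDAF like this can then be called from SQL like any built-in aggregate. A minimal sketch, assuming a hypothetical trades source with a DOUBLE price column:

SELECT ema(price) as price_ema
FROM trades
GROUP BY hop(interval '5 seconds', interval '1 minute');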

Currently only UDAFs over vectors are supported, but we plan to expand this to support UDAFs that rely on partial aggregates and two-phase aggregations.

Connectors

We've added two new connectors that are well suited for companies that are trying to integrate Arroyo with HTTP-based systems, complementing our existing Server-Sent Events and WebSocket sources.

Polling HTTP source

The new polling http source lets you turn any HTTP API into a streaming source. It periodically polls the endpoint, and emits any new data into the stream.

For example, we're so excited whenever we get a new PR on Github that we want to know about it as soon as possible. We can use the polling HTTP source to periodically fetch our PRs and emit an event whenever the result changes:

CREATE TABLE prs (
    value TEXT
) WITH (
    connector = 'polling_http',
    endpoint = 'https://api.github.com/repos/ArroyoSystems/arroyo/pulls?per_page=1&state=all',
    poll_interval_ms = '5000',
    emit_behavior = 'changed',
    headers = 'User-Agent:arroyo/0.6',
    format = 'json',
    'json.unstructured' = 'true'
);

SELECT extract_json_string(value, '$[0].url') as url
from prs;

Webhook sink

On the other end of your application, we now have a webhook sink that lets you send pipeline results to any HTTP endpoint. This allows you to consume outputs without needing to adopt streaming technologies like Kafka.

For example, we can build a simple Slack notification for our PRs using the webhook sink and the source we defined above:

CREATE TABLE slack (
    value TEXT
) WITH (
    connector = 'webhook',
    endpoint = 'https://hooks.slack.com/services/XXXXX/XXXXX/XXXXX',
    method = 'POST',
    headers = 'Content-Type:application/json',
    format = 'json',
    'json.unstructured' = 'true'
);

INSERT INTO slack
SELECT concat('A new PR was created ', extract_json_string(value, '$[0].url')) as text
from prs;

Rest API

We've finished our migration from gRPC to REST for the public-facing Arroyo API. As of this release, all endpoints are now available via REST, and we've removed the gRPC endpoint from the API service. New endpoints in this release include the ability to create connections and fetch all job details including checkpoint information.

  • Switch all connection endpoints to REST API by @jbeisen in #242
  • Add pagination to pipelines and connection tables by @jbeisen in #255
  • Switch checkpoint details to REST API by @jbeisen in #265
  • Delete the GRPC API from the API service by @jbeisen in #266

Raw string serialization

Arroyo supports a number of different serialization formats, for handling data coming into and out of pipelines, including JSON, Parquet, and raw strings. In this release we've added support for serializing arbitrary string data. This is particularly powerful when combined with UDFs, which make it easy to construct complex string-based formats that Arroyo doesn't natively support.

For example, it's now possible to write a UDF that produces complex, nested JSON that Arroyo's table schemas can't represent:

fn my_to_json(f: f64) -> String {
    let v = serde_json::json!({
        "my_complex": {
            "nested_format": f
        }
    });

    serde_json::to_string(&v).unwrap()
}
  • Add support for serializing raw_string formats by @mwylde in #302

Improvements & Fixes

SQL

As I mentioned in the introduction, this is really the meat of this release. We've performed a major refactor to our SQL compiler that has fixed a number of bugs and inconsistencies. And we've added a new set of SQL correctness tests that perform end-to-end validation of our SQL implementation.

This work has uncovered a number of bugs and edge cases (fixed in this release) and gives us much more confidence in the quality of our SQL support going forward.

Console & API

Beyond SQL, we've also made a number of quality-of-life improvements to the console, including an improved preview experience, better error handling, and pagination.


v0.5.1

28 Aug 23:13

0.5.1 is a patch release that includes several bug fixes on top of 0.5.0.

v0.5.0

16 Aug 07:09

Arroyo 0.5 brings a number of new features and improvements to the Arroyo platform. The biggest of these is the new FileSystem connector, which is a high-performance, transactional sink for writing data to filesystems and object stores like S3. This allows Arroyo to write into data lakes and data warehouses. We've also added exactly-once support for Kafka sinks, a new Kinesis connector, expanded our SQL support, and made a number of improvements to the Web UI and REST API.

Read on for more details, and check out our docs for full details on existing and new features.

Thanks to all our contributors for this release:

Features

FileSystem connector

Columnar files (like Parquet) on S3 have become the de-facto standard for storing data at rest, combining low cost of storage with decent query performance. Modern query engines like Trino, ClickHouse, and DuckDB can operate directly on these files, as can many data warehouses like Snowflake and Redshift.

And with the new FileSystem connector, Arroyo can efficiently perform real-time ETL into these S3-backed systems.

The FileSystem connector is a high-performance, transactional sink for writing data (as Parquet or JSON files) to file systems and object stores like S3.

It's deeply integrated with Arroyo's checkpoint system for exactly-once processing. This means that even if a machine is lost or a job is restarted, the data written to S3 will be consistent and correct. Unlike other systems like Flink, it's even able to perform consistent checkpointing while in the process of writing a single Parquet file. This means that you can write larger files for better query performance while still performing frequent checkpoints.

Look out for a blog post in the near future with more details on how all of this works.

FileSystem sinks can be created in SQL via a CREATE TABLE statement like this:

CREATE TABLE bids (
  time timestamp,
  auction bigint,
  bidder bigint,
  price bigint
) WITH (
  connector ='filesystem',
  path = 'https://s3.us-west-2.amazonaws.com/demo/s3-uri',
  format = 'parquet',
  parquet_compression = 'zstd',
  rollover_seconds = '60'
);

See the docs for all of the details and available options.

Exactly-once Kafka sink

Arroyo has always supported exactly-once processing when reading from Kafka by integrating offset-tracking with its checkpoint system. In 0.5 we're adding exactly-once support for writing to Kafka as well. This enables end-to-end exactly-once processing when integrating with other systems via Kafka.

Exactly-once processing is achieved by leveraging Kafka's transactional API. When processing starts, Arroyo will begin a transaction which is used for all writes.

Once a checkpoint is completed successfully, the transaction is committed, allowing consumers to read the records. This ensures that records are only read once, even if a failure occurs.

If a failure does occur, the transaction will be rolled back and processing will restart from the last checkpoint.

Exactly-once Kafka sinks can be created in SQL via a CREATE TABLE statement by configuring the new 'sink.commit_mode' = 'exactly_once' option, for example:

CREATE TABLE sink (
  time TIMESTAMP,
  user_id TEXT,
  count INT
) WITH (
  connector ='kafka',
  topic = 'results',
  bootstrap_servers = 'localhost:9092',
  type = 'sink',
  format = 'json',
  'sink.commit_mode' = 'exactly_once'
);

There is also now a corresponding source.read_mode option for Kafka sources, which can be set to read_committed to read only committed records produced by a transactional producer.
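For example, a read_committed consumer might be declared like this (a sketch; the topic and columns are hypothetical):

CREATE TABLE source (
  time TIMESTAMP,
  user_id TEXT,
  count INT
) WITH (
  connector = 'kafka',
  topic = 'results',
  bootstrap_servers = 'localhost:9092',
  type = 'source',
  format = 'json',
  'source.read_mode' = 'read_committed'
);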

See the Kafka connector docs for more details.

  • implement exactly-once commits to Kafka sinks and read_committed reads to Kafka sources by @jacksonrnewhouse in #218

Kinesis connector

Arroyo now supports reading from and writing to AWS Kinesis data streams via the
new Kinesis connector. Like the existing Kafka connector, the Kinesis connector supports exactly-once processing of records.

Kinesis sources and sinks can be created in the Web UI or via SQL, for example

CREATE TABLE kinesis_source (
  time TIMESTAMP,
  user_id TEXT,
  count INT
) WITH (
  connector ='kinesis',
  stream_name = 'my-source',
  type = 'source',
  format = 'json'
);

CREATE TABLE kinesis_sink (
  time TIMESTAMP,
  user_id TEXT,
  count INT
) WITH (
  connector ='kinesis',
  stream_name = 'my-sink',
  type = 'sink',
  format = 'json'
);

INSERT INTO kinesis_sink
SELECT * from kinesis_source;

See the Kinesis connector docs for all the available options.

Postgres sink via Debezium

Arroyo now supports writing to relational databases (including Postgres and MySQL) via Debezium.

As part of this work, we've added support for embedding JSON schemas in outputs in Kafka Connect format. This allows integration with Kafka Connect connectors that, like Debezium, require a schema.

See the Postgres connector docs for the details.

We've also improved our format system to allow for more control over how data is serialized and deserialized, for example allowing for custom date and timestamp formats. Refer to the new format docs.

  • Support sinking to relational databases via Debezium by @mwylde in #235

Session windows

Arroyo 0.5 adds support for session windows.

Unlike sliding and tumbling windows which divide time up into fixed intervals, session windows are defined by a gap in time between records. This is often useful for determining when some period of activity has finished and can be analyzed.

For example, let's take a query over user events on an ecommerce site. A user may arrive on the site, browse around, add some items to their cart, then disappear. A day later they may return and complete their purchase. With session windows we can independently (and efficiently) analyze each of these sessions.

We can create a session window using the session function, which takes the gap time as an argument:

SELECT
  session(INTERVAL '1 hour') as window,
  user_id,
  count(*)
FROM clickstream
GROUP BY window, user_id;

Idle watermarks

Partitioned sources (like Kafka or Kinesis) may experience periods when some partitions are active but others are idle due to the way that they are keyed. This can cause delayed processing due to how watermarks are calculated: as the minimum of the watermarks of all partitions.

If some partitions are idle, the watermark will not advance, and queries that depend on it will not make progress. To address this, sources now support a concept of idleness, which allows them to mark partitions as idle after a period of inactivity. Idle partitions, meanwhile, are ignored for the purpose of calculating watermarks and so allow queries to advance.

Idleness is now enabled by default for all sources with a period of 5 minutes. It can be configured when creating a source in SQL by setting the idle_micros option, or disabled by setting it to -1.
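For example, to mark partitions idle after one minute instead of the default five, a source could be declared like this (a sketch; the table and topic are hypothetical, and 60000000 is one minute expressed in microseconds):

CREATE TABLE events (
  value TEXT
) WITH (
  connector = 'kafka',
  topic = 'events',
  bootstrap_servers = 'localhost:9092',
  type = 'source',
  format = 'json',
  idle_micros = '60000000'
);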

A special case of idleness occurs when there are more Arroyo source tasks than partitions (for example, a Kafka topic with 4 partitions read by 8 Arroyo tasks). This means that some tasks will never receive data, and so will never advance their watermarks. This can occur as well for non-partitioned sources like WebSocket, where only a single task is able to read data. Now sources will immediately set inactive tasks to idle.

REST API

Continuing the work started in 0.4, we are migrating our API from gRPC to REST. This release includes a number of new endpoints, and can now be used to fully manage pipelines and jobs.

For example, let's walk through creating a new pipeline:

curl http://localhost:8000/api/v1/pipelines \
  -X POST -H "Content-Type: application/json" \
  --data @- << EOF
{
  "name": "my_pipeline",
  "parallelism": 1,
  "query": "
    CREATE TABLE impulse (
      counter BIGINT UNSIGNED NOT NULL,
      subtask_index BIGINT UNSIGNED NOT NULL
     )
     WITH (
      connector = 'impulse',
      event_rate = '100'
   );
   SELECT * from impulse;",
  "udfs": []
}
EOF

{
  "id": "pl_W2UjDI6Iud",
  "name": "my_pipeline",
  "stop": "none",
  "createdAt": 1692054789252281,
  ...
}

Each pipeline has one or more jobs, whic...


v0.4.1

17 Jul 23:09

0.4.1 is a patch release that includes two fixes on top of 0.4.0.

Full Changelog: v0.4.0...v0.4.1

v0.4.0

13 Jul 18:11

Overview

Arroyo 0.4 brings some big new features like update tables, Debezium support, and a major redesign of the connectors system that makes it much easier to build new connectors. Leveraging that, we've added websocket and fluvio connectors. We're also releasing the initial endpoints for our new REST API, which makes it easier to build automations around Arroyo.

Read on for more details, and check out our docs for full details on existing and new features.

Thanks to all our contributors for this release:

What's next

With 0.4 out, we're already looking ahead to Arroyo 0.5, to be released in early August. The headline feature of 0.5 will be the new Filesystem connector, which will support high throughput, transactional writes from Arroyo into data warehouses and data lakes backed by object stores like S3. We'll also be finishing the transition to the new REST API, adding Redis and Kinesis connectors, and adding a transactional Kafka sink. On the SQL side we'll be working on session windows and support for joining on external tables.

Anything else you'd like to see? Let us know on Discord!

Now on to the release notes.


Features

Update Tables

Arroyo 0.4 brings support for update tables. Exactly what that means is a bit complicated (and we'll dive into it below) but the short version is that you can now use Arroyo to efficiently read and write data from databases like Postgres and MySQL via Debezium, and many queries that were previously unsupported are now supported.

So what are update tables? Let's talk through the semantics of Arroyo tables today, which we'll call append tables going forward.

Take this query:

SELECT store_id, status from orders;

which produces this output stream:

Time                        store     status
7/10/23, 11:34:34 AM PDT    1142      "accepted"
7/10/23, 11:34:34 AM PDT    1737      "accepted"
7/10/23, 11:34:34 AM PDT    1149      "accepted"

This query will output one row for every record that comes in on the orders stream (let's say that's a kafka topic that receives every order). You can think of this as modeling a virtual table with three columns (time, store, and status). Each new order that comes in produces a new row in that table, or in other words is appended.

But what if we have a query that needs other operations beside appends? For example, consider this query:

SELECT store, count(*) AS count
FROM orders
GROUP BY store;

which models a table with one row per store. When a new order comes in, we may append a new row if it's a new store, or we may need to update an existing row if we've already seen that store. In other words, we need to support updates.

In Arroyo 0.3 that query is not supported, but in 0.4 it will produce an update stream that looks like this:

Time                     previous                              current                               op
7/10/23, 4:03:42 PM PDT  { "orders_store_id": 3, "count": 1 }  { "orders_store_id": 3, "count": 2 }  "u"
7/10/23, 4:03:40 PM PDT  null                                  { "orders_store_id": 1, "count": 1 }  "c"
7/10/23, 4:03:40 PM PDT  null                                  { "orders_store_id": 3, "count": 1 }  "c"

Each output row records an update of some kind: either a [c]reate, [u]pdate, or [d]elete. This stream can be used directly, or it can be used to materialize the output into another database like Postgres or MySQL via Debezium, which natively supports this kind of update stream.

Update tables can also be used with Debezium to write to Arroyo from a SQL database CDC source. See the new Debezium tutorial for more details on how to set this up.

Beyond use with Debezium, update tables can also be very useful for efficiently implementing queries where it's important to know when some key enters or leaves a set. For example, for a fraud detection system you may have a set of rules that indicate possibly-fraudulent activity, like this query which looks for sites with suspiciously high click-through rates:

SELECT site as suspicious_site
FROM (
    SELECT site, clicks / impressions as click_through_rate
    FROM (SELECT site,
        SUM(CASE
            WHEN imp_type = 'click' THEN 1 ELSE 0 END) as clicks,
        SUM(CASE
            WHEN imp_type = 'impression' THEN 1 ELSE 0 END) as impressions
        FROM event_stream
    GROUP BY 1)
) WHERE click_through_rate > 0.02;

This query will produce a record with "op": "c" whenever a site first exceeds the threshold, and "op": "d" whenever a site falls below the threshold.

Connector redesign

Connectors integrate Arroyo with external systems. They implement sources that read data from external systems and sinks that write data to external systems.

Arroyo 0.4 brings a major redesign of the connectors system, making it much easier to build new connectors. In previous releases of Arroyo, connectors were deeply integrated with the various Arroyo sub-systems (the console, api, database, sql planner, compiler, etc.) and adding or modifying a connector required changes to all of those systems.

In 0.4, connector implementations are cleanly separated out into the new arroyo-connectors crate. New connectors can be created by implementing a simple trait.

This redesign has allowed us to add a number of new connectors in 0.4 (detailed below), and will accelerate our connector development going forward.

We've also revamped the UI experience around creating sources and sinks, which are now jointly managed in the new Connections tab in the console. This provides a more straightforward experience for creating and managing connections.

Finally, DDL for creating sources and sinks has also been updated to be more consistent and easier to use. For example, a Kafka source can be created with the following SQL:

CREATE TABLE orders (
  customer_id INT,
  order_id INT
) WITH (
  connector = 'kafka',
  format = 'json',
  bootstrap_servers = 'broker-1.cluster:9092,broker-2.cluster:9092',
  topic = 'order_topic',
  type = 'source',
  'source.offset' = 'earliest'
);

New connectors

Arroyo 0.4 includes a number of new connectors leveraging the connector redesign. See the connector docs for the full list of supported connectors.

Websocket sources

Arroyo 0.4 adds a new Websocket source, which allows Arroyo to read data from the many available websocket APIs.

For example, Coinbase provides a websocket API that streams the full orderbook for various cryptocurrencies. We can use the new Websocket source to stream that data into Arroyo, and perform real-time analytics on it.

As a simple example, this query computes the average price of Bitcoin in USD over the last minute:

CREATE TABLE coinbase (
    type TEXT,
    price TEXT
) WITH (
    connector = 'websocket',
    endpoint = 'wss://ws-feed.exchange.coinbase.com',
    subscription_message = '{
      "type": "subscribe",
      "product_ids": [
        "BTC-USD"
      ],
      "channels": ["ticker"]
    }',
    format = 'json'
);

SELECT avg(CAST(price as FLOAT)) from coinbase
WHERE type = 'ticker'
GROUP BY hop(interval '5' second, interval '1 minute');

Fluvio source/sink

Arroyo 0.4 adds a new Fluvio source and sink, which allows Arroyo to read and write data from Fluvio, a high-performance distributed streaming platform built on top of Rust and Kubernetes.

Fluvio has support for simple, stateless processing, but with Arroyo it can be extended to perform complex, stateful processing and analytics.

REST API

Today Arroyo is primarily used through the web console, which is great for individual users and small teams. But for more advanced use cases and larger orgs it's important to build automation and integrate Arroyo with internal infrastructure.

Arroyo has always provided a gRPC API that controls all aspects of the system. This is the API that powers the console. But gRPC can be difficult to work with, and it isn't widely supported by existing tools and libraries. We also haven't treated the gRPC API as a stable interface and have made regular breaking changes.

So with this release, we're starting the process of migrating the API to REST, and making it a first-class, stable interface for Arroyo. Arroyo 0.4 adds the first REST endpoints that support pipeline creation, management, and inspection. For example, a SQL pipeline can be created with the following curl command:

curl -XPOST http://localhost:8003/v1/pipelines \
  -H "Content-Type: application/json" \
  -d '{
    "name": "orders",
    "query": "SELECT * FROM orders;"
    "udfs": [],
    "parallelism": 1,
  }'

See the [REST API ...
