
Cleaned-up README and added more examples
MartijnVisser committed Oct 21, 2021
1 parent 333940d commit c3fe3ae
Showing 5 changed files with 48 additions and 21 deletions.
README.md (48 additions, 21 deletions)

This demo is used by Martijn Visser in his [Flink Forward talk 'Only SQL: Empower data analysts end-to-end with Flink SQL'](https://www.flink-forward.org/global-2021/conference-program#only-sql--empower-data-analysts-end-to-end-with-flink-sql)

[![Twitter Follow](https://img.shields.io/twitter/follow/MartijnVisser82?style=social&logo=twitter)](https://twitter.com/MartijnVisser82) [![GitHub Follow](https://img.shields.io/github/followers/MartijnVisser?style=social&logo=github)](https://github.com/MartijnVisser)

## Docker

We'll use Docker Compose to start all necessary services to run the demos. It will start the following services:
* Elasticsearch 7.15.0, accessible via http://localhost:9200 (or http://elasticsearch:9200 via Docker networking)
* http-server: a simple static HTTP server, accessible via http://localhost:8080

![Demo only-sql-overview](only-sql-overview.png "Demo overview")

## Starting the demo
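The first steps of this section are collapsed in this diff view. Since everything is managed with Docker Compose (see the Docker section above), bringing the stack up presumably looks something like this (a sketch, assuming the compose file sits in the repository root):

```bash
# Start all demo services in the background
docker-compose up -d
```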

Once the services are running, you can attach to the Flink SQL CLI (the tail of this command is cut off in the diff view):

```bash
docker exec -it `docker ps -q --filter "ancestor=ftisiot/flink_sql_cli:1.13.2"` ...
```

## Explore all website behaviour

Any visit to one of the webpages is sent to the Kafka topic `pageview`. In order to explore these pageviews, we first need to register this Kafka topic as a table.

```sql
--Create table pageviews:
CREATE TABLE pageviews (
  ...
);
```
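The column list and connector options of that table are collapsed in the diff above. Purely as an illustration (the column names, watermark, and format here are assumptions, not the repo's actual DDL), a Kafka-backed table for this topic could look like:

```sql
-- Hypothetical sketch of a Kafka-backed pageviews table; not the repo's actual DDL
CREATE TABLE pageviews_sketch (
  `url` STRING,
  `cookies` STRING,
  `ts` TIMESTAMP(3),
  -- Declare ts as the event-time attribute, tolerating 5 seconds of lateness
  WATERMARK FOR `ts` AS `ts` - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'pageview',
  'properties.bootstrap.servers' = 'broker:29092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
```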

Any cookie that belongs to the domain `localhost` (which is where our website runs) is also sent to the topic. We are specifically interested in a cookie called `identifier`. We're going to register a view, which returns this value by applying a regular expression to the incoming data.
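The view's definition is collapsed in the diff below. As a rough, hypothetical sketch (the column names and the regex pattern are assumptions), extracting a cookie value with Flink's built-in `REGEXP_EXTRACT` could look like:

```sql
-- Hypothetical sketch only; the actual view definition is collapsed in this diff
CREATE VIEW web_activities_sketch AS
SELECT
  -- Pull the value of the identifier cookie out of the raw cookie string
  REGEXP_EXTRACT(`cookies`, 'identifier=([^;]+)', 1) AS `identifier`,
  `url`,
  `ts`
FROM pageviews;
```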

```sql
--Create view which already extracts the identifier from the cookies
CREATE VIEW web_activities AS
  ...
```

By now running queries on the view while visiting a webpage, you will see data appearing:

```sql
SELECT * from web_activities;
```

![Flink SQL Client Results](only-sql-results-01.png "Flink SQL Client Results")

In this example, we're interested in users who visit our homepage more than 3 times in 10 seconds. To achieve that,
we first select all identifiers (containing the value of the cookie `identifier`) that are visiting a URL matching
`http://localhost:8080/` or `http://127.0.0.1:8080/`. We expand our selection with the `TUMBLE` function, which creates
fixed windows of 10 seconds. Last but not least, we use the `HAVING` clause to make sure that only identifiers occurring
more than 3 times are selected. The end result looks like this:

```sql
--Get the users who visit the homepage more than 3 times in 10 seconds
SELECT
`identifier`,
CONCAT('You Have Seen The Homepage ', CAST(COUNT(*) as VARCHAR), ' Times')
FROM web_activities
WHERE `url` = 'http://localhost:8080/' OR `url` = 'http://127.0.0.1:8080/'
GROUP BY
TUMBLE(ts, INTERVAL '10' SECONDS), identifier
HAVING COUNT(*) > 3;
```

We've just created the list of `identifier` values that meet our requirement of visiting the homepage more than 3 times in 10 seconds.
We now want to act on this data. To achieve that, we're going to send the list of `identifier` values to our Elasticsearch sink.
Our website checks whether Elasticsearch contains any results and, if so, displays the notification.

To send the data to Elasticsearch, we first have to create another table, as we've done before. We use the following DDL:

```sql
--Create a sink to display a notification
CREATE TABLE notifications (
  ...
);
```
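The columns and connector options are collapsed in the diff above. Based on the columns used by the `INSERT` statements later in this README, a hypothetical Elasticsearch-backed version (the index name and exact connector options are assumptions) could look like:

```sql
-- Hypothetical sketch; column names come from the INSERT statements below,
-- the connector options are assumptions
CREATE TABLE notifications_sketch (
  `identifier` STRING,
  `notification_id` STRING,
  `notification_text` STRING,
  `notification_link` STRING
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://elasticsearch:9200',
  'index' = 'notifications'
);
```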

Once that table is created, we'll reuse the previous SQL that returns the list of `identifier` values and send those results to the
newly created table.

```sql
INSERT INTO notifications (`identifier`, `notification_id`, `notification_text`)
SELECT
  ...
GROUP BY
  ...
HAVING COUNT(*) > 3;
```
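The middle of that statement is collapsed in the diff above. Reusing the earlier query, a hypothetical complete version (the use of `UUID()` for the notification id is an assumption) could look like:

```sql
-- Hypothetical reconstruction; UUID() as the notification id is an assumption
INSERT INTO notifications (`identifier`, `notification_id`, `notification_text`)
SELECT
  `identifier`,
  UUID(),
  CONCAT('You Have Seen The Homepage ', CAST(COUNT(*) AS VARCHAR), ' Times')
FROM web_activities
WHERE `url` = 'http://localhost:8080/' OR `url` = 'http://127.0.0.1:8080/'
GROUP BY
  TUMBLE(ts, INTERVAL '10' SECONDS), identifier
HAVING COUNT(*) > 3;
```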

> :warning: The default value of the cookie `identifier` is `anonymous`. No notifications will be displayed if the value is `anonymous`.
>
> In order to change the value, you need to open the Developer Tools via either Cmd + Opt + J (on Mac) or Ctrl + Shift + J (on Windows)
>
> In the opened console, you then need to type `document.cookie="identifier=YourIdentifier"` to change the value of the `identifier` cookie.

If you've changed the value of your `identifier` cookie and you refresh the page more than 3 times in 10 seconds, a notification will be displayed to you.

![Displaying a personal notification](only-sql-results-02.png "Displaying a personal notification")

Another common use case in SQL is that you need to join data from multiple sources. In the next example, we will display a notification
to a user of our website who is calling us. We're mocking that call by using the [simulate call](http://localhost:8080/simulate-call.html)
link from the menu on our website. That page will send data to a separate topic, which we'll combine with our existing website behaviour data.

The first thing that we'll do is create another table, so we can connect to the data.

```sql
CREATE TABLE text_to_speech (
`identifier` STRING,
  ...
);
```
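Most of this DDL is collapsed in the diff above. From the columns referenced in the join below (`identifier`, `category`, `ts`), a hypothetical Kafka-backed version (the topic name and connector options are assumptions) could look like:

```sql
-- Hypothetical sketch; columns inferred from the interval join below,
-- topic name and connector options are assumptions
CREATE TABLE text_to_speech_sketch (
  `identifier` STRING,
  `category` STRING,
  `ts` TIMESTAMP(3),
  -- Event-time attribute needed for the interval join
  WATERMARK FOR `ts` AS `ts` - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'text_to_speech',
  'properties.bootstrap.servers' = 'broker:29092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
```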

The following SQL statement will do a couple of things:

1. It will create a new notification, including a link to a page, by inserting into our Elasticsearch sink
2. For all identifiers that are making a call
3. While they have been active on the website in the 10 seconds prior to making the call
4. If their call is about how to contribute

The third step uses an interval join, which means that a join result is only produced if both the join condition and a time constraint are met.

```sql
INSERT INTO notifications (`identifier`, `notification_id`, `notification_text`, `notification_link`)
SELECT
  ...
AND w.ts BETWEEN t.ts - INTERVAL '10' SECONDS AND t.ts
AND t.category = 'how-to-contribute';
```
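The collapsed middle of that statement would contain the select list and the join itself. A hypothetical reconstruction follows: the time constraint and category filter come from the visible lines, while the table aliases `w` and `t`, the `UUID()` id, and the notification text and link are assumptions.

```sql
-- Hypothetical reconstruction; only the interval-join time constraint and the
-- category filter are visible in the diff, the rest is an assumption
INSERT INTO notifications (`identifier`, `notification_id`, `notification_text`, `notification_link`)
SELECT
  w.identifier,
  UUID(),
  'Thanks for calling us about contributing!',
  'http://localhost:8080/how-to-contribute.html'
FROM web_activities w, text_to_speech t
WHERE w.identifier = t.identifier
-- Interval join: only match website activity from the 10 seconds before the call
AND w.ts BETWEEN t.ts - INTERVAL '10' SECONDS AND t.ts
AND t.category = 'how-to-contribute';
```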

![Displaying a notification with link](only-sql-results-03.png "Displaying a personal notification on how to contribute")
overview.png → only-sql-overview.png (file renamed without changes)
flink-sql-client-results-01.png → only-sql-results-01.png (file renamed without changes)
Binary file added only-sql-results-02.png
Binary file added only-sql-results-03.png
