Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: change app from http listener to mqtt client #5

Merged
merged 3 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .github/workflows/docker.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
name: docker

on: [push]
on:
push:
tags:
- "**"
workflow_dispatch: {}

jobs:
docker:
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ pom.xml.asc
/profile.json
/.lsp/
/config.edn
.rebel_readline_history
16 changes: 12 additions & 4 deletions config.example.edn
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,19 @@
:database "TESTDATABASE"
:schema "PUBLIC"
:table "TESTTABLE"
:on-error :continue}
:on-error :continue
:mqtt {:host "emqx1.emqx.net"
:port 1883
:clientid "chan1"
:topic "t/sf/1"
:qos 1}}
{:chan-name "my_channel2"
:database "TESTDATABASE"
:schema "PUBLIC"
:table "TESTTABLE2"
:on-error :abort}]
:server {:host "0.0.0.0"
:port 9099}}
:on-error :abort
:mqtt {:host "emqx1.emqx.net"
:port 1883
:clientid "chan2"
:topic "t/sf/2"
:qos 1}}]}
17 changes: 7 additions & 10 deletions deps.edn
Original file line number Diff line number Diff line change
@@ -1,21 +1,18 @@
{:paths ["src" "resources"]
:deps {
;; https://mvnrepository.com/artifact/net.snowflake/snowflake-ingest-sdk
net.snowflake/snowflake-ingest-sdk {:mvn/version "2.0.1"}
net.snowflake/snowflake-ingest-sdk {:mvn/version "2.2.2"
:exclusions [org.slf4j/slf4j-api]}

;; https://mvnrepository.com/artifact/org.eclipse.paho/org.eclipse.paho.client.mqttv3
;; org.eclipse.paho/org.eclipse.paho.client.mqttv3 {:mvn/version "1.2.5"}
clojurewerkz/machine_head {:mvn/version "1.0.0"}

io.pedestal/pedestal.jetty {:mvn/version "0.6.0"}
io.pedestal/pedestal.route {:mvn/version "0.6.0"}
io.pedestal/pedestal.service {:mvn/version "0.6.0"}
;; https://mvnrepository.com/artifact/org.slf4j/slf4j-simple
;; org.slf4j/slf4j-simple {:mvn/version "2.0.7"}
com.fzakaria/slf4j-timbre {:mvn/version "0.4.0"}
;; https://mvnrepository.com/artifact/org.slf4j/slf4j-api
org.slf4j/slf4j-api {:mvn/version "2.0.16"}
com.taoensso/telemere {:mvn/version "1.0.0-beta22"}
com.taoensso/telemere-slf4j {:mvn/version "1.0.0-beta22"}

prismatic/schema {:mvn/version "1.4.1"}
camel-snake-kebab/camel-snake-kebab {:mvn/version "0.4.3"}
com.taoensso/timbre {:mvn/version "6.2.1"}
org.clojure/tools.cli {:mvn/version "1.0.219"}
cheshire/cheshire {:mvn/version "5.11.0"}
funcool/cats {:mvn/version "2.4.2"}
Expand Down
12 changes: 7 additions & 5 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# prerequisites

1. Have an account on Snowflake
2. Clojure 1.11
2. Clojure 1.11+
3. Have an user with a role that has the sufficient privileges on all relevant objects.
- Such role must have:
- `USAGE` on the database.
Expand Down Expand Up @@ -38,10 +38,13 @@ clj
```

```clojure
(require '[emqx.http :as server])
(require '[emqx.config :as config])
(require '[emqx.channel :as chan])
(chan/start-client)
(server/start)
(let [{:keys [:app-name :client :channels]} (config/get-config!)]
(chan/start-client client)
(doseq [chan-params channels]
(chan/ensure-streaming-agent app-name chan-params)
(mqtt/start-client chan-params)))
```

## uberjar
Expand All @@ -50,7 +53,6 @@ clj
clj -T:build uber

java -jar target/emqx-snowflake-proxy-0.0.0-standalone.jar
java -jar target/emqx-snowflake-proxy-0.0.0-standalone.jar -D taoensso.timbre.config.edn='{:min-level :info}'
```

## docker
Expand Down
1 change: 0 additions & 1 deletion resources/taoensso.timbre.config.edn

This file was deleted.

64 changes: 20 additions & 44 deletions src/emqx/adapter.clj
Original file line number Diff line number Diff line change
@@ -1,49 +1,25 @@
(ns emqx.adapter
(:require
[cats.monad.either :refer [left right]]
[schema.core :as s]))
[cheshire.core :as json]))

(s/defschema UpsertChannelWire
{:chan-name s/Str
:database s/Str
:schema s/Str
:table s/Str
:on-error (s/enum :continue :abort)})
(defn mqtt-client-config
[{:keys [:host :port :clientid :topic :qos :username :password]}]
{:uri (str "tcp://" host ":" port)
:clientid clientid
:topic topic
:qos qos
:opts (cond-> {:auto-reconnect true}
username (assoc :username username)
password (assoc :password password))})

(defn upsert-channel-in
[{:keys [:json-params :path-params]}]
(let [{:keys [:chan-name]} path-params
{:keys [:database :schema :table :on_error]} json-params
on-error (keyword on_error)
chan-params {:chan-name chan-name
:database database
:schema schema
:table table
:on-error on-error}
errors (s/check UpsertChannelWire chan-params)]
(if errors
(left errors)
(right chan-params))))

(s/defschema JsonVal
(s/maybe
(s/conditional
string? s/Str
number? s/Num
boolean? s/Bool
map? {s/Str (s/recursive #'JsonVal)}
vector? [(s/recursive #'JsonVal)])))

(s/defschema InsertRowsWire
;; TODO: should use more lax value types???
{:rows [(s/conditional
(every-pred map? not-empty) {s/Str JsonVal})]})

(defn insert-rows-in
[{:keys [:json-params]}]
(let [rows (get json-params "rows")
insert-rows-params {:rows rows}
errors (s/check InsertRowsWire insert-rows-params)]
(if errors
(left errors)
(right insert-rows-params))))
(defn channel-data-in
"Parses the incoming payload and expects it to be a JSON encoded object"
[^bytes payload]
(try
(let [decoded (json/parse-string (String. payload "UTF-8"))]
(if (map? decoded)
(right decoded)
(left "bad input: should be a single JSON object")))
(catch Exception e
(left (str "bad input: failure parsing json: " (.getMessage e))))))
54 changes: 32 additions & 22 deletions src/emqx/channel.clj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
(ns emqx.channel
(:require
[camel-snake-kebab.core :as csk]
[taoensso.timbre :as log])
[emqx.log :as log])
(:import
[java.util Properties]
[net.snowflake.ingest.streaming
Expand Down Expand Up @@ -91,15 +91,17 @@
(try
(.getLatestCommittedOffsetToken chan)
(catch SFException e
(log/error :msg ::get-latest-commited-offset-exception
:exception (.getMessage e)
:cause (.getCause e)
:vendor-code (.getVendorCode e))
(log/error!
::get-latest-commited-offset-exception
:exception (.getMessage e)
:cause (.getCause e)
:vendor-code (.getVendorCode e))
(.printStackTrace e)
nil)
(catch Exception e
(log/error :msg ::get-latest-commited-offset-exception
:exception (.getMessage e))
(log/error!
::get-latest-commited-offset-exception
:exception (.getMessage e))
nil)))

;; TODO: return partial success count???
Expand All @@ -113,29 +115,36 @@
(map (fn [e]
{:row-index (.getRowIndex e)
:message (.getMessage e)})))]
(log/error :msg ::insert-row-errors :errors errors :offset-token offset-token)
(log/error!
::insert-row-errors
:errors errors
:offset-token offset-token)
{:errors errors})
(do
(log/debug :msg ::insert-rows-success :offset-token offset-token)
(log/debug!
::insert-rows-success
:offset-token offset-token)
{:errors nil})))
(catch SFException e
(log/error :msg ::insert-row-exception
:exception (.getMessage e)
:cause (.getCause e)
:vendor-code (.getVendorCode e)
:offset-token offset-token)
(log/error!
::insert-row-exception
:exception (.getMessage e)
:cause (.getCause e)
:vendor-code (.getVendorCode e)
:offset-token offset-token)
(.printStackTrace e)
{:errors [(.getMessage e)]
:vendor-code (.getVendorCode e)})
(catch Exception e
(log/error :msg ::insert-row-exception
:exception (.getMessage e)
:offset-token offset-token)
(log/error!
::insert-row-exception
:exception (.getMessage e)
:offset-token offset-token)
{:errors [(.getMessage e)]})))

(defn- insert-rows
[{:keys [:chan :n] :as state} rows reply-promise]
(log/debug :msg ::insert-rows-enter :rows rows)
(log/debug! ::insert-rows-enter :rows rows)
(let [row-count (count rows)
offset-token (str (+ n row-count))
response (do-insert-rows chan rows offset-token)]
Expand All @@ -160,10 +169,11 @@
(if-let [channel-agent (@channels chan-name)]
(let [{:keys [:chan]} @channel-agent]
(.close chan)
(log/info :msg ::channel-closed
:chan-name chan-name
:closed? (.isClosed chan)
:valid? (.isValid chan))
(log/info!
::channel-closed
:chan-name chan-name
:closed? (.isClosed chan)
:valid? (.isValid chan))
(swap! channels dissoc chan-name)
(deliver reply-promise true))
(deliver reply-promise true))
Expand Down
46 changes: 39 additions & 7 deletions src/emqx/config.clj
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,51 @@
:scheme s/Str
:role s/Str})
Copy link
Collaborator

@id id Sep 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

role is unnecessary in the config, it's not used in streaming


(s/defschema MQTTConfig
{:host s/Str
:port s/Num
:clientid s/Str
:topic s/Str
:qos (s/enum 0 1 2)
(s/optional-key :username) s/Str
(s/optional-key :password) s/Str})

(s/defschema ChannelConfig
{:chan-name s/Str
:database s/Str
:schema s/Str
:table s/Str
:on-error (s/enum :continue :abort)})

(s/defschema ServerConfig
{:host s/Str
:port s/Num})
:on-error (s/enum :continue :abort)
:mqtt MQTTConfig})

(s/defschema EMQXConfig
{:client ClientConfig
:channels [ChannelConfig]
:server ServerConfig})
:channels [ChannelConfig]})

(defn- duplicates
[coll]
(->> coll
frequencies
(filter (fn [[_ n]] (> n 1)))
(map (fn [[x _]] x))))

(defn- validate-unique!
[{:keys [:channels]} ks what]
(let [all-things (map (fn [chan-config]
(get-in chan-config ks))
channels)
duplicate-things (duplicates all-things)]
(when (seq duplicate-things)
(throw (RuntimeException.
(format "Duplicate %s found: %s" what (pr-str duplicate-things)))))))

(defn- validate-unique-clientids!
[channels]
(validate-unique! channels [:mqtt :clientid] "clientids"))

(defn- validate-unique-chan-names!
[channels]
(validate-unique! channels [:chan-name] "channel names"))

(defn get-config!
"Get the configurations for startup.
Expand All @@ -40,6 +70,8 @@
(edn/read (java.io.PushbackReader. r)))
coercer (coerce/coercer! EMQXConfig coerce/keyword-enum-matcher)
config (coercer params)
_ (validate-unique-clientids! config)
_ (validate-unique-chan-names! config)
app-name (or (System/getenv "APP_NAME")
(str (java.util.UUID/randomUUID)))]
(assoc config :app-name app-name)))
Loading
Loading