Skip to content

Commit

Permalink
feat: use predefined channel configurations
Browse files Browse the repository at this point in the history
  • Loading branch information
thalesmg committed Jul 6, 2023
1 parent 5e82842 commit d72ed50
Show file tree
Hide file tree
Showing 9 changed files with 135 additions and 39 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ jobs:

- name: run integration tests
run: |
if [[ ! -z "${{ secrets.PROFILE_JSON }}" ]]; then
echo "${{ secrets.PROFILE_JSON }}" > profile.json
if [[ ! -z "${{ secrets.CONFIG_EDN }}" ]]; then
echo "${{ secrets.CONFIG_EDN }}" > config.edn
clojure -X:test :includes '[:integration]'
fi
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ pom.xml.asc
/.clj-kondo/
/profile.json
/.lsp/
/config.edn
20 changes: 20 additions & 0 deletions config.example.edn
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
{:client {:client-name "MY_CLIENT"
:user "TESTUSER"
:url "https://ORG_ID-ACCOUNT_ID.snowflakecomputing.com:443"
:private-key "-----BEGIN PRIVATE KEY-----\nMII...\n-----END PRIVATE KEY-----\n"
:port 443
:host "ORG_ID-ACCOUNT_ID.snowflakecomputing.com"
:scheme "https"
:role "TESTROLE"}
:channels [{:chan-name "my_channel"
:database "TESTDATABASE"
:schema "PUBLIC"
:table "TESTTABLE"
:on-error :continue}
{:chan-name "my_channel2"
:database "TESTDATABASE"
:schema "PUBLIC"
:table "TESTTABLE2"
:on-error :abort}]
:server {:host "0.0.0.0"
:port 9099}}
27 changes: 25 additions & 2 deletions readme.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,32 @@
[![test](https://github.com/thalesmg/emqx-snowflake-sidecar/actions/workflows/test.yaml/badge.svg?branch=main)](https://github.com/thalesmg/emqx-snowflake-sidecar/actions/workflows/test.yaml)

# prerequisites

1. Have an account on Snowflake
2. Clojure 1.11
3. Have an user with a role that has the sufficient privileges on all relevant objects.
- Such role must have:
- `USAGE` on the database.
4. Set up a key pair for the user.
- Create a key pair
```sh
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out snowflake_rsa_key.p8 -nocrypt
openssl rsa -in snowflake_rsa_key.p8 -pubout -out snowflake_rsa_key.pub
```
- Associate the public key with the user in Snowflake.
```sql
alter user testuser set rsa_public_key='MII...';
desc user testuser;
```
5. Configure the service using `config.edn`. See `config.example.edn` for an example.
6. While it's not required, setting the `APP_NAME` environment variable to an unique and
fixed string per container/process is recommended to avoid creating unlimited channels
when the container/process restarts. Otherwise, an UUID is generated and used.
## running directly
```sh
clj -M -m emqx.core
env APP_NAME=myapp clj -M -m emqx.core
```
## running with repl
Expand All @@ -22,7 +45,7 @@ clj
## testing
```sh
clj -X:test
env APP_NAME=myapp clj -X:test
# only tests marked as :integration
clj -X:test :includes '[:integration]'
Expand Down
27 changes: 15 additions & 12 deletions src/emqx/channel.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
(ns emqx.channel
(:require
[cheshire.core :as json]
[camel-snake-kebab.core :as csk]
[taoensso.timbre :as log])
(:import
[java.util Properties]
Expand All @@ -18,12 +18,12 @@
(defonce channel-manager (agent {}))

(defn start-client
[]
(let [params (json/parse-string (slurp "profile.json"))
props (Properties.)
_ (doseq [[^String k ^String v] params]
(.setProperty props k (str v)))
c (.. (SnowflakeStreamingIngestClientFactory/builder "my_client")
[{:keys [:client-name] :as params}]
(let [props (Properties.)
_ (doseq [[k v] params
:when (not= :client-name k)]
(.setProperty props (csk/->snake_case_string k) (str v)))
c (.. (SnowflakeStreamingIngestClientFactory/builder client-name)
(setProperties props)
(build))]
(reset! client c)))
Expand All @@ -47,11 +47,14 @@
function provided to `swap!` _may_ be executed multiple times, so it
should be side-effect free. We then use an agent to serialize the
side-effects."
[{:keys [:chan-name :database :schema :table :on-error] :as chan-params}]
[app-name {:keys [:chan-name :database :schema :table :on-error] :as chan-params}]
;; see https://github.com/snowflakedb/snowflake-ingest-java/blob/64182caf0af959271f4249e4bef9203e2a1f6d8d/profile_streaming.json.example
(let [on-error (case on-error
:continue OpenChannelRequest$OnErrorOption/CONTINUE
:abort OpenChannelRequest$OnErrorOption/ABORT)
;; each channel should be owned by a single
;; process/application to avoid invalidating each other.
chan-name (str chan-name "-" app-name)
chan-req (.. (OpenChannelRequest/builder chan-name)
(setDBName database)
(setSchemaName schema)
Expand All @@ -65,20 +68,20 @@
chan-agent))

(defn- do-ensure-streaming-agent
[state {:keys [:chan-name] :as params} reply-promise]
[state app-name {:keys [:chan-name] :as params} reply-promise]
(let [chan-agent (@channels chan-name)]
(if chan-agent
;; already created by a concurrent call
(deliver reply-promise chan-agent)
(let [chan-agent (start-streaming-agent params)]
(let [chan-agent (start-streaming-agent app-name params)]
(swap! channels assoc chan-name chan-agent)
(deliver reply-promise chan-agent)))
state))

(defn ensure-streaming-agent
[params]
[app-name params]
(let [reply-promise (promise)]
(send-off channel-manager do-ensure-streaming-agent params reply-promise)
(send-off channel-manager do-ensure-streaming-agent app-name params reply-promise)
@reply-promise))

;; note: offset can be null when the channel is new.
Expand Down
45 changes: 45 additions & 0 deletions src/emqx/config.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
(ns emqx.config
(:require
[clojure.edn :as edn]
[clojure.java.io :as io]
[schema.coerce :as coerce]
[schema.core :as s]))

(s/defschema ClientConfig
{:client-name s/Str
:user s/Str
:url s/Str
:private-key s/Str
:port s/Num
:host s/Str
:scheme s/Str
:role s/Str})

(s/defschema ChannelConfig
{:chan-name s/Str
:database s/Str
:schema s/Str
:table s/Str
:on-error (s/enum :continue :abort)})

(s/defschema ServerConfig
{:host s/Str
:port s/Num})

(s/defschema EMQXConfig
{:client ClientConfig
:channels [ChannelConfig]
:server ServerConfig})

(defn get-config!
"Get the configurations for startup.
That config is read from a `config.edn` file in the app's working
directory."
[]
(let [params (with-open [r (io/reader "config.edn")]
(edn/read (java.io.PushbackReader. r)))
coercer (coerce/coercer! EMQXConfig coerce/keyword-enum-matcher)
config (coercer params)
app-name (or (System/getenv "APP_NAME")
(str (java.util.UUID/randomUUID)))]
(assoc config :app-name app-name)))
13 changes: 9 additions & 4 deletions src/emqx/core.clj
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
(ns emqx.core
(:require
[emqx.channel :as chan]
[emqx.http :as server]
[emqx.config :as config]
[emqx.http :as http-server]
[taoensso.timbre :as log]))

(defn -main
[& args]
(prn "args" args)
(log/info :msg ::init-args :args args)
(.addShutdownHook
(Runtime/getRuntime)
(Thread. ^Runnable
Expand All @@ -19,5 +20,9 @@
(chan/stop-client)
(log/info :msg ::shutting-down-agents)
(shutdown-agents))))
(chan/start-client)
(server/start))
(let [{:keys [:app-name :client :channels :server]} (config/get-config!)]
;; Should the client name be unique?
(chan/start-client client)
(doseq [chan-params channels]
(chan/ensure-streaming-agent app-name chan-params))
(http-server/start server)))
10 changes: 5 additions & 5 deletions src/emqx/http.clj
Original file line number Diff line number Diff line change
Expand Up @@ -175,16 +175,16 @@
::http/type :jetty
;; FIXME: for repl
::http/join? false
::http/host "0.0.0.0"
::http/host (get params :host "0.0.0.0")
::http/port (get params :port 9099)}))

(defonce server (atom nil))

(defn start
[]
(reset! server (http/start (create-server {}))))
[params]
(reset! server (http/start (create-server params))))

(defn restart
[]
[params]
(swap! server http/stop)
(reset! server (start)))
(reset! server (start params)))
27 changes: 13 additions & 14 deletions test/emqx/http_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,26 @@
(:require
[cheshire.core :as json]
[clojure.test :refer [deftest is testing use-fixtures]]
[emqx.config :as config]
[emqx.channel :as chan]
[emqx.http :as server]
[emqx.http :as http-server]
[io.pedestal.http :as http]
[io.pedestal.http.route :as route]
[io.pedestal.test :refer [response-for]]
[matcher-combinators.test]))

(defn make-service
[]
(::http/service-fn (http/create-servlet {::http/routes server/routes})))
(::http/service-fn (http/create-servlet {::http/routes http-server/routes})))

(def url-for (route/url-for-routes server/routes))
(def url-for (route/url-for-routes http-server/routes))

(def ^:dynamic *service* nil)

(defn client-fixture
[f]
(chan/start-client)
(let [{:keys [:client]} (config/get-config!)]
(chan/start-client client))
(f)
(chan/stop-client))

Expand All @@ -39,21 +41,21 @@
(defn get-channel
[chan-name]
(response-for *service*
:get (url-for ::server/get-channel
:get (url-for ::http-server/get-channel
:path-params {:chan-name chan-name})))

#_(defn upsert-channel
[chan-name chan-params]
(response-for *service*
:post (url-for ::server/upsert-channel
:post (url-for ::http-server/upsert-channel
:path-params {:chan-name chan-name})
:headers {"Content-Type" "application/json"}
:body (json/generate-string chan-params)))

#_(defn delete-channel
[chan-name]
(response-for *service*
:delete (url-for ::server/delete-channel
:delete (url-for ::http-server/delete-channel
:path-params {:chan-name chan-name})))

#_(deftest ^:integration channels-test
Expand Down Expand Up @@ -163,7 +165,7 @@
(defn insert-rows
[chan-name body-params]
(response-for *service*
:post (url-for ::server/insert-rows
:post (url-for ::http-server/insert-rows
:path-params {:chan-name chan-name})
:headers {"Content-Type" "application/json"}
:body (json/generate-string body-params)))
Expand All @@ -176,12 +178,9 @@
(let [resp (insert-rows chan-name valid-rows)]
(is (= 404 (:status resp)))))

(let [chan-params {:chan-name chan-name
:database "TESTDATABASE"
:schema "PUBLIC"
:table "TESTTABLE"
:on-error :continue}]
(chan/ensure-streaming-agent chan-params))
(let [{:keys [:app-name :channels]} (config/get-config!)]
(doseq [chan-params channels]
(chan/ensure-streaming-agent app-name chan-params)))

(testing "inserting valid rows"
(let [resp (insert-rows chan-name valid-rows)]
Expand Down

0 comments on commit d72ed50

Please sign in to comment.