diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 13e7b51..e942c46 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -61,7 +61,7 @@ jobs: - name: run integration tests run: | - if [[ ! -z "${{ secrets.PROFILE_JSON }}" ]]; then - echo "${{ secrets.PROFILE_JSON }}" > profile.json + if [[ ! -z "${{ secrets.CONFIG_EDN }}" ]]; then + echo "${{ secrets.CONFIG_EDN }}" > config.edn clojure -X:test :includes '[:integration]' fi diff --git a/.gitignore b/.gitignore index de37205..dd1b34e 100644 --- a/.gitignore +++ b/.gitignore @@ -15,3 +15,4 @@ pom.xml.asc /.clj-kondo/ /profile.json /.lsp/ +/config.edn diff --git a/config.example.edn b/config.example.edn new file mode 100644 index 0000000..fe72d8f --- /dev/null +++ b/config.example.edn @@ -0,0 +1,20 @@ +{:client {:client-name "MY_CLIENT" + :user "TESTUSER" + :url "https://ORG_ID-ACCOUNT_ID.snowflakecomputing.com:443" + :private-key "-----BEGIN PRIVATE KEY-----\nMII...\n-----END PRIVATE KEY-----\n" + :port 443 + :host "ORG_ID-ACCOUNT_ID.snowflakecomputing.com" + :scheme "https" + :role "TESTROLE"} + :channels [{:chan-name "my_channel" + :database "TESTDATABASE" + :schema "PUBLIC" + :table "TESTTABLE" + :on-error :continue} + {:chan-name "my_channel2" + :database "TESTDATABASE" + :schema "PUBLIC" + :table "TESTTABLE2" + :on-error :abort}] + :server {:host "0.0.0.0" + :port 9099}} diff --git a/readme.md b/readme.md index 0b0487b..d321369 100644 --- a/readme.md +++ b/readme.md @@ -1,9 +1,32 @@ [![test](https://github.com/thalesmg/emqx-snowflake-sidecar/actions/workflows/test.yaml/badge.svg?branch=main)](https://github.com/thalesmg/emqx-snowflake-sidecar/actions/workflows/test.yaml) +# prerequisites + +1. Have an account on Snowflake +2. Clojure 1.11 +3. Have an user with a role that has the sufficient privileges on all relevant objects. + - Such role must have: + - `USAGE` on the database. +4. Set up a key pair for the user. + - Create a key pair + ```sh + openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out snowflake_rsa_key.p8 -nocrypt + openssl rsa -in snowflake_rsa_key.p8 -pubout -out snowflake_rsa_key.pub + ``` + - Associate the public key with the user in Snowflake. + ```sql + alter user testuser set rsa_public_key='MII...'; + desc user testuser; + ``` +5. Configure the service using `config.edn`. See `config.example.edn` for an example. +6. While it's not required, setting the `APP_NAME` environment variable to an unique and + fixed string per container/process is recommended to avoid creating unlimited channels + when the container/process restarts. Otherwise, an UUID is generated and used. + ## running directly ```sh -clj -M -m emqx.core +env APP_NAME=myapp clj -M -m emqx.core ``` ## running with repl @@ -22,7 +45,7 @@ clj ## testing ```sh -clj -X:test +env APP_NAME=myapp clj -X:test # only tests marked as :integration clj -X:test :includes '[:integration]' diff --git a/src/emqx/channel.clj b/src/emqx/channel.clj index 3cc5bd4..f641907 100644 --- a/src/emqx/channel.clj +++ b/src/emqx/channel.clj @@ -1,6 +1,6 @@ (ns emqx.channel (:require - [cheshire.core :as json] + [camel-snake-kebab.core :as csk] [taoensso.timbre :as log]) (:import [java.util Properties] @@ -18,12 +18,12 @@ (defonce channel-manager (agent {})) (defn start-client - [] - (let [params (json/parse-string (slurp "profile.json")) - props (Properties.) - _ (doseq [[^String k ^String v] params] - (.setProperty props k (str v))) - c (.. (SnowflakeStreamingIngestClientFactory/builder "my_client") + [{:keys [:client-name] :as params}] + (let [props (Properties.) + _ (doseq [[k v] params + :when (not= :client-name k)] + (.setProperty props (csk/->snake_case_string k) (str v))) + c (.. (SnowflakeStreamingIngestClientFactory/builder client-name) (setProperties props) (build))] (reset! client c))) @@ -47,11 +47,14 @@ function provided to `swap!` _may_ be executed multiple times, so it should be side-effect free. We then use an agent to serialize the side-effects." - [{:keys [:chan-name :database :schema :table :on-error] :as chan-params}] + [app-name {:keys [:chan-name :database :schema :table :on-error] :as chan-params}] ;; see https://github.com/snowflakedb/snowflake-ingest-java/blob/64182caf0af959271f4249e4bef9203e2a1f6d8d/profile_streaming.json.example (let [on-error (case on-error :continue OpenChannelRequest$OnErrorOption/CONTINUE :abort OpenChannelRequest$OnErrorOption/ABORT) + ;; each channel should be owned by a single + ;; process/application to avoid invalidating each other. + chan-name (str chan-name "-" app-name) chan-req (.. (OpenChannelRequest/builder chan-name) (setDBName database) (setSchemaName schema) @@ -65,20 +68,20 @@ chan-agent)) (defn- do-ensure-streaming-agent - [state {:keys [:chan-name] :as params} reply-promise] + [state app-name {:keys [:chan-name] :as params} reply-promise] (let [chan-agent (@channels chan-name)] (if chan-agent ;; already created by a concurrent call (deliver reply-promise chan-agent) - (let [chan-agent (start-streaming-agent params)] + (let [chan-agent (start-streaming-agent app-name params)] (swap! channels assoc chan-name chan-agent) (deliver reply-promise chan-agent))) state)) (defn ensure-streaming-agent - [params] + [app-name params] (let [reply-promise (promise)] - (send-off channel-manager do-ensure-streaming-agent params reply-promise) + (send-off channel-manager do-ensure-streaming-agent app-name params reply-promise) @reply-promise)) ;; note: offset can be null when the channel is new. diff --git a/src/emqx/config.clj b/src/emqx/config.clj new file mode 100644 index 0000000..b02430a --- /dev/null +++ b/src/emqx/config.clj @@ -0,0 +1,45 @@ +(ns emqx.config + (:require + [clojure.edn :as edn] + [clojure.java.io :as io] + [schema.coerce :as coerce] + [schema.core :as s])) + +(s/defschema ClientConfig + {:client-name s/Str + :user s/Str + :url s/Str + :private-key s/Str + :port s/Num + :host s/Str + :scheme s/Str + :role s/Str}) + +(s/defschema ChannelConfig + {:chan-name s/Str + :database s/Str + :schema s/Str + :table s/Str + :on-error (s/enum :continue :abort)}) + +(s/defschema ServerConfig + {:host s/Str + :port s/Num}) + +(s/defschema EMQXConfig + {:client ClientConfig + :channels [ChannelConfig] + :server ServerConfig}) + +(defn get-config! + "Get the configurations for startup. + That config is read from a `config.edn` file in the app's working + directory." + [] + (let [params (with-open [r (io/reader "config.edn")] + (edn/read (java.io.PushbackReader. r))) + coercer (coerce/coercer! EMQXConfig coerce/keyword-enum-matcher) + config (coercer params) + app-name (or (System/getenv "APP_NAME") + (str (java.util.UUID/randomUUID)))] + (assoc config :app-name app-name))) diff --git a/src/emqx/core.clj b/src/emqx/core.clj index d9f5178..3269cb5 100644 --- a/src/emqx/core.clj +++ b/src/emqx/core.clj @@ -1,12 +1,13 @@ (ns emqx.core (:require [emqx.channel :as chan] - [emqx.http :as server] + [emqx.config :as config] + [emqx.http :as http-server] [taoensso.timbre :as log])) (defn -main [& args] - (prn "args" args) + (log/info :msg ::init-args :args args) (.addShutdownHook (Runtime/getRuntime) (Thread. ^Runnable @@ -19,5 +20,9 @@ (chan/stop-client) (log/info :msg ::shutting-down-agents) (shutdown-agents)))) - (chan/start-client) - (server/start)) + (let [{:keys [:app-name :client :channels :server]} (config/get-config!)] + ;; Should the client name be unique? + (chan/start-client client) + (doseq [chan-params channels] + (chan/ensure-streaming-agent app-name chan-params)) + (http-server/start server))) diff --git a/src/emqx/http.clj b/src/emqx/http.clj index 9947f7d..5c41fad 100644 --- a/src/emqx/http.clj +++ b/src/emqx/http.clj @@ -175,16 +175,16 @@ ::http/type :jetty ;; FIXME: for repl ::http/join? false - ::http/host "0.0.0.0" + ::http/host (get params :host "0.0.0.0") ::http/port (get params :port 9099)})) (defonce server (atom nil)) (defn start - [] - (reset! server (http/start (create-server {})))) + [params] + (reset! server (http/start (create-server params)))) (defn restart - [] + [params] (swap! server http/stop) - (reset! server (start))) + (reset! server (start params))) diff --git a/test/emqx/http_test.clj b/test/emqx/http_test.clj index d011347..8eb0df5 100644 --- a/test/emqx/http_test.clj +++ b/test/emqx/http_test.clj @@ -5,8 +5,9 @@ (:require [cheshire.core :as json] [clojure.test :refer [deftest is testing use-fixtures]] + [emqx.config :as config] [emqx.channel :as chan] - [emqx.http :as server] + [emqx.http :as http-server] [io.pedestal.http :as http] [io.pedestal.http.route :as route] [io.pedestal.test :refer [response-for]] @@ -14,15 +15,16 @@ (defn make-service [] - (::http/service-fn (http/create-servlet {::http/routes server/routes}))) + (::http/service-fn (http/create-servlet {::http/routes http-server/routes}))) -(def url-for (route/url-for-routes server/routes)) +(def url-for (route/url-for-routes http-server/routes)) (def ^:dynamic *service* nil) (defn client-fixture [f] - (chan/start-client) + (let [{:keys [:client]} (config/get-config!)] + (chan/start-client client)) (f) (chan/stop-client)) @@ -39,13 +41,13 @@ (defn get-channel [chan-name] (response-for *service* - :get (url-for ::server/get-channel + :get (url-for ::http-server/get-channel :path-params {:chan-name chan-name}))) #_(defn upsert-channel [chan-name chan-params] (response-for *service* - :post (url-for ::server/upsert-channel + :post (url-for ::http-server/upsert-channel :path-params {:chan-name chan-name}) :headers {"Content-Type" "application/json"} :body (json/generate-string chan-params))) @@ -53,7 +55,7 @@ #_(defn delete-channel [chan-name] (response-for *service* - :delete (url-for ::server/delete-channel + :delete (url-for ::http-server/delete-channel :path-params {:chan-name chan-name}))) #_(deftest ^:integration channels-test @@ -163,7 +165,7 @@ (defn insert-rows [chan-name body-params] (response-for *service* - :post (url-for ::server/insert-rows + :post (url-for ::http-server/insert-rows :path-params {:chan-name chan-name}) :headers {"Content-Type" "application/json"} :body (json/generate-string body-params))) @@ -176,12 +178,9 @@ (let [resp (insert-rows chan-name valid-rows)] (is (= 404 (:status resp))))) - (let [chan-params {:chan-name chan-name - :database "TESTDATABASE" - :schema "PUBLIC" - :table "TESTTABLE" - :on-error :continue}] - (chan/ensure-streaming-agent chan-params)) + (let [{:keys [:app-name :channels]} (config/get-config!)] + (doseq [chan-params channels] + (chan/ensure-streaming-agent app-name chan-params))) (testing "inserting valid rows" (let [resp (insert-rows chan-name valid-rows)]