Skip to content

Commit

Permalink
Merge pull request #7778 from NBKelly/rework-threading-model
Browse files Browse the repository at this point in the history
Rework threading model
  • Loading branch information
NBKelly authored Oct 4, 2024
2 parents 8c000dd + b01e061 commit 1e5b0d1
Show file tree
Hide file tree
Showing 7 changed files with 236 additions and 80 deletions.
20 changes: 15 additions & 5 deletions src/clj/web/app_state.clj
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
(ns web.app-state
(:require
[clojure.core.async :refer [<! go timeout]]
[cljc.java-time.temporal.chrono-unit :as chrono]
[cljc.java-time.instant :as inst]
[taoensso.encore :as enc]
[medley.core :refer [dissoc-in find-first]]))

(defonce app-state
(atom {:lobbies {}
:lobby-updates {}
:users {}}))

(defonce lobby-subs-timeout-hours 6)
(defonce lobby-subs-timeout-hours 1)

(defn register-user
[app-state uid user]
Expand Down Expand Up @@ -56,7 +58,6 @@
[uid user]
(swap! app-state register-user uid user))


(defn pause-lobby-updates
[uid]
(swap! app-state dissoc-in [:lobby-updates uid]))
Expand All @@ -66,14 +67,23 @@
(swap! app-state assoc-in [:lobby-updates uid] (inst/now)))

(defn receive-lobby-updates?
"checks if a user receives lobby updates, and updates the state if they've timed out to amortize subsequent checks. Mutates"
[uid]
(if-let [last-ping (get-in @app-state [:lobby-updates uid])]
(.isBefore (inst/now) (inst/plus last-ping lobby-subs-timeout-hours chrono/hours))
(do (pause-lobby-updates uid)
nil)))
(if (.isBefore (inst/now) (inst/plus last-ping lobby-subs-timeout-hours chrono/hours))
true
(do (pause-lobby-updates uid) nil))
(do (pause-lobby-updates uid) nil)))

(defn deregister-user!
"Remove user from app-state. Mutates."
[uid]
(pause-lobby-updates uid)
(swap! app-state dissoc-in [:users uid]))

(defonce lobby-subs-clearout-freq (enc/ms :mins 5))
(defonce cleanup-lobby-subs
(go (while true
(<! (timeout lobby-subs-clearout-freq))
(doseq [{uid :uid} (vals (:users @app-state))]
(receive-lobby-updates? uid)))))
22 changes: 16 additions & 6 deletions src/clj/web/chat.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
[monger.collection :as mc]
[monger.query :as q]
[web.app-state :as app-state]
[web.lobby :as lobby]
[web.mongodb :refer [->object-id]]
[web.user :refer [active-user? visible-to-user]]
[web.utils :refer [response mongo-time-to-utc-string]]
Expand Down Expand Up @@ -67,7 +68,9 @@
chat-settings :system/chat
user :user} :ring-req
uid :uid
{:keys [channel msg]} :?data}]
{:keys [channel msg]} :?data
id :id
timestamp :timestamp}]
(when (and (active-user? user)
(not (s/blank? msg)))
(let [len-valid (<= (count msg) (chat-max-length chat-settings))
Expand All @@ -88,13 +91,16 @@
(visible-to-user user {:username uid} connected-users))]
(ws/broadcast-to! [uid] :chat/message inserted)))
(when uid
(ws/broadcast-to! [uid] :chat/blocked {:reason (if len-valid :rate-exceeded :length-exceeded)}))))))
(ws/broadcast-to! [uid] :chat/blocked {:reason (if len-valid :rate-exceeded :length-exceeded)})))))
(lobby/log-delay! timestamp id))

(defmethod ws/-msg-handler :chat/delete-msg
chat--delete-msg
[{{db :system/db
{:keys [username isadmin ismoderator] :as user} :user} :ring-req
{:keys [msg]} :?data}]
{:keys [msg]} :?data
id :id
timestamp :timestamp}]
(when-let [id (:_id msg)]
(when (or isadmin ismoderator)
(println username "deleted message" msg "\n")
Expand All @@ -105,13 +111,16 @@
:date (inst/now)
:msg msg})
(doseq [uid (ws/connected-uids)]
(ws/broadcast-to! [uid] :chat/delete-msg msg)))))
(ws/broadcast-to! [uid] :chat/delete-msg msg))))
(lobby/log-delay! timestamp id))

(defmethod ws/-msg-handler :chat/delete-all
chat--delete-all
[{{db :system/db
{:keys [username isadmin ismoderator]} :user :as user} :ring-req
{:keys [sender]} :?data}]
{:keys [sender]} :?data
id :id
timestamp :timestamp}]
(when (and sender
(or isadmin ismoderator))
(println username "deleted all messages from user" sender "\n")
Expand All @@ -122,4 +131,5 @@
:date (inst/now)
:sender sender})
(doseq [uid (ws/connected-uids)]
(ws/broadcast-to! [uid] :chat/delete-all {:username sender}))))
(ws/broadcast-to! [uid] :chat/delete-all {:username sender})))
(lobby/log-delay! timestamp id))
8 changes: 6 additions & 2 deletions src/clj/web/decks.clj
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
[jinteki.validator :refer [calculate-deck-status]]
[monger.collection :as mc]
[monger.result :refer [acknowledged?]]
[web.lobby :as lobby]
[web.mongodb :refer [->object-id ->object-id]]
[web.nrdb :as nrdb]
[web.utils :refer [response mongo-time-to-utc-string]]
Expand Down Expand Up @@ -101,7 +102,9 @@
[{{db :system/db
{username :username} :user} :ring-req
uid :uid
{:keys [input]} :?data}]
{:keys [input]} :?data
id :id
timestamp :timestamp}]
(try
(let [deck (nrdb/download-public-decklist db input)]
(if (every? #(contains? deck %) [:name :identity :cards])
Expand All @@ -116,4 +119,5 @@
(ws/broadcast-to! [uid] :decks/import-success "Imported"))
(ws/broadcast-to! [uid] :decks/import-failure "Failed to parse imported deck.")))
(catch Exception _
(ws/broadcast-to! [uid] :decks/import-failure "Failed to import deck."))))
(ws/broadcast-to! [uid] :decks/import-failure "Failed to import deck.")))
(lobby/log-delay! timestamp id))
77 changes: 55 additions & 22 deletions src/clj/web/game.clj
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,9 @@
game--start
[{{db :system/db} :ring-req
uid :uid
{gameid :gameid} :?data}]
{gameid :gameid} :?data
id :id
timestamp :timestamp}]
(lobby/lobby-thread
(let [{:keys [players started] :as lobby} (app-state/get-lobby gameid)]
(when (and lobby (lobby/first-player? uid lobby) (not started))
Expand All @@ -186,14 +188,17 @@
(stats/game-started db lobby?)
(lobby/send-lobby-state lobby?)
(lobby/broadcast-lobby-list)
(send-state-to-participants :game/start lobby? (diffs/public-states (:state lobby?)))))))))
(send-state-to-participants :game/start lobby? (diffs/public-states (:state lobby?)))))))
(lobby/log-delay! timestamp id)))

(defmethod ws/-msg-handler :game/leave
game--leave
[{{db :system/db user :user} :ring-req
uid :uid
{gameid :gameid} :?data
?reply-fn :?reply-fn}]
?reply-fn :?reply-fn
id :id
timestamp :timestamp}]
(lobby/lobby-thread
(let [{:keys [started state] :as lobby} (app-state/get-lobby gameid)]
(when (and lobby (lobby/in-lobby? uid lobby) started state)
Expand All @@ -203,7 +208,8 @@
lobby? nil nil (str (:username user) " has left the game.")))
(lobby/send-lobby-list uid)
(lobby/broadcast-lobby-list)
(when ?reply-fn (?reply-fn true))))))
(when ?reply-fn (?reply-fn true))))
(lobby/log-delay! timestamp id)))

(defn uid-in-lobby-as-original-player? [uid]
(find-first
Expand All @@ -215,7 +221,9 @@
game--rejoin
[{{user :user} :ring-req
uid :uid
?data :?data}]
?data :?data
id :id
timestamp :timestamp}]
(lobby/lobby-thread
(let [{:keys [original-players started players] :as lobby} (uid-in-lobby-as-original-player? uid)
original-player (find-first #(= uid (:uid %)) original-players)]
Expand All @@ -226,24 +234,30 @@
lobby? (lobby/join-lobby! user uid ?data nil lobby)]
(when lobby?
(send-state-to-uid! uid :game/start lobby? (diffs/public-states (:state lobby?)))
(update-and-send-diffs! main/handle-rejoin lobby? user)))))))
(update-and-send-diffs! main/handle-rejoin lobby? user)))))
(lobby/log-delay! timestamp id)))

(defmethod ws/-msg-handler :game/concede
game--concede
[{uid :uid
{gameid :gameid} :?data}]
{gameid :gameid} :?data
id :id
timestamp :timestamp}]
(let [lobby (app-state/get-lobby gameid)
player (lobby/player? uid lobby)]
(lobby/game-thread
lobby
(when (and lobby player)
(let [side (side-from-str (:side player))]
(update-and-send-diffs! main/handle-concede lobby side))))))
(update-and-send-diffs! main/handle-concede lobby side)))
(lobby/log-delay! timestamp id))))

(defmethod ws/-msg-handler :game/action
game--action
[{uid :uid
{:keys [gameid command args]} :?data}]
{:keys [gameid command args]} :?data
id :id
timestamp :timestamp}]
(try
(let [{:keys [state] :as lobby} (app-state/get-lobby gameid)
player (lobby/player? uid lobby)
Expand All @@ -268,7 +282,8 @@
:players (map #(select-keys % [:uid :side]) (:players lobby))
:spectators (map #(select-keys % [:uid]) (:spectators lobby))
:command command
:args args})))))
:args args})))
(lobby/log-delay! timestamp id)))
(catch Exception e
(ws/chsk-send! uid [:game/error])
(println (str "Caught exception"
Expand All @@ -278,7 +293,9 @@
(defmethod ws/-msg-handler :game/resync
game--resync
[{uid :uid
{gameid :gameid} :?data}]
{gameid :gameid} :?data
id :id
timestamp :timestamp}]
(let [lobby (app-state/get-lobby gameid)]
(lobby/game-thread
lobby
Expand All @@ -290,14 +307,17 @@
"\nGameID by ClientID:" gameid
"\nClientID:" uid
"\nPlayers:" (map #(select-keys % [:uid :side]) (:players lobby))
"\nSpectators" (map #(select-keys % [:uid]) (:spectators lobby)))))))))
"\nSpectators" (map #(select-keys % [:uid]) (:spectators lobby))))))
(lobby/log-delay! timestamp id))))

(defmethod ws/-msg-handler :game/watch
game--watch
[{{user :user} :ring-req
uid :uid
{:keys [gameid password request-side]} :?data
?reply-fn :?reply-fn}]
?reply-fn :?reply-fn
id :id
timestamp :timestamp}]
(lobby/lobby-thread
(let [lobby (app-state/get-lobby gameid)]
(when (and lobby (lobby/allowed-in-lobby user lobby))
Expand All @@ -322,13 +342,16 @@
(false? correct-password?)
(when ?reply-fn (?reply-fn 403))
:else
(when ?reply-fn (?reply-fn 404))))))))
(when ?reply-fn (?reply-fn 404))))))
(lobby/log-delay! timestamp id)))

(defmethod ws/-msg-handler :game/mute-spectators
game--mute-spectators
[{{user :user} :ring-req
uid :uid
{gameid :gameid} :?data}]
{gameid :gameid} :?data
id :id
timestamp :timestamp}]
(let [new-app-state (swap! app-state/app-state update :lobbies #(-> %
(lobby/handle-toggle-spectator-mute gameid uid)
(lobby/handle-set-last-update gameid uid)))
Expand All @@ -340,13 +363,16 @@
lobby?
(handle-message-and-send-diffs! lobby? nil nil (str (:username user) " " message " spectators."))
;; needed to update the status bar
(lobby/send-lobby-state lobby?)))))
(lobby/send-lobby-state lobby?)
(lobby/log-delay! timestamp id)))))

(defmethod ws/-msg-handler :game/say
game--say
[{{user :user} :ring-req
uid :uid
{:keys [gameid msg]} :?data}]
{:keys [gameid msg]} :?data
id :id
timestamp :timestamp}]
(let [new-app-state (swap! app-state/app-state update :lobbies lobby/handle-set-last-update gameid uid)
{:keys [state mute-spectators] :as lobby?} (get-in new-app-state [:lobbies gameid])
side (cond+
Expand All @@ -355,25 +381,31 @@
(when (and lobby? state side)
(lobby/game-thread
lobby?
(handle-message-and-send-diffs! lobby? side user msg)))))
(handle-message-and-send-diffs! lobby? side user msg)
(lobby/log-delay! timestamp id)))))

(defmethod ws/-msg-handler :game/typing
game--typing
[{uid :uid
{:keys [gameid typing]} :?data}]
{:keys [gameid typing]} :?data
id :id
timestamp :timestamp}]
(let [{:keys [state players] :as lobby} (app-state/get-lobby gameid)]
(lobby/game-thread
lobby
(when (and state (lobby/player? uid lobby))
(doseq [{:keys [uid]} (remove #(= uid (:uid %)) players)]
(ws/chsk-send! uid [:game/typing typing]))))))
(ws/chsk-send! uid [:game/typing typing])))
(lobby/log-delay! timestamp id))))

(defmethod ws/-msg-handler :chsk/uidport-close
chsk--uidport-close
[{{db :system/db
user :user} :ring-req
uid :uid
?reply-fn :?reply-fn}]
?reply-fn :?reply-fn
id :id
timestamp :timestamp}]
(lobby/lobby-thread
(let [{:keys [started state] :as lobby} (app-state/uid->lobby uid)]
(when (and started state)
Expand All @@ -384,4 +416,5 @@
(lobby/send-lobby-list uid)
(lobby/broadcast-lobby-list)
(app-state/deregister-user! uid)
(when ?reply-fn (?reply-fn true))))
(when ?reply-fn (?reply-fn true))
(lobby/log-delay! timestamp id)))
Loading

0 comments on commit 1e5b0d1

Please sign in to comment.