From 3703a56cef53815b40fa9333003ff3bda2f9fae6 Mon Sep 17 00:00:00 2001 From: Bozhidar Batsov Date: Sat, 27 Jun 2026 15:17:24 +0300 Subject: [PATCH 1/3] Allow multiple clients with most-recent-wins takeover The server no longer rejects a second connection. It tracks all connected channels and routes evaluations to the most recently connected one, so a new client takes over while the others stay connected and their prints still flow. To make coexistence correct the server routes replies per-channel: a client's ping is ponged back to that client (not the active one, which would storm the passive clients' heartbeats), and an evaluation is tied to the channel it was sent to (one atom holding {:channel :promise}), so only that client's :result satisfies it and a foreign or stale result can't answer the wrong eval. A client that drops mid-eval unblocks the REPL with an error instead of hanging it. The :clients/:ready lifecycle is guarded by a lock and is robust to connects, disconnects and frames that race start/stop: a connection landing after stop is ignored, a late on-close after stop can't re-arm readiness, a socket that drops mid-handshake is not left as a zombie, and a send after stop raises the documented IOException rather than an NPE. --- src/clj/weasel/repl/server.clj | 132 ++++++++++++++++++------ src/clj/weasel/repl/websocket.clj | 59 +++++++---- test/clj/weasel/repl/server_test.clj | 40 ++++++- test/clj/weasel/repl/websocket_test.clj | 41 +++++++- 4 files changed, 215 insertions(+), 57 deletions(-) diff --git a/src/clj/weasel/repl/server.clj b/src/clj/weasel/repl/server.clj index bf43c4e..fdc02cb 100644 --- a/src/clj/weasel/repl/server.clj +++ b/src/clj/weasel/repl/server.clj @@ -3,53 +3,125 @@ (:import [java.io IOException])) (defonce state (atom {:server nil - :channel nil ; when the server starts, a - ; promise that derefs to a - ; channel when a client - ; connects - :response-fn nil})) + :clients [] ; connected channels, oldest first + :ready nil ; a promise, realized while at least one + ; client is connected and replaced with a + ; fresh one once the last client leaves; + ; nil while the server is stopped + :response-fn nil ; (fn [channel data]) + :on-disconnect nil})) ; (fn [channel]) + +;; Guards the :clients/:ready invariant (":ready is a realized promise iff +;; :clients is non-empty, and nil iff the server is stopped") so add/remove, +;; start and stop stay consistent. +(def ^:private lock (Object.)) + +(defn- add-client! [channel] + (locking lock + ;; ignore a connection that races server shutdown (:ready is nil when stopped) + (when (:ready @state) + (swap! state update :clients conj channel) + ;; idempotent: only the first client of an empty server realizes the promise + (deliver (:ready @state) true)))) + +(defn- remove-client! [channel] + (let [removed? + (locking lock + (when (some #(identical? % channel) (:clients @state)) + (swap! state + (fn [s] + (let [clients (vec (remove #(identical? % channel) (:clients s)))] + (assoc s :clients clients + ;; re-arm a fresh promise only while the server runs and + ;; the last client just left, so the next eval blocks + ;; until a client reconnects + :ready (if (seq clients) (:ready s) (promise)))))) + true))] + (when removed? + (when-let [f (:on-disconnect @state)] + (f channel))))) (defn handler [request] (if-not (:websocket? request) {:status 200 :body "Please connect with a websocket!"} (with-channel request channel - (if (realized? (:channel @state)) - (do - (http/send! channel (pr-str {:op :error, :type :occupied})) - (http/close channel)) - (do - (deliver (:channel @state) channel) - (on-close channel (fn [_] (swap! state assoc :channel (promise)))) - (on-receive channel (:response-fn @state))))))) + ;; multiple clients may connect; the most recent one wins evaluations, + ;; while the others stay connected so their prints still reach the REPL + (on-receive channel (fn [data] + (when-let [f (:response-fn @state)] + (f channel data)))) + (add-client! channel) + ;; register the close handler only after adding the client: http-kit + ;; invokes it synchronously if the socket is already closed, so this + ;; removes a socket that dropped mid-handshake instead of leaving it as a + ;; zombie in :clients + (on-close channel (fn [_] (remove-client! channel)))))) + +(defn active-channel + "Blocks until at least one client is connected, then returns the channel that + should receive evaluations: the most recently connected one. Throws once the + server is stopped." + [] + (loop [] + (if-let [ready (:ready @state)] + (do + (deref ready) + (or (peek (:clients @state)) (recur))) + (throw (IOException. "WebSocket server not started!"))))) + +(defn send-to! + "Sends `msg` to a specific `channel`. Returns false when the channel is + already closed." + [channel msg] + (http/send! channel msg)) (defn send! + "Sends `msg` to the active (most recently connected) client, blocking until a + client is connected." [msg] - (if-let [channel (:channel @state)] - (http/send! (deref channel) msg) - (throw (IOException. "WebSocket server not started!")))) + (send-to! (active-channel) msg)) + +(defn ready + "Returns the promise that is realized while at least one client is connected." + [] + (:ready @state)) -(defn channel [] - (:channel @state)) +(defn on-disconnect! + "Registers a one-arg function invoked with a channel whenever a client + disconnects." + [f] + (swap! state assoc :on-disconnect f)) (defn start [f & {:keys [ip port] :as opts}] {:pre [(ifn? f)]} - (swap! state - assoc :server (http/run-server #'handler opts) - :channel (promise) - :response-fn f)) + (locking lock + (swap! state assoc + :server (http/run-server #'handler opts) + :clients [] + :ready (promise) + :response-fn f + :on-disconnect nil))) (defn stop [] - (let [stop-server (:server @state)] - (when-not (nil? stop-server) - (stop-server) - (reset! state {:server nil - :channel nil - :response-fn nil}) - @state))) + (locking lock + (let [stop-server (:server @state) + ready (:ready @state)] + (when-not (nil? stop-server) + (stop-server) + ;; wake anything blocked in active-channel/wait-for-client so it can + ;; observe the stopped server and bail out cleanly + (when ready (deliver ready true)) + (reset! state {:server nil + :clients [] + :ready nil + :response-fn nil + :on-disconnect nil}) + @state)))) (defn wait-for-client [] - (deref (:channel @state)) + (when-let [ready (:ready @state)] + (deref ready)) nil) (defn restart [] diff --git a/src/clj/weasel/repl/websocket.clj b/src/clj/weasel/repl/websocket.clj index 2fbc02c..bc6b75d 100644 --- a/src/clj/weasel/repl/websocket.clj +++ b/src/clj/weasel/repl/websocket.clj @@ -10,16 +10,18 @@ "stores the value of *out* when the server is started" (atom nil)) -(def ^:private client-response - "stores a promise fulfilled by a client's eval response" +(def ^:private pending-eval + "the outstanding evaluation as {:channel ch :promise p}, or nil. Keeping the + target channel and its result promise in one atom means they are always read + and written together, so only the client an eval was sent to can satisfy it." (atom nil)) (declare - send-for-eval! websocket-setup-env websocket-eval load-javascript websocket-tear-down-env + on-client-disconnect transitive-deps) (defrecord WebsocketEnv [] @@ -37,14 +39,25 @@ :port 9001} opts)) +(def ^:private disconnect-result + {:status :exception + :value "Weasel client disconnected before returning a result" + :stacktrace "No stacktrace available."}) + +(defn- deliver-if-active! + "Delivers `value` to the outstanding evaluation, but only if it was sent to + `channel` - so a stale or foreign message can't satisfy the wrong eval." + [channel value] + (when-let [{:keys [promise] ch :channel} @pending-eval] + (when (= channel ch) + (deliver promise value)))) + (defmulti ^:private process-message (fn [_ msg] (:op msg))) (defmethod process-message :result - [_ message] - (let [result (:value message)] - (when-not (nil? @client-response) - (deliver @client-response result)))) + [channel message] + (deliver-if-active! channel (:value message))) (defmethod process-message :print @@ -59,20 +72,28 @@ (defmethod process-message :ping - [_ _] - (server/send! (pr-str {:op :pong}))) + [channel _] + ;; pong back to the client that pinged, not the active one + (server/send-to! channel (pr-str {:op :pong}))) (defmethod process-message :default [_ _]) +(defn- on-client-disconnect + "Unblocks an outstanding evaluation when the client it was sent to drops, so + the REPL reports an error instead of hanging forever." + [channel] + (deliver-if-active! channel disconnect-result)) + (defn- websocket-setup-env [this opts] (reset! repl-out *out*) (server/start - (fn [data] (process-message this (read-string data))) + (fn [channel data] (process-message channel (read-string data))) :ip (:ip this) :port (:port this)) + (server/on-disconnect! on-client-disconnect) (let [{:keys [ip pre-connect]} this] (let [port (-> @server/state :server meta :local-port)] (println (str "<< started Weasel server on ws://" ip ":" port " >>"))) @@ -90,16 +111,18 @@ (defn- websocket-eval [js] - (reset! client-response (promise)) - (send-for-eval! js) - (let [ret @@client-response] - (reset! client-response nil) - ret)) + (let [channel (server/active-channel) + p (promise)] + (reset! pending-eval {:channel channel :promise p}) + ;; if the channel is already closed the message is silently dropped, so + ;; surface that immediately rather than waiting for a result that won't come + (when (false? (server/send-to! channel (pr-str {:op :eval-js, :code js}))) + (deliver p disconnect-result)) + (let [ret @p] + (reset! pending-eval nil) + ret))) (defn- load-javascript [_ provides _] (websocket-eval (str "goog.require('" (cmp/munge (first provides)) "')"))) - -(defn- send-for-eval! [js] - (server/send! (pr-str {:op :eval-js, :code js}))) diff --git a/test/clj/weasel/repl/server_test.clj b/test/clj/weasel/repl/server_test.clj index 9fa2623..3de1892 100644 --- a/test/clj/weasel/repl/server_test.clj +++ b/test/clj/weasel/repl/server_test.clj @@ -5,19 +5,51 @@ (deftest start-and-stop (testing "starting the server populates the shared state" (try - (server/start (fn [_]) :ip "127.0.0.1" :port 0) + (server/start (fn [& _]) :ip "127.0.0.1" :port 0) (is (some? (:server @server/state)) "a stop fn is stored") - (is (instance? clojure.lang.IPending (:channel @server/state)) + (is (instance? clojure.lang.IPending (:ready @server/state)) "a pending client promise is stored") (finally (server/stop)))) (testing "stopping the server clears the shared state" (is (nil? (:server @server/state))) - (is (nil? (:channel @server/state))) - (is (nil? (:response-fn @server/state))))) + (is (nil? (:ready @server/state))) + (is (nil? (:response-fn @server/state))) + (is (empty? (:clients @server/state))))) (deftest send-without-server-throws (testing "sending with no running server raises an IOException" (server/stop) (is (thrown? java.io.IOException (server/send! "anything"))))) + +(deftest most-recent-client-wins + (testing "the active channel is the most recently connected client" + (server/start (fn [& _]) :ip "127.0.0.1" :port 0) + (try + (#'server/add-client! :client-a) + (is (= :client-a (server/active-channel))) + (#'server/add-client! :client-b) + (is (= :client-b (server/active-channel)) "newest client takes over") + (#'server/remove-client! :client-b) + (is (= :client-a (server/active-channel)) "falls back to the remaining client") + (finally + (server/stop))))) + +(deftest stop-wakes-blocked-waiter + (testing "stopping the server unblocks a thread waiting for a client" + (server/start (fn [& _]) :ip "127.0.0.1" :port 0) + (let [waiter (future (server/wait-for-client) :woke)] + (Thread/sleep 100) + (is (not (realized? waiter)) "blocks while no client is connected") + (server/stop) + (is (= :woke (deref waiter 1000 ::timeout)) "stop wakes the waiter")))) + +(deftest stale-disconnect-after-stop-leaves-server-stopped + (testing "a late client close after stop does not re-arm the readiness promise" + (server/start (fn [& _]) :ip "127.0.0.1" :port 0) + (#'server/add-client! :client-a) + (server/stop) + (#'server/remove-client! :client-a) ; the channel's on-close firing late + (is (nil? (:ready @server/state)) "the server stays stopped") + (is (thrown? java.io.IOException (server/active-channel))))) diff --git a/test/clj/weasel/repl/websocket_test.clj b/test/clj/weasel/repl/websocket_test.clj index 4619a36..e6ae7fd 100644 --- a/test/clj/weasel/repl/websocket_test.clj +++ b/test/clj/weasel/repl/websocket_test.clj @@ -1,12 +1,43 @@ (ns weasel.repl.websocket-test - (:require [clojure.test :refer [deftest is testing]] + (:require [clojure.test :refer [deftest is testing use-fixtures]] [clojure.edn :as edn] [weasel.repl.server :as server] [weasel.repl.websocket :as websocket])) +(defn- reset-eval-state! [] + (reset! @#'websocket/pending-eval nil)) + +(use-fixtures :each (fn [t] (reset-eval-state!) (t) (reset-eval-state!))) + (deftest ping-is-answered-with-pong - (testing "a :ping message makes the server send a :pong back" + (testing "a :ping is answered with a :pong sent back to the pinging client" (let [sent (atom nil)] - (with-redefs [server/send! (fn [msg] (reset! sent msg))] - (#'websocket/process-message ::ignored {:op :ping})) - (is (= {:op :pong} (edn/read-string @sent)))))) + (with-redefs [server/send-to! (fn [channel msg] (reset! sent {:channel channel :msg msg}))] + (#'websocket/process-message :the-channel {:op :ping})) + (is (= :the-channel (:channel @sent)) "pong goes to the client that pinged") + (is (= {:op :pong} (edn/read-string (:msg @sent))))))) + +(deftest result-correlation + (testing "only the channel an eval was sent to may answer it" + (let [response (promise)] + (reset! @#'websocket/pending-eval {:channel :ch-a :promise response}) + (#'websocket/process-message :ch-b {:op :result :value "wrong"}) + (is (not (realized? response)) "a foreign client's result is ignored") + (#'websocket/process-message :ch-a {:op :result :value "right"}) + (is (= "right" (deref response 100 ::timeout)) "the eval's client answers it")))) + +(deftest disconnect-unblocks-pending-eval + (testing "the eval's client disconnecting unblocks the pending eval" + (let [response (promise)] + (reset! @#'websocket/pending-eval {:channel :ch-a :promise response}) + (#'websocket/on-client-disconnect :ch-b) + (is (not (realized? response)) "an unrelated disconnect leaves the eval pending") + (#'websocket/on-client-disconnect :ch-a) + (is (= :exception (:status (deref response 100 ::timeout))) + "the eval reports an exception instead of hanging")))) + +(deftest eval-to-closed-channel-errors + (testing "evaluating against an already-closed client returns an error, not a hang" + (with-redefs [server/active-channel (fn [] :closed-channel) + server/send-to! (fn [_ _] false)] + (is (= :exception (:status (#'websocket/websocket-eval "1 + 1"))))))) From 7aef098a8fbe72b710608a0717608f1d70990827 Mon Sep 17 00:00:00 2001 From: Bozhidar Batsov Date: Sat, 27 Jun 2026 15:17:24 +0300 Subject: [PATCH 2/3] Cover takeover and coexistence in the integration test The Node client can stamp a global id; the harness connects a second client, asserts the eval targets it, and then checks both clients keep their heartbeats answered so neither reconnects. --- dev/weasel/integration.clj | 39 ++++++++++++++++++++++++------- test/cljs/weasel/node_client.cljs | 10 ++++++-- 2 files changed, 39 insertions(+), 10 deletions(-) diff --git a/dev/weasel/integration.clj b/dev/weasel/integration.clj index b4d8509..dcb5709 100644 --- a/dev/weasel/integration.clj +++ b/dev/weasel/integration.clj @@ -6,6 +6,7 @@ * the heartbeat (the client keeps pinging, the server pongs, and the connection is not torn down) * auto-reconnect after the server is bounced + * takeover: a second client becomes the active eval target Run with: @@ -26,7 +27,7 @@ (defn- fresh-round! [] (reset! signals {:ready (promise) :printed (promise) :result (promise)})) -(defn- handle [data] +(defn- handle [channel data] (let [msg (edn/read-string data) {:keys [ready printed result]} @signals] (case (:op msg) @@ -34,7 +35,8 @@ :print (deliver printed (:value msg)) :result (deliver result (:value msg)) :ping (do (swap! ping-count inc) - (server/send! (pr-str {:op :pong}))) + ;; pong back to the client that pinged, like the real server + (server/send-to! channel (pr-str {:op :pong}))) nil))) (defn- await! [what p] @@ -46,20 +48,24 @@ (defn- wait-for-client! [what] ;; like server/wait-for-client, but bounded so a broken reconnect fails the ;; run instead of hanging forever - (when (= ::timeout (deref (server/channel) 10000 ::timeout)) + (when (= ::timeout (deref (server/ready) 10000 ::timeout)) (throw (ex-info (str "timed out waiting for " what) {})))) (defn- eval! [code] (server/send! (pr-str {:op :eval-js :code code}))) +(defn- ^Process launch-client [id] + (let [^"[Ljava.lang.String;" cmd (into-array String + ["node" client-js (str "ws://127.0.0.1:" port) + (str heartbeat-ms) id])] + (-> (ProcessBuilder. cmd) (.inheritIO) (.start)))) + (defn -main [& _] (let [ok? (atom false)] (fresh-round!) (server/start handle :ip "127.0.0.1" :port port) - (let [^"[Ljava.lang.String;" cmd (into-array String - ["node" client-js (str "ws://127.0.0.1:" port) - (str heartbeat-ms)]) - proc (-> (ProcessBuilder. cmd) (.inheritIO) (.start))] + (let [proc (launch-client "A") + proc-b (atom nil)] (try ;; round 1 - result and print travel back over the socket (wait-for-client! "client to connect") @@ -90,13 +96,30 @@ (assert (= "42" (:value (await! ":result (after reconnect)" (:result @signals)))) "reconnected eval returned the wrong value") + ;; round 3 - a second client connects and takes over as the eval target; + ;; client A stays connected (passive) + (fresh-round!) + (reset! proc-b (launch-client "B")) + (await! ":ready (client B)" (:ready @signals)) + (let [ready-at-takeover @ready-count] + (eval! "globalThis.CLIENT_ID") + (assert (= "B" (:value (await! ":result (takeover)" (:result @signals)))) + "eval did not target the most recently connected client") + ;; both clients keep pinging; with pongs routed to the pinger neither + ;; is torn down, so no client reconnects (which would bump ready-count) + (Thread/sleep (long (* 5 heartbeat-ms))) + (assert (= ready-at-takeover @ready-count) + (str "a client reconnected during coexistence (pong misrouting?); " + ":ready count went " ready-at-takeover " -> " @ready-count))) + (reset! ok? true) (println (str "PASS - eval, print, heartbeat (" @ping-count - " pings) and reconnect all verified")) + " pings), reconnect and takeover all verified")) (catch Throwable e (println "FAIL:" (.getMessage e))) (finally (.destroy proc) + (when-let [^Process p @proc-b] (.destroy p)) (server/stop)))) (when-not @ok? (System/exit 1)))) diff --git a/test/cljs/weasel/node_client.cljs b/test/cljs/weasel/node_client.cljs index 1d7cbb6..90be099 100644 --- a/test/cljs/weasel/node_client.cljs +++ b/test/cljs/weasel/node_client.cljs @@ -2,12 +2,18 @@ "A tiny Node entry point used by the integration test to prove that the REPL client works outside the browser, on a native `WebSocket`. - Args: [heartbeat-interval-ms]" + Args: [heartbeat-interval-ms] [client-id] + + When a client-id is given it is stashed on the global object so the test can + evaluate `globalThis.CLIENT_ID` and confirm which client handled the eval." (:require [weasel.repl :as repl])) (defn -main [& args] (let [url (or (first args) "ws://127.0.0.1:9001") - hb (some-> (second args) (js/parseInt 10))] + hb (some-> (second args) (js/parseInt 10)) + id (nth args 2 nil)] + (when id + (set! (.-CLIENT_ID js/globalThis) id)) (apply repl/connect url :verbose false (if hb [:heartbeat-interval hb] [])))) From e9ce9b247bb9337779eaf049cdd2f7a135273df5 Mon Sep 17 00:00:00 2001 From: Bozhidar Batsov Date: Sat, 27 Jun 2026 15:17:24 +0300 Subject: [PATCH 3/3] Document multi-client takeover --- CHANGES.md | 5 +++++ README.md | 8 +++++--- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 47497ac..366ee90 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -29,6 +29,11 @@ silently dead connection and triggers a reconnect. It is off by default and enabled via the `:heartbeat-interval` option; it never disrupts a server that doesn't answer pings. +* The server no longer rejects a second client. Several clients may be connected + at once; evaluations go to the most recently connected one (so a new client + takes over the REPL) while the others stay connected and their output still + reaches the REPL. An evaluation whose target client disconnects mid-flight now + reports an error instead of hanging the REPL. * Ship a `deps.edn` so the library can be consumed via the Clojure CLI / tools.deps. * Add a GitHub Actions CI pipeline and a basic test suite, including a Node round-trip integration test that exercises the full eval cycle over a real diff --git a/README.md b/README.md index 5cf51c4..aa764e2 100644 --- a/README.md +++ b/README.md @@ -141,9 +141,11 @@ java.io.IOException: No client connected to Websocket nil ``` -Only a single client can be connected to the REPL at once. Attempting -to connect to an occupied REPL server will throw an exception in the -client. +More than one client may be connected at once. Evaluations are sent to the +most recently connected client, so a newly connected client takes over the +REPL; the others stay connected and their printed output still reaches the +REPL. This pairs naturally with auto-reconnect - a client that drops and +comes back simply becomes the active one again. ## Example