diff --git a/.gitignore b/.gitignore index f0e629bf..a241bb1e 100644 --- a/.gitignore +++ b/.gitignore @@ -47,3 +47,4 @@ RxCodeMobile/GoogleService-Info.plist RxCodeAndroid/app/google-services.json .env *.log +android/.idea diff --git a/Packages/Sources/RxCodeSync/Protocol/ICEPayloads.swift b/Packages/Sources/RxCodeSync/Protocol/ICEPayloads.swift new file mode 100644 index 00000000..ff45edd0 --- /dev/null +++ b/Packages/Sources/RxCodeSync/Protocol/ICEPayloads.swift @@ -0,0 +1,64 @@ +import Foundation + +/// Direct-path (P2P) signaling payloads. +/// +/// These are exchanged between two paired devices **over the relay** (the always- +/// warm signaling channel) to negotiate a direct connection, then consumed +/// entirely inside `PeerConnectionManager`. They never reach the app layer — an +/// older peer that predates this feature decodes them as `Payload.unknown` and +/// ignores them, so no version gating is required. + +/// A single reachable address a peer offers for a direct connection. +public struct ICECandidate: Codable, Sendable, Equatable { + public enum Kind: String, Codable, Sendable { + /// Discovered via Bonjour; connect by resolving the service name. + case lanBonjour + /// A raw local interface IP:port (mDNS-blocked subnets). Phase 2. + case lanInterface + /// A mapped public IP:port obtained via NAT-PMP/PCP. Phase 3. + case wanMapped + } + + public let kind: Kind + /// IP literal for `.lanInterface`/`.wanMapped`; unused for `.lanBonjour`. + public let host: String + public let port: Int + /// Bonjour service instance name, set only for `.lanBonjour`. + public let bonjourName: String? + + public init(kind: Kind, host: String, port: Int, bonjourName: String? = nil) { + self.kind = kind + self.host = host + self.port = port + self.bonjourName = bonjourName + } +} + +/// The set of candidates one peer offers for a given probing round. +public struct ICECandidatesPayload: Codable, Sendable { + /// Groups a probing round so stale offers can be ignored. + public let sessionID: UUID + public let candidates: [ICECandidate] + /// What the sender believes its own public IP is (Phase 3; nil until then). + public let observedPublicIP: String? + + public init(sessionID: UUID, candidates: [ICECandidate], observedPublicIP: String? = nil) { + self.sessionID = sessionID + self.candidates = candidates + self.observedPublicIP = observedPublicIP + } +} + +/// Handshake probe/echo sent **over a candidate direct link itself** to confirm +/// it is truly usable (not a half-open NAT that TCP-connects then black-holes) +/// before the coordinator promotes it to the active path. +public struct ICESelectedPayload: Codable, Sendable { + public let sessionID: UUID + /// `false` = probe from the initiator; `true` = echo reply from the peer. + public let echo: Bool + + public init(sessionID: UUID, echo: Bool) { + self.sessionID = sessionID + self.echo = echo + } +} diff --git a/Packages/Sources/RxCodeSync/Protocol/Payload.swift b/Packages/Sources/RxCodeSync/Protocol/Payload.swift index cd3286d7..068eefc4 100644 --- a/Packages/Sources/RxCodeSync/Protocol/Payload.swift +++ b/Packages/Sources/RxCodeSync/Protocol/Payload.swift @@ -70,6 +70,10 @@ public enum Payload: Sendable { case autopilotResult(AutopilotResultPayload) case ping(PingPayload) case pong(PongPayload) + // Direct-path (P2P) signaling — exchanged over the relay, consumed inside + // `PeerConnectionManager`, never surfaced to the app layer. See ICEPayloads.swift. + case iceCandidates(ICECandidatesPayload) + case iceSelected(ICESelectedPayload) case unknown(type: String) } @@ -139,6 +143,8 @@ public extension Payload { case .autopilotResult: return "autopilot_result" case .ping: return "ping" case .pong: return "pong" + case .iceCandidates: return "ice_candidates" + case .iceSelected: return "ice_selected" case .unknown(let type): return type } } @@ -786,6 +792,8 @@ extension Payload: Codable { case autopilotResult = "autopilot_result" case ping case pong + case iceCandidates = "ice_candidates" + case iceSelected = "ice_selected" } public init(from decoder: Decoder) throws { @@ -859,6 +867,8 @@ extension Payload: Codable { case .autopilotResult: self = .autopilotResult(try container.decode(AutopilotResultPayload.self, forKey: .data)) case .ping: self = .ping(try container.decode(PingPayload.self, forKey: .data)) case .pong: self = .pong(try container.decode(PongPayload.self, forKey: .data)) + case .iceCandidates: self = .iceCandidates(try container.decode(ICECandidatesPayload.self, forKey: .data)) + case .iceSelected: self = .iceSelected(try container.decode(ICESelectedPayload.self, forKey: .data)) } } @@ -928,6 +938,8 @@ extension Payload: Codable { case .autopilotResult(let p): try container.encode(TypeKey.autopilotResult.rawValue, forKey: .type); try container.encode(p, forKey: .data) case .ping(let p): try container.encode(TypeKey.ping.rawValue, forKey: .type); try container.encode(p, forKey: .data) case .pong(let p): try container.encode(TypeKey.pong.rawValue, forKey: .type); try container.encode(p, forKey: .data) + case .iceCandidates(let p): try container.encode(TypeKey.iceCandidates.rawValue, forKey: .type); try container.encode(p, forKey: .data) + case .iceSelected(let p): try container.encode(TypeKey.iceSelected.rawValue, forKey: .type); try container.encode(p, forKey: .data) case .unknown(let type): try container.encode(type, forKey: .type) } } diff --git a/Packages/Sources/RxCodeSync/SyncClient.swift b/Packages/Sources/RxCodeSync/SyncClient.swift index bd98d823..e4fddf95 100644 --- a/Packages/Sources/RxCodeSync/SyncClient.swift +++ b/Packages/Sources/RxCodeSync/SyncClient.swift @@ -1,65 +1,334 @@ import Foundation import CryptoKit +import Network +import os -/// High-level facade used by both the desktop service and the iOS app. +/// High-level facade used by both the desktop service and the iOS app, and the +/// multi-path hub. /// -/// Owns a `RelayClient` and translates app-level intents ("send this user -/// message", "respond to permission request") into encrypted envelopes -/// addressed to a paired peer. -public actor SyncClient { +/// Owns the always-warm `RelayClient` plus, when `directPathsEnabled`, a +/// `PeerConnectionManager` per paired peer, the Bonjour advertiser/browser, an +/// `NWPathMonitor`, and a NAT-PMP `PortMapper`. It is the *sole subscriber* of +/// the relay and every `DirectTransport`, and merges their events into a single +/// upward stream — so `MobileSyncService` / `MobileAppState` keep consuming one +/// `events()` stream exactly as before. +/// +/// When `directPathsEnabled == false` the hub is a pure pass-through: `events()` +/// returns the relay's stream and `send` goes straight to the relay, preserving +/// the relay-only behavior byte-for-byte. +public actor SyncClient: PeerManagerHost { public let identity: DeviceIdentity public let relayURL: URL + public let directPathsEnabled: Bool private let relay: RelayClient + private let logger = Logger(subsystem: "com.idealapp.RxCodeSync", category: "SyncClient") /// Map of paired peer pubkey-hex to their `Curve25519` public key. - /// Desktop usually has 1..n entries (one per paired mobile); mobile has 1 - /// (the paired desktop). private var peers: [String: Curve25519.KeyAgreement.PublicKey] = [:] - public init(identity: DeviceIdentity, relayURL: URL) { + // Direct-path state (only populated when `directPathsEnabled`). + private var managers: [String: PeerConnectionManager] = [:] + private var advertiser: BonjourAdvertiser? + private var browser: BonjourBrowser? + private var pathMonitor: NWPathMonitor? + private let portMapper = PortMapper() + private var currentWANMapping: PortMapping? + private var wanRenewTask: Task? + private var hubStarted = false + private var pumpTasks: [UUID: Task] = [:] + private var continuations: [UUID: AsyncStream.Continuation] = [:] + + public init(identity: DeviceIdentity, relayURL: URL, directPathsEnabled: Bool = false) { self.identity = identity self.relayURL = relayURL + self.directPathsEnabled = directPathsEnabled self.relay = RelayClient(identity: identity, relayURL: relayURL) } public func start() async { - await relay.connect() + // `reconnect()` (not `connect()`) so a stale socket left behind by an + // OS suspend is torn down and reopened. At initial startup `task` is nil, + // so this behaves exactly like a plain connect. + await relay.reconnect() + guard directPathsEnabled else { return } + startHubIfNeeded() + // `stop()` cancelled every per-peer event pump and dropped the managers; + // rebuild them from the persisted peer set so direct-path links — and the + // inbound data (snapshots, history) they carry — come back after a + // background cycle. Without this the LAN link re-promotes but its events + // reach no subscriber, and the app appears connected yet loads nothing. + for (hex, key) in peers { startManager(forHex: hex, key: key) } } public func stop() async { await relay.disconnect() + for task in pumpTasks.values { task.cancel() } + pumpTasks.removeAll() + wanRenewTask?.cancel(); wanRenewTask = nil + pathMonitor?.cancel(); pathMonitor = nil + await advertiser?.stop() + await browser?.stop() + // Drop the per-peer managers. Their event pumps were just cancelled and + // their direct sockets die when the OS suspends us; tearing them down and + // rebuilding on `start()` avoids reusing a manager stuck with a dead + // `active` transport (which blocks redial and silently swallows data). + for manager in managers.values { await manager.teardown() } + managers.removeAll() + hubStarted = false } public func events() async -> AsyncStream { - await relay.events() + guard directPathsEnabled else { return await relay.events() } + return AsyncStream { continuation in + let id = UUID() + self.continuations[id] = continuation + continuation.onTermination = { @Sendable [weak self] _ in + Task { await self?.removeContinuation(id) } + } + } } public func addPeer(_ pubkeyHex: String) throws { guard let raw = Data(hexString: pubkeyHex) else { throw SyncError.invalidPubkey } let key = try Curve25519.KeyAgreement.PublicKey(rawRepresentation: raw) peers[pubkeyHex] = key + startManager(forHex: pubkeyHex, key: key) + } + + /// Create the per-peer manager and its event pump, unless direct paths are + /// disabled or one is already running. Idempotent: called both when a peer is + /// added and when `start()` rebuilds managers after a background cycle. + private func startManager(forHex pubkeyHex: String, key: Curve25519.KeyAgreement.PublicKey) { + guard directPathsEnabled, managers[pubkeyHex] == nil else { return } + let manager = PeerConnectionManager(peerHex: pubkeyHex, peerKey: key, identity: identity, relay: relay, host: self) + managers[pubkeyHex] = manager + track { [weak self] in + let stream = await manager.events() + for await event in stream { await self?.emitUp(event) } + } + // A peer added while the relay is already connected (e.g. right after + // pairing, or on a foreground rebuild) would otherwise miss the + // `.stateChanged(.connected)` that triggers the first candidate offer. + // Offer immediately in that case. + track { [weak self, weak manager] in + guard let self, let manager else { return } + if await self.relayIsConnected { await manager.offerLocalCandidates() } + } + } + + private var relayIsConnected: Bool { + get async { await relay.state == .connected } } public func removePeer(_ pubkeyHex: String) { peers.removeValue(forKey: pubkeyHex) + managers.removeValue(forKey: pubkeyHex) } - public func peer(forHex hex: String) -> Curve25519.KeyAgreement.PublicKey? { - peers[hex] - } + public func peer(forHex hex: String) -> Curve25519.KeyAgreement.PublicKey? { peers[hex] } - /// Send `payload` to a single paired peer. + /// Send `payload` to a single paired peer over its best available path. public func send(_ payload: Payload, toHex hex: String) async throws { guard let key = peers[hex] else { throw SyncError.unknownPeer } - try await relay.send(payload, to: key) + if directPathsEnabled, let manager = managers[hex] { + try await manager.send(payload) + } else { + try await relay.send(payload, to: key) + } } /// Broadcast `payload` to every paired peer (e.g. notification fan-out). public func broadcast(_ payload: Payload) async { - for (_, key) in peers { - try? await relay.send(payload, to: key) + for (hex, key) in peers { + if directPathsEnabled, let manager = managers[hex] { + try? await manager.send(payload) + } else { + try? await relay.send(payload, to: key) + } + } + } + + // MARK: - PeerManagerHost + + public func dial(_ endpoint: NWEndpoint, priority: Int, pathKind: ConnectionPathKind, for manager: PeerConnectionManager) async { + guard let key = peers[manager.peerHex] else { return } + let transport = DirectTransport(outboundTo: endpoint, identity: identity, expectedPeer: key) + await manager.registerProbe(transport, pathKind: pathKind) + track { [weak manager] in + let stream = await transport.events() + for await event in stream { await manager?.handleDirectEvent(event, from: transport) } + } + await transport.connect() + } + + public func localDirectCandidates() async -> [ICECandidate] { + guard let port = await advertiser?.listeningPort() else { return [] } + var candidates: [ICECandidate] = [] + for ip in NetworkInterfaces.localIPAddresses() { + candidates.append(ICECandidate(kind: .lanInterface, host: ip, port: Int(port))) + } + if let wan = currentWANMapping { + candidates.append(ICECandidate(kind: .wanMapped, host: wan.publicIP, port: Int(wan.externalPort))) + } + return candidates + } + + // MARK: - Hub plumbing + + private func startHubIfNeeded() { + guard !hubStarted else { return } + hubStarted = true + + // 1. Relay pump: intercept ICE signaling, forward everything else up, and + // trigger candidate offers when the relay (re)connects. + track { [weak self] in + guard let self else { return } + let stream = await self.relay.events() + for await event in stream { await self.handleRelayEvent(event) } + } + + // 2. Bonjour advertise + accept inbound direct links. + let advertiser = BonjourAdvertiser(localPubkeyHex: identity.publicKeyHex) + self.advertiser = advertiser + track { [weak self] in + let accepted = await advertiser.acceptedConnections() + await advertiser.start() + for await connection in accepted { await self?.adoptInbound(connection) } + } + // Once the listener binds a port, (re)request the WAN mapping and re-offer + // candidates — the initial attempt runs before the port exists. + track { [weak self] in + let ready = await advertiser.portReady() + for await _ in ready { await self?.handleListenerReady() } + } + + // 3. Bonjour browse + dial discovered peers. + let browser = BonjourBrowser() + self.browser = browser + track { [weak self] in + let discoveries = await browser.discoveries() + await browser.start() + for await peer in discoveries { await self?.handleDiscovery(peer) } + } + + // 4. Watch for network changes to re-gather candidates and re-probe. + let monitor = NWPathMonitor() + self.pathMonitor = monitor + monitor.pathUpdateHandler = { [weak self] _ in + Task { await self?.handlePathChange() } + } + monitor.start(queue: DispatchQueue(label: "com.idealapp.RxCodeSync.pathmonitor")) + + // WAN mapping + candidate offers are kicked off from `handleListenerReady` + // once the Bonjour listener actually binds a port (step 2 above). + } + + /// The Bonjour listener bound a port: (re)request a WAN mapping and re-offer + /// candidates to every peer so both interface and WAN candidates propagate. + private func handleListenerReady() async { + await refreshWANMapping() + for manager in managers.values { + await manager.offerLocalCandidates() + } + } + + private func handleRelayEvent(_ event: RelayClient.Event) async { + switch event { + case .inbound(let inbound) where Self.isICESignaling(inbound.payload): + await managers[inbound.fromHex]?.handleRelayICE(inbound.payload) + return + case .stateChanged(.connected): + for manager in managers.values { + Task { await manager.offerLocalCandidates() } + } + default: + break + } + emitUp(event) + } + + private func handlePathChange() async { + await refreshWANMapping() + for manager in managers.values { + await manager.networkChanged() + } + } + + /// Ask the gateway for a public IP:port mapping to our direct listener, then + /// schedule a renewal before it expires. No mapping ⇒ WAN-direct simply isn't + /// offered and the relay carries traffic. + private func refreshWANMapping() async { + guard let port = await advertiser?.listeningPort() else { return } + let mapping = await portMapper.requestMapping(internalPort: port) + currentWANMapping = mapping + wanRenewTask?.cancel() + guard let mapping, mapping.lifetimeSeconds > 0 else { return } + // Renew at half the granted lifetime. + let renewAfter = max(60, Int(mapping.lifetimeSeconds) / 2) + wanRenewTask = Task { [weak self] in + try? await Task.sleep(nanoseconds: UInt64(renewAfter) * 1_000_000_000) + await self?.refreshWANMapping() + } + } + + /// Dial a peer discovered on the LAN. + private func handleDiscovery(_ peer: DiscoveredDirectPeer) async { + guard let manager = managers[peer.pubkeyHex] else { return } + await manager.considerBonjourEndpoint(peer.endpoint) + } + + /// Adopt an accepted inbound connection, binding it to a peer on the first + /// authenticated frame (`Envelope.from` is cleartext, so the peer is known + /// before any plaintext is exposed). + private func adoptInbound(_ connection: NWConnection) async { + let transport = DirectTransport(adopting: connection, identity: identity) + track { [weak self] in + let stream = await transport.events() + var bound: PeerConnectionManager? + for await event in stream { + if bound == nil, case .inbound(let inbound) = event { + guard let manager = await self?.manager(forHex: inbound.fromHex), + await manager.beginAdoptIfIdle(transport) else { + await transport.disconnect() + return + } + bound = manager + } + if let manager = bound { + await manager.handleDirectEvent(event, from: transport) + } + } + } + await transport.connect() + } + + private func manager(forHex hex: String) -> PeerConnectionManager? { managers[hex] } + + private static func isICESignaling(_ payload: Payload) -> Bool { + switch payload { + case .iceCandidates, .iceSelected: return true + default: return false } } + + /// Spawn a self-removing pump task so finished direct-link pumps don't + /// accumulate over reconnect/discovery churn. + private func track(_ body: @escaping @Sendable () async -> Void) { + let id = UUID() + let task = Task { [weak self] in + await body() + await self?.untrack(id) + } + pumpTasks[id] = task + } + + private func untrack(_ id: UUID) { pumpTasks.removeValue(forKey: id) } + + private func emitUp(_ event: RelayClient.Event) { + for c in continuations.values { c.yield(event) } + } + + private func removeContinuation(_ id: UUID) { continuations.removeValue(forKey: id) } } public enum SyncError: Error, Sendable { diff --git a/Packages/Sources/RxCodeSync/Transport/BonjourService.swift b/Packages/Sources/RxCodeSync/Transport/BonjourService.swift new file mode 100644 index 00000000..7bc9162e --- /dev/null +++ b/Packages/Sources/RxCodeSync/Transport/BonjourService.swift @@ -0,0 +1,173 @@ +import Foundation +import Network +import os + +/// The Bonjour service type both devices advertise/browse for LAN discovery. +public let rxcodeSyncBonjourType = "_rxcode-sync._tcp" + +/// A peer discovered on the LAN, matched to a paired pubkey via its TXT record. +public struct DiscoveredDirectPeer: Sendable { + public let pubkeyHex: String + public let endpoint: NWEndpoint +} + +/// Advertises a `_rxcode-sync._tcp` listener carrying our pubkey in TXT, and +/// streams inbound connections a dialing peer opens against it. On the desktop +/// this is how a mobile's direct link is accepted; the coordinator adopts each +/// connection into a `DirectTransport` and routes it once the peer is known. +public actor BonjourAdvertiser { + private let localPubkeyHex: String + private let queue = DispatchQueue(label: "com.idealapp.RxCodeSync.bonjour.listen") + private let logger = Logger(subsystem: "com.idealapp.RxCodeSync", category: "Bonjour") + + private var listener: NWListener? + private var continuations: [UUID: AsyncStream.Continuation] = [:] + private var portReadyContinuations: [UUID: AsyncStream.Continuation] = [:] + private var boundPort: UInt16? + + public init(localPubkeyHex: String) { + self.localPubkeyHex = localPubkeyHex + } + + /// A stream of inbound connections accepted by the listener. + public func acceptedConnections() -> AsyncStream { + AsyncStream { continuation in + let id = UUID() + self.continuations[id] = continuation + continuation.onTermination = { @Sendable [weak self] _ in + Task { await self?.removeContinuation(id) } + } + } + } + + /// Emits the OS-assigned TCP port once the listener reaches `.ready`. Lets the + /// hub kick off WAN port mapping and candidate offers only after the port is + /// actually bound (it's `nil` until then). + public func portReady() -> AsyncStream { + AsyncStream { continuation in + let id = UUID() + self.portReadyContinuations[id] = continuation + // If already bound, replay immediately. + if let boundPort { continuation.yield(boundPort) } + continuation.onTermination = { @Sendable [weak self] _ in + Task { await self?.removePortReadyContinuation(id) } + } + } + } + + public func start() { + guard listener == nil else { return } + do { + let listener = try NWListener(using: .tcp) + var txt = NWTXTRecord() + txt["pk"] = localPubkeyHex + txt["v"] = "1" + listener.service = NWListener.Service(type: rxcodeSyncBonjourType, txtRecord: txt) + listener.newConnectionHandler = { [weak self] connection in + // Do NOT start the connection here — the adopting `DirectTransport` + // owns its lifecycle and calls `start` in `connect()`. Starting it + // twice is a double-start on the same NWConnection. + Task { await self?.yield(connection) } + } + listener.stateUpdateHandler = { [weak self] state in + switch state { + case .ready: + Task { await self?.handleListenerReady() } + case .failed(let error): + Task { await self?.logFailure(error) } + default: + break + } + } + self.listener = listener + listener.start(queue: queue) + logger.info("[Bonjour] advertising \(rxcodeSyncBonjourType, privacy: .public) pk=\(String(self.localPubkeyHex.prefix(12)), privacy: .public)") + } catch { + logger.error("[Bonjour] advertiser start failed error=\(error.localizedDescription, privacy: .public)") + } + } + + public func stop() { + listener?.cancel() + listener = nil + } + + /// The OS-assigned TCP port the listener is bound to, once ready. Used to + /// build raw-interface and WAN ICE candidates that point at this listener. + public func listeningPort() -> UInt16? { + listener?.port?.rawValue + } + + private func yield(_ connection: NWConnection) { + for c in continuations.values { c.yield(connection) } + } + + private func handleListenerReady() { + guard let port = listener?.port?.rawValue else { return } + boundPort = port + logger.info("[Bonjour] listener ready port=\(port, privacy: .public)") + for c in portReadyContinuations.values { c.yield(port) } + } + + private func removeContinuation(_ id: UUID) { continuations.removeValue(forKey: id) } + private func removePortReadyContinuation(_ id: UUID) { portReadyContinuations.removeValue(forKey: id) } + private func logFailure(_ error: NWError) { + logger.error("[Bonjour] listener failed error=\(error.localizedDescription, privacy: .public)") + } +} + +/// Browses for `_rxcode-sync._tcp` peers and streams those whose TXT `pk` +/// matches a paired device. The endpoint it yields is dialed directly by a +/// `DirectTransport`. +public actor BonjourBrowser { + private let queue = DispatchQueue(label: "com.idealapp.RxCodeSync.bonjour.browse") + private let logger = Logger(subsystem: "com.idealapp.RxCodeSync", category: "Bonjour") + + private var browser: NWBrowser? + private var continuations: [UUID: AsyncStream.Continuation] = [:] + + public init() {} + + public func discoveries() -> AsyncStream { + AsyncStream { continuation in + let id = UUID() + self.continuations[id] = continuation + continuation.onTermination = { @Sendable [weak self] _ in + Task { await self?.removeContinuation(id) } + } + } + } + + public func start() { + guard browser == nil else { return } + let params = NWParameters() + params.includePeerToPeer = true + let descriptor = NWBrowser.Descriptor.bonjourWithTXTRecord(type: rxcodeSyncBonjourType, domain: nil) + let browser = NWBrowser(for: descriptor, using: params) + browser.browseResultsChangedHandler = { [weak self] results, _ in + Task { await self?.handleResults(results) } + } + self.browser = browser + browser.start(queue: queue) + logger.info("[Bonjour] browsing \(rxcodeSyncBonjourType, privacy: .public)") + } + + public func stop() { + browser?.cancel() + browser = nil + } + + private func handleResults(_ results: Set) { + for result in results { + guard case let .bonjour(txt) = result.metadata, + let pk = txt["pk"], !pk.isEmpty else { continue } + emit(DiscoveredDirectPeer(pubkeyHex: pk, endpoint: result.endpoint)) + } + } + + private func emit(_ peer: DiscoveredDirectPeer) { + for c in continuations.values { c.yield(peer) } + } + + private func removeContinuation(_ id: UUID) { continuations.removeValue(forKey: id) } +} diff --git a/Packages/Sources/RxCodeSync/Transport/DirectTransport.swift b/Packages/Sources/RxCodeSync/Transport/DirectTransport.swift new file mode 100644 index 00000000..272acb21 --- /dev/null +++ b/Packages/Sources/RxCodeSync/Transport/DirectTransport.swift @@ -0,0 +1,223 @@ +import Foundation +import CryptoKit +import Network +import os + +/// A direct TCP `Transport` between two paired devices over `Network.framework`. +/// +/// Carries the **identical** `Envelope` JSON the relay carries — same +/// `EnvelopeCodec`, same `SessionCrypto` E2E sealing — so a direct link has the +/// same confidentiality/authenticity guarantee as the relay. The only +/// differences from `RelayClient` are the socket (`NWConnection` vs WebSocket) +/// and framing: raw TCP has no message boundaries, so each envelope is prefixed +/// with a 4-byte big-endian length. +/// +/// Peer authentication falls out of the crypto: an envelope from the wrong peer +/// fails `SessionCrypto.open` and is dropped. As defence in depth we also drop +/// any inbound whose `from` doesn't match `expectedPeerHex`. +public actor DirectTransport: Transport { + /// Frame ceiling mirroring `RelayClient` — reject anything larger as a + /// malformed/hostile length prefix rather than allocating for it. + private static let maxFrameSize = 10 * 1024 * 1024 + private static let lengthPrefixBytes = 4 + + private let identity: DeviceIdentity + /// The peer this link is bound to. Known upfront when we dial (`outboundTo`); + /// `nil` until the first inbound frame when we adopt an accepted connection. + private var expectedPeerHex: String? + private let queue: DispatchQueue + private let logger = Logger(subsystem: "com.idealapp.RxCodeSync", category: "DirectTransport") + + private var connection: NWConnection? + private(set) public var state: TransportConnectionState = .disconnected + private var continuations: [UUID: AsyncStream.Continuation] = [:] + private var started = false + + /// The peer hex this link is bound to, once known. + public var peerHex: String? { expectedPeerHex } + + /// Dial a remote candidate endpoint whose peer identity we already know. + public init( + outboundTo endpoint: NWEndpoint, + identity: DeviceIdentity, + expectedPeer: Curve25519.KeyAgreement.PublicKey + ) { + self.identity = identity + self.expectedPeerHex = expectedPeer.rawRepresentation.hexString + self.queue = DispatchQueue(label: "com.idealapp.RxCodeSync.direct.out") + let params = NWParameters.tcp + self.connection = NWConnection(to: endpoint, using: params) + } + + /// Adopt an inbound connection accepted by a `BonjourAdvertiser`'s listener. + /// The bound peer is learned from the first inbound envelope's cleartext + /// `from` (the coordinator routes on `peerHex`). + public init( + adopting connection: NWConnection, + identity: DeviceIdentity + ) { + self.identity = identity + self.expectedPeerHex = nil + self.queue = DispatchQueue(label: "com.idealapp.RxCodeSync.direct.in") + self.connection = connection + } + + // MARK: - Transport + + public func events() -> AsyncStream { + AsyncStream { continuation in + let id = UUID() + self.continuations[id] = continuation + continuation.onTermination = { @Sendable [weak self] _ in + Task { await self?.removeContinuation(id) } + } + } + } + + public func connect() { + guard let connection, !started else { return } + started = true + updateState(.connecting) + connection.stateUpdateHandler = { [weak self] newState in + Task { await self?.handleConnectionState(newState) } + } + connection.start(queue: queue) + } + + public func disconnect() { + started = false + connection?.cancel() + connection = nil + if state != .disconnected { updateState(.disconnected) } + finishContinuations() + } + + public func send(_ payload: Payload, to recipient: Curve25519.KeyAgreement.PublicKey) async throws { + guard let connection, state == .connected else { throw RelayError.notConnected } + let body = try EnvelopeCodec.encode(payload, from: identity, to: recipient) + guard body.count <= Self.maxFrameSize else { throw DirectTransportError.frameTooLarge } + var frame = Data(count: Self.lengthPrefixBytes) + let length = UInt32(body.count).bigEndian + withUnsafeBytes(of: length) { frame.replaceSubrange(0..) in + connection.send(content: frame, completion: .contentProcessed { error in + if let error { cont.resume(throwing: error) } else { cont.resume() } + }) + } + } + + // MARK: - Private + + private func removeContinuation(_ id: UUID) { continuations.removeValue(forKey: id) } + + private func emit(_ event: TransportEvent) { + for c in continuations.values { c.yield(event) } + } + + private func updateState(_ newState: TransportConnectionState) { + state = newState + emit(.stateChanged(newState)) + } + + private func handleConnectionState(_ newState: NWConnection.State) { + switch newState { + case .ready: + logger.info("[Direct] connection ready peer=\(String((self.expectedPeerHex ?? "pending").prefix(12)), privacy: .public)") + updateState(.connected) + receiveNextFrame() + case .failed(let error), .waiting(let error): + logger.error("[Direct] connection failed peer=\(String((self.expectedPeerHex ?? "pending").prefix(12)), privacy: .public) error=\(error.localizedDescription, privacy: .public)") + teardown() + case .cancelled: + teardown() + default: + break + } + } + + private func teardown() { + guard state != .disconnected else { return } + started = false + connection?.cancel() + connection = nil + updateState(.disconnected) + // Finish the event stream so the hub's per-connection pump loop exits + // and stops retaining this transport — otherwise every dropped/failed + // direct link would leak a Task and a zombie NWConnection wrapper. + finishContinuations() + } + + private func finishContinuations() { + for c in continuations.values { c.finish() } + continuations.removeAll() + } + + /// Read a 4-byte length, then that many bytes, then loop. + private func receiveNextFrame() { + guard let connection else { return } + connection.receive( + minimumIncompleteLength: Self.lengthPrefixBytes, + maximumLength: Self.lengthPrefixBytes + ) { [weak self] data, _, isComplete, error in + Task { await self?.handleLengthPrefix(data, isComplete: isComplete, error: error) } + } + } + + private func handleLengthPrefix(_ data: Data?, isComplete: Bool, error: NWError?) { + if let error { + logger.error("[Direct] receive length failed error=\(error.localizedDescription, privacy: .public)") + teardown(); return + } + guard let data, data.count == Self.lengthPrefixBytes else { + if isComplete { teardown() } + return + } + let length = data.withUnsafeBytes { UInt32(bigEndian: $0.loadUnaligned(as: UInt32.self)) } + guard length > 0, Int(length) <= Self.maxFrameSize else { + logger.error("[Direct] rejecting frame length=\(length, privacy: .public)") + teardown(); return + } + receiveBody(length: Int(length)) + } + + private func receiveBody(length: Int) { + guard let connection else { return } + connection.receive(minimumIncompleteLength: length, maximumLength: length) { [weak self] data, _, isComplete, error in + Task { await self?.handleBody(data, isComplete: isComplete, error: error) } + } + } + + private func handleBody(_ data: Data?, isComplete: Bool, error: NWError?) { + if let error { + logger.error("[Direct] receive body failed error=\(error.localizedDescription, privacy: .public)") + teardown(); return + } + if let data { + switch EnvelopeCodec.decode(data, localIdentity: identity) { + case .inbound(let inbound): + if let bound = expectedPeerHex { + // Defence in depth: only accept the peer this link is for. + if inbound.fromHex == bound { + emit(.inbound(inbound)) + } else { + logger.warning("[Direct] dropping inbound from unexpected peer=\(String(inbound.fromHex.prefix(12)), privacy: .public)") + } + } else { + // Adopted inbound: bind to the peer that authenticated first. + expectedPeerHex = inbound.fromHex + logger.info("[Direct] adopted link bound to peer=\(String(inbound.fromHex.prefix(12)), privacy: .public)") + emit(.inbound(inbound)) + } + case .deliveryFailed, .drop: + break + } + } + if isComplete { teardown(); return } + receiveNextFrame() + } +} + +public enum DirectTransportError: Error, Sendable { + case frameTooLarge +} diff --git a/Packages/Sources/RxCodeSync/Transport/EnvelopeCodec.swift b/Packages/Sources/RxCodeSync/Transport/EnvelopeCodec.swift new file mode 100644 index 00000000..93c2ea3b --- /dev/null +++ b/Packages/Sources/RxCodeSync/Transport/EnvelopeCodec.swift @@ -0,0 +1,80 @@ +import Foundation +import CryptoKit +import os + +/// Shared `Envelope` encode/decode used by every `Transport`. +/// +/// The wire unit on all paths — relay WebSocket or direct TCP — is the identical +/// `Envelope` JSON: a `Payload` sealed for the recipient with `SessionCrypto`. +/// Factoring it here means `RelayClient` and `DirectTransport` share one +/// implementation of the crypto, so a direct link has exactly the same E2E +/// guarantee as the relay (which is explicitly untrusted). +enum EnvelopeCodec { + private static let logger = Logger(subsystem: "com.idealapp.RxCodeSync", category: "EnvelopeCodec") + + /// Result of decoding a raw inbound frame. + enum DecodeResult { + case inbound(TransportInbound) + case deliveryFailed(toHex: String) + /// Not addressed to us, malformed, or undecryptable — drop quietly. + case drop + } + + /// Encrypt `payload` for `recipient` and return the serialized `Envelope` JSON. + static func encode( + _ payload: Payload, + from identity: DeviceIdentity, + to recipient: Curve25519.KeyAgreement.PublicKey + ) throws -> Data { + let plaintext = try JSONEncoder().encode(payload) + let (nonce, ct) = try SessionCrypto.seal( + plaintext: plaintext, + sender: identity.privateKey, + recipient: recipient + ) + let envelope = Envelope( + to: recipient.rawRepresentation.hexString, + from: identity.publicKeyHex, + nonce: nonce, + ct: ct + ) + return try JSONEncoder().encode(envelope) + } + + /// Decode a raw inbound frame: a `DeliveryFailedNotice`, an `Envelope` we can + /// decrypt into a `Payload`, or nothing. + static func decode(_ raw: Data, localIdentity identity: DeviceIdentity) -> DecodeResult { + let decoder = JSONDecoder() + if let notice = try? decoder.decode(DeliveryFailedNotice.self, from: raw), + notice.type == "delivery_failed" { + return .deliveryFailed(toHex: notice.to) + } + guard let env = try? decoder.decode(Envelope.self, from: raw) else { + logger.warning("[Codec] dropping non-envelope message bytes=\(raw.count, privacy: .public)") + return .drop + } + guard let nonce = env.nonceData, + let ct = env.ciphertextData, + let fromRaw = Data(hexString: env.from), + let fromKey = try? Curve25519.KeyAgreement.PublicKey(rawRepresentation: fromRaw) + else { + logger.warning("[Codec] dropping malformed envelope from=\(String(env.from.prefix(12)), privacy: .public)") + return .drop + } + do { + let plaintext = try SessionCrypto.open( + ciphertext: ct, + nonce: nonce, + recipient: identity.privateKey, + sender: fromKey + ) + let payload = try decoder.decode(Payload.self, from: plaintext) + return .inbound(TransportInbound(from: fromKey, fromHex: env.from, payload: payload)) + } catch { + // Decrypt or decode failure means the sender isn't a paired peer we + // know how to talk to, OR the wire format drifted. Drop quietly. + logger.warning("[Codec] dropping encrypted payload from=\(String(env.from.prefix(12)), privacy: .public) error=\(error.localizedDescription, privacy: .public)") + return .drop + } + } +} diff --git a/Packages/Sources/RxCodeSync/Transport/NetworkInterfaces.swift b/Packages/Sources/RxCodeSync/Transport/NetworkInterfaces.swift new file mode 100644 index 00000000..dd1e2d86 --- /dev/null +++ b/Packages/Sources/RxCodeSync/Transport/NetworkInterfaces.swift @@ -0,0 +1,57 @@ +import Foundation +import Network +#if canImport(Darwin) +import Darwin +#endif + +/// Enumerates this device's usable local IP addresses for raw-interface ICE +/// candidates — the fallback for subnets where mDNS/Bonjour is blocked but the +/// two devices still share a route. +enum NetworkInterfaces { + /// Non-loopback, non-link-local IPv4/IPv6 literals for up interfaces. + /// Excludes `169.254.*`, `fe80::*`, loopback, and down interfaces. + static func localIPAddresses() -> [String] { + var addresses: [String] = [] + var ifaddrPtr: UnsafeMutablePointer? + guard getifaddrs(&ifaddrPtr) == 0, let first = ifaddrPtr else { return [] } + defer { freeifaddrs(ifaddrPtr) } + + var cursor: UnsafeMutablePointer? = first + while let ptr = cursor { + defer { cursor = ptr.pointee.ifa_next } + let flags = Int32(ptr.pointee.ifa_flags) + guard (flags & Int32(IFF_UP)) != 0, + (flags & Int32(IFF_LOOPBACK)) == 0, + let addr = ptr.pointee.ifa_addr else { continue } + let family = addr.pointee.sa_family + guard family == UInt8(AF_INET) || family == UInt8(AF_INET6) else { continue } + + var host = [CChar](repeating: 0, count: Int(NI_MAXHOST)) + let result = getnameinfo( + addr, socklen_t(addr.pointee.sa_len), + &host, socklen_t(host.count), + nil, 0, NI_NUMERICHOST + ) + guard result == 0 else { continue } + let bytes = host.prefix { $0 != 0 }.map { UInt8(bitPattern: $0) } + var ip = String(decoding: bytes, as: UTF8.self) + // Strip the IPv6 scope id ("fe80::1%en0" → "fe80::1") before filtering. + if let pct = ip.firstIndex(of: "%") { ip = String(ip[.. Bool { + if ip == "127.0.0.1" || ip == "::1" { return false } + if ip.hasPrefix("169.254.") { return false } // IPv4 link-local + let lower = ip.lowercased() + if lower.hasPrefix("fe80:") { return false } // IPv6 link-local + if lower.hasPrefix("fc") || lower.hasPrefix("fd") { + // Unique-local IPv6 is routable within a site — keep it. + } + return true + } +} diff --git a/Packages/Sources/RxCodeSync/Transport/PeerConnectionManager.swift b/Packages/Sources/RxCodeSync/Transport/PeerConnectionManager.swift new file mode 100644 index 00000000..c699212b --- /dev/null +++ b/Packages/Sources/RxCodeSync/Transport/PeerConnectionManager.swift @@ -0,0 +1,332 @@ +import Foundation +import CryptoKit +import Network +import os + +/// The hub (SyncClient) provides these services to each per-peer manager: it +/// owns transport creation/pumping and knows this device's local candidates. +public protocol PeerManagerHost: AnyObject, Sendable { + /// Create, register, pump, and connect an outbound direct link to `endpoint` + /// on the manager's behalf. Events are routed back via `handleDirectEvent`. + func dial(_ endpoint: NWEndpoint, priority: Int, pathKind: ConnectionPathKind, for manager: PeerConnectionManager) async + /// This device's current direct candidates (raw interface IPs + WAN mapping) + /// to advertise to the peer over the relay. + func localDirectCandidates() async -> [ICECandidate] +} + +/// Per-peer multi-path coordinator. +/// +/// Keeps the relay warm as the baseline and races direct candidates — LAN via +/// Bonjour, raw interface IPs, and a mapped WAN port — promoting the first that +/// completes an `.iceSelected` handshake echo. Fails over to the relay instantly +/// if the active direct link drops. +/// +/// To avoid two devices each opening a direct link (a wasteful double +/// connection), a deterministic role split applies: the device with the +/// lexicographically smaller pubkey is the **dialer** (it connects); the other +/// **advertises and accepts**. Both still exchange candidates over the relay. +public actor PeerConnectionManager { + public let peerHex: String + private let peerKey: Curve25519.KeyAgreement.PublicKey + private let identity: DeviceIdentity + private let relay: RelayClient + private weak var host: PeerManagerHost? + /// Smaller pubkey dials; the other side only advertises + accepts. + private let isDialer: Bool + private let logger = Logger(subsystem: "com.idealapp.RxCodeSync", category: "PeerConn") + + private var activePath: ConnectionPathKind = .relay + private var active: DirectTransport? + private var lastRTTMillis: Int? + + /// In-flight candidate probes, keyed by transport identity. + private var probing: [ObjectIdentifier: ProbeInfo] = [:] + private var probeSessionID = UUID() + /// Endpoints the dialer knows about (from relay candidates + Bonjour), retried + /// with backoff after a failed round. + private var knownTargets: [String: DialTarget] = [:] + private var backoffSeconds = 1 + private var retryTask: Task? + + private var continuations: [UUID: AsyncStream.Continuation] = [:] + + private struct ProbeInfo { + let transport: DirectTransport + let pathKind: ConnectionPathKind + /// True when we dialed this link (so we send the handshake probe on + /// connect); false when we adopted an inbound link (we echo instead). + let initiatedByUs: Bool + var probeSentAt: Date? + } + private struct DialTarget { + let endpoint: NWEndpoint + let priority: Int + let pathKind: ConnectionPathKind + } + + public init( + peerHex: String, + peerKey: Curve25519.KeyAgreement.PublicKey, + identity: DeviceIdentity, + relay: RelayClient, + host: PeerManagerHost + ) { + self.peerHex = peerHex + self.peerKey = peerKey + self.identity = identity + self.relay = relay + self.host = host + self.isDialer = identity.publicKeyHex < peerHex + } + + public var currentPath: ConnectionPathKind { activePath } + + /// Disconnect every link and clear connection state. Called when the hub + /// stops (app backgrounding); the hub then drops this manager and builds a + /// fresh one on the next start, so we close sockets cleanly rather than + /// leaking dead `NWConnection`s across a suspend. + public func teardown() async { + retryTask?.cancel(); retryTask = nil + for info in probing.values { await info.transport.disconnect() } + probing.removeAll() + await active?.disconnect() + active = nil + activePath = .relay + knownTargets.removeAll() + } + + public func events() -> AsyncStream { + AsyncStream { continuation in + let id = UUID() + self.continuations[id] = continuation + continuation.onTermination = { @Sendable [weak self] _ in + Task { await self?.removeContinuation(id) } + } + } + } + + /// Route a payload over the active path, falling back to relay on failure. + public func send(_ payload: Payload) async throws { + if activePath != .relay, let active { + do { + try await active.send(payload, to: peerKey) + return + } catch { + logger.error("[PeerConn] direct send failed peer=\(String(self.peerHex.prefix(12)), privacy: .public) — failing over to relay") + demote() + } + } + try await relay.send(payload, to: peerKey) + } + + // MARK: - Triggers from the hub + + /// Advertise our candidates so the peer can reach us directly. Called when the + /// relay (re)connects, when this peer is added while already connected, and + /// when the Bonjour listener binds its port. + public func offerLocalCandidates() async { + await offerCandidates() + } + + /// The network path changed — re-advertise candidates and, if we're on relay, + /// reset backoff and re-probe known targets. + public func networkChanged() async { + await offerCandidates() + if active == nil { + backoffSeconds = 1 + dialAllKnownTargets() + } + } + + /// A Bonjour endpoint matching this peer was discovered (dialer only). + public func considerBonjourEndpoint(_ endpoint: NWEndpoint) async { + guard isDialer, active == nil else { return } + let key = "bonjour:\(endpoint.debugDescription)" + knownTargets[key] = DialTarget(endpoint: endpoint, priority: 0, pathKind: .directLAN) + await dialTarget(knownTargets[key]!) + } + + /// Relay-delivered ICE signaling. + /// + /// LAN candidates are only turned into targets by the LAN **dialer** (smaller + /// pubkey) so exactly one side opens a LAN link. WAN-mapped candidates are the + /// exception: only one peer can ever produce one (WAN port mapping is + /// macOS-only), so whoever *receives* it dials it — this makes a desktop's + /// reachable `wanMapped` endpoint usable regardless of key order, without + /// risking a double connection. + public func handleRelayICE(_ payload: Payload) async { + guard case .iceCandidates(let offer) = payload, active == nil else { return } + for candidate in offer.candidates { + let isWAN = candidate.kind == .wanMapped + guard isWAN || isDialer else { continue } + guard let endpoint = Self.endpoint(for: candidate) else { continue } + let key = "\(candidate.kind.rawValue):\(candidate.host):\(candidate.port)" + let priority = isWAN ? 2 : 1 + let pathKind: ConnectionPathKind = isWAN ? .directWAN : .directLAN + knownTargets[key] = DialTarget(endpoint: endpoint, priority: priority, pathKind: pathKind) + } + dialAllKnownTargets() + } + + /// Register a freshly-created outbound probe (called by the hub from `dial`). + public func registerProbe(_ transport: DirectTransport, pathKind: ConnectionPathKind) { + guard active == nil else { return } + probing[ObjectIdentifier(transport)] = ProbeInfo(transport: transport, pathKind: pathKind, initiatedByUs: true, probeSentAt: nil) + } + + /// Reserve this manager for an adopted inbound link (controlled side). + public func beginAdoptIfIdle(_ transport: DirectTransport) -> Bool { + guard active == nil else { return false } + probing[ObjectIdentifier(transport)] = ProbeInfo(transport: transport, pathKind: .directLAN, initiatedByUs: false, probeSentAt: nil) + return true + } + + // MARK: - Direct transport events + + public func handleDirectEvent(_ event: TransportEvent, from transport: DirectTransport) async { + switch event { + case .stateChanged(let state): + await handleDirectState(state, from: transport) + case .inbound(let inbound): + await handleDirectInbound(inbound, from: transport) + case .deliveryFailed, .pathChanged: + break + } + } + + // MARK: - Private + + private func offerCandidates() async { + guard let host else { return } + let candidates = await host.localDirectCandidates() + guard !candidates.isEmpty else { return } + let payload = Payload.iceCandidates(ICECandidatesPayload(sessionID: probeSessionID, candidates: candidates)) + try? await relay.send(payload, to: peerKey) + } + + private func dialAllKnownTargets() { + guard active == nil else { return } + let targets = knownTargets.values.sorted { $0.priority < $1.priority } + for target in targets { + let delay = Double(target.priority) * 0.2 // stagger so LAN wins before WAN is attempted + Task { [weak self] in + if delay > 0 { try? await Task.sleep(nanoseconds: UInt64(delay * 1_000_000_000)) } + await self?.dialTarget(target) + } + } + } + + private func dialTarget(_ target: DialTarget) async { + guard active == nil, let host else { return } + // LAN links are opened only by the dialer role; a WAN-mapped endpoint is + // dialed by whichever side received it (see `handleRelayICE`). + guard target.pathKind == .directWAN || isDialer else { return } + await host.dial(target.endpoint, priority: target.priority, pathKind: target.pathKind, for: self) + } + + private func handleDirectState(_ state: TransportConnectionState, from transport: DirectTransport) async { + switch state { + case .connected: + // Initiator: the moment a probe link we dialed is up, send the probe. + if let info = probing[ObjectIdentifier(transport)], info.initiatedByUs { + probing[ObjectIdentifier(transport)]?.probeSentAt = Date() + try? await transport.send(.iceSelected(ICESelectedPayload(sessionID: probeSessionID, echo: false)), to: peerKey) + } + case .disconnected: + if transport === active { + logger.warning("[PeerConn] active direct dropped peer=\(String(self.peerHex.prefix(12)), privacy: .public) — reverting to relay") + demote() + dialAllKnownTargets() + } else if probing[ObjectIdentifier(transport)] != nil { + probing.removeValue(forKey: ObjectIdentifier(transport)) + scheduleRetryIfIdle() + } + case .connecting, .reconnecting: + break + } + } + + private func handleDirectInbound(_ inbound: TransportInbound, from transport: DirectTransport) async { + switch inbound.payload { + case .iceSelected(let sel): + if sel.echo { + // Dialer: our probe was echoed — measure RTT and promote. + if let info = probing[ObjectIdentifier(transport)] { + let rtt = info.probeSentAt.map { Int(Date().timeIntervalSince($0) * 1000) } + promote(transport, pathKind: info.pathKind, rttMillis: rtt) + } + } else { + // Controlled side: a probe arrived — echo it back and promote. + try? await transport.send(.iceSelected(ICESelectedPayload(sessionID: sel.sessionID, echo: true)), to: peerKey) + if let info = probing[ObjectIdentifier(transport)] { + promote(transport, pathKind: info.pathKind, rttMillis: nil) + } + } + case .iceCandidates: + break + default: + if transport === active { emit(.inbound(inbound)) } + } + } + + private func promote(_ transport: DirectTransport, pathKind: ConnectionPathKind, rttMillis: Int?) { + active = transport + activePath = pathKind + lastRTTMillis = rttMillis + backoffSeconds = 1 + retryTask?.cancel(); retryTask = nil + // Cancel every losing probe. + let winner = ObjectIdentifier(transport) + for (oid, info) in probing where oid != winner { + Task { await info.transport.disconnect() } + } + probing.removeAll() + if let rttMillis { + logger.info("[PeerConn] promoted \(String(describing: pathKind), privacy: .public) peer=\(String(self.peerHex.prefix(12)), privacy: .public) rtt=\(rttMillis, privacy: .public)ms") + } else { + logger.info("[PeerConn] promoted \(String(describing: pathKind), privacy: .public) peer=\(String(self.peerHex.prefix(12)), privacy: .public)") + } + emit(.pathChanged(peerHex: peerHex, path: pathKind, rttMillis: rttMillis)) + } + + private func demote() { + let dropped = active + active = nil + activePath = .relay + lastRTTMillis = nil + emit(.pathChanged(peerHex: peerHex, path: .relay, rttMillis: nil)) + Task { await dropped?.disconnect() } + } + + private func scheduleRetryIfIdle() { + // No `isDialer` gate: a non-dialer may still be retrying a WAN target + // (the only kind it ever stores). `dialAllKnownTargets` filters per target. + guard active == nil, probing.isEmpty, retryTask == nil, !knownTargets.isEmpty else { return } + let delay = backoffSeconds + backoffSeconds = min(30, backoffSeconds * 2) + retryTask = Task { [weak self] in + try? await Task.sleep(nanoseconds: UInt64(delay) * 1_000_000_000) + guard let self else { return } + await self.retryNow() + } + } + + private func retryNow() { + retryTask = nil + guard active == nil else { return } + probeSessionID = UUID() + dialAllKnownTargets() + } + + private static func endpoint(for candidate: ICECandidate) -> NWEndpoint? { + guard candidate.port > 0, candidate.port <= 65535, + let port = NWEndpoint.Port(rawValue: UInt16(candidate.port)) else { return nil } + return .hostPort(host: NWEndpoint.Host(candidate.host), port: port) + } + + private func emit(_ event: TransportEvent) { + for c in continuations.values { c.yield(event) } + } + + private func removeContinuation(_ id: UUID) { continuations.removeValue(forKey: id) } +} diff --git a/Packages/Sources/RxCodeSync/Transport/PortMapper.swift b/Packages/Sources/RxCodeSync/Transport/PortMapper.swift new file mode 100644 index 00000000..226ffe9d --- /dev/null +++ b/Packages/Sources/RxCodeSync/Transport/PortMapper.swift @@ -0,0 +1,195 @@ +import Foundation +import Network +import os +#if canImport(Darwin) +import Darwin +#endif + +/// A best-effort NAT-PMP (RFC 6886) client used to obtain a reachable public +/// IP:port for WAN-direct candidates. +/// +/// It asks the default gateway for its public address and a TCP port mapping to +/// this device's direct-listener port. On a cooperating router (Apple/AirPort, +/// most consumer NAT-PMP/PCP gateways) this yields a `wanMapped` candidate; on a +/// non-cooperating NAT it simply times out and WAN-direct is not offered, leaving +/// the relay to carry the traffic. There is deliberately no UDP hole-punching. +public struct PortMapping: Sendable, Equatable { + public let publicIP: String + public let externalPort: UInt16 + public let lifetimeSeconds: UInt32 +} + +public actor PortMapper { + private static let natpmpPort: UInt16 = 5351 + private let logger = Logger(subsystem: "com.idealapp.RxCodeSync", category: "PortMapper") + + public init() {} + + /// Request a public address + TCP mapping for `internalPort`. Returns nil on + /// any failure (no gateway, no NAT-PMP support, timeout). + public func requestMapping(internalPort: UInt16, lifetime: UInt32 = 3600) async -> PortMapping? { + guard let gateway = Self.defaultGatewayIPv4() else { + logger.info("[PortMapper] no default gateway found — WAN-direct unavailable") + return nil + } + guard let publicIP = await sendExternalAddressRequest(gateway: gateway) else { + logger.info("[PortMapper] gateway \(gateway, privacy: .public) did not answer NAT-PMP — WAN-direct unavailable") + return nil + } + guard let external = await sendMapRequest(gateway: gateway, internalPort: internalPort, lifetime: lifetime) else { + return nil + } + logger.info("[PortMapper] mapped \(internalPort, privacy: .public) → \(publicIP, privacy: .public):\(external.port, privacy: .public) lifetime=\(external.lifetime, privacy: .public)s") + return PortMapping(publicIP: publicIP, externalPort: external.port, lifetimeSeconds: external.lifetime) + } + + // MARK: - NAT-PMP requests + + private func sendExternalAddressRequest(gateway: String) async -> String? { + // [version=0, opcode=0] + let request = Data([0, 0]) + guard let response = await exchange(gateway: gateway, request: request, minResponse: 12) else { return nil } + // [ver, op=128, result(2), epoch(4), ip(4)] + guard response.count >= 12, response[1] == 128 else { return nil } + let result = UInt16(response[2]) << 8 | UInt16(response[3]) + guard result == 0 else { return nil } + return "\(response[8]).\(response[9]).\(response[10]).\(response[11])" + } + + private func sendMapRequest(gateway: String, internalPort: UInt16, lifetime: UInt32) async -> (port: UInt16, lifetime: UInt32)? { + var request = Data() + request.append(0) // version + request.append(2) // opcode 2 = map TCP + request.append(contentsOf: [0, 0]) // reserved + request.append(UInt8(internalPort >> 8)); request.append(UInt8(internalPort & 0xff)) + // Suggested external port = internal port (router may pick another). + request.append(UInt8(internalPort >> 8)); request.append(UInt8(internalPort & 0xff)) + request.append(UInt8((lifetime >> 24) & 0xff)); request.append(UInt8((lifetime >> 16) & 0xff)) + request.append(UInt8((lifetime >> 8) & 0xff)); request.append(UInt8(lifetime & 0xff)) + + guard let response = await exchange(gateway: gateway, request: request, minResponse: 16) else { return nil } + // [ver, op=130, result(2), epoch(4), internalPort(2), externalPort(2), lifetime(4)] + guard response.count >= 16, response[1] == 130 else { return nil } + let result = UInt16(response[2]) << 8 | UInt16(response[3]) + guard result == 0 else { return nil } + let externalPort = UInt16(response[10]) << 8 | UInt16(response[11]) + let grantedLifetime = UInt32(response[12]) << 24 | UInt32(response[13]) << 16 | UInt32(response[14]) << 8 | UInt32(response[15]) + return (externalPort, grantedLifetime) + } + + /// Fire a single UDP datagram to the gateway and await one reply (2s cap). + private func exchange(gateway: String, request: Data, minResponse: Int) async -> Data? { + guard let port = NWEndpoint.Port(rawValue: Self.natpmpPort) else { return nil } + let host = NWEndpoint.Host(gateway) + let connection = NWConnection(host: host, port: port, using: .udp) + let queue = DispatchQueue(label: "com.idealapp.RxCodeSync.portmapper") + + return await withCheckedContinuation { (cont: CheckedContinuation) in + let state = ExchangeState() + connection.stateUpdateHandler = { st in + if case .ready = st { + connection.send(content: request, completion: .contentProcessed { _ in }) + connection.receiveMessage { data, _, _, _ in + state.finish(cont: cont, connection: connection, value: (data?.count ?? 0) >= minResponse ? data : nil) + } + } else if case .failed = st { + state.finish(cont: cont, connection: connection, value: nil) + } + } + connection.start(queue: queue) + queue.asyncAfter(deadline: .now() + 2.0) { + state.finish(cont: cont, connection: connection, value: nil) + } + } + } + + /// Guards single-resume of the exchange continuation across the reply, + /// failure, and timeout races. + private final class ExchangeState: @unchecked Sendable { + private let lock = NSLock() + private var done = false + func finish(cont: CheckedContinuation, connection: NWConnection, value: Data?) { + lock.lock() + if done { lock.unlock(); return } + done = true + lock.unlock() + connection.cancel() + cont.resume(returning: value) + } + } + + // MARK: - Default gateway discovery (sysctl route dump) + +#if os(macOS) + /// Parse the kernel routing table for the IPv4 default route's gateway. + /// + /// macOS only: the `` message layout (`rt_msghdr`, `RTF_*`, + /// `RTA_*`) isn't exposed in the iOS SDK. That's by design here — the device + /// that *offers* a WAN candidate (typically the desktop on a home network + /// with a NAT-PMP router) needs this; the iOS *dialer* just connects to the + /// advertised public address and never maps a port. + static func defaultGatewayIPv4() -> String? { + var mib: [Int32] = [CTL_NET, PF_ROUTE, 0, AF_INET, NET_RT_DUMP, 0] + var len = 0 + guard sysctl(&mib, u_int(mib.count), nil, &len, nil, 0) == 0, len > 0 else { return nil } + var buffer = [UInt8](repeating: 0, count: len) + guard sysctl(&mib, u_int(mib.count), &buffer, &len, nil, 0) == 0 else { return nil } + + return buffer.withUnsafeBytes { raw -> String? in + var offset = 0 + let base = raw.baseAddress! + while offset < len { + let hdr = base.advanced(by: offset).assumingMemoryBound(to: rt_msghdr.self).pointee + if hdr.rtm_msglen == 0 { break } + // Default route: gateway flag set and a default (0.0.0.0) destination. + if (hdr.rtm_flags & RTF_GATEWAY) != 0 { + if let gw = gatewayAddress(base: base, msgOffset: offset, addrsMask: hdr.rtm_addrs, msgLen: Int(hdr.rtm_msglen)) { + return gw + } + } + offset += Int(hdr.rtm_msglen) + } + return nil + } + } + + /// Walk the sockaddrs trailing an rt_msghdr; return the RTA_GATEWAY address if + /// the destination is the default route (0.0.0.0). + private static func gatewayAddress(base: UnsafeRawPointer, msgOffset: Int, addrsMask: Int32, msgLen: Int) -> String? { + var cursor = msgOffset + MemoryLayout.stride + let end = msgOffset + msgLen + var isDefaultDestination = false + var gateway: String? + + // Addresses appear in ascending RTA_* bit order. + let order: [Int32] = [RTA_DST, RTA_GATEWAY, RTA_NETMASK, RTA_GENMASK, RTA_IFP, RTA_IFA, RTA_AUTHOR, RTA_BRD] + for bit in order { + guard (addrsMask & bit) != 0, cursor + MemoryLayout.size <= end else { continue } + let sa = base.advanced(by: cursor).assumingMemoryBound(to: sockaddr.self) + let saLen = Int(sa.pointee.sa_len) + let advance = saLen == 0 ? 4 : (saLen + 3) & ~3 // round up to 4 + + if sa.pointee.sa_family == UInt8(AF_INET) { + let sin = base.advanced(by: cursor).assumingMemoryBound(to: sockaddr_in.self).pointee + let ip = ipv4String(sin.sin_addr) + if bit == RTA_DST { + isDefaultDestination = (sin.sin_addr.s_addr == 0) + } else if bit == RTA_GATEWAY { + gateway = ip + } + } + cursor += advance + } + return isDefaultDestination ? gateway : nil + } + + private static func ipv4String(_ addr: in_addr) -> String { + let n = addr.s_addr // network byte order + return "\(n & 0xff).\((n >> 8) & 0xff).\((n >> 16) & 0xff).\((n >> 24) & 0xff)" + } +#else + /// iOS: no route-table access. WAN candidates are only ever *offered* by the + /// peer that can map a port (the desktop); this device dials theirs instead. + static func defaultGatewayIPv4() -> String? { nil } +#endif +} diff --git a/Packages/Sources/RxCodeSync/Transport/RelayClient.swift b/Packages/Sources/RxCodeSync/Transport/RelayClient.swift index 8c04b2ae..74fe9ca9 100644 --- a/Packages/Sources/RxCodeSync/Transport/RelayClient.swift +++ b/Packages/Sources/RxCodeSync/Transport/RelayClient.swift @@ -7,25 +7,13 @@ import os /// /// The relay sees only `Envelope` and `DeliveryFailedNotice` — every other /// payload is encrypted before send and decrypted after receive. -public actor RelayClient { - public struct Inbound: Sendable { - public let from: Curve25519.KeyAgreement.PublicKey - public let fromHex: String - public let payload: Payload - } - - public enum ConnectionState: Sendable, Equatable { - case disconnected - case connecting - case connected - case reconnecting(nextAttemptInSeconds: Int) - } - - public enum Event: Sendable { - case stateChanged(ConnectionState) - case inbound(Inbound) - case deliveryFailed(toHex: String) - } +public actor RelayClient: Transport { + // The transport event/state types are shared across every `Transport`; keep + // the historical `RelayClient.Inbound` / `.ConnectionState` / `.Event` + // spellings as typealiases so existing call sites compile unchanged. + public typealias Inbound = TransportInbound + public typealias ConnectionState = TransportConnectionState + public typealias Event = TransportEvent /// WebSocket frame ceiling. The 1 MiB `URLSessionWebSocketTask` default is /// too small for sync payloads; 10 MiB is a comfortable safety margin now @@ -68,12 +56,30 @@ public actor RelayClient { } public func connect() { - guard task == nil else { return } + guard task == nil else { + logger.info("[Relay] connect skipped — socket already assigned relay=\(self.relayURL.absoluteString, privacy: .public) state=\(String(describing: self.state), privacy: .public)") + return + } shouldReconnect = true logger.info("[Relay] connect requested relay=\(self.relayURL.absoluteString, privacy: .public) localKey=\(String(self.identity.publicKeyHex.prefix(12)), privacy: .public)") openSocket() } + /// Force a clean reconnect. Unlike `connect()`, this does not bail when a + /// socket is still assigned — it tears down any existing (possibly stale) + /// socket first, then reopens. This is the correct entry point on app + /// foreground: iOS suspends the process in the background, so the socket can + /// die without `receiveOne`/`sendPing` ever firing the failure callback that + /// would clear `task`. On resume `task` may still reference a dead socket, + /// which would make a plain `connect()` no-op forever. + public func reconnect() { + shouldReconnect = true + logger.info("[Relay] reconnect requested relay=\(self.relayURL.absoluteString, privacy: .public) localKey=\(String(self.identity.publicKeyHex.prefix(12)), privacy: .public)") + closeSocketLocally() + reconnectAttempt = 0 + openSocket() + } + public func disconnect() { shouldReconnect = false logger.info("[Relay] disconnect requested relay=\(self.relayURL.absoluteString, privacy: .public) localKey=\(String(self.identity.publicKeyHex.prefix(12)), privacy: .public)") @@ -88,23 +94,11 @@ public actor RelayClient { logger.error("[Relay] send failed not connected type=\(payload.logName, privacy: .public) to=\(String(recipient.rawRepresentation.hexString.prefix(12)), privacy: .public) relay=\(self.relayURL.absoluteString, privacy: .public)") throw RelayError.notConnected } - let plaintext = try JSONEncoder().encode(payload) - let (nonce, ct) = try SessionCrypto.seal( - plaintext: plaintext, - sender: identity.privateKey, - recipient: recipient - ) - let envelope = Envelope( - to: recipient.rawRepresentation.hexString, - from: identity.publicKeyHex, - nonce: nonce, - ct: ct - ) - let raw = try JSONEncoder().encode(envelope) + let raw = try EnvelopeCodec.encode(payload, from: identity, to: recipient) do { try await task.send(.data(raw)) } catch { - logger.error("[Relay] send failed type=\(payload.logName, privacy: .public) to=\(String(envelope.to.prefix(12)), privacy: .public) error=\(error.localizedDescription, privacy: .public)") + logger.error("[Relay] send failed type=\(payload.logName, privacy: .public) to=\(String(recipient.rawRepresentation.hexString.prefix(12)), privacy: .public) error=\(error.localizedDescription, privacy: .public)") throw error } } @@ -129,10 +123,16 @@ public actor RelayClient { // the race. Opening another would register a second connection for the // same pubkey on the relay — the cause of duplicate registrations when // resuming from background. Never stack sockets. - guard task == nil else { return } + guard task == nil else { + logger.info("[Relay] openSocket skipped — socket already assigned relay=\(self.relayURL.absoluteString, privacy: .public) state=\(String(describing: self.state), privacy: .public)") + return + } // A disconnect() may have landed while a scheduled reconnect was still // sleeping; honour it instead of reopening. - guard shouldReconnect else { return } + guard shouldReconnect else { + logger.info("[Relay] openSocket skipped — shouldReconnect=false relay=\(self.relayURL.absoluteString, privacy: .public)") + return + } updateState(.connecting) @@ -232,39 +232,14 @@ public actor RelayClient { } private func handleIncoming(_ raw: Data) { - let decoder = JSONDecoder() - if let notice = try? decoder.decode(DeliveryFailedNotice.self, from: raw), - notice.type == "delivery_failed" { - logger.warning("[Relay] delivery failed to=\(String(notice.to.prefix(12)), privacy: .public)") - emit(.deliveryFailed(toHex: notice.to)) - return - } - guard let env = try? decoder.decode(Envelope.self, from: raw) else { - logger.warning("[Relay] dropping non-envelope message bytes=\(raw.count, privacy: .public)") - return - } - guard let nonce = env.nonceData, - let ct = env.ciphertextData, - let fromRaw = Data(hexString: env.from), - let fromKey = try? Curve25519.KeyAgreement.PublicKey(rawRepresentation: fromRaw) - else { - logger.warning("[Relay] dropping malformed envelope from=\(String(env.from.prefix(12)), privacy: .public)") - return - } - do { - let plaintext = try SessionCrypto.open( - ciphertext: ct, - nonce: nonce, - recipient: identity.privateKey, - sender: fromKey - ) - let payload = try decoder.decode(Payload.self, from: plaintext) - emit(.inbound(Inbound(from: fromKey, fromHex: env.from, payload: payload))) - } catch { - // Decrypt or decode failure means the sender isn't a paired peer - // we know how to talk to, OR the wire format drifted. Drop quietly. - logger.warning("[Relay] dropping encrypted payload from=\(String(env.from.prefix(12)), privacy: .public) error=\(error.localizedDescription, privacy: .public)") - return + switch EnvelopeCodec.decode(raw, localIdentity: identity) { + case .deliveryFailed(let toHex): + logger.warning("[Relay] delivery failed to=\(String(toHex.prefix(12)), privacy: .public)") + emit(.deliveryFailed(toHex: toHex)) + case .inbound(let inbound): + emit(.inbound(inbound)) + case .drop: + break } } diff --git a/Packages/Sources/RxCodeSync/Transport/Transport.swift b/Packages/Sources/RxCodeSync/Transport/Transport.swift new file mode 100644 index 00000000..9561eff0 --- /dev/null +++ b/Packages/Sources/RxCodeSync/Transport/Transport.swift @@ -0,0 +1,58 @@ +import Foundation +import CryptoKit + +/// A path over which encrypted `Envelope`s flow between two paired devices. +/// +/// Both the relay WebSocket (`RelayClient`) and a direct TCP link +/// (`DirectTransport`) conform. Everything above this seam — `SyncClient`, +/// `MobileSyncService`, `MobileAppState`, and every payload handler — is +/// path-blind: it sends a `Payload` to a peer's public key and receives +/// `TransportEvent`s, regardless of which physical path carries the bytes. +public protocol Transport: Actor { + /// A multiplexed event stream. Each subscriber gets its own buffered stream. + func events() -> AsyncStream + /// Begin connecting. Idempotent — a second call while connected is a no-op. + func connect() async + /// Tear down and stop reconnecting. + func disconnect() async + /// Encrypt `payload` for `recipient` and send the resulting `Envelope`. + func send(_ payload: Payload, to recipient: Curve25519.KeyAgreement.PublicKey) async throws +} + +/// The physical path currently carrying data to a peer. Runtime-only; surfaced +/// in the UI as a badge next to the online indicator. +public enum ConnectionPathKind: String, Sendable, Equatable, Codable { + case relay + case directLAN + case directWAN +} + +public enum TransportConnectionState: Sendable, Equatable { + case disconnected + case connecting + case connected + case reconnecting(nextAttemptInSeconds: Int) +} + +public struct TransportInbound: Sendable { + public let from: Curve25519.KeyAgreement.PublicKey + public let fromHex: String + public let payload: Payload + + public init(from: Curve25519.KeyAgreement.PublicKey, fromHex: String, payload: Payload) { + self.from = from + self.fromHex = fromHex + self.payload = payload + } +} + +public enum TransportEvent: Sendable { + case stateChanged(TransportConnectionState) + case inbound(TransportInbound) + case deliveryFailed(toHex: String) + /// Emitted by the multi-path coordinator when the active path to a peer + /// changes (e.g. relay → direct-LAN, or direct → relay on failover). + /// `rttMillis` is the measured handshake round-trip when promoting a direct + /// path, `nil` on relay fallback. + case pathChanged(peerHex: String, path: ConnectionPathKind, rttMillis: Int?) +} diff --git a/Packages/Tests/RxCodeSyncTests/DirectTransportCodecTests.swift b/Packages/Tests/RxCodeSyncTests/DirectTransportCodecTests.swift new file mode 100644 index 00000000..2905209e --- /dev/null +++ b/Packages/Tests/RxCodeSyncTests/DirectTransportCodecTests.swift @@ -0,0 +1,122 @@ +import Foundation +import CryptoKit +import Testing +@testable import RxCodeSync + +/// Covers the P2P wire additions: the ICE signaling payloads and the shared +/// `EnvelopeCodec` that both `RelayClient` and `DirectTransport` use, so a direct +/// link carries exactly the same E2E-sealed bytes as the relay. +@Suite("Direct transport wire format") +struct DirectTransportCodecTests { + @Test("ice candidates payload round trips through Payload") + func iceCandidatesRoundTrip() throws { + let sid = UUID(uuidString: "11111111-2222-3333-4444-555555555555")! + let payload = Payload.iceCandidates( + ICECandidatesPayload( + sessionID: sid, + candidates: [ + ICECandidate(kind: .lanBonjour, host: "", port: 0, bonjourName: "RxCode-Mac"), + ICECandidate(kind: .lanInterface, host: "192.168.1.20", port: 51234), + ], + observedPublicIP: "203.0.113.7" + ) + ) + let data = try JSONEncoder().encode(payload) + guard case .iceCandidates(let decoded) = try JSONDecoder().decode(Payload.self, from: data) else { + Issue.record("Expected iceCandidates payload"); return + } + #expect(decoded.sessionID == sid) + #expect(decoded.candidates.count == 2) + #expect(decoded.candidates.first?.bonjourName == "RxCode-Mac") + #expect(decoded.observedPublicIP == "203.0.113.7") + } + + @Test("ice selected handshake payload round trips") + func iceSelectedRoundTrip() throws { + let sid = UUID() + let data = try JSONEncoder().encode(Payload.iceSelected(ICESelectedPayload(sessionID: sid, echo: true))) + guard case .iceSelected(let decoded) = try JSONDecoder().decode(Payload.self, from: data) else { + Issue.record("Expected iceSelected payload"); return + } + #expect(decoded.sessionID == sid) + #expect(decoded.echo == true) + } + + @Test("older peers decode the new ICE types as unknown, not a decode failure") + func iceTypesAreForwardCompatible() throws { + // Simulates a build that predates the ICE cases receiving one. + let json = #"{"type":"ice_selected","data":{"sessionID":"11111111-2222-3333-4444-555555555555","echo":false}}"# + // Re-encode via a raw type the enum doesn't know by mangling the tag. + let unknownJSON = json.replacingOccurrences(of: "ice_selected", with: "some_future_type") + let decoded = try JSONDecoder().decode(Payload.self, from: Data(unknownJSON.utf8)) + guard case .unknown(let type) = decoded else { + Issue.record("Expected unknown payload"); return + } + #expect(type == "some_future_type") + } + + @Test("EnvelopeCodec seals for the recipient and opens on the other side") + func envelopeCodecRoundTrip() throws { + // Two independent device identities (as desktop + mobile would have). + let alice = DeviceIdentity(privateKey: Curve25519.KeyAgreement.PrivateKey()) + let bob = DeviceIdentity(privateKey: Curve25519.KeyAgreement.PrivateKey()) + + let payload = Payload.iceSelected(ICESelectedPayload(sessionID: UUID(), echo: false)) + let wire = try EnvelopeCodec.encode(payload, from: alice, to: bob.publicKey) + + // Bob decodes: gets the payload, attributed to Alice. + guard case .inbound(let inbound) = EnvelopeCodec.decode(wire, localIdentity: bob) else { + Issue.record("Bob should decode Alice's envelope"); return + } + #expect(inbound.fromHex == alice.publicKeyHex) + guard case .iceSelected = inbound.payload else { + Issue.record("Expected iceSelected inbound"); return + } + + // A third party (not the recipient) cannot open it — dropped, not crashed. + let eve = DeviceIdentity(privateKey: Curve25519.KeyAgreement.PrivateKey()) + if case .inbound = EnvelopeCodec.decode(wire, localIdentity: eve) { + Issue.record("Eve must not be able to open an envelope addressed to Bob") + } + } + + @Test("interface enumeration excludes loopback and link-local") + func interfaceCandidatesAreRoutable() { + // Whatever this host reports, the filter must never surface loopback or + // link-local addresses (they can't reach a peer). + for ip in NetworkInterfaces.localIPAddresses() { + #expect(ip != "127.0.0.1") + #expect(ip != "::1") + #expect(!ip.hasPrefix("169.254.")) + #expect(!ip.lowercased().hasPrefix("fe80:")) + #expect(!ip.contains("%")) + } + } + + @Test("wan-mapped candidate round trips through Payload") + func wanMappedCandidateRoundTrip() throws { + let payload = Payload.iceCandidates( + ICECandidatesPayload( + sessionID: UUID(), + candidates: [ICECandidate(kind: .wanMapped, host: "203.0.113.9", port: 40001)], + observedPublicIP: "203.0.113.9" + ) + ) + let data = try JSONEncoder().encode(payload) + guard case .iceCandidates(let decoded) = try JSONDecoder().decode(Payload.self, from: data) else { + Issue.record("Expected iceCandidates payload"); return + } + #expect(decoded.candidates.first?.kind == .wanMapped) + #expect(decoded.candidates.first?.port == 40001) + } + + @Test("EnvelopeCodec surfaces a delivery-failed notice") + func envelopeCodecDeliveryFailed() throws { + let me = DeviceIdentity(privateKey: Curve25519.KeyAgreement.PrivateKey()) + let notice = try JSONEncoder().encode(DeliveryFailedNotice(v: 1, type: "delivery_failed", to: "abcdef")) + guard case .deliveryFailed(let toHex) = EnvelopeCodec.decode(notice, localIdentity: me) else { + Issue.record("Expected delivery-failed result"); return + } + #expect(toHex == "abcdef") + } +} diff --git a/RxCode/Info.plist b/RxCode/Info.plist index ea3e4656..740b7b52 100644 --- a/RxCode/Info.plist +++ b/RxCode/Info.plist @@ -29,6 +29,12 @@ NSAllowsLocalNetworking + NSLocalNetworkUsageDescription + RxCode connects directly to your paired iPhone or iPad over the local network for a faster, private peer-to-peer sync. + NSBonjourServices + + _rxcode-sync._tcp + CFBundleURLTypes diff --git a/RxCode/Resources/Localizable.xcstrings b/RxCode/Resources/Localizable.xcstrings index 853ee7e6..ec865b11 100644 --- a/RxCode/Resources/Localizable.xcstrings +++ b/RxCode/Resources/Localizable.xcstrings @@ -5705,6 +5705,15 @@ } } } + }, + "Direct · LAN%@" : { + + }, + "Direct · WAN%@" : { + + }, + "Direct peer-to-peer connections" : { + }, "Disable globally" : { "localizations" : { @@ -17194,6 +17203,9 @@ } } } + }, + "When on the same network, sync connects straight to your device over the LAN (end-to-end encrypted) and falls back to the relay automatically. Turn off to always use relay servers." : { + }, "When tapped" : { "localizations" : { diff --git a/RxCode/Services/MobileSyncService+EventDispatch.swift b/RxCode/Services/MobileSyncService+EventDispatch.swift index 71b7e476..ee0841c6 100644 --- a/RxCode/Services/MobileSyncService+EventDispatch.swift +++ b/RxCode/Services/MobileSyncService+EventDispatch.swift @@ -26,6 +26,8 @@ extension MobileSyncService { } case .inbound(let inbound): handleInbound(inbound) + case .pathChanged(let peerHex, let path, let rttMillis): + updateActivePath(peerHex: peerHex, path: path, rttMillis: rttMillis) } } @@ -45,9 +47,20 @@ extension MobileSyncService { } case .inbound(let inbound): handleInbound(inbound, fromServer: server) + case .pathChanged(let peerHex, let path, let rttMillis): + updateActivePath(peerHex: peerHex, path: path, rttMillis: rttMillis) } } + /// Record the active transport path for a peer (drives the settings badge). + private func updateActivePath(peerHex: String, path: ConnectionPathKind, rttMillis: Int?) { + guard let idx = pairedDevices.firstIndex(where: { $0.pubkeyHex == peerHex }) else { return } + guard pairedDevices[idx].activePath != path || pairedDevices[idx].directRTTMillis != rttMillis else { return } + pairedDevices[idx].activePath = path + pairedDevices[idx].directRTTMillis = rttMillis + logger.info("[MobileSync] direct path for mobileKey=\(String(peerHex.prefix(12)), privacy: .public) is now \(String(describing: path), privacy: .public)") + } + /// Update aggregate connection state based on all relay states. private func updateAggregateConnectionState() { let states = Array(relayConnectionStates.values) diff --git a/RxCode/Services/MobileSyncService.swift b/RxCode/Services/MobileSyncService.swift index 2ccc0e75..6e6b6610 100644 --- a/RxCode/Services/MobileSyncService.swift +++ b/RxCode/Services/MobileSyncService.swift @@ -48,6 +48,11 @@ struct PairedDevice: Codable, Identifiable, Sendable, Hashable { var onlineState: OnlineState = .unknown /// Wall-clock of the last `onlineState` change. Not persisted. var lastOnlineTransitionAt: Date? = nil + /// Active transport path — not persisted. `.relay` until a direct link is + /// promoted; drives the "Direct · LAN" / "Relay" badge. + var activePath: ConnectionPathKind = .relay + /// Measured direct-path handshake RTT in milliseconds — not persisted. + var directRTTMillis: Int? = nil var id: String { pubkeyHex } @@ -240,7 +245,7 @@ final class MobileSyncService: ObservableObject { Self.logFatalKeychain(error) fatalError("Failed to load device identity: \(error)") } - self.client = SyncClient(identity: identity, relayURL: initial) + self.client = SyncClient(identity: identity, relayURL: initial, directPathsEnabled: Self.directPathsEnabledSetting) loadPairedDevices() loadSavedRelayServers() } @@ -286,7 +291,7 @@ final class MobileSyncService: ObservableObject { logger.info("[MobileSync] starting relay server=\(server.name, privacy: .public) url=\(server.url, privacy: .public) desktopKey=\(String(self.identity.publicKeyHex.prefix(12)), privacy: .public)") - let newClient = SyncClient(identity: identity, relayURL: url) + let newClient = SyncClient(identity: identity, relayURL: url, directPathsEnabled: Self.directPathsEnabledSetting) additionalClients[server.id] = newClient // Add all paired devices that use this relay @@ -379,6 +384,29 @@ final class MobileSyncService: ObservableObject { relayConnectionStates.removeAll() } + /// User preference for peer-to-peer direct paths. Defaults to on. + static var directPathsEnabledSetting: Bool { + if UserDefaults.standard.object(forKey: "mobileSync.directPathsEnabled") == nil { return true } + return UserDefaults.standard.bool(forKey: "mobileSync.directPathsEnabled") + } + + /// Toggle peer-to-peer direct paths, persist the choice, and rebuild every + /// per-relay client (the ones that actually route paired devices) so the new + /// setting takes effect immediately. Each rebuild re-adds the relay's peers + /// and restarts its event task via `startRelayServer`. + func setDirectPathsEnabled(_ enabled: Bool) { + guard enabled != Self.directPathsEnabledSetting else { return } + UserDefaults.standard.set(enabled, forKey: "mobileSync.directPathsEnabled") + logger.info("[MobileSync] direct paths \(enabled ? "enabled" : "disabled", privacy: .public) — rebuilding relay clients") + let servers = savedRelayServers.filter { $0.isEnabled } + Task { @MainActor in + for server in servers { + await stopRelayServer(server) + await startRelayServer(server) + } + } + } + /// Update the configured relay URL, persist it, and reconnect. func updateRelay(url: URL) { guard url != relayURL else { return } @@ -388,7 +416,7 @@ final class MobileSyncService: ObservableObject { eventTask?.cancel() eventTask = nil let oldClient = client - client = SyncClient(identity: identity, relayURL: url) + client = SyncClient(identity: identity, relayURL: url, directPathsEnabled: Self.directPathsEnabledSetting) connectionState = .disconnected Task { await oldClient.stop() } start() diff --git a/RxCode/Views/Settings/MobileSettingsTab.swift b/RxCode/Views/Settings/MobileSettingsTab.swift index 427bdf5d..c4785beb 100644 --- a/RxCode/Views/Settings/MobileSettingsTab.swift +++ b/RxCode/Views/Settings/MobileSettingsTab.swift @@ -23,6 +23,7 @@ struct MobileSettingsTab: View { VStack(alignment: .leading, spacing: 20) { headerSection relaySection + directConnectionsSection Divider() pairedSection } @@ -263,6 +264,7 @@ struct MobileSettingsTab: View { Text(device.displayName) .fontWeight(.medium) onlineStatePill(for: device) + pathPill(for: device) } HStack(spacing: 6) { if let token = MobileSyncService.pushToken(for: device), !token.isEmpty { @@ -334,6 +336,42 @@ struct MobileSettingsTab: View { } } + /// Shows how sync traffic reaches this device: a green "Direct · LAN" chip + /// when a peer-to-peer link is active, otherwise nothing (relay is implied + /// by the existing online pill). + private var directConnectionsSection: some View { + VStack(alignment: .leading, spacing: 6) { + Toggle(isOn: Binding( + get: { MobileSyncService.directPathsEnabledSetting }, + set: { sync.setDirectPathsEnabled($0) } + )) { + Text("Direct peer-to-peer connections") + } + Text("When on the same network, sync connects straight to your device over the LAN (end-to-end encrypted) and falls back to the relay automatically. Turn off to always use relay servers.") + .font(.caption) + .foregroundStyle(.secondary) + } + } + + @ViewBuilder + private func pathPill(for device: PairedDevice) -> some View { + let rtt = device.directRTTMillis.map { " · \($0)ms" } ?? "" + switch device.activePath { + case .directLAN: + Label("Direct · LAN\(rtt)", systemImage: "bolt.horizontal.fill") + .font(.caption2) + .foregroundStyle(.green) + .labelStyle(.titleAndIcon) + case .directWAN: + Label("Direct · WAN\(rtt)", systemImage: "bolt.horizontal.fill") + .font(.caption2) + .foregroundStyle(.blue) + .labelStyle(.titleAndIcon) + case .relay: + EmptyView() + } + } + @ViewBuilder private func copyTokenButton(for device: PairedDevice) -> some View { let token = MobileSyncService.pushToken(for: device) diff --git a/RxCodeMobile/Info.plist b/RxCodeMobile/Info.plist index 5e68e6ba..7b068e38 100644 --- a/RxCodeMobile/Info.plist +++ b/RxCodeMobile/Info.plist @@ -11,7 +11,11 @@ NSCameraUsageDescription Scan a QR code shown by RxCode on your Mac to pair this device. NSLocalNetworkUsageDescription - RxCode connects to a relay running on your local network to sync with your Mac. + RxCode connects directly to your Mac over the local network for a faster, private peer-to-peer sync. + NSBonjourServices + + _rxcode-sync._tcp + NSAppTransportSecurity NSAllowsLocalNetworking diff --git a/RxCodeMobile/Resources/Localizable.xcstrings b/RxCodeMobile/Resources/Localizable.xcstrings index 33c88775..7572a694 100644 --- a/RxCodeMobile/Resources/Localizable.xcstrings +++ b/RxCodeMobile/Resources/Localizable.xcstrings @@ -3383,6 +3383,12 @@ } } } + }, + "Direct · LAN%@" : { + + }, + "Direct connection" : { + }, "Disabled" : { "localizations" : { @@ -8644,6 +8650,7 @@ } }, "Select which Mac this device controls. Removing one pairing does not reset this device's identity." : { + "extractionState" : "stale", "localizations" : { "en" : { "stringUnit" : { @@ -8664,6 +8671,9 @@ } } } + }, + "Select which Mac this device controls. When Direct connection is on and you're on the same network, sync connects straight to your Mac (end-to-end encrypted) and falls back to the relay automatically." : { + }, "Serious" : { "localizations" : { @@ -10771,4 +10781,4 @@ } }, "version" : "1.1" -} +} \ No newline at end of file diff --git a/RxCodeMobile/RxCodeMobileApp.swift b/RxCodeMobile/RxCodeMobileApp.swift index 433e01b9..c0ffccfc 100644 --- a/RxCodeMobile/RxCodeMobileApp.swift +++ b/RxCodeMobile/RxCodeMobileApp.swift @@ -41,7 +41,8 @@ struct RxCodeMobileApp: App { handlePairingURL(url) } } - .onChange(of: scenePhase) { _, newPhase in + .onChange(of: scenePhase) { oldPhase, newPhase in + NSLog("[Lifecycle] scenePhase \(String(describing: oldPhase)) -> \(String(describing: newPhase))") state.handleScenePhase(newPhase) } } diff --git a/RxCodeMobile/State/MobileAppState+Inbound.swift b/RxCodeMobile/State/MobileAppState+Inbound.swift index 828eed0d..937e1055 100644 --- a/RxCodeMobile/State/MobileAppState+Inbound.swift +++ b/RxCodeMobile/State/MobileAppState+Inbound.swift @@ -24,6 +24,10 @@ extension MobileAppState { logger.warning("[Relay] delivery failed to desktopKey=\(String(toHex.prefix(12)), privacy: .public)") case .inbound(let inbound): handleInbound(inbound) + case .pathChanged(let peerHex, let path, let rttMillis): + logger.info("[Direct] active path for desktopKey=\(String(peerHex.prefix(12)), privacy: .public) is now \(String(describing: path), privacy: .public)") + activePathByDesktop[peerHex] = path + directRTTByDesktop[peerHex] = rttMillis } } diff --git a/RxCodeMobile/State/MobileAppState+Pairing.swift b/RxCodeMobile/State/MobileAppState+Pairing.swift index 89b5b23f..c0d6c0ce 100644 --- a/RxCodeMobile/State/MobileAppState+Pairing.swift +++ b/RxCodeMobile/State/MobileAppState+Pairing.swift @@ -72,7 +72,7 @@ extension MobileAppState { let oldClient = client eventTask?.cancel() eventTask = nil - client = SyncClient(identity: identity, relayURL: url) + client = SyncClient(identity: identity, relayURL: url, directPathsEnabled: MobileAppState.directPathsEnabledSetting) relayURL = url connectionState = .disconnected await oldClient.stop() diff --git a/RxCodeMobile/State/MobileAppState.swift b/RxCodeMobile/State/MobileAppState.swift index 22a02422..ae6595df 100644 --- a/RxCodeMobile/State/MobileAppState.swift +++ b/RxCodeMobile/State/MobileAppState.swift @@ -83,6 +83,11 @@ final class MobileAppState: ObservableObject { @Published var pairedDesktopPubkey: String = "" @Published var pairedDesktops: [PairedDesktop] = [] @Published var connectionState: RelayClient.ConnectionState = .disconnected + /// Active transport path per paired desktop (pubkey-hex → path). Runtime-only, + /// drives the "Direct · LAN" / "Relay" badge. Absent ⇒ relay. + @Published var activePathByDesktop: [String: ConnectionPathKind] = [:] + /// Measured direct-path handshake RTT (ms) per paired desktop. Runtime-only. + @Published var directRTTByDesktop: [String: Int?] = [:] @Published var relayURL: URL @Published var pairingStatus: PairingStatus = .idle @@ -265,6 +270,10 @@ final class MobileAppState: ObservableObject { var client: SyncClient let logger = Logger(subsystem: "com.idealapp.RxCodeMobile", category: "MobileAppState") var eventTask: Task? + /// Serializes background/foreground relay transitions so a delayed + /// `stop()` can never land after a later `start()` and leave the relay + /// permanently disconnected. + var lifecycleTask: Task? var pairingTimeoutTask: Task? var apnsTokenHex: String? var apnsEnvironment: String? @@ -314,7 +323,7 @@ final class MobileAppState: ObservableObject { fatalError("Failed to load mobile device identity: \(error)") } } - self.client = SyncClient(identity: identity, relayURL: initial) + self.client = SyncClient(identity: identity, relayURL: initial, directPathsEnabled: Self.directPathsEnabledSetting) logger.info("[MobileIdentity] loaded publicKey=\(String(self.identity.publicKeyHex.prefix(12)), privacy: .public) accessGroup=\(Self.keychainAccessGroup, privacy: .public)") loadPairedDesktops() #if DEBUG @@ -333,6 +342,27 @@ final class MobileAppState: ObservableObject { DeviceIdentity.resolveAccessGroup(suffix: keychainAccessGroupSuffix) } + /// User preference for peer-to-peer direct paths. Defaults to on. + static var directPathsEnabledSetting: Bool { + if UserDefaults.standard.object(forKey: "mobileSync.directPathsEnabled") == nil { return true } + return UserDefaults.standard.bool(forKey: "mobileSync.directPathsEnabled") + } + + /// Toggle peer-to-peer direct paths, persist the choice, and rebuild the + /// client so it takes effect immediately. + func setDirectPathsEnabled(_ enabled: Bool) async { + guard enabled != Self.directPathsEnabledSetting else { return } + UserDefaults.standard.set(enabled, forKey: "mobileSync.directPathsEnabled") + logger.info("[MobileSync] direct paths \(enabled ? "enabled" : "disabled", privacy: .public) — rebuilding client") + let wasStarted = clientStarted + eventTask?.cancel() + eventTask = nil + let oldClient = client + client = SyncClient(identity: identity, relayURL: relayURL, directPathsEnabled: enabled) + await oldClient.stop() + if wasStarted { await startClient() } + } + static var defaultRelayURLString: String { #if DEBUG return "ws://localhost:8787/ws" @@ -368,14 +398,17 @@ final class MobileAppState: ObservableObject { /// relay's registration table in sync with reality and gives a clean, /// single re-register on the next foreground. func handleScenePhase(_ phase: ScenePhase) { - guard clientStarted else { return } + guard clientStarted else { + logger.info("[Lifecycle] handleScenePhase \(String(describing: phase), privacy: .public) ignored — clientStarted=false") + return + } switch phase { case .background: logger.info("[Lifecycle] entering background — disconnecting relay") - Task { await client.stop() } + enqueueLifecycle(label: "background/stop") { [client] in await client.stop() } case .active: logger.info("[Lifecycle] entering foreground — reconnecting relay") - Task { await client.start() } + enqueueLifecycle(label: "foreground/start") { [client] in await client.start() } case .inactive: break @unknown default: @@ -383,6 +416,21 @@ final class MobileAppState: ObservableObject { } } + /// Chain a relay lifecycle transition after any in-flight one so they run + /// strictly in the order the scene phases arrived. Two bare `Task {}`s race: + /// a background `stop()` could otherwise complete after a foreground + /// `start()` and disable the relay for good. + private func enqueueLifecycle(label: String, _ work: @escaping @Sendable () async -> Void) { + let previous = lifecycleTask + lifecycleTask = Task { [logger] in + logger.info("[Lifecycle] \(label, privacy: .public) queued — awaiting previous transition") + await previous?.value + logger.info("[Lifecycle] \(label, privacy: .public) running") + await work() + logger.info("[Lifecycle] \(label, privacy: .public) done") + } + } + func startClient() async { clientStarted = true for desktop in pairedDesktops { diff --git a/RxCodeMobile/Views/MobileSettingsView.swift b/RxCodeMobile/Views/MobileSettingsView.swift index 5352ca35..3d71e07e 100644 --- a/RxCodeMobile/Views/MobileSettingsView.swift +++ b/RxCodeMobile/Views/MobileSettingsView.swift @@ -138,10 +138,16 @@ struct MobileSettingsView: View { Spacer() connectionLabel } + Toggle(isOn: Binding( + get: { MobileAppState.directPathsEnabledSetting }, + set: { enabled in Task { await state.setDirectPathsEnabled(enabled) } } + )) { + Text("Direct connection") + } } header: { Text("Paired Macs") } footer: { - Text("Select which Mac this device controls. Removing one pairing does not reset this device's identity.") + Text("Select which Mac this device controls. When Direct connection is on and you're on the same network, sync connects straight to your Mac (end-to-end encrypted) and falls back to the relay automatically.") } } @@ -159,7 +165,13 @@ struct MobileSettingsView: View { .foregroundStyle(.primary) HStack(spacing: 6) { Text("Paired \(desktop.pairedAt, format: .relative(presentation: .named))") - if let relay = desktop.relayDisplayName { + if state.activePathByDesktop[desktop.pubkeyHex] == .directLAN { + Text("•") + let rtt = (state.directRTTByDesktop[desktop.pubkeyHex] ?? nil).map { " · \($0)ms" } ?? "" + Label("Direct · LAN\(rtt)", systemImage: "bolt.horizontal.fill") + .labelStyle(.titleAndIcon) + .foregroundStyle(.green) + } else if let relay = desktop.relayDisplayName { Text("•") Label(relay, systemImage: "antenna.radiowaves.left.and.right") .labelStyle(.titleOnly) diff --git a/RxCodeTests/RxCodeMobile/State/MobileAppState.swift b/RxCodeTests/RxCodeMobile/State/MobileAppState.swift index d57a23fc..46fbd8bd 100644 --- a/RxCodeTests/RxCodeMobile/State/MobileAppState.swift +++ b/RxCodeTests/RxCodeMobile/State/MobileAppState.swift @@ -155,8 +155,11 @@ final class MobileAppState: ObservableObject { break case .inbound(let inbound): handleInbound(inbound) + case .pathChanged: + break } } + // (P2P path changes are exercised in the app target, not this test double.) private func handleInbound(_ inbound: RelayClient.Inbound) { switch inbound.payload {