diff --git a/packages/javascript-api/src/lib/services/graphql/__fixtures__/graphql-subscriptions-fixture.ts b/packages/javascript-api/src/lib/services/graphql/__fixtures__/graphql-subscriptions-fixture.ts index 42969500..30459eb6 100644 --- a/packages/javascript-api/src/lib/services/graphql/__fixtures__/graphql-subscriptions-fixture.ts +++ b/packages/javascript-api/src/lib/services/graphql/__fixtures__/graphql-subscriptions-fixture.ts @@ -122,8 +122,29 @@ export class GraphQLSubscriptionsFixture { await this.server.nextMessage; } - async cleanup() { + async cleanup(): Promise { + this.tearDownService(); WS.clean(); await this.server.closed; } + + private tearDownService(): void { + this.graphqlService['openSocket'] = () => Promise.resolve(); + + const socket = this.graphqlService['socket']; + if (socket) { + socket.onopen = null; + socket.onmessage = null; + socket.onerror = null; + socket.onclose = null; + + if (typeof socket.close === 'function') { + socket.close(); + } + + this.graphqlService['socket'] = null; + } + + this.graphqlService['clearPingMonitoring'](); + } } diff --git a/packages/javascript-api/src/lib/services/graphql/__tests__/graphql-blocked-websocket.spec.ts b/packages/javascript-api/src/lib/services/graphql/__tests__/graphql-blocked-websocket.spec.ts new file mode 100644 index 00000000..b2ce382a --- /dev/null +++ b/packages/javascript-api/src/lib/services/graphql/__tests__/graphql-blocked-websocket.spec.ts @@ -0,0 +1,95 @@ +import { WebSocket } from 'mock-socket'; + +import { GraphQLSubscriptionsFixture } from '../__fixtures__/graphql-subscriptions-fixture'; +import * as backoff from '../../../util/randomized-exponential-backoff/randomized-exponential-backoff'; + +jest.mock('isomorphic-ws', () => WebSocket); +jest.mock('../../../util/sleep-ms/sleep-ms', () => ({ + sleepMs: () => new Promise((resolve) => setTimeout(resolve, 4)), +})); + +// Mirrors the constants in graphql.service.ts (not exported). +const MAX_FAILED_HANDSHAKES = 5; +const BLOCKED_RETRY_INTERVAL_MS = 5 * 60 * 1000; + +describe('GraphQL blocked-WebSocket back-off', () => { + let fixture: GraphQLSubscriptionsFixture; + + beforeEach(() => { + fixture = new GraphQLSubscriptionsFixture(); + }); + + afterEach(async () => { + await fixture.cleanup(); + }); + + it('should back off to a slow interval after repeated handshakes fail before connecting', async () => { + const service = fixture.graphqlService as any; + const backoffSpy = jest.spyOn( + backoff, + 'calculateRandomizedExponentialBackoffTime', + ); + const infoSpy = jest.spyOn(service.logger, 'info'); + + const sub = fixture.triggerSubscription(); + + // Each cycle: connect, never ACK, close before the connection is established. + for (let i = 0; i < MAX_FAILED_HANDSHAKES; i++) { + await fixture.waitForConnection(); + await fixture.consumeInitMessage(); + await fixture.closeWithCode(1001); + fixture.openServer(); + } + + expect(service.consecutiveFailedHandshakes).toBeGreaterThanOrEqual( + MAX_FAILED_HANDSHAKES, + ); + // Fast exponential backoff is used only for the first (MAX - 1) reconnects; + // once the cap is hit, the flat slow interval is used instead. + expect(backoffSpy).toHaveBeenCalledTimes(MAX_FAILED_HANDSHAKES - 1); + expect(infoSpy).toHaveBeenCalledWith( + `Reconnect socket in ${BLOCKED_RETRY_INTERVAL_MS}ms`, + ); + + sub.unsubscribe(); + }); + + it('should not count a close that happens after a successful connection', async () => { + const service = fixture.graphqlService as any; + + const sub = fixture.triggerSubscription(); + await fixture.handleConnectionInit(); // connection_ack -> CONNECTED + await fixture.consumeSubscribeMessage(); + + expect(service.consecutiveFailedHandshakes).toBe(0); + + // A drop after the connection was established is a normal reconnect. + await fixture.closeWithCode(1001); + expect(service.consecutiveFailedHandshakes).toBe(0); + + fixture.openServer(); + await fixture.handleConnectionInit(); + sub.unsubscribe(); + }); + + it('should reset the counter when the connection is established', async () => { + const service = fixture.graphqlService as any; + + const sub = fixture.triggerSubscription(); + + for (let i = 0; i < 2; i++) { + await fixture.waitForConnection(); + await fixture.getNextMessage(); + await fixture.closeWithCode(1001); + fixture.openServer(); + } + expect(service.consecutiveFailedHandshakes).toBe(2); + + // A successful connection_ack resets the counter back to fast reconnects. + await fixture.handleConnectionInit(); + await fixture.consumeSubscribeMessage(); + expect(service.consecutiveFailedHandshakes).toBe(0); + + sub.unsubscribe(); + }); +}); diff --git a/packages/javascript-api/src/lib/services/graphql/graphql.service.ts b/packages/javascript-api/src/lib/services/graphql/graphql.service.ts index a45c71e5..45a10bdb 100644 --- a/packages/javascript-api/src/lib/services/graphql/graphql.service.ts +++ b/packages/javascript-api/src/lib/services/graphql/graphql.service.ts @@ -98,6 +98,12 @@ const PING_PONG_INTERVAL_IN_MS = 20_000; // https://www.w3.org/TR/websockets/#concept-websocket-close-fail const CLIENT_SIDE_CLOSE_EVENT = 1000; +// Once the WebSocket has failed to establish this many times in a row, the reconnect +// loop backs off to a slow interval instead of minting a new temporary API key on +// every fast retry. A successful connection resets the counter. +const MAX_FAILED_HANDSHAKES = 5; +const BLOCKED_RETRY_INTERVAL_MS = 5 * 60 * 1000; + /** * A service that lets the user query Qminder API via GraphQL statements. * Queries and subscriptions are supported. There is no support for mutations. @@ -192,6 +198,10 @@ export class GraphqlService { private connectionAttemptsCount = 0; + // Consecutive WebSocket closes that happened before the connection was ever + // established. Reset to 0 on a successful GQL_CONNECTION_ACK. + private consecutiveFailedHandshakes = 0; + constructor() { this.setServer('api.qminder.com'); @@ -446,14 +456,42 @@ export class GraphqlService { reason: event.reason, }); + // Capture this before the status is overwritten below: a close while still + // CONNECTING means the connection never established. + const closedBeforeEstablished = + this.connectionStatus === ConnectionStatus.CONNECTING; + this.setConnectionStatus(ConnectionStatus.DISCONNECTED); this.socket = null; this.clearPingMonitoring(); - if (this.shouldRetry(event)) { - const timer = calculateRandomizedExponentialBackoffTime( - this.connectionAttemptsCount, + if (closedBeforeEstablished) { + this.consecutiveFailedHandshakes++; + this.logger.error( + `Received socket close event before a connection was established! ` + + `Close code: ${event.code} (consecutive failed handshakes: ${this.consecutiveFailedHandshakes})`, ); + } + + if (this.shouldRetry(event)) { + // After repeated failures to establish, back off to a slow interval so we + // stop creating a temporary API key on every retry. Recovers on its own: + // a successful connection resets consecutiveFailedHandshakes. + const isBlocked = + this.consecutiveFailedHandshakes >= MAX_FAILED_HANDSHAKES; + + if (isBlocked) { + this.logger.warn( + `Handshake failed ${this.consecutiveFailedHandshakes} times in a row; ` + + `slowing down reconnect attempts to ${BLOCKED_RETRY_INTERVAL_MS}ms.`, + ); + } + + const timer = isBlocked + ? BLOCKED_RETRY_INTERVAL_MS + : calculateRandomizedExponentialBackoffTime( + this.connectionAttemptsCount, + ); this.logger.info(`Reconnect socket in ${timer.toFixed(0)}ms`); @@ -466,12 +504,6 @@ export class GraphqlService { this.logger.error('Failed to reconnect socket: ', error); }); } - - if (this.connectionStatus === ConnectionStatus.CONNECTING) { - this.logger.error( - `Received socket close event before a connection was established! Close code: ${event.code}`, - ); - } }; this.socket.onerror = () => { @@ -498,6 +530,9 @@ export class GraphqlService { this.retryableErroredSubscriptionsRetryCount = 0; this.retryableErroredSubscriptionsAction$.next({ type: 'clear' }); + // Connection established — clear the blocked back-off. + this.consecutiveFailedHandshakes = 0; + this.setConnectionStatus(ConnectionStatus.CONNECTED); this.logger.info('Connected to websocket'); this.startConnectionMonitoring();