diff --git a/packages/javascript-api/src/lib/services/graphql/graphql-error.ts b/packages/javascript-api/src/lib/services/graphql/graphql-error.ts new file mode 100644 index 00000000..65cf6596 --- /dev/null +++ b/packages/javascript-api/src/lib/services/graphql/graphql-error.ts @@ -0,0 +1,44 @@ +import { GraphQLErrorExtensions, SourceLocation } from 'graphql'; + +/** + * A GraphQL error as returned by the Qminder API over a subscription or query. + */ +export interface QminderGraphQLError { + readonly message: string; + readonly errorType?: string | null; + readonly extensions?: GraphQLErrorExtensions | null; + readonly sourcePreview?: string | null; + readonly offendingToken?: string | null; + readonly locations?: SourceLocation[] | null; + readonly path?: (string | number)[] | null; +} + +/** + * GraphQL subscription error types that the server will never resolve on retry. + * Subscriptions failing with one of these are surfaced to the caller immediately. + */ +const NON_RETRYABLE_SUBSCRIPTION_ERROR_TYPES = [ + 'BAD_REQUEST', + 'FIELD_NOT_FOUND', + 'INVALID_ARGUMENT', + 'InvalidSyntax', + 'NOT_FOUND', + 'PERMISSION_DENIED', + 'ValidationError', +] as const; + +/** + * Whether any of the errors is a type the server will never resolve on retry, + * meaning the subscription should fail immediately rather than be retried. + */ +export function isNonRetryableSubscriptionError( + errors: QminderGraphQLError[], +): boolean { + return errors + .filter((error) => error.errorType) + .some(({ errorType }) => + (NON_RETRYABLE_SUBSCRIPTION_ERROR_TYPES as unknown as string[]).includes( + errorType, + ), + ); +} diff --git a/packages/javascript-api/src/lib/services/graphql/graphql.service.ts b/packages/javascript-api/src/lib/services/graphql/graphql.service.ts index 6c3127c0..d7529fc2 100644 --- a/packages/javascript-api/src/lib/services/graphql/graphql.service.ts +++ b/packages/javascript-api/src/lib/services/graphql/graphql.service.ts @@ -1,21 +1,11 @@ -import { - DocumentNode, - GraphQLErrorExtensions, - print, - SourceLocation, -} from 'graphql'; +import { DocumentNode, print } from 'graphql'; import WebSocket, { CloseEvent } from 'isomorphic-ws'; import { BehaviorSubject, distinctUntilChanged, - map, Observable, - scan, shareReplay, - startWith, - Subject, Subscriber, - take, } from 'rxjs'; import { ConnectionStatus } from '../../model/connection-status.js'; @@ -24,6 +14,10 @@ import { calculateRandomizedExponentialBackoffTime } from '../../util/randomized import { sleepMs } from '../../util/sleep-ms/sleep-ms.js'; import { ApiBase, GraphqlQuery } from '../api-base/api-base.js'; import { TemporaryApiKeyService } from '../temporary-api-key/temporary-api-key.service.js'; +import { QminderGraphQLError } from './graphql-error.js'; +import { RetryableSubscriptionErrorPolicy } from './retryable-subscription-error-policy.js'; + +export type { QminderGraphQLError } from './graphql-error.js'; function parseQuery(queryOrDocumentNode: string | DocumentNode): string { if (typeof queryOrDocumentNode === 'string') { @@ -37,16 +31,6 @@ function parseQuery(queryOrDocumentNode: string | DocumentNode): string { throw new Error('queryOrDocumentNode must be a string or a DocumentNode'); } -export interface QminderGraphQLError { - readonly message: string; - readonly errorType?: string | null; - readonly extensions?: GraphQLErrorExtensions | null; - readonly sourcePreview?: string | null; - readonly offendingToken?: string | null; - readonly locations?: SourceLocation[] | null; - readonly path?: (string | number)[] | null; -} - interface Message { readonly id?: string; readonly type: MessageType; @@ -77,21 +61,6 @@ enum MessageType { GQL_ERROR = 'error', } -const NON_RETRYABLE_SUBSCRIPTION_ERROR_TYPES = [ - 'BAD_REQUEST', - 'FIELD_NOT_FOUND', - 'INVALID_ARGUMENT', - 'InvalidSyntax', - 'NOT_FOUND', - 'PERMISSION_DENIED', - 'ValidationError', -] as const; - -const RETRYABLE_ERRORED_SUBSCRIPTIONS_RETRY_LIMIT = 5; - -// To avoid haveAnySubscriptionsErrored returning 'false' temporarily if retrying errored subscriptions fails. -const RETRYABLE_ERRORED_SUBSCRIPTIONS_SUCCEEDED_DELAY_MS = 500; - const PONG_TIMEOUT_IN_MS = 12_000; const PING_PONG_INTERVAL_IN_MS = 20_000; @@ -134,55 +103,10 @@ export class GraphqlService { private readonly subscriptionConnection$: Observable; - private readonly retryableErroredSubscriptionsAction$ = new Subject< - | { - readonly type: 'add'; - readonly messageId: string; - readonly errors: QminderGraphQLError[]; - } - | { - readonly type: 'remove'; - readonly messageId: string; - } - | { - readonly type: 'clear'; - } - >(); - - private readonly retryableErroredSubscriptions$ = - this.retryableErroredSubscriptionsAction$.pipe( - scan((subscriptions, action) => { - const result = new Map(subscriptions); - - switch (action.type) { - case 'add': - return result.set(action.messageId, action.errors); - case 'remove': - result.delete(action.messageId); - return result; - case 'clear': - return new Map(); - } - }, new Map()), - startWith(new Map()), - shareReplay(1), - ); - - private readonly haveAnyRetryableSubscriptionsErrored$ = - this.retryableErroredSubscriptions$.pipe( - map(({ size }) => !!size), - distinctUntilChanged(), - ); - - private retryableErroredSubscriptionsRetryTimeout: ReturnType< - typeof setTimeout - > | null = null; - - private retryableErroredSubscriptionsSuccessTimeout: ReturnType< - typeof setTimeout - > | null = null; - - private retryableErroredSubscriptionsRetryCount = 0; + private readonly errorPolicy = new RetryableSubscriptionErrorPolicy({ + retry: (messageId) => this.retrySubscription(messageId), + fail: (messageId, errors) => this.failSubscription(messageId, errors), + }); private temporaryApiKeyService: TemporaryApiKeyService | undefined; @@ -209,8 +133,6 @@ export class GraphqlService { distinctUntilChanged(), shareReplay(1), ); - - this.retryableErroredSubscriptions$.subscribe(); } /** @@ -328,10 +250,7 @@ export class GraphqlService { }, ); - this.retryableErroredSubscriptionsAction$.next({ - type: 'remove', - messageId, - }); + this.errorPolicy.forget(messageId); } this.cleanUpSubscription(messageId); @@ -385,7 +304,7 @@ export class GraphqlService { * @see {@link NON_RETRYABLE_SUBSCRIPTION_ERROR_TYPES | non-retryable subscription error types} */ haveAnyRetryableSubscriptionsErrored(): Observable { - return this.haveAnyRetryableSubscriptionsErrored$; + return this.errorPolicy.haveAnyErrored$; } /** @@ -519,9 +438,7 @@ export class GraphqlService { break; case MessageType.GQL_CONNECTION_ACK: { - this.clearErroredSubscriptionsTimeouts(); - this.retryableErroredSubscriptionsRetryCount = 0; - this.retryableErroredSubscriptionsAction$.next({ type: 'clear' }); + this.errorPolicy.reset(); // Connection established — clear the blocked back-off. this.consecutiveFailedHandshakes = 0; @@ -562,19 +479,13 @@ export class GraphqlService { } case MessageType.GQL_DATA: - this.retryableErroredSubscriptionsAction$.next({ - type: 'remove', - messageId: message.id, - }); + this.errorPolicy.forget(message.id); this.messagesSubscribers.get(message.id)?.next(message.payload?.data); break; case MessageType.GQL_COMPLETE: { - this.retryableErroredSubscriptionsAction$.next({ - type: 'remove', - messageId: message.id, - }); + this.errorPolicy.forget(message.id); const subscriber = this.messagesSubscribers.get(message.id); this.cleanUpSubscription(message.id); @@ -590,49 +501,7 @@ export class GraphqlService { case MessageType.GQL_ERROR: { const errors = message.payload?.errors ?? []; - - if (this.isAnySubscriptionErrorNonRetryable(errors)) { - this.logger.error( - `Non-retryable GraphQL subscription error: ${JSON.stringify( - message, - )}`, - ); - - // May have been retryable before - this.retryableErroredSubscriptionsAction$.next({ - type: 'remove', - messageId: message.id, - }); - - const subscriber = this.messagesSubscribers.get(message.id); - this.cleanUpSubscription(message.id); - subscriber?.error(errors); - - break; - } - - this.logger.warn( - `Retryable GraphQL subscription error: ${JSON.stringify(message)}`, - ); - - this.clearErroredSubscriptionsSuccessTimeout(); - - this.retryableErroredSubscriptionsAction$.next({ - type: 'add', - messageId: message.id, - errors, - }); - - if ( - this.retryableErroredSubscriptionsRetryCount < - RETRYABLE_ERRORED_SUBSCRIPTIONS_RETRY_LIMIT && - !this.retryableErroredSubscriptionsRetryTimeout - ) { - this.scheduleErroredSubscriptionsRetry(); - } else if (!this.retryableErroredSubscriptionsRetryTimeout) { - this.failErroredSubscriptions(); - } - + this.errorPolicy.recordError(message.id, errors); break; } @@ -799,95 +668,33 @@ export class GraphqlService { clearInterval(this.pingPongInterval); } - private clearErroredSubscriptionsTimeouts(): void { - clearTimeout(this.retryableErroredSubscriptionsRetryTimeout ?? undefined); - this.retryableErroredSubscriptionsRetryTimeout = null; - - this.clearErroredSubscriptionsSuccessTimeout(); - } - - private clearErroredSubscriptionsSuccessTimeout(): void { - clearTimeout(this.retryableErroredSubscriptionsSuccessTimeout ?? undefined); - this.retryableErroredSubscriptionsSuccessTimeout = null; - } - - private isAnySubscriptionErrorNonRetryable( - errors: QminderGraphQLError[], - ): boolean { - return errors - .filter((error) => error.errorType) - .some(({ errorType }) => - ( - NON_RETRYABLE_SUBSCRIPTION_ERROR_TYPES as unknown as string[] - ).includes(errorType), - ); - } - - private scheduleErroredSubscriptionsRetry(): void { - const retryCount = this.retryableErroredSubscriptionsRetryCount + 1; - const delay = calculateRandomizedExponentialBackoffTime(retryCount); - - this.logger.info( - `Retry (retry count: ${retryCount}) errored subscriptions in ${delay.toFixed( - 0, - )}ms`, + /** Re-issue a subscription that the error policy decided to retry. */ + private retrySubscription(messageId: string): void { + const subscription = this.subscriptions.find( + (subscription) => subscription.messageId === messageId, ); - this.retryableErroredSubscriptionsRetryTimeout = setTimeout(() => { - this.retryErroredSubscriptions(); - this.retryableErroredSubscriptionsRetryCount = retryCount; - this.retryableErroredSubscriptionsRetryTimeout = null; - }, delay); - } + if (!subscription) { + return; + } - private failErroredSubscriptions(): void { - this.logger.error( - `Errored subscriptions retry limit (${RETRYABLE_ERRORED_SUBSCRIPTIONS_RETRY_LIMIT}) reached. Giving up after ${this.retryableErroredSubscriptionsRetryCount} retries`, + this.sendRawMessage( + JSON.stringify({ + id: subscription.messageId, + type: MessageType.GQL_START, + payload: { query: subscription.query }, + }), ); - - this.retryableErroredSubscriptions$ - .pipe(take(1)) - .subscribe((subscriptions) => { - for (const [messageId, errors] of subscriptions) { - const subscriber = this.messagesSubscribers.get(messageId); - this.cleanUpSubscription(messageId); - - subscriber?.error(errors); - } - }); } - private retryErroredSubscriptions(): void { - this.retryableErroredSubscriptions$ - .pipe( - take(1), - map((subscriptions) => subscriptions.keys()), - ) - .subscribe((messageIds) => { - for (const messageId of messageIds) { - const subscription = this.subscriptions.find( - (subscription) => subscription.messageId === messageId, - ); - - if (!subscription) { - continue; - } - - this.sendRawMessage( - JSON.stringify({ - id: subscription.messageId, - type: MessageType.GQL_START, - payload: { query: subscription.query }, - }), - ); - } - - this.retryableErroredSubscriptionsSuccessTimeout = setTimeout(() => { - this.retryableErroredSubscriptionsAction$.next({ type: 'clear' }); - this.retryableErroredSubscriptionsRetryCount = 0; - this.retryableErroredSubscriptionsSuccessTimeout = null; - }, RETRYABLE_ERRORED_SUBSCRIPTIONS_SUCCEEDED_DELAY_MS); - }); + /** Surface a terminal error to the subscriber and stop tracking it. */ + private failSubscription( + messageId: string, + errors: QminderGraphQLError[], + ): void { + const subscriber = this.messagesSubscribers.get(messageId); + this.cleanUpSubscription(messageId); + subscriber?.error(errors); } /** diff --git a/packages/javascript-api/src/lib/services/graphql/retryable-subscription-error-policy.ts b/packages/javascript-api/src/lib/services/graphql/retryable-subscription-error-policy.ts new file mode 100644 index 00000000..178fabf8 --- /dev/null +++ b/packages/javascript-api/src/lib/services/graphql/retryable-subscription-error-policy.ts @@ -0,0 +1,190 @@ +import { + distinctUntilChanged, + map, + Observable, + scan, + shareReplay, + startWith, + Subject, + take, +} from 'rxjs'; + +import { Logger } from '../../util/logger/logger.js'; +import { calculateRandomizedExponentialBackoffTime } from '../../util/randomized-exponential-backoff/randomized-exponential-backoff.js'; +import { + isNonRetryableSubscriptionError, + QminderGraphQLError, +} from './graphql-error.js'; + +const RETRYABLE_ERRORED_SUBSCRIPTIONS_RETRY_LIMIT = 5; + +// To avoid haveAnyErrored returning 'false' temporarily if retrying errored +// subscriptions fails. +const RETRYABLE_ERRORED_SUBSCRIPTIONS_SUCCEEDED_DELAY_MS = 500; + +/** + * Transport-specific actions the policy delegates back to its owner: how to + * re-issue a subscription, and how to surface a terminal error to its subscriber. + */ +export interface RetryableSubscriptionActions { + retry(messageId: string): void; + fail(messageId: string, errors: QminderGraphQLError[]): void; +} + +/** + * Decides what to do when a GraphQL subscription errors, independently of the + * transport. Non-retryable errors are failed immediately; retryable ones are + * tracked and retried as a batch with an exponential backoff, and after + * {@link RETRYABLE_ERRORED_SUBSCRIPTIONS_RETRY_LIMIT} attempts they are failed. + * + * Exposes {@link haveAnyErrored$} so callers can observe whether any retryable + * subscription error is currently outstanding. + */ +export class RetryableSubscriptionErrorPolicy { + private readonly logger = new Logger('GraphQL'); + + private readonly action$ = new Subject< + | { + readonly type: 'add'; + readonly messageId: string; + readonly errors: QminderGraphQLError[]; + } + | { readonly type: 'remove'; readonly messageId: string } + | { readonly type: 'clear' } + >(); + + private readonly erroredSubscriptions$ = this.action$.pipe( + scan((subscriptions, action) => { + const result = new Map(subscriptions); + switch (action.type) { + case 'add': + return result.set(action.messageId, action.errors); + case 'remove': + result.delete(action.messageId); + return result; + case 'clear': + return new Map(); + } + }, new Map()), + startWith(new Map()), + shareReplay(1), + ); + + readonly haveAnyErrored$: Observable = + this.erroredSubscriptions$.pipe( + map(({ size }) => !!size), + distinctUntilChanged(), + ); + + private retryTimeout: ReturnType | null = null; + private successTimeout: ReturnType | null = null; + private retryCount = 0; + + constructor(private readonly actions: RetryableSubscriptionActions) { + this.erroredSubscriptions$.subscribe(); + } + + /** Record that a subscription errored, retrying or failing it per policy. */ + recordError(messageId: string, errors: QminderGraphQLError[]): void { + if (isNonRetryableSubscriptionError(errors)) { + this.logger.error( + `Non-retryable GraphQL subscription error: ${JSON.stringify(errors)}`, + ); + this.action$.next({ type: 'remove', messageId }); + this.actions.fail(messageId, errors); + return; + } + + this.logger.warn( + `Retryable GraphQL subscription error: ${JSON.stringify(errors)}`, + ); + + this.clearSuccessTimeout(); + this.action$.next({ type: 'add', messageId, errors }); + + if ( + this.retryCount < RETRYABLE_ERRORED_SUBSCRIPTIONS_RETRY_LIMIT && + !this.retryTimeout + ) { + this.scheduleRetry(); + } else if (!this.retryTimeout) { + this.failAll(); + } + } + + /** Stop tracking a subscription (it produced data again, or completed). */ + forget(messageId: string): void { + this.action$.next({ type: 'remove', messageId }); + } + + /** Clear all tracked state and pending timers (e.g. after a fresh connection). */ + reset(): void { + this.clearTimeouts(); + this.retryCount = 0; + this.action$.next({ type: 'clear' }); + } + + /** Stop pending timers without emitting state (teardown). */ + dispose(): void { + this.clearTimeouts(); + } + + private scheduleRetry(): void { + const retryCount = this.retryCount + 1; + const delay = calculateRandomizedExponentialBackoffTime(retryCount); + + this.logger.info( + `Retry (retry count: ${retryCount}) errored subscriptions in ${delay.toFixed( + 0, + )}ms`, + ); + + this.retryTimeout = setTimeout(() => { + this.retryAll(); + this.retryCount = retryCount; + this.retryTimeout = null; + }, delay); + } + + private retryAll(): void { + this.erroredSubscriptions$ + .pipe( + take(1), + map((subscriptions) => subscriptions.keys()), + ) + .subscribe((messageIds) => { + for (const messageId of messageIds) { + this.actions.retry(messageId); + } + + this.successTimeout = setTimeout(() => { + this.action$.next({ type: 'clear' }); + this.retryCount = 0; + this.successTimeout = null; + }, RETRYABLE_ERRORED_SUBSCRIPTIONS_SUCCEEDED_DELAY_MS); + }); + } + + private failAll(): void { + this.logger.error( + `Errored subscriptions retry limit (${RETRYABLE_ERRORED_SUBSCRIPTIONS_RETRY_LIMIT}) reached. Giving up after ${this.retryCount} retries`, + ); + + this.erroredSubscriptions$.pipe(take(1)).subscribe((subscriptions) => { + for (const [messageId, errors] of subscriptions) { + this.actions.fail(messageId, errors); + } + }); + } + + private clearTimeouts(): void { + clearTimeout(this.retryTimeout ?? undefined); + this.retryTimeout = null; + this.clearSuccessTimeout(); + } + + private clearSuccessTimeout(): void { + clearTimeout(this.successTimeout ?? undefined); + this.successTimeout = null; + } +}