diff --git a/openai-java-core/src/main/kotlin/com/openai/core/Futures.kt b/openai-java-core/src/main/kotlin/com/openai/core/Futures.kt new file mode 100644 index 000000000..f8768006f --- /dev/null +++ b/openai-java-core/src/main/kotlin/com/openai/core/Futures.kt @@ -0,0 +1,165 @@ +package com.openai.core + +import java.util.concurrent.CancellationException +import java.util.concurrent.CompletableFuture +import java.util.concurrent.atomic.AtomicReference + +@JvmSynthetic +internal fun CompletableFuture.thenApplyPropagatingCancellation( + fn: (T) -> U +): CompletableFuture { + val upstream = this + val result = CompletableFuture() + + result.whenComplete { _, error -> + if (result.isCancelled || error is CancellationException) { + upstream.cancel(false) + } + } + + upstream.whenComplete { value, error -> + if (result.isCancelled) { + return@whenComplete + } + if (error != null) { + result.completeExceptionally(error) + return@whenComplete + } + + try { + result.complete(fn(value)) + } catch (e: Throwable) { + result.completeExceptionally(e) + } + } + + return result +} + +@JvmSynthetic +internal fun CompletableFuture.handlePropagatingCancellation( + fn: (T?, Throwable?) -> U +): CompletableFuture { + val upstream = this + val result = CompletableFuture() + + result.whenComplete { _, error -> + if (result.isCancelled || error is CancellationException) { + upstream.cancel(false) + } + } + + upstream.whenComplete { value, error -> + if (result.isCancelled) { + return@whenComplete + } + + try { + result.complete(fn(value, error)) + } catch (e: Throwable) { + result.completeExceptionally(e) + } + } + + return result +} + +@JvmSynthetic +internal fun CompletableFuture.thenComposePropagatingCancellation( + fn: (T) -> CompletableFuture +): CompletableFuture { + val upstream = this + val innerFuture = AtomicReference?>() + val result = CompletableFuture() + + result.whenComplete { _, error -> + if (result.isCancelled || error is CancellationException) { + upstream.cancel(false) + innerFuture.get()?.cancel(false) + } + } + + upstream.whenComplete { value, error -> + if (result.isCancelled) { + return@whenComplete + } + if (error != null) { + result.completeExceptionally(error) + return@whenComplete + } + + val inner = + try { + fn(value) + } catch (e: Throwable) { + result.completeExceptionally(e) + return@whenComplete + } + innerFuture.set(inner) + + if (result.isCancelled) { + inner.cancel(false) + return@whenComplete + } + + inner.whenComplete { innerValue, innerError -> + if (innerError != null) { + result.completeExceptionally(innerError) + } else { + result.complete(innerValue) + } + } + } + + return result +} + +@JvmSynthetic +internal fun CompletableFuture.thenComposeAsyncPropagatingCancellation( + fn: (T) -> CompletableFuture +): CompletableFuture { + val upstream = this + val innerFuture = AtomicReference?>() + val result = CompletableFuture() + + result.whenComplete { _, error -> + if (result.isCancelled || error is CancellationException) { + upstream.cancel(false) + innerFuture.get()?.cancel(false) + } + } + + upstream.whenCompleteAsync { value, error -> + if (result.isCancelled) { + return@whenCompleteAsync + } + if (error != null) { + result.completeExceptionally(error) + return@whenCompleteAsync + } + + val inner = + try { + fn(value) + } catch (e: Throwable) { + result.completeExceptionally(e) + return@whenCompleteAsync + } + innerFuture.set(inner) + + if (result.isCancelled) { + inner.cancel(false) + return@whenCompleteAsync + } + + inner.whenComplete { innerValue, innerError -> + if (innerError != null) { + result.completeExceptionally(innerError) + } else { + result.complete(innerValue) + } + } + } + + return result +} diff --git a/openai-java-core/src/main/kotlin/com/openai/core/http/LoggingHttpClient.kt b/openai-java-core/src/main/kotlin/com/openai/core/http/LoggingHttpClient.kt index 464a6f690..6ee8a51e0 100644 --- a/openai-java-core/src/main/kotlin/com/openai/core/http/LoggingHttpClient.kt +++ b/openai-java-core/src/main/kotlin/com/openai/core/http/LoggingHttpClient.kt @@ -5,6 +5,7 @@ package com.openai.core.http import com.openai.core.LogLevel import com.openai.core.RequestOptions import com.openai.core.checkRequired +import com.openai.core.handlePropagatingCancellation import com.openai.core.toImmutable import java.io.ByteArrayOutputStream import java.io.InputStream @@ -80,13 +81,13 @@ private constructor( logFailure(e, Duration.between(before, OffsetDateTime.now(clock))) throw e } - return future.handle { response, error -> + return future.handlePropagatingCancellation { response, error -> val took = Duration.between(before, OffsetDateTime.now(clock)) if (error != null) { logFailure(unwrapCompletionException(error), took) throw error } - logResponse(response, took) + logResponse(response!!, took) } } diff --git a/openai-java-core/src/main/kotlin/com/openai/core/http/RetryingHttpClient.kt b/openai-java-core/src/main/kotlin/com/openai/core/http/RetryingHttpClient.kt index b9d7eef8f..82d0688e0 100644 --- a/openai-java-core/src/main/kotlin/com/openai/core/http/RetryingHttpClient.kt +++ b/openai-java-core/src/main/kotlin/com/openai/core/http/RetryingHttpClient.kt @@ -6,6 +6,8 @@ import com.openai.core.DefaultSleeper import com.openai.core.RequestOptions import com.openai.core.Sleeper import com.openai.core.checkRequired +import com.openai.core.handlePropagatingCancellation +import com.openai.core.thenComposePropagatingCancellation import com.openai.errors.OpenAIIoException import com.openai.errors.OpenAIRetryableException import java.io.IOException @@ -19,7 +21,6 @@ import java.util.UUID import java.util.concurrent.CompletableFuture import java.util.concurrent.ThreadLocalRandom import java.util.concurrent.TimeUnit -import java.util.function.Function import kotlin.math.min import kotlin.math.pow @@ -98,35 +99,30 @@ private constructor( } return responseFuture - .handleAsync( - fun( - response: HttpResponse?, - throwable: Throwable?, - ): CompletableFuture { - if (response != null) { - if (++retries > maxRetries || !shouldRetry(response)) { - return CompletableFuture.completedFuture(response) - } - } else { - if (++retries > maxRetries || !shouldRetry(throwable!!)) { - val failedFuture = CompletableFuture() - failedFuture.completeExceptionally(throwable) - return failedFuture - } + .handlePropagatingCancellation { response, throwable -> + if (response != null) { + if (++retries > maxRetries || !shouldRetry(response)) { + return@handlePropagatingCancellation CompletableFuture.completedFuture( + response + ) } - - val backoffDuration = getRetryBackoffDuration(retries, response) - // All responses must be closed, so close the failed one before retrying. - response?.close() - return sleeper.sleepAsync(backoffDuration).thenCompose { - executeWithRetries(requestWithRetryCount, requestOptions) + } else { + if (++retries > maxRetries || !shouldRetry(throwable!!)) { + val failedFuture = CompletableFuture() + failedFuture.completeExceptionally(throwable) + return@handlePropagatingCancellation failedFuture } } - ) { - // Run in the same thread. - it.run() + + val backoffDuration = getRetryBackoffDuration(retries, response) + // All responses must be closed, so close the failed one before retrying. + response?.close() + sleeper.sleepAsync(backoffDuration).thenComposePropagatingCancellation { + _: Void? -> + executeWithRetries(requestWithRetryCount, requestOptions) + } } - .thenCompose(Function.identity()) + .thenComposePropagatingCancellation { it } } return executeWithRetries(modifiedRequest, requestOptions) diff --git a/openai-java-core/src/main/kotlin/com/openai/services/async/BatchServiceAsyncImpl.kt b/openai-java-core/src/main/kotlin/com/openai/services/async/BatchServiceAsyncImpl.kt index b9725411d..ce7179d23 100644 --- a/openai-java-core/src/main/kotlin/com/openai/services/async/BatchServiceAsyncImpl.kt +++ b/openai-java-core/src/main/kotlin/com/openai/services/async/BatchServiceAsyncImpl.kt @@ -17,6 +17,8 @@ import com.openai.core.http.HttpResponseFor import com.openai.core.http.json import com.openai.core.http.parseable import com.openai.core.prepareAsync +import com.openai.core.thenApplyPropagatingCancellation +import com.openai.core.thenComposeAsyncPropagatingCancellation import com.openai.models.batches.Batch import com.openai.models.batches.BatchCancelParams import com.openai.models.batches.BatchCreateParams @@ -46,28 +48,36 @@ class BatchServiceAsyncImpl internal constructor(private val clientOptions: Clie requestOptions: RequestOptions, ): CompletableFuture = // post /batches - withRawResponse().create(params, requestOptions).thenApply { it.parse() } + withRawResponse().create(params, requestOptions).thenApplyPropagatingCancellation { + it.parse() + } override fun retrieve( params: BatchRetrieveParams, requestOptions: RequestOptions, ): CompletableFuture = // get /batches/{batch_id} - withRawResponse().retrieve(params, requestOptions).thenApply { it.parse() } + withRawResponse().retrieve(params, requestOptions).thenApplyPropagatingCancellation { + it.parse() + } override fun list( params: BatchListParams, requestOptions: RequestOptions, ): CompletableFuture = // get /batches - withRawResponse().list(params, requestOptions).thenApply { it.parse() } + withRawResponse().list(params, requestOptions).thenApplyPropagatingCancellation { + it.parse() + } override fun cancel( params: BatchCancelParams, requestOptions: RequestOptions, ): CompletableFuture = // post /batches/{batch_id}/cancel - withRawResponse().cancel(params, requestOptions).thenApply { it.parse() } + withRawResponse().cancel(params, requestOptions).thenApplyPropagatingCancellation { + it.parse() + } class WithRawResponseImpl internal constructor(private val clientOptions: ClientOptions) : BatchServiceAsync.WithRawResponse { @@ -102,8 +112,10 @@ class BatchServiceAsyncImpl internal constructor(private val clientOptions: Clie ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response .use { createHandler.handle(it) } @@ -138,8 +150,10 @@ class BatchServiceAsyncImpl internal constructor(private val clientOptions: Clie ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response .use { retrieveHandler.handle(it) } @@ -172,8 +186,10 @@ class BatchServiceAsyncImpl internal constructor(private val clientOptions: Clie ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response .use { listHandler.handle(it) } @@ -217,8 +233,10 @@ class BatchServiceAsyncImpl internal constructor(private val clientOptions: Clie ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response .use { cancelHandler.handle(it) } diff --git a/openai-java-core/src/main/kotlin/com/openai/services/async/CompletionServiceAsyncImpl.kt b/openai-java-core/src/main/kotlin/com/openai/services/async/CompletionServiceAsyncImpl.kt index 6b3f3bd66..e9b2a7f10 100644 --- a/openai-java-core/src/main/kotlin/com/openai/services/async/CompletionServiceAsyncImpl.kt +++ b/openai-java-core/src/main/kotlin/com/openai/services/async/CompletionServiceAsyncImpl.kt @@ -23,6 +23,8 @@ import com.openai.core.http.map import com.openai.core.http.parseable import com.openai.core.http.toAsync import com.openai.core.prepareAsync +import com.openai.core.thenApplyPropagatingCancellation +import com.openai.core.thenComposeAsyncPropagatingCancellation import com.openai.models.completions.Completion import com.openai.models.completions.CompletionCreateParams import java.util.concurrent.CompletableFuture @@ -49,7 +51,9 @@ class CompletionServiceAsyncImpl internal constructor(private val clientOptions: requestOptions: RequestOptions, ): CompletableFuture = // post /completions - withRawResponse().create(params, requestOptions).thenApply { it.parse() } + withRawResponse().create(params, requestOptions).thenApplyPropagatingCancellation { + it.parse() + } override fun createStreaming( params: CompletionCreateParams, @@ -58,7 +62,7 @@ class CompletionServiceAsyncImpl internal constructor(private val clientOptions: // post /completions withRawResponse() .createStreaming(params, requestOptions) - .thenApply { it.parse() } + .thenApplyPropagatingCancellation { it.parse() } .toAsync(clientOptions.streamHandlerExecutor) class WithRawResponseImpl internal constructor(private val clientOptions: ClientOptions) : @@ -95,8 +99,10 @@ class CompletionServiceAsyncImpl internal constructor(private val clientOptions: ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response .use { createHandler.handle(it) } @@ -140,8 +146,10 @@ class CompletionServiceAsyncImpl internal constructor(private val clientOptions: ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response .let { createStreamingHandler.handle(it) } diff --git a/openai-java-core/src/main/kotlin/com/openai/services/async/ContainerServiceAsyncImpl.kt b/openai-java-core/src/main/kotlin/com/openai/services/async/ContainerServiceAsyncImpl.kt index 38e95900c..47116289c 100644 --- a/openai-java-core/src/main/kotlin/com/openai/services/async/ContainerServiceAsyncImpl.kt +++ b/openai-java-core/src/main/kotlin/com/openai/services/async/ContainerServiceAsyncImpl.kt @@ -18,6 +18,8 @@ import com.openai.core.http.HttpResponseFor import com.openai.core.http.json import com.openai.core.http.parseable import com.openai.core.prepareAsync +import com.openai.core.thenApplyPropagatingCancellation +import com.openai.core.thenComposeAsyncPropagatingCancellation import com.openai.models.containers.ContainerCreateParams import com.openai.models.containers.ContainerCreateResponse import com.openai.models.containers.ContainerDeleteParams @@ -53,21 +55,27 @@ class ContainerServiceAsyncImpl internal constructor(private val clientOptions: requestOptions: RequestOptions, ): CompletableFuture = // post /containers - withRawResponse().create(params, requestOptions).thenApply { it.parse() } + withRawResponse().create(params, requestOptions).thenApplyPropagatingCancellation { + it.parse() + } override fun retrieve( params: ContainerRetrieveParams, requestOptions: RequestOptions, ): CompletableFuture = // get /containers/{container_id} - withRawResponse().retrieve(params, requestOptions).thenApply { it.parse() } + withRawResponse().retrieve(params, requestOptions).thenApplyPropagatingCancellation { + it.parse() + } override fun list( params: ContainerListParams, requestOptions: RequestOptions, ): CompletableFuture = // get /containers - withRawResponse().list(params, requestOptions).thenApply { it.parse() } + withRawResponse().list(params, requestOptions).thenApplyPropagatingCancellation { + it.parse() + } override fun delete( params: ContainerDeleteParams, @@ -116,8 +124,10 @@ class ContainerServiceAsyncImpl internal constructor(private val clientOptions: ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response .use { createHandler.handle(it) } @@ -153,8 +163,10 @@ class ContainerServiceAsyncImpl internal constructor(private val clientOptions: ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response .use { retrieveHandler.handle(it) } @@ -187,8 +199,10 @@ class ContainerServiceAsyncImpl internal constructor(private val clientOptions: ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response .use { listHandler.handle(it) } @@ -232,8 +246,10 @@ class ContainerServiceAsyncImpl internal constructor(private val clientOptions: ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response.use { deleteHandler.handle(it) } } diff --git a/openai-java-core/src/main/kotlin/com/openai/services/async/ConversationServiceAsyncImpl.kt b/openai-java-core/src/main/kotlin/com/openai/services/async/ConversationServiceAsyncImpl.kt index 44377ff5d..d8aa0df6c 100644 --- a/openai-java-core/src/main/kotlin/com/openai/services/async/ConversationServiceAsyncImpl.kt +++ b/openai-java-core/src/main/kotlin/com/openai/services/async/ConversationServiceAsyncImpl.kt @@ -17,6 +17,8 @@ import com.openai.core.http.HttpResponseFor import com.openai.core.http.json import com.openai.core.http.parseable import com.openai.core.prepareAsync +import com.openai.core.thenApplyPropagatingCancellation +import com.openai.core.thenComposeAsyncPropagatingCancellation import com.openai.models.conversations.Conversation import com.openai.models.conversations.ConversationCreateParams import com.openai.models.conversations.ConversationDeleteParams @@ -52,28 +54,36 @@ class ConversationServiceAsyncImpl internal constructor(private val clientOption requestOptions: RequestOptions, ): CompletableFuture = // post /conversations - withRawResponse().create(params, requestOptions).thenApply { it.parse() } + withRawResponse().create(params, requestOptions).thenApplyPropagatingCancellation { + it.parse() + } override fun retrieve( params: ConversationRetrieveParams, requestOptions: RequestOptions, ): CompletableFuture = // get /conversations/{conversation_id} - withRawResponse().retrieve(params, requestOptions).thenApply { it.parse() } + withRawResponse().retrieve(params, requestOptions).thenApplyPropagatingCancellation { + it.parse() + } override fun update( params: ConversationUpdateParams, requestOptions: RequestOptions, ): CompletableFuture = // post /conversations/{conversation_id} - withRawResponse().update(params, requestOptions).thenApply { it.parse() } + withRawResponse().update(params, requestOptions).thenApplyPropagatingCancellation { + it.parse() + } override fun delete( params: ConversationDeleteParams, requestOptions: RequestOptions, ): CompletableFuture = // delete /conversations/{conversation_id} - withRawResponse().delete(params, requestOptions).thenApply { it.parse() } + withRawResponse().delete(params, requestOptions).thenApplyPropagatingCancellation { + it.parse() + } class WithRawResponseImpl internal constructor(private val clientOptions: ClientOptions) : ConversationServiceAsync.WithRawResponse { @@ -116,8 +126,10 @@ class ConversationServiceAsyncImpl internal constructor(private val clientOption ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response .use { createHandler.handle(it) } @@ -153,8 +165,10 @@ class ConversationServiceAsyncImpl internal constructor(private val clientOption ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response .use { retrieveHandler.handle(it) } @@ -191,8 +205,10 @@ class ConversationServiceAsyncImpl internal constructor(private val clientOption ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response .use { updateHandler.handle(it) } @@ -229,8 +245,10 @@ class ConversationServiceAsyncImpl internal constructor(private val clientOption ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response .use { deleteHandler.handle(it) } diff --git a/openai-java-core/src/main/kotlin/com/openai/services/async/EmbeddingServiceAsyncImpl.kt b/openai-java-core/src/main/kotlin/com/openai/services/async/EmbeddingServiceAsyncImpl.kt index daeb9988a..fce2555a2 100644 --- a/openai-java-core/src/main/kotlin/com/openai/services/async/EmbeddingServiceAsyncImpl.kt +++ b/openai-java-core/src/main/kotlin/com/openai/services/async/EmbeddingServiceAsyncImpl.kt @@ -16,6 +16,8 @@ import com.openai.core.http.HttpResponseFor import com.openai.core.http.json import com.openai.core.http.parseable import com.openai.core.prepareAsync +import com.openai.core.thenApplyPropagatingCancellation +import com.openai.core.thenComposeAsyncPropagatingCancellation import com.openai.models.embeddings.CreateEmbeddingResponse import com.openai.models.embeddings.EmbeddingCreateParams import java.util.concurrent.CompletableFuture @@ -42,7 +44,9 @@ class EmbeddingServiceAsyncImpl internal constructor(private val clientOptions: requestOptions: RequestOptions, ): CompletableFuture = // post /embeddings - withRawResponse().create(params, requestOptions).thenApply { it.parse() } + withRawResponse().create(params, requestOptions).thenApplyPropagatingCancellation { + it.parse() + } class WithRawResponseImpl internal constructor(private val clientOptions: ClientOptions) : EmbeddingServiceAsync.WithRawResponse { @@ -78,8 +82,10 @@ class EmbeddingServiceAsyncImpl internal constructor(private val clientOptions: ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response .use { createHandler.handle(it) } diff --git a/openai-java-core/src/main/kotlin/com/openai/services/async/EvalServiceAsyncImpl.kt b/openai-java-core/src/main/kotlin/com/openai/services/async/EvalServiceAsyncImpl.kt index 2daa8405e..a644062b3 100644 --- a/openai-java-core/src/main/kotlin/com/openai/services/async/EvalServiceAsyncImpl.kt +++ b/openai-java-core/src/main/kotlin/com/openai/services/async/EvalServiceAsyncImpl.kt @@ -17,6 +17,8 @@ import com.openai.core.http.HttpResponseFor import com.openai.core.http.json import com.openai.core.http.parseable import com.openai.core.prepareAsync +import com.openai.core.thenApplyPropagatingCancellation +import com.openai.core.thenComposeAsyncPropagatingCancellation import com.openai.models.evals.EvalCreateParams import com.openai.models.evals.EvalCreateResponse import com.openai.models.evals.EvalDeleteParams @@ -57,35 +59,45 @@ class EvalServiceAsyncImpl internal constructor(private val clientOptions: Clien requestOptions: RequestOptions, ): CompletableFuture = // post /evals - withRawResponse().create(params, requestOptions).thenApply { it.parse() } + withRawResponse().create(params, requestOptions).thenApplyPropagatingCancellation { + it.parse() + } override fun retrieve( params: EvalRetrieveParams, requestOptions: RequestOptions, ): CompletableFuture = // get /evals/{eval_id} - withRawResponse().retrieve(params, requestOptions).thenApply { it.parse() } + withRawResponse().retrieve(params, requestOptions).thenApplyPropagatingCancellation { + it.parse() + } override fun update( params: EvalUpdateParams, requestOptions: RequestOptions, ): CompletableFuture = // post /evals/{eval_id} - withRawResponse().update(params, requestOptions).thenApply { it.parse() } + withRawResponse().update(params, requestOptions).thenApplyPropagatingCancellation { + it.parse() + } override fun list( params: EvalListParams, requestOptions: RequestOptions, ): CompletableFuture = // get /evals - withRawResponse().list(params, requestOptions).thenApply { it.parse() } + withRawResponse().list(params, requestOptions).thenApplyPropagatingCancellation { + it.parse() + } override fun delete( params: EvalDeleteParams, requestOptions: RequestOptions, ): CompletableFuture = // delete /evals/{eval_id} - withRawResponse().delete(params, requestOptions).thenApply { it.parse() } + withRawResponse().delete(params, requestOptions).thenApplyPropagatingCancellation { + it.parse() + } class WithRawResponseImpl internal constructor(private val clientOptions: ClientOptions) : EvalServiceAsync.WithRawResponse { @@ -128,8 +140,10 @@ class EvalServiceAsyncImpl internal constructor(private val clientOptions: Clien ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response .use { createHandler.handle(it) } @@ -165,8 +179,10 @@ class EvalServiceAsyncImpl internal constructor(private val clientOptions: Clien ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response .use { retrieveHandler.handle(it) } @@ -203,8 +219,10 @@ class EvalServiceAsyncImpl internal constructor(private val clientOptions: Clien ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response .use { updateHandler.handle(it) } @@ -237,8 +255,10 @@ class EvalServiceAsyncImpl internal constructor(private val clientOptions: Clien ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response .use { listHandler.handle(it) } @@ -283,8 +303,10 @@ class EvalServiceAsyncImpl internal constructor(private val clientOptions: Clien ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response .use { deleteHandler.handle(it) } diff --git a/openai-java-core/src/main/kotlin/com/openai/services/async/FileServiceAsyncImpl.kt b/openai-java-core/src/main/kotlin/com/openai/services/async/FileServiceAsyncImpl.kt index 254acc07a..3b8d67dfa 100644 --- a/openai-java-core/src/main/kotlin/com/openai/services/async/FileServiceAsyncImpl.kt +++ b/openai-java-core/src/main/kotlin/com/openai/services/async/FileServiceAsyncImpl.kt @@ -18,6 +18,8 @@ import com.openai.core.http.json import com.openai.core.http.multipartFormData import com.openai.core.http.parseable import com.openai.core.prepareAsync +import com.openai.core.thenApplyPropagatingCancellation +import com.openai.core.thenComposeAsyncPropagatingCancellation import com.openai.models.files.FileContentParams import com.openai.models.files.FileCreateParams import com.openai.models.files.FileDeleteParams @@ -52,28 +54,36 @@ class FileServiceAsyncImpl internal constructor(private val clientOptions: Clien requestOptions: RequestOptions, ): CompletableFuture = // post /files - withRawResponse().create(params, requestOptions).thenApply { it.parse() } + withRawResponse().create(params, requestOptions).thenApplyPropagatingCancellation { + it.parse() + } override fun retrieve( params: FileRetrieveParams, requestOptions: RequestOptions, ): CompletableFuture = // get /files/{file_id} - withRawResponse().retrieve(params, requestOptions).thenApply { it.parse() } + withRawResponse().retrieve(params, requestOptions).thenApplyPropagatingCancellation { + it.parse() + } override fun list( params: FileListParams, requestOptions: RequestOptions, ): CompletableFuture = // get /files - withRawResponse().list(params, requestOptions).thenApply { it.parse() } + withRawResponse().list(params, requestOptions).thenApplyPropagatingCancellation { + it.parse() + } override fun delete( params: FileDeleteParams, requestOptions: RequestOptions, ): CompletableFuture = // delete /files/{file_id} - withRawResponse().delete(params, requestOptions).thenApply { it.parse() } + withRawResponse().delete(params, requestOptions).thenApplyPropagatingCancellation { + it.parse() + } override fun content( params: FileContentParams, @@ -116,8 +126,10 @@ class FileServiceAsyncImpl internal constructor(private val clientOptions: Clien ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response .use { createHandler.handle(it) } @@ -153,8 +165,10 @@ class FileServiceAsyncImpl internal constructor(private val clientOptions: Clien ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response .use { retrieveHandler.handle(it) } @@ -187,8 +201,10 @@ class FileServiceAsyncImpl internal constructor(private val clientOptions: Clien ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response .use { listHandler.handle(it) } @@ -233,8 +249,10 @@ class FileServiceAsyncImpl internal constructor(private val clientOptions: Clien ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response .use { deleteHandler.handle(it) } @@ -268,8 +286,10 @@ class FileServiceAsyncImpl internal constructor(private val clientOptions: Clien ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> errorHandler.handle(response) } + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response) } } } } diff --git a/openai-java-core/src/main/kotlin/com/openai/services/async/ImageServiceAsyncImpl.kt b/openai-java-core/src/main/kotlin/com/openai/services/async/ImageServiceAsyncImpl.kt index e57b91809..f16a6bfc4 100644 --- a/openai-java-core/src/main/kotlin/com/openai/services/async/ImageServiceAsyncImpl.kt +++ b/openai-java-core/src/main/kotlin/com/openai/services/async/ImageServiceAsyncImpl.kt @@ -25,6 +25,8 @@ import com.openai.core.http.multipartFormData import com.openai.core.http.parseable import com.openai.core.http.toAsync import com.openai.core.prepareAsync +import com.openai.core.thenApplyPropagatingCancellation +import com.openai.core.thenComposeAsyncPropagatingCancellation import com.openai.models.images.ImageCreateVariationParams import com.openai.models.images.ImageEditParams import com.openai.models.images.ImageEditStreamEvent @@ -52,14 +54,18 @@ class ImageServiceAsyncImpl internal constructor(private val clientOptions: Clie requestOptions: RequestOptions, ): CompletableFuture = // post /images/variations - withRawResponse().createVariation(params, requestOptions).thenApply { it.parse() } + withRawResponse().createVariation(params, requestOptions).thenApplyPropagatingCancellation { + it.parse() + } override fun edit( params: ImageEditParams, requestOptions: RequestOptions, ): CompletableFuture = // post /images/edits - withRawResponse().edit(params, requestOptions).thenApply { it.parse() } + withRawResponse().edit(params, requestOptions).thenApplyPropagatingCancellation { + it.parse() + } override fun editStreaming( params: ImageEditParams, @@ -68,7 +74,7 @@ class ImageServiceAsyncImpl internal constructor(private val clientOptions: Clie // post /images/edits withRawResponse() .editStreaming(params, requestOptions) - .thenApply { it.parse() } + .thenApplyPropagatingCancellation { it.parse() } .toAsync(clientOptions.streamHandlerExecutor) override fun generate( @@ -76,7 +82,9 @@ class ImageServiceAsyncImpl internal constructor(private val clientOptions: Clie requestOptions: RequestOptions, ): CompletableFuture = // post /images/generations - withRawResponse().generate(params, requestOptions).thenApply { it.parse() } + withRawResponse().generate(params, requestOptions).thenApplyPropagatingCancellation { + it.parse() + } override fun generateStreaming( params: ImageGenerateParams, @@ -85,7 +93,7 @@ class ImageServiceAsyncImpl internal constructor(private val clientOptions: Clie // post /images/generations withRawResponse() .generateStreaming(params, requestOptions) - .thenApply { it.parse() } + .thenApplyPropagatingCancellation { it.parse() } .toAsync(clientOptions.streamHandlerExecutor) class WithRawResponseImpl internal constructor(private val clientOptions: ClientOptions) : @@ -122,8 +130,10 @@ class ImageServiceAsyncImpl internal constructor(private val clientOptions: Clie ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response .use { createVariationHandler.handle(it) } @@ -157,8 +167,10 @@ class ImageServiceAsyncImpl internal constructor(private val clientOptions: Clie ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response .use { editHandler.handle(it) } @@ -198,8 +210,10 @@ class ImageServiceAsyncImpl internal constructor(private val clientOptions: Clie ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response .let { editStreamingHandler.handle(it) } @@ -235,8 +249,10 @@ class ImageServiceAsyncImpl internal constructor(private val clientOptions: Clie ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response .use { generateHandler.handle(it) } @@ -280,8 +296,10 @@ class ImageServiceAsyncImpl internal constructor(private val clientOptions: Clie ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response .let { generateStreamingHandler.handle(it) } diff --git a/openai-java-core/src/main/kotlin/com/openai/services/async/ModelServiceAsyncImpl.kt b/openai-java-core/src/main/kotlin/com/openai/services/async/ModelServiceAsyncImpl.kt index 7e572d6b7..bc6b0de5f 100644 --- a/openai-java-core/src/main/kotlin/com/openai/services/async/ModelServiceAsyncImpl.kt +++ b/openai-java-core/src/main/kotlin/com/openai/services/async/ModelServiceAsyncImpl.kt @@ -17,6 +17,8 @@ import com.openai.core.http.HttpResponseFor import com.openai.core.http.json import com.openai.core.http.parseable import com.openai.core.prepareAsync +import com.openai.core.thenApplyPropagatingCancellation +import com.openai.core.thenComposeAsyncPropagatingCancellation import com.openai.models.models.Model import com.openai.models.models.ModelDeleteParams import com.openai.models.models.ModelDeleted @@ -46,21 +48,27 @@ class ModelServiceAsyncImpl internal constructor(private val clientOptions: Clie requestOptions: RequestOptions, ): CompletableFuture = // get /models/{model} - withRawResponse().retrieve(params, requestOptions).thenApply { it.parse() } + withRawResponse().retrieve(params, requestOptions).thenApplyPropagatingCancellation { + it.parse() + } override fun list( params: ModelListParams, requestOptions: RequestOptions, ): CompletableFuture = // get /models - withRawResponse().list(params, requestOptions).thenApply { it.parse() } + withRawResponse().list(params, requestOptions).thenApplyPropagatingCancellation { + it.parse() + } override fun delete( params: ModelDeleteParams, requestOptions: RequestOptions, ): CompletableFuture = // delete /models/{model} - withRawResponse().delete(params, requestOptions).thenApply { it.parse() } + withRawResponse().delete(params, requestOptions).thenApplyPropagatingCancellation { + it.parse() + } class WithRawResponseImpl internal constructor(private val clientOptions: ClientOptions) : ModelServiceAsync.WithRawResponse { @@ -97,8 +105,10 @@ class ModelServiceAsyncImpl internal constructor(private val clientOptions: Clie ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response .use { retrieveHandler.handle(it) } @@ -131,8 +141,10 @@ class ModelServiceAsyncImpl internal constructor(private val clientOptions: Clie ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response .use { listHandler.handle(it) } @@ -177,8 +189,10 @@ class ModelServiceAsyncImpl internal constructor(private val clientOptions: Clie ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response .use { deleteHandler.handle(it) } diff --git a/openai-java-core/src/main/kotlin/com/openai/services/async/ModerationServiceAsyncImpl.kt b/openai-java-core/src/main/kotlin/com/openai/services/async/ModerationServiceAsyncImpl.kt index 7a18a9122..8249ea0bd 100644 --- a/openai-java-core/src/main/kotlin/com/openai/services/async/ModerationServiceAsyncImpl.kt +++ b/openai-java-core/src/main/kotlin/com/openai/services/async/ModerationServiceAsyncImpl.kt @@ -16,6 +16,8 @@ import com.openai.core.http.HttpResponseFor import com.openai.core.http.json import com.openai.core.http.parseable import com.openai.core.prepareAsync +import com.openai.core.thenApplyPropagatingCancellation +import com.openai.core.thenComposeAsyncPropagatingCancellation import com.openai.models.moderations.ModerationCreateParams import com.openai.models.moderations.ModerationCreateResponse import java.util.concurrent.CompletableFuture @@ -39,7 +41,9 @@ class ModerationServiceAsyncImpl internal constructor(private val clientOptions: requestOptions: RequestOptions, ): CompletableFuture = // post /moderations - withRawResponse().create(params, requestOptions).thenApply { it.parse() } + withRawResponse().create(params, requestOptions).thenApplyPropagatingCancellation { + it.parse() + } class WithRawResponseImpl internal constructor(private val clientOptions: ClientOptions) : ModerationServiceAsync.WithRawResponse { @@ -75,8 +79,10 @@ class ModerationServiceAsyncImpl internal constructor(private val clientOptions: ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response .use { createHandler.handle(it) } diff --git a/openai-java-core/src/main/kotlin/com/openai/services/async/ResponseServiceAsync.kt b/openai-java-core/src/main/kotlin/com/openai/services/async/ResponseServiceAsync.kt index 9e07ba1fa..a1885aa9d 100644 --- a/openai-java-core/src/main/kotlin/com/openai/services/async/ResponseServiceAsync.kt +++ b/openai-java-core/src/main/kotlin/com/openai/services/async/ResponseServiceAsync.kt @@ -9,6 +9,7 @@ import com.openai.core.http.AsyncStreamResponse import com.openai.core.http.HttpResponse import com.openai.core.http.HttpResponseFor import com.openai.core.http.StreamResponse +import com.openai.core.thenApplyPropagatingCancellation import com.openai.models.responses.CompactedResponse import com.openai.models.responses.Response import com.openai.models.responses.ResponseCancelParams @@ -90,7 +91,7 @@ interface ResponseServiceAsync { params: StructuredResponseCreateParams, requestOptions: RequestOptions = RequestOptions.none(), ): CompletableFuture> = - create(params.rawParams, requestOptions).thenApply { + create(params.rawParams, requestOptions).thenApplyPropagatingCancellation { StructuredResponse(params.responseType, it) } diff --git a/openai-java-core/src/main/kotlin/com/openai/services/async/ResponseServiceAsyncImpl.kt b/openai-java-core/src/main/kotlin/com/openai/services/async/ResponseServiceAsyncImpl.kt index 8a38b0adb..76475933c 100644 --- a/openai-java-core/src/main/kotlin/com/openai/services/async/ResponseServiceAsyncImpl.kt +++ b/openai-java-core/src/main/kotlin/com/openai/services/async/ResponseServiceAsyncImpl.kt @@ -25,6 +25,8 @@ import com.openai.core.http.map import com.openai.core.http.parseable import com.openai.core.http.toAsync import com.openai.core.prepareAsync +import com.openai.core.thenApplyPropagatingCancellation +import com.openai.core.thenComposeAsyncPropagatingCancellation import com.openai.models.responses.CompactedResponse import com.openai.models.responses.Response import com.openai.models.responses.ResponseCancelParams @@ -70,7 +72,9 @@ class ResponseServiceAsyncImpl internal constructor(private val clientOptions: C requestOptions: RequestOptions, ): CompletableFuture = // post /responses - withRawResponse().create(params, requestOptions).thenApply { it.parse() } + withRawResponse().create(params, requestOptions).thenApplyPropagatingCancellation { + it.parse() + } override fun createStreaming( params: ResponseCreateParams, @@ -79,7 +83,7 @@ class ResponseServiceAsyncImpl internal constructor(private val clientOptions: C // post /responses withRawResponse() .createStreaming(params, requestOptions) - .thenApply { it.parse() } + .thenApplyPropagatingCancellation { it.parse() } .toAsync(clientOptions.streamHandlerExecutor) override fun retrieve( @@ -87,7 +91,9 @@ class ResponseServiceAsyncImpl internal constructor(private val clientOptions: C requestOptions: RequestOptions, ): CompletableFuture = // get /responses/{response_id} - withRawResponse().retrieve(params, requestOptions).thenApply { it.parse() } + withRawResponse().retrieve(params, requestOptions).thenApplyPropagatingCancellation { + it.parse() + } override fun retrieveStreaming( params: ResponseRetrieveParams, @@ -96,7 +102,7 @@ class ResponseServiceAsyncImpl internal constructor(private val clientOptions: C // get /responses/{response_id} withRawResponse() .retrieveStreaming(params, requestOptions) - .thenApply { it.parse() } + .thenApplyPropagatingCancellation { it.parse() } .toAsync(clientOptions.streamHandlerExecutor) override fun delete( @@ -111,14 +117,18 @@ class ResponseServiceAsyncImpl internal constructor(private val clientOptions: C requestOptions: RequestOptions, ): CompletableFuture = // post /responses/{response_id}/cancel - withRawResponse().cancel(params, requestOptions).thenApply { it.parse() } + withRawResponse().cancel(params, requestOptions).thenApplyPropagatingCancellation { + it.parse() + } override fun compact( params: ResponseCompactParams, requestOptions: RequestOptions, ): CompletableFuture = // post /responses/compact - withRawResponse().compact(params, requestOptions).thenApply { it.parse() } + withRawResponse().compact(params, requestOptions).thenApplyPropagatingCancellation { + it.parse() + } class WithRawResponseImpl internal constructor(private val clientOptions: ClientOptions) : ResponseServiceAsync.WithRawResponse { @@ -166,8 +176,10 @@ class ResponseServiceAsyncImpl internal constructor(private val clientOptions: C ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response .use { createHandler.handle(it) } @@ -211,8 +223,10 @@ class ResponseServiceAsyncImpl internal constructor(private val clientOptions: C ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response .let { createStreamingHandler.handle(it) } @@ -250,8 +264,10 @@ class ResponseServiceAsyncImpl internal constructor(private val clientOptions: C ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response .use { retrieveHandler.handle(it) } @@ -289,8 +305,10 @@ class ResponseServiceAsyncImpl internal constructor(private val clientOptions: C ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response .let { retrieveStreamingHandler.handle(it) } @@ -328,8 +346,10 @@ class ResponseServiceAsyncImpl internal constructor(private val clientOptions: C ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response.use { deleteHandler.handle(it) } } @@ -360,8 +380,10 @@ class ResponseServiceAsyncImpl internal constructor(private val clientOptions: C ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response .use { cancelHandler.handle(it) } @@ -395,8 +417,10 @@ class ResponseServiceAsyncImpl internal constructor(private val clientOptions: C ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response .use { compactHandler.handle(it) } diff --git a/openai-java-core/src/main/kotlin/com/openai/services/async/SkillServiceAsyncImpl.kt b/openai-java-core/src/main/kotlin/com/openai/services/async/SkillServiceAsyncImpl.kt index 6494d9319..ef1e1b9ce 100644 --- a/openai-java-core/src/main/kotlin/com/openai/services/async/SkillServiceAsyncImpl.kt +++ b/openai-java-core/src/main/kotlin/com/openai/services/async/SkillServiceAsyncImpl.kt @@ -18,6 +18,8 @@ import com.openai.core.http.json import com.openai.core.http.multipartFormData import com.openai.core.http.parseable import com.openai.core.prepareAsync +import com.openai.core.thenApplyPropagatingCancellation +import com.openai.core.thenComposeAsyncPropagatingCancellation import com.openai.models.skills.DeletedSkill import com.openai.models.skills.Skill import com.openai.models.skills.SkillCreateParams @@ -60,35 +62,45 @@ class SkillServiceAsyncImpl internal constructor(private val clientOptions: Clie requestOptions: RequestOptions, ): CompletableFuture = // post /skills - withRawResponse().create(params, requestOptions).thenApply { it.parse() } + withRawResponse().create(params, requestOptions).thenApplyPropagatingCancellation { + it.parse() + } override fun retrieve( params: SkillRetrieveParams, requestOptions: RequestOptions, ): CompletableFuture = // get /skills/{skill_id} - withRawResponse().retrieve(params, requestOptions).thenApply { it.parse() } + withRawResponse().retrieve(params, requestOptions).thenApplyPropagatingCancellation { + it.parse() + } override fun update( params: SkillUpdateParams, requestOptions: RequestOptions, ): CompletableFuture = // post /skills/{skill_id} - withRawResponse().update(params, requestOptions).thenApply { it.parse() } + withRawResponse().update(params, requestOptions).thenApplyPropagatingCancellation { + it.parse() + } override fun list( params: SkillListParams, requestOptions: RequestOptions, ): CompletableFuture = // get /skills - withRawResponse().list(params, requestOptions).thenApply { it.parse() } + withRawResponse().list(params, requestOptions).thenApplyPropagatingCancellation { + it.parse() + } override fun delete( params: SkillDeleteParams, requestOptions: RequestOptions, ): CompletableFuture = // delete /skills/{skill_id} - withRawResponse().delete(params, requestOptions).thenApply { it.parse() } + withRawResponse().delete(params, requestOptions).thenApplyPropagatingCancellation { + it.parse() + } class WithRawResponseImpl internal constructor(private val clientOptions: ClientOptions) : SkillServiceAsync.WithRawResponse { @@ -135,8 +147,10 @@ class SkillServiceAsyncImpl internal constructor(private val clientOptions: Clie ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response .use { createHandler.handle(it) } @@ -171,8 +185,10 @@ class SkillServiceAsyncImpl internal constructor(private val clientOptions: Clie ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response .use { retrieveHandler.handle(it) } @@ -208,8 +224,10 @@ class SkillServiceAsyncImpl internal constructor(private val clientOptions: Clie ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response .use { updateHandler.handle(it) } @@ -242,8 +260,10 @@ class SkillServiceAsyncImpl internal constructor(private val clientOptions: Clie ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response .use { listHandler.handle(it) } @@ -288,8 +308,10 @@ class SkillServiceAsyncImpl internal constructor(private val clientOptions: Clie ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response .use { deleteHandler.handle(it) } diff --git a/openai-java-core/src/main/kotlin/com/openai/services/async/UploadServiceAsyncImpl.kt b/openai-java-core/src/main/kotlin/com/openai/services/async/UploadServiceAsyncImpl.kt index 7e8569f06..b03404b7d 100644 --- a/openai-java-core/src/main/kotlin/com/openai/services/async/UploadServiceAsyncImpl.kt +++ b/openai-java-core/src/main/kotlin/com/openai/services/async/UploadServiceAsyncImpl.kt @@ -17,6 +17,8 @@ import com.openai.core.http.HttpResponseFor import com.openai.core.http.json import com.openai.core.http.parseable import com.openai.core.prepareAsync +import com.openai.core.thenApplyPropagatingCancellation +import com.openai.core.thenComposeAsyncPropagatingCancellation import com.openai.models.uploads.Upload import com.openai.models.uploads.UploadCancelParams import com.openai.models.uploads.UploadCompleteParams @@ -50,21 +52,27 @@ class UploadServiceAsyncImpl internal constructor(private val clientOptions: Cli requestOptions: RequestOptions, ): CompletableFuture = // post /uploads - withRawResponse().create(params, requestOptions).thenApply { it.parse() } + withRawResponse().create(params, requestOptions).thenApplyPropagatingCancellation { + it.parse() + } override fun cancel( params: UploadCancelParams, requestOptions: RequestOptions, ): CompletableFuture = // post /uploads/{upload_id}/cancel - withRawResponse().cancel(params, requestOptions).thenApply { it.parse() } + withRawResponse().cancel(params, requestOptions).thenApplyPropagatingCancellation { + it.parse() + } override fun complete( params: UploadCompleteParams, requestOptions: RequestOptions, ): CompletableFuture = // post /uploads/{upload_id}/complete - withRawResponse().complete(params, requestOptions).thenApply { it.parse() } + withRawResponse().complete(params, requestOptions).thenApplyPropagatingCancellation { + it.parse() + } class WithRawResponseImpl internal constructor(private val clientOptions: ClientOptions) : UploadServiceAsync.WithRawResponse { @@ -106,8 +114,10 @@ class UploadServiceAsyncImpl internal constructor(private val clientOptions: Cli ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response .use { createHandler.handle(it) } @@ -143,8 +153,10 @@ class UploadServiceAsyncImpl internal constructor(private val clientOptions: Cli ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response .use { cancelHandler.handle(it) } @@ -180,8 +192,10 @@ class UploadServiceAsyncImpl internal constructor(private val clientOptions: Cli ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response .use { completeHandler.handle(it) } diff --git a/openai-java-core/src/main/kotlin/com/openai/services/async/VectorStoreServiceAsyncImpl.kt b/openai-java-core/src/main/kotlin/com/openai/services/async/VectorStoreServiceAsyncImpl.kt index 129bd047d..c8ac5a795 100644 --- a/openai-java-core/src/main/kotlin/com/openai/services/async/VectorStoreServiceAsyncImpl.kt +++ b/openai-java-core/src/main/kotlin/com/openai/services/async/VectorStoreServiceAsyncImpl.kt @@ -18,6 +18,8 @@ import com.openai.core.http.HttpResponseFor import com.openai.core.http.json import com.openai.core.http.parseable import com.openai.core.prepareAsync +import com.openai.core.thenApplyPropagatingCancellation +import com.openai.core.thenComposeAsyncPropagatingCancellation import com.openai.models.vectorstores.VectorStore import com.openai.models.vectorstores.VectorStoreCreateParams import com.openai.models.vectorstores.VectorStoreDeleteParams @@ -70,42 +72,54 @@ class VectorStoreServiceAsyncImpl internal constructor(private val clientOptions requestOptions: RequestOptions, ): CompletableFuture = // post /vector_stores - withRawResponse().create(params, requestOptions).thenApply { it.parse() } + withRawResponse().create(params, requestOptions).thenApplyPropagatingCancellation { + it.parse() + } override fun retrieve( params: VectorStoreRetrieveParams, requestOptions: RequestOptions, ): CompletableFuture = // get /vector_stores/{vector_store_id} - withRawResponse().retrieve(params, requestOptions).thenApply { it.parse() } + withRawResponse().retrieve(params, requestOptions).thenApplyPropagatingCancellation { + it.parse() + } override fun update( params: VectorStoreUpdateParams, requestOptions: RequestOptions, ): CompletableFuture = // post /vector_stores/{vector_store_id} - withRawResponse().update(params, requestOptions).thenApply { it.parse() } + withRawResponse().update(params, requestOptions).thenApplyPropagatingCancellation { + it.parse() + } override fun list( params: VectorStoreListParams, requestOptions: RequestOptions, ): CompletableFuture = // get /vector_stores - withRawResponse().list(params, requestOptions).thenApply { it.parse() } + withRawResponse().list(params, requestOptions).thenApplyPropagatingCancellation { + it.parse() + } override fun delete( params: VectorStoreDeleteParams, requestOptions: RequestOptions, ): CompletableFuture = // delete /vector_stores/{vector_store_id} - withRawResponse().delete(params, requestOptions).thenApply { it.parse() } + withRawResponse().delete(params, requestOptions).thenApplyPropagatingCancellation { + it.parse() + } override fun search( params: VectorStoreSearchParams, requestOptions: RequestOptions, ): CompletableFuture = // post /vector_stores/{vector_store_id}/search - withRawResponse().search(params, requestOptions).thenApply { it.parse() } + withRawResponse().search(params, requestOptions).thenApplyPropagatingCancellation { + it.parse() + } class WithRawResponseImpl internal constructor(private val clientOptions: ClientOptions) : VectorStoreServiceAsync.WithRawResponse { @@ -154,8 +168,10 @@ class VectorStoreServiceAsyncImpl internal constructor(private val clientOptions ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response .use { createHandler.handle(it) } @@ -192,8 +208,10 @@ class VectorStoreServiceAsyncImpl internal constructor(private val clientOptions ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response .use { retrieveHandler.handle(it) } @@ -231,8 +249,10 @@ class VectorStoreServiceAsyncImpl internal constructor(private val clientOptions ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response .use { updateHandler.handle(it) } @@ -266,8 +286,10 @@ class VectorStoreServiceAsyncImpl internal constructor(private val clientOptions ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response .use { listHandler.handle(it) } @@ -313,8 +335,10 @@ class VectorStoreServiceAsyncImpl internal constructor(private val clientOptions ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response .use { deleteHandler.handle(it) } @@ -352,8 +376,10 @@ class VectorStoreServiceAsyncImpl internal constructor(private val clientOptions ) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } - .thenApply { response -> + .thenComposeAsyncPropagatingCancellation { + clientOptions.httpClient.executeAsync(it, requestOptions) + } + .thenApplyPropagatingCancellation { response -> errorHandler.handle(response).parseable { response .use { searchHandler.handle(it) } diff --git a/openai-java-core/src/main/kotlin/com/openai/services/async/VideoServiceAsyncImpl.kt b/openai-java-core/src/main/kotlin/com/openai/services/async/VideoServiceAsyncImpl.kt index 2cfe8c891..a4a2eb996 100644 --- a/openai-java-core/src/main/kotlin/com/openai/services/async/VideoServiceAsyncImpl.kt +++ b/openai-java-core/src/main/kotlin/com/openai/services/async/VideoServiceAsyncImpl.kt @@ -18,6 +18,8 @@ import com.openai.core.http.json import com.openai.core.http.multipartFormData import com.openai.core.http.parseable import com.openai.core.prepareAsync +import com.openai.core.thenApplyPropagatingCancellation +import com.openai.core.thenComposeAsyncPropagatingCancellation import com.openai.models.videos.Video import com.openai.models.videos.VideoCreateCharacterParams import com.openai.models.videos.VideoCreateCharacterResponse @@ -55,35 +57,45 @@ class VideoServiceAsyncImpl internal constructor(private val clientOptions: Clie requestOptions: RequestOptions, ): CompletableFuture