diff --git a/docs/migration-1.x-to-2.x.md b/docs/migration-1.x-to-2.x.md index 52e1cfaeb..5da0e3aa3 100644 --- a/docs/migration-1.x-to-2.x.md +++ b/docs/migration-1.x-to-2.x.md @@ -185,6 +185,7 @@ assertThrows(IllegalStateException.class, future::get); What changes in practice: - Serialization problems now fail on first execution instead of surfacing later on replay. +- Operation results can be returned after a SerDes round-trip so first execution matches replay. - Custom `SerDes` implementations must be able to deserialize SDK-managed values they serialize. - Child-context results are validated consistently, including virtual child-context paths. @@ -192,20 +193,21 @@ This is usually a correctness improvement, but it can surface previously hidden ### New opt-out configuration -If your workload is very performance-sensitive and you need to skip the extra validation deserialize pass, you can opt out: +If your workload is very performance-sensitive and you need to skip the extra deserialize pass, you can opt out: ```java @Override protected DurableConfig createConfiguration() { return DurableConfig.builder() - .withSerializationRoundTripValidation(false) + .withDeserializeAfterSerialization(false) .build(); } ``` Use that carefully: -- Disabling validation can hide serialization bugs until replay. +- Disabling this can hide serialization bugs until replay. +- First execution may return the raw result shape instead of the replay result shape. - Custom `SerDes` implementations are still expected to be round-trip safe. ## Recommended Validation After Upgrading @@ -226,6 +228,6 @@ Most upgrades are straightforward: - Logger metadata moves to `executionArn`, `operationId`, and `operationName` - Replay-sensitive logging becomes per-context, `isReplaying()` moves to `DurableContext`, and step logs are no longer replay-suppressed - Validation failures now throw `IllegalStateException` -- Serialization round-trip problems surface earlier by default, with an opt-out via `withSerializationRoundTripValidation(false)` +- Serialization round-trip problems surface earlier by default, with an opt-out via `withDeserializeAfterSerialization(false)` If you update those areas first, the `1.x` to `2.x` migration should be low risk. diff --git a/examples/src/main/java/software/amazon/lambda/durable/examples/general/CustomConfigExample.java b/examples/src/main/java/software/amazon/lambda/durable/examples/general/CustomConfigExample.java index 28d248395..1cfd2c8c7 100644 --- a/examples/src/main/java/software/amazon/lambda/durable/examples/general/CustomConfigExample.java +++ b/examples/src/main/java/software/amazon/lambda/durable/examples/general/CustomConfigExample.java @@ -31,6 +31,7 @@ *
  • Automatic region detection with fallback to us-east-1 for testing environments *
  • Environment variable credentials provider *
  • Custom SerDes with snake_case property naming + *
  • Optional post-serialization deserialization toggle for performance-sensitive workloads * */ public class CustomConfigExample extends DurableHandler { @@ -68,6 +69,8 @@ protected DurableConfig createConfiguration() { return DurableConfig.builder() .withDurableExecutionClient(durableClient) .withSerDes(customSerDes) + // Disable the extra deserialize pass if your workload is sensitive to the added cost. + .withDeserializeAfterSerialization(false) .build(); } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/DurableConfig.java b/sdk/src/main/java/software/amazon/lambda/durable/DurableConfig.java index a21251e19..a4b3b40bf 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/DurableConfig.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/DurableConfig.java @@ -98,6 +98,7 @@ public final class DurableConfig { private final LoggerConfig loggerConfig; private final PollingStrategy pollingStrategy; private final Duration checkpointDelay; + private final boolean deserializeAfterSerialization; private final PluginRunner pluginRunner; private DurableConfig(Builder builder) { @@ -109,6 +110,7 @@ private DurableConfig(Builder builder) { this.loggerConfig = Objects.requireNonNullElseGet(builder.loggerConfig, LoggerConfig::defaults); this.pollingStrategy = Objects.requireNonNullElse(builder.pollingStrategy, PollingStrategies.Presets.DEFAULT); this.checkpointDelay = Objects.requireNonNullElseGet(builder.checkpointDelay, () -> Duration.ofSeconds(0)); + this.deserializeAfterSerialization = builder.deserializeAfterSerialization; this.pluginRunner = builder.plugins.isEmpty() ? PluginRunner.noOp() : new PluginRunner(builder.plugins); validateConfiguration(); @@ -186,6 +188,19 @@ public Duration getCheckpointDelay() { return checkpointDelay; } + /** + * Gets whether serialized operation data should be deserialized immediately after serialization. + * + *

    When enabled, the SDK returns deserialized operation results so first execution matches replay, and validates + * serialized exceptions before checkpointing them. Defaults to true, and custom SerDes implementations are still + * expected to be round-trip safe even if this behavior is disabled. + * + * @return true when serialized data should be deserialized immediately + */ + public boolean shouldDeserializeAfterSerialization() { + return deserializeAfterSerialization; + } + /** * Gets the plugin runner that dispatches lifecycle events to registered plugins. * @@ -293,6 +308,7 @@ public static final class Builder { private LoggerConfig loggerConfig; private PollingStrategy pollingStrategy; private Duration checkpointDelay; + private boolean deserializeAfterSerialization = true; private List plugins = new ArrayList<>(); public Builder() {} @@ -403,6 +419,20 @@ public Builder withCheckpointDelay(Duration duration) { return this; } + /** + * Controls whether the SDK immediately deserializes serialized operation data after serialization. + * + *

    This is enabled by default. When enabled, operation results are returned after a SerDes round-trip so + * first execution returns the same value shape as replay, and exceptions are checked before checkpointing. + * + * @param deserializeAfterSerialization true to deserialize serialized data immediately + * @return This builder + */ + public Builder withDeserializeAfterSerialization(boolean deserializeAfterSerialization) { + this.deserializeAfterSerialization = deserializeAfterSerialization; + return this; + } + /** * Registers one or more plugins for lifecycle event instrumentation. * diff --git a/sdk/src/main/java/software/amazon/lambda/durable/DurableHandler.java b/sdk/src/main/java/software/amazon/lambda/durable/DurableHandler.java index a31c94d00..2342c70ac 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/DurableHandler.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/DurableHandler.java @@ -117,6 +117,7 @@ public DurableConfig getConfiguration() { * .withDurableExecutionClient(durableClient) * .withSerDes(customSerDes) // Optional: custom SerDes for user data * .withExecutorService(customExecutor) // Optional: custom thread pool + * .withDeserializeAfterSerialization(false) // Optional: skip immediate deserialize pass * .build(); * } * } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java index a47aff182..52fd39f08 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java @@ -149,6 +149,8 @@ private void executeChildContext() { } private void handleChildContextSuccess(T result) { + var serializedResult = serializeAndDeserializeResult(result); + if (replayChildren.get() || isVirtual || parentOperation != null && parentOperation.isOperationCompleted()) { // Skip checkpointing if // - parent ConcurrencyOperation has already completed, preventing race conditions where a child finishes @@ -156,19 +158,17 @@ private void handleChildContextSuccess(T result) { // - replaying a SUCCEEDED child with replayChildren=true — skip checkpointing. // - nestingType is FLAT // Mark the completableFuture completed so get() doesn't block waiting for a checkpoint response. - cachedOperationResult.set(DeserializedOperationResult.succeeded(result)); + cachedOperationResult.set(DeserializedOperationResult.succeeded(serializedResult.deserialized())); if (isVirtual) { fireOnOperationEnd(null, null); } markAlreadyCompleted(); } else { - checkpointSuccess(result); + checkpointSuccess(serializedResult.deserialized(), serializedResult.serialized()); } } - private void checkpointSuccess(T result) { - var serialized = serializeResult(result); - + private void checkpointSuccess(T result, String serialized) { if (serialized == null || serialized.getBytes(StandardCharsets.UTF_8).length < LARGE_RESULT_THRESHOLD) { sendOperationUpdate( OperationUpdate.builder().action(OperationAction.SUCCEED).payload(serialized)); diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/MapOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/MapOperation.java index c2afb1ab7..5776f498b 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/MapOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/MapOperation.java @@ -178,21 +178,22 @@ protected void replay(Operation existing) { @Override protected void handleCompletion(ConcurrencyCompletionStatus concurrencyCompletionStatus) { this.cachedResult = constructMapResult(concurrencyCompletionStatus); - var serialized = serializeResult(cachedResult); - var serializedBytes = serialized.getBytes(StandardCharsets.UTF_8); + var serializedResult = serializeAndDeserializeResult(cachedResult); + this.cachedResult = serializedResult.deserialized(); + var serializedBytes = serializedResult.serialized().getBytes(StandardCharsets.UTF_8); if (serializedBytes.length < LARGE_RESULT_THRESHOLD) { sendOperationUpdate(OperationUpdate.builder() .action(OperationAction.SUCCEED) .subType(getSubType().getValue()) - .payload(serialized)); + .payload(serializedResult.serialized())); } else { // Large result: checkpoint with stripped payload + replayChildren flag - var strippedResult = serializeResult(stripMapResult(cachedResult)); + var strippedResult = serializeAndDeserializeResult(stripMapResult(cachedResult)); sendOperationUpdate(OperationUpdate.builder() .action(OperationAction.SUCCEED) .subType(getSubType().getValue()) - .payload(strippedResult) + .payload(strippedResult.serialized()) .contextOptions( ContextOptions.builder().replayChildren(true).build())); } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/ParallelOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/ParallelOperation.java index 4ae7b8386..fb093bdf5 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/ParallelOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/ParallelOperation.java @@ -78,13 +78,15 @@ protected void handleCompletion(ConcurrencyCompletionStatus concurrencyCompletio int skippedCount = items.size() - succeededCount - failedCount; cachedResult = new ParallelResult( items.size(), succeededCount, failedCount, skippedCount, concurrencyCompletionStatus, statuses); + var serializedResult = serializeAndDeserializeResult(cachedResult); + cachedResult = serializedResult.deserialized(); // Branches added after checkpoint will not exist in the checkpointed result, but they'll be in the returned // value from get() method. sendOperationUpdate(OperationUpdate.builder() .action(OperationAction.SUCCEED) .subType(getSubType().getValue()) - .payload(serializeResult(cachedResult)) + .payload(serializedResult.serialized()) .contextOptions(ContextOptions.builder().replayChildren(true).build())); } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/SerializableDurableOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/SerializableDurableOperation.java index 6ccf24e0f..6457c996d 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/SerializableDurableOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/SerializableDurableOperation.java @@ -34,6 +34,8 @@ public abstract class SerializableDurableOperation extends BaseDurableOperation implements DurableFuture { private static final Logger logger = LoggerFactory.getLogger(SerializableDurableOperation.class); + protected record SerializedResult(String serialized, T deserialized) {} + private final TypeToken resultTypeToken; private final SerDes resultSerDes; @@ -95,13 +97,18 @@ protected T deserializeResult(String result) { } /** - * Serializes the result to a string. + * Serializes the result and returns the value that should be exposed to callers. + * + *

    Use this for operations that cache a first-execution result instead of reading it back from checkpoint data. + * This keeps first execution consistent with replay when a SerDes normalizes or otherwise changes the value. * * @param result the result to serialize - * @return the serialized string + * @return the serialized string and the deserialized result */ - protected String serializeResult(T result) { - return resultSerDes.serialize(result); + protected SerializedResult serializeAndDeserializeResult(T result) { + var serialized = resultSerDes.serialize(result); + var deserialized = shouldDeserializeAfterSerialization() ? deserializeResult(serialized) : result; + return new SerializedResult<>(serialized, deserialized); } /** @@ -110,8 +117,18 @@ protected String serializeResult(T result) { * @param throwable the exception to serialize * @return the serialized error object */ + @SuppressWarnings("ThrowableNotThrown") protected ErrorObject serializeException(Throwable throwable) { - return ExceptionHelper.buildErrorObject(throwable, resultSerDes); + var error = ExceptionHelper.buildErrorObject(throwable, resultSerDes); + if (shouldDeserializeAfterSerialization()) { + deserializeException(error); + } + return error; + } + + private boolean shouldDeserializeAfterSerialization() { + var config = getContext().getDurableConfig(); + return config == null || config.shouldDeserializeAfterSerialization(); } /** diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/StepOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/StepOperation.java index 6018a0ccf..d76129177 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/StepOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/StepOperation.java @@ -144,9 +144,11 @@ private void checkpointStarted() { } private void handleStepSucceeded(T result) { + var serializedResult = serializeAndDeserializeResult(result); + // Send SUCCEED var successUpdate = - OperationUpdate.builder().action(OperationAction.SUCCEED).payload(serializeResult(result)); + OperationUpdate.builder().action(OperationAction.SUCCEED).payload(serializedResult.serialized()); // sendOperationUpdate must be synchronous here. When waiting for the return of this call, // the context threads waiting for the result of this step operation will be wakened up and registered. diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/WaitForConditionOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/WaitForConditionOperation.java index 8404e1d68..07e55951a 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/WaitForConditionOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/WaitForConditionOperation.java @@ -128,15 +128,15 @@ private void executeCheckLogic(T currentState, int attempt) { // Execute check function in user executor WaitForConditionResult result = checkFunc.apply(currentState, stepContext); - // Serialize/deserialize round-trip on the value to ensure state is checkpoint-safe - var serializedState = serializeResult(result.value()); - T deserializedValue = deserializeResult(serializedState); + // Normalize the value through SerDes so first execution matches replay. + var serializedState = serializeAndDeserializeResult(result.value()); + T deserializedValue = serializedState.deserialized(); if (result.isDone()) { // Condition met — checkpoint SUCCEED var successUpdate = OperationUpdate.builder() .action(OperationAction.SUCCEED) - .payload(serializedState); + .payload(serializedState.serialized()); sendOperationUpdate(successUpdate); } else { // Compute delay from strategy @@ -145,7 +145,7 @@ private void executeCheckLogic(T currentState, int attempt) { // Checkpoint RETRY with delay var retryUpdate = OperationUpdate.builder() .action(OperationAction.RETRY) - .payload(serializedState) + .payload(serializedState.serialized()) .stepOptions(StepOptions.builder() .nextAttemptDelaySeconds(Math.toIntExact(delay.toSeconds())) .build()); diff --git a/sdk/src/test/java/software/amazon/lambda/durable/DurableConfigTest.java b/sdk/src/test/java/software/amazon/lambda/durable/DurableConfigTest.java index 94d13f5ef..b810c2d62 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/DurableConfigTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/DurableConfigTest.java @@ -87,6 +87,24 @@ void testBuilder_WithCustomExecutorService() { assertNotNull(config.getSerDes()); } + @Test + void testBuilder_DeserializeAfterSerializationDefaultsToTrue() { + var config = + DurableConfig.builder().withDurableExecutionClient(mockClient).build(); + + assertTrue(config.shouldDeserializeAfterSerialization()); + } + + @Test + void testBuilder_WithDeserializeAfterSerializationDisabled() { + var config = DurableConfig.builder() + .withDurableExecutionClient(mockClient) + .withDeserializeAfterSerialization(false) + .build(); + + assertFalse(config.shouldDeserializeAfterSerialization()); + } + @Test void testBuilder_WithAllCustomComponents() { var config = DurableConfig.builder() @@ -131,6 +149,7 @@ void testBuilder_FluentAPI() { assertSame(builder, builder.withDurableExecutionClient(mockClient)); assertSame(builder, builder.withSerDes(mockSerDes)); assertSame(builder, builder.withExecutorService(mockExecutor)); + assertSame(builder, builder.withDeserializeAfterSerialization(false)); } @Test diff --git a/sdk/src/test/java/software/amazon/lambda/durable/operation/ChildContextOperationTest.java b/sdk/src/test/java/software/amazon/lambda/durable/operation/ChildContextOperationTest.java index ac56262a4..99d994538 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/operation/ChildContextOperationTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/operation/ChildContextOperationTest.java @@ -24,16 +24,43 @@ import software.amazon.lambda.durable.context.DurableContextImpl; import software.amazon.lambda.durable.exception.ChildContextFailedException; import software.amazon.lambda.durable.exception.NonDeterministicExecutionException; +import software.amazon.lambda.durable.exception.SerDesException; import software.amazon.lambda.durable.execution.ExecutionManager; import software.amazon.lambda.durable.execution.ThreadContext; import software.amazon.lambda.durable.execution.ThreadType; import software.amazon.lambda.durable.model.OperationIdentifier; import software.amazon.lambda.durable.model.OperationSubType; import software.amazon.lambda.durable.serde.JacksonSerDes; +import software.amazon.lambda.durable.serde.SerDes; /** Unit tests for ChildContextOperation. */ class ChildContextOperationTest { + private static final class SerializationOnlySerDes implements SerDes { + @Override + public String serialize(Object value) { + return "\"serialized\""; + } + + @Override + public T deserialize(String data, TypeToken typeToken) { + throw new SerDesException("cannot deserialize"); + } + } + + private static final class NormalizingSerDes implements SerDes { + @Override + public String serialize(Object value) { + return "\"serialized\""; + } + + @Override + @SuppressWarnings("unchecked") + public T deserialize(String data, TypeToken typeToken) { + return (T) "deserialized"; + } + } + private static final JacksonSerDes SERDES = new JacksonSerDes(); private DurableContextImpl durableContext; @@ -49,8 +76,13 @@ void setUp() { } private DurableConfig createConfig() { + return createConfig(true); + } + + private DurableConfig createConfig(boolean deserializeAfterSerialization) { return DurableConfig.builder() .withExecutorService(Executors.newCachedThreadPool()) + .withDeserializeAfterSerialization(deserializeAfterSerialization) .build(); } @@ -58,20 +90,28 @@ private DurableConfig createConfig() { OperationIdentifier.of("1", "test-context", OperationSubType.RUN_IN_CHILD_CONTEXT); private ChildContextOperation createOperation(Function func) { + return createOperation(func, SERDES); + } + + private ChildContextOperation createOperation(Function func, SerDes serDes) { return new ChildContextOperation<>( OPERATION_IDENTIFIER, func, TypeToken.get(String.class), - RunInChildContextConfig.builder().serDes(SERDES).build(), + RunInChildContextConfig.builder().serDes(serDes).build(), durableContext); } private ChildContextOperation createVirtualOperation(Function func) { + return createVirtualOperation(func, SERDES); + } + + private ChildContextOperation createVirtualOperation(Function func, SerDes serDes) { return new ChildContextOperation<>( OPERATION_IDENTIFIER, func, TypeToken.get(String.class), - RunInChildContextConfig.builder().serDes(SERDES).isVirtual(true).build(), + RunInChildContextConfig.builder().serDes(serDes).isVirtual(true).build(), durableContext); } @@ -132,6 +172,15 @@ void executeVirtualContext() { assertTrue(functionCalled.get(), "Function should be called during SUCCEEDED replay"); } + @Test + void virtualChildReturnsDeserializedResult() { + var operation = createVirtualOperation(ctx -> "raw", new NormalizingSerDes()); + + operation.execute(); + + assertEquals("deserialized", operation.get()); + } + // ===== FAILED replay ===== /** FAILED replay throws the original exception without re-executing. */ @@ -311,6 +360,40 @@ void childSkipsSuccessCheckpointWhenParentAlreadyCompleted() throws Exception { .sendOperationUpdate(argThat(update -> update.action() == OperationAction.SUCCEED)); } + /** Virtual child still validates result round-trip before skipping a success checkpoint. */ + @Test + void virtualChildFailsWhenResultCannotBeDeserialized() throws Exception { + when(executionManager.getOperationAndUpdateReplayState("1")).thenReturn(null); + + var operation = createVirtualOperation(ctx -> "result", new SerializationOnlySerDes()); + operation.execute(); + Thread.sleep(200); + + var thrown = assertThrows(ChildContextFailedException.class, operation::get); + assertTrue(thrown.getMessage().contains(SerDesException.class.getName())); + verify(executionManager, never()) + .sendOperationUpdate(argThat(update -> update.action() == OperationAction.SUCCEED)); + verify(executionManager, never()) + .sendOperationUpdate(argThat(update -> update.action() == OperationAction.FAIL)); + } + + /** Virtual child can skip result deserialization when disabled in DurableConfig. */ + @Test + void virtualChildSucceedsWhenResultValidationDisabled() throws Exception { + when(executionManager.getOperationAndUpdateReplayState("1")).thenReturn(null); + when(durableContext.getDurableConfig()).thenReturn(createConfig(false)); + + var operation = createVirtualOperation(ctx -> "result", new SerializationOnlySerDes()); + operation.execute(); + Thread.sleep(200); + + assertEquals("result", operation.get()); + verify(executionManager, never()) + .sendOperationUpdate(argThat(update -> update.action() == OperationAction.SUCCEED)); + verify(executionManager, never()) + .sendOperationUpdate(argThat(update -> update.action() == OperationAction.FAIL)); + } + /** Child skips failure checkpoint when parent operation has already completed. */ @Test void childSkipsFailureCheckpointWhenParentAlreadyCompleted() throws Exception { diff --git a/sdk/src/test/java/software/amazon/lambda/durable/operation/SerializableDurableOperationTest.java b/sdk/src/test/java/software/amazon/lambda/durable/operation/SerializableDurableOperationTest.java index c544eb235..bc9e940b8 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/operation/SerializableDurableOperationTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/operation/SerializableDurableOperationTest.java @@ -20,6 +20,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import software.amazon.awssdk.services.lambda.model.ErrorObject; @@ -27,7 +28,9 @@ import software.amazon.awssdk.services.lambda.model.OperationStatus; import software.amazon.awssdk.services.lambda.model.OperationType; import software.amazon.awssdk.services.lambda.model.OperationUpdate; +import software.amazon.lambda.durable.DurableConfig; import software.amazon.lambda.durable.TypeToken; +import software.amazon.lambda.durable.client.DurableExecutionClient; import software.amazon.lambda.durable.context.DurableContextImpl; import software.amazon.lambda.durable.exception.IllegalDurableOperationException; import software.amazon.lambda.durable.exception.NonDeterministicExecutionException; @@ -42,6 +45,45 @@ class SerializableDurableOperationTest { + private static final class TrackingSerDes extends JacksonSerDes { + private final AtomicInteger deserializeCount = new AtomicInteger(0); + + @Override + public T deserialize(String data, TypeToken typeToken) { + deserializeCount.incrementAndGet(); + return super.deserialize(data, typeToken); + } + + int getDeserializeCount() { + return deserializeCount.get(); + } + } + + private static final class SerializationOnlySerDes implements SerDes { + @Override + public String serialize(Object value) { + return "\"serialized\""; + } + + @Override + public T deserialize(String data, TypeToken typeToken) { + throw new SerDesException("cannot deserialize"); + } + } + + private static final class NormalizingSerDes implements SerDes { + @Override + public String serialize(Object value) { + return "\"serialized\""; + } + + @Override + @SuppressWarnings("unchecked") + public T deserialize(String data, TypeToken typeToken) { + return (T) "deserialized"; + } + } + private static final String OPERATION_ID = "1"; private static final String CONTEXT_ID = "1-step"; private static final String OPERATION_NAME = "name"; @@ -331,7 +373,7 @@ protected void replay(Operation existing) {} @Override public String get() { - assertEquals("abc", deserializeResult(serializeResult("abc"))); + assertEquals("abc", deserializeResult(SER_DES.serialize("abc"))); assertEquals("", deserializeResult("\"\"")); assertThrows(SerDesException.class, () -> deserializeResult("x")); return RESULT; @@ -340,6 +382,100 @@ public String get() { op.get(); } + @Test + void serializeAndDeserializeResultDeserializesResult() { + var serDes = new TrackingSerDes(); + SerializableDurableOperation op = + new SerializableDurableOperation<>(OPERATION_IDENTIFIER, RESULT_TYPE, serDes, durableContext) { + @Override + protected void start() {} + + @Override + protected void replay(Operation existing) {} + + @Override + public String get() { + var result = serializeAndDeserializeResult("abc"); + assertEquals("\"abc\"", result.serialized()); + assertEquals("abc", result.deserialized()); + assertEquals(1, serDes.getDeserializeCount()); + return RESULT; + } + }; + + op.get(); + } + + @Test + void serializeAndDeserializeResultThrowsWhenDeserializeFails() { + var serDes = new SerializationOnlySerDes(); + SerializableDurableOperation op = + new SerializableDurableOperation<>(OPERATION_IDENTIFIER, RESULT_TYPE, serDes, durableContext) { + @Override + protected void start() {} + + @Override + protected void replay(Operation existing) {} + + @Override + public String get() { + var thrown = assertThrows(SerDesException.class, () -> serializeAndDeserializeResult("abc")); + assertEquals("cannot deserialize", thrown.getMessage()); + return RESULT; + } + }; + + op.get(); + } + + @Test + void serializeAndDeserializeResultReturnsRawResultWhenDeserializationDisabled() { + when(durableContext.getDurableConfig()).thenReturn(configWithDeserializeAfterSerialization(false)); + + var serDes = new NormalizingSerDes(); + SerializableDurableOperation op = + new SerializableDurableOperation<>(OPERATION_IDENTIFIER, RESULT_TYPE, serDes, durableContext) { + @Override + protected void start() {} + + @Override + protected void replay(Operation existing) {} + + @Override + public String get() { + var result = serializeAndDeserializeResult("abc"); + assertEquals("\"serialized\"", result.serialized()); + assertEquals("abc", result.deserialized()); + return RESULT; + } + }; + + op.get(); + } + + @Test + void serializeAndDeserializeResultReturnsDeserializedValue() { + SerializableDurableOperation op = + new SerializableDurableOperation<>( + OPERATION_IDENTIFIER, RESULT_TYPE, new NormalizingSerDes(), durableContext) { + @Override + protected void start() {} + + @Override + protected void replay(Operation existing) {} + + @Override + public String get() { + var result = serializeAndDeserializeResult("raw"); + assertEquals("\"serialized\"", result.serialized()); + assertEquals("deserialized", result.deserialized()); + return RESULT; + } + }; + + op.get(); + } + @Test void deserializeException() { SerializableDurableOperation op = @@ -366,6 +502,53 @@ public String get() { op.get(); } + @Test + void serializeExceptionValidatesRoundTrip() { + var serDes = new TrackingSerDes(); + SerializableDurableOperation op = + new SerializableDurableOperation<>(OPERATION_IDENTIFIER, RESULT_TYPE, serDes, durableContext) { + @Override + protected void start() {} + + @Override + protected void replay(Operation existing) {} + + @Override + public String get() { + var error = serializeException(new RuntimeException("test exception")); + assertEquals(RuntimeException.class.getName(), error.errorType()); + assertEquals(1, serDes.getDeserializeCount()); + return RESULT; + } + }; + + op.get(); + } + + @Test + void serializeExceptionSkipsRoundTripValidationWhenDisabled() { + when(durableContext.getDurableConfig()).thenReturn(configWithDeserializeAfterSerialization(false)); + + var serDes = new SerializationOnlySerDes(); + SerializableDurableOperation op = + new SerializableDurableOperation<>(OPERATION_IDENTIFIER, RESULT_TYPE, serDes, durableContext) { + @Override + protected void start() {} + + @Override + protected void replay(Operation existing) {} + + @Override + public String get() { + var error = serializeException(new RuntimeException("test exception")); + assertEquals(RuntimeException.class.getName(), error.errorType()); + return RESULT; + } + }; + + op.get(); + } + @Test void polling() { SerializableDurableOperation op = @@ -411,4 +594,11 @@ public String get() { op.execute(); verify(executionManager, times(1)).sendOperationUpdate(update.build()); } + + private DurableConfig configWithDeserializeAfterSerialization(boolean deserializeAfterSerialization) { + return DurableConfig.builder() + .withDurableExecutionClient(mock(DurableExecutionClient.class)) + .withDeserializeAfterSerialization(deserializeAfterSerialization) + .build(); + } }