Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions docs/migration-1.x-to-2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -185,27 +185,29 @@ assertThrows(IllegalStateException.class, future::get);
What changes in practice:

- Serialization problems now fail on first execution instead of surfacing later on replay.
- Operation results can be returned after a SerDes round-trip so first execution matches replay.
- Custom `SerDes` implementations must be able to deserialize SDK-managed values they serialize.
- Child-context results are validated consistently, including virtual child-context paths.

This is usually a correctness improvement, but it can surface previously hidden `SerDes` bugs during upgrade.

### New opt-out configuration

If your workload is very performance-sensitive and you need to skip the extra validation deserialize pass, you can opt out:
If your workload is very performance-sensitive and you need to skip the extra deserialize pass, you can opt out:

```java
@Override
protected DurableConfig createConfiguration() {
return DurableConfig.builder()
.withSerializationRoundTripValidation(false)
.withDeserializeAfterSerialization(false)
.build();
}
```

Use this option carefully:

- Disabling validation can hide serialization bugs until replay.
- Disabling this can hide serialization bugs until replay.
- First execution may return the raw, pre-serialization result, whose shape can differ from the value returned on replay.
- Custom `SerDes` implementations are still expected to be round-trip safe.

## Recommended Validation After Upgrading
Expand All @@ -226,6 +228,6 @@ Most upgrades are straightforward:
- Logger metadata moves to `executionArn`, `operationId`, and `operationName`
- Replay-sensitive logging becomes per-context, `isReplaying()` moves to `DurableContext`, and step logs are no longer replay-suppressed
- Validation failures now throw `IllegalStateException`
- Serialization round-trip problems surface earlier by default, with an opt-out via `withSerializationRoundTripValidation(false)`
- Serialization round-trip problems surface earlier by default, with an opt-out via `withDeserializeAfterSerialization(false)`

If you update those areas first, the `1.x` to `2.x` migration should be low risk.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
* <li>Automatic region detection with fallback to us-east-1 for testing environments
* <li>Environment variable credentials provider
* <li>Custom SerDes with snake_case property naming
* <li>Optional post-serialization deserialization toggle for performance-sensitive workloads
* </ul>
*/
public class CustomConfigExample extends DurableHandler<String, String> {
Expand Down Expand Up @@ -68,6 +69,8 @@ protected DurableConfig createConfiguration() {
return DurableConfig.builder()
.withDurableExecutionClient(durableClient)
.withSerDes(customSerDes)
// Disable the extra deserialize pass if your workload is sensitive to the added cost.
.withDeserializeAfterSerialization(false)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ public final class DurableConfig {
private final LoggerConfig loggerConfig;
private final PollingStrategy pollingStrategy;
private final Duration checkpointDelay;
private final boolean deserializeAfterSerialization;
private final PluginRunner pluginRunner;

private DurableConfig(Builder builder) {
Expand All @@ -109,6 +110,7 @@ private DurableConfig(Builder builder) {
this.loggerConfig = Objects.requireNonNullElseGet(builder.loggerConfig, LoggerConfig::defaults);
this.pollingStrategy = Objects.requireNonNullElse(builder.pollingStrategy, PollingStrategies.Presets.DEFAULT);
this.checkpointDelay = Objects.requireNonNullElseGet(builder.checkpointDelay, () -> Duration.ofSeconds(0));
this.deserializeAfterSerialization = builder.deserializeAfterSerialization;
this.pluginRunner = builder.plugins.isEmpty() ? PluginRunner.noOp() : new PluginRunner(builder.plugins);

validateConfiguration();
Expand Down Expand Up @@ -186,6 +188,19 @@ public Duration getCheckpointDelay() {
return checkpointDelay;
}

/**
* Reports whether the SDK deserializes operation data immediately after serializing it.
*
* <p>When enabled, operation results handed back to callers are the output of a full SerDes round-trip, so the
* value seen on first execution matches the value seen on replay, and serialized exceptions are deserialized once
* before they are checkpointed. The value is taken from
* {@link Builder#withDeserializeAfterSerialization(boolean)} and defaults to {@code true}.
*
* <p>Disabling it only skips the extra deserialize pass; custom SerDes implementations are still expected to be
* round-trip safe.
*
* @return {@code true} when serialized data should be deserialized immediately, {@code false} when the extra pass
*     is skipped
*/
public boolean shouldDeserializeAfterSerialization() {
return deserializeAfterSerialization;
}

/**
* Gets the plugin runner that dispatches lifecycle events to registered plugins.
*
Expand Down Expand Up @@ -293,6 +308,7 @@ public static final class Builder {
private LoggerConfig loggerConfig;
private PollingStrategy pollingStrategy;
private Duration checkpointDelay;
private boolean deserializeAfterSerialization = true;
private List<DurableExecutionPlugin> plugins = new ArrayList<>();

public Builder() {}
Expand Down Expand Up @@ -403,6 +419,20 @@ public Builder withCheckpointDelay(Duration duration) {
return this;
}

/**
* Enables or disables the deserialize pass that the SDK runs right after serializing operation data.
*
* <p>Enabled by default. While enabled, operation results are returned after a SerDes round-trip so that first
* execution yields the same value shape as replay, and exceptions are checked before they are checkpointed.
* Passing {@code false} skips that extra pass (for example in performance-sensitive workloads); serialization
* problems may then stay hidden until replay.
*
* @param deserializeAfterSerialization {@code true} to deserialize serialized data immediately, {@code false} to
*     skip the extra pass
* @return This builder
*/
public Builder withDeserializeAfterSerialization(boolean deserializeAfterSerialization) {
this.deserializeAfterSerialization = deserializeAfterSerialization;
return this;
}

/**
* Registers one or more plugins for lifecycle event instrumentation.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ public DurableConfig getConfiguration() {
* .withDurableExecutionClient(durableClient)
* .withSerDes(customSerDes) // Optional: custom SerDes for user data
* .withExecutorService(customExecutor) // Optional: custom thread pool
* .withDeserializeAfterSerialization(false) // Optional: skip immediate deserialize pass
* .build();
* }
* }</pre>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,26 +149,26 @@ private void executeChildContext() {
}

private void handleChildContextSuccess(T result) {
var serializedResult = serializeAndDeserializeResult(result);

if (replayChildren.get() || isVirtual || parentOperation != null && parentOperation.isOperationCompleted()) {
// Skip checkpointing if
// - parent ConcurrencyOperation has already completed, preventing race conditions where a child finishes
// after the parent has already completed.
// - replaying a SUCCEEDED child with replayChildren=true — skip checkpointing.
// - nestingType is FLAT
// Mark the completableFuture completed so get() doesn't block waiting for a checkpoint response.
cachedOperationResult.set(DeserializedOperationResult.succeeded(result));
cachedOperationResult.set(DeserializedOperationResult.succeeded(serializedResult.deserialized()));
if (isVirtual) {
fireOnOperationEnd(null, null);
}
markAlreadyCompleted();
} else {
checkpointSuccess(result);
checkpointSuccess(serializedResult.deserialized(), serializedResult.serialized());
}
}

private void checkpointSuccess(T result) {
var serialized = serializeResult(result);

private void checkpointSuccess(T result, String serialized) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the `T result` parameter still required?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is needed when caching a large payload.

if (serialized == null || serialized.getBytes(StandardCharsets.UTF_8).length < LARGE_RESULT_THRESHOLD) {
sendOperationUpdate(
OperationUpdate.builder().action(OperationAction.SUCCEED).payload(serialized));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,21 +178,22 @@ protected void replay(Operation existing) {
@Override
protected void handleCompletion(ConcurrencyCompletionStatus concurrencyCompletionStatus) {
this.cachedResult = constructMapResult(concurrencyCompletionStatus);
var serialized = serializeResult(cachedResult);
var serializedBytes = serialized.getBytes(StandardCharsets.UTF_8);
var serializedResult = serializeAndDeserializeResult(cachedResult);
this.cachedResult = serializedResult.deserialized();
var serializedBytes = serializedResult.serialized().getBytes(StandardCharsets.UTF_8);

if (serializedBytes.length < LARGE_RESULT_THRESHOLD) {
sendOperationUpdate(OperationUpdate.builder()
.action(OperationAction.SUCCEED)
.subType(getSubType().getValue())
.payload(serialized));
.payload(serializedResult.serialized()));
} else {
// Large result: checkpoint with stripped payload + replayChildren flag
var strippedResult = serializeResult(stripMapResult(cachedResult));
var strippedResult = serializeAndDeserializeResult(stripMapResult(cachedResult));
sendOperationUpdate(OperationUpdate.builder()
.action(OperationAction.SUCCEED)
.subType(getSubType().getValue())
.payload(strippedResult)
.payload(strippedResult.serialized())
.contextOptions(
ContextOptions.builder().replayChildren(true).build()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,15 @@ protected void handleCompletion(ConcurrencyCompletionStatus concurrencyCompletio
int skippedCount = items.size() - succeededCount - failedCount;
cachedResult = new ParallelResult(
items.size(), succeededCount, failedCount, skippedCount, concurrencyCompletionStatus, statuses);
var serializedResult = serializeAndDeserializeResult(cachedResult);
cachedResult = serializedResult.deserialized();

// Branches added after checkpoint will not exist in the checkpointed result, but they'll be in the returned
// value from get() method.
sendOperationUpdate(OperationUpdate.builder()
.action(OperationAction.SUCCEED)
.subType(getSubType().getValue())
.payload(serializeResult(cachedResult))
.payload(serializedResult.serialized())
.contextOptions(ContextOptions.builder().replayChildren(true).build()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
public abstract class SerializableDurableOperation<T> extends BaseDurableOperation implements DurableFuture<T> {
private static final Logger logger = LoggerFactory.getLogger(SerializableDurableOperation.class);

/**
* Pairs the serialized form of an operation result with the value that should be exposed to callers.
*
* @param serialized the string produced by the result SerDes
* @param deserialized the value to hand back to callers: either the round-tripped result or the original result,
*     depending on how the producing method was configured
* @param <T> the operation result type
*/
protected record SerializedResult<T>(String serialized, T deserialized) {}

private final TypeToken<T> resultTypeToken;
private final SerDes resultSerDes;

Expand Down Expand Up @@ -95,13 +97,18 @@ protected T deserializeResult(String result) {
}

/**
* Serializes the result to a string.
* Serializes the result and returns the value that should be exposed to callers.
*
* <p>Use this for operations that cache a first-execution result instead of reading it back from checkpoint data.
* This keeps first execution consistent with replay when a SerDes normalizes or otherwise changes the value.
*
* @param result the result to serialize
* @return the serialized string
* @return the serialized string and the deserialized result
*/
protected String serializeResult(T result) {
return resultSerDes.serialize(result);
protected SerializedResult<T> serializeAndDeserializeResult(T result) {
// The serialized form is always produced; callers use it as the checkpoint payload.
var payload = resultSerDes.serialize(result);
if (!shouldDeserializeAfterSerialization()) {
// Round-trip disabled: expose the caller's original object unchanged.
return new SerializedResult<>(payload, result);
}
// Round-trip enabled: expose the value as it comes back out of the SerDes.
return new SerializedResult<>(payload, deserializeResult(payload));
}

/**
Expand All @@ -110,8 +117,18 @@ protected String serializeResult(T result) {
* @param throwable the exception to serialize
* @return the serialized error object
*/
@SuppressWarnings("ThrowableNotThrown")
protected ErrorObject serializeException(Throwable throwable) {
return ExceptionHelper.buildErrorObject(throwable, resultSerDes);
var error = ExceptionHelper.buildErrorObject(throwable, resultSerDes);
if (shouldDeserializeAfterSerialization()) {
deserializeException(error);
}
return error;
}

private boolean shouldDeserializeAfterSerialization() {
var durableConfig = getContext().getDurableConfig();
if (durableConfig == null) {
// No configuration available: keep the round-trip enabled.
return true;
}
return durableConfig.shouldDeserializeAfterSerialization();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,11 @@ private void checkpointStarted() {
}

private void handleStepSucceeded(T result) {
var serializedResult = serializeAndDeserializeResult(result);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: serializedResult is a bit confusing, maybe parsedResult is better?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any suggestions?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

parsedResult?


// Send SUCCEED
var successUpdate =
OperationUpdate.builder().action(OperationAction.SUCCEED).payload(serializeResult(result));
OperationUpdate.builder().action(OperationAction.SUCCEED).payload(serializedResult.serialized());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we using the serialized value here because we re-execute the step during replay? In that case, why can't we use the raw result?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

payload is a string field


// sendOperationUpdate must be synchronous here. When waiting for the return of this call,
// the context threads waiting for the result of this step operation will be wakened up and registered.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,15 +128,15 @@ private void executeCheckLogic(T currentState, int attempt) {
// Execute check function in user executor
WaitForConditionResult<T> result = checkFunc.apply(currentState, stepContext);

// Serialize/deserialize round-trip on the value to ensure state is checkpoint-safe
var serializedState = serializeResult(result.value());
T deserializedValue = deserializeResult(serializedState);
// Normalize the value through SerDes so first execution matches replay.
var serializedState = serializeAndDeserializeResult(result.value());
T deserializedValue = serializedState.deserialized();

if (result.isDone()) {
// Condition met — checkpoint SUCCEED
var successUpdate = OperationUpdate.builder()
.action(OperationAction.SUCCEED)
.payload(serializedState);
.payload(serializedState.serialized());
sendOperationUpdate(successUpdate);
} else {
// Compute delay from strategy
Expand All @@ -145,7 +145,7 @@ private void executeCheckLogic(T currentState, int attempt) {
// Checkpoint RETRY with delay
var retryUpdate = OperationUpdate.builder()
.action(OperationAction.RETRY)
.payload(serializedState)
.payload(serializedState.serialized())
.stepOptions(StepOptions.builder()
.nextAttemptDelaySeconds(Math.toIntExact(delay.toSeconds()))
.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,24 @@ void testBuilder_WithCustomExecutorService() {
assertNotNull(config.getSerDes());
}

@Test
void testBuilder_DeserializeAfterSerializationDefaultsToTrue() {
// The toggle is never touched here, so the builder default is what gets observed.
var builder = DurableConfig.builder();
builder.withDurableExecutionClient(mockClient);

var roundTripEnabled = builder.build().shouldDeserializeAfterSerialization();
assertTrue(roundTripEnabled);
}

@Test
void testBuilder_WithDeserializeAfterSerializationDisabled() {
// Explicitly opt out of the post-serialization deserialize pass.
var builder = DurableConfig.builder();
builder.withDurableExecutionClient(mockClient);
builder.withDeserializeAfterSerialization(false);

var roundTripEnabled = builder.build().shouldDeserializeAfterSerialization();
assertFalse(roundTripEnabled);
}

@Test
void testBuilder_WithAllCustomComponents() {
var config = DurableConfig.builder()
Expand Down Expand Up @@ -131,6 +149,7 @@ void testBuilder_FluentAPI() {
assertSame(builder, builder.withDurableExecutionClient(mockClient));
assertSame(builder, builder.withSerDes(mockSerDes));
assertSame(builder, builder.withExecutorService(mockExecutor));
assertSame(builder, builder.withDeserializeAfterSerialization(false));
}

@Test
Expand Down
Loading
Loading