Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
307 changes: 301 additions & 6 deletions docs/sharing-the-cluster/queue-splitting.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ This feature is available to users on the Team and Enterprise pricing plans.
{% endhint %}

{% hint style="info" %}
Queue splitting is currently available for [Amazon SQS](https://aws.amazon.com/sqs/), [Kafka](https://kafka.apache.org/), [RabbitMQ](https://www.rabbitmq.com), [Google Cloud Pub/Sub](https://cloud.google.com/pubsub), and [Azure Service Bus](https://azure.microsoft.com/en-us/products/service-bus).
The word "queue" in this doc is used to also refer to "topic" in the context of Kafka and Azure Service Bus, and "subscription" in the context of Google Cloud Pub/Sub.
Queue splitting is currently available for [Amazon SQS](https://aws.amazon.com/sqs/), [Kafka](https://kafka.apache.org/), [RabbitMQ](https://www.rabbitmq.com), [Google Cloud Pub/Sub](https://cloud.google.com/pubsub), [Azure Service Bus](https://azure.microsoft.com/en-us/products/service-bus), and [Redis Pub/Sub](https://redis.io/docs/latest/develop/pubsub/).
The word "queue" in this doc is used to also refer to "topic" in the context of Kafka and Azure Service Bus, "subscription" in the context of Google Cloud Pub/Sub, and "channel" in the context of Redis Pub/Sub.
{% endhint %}

## How It Works
Expand Down Expand Up @@ -131,6 +131,18 @@ If the filters defined by the two users both match some message, one of the user

{% endtab %}

{% tab title="Redis Pub/Sub" %}

First, we have a consumer app subscribed to a Redis Pub/Sub channel.

When the first mirrord Redis Pub/Sub splitting session starts, the operator becomes the only subscriber on the original channel and creates two temporary channels (one for the target deployed in the cluster, one for the user's local application). Every message it receives is checked against the [user's filter](queue-splitting.md#setting-a-filter-for-a-mirrord-run) and republished onto the matching temporary channel. The target and the local app each subscribe to their own temporary channel instead of the original one.

If a second user then starts a mirrord Redis Pub/Sub splitting session on the same channel, an additional temporary channel is created for the second user's local application. The operator includes the new channel and the second user's filter in the routing logic. When the two users pick filters that do not overlap, each only sees its own messages; if a message matches both filters, one of the sessions receives it.

Redis Pub/Sub is fire-and-forget: there is no stored queue to drain, so any message published while the operator is not subscribed is simply gone. After all sessions end, the temporary channels are no longer used and nothing needs to be cleaned up on the Redis server, since Redis does not keep pub/sub messages around.

{% endtab %}

{% endtabs %}

Temporary queues are managed by the mirrord operator and garbage collected in the background. After all queue splitting sessions end, the operator promptly deletes the allocated resources.
Expand All @@ -139,6 +151,7 @@ Please note that:
1. Temporary queues created for the deployed targets will not be deleted as long as there are any targets' pods that use them.
2. In case of SQS splitting, deployed targets will keep reading from the temporary queues as long as their temporary queues have unconsumed messages.
3. For Google Cloud Pub/Sub, the operator creates temporary topics and subscriptions. The target workload's subscription environment variable is patched to read from a temporary subscription, while the operator drains the original subscription and forwards messages through temporary topics.
4. For Redis Pub/Sub, the temporary channels are just channel names, so there is nothing to delete on the Redis server when a session ends. The operator only stops subscribing to the original channel and reverts the target workload's channel environment variable back to the original.


## Enabling Queue Splitting in Your Cluster
Expand Down Expand Up @@ -1388,6 +1401,194 @@ The template must contain all three placeholders:

Azure Service Bus resource names can be up to 260 characters. If the rendered name exceeds this limit, the original portion is truncated with a hash suffix.

{% endstep %}
{% endstepper %}
{% endtab %}

{% tab title="Redis Pub/Sub" %}

{% stepper %}
{% step %}

### Enable Redis Pub/Sub splitting in the Helm chart

Enable the `operator.redisPubsubSplitting` setting in the [mirrord-operator Helm chart](https://github.com/metalbear-co/charts/blob/main/mirrord-operator/values.yaml).

{% endstep %}
{% step %}

### Create a MirrordPropertyList

The operator connects to your Redis instance using properties from a `MirrordPropertyList`. Supported keys:

| Property | Description |
| -------- | ----------- |
| `url` | Full Redis URL, e.g. `redis://redis.example.svc:6379/0` |
| `host` + `port` | Used when `url` is not set (default port `6379`) |
| `password` | Optional password |
| `tls` | Set to `true` for `rediss://` |
| `db` | Database index (default `0`) |

Example:

```yaml
apiVersion: mirrord.metalbear.co/v1
kind: MirrordPropertyList
metadata:
name: redis-pubsub-config
namespace: orders
spec:
properties:
- name: url
value: redis://redis.orders.svc.cluster.local:6379
```

{% endstep %}
{% step %}

### Create a MirrordSplitConfig

Redis Pub/Sub uses `kind: redisPubSub` in queue entries. Each queue entry must resolve at least one channel from the target workload's environment.

#### Choosing the channel mode

Redis has three ways to subscribe, and the operator must use the same one the app uses so it sees the same messages. Pick the field that matches how your app subscribes:

| App subscribes with | Use this field | Redis commands |
| ------------------- | -------------- | -------------- |
| `SUBSCRIBE` to one exact channel name | `appConfig.channel` | `SUBSCRIBE` / `PUBLISH` |
| `PSUBSCRIBE` to a glob pattern (e.g. `orders.*`) | `appConfig.channelPattern` | `PSUBSCRIBE` / `PUBLISH` |
| `SSUBSCRIBE` to a sharded channel | `appConfig.shardChannel` | `SSUBSCRIBE` / `SPUBLISH` |

Sharded pub/sub needs Redis 7 or newer and the RESP3 protocol (it works on both standalone and cluster). Your local app must subscribe the same way the target does (for example `SSubscribe` in go-redis), otherwise it will not see the forwarded messages.

```yaml
apiVersion: queues.mirrord.metalbear.co/v1
kind: MirrordSplitConfig
metadata:
name: orders-split
namespace: orders
spec:
targetRef:
apiVersion: apps/v1
kind: Deployment
name: order-processor
clientConfigs:
redisPubSub: redis-pubsub-config
queues:
- id: orders-channel
kind: redisPubSub
appConfig:
channel:
- env: REDIS_CHANNEL
```

Pattern subscription (`PSUBSCRIBE`):

```yaml
appConfig:
channelPattern:
- env: REDIS_CHANNEL # e.g. orders.*
```

Sharded pub/sub (`SSUBSCRIBE`):

```yaml
appConfig:
shardChannel:
- env: REDIS_CHANNEL # e.g. events.live
```

For pattern subscriptions the operator keeps the part of the channel name that matched the pattern when it builds the temporary channel. A message on `orders.created` is republished to a temporary channel like `mirrord-tmp-ab12cd-orders.created` (the exact prefix follows the `tmpNameTemplate`, see below). Because of this, the local process must `PSUBSCRIBE` to the patched pattern, not `SUBSCRIBE` to a single exact name.

The operator patches the target workload's channel env var to the temporary channel name (or pattern). Your local process receives the same patch when you start a mirrord session with a matching filter.

#### AppConfig reference fields

Each item under `channel`, `channelPattern`, or `shardChannel` is an `AppConfigRef` that describes how to find the channel name in the workload's environment:

| Field | Description | Required |
| ----- | :---------: | :------: |
| `env` | Exact environment variable name | One of `env` or `envLike` |
| `envLike` | Regex pattern matching multiple environment variable names | One of `env` or `envLike` |
| `fallback` | Fallback value if the variable is not found (only with `env`) | No |
| `valueSelector` | JSON selector to extract the channel name from the variable's content (e.g. `.key`) | No |
| `containers` | Limit resolution to specific containers. Defaults to all containers. | No |

#### Per-queue client configuration

To use a different `MirrordPropertyList` for a specific queue entry (instead of the default from `clientConfigs.redisPubSub`), set the `clientConfig` field:

```yaml
queues:
- id: orders-channel
kind: redisPubSub
clientConfig: orders-redis-config
appConfig:
channel:
- env: REDIS_CHANNEL
```

{% hint style="warning" %}
The mirrord operator can only read the consumer's environment variables if they are either:
1. defined directly in the workload's pod template, with the value defined in `value` or in `valueFrom` via config map reference; or
2. loaded from config maps using `envFrom`.
{% endhint %}

{% endstep %}
{% step %}

### Additional options

The `MirrordSplitConfig` supports a few optional fields that control restart behavior, temporary channel naming, and teardown timing.

#### Restart policy

Controls how the workload is restarted when patched for queue splitting:

```yaml
spec:
restart:
strategy: standard
timeout: 120
waitForPods: all
```

| Field | Description | Default |
| ----- | :---------: | :-----: |
| `strategy` | `standard` or `isolatePods` | Global operator setting |
| `timeout` | Seconds to wait for pods to become ready after restart | 60 |
| `waitForPods` | Number of patched pods required before sessions may start, or `"all"` | 1 |

#### Drain timeout

For most brokers the operator waits for the original queue to drain before tearing down. Redis Pub/Sub has nothing to drain, so this wait does not really apply. You can still set `drainTimeout` (in seconds), but `0` is the natural choice for Redis so teardown happens right away:

```yaml
spec:
drainTimeout: 0
```

If omitted, the default is 30 seconds.

#### Temporary channel name template

You can customize the naming format of the temporary channels the operator publishes to:

```yaml
spec:
tmpNameTemplate: "mirrord-tmp-{{RANDOM}}{{FALLBACK}}{{ORIGINAL}}"
```

The template must contain all three placeholders:
* `{{RANDOM}}` - random alphabetic characters for uniqueness.
* `{{FALLBACK}}` - resolves to `-main-` for the cluster target's channel or `-` for user channels.
* `{{ORIGINAL}}` - name of the original channel being split.

{% hint style="warning" %}
Redis Pub/Sub is fire-and-forget. Messages published while no forwarder is running are not stored and cannot be replayed.
{% endhint %}

{% endstep %}
{% endstepper %}
{% endtab %}
Expand All @@ -1400,17 +1601,18 @@ Once cluster setup is done, mirrord users can start running sessions with queue
Directly under it, mirrord expects a mapping from a queue or queue ID to a queue filter definition.

Filter definition contains the following fields:
* `queue_type` - `SQS`, `Kafka`, `RMQ`, `GCPPubSub`, or `AzureServiceBus`
* `message_filter` - mapping from message attribute (SQS, GCP Pub/Sub), header (Kafka, RabbitMQ), or application property (Azure Service Bus) name to a regex for its value.
The local application will only see queue messages that have **all** of the specified message attributes/headers/properties.
* `jq_filter` - supported for `SQS`, `GCPPubSub`, and `AzureServiceBus` queue types.
* `queue_type` - `SQS`, `Kafka`, `RMQ`, `GCPPubSub`, `AzureServiceBus`, or `RedisPubSub`
* `message_filter` - mapping from message attribute (SQS, GCP Pub/Sub), header (Kafka, RabbitMQ), application property (Azure Service Bus), or top-level JSON field (Redis Pub/Sub) name to a regex for its value.
The local application will only see queue messages that have **all** of the specified message attributes/headers/properties/JSON fields.
* `jq_filter` - supported for `SQS`, `GCPPubSub`, `AzureServiceBus`, and `RedisPubSub` queue types.
* For **SQS**, it runs a jq program on the JSON representation of the SQS [`Message`](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_Message.html) object.
For queues configured with `s3Event: true`, jq filters can also inspect `S3Metadata`.
It is populated with user-defined S3 object metadata when the message is parsed as an S3 event
and metadata is fetched successfully. `S3Metadata` follows the [AWS S3 user-defined metadata format](https://docs.aws.amazon.com/AmazonS3/latest/userguide/UsingMetadata.html#UserMetadata):
a flat key-value map where keys are lowercase strings (without the `x-amz-meta-` prefix) and values are strings.
* For **GCP Pub/Sub**, it runs a jq program on the JSON representation of the [`PubsubMessage`](https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage) object.
* For **Azure Service Bus**, the JSON object has `body`, `application_properties`, `message_id`, `content_type`, and `subject` fields.
* For **Redis Pub/Sub**, it runs a jq program on the JSON representation of the message payload. Non-JSON payloads never match attribute or jq filters and are routed to the main output channel.
* A message matches if the jq program outputs `true`.

If both `message_filter` and `jq_filter` are specified for the same queue, both must match for a message to be matched.
Expand Down Expand Up @@ -1677,6 +1879,73 @@ This routes only messages whose JSON body contains `"priority": "high"` to the l

Both `message_filter` and `jq_filter` can be combined - a message must match both to be routed to the local application.

{% endtab %}

{% tab title="Redis Pub/Sub" %}

```json
{
"operator": true,
"target": "deployment/event-processor",
"feature": {
"split_queues": {
"orders-channel": {
"queue_type": "RedisPubSub",
"message_filter": {
"tenant": "^dev$"
}
}
}
}
}
```

In the example above, the local application receives JSON messages from the Redis channel described under ID `orders-channel` in the `MirrordSplitConfig`, but only when the top-level JSON field `tenant` matches `dev`.

{% endtab %}

{% tab title="Redis Pub/Sub with jq_filter" %}

```json
{
"operator": true,
"target": "deployment/event-processor",
"feature": {
"split_queues": {
"orders-channel": {
"queue_type": "RedisPubSub",
"jq_filter": ".tenant == \"dev\" and .priority > 5"
}
}
}
}
```

The operator parses each message payload as JSON and runs the `jq_filter` program on it. The message is routed to your local application only when the program returns `true`. This lets you match on nested fields or combine conditions, which a plain `message_filter` (top-level fields only) cannot do. Payloads that are not valid JSON never match a `jq_filter` (or a `message_filter`) and are sent to the cluster target instead.

{% endtab %}

{% tab title="Redis Pub/Sub with wildcard" %}

```json
{
"operator": true,
"target": "deployment/event-processor",
"feature": {
"split_queues": {
"*": {
"queue_type": "RedisPubSub",
"message_filter": {
"tenant": "^dev$"
}
}
}
}
}
```

`*` resolves to all Redis channels defined in the target's `MirrordSplitConfig`. If no `MirrordSplitConfig` exists for the target, the wildcard is silently ignored. [Check the operator logs](#troubleshooting-redis-pubsub-splitting) if messages are not being filtered as expected.

{% endtab %}
{% endtabs %}

Expand Down Expand Up @@ -1767,6 +2036,18 @@ openssl pkcs12 -in truststore.p12 -nokeys -out ca-cert.pem

Then, follow the guide for [authenticating with an SSL certificate](queue-splitting.md#how-do-i-authenticate-operators-kafka-client-with-an-ssl-certificate).

#### Can I split Redis Pub/Sub messages that are not JSON?

No. Unlike SQS, Kafka, RabbitMQ, Azure, and GCP - where filters match on separate message metadata (attributes, headers, or application properties) - a Redis Pub/Sub message carries no metadata at all, just the raw payload. So the only thing the operator can route on is the payload itself, and it interprets it as JSON: `message_filter` matches top-level JSON fields and `jq_filter` runs on the same parsed object. A payload that is not valid JSON can never match a filter, so it is always sent to the cluster target and never to a local session.

#### What happens to Redis messages published while no one is splitting?

Redis Pub/Sub is fire-and-forget: Redis delivers a message only to whoever is subscribed at that exact moment and never stores it. The operator subscribes only while a splitting session is active, so any message published when no session is running is delivered straight to the normal subscribers and is not held for later. There is no backlog to replay.

#### Should I use exact, pattern, or sharded channels?

Match whatever your application already does. If it subscribes to one fixed channel name, use `appConfig.channel` (`SUBSCRIBE`). If it subscribes to a glob like `orders.*`, use `appConfig.channelPattern` (`PSUBSCRIBE`). If it uses sharded pub/sub on Redis 7+, use `appConfig.shardChannel` (`SSUBSCRIBE`). The operator must subscribe the same way the app does, otherwise it sees a different (or empty) stream of messages.

## Troubleshooting SQS splitting

If you're trying to use SQS-splitting and are facing difficulties, here are some steps you can go through to identify
Expand Down Expand Up @@ -1874,3 +2155,17 @@ Check that:
#### Temporary queues are not being cleaned up

After all splitting sessions end, the operator deletes temporary queues. If they linger, check that the operator has `Manage` rights on the Service Bus namespace and that the operator pod is running. You can also set `drainTimeout` in the `MirrordSplitConfig` to control how long fallback queues are kept alive after sessions end.

## Troubleshooting Redis Pub/Sub splitting

If messages are not reaching your local application, check the common causes first:

- The channel mode in the `MirrordSplitConfig` (`channel`, `channelPattern`, or `shardChannel`) matches how the app subscribes. If they differ, the operator subscribes the wrong way and sees no messages.
- The payload is valid JSON. Non-JSON payloads never match a filter and always go to the cluster target.
- Sharded channels (`SSUBSCRIBE` / `SPUBLISH`) need Redis 7+ and RESP3 on both the operator's connection and your app.

For more detail, get the operator logs and raise the log level for the Redis Pub/Sub module:

```shell
helm upgrade mirrord-operator --reuse-values --set operator.logLevel "mirrord=info,operator=info,operator_queue_splitting::redis_pubsub=trace" metalbear/mirrord-operator
```