[TDD-red] CloudAIGymEnv cache key must include env_params (drives domain-randomization correctness fix)#900
Conversation
📝 WalkthroughWalkthroughThis PR introduces agent run orchestration and gymnasium integration. It adds a step counter to track trial progression, implements BaseAgent.run() to coordinate agent-environment interaction loops, integrates step tracking into CloudAIGymEnv, refactors the handler to delegate to agent.run(), and adds GymnasiumAdapter to expose BaseGym environments through the gymnasium.Env interface with gymnasium-compatible space definitions and step counter synchronization. ChangesAgent Run Loop and Gymnasium Integration
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Poem
🚥 Pre-merge checks | ✅ 4✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Comment |
- Agents that set HAS_CUSTOM_TRAINING_LOOP = True drive their own training loop; handle_dse_job calls agent.train() and skips the per-step env.step loop. - New _run_custom_training_loop helper logs exceptions, returns a process-style exit code, and always invokes agent.shutdown() (when defined) in a finally block so resources are released on both success and failure paths. - CustomTrainingLoopAgent Protocol documents the opt-in contract for type checkers and IDEs.
Pyright rejected calling _run_custom_training_loop(agent, ...) because the plain bool predicate did not narrow agent's static type from BaseAgent to CustomTrainingLoopAgent. Return TypeGuard[CustomTrainingLoopAgent] from _has_custom_training_loop so the truthy branch in handle_dse_job sees the opted-in shape and the helper can call agent.train() directly.
If agent.shutdown() raised from the finally block, Python suppressed the earlier return 0/1 from agent.train() and propagated the exception, breaking the outer test-run loop in handle_dse_job (skipped remaining scenarios, failed to accumulate err |= rc). Wrap shutdown() in its own try/except, log via logging.exception, set rc = 1, and return rc after finally so the helper always honours the (int) -> int contract. Adds tests for shutdown-only failure and combined train+shutdown failure.
…mutator Previously, ``test_run.step`` had no clear owner: the dispatcher set it from outside, the adapter rewound it on ``reset()``, and other callers wrote to it ad hoc. In RLlib custom-loop runs this collapsed every trial onto ``step=1``, overwriting ``trajectory.csv`` and ``env.csv`` rows. Centralize the invariant: ``TestRun.increment_step()`` is the single named mutator, and ``CloudAIGymEnv.step()`` is its only caller. One ``env.step()`` call advances the trial counter by exactly one — independent of any episode or dispatcher concept above the gym env. Contract tests in ``TestIncrementStep`` cover the API; ``test_cloudaigym`` asserts ``step`` is advanced *before* ``output_path`` and trajectory rows are computed, so cached and live trials both record the post-increment value.
…orphism Earlier commits in this PR introduced ``HAS_CUSTOM_TRAINING_LOOP`` + a ``CustomTrainingLoopAgent`` Protocol + a TypeGuard helper + an ``if/else`` in ``handle_dse_job`` to switch between the cloudai step loop and an agent-owned ``train()`` loop. That is a type-tagged conditional dispatching on agent identity — the textbook signal to replace conditional with polymorphism (Fowler). Add a default ``BaseAgent.run() -> int`` that holds the step-loop body (``select_action`` / ``env.step`` / ``update_policy`` per trial). Agents that drive their own training (RLlib, etc.) override ``run()`` to delegate to whatever loop they own and return a process-style exit code. ``handle_dse_job`` collapses to ``err |= agent.run()`` — one line, no branching, no Protocol vocabulary. The handler no longer knows that "custom training loops" exist as a category; that's an agent implementation detail. Net: -89 lines on cloudai. Surface area shrinks (no Protocol, no TypeGuard, no flag). ``test_handlers`` replaces the 5 helper unit tests + 2 dispatcher integration tests with 2 polymorphic tests asserting ``handle_dse_job`` delegates to ``agent.run()`` and propagates its return code.
- GymnasiumAdapter wraps a CloudAI BaseGym as a gymnasium-compatible env: Dict of Discrete actions over the tunable params, float32 Box observation derived from define_observation_space(), gymnasium 5-tuple from step/step_raw. - Fixed (single-value) params are injected on every step so agents only ever see the tunable subset; step_raw bypasses index decoding and fixed injection for callers that already have a fully-formed parameter dict. - The adapter mirrors handle_dse_job's 1-based test_run.step so artifact paths match whether the env is driven by the DSE loop or an external training loop. - CloudAIGymEnv.define_observation_space() now returns one slot per agent metric (at least one), giving adapters the correct Box shape. - gymnasium is an optional dependency: lazy-imported inside the adapter and exposed via a new rl extra (also added to dev) pinned to ~=1.2.
decode_action used to accept partial action dicts and step_raw accepted arbitrary key sets, both silently propagating incomplete params to the underlying env. Validate up front: - decode_action requires keys exactly == self._tunable_params, and each discrete index must be in range. - step_raw requires keys exactly == self._tunable_params | self._fixed_params. A new _assert_keys helper raises ValueError with sorted missing / extra lists so callers see exactly what was wrong. Adds tests for missing key, unknown key, out-of-range index, missing fixed param, and unknown raw key paths.
CloudAIGymEnv.get_cached_trajectory_result(action) keys the trajectory cache on action alone. When a workload declares env_params (e.g. drop_rate) and the agent re-selects the same action under a different env_params sample, the cache returns a stale reward measured under a different env, silently invalidating any domain-randomization workflow. Four tests pin the contract; today two FAIL (bug-exposing), two PASS (back-compat sanity). All 27 pre-existing tests are untouched. FAIL test_cache_miss_when_env_params_differ FAIL test_step_reruns_workload_when_env_params_change PASS test_cache_hit_when_action_and_env_params_match PASS test_cache_hit_when_neither_has_env_params Driver for the follow-up PR that adds env_params as a first-class field on TestDefinition + TrajectoryEntry and rewrites the cache key as (action, env_params). Both unit and integration shapes are covered so the fix can be validated end-to-end through env.step().
7a27fa2 to
6b9f126
Compare
There was a problem hiding this comment.
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/cloudai/configurator/base_agent.py`:
- Line 148: The log call assumes observation is a numeric list and will raise
TypeError for non-numeric entries; update the logging around the line with
logging.info so it formats observation safely by mapping each item to a rounded
number when isinstance(item, (int, float)) and falling back to str(item) (or by
wrapping the comprehension in try/except and formatting with str on failure),
then log the resulting list along with step and reward; modify the code that
constructs the observation string (the expression using round(obs, 4)) to use
this safe formatting approach.
- Around line 137-147: The loop handling env.step(...) in base_agent.py
currently ignores the done flag and continues stepping; modify the control flow
in the method that contains the lines where observation, reward, done, *_ =
self.env.step(action) and the subsequent self.update_policy({...}) call so that
after calling update_policy you check if done is True and break (or return) to
stop the episode; ensure the terminal transition is still recorded (i.e., keep
the update_policy call for the final step) and then exit the loop to avoid
stepping into terminal states.
In `@tests/test_cloudaigym.py`:
- Around line 505-520: The test function
test_cache_hit_when_neither_has_env_params has a multi-line signature that fails
ruff-format; reformat the function signature and body to conform to ruff (e.g.,
collapse the signature to a single line and run ruff format) so the file passes
ruff-format checks, then re-run tests; target the function named
test_cache_hit_when_neither_has_env_params in the tests/test_cloudaigym.py diff
and apply ruff format or equivalent style fixes.
In `@tests/test_handlers.py`:
- Around line 287-288: The override of select_action must match the base
signature; update the tests/test_handlers.py select_action method to accept the
observation parameter (e.g., def select_action(self, observation: Any) ->
tuple[int, dict[str, Any]]), preserving the same return type and behavior (still
raise AssertionError), so the signature is compatible with
BaseAgent.select_action and satisfies the type checker.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Enterprise
Run ID: 5198c57f-3f25-4c55-92be-50bcc9233c1a
⛔ Files ignored due to path filters (1)
uv.lockis excluded by!**/*.lock
📒 Files selected for processing (11)
pyproject.tomlsrc/cloudai/_core/test_scenario.pysrc/cloudai/cli/handlers.pysrc/cloudai/configurator/__init__.pysrc/cloudai/configurator/base_agent.pysrc/cloudai/configurator/cloudai_gym.pysrc/cloudai/configurator/gymnasium_adapter.pytests/test_cloudaigym.pytests/test_gymnasium_adapter.pytests/test_handlers.pytests/test_test_scenario.py
| observation, reward, done, *_ = self.env.step(action) | ||
| self.update_policy( | ||
| { | ||
| "trial_index": step, | ||
| "value": reward, | ||
| "observation": observation, | ||
| "prev_observation": prev_observation, | ||
| "action": action, | ||
| "done": done, | ||
| } | ||
| ) |
There was a problem hiding this comment.
🎯 Functional Correctness | 🟠 Major | ⚖️ Poor tradeoff
Missing done check will continue stepping past episode termination.
The done flag from env.step() is captured at line 137 but never evaluated. Standard gymnasium/RL semantics require breaking the loop when done=True to prevent stepping in terminal states, which can cause invalid transitions or downstream errors.
🔁 Proposed fix to respect the done flag
observation, reward, done, *_ = self.env.step(action)
self.update_policy(
{
"trial_index": step,
"value": reward,
"observation": observation,
"prev_observation": prev_observation,
"action": action,
"done": done,
}
)
logging.info(f"Step {step}: Observation: {[round(obs, 4) for obs in observation]}, Reward: {reward:.4f}")
+ if done:
+ break
return 0🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/cloudai/configurator/base_agent.py` around lines 137 - 147, The loop
handling env.step(...) in base_agent.py currently ignores the done flag and
continues stepping; modify the control flow in the method that contains the
lines where observation, reward, done, *_ = self.env.step(action) and the
subsequent self.update_policy({...}) call so that after calling update_policy
you check if done is True and break (or return) to stop the episode; ensure the
terminal transition is still recorded (i.e., keep the update_policy call for the
final step) and then exit the loop to avoid stepping into terminal states.
| "done": done, | ||
| } | ||
| ) | ||
| logging.info(f"Step {step}: Observation: {[round(obs, 4) for obs in observation]}, Reward: {reward:.4f}") |
There was a problem hiding this comment.
🩺 Stability & Availability | 🟡 Minor | ⚡ Quick win
Observation rounding assumes numeric list.
round(obs, 4) will raise TypeError if observation contains non-numeric values. Consider either validating the observation type or wrapping in a try-except for robust logging.
🛡️ Proposed fix to handle non-numeric observations
- logging.info(f"Step {step}: Observation: {[round(obs, 4) for obs in observation]}, Reward: {reward:.4f}")
+ try:
+ obs_str = [round(obs, 4) for obs in observation]
+ except (TypeError, ValueError):
+ obs_str = observation
+ logging.info(f"Step {step}: Observation: {obs_str}, Reward: {reward:.4f}")📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| logging.info(f"Step {step}: Observation: {[round(obs, 4) for obs in observation]}, Reward: {reward:.4f}") | |
| try: | |
| obs_str = [round(obs, 4) for obs in observation] | |
| except (TypeError, ValueError): | |
| obs_str = observation | |
| logging.info(f"Step {step}: Observation: {obs_str}, Reward: {reward:.4f}") |
🧰 Tools
🪛 Ruff (0.15.15)
[warning] 148-148: Logging statement uses f-string
(G004)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/cloudai/configurator/base_agent.py` at line 148, The log call assumes
observation is a numeric list and will raise TypeError for non-numeric entries;
update the logging around the line with logging.info so it formats observation
safely by mapping each item to a rounded number when isinstance(item, (int,
float)) and falling back to str(item) (or by wrapping the comprehension in
try/except and formatting with str on failure), then log the resulting list
along with step and reward; modify the code that constructs the observation
string (the expression using round(obs, 4)) to use this safe formatting
approach.
| def test_cache_hit_when_neither_has_env_params(base_tr: TestRun, tmp_path: Path) -> None: | ||
| """Workloads without env_params behave exactly as today (back-compat).""" | ||
| runner = MagicMock() | ||
| runner.scenario_root = tmp_path / "scenario" | ||
| runner.system = MagicMock() | ||
| runner.test_scenario = MagicMock(test_runs=[]) | ||
| runner.jobs = {} | ||
| runner.testrun_to_job_map = {} | ||
|
|
||
| env = CloudAIGymEnv(test_run=base_tr, runner=runner, rewards=RewardOverrides()) | ||
| env.test_run.current_iteration = 0 | ||
| env.trajectory = {0: [TrajectoryEntry(step=1, action={"x": 10}, reward=0.5, observation=[100.0])]} | ||
| # Note: neither the cached entry nor test_run carries env_params -> existing behavior. | ||
|
|
||
| result = env.get_cached_trajectory_result({"x": 10}) | ||
| assert result is not None and result.step == 1 |
There was a problem hiding this comment.
📐 Maintainability & Code Quality | 🔴 Critical | ⚡ Quick win
Fix formatting to pass ruff-format check.
The function signature spans multiple lines and violates the project's formatting rules.
🔧 Run ruff format to fix
#!/bin/bash
ruff format tests/test_cloudaigym.py🧰 Tools
🪛 Ruff (0.15.15)
[warning] 520-520: Assertion should be broken down into multiple parts
Break down assertion into multiple parts
(PT018)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@tests/test_cloudaigym.py` around lines 505 - 520, The test function
test_cache_hit_when_neither_has_env_params has a multi-line signature that fails
ruff-format; reformat the function signature and body to conform to ruff (e.g.,
collapse the signature to a single line and run ruff format) so the file passes
ruff-format checks, then re-run tests; target the function named
test_cache_hit_when_neither_has_env_params in the tests/test_cloudaigym.py diff
and apply ruff format or equivalent style fixes.
| def select_action(self) -> tuple[int, dict[str, Any]]: # pragma: no cover - never called | ||
| raise AssertionError("select_action must not be called when run() is overridden") |
There was a problem hiding this comment.
🎯 Functional Correctness | 🔴 Critical | ⚡ Quick win
Fix incompatible method override signature — CI is failing.
The select_action override is missing the observation parameter present in the base class. Even though this method is never called (since run() is overridden), the signature must remain compatible with BaseAgent.select_action to satisfy the type checker.
🐛 Proposed fix
- def select_action(self) -> tuple[int, dict[str, Any]]: # pragma: no cover - never called
+ def select_action(self, observation: list[float] | None = None) -> tuple[int, dict[str, Any]]: # pragma: no cover - never called
raise AssertionError("select_action must not be called when run() is overridden")📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| def select_action(self) -> tuple[int, dict[str, Any]]: # pragma: no cover - never called | |
| raise AssertionError("select_action must not be called when run() is overridden") | |
| def select_action(self, observation: list[float] | None = None) -> tuple[int, dict[str, Any]]: # pragma: no cover - never called | |
| raise AssertionError("select_action must not be called when run() is overridden") |
🧰 Tools
🪛 GitHub Actions: CI / 2_Linting.txt
[error] 287-287: pyright: Method "select_action" overrides class "BaseAgent" in an incompatible manner. Positional parameter count mismatch; base method has 2, but override has 1. Parameter 2 mismatch: base parameter "observation" is keyword parameter, override parameter is position-only (reportIncompatibleMethodOverride).
🪛 GitHub Actions: CI / Linting
[error] 287-287: pyright failed (hook id: pyright). Method "select_action" overrides class "BaseAgent" in an incompatible manner: positional parameter count mismatch (base has 2, override has 1). Parameter 2 mismatch: base parameter "observation" is keyword parameter, override parameter is position-only (reportIncompatibleMethodOverride).
🪛 Ruff (0.15.15)
[warning] 288-288: Avoid specifying long messages outside the exception class
(TRY003)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@tests/test_handlers.py` around lines 287 - 288, The override of select_action
must match the base signature; update the tests/test_handlers.py select_action
method to accept the observation parameter (e.g., def select_action(self,
observation: Any) -> tuple[int, dict[str, Any]]), preserving the same return
type and behavior (still raise AssertionError), so the signature is compatible
with BaseAgent.select_action and satisfies the type checker.
Source: Linters/SAST tools
Status: TDD red — drives a follow-up fix PR; do not merge alone
Stacked on #893 (`rpatro/step-ownership-tdd`); diff vs. `main` will look larger than the actual test addition until #893 lands. The test-only delta on top of #893 is +127 lines, one file (`tests/test_cloudaigym.py`).
This PR adds four tests pinning the contract that `CloudAIGymEnv`'s trajectory cache must include `env_params` in its key. Two tests FAIL today (bug-exposing), two PASS (back-compat sanity).
Bug
`CloudAIGymEnv.get_cached_trajectory_result(action)` keys the trajectory cache on `action` alone (`src/cloudai/configurator/cloudai_gym.py`, around L255-260). When a workload declares `env_params` (e.g. `drop_rate` for domain randomization) and the agent re-selects the same action under a different `env_params` sample, the cache returns the stale reward measured under the first sample. The agent silently trains on labels that do not correspond to the env they were nominally generated under.
Concrete evidence
Reproduced during the `mrc_prt_rl_ppo_dr` smoke test (2026-05-23):
Tests added
The tests use `object.setattr` to attach `env_params` to the frozen `TrajectoryEntry` and to set `current_env_params` on `TestRun` — deliberately, so the contract test compiles before the schema change. A helper `_seed_cached_entry_with_env_params` localizes this; once the fix lands and `TrajectoryEntry.env_params` is a real field, the helper is dropped in favour of a constructor kwarg.
Follow-up fix PR (separate)
This makes `env.csv` ↔ `trajectory.csv` 1:1 alignment a structural invariant and unblocks any DR-driven downstream work across cloudai.
Why TDD-red as a separate PR
Stack & rebase status (updated 2026-06-12)
Stack:
main← #893 ← #900 ← #901Re-stacked on top of #893 (previously tangled with #893's foundation commits). Now carries only:
GymnasiumAdapter+ fail-fast action/param-key validation + the cache-key TDD-red test.main; stacking is logical.