diff --git a/README.md b/README.md
index 1a25499..f3f1b76 100644
--- a/README.md
+++ b/README.md
@@ -104,6 +104,30 @@ It also features some quality of life improvements for ROS outputs.
+- ### **```ros2 param list``` coloring**
+ Node headers are colored with the group badge; parameter names are rendered dim so the structure is easy to scan. `--param-type` type annotations are dimmed automatically.
+
+Action list output colored by owning node group, with the same badge and style options available in all other CLI commands.
🚫
Truly non-invasive
No launch file changes. Exit codes preserved. DENDROS_DISABLE=1 bypasses everything instantly.
diff --git a/docs/param-change-alert.md b/docs/param-change-alert.md
new file mode 100644
index 0000000..8227b0d
--- /dev/null
+++ b/docs/param-change-alert.md
@@ -0,0 +1,93 @@
+# Parameter Change Alert
+
+When a node's parameter is changed at runtime via `ros2 param set` or any ROS 2 parameter service client, DendROS prints an inline notification in the launch terminal — so you always know when and what changed without leaving the log view.
+
+Enabled by default. Disable with:
+
+```bash
+dendros config # → "Param change alert" → off
+```
+
+or in `~/.config/dendROS/defaults.yaml`:
+
+```yaml
+param_change_alert: false
+```
+
+---
+
+## What it looks like
+
+### Inline
+
+
+
+
+
ros2 launch my_pkg bringup.launch.py
+
+
+
+
+
+
+
+
+### Inverted
+
+
+
+
+
ros2 launch my_pkg bringup.launch.py
+
+
+
+
+
+
+
+
+---
+
+## Scope
+
+| `param_change_alert_scope` | Behavior |
+|---|---|
+| `tracked` (default) | Only nodes that appear in a `dendROS.yaml` group generate notifications |
+| `all` | Every node on the ROS graph generates notifications, including unmatched nodes |
+
+---
+
+## Alert styles
+
+| `param_change_alert_style` | Appearance |
+|---|---|
+| `inline` (default) | Compact single line: `[dendROS]` tag + colored node identity + bold param name |
+| `inverted` | Full white-background strip from `[dendROS]` to end of line; node identity shown as a colored background island; harder to miss in busy logs |
+
+---
+
+## Configuring via `dendros config`
+
+| TUI field | Values | Description |
+|---|---|---|
+| **Param change alert** | `on` / `off` | Enable or disable the feature. |
+| **Param alert scope** | `tracked` / `all` | Which nodes trigger notifications. |
+| **Param alert style** | `inline` / `inverted` | Visual style of the notification line. |
+
+---
+
+## Notes
+
+- Notifications are delayed until the next log line from the launch process arrives (drain is called after each line). If the process is silent, notifications queue up and appear on the next output.
+- Transient CLI daemon nodes (`/_ros2cli_*`) are always filtered out regardless of scope.
+- Startup parameter declarations (`new_parameters`) are ignored — only `changed_parameters` events generate alerts.
+- The background thread terminates automatically when the launch process exits.
+- Covers: `ros2 param set`, programmatic `node->set_parameters()`, and any parameter service client that goes through the ROS 2 parameter service.
diff --git a/docs/param-describe.md b/docs/param-describe.md
new file mode 100644
index 0000000..26bd981
--- /dev/null
+++ b/docs/param-describe.md
@@ -0,0 +1,37 @@
+# ros2 param describe Colorization
+
+When you run `ros2 param describe /node param_name`, DendROS colorizes the output: the group badge and param name are highlighted, labels are dimmed so they recede, and section headers like `Constraints:` are rendered bold so the structure is immediately readable.
+
+---
+
+## What it looks like
+
+
+
+
+
ros2 param describe /battery_monitor start_type_description_service
+
+
+
+
+
+
+
+
+---
+
+
+## Badge and style options
+
+| Setting | Effect on param describe |
+|---|---|
+| `show_tag_cli: true` | Badge shown to the left of the `Parameter name:` line |
+| `tag_style: inverted` | Badge rendered with colored background |
+| `unmatched_color` | Unmatched node uses the fallback color for the param name |
+| `dim_unmatched` | Unmatched param name dimmed (only when `unmatched_color: null`) |
+
+---
diff --git a/docs/param-list.md b/docs/param-list.md
new file mode 100644
index 0000000..73746c9
--- /dev/null
+++ b/docs/param-list.md
@@ -0,0 +1,40 @@
+# ros2 param list Colorization
+
+When you run `ros2 param list`, DendROS automatically colorizes node headers with your group colors and badges, and dims the parameter names so the structure is easier to scan at a glance. The `--param-type` flag is fully supported.
+
+---
+
+## What it looks like
+
+
+
+
+
ros2 param list --param-type
+
+
+
+
+
+
+
+
+---
+
+
+## Badge and style options
+
+| Setting | Effect on param list |
+|---|---|
+| `show_tag_cli: true` | Badge shown to the left of the node header |
+| `tag_style: inverted` | Badge rendered with colored background |
+| Per-group `show_tag: false` | Badge suppressed for that group only |
+| `unmatched_color` | Unmatched node headers shown in the fallback color |
+| `unmatched_tag` | Badge shown next to unmatched headers (requires `unmatched_color`) |
+| `dim_unmatched` | Unmatched headers dimmed (only when `unmatched_color: null`) |
+
+---
+
diff --git a/docs/reference.md b/docs/reference.md
index ef25e2c..6a073b2 100644
--- a/docs/reference.md
+++ b/docs/reference.md
@@ -76,6 +76,9 @@ Stored in `~/.config/dendROS/defaults.yaml`, managed via `dendros config`:
| `crash_alert_color` | `node` | `node` = use group color; `red` = always bold red. |
| `crash_alert_interval` | `30` | Seconds between periodic banner reprints. `0` = only on new crashes. |
| `traceback_color` | `fancy` | `fancy` = bold red header + dim red frames; `red` = all bold red; `off` = passthrough. |
+| `param_change_alert` | `true` | Print inline notification on runtime parameter changes. |
+| `param_change_alert_scope` | `tracked` | `tracked` = only config-listed nodes; `all` = entire graph. |
+| `param_change_alert_style` | `inline` | `inline` = compact line; `inverted` = white-background strip. |
| `init_modify_build` | `true` | `dendros init`: patch build files. |
| `init_on_existing` | `abort` | `dendros init`: `abort`, `merge`, or `overwrite`. |
| `init_color` | `palette` | `dendros init`: `palette` or `null`. |
@@ -96,6 +99,8 @@ The `ros2()` shell wrapper intercepts specific subcommands; everything else call
| `ros2 node info …` | Output piped through `dendros_node_info.py` — node name, sections, and entries colorized by group. See [ros2 node info](node-info.md). |
| `ros2 service list` | Output piped through `dendros_service_list.py` — services colored by owning node; default system services dimmed. See [ros2 service list](service-list.md). |
| `ros2 action list` | Output piped through `dendros_action_list.py` — actions colored by owning node. See [ros2 action list](action-list.md). |
+| `ros2 param list` | Output piped through `dendros_param_list.py` — node headers colored, param lines dimmed. See [ros2 param list](param-list.md). |
+| `ros2 param describe` | Output piped through `dendros_param_describe.py` — badge + colored param name, dim labels, bold section headers. See [ros2 param describe](param-describe.md). |
| Everything else | Passed directly to the real `ros2` binary, untouched. |
---
diff --git a/install.sh b/install.sh
index d1c22cc..eeac30b 100755
--- a/install.sh
+++ b/install.sh
@@ -52,14 +52,19 @@ $SUDO cp "${SCRIPT_DIR}/dendROS/dendros_node_info.py" "$INSTALL_DIR/"
$SUDO cp "${SCRIPT_DIR}/dendROS/dendros_node_list.py" "$INSTALL_DIR/"
$SUDO cp "${SCRIPT_DIR}/dendROS/dendros_service_list.py" "$INSTALL_DIR/"
$SUDO cp "${SCRIPT_DIR}/dendROS/dendros_action_list.py" "$INSTALL_DIR/"
+$SUDO cp "${SCRIPT_DIR}/dendROS/dendros_param_list.py" "$INSTALL_DIR/"
+$SUDO cp "${SCRIPT_DIR}/dendROS/dendros_param_describe.py" "$INSTALL_DIR/"
$SUDO cp "${SCRIPT_DIR}/dendROS/dendROS.sh" "$INSTALL_DIR/"
$SUDO cp -r "${SCRIPT_DIR}/dendROS/lib" "$INSTALL_DIR/"
$SUDO chmod +x "$INSTALL_DIR/dendROS_pipe.py"
$SUDO chmod +x "$INSTALL_DIR/dendros_config.py"
$SUDO chmod +x "$INSTALL_DIR/dendros_init.py"
$SUDO chmod +x "$INSTALL_DIR/dendros_node_list.py"
+$SUDO chmod +x "$INSTALL_DIR/dendros_node_info.py"
$SUDO chmod +x "$INSTALL_DIR/dendros_service_list.py"
$SUDO chmod +x "$INSTALL_DIR/dendros_action_list.py"
+$SUDO chmod +x "$INSTALL_DIR/dendros_param_list.py"
+$SUDO chmod +x "$INSTALL_DIR/dendros_param_describe.py"
$SUDO chmod 644 "$INSTALL_DIR/dendROS.sh"
echo -e "${GREEN}[DendROS] Files installed to ${INSTALL_DIR}${RESET}"
diff --git a/mkdocs.yml b/mkdocs.yml
index d7bad7e..2874ae2 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -75,10 +75,13 @@ nav:
- Features:
- Crash Alert: crash-alert.md
- Traceback Highlighting: traceback-highlighting.md
+ - Parameter Change Alert: param-change-alert.md
- ros2 node list: node-list.md
- ros2 node info: node-info.md
- ros2 service list: service-list.md
- ros2 action list: action-list.md
+ - ros2 param list: param-list.md
+ - ros2 param describe: param-describe.md
- Tools:
- dendros init: dendros-init.md
- dendros config: global-config.md
diff --git a/test/unit/conftest.py b/test/unit/conftest.py
index 15ac633..c84fc03 100644
--- a/test/unit/conftest.py
+++ b/test/unit/conftest.py
@@ -23,11 +23,13 @@
# ── ANSI helpers ─────────────────────────────────────────────────────────────
ANSI_RE = re.compile(r'\033\[([0-9;]*)m')
+# Broader regex that strips any CSI sequence (e.g. \033[K for erase-to-EOL)
+_FULL_ANSI_RE = re.compile(r'\033\[[^a-zA-Z]*[a-zA-Z]')
def strip_ansi(s: str) -> str:
"""Remove all ANSI escape sequences from s."""
- return ANSI_RE.sub('', s)
+ return _FULL_ANSI_RE.sub('', s)
def ansi_codes(s: str) -> list:
diff --git a/test/unit/test_global_config.py b/test/unit/test_global_config.py
index c63df2f..e4e4f70 100644
--- a/test/unit/test_global_config.py
+++ b/test/unit/test_global_config.py
@@ -90,6 +90,9 @@ def test_loads_existing_file(self, tmp_config):
"traceback_color": "red",
"tag_style": "inverted",
"show_default_services": False,
+ "param_change_alert": True,
+ "param_change_alert_scope": "all",
+ "param_change_alert_style": "inverted",
}
with open(tmp_config, "w") as f:
yaml.dump(data, f)
@@ -172,6 +175,9 @@ def test_roundtrip_custom_values(self, tmp_config):
"traceback_color": "off",
"tag_style": "inverted",
"show_default_services": False,
+ "param_change_alert": True,
+ "param_change_alert_scope": "all",
+ "param_change_alert_style": "inverted",
}
save_global_config(custom)
result = load_global_config()
diff --git a/test/unit/test_param_describe.py b/test/unit/test_param_describe.py
new file mode 100644
index 0000000..c4656f7
--- /dev/null
+++ b/test/unit/test_param_describe.py
@@ -0,0 +1,273 @@
+"""Tests for dendros_param_describe.py colorization and formatting."""
+
+import os
+import sys
+import subprocess
+
+import pytest
+import yaml
+
+REPO_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
+PARAM_DESCRIBE_PATH = os.path.join(REPO_ROOT, 'dendROS', 'dendros_param_describe.py')
+
+from conftest import assert_segment_colored, assert_segment_uncolored, colored_segments, strip_ansi
+
+# ── Fixtures ──────────────────────────────────────────────────────────────────
+
+_SIMPLE = [
+ 'Parameter name: use_sim_time',
+ ' Type: boolean',
+ ' Constraints:',
+]
+
+_WITH_DESC = [
+ 'Parameter name: my_param',
+ ' Type: string',
+ ' Description: Controls the output rate.',
+ ' Constraints:',
+ ' Read only: true',
+]
+
+_MULTI = [
+ 'Parameter name: use_sim_time',
+ ' Type: boolean',
+ ' Constraints:',
+ 'Parameter name: publish_rate',
+ ' Type: double',
+ ' Description: Rate in Hz.',
+ ' Constraints:',
+ ' Min value: 0.0',
+ ' Max value: 100.0',
+ ' Step: 0.0',
+]
+
+
+# ── Helper ────────────────────────────────────────────────────────────────────
+
+def run_describe(tmp_prefix, lines, global_cfg=None, node_colors=None,
+ argv_extra=None, timeout=10):
+ """Run dendros_param_describe.py with text lines as stdin."""
+ env = os.environ.copy()
+ env['AMENT_PREFIX_PATH'] = tmp_prefix
+ env.pop('ROS_DISTRO', None)
+ env['HOME'] = tmp_prefix
+
+ cfg_dir = os.path.join(tmp_prefix, '.config', 'dendROS')
+ os.makedirs(cfg_dir, exist_ok=True)
+
+ if global_cfg:
+ with open(os.path.join(cfg_dir, 'defaults.yaml'), 'w') as f:
+ yaml.dump(global_cfg, f)
+ if node_colors:
+ with open(os.path.join(cfg_dir, 'node_colors.yaml'), 'w') as f:
+ yaml.dump(node_colors, f)
+
+ stdin = '\n'.join(lines) + '\n'
+ cmd = [sys.executable, PARAM_DESCRIBE_PATH] + (argv_extra or [])
+ result = subprocess.run(
+ cmd, input=stdin.encode(), capture_output=True, env=env, timeout=timeout,
+ )
+ return result.stdout.decode(), result.stderr.decode(), result.returncode
+
+
+# ── Parameter name header ─────────────────────────────────────────────────────
+
+class TestParamDescribeHeader:
+ """'Parameter name: X' line: dim label, bold+node-colored param name."""
+
+ def test_param_name_label_dimmed(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {'talker': ''}, 'style_map': {}}
+ stdout, _, _ = run_describe(str(tmp_path), _SIMPLE, node_colors=nc,
+ argv_extra=['/talker'])
+ assert '\033[2mParameter name:\033[0m' in stdout
+
+ def test_param_name_value_bold_node_color(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {'talker': ''}, 'style_map': {}}
+ stdout, _, _ = run_describe(str(tmp_path), _SIMPLE, node_colors=nc,
+ argv_extra=['/talker'])
+ assert '\033[32;1muse_sim_time\033[0m' in stdout
+
+ def test_param_name_unmatched_passthrough(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {'talker': ''}, 'style_map': {}}
+ stdout, _, _ = run_describe(str(tmp_path), _SIMPLE, node_colors=nc,
+ argv_extra=['/unknown'])
+ # label still dimmed, name not colored
+ assert '\033[2mParameter name:\033[0m' in stdout
+ assert '\033[32;1m' not in stdout
+
+ def test_param_name_no_node_arg(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {'talker': ''}, 'style_map': {}}
+ stdout, _, _ = run_describe(str(tmp_path), _SIMPLE, node_colors=nc)
+ # no color on param name but label is still formatted
+ assert '\033[2mParameter name:\033[0m' in stdout
+
+ def test_namespaced_node_resolved_by_basename(self, tmp_path):
+ nc = {'color_map': {'talker': '33'}, 'tag_map': {'talker': ''}, 'style_map': {}}
+ stdout, _, _ = run_describe(str(tmp_path), _SIMPLE, node_colors=nc,
+ argv_extra=['/my_ns/talker'])
+ assert '\033[33;1muse_sim_time\033[0m' in stdout
+
+ def test_wildcard_pattern(self, tmp_path):
+ nc = {'color_map': {'nav2_*': '35'}, 'tag_map': {'nav2_*': ''}, 'style_map': {}}
+ stdout, _, _ = run_describe(str(tmp_path), _SIMPLE, node_colors=nc,
+ argv_extra=['/nav2_planner'])
+ assert '\033[35;1muse_sim_time\033[0m' in stdout
+
+ def test_multi_param_each_header_colored(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {'talker': ''}, 'style_map': {}}
+ stdout, _, _ = run_describe(str(tmp_path), _MULTI, node_colors=nc,
+ argv_extra=['/talker'])
+ headers = [l for l in stdout.splitlines() if 'Parameter name:' in strip_ansi(l)]
+ assert len(headers) == 2
+ for h in headers:
+ assert '\033[32;1m' in h
+
+
+# ── Field formatting ──────────────────────────────────────────────────────────
+
+class TestParamDescribeFields:
+ """Indented field lines: key dimmed, value plain; section headers bold."""
+
+ def test_type_key_dimmed(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {'talker': ''}, 'style_map': {}}
+ stdout, _, _ = run_describe(str(tmp_path), _SIMPLE, node_colors=nc,
+ argv_extra=['/talker'])
+ assert '\033[2mType:\033[0m' in stdout
+
+ def test_type_value_plain(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {'talker': ''}, 'style_map': {}}
+ stdout, _, _ = run_describe(str(tmp_path), _SIMPLE, node_colors=nc,
+ argv_extra=['/talker'])
+ type_line = [l for l in stdout.splitlines() if 'Type:' in strip_ansi(l)][0]
+ # 'boolean' should appear outside any color code
+ segs = colored_segments(type_line)
+ assert any('boolean' in t and c is None for t, c in segs)
+
+ def test_description_key_dimmed(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {'talker': ''}, 'style_map': {}}
+ stdout, _, _ = run_describe(str(tmp_path), _WITH_DESC, node_colors=nc,
+ argv_extra=['/talker'])
+ assert '\033[2mDescription:\033[0m' in stdout
+
+ def test_constraints_header_bold(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {'talker': ''}, 'style_map': {}}
+ stdout, _, _ = run_describe(str(tmp_path), _SIMPLE, node_colors=nc,
+ argv_extra=['/talker'])
+ assert '\033[1mConstraints:\033[0m' in stdout
+
+ def test_read_only_key_dimmed(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {'talker': ''}, 'style_map': {}}
+ stdout, _, _ = run_describe(str(tmp_path), _WITH_DESC, node_colors=nc,
+ argv_extra=['/talker'])
+ assert '\033[2mRead only:\033[0m' in stdout
+
+ def test_range_constraints_formatted(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {'talker': ''}, 'style_map': {}}
+ stdout, _, _ = run_describe(str(tmp_path), _MULTI, node_colors=nc,
+ argv_extra=['/talker'])
+ assert '\033[2mMin value:\033[0m' in stdout
+ assert '\033[2mMax value:\033[0m' in stdout
+ assert '\033[2mStep:\033[0m' in stdout
+
+ def test_formatting_applied_without_node_color(self, tmp_path):
+ """Field formatting applies even when the node has no config color."""
+ stdout, _, _ = run_describe(str(tmp_path), _WITH_DESC)
+ assert '\033[2mType:\033[0m' in stdout
+ assert '\033[1mConstraints:\033[0m' in stdout
+ assert '\033[2mDescription:\033[0m' in stdout
+
+ def test_indent_preserved(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {'talker': ''}, 'style_map': {}}
+ stdout, _, _ = run_describe(str(tmp_path), _WITH_DESC, node_colors=nc,
+ argv_extra=['/talker'])
+ lines = stdout.splitlines()
+ type_line = [strip_ansi(l) for l in lines if 'Type:' in strip_ansi(l)][0]
+ constraint_line = [strip_ansi(l) for l in lines if 'Read only:' in strip_ansi(l)][0]
+ assert type_line.startswith(' ')
+ assert constraint_line.startswith(' ')
+
+ def test_empty_lines_preserved(self, tmp_path):
+ lines = _SIMPLE + ['', 'Parameter name: other_param', ' Type: integer', ' Constraints:']
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {'talker': ''}, 'style_map': {}}
+ stdout, _, _ = run_describe(str(tmp_path), lines, node_colors=nc,
+ argv_extra=['/talker'])
+ assert '' in stdout.splitlines()
+
+
+# ── Tag badge ─────────────────────────────────────────────────────────────────
+
+class TestParamDescribeTag:
+
+ def test_tag_shown_before_param_name(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {'talker': 'TLK'}, 'style_map': {}}
+ stdout, _, _ = run_describe(str(tmp_path), _SIMPLE, node_colors=nc,
+ argv_extra=['/talker'])
+ header = [l for l in stdout.splitlines() if 'Parameter name:' in strip_ansi(l)][0]
+ assert '[TLK]' in header
+ assert header.index('[TLK]') < header.index('Parameter name:')
+
+ def test_tag_hidden_when_show_tag_cli_false(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {'talker': 'TLK'}, 'style_map': {}}
+ stdout, _, _ = run_describe(str(tmp_path), _SIMPLE, node_colors=nc,
+ argv_extra=['/talker'],
+ global_cfg={'show_tag_cli': False})
+ assert '[TLK]' not in stdout
+
+ def test_inverted_tag_style(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {'talker': 'TLK'},
+ 'style_map': {'talker': 'inverted'}}
+ stdout, _, _ = run_describe(str(tmp_path), _SIMPLE, node_colors=nc,
+ argv_extra=['/talker'])
+ assert '\033[32;7m[TLK]' in stdout
+
+ def test_tag_not_shown_on_field_lines(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {'talker': 'TLK'}, 'style_map': {}}
+ stdout, _, _ = run_describe(str(tmp_path), _WITH_DESC, node_colors=nc,
+ argv_extra=['/talker'])
+ for line in stdout.splitlines():
+ if 'Type:' in strip_ansi(line) or 'Description:' in strip_ansi(line):
+ assert '[TLK]' not in line
+
+ def test_unmatched_tag(self, tmp_path):
+ nc = {'color_map': {}, 'tag_map': {}, 'style_map': {}}
+ stdout, _, _ = run_describe(str(tmp_path), _SIMPLE, node_colors=nc,
+ argv_extra=['/unknown'],
+ global_cfg={'unmatched_color': 'white', 'unmatched_tag': '?'})
+ assert '[?]' in stdout
+
+
+# ── Unmatched / dim_unmatched ─────────────────────────────────────────────────
+
+class TestParamDescribeUnmatched:
+
+ def test_unmatched_color_applied_to_param_name(self, tmp_path):
+ nc = {'color_map': {}, 'tag_map': {}, 'style_map': {}}
+ stdout, _, _ = run_describe(str(tmp_path), _SIMPLE, node_colors=nc,
+ argv_extra=['/unknown'],
+ global_cfg={'unmatched_color': 'cyan'})
+ assert '\033[' in stdout
+ # at least the param name should be colored
+ header = [l for l in stdout.splitlines() if 'Parameter name:' in strip_ansi(l)][0]
+ assert '\033[' in header
+
+ def test_dim_unmatched_param_name(self, tmp_path):
+ nc = {'color_map': {}, 'tag_map': {}, 'style_map': {}}
+ stdout, _, _ = run_describe(str(tmp_path), _SIMPLE, node_colors=nc,
+ argv_extra=['/unknown'],
+ global_cfg={'dim_unmatched': True})
+ assert '\033[2muse_sim_time\033[0m' in stdout
+
+
+# ── AMENT_PREFIX_PATH fallback ────────────────────────────────────────────────
+
+class TestParamDescribeFallback:
+
+ def test_fallback_scan_colors_param_name(self, tmp_path):
+ cfg_dir = tmp_path / 'share' / 'my_pkg' / 'config'
+ cfg_dir.mkdir(parents=True)
+ (cfg_dir / 'dendROS.yaml').write_text(yaml.dump({
+ 'groups': {'loc': {'color': 'bold blue', 'label': 'LOC',
+ 'nodes': ['talker']}}
+ }))
+ stdout, _, _ = run_describe(str(tmp_path), _SIMPLE, argv_extra=['/talker'])
+ assert '\033[' in stdout
diff --git a/test/unit/test_param_list.py b/test/unit/test_param_list.py
new file mode 100644
index 0000000..c207505
--- /dev/null
+++ b/test/unit/test_param_list.py
@@ -0,0 +1,334 @@
+"""Tests for dendros_param_list.py colorization."""
+
+import os
+import sys
+import subprocess
+
+import pytest
+import yaml
+
+REPO_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
+PARAM_LIST_PATH = os.path.join(REPO_ROOT, 'dendROS', 'dendros_param_list.py')
+
+from conftest import assert_segment_colored, assert_segment_uncolored, colored_segments, strip_ansi
+
+
+# ── Helper ────────────────────────────────────────────────────────────────────
+
+def run_param_list(tmp_prefix, lines, global_cfg=None, node_colors=None,
+ argv_extra=None, timeout=10):
+ """Run dendros_param_list.py with text lines as stdin; return (stdout, stderr, rc).
+
+ argv_extra: list of extra args appended to the command (mirrors ${@:3} from the shell).
+ """
+ env = os.environ.copy()
+ env['AMENT_PREFIX_PATH'] = tmp_prefix
+ env.pop('ROS_DISTRO', None)
+ env['HOME'] = tmp_prefix
+
+ cfg_dir = os.path.join(tmp_prefix, '.config', 'dendROS')
+ os.makedirs(cfg_dir, exist_ok=True)
+
+ if global_cfg:
+ with open(os.path.join(cfg_dir, 'defaults.yaml'), 'w') as f:
+ yaml.dump(global_cfg, f)
+ if node_colors:
+ with open(os.path.join(cfg_dir, 'node_colors.yaml'), 'w') as f:
+ yaml.dump(node_colors, f)
+
+ stdin = '\n'.join(lines) + '\n'
+ cmd = [sys.executable, PARAM_LIST_PATH] + (argv_extra or [])
+ result = subprocess.run(
+ cmd,
+ input=stdin.encode(),
+ capture_output=True,
+ env=env,
+ timeout=timeout,
+ )
+ return result.stdout.decode(), result.stderr.decode(), result.returncode
+
+
+# ── Node header colorization ──────────────────────────────────────────────────
+
+class TestParamNodeHeader:
+ """Node header lines (/node_name:) are colored with the group's color."""
+
+ def test_matched_node_header_colored(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {'talker': ''}, 'style_map': {}}
+ stdout, _, _ = run_param_list(str(tmp_path), ['/talker:'], node_colors=nc)
+ assert '\033[32m/talker:\033[0m' in stdout
+
+ def test_unmatched_node_header_passthrough(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {'talker': ''}, 'style_map': {}}
+ stdout, _, _ = run_param_list(str(tmp_path), ['/unknown_node:'], node_colors=nc)
+ assert_segment_uncolored(stdout, '/unknown_node:')
+
+ def test_multiple_node_headers(self, tmp_path):
+ nc = {'color_map': {'talker': '32', 'listener': '34'},
+ 'tag_map': {'talker': '', 'listener': ''}, 'style_map': {}}
+ stdout, _, _ = run_param_list(
+ str(tmp_path), ['/talker:', ' p1', '/listener:', ' p2'], node_colors=nc)
+ assert '\033[32m/talker:\033[0m' in stdout
+ assert '\033[34m/listener:\033[0m' in stdout
+
+ def test_namespaced_node_header(self, tmp_path):
+ nc = {'color_map': {'talker': '33'}, 'tag_map': {'talker': ''}, 'style_map': {}}
+ stdout, _, _ = run_param_list(str(tmp_path), ['/my_ns/talker:'], node_colors=nc)
+ assert '\033[33m/my_ns/talker:\033[0m' in stdout
+
+ def test_wildcard_pattern(self, tmp_path):
+ nc = {'color_map': {'nav2_*': '35'}, 'tag_map': {'nav2_*': ''}, 'style_map': {}}
+ stdout, _, _ = run_param_list(
+ str(tmp_path), ['/nav2_planner:'], node_colors=nc)
+ assert '\033[35m/nav2_planner:\033[0m' in stdout
+
+ def test_no_config_passthrough(self, tmp_path):
+ stdout, _, _ = run_param_list(str(tmp_path), ['/talker:'])
+ assert '\033[' not in stdout
+ assert '/talker:' in stdout
+
+ def test_empty_lines_preserved(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {'talker': ''}, 'style_map': {}}
+ stdout, _, _ = run_param_list(
+ str(tmp_path), ['/talker:', '', ' p1'], node_colors=nc)
+ assert '' in stdout.splitlines()
+
+
+# ── Param line colorization (dimmed) ─────────────────────────────────────────
+
+class TestParamLines:
+ """Param lines (indented) use the current node's color, dimmed."""
+
+ def test_param_colored_dim_with_node_color(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {'talker': ''}, 'style_map': {}}
+ stdout, _, _ = run_param_list(str(tmp_path), ['/talker:', ' use_sim_time'],
+ node_colors=nc)
+ assert '\033[32m\033[2muse_sim_time\033[0m' in stdout
+
+ def test_param_indent_preserved(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {'talker': ''}, 'style_map': {}}
+ stdout, _, _ = run_param_list(str(tmp_path), ['/talker:', ' use_sim_time'],
+ node_colors=nc)
+ line = [l for l in stdout.splitlines() if 'use_sim_time' in l][0]
+ plain = strip_ansi(line)
+ assert plain.startswith(' ')
+
+ def test_param_under_unmatched_node_passthrough(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {'talker': ''}, 'style_map': {}}
+ stdout, _, _ = run_param_list(
+ str(tmp_path), ['/unknown:', ' some_param'], node_colors=nc)
+ assert_segment_uncolored(stdout, 'some_param')
+
+ def test_params_inherit_node_color_change(self, tmp_path):
+ nc = {'color_map': {'talker': '32', 'listener': '34'},
+ 'tag_map': {'talker': '', 'listener': ''}, 'style_map': {}}
+ stdout, _, _ = run_param_list(
+ str(tmp_path),
+ ['/talker:', ' p1', '/listener:', ' p2'],
+ node_colors=nc)
+ assert '\033[32m\033[2mp1\033[0m' in stdout
+ assert '\033[34m\033[2mp2\033[0m' in stdout
+
+ def test_multiple_params_same_node(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {'talker': ''}, 'style_map': {}}
+ stdout, _, _ = run_param_list(
+ str(tmp_path), ['/talker:', ' param_a', ' param_b'], node_colors=nc)
+ assert '\033[32m\033[2mparam_a\033[0m' in stdout
+ assert '\033[32m\033[2mparam_b\033[0m' in stdout
+
+ def test_param_no_config_passthrough(self, tmp_path):
+ stdout, _, _ = run_param_list(str(tmp_path), ['/talker:', ' use_sim_time'])
+ assert '\033[' not in stdout
+
+
+# ── --param-type flag: type annotation dimmed ─────────────────────────────────
+
+class TestParamListTypeFlag:
+ """--param-type appends ' (type)'; type content must be rendered dim."""
+
+ def test_type_dimmed_for_matched_param(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {'talker': ''}, 'style_map': {}}
+ stdout, _, _ = run_param_list(
+ str(tmp_path), ['/talker:', ' use_sim_time (bool)'], node_colors=nc)
+ assert '(\033[2mbool\033[0m)' in stdout
+
+ def test_type_dimmed_for_unmatched_param(self, tmp_path):
+ nc = {'color_map': {}, 'tag_map': {}, 'style_map': {}}
+ stdout, _, _ = run_param_list(
+ str(tmp_path), ['/unknown:', ' foo (integer)'], node_colors=nc)
+ assert '(\033[2minteger\033[0m)' in stdout
+
+ def test_param_name_and_type_both_present(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {'talker': ''}, 'style_map': {}}
+ stdout, _, _ = run_param_list(
+ str(tmp_path), ['/talker:', ' my_param (string)'], node_colors=nc)
+ assert '\033[32m\033[2mmy_param\033[0m' in stdout
+ assert '(\033[2mstring\033[0m)' in stdout
+
+ def test_no_type_no_parens(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {'talker': ''}, 'style_map': {}}
+ stdout, _, _ = run_param_list(str(tmp_path), ['/talker:', ' my_param'],
+ node_colors=nc)
+ assert '(' not in strip_ansi(stdout)
+
+
+# ── Tag badge ─────────────────────────────────────────────────────────────────
+
+class TestParamListTag:
+ """Tag badge appears left of the node header when show_tag_cli is true."""
+
+ def test_tag_shown_before_node_header(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {'talker': 'TLK'}, 'style_map': {}}
+ stdout, _, _ = run_param_list(str(tmp_path), ['/talker:'], node_colors=nc)
+ assert '[TLK]' in stdout
+ line = [l for l in stdout.splitlines() if '/talker:' in strip_ansi(l)][0]
+ assert line.index('[TLK]') < line.index('/talker:')
+
+ def test_tag_hidden_when_show_tag_cli_false(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {'talker': 'TLK'}, 'style_map': {}}
+ stdout, _, _ = run_param_list(str(tmp_path), ['/talker:'],
+ node_colors=nc,
+ global_cfg={'show_tag_cli': False})
+ assert '[TLK]' not in stdout
+
+ def test_inverted_tag_style(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {'talker': 'TLK'},
+ 'style_map': {'talker': 'inverted'}}
+ stdout, _, _ = run_param_list(str(tmp_path), ['/talker:'], node_colors=nc)
+ assert '\033[32;7m[TLK]' in stdout
+
+ def test_unmatched_tag(self, tmp_path):
+ nc = {'color_map': {}, 'tag_map': {}, 'style_map': {}}
+ stdout, _, _ = run_param_list(
+ str(tmp_path), ['/unknown:'], node_colors=nc,
+ global_cfg={'unmatched_color': 'white', 'unmatched_tag': '?'})
+ assert '[?]' in stdout
+
+ def test_tag_not_shown_on_param_lines(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {'talker': 'TLK'}, 'style_map': {}}
+ stdout, _, _ = run_param_list(str(tmp_path), ['/talker:', ' p1'], node_colors=nc)
+ param_line = [l for l in stdout.splitlines() if 'p1' in strip_ansi(l)][0]
+ assert '[TLK]' not in param_line
+
+
+# ── Unmatched / dim_unmatched ─────────────────────────────────────────────────
+
+class TestParamListUnmatched:
+
+ def test_unmatched_color_applied_to_header(self, tmp_path):
+ nc = {'color_map': {}, 'tag_map': {}, 'style_map': {}}
+ stdout, _, _ = run_param_list(
+ str(tmp_path), ['/unknown:'], node_colors=nc,
+ global_cfg={'unmatched_color': 'cyan'})
+ assert '\033[' in stdout
+
+ def test_dim_unmatched_header(self, tmp_path):
+ nc = {'color_map': {}, 'tag_map': {}, 'style_map': {}}
+ stdout, _, _ = run_param_list(
+ str(tmp_path), ['/unknown:'], node_colors=nc,
+ global_cfg={'dim_unmatched': True})
+ assert '\033[2m/unknown:\033[0m' in stdout
+
+ def test_dim_unmatched_param(self, tmp_path):
+ nc = {'color_map': {}, 'tag_map': {}, 'style_map': {}}
+ stdout, _, _ = run_param_list(
+ str(tmp_path), ['/unknown:', ' my_param'], node_colors=nc,
+ global_cfg={'dim_unmatched': True})
+ assert '\033[2mmy_param\033[0m' in stdout
+
+ def test_passthrough_when_no_config(self, tmp_path):
+ nc = {'color_map': {}, 'tag_map': {}, 'style_map': {}}
+ stdout, _, _ = run_param_list(str(tmp_path), ['/unknown:', ' p1'], node_colors=nc)
+ assert_segment_uncolored(stdout, '/unknown:')
+ assert_segment_uncolored(stdout, 'p1')
+
+
+# ── AMENT_PREFIX_PATH fallback ────────────────────────────────────────────────
+
+class TestParamListFallback:
+
+ def test_fallback_scan_colors_header(self, tmp_path):
+ cfg_dir = tmp_path / 'share' / 'my_pkg' / 'config'
+ cfg_dir.mkdir(parents=True)
+ (cfg_dir / 'dendROS.yaml').write_text(yaml.dump({
+ 'groups': {'loc': {'color': 'bold blue', 'label': 'LOC',
+ 'nodes': ['talker']}}
+ }))
+ stdout, _, _ = run_param_list(str(tmp_path), ['/talker:'])
+ assert '\033[' in stdout
+
+
+# ── ros2 param list /node_name — bare output format ──────────────────────────
+
+class TestParamListNodeArg:
+ """When a node path is passed as argv, bare param lines (no header, no indent)
+ are colored using that node's color, pre-seeded before reading stdin."""
+
+ def test_bare_params_colored_via_node_arg(self, tmp_path):
+ """Bare param names (no header, no indentation) colored when node arg given."""
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {'talker': ''}, 'style_map': {}}
+ stdout, _, _ = run_param_list(str(tmp_path), ['use_sim_time', 'publish_rate'],
+ node_colors=nc, argv_extra=['/talker'])
+ assert '\033[32m\033[2muse_sim_time\033[0m' in stdout
+ assert '\033[32m\033[2mpublish_rate\033[0m' in stdout
+
+ def test_bare_params_with_type_flag(self, tmp_path):
+ """--param-type type dimming works for bare output."""
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {'talker': ''}, 'style_map': {}}
+ stdout, _, _ = run_param_list(str(tmp_path), ['use_sim_time (bool)'],
+ node_colors=nc, argv_extra=['/talker'])
+ assert '\033[32m\033[2muse_sim_time\033[0m' in stdout
+ assert '(\033[2mbool\033[0m)' in stdout
+
+ def test_node_arg_with_namespace(self, tmp_path):
+ """Namespaced node path resolved by basename matching."""
+ nc = {'color_map': {'talker': '33'}, 'tag_map': {'talker': ''}, 'style_map': {}}
+ stdout, _, _ = run_param_list(str(tmp_path), ['use_sim_time'],
+ node_colors=nc, argv_extra=['/my_ns/talker'])
+ assert '\033[33m\033[2muse_sim_time\033[0m' in stdout
+
+ def test_node_arg_unmatched_passthrough(self, tmp_path):
+ """Unknown node arg leaves bare params uncolored."""
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {'talker': ''}, 'style_map': {}}
+ stdout, _, _ = run_param_list(str(tmp_path), ['use_sim_time'],
+ node_colors=nc, argv_extra=['/unknown_node'])
+ assert_segment_uncolored(stdout, 'use_sim_time')
+
+ def test_node_arg_unmatched_dim(self, tmp_path):
+ """dim_unmatched applies to bare params from an unmatched node arg."""
+ nc = {'color_map': {}, 'tag_map': {}, 'style_map': {}}
+ stdout, _, _ = run_param_list(str(tmp_path), ['use_sim_time'],
+ node_colors=nc, argv_extra=['/unknown_node'],
+ global_cfg={'dim_unmatched': True})
+ assert '\033[2muse_sim_time\033[0m' in stdout
+
+ def test_node_arg_unmatched_color(self, tmp_path):
+ """unmatched_color pre-seeds bare params from an unmatched node arg."""
+ nc = {'color_map': {}, 'tag_map': {}, 'style_map': {}}
+ stdout, _, _ = run_param_list(str(tmp_path), ['use_sim_time'],
+ node_colors=nc, argv_extra=['/unknown_node'],
+ global_cfg={'unmatched_color': 'cyan'})
+ assert '\033[' in stdout
+
+ def test_header_in_output_overrides_node_arg(self, tmp_path):
+ """When output includes a node header, header color wins (even if arg differs)."""
+ nc = {'color_map': {'talker': '32', 'listener': '34'},
+ 'tag_map': {'talker': '', 'listener': ''}, 'style_map': {}}
+ # arg says /talker but output has /listener: header — listener color should apply
+ stdout, _, _ = run_param_list(str(tmp_path), ['/listener:', ' p1'],
+ node_colors=nc, argv_extra=['/talker'])
+ assert '\033[34m\033[2mp1\033[0m' in stdout
+
+ def test_flag_args_ignored_for_node_detection(self, tmp_path):
+ """Flags like --param-type passed in argv_extra do not confuse node detection."""
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {'talker': ''}, 'style_map': {}}
+ stdout, _, _ = run_param_list(str(tmp_path), ['use_sim_time (bool)'],
+ node_colors=nc,
+ argv_extra=['--param-type', '/talker'])
+ assert '\033[32m\033[2muse_sim_time\033[0m' in stdout
+
+ def test_no_node_arg_bare_params_passthrough(self, tmp_path):
+ """Without a node arg, bare param names are passed through without color."""
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {'talker': ''}, 'style_map': {}}
+ stdout, _, _ = run_param_list(str(tmp_path), ['use_sim_time'], node_colors=nc)
+ assert_segment_uncolored(stdout, 'use_sim_time')
diff --git a/test/unit/test_param_watcher.py b/test/unit/test_param_watcher.py
new file mode 100644
index 0000000..26e6086
--- /dev/null
+++ b/test/unit/test_param_watcher.py
@@ -0,0 +1,484 @@
+"""Tests for lib/param_watcher.py — param change notification subsystem."""
+
+import os
+import queue
+import sys
+
+import pytest
+
+REPO_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
+sys.path.insert(0, os.path.join(REPO_ROOT, 'dendROS'))
+
+import lib.param_watcher as pw
+from conftest import strip_ansi
+
+
+# ── Helpers ───────────────────────────────────────────────────────────────────
+
+def _resolve_node_mock(node, color_map, tag_map):
+ """Minimal stand-in for lib.config_loader.resolve_node (exact basename match)."""
+ basename = node.rsplit('/', 1)[-1]
+ code = color_map.get(node) or color_map.get(basename)
+ label = tag_map.get(node) or tag_map.get(basename) if code else None
+ return code, label
+
+
+def _make_param_event(node, changed=None, new=None, deleted=None):
+ """Build a list of YAML lines representing one ParameterEvent message."""
+ lines = [f'node: {node}', 'stamp:', ' sec: 0', ' nanosec: 0']
+ for section, params in [('new_parameters', new), ('changed_parameters', changed),
+ ('deleted_parameters', deleted)]:
+ if params:
+ lines.append(f'{section}:')
+ for name, type_id, val_field, val in params:
+ lines += [
+ f'- name: {name}',
+ f' value:',
+ f' type: {type_id}',
+ f' {val_field}: {val}',
+ ]
+ else:
+ lines.append(f'{section}: []')
+ return lines
+
+
+def _drain_all(color_map, tag_map, style_map=None, tag_style='normal', show_tag=True,
+ alert_style='inline'):
+ """Drain the queue using the real drain() but with a mock resolve_node."""
+ return pw.drain(color_map, tag_map, style_map or {}, tag_style, show_tag, alert_style)
+
+
+def _flush_queue():
+ while True:
+ try:
+ pw._queue.get_nowait()
+ except queue.Empty:
+ break
+
+
+# ── _extract_value ────────────────────────────────────────────────────────────
+
+class TestExtractValue:
+
+ def test_bool_true(self):
+ assert pw._extract_value({'type': 1, 'bool_value': True}) == 'true'
+
+ def test_bool_false(self):
+ assert pw._extract_value({'type': 1, 'bool_value': False}) == 'false'
+
+ def test_integer(self):
+ assert pw._extract_value({'type': 2, 'integer_value': 42}) == '42'
+
+ def test_double(self):
+ assert pw._extract_value({'type': 3, 'double_value': 3.14}) == '3.14'
+
+ def test_string(self):
+ assert pw._extract_value({'type': 4, 'string_value': 'hello'}) == 'hello'
+
+ def test_bool_array(self):
+ result = pw._extract_value({'type': 6, 'bool_array_value': [True, False]})
+ assert 'True' in result or 'False' in result
+
+ def test_long_array_truncated(self):
+ big = list(range(100))
+ result = pw._extract_value({'type': 7, 'integer_array_value': big})
+ assert '…' in result
+ assert len(result) <= pw._MAX_VALUE_LEN + 2
+
+ def test_unknown_type_returns_empty(self):
+ assert pw._extract_value({'type': 99}) == ''
+
+ def test_non_dict_input(self):
+ result = pw._extract_value('bare_string')
+ assert result == 'bare_string'
+
+
+# ── _process_chunk ────────────────────────────────────────────────────────────
+
+class TestProcessChunk:
+
+ def setup_method(self):
+ _flush_queue()
+ pw._param_cache.clear()
+
+ def _process(self, lines, color_map=None, tag_map=None, scope='tracked'):
+ cm = color_map or {}
+ tm = tag_map or {}
+ pw._process_chunk(lines, cm, tm, scope, _resolve_node_mock)
+
+ def test_changed_param_enqueued(self):
+ lines = _make_param_event(
+ '/talker',
+ changed=[('use_sim_time', 1, 'bool_value', 'true')],
+ )
+ color_map = {'talker': '32'}
+ self._process(lines, color_map)
+ node, name, old_val, value = pw._queue.get_nowait()
+ assert node == '/talker'
+ assert name == 'use_sim_time'
+ assert value == 'true'
+ assert old_val is None # first-ever change for this param
+
+ def test_new_params_not_enqueued(self):
+ """new_parameters (startup declarations) must be silently ignored."""
+ lines = _make_param_event(
+ '/talker',
+ new=[('use_sim_time', 1, 'bool_value', 'false')],
+ )
+ self._process(lines, {'talker': '32'})
+ assert pw._queue.empty()
+
+ def test_deleted_params_not_enqueued(self):
+ lines = _make_param_event(
+ '/talker',
+ deleted=[('old_param', 4, 'string_value', 'gone')],
+ )
+ self._process(lines, {'talker': '32'})
+ assert pw._queue.empty()
+
+ def test_cli_daemon_node_filtered(self):
+ lines = _make_param_event(
+ '/_ros2cli_12345',
+ changed=[('use_sim_time', 1, 'bool_value', 'false')],
+ )
+ self._process(lines, {})
+ assert pw._queue.empty()
+
+ def test_untracked_node_filtered_in_tracked_scope(self):
+ lines = _make_param_event(
+ '/unknown_node',
+ changed=[('my_param', 2, 'integer_value', '5')],
+ )
+ self._process(lines, {'talker': '32'}, scope='tracked')
+ assert pw._queue.empty()
+
+ def test_untracked_node_passes_in_all_scope(self):
+ lines = _make_param_event(
+ '/unknown_node',
+ changed=[('my_param', 2, 'integer_value', '5')],
+ )
+ self._process(lines, {'talker': '32'}, scope='all')
+ assert not pw._queue.empty()
+ node, name, old_val, value = pw._queue.get_nowait()
+ assert node == '/unknown_node'
+ assert name == 'my_param'
+
+ def test_multiple_changed_params_all_enqueued(self):
+ lines = _make_param_event(
+ '/talker',
+ changed=[
+ ('param_a', 1, 'bool_value', 'true'),
+ ('param_b', 2, 'integer_value', '7'),
+ ],
+ )
+ self._process(lines, {'talker': '32'})
+ assert pw._queue.qsize() == 2
+
+ def test_invalid_yaml_silently_ignored(self):
+ self._process(['not: valid: yaml: :::'], {'talker': '32'})
+ # should not raise; queue stays empty
+ assert pw._queue.empty()
+
+ def test_empty_node_field_filtered(self):
+ lines = ['node: ', 'changed_parameters:', '- name: p', ' value:', ' type: 1',
+ ' bool_value: true', 'new_parameters: []', 'deleted_parameters: []']
+ self._process(lines, {}, scope='all')
+ assert pw._queue.empty()
+
+ def test_namespaced_node_matched_by_basename(self):
+ lines = _make_param_event(
+ '/my_ns/talker',
+ changed=[('rate', 3, 'double_value', '10.0')],
+ )
+ self._process(lines, {'talker': '32'}, scope='tracked')
+ assert not pw._queue.empty()
+
+ def test_wildcard_pattern_matched(self):
+ """Wildcard patterns in color_map are resolved by _resolve_node_mock (exact only)
+ — this test verifies that a direct match reaches the queue."""
+ lines = _make_param_event(
+ '/nav2_planner',
+ changed=[('max_vel', 3, 'double_value', '1.5')],
+ )
+ self._process(lines, {'nav2_planner': '35'}, scope='tracked')
+ assert not pw._queue.empty()
+
+
+# ── drain() formatting ────────────────────────────────────────────────────────
+
+class TestDrain:
+
+ def setup_method(self):
+ _flush_queue()
+ pw._param_cache.clear()
+
+ def _push(self, node, name, value, old_val=None):
+ pw._queue.put((node, name, old_val, value))
+
+ def test_drain_empty_queue_returns_empty_list(self):
+ result = _drain_all({}, {})
+ assert result == []
+
+ def test_notification_contains_node_name(self):
+ self._push('/talker', 'use_sim_time', 'true')
+ lines = _drain_all({'talker': '32'}, {'talker': ''})
+ assert len(lines) == 1
+ assert '/talker' in strip_ansi(lines[0])
+
+ def test_notification_contains_param_name(self):
+ self._push('/talker', 'use_sim_time', 'true')
+ lines = _drain_all({'talker': '32'}, {'talker': ''})
+ assert 'use_sim_time' in strip_ansi(lines[0])
+
+ def test_notification_contains_value(self):
+ self._push('/talker', 'use_sim_time', 'true')
+ lines = _drain_all({'talker': '32'}, {'talker': ''})
+ assert 'true' in strip_ansi(lines[0])
+
+ def test_notification_contains_dendros_header(self):
+ self._push('/talker', 'rate', '10.0')
+ lines = _drain_all({'talker': '32'}, {'talker': ''})
+ assert '[dendROS]' in strip_ansi(lines[0])
+
+ def test_dendros_header_logo_colors(self):
+ """[dendROS] must split on the logo palette: [dend in logo-blue, ROS] in logo-orange."""
+ self._push('/talker', 'p', 'v')
+ lines = _drain_all({'talker': '32'}, {'talker': ''})
+ assert '[dendROS]' in strip_ansi(lines[0])
+ # Logo-blue (0,75,107) for '[dend'
+ assert '\033[38;2;0;75;107;1m[dend' in lines[0]
+ # Logo-orange (224,127,0) for 'ROS]'
+ assert '\033[38;2;224;127;0;1mROS]' in lines[0]
+
+ def test_node_colored(self):
+ self._push('/talker', 'p', 'v')
+ lines = _drain_all({'talker': '32'}, {'talker': ''})
+ assert '\033[32m/talker\033[0m' in lines[0]
+
+ def test_tag_shown_when_label_present(self):
+ self._push('/talker', 'p', 'v')
+ lines = _drain_all({'talker': '32'}, {'talker': 'TLK'})
+ assert '[TLK]' in lines[0]
+
+ def test_tag_hidden_when_show_tag_false(self):
+ self._push('/talker', 'p', 'v')
+ lines = _drain_all({'talker': '32'}, {'talker': 'TLK'}, show_tag=False)
+ assert '[TLK]' not in lines[0]
+
+ def test_inverted_tag_style(self):
+ self._push('/talker', 'p', 'v')
+ lines = _drain_all(
+ {'talker': '32'}, {'talker': 'TLK'},
+ style_map={'talker': 'inverted'}, tag_style='inverted',
+ )
+ assert '\033[32;7m[TLK]\033[0m' in lines[0]
+
+ def test_param_name_bold(self):
+ self._push('/talker', 'my_param', 'val')
+ lines = _drain_all({'talker': '32'}, {'talker': ''})
+ assert '\033[1mmy_param\033[0m' in lines[0]
+
+ def test_param_keyword_plain_white(self):
+ """'param' keyword must appear without any ANSI color code (plain/white)."""
+ self._push('/talker', 'p', 'v')
+ lines = _drain_all({'talker': '32'}, {'talker': ''})
+ assert 'param' in strip_ansi(lines[0])
+ assert '\033[35mparam\033[0m' not in lines[0]
+ assert '\033[2mparam\033[0m' not in lines[0]
+
+ def test_untracked_node_no_color(self):
+ """In 'all' mode, untracked nodes appear without ANSI color wrapping the node name."""
+ self._push('/unknown_node', 'p', 'v')
+ lines = _drain_all({}, {})
+ assert '/unknown_node' in lines[0]
+ import re
+ assert not re.search(r'\033\[[0-9;]+m/unknown_node\033\[0m', lines[0])
+
+ def test_multiple_events_all_drained(self):
+ for i in range(5):
+ self._push('/talker', f'param_{i}', str(i))
+ lines = _drain_all({'talker': '32'}, {'talker': ''})
+ assert len(lines) == 5
+
+ def test_line_ends_with_newline(self):
+ self._push('/talker', 'p', 'v')
+ lines = _drain_all({'talker': '32'}, {'talker': ''})
+ assert lines[0].endswith('\n')
+
+ def test_arrow_separator_present(self):
+ self._push('/talker', 'p', 'v')
+ lines = _drain_all({'talker': '32'}, {'talker': ''})
+ assert '→' in lines[0]
+
+ def test_unknown_old_value_shown_as_question_mark(self):
+ """First-ever change (old_val=None) should show '? →' in output."""
+ self._push('/talker', 'p', 'new', old_val=None)
+ lines = _drain_all({'talker': '32'}, {'talker': ''})
+ assert '? →' in strip_ansi(lines[0])
+
+ def test_known_old_value_shown(self):
+ self._push('/talker', 'p', 'new', old_val='old')
+ lines = _drain_all({'talker': '32'}, {'talker': ''})
+ plain = strip_ansi(lines[0])
+ assert 'old → new' in plain
+
+ def test_process_chunk_caches_and_reports_old_value(self):
+ """Second change via _process_chunk should carry the first change's value as old."""
+ import queue as _queue_mod
+ pw._param_cache.clear()
+ _flush_queue()
+
+ def _resolve(node, cm, tm):
+ return cm.get(node.rsplit('/', 1)[-1]), None
+
+ lines1 = _make_param_event('/talker', changed=[('rate', 3, 'double_value', '5.0')])
+ pw._process_chunk(lines1, {'talker': '32'}, {}, 'all', _resolve)
+ _, _, old1, val1 = pw._queue.get_nowait()
+ assert old1 is None
+ assert val1 == '5.0'
+
+ lines2 = _make_param_event('/talker', changed=[('rate', 3, 'double_value', '10.0')])
+ pw._process_chunk(lines2, {'talker': '32'}, {}, 'all', _resolve)
+ _, _, old2, val2 = pw._queue.get_nowait()
+ assert old2 == '5.0'
+ assert val2 == '10.0'
+
+
+# ── Inverted alert style ──────────────────────────────────────────────────────
+
+class TestInvertedAlertStyle:
+
+ def setup_method(self):
+ _flush_queue()
+ pw._param_cache.clear()
+
+ def _push(self, node, name, value, old_val=None):
+ pw._queue.put((node, name, old_val, value))
+
+ def test_inverted_sections_use_correct_backgrounds(self):
+ """Node identity uses explicit node-color bg; everything else on white bg."""
+ self._push('/talker', 'p', 'v')
+ lines = _drain_all({'talker': '32'}, {'talker': ''}, alert_style='inverted')
+ # Node identity: explicit green bg + black text (_fg_to_bg('32') == '42')
+ assert '\033[42;30m' in lines[0]
+ # White bg present (for param section and separating spaces)
+ assert '\033[107;30m' in lines[0]
+ # No reverse-video for node identity
+ assert '\033[32;7m' not in lines[0]
+
+ def test_inverted_contains_node_name(self):
+ self._push('/talker', 'use_sim_time', 'true')
+ lines = _drain_all({'talker': '32'}, {'talker': ''}, alert_style='inverted')
+ assert '/talker' in strip_ansi(lines[0])
+
+ def test_inverted_contains_param_name(self):
+ self._push('/talker', 'use_sim_time', 'true')
+ lines = _drain_all({'talker': '32'}, {'talker': ''}, alert_style='inverted')
+ assert 'use_sim_time' in strip_ansi(lines[0])
+
+ def test_inverted_contains_value(self):
+ self._push('/talker', 'use_sim_time', 'true')
+ lines = _drain_all({'talker': '32'}, {'talker': ''}, alert_style='inverted')
+ assert 'true' in strip_ansi(lines[0])
+
+ def test_inverted_tag_and_node_use_explicit_node_bg(self):
+ """Tag and node name together in one colored-bg island (_fg_to_bg of node code)."""
+ self._push('/talker', 'p', 'v')
+ lines = _drain_all({'talker': '32'}, {'talker': 'TLK'}, alert_style='inverted')
+ assert '[TLK]' in strip_ansi(lines[0])
+ # _fg_to_bg('32') == '42' → green bg + black text; returns to white bg (_WB) after
+ assert '\033[42;30m[TLK] /talker\033[107;30m' in lines[0]
+
+ def test_inverted_tag_hidden_when_show_tag_false(self):
+ self._push('/talker', 'p', 'v')
+ lines = _drain_all({'talker': '32'}, {'talker': 'TLK'},
+ show_tag=False, alert_style='inverted')
+ assert '[TLK]' not in strip_ansi(lines[0])
+
+ def test_inverted_no_param_bold_marker(self):
+ """Inverted block wraps everything; no standalone bold param marker needed."""
+ self._push('/talker', 'my_param', 'val')
+ lines = _drain_all({'talker': '32'}, {'talker': ''}, alert_style='inverted')
+ # Content must still contain the param name
+ assert 'my_param' in strip_ansi(lines[0])
+
+ def test_inverted_untracked_uses_explicit_white_bg(self):
+ """Untracked nodes (all mode) get the white bg strip; no reverse-video or color bg."""
+ self._push('/unknown', 'p', 'v')
+ lines = _drain_all({}, {}, alert_style='inverted')
+ assert '\033[107;30m' in lines[0] # white bg present
+ assert '\033[7m' not in lines[0] # no reverse-video at all
+ # No node-color bg escape (no fg→bg conversion applied for untracked)
+ import re
+ assert not re.search(r'\033\[4[0-9];30m', lines[0])
+
+ def test_inverted_block_reset_at_end(self):
+ self._push('/talker', 'p', 'v')
+ lines = _drain_all({'talker': '32'}, {'talker': ''}, alert_style='inverted')
+ assert '\033[0m' in lines[0]
+
+ def test_inverted_extends_to_eol_with_erase_sequence(self):
+ """\\033[K (erase to EOL) must be present to fill background to console edge."""
+ self._push('/talker', 'p', 'v')
+ lines = _drain_all({'talker': '32'}, {'talker': ''}, alert_style='inverted')
+ assert '\033[K' in lines[0]
+
+ def test_inverted_dendros_header_present(self):
+ self._push('/talker', 'p', 'v')
+ lines = _drain_all({'talker': '32'}, {'talker': ''}, alert_style='inverted')
+ assert '[dendROS]' in strip_ansi(lines[0])
+
+ def test_inverted_dendros_header_logo_colors(self):
+ """[dend on logo-blue bg, ROS] on logo-orange bg; black text (hollow/cutout)."""
+ self._push('/talker', 'p', 'v')
+ lines = _drain_all({'talker': '32'}, {'talker': ''}, alert_style='inverted')
+ assert '\033[48;2;0;75;107;1m' in lines[0] # logo-blue background
+ assert '\033[48;2;224;127;0;1m' in lines[0] # logo-orange background
+ # Black text applied before [dend (hollow cutout look)
+ assert '\033[48;2;0;75;107;1m\033[30m[dend' in lines[0]
+
+ def test_inverted_arrow_present(self):
+ self._push('/talker', 'p', 'v')
+ lines = _drain_all({'talker': '32'}, {'talker': ''}, alert_style='inverted')
+ assert '→' in strip_ansi(lines[0])
+
+ def test_inverted_same_content_as_inline(self):
+ """Both styles must expose the same semantic content in plain text."""
+ self._push('/talker', 'use_sim_time', 'true', old_val='false')
+ inline_lines = _drain_all({'talker': '32'}, {'talker': 'CTR'}, alert_style='inline')
+ self._push('/talker', 'use_sim_time', 'true', old_val='false')
+ inv_lines = _drain_all({'talker': '32'}, {'talker': 'CTR'}, alert_style='inverted')
+ assert strip_ansi(inline_lines[0]).strip() == strip_ansi(inv_lines[0]).strip()
+
+ def test_inverted_old_value_unknown_shows_question_mark(self):
+ self._push('/talker', 'p', 'new', old_val=None)
+ lines = _drain_all({'talker': '32'}, {'talker': ''}, alert_style='inverted')
+ assert '? →' in strip_ansi(lines[0])
+
+ def test_inverted_known_old_value_shown(self):
+ self._push('/talker', 'p', 'new', old_val='old')
+ lines = _drain_all({'talker': '32'}, {'talker': ''}, alert_style='inverted')
+ assert 'old → new' in strip_ansi(lines[0])
+
+
+# ── Global config defaults ────────────────────────────────────────────────────
+
+class TestGlobalConfigDefaults:
+
+ def test_param_change_alert_default_true(self):
+ from lib.global_config import DEFAULTS
+ assert DEFAULTS['param_change_alert'] is True
+
+ def test_param_change_alert_scope_default_tracked(self):
+ from lib.global_config import DEFAULTS
+ assert DEFAULTS['param_change_alert_scope'] == 'tracked'
+
+ def test_param_change_alert_style_default_inline(self):
+ from lib.global_config import DEFAULTS
+ assert DEFAULTS['param_change_alert_style'] == 'inline'
+
+ def test_all_keys_present_in_defaults(self):
+ from lib.global_config import DEFAULTS
+ assert 'param_change_alert' in DEFAULTS
+ assert 'param_change_alert_scope' in DEFAULTS
+ assert 'param_change_alert_style' in DEFAULTS