diff --git a/README.md b/README.md
index f3f1b76..2d91302 100644
--- a/README.md
+++ b/README.md
@@ -47,6 +47,9 @@ It also features some quality of life improvements for ROS outputs.
+- ### **All main `ros2` CLI commands wrapped**
+ `ros2 node list`, `ros2 node info`, `ros2 service list`, `ros2 action list`, `ros2 topic list`, `ros2 param list`, and `ros2 param describe` — all colorized with the same group colors and badges, no extra config required. [→ Full feature list](https://mlisi1.github.io/DendROS/node-list/#ros2--intercepted-subcommands)
+
- ### **One command to get started**
Too lazy to look up how DendROS config works? We got you covered: `dendros init` scans your launch files and generates an initial config for you
@@ -76,47 +79,7 @@ It also features some quality of life improvements for ROS outputs.
-- ### **```ros2 node list``` coloring**
- Brings color to your node lists.
-
-
🔔
diff --git a/docs/reference.md b/docs/reference.md
index 6a073b2..4ca98b9 100644
--- a/docs/reference.md
+++ b/docs/reference.md
@@ -69,6 +69,7 @@ Stored in `~/.config/dendROS/defaults.yaml`, managed via `dendros config`:
| `unmatched_tag` | `null` | Global badge for unmatched nodes. |
| `dim_unmatched` | `false` | Dim unmatched nodes globally. |
| `show_default_services` | `true` | When `false`, hide standard ROS 2 system services from `ros2 service list` output. |
+| `topic_sort` | `default` | `default` = ros2 alphabetical order; `group` = system topics first, then by publisher group. |
| `debug` | `false` | Print debug output on startup. |
| `config_merge` | `true` | Merge configs from included packages. |
| `colorize_launch_msgs` | `true` | Colorize lifecycle lines globally. |
@@ -101,6 +102,7 @@ The `ros2()` shell wrapper intercepts specific subcommands; everything else call
| `ros2 action list` | Output piped through `dendros_action_list.py` — actions colored by owning node. See [ros2 action list](action-list.md). |
| `ros2 param list` | Output piped through `dendros_param_list.py` — node headers colored, param lines dimmed. See [ros2 param list](param-list.md). |
| `ros2 param describe` | Output piped through `dendros_param_describe.py` — badge + colored param name, dim labels, bold section headers. See [ros2 param describe](param-describe.md). |
+| `ros2 topic list` | Output piped through `dendros_topic_list.py` — topics colored by primary publisher group, pub/sub count blocks, badge left. `-v` verbose: bold headers, dim types. See [ros2 topic list](topic-list.md). |
| Everything else | Passed directly to the real `ros2` binary, untouched. |
---
diff --git a/docs/topic-list.md b/docs/topic-list.md
new file mode 100644
index 0000000..7f65e71
--- /dev/null
+++ b/docs/topic-list.md
@@ -0,0 +1,121 @@
+# ros2 topic list Colorization
+
+When you run `ros2 topic list`, DendROS colors each topic with its primary publisher's group color, adds publisher and subscriber count indicators, and supports badge labels — the same colors and config you set up for `ros2 launch`. Running `ros2 topic list -v` gets minimal formatting: bold section headers and dimmed type annotations.
+
+---
+
+## What it looks like
+
+=== "Default sort"
+
+
+
+
+
+
+
+
+
+
+=== "Group sort"
+
+
+
+
+
+
+
+
+
+
+---
+
+## What gets colored
+
+### Topic name
+
+Each topic is colored with the group color of its **primary publisher** — the first publisher node reported by the ROS graph. If that node has a configured group, the topic name is rendered in that color. A group badge is shown to the **left** of the topic name when `show_tag_cli` is enabled and the group has a label.
+
+If a topic has no publishers or its publishers don't match any group, the topic falls back to `unmatched_color` (if set), dim (if `dim_unmatched` is true), or plain white.
+
+### Publisher count blocks
+
+To the **left** of the badge and topic name, DendROS shows one inverted-color count block per publisher group. Each block shows how many publishers of that group color are advertising this topic. Multiple groups produce multiple adjacent blocks. The blocks are right-aligned across all topics so names stay in a consistent column regardless of how many groups are publishing.
+
+### Subscriber count blocks
+
+To the **right** of the topic name (and type annotation if `-t` is used), DendROS shows subscriber count blocks using the same inverted-color format. Subscriber blocks are left-aligned across all topics so they start at a consistent column.
+
+### System topics
+
+`/parameter_events` and `/rosout` are always shown plain — no color, no badge, no count blocks. They are excluded from the graph query entirely. Their names are still aligned to the same column as other topics.
+
+### Type annotations (`-t`)
+
+Running `ros2 topic list -t` appends a type annotation to each entry:
+
+```
+/chatter [std_msgs/msg/String]
+```
+
+The type content inside the brackets is always **dimmed**, keeping focus on the topic name. Subscriber count blocks appear to the right of the type annotation.
+
+---
+
+## Verbose mode (`-v`)
+
+Running `ros2 topic list -v` produces a structured output with `Published topics:` and `Subscribed topics:` sections. DendROS detects this format automatically and applies minimal formatting:
+
+- **Section headers** (`Published topics:`, `Subscribed topics:`) are rendered **bold**
+- **Type annotations** inside `[...]` are **dimmed**
+- Topic names, bullet points, and publisher/subscriber counts are left unchanged
+
+No group color coding is applied in verbose mode — the structured format already groups topics by role.
+
+---
+
+## Sorting (`topic_sort`)
+
+By default, topics appear in the order `ros2` reports them (alphabetical).
+
+Setting `topic_sort: group` reorders topics:
+
+1. **System topics first** — `/parameter_events` and `/rosout` always appear at the top
+2. **Matched topics grouped by color** — groups appear in first-occurrence order; topics are alphabetical within each group
+3. **Unmatched topics last** — alphabetical
+
+Empty lines from the original output are dropped in group sort mode.
+
+```yaml
+# ~/.config/dendROS/defaults.yaml
+topic_sort: group # default | group
+```
+
+---
+
+## Badge and style options
+
+| Setting | Effect on topic list |
+|---|---|
+| `show_tag_cli: true` | `[LOC] /topic_name` — badge always to the left of the topic name |
+| `tag_style: inverted` | Badge rendered with colored background |
+| Per-group `show_tag: false` | Badge suppressed for that group only |
+| `unmatched_color` | Topics with no matching publisher group shown in the fallback color |
+| `unmatched_tag` | Badge shown next to unmatched topics (requires `unmatched_color`) |
+| `dim_unmatched` | Topics with no matching publisher group dimmed (only when `unmatched_color: null`) |
+| `topic_sort` | `default` = ros2 order (alphabetical); `group` = system first, then by publisher group |
+
diff --git a/install.sh b/install.sh
index eeac30b..079a742 100755
--- a/install.sh
+++ b/install.sh
@@ -52,6 +52,7 @@ $SUDO cp "${SCRIPT_DIR}/dendROS/dendros_node_info.py" "$INSTALL_DIR/"
$SUDO cp "${SCRIPT_DIR}/dendROS/dendros_node_list.py" "$INSTALL_DIR/"
$SUDO cp "${SCRIPT_DIR}/dendROS/dendros_service_list.py" "$INSTALL_DIR/"
$SUDO cp "${SCRIPT_DIR}/dendROS/dendros_action_list.py" "$INSTALL_DIR/"
+$SUDO cp "${SCRIPT_DIR}/dendROS/dendros_topic_list.py" "$INSTALL_DIR/"
$SUDO cp "${SCRIPT_DIR}/dendROS/dendros_param_list.py" "$INSTALL_DIR/"
$SUDO cp "${SCRIPT_DIR}/dendROS/dendros_param_describe.py" "$INSTALL_DIR/"
$SUDO cp "${SCRIPT_DIR}/dendROS/dendROS.sh" "$INSTALL_DIR/"
@@ -63,6 +64,7 @@ $SUDO chmod +x "$INSTALL_DIR/dendros_node_list.py"
$SUDO chmod +x "$INSTALL_DIR/dendros_node_info.py"
$SUDO chmod +x "$INSTALL_DIR/dendros_service_list.py"
$SUDO chmod +x "$INSTALL_DIR/dendros_action_list.py"
+$SUDO chmod +x "$INSTALL_DIR/dendros_topic_list.py"
$SUDO chmod +x "$INSTALL_DIR/dendros_param_list.py"
$SUDO chmod +x "$INSTALL_DIR/dendros_param_describe.py"
$SUDO chmod 644 "$INSTALL_DIR/dendROS.sh"
diff --git a/mkdocs.yml b/mkdocs.yml
index 2874ae2..e520a89 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -80,6 +80,7 @@ nav:
- ros2 node info: node-info.md
- ros2 service list: service-list.md
- ros2 action list: action-list.md
+ - ros2 topic list: topic-list.md
- ros2 param list: param-list.md
- ros2 param describe: param-describe.md
- Tools:
diff --git a/test/unit/test_global_config.py b/test/unit/test_global_config.py
index e4e4f70..cf06ef0 100644
--- a/test/unit/test_global_config.py
+++ b/test/unit/test_global_config.py
@@ -90,6 +90,7 @@ def test_loads_existing_file(self, tmp_config):
"traceback_color": "red",
"tag_style": "inverted",
"show_default_services": False,
+ "topic_sort": "default",
"param_change_alert": True,
"param_change_alert_scope": "all",
"param_change_alert_style": "inverted",
@@ -175,6 +176,7 @@ def test_roundtrip_custom_values(self, tmp_config):
"traceback_color": "off",
"tag_style": "inverted",
"show_default_services": False,
+ "topic_sort": "group",
"param_change_alert": True,
"param_change_alert_scope": "all",
"param_change_alert_style": "inverted",
diff --git a/test/unit/test_topic_list.py b/test/unit/test_topic_list.py
new file mode 100644
index 0000000..f7db608
--- /dev/null
+++ b/test/unit/test_topic_list.py
@@ -0,0 +1,696 @@
+"""Tests for dendros_topic_list.py colorization."""
+
+import json
+import os
+import sys
+import subprocess
+
+import pytest
+import yaml
+
+REPO_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
+TOPIC_LIST_PATH = os.path.join(REPO_ROOT, 'dendROS', 'dendros_topic_list.py')
+
+from conftest import assert_segment_colored, assert_segment_uncolored, colored_segments, strip_ansi
+
+
+# ── Helper ────────────────────────────────────────────────────────────────────
+
+def run_topic_list(tmp_prefix, topics, global_cfg=None, node_colors=None,
+ pub_nodes=None, sub_nodes=None, timeout=10):
+ """Run dendros_topic_list.py with topic names as stdin; return (stdout, stderr, rc)."""
+ env = os.environ.copy()
+ env['AMENT_PREFIX_PATH'] = tmp_prefix
+ env.pop('ROS_DISTRO', None)
+ env['HOME'] = tmp_prefix
+
+ cfg_dir = os.path.join(tmp_prefix, '.config', 'dendROS')
+ os.makedirs(cfg_dir, exist_ok=True)
+
+ if global_cfg:
+ with open(os.path.join(cfg_dir, 'defaults.yaml'), 'w') as f:
+ yaml.dump(global_cfg, f)
+ if node_colors:
+ with open(os.path.join(cfg_dir, 'node_colors.yaml'), 'w') as f:
+ yaml.dump(node_colors, f)
+
+ if pub_nodes is not None:
+ env['DENDROS_TOPIC_PUBLISHERS'] = json.dumps(pub_nodes)
+ if sub_nodes is not None:
+ env['DENDROS_TOPIC_SUBSCRIBERS'] = json.dumps(sub_nodes)
+
+ stdin = '\n'.join(topics) + '\n'
+ result = subprocess.run(
+ [sys.executable, TOPIC_LIST_PATH],
+ input=stdin.encode(),
+ capture_output=True,
+ env=env,
+ timeout=timeout,
+ )
+ return result.stdout.decode(), result.stderr.decode(), result.returncode
+
+
+def _line_for(stdout, topic):
+ """Return the output line containing `topic`."""
+ return next(l for l in stdout.splitlines() if topic in l)
+
+
+def _name_col(line, topic):
+ """Column of `topic` in the plain-text version of `line`."""
+ return strip_ansi(line).index(topic)
+
+
+# ── Publisher color resolution ────────────────────────────────────────────────
+
+class TestTopicListPublisherColor:
+ """Topic is colored with the primary publisher node's group color."""
+
+ def test_topic_colored_by_publisher(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {'talker': ''}, 'style_map': {}}
+ stdout, _, _ = run_topic_list(str(tmp_path), ['/chatter'],
+ node_colors=nc,
+ pub_nodes={'/chatter': ['talker']},
+ sub_nodes={'/chatter': []})
+ assert_segment_colored(stdout, '/chatter', '32')
+
+ def test_no_publisher_no_color(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {'talker': ''}, 'style_map': {}}
+ stdout, _, _ = run_topic_list(str(tmp_path), ['/chatter'],
+ node_colors=nc,
+ pub_nodes={'/chatter': []},
+ sub_nodes={'/chatter': []})
+ assert_segment_uncolored(stdout, '/chatter')
+
+ def test_primary_publisher_wins_color(self, tmp_path):
+ nc = {'color_map': {'talker': '32', 'other': '33'}, 'tag_map': {}, 'style_map': {}}
+ stdout, _, _ = run_topic_list(str(tmp_path), ['/chatter'],
+ node_colors=nc,
+ pub_nodes={'/chatter': ['talker', 'other']},
+ sub_nodes={'/chatter': []})
+ assert_segment_colored(stdout, '/chatter', '32')
+
+ def test_wildcard_publisher_color(self, tmp_path):
+ nc = {'color_map': {'nav2_*': '35'}, 'tag_map': {}, 'style_map': {}}
+ stdout, _, _ = run_topic_list(str(tmp_path), ['/plan'],
+ node_colors=nc,
+ pub_nodes={'/plan': ['nav2_planner']},
+ sub_nodes={'/plan': []})
+ assert_segment_colored(stdout, '/plan', '35')
+
+ def test_empty_lines_preserved(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {}, 'style_map': {}}
+ stdout, _, _ = run_topic_list(str(tmp_path), ['/chatter', '', '/other'],
+ node_colors=nc,
+ pub_nodes={'/chatter': ['talker'], '/other': []},
+ sub_nodes={'/chatter': [], '/other': []})
+ assert '' in stdout.splitlines()
+
+ def test_output_indented(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {}, 'style_map': {}}
+ stdout, _, _ = run_topic_list(str(tmp_path), ['/chatter'],
+ node_colors=nc,
+ pub_nodes={'/chatter': ['talker']},
+ sub_nodes={'/chatter': []})
+ for line in stdout.splitlines():
+ if line:
+ assert line.startswith(' '), f"Expected 2-space indent, got: {line!r}"
+
+
+# ── Column alignment ──────────────────────────────────────────────────────────
+
+class TestTopicListAlignment:
+ """Topic names start at the same column regardless of pub block count."""
+
+ def test_names_aligned_across_different_pub_counts(self, tmp_path):
+ nc = {'color_map': {'talker': '32', 'nav2_*': '34'},
+ 'tag_map': {}, 'style_map': {}}
+ # /chatter: 1 pub group → 1 block; /plan: 2 pub groups → "1 1"
+ stdout, _, _ = run_topic_list(
+ str(tmp_path), ['/chatter', '/plan'],
+ node_colors=nc,
+ pub_nodes={'/chatter': ['talker'],
+ '/plan': ['nav2_planner', 'talker']},
+ sub_nodes={'/chatter': [], '/plan': []},
+ )
+ col_chatter = _name_col(_line_for(stdout, '/chatter'), '/chatter')
+ col_plan = _name_col(_line_for(stdout, '/plan'), '/plan')
+ assert col_chatter == col_plan
+
+ def test_sub_blocks_aligned_across_lines(self, tmp_path):
+ nc = {'color_map': {'talker': '32', 'listener': '33'},
+ 'tag_map': {}, 'style_map': {}}
+ # /chatter: short name; /longer_topic: long name — subs still start at same column
+ stdout, _, _ = run_topic_list(
+ str(tmp_path), ['/chatter', '/longer_topic'],
+ node_colors=nc,
+ pub_nodes={'/chatter': ['talker'],
+ '/longer_topic': ['talker']},
+ sub_nodes={'/chatter': ['listener'],
+ '/longer_topic': ['listener']},
+ )
+ # Find the byte offset of the sub block in each plain-text line
+ def sub_col(line):
+ plain = strip_ansi(line)
+ # sub block follows the topic name; find the digit after topic+padding
+ topic_end = plain.rstrip().rfind('1') # count block digit
+ return topic_end
+
+ chatter_sub = sub_col(_line_for(stdout, '/chatter'))
+ longer_sub = sub_col(_line_for(stdout, '/longer_topic'))
+ assert chatter_sub == longer_sub
+
+ def test_system_topic_name_at_same_column_as_others(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {}, 'style_map': {}}
+ stdout, _, _ = run_topic_list(
+ str(tmp_path), ['/chatter', '/parameter_events'],
+ node_colors=nc,
+ pub_nodes={'/chatter': ['talker']},
+ sub_nodes={'/chatter': []},
+ )
+ col_chatter = _name_col(_line_for(stdout, '/chatter'), '/chatter')
+ col_pe = _name_col(_line_for(stdout, '/parameter_events'), '/parameter_events')
+ assert col_chatter == col_pe
+
+ def test_no_pub_topic_name_at_same_column(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {}, 'style_map': {}}
+ # /scan has pubs, /cmd_vel has none — both names should start at same column
+ stdout, _, _ = run_topic_list(
+ str(tmp_path), ['/scan', '/cmd_vel'],
+ node_colors=nc,
+ pub_nodes={'/scan': ['talker'], '/cmd_vel': []},
+ sub_nodes={'/scan': [], '/cmd_vel': []},
+ )
+ col_scan = _name_col(_line_for(stdout, '/scan'), '/scan')
+ col_cmd_vel = _name_col(_line_for(stdout, '/cmd_vel'), '/cmd_vel')
+ assert col_scan == col_cmd_vel
+
+
+# ── System topics ─────────────────────────────────────────────────────────────
+
+class TestSystemTopics:
+ """/parameter_events and /rosout shown plain — no color, tag, or count blocks."""
+
+ def test_parameter_events_no_ansi(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {'talker': 'TLK'}, 'style_map': {}}
+ stdout, _, _ = run_topic_list(str(tmp_path),
+ ['/chatter', '/parameter_events'],
+ node_colors=nc,
+ pub_nodes={'/chatter': ['talker']},
+ sub_nodes={'/chatter': []})
+ pe_line = _line_for(stdout, '/parameter_events')
+ assert '\033[' not in pe_line
+
+ def test_rosout_no_ansi(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {}, 'style_map': {}}
+ stdout, _, _ = run_topic_list(str(tmp_path), ['/chatter', '/rosout'],
+ node_colors=nc,
+ pub_nodes={'/chatter': ['talker']},
+ sub_nodes={'/chatter': []})
+ rosout_line = _line_for(stdout, '/rosout')
+ assert '\033[' not in rosout_line
+
+ def test_system_topic_no_tag(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {'talker': 'TLK'}, 'style_map': {}}
+ stdout, _, _ = run_topic_list(str(tmp_path), ['/parameter_events'],
+ node_colors=nc,
+ pub_nodes={}, sub_nodes={})
+ assert '[TLK]' not in stdout
+
+ def test_system_topic_no_count_blocks(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {}, 'style_map': {}}
+ # System topics are excluded from graph queries; no count blocks appear
+ stdout, _, _ = run_topic_list(str(tmp_path), ['/rosout'],
+ node_colors=nc,
+ pub_nodes={}, sub_nodes={})
+ assert ';7m' not in stdout
+
+ def test_system_topic_still_appears(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {}, 'style_map': {}}
+ stdout, _, _ = run_topic_list(str(tmp_path), ['/chatter', '/rosout'],
+ node_colors=nc,
+ pub_nodes={'/chatter': ['talker']},
+ sub_nodes={'/chatter': []})
+ assert '/rosout' in stdout
+
+ def test_system_topic_indented(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {}, 'style_map': {}}
+ stdout, _, _ = run_topic_list(str(tmp_path), ['/parameter_events'],
+ node_colors=nc,
+ pub_nodes={}, sub_nodes={})
+ pe_line = _line_for(stdout, '/parameter_events')
+ assert pe_line.startswith(' ')
+
+ def test_system_topic_type_dimmed_with_t_flag(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {}, 'style_map': {}}
+ line = '/parameter_events [rcl_interfaces/msg/ParameterEvent]'
+ stdout, _, _ = run_topic_list(str(tmp_path), [line],
+ node_colors=nc,
+ pub_nodes={}, sub_nodes={})
+ # Type should still be dim even for system topics
+ assert '[\033[2mrcl_interfaces/msg/ParameterEvent\033[0m]' in stdout
+ # But the topic name itself has no color code
+ pe_line = _line_for(stdout, '/parameter_events')
+ assert not pe_line.startswith('\033[')
+
+
+# ── Count indicators ──────────────────────────────────────────────────────────
+
+class TestTopicListCounts:
+ """Publisher count blocks on left; subscriber count blocks on right."""
+
+ def test_single_pub_block_on_left(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {}, 'style_map': {}}
+ stdout, _, _ = run_topic_list(str(tmp_path), ['/chatter'],
+ node_colors=nc,
+ pub_nodes={'/chatter': ['talker']},
+ sub_nodes={'/chatter': []})
+ pub_block = '\033[32;7m1\033[0m'
+ assert pub_block in stdout
+ assert stdout.index(pub_block) < stdout.index('\033[32m/chatter\033[0m')
+
+ def test_single_sub_block_on_right(self, tmp_path):
+ nc = {'color_map': {'talker': '32', 'listener': '33'},
+ 'tag_map': {}, 'style_map': {}}
+ stdout, _, _ = run_topic_list(str(tmp_path), ['/chatter'],
+ node_colors=nc,
+ pub_nodes={'/chatter': ['talker']},
+ sub_nodes={'/chatter': ['listener']})
+ sub_block = '\033[33;7m1\033[0m'
+ assert sub_block in stdout
+ assert stdout.index('\033[32m/chatter\033[0m') < stdout.index(sub_block)
+
+ def test_multiple_pub_same_group_summed(self, tmp_path):
+ nc = {'color_map': {'talker*': '32'}, 'tag_map': {}, 'style_map': {}}
+ stdout, _, _ = run_topic_list(str(tmp_path), ['/chatter'],
+ node_colors=nc,
+ pub_nodes={'/chatter': ['talker', 'talker2']},
+ sub_nodes={'/chatter': []})
+ assert '\033[32;7m2\033[0m' in stdout
+
+ def test_multiple_pub_different_groups(self, tmp_path):
+ nc = {'color_map': {'loc_node': '34', 'nav_node': '35'},
+ 'tag_map': {}, 'style_map': {}}
+ stdout, _, _ = run_topic_list(str(tmp_path), ['/scan'],
+ node_colors=nc,
+ pub_nodes={'/scan': ['loc_node', 'nav_node']},
+ sub_nodes={'/scan': []})
+ assert '\033[34;7m1\033[0m' in stdout
+ assert '\033[35;7m1\033[0m' in stdout
+
+ def test_multiple_sub_different_groups(self, tmp_path):
+ nc = {'color_map': {'talker': '32', 'loc': '34', 'nav': '35'},
+ 'tag_map': {}, 'style_map': {}}
+ stdout, _, _ = run_topic_list(str(tmp_path), ['/scan'],
+ node_colors=nc,
+ pub_nodes={'/scan': ['talker']},
+ sub_nodes={'/scan': ['loc', 'nav']})
+ assert '\033[34;7m1\033[0m' in stdout
+ assert '\033[35;7m1\033[0m' in stdout
+
+ def test_no_sub_no_trailing_blocks(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {}, 'style_map': {}}
+ stdout, _, _ = run_topic_list(str(tmp_path), ['/chatter'],
+ node_colors=nc,
+ pub_nodes={'/chatter': ['talker']},
+ sub_nodes={'/chatter': []})
+ line = _line_for(stdout, '/chatter')
+ plain = strip_ansi(line)
+ after = plain[plain.index('/chatter') + len('/chatter'):]
+ assert after.strip() == ''
+
+ def test_unmatched_publisher_no_pub_block(self, tmp_path):
+ nc = {'color_map': {'known': '34'}, 'tag_map': {}, 'style_map': {}}
+ stdout, _, _ = run_topic_list(str(tmp_path), ['/chatter'],
+ node_colors=nc,
+ pub_nodes={'/chatter': ['unknown_publisher']},
+ sub_nodes={'/chatter': []})
+ assert ';7m' not in stdout
+
+
+# ── Tag badge ─────────────────────────────────────────────────────────────────
+
+class TestTopicListTag:
+ """Tag badge appears between pub count blocks and the topic name."""
+
+ def test_tag_shown_left_of_topic(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {'talker': 'TLK'}, 'style_map': {}}
+ stdout, _, _ = run_topic_list(str(tmp_path), ['/chatter'],
+ node_colors=nc,
+ pub_nodes={'/chatter': ['talker']},
+ sub_nodes={'/chatter': []})
+ assert '[TLK]' in stdout
+ line = _line_for(stdout, '/chatter')
+ assert line.index('[TLK]') < line.index('/chatter')
+
+ def test_tag_between_pub_block_and_topic(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {'talker': 'TLK'}, 'style_map': {}}
+ stdout, _, _ = run_topic_list(str(tmp_path), ['/chatter'],
+ node_colors=nc,
+ pub_nodes={'/chatter': ['talker']},
+ sub_nodes={'/chatter': []})
+ pub_block = '\033[32;7m1\033[0m'
+ topic_colored = '\033[32m/chatter\033[0m'
+ assert pub_block in stdout
+ assert stdout.index(pub_block) < stdout.index('[TLK]')
+ assert stdout.index('[TLK]') < stdout.index(topic_colored)
+
+ def test_tag_hidden_when_show_tag_cli_false(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {'talker': 'TLK'}, 'style_map': {}}
+ stdout, _, _ = run_topic_list(str(tmp_path), ['/chatter'],
+ node_colors=nc,
+ pub_nodes={'/chatter': ['talker']},
+ sub_nodes={'/chatter': []},
+ global_cfg={'show_tag_cli': False})
+ assert '[TLK]' not in stdout
+
+ def test_inverted_tag_style(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {'talker': 'TLK'},
+ 'style_map': {'talker': 'inverted'}}
+ stdout, _, _ = run_topic_list(str(tmp_path), ['/chatter'],
+ node_colors=nc,
+ pub_nodes={'/chatter': ['talker']},
+ sub_nodes={'/chatter': []})
+ assert '\033[32;7m[TLK]' in stdout
+
+ def test_empty_label_no_badge(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {'talker': ''}, 'style_map': {}}
+ stdout, _, _ = run_topic_list(str(tmp_path), ['/chatter'],
+ node_colors=nc,
+ pub_nodes={'/chatter': ['talker']},
+ sub_nodes={'/chatter': []})
+ assert '[]' not in strip_ansi(stdout)
+
+ def test_unmatched_tag(self, tmp_path):
+ nc = {'color_map': {}, 'tag_map': {}, 'style_map': {}}
+ stdout, _, _ = run_topic_list(str(tmp_path), ['/chatter'],
+ node_colors=nc,
+ pub_nodes={'/chatter': []},
+ sub_nodes={'/chatter': []},
+ global_cfg={'unmatched_color': 'white',
+ 'unmatched_tag': '?'})
+ assert '[?]' in stdout
+
+ def test_tag_width_counted_in_alignment(self, tmp_path):
+ """Badge width is included in mid column, so sub blocks stay aligned."""
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {'talker': 'TLK'}, 'style_map': {}}
+ stdout, _, _ = run_topic_list(
+ str(tmp_path), ['/chatter', '/scan'],
+ node_colors=nc,
+ pub_nodes={'/chatter': ['talker'], '/scan': ['talker']},
+ sub_nodes={'/chatter': ['talker'], '/scan': ['talker']},
+ )
+ chatter_sub = strip_ansi(_line_for(stdout, '/chatter')).rstrip().rfind('1')
+ scan_sub = strip_ansi(_line_for(stdout, '/scan')).rstrip().rfind('1')
+ assert chatter_sub == scan_sub
+
+
+# ── -t flag: type annotation dimmed ──────────────────────────────────────────
+
+class TestTopicListTypeFlag:
+
+ def test_type_dimmed_matched_topic(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {}, 'style_map': {}}
+ line = '/chatter [std_msgs/msg/String]'
+ stdout, _, _ = run_topic_list(str(tmp_path), [line],
+ node_colors=nc,
+ pub_nodes={'/chatter': ['talker']},
+ sub_nodes={'/chatter': []})
+ assert_segment_colored(stdout, '/chatter', '32')
+ assert '[\033[2mstd_msgs/msg/String\033[0m]' in stdout
+
+ def test_type_dimmed_unmatched_topic(self, tmp_path):
+ nc = {'color_map': {}, 'tag_map': {}, 'style_map': {}}
+ line = '/unknown [std_msgs/msg/String]'
+ stdout, _, _ = run_topic_list(str(tmp_path), [line],
+ node_colors=nc,
+ pub_nodes={'/unknown': []},
+ sub_nodes={'/unknown': []})
+ assert '[\033[2mstd_msgs/msg/String\033[0m]' in stdout
+
+ def test_type_not_colored_with_node_color(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {}, 'style_map': {}}
+ line = '/chatter [std_msgs/msg/String]'
+ stdout, _, _ = run_topic_list(str(tmp_path), [line],
+ node_colors=nc,
+ pub_nodes={'/chatter': ['talker']},
+ sub_nodes={'/chatter': []})
+ assert '\033[32mstd_msgs' not in stdout
+
+ def test_sub_blocks_after_type(self, tmp_path):
+ nc = {'color_map': {'talker': '32', 'listener': '33'}, 'tag_map': {}, 'style_map': {}}
+ line = '/chatter [std_msgs/msg/String]'
+ stdout, _, _ = run_topic_list(str(tmp_path), [line],
+ node_colors=nc,
+ pub_nodes={'/chatter': ['talker']},
+ sub_nodes={'/chatter': ['listener']})
+ type_bracket = '[\033[2mstd_msgs/msg/String\033[0m]'
+ sub_block = '\033[33;7m1\033[0m'
+ assert stdout.index(type_bracket) < stdout.index(sub_block)
+
+
+# ── Unmatched / dim_unmatched ─────────────────────────────────────────────────
+
+class TestTopicListUnmatched:
+
+ def test_unmatched_color_applied(self, tmp_path):
+ nc = {'color_map': {}, 'tag_map': {}, 'style_map': {}}
+ stdout, _, _ = run_topic_list(str(tmp_path), ['/unknown'],
+ node_colors=nc,
+ pub_nodes={'/unknown': []},
+ sub_nodes={'/unknown': []},
+ global_cfg={'unmatched_color': 'cyan'})
+ assert '\033[' in stdout
+
+ def test_dim_unmatched(self, tmp_path):
+ nc = {'color_map': {}, 'tag_map': {}, 'style_map': {}}
+ stdout, _, _ = run_topic_list(str(tmp_path), ['/unknown'],
+ node_colors=nc,
+ pub_nodes={'/unknown': []},
+ sub_nodes={'/unknown': []},
+ global_cfg={'dim_unmatched': True})
+ assert '\033[2m/unknown\033[0m' in stdout
+
+ def test_passthrough_when_no_match(self, tmp_path):
+ nc = {'color_map': {'known': '34'}, 'tag_map': {}, 'style_map': {}}
+ stdout, _, _ = run_topic_list(str(tmp_path), ['/unknown'],
+ node_colors=nc,
+ pub_nodes={'/unknown': []},
+ sub_nodes={'/unknown': []})
+ assert_segment_uncolored(stdout, '/unknown')
+
+ def test_passthrough_no_config(self, tmp_path):
+ stdout, _, _ = run_topic_list(str(tmp_path), ['/chatter'],
+ pub_nodes={'/chatter': []},
+ sub_nodes={'/chatter': []})
+ assert '/chatter' in stdout
+ assert '\033[' not in stdout
+
+ def test_dim_unmatched_with_type(self, tmp_path):
+ nc = {'color_map': {}, 'tag_map': {}, 'style_map': {}}
+ line = '/unknown [std_msgs/msg/String]'
+ stdout, _, _ = run_topic_list(str(tmp_path), [line],
+ node_colors=nc,
+ pub_nodes={'/unknown': []},
+ sub_nodes={'/unknown': []},
+ global_cfg={'dim_unmatched': True})
+ assert '\033[2m/unknown\033[0m' in stdout
+ assert '[\033[2mstd_msgs/msg/String\033[0m]' in stdout
+
+
+# ── topic list -v (verbose) ───────────────────────────────────────────────────
+
+class TestTopicListVerbose:
+
+ def _run(self, tmp_path, lines):
+ return run_topic_list(str(tmp_path), lines)
+
+ def test_section_headers_bold(self, tmp_path):
+ stdout, _, _ = self._run(tmp_path, [
+ 'Published topics:',
+ ' * /chatter [std_msgs/msg/String] 1 publisher',
+ ])
+ assert '\033[1mPublished topics:\033[0m' in stdout
+
+ def test_subscribed_header_bold(self, tmp_path):
+ stdout, _, _ = self._run(tmp_path, [
+ 'Subscribed topics:',
+ ' * /chatter [std_msgs/msg/String] 1 subscriber',
+ ])
+ assert '\033[1mSubscribed topics:\033[0m' in stdout
+
+ def test_type_dimmed(self, tmp_path):
+ stdout, _, _ = self._run(tmp_path, [
+ 'Published topics:',
+ ' * /chatter [std_msgs/msg/String] 1 publisher',
+ ])
+ assert '[\033[2mstd_msgs/msg/String\033[0m]' in stdout
+
+ def test_topic_name_uncolored(self, tmp_path):
+ stdout, _, _ = self._run(tmp_path, [
+ 'Published topics:',
+ ' * /chatter [std_msgs/msg/String] 1 publisher',
+ ])
+ line = next(l for l in stdout.splitlines() if '/chatter' in l)
+ plain = strip_ansi(line)
+ assert plain.startswith(' * /chatter')
+
+ def test_count_preserved(self, tmp_path):
+ stdout, _, _ = self._run(tmp_path, [
+ 'Published topics:',
+ ' * /chatter [std_msgs/msg/String] 3 publishers',
+ ])
+ assert '3 publishers' in strip_ansi(stdout)
+
+ def test_empty_lines_pass_through(self, tmp_path):
+ stdout, _, _ = self._run(tmp_path, [
+ 'Published topics:',
+ ' * /chatter [std_msgs/msg/String] 1 publisher',
+ '',
+ 'Subscribed topics:',
+ ' * /chatter [std_msgs/msg/String] 1 subscriber',
+ ])
+ lines = stdout.splitlines()
+ assert '' in lines
+
+ def test_multiple_topics(self, tmp_path):
+ stdout, _, _ = self._run(tmp_path, [
+ 'Published topics:',
+ ' * /chatter [std_msgs/msg/String] 1 publisher',
+ ' * /scan [sensor_msgs/msg/LaserScan] 1 publisher',
+ ])
+ assert '[\033[2mstd_msgs/msg/String\033[0m]' in stdout
+ assert '[\033[2msensor_msgs/msg/LaserScan\033[0m]' in stdout
+
+
+# ── AMENT_PREFIX_PATH fallback ────────────────────────────────────────────────
+
+class TestTopicListFallback:
+
+ def test_fallback_scan_loads_config(self, tmp_path):
+ cfg_dir = tmp_path / 'share' / 'my_pkg' / 'config'
+ cfg_dir.mkdir(parents=True)
+ (cfg_dir / 'dendROS.yaml').write_text(yaml.dump({
+ 'groups': {'loc': {'color': 'bold blue', 'label': 'LOC', 'nodes': ['amcl']}}
+ }))
+ stdout, _, _ = run_topic_list(str(tmp_path), ['/scan'],
+ pub_nodes={'/scan': ['amcl']},
+ sub_nodes={'/scan': []})
+ assert '\033[' in stdout
+
+
+# ── topic_sort ────────────────────────────────────────────────────────────────
+
+class TestTopicListSort:
+ """topic_sort=group: system topics first, then by publisher group, then unmatched."""
+
+ # Helper that returns the plain-text order of topic names in stdout
+ @staticmethod
+ def _order(stdout):
+ result = []
+ for line in stdout.splitlines():
+ plain = strip_ansi(line).strip()
+ if not plain:
+ continue
+ # Topic name is the first token starting with '/'
+ # (pub count blocks and badge come before it)
+ for token in plain.split():
+ if token.startswith('/'):
+ result.append(token)
+ break
+ return result
+
+ def test_default_sort_preserves_ros2_order(self, tmp_path):
+ nc = {'color_map': {'talker': '32', 'nav': '34'}, 'tag_map': {}, 'style_map': {}}
+ topics = ['/zebra', '/alpha', '/middle']
+ stdout, _, _ = run_topic_list(str(tmp_path), topics,
+ node_colors=nc,
+ pub_nodes={'/zebra': ['talker'], '/alpha': ['nav'],
+ '/middle': ['talker']},
+ sub_nodes={t: [] for t in topics})
+ order = self._order(stdout)
+ assert order == ['/zebra', '/alpha', '/middle']
+
+ def test_group_sort_topics_grouped_by_color(self, tmp_path):
+ nc = {'color_map': {'loc': '34', 'nav': '35'}, 'tag_map': {}, 'style_map': {}}
+ # Input order: nav topic, then loc topic, then another nav topic
+ topics = ['/cmd_vel', '/scan', '/plan']
+ pub = {'/cmd_vel': ['nav'], '/scan': ['loc'], '/plan': ['nav']}
+ stdout, _, _ = run_topic_list(str(tmp_path), topics,
+ node_colors=nc,
+ pub_nodes=pub,
+ sub_nodes={t: [] for t in topics},
+ global_cfg={'topic_sort': 'group'})
+ order = self._order(stdout)
+ # /cmd_vel and /plan (nav group, first seen) before /scan (loc group)
+ cmd_i = order.index('/cmd_vel')
+ plan_i = order.index('/plan')
+ scan_i = order.index('/scan')
+ assert cmd_i < scan_i and plan_i < scan_i
+
+ def test_group_sort_alphabetical_within_group(self, tmp_path):
+ nc = {'color_map': {'nav': '35'}, 'tag_map': {}, 'style_map': {}}
+ topics = ['/zebra', '/alpha', '/middle']
+ pub = {t: ['nav'] for t in topics}
+ stdout, _, _ = run_topic_list(str(tmp_path), topics,
+ node_colors=nc,
+ pub_nodes=pub,
+ sub_nodes={t: [] for t in topics},
+ global_cfg={'topic_sort': 'group'})
+ order = self._order(stdout)
+ assert order == ['/alpha', '/middle', '/zebra']
+
+ def test_group_sort_system_topics_first(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {}, 'style_map': {}}
+ topics = ['/chatter', '/parameter_events', '/scan', '/rosout']
+ stdout, _, _ = run_topic_list(str(tmp_path), topics,
+ node_colors=nc,
+ pub_nodes={'/chatter': ['talker'], '/scan': ['talker']},
+ sub_nodes={t: [] for t in topics
+ if t not in ('/parameter_events', '/rosout')},
+ global_cfg={'topic_sort': 'group'})
+ order = self._order(stdout)
+ # /parameter_events and /rosout come before any application topic
+ app_indices = [order.index(t) for t in ['/chatter', '/scan'] if t in order]
+ sys_indices = [order.index(t) for t in ['/parameter_events', '/rosout'] if t in order]
+ assert all(s < a for s in sys_indices for a in app_indices)
+
+ def test_group_sort_unmatched_topics_last(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {}, 'style_map': {}}
+ topics = ['/unknown', '/chatter']
+ pub = {'/unknown': [], '/chatter': ['talker']}
+ stdout, _, _ = run_topic_list(str(tmp_path), topics,
+ node_colors=nc,
+ pub_nodes=pub,
+ sub_nodes={t: [] for t in topics},
+ global_cfg={'topic_sort': 'group'})
+ order = self._order(stdout)
+ assert order.index('/chatter') < order.index('/unknown')
+
+ def test_group_sort_empty_lines_dropped(self, tmp_path):
+ nc = {'color_map': {'talker': '32'}, 'tag_map': {}, 'style_map': {}}
+ topics = ['/chatter', '', '/scan']
+ pub = {'/chatter': ['talker'], '/scan': ['talker']}
+ stdout, _, _ = run_topic_list(str(tmp_path), topics,
+ node_colors=nc,
+ pub_nodes=pub,
+ sub_nodes={'/chatter': [], '/scan': []},
+ global_cfg={'topic_sort': 'group'})
+ # In group mode, empty lines are dropped
+ non_empty = [l for l in stdout.splitlines() if l]
+ assert len(non_empty) == 2
+
+ def test_group_sort_groups_in_first_occurrence_order(self, tmp_path):
+ nc = {'color_map': {'nav': '35', 'loc': '34'}, 'tag_map': {}, 'style_map': {}}
+ # nav topic appears first in input → nav group comes first in sorted output
+ topics = ['/cmd_vel', '/scan', '/plan']
+ pub = {'/cmd_vel': ['nav'], '/scan': ['loc'], '/plan': ['nav']}
+ stdout, _, _ = run_topic_list(str(tmp_path), topics,
+ node_colors=nc,
+ pub_nodes=pub,
+ sub_nodes={t: [] for t in topics},
+ global_cfg={'topic_sort': 'group'})
+ order = self._order(stdout)
+ # nav group (first seen) → loc group
+ first_nav = min(order.index(t) for t in ['/cmd_vel', '/plan'])
+ first_loc = order.index('/scan')
+ assert first_nav < first_loc