diff --git a/GUIDE.md b/GUIDE.md index 342f91db..caaf07b1 100644 --- a/GUIDE.md +++ b/GUIDE.md @@ -132,33 +132,26 @@ JACK (bridges via `-X seq`) - ✅ Expression Pedal (CC 75) - sends to virtual port - ✅ Footswitches (CC 60-63) - send to virtual port when pressed - ✅ Rotary Encoder Rotation (Tweak1=CC70, Tweak2=CC71) - send to virtual port -- ✅ Encoder Button Presses - Can optionally send MIDI via `shortpress` config (see below) +- ❌ Encoder Button Presses - UI navigation / handler actions only; do not send MIDI -### Encoder Button Configuration (v3 only) +### Encoder Button Behavior (v3) -Encoder buttons support configurable `shortpress` callbacks: +Encoder button presses are wired to handler callbacks, not MIDI. A short press +invokes the built-in click handler (`universal_encoder_sw`, UI navigation); a +long press invokes the named `longpress` callback from config. ```yaml encoders: - id: 1 - midi_CC: 70 # Rotation + midi_CC: 70 # rotation sends CC 70 longpress: previous_snapshot - shortpress: universal_encoder_sw # Default if omitted - - - id: 2 - midi_CC: 71 - shortpress: - callback: send_midi_cc - args: {cc: 72} # Button sends CC 72 to virtual port ``` -Shortpress accepts string (callback name) or object with `callback` and `args` (expanded as kwargs). - #### Implementation Details | Control | Shortpress | Longpress | |---------|------------|-----------| -| Encoder | String or `{callback, args}` via `encoderconfig.parse_shortpress_config()` | String only (no args) | +| Encoder | Built-in click handler (UI nav) - no config | String (callback name) - no args | | Footswitch | Hardcoded (toggle/MIDI) - no config | String or list (group names) - no args | | `GpioSwitch` | `callback_arg` (dict→kwargs, value→arg, None) | `longpress_callback_arg` (dict→kwargs, value→arg, None) | | `AnalogSwitch` | Single `callback(state)` - no separate longpress | Same callback, state=LONGPRESSED | @@ -403,7 +396,7 @@ Outbound behaviour depends on the initiator: **Encoders** (`pistomp/encoder.py`, `pistomp/encodermidicontrol.py`): - **Base**: Quadrature decoding, GPIO interrupts, debounce - **MIDI Control**: Sends CC on rotation (v3 tweak encoders) -- **Buttons**: Configurable shortpress (callback + args) and longpress +- **Buttons**: Built-in shortpress (UI nav) and configurable longpress (callback name) - **State Machines** (v1/v2 only): `TopEncoderMode`, `BotEncoderMode`, `UniversalEncoderMode` **Analog Controls** (`pistomp/analogmidicontrol.py`): diff --git a/modalapi/external_midi.py b/modalapi/external_midi.py new file mode 100644 index 00000000..05893c85 --- /dev/null +++ b/modalapi/external_midi.py @@ -0,0 +1,250 @@ +# This file is part of pi-stomp. +# +# pi-stomp is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# pi-stomp is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with pi-stomp. If not, see . + +from __future__ import annotations + +import logging +import time +from typing import TypedDict + +import rtmidi +from rtmidi import MidiOut as RtMidiOut + +MidiMessage = list[int] + +EXTERNAL_INSTANCE_ID = "External" + +PORT_RETRY_BACKOFF_S = 5.0 # don't re-enumerate a failed port more often than this + + +class ExternalMidiOut: + """ + Wrapper around external MIDI port that implements the same interface as + the virtual port's midiout. Allows controls to send MIDI to external devices + transparently, with automatic fallback to virtual port if device unavailable. + """ + + def __init__(self, external_midi_manager: ExternalMidiManager, port_name: str, fallback_midiout: RtMidiOut): + self.external_midi = external_midi_manager + self.port_name = port_name + self.fallback = fallback_midiout + + def send_message(self, message: MidiMessage) -> None: + try: + success = self.external_midi.send_raw(self.port_name, message) + except Exception: + logging.warning(f"External MIDI send failed for port {self.port_name}, falling back to virtual") + self.fallback.send_message(message) + return + if not success: + self.fallback.send_message(message) + + +class ExternalMidiConfig(TypedDict, total=False): + enabled: bool + send_delay_ms: int + messages: dict[str, list[MidiMessage]] + + +def _client_name_of(port: str) -> str: + """Extract the ALSA client name from an rtmidi port string like 'Client Name:Port Name N:M'.""" + return port.split(":")[0].strip() + + +class ExternalMidiManager: + """ + Manages external MIDI device synchronization. + Sends MIDI messages to external devices when pedalboards are loaded. + + Config keys in `messages` and `midi_port` are ALSA client names exactly as + reported by the device (e.g. 'Source Audio C4 Synth'). Matching is + case-insensitive against the client-name prefix of each rtmidi port string. + """ + + def __init__(self): + self.midi_ports: dict[str, rtmidi.MidiOut] = {} + self.messages: dict[str, list[MidiMessage]] = {} + self.enabled: bool = False + self.send_delay_ms: int = 10 + self._open_failures: dict[str, float] = {} + + def update_config(self, cfg: ExternalMidiConfig | None) -> None: + """Update configuration incrementally; only fields present are updated.""" + if cfg is None: + return + + if "enabled" in cfg: + self.enabled = cfg["enabled"] + if self.enabled: + logging.debug("External MIDI enabled") + else: + logging.debug("External MIDI disabled") + + if "send_delay_ms" in cfg: + self.send_delay_ms = cfg["send_delay_ms"] + + if "messages" in cfg: + # Merge messages at port level (replace entire message list per port) + self.messages.update(cfg["messages"]) + + def _get_available_ports(self) -> list[str]: + try: + temp_out = rtmidi.MidiOut() + ports = temp_out.get_ports() + del temp_out + return ports + except Exception as e: + logging.error(f"Failed to enumerate MIDI ports: {e}") + return [] + + def _find_port_index(self, device_name: str) -> int | None: + """Return the index of the first port whose ALSA client name matches device_name (case-insensitive).""" + available = self._get_available_ports() + for idx, port in enumerate(available): + if _client_name_of(port).lower() == device_name.lower(): + logging.info(f"Found MIDI port: {port} (index {idx})") + return idx + logging.warning(f"No MIDI port found with device name: {device_name!r}") + return None + + def open_port(self, port_name: str) -> bool: + """Eagerly open a port at routing time so the first poll-loop send doesn't enumerate.""" + return self._init_port(port_name) is not None + + def _init_port(self, port_name: str) -> rtmidi.MidiOut | None: + if port_name in self.midi_ports: + return self.midi_ports[port_name] + + last_fail = self._open_failures.get(port_name) + if last_fail is not None and (time.monotonic() - last_fail) < PORT_RETRY_BACKOFF_S: + return None + + port_idx = self._find_port_index(port_name) + if port_idx is None: + self._open_failures[port_name] = time.monotonic() + return None + + try: + midi_out = rtmidi.MidiOut() + midi_out.open_port(port_idx) + self.midi_ports[port_name] = midi_out + self._open_failures.pop(port_name, None) + logging.info(f"Opened MIDI port: {port_name}") + return midi_out + except Exception as e: + logging.error(f"Failed to open MIDI port {port_name} (index {port_idx}): {e}") + self._open_failures[port_name] = time.monotonic() + return None + + def _invalidate_port(self, port_name: str) -> None: + """ + Invalidate a port that has failed, closing it and removing from cache. + This forces re-discovery/re-opening on next use. + """ + if port_name in self.midi_ports: + midi_out = self.midi_ports[port_name] + try: + midi_out.close_port() + except Exception as e: + logging.debug(f"Error closing invalidated port {port_name}: {e}") + del self.midi_ports[port_name] + self._open_failures[port_name] = time.monotonic() # back off before re-enumerating + + def _validate_midi_message(self, message: MidiMessage) -> bool: + if not isinstance(message, list) or len(message) < 2: + logging.warning(f"Invalid MIDI message format (must be list with 2+ bytes): {message}") + return False + + status = message[0] + if not (0x80 <= status <= 0xFF): + logging.warning(f"Invalid MIDI status byte (must be 0x80-0xFF): 0x{status:02X}") + return False + + for i, byte in enumerate(message[1:], start=1): + if not (0x00 <= byte <= 0x7F): + logging.warning(f"Invalid MIDI data byte at position {i} (must be 0x00-0x7F): 0x{byte:02X}") + return False + + return True + + def _send_messages(self, port_name: str, messages: list[MidiMessage], delay_ms: int = 10): + midi_out = self._init_port(port_name) + if midi_out is None: + logging.warning(f"Skipping messages for unavailable port: {port_name}") + return + + for i, message in enumerate(messages): + if not self._validate_midi_message(message): + logging.warning(f"Skipping invalid MIDI message {i + 1}/{len(messages)}: {message}") + continue + + try: + midi_out.send_message(message) + logging.debug(f"Sent MIDI message to {port_name}: {[f'0x{b:02X}' for b in message]}") + + # Delay between messages (except after last one) + if i < len(messages) - 1 and delay_ms > 0: + time.sleep(delay_ms / 1000.0) + + except Exception as e: + logging.error(f"Failed to send MIDI message to {port_name}: {e}") + self._invalidate_port(port_name) + break # Stop sending remaining messages to this broken port + + def send_raw(self, port_name: str, message: MidiMessage) -> bool: + """Send one raw message; True on success, False if the port is unavailable (caller falls back).""" + if not self.enabled: + return False + + midi_out = self._init_port(port_name) + if midi_out is None: + return False + + try: + midi_out.send_message(message) + return True + except Exception as e: + logging.error(f"Failed to send MIDI message to {port_name}: {e}") + self._invalidate_port(port_name) + return False + + def send_messages_for_pedalboard(self) -> bool: + """Send the current pedalboard's external messages (config set earlier via update_config).""" + if not self.enabled: + return False + + if not self.messages: + return False + + for port_name, messages in self.messages.items(): + if not messages: + continue + + logging.debug(f"Sending MIDI message(s) to {port_name}: {messages}") + self._send_messages(port_name, messages, self.send_delay_ms) + + return True + + def close(self): + """Close ports and clean up.""" + for port_name, midi_out in self.midi_ports.items(): + try: + midi_out.close_port() + logging.debug(f"Closed MIDI port: {port_name}") + except Exception as e: + logging.warning(f"Error closing MIDI port {port_name}: {e}") + + self.midi_ports.clear() + logging.info("External MIDI manager closed") diff --git a/modalapi/mod.py b/modalapi/mod.py index abe13a6b..fb4921ad 100755 --- a/modalapi/mod.py +++ b/modalapi/mod.py @@ -28,13 +28,15 @@ import modalapi.pedalboard as Pedalboard import common.parameter as Parameter import modalapi.wifi as Wifi +import modalapi.external_midi as ExternalMidi from blend.snapshot import SnapshotManager from modalapi.websocket_bridge import AsyncWebSocketBridge from modalapi.ws_protocol import parse_message, LoadingEndMessage, PedalSnapshotMessage, PluginBypassMessage, TransportMessage, AddPluginMessage, ParamSetMessage, WebSocketMessage from modalapi.pedalboard_monitor import FileChangeMonitor, read_pedalboard_bundle -from pistomp.analogmidicontrol import AnalogMidiControl +from pistomp.controller_manager import ControllerManager +from pistomp.current import Current from pistomp.footswitch import Footswitch from pistomp.handler import Handler from enum import Enum @@ -134,7 +136,7 @@ def __init__(self, audiocard, homedir): self.software_version = None self.git_describe = None - self.current = None # pointer to Current class + self.current: Current | None = None self.deep = None # pointer to current Deep class # Stores snapshot index from loading_end until pedalboard change is detected @@ -156,6 +158,9 @@ def __init__(self, audiocard, homedir): self.ws_bridge.start() logging.info("WebSocket bridge started") + # External MIDI device synchronization + self.external_midi = ExternalMidi.ExternalMidiManager() + # Callback function map. Key is the user specified name, value is function from this handler # Used for calling handler callbacks pointed to by names which may be user set in the config file self.callbacks = {"set_mod_tap_tempo": self.set_mod_tap_tempo, @@ -171,27 +176,18 @@ def __del__(self): logging.info("Handler cleanup") if self.wifi_manager: del self.wifi_manager - if self.ws_bridge is not None: - self.ws_bridge.stop() + self.ws_bridge.stop() def cleanup(self): if self.lcd is not None: self.lcd.cleanup() - if self.ws_bridge is not None: - self.ws_bridge.stop() - logging.info("WebSocket bridge stopped") + self.external_midi.close() + self.ws_bridge.stop() # Container for dynamic data which is unique to the "current" pedalboard # The self.current pointed above will point to this object which gets # replaced when a different pedalboard is made current (old Current object # gets deleted and a new one added via self.set_current_pedalboard() - class Current: - def __init__(self, pedalboard): - self.pedalboard = pedalboard - self.presets = {} - self.preset_index = 0 # Assumes pedalboard loads at snapshot 0 (default behavior) - self.analog_controllers = {} # { type: (plugin_name, param_name) } - class Deep: def __init__(self, plugin): self.plugin = plugin @@ -206,6 +202,8 @@ def __init__(self, plugin): def add_hardware(self, hardware): self.hardware = hardware + hardware.external_midi = self.external_midi + self._controller_manager = ControllerManager(hardware, reorder_footswitch_plugins=True) def add_lcd(self, lcd): self.lcd = lcd @@ -612,7 +610,7 @@ def set_current_pedalboard(self, pedalboard): del self.current # Create a new "current" - self.current = self.Current(pedalboard) + self.current = Current(pedalboard) if self.next_pedalboard_preset_index is not None: self.current.preset_index = self.next_pedalboard_preset_index @@ -631,6 +629,16 @@ def set_current_pedalboard(self, pedalboard): self.load_current_presets() self.update_lcd() + # Send external MIDI messages for this pedalboard + # Config was already updated by hardware.reinit(cfg) above + try: + self.external_midi.send_messages_for_pedalboard() + except Exception as e: + logging.warning(f"Failed to send external MIDI messages: {e}") + + # Sync current state of analog controls (expression pedals, etc.) + self.hardware.sync_analog_controls() + # Prepare blend modes if configured (snapshot-based activation) try: blend_configs = cfg.get('blend_snapshots', []) if cfg else [] @@ -695,36 +703,7 @@ def bind_current_pedalboard(self): # "current" being the pedalboard mod-host says is current # The pedalboard data has already been loaded, but this will overlay # any real time settings - footswitch_plugins = [] - if self.current.pedalboard: - #logging.debug(self.current.pedalboard.to_json()) - for plugin in self.current.pedalboard.plugins: - if plugin is None or plugin.parameters is None: - continue - for sym, param in plugin.parameters.items(): - if param.binding is not None: - controller = self.hardware.controllers.get(param.binding) - if controller is not None: - # TODO possibly use a setter instead of accessing var directly - # What if multiple params could map to the same controller? - controller.parameter = param - controller.set_value(param.value) - plugin.controllers.append(controller) - if isinstance(controller, Footswitch): - # TODO sort this list so selection orders correctly (sort on midi_CC?) - plugin.has_footswitch = True - footswitch_plugins.append(plugin) - controller.set_category(plugin.category) - elif isinstance(controller, AnalogMidiControl): - key = "%s:%s" % (plugin.instance_id, param.name) - controller.cfg[Token.CATEGORY] = plugin.category # somewhat LAME adding to cfg dict - controller.cfg[Token.TYPE] = controller.type - self.current.analog_controllers[key] = controller.cfg - - # Move Footswitch controlled plugins to the end of the list - self.current.pedalboard.plugins = [elem for elem in self.current.pedalboard.plugins - if elem.has_footswitch is False] - self.current.pedalboard.plugins += footswitch_plugins + self._controller_manager.bind(self.current) def pedalboard_select(self, direction): # 0 means the pedalboard field is selected but a new pedalboard hasn't been scrolled to yet diff --git a/modalapi/modhandler.py b/modalapi/modhandler.py index 2fc0ccc3..a2356fd1 100755 --- a/modalapi/modhandler.py +++ b/modalapi/modhandler.py @@ -24,24 +24,24 @@ import subprocess import sys import yaml -from typing import Any - from typing import cast, Any import common.token as Token import common.util as util import modalapi.pedalboard as Pedalboard import modalapi.wifi as Wifi +import modalapi.external_midi as ExternalMidi +from modalapi.external_midi import EXTERNAL_INSTANCE_ID from pistomp.lcd320x240 import Lcd -from pistomp.hardware import Controller, Hardware +from pistomp.hardware import Hardware import pistomp.settings as Settings from blend.snapshot import SnapshotManager from modalapi.websocket_bridge import AsyncWebSocketBridge from modalapi.ws_protocol import parse_message, LoadingEndMessage, PedalSnapshotMessage, PluginBypassMessage, TransportMessage, AddPluginMessage, ParamSetMessage, WebSocketMessage from modalapi.pedalboard_monitor import FileChangeMonitor, read_pedalboard_bundle -from pistomp.analogmidicontrol import AnalogMidiControl -from pistomp.encodermidicontrol import EncoderMidiControl +from pistomp.controller_manager import ControllerManager +from pistomp.current import Current from pistomp.footswitch import Footswitch from pistomp.tuner import TunerEngine, TunerPanel, TunerSourceFactory from pistomp.tuner.source import AudioSource, build_source @@ -79,7 +79,7 @@ def __init__(self, audiocard: Audiocard, homedir, data_dir="/home/pistomp/data") self.bypass_left = False self.bypass_right = False - self.current: Modhandler.Current | None = None + self.current: Current | None = None self._lcd: Lcd | None = None self._hardware: Hardware | None = None @@ -125,6 +125,9 @@ def __init__(self, audiocard: Audiocard, homedir, data_dir="/home/pistomp/data") "toggle_tuner_enable": self.toggle_tuner_enable, } + # External MIDI device synchronization + self.external_midi = ExternalMidi.ExternalMidiManager() + # Blend mode manager - multiple blend snapshots per pedalboard self.blend_modes: dict[str, Any] = {} # {snapshot_name: BlendMode} self.active_blend_mode: Any | None = None # Currently active blend mode @@ -147,20 +150,8 @@ def cleanup(self): self._lcd.cleanup() if self._hardware is not None: self._hardware.cleanup() - if self.ws_bridge is not None: - self.ws_bridge.stop() - logging.info("WebSocket bridge stopped") - - # Container for dynamic data which is unique to the "current" pedalboard - # The self.current pointed above will point to this object which gets - # replaced when a different pedalboard is made current (old Current object - # gets deleted and a new one added via self.set_current_pedalboard() - class Current: - def __init__(self, pedalboard: Pedalboard.Pedalboard): - self.pedalboard: Pedalboard.Pedalboard = pedalboard - self.presets: dict[int, str] = {} - self.preset_index: int = 0 # Assumes pedalboard loads at snapshot 0 (default behavior) - self.analog_controllers: dict[str, dict[str, Any]] = {} # { type: (plugin_name, param_name) } + self.external_midi.close() + self.ws_bridge.stop() def _rest_get(self, url: str) -> Response | None: try: @@ -178,6 +169,8 @@ def _rest_post(self, url: str, *, json=None, data=None) -> Response | None: def add_hardware(self, hardware): self._hardware = hardware + hardware.external_midi = self.external_midi + self._controller_manager = ControllerManager(hardware) def add_lcd(self, lcd): self._lcd = lcd @@ -547,7 +540,7 @@ def set_current_pedalboard(self, pedalboard): del self.current # Create a new "current" - self.current = self.Current(pedalboard) + self.current = Current(pedalboard) if self.next_pedalboard_preset_index is not None: self.current.preset_index = self.next_pedalboard_preset_index @@ -568,6 +561,16 @@ def set_current_pedalboard(self, pedalboard): self.lcd.draw_main_panel() self.lcd.update_wifi(self.wifi_status) + # Send external MIDI messages for this pedalboard + # Config was already updated by hardware.reinit(cfg) above + try: + self.external_midi.send_messages_for_pedalboard() + except Exception as e: + logging.warning(f"Failed to send external MIDI messages: {e}") + + # Sync analog controls last: after bind + external send, matching mod.py + self.hardware.sync_analog_controls() + # Prepare blend modes if configured (snapshot-based activation) try: blend_configs = cfg.get('blend_snapshots', []) if cfg else [] @@ -611,49 +614,7 @@ def bind_current_pedalboard(self): # "current" being the pedalboard mod-host says is current # The pedalboard data has already been loaded, but this will overlay # any real time settings - footswitch_plugins = [] - if self.current: - #logging.debug(self.current.pedalboard.to_json()) - for plugin in self.current.pedalboard.plugins: - if plugin is None or plugin.parameters is None: - continue - for sym, param in plugin.parameters.items(): - if param.binding is not None: - controller = self.hardware.controllers.get(param.binding) - if controller is not None: - # TODO possibly use a setter instead of accessing var directly - # What if multiple params could map to the same controller? - controller.parameter = param # pyright: ignore[reportAttributeAccessIssue] - controller.set_value(param.value) - plugin.controllers.append(controller) - if isinstance(controller, Footswitch): - # TODO sort this list so selection orders correctly (sort on midi_CC?) - plugin.has_footswitch = True - footswitch_plugins.append(plugin) - controller.set_category(plugin.category) - elif isinstance(controller, AnalogMidiControl): - key = "%s:%s" % (plugin.instance_id, param.name) - controller.cfg[Token.CATEGORY] = plugin.category # somewhat LAME adding to cfg dict - controller.cfg[Token.TYPE] = controller.type - controller.cfg[Token.ID] = controller.id - self.current.analog_controllers[key] = controller.cfg - elif isinstance(controller, EncoderMidiControl): - key = "%s:%s" % (plugin.instance_id, param.name) - controller.cfg[Token.CATEGORY] = plugin.category # somewhat LAME adding to cfg dict - controller.cfg[Token.TYPE] = controller.type - controller.cfg[Token.ID] = controller.id - self.current.analog_controllers[key] = controller.cfg - - # LAME special case for volume control - # Doesn't seem quite right to add this here, but it's where all the mapped controls are bound - for e in self.hardware.encoders: - if e.type == Token.VOLUME: - cfg = { - Token.CATEGORY : None, - Token.TYPE : e.type, - Token.ID : e.id - } - self.current.analog_controllers[Token.VOLUME] = cfg + self._controller_manager.bind(self.current) def pedalboard_change(self, pedalboard=None): logging.info("Pedalboard change") @@ -795,11 +756,16 @@ def get_num_footswitches(self): def parameter_value_commit(self, param, value): param.value = value - # Audio parameter (volume, EQ, etc.) - no REST update needed + # Audio parameter (volume, EQ, etc.) - handled locally, no remote update needed if param.instance_id is None: self.audio_parameter_commit(param.symbol, value) return + # External MIDI parameters are local-only (visual feedback), no remote update needed + if param.instance_id == EXTERNAL_INSTANCE_ID: + logging.debug("Skipping remote update for external parameter: %s" % param.symbol) + return + self.ws_bridge.send_parameter(param.instance_id, param.symbol, param.value) def parameter_midi_change(self, param, direction): @@ -873,7 +839,7 @@ def user_backup_data(self, arg): logging.info("Data backup...") cmd = os.path.join(self.homedir, 'util', 'data-backup.sh') try: - output = subprocess.check_output([cmd, os.path.join(self.backup_dir, self.backup_file), self.data_dir]) + subprocess.check_output([cmd, os.path.join(self.backup_dir, self.backup_file), self.data_dir]) self.lcd.draw_message_dialog("Backup complete", "Info") logging.info("Backup complete") except subprocess.CalledProcessError as e: @@ -892,8 +858,8 @@ def user_restore_data(self, arg): logging.info("Restoring data backup...") cmd = os.path.join(self.homedir, 'util', 'data-restore.sh') try: - output = subprocess.check_output(['sudo', '-u', self.username, cmd, - os.path.join(self.backup_dir, self.backup_file), self.data_dir]) + subprocess.check_output(['sudo', '-u', self.username, cmd, + os.path.join(self.backup_dir, self.backup_file), self.data_dir]) logging.info("Restore complete") self.system_menu_restart_sound(None) except subprocess.CalledProcessError as e: diff --git a/pistomp/analogcontrol.py b/pistomp/analogcontrol.py index bd4a57d0..c5f853a8 100755 --- a/pistomp/analogcontrol.py +++ b/pistomp/analogcontrol.py @@ -17,14 +17,12 @@ class AnalogControl: - def __init__(self, spi, adc_channel, tolerance): - self.spi = spi self.adc_channel = adc_channel - self.last_read = 0 # this keeps track of the last potentiometer value + self.last_read = 0 # this keeps track of the last potentiometer value self.tolerance = tolerance # to keep from being jittery we'll only change the - # value when the control has moved a significant amount + # value when the control has moved a significant amount def readChannel(self): adc = self.spi.xfer2([1, (8 + self.adc_channel) << 4, 0]) @@ -34,7 +32,3 @@ def readChannel(self): def refresh(self): """Read current value from hardware and potentially take action.""" logging.error("AnalogControl subclass hasn't overriden the refresh method") - - def initialize(self): - """Called when the pedalboard has been loaded, e.g. to sync current value.""" - logging.error("AnalogControl subclass hasn't implemented initialize method") \ No newline at end of file diff --git a/pistomp/analogmidicontrol.py b/pistomp/analogmidicontrol.py index 20a31cb9..d31f6b0b 100755 --- a/pistomp/analogmidicontrol.py +++ b/pistomp/analogmidicontrol.py @@ -13,13 +13,15 @@ # You should have received a copy of the GNU General Public License # along with pi-stomp. If not, see . -from typing_extensions import override from typing import Any + from rtmidi.midiconstants import CONTROL_CHANGE import common.util as util import pistomp.analogcontrol as analogcontrol +import pistomp.controller as controller +from pistomp.controller import AnalogDisplayInfo import logging @@ -29,25 +31,11 @@ def as_midi_value(adc_value: int): return util.renormalize(adc_value, 0, 1023, 0, 127) -class AnalogMidiControl(analogcontrol.AnalogControl): - def __init__( - self, - spi, - adc_channel, - tolerance, - midi_CC, - midi_channel, - midiout, - type, - id=None, - cfg={}, - autosync=False, - value_change_callback=None, - ): +class AnalogMidiControl(analogcontrol.AnalogControl, controller.Controller): + def __init__(self, spi, adc_channel, tolerance, midi_CC, midi_channel, midiout, type, id=None, cfg={}, autosync=False, value_change_callback=None): super(AnalogMidiControl, self).__init__(spi, adc_channel, tolerance) - self.midi_CC = midi_CC + controller.Controller.__init__(self, midi_channel, midi_CC) self.midiout = midiout - self.midi_channel = midi_channel self.autosync = autosync # Parent member overrides @@ -64,55 +52,45 @@ def set_midi_channel(self, midi_channel): def set_value(self, value): self.value = value - def get_normalized_value(self) -> float: - """Current ADC reading normalized to [0.0, 1.0].""" - return self.last_read / 1023.0 - - @override - def initialize(self): - if not self.autosync: - return + def _clamp_endpoints(self, value: int) -> int: + """Clamp ADC values near endpoints to exact 0/1023 (deadband at extremes).""" + if value <= self.tolerance: + return 0 + if value >= 1023 - self.tolerance: + return 1023 + return value - # read the analog pin - value = self._clamp_endpoints(self.readChannel()) + def _send_value(self, value): + """Send ADC value as MIDI CC and invoke callback if set.""" set_volume = as_midi_value(value) - cc = [self.midi_channel | CONTROL_CHANGE, self.midi_CC, set_volume] - logging.debug("AnalogControl force-sending CC event %s" % cc) + logging.debug("AnalogControl Sending CC event %s" % cc) self.midiout.send_message(cc) - # save the reading to prevent duplicate sends on next poll + # Update last_read BEFORE the callback so get_normalized_value() reflects + # the new position when blend mode re-reads the control inside the callback. self.last_read = value - def _clamp_endpoints(self, value: int) -> int: - """Clamp ADC values within tolerance of endpoints to exact endpoints. + if self.value_change_callback: + self.value_change_callback(value, self) - Creates a deadband at both extremes so the control always reaches - exactly 0 and 1023, with natural hysteresis at the tolerance boundary. - """ - if value <= self.tolerance: - return 0 - if value >= 1023 - self.tolerance: - return 1023 - return value + def send_current_value(self): + """Force-send the current ADC value unconditionally. Used by sync_analog_controls().""" + value = self._clamp_endpoints(self.readChannel()) + self._send_value(value) - @override def refresh(self): - # read the analog pin value = self._clamp_endpoints(self.readChannel()) + if abs(value - self.last_read) > self.tolerance: + self._send_value(value) - # how much has it changed since the last read? - pot_adjust = abs(value - self.last_read) - value_changed = pot_adjust > self.tolerance - - if value_changed: - set_volume = as_midi_value(value) - - cc = [self.midi_channel | CONTROL_CHANGE, self.midi_CC, set_volume] - logging.debug("AnalogControl Sending CC event %s" % cc) - self.midiout.send_message(cc) - - self.last_read = value + def get_normalized_value(self) -> float: + return self.last_read / 1023.0 - if self.value_change_callback: - self.value_change_callback(value, self) + def get_display_info(self) -> AnalogDisplayInfo: + return { + **super(AnalogMidiControl, self).get_display_info(), + 'type': self.type, + 'id': self.id, + 'category': None, + } diff --git a/pistomp/analogswitch.py b/pistomp/analogswitch.py index 501fd6b9..27db4d34 100755 --- a/pistomp/analogswitch.py +++ b/pistomp/analogswitch.py @@ -20,14 +20,14 @@ import pistomp.switchstate as switchstate from pistomp.taptempo import TapTempo -LONG_PRESS_TIME = 0.5 # Hold seconds which defines a long press +LONG_PRESS_TIME = 0.5 # Hold seconds which defines a long press FALLING_THRESHOLD = 800 # ASSUMES 10-bit ADC, can be changed for debounce handling -class AnalogSwitch(analogcontrol.AnalogControl): +class AnalogSwitch(analogcontrol.AnalogControl): def __init__(self, spi, adc_channel, tolerance, callback, taptempo: TapTempo | None = None): super(AnalogSwitch, self).__init__(spi, adc_channel, tolerance) - #self.value = None # this keeps track of the last value, do we still need this? + # self.value = None # this keeps track of the last value, do we still need this? self.callback = callback self.state = switchstate.Value.RELEASED self.start_time = 0 @@ -59,9 +59,3 @@ def refresh(self): self.callback(switchstate.Value.RELEASED) elif self.state is switchstate.Value.LONGPRESSED: self.state = switchstate.Value.RELEASED - - @override - def initialize(self): - # no-op for stateless switches - pass - diff --git a/pistomp/config.py b/pistomp/config.py index c0ed1f62..e587b5fd 100644 --- a/pistomp/config.py +++ b/pistomp/config.py @@ -92,6 +92,10 @@ "midi_CC": { "type": "integer" }, + "midi_port": { + "type": "string", + "description": "Send MIDI to this external port instead of the virtual MIDI Through port; falls back to virtual if the device is unavailable (must match a port in external_midi)" + }, "preset": { "oneOf": [ { @@ -121,9 +125,16 @@ "adc_input": { "type": "integer" }, + "id": { + "type": "integer" + }, "midi_CC": { "type": "integer" }, + "midi_port": { + "type": "string", + "description": "Send MIDI to this external port instead of the virtual MIDI Through port; falls back to virtual if the device is unavailable (must match a port in external_midi)" + }, "threshold": { "type": "integer", "minimum": 0, @@ -154,6 +165,16 @@ "midi_CC": { "type": "integer" }, + "midi_port": { + "type": "string", + "description": "Send MIDI to this external port instead of the virtual MIDI Through port; falls back to virtual if the device is unavailable (must be the device name)" + }, + "midi_channel": { + "type": "integer", + "minimum": 0, + "maximum": 15, + "description": "Override MIDI channel for this encoder (0-15); useful when the external device is on a different channel than the hardware default" + }, "type": { "enum": ["KNOB", "VOLUME"] }, @@ -165,6 +186,32 @@ "id" ] } + }, + "external_midi": { + "type": "object", + "properties": { + "enabled": { + "type": "boolean" + }, + "send_delay_ms": { + "type": "integer", + "minimum": 0 + }, + "messages": { + "type": "object", + "additionalProperties": { + "type": "array", + "items": { + "type": "array", + "items": { + "type": "integer", + "minimum": 0, + "maximum": 255 + } + } + } + } + } } }, "required": [ diff --git a/pistomp/controller.py b/pistomp/controller.py index 9061613a..f3680437 100755 --- a/pistomp/controller.py +++ b/pistomp/controller.py @@ -13,28 +13,68 @@ # You should have received a copy of the GNU General Public License # along with pi-stomp. If not, see . +from __future__ import annotations + +from dataclasses import dataclass from enum import Enum -import json import logging +from typing import TypedDict +from common.parameter import Parameter +from rtmidi import MidiOut -class Controller: +class RoutingDestination(Enum): + VIRTUAL = "virtual" + EXTERNAL = "external" + + +@dataclass(frozen=True) +class RoutingInfo: + destination: RoutingDestination + port_name: str | None = None + + @classmethod + def virtual(cls) -> "RoutingInfo": + return cls(destination=RoutingDestination.VIRTUAL) + + @classmethod + def external(cls, port_name: str) -> "RoutingInfo": + return cls(destination=RoutingDestination.EXTERNAL, port_name=port_name) - def __init__(self, midi_channel, midi_CC): - self.midi_channel = midi_channel - self.midi_CC = midi_CC - self.minimum = None - self.maximum = None - self.parameter = None - self.hardware_name = None - #self.type = None # this will conflict with encoder.type for EncoderMidiControl - self.midi_min = 0 - self.midi_max = 127 - def to_json(self): - return json.dumps(self, default=lambda o: o.__dict__, sort_keys=True, indent=4) +class AnalogDisplayInfo(TypedDict, total=False): + type: str | None # Token.KNOB, Token.EXPRESSION, Token.VOLUME + id: int | None # Position on screen (0-based from left); None if unpositioned + category: str | None + port_name: str | None # External port name if routed externally + midi_cc: int | None # MIDI CC for external routing display + + +# Per-pedalboard analog/encoder assignment display, keyed by "instance:param" +# (plugin-bound), "channel:cc" (external), or Token.VOLUME. +AnalogControllers = dict[str, AnalogDisplayInfo] + + +class Controller: + type: str | None = None # class default; not in __init__ — clashes with Encoder.type in EncoderController's MRO + + def __init__(self, midi_channel: int, midi_CC: int | None): + self.midi_channel: int = midi_channel + self.midi_CC: int | None = midi_CC + self.parameter: Parameter | None = None + self.midi_min: int = 0 + self.midi_max: int = 127 + self.midi_value: int = 0 + self.midiout: MidiOut | None = None - def set_value(self, bypass_value: float): - logging.error("Controller subclass hasn't overriden the set_value method") + def set_value(self, value: float) -> None: + logging.error(f"Controller subclass ({self.__class__.__name__}) hasn't overriden the set_value method") + def bind_to_parameter(self, parameter: Parameter) -> None: + self.parameter = parameter + self.set_value(parameter.value) + def get_display_info(self) -> AnalogDisplayInfo: + """Own-presentation only; routing-derived fields are added by the + registry owner (ControllerManager._bind_external_controllers).""" + return {} diff --git a/pistomp/controller_manager.py b/pistomp/controller_manager.py new file mode 100644 index 00000000..f50a168c --- /dev/null +++ b/pistomp/controller_manager.py @@ -0,0 +1,147 @@ +# This file is part of pi-stomp. +# +# pi-stomp is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# pi-stomp is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with pi-stomp. If not, see . + +from __future__ import annotations + +import logging +from typing import cast, TYPE_CHECKING + +import common.token as Token +from pistomp.analogmidicontrol import AnalogMidiControl +from pistomp.controller import AnalogDisplayInfo +from pistomp.current import Current +from pistomp.encodermidicontrol import EncoderMidiControl +from pistomp.footswitch import Footswitch + +if TYPE_CHECKING: + from pistomp.hardware import Hardware + + +class ControllerManager: + """ + Manages controller/parameter bindings on the current pedalboard, + overlaying per-pedalboard config on top of the base. + Version differences are passed as flags rather than subclassed: + + reorder_footswitch_plugins v1 moves footswitch-controlled plugins to the + tail of the chain; v3 leaves order untouched. + """ + + def __init__( + self, + hardware: "Hardware", + *, + reorder_footswitch_plugins: bool = False, + ): + self._hw = hardware + self._reorder_footswitch_plugins = reorder_footswitch_plugins + + def bind(self, current: Current | None) -> None: + """Rebind all controllers for the active pedalboard state.""" + if current is None: + return + + # Clear previous parameter bindings from all controllers except volume. + for controller in self._hw.controllers.values(): + if controller.type != Token.VOLUME: + controller.parameter = None + + current.analog_controllers = {} + + if current.pedalboard: + footswitch_plugins = self._bind_plugin_parameters(current) + self._bind_volume_encoders(current) + if self._reorder_footswitch_plugins: + self._move_footswitch_plugins_to_end(current, footswitch_plugins) + + self._bind_external_controllers(current) + + def _bind_plugin_parameters(self, current) -> list: + """Bind controllers referenced by plugin parameters; return the plugins + that gained a footswitch.""" + footswitch_plugins = [] + for plugin in current.pedalboard.plugins: + if plugin is None or plugin.parameters is None: + continue + for param in plugin.parameters.values(): + if param.binding is None: + continue + controller = self._hw.controllers.get(param.binding) + if controller is None: + continue + + # External controllers aren't bound to plugin parameters. + if self._hw.is_external(controller): + logging.warning( + f"Plugin parameter {plugin.name}:{param.name} is bound to external controller " + f"{param.binding} (routed to {self._hw.external_port_name(controller)}) - ignoring plugin binding" + ) + continue + + controller.parameter = param + controller.set_value(param.value) + plugin.controllers.append(controller) + + if isinstance(controller, Footswitch): + plugin.has_footswitch = True + footswitch_plugins.append(plugin) + controller.set_category(plugin.category) + elif isinstance(controller, (AnalogMidiControl, EncoderMidiControl)): + key = "%s:%s" % (plugin.instance_id, param.name) + controller.cfg[Token.CATEGORY] = plugin.category # somewhat LAME adding to cfg dict + controller.cfg[Token.TYPE] = controller.type + controller.cfg[Token.ID] = controller.id + current.analog_controllers[key] = cast(AnalogDisplayInfo, controller.cfg) + return footswitch_plugins + + def _bind_volume_encoders(self, current) -> None: + """Surface VOLUME-type encoders in the assignment display (v3 only in + practice — v1 has no VOLUME-typed encoder).""" + for e in self._hw.encoders: + if e.type == Token.VOLUME: + entry: AnalogDisplayInfo = {"category": None, "type": e.type, "id": e.id} + current.analog_controllers[Token.VOLUME] = entry + + @staticmethod + def _move_footswitch_plugins_to_end(current, footswitch_plugins) -> None: + plugins = current.pedalboard.plugins + current.pedalboard.plugins = [p for p in plugins if p.has_footswitch is False] + footswitch_plugins + + def _bind_external_controllers(self, current) -> None: + """Externally-routed controllers: bind a synthetic parameter and show + them under an "External" category.""" + for controller in self._hw.controllers.values(): + if not self._hw.is_external(controller) or controller.midi_CC is None: + continue + port_name = self._hw.external_port_name(controller) + + controller.parameter = self._hw.create_external_parameter( + controller, port_name, controller.midi_channel, controller.midi_CC + ) + + if not isinstance(controller, (AnalogMidiControl, EncoderMidiControl)): + continue + key = f"{controller.midi_channel}:{controller.midi_CC}" + # Seed type/id (the encoder's get_display_info is empty); routing + # fields come from the registry, not the control. + entry: AnalogDisplayInfo = { + "type": controller.type, + "id": controller.id, + **controller.get_display_info(), + "port_name": port_name, + "midi_cc": controller.midi_CC, + "category": "External", + } + current.analog_controllers[key] = entry diff --git a/pistomp/current.py b/pistomp/current.py new file mode 100644 index 00000000..dd448e16 --- /dev/null +++ b/pistomp/current.py @@ -0,0 +1,31 @@ +# This file is part of pi-stomp. +# +# pi-stomp is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# pi-stomp is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with pi-stomp. If not, see . + +from __future__ import annotations + +from dataclasses import dataclass, field + +from pistomp.controller import AnalogControllers +from modalapi.pedalboard import Pedalboard + + +@dataclass +class Current: + """Mutable per-pedalboard state for the active ("current") pedalboard.""" + + pedalboard: Pedalboard + presets: dict[int, str] = field(default_factory=dict) + preset_index: int = 0 # Assumes pedalboard loads at snapshot 0 (default behavior) + analog_controllers: AnalogControllers = field(default_factory=dict) diff --git a/pistomp/hardware.py b/pistomp/hardware.py index 3b921c5a..bcf02d68 100755 --- a/pistomp/hardware.py +++ b/pistomp/hardware.py @@ -20,6 +20,8 @@ import common.token as Token import common.util as Util +import common.parameter as Parameter +from common.parameter import TTL_PROPERTIES, TTL_INTEGER from pistomp.analogcontrol import AnalogControl import pistomp.analogmidicontrol as AnalogMidiControl import pistomp.encoder as Encoder @@ -29,6 +31,9 @@ import pistomp.taptempo as taptempo from abc import ABC, abstractmethod +from rtmidi import MidiOut +from modalapi.external_midi import ExternalMidiOut, ExternalMidiManager, EXTERNAL_INSTANCE_ID +from pistomp.controller import RoutingInfo, RoutingDestination import pistomp.relay as Relay Controller = Union[AnalogMidiControl.AnalogMidiControl, EncoderMidiControl.EncoderMidiControl, Footswitch.Footswitch] @@ -63,6 +68,10 @@ def __init__(self, default_config, handler, midiout, refresh_callback): self.debounce_map = None self.ledstrip = None self.taptempo = taptempo.TapTempo(None) + self.external_midi: ExternalMidiManager | None = None + # control → destination; absent means internal (virtual/mod-host). + # Rebuilt every reinit in __route_section. + self.external_routing: dict[Controller, RoutingInfo] = {} def toggle_tap_tempo_enable(self, bpm: float = 0.0): if self.taptempo: @@ -98,6 +107,22 @@ def poll_controls(self): if s: s.check_longpress_events() + def sync_analog_controls(self): + """Send current values of analog controls with autosync enabled via MIDI.""" + for control in self.analog_controls: + if getattr(control, 'autosync', False) and hasattr(control, 'send_current_value'): + try: + control.send_current_value() + except Exception as e: + logging.warning(f"Failed to sync analog control {control.midi_CC}: {e}") + + def is_external(self, controller: Controller) -> bool: + return controller in self.external_routing + + def external_port_name(self, controller: Controller) -> str | None: + info = self.external_routing.get(controller) + return info.port_name if info is not None else None + def poll_indicators(self): for i in self.indicators: i.refresh() @@ -113,6 +138,7 @@ def recalibrateVU_baseline(self, baseline): def reinit(self, cfg): # reinit hardware as specified by the new cfg context (after pedalboard change, etc.) self.cfg = self.default_cfg.copy() + self.external_routing.clear() # rebuilt by __route_section for this cfg overlay self.__init_midi_default() @@ -123,18 +149,17 @@ def reinit(self, cfg): self.__init_footswitches(self.cfg) self.__init_encoders(self.cfg) - # Analog control configuration - for ac in self.analog_controls: - try: - ac.initialize() - except Exception as e: - logging.warning(f"Failed to initialize analog control {ac}: {e}") + # External MIDI configuration + self.__init_external_midi(self.cfg) + self.__apply_midi_routing(self.cfg) # Pedalboard specific config if cfg is not None: self.__init_midi(cfg) self.__init_footswitches(cfg) + self.__init_external_midi(cfg) self.__init_encoders(cfg) + self.__apply_midi_routing(cfg) @abstractmethod def init_analog_controls(self): @@ -223,17 +248,20 @@ def create_footswitches(self, cfg): if taptempo: taptempo.set_callback(self.handler.get_callback(tap_tempo_callback)) + # midi_port routing is applied later in __apply_midi_routing (external_midi is None here) + midiout = self.midiout + fs: Footswitch.Footswitch | None = None if adc_input is not None: fs = Footswitch.Footswitch(id if id else idx, gpio_output, pixel, midi_cc, midi_channel, - self.midiout, refresh_callback=self.refresh_callback, + midiout, refresh_callback=self.refresh_callback, adc_input=adc_input, spi=self.spi, taptempo = taptempo) logging.debug("Created Footswitch on ADC input: %d, Midi Chan: %d, CC: %s" % (adc_input, midi_channel, midi_cc)) elif gpio_input is not None: fs = Footswitch.Footswitch(id if id else idx, gpio_output, pixel, midi_cc, midi_channel, - self.midiout, refresh_callback=self.refresh_callback, + midiout, refresh_callback=self.refresh_callback, gpio_input=gpio_input, taptempo = taptempo) logging.debug("Created Footswitch on GPIO input: %d, Midi Chan: %d, CC: %s" % @@ -273,6 +301,7 @@ def create_analog_controls(self, cfg): if autosync is None: autosync = False # Default to False + # midi_port routing is applied later in __apply_midi_routing (external_midi is None here) control = AnalogMidiControl.AnalogMidiControl(self.spi, adc_input, threshold, midi_cc, midi_channel, self.midiout, control_type, id, c, autosync) self.analog_controls.append(control) @@ -282,7 +311,7 @@ def create_analog_controls(self, cfg): (adc_input, midi_channel, midi_cc)) @abstractmethod - def add_encoder(self, id, type, callback, longpress_callback, midi_channel, midi_cc) -> Encoder.Encoder | EncoderMidiControl.EncoderMidiControl: + def add_encoder(self, id, type, callback, longpress_callback, midi_channel, midi_cc, midiout=None) -> Encoder.Encoder | EncoderMidiControl.EncoderMidiControl: # This should be implemented by hardware subclasses that support tweak encoders (Tre at least) ... @@ -307,8 +336,9 @@ def create_encoders(self, cfg): logging.error("Config file error. Encoder specified without %s" % Token.ID) continue + # midi_port routing is applied later in __apply_midi_routing (external_midi is None here) try: - control = self.add_encoder(id, type, None, longpress_callback, midi_channel, midi_cc) + control = self.add_encoder(id, type, None, longpress_callback, midi_channel, midi_cc, midiout=self.midiout) self.encoders.append(control) except Exception: logging.exception("Failed to create encoder with config: %s" % c) @@ -330,6 +360,69 @@ def get_real_midi_channel(self, cfg): pass return chan + def create_external_parameter(self, controller, port_name, midi_channel, midi_cc): + name = f"{port_name}:{midi_cc}" + info = { + Token.NAME: name, + Token.SYMBOL: f"external_{port_name}_{midi_cc}", + Token.RANGES: { + Token.MINIMUM: 0, + Token.MAXIMUM: 127 + }, + TTL_PROPERTIES: [TTL_INTEGER] + } + val = getattr(controller, 'midi_value', 0) + return Parameter.Parameter(info, val, f"{midi_channel}:{midi_cc}", EXTERNAL_INSTANCE_ID) + + def __validate_midi_port(self, port_name): + if self.external_midi is None: + logging.warning(f"midi_port '{port_name}' set but external_midi not initialized, falling back to virtual") + return None + return port_name + + def __resolve_midiout(self, cfg_entry) -> tuple[MidiOut | ExternalMidiOut, RoutingInfo]: + """Return (midiout, routing): the wire to send on and its destination.""" + midi_port = Util.DICT_GET(cfg_entry, "midi_port") + if midi_port: + midi_port = self.__validate_midi_port(midi_port) + if not midi_port or self.external_midi is None: + return self.midiout, RoutingInfo.virtual() + self.external_midi.open_port(midi_port) # eager: first poll-loop send must not enumerate + return ExternalMidiOut(self.external_midi, midi_port, self.midiout), RoutingInfo.external(midi_port) + + def __route_section(self, cfg, section, controls, set_cc): + cfg_list = Util.DICT_GET(cfg[Token.HARDWARE], section) + if not cfg_list: + return + for entry in cfg_list: + ctrl_id = Util.DICT_GET(entry, Token.ID) + if ctrl_id is None: + continue + ctrl = next((c for c in controls if getattr(c, 'id', None) == ctrl_id), None) + if ctrl is None: + continue + # Footswitch midi_CC (incl. NONE removal) is owned by __init_footswitches; only encoders/analog here. + if set_cc: + midi_cc = Util.DICT_GET(entry, Token.MIDI_CC) + if midi_cc is not None and hasattr(ctrl, 'midi_CC'): + ctrl.midi_CC = midi_cc + midi_channel = Util.DICT_GET(entry, "midi_channel") + if midi_channel is not None and hasattr(ctrl, 'midi_channel'): + ctrl.midi_channel = midi_channel + ctrl.midiout, routing = self.__resolve_midiout(entry) + if routing.destination == RoutingDestination.EXTERNAL: + self.external_routing[ctrl] = routing + else: + self.external_routing.pop(ctrl, None) + + def __apply_midi_routing(self, cfg): + """Route every control to its external port or the virtual port (default + pedalboard cfg).""" + if cfg is None or Token.HARDWARE not in cfg: + return + self.__route_section(cfg, Token.ENCODERS, self.encoders, set_cc=True) + self.__route_section(cfg, Token.ANALOG_CONTROLLERS, self.analog_controls, set_cc=True) + self.__route_section(cfg, Token.FOOTSWITCHES, self.footswitches, set_cc=False) + def __init_midi_default(self): self.__init_midi(self.cfg) @@ -340,10 +433,15 @@ def __init_midi(self, cfg): if isinstance(ac, AnalogMidiControl.AnalogMidiControl): ac.set_midi_channel(self.midi_channel) - def __init_footswitches_default(self): - for fs in self.footswitches: - fs.clear_relays() - self.__init_footswitches(self.cfg) + def __init_external_midi(self, cfg): + """Initialize/update external MIDI config (called for both default and pedalboard).""" + if self.external_midi is None: + return + if cfg is None or Token.HARDWARE not in cfg: + return + ext_cfg = cfg[Token.HARDWARE].get("external_midi") + if ext_cfg: + self.external_midi.update_config(ext_cfg) def __init_footswitches(self, cfg): if cfg is None or (Token.HARDWARE not in cfg) or (Token.FOOTSWITCHES not in cfg[Token.HARDWARE]): diff --git a/pistomp/lcdgfx.py b/pistomp/lcdgfx.py index 95e34cb2..ba5c2010 100755 --- a/pistomp/lcdgfx.py +++ b/pistomp/lcdgfx.py @@ -332,9 +332,13 @@ def draw_analog_assignments(self, controllers): knob = Token.NONE for k, v in controllers.items(): control_type = util.DICT_GET(v, Token.TYPE) - s = k.split(":") - text = "%s:%s" % (self.shorten_name(s[0], self.plugin_width), - self.shorten_name(s[1], self.plugin_width_medium)) + if util.DICT_GET(v, Token.CATEGORY) == 'External': + port = util.DICT_GET(v, 'port_name') or '' + text = "%s:%s" % (self.shorten_name(port, self.plugin_width), util.DICT_GET(v, 'midi_cc')) + else: + s = k.split(":") + text = "%s:%s" % (self.shorten_name(s[0], self.plugin_width), + self.shorten_name(s[1], self.plugin_width_medium)) if control_type == Token.EXPRESSION: exp = text elif control_type == Token.KNOB: diff --git a/pistomp/pistomptre.py b/pistomp/pistomptre.py index 515e0be9..70bd5db2 100755 --- a/pistomp/pistomptre.py +++ b/pistomp/pistomptre.py @@ -69,7 +69,7 @@ def __init__(self, cfg, handler, midiout, refresh_callback): try: self.ledstrip = Ledstrip.Ledstrip() - except Exception as e: + except Exception: self.ledstrip = None logging.error("Could not initialize LED Strip") @@ -94,7 +94,7 @@ def init_lcd(self): spi_speed = 24 # Default to spec self.handler.add_lcd(Lcd.Lcd(self.handler.homedir, self.handler, flip=False, spi_speed_mhz=spi_speed)) - def add_encoder(self, id, type, callback, longpress_callback, midi_channel, midi_cc): + def add_encoder(self, id, type, callback, longpress_callback, midi_channel, midi_cc, midiout=None): enc_pins = Util.DICT_GET(ENC, id) if enc_pins is None: raise ValueError("Cannot create encoder object for id:", id) @@ -111,7 +111,8 @@ def add_encoder(self, id, type, callback, longpress_callback, midi_channel, midi enc = EncoderMidiControl.EncoderMidiControl(self.handler, d_pin=d_pin, clk_pin=clk_pin, callback=callback, midi_channel=midi_channel, midi_CC=midi_cc, - midiout=self.midiout, type=Token.KNOB, id=id) + midiout=midiout if midiout is not None else self.midiout, + type=Token.KNOB, id=id) if sw_pin is not None: longpress = self.handler.get_callback(longpress_callback) diff --git a/setup/config_templates/default_config.yml b/setup/config_templates/default_config.yml index 37050956..c603c87f 100755 --- a/setup/config_templates/default_config.yml +++ b/setup/config_templates/default_config.yml @@ -23,6 +23,8 @@ hardware: # adc_input: The analog input to which the switch is connected (required) # ledstrip_position: The position of the corresponding LED (optional) # midi_CC: The MIDI CC message to be sent when switch is clicked (optional) + # midi_port: Send MIDI to this external port INSTEAD of the virtual MIDI Through port (optional) + # Falls back to the virtual port only if the device is unavailable; must be the device name (e.g. 'Source Audio C4 Synth') # longpress: The name of a handler method to call when switch is long-pressed (optional) # longpress can be a list enclosed with []'s # @@ -55,6 +57,8 @@ hardware: # id: The id and position on the screen (starting with 0 on the left) # type: The control type, used to represent the control on the screen (optional) # midi_CC: The MIDI CC message to be sent when the control is adjusted (optional) + # midi_port: Send MIDI to this external port INSTEAD of the virtual MIDI Through port (optional) + # Falls back to the virtual port only if the device is unavailable; must be the device name (e.g. 'Source Audio C4 Synth') # autosync: Whether to send current value on pedalboard load (optional, default: false) # #analog_controllers: @@ -70,6 +74,10 @@ hardware: # type: The control type (default is KNOB, VOLUME controls output volume) # midi_CC: The MIDI CC message to be sent when the control is adjusted (optional) # cannot be used along with type=VOLUME + # midi_port: Send MIDI to this external port INSTEAD of the virtual MIDI Through port (optional) + # Falls back to the virtual port only if the device is unavailable; must be the device name (e.g. 'Source Audio C4 Synth') + # midi_channel: Override MIDI channel for this encoder (0-15, optional) + # Use when the external device is on a different channel than the hardware default # longpress: The name of a handler method to call when switch is long-pressed (optional) # encoders: @@ -82,6 +90,39 @@ hardware: - id: 3 type: VOLUME + # external_midi: + # External MIDI device synchronization - sends messages when pedalboards load + # Can be overridden per-pedalboard by creating .pedalboard/config.yml + # + # enabled: Enable/disable external MIDI (default: false) + # send_delay_ms: Delay between consecutive messages in milliseconds (default: 10) + # + # messages: MIDI messages to send on pedalboard load + # : Device name exactly as reported by the hardware (run `aconnect -l` to find it) + # - [0xSS, 0xDD, ...] List of MIDI messages (hex bytes) + # + # Example configuration: + # + # external_midi: + # enabled: true + # send_delay_ms: 10 + # messages: + # Source Audio C4 Synth: + # - [0xB0, 0x66, 0x00] # CC 102 = 0 (bypass) + # HX Stomp: + # - [0xC0, 0x00] # Program Change 0 + # + # MIDI message formats: + # Program Change: [0xCn, program] where n = channel (0-F) + # Control Change: [0xBn, cc, value] where n = channel (0-F) + # + # Per-pedalboard override example (.pedalboard/config.yml): + # hardware: + # external_midi: + # messages: + # Source Audio C4 Synth: + # - [0xC0, 0x05] # Program Change 5 for this pedalboard only + # Blend Mode Configuration (Pedalboard-specific) # This feature interpolates between snapshots based on analog input position. # IMPORTANT: This should be configured per-pedalboard in .pedalboard/config.yml diff --git a/setup/config_templates/default_config_pistomptre.yml b/setup/config_templates/default_config_pistomptre.yml index 64619516..62645ada 100644 --- a/setup/config_templates/default_config_pistomptre.yml +++ b/setup/config_templates/default_config_pistomptre.yml @@ -23,6 +23,10 @@ hardware: # adc_input: The analog input to which the switch is connected (required) # ledstrip_position: The position of the corresponding LED (optional) # midi_CC: The MIDI CC message to be sent when switch is clicked (optional) + # midi_port: Send MIDI to this external port INSTEAD of the virtual MIDI Through port (optional) + # Falls back to the virtual port only if the device is unavailable; must be the device name (e.g. 'Source Audio C4 Synth') + # midi_channel: Override MIDI channel for this encoder (0-15, optional) + # Use when the external device is on a different channel than the hardware default # longpress: The name of a handler method to call when switch is long-pressed (optional) # longpress can be a list enclosed with []'s # @@ -55,6 +59,8 @@ hardware: # id: The id and position on the screen (starting with 0 on the left) # type: The control type, used to represent the control on the screen (optional) # midi_CC: The MIDI CC message to be sent when the control is adjusted (optional) + # midi_port: Send MIDI to this external port INSTEAD of the virtual MIDI Through port (optional) + # Falls back to the virtual port only if the device is unavailable; must be the device name (e.g. 'Source Audio C4 Synth') # autosync: Whether to send current value on pedalboard load (optional, default: false) # #analog_controllers: @@ -70,6 +76,10 @@ hardware: # type: The control type (default is KNOB, VOLUME controls output volume) # midi_CC: The MIDI CC message to be sent when the control is adjusted (optional) # cannot be used along with type=VOLUME + # midi_port: Send MIDI to this external port INSTEAD of the virtual MIDI Through port (optional) + # Falls back to the virtual port only if the device is unavailable; must be the device name (e.g. 'Source Audio C4 Synth') + # midi_channel: Override MIDI channel for this encoder (0-15, optional) + # Use when the external device is on a different channel than the hardware default # longpress: The name of a handler method to call when switch is long-pressed (optional) # encoders: @@ -82,6 +92,39 @@ hardware: - id: 3 type: VOLUME + # external_midi: + # External MIDI device synchronization - sends messages when pedalboards load + # Can be overridden per-pedalboard by creating .pedalboard/config.yml + # + # enabled: Enable/disable external MIDI (default: false) + # send_delay_ms: Delay between consecutive messages in milliseconds (default: 10) + # + # messages: MIDI messages to send on pedalboard load + # : Exact ALSA client name of the device (run `aconnect -l` to find it) + # - [0xSS, 0xDD, ...] List of MIDI messages (hex bytes) + # + # Example configuration: + # + # external_midi: + # enabled: true + # send_delay_ms: 10 + # messages: + # Source Audio C4 Synth: + # - [0xB0, 0x66, 0x00] # CC 102 = 0 (bypass) + # HX Stomp: + # - [0xC0, 0x00] # Program Change 0 + # + # MIDI message formats: + # Program Change: [0xCn, program] where n = channel (0-F) + # Control Change: [0xBn, cc, value] where n = channel (0-F) + # + # Per-pedalboard override example (.pedalboard/config.yml): + # hardware: + # external_midi: + # messages: + # Source Audio C4 Synth: + # - [0xC0, 0x05] # Program Change 5 for this pedalboard only + # Blend Mode Configuration (Pedalboard-specific) # This feature interpolates between snapshots based on analog input position. diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 30b1e536..357aacef 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -204,6 +204,7 @@ def fake_init_lcd(hw_self): patch("modalapi.pedalboard.Pedalboard.load_bundle"), patch("modalapi.wifi.WifiManager") as mock_wm_cls, patch("modalapi.mod.AsyncWebSocketBridge", return_value=fake_bridge), + patch("modalapi.mod.ExternalMidi.ExternalMidiManager"), patch("pistomp.hardware.Hardware.init_spi"), patch("pistomp.pistomp.Pistomp.run_test"), patch("pistomp.pistomp.Pistomp.init_lcd", fake_init_lcd), diff --git a/tests/integration/test_external_midi_loopback.py b/tests/integration/test_external_midi_loopback.py new file mode 100644 index 00000000..dbc03fad --- /dev/null +++ b/tests/integration/test_external_midi_loopback.py @@ -0,0 +1,215 @@ +""" +Real-device loopback tests for external MIDI routing. +macOS-only for now: virtual ports need CoreMIDI. Linux/CI needs the ALSA sequencer (`sudo modprobe snd-seq-dummy snd-seq`). +""" + +import sys +import time +import uuid +from unittest.mock import MagicMock + +import pytest + +import common.token as Token +import pistomp.switchstate as switchstate +from modalapi.external_midi import ExternalMidiManager, ExternalMidiOut +from pistomp.analogmidicontrol import AnalogMidiControl +from pistomp.encodermidicontrol import EncoderMidiControl +from pistomp.footswitch import Footswitch + +pytestmark = pytest.mark.skipif( + sys.platform != "darwin", + reason="virtual MIDI ports require CoreMIDI (macOS); Linux/CI needs ALSA seq modules", +) + + +def _wait_for(predicate, timeout=1.0): + """Poll predicate until true or timeout — MIDI delivery is asynchronous.""" + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + if predicate(): + return True + time.sleep(0.005) + return predicate() + + +def _port_is_visible(port_name, timeout=1.0): + """Block until CoreMIDI publishes the virtual port to enumeration.""" + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + import rtmidi + + temp = rtmidi.MidiOut() + ports = temp.get_ports() + del temp + if any(port_name in p for p in ports): + return True + time.sleep(0.01) + return False + + +@pytest.fixture +def loopback(): + """Open a real virtual MIDI-in port; yield (port_name, received_messages). + + Each test gets a uniquely-named port so async callbacks can't cross-talk. + The fixture blocks until CoreMIDI has published the port to enumeration, + eliminating the race between port creation and port discovery. + """ + import rtmidi + + port_name = f"pistomp-loopback-{uuid.uuid4().hex[:8]}" + midi_in = rtmidi.MidiIn() + midi_in.open_virtual_port(port_name) + received: list[list[int]] = [] + + def _on_message(event, data): + message, _delta = event + received.append(message) + + midi_in.set_callback(_on_message) + + assert _port_is_visible(port_name), f"Virtual port {port_name!r} never became visible" + + try: + yield port_name, received + finally: + midi_in.close_port() + del midi_in + + +def _manager_for(port_name): + """Enabled manager; port_name is the exact ALSA client device name used as the key.""" + mgr = ExternalMidiManager() + mgr.update_config({"enabled": True, "send_delay_ms": 0}) + return mgr + + +class TestRealLoopback: + def test_send_raw_reaches_real_device(self, loopback): + port_name, received = loopback + mgr = _manager_for(port_name) + mgr.open_port(port_name) + + assert mgr.send_raw(port_name, [0xB0, 75, 42]) is True + assert _wait_for(lambda: received == [[0xB0, 75, 42]]) + mgr.close() + + def test_external_midi_out_prefers_real_port_over_fallback(self, loopback): + port_name, received = loopback + mgr = _manager_for(port_name) + mgr.open_port(port_name) + fallback = MagicMock() + out = ExternalMidiOut(mgr, port_name, fallback) + + out.send_message([0xB0, 70, 7]) + + assert _wait_for(lambda: received == [[0xB0, 70, 7]]) + fallback.send_message.assert_not_called() + mgr.close() + + def test_external_midi_out_falls_back_when_device_absent(self, loopback): + _, received = loopback + mgr = _manager_for("no-such-pistomp-port") + fallback = MagicMock() + out = ExternalMidiOut(mgr, "no-such-pistomp-port", fallback) + + out.send_message([0xB0, 70, 7]) + + fallback.send_message.assert_called_once_with([0xB0, 70, 7]) + assert received == [] + mgr.close() + + def test_send_messages_for_pedalboard_delivers_sequence(self, loopback): + port_name, received = loopback + mgr = _manager_for(port_name) + mgr.messages = {port_name: [[0xC0, 5], [0xB0, 7, 100]]} + mgr.open_port(port_name) + + assert mgr.send_messages_for_pedalboard() is True + + assert _wait_for(lambda: received == [[0xC0, 5], [0xB0, 7, 100]]) + mgr.close() + + def test_absent_device_backs_off_no_per_send_reenumerate(self, loopback, monkeypatch): + # End-to-end: an absent device must not re-enumerate on every send. + _, _received = loopback + mgr = _manager_for("no-such-pistomp-port") + + enumerations = [] + real_enumerate = mgr._get_available_ports + + def counting_enumerate(): + enumerations.append(1) + return real_enumerate() + + monkeypatch.setattr(mgr, "_get_available_ports", counting_enumerate) + + assert mgr.send_raw("no-such-pistomp-port", [0xB0, 1, 1]) is False + assert mgr.send_raw("no-such-pistomp-port", [0xB0, 1, 2]) is False # within backoff window + + assert len(enumerations) == 1 # second send short-circuited on backoff + mgr.close() + + +class TestControlRoutesToRealPort: + """End-to-end: a real control routed via ExternalMidiOut emits framed CC bytes on the wire. + + Each builds the actual control with its midiout set to an external wrapper (as + __apply_midi_routing does), drives the action method directly, and asserts the + bytes land on the virtual port. Covers every externally-routable control: + footswitch press, tweak-encoder rotation, expression/knob movement. + """ + + def _routed(self, port_name): + mgr = _manager_for(port_name) + fallback = MagicMock() + out = ExternalMidiOut(mgr, port_name, fallback) + # Eagerly open the port so the first send doesn't race enumeration. + mgr.open_port(port_name) + return mgr, out, fallback + + def test_footswitch_press_reaches_real_port(self, loopback): + port_name, received = loopback + mgr, out, fallback = self._routed(port_name) + fs = Footswitch(1, None, None, midi_CC=61, midi_channel=0, midiout=out, refresh_callback=MagicMock()) + + fs.pressed(switchstate.Value.RELEASED) # short press → CC 127 (toggled on) + + assert _wait_for(lambda: received == [[0xB0, 61, 127]]) + fallback.send_message.assert_not_called() + mgr.close() + + def test_tweak_encoder_rotation_reaches_real_port(self, loopback): + port_name, received = loopback + mgr, out, fallback = self._routed(port_name) + enc = EncoderMidiControl( + MagicMock(), + d_pin=None, + clk_pin=None, + callback=None, + midi_CC=70, + midi_channel=0, + midiout=out, + type=Token.KNOB, + id=1, + ) + + enc.refresh(1) # one detent clockwise → midi_value 0 + per_click(8) + + assert _wait_for(lambda: received == [[0xB0, 70, 8]]) + fallback.send_message.assert_not_called() + mgr.close() + + def test_expression_movement_reaches_real_port(self, loopback): + port_name, received = loopback + mgr, out, fallback = self._routed(port_name) + exp = AnalogMidiControl( + MagicMock(), 0, 16, midi_CC=75, midi_channel=0, midiout=out, type=Token.EXPRESSION, id=4 + ) + + exp._send_value(1023) # full travel → MIDI 127 + + assert _wait_for(lambda: received == [[0xB0, 75, 127]]) + fallback.send_message.assert_not_called() + mgr.close() diff --git a/tests/snapshots/v1/test_external_midi/test_external_analog_assignments_render/0.png b/tests/snapshots/v1/test_external_midi/test_external_analog_assignments_render/0.png new file mode 100644 index 00000000..a6bcb6e5 Binary files /dev/null and b/tests/snapshots/v1/test_external_midi/test_external_analog_assignments_render/0.png differ diff --git a/tests/test_config_schema.py b/tests/test_config_schema.py new file mode 100644 index 00000000..1223e86c --- /dev/null +++ b/tests/test_config_schema.py @@ -0,0 +1,62 @@ +"""Schema regression net for pistomp/config.py (S1: external_midi / midi_port). + +Guards two things: every shipped template stays schema-valid, and the +external-MIDI routing surface (per-control `midi_port` + the `external_midi` +block) is both accepted when well-formed and rejected when malformed. +""" + +import glob + +import pytest +import yaml +from jsonschema import Draft4Validator, exceptions, validate + +from pistomp.config import schema + +TEMPLATES = sorted(glob.glob("setup/config_templates/default_config*.yml")) + + +def test_schema_is_well_formed(): + Draft4Validator.check_schema(schema) + + +@pytest.mark.parametrize("path", TEMPLATES, ids=lambda p: p.rsplit("/", 1)[-1]) +def test_shipped_template_validates(path): + with open(path) as fh: + cfg = yaml.safe_load(fh) + validate(instance=cfg, schema=schema) + + +def test_midi_port_and_external_midi_accepted(): + cfg = { + "hardware": { + "version": 3.0, + "midi": {"channel": 14}, + "footswitches": [{"id": 0, "midi_CC": 60, "midi_port": "Source Audio C4 Synth"}], + "analog_controllers": [ + {"adc_input": 5, "id": 0, "midi_CC": 75, "midi_port": "HX Stomp", "type": "EXPRESSION"} + ], + "encoders": [{"id": 1, "midi_CC": 70, "midi_port": "Source Audio C4 Synth"}], + "external_midi": { + "enabled": True, + "send_delay_ms": 10, + "messages": { + "Source Audio C4 Synth": [[0xB0, 0x66, 0x00]], + "HX Stomp": [[0xC0, 0x00]], + }, + }, + } + } + validate(instance=cfg, schema=schema) + + +def test_non_string_midi_port_rejected(): + cfg = { + "hardware": { + "version": 3.0, + "midi": {"channel": 14}, + "encoders": [{"id": 1, "midi_port": 5}], + } + } + with pytest.raises(exceptions.ValidationError): + validate(instance=cfg, schema=schema) diff --git a/tests/test_controller_manager.py b/tests/test_controller_manager.py new file mode 100644 index 00000000..c3d5ad59 --- /dev/null +++ b/tests/test_controller_manager.py @@ -0,0 +1,69 @@ +"""ControllerManager.bind() — controller→parameter binding, version-flagged.""" + +from unittest.mock import MagicMock + +import common.token as Token +from pistomp.analogmidicontrol import AnalogMidiControl +from pistomp.controller_manager import ControllerManager +from pistomp.current import Current + + +def _make_current() -> Current: + """A Current with an empty (truthy) pedalboard — no plugin bindings.""" + current = Current(MagicMock()) + current.pedalboard.plugins = [] + return current + + +class _Ctl: + """Minimal internally-routed controller — v1 config supplies no VOLUME control + to use directly.""" + + def __init__(self, type): + self.type = type + self.parameter = "bound" + self.midi_CC = None + + +def test_bind_preserves_volume_binding_clears_others(): + """Controller.type is a class-level default, so the volume guard is type-safe: + bind() clears every controller's parameter except the VOLUME control's.""" + vol = _Ctl(Token.VOLUME) + knob = _Ctl(Token.KNOB) + hw = MagicMock() + hw.controllers = {"0:7": vol, "0:8": knob} + hw.encoders = [] + hw.is_external.return_value = False + + current = _make_current() + ControllerManager(hw).bind(current) + + assert vol.parameter == "bound" + assert knob.parameter is None + + +def _external_analog(midi_cc=75, midi_channel=0, ctrl_id=3): + return AnalogMidiControl(MagicMock(), 0, 16, midi_cc, midi_channel, MagicMock(), Token.KNOB, id=ctrl_id, cfg={}) + + +def test_external_controller_bound_and_displayed(): + """An externally-routed control isn't bound to any plugin parameter, so the + plugin loop skips it. The external block binds a synthetic parameter and adds + an "External" display entry — otherwise it'd be invisible on the LCD. Routing + is read from the hardware registry, not the control.""" + ctrl = _external_analog() + hw = MagicMock() + hw.controllers = {"0:75": ctrl} + hw.encoders = [] + hw.is_external.return_value = True + hw.external_port_name.return_value = "c4" + hw.create_external_parameter.return_value = "SYNTH_PARAM" + + current = _make_current() + ControllerManager(hw).bind(current) + + assert ctrl.parameter == "SYNTH_PARAM" + entry = current.analog_controllers["0:75"] + assert entry.get("category") == "External" + assert entry.get("port_name") == "c4" + assert entry.get("midi_cc") == 75 diff --git a/tests/test_external_midi.py b/tests/test_external_midi.py new file mode 100644 index 00000000..59aa4fbc --- /dev/null +++ b/tests/test_external_midi.py @@ -0,0 +1,344 @@ +""" +Tests for ExternalMidiManager and ExternalMidiOut. + +Config keys are ALSA client device names (e.g. 'Source Audio C4 Synth'). +Matching is case-insensitive against the client-name prefix of each rtmidi port string. +""" + +from typing import cast +from unittest.mock import MagicMock + +import pytest + +from modalapi.external_midi import ExternalMidiManager, ExternalMidiOut + + +def _port(mgr: ExternalMidiManager, name: str) -> MagicMock: + return cast(MagicMock, mgr.midi_ports[name]) + + +@pytest.fixture +def fake_ports(monkeypatch): + """Patch rtmidi.MidiOut so MidiOut() returns a fresh MagicMock per call. + + Returns (available_ports_list, created_outs_list). Modify available_ports_list + in place to control what get_ports() returns. created_outs_list collects each + MagicMock instance constructed via rtmidi.MidiOut(), so tests can assert on + open_port / send_message / close_port calls. + """ + available_ports: list[str] = [] + created_outs: list[MagicMock] = [] + + def _factory(*args, **kwargs): + m = MagicMock() + m.get_ports.return_value = list(available_ports) + created_outs.append(m) + return m + + monkeypatch.setattr("modalapi.external_midi.rtmidi.MidiOut", _factory) + return available_ports, created_outs + + +class TestUpdateConfig: + def test_disabled_by_default(self): + mgr = ExternalMidiManager() + assert mgr.enabled is False + assert mgr.send_delay_ms == 10 + assert mgr.messages == {} + + def test_none_config_is_noop(self): + mgr = ExternalMidiManager() + mgr.update_config(None) + assert mgr.enabled is False + + def test_enables_and_sets_delay(self): + mgr = ExternalMidiManager() + mgr.update_config({"enabled": True, "send_delay_ms": 25}) + assert mgr.enabled is True + assert mgr.send_delay_ms == 25 + + def test_messages_merge_per_port(self): + """Per-pedalboard override updates only the named port, leaves others.""" + mgr = ExternalMidiManager() + mgr.update_config({"messages": {"Source Audio C4 Synth": [[0xC0, 0x00]], "HX Stomp": [[0xC0, 0x01]]}}) + mgr.update_config({"messages": {"Source Audio C4 Synth": [[0xC0, 0x05]]}}) + assert mgr.messages["Source Audio C4 Synth"] == [[0xC0, 0x05]] + assert mgr.messages["HX Stomp"] == [[0xC0, 0x01]] + + +class TestPortDiscovery: + def test_device_name_matches_by_client_name(self, fake_ports): + available, _ = fake_ports + available[:] = [ + "Midi Through:Midi Through Port-0 14:0", + "Source Audio C4 Synth:Source Audio C4 Synth MIDI 1 20:0", + "HX Stomp:HX Stomp MIDI 1 21:0", + ] + mgr = ExternalMidiManager() + mgr.update_config( + { + "enabled": True, + "messages": {"Source Audio C4 Synth": [[0xC0, 0x05]]}, + } + ) + assert mgr.send_messages_for_pedalboard() is True + _port(mgr, "Source Audio C4 Synth").open_port.assert_called_once_with(1) + + def test_device_name_match_is_case_insensitive(self, fake_ports): + available, _ = fake_ports + available[:] = ["Source Audio C4 Synth:Source Audio C4 Synth MIDI 1 20:0"] + mgr = ExternalMidiManager() + mgr.update_config( + { + "enabled": True, + "messages": {"source audio c4 synth": [[0xC0, 0x05]]}, + } + ) + assert mgr.send_messages_for_pedalboard() is True + _port(mgr, "source audio c4 synth").open_port.assert_called_once_with(0) + + def test_no_match_skips_port(self, fake_ports): + available, _ = fake_ports + available[:] = ["Midi Through:Midi Through Port-0 14:0"] + mgr = ExternalMidiManager() + mgr.update_config( + { + "enabled": True, + "messages": {"Source Audio C4 Synth": [[0xC0, 0x05]]}, + } + ) + mgr.send_messages_for_pedalboard() + assert "Source Audio C4 Synth" not in mgr.midi_ports + + def test_port_name_without_colon_matches_exactly(self, fake_ports): + """Port strings with no colon (short names) match the key directly.""" + available, _ = fake_ports + available[:] = ["dev"] + mgr = ExternalMidiManager() + mgr.update_config({"enabled": True, "messages": {"dev": [[0xC0, 0x00]]}}) + assert mgr.send_messages_for_pedalboard() is True + _port(mgr, "dev").open_port.assert_called_once_with(0) + + +class TestSendMessagesForPedalboard: + def test_disabled_short_circuits(self, fake_ports): + mgr = ExternalMidiManager() + mgr.update_config({"messages": {"dev": [[0xC0, 0]]}}) + assert mgr.send_messages_for_pedalboard() is False + + def test_no_messages_returns_false(self): + mgr = ExternalMidiManager() + mgr.update_config({"enabled": True}) + assert mgr.send_messages_for_pedalboard() is False + + def test_delay_applied_between_messages(self, fake_ports, monkeypatch): + available, _ = fake_ports + available[:] = ["dev"] + sleeps: list[float] = [] + monkeypatch.setattr("modalapi.external_midi.time.sleep", lambda s: sleeps.append(s)) + + mgr = ExternalMidiManager() + mgr.update_config( + { + "enabled": True, + "send_delay_ms": 25, + "messages": {"dev": [[0xC0, 0], [0xC0, 1], [0xC0, 2]]}, + } + ) + mgr.send_messages_for_pedalboard() + # Delays between consecutive messages, but not after the last one + assert sleeps == [0.025, 0.025] + assert _port(mgr, "dev").send_message.call_count == 3 + + def test_invalid_message_is_skipped_others_sent(self, fake_ports): + available, _ = fake_ports + available[:] = ["dev"] + mgr = ExternalMidiManager() + mgr.update_config( + { + "enabled": True, + "send_delay_ms": 0, + "messages": { + "dev": [ + [0x00, 0x00], # invalid status byte (< 0x80) + [0xC0, 0x05], # valid + [0xB0, 0x80, 0x00], # invalid data byte (> 0x7F) + [0xB0, 0x10, 0x40], # valid + ] + }, + } + ) + mgr.send_messages_for_pedalboard() + sent = [c.args[0] for c in _port(mgr, "dev").send_message.call_args_list] + assert sent == [[0xC0, 0x05], [0xB0, 0x10, 0x40]] + + def test_failing_port_invalidated_and_stops(self, fake_ports): + available, _ = fake_ports + available[:] = ["dev"] + mgr = ExternalMidiManager() + mgr.update_config( + { + "enabled": True, + "send_delay_ms": 0, + "messages": {"dev": [[0xC0, 1], [0xC0, 2], [0xC0, 3]]}, + } + ) + midi_out = MagicMock() + midi_out.send_message.side_effect = RuntimeError("device disconnected") + mgr.midi_ports["dev"] = midi_out + + mgr.send_messages_for_pedalboard() + assert midi_out.send_message.call_count == 1 + assert "dev" not in mgr.midi_ports + midi_out.close_port.assert_called_once() + + +class TestInitPort: + def test_open_port_failure_returns_none_without_keyerror(self, monkeypatch): + """A failing open_port must not KeyError when the port is absent from the cache.""" + failing = MagicMock() + failing.open_port.side_effect = RuntimeError("cannot open") + monkeypatch.setattr("modalapi.external_midi.rtmidi.MidiOut", lambda *a, **k: failing) + + mgr = ExternalMidiManager() + mgr.update_config({"enabled": True}) + # Bypass enumeration so we reach open_port directly + monkeypatch.setattr(mgr, "_find_port_index", lambda name: 0) + + assert mgr._init_port("dev") is None + assert "dev" not in mgr.midi_ports + + +class TestOpenBackoff: + def test_failed_open_backs_off_no_reenumerate(self, fake_ports): + """A port whose device is absent must not re-enumerate on every poll tick.""" + available, created = fake_ports + available[:] = ["something_else"] + mgr = ExternalMidiManager() + mgr.update_config({"enabled": True}) + + assert mgr._init_port("missing_device") is None + n = len(created) + assert mgr._init_port("missing_device") is None + assert len(created) == n # second attempt skipped enumeration + + def test_open_port_eager_returns_bool(self, fake_ports): + available, _ = fake_ports + available[:] = ["dev"] + mgr = ExternalMidiManager() + mgr.update_config({"enabled": True}) + assert mgr.open_port("dev") is True + assert "dev" in mgr.midi_ports + + +class TestSendRaw: + def test_returns_false_when_disabled(self): + mgr = ExternalMidiManager() + assert mgr.send_raw("dev", [0xB0, 10, 64]) is False + + def test_unknown_port_returns_false(self, fake_ports): + available, _ = fake_ports + available[:] = ["something_else"] + mgr = ExternalMidiManager() + mgr.update_config({"enabled": True}) + assert mgr.send_raw("ghost", [0xB0, 10, 64]) is False + + def test_sends_message_and_returns_true(self, fake_ports): + available, _ = fake_ports + available[:] = ["dev"] + mgr = ExternalMidiManager() + mgr.update_config({"enabled": True}) + assert mgr.send_raw("dev", [0xB0, 80, 100]) is True + _port(mgr, "dev").send_message.assert_called_once_with([0xB0, 80, 100]) + + def test_sends_non_cc_message(self, fake_ports): + available, _ = fake_ports + available[:] = ["dev"] + mgr = ExternalMidiManager() + mgr.update_config({"enabled": True}) + assert mgr.send_raw("dev", [0xC0, 5]) is True # Program Change + _port(mgr, "dev").send_message.assert_called_once_with([0xC0, 5]) + + def test_returns_false_when_port_unavailable(self, fake_ports): + available, _ = fake_ports + available[:] = ["something_else"] + mgr = ExternalMidiManager() + mgr.update_config({"enabled": True}) + assert mgr.send_raw("dev", [0xB0, 10, 64]) is False + + def test_send_failure_invalidates_port(self, fake_ports): + mgr = ExternalMidiManager() + mgr.update_config({"enabled": True}) + midi_out = MagicMock() + midi_out.send_message.side_effect = RuntimeError("broken") + mgr.midi_ports["dev"] = midi_out + assert mgr.send_raw("dev", [0xB0, 10, 64]) is False + assert "dev" not in mgr.midi_ports + + +class TestClose: + def test_close_closes_all_ports(self, fake_ports): + available, _ = fake_ports + available[:] = ["a", "b"] + mgr = ExternalMidiManager() + mgr.update_config( + { + "enabled": True, + "messages": {"a": [[0xC0, 0]], "b": [[0xC0, 0]]}, + } + ) + mgr.send_messages_for_pedalboard() + outs = [_port(mgr, "a"), _port(mgr, "b")] + mgr.close() + for o in outs: + o.close_port.assert_called_once() + assert mgr.midi_ports == {} + + +class TestExternalMidiOut: + def test_delegates_to_send_raw(self): + manager = MagicMock() + manager.send_raw.return_value = True + fallback = MagicMock() + out = ExternalMidiOut(manager, "dev", fallback) + out.send_message([0xB3, 80, 100]) + manager.send_raw.assert_called_once_with("dev", [0xB3, 80, 100]) + fallback.send_message.assert_not_called() + + def test_falls_back_when_send_raw_fails(self): + manager = MagicMock() + manager.send_raw.return_value = False + fallback = MagicMock() + out = ExternalMidiOut(manager, "dev", fallback) + msg = [0xB0, 10, 64] + out.send_message(msg) + manager.send_raw.assert_called_once_with("dev", msg) + fallback.send_message.assert_called_once_with(msg) + + def test_falls_back_when_send_raw_raises(self): + """Exceptions from send_raw must not crash the poll loop.""" + manager = MagicMock() + manager.send_raw.side_effect = RuntimeError("port not in config") + fallback = MagicMock() + out = ExternalMidiOut(manager, "dev", fallback) + out.send_message([0xB0, 10, 64]) + fallback.send_message.assert_called_once_with([0xB0, 10, 64]) + + def test_non_cc_message_routes_to_external(self): + manager = MagicMock() + manager.send_raw.return_value = True + fallback = MagicMock() + out = ExternalMidiOut(manager, "dev", fallback) + msg = [0x92, 64, 100] # Note On + out.send_message(msg) + manager.send_raw.assert_called_once_with("dev", msg) + fallback.send_message.assert_not_called() + + def test_program_change_routes_to_external(self): + manager = MagicMock() + manager.send_raw.return_value = True + fallback = MagicMock() + out = ExternalMidiOut(manager, "dev", fallback) + out.send_message([0xC0, 5]) # Program Change, 2 bytes + manager.send_raw.assert_called_once_with("dev", [0xC0, 5]) + fallback.send_message.assert_not_called() diff --git a/tests/test_handler_cleanup.py b/tests/test_handler_cleanup.py new file mode 100644 index 00000000..aaf59e98 --- /dev/null +++ b/tests/test_handler_cleanup.py @@ -0,0 +1,49 @@ +"""External MIDI must be closed exactly once, in cleanup() — not also in __del__. + +Closing in both leads to a double-close: cleanup() runs at shutdown, then __del__ +fires again at GC/interpreter teardown on the same (already-closed) manager. +""" + +from unittest.mock import MagicMock + +from modalapi.mod import Mod +from modalapi.modhandler import Modhandler + + +class TestModCleanup: + def test_del_does_not_close_external_midi(self): + h = object.__new__(Mod) + h.wifi_manager = None + h.external_midi = MagicMock() + h.ws_bridge = MagicMock() + h.__del__() + h.external_midi.close.assert_not_called() + + def test_cleanup_closes_external_midi(self): + h = object.__new__(Mod) + h.wifi_manager = None + h.lcd = None + h.external_midi = MagicMock() + h.ws_bridge = MagicMock() + h.cleanup() + h.external_midi.close.assert_called_once() + + +class TestModhandlerCleanup: + def test_del_does_not_close_external_midi(self): + h = object.__new__(Modhandler) + h.wifi_manager = None + h.external_midi = MagicMock() + h.__del__() + h.external_midi.close.assert_not_called() + + def test_cleanup_closes_external_midi(self): + h = object.__new__(Modhandler) + h.wifi_manager = None + h._tuner_engine = None + h._lcd = None + h._hardware = None + h.external_midi = MagicMock() + h.ws_bridge = MagicMock() + h.cleanup() + h.external_midi.close.assert_called_once() diff --git a/tests/test_hardware.py b/tests/test_hardware.py new file mode 100644 index 00000000..2d9f6811 --- /dev/null +++ b/tests/test_hardware.py @@ -0,0 +1,166 @@ +"""Unit tests for pistomp.hardware.Hardware helpers.""" + +import logging +from typing import cast +from unittest.mock import MagicMock + +import pytest + +import common.token as Token +from modalapi.external_midi import ExternalMidiManager, ExternalMidiOut +from pistomp.hardware import Hardware + + +class _Ctl: + """Hashable double; SimpleNamespace can't key the registry (defines __eq__).""" + + def __init__(self, **kw): + self.__dict__.update(kw) + + +class _StubHardware(Hardware): + """Concrete subclass so object.__new__ works (Hardware is abstract).""" + + def init_analog_controls(self): ... + def init_encoders(self): ... + def init_footswitches(self): ... + def init_relays(self): ... + def cleanup(self): ... + def test(self): ... + def add_encoder(self, *a, **k): + raise NotImplementedError + + +def _validate(hw, port_name): + return hw._Hardware__validate_midi_port(port_name) + + +class TestValidateMidiPort: + def test_known_port_returned(self): + """A valid device name passes through unchanged.""" + hw = object.__new__(_StubHardware) + hw.external_midi = ExternalMidiManager() + assert _validate(hw, "Source Audio C4 Synth") == "Source Audio C4 Synth" + + def test_uninitialized_external_midi_logs_warning_not_error(self, caplog): + hw = object.__new__(_StubHardware) + hw.external_midi = None + + with caplog.at_level(logging.WARNING): + assert _validate(hw, "dev") is None + + recs = [r for r in caplog.records if "dev" in r.getMessage()] + assert recs + assert all(r.levelno == logging.WARNING for r in recs) + + +@pytest.fixture +def routed_hw(monkeypatch): + """A Hardware with one encoder, analog control, and footswitch, and a 'c4' external port.""" + mock_out = MagicMock() + mock_out.get_ports.return_value = ["My MIDI Device"] + monkeypatch.setattr("modalapi.external_midi.rtmidi.MidiOut", lambda *a, **k: mock_out) + + hw = object.__new__(_StubHardware) + hw.midiout = MagicMock(name="virtual") + hw.external_midi = ExternalMidiManager() + hw.external_midi.update_config({"enabled": True}) + + hw.encoders = [_Ctl(id=1, midi_CC=70, midi_channel=13, midiout=hw.midiout)] + hw.analog_controls = cast(list, [_Ctl(id=2, midi_CC=75, midiout=hw.midiout)]) + hw.footswitches = cast(list, [_Ctl(id=0, midiout=hw.midiout)]) + hw.external_routing = {} # __new__ bypasses __init__; __route_section writes here + return hw + + +def _route(hw, cfg): + hw._Hardware__apply_midi_routing(cfg) + + +class TestApplyMidiRouting: + def test_footswitch_routed_to_external_port(self, routed_hw): + """A footswitch with midi_port routes to its external port.""" + cfg = {Token.HARDWARE: {Token.FOOTSWITCHES: [{Token.ID: 0, "midi_port": "My MIDI Device"}]}} + _route(routed_hw, cfg) + fs = routed_hw.footswitches[0] + assert isinstance(fs.midiout, ExternalMidiOut) + assert fs.midiout.port_name == "My MIDI Device" + assert routed_hw.is_external(fs) + assert routed_hw.external_port_name(fs) == "My MIDI Device" + assert routed_hw.external_routing[fs].port_name == "My MIDI Device" + + def test_unrouted_control_is_internal(self, routed_hw): + """No midi_port → internal: absent from the registry, sends to virtual.""" + cfg = {Token.HARDWARE: {Token.FOOTSWITCHES: [{Token.ID: 0}]}} + _route(routed_hw, cfg) + fs = routed_hw.footswitches[0] + assert fs.midiout is routed_hw.midiout + assert not routed_hw.is_external(fs) + assert routed_hw.external_port_name(fs) is None + assert fs not in routed_hw.external_routing + + def test_routing_overlay_clears_external(self, routed_hw): + """A later cfg pass with no midi_port removes a prior external routing.""" + ext = {Token.HARDWARE: {Token.FOOTSWITCHES: [{Token.ID: 0, "midi_port": "My MIDI Device"}]}} + _route(routed_hw, ext) + fs = routed_hw.footswitches[0] + assert routed_hw.is_external(fs) + _route(routed_hw, {Token.HARDWARE: {Token.FOOTSWITCHES: [{Token.ID: 0}]}}) + assert not routed_hw.is_external(fs) + + def test_encoder_and_analog_routed_to_external_port(self, routed_hw): + cfg = { + Token.HARDWARE: { + Token.ENCODERS: [{Token.ID: 1, "midi_port": "My MIDI Device"}], + Token.ANALOG_CONTROLLERS: [{Token.ID: 2, "midi_port": "My MIDI Device"}], + } + } + _route(routed_hw, cfg) + assert isinstance(routed_hw.encoders[0].midiout, ExternalMidiOut) + assert isinstance(routed_hw.analog_controls[0].midiout, ExternalMidiOut) + + def test_encoder_midi_cc_override(self, routed_hw): + cfg = {Token.HARDWARE: {Token.ENCODERS: [{Token.ID: 1, Token.MIDI_CC: 99}]}} + _route(routed_hw, cfg) + assert routed_hw.encoders[0].midi_CC == 99 + + def test_encoder_midi_channel_override(self, routed_hw): + """External device may be on a different channel than the hardware default.""" + cfg = {Token.HARDWARE: {Token.ENCODERS: [{Token.ID: 1, "midi_channel": 0}]}} + _route(routed_hw, cfg) + assert routed_hw.encoders[0].midi_channel == 0 + + def test_no_midi_port_falls_back_to_virtual(self, routed_hw): + cfg = {Token.HARDWARE: {Token.FOOTSWITCHES: [{Token.ID: 0}]}} + _route(routed_hw, cfg) + assert routed_hw.footswitches[0].midiout is routed_hw.midiout + + def test_external_port_opened_eagerly(self, routed_hw): + """The external port is opened at routing time, not lazily inside the poll loop.""" + cfg = {Token.HARDWARE: {Token.FOOTSWITCHES: [{Token.ID: 0, "midi_port": "My MIDI Device"}]}} + _route(routed_hw, cfg) + assert "My MIDI Device" in routed_hw.external_midi.midi_ports + + +class TestReinitDefaultRouting: + def test_reinit_applies_routing_for_default_cfg(self, monkeypatch): + """Routing is applied for the default config, not only for pedalboard cfg.""" + hw = object.__new__(_StubHardware) + hw.default_cfg = {Token.HARDWARE: {}} + hw.handler = MagicMock() + hw.external_routing = {} # __new__ bypasses __init__; reinit clears it + + for name in ( + "_Hardware__init_midi_default", + "_Hardware__init_footswitches", + "_Hardware__init_encoders", + "_Hardware__init_external_midi", + ): + setattr(hw, name, lambda *a, **k: None) + routed = [] + setattr(hw, "_Hardware__apply_midi_routing", lambda cfg: routed.append(cfg)) + monkeypatch.setattr("pistomp.footswitch.Footswitch.init", staticmethod(lambda cb: None)) + + hw.reinit(None) + + assert routed == [hw.cfg] diff --git a/tests/v1/test_binding.py b/tests/v1/test_binding.py new file mode 100644 index 00000000..0602d95b --- /dev/null +++ b/tests/v1/test_binding.py @@ -0,0 +1,68 @@ +"""Characterization of Mod.bind_current_pedalboard() (v1). + +Pins behavior that differs from the v3 (modhandler) twin so a future shared +controller-manager extraction can't silently flatten the asymmetry: + + - v1 reorders the plugin chain so footswitch-controlled plugins sit last. + - v1 populates analog_controllers from an AnalogMidiControl's cfg dict, + stamping CATEGORY + TYPE + ID. +""" + +import common.token as Token +from pistomp.analogmidicontrol import AnalogMidiControl +from pistomp.footswitch import Footswitch + + +def _key_of(hw, predicate): + return next(k for k, v in hw.controllers.items() if predicate(v)) + + +def test_v1_bind_footswitch_and_analog(v1_system, make_plugin): + handler = v1_system.handler + hw = v1_system.hw + + fs_key = _key_of(hw, lambda c: isinstance(c, Footswitch)) + knob_key = _key_of(hw, lambda c: isinstance(c, AnalogMidiControl) and c.type == Token.KNOB) + fs = hw.controllers[fs_key] + knob = hw.controllers[knob_key] + + fuzz = make_plugin("fuzz", category="Distortion") + fuzz.parameters[":bypass"].binding = fs_key + tone = make_plugin("tone", category="Filter") + tone.parameters[":bypass"].binding = knob_key + + handler.current.pedalboard.plugins = [fuzz, tone] + handler.bind_current_pedalboard() + + # Footswitch bound to its plugin's bypass param; plugin flagged. + assert fs.parameter is fuzz.parameters[":bypass"] + assert fuzz.has_footswitch is True + + # Analog control surfaced in the LCD assignment dict with category + type. + analog_entries = [ + cfg for cfg in handler.current.analog_controllers.values() + if cfg.get(Token.TYPE) == Token.KNOB + ] + assert len(analog_entries) == 1 + entry = analog_entries[0] + assert entry[Token.CATEGORY] == "Filter" + assert Token.ID in entry + + +def test_v1_bind_reorders_footswitch_plugins_to_end(v1_system, make_plugin): + """v1 moves footswitch-controlled plugins to the tail of the chain.""" + handler = v1_system.handler + hw = v1_system.hw + + fs_key = _key_of(hw, lambda c: isinstance(c, Footswitch)) + + fuzz = make_plugin("fuzz") # footswitch-controlled + fuzz.parameters[":bypass"].binding = fs_key + reverb = make_plugin("reverb") # no controller binding + + # Footswitch plugin deliberately placed first. + handler.current.pedalboard.plugins = [fuzz, reverb] + handler.bind_current_pedalboard() + + titles = [p.instance_id for p in handler.current.pedalboard.plugins] + assert titles == ["reverb", "fuzz"], "footswitch plugin must be reordered to the end" diff --git a/tests/v1/test_external_midi.py b/tests/v1/test_external_midi.py new file mode 100644 index 00000000..500d4c3a --- /dev/null +++ b/tests/v1/test_external_midi.py @@ -0,0 +1,15 @@ +"""External controllers must render in the v1 mono analog-assignments zone.""" + +import common.token as Token + + +def test_external_analog_assignments_render(v1_system, snapshot): + v1_system.handler.lcd.draw_analog_assignments( + { + "0:75": {Token.CATEGORY: "External", Token.TYPE: Token.KNOB, Token.ID: 3, + "port_name": "c4", "midi_cc": 75}, + "0:76": {Token.CATEGORY: "External", Token.TYPE: Token.EXPRESSION, Token.ID: 4, + "port_name": "hx", "midi_cc": 76}, + } + ) + snapshot() diff --git a/tests/v3/test_plugins.py b/tests/v3/test_plugins.py index 63dbb9b3..7ff38867 100644 --- a/tests/v3/test_plugins.py +++ b/tests/v3/test_plugins.py @@ -10,6 +10,7 @@ import pistomp.switchstate as switchstate from pistomp.encodermidicontrol import EncoderMidiControl +from pistomp.footswitch import Footswitch from common.parameter import Parameter from modalapi.plugin import Plugin import common.token as Token @@ -77,6 +78,31 @@ def test_v3_bind_volume_encoder_populates_analog_controllers(v3_system: SystemFi assert Token.VOLUME in handler.current.analog_controllers +def test_v3_bind_does_not_reorder_footswitch_plugins(v3_system: SystemFixture, make_plugin): + """v3 (modhandler) leaves the plugin chain order untouched. + + Counterpart to v1's reorder-to-end behavior — pins the asymmetry so a shared + controller-manager extraction must preserve it rather than unify it. + """ + handler = v3_system.handler + hw = v3_system.hw + + fs_key = next(k for k, v in hw.controllers.items() if isinstance(v, Footswitch)) + + fuzz = make_plugin("fuzz") # footswitch-controlled, placed first + fuzz.parameters[":bypass"].binding = fs_key + reverb = make_plugin("reverb") # no controller binding + + assert handler.current + handler.current.pedalboard.plugins = [fuzz, reverb] + handler.bind_current_pedalboard() + + assert hw.controllers[fs_key].parameter is fuzz.parameters[":bypass"] + assert fuzz.has_footswitch is True + titles = [p.instance_id for p in handler.current.pedalboard.plugins] + assert titles == ["fuzz", "reverb"], "v3 must not reorder footswitch plugins" + + # --------------------------------------------------------------------------- # Plugin bypass # --------------------------------------------------------------------------- @@ -102,6 +128,7 @@ def test_v3_toggle_plugin_bypass_via_footswitch_sends_midi_cc(v3_system: SystemF handler.toggle_plugin_bypass(None, plugin) + assert isinstance(fs.midiout, MagicMock) fs.midiout.send_message.assert_called_once() sent_cc = fs.midiout.send_message.call_args[0][0] assert sent_cc[1] == fs.midi_CC