", { class: "col-sm-11 col-sx-12" });
+
+ let $fieldDropdown = createEditableDropdown(
+ `${basePath}.field`,
+ getString("WF_Condition_field"),
+ fieldOptions,
+ tc.field || "",
+ `wf-${wfIndex}-act-${actionIndex}-tc-${tcIdx}-field`
+ );
+
+ let $operatorDropdown = createEditableDropdown(
+ `${basePath}.operator`,
+ getString("WF_Condition_operator"),
+ operatorTypes,
+ tc.operator || "equals",
+ `wf-${wfIndex}-act-${actionIndex}-tc-${tcIdx}-operator`
+ );
+
+ let $valueInput = createEditableInput(
+ `${basePath}.value`,
+ getString("WF_Condition_value"),
+ tc.value || "",
+ `wf-${wfIndex}-act-${actionIndex}-tc-${tcIdx}-value`,
+ "condition-value-input"
+ );
+
+ $inner.append($fieldDropdown).append($operatorDropdown).append($valueInput);
+
+ let $removeWrap = $("
", { class: "button-container col-sm-1 col-sx-12" });
+ let $removeBtn = $("
", {
+ class: "pointer red-hover-text remove-target-condition",
+ wfIndex: wfIndex,
+ actionIndex: actionIndex,
+ tcIdx: tcIdx
+ }).append($("", { class: "fa-solid fa-trash" }));
+ $removeWrap.append($removeBtn);
+
+ $row.append($inner).append($removeWrap);
+ return $row;
+}
+
// --------------------------------------
// Updating the in-memory workflow object
function updateWorkflowObject(newValue, jsonPath) {
@@ -1097,15 +1225,23 @@ function getEmptyWorkflowJson()
// Save workflows JSON
function saveWorkflows()
{
+ showSpinner();
// encode for import
appConfBase64 = btoa(JSON.stringify(getWorkflowsJson()))
// import
- $.post('php/server/query_replace_config.php', { base64data: appConfBase64, fileName: "workflows.json" }, function(msg) {
- console.log(msg);
- // showMessage(msg);
- write_notification(`[WF]: ${msg}`, 'interrupt');
- });
+ $.post('php/server/query_replace_config.php', { base64data: appConfBase64, fileName: "workflows.json" })
+ .done(function(msg) {
+ console.log(msg);
+ write_notification(`[WF]: ${msg}`, 'interrupt');
+ })
+ .fail(function(jqXHR, textStatus, errorThrown) {
+ console.warn("Failed to save workflows.json:", textStatus, errorThrown);
+ write_notification(`[WF]: Save failed (${textStatus})`, 'interrupt');
+ })
+ .always(function() {
+ hideSpinner();
+ });
}
// ---------------------------------------------------
@@ -1168,6 +1304,42 @@ function saveWorkflows()
removeAction(getWorkflowsJson(), wfIndex, actionIndex);
});
+// Event Listeners for target condition rows (v2 cross-device targeting)
+$(document).on("click", ".add-target-condition", function () {
+ let wfIndex = parseInt($(this).attr("wfIndex"), 10);
+ let actionIndex = parseInt($(this).attr("actionIndex"), 10);
+ let wfs = getWorkflowsJson();
+
+ if (!wfs[wfIndex].actions[actionIndex].target) {
+ wfs[wfIndex].actions[actionIndex].target = { strategy: "query", conditions: [] };
+ }
+ if (!wfs[wfIndex].actions[actionIndex].target.conditions) {
+ wfs[wfIndex].actions[actionIndex].target.conditions = [];
+ }
+
+ wfs[wfIndex].actions[actionIndex].target.conditions.push({
+ field: fieldOptions[0],
+ operator: "equals",
+ value: ""
+ });
+
+ updateWorkflowsJson(wfs);
+ renderWorkflows();
+});
+
+$(document).on("click", ".remove-target-condition", function () {
+ let wfIndex = parseInt($(this).attr("wfIndex"), 10);
+ let actionIndex = parseInt($(this).attr("actionIndex"), 10);
+ let tcIdx = parseInt($(this).attr("tcIdx"), 10);
+ let wfs = getWorkflowsJson();
+
+ if (wfs[wfIndex].actions[actionIndex].target && wfs[wfIndex].actions[actionIndex].target.conditions) {
+ wfs[wfIndex].actions[actionIndex].target.conditions.splice(tcIdx, 1);
+ updateWorkflowsJson(wfs);
+ renderWorkflows();
+ }
+});
+
// Event Listeners for Removing Condition Groups
$(document).on("click", ".remove-condition-group", function () {
let wfIndex = $(this).attr("wfindex");
diff --git a/server/models/device_instance.py b/server/models/device_instance.py
index 10ba61cb8..1fafc9647 100755
--- a/server/models/device_instance.py
+++ b/server/models/device_instance.py
@@ -18,6 +18,7 @@
unlock_fields
)
from helper import is_random_mac, get_setting_value
+from workflows.constants import VALID_DEVICE_COLUMNS
from utils.datetime_utils import timeNowUTC
@@ -85,6 +86,11 @@ def getByGUID(self, devGUID):
SELECT * FROM Devices WHERE devGUID = ?
""", (devGUID,))
+ def getByMac(self, mac):
+ return self._fetchone("""
+ SELECT * FROM Devices WHERE devMac = ?
+ """, (mac,))
+
def exists(self, devGUID):
row = self._fetchone("""
SELECT COUNT(*) as count FROM Devices WHERE devGUID = ?
@@ -96,6 +102,49 @@ def getByIP(self, ip):
SELECT * FROM Devices WHERE devLastIP = ?
""", (ip,))
+ def queryByConditions(self, conditions):
+ """Query Devices using a list of condition dicts.
+
+ Each condition dict must have ``field``, ``operator``, and ``value`` keys.
+ Supported operators: ``equals``, ``contains``.
+
+ Returns a list of device dicts (may be empty). Only fields present in
+ the Devices schema are accepted; unrecognised fields are skipped with a
+ warning to prevent SQL injection.
+ """
+ clauses = []
+ params = []
+
+ for cond in conditions:
+ field = cond.get("field", "")
+ operator = cond.get("operator", "")
+ value = cond.get("value", "")
+
+ if field not in VALID_DEVICE_COLUMNS:
+ mylog("none", [f"[WF] queryByConditions: unknown field '{field}' β skipped"])
+ continue
+
+ # Normalize MAC values before comparison to match stored format
+ if field == "devMac" and value:
+ value = normalize_mac(value)
+
+ if operator == "equals":
+ clauses.append(f"{field} = ?")
+ params.append(value)
+ elif operator == "contains":
+ clauses.append(f"{field} LIKE ?")
+ params.append(f"%{value}%")
+ else:
+ mylog("none", [f"[WF] queryByConditions: unsupported operator '{operator}' β skipped"])
+ continue
+
+ if not clauses:
+ mylog("none", ["[WF] queryByConditions: no valid conditions β returning empty result"])
+ return []
+
+ where = " AND ".join(clauses)
+ return self._fetchall(f"SELECT * FROM Devices WHERE {where}", tuple(params))
+
def search(self, query):
like = f"%{query}%"
return self._fetchall("""
diff --git a/server/workflows/actions.py b/server/workflows/actions.py
index 6169ee49a..ff4fe3d95 100755
--- a/server/workflows/actions.py
+++ b/server/workflows/actions.py
@@ -1,13 +1,31 @@
import sqlite3
from logger import mylog, Logger
from helper import get_setting_value
+from front.plugins.plugin_helper import normalize_mac
from models.device_instance import DeviceInstance
from models.plugin_object_instance import PluginObjectInstance
+from workflows.constants import BOOLEAN_COLUMNS, TOKEN_RE
# Make sure log level is initialized correctly
Logger(get_setting_value("LOG_LEVEL"))
+def interpolate_tokens(value, trigger_device):
+ """Replace every ``{{trigger.COLUMN}}`` placeholder in *value* with the
+ corresponding field from *trigger_device* (a plain dict).
+
+ Unknown columns are left as-is so callers can log them separately.
+ """
+ if not isinstance(value, str):
+ return value
+
+ def _replace(match):
+ col = match.group(1)
+ return str(trigger_device.get(col, match.group(0)))
+
+ return TOKEN_RE.sub(_replace, value)
+
+
class Action:
"""Base class for all actions."""
@@ -28,90 +46,122 @@ def execute(self):
class UpdateFieldAction(Action):
- """Action to update a specific field of an object."""
+ """Action to update a specific field of a device.
+
+ When *target_device* is supplied the action operates on that device rather
+ than the one that raised the event, enabling cross-device targeting (v2).
+ *trigger* is still required for context / logging.
+ """
- def __init__(self, db, field, value, trigger):
+ def __init__(self, db, field, value, trigger, target_device=None):
super().__init__(trigger)
self.field = field
self.value = value
self.db = db
+ self.target_device = target_device
def execute(self):
- mylog("verbose", f"[WF] Updating field '{self.field}' to '{self.value}' for event object {self.trigger.object_type}")
+ # Resolve the device to operate on
+ obj = self.target_device if self.target_device is not None else self.get_object()
- obj = self.get_object()
+ if isinstance(obj, sqlite3.Row):
+ obj = dict(obj)
if obj is None:
- mylog("none", "[WF] Object no longer exists")
+ mylog("none", "[WF] UpdateFieldAction: target device no longer exists")
return None
- if isinstance(obj, dict) and "objectGuid" in obj:
- mylog("debug", f"[WF] Updating Object '{obj}'")
+ # Interpolate {{trigger.X}} tokens in the value using the triggering device
+ trigger_obj = self.get_object() or {}
+ final_value = interpolate_tokens(self.value, trigger_obj)
- PluginObjectInstance().updateField(
- obj["objectGuid"],
- self.field,
- self.value,
- )
+ # Cast to int for boolean CHECK columns to satisfy SQLite constraints
+ if self.field in BOOLEAN_COLUMNS:
+ try:
+ final_value = int(final_value)
+ except (ValueError, TypeError):
+ mylog("none", [f"[WF] Cannot cast value '{final_value}' to int for boolean field '{self.field}' β skipping"])
+ return None
- return obj
+ mylog("verbose", f"[WF] Updating field '{self.field}' to '{final_value}' on device {obj.get('devGUID', '?')}")
- if isinstance(obj, dict) and "devGUID" in obj:
- mylog("debug", f"[WF] Updating Device '{obj}'")
+ if "objectGuid" in obj:
+ mylog("debug", f"[WF] Updating Object '{obj}'")
+ PluginObjectInstance().updateField(obj["objectGuid"], self.field, final_value)
+ return obj
- DeviceInstance().updateField(
- obj["devGUID"],
- self.field,
- self.value,
- )
+ if "devGUID" in obj:
+ # Guard: if mutating devMac, normalize the value and archive any
+ # existing device already holding that MAC before writing to avoid
+ # a PK UNIQUE constraint violation.
+ if self.field == "devMac":
+ final_value = normalize_mac(final_value)
+ self._archive_conflicting_mac(final_value, obj["devGUID"])
+ mylog("debug", f"[WF] Updating Device '{obj.get('devGUID')}'")
+ DeviceInstance().updateField(obj["devGUID"], self.field, final_value)
return obj
- mylog("none", f"[WF] Unsupported object format: {obj}")
-
+ mylog("none", f"[WF] UpdateFieldAction: unsupported object format: {obj}")
return None
+ def _archive_conflicting_mac(self, new_mac, current_guid):
+ """If another device already holds *new_mac*, archive it before the
+ primary-key mutation so SQLite's UNIQUE constraint is not violated."""
+ normalized = normalize_mac(new_mac)
+ existing = DeviceInstance().getByMac(normalized)
+ if existing and existing.get("devGUID") != current_guid:
+ mylog("none", [
+ f"[WF] Archiving conflicting device {existing['devGUID']} "
+ f"(MAC {normalized}) before devMac update"
+ ])
+ DeviceInstance().updateField(existing["devGUID"], "devIsArchived", 1)
+
class DeleteObjectAction(Action):
- """Action to delete an object."""
+ """Action to delete a device or plugin object.
- def __init__(self, db, trigger):
+ When *target_device* is supplied the action deletes that device rather than
+ the one that raised the event, enabling cross-device targeting (v2).
+ """
+
+ def __init__(self, db, trigger, target_device=None):
super().__init__(trigger)
self.db = db
+ self.target_device = target_device
def execute(self):
- mylog("verbose", f"[WF] Deleting event object {self.trigger.object_type}")
+ obj = self.target_device if self.target_device is not None else self.get_object()
- obj = self.get_object()
+ if isinstance(obj, sqlite3.Row):
+ obj = dict(obj)
if obj is None:
- mylog("none", "[WF] Object no longer exists")
+ mylog("none", "[WF] DeleteObjectAction: target device no longer exists")
return None
- if isinstance(obj, dict) and "objectGuid" in obj:
- mylog("debug", f"[WF] Deleting Object '{obj}'")
+ mylog("verbose", f"[WF] Deleting device {obj.get('devGUID', obj.get('objectGuid', '?'))}")
+ if "objectGuid" in obj:
+ mylog("debug", f"[WF] Deleting Object '{obj}'")
PluginObjectInstance().delete(obj["objectGuid"])
-
return obj
- if isinstance(obj, dict) and "devGUID" in obj:
- mylog("debug", f"[WF] Deleting Device '{obj}'")
-
+ if "devGUID" in obj:
+ mylog("debug", f"[WF] Deleting Device '{obj.get('devGUID')}'")
DeviceInstance().delete(obj["devGUID"])
-
return obj
- mylog("none", f"[WF] Unsupported object format: {obj}")
-
+ mylog("none", f"[WF] DeleteObjectAction: unsupported object format: {obj}")
return None
class RunPluginAction(Action):
"""Action to run a specific plugin."""
- def __init__(self, plugin_name, params, trigger):
+ def __init__(self, db, plugin_name, params, trigger):
super().__init__(trigger)
+ self.db = db
self.plugin_name = plugin_name
self.params = params
diff --git a/server/workflows/app_events.py b/server/workflows/app_events.py
index 236b16634..997256c91 100755
--- a/server/workflows/app_events.py
+++ b/server/workflows/app_events.py
@@ -162,7 +162,28 @@ def save(self):
self.db.commitDB()
-# Manage prefixes of column names
+# ---------------------------------------------------------------------------
+# AppEvents query helpers
+# ---------------------------------------------------------------------------
+
+def get_unprocessed(db):
+ """Return all unprocessed AppEvents rows ordered by creation time."""
+ return db.sql.execute("""
+ SELECT * FROM AppEvents
+ WHERE appEventProcessed = 0
+ ORDER BY dateTimeCreated ASC
+ """).fetchall()
+
+
+def mark_processed(db, event_index):
+ """Mark a single AppEvent row as processed and commit."""
+ db.sql.execute(
+ 'UPDATE AppEvents SET appEventProcessed = 1 WHERE "index" = ?',
+ (event_index,),
+ )
+ db.commitDB()
+
+
def manage_prefix(field, event):
if event == "delete":
return field.replace("NEW.", "OLD.")
diff --git a/server/workflows/constants.py b/server/workflows/constants.py
new file mode 100644
index 000000000..ea9cabae6
--- /dev/null
+++ b/server/workflows/constants.py
@@ -0,0 +1,41 @@
+"""
+Shared constants for the workflow engine.
+
+Centralised here so that manager, actions, and models can all import from a
+single source of truth rather than duplicating schema knowledge across files.
+"""
+
+import re
+
+# ---------------------------------------------------------------------------
+# Devices table column whitelist
+# ---------------------------------------------------------------------------
+
+# Every column present in the Devices table schema. Used in two ways:
+# 1. Token validation β {{trigger.COLUMN}} tokens are rejected at workflow
+# load time if COLUMN is not in this set.
+# 2. Query safety β queryByConditions() refuses to build WHERE clauses for
+# columns not in this set, preventing SQL injection via workflow JSON.
+VALID_DEVICE_COLUMNS = frozenset([
+ "devMac", "devName", "devOwner", "devType", "devVendor", "devFavorite",
+ "devGroup", "devComments", "devFirstConnection", "devLastConnection",
+ "devLastIP", "devPrimaryIPv4", "devPrimaryIPv6", "devVlan", "devForceStatus",
+ "devStaticIP", "devScan", "devLogEvents", "devAlertEvents", "devAlertDown",
+ "devSkipRepeated", "devLastNotification", "devPresentLastScan", "devIsNew",
+ "devLocation", "devIsArchived", "devParentMAC", "devParentPort",
+ "devParentRelType", "devIcon", "devGUID", "devSite", "devSSID",
+ "devSyncHubNode", "devSourcePlugin", "devFQDN", "devMacSource",
+ "devNameSource", "devFQDNSource", "devLastIPSource", "devVendorSource",
+ "devSSIDSource", "devParentMACSource", "devParentPortSource",
+ "devParentRelTypeSource", "devVlanSource", "devCustomProps",
+])
+
+# Devices table columns whose CHECK constraint requires a strict integer 0 or 1.
+# Values destined for these columns are cast to int before being written to DB.
+BOOLEAN_COLUMNS = frozenset([
+ "devFavorite", "devStaticIP", "devLogEvents", "devAlertEvents",
+ "devAlertDown", "devPresentLastScan", "devIsNew", "devIsArchived",
+])
+
+# Compiled regex for {{trigger.COLUMN_NAME}} token substitution and validation.
+TOKEN_RE = re.compile(r"\{\{trigger\.(\w+)\}\}")
diff --git a/server/workflows/manager.py b/server/workflows/manager.py
index 77ffa71c7..e171866fd 100755
--- a/server/workflows/manager.py
+++ b/server/workflows/manager.py
@@ -1,11 +1,15 @@
import json
+import sqlite3
from const import fullConfFolder
from logger import mylog, Logger
from helper import get_setting_value
+from models.device_instance import DeviceInstance
+from workflows.constants import VALID_DEVICE_COLUMNS, TOKEN_RE
+from workflows.app_events import get_unprocessed, mark_processed
from workflows.triggers import Trigger
from workflows.conditions import ConditionGroup
-from workflows.actions import DeleteObjectAction, RunPluginAction, UpdateFieldAction
+from workflows.actions import DeleteObjectAction, RunPluginAction, UpdateFieldAction, interpolate_tokens
# Make sure log level is initialized correctly
@@ -17,140 +21,185 @@ def __init__(self, db):
self.db = db
self.workflows = self.load_workflows()
self.update_api = False
+ # Tracks devGUIDs mutated by workflow actions within the current event batch.
+ # Events whose objectGuid appears here are skipped to prevent cascade loops.
+ # Cleared at the start of each new event batch via get_new_app_events().
+ self._mutated_guids = set()
+
+ # -------------------------------------------------------------------------
+ # Token validation
+
+ def _validate_workflow_tokens(self, workflow):
+ """Recursively scan a workflow dict for {{trigger.X}} tokens.
+ Returns True if every token maps to a valid Devices column."""
+ def _scan(node):
+ if isinstance(node, str):
+ for col in TOKEN_RE.findall(node):
+ if col not in VALID_DEVICE_COLUMNS:
+ mylog("none", [
+ f"[WF] Invalid token '{{{{trigger.{col}}}}}' in workflow "
+ f"'{workflow.get('name', '?')}' β must be a valid Devices column"
+ ])
+ return False
+ return True
+ if isinstance(node, dict):
+ return all(_scan(v) for v in node.values())
+ if isinstance(node, list):
+ return all(_scan(item) for item in node)
+ return True
+
+ return _scan(workflow)
+
+ # -------------------------------------------------------------------------
+ # Loading
def load_workflows(self):
- """Load workflows from workflows.json."""
+ """Load workflows from workflows.json, rejecting any with invalid tokens."""
try:
workflows_json_path = fullConfFolder + "/workflows.json"
with open(workflows_json_path, "r") as f:
- workflows = json.load(f)
- return workflows
+ raw = json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
mylog("none", ["[WF] Failed to load workflows.json"])
return []
+ valid = []
+ for wf in raw:
+ if self._validate_workflow_tokens(wf):
+ valid.append(wf)
+ else:
+ mylog("none", [f"[WF] Workflow '{wf.get('name', '?')}' rejected β contains invalid trigger tokens"])
+ return valid
+
+ # -------------------------------------------------------------------------
+ # Event fetching
+
def get_new_app_events(self):
- """Get new unprocessed events from the AppEvents table."""
- result = self.db.sql.execute("""
- SELECT * FROM AppEvents
- WHERE appEventProcessed = 0
- ORDER BY dateTimeCreated ASC
- """).fetchall()
+ """Get new unprocessed events from the AppEvents table.
+ Resets _mutated_guids to start a fresh cascade-prevention window for this batch."""
+ self._mutated_guids.clear()
+
+ result = get_unprocessed(self.db)
mylog("none", [f"[WF] get_new_app_events - new events count: {len(result)}"])
return result
- def process_event(self, event):
- """Process the events. Check if events match a workflow trigger"""
+ # -------------------------------------------------------------------------
+ # Event processing
+ def process_event(self, event):
+ """Process one AppEvent against all enabled workflows."""
evGuid = event["guid"]
+ obj_guid = event["objectGuid"]
+
+ # Cascade prevention: skip events for devices already mutated this batch
+ if obj_guid in self._mutated_guids:
+ mylog("debug", [f"[WF] Skipping event {evGuid} β device {obj_guid} was mutated by a workflow in this batch"])
+ mark_processed(self.db, event["index"])
+ return
mylog("verbose", [f"[WF] Processing event with GUID {evGuid}"])
- # Check if the trigger conditions match
for workflow in self.workflows:
- # Ensure workflow is enabled before proceeding
if workflow.get("enabled", "No").lower() == "yes":
wfName = workflow["name"]
mylog("debug", f"[WF] Checking if '{evGuid}' triggers the workflow '{wfName}'")
- # construct trigger object which also evaluates if the current event triggers it
trigger = Trigger(workflow["trigger"], event, self.db)
if trigger.triggered:
mylog("verbose", f"[WF] Event with GUID '{evGuid}' triggered the workflow '{wfName}'")
-
self.execute_workflow(workflow, trigger)
- # After processing the event, mark the event as processed (set AppEventProcessed to 1)
- self.db.sql.execute(
- """
- UPDATE AppEvents
- SET appEventProcessed = 1
- WHERE "index" = ?
- """,
- (event["index"],),
- ) # Pass the event's unique identifier
- self.db.commitDB()
+ mark_processed(self.db, event["index"])
- def execute_workflow(self, workflow, trigger):
- """Execute the actions in the given workflow if conditions are met."""
+ # -------------------------------------------------------------------------
+ # Workflow execution
+ def execute_workflow(self, workflow, trigger):
+ """Execute workflow actions if any condition group evaluates to True."""
wfName = workflow["name"]
- # Ensure conditions exist
if not isinstance(workflow.get("conditions"), list):
m = "[WF] workflow['conditions'] must be a list"
mylog("none", [m])
raise ValueError(m)
- # Evaluate each condition group separately
for condition_group in workflow["conditions"]:
evaluator = ConditionGroup(condition_group)
-
- if evaluator.evaluate(trigger): # If any group evaluates to True
+ if evaluator.evaluate(trigger):
mylog("none", f"[WF] Workflow {wfName} will be executed - conditions were evaluated as TRUE")
mylog("debug", [f"[WF] Workflow condition_group: {condition_group}"])
-
self.execute_actions(workflow["actions"], trigger)
- return # Stop if a condition group succeeds
+ return
mylog("none", ["[WF] No condition group matched. Actions not executed."])
+ def _resolve_target_devices(self, action, trigger_device):
+ """Return the list of device dicts that the action should be applied to.
+
+ - No ``target`` key or ``strategy == "triggering_device"`` β legacy behaviour,
+ targets only the device that raised the event.
+ - ``strategy == "query"`` β query the Devices table using the action's
+ nested conditions (with {{trigger.X}} tokens already interpolated).
+ """
+ target_block = action.get("target", {})
+ strategy = target_block.get("strategy", "triggering_device")
+
+ if strategy == "triggering_device":
+ return [trigger_device] if trigger_device is not None else []
+
+ if strategy == "query":
+ raw_conditions = target_block.get("conditions", [])
+ compiled_conditions = []
+ for cond in raw_conditions:
+ compiled = dict(cond)
+ compiled["value"] = interpolate_tokens(cond["value"], trigger_device or {})
+ compiled_conditions.append(compiled)
+ return DeviceInstance().queryByConditions(compiled_conditions)
+
+ mylog("none", [f"[WF] Unknown target strategy '{strategy}' β skipping action"])
+ return []
+
def execute_actions(self, actions, trigger):
- """Execute the actions defined in a workflow."""
+ """Execute all actions defined in a workflow against their resolved targets."""
+ # Normalise trigger object to a plain dict for token operations
+ trigger_obj = trigger.object
+ if isinstance(trigger_obj, sqlite3.Row):
+ trigger_obj = dict(trigger_obj)
for action in actions:
- if action["type"] == "update_field":
- field = action["field"]
- value = action["value"]
- action_instance = UpdateFieldAction(self.db, field, value, trigger)
- # indicate if the api has to be updated
- self.update_api = True
-
- elif action["type"] == "run_plugin":
- plugin_name = action["plugin"]
- params = action["params"]
- action_instance = RunPluginAction(self.db, plugin_name, params, trigger)
-
- elif action["type"] == "delete_device":
- action_instance = DeleteObjectAction(self.db, trigger)
-
- # elif action["type"] == "send_notification":
- # method = action["method"]
- # message = action["message"]
- # action_instance = SendNotificationAction(method, message, trigger)
+ action_type = action["type"]
- else:
- m = f"[WF] Unsupported action type: {action['type']}"
- mylog("none", [m])
- raise ValueError(m)
-
- action_instance.execute() # Execute the action
-
- # if result:
- # # Iterate through actions and execute them
- # for action in workflow["actions"]:
- # if action["type"] == "update_field":
- # # Action type is "update_field", so map to UpdateFieldAction
- # field = action["field"]
- # value = action["value"]
- # action_instance = UpdateFieldAction(field, value)
- # action_instance.execute(trigger.event)
-
- # elif action["type"] == "run_plugin":
- # # Action type is "run_plugin", so map to RunPluginAction
- # plugin_name = action["plugin"]
- # params = action["params"]
- # action_instance = RunPluginAction(plugin_name, params)
- # action_instance.execute(trigger.event)
- # elif action["type"] == "send_notification":
- # # Action type is "send_notification", so map to SendNotificationAction
- # method = action["method"]
- # message = action["message"]
- # action_instance = SendNotificationAction(method, message)
- # action_instance.execute(trigger.event)
- # else:
- # # Handle unsupported action types
- # raise ValueError(f"Unsupported action type: {action['type']}")
+ # run_plugin does not support query targeting β always uses the trigger context
+ if action_type == "run_plugin":
+ RunPluginAction(self.db, action["plugin"], action["params"], trigger).execute()
+ continue
+
+ target_devices = self._resolve_target_devices(action, trigger_obj)
+
+ if not target_devices:
+ mylog("debug", [f"[WF] No target devices matched for action '{action_type}'"])
+ continue
+
+ for target_device in target_devices:
+ if action_type == "update_field":
+ action_instance = UpdateFieldAction(
+ self.db, action["field"], action["value"], trigger, target_device
+ )
+ self.update_api = True
+
+ elif action_type == "delete_device":
+ action_instance = DeleteObjectAction(self.db, trigger, target_device)
+
+ else:
+ m = f"[WF] Unsupported action type: {action_type}"
+ mylog("none", [m])
+ raise ValueError(m)
+
+ action_instance.execute()
+
+ # Record this device's GUID so cascade events are suppressed in this batch
+ if isinstance(target_device, dict) and target_device.get("devGUID"):
+ self._mutated_guids.add(target_device["devGUID"])
diff --git a/test/backend/test_workflows.py b/test/backend/test_workflows.py
new file mode 100644
index 000000000..5a39d78af
--- /dev/null
+++ b/test/backend/test_workflows.py
@@ -0,0 +1,403 @@
+"""
+Unit tests for Workflow Engine v2 β cross-device targeting.
+
+Covers:
+ - interpolate_tokens()
+ - WorkflowManager.VALID_DEVICE_COLUMNS token validation
+ - WorkflowManager._validate_workflow_tokens()
+ - WorkflowManager.load_workflows() rejects invalid-token workflows
+ - DeviceInstance.queryByConditions()
+ - UpdateFieldAction boolean column casting
+ - UpdateFieldAction _archive_conflicting_mac guard
+ - WorkflowManager._mutated_guids cascade prevention
+"""
+
+import sys
+import os
+import json
+import tempfile
+import unittest
+from unittest.mock import patch, MagicMock
+
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "server"))
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
+
+from db_test_helpers import make_db, make_device_dict, insert_device_from_dict
+
+
+# ---------------------------------------------------------------------------
+# Shared test helpers
+# ---------------------------------------------------------------------------
+
+def _make_app_event(guid="evt-guid-1", obj_guid="dev-guid-1", obj_type="Devices",
+ event_type="update", index=1):
+ """Return a dict mimicking an AppEvents sqlite3.Row."""
+ return {
+ "guid": guid,
+ "objectGuid": obj_guid,
+ "objectType": obj_type,
+ "appEventType": event_type,
+ "appEventProcessed": 0,
+ "index": index,
+ }
+
+
+def make_stub_manager():
+ """Return a WorkflowManager with a mock DB and no workflows loaded."""
+ from workflows.manager import WorkflowManager
+ db = MagicMock()
+ db.sql = MagicMock()
+ db.sql.execute.return_value.fetchall.return_value = []
+ db.commitDB = MagicMock()
+ with patch.object(WorkflowManager, "load_workflows", return_value=[]):
+ mgr = WorkflowManager(db)
+ return mgr
+
+
+# ---------------------------------------------------------------------------
+# interpolate_tokens
+# ---------------------------------------------------------------------------
+
+class TestInterpolateTokens(unittest.TestCase):
+
+ def setUp(self):
+ from workflows.actions import interpolate_tokens
+ self.interpolate = interpolate_tokens
+
+ def test_replaces_known_token(self):
+ device = {"devLastIP": "10.0.0.5", "devMac": "aa:bb:cc:dd:ee:ff"}
+ result = self.interpolate("{{trigger.devLastIP}}", device)
+ self.assertEqual(result, "10.0.0.5")
+
+ def test_replaces_multiple_tokens(self):
+ device = {"devLastIP": "10.0.0.5", "devMac": "aa:bb:cc:dd:ee:ff"}
+ result = self.interpolate("ip={{trigger.devLastIP}} mac={{trigger.devMac}}", device)
+ self.assertEqual(result, "ip=10.0.0.5 mac=aa:bb:cc:dd:ee:ff")
+
+ def test_leaves_unknown_token_unchanged(self):
+ device = {"devLastIP": "10.0.0.5"}
+ result = self.interpolate("{{trigger.doesNotExist}}", device)
+ self.assertEqual(result, "{{trigger.doesNotExist}}")
+
+ def test_non_string_value_returned_as_is(self):
+ device = {}
+ self.assertEqual(self.interpolate(42, device), 42)
+ self.assertIsNone(self.interpolate(None, device))
+
+ def test_empty_device_dict_leaves_token_unchanged(self):
+ result = self.interpolate("{{trigger.devMac}}", {})
+ self.assertEqual(result, "{{trigger.devMac}}")
+
+
+# ---------------------------------------------------------------------------
+# Token validation
+# ---------------------------------------------------------------------------
+
+class TestValidateWorkflowTokens(unittest.TestCase):
+
+ def test_valid_token_passes(self):
+ mgr = make_stub_manager()
+ wf = {"name": "test", "actions": [{"value": "{{trigger.devLastIP}}"}]}
+ self.assertTrue(mgr._validate_workflow_tokens(wf))
+
+ def test_invalid_token_fails(self):
+ mgr = make_stub_manager()
+ wf = {"name": "test", "actions": [{"value": "{{trigger.ip_address}}"}]}
+ self.assertFalse(mgr._validate_workflow_tokens(wf))
+
+ def test_nested_invalid_token_fails(self):
+ mgr = make_stub_manager()
+ wf = {
+ "name": "test",
+ "actions": [{
+ "target": {
+ "conditions": [{"value": "{{trigger.bad_field}}"}]
+ }
+ }]
+ }
+ self.assertFalse(mgr._validate_workflow_tokens(wf))
+
+ def test_no_tokens_passes(self):
+ mgr = make_stub_manager()
+ wf = {"name": "test", "conditions": [], "actions": [{"value": "static"}]}
+ self.assertTrue(mgr._validate_workflow_tokens(wf))
+
+
+class TestLoadWorkflowsRejectsInvalidTokens(unittest.TestCase):
+
+ def _make_manager_loading(self, raw_workflows):
+ """Build a WorkflowManager whose load_workflows() reads from a temp file."""
+ import workflows.manager as wf_mod
+ from workflows.manager import WorkflowManager
+
+ with tempfile.TemporaryDirectory() as tmpdir:
+ wf_path = os.path.join(tmpdir, "workflows.json")
+ with open(wf_path, "w") as f:
+ json.dump(raw_workflows, f)
+
+ orig = wf_mod.fullConfFolder
+ wf_mod.fullConfFolder = tmpdir
+ try:
+ db = MagicMock()
+ with patch.object(WorkflowManager, "load_workflows", return_value=[]):
+ mgr = WorkflowManager(db)
+ mgr.workflows = mgr.load_workflows()
+ finally:
+ wf_mod.fullConfFolder = orig
+ return mgr
+
+ def test_valid_workflow_loaded(self):
+ wf = {
+ "name": "Valid WF", "enabled": "Yes",
+ "trigger": {"object_type": "Devices", "event_type": "update"},
+ "conditions": [],
+ "actions": [{"type": "update_field", "field": "devIsNew",
+ "value": "{{trigger.devLastIP}}"}]
+ }
+ mgr = self._make_manager_loading([wf])
+ self.assertEqual(len(mgr.workflows), 1)
+
+ def test_invalid_token_workflow_rejected(self):
+ wf = {
+ "name": "Bad WF", "enabled": "Yes",
+ "trigger": {"object_type": "Devices", "event_type": "update"},
+ "conditions": [],
+ "actions": [{"type": "update_field", "field": "devIsNew",
+ "value": "{{trigger.nonexistent_field}}"}]
+ }
+ mgr = self._make_manager_loading([wf])
+ self.assertEqual(len(mgr.workflows), 0)
+
+
+# ---------------------------------------------------------------------------
+# DeviceInstance.queryByConditions
+# ---------------------------------------------------------------------------
+
+class TestQueryByConditions(unittest.TestCase):
+
+ def setUp(self):
+ self.conn = make_db()
+ dev_a = make_device_dict("aa:bb:cc:dd:ee:01", devLastIP="192.168.1.10",
+ devGUID="guid-a", devIsArchived=0)
+ dev_b = make_device_dict("aa:bb:cc:dd:ee:02", devLastIP="192.168.1.10",
+ devGUID="guid-b", devIsArchived=0)
+ dev_c = make_device_dict("aa:bb:cc:dd:ee:03", devLastIP="192.168.1.20",
+ devGUID="guid-c", devIsArchived=0)
+ for d in [dev_a, dev_b, dev_c]:
+ insert_device_from_dict(self.conn, d)
+
+ def _instance(self):
+ from models.device_instance import DeviceInstance
+ inst = DeviceInstance()
+ # Patch _fetchall to use our in-memory connection
+ def _fetchall(q, p=()):
+ rows = self.conn.execute(q, p).fetchall()
+ return [dict(r) for r in rows]
+ inst._fetchall = _fetchall
+ return inst
+
+ def test_equals_returns_matching_devices(self):
+ inst = self._instance()
+ results = inst.queryByConditions([
+ {"field": "devLastIP", "operator": "equals", "value": "192.168.1.10"}
+ ])
+ macs = {r["devMac"] for r in results}
+ self.assertIn("aa:bb:cc:dd:ee:01", macs)
+ self.assertIn("aa:bb:cc:dd:ee:02", macs)
+ self.assertNotIn("aa:bb:cc:dd:ee:03", macs)
+
+ def test_multiple_conditions_and_logic(self):
+ inst = self._instance()
+ results = inst.queryByConditions([
+ {"field": "devLastIP", "operator": "equals", "value": "192.168.1.10"},
+ {"field": "devMac", "operator": "equals", "value": "aa:bb:cc:dd:ee:01"},
+ ])
+ self.assertEqual(len(results), 1)
+ self.assertEqual(results[0]["devMac"], "aa:bb:cc:dd:ee:01")
+
+ def test_contains_operator(self):
+ inst = self._instance()
+ results = inst.queryByConditions([
+ {"field": "devLastIP", "operator": "contains", "value": "192.168.1"}
+ ])
+ self.assertEqual(len(results), 3)
+
+ def test_empty_conditions_returns_empty(self):
+ inst = self._instance()
+ results = inst.queryByConditions([])
+ self.assertEqual(results, [])
+
+ def test_unknown_field_skipped_returns_empty(self):
+ inst = self._instance()
+ results = inst.queryByConditions([
+ {"field": "nonexistent_column", "operator": "equals", "value": "x"}
+ ])
+ self.assertEqual(results, [])
+
+ def test_unknown_operator_skipped_returns_empty(self):
+ inst = self._instance()
+ results = inst.queryByConditions([
+ {"field": "devLastIP", "operator": "regex", "value": ".*"}
+ ])
+ self.assertEqual(results, [])
+
+
+# ---------------------------------------------------------------------------
+# UpdateFieldAction β boolean cast
+# ---------------------------------------------------------------------------
+
+class TestUpdateFieldActionBooleanCast(unittest.TestCase):
+
+ def setUp(self):
+ self.conn = make_db()
+ dev = make_device_dict("aa:bb:cc:dd:ee:ff", devGUID="guid-1", devIsArchived=0)
+ insert_device_from_dict(self.conn, dev)
+
+ def _make_action(self, field, value, target_device):
+ from workflows.actions import UpdateFieldAction
+ trigger = MagicMock()
+ trigger.object = None
+ trigger.object_type = "Devices"
+ db = MagicMock()
+
+ action = UpdateFieldAction(db, field, value, trigger, target_device)
+
+ # Patch DeviceInstance.updateField to capture what value is written
+ self.written_value = None
+ def fake_update(guid, f, v):
+ self.written_value = v
+ with patch("workflows.actions.DeviceInstance") as MockDI:
+ MockDI.return_value.updateField.side_effect = fake_update
+ action.execute()
+
+ return self.written_value
+
+ def test_string_one_cast_to_int_for_boolean_column(self):
+ target = {"devGUID": "guid-1", "devIsArchived": 0}
+ written = self._make_action("devIsArchived", "1", target)
+ self.assertEqual(written, 1)
+ self.assertIsInstance(written, int)
+
+ def test_string_zero_cast_to_int_for_boolean_column(self):
+ target = {"devGUID": "guid-1", "devIsArchived": 1}
+ written = self._make_action("devIsArchived", "0", target)
+ self.assertEqual(written, 0)
+ self.assertIsInstance(written, int)
+
+ def test_non_boolean_column_not_cast(self):
+ target = {"devGUID": "guid-1", "devName": "OldName"}
+ written = self._make_action("devName", "NewName", target)
+ self.assertEqual(written, "NewName")
+ self.assertIsInstance(written, str)
+
+ def test_invalid_boolean_value_skips_update(self):
+ target = {"devGUID": "guid-1", "devIsArchived": 0}
+ written = self._make_action("devIsArchived", "not_an_int", target)
+ self.assertIsNone(written)
+
+
+# ---------------------------------------------------------------------------
+# UpdateFieldAction β devMac conflict archive guard
+# ---------------------------------------------------------------------------
+
+class TestUpdateFieldActionMacGuard(unittest.TestCase):
+
+ def test_conflicting_mac_device_archived(self):
+ from workflows.actions import UpdateFieldAction
+ trigger = MagicMock()
+ trigger.object = None
+ db = MagicMock()
+
+ conflicting = {"devGUID": "guid-conflict", "devMac": "aa:bb:cc:dd:ee:ff"}
+ current_guid = "guid-current"
+ target_device = {"devGUID": current_guid, "devMac": "11:22:33:44:55:66"}
+
+ action = UpdateFieldAction(db, "devMac", "aa:bb:cc:dd:ee:ff", trigger, target_device)
+
+ archived_guid = None
+ def fake_update(guid, field, value):
+ nonlocal archived_guid
+ if field == "devIsArchived":
+ archived_guid = guid
+
+ with patch("workflows.actions.DeviceInstance") as MockDI:
+ MockDI.return_value.getByMac.return_value = conflicting
+ MockDI.return_value.updateField.side_effect = fake_update
+ action.execute()
+
+ self.assertEqual(archived_guid, "guid-conflict")
+
+ def test_no_conflicting_mac_no_archive(self):
+ from workflows.actions import UpdateFieldAction
+ trigger = MagicMock()
+ trigger.object = None
+ db = MagicMock()
+
+ target_device = {"devGUID": "guid-current", "devMac": "11:22:33:44:55:66"}
+ action = UpdateFieldAction(db, "devMac", "aa:bb:cc:dd:ee:ff", trigger, target_device)
+
+ archived_guid = None
+ def fake_update(guid, field, value):
+ nonlocal archived_guid
+ if field == "devIsArchived":
+ archived_guid = guid
+
+ with patch("workflows.actions.DeviceInstance") as MockDI:
+ MockDI.return_value.getByMac.return_value = None
+ MockDI.return_value.updateField.side_effect = fake_update
+ action.execute()
+
+ self.assertIsNone(archived_guid)
+
+
+# ---------------------------------------------------------------------------
+# Cascade prevention β _mutated_guids
+# ---------------------------------------------------------------------------
+
+class TestCascadePrevention(unittest.TestCase):
+
+ def test_mutated_guid_blocks_event(self):
+ mgr = make_stub_manager()
+ mgr._mutated_guids.add("dev-guid-42")
+
+ event = _make_app_event(guid="evt-1", obj_guid="dev-guid-42")
+ # Make event dict-accessible
+ event = MagicMock()
+ event.__getitem__ = lambda s, k: {"guid": "evt-1", "objectGuid": "dev-guid-42",
+ "index": 1}[k]
+
+ # process_event should skip without calling execute_workflow
+ with patch.object(mgr, "execute_workflow") as mock_exec:
+ mgr.process_event(event)
+ mock_exec.assert_not_called()
+
+ def test_get_new_app_events_clears_mutated_guids(self):
+ mgr = make_stub_manager()
+ mgr._mutated_guids.add("some-guid")
+
+ mgr.db.sql.execute.return_value.fetchall.return_value = []
+ mgr.get_new_app_events()
+
+ self.assertEqual(len(mgr._mutated_guids), 0)
+
+ def test_execute_actions_adds_to_mutated_guids(self):
+ mgr = make_stub_manager()
+
+ target_device = {"devGUID": "guid-mutated", "devIsArchived": 0}
+
+ actions = [{"type": "update_field", "field": "devIsArchived", "value": "1"}]
+
+ trigger = MagicMock()
+ trigger.object = None
+
+ with patch("workflows.manager.DeviceInstance"), \
+ patch("workflows.actions.DeviceInstance") as MockDI:
+ MockDI.return_value.updateField = MagicMock()
+ with patch.object(mgr, "_resolve_target_devices", return_value=[target_device]):
+ mgr.execute_actions(actions, trigger)
+
+ self.assertIn("guid-mutated", mgr._mutated_guids)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/test/db_test_helpers.py b/test/db_test_helpers.py
index c61d2ec94..afb941dbe 100644
--- a/test/db_test_helpers.py
+++ b/test/db_test_helpers.py
@@ -377,6 +377,28 @@ def down_event_macs(cur) -> set:
return {r["eveMac"].lower() for r in cur.fetchall()}
+def insert_device_from_dict(conn: sqlite3.Connection, device: dict) -> None:
+ """Insert a device dict (as produced by make_device_dict) into Devices.
+
+ Uses INSERT OR IGNORE so duplicate MACs are silently skipped. Accepts any
+ subset of Devices columns β only keys present in the table are written.
+ """
+ cur = conn.cursor()
+ cur.execute("PRAGMA table_info(Devices)")
+ db_columns = {row[1] for row in cur.fetchall()}
+
+ cols = [k for k in device.keys() if k in db_columns]
+ placeholders = ", ".join("?" for _ in cols)
+ col_list = ", ".join(cols)
+ values = [device[c] for c in cols]
+
+ cur.execute(
+ f"INSERT OR IGNORE INTO Devices ({col_list}) VALUES ({placeholders})",
+ values,
+ )
+ conn.commit()
+
+
# ---------------------------------------------------------------------------
# DummyDB β minimal wrapper used by scan.session_events helpers
# ---------------------------------------------------------------------------