Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
commonfields:
id: AppendADContainmentDescriptionNote
version: -1
name: AppendADContainmentDescriptionNote
script: "import traceback\nfrom datetime import datetime\nfrom typing import Any\n\n\nOUTPUT_PREFIX = \"CompromisedUser.ADDescriptionUpdate\"\
\nSEPARATOR = \" | \"\n\n\ndef str_arg(args: dict[str, Any], name: str, default: str = \"\") -> str:\n value = args.get(name,\
\ default)\n if value is None:\n return default\n return str(value).strip()\n\n\ndef bool_arg(args: dict[str,\
\ Any], name: str, default: bool = False) -> bool:\n value = args.get(name)\n if value is None:\n return default\n\
\ if isinstance(value, bool):\n return value\n return str(value).strip().lower() in (\"true\", \"yes\", \"\
y\", \"1\")\n\n\ndef parse_args(args: dict[str, Any]) -> dict[str, Any]:\n user_identifier = str_arg(args, \"user_identifier\"\
)\n containment_reason = str_arg(args, \"containment_reason\")\n\n if not user_identifier:\n raise DemistoException(\"\
user_identifier is required and cannot be blank.\")\n if not containment_reason:\n raise DemistoException(\"containment_reason\
\ is required and cannot be blank.\")\n\n return {\n \"user_identifier\": user_identifier,\n \"existing_description\"\
: str_arg(args, \"existing_description\"),\n \"containment_reason\": containment_reason,\n \"case_id\": str_arg(args,\
\ \"case_id\"),\n \"analyst\": str_arg(args, \"analyst\"),\n \"dry_run\": bool_arg(args, \"dry_run\", False),\n\
\ \"prevent_duplicate_note\": bool_arg(args, \"prevent_duplicate_note\", True),\n }\n\n\ndef build_note(containment_reason:\
\ str, case_id: str, analyst: str) -> str:\n timestamp = datetime.utcnow().strftime(\"%Y-%m-%d %H:%M UTC\")\n note\
\ = f\"XSIAM containment {timestamp}: Account marked compromised. Reason: {containment_reason}.\"\n if case_id:\n \
\ note += f\" Case: {case_id}.\"\n if analyst:\n note += f\" Analyst: {analyst}.\"\n return note\n\n\n\
def is_duplicate(existing_description: str, note: str, case_id: str) -> bool:\n if not existing_description:\n \
\ return False\n\n if note in existing_description:\n return True\n\n if case_id and f\"Case: {case_id}.\"\
\ in existing_description and \"XSIAM containment\" in existing_description:\n return True\n\n return False\n\n\
\ndef append_description(existing_description: str, note: str) -> str:\n if existing_description:\n return f\"\
{existing_description}{SEPARATOR}{note}\"\n return note\n\n\ndef update_ad_description(user_identifier: str, new_description:\
\ str) -> str:\n result = demisto.executeCommand(\n \"ad-update-user\",\n {\n \"username\":\
\ user_identifier,\n \"attribute-name\": \"description\",\n \"attribute-value\": new_description,\n\
\ },\n )\n\n if is_error(result):\n raise DemistoException(get_error(result))\n\n return \"Updated\"\
\n\n\ndef main():\n try:\n args = parse_args(demisto.args())\n\n note = build_note(\n args[\"\
containment_reason\"],\n args[\"case_id\"],\n args[\"analyst\"],\n )\n\n duplicate_prevented\
\ = False\n if args[\"prevent_duplicate_note\"] and is_duplicate(args[\"existing_description\"], note, args[\"case_id\"\
]):\n new_description = args[\"existing_description\"]\n duplicate_prevented = True\n update_status\
\ = \"Skipped - duplicate containment note detected\"\n else:\n new_description = append_description(args[\"\
existing_description\"], note)\n update_status = \"Dry run - AD not updated\" if args[\"dry_run\"] else update_ad_description(\n\
\ args[\"user_identifier\"],\n new_description,\n )\n\n outputs = {\n \
\ \"UserIdentifier\": args[\"user_identifier\"],\n \"OldDescription\": args[\"existing_description\"\
],\n \"AppendedNote\": \"\" if duplicate_prevented else note,\n \"NewDescription\": new_description,\n\
\ \"DryRun\": args[\"dry_run\"],\n \"PreventDuplicateNote\": args[\"prevent_duplicate_note\"],\n \
\ \"DuplicatePrevented\": duplicate_prevented,\n \"UpdateStatus\": update_status,\n \"CaseID\"\
: args[\"case_id\"],\n \"Analyst\": args[\"analyst\"],\n }\n\n readable = tableToMarkdown(\n \
\ \"AD Description Containment Note Update\",\n outputs,\n headers=[\n \"\
UserIdentifier\",\n \"OldDescription\",\n \"AppendedNote\",\n \"NewDescription\"\
,\n \"DryRun\",\n \"DuplicatePrevented\",\n \"UpdateStatus\",\n \
\ ],\n )\n\n return_results(CommandResults(\n readable_output=readable,\n outputs_prefix=OUTPUT_PREFIX,\n\
\ outputs_key_field=\"UserIdentifier\",\n outputs=outputs,\n ))\n\n except Exception as\
\ ex:\n demisto.error(traceback.format_exc())\n return_error(f\"Failed to append AD containment note. Error:\
\ {ex}\")\n\n\nif __name__ in (\"__main__\", \"__builtin__\", \"builtins\"):\n main()"
type: python
tags:
- active directory
- containment
- compromised user
comment: Appends a timestamped XSIAM containment note to an Active Directory user's description attribute using ad-update-user.
enabled: true
args:
- supportedModules: []
name: user_identifier
required: true
description: AD user identifier. Passed to ad-update-user as username; use sAMAccountName when required by the AD integration.
type: String
- supportedModules: []
name: existing_description
description: Current AD description value, usually from ad-get-user.
type: String
- supportedModules: []
name: containment_reason
required: true
description: Reason the account is being contained.
type: String
- supportedModules: []
name: case_id
description: XSIAM incident or case ID to include in the containment note.
type: String
- supportedModules: []
name: analyst
description: Analyst name or username to include in the containment note.
type: String
- supportedModules: []
name: dry_run
description: When true, do not update AD. Return only the planned description.
defaultValue: 'true'
type: Boolean
- supportedModules: []
name: prevent_duplicate_note
description: Avoid appending the same case ID or containment note more than once.
defaultValue: 'true'
type: Boolean
outputs:
- contextPath: CompromisedUser.ADDescriptionUpdate.UserIdentifier
description: AD user identifier supplied to the automation.
type: string
- contextPath: CompromisedUser.ADDescriptionUpdate.OldDescription
description: Original AD description value.
type: string
- contextPath: CompromisedUser.ADDescriptionUpdate.AppendedNote
description: Containment note appended to the AD description.
type: string
- contextPath: CompromisedUser.ADDescriptionUpdate.NewDescription
description: Final AD description value or planned value during dry run.
type: string
- contextPath: CompromisedUser.ADDescriptionUpdate.DryRun
description: Whether dry-run mode was used.
type: boolean
- contextPath: CompromisedUser.ADDescriptionUpdate.PreventDuplicateNote
description: Whether duplicate prevention was enabled.
type: boolean
- contextPath: CompromisedUser.ADDescriptionUpdate.DuplicatePrevented
description: Whether an existing containment note prevented a duplicate append.
type: boolean
- contextPath: CompromisedUser.ADDescriptionUpdate.UpdateStatus
description: AD update status.
type: string
- contextPath: CompromisedUser.ADDescriptionUpdate.CaseID
description: Case ID included in the note.
type: string
- contextPath: CompromisedUser.ADDescriptionUpdate.Analyst
description: Analyst included in the note.
type: string
scripttarget: 0
subtype: python3
runonce: false
dockerimage: demisto/python3:3.11.10.113941
runas: DBotWeakRole
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
commonfields:
id: BuildCompromisedUserContainmentVerdict
version: -1
name: BuildCompromisedUserContainmentVerdict
script: "import json\nimport traceback\nfrom typing import Any\n\n\nOUTPUT_PREFIX = \"CompromisedUserContainment.Verdict\"\
\n\nSUCCESS_VALUES = {\"success\", \"succeeded\", \"successful\", \"contained\", \"completed\", \"done\", \"true\"}\nFAILED_VALUES\
\ = {\"failed\", \"failure\", \"error\", \"critical_failed\", \"false\"}\nSKIPPED_VALUES = {\"skipped\", \"not_found\",\
\ \"no_account\", \"not_applicable\", \"n/a\", \"none\"}\nUNKNOWN_VALUES = {\"unknown\", \"missing\", \"pending\", \"\"\
}\n\n\ndef parse_json_arg(value: Any) -> Any:\n if value in (None, \"\", \"null\"):\n return {}\n if isinstance(value,\
\ (dict, list)):\n return value\n if isinstance(value, str):\n try:\n return json.loads(value)\n\
\ except Exception:\n return {\"status\": value}\n return {\"status\": str(value)}\n\n\ndef normalize_status(value:\
\ Any) -> str:\n if isinstance(value, bool):\n return \"success\" if value else \"failed\"\n text = str(value\
\ or \"\").strip().lower().replace(\" \", \"_\")\n if text in SUCCESS_VALUES:\n return \"success\"\n if text\
\ in FAILED_VALUES:\n return \"failed\"\n if text in SKIPPED_VALUES:\n return \"skipped\"\n if text\
\ in UNKNOWN_VALUES:\n return \"unknown\"\n return \"unknown\"\n\n\ndef extract_status(obj: Any) -> str:\n \
\ if not isinstance(obj, dict):\n return normalize_status(obj)\n\n for key in (\"status\", \"containment_status\"\
, \"result\", \"state\"):\n if key in obj:\n return normalize_status(obj.get(key))\n\n for key in (\"\
success\", \"succeeded\", \"contained\"):\n if key in obj:\n return normalize_status(obj.get(key))\n\n\
\ if obj.get(\"skipped\") is True or obj.get(\"account_found\") is False or obj.get(\"exists\") is False:\n return\
\ \"skipped\"\n\n return \"unknown\"\n\n\ndef action_name(default_name: str, obj: Any) -> str:\n if isinstance(obj,\
\ dict):\n return str(obj.get(\"action\") or obj.get(\"name\") or obj.get(\"platform\") or default_name)\n return\
\ default_name\n\n\ndef is_critical(obj: Any) -> bool:\n if not isinstance(obj, dict):\n return True\n return\
\ bool(obj.get(\"critical\", True))\n\n\ndef collect_action(default_name: str, obj: Any) -> dict[str, Any]:\n return\
\ {\n \"name\": action_name(default_name, obj),\n \"status\": extract_status(obj),\n \"critical\":\
\ is_critical(obj),\n }\n\n\ndef collect_actions(default_name: str, obj: Any) -> list[dict[str, Any]]:\n if isinstance(obj,\
\ list):\n return [collect_action(default_name, item) for item in obj]\n if isinstance(obj, dict):\n for\
\ key in (\"actions\", \"results\", \"statuses\"):\n if isinstance(obj.get(key), list):\n return\
\ [collect_action(default_name, item) for item in obj[key]]\n return [collect_action(default_name, obj)]\n\n\ndef build_verdict(ad_obj:\
\ Any, entra_obj: Any, action_obj: Any) -> dict[str, Any]:\n identity_actions = [\n collect_action(\"Active Directory\
\ containment\", ad_obj),\n collect_action(\"Entra ID containment\", entra_obj),\n ]\n\n action_statuses =\
\ collect_actions(\"Containment action\", action_obj) if action_obj else []\n all_actions = identity_actions + action_statuses\n\
\n successful_actions = [a[\"name\"] for a in all_actions if a[\"status\"] == \"success\"]\n failed_actions = [a[\"\
name\"] for a in all_actions if a[\"status\"] == \"failed\"]\n skipped_actions = [a[\"name\"] for a in all_actions if\
\ a[\"status\"] == \"skipped\"]\n unknown_actions = [a[\"name\"] for a in all_actions if a[\"status\"] == \"unknown\"\
]\n\n ad_status = identity_actions[0][\"status\"]\n entra_status = identity_actions[1][\"status\"]\n\n identity_success\
\ = ad_status == \"success\" or entra_status == \"success\"\n critical_failure = any(a[\"status\"] == \"failed\" and\
\ a.get(\"critical\", True) for a in all_actions)\n\n no_matching_account = (\n ad_status == \"skipped\"\n \
\ and entra_status == \"skipped\"\n and not failed_actions\n and not successful_actions\n )\n\n \
\ if no_matching_account:\n status = \"skipped\"\n containment_success = False\n elif identity_success\
\ and critical_failure:\n status = \"partial_success\"\n containment_success = False\n elif identity_success:\n\
\ status = \"success\"\n containment_success = True\n elif ad_status in (\"failed\", \"unknown\") and entra_status\
\ in (\"failed\", \"unknown\"):\n status = \"failed\"\n containment_success = False\n else:\n status\
\ = \"failed\"\n containment_success = False\n\n residual_items = []\n if failed_actions:\n residual_items.append(\"\
Failed actions: \" + \", \".join(failed_actions))\n if skipped_actions:\n residual_items.append(\"Skipped actions:\
\ \" + \", \".join(skipped_actions))\n if unknown_actions:\n residual_items.append(\"Unknown actions: \" + \"\
, \".join(unknown_actions))\n\n residual_risk = (\n \"; \".join(residual_items)\n if residual_items\n \
\ else \"No residual containment risk identified from supplied action statuses.\"\n )\n\n return {\n \
\ \"containment_success\": containment_success,\n \"status\": status,\n \"residual_risk\": residual_risk,\n\
\ \"failed_actions\": failed_actions,\n \"successful_actions\": successful_actions,\n \"skipped_actions\"\
: skipped_actions,\n }\n\n\ndef main():\n try:\n args = demisto.args()\n\n ad_obj = parse_json_arg(args.get(\"\
ad_status_json\"))\n entra_obj = parse_json_arg(args.get(\"entra_status_json\"))\n action_obj = parse_json_arg(args.get(\"\
action_status_json\"))\n\n verdict = build_verdict(ad_obj, entra_obj, action_obj)\n\n readable = tableToMarkdown(\n\
\ \"Compromised User Containment Verdict\",\n [{\n \"Status\": verdict[\"status\"],\n\
\ \"Containment Success\": verdict[\"containment_success\"],\n \"Successful Actions\": \"\
, \".join(verdict[\"successful_actions\"]) or \"None\",\n \"Failed Actions\": \", \".join(verdict[\"failed_actions\"\
]) or \"None\",\n \"Skipped Actions\": \", \".join(verdict[\"skipped_actions\"]) or \"None\",\n \
\ \"Residual Risk\": verdict[\"residual_risk\"],\n }],\n )\n\n return_results(CommandResults(\n\
\ readable_output=readable,\n outputs_prefix=OUTPUT_PREFIX,\n outputs=verdict,\n \
\ ))\n\n except Exception as ex:\n demisto.error(traceback.format_exc())\n return_error(f\"Failed to build\
\ compromised user containment verdict. Error: {ex}\")\n\n\nif __name__ in (\"__main__\", \"__builtin__\", \"builtins\"\
):\n main()"
type: python
tags: []
comment: Combines AD, Entra ID, session, MFA, password, and note/action statuses into one final compromised user containment
verdict.
enabled: true
args:
- supportedModules: []
name: ad_status_json
description: Optional JSON or string containing Active Directory containment status.
- supportedModules: []
name: entra_status_json
description: Optional JSON or string containing Entra ID containment status.
- supportedModules: []
name: action_status_json
description: Optional JSON or string/list containing additional containment action statuses such as sessions, MFA, password
reset, or analyst notes.
outputs:
- contextPath: CompromisedUserContainment.Verdict.containment_success
description: True only when at least one identity platform was contained successfully and no critical containment action
failed.
type: boolean
- contextPath: CompromisedUserContainment.Verdict.status
description: Final containment verdict status. Possible values include success, partial_success, failed, or skipped.
type: string
- contextPath: CompromisedUserContainment.Verdict.residual_risk
description: Residual risk statement built from failed, skipped, or unknown actions.
type: string
- contextPath: CompromisedUserContainment.Verdict.failed_actions
description: List of failed containment actions.
type: unknown
- contextPath: CompromisedUserContainment.Verdict.successful_actions
description: List of successful containment actions.
type: unknown
- contextPath: CompromisedUserContainment.Verdict.skipped_actions
description: List of skipped containment actions.
type: unknown
scripttarget: 0
subtype: python3
runonce: false
dockerimage: demisto/python3:3.10.13.84405
runas: DBotWeakRole
Loading