Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions backend/package/yuxi/services/agent_run_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ def _compact_stream_chunk(chunk: dict) -> dict:
"interrupt_info",
"source",
"agent_state",
"action_requests",
"review_configs",
)
if chunk.get(key) is not None and chunk.get(key) != ""
}
Expand Down
51 changes: 48 additions & 3 deletions backend/package/yuxi/services/chat_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,45 @@ def _build_ask_user_question_payload(info: Any, thread_id: str) -> dict[str, Any
}


def _is_human_approval_payload(payload: dict) -> bool:
"""判断 interrupt 是否为 HumanInTheLoopMiddleware 的工具审批载荷。

HIL 中间件产生的 interrupt value 含 ``action_requests``(待审批的工具调用)
与 ``review_configs``(每个工具允许的决策类型),与 ask_user_question 的
``questions`` 结构不同。用 ``action_requests`` 作为判别依据。
"""
action_requests = payload.get("action_requests")
return isinstance(action_requests, list) and len(action_requests) > 0


def _build_human_approval_payload(info: Any, thread_id: str) -> dict[str, Any]:
"""将 HIL 工具审批 interrupt 标准化为 human_approval_required 载荷。"""
payload = _coerce_interrupt_payload(info)

action_requests = payload.get("action_requests") or []
review_configs = payload.get("review_configs") or []

# 为每个 action_request 补齐 description(供前端展示),保留原始字段
normalized_actions: list[dict[str, Any]] = []
for action in action_requests:
if not isinstance(action, dict):
continue
action = dict(action)
if not action.get("description"):
action["description"] = "操作需要确认\n\nTool: {name}\nArgs: {args}".format(
name=action.get("name", ""),
args=action.get("args", {}),
)
normalized_actions.append(action)

return {
"action_requests": normalized_actions,
"review_configs": review_configs,
"source": "human_approval",
"thread_id": thread_id,
}


def _ensure_full_msg(full_msg: AIMessage | None, accumulated_content: list[str]) -> AIMessage | None:
"""如果 full_msg 为空且有累积内容,构建 AIMessage"""
if not full_msg and accumulated_content:
Expand Down Expand Up @@ -673,9 +712,15 @@ async def check_and_handle_interrupts(

interrupt_info = _extract_interrupt_info(state)
if interrupt_info:
question_payload = _build_ask_user_question_payload(interrupt_info, thread_id)
meta["interrupt"] = question_payload
yield make_chunk(status="ask_user_question_required", meta=meta, **question_payload)
payload = _coerce_interrupt_payload(interrupt_info)
if _is_human_approval_payload(payload):
approval_payload = _build_human_approval_payload(interrupt_info, thread_id)
meta["interrupt"] = approval_payload
yield make_chunk(status="human_approval_required", meta=meta, **approval_payload)
else:
question_payload = _build_ask_user_question_payload(interrupt_info, thread_id)
meta["interrupt"] = question_payload
yield make_chunk(status="ask_user_question_required", meta=meta, **question_payload)

except Exception as e:
logger.exception(f"Error checking interrupts: {e}")
Expand Down
36 changes: 30 additions & 6 deletions backend/package/yuxi/services/run_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,35 @@ def _chunk_thread_id(chunk: dict, fallback: str | None) -> str | None:
return _thread_id_from_mapping(chunk) or fallback


def _interrupt_summary(chunk: dict) -> str:
"""从 interrupt chunk 提取人类可读的摘要。

兼容两类 interrupt 载荷:ask_user_question 的 ``questions`` 与
HumanInTheLoop 的 ``action_requests``(工具审批)。无可用信息时返回空串。
"""
if not isinstance(chunk, dict):
return ""

questions = chunk.get("questions")
if isinstance(questions, list) and questions:
first = questions[0]
if isinstance(first, dict):
return str(first.get("question") or "").strip()

action_requests = chunk.get("action_requests")
if isinstance(action_requests, list) and action_requests:
first = action_requests[0]
if isinstance(first, dict):
name = str(first.get("name") or "").strip()
args = first.get("args")
if name:
if args:
return f"操作需要确认: {name}({args})"
return f"操作需要确认: {name}"

return ""


def _map_chunk_to_run_event(chunk: dict) -> tuple[str, dict]:
status = chunk.get("status") or "event"
if status == "loading":
Expand Down Expand Up @@ -407,12 +436,7 @@ async def process_agent_run(ctx, run_id: str):
await _append_end_event(run_id, status_value, thread_id=thread_id, payload={"chunk": chunk})
terminal_set = True
elif status in {"ask_user_question_required", "human_approval_required"}:
questions = chunk.get("questions") if isinstance(chunk, dict) else None
first_question = ""
if isinstance(questions, list) and questions:
first = questions[0]
if isinstance(first, dict):
first_question = str(first.get("question") or "").strip()
first_question = _interrupt_summary(chunk)

await mark_run_terminal(
run_id,
Expand Down
43 changes: 43 additions & 0 deletions backend/test/unit/services/test_agent_run_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -1145,3 +1145,46 @@ async def set_input_message(self, run_id: str, message_id: int):
)

assert captured["input_payload"]["model_spec"] == "parent-model"


class TestCompactStreamChunkHil:
"""测试 _compact_stream_chunk 在 verbose=false 精简时保留 HIL 工具审批字段。

回归:修复前白名单漏掉 action_requests/review_configs,前端收到空弹窗。
"""

def test_compact_keeps_human_approval_action_requests(self):
chunk = {
"status": "human_approval_required",
"action_requests": [{"name": "delete_file", "args": {"path": "/x"}}],
"review_configs": [{"action_name": "delete_file", "allowed_decisions": ["approve", "reject"]}],
"source": "human_approval",
}
compact = agent_run_service._compact_stream_chunk(chunk)

assert compact["status"] == "human_approval_required"
assert compact["action_requests"] == chunk["action_requests"]
assert compact["review_configs"] == chunk["review_configs"]

def test_compact_keeps_ask_user_question_questions(self):
"""ask_user_question 的 questions 仍保留(未回归)。"""
chunk = {
"status": "ask_user_question_required",
"questions": [{"question": "选择一个", "options": ["A"]}],
}
compact = agent_run_service._compact_stream_chunk(chunk)

assert compact["questions"] == chunk["questions"]

def test_compact_drops_missing_fields(self):
"""白名单外的字段被丢弃;白名单内但未提供的字段不出现。"""
chunk = {
"status": "human_approval_required",
"extra_field": "should be dropped",
}
compact = agent_run_service._compact_stream_chunk(chunk)

assert compact["status"] == "human_approval_required"
assert "extra_field" not in compact
assert "action_requests" not in compact
assert "review_configs" not in compact
142 changes: 139 additions & 3 deletions backend/test/unit/services/test_chat_stream_interrupt.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,14 @@
from yuxi.services.chat_service import (
_normalize_interrupt_questions,
_build_ask_user_question_payload,
_build_human_approval_payload,
_is_human_approval_payload,
_coerce_interrupt_payload,
check_and_handle_interrupts,
stream_agent_resume,
)
from yuxi.services import chat_service as svc
from yuxi.services import run_worker
from yuxi.utils.question_utils import normalize_options


Expand Down Expand Up @@ -215,9 +219,12 @@ class FakeAgent:
context_schema = FakeContext

async def stream_resume_with_state(self, resume_command, input_context=None, **kwargs):
yield "messages", (
{"content": "child token", "id": "msg-child"},
{"namespace": ["task:1"], "thread_id": "child-thread"},
yield (
"messages",
(
{"content": "child token", "id": "msg-child"},
{"namespace": ["task:1"], "thread_id": "child-thread"},
),
)

async def get_graph(self, context=None):
Expand Down Expand Up @@ -290,3 +297,132 @@ def test_string_input(self):
def test_none_input(self):
result = _coerce_interrupt_payload(None)
assert isinstance(result, dict)


class TestHumanApprovalPayloadDetection:
"""测试 HIL 工具审批 interrupt 的判别与载荷构建。"""

def test_is_human_approval_payload_true_for_action_requests(self):
payload = {"action_requests": [{"name": "delete_file", "args": {"path": "/x"}}], "review_configs": []}
assert _is_human_approval_payload(payload) is True

def test_is_human_approval_payload_false_for_questions(self):
assert _is_human_approval_payload({"questions": [{"question": "Q"}]}) is False

def test_is_human_approval_payload_false_for_empty(self):
assert _is_human_approval_payload({}) is False
assert _is_human_approval_payload({"action_requests": []}) is False

def test_build_human_approval_payload_preserves_action_requests(self):
info = {
"action_requests": [{"name": "send_email", "args": {"to": "a@b.com"}, "description": "需要确认"}],
"review_configs": [{"action_name": "send_email", "allowed_decisions": ["approve", "reject"]}],
}
result = _build_human_approval_payload(info, "thread-hil-1")

assert result["source"] == "human_approval"
assert result["thread_id"] == "thread-hil-1"
assert len(result["action_requests"]) == 1
assert result["action_requests"][0]["name"] == "send_email"
assert result["action_requests"][0]["description"] == "需要确认"
assert result["review_configs"][0]["allowed_decisions"] == ["approve", "reject"]

def test_build_human_approval_payload_fills_missing_description(self):
"""HIL payload 缺 description 时补默认提示,便于前端展示。"""
info = {"action_requests": [{"name": "delete_file", "args": {"path": "/tmp/x"}}], "review_configs": []}
result = _build_human_approval_payload(info, "thread-hil-2")

desc = result["action_requests"][0]["description"]
assert "delete_file" in desc
assert "/tmp/x" in desc


class TestCheckAndHandleInterruptsRouting:
"""测试 check_and_handle_interrupts 按载荷类型分流到 ask_user_question / human_approval。"""

@pytest.mark.asyncio
async def test_routes_human_approval_interrupt_to_human_approval_required(self):
"""HIL interrupt(含 action_requests)应发 human_approval_required,不再塞默认空问题。"""
hil_value = {
"action_requests": [{"name": "delete_file", "args": {"path": "/x"}}],
"review_configs": [{"action_name": "delete_file", "allowed_decisions": ["approve", "reject"]}],
}

class FakeGraph:
async def aget_state(self, _config):
return SimpleNamespace(
values={"__interrupt__": [SimpleNamespace(value=hil_value)]},
tasks=[],
)

class FakeAgent:
async def get_graph(self):
return FakeGraph()

chunks: list[dict] = []

def make_chunk(status, meta, **kwargs):
return json.dumps({"status": status, "meta": meta, **kwargs}, ensure_ascii=False).encode("utf-8")

stream = check_and_handle_interrupts(FakeAgent(), {"configurable": {"thread_id": "t"}}, make_chunk, {}, "t")
async for raw in stream:
chunks.append(json.loads(raw.decode("utf-8")))

assert len(chunks) == 1
assert chunks[0]["status"] == "human_approval_required"
assert "action_requests" in chunks[0]
assert chunks[0]["action_requests"][0]["name"] == "delete_file"
assert "review_configs" in chunks[0]
# 不应再出现被误塞的默认空问题
assert "questions" not in chunks[0]

@pytest.mark.asyncio
async def test_routes_ask_user_question_interrupt_unchanged(self):
"""ask_user_question interrupt(含 questions)仍走原有 ask_user_question_required 通道。"""
info_value = {"questions": [{"question": "选择一个", "options": ["A", "B"]}]}

class FakeGraph:
async def aget_state(self, _config):
return SimpleNamespace(
values={"__interrupt__": [SimpleNamespace(value=info_value)]},
tasks=[],
)

class FakeAgent:
async def get_graph(self):
return FakeGraph()

chunks: list[dict] = []

def make_chunk(status, meta, **kwargs):
return json.dumps({"status": status, "meta": meta, **kwargs}, ensure_ascii=False).encode("utf-8")

stream = check_and_handle_interrupts(FakeAgent(), {"configurable": {"thread_id": "t"}}, make_chunk, {}, "t")
async for raw in stream:
chunks.append(json.loads(raw.decode("utf-8")))

assert len(chunks) == 1
assert chunks[0]["status"] == "ask_user_question_required"
assert chunks[0]["questions"][0]["question"] == "选择一个"


class TestInterruptSummary:
"""测试 run_worker._interrupt_summary 兼容两类 interrupt 载荷。"""

def test_summary_from_questions(self):
chunk = {"questions": [{"question": "是否继续?"}]}
assert run_worker._interrupt_summary(chunk) == "是否继续?"

def test_summary_from_action_requests_with_args(self):
chunk = {"action_requests": [{"name": "delete_file", "args": {"path": "/x"}}]}
summary = run_worker._interrupt_summary(chunk)
assert "delete_file" in summary
assert "/x" in summary

def test_summary_from_action_requests_without_args(self):
chunk = {"action_requests": [{"name": "send_email", "args": {}}]}
assert run_worker._interrupt_summary(chunk) == "操作需要确认: send_email"

def test_summary_empty_when_no_payload(self):
assert run_worker._interrupt_summary({}) == ""
assert run_worker._interrupt_summary(None) == ""
Loading