mirror of
https://github.com/langgenius/dify.git
synced 2026-05-04 09:28:04 +08:00
fix the issue in mark_timeout (vibe-kanban db2a9506)
distinguish between the global timeout and node timeout. For node-level timeout, the status should be updated to timeout. for global timeout, the status should be updated to expired. For status in (timeout, expired, SUBMITTED), the form should not be processed by the `check_and_handle_human_input_timeouts` logic. only node-level timeout should resume the execution of workflow, global timeout should mark execution as STOPPED. Update the documentation of HumanInputFormStatus to reflect the facts above.
This commit is contained in:
@ -188,6 +188,10 @@ class HumanInputFormRecord:
|
||||
)
|
||||
|
||||
|
||||
class _InvalidTimeoutStatusError(ValueError):
|
||||
pass
|
||||
|
||||
|
||||
class HumanInputFormRepositoryImpl:
|
||||
def __init__(
|
||||
self,
|
||||
@ -502,20 +506,29 @@ class HumanInputFormSubmissionRepository:
|
||||
|
||||
return HumanInputFormRecord.from_models(form_model, recipient_model)
|
||||
|
||||
def mark_timeout(self, *, form_id: str, reason: str | None = None) -> HumanInputFormRecord:
|
||||
def mark_timeout(
|
||||
self,
|
||||
*,
|
||||
form_id: str,
|
||||
timeout_status: HumanInputFormStatus,
|
||||
reason: str | None = None,
|
||||
) -> HumanInputFormRecord:
|
||||
with self._session_factory(expire_on_commit=False) as session, session.begin():
|
||||
form_model = session.get(HumanInputForm, form_id)
|
||||
if form_model is None:
|
||||
raise FormNotFoundError(f"form not found, id={form_id}")
|
||||
|
||||
if timeout_status not in {HumanInputFormStatus.TIMEOUT, HumanInputFormStatus.EXPIRED}:
|
||||
raise _InvalidTimeoutStatusError(f"invalid timeout status: {timeout_status}")
|
||||
|
||||
# already handled or submitted
|
||||
if form_model.status == HumanInputFormStatus.TIMEOUT:
|
||||
if form_model.status in {HumanInputFormStatus.TIMEOUT, HumanInputFormStatus.EXPIRED}:
|
||||
return HumanInputFormRecord.from_models(form_model, None)
|
||||
|
||||
if form_model.submitted_at is not None or form_model.status == HumanInputFormStatus.SUBMITTED:
|
||||
raise FormNotFoundError(f"form already submitted, id={form_id}")
|
||||
|
||||
form_model.status = HumanInputFormStatus.TIMEOUT
|
||||
form_model.status = timeout_status
|
||||
form_model.selected_action_id = None
|
||||
form_model.submitted_data = None
|
||||
form_model.submission_user_id = None
|
||||
|
||||
@ -2,11 +2,20 @@ import enum
|
||||
|
||||
|
||||
class HumanInputFormStatus(enum.StrEnum):
|
||||
"""Status of a human input form."""
|
||||
"""Status of a human input form.
|
||||
"""
|
||||
|
||||
# Awaiting submission from any recipient. Forms stay in this state until
|
||||
# submitted or a timeout rule applies.
|
||||
WAITING = enum.auto()
|
||||
# Global timeout reached. The workflow run is stopped and will not resume.
|
||||
# This is distinct from node-level timeout.
|
||||
EXPIRED = enum.auto()
|
||||
# Submitted by a recipient; form data is available and execution resumes
|
||||
# along the selected action edge.
|
||||
SUBMITTED = enum.auto()
|
||||
# Node-level timeout reached. The human input node should emit a timeout
|
||||
# event and the workflow should resume along the timeout edge.
|
||||
TIMEOUT = enum.auto()
|
||||
|
||||
|
||||
|
||||
@ -249,7 +249,10 @@ class HumanInputNode(Node[HumanInputNodeData]):
|
||||
yield self._form_to_pause_event(form_entity)
|
||||
return
|
||||
|
||||
if form.status == HumanInputFormStatus.TIMEOUT or form.expiration_time <= naive_utc_now():
|
||||
if (
|
||||
form.status in {HumanInputFormStatus.TIMEOUT, HumanInputFormStatus.EXPIRED}
|
||||
or form.expiration_time <= naive_utc_now()
|
||||
):
|
||||
yield HumanInputFormTimeoutEvent(
|
||||
node_title=self._node_data.title,
|
||||
expiration_time=form.expiration_time,
|
||||
|
||||
@ -186,7 +186,7 @@ class HumanInputService:
|
||||
def ensure_form_active(self, form: Form) -> None:
|
||||
if form.submitted:
|
||||
raise FormSubmittedError(form.id)
|
||||
if form.status == HumanInputFormStatus.TIMEOUT:
|
||||
if form.status in {HumanInputFormStatus.TIMEOUT, HumanInputFormStatus.EXPIRED}:
|
||||
raise FormExpiredError(form.id)
|
||||
now = naive_utc_now()
|
||||
if ensure_naive_utc(form.expiration_time) <= now:
|
||||
|
||||
@ -47,7 +47,7 @@ def _handle_global_timeout(*, form_id: str, workflow_run_id: str, node_id: str,
|
||||
with session_factory() as session, session.begin():
|
||||
workflow_run = session.get(WorkflowRun, workflow_run_id)
|
||||
if workflow_run is not None:
|
||||
workflow_run.status = WorkflowExecutionStatus.FAILED
|
||||
workflow_run.status = WorkflowExecutionStatus.STOPPED
|
||||
workflow_run.error = f"Human input global timeout at node {node_id}"
|
||||
workflow_run.finished_at = now
|
||||
session.add(workflow_run)
|
||||
@ -83,6 +83,7 @@ def check_and_handle_human_input_timeouts(limit: int = 100) -> None:
|
||||
HumanInputForm.status == HumanInputFormStatus.WAITING,
|
||||
HumanInputForm.expiration_time <= now,
|
||||
)
|
||||
.order_by(HumanInputForm.id.asc())
|
||||
.limit(limit)
|
||||
)
|
||||
expired_forms = session.scalars(stmt).all()
|
||||
@ -90,11 +91,16 @@ def check_and_handle_human_input_timeouts(limit: int = 100) -> None:
|
||||
for form_model in expired_forms:
|
||||
try:
|
||||
if form_model.form_kind == HumanInputFormKind.DELIVERY_TEST:
|
||||
form_repo.mark_timeout(form_id=form_model.id, reason="delivery_test_timeout")
|
||||
form_repo.mark_timeout(
|
||||
form_id=form_model.id,
|
||||
timeout_status=HumanInputFormStatus.TIMEOUT,
|
||||
reason="delivery_test_timeout",
|
||||
)
|
||||
continue
|
||||
is_global = _is_global_timeout(form_model, global_timeout_seconds)
|
||||
record = form_repo.mark_timeout(
|
||||
form_id=form_model.id,
|
||||
timeout_status=HumanInputFormStatus.EXPIRED if is_global else HumanInputFormStatus.TIMEOUT,
|
||||
reason="global_timeout" if is_global else "node_timeout",
|
||||
)
|
||||
if is_global:
|
||||
|
||||
Reference in New Issue
Block a user