fix(api): add expiration_time to form definition and events / response (vibe-kanban 3290f924)

This commit is contained in:
QuantumGhost
2026-01-27 18:09:36 +08:00
parent 51ed03c9e0
commit f3eb342883
14 changed files with 87 additions and 45 deletions

View File

@ -8,7 +8,6 @@ from collections.abc import Generator
from flask import Response, jsonify, request
from flask_restx import Resource, reqparse
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.orm import Session, sessionmaker
@ -33,8 +32,10 @@ from services.workflow_event_snapshot_service import build_workflow_event_stream
logger = logging.getLogger(__name__)
def _jsonify_pydantic_model(model: BaseModel) -> Response:
return Response(model.model_dump_json(), mimetype="application/json")
def _jsonify_form_definition(form: Form) -> Response:
payload = form.get_definition().model_dump()
payload["expiration_time"] = int(form.expiration_time.timestamp())
return Response(json.dumps(payload, ensure_ascii=False), mimetype="application/json")
@console_ns.route("/form/human_input/<string:form_token>")
@ -64,7 +65,7 @@ class ConsoleHumanInputFormApi(Resource):
self._ensure_console_access(form)
return _jsonify_pydantic_model(form.get_definition())
return _jsonify_form_definition(form)
@account_initialization_required
@login_required

View File

@ -5,6 +5,9 @@ from dataclasses import dataclass
from datetime import datetime
from typing import Any, NewType, Union
from sqlalchemy import select
from sqlalchemy.orm import Session
from core.app.entities.app_invoke_entities import AdvancedChatAppGenerateEntity, InvokeFrom, WorkflowAppGenerateEntity
from core.app.entities.queue_entities import (
QueueAgentLogEvent,
@ -58,6 +61,8 @@ from core.workflow.enums import (
WorkflowNodeExecutionStatus,
)
from core.workflow.runtime import GraphRuntimeState
from extensions.ext_database import db
from models.human_input import HumanInputForm
from core.workflow.system_variable import SystemVariable
from core.workflow.workflow_entry import WorkflowEntry
from core.workflow.workflow_type_encoder import WorkflowRuntimeTypeConverter
@ -296,11 +301,23 @@ class WorkflowResponseConverter:
if self._application_generate_entity.invoke_from == InvokeFrom.SERVICE_API:
encoded_outputs = {}
pause_reasons = [reason.model_dump(mode="json") for reason in event.reasons]
human_input_form_ids = [reason.form_id for reason in event.reasons if isinstance(reason, HumanInputRequired)]
expiration_times_by_form_id: dict[str, datetime] = {}
if human_input_form_ids:
stmt = select(HumanInputForm.id, HumanInputForm.expiration_time).where(
HumanInputForm.id.in_(human_input_form_ids)
)
with Session(bind=db.engine) as session:
for form_id, expiration_time in session.execute(stmt):
expiration_times_by_form_id[str(form_id)] = expiration_time
responses: list[StreamResponse] = []
for reason in event.reasons:
if isinstance(reason, HumanInputRequired):
expiration_time = expiration_times_by_form_id.get(reason.form_id)
if expiration_time is None:
raise ValueError(f"HumanInputForm not found for pause reason, form_id={reason.form_id}")
responses.append(
HumanInputRequiredResponse(
task_id=task_id,
@ -315,6 +332,7 @@ class WorkflowResponseConverter:
display_in_ui=reason.display_in_ui,
form_token=reason.form_token,
resolved_default_values=reason.resolved_default_values,
expiration_time=int(expiration_time.timestamp()),
),
)
)

View File

@ -288,6 +288,7 @@ class HumanInputRequiredResponse(StreamResponse):
display_in_ui: bool = False
form_token: str | None = None
resolved_default_values: Mapping[str, Any] = Field(default_factory=dict)
expiration_time: int = Field(..., description="Unix timestamp in seconds")
event: StreamEvent = StreamEvent.HUMAN_INPUT_REQUIRED
workflow_run_id: str

View File

@ -1,6 +1,7 @@
from __future__ import annotations
from collections.abc import Mapping, Sequence
from datetime import datetime
from typing import Any, TypeAlias
from pydantic import BaseModel, ConfigDict, Field
@ -21,6 +22,7 @@ class HumanInputFormDefinition(BaseModel):
display_in_ui: bool = False
form_token: str | None = None
resolved_default_values: Mapping[str, Any] = Field(default_factory=dict)
expiration_time: datetime
class HumanInputFormSubmissionData(BaseModel):

View File

@ -164,6 +164,9 @@ class HumanInputFormRecord:
def from_models(
cls, form_model: HumanInputForm, recipient_model: HumanInputFormRecipient | None
) -> "HumanInputFormRecord":
definition_payload = json.loads(form_model.form_definition)
if "expiration_time" not in definition_payload:
definition_payload["expiration_time"] = form_model.expiration_time
return cls(
form_id=form_model.id,
workflow_run_id=form_model.workflow_run_id,
@ -171,7 +174,7 @@ class HumanInputFormRecord:
tenant_id=form_model.tenant_id,
app_id=form_model.app_id,
form_kind=form_model.form_kind,
definition=FormDefinition.model_validate_json(form_model.form_definition),
definition=FormDefinition.model_validate(definition_payload),
rendered_content=form_model.rendered_content,
created_at=form_model.created_at,
expiration_time=form_model.expiration_time,
@ -341,8 +344,7 @@ class HumanInputFormRepositoryImpl:
inputs=form_config.inputs,
user_actions=form_config.user_actions,
rendered_content=params.rendered_content,
timeout=form_config.timeout,
timeout_unit=form_config.timeout_unit,
expiration_time=node_expiration,
default_values=dict(params.resolved_default_values),
display_in_ui=params.display_in_ui,
node_title=form_config.title,

View File

@ -312,9 +312,7 @@ class FormDefinition(BaseModel):
inputs: list[FormInput] = Field(default_factory=list)
user_actions: list[UserAction] = Field(default_factory=list)
rendered_content: str
timeout: int
timeout_unit: TimeoutUnit
expiration_time: datetime
# this is used to store the resolved default values
default_values: dict[str, Any] = Field(default_factory=dict)

View File

@ -19,6 +19,7 @@ Implementation Notes:
- Maintains data consistency with proper transaction handling
"""
import json
import logging
import uuid
from collections.abc import Sequence
@ -84,12 +85,14 @@ def _build_human_input_required_reason(
node_title = "Human Input"
form_id = reason_model.form_id
node_id = reason_model.node_id
if form_model is not None:
form_id = form_model.id
node_id = form_model.node_id or node_id
try:
definition = FormDefinition.model_validate_json(form_model.form_definition)
definition_payload = json.loads(form_model.form_definition)
if "expiration_time" not in definition_payload:
definition_payload["expiration_time"] = form_model.expiration_time
definition = FormDefinition.model_validate(definition_payload)
except ValidationError:
definition = None

View File

@ -112,7 +112,10 @@ class SQLAlchemyExecutionExtraContentRepository(ExecutionExtraContentRepository)
return None
try:
form_definition = FormDefinition.model_validate_json(form.form_definition)
definition_payload = json.loads(form.form_definition)
if "expiration_time" not in definition_payload:
definition_payload["expiration_time"] = form.expiration_time
form_definition = FormDefinition.model_validate(definition_payload)
except ValueError:
logger.warning("Failed to load form definition for HumanInputContent(id=%s)", model.id)
return None
@ -135,6 +138,7 @@ class SQLAlchemyExecutionExtraContentRepository(ExecutionExtraContentRepository)
display_in_ui=display_in_ui,
form_token=form_token,
resolved_default_values=form_definition.default_values,
expiration_time=form.expiration_time,
),
)

View File

@ -5,7 +5,7 @@ from datetime import datetime, timedelta
from decimal import Decimal
from uuid import uuid4
from core.workflow.nodes.human_input.entities import FormDefinition, TimeoutUnit, UserAction
from core.workflow.nodes.human_input.entities import FormDefinition, UserAction
from models.account import Account, Tenant, TenantAccountJoin
from models.execution_extra_content import HumanInputContent
from models.human_input import HumanInputForm, HumanInputFormStatus
@ -116,8 +116,7 @@ def create_human_input_message_fixture(db_session) -> HumanInputMessageFixture:
inputs=[],
user_actions=[UserAction(id=action_id, title=action_text)],
rendered_content="Rendered block",
timeout=1,
timeout_unit=TimeoutUnit.HOUR,
expiration_time=datetime.utcnow() + timedelta(days=1),
node_title=node_title,
display_in_ui=True,
)

View File

@ -1,6 +1,10 @@
from datetime import UTC, datetime
from types import SimpleNamespace
from unittest.mock import MagicMock
import pytest
import core.app.apps.common.workflow_response_converter as workflow_response_converter
from core.app.apps.common.workflow_response_converter import WorkflowResponseConverter
from core.app.apps.workflow.app_runner import WorkflowAppRunner
from core.app.entities.app_invoke_entities import InvokeFrom
@ -111,7 +115,7 @@ def _build_converter():
)
def test_queue_workflow_paused_event_to_stream_responses():
def test_queue_workflow_paused_event_to_stream_responses(monkeypatch: pytest.MonkeyPatch):
converter = _build_converter()
converter.workflow_start_to_stream_response(
task_id="task",
@ -120,6 +124,21 @@ def test_queue_workflow_paused_event_to_stream_responses():
reason=WorkflowStartReason.INITIAL,
)
expiration_time = datetime(2024, 1, 1, tzinfo=UTC)
class _FakeSession:
def execute(self, _stmt):
return [("form-1", expiration_time)]
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return False
monkeypatch.setattr(workflow_response_converter, "Session", lambda **_: _FakeSession())
monkeypatch.setattr(workflow_response_converter, "db", SimpleNamespace(engine=object()))
reason = HumanInputRequired(
form_id="form-1",
form_content="Rendered",
@ -161,3 +180,4 @@ def test_queue_workflow_paused_event_to_stream_responses():
assert hi_resp.data.inputs[0].output_variable_name == "field"
assert hi_resp.data.actions[0].id == "approve"
assert hi_resp.data.display_in_ui is True
assert hi_resp.data.expiration_time == int(expiration_time.timestamp())

View File

@ -19,10 +19,9 @@ from core.workflow.nodes.human_input.entities import (
ExternalRecipient,
FormDefinition,
MemberRecipient,
TimeoutUnit,
UserAction,
)
from core.workflow.nodes.human_input.enums import HumanInputFormStatus
from core.workflow.nodes.human_input.enums import HumanInputFormKind, HumanInputFormStatus
from libs.datetime_utils import naive_utc_now
from models.human_input import (
EmailExternalRecipientPayload,
@ -162,8 +161,7 @@ def _make_form_definition() -> str:
inputs=[],
user_actions=[UserAction(id="submit", title="Submit")],
rendered_content="<p>hello</p>",
timeout=1,
timeout_unit=TimeoutUnit.HOUR,
expiration_time=datetime.utcnow(),
).model_dump_json()
@ -177,6 +175,8 @@ class _DummyForm:
form_definition: str
rendered_content: str
expiration_time: datetime
form_kind: HumanInputFormKind = HumanInputFormKind.RUNTIME
created_at: datetime = dataclasses.field(default_factory=naive_utc_now)
selected_action_id: str | None = None
submitted_data: str | None = None
submitted_at: datetime | None = None

View File

@ -9,7 +9,7 @@ from sqlalchemy.orm import Session, sessionmaker
from core.workflow.entities.pause_reason import HumanInputRequired, PauseReasonType
from core.workflow.enums import WorkflowExecutionStatus
from core.workflow.nodes.human_input.entities import FormDefinition, FormInput, UserAction
from core.workflow.nodes.human_input.enums import FormInputType, HumanInputFormStatus, TimeoutUnit
from core.workflow.nodes.human_input.enums import FormInputType, HumanInputFormStatus
from models.human_input import BackstageRecipientPayload, HumanInputForm, HumanInputFormRecipient, RecipientType
from models.workflow import WorkflowPause as WorkflowPauseModel
from models.workflow import WorkflowPauseReason, WorkflowRun
@ -208,6 +208,7 @@ class TestResumeWorkflowPause(TestDifyAPISQLAlchemyWorkflowRunRepository):
sample_workflow_pause.resumed_at = None
mock_session.scalar.return_value = sample_workflow_run
mock_session.scalars.return_value.all.return_value = []
with patch("repositories.sqlalchemy_api_workflow_run_repository.naive_utc_now") as mock_now:
mock_now.return_value = datetime.now(UTC)
@ -372,13 +373,13 @@ class TestPrivateWorkflowPauseEntity(TestDifyAPISQLAlchemyWorkflowRunRepository)
class TestBuildHumanInputRequiredReason:
def test_prefers_backstage_token_when_available(self):
expiration_time = datetime.now(UTC)
form_definition = FormDefinition(
form_content="content",
inputs=[FormInput(type=FormInputType.TEXT_INPUT, output_variable_name="name")],
user_actions=[UserAction(id="approve", title="Approve")],
rendered_content="rendered",
timeout=1,
timeout_unit=TimeoutUnit.HOUR,
expiration_time=expiration_time,
default_values={"name": "Alice"},
node_title="Ask Name",
display_in_ui=True,
@ -392,7 +393,7 @@ class TestBuildHumanInputRequiredReason:
form_definition=form_definition.model_dump_json(),
rendered_content="rendered",
status=HumanInputFormStatus.WAITING,
expiration_time=datetime.now(UTC),
expiration_time=expiration_time,
)
reason_model = WorkflowPauseReason(
pause_id="pause-1",

View File

@ -10,10 +10,7 @@ from core.workflow.nodes.human_input.entities import (
FormDefinition,
UserAction,
)
from core.workflow.nodes.human_input.enums import (
HumanInputFormStatus,
TimeoutUnit,
)
from core.workflow.nodes.human_input.enums import HumanInputFormStatus
from models.execution_extra_content import HumanInputContent as HumanInputContentModel
from models.human_input import ConsoleRecipientPayload, HumanInputForm, HumanInputFormRecipient, RecipientType
from repositories.sqlalchemy_execution_extra_content_repository import SQLAlchemyExecutionExtraContentRepository
@ -52,13 +49,13 @@ class _FakeSessionMaker:
def _build_form(action_id: str, action_title: str, rendered_content: str) -> HumanInputForm:
expiration_time = datetime.now(UTC) + timedelta(days=1)
definition = FormDefinition(
form_content="content",
inputs=[],
user_actions=[UserAction(id=action_id, title=action_title)],
rendered_content="rendered",
timeout=1,
timeout_unit=TimeoutUnit.HOUR,
expiration_time=expiration_time,
node_title="Approval",
display_in_ui=True,
)
@ -71,7 +68,7 @@ def _build_form(action_id: str, action_title: str, rendered_content: str) -> Hum
form_definition=definition.model_dump_json(),
rendered_content=rendered_content,
status=HumanInputFormStatus.SUBMITTED,
expiration_time=datetime.now(UTC) + timedelta(days=1),
expiration_time=expiration_time,
)
form.selected_action_id = action_id
return form
@ -120,13 +117,13 @@ def test_get_by_message_ids_groups_contents_by_message() -> None:
def test_get_by_message_ids_returns_unsubmitted_form_definition() -> None:
expiration_time = datetime.now(UTC) + timedelta(days=1)
definition = FormDefinition(
form_content="content",
inputs=[],
user_actions=[UserAction(id="approve", title="Approve")],
rendered_content="rendered",
timeout=1,
timeout_unit=TimeoutUnit.HOUR,
expiration_time=expiration_time,
default_values={"name": "John"},
node_title="Approval",
display_in_ui=True,
@ -140,7 +137,7 @@ def test_get_by_message_ids_returns_unsubmitted_form_definition() -> None:
form_definition=definition.model_dump_json(),
rendered_content="Rendered block",
status=HumanInputFormStatus.WAITING,
expiration_time=datetime.now(UTC) + timedelta(days=1),
expiration_time=expiration_time,
)
content = HumanInputContentModel(
id="content-msg-1",
@ -170,6 +167,8 @@ def test_get_by_message_ids_returns_unsubmitted_form_definition() -> None:
assert domain_content.submitted is False
assert domain_content.workflow_run_id == "workflow-run"
assert domain_content.form_definition is not None
assert domain_content.form_definition.expiration_time == form.expiration_time
assert domain_content.form_definition is not None
form_definition = domain_content.form_definition
assert form_definition.form_id == "form-1"
assert form_definition.node_id == "node-id"
@ -178,3 +177,4 @@ def test_get_by_message_ids_returns_unsubmitted_form_definition() -> None:
assert form_definition.display_in_ui is True
assert form_definition.form_token == "token-1"
assert form_definition.resolved_default_values == {"name": "John"}
assert form_definition.expiration_time == form.expiration_time

View File

@ -14,12 +14,7 @@ from core.workflow.nodes.human_input.entities import (
FormInput,
UserAction,
)
from core.workflow.nodes.human_input.enums import (
FormInputType,
HumanInputFormKind,
HumanInputFormStatus,
TimeoutUnit,
)
from core.workflow.nodes.human_input.enums import FormInputType, HumanInputFormKind, HumanInputFormStatus
from models.human_input import RecipientType
from services.human_input_service import Form, FormExpiredError, HumanInputService, InvalidFormDataError
@ -50,8 +45,7 @@ def sample_form_record():
inputs=[],
user_actions=[UserAction(id="submit", title="Submit")],
rendered_content="<p>hello</p>",
timeout=1,
timeout_unit=TimeoutUnit.HOUR,
expiration_time=datetime.utcnow() + timedelta(hours=1),
),
rendered_content="<p>hello</p>",
created_at=datetime.utcnow(),
@ -277,8 +271,7 @@ def test_submit_form_by_token_missing_inputs(sample_form_record, mock_session_fa
inputs=[FormInput(type=FormInputType.TEXT_INPUT, output_variable_name="content")],
user_actions=sample_form_record.definition.user_actions,
rendered_content="<p>hello</p>",
timeout=1,
timeout_unit=TimeoutUnit.HOUR,
expiration_time=sample_form_record.expiration_time,
)
form_with_input = dataclasses.replace(sample_form_record, definition=definition_with_input)
repo.get_by_token.return_value = form_with_input