mirror of
https://github.com/langgenius/dify.git
synced 2026-05-04 09:28:04 +08:00
WIP: feat(api): always use form_token to submit human input form
This commit is contained in:
@ -1,3 +1,4 @@
|
||||
from datetime import timedelta
|
||||
from enum import StrEnum
|
||||
from typing import Literal
|
||||
|
||||
@ -82,9 +83,9 @@ class AppExecutionConfig(BaseSettings):
|
||||
default=0,
|
||||
)
|
||||
|
||||
HITL_GLOBAL_TIMEOUT_HOURS: PositiveInt = Field(
|
||||
description="Maximum hours a workflow run can stay paused waiting for human input before global timeout.",
|
||||
default=24 * 7,
|
||||
HITL_GLOBAL_TIMEOUT_SECONDS: PositiveInt = Field(
|
||||
description="Maximum seconds a workflow run can stay paused waiting for human input before global timeout.",
|
||||
default=int(timedelta(days=3).total_seconds()),
|
||||
ge=1,
|
||||
)
|
||||
|
||||
|
||||
@ -11,7 +11,6 @@ from flask_restx import Resource, reqparse
|
||||
from pydantic import BaseModel
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.orm import Session, sessionmaker
|
||||
from werkzeug.exceptions import Forbidden
|
||||
|
||||
from controllers.console import console_ns
|
||||
from controllers.console.wraps import account_initialization_required, setup_required
|
||||
@ -20,55 +19,32 @@ from core.app.apps.advanced_chat.app_generator import AdvancedChatAppGenerator
|
||||
from core.app.apps.common.workflow_response_converter import WorkflowResponseConverter
|
||||
from core.app.apps.message_generator import MessageGenerator
|
||||
from core.app.apps.workflow.app_generator import WorkflowAppGenerator
|
||||
from core.workflow.nodes.human_input.entities import FormDefinition
|
||||
from extensions.ext_database import db
|
||||
from libs.login import current_account_with_tenant, login_required
|
||||
from models import App
|
||||
from models.enums import CreatorUserRole
|
||||
from models.human_input import HumanInputForm as HumanInputFormModel
|
||||
from models.human_input import RecipientType
|
||||
from models.model import AppMode
|
||||
from models.workflow import Workflow, WorkflowRun
|
||||
from repositories.factory import DifyAPIRepositoryFactory
|
||||
from services.human_input_service import HumanInputService
|
||||
from services.human_input_service import Form, HumanInputService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class _FormDefinitionWithSite(FormDefinition):
|
||||
# the site field may be not necessary for console scenario.
|
||||
site: None
|
||||
|
||||
|
||||
def _jsonify_pydantic_model(model: BaseModel) -> Response:
|
||||
return Response(model.model_dump_json(), mimetype="application/json")
|
||||
|
||||
|
||||
@console_ns.route("/form/human_input/<string:form_id>")
|
||||
@console_ns.route("/form/human_input/<string:form_token>")
|
||||
class ConsoleHumanInputFormApi(Resource):
|
||||
"""Console API for getting human input form definition."""
|
||||
|
||||
@setup_required
|
||||
@login_required
|
||||
@account_initialization_required
|
||||
def get(self, form_id: str):
|
||||
"""
|
||||
Get human input form definition by form ID.
|
||||
|
||||
GET /console/api/form/human_input/<form_id>
|
||||
"""
|
||||
service = HumanInputService(db.engine)
|
||||
form = service.get_form_definition_by_id(
|
||||
form_id=form_id,
|
||||
)
|
||||
if form is None:
|
||||
raise NotFoundError(f"form not found, id={form_id}")
|
||||
|
||||
@staticmethod
|
||||
def _ensure_console_access(form: Form):
|
||||
current_user, current_tenant_id = current_account_with_tenant()
|
||||
form_model = db.session.get(HumanInputFormModel, form_id)
|
||||
if form_model is None or form_model.tenant_id != current_tenant_id:
|
||||
raise NotFoundError(f"form not found, id={form_id}")
|
||||
|
||||
workflow_run = db.session.get(WorkflowRun, form_model.workflow_run_id)
|
||||
workflow_run = db.session.get(WorkflowRun, form.workflow_run_id)
|
||||
if workflow_run is None or workflow_run.tenant_id != current_tenant_id:
|
||||
raise NotFoundError("Workflow run not found")
|
||||
|
||||
@ -81,20 +57,32 @@ class ConsoleHumanInputFormApi(Resource):
|
||||
workflow = db.session.get(Workflow, workflow_run.workflow_id)
|
||||
if workflow is None or workflow.tenant_id != current_tenant_id:
|
||||
raise NotFoundError("Workflow not found")
|
||||
owner_account_id = workflow.created_by
|
||||
|
||||
if owner_account_id != current_user.id:
|
||||
raise Forbidden("You do not have permission to access this human input form.")
|
||||
@setup_required
|
||||
@login_required
|
||||
@account_initialization_required
|
||||
def get(self, form_token: str):
|
||||
"""
|
||||
Get human input form definition by form token.
|
||||
|
||||
GET /console/api/form/human_input/<form_token>
|
||||
"""
|
||||
service = HumanInputService(db.engine)
|
||||
form = service.get_form_definition_by_token_for_console(form_token)
|
||||
if form is None:
|
||||
raise NotFoundError(f"form not found, token={form_token}")
|
||||
|
||||
self._ensure_console_access(form)
|
||||
|
||||
return _jsonify_pydantic_model(form.get_definition())
|
||||
|
||||
@account_initialization_required
|
||||
@login_required
|
||||
def post(self, form_id: str):
|
||||
def post(self, form_token: str):
|
||||
"""
|
||||
Submit human input form by form ID.
|
||||
Submit human input form by form token.
|
||||
|
||||
POST /console/api/form/human_input/<form_id>
|
||||
POST /console/api/form/human_input/<form_token>
|
||||
|
||||
Request body:
|
||||
{
|
||||
@ -110,13 +98,23 @@ class ConsoleHumanInputFormApi(Resource):
|
||||
args = parser.parse_args()
|
||||
current_user, _ = current_account_with_tenant()
|
||||
|
||||
# Submit the form
|
||||
service = HumanInputService(db.engine)
|
||||
service.submit_form_by_id(
|
||||
form_id=form_id,
|
||||
form = service.get_form_by_token(form_token)
|
||||
if form is None:
|
||||
raise NotFoundError(f"form not found, token={form_token}")
|
||||
|
||||
self._ensure_console_access(form)
|
||||
|
||||
recipient_type = form.recipient_type
|
||||
if recipient_type != RecipientType.CONSOLE:
|
||||
raise NotFoundError(f"form not found, token={form_token}")
|
||||
|
||||
service.submit_form_by_token(
|
||||
recipient_type=RecipientType.CONSOLE,
|
||||
form_token=form_token,
|
||||
selected_action_id=args["action"],
|
||||
form_data=args["inputs"],
|
||||
user=current_user,
|
||||
submission_user_id=current_user.id,
|
||||
)
|
||||
|
||||
return jsonify({})
|
||||
@ -167,9 +165,11 @@ class ConsoleWorkflowEventsApi(Resource):
|
||||
payload = response.model_dump(mode="json")
|
||||
payload["event"] = response.event.value
|
||||
|
||||
def generate_events() -> Generator[str, None, None]:
|
||||
def _generate_finished_events() -> Generator[str, None, None]:
|
||||
yield f"data: {json.dumps(payload)}\n\n"
|
||||
|
||||
event_generator = _generate_finished_events
|
||||
|
||||
else:
|
||||
msg_generator = MessageGenerator()
|
||||
if app.mode == AppMode.ADVANCED_CHAT:
|
||||
@ -179,13 +179,15 @@ class ConsoleWorkflowEventsApi(Resource):
|
||||
else:
|
||||
raise InvalidArgumentError(f"cannot subscribe to workflow run, workflow_run_id={workflow_run.id}")
|
||||
|
||||
def generate_events():
|
||||
def _generate_stream_events():
|
||||
return generator.convert_to_event_stream(
|
||||
msg_generator.retrieve_events(AppMode(app.mode), workflow_run.id),
|
||||
)
|
||||
|
||||
event_generator = _generate_stream_events
|
||||
|
||||
return Response(
|
||||
generate_events(),
|
||||
event_generator(),
|
||||
mimetype="text/event-stream",
|
||||
headers={
|
||||
"Cache-Control": "no-cache",
|
||||
|
||||
@ -23,19 +23,19 @@ def _jsonify_form_definition(form: Form) -> Response:
|
||||
return Response(form.get_definition().model_dump_json(), mimetype="application/json")
|
||||
|
||||
|
||||
@web_ns.route("/form/human_input/<string:web_app_form_token>")
|
||||
@web_ns.route("/form/human_input/<string:form_token>")
|
||||
class HumanInputFormApi(WebApiResource):
|
||||
"""API for getting and submitting human input forms via the web app."""
|
||||
|
||||
def get(self, _app_model: App, _end_user: EndUser, web_app_form_token: str):
|
||||
def get(self, _app_model: App, _end_user: EndUser, form_token: str):
|
||||
"""
|
||||
Get human input form definition by token.
|
||||
|
||||
GET /api/form/human_input/<web_app_form_token>
|
||||
GET /api/form/human_input/<form_token>
|
||||
"""
|
||||
service = HumanInputService(db.engine)
|
||||
try:
|
||||
form = service.get_form_definition_by_token(RecipientType.WEBAPP, web_app_form_token)
|
||||
form = service.get_form_definition_by_token(RecipientType.STANDALONE_WEB_APP, form_token)
|
||||
except FormNotFoundError:
|
||||
raise NotFoundError("Form not found")
|
||||
|
||||
@ -44,11 +44,11 @@ class HumanInputFormApi(WebApiResource):
|
||||
|
||||
return _jsonify_form_definition(form)
|
||||
|
||||
def post(self, _app_model: App, _end_user: EndUser, web_app_form_token: str):
|
||||
def post(self, _app_model: App, _end_user: EndUser, form_token: str):
|
||||
"""
|
||||
Submit human input form by token.
|
||||
|
||||
POST /api/form/human_input/<web_app_form_token>
|
||||
POST /api/form/human_input/<form_token>
|
||||
|
||||
Request body:
|
||||
{
|
||||
@ -66,8 +66,8 @@ class HumanInputFormApi(WebApiResource):
|
||||
service = HumanInputService(db.engine)
|
||||
try:
|
||||
service.submit_form_by_token(
|
||||
recipient_type=RecipientType.WEBAPP,
|
||||
form_token=web_app_form_token,
|
||||
recipient_type=RecipientType.STANDALONE_WEB_APP,
|
||||
form_token=form_token,
|
||||
selected_action_id=args["action"],
|
||||
form_data=args["inputs"],
|
||||
submission_end_user_id=_end_user.id,
|
||||
|
||||
@ -299,7 +299,7 @@ class WorkflowResponseConverter:
|
||||
form_content=reason.form_content,
|
||||
inputs=reason.inputs,
|
||||
actions=reason.actions,
|
||||
web_app_form_token=reason.web_app_form_token,
|
||||
form_token=reason.form_token,
|
||||
resolved_placeholder_values=reason.resolved_placeholder_values,
|
||||
),
|
||||
)
|
||||
|
||||
@ -277,7 +277,7 @@ class HumanInputRequiredResponse(StreamResponse):
|
||||
form_content: str
|
||||
inputs: Sequence[FormInput] = Field(default_factory=list)
|
||||
actions: Sequence[UserAction] = Field(default_factory=list)
|
||||
web_app_form_token: str | None = None
|
||||
form_token: str | None = None
|
||||
resolved_placeholder_values: Mapping[str, Any] = Field(default_factory=dict)
|
||||
|
||||
event: StreamEvent = StreamEvent.HUMAN_INPUT_REQUIRED
|
||||
|
||||
@ -17,7 +17,7 @@ from core.workflow.nodes.human_input.entities import (
|
||||
MemberRecipient,
|
||||
WebAppDeliveryMethod,
|
||||
)
|
||||
from core.workflow.nodes.human_input.enums import HumanInputFormStatus
|
||||
from core.workflow.nodes.human_input.enums import DeliveryMethodType, HumanInputFormStatus
|
||||
from core.workflow.repositories.human_input_form_repository import (
|
||||
FormCreateParams,
|
||||
FormNotFoundError,
|
||||
@ -28,13 +28,15 @@ from libs.datetime_utils import naive_utc_now
|
||||
from libs.uuid_utils import uuidv7
|
||||
from models.account import Account, TenantAccountJoin
|
||||
from models.human_input import (
|
||||
ConsoleDeliveryPayload,
|
||||
ConsoleRecipientPayload,
|
||||
EmailExternalRecipientPayload,
|
||||
EmailMemberRecipientPayload,
|
||||
HumanInputDelivery,
|
||||
HumanInputForm,
|
||||
HumanInputFormRecipient,
|
||||
RecipientType,
|
||||
WebAppRecipientPayload,
|
||||
StandaloneWebAppRecipientPayload,
|
||||
)
|
||||
|
||||
|
||||
@ -70,7 +72,15 @@ class _HumanInputFormEntityImpl(HumanInputFormEntity):
|
||||
self._form_model = form_model
|
||||
self._recipients = [_HumanInputFormRecipientEntityImpl(recipient) for recipient in recipient_models]
|
||||
self._web_app_recipient = next(
|
||||
(recipient for recipient in recipient_models if recipient.recipient_type == RecipientType.WEBAPP),
|
||||
(
|
||||
recipient
|
||||
for recipient in recipient_models
|
||||
if recipient.recipient_type == RecipientType.STANDALONE_WEB_APP
|
||||
),
|
||||
None,
|
||||
)
|
||||
self._console_recipient = next(
|
||||
(recipient for recipient in recipient_models if recipient.recipient_type == RecipientType.CONSOLE),
|
||||
None,
|
||||
)
|
||||
self._submitted_data: Mapping[str, Any] | None = (
|
||||
@ -83,6 +93,8 @@ class _HumanInputFormEntityImpl(HumanInputFormEntity):
|
||||
|
||||
@property
|
||||
def web_app_token(self):
|
||||
if self._console_recipient is not None:
|
||||
return self._console_recipient.access_token
|
||||
if self._web_app_recipient is None:
|
||||
return None
|
||||
return self._web_app_recipient.access_token
|
||||
@ -195,8 +207,8 @@ class HumanInputFormRepositoryImpl:
|
||||
recipient_model = HumanInputFormRecipient(
|
||||
form_id=form_id,
|
||||
delivery_id=delivery_id,
|
||||
recipient_type=RecipientType.WEBAPP,
|
||||
recipient_payload=WebAppRecipientPayload().model_dump_json(),
|
||||
recipient_type=RecipientType.STANDALONE_WEB_APP,
|
||||
recipient_payload=StandaloneWebAppRecipientPayload().model_dump_json(),
|
||||
)
|
||||
recipients.append(recipient_model)
|
||||
elif isinstance(delivery_method, EmailDeliveryMethod):
|
||||
@ -339,6 +351,28 @@ class HumanInputFormRepositoryImpl:
|
||||
session.add(delivery_and_recipients.delivery)
|
||||
session.add_all(delivery_and_recipients.recipients)
|
||||
recipient_models.extend(delivery_and_recipients.recipients)
|
||||
if params.console_recipient_required and not any(
|
||||
recipient.recipient_type == RecipientType.CONSOLE for recipient in recipient_models
|
||||
):
|
||||
console_delivery_id = str(uuidv7())
|
||||
console_delivery = HumanInputDelivery(
|
||||
id=console_delivery_id,
|
||||
form_id=form_id,
|
||||
delivery_method_type=DeliveryMethodType.WEBAPP,
|
||||
delivery_config_id=None,
|
||||
channel_payload=ConsoleDeliveryPayload().model_dump_json(),
|
||||
)
|
||||
console_recipient = HumanInputFormRecipient(
|
||||
form_id=form_id,
|
||||
delivery_id=console_delivery_id,
|
||||
recipient_type=RecipientType.CONSOLE,
|
||||
recipient_payload=ConsoleRecipientPayload(
|
||||
account_id=params.console_creator_account_id,
|
||||
).model_dump_json(),
|
||||
)
|
||||
session.add(console_delivery)
|
||||
session.add(console_recipient)
|
||||
recipient_models.append(console_recipient)
|
||||
session.flush()
|
||||
|
||||
return _HumanInputFormEntityImpl(form_model=form_model, recipient_models=recipient_models)
|
||||
|
||||
@ -32,11 +32,11 @@ class HumanInputRequired(BaseModel):
|
||||
# Only form inputs with placeholder type `VARIABLE` will be resolved and stored in `resolved_placeholder_values`.
|
||||
resolved_placeholder_values: Mapping[str, Any] = Field(default_factory=dict)
|
||||
|
||||
# The `web_app_form_token` is the token used to submit the form via webapp. It corresponds to
|
||||
# The `form_token` is the token used to submit the form via webapp. It corresponds to
|
||||
# `HumanInputFormRecipient.access_token`.
|
||||
#
|
||||
# This field is `None` if webapp delivery is not set.
|
||||
web_app_form_token: str | None = None
|
||||
form_token: str | None = None
|
||||
|
||||
|
||||
class SchedulingPause(BaseModel):
|
||||
|
||||
@ -13,7 +13,10 @@ class HumanInputFormStatus(enum.StrEnum):
|
||||
class DeliveryMethodType(enum.StrEnum):
|
||||
"""Delivery method types for human input forms."""
|
||||
|
||||
# WEBAPP controls whether the form is delivered to the web app. It not only controls
|
||||
# the standalone web app, but also controls the installed apps in the console.
|
||||
WEBAPP = enum.auto()
|
||||
|
||||
EMAIL = enum.auto()
|
||||
|
||||
|
||||
|
||||
@ -3,6 +3,7 @@ import logging
|
||||
from collections.abc import Generator, Mapping, Sequence
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
from core.app.entities.app_invoke_entities import InvokeFrom
|
||||
from core.repositories.human_input_reposotiry import HumanInputFormRepositoryImpl
|
||||
from core.workflow.entities.pause_reason import HumanInputRequired
|
||||
from core.workflow.enums import NodeExecutionType, NodeType, WorkflowNodeExecutionStatus
|
||||
@ -153,9 +154,19 @@ class HumanInputNode(Node[HumanInputNodeData]):
|
||||
|
||||
return resolved_inputs
|
||||
|
||||
def _should_require_form_token(self) -> bool:
|
||||
if self.invoke_from == InvokeFrom.DEBUGGER:
|
||||
return True
|
||||
if self.invoke_from == InvokeFrom.EXPLORE:
|
||||
return self._node_data.is_webapp_enabled()
|
||||
return False
|
||||
|
||||
def _human_input_required_event(self, form_entity: HumanInputFormEntity) -> HumanInputRequired:
|
||||
node_data = self._node_data
|
||||
resolved_placeholder_values = self._resolve_inputs()
|
||||
form_token = form_entity.web_app_token
|
||||
if self._should_require_form_token() and form_token is None:
|
||||
raise AssertionError("Form token should be available for console execution.")
|
||||
return HumanInputRequired(
|
||||
form_id=form_entity.id,
|
||||
form_content=form_entity.rendered_content,
|
||||
@ -163,7 +174,7 @@ class HumanInputNode(Node[HumanInputNodeData]):
|
||||
actions=node_data.user_actions,
|
||||
node_id=self.id,
|
||||
node_title=node_data.title,
|
||||
web_app_form_token=form_entity.web_app_token,
|
||||
form_token=form_token,
|
||||
resolved_placeholder_values=resolved_placeholder_values,
|
||||
)
|
||||
|
||||
@ -188,6 +199,10 @@ class HumanInputNode(Node[HumanInputNodeData]):
|
||||
form_config=self._node_data,
|
||||
rendered_content=self._render_form_content_before_submission(),
|
||||
resolved_placeholder_values=self._resolve_inputs(),
|
||||
console_recipient_required=self._should_require_form_token(),
|
||||
console_creator_account_id=(
|
||||
self.user_id if self.invoke_from in {InvokeFrom.DEBUGGER, InvokeFrom.EXPLORE} else None
|
||||
),
|
||||
)
|
||||
form_entity = self._form_repository.create_form(params)
|
||||
# Create human input required event
|
||||
|
||||
@ -36,6 +36,10 @@ class FormCreateParams:
|
||||
# For type = CONSTANT, the value is not stored inside `resolved_placeholder_values`
|
||||
resolved_placeholder_values: Mapping[str, Any]
|
||||
|
||||
# Force creating a console-only recipient for submission in Console.
|
||||
console_recipient_required: bool = False
|
||||
console_creator_account_id: str | None = None
|
||||
|
||||
|
||||
class HumanInputFormEntity(abc.ABC):
|
||||
@property
|
||||
@ -49,7 +53,8 @@ class HumanInputFormEntity(abc.ABC):
|
||||
def web_app_token(self) -> str | None:
|
||||
"""web_app_token returns the token for submission inside webapp.
|
||||
|
||||
If web app delivery is not enabled, this method would return `None`.
|
||||
For console/debug execution, this may point to the console submission token
|
||||
if the form is configured to require console delivery.
|
||||
"""
|
||||
|
||||
# TODO: what if the users are allowed to add multiple
|
||||
|
||||
@ -112,7 +112,14 @@ class RecipientType(StrEnum):
|
||||
# EMAIL_MEMBER member means that the
|
||||
EMAIL_MEMBER = "email_member"
|
||||
EMAIL_EXTERNAL = "email_external"
|
||||
WEBAPP = "webapp"
|
||||
# STANDALONE_WEB_APP is used by the standalone web app.
|
||||
#
|
||||
# It's not used while running workflows / chatflows containing HumanInput
|
||||
# node inside console.
|
||||
STANDALONE_WEB_APP = "standalone_web_app"
|
||||
# CONSOLE is used while running workflows / chatflows containing HumanInput
|
||||
# node inside console. (E.G. running installed apps or debugging workflows / chatflows)
|
||||
CONSOLE = "console"
|
||||
|
||||
|
||||
@final
|
||||
@ -131,12 +138,27 @@ class EmailExternalRecipientPayload(BaseModel):
|
||||
|
||||
|
||||
@final
|
||||
class WebAppRecipientPayload(BaseModel):
|
||||
TYPE: Literal[RecipientType.WEBAPP] = RecipientType.WEBAPP
|
||||
class StandaloneWebAppRecipientPayload(BaseModel):
|
||||
TYPE: Literal[RecipientType.STANDALONE_WEB_APP] = RecipientType.STANDALONE_WEB_APP
|
||||
|
||||
|
||||
@final
|
||||
class ConsoleRecipientPayload(BaseModel):
|
||||
TYPE: Literal[RecipientType.CONSOLE] = RecipientType.CONSOLE
|
||||
account_id: str | None = None
|
||||
|
||||
|
||||
@final
|
||||
class ConsoleDeliveryPayload(BaseModel):
|
||||
type: Literal["console"] = "console"
|
||||
internal: bool = True
|
||||
|
||||
|
||||
RecipientPayload = Annotated[
|
||||
EmailMemberRecipientPayload | EmailExternalRecipientPayload | WebAppRecipientPayload,
|
||||
EmailMemberRecipientPayload
|
||||
| EmailExternalRecipientPayload
|
||||
| StandaloneWebAppRecipientPayload
|
||||
| ConsoleRecipientPayload,
|
||||
Field(discriminator="TYPE"),
|
||||
]
|
||||
|
||||
|
||||
@ -13,7 +13,6 @@ from core.workflow.nodes.human_input.entities import FormDefinition
|
||||
from core.workflow.nodes.human_input.enums import HumanInputFormStatus
|
||||
from libs.datetime_utils import naive_utc_now
|
||||
from libs.exception import BaseHTTPException
|
||||
from models.account import Account
|
||||
from models.human_input import RecipientType
|
||||
from models.model import App, AppMode
|
||||
from repositories.factory import DifyAPIRepositoryFactory
|
||||
@ -66,7 +65,8 @@ class FormSubmittedError(HumanInputError, BaseHTTPException):
|
||||
code = 412
|
||||
|
||||
def __init__(self, form_id: str):
|
||||
description = self.description.format(form_id=form_id)
|
||||
template = self.description or "This form has already been submitted by another user, form_id={form_id}"
|
||||
description = template.format(form_id=form_id)
|
||||
super().__init__(description=description)
|
||||
|
||||
|
||||
@ -115,22 +115,6 @@ class HumanInputService:
|
||||
return None
|
||||
return Form(record)
|
||||
|
||||
def get_form_by_id(self, form_id: str, recipient_type: RecipientType = RecipientType.WEBAPP) -> Form | None:
|
||||
record = self._form_repository.get_by_form_id_and_recipient_type(
|
||||
form_id=form_id,
|
||||
recipient_type=recipient_type,
|
||||
)
|
||||
if record is None:
|
||||
return None
|
||||
return Form(record)
|
||||
|
||||
def get_form_definition_by_id(self, form_id: str) -> Form | None:
|
||||
form = self.get_form_by_id(form_id, recipient_type=RecipientType.WEBAPP)
|
||||
if form is None:
|
||||
return None
|
||||
self._ensure_not_submitted(form)
|
||||
return form
|
||||
|
||||
def get_form_definition_by_token(self, recipient_type: RecipientType, form_token: str) -> Form | None:
|
||||
form = self.get_form_by_token(form_token)
|
||||
if form is None or form.recipient_type != recipient_type:
|
||||
@ -138,30 +122,12 @@ class HumanInputService:
|
||||
self._ensure_not_submitted(form)
|
||||
return form
|
||||
|
||||
def submit_form_by_id(
|
||||
self,
|
||||
form_id: str,
|
||||
selected_action_id: str,
|
||||
form_data: Mapping[str, Any],
|
||||
user: Account | None = None,
|
||||
):
|
||||
form = self.get_form_by_id(form_id, recipient_type=RecipientType.WEBAPP)
|
||||
if form is None:
|
||||
raise WebAppDeliveryNotEnabledError()
|
||||
|
||||
self._ensure_form_active(form)
|
||||
self._validate_submission(form=form, selected_action_id=selected_action_id, form_data=form_data)
|
||||
|
||||
result = self._form_repository.mark_submitted(
|
||||
form_id=form.id,
|
||||
recipient_id=form.recipient_id,
|
||||
selected_action_id=selected_action_id,
|
||||
form_data=form_data,
|
||||
submission_user_id=user.id if user else None,
|
||||
submission_end_user_id=None,
|
||||
)
|
||||
|
||||
self._enqueue_resume(result.workflow_run_id)
|
||||
def get_form_definition_by_token_for_console(self, form_token: str) -> Form | None:
|
||||
form = self.get_form_by_token(form_token)
|
||||
if form is None or form.recipient_type != RecipientType.CONSOLE:
|
||||
return None
|
||||
self._ensure_not_submitted(form)
|
||||
return form
|
||||
|
||||
def submit_form_by_token(
|
||||
self,
|
||||
@ -170,6 +136,7 @@ class HumanInputService:
|
||||
selected_action_id: str,
|
||||
form_data: Mapping[str, Any],
|
||||
submission_end_user_id: str | None = None,
|
||||
submission_user_id: str | None = None,
|
||||
):
|
||||
form = self.get_form_by_token(form_token)
|
||||
if form is None or form.recipient_type != recipient_type:
|
||||
@ -183,7 +150,7 @@ class HumanInputService:
|
||||
recipient_id=form.recipient_id,
|
||||
selected_action_id=selected_action_id,
|
||||
form_data=form_data,
|
||||
submission_user_id=None,
|
||||
submission_user_id=submission_user_id,
|
||||
submission_end_user_id=submission_end_user_id,
|
||||
)
|
||||
|
||||
|
||||
@ -789,7 +789,7 @@ class WorkflowService:
|
||||
node_id=node_id,
|
||||
node_title=node.title,
|
||||
resolved_placeholder_values=resolved_placeholder_values,
|
||||
web_app_form_token=None,
|
||||
form_token=None,
|
||||
)
|
||||
return human_input_required.model_dump(mode="json")
|
||||
|
||||
|
||||
@ -4,12 +4,12 @@ from datetime import timedelta
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
from core.repositories.sqlalchemy_workflow_execution_repository import SQLAlchemyWorkflowExecutionRepository
|
||||
from sqlalchemy import delete, select
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from core.app.app_config.entities import WorkflowUIBasedAppConfig
|
||||
from core.app.entities.app_invoke_entities import InvokeFrom, WorkflowAppGenerateEntity
|
||||
from core.repositories.sqlalchemy_workflow_execution_repository import SQLAlchemyWorkflowExecutionRepository
|
||||
from core.repositories.sqlalchemy_workflow_node_execution_repository import SQLAlchemyWorkflowNodeExecutionRepository
|
||||
from core.workflow.entities import GraphInitParams
|
||||
from core.workflow.enums import WorkflowType
|
||||
|
||||
@ -69,7 +69,7 @@ def test_graph_run_paused_event_emits_queue_pause_event():
|
||||
actions=[],
|
||||
node_id="node-human",
|
||||
node_title="Human Step",
|
||||
web_app_form_token="tok",
|
||||
form_token="tok",
|
||||
)
|
||||
event = GraphRunPausedEvent(reasons=[reason], outputs={"foo": "bar"})
|
||||
workflow_entry = SimpleNamespace(
|
||||
@ -128,7 +128,7 @@ def test_queue_workflow_paused_event_to_stream_responses():
|
||||
actions=[UserAction(id="approve", title="Approve")],
|
||||
node_id="node-id",
|
||||
node_title="Human Step",
|
||||
web_app_form_token="token",
|
||||
form_token="token",
|
||||
)
|
||||
queue_event = QueueWorkflowPausedEvent(
|
||||
reasons=[reason],
|
||||
|
||||
@ -290,7 +290,7 @@ class TestHumanInputFormRepositoryImplPublicMethods:
|
||||
recipient = _DummyRecipient(
|
||||
id="recipient-1",
|
||||
form_id=form.id,
|
||||
recipient_type=RecipientType.WEBAPP,
|
||||
recipient_type=RecipientType.STANDALONE_WEB_APP,
|
||||
access_token="token-123",
|
||||
)
|
||||
session = _FakeSession(scalars_results=[form, [recipient]])
|
||||
@ -368,7 +368,7 @@ class TestHumanInputFormSubmissionRepository:
|
||||
recipient = _DummyRecipient(
|
||||
id="recipient-1",
|
||||
form_id=form.id,
|
||||
recipient_type=RecipientType.WEBAPP,
|
||||
recipient_type=RecipientType.STANDALONE_WEB_APP,
|
||||
access_token="token-123",
|
||||
form=form,
|
||||
)
|
||||
@ -379,7 +379,7 @@ class TestHumanInputFormSubmissionRepository:
|
||||
|
||||
assert record is not None
|
||||
assert record.form_id == form.id
|
||||
assert record.recipient_type == RecipientType.WEBAPP
|
||||
assert record.recipient_type == RecipientType.STANDALONE_WEB_APP
|
||||
assert record.submitted is False
|
||||
|
||||
def test_get_by_form_id_and_recipient_type_uses_recipient(self):
|
||||
@ -395,14 +395,17 @@ class TestHumanInputFormSubmissionRepository:
|
||||
recipient = _DummyRecipient(
|
||||
id="recipient-1",
|
||||
form_id=form.id,
|
||||
recipient_type=RecipientType.WEBAPP,
|
||||
recipient_type=RecipientType.STANDALONE_WEB_APP,
|
||||
access_token="token-123",
|
||||
form=form,
|
||||
)
|
||||
session = _FakeSession(scalars_result=recipient)
|
||||
repo = HumanInputFormSubmissionRepository(_session_factory(session))
|
||||
|
||||
record = repo.get_by_form_id_and_recipient_type(form_id=form.id, recipient_type=RecipientType.WEBAPP)
|
||||
record = repo.get_by_form_id_and_recipient_type(
|
||||
form_id=form.id,
|
||||
recipient_type=RecipientType.STANDALONE_WEB_APP,
|
||||
)
|
||||
|
||||
assert record is not None
|
||||
assert record.recipient_id == recipient.id
|
||||
@ -424,7 +427,7 @@ class TestHumanInputFormSubmissionRepository:
|
||||
recipient = _DummyRecipient(
|
||||
id="recipient-1",
|
||||
form_id="form-1",
|
||||
recipient_type=RecipientType.WEBAPP,
|
||||
recipient_type=RecipientType.STANDALONE_WEB_APP,
|
||||
access_token="token-123",
|
||||
)
|
||||
session = _FakeSession(
|
||||
|
||||
@ -38,6 +38,7 @@ class _InMemoryFormEntity(HumanInputFormEntity):
|
||||
form_id: str
|
||||
rendered: str
|
||||
token: str | None = None
|
||||
console_token_value: str | None = None
|
||||
action_id: str | None = None
|
||||
data: Mapping[str, Any] | None = None
|
||||
is_submitted: bool = False
|
||||
@ -50,6 +51,8 @@ class _InMemoryFormEntity(HumanInputFormEntity):
|
||||
|
||||
@property
|
||||
def web_app_token(self) -> str | None:
|
||||
if self.console_token_value is not None:
|
||||
return self.console_token_value
|
||||
return self.token
|
||||
|
||||
@property
|
||||
@ -94,7 +97,13 @@ class InMemoryHumanInputFormRepository(HumanInputFormRepository):
|
||||
self.created_params.append(params)
|
||||
self._form_counter += 1
|
||||
form_id = f"form-{self._form_counter}"
|
||||
entity = _InMemoryFormEntity(form_id=form_id, rendered=params.rendered_content, token=f"token-{form_id}")
|
||||
console_token = f"console-{form_id}" if params.console_recipient_required else None
|
||||
entity = _InMemoryFormEntity(
|
||||
form_id=form_id,
|
||||
rendered=params.rendered_content,
|
||||
token=f"token-{form_id}",
|
||||
console_token_value=console_token,
|
||||
)
|
||||
self.created_forms.append(entity)
|
||||
self._forms_by_key[(params.workflow_execution_id, params.node_id)] = entity
|
||||
return entity
|
||||
|
||||
@ -350,6 +350,61 @@ class TestHumanInputNodeVariableResolution:
|
||||
params = mock_repo.create_form.call_args.args[0]
|
||||
assert params.resolved_placeholder_values == expected_values
|
||||
|
||||
def test_debugger_falls_back_to_recipient_token_when_webapp_disabled(self):
|
||||
variable_pool = VariablePool(
|
||||
system_variables=SystemVariable(
|
||||
user_id="user",
|
||||
app_id="app",
|
||||
workflow_id="workflow",
|
||||
workflow_execution_id="exec-2",
|
||||
),
|
||||
user_inputs={},
|
||||
conversation_variables=[],
|
||||
)
|
||||
runtime_state = GraphRuntimeState(variable_pool=variable_pool, start_at=0.0)
|
||||
graph_init_params = GraphInitParams(
|
||||
tenant_id="tenant",
|
||||
app_id="app",
|
||||
workflow_id="workflow",
|
||||
graph_config={"nodes": [], "edges": []},
|
||||
user_id="user",
|
||||
user_from="account",
|
||||
invoke_from="debugger",
|
||||
call_depth=0,
|
||||
)
|
||||
|
||||
node_data = HumanInputNodeData(
|
||||
title="Human Input",
|
||||
form_content="Provide your name",
|
||||
inputs=[],
|
||||
user_actions=[UserAction(id="submit", title="Submit")],
|
||||
)
|
||||
config = {"id": "human", "data": node_data.model_dump()}
|
||||
|
||||
mock_repo = MagicMock(spec=HumanInputFormRepository)
|
||||
mock_repo.get_form.return_value = None
|
||||
mock_repo.create_form.return_value = SimpleNamespace(
|
||||
id="form-2",
|
||||
rendered_content="Provide your name",
|
||||
web_app_token="console-token",
|
||||
recipients=[SimpleNamespace(token="recipient-token")],
|
||||
submitted=False,
|
||||
)
|
||||
|
||||
node = HumanInputNode(
|
||||
id=config["id"],
|
||||
config=config,
|
||||
graph_init_params=graph_init_params,
|
||||
graph_runtime_state=runtime_state,
|
||||
form_repository=mock_repo,
|
||||
)
|
||||
|
||||
run_result = node._run()
|
||||
pause_event = next(run_result)
|
||||
|
||||
assert isinstance(pause_event, PauseRequestedEvent)
|
||||
assert pause_event.reason.form_token == "console-token"
|
||||
|
||||
|
||||
class TestValidation:
|
||||
"""Test validation scenarios."""
|
||||
|
||||
@ -48,7 +48,7 @@ class HumanInputForm:
|
||||
user_actions: list[dict[str, Any]]
|
||||
timeout: int
|
||||
timeout_unit: TimeoutUnit
|
||||
web_app_form_token: str | None = None
|
||||
form_token: str | None = None
|
||||
created_at: datetime = field(default_factory=datetime.utcnow)
|
||||
expires_at: datetime | None = None
|
||||
submitted_at: datetime | None = None
|
||||
@ -141,7 +141,7 @@ class InMemoryFormRepository:
|
||||
|
||||
def get_by_token(self, token: str) -> Optional[HumanInputForm]:
|
||||
for form in self._forms.values():
|
||||
if form.web_app_form_token == token:
|
||||
if form.form_token == token:
|
||||
return form
|
||||
return None
|
||||
|
||||
@ -169,7 +169,7 @@ class FormService:
|
||||
user_actions,
|
||||
timeout: int,
|
||||
timeout_unit: TimeoutUnit,
|
||||
web_app_form_token: str | None = None,
|
||||
form_token: str | None = None,
|
||||
) -> HumanInputForm:
|
||||
form = HumanInputForm(
|
||||
form_id=form_id,
|
||||
@ -182,7 +182,7 @@ class FormService:
|
||||
user_actions=[{"id": action.id, "title": action.title} for action in user_actions],
|
||||
timeout=timeout,
|
||||
timeout_unit=timeout_unit,
|
||||
web_app_form_token=web_app_form_token,
|
||||
form_token=form_token,
|
||||
)
|
||||
form.calculate_expiration()
|
||||
self.repository.save(form)
|
||||
|
||||
@ -14,6 +14,7 @@ from core.workflow.nodes.human_input.enums import (
|
||||
FormInputType,
|
||||
TimeoutUnit,
|
||||
)
|
||||
from libs.datetime_utils import naive_utc_now
|
||||
|
||||
from .support import (
|
||||
FormAlreadySubmittedError,
|
||||
@ -53,7 +54,7 @@ class TestFormService:
|
||||
"user_actions": [UserAction(id="submit", title="Submit")],
|
||||
"timeout": 1,
|
||||
"timeout_unit": TimeoutUnit.HOUR,
|
||||
"web_app_form_token": "token-xyz",
|
||||
"form_token": "token-xyz",
|
||||
}
|
||||
|
||||
def test_create_form(self, form_service, sample_form_data):
|
||||
@ -65,7 +66,7 @@ class TestFormService:
|
||||
assert form.node_id == "node-789"
|
||||
assert form.tenant_id == "tenant-abc"
|
||||
assert form.app_id == "app-def"
|
||||
assert form.web_app_form_token == "token-xyz"
|
||||
assert form.form_token == "token-xyz"
|
||||
assert form.timeout == 1
|
||||
assert form.timeout_unit == TimeoutUnit.HOUR
|
||||
assert form.expires_at is not None
|
||||
@ -99,7 +100,7 @@ class TestFormService:
|
||||
retrieved_form = form_service.get_form_by_token("token-xyz")
|
||||
|
||||
assert retrieved_form.form_id == created_form.form_id
|
||||
assert retrieved_form.web_app_form_token == "token-xyz"
|
||||
assert retrieved_form.form_token == "token-xyz"
|
||||
|
||||
def test_get_form_by_token_not_found(self, form_service):
|
||||
"""Test getting non-existent form by token."""
|
||||
@ -261,13 +262,13 @@ class TestFormService:
|
||||
for i in range(3):
|
||||
data = sample_form_data.copy()
|
||||
data["form_id"] = f"form-{i}"
|
||||
data["web_app_form_token"] = f"token-{i}"
|
||||
data["form_token"] = f"token-{i}"
|
||||
form_service.create_form(**data)
|
||||
|
||||
# Manually expire some forms
|
||||
for i in range(2): # Expire first 2 forms
|
||||
form = form_service.get_form_by_id(f"form-{i}")
|
||||
form.expires_at = datetime.utcnow() - timedelta(hours=1)
|
||||
form.expires_at = naive_utc_now() - timedelta(hours=1)
|
||||
form_service.repository.save(form)
|
||||
|
||||
# Clean up expired forms
|
||||
|
||||
@ -35,7 +35,7 @@ class TestHumanInputForm:
|
||||
"user_actions": [UserAction(id="submit", title="Submit")],
|
||||
"timeout": 2,
|
||||
"timeout_unit": TimeoutUnit.HOUR,
|
||||
"web_app_form_token": "token-xyz",
|
||||
"form_token": "token-xyz",
|
||||
}
|
||||
|
||||
def test_form_creation(self, sample_form_data):
|
||||
@ -47,7 +47,7 @@ class TestHumanInputForm:
|
||||
assert form.node_id == "node-789"
|
||||
assert form.tenant_id == "tenant-abc"
|
||||
assert form.app_id == "app-def"
|
||||
assert form.web_app_form_token == "token-xyz"
|
||||
assert form.form_token == "token-xyz"
|
||||
assert form.timeout == 2
|
||||
assert form.timeout_unit == TimeoutUnit.HOUR
|
||||
assert form.created_at is not None
|
||||
@ -148,11 +148,11 @@ class TestHumanInputForm:
|
||||
|
||||
def test_form_without_web_app_token(self, sample_form_data):
|
||||
"""Test form creation without web app token."""
|
||||
sample_form_data["web_app_form_token"] = None
|
||||
sample_form_data["form_token"] = None
|
||||
|
||||
form = HumanInputForm(**sample_form_data)
|
||||
|
||||
assert form.web_app_form_token is None
|
||||
assert form.form_token is None
|
||||
assert form.form_id == "form-123" # Other fields should still work
|
||||
|
||||
def test_form_with_explicit_timestamps(self):
|
||||
|
||||
@ -18,9 +18,8 @@ from core.workflow.nodes.human_input.enums import (
|
||||
HumanInputFormStatus,
|
||||
TimeoutUnit,
|
||||
)
|
||||
from models.account import Account
|
||||
from models.human_input import RecipientType
|
||||
from services.human_input_service import FormSubmittedError, HumanInputService, InvalidFormDataError
|
||||
from services.human_input_service import HumanInputService, InvalidFormDataError
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@ -60,7 +59,7 @@ def sample_form_record():
|
||||
submission_end_user_id=None,
|
||||
completed_by_recipient_id=None,
|
||||
recipient_id="recipient-id",
|
||||
recipient_type=RecipientType.WEBAPP,
|
||||
recipient_type=RecipientType.STANDALONE_WEB_APP,
|
||||
access_token="token",
|
||||
)
|
||||
|
||||
@ -146,32 +145,18 @@ def test_enqueue_resume_skips_unsupported_app_mode(mocker, mock_session_factory)
|
||||
resume_task.apply_async.assert_not_called()
|
||||
|
||||
|
||||
def test_get_form_definition_by_id_uses_repository(sample_form_record, mock_session_factory):
|
||||
def test_get_form_definition_by_token_for_console_uses_repository(sample_form_record, mock_session_factory):
|
||||
session_factory, _ = mock_session_factory
|
||||
repo = MagicMock(spec=HumanInputFormSubmissionRepository)
|
||||
repo.get_by_form_id_and_recipient_type.return_value = sample_form_record
|
||||
console_record = dataclasses.replace(sample_form_record, recipient_type=RecipientType.CONSOLE)
|
||||
repo.get_by_token.return_value = console_record
|
||||
|
||||
service = HumanInputService(session_factory, form_repository=repo)
|
||||
form = service.get_form_definition_by_id("form-id")
|
||||
form = service.get_form_definition_by_token_for_console("token")
|
||||
|
||||
repo.get_by_form_id_and_recipient_type.assert_called_once_with(
|
||||
form_id="form-id",
|
||||
recipient_type=RecipientType.WEBAPP,
|
||||
)
|
||||
repo.get_by_token.assert_called_once_with("token")
|
||||
assert form is not None
|
||||
assert form.get_definition() == sample_form_record.definition
|
||||
|
||||
|
||||
def test_get_form_definition_by_id_raises_on_submitted(sample_form_record, mock_session_factory):
|
||||
session_factory, _ = mock_session_factory
|
||||
submitted_record = dataclasses.replace(sample_form_record, submitted_at=datetime(2024, 1, 1))
|
||||
repo = MagicMock(spec=HumanInputFormSubmissionRepository)
|
||||
repo.get_by_form_id_and_recipient_type.return_value = submitted_record
|
||||
|
||||
service = HumanInputService(session_factory, form_repository=repo)
|
||||
|
||||
with pytest.raises(FormSubmittedError):
|
||||
service.get_form_definition_by_id("form-id")
|
||||
assert form.get_definition() == console_record.definition
|
||||
|
||||
|
||||
def test_submit_form_by_token_calls_repository_and_enqueue(sample_form_record, mock_session_factory, mocker):
|
||||
@ -183,7 +168,7 @@ def test_submit_form_by_token_calls_repository_and_enqueue(sample_form_record, m
|
||||
enqueue_spy = mocker.patch.object(service, "_enqueue_resume")
|
||||
|
||||
service.submit_form_by_token(
|
||||
recipient_type=RecipientType.WEBAPP,
|
||||
recipient_type=RecipientType.STANDALONE_WEB_APP,
|
||||
form_token="token",
|
||||
selected_action_id="submit",
|
||||
form_data={"field": "value"},
|
||||
@ -201,26 +186,25 @@ def test_submit_form_by_token_calls_repository_and_enqueue(sample_form_record, m
|
||||
enqueue_spy.assert_called_once_with(sample_form_record.workflow_run_id)
|
||||
|
||||
|
||||
def test_submit_form_by_id_passes_account(sample_form_record, mock_session_factory, mocker):
|
||||
def test_submit_form_by_token_passes_submission_user_id(sample_form_record, mock_session_factory, mocker):
|
||||
session_factory, _ = mock_session_factory
|
||||
repo = MagicMock(spec=HumanInputFormSubmissionRepository)
|
||||
repo.get_by_form_id_and_recipient_type.return_value = sample_form_record
|
||||
repo.get_by_token.return_value = sample_form_record
|
||||
repo.mark_submitted.return_value = sample_form_record
|
||||
service = HumanInputService(session_factory, form_repository=repo)
|
||||
enqueue_spy = mocker.patch.object(service, "_enqueue_resume")
|
||||
account = MagicMock(spec=Account)
|
||||
account.id = "account-id"
|
||||
|
||||
service.submit_form_by_id(
|
||||
form_id="form-id",
|
||||
service.submit_form_by_token(
|
||||
recipient_type=RecipientType.STANDALONE_WEB_APP,
|
||||
form_token="token",
|
||||
selected_action_id="submit",
|
||||
form_data={"x": 1},
|
||||
user=account,
|
||||
form_data={"field": "value"},
|
||||
submission_user_id="account-id",
|
||||
)
|
||||
|
||||
repo.get_by_form_id_and_recipient_type.assert_called_once()
|
||||
repo.mark_submitted.assert_called_once()
|
||||
assert repo.mark_submitted.call_args.kwargs["submission_user_id"] == "account-id"
|
||||
call_kwargs = repo.mark_submitted.call_args.kwargs
|
||||
assert call_kwargs["submission_user_id"] == "account-id"
|
||||
assert call_kwargs["submission_end_user_id"] is None
|
||||
enqueue_spy.assert_called_once_with(sample_form_record.workflow_run_id)
|
||||
|
||||
|
||||
@ -232,7 +216,7 @@ def test_submit_form_by_token_invalid_action(sample_form_record, mock_session_fa
|
||||
|
||||
with pytest.raises(InvalidFormDataError) as exc_info:
|
||||
service.submit_form_by_token(
|
||||
recipient_type=RecipientType.WEBAPP,
|
||||
recipient_type=RecipientType.STANDALONE_WEB_APP,
|
||||
form_token="token",
|
||||
selected_action_id="invalid",
|
||||
form_data={},
|
||||
@ -260,7 +244,7 @@ def test_submit_form_by_token_missing_inputs(sample_form_record, mock_session_fa
|
||||
|
||||
with pytest.raises(InvalidFormDataError) as exc_info:
|
||||
service.submit_form_by_token(
|
||||
recipient_type=RecipientType.WEBAPP,
|
||||
recipient_type=RecipientType.STANDALONE_WEB_APP,
|
||||
form_token="token",
|
||||
selected_action_id="submit",
|
||||
form_data={},
|
||||
|
||||
Reference in New Issue
Block a user