From 2eb37caf2ec2b8d0ca92e3d091397cbb2d137aa5 Mon Sep 17 00:00:00 2001 From: chariri Date: Mon, 18 May 2026 16:31:37 +0900 Subject: [PATCH] refactor(api): migrate console.app.workflow to BaseModel (#36216) Co-authored-by: WH-2099 Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> --- api/AGENTS.md | 2 +- api/controllers/API_SCHEMA_GUIDE.md | 34 ++- api/controllers/console/app/workflow.py | 284 ++++++++++++++---- .../rag_pipeline/rag_pipeline_workflow.py | 58 ++-- api/fields/online_user_fields.py | 16 - api/openapi/markdown/console-swagger.md | 161 ++++++---- .../test_rag_pipeline_workflow.py | 109 ++++++- .../controllers/console/app/test_workflow.py | 273 +++++++++++++++-- .../test_rag_pipeline_workflow.py | 150 +++++++++ .../generated/api/console/apps/types.gen.ts | 130 ++++---- .../generated/api/console/apps/zod.gen.ts | 139 ++++++--- .../generated/api/console/rag/types.gen.ts | 111 ++++++- .../generated/api/console/rag/zod.gen.ts | 95 +++++- .../__tests__/use-pipeline-config.spec.ts | 4 +- .../rag-pipeline/hooks/use-pipeline-config.ts | 4 +- .../workflow-app/hooks/use-workflow-init.ts | 2 +- web/service/use-pipeline.ts | 4 +- web/service/use-workflow.ts | 4 +- web/service/workflow.ts | 2 +- 19 files changed, 1241 insertions(+), 341 deletions(-) delete mode 100644 api/fields/online_user_fields.py create mode 100644 api/tests/unit_tests/controllers/console/datasets/rag_pipeline/test_rag_pipeline_workflow.py diff --git a/api/AGENTS.md b/api/AGENTS.md index eb4404509d..7cd60b0281 100644 --- a/api/AGENTS.md +++ b/api/AGENTS.md @@ -195,7 +195,7 @@ Before opening a PR / submitting: - Document non-obvious behaviour with concise docstrings and comments. - For Flask-RESTX controller request, query, and response schemas, follow `controllers/API_SCHEMA_GUIDE.md`. In short: use Pydantic models, document GET query params with `query_params_from_model(...)`, register response - DTOs with `register_response_schema_models(...)`, serialize with `ResponseModel.model_validate(...).model_dump(...)`, + DTOs with `register_response_schema_models(...)`, serialize response DTOs with `dump_response(...)`, and avoid adding new legacy `ns.model(...)`, `@marshal_with(...)`, or GET `@ns.expect(...)` patterns. ### Miscellaneous diff --git a/api/controllers/API_SCHEMA_GUIDE.md b/api/controllers/API_SCHEMA_GUIDE.md index 5b1b055b09..6cfbab4b1c 100644 --- a/api/controllers/API_SCHEMA_GUIDE.md +++ b/api/controllers/API_SCHEMA_GUIDE.md @@ -34,6 +34,7 @@ from controllers.common.schema import ( register_response_schema_models, register_schema_models, ) +from libs.helper import dump_response ``` Register request payload and query models with `register_schema_models(...)`: @@ -82,7 +83,7 @@ register_schema_models(console_ns, DraftWorkflowNodeRunPayload) def post(self, app_model: App, node_id: str): payload = DraftWorkflowNodeRunPayload.model_validate(console_ns.payload or {}) result = service.run(..., inputs=payload.inputs, query=payload.query) - return WorkflowRunNodeExecutionResponse.model_validate(result, from_attributes=True).model_dump(mode="json") + return dump_response(WorkflowRunNodeExecutionResponse, result) ``` ## Query Parameters @@ -105,7 +106,7 @@ class WorkflowRunListQuery(BaseModel): def get(self, app_model: App): query = WorkflowRunListQuery.model_validate(request.args.to_dict(flat=True)) result = service.list(..., limit=query.limit, last_id=query.last_id) - return WorkflowRunPaginationResponse.model_validate(result, from_attributes=True).model_dump(mode="json") + return dump_response(WorkflowRunPaginationResponse, result) ``` Do not do this for GET query parameters: @@ -145,10 +146,25 @@ def post(...): Serialize explicitly: ```python -return WorkflowRunNodeExecutionResponse.model_validate( - workflow_node_execution, - from_attributes=True, -).model_dump(mode="json") +return dump_response(WorkflowRunNodeExecutionResponse, workflow_node_execution) +``` + +`dump_response(...)` is the preferred response serialization helper for a single Pydantic response DTO. It validates +with `from_attributes=True` and returns `model_dump(mode="json")`, so SQLAlchemy models, plain objects, dictionaries, +Pydantic aliases, computed fields, and `datetime` values are serialized consistently. + +For wrapper responses, pass a dictionary with the public wrapper fields: + +```python +return dump_response( + WorkflowRunPaginationResponse, + { + "data": workflow_runs, + "page": page, + "limit": limit, + "has_more": has_more, + }, +) ``` If the service can return `None`, translate that into the expected HTTP error before validation: @@ -158,9 +174,12 @@ workflow_run = service.get_workflow_run(...) if workflow_run is None: raise NotFound("Workflow run not found") -return WorkflowRunDetailResponse.model_validate(workflow_run, from_attributes=True).model_dump(mode="json") +return dump_response(WorkflowRunDetailResponse, workflow_run) ``` +Use manual `model_validate(...).model_dump(...)` only when the endpoint needs behavior that `dump_response(...)` does +not provide, such as returning a non-dict payload, intentionally excluding fields, or composing a `(body, status)` tuple. + ## Legacy Flask-RESTX Patterns Avoid adding these patterns to new or migrated endpoints: @@ -190,4 +209,3 @@ Inspect affected endpoints with `jq`. Check that: - Request bodies appear only where the endpoint has a body. - Responses reference the expected `*Response` schema. - Response schemas use public serialized names, not internal validation aliases like `inputs_dict`. - diff --git a/api/controllers/console/app/workflow.py b/api/controllers/console/app/workflow.py index 4f532b437c..8d065ece67 100644 --- a/api/controllers/console/app/workflow.py +++ b/api/controllers/console/app/workflow.py @@ -1,17 +1,22 @@ import json import logging from collections.abc import Sequence -from typing import Any +from datetime import datetime +from typing import Any, NotRequired, TypedDict from flask import abort, request -from flask_restx import Resource, fields, marshal, marshal_with -from pydantic import BaseModel, Field, ValidationError, field_validator +from flask_restx import Resource, fields +from pydantic import AliasChoices, BaseModel, Field, ValidationError, field_validator from sqlalchemy.orm import sessionmaker from werkzeug.exceptions import BadRequest, Forbidden, InternalServerError, NotFound import services from controllers.common.controller_schemas import DefaultBlockConfigQuery, WorkflowListQuery, WorkflowUpdatePayload -from controllers.common.schema import register_response_schema_model, register_schema_models +from controllers.common.schema import ( + register_response_schema_model, + register_response_schema_models, + register_schema_models, +) from controllers.console import console_ns from controllers.console.app.error import ConversationCompletedError, DraftWorkflowNotExist, DraftWorkflowNotSync from controllers.console.app.wraps import get_app_model @@ -22,6 +27,7 @@ from core.app.apps.base_app_queue_manager import AppQueueManager from core.app.apps.workflow.app_generator import SKIP_PREPARE_USER_INPUTS_KEY from core.app.entities.app_invoke_entities import InvokeFrom from core.app.file_access import DatabaseFileAccessController +from core.helper import encrypter from core.helper.trace_id_helper import get_external_trace_id from core.plugin.impl.exc import PluginInvokeError from core.trigger.constants import TRIGGER_SCHEDULE_NODE_TYPE @@ -34,18 +40,18 @@ from core.trigger.debug.event_selectors import ( from extensions.ext_database import db from extensions.ext_redis import redis_client from factories import file_factory, variable_factory -from fields.member_fields import simple_account_fields -from fields.online_user_fields import online_user_list_fields -from fields.workflow_fields import workflow_fields, workflow_pagination_fields +from fields.base import ResponseModel +from fields.member_fields import SimpleAccount from fields.workflow_run_fields import WorkflowRunNodeExecutionResponse from graphon.enums import NodeType from graphon.file import File from graphon.file import helpers as file_helpers from graphon.graph_engine.manager import GraphEngineManager from graphon.model_runtime.utils.encoders import jsonable_encoder +from graphon.variables import SecretVariable, SegmentType, VariableBase from libs import helper from libs.datetime_utils import naive_utc_now -from libs.helper import TimestampField, uuid_value +from libs.helper import TimestampField, dump_response, to_timestamp, uuid_value from libs.login import current_account_with_tenant, login_required from models import App from models.model import AppMode @@ -64,42 +70,15 @@ LISTENING_RETRY_IN = 2000 RESTORE_SOURCE_WORKFLOW_MUST_BE_PUBLISHED_MESSAGE = "source workflow must be published" MAX_WORKFLOW_ONLINE_USERS_REQUEST_IDS = 1000 WORKFLOW_ONLINE_USERS_REDIS_BATCH_SIZE = 50 +ENVIRONMENT_VARIABLE_SUPPORTED_TYPES = (SegmentType.STRING, SegmentType.NUMBER, SegmentType.SECRET) -# Register models for flask_restx to avoid dict type issues in Swagger -# Register in dependency order: base models first, then dependent models -# Base models -simple_account_model = console_ns.model("SimpleAccount", simple_account_fields) - -from fields.workflow_fields import pipeline_variable_fields, serialize_value_type - -conversation_variable_model = console_ns.model( - "ConversationVariable", - { - "id": fields.String, - "name": fields.String, - "value_type": fields.String(attribute=serialize_value_type), - "value": fields.Raw, - "description": fields.String, - }, -) - -pipeline_variable_model = console_ns.model("PipelineVariable", pipeline_variable_fields) - -# Workflow model with nested dependencies -workflow_fields_copy = workflow_fields.copy() -workflow_fields_copy["created_by"] = fields.Nested(simple_account_model, attribute="created_by_account") -workflow_fields_copy["updated_by"] = fields.Nested( - simple_account_model, attribute="updated_by_account", allow_null=True -) -workflow_fields_copy["conversation_variables"] = fields.List(fields.Nested(conversation_variable_model)) -workflow_fields_copy["rag_pipeline_variables"] = fields.List(fields.Nested(pipeline_variable_model)) -workflow_model = console_ns.model("Workflow", workflow_fields_copy) - -# Workflow pagination model -workflow_pagination_fields_copy = workflow_pagination_fields.copy() -workflow_pagination_fields_copy["items"] = fields.List(fields.Nested(workflow_model), attribute="items") -workflow_pagination_model = console_ns.model("WorkflowPagination", workflow_pagination_fields_copy) +class EnvironmentVariableResponseDict(TypedDict): + value_type: str + id: NotRequired[str] + name: NotRequired[str] + value: NotRequired[Any] + description: NotRequired[str | None] class SyncDraftWorkflowPayload(BaseModel): @@ -170,6 +149,110 @@ class WorkflowOnlineUsersPayload(BaseModel): return list(dict.fromkeys(app_id.strip() for app_id in app_ids if app_id.strip())) +class WorkflowConversationVariableResponse(ResponseModel): + id: str + name: str + value_type: str + value: Any = Field(json_schema_extra={"type": "object"}) + description: str + + @field_validator("value_type", mode="before") + @classmethod + def _serialize_value_type(cls, value: Any) -> str: + if hasattr(value, "exposed_type"): + return str(value.exposed_type()) + return str(value) + + +class PipelineVariableResponse(ResponseModel): + label: str + variable: str + type: str + belong_to_node_id: str + max_length: int | None = None + required: bool + unit: str | None = None + default_value: Any = Field(default=None, json_schema_extra={"type": "object"}) + options: list[str] | None = None + placeholder: str | None = None + tooltips: str | None = None + allowed_file_types: list[str] | None = None + allowed_file_extensions: list[str] | None = Field( + default=None, validation_alias=AliasChoices("allowed_file_extensions", "allow_file_extension") + ) + allowed_file_upload_methods: list[str] | None = Field( + default=None, validation_alias=AliasChoices("allowed_file_upload_methods", "allow_file_upload_methods") + ) + + +class WorkflowEnvironmentVariableResponse(ResponseModel): + value_type: str + id: str + name: str + value: Any = Field(json_schema_extra={"type": "object"}) + description: str + + +class WorkflowResponse(ResponseModel): + id: str + graph: dict[str, Any] = Field(validation_alias=AliasChoices("graph_dict", "graph")) + features: dict[str, Any] = Field(validation_alias=AliasChoices("features_dict", "features")) + hash: str = Field(validation_alias=AliasChoices("unique_hash", "hash")) + version: str + marked_name: str + marked_comment: str + created_by: SimpleAccount | None = Field( + default=None, validation_alias=AliasChoices("created_by_account", "created_by") + ) + created_at: int + updated_by: SimpleAccount | None = Field( + default=None, validation_alias=AliasChoices("updated_by_account", "updated_by") + ) + updated_at: int + tool_published: bool + environment_variables: list[WorkflowEnvironmentVariableResponse] + conversation_variables: list[WorkflowConversationVariableResponse] + rag_pipeline_variables: list[PipelineVariableResponse] + + @field_validator("created_at", "updated_at", mode="before") + @classmethod + def _normalize_timestamp(cls, value: datetime | int | None) -> int: + timestamp = to_timestamp(value) + if timestamp is None: + raise ValueError("timestamp is required") + return timestamp + + @field_validator("environment_variables", mode="before") + @classmethod + def _serialize_environment_variables(cls, value: Any) -> list[Any]: + if value is None: + return [] + + return [_serialize_environment_variable(item) for item in value] + + +class WorkflowPaginationResponse(ResponseModel): + items: list[WorkflowResponse] + page: int + limit: int + has_more: bool + + +class WorkflowOnlineUser(ResponseModel): + user_id: str + username: str + avatar: str | None = None + + +class WorkflowOnlineUsersByApp(ResponseModel): + app_id: str + users: list[WorkflowOnlineUser] + + +class WorkflowOnlineUsersResponse(ResponseModel): + data: list[WorkflowOnlineUsersByApp] + + class DraftWorkflowTriggerRunPayload(BaseModel): node_id: str @@ -197,6 +280,17 @@ register_schema_models( DraftWorkflowTriggerRunAllPayload, ) register_response_schema_model(console_ns, WorkflowRunNodeExecutionResponse) +register_response_schema_models( + console_ns, + WorkflowConversationVariableResponse, + PipelineVariableResponse, + WorkflowEnvironmentVariableResponse, + WorkflowResponse, + WorkflowPaginationResponse, + WorkflowOnlineUser, + WorkflowOnlineUsersByApp, + WorkflowOnlineUsersResponse, +) # TODO(QuantumGhost): Refactor existing node run API to handle file parameter parsing @@ -218,18 +312,56 @@ def _parse_file(workflow: Workflow, files: list[dict] | None = None) -> Sequence return file_objs +def _serialize_environment_variable(value: Any) -> EnvironmentVariableResponseDict | Any: + match value: + case SecretVariable(): + return { + "id": value.id, + "name": value.name, + "value": encrypter.full_mask_token(), + "value_type": value.value_type.value, + "description": value.description, + } + + case VariableBase(): + return { + "id": value.id, + "name": value.name, + "value": value.value, + "value_type": str(value.value_type.exposed_type()), + "description": value.description, + } + + case dict(): + value_type_str = value.get("value_type") + if not isinstance(value_type_str, str): + raise TypeError( + f"unexpected type for value_type field, value={value_type_str}, type={type(value_type_str)}" + ) + value_type = SegmentType(value_type_str).exposed_type() + if value_type not in ENVIRONMENT_VARIABLE_SUPPORTED_TYPES: + raise ValueError(f"Unsupported environment variable value type: {value_type}") + return value + + case _: + return value + + @console_ns.route("/apps//workflows/draft") class DraftWorkflowApi(Resource): @console_ns.doc("get_draft_workflow") @console_ns.doc(description="Get draft workflow for an application") @console_ns.doc(params={"app_id": "Application ID"}) - @console_ns.response(200, "Draft workflow retrieved successfully", workflow_model) + @console_ns.response( + 200, + "Draft workflow retrieved successfully", + console_ns.models[WorkflowResponse.__name__], + ) @console_ns.response(404, "Draft workflow not found") @setup_required @login_required @account_initialization_required @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW]) - @marshal_with(workflow_model) @edit_permission_required def get(self, app_model: App): """ @@ -242,8 +374,8 @@ class DraftWorkflowApi(Resource): if not workflow: raise DraftWorkflowNotExist() - # return workflow, if not found, return None (initiate graph by frontend) - return workflow + # return workflow, if not found, return 404 + return dump_response(WorkflowResponse, workflow) @setup_required @login_required @@ -817,13 +949,15 @@ class PublishedWorkflowApi(Resource): @console_ns.doc("get_published_workflow") @console_ns.doc(description="Get published workflow for an application") @console_ns.doc(params={"app_id": "Application ID"}) - @console_ns.response(200, "Published workflow retrieved successfully", workflow_model) - @console_ns.response(404, "Published workflow not found") + @console_ns.response( + 200, + "Published workflow retrieved successfully, or null if not found", + console_ns.models[WorkflowResponse.__name__], + ) @setup_required @login_required @account_initialization_required @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW]) - @marshal_with(workflow_model) @edit_permission_required def get(self, app_model: App): """ @@ -834,7 +968,10 @@ class PublishedWorkflowApi(Resource): workflow = workflow_service.get_published_workflow(app_model=app_model) # return workflow, if not found, return None - return workflow + if workflow is None: + return None + + return dump_response(WorkflowResponse, workflow) @console_ns.expect(console_ns.models[PublishWorkflowPayload.__name__]) @setup_required @@ -993,7 +1130,11 @@ class PublishedAllWorkflowApi(Resource): @console_ns.doc("get_all_published_workflows") @console_ns.doc(description="Get all published workflows for an application") @console_ns.doc(params={"app_id": "Application ID"}) - @console_ns.response(200, "Published workflows retrieved successfully", workflow_pagination_model) + @console_ns.response( + 200, + "Published workflows retrieved successfully", + console_ns.models[WorkflowPaginationResponse.__name__], + ) @setup_required @login_required @account_initialization_required @@ -1025,14 +1166,14 @@ class PublishedAllWorkflowApi(Resource): user_id=user_id, named_only=named_only, ) - serialized_workflows = marshal(workflows, workflow_fields_copy) - - return { - "items": serialized_workflows, - "page": page, - "limit": limit, - "has_more": has_more, - } + return WorkflowPaginationResponse.model_validate( + { + "items": workflows, + "page": page, + "limit": limit, + "has_more": has_more, + } + ).model_dump(mode="json") @console_ns.route("/apps//workflows//restore") @@ -1078,14 +1219,13 @@ class WorkflowByIdApi(Resource): @console_ns.doc(description="Update workflow by ID") @console_ns.doc(params={"app_id": "Application ID", "workflow_id": "Workflow ID"}) @console_ns.expect(console_ns.models[WorkflowUpdatePayload.__name__]) - @console_ns.response(200, "Workflow updated successfully", workflow_model) + @console_ns.response(200, "Workflow updated successfully", console_ns.models[WorkflowResponse.__name__]) @console_ns.response(404, "Workflow not found") @console_ns.response(403, "Permission denied") @setup_required @login_required @account_initialization_required @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW]) - @marshal_with(workflow_model) @edit_permission_required def patch(self, app_model: App, workflow_id: str): """ @@ -1119,7 +1259,7 @@ class WorkflowByIdApi(Resource): if not workflow: raise NotFound("Workflow not found") - return workflow + return dump_response(WorkflowResponse, workflow) @setup_required @login_required @@ -1404,12 +1544,16 @@ class DraftWorkflowTriggerRunAllApi(Resource): @console_ns.route("/apps/workflows/online-users") class WorkflowOnlineUsersApi(Resource): @console_ns.expect(console_ns.models[WorkflowOnlineUsersPayload.__name__]) + @console_ns.response( + 200, + "Workflow online users retrieved successfully", + console_ns.models[WorkflowOnlineUsersResponse.__name__], + ) @console_ns.doc("get_workflow_online_users") @console_ns.doc(description="Get workflow online users") @setup_required @login_required @account_initialization_required - @marshal_with(online_user_list_fields) def post(self): args = WorkflowOnlineUsersPayload.model_validate(console_ns.payload or {}) @@ -1452,10 +1596,18 @@ class WorkflowOnlineUsersApi(Resource): if not isinstance(user_info, dict): continue + user_id = user_info.get("user_id") + username = user_info.get("username") + if not isinstance(user_id, str) or not isinstance(username, str): + continue + avatar = user_info.get("avatar") + if avatar is not None and not isinstance(avatar, str): + avatar = None + if isinstance(avatar, str) and avatar and not avatar.startswith(("http://", "https://")): try: - user_info["avatar"] = file_helpers.get_signed_file_url(avatar) + avatar = file_helpers.get_signed_file_url(avatar) except Exception as exc: logger.warning( "Failed to sign workflow online user avatar; using original value. " @@ -1465,7 +1617,7 @@ class WorkflowOnlineUsersApi(Resource): exc, ) - users.append(user_info) + users.append({"user_id": user_id, "username": username, "avatar": avatar}) results.append({"app_id": app_id, "users": users}) - return {"data": results} + return WorkflowOnlineUsersResponse.model_validate({"data": results}).model_dump(mode="json") diff --git a/api/controllers/console/datasets/rag_pipeline/rag_pipeline_workflow.py b/api/controllers/console/datasets/rag_pipeline/rag_pipeline_workflow.py index 8eff32c555..77dbf0be3f 100644 --- a/api/controllers/console/datasets/rag_pipeline/rag_pipeline_workflow.py +++ b/api/controllers/console/datasets/rag_pipeline/rag_pipeline_workflow.py @@ -3,7 +3,7 @@ import logging from typing import Any, Literal, cast from flask import abort, request -from flask_restx import Resource, marshal_with # type: ignore +from flask_restx import Resource from pydantic import BaseModel, Field, ValidationError from sqlalchemy.orm import sessionmaker from werkzeug.exceptions import BadRequest, Forbidden, InternalServerError, NotFound @@ -19,8 +19,8 @@ from controllers.console.app.error import ( ) from controllers.console.app.workflow import ( RESTORE_SOURCE_WORKFLOW_MUST_BE_PUBLISHED_MESSAGE, - workflow_model, - workflow_pagination_model, + WorkflowPaginationResponse, + WorkflowResponse, ) from controllers.console.datasets.wraps import get_rag_pipeline from controllers.console.wraps import ( @@ -42,7 +42,7 @@ from fields.workflow_run_fields import ( ) from graphon.model_runtime.utils.encoders import jsonable_encoder from libs import helper -from libs.helper import TimestampField, UUIDStrOrEmpty +from libs.helper import TimestampField, UUIDStrOrEmpty, dump_response from libs.login import current_account_with_tenant, current_user, login_required from models import Account from models.dataset import Pipeline @@ -142,12 +142,17 @@ register_response_schema_models( @console_ns.route("/rag/pipelines//workflows/draft") class DraftRagPipelineApi(Resource): + @console_ns.response( + 200, + "Draft workflow retrieved successfully", + console_ns.models[WorkflowResponse.__name__], + ) + @console_ns.response(404, "Draft workflow not found") @setup_required @login_required @account_initialization_required @get_rag_pipeline @edit_permission_required - @marshal_with(workflow_model) def get(self, pipeline: Pipeline): """ Get draft rag pipeline's workflow @@ -159,8 +164,8 @@ class DraftRagPipelineApi(Resource): if not workflow: raise DraftWorkflowNotExist() - # return workflow, if not found, return None (initiate graph by frontend) - return workflow + # return workflow, if not found, return 404 + return dump_response(WorkflowResponse, workflow) @setup_required @login_required @@ -476,12 +481,16 @@ class RagPipelineTaskStopApi(Resource): @console_ns.route("/rag/pipelines//workflows/publish") class PublishedRagPipelineApi(Resource): + @console_ns.response( + 200, + "Published workflow retrieved successfully, or null if not exist", + console_ns.models[WorkflowResponse.__name__], + ) @setup_required @login_required @account_initialization_required @edit_permission_required @get_rag_pipeline - @marshal_with(workflow_model) def get(self, pipeline: Pipeline): """ Get published pipeline @@ -494,7 +503,10 @@ class PublishedRagPipelineApi(Resource): workflow = rag_pipeline_service.get_published_workflow(pipeline=pipeline) # return workflow, if not found, return None - return workflow + if workflow is None: + return None + + return dump_response(WorkflowResponse, workflow) @setup_required @login_required @@ -567,12 +579,17 @@ class DefaultRagPipelineBlockConfigApi(Resource): @console_ns.route("/rag/pipelines//workflows") class PublishedAllRagPipelineApi(Resource): + @console_ns.response( + 200, + "Published workflows retrieved successfully", + console_ns.models[WorkflowPaginationResponse.__name__], + ) + @console_ns.response(403, "Permission denied") @setup_required @login_required @account_initialization_required @edit_permission_required @get_rag_pipeline - @marshal_with(workflow_pagination_model) def get(self, pipeline: Pipeline): """ Get published workflows @@ -601,12 +618,14 @@ class PublishedAllRagPipelineApi(Resource): named_only=named_only, ) - return { - "items": workflows, - "page": page, - "limit": limit, - "has_more": has_more, - } + return WorkflowPaginationResponse.model_validate( + { + "items": workflows, + "page": page, + "limit": limit, + "has_more": has_more, + } + ).model_dump(mode="json") @console_ns.route("/rag/pipelines//workflows//restore") @@ -641,12 +660,15 @@ class RagPipelineDraftWorkflowRestoreApi(Resource): @console_ns.route("/rag/pipelines//workflows/") class RagPipelineByIdApi(Resource): + @console_ns.response(200, "Workflow updated successfully", console_ns.models[WorkflowResponse.__name__]) + @console_ns.response(400, "No valid fields to update") + @console_ns.response(403, "Permission denied") + @console_ns.response(404, "Workflow not found") @setup_required @login_required @account_initialization_required @edit_permission_required @get_rag_pipeline - @marshal_with(workflow_model) def patch(self, pipeline: Pipeline, workflow_id: str): """ Update workflow attributes @@ -675,7 +697,7 @@ class RagPipelineByIdApi(Resource): if not workflow: raise NotFound("Workflow not found") - return workflow + return dump_response(WorkflowResponse, workflow) @setup_required @login_required diff --git a/api/fields/online_user_fields.py b/api/fields/online_user_fields.py deleted file mode 100644 index bdbe19679c..0000000000 --- a/api/fields/online_user_fields.py +++ /dev/null @@ -1,16 +0,0 @@ -from flask_restx import fields - -online_user_partial_fields = { - "user_id": fields.String, - "username": fields.String, - "avatar": fields.String, -} - -workflow_online_users_fields = { - "app_id": fields.String, - "users": fields.List(fields.Nested(online_user_partial_fields)), -} - -online_user_list_fields = { - "data": fields.List(fields.Nested(workflow_online_users_fields)), -} diff --git a/api/openapi/markdown/console-swagger.md b/api/openapi/markdown/console-swagger.md index 098acd8376..dc055c5823 100644 --- a/api/openapi/markdown/console-swagger.md +++ b/api/openapi/markdown/console-swagger.md @@ -607,9 +607,9 @@ Get workflow online users ##### Responses -| Code | Description | -| ---- | ----------- | -| 200 | Success | +| Code | Description | Schema | +| ---- | ----------- | ------ | +| 200 | Workflow online users retrieved successfully | [WorkflowOnlineUsersResponse](#workflowonlineusersresponse) | ### /apps/{app_id} @@ -2720,7 +2720,7 @@ Get all published workflows for an application | Code | Description | Schema | | ---- | ----------- | ------ | -| 200 | Published workflows retrieved successfully | [WorkflowPagination](#workflowpagination) | +| 200 | Published workflows retrieved successfully | [WorkflowPaginationResponse](#workflowpaginationresponse) | ### /apps/{app_id}/workflows/default-workflow-block-configs @@ -2792,7 +2792,7 @@ Get draft workflow for an application | Code | Description | Schema | | ---- | ----------- | ------ | -| 200 | Draft workflow retrieved successfully | [Workflow](#workflow) | +| 200 | Draft workflow retrieved successfully | [WorkflowResponse](#workflowresponse) | | 404 | Draft workflow not found | | #### POST @@ -3403,8 +3403,7 @@ Get published workflow for an application | Code | Description | Schema | | ---- | ----------- | ------ | -| 200 | Published workflow retrieved successfully | [Workflow](#workflow) | -| 404 | Published workflow not found | | +| 200 | Published workflow retrieved successfully, or null if not found | [WorkflowResponse](#workflowresponse) | #### POST ##### Summary @@ -3485,7 +3484,7 @@ Update workflow by ID | Code | Description | Schema | | ---- | ----------- | ------ | -| 200 | Workflow updated successfully | [Workflow](#workflow) | +| 200 | Workflow updated successfully | [WorkflowResponse](#workflowresponse) | | 403 | Permission denied | | | 404 | Workflow not found | | @@ -6685,9 +6684,10 @@ Get published workflows ##### Responses -| Code | Description | -| ---- | ----------- | -| 200 | Success | +| Code | Description | Schema | +| ---- | ----------- | ------ | +| 200 | Published workflows retrieved successfully | [WorkflowPaginationResponse](#workflowpaginationresponse) | +| 403 | Permission denied | | ### /rag/pipelines/{pipeline_id}/workflows/default-workflow-block-configs @@ -6743,9 +6743,10 @@ Get draft rag pipeline's workflow ##### Responses -| Code | Description | -| ---- | ----------- | -| 200 | Success | +| Code | Description | Schema | +| ---- | ----------- | ------ | +| 200 | Draft workflow retrieved successfully | [WorkflowResponse](#workflowresponse) | +| 404 | Draft workflow not found | | #### POST ##### Summary @@ -7105,9 +7106,9 @@ Get published pipeline ##### Responses -| Code | Description | -| ---- | ----------- | -| 200 | Success | +| Code | Description | Schema | +| ---- | ----------- | ------ | +| 200 | Published workflow retrieved successfully, or null if not exist | [WorkflowResponse](#workflowresponse) | #### POST ##### Summary @@ -7260,9 +7261,12 @@ Update workflow attributes ##### Responses -| Code | Description | -| ---- | ----------- | -| 200 | Success | +| Code | Description | Schema | +| ---- | ----------- | ------ | +| 200 | Workflow updated successfully | [WorkflowResponse](#workflowresponse) | +| 400 | No valid fields to update | | +| 403 | Permission denied | | +| 404 | Workflow not found | | ### /rag/pipelines/{pipeline_id}/workflows/{workflow_id}/restore @@ -10955,16 +10959,6 @@ Condition detail | auto_generate | boolean | | No | | name | string | | No | -#### ConversationVariable - -| Name | Type | Description | Required | -| ---- | ---- | ----------- | -------- | -| description | string | | No | -| id | string | | No | -| name | string | | No | -| value | object | | No | -| value_type | string | | No | - #### ConversationVariableResponse | Name | Type | Description | Required | @@ -12993,24 +12987,24 @@ Form input definition. | icon_info | object | | No | | name | string | | Yes | -#### PipelineVariable +#### PipelineVariableResponse | Name | Type | Description | Required | | ---- | ---- | ----------- | -------- | -| allow_file_extension | [ string ] | | No | -| allow_file_upload_methods | [ string ] | | No | +| allowed_file_extensions | [ string ] | | No | | allowed_file_types | [ string ] | | No | -| belong_to_node_id | string | | No | +| allowed_file_upload_methods | [ string ] | | No | +| belong_to_node_id | string | | Yes | | default_value | object | | No | -| label | string | | No | +| label | string | | Yes | | max_length | integer | | No | | options | [ string ] | | No | | placeholder | string | | No | -| required | boolean | | No | +| required | boolean | | Yes | | tooltips | string | | No | -| type | string | | No | +| type | string | | Yes | | unit | string | | No | -| variable | string | | No | +| variable | string | | Yes | #### PluginAutoUpgradeSettingsPayload @@ -13906,26 +13900,6 @@ in form definiton, or a variable while the workflow is running. | embedding_provider_name | string | | Yes | | vector_weight | number | | Yes | -#### Workflow - -| Name | Type | Description | Required | -| ---- | ---- | ----------- | -------- | -| conversation_variables | [ [ConversationVariable](#conversationvariable) ] | | No | -| created_at | object | | No | -| created_by | [SimpleAccount](#simpleaccount) | | No | -| environment_variables | [ object ] | | No | -| features | object | | No | -| graph | object | | No | -| hash | string | | No | -| id | string | | No | -| marked_comment | string | | No | -| marked_name | string | | No | -| rag_pipeline_variables | [ [PipelineVariable](#pipelinevariable) ] | | No | -| tool_published | boolean | | No | -| updated_at | object | | No | -| updated_by | [SimpleAccount](#simpleaccount) | | No | -| version | string | | No | - #### WorkflowAppLogPaginationResponse | Name | Type | Description | Required | @@ -14124,6 +14098,16 @@ in form definiton, or a variable while the workflow is running. | position_x | number | Comment X position | No | | position_y | number | Comment Y position | No | +#### WorkflowConversationVariableResponse + +| Name | Type | Description | Required | +| ---- | ---- | ----------- | -------- | +| description | string | | Yes | +| id | string | | Yes | +| name | string | | Yes | +| value | object | | Yes | +| value_type | string | | Yes | + #### WorkflowDraftEnvVariable | Name | Type | Description | Required | @@ -14207,6 +14191,16 @@ in form definiton, or a variable while the workflow is running. | value_type | string | | No | | visible | boolean | | No | +#### WorkflowEnvironmentVariableResponse + +| Name | Type | Description | Required | +| ---- | ---- | ----------- | -------- | +| description | string | | Yes | +| id | string | | Yes | +| name | string | | Yes | +| value | object | | Yes | +| value_type | string | | Yes | + #### WorkflowExecutionStatus | Name | Type | Description | Required | @@ -14228,20 +14222,41 @@ in form definiton, or a variable while the workflow is running. | page | integer | | No | | user_id | string | | No | +#### WorkflowOnlineUser + +| Name | Type | Description | Required | +| ---- | ---- | ----------- | -------- | +| avatar | string | | No | +| user_id | string | | Yes | +| username | string | | Yes | + +#### WorkflowOnlineUsersByApp + +| Name | Type | Description | Required | +| ---- | ---- | ----------- | -------- | +| app_id | string | | Yes | +| users | [ [WorkflowOnlineUser](#workflowonlineuser) ] | | Yes | + #### WorkflowOnlineUsersPayload | Name | Type | Description | Required | | ---- | ---- | ----------- | -------- | | app_ids | [ string ] | App IDs | No | -#### WorkflowPagination +#### WorkflowOnlineUsersResponse | Name | Type | Description | Required | | ---- | ---- | ----------- | -------- | -| has_more | boolean | | No | -| items | [ [Workflow](#workflow) ] | | No | -| limit | integer | | No | -| page | integer | | No | +| data | [ [WorkflowOnlineUsersByApp](#workflowonlineusersbyapp) ] | | Yes | + +#### WorkflowPaginationResponse + +| Name | Type | Description | Required | +| ---- | ---- | ----------- | -------- | +| has_more | boolean | | Yes | +| items | [ [WorkflowResponse](#workflowresponse) ] | | Yes | +| limit | integer | | Yes | +| page | integer | | Yes | #### WorkflowPartial @@ -14260,6 +14275,26 @@ in form definiton, or a variable while the workflow is running. | paused_at | string | | No | | paused_nodes | [ [PausedNodeResponse](#pausednoderesponse) ] | | Yes | +#### WorkflowResponse + +| Name | Type | Description | Required | +| ---- | ---- | ----------- | -------- | +| conversation_variables | [ [WorkflowConversationVariableResponse](#workflowconversationvariableresponse) ] | | Yes | +| created_at | integer | | Yes | +| created_by | [SimpleAccount](#simpleaccount) | | No | +| environment_variables | [ [WorkflowEnvironmentVariableResponse](#workflowenvironmentvariableresponse) ] | | Yes | +| features | object | | Yes | +| graph | object | | Yes | +| hash | string | | Yes | +| id | string | | Yes | +| marked_comment | string | | Yes | +| marked_name | string | | Yes | +| rag_pipeline_variables | [ [PipelineVariableResponse](#pipelinevariableresponse) ] | | Yes | +| tool_published | boolean | | Yes | +| updated_at | integer | | Yes | +| updated_by | [SimpleAccount](#simpleaccount) | | No | +| version | string | | Yes | + #### WorkflowRunCountQuery | Name | Type | Description | Required | diff --git a/api/tests/test_containers_integration_tests/controllers/console/datasets/rag_pipeline/test_rag_pipeline_workflow.py b/api/tests/test_containers_integration_tests/controllers/console/datasets/rag_pipeline/test_rag_pipeline_workflow.py index ba59780d59..77b3c72e5e 100644 --- a/api/tests/test_containers_integration_tests/controllers/console/datasets/rag_pipeline/test_rag_pipeline_workflow.py +++ b/api/tests/test_containers_integration_tests/controllers/console/datasets/rag_pipeline/test_rag_pipeline_workflow.py @@ -2,8 +2,10 @@ from __future__ import annotations +import json from datetime import datetime from types import SimpleNamespace +from typing import TypedDict, Unpack from unittest.mock import MagicMock, patch from uuid import uuid4 @@ -35,9 +37,54 @@ from controllers.console.datasets.rag_pipeline.rag_pipeline_workflow import ( ) from controllers.web.error import InvokeRateLimitError as InvokeRateLimitHttpError from libs.datetime_utils import naive_utc_now +from models.account import Account +from models.workflow import Workflow from services.errors.app import IsDraftWorkflowError, WorkflowHashNotEqualError, WorkflowNotFoundError from services.errors.llm import InvokeRateLimitError +DEFAULT_WORKFLOW_TENANT_ID = "00000000-0000-0000-0000-000000000001" +DEFAULT_WORKFLOW_APP_ID = "00000000-0000-0000-0000-000000000002" +DEFAULT_WORKFLOW_CREATED_BY = "00000000-0000-0000-0000-000000000003" +type WorkflowVariablePayload = dict[str, object] + + +class WorkflowFactoryPayload(TypedDict): + id: str + tenant_id: str + app_id: str + type: str + version: str + marked_name: str + marked_comment: str + graph: str + features: str + created_by: str + created_at: datetime + updated_by: str | None + updated_at: datetime + environment_variables: list[WorkflowVariablePayload] + conversation_variables: list[WorkflowVariablePayload] + rag_pipeline_variables: list[WorkflowVariablePayload] + + +class WorkflowFactoryOverrides(TypedDict, total=False): + id: str + tenant_id: str + app_id: str + type: str + version: str + marked_name: str + marked_comment: str + graph: str + features: str + created_by: str + created_at: datetime + updated_by: str | None + updated_at: datetime + environment_variables: list[WorkflowVariablePayload] + conversation_variables: list[WorkflowVariablePayload] + rag_pipeline_variables: list[WorkflowVariablePayload] + def unwrap(func): while hasattr(func, "__wrapped__"): @@ -74,17 +121,52 @@ def make_node_execution(**overrides): return SimpleNamespace(**payload) +def default_workflow_payload() -> WorkflowFactoryPayload: + return { + "id": "workflow-1", + "tenant_id": DEFAULT_WORKFLOW_TENANT_ID, + "app_id": DEFAULT_WORKFLOW_APP_ID, + "type": "workflow", + "version": "1", + "marked_name": "Release 1", + "marked_comment": "Initial release", + "graph": json.dumps({"nodes": [], "edges": []}), + "features": json.dumps({"file_upload": {"enabled": False}}), + "created_by": DEFAULT_WORKFLOW_CREATED_BY, + "created_at": datetime(2024, 1, 1, 12, 0, 0), + "updated_by": None, + "updated_at": datetime(2024, 1, 1, 12, 1, 0), + "environment_variables": [], + "conversation_variables": [], + "rag_pipeline_variables": [], + } + + +def make_workflow(**overrides: Unpack[WorkflowFactoryOverrides]) -> Workflow: + payload = default_workflow_payload() + payload.update(overrides) + return Workflow(**payload) + + +@pytest.fixture +def workflow_author(db_session_with_containers: Session) -> Account: + account = Account(name="Alice", email=f"alice-{uuid4()}@example.com") + db_session_with_containers.add(account) + db_session_with_containers.commit() + return account + + class TestDraftWorkflowApi: @pytest.fixture def app(self, flask_app_with_containers: Flask): return flask_app_with_containers - def test_get_draft_success(self, app: Flask): + def test_get_draft_success(self, app: Flask, workflow_author: Account): api = DraftRagPipelineApi() method = unwrap(api.get) pipeline = MagicMock() - workflow = MagicMock() + workflow = make_workflow(created_by=workflow_author.id) service = MagicMock() service.get_draft_workflow.return_value = workflow @@ -97,7 +179,17 @@ class TestDraftWorkflowApi: ), ): result = method(api, pipeline) - assert result == workflow + + assert result["id"] == "workflow-1" + assert result["graph"] == {"nodes": [], "edges": []} + assert result["features"] == {"file_upload": {"enabled": False}} + assert result["hash"] == workflow.unique_hash + assert result["created_by"] == { + "id": workflow_author.id, + "name": workflow_author.name, + "email": workflow_author.email, + } + assert result["updated_by"] is None def test_get_draft_not_exist(self, app: Flask): api = DraftRagPipelineApi() @@ -642,7 +734,7 @@ class TestPublishedAllRagPipelineApi: user = MagicMock(id="u1") service = MagicMock() - service.get_all_published_workflow.return_value = ([{"id": "w1"}], False) + service.get_all_published_workflow.return_value = ([make_workflow(id="w1")], False) with ( app.test_request_context("/"), @@ -657,7 +749,8 @@ class TestPublishedAllRagPipelineApi: ): result = method(api, pipeline) - assert result["items"] == [{"id": "w1"}] + assert result["items"][0]["id"] == "w1" + assert result["items"][0]["graph"] == {"nodes": [], "edges": []} assert result["has_more"] is False def test_get_published_workflows_forbidden(self, app: Flask): @@ -690,7 +783,7 @@ class TestRagPipelineByIdApi: pipeline = MagicMock(tenant_id="t1") user = MagicMock(id="u1") - workflow = MagicMock() + workflow = make_workflow(id="w1", marked_name="test") service = MagicMock() service.update_workflow.return_value = workflow @@ -711,7 +804,9 @@ class TestRagPipelineByIdApi: ): result = method(api, pipeline, "w1") - assert result == workflow + assert result["id"] == "w1" + assert result["marked_name"] == "test" + assert result["hash"] == workflow.unique_hash def test_patch_no_fields(self, app: Flask): api = RagPipelineByIdApi() diff --git a/api/tests/unit_tests/controllers/console/app/test_workflow.py b/api/tests/unit_tests/controllers/console/app/test_workflow.py index 7c470eb9a8..e7fc1f8042 100644 --- a/api/tests/unit_tests/controllers/console/app/test_workflow.py +++ b/api/tests/unit_tests/controllers/console/app/test_workflow.py @@ -3,14 +3,18 @@ from __future__ import annotations import json from datetime import datetime from types import SimpleNamespace +from typing import cast from unittest.mock import Mock import pytest +from pydantic import ValidationError from werkzeug.exceptions import HTTPException, NotFound from controllers.console.app import workflow as workflow_module from controllers.console.app.error import DraftWorkflowNotExist, DraftWorkflowNotSync from graphon.file import File, FileTransferMethod, FileType +from graphon.variables import SecretVariable, StringVariable +from graphon.variables.variables import RAGPipelineVariable def _unwrap(func): @@ -19,11 +23,67 @@ def _unwrap(func): return func +def _make_workflow(**overrides): + workflow = SimpleNamespace( + id="workflow-1", + graph_dict={"nodes": [], "edges": []}, + features_dict={"file_upload": {"enabled": False}}, + unique_hash="hash-1", + version="1", + marked_name="Release 1", + marked_comment="Initial release", + created_by_account=SimpleNamespace(id="user-1", name="Alice", email="alice@example.com"), + created_at=datetime(2024, 1, 1, 12, 0, 0), + updated_by_account=None, + updated_at=datetime(2024, 1, 1, 12, 1, 0), + tool_published=False, + environment_variables=[ + { + "id": "env-1", + "name": "API_KEY", + "value": "[__HIDDEN__]", + "value_type": "secret", + "description": "API key", + } + ], + conversation_variables=[ + { + "id": "conv-1", + "name": "topic", + "value": "hello", + "value_type": "string", + "description": "Topic", + } + ], + rag_pipeline_variables=[ + { + "variable": "query", + "type": "text-input", + "label": "Query", + "belong_to_node_id": "shared", + "max_length": 0, + "required": False, + "unit": "", + "default_value": "", + "options": [], + "placeholder": "", + "tooltips": "", + "allowed_file_types": ["custom"], + "allowed_file_extensions": [".pdf"], + "allowed_file_upload_methods": ["local_file"], + } + ], + ) + for key, value in overrides.items(): + setattr(workflow, key, value) + return workflow + + def test_parse_file_no_config(monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.setattr(workflow_module.FileUploadConfigManager, "convert", lambda *_args, **_kwargs: None) workflow = SimpleNamespace(features_dict={}, tenant_id="t1") - assert workflow_module._parse_file(workflow, files=[{"id": "f"}]) == [] + assert workflow_module._parse_file(cast(workflow_module.Workflow, workflow), files=[{"id": "f"}]) == [] def test_parse_file_with_config(monkeypatch: pytest.MonkeyPatch) -> None: @@ -41,7 +101,7 @@ def test_parse_file_with_config(monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.setattr(workflow_module.file_factory, "build_from_mappings", build_mock) workflow = SimpleNamespace(features_dict={}, tenant_id="t1") - result = workflow_module._parse_file(workflow, files=[{"id": "f"}]) + result = workflow_module._parse_file(cast(workflow_module.Workflow, workflow), files=[{"id": "f"}]) assert result == file_list build_mock.assert_called_once() @@ -259,7 +319,7 @@ def test_restore_published_workflow_to_draft_returns_400_for_invalid_structure( assert exc.value.description == "invalid workflow graph" -def test_get_published_workflows_marshals_items_before_session_closes(app, monkeypatch: pytest.MonkeyPatch) -> None: +def test_get_published_workflows_serializes_items_before_session_closes(app, monkeypatch: pytest.MonkeyPatch) -> None: api = workflow_module.PublishedAllWorkflowApi() handler = _unwrap(api.get) @@ -278,7 +338,12 @@ def test_get_published_workflows_marshals_items_before_session_closes(app, monke def begin(self): return _SessionContext() + base_workflow = _make_workflow() + class _Workflow: + def __getattr__(self, name): + return getattr(base_workflow, name) + @property def id(self): assert session_state["open"] is True @@ -295,12 +360,6 @@ def test_get_published_workflows_marshals_items_before_session_closes(app, monke ), ) - def _fake_marshal(items, fields): - assert session_state["open"] is True - return [{"id": item.id} for item in items] - - monkeypatch.setattr(workflow_module, "marshal", _fake_marshal) - with app.test_request_context( "/apps/app/workflows", method="GET", @@ -308,12 +367,153 @@ def test_get_published_workflows_marshals_items_before_session_closes(app, monke ): response = handler(api, app_model=SimpleNamespace(id="app", workflow_id="wf-1")) - assert response == { - "items": [{"id": "w1"}], - "page": 1, - "limit": 10, - "has_more": False, - } + assert response["items"][0]["id"] == "w1" + assert response["page"] == 1 + assert response["limit"] == 10 + assert response["has_more"] is False + + +def test_draft_workflow_get_serializes_response_model(monkeypatch: pytest.MonkeyPatch) -> None: + workflow = _make_workflow() + monkeypatch.setattr( + workflow_module, "WorkflowService", lambda: SimpleNamespace(get_draft_workflow=lambda **_kwargs: workflow) + ) + + api = workflow_module.DraftWorkflowApi() + handler = _unwrap(api.get) + + response = handler(api, app_model=SimpleNamespace(id="app")) + + assert response["id"] == "workflow-1" + assert response["graph"] == {"nodes": [], "edges": []} + assert response["features"] == {"file_upload": {"enabled": False}} + assert response["hash"] == "hash-1" + assert response["created_by"] == {"id": "user-1", "name": "Alice", "email": "alice@example.com"} + assert response["updated_by"] is None + assert response["created_at"] == int(datetime(2024, 1, 1, 12, 0, 0).timestamp()) + assert response["updated_at"] == int(datetime(2024, 1, 1, 12, 1, 0).timestamp()) + assert response["environment_variables"] == [ + { + "id": "env-1", + "name": "API_KEY", + "value": "[__HIDDEN__]", + "value_type": "secret", + "description": "API key", + } + ] + assert response["conversation_variables"] == [ + { + "id": "conv-1", + "name": "topic", + "value": "hello", + "value_type": "string", + "description": "Topic", + } + ] + assert response["rag_pipeline_variables"] == [ + { + "label": "Query", + "variable": "query", + "type": "text-input", + "belong_to_node_id": "shared", + "max_length": 0, + "required": False, + "unit": "", + "default_value": "", + "options": [], + "placeholder": "", + "tooltips": "", + "allowed_file_types": ["custom"], + "allowed_file_extensions": [".pdf"], + "allowed_file_upload_methods": ["local_file"], + } + ] + + +def test_pipeline_variable_response_accepts_legacy_file_field_names() -> None: + response = workflow_module.PipelineVariableResponse.model_validate( + { + "label": "Query", + "variable": "query", + "type": "single-file", + "belong_to_node_id": "shared", + "max_length": 0, + "required": False, + "unit": "", + "default_value": "", + "options": [], + "placeholder": "", + "tooltips": "", + "allowed_file_types": [], + "allow_file_extension": [".txt"], + "allow_file_upload_methods": ["remote_url"], + } + ).model_dump(mode="json") + + assert response["allowed_file_extensions"] == [".txt"] + assert response["allowed_file_upload_methods"] == ["remote_url"] + + +def test_pipeline_variable_response_accepts_explicit_null_optional_fields() -> None: + pipeline_variable = RAGPipelineVariable.model_validate( + { + "label": "Query", + "variable": "query", + "type": "text-input", + "belong_to_node_id": "shared", + "max_length": None, + "unit": None, + "default_value": None, + "options": None, + "placeholder": None, + "tooltips": None, + "allowed_file_types": None, + "allowed_file_extensions": None, + "allowed_file_upload_methods": None, + } + ).model_dump(mode="json") + + response = workflow_module.PipelineVariableResponse.model_validate(pipeline_variable).model_dump(mode="json") + + assert response["max_length"] is None + assert response["allowed_file_types"] is None + assert response["allowed_file_extensions"] is None + assert response["allowed_file_upload_methods"] is None + + +def test_workflow_response_masks_secret_environment_variables() -> None: + workflow = _make_workflow( + environment_variables=[ + SecretVariable(id="env-secret", name="API_KEY", value="plain-token", selector=["env", "API_KEY"]), + StringVariable(id="env-string", name="REGION", value="us-east-1", selector=["env", "REGION"]), + ] + ) + + response = workflow_module.WorkflowResponse.model_validate(workflow, from_attributes=True).model_dump(mode="json") + + assert response["environment_variables"] == [ + { + "id": "env-secret", + "name": "API_KEY", + "value": workflow_module.encrypter.full_mask_token(), + "value_type": "secret", + "description": "", + }, + { + "id": "env-string", + "name": "REGION", + "value": "us-east-1", + "value_type": "string", + "description": "", + }, + ] + + +def test_workflow_response_rejects_invalid_environment_variable_dict() -> None: + workflow = _make_workflow(environment_variables=[{"value_type": "not-a-segment-type"}]) + + with pytest.raises(ValidationError): + workflow_module.WorkflowResponse.model_validate(workflow, from_attributes=True) def test_draft_workflow_get_not_found(monkeypatch: pytest.MonkeyPatch) -> None: @@ -373,10 +573,33 @@ def test_workflow_online_users_filters_inaccessible_workflow(app, monkeypatch: p "avatar": "avatar-file-id", "sid": "sid-1", } - ) + ), + b"sid-malformed": json.dumps({"avatar": "avatar-file-id", "sid": "sid-malformed"}), + b"sid-invalid-avatar": json.dumps( + { + "user_id": "u-2", + "username": "Bob", + "avatar": {"file_id": "avatar-file-id"}, + } + ), + b"sid-invalid-user-id": json.dumps( + { + "user_id": 42, + "username": "Carol", + "avatar": "avatar-file-id", + } + ), + b"sid-invalid-username": json.dumps( + { + "user_id": "u-4", + "username": ["Dave"], + "avatar": "avatar-file-id", + } + ), } ] - workflow_module.redis_client.pipeline.return_value = redis_pipeline + redis_pipeline_factory = Mock(return_value=redis_pipeline) + monkeypatch.setattr(workflow_module.redis_client, "pipeline", redis_pipeline_factory) api = workflow_module.WorkflowOnlineUsersApi() handler = _unwrap(api.post) @@ -397,13 +620,17 @@ def test_workflow_online_users_filters_inaccessible_workflow(app, monkeypatch: p "user_id": "u-1", "username": "Alice", "avatar": signed_avatar_url, - "sid": "sid-1", - } + }, + { + "user_id": "u-2", + "username": "Bob", + "avatar": None, + }, ], } ] } - workflow_module.redis_client.pipeline.assert_called_once_with(transaction=False) + redis_pipeline_factory.assert_called_once_with(transaction=False) redis_pipeline.hgetall.assert_called_once_with(f"{workflow_module.WORKFLOW_ONLINE_USERS_PREFIX}{app_id_1}") redis_pipeline.execute.assert_called_once_with() sign_avatar.assert_called_once_with("avatar-file-id") @@ -422,7 +649,8 @@ def test_workflow_online_users_batches_redis_reads(app, monkeypatch: pytest.Monk first_pipeline.execute.return_value = [{} for _ in range(workflow_module.WORKFLOW_ONLINE_USERS_REDIS_BATCH_SIZE)] second_pipeline = Mock() second_pipeline.execute.return_value = [{}] - workflow_module.redis_client.pipeline.side_effect = [first_pipeline, second_pipeline] + redis_pipeline_factory = Mock(side_effect=[first_pipeline, second_pipeline]) + monkeypatch.setattr(workflow_module.redis_client, "pipeline", redis_pipeline_factory) api = workflow_module.WorkflowOnlineUsersApi() handler = _unwrap(api.post) @@ -435,7 +663,7 @@ def test_workflow_online_users_batches_redis_reads(app, monkeypatch: pytest.Monk response = handler(api) assert len(response["data"]) == len(app_ids) - assert workflow_module.redis_client.pipeline.call_count == 2 + assert redis_pipeline_factory.call_count == 2 assert first_pipeline.hgetall.call_count == workflow_module.WORKFLOW_ONLINE_USERS_REDIS_BATCH_SIZE assert second_pipeline.hgetall.call_count == 1 @@ -463,5 +691,6 @@ def test_workflow_online_users_rejects_excessive_workflow_ids(app, monkeypatch: handler(api) assert exc.value.code == 400 + assert exc.value.description is not None assert "Maximum" in exc.value.description accessible_app_ids.assert_not_called() diff --git a/api/tests/unit_tests/controllers/console/datasets/rag_pipeline/test_rag_pipeline_workflow.py b/api/tests/unit_tests/controllers/console/datasets/rag_pipeline/test_rag_pipeline_workflow.py new file mode 100644 index 0000000000..322f1baa96 --- /dev/null +++ b/api/tests/unit_tests/controllers/console/datasets/rag_pipeline/test_rag_pipeline_workflow.py @@ -0,0 +1,150 @@ +from __future__ import annotations + +from datetime import datetime +from types import SimpleNamespace +from unittest.mock import PropertyMock, patch + +import pytest + +from controllers.console.datasets.rag_pipeline import rag_pipeline_workflow as module + + +def _unwrap(func): + while hasattr(func, "__wrapped__"): + func = func.__wrapped__ + return func + + +def _make_workflow(**overrides): + workflow = SimpleNamespace( + id="workflow-1", + graph_dict={"nodes": [], "edges": []}, + features_dict={"file_upload": {"enabled": False}}, + unique_hash="hash-1", + version="1", + marked_name="Release 1", + marked_comment="Initial release", + created_by_account=SimpleNamespace(id="user-1", name="Alice", email="alice@example.com"), + created_at=datetime(2024, 1, 1, 12, 0, 0), + updated_by_account=None, + updated_at=datetime(2024, 1, 1, 12, 1, 0), + tool_published=False, + environment_variables=[], + conversation_variables=[], + rag_pipeline_variables=[], + ) + for key, value in overrides.items(): + setattr(workflow, key, value) + return workflow + + +def test_draft_rag_pipeline_workflow_get_serializes_response_model(monkeypatch: pytest.MonkeyPatch) -> None: + workflow = _make_workflow() + monkeypatch.setattr( + module, "RagPipelineService", lambda: SimpleNamespace(get_draft_workflow=lambda **_kwargs: workflow) + ) + + api = module.DraftRagPipelineApi() + handler = _unwrap(api.get) + + response = handler(api, pipeline=SimpleNamespace(id="pipeline-1")) + + assert response["id"] == "workflow-1" + assert response["graph"] == {"nodes": [], "edges": []} + assert response["features"] == {"file_upload": {"enabled": False}} + assert response["hash"] == "hash-1" + assert response["created_by"] == {"id": "user-1", "name": "Alice", "email": "alice@example.com"} + assert response["updated_by"] is None + assert response["created_at"] == int(datetime(2024, 1, 1, 12, 0, 0).timestamp()) + assert response["updated_at"] == int(datetime(2024, 1, 1, 12, 1, 0).timestamp()) + + +def test_published_rag_pipeline_workflows_serialize_items_before_session_closes( + app, monkeypatch: pytest.MonkeyPatch +) -> None: + api = module.PublishedAllRagPipelineApi() + handler = _unwrap(api.get) + session_state = {"open": False} + + class _SessionContext: + def __enter__(self): + session_state["open"] = True + return object() + + def __exit__(self, exc_type, exc, tb): + session_state["open"] = False + return False + + class _SessionMaker: + def begin(self): + return _SessionContext() + + base_workflow = _make_workflow() + + class _Workflow: + def __getattr__(self, name: str): + assert session_state["open"] is True + return getattr(base_workflow, name) + + monkeypatch.setattr(module, "db", SimpleNamespace(engine=object())) + monkeypatch.setattr(module, "sessionmaker", lambda *_args, **_kwargs: _SessionMaker()) + monkeypatch.setattr(module, "current_account_with_tenant", lambda: (SimpleNamespace(id="user-1"), "tenant-1")) + monkeypatch.setattr( + module, + "RagPipelineService", + lambda: SimpleNamespace(get_all_published_workflow=lambda **_kwargs: ([_Workflow()], False)), + ) + + with app.test_request_context( + "/rag/pipelines/pipeline-1/workflows", + method="GET", + query_string={"page": 1, "limit": 10, "user_id": "", "named_only": "false"}, + ): + response = handler(api, pipeline=SimpleNamespace(id="pipeline-1")) + + assert response["items"][0]["id"] == "workflow-1" + assert response["page"] == 1 + assert response["limit"] == 10 + assert response["has_more"] is False + + +def test_rag_pipeline_workflow_patch_serializes_response_model(app, monkeypatch: pytest.MonkeyPatch) -> None: + workflow = _make_workflow(marked_name="Updated release") + monkeypatch.setattr(module, "current_account_with_tenant", lambda: (SimpleNamespace(id="user-1"), "tenant-1")) + + class _SessionContext: + def __enter__(self): + return object() + + def __exit__(self, exc_type, exc, tb): + return False + + class _SessionMaker: + def begin(self): + return _SessionContext() + + monkeypatch.setattr(module, "db", SimpleNamespace(engine=object())) + monkeypatch.setattr(module, "sessionmaker", lambda *_args, **_kwargs: _SessionMaker()) + monkeypatch.setattr( + module, + "RagPipelineService", + lambda: SimpleNamespace(update_workflow=lambda **_kwargs: workflow), + ) + payload: dict[str, object] = {"marked_name": "Updated release"} + + api = module.RagPipelineByIdApi() + handler = _unwrap(api.patch) + + with ( + app.test_request_context("/rag/pipelines/pipeline-1/workflows/workflow-1", method="PATCH", json=payload), + patch.object(type(module.console_ns), "payload", new_callable=PropertyMock, return_value=payload), + ): + response = handler( + api, + pipeline=SimpleNamespace(id="pipeline-1", tenant_id="tenant-1"), + workflow_id="workflow-1", + ) + + assert response["id"] == "workflow-1" + assert response["marked_name"] == "Updated release" + assert response["hash"] == "hash-1" diff --git a/packages/contracts/generated/api/console/apps/types.gen.ts b/packages/contracts/generated/api/console/apps/types.gen.ts index 2402abb2da..b7fd725f53 100644 --- a/packages/contracts/generated/api/console/apps/types.gen.ts +++ b/packages/contracts/generated/api/console/apps/types.gen.ts @@ -72,6 +72,10 @@ export type WorkflowOnlineUsersPayload = { app_ids?: Array } +export type WorkflowOnlineUsersResponse = { + data: Array +} + export type AppDetailWithSite = { access_mode?: string | null api_base_url?: string | null @@ -627,39 +631,33 @@ export type WorkflowCommentResolve = { resolved_by?: string | null } -export type WorkflowPagination = { - has_more?: boolean - items?: Array - limit?: number - page?: number +export type WorkflowPaginationResponse = { + has_more: boolean + items: Array + limit: number + page: number } -export type Workflow = { - conversation_variables?: Array - created_at?: { - [key: string]: unknown - } +export type WorkflowResponse = { + conversation_variables: Array + created_at: number created_by?: SimpleAccount - environment_variables?: Array<{ - [key: string]: unknown - }> - features?: { + environment_variables: Array + features: { [key: string]: unknown } - graph?: { - [key: string]: unknown - } - hash?: string - id?: string - marked_comment?: string - marked_name?: string - rag_pipeline_variables?: Array - tool_published?: boolean - updated_at?: { + graph: { [key: string]: unknown } + hash: string + id: string + marked_comment: string + marked_name: string + rag_pipeline_variables: Array + tool_published: boolean + updated_at: number updated_by?: SimpleAccount - version?: string + version: string } export type SyncDraftWorkflowPayload = { @@ -886,6 +884,11 @@ export type PluginDependency = { value: unknown } +export type WorkflowOnlineUsersByApp = { + app_id: string + users: Array +} + export type DeletedTool = { provider_id: string tool_name: string @@ -1182,33 +1185,43 @@ export type WorkflowCommentReply = { id: string } -export type ConversationVariable = { - description?: string - id?: string - name?: string - value?: { +export type WorkflowConversationVariableResponse = { + description: string + id: string + name: string + value: { [key: string]: unknown } - value_type?: string + value_type: string } -export type PipelineVariable = { - allow_file_extension?: Array - allow_file_upload_methods?: Array - allowed_file_types?: Array - belong_to_node_id?: string +export type WorkflowEnvironmentVariableResponse = { + description: string + id: string + name: string + value: { + [key: string]: unknown + } + value_type: string +} + +export type PipelineVariableResponse = { + allowed_file_extensions?: Array | null + allowed_file_types?: Array | null + allowed_file_upload_methods?: Array | null + belong_to_node_id: string default_value?: { [key: string]: unknown } - label?: string - max_length?: number - options?: Array - placeholder?: string - required?: boolean - tooltips?: string - type?: string - unit?: string - variable?: string + label: string + max_length?: number | null + options?: Array | null + placeholder?: string | null + required: boolean + tooltips?: string | null + type: string + unit?: string | null + variable: string } export type WorkflowDraftVariableWithoutValue = { @@ -1253,6 +1266,12 @@ export type Package = { version?: string | null } +export type WorkflowOnlineUser = { + avatar?: string | null + user_id: string + username: string +} + export type SimpleModelConfig = { model_dict?: JsonValue pre_prompt?: string | null @@ -1548,9 +1567,7 @@ export type PostAppsWorkflowsOnlineUsersData = { } export type PostAppsWorkflowsOnlineUsersResponses = { - 200: { - [key: string]: unknown - } + 200: WorkflowOnlineUsersResponse } export type PostAppsWorkflowsOnlineUsersResponse @@ -3857,7 +3874,7 @@ export type GetAppsByAppIdWorkflowsData = { } export type GetAppsByAppIdWorkflowsResponses = { - 200: WorkflowPagination + 200: WorkflowPaginationResponse } export type GetAppsByAppIdWorkflowsResponse @@ -3930,7 +3947,7 @@ export type GetAppsByAppIdWorkflowsDraftError = GetAppsByAppIdWorkflowsDraftErrors[keyof GetAppsByAppIdWorkflowsDraftErrors] export type GetAppsByAppIdWorkflowsDraftResponses = { - 200: Workflow + 200: WorkflowResponse } export type GetAppsByAppIdWorkflowsDraftResponse @@ -4572,17 +4589,8 @@ export type GetAppsByAppIdWorkflowsPublishData = { url: '/apps/{app_id}/workflows/publish' } -export type GetAppsByAppIdWorkflowsPublishErrors = { - 404: { - [key: string]: unknown - } -} - -export type GetAppsByAppIdWorkflowsPublishError - = GetAppsByAppIdWorkflowsPublishErrors[keyof GetAppsByAppIdWorkflowsPublishErrors] - export type GetAppsByAppIdWorkflowsPublishResponses = { - 200: Workflow + 200: WorkflowResponse } export type GetAppsByAppIdWorkflowsPublishResponse @@ -4668,7 +4676,7 @@ export type PatchAppsByAppIdWorkflowsByWorkflowIdError = PatchAppsByAppIdWorkflowsByWorkflowIdErrors[keyof PatchAppsByAppIdWorkflowsByWorkflowIdErrors] export type PatchAppsByAppIdWorkflowsByWorkflowIdResponses = { - 200: Workflow + 200: WorkflowResponse } export type PatchAppsByAppIdWorkflowsByWorkflowIdResponse diff --git a/packages/contracts/generated/api/console/apps/zod.gen.ts b/packages/contracts/generated/api/console/apps/zod.gen.ts index f02b146106..defc48c824 100644 --- a/packages/contracts/generated/api/console/apps/zod.gen.ts +++ b/packages/contracts/generated/api/console/apps/zod.gen.ts @@ -1108,54 +1108,77 @@ export const zWorkflowCommentDetail = z.object({ updated_at: z.int().nullish(), }) -export const zConversationVariable = z.object({ - description: z.string().optional(), - id: z.string().optional(), - name: z.string().optional(), - value: z.record(z.string(), z.unknown()).optional(), - value_type: z.string().optional(), +/** + * WorkflowConversationVariableResponse + */ +export const zWorkflowConversationVariableResponse = z.object({ + description: z.string(), + id: z.string(), + name: z.string(), + value: z.record(z.string(), z.unknown()), + value_type: z.string(), }) -export const zPipelineVariable = z.object({ - allow_file_extension: z.array(z.string()).optional(), - allow_file_upload_methods: z.array(z.string()).optional(), - allowed_file_types: z.array(z.string()).optional(), - belong_to_node_id: z.string().optional(), +/** + * WorkflowEnvironmentVariableResponse + */ +export const zWorkflowEnvironmentVariableResponse = z.object({ + description: z.string(), + id: z.string(), + name: z.string(), + value: z.record(z.string(), z.unknown()), + value_type: z.string(), +}) + +/** + * PipelineVariableResponse + */ +export const zPipelineVariableResponse = z.object({ + allowed_file_extensions: z.array(z.string()).nullish(), + allowed_file_types: z.array(z.string()).nullish(), + allowed_file_upload_methods: z.array(z.string()).nullish(), + belong_to_node_id: z.string(), default_value: z.record(z.string(), z.unknown()).optional(), - label: z.string().optional(), - max_length: z.int().optional(), - options: z.array(z.string()).optional(), - placeholder: z.string().optional(), - required: z.boolean().optional(), - tooltips: z.string().optional(), - type: z.string().optional(), - unit: z.string().optional(), - variable: z.string().optional(), + label: z.string(), + max_length: z.int().nullish(), + options: z.array(z.string()).nullish(), + placeholder: z.string().nullish(), + required: z.boolean(), + tooltips: z.string().nullish(), + type: z.string(), + unit: z.string().nullish(), + variable: z.string(), }) -export const zWorkflow = z.object({ - conversation_variables: z.array(zConversationVariable).optional(), - created_at: z.record(z.string(), z.unknown()).optional(), +/** + * WorkflowResponse + */ +export const zWorkflowResponse = z.object({ + conversation_variables: z.array(zWorkflowConversationVariableResponse), + created_at: z.int(), created_by: zSimpleAccount.optional(), - environment_variables: z.array(z.record(z.string(), z.unknown())).optional(), - features: z.record(z.string(), z.unknown()).optional(), - graph: z.record(z.string(), z.unknown()).optional(), - hash: z.string().optional(), - id: z.string().optional(), - marked_comment: z.string().optional(), - marked_name: z.string().optional(), - rag_pipeline_variables: z.array(zPipelineVariable).optional(), - tool_published: z.boolean().optional(), - updated_at: z.record(z.string(), z.unknown()).optional(), + environment_variables: z.array(zWorkflowEnvironmentVariableResponse), + features: z.record(z.string(), z.unknown()), + graph: z.record(z.string(), z.unknown()), + hash: z.string(), + id: z.string(), + marked_comment: z.string(), + marked_name: z.string(), + rag_pipeline_variables: z.array(zPipelineVariableResponse), + tool_published: z.boolean(), + updated_at: z.int(), updated_by: zSimpleAccount.optional(), - version: z.string().optional(), + version: z.string(), }) -export const zWorkflowPagination = z.object({ - has_more: z.boolean().optional(), - items: z.array(zWorkflow).optional(), - limit: z.int().optional(), - page: z.int().optional(), +/** + * WorkflowPaginationResponse + */ +export const zWorkflowPaginationResponse = z.object({ + has_more: z.boolean(), + items: z.array(zWorkflowResponse), + limit: z.int(), + page: z.int(), }) export const zWorkflowDraftVariableWithoutValue = z.object({ @@ -1374,6 +1397,30 @@ export const zPackage = z.object({ version: z.string().nullish(), }) +/** + * WorkflowOnlineUser + */ +export const zWorkflowOnlineUser = z.object({ + avatar: z.string().nullish(), + user_id: z.string(), + username: z.string(), +}) + +/** + * WorkflowOnlineUsersByApp + */ +export const zWorkflowOnlineUsersByApp = z.object({ + app_id: z.string(), + users: z.array(zWorkflowOnlineUser), +}) + +/** + * WorkflowOnlineUsersResponse + */ +export const zWorkflowOnlineUsersResponse = z.object({ + data: z.array(zWorkflowOnlineUsersByApp), +}) + /** * SimpleModelConfig */ @@ -1862,9 +1909,9 @@ export const zPostAppsImportsByImportIdConfirmResponse = zImport export const zPostAppsWorkflowsOnlineUsersBody = zWorkflowOnlineUsersPayload /** - * Success + * Workflow online users retrieved successfully */ -export const zPostAppsWorkflowsOnlineUsersResponse = z.record(z.string(), z.unknown()) +export const zPostAppsWorkflowsOnlineUsersResponse = zWorkflowOnlineUsersResponse export const zDeleteAppsByAppIdPath = z.object({ app_id: z.string(), @@ -3091,7 +3138,7 @@ export const zGetAppsByAppIdWorkflowsQuery = z.object({ /** * Published workflows retrieved successfully */ -export const zGetAppsByAppIdWorkflowsResponse = zWorkflowPagination +export const zGetAppsByAppIdWorkflowsResponse = zWorkflowPaginationResponse export const zGetAppsByAppIdWorkflowsDefaultWorkflowBlockConfigsPath = z.object({ app_id: z.string(), @@ -3129,7 +3176,7 @@ export const zGetAppsByAppIdWorkflowsDraftPath = z.object({ /** * Draft workflow retrieved successfully */ -export const zGetAppsByAppIdWorkflowsDraftResponse = zWorkflow +export const zGetAppsByAppIdWorkflowsDraftResponse = zWorkflowResponse export const zPostAppsByAppIdWorkflowsDraftBody = zSyncDraftWorkflowPayload @@ -3459,9 +3506,9 @@ export const zGetAppsByAppIdWorkflowsPublishPath = z.object({ }) /** - * Published workflow retrieved successfully + * Published workflow retrieved successfully, or null if not found */ -export const zGetAppsByAppIdWorkflowsPublishResponse = zWorkflow +export const zGetAppsByAppIdWorkflowsPublishResponse = zWorkflowResponse export const zPostAppsByAppIdWorkflowsPublishBody = zPublishWorkflowPayload @@ -3509,7 +3556,7 @@ export const zPatchAppsByAppIdWorkflowsByWorkflowIdPath = z.object({ /** * Workflow updated successfully */ -export const zPatchAppsByAppIdWorkflowsByWorkflowIdResponse = zWorkflow +export const zPatchAppsByAppIdWorkflowsByWorkflowIdResponse = zWorkflowResponse export const zPostAppsByAppIdWorkflowsByWorkflowIdRestorePath = z.object({ app_id: z.string(), diff --git a/packages/contracts/generated/api/console/rag/types.gen.ts b/packages/contracts/generated/api/console/rag/types.gen.ts index 9de86c6cd2..b25272ed7a 100644 --- a/packages/contracts/generated/api/console/rag/types.gen.ts +++ b/packages/contracts/generated/api/console/rag/types.gen.ts @@ -57,6 +57,35 @@ export type WorkflowRunNodeExecutionListResponse = { data: Array } +export type WorkflowPaginationResponse = { + has_more: boolean + items: Array + limit: number + page: number +} + +export type WorkflowResponse = { + conversation_variables: Array + created_at: number + created_by?: SimpleAccount + environment_variables: Array + features: { + [key: string]: unknown + } + graph: { + [key: string]: unknown + } + hash: string + id: string + marked_comment: string + marked_name: string + rag_pipeline_variables: Array + tool_published: boolean + updated_at: number + updated_by?: SimpleAccount + version: string +} + export type DatasourceNodeRunPayload = { credential_id?: string | null datasource_type: string @@ -171,6 +200,45 @@ export type SimpleEndUser = { type: string } +export type WorkflowConversationVariableResponse = { + description: string + id: string + name: string + value: { + [key: string]: unknown + } + value_type: string +} + +export type WorkflowEnvironmentVariableResponse = { + description: string + id: string + name: string + value: { + [key: string]: unknown + } + value_type: string +} + +export type PipelineVariableResponse = { + allowed_file_extensions?: Array | null + allowed_file_types?: Array | null + allowed_file_upload_methods?: Array | null + belong_to_node_id: string + default_value?: { + [key: string]: unknown + } + label: string + max_length?: number | null + options?: Array | null + placeholder?: string | null + required: boolean + tooltips?: string | null + type: string + unit?: string | null + variable: string +} + export type DeleteRagPipelineCustomizedTemplatesByTemplateIdData = { body?: never path: { @@ -507,12 +575,19 @@ export type GetRagPipelinesByPipelineIdWorkflowsData = { url: '/rag/pipelines/{pipeline_id}/workflows' } -export type GetRagPipelinesByPipelineIdWorkflowsResponses = { - 200: { +export type GetRagPipelinesByPipelineIdWorkflowsErrors = { + 403: { [key: string]: unknown } } +export type GetRagPipelinesByPipelineIdWorkflowsError + = GetRagPipelinesByPipelineIdWorkflowsErrors[keyof GetRagPipelinesByPipelineIdWorkflowsErrors] + +export type GetRagPipelinesByPipelineIdWorkflowsResponses = { + 200: WorkflowPaginationResponse +} + export type GetRagPipelinesByPipelineIdWorkflowsResponse = GetRagPipelinesByPipelineIdWorkflowsResponses[keyof GetRagPipelinesByPipelineIdWorkflowsResponses] @@ -562,12 +637,19 @@ export type GetRagPipelinesByPipelineIdWorkflowsDraftData = { url: '/rag/pipelines/{pipeline_id}/workflows/draft' } -export type GetRagPipelinesByPipelineIdWorkflowsDraftResponses = { - 200: { +export type GetRagPipelinesByPipelineIdWorkflowsDraftErrors = { + 404: { [key: string]: unknown } } +export type GetRagPipelinesByPipelineIdWorkflowsDraftError + = GetRagPipelinesByPipelineIdWorkflowsDraftErrors[keyof GetRagPipelinesByPipelineIdWorkflowsDraftErrors] + +export type GetRagPipelinesByPipelineIdWorkflowsDraftResponses = { + 200: WorkflowResponse +} + export type GetRagPipelinesByPipelineIdWorkflowsDraftResponse = GetRagPipelinesByPipelineIdWorkflowsDraftResponses[keyof GetRagPipelinesByPipelineIdWorkflowsDraftResponses] @@ -946,9 +1028,7 @@ export type GetRagPipelinesByPipelineIdWorkflowsPublishData = { } export type GetRagPipelinesByPipelineIdWorkflowsPublishResponses = { - 200: { - [key: string]: unknown - } + 200: WorkflowResponse } export type GetRagPipelinesByPipelineIdWorkflowsPublishResponse @@ -1094,10 +1174,23 @@ export type PatchRagPipelinesByPipelineIdWorkflowsByWorkflowIdData = { url: '/rag/pipelines/{pipeline_id}/workflows/{workflow_id}' } -export type PatchRagPipelinesByPipelineIdWorkflowsByWorkflowIdResponses = { - 200: { +export type PatchRagPipelinesByPipelineIdWorkflowsByWorkflowIdErrors = { + 400: { [key: string]: unknown } + 403: { + [key: string]: unknown + } + 404: { + [key: string]: unknown + } +} + +export type PatchRagPipelinesByPipelineIdWorkflowsByWorkflowIdError + = PatchRagPipelinesByPipelineIdWorkflowsByWorkflowIdErrors[keyof PatchRagPipelinesByPipelineIdWorkflowsByWorkflowIdErrors] + +export type PatchRagPipelinesByPipelineIdWorkflowsByWorkflowIdResponses = { + 200: WorkflowResponse } export type PatchRagPipelinesByPipelineIdWorkflowsByWorkflowIdResponse diff --git a/packages/contracts/generated/api/console/rag/zod.gen.ts b/packages/contracts/generated/api/console/rag/zod.gen.ts index 24d9b30139..6832c6eb09 100644 --- a/packages/contracts/generated/api/console/rag/zod.gen.ts +++ b/packages/contracts/generated/api/console/rag/zod.gen.ts @@ -200,6 +200,79 @@ export const zWorkflowRunNodeExecutionListResponse = z.object({ data: z.array(zWorkflowRunNodeExecutionResponse), }) +/** + * WorkflowConversationVariableResponse + */ +export const zWorkflowConversationVariableResponse = z.object({ + description: z.string(), + id: z.string(), + name: z.string(), + value: z.record(z.string(), z.unknown()), + value_type: z.string(), +}) + +/** + * WorkflowEnvironmentVariableResponse + */ +export const zWorkflowEnvironmentVariableResponse = z.object({ + description: z.string(), + id: z.string(), + name: z.string(), + value: z.record(z.string(), z.unknown()), + value_type: z.string(), +}) + +/** + * PipelineVariableResponse + */ +export const zPipelineVariableResponse = z.object({ + allowed_file_extensions: z.array(z.string()).nullish(), + allowed_file_types: z.array(z.string()).nullish(), + allowed_file_upload_methods: z.array(z.string()).nullish(), + belong_to_node_id: z.string(), + default_value: z.record(z.string(), z.unknown()).optional(), + label: z.string(), + max_length: z.int().nullish(), + options: z.array(z.string()).nullish(), + placeholder: z.string().nullish(), + required: z.boolean(), + tooltips: z.string().nullish(), + type: z.string(), + unit: z.string().nullish(), + variable: z.string(), +}) + +/** + * WorkflowResponse + */ +export const zWorkflowResponse = z.object({ + conversation_variables: z.array(zWorkflowConversationVariableResponse), + created_at: z.int(), + created_by: zSimpleAccount.optional(), + environment_variables: z.array(zWorkflowEnvironmentVariableResponse), + features: z.record(z.string(), z.unknown()), + graph: z.record(z.string(), z.unknown()), + hash: z.string(), + id: z.string(), + marked_comment: z.string(), + marked_name: z.string(), + rag_pipeline_variables: z.array(zPipelineVariableResponse), + tool_published: z.boolean(), + updated_at: z.int(), + updated_by: zSimpleAccount.optional(), + version: z.string(), +}) + +/** + * WorkflowPaginationResponse + */ +export const zWorkflowPaginationResponse = z.object({ + has_more: z.boolean(), + items: z.array(zWorkflowResponse), + limit: z.int(), + page: z.int(), +}) + export const zDeleteRagPipelineCustomizedTemplatesByTemplateIdPath = z.object({ template_id: z.string(), }) @@ -383,9 +456,9 @@ export const zGetRagPipelinesByPipelineIdWorkflowsPath = z.object({ }) /** - * Success + * Published workflows retrieved successfully */ -export const zGetRagPipelinesByPipelineIdWorkflowsResponse = z.record(z.string(), z.unknown()) +export const zGetRagPipelinesByPipelineIdWorkflowsResponse = zWorkflowPaginationResponse export const zGetRagPipelinesByPipelineIdWorkflowsDefaultWorkflowBlockConfigsPath = z.object({ pipeline_id: z.string(), @@ -416,9 +489,9 @@ export const zGetRagPipelinesByPipelineIdWorkflowsDraftPath = z.object({ }) /** - * Success + * Draft workflow retrieved successfully */ -export const zGetRagPipelinesByPipelineIdWorkflowsDraftResponse = z.record(z.string(), z.unknown()) +export const zGetRagPipelinesByPipelineIdWorkflowsDraftResponse = zWorkflowResponse export const zPostRagPipelinesByPipelineIdWorkflowsDraftPath = z.object({ pipeline_id: z.string(), @@ -677,12 +750,9 @@ export const zGetRagPipelinesByPipelineIdWorkflowsPublishPath = z.object({ }) /** - * Success + * Published workflow retrieved successfully, or null if not exist */ -export const zGetRagPipelinesByPipelineIdWorkflowsPublishResponse = z.record( - z.string(), - z.unknown(), -) +export const zGetRagPipelinesByPipelineIdWorkflowsPublishResponse = zWorkflowResponse export const zPostRagPipelinesByPipelineIdWorkflowsPublishPath = z.object({ pipeline_id: z.string(), @@ -781,12 +851,9 @@ export const zPatchRagPipelinesByPipelineIdWorkflowsByWorkflowIdPath = z.object( }) /** - * Success + * Workflow updated successfully */ -export const zPatchRagPipelinesByPipelineIdWorkflowsByWorkflowIdResponse = z.record( - z.string(), - z.unknown(), -) +export const zPatchRagPipelinesByPipelineIdWorkflowsByWorkflowIdResponse = zWorkflowResponse export const zPostRagPipelinesByPipelineIdWorkflowsByWorkflowIdRestorePath = z.object({ pipeline_id: z.string(), diff --git a/web/app/components/rag-pipeline/hooks/__tests__/use-pipeline-config.spec.ts b/web/app/components/rag-pipeline/hooks/__tests__/use-pipeline-config.spec.ts index 412fd299d1..a57f255c6f 100644 --- a/web/app/components/rag-pipeline/hooks/__tests__/use-pipeline-config.spec.ts +++ b/web/app/components/rag-pipeline/hooks/__tests__/use-pipeline-config.spec.ts @@ -175,7 +175,7 @@ describe('usePipelineConfig', () => { expect(mockSetPublishedAt).toHaveBeenCalledWith('2024-01-01T00:00:00Z') }) - it('should handle undefined workflow response', () => { + it('should reset published at when workflow response is empty', () => { let capturedCallback: ((data: unknown) => void) | undefined mockUseWorkflowConfig.mockImplementation((url: string, callback: (data: unknown) => void) => { if (url.includes('/publish')) { @@ -187,7 +187,7 @@ describe('usePipelineConfig', () => { capturedCallback?.(undefined) - expect(mockSetPublishedAt).toHaveBeenCalledWith(undefined) + expect(mockSetPublishedAt).toHaveBeenCalledWith(0) }) }) diff --git a/web/app/components/rag-pipeline/hooks/use-pipeline-config.ts b/web/app/components/rag-pipeline/hooks/use-pipeline-config.ts index 1d1b214624..b1871248fc 100644 --- a/web/app/components/rag-pipeline/hooks/use-pipeline-config.ts +++ b/web/app/components/rag-pipeline/hooks/use-pipeline-config.ts @@ -33,10 +33,10 @@ export const usePipelineConfig = () => { handleUpdateNodesDefaultConfigs, ) - const handleUpdatePublishedAt = useCallback((publishedWorkflow: FetchWorkflowDraftResponse) => { + const handleUpdatePublishedAt = useCallback((publishedWorkflow: FetchWorkflowDraftResponse | null) => { const { setPublishedAt } = workflowStore.getState() - setPublishedAt(publishedWorkflow?.created_at) + setPublishedAt(publishedWorkflow?.created_at ?? 0) }, [workflowStore]) useWorkflowConfig( pipelineId ? `/rag/pipelines/${pipelineId}/workflows/publish` : '', diff --git a/web/app/components/workflow-app/hooks/use-workflow-init.ts b/web/app/components/workflow-app/hooks/use-workflow-init.ts index 00bff2919f..548407257e 100644 --- a/web/app/components/workflow-app/hooks/use-workflow-init.ts +++ b/web/app/components/workflow-app/hooks/use-workflow-init.ts @@ -124,7 +124,7 @@ export const useWorkflowInit = () => { return acc }, {} as Record), }) - workflowStore.getState().setPublishedAt(publishedWorkflow?.created_at) + workflowStore.getState().setPublishedAt(publishedWorkflow?.created_at ?? 0) const graph = publishedWorkflow?.graph workflowStore.getState().setLastPublishedHasUserInput( hasConnectedUserInput(graph?.nodes, graph?.edges), diff --git a/web/service/use-pipeline.ts b/web/service/use-pipeline.ts index 7e1bd9711f..70055fc718 100644 --- a/web/service/use-pipeline.ts +++ b/web/service/use-pipeline.ts @@ -195,10 +195,10 @@ export const useInvalidDataSourceList = () => { export const publishedPipelineInfoQueryKeyPrefix = [NAME_SPACE, 'published-pipeline'] export const usePublishedPipelineInfo = (pipelineId: string) => { - return useQuery({ + return useQuery({ queryKey: [...publishedPipelineInfoQueryKeyPrefix, pipelineId], queryFn: () => { - return get(`/rag/pipelines/${pipelineId}/workflows/publish`) + return get(`/rag/pipelines/${pipelineId}/workflows/publish`) }, enabled: !!pipelineId, }) diff --git a/web/service/use-workflow.ts b/web/service/use-workflow.ts index 103e3b77b0..b9620aae40 100644 --- a/web/service/use-workflow.ts +++ b/web/service/use-workflow.ts @@ -19,10 +19,10 @@ import { getFlowPrefix } from './utils' const NAME_SPACE = 'workflow' export const useAppWorkflow = (appID: string) => { - return useQuery({ + return useQuery({ enabled: !!appID, queryKey: [NAME_SPACE, 'publish', appID], - queryFn: () => get(`/apps/${appID}/workflows/publish`), + queryFn: () => get(`/apps/${appID}/workflows/publish`), }) } diff --git a/web/service/workflow.ts b/web/service/workflow.ts index 2db8dffe53..3ecbf82637 100644 --- a/web/service/workflow.ts +++ b/web/service/workflow.ts @@ -43,7 +43,7 @@ export const getLoopSingleNodeRunUrl = (flowType: FlowType, isChatFlow: boolean, } export const fetchPublishedWorkflow = (url: string) => { - return get(url) + return get(url) } export const stopWorkflowRun = (url: string) => {