From 2ea85d3ba2ac42cf6281533898c12222850ee0b2 Mon Sep 17 00:00:00 2001 From: tmimmanuel <14046872+tmimmanuel@users.noreply.github.com> Date: Thu, 26 Mar 2026 21:34:44 +0100 Subject: [PATCH] refactor: use EnumText for model_type and WorkflowNodeExecution.status (#34093) Co-authored-by: Krishna Chaitanya --- .../ops/arize_phoenix_trace/arize_phoenix_trace.py | 5 +++-- .../logstore_api_workflow_node_execution_repository.py | 2 +- api/models/workflow.py | 10 ++++++++-- api/tasks/workflow_node_execution_tasks.py | 4 ++-- 4 files changed, 14 insertions(+), 7 deletions(-) diff --git a/api/core/ops/arize_phoenix_trace/arize_phoenix_trace.py b/api/core/ops/arize_phoenix_trace/arize_phoenix_trace.py index e354c3909a..724127c31c 100644 --- a/api/core/ops/arize_phoenix_trace/arize_phoenix_trace.py +++ b/api/core/ops/arize_phoenix_trace/arize_phoenix_trace.py @@ -39,6 +39,7 @@ from core.ops.entities.trace_entity import ( ) from core.repositories import DifyCoreRepositoryFactory from extensions.ext_database import db +from graphon.enums import WorkflowNodeExecutionStatus from models.model import EndUser, MessageFile from models.workflow import WorkflowNodeExecutionTriggeredFrom @@ -300,7 +301,7 @@ class ArizePhoenixDataTrace(BaseTraceInstance): "app_name": node_execution.title, "status": node_execution.status, "status_message": node_execution.error or "", - "level": "ERROR" if node_execution.status == "failed" else "DEFAULT", + "level": "ERROR" if node_execution.status == WorkflowNodeExecutionStatus.FAILED else "DEFAULT", } ) @@ -361,7 +362,7 @@ class ArizePhoenixDataTrace(BaseTraceInstance): llm_attributes.update(self._construct_llm_attributes(process_data.get("prompts", []))) node_span.set_attributes(llm_attributes) finally: - if node_execution.status == "failed": + if node_execution.status == WorkflowNodeExecutionStatus.FAILED: set_span_status(node_span, node_execution.error) else: set_span_status(node_span) diff --git a/api/extensions/logstore/repositories/logstore_api_workflow_node_execution_repository.py b/api/extensions/logstore/repositories/logstore_api_workflow_node_execution_repository.py index bdfa984874..64ff0f0674 100644 --- a/api/extensions/logstore/repositories/logstore_api_workflow_node_execution_repository.py +++ b/api/extensions/logstore/repositories/logstore_api_workflow_node_execution_repository.py @@ -60,7 +60,7 @@ def _dict_to_workflow_node_execution_model(data: dict[str, Any]) -> WorkflowNode model.triggered_from = WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN model.node_id = data.get("node_id") or "" model.node_type = data.get("node_type") or "" - model.status = data.get("status") or "running" # Default status if missing + model.status = WorkflowNodeExecutionStatus(data.get("status") or "running") model.title = data.get("title") or "" created_by_role_val = data.get("created_by_role") try: diff --git a/api/models/workflow.py b/api/models/workflow.py index d15bf71d39..0557e2e890 100644 --- a/api/models/workflow.py +++ b/api/models/workflow.py @@ -33,7 +33,13 @@ from extensions.ext_storage import Storage from factories.variable_factory import TypeMismatchError, build_segment_with_type from graphon.entities.graph_config import NodeConfigDict, NodeConfigDictAdapter from graphon.entities.pause_reason import HumanInputRequired, PauseReason, PauseReasonType, SchedulingPause -from graphon.enums import BuiltinNodeTypes, NodeType, WorkflowExecutionStatus, WorkflowNodeExecutionMetadataKey +from graphon.enums import ( + BuiltinNodeTypes, + NodeType, + WorkflowExecutionStatus, + WorkflowNodeExecutionMetadataKey, + WorkflowNodeExecutionStatus, +) from graphon.file.constants import maybe_file_object from graphon.file.models import File from graphon.variables import utils as variable_utils @@ -941,7 +947,7 @@ class WorkflowNodeExecutionModel(Base): # This model is expected to have `offlo inputs: Mapped[str | None] = mapped_column(LongText) process_data: Mapped[str | None] = mapped_column(LongText) outputs: Mapped[str | None] = mapped_column(LongText) - status: Mapped[str] = mapped_column(String(255)) + status: Mapped[WorkflowNodeExecutionStatus] = mapped_column(EnumText(WorkflowNodeExecutionStatus, length=255)) error: Mapped[str | None] = mapped_column(LongText) elapsed_time: Mapped[float] = mapped_column(sa.Float, server_default=sa.text("0")) execution_metadata: Mapped[str | None] = mapped_column(LongText) diff --git a/api/tasks/workflow_node_execution_tasks.py b/api/tasks/workflow_node_execution_tasks.py index a0fd739325..b823ce3961 100644 --- a/api/tasks/workflow_node_execution_tasks.py +++ b/api/tasks/workflow_node_execution_tasks.py @@ -125,7 +125,7 @@ def _create_node_execution_from_domain( else: node_execution.execution_metadata = "{}" - node_execution.status = execution.status.value + node_execution.status = execution.status node_execution.error = execution.error node_execution.elapsed_time = execution.elapsed_time node_execution.created_by_role = creator_user_role @@ -159,7 +159,7 @@ def _update_node_execution_from_domain(node_execution: WorkflowNodeExecutionMode node_execution.execution_metadata = "{}" # Update other fields - node_execution.status = execution.status.value + node_execution.status = execution.status node_execution.error = execution.error node_execution.elapsed_time = execution.elapsed_time node_execution.finished_at = execution.finished_at