test: migrate workflow service tests to testcontainers (#34206)

This commit is contained in:
YBoy
2026-03-30 00:50:21 +03:00
committed by GitHub
parent fe9c2b0e4b
commit b36b077d42
2 changed files with 118 additions and 415 deletions

View File

@ -555,6 +555,124 @@ class TestWorkflowService:
assert len(result_workflows) == 2
assert all(wf.marked_name for wf in result_workflows)
def test_get_all_published_workflow_no_workflow_id(self, db_session_with_containers: Session):
"""Test that an app with no workflow_id returns empty results."""
# Arrange
fake = Faker()
app = self._create_test_app(db_session_with_containers, fake)
app.workflow_id = None
db_session_with_containers.commit()
workflow_service = WorkflowService()
# Act
result_workflows, has_more = workflow_service.get_all_published_workflow(
session=db_session_with_containers, app_model=app, page=1, limit=10, user_id=None
)
# Assert
assert result_workflows == []
assert has_more is False
def test_get_all_published_workflow_basic(self, db_session_with_containers: Session):
"""Test basic retrieval of published workflows."""
# Arrange
fake = Faker()
account = self._create_test_account(db_session_with_containers, fake)
app = self._create_test_app(db_session_with_containers, fake)
workflow1 = self._create_test_workflow(db_session_with_containers, app, account, fake)
workflow1.version = "2024.01.01.001"
workflow2 = self._create_test_workflow(db_session_with_containers, app, account, fake)
workflow2.version = "2024.01.02.001"
app.workflow_id = workflow1.id
db_session_with_containers.commit()
workflow_service = WorkflowService()
# Act
result_workflows, has_more = workflow_service.get_all_published_workflow(
session=db_session_with_containers, app_model=app, page=1, limit=10, user_id=None
)
# Assert
assert len(result_workflows) == 2
assert has_more is False
def test_get_all_published_workflow_combined_filters(self, db_session_with_containers: Session):
"""Test combined user_id and named_only filters."""
# Arrange
fake = Faker()
account1 = self._create_test_account(db_session_with_containers, fake)
account2 = self._create_test_account(db_session_with_containers, fake)
app = self._create_test_app(db_session_with_containers, fake)
# account1 named
wf1 = self._create_test_workflow(db_session_with_containers, app, account1, fake)
wf1.version = "2024.01.01.001"
wf1.marked_name = "Named by user1"
wf1.created_by = account1.id
# account1 unnamed
wf2 = self._create_test_workflow(db_session_with_containers, app, account1, fake)
wf2.version = "2024.01.02.001"
wf2.marked_name = ""
wf2.created_by = account1.id
# account2 named
wf3 = self._create_test_workflow(db_session_with_containers, app, account2, fake)
wf3.version = "2024.01.03.001"
wf3.marked_name = "Named by user2"
wf3.created_by = account2.id
app.workflow_id = wf1.id
db_session_with_containers.commit()
workflow_service = WorkflowService()
# Act - Filter by account1 + named_only
result_workflows, has_more = workflow_service.get_all_published_workflow(
session=db_session_with_containers,
app_model=app,
page=1,
limit=10,
user_id=account1.id,
named_only=True,
)
# Assert - Only wf1 matches (account1 + named)
assert len(result_workflows) == 1
assert result_workflows[0].marked_name == "Named by user1"
assert result_workflows[0].created_by == account1.id
def test_get_all_published_workflow_empty_result(self, db_session_with_containers: Session):
"""Test that querying with no matching workflows returns empty."""
# Arrange
fake = Faker()
account = self._create_test_account(db_session_with_containers, fake)
app = self._create_test_app(db_session_with_containers, fake)
# Create a draft workflow (no version set = draft)
workflow = self._create_test_workflow(db_session_with_containers, app, account, fake)
app.workflow_id = workflow.id
db_session_with_containers.commit()
workflow_service = WorkflowService()
# Act - Filter by a user that has no workflows
result_workflows, has_more = workflow_service.get_all_published_workflow(
session=db_session_with_containers,
app_model=app,
page=1,
limit=10,
user_id="00000000-0000-0000-0000-000000000000",
)
# Assert
assert result_workflows == []
assert has_more is False
def test_sync_draft_workflow_create_new(self, db_session_with_containers: Session):
"""
Test creating a new draft workflow through sync operation.

View File

@ -1,415 +0,0 @@
from contextlib import nullcontext
from types import SimpleNamespace
from unittest.mock import MagicMock
import pytest
from graphon.entities.graph_config import NodeConfigDictAdapter
from graphon.enums import BuiltinNodeTypes
from graphon.nodes.human_input.entities import FormInput, HumanInputNodeData, UserAction
from graphon.nodes.human_input.enums import FormInputType
from models.model import App
from models.workflow import Workflow
from services import workflow_service as workflow_service_module
from services.workflow_service import WorkflowService
class TestWorkflowService:
@pytest.fixture
def workflow_service(self):
mock_session_maker = MagicMock()
return WorkflowService(mock_session_maker)
@pytest.fixture
def mock_app(self):
app = MagicMock(spec=App)
app.id = "app-id-1"
app.workflow_id = "workflow-id-1"
app.tenant_id = "tenant-id-1"
return app
@pytest.fixture
def mock_workflows(self):
workflows = []
for i in range(5):
workflow = MagicMock(spec=Workflow)
workflow.id = f"workflow-id-{i}"
workflow.app_id = "app-id-1"
workflow.created_at = f"2023-01-0{5 - i}" # Descending date order
workflow.created_by = "user-id-1" if i % 2 == 0 else "user-id-2"
workflow.marked_name = f"Workflow {i}" if i % 2 == 0 else ""
workflows.append(workflow)
return workflows
@pytest.fixture
def dummy_session_cls(self):
class DummySession:
def __init__(self, *args, **kwargs):
self.commit = MagicMock()
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return False
def begin(self):
return nullcontext()
return DummySession
def test_get_all_published_workflow_no_workflow_id(self, workflow_service, mock_app):
mock_app.workflow_id = None
mock_session = MagicMock()
workflows, has_more = workflow_service.get_all_published_workflow(
session=mock_session, app_model=mock_app, page=1, limit=10, user_id=None
)
assert workflows == []
assert has_more is False
mock_session.scalars.assert_not_called()
def test_get_all_published_workflow_basic(self, workflow_service, mock_app, mock_workflows):
mock_session = MagicMock()
mock_scalar_result = MagicMock()
mock_scalar_result.all.return_value = mock_workflows[:3]
mock_session.scalars.return_value = mock_scalar_result
workflows, has_more = workflow_service.get_all_published_workflow(
session=mock_session, app_model=mock_app, page=1, limit=3, user_id=None
)
assert workflows == mock_workflows[:3]
assert has_more is False
mock_session.scalars.assert_called_once()
def test_get_all_published_workflow_pagination(self, workflow_service, mock_app, mock_workflows):
mock_session = MagicMock()
mock_scalar_result = MagicMock()
# Return 4 items when limit is 3, which should indicate has_more=True
mock_scalar_result.all.return_value = mock_workflows[:4]
mock_session.scalars.return_value = mock_scalar_result
workflows, has_more = workflow_service.get_all_published_workflow(
session=mock_session, app_model=mock_app, page=1, limit=3, user_id=None
)
# Should return only the first 3 items
assert len(workflows) == 3
assert workflows == mock_workflows[:3]
assert has_more is True
# Test page 2
mock_scalar_result.all.return_value = mock_workflows[3:]
mock_session.scalars.return_value = mock_scalar_result
workflows, has_more = workflow_service.get_all_published_workflow(
session=mock_session, app_model=mock_app, page=2, limit=3, user_id=None
)
assert len(workflows) == 2
assert has_more is False
def test_get_all_published_workflow_user_filter(self, workflow_service, mock_app, mock_workflows):
mock_session = MagicMock()
mock_scalar_result = MagicMock()
# Filter workflows for user-id-1
filtered_workflows = [w for w in mock_workflows if w.created_by == "user-id-1"]
mock_scalar_result.all.return_value = filtered_workflows
mock_session.scalars.return_value = mock_scalar_result
workflows, has_more = workflow_service.get_all_published_workflow(
session=mock_session, app_model=mock_app, page=1, limit=10, user_id="user-id-1"
)
assert workflows == filtered_workflows
assert has_more is False
mock_session.scalars.assert_called_once()
# Verify that the select contains a user filter clause
args = mock_session.scalars.call_args[0][0]
assert "created_by" in str(args)
def test_get_all_published_workflow_named_only(self, workflow_service, mock_app, mock_workflows):
mock_session = MagicMock()
mock_scalar_result = MagicMock()
# Filter workflows that have a marked_name
named_workflows = [w for w in mock_workflows if w.marked_name]
mock_scalar_result.all.return_value = named_workflows
mock_session.scalars.return_value = mock_scalar_result
workflows, has_more = workflow_service.get_all_published_workflow(
session=mock_session, app_model=mock_app, page=1, limit=10, user_id=None, named_only=True
)
assert workflows == named_workflows
assert has_more is False
mock_session.scalars.assert_called_once()
# Verify that the select contains a named_only filter clause
args = mock_session.scalars.call_args[0][0]
assert "marked_name !=" in str(args)
def test_get_all_published_workflow_combined_filters(self, workflow_service, mock_app, mock_workflows):
mock_session = MagicMock()
mock_scalar_result = MagicMock()
# Combined filter: user-id-1 and has marked_name
filtered_workflows = [w for w in mock_workflows if w.created_by == "user-id-1" and w.marked_name]
mock_scalar_result.all.return_value = filtered_workflows
mock_session.scalars.return_value = mock_scalar_result
workflows, has_more = workflow_service.get_all_published_workflow(
session=mock_session, app_model=mock_app, page=1, limit=10, user_id="user-id-1", named_only=True
)
assert workflows == filtered_workflows
assert has_more is False
mock_session.scalars.assert_called_once()
# Verify that both filters are applied
args = mock_session.scalars.call_args[0][0]
assert "created_by" in str(args)
assert "marked_name !=" in str(args)
def test_get_all_published_workflow_empty_result(self, workflow_service, mock_app):
mock_session = MagicMock()
mock_scalar_result = MagicMock()
mock_scalar_result.all.return_value = []
mock_session.scalars.return_value = mock_scalar_result
workflows, has_more = workflow_service.get_all_published_workflow(
session=mock_session, app_model=mock_app, page=1, limit=10, user_id=None
)
assert workflows == []
assert has_more is False
mock_session.scalars.assert_called_once()
def test_submit_human_input_form_preview_uses_rendered_content(
self,
workflow_service: WorkflowService,
monkeypatch: pytest.MonkeyPatch,
dummy_session_cls,
) -> None:
service = workflow_service
node_data = HumanInputNodeData(
title="Human Input",
form_content="<p>{{#$output.name#}}</p>",
inputs=[FormInput(type=FormInputType.TEXT_INPUT, output_variable_name="name")],
user_actions=[UserAction(id="approve", title="Approve")],
)
node = MagicMock()
node.node_data = node_data
node.render_form_content_before_submission.return_value = "<p>preview</p>"
node.render_form_content_with_outputs.return_value = "<p>rendered</p>"
service._build_human_input_variable_pool = MagicMock(return_value=MagicMock()) # type: ignore[method-assign]
service._build_human_input_node = MagicMock(return_value=node) # type: ignore[method-assign]
workflow = MagicMock()
node_config = NodeConfigDictAdapter.validate_python(
{"id": "node-1", "data": {"type": BuiltinNodeTypes.HUMAN_INPUT}}
)
workflow.get_node_config_by_id.return_value = node_config
workflow.get_enclosing_node_type_and_id.return_value = None
service.get_draft_workflow = MagicMock(return_value=workflow) # type: ignore[method-assign]
saved_outputs: dict[str, object] = {}
class DummySaver:
def __init__(self, *args, **kwargs):
pass
def save(self, outputs, process_data):
saved_outputs.update(outputs)
monkeypatch.setattr(workflow_service_module, "Session", dummy_session_cls)
monkeypatch.setattr(workflow_service_module, "DraftVariableSaver", DummySaver)
monkeypatch.setattr(workflow_service_module, "db", SimpleNamespace(engine=MagicMock()))
app_model = SimpleNamespace(id="app-1", tenant_id="tenant-1")
account = SimpleNamespace(id="account-1")
result = service.submit_human_input_form_preview(
app_model=app_model,
account=account,
node_id="node-1",
form_inputs={"name": "Ada", "extra": "ignored"},
inputs={"#node-0.result#": "LLM output"},
action="approve",
)
service._build_human_input_variable_pool.assert_called_once_with(
app_model=app_model,
workflow=workflow,
node_config=node_config,
manual_inputs={"#node-0.result#": "LLM output"},
user_id="account-1",
)
node.render_form_content_with_outputs.assert_called_once()
called_args = node.render_form_content_with_outputs.call_args.args
assert called_args[0] == "<p>preview</p>"
assert called_args[2] == node_data.outputs_field_names()
rendered_outputs = called_args[1]
assert rendered_outputs["name"] == "Ada"
assert rendered_outputs["extra"] == "ignored"
assert "extra" in saved_outputs
assert "extra" in result
assert saved_outputs["name"] == "Ada"
assert result["name"] == "Ada"
assert result["__action_id"] == "approve"
assert "__rendered_content" in result
def test_submit_human_input_form_preview_missing_inputs_message(self, workflow_service: WorkflowService) -> None:
service = workflow_service
node_data = HumanInputNodeData(
title="Human Input",
form_content="<p>{{#$output.name#}}</p>",
inputs=[FormInput(type=FormInputType.TEXT_INPUT, output_variable_name="name")],
user_actions=[UserAction(id="approve", title="Approve")],
)
node = MagicMock()
node.node_data = node_data
node._render_form_content_before_submission.return_value = "<p>preview</p>"
node._render_form_content_with_outputs.return_value = "<p>rendered</p>"
service._build_human_input_variable_pool = MagicMock(return_value=MagicMock()) # type: ignore[method-assign]
service._build_human_input_node = MagicMock(return_value=node) # type: ignore[method-assign]
workflow = MagicMock()
workflow.get_node_config_by_id.return_value = NodeConfigDictAdapter.validate_python(
{"id": "node-1", "data": {"type": BuiltinNodeTypes.HUMAN_INPUT}}
)
service.get_draft_workflow = MagicMock(return_value=workflow) # type: ignore[method-assign]
app_model = SimpleNamespace(id="app-1", tenant_id="tenant-1")
account = SimpleNamespace(id="account-1")
with pytest.raises(ValueError) as exc_info:
service.submit_human_input_form_preview(
app_model=app_model,
account=account,
node_id="node-1",
form_inputs={},
inputs={},
action="approve",
)
assert "Missing required inputs" in str(exc_info.value)
def test_run_draft_workflow_node_successful_behavior(
self, workflow_service, mock_app, monkeypatch, dummy_session_cls
):
"""Behavior: When a basic workflow node runs, it correctly sets up context,
executes the node, and saves outputs."""
service = workflow_service
account = SimpleNamespace(id="account-1")
mock_workflow = MagicMock()
mock_workflow.id = "wf-1"
mock_workflow.tenant_id = "tenant-1"
mock_workflow.environment_variables = []
mock_workflow.conversation_variables = []
# Mock node config
mock_workflow.get_node_config_by_id.return_value = NodeConfigDictAdapter.validate_python(
{"id": "node-1", "data": {"type": BuiltinNodeTypes.LLM}}
)
mock_workflow.get_enclosing_node_type_and_id.return_value = None
# Mock class methods
monkeypatch.setattr(workflow_service_module, "WorkflowDraftVariableService", MagicMock())
monkeypatch.setattr(workflow_service_module, "DraftVarLoader", MagicMock())
# Mock workflow entry execution
mock_node_exec = MagicMock()
mock_node_exec.id = "exec-1"
mock_node_exec.process_data = {}
mock_run = MagicMock()
monkeypatch.setattr(workflow_service_module.WorkflowEntry, "single_step_run", mock_run)
# Mock execution handling
service._handle_single_step_result = MagicMock(return_value=mock_node_exec)
# Mock repository
mock_repo = MagicMock()
mock_repo.get_execution_by_id.return_value = mock_node_exec
mock_repo_factory = MagicMock(return_value=mock_repo)
monkeypatch.setattr(
workflow_service_module.DifyCoreRepositoryFactory,
"create_workflow_node_execution_repository",
mock_repo_factory,
)
service._node_execution_service_repo = mock_repo
# Set up node execution service repo mock to return our exec node
mock_node_exec.load_full_outputs.return_value = {"output_var": "result_value"}
mock_node_exec.node_id = "node-1"
mock_node_exec.node_type = "llm"
# Mock draft variable saver
mock_saver = MagicMock()
monkeypatch.setattr(workflow_service_module, "DraftVariableSaver", MagicMock(return_value=mock_saver))
# Mock DB
monkeypatch.setattr(workflow_service_module, "db", SimpleNamespace(engine=MagicMock()))
monkeypatch.setattr(workflow_service_module, "Session", dummy_session_cls)
# Act
result = service.run_draft_workflow_node(
app_model=mock_app,
draft_workflow=mock_workflow,
node_id="node-1",
user_inputs={"input_val": "test"},
account=account,
)
# Assert
assert result == mock_node_exec
service._handle_single_step_result.assert_called_once()
mock_repo.save.assert_called_once_with(mock_node_exec)
mock_saver.save.assert_called_once_with(process_data={}, outputs={"output_var": "result_value"})
def test_run_draft_workflow_node_failure_behavior(self, workflow_service, mock_app, monkeypatch, dummy_session_cls):
"""Behavior: If retrieving the saved execution fails, an appropriate error bubble matches expectations."""
service = workflow_service
account = SimpleNamespace(id="account-1")
mock_workflow = MagicMock()
mock_workflow.tenant_id = "tenant-1"
mock_workflow.environment_variables = []
mock_workflow.conversation_variables = []
mock_workflow.get_node_config_by_id.return_value = NodeConfigDictAdapter.validate_python(
{"id": "node-1", "data": {"type": BuiltinNodeTypes.LLM}}
)
mock_workflow.get_enclosing_node_type_and_id.return_value = None
monkeypatch.setattr(workflow_service_module, "WorkflowDraftVariableService", MagicMock())
monkeypatch.setattr(workflow_service_module, "DraftVarLoader", MagicMock())
monkeypatch.setattr(workflow_service_module.WorkflowEntry, "single_step_run", MagicMock())
mock_node_exec = MagicMock()
mock_node_exec.id = "exec-invalid"
service._handle_single_step_result = MagicMock(return_value=mock_node_exec)
mock_repo = MagicMock()
mock_repo_factory = MagicMock(return_value=mock_repo)
monkeypatch.setattr(
workflow_service_module.DifyCoreRepositoryFactory,
"create_workflow_node_execution_repository",
mock_repo_factory,
)
service._node_execution_service_repo = mock_repo
# Simulate failure to retrieve the saved execution
mock_repo.get_execution_by_id.return_value = None
monkeypatch.setattr(workflow_service_module, "db", SimpleNamespace(engine=MagicMock()))
monkeypatch.setattr(workflow_service_module, "Session", dummy_session_cls)
# Act & Assert
with pytest.raises(ValueError, match="WorkflowNodeExecution with id exec-invalid not found after saving"):
service.run_draft_workflow_node(
app_model=mock_app, draft_workflow=mock_workflow, node_id="node-1", user_inputs={}, account=account
)