import dataclasses
import json
from unittest import mock
from uuid import uuid4

from constants import HIDDEN_VALUE
from core.helper import encrypter
from core.workflow.file_reference import build_file_reference
from factories.variable_factory import build_segment
from graphon.file import File, FileTransferMethod, FileType
from graphon.variables import FloatVariable, IntegerVariable, SecretVariable, StringVariable
from graphon.variables.segments import IntegerSegment, Segment
from models.workflow import (
    Workflow,
    WorkflowDraftVariable,
    WorkflowNodeExecutionModel,
    is_system_variable_editable,
)

_LEGACY_FILE_TEMPLATE = "{{#" + ".".join(("sys", "files")) + "#}}"
_LEGACY_FILE_SELECTOR = ["sys", "files"]


def _make_draft_workflow() -> Workflow:
    """Return a draft ``Workflow`` with an empty graph, empty features and no variables."""
    return Workflow(
        tenant_id="tenant_id",
        app_id="app_id",
        type="workflow",
        version="draft",
        graph="{}",
        features="{}",
        created_by="account_id",
        environment_variables=[],
        conversation_variables=[],
    )


def _make_environment_variables() -> list:
    """Return one string, integer, secret and float variable, in that order.

    Every variable gets a fresh UUID as its id and an ``["env", <name>]`` selector.
    """
    return [
        StringVariable(name="var1", value="value1", id=str(uuid4()), selector=["env", "var1"]),
        IntegerVariable(name="var2", value=123, id=str(uuid4()), selector=["env", "var2"]),
        SecretVariable(name="var3", value="secret", id=str(uuid4()), selector=["env", "var3"]),
        FloatVariable(name="var4", value=3.14, id=str(uuid4()), selector=["env", "var4"]),
    ]


def test_environment_variables():
    """Environment variables assigned to a workflow are read back unchanged."""
    workflow = _make_draft_workflow()
    variables = _make_environment_variables()

    # Token encryption/decryption is stubbed so the test needs no real key material.
    with (
        mock.patch("core.helper.encrypter.encrypt_token", return_value="encrypted_token"),
        mock.patch("core.helper.encrypter.decrypt_token", return_value="secret"),
    ):
        # Set the environment_variables property of the Workflow instance
        workflow.environment_variables = variables

        # Get the environment_variables property and assert its value
        assert workflow.environment_variables == variables


def test_update_environment_variables():
    """Renaming a secret while submitting ``HIDDEN_VALUE`` keeps the stored secret value."""
    workflow = _make_draft_workflow()
    variables = _make_environment_variables()
    secret_variable = variables[2]
    # Snapshot taken before ``variables`` is mutated further down.
    expected_initial = list(variables)

    with (
        mock.patch("core.helper.encrypter.encrypt_token", return_value="encrypted_token"),
        mock.patch("core.helper.encrypter.decrypt_token", return_value="secret"),
    ):
        # Set the environment_variables property of the Workflow instance
        workflow.environment_variables = variables
        assert workflow.environment_variables == expected_initial

        # Update the name of the secret variable and keep the value as it is
        variables[2] = secret_variable.model_copy(
            update={
                "name": "new name",
                "value": HIDDEN_VALUE,
            }
        )

        workflow.environment_variables = variables
        assert workflow.environment_variables[2].name == "new name"
        assert workflow.environment_variables[2].value == secret_variable.value


def test_to_dict():
    """``to_dict`` blanks secret values unless ``include_secret=True`` is passed."""
    workflow = _make_draft_workflow()

    with (
        mock.patch("core.helper.encrypter.encrypt_token", return_value="encrypted_token"),
        mock.patch("core.helper.encrypter.decrypt_token", return_value="secret"),
    ):
        # Set the environment_variables property of the Workflow instance
        workflow.environment_variables = [
            SecretVariable(name="secret", value="secret", id=str(uuid4())),
            StringVariable(name="text", value="text", id=str(uuid4())),
        ]

        workflow_dict = workflow.to_dict()
        assert workflow_dict["environment_variables"][0]["value"] == ""
        assert workflow_dict["environment_variables"][1]["value"] == "text"

        workflow_dict = workflow.to_dict(include_secret=True)
        assert workflow_dict["environment_variables"][0]["value"] == "secret"
        assert workflow_dict["environment_variables"][1]["value"] == "text"


def test_normalize_environment_variable_mappings_converts_full_mask_to_hidden_value():
    """A secret whose value is the full mask token is normalized to ``HIDDEN_VALUE``."""
    normalized = Workflow.normalize_environment_variable_mappings(
        [
            {
                "id": str(uuid4()),
                "name": "secret",
                "value": encrypter.full_mask_token(),
                "value_type": "secret",
            }
        ]
    )

    assert normalized[0]["value"] == HIDDEN_VALUE


def test_normalize_environment_variable_mappings_keeps_hidden_value():
    """A secret that already carries ``HIDDEN_VALUE`` is left as ``HIDDEN_VALUE``."""
    normalized = Workflow.normalize_environment_variable_mappings(
        [
            {
                "id": str(uuid4()),
                "name": "secret",
                "value": HIDDEN_VALUE,
                "value_type": "secret",
            }
        ]
    )

    assert normalized[0]["value"] == HIDDEN_VALUE


class TestWorkflowNodeExecution:
    """Tests for ``WorkflowNodeExecutionModel``."""

    def test_execution_metadata_dict(self):
        """``execution_metadata_dict`` is ``{}`` for ``None`` and the decoded JSON otherwise."""
        node_exec = WorkflowNodeExecutionModel()
        node_exec.execution_metadata = None
        assert node_exec.execution_metadata_dict == {}

        original = {"a": 1, "b": ["2"]}
        node_exec.execution_metadata = json.dumps(original)
        assert node_exec.execution_metadata_dict == original


class TestIsSystemVariableEditable:
    """Tests for ``is_system_variable_editable``."""

    def test_is_system_variable(self):
        """Only ``query`` and ``files`` are editable; other and unknown names are not."""
        cases = [
            ("query", True),
            ("files", True),
            ("dialogue_count", False),
            ("conversation_id", False),
            ("user_id", False),
            ("app_id", False),
            ("workflow_id", False),
            ("workflow_run_id", False),
        ]
        for name, editable in cases:
            assert editable == is_system_variable_editable(name), f"unexpected editability for {name!r}"

        assert is_system_variable_editable("invalid_or_new_system_variable") is False


class TestWorkflowLegacySysFilesCompatibility:
    """Tests for how ``Workflow.graph_dict`` rewrites legacy ``sys.files`` references."""

    def _make_workflow(self, graph: dict, *, features: dict | None = None) -> Workflow:
        """Return a draft workflow whose graph and features are the JSON-encoded arguments."""
        return Workflow(
            tenant_id="tenant_id",
            app_id="app_id",
            type="workflow",
            version="draft",
            graph=json.dumps(graph),
            features=json.dumps(features or {}),
            created_by="account_id",
            environment_variables=[],
            conversation_variables=[],
        )

    def test_graph_dict_rewrites_legacy_sys_files_references_to_start_variable(self):
        """Legacy references become ``start.sys_files`` in both the returned and stored graph."""
        workflow = self._make_workflow(
            {
                "nodes": [
                    {
                        "id": "start",
                        "data": {
                            "type": "start",
                            "title": "Start",
                            "variables": [],
                        },
                    },
                    {
                        "id": "llm",
                        "data": {
                            "type": "llm",
                            "prompt_template": [{"role": "user", "text": f"files: {_LEGACY_FILE_TEMPLATE}"}],
                            "context": {"variable_selector": _LEGACY_FILE_SELECTOR},
                        },
                    },
                ],
                "edges": [],
            }
        )

        graph = workflow.graph_dict

        start_node = next(node for node in graph["nodes"] if node["id"] == "start")
        llm_node = next(node for node in graph["nodes"] if node["id"] == "llm")
        compat_variable = start_node["data"]["variables"][0]

        assert compat_variable["variable"] == "sys_files"
        assert compat_variable["type"] == "file-list"
        assert compat_variable["required"] is False
        assert llm_node["data"]["prompt_template"][0]["text"] == "files: {{#start.sys_files#}}"
        assert llm_node["data"]["context"]["variable_selector"] == ["start", "sys_files"]

        # Reading graph_dict is expected to have rewritten the stored JSON graph as well.
        stored_graph = json.loads(workflow.graph)
        stored_llm_node = next(node for node in stored_graph["nodes"] if node["id"] == "llm")
        assert stored_llm_node["data"]["prompt_template"][0]["text"] == "files: {{#start.sys_files#}}"
        assert stored_llm_node["data"]["context"]["variable_selector"] == ["start", "sys_files"]

    def test_graph_dict_uses_unique_compat_variable_name(self):
        """If ``sys_files`` is already a start variable, the compat variable is ``sys_files_1``."""
        workflow = self._make_workflow(
            {
                "nodes": [
                    {
                        "id": "start",
                        "data": {
                            "type": "start",
                            "title": "Start",
                            "variables": [
                                {"variable": "sys_files", "label": "Existing", "type": "text-input"},
                            ],
                        },
                    },
                    {
                        "id": "answer",
                        "data": {
                            "type": "answer",
                            "answer": _LEGACY_FILE_TEMPLATE,
                        },
                    },
                ],
                "edges": [],
            }
        )

        graph = workflow.graph_dict

        start_node = next(node for node in graph["nodes"] if node["id"] == "start")
        answer_node = next(node for node in graph["nodes"] if node["id"] == "answer")

        assert [variable["variable"] for variable in start_node["data"]["variables"]] == [
            "sys_files",
            "sys_files_1",
        ]
        assert start_node["data"]["variables"][1]["type"] == "file-list"
        assert answer_node["data"]["answer"] == "{{#start.sys_files_1#}}"

    def test_graph_dict_copies_file_upload_settings_to_compat_variable(self):
        """``file_upload`` feature settings are copied onto the generated compat variable."""
        workflow = self._make_workflow(
            {
                "nodes": [
                    {
                        "id": "start",
                        "data": {
                            "type": "start",
                            "title": "Start",
                            "variables": [],
                        },
                    },
                    {
                        "id": "answer",
                        "data": {
                            "type": "answer",
                            "answer": _LEGACY_FILE_TEMPLATE,
                        },
                    },
                ],
                "edges": [],
            },
            features={
                "file_upload": {
                    "enabled": True,
                    "allowed_file_upload_methods": ["remote_url"],
                    "allowed_file_types": ["document", "custom"],
                    "allowed_file_extensions": [".pdf"],
                    "number_limits": 8,
                }
            },
        )

        graph = workflow.graph_dict

        start_node = next(node for node in graph["nodes"] if node["id"] == "start")
        compat_variable = start_node["data"]["variables"][0]

        assert compat_variable["allowed_file_upload_methods"] == ["remote_url"]
        assert compat_variable["allowed_file_types"] == ["document", "custom"]
        assert compat_variable["allowed_file_extensions"] == [".pdf"]
        # The feature's ``number_limits`` is expected to surface as the variable's ``max_length``.
        assert compat_variable["max_length"] == 8


class TestWorkflowDraftVariableGetValue:
    """Round-trip tests for ``WorkflowDraftVariable.set_value`` / ``get_value``."""

    def test_get_value_by_case(self):
        """Each segment kind built by ``build_segment`` survives a set/get round trip."""

        @dataclasses.dataclass
        class TestCase:
            name: str
            value: Segment

        test_file = File(
            file_type=FileType.IMAGE,
            transfer_method=FileTransferMethod.REMOTE_URL,
            remote_url="https://example.com/example.jpg",
            filename="example.jpg",
            extension=".jpg",
            mime_type="image/jpeg",
            size=100,
        )
        cases: list[TestCase] = [
            TestCase(
                name="number/int",
                value=build_segment(1),
            ),
            TestCase(
                name="number/float",
                value=build_segment(1.0),
            ),
            TestCase(
                name="string",
                value=build_segment("a"),
            ),
            TestCase(
                name="object",
                value=build_segment({}),
            ),
            TestCase(
                name="file",
                value=build_segment(test_file),
            ),
            TestCase(
                name="array[any]",
                value=build_segment([1, "a"]),
            ),
            TestCase(
                name="array[string]",
                value=build_segment(["a", "b"]),
            ),
            TestCase(
                name="array[number]/int",
                value=build_segment([1, 2]),
            ),
            TestCase(
                name="array[number]/float",
                value=build_segment([1.0, 2.0]),
            ),
            TestCase(
                name="array[number]/mixed",
                value=build_segment([1, 2.0]),
            ),
            TestCase(
                name="array[object]",
                value=build_segment([{}, {"a": 1}]),
            ),
            TestCase(
                name="none",
                value=build_segment(None),
            ),
        ]

        for idx, c in enumerate(cases, 1):
            fail_msg = f"test case {c.name} failed, index={idx}"
            draft_var = WorkflowDraftVariable()
            draft_var.set_value(c.value)
            assert c.value == draft_var.get_value(), fail_msg

    def test_file_variable_preserves_all_fields(self):
        """Test that File type variables preserve all fields during encoding/decoding."""
        # Create a File with specific field values
        test_file = File(
            file_id="test_file_id",
            file_type=FileType.IMAGE,
            transfer_method=FileTransferMethod.REMOTE_URL,
            remote_url="https://example.com/test.jpg",
            filename="test.jpg",
            extension=".jpg",
            mime_type="image/jpeg",
            size=12345,  # Specific size to test preservation
            storage_key="test_storage_key",
        )

        # Create a FileSegment and WorkflowDraftVariable
        file_segment = build_segment(test_file)
        draft_var = WorkflowDraftVariable()
        draft_var.set_value(file_segment)

        # Retrieve the value and verify all fields are preserved
        retrieved_segment = draft_var.get_value()
        retrieved_file = retrieved_segment.value

        # Verify all important fields are preserved
        assert retrieved_file.id == test_file.id
        assert retrieved_file.type == test_file.type
        assert retrieved_file.transfer_method == test_file.transfer_method
        assert retrieved_file.remote_url == test_file.remote_url
        assert retrieved_file.filename == test_file.filename
        assert retrieved_file.extension == test_file.extension
        assert retrieved_file.mime_type == test_file.mime_type
        assert retrieved_file.size == test_file.size  # This was the main issue being fixed
        # Note: storage_key is not serialized in model_dump() so it won't be preserved

        # Verify the segments have the same type and the important fields match
        assert file_segment.value_type == retrieved_segment.value_type

    def test_file_variable_rebuilds_storage_backed_payloads_with_app_tenant(self):
        """``get_value`` rebuilds a stored file using the tenant id resolved for the app."""
        persisted_file = File(
            file_id="test_file_id",
            file_type=FileType.DOCUMENT,
            transfer_method=FileTransferMethod.LOCAL_FILE,
            reference=build_file_reference(record_id="upload-1", storage_key="legacy-storage-key"),
            filename="test.txt",
            extension=".txt",
            mime_type="text/plain",
            size=12,
        )
        rebuilt_file = File(
            file_id="test_file_id",
            file_type=FileType.DOCUMENT,
            transfer_method=FileTransferMethod.LOCAL_FILE,
            reference=build_file_reference(record_id="upload-1"),
            filename="test.txt",
            extension=".txt",
            mime_type="text/plain",
            size=12,
            storage_key="canonical-storage-key",
        )

        draft_var = WorkflowDraftVariable()
        draft_var.app_id = "app-1"
        draft_var.set_value(build_segment(persisted_file))
        # NOTE(review): presumably clears an in-memory cached value so that get_value()
        # has to rebuild from the persisted payload - confirm against the model.
        draft_var._WorkflowDraftVariable__value = None

        with (
            mock.patch("models.workflow._resolve_workflow_app_tenant_id", return_value="tenant-1"),
            mock.patch("models.workflow.build_file_from_stored_mapping", return_value=rebuilt_file) as rebuild_file,
        ):
            retrieved_segment = draft_var.get_value()

        assert retrieved_segment.value == rebuilt_file
        rebuild_file.assert_called_once()
        assert rebuild_file.call_args.kwargs["tenant_id"] == "tenant-1"

    def test_get_and_set_value(self):
        """An ``IntegerSegment`` stored with ``set_value`` is returned equal by ``get_value``."""
        draft_var = WorkflowDraftVariable()
        int_var = IntegerSegment(value=1)
        draft_var.set_value(int_var)
        value = draft_var.get_value()
        assert value == int_var