Merge branch 'main' into feat/r2

# Conflicts:
#	api/core/plugin/impl/oauth.py
#	api/core/workflow/entities/variable_pool.py
#	api/models/workflow.py
#	api/services/dataset_service.py
This commit is contained in:
jyong
2025-06-25 14:35:23 +08:00
419 changed files with 17049 additions and 3831 deletions

View File

@ -7,10 +7,16 @@ from typing import TYPE_CHECKING, Any, Optional, Union
from uuid import uuid4
from flask_login import current_user
from sqlalchemy import orm
from core.file.constants import maybe_file_object
from core.file.models import File
from core.variables import utils as variable_utils
from core.workflow.constants import CONVERSATION_VARIABLE_NODE_ID, SYSTEM_VARIABLE_NODE_ID
from factories.variable_factory import build_segment
from core.workflow.nodes.enums import NodeType
from factories.variable_factory import TypeMismatchError, build_segment_with_type
from ._workflow_exc import NodeNotFoundError, WorkflowDataError
if TYPE_CHECKING:
from models.model import AppMode
@ -73,6 +79,10 @@ class WorkflowType(Enum):
return cls.WORKFLOW if app_mode == AppMode.WORKFLOW else cls.CHAT
class _InvalidGraphDefinitionError(Exception):
pass
class Workflow(Base):
"""
Workflow, for `Workflow App` and `Chat App workflow mode`.
@ -140,6 +150,8 @@ class Workflow(Base):
"rag_pipeline_variables", db.Text, nullable=False, server_default="{}"
)
VERSION_DRAFT = "draft"
@classmethod
def new(
cls,
@ -185,8 +197,72 @@ class Workflow(Base):
@property
def graph_dict(self) -> Mapping[str, Any]:
# TODO(QuantumGhost): Consider caching `graph_dict` to avoid repeated JSON decoding.
#
# Using `functools.cached_property` could help, but some code in the codebase may
# modify the returned dict, which can cause issues elsewhere.
#
# For example, changing this property to a cached property led to errors like the
# following when single stepping an `Iteration` node:
#
# Root node id 1748401971780start not found in the graph
#
# There is currently no standard way to make a dict deeply immutable in Python,
# and tracking modifications to the returned dict is difficult. For now, we leave
# the code as-is to avoid these issues.
#
# Currently, the following functions / methods would mutate the returned dict:
#
# - `_get_graph_and_variable_pool_of_single_iteration`.
# - `_get_graph_and_variable_pool_of_single_loop`.
return json.loads(self.graph) if self.graph else {}
def get_node_config_by_id(self, node_id: str) -> Mapping[str, Any]:
"""Extract a node configuration from the workflow graph by node ID.
A node configuration is a dictionary containing the node's properties, including
the node's id, title, and its data as a dict.
"""
workflow_graph = self.graph_dict
if not workflow_graph:
raise WorkflowDataError(f"workflow graph not found, workflow_id={self.id}")
nodes = workflow_graph.get("nodes")
if not nodes:
raise WorkflowDataError("nodes not found in workflow graph")
try:
node_config = next(filter(lambda node: node["id"] == node_id, nodes))
except StopIteration:
raise NodeNotFoundError(node_id)
assert isinstance(node_config, dict)
return node_config
@staticmethod
def get_node_type_from_node_config(node_config: Mapping[str, Any]) -> NodeType:
"""Extract type of a node from the node configuration returned by `get_node_config_by_id`."""
node_config_data = node_config.get("data", {})
# Get node class
node_type = NodeType(node_config_data.get("type"))
return node_type
@staticmethod
def get_enclosing_node_type_and_id(node_config: Mapping[str, Any]) -> tuple[NodeType, str] | None:
in_loop = node_config.get("isInLoop", False)
in_iteration = node_config.get("isInIteration", False)
if in_loop:
loop_id = node_config.get("loop_id")
if loop_id is None:
raise _InvalidGraphDefinitionError("invalid graph")
return NodeType.LOOP, loop_id
elif in_iteration:
iteration_id = node_config.get("iteration_id")
if iteration_id is None:
raise _InvalidGraphDefinitionError("invalid graph")
return NodeType.ITERATION, iteration_id
else:
return None
@property
def features(self) -> str:
"""
@ -400,6 +476,10 @@ class Workflow(Base):
ensure_ascii=False,
)
@staticmethod
def version_from_datetime(d: datetime) -> str:
return str(d)
class WorkflowRun(Base):
"""
@ -410,7 +490,7 @@ class WorkflowRun(Base):
- id (uuid) Run ID
- tenant_id (uuid) Workspace ID
- app_id (uuid) App ID
- sequence_number (int) Auto-increment sequence number, incremented within the App, starting from 1
- workflow_id (uuid) Workflow ID
- type (string) Workflow type
- triggered_from (string) Trigger source
@ -443,13 +523,12 @@ class WorkflowRun(Base):
__table_args__ = (
db.PrimaryKeyConstraint("id", name="workflow_run_pkey"),
db.Index("workflow_run_triggerd_from_idx", "tenant_id", "app_id", "triggered_from"),
db.Index("workflow_run_tenant_app_sequence_idx", "tenant_id", "app_id", "sequence_number"),
)
id: Mapped[str] = mapped_column(StringUUID, server_default=db.text("uuid_generate_v4()"))
tenant_id: Mapped[str] = mapped_column(StringUUID)
app_id: Mapped[str] = mapped_column(StringUUID)
sequence_number: Mapped[int] = mapped_column()
workflow_id: Mapped[str] = mapped_column(StringUUID)
type: Mapped[str] = mapped_column(db.String(255))
triggered_from: Mapped[str] = mapped_column(db.String(255))
@ -509,7 +588,6 @@ class WorkflowRun(Base):
"id": self.id,
"tenant_id": self.tenant_id,
"app_id": self.app_id,
"sequence_number": self.sequence_number,
"workflow_id": self.workflow_id,
"type": self.type,
"triggered_from": self.triggered_from,
@ -535,7 +613,6 @@ class WorkflowRun(Base):
id=data.get("id"),
tenant_id=data.get("tenant_id"),
app_id=data.get("app_id"),
sequence_number=data.get("sequence_number"),
workflow_id=data.get("workflow_id"),
type=data.get("type"),
triggered_from=data.get("triggered_from"),
@ -863,8 +940,18 @@ def _naive_utc_datetime():
class WorkflowDraftVariable(Base):
"""`WorkflowDraftVariable` record variables and outputs generated during
debugging worfklow or chatflow.
IMPORTANT: This model maintains multiple invariant rules that must be preserved.
Do not instantiate this class directly with the constructor.
Instead, use the factory methods (`new_conversation_variable`, `new_sys_variable`,
`new_node_variable`) defined below to ensure all invariants are properly maintained.
"""
@staticmethod
def unique_columns() -> list[str]:
def unique_app_id_node_id_name() -> list[str]:
return [
"app_id",
"node_id",
@ -872,7 +959,9 @@ class WorkflowDraftVariable(Base):
]
__tablename__ = "workflow_draft_variables"
__table_args__ = (UniqueConstraint(*unique_columns()),)
__table_args__ = (UniqueConstraint(*unique_app_id_node_id_name()),)
# Required for instance variable annotation.
__allow_unmapped__ = True
# id is the unique identifier of a draft variable.
id: Mapped[str] = mapped_column(StringUUID, primary_key=True, server_default=db.text("uuid_generate_v4()"))
@ -953,6 +1042,36 @@ class WorkflowDraftVariable(Base):
default=None,
)
# Cache for deserialized value
#
# NOTE(QuantumGhost): This field serves two purposes:
#
# 1. Caches deserialized values to reduce repeated parsing costs
# 2. Allows modification of the deserialized value after retrieval,
# particularly important for `File`` variables which require database
# lookups to obtain storage_key and other metadata
#
# Use double underscore prefix for better encapsulation,
# making this attribute harder to access from outside the class.
__value: Segment | None
def __init__(self, *args, **kwargs):
"""
The constructor of `WorkflowDraftVariable` is not intended for
direct use outside this file. Its solo purpose is setup private state
used by the model instance.
Please use the factory methods
(`new_conversation_variable`, `new_sys_variable`, `new_node_variable`)
defined below to create instances of this class.
"""
super().__init__(*args, **kwargs)
self.__value = None
@orm.reconstructor
def _init_on_load(self):
self.__value = None
def get_selector(self) -> list[str]:
selector = json.loads(self.selector)
if not isinstance(selector, list):
@ -967,15 +1086,92 @@ class WorkflowDraftVariable(Base):
def _set_selector(self, value: list[str]):
self.selector = json.dumps(value)
def get_value(self) -> Segment | None:
return build_segment(json.loads(self.value))
def _loads_value(self) -> Segment:
value = json.loads(self.value)
return self.build_segment_with_type(self.value_type, value)
@staticmethod
def rebuild_file_types(value: Any) -> Any:
# NOTE(QuantumGhost): Temporary workaround for structured data handling.
# By this point, `output` has been converted to dict by
# `WorkflowEntry.handle_special_values`, so we need to
# reconstruct File objects from their serialized form
# to maintain proper variable saving behavior.
#
# Ideally, we should work with structured data objects directly
# rather than their serialized forms.
# However, multiple components in the codebase depend on
# `WorkflowEntry.handle_special_values`, making a comprehensive migration challenging.
if isinstance(value, dict):
if not maybe_file_object(value):
return value
return File.model_validate(value)
elif isinstance(value, list) and value:
first = value[0]
if not maybe_file_object(first):
return value
return [File.model_validate(i) for i in value]
else:
return value
@classmethod
def build_segment_with_type(cls, segment_type: SegmentType, value: Any) -> Segment:
# Extends `variable_factory.build_segment_with_type` functionality by
# reconstructing `FileSegment`` or `ArrayFileSegment`` objects from
# their serialized dictionary or list representations, respectively.
if segment_type == SegmentType.FILE:
if isinstance(value, File):
return build_segment_with_type(segment_type, value)
elif isinstance(value, dict):
file = cls.rebuild_file_types(value)
return build_segment_with_type(segment_type, file)
else:
raise TypeMismatchError(f"expected dict or File for FileSegment, got {type(value)}")
if segment_type == SegmentType.ARRAY_FILE:
if not isinstance(value, list):
raise TypeMismatchError(f"expected list for ArrayFileSegment, got {type(value)}")
file_list = cls.rebuild_file_types(value)
return build_segment_with_type(segment_type=segment_type, value=file_list)
return build_segment_with_type(segment_type=segment_type, value=value)
def get_value(self) -> Segment:
"""Decode the serialized value into its corresponding `Segment` object.
This method caches the result, so repeated calls will return the same
object instance without re-parsing the serialized data.
If you need to modify the returned `Segment`, use `value.model_copy()`
to create a copy first to avoid affecting the cached instance.
For more information about the caching mechanism, see the documentation
of the `__value` field.
Returns:
Segment: The deserialized value as a Segment object.
"""
if self.__value is not None:
return self.__value
value = self._loads_value()
self.__value = value
return value
def set_name(self, name: str):
self.name = name
self._set_selector([self.node_id, name])
def set_value(self, value: Segment):
self.value = json.dumps(value.value)
"""Updates the `value` and corresponding `value_type` fields in the database model.
This method also stores the provided Segment object in the deserialized cache
without creating a copy, allowing for efficient value access.
Args:
value: The Segment object to store as the variable's value.
"""
self.__value = value
self.value = json.dumps(value, cls=variable_utils.SegmentJSONEncoder)
self.value_type = value.value_type
def get_node_id(self) -> str | None:
@ -1001,6 +1197,7 @@ class WorkflowDraftVariable(Base):
node_id: str,
name: str,
value: Segment,
node_execution_id: str | None,
description: str = "",
) -> "WorkflowDraftVariable":
variable = WorkflowDraftVariable()
@ -1012,6 +1209,7 @@ class WorkflowDraftVariable(Base):
variable.name = name
variable.set_value(value)
variable._set_selector(list(variable_utils.to_selector(node_id, name)))
variable.node_execution_id = node_execution_id
return variable
@classmethod
@ -1021,13 +1219,17 @@ class WorkflowDraftVariable(Base):
app_id: str,
name: str,
value: Segment,
description: str = "",
) -> "WorkflowDraftVariable":
variable = cls._new(
app_id=app_id,
node_id=CONVERSATION_VARIABLE_NODE_ID,
name=name,
value=value,
description=description,
node_execution_id=None,
)
variable.editable = True
return variable
@classmethod
@ -1037,9 +1239,16 @@ class WorkflowDraftVariable(Base):
app_id: str,
name: str,
value: Segment,
node_execution_id: str,
editable: bool = False,
) -> "WorkflowDraftVariable":
variable = cls._new(app_id=app_id, node_id=SYSTEM_VARIABLE_NODE_ID, name=name, value=value)
variable = cls._new(
app_id=app_id,
node_id=SYSTEM_VARIABLE_NODE_ID,
name=name,
node_execution_id=node_execution_id,
value=value,
)
variable.editable = editable
return variable
@ -1051,11 +1260,19 @@ class WorkflowDraftVariable(Base):
node_id: str,
name: str,
value: Segment,
node_execution_id: str,
visible: bool = True,
editable: bool = True,
) -> "WorkflowDraftVariable":
variable = cls._new(app_id=app_id, node_id=node_id, name=name, value=value)
variable = cls._new(
app_id=app_id,
node_id=node_id,
name=name,
node_execution_id=node_execution_id,
value=value,
)
variable.visible = visible
variable.editable = True
variable.editable = editable
return variable
@property