mirror of
https://github.com/langgenius/dify.git
synced 2026-03-25 00:07:56 +08:00
Merge branch 'main' into feat/trigger
This commit is contained in:
@ -1,4 +1,5 @@
|
||||
from collections.abc import Mapping, Sequence
|
||||
from types import MappingProxyType
|
||||
from typing import Any
|
||||
|
||||
from pydantic import AliasChoices, BaseModel, ConfigDict, Field, model_validator
|
||||
@ -112,3 +113,102 @@ class SystemVariable(BaseModel):
|
||||
if self.timestamp is not None:
|
||||
d[SystemVariableKey.TIMESTAMP] = self.timestamp
|
||||
return d
|
||||
|
||||
def as_view(self) -> "SystemVariableReadOnlyView":
|
||||
return SystemVariableReadOnlyView(self)
|
||||
|
||||
|
||||
class SystemVariableReadOnlyView:
|
||||
"""
|
||||
A read-only view of a SystemVariable that implements the ReadOnlySystemVariable protocol.
|
||||
|
||||
This class wraps a SystemVariable instance and provides read-only access to all its fields.
|
||||
It always reads the latest data from the wrapped instance and prevents any write operations.
|
||||
"""
|
||||
|
||||
def __init__(self, system_variable: SystemVariable) -> None:
|
||||
"""
|
||||
Initialize the read-only view with a SystemVariable instance.
|
||||
|
||||
Args:
|
||||
system_variable: The SystemVariable instance to wrap
|
||||
"""
|
||||
self._system_variable = system_variable
|
||||
|
||||
@property
|
||||
def user_id(self) -> str | None:
|
||||
return self._system_variable.user_id
|
||||
|
||||
@property
|
||||
def app_id(self) -> str | None:
|
||||
return self._system_variable.app_id
|
||||
|
||||
@property
|
||||
def workflow_id(self) -> str | None:
|
||||
return self._system_variable.workflow_id
|
||||
|
||||
@property
|
||||
def workflow_execution_id(self) -> str | None:
|
||||
return self._system_variable.workflow_execution_id
|
||||
|
||||
@property
|
||||
def query(self) -> str | None:
|
||||
return self._system_variable.query
|
||||
|
||||
@property
|
||||
def conversation_id(self) -> str | None:
|
||||
return self._system_variable.conversation_id
|
||||
|
||||
@property
|
||||
def dialogue_count(self) -> int | None:
|
||||
return self._system_variable.dialogue_count
|
||||
|
||||
@property
|
||||
def document_id(self) -> str | None:
|
||||
return self._system_variable.document_id
|
||||
|
||||
@property
|
||||
def original_document_id(self) -> str | None:
|
||||
return self._system_variable.original_document_id
|
||||
|
||||
@property
|
||||
def dataset_id(self) -> str | None:
|
||||
return self._system_variable.dataset_id
|
||||
|
||||
@property
|
||||
def batch(self) -> str | None:
|
||||
return self._system_variable.batch
|
||||
|
||||
@property
|
||||
def datasource_type(self) -> str | None:
|
||||
return self._system_variable.datasource_type
|
||||
|
||||
@property
|
||||
def invoke_from(self) -> str | None:
|
||||
return self._system_variable.invoke_from
|
||||
|
||||
@property
|
||||
def files(self) -> Sequence[File]:
|
||||
"""
|
||||
Get a copy of the files from the wrapped SystemVariable.
|
||||
|
||||
Returns:
|
||||
A defensive copy of the files sequence to prevent modification
|
||||
"""
|
||||
return tuple(self._system_variable.files) # Convert to immutable tuple
|
||||
|
||||
@property
|
||||
def datasource_info(self) -> Mapping[str, Any] | None:
|
||||
"""
|
||||
Get a copy of the datasource info from the wrapped SystemVariable.
|
||||
|
||||
Returns:
|
||||
A view of the datasource info mapping to prevent modification
|
||||
"""
|
||||
if self._system_variable.datasource_info is None:
|
||||
return None
|
||||
return MappingProxyType(self._system_variable.datasource_info)
|
||||
|
||||
def __repr__(self) -> str:
|
||||
"""Return a string representation of the read-only view."""
|
||||
return f"SystemVariableReadOnlyView(system_variable={self._system_variable!r})"
|
||||
|
||||
Reference in New Issue
Block a user