Merge remote-tracking branch 'origin/main' into feat/support-agent-sandbox

# Conflicts:
#	api/core/app/task_pipeline/easy_ui_based_generate_task_pipeline.py
yyh
2026-02-11 12:44:05 +08:00
58 changed files with 5155 additions and 72 deletions

View File

@@ -21,6 +21,7 @@ language_timezone_mapping = {
"th-TH": "Asia/Bangkok",
"id-ID": "Asia/Jakarta",
"ar-TN": "Africa/Tunis",
"nl-NL": "Europe/Amsterdam",
}
languages = list(language_timezone_mapping.keys())

View File

@@ -42,7 +42,15 @@ class SetupResponse(BaseModel):
tags=["console"],
)
def get_setup_status_api() -> SetupStatusResponse:
"""Get system setup status."""
"""Get system setup status.
NOTE: This endpoint is unauthenticated by design.
During first-time bootstrap there is no admin account yet, so frontend initialization must be
able to query setup progress before any login flow exists.
Only bootstrap-safe status information should be returned by this endpoint.
"""
if dify_config.EDITION == "SELF_HOSTED":
setup_status = get_setup_status()
if setup_status and not isinstance(setup_status, bool):
@@ -61,7 +69,12 @@ def get_setup_status_api() -> SetupStatusResponse:
)
@only_edition_self_hosted
def setup_system(payload: SetupRequestPayload) -> SetupResponse:
"""Initialize system setup with admin account."""
"""Initialize system setup with admin account.
NOTE: This endpoint is unauthenticated by design for first-time bootstrap.
Access is restricted by deployment mode (`SELF_HOSTED`), one-time setup guards,
and init-password validation rather than user session authentication.
"""
if get_setup_status():
raise AlreadySetupError()

View File

@@ -34,7 +34,7 @@ def stream_topic_events(
on_subscribe()
while True:
try:
msg = sub.receive(timeout=0.1)
msg = sub.receive(timeout=1)
except SubscriptionClosedError:
return
if msg is None:
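This commit bumps several polling timeouts from 0.1 to 1 second (here, in the subscription iterator, and in the workflow event stream further down). A minimal, self-contained sketch of the pattern being tuned, with hypothetical names: an idle loop with timeout=0.1 wakes about ten times per second, while timeout=1 cuts idle wake-ups roughly tenfold at the cost of up to one extra second before a stop signal is noticed.

import queue
import threading

def drain(q: "queue.Queue[str]", stop: threading.Event) -> None:
    # Poll until asked to stop; the timeout bounds how long an empty queue
    # can delay noticing that stop was set.
    while not stop.is_set():
        try:
            item = q.get(timeout=1)  # was 0.1 before this change
        except queue.Empty:
            continue
        print(item)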

View File

@@ -46,6 +46,8 @@ from core.app.entities.task_entities import (
from core.app.task_pipeline.based_generate_task_pipeline import BasedGenerateTaskPipeline
from core.app.task_pipeline.message_cycle_manager import MessageCycleManager
from core.base.tts import AppGeneratorTTSPublisher, AudioTrunk
from core.file import helpers as file_helpers
from core.file.enums import FileTransferMethod
from core.model_manager import ModelInstance
from core.model_runtime.entities.llm_entities import LLMResult, LLMResultChunk, LLMResultChunkDelta, LLMUsage
from core.model_runtime.entities.message_entities import (
@@ -57,10 +59,12 @@ from core.ops.entities.trace_entity import TraceTaskName
from core.ops.ops_trace_manager import TraceQueueManager, TraceTask
from core.prompt.utils.prompt_message_util import PromptMessageUtil
from core.prompt.utils.prompt_template_parser import PromptTemplateParser
from core.tools.signature import sign_tool_file
from events.message_event import message_was_created
from extensions.ext_database import db
from libs.datetime_utils import naive_utc_now
from models.model import AppMode, Conversation, LLMGenerationDetail, Message, MessageAgentThought
from models.model import AppMode, Conversation, Message, MessageAgentThought, MessageFile, UploadFile
logger = logging.getLogger(__name__)
@@ -591,6 +595,85 @@ class EasyUIBasedGenerateTaskPipeline(BasedGenerateTaskPipeline):
metadata=metadata_dict,
)
def _record_files(self):
with Session(db.engine, expire_on_commit=False) as session:
message_files = session.scalars(select(MessageFile).where(MessageFile.message_id == self._message_id)).all()
if not message_files:
return None
files_list = []
upload_file_ids = [
mf.upload_file_id
for mf in message_files
if mf.transfer_method == FileTransferMethod.LOCAL_FILE and mf.upload_file_id
]
upload_files_map = {}
if upload_file_ids:
upload_files = session.scalars(select(UploadFile).where(UploadFile.id.in_(upload_file_ids))).all()
upload_files_map = {uf.id: uf for uf in upload_files}
for message_file in message_files:
upload_file = None
if message_file.transfer_method == FileTransferMethod.LOCAL_FILE and message_file.upload_file_id:
upload_file = upload_files_map.get(message_file.upload_file_id)
url = None
filename = "file"
mime_type = "application/octet-stream"
size = 0
extension = ""
if message_file.transfer_method == FileTransferMethod.REMOTE_URL:
url = message_file.url
if message_file.url:
filename = message_file.url.split("/")[-1].split("?")[0] # Remove query params
elif message_file.transfer_method == FileTransferMethod.LOCAL_FILE:
if upload_file:
url = file_helpers.get_signed_file_url(upload_file_id=str(upload_file.id))
filename = upload_file.name
mime_type = upload_file.mime_type or "application/octet-stream"
size = upload_file.size or 0
extension = f".{upload_file.extension}" if upload_file.extension else ""
elif message_file.upload_file_id:
# Fallback: generate URL even if upload_file not found
url = file_helpers.get_signed_file_url(upload_file_id=str(message_file.upload_file_id))
elif message_file.transfer_method == FileTransferMethod.TOOL_FILE and message_file.url:
# For tool files, use URL directly if it's HTTP, otherwise sign it
if message_file.url.startswith("http"):
url = message_file.url
filename = message_file.url.split("/")[-1].split("?")[0]
else:
# Extract tool file id and extension from URL
url_parts = message_file.url.split("/")
if url_parts:
file_part = url_parts[-1].split("?")[0] # Remove query params first
# Use rsplit to correctly handle filenames with multiple dots
if "." in file_part:
tool_file_id, ext = file_part.rsplit(".", 1)
extension = f".{ext}"
else:
tool_file_id = file_part
extension = ".bin"
url = sign_tool_file(tool_file_id=tool_file_id, extension=extension)
filename = file_part
transfer_method_value = message_file.transfer_method
remote_url = message_file.url if message_file.transfer_method == FileTransferMethod.REMOTE_URL else ""
file_dict = {
"related_id": message_file.id,
"extension": extension,
"filename": filename,
"size": size,
"mime_type": mime_type,
"transfer_method": transfer_method_value,
"type": message_file.type,
"url": url or "",
"upload_file_id": message_file.upload_file_id or message_file.id,
"remote_url": remote_url,
}
files_list.append(file_dict)
return files_list or None
def _agent_message_to_stream_response(self, answer: str, message_id: str) -> AgentMessageStreamResponse:
"""
Agent message to stream response.
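For the non-HTTP tool-file branch in _record_files above, a worked illustration of the rsplit parsing (URL value hypothetical; the logic mirrors the code):

# Hypothetical tool-file URL; only the parsing mirrors _record_files.
url = "/files/tools/abc123.v2.final.png?timestamp=1"
file_part = url.split("/")[-1].split("?")[0]   # "abc123.v2.final.png"
tool_file_id, ext = file_part.rsplit(".", 1)   # split on the LAST dot
assert (tool_file_id, ext) == ("abc123.v2.final", "png")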

View File

@@ -64,7 +64,13 @@ class MessageCycleManager:
# Use SQLAlchemy 2.x style session.scalar(select(...))
with session_factory.create_session() as session:
message_file = session.scalar(select(MessageFile).where(MessageFile.message_id == message_id))
message_file = session.scalar(
select(MessageFile)
.where(
MessageFile.message_id == message_id,
)
.where(MessageFile.belongs_to == "assistant")
)
if message_file:
self._message_has_file.add(message_id)

View File

@@ -119,7 +119,7 @@ class RedisClientWrapper:
redis_client: RedisClientWrapper = RedisClientWrapper()
pubsub_redis_client: RedisClientWrapper = RedisClientWrapper()
_pubsub_redis_client: redis.Redis | RedisCluster | None = None
def _get_ssl_configuration() -> tuple[type[Union[Connection, SSLConnection]], dict[str, Any]]:
@@ -232,7 +232,7 @@ def _create_standalone_client(redis_params: dict[str, Any]) -> Union[redis.Redis
return client
def _create_pubsub_client(pubsub_url: str, use_clusters: bool) -> Union[redis.Redis, RedisCluster]:
def _create_pubsub_client(pubsub_url: str, use_clusters: bool) -> redis.Redis | RedisCluster:
if use_clusters:
return RedisCluster.from_url(pubsub_url)
return redis.Redis.from_url(pubsub_url)
@@ -256,23 +256,19 @@ def init_app(app: DifyApp):
redis_client.initialize(client)
app.extensions["redis"] = redis_client
pubsub_client = client
global _pubsub_redis_client
_pubsub_redis_client = client
if dify_config.normalized_pubsub_redis_url:
pubsub_client = _create_pubsub_client(
_pubsub_redis_client = _create_pubsub_client(
dify_config.normalized_pubsub_redis_url, dify_config.PUBSUB_REDIS_USE_CLUSTERS
)
pubsub_redis_client.initialize(pubsub_client)
def get_pubsub_redis_client() -> RedisClientWrapper:
return pubsub_redis_client
def get_pubsub_broadcast_channel() -> BroadcastChannelProtocol:
redis_conn = get_pubsub_redis_client()
assert _pubsub_redis_client is not None, "PubSub Redis client should be initialized before use."
if dify_config.PUBSUB_REDIS_CHANNEL_TYPE == "sharded":
return ShardedRedisBroadcastChannel(redis_conn) # pyright: ignore[reportArgumentType]
return RedisBroadcastChannel(redis_conn) # pyright: ignore[reportArgumentType]
return ShardedRedisBroadcastChannel(_pubsub_redis_client)
return RedisBroadcastChannel(_pubsub_redis_client)
P = ParamSpec("P")
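A hedged usage sketch of the resulting surface: get_pubsub_redis_client() is gone, so consumers go straight to the channel factory, which resolves the concrete client and the channel flavor (sharded vs. classic pub/sub) in one place. The import path follows the test file below; nothing here adds new API.

from extensions.ext_redis import get_pubsub_broadcast_channel

# Valid only after init_app() has set _pubsub_redis_client;
# otherwise the assertion in the factory fires.
channel = get_pubsub_broadcast_channel()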

View File

@@ -152,7 +152,7 @@ class RedisSubscriptionBase(Subscription):
"""Iterator for consuming messages from the subscription."""
while not self._closed.is_set():
try:
item = self._queue.get(timeout=0.1)
item = self._queue.get(timeout=1)
except queue.Empty:
continue

View File

@@ -1,7 +1,7 @@
from __future__ import annotations
from libs.broadcast_channel.channel import Producer, Subscriber, Subscription
from redis import Redis
from redis import Redis, RedisCluster
from ._subscription import RedisSubscriptionBase
@@ -18,7 +18,7 @@ class BroadcastChannel:
def __init__(
self,
redis_client: Redis,
redis_client: Redis | RedisCluster,
):
self._client = redis_client
@@ -27,7 +27,7 @@
class Topic:
def __init__(self, redis_client: Redis, topic: str):
def __init__(self, redis_client: Redis | RedisCluster, topic: str):
self._client = redis_client
self._topic = topic

View File

@@ -70,8 +70,9 @@ class _RedisShardedSubscription(RedisSubscriptionBase):
# Since we have already filtered at the call site, we can safely set
# `ignore_subscribe_messages=False`.
if isinstance(self._client, RedisCluster):
# NOTE(QuantumGhost): due to an issue in upstream code, calling `get_sharded_message`
# would use busy-looping to wait for incoming message, consuming excessive CPU quota.
# NOTE(QuantumGhost): due to an issue in upstream code, calling `get_sharded_message` without
# specifying the `target_node` argument would use busy-looping to wait
# for incoming messages, consuming excessive CPU quota.
#
# Here we specify the `target_node` to mitigate this problem.
node = self._client.get_node_from_key(self._topic)
@@ -80,8 +81,10 @@ class _RedisShardedSubscription(RedisSubscriptionBase):
timeout=1,
target_node=node,
)
else:
elif isinstance(self._client, Redis):
return self._pubsub.get_sharded_message(ignore_subscribe_messages=False, timeout=1) # type: ignore[attr-defined]
else:
raise AssertionError("client should be either Redis or RedisCluster.")
def _get_message_type(self) -> str:
return "smessage"

View File

@@ -0,0 +1,59 @@
"""add unique constraint to tenant_default_models
Revision ID: f55813ffe2c8
Revises: c3df22613c99
Create Date: 2026-01-19 15:07:00.000000
"""
from alembic import op
import sqlalchemy as sa
def _is_pg(conn):
return conn.dialect.name == "postgresql"
# revision identifiers, used by Alembic.
revision = 'f55813ffe2c8'
down_revision = 'c3df22613c99'
branch_labels = None
depends_on = None
def upgrade():
# First, remove duplicate records keeping only the most recent one per (tenant_id, model_type)
# This is necessary before adding the unique constraint
conn = op.get_bind()
# Delete duplicates: keep the record with the latest updated_at for each (tenant_id, model_type)
# If updated_at is the same, keep the one with the largest id as tiebreaker
if _is_pg(conn):
# PostgreSQL: Use DISTINCT ON for efficient deduplication
conn.execute(sa.text("""
DELETE FROM tenant_default_models
WHERE id NOT IN (
SELECT DISTINCT ON (tenant_id, model_type) id
FROM tenant_default_models
ORDER BY tenant_id, model_type, updated_at DESC, id DESC
)
"""))
else:
# MySQL: Use self-join to find and delete duplicates
# Keep the record with latest updated_at (or largest id if updated_at is equal)
conn.execute(sa.text("""
DELETE t1 FROM tenant_default_models t1
INNER JOIN tenant_default_models t2
ON t1.tenant_id = t2.tenant_id
AND t1.model_type = t2.model_type
AND (t1.updated_at < t2.updated_at
OR (t1.updated_at = t2.updated_at AND t1.id < t2.id))
"""))
# Now add the unique constraint
with op.batch_alter_table('tenant_default_models', schema=None) as batch_op:
batch_op.create_unique_constraint('unique_tenant_default_model_type', ['tenant_id', 'model_type'])
def downgrade():
with op.batch_alter_table('tenant_default_models', schema=None) as batch_op:
batch_op.drop_constraint('unique_tenant_default_model_type', type_='unique')
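To state the dedup rule both SQL branches implement: per (tenant_id, model_type), keep the row with the latest updated_at, breaking ties by the largest id. An in-memory sketch of that rule with illustrative data (not the migration itself):

rows = [
    {"id": 1, "tenant_id": "t1", "model_type": "llm", "updated_at": 10},
    {"id": 2, "tenant_id": "t1", "model_type": "llm", "updated_at": 20},
    {"id": 3, "tenant_id": "t1", "model_type": "llm", "updated_at": 20},
]
survivors: dict[tuple[str, str], dict] = {}
for row in rows:
    key = (row["tenant_id"], row["model_type"])
    best = survivors.get(key)
    # Latest updated_at wins; equal timestamps fall back to the larger id.
    if best is None or (row["updated_at"], row["id"]) > (best["updated_at"], best["id"]):
        survivors[key] = row
assert [r["id"] for r in survivors.values()] == [3]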

View File

@@ -229,7 +229,7 @@ class App(Base):
with Session(db.engine) as session:
if api_provider_ids:
existing_api_providers = [
api_provider.id
str(api_provider.id)
for api_provider in session.execute(
text("SELECT id FROM tool_api_providers WHERE id IN :provider_ids"),
{"provider_ids": tuple(api_provider_ids)},

View File

@@ -181,6 +181,7 @@ class TenantDefaultModel(TypeBase):
__table_args__ = (
sa.PrimaryKeyConstraint("id", name="tenant_default_model_pkey"),
sa.Index("tenant_default_model_tenant_id_provider_type_idx", "tenant_id", "provider_name", "model_type"),
sa.UniqueConstraint("tenant_id", "model_type", name="unique_tenant_default_model_type"),
)
id: Mapped[str] = mapped_column(

View File

@@ -22,7 +22,7 @@ from libs.exception import BaseHTTPException
from models.human_input import RecipientType
from models.model import App, AppMode
from repositories.factory import DifyAPIRepositoryFactory
from tasks.app_generate.workflow_execute_task import WORKFLOW_BASED_APP_EXECUTION_QUEUE, resume_app_execution
from tasks.app_generate.workflow_execute_task import resume_app_execution
class Form:
@@ -230,7 +230,6 @@ class HumanInputService:
try:
resume_app_execution.apply_async(
kwargs={"payload": payload},
queue=WORKFLOW_BASED_APP_EXECUTION_QUEUE,
)
except Exception: # pragma: no cover
logger.exception("Failed to enqueue resume task for workflow run %s", workflow_run_id)

View File

@@ -129,15 +129,15 @@ def build_workflow_event_stream(
return
try:
event = buffer_state.queue.get(timeout=0.1)
event = buffer_state.queue.get(timeout=1)
except queue.Empty:
current_time = time.time()
if current_time - last_msg_time > idle_timeout:
logger.debug(
"No workflow events received for %s seconds, keeping stream open",
"Idle timeout of %s seconds reached, closing workflow event stream.",
idle_timeout,
)
last_msg_time = current_time
return
if current_time - last_ping_time >= ping_interval:
yield StreamEvent.PING.value
last_ping_time = current_time
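The behavioral change in this hunk, isolated into a runnable sketch (names simplified, intervals hypothetical): on idle timeout the generator now returns and ends the event stream, where it previously reset the idle clock and stayed open; the ping cadence is unchanged.

import queue
import time

def stream(q: "queue.Queue[str]", idle_timeout: float = 60.0, ping_interval: float = 10.0):
    last_msg_time = last_ping_time = time.time()
    while True:
        try:
            event = q.get(timeout=1)
        except queue.Empty:
            now = time.time()
            if now - last_msg_time > idle_timeout:
                return  # new behavior: close the stream instead of idling forever
            if now - last_ping_time >= ping_interval:
                yield "ping"
                last_ping_time = now
            continue
        last_msg_time = time.time()
        yield event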
@@ -405,7 +405,7 @@ def _start_buffering(subscription) -> BufferState:
dropped_count = 0
try:
while not buffer_state.stop_event.is_set():
msg = subscription.receive(timeout=0.1)
msg = subscription.receive(timeout=1)
if msg is None:
continue
event = _parse_event_message(msg)

View File

@@ -51,7 +51,7 @@ def _patch_redis_clients_on_loaded_modules():
continue
if hasattr(module, "redis_client"):
module.redis_client = redis_mock
if hasattr(module, "pubsub_redis_client"):
if hasattr(module, "_pubsub_redis_client"):
module._pubsub_redis_client = redis_mock
@@ -72,7 +72,7 @@ def _patch_redis_clients():
with (
patch.object(ext_redis, "redis_client", redis_mock),
patch.object(ext_redis, "pubsub_redis_client", redis_mock),
patch.object(ext_redis, "_pubsub_redis_client", redis_mock),
):
_patch_redis_clients_on_loaded_modules()
yield

View File

@@ -25,15 +25,19 @@ class TestMessageCycleManagerOptimization:
task_state = Mock()
return MessageCycleManager(application_generate_entity=mock_application_generate_entity, task_state=task_state)
def test_get_message_event_type_with_message_file(self, message_cycle_manager):
"""Test get_message_event_type returns MESSAGE_FILE when message has files."""
def test_get_message_event_type_with_assistant_file(self, message_cycle_manager):
"""Test get_message_event_type returns MESSAGE_FILE when message has assistant-generated files.
This ensures that AI-generated images (belongs_to='assistant') trigger the MESSAGE_FILE event,
allowing the frontend to properly display generated image files with a populated url field.
"""
with patch("core.app.task_pipeline.message_cycle_manager.session_factory") as mock_session_factory:
# Setup mock session and message file
mock_session = Mock()
mock_session_factory.create_session.return_value.__enter__.return_value = mock_session
mock_message_file = Mock()
# Current implementation uses session.scalar(select(...))
mock_message_file.belongs_to = "assistant"
mock_session.scalar.return_value = mock_message_file
# Execute
@@ -44,6 +48,31 @@ class TestMessageCycleManagerOptimization:
assert result == StreamEvent.MESSAGE_FILE
mock_session.scalar.assert_called_once()
def test_get_message_event_type_with_user_file(self, message_cycle_manager):
"""Test get_message_event_type returns MESSAGE when message only has user-uploaded files.
This is a regression test for the issue where user-uploaded images (belongs_to='user')
caused the LLM text response to be incorrectly tagged with MESSAGE_FILE event,
resulting in broken images in the chat UI. The query filters for belongs_to='assistant',
so when only user files exist, the database query returns None, resulting in MESSAGE event type.
"""
with patch("core.app.task_pipeline.message_cycle_manager.session_factory") as mock_session_factory:
# Setup mock session and message file
mock_session = Mock()
mock_session_factory.create_session.return_value.__enter__.return_value = mock_session
# When querying for assistant files with only user files present, return None
# (simulates database query with belongs_to='assistant' filter returning no results)
mock_session.scalar.return_value = None
# Execute
with current_app.app_context():
result = message_cycle_manager.get_message_event_type("test-message-id")
# Assert
assert result == StreamEvent.MESSAGE
mock_session.scalar.assert_called_once()
def test_get_message_event_type_without_message_file(self, message_cycle_manager):
"""Test get_message_event_type returns MESSAGE when message has no files."""
with patch("core.app.task_pipeline.message_cycle_manager.session_factory") as mock_session_factory:
@@ -69,7 +98,7 @@ class TestMessageCycleManagerOptimization:
mock_session_factory.create_session.return_value.__enter__.return_value = mock_session
mock_message_file = Mock()
# Current implementation uses session.scalar(select(...))
mock_message_file.belongs_to = "assistant"
mock_session.scalar.return_value = mock_message_file
# Execute: compute event type once, then pass to message_to_stream_response

View File

@@ -198,6 +198,15 @@ class SubscriptionTestCase:
description: str = ""
class FakeRedisClient:
"""Minimal fake Redis client for unit tests."""
def __init__(self) -> None:
self.publish = MagicMock()
self.spublish = MagicMock()
self.pubsub = MagicMock(return_value=MagicMock())
class TestRedisSubscription:
"""Test cases for the _RedisSubscription class."""
@@ -619,10 +628,13 @@ class TestRedisSubscription:
class TestRedisShardedSubscription:
"""Test cases for the _RedisShardedSubscription class."""
@pytest.fixture(autouse=True)
def patch_sharded_redis_type(self, monkeypatch):
monkeypatch.setattr("libs.broadcast_channel.redis.sharded_channel.Redis", FakeRedisClient)
@pytest.fixture
def mock_redis_client(self) -> MagicMock:
client = MagicMock()
return client
def mock_redis_client(self) -> FakeRedisClient:
return FakeRedisClient()
@pytest.fixture
def mock_pubsub(self) -> MagicMock:
@@ -636,7 +648,7 @@ class TestRedisShardedSubscription:
@pytest.fixture
def sharded_subscription(
self, mock_pubsub: MagicMock, mock_redis_client: MagicMock
self, mock_pubsub: MagicMock, mock_redis_client: FakeRedisClient
) -> Generator[_RedisShardedSubscription, None, None]:
"""Create a _RedisShardedSubscription instance for testing."""
subscription = _RedisShardedSubscription(
@@ -657,7 +669,7 @@ class TestRedisShardedSubscription:
# ==================== Lifecycle Tests ====================
def test_sharded_subscription_initialization(self, mock_pubsub: MagicMock, mock_redis_client: MagicMock):
def test_sharded_subscription_initialization(self, mock_pubsub: MagicMock, mock_redis_client: FakeRedisClient):
"""Test that sharded subscription is properly initialized."""
subscription = _RedisShardedSubscription(
client=mock_redis_client,
@@ -970,7 +982,7 @@ class TestRedisShardedSubscription:
],
)
def test_sharded_subscription_scenarios(
self, test_case: SubscriptionTestCase, mock_pubsub: MagicMock, mock_redis_client: MagicMock
self, test_case: SubscriptionTestCase, mock_pubsub: MagicMock, mock_redis_client: FakeRedisClient
):
"""Test various sharded subscription scenarios using table-driven approach."""
subscription = _RedisShardedSubscription(
@@ -1058,7 +1070,7 @@ class TestRedisShardedSubscription:
# Close should still work
sharded_subscription.close() # Should not raise
def test_channel_name_variations(self, mock_pubsub: MagicMock, mock_redis_client: MagicMock):
def test_channel_name_variations(self, mock_pubsub: MagicMock, mock_redis_client: FakeRedisClient):
"""Test various sharded channel name formats."""
channel_names = [
"simple",
@@ -1120,10 +1132,13 @@ class TestRedisSubscriptionCommon:
"""Parameterized fixture providing subscription type and class."""
return request.param
@pytest.fixture(autouse=True)
def patch_sharded_redis_type(self, monkeypatch):
monkeypatch.setattr("libs.broadcast_channel.redis.sharded_channel.Redis", FakeRedisClient)
@pytest.fixture
def mock_redis_client(self) -> MagicMock:
client = MagicMock()
return client
def mock_redis_client(self) -> FakeRedisClient:
return FakeRedisClient()
@pytest.fixture
def mock_pubsub(self) -> MagicMock:
@@ -1140,7 +1155,7 @@ class TestRedisSubscriptionCommon:
return pubsub
@pytest.fixture
def subscription(self, subscription_params, mock_pubsub: MagicMock, mock_redis_client: MagicMock):
def subscription(self, subscription_params, mock_pubsub: MagicMock, mock_redis_client: FakeRedisClient):
"""Create a subscription instance based on parameterized type."""
subscription_type, subscription_class = subscription_params
topic_name = f"test-{subscription_type}-topic"

View File

@@ -17,7 +17,6 @@ from core.workflow.nodes.human_input.entities import (
from core.workflow.nodes.human_input.enums import FormInputType, HumanInputFormKind, HumanInputFormStatus
from models.human_input import RecipientType
from services.human_input_service import Form, FormExpiredError, HumanInputService, InvalidFormDataError
from tasks.app_generate.workflow_execute_task import WORKFLOW_BASED_APP_EXECUTION_QUEUE
@pytest.fixture
@@ -88,7 +87,6 @@ def test_enqueue_resume_dispatches_task_for_workflow(mocker, mock_session_factor
resume_task.apply_async.assert_called_once()
call_kwargs = resume_task.apply_async.call_args.kwargs
assert call_kwargs["queue"] == WORKFLOW_BASED_APP_EXECUTION_QUEUE
assert call_kwargs["kwargs"]["payload"]["workflow_run_id"] == "workflow-run-id"
@@ -130,7 +128,6 @@ def test_enqueue_resume_dispatches_task_for_advanced_chat(mocker, mock_session_f
resume_task.apply_async.assert_called_once()
call_kwargs = resume_task.apply_async.call_args.kwargs
assert call_kwargs["queue"] == WORKFLOW_BASED_APP_EXECUTION_QUEUE
assert call_kwargs["kwargs"]["payload"]["workflow_run_id"] == "workflow-run-id"