Merge branch 'main' into feat/pull-a-variable

# Conflicts:
#	.nvmrc
This commit is contained in:
zhsama
2026-01-12 17:15:56 +08:00
59 changed files with 2445 additions and 1456 deletions

View File

@ -0,0 +1,158 @@
app:
description: Validate v1 Variable Assigner blocks streaming until conversation variable is updated.
icon: 🤖
icon_background: '#FFEAD5'
mode: advanced-chat
name: test_streaming_conversation_variables_v1_overwrite
use_icon_as_answer_icon: false
dependencies: []
kind: app
version: 0.5.0
workflow:
conversation_variables:
- description: ''
id: 6ddf2d7f-3d1b-4bb0-9a5e-9b0c87c7b5e6
name: conv_var
selector:
- conversation
- conv_var
value: default
value_type: string
environment_variables: []
features:
file_upload:
allowed_file_extensions:
- .JPG
- .JPEG
- .PNG
- .GIF
- .WEBP
- .SVG
allowed_file_types:
- image
allowed_file_upload_methods:
- local_file
- remote_url
enabled: false
fileUploadConfig:
audio_file_size_limit: 50
batch_count_limit: 5
file_size_limit: 15
image_file_size_limit: 10
video_file_size_limit: 100
workflow_file_upload_limit: 10
image:
enabled: false
number_limits: 3
transfer_methods:
- local_file
- remote_url
number_limits: 3
opening_statement: ''
retriever_resource:
enabled: true
sensitive_word_avoidance:
enabled: false
speech_to_text:
enabled: false
suggested_questions: []
suggested_questions_after_answer:
enabled: false
text_to_speech:
enabled: false
language: ''
voice: ''
graph:
edges:
- data:
isInIteration: false
isInLoop: false
sourceType: start
targetType: assigner
id: start-source-assigner-target
source: start
sourceHandle: source
target: assigner
targetHandle: target
type: custom
zIndex: 0
- data:
isInLoop: false
sourceType: assigner
targetType: answer
id: assigner-source-answer-target
source: assigner
sourceHandle: source
target: answer
targetHandle: target
type: custom
zIndex: 0
nodes:
- data:
desc: ''
selected: false
title: Start
type: start
variables: []
height: 54
id: start
position:
x: 30
y: 253
positionAbsolute:
x: 30
y: 253
selected: false
sourcePosition: right
targetPosition: left
type: custom
width: 244
- data:
answer: 'Current Value Of `conv_var` is:{{#conversation.conv_var#}}'
desc: ''
selected: false
title: Answer
type: answer
variables: []
height: 106
id: answer
position:
x: 638
y: 253
positionAbsolute:
x: 638
y: 253
selected: true
sourcePosition: right
targetPosition: left
type: custom
width: 244
- data:
assigned_variable_selector:
- conversation
- conv_var
desc: ''
input_variable_selector:
- sys
- query
selected: false
title: Variable Assigner
type: assigner
write_mode: over-write
height: 84
id: assigner
position:
x: 334
y: 253
positionAbsolute:
x: 334
y: 253
selected: false
sourcePosition: right
targetPosition: left
type: custom
width: 244
viewport:
x: 0
y: 0
zoom: 0.7

View File

@ -230,7 +230,6 @@ class TestAgentService:
# Create first agent thought
thought1 = MessageAgentThought(
id=fake.uuid4(),
message_id=message.id,
position=1,
thought="I need to analyze the user's request",
@ -257,7 +256,6 @@ class TestAgentService:
# Create second agent thought
thought2 = MessageAgentThought(
id=fake.uuid4(),
message_id=message.id,
position=2,
thought="Based on the analysis, I can provide a response",
@ -545,7 +543,6 @@ class TestAgentService:
# Create agent thought with tool error
thought_with_error = MessageAgentThought(
id=fake.uuid4(),
message_id=message.id,
position=1,
thought="I need to analyze the user's request",
@ -759,7 +756,6 @@ class TestAgentService:
# Create agent thought with multiple tools
complex_thought = MessageAgentThought(
id=fake.uuid4(),
message_id=message.id,
position=1,
thought="I need to use multiple tools to complete this task",
@ -877,7 +873,6 @@ class TestAgentService:
# Create agent thought with files
thought_with_files = MessageAgentThought(
id=fake.uuid4(),
message_id=message.id,
position=1,
thought="I need to process some files",
@ -957,7 +952,6 @@ class TestAgentService:
# Create agent thought with empty tool data
empty_thought = MessageAgentThought(
id=fake.uuid4(),
message_id=message.id,
position=1,
thought="I need to analyze the user's request",
@ -999,7 +993,6 @@ class TestAgentService:
# Create agent thought with malformed JSON
malformed_thought = MessageAgentThought(
id=fake.uuid4(),
message_id=message.id,
position=1,
thought="I need to analyze the user's request",

View File

@ -45,3 +45,33 @@ def test_streaming_conversation_variables():
runner = TableTestRunner()
result = runner.run_test_case(case)
assert result.success, f"Test failed: {result.error}"
def test_streaming_conversation_variables_v1_overwrite_waits_for_assignment():
fixture_name = "test_streaming_conversation_variables_v1_overwrite"
input_query = "overwrite-value"
case = WorkflowTestCase(
fixture_path=fixture_name,
use_auto_mock=False,
mock_config=MockConfigBuilder().build(),
query=input_query,
inputs={},
expected_outputs={"answer": f"Current Value Of `conv_var` is:{input_query}"},
)
runner = TableTestRunner()
result = runner.run_test_case(case)
assert result.success, f"Test failed: {result.error}"
events = result.events
conv_var_chunk_events = [
event
for event in events
if isinstance(event, NodeRunStreamChunkEvent) and tuple(event.selector) == ("conversation", "conv_var")
]
assert conv_var_chunk_events, "Expected conversation variable chunk events to be emitted"
assert all(event.chunk == input_query for event in conv_var_chunk_events), (
"Expected streamed conversation variable value to match the input query"
)

View File

@ -4,6 +4,7 @@ from datetime import UTC, datetime
from unittest.mock import Mock, patch
import pytest
from sqlalchemy.dialects import postgresql
from sqlalchemy.orm import Session, sessionmaker
from core.workflow.enums import WorkflowExecutionStatus
@ -104,6 +105,42 @@ class TestDifyAPISQLAlchemyWorkflowRunRepository:
return pause
class TestGetRunsBatchByTimeRange(TestDifyAPISQLAlchemyWorkflowRunRepository):
def test_get_runs_batch_by_time_range_filters_terminal_statuses(
self, repository: DifyAPISQLAlchemyWorkflowRunRepository, mock_session: Mock
):
scalar_result = Mock()
scalar_result.all.return_value = []
mock_session.scalars.return_value = scalar_result
repository.get_runs_batch_by_time_range(
start_from=None,
end_before=datetime(2024, 1, 1),
last_seen=None,
batch_size=50,
)
stmt = mock_session.scalars.call_args[0][0]
compiled_sql = str(
stmt.compile(
dialect=postgresql.dialect(),
compile_kwargs={"literal_binds": True},
)
)
assert "workflow_runs.status" in compiled_sql
for status in (
WorkflowExecutionStatus.SUCCEEDED,
WorkflowExecutionStatus.FAILED,
WorkflowExecutionStatus.STOPPED,
WorkflowExecutionStatus.PARTIAL_SUCCEEDED,
):
assert f"'{status.value}'" in compiled_sql
assert "'running'" not in compiled_sql
assert "'paused'" not in compiled_sql
class TestCreateWorkflowPause(TestDifyAPISQLAlchemyWorkflowRunRepository):
"""Test create_workflow_pause method."""
@ -181,6 +218,61 @@ class TestCreateWorkflowPause(TestDifyAPISQLAlchemyWorkflowRunRepository):
)
class TestDeleteRunsWithRelated(TestDifyAPISQLAlchemyWorkflowRunRepository):
def test_uses_trigger_log_repository(self, repository: DifyAPISQLAlchemyWorkflowRunRepository, mock_session: Mock):
node_ids_result = Mock()
node_ids_result.all.return_value = []
pause_ids_result = Mock()
pause_ids_result.all.return_value = []
mock_session.scalars.side_effect = [node_ids_result, pause_ids_result]
# app_logs delete, runs delete
mock_session.execute.side_effect = [Mock(rowcount=0), Mock(rowcount=1)]
fake_trigger_repo = Mock()
fake_trigger_repo.delete_by_run_ids.return_value = 3
run = Mock(id="run-1", tenant_id="t1", app_id="a1", workflow_id="w1", triggered_from="tf")
counts = repository.delete_runs_with_related(
[run],
delete_node_executions=lambda session, runs: (2, 1),
delete_trigger_logs=lambda session, run_ids: fake_trigger_repo.delete_by_run_ids(run_ids),
)
fake_trigger_repo.delete_by_run_ids.assert_called_once_with(["run-1"])
assert counts["node_executions"] == 2
assert counts["offloads"] == 1
assert counts["trigger_logs"] == 3
assert counts["runs"] == 1
class TestCountRunsWithRelated(TestDifyAPISQLAlchemyWorkflowRunRepository):
def test_uses_trigger_log_repository(self, repository: DifyAPISQLAlchemyWorkflowRunRepository, mock_session: Mock):
pause_ids_result = Mock()
pause_ids_result.all.return_value = ["pause-1", "pause-2"]
mock_session.scalars.return_value = pause_ids_result
mock_session.scalar.side_effect = [5, 2]
fake_trigger_repo = Mock()
fake_trigger_repo.count_by_run_ids.return_value = 3
run = Mock(id="run-1", tenant_id="t1", app_id="a1", workflow_id="w1", triggered_from="tf")
counts = repository.count_runs_with_related(
[run],
count_node_executions=lambda session, runs: (2, 1),
count_trigger_logs=lambda session, run_ids: fake_trigger_repo.count_by_run_ids(run_ids),
)
fake_trigger_repo.count_by_run_ids.assert_called_once_with(["run-1"])
assert counts["node_executions"] == 2
assert counts["offloads"] == 1
assert counts["trigger_logs"] == 3
assert counts["app_logs"] == 5
assert counts["pauses"] == 2
assert counts["pause_reasons"] == 2
assert counts["runs"] == 1
class TestResumeWorkflowPause(TestDifyAPISQLAlchemyWorkflowRunRepository):
"""Test resume_workflow_pause method."""

View File

@ -0,0 +1,31 @@
from unittest.mock import Mock
from sqlalchemy.dialects import postgresql
from sqlalchemy.orm import Session
from repositories.sqlalchemy_workflow_trigger_log_repository import SQLAlchemyWorkflowTriggerLogRepository
def test_delete_by_run_ids_executes_delete():
session = Mock(spec=Session)
session.execute.return_value = Mock(rowcount=2)
repo = SQLAlchemyWorkflowTriggerLogRepository(session)
deleted = repo.delete_by_run_ids(["run-1", "run-2"])
stmt = session.execute.call_args[0][0]
compiled_sql = str(stmt.compile(dialect=postgresql.dialect(), compile_kwargs={"literal_binds": True}))
assert "workflow_trigger_logs" in compiled_sql
assert "'run-1'" in compiled_sql
assert "'run-2'" in compiled_sql
assert deleted == 2
def test_delete_by_run_ids_empty_short_circuits():
session = Mock(spec=Session)
repo = SQLAlchemyWorkflowTriggerLogRepository(session)
deleted = repo.delete_by_run_ids([])
session.execute.assert_not_called()
assert deleted == 0

View File

@ -0,0 +1,327 @@
import datetime
from typing import Any
import pytest
from services.billing_service import SubscriptionPlan
from services.retention.workflow_run import clear_free_plan_expired_workflow_run_logs as cleanup_module
from services.retention.workflow_run.clear_free_plan_expired_workflow_run_logs import WorkflowRunCleanup
class FakeRun:
def __init__(
self,
run_id: str,
tenant_id: str,
created_at: datetime.datetime,
app_id: str = "app-1",
workflow_id: str = "wf-1",
triggered_from: str = "workflow-run",
) -> None:
self.id = run_id
self.tenant_id = tenant_id
self.app_id = app_id
self.workflow_id = workflow_id
self.triggered_from = triggered_from
self.created_at = created_at
class FakeRepo:
def __init__(
self,
batches: list[list[FakeRun]],
delete_result: dict[str, int] | None = None,
count_result: dict[str, int] | None = None,
) -> None:
self.batches = batches
self.call_idx = 0
self.deleted: list[list[str]] = []
self.counted: list[list[str]] = []
self.delete_result = delete_result or {
"runs": 0,
"node_executions": 0,
"offloads": 0,
"app_logs": 0,
"trigger_logs": 0,
"pauses": 0,
"pause_reasons": 0,
}
self.count_result = count_result or {
"runs": 0,
"node_executions": 0,
"offloads": 0,
"app_logs": 0,
"trigger_logs": 0,
"pauses": 0,
"pause_reasons": 0,
}
def get_runs_batch_by_time_range(
self,
start_from: datetime.datetime | None,
end_before: datetime.datetime,
last_seen: tuple[datetime.datetime, str] | None,
batch_size: int,
) -> list[FakeRun]:
if self.call_idx >= len(self.batches):
return []
batch = self.batches[self.call_idx]
self.call_idx += 1
return batch
def delete_runs_with_related(
self, runs: list[FakeRun], delete_node_executions=None, delete_trigger_logs=None
) -> dict[str, int]:
self.deleted.append([run.id for run in runs])
result = self.delete_result.copy()
result["runs"] = len(runs)
return result
def count_runs_with_related(
self, runs: list[FakeRun], count_node_executions=None, count_trigger_logs=None
) -> dict[str, int]:
self.counted.append([run.id for run in runs])
result = self.count_result.copy()
result["runs"] = len(runs)
return result
def plan_info(plan: str, expiration: int) -> SubscriptionPlan:
return SubscriptionPlan(plan=plan, expiration_date=expiration)
def create_cleanup(
monkeypatch: pytest.MonkeyPatch,
repo: FakeRepo,
*,
grace_period_days: int = 0,
whitelist: set[str] | None = None,
**kwargs: Any,
) -> WorkflowRunCleanup:
monkeypatch.setattr(
cleanup_module.dify_config,
"SANDBOX_EXPIRED_RECORDS_CLEAN_GRACEFUL_PERIOD",
grace_period_days,
)
monkeypatch.setattr(
cleanup_module.WorkflowRunCleanup,
"_get_cleanup_whitelist",
lambda self: whitelist or set(),
)
return WorkflowRunCleanup(workflow_run_repo=repo, **kwargs)
def test_filter_free_tenants_billing_disabled(monkeypatch: pytest.MonkeyPatch) -> None:
cleanup = create_cleanup(monkeypatch, repo=FakeRepo([]), days=30, batch_size=10)
monkeypatch.setattr(cleanup_module.dify_config, "BILLING_ENABLED", False)
def fail_bulk(_: list[str]) -> dict[str, SubscriptionPlan]:
raise RuntimeError("should not call")
monkeypatch.setattr(cleanup_module.BillingService, "get_plan_bulk_with_cache", staticmethod(fail_bulk))
tenants = {"t1", "t2"}
free = cleanup._filter_free_tenants(tenants)
assert free == tenants
def test_filter_free_tenants_bulk_mixed(monkeypatch: pytest.MonkeyPatch) -> None:
cleanup = create_cleanup(monkeypatch, repo=FakeRepo([]), days=30, batch_size=10)
monkeypatch.setattr(cleanup_module.dify_config, "BILLING_ENABLED", True)
monkeypatch.setattr(
cleanup_module.BillingService,
"get_plan_bulk_with_cache",
staticmethod(
lambda tenant_ids: {
tenant_id: (plan_info("team", -1) if tenant_id == "t_paid" else plan_info("sandbox", -1))
for tenant_id in tenant_ids
}
),
)
free = cleanup._filter_free_tenants({"t_free", "t_paid", "t_missing"})
assert free == {"t_free", "t_missing"}
def test_filter_free_tenants_respects_grace_period(monkeypatch: pytest.MonkeyPatch) -> None:
cleanup = create_cleanup(monkeypatch, repo=FakeRepo([]), days=30, batch_size=10, grace_period_days=45)
monkeypatch.setattr(cleanup_module.dify_config, "BILLING_ENABLED", True)
now = datetime.datetime.now(datetime.UTC)
within_grace_ts = int((now - datetime.timedelta(days=10)).timestamp())
outside_grace_ts = int((now - datetime.timedelta(days=90)).timestamp())
def fake_bulk(_: list[str]) -> dict[str, SubscriptionPlan]:
return {
"recently_downgraded": plan_info("sandbox", within_grace_ts),
"long_sandbox": plan_info("sandbox", outside_grace_ts),
}
monkeypatch.setattr(cleanup_module.BillingService, "get_plan_bulk_with_cache", staticmethod(fake_bulk))
free = cleanup._filter_free_tenants({"recently_downgraded", "long_sandbox"})
assert free == {"long_sandbox"}
def test_filter_free_tenants_skips_cleanup_whitelist(monkeypatch: pytest.MonkeyPatch) -> None:
cleanup = create_cleanup(
monkeypatch,
repo=FakeRepo([]),
days=30,
batch_size=10,
whitelist={"tenant_whitelist"},
)
monkeypatch.setattr(cleanup_module.dify_config, "BILLING_ENABLED", True)
monkeypatch.setattr(
cleanup_module.BillingService,
"get_plan_bulk_with_cache",
staticmethod(
lambda tenant_ids: {
tenant_id: (plan_info("team", -1) if tenant_id == "t_paid" else plan_info("sandbox", -1))
for tenant_id in tenant_ids
}
),
)
tenants = {"tenant_whitelist", "tenant_regular"}
free = cleanup._filter_free_tenants(tenants)
assert free == {"tenant_regular"}
def test_filter_free_tenants_bulk_failure(monkeypatch: pytest.MonkeyPatch) -> None:
cleanup = create_cleanup(monkeypatch, repo=FakeRepo([]), days=30, batch_size=10)
monkeypatch.setattr(cleanup_module.dify_config, "BILLING_ENABLED", True)
monkeypatch.setattr(
cleanup_module.BillingService,
"get_plan_bulk_with_cache",
staticmethod(lambda tenant_ids: (_ for _ in ()).throw(RuntimeError("boom"))),
)
free = cleanup._filter_free_tenants({"t1", "t2"})
assert free == set()
def test_run_deletes_only_free_tenants(monkeypatch: pytest.MonkeyPatch) -> None:
cutoff = datetime.datetime.now()
repo = FakeRepo(
batches=[
[
FakeRun("run-free", "t_free", cutoff),
FakeRun("run-paid", "t_paid", cutoff),
]
]
)
cleanup = create_cleanup(monkeypatch, repo=repo, days=30, batch_size=10)
monkeypatch.setattr(cleanup_module.dify_config, "BILLING_ENABLED", True)
monkeypatch.setattr(
cleanup_module.BillingService,
"get_plan_bulk_with_cache",
staticmethod(
lambda tenant_ids: {
tenant_id: (plan_info("team", -1) if tenant_id == "t_paid" else plan_info("sandbox", -1))
for tenant_id in tenant_ids
}
),
)
cleanup.run()
assert repo.deleted == [["run-free"]]
def test_run_skips_when_no_free_tenants(monkeypatch: pytest.MonkeyPatch) -> None:
cutoff = datetime.datetime.now()
repo = FakeRepo(batches=[[FakeRun("run-paid", "t_paid", cutoff)]])
cleanup = create_cleanup(monkeypatch, repo=repo, days=30, batch_size=10)
monkeypatch.setattr(cleanup_module.dify_config, "BILLING_ENABLED", True)
monkeypatch.setattr(
cleanup_module.BillingService,
"get_plan_bulk_with_cache",
staticmethod(lambda tenant_ids: {tenant_id: plan_info("team", 1893456000) for tenant_id in tenant_ids}),
)
cleanup.run()
assert repo.deleted == []
def test_run_exits_on_empty_batch(monkeypatch: pytest.MonkeyPatch) -> None:
cleanup = create_cleanup(monkeypatch, repo=FakeRepo([]), days=30, batch_size=10)
cleanup.run()
def test_run_dry_run_skips_deletions(monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture[str]) -> None:
cutoff = datetime.datetime.now()
repo = FakeRepo(
batches=[[FakeRun("run-free", "t_free", cutoff)]],
count_result={
"runs": 0,
"node_executions": 2,
"offloads": 1,
"app_logs": 3,
"trigger_logs": 4,
"pauses": 5,
"pause_reasons": 6,
},
)
cleanup = create_cleanup(monkeypatch, repo=repo, days=30, batch_size=10, dry_run=True)
monkeypatch.setattr(cleanup_module.dify_config, "BILLING_ENABLED", False)
cleanup.run()
assert repo.deleted == []
assert repo.counted == [["run-free"]]
captured = capsys.readouterr().out
assert "Dry run mode enabled" in captured
assert "would delete 1 runs" in captured
assert "related records" in captured
assert "node_executions 2" in captured
assert "offloads 1" in captured
assert "app_logs 3" in captured
assert "trigger_logs 4" in captured
assert "pauses 5" in captured
assert "pause_reasons 6" in captured
def test_between_sets_window_bounds(monkeypatch: pytest.MonkeyPatch) -> None:
start_from = datetime.datetime(2024, 5, 1, 0, 0, 0)
end_before = datetime.datetime(2024, 6, 1, 0, 0, 0)
cleanup = create_cleanup(
monkeypatch, repo=FakeRepo([]), days=30, batch_size=10, start_from=start_from, end_before=end_before
)
assert cleanup.window_start == start_from
assert cleanup.window_end == end_before
def test_between_requires_both_boundaries(monkeypatch: pytest.MonkeyPatch) -> None:
with pytest.raises(ValueError):
create_cleanup(
monkeypatch, repo=FakeRepo([]), days=30, batch_size=10, start_from=datetime.datetime.now(), end_before=None
)
with pytest.raises(ValueError):
create_cleanup(
monkeypatch, repo=FakeRepo([]), days=30, batch_size=10, start_from=None, end_before=datetime.datetime.now()
)
def test_between_requires_end_after_start(monkeypatch: pytest.MonkeyPatch) -> None:
start_from = datetime.datetime(2024, 6, 1, 0, 0, 0)
end_before = datetime.datetime(2024, 5, 1, 0, 0, 0)
with pytest.raises(ValueError):
create_cleanup(
monkeypatch, repo=FakeRepo([]), days=30, batch_size=10, start_from=start_from, end_before=end_before
)