mirror of
https://github.com/langgenius/dify.git
synced 2026-05-03 00:48:04 +08:00
Merge remote-tracking branch 'origin/main' into feat/support-agent-sandbox
This commit is contained in:
@ -707,3 +707,104 @@ class TestDatasetServiceRetrievalConfiguration:
|
||||
db_session_with_containers.refresh(dataset)
|
||||
assert result.id == dataset.id
|
||||
assert dataset.retrieval_model == update_data["retrieval_model"]
|
||||
|
||||
|
||||
class TestDocumentServicePauseRecoverRetry:
|
||||
"""Tests for pause/recover/retry orchestration using real DB and Redis."""
|
||||
|
||||
def _create_indexing_document(self, db_session_with_containers, indexing_status="indexing"):
|
||||
factory = DatasetServiceIntegrationDataFactory
|
||||
account, tenant = factory.create_account_with_tenant(db_session_with_containers)
|
||||
dataset = factory.create_dataset(db_session_with_containers, tenant.id, account.id)
|
||||
doc = factory.create_document(db_session_with_containers, dataset, account.id)
|
||||
doc.indexing_status = indexing_status
|
||||
db_session_with_containers.commit()
|
||||
return doc, account
|
||||
|
||||
def test_pause_document_success(self, db_session_with_containers):
|
||||
from extensions.ext_redis import redis_client
|
||||
from services.dataset_service import DocumentService
|
||||
|
||||
doc, account = self._create_indexing_document(db_session_with_containers, indexing_status="indexing")
|
||||
|
||||
with patch("services.dataset_service.current_user") as mock_user:
|
||||
mock_user.id = account.id
|
||||
DocumentService.pause_document(doc)
|
||||
|
||||
db_session_with_containers.refresh(doc)
|
||||
assert doc.is_paused is True
|
||||
assert doc.paused_by == account.id
|
||||
assert doc.paused_at is not None
|
||||
|
||||
cache_key = f"document_{doc.id}_is_paused"
|
||||
assert redis_client.get(cache_key) is not None
|
||||
redis_client.delete(cache_key)
|
||||
|
||||
def test_pause_document_invalid_status_error(self, db_session_with_containers):
|
||||
from services.dataset_service import DocumentService
|
||||
from services.errors.document import DocumentIndexingError
|
||||
|
||||
doc, account = self._create_indexing_document(db_session_with_containers, indexing_status="completed")
|
||||
|
||||
with patch("services.dataset_service.current_user") as mock_user:
|
||||
mock_user.id = account.id
|
||||
with pytest.raises(DocumentIndexingError):
|
||||
DocumentService.pause_document(doc)
|
||||
|
||||
def test_recover_document_success(self, db_session_with_containers):
|
||||
from extensions.ext_redis import redis_client
|
||||
from services.dataset_service import DocumentService
|
||||
|
||||
doc, account = self._create_indexing_document(db_session_with_containers, indexing_status="indexing")
|
||||
|
||||
# Pause first
|
||||
with patch("services.dataset_service.current_user") as mock_user:
|
||||
mock_user.id = account.id
|
||||
DocumentService.pause_document(doc)
|
||||
|
||||
# Recover
|
||||
with patch("services.dataset_service.recover_document_indexing_task") as recover_task:
|
||||
DocumentService.recover_document(doc)
|
||||
|
||||
db_session_with_containers.refresh(doc)
|
||||
assert doc.is_paused is False
|
||||
assert doc.paused_by is None
|
||||
assert doc.paused_at is None
|
||||
|
||||
cache_key = f"document_{doc.id}_is_paused"
|
||||
assert redis_client.get(cache_key) is None
|
||||
recover_task.delay.assert_called_once_with(doc.dataset_id, doc.id)
|
||||
|
||||
def test_retry_document_indexing_success(self, db_session_with_containers):
|
||||
from extensions.ext_redis import redis_client
|
||||
from services.dataset_service import DocumentService
|
||||
|
||||
factory = DatasetServiceIntegrationDataFactory
|
||||
account, tenant = factory.create_account_with_tenant(db_session_with_containers)
|
||||
dataset = factory.create_dataset(db_session_with_containers, tenant.id, account.id)
|
||||
doc1 = factory.create_document(db_session_with_containers, dataset, account.id, name="doc1.txt")
|
||||
doc2 = factory.create_document(db_session_with_containers, dataset, account.id, name="doc2.txt")
|
||||
doc2.position = 2
|
||||
doc1.indexing_status = "error"
|
||||
doc2.indexing_status = "error"
|
||||
db_session_with_containers.commit()
|
||||
|
||||
with (
|
||||
patch("services.dataset_service.current_user") as mock_user,
|
||||
patch("services.dataset_service.retry_document_indexing_task") as retry_task,
|
||||
):
|
||||
mock_user.id = account.id
|
||||
DocumentService.retry_document(dataset.id, [doc1, doc2])
|
||||
|
||||
db_session_with_containers.refresh(doc1)
|
||||
db_session_with_containers.refresh(doc2)
|
||||
assert doc1.indexing_status == "waiting"
|
||||
assert doc2.indexing_status == "waiting"
|
||||
|
||||
# Verify redis keys were set
|
||||
assert redis_client.get(f"document_{doc1.id}_is_retried") is not None
|
||||
assert redis_client.get(f"document_{doc2.id}_is_retried") is not None
|
||||
retry_task.delay.assert_called_once_with(dataset.id, [doc1.id, doc2.id], account.id)
|
||||
|
||||
# Cleanup
|
||||
redis_client.delete(f"document_{doc1.id}_is_retried", f"document_{doc2.id}_is_retried")
|
||||
|
||||
@ -141,3 +141,73 @@ class TestArchivedWorkflowRunDeletion:
|
||||
db_session_with_containers.expunge_all()
|
||||
deleted_run = db_session_with_containers.get(WorkflowRun, run_id)
|
||||
assert deleted_run is None
|
||||
|
||||
def test_delete_run_dry_run(self, db_session_with_containers):
|
||||
"""Dry run should return success without actually deleting."""
|
||||
tenant_id = str(uuid4())
|
||||
run = self._create_workflow_run(
|
||||
db_session_with_containers,
|
||||
tenant_id=tenant_id,
|
||||
created_at=datetime.now(UTC),
|
||||
)
|
||||
run_id = run.id
|
||||
deleter = ArchivedWorkflowRunDeletion(dry_run=True)
|
||||
|
||||
result = deleter._delete_run(run)
|
||||
|
||||
assert result.success is True
|
||||
assert result.run_id == run_id
|
||||
# Run should still exist because it's a dry run
|
||||
db_session_with_containers.expire_all()
|
||||
assert db_session_with_containers.get(WorkflowRun, run_id) is not None
|
||||
|
||||
def test_delete_run_exception_returns_error(self, db_session_with_containers):
|
||||
"""Exception during deletion should return failure result."""
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
tenant_id = str(uuid4())
|
||||
run = self._create_workflow_run(
|
||||
db_session_with_containers,
|
||||
tenant_id=tenant_id,
|
||||
created_at=datetime.now(UTC),
|
||||
)
|
||||
deleter = ArchivedWorkflowRunDeletion(dry_run=False)
|
||||
|
||||
with patch.object(deleter, "_get_workflow_run_repo") as mock_get_repo:
|
||||
mock_repo = MagicMock()
|
||||
mock_get_repo.return_value = mock_repo
|
||||
mock_repo.delete_runs_with_related.side_effect = Exception("Database error")
|
||||
|
||||
result = deleter._delete_run(run)
|
||||
|
||||
assert result.success is False
|
||||
assert result.error == "Database error"
|
||||
|
||||
def test_delete_by_run_id_success(self, db_session_with_containers):
|
||||
"""Successfully delete an archived workflow run by ID."""
|
||||
tenant_id = str(uuid4())
|
||||
base_time = datetime.now(UTC)
|
||||
run = self._create_workflow_run(
|
||||
db_session_with_containers,
|
||||
tenant_id=tenant_id,
|
||||
created_at=base_time,
|
||||
)
|
||||
self._create_archive_log(db_session_with_containers, run=run)
|
||||
run_id = run.id
|
||||
|
||||
deleter = ArchivedWorkflowRunDeletion()
|
||||
result = deleter.delete_by_run_id(run_id)
|
||||
|
||||
assert result.success is True
|
||||
db_session_with_containers.expunge_all()
|
||||
assert db_session_with_containers.get(WorkflowRun, run_id) is None
|
||||
|
||||
def test_get_workflow_run_repo_caches_instance(self, db_session_with_containers):
|
||||
"""_get_workflow_run_repo should return a cached repo on subsequent calls."""
|
||||
deleter = ArchivedWorkflowRunDeletion()
|
||||
|
||||
repo1 = deleter._get_workflow_run_repo()
|
||||
repo2 = deleter._get_workflow_run_repo()
|
||||
|
||||
assert repo1 is repo2
|
||||
assert deleter.workflow_run_repo is repo1
|
||||
|
||||
Reference in New Issue
Block a user