mirror of
https://github.com/langgenius/dify.git
synced 2026-05-06 02:18:08 +08:00
test: migrate dataset service document indexing tests to testcontainers (#34022)
This commit is contained in:
@ -707,3 +707,104 @@ class TestDatasetServiceRetrievalConfiguration:
|
|||||||
db_session_with_containers.refresh(dataset)
|
db_session_with_containers.refresh(dataset)
|
||||||
assert result.id == dataset.id
|
assert result.id == dataset.id
|
||||||
assert dataset.retrieval_model == update_data["retrieval_model"]
|
assert dataset.retrieval_model == update_data["retrieval_model"]
|
||||||
|
|
||||||
|
|
||||||
|
class TestDocumentServicePauseRecoverRetry:
|
||||||
|
"""Tests for pause/recover/retry orchestration using real DB and Redis."""
|
||||||
|
|
||||||
|
def _create_indexing_document(self, db_session_with_containers, indexing_status="indexing"):
|
||||||
|
factory = DatasetServiceIntegrationDataFactory
|
||||||
|
account, tenant = factory.create_account_with_tenant(db_session_with_containers)
|
||||||
|
dataset = factory.create_dataset(db_session_with_containers, tenant.id, account.id)
|
||||||
|
doc = factory.create_document(db_session_with_containers, dataset, account.id)
|
||||||
|
doc.indexing_status = indexing_status
|
||||||
|
db_session_with_containers.commit()
|
||||||
|
return doc, account
|
||||||
|
|
||||||
|
def test_pause_document_success(self, db_session_with_containers):
|
||||||
|
from extensions.ext_redis import redis_client
|
||||||
|
from services.dataset_service import DocumentService
|
||||||
|
|
||||||
|
doc, account = self._create_indexing_document(db_session_with_containers, indexing_status="indexing")
|
||||||
|
|
||||||
|
with patch("services.dataset_service.current_user") as mock_user:
|
||||||
|
mock_user.id = account.id
|
||||||
|
DocumentService.pause_document(doc)
|
||||||
|
|
||||||
|
db_session_with_containers.refresh(doc)
|
||||||
|
assert doc.is_paused is True
|
||||||
|
assert doc.paused_by == account.id
|
||||||
|
assert doc.paused_at is not None
|
||||||
|
|
||||||
|
cache_key = f"document_{doc.id}_is_paused"
|
||||||
|
assert redis_client.get(cache_key) is not None
|
||||||
|
redis_client.delete(cache_key)
|
||||||
|
|
||||||
|
def test_pause_document_invalid_status_error(self, db_session_with_containers):
|
||||||
|
from services.dataset_service import DocumentService
|
||||||
|
from services.errors.document import DocumentIndexingError
|
||||||
|
|
||||||
|
doc, account = self._create_indexing_document(db_session_with_containers, indexing_status="completed")
|
||||||
|
|
||||||
|
with patch("services.dataset_service.current_user") as mock_user:
|
||||||
|
mock_user.id = account.id
|
||||||
|
with pytest.raises(DocumentIndexingError):
|
||||||
|
DocumentService.pause_document(doc)
|
||||||
|
|
||||||
|
def test_recover_document_success(self, db_session_with_containers):
|
||||||
|
from extensions.ext_redis import redis_client
|
||||||
|
from services.dataset_service import DocumentService
|
||||||
|
|
||||||
|
doc, account = self._create_indexing_document(db_session_with_containers, indexing_status="indexing")
|
||||||
|
|
||||||
|
# Pause first
|
||||||
|
with patch("services.dataset_service.current_user") as mock_user:
|
||||||
|
mock_user.id = account.id
|
||||||
|
DocumentService.pause_document(doc)
|
||||||
|
|
||||||
|
# Recover
|
||||||
|
with patch("services.dataset_service.recover_document_indexing_task") as recover_task:
|
||||||
|
DocumentService.recover_document(doc)
|
||||||
|
|
||||||
|
db_session_with_containers.refresh(doc)
|
||||||
|
assert doc.is_paused is False
|
||||||
|
assert doc.paused_by is None
|
||||||
|
assert doc.paused_at is None
|
||||||
|
|
||||||
|
cache_key = f"document_{doc.id}_is_paused"
|
||||||
|
assert redis_client.get(cache_key) is None
|
||||||
|
recover_task.delay.assert_called_once_with(doc.dataset_id, doc.id)
|
||||||
|
|
||||||
|
def test_retry_document_indexing_success(self, db_session_with_containers):
|
||||||
|
from extensions.ext_redis import redis_client
|
||||||
|
from services.dataset_service import DocumentService
|
||||||
|
|
||||||
|
factory = DatasetServiceIntegrationDataFactory
|
||||||
|
account, tenant = factory.create_account_with_tenant(db_session_with_containers)
|
||||||
|
dataset = factory.create_dataset(db_session_with_containers, tenant.id, account.id)
|
||||||
|
doc1 = factory.create_document(db_session_with_containers, dataset, account.id, name="doc1.txt")
|
||||||
|
doc2 = factory.create_document(db_session_with_containers, dataset, account.id, name="doc2.txt")
|
||||||
|
doc2.position = 2
|
||||||
|
doc1.indexing_status = "error"
|
||||||
|
doc2.indexing_status = "error"
|
||||||
|
db_session_with_containers.commit()
|
||||||
|
|
||||||
|
with (
|
||||||
|
patch("services.dataset_service.current_user") as mock_user,
|
||||||
|
patch("services.dataset_service.retry_document_indexing_task") as retry_task,
|
||||||
|
):
|
||||||
|
mock_user.id = account.id
|
||||||
|
DocumentService.retry_document(dataset.id, [doc1, doc2])
|
||||||
|
|
||||||
|
db_session_with_containers.refresh(doc1)
|
||||||
|
db_session_with_containers.refresh(doc2)
|
||||||
|
assert doc1.indexing_status == "waiting"
|
||||||
|
assert doc2.indexing_status == "waiting"
|
||||||
|
|
||||||
|
# Verify redis keys were set
|
||||||
|
assert redis_client.get(f"document_{doc1.id}_is_retried") is not None
|
||||||
|
assert redis_client.get(f"document_{doc2.id}_is_retried") is not None
|
||||||
|
retry_task.delay.assert_called_once_with(dataset.id, [doc1.id, doc2.id], account.id)
|
||||||
|
|
||||||
|
# Cleanup
|
||||||
|
redis_client.delete(f"document_{doc1.id}_is_retried", f"document_{doc2.id}_is_retried")
|
||||||
|
|||||||
@ -1,129 +0,0 @@
|
|||||||
"""Unit tests for non-SQL DocumentService orchestration behaviors.
|
|
||||||
|
|
||||||
This file intentionally keeps only collaborator-oriented document indexing
|
|
||||||
orchestration tests. SQL-backed dataset lifecycle cases are covered by
|
|
||||||
integration tests under testcontainers.
|
|
||||||
"""
|
|
||||||
|
|
||||||
from unittest.mock import Mock, patch
|
|
||||||
|
|
||||||
import pytest
|
|
||||||
|
|
||||||
from models.dataset import Document
|
|
||||||
from services.errors.document import DocumentIndexingError
|
|
||||||
|
|
||||||
|
|
||||||
class DatasetServiceUnitDataFactory:
|
|
||||||
"""Factory for creating lightweight document doubles used in unit tests."""
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def create_document_mock(
|
|
||||||
document_id: str = "doc-123",
|
|
||||||
dataset_id: str = "dataset-123",
|
|
||||||
indexing_status: str = "completed",
|
|
||||||
is_paused: bool = False,
|
|
||||||
) -> Mock:
|
|
||||||
"""Create a document-shaped mock for DocumentService orchestration tests."""
|
|
||||||
document = Mock(spec=Document)
|
|
||||||
document.id = document_id
|
|
||||||
document.dataset_id = dataset_id
|
|
||||||
document.indexing_status = indexing_status
|
|
||||||
document.is_paused = is_paused
|
|
||||||
document.paused_by = None
|
|
||||||
document.paused_at = None
|
|
||||||
return document
|
|
||||||
|
|
||||||
|
|
||||||
class TestDatasetServiceDocumentIndexing:
|
|
||||||
"""Unit tests for pause/recover/retry orchestration without SQL assertions."""
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def mock_document_service_dependencies(self):
|
|
||||||
"""Patch non-SQL collaborators used by DocumentService methods."""
|
|
||||||
with (
|
|
||||||
patch("services.dataset_service.redis_client") as mock_redis,
|
|
||||||
patch("services.dataset_service.db.session") as mock_db,
|
|
||||||
patch("services.dataset_service.current_user") as mock_current_user,
|
|
||||||
):
|
|
||||||
mock_current_user.id = "user-123"
|
|
||||||
yield {
|
|
||||||
"redis_client": mock_redis,
|
|
||||||
"db_session": mock_db,
|
|
||||||
"current_user": mock_current_user,
|
|
||||||
}
|
|
||||||
|
|
||||||
def test_pause_document_success(self, mock_document_service_dependencies):
|
|
||||||
"""Pause a document that is currently in an indexable status."""
|
|
||||||
# Arrange
|
|
||||||
document = DatasetServiceUnitDataFactory.create_document_mock(indexing_status="indexing")
|
|
||||||
|
|
||||||
# Act
|
|
||||||
from services.dataset_service import DocumentService
|
|
||||||
|
|
||||||
DocumentService.pause_document(document)
|
|
||||||
|
|
||||||
# Assert
|
|
||||||
assert document.is_paused is True
|
|
||||||
assert document.paused_by == "user-123"
|
|
||||||
mock_document_service_dependencies["db_session"].add.assert_called_once_with(document)
|
|
||||||
mock_document_service_dependencies["db_session"].commit.assert_called_once()
|
|
||||||
mock_document_service_dependencies["redis_client"].setnx.assert_called_once_with(
|
|
||||||
f"document_{document.id}_is_paused",
|
|
||||||
"True",
|
|
||||||
)
|
|
||||||
|
|
||||||
def test_pause_document_invalid_status_error(self, mock_document_service_dependencies):
|
|
||||||
"""Raise DocumentIndexingError when pausing a completed document."""
|
|
||||||
# Arrange
|
|
||||||
document = DatasetServiceUnitDataFactory.create_document_mock(indexing_status="completed")
|
|
||||||
|
|
||||||
# Act / Assert
|
|
||||||
from services.dataset_service import DocumentService
|
|
||||||
|
|
||||||
with pytest.raises(DocumentIndexingError):
|
|
||||||
DocumentService.pause_document(document)
|
|
||||||
|
|
||||||
def test_recover_document_success(self, mock_document_service_dependencies):
|
|
||||||
"""Recover a paused document and dispatch the recover indexing task."""
|
|
||||||
# Arrange
|
|
||||||
document = DatasetServiceUnitDataFactory.create_document_mock(indexing_status="indexing", is_paused=True)
|
|
||||||
|
|
||||||
# Act
|
|
||||||
with patch("services.dataset_service.recover_document_indexing_task") as recover_task:
|
|
||||||
from services.dataset_service import DocumentService
|
|
||||||
|
|
||||||
DocumentService.recover_document(document)
|
|
||||||
|
|
||||||
# Assert
|
|
||||||
assert document.is_paused is False
|
|
||||||
assert document.paused_by is None
|
|
||||||
assert document.paused_at is None
|
|
||||||
mock_document_service_dependencies["db_session"].add.assert_called_once_with(document)
|
|
||||||
mock_document_service_dependencies["db_session"].commit.assert_called_once()
|
|
||||||
mock_document_service_dependencies["redis_client"].delete.assert_called_once_with(
|
|
||||||
f"document_{document.id}_is_paused"
|
|
||||||
)
|
|
||||||
recover_task.delay.assert_called_once_with(document.dataset_id, document.id)
|
|
||||||
|
|
||||||
def test_retry_document_indexing_success(self, mock_document_service_dependencies):
|
|
||||||
"""Reset documents to waiting state and dispatch retry indexing task."""
|
|
||||||
# Arrange
|
|
||||||
dataset_id = "dataset-123"
|
|
||||||
documents = [
|
|
||||||
DatasetServiceUnitDataFactory.create_document_mock(document_id="doc-1", indexing_status="error"),
|
|
||||||
DatasetServiceUnitDataFactory.create_document_mock(document_id="doc-2", indexing_status="error"),
|
|
||||||
]
|
|
||||||
mock_document_service_dependencies["redis_client"].get.return_value = None
|
|
||||||
|
|
||||||
# Act
|
|
||||||
with patch("services.dataset_service.retry_document_indexing_task") as retry_task:
|
|
||||||
from services.dataset_service import DocumentService
|
|
||||||
|
|
||||||
DocumentService.retry_document(dataset_id, documents)
|
|
||||||
|
|
||||||
# Assert
|
|
||||||
assert all(document.indexing_status == "waiting" for document in documents)
|
|
||||||
assert mock_document_service_dependencies["db_session"].add.call_count == 2
|
|
||||||
assert mock_document_service_dependencies["db_session"].commit.call_count == 2
|
|
||||||
assert mock_document_service_dependencies["redis_client"].setex.call_count == 2
|
|
||||||
retry_task.delay.assert_called_once_with(dataset_id, ["doc-1", "doc-2"], "user-123")
|
|
||||||
Reference in New Issue
Block a user