From cdcfd2ef2c78033aa1861b6828af258ffb2884bc Mon Sep 17 00:00:00 2001 From: EvanYao <2869018789@qq.com> Date: Fri, 15 May 2026 15:26:29 +0800 Subject: [PATCH] fix: regenerate document summary after update via API (#35950) (#36035) Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> --- api/tasks/document_indexing_update_task.py | 43 ++ .../test_document_indexing_update_task.py | 524 ++++++++++++++++++ 2 files changed, 567 insertions(+) create mode 100644 api/tests/unit_tests/tasks/test_document_indexing_update_task.py diff --git a/api/tasks/document_indexing_update_task.py b/api/tasks/document_indexing_update_task.py index 39564bbede..ba26c20331 100644 --- a/api/tasks/document_indexing_update_task.py +++ b/api/tasks/document_indexing_update_task.py @@ -7,10 +7,12 @@ from sqlalchemy import delete, select from core.db.session_factory import session_factory from core.indexing_runner import DocumentIsPausedError, IndexingRunner +from core.rag.index_processor.constant.index_type import IndexStructureType, IndexTechniqueType from core.rag.index_processor.index_processor_factory import IndexProcessorFactory from libs.datetime_utils import naive_utc_now from models.dataset import Dataset, Document, DocumentSegment from models.enums import IndexingStatus +from tasks.generate_summary_index_task import generate_summary_index_task logger = logging.getLogger(__name__) @@ -70,6 +72,7 @@ def document_indexing_update_task(dataset_id: str, document_id: str): segment_delete_stmt = delete(DocumentSegment).where(DocumentSegment.document_id == document_id) session.execute(segment_delete_stmt) + has_error = False try: indexing_runner = IndexingRunner() indexing_runner.run([document]) @@ -77,5 +80,45 @@ def document_indexing_update_task(dataset_id: str, document_id: str): logger.info(click.style(f"update document: {document.id} latency: {end_at - start_at}", fg="green")) except DocumentIsPausedError as ex: logger.info(click.style(str(ex), fg="yellow")) + has_error = True except Exception: logger.exception("document_indexing_update_task failed, document_id: %s", document_id) + has_error = True + + if has_error: + return + + # Trigger summary index generation for the updated document if enabled. + # Only generate for high_quality indexing technique and when summary_index_setting is enabled. + with session_factory.create_session() as session: + dataset = session.scalar(select(Dataset).where(Dataset.id == dataset_id).limit(1)) + if not dataset: + logger.warning("Dataset %s not found after update indexing", dataset_id) + return + + if dataset.indexing_technique == IndexTechniqueType.HIGH_QUALITY: + summary_index_setting = dataset.summary_index_setting + if summary_index_setting and summary_index_setting.get("enable"): + session.expire_all() + document = session.scalar( + select(Document).where(Document.id == document_id, Document.dataset_id == dataset_id).limit(1) + ) + if ( + document + and document.indexing_status == IndexingStatus.COMPLETED + and document.doc_form != IndexStructureType.QA_INDEX + and document.need_summary is True + ): + try: + generate_summary_index_task.delay(dataset.id, document.id, None) + logger.info( + "Queued summary index generation task for document %s in dataset %s " + "after update indexing completed", + document.id, + dataset.id, + ) + except Exception: + logger.exception( + "Failed to queue summary index generation task for document %s after update", + document.id, + ) diff --git a/api/tests/unit_tests/tasks/test_document_indexing_update_task.py b/api/tests/unit_tests/tasks/test_document_indexing_update_task.py new file mode 100644 index 0000000000..b73275b97d --- /dev/null +++ b/api/tests/unit_tests/tasks/test_document_indexing_update_task.py @@ -0,0 +1,524 @@ +""" +Unit tests for document_indexing_update_task summary generation. + +After updating a document via the API, the summary index should be +regenerated under the same conditions as during initial creation: +- indexing_technique is HIGH_QUALITY +- summary_index_setting has enable=True +- document.indexing_status is COMPLETED +- document.doc_form is not QA_INDEX +- document.need_summary is True +""" + +from contextlib import nullcontext +from types import SimpleNamespace +from unittest.mock import MagicMock + +import pytest + +from core.indexing_runner import DocumentIsPausedError +from tasks.document_indexing_update_task import document_indexing_update_task + + +class _SessionContext: + """Minimal context manager that yields a mock session.""" + + def __init__(self, session: MagicMock) -> None: + self._session = session + + def __enter__(self) -> MagicMock: + return self._session + + def __exit__(self, exc_type, exc, tb) -> None: # type: ignore[override] + return None + + +def _make_dataset_and_documents( + *, + dataset_id: str = "ds-1", + document_id: str = "doc-1", + indexing_technique: str = "high_quality", + summary_index_setting: dict | None = None, + doc_form: str = "text_model", + need_summary: bool = True, +): + """Create mock dataset and document objects. + + Returns (dataset, doc_for_session1, doc_for_session3). + + session1 doc: before IndexingRunner runs (status irrelevant for summary). + session3 doc: re-queried after IndexingRunner completes — normally COMPLETED. + """ + dataset = SimpleNamespace( + id=dataset_id, + indexing_technique=indexing_technique, + summary_index_setting=summary_index_setting, + ) + doc_s1 = SimpleNamespace( + id=document_id, + dataset_id=dataset_id, + indexing_status="waiting", + doc_form=doc_form, + need_summary=need_summary, + ) + # After IndexingRunner.run the document status is COMPLETED in the DB + doc_s3 = SimpleNamespace( + id=document_id, + dataset_id=dataset_id, + indexing_status="completed", + doc_form=doc_form, + need_summary=need_summary, + ) + return dataset, doc_s1, doc_s3 + + +def _patch_all(monkeypatch: pytest.MonkeyPatch, *, sessions, runner, processor): + """Wire up all mocks for document_indexing_update_task.""" + monkeypatch.setattr( + "tasks.document_indexing_update_task.session_factory.create_session", + MagicMock(side_effect=sessions), + ) + monkeypatch.setattr( + "tasks.document_indexing_update_task.IndexProcessorFactory", + MagicMock(return_value=MagicMock(init_index_processor=MagicMock(return_value=processor))), + ) + monkeypatch.setattr( + "tasks.document_indexing_update_task.IndexingRunner", + MagicMock(return_value=runner), + ) + + +def _session_with_begin(): + """Create a mock session with a begin() context manager.""" + s = MagicMock() + s.begin.return_value = nullcontext() + return s + + +class TestUpdateTaskSummaryGeneration: + """Tests for summary index generation in the document update task. + + The update task creates sessions in this order: + 1. session1: fetch document + dataset + segments (uses begin()) + 2. session2: delete segments — only if segments exist (uses begin()) + 3. session3: summary check — only if indexing succeeded (no begin()) + + With empty segments (default), only sessions 1 and 3 are created. + """ + + def test_should_queue_summary_when_conditions_met(self, monkeypatch: pytest.MonkeyPatch) -> None: + """Summary task is queued when all conditions are met.""" + dataset, doc_s1, doc_s3 = _make_dataset_and_documents( + summary_index_setting={"enable": True}, + ) + + session1 = _session_with_begin() + session1.scalar.side_effect = [doc_s1, dataset] + session1.scalars.return_value = MagicMock(all=MagicMock(return_value=[])) + + session3 = MagicMock() + session3.scalar.side_effect = [dataset, doc_s3] + + runner = MagicMock() + processor = MagicMock() + + _patch_all( + monkeypatch, + sessions=[_SessionContext(session1), _SessionContext(session3)], + runner=runner, + processor=processor, + ) + + delay_mock = MagicMock() + monkeypatch.setattr( + "tasks.document_indexing_update_task.generate_summary_index_task.delay", + delay_mock, + ) + + document_indexing_update_task("ds-1", "doc-1") + + delay_mock.assert_called_once_with("ds-1", "doc-1", None) + + def test_should_not_queue_when_not_high_quality(self, monkeypatch: pytest.MonkeyPatch) -> None: + """Summary is skipped when indexing_technique is not high_quality.""" + dataset, doc_s1, _ = _make_dataset_and_documents( + indexing_technique="economy", + summary_index_setting={"enable": True}, + ) + + session1 = _session_with_begin() + session1.scalar.side_effect = [doc_s1, dataset] + session1.scalars.return_value = MagicMock(all=MagicMock(return_value=[])) + + session3 = MagicMock() + session3.scalar.return_value = dataset # dataset.indexing_technique == "economy" + + runner = MagicMock() + processor = MagicMock() + + _patch_all( + monkeypatch, + sessions=[_SessionContext(session1), _SessionContext(session3)], + runner=runner, + processor=processor, + ) + + delay_mock = MagicMock() + monkeypatch.setattr( + "tasks.document_indexing_update_task.generate_summary_index_task.delay", + delay_mock, + ) + + document_indexing_update_task("ds-1", "doc-1") + + delay_mock.assert_not_called() + + def test_should_not_queue_when_summary_setting_disabled(self, monkeypatch: pytest.MonkeyPatch) -> None: + """Summary is skipped when summary_index_setting has enable=False.""" + dataset, doc_s1, _ = _make_dataset_and_documents( + summary_index_setting={"enable": False}, + ) + + session1 = _session_with_begin() + session1.scalar.side_effect = [doc_s1, dataset] + session1.scalars.return_value = MagicMock(all=MagicMock(return_value=[])) + + session3 = MagicMock() + session3.scalar.return_value = dataset + + runner = MagicMock() + processor = MagicMock() + + _patch_all( + monkeypatch, + sessions=[_SessionContext(session1), _SessionContext(session3)], + runner=runner, + processor=processor, + ) + + delay_mock = MagicMock() + monkeypatch.setattr( + "tasks.document_indexing_update_task.generate_summary_index_task.delay", + delay_mock, + ) + + document_indexing_update_task("ds-1", "doc-1") + + delay_mock.assert_not_called() + + def test_should_not_queue_when_summary_setting_none(self, monkeypatch: pytest.MonkeyPatch) -> None: + """Summary is skipped when summary_index_setting is None.""" + dataset, doc_s1, _ = _make_dataset_and_documents( + summary_index_setting=None, + ) + + session1 = _session_with_begin() + session1.scalar.side_effect = [doc_s1, dataset] + session1.scalars.return_value = MagicMock(all=MagicMock(return_value=[])) + + session3 = MagicMock() + session3.scalar.return_value = dataset + + runner = MagicMock() + processor = MagicMock() + + _patch_all( + monkeypatch, + sessions=[_SessionContext(session1), _SessionContext(session3)], + runner=runner, + processor=processor, + ) + + delay_mock = MagicMock() + monkeypatch.setattr( + "tasks.document_indexing_update_task.generate_summary_index_task.delay", + delay_mock, + ) + + document_indexing_update_task("ds-1", "doc-1") + + delay_mock.assert_not_called() + + def test_should_not_queue_when_need_summary_false(self, monkeypatch: pytest.MonkeyPatch) -> None: + """Summary is skipped when document.need_summary is False.""" + dataset, doc_s1, doc_s3 = _make_dataset_and_documents( + summary_index_setting={"enable": True}, + need_summary=False, + ) + + session1 = _session_with_begin() + session1.scalar.side_effect = [doc_s1, dataset] + session1.scalars.return_value = MagicMock(all=MagicMock(return_value=[])) + + session3 = MagicMock() + session3.scalar.side_effect = [dataset, doc_s3] + + runner = MagicMock() + processor = MagicMock() + + _patch_all( + monkeypatch, + sessions=[_SessionContext(session1), _SessionContext(session3)], + runner=runner, + processor=processor, + ) + + delay_mock = MagicMock() + monkeypatch.setattr( + "tasks.document_indexing_update_task.generate_summary_index_task.delay", + delay_mock, + ) + + document_indexing_update_task("ds-1", "doc-1") + + delay_mock.assert_not_called() + + def test_should_not_queue_when_qa_index_form(self, monkeypatch: pytest.MonkeyPatch) -> None: + """Summary is skipped when doc_form is QA_INDEX.""" + dataset, doc_s1, doc_s3 = _make_dataset_and_documents( + summary_index_setting={"enable": True}, + doc_form="qa_model", + ) + + session1 = _session_with_begin() + session1.scalar.side_effect = [doc_s1, dataset] + session1.scalars.return_value = MagicMock(all=MagicMock(return_value=[])) + + session3 = MagicMock() + session3.scalar.side_effect = [dataset, doc_s3] + + runner = MagicMock() + processor = MagicMock() + + _patch_all( + monkeypatch, + sessions=[_SessionContext(session1), _SessionContext(session3)], + runner=runner, + processor=processor, + ) + + delay_mock = MagicMock() + monkeypatch.setattr( + "tasks.document_indexing_update_task.generate_summary_index_task.delay", + delay_mock, + ) + + document_indexing_update_task("ds-1", "doc-1") + + delay_mock.assert_not_called() + + def test_should_not_queue_when_indexing_fails(self, monkeypatch: pytest.MonkeyPatch) -> None: + """Summary is skipped when IndexingRunner.run raises.""" + dataset, doc_s1, _ = _make_dataset_and_documents( + summary_index_setting={"enable": True}, + ) + + session1 = _session_with_begin() + session1.scalar.side_effect = [doc_s1, dataset] + session1.scalars.return_value = MagicMock(all=MagicMock(return_value=[])) + + runner = MagicMock() + runner.run.side_effect = Exception("indexing failed") + processor = MagicMock() + + # Only session1 needed — task returns early after indexing failure + _patch_all( + monkeypatch, + sessions=[_SessionContext(session1)], + runner=runner, + processor=processor, + ) + + delay_mock = MagicMock() + monkeypatch.setattr( + "tasks.document_indexing_update_task.generate_summary_index_task.delay", + delay_mock, + ) + + document_indexing_update_task("ds-1", "doc-1") + + delay_mock.assert_not_called() + + def test_should_not_queue_when_document_is_paused(self, monkeypatch: pytest.MonkeyPatch) -> None: + """Summary is skipped when IndexingRunner raises DocumentIsPausedError.""" + + dataset, doc_s1, _ = _make_dataset_and_documents( + summary_index_setting={"enable": True}, + ) + + session1 = _session_with_begin() + session1.scalar.side_effect = [doc_s1, dataset] + session1.scalars.return_value = MagicMock(all=MagicMock(return_value=[])) + + runner = MagicMock() + runner.run.side_effect = DocumentIsPausedError("doc-1 is paused") + processor = MagicMock() + + # Only session1 needed — task returns early after paused error + _patch_all( + monkeypatch, + sessions=[_SessionContext(session1)], + runner=runner, + processor=processor, + ) + + delay_mock = MagicMock() + monkeypatch.setattr( + "tasks.document_indexing_update_task.generate_summary_index_task.delay", + delay_mock, + ) + + document_indexing_update_task("ds-1", "doc-1") + + delay_mock.assert_not_called() + + def test_should_not_queue_when_dataset_not_found_after_indexing(self, monkeypatch: pytest.MonkeyPatch) -> None: + """Summary is skipped when the dataset disappears after indexing.""" + dataset, doc_s1, _ = _make_dataset_and_documents( + summary_index_setting={"enable": True}, + ) + + session1 = _session_with_begin() + session1.scalar.side_effect = [doc_s1, dataset] + session1.scalars.return_value = MagicMock(all=MagicMock(return_value=[])) + + # Session 3: dataset is None + session3 = MagicMock() + session3.scalar.return_value = None + + runner = MagicMock() + processor = MagicMock() + + _patch_all( + monkeypatch, + sessions=[_SessionContext(session1), _SessionContext(session3)], + runner=runner, + processor=processor, + ) + + delay_mock = MagicMock() + monkeypatch.setattr( + "tasks.document_indexing_update_task.generate_summary_index_task.delay", + delay_mock, + ) + + document_indexing_update_task("ds-1", "doc-1") + + delay_mock.assert_not_called() + + def test_should_not_queue_when_document_not_completed_after_indexing(self, monkeypatch: pytest.MonkeyPatch) -> None: + """Summary is skipped when document indexing_status is not COMPLETED after indexing.""" + dataset, doc_s1, _ = _make_dataset_and_documents( + summary_index_setting={"enable": True}, + ) + + session1 = _session_with_begin() + session1.scalar.side_effect = [doc_s1, dataset] + session1.scalars.return_value = MagicMock(all=MagicMock(return_value=[])) + + # Document still in error status after indexing + doc_s3_error = SimpleNamespace( + id="doc-1", + dataset_id="ds-1", + indexing_status="error", + doc_form="text_model", + need_summary=True, + ) + session3 = MagicMock() + session3.scalar.side_effect = [dataset, doc_s3_error] + + runner = MagicMock() + processor = MagicMock() + + _patch_all( + monkeypatch, + sessions=[_SessionContext(session1), _SessionContext(session3)], + runner=runner, + processor=processor, + ) + + delay_mock = MagicMock() + monkeypatch.setattr( + "tasks.document_indexing_update_task.generate_summary_index_task.delay", + delay_mock, + ) + + document_indexing_update_task("ds-1", "doc-1") + + delay_mock.assert_not_called() + + def test_should_swallow_summary_queue_error(self, monkeypatch: pytest.MonkeyPatch) -> None: + """Task should not raise when generate_summary_index_task.delay raises.""" + dataset, doc_s1, doc_s3 = _make_dataset_and_documents( + summary_index_setting={"enable": True}, + ) + + session1 = _session_with_begin() + session1.scalar.side_effect = [doc_s1, dataset] + session1.scalars.return_value = MagicMock(all=MagicMock(return_value=[])) + + session3 = MagicMock() + session3.scalar.side_effect = [dataset, doc_s3] + + runner = MagicMock() + processor = MagicMock() + + _patch_all( + monkeypatch, + sessions=[_SessionContext(session1), _SessionContext(session3)], + runner=runner, + processor=processor, + ) + + delay_mock = MagicMock(side_effect=Exception("queue full")) + monkeypatch.setattr( + "tasks.document_indexing_update_task.generate_summary_index_task.delay", + delay_mock, + ) + + # Should not raise + document_indexing_update_task("ds-1", "doc-1") + + delay_mock.assert_called_once_with("ds-1", "doc-1", None) + + def test_should_queue_summary_with_segments_and_session2(self, monkeypatch: pytest.MonkeyPatch) -> None: + """When segments exist, session2 is also created for deletion. + Verify summary generation still works correctly.""" + dataset, doc_s1, doc_s3 = _make_dataset_and_documents( + summary_index_setting={"enable": True}, + ) + + session1 = _session_with_begin() + session1.scalar.side_effect = [doc_s1, dataset] + seg = SimpleNamespace(index_node_id="node-1") + session1.scalars.return_value = MagicMock(all=MagicMock(return_value=[seg])) + + # Session 2: segment deletion + session2 = _session_with_begin() + + session3 = MagicMock() + session3.scalar.side_effect = [dataset, doc_s3] + + runner = MagicMock() + processor = MagicMock() + + _patch_all( + monkeypatch, + sessions=[ + _SessionContext(session1), + _SessionContext(session2), + _SessionContext(session3), + ], + runner=runner, + processor=processor, + ) + + delay_mock = MagicMock() + monkeypatch.setattr( + "tasks.document_indexing_update_task.generate_summary_index_task.delay", + delay_mock, + ) + + document_indexing_update_task("ds-1", "doc-1") + + delay_mock.assert_called_once_with("ds-1", "doc-1", None)