fix: regenerate document summary after update via API (#35950) (#36035)

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
EvanYao
2026-05-15 15:26:29 +08:00
committed by GitHub
parent b04a3851cc
commit cdcfd2ef2c
2 changed files with 567 additions and 0 deletions

View File

@ -7,10 +7,12 @@ from sqlalchemy import delete, select
from core.db.session_factory import session_factory
from core.indexing_runner import DocumentIsPausedError, IndexingRunner
from core.rag.index_processor.constant.index_type import IndexStructureType, IndexTechniqueType
from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
from libs.datetime_utils import naive_utc_now
from models.dataset import Dataset, Document, DocumentSegment
from models.enums import IndexingStatus
from tasks.generate_summary_index_task import generate_summary_index_task
logger = logging.getLogger(__name__)
@ -70,6 +72,7 @@ def document_indexing_update_task(dataset_id: str, document_id: str):
segment_delete_stmt = delete(DocumentSegment).where(DocumentSegment.document_id == document_id)
session.execute(segment_delete_stmt)
has_error = False
try:
indexing_runner = IndexingRunner()
indexing_runner.run([document])
@ -77,5 +80,45 @@ def document_indexing_update_task(dataset_id: str, document_id: str):
logger.info(click.style(f"update document: {document.id} latency: {end_at - start_at}", fg="green"))
except DocumentIsPausedError as ex:
logger.info(click.style(str(ex), fg="yellow"))
has_error = True
except Exception:
logger.exception("document_indexing_update_task failed, document_id: %s", document_id)
has_error = True
if has_error:
return
# Trigger summary index generation for the updated document if enabled.
# Only generate for high_quality indexing technique and when summary_index_setting is enabled.
with session_factory.create_session() as session:
dataset = session.scalar(select(Dataset).where(Dataset.id == dataset_id).limit(1))
if not dataset:
logger.warning("Dataset %s not found after update indexing", dataset_id)
return
if dataset.indexing_technique == IndexTechniqueType.HIGH_QUALITY:
summary_index_setting = dataset.summary_index_setting
if summary_index_setting and summary_index_setting.get("enable"):
session.expire_all()
document = session.scalar(
select(Document).where(Document.id == document_id, Document.dataset_id == dataset_id).limit(1)
)
if (
document
and document.indexing_status == IndexingStatus.COMPLETED
and document.doc_form != IndexStructureType.QA_INDEX
and document.need_summary is True
):
try:
generate_summary_index_task.delay(dataset.id, document.id, None)
logger.info(
"Queued summary index generation task for document %s in dataset %s "
"after update indexing completed",
document.id,
dataset.id,
)
except Exception:
logger.exception(
"Failed to queue summary index generation task for document %s after update",
document.id,
)

View File

@ -0,0 +1,524 @@
"""
Unit tests for document_indexing_update_task summary generation.
After updating a document via the API, the summary index should be
regenerated under the same conditions as during initial creation:
- indexing_technique is HIGH_QUALITY
- summary_index_setting has enable=True
- document.indexing_status is COMPLETED
- document.doc_form is not QA_INDEX
- document.need_summary is True
"""
from contextlib import nullcontext
from types import SimpleNamespace
from unittest.mock import MagicMock
import pytest
from core.indexing_runner import DocumentIsPausedError
from tasks.document_indexing_update_task import document_indexing_update_task
class _SessionContext:
"""Minimal context manager that yields a mock session."""
def __init__(self, session: MagicMock) -> None:
self._session = session
def __enter__(self) -> MagicMock:
return self._session
def __exit__(self, exc_type, exc, tb) -> None: # type: ignore[override]
return None
def _make_dataset_and_documents(
*,
dataset_id: str = "ds-1",
document_id: str = "doc-1",
indexing_technique: str = "high_quality",
summary_index_setting: dict | None = None,
doc_form: str = "text_model",
need_summary: bool = True,
):
"""Create mock dataset and document objects.
Returns (dataset, doc_for_session1, doc_for_session3).
session1 doc: before IndexingRunner runs (status irrelevant for summary).
session3 doc: re-queried after IndexingRunner completes — normally COMPLETED.
"""
dataset = SimpleNamespace(
id=dataset_id,
indexing_technique=indexing_technique,
summary_index_setting=summary_index_setting,
)
doc_s1 = SimpleNamespace(
id=document_id,
dataset_id=dataset_id,
indexing_status="waiting",
doc_form=doc_form,
need_summary=need_summary,
)
# After IndexingRunner.run the document status is COMPLETED in the DB
doc_s3 = SimpleNamespace(
id=document_id,
dataset_id=dataset_id,
indexing_status="completed",
doc_form=doc_form,
need_summary=need_summary,
)
return dataset, doc_s1, doc_s3
def _patch_all(monkeypatch: pytest.MonkeyPatch, *, sessions, runner, processor):
    """Swap the task module's collaborators for mocks.

    - ``sessions`` is installed as the ``side_effect`` of ``create_session``,
      so each call to it yields the next item, in order.
    - ``runner`` is what calling ``IndexingRunner`` returns.
    - ``processor`` is what ``init_index_processor()`` returns on the object
      produced by calling ``IndexProcessorFactory``.
    """
    factory_instance = MagicMock(init_index_processor=MagicMock(return_value=processor))
    stand_ins = (
        (
            "tasks.document_indexing_update_task.session_factory.create_session",
            MagicMock(side_effect=sessions),
        ),
        (
            "tasks.document_indexing_update_task.IndexProcessorFactory",
            MagicMock(return_value=factory_instance),
        ),
        (
            "tasks.document_indexing_update_task.IndexingRunner",
            MagicMock(return_value=runner),
        ),
    )
    # Applied in the same order as listed; monkeypatch undoes them at teardown.
    for target, replacement in stand_ins:
        monkeypatch.setattr(target, replacement)
def _session_with_begin():
"""Create a mock session with a begin() context manager."""
s = MagicMock()
s.begin.return_value = nullcontext()
return s
class TestUpdateTaskSummaryGeneration:
    """Tests for summary index generation in the document update task.

    The update task creates sessions in this order:

    1. session1: fetch document + dataset + segments (uses begin())
    2. session2: delete segments — only if segments exist (uses begin())
    3. session3: summary check — only if indexing succeeded (no begin())

    With empty segments (default), only sessions 1 and 3 are created.

    Every test shares the same wiring, which lives in ``_run_task``; each test
    only states what differs (dataset/document fields, what session3 returns,
    and which collaborator fails) and then asserts on the returned ``delay``
    mock.
    """

    @staticmethod
    def _run_task(
        monkeypatch: pytest.MonkeyPatch,
        *,
        dataset,
        document,
        summary_session: MagicMock | None = None,
        segments: list | None = None,
        indexing_error: Exception | None = None,
        queue_error: Exception | None = None,
    ) -> MagicMock:
        """Wire all mocks, run the task for ``ds-1``/``doc-1``, return the ``delay`` mock.

        Args:
            monkeypatch: pytest fixture used to install the mocks.
            dataset: object session1 returns as the dataset.
            document: object session1 returns as the document.
            summary_session: pre-configured mock for session3, or ``None`` when
                the test expects the task to return before opening it.
            segments: existing segments reported by session1; when non-empty a
                session2 mock is supplied as well for the deletion step.
            indexing_error: exception raised by ``runner.run``, if any.
            queue_error: exception raised by ``generate_summary_index_task.delay``,
                if any.
        """
        session1 = _session_with_begin()
        # session1 answers two scalar() calls: the document, then the dataset.
        session1.scalar.side_effect = [document, dataset]
        session1.scalars.return_value = MagicMock(all=MagicMock(return_value=segments or []))

        # Supply exactly as many sessions as the scenario is expected to open.
        sessions = [_SessionContext(session1)]
        if segments:
            sessions.append(_SessionContext(_session_with_begin()))
        if summary_session is not None:
            sessions.append(_SessionContext(summary_session))

        runner = MagicMock()
        # side_effect=None is the mock default, i.e. run() succeeds.
        runner.run.side_effect = indexing_error
        _patch_all(monkeypatch, sessions=sessions, runner=runner, processor=MagicMock())

        delay_mock = MagicMock(side_effect=queue_error)
        monkeypatch.setattr(
            "tasks.document_indexing_update_task.generate_summary_index_task.delay",
            delay_mock,
        )

        document_indexing_update_task("ds-1", "doc-1")
        return delay_mock

    def test_should_queue_summary_when_conditions_met(self, monkeypatch: pytest.MonkeyPatch) -> None:
        """Summary task is queued when all conditions are met."""
        dataset, doc_s1, doc_s3 = _make_dataset_and_documents(
            summary_index_setting={"enable": True},
        )
        session3 = MagicMock()
        session3.scalar.side_effect = [dataset, doc_s3]

        delay_mock = self._run_task(monkeypatch, dataset=dataset, document=doc_s1, summary_session=session3)

        delay_mock.assert_called_once_with("ds-1", "doc-1", None)

    def test_should_not_queue_when_not_high_quality(self, monkeypatch: pytest.MonkeyPatch) -> None:
        """Summary is skipped when indexing_technique is not high_quality."""
        dataset, doc_s1, _ = _make_dataset_and_documents(
            indexing_technique="economy",
            summary_index_setting={"enable": True},
        )
        session3 = MagicMock()
        session3.scalar.return_value = dataset  # dataset.indexing_technique == "economy"

        delay_mock = self._run_task(monkeypatch, dataset=dataset, document=doc_s1, summary_session=session3)

        delay_mock.assert_not_called()

    def test_should_not_queue_when_summary_setting_disabled(self, monkeypatch: pytest.MonkeyPatch) -> None:
        """Summary is skipped when summary_index_setting has enable=False."""
        dataset, doc_s1, _ = _make_dataset_and_documents(
            summary_index_setting={"enable": False},
        )
        session3 = MagicMock()
        session3.scalar.return_value = dataset

        delay_mock = self._run_task(monkeypatch, dataset=dataset, document=doc_s1, summary_session=session3)

        delay_mock.assert_not_called()

    def test_should_not_queue_when_summary_setting_none(self, monkeypatch: pytest.MonkeyPatch) -> None:
        """Summary is skipped when summary_index_setting is None."""
        dataset, doc_s1, _ = _make_dataset_and_documents(
            summary_index_setting=None,
        )
        session3 = MagicMock()
        session3.scalar.return_value = dataset

        delay_mock = self._run_task(monkeypatch, dataset=dataset, document=doc_s1, summary_session=session3)

        delay_mock.assert_not_called()

    def test_should_not_queue_when_need_summary_false(self, monkeypatch: pytest.MonkeyPatch) -> None:
        """Summary is skipped when document.need_summary is False."""
        dataset, doc_s1, doc_s3 = _make_dataset_and_documents(
            summary_index_setting={"enable": True},
            need_summary=False,
        )
        session3 = MagicMock()
        session3.scalar.side_effect = [dataset, doc_s3]

        delay_mock = self._run_task(monkeypatch, dataset=dataset, document=doc_s1, summary_session=session3)

        delay_mock.assert_not_called()

    def test_should_not_queue_when_qa_index_form(self, monkeypatch: pytest.MonkeyPatch) -> None:
        """Summary is skipped when doc_form is QA_INDEX."""
        dataset, doc_s1, doc_s3 = _make_dataset_and_documents(
            summary_index_setting={"enable": True},
            doc_form="qa_model",
        )
        session3 = MagicMock()
        session3.scalar.side_effect = [dataset, doc_s3]

        delay_mock = self._run_task(monkeypatch, dataset=dataset, document=doc_s1, summary_session=session3)

        delay_mock.assert_not_called()

    def test_should_not_queue_when_indexing_fails(self, monkeypatch: pytest.MonkeyPatch) -> None:
        """Summary is skipped when IndexingRunner.run raises."""
        dataset, doc_s1, _ = _make_dataset_and_documents(
            summary_index_setting={"enable": True},
        )

        # Only session1 needed — task returns early after indexing failure
        delay_mock = self._run_task(
            monkeypatch,
            dataset=dataset,
            document=doc_s1,
            indexing_error=Exception("indexing failed"),
        )

        delay_mock.assert_not_called()

    def test_should_not_queue_when_document_is_paused(self, monkeypatch: pytest.MonkeyPatch) -> None:
        """Summary is skipped when IndexingRunner raises DocumentIsPausedError."""
        dataset, doc_s1, _ = _make_dataset_and_documents(
            summary_index_setting={"enable": True},
        )

        # Only session1 needed — task returns early after paused error
        delay_mock = self._run_task(
            monkeypatch,
            dataset=dataset,
            document=doc_s1,
            indexing_error=DocumentIsPausedError("doc-1 is paused"),
        )

        delay_mock.assert_not_called()

    def test_should_not_queue_when_dataset_not_found_after_indexing(self, monkeypatch: pytest.MonkeyPatch) -> None:
        """Summary is skipped when the dataset disappears after indexing."""
        dataset, doc_s1, _ = _make_dataset_and_documents(
            summary_index_setting={"enable": True},
        )
        # Session 3: dataset is None
        session3 = MagicMock()
        session3.scalar.return_value = None

        delay_mock = self._run_task(monkeypatch, dataset=dataset, document=doc_s1, summary_session=session3)

        delay_mock.assert_not_called()

    def test_should_not_queue_when_document_not_completed_after_indexing(self, monkeypatch: pytest.MonkeyPatch) -> None:
        """Summary is skipped when document indexing_status is not COMPLETED after indexing."""
        dataset, doc_s1, _ = _make_dataset_and_documents(
            summary_index_setting={"enable": True},
        )
        # Document still in error status after indexing
        doc_s3_error = SimpleNamespace(
            id="doc-1",
            dataset_id="ds-1",
            indexing_status="error",
            doc_form="text_model",
            need_summary=True,
        )
        session3 = MagicMock()
        session3.scalar.side_effect = [dataset, doc_s3_error]

        delay_mock = self._run_task(monkeypatch, dataset=dataset, document=doc_s1, summary_session=session3)

        delay_mock.assert_not_called()

    def test_should_swallow_summary_queue_error(self, monkeypatch: pytest.MonkeyPatch) -> None:
        """Task should not raise when generate_summary_index_task.delay raises."""
        dataset, doc_s1, doc_s3 = _make_dataset_and_documents(
            summary_index_setting={"enable": True},
        )
        session3 = MagicMock()
        session3.scalar.side_effect = [dataset, doc_s3]

        # Should not raise
        delay_mock = self._run_task(
            monkeypatch,
            dataset=dataset,
            document=doc_s1,
            summary_session=session3,
            queue_error=Exception("queue full"),
        )

        delay_mock.assert_called_once_with("ds-1", "doc-1", None)

    def test_should_queue_summary_with_segments_and_session2(self, monkeypatch: pytest.MonkeyPatch) -> None:
        """When segments exist, session2 is also created for deletion.

        Verify summary generation still works correctly.
        """
        dataset, doc_s1, doc_s3 = _make_dataset_and_documents(
            summary_index_setting={"enable": True},
        )
        seg = SimpleNamespace(index_node_id="node-1")
        session3 = MagicMock()
        session3.scalar.side_effect = [dataset, doc_s3]

        # A non-empty segment list makes the helper supply session2 for deletion.
        delay_mock = self._run_task(
            monkeypatch,
            dataset=dataset,
            document=doc_s1,
            summary_session=session3,
            segments=[seg],
        )

        delay_mock.assert_called_once_with("ds-1", "doc-1", None)