From 5df75d7ffa40f7fbd10f80e28c8b6a9909dd5937 Mon Sep 17 00:00:00 2001
From: FFXN
Date: Fri, 23 Jan 2026 22:33:42 +0800
Subject: [PATCH] fix: regenerate summary index correctly on embedding or
 summary model change

---
 api/tasks/regenerate_summary_index_task.py | 343 +++++++++++++--------
 1 file changed, 220 insertions(+), 123 deletions(-)

diff --git a/api/tasks/regenerate_summary_index_task.py b/api/tasks/regenerate_summary_index_task.py
index f24b7bf368..db6e6f854e 100644
--- a/api/tasks/regenerate_summary_index_task.py
+++ b/api/tasks/regenerate_summary_index_task.py
@@ -2,10 +2,11 @@
 import logging
 import time
+from collections import defaultdict
 
 import click
 from celery import shared_task
-from sqlalchemy import select
+from sqlalchemy import or_, select
 
 from extensions.ext_database import db
 from models.dataset import Dataset, DocumentSegment, DocumentSegmentSummary
 
@@ -62,158 +63,254 @@ def regenerate_summary_index_task(
             db.session.close()
             return
 
-        # Check if summary index is enabled
+        # Check if summary index is enabled (only for summary_model change)
+        # For embedding_model change, we still re-vectorize existing summaries even if setting is disabled
         summary_index_setting = dataset.summary_index_setting
-        if not summary_index_setting or not summary_index_setting.get("enable"):
-            logger.info(
-                click.style(
-                    f"Summary index is disabled for dataset {dataset_id}",
-                    fg="cyan",
+        if not regenerate_vectors_only:
+            # For summary_model change, require summary_index_setting to be enabled
+            if not summary_index_setting or not summary_index_setting.get("enable"):
+                logger.info(
+                    click.style(
+                        f"Summary index is disabled for dataset {dataset_id}",
+                        fg="cyan",
+                    )
                 )
-            )
-            db.session.close()
-            return
-
-        # Get all documents with completed indexing status
-        dataset_documents = db.session.scalars(
-            select(DatasetDocument).where(
-                DatasetDocument.dataset_id == dataset_id,
-                DatasetDocument.indexing_status == "completed",
-                DatasetDocument.enabled == True,
-                DatasetDocument.archived == False,
-            )
-        ).all()
-
-        if not dataset_documents:
-            logger.info(
-                click.style(
-                    f"No documents found for summary regeneration in dataset {dataset_id}",
-                    fg="cyan",
-                )
-            )
-            db.session.close()
-            return
-
-        logger.info(
-            "Found %s documents for summary regeneration in dataset %s",
-            len(dataset_documents),
-            dataset_id,
-        )
+                db.session.close()
+                return
 
         total_segments_processed = 0
         total_segments_failed = 0
 
-        for dataset_document in dataset_documents:
-            # Skip qa_model documents
-            if dataset_document.doc_form == "qa_model":
-                continue
-
-            try:
-                # Get all segments with existing summaries
-                segments = (
-                    db.session.query(DocumentSegment)
-                    .join(
-                        DocumentSegmentSummary,
-                        DocumentSegment.id == DocumentSegmentSummary.chunk_id,
-                    )
-                    .where(
-                        DocumentSegment.document_id == dataset_document.id,
-                        DocumentSegment.dataset_id == dataset_id,
-                        DocumentSegment.status == "completed",
-                        DocumentSegment.enabled == True,
-                        DocumentSegmentSummary.dataset_id == dataset_id,
-                    )
-                    .order_by(DocumentSegment.position.asc())
-                    .all()
-                )
+        if regenerate_vectors_only:
+            # For embedding_model change: directly query all segments with existing summaries
+            # Don't require document indexing_status == "completed"
+            # Include summaries with status "completed" or "error" (if they have content)
+            segments_with_summaries = (
+                db.session.query(DocumentSegment, DocumentSegmentSummary)
+                .join(
+                    DocumentSegmentSummary,
+                    DocumentSegment.id == DocumentSegmentSummary.chunk_id,
+                )
+                .join(
+                    DatasetDocument,
+                    DocumentSegment.document_id == DatasetDocument.id,
+                )
+                .where(
+                    DocumentSegment.dataset_id == dataset_id,
+                    DocumentSegment.status == "completed",  # Segment must be completed
+                    DocumentSegment.enabled == True,
+                    DocumentSegmentSummary.dataset_id == dataset_id,
+                    DocumentSegmentSummary.summary_content.isnot(None),  # Must have summary content
+                    # Include completed summaries or error summaries (with content)
+                    or_(
+                        DocumentSegmentSummary.status == "completed",
+                        DocumentSegmentSummary.status == "error",
+                    ),
+                    DatasetDocument.enabled == True,  # Document must be enabled
+                    DatasetDocument.archived == False,  # Document must not be archived
+                    DatasetDocument.doc_form != "qa_model",  # Skip qa_model documents
+                )
+                .order_by(DocumentSegment.document_id.asc(), DocumentSegment.position.asc())
+                .all()
+            )
 
-                if not segments:
-                    continue
-
+            if not segments_with_summaries:
                 logger.info(
-                    "Regenerating summaries for %s segments in document %s",
-                    len(segments),
-                    dataset_document.id,
+                    click.style(
+                        f"No segments with summaries found for re-vectorization in dataset {dataset_id}",
+                        fg="cyan",
+                    )
+                )
+                db.session.close()
+                return
+
+            logger.info(
+                "Found %s segments with summaries for re-vectorization in dataset %s",
+                len(segments_with_summaries),
+                dataset_id,
+            )
+
+            # Group by document for logging
+            segments_by_document = defaultdict(list)
+            for segment, summary_record in segments_with_summaries:
+                segments_by_document[segment.document_id].append((segment, summary_record))
+
+            logger.info(
+                "Segments grouped into %s documents for re-vectorization",
+                len(segments_by_document),
+            )
+
+            for document_id, segment_summary_pairs in segments_by_document.items():
+                logger.info(
+                    "Re-vectorizing summaries for %s segments in document %s",
+                    len(segment_summary_pairs),
+                    document_id,
                 )
 
-                for segment in segments:
+                for segment, summary_record in segment_summary_pairs:
                     try:
-                        # Get existing summary record
-                        summary_record = (
-                            db.session.query(DocumentSegmentSummary)
-                            .filter_by(
-                                chunk_id=segment.id,
-                                dataset_id=dataset_id,
-                            )
-                            .first()
-                        )
-
-                        if not summary_record:
-                            logger.warning("Summary record not found for segment %s, skipping", segment.id)
-                            continue
-
-                        if regenerate_vectors_only:
-                            # Only regenerate vectors (for embedding_model change)
-                            # Delete old vector
-                            if summary_record.summary_index_node_id:
-                                try:
-                                    from core.rag.datasource.vdb.vector_factory import Vector
-
-                                    vector = Vector(dataset)
-                                    vector.delete_by_ids([summary_record.summary_index_node_id])
-                                except Exception as e:
-                                    logger.warning(
-                                        "Failed to delete old summary vector for segment %s: %s",
-                                        segment.id,
-                                        str(e),
-                                    )
-
-                            # Re-vectorize with new embedding model
-                            SummaryIndexService.vectorize_summary(summary_record, segment, dataset)
-                            db.session.commit()
-                        else:
-                            # Regenerate both summary content and vectors (for summary_model change)
-                            SummaryIndexService.generate_and_vectorize_summary(segment, dataset, summary_index_setting)
-                            db.session.commit()
+                        # Delete old vector
+                        if summary_record.summary_index_node_id:
+                            try:
+                                from core.rag.datasource.vdb.vector_factory import Vector
+
+                                vector = Vector(dataset)
+                                vector.delete_by_ids([summary_record.summary_index_node_id])
+                            except Exception as e:
+                                logger.warning(
+                                    "Failed to delete old summary vector for segment %s: %s",
+                                    segment.id,
+                                    str(e),
+                                )
+                        # Re-vectorize with new embedding model
+                        SummaryIndexService.vectorize_summary(summary_record, segment, dataset)
+                        db.session.commit()
                         total_segments_processed += 1
                     except Exception as e:
                         logger.error(
-                            "Failed to regenerate summary for segment %s: %s",
+                            "Failed to re-vectorize summary for segment %s: %s",
                             segment.id,
                             str(e),
                             exc_info=True,
                         )
                         total_segments_failed += 1
                         # Update summary record with error status
-                        if summary_record:
-                            summary_record.status = "error"
-                            summary_record.error = f"Regeneration failed: {str(e)}"
-                            db.session.add(summary_record)
-                            db.session.commit()
+                        summary_record.status = "error"
+                        summary_record.error = f"Re-vectorization failed: {str(e)}"
+                        db.session.add(summary_record)
+                        db.session.commit()
                         continue
-            except Exception as e:
-                logger.error(
-                    "Failed to process document %s for summary regeneration: %s",
-                    dataset_document.id,
-                    str(e),
-                    exc_info=True,
-                )
-                continue
+        else:
+            # For summary_model change: require document indexing_status == "completed"
+            # Get all documents with completed indexing status
+            dataset_documents = db.session.scalars(
+                select(DatasetDocument).where(
+                    DatasetDocument.dataset_id == dataset_id,
+                    DatasetDocument.indexing_status == "completed",
+                    DatasetDocument.enabled == True,
+                    DatasetDocument.archived == False,
+                )
+            ).all()
+
+            if not dataset_documents:
+                logger.info(
+                    click.style(
+                        f"No documents found for summary regeneration in dataset {dataset_id}",
+                        fg="cyan",
+                    )
+                )
+                db.session.close()
+                return
+
+            logger.info(
+                "Found %s documents for summary regeneration in dataset %s",
+                len(dataset_documents),
+                dataset_id,
+            )
+
+            for dataset_document in dataset_documents:
+                # Skip qa_model documents
+                if dataset_document.doc_form == "qa_model":
+                    continue
+
+                try:
+                    # Get all segments with existing summaries
+                    segments = (
+                        db.session.query(DocumentSegment)
+                        .join(
+                            DocumentSegmentSummary,
+                            DocumentSegment.id == DocumentSegmentSummary.chunk_id,
+                        )
+                        .where(
+                            DocumentSegment.document_id == dataset_document.id,
+                            DocumentSegment.dataset_id == dataset_id,
+                            DocumentSegment.status == "completed",
+                            DocumentSegment.enabled == True,
+                            DocumentSegmentSummary.dataset_id == dataset_id,
+                        )
+                        .order_by(DocumentSegment.position.asc())
+                        .all()
+                    )
+
+                    if not segments:
+                        continue
+
+                    logger.info(
+                        "Regenerating summaries for %s segments in document %s",
+                        len(segments),
+                        dataset_document.id,
+                    )
+
+                    for segment in segments:
+                        try:
+                            # Get existing summary record
+                            summary_record = (
+                                db.session.query(DocumentSegmentSummary)
+                                .filter_by(
+                                    chunk_id=segment.id,
+                                    dataset_id=dataset_id,
+                                )
+                                .first()
+                            )
+
+                            if not summary_record:
+                                logger.warning("Summary record not found for segment %s, skipping", segment.id)
+                                continue
+
+                            # Regenerate both summary content and vectors (for summary_model change)
+                            SummaryIndexService.generate_and_vectorize_summary(segment, dataset, summary_index_setting)
+                            db.session.commit()
+                            total_segments_processed += 1
+
+                        except Exception as e:
+                            logger.error(
+                                "Failed to regenerate summary for segment %s: %s",
+                                segment.id,
+                                str(e),
+                                exc_info=True,
+                            )
+                            total_segments_failed += 1
+                            # Update summary record with error status
+                            if summary_record:
+                                summary_record.status = "error"
+                                summary_record.error = f"Regeneration failed: {str(e)}"
+                                db.session.add(summary_record)
+                                db.session.commit()
+                            continue
+
+                except Exception as e:
+                    logger.error(
+                        "Failed to process document %s for summary regeneration: %s",
+                        dataset_document.id,
+                        str(e),
+                        exc_info=True,
+                    )
+                    continue
 
         end_at = time.perf_counter()
-        logger.info(
-            click.style(
-                f"Summary index regeneration completed for dataset {dataset_id}: "
-                f"{total_segments_processed} segments processed successfully, "
-                f"{total_segments_failed} segments failed, "
-                f"total documents: {len(dataset_documents)}, "
-                f"latency: {end_at - start_at:.2f}s",
-                fg="green",
-            )
-        )
+        if regenerate_vectors_only:
+            logger.info(
+                click.style(
+                    f"Summary re-vectorization completed for dataset {dataset_id}: "
+                    f"{total_segments_processed} segments processed successfully, "
+                    f"{total_segments_failed} segments failed, "
+                    f"latency: {end_at - start_at:.2f}s",
+                    fg="green",
+                )
+            )
+        else:
+            logger.info(
+                click.style(
+                    f"Summary index regeneration completed for dataset {dataset_id}: "
+                    f"{total_segments_processed} segments processed successfully, "
+                    f"{total_segments_failed} segments failed, "
+                    f"latency: {end_at - start_at:.2f}s",
+                    fg="green",
+                )
+            )
     except Exception:
         logger.exception("Regenerate summary index failed for dataset %s", dataset_id)