mirror of
https://github.com/langgenius/dify.git
synced 2026-05-02 08:28:03 +08:00
fix: fix summary index bug.
This commit is contained in:
@ -378,25 +378,22 @@ class DatasetDocumentListApi(Resource):
|
||||
summary_status_map[doc_id] = None
|
||||
continue
|
||||
|
||||
# Count summary statuses for this document's segments
|
||||
status_counts = {"completed": 0, "generating": 0, "error": 0, "not_started": 0}
|
||||
for segment_id in segment_ids:
|
||||
status = summaries.get(segment_id, "not_started")
|
||||
if status in status_counts:
|
||||
status_counts[status] += 1
|
||||
else:
|
||||
status_counts["not_started"] += 1
|
||||
# Check if there are any "not_started" or "generating" status summaries
|
||||
# Only check enabled=True summaries (already filtered in query)
|
||||
# If segment has no summary record (summaries.get returns None),
|
||||
# it means the summary is disabled (enabled=False) or not created yet, ignore it
|
||||
has_pending_summaries = any(
|
||||
summaries.get(segment_id) is not None # Ensure summary exists (enabled=True)
|
||||
and summaries[segment_id] in ("not_started", "generating")
|
||||
for segment_id in segment_ids
|
||||
)
|
||||
|
||||
generating_count = status_counts["generating"]
|
||||
|
||||
# Determine overall status:
|
||||
# - "SUMMARIZING" only when task is queued and at least one summary is generating
|
||||
# - None (empty) for all other cases (not queued, all completed/error)
|
||||
if generating_count > 0:
|
||||
# Task is queued and at least one summary is still generating
|
||||
if has_pending_summaries:
|
||||
# Task is still running (not started or generating)
|
||||
summary_status_map[doc_id] = "SUMMARIZING"
|
||||
else:
|
||||
# Task not queued yet, or all summaries are completed/error (task finished)
|
||||
# All enabled=True summaries are "completed" or "error", task finished
|
||||
# Or no enabled=True summaries exist (all disabled)
|
||||
summary_status_map[doc_id] = None
|
||||
|
||||
# Add summary_index_status to each document
|
||||
|
||||
@ -3362,15 +3362,71 @@ class SegmentService:
|
||||
elif document.doc_form in (IndexStructureType.PARAGRAPH_INDEX, IndexStructureType.QA_INDEX):
|
||||
# update segment vector index
|
||||
VectorService.update_segment_vector(args.keywords, segment, dataset)
|
||||
# update summary index if summary is provided
|
||||
if args.summary is not None:
|
||||
from services.summary_index_service import SummaryIndexService
|
||||
# Handle summary index when content changed
|
||||
has_summary_index = (
|
||||
dataset.indexing_technique == "high_quality"
|
||||
and dataset.summary_index_setting
|
||||
and dataset.summary_index_setting.get("enable") is True
|
||||
)
|
||||
if has_summary_index:
|
||||
from models.dataset import DocumentSegmentSummary
|
||||
|
||||
try:
|
||||
SummaryIndexService.update_summary_for_segment(segment, dataset, args.summary)
|
||||
except Exception:
|
||||
logger.exception("Failed to update summary for segment %s", segment.id)
|
||||
# Don't fail the entire update if summary update fails
|
||||
existing_summary = (
|
||||
db.session.query(DocumentSegmentSummary)
|
||||
.where(
|
||||
DocumentSegmentSummary.chunk_id == segment.id,
|
||||
DocumentSegmentSummary.dataset_id == dataset.id,
|
||||
)
|
||||
.first()
|
||||
)
|
||||
|
||||
if args.summary is None:
|
||||
# User didn't provide summary, auto-regenerate if segment previously had summary
|
||||
if existing_summary:
|
||||
# Segment previously had summary, regenerate it with new content
|
||||
from services.summary_index_service import SummaryIndexService
|
||||
|
||||
try:
|
||||
SummaryIndexService.generate_and_vectorize_summary(
|
||||
segment, dataset, dataset.summary_index_setting
|
||||
)
|
||||
logger.info(
|
||||
"Auto-regenerated summary for segment %s after content change", segment.id
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("Failed to auto-regenerate summary for segment %s", segment.id)
|
||||
# Don't fail the entire update if summary regeneration fails
|
||||
else:
|
||||
# User provided summary, check if it has changed
|
||||
existing_summary_content = existing_summary.summary_content if existing_summary else None
|
||||
if existing_summary_content != args.summary:
|
||||
# Summary has changed, use user-provided summary
|
||||
from services.summary_index_service import SummaryIndexService
|
||||
|
||||
try:
|
||||
SummaryIndexService.update_summary_for_segment(segment, dataset, args.summary)
|
||||
logger.info(
|
||||
"Updated summary for segment %s with user-provided content", segment.id
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("Failed to update summary for segment %s", segment.id)
|
||||
# Don't fail the entire update if summary update fails
|
||||
else:
|
||||
# Summary hasn't changed, regenerate based on new content
|
||||
if existing_summary:
|
||||
from services.summary_index_service import SummaryIndexService
|
||||
|
||||
try:
|
||||
SummaryIndexService.generate_and_vectorize_summary(
|
||||
segment, dataset, dataset.summary_index_setting
|
||||
)
|
||||
logger.info(
|
||||
"Regenerated summary for segment %s after content change (summary unchanged)",
|
||||
segment.id,
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("Failed to regenerate summary for segment %s", segment.id)
|
||||
# Don't fail the entire update if summary regeneration fails
|
||||
# update multimodel vector index
|
||||
VectorService.update_multimodel_vector(segment, args.attachment_ids or [], dataset)
|
||||
except Exception as e:
|
||||
|
||||
@ -227,6 +227,90 @@ class SummaryIndexService:
|
||||
db.session.flush()
|
||||
raise
|
||||
|
||||
@staticmethod
def batch_create_summary_records(
    segments: list[DocumentSegment],
    dataset: Dataset,
    status: str = "not_started",
) -> None:
    """
    Batch create summary records for segments with specified status.

    If a record already exists for a segment, it is reused: its status is set
    to ``status``, its previous error is cleared and, if it was disabled, it is
    re-enabled. Records are only added to the session here; nothing is flushed
    or committed by this method.

    Args:
        segments: List of DocumentSegment instances
        dataset: Dataset containing the segments
        status: Initial status for the records (default: "not_started")
    """
    segment_ids = [segment.id for segment in segments]
    if not segment_ids:
        return

    # Fetch all existing summary records for these segments in a single query
    existing_summaries = (
        db.session.query(DocumentSegmentSummary)
        .filter(
            DocumentSegmentSummary.chunk_id.in_(segment_ids),
            DocumentSegmentSummary.dataset_id == dataset.id,
        )
        .all()
    )
    summary_by_chunk_id = {summary.chunk_id: summary for summary in existing_summaries}

    # Create or update records
    for segment in segments:
        summary_record = summary_by_chunk_id.get(segment.id)
        if summary_record is None:
            # Create new record
            summary_record = DocumentSegmentSummary(
                dataset_id=dataset.id,
                document_id=segment.document_id,
                chunk_id=segment.id,
                summary_content=None,  # Will be filled later
                status=status,
                enabled=True,
            )
            # Register the new record so that a segment listed more than once
            # in `segments` reuses it instead of getting a second record for
            # the same chunk.
            summary_by_chunk_id[segment.id] = summary_record
        else:
            # Update existing record
            summary_record.status = status
            summary_record.error = None  # Clear any previous errors
            if not summary_record.enabled:
                summary_record.enabled = True
                summary_record.disabled_at = None
                summary_record.disabled_by = None
        db.session.add(summary_record)
|
||||
|
||||
@staticmethod
def update_summary_record_error(
    segment: DocumentSegment,
    dataset: Dataset,
    error: str,
) -> None:
    """
    Mark the summary record of a segment as failed.

    Looks up the summary record by segment id and dataset id. When one is
    found, its status becomes "error", the message is stored on it and the
    session is flushed. When none is found, a warning is logged and nothing
    is changed.

    Args:
        segment: DocumentSegment whose summary record should be updated
        dataset: Dataset containing the segment
        error: Error message to store on the record
    """
    lookup = db.session.query(DocumentSegmentSummary).filter_by(
        chunk_id=segment.id,
        dataset_id=dataset.id,
    )
    failed_record = lookup.first()

    # Nothing to mark as failed: report it and leave the session untouched
    if not failed_record:
        logger.warning("Summary record not found for segment %s when updating error", segment.id)
        return

    failed_record.status = "error"
    failed_record.error = error
    db.session.add(failed_record)
    db.session.flush()
|
||||
|
||||
@staticmethod
|
||||
def generate_and_vectorize_summary(
|
||||
segment: DocumentSegment,
|
||||
@ -235,6 +319,7 @@ class SummaryIndexService:
|
||||
) -> DocumentSegmentSummary:
|
||||
"""
|
||||
Generate summary for a segment and vectorize it.
|
||||
Assumes summary record already exists (created by batch_create_summary_records).
|
||||
|
||||
Args:
|
||||
segment: DocumentSegment to generate summary for
|
||||
@ -247,33 +332,52 @@ class SummaryIndexService:
|
||||
Raises:
|
||||
ValueError: If summary generation fails
|
||||
"""
|
||||
try:
|
||||
# Generate summary
|
||||
summary_content = SummaryIndexService.generate_summary_for_segment(segment, dataset, summary_index_setting)
|
||||
# Get existing summary record (should have been created by batch_create_summary_records)
|
||||
summary_record = (
|
||||
db.session.query(DocumentSegmentSummary)
|
||||
.filter_by(chunk_id=segment.id, dataset_id=dataset.id)
|
||||
.first()
|
||||
)
|
||||
|
||||
# Create or update summary record (will handle overwrite internally)
|
||||
summary_record = SummaryIndexService.create_summary_record(
|
||||
segment, dataset, summary_content, status="generating"
|
||||
if not summary_record:
|
||||
# If not found (shouldn't happen), create one
|
||||
logger.warning(
|
||||
"Summary record not found for segment %s, creating one", segment.id
|
||||
)
|
||||
summary_record = SummaryIndexService.create_summary_record(
|
||||
segment, dataset, summary_content="", status="generating"
|
||||
)
|
||||
|
||||
try:
|
||||
# Update status to "generating"
|
||||
summary_record.status = "generating"
|
||||
summary_record.error = None
|
||||
db.session.add(summary_record)
|
||||
db.session.flush()
|
||||
|
||||
# Generate summary
|
||||
summary_content = SummaryIndexService.generate_summary_for_segment(
|
||||
segment, dataset, summary_index_setting
|
||||
)
|
||||
|
||||
# Update summary content
|
||||
summary_record.summary_content = summary_content
|
||||
|
||||
# Vectorize summary (will delete old vector if exists before creating new one)
|
||||
SummaryIndexService.vectorize_summary(summary_record, segment, dataset)
|
||||
|
||||
# Status will be updated to "completed" by vectorize_summary on success
|
||||
db.session.commit()
|
||||
logger.info("Successfully generated and vectorized summary for segment %s", segment.id)
|
||||
return summary_record
|
||||
|
||||
except Exception:
|
||||
except Exception as e:
|
||||
logger.exception("Failed to generate summary for segment %s", segment.id)
|
||||
# Update summary record with error status if it exists
|
||||
summary_record = (
|
||||
db.session.query(DocumentSegmentSummary).filter_by(chunk_id=segment.id, dataset_id=dataset.id).first()
|
||||
)
|
||||
if summary_record:
|
||||
summary_record.status = "error"
|
||||
summary_record.error = str(e)
|
||||
db.session.add(summary_record)
|
||||
db.session.commit()
|
||||
# Update summary record with error status
|
||||
summary_record.status = "error"
|
||||
summary_record.error = str(e)
|
||||
db.session.add(summary_record)
|
||||
db.session.commit()
|
||||
raise
|
||||
|
||||
@staticmethod
|
||||
@ -340,6 +444,15 @@ class SummaryIndexService:
|
||||
logger.info("No segments found for document %s", document.id)
|
||||
return []
|
||||
|
||||
# Batch create summary records with "not_started" status before processing
|
||||
# This ensures all records exist upfront, allowing status tracking
|
||||
SummaryIndexService.batch_create_summary_records(
|
||||
segments=segments,
|
||||
dataset=dataset,
|
||||
status="not_started",
|
||||
)
|
||||
db.session.commit() # Commit initial records
|
||||
|
||||
summary_records = []
|
||||
|
||||
for segment in segments:
|
||||
@ -359,11 +472,19 @@ class SummaryIndexService:
|
||||
segment, dataset, summary_index_setting
|
||||
)
|
||||
summary_records.append(summary_record)
|
||||
except Exception:
|
||||
except Exception as e:
|
||||
logger.exception("Failed to generate summary for segment %s", segment.id)
|
||||
# Update summary record with error status
|
||||
SummaryIndexService.update_summary_record_error(
|
||||
segment=segment,
|
||||
dataset=dataset,
|
||||
error=str(e),
|
||||
)
|
||||
# Continue with other segments
|
||||
continue
|
||||
|
||||
db.session.commit() # Commit any remaining changes
|
||||
|
||||
logger.info(
|
||||
"Completed summary generation for document %s: %s summaries generated and vectorized",
|
||||
document.id,
|
||||
|
||||
Reference in New Issue
Block a user