feat: summary for .dsl and .pipeline

This commit is contained in:
FFXN
2026-01-28 10:14:29 +08:00
parent 75f288ff02
commit e665375549
4 changed files with 141 additions and 4 deletions

View File

@ -38,6 +38,7 @@ class DatasetCreatePayload(BaseModel):
retrieval_model: RetrievalModel | None = None
embedding_model: str | None = None
embedding_model_provider: str | None = None
summary_index_setting: dict | None = None
class DatasetUpdatePayload(BaseModel):
@ -209,6 +210,7 @@ class DatasetListApi(DatasetApiResource):
embedding_model_provider=payload.embedding_model_provider,
embedding_model_name=payload.embedding_model,
retrieval_model=payload.retrieval_model,
summary_index_setting=payload.summary_index_setting,
)
except services.errors.dataset.DatasetNameDuplicateError:
raise DatasetNameDuplicateError()

View File

@ -488,6 +488,87 @@ class DocumentListApi(DatasetApiResource):
)
documents = paginated_documents.items
# Check if dataset has summary index enabled
has_summary_index = dataset.summary_index_setting and dataset.summary_index_setting.get("enable") is True
# Filter documents that need summary calculation
documents_need_summary = [doc for doc in documents if doc.need_summary is True]
document_ids_need_summary = [str(doc.id) for doc in documents_need_summary]
# Calculate summary_index_status for documents that need summary (only if dataset summary index is enabled)
summary_status_map = {}
if has_summary_index and document_ids_need_summary:
# Get all segments for these documents (excluding qa_model and re_segment)
segments = (
db.session.query(DocumentSegment.id, DocumentSegment.document_id)
.where(
DocumentSegment.document_id.in_(document_ids_need_summary),
DocumentSegment.status != "re_segment",
DocumentSegment.tenant_id == tenant_id,
)
.all()
)
# Group segments by document_id
document_segments_map = {}
for segment in segments:
doc_id = str(segment.document_id)
if doc_id not in document_segments_map:
document_segments_map[doc_id] = []
document_segments_map[doc_id].append(segment.id)
# Get all summary records for these segments
all_segment_ids = [seg.id for seg in segments]
summaries = {}
if all_segment_ids:
from models.dataset import DocumentSegmentSummary
summary_records = (
db.session.query(DocumentSegmentSummary)
.where(
DocumentSegmentSummary.chunk_id.in_(all_segment_ids),
DocumentSegmentSummary.dataset_id == dataset_id,
DocumentSegmentSummary.enabled == True, # Only count enabled summaries
)
.all()
)
summaries = {summary.chunk_id: summary.status for summary in summary_records}
# Calculate summary_index_status for each document
for doc_id in document_ids_need_summary:
segment_ids = document_segments_map.get(doc_id, [])
if not segment_ids:
# No segments, status is None (not started)
summary_status_map[doc_id] = None
continue
# Check if there are any "not_started" or "generating" status summaries
# Only check enabled=True summaries (already filtered in query)
# If segment has no summary record (summaries.get returns None),
# it means the summary is disabled (enabled=False) or not created yet, ignore it
has_pending_summaries = any(
summaries.get(segment_id) is not None # Ensure summary exists (enabled=True)
and summaries[segment_id] in ("not_started", "generating")
for segment_id in segment_ids
)
if has_pending_summaries:
# Task is still running (not started or generating)
summary_status_map[doc_id] = "SUMMARIZING"
else:
# All enabled=True summaries are "completed" or "error", task finished
# Or no enabled=True summaries exist (all disabled)
summary_status_map[doc_id] = None
# Add summary_index_status to each document
for document in documents:
if has_summary_index and document.need_summary is True:
# Get status from map, default to None (not queued yet)
document.summary_index_status = summary_status_map.get(str(document.id))
else:
# Return null if summary index is not enabled or document doesn't need summary
document.summary_index_status = None
response = {
"data": marshal(documents, document_fields),
"has_more": len(documents) == query_params.limit,
@ -592,6 +673,51 @@ class DocumentApi(DatasetApiResource):
if metadata not in self.METADATA_CHOICES:
raise InvalidMetadataError(f"Invalid metadata value: {metadata}")
# Calculate summary_index_status if needed
summary_index_status = None
has_summary_index = dataset.summary_index_setting and dataset.summary_index_setting.get("enable") is True
if has_summary_index and document.need_summary is True:
# Get all segments for this document (excluding qa_model and re_segment)
segments = (
db.session.query(DocumentSegment.id)
.where(
DocumentSegment.document_id == document_id,
DocumentSegment.status != "re_segment",
DocumentSegment.tenant_id == tenant_id,
)
.all()
)
segment_ids = [seg.id for seg in segments]
if segment_ids:
from models.dataset import DocumentSegmentSummary
# Get all summary records for these segments
summary_records = (
db.session.query(DocumentSegmentSummary)
.where(
DocumentSegmentSummary.chunk_id.in_(segment_ids),
DocumentSegmentSummary.dataset_id == dataset_id,
DocumentSegmentSummary.enabled == True, # Only count enabled summaries
)
.all()
)
summaries = {summary.chunk_id: summary.status for summary in summary_records}
# Check if there are any "not_started" or "generating" status summaries
has_pending_summaries = any(
summaries.get(segment_id) is not None # Ensure summary exists (enabled=True)
and summaries[segment_id] in ("not_started", "generating")
for segment_id in segment_ids
)
if has_pending_summaries:
summary_index_status = "SUMMARIZING"
else:
summary_index_status = None
else:
summary_index_status = None
if metadata == "only":
response = {"id": document.id, "doc_type": document.doc_type, "doc_metadata": document.doc_metadata_details}
elif metadata == "without":
@ -626,6 +752,8 @@ class DocumentApi(DatasetApiResource):
"display_status": document.display_status,
"doc_form": document.doc_form,
"doc_language": document.doc_language,
"summary_index_status": summary_index_status,
"need_summary": document.need_summary if document.need_summary is not None else False,
}
else:
dataset_process_rules = DatasetService.get_process_rules(dataset_id)
@ -661,6 +789,8 @@ class DocumentApi(DatasetApiResource):
"display_status": document.display_status,
"doc_form": document.doc_form,
"doc_language": document.doc_language,
"summary_index_status": summary_index_status,
"need_summary": document.need_summary if document.need_summary is not None else False,
}
return response

View File

@ -212,6 +212,7 @@ class DatasetService:
embedding_model_provider: str | None = None,
embedding_model_name: str | None = None,
retrieval_model: RetrievalModel | None = None,
summary_index_setting: dict | None = None,
):
# check if dataset name already exists
if db.session.query(Dataset).filter_by(name=name, tenant_id=tenant_id).first():
@ -254,6 +255,8 @@ class DatasetService:
dataset.retrieval_model = retrieval_model.model_dump() if retrieval_model else None
dataset.permission = permission or DatasetPermissionEnum.ONLY_ME
dataset.provider = provider
if summary_index_setting is not None:
dataset.summary_index_setting = summary_index_setting
db.session.add(dataset)
db.session.flush()
@ -907,10 +910,6 @@ class DatasetService:
if not old_summary_setting:
return False
# If old setting was disabled, no need to regenerate (no existing summaries to regenerate)
if not old_summary_setting.get("enable"):
return False
# Compare model_name and model_provider_name
old_model_name = old_summary_setting.get("model_name")
old_model_provider = old_summary_setting.get("model_provider_name")

View File

@ -343,6 +343,9 @@ class RagPipelineDslService:
dataset.embedding_model_provider = knowledge_configuration.embedding_model_provider
elif knowledge_configuration.indexing_technique == "economy":
dataset.keyword_number = knowledge_configuration.keyword_number
# Update summary_index_setting if provided
if knowledge_configuration.summary_index_setting is not None:
dataset.summary_index_setting = knowledge_configuration.summary_index_setting
dataset.pipeline_id = pipeline.id
self._session.add(dataset)
self._session.commit()
@ -477,6 +480,9 @@ class RagPipelineDslService:
dataset.embedding_model_provider = knowledge_configuration.embedding_model_provider
elif knowledge_configuration.indexing_technique == "economy":
dataset.keyword_number = knowledge_configuration.keyword_number
# Update summary_index_setting if provided
if knowledge_configuration.summary_index_setting is not None:
dataset.summary_index_setting = knowledge_configuration.summary_index_setting
dataset.pipeline_id = pipeline.id
self._session.add(dataset)
self._session.commit()