feat: Optimize codes.

This commit is contained in:
FFXN
2026-01-28 15:59:54 +08:00
parent b730017bcd
commit 82fa219cf1
4 changed files with 612 additions and 523 deletions

View File

@ -41,7 +41,7 @@ from fields.document_fields import (
from libs.datetime_utils import naive_utc_now
from libs.login import current_account_with_tenant, login_required
from models import DatasetProcessRule, Document, DocumentSegment, UploadFile
from models.dataset import DocumentPipelineExecutionLog, DocumentSegmentSummary
from models.dataset import DocumentPipelineExecutionLog
from services.dataset_service import DatasetService, DocumentService
from services.entities.knowledge_entities.knowledge_entities import KnowledgeConfig, ProcessRule, RetrievalModel
from services.file_service import FileService
@ -329,66 +329,14 @@ class DatasetDocumentListApi(Resource):
# Calculate summary_index_status for documents that need summary (only if dataset summary index is enabled)
summary_status_map: dict[str, str | None] = {}
if has_summary_index and document_ids_need_summary:
# Get all segments for these documents (excluding qa_model and re_segment)
segments = (
db.session.query(DocumentSegment.id, DocumentSegment.document_id)
.where(
DocumentSegment.document_id.in_(document_ids_need_summary),
DocumentSegment.status != "re_segment",
DocumentSegment.tenant_id == current_tenant_id,
)
.all()
from services.summary_index_service import SummaryIndexService
summary_status_map = SummaryIndexService.get_documents_summary_index_status(
document_ids=document_ids_need_summary,
dataset_id=dataset_id,
tenant_id=current_tenant_id,
)
# Group segments by document_id
document_segments_map: dict[str, list[str]] = {}
for segment in segments:
doc_id = str(segment.document_id)
if doc_id not in document_segments_map:
document_segments_map[doc_id] = []
document_segments_map[doc_id].append(segment.id)
# Get all summary records for these segments
all_segment_ids = [seg.id for seg in segments]
summaries = {}
if all_segment_ids:
summary_records = (
db.session.query(DocumentSegmentSummary)
.where(
DocumentSegmentSummary.chunk_id.in_(all_segment_ids),
DocumentSegmentSummary.dataset_id == dataset_id,
DocumentSegmentSummary.enabled == True, # Only count enabled summaries
)
.all()
)
summaries = {summary.chunk_id: summary.status for summary in summary_records}
# Calculate summary_index_status for each document
for doc_id in document_ids_need_summary:
segment_ids = document_segments_map.get(doc_id, [])
if not segment_ids:
# No segments, status is None (not started)
summary_status_map[doc_id] = None
continue
# Check if there are any "not_started" or "generating" status summaries
# Only check enabled=True summaries (already filtered in query)
# If segment has no summary record (summaries.get returns None),
# it means the summary is disabled (enabled=False) or not created yet, ignore it
has_pending_summaries = any(
summaries.get(segment_id) is not None # Ensure summary exists (enabled=True)
and summaries[segment_id] in ("not_started", "generating")
for segment_id in segment_ids
)
if has_pending_summaries:
# Task is still running (not started or generating)
summary_status_map[doc_id] = "SUMMARIZING"
else:
# All enabled=True summaries are "completed" or "error", task finished
# Or no enabled=True summaries exist (all disabled)
summary_status_map[doc_id] = None
# Add summary_index_status to each document
for document in documents:
if has_summary_index and document.need_summary is True:
@ -1491,15 +1439,12 @@ class DocumentSummaryStatusApi(DocumentResource):
segment_ids = [segment.id for segment in segments]
summaries = []
if segment_ids:
summaries = (
db.session.query(DocumentSegmentSummary)
.filter(
DocumentSegmentSummary.document_id == document_id,
DocumentSegmentSummary.dataset_id == dataset_id,
DocumentSegmentSummary.chunk_id.in_(segment_ids),
DocumentSegmentSummary.enabled == True, # Only return enabled summaries
)
.all()
from services.summary_index_service import SummaryIndexService
summaries = SummaryIndexService.get_document_summaries(
document_id=document_id,
dataset_id=dataset_id,
segment_ids=segment_ids,
)
# Create a mapping of chunk_id to summary

View File

@ -32,7 +32,7 @@ from extensions.ext_redis import redis_client
from fields.segment_fields import child_chunk_fields, segment_fields
from libs.helper import escape_like_pattern
from libs.login import current_account_with_tenant, login_required
from models.dataset import ChildChunk, DocumentSegment, DocumentSegmentSummary
from models.dataset import ChildChunk, DocumentSegment
from models.model import UploadFile
from services.dataset_service import DatasetService, DocumentService, SegmentService
from services.entities.knowledge_entities.knowledge_entities import ChildChunkUpdateArgs, SegmentUpdateArgs
@ -43,17 +43,11 @@ from tasks.batch_create_segment_to_index_task import batch_create_segment_to_ind
def _get_segment_with_summary(segment, dataset_id):
"""Helper function to marshal segment and add summary information."""
from services.summary_index_service import SummaryIndexService
segment_dict = dict(marshal(segment, segment_fields))
# Query summary for this segment (only enabled summaries)
summary = (
db.session.query(DocumentSegmentSummary)
.where(
DocumentSegmentSummary.chunk_id == segment.id,
DocumentSegmentSummary.dataset_id == dataset_id,
DocumentSegmentSummary.enabled == True, # Only return enabled summaries
)
.first()
)
summary = SummaryIndexService.get_segment_summary(segment_id=segment.id, dataset_id=dataset_id)
segment_dict["summary"] = summary.summary_content if summary else None
return segment_dict
@ -203,17 +197,12 @@ class DatasetDocumentSegmentListApi(Resource):
segment_ids = [segment.id for segment in segments.items]
summaries = {}
if segment_ids:
summary_records = (
db.session.query(DocumentSegmentSummary)
.where(
DocumentSegmentSummary.chunk_id.in_(segment_ids),
DocumentSegmentSummary.dataset_id == dataset_id,
)
.all()
)
# Only include enabled summaries
from services.summary_index_service import SummaryIndexService
summary_records = SummaryIndexService.get_segments_summaries(segment_ids=segment_ids, dataset_id=dataset_id)
# Only include enabled summaries (already filtered by service)
summaries = {
summary.chunk_id: summary.summary_content for summary in summary_records if summary.enabled is True
chunk_id: summary.summary_content for chunk_id, summary in summary_records.items()
}
# Add summary to each segment

View File

@ -45,6 +45,7 @@ from services.entities.knowledge_entities.knowledge_entities import (
Segmentation,
)
from services.file_service import FileService
from services.summary_index_service import SummaryIndexService
class DocumentTextCreatePayload(BaseModel):
@ -518,68 +519,12 @@ class DocumentListApi(DatasetApiResource):
# Calculate summary_index_status for documents that need summary (only if dataset summary index is enabled)
summary_status_map: dict[str, str | None] = {}
if has_summary_index and document_ids_need_summary:
# Get all segments for these documents (excluding qa_model and re_segment)
segments = (
db.session.query(DocumentSegment.id, DocumentSegment.document_id)
.where(
DocumentSegment.document_id.in_(document_ids_need_summary),
DocumentSegment.status != "re_segment",
DocumentSegment.tenant_id == tenant_id,
)
.all()
summary_status_map = SummaryIndexService.get_documents_summary_index_status(
document_ids=document_ids_need_summary,
dataset_id=dataset_id,
tenant_id=tenant_id,
)
# Group segments by document_id
document_segments_map: dict[str, list[str]] = {}
for segment in segments:
doc_id = str(segment.document_id)
if doc_id not in document_segments_map:
document_segments_map[doc_id] = []
document_segments_map[doc_id].append(segment.id)
# Get all summary records for these segments
all_segment_ids = [seg.id for seg in segments]
summaries = {}
if all_segment_ids:
from models.dataset import DocumentSegmentSummary
summary_records = (
db.session.query(DocumentSegmentSummary)
.where(
DocumentSegmentSummary.chunk_id.in_(all_segment_ids),
DocumentSegmentSummary.dataset_id == dataset_id,
DocumentSegmentSummary.enabled == True, # Only count enabled summaries
)
.all()
)
summaries = {summary.chunk_id: summary.status for summary in summary_records}
# Calculate summary_index_status for each document
for doc_id in document_ids_need_summary:
segment_ids = document_segments_map.get(doc_id, [])
if not segment_ids:
# No segments, status is None (not started)
summary_status_map[doc_id] = None # type: ignore[assignment]
continue
# Check if there are any "not_started" or "generating" status summaries
# Only check enabled=True summaries (already filtered in query)
# If segment has no summary record (summaries.get returns None),
# it means the summary is disabled (enabled=False) or not created yet, ignore it
has_pending_summaries = any(
summaries.get(segment_id) is not None # Ensure summary exists (enabled=True)
and summaries[segment_id] in ("not_started", "generating")
for segment_id in segment_ids
)
if has_pending_summaries:
# Task is still running (not started or generating)
summary_status_map[doc_id] = "SUMMARIZING"
else:
# All enabled=True summaries are "completed" or "error", task finished
# Or no enabled=True summaries exist (all disabled)
summary_status_map[doc_id] = None # type: ignore[assignment]
# Add summary_index_status to each document
for document in documents:
if has_summary_index and document.need_summary is True:
@ -697,46 +642,11 @@ class DocumentApi(DatasetApiResource):
summary_index_status = None
has_summary_index = dataset.summary_index_setting and dataset.summary_index_setting.get("enable") is True
if has_summary_index and document.need_summary is True:
# Get all segments for this document (excluding qa_model and re_segment)
segments = (
db.session.query(DocumentSegment.id)
.where(
DocumentSegment.document_id == document_id,
DocumentSegment.status != "re_segment",
DocumentSegment.tenant_id == tenant_id,
)
.all()
summary_index_status = SummaryIndexService.get_document_summary_index_status(
document_id=document_id,
dataset_id=dataset_id,
tenant_id=tenant_id,
)
segment_ids = [seg.id for seg in segments]
if segment_ids:
from models.dataset import DocumentSegmentSummary
# Get all summary records for these segments
summary_records = (
db.session.query(DocumentSegmentSummary)
.where(
DocumentSegmentSummary.chunk_id.in_(segment_ids),
DocumentSegmentSummary.dataset_id == dataset_id,
DocumentSegmentSummary.enabled == True, # Only count enabled summaries
)
.all()
)
summaries = {summary.chunk_id: summary.status for summary in summary_records}
# Check if there are any "not_started" or "generating" status summaries
has_pending_summaries = any(
summaries.get(segment_id) is not None # Ensure summary exists (enabled=True)
and summaries[segment_id] in ("not_started", "generating")
for segment_id in segment_ids
)
if has_pending_summaries:
summary_index_status = "SUMMARIZING"
else:
summary_index_status = None
else:
summary_index_status = None
if metadata == "only":
response = {"id": document.id, "doc_type": document.doc_type, "doc_metadata": document.doc_metadata_details}

File diff suppressed because it is too large Load Diff