Merge remote-tracking branch 'origin/feat/knowledgebase-summaryIndex' into feat/knowledgebase-summaryIndex

FFXN
2026-01-28 10:47:24 +08:00
12 changed files with 28 additions and 47 deletions

View File

@@ -450,4 +450,4 @@ Requirements:
Output only the summary text. Start summarizing now:
"""
)
)
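
Note: the hunk above is the tail of the summary-generation prompt template. As a minimal sketch of how a template ending this way is typically rendered and sent to a model (the SUMMARY_PROMPT constant and the llm.complete() call are illustrative placeholders, not this repository's actual API):

    # Illustrative placeholders; the real prompt text and model client live elsewhere in the codebase.
    SUMMARY_PROMPT = (
        "Summarize the following text.\n"
        "Requirements:\n"
        "Output only the summary text. Start summarizing now:\n"
        "{chunk_text}"
    )

    def generate_summary(llm, chunk_text: str) -> str:
        """Render the prompt for one chunk and return the model's summary."""
        prompt = SUMMARY_PROMPT.format(chunk_text=chunk_text)
        return llm.complete(prompt).strip()  # llm.complete() is an assumed interface.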

View File

@@ -554,7 +554,7 @@ class RetrievalService:
# Check if this segment was retrieved via summary
# Use summary score as base score if available, otherwise 0.0
max_score = summary_score_map.get(segment.id, 0.0)
if child_chunks or attachment_infos:
child_chunk_details = []
for child_chunk in child_chunks:
@@ -571,9 +571,7 @@ class RetrievalService:
for attachment_info in attachment_infos:
file_document = doc_to_document_map.get(attachment_info["id"])
if file_document:
-max_score = max(
-    max_score, file_document.metadata.get("score", 0.0)
-)
+max_score = max(max_score, file_document.metadata.get("score", 0.0))
map_detail = {
"max_score": max_score,
@@ -595,23 +593,23 @@ class RetrievalService:
else:
if segment.id not in include_segment_ids:
include_segment_ids.add(segment.id)
# Check if this segment was retrieved via summary
# Use summary score if available (summary retrieval takes priority)
max_score = summary_score_map.get(segment.id, 0.0)
# If not retrieved via summary, use original segment's score
if segment.id not in summary_score_map:
segment_document = doc_to_document_map.get(segment.index_node_id)
if segment_document:
max_score = max(max_score, segment_document.metadata.get("score", 0.0))
# Also consider attachment scores
for attachment_info in attachment_infos:
file_doc = doc_to_document_map.get(attachment_info["id"])
if file_doc:
max_score = max(max_score, file_doc.metadata.get("score", 0.0))
record = {
"segment": segment,
"score": max_score,

View File

@@ -309,10 +309,7 @@ class ParagraphIndexProcessor(BaseIndexProcessor):
errors: list[Exception] = []
with concurrent.futures.ThreadPoolExecutor(max_workers=min(10, len(preview_texts))) as executor:
-futures = [
-    executor.submit(process, preview)
-    for preview in preview_texts
-]
+futures = [executor.submit(process, preview) for preview in preview_texts]
# Wait for all tasks to complete with timeout
done, not_done = concurrent.futures.wait(futures, timeout=timeout_seconds)
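
The concurrency shape here (bounded thread pool, one wait call with an overall deadline) is self-contained enough to sketch on its own; run_with_deadline and its arguments are illustrative names, not this codebase's API:

    import concurrent.futures

    def run_with_deadline(items, worker, timeout_seconds: float):
        """Run worker over items concurrently and stop waiting at the deadline.

        Assumes items is non-empty, as the surrounding diff does.
        """
        with concurrent.futures.ThreadPoolExecutor(max_workers=min(10, len(items))) as executor:
            futures = [executor.submit(worker, item) for item in items]
            done, not_done = concurrent.futures.wait(futures, timeout=timeout_seconds)
            for future in not_done:
                # Best effort: queued tasks are cancelled; tasks already running
                # still finish, and the executor's exit waits for them.
                future.cancel()
        return done, not_done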

View File

@@ -362,7 +362,7 @@ class ParentChildIndexProcessor(BaseIndexProcessor):
For each parent chunk in preview_texts, concurrently call generate_summary to generate a summary
and write it to the summary attribute of PreviewDetail.
In preview mode (indexing-estimate), if any summary generation fails, the method will raise an exception.
Note: For parent-child structure, we only generate summaries for parent chunks.
"""
import concurrent.futures
@@ -379,6 +379,7 @@ class ParentChildIndexProcessor(BaseIndexProcessor):
def process(preview: PreviewDetail) -> None:
"""Generate summary for a single preview item (parent chunk)."""
from core.rag.index_processor.processor.paragraph_index_processor import ParagraphIndexProcessor
if flask_app:
# Ensure Flask app context in worker thread
with flask_app.app_context():
@@ -403,10 +404,7 @@ class ParentChildIndexProcessor(BaseIndexProcessor):
errors: list[Exception] = []
with concurrent.futures.ThreadPoolExecutor(max_workers=min(10, len(preview_texts))) as executor:
-futures = [
-    executor.submit(process, preview)
-    for preview in preview_texts
-]
+futures = [executor.submit(process, preview) for preview in preview_texts]
# Wait for all tasks to complete with timeout
done, not_done = concurrent.futures.wait(futures, timeout=timeout_seconds)
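
Per the docstring above, preview (indexing-estimate) mode is fail-fast. A sketch of how the errors list declared in this hunk is typically drained after the wait; the exact re-raise policy is an assumption inferred from the docstring, not shown in the diff:

    errors: list[Exception] = []
    for future in done:
        exc = future.exception()  # Returns None if the task succeeded.
        if exc is not None:
            errors.append(exc)
    if not_done:
        errors.append(TimeoutError(f"{len(not_done)} summary task(s) missed the deadline"))
    if errors:
        # Fail fast so the caller sees the first generation error.
        raise errors[0]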

View File

@@ -243,7 +243,7 @@ class QAIndexProcessor(BaseIndexProcessor):
) -> list[PreviewDetail]:
"""
QA model doesn't generate summaries, so this method returns preview_texts unchanged.
Note: QA model uses question-answer pairs, which don't require summary generation.
"""
# QA model doesn't generate summaries, return as-is

View File

@@ -241,13 +241,13 @@ class DatasetRetrieval:
segment_content = f"question:{segment.get_sign_content()} answer:{segment.answer}"
else:
segment_content = segment.get_sign_content()
# If summary exists, prepend it to the content
if record.summary:
final_content = f"{record.summary}\n{segment_content}"
else:
final_content = segment_content
document_context_list.append(
DocumentContext(
content=final_content,
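
As a concrete example of the prepend step: with record.summary set to "Overview of the refund policy." and a chunk body of "Refunds are issued within 14 days.", final_content becomes "Overview of the refund policy.\nRefunds are issued within 14 days.", so the generated summary always leads the raw chunk text in the retrieval context.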
@@ -321,7 +321,7 @@ class DatasetRetrieval:
else:
source.content = segment.content
# Add summary if this segment was retrieved via summary
-if hasattr(record, 'summary') and record.summary:
+if hasattr(record, "summary") and record.summary:
source.summary = record.summary
retrieval_resource_list.append(source)
if hit_callback and retrieval_resource_list:

View File

@@ -174,13 +174,13 @@ class DatasetRetrieverTool(DatasetRetrieverBaseTool):
segment_content = f"question:{segment.get_sign_content()} answer:{segment.answer}"
else:
segment_content = segment.get_sign_content()
# If summary exists, prepend it to the content
if record.summary:
final_content = f"{record.summary}\n{segment_content}"
else:
final_content = segment_content
document_context_list.append(
DocumentContext(
content=final_content,
@@ -221,7 +221,7 @@ class DatasetRetrieverTool(DatasetRetrieverBaseTool):
else:
source.content = segment.content
# Add summary if this segment was retrieved via summary
-if hasattr(record, 'summary') and record.summary:
+if hasattr(record, "summary") and record.summary:
source.summary = record.summary
retrieval_resource_list.append(source)

View File

@@ -385,7 +385,7 @@ class KnowledgeIndexNode(Node[KnowledgeIndexNodeData]):
# Set a reasonable timeout to prevent hanging (60 seconds per chunk, max 5 minutes total)
timeout_seconds = min(300, 60 * len(preview_output["preview"]))
errors: list[Exception] = []
with concurrent.futures.ThreadPoolExecutor(max_workers=min(10, len(preview_output["preview"]))) as executor:
futures = [
executor.submit(generate_summary_for_chunk, preview_item)
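
As a worked example of the timeout formula: a 3-chunk preview gets min(300, 60 * 3) = 180 seconds, while anything from 5 chunks upward hits the 300-second ceiling.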

View File

@@ -419,7 +419,7 @@ class KnowledgeRetrievalNode(LLMUsageTrackingMixin, Node[KnowledgeRetrievalNodeData]):
source["content"] = f"question:{segment.get_sign_content()} \nanswer:{segment.answer}"
else:
source["content"] = segment.get_sign_content()
# Add summary if available
if record.summary:
source["summary"] = record.summary
retrieval_resource_list.append(source)

View File

@@ -748,7 +748,7 @@ class LLMNode(Node[LLMNodeData]):
page=metadata.get("page"),
doc_metadata=metadata.get("doc_metadata"),
files=context_dict.get("files"),
summary=context_dict.get("summary"),
)
return source

View File

@@ -3384,9 +3384,7 @@ class SegmentService:
SummaryIndexService.generate_and_vectorize_summary(
segment, dataset, dataset.summary_index_setting
)
-logger.info(
-    "Auto-regenerated summary for segment %s after content change", segment.id
-)
+logger.info("Auto-regenerated summary for segment %s after content change", segment.id)
except Exception:
logger.exception("Failed to auto-regenerate summary for segment %s", segment.id)
# Don't fail the entire update if summary regeneration fails
@@ -3400,9 +3398,7 @@ class SegmentService:
try:
SummaryIndexService.update_summary_for_segment(segment, dataset, args.summary)
-logger.info(
-    "Updated summary for segment %s with user-provided content", segment.id
-)
+logger.info("Updated summary for segment %s with user-provided content", segment.id)
except Exception:
logger.exception("Failed to update summary for segment %s", segment.id)
# Don't fail the entire update if summary update fails
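
Both SegmentService hunks follow the same best-effort pattern: attempt the summary refresh, log on failure, and never let it abort the segment update itself. Condensed into a sketch (apply_content_update is a hypothetical stand-in for the surrounding update logic):

    import logging

    logger = logging.getLogger(__name__)

    def update_segment(segment, dataset, new_content) -> None:
        apply_content_update(segment, new_content)  # Hypothetical helper; its failures do propagate.
        try:
            SummaryIndexService.generate_and_vectorize_summary(segment, dataset, dataset.summary_index_setting)
            logger.info("Auto-regenerated summary for segment %s after content change", segment.id)
        except Exception:
            # Best effort: a summary failure must not lose the content edit.
            logger.exception("Failed to auto-regenerate summary for segment %s", segment.id)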

View File

@@ -216,7 +216,7 @@ class SummaryIndexService:
db.session.add(summary_record)
db.session.flush()
# Success, exit function
return
except (ConnectionError, Exception) as e:
error_str = str(e).lower()
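
The truncated except block above lowercases the error text, which points at keyword-based classification of transient failures before retrying the flush. A sketch of that retry shape; the marker list, attempt count, and backoff are assumptions, not taken from this diff:

    import time

    TRANSIENT_MARKERS = ("connection", "timeout", "temporarily unavailable")  # Assumed keywords.

    def flush_with_retry(write_once, max_attempts: int = 3) -> None:
        """Call write_once(), retrying only errors whose text looks transient."""
        for attempt in range(1, max_attempts + 1):
            try:
                write_once()
                return  # Success, exit function.
            except Exception as e:
                error_str = str(e).lower()
                if not any(m in error_str for m in TRANSIENT_MARKERS) or attempt == max_attempts:
                    raise
                time.sleep(2**attempt)  # Simple exponential backoff.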
@@ -333,9 +333,7 @@ class SummaryIndexService:
error: Error message
"""
summary_record = (
-db.session.query(DocumentSegmentSummary)
-.filter_by(chunk_id=segment.id, dataset_id=dataset.id)
-.first()
+db.session.query(DocumentSegmentSummary).filter_by(chunk_id=segment.id, dataset_id=dataset.id).first()
)
if summary_record:
@@ -344,9 +342,7 @@ class SummaryIndexService:
db.session.add(summary_record)
db.session.flush()
else:
-logger.warning(
-    "Summary record not found for segment %s when updating error", segment.id
-)
+logger.warning("Summary record not found for segment %s when updating error", segment.id)
@staticmethod
def generate_and_vectorize_summary(
@@ -371,16 +367,12 @@ class SummaryIndexService:
"""
# Get existing summary record (should have been created by batch_create_summary_records)
summary_record = (
-db.session.query(DocumentSegmentSummary)
-.filter_by(chunk_id=segment.id, dataset_id=dataset.id)
-.first()
+db.session.query(DocumentSegmentSummary).filter_by(chunk_id=segment.id, dataset_id=dataset.id).first()
)
if not summary_record:
# If not found (shouldn't happen), create one
-logger.warning(
-    "Summary record not found for segment %s, creating one", segment.id
-)
+logger.warning("Summary record not found for segment %s, creating one", segment.id)
summary_record = SummaryIndexService.create_summary_record(
segment, dataset, summary_content="", status="generating"
)