mirror of
https://github.com/langgenius/dify.git
synced 2026-05-04 01:18:05 +08:00
feat: Add summary index for knowledge. (#31625)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> Co-authored-by: Jyong <76649700+JohnJyong@users.noreply.github.com> Co-authored-by: zxhlyh <jasonapring2015@outlook.com> Co-authored-by: Yansong Zhang <916125788@qq.com> Co-authored-by: hj24 <mambahj24@gmail.com> Co-authored-by: CodingOnStar <hanxujiang@dify.ai> Co-authored-by: CodingOnStar <hanxujiang@dify.com> Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
This commit is contained in:
@ -89,6 +89,7 @@ from tasks.disable_segments_from_index_task import disable_segments_from_index_t
|
||||
from tasks.document_indexing_update_task import document_indexing_update_task
|
||||
from tasks.enable_segments_to_index_task import enable_segments_to_index_task
|
||||
from tasks.recover_document_indexing_task import recover_document_indexing_task
|
||||
from tasks.regenerate_summary_index_task import regenerate_summary_index_task
|
||||
from tasks.remove_document_from_index_task import remove_document_from_index_task
|
||||
from tasks.retry_document_indexing_task import retry_document_indexing_task
|
||||
from tasks.sync_website_document_indexing_task import sync_website_document_indexing_task
|
||||
@ -211,6 +212,7 @@ class DatasetService:
|
||||
embedding_model_provider: str | None = None,
|
||||
embedding_model_name: str | None = None,
|
||||
retrieval_model: RetrievalModel | None = None,
|
||||
summary_index_setting: dict | None = None,
|
||||
):
|
||||
# check if dataset name already exists
|
||||
if db.session.query(Dataset).filter_by(name=name, tenant_id=tenant_id).first():
|
||||
@ -253,6 +255,8 @@ class DatasetService:
|
||||
dataset.retrieval_model = retrieval_model.model_dump() if retrieval_model else None
|
||||
dataset.permission = permission or DatasetPermissionEnum.ONLY_ME
|
||||
dataset.provider = provider
|
||||
if summary_index_setting is not None:
|
||||
dataset.summary_index_setting = summary_index_setting
|
||||
db.session.add(dataset)
|
||||
db.session.flush()
|
||||
|
||||
@ -476,6 +480,11 @@ class DatasetService:
|
||||
if external_retrieval_model:
|
||||
dataset.retrieval_model = external_retrieval_model
|
||||
|
||||
# Update summary index setting if provided
|
||||
summary_index_setting = data.get("summary_index_setting", None)
|
||||
if summary_index_setting is not None:
|
||||
dataset.summary_index_setting = summary_index_setting
|
||||
|
||||
# Update basic dataset properties
|
||||
dataset.name = data.get("name", dataset.name)
|
||||
dataset.description = data.get("description", dataset.description)
|
||||
@ -564,6 +573,9 @@ class DatasetService:
|
||||
# update Retrieval model
|
||||
if data.get("retrieval_model"):
|
||||
filtered_data["retrieval_model"] = data["retrieval_model"]
|
||||
# update summary index setting
|
||||
if data.get("summary_index_setting"):
|
||||
filtered_data["summary_index_setting"] = data.get("summary_index_setting")
|
||||
# update icon info
|
||||
if data.get("icon_info"):
|
||||
filtered_data["icon_info"] = data.get("icon_info")
|
||||
@ -572,12 +584,27 @@ class DatasetService:
|
||||
db.session.query(Dataset).filter_by(id=dataset.id).update(filtered_data)
|
||||
db.session.commit()
|
||||
|
||||
# Reload dataset to get updated values
|
||||
db.session.refresh(dataset)
|
||||
|
||||
# update pipeline knowledge base node data
|
||||
DatasetService._update_pipeline_knowledge_base_node_data(dataset, user.id)
|
||||
|
||||
# Trigger vector index task if indexing technique changed
|
||||
if action:
|
||||
deal_dataset_vector_index_task.delay(dataset.id, action)
|
||||
# If embedding_model changed, also regenerate summary vectors
|
||||
if action == "update":
|
||||
regenerate_summary_index_task.delay(
|
||||
dataset.id,
|
||||
regenerate_reason="embedding_model_changed",
|
||||
regenerate_vectors_only=True,
|
||||
)
|
||||
|
||||
# Note: summary_index_setting changes do not trigger automatic regeneration of existing summaries.
|
||||
# The new setting will only apply to:
|
||||
# 1. New documents added after the setting change
|
||||
# 2. Manual summary generation requests
|
||||
|
||||
return dataset
|
||||
|
||||
@ -616,6 +643,7 @@ class DatasetService:
|
||||
knowledge_index_node_data["chunk_structure"] = dataset.chunk_structure
|
||||
knowledge_index_node_data["indexing_technique"] = dataset.indexing_technique # pyright: ignore[reportAttributeAccessIssue]
|
||||
knowledge_index_node_data["keyword_number"] = dataset.keyword_number
|
||||
knowledge_index_node_data["summary_index_setting"] = dataset.summary_index_setting
|
||||
node["data"] = knowledge_index_node_data
|
||||
updated = True
|
||||
except Exception:
|
||||
@ -854,6 +882,54 @@ class DatasetService:
|
||||
)
|
||||
filtered_data["collection_binding_id"] = dataset_collection_binding.id
|
||||
|
||||
@staticmethod
|
||||
def _check_summary_index_setting_model_changed(dataset: Dataset, data: dict[str, Any]) -> bool:
|
||||
"""
|
||||
Check if summary_index_setting model (model_name or model_provider_name) has changed.
|
||||
|
||||
Args:
|
||||
dataset: Current dataset object
|
||||
data: Update data dictionary
|
||||
|
||||
Returns:
|
||||
bool: True if summary model changed, False otherwise
|
||||
"""
|
||||
# Check if summary_index_setting is being updated
|
||||
if "summary_index_setting" not in data or data.get("summary_index_setting") is None:
|
||||
return False
|
||||
|
||||
new_summary_setting = data.get("summary_index_setting")
|
||||
old_summary_setting = dataset.summary_index_setting
|
||||
|
||||
# If new setting is disabled, no need to regenerate
|
||||
if not new_summary_setting or not new_summary_setting.get("enable"):
|
||||
return False
|
||||
|
||||
# If old setting doesn't exist, no need to regenerate (no existing summaries to regenerate)
|
||||
# Note: This task only regenerates existing summaries, not generates new ones
|
||||
if not old_summary_setting:
|
||||
return False
|
||||
|
||||
# Compare model_name and model_provider_name
|
||||
old_model_name = old_summary_setting.get("model_name")
|
||||
old_model_provider = old_summary_setting.get("model_provider_name")
|
||||
new_model_name = new_summary_setting.get("model_name")
|
||||
new_model_provider = new_summary_setting.get("model_provider_name")
|
||||
|
||||
# Check if model changed
|
||||
if old_model_name != new_model_name or old_model_provider != new_model_provider:
|
||||
logger.info(
|
||||
"Summary index setting model changed for dataset %s: old=%s/%s, new=%s/%s",
|
||||
dataset.id,
|
||||
old_model_provider,
|
||||
old_model_name,
|
||||
new_model_provider,
|
||||
new_model_name,
|
||||
)
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def update_rag_pipeline_dataset_settings(
|
||||
session: Session, dataset: Dataset, knowledge_configuration: KnowledgeConfiguration, has_published: bool = False
|
||||
@ -889,6 +965,9 @@ class DatasetService:
|
||||
else:
|
||||
raise ValueError("Invalid index method")
|
||||
dataset.retrieval_model = knowledge_configuration.retrieval_model.model_dump()
|
||||
# Update summary_index_setting if provided
|
||||
if knowledge_configuration.summary_index_setting is not None:
|
||||
dataset.summary_index_setting = knowledge_configuration.summary_index_setting
|
||||
session.add(dataset)
|
||||
else:
|
||||
if dataset.chunk_structure and dataset.chunk_structure != knowledge_configuration.chunk_structure:
|
||||
@ -994,6 +1073,9 @@ class DatasetService:
|
||||
if dataset.keyword_number != knowledge_configuration.keyword_number:
|
||||
dataset.keyword_number = knowledge_configuration.keyword_number
|
||||
dataset.retrieval_model = knowledge_configuration.retrieval_model.model_dump()
|
||||
# Update summary_index_setting if provided
|
||||
if knowledge_configuration.summary_index_setting is not None:
|
||||
dataset.summary_index_setting = knowledge_configuration.summary_index_setting
|
||||
session.add(dataset)
|
||||
session.commit()
|
||||
if action:
|
||||
@ -1314,6 +1396,50 @@ class DocumentService:
|
||||
upload_file = DocumentService._get_upload_file_for_upload_file_document(document)
|
||||
return file_helpers.get_signed_file_url(upload_file_id=upload_file.id, as_attachment=True)
|
||||
|
||||
@staticmethod
|
||||
def enrich_documents_with_summary_index_status(
|
||||
documents: Sequence[Document],
|
||||
dataset: Dataset,
|
||||
tenant_id: str,
|
||||
) -> None:
|
||||
"""
|
||||
Enrich documents with summary_index_status based on dataset summary index settings.
|
||||
|
||||
This method calculates and sets the summary_index_status for each document that needs summary.
|
||||
Documents that don't need summary or when summary index is disabled will have status set to None.
|
||||
|
||||
Args:
|
||||
documents: List of Document instances to enrich
|
||||
dataset: Dataset instance containing summary_index_setting
|
||||
tenant_id: Tenant ID for summary status lookup
|
||||
"""
|
||||
# Check if dataset has summary index enabled
|
||||
has_summary_index = dataset.summary_index_setting and dataset.summary_index_setting.get("enable") is True
|
||||
|
||||
# Filter documents that need summary calculation
|
||||
documents_need_summary = [doc for doc in documents if doc.need_summary is True]
|
||||
document_ids_need_summary = [str(doc.id) for doc in documents_need_summary]
|
||||
|
||||
# Calculate summary_index_status for documents that need summary (only if dataset summary index is enabled)
|
||||
summary_status_map: dict[str, str | None] = {}
|
||||
if has_summary_index and document_ids_need_summary:
|
||||
from services.summary_index_service import SummaryIndexService
|
||||
|
||||
summary_status_map = SummaryIndexService.get_documents_summary_index_status(
|
||||
document_ids=document_ids_need_summary,
|
||||
dataset_id=dataset.id,
|
||||
tenant_id=tenant_id,
|
||||
)
|
||||
|
||||
# Add summary_index_status to each document
|
||||
for document in documents:
|
||||
if has_summary_index and document.need_summary is True:
|
||||
# Get status from map, default to None (not queued yet)
|
||||
document.summary_index_status = summary_status_map.get(str(document.id)) # type: ignore[attr-defined]
|
||||
else:
|
||||
# Return null if summary index is not enabled or document doesn't need summary
|
||||
document.summary_index_status = None # type: ignore[attr-defined]
|
||||
|
||||
@staticmethod
|
||||
def prepare_document_batch_download_zip(
|
||||
*,
|
||||
@ -1964,6 +2090,8 @@ class DocumentService:
|
||||
DuplicateDocumentIndexingTaskProxy(
|
||||
dataset.tenant_id, dataset.id, duplicate_document_ids
|
||||
).delay()
|
||||
# Note: Summary index generation is triggered in document_indexing_task after indexing completes
|
||||
# to ensure segments are available. See tasks/document_indexing_task.py
|
||||
except LockNotOwnedError:
|
||||
pass
|
||||
|
||||
@ -2268,6 +2396,11 @@ class DocumentService:
|
||||
name: str,
|
||||
batch: str,
|
||||
):
|
||||
# Set need_summary based on dataset's summary_index_setting
|
||||
need_summary = False
|
||||
if dataset.summary_index_setting and dataset.summary_index_setting.get("enable") is True:
|
||||
need_summary = True
|
||||
|
||||
document = Document(
|
||||
tenant_id=dataset.tenant_id,
|
||||
dataset_id=dataset.id,
|
||||
@ -2281,6 +2414,7 @@ class DocumentService:
|
||||
created_by=account.id,
|
||||
doc_form=document_form,
|
||||
doc_language=document_language,
|
||||
need_summary=need_summary,
|
||||
)
|
||||
doc_metadata = {}
|
||||
if dataset.built_in_field_enabled:
|
||||
@ -2505,6 +2639,7 @@ class DocumentService:
|
||||
embedding_model_provider=knowledge_config.embedding_model_provider,
|
||||
collection_binding_id=dataset_collection_binding_id,
|
||||
retrieval_model=retrieval_model.model_dump() if retrieval_model else None,
|
||||
summary_index_setting=knowledge_config.summary_index_setting,
|
||||
is_multimodal=knowledge_config.is_multimodal,
|
||||
)
|
||||
|
||||
@ -2686,6 +2821,14 @@ class DocumentService:
|
||||
if not isinstance(args["process_rule"]["rules"]["segmentation"]["max_tokens"], int):
|
||||
raise ValueError("Process rule segmentation max_tokens is invalid")
|
||||
|
||||
# valid summary index setting
|
||||
summary_index_setting = args["process_rule"].get("summary_index_setting")
|
||||
if summary_index_setting and summary_index_setting.get("enable"):
|
||||
if "model_name" not in summary_index_setting or not summary_index_setting["model_name"]:
|
||||
raise ValueError("Summary index model name is required")
|
||||
if "model_provider_name" not in summary_index_setting or not summary_index_setting["model_provider_name"]:
|
||||
raise ValueError("Summary index model provider name is required")
|
||||
|
||||
@staticmethod
|
||||
def batch_update_document_status(
|
||||
dataset: Dataset, document_ids: list[str], action: Literal["enable", "disable", "archive", "un_archive"], user
|
||||
@ -3154,6 +3297,35 @@ class SegmentService:
|
||||
if args.enabled or keyword_changed:
|
||||
# update segment vector index
|
||||
VectorService.update_segment_vector(args.keywords, segment, dataset)
|
||||
# update summary index if summary is provided and has changed
|
||||
if args.summary is not None:
|
||||
# When user manually provides summary, allow saving even if summary_index_setting doesn't exist
|
||||
# summary_index_setting is only needed for LLM generation, not for manual summary vectorization
|
||||
# Vectorization uses dataset.embedding_model, which doesn't require summary_index_setting
|
||||
if dataset.indexing_technique == "high_quality":
|
||||
# Query existing summary from database
|
||||
from models.dataset import DocumentSegmentSummary
|
||||
|
||||
existing_summary = (
|
||||
db.session.query(DocumentSegmentSummary)
|
||||
.where(
|
||||
DocumentSegmentSummary.chunk_id == segment.id,
|
||||
DocumentSegmentSummary.dataset_id == dataset.id,
|
||||
)
|
||||
.first()
|
||||
)
|
||||
|
||||
# Check if summary has changed
|
||||
existing_summary_content = existing_summary.summary_content if existing_summary else None
|
||||
if existing_summary_content != args.summary:
|
||||
# Summary has changed, update it
|
||||
from services.summary_index_service import SummaryIndexService
|
||||
|
||||
try:
|
||||
SummaryIndexService.update_summary_for_segment(segment, dataset, args.summary)
|
||||
except Exception:
|
||||
logger.exception("Failed to update summary for segment %s", segment.id)
|
||||
# Don't fail the entire update if summary update fails
|
||||
else:
|
||||
segment_hash = helper.generate_text_hash(content)
|
||||
tokens = 0
|
||||
@ -3228,6 +3400,73 @@ class SegmentService:
|
||||
elif document.doc_form in (IndexStructureType.PARAGRAPH_INDEX, IndexStructureType.QA_INDEX):
|
||||
# update segment vector index
|
||||
VectorService.update_segment_vector(args.keywords, segment, dataset)
|
||||
# Handle summary index when content changed
|
||||
if dataset.indexing_technique == "high_quality":
|
||||
from models.dataset import DocumentSegmentSummary
|
||||
|
||||
existing_summary = (
|
||||
db.session.query(DocumentSegmentSummary)
|
||||
.where(
|
||||
DocumentSegmentSummary.chunk_id == segment.id,
|
||||
DocumentSegmentSummary.dataset_id == dataset.id,
|
||||
)
|
||||
.first()
|
||||
)
|
||||
|
||||
if args.summary is None:
|
||||
# User didn't provide summary, auto-regenerate if segment previously had summary
|
||||
# Auto-regeneration only happens if summary_index_setting exists and enable is True
|
||||
if (
|
||||
existing_summary
|
||||
and dataset.summary_index_setting
|
||||
and dataset.summary_index_setting.get("enable") is True
|
||||
):
|
||||
# Segment previously had summary, regenerate it with new content
|
||||
from services.summary_index_service import SummaryIndexService
|
||||
|
||||
try:
|
||||
SummaryIndexService.generate_and_vectorize_summary(
|
||||
segment, dataset, dataset.summary_index_setting
|
||||
)
|
||||
logger.info("Auto-regenerated summary for segment %s after content change", segment.id)
|
||||
except Exception:
|
||||
logger.exception("Failed to auto-regenerate summary for segment %s", segment.id)
|
||||
# Don't fail the entire update if summary regeneration fails
|
||||
else:
|
||||
# User provided summary, check if it has changed
|
||||
# Manual summary updates are allowed even if summary_index_setting doesn't exist
|
||||
existing_summary_content = existing_summary.summary_content if existing_summary else None
|
||||
if existing_summary_content != args.summary:
|
||||
# Summary has changed, use user-provided summary
|
||||
from services.summary_index_service import SummaryIndexService
|
||||
|
||||
try:
|
||||
SummaryIndexService.update_summary_for_segment(segment, dataset, args.summary)
|
||||
logger.info("Updated summary for segment %s with user-provided content", segment.id)
|
||||
except Exception:
|
||||
logger.exception("Failed to update summary for segment %s", segment.id)
|
||||
# Don't fail the entire update if summary update fails
|
||||
else:
|
||||
# Summary hasn't changed, regenerate based on new content
|
||||
# Auto-regeneration only happens if summary_index_setting exists and enable is True
|
||||
if (
|
||||
existing_summary
|
||||
and dataset.summary_index_setting
|
||||
and dataset.summary_index_setting.get("enable") is True
|
||||
):
|
||||
from services.summary_index_service import SummaryIndexService
|
||||
|
||||
try:
|
||||
SummaryIndexService.generate_and_vectorize_summary(
|
||||
segment, dataset, dataset.summary_index_setting
|
||||
)
|
||||
logger.info(
|
||||
"Regenerated summary for segment %s after content change (summary unchanged)",
|
||||
segment.id,
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("Failed to regenerate summary for segment %s", segment.id)
|
||||
# Don't fail the entire update if summary regeneration fails
|
||||
# update multimodel vector index
|
||||
VectorService.update_multimodel_vector(segment, args.attachment_ids or [], dataset)
|
||||
except Exception as e:
|
||||
@ -3616,6 +3855,39 @@ class SegmentService:
|
||||
)
|
||||
return result if isinstance(result, DocumentSegment) else None
|
||||
|
||||
@classmethod
|
||||
def get_segments_by_document_and_dataset(
|
||||
cls,
|
||||
document_id: str,
|
||||
dataset_id: str,
|
||||
status: str | None = None,
|
||||
enabled: bool | None = None,
|
||||
) -> Sequence[DocumentSegment]:
|
||||
"""
|
||||
Get segments for a document in a dataset with optional filtering.
|
||||
|
||||
Args:
|
||||
document_id: Document ID
|
||||
dataset_id: Dataset ID
|
||||
status: Optional status filter (e.g., "completed")
|
||||
enabled: Optional enabled filter (True/False)
|
||||
|
||||
Returns:
|
||||
Sequence of DocumentSegment instances
|
||||
"""
|
||||
query = select(DocumentSegment).where(
|
||||
DocumentSegment.document_id == document_id,
|
||||
DocumentSegment.dataset_id == dataset_id,
|
||||
)
|
||||
|
||||
if status is not None:
|
||||
query = query.where(DocumentSegment.status == status)
|
||||
|
||||
if enabled is not None:
|
||||
query = query.where(DocumentSegment.enabled == enabled)
|
||||
|
||||
return db.session.scalars(query).all()
|
||||
|
||||
|
||||
class DatasetCollectionBindingService:
|
||||
@classmethod
|
||||
|
||||
@ -119,6 +119,7 @@ class KnowledgeConfig(BaseModel):
|
||||
data_source: DataSource | None = None
|
||||
process_rule: ProcessRule | None = None
|
||||
retrieval_model: RetrievalModel | None = None
|
||||
summary_index_setting: dict | None = None
|
||||
doc_form: str = "text_model"
|
||||
doc_language: str = "English"
|
||||
embedding_model: str | None = None
|
||||
@ -141,6 +142,7 @@ class SegmentUpdateArgs(BaseModel):
|
||||
regenerate_child_chunks: bool = False
|
||||
enabled: bool | None = None
|
||||
attachment_ids: list[str] | None = None
|
||||
summary: str | None = None # Summary content for summary index
|
||||
|
||||
|
||||
class ChildChunkUpdateArgs(BaseModel):
|
||||
|
||||
@ -116,6 +116,8 @@ class KnowledgeConfiguration(BaseModel):
|
||||
embedding_model: str = ""
|
||||
keyword_number: int | None = 10
|
||||
retrieval_model: RetrievalSetting
|
||||
# add summary index setting
|
||||
summary_index_setting: dict | None = None
|
||||
|
||||
@field_validator("embedding_model_provider", mode="before")
|
||||
@classmethod
|
||||
|
||||
@ -343,6 +343,9 @@ class RagPipelineDslService:
|
||||
dataset.embedding_model_provider = knowledge_configuration.embedding_model_provider
|
||||
elif knowledge_configuration.indexing_technique == "economy":
|
||||
dataset.keyword_number = knowledge_configuration.keyword_number
|
||||
# Update summary_index_setting if provided
|
||||
if knowledge_configuration.summary_index_setting is not None:
|
||||
dataset.summary_index_setting = knowledge_configuration.summary_index_setting
|
||||
dataset.pipeline_id = pipeline.id
|
||||
self._session.add(dataset)
|
||||
self._session.commit()
|
||||
@ -477,6 +480,9 @@ class RagPipelineDslService:
|
||||
dataset.embedding_model_provider = knowledge_configuration.embedding_model_provider
|
||||
elif knowledge_configuration.indexing_technique == "economy":
|
||||
dataset.keyword_number = knowledge_configuration.keyword_number
|
||||
# Update summary_index_setting if provided
|
||||
if knowledge_configuration.summary_index_setting is not None:
|
||||
dataset.summary_index_setting = knowledge_configuration.summary_index_setting
|
||||
dataset.pipeline_id = pipeline.id
|
||||
self._session.add(dataset)
|
||||
self._session.commit()
|
||||
|
||||
1432
api/services/summary_index_service.py
Normal file
1432
api/services/summary_index_service.py
Normal file
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user