fix: re-chunk documents when data source content is updated (#12918)
Closes: #12889

### What problem does this PR solve?

When syncing external data sources (e.g., Jira, Confluence, Google Drive), updated documents were not being re-chunked. The raw content was correctly updated in blob storage, but the vector database retained stale chunks, causing search results to return outdated information.

**Root cause:** The task digest used for the chunk-reuse optimization was calculated only from parser configuration fields (`parser_id`, `parser_config`, `kb_id`, etc.), with no content-dependent fields. When a document's content changed but its parser configuration stayed the same, the system incorrectly reused the old chunks instead of regenerating them.

**Example scenario:**

1. User syncs a Jira issue: "Meeting scheduled for Monday"
2. User updates the Jira issue to: "Meeting rescheduled to Friday"
3. User triggers sync again
4. Raw content panel shows the updated text ✓
5. Chunk panel still shows the old text "Monday" ✗

**Solution:**

1. Include `size` and `content_hash` in the chunking config, so the task digest changes whenever document content is updated
2. Track updated documents separately in `upload_document()` and return them for processing
3. Process updated documents through the re-parsing pipeline to regenerate their chunks

[1.webm](https://github.com/user-attachments/assets/d21d4dcd-e189-4d39-8700-053bae0ca5a0)

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
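To make the root cause concrete, here is a minimal, self-contained sketch (hypothetical names, not RAGFlow's actual digest code): a digest built only from parser settings is blind to content changes, so the chunk-reuse check concludes there is nothing to redo.

```python
import hashlib
import json

def parser_only_digest(doc: dict) -> str:
    """Hypothetical stand-in for the pre-fix task digest: it hashes only
    parser settings, so two versions of a document with different content
    but identical settings produce the same digest."""
    cfg = {k: doc[k] for k in ("kb_id", "parser_id", "parser_config")}
    return hashlib.md5(json.dumps(cfg, sort_keys=True).encode()).hexdigest()

before = {"kb_id": "kb1", "parser_id": "naive", "parser_config": {},
          "content": "Meeting scheduled for Monday"}
after = dict(before, content="Meeting rescheduled to Friday")

# Same digest despite changed content -> stale chunks are (wrongly) reused.
assert parser_only_digest(before) == parser_only_digest(after)
```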
```diff
@@ -908,6 +908,8 @@ class Document(DataBaseModel):
     process_duration = FloatField(default=0)
     suffix = CharField(max_length=32, null=False, help_text="The real file extension suffix", index=True)
 
+    content_hash = CharField(max_length=32, null=True, help_text="xxhash128 of document content for change detection", default="", index=True)
+
     run = CharField(max_length=1, null=True, help_text="start to run processing or cancel.(1: run it; 2: cancel)", default="0", index=True)
     status = CharField(max_length=1, null=True, help_text="is it validate(0: wasted, 1: validate)", default="1", index=True)
 
```
```diff
@@ -1523,6 +1525,7 @@ def migrate_db():
     alter_db_add_column(migrator, "api_4_conversation", "exp_user_id", CharField(max_length=255, null=True, help_text="exp_user_id", index=True))
     # Migrate system_settings.value from CharField to TextField for longer sandbox configs
     alter_db_column_type(migrator, "system_settings", "value", TextField(null=False, help_text="Configuration value (JSON, string, etc.)"))
+    alter_db_add_column(migrator, "document", "content_hash", CharField(max_length=32, null=True, help_text="xxhash128 of document content for change detection", default="", index=True))
     update_tenant_llm_to_id_primary_key()
     alter_db_add_column(migrator, "tenant", "tenant_llm_id", IntegerField(null=True, help_text="id in tenant_llm", index=True))
     alter_db_add_column(migrator, "tenant", "tenant_embd_id", IntegerField(null=True, help_text="id in tenant_llm", index=True))
```
```diff
@@ -683,6 +683,8 @@ class DocumentService(CommonService):
             cls.model.kb_id,
             cls.model.parser_id,
             cls.model.parser_config,
+            cls.model.size,
+            cls.model.content_hash,
             Knowledgebase.language,
             Knowledgebase.embd_id,
             Tenant.id.alias("tenant_id"),
```
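With `size` and `content_hash` now among the fields that feed the chunking config, any content update perturbs the task digest. A hedged sketch of the effect (again with a hypothetical digest function, not the exact one used in RAGFlow):

```python
import hashlib
import json
import xxhash

def content_aware_digest(doc: dict) -> str:
    """Hypothetical post-fix digest: content-dependent fields are mixed in,
    so updated content yields a new digest and forces re-chunking."""
    cfg = {k: doc[k] for k in ("kb_id", "parser_id", "parser_config",
                               "size", "content_hash")}
    return hashlib.md5(json.dumps(cfg, sort_keys=True).encode()).hexdigest()

blob_v1 = b"Meeting scheduled for Monday"
blob_v2 = b"Meeting rescheduled to Friday"
base = {"kb_id": "kb1", "parser_id": "naive", "parser_config": {}}
v1 = dict(base, size=len(blob_v1), content_hash=xxhash.xxh128(blob_v1).hexdigest())
v2 = dict(base, size=len(blob_v2), content_hash=xxhash.xxh128(blob_v2).hexdigest())

# Different content -> different digest -> chunks are regenerated.
assert content_aware_digest(v1) != content_aware_digest(v2)
```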
```diff
@@ -23,6 +23,7 @@ from concurrent.futures import ThreadPoolExecutor
 from pathlib import Path
 from typing import Union
 
+import xxhash
 from peewee import fn
 
 from api.db import KNOWLEDGEBASE_FOLDER_NAME, FileType
```
```diff
@@ -442,11 +443,20 @@ class FileService(CommonService):
             doc_id = file.id if hasattr(file, "id") else get_uuid()
             e, doc = DocumentService.get_by_id(doc_id)
             if e:
-                blob = file.read()
-                settings.STORAGE_IMPL.put(kb.id, doc.location, blob, kb.tenant_id)
-                doc.size = len(blob)
-                doc = doc.to_dict()
-                DocumentService.update_by_id(doc["id"], doc)
+                try:
+                    blob = file.read()
+                    new_hash = xxhash.xxh128(blob).hexdigest()
+                    old_hash = doc.content_hash or ""
+                    settings.STORAGE_IMPL.put(kb.id, doc.location, blob, kb.tenant_id)
+                    doc.size = len(blob)
+                    doc.content_hash = new_hash
+                    doc = doc.to_dict()
+                    DocumentService.update_by_id(doc["id"], doc)
+                    if new_hash != old_hash:
+                        files.append((doc, blob))
+                except Exception as exc:
+                    logging.exception(f"Failed to update document {doc_id}: {exc}")
+                    err.append(file.filename + ": " + str(exc))
                 continue
             try:
                 DocumentService.check_doc_health(kb.tenant_id, file.filename)
```
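The `new_hash != old_hash` guard means blob storage is always refreshed, but only genuinely changed documents are appended to `files` for re-parsing. Note that `old_hash` falls back to `""` for rows created before the migration, so legacy documents compare as changed and get re-chunked once on their next sync. A small sketch of the guard's semantics (`should_requeue` is a helper name invented for illustration):

```python
import xxhash

def should_requeue(blob: bytes, stored_hash: str | None) -> bool:
    # Mirrors the guard above: an empty or missing stored hash (legacy
    # rows predating the content_hash column) always compares unequal,
    # so those documents are re-chunked exactly once on the next sync.
    return xxhash.xxh128(blob).hexdigest() != (stored_hash or "")

old_blob = b"Meeting scheduled for Monday"
new_blob = b"Meeting rescheduled to Friday"

assert should_requeue(new_blob, xxhash.xxh128(old_blob).hexdigest())      # changed
assert not should_requeue(new_blob, xxhash.xxh128(new_blob).hexdigest())  # unchanged
assert should_requeue(new_blob, None)  # legacy row with no stored hash yet
```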
```diff
@@ -485,6 +495,7 @@ class FileService(CommonService):
                 "location": location,
                 "size": len(blob),
                 "thumbnail": thumbnail_location,
+                "content_hash": xxhash.xxh128(blob).hexdigest(),
             }
             DocumentService.insert(doc)
 
```