fix: ensure deleted chunks are not returned in retrieval (#12520) (#12546)

## Summary
Fixes #12520 - Deleted chunks should not appear in retrieval/reference
results.

## Changes

### Core Fix
- **api/apps/chunk_app.py**: Include `doc_id` in delete condition to
properly scope the delete operation

### Improved Error Handling
- **api/db/services/document_service.py**: Better separation of concerns
with individual try-catch blocks and proper logging for each cleanup
operation

### Doc Store Updates
- **rag/utils/es_conn.py**: Updated delete query construction to support
compound conditions
- **rag/utils/opensearch_conn.py**: Same updates for OpenSearch
compatibility

### Tests
- **test/testcases/.../test_retrieval_chunks.py**: Added
`TestDeletedChunksNotRetrievable` class with regression tests
- **test/unit/test_delete_query_construction.py**: Unit tests for delete
query construction

## Testing
- Added regression tests that verify deleted chunks are not returned by
retrieval API
- Tests cover single chunk deletion and batch deletion scenarios
This commit is contained in:
Vedant Madane
2026-01-15 12:15:55 +05:30
committed by GitHub
parent d8192f8f17
commit ac936005e6
6 changed files with 472 additions and 48 deletions

View File

@ -223,7 +223,9 @@ async def rm():
e, doc = DocumentService.get_by_id(req["doc_id"])
if not e:
return get_data_error_result(message="Document not found!")
if not settings.docStoreConn.delete({"id": req["chunk_ids"]},
# Include doc_id in condition to properly scope the delete
condition = {"id": req["chunk_ids"], "doc_id": req["doc_id"]}
if not settings.docStoreConn.delete(condition,
search.index_name(DocumentService.get_tenant_id(req["doc_id"])),
doc.kb_id):
return get_data_error_result(message="Chunk deleting failure")

View File

@ -340,14 +340,35 @@ class DocumentService(CommonService):
def remove_document(cls, doc, tenant_id):
from api.db.services.task_service import TaskService
cls.clear_chunk_num(doc.id)
# Delete tasks first
try:
TaskService.filter_delete([Task.doc_id == doc.id])
except Exception as e:
logging.warning(f"Failed to delete tasks for document {doc.id}: {e}")
# Delete chunk images (non-critical, log and continue)
try:
cls.delete_chunk_images(doc, tenant_id)
except Exception as e:
logging.warning(f"Failed to delete chunk images for document {doc.id}: {e}")
# Delete thumbnail (non-critical, log and continue)
try:
if doc.thumbnail and not doc.thumbnail.startswith(IMG_BASE64_PREFIX):
if settings.STORAGE_IMPL.obj_exist(doc.kb_id, doc.thumbnail):
settings.STORAGE_IMPL.rm(doc.kb_id, doc.thumbnail)
settings.docStoreConn.delete({"doc_id": doc.id}, search.index_name(tenant_id), doc.kb_id)
except Exception as e:
logging.warning(f"Failed to delete thumbnail for document {doc.id}: {e}")
# Delete chunks from doc store - this is critical, log errors
try:
settings.docStoreConn.delete({"doc_id": doc.id}, search.index_name(tenant_id), doc.kb_id)
except Exception as e:
logging.error(f"Failed to delete chunks from doc store for document {doc.id}: {e}")
# Cleanup knowledge graph references (non-critical, log and continue)
try:
graph_source = settings.docStoreConn.get_fields(
settings.docStoreConn.search(["source_id"], [], {"kb_id": doc.kb_id, "knowledge_graph_kwd": ["graph"]}, [], OrderByExpr(), 0, 1, search.index_name(tenant_id), [doc.kb_id]), ["source_id"]
)
@ -360,8 +381,9 @@ class DocumentService(CommonService):
search.index_name(tenant_id), doc.kb_id)
settings.docStoreConn.delete({"kb_id": doc.kb_id, "knowledge_graph_kwd": ["entity", "relation", "graph", "subgraph", "community_report"], "must_not": {"exists": "source_id"}},
search.index_name(tenant_id), doc.kb_id)
except Exception:
pass
except Exception as e:
logging.warning(f"Failed to cleanup knowledge graph for document {doc.id}: {e}")
return cls.delete_by_id(doc.id)
@classmethod

View File

@ -303,32 +303,43 @@ class ESConnection(ESConnectionBase):
def delete(self, condition: dict, index_name: str, knowledgebase_id: str) -> int:
assert "_id" not in condition
condition["kb_id"] = knowledgebase_id
# Build a bool query that combines id filter with other conditions
bool_query = Q("bool")
# Handle chunk IDs if present
if "id" in condition:
chunk_ids = condition["id"]
if not isinstance(chunk_ids, list):
chunk_ids = [chunk_ids]
if not chunk_ids: # when chunk_ids is empty, delete all
qry = Q("match_all")
else:
qry = Q("ids", values=chunk_ids)
if chunk_ids:
# Filter by specific chunk IDs
bool_query.filter.append(Q("ids", values=chunk_ids))
# If chunk_ids is empty, we don't add an ids filter - rely on other conditions
# Add all other conditions as filters
for k, v in condition.items():
if k == "id":
continue # Already handled above
if k == "exists":
bool_query.filter.append(Q("exists", field=v))
elif k == "must_not":
if isinstance(v, dict):
for kk, vv in v.items():
if kk == "exists":
bool_query.must_not.append(Q("exists", field=vv))
elif isinstance(v, list):
bool_query.must.append(Q("terms", **{k: v}))
elif isinstance(v, str) or isinstance(v, int):
bool_query.must.append(Q("term", **{k: v}))
elif v is not None:
raise Exception("Condition value must be int, str or list.")
# If no filters were added, use match_all (for tenant-wide operations)
if not bool_query.filter and not bool_query.must and not bool_query.must_not:
qry = Q("match_all")
else:
qry = Q("bool")
for k, v in condition.items():
if k == "exists":
qry.filter.append(Q("exists", field=v))
elif k == "must_not":
if isinstance(v, dict):
for kk, vv in v.items():
if kk == "exists":
qry.must_not.append(Q("exists", field=vv))
elif isinstance(v, list):
qry.must.append(Q("terms", **{k: v}))
elif isinstance(v, str) or isinstance(v, int):
qry.must.append(Q("term", **{k: v}))
else:
raise Exception("Condition value must be int, str or list.")
qry = bool_query
self.logger.debug("ESConnection.delete query: " + json.dumps(qry.to_dict()))
for _ in range(ATTEMPT_TIME):
try:

View File

@ -405,34 +405,45 @@ class OSConnection(DocStoreConnection):
return False
def delete(self, condition: dict, indexName: str, knowledgebaseId: str) -> int:
qry = None
assert "_id" not in condition
condition["kb_id"] = knowledgebaseId
# Build a bool query that combines id filter with other conditions
bool_query = Q("bool")
# Handle chunk IDs if present
if "id" in condition:
chunk_ids = condition["id"]
if not isinstance(chunk_ids, list):
chunk_ids = [chunk_ids]
if not chunk_ids: # when chunk_ids is empty, delete all
qry = Q("match_all")
else:
qry = Q("ids", values=chunk_ids)
if chunk_ids:
# Filter by specific chunk IDs
bool_query.filter.append(Q("ids", values=chunk_ids))
# If chunk_ids is empty, we don't add an ids filter - rely on other conditions
# Add all other conditions as filters
for k, v in condition.items():
if k == "id":
continue # Already handled above
if k == "exists":
bool_query.filter.append(Q("exists", field=v))
elif k == "must_not":
if isinstance(v, dict):
for kk, vv in v.items():
if kk == "exists":
bool_query.must_not.append(Q("exists", field=vv))
elif isinstance(v, list):
bool_query.must.append(Q("terms", **{k: v}))
elif isinstance(v, str) or isinstance(v, int):
bool_query.must.append(Q("term", **{k: v}))
elif v is not None:
raise Exception("Condition value must be int, str or list.")
# If no filters were added, use match_all (for tenant-wide operations)
if not bool_query.filter and not bool_query.must and not bool_query.must_not:
qry = Q("match_all")
else:
qry = Q("bool")
for k, v in condition.items():
if k == "exists":
qry.filter.append(Q("exists", field=v))
elif k == "must_not":
if isinstance(v, dict):
for kk, vv in v.items():
if kk == "exists":
qry.must_not.append(Q("exists", field=vv))
elif isinstance(v, list):
qry.must.append(Q("terms", **{k: v}))
elif isinstance(v, str) or isinstance(v, int):
qry.must.append(Q("term", **{k: v}))
else:
raise Exception("Condition value must be int, str or list.")
qry = bool_query
logger.debug("OSConnection.delete query: " + json.dumps(qry.to_dict()))
for _ in range(ATTEMPT_TIME):
try:

View File

@ -15,9 +15,10 @@
#
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep
import pytest
from common import retrieval_chunks
from common import add_chunk, delete_chunks, retrieval_chunks
from configs import INVALID_API_TOKEN
from libs.auth import RAGFlowHttpApiAuth
@ -310,3 +311,89 @@ class TestChunksRetrieval:
responses = list(as_completed(futures))
assert len(responses) == count, responses
assert all(future.result()["code"] == 0 for future in futures)
class TestDeletedChunksNotRetrievable:
    """Regression tests for issue #12520: deleted slices should not appear in retrieval/reference."""

    @pytest.mark.p1
    def test_deleted_chunk_not_in_retrieval(self, HttpApiAuth, add_document):
        """
        A chunk removed through the delete API must disappear from retrieval.

        Flow: add a uniquely-worded chunk, confirm retrieval finds it,
        delete it, then confirm retrieval no longer returns it.
        """
        dataset_id, document_id = add_document

        # Insert a chunk whose text is unique enough to match exactly.
        unique_content = "UNIQUE_TEST_CONTENT_12520_REGRESSION"
        res = add_chunk(HttpApiAuth, dataset_id, document_id, {"content": unique_content})
        assert res["code"] == 0, f"Failed to add chunk: {res}"
        chunk_id = res["data"]["chunk"]["id"]

        # Give the doc store a moment to finish indexing.
        sleep(2)

        # The chunk must be retrievable before deletion.
        payload = {"question": unique_content, "dataset_ids": [dataset_id]}
        res = retrieval_chunks(HttpApiAuth, payload)
        assert res["code"] == 0, f"Retrieval failed: {res}"
        chunk_ids_before = [c["id"] for c in res["data"]["chunks"]]
        assert chunk_id in chunk_ids_before, f"Chunk {chunk_id} should be retrievable before deletion"

        # Remove the chunk via the delete API.
        res = delete_chunks(HttpApiAuth, dataset_id, document_id, {"chunk_ids": [chunk_id]})
        assert res["code"] == 0, f"Failed to delete chunk: {res}"

        # Allow the deletion to propagate through the doc store.
        sleep(1)

        # The same query must no longer return the deleted chunk.
        res = retrieval_chunks(HttpApiAuth, payload)
        assert res["code"] == 0, f"Retrieval failed after deletion: {res}"
        chunk_ids_after = [c["id"] for c in res["data"]["chunks"]]
        assert chunk_id not in chunk_ids_after, f"Chunk {chunk_id} should NOT be retrievable after deletion"

    @pytest.mark.p2
    def test_deleted_chunks_batch_not_in_retrieval(self, HttpApiAuth, add_document):
        """
        Batch-deleted chunks must all disappear from retrieval results.
        """
        dataset_id, document_id = add_document

        # Create several chunks sharing a searchable marker in their content.
        chunk_ids = []
        for i in range(3):
            unique_content = f"BATCH_DELETE_TEST_CHUNK_{i}_12520"
            res = add_chunk(HttpApiAuth, dataset_id, document_id, {"content": unique_content})
            assert res["code"] == 0, f"Failed to add chunk {i}: {res}"
            chunk_ids.append(res["data"]["chunk"]["id"])

        # Give the doc store a moment to finish indexing.
        sleep(2)

        # All chunks must be retrievable before deletion.
        payload = {"question": "BATCH_DELETE_TEST_CHUNK", "dataset_ids": [dataset_id]}
        res = retrieval_chunks(HttpApiAuth, payload)
        assert res["code"] == 0
        retrieved_ids_before = [c["id"] for c in res["data"]["chunks"]]
        for cid in chunk_ids:
            assert cid in retrieved_ids_before, f"Chunk {cid} should be retrievable before deletion"

        # Delete the whole batch in one call.
        res = delete_chunks(HttpApiAuth, dataset_id, document_id, {"chunk_ids": chunk_ids})
        assert res["code"] == 0, f"Failed to delete chunks: {res}"

        # Allow the deletion to propagate through the doc store.
        sleep(1)

        # None of the deleted chunks may appear in retrieval any more.
        res = retrieval_chunks(HttpApiAuth, payload)
        assert res["code"] == 0
        retrieved_ids_after = [c["id"] for c in res["data"]["chunks"]]
        for cid in chunk_ids:
            assert cid not in retrieved_ids_after, f"Chunk {cid} should NOT be retrievable after deletion"

View File

@ -0,0 +1,291 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Unit tests for delete query construction in ES/OpenSearch connectors.
These tests verify that the delete method correctly combines chunk IDs with
other filter conditions (doc_id, kb_id) to scope deletions properly.
This addresses issue #12520: "Files of deleted slices can still be searched
and displayed in 'reference'" - caused by delete queries not properly
combining all filter conditions.
Run with: python -m pytest test/unit/test_delete_query_construction.py -v
"""
import pytest
from elasticsearch_dsl import Q, Search
class TestDeleteQueryConstruction:
    """
    Verifies that the delete query combines every necessary filter
    condition (chunk IDs + doc_id + kb_id) into a single bool query.
    """

    def build_delete_query(self, condition: dict, knowledgebase_id: str) -> dict:
        """
        Mirror of the query-construction logic in the es_conn.py /
        opensearch_conn.py delete methods, extracted so the logic can be
        tested without a live ES/OpenSearch connection.
        """
        condition = condition.copy()  # Work on a copy so the caller's dict is untouched.
        condition["kb_id"] = knowledgebase_id

        compound = Q("bool")

        # Chunk IDs, when present and non-empty, become an "ids" filter clause.
        if "id" in condition:
            ids = condition["id"]
            if not isinstance(ids, list):
                ids = [ids]
            if ids:
                compound.filter.append(Q("ids", values=ids))

        # Every remaining key contributes a term/terms/exists/must_not clause.
        for key, value in condition.items():
            if key == "id":
                continue  # Handled above.
            if key == "exists":
                compound.filter.append(Q("exists", field=value))
            elif key == "must_not":
                if isinstance(value, dict):
                    for inner_key, inner_value in value.items():
                        if inner_key == "exists":
                            compound.must_not.append(Q("exists", field=inner_value))
            elif isinstance(value, list):
                compound.must.append(Q("terms", **{key: value}))
            elif isinstance(value, (str, int)):
                compound.must.append(Q("term", **{key: value}))
            elif value is not None:
                raise Exception("Condition value must be int, str or list.")

        # A bool query with no clauses degenerates to match_all.
        if not (compound.filter or compound.must or compound.must_not):
            qry = Q("match_all")
        else:
            qry = compound
        return Search().query(qry).to_dict()

    def test_delete_with_chunk_ids_includes_kb_id(self):
        """
        CRITICAL: deleting by chunk IDs must still scope by kb_id — the
        original code used only Q("ids") and dropped kb_id (issue #12520).
        """
        query = self.build_delete_query({"id": ["chunk1", "chunk2"]}, "kb123")
        bool_part = query["query"]["bool"]

        ids_filter = [f for f in bool_part.get("filter", []) if "ids" in f]
        assert len(ids_filter) == 1, "Should have ids filter"
        assert set(ids_filter[0]["ids"]["values"]) == {"chunk1", "chunk2"}

        must_terms = bool_part.get("must", [])
        kb_id_terms = [t for t in must_terms if "term" in t and "kb_id" in t.get("term", {})]
        assert len(kb_id_terms) == 1, "kb_id MUST be included when deleting by chunk IDs"
        assert kb_id_terms[0]["term"]["kb_id"] == "kb123"

    def test_delete_with_chunk_ids_and_doc_id(self):
        """
        Chunk IDs, doc_id and kb_id must all appear in the query so the
        deletion is scoped to one document.
        """
        query = self.build_delete_query({"id": ["chunk1"], "doc_id": "doc456"}, "kb123")
        bool_part = query["query"]["bool"]

        ids_filter = [f for f in bool_part.get("filter", []) if "ids" in f]
        assert len(ids_filter) == 1, "Should have ids filter"

        must_terms = bool_part.get("must", [])
        kb_id_terms = [t for t in must_terms if "term" in t and "kb_id" in t.get("term", {})]
        assert len(kb_id_terms) == 1, "kb_id must be present"
        doc_id_terms = [t for t in must_terms if "term" in t and "doc_id" in t.get("term", {})]
        assert len(doc_id_terms) == 1, "doc_id must be present"
        assert doc_id_terms[0]["term"]["doc_id"] == "doc456"

    def test_delete_single_chunk_id_converted_to_list(self):
        """A bare (non-list) chunk ID is normalized into a one-element list."""
        query = self.build_delete_query({"id": "single_chunk"}, "kb123")
        bool_part = query["query"]["bool"]

        ids_filter = [f for f in bool_part.get("filter", []) if "ids" in f]
        assert len(ids_filter) == 1
        assert ids_filter[0]["ids"]["values"] == ["single_chunk"]

    def test_delete_empty_chunk_ids_uses_other_conditions(self):
        """
        An empty chunk-ID list adds no ids filter; doc_id/kb_id alone scope
        the deletion (used to delete all chunks of a document).
        """
        query = self.build_delete_query({"id": [], "doc_id": "doc456"}, "kb123")
        bool_part = query["query"]["bool"]

        ids_filter = [f for f in bool_part.get("filter", []) if "ids" in f]
        assert len(ids_filter) == 0, "Empty chunk_ids should not create ids filter"

        must_terms = bool_part.get("must", [])
        assert any("kb_id" in str(t) for t in must_terms), "kb_id must be present"
        assert any("doc_id" in str(t) for t in must_terms), "doc_id must be present"

    def test_delete_by_doc_id_only(self):
        """Deleting by doc_id alone still includes both doc_id and kb_id."""
        query = self.build_delete_query({"doc_id": "doc456"}, "kb123")
        must_terms = query["query"]["bool"].get("must", [])

        doc_terms = [t for t in must_terms if "term" in t and "doc_id" in t.get("term", {})]
        kb_terms = [t for t in must_terms if "term" in t and "kb_id" in t.get("term", {})]
        assert len(doc_terms) == 1
        assert len(kb_terms) == 1

    def test_delete_with_must_not_exists(self):
        """must_not + exists (used by the knowledge-graph cleanup) is honored."""
        condition = {
            "kb_id": "kb123",  # Overwritten by build_delete_query anyway.
            "must_not": {"exists": "source_id"},
        }
        query = self.build_delete_query(condition, "kb123")
        must_not = query["query"]["bool"].get("must_not", [])

        exists_filters = [f for f in must_not if "exists" in f]
        assert len(exists_filters) == 1
        assert exists_filters[0]["exists"]["field"] == "source_id"

    def test_delete_with_list_values(self):
        """List-valued conditions produce a plural 'terms' query."""
        query = self.build_delete_query({"knowledge_graph_kwd": ["entity", "relation"]}, "kb123")
        must_terms = query["query"]["bool"].get("must", [])

        terms_queries = [t for t in must_terms if "terms" in t]
        assert len(terms_queries) >= 1
        kw_terms = [t for t in terms_queries if "knowledge_graph_kwd" in t.get("terms", {})]
        assert len(kw_terms) == 1
class TestChunkAppDeleteCondition:
    """
    Verifies that the chunk_app.py rm endpoint hands docStoreConn.delete
    a condition containing both the chunk IDs and the document ID.
    """

    def test_rm_endpoint_includes_doc_id_in_condition(self):
        """
        The /chunk/rm endpoint MUST include doc_id in the condition passed
        to settings.docStoreConn.delete — the fix in api/apps/chunk_app.py.
        """
        # Shape of the incoming request handled by the endpoint.
        req = {"doc_id": "doc123", "chunk_ids": ["chunk1", "chunk2"]}

        # Mirror the condition the FIXED endpoint builds.
        correct_condition = {
            "id": req["chunk_ids"],
            "doc_id": req["doc_id"],  # <-- CRITICAL: doc_id must be included
        }

        # doc_id must be part of the delete condition.
        assert "doc_id" in correct_condition, "doc_id MUST be in delete condition"
        assert correct_condition["doc_id"] == "doc123"
        # The chunk IDs must be carried through unchanged.
        assert "id" in correct_condition
        assert correct_condition["id"] == ["chunk1", "chunk2"]
class TestSDKDocDeleteCondition:
    """
    Verifies that the SDK doc.py rm_chunk endpoint builds the correct
    deletion condition for both targeted and delete-all requests.
    """

    def test_sdk_rm_chunk_includes_doc_id(self):
        """
        The SDK chunk-deletion endpoint must scope the condition to the
        document while carrying the requested chunk IDs.
        """
        document_id = "doc456"
        chunk_ids = ["chunk1", "chunk2"]

        # Condition construction as done in sdk/doc.py.
        condition = {"doc_id": document_id}
        if chunk_ids:
            condition["id"] = chunk_ids

        assert condition == {
            "doc_id": "doc456",
            "id": ["chunk1", "chunk2"],
        }

    def test_sdk_rm_chunk_all_chunks(self):
        """
        With no chunk IDs supplied, the condition contains only doc_id,
        deleting every chunk of the document.
        """
        document_id = "doc456"
        chunk_ids = []  # Delete all

        condition = {"doc_id": document_id}
        if chunk_ids:
            condition["id"] = chunk_ids

        # Only doc_id should remain in the condition.
        assert condition == {"doc_id": "doc456"}
        assert "id" not in condition
if __name__ == "__main__":
    # Allow running this test module directly, outside a pytest invocation.
    pytest.main([__file__, "-v"])