ragflow/test/unit_test/rag/graphrag/test_merge_graph_nodes.py
Preston Percival e8f19aa338 feat(graphrag): fix merge concurrency and add resume-from-checkpoint (#14238)
This PR addresses three related GraphRAG reliability issues that
together allow long-running GraphRAG tasks (10+ hours of LLM extraction)
to be resumed after a crash or pause without re-doing completed work. It
builds on #14096 (per-doc subgraph cache) and extends the same idea to
the resolution and community-detection phases.

Fixes #14236.

## 1. Fix concurrent merge crash

Long GraphRAG runs would crash near the end of entity resolution with:
```
RuntimeError: dictionary keys changed during iteration
```
in `Extractor._merge_graph_nodes`. Two changes:

- `rag/graphrag/general/extractor.py`: snapshot `graph.neighbors(node1)`
via `list(...)` before iterating, so concurrent `add_edge` /
`remove_node` mutations on the shared `nx.Graph` cannot invalidate the
iterator. Also tracks each redirected neighbour in `node0_neighbors` so
a later merged node sharing the same external neighbour takes the
edge-merge branch instead of overwriting via `add_edge`.
- `rag/graphrag/entity_resolution.py`: serialize the merge step with a
dedicated `asyncio.Semaphore(1)`. `nx.Graph` is not safe for concurrent
mutation, and merges that interleave on overlapping neighbourhoods can
produce incorrect results even with the snapshot fix.
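
The failure mode and the snapshot fix can be reproduced without NetworkX. The sketch below is an illustration only: a plain dict is a hypothetical stand-in for one node's adjacency mapping, `asyncio.sleep(0)` stands in for whatever the merge awaits between neighbours, and the names `walk`, `mutate`, and `run` are not from the PR.

```python
import asyncio


async def walk(adj, node, snapshot):
    # With snapshot=True the keys are copied up front; otherwise the live
    # dict is iterated, as the pre-fix code effectively did.
    nbrs = list(adj[node]) if snapshot else adj[node]
    seen = []
    for nbr in nbrs:
        seen.append(nbr)
        await asyncio.sleep(0)  # yield to other tasks mid-iteration
    return seen


async def mutate(adj, node):
    # A concurrent merge drops one neighbour and adds another.
    del adj[node]["C"]
    adj[node]["F"] = {}


async def run(snapshot):
    adj = {"B": {"C": {}, "D": {}, "E": {}}}
    try:
        seen, _ = await asyncio.gather(walk(adj, "B", snapshot), mutate(adj, "B"))
        return seen
    except RuntimeError as exc:
        return f"RuntimeError: {exc}"


live = asyncio.run(run(snapshot=False))        # iterating the live dict fails
snapshotted = asyncio.run(run(snapshot=True))  # iterating the copy completes
```

The snapshot only protects the iterator. It does not make interleaved merges correct, which is why the merge step is also serialized.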

## 2. Don't wipe partial graph on pause

Previously the pause / cancel UI path called
`settings.docStoreConn.delete({"knowledge_graph_kwd": [...]}, ...)`,
destroying every subgraph, entity, relation, and graph row.
Re-triggering then started GraphRAG from scratch even though #14096 had
already added `load_subgraph_from_store`.

After main was merged in (which deleted `api/apps/kb_app.py` per
#14394), the pause path now lives on the new REST surface `DELETE
/v1/datasets/<id>/<index_type>`:

- `api/apps/services/dataset_api_service.py`: `delete_index` accepts a
`wipe: bool = True` parameter. When `False` the doc-store rows and
GraphRAG phase markers are left intact and only the running task is
cancelled. Default preserves historical behaviour.
- `api/apps/restful_apis/dataset_api.py`: parses `?wipe=false|0|no|off`
from the query string and forwards it.
- `web/src/utils/api.ts` + `web/src/services/knowledge-service.ts`:
`unbindPipelineTask` appends `?wipe=false` when explicitly false.
- The GraphRAG pause action in
`web/src/pages/dataset/dataset/generate-button/hook.ts` passes `wipe:
false` for `KnowledgeGraph`; raptor is unchanged.
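
As a rough sketch of how the `?wipe=false|0|no|off` values could map to a boolean: the helper name `parse_wipe_flag` is hypothetical, and the case-insensitive, whitespace-tolerant matching is an assumption rather than something the PR states. Only the four falsy spellings and the `True` default come from the description above.

```python
from typing import Optional

# The four spellings listed in the PR description.
_FALSY = {"false", "0", "no", "off"}


def parse_wipe_flag(raw: Optional[str]) -> bool:
    """Return False only for an explicit falsy value; default to wiping."""
    if raw is None:
        return True  # parameter absent: keep the historical behaviour
    # Assumption: matching ignores case and surrounding whitespace.
    return raw.strip().lower() not in _FALSY
```

Defaulting to `True` for anything unrecognised keeps existing callers on the old wipe-on-delete path.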

**UX impact:** the pause icon next to a running GraphRAG task no longer
wipes graph data. The only path that still wipes is the explicit Delete
action in `GenerateLogButton` (trash icon behind a confirmation modal).

## 3. Phase-completion markers (`rag/graphrag/phase_markers.py`)

A small Redis-backed marker layer at
`graphrag:phase:{kb_id}:{resolution_done|community_done}` (7-day TTL).
`run_graphrag_for_kb` consults the markers on entry and skips phases
that already completed in a prior run. Markers are cleared automatically
when:
- new docs are merged into the graph (which invalidates prior resolution
and community results),
- `delete_index` wipes the graph, or
- `delete_knowledge_graph` is called.

Redis failures never block a run -- markers are an optimization, not a
gate.
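
A minimal sketch of what such a marker layer could look like, under stated assumptions: `FakeRedis` is an in-memory stand-in rather than a real client, and the function names `set_phase`, `has_phase`, and `clear_phases` are hypothetical (the actual API of `phase_markers.py` is not shown here). Only the key format, the two phase names, and the 7-day TTL are taken from the description above.

```python
SEVEN_DAYS = 7 * 24 * 60 * 60  # TTL in seconds
PHASES = ("resolution_done", "community_done")


class FakeRedis:
    """In-memory stand-in; fail=True simulates an unreachable server."""

    def __init__(self, fail=False):
        self.data = {}
        self.fail = fail

    def _check(self):
        if self.fail:
            raise ConnectionError("redis unavailable")

    def set(self, key, value, ex=None):
        self._check()
        self.data[key] = (value, ex)

    def exists(self, key):
        self._check()
        return int(key in self.data)

    def delete(self, *keys):
        self._check()
        for key in keys:
            self.data.pop(key, None)


def _key(kb_id, phase):
    return f"graphrag:phase:{kb_id}:{phase}"


def set_phase(client, kb_id, phase):
    try:
        client.set(_key(kb_id, phase), "1", ex=SEVEN_DAYS)
    except Exception:
        pass  # best effort: a lost marker only costs a re-run of the phase


def has_phase(client, kb_id, phase):
    try:
        return bool(client.exists(_key(kb_id, phase)))
    except Exception:
        return False  # unknown state: treat the phase as not done


def clear_phases(client, kb_id):
    try:
        client.delete(*(_key(kb_id, phase) for phase in PHASES))
    except Exception:
        pass  # never block the caller on a marker failure
```

Returning `False` on any client error means a Redis outage degrades to redoing a phase rather than skipping one that never ran.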

## 4. Idempotent community detection

`extract_community` previously did `delete-then-insert` on
`community_report` rows; a crash mid-insert left the dataset with no
reports. Now report IDs are derived deterministically from `(kb_id,
community.title)`, the existing report IDs are snapshotted before
insert, new rows are written, then only stale rows are pruned. A failure
at any step leaves either the prior or the new report set intact --
never a partial mix.
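
The snapshot, write, prune ordering can be sketched against an in-memory dict. Everything here is illustrative: `report_id` and `replace_reports` are hypothetical names, the dict stands in for the doc store, and SHA-1 is an assumed choice of stable hash (the PR only says the IDs are derived deterministically from `(kb_id, community.title)`).

```python
import hashlib


def report_id(kb_id: str, title: str) -> str:
    # Assumption: any stable hash would do; SHA-1 is used for illustration.
    return hashlib.sha1(f"{kb_id}\x00{title}".encode("utf-8")).hexdigest()


def replace_reports(store: dict, kb_id: str, reports: dict) -> None:
    """store maps report id -> (kb_id, title, body); reports maps title -> body."""
    # 1. Snapshot the IDs that exist for this KB before writing anything.
    existing = {rid for rid, row in store.items() if row[0] == kb_id}

    # 2. Write new rows under deterministic IDs; a re-run overwrites in place.
    new_ids = set()
    for title, body in reports.items():
        rid = report_id(kb_id, title)
        store[rid] = (kb_id, title, body)
        new_ids.add(rid)

    # 3. Only now prune rows that the new run no longer produces.
    for rid in existing - new_ids:
        del store[rid]
```

Because the prune happens last, a crash before step 3 leaves extra stale rows at worst, never an empty report set.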

## 5. Tunable doc-store insert pipeline

The GraphRAG insert loop in `rag/graphrag/utils.py` and the
`community_report` insert in `rag/graphrag/general/index.py` were both
hardcoded to `es_bulk_size = 4` and ran strictly sequentially. On a real
KB this meant 1077 chunks took ~21 minutes for a 100-chunk slice -- pure
round-trip overhead.

- New `insert_chunks_bounded()` helper in `rag/graphrag/utils.py`
batches inserts via a bounded `asyncio.Semaphore`. Same retry / timeout
semantics as the prior loop.
- Defaults: 64 docs per batch, 4 batches in flight (matches the regular
ingest pipeline in `document_service.py`). Tunable per-deployment via
`GRAPHRAG_INSERT_BULK_SIZE` and `GRAPHRAG_INSERT_CONCURRENCY`.
- Both `set_graph` and `extract_community` now use the helper.

This dropped the same 1077-chunk insert from minutes to seconds in local
testing without measurable extra pressure on Infinity (total in-flight
docs ≤ `BULK_SIZE × CONCURRENCY` = 256 by default).
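
The batching idea can be sketched as follows. This is not the PR's `insert_chunks_bounded()`: the name `insert_bounded`, its signature, and the `fake_insert` callback are hypothetical, and the retry and timeout handling mentioned above is omitted. Only the defaults (64 per batch, 4 in flight) and the 1077-chunk figure come from the description.

```python
import asyncio


async def insert_bounded(chunks, insert_batch, bulk_size=64, concurrency=4):
    """Split chunks into batches and keep at most `concurrency` in flight."""
    sem = asyncio.Semaphore(concurrency)

    async def worker(batch):
        async with sem:  # blocks once `concurrency` batches are running
            await insert_batch(batch)

    batches = [chunks[i:i + bulk_size] for i in range(0, len(chunks), bulk_size)]
    await asyncio.gather(*(worker(batch) for batch in batches))
    return len(batches)


async def demo():
    stats = {"in_flight": 0, "peak": 0, "docs": 0}

    async def fake_insert(batch):
        # Stand-in for a doc-store bulk insert; records concurrency.
        stats["in_flight"] += 1
        stats["peak"] = max(stats["peak"], stats["in_flight"])
        await asyncio.sleep(0)
        stats["docs"] += len(batch)
        stats["in_flight"] -= 1

    n_batches = await insert_bounded(list(range(1077)), fake_insert)
    return n_batches, stats


n_batches, stats = asyncio.run(demo())
```

With the defaults, 1077 chunks split into 17 batches (16 full batches of 64 plus one of 53), and the semaphore caps the number running at once at 4.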

## Tests

- `test/unit_test/rag/graphrag/test_merge_graph_nodes.py` (3 tests):
dense neighbourhood merge, neighbour-snapshot regression, concurrent
serialized merges.
- `test/unit_test/rag/graphrag/test_phase_markers.py` (4 tests): set/has
round-trip, kb-scoped clear, no-op on empty input, graceful Redis
failure.
- `test/testcases/test_web_api/test_dataset_management/test_dataset_sdk_routes_unit.py`:
new `test_delete_index_wipe_flag_unit` covers `wipe=false` for both
GraphRAG and raptor on the new REST route, and confirms the default
still wipes and clears phase markers.

## Compatibility

- Backward compatible: tasks queued before this change behave
identically (default `wipe=true`, no markers expected).
- No schema/migration changes; all new state lives in Redis.
- New optional REST query param `wipe` on `DELETE
/v1/datasets/<id>/<index_type>`.
- New optional env vars `GRAPHRAG_INSERT_BULK_SIZE` and
`GRAPHRAG_INSERT_CONCURRENCY`; defaults preserve safe behaviour.

## Example of resume

Screenshot below shows a test resuming knowledge graph generation after
applying the concurrency fix and re-deploying.

<img width="521" height="677" alt="image"
src="https://github.com/user-attachments/assets/9ef0d405-cbb3-420d-a1a1-e51f3e7e9b7a"
/>

### Type of change

- [X] Bug Fix (non-breaking change which fixes an issue)
- [ ] New Feature (non-breaking change which adds functionality)
- [ ] Documentation Update
- [ ] Refactoring
- [ ] Performance Improvement
- [ ] Other (please describe):
2026-05-06 15:01:01 +08:00


#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Regression tests for Extractor._merge_graph_nodes concurrency bug.

The historical implementation iterated over ``graph.neighbors(node1)`` directly
while mutating ``graph`` in the loop body (``add_edge`` / ``remove_node``).
Under concurrent merges on overlapping neighbourhoods this raised
``RuntimeError: dictionary keys changed during iteration``.

The fix snapshots the neighbour list. These tests pin that behaviour so the
bug cannot silently regress.
"""

import asyncio
from types import SimpleNamespace

import networkx as nx
import pytest

from rag.graphrag.general.extractor import Extractor
from rag.graphrag.utils import GraphChange


def _stub_extractor() -> Extractor:
    llm = SimpleNamespace(llm_name="test-llm", max_length=4096)
    ext = Extractor.__new__(Extractor)
    ext._llm = llm
    ext._language = "English"

    async def _summary(_name, desc, task_id=""):
        return desc

    ext._handle_entity_relation_summary = _summary  # type: ignore[assignment]
    return ext


def _make_node(graph: nx.Graph, name: str) -> None:
    graph.add_node(
        name,
        description=f"desc-{name}",
        source_id=[name],
        entity_type="person",
    )


def _make_edge(graph: nx.Graph, src: str, tgt: str) -> None:
    graph.add_edge(
        src,
        tgt,
        src_id=src,
        tgt_id=tgt,
        description=f"{src}->{tgt}",
        weight=1.0,
        keywords=[],
        source_id=[src],
    )


@pytest.mark.p1
@pytest.mark.asyncio
async def test_merge_graph_nodes_handles_dense_neighbourhood():
    """A node with many neighbours must merge cleanly without raising."""
    graph = nx.Graph()
    for name in ["A", "B"] + [f"N{i}" for i in range(20)]:
        _make_node(graph, name)
    for i in range(20):
        _make_edge(graph, "A", f"N{i}")
        _make_edge(graph, "B", f"N{i}")

    ext = _stub_extractor()
    change = GraphChange()
    await ext._merge_graph_nodes(graph, ["A", "B"], change)

    assert "B" not in graph.nodes
    assert "A" in graph.nodes
    # All 20 N* neighbours should still be connected to the surviving node A
    assert set(graph.neighbors("A")) == {f"N{i}" for i in range(20)}


@pytest.mark.p1
@pytest.mark.asyncio
async def test_merge_graph_nodes_neighbours_are_snapshotted():
    """Regression: iterating graph.neighbors() must not explode if the
    underlying adjacency dict is mutated during the loop."""
    graph = nx.Graph()
    for name in ["A", "B", "C", "D"]:
        _make_node(graph, name)
    # A and B share neighbour D, and B also neighbours C, so merging {A, B}
    # must redirect B-C to A-C and fold B-D into the existing A-D edge while
    # B's neighbours are being walked.
    _make_edge(graph, "B", "C")
    _make_edge(graph, "B", "D")
    _make_edge(graph, "A", "D")

    ext = _stub_extractor()
    change = GraphChange()
    await ext._merge_graph_nodes(graph, ["A", "B"], change)

    assert "B" not in graph.nodes
    assert graph.has_edge("A", "C")
    assert graph.has_edge("A", "D")


@pytest.mark.p1
@pytest.mark.asyncio
async def test_concurrent_merges_do_not_raise_under_semaphore():
    """Two concurrent merges on overlapping neighbourhoods must succeed
    when serialized (as entity_resolution now does via Semaphore(1))."""
    graph = nx.Graph()
    for name in ["A1", "A2", "B1", "B2", "X"]:
        _make_node(graph, name)
    _make_edge(graph, "A1", "X")
    _make_edge(graph, "A2", "X")
    _make_edge(graph, "B1", "X")
    _make_edge(graph, "B2", "X")

    ext = _stub_extractor()
    change = GraphChange()
    sem = asyncio.Semaphore(1)

    async def merge(nodes):
        async with sem:
            await ext._merge_graph_nodes(graph, nodes, change)

    await asyncio.gather(merge(["A1", "A2"]), merge(["B1", "B2"]))

    assert "A2" not in graph.nodes and "B2" not in graph.nodes
    # Both survivors must still share neighbour X
    assert graph.has_edge("A1", "X")
    assert graph.has_edge("B1", "X")