Files
ragflow/test
Hunnyboy1217 782084780e feat(connectors): ETag-based bypass for incremental S3 ingestion (#14628) (#14677)
### What problem does this PR solve?

S3-family connector syncs currently re-download every in-window object
just so we can compute `xxhash128(blob)` and compare against
`Document.content_hash`. Anything that bumps `LastModified` without
changing bytes (`aws s3 cp` touches, bucket re-encryption, etc.) pays
full bandwidth and re-parses files that didn't actually change. #14628
covers the broader incremental-ingestion redesign; this PR is the first
slice.

The fix is a pre-listing short-circuit. `BlobStorageConnector` (S3 / R2
/ GCS / OCI / S3-compat) now implements a new `FingerprintConnector`
interface: `list_keys()` paginates `list_objects_v2` and yields
`KeyRecord(key, fingerprint)` where `fingerprint = xxhash128(ETag)`. The
orchestrator joins those against the connector's existing `{doc_id:
content_hash}` map and only calls `get_value(key)` when the fingerprint
differs. Unchanged keys are skipped entirely — no `GetObject`, no
re-parse.

No DDL. xxhash128(ETag) is 32 hex chars and reuses the existing
`Document.content_hash` column per @yingfeng's suggestion; the connector
decides at listing time whether to populate it. Local uploads and
connectors that don't opt in fall through to the existing post-download
`xxhash128(blob)` path with no behavior change.

This is PR-1 of a 4-PR series — full design lives on #14628. Subsequent
PRs extend tier 1 to local FS / WebDAV / Dropbox / Seafile / RDBMS
(PR-2), wire up tier 2 cursor connectors with `SyncLogs.next_checkpoint`
(PR-3), and unify deletion via `KeyRecord(deleted=True)` reconciliation
(PR-4). Holding those back keeps this PR additive and reviewable on its
own.

#### Files touched

- `common/data_source/models.py` — new `KeyRecord`; optional
`fingerprint` on `Document`
- `common/data_source/interfaces.py` — `IncrementalCapability` enum,
`FingerprintConnector` ABC
- `common/data_source/blob_connector.py` — `BlobStorageConnector`
implements `FingerprintConnector`; per-object download factored into
`_build_document_from_obj()` so `_yield_blob_objects`, `list_keys`,
`get_value` all share it
- `rag/svr/sync_data_source.py` —
`_BlobLikeBase._fingerprint_filtered_generator` does the bypass loop;
`_run_task_logic` plumbs `doc.fingerprint` into the upload dict
- `api/db/services/document_service.py` —
`list_id_content_hash_map_by_kb_and_source_type()` helper
- `api/db/services/connector_service.py` + `file_service.py` —
fingerprint flows through `duplicate_and_parse → upload_document` and
lands in `content_hash`
- `test/unit_test/common/test_blob_connector_fingerprint.py` — 14 tests
covering ETag normalization (single-part, multipart, quoted, empty),
`list_keys()` not calling `GetObject`, `get_value()` materializing with
fingerprint, deterministic/stable fingerprints, and the bypass loop
asserting `GetObject` is *not* called on a match

#### Worth flagging for review

Old `_BlobLikeBase._generate` called `poll_source(start, now)` with a
`LastModified` window when `poll_range_start` was set. New code uses
`_fingerprint_filtered_generator` (full bucket listing + fingerprint
compare) outside of explicit `reindex=1`. Strictly better for
unchanged-bucket cases since it skips `GetObject`, but it does mean
every sync now does a full `list_objects_v2` paginate. Should still be
cheap for most buckets — flagging in case anyone has a very large bucket
where the time-window filter was meaningful.

On migration: existing rows have `content_hash = xxhash128(blob)` from
the old code. The first sync after this lands sees ETag-derived
fingerprints that don't match, re-fetches every object once, and writes
the new fingerprint. From the second sync onward the bypass works as
expected. "Slow day one, fast every day after." A `fingerprint_backfill:
trust` opt-out is sketched in the design doc but not in this PR.

#### Test plan

- [x] `uv run ruff check` — clean on all 8 touched files
- [x] `uv run pytest
test/unit_test/common/test_blob_connector_fingerprint.py -v` — 14 passed
- [x] Broader unit-test suite — no regressions in anything I touched
- [ ] Manual smoke against a real S3 bucket — configure a connector, run
sync twice, expect the second sync to log `bypassed=N, fetched=0` and no
`GetObject` calls in CloudTrail / bucket access logs
- [ ] Manual smoke with `reindex=1` — confirm the full re-download path
still works

### Type of change

- [x] New Feature (non-breaking change which adds functionality)

---------

Co-authored-by: Yingfeng <yingfeng.zhang@gmail.com>
2026-05-09 20:03:56 +08:00
..


(1). Deploy RAGFlow services and images

https://ragflow.io/docs/build_docker_image

(2). Configure the required environment for testing

Install Python dependencies (including test dependencies):

uv sync --python 3.12 --only-group test --no-default-groups --frozen

Activate the environment:

source .venv/bin/activate

Install SDK:

uv pip install sdk/python 

Modify the .env file: Add the following code:

COMPOSE_PROFILES=${COMPOSE_PROFILES},tei-cpu
TEI_MODEL=BAAI/bge-small-en-v1.5
RAGFLOW_IMAGE=infiniflow/ragflow:v0.25.2 #Replace with the image you are using

Start the containerwait two minutes:

docker compose -f docker/docker-compose.yml up -d


(3). Test Elasticsearch

a) Run sdk tests against Elasticsearch:

export HTTP_API_TEST_LEVEL=p2
export HOST_ADDRESS=http://127.0.0.1:9380  # Ensure that this port is the API port mapped to your localhost
pytest -s --tb=short --level=${HTTP_API_TEST_LEVEL} test/testcases/test_sdk_api 

b) Run http api tests against Elasticsearch:

pytest -s --tb=short --level=${HTTP_API_TEST_LEVEL} test/testcases/test_http_api 


(4). Test Infinity

Modify the .env file:

DOC_ENGINE=${DOC_ENGINE:-infinity}

Start the container:

docker compose -f docker/docker-compose.yml down -v 
docker compose -f docker/docker-compose.yml up -d

a) Run sdk tests against Infinity:

DOC_ENGINE=infinity pytest -s --tb=short --level=${HTTP_API_TEST_LEVEL} test/testcases/test_sdk_api 

b) Run http api tests against Infinity:

DOC_ENGINE=infinity pytest -s --tb=short --level=${HTTP_API_TEST_LEVEL} test/testcases/test_http_api