From 89961962c0281560fbe9daeb4d00ef6ea4161c8a Mon Sep 17 00:00:00 2001 From: NeedmeFordev <124189514+spider-yamet@users.noreply.github.com> Date: Wed, 6 May 2026 08:06:23 +0200 Subject: [PATCH] feat(dingtalk-ai-table): support deleted-file sync via slim snapshot (#14525) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### What problem does this PR solve? Incremental DingTalk AI Table (Notable) sync did not reconcile rows removed on the remote side with documents already in the knowledge base. This follows the coordinated datasource work in #14362 (“sync deleted files”). This PR adds a **full slim snapshot** (`retrieve_all_slim_docs_perm_sync`) that lists **current record IDs for all sheets** without building document blobs, using the same logical document IDs as full ingest (`dingtalk_ai_table:{table_id}:{sheet_id}:{record_id}`). When **`sync_deleted_files`** is enabled on incremental runs, `DingTalkAITable._generate` returns **`(document_generator, file_list)`** so **`SyncBase`** can run **`cleanup_stale_documents_for_task`** and remove KB rows that no longer exist remotely. Design notes: - **`_document_id`** centralizes the ID string so slim snapshots and **`_convert_record_to_document`** stay aligned with **`hash128(doc.id)`** semantics used during ingestion/cleanup. - **`end_ts`** is captured before building **`file_list`**, then **`poll_source`** uses the same upper bound (consistent with other Dropbox-style connectors). - **`batch_size`** from connector config is coerced to a positive **`int`** before constructing the connector. - Slim snapshot failures are caught in **`_generate`**; **`file_list`** is set to **`None`** so cleanup is skipped rather than running on partial/error state. ### Type of change - [x] New Feature (non-breaking change which adds functionality) ### Files changed (summary) | Area | Change | |------|--------| | `common/data_source/dingtalk_ai_table_connector.py` | `SlimConnectorWithPermSync`, `retrieve_all_slim_docs_perm_sync`, `_document_id` shared with document conversion | | `rag/svr/sync_data_source.py` | `DingTalkAITable._generate`: slim snapshot + tuple return; `batch_size` validation; shared `end_ts` with `poll_source` | | `web/src/pages/user-setting/data-source/constant/index.tsx` | `syncDeletedFiles` for DingTalk AI Table in `DataSourceFeatureVisibilityMap` | Closes / relates to: #14362 --- .../dingtalk_ai_table_connector.py | 51 +++++++++++++++++-- rag/svr/sync_data_source.py | 36 +++++++++++-- .../data-source/constant/index.tsx | 3 ++ 3 files changed, 83 insertions(+), 7 deletions(-) diff --git a/common/data_source/dingtalk_ai_table_connector.py b/common/data_source/dingtalk_ai_table_connector.py index 66588d4d3..40dc44b61 100644 --- a/common/data_source/dingtalk_ai_table_connector.py +++ b/common/data_source/dingtalk_ai_table_connector.py @@ -22,8 +22,8 @@ from alibabacloud_tea_util.client import Client as UtilClient from common.data_source.config import INDEX_BATCH_SIZE, DocumentSource from common.data_source.exceptions import ConnectorMissingCredentialError, ConnectorValidationError -from common.data_source.interfaces import LoadConnector, PollConnector, SecondsSinceUnixEpoch -from common.data_source.models import Document, GenerateDocumentsOutput +from common.data_source.interfaces import LoadConnector, PollConnector, SecondsSinceUnixEpoch, SlimConnectorWithPermSync +from common.data_source.models import Document, GenerateDocumentsOutput, GenerateSlimDocumentOutput, SlimDocument logger = logging.getLogger(__name__) @@ -38,7 +38,7 @@ class DingTalkAITableClientNotSetUpError(PermissionError): super().__init__("DingTalk Notable client is not set up. Did you forget to call load_credentials()?") -class DingTalkAITableConnector(LoadConnector, PollConnector): +class DingTalkAITableConnector(LoadConnector, PollConnector, SlimConnectorWithPermSync): """ DingTalk AI Table (Notable) connector for accessing table records. @@ -75,6 +75,9 @@ class DingTalkAITableConnector(LoadConnector, PollConnector): self._client: NotableClient | None = None self._access_token: str | None = None + def _document_id(self, sheet_id: str, record_id: str) -> str: + return f"{_DINGTALK_AI_TABLE_DOC_ID_PREFIX}{self.table_id}:{sheet_id}:{record_id}" + def _create_client(self) -> NotableClient: """Create DingTalk Notable API client.""" config = open_api_models.Config() @@ -280,6 +283,8 @@ class DingTalkAITableConnector(LoadConnector, PollConnector): record_id = record.get("id", "unknown") fields = record.get("fields", {}) + doc_id = self._document_id(sheet_id, str(record_id)) + # Convert fields to JSON string for blob content content = json.dumps(fields, ensure_ascii=False, indent=2) blob = content.encode("utf-8") @@ -304,7 +309,7 @@ class DingTalkAITableConnector(LoadConnector, PollConnector): # Create document doc = Document( - id=f"{_DINGTALK_AI_TABLE_DOC_ID_PREFIX}{self.table_id}:{sheet_id}:{record_id}", + id=doc_id, source=DocumentSource.DINGTALK_AI_TABLE, semantic_identifier=semantic_identifier, extension=".json", @@ -316,6 +321,44 @@ class DingTalkAITableConnector(LoadConnector, PollConnector): return doc + def retrieve_all_slim_docs_perm_sync( + self, + callback: Any = None, + ) -> GenerateSlimDocumentOutput: + """ + Enumerate current record IDs for all sheets without building document blobs. + + IDs match :meth:`_convert_record_to_document` / full ingest. + """ + del callback + logger.info( + "[DingTalk Notable]: slim snapshot table_id=%s operator_id=%s", + self.table_id, + self.operator_id, + ) + sheets = self._get_all_sheets() + batch: list[SlimDocument] = [] + for sheet in sheets: + sheet_id = sheet["id"] + next_token: str | None = None + while True: + records, next_token = self._list_records( + sheet_id=sheet_id, + next_token=next_token, + ) + for record in records: + rid = record.get("id") + if not rid: + continue + batch.append(SlimDocument(id=self._document_id(sheet_id, str(rid)))) + if len(batch) >= self.batch_size: + yield batch + batch = [] + if not next_token: + break + if batch: + yield batch + def _yield_documents_from_table( self, start: SecondsSinceUnixEpoch | None = None, diff --git a/rag/svr/sync_data_source.py b/rag/svr/sync_data_source.py index 5a5409f01..86f6ede06 100644 --- a/rag/svr/sync_data_source.py +++ b/rag/svr/sync_data_source.py @@ -1547,10 +1547,18 @@ class DingTalkAITable(SyncBase): """ Sync records from DingTalk AI Table (Notable). """ + raw_batch_size = self.conf.get("batch_size", INDEX_BATCH_SIZE) + try: + batch_size = int(raw_batch_size) + except (TypeError, ValueError): + batch_size = INDEX_BATCH_SIZE + if batch_size <= 0: + batch_size = INDEX_BATCH_SIZE + self.connector = DingTalkAITableConnector( table_id=self.conf.get("table_id"), operator_id=self.conf.get("operator_id"), - batch_size=self.conf.get("batch_size", INDEX_BATCH_SIZE), + batch_size=batch_size, ) credentials = self.conf.get("credentials", {}) @@ -1562,14 +1570,36 @@ class DingTalkAITable(SyncBase): ) poll_start = task.get("poll_range_start") + file_list = None if task.get("reindex") == "1" or poll_start is None: document_generator = self.connector.load_from_state() _begin_info = "totally" else: + end_ts = datetime.now(timezone.utc).timestamp() + if self.conf.get("sync_deleted_files"): + file_list = [] + logging.info( + "DingTalk AI Table: fetching slim snapshot for stale-document reconciliation " + "(connector_id=%s, kb_id=%s, table_id=%s)", + task["connector_id"], + task["kb_id"], + self.conf.get("table_id"), + ) + try: + for slim_batch in self.connector.retrieve_all_slim_docs_perm_sync(): + file_list.extend(slim_batch) + except Exception: + logging.exception( + "DingTalk AI Table slim snapshot failed; continuing without stale-document cleanup " + "(connector_id=%s, kb_id=%s)", + task["connector_id"], + task["kb_id"], + ) + file_list = None document_generator = self.connector.poll_source( poll_start.timestamp(), - datetime.now(timezone.utc).timestamp(), + end_ts, ) _begin_info = f"from {poll_start}" @@ -1579,7 +1609,7 @@ class DingTalkAITable(SyncBase): task, ) - return document_generator + return document_generator, file_list class MySQL(SyncBase): diff --git a/web/src/pages/user-setting/data-source/constant/index.tsx b/web/src/pages/user-setting/data-source/constant/index.tsx index 327bbc826..2b177f274 100644 --- a/web/src/pages/user-setting/data-source/constant/index.tsx +++ b/web/src/pages/user-setting/data-source/constant/index.tsx @@ -105,6 +105,9 @@ export const DataSourceFeatureVisibilityMap: Partial< [DataSourceKey.AIRTABLE]: { syncDeletedFiles: true, }, + [DataSourceKey.DINGTALK_AI_TABLE]: { + syncDeletedFiles: true, + }, [DataSourceKey.WEBDAV]: { syncDeletedFiles: true, },