diff --git a/common/data_source/dingtalk_ai_table_connector.py b/common/data_source/dingtalk_ai_table_connector.py index 66588d4d3..40dc44b61 100644 --- a/common/data_source/dingtalk_ai_table_connector.py +++ b/common/data_source/dingtalk_ai_table_connector.py @@ -22,8 +22,8 @@ from alibabacloud_tea_util.client import Client as UtilClient from common.data_source.config import INDEX_BATCH_SIZE, DocumentSource from common.data_source.exceptions import ConnectorMissingCredentialError, ConnectorValidationError -from common.data_source.interfaces import LoadConnector, PollConnector, SecondsSinceUnixEpoch -from common.data_source.models import Document, GenerateDocumentsOutput +from common.data_source.interfaces import LoadConnector, PollConnector, SecondsSinceUnixEpoch, SlimConnectorWithPermSync +from common.data_source.models import Document, GenerateDocumentsOutput, GenerateSlimDocumentOutput, SlimDocument logger = logging.getLogger(__name__) @@ -38,7 +38,7 @@ class DingTalkAITableClientNotSetUpError(PermissionError): super().__init__("DingTalk Notable client is not set up. Did you forget to call load_credentials()?") -class DingTalkAITableConnector(LoadConnector, PollConnector): +class DingTalkAITableConnector(LoadConnector, PollConnector, SlimConnectorWithPermSync): """ DingTalk AI Table (Notable) connector for accessing table records. @@ -75,6 +75,9 @@ class DingTalkAITableConnector(LoadConnector, PollConnector): self._client: NotableClient | None = None self._access_token: str | None = None + def _document_id(self, sheet_id: str, record_id: str) -> str: + return f"{_DINGTALK_AI_TABLE_DOC_ID_PREFIX}{self.table_id}:{sheet_id}:{record_id}" + def _create_client(self) -> NotableClient: """Create DingTalk Notable API client.""" config = open_api_models.Config() @@ -280,6 +283,8 @@ class DingTalkAITableConnector(LoadConnector, PollConnector): record_id = record.get("id", "unknown") fields = record.get("fields", {}) + doc_id = self._document_id(sheet_id, str(record_id)) + # Convert fields to JSON string for blob content content = json.dumps(fields, ensure_ascii=False, indent=2) blob = content.encode("utf-8") @@ -304,7 +309,7 @@ class DingTalkAITableConnector(LoadConnector, PollConnector): # Create document doc = Document( - id=f"{_DINGTALK_AI_TABLE_DOC_ID_PREFIX}{self.table_id}:{sheet_id}:{record_id}", + id=doc_id, source=DocumentSource.DINGTALK_AI_TABLE, semantic_identifier=semantic_identifier, extension=".json", @@ -316,6 +321,44 @@ class DingTalkAITableConnector(LoadConnector, PollConnector): return doc + def retrieve_all_slim_docs_perm_sync( + self, + callback: Any = None, + ) -> GenerateSlimDocumentOutput: + """ + Enumerate current record IDs for all sheets without building document blobs. + + IDs match :meth:`_convert_record_to_document` / full ingest. + """ + del callback + logger.info( + "[DingTalk Notable]: slim snapshot table_id=%s operator_id=%s", + self.table_id, + self.operator_id, + ) + sheets = self._get_all_sheets() + batch: list[SlimDocument] = [] + for sheet in sheets: + sheet_id = sheet["id"] + next_token: str | None = None + while True: + records, next_token = self._list_records( + sheet_id=sheet_id, + next_token=next_token, + ) + for record in records: + rid = record.get("id") + if not rid: + continue + batch.append(SlimDocument(id=self._document_id(sheet_id, str(rid)))) + if len(batch) >= self.batch_size: + yield batch + batch = [] + if not next_token: + break + if batch: + yield batch + def _yield_documents_from_table( self, start: SecondsSinceUnixEpoch | None = None, diff --git a/rag/svr/sync_data_source.py b/rag/svr/sync_data_source.py index 5a5409f01..86f6ede06 100644 --- a/rag/svr/sync_data_source.py +++ b/rag/svr/sync_data_source.py @@ -1547,10 +1547,18 @@ class DingTalkAITable(SyncBase): """ Sync records from DingTalk AI Table (Notable). """ + raw_batch_size = self.conf.get("batch_size", INDEX_BATCH_SIZE) + try: + batch_size = int(raw_batch_size) + except (TypeError, ValueError): + batch_size = INDEX_BATCH_SIZE + if batch_size <= 0: + batch_size = INDEX_BATCH_SIZE + self.connector = DingTalkAITableConnector( table_id=self.conf.get("table_id"), operator_id=self.conf.get("operator_id"), - batch_size=self.conf.get("batch_size", INDEX_BATCH_SIZE), + batch_size=batch_size, ) credentials = self.conf.get("credentials", {}) @@ -1562,14 +1570,36 @@ class DingTalkAITable(SyncBase): ) poll_start = task.get("poll_range_start") + file_list = None if task.get("reindex") == "1" or poll_start is None: document_generator = self.connector.load_from_state() _begin_info = "totally" else: + end_ts = datetime.now(timezone.utc).timestamp() + if self.conf.get("sync_deleted_files"): + file_list = [] + logging.info( + "DingTalk AI Table: fetching slim snapshot for stale-document reconciliation " + "(connector_id=%s, kb_id=%s, table_id=%s)", + task["connector_id"], + task["kb_id"], + self.conf.get("table_id"), + ) + try: + for slim_batch in self.connector.retrieve_all_slim_docs_perm_sync(): + file_list.extend(slim_batch) + except Exception: + logging.exception( + "DingTalk AI Table slim snapshot failed; continuing without stale-document cleanup " + "(connector_id=%s, kb_id=%s)", + task["connector_id"], + task["kb_id"], + ) + file_list = None document_generator = self.connector.poll_source( poll_start.timestamp(), - datetime.now(timezone.utc).timestamp(), + end_ts, ) _begin_info = f"from {poll_start}" @@ -1579,7 +1609,7 @@ class DingTalkAITable(SyncBase): task, ) - return document_generator + return document_generator, file_list class MySQL(SyncBase): diff --git a/web/src/pages/user-setting/data-source/constant/index.tsx b/web/src/pages/user-setting/data-source/constant/index.tsx index 327bbc826..2b177f274 100644 --- a/web/src/pages/user-setting/data-source/constant/index.tsx +++ b/web/src/pages/user-setting/data-source/constant/index.tsx @@ -105,6 +105,9 @@ export const DataSourceFeatureVisibilityMap: Partial< [DataSourceKey.AIRTABLE]: { syncDeletedFiles: true, }, + [DataSourceKey.DINGTALK_AI_TABLE]: { + syncDeletedFiles: true, + }, [DataSourceKey.WEBDAV]: { syncDeletedFiles: true, },