From 18fbfafca6421978d094e72cf64b3f173a7a3a15 Mon Sep 17 00:00:00 2001 From: Magicbook1108 Date: Tue, 28 Apr 2026 15:07:14 +0800 Subject: [PATCH] Feat: enable sync deleted files for more connectors (#14353) ### What problem does this PR solve? Feat: enable sync delted files for connectors ### Type of change - [x] New Feature (non-breaking change which adds functionality) --- api/db/services/document_service.py | 41 ++-- common/data_source/bitbucket/connector.py | 10 +- common/data_source/blob_connector.py | 131 ++++++++---- common/data_source/box_connector.py | 134 +++++++----- common/data_source/confluence_connector.py | 6 - common/data_source/github/connector.py | 4 +- common/data_source/gmail_connector.py | 6 +- common/data_source/google_drive/connector.py | 4 - common/data_source/interfaces.py | 2 - common/data_source/jira/connector.py | 24 ++- common/data_source/notion_connector.py | 116 ++++++++++- common/data_source/sharepoint_connector.py | 4 +- common/data_source/slack_connector.py | 4 +- common/data_source/teams_connector.py | 4 +- common/data_source/zendesk_connector.py | 12 +- rag/svr/sync_data_source.py | 61 +++++- test/unit_test/rag/test_sync_data_source.py | 169 +++++++++++++++ .../data-source/add-datasource-modal.tsx | 8 +- .../data-source/constant/index.tsx | 196 ++++++++---------- .../data-source/constant/jira-constant.tsx | 149 +++++++++++++ .../data-source-detail-page/index.tsx | 8 +- 21 files changed, 789 insertions(+), 304 deletions(-) create mode 100644 test/unit_test/rag/test_sync_data_source.py create mode 100644 web/src/pages/user-setting/data-source/constant/jira-constant.tsx diff --git a/api/db/services/document_service.py b/api/db/services/document_service.py index fb5463cad..5d6289e57 100644 --- a/api/db/services/document_service.py +++ b/api/db/services/document_service.py @@ -423,6 +423,9 @@ class DocumentService(CommonService): if not cls.delete_document_and_update_kb_counts(doc.id): return True + chunk_index_name = search.index_name(tenant_id) + chunk_index_exists = settings.docStoreConn.index_exist(chunk_index_name, doc.kb_id) + # Cancel all running tasks first Using preset function in task_service.py --- set cancel flag in Redis try: cancel_all_task_of(doc.id) @@ -438,7 +441,8 @@ class DocumentService(CommonService): # Delete chunk images (non-critical, log and continue) try: - cls.delete_chunk_images(doc, tenant_id) + if chunk_index_exists: + cls.delete_chunk_images(doc, tenant_id) except Exception as e: logging.warning(f"Failed to delete chunk images for document {doc.id}: {e}") @@ -452,7 +456,7 @@ class DocumentService(CommonService): # Delete chunks from doc store - this is critical, log errors try: - settings.docStoreConn.delete({"doc_id": doc.id}, search.index_name(tenant_id), doc.kb_id) + settings.docStoreConn.delete({"doc_id": doc.id}, chunk_index_name, doc.kb_id) except Exception as e: logging.error(f"Failed to delete chunks from doc store for document {doc.id}: {e}") @@ -464,23 +468,24 @@ class DocumentService(CommonService): # Cleanup knowledge graph references (non-critical, log and continue) try: - graph_source = settings.docStoreConn.get_fields( - settings.docStoreConn.search(["source_id"], [], {"kb_id": doc.kb_id, "knowledge_graph_kwd": ["graph"]}, [], OrderByExpr(), 0, 1, search.index_name(tenant_id), [doc.kb_id]), - ["source_id"], - ) - if len(graph_source) > 0 and doc.id in list(graph_source.values())[0]["source_id"]: - settings.docStoreConn.update( - {"kb_id": doc.kb_id, "knowledge_graph_kwd": ["entity", "relation", "graph", "subgraph", "community_report"], "source_id": doc.id}, - {"remove": {"source_id": doc.id}}, - search.index_name(tenant_id), - doc.kb_id, - ) - settings.docStoreConn.update({"kb_id": doc.kb_id, "knowledge_graph_kwd": ["graph"]}, {"removed_kwd": "Y"}, search.index_name(tenant_id), doc.kb_id) - settings.docStoreConn.delete( - {"kb_id": doc.kb_id, "knowledge_graph_kwd": ["entity", "relation", "graph", "subgraph", "community_report"], "must_not": {"exists": "source_id"}}, - search.index_name(tenant_id), - doc.kb_id, + if chunk_index_exists: + graph_source = settings.docStoreConn.get_fields( + settings.docStoreConn.search(["source_id"], [], {"kb_id": doc.kb_id, "knowledge_graph_kwd": ["graph"]}, [], OrderByExpr(), 0, 1, chunk_index_name, [doc.kb_id]), + ["source_id"], ) + if len(graph_source) > 0 and doc.id in list(graph_source.values())[0]["source_id"]: + settings.docStoreConn.update( + {"kb_id": doc.kb_id, "knowledge_graph_kwd": ["entity", "relation", "graph", "subgraph", "community_report"], "source_id": doc.id}, + {"remove": {"source_id": doc.id}}, + chunk_index_name, + doc.kb_id, + ) + settings.docStoreConn.update({"kb_id": doc.kb_id, "knowledge_graph_kwd": ["graph"]}, {"removed_kwd": "Y"}, chunk_index_name, doc.kb_id) + settings.docStoreConn.delete( + {"kb_id": doc.kb_id, "knowledge_graph_kwd": ["entity", "relation", "graph", "subgraph", "community_report"], "must_not": {"exists": "source_id"}}, + chunk_index_name, + doc.kb_id, + ) except Exception as e: logging.warning(f"Failed to cleanup knowledge graph for document {doc.id}: {e}") diff --git a/common/data_source/bitbucket/connector.py b/common/data_source/bitbucket/connector.py index f355a8945..4b0240fa5 100644 --- a/common/data_source/bitbucket/connector.py +++ b/common/data_source/bitbucket/connector.py @@ -269,17 +269,11 @@ class BitbucketConnector( def retrieve_all_slim_docs_perm_sync( self, - start: SecondsSinceUnixEpoch | None = None, - end: SecondsSinceUnixEpoch | None = None, callback: IndexingHeartbeatInterface | None = None, ) -> Iterator[list[SlimDocument]]: """Return only document IDs for all existing pull requests.""" batch: list[SlimDocument] = [] - params = self._build_params( - fields=SLIM_PR_LIST_RESPONSE_FIELDS, - start=start, - end=end, - ) + params = self._build_params(fields=SLIM_PR_LIST_RESPONSE_FIELDS) with self._client() as client: for slug in self._iter_target_repositories(client): for pr in self._iter_pull_requests_for_repo( @@ -385,4 +379,4 @@ if __name__ == "__main__": except StopIteration as e: bitbucket_checkpoint = e.value break - \ No newline at end of file + diff --git a/common/data_source/blob_connector.py b/common/data_source/blob_connector.py index 627aa8fba..7505b878b 100644 --- a/common/data_source/blob_connector.py +++ b/common/data_source/blob_connector.py @@ -10,7 +10,6 @@ from common.data_source.utils import ( download_object, extract_size_bytes, get_file_ext, - is_accepted_file_ext, ) from common.data_source.config import BlobType, DocumentSource, BLOB_STORAGE_SIZE_THRESHOLD, INDEX_BATCH_SIZE from common.data_source.exceptions import ( @@ -19,8 +18,14 @@ from common.data_source.exceptions import ( CredentialExpiredError, InsufficientPermissionsError ) -from common.data_source.interfaces import LoadConnector, OnyxExtensionType, PollConnector -from common.data_source.models import Document, SecondsSinceUnixEpoch, GenerateDocumentsOutput +from common.data_source.interfaces import LoadConnector, PollConnector +from common.data_source.models import ( + Document, + SecondsSinceUnixEpoch, + GenerateDocumentsOutput, + GenerateSlimDocumentOutput, + SlimDocument, +) class BlobStorageConnector(LoadConnector, PollConnector): @@ -123,37 +128,7 @@ class BlobStorageConnector(LoadConnector, PollConnector): end: datetime, ) -> GenerateDocumentsOutput: """Generate bucket objects""" - if self.s3_client is None: - raise ConnectorMissingCredentialError("Blob storage") - - paginator = self.s3_client.get_paginator("list_objects_v2") - pages = paginator.paginate(Bucket=self.bucket_name, Prefix=self.prefix) - - # Collect all objects first to count filename occurrences - all_objects = [] - extension_type = OnyxExtensionType.Plain | OnyxExtensionType.Document - if bool(self._allow_images): - extension_type |= OnyxExtensionType.Multimedia - for page in pages: - if "Contents" not in page: - continue - for obj in page["Contents"]: - key = obj["Key"] - if key.endswith("/"): - continue - last_modified = obj["LastModified"].replace(tzinfo=timezone.utc) - if not (start < last_modified <= end): - continue - file_name = os.path.basename(key) - if not is_accepted_file_ext(get_file_ext(file_name), extension_type): - continue - all_objects.append(obj) - - # Count filename occurrences to determine which need full paths - filename_counts: dict[str, int] = {} - for obj in all_objects: - file_name = os.path.basename(obj["Key"]) - filename_counts[file_name] = filename_counts.get(file_name, 0) + 1 + all_objects, filename_counts = self._collect_blob_objects(start, end) batch: list[Document] = [] for obj in all_objects: @@ -171,20 +146,15 @@ class BlobStorageConnector(LoadConnector, PollConnector): f"{file_name} exceeds size threshold of {self.size_threshold}. Skipping." ) continue - + try: - blob = download_object(self.s3_client, self.bucket_name, key, self.size_threshold) + blob = download_object( + self.s3_client, self.bucket_name, key, self.size_threshold + ) if blob is None: continue - # Use full path only if filename appears multiple times - if filename_counts.get(file_name, 0) > 1: - relative_path = key - if self.prefix and key.startswith(self.prefix): - relative_path = key[len(self.prefix):] - semantic_id = relative_path.replace('/', ' / ') if relative_path else file_name - else: - semantic_id = file_name + semantic_id = self._get_semantic_id(key, file_name, filename_counts) batch.append( Document( @@ -194,7 +164,7 @@ class BlobStorageConnector(LoadConnector, PollConnector): semantic_identifier=semantic_id, extension=get_file_ext(file_name), doc_updated_at=last_modified, - size_bytes=size_bytes if size_bytes else 0 + size_bytes=size_bytes if size_bytes else 0, ) ) if len(batch) == self.batch_size: @@ -203,7 +173,76 @@ class BlobStorageConnector(LoadConnector, PollConnector): except Exception: logging.exception(f"Error decoding object {key}") - + + if batch: + yield batch + + def _collect_blob_objects( + self, + start: datetime, + end: datetime, + ) -> tuple[list[dict[str, Any]], dict[str, int]]: + """Collect object metadata for files in the requested window.""" + if self.s3_client is None: + raise ConnectorMissingCredentialError("Blob storage") + + paginator = self.s3_client.get_paginator("list_objects_v2") + pages = paginator.paginate(Bucket=self.bucket_name, Prefix=self.prefix) + + # Collect all objects first to count filename occurrences + all_objects: list[dict[str, Any]] = [] + for page in pages: + if "Contents" not in page: + continue + for obj in page["Contents"]: + if obj["Key"].endswith("/"): + continue + last_modified = obj["LastModified"].replace(tzinfo=timezone.utc) + if start < last_modified <= end: + all_objects.append(obj) + + filename_counts: dict[str, int] = {} + for obj in all_objects: + file_name = os.path.basename(obj["Key"]) + filename_counts[file_name] = filename_counts.get(file_name, 0) + 1 + + return all_objects, filename_counts + + def _get_semantic_id( + self, + key: str, + file_name: str, + filename_counts: dict[str, int], + ) -> str: + """Use full relative path only when filenames collide.""" + if filename_counts.get(file_name, 0) > 1: + relative_path = key + if self.prefix and key.startswith(self.prefix): + relative_path = key[len(self.prefix):] + return relative_path.replace("/", " / ") if relative_path else file_name + return file_name + + def retrieve_all_slim_docs_perm_sync( + self, + callback: Any = None, + ) -> GenerateSlimDocumentOutput: + """Return a full current snapshot of blob object IDs without downloading content.""" + del callback + + all_objects, _ = self._collect_blob_objects( + start=datetime(1970, 1, 1, tzinfo=timezone.utc), + end=datetime.now(timezone.utc), + ) + + batch: list[SlimDocument] = [] + for obj in all_objects: + batch.append( + SlimDocument(id=f"{self.bucket_type}:{self.bucket_name}:{obj['Key']}") + ) + if len(batch) == self.batch_size: + yield batch + batch = [] + if batch: yield batch diff --git a/common/data_source/box_connector.py b/common/data_source/box_connector.py index 253029d3c..cc44f356e 100644 --- a/common/data_source/box_connector.py +++ b/common/data_source/box_connector.py @@ -1,7 +1,7 @@ """Box connector""" import logging from datetime import datetime, timezone -from typing import Any +from typing import Any, Generator from box_sdk_gen import BoxClient from common.data_source.config import DocumentSource, INDEX_BATCH_SIZE @@ -10,21 +10,21 @@ from common.data_source.exceptions import ( ConnectorValidationError, ) from common.data_source.interfaces import LoadConnector, PollConnector, SecondsSinceUnixEpoch -from common.data_source.models import Document, GenerateDocumentsOutput +from common.data_source.models import Document, GenerateDocumentsOutput, GenerateSlimDocumentOutput, SlimDocument from common.data_source.utils import get_file_ext + class BoxConnector(LoadConnector, PollConnector): def __init__(self, folder_id: str, batch_size: int = INDEX_BATCH_SIZE, use_marker: bool = True) -> None: self.batch_size = batch_size self.folder_id = "0" if not folder_id else folder_id self.use_marker = use_marker - + self.box_client: BoxClient | None = None def load_credentials(self, auth: Any): self.box_client = BoxClient(auth=auth) return None - def validate_connector_settings(self): if self.box_client is None: raise ConnectorMissingCredentialError("Box") @@ -35,79 +35,41 @@ class BoxConnector(LoadConnector, PollConnector): logging.exception("[Box]: Failed to validate Box credentials") raise ConnectorValidationError(f"Unexpected error during Box settings validation: {e}") - - def _yield_files_recursive( - self, - folder_id: str, - start: SecondsSinceUnixEpoch | None, - end: SecondsSinceUnixEpoch | None, - relative_folder_path: str = "", - ) -> GenerateDocumentsOutput: - + def _iter_files_recursive( + self, + folder_id: str, + relative_folder_path: str = "", + ) -> Generator[tuple[Any, str], None, None]: if self.box_client is None: raise ConnectorMissingCredentialError("Box") result = self.box_client.folders.get_folder_items( folder_id=folder_id, limit=self.batch_size, - usemarker=self.use_marker + usemarker=self.use_marker, ) while True: - batch: list[Document] = [] for entry in result.entries: - if entry.type == 'file' : - file = self.box_client.files.get_file_by_id( - entry.id - ) - modified_time: SecondsSinceUnixEpoch | None = None - raw_time = ( - getattr(file, "created_at", None) - or getattr(file, "content_created_at", None) - ) - - if raw_time: - modified_time = self._box_datetime_to_epoch_seconds(raw_time) - if start is not None and modified_time <= start: - continue - if end is not None and modified_time > end: - continue - - content_bytes = self.box_client.downloads.download_file(file.id) + if entry.type == "file": + file = self.box_client.files.get_file_by_id(entry.id) semantic_identifier = ( f"{relative_folder_path} / {file.name}" if relative_folder_path else file.name ) - - batch.append( - Document( - id=f"box:{file.id}", - blob=content_bytes.read(), - source=DocumentSource.BOX, - semantic_identifier=semantic_identifier, - extension=get_file_ext(file.name), - doc_updated_at=modified_time, - size_bytes=file.size, - metadata=file.metadata - ) - ) - elif entry.type == 'folder': + yield file, semantic_identifier + elif entry.type == "folder": child_relative_path = ( f"{relative_folder_path} / {entry.name}" if relative_folder_path else entry.name ) - yield from self._yield_files_recursive( + yield from self._iter_files_recursive( folder_id=entry.id, - start=start, - end=end, - relative_folder_path=child_relative_path + relative_folder_path=child_relative_path, ) - if batch: - yield batch - if not result.next_marker: break @@ -115,9 +77,56 @@ class BoxConnector(LoadConnector, PollConnector): folder_id=folder_id, limit=self.batch_size, marker=result.next_marker, - usemarker=True + usemarker=True, ) + def _yield_files_recursive( + self, + folder_id: str, + start: SecondsSinceUnixEpoch | None, + end: SecondsSinceUnixEpoch | None, + relative_folder_path: str = "", + ) -> GenerateDocumentsOutput: + if self.box_client is None: + raise ConnectorMissingCredentialError("Box") + + batch: list[Document] = [] + for file, semantic_identifier in self._iter_files_recursive( + folder_id=folder_id, + relative_folder_path=relative_folder_path, + ): + modified_time: SecondsSinceUnixEpoch | None = None + raw_time = ( + getattr(file, "created_at", None) + or getattr(file, "content_created_at", None) + ) + + if raw_time: + modified_time = self._box_datetime_to_epoch_seconds(raw_time) + if start is not None and modified_time <= start: + continue + if end is not None and modified_time > end: + continue + + content_bytes = self.box_client.downloads.download_file(file.id) + batch.append( + Document( + id=f"box:{file.id}", + blob=content_bytes.read(), + source=DocumentSource.BOX, + semantic_identifier=semantic_identifier, + extension=get_file_ext(file.name), + doc_updated_at=modified_time, + size_bytes=file.size, + metadata=file.metadata, + ) + ) + if len(batch) >= self.batch_size: + yield batch + batch = [] + + if batch: + yield batch def _box_datetime_to_epoch_seconds(self, dt: datetime) -> SecondsSinceUnixEpoch: """Convert a Box SDK datetime to Unix epoch seconds (UTC). @@ -133,6 +142,21 @@ class BoxConnector(LoadConnector, PollConnector): return SecondsSinceUnixEpoch(int(dt.timestamp())) + def retrieve_all_slim_docs_perm_sync( + self, + callback: Any = None, + ) -> GenerateSlimDocumentOutput: + del callback + + batch: list[SlimDocument] = [] + for file, _semantic_identifier in self._iter_files_recursive(folder_id=self.folder_id): + batch.append(SlimDocument(id=f"box:{file.id}")) + if len(batch) >= self.batch_size: + yield batch + batch = [] + + if batch: + yield batch def poll_source(self, start, end): return self._yield_files_recursive(folder_id=self.folder_id, start=start, end=end) diff --git a/common/data_source/confluence_connector.py b/common/data_source/confluence_connector.py index abe55b5b2..ef0d6a776 100644 --- a/common/data_source/confluence_connector.py +++ b/common/data_source/confluence_connector.py @@ -1904,8 +1904,6 @@ class ConfluenceConnector( def retrieve_all_slim_docs_perm_sync( self, - start: SecondsSinceUnixEpoch | None = None, - end: SecondsSinceUnixEpoch | None = None, callback: IndexingHeartbeatInterface | None = None, ) -> GenerateSlimDocumentOutput: """ @@ -1913,16 +1911,12 @@ class ConfluenceConnector( Does not fetch actual text. Used primarily for incremental permission sync. """ return self._retrieve_all_slim_docs( - start=start, - end=end, callback=callback, include_permissions=True, ) def _retrieve_all_slim_docs( self, - start: SecondsSinceUnixEpoch | None = None, - end: SecondsSinceUnixEpoch | None = None, callback: IndexingHeartbeatInterface | None = None, include_permissions: bool = True, ) -> GenerateSlimDocumentOutput: diff --git a/common/data_source/github/connector.py b/common/data_source/github/connector.py index 258e2cf8b..2d65c995e 100644 --- a/common/data_source/github/connector.py +++ b/common/data_source/github/connector.py @@ -964,11 +964,9 @@ class GithubConnector(CheckpointedConnectorWithPermSyncGH[GithubConnectorCheckpo def retrieve_all_slim_docs_perm_sync( self, - start: SecondsSinceUnixEpoch | None = None, - end: SecondsSinceUnixEpoch | None = None, callback: Any = None, ) -> GenerateSlimDocumentOutput: - yield from self.retrieve_slim_document(start=start, end=end, callback=callback) + yield from self.retrieve_slim_document(callback=callback) def build_dummy_checkpoint(self) -> GithubConnectorCheckpoint: return GithubConnectorCheckpoint( diff --git a/common/data_source/gmail_connector.py b/common/data_source/gmail_connector.py index 1421f9f4b..ea4dd993a 100644 --- a/common/data_source/gmail_connector.py +++ b/common/data_source/gmail_connector.py @@ -270,12 +270,10 @@ class GmailConnector(LoadConnector, PollConnector, SlimConnectorWithPermSync): def retrieve_all_slim_docs_perm_sync( self, - start: SecondsSinceUnixEpoch | None = None, - end: SecondsSinceUnixEpoch | None = None, callback=None, ) -> GenerateSlimDocumentOutput: """Retrieve slim documents for permission synchronization.""" - query = build_time_range_query(start, end) + query = build_time_range_query() doc_batch = [] for user_email in self._get_all_user_emails(): @@ -343,4 +341,4 @@ if __name__ == "__main__": print(f) print("\n\n") except Exception as e: - logging.exception(f"Error loading credentials: {e}") \ No newline at end of file + logging.exception(f"Error loading credentials: {e}") diff --git a/common/data_source/google_drive/connector.py b/common/data_source/google_drive/connector.py index b44c28d74..add3b775f 100644 --- a/common/data_source/google_drive/connector.py +++ b/common/data_source/google_drive/connector.py @@ -1087,8 +1087,6 @@ class GoogleDriveConnector(SlimConnectorWithPermSync, CheckpointedConnectorWithP def retrieve_all_slim_docs_perm_sync( self, - start: SecondsSinceUnixEpoch | None = None, - end: SecondsSinceUnixEpoch | None = None, callback: IndexingHeartbeatInterface | None = None, ) -> GenerateSlimDocumentOutput: try: @@ -1096,8 +1094,6 @@ class GoogleDriveConnector(SlimConnectorWithPermSync, CheckpointedConnectorWithP while checkpoint.completion_stage != DriveRetrievalStage.DONE: yield from self._extract_slim_docs_from_google_drive( checkpoint=checkpoint, - start=start, - end=end, ) self.logger.info("Drive perm sync: Slim doc retrieval complete") diff --git a/common/data_source/interfaces.py b/common/data_source/interfaces.py index b68a40c1e..324293baa 100644 --- a/common/data_source/interfaces.py +++ b/common/data_source/interfaces.py @@ -60,8 +60,6 @@ class SlimConnectorWithPermSync(ABC): @abstractmethod def retrieve_all_slim_docs_perm_sync( self, - start: SecondsSinceUnixEpoch | None = None, - end: SecondsSinceUnixEpoch | None = None, callback: Any = None, ) -> Generator[list[SlimDocument], None, None]: """Retrieve all simplified documents (with permission sync)""" diff --git a/common/data_source/jira/connector.py b/common/data_source/jira/connector.py index db3c3f894..aa4082f41 100644 --- a/common/data_source/jira/connector.py +++ b/common/data_source/jira/connector.py @@ -149,7 +149,10 @@ class JiraConnector(CheckpointedConnectorWithPermSync, SlimConnectorWithPermSync else: logger.warning("[Jira] Scoped token requested but Jira base URL does not appear to be an Atlassian Cloud domain; scoped token ignored.") - user_email = credentials.get("jira_user_email") or credentials.get("username") + user_email = ( + credentials.get("jira_user_email") + or credentials.get("jira_username") + ) api_token = credentials.get("jira_api_token") or credentials.get("token") or credentials.get("api_token") password = credentials.get("jira_password") or credentials.get("password") rest_api_version = credentials.get("rest_api_version") @@ -377,16 +380,14 @@ class JiraConnector(CheckpointedConnectorWithPermSync, SlimConnectorWithPermSync def retrieve_all_slim_docs_perm_sync( self, - start: SecondsSinceUnixEpoch | None = None, - end: SecondsSinceUnixEpoch | None = None, - callback: Any = None, # noqa: ARG002 - maintained for interface compatibility + callback: Any = None, # noqa: ARG002 - callback interface hook ) -> Generator[list[SlimDocument], None, None]: """Return lightweight references to Jira issues (used for permission syncing).""" if not self.jira_client: raise ConnectorMissingCredentialError("Jira") - start_ts = start if start is not None else 0 - end_ts = end if end is not None else datetime.now(timezone.utc).timestamp() + start_ts = 0 + end_ts = datetime.now(timezone.utc).timestamp() jql = self._build_jql(start_ts, end_ts) checkpoint = self.build_dummy_checkpoint() @@ -962,7 +963,16 @@ def main(config: dict[str, Any] | None = None) -> None: if not base_url: raise RuntimeError("Jira base URL must be provided via config or CLI arguments.") - if not (credentials.get("jira_api_token") or (credentials.get("jira_user_email") and credentials.get("jira_password"))): + if not ( + credentials.get("jira_api_token") + or ( + ( + credentials.get("jira_user_email") + or credentials.get("jira_username") + ) + and credentials.get("jira_password") + ) + ): raise RuntimeError("Provide either an API token or both email/password for Jira authentication.") connector_options = { diff --git a/common/data_source/notion_connector.py b/common/data_source/notion_connector.py index 30536dfb9..ea3d6d076 100644 --- a/common/data_source/notion_connector.py +++ b/common/data_source/notion_connector.py @@ -28,9 +28,11 @@ from common.data_source.interfaces import ( from common.data_source.models import ( Document, GenerateDocumentsOutput, + GenerateSlimDocumentOutput, NotionBlock, NotionPage, NotionSearchResponse, + SlimDocument, TextSection, ) from common.data_source.utils import ( @@ -433,6 +435,45 @@ class NotionConnector(LoadConnector, PollConnector): return result_blocks, child_pages, attachments + def _read_slim_blocks(self, base_block_id: str) -> tuple[list[str], list[str]]: + child_pages: list[str] = [] + attachment_ids: list[str] = [] + cursor = None + + while True: + data = self._fetch_child_blocks(base_block_id, cursor) + + if data is None: + return child_pages, attachment_ids + + for result in data["results"]: + result_block_id = result["id"] + result_type = result["type"] + + if result_type in {"file", "image", "pdf", "video", "audio"}: + attachment_ids.append(result_block_id) + + if result["has_children"]: + if result_type == "child_page": + child_pages.append(result_block_id) + else: + nested_child_pages, nested_attachment_ids = self._read_slim_blocks( + result_block_id + ) + child_pages.extend(nested_child_pages) + attachment_ids.extend(nested_attachment_ids) + + if result_type == "child_database" and self.recursive_index_enabled: + _, inner_child_pages = self._read_pages_from_database(result_block_id) + child_pages.extend(inner_child_pages) + + if data["next_cursor"] is None: + break + + cursor = data["next_cursor"] + + return child_pages, attachment_ids + def _read_page_title(self, page: NotionPage) -> Optional[str]: """Extracts the title from a Notion page.""" if hasattr(page, "database_name") and page.database_name: @@ -552,6 +593,79 @@ class NotionConnector(LoadConnector, PollConnector): pages = [self._fetch_page(page_id=self.root_page_id)] yield from batch_generator(self._read_pages(pages, start, end), self.batch_size) + def _read_pages_for_slim_docs( + self, + pages: list[NotionPage], + slim_indexed_pages: set[str], + ) -> Generator[SlimDocument, None, None]: + all_child_page_ids: list[str] = [] + + for page in pages: + if isinstance(page, dict): + page = NotionPage(**page) + if page.id in slim_indexed_pages: + continue + + child_page_ids, attachment_ids = self._read_slim_blocks(page.id) + all_child_page_ids.extend(child_page_ids) + slim_indexed_pages.add(page.id) + + yield SlimDocument(id=page.id) + for attachment_id in attachment_ids: + yield SlimDocument(id=attachment_id) + + if self.recursive_index_enabled and all_child_page_ids: + for child_page_batch_ids in batch_generator(all_child_page_ids, INDEX_BATCH_SIZE): + child_page_batch = [ + self._fetch_page(page_id) + for page_id in child_page_batch_ids + if page_id not in slim_indexed_pages + ] + yield from self._read_pages_for_slim_docs( + child_page_batch, + slim_indexed_pages, + ) + + def retrieve_all_slim_docs_perm_sync( + self, + callback: Any = None, + ) -> GenerateSlimDocumentOutput: + slim_indexed_pages: set[str] = set() + + if self.recursive_index_enabled and self.root_page_id: + root_pages = [self._fetch_page(page_id=self.root_page_id)] + yield from batch_generator( + self._read_pages_for_slim_docs(root_pages, slim_indexed_pages), + self.batch_size, + ) + return + + query_dict = { + "filter": {"property": "object", "value": "page"}, + "page_size": 100, + } + + slim_batch: list[SlimDocument] = [] + while True: + db_res = self._search_notion(query_dict) + pages = [NotionPage(**page) for page in db_res.results] + + for doc in self._read_pages_for_slim_docs(pages, slim_indexed_pages): + slim_batch.append(doc) + if len(slim_batch) >= self.batch_size: + yield slim_batch + slim_batch = [] + if callback: + callback.progress("notion_slim_document", 1) + + if db_res.has_more: + query_dict["start_cursor"] = db_res.next_cursor + else: + break + + if slim_batch: + yield slim_batch + def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None: """Applies integration token to headers.""" self.headers["Authorization"] = f"Bearer {credentials['notion_integration_token']}" @@ -653,4 +767,4 @@ if __name__ == "__main__": document_batches = connector.load_from_state() for doc_batch in document_batches: for doc in doc_batch: - print(doc) \ No newline at end of file + print(doc) diff --git a/common/data_source/sharepoint_connector.py b/common/data_source/sharepoint_connector.py index 7bc8e3410..e5684023c 100644 --- a/common/data_source/sharepoint_connector.py +++ b/common/data_source/sharepoint_connector.py @@ -112,10 +112,8 @@ class SharePointConnector(CheckpointedConnectorWithPermSync, SlimConnectorWithPe def retrieve_all_slim_docs_perm_sync( self, - start: SecondsSinceUnixEpoch | None = None, - end: SecondsSinceUnixEpoch | None = None, callback: Any = None, ) -> Any: """Retrieve all simplified documents with permission sync""" # Simplified implementation - return [] \ No newline at end of file + return [] diff --git a/common/data_source/slack_connector.py b/common/data_source/slack_connector.py index 5fabc3d00..162826762 100644 --- a/common/data_source/slack_connector.py +++ b/common/data_source/slack_connector.py @@ -528,8 +528,6 @@ class SlackConnector( def retrieve_all_slim_docs_perm_sync( self, - start: SecondsSinceUnixEpoch | None = None, - end: SecondsSinceUnixEpoch | None = None, callback: Any = None, ) -> GenerateSlimDocumentOutput: if self.client is None: @@ -662,4 +660,4 @@ if __name__ == "__main__": connector.validate_connector_settings() print("Slack connector settings validated successfully") except Exception as e: - print(f"Validation failed: {e}") \ No newline at end of file + print(f"Validation failed: {e}") diff --git a/common/data_source/teams_connector.py b/common/data_source/teams_connector.py index 0b4cd5642..98b472667 100644 --- a/common/data_source/teams_connector.py +++ b/common/data_source/teams_connector.py @@ -106,10 +106,8 @@ class TeamsConnector(CheckpointedConnectorWithPermSync, SlimConnectorWithPermSyn def retrieve_all_slim_docs_perm_sync( self, - start: SecondsSinceUnixEpoch | None = None, - end: SecondsSinceUnixEpoch | None = None, callback: Any = None, ) -> Any: """Retrieve all simplified documents with permission sync""" # Simplified implementation - return [] \ No newline at end of file + return [] diff --git a/common/data_source/zendesk_connector.py b/common/data_source/zendesk_connector.py index 85b3426fe..8ea48d553 100644 --- a/common/data_source/zendesk_connector.py +++ b/common/data_source/zendesk_connector.py @@ -553,15 +553,11 @@ class ZendeskConnector( def retrieve_all_slim_docs_perm_sync( self, - start: SecondsSinceUnixEpoch | None = None, - end: SecondsSinceUnixEpoch | None = None, callback: IndexingHeartbeatInterface | None = None, ) -> GenerateSlimDocumentOutput: slim_doc_batch: list[SlimDocument] = [] if self.content_type == "articles": - articles = _get_articles( - self.client, start_time=int(start) if start else None - ) + articles = _get_articles(self.client) for article in articles: slim_doc_batch.append( SlimDocument( @@ -572,9 +568,7 @@ class ZendeskConnector( yield slim_doc_batch slim_doc_batch = [] elif self.content_type == "tickets": - tickets = _get_tickets( - self.client, start_time=int(start) if start else None - ) + tickets = _get_tickets(self.client) for ticket in tickets: slim_doc_batch.append( SlimDocument( @@ -664,4 +658,4 @@ if __name__ == "__main__": checkpoint = next_checkpoint if any_doc: - break \ No newline at end of file + break diff --git a/rag/svr/sync_data_source.py b/rag/svr/sync_data_source.py index ac70a6843..e2201abe7 100644 --- a/rag/svr/sync_data_source.py +++ b/rag/svr/sync_data_source.py @@ -227,7 +227,15 @@ class SyncBase: prefix = self._get_source_prefix() prefix = f"{prefix} " if prefix else "" next_update_info = self._format_window_boundary(next_update) - if file_list is not None: + if file_list == []: + logging.warning( + "%s deleted-file sync skipped because the snapshot was empty " + "(connector_id=%s, kb_id=%s)", + self.SOURCE_NAME, + task["connector_id"], + task["kb_id"], + ) + elif file_list is not None: removed_docs, _ = ConnectorService.cleanup_stale_documents_for_task( task["id"], task["connector_id"], @@ -270,6 +278,7 @@ class _BlobLikeBase(SyncBase): self.connector.set_allow_images(self.conf.get("allow_images", False)) self.connector.load_credentials(self.conf["credentials"]) + file_list = None document_batch_generator = ( self.connector.load_from_state() if task["reindex"] == "1" or not task["poll_range_start"] @@ -279,6 +288,15 @@ class _BlobLikeBase(SyncBase): ) ) + if ( + task["reindex"] != "1" + and task["poll_range_start"] + and self.conf.get("sync_deleted_files") + ): + file_list = [] + for slim_batch in self.connector.retrieve_all_slim_docs_perm_sync(): + file_list.extend(slim_batch) + _begin_info = ( "totally" if task["reindex"] == "1" or not task["poll_range_start"] @@ -293,6 +311,8 @@ class _BlobLikeBase(SyncBase): _begin_info, ) ) + if file_list is not None: + return document_batch_generator, file_list return document_batch_generator @@ -375,14 +395,17 @@ class Confluence(SyncBase): credential_json=self.conf["credentials"]) self.connector.set_credentials_provider(credentials_provider) + file_list = None # Determine the time range for synchronization based on reindex or poll_range_start if task["reindex"] == "1" or not task["poll_range_start"]: start_time = 0.0 - _begin_info = "totally" else: start_time = task["poll_range_start"].timestamp() - _begin_info = f"from {task['poll_range_start']}" - + if self.conf.get("sync_deleted_files"): + file_list = [] + for slim_batch in self.connector.retrieve_all_slim_docs_perm_sync(): + file_list.extend(slim_batch) + end_time = datetime.now(timezone.utc).timestamp() raw_batch_size = self.conf.get("sync_batch_size") or self.conf.get("batch_size") or INDEX_BATCH_SIZE @@ -427,7 +450,7 @@ class Confluence(SyncBase): yield batch self.log_connection("Confluence", self.conf["wiki_base"], task) - return wrapper() + return wrapper(), file_list class Notion(SyncBase): @@ -436,6 +459,7 @@ class Notion(SyncBase): async def _generate(self, task: dict): self.connector = NotionConnector(root_page_id=self.conf["root_page_id"]) self.connector.load_credentials(self.conf["credentials"]) + file_list = None document_generator = ( self.connector.load_from_state() if task["reindex"] == "1" or not task["poll_range_start"] @@ -443,9 +467,20 @@ class Notion(SyncBase): datetime.now(timezone.utc).timestamp()) ) + if ( + task["reindex"] != "1" + and task["poll_range_start"] + and self.conf.get("sync_deleted_files") + ): + file_list = [] + for slim_batch in self.connector.retrieve_all_slim_docs_perm_sync(): + file_list.extend(slim_batch) + _begin_info = "totally" if task["reindex"] == "1" or not task["poll_range_start"] else "from {}".format( task["poll_range_start"]) self.log_connection("Notion", f"root({self.conf['root_page_id']})", task) + if file_list is not None: + return document_generator, file_list return document_generator @@ -680,12 +715,17 @@ class Jira(SyncBase): self.connector.load_credentials(credentials) self.connector.validate_connector_settings() + file_list = None if task["reindex"] == "1" or not task["poll_range_start"]: start_time = 0.0 _begin_info = "totally" else: start_time = task["poll_range_start"].timestamp() + if self.conf.get("sync_deleted_files"): + file_list = [] + for slim_batch in self.connector.retrieve_all_slim_docs_perm_sync(): + file_list.extend(slim_batch) _begin_info = f"from {task['poll_range_start']}" end_time = datetime.now(timezone.utc).timestamp() @@ -744,6 +784,8 @@ class Jira(SyncBase): f"overlap_buffer_s={getattr(self.connector, 'time_buffer_seconds', connector_kwargs.get('time_buffer_seconds'))}" ), ) + if file_list is not None: + return document_batches(), file_list return document_batches() @staticmethod @@ -858,17 +900,24 @@ class BOX(SyncBase): self.connector.load_credentials(auth) poll_start = task["poll_range_start"] + file_list = None if task["reindex"] == "1" or poll_start is None: document_generator = self.connector.load_from_state() _begin_info = "totally" else: + if self.conf.get("sync_deleted_files"): + file_list = [] + for slim_batch in self.connector.retrieve_all_slim_docs_perm_sync(): + file_list.extend(slim_batch) document_generator = self.connector.poll_source( poll_start.timestamp(), datetime.now(timezone.utc).timestamp(), ) _begin_info = f"from {poll_start}" self.log_connection("Box", f"folder_id({self.conf['folder_id']})", task) + if file_list is not None: + return document_generator, file_list return document_generator @@ -980,10 +1029,8 @@ class Github(SyncBase): file_list = None if task.get("reindex") == "1" or not task.get("poll_range_start"): start_time = datetime.fromtimestamp(0, tz=timezone.utc) - _begin_info = "totally" else: start_time = task.get("poll_range_start") - _begin_info = f"from {start_time}" if self.conf.get("sync_deleted_files"): file_list = [] for slim_batch in self.connector.retrieve_all_slim_docs_perm_sync(): diff --git a/test/unit_test/rag/test_sync_data_source.py b/test/unit_test/rag/test_sync_data_source.py new file mode 100644 index 000000000..e76722ba1 --- /dev/null +++ b/test/unit_test/rag/test_sync_data_source.py @@ -0,0 +1,169 @@ +# +# Copyright 2026 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import importlib +import importlib.util +import os +import sys +import types +import warnings + +import pytest + +warnings.filterwarnings( + "ignore", + message="pkg_resources is deprecated as an API.*", + category=UserWarning, +) + + +def _install_cv2_stub_if_unavailable(): + try: + importlib.import_module("cv2") + return + except Exception: + pass + + stub = types.ModuleType("cv2") + stub.INTER_LINEAR = 1 + stub.INTER_CUBIC = 2 + stub.BORDER_CONSTANT = 0 + stub.BORDER_REPLICATE = 1 + + def _missing(*_args, **_kwargs): + raise RuntimeError("cv2 runtime call is unavailable in this test environment") + + def _module_getattr(name): + if name.isupper(): + return 0 + return _missing + + stub.__getattr__ = _module_getattr + sys.modules["cv2"] = stub + + +def _install_xgboost_stub_if_unavailable(): + if "xgboost" in sys.modules: + return + if importlib.util.find_spec("xgboost") is not None: + return + sys.modules["xgboost"] = types.ModuleType("xgboost") + + +def _install_ollama_stub(): + stub = types.ModuleType("ollama") + + class _DummyClient: + def __init__(self, *_args, **_kwargs): + pass + + stub.Client = _DummyClient + sys.modules["ollama"] = stub + + +for proxy_key in ("ALL_PROXY", "all_proxy", "HTTP_PROXY", "http_proxy", "HTTPS_PROXY", "https_proxy"): + os.environ.pop(proxy_key, None) + +_install_cv2_stub_if_unavailable() +_install_xgboost_stub_if_unavailable() +_install_ollama_stub() + +sync_data_source = importlib.import_module("rag.svr.sync_data_source") + + +class _FakeSync(sync_data_source.SyncBase): + SOURCE_NAME = "fake" + + def __init__(self, generate_output): + super().__init__({}) + self._generate_output = generate_output + + async def _generate(self, task: dict): + return self._generate_output + + +def _make_task(): + return { + "id": "task-1", + "connector_id": "connector-1", + "kb_id": "kb-1", + "tenant_id": "tenant-1", + "poll_range_start": None, + "auto_parse": False, + } + + +def _patch_common_dependencies(monkeypatch): + monkeypatch.setattr( + sync_data_source.DocumentService, + "list_doc_headers_by_kb_and_source_type", + lambda *_args, **_kwargs: [], + ) + monkeypatch.setattr( + sync_data_source.SyncLogsService, + "done", + lambda *_args, **_kwargs: None, + ) + + +@pytest.mark.anyio +@pytest.mark.p2 +async def test_run_task_logic_skips_cleanup_for_empty_snapshot(monkeypatch): + cleanup_calls = [] + + _patch_common_dependencies(monkeypatch) + monkeypatch.setattr( + sync_data_source.ConnectorService, + "cleanup_stale_documents_for_task", + lambda *_args, **_kwargs: cleanup_calls.append((_args, _kwargs)), + ) + + await _FakeSync((iter(()), []))._run_task_logic(_make_task()) + + assert cleanup_calls == [] + + +@pytest.mark.anyio +@pytest.mark.p2 +async def test_run_task_logic_cleans_up_for_non_empty_snapshot(monkeypatch): + cleanup_calls = [] + + _patch_common_dependencies(monkeypatch) + + def _fake_cleanup(*args, **kwargs): + cleanup_calls.append((args, kwargs)) + return 2, [] + + monkeypatch.setattr( + sync_data_source.ConnectorService, + "cleanup_stale_documents_for_task", + _fake_cleanup, + ) + + file_list = [types.SimpleNamespace(id="doc-1")] + await _FakeSync((iter(()), file_list))._run_task_logic(_make_task()) + + assert cleanup_calls == [ + ( + ( + "task-1", + "connector-1", + "kb-1", + "tenant-1", + file_list, + ), + {}, + ) + ] diff --git a/web/src/pages/user-setting/data-source/add-datasource-modal.tsx b/web/src/pages/user-setting/data-source/add-datasource-modal.tsx index 64824b8f9..16d4eff89 100644 --- a/web/src/pages/user-setting/data-source/add-datasource-modal.tsx +++ b/web/src/pages/user-setting/data-source/add-datasource-modal.tsx @@ -7,9 +7,8 @@ import { useTranslation } from 'react-i18next'; import { DataSourceFormBaseFields, DataSourceFormDefaultValues, - DataSourceFormFields, getCommonExtraDefaultValues, - getCommonExtraFields, + getDataSourceFieldsWithExtras, mergeDataSourceFormValues, } from './constant'; import { IDataSorceInfo } from './interface'; @@ -28,10 +27,7 @@ const AddDataSourceModal = ({ if (sourceData) { setFields([ ...DataSourceFormBaseFields, - ...DataSourceFormFields[ - sourceData.id as keyof typeof DataSourceFormFields - ], - ...getCommonExtraFields(sourceData.id), + ...getDataSourceFieldsWithExtras(sourceData.id as any), ] as FormFieldConfig[]); } }, [sourceData]); diff --git a/web/src/pages/user-setting/data-source/constant/index.tsx b/web/src/pages/user-setting/data-source/constant/index.tsx index 80022cbc9..6bf0784ea 100644 --- a/web/src/pages/user-setting/data-source/constant/index.tsx +++ b/web/src/pages/user-setting/data-source/constant/index.tsx @@ -11,36 +11,38 @@ import GoogleDriveTokenField from '../component/google-drive-token-field'; import { IDataSourceInfoMap } from '../interface'; import { bitbucketConstant } from './bitbucket-constant'; import { confluenceConstant } from './confluence-constant'; +import { jiraConstant } from './jira-constant'; import { S3Constant } from './s3-constant'; import { seafileConstant } from './seafile-constant'; export enum DataSourceKey { - RSS = 'rss', CONFLUENCE = 'confluence', - S3 = 's3', NOTION = 'notion', - DISCORD = 'discord', GOOGLE_DRIVE = 'google_drive', - MOODLE = 'moodle', GMAIL = 'gmail', + GOOGLE_CLOUD_STORAGE = 'google_cloud_storage', + OCI_STORAGE = 'oci_storage', + S3 = 's3', + R2 = 'r2', JIRA = 'jira', - WEBDAV = 'webdav', BOX = 'box', DROPBOX = 'dropbox', - R2 = 'r2', - OCI_STORAGE = 'oci_storage', - GOOGLE_CLOUD_STORAGE = 'google_cloud_storage', - AIRTABLE = 'airtable', - DINGTALK_AI_TABLE = 'dingtalk_ai_table', + BITBUCKET = 'bitbucket', GITLAB = 'gitlab', + GITHUB = 'github', + MOODLE = 'moodle', + DISCORD = 'discord', + ZENDESK = 'zendesk', + WEBDAV = 'webdav', + AIRTABLE = 'airtable', ASANA = 'asana', IMAP = 'imap', - GITHUB = 'github', - BITBUCKET = 'bitbucket', - ZENDESK = 'zendesk', + DINGTALK_AI_TABLE = 'dingtalk_ai_table', SEAFILE = 'seafile', MYSQL = 'mysql', POSTGRESQL = 'postgresql', + RSS = 'rss', + // SHAREPOINT = 'sharepoint', // SLACK = 'slack', // TEAMS = 'teams', @@ -56,6 +58,30 @@ export const DataSourceFeatureVisibilityMap = { [DataSourceKey.GITHUB]: { syncDeletedFiles: true, }, + [DataSourceKey.CONFLUENCE]: { + syncDeletedFiles: true, + }, + [DataSourceKey.BOX]: { + syncDeletedFiles: true, + }, + [DataSourceKey.S3]: { + syncDeletedFiles: true, + }, + [DataSourceKey.R2]: { + syncDeletedFiles: true, + }, + [DataSourceKey.GOOGLE_CLOUD_STORAGE]: { + syncDeletedFiles: true, + }, + [DataSourceKey.OCI_STORAGE]: { + syncDeletedFiles: true, + }, + [DataSourceKey.NOTION]: { + syncDeletedFiles: true, + }, + [DataSourceKey.JIRA]: { + syncDeletedFiles: true, + }, }; const isDataSourceFeatureVisible = ( @@ -294,6 +320,47 @@ export const getCommonExtraDefaultValues = () => ({ }, }); +export const getDataSourceFieldsWithExtras = ( + source?: DataSourceKey, +): FormFieldConfig[] => { + if (!source) { + return []; + } + + const sourceFields = + DataSourceFormFields[source as keyof typeof DataSourceFormFields] || []; + const extraFields = getCommonExtraFields(source); + + if (source !== DataSourceKey.JIRA) { + return [...sourceFields, ...extraFields]; + } + + const modeFieldIndex = sourceFields.findIndex( + (field) => field.name === 'config.is_cloud', + ); + if (modeFieldIndex < 0) { + return [...sourceFields, ...extraFields]; + } + + const sharedFields = sourceFields.slice(0, modeFieldIndex); + const modeFields = sourceFields.slice(modeFieldIndex); + + const sharedCheckboxFieldIndex = sharedFields.findIndex( + (field) => field.type === FormFieldType.Checkbox, + ); + + if (sharedCheckboxFieldIndex < 0) { + return [...sharedFields, ...extraFields, ...modeFields]; + } + + return [ + ...sharedFields.slice(0, sharedCheckboxFieldIndex), + ...sharedFields.slice(sharedCheckboxFieldIndex), + ...extraFields, + ...modeFields, + ]; +}; + export const DataSourceFormFields = { [DataSourceKey.RSS]: [ { @@ -569,106 +636,7 @@ export const DataSourceFormFields = { required: true, }, ], - [DataSourceKey.JIRA]: [ - { - label: 'Jira Base URL', - name: 'config.base_url', - type: FormFieldType.Text, - required: true, - placeholder: 'https://your-domain.atlassian.net', - tooltip: t('setting.jiraBaseUrlTip'), - }, - { - label: 'Project Key', - name: 'config.project_key', - type: FormFieldType.Text, - required: false, - placeholder: 'RAGFlow', - tooltip: t('setting.jiraProjectKeyTip'), - }, - { - label: 'Custom JQL', - name: 'config.jql_query', - type: FormFieldType.Textarea, - required: false, - placeholder: 'project = RAG AND updated >= -7d', - tooltip: t('setting.jiraJqlTip'), - }, - { - label: 'Batch Size', - name: 'config.batch_size', - type: FormFieldType.Number, - required: false, - tooltip: t('setting.jiraBatchSizeTip'), - }, - { - label: 'Include Comments', - name: 'config.include_comments', - type: FormFieldType.Checkbox, - required: false, - defaultValue: true, - tooltip: t('setting.jiraCommentsTip'), - }, - { - label: 'Include Attachments', - name: 'config.include_attachments', - type: FormFieldType.Checkbox, - required: false, - defaultValue: false, - tooltip: t('setting.jiraAttachmentsTip'), - }, - { - label: 'Attachment Size Limit (bytes)', - name: 'config.attachment_size_limit', - type: FormFieldType.Number, - required: false, - defaultValue: 10 * 1024 * 1024, - tooltip: t('setting.jiraAttachmentSizeTip'), - }, - { - label: 'Labels to Skip', - name: 'config.labels_to_skip', - type: FormFieldType.Tag, - required: false, - tooltip: t('setting.jiraLabelsTip'), - }, - { - label: 'Comment Email Blacklist', - name: 'config.comment_email_blacklist', - type: FormFieldType.Tag, - required: false, - tooltip: t('setting.jiraBlacklistTip'), - }, - { - label: 'Use Scoped Token (Clould only)', - name: 'config.scoped_token', - type: FormFieldType.Checkbox, - required: false, - tooltip: t('setting.jiraScopedTokenTip'), - }, - { - label: 'Jira User Email (Cloud) or User Name (Server)', - name: 'config.credentials.jira_user_email', - type: FormFieldType.Text, - required: true, - placeholder: 'you@example.com', - tooltip: t('setting.jiraEmailTip'), - }, - { - label: 'Jira API Token (Cloud only)', - name: 'config.credentials.jira_api_token', - type: FormFieldType.Password, - required: false, - tooltip: t('setting.jiraTokenTip'), - }, - { - label: 'Jira Password (Server only)', - name: 'config.credentials.jira_password', - type: FormFieldType.Password, - required: false, - tooltip: t('setting.jiraPasswordTip'), - }, - ], + [DataSourceKey.JIRA]: jiraConstant(t), [DataSourceKey.WEBDAV]: [ { label: 'WebDAV Server URL', @@ -1247,6 +1215,7 @@ export const DataSourceFormDefaultValues = { name: '', source: DataSourceKey.JIRA, config: { + is_cloud: true, base_url: '', project_key: '', jql_query: '', @@ -1259,6 +1228,7 @@ export const DataSourceFormDefaultValues = { scoped_token: false, credentials: { jira_user_email: '', + jira_username: '', jira_api_token: '', jira_password: '', }, diff --git a/web/src/pages/user-setting/data-source/constant/jira-constant.tsx b/web/src/pages/user-setting/data-source/constant/jira-constant.tsx new file mode 100644 index 000000000..31af61c47 --- /dev/null +++ b/web/src/pages/user-setting/data-source/constant/jira-constant.tsx @@ -0,0 +1,149 @@ +import { FormFieldType } from '@/components/dynamic-form'; +import { TFunction } from 'i18next'; + +export const jiraConstant = (t: TFunction) => [ + { + label: 'Jira User Email', + name: 'config.credentials.jira_user_email', + type: FormFieldType.Text, + required: true, + placeholder: 'you@example.com', + tooltip: t('setting.jiraEmailTip'), + shouldRender: (formValues: any) => formValues?.config?.is_cloud !== false, + customValidate: (val: string, formValues: any) => { + if (formValues?.config?.is_cloud !== false) { + return Boolean(val) || 'Jira User Email is required'; + } + return true; + }, + }, + { + label: 'Jira Username', + name: 'config.credentials.jira_username', + type: FormFieldType.Text, + required: true, + tooltip: t('setting.jiraEmailTip'), + shouldRender: (formValues: any) => formValues?.config?.is_cloud === false, + customValidate: (val: string, formValues: any) => { + if (formValues?.config?.is_cloud === false) { + return Boolean(val) || 'Jira Username is required'; + } + return true; + }, + }, + { + label: 'Jira Base URL', + name: 'config.base_url', + type: FormFieldType.Text, + required: true, + placeholder: 'https://your-domain.atlassian.net', + tooltip: t('setting.jiraBaseUrlTip'), + }, + { + label: 'Project Key', + name: 'config.project_key', + type: FormFieldType.Text, + required: false, + placeholder: 'RAGFlow', + tooltip: t('setting.jiraProjectKeyTip'), + }, + { + label: 'Custom JQL', + name: 'config.jql_query', + type: FormFieldType.Textarea, + required: false, + placeholder: 'project = RAG AND updated >= -7d', + tooltip: t('setting.jiraJqlTip'), + }, + { + label: 'Batch Size', + name: 'config.batch_size', + type: FormFieldType.Number, + required: false, + tooltip: t('setting.jiraBatchSizeTip'), + }, + { + label: 'Attachment Size Limit (bytes)', + name: 'config.attachment_size_limit', + type: FormFieldType.Number, + required: false, + defaultValue: 10 * 1024 * 1024, + tooltip: t('setting.jiraAttachmentSizeTip'), + }, + { + label: 'Labels to Skip', + name: 'config.labels_to_skip', + type: FormFieldType.Tag, + required: false, + tooltip: t('setting.jiraLabelsTip'), + }, + { + label: 'Comment Email Blacklist', + name: 'config.comment_email_blacklist', + type: FormFieldType.Tag, + required: false, + tooltip: t('setting.jiraBlacklistTip'), + }, + { + label: 'Include Comments', + name: 'config.include_comments', + type: FormFieldType.Checkbox, + required: false, + defaultValue: true, + tooltip: t('setting.jiraCommentsTip'), + }, + { + label: 'Include Attachments', + name: 'config.include_attachments', + type: FormFieldType.Checkbox, + required: false, + defaultValue: false, + tooltip: t('setting.jiraAttachmentsTip'), + }, + { + label: 'Mode', + name: 'config.is_cloud', + type: FormFieldType.Segmented, + options: [ + { label: 'Cloud', value: true }, + { label: 'Server', value: false }, + ], + defaultValue: true, + }, + { + label: 'Jira API Token', + name: 'config.credentials.jira_api_token', + type: FormFieldType.Password, + required: false, + tooltip: t('setting.jiraTokenTip'), + shouldRender: (formValues: any) => formValues?.config?.is_cloud !== false, + customValidate: (val: string, formValues: any) => { + if (formValues?.config?.is_cloud !== false) { + return Boolean(val) || 'Jira API Token is required'; + } + return true; + }, + }, + { + label: 'Jira Password', + name: 'config.credentials.jira_password', + type: FormFieldType.Password, + required: false, + tooltip: t('setting.jiraPasswordTip'), + shouldRender: (formValues: any) => formValues?.config?.is_cloud === false, + customValidate: (val: string, formValues: any) => { + if (formValues?.config?.is_cloud === false) { + return Boolean(val) || 'Jira Password is required'; + } + return true; + }, + }, + { + label: 'Use Scoped Token', + name: 'config.scoped_token', + type: FormFieldType.Checkbox, + required: false, + tooltip: t('setting.jiraScopedTokenTip'), + shouldRender: (formValues: any) => formValues?.config?.is_cloud !== false, + }, +]; diff --git a/web/src/pages/user-setting/data-source/data-source-detail-page/index.tsx b/web/src/pages/user-setting/data-source/data-source-detail-page/index.tsx index 64f44aff1..1a4554abe 100644 --- a/web/src/pages/user-setting/data-source/data-source-detail-page/index.tsx +++ b/web/src/pages/user-setting/data-source/data-source-detail-page/index.tsx @@ -17,9 +17,8 @@ import { FieldValues } from 'react-hook-form'; import { DataSourceFormBaseFields, DataSourceFormDefaultValues, - DataSourceFormFields, getCommonExtraDefaultValues, - getCommonExtraFields, + getDataSourceFieldsWithExtras, mergeDataSourceFormValues, useDataSourceInfo, } from '../constant'; @@ -166,10 +165,7 @@ const SourceDetailPage = () => { if (detail) { const fields = [ ...baseFields, - ...DataSourceFormFields[ - detail.source as keyof typeof DataSourceFormFields - ], - ...getCommonExtraFields(detail.source), + ...getDataSourceFieldsWithExtras(detail.source as any), ...customFields, ] as FormFieldConfig[];