diff --git a/api/configs/feature/__init__.py b/api/configs/feature/__init__.py index a3da5c1b49..9294b479e9 100644 --- a/api/configs/feature/__init__.py +++ b/api/configs/feature/__init__.py @@ -667,7 +667,6 @@ class MailConfig(BaseSettings): class RagEtlConfig(BaseSettings): """ Configuration for RAG ETL processes - """ # TODO: This config is not only for rag etl, it is also for file upload, we should move it to file upload config ETL_TYPE: str = Field( diff --git a/api/core/app/apps/pipeline/pipeline_generator.py b/api/core/app/apps/pipeline/pipeline_generator.py index 19ded1696a..d56e5243b3 100644 --- a/api/core/app/apps/pipeline/pipeline_generator.py +++ b/api/core/app/apps/pipeline/pipeline_generator.py @@ -55,7 +55,8 @@ class PipelineGenerator(BaseAppGenerator): streaming: Literal[True], call_depth: int, workflow_thread_pool_id: Optional[str], - ) -> Mapping[str, Any] | Generator[Mapping | str, None, None] | None: ... + ) -> Mapping[str, Any] | Generator[Mapping | str, None, None] | None: + ... @overload def generate( @@ -69,7 +70,8 @@ class PipelineGenerator(BaseAppGenerator): streaming: Literal[False], call_depth: int, workflow_thread_pool_id: Optional[str], - ) -> Mapping[str, Any]: ... + ) -> Mapping[str, Any]: + ... @overload def generate( @@ -83,7 +85,8 @@ class PipelineGenerator(BaseAppGenerator): streaming: bool, call_depth: int, workflow_thread_pool_id: Optional[str], - ) -> Union[Mapping[str, Any], Generator[Mapping | str, None, None]]: ... + ) -> Union[Mapping[str, Any], Generator[Mapping | str, None, None]]: + ... def generate( self, @@ -184,7 +187,7 @@ class PipelineGenerator(BaseAppGenerator): ) if invoke_from == InvokeFrom.DEBUGGER: return self._generate( - flask_app=current_app._get_current_object(),# type: ignore + flask_app=current_app._get_current_object(), # type: ignore context=contextvars.copy_context(), pipeline=pipeline, workflow_id=workflow.id, @@ -199,6 +202,7 @@ class PipelineGenerator(BaseAppGenerator): else: # run in child thread context = contextvars.copy_context() + @copy_current_request_context def worker_with_context(): # Run the worker within the copied context @@ -222,24 +226,25 @@ class PipelineGenerator(BaseAppGenerator): worker_thread.start() # return batch, dataset, documents return { - "batch": batch, - "dataset": PipelineDataset( - id=dataset.id, - name=dataset.name, - description=dataset.description, - chunk_structure=dataset.chunk_structure, - ).model_dump(), - "documents": [PipelineDocument( - id=document.id, - position=document.position, - data_source_info=json.loads(document.data_source_info) if document.data_source_info else None, - name=document.name, - indexing_status=document.indexing_status, - error=document.error, - enabled=document.enabled, - ).model_dump() for document in documents - ] - } + "batch": batch, + "dataset": PipelineDataset( + id=dataset.id, + name=dataset.name, + description=dataset.description, + chunk_structure=dataset.chunk_structure, + ).model_dump(), + "documents": [PipelineDocument( + id=document.id, + position=document.position, + data_source_info=json.loads(document.data_source_info) if document.data_source_info else None, + name=document.name, + indexing_status=document.indexing_status, + error=document.error, + enabled=document.enabled, + ).model_dump() for document in documents + ] + } + def _generate( self, *, @@ -268,6 +273,7 @@ class PipelineGenerator(BaseAppGenerator): :param streaming: is stream :param workflow_thread_pool_id: workflow thread pool id """ + print("jin ru la 1") for var, val in context.items(): var.set(val) @@ -279,6 +285,7 @@ class PipelineGenerator(BaseAppGenerator): saved_user = g._login_user with flask_app.app_context(): # Restore user in new app context + print("jin ru la 2") if saved_user is not None: from flask import g @@ -306,6 +313,7 @@ class PipelineGenerator(BaseAppGenerator): application_generate_entity=application_generate_entity, workflow_thread_pool_id=workflow_thread_pool_id, ) + # new thread worker_thread = threading.Thread( target=worker_with_context @@ -396,7 +404,7 @@ class PipelineGenerator(BaseAppGenerator): ) return self._generate( - flask_app=current_app._get_current_object(),# type: ignore + flask_app=current_app._get_current_object(), # type: ignore pipeline=pipeline, workflow=workflow, user=user, @@ -479,7 +487,7 @@ class PipelineGenerator(BaseAppGenerator): ) return self._generate( - flask_app=current_app._get_current_object(),# type: ignore + flask_app=current_app._get_current_object(), # type: ignore pipeline=pipeline, workflow=workflow, user=user, @@ -506,6 +514,7 @@ class PipelineGenerator(BaseAppGenerator): :param workflow_thread_pool_id: workflow thread pool id :return: """ + print("jin ru la 3") for var, val in context.items(): var.set(val) from flask import g