Merge remote-tracking branch 'origin/feat/r2' into feat/r2

# Conflicts:
#	api/core/datasource/website_crawl/website_crawl_plugin.py
#	api/services/rag_pipeline/rag_pipeline.py
This commit is contained in:
jyong
2025-06-16 13:50:33 +08:00
9 changed files with 95 additions and 47 deletions

View File

@ -17,12 +17,11 @@ from core.app.entities.app_invoke_entities import InvokeFrom
from core.datasource.entities.datasource_entities import (
DatasourceInvokeMessage,
DatasourceProviderType,
GetOnlineDocumentPagesResponse,
GetWebsiteCrawlResponse,
OnlineDocumentPagesMessage,
WebsiteCrawlMessage,
)
from core.datasource.online_document.online_document_plugin import OnlineDocumentDatasourcePlugin
from core.datasource.website_crawl.website_crawl_plugin import WebsiteCrawlDatasourcePlugin
from core.model_runtime.utils.encoders import jsonable_encoder
from core.repositories.sqlalchemy_workflow_node_execution_repository import SQLAlchemyWorkflowNodeExecutionRepository
from core.variables.variables import Variable
from core.workflow.entities.node_entities import NodeRunResult
@ -44,14 +43,14 @@ from extensions.ext_database import db
from libs.infinite_scroll_pagination import InfiniteScrollPagination
from models.account import Account
from models.dataset import Document, Pipeline, PipelineCustomizedTemplate # type: ignore
from models.enums import CreatorUserRole, WorkflowRunTriggeredFrom
from models.enums import WorkflowRunTriggeredFrom
from models.model import EndUser
from models.oauth import DatasourceProvider
from models.workflow import (
Workflow,
WorkflowNodeExecutionModel,
WorkflowNodeExecutionTriggeredFrom,
WorkflowRun,
WorkflowType, WorkflowNodeExecutionModel,
WorkflowType,
)
from services.dataset_service import DatasetService
from services.datasource_provider_service import DatasourceProviderService
@ -424,6 +423,65 @@ class RagPipelineService:
return workflow_node_execution
def run_datasource_workflow_node_status(
self, pipeline: Pipeline, node_id: str, job_id: str, account: Account, datasource_type: str, is_published: bool
) -> dict:
"""
Run published workflow datasource
"""
if is_published:
# fetch published workflow by app_model
workflow = self.get_published_workflow(pipeline=pipeline)
else:
workflow = self.get_draft_workflow(pipeline=pipeline)
if not workflow:
raise ValueError("Workflow not initialized")
# run draft workflow node
datasource_node_data = None
start_at = time.perf_counter()
datasource_nodes = workflow.graph_dict.get("nodes", [])
for datasource_node in datasource_nodes:
if datasource_node.get("id") == node_id:
datasource_node_data = datasource_node.get("data", {})
break
if not datasource_node_data:
raise ValueError("Datasource node data not found")
from core.datasource.datasource_manager import DatasourceManager
datasource_runtime = DatasourceManager.get_datasource_runtime(
provider_id=f"{datasource_node_data.get('plugin_id')}/{datasource_node_data.get('provider_name')}",
datasource_name=datasource_node_data.get("datasource_name"),
tenant_id=pipeline.tenant_id,
datasource_type=DatasourceProviderType(datasource_type),
)
datasource_provider_service = DatasourceProviderService()
credentials = datasource_provider_service.get_real_datasource_credentials(
tenant_id=pipeline.tenant_id,
provider=datasource_node_data.get('provider_name'),
plugin_id=datasource_node_data.get('plugin_id'),
)
if credentials:
datasource_runtime.runtime.credentials = credentials[0].get("credentials")
match datasource_type:
case DatasourceProviderType.WEBSITE_CRAWL:
datasource_runtime = cast(WebsiteCrawlDatasourcePlugin, datasource_runtime)
website_crawl_results: list[WebsiteCrawlMessage] = []
for website_message in datasource_runtime.get_website_crawl(
user_id=account.id,
datasource_parameters={"job_id": job_id},
provider_type=datasource_runtime.datasource_provider_type(),
):
website_crawl_results.append(website_message)
return {
"result": [result for result in website_crawl_results.result],
"status": website_crawl_results.result.status,
"provider_type": datasource_node_data.get("provider_type"),
}
case _:
raise ValueError(f"Unsupported datasource provider: {datasource_runtime.datasource_provider_type}")
def run_datasource_workflow_node(
self, pipeline: Pipeline, node_id: str, user_inputs: dict, account: Account, datasource_type: str,