mirror of
https://github.com/langgenius/dify.git
synced 2026-05-04 01:18:05 +08:00
Compare commits
10 Commits
revert-120
...
feat/retry
| Author | SHA1 | Date | |
|---|---|---|---|
| 81375088e9 | |||
| cfade297e8 | |||
| 8933dd85bf | |||
| 84ac004772 | |||
| bb35818976 | |||
| 822af70dce | |||
| d5f33212ac | |||
| db2aa83a7c | |||
| ae5e8d3160 | |||
| fc6c0317a5 |
@ -587,7 +587,7 @@ def upgrade_db():
|
||||
click.echo(click.style("Starting database migration.", fg="green"))
|
||||
|
||||
# run db migration
|
||||
import flask_migrate
|
||||
import flask_migrate # type: ignore
|
||||
|
||||
flask_migrate.upgrade()
|
||||
|
||||
|
||||
@ -440,6 +440,31 @@ class WorkflowConfigApi(Resource):
|
||||
}
|
||||
|
||||
|
||||
class DraftWorkflowNodeRetriableApi(Resource):
|
||||
@setup_required
|
||||
@login_required
|
||||
@account_initialization_required
|
||||
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
|
||||
@marshal_with(workflow_run_node_execution_fields)
|
||||
def post(self, app_model: App, node_id: str):
|
||||
"""
|
||||
Run draft workflow node
|
||||
"""
|
||||
# The role of the current user in the ta table must be admin, owner, or editor
|
||||
if not current_user.is_editor:
|
||||
raise Forbidden()
|
||||
|
||||
parser = reqparse.RequestParser()
|
||||
parser.add_argument("inputs", type=dict, required=True, nullable=False, location="json")
|
||||
args = parser.parse_args()
|
||||
workflow_service = WorkflowService()
|
||||
workflow_node_execution = workflow_service.run_retriable_draft_workflow_node(
|
||||
app_model=app_model, node_id=node_id, user_inputs=args.get("inputs", {}), account=current_user
|
||||
)
|
||||
|
||||
return workflow_node_execution
|
||||
|
||||
|
||||
api.add_resource(DraftWorkflowApi, "/apps/<uuid:app_id>/workflows/draft")
|
||||
api.add_resource(WorkflowConfigApi, "/apps/<uuid:app_id>/workflows/draft/config")
|
||||
api.add_resource(AdvancedChatDraftWorkflowRunApi, "/apps/<uuid:app_id>/advanced-chat/workflows/draft/run")
|
||||
@ -459,3 +484,4 @@ api.add_resource(
|
||||
DefaultBlockConfigApi, "/apps/<uuid:app_id>/workflows/default-workflow-block-configs/<string:block_type>"
|
||||
)
|
||||
api.add_resource(ConvertToWorkflowApi, "/apps/<uuid:app_id>/convert-to-workflow")
|
||||
api.add_resource(DraftWorkflowNodeRetriableApi, "/apps/<uuid:app_id>/workflows/draft/retry/nodes/<string:node_id>/run")
|
||||
|
||||
@ -413,7 +413,7 @@ class DocumentIndexingEstimateApi(DocumentResource):
|
||||
indexing_runner = IndexingRunner()
|
||||
|
||||
try:
|
||||
response = indexing_runner.indexing_estimate(
|
||||
estimate_response = indexing_runner.indexing_estimate(
|
||||
current_user.current_tenant_id,
|
||||
[extract_setting],
|
||||
data_process_rule_dict,
|
||||
@ -421,6 +421,7 @@ class DocumentIndexingEstimateApi(DocumentResource):
|
||||
"English",
|
||||
dataset_id,
|
||||
)
|
||||
return estimate_response.model_dump(), 200
|
||||
except LLMBadRequestError:
|
||||
raise ProviderNotInitializeError(
|
||||
"No Embedding Model available. Please configure a valid provider "
|
||||
@ -431,7 +432,7 @@ class DocumentIndexingEstimateApi(DocumentResource):
|
||||
except Exception as e:
|
||||
raise IndexingEstimateError(str(e))
|
||||
|
||||
return response.model_dump(), 200
|
||||
return response, 200
|
||||
|
||||
|
||||
class DocumentBatchIndexingEstimateApi(DocumentResource):
|
||||
@ -442,9 +443,8 @@ class DocumentBatchIndexingEstimateApi(DocumentResource):
|
||||
dataset_id = str(dataset_id)
|
||||
batch = str(batch)
|
||||
documents = self.get_batch_documents(dataset_id, batch)
|
||||
response = {"tokens": 0, "total_price": 0, "currency": "USD", "total_segments": 0, "preview": []}
|
||||
if not documents:
|
||||
return response, 200
|
||||
return {"tokens": 0, "total_price": 0, "currency": "USD", "total_segments": 0, "preview": []}, 200
|
||||
data_process_rule = documents[0].dataset_process_rule
|
||||
data_process_rule_dict = data_process_rule.to_dict()
|
||||
info_list = []
|
||||
@ -522,6 +522,7 @@ class DocumentBatchIndexingEstimateApi(DocumentResource):
|
||||
"English",
|
||||
dataset_id,
|
||||
)
|
||||
return response.model_dump(), 200
|
||||
except LLMBadRequestError:
|
||||
raise ProviderNotInitializeError(
|
||||
"No Embedding Model available. Please configure a valid provider "
|
||||
@ -531,7 +532,6 @@ class DocumentBatchIndexingEstimateApi(DocumentResource):
|
||||
raise ProviderNotInitializeError(ex.description)
|
||||
except Exception as e:
|
||||
raise IndexingEstimateError(str(e))
|
||||
return response.model_dump(), 200
|
||||
|
||||
|
||||
class DocumentBatchIndexingStatusApi(DocumentResource):
|
||||
|
||||
@ -22,6 +22,7 @@ from fields.document_fields import document_fields, document_status_fields
|
||||
from libs.login import current_user
|
||||
from models.dataset import Dataset, Document, DocumentSegment
|
||||
from services.dataset_service import DocumentService
|
||||
from services.entities.knowledge_entities.knowledge_entities import KnowledgeConfig
|
||||
from services.file_service import FileService
|
||||
|
||||
|
||||
@ -67,13 +68,14 @@ class DocumentAddByTextApi(DatasetApiResource):
|
||||
"info_list": {"data_source_type": "upload_file", "file_info_list": {"file_ids": [upload_file.id]}},
|
||||
}
|
||||
args["data_source"] = data_source
|
||||
knowledge_config = KnowledgeConfig(**args)
|
||||
# validate args
|
||||
DocumentService.document_create_args_validate(args)
|
||||
DocumentService.document_create_args_validate(knowledge_config)
|
||||
|
||||
try:
|
||||
documents, batch = DocumentService.save_document_with_dataset_id(
|
||||
dataset=dataset,
|
||||
document_data=args,
|
||||
knowledge_config=knowledge_config,
|
||||
account=current_user,
|
||||
dataset_process_rule=dataset.latest_process_rule if "process_rule" not in args else None,
|
||||
created_from="api",
|
||||
@ -122,12 +124,13 @@ class DocumentUpdateByTextApi(DatasetApiResource):
|
||||
args["data_source"] = data_source
|
||||
# validate args
|
||||
args["original_document_id"] = str(document_id)
|
||||
DocumentService.document_create_args_validate(args)
|
||||
knowledge_config = KnowledgeConfig(**args)
|
||||
DocumentService.document_create_args_validate(knowledge_config)
|
||||
|
||||
try:
|
||||
documents, batch = DocumentService.save_document_with_dataset_id(
|
||||
dataset=dataset,
|
||||
document_data=args,
|
||||
knowledge_config=knowledge_config,
|
||||
account=current_user,
|
||||
dataset_process_rule=dataset.latest_process_rule if "process_rule" not in args else None,
|
||||
created_from="api",
|
||||
@ -186,12 +189,13 @@ class DocumentAddByFileApi(DatasetApiResource):
|
||||
data_source = {"type": "upload_file", "info_list": {"file_info_list": {"file_ids": [upload_file.id]}}}
|
||||
args["data_source"] = data_source
|
||||
# validate args
|
||||
DocumentService.document_create_args_validate(args)
|
||||
knowledge_config = KnowledgeConfig(**args)
|
||||
DocumentService.document_create_args_validate(knowledge_config)
|
||||
|
||||
try:
|
||||
documents, batch = DocumentService.save_document_with_dataset_id(
|
||||
dataset=dataset,
|
||||
document_data=args,
|
||||
knowledge_config=knowledge_config,
|
||||
account=dataset.created_by_account,
|
||||
dataset_process_rule=dataset.latest_process_rule if "process_rule" not in args else None,
|
||||
created_from="api",
|
||||
@ -245,12 +249,14 @@ class DocumentUpdateByFileApi(DatasetApiResource):
|
||||
args["data_source"] = data_source
|
||||
# validate args
|
||||
args["original_document_id"] = str(document_id)
|
||||
DocumentService.document_create_args_validate(args)
|
||||
|
||||
knowledge_config = KnowledgeConfig(**args)
|
||||
DocumentService.document_create_args_validate(knowledge_config)
|
||||
|
||||
try:
|
||||
documents, batch = DocumentService.save_document_with_dataset_id(
|
||||
dataset=dataset,
|
||||
document_data=args,
|
||||
knowledge_config=knowledge_config,
|
||||
account=dataset.created_by_account,
|
||||
dataset_process_rule=dataset.latest_process_rule if "process_rule" not in args else None,
|
||||
created_from="api",
|
||||
|
||||
@ -68,7 +68,6 @@ from models.enums import CreatedByRole
|
||||
from models.workflow import (
|
||||
Workflow,
|
||||
WorkflowNodeExecution,
|
||||
WorkflowRun,
|
||||
WorkflowRunStatus,
|
||||
)
|
||||
|
||||
@ -104,10 +103,12 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
|
||||
)
|
||||
|
||||
if isinstance(user, EndUser):
|
||||
self._user_id = user.session_id
|
||||
self._user_id = user.id
|
||||
user_session_id = user.session_id
|
||||
self._created_by_role = CreatedByRole.END_USER
|
||||
elif isinstance(user, Account):
|
||||
self._user_id = user.id
|
||||
user_session_id = user.id
|
||||
self._created_by_role = CreatedByRole.ACCOUNT
|
||||
else:
|
||||
raise NotImplementedError(f"User type not supported: {type(user)}")
|
||||
@ -125,7 +126,7 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
|
||||
SystemVariableKey.QUERY: message.query,
|
||||
SystemVariableKey.FILES: application_generate_entity.files,
|
||||
SystemVariableKey.CONVERSATION_ID: conversation.id,
|
||||
SystemVariableKey.USER_ID: self._user_id,
|
||||
SystemVariableKey.USER_ID: user_session_id,
|
||||
SystemVariableKey.DIALOGUE_COUNT: dialogue_count,
|
||||
SystemVariableKey.APP_ID: application_generate_entity.app_config.app_id,
|
||||
SystemVariableKey.WORKFLOW_ID: workflow.id,
|
||||
@ -137,6 +138,7 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
|
||||
|
||||
self._conversation_name_generate_thread = None
|
||||
self._recorded_files: list[Mapping[str, Any]] = []
|
||||
self._workflow_run_id = ""
|
||||
|
||||
def process(self) -> Union[ChatbotAppBlockingResponse, Generator[ChatbotAppStreamResponse, None, None]]:
|
||||
"""
|
||||
@ -266,7 +268,6 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
|
||||
"""
|
||||
# init fake graph runtime state
|
||||
graph_runtime_state: Optional[GraphRuntimeState] = None
|
||||
workflow_run: Optional[WorkflowRun] = None
|
||||
|
||||
for queue_message in self._queue_manager.listen():
|
||||
event = queue_message.event
|
||||
@ -291,111 +292,163 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
|
||||
user_id=self._user_id,
|
||||
created_by_role=self._created_by_role,
|
||||
)
|
||||
self._workflow_run_id = workflow_run.id
|
||||
message = self._get_message(session=session)
|
||||
if not message:
|
||||
raise ValueError(f"Message not found: {self._message_id}")
|
||||
message.workflow_run_id = workflow_run.id
|
||||
session.commit()
|
||||
|
||||
workflow_start_resp = self._workflow_start_to_stream_response(
|
||||
session=session, task_id=self._application_generate_entity.task_id, workflow_run=workflow_run
|
||||
)
|
||||
session.commit()
|
||||
|
||||
yield workflow_start_resp
|
||||
elif isinstance(
|
||||
event,
|
||||
QueueNodeRetryEvent,
|
||||
):
|
||||
if not workflow_run:
|
||||
if not self._workflow_run_id:
|
||||
raise ValueError("workflow run not initialized.")
|
||||
workflow_node_execution = self._handle_workflow_node_execution_retried(
|
||||
workflow_run=workflow_run, event=event
|
||||
)
|
||||
|
||||
node_retry_resp = self._workflow_node_retry_to_stream_response(
|
||||
event=event,
|
||||
task_id=self._application_generate_entity.task_id,
|
||||
workflow_node_execution=workflow_node_execution,
|
||||
)
|
||||
with Session(db.engine) as session:
|
||||
workflow_run = self._get_workflow_run(session=session, workflow_run_id=self._workflow_run_id)
|
||||
workflow_node_execution = self._handle_workflow_node_execution_retried(
|
||||
session=session, workflow_run=workflow_run, event=event
|
||||
)
|
||||
node_retry_resp = self._workflow_node_retry_to_stream_response(
|
||||
session=session,
|
||||
event=event,
|
||||
task_id=self._application_generate_entity.task_id,
|
||||
workflow_node_execution=workflow_node_execution,
|
||||
)
|
||||
session.commit()
|
||||
|
||||
if node_retry_resp:
|
||||
yield node_retry_resp
|
||||
elif isinstance(event, QueueNodeStartedEvent):
|
||||
if not workflow_run:
|
||||
if not self._workflow_run_id:
|
||||
raise ValueError("workflow run not initialized.")
|
||||
|
||||
workflow_node_execution = self._handle_node_execution_start(workflow_run=workflow_run, event=event)
|
||||
with Session(db.engine) as session:
|
||||
workflow_run = self._get_workflow_run(session=session, workflow_run_id=self._workflow_run_id)
|
||||
workflow_node_execution = self._handle_node_execution_start(
|
||||
session=session, workflow_run=workflow_run, event=event
|
||||
)
|
||||
|
||||
node_start_resp = self._workflow_node_start_to_stream_response(
|
||||
event=event,
|
||||
task_id=self._application_generate_entity.task_id,
|
||||
workflow_node_execution=workflow_node_execution,
|
||||
)
|
||||
node_start_resp = self._workflow_node_start_to_stream_response(
|
||||
session=session,
|
||||
event=event,
|
||||
task_id=self._application_generate_entity.task_id,
|
||||
workflow_node_execution=workflow_node_execution,
|
||||
)
|
||||
session.commit()
|
||||
|
||||
if node_start_resp:
|
||||
yield node_start_resp
|
||||
elif isinstance(event, QueueNodeSucceededEvent):
|
||||
workflow_node_execution = self._handle_workflow_node_execution_success(event)
|
||||
|
||||
# Record files if it's an answer node or end node
|
||||
if event.node_type in [NodeType.ANSWER, NodeType.END]:
|
||||
self._recorded_files.extend(self._fetch_files_from_node_outputs(event.outputs or {}))
|
||||
|
||||
node_finish_resp = self._workflow_node_finish_to_stream_response(
|
||||
event=event,
|
||||
task_id=self._application_generate_entity.task_id,
|
||||
workflow_node_execution=workflow_node_execution,
|
||||
)
|
||||
with Session(db.engine) as session:
|
||||
workflow_node_execution = self._handle_workflow_node_execution_success(session=session, event=event)
|
||||
|
||||
node_finish_resp = self._workflow_node_finish_to_stream_response(
|
||||
session=session,
|
||||
event=event,
|
||||
task_id=self._application_generate_entity.task_id,
|
||||
workflow_node_execution=workflow_node_execution,
|
||||
)
|
||||
session.commit()
|
||||
|
||||
if node_finish_resp:
|
||||
yield node_finish_resp
|
||||
elif isinstance(event, QueueNodeFailedEvent | QueueNodeInIterationFailedEvent | QueueNodeExceptionEvent):
|
||||
workflow_node_execution = self._handle_workflow_node_execution_failed(event)
|
||||
with Session(db.engine) as session:
|
||||
workflow_node_execution = self._handle_workflow_node_execution_failed(session=session, event=event)
|
||||
|
||||
node_finish_resp = self._workflow_node_finish_to_stream_response(
|
||||
session=session,
|
||||
event=event,
|
||||
task_id=self._application_generate_entity.task_id,
|
||||
workflow_node_execution=workflow_node_execution,
|
||||
)
|
||||
session.commit()
|
||||
|
||||
node_finish_resp = self._workflow_node_finish_to_stream_response(
|
||||
event=event,
|
||||
task_id=self._application_generate_entity.task_id,
|
||||
workflow_node_execution=workflow_node_execution,
|
||||
)
|
||||
if node_finish_resp:
|
||||
yield node_finish_resp
|
||||
|
||||
elif isinstance(event, QueueParallelBranchRunStartedEvent):
|
||||
if not workflow_run:
|
||||
if not self._workflow_run_id:
|
||||
raise ValueError("workflow run not initialized.")
|
||||
|
||||
yield self._workflow_parallel_branch_start_to_stream_response(
|
||||
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
|
||||
)
|
||||
with Session(db.engine) as session:
|
||||
workflow_run = self._get_workflow_run(session=session, workflow_run_id=self._workflow_run_id)
|
||||
parallel_start_resp = self._workflow_parallel_branch_start_to_stream_response(
|
||||
session=session,
|
||||
task_id=self._application_generate_entity.task_id,
|
||||
workflow_run=workflow_run,
|
||||
event=event,
|
||||
)
|
||||
|
||||
yield parallel_start_resp
|
||||
elif isinstance(event, QueueParallelBranchRunSucceededEvent | QueueParallelBranchRunFailedEvent):
|
||||
if not workflow_run:
|
||||
if not self._workflow_run_id:
|
||||
raise ValueError("workflow run not initialized.")
|
||||
|
||||
yield self._workflow_parallel_branch_finished_to_stream_response(
|
||||
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
|
||||
)
|
||||
with Session(db.engine) as session:
|
||||
workflow_run = self._get_workflow_run(session=session, workflow_run_id=self._workflow_run_id)
|
||||
parallel_finish_resp = self._workflow_parallel_branch_finished_to_stream_response(
|
||||
session=session,
|
||||
task_id=self._application_generate_entity.task_id,
|
||||
workflow_run=workflow_run,
|
||||
event=event,
|
||||
)
|
||||
|
||||
yield parallel_finish_resp
|
||||
elif isinstance(event, QueueIterationStartEvent):
|
||||
if not workflow_run:
|
||||
if not self._workflow_run_id:
|
||||
raise ValueError("workflow run not initialized.")
|
||||
|
||||
yield self._workflow_iteration_start_to_stream_response(
|
||||
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
|
||||
)
|
||||
with Session(db.engine) as session:
|
||||
workflow_run = self._get_workflow_run(session=session, workflow_run_id=self._workflow_run_id)
|
||||
iter_start_resp = self._workflow_iteration_start_to_stream_response(
|
||||
session=session,
|
||||
task_id=self._application_generate_entity.task_id,
|
||||
workflow_run=workflow_run,
|
||||
event=event,
|
||||
)
|
||||
|
||||
yield iter_start_resp
|
||||
elif isinstance(event, QueueIterationNextEvent):
|
||||
if not workflow_run:
|
||||
if not self._workflow_run_id:
|
||||
raise ValueError("workflow run not initialized.")
|
||||
|
||||
yield self._workflow_iteration_next_to_stream_response(
|
||||
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
|
||||
)
|
||||
with Session(db.engine) as session:
|
||||
workflow_run = self._get_workflow_run(session=session, workflow_run_id=self._workflow_run_id)
|
||||
iter_next_resp = self._workflow_iteration_next_to_stream_response(
|
||||
session=session,
|
||||
task_id=self._application_generate_entity.task_id,
|
||||
workflow_run=workflow_run,
|
||||
event=event,
|
||||
)
|
||||
|
||||
yield iter_next_resp
|
||||
elif isinstance(event, QueueIterationCompletedEvent):
|
||||
if not workflow_run:
|
||||
if not self._workflow_run_id:
|
||||
raise ValueError("workflow run not initialized.")
|
||||
|
||||
yield self._workflow_iteration_completed_to_stream_response(
|
||||
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
|
||||
)
|
||||
with Session(db.engine) as session:
|
||||
workflow_run = self._get_workflow_run(session=session, workflow_run_id=self._workflow_run_id)
|
||||
iter_finish_resp = self._workflow_iteration_completed_to_stream_response(
|
||||
session=session,
|
||||
task_id=self._application_generate_entity.task_id,
|
||||
workflow_run=workflow_run,
|
||||
event=event,
|
||||
)
|
||||
|
||||
yield iter_finish_resp
|
||||
elif isinstance(event, QueueWorkflowSucceededEvent):
|
||||
if not workflow_run:
|
||||
if not self._workflow_run_id:
|
||||
raise ValueError("workflow run not initialized.")
|
||||
|
||||
if not graph_runtime_state:
|
||||
@ -404,7 +457,7 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
|
||||
with Session(db.engine) as session:
|
||||
workflow_run = self._handle_workflow_run_success(
|
||||
session=session,
|
||||
workflow_run=workflow_run,
|
||||
workflow_run_id=self._workflow_run_id,
|
||||
start_at=graph_runtime_state.start_at,
|
||||
total_tokens=graph_runtime_state.total_tokens,
|
||||
total_steps=graph_runtime_state.node_run_steps,
|
||||
@ -421,16 +474,15 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
|
||||
yield workflow_finish_resp
|
||||
self._queue_manager.publish(QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE)
|
||||
elif isinstance(event, QueueWorkflowPartialSuccessEvent):
|
||||
if not workflow_run:
|
||||
if not self._workflow_run_id:
|
||||
raise ValueError("workflow run not initialized.")
|
||||
|
||||
if not graph_runtime_state:
|
||||
raise ValueError("graph runtime state not initialized.")
|
||||
|
||||
with Session(db.engine) as session:
|
||||
workflow_run = self._handle_workflow_run_partial_success(
|
||||
session=session,
|
||||
workflow_run=workflow_run,
|
||||
workflow_run_id=self._workflow_run_id,
|
||||
start_at=graph_runtime_state.start_at,
|
||||
total_tokens=graph_runtime_state.total_tokens,
|
||||
total_steps=graph_runtime_state.node_run_steps,
|
||||
@ -439,7 +491,6 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
|
||||
conversation_id=None,
|
||||
trace_manager=trace_manager,
|
||||
)
|
||||
|
||||
workflow_finish_resp = self._workflow_finish_to_stream_response(
|
||||
session=session, task_id=self._application_generate_entity.task_id, workflow_run=workflow_run
|
||||
)
|
||||
@ -448,16 +499,15 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
|
||||
yield workflow_finish_resp
|
||||
self._queue_manager.publish(QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE)
|
||||
elif isinstance(event, QueueWorkflowFailedEvent):
|
||||
if not workflow_run:
|
||||
if not self._workflow_run_id:
|
||||
raise ValueError("workflow run not initialized.")
|
||||
|
||||
if not graph_runtime_state:
|
||||
raise ValueError("graph runtime state not initialized.")
|
||||
|
||||
with Session(db.engine) as session:
|
||||
workflow_run = self._handle_workflow_run_failed(
|
||||
session=session,
|
||||
workflow_run=workflow_run,
|
||||
workflow_run_id=self._workflow_run_id,
|
||||
start_at=graph_runtime_state.start_at,
|
||||
total_tokens=graph_runtime_state.total_tokens,
|
||||
total_steps=graph_runtime_state.node_run_steps,
|
||||
@ -473,15 +523,16 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
|
||||
err_event = QueueErrorEvent(error=ValueError(f"Run failed: {workflow_run.error}"))
|
||||
err = self._handle_error(event=err_event, session=session, message_id=self._message_id)
|
||||
session.commit()
|
||||
|
||||
yield workflow_finish_resp
|
||||
yield self._error_to_stream_response(err)
|
||||
break
|
||||
elif isinstance(event, QueueStopEvent):
|
||||
if workflow_run and graph_runtime_state:
|
||||
if self._workflow_run_id and graph_runtime_state:
|
||||
with Session(db.engine) as session:
|
||||
workflow_run = self._handle_workflow_run_failed(
|
||||
session=session,
|
||||
workflow_run=workflow_run,
|
||||
workflow_run_id=self._workflow_run_id,
|
||||
start_at=graph_runtime_state.start_at,
|
||||
total_tokens=graph_runtime_state.total_tokens,
|
||||
total_steps=graph_runtime_state.node_run_steps,
|
||||
@ -490,7 +541,6 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
|
||||
conversation_id=self._conversation_id,
|
||||
trace_manager=trace_manager,
|
||||
)
|
||||
|
||||
workflow_finish_resp = self._workflow_finish_to_stream_response(
|
||||
session=session,
|
||||
task_id=self._application_generate_entity.task_id,
|
||||
@ -499,6 +549,7 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
|
||||
# Save message
|
||||
self._save_message(session=session, graph_runtime_state=graph_runtime_state)
|
||||
session.commit()
|
||||
|
||||
yield workflow_finish_resp
|
||||
|
||||
yield self._message_end_to_stream_response()
|
||||
|
||||
@ -91,10 +91,12 @@ class WorkflowAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCycleMa
|
||||
)
|
||||
|
||||
if isinstance(user, EndUser):
|
||||
self._user_id = user.session_id
|
||||
self._user_id = user.id
|
||||
user_session_id = user.session_id
|
||||
self._created_by_role = CreatedByRole.END_USER
|
||||
elif isinstance(user, Account):
|
||||
self._user_id = user.id
|
||||
user_session_id = user.id
|
||||
self._created_by_role = CreatedByRole.ACCOUNT
|
||||
else:
|
||||
raise ValueError(f"Invalid user type: {type(user)}")
|
||||
@ -104,7 +106,7 @@ class WorkflowAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCycleMa
|
||||
|
||||
self._workflow_system_variables = {
|
||||
SystemVariableKey.FILES: application_generate_entity.files,
|
||||
SystemVariableKey.USER_ID: self._user_id,
|
||||
SystemVariableKey.USER_ID: user_session_id,
|
||||
SystemVariableKey.APP_ID: application_generate_entity.app_config.app_id,
|
||||
SystemVariableKey.WORKFLOW_ID: workflow.id,
|
||||
SystemVariableKey.WORKFLOW_RUN_ID: application_generate_entity.workflow_run_id,
|
||||
@ -112,6 +114,7 @@ class WorkflowAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCycleMa
|
||||
|
||||
self._task_state = WorkflowTaskState()
|
||||
self._wip_workflow_node_executions = {}
|
||||
self._workflow_run_id = ""
|
||||
|
||||
def process(self) -> Union[WorkflowAppBlockingResponse, Generator[WorkflowAppStreamResponse, None, None]]:
|
||||
"""
|
||||
@ -233,7 +236,6 @@ class WorkflowAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCycleMa
|
||||
:return:
|
||||
"""
|
||||
graph_runtime_state = None
|
||||
workflow_run = None
|
||||
|
||||
for queue_message in self._queue_manager.listen():
|
||||
event = queue_message.event
|
||||
@ -256,111 +258,168 @@ class WorkflowAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCycleMa
|
||||
user_id=self._user_id,
|
||||
created_by_role=self._created_by_role,
|
||||
)
|
||||
self._workflow_run_id = workflow_run.id
|
||||
start_resp = self._workflow_start_to_stream_response(
|
||||
session=session, task_id=self._application_generate_entity.task_id, workflow_run=workflow_run
|
||||
)
|
||||
session.commit()
|
||||
|
||||
yield start_resp
|
||||
elif isinstance(
|
||||
event,
|
||||
QueueNodeRetryEvent,
|
||||
):
|
||||
if not workflow_run:
|
||||
if not self._workflow_run_id:
|
||||
raise ValueError("workflow run not initialized.")
|
||||
workflow_node_execution = self._handle_workflow_node_execution_retried(
|
||||
workflow_run=workflow_run, event=event
|
||||
)
|
||||
|
||||
response = self._workflow_node_retry_to_stream_response(
|
||||
event=event,
|
||||
task_id=self._application_generate_entity.task_id,
|
||||
workflow_node_execution=workflow_node_execution,
|
||||
)
|
||||
with Session(db.engine) as session:
|
||||
workflow_run = self._get_workflow_run(session=session, workflow_run_id=self._workflow_run_id)
|
||||
workflow_node_execution = self._handle_workflow_node_execution_retried(
|
||||
session=session, workflow_run=workflow_run, event=event
|
||||
)
|
||||
response = self._workflow_node_retry_to_stream_response(
|
||||
session=session,
|
||||
event=event,
|
||||
task_id=self._application_generate_entity.task_id,
|
||||
workflow_node_execution=workflow_node_execution,
|
||||
)
|
||||
session.commit()
|
||||
|
||||
if response:
|
||||
yield response
|
||||
elif isinstance(event, QueueNodeStartedEvent):
|
||||
if not workflow_run:
|
||||
if not self._workflow_run_id:
|
||||
raise ValueError("workflow run not initialized.")
|
||||
|
||||
workflow_node_execution = self._handle_node_execution_start(workflow_run=workflow_run, event=event)
|
||||
|
||||
node_start_response = self._workflow_node_start_to_stream_response(
|
||||
event=event,
|
||||
task_id=self._application_generate_entity.task_id,
|
||||
workflow_node_execution=workflow_node_execution,
|
||||
)
|
||||
with Session(db.engine) as session:
|
||||
workflow_run = self._get_workflow_run(session=session, workflow_run_id=self._workflow_run_id)
|
||||
workflow_node_execution = self._handle_node_execution_start(
|
||||
session=session, workflow_run=workflow_run, event=event
|
||||
)
|
||||
node_start_response = self._workflow_node_start_to_stream_response(
|
||||
session=session,
|
||||
event=event,
|
||||
task_id=self._application_generate_entity.task_id,
|
||||
workflow_node_execution=workflow_node_execution,
|
||||
)
|
||||
session.commit()
|
||||
|
||||
if node_start_response:
|
||||
yield node_start_response
|
||||
elif isinstance(event, QueueNodeSucceededEvent):
|
||||
workflow_node_execution = self._handle_workflow_node_execution_success(event)
|
||||
|
||||
node_success_response = self._workflow_node_finish_to_stream_response(
|
||||
event=event,
|
||||
task_id=self._application_generate_entity.task_id,
|
||||
workflow_node_execution=workflow_node_execution,
|
||||
)
|
||||
with Session(db.engine) as session:
|
||||
workflow_node_execution = self._handle_workflow_node_execution_success(session=session, event=event)
|
||||
node_success_response = self._workflow_node_finish_to_stream_response(
|
||||
session=session,
|
||||
event=event,
|
||||
task_id=self._application_generate_entity.task_id,
|
||||
workflow_node_execution=workflow_node_execution,
|
||||
)
|
||||
session.commit()
|
||||
|
||||
if node_success_response:
|
||||
yield node_success_response
|
||||
elif isinstance(event, QueueNodeFailedEvent | QueueNodeInIterationFailedEvent | QueueNodeExceptionEvent):
|
||||
workflow_node_execution = self._handle_workflow_node_execution_failed(event)
|
||||
with Session(db.engine) as session:
|
||||
workflow_node_execution = self._handle_workflow_node_execution_failed(
|
||||
session=session,
|
||||
event=event,
|
||||
)
|
||||
node_failed_response = self._workflow_node_finish_to_stream_response(
|
||||
session=session,
|
||||
event=event,
|
||||
task_id=self._application_generate_entity.task_id,
|
||||
workflow_node_execution=workflow_node_execution,
|
||||
)
|
||||
session.commit()
|
||||
|
||||
node_failed_response = self._workflow_node_finish_to_stream_response(
|
||||
event=event,
|
||||
task_id=self._application_generate_entity.task_id,
|
||||
workflow_node_execution=workflow_node_execution,
|
||||
)
|
||||
if node_failed_response:
|
||||
yield node_failed_response
|
||||
|
||||
elif isinstance(event, QueueParallelBranchRunStartedEvent):
|
||||
if not workflow_run:
|
||||
if not self._workflow_run_id:
|
||||
raise ValueError("workflow run not initialized.")
|
||||
|
||||
yield self._workflow_parallel_branch_start_to_stream_response(
|
||||
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
|
||||
)
|
||||
with Session(db.engine) as session:
|
||||
workflow_run = self._get_workflow_run(session=session, workflow_run_id=self._workflow_run_id)
|
||||
parallel_start_resp = self._workflow_parallel_branch_start_to_stream_response(
|
||||
session=session,
|
||||
task_id=self._application_generate_entity.task_id,
|
||||
workflow_run=workflow_run,
|
||||
event=event,
|
||||
)
|
||||
|
||||
yield parallel_start_resp
|
||||
|
||||
elif isinstance(event, QueueParallelBranchRunSucceededEvent | QueueParallelBranchRunFailedEvent):
|
||||
if not workflow_run:
|
||||
if not self._workflow_run_id:
|
||||
raise ValueError("workflow run not initialized.")
|
||||
|
||||
yield self._workflow_parallel_branch_finished_to_stream_response(
|
||||
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
|
||||
)
|
||||
with Session(db.engine) as session:
|
||||
workflow_run = self._get_workflow_run(session=session, workflow_run_id=self._workflow_run_id)
|
||||
parallel_finish_resp = self._workflow_parallel_branch_finished_to_stream_response(
|
||||
session=session,
|
||||
task_id=self._application_generate_entity.task_id,
|
||||
workflow_run=workflow_run,
|
||||
event=event,
|
||||
)
|
||||
|
||||
yield parallel_finish_resp
|
||||
|
||||
elif isinstance(event, QueueIterationStartEvent):
|
||||
if not workflow_run:
|
||||
if not self._workflow_run_id:
|
||||
raise ValueError("workflow run not initialized.")
|
||||
|
||||
yield self._workflow_iteration_start_to_stream_response(
|
||||
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
|
||||
)
|
||||
with Session(db.engine) as session:
|
||||
workflow_run = self._get_workflow_run(session=session, workflow_run_id=self._workflow_run_id)
|
||||
iter_start_resp = self._workflow_iteration_start_to_stream_response(
|
||||
session=session,
|
||||
task_id=self._application_generate_entity.task_id,
|
||||
workflow_run=workflow_run,
|
||||
event=event,
|
||||
)
|
||||
|
||||
yield iter_start_resp
|
||||
|
||||
elif isinstance(event, QueueIterationNextEvent):
|
||||
if not workflow_run:
|
||||
if not self._workflow_run_id:
|
||||
raise ValueError("workflow run not initialized.")
|
||||
|
||||
yield self._workflow_iteration_next_to_stream_response(
|
||||
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
|
||||
)
|
||||
with Session(db.engine) as session:
|
||||
workflow_run = self._get_workflow_run(session=session, workflow_run_id=self._workflow_run_id)
|
||||
iter_next_resp = self._workflow_iteration_next_to_stream_response(
|
||||
session=session,
|
||||
task_id=self._application_generate_entity.task_id,
|
||||
workflow_run=workflow_run,
|
||||
event=event,
|
||||
)
|
||||
|
||||
yield iter_next_resp
|
||||
|
||||
elif isinstance(event, QueueIterationCompletedEvent):
|
||||
if not workflow_run:
|
||||
if not self._workflow_run_id:
|
||||
raise ValueError("workflow run not initialized.")
|
||||
|
||||
yield self._workflow_iteration_completed_to_stream_response(
|
||||
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
|
||||
)
|
||||
with Session(db.engine) as session:
|
||||
workflow_run = self._get_workflow_run(session=session, workflow_run_id=self._workflow_run_id)
|
||||
iter_finish_resp = self._workflow_iteration_completed_to_stream_response(
|
||||
session=session,
|
||||
task_id=self._application_generate_entity.task_id,
|
||||
workflow_run=workflow_run,
|
||||
event=event,
|
||||
)
|
||||
|
||||
yield iter_finish_resp
|
||||
|
||||
elif isinstance(event, QueueWorkflowSucceededEvent):
|
||||
if not workflow_run:
|
||||
if not self._workflow_run_id:
|
||||
raise ValueError("workflow run not initialized.")
|
||||
|
||||
if not graph_runtime_state:
|
||||
raise ValueError("graph runtime state not initialized.")
|
||||
|
||||
with Session(db.engine) as session:
|
||||
workflow_run = self._handle_workflow_run_success(
|
||||
session=session,
|
||||
workflow_run=workflow_run,
|
||||
workflow_run_id=self._workflow_run_id,
|
||||
start_at=graph_runtime_state.start_at,
|
||||
total_tokens=graph_runtime_state.total_tokens,
|
||||
total_steps=graph_runtime_state.node_run_steps,
|
||||
@ -378,18 +437,18 @@ class WorkflowAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCycleMa
|
||||
workflow_run=workflow_run,
|
||||
)
|
||||
session.commit()
|
||||
|
||||
yield workflow_finish_resp
|
||||
elif isinstance(event, QueueWorkflowPartialSuccessEvent):
|
||||
if not workflow_run:
|
||||
if not self._workflow_run_id:
|
||||
raise ValueError("workflow run not initialized.")
|
||||
|
||||
if not graph_runtime_state:
|
||||
raise ValueError("graph runtime state not initialized.")
|
||||
|
||||
with Session(db.engine) as session:
|
||||
workflow_run = self._handle_workflow_run_partial_success(
|
||||
session=session,
|
||||
workflow_run=workflow_run,
|
||||
workflow_run_id=self._workflow_run_id,
|
||||
start_at=graph_runtime_state.start_at,
|
||||
total_tokens=graph_runtime_state.total_tokens,
|
||||
total_steps=graph_runtime_state.node_run_steps,
|
||||
@ -409,15 +468,15 @@ class WorkflowAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCycleMa
|
||||
|
||||
yield workflow_finish_resp
|
||||
elif isinstance(event, QueueWorkflowFailedEvent | QueueStopEvent):
|
||||
if not workflow_run:
|
||||
if not self._workflow_run_id:
|
||||
raise ValueError("workflow run not initialized.")
|
||||
|
||||
if not graph_runtime_state:
|
||||
raise ValueError("graph runtime state not initialized.")
|
||||
|
||||
with Session(db.engine) as session:
|
||||
workflow_run = self._handle_workflow_run_failed(
|
||||
session=session,
|
||||
workflow_run=workflow_run,
|
||||
workflow_run_id=self._workflow_run_id,
|
||||
start_at=graph_runtime_state.start_at,
|
||||
total_tokens=graph_runtime_state.total_tokens,
|
||||
total_steps=graph_runtime_state.node_run_steps,
|
||||
@ -437,6 +496,7 @@ class WorkflowAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCycleMa
|
||||
session=session, task_id=self._application_generate_entity.task_id, workflow_run=workflow_run
|
||||
)
|
||||
session.commit()
|
||||
|
||||
yield workflow_finish_resp
|
||||
elif isinstance(event, QueueTextChunkEvent):
|
||||
delta_text = event.text
|
||||
|
||||
@ -46,7 +46,6 @@ from core.workflow.enums import SystemVariableKey
|
||||
from core.workflow.nodes import NodeType
|
||||
from core.workflow.nodes.tool.entities import ToolNodeData
|
||||
from core.workflow.workflow_entry import WorkflowEntry
|
||||
from extensions.ext_database import db
|
||||
from models.account import Account
|
||||
from models.enums import CreatedByRole, WorkflowRunTriggeredFrom
|
||||
from models.model import EndUser
|
||||
@ -66,7 +65,6 @@ class WorkflowCycleManage:
|
||||
_application_generate_entity: Union[AdvancedChatAppGenerateEntity, WorkflowAppGenerateEntity]
|
||||
_task_state: WorkflowTaskState
|
||||
_workflow_system_variables: dict[SystemVariableKey, Any]
|
||||
_wip_workflow_node_executions: dict[str, WorkflowNodeExecution]
|
||||
|
||||
def _handle_workflow_run_start(
|
||||
self,
|
||||
@ -130,7 +128,7 @@ class WorkflowCycleManage:
|
||||
self,
|
||||
*,
|
||||
session: Session,
|
||||
workflow_run: WorkflowRun,
|
||||
workflow_run_id: str,
|
||||
start_at: float,
|
||||
total_tokens: int,
|
||||
total_steps: int,
|
||||
@ -148,7 +146,7 @@ class WorkflowCycleManage:
|
||||
:param conversation_id: conversation id
|
||||
:return:
|
||||
"""
|
||||
workflow_run = self._refetch_workflow_run(session=session, workflow_run_id=workflow_run.id)
|
||||
workflow_run = self._get_workflow_run(session=session, workflow_run_id=workflow_run_id)
|
||||
|
||||
outputs = WorkflowEntry.handle_special_values(outputs)
|
||||
|
||||
@ -175,7 +173,7 @@ class WorkflowCycleManage:
|
||||
self,
|
||||
*,
|
||||
session: Session,
|
||||
workflow_run: WorkflowRun,
|
||||
workflow_run_id: str,
|
||||
start_at: float,
|
||||
total_tokens: int,
|
||||
total_steps: int,
|
||||
@ -184,18 +182,7 @@ class WorkflowCycleManage:
|
||||
conversation_id: Optional[str] = None,
|
||||
trace_manager: Optional[TraceQueueManager] = None,
|
||||
) -> WorkflowRun:
|
||||
"""
|
||||
Workflow run success
|
||||
:param workflow_run: workflow run
|
||||
:param start_at: start time
|
||||
:param total_tokens: total tokens
|
||||
:param total_steps: total steps
|
||||
:param outputs: outputs
|
||||
:param conversation_id: conversation id
|
||||
:return:
|
||||
"""
|
||||
workflow_run = self._refetch_workflow_run(session=session, workflow_run_id=workflow_run.id)
|
||||
|
||||
workflow_run = self._get_workflow_run(session=session, workflow_run_id=workflow_run_id)
|
||||
outputs = WorkflowEntry.handle_special_values(dict(outputs) if outputs else None)
|
||||
|
||||
workflow_run.status = WorkflowRunStatus.PARTIAL_SUCCESSED.value
|
||||
@ -222,7 +209,7 @@ class WorkflowCycleManage:
|
||||
self,
|
||||
*,
|
||||
session: Session,
|
||||
workflow_run: WorkflowRun,
|
||||
workflow_run_id: str,
|
||||
start_at: float,
|
||||
total_tokens: int,
|
||||
total_steps: int,
|
||||
@ -242,7 +229,7 @@ class WorkflowCycleManage:
|
||||
:param error: error message
|
||||
:return:
|
||||
"""
|
||||
workflow_run = self._refetch_workflow_run(session=session, workflow_run_id=workflow_run.id)
|
||||
workflow_run = self._get_workflow_run(session=session, workflow_run_id=workflow_run_id)
|
||||
|
||||
workflow_run.status = status.value
|
||||
workflow_run.error = error
|
||||
@ -284,49 +271,41 @@ class WorkflowCycleManage:
|
||||
return workflow_run
|
||||
|
||||
def _handle_node_execution_start(
|
||||
self, workflow_run: WorkflowRun, event: QueueNodeStartedEvent
|
||||
self, *, session: Session, workflow_run: WorkflowRun, event: QueueNodeStartedEvent
|
||||
) -> WorkflowNodeExecution:
|
||||
# init workflow node execution
|
||||
workflow_node_execution = WorkflowNodeExecution()
|
||||
workflow_node_execution.id = event.node_execution_id
|
||||
workflow_node_execution.tenant_id = workflow_run.tenant_id
|
||||
workflow_node_execution.app_id = workflow_run.app_id
|
||||
workflow_node_execution.workflow_id = workflow_run.workflow_id
|
||||
workflow_node_execution.triggered_from = WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN.value
|
||||
workflow_node_execution.workflow_run_id = workflow_run.id
|
||||
workflow_node_execution.predecessor_node_id = event.predecessor_node_id
|
||||
workflow_node_execution.index = event.node_run_index
|
||||
workflow_node_execution.node_execution_id = event.node_execution_id
|
||||
workflow_node_execution.node_id = event.node_id
|
||||
workflow_node_execution.node_type = event.node_type.value
|
||||
workflow_node_execution.title = event.node_data.title
|
||||
workflow_node_execution.status = WorkflowNodeExecutionStatus.RUNNING.value
|
||||
workflow_node_execution.created_by_role = workflow_run.created_by_role
|
||||
workflow_node_execution.created_by = workflow_run.created_by
|
||||
workflow_node_execution.execution_metadata = json.dumps(
|
||||
{
|
||||
NodeRunMetadataKey.PARALLEL_MODE_RUN_ID: event.parallel_mode_run_id,
|
||||
NodeRunMetadataKey.ITERATION_ID: event.in_iteration_id,
|
||||
}
|
||||
)
|
||||
workflow_node_execution.created_at = datetime.now(UTC).replace(tzinfo=None)
|
||||
|
||||
with Session(db.engine, expire_on_commit=False) as session:
|
||||
workflow_node_execution = WorkflowNodeExecution()
|
||||
workflow_node_execution.tenant_id = workflow_run.tenant_id
|
||||
workflow_node_execution.app_id = workflow_run.app_id
|
||||
workflow_node_execution.workflow_id = workflow_run.workflow_id
|
||||
workflow_node_execution.triggered_from = WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN.value
|
||||
workflow_node_execution.workflow_run_id = workflow_run.id
|
||||
workflow_node_execution.predecessor_node_id = event.predecessor_node_id
|
||||
workflow_node_execution.index = event.node_run_index
|
||||
workflow_node_execution.node_execution_id = event.node_execution_id
|
||||
workflow_node_execution.node_id = event.node_id
|
||||
workflow_node_execution.node_type = event.node_type.value
|
||||
workflow_node_execution.title = event.node_data.title
|
||||
workflow_node_execution.status = WorkflowNodeExecutionStatus.RUNNING.value
|
||||
workflow_node_execution.created_by_role = workflow_run.created_by_role
|
||||
workflow_node_execution.created_by = workflow_run.created_by
|
||||
workflow_node_execution.execution_metadata = json.dumps(
|
||||
{
|
||||
NodeRunMetadataKey.PARALLEL_MODE_RUN_ID: event.parallel_mode_run_id,
|
||||
NodeRunMetadataKey.ITERATION_ID: event.in_iteration_id,
|
||||
}
|
||||
)
|
||||
workflow_node_execution.created_at = datetime.now(UTC).replace(tzinfo=None)
|
||||
|
||||
session.add(workflow_node_execution)
|
||||
session.commit()
|
||||
session.refresh(workflow_node_execution)
|
||||
|
||||
self._wip_workflow_node_executions[workflow_node_execution.node_execution_id] = workflow_node_execution
|
||||
session.add(workflow_node_execution)
|
||||
return workflow_node_execution
|
||||
|
||||
def _handle_workflow_node_execution_success(self, event: QueueNodeSucceededEvent) -> WorkflowNodeExecution:
|
||||
"""
|
||||
Workflow node execution success
|
||||
:param event: queue node succeeded event
|
||||
:return:
|
||||
"""
|
||||
workflow_node_execution = self._refetch_workflow_node_execution(event.node_execution_id)
|
||||
|
||||
def _handle_workflow_node_execution_success(
|
||||
self, *, session: Session, event: QueueNodeSucceededEvent
|
||||
) -> WorkflowNodeExecution:
|
||||
workflow_node_execution = self._get_workflow_node_execution(
|
||||
session=session, node_execution_id=event.node_execution_id
|
||||
)
|
||||
inputs = WorkflowEntry.handle_special_values(event.inputs)
|
||||
process_data = WorkflowEntry.handle_special_values(event.process_data)
|
||||
outputs = WorkflowEntry.handle_special_values(event.outputs)
|
||||
@ -336,20 +315,6 @@ class WorkflowCycleManage:
|
||||
finished_at = datetime.now(UTC).replace(tzinfo=None)
|
||||
elapsed_time = (finished_at - event.start_at).total_seconds()
|
||||
|
||||
db.session.query(WorkflowNodeExecution).filter(WorkflowNodeExecution.id == workflow_node_execution.id).update(
|
||||
{
|
||||
WorkflowNodeExecution.status: WorkflowNodeExecutionStatus.SUCCEEDED.value,
|
||||
WorkflowNodeExecution.inputs: json.dumps(inputs) if inputs else None,
|
||||
WorkflowNodeExecution.process_data: json.dumps(process_data) if event.process_data else None,
|
||||
WorkflowNodeExecution.outputs: json.dumps(outputs) if outputs else None,
|
||||
WorkflowNodeExecution.execution_metadata: execution_metadata,
|
||||
WorkflowNodeExecution.finished_at: finished_at,
|
||||
WorkflowNodeExecution.elapsed_time: elapsed_time,
|
||||
}
|
||||
)
|
||||
|
||||
db.session.commit()
|
||||
db.session.close()
|
||||
process_data = WorkflowEntry.handle_special_values(event.process_data)
|
||||
|
||||
workflow_node_execution.status = WorkflowNodeExecutionStatus.SUCCEEDED.value
|
||||
@ -360,19 +325,22 @@ class WorkflowCycleManage:
|
||||
workflow_node_execution.finished_at = finished_at
|
||||
workflow_node_execution.elapsed_time = elapsed_time
|
||||
|
||||
self._wip_workflow_node_executions.pop(workflow_node_execution.node_execution_id)
|
||||
|
||||
return workflow_node_execution
|
||||
|
||||
def _handle_workflow_node_execution_failed(
|
||||
self, event: QueueNodeFailedEvent | QueueNodeInIterationFailedEvent | QueueNodeExceptionEvent
|
||||
self,
|
||||
*,
|
||||
session: Session,
|
||||
event: QueueNodeFailedEvent | QueueNodeInIterationFailedEvent | QueueNodeExceptionEvent,
|
||||
) -> WorkflowNodeExecution:
|
||||
"""
|
||||
Workflow node execution failed
|
||||
:param event: queue node failed event
|
||||
:return:
|
||||
"""
|
||||
workflow_node_execution = self._refetch_workflow_node_execution(event.node_execution_id)
|
||||
workflow_node_execution = self._get_workflow_node_execution(
|
||||
session=session, node_execution_id=event.node_execution_id
|
||||
)
|
||||
|
||||
inputs = WorkflowEntry.handle_special_values(event.inputs)
|
||||
process_data = WorkflowEntry.handle_special_values(event.process_data)
|
||||
@ -382,25 +350,6 @@ class WorkflowCycleManage:
|
||||
execution_metadata = (
|
||||
json.dumps(jsonable_encoder(event.execution_metadata)) if event.execution_metadata else None
|
||||
)
|
||||
db.session.query(WorkflowNodeExecution).filter(WorkflowNodeExecution.id == workflow_node_execution.id).update(
|
||||
{
|
||||
WorkflowNodeExecution.status: (
|
||||
WorkflowNodeExecutionStatus.FAILED.value
|
||||
if not isinstance(event, QueueNodeExceptionEvent)
|
||||
else WorkflowNodeExecutionStatus.EXCEPTION.value
|
||||
),
|
||||
WorkflowNodeExecution.error: event.error,
|
||||
WorkflowNodeExecution.inputs: json.dumps(inputs) if inputs else None,
|
||||
WorkflowNodeExecution.process_data: json.dumps(process_data) if process_data else None,
|
||||
WorkflowNodeExecution.outputs: json.dumps(outputs) if outputs else None,
|
||||
WorkflowNodeExecution.finished_at: finished_at,
|
||||
WorkflowNodeExecution.elapsed_time: elapsed_time,
|
||||
WorkflowNodeExecution.execution_metadata: execution_metadata,
|
||||
}
|
||||
)
|
||||
|
||||
db.session.commit()
|
||||
db.session.close()
|
||||
process_data = WorkflowEntry.handle_special_values(event.process_data)
|
||||
workflow_node_execution.status = (
|
||||
WorkflowNodeExecutionStatus.FAILED.value
|
||||
@ -415,12 +364,10 @@ class WorkflowCycleManage:
|
||||
workflow_node_execution.elapsed_time = elapsed_time
|
||||
workflow_node_execution.execution_metadata = execution_metadata
|
||||
|
||||
self._wip_workflow_node_executions.pop(workflow_node_execution.node_execution_id)
|
||||
|
||||
return workflow_node_execution
|
||||
|
||||
def _handle_workflow_node_execution_retried(
|
||||
self, workflow_run: WorkflowRun, event: QueueNodeRetryEvent
|
||||
self, *, session: Session, workflow_run: WorkflowRun, event: QueueNodeRetryEvent
|
||||
) -> WorkflowNodeExecution:
|
||||
"""
|
||||
Workflow node execution failed
|
||||
@ -444,6 +391,7 @@ class WorkflowCycleManage:
|
||||
execution_metadata = json.dumps(merged_metadata)
|
||||
|
||||
workflow_node_execution = WorkflowNodeExecution()
|
||||
workflow_node_execution.id = event.node_execution_id
|
||||
workflow_node_execution.tenant_id = workflow_run.tenant_id
|
||||
workflow_node_execution.app_id = workflow_run.app_id
|
||||
workflow_node_execution.workflow_id = workflow_run.workflow_id
|
||||
@ -466,10 +414,7 @@ class WorkflowCycleManage:
|
||||
workflow_node_execution.execution_metadata = execution_metadata
|
||||
workflow_node_execution.index = event.node_run_index
|
||||
|
||||
db.session.add(workflow_node_execution)
|
||||
db.session.commit()
|
||||
db.session.refresh(workflow_node_execution)
|
||||
|
||||
session.add(workflow_node_execution)
|
||||
return workflow_node_execution
|
||||
|
||||
#################################################
|
||||
@ -547,17 +492,20 @@ class WorkflowCycleManage:
|
||||
)
|
||||
|
||||
def _workflow_node_start_to_stream_response(
|
||||
self, event: QueueNodeStartedEvent, task_id: str, workflow_node_execution: WorkflowNodeExecution
|
||||
self,
|
||||
*,
|
||||
session: Session,
|
||||
event: QueueNodeStartedEvent,
|
||||
task_id: str,
|
||||
workflow_node_execution: WorkflowNodeExecution,
|
||||
) -> Optional[NodeStartStreamResponse]:
|
||||
"""
|
||||
Workflow node start to stream response.
|
||||
:param event: queue node started event
|
||||
:param task_id: task id
|
||||
:param workflow_node_execution: workflow node execution
|
||||
:return:
|
||||
"""
|
||||
# receive session to make sure the workflow_run won't be expired, need a more elegant way to handle this
|
||||
_ = session
|
||||
|
||||
if workflow_node_execution.node_type in {NodeType.ITERATION.value, NodeType.LOOP.value}:
|
||||
return None
|
||||
if not workflow_node_execution.workflow_run_id:
|
||||
return None
|
||||
|
||||
response = NodeStartStreamResponse(
|
||||
task_id=task_id,
|
||||
@ -593,6 +541,8 @@ class WorkflowCycleManage:
|
||||
|
||||
def _workflow_node_finish_to_stream_response(
|
||||
self,
|
||||
*,
|
||||
session: Session,
|
||||
event: QueueNodeSucceededEvent
|
||||
| QueueNodeFailedEvent
|
||||
| QueueNodeInIterationFailedEvent
|
||||
@ -600,15 +550,14 @@ class WorkflowCycleManage:
|
||||
task_id: str,
|
||||
workflow_node_execution: WorkflowNodeExecution,
|
||||
) -> Optional[NodeFinishStreamResponse]:
|
||||
"""
|
||||
Workflow node finish to stream response.
|
||||
:param event: queue node succeeded or failed event
|
||||
:param task_id: task id
|
||||
:param workflow_node_execution: workflow node execution
|
||||
:return:
|
||||
"""
|
||||
# receive session to make sure the workflow_run won't be expired, need a more elegant way to handle this
|
||||
_ = session
|
||||
if workflow_node_execution.node_type in {NodeType.ITERATION.value, NodeType.LOOP.value}:
|
||||
return None
|
||||
if not workflow_node_execution.workflow_run_id:
|
||||
return None
|
||||
if not workflow_node_execution.finished_at:
|
||||
return None
|
||||
|
||||
return NodeFinishStreamResponse(
|
||||
task_id=task_id,
|
||||
@ -640,19 +589,20 @@ class WorkflowCycleManage:
|
||||
|
||||
def _workflow_node_retry_to_stream_response(
|
||||
self,
|
||||
*,
|
||||
session: Session,
|
||||
event: QueueNodeRetryEvent,
|
||||
task_id: str,
|
||||
workflow_node_execution: WorkflowNodeExecution,
|
||||
) -> Optional[Union[NodeRetryStreamResponse, NodeFinishStreamResponse]]:
|
||||
"""
|
||||
Workflow node finish to stream response.
|
||||
:param event: queue node succeeded or failed event
|
||||
:param task_id: task id
|
||||
:param workflow_node_execution: workflow node execution
|
||||
:return:
|
||||
"""
|
||||
# receive session to make sure the workflow_run won't be expired, need a more elegant way to handle this
|
||||
_ = session
|
||||
if workflow_node_execution.node_type in {NodeType.ITERATION.value, NodeType.LOOP.value}:
|
||||
return None
|
||||
if not workflow_node_execution.workflow_run_id:
|
||||
return None
|
||||
if not workflow_node_execution.finished_at:
|
||||
return None
|
||||
|
||||
return NodeRetryStreamResponse(
|
||||
task_id=task_id,
|
||||
@ -684,15 +634,10 @@ class WorkflowCycleManage:
|
||||
)
|
||||
|
||||
def _workflow_parallel_branch_start_to_stream_response(
|
||||
self, task_id: str, workflow_run: WorkflowRun, event: QueueParallelBranchRunStartedEvent
|
||||
self, *, session: Session, task_id: str, workflow_run: WorkflowRun, event: QueueParallelBranchRunStartedEvent
|
||||
) -> ParallelBranchStartStreamResponse:
|
||||
"""
|
||||
Workflow parallel branch start to stream response
|
||||
:param task_id: task id
|
||||
:param workflow_run: workflow run
|
||||
:param event: parallel branch run started event
|
||||
:return:
|
||||
"""
|
||||
# receive session to make sure the workflow_run won't be expired, need a more elegant way to handle this
|
||||
_ = session
|
||||
return ParallelBranchStartStreamResponse(
|
||||
task_id=task_id,
|
||||
workflow_run_id=workflow_run.id,
|
||||
@ -708,17 +653,14 @@ class WorkflowCycleManage:
|
||||
|
||||
def _workflow_parallel_branch_finished_to_stream_response(
|
||||
self,
|
||||
*,
|
||||
session: Session,
|
||||
task_id: str,
|
||||
workflow_run: WorkflowRun,
|
||||
event: QueueParallelBranchRunSucceededEvent | QueueParallelBranchRunFailedEvent,
|
||||
) -> ParallelBranchFinishedStreamResponse:
|
||||
"""
|
||||
Workflow parallel branch finished to stream response
|
||||
:param task_id: task id
|
||||
:param workflow_run: workflow run
|
||||
:param event: parallel branch run succeeded or failed event
|
||||
:return:
|
||||
"""
|
||||
# receive session to make sure the workflow_run won't be expired, need a more elegant way to handle this
|
||||
_ = session
|
||||
return ParallelBranchFinishedStreamResponse(
|
||||
task_id=task_id,
|
||||
workflow_run_id=workflow_run.id,
|
||||
@ -735,15 +677,10 @@ class WorkflowCycleManage:
|
||||
)
|
||||
|
||||
def _workflow_iteration_start_to_stream_response(
|
||||
self, task_id: str, workflow_run: WorkflowRun, event: QueueIterationStartEvent
|
||||
self, *, session: Session, task_id: str, workflow_run: WorkflowRun, event: QueueIterationStartEvent
|
||||
) -> IterationNodeStartStreamResponse:
|
||||
"""
|
||||
Workflow iteration start to stream response
|
||||
:param task_id: task id
|
||||
:param workflow_run: workflow run
|
||||
:param event: iteration start event
|
||||
:return:
|
||||
"""
|
||||
# receive session to make sure the workflow_run won't be expired, need a more elegant way to handle this
|
||||
_ = session
|
||||
return IterationNodeStartStreamResponse(
|
||||
task_id=task_id,
|
||||
workflow_run_id=workflow_run.id,
|
||||
@ -762,15 +699,10 @@ class WorkflowCycleManage:
|
||||
)
|
||||
|
||||
def _workflow_iteration_next_to_stream_response(
|
||||
self, task_id: str, workflow_run: WorkflowRun, event: QueueIterationNextEvent
|
||||
self, *, session: Session, task_id: str, workflow_run: WorkflowRun, event: QueueIterationNextEvent
|
||||
) -> IterationNodeNextStreamResponse:
|
||||
"""
|
||||
Workflow iteration next to stream response
|
||||
:param task_id: task id
|
||||
:param workflow_run: workflow run
|
||||
:param event: iteration next event
|
||||
:return:
|
||||
"""
|
||||
# receive session to make sure the workflow_run won't be expired, need a more elegant way to handle this
|
||||
_ = session
|
||||
return IterationNodeNextStreamResponse(
|
||||
task_id=task_id,
|
||||
workflow_run_id=workflow_run.id,
|
||||
@ -791,15 +723,10 @@ class WorkflowCycleManage:
|
||||
)
|
||||
|
||||
def _workflow_iteration_completed_to_stream_response(
|
||||
self, task_id: str, workflow_run: WorkflowRun, event: QueueIterationCompletedEvent
|
||||
self, *, session: Session, task_id: str, workflow_run: WorkflowRun, event: QueueIterationCompletedEvent
|
||||
) -> IterationNodeCompletedStreamResponse:
|
||||
"""
|
||||
Workflow iteration completed to stream response
|
||||
:param task_id: task id
|
||||
:param workflow_run: workflow run
|
||||
:param event: iteration completed event
|
||||
:return:
|
||||
"""
|
||||
# receive session to make sure the workflow_run won't be expired, need a more elegant way to handle this
|
||||
_ = session
|
||||
return IterationNodeCompletedStreamResponse(
|
||||
task_id=task_id,
|
||||
workflow_run_id=workflow_run.id,
|
||||
@ -883,7 +810,7 @@ class WorkflowCycleManage:
|
||||
|
||||
return None
|
||||
|
||||
def _refetch_workflow_run(self, *, session: Session, workflow_run_id: str) -> WorkflowRun:
|
||||
def _get_workflow_run(self, *, session: Session, workflow_run_id: str) -> WorkflowRun:
|
||||
"""
|
||||
Refetch workflow run
|
||||
:param workflow_run_id: workflow run id
|
||||
@ -896,14 +823,9 @@ class WorkflowCycleManage:
|
||||
|
||||
return workflow_run
|
||||
|
||||
def _refetch_workflow_node_execution(self, node_execution_id: str) -> WorkflowNodeExecution:
|
||||
"""
|
||||
Refetch workflow node execution
|
||||
:param node_execution_id: workflow node execution id
|
||||
:return:
|
||||
"""
|
||||
workflow_node_execution = self._wip_workflow_node_executions.get(node_execution_id)
|
||||
|
||||
def _get_workflow_node_execution(self, session: Session, node_execution_id: str) -> WorkflowNodeExecution:
|
||||
stmt = select(WorkflowNodeExecution).where(WorkflowNodeExecution.id == node_execution_id)
|
||||
workflow_node_execution = session.scalar(stmt)
|
||||
if not workflow_node_execution:
|
||||
raise WorkflowNodeExecutionNotFoundError(node_execution_id)
|
||||
|
||||
|
||||
@ -276,7 +276,7 @@ class IndexingRunner:
|
||||
tenant_id=tenant_id,
|
||||
model_type=ModelType.TEXT_EMBEDDING,
|
||||
)
|
||||
preview_texts = []
|
||||
preview_texts = [] # type: ignore
|
||||
|
||||
total_segments = 0
|
||||
index_type = doc_form
|
||||
@ -300,13 +300,13 @@ class IndexingRunner:
|
||||
if len(preview_texts) < 10:
|
||||
if doc_form and doc_form == "qa_model":
|
||||
preview_detail = QAPreviewDetail(
|
||||
question=document.page_content, answer=document.metadata.get("answer")
|
||||
question=document.page_content, answer=document.metadata.get("answer") or ""
|
||||
)
|
||||
preview_texts.append(preview_detail)
|
||||
else:
|
||||
preview_detail = PreviewDetail(content=document.page_content)
|
||||
preview_detail = PreviewDetail(content=document.page_content) # type: ignore
|
||||
if document.children:
|
||||
preview_detail.child_chunks = [child.page_content for child in document.children]
|
||||
preview_detail.child_chunks = [child.page_content for child in document.children] # type: ignore
|
||||
preview_texts.append(preview_detail)
|
||||
|
||||
# delete image files and related db records
|
||||
@ -325,7 +325,7 @@ class IndexingRunner:
|
||||
|
||||
if doc_form and doc_form == "qa_model":
|
||||
return IndexingEstimate(total_segments=total_segments * 20, qa_preview=preview_texts, preview=[])
|
||||
return IndexingEstimate(total_segments=total_segments, preview=preview_texts)
|
||||
return IndexingEstimate(total_segments=total_segments, preview=preview_texts) # type: ignore
|
||||
|
||||
def _extract(
|
||||
self, index_processor: BaseIndexProcessor, dataset_document: DatasetDocument, process_rule: dict
|
||||
@ -454,7 +454,7 @@ class IndexingRunner:
|
||||
embedding_model_instance=embedding_model_instance,
|
||||
)
|
||||
|
||||
return character_splitter
|
||||
return character_splitter # type: ignore
|
||||
|
||||
def _split_to_documents_for_estimate(
|
||||
self, text_docs: list[Document], splitter: TextSplitter, processing_rule: DatasetProcessRule
|
||||
@ -535,7 +535,7 @@ class IndexingRunner:
|
||||
# create keyword index
|
||||
create_keyword_thread = threading.Thread(
|
||||
target=self._process_keyword_index,
|
||||
args=(current_app._get_current_object(), dataset.id, dataset_document.id, documents),
|
||||
args=(current_app._get_current_object(), dataset.id, dataset_document.id, documents), # type: ignore
|
||||
)
|
||||
create_keyword_thread.start()
|
||||
|
||||
|
||||
@ -258,78 +258,79 @@ class RetrievalService:
|
||||
include_segment_ids = []
|
||||
segment_child_map = {}
|
||||
for document in documents:
|
||||
document_id = document.metadata["document_id"]
|
||||
document_id = document.metadata.get("document_id")
|
||||
dataset_document = db.session.query(DatasetDocument).filter(DatasetDocument.id == document_id).first()
|
||||
if dataset_document and dataset_document.doc_form == IndexType.PARENT_CHILD_INDEX:
|
||||
child_index_node_id = document.metadata["doc_id"]
|
||||
result = (
|
||||
db.session.query(ChildChunk, DocumentSegment)
|
||||
.join(DocumentSegment, ChildChunk.segment_id == DocumentSegment.id)
|
||||
.filter(
|
||||
ChildChunk.index_node_id == child_index_node_id,
|
||||
DocumentSegment.dataset_id == dataset_document.dataset_id,
|
||||
DocumentSegment.enabled == True,
|
||||
DocumentSegment.status == "completed",
|
||||
if dataset_document:
|
||||
if dataset_document.doc_form == IndexType.PARENT_CHILD_INDEX:
|
||||
child_index_node_id = document.metadata.get("doc_id")
|
||||
result = (
|
||||
db.session.query(ChildChunk, DocumentSegment)
|
||||
.join(DocumentSegment, ChildChunk.segment_id == DocumentSegment.id)
|
||||
.filter(
|
||||
ChildChunk.index_node_id == child_index_node_id,
|
||||
DocumentSegment.dataset_id == dataset_document.dataset_id,
|
||||
DocumentSegment.enabled == True,
|
||||
DocumentSegment.status == "completed",
|
||||
)
|
||||
.first()
|
||||
)
|
||||
.first()
|
||||
)
|
||||
if result:
|
||||
child_chunk, segment = result
|
||||
if result:
|
||||
child_chunk, segment = result
|
||||
if not segment:
|
||||
continue
|
||||
if segment.id not in include_segment_ids:
|
||||
include_segment_ids.append(segment.id)
|
||||
child_chunk_detail = {
|
||||
"id": child_chunk.id,
|
||||
"content": child_chunk.content,
|
||||
"position": child_chunk.position,
|
||||
"score": document.metadata.get("score", 0.0),
|
||||
}
|
||||
map_detail = {
|
||||
"max_score": document.metadata.get("score", 0.0),
|
||||
"child_chunks": [child_chunk_detail],
|
||||
}
|
||||
segment_child_map[segment.id] = map_detail
|
||||
record = {
|
||||
"segment": segment,
|
||||
}
|
||||
records.append(record)
|
||||
else:
|
||||
child_chunk_detail = {
|
||||
"id": child_chunk.id,
|
||||
"content": child_chunk.content,
|
||||
"position": child_chunk.position,
|
||||
"score": document.metadata.get("score", 0.0),
|
||||
}
|
||||
segment_child_map[segment.id]["child_chunks"].append(child_chunk_detail)
|
||||
segment_child_map[segment.id]["max_score"] = max(
|
||||
segment_child_map[segment.id]["max_score"], document.metadata.get("score", 0.0)
|
||||
)
|
||||
else:
|
||||
continue
|
||||
else:
|
||||
index_node_id = document.metadata["doc_id"]
|
||||
|
||||
segment = (
|
||||
db.session.query(DocumentSegment)
|
||||
.filter(
|
||||
DocumentSegment.dataset_id == dataset_document.dataset_id,
|
||||
DocumentSegment.enabled == True,
|
||||
DocumentSegment.status == "completed",
|
||||
DocumentSegment.index_node_id == index_node_id,
|
||||
)
|
||||
.first()
|
||||
)
|
||||
|
||||
if not segment:
|
||||
continue
|
||||
if segment.id not in include_segment_ids:
|
||||
include_segment_ids.append(segment.id)
|
||||
child_chunk_detail = {
|
||||
"id": child_chunk.id,
|
||||
"content": child_chunk.content,
|
||||
"position": child_chunk.position,
|
||||
"score": document.metadata.get("score", 0.0),
|
||||
}
|
||||
map_detail = {
|
||||
"max_score": document.metadata.get("score", 0.0),
|
||||
"child_chunks": [child_chunk_detail],
|
||||
}
|
||||
segment_child_map[segment.id] = map_detail
|
||||
record = {
|
||||
"segment": segment,
|
||||
}
|
||||
records.append(record)
|
||||
else:
|
||||
child_chunk_detail = {
|
||||
"id": child_chunk.id,
|
||||
"content": child_chunk.content,
|
||||
"position": child_chunk.position,
|
||||
"score": document.metadata.get("score", 0.0),
|
||||
}
|
||||
segment_child_map[segment.id]["child_chunks"].append(child_chunk_detail)
|
||||
segment_child_map[segment.id]["max_score"] = max(
|
||||
segment_child_map[segment.id]["max_score"], document.metadata.get("score", 0.0)
|
||||
)
|
||||
else:
|
||||
continue
|
||||
else:
|
||||
index_node_id = document.metadata["doc_id"]
|
||||
include_segment_ids.append(segment.id)
|
||||
record = {
|
||||
"segment": segment,
|
||||
"score": document.metadata.get("score", None),
|
||||
}
|
||||
|
||||
segment = (
|
||||
db.session.query(DocumentSegment)
|
||||
.filter(
|
||||
DocumentSegment.dataset_id == dataset_document.dataset_id,
|
||||
DocumentSegment.enabled == True,
|
||||
DocumentSegment.status == "completed",
|
||||
DocumentSegment.index_node_id == index_node_id,
|
||||
)
|
||||
.first()
|
||||
)
|
||||
|
||||
if not segment:
|
||||
continue
|
||||
include_segment_ids.append(segment.id)
|
||||
record = {
|
||||
"segment": segment,
|
||||
"score": document.metadata.get("score", None),
|
||||
}
|
||||
|
||||
records.append(record)
|
||||
records.append(record)
|
||||
for record in records:
|
||||
if record["segment"].id in segment_child_map:
|
||||
record["child_chunks"] = segment_child_map[record["segment"].id].get("child_chunks", None)
|
||||
|
||||
@ -122,26 +122,27 @@ class DatasetDocumentStore:
|
||||
db.session.add(segment_document)
|
||||
db.session.flush()
|
||||
if save_child:
|
||||
for postion, child in enumerate(doc.children, start=1):
|
||||
child_segment = ChildChunk(
|
||||
tenant_id=self._dataset.tenant_id,
|
||||
dataset_id=self._dataset.id,
|
||||
document_id=self._document_id,
|
||||
segment_id=segment_document.id,
|
||||
position=postion,
|
||||
index_node_id=child.metadata["doc_id"],
|
||||
index_node_hash=child.metadata["doc_hash"],
|
||||
content=child.page_content,
|
||||
word_count=len(child.page_content),
|
||||
type="automatic",
|
||||
created_by=self._user_id,
|
||||
)
|
||||
db.session.add(child_segment)
|
||||
if doc.children:
|
||||
for postion, child in enumerate(doc.children, start=1):
|
||||
child_segment = ChildChunk(
|
||||
tenant_id=self._dataset.tenant_id,
|
||||
dataset_id=self._dataset.id,
|
||||
document_id=self._document_id,
|
||||
segment_id=segment_document.id,
|
||||
position=postion,
|
||||
index_node_id=child.metadata.get("doc_id"),
|
||||
index_node_hash=child.metadata.get("doc_hash"),
|
||||
content=child.page_content,
|
||||
word_count=len(child.page_content),
|
||||
type="automatic",
|
||||
created_by=self._user_id,
|
||||
)
|
||||
db.session.add(child_segment)
|
||||
else:
|
||||
segment_document.content = doc.page_content
|
||||
if doc.metadata.get("answer"):
|
||||
segment_document.answer = doc.metadata.pop("answer", "")
|
||||
segment_document.index_node_hash = doc.metadata["doc_hash"]
|
||||
segment_document.index_node_hash = doc.metadata.get("doc_hash")
|
||||
segment_document.word_count = len(doc.page_content)
|
||||
segment_document.tokens = tokens
|
||||
if save_child and doc.children:
|
||||
@ -160,8 +161,8 @@ class DatasetDocumentStore:
|
||||
document_id=self._document_id,
|
||||
segment_id=segment_document.id,
|
||||
position=position,
|
||||
index_node_id=child.metadata["doc_id"],
|
||||
index_node_hash=child.metadata["doc_hash"],
|
||||
index_node_id=child.metadata.get("doc_id"),
|
||||
index_node_hash=child.metadata.get("doc_hash"),
|
||||
content=child.page_content,
|
||||
word_count=len(child.page_content),
|
||||
type="automatic",
|
||||
|
||||
@ -4,7 +4,7 @@ import os
|
||||
from typing import Optional, cast
|
||||
|
||||
import pandas as pd
|
||||
from openpyxl import load_workbook
|
||||
from openpyxl import load_workbook # type: ignore
|
||||
|
||||
from core.rag.extractor.extractor_base import BaseExtractor
|
||||
from core.rag.models.document import Document
|
||||
|
||||
@ -81,4 +81,4 @@ class BaseIndexProcessor(ABC):
|
||||
embedding_model_instance=embedding_model_instance,
|
||||
)
|
||||
|
||||
return character_splitter
|
||||
return character_splitter # type: ignore
|
||||
|
||||
@ -30,12 +30,18 @@ class ParagraphIndexProcessor(BaseIndexProcessor):
|
||||
|
||||
def transform(self, documents: list[Document], **kwargs) -> list[Document]:
|
||||
process_rule = kwargs.get("process_rule")
|
||||
if not process_rule:
|
||||
raise ValueError("No process rule found.")
|
||||
if process_rule.get("mode") == "automatic":
|
||||
automatic_rule = DatasetProcessRule.AUTOMATIC_RULES
|
||||
rules = Rule(**automatic_rule)
|
||||
else:
|
||||
if not process_rule.get("rules"):
|
||||
raise ValueError("No rules found in process rule.")
|
||||
rules = Rule(**process_rule.get("rules"))
|
||||
# Split the text documents into nodes.
|
||||
if not rules.segmentation:
|
||||
raise ValueError("No segmentation found in rules.")
|
||||
splitter = self._get_splitter(
|
||||
processing_rule_mode=process_rule.get("mode"),
|
||||
max_tokens=rules.segmentation.max_tokens,
|
||||
|
||||
@ -30,8 +30,12 @@ class ParentChildIndexProcessor(BaseIndexProcessor):
|
||||
|
||||
def transform(self, documents: list[Document], **kwargs) -> list[Document]:
|
||||
process_rule = kwargs.get("process_rule")
|
||||
if not process_rule:
|
||||
raise ValueError("No process rule found.")
|
||||
if not process_rule.get("rules"):
|
||||
raise ValueError("No rules found in process rule.")
|
||||
rules = Rule(**process_rule.get("rules"))
|
||||
all_documents = []
|
||||
all_documents = [] # type: ignore
|
||||
if rules.parent_mode == ParentMode.PARAGRAPH:
|
||||
# Split the text documents into nodes.
|
||||
splitter = self._get_splitter(
|
||||
@ -161,6 +165,8 @@ class ParentChildIndexProcessor(BaseIndexProcessor):
|
||||
process_rule_mode: str,
|
||||
embedding_model_instance: Optional[ModelInstance],
|
||||
) -> list[ChildDocument]:
|
||||
if not rules.subchunk_segmentation:
|
||||
raise ValueError("No subchunk segmentation found in rules.")
|
||||
child_splitter = self._get_splitter(
|
||||
processing_rule_mode=process_rule_mode,
|
||||
max_tokens=rules.subchunk_segmentation.max_tokens,
|
||||
|
||||
@ -37,12 +37,16 @@ class QAIndexProcessor(BaseIndexProcessor):
|
||||
def transform(self, documents: list[Document], **kwargs) -> list[Document]:
|
||||
preview = kwargs.get("preview")
|
||||
process_rule = kwargs.get("process_rule")
|
||||
if not process_rule:
|
||||
raise ValueError("No process rule found.")
|
||||
if not process_rule.get("rules"):
|
||||
raise ValueError("No rules found in process rule.")
|
||||
rules = Rule(**process_rule.get("rules"))
|
||||
splitter = self._get_splitter(
|
||||
processing_rule_mode=process_rule.get("mode"),
|
||||
max_tokens=rules.segmentation.max_tokens,
|
||||
chunk_overlap=rules.segmentation.chunk_overlap,
|
||||
separator=rules.segmentation.separator,
|
||||
max_tokens=rules.segmentation.max_tokens if rules.segmentation else 0,
|
||||
chunk_overlap=rules.segmentation.chunk_overlap if rules.segmentation else 0,
|
||||
separator=rules.segmentation.separator if rules.segmentation else "",
|
||||
embedding_model_instance=kwargs.get("embedding_model_instance"),
|
||||
)
|
||||
|
||||
@ -71,8 +75,8 @@ class QAIndexProcessor(BaseIndexProcessor):
|
||||
all_documents.extend(split_documents)
|
||||
if preview:
|
||||
self._format_qa_document(
|
||||
current_app._get_current_object(),
|
||||
kwargs.get("tenant_id"),
|
||||
current_app._get_current_object(), # type: ignore
|
||||
kwargs.get("tenant_id"), # type: ignore
|
||||
all_documents[0],
|
||||
all_qa_documents,
|
||||
kwargs.get("doc_language", "English"),
|
||||
@ -85,8 +89,8 @@ class QAIndexProcessor(BaseIndexProcessor):
|
||||
document_format_thread = threading.Thread(
|
||||
target=self._format_qa_document,
|
||||
kwargs={
|
||||
"flask_app": current_app._get_current_object(),
|
||||
"tenant_id": kwargs.get("tenant_id"),
|
||||
"flask_app": current_app._get_current_object(), # type: ignore
|
||||
"tenant_id": kwargs.get("tenant_id"), # type: ignore
|
||||
"document_node": doc,
|
||||
"all_qa_documents": all_qa_documents,
|
||||
"document_language": kwargs.get("doc_language", "English"),
|
||||
|
||||
@ -2,7 +2,7 @@ from abc import ABC, abstractmethod
|
||||
from collections.abc import Sequence
|
||||
from typing import Any, Optional
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class ChildDocument(BaseModel):
|
||||
@ -15,7 +15,7 @@ class ChildDocument(BaseModel):
|
||||
"""Arbitrary metadata about the page content (e.g., source, relationships to other
|
||||
documents, etc.).
|
||||
"""
|
||||
metadata: Optional[dict] = Field(default_factory=dict)
|
||||
metadata: dict = {}
|
||||
|
||||
|
||||
class Document(BaseModel):
|
||||
@ -28,7 +28,7 @@ class Document(BaseModel):
|
||||
"""Arbitrary metadata about the page content (e.g., source, relationships to other
|
||||
documents, etc.).
|
||||
"""
|
||||
metadata: Optional[dict] = Field(default_factory=dict)
|
||||
metadata: dict = {}
|
||||
|
||||
provider: Optional[str] = "dify"
|
||||
|
||||
|
||||
@ -5,7 +5,7 @@ from dify_app import DifyApp
|
||||
def init_app(app: DifyApp):
|
||||
# register blueprint routers
|
||||
|
||||
from flask_cors import CORS
|
||||
from flask_cors import CORS # type: ignore
|
||||
|
||||
from controllers.console import bp as console_app_bp
|
||||
from controllers.files import bp as files_bp
|
||||
|
||||
@ -29,7 +29,6 @@ workflow_run_for_list_fields = {
|
||||
"created_at": TimestampField,
|
||||
"finished_at": TimestampField,
|
||||
"exceptions_count": fields.Integer,
|
||||
"retry_index": fields.Integer,
|
||||
}
|
||||
|
||||
advanced_chat_workflow_run_for_list_fields = {
|
||||
@ -46,7 +45,6 @@ advanced_chat_workflow_run_for_list_fields = {
|
||||
"created_at": TimestampField,
|
||||
"finished_at": TimestampField,
|
||||
"exceptions_count": fields.Integer,
|
||||
"retry_index": fields.Integer,
|
||||
}
|
||||
|
||||
advanced_chat_workflow_run_pagination_fields = {
|
||||
@ -81,19 +79,6 @@ workflow_run_detail_fields = {
|
||||
"exceptions_count": fields.Integer,
|
||||
}
|
||||
|
||||
retry_event_field = {
|
||||
"elapsed_time": fields.Float,
|
||||
"status": fields.String,
|
||||
"inputs": fields.Raw(attribute="inputs"),
|
||||
"process_data": fields.Raw(attribute="process_data"),
|
||||
"outputs": fields.Raw(attribute="outputs"),
|
||||
"metadata": fields.Raw(attribute="metadata"),
|
||||
"llm_usage": fields.Raw(attribute="llm_usage"),
|
||||
"error": fields.String,
|
||||
"retry_index": fields.Integer,
|
||||
}
|
||||
|
||||
|
||||
workflow_run_node_execution_fields = {
|
||||
"id": fields.String,
|
||||
"index": fields.Integer,
|
||||
|
||||
@ -628,7 +628,7 @@ class DocumentSegment(db.Model): # type: ignore[name-defined]
|
||||
return text
|
||||
|
||||
|
||||
class ChildChunk(db.Model):
|
||||
class ChildChunk(db.Model): # type: ignore[name-defined]
|
||||
__tablename__ = "child_chunks"
|
||||
__table_args__ = (
|
||||
db.PrimaryKeyConstraint("id", name="child_chunk_pkey"),
|
||||
@ -910,7 +910,7 @@ class ExternalKnowledgeBindings(db.Model): # type: ignore[name-defined]
|
||||
updated_at = db.Column(db.DateTime, nullable=False, server_default=func.current_timestamp())
|
||||
|
||||
|
||||
class DatasetAutoDisableLog(db.Model):
|
||||
class DatasetAutoDisableLog(db.Model): # type: ignore[name-defined]
|
||||
__tablename__ = "dataset_auto_disable_logs"
|
||||
__table_args__ = (
|
||||
db.PrimaryKeyConstraint("id", name="dataset_auto_disable_log_pkey"),
|
||||
|
||||
@ -400,11 +400,11 @@ class WorkflowRun(db.Model): # type: ignore[name-defined]
|
||||
type: Mapped[str] = mapped_column(db.String(255))
|
||||
triggered_from: Mapped[str] = mapped_column(db.String(255))
|
||||
version: Mapped[str] = mapped_column(db.String(255))
|
||||
graph: Mapped[str] = mapped_column(db.Text)
|
||||
inputs: Mapped[str] = mapped_column(db.Text)
|
||||
graph: Mapped[Optional[str]] = mapped_column(db.Text)
|
||||
inputs: Mapped[Optional[str]] = mapped_column(db.Text)
|
||||
status: Mapped[str] = mapped_column(db.String(255)) # running, succeeded, failed, stopped, partial-succeeded
|
||||
outputs: Mapped[Optional[str]] = mapped_column(sa.Text, default="{}")
|
||||
error: Mapped[str] = mapped_column(db.Text)
|
||||
error: Mapped[Optional[str]] = mapped_column(db.Text)
|
||||
elapsed_time = db.Column(db.Float, nullable=False, server_default=db.text("0"))
|
||||
total_tokens: Mapped[int] = mapped_column(server_default=db.text("0"))
|
||||
total_steps = db.Column(db.Integer, server_default=db.text("0"))
|
||||
@ -609,29 +609,29 @@ class WorkflowNodeExecution(db.Model): # type: ignore[name-defined]
|
||||
),
|
||||
)
|
||||
|
||||
id = db.Column(StringUUID, server_default=db.text("uuid_generate_v4()"))
|
||||
tenant_id = db.Column(StringUUID, nullable=False)
|
||||
app_id = db.Column(StringUUID, nullable=False)
|
||||
workflow_id = db.Column(StringUUID, nullable=False)
|
||||
triggered_from = db.Column(db.String(255), nullable=False)
|
||||
workflow_run_id = db.Column(StringUUID)
|
||||
index = db.Column(db.Integer, nullable=False)
|
||||
predecessor_node_id = db.Column(db.String(255))
|
||||
node_execution_id = db.Column(db.String(255), nullable=True)
|
||||
node_id = db.Column(db.String(255), nullable=False)
|
||||
node_type = db.Column(db.String(255), nullable=False)
|
||||
title = db.Column(db.String(255), nullable=False)
|
||||
inputs = db.Column(db.Text)
|
||||
process_data = db.Column(db.Text)
|
||||
outputs = db.Column(db.Text)
|
||||
status = db.Column(db.String(255), nullable=False)
|
||||
error = db.Column(db.Text)
|
||||
elapsed_time = db.Column(db.Float, nullable=False, server_default=db.text("0"))
|
||||
execution_metadata = db.Column(db.Text)
|
||||
created_at = db.Column(db.DateTime, nullable=False, server_default=func.current_timestamp())
|
||||
created_by_role = db.Column(db.String(255), nullable=False)
|
||||
created_by = db.Column(StringUUID, nullable=False)
|
||||
finished_at = db.Column(db.DateTime)
|
||||
id: Mapped[str] = mapped_column(StringUUID, server_default=db.text("uuid_generate_v4()"))
|
||||
tenant_id: Mapped[str] = mapped_column(StringUUID)
|
||||
app_id: Mapped[str] = mapped_column(StringUUID)
|
||||
workflow_id: Mapped[str] = mapped_column(StringUUID)
|
||||
triggered_from: Mapped[str] = mapped_column(db.String(255))
|
||||
workflow_run_id: Mapped[Optional[str]] = mapped_column(StringUUID)
|
||||
index: Mapped[int] = mapped_column(db.Integer)
|
||||
predecessor_node_id: Mapped[Optional[str]] = mapped_column(db.String(255))
|
||||
node_execution_id: Mapped[Optional[str]] = mapped_column(db.String(255))
|
||||
node_id: Mapped[str] = mapped_column(db.String(255))
|
||||
node_type: Mapped[str] = mapped_column(db.String(255))
|
||||
title: Mapped[str] = mapped_column(db.String(255))
|
||||
inputs: Mapped[Optional[str]] = mapped_column(db.Text)
|
||||
process_data: Mapped[Optional[str]] = mapped_column(db.Text)
|
||||
outputs: Mapped[Optional[str]] = mapped_column(db.Text)
|
||||
status: Mapped[str] = mapped_column(db.String(255))
|
||||
error: Mapped[Optional[str]] = mapped_column(db.Text)
|
||||
elapsed_time: Mapped[float] = mapped_column(db.Float, server_default=db.text("0"))
|
||||
execution_metadata: Mapped[Optional[str]] = mapped_column(db.Text)
|
||||
created_at: Mapped[datetime] = mapped_column(db.DateTime, server_default=func.current_timestamp())
|
||||
created_by_role: Mapped[str] = mapped_column(db.String(255))
|
||||
created_by: Mapped[str] = mapped_column(StringUUID)
|
||||
finished_at: Mapped[Optional[datetime]] = mapped_column(db.DateTime)
|
||||
|
||||
@property
|
||||
def created_by_account(self):
|
||||
|
||||
@ -1,9 +1,9 @@
|
||||
import logging
|
||||
import time
|
||||
from collections import defaultdict
|
||||
|
||||
import click
|
||||
from celery import shared_task
|
||||
from flask import render_template
|
||||
from celery import shared_task # type: ignore
|
||||
|
||||
from extensions.ext_mail import mail
|
||||
from models.account import Account, Tenant, TenantAccountJoin
|
||||
@ -27,7 +27,7 @@ def send_document_clean_notify_task():
|
||||
try:
|
||||
dataset_auto_disable_logs = DatasetAutoDisableLog.query.filter(DatasetAutoDisableLog.notified == False).all()
|
||||
# group by tenant_id
|
||||
dataset_auto_disable_logs_map = {}
|
||||
dataset_auto_disable_logs_map: dict[str, list[DatasetAutoDisableLog]] = defaultdict(list)
|
||||
for dataset_auto_disable_log in dataset_auto_disable_logs:
|
||||
dataset_auto_disable_logs_map[dataset_auto_disable_log.tenant_id].append(dataset_auto_disable_log)
|
||||
|
||||
@ -37,11 +37,13 @@ def send_document_clean_notify_task():
|
||||
if not tenant:
|
||||
continue
|
||||
current_owner_join = TenantAccountJoin.query.filter_by(tenant_id=tenant.id, role="owner").first()
|
||||
if not current_owner_join:
|
||||
continue
|
||||
account = Account.query.filter(Account.id == current_owner_join.account_id).first()
|
||||
if not account:
|
||||
continue
|
||||
|
||||
dataset_auto_dataset_map = {}
|
||||
dataset_auto_dataset_map = {} # type: ignore
|
||||
for dataset_auto_disable_log in tenant_dataset_auto_disable_logs:
|
||||
dataset_auto_dataset_map[dataset_auto_disable_log.dataset_id].append(
|
||||
dataset_auto_disable_log.document_id
|
||||
@ -53,14 +55,9 @@ def send_document_clean_notify_task():
|
||||
document_count = len(document_ids)
|
||||
knowledge_details.append(f"<li>Knowledge base {dataset.name}: {document_count} documents</li>")
|
||||
|
||||
html_content = render_template(
|
||||
"clean_document_job_mail_template-US.html",
|
||||
)
|
||||
mail.send(to=to, subject="立即加入 Dify 工作空间", html=html_content)
|
||||
|
||||
end_at = time.perf_counter()
|
||||
logging.info(
|
||||
click.style("Send document clean notify mail succeeded: latency: {}".format(end_at - start_at), fg="green")
|
||||
)
|
||||
except Exception:
|
||||
logging.exception("Send invite member mail to {} failed".format(to))
|
||||
logging.exception("Send invite member mail to failed")
|
||||
|
||||
@ -4,7 +4,7 @@ from enum import StrEnum
|
||||
from typing import Optional, cast
|
||||
from uuid import uuid4
|
||||
|
||||
import yaml
|
||||
import yaml # type: ignore
|
||||
from packaging import version
|
||||
from pydantic import BaseModel
|
||||
from sqlalchemy import select
|
||||
@ -465,7 +465,7 @@ class AppDslService:
|
||||
else:
|
||||
cls._append_model_config_export_data(export_data, app_model)
|
||||
|
||||
return yaml.dump(export_data, allow_unicode=True)
|
||||
return yaml.dump(export_data, allow_unicode=True) # type: ignore
|
||||
|
||||
@classmethod
|
||||
def _append_workflow_export_data(cls, *, export_data: dict, app_model: App, include_secret: bool) -> None:
|
||||
|
||||
@ -41,6 +41,7 @@ from models.source import DataSourceOauthBinding
|
||||
from services.entities.knowledge_entities.knowledge_entities import (
|
||||
ChildChunkUpdateArgs,
|
||||
KnowledgeConfig,
|
||||
RerankingModel,
|
||||
RetrievalModel,
|
||||
SegmentUpdateArgs,
|
||||
)
|
||||
@ -548,12 +549,14 @@ class DocumentService:
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def get_document(dataset_id: str, document_id: str) -> Optional[Document]:
|
||||
document = (
|
||||
db.session.query(Document).filter(Document.id == document_id, Document.dataset_id == dataset_id).first()
|
||||
)
|
||||
|
||||
return document
|
||||
def get_document(dataset_id: str, document_id: Optional[str] = None) -> Optional[Document]:
|
||||
if document_id:
|
||||
document = (
|
||||
db.session.query(Document).filter(Document.id == document_id, Document.dataset_id == dataset_id).first()
|
||||
)
|
||||
return document
|
||||
else:
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def get_document_by_id(document_id: str) -> Optional[Document]:
|
||||
@ -744,25 +747,26 @@ class DocumentService:
|
||||
if features.billing.enabled:
|
||||
if not knowledge_config.original_document_id:
|
||||
count = 0
|
||||
if knowledge_config.data_source.info_list.data_source_type == "upload_file":
|
||||
upload_file_list = knowledge_config.data_source.info_list.file_info_list.file_ids
|
||||
count = len(upload_file_list)
|
||||
elif knowledge_config.data_source.info_list.data_source_type == "notion_import":
|
||||
notion_info_list = knowledge_config.data_source.info_list.notion_info_list
|
||||
for notion_info in notion_info_list:
|
||||
count = count + len(notion_info.pages)
|
||||
elif knowledge_config.data_source.info_list.data_source_type == "website_crawl":
|
||||
website_info = knowledge_config.data_source.info_list.website_info_list
|
||||
count = len(website_info.urls)
|
||||
batch_upload_limit = int(dify_config.BATCH_UPLOAD_LIMIT)
|
||||
if count > batch_upload_limit:
|
||||
raise ValueError(f"You have reached the batch upload limit of {batch_upload_limit}.")
|
||||
if knowledge_config.data_source:
|
||||
if knowledge_config.data_source.info_list.data_source_type == "upload_file":
|
||||
upload_file_list = knowledge_config.data_source.info_list.file_info_list.file_ids # type: ignore
|
||||
count = len(upload_file_list)
|
||||
elif knowledge_config.data_source.info_list.data_source_type == "notion_import":
|
||||
notion_info_list = knowledge_config.data_source.info_list.notion_info_list
|
||||
for notion_info in notion_info_list: # type: ignore
|
||||
count = count + len(notion_info.pages)
|
||||
elif knowledge_config.data_source.info_list.data_source_type == "website_crawl":
|
||||
website_info = knowledge_config.data_source.info_list.website_info_list
|
||||
count = len(website_info.urls) # type: ignore
|
||||
batch_upload_limit = int(dify_config.BATCH_UPLOAD_LIMIT)
|
||||
if count > batch_upload_limit:
|
||||
raise ValueError(f"You have reached the batch upload limit of {batch_upload_limit}.")
|
||||
|
||||
DocumentService.check_documents_upload_quota(count, features)
|
||||
DocumentService.check_documents_upload_quota(count, features)
|
||||
|
||||
# if dataset is empty, update dataset data_source_type
|
||||
if not dataset.data_source_type:
|
||||
dataset.data_source_type = knowledge_config.data_source.info_list.data_source_type
|
||||
dataset.data_source_type = knowledge_config.data_source.info_list.data_source_type # type: ignore
|
||||
|
||||
if not dataset.indexing_technique:
|
||||
if knowledge_config.indexing_technique not in Dataset.INDEXING_TECHNIQUE_LIST:
|
||||
@ -789,7 +793,7 @@ class DocumentService:
|
||||
"score_threshold_enabled": False,
|
||||
}
|
||||
|
||||
dataset.retrieval_model = knowledge_config.retrieval_model.model_dump() or default_retrieval_model
|
||||
dataset.retrieval_model = knowledge_config.retrieval_model.model_dump() or default_retrieval_model # type: ignore
|
||||
|
||||
documents = []
|
||||
if knowledge_config.original_document_id:
|
||||
@ -801,34 +805,35 @@ class DocumentService:
|
||||
# save process rule
|
||||
if not dataset_process_rule:
|
||||
process_rule = knowledge_config.process_rule
|
||||
if process_rule.mode in ("custom", "hierarchical"):
|
||||
dataset_process_rule = DatasetProcessRule(
|
||||
dataset_id=dataset.id,
|
||||
mode=process_rule.mode,
|
||||
rules=process_rule.rules.model_dump_json(),
|
||||
created_by=account.id,
|
||||
)
|
||||
elif process_rule.mode == "automatic":
|
||||
dataset_process_rule = DatasetProcessRule(
|
||||
dataset_id=dataset.id,
|
||||
mode=process_rule.mode,
|
||||
rules=json.dumps(DatasetProcessRule.AUTOMATIC_RULES),
|
||||
created_by=account.id,
|
||||
)
|
||||
else:
|
||||
logging.warn(
|
||||
f"Invalid process rule mode: {process_rule['mode']}, can not find dataset process rule"
|
||||
)
|
||||
return
|
||||
db.session.add(dataset_process_rule)
|
||||
db.session.commit()
|
||||
if process_rule:
|
||||
if process_rule.mode in ("custom", "hierarchical"):
|
||||
dataset_process_rule = DatasetProcessRule(
|
||||
dataset_id=dataset.id,
|
||||
mode=process_rule.mode,
|
||||
rules=process_rule.rules.model_dump_json() if process_rule.rules else None,
|
||||
created_by=account.id,
|
||||
)
|
||||
elif process_rule.mode == "automatic":
|
||||
dataset_process_rule = DatasetProcessRule(
|
||||
dataset_id=dataset.id,
|
||||
mode=process_rule.mode,
|
||||
rules=json.dumps(DatasetProcessRule.AUTOMATIC_RULES),
|
||||
created_by=account.id,
|
||||
)
|
||||
else:
|
||||
logging.warn(
|
||||
f"Invalid process rule mode: {process_rule.mode}, can not find dataset process rule"
|
||||
)
|
||||
return
|
||||
db.session.add(dataset_process_rule)
|
||||
db.session.commit()
|
||||
lock_name = "add_document_lock_dataset_id_{}".format(dataset.id)
|
||||
with redis_client.lock(lock_name, timeout=600):
|
||||
position = DocumentService.get_documents_position(dataset.id)
|
||||
document_ids = []
|
||||
duplicate_document_ids = []
|
||||
if knowledge_config.data_source.info_list.data_source_type == "upload_file":
|
||||
upload_file_list = knowledge_config.data_source.info_list.file_info_list.file_ids
|
||||
upload_file_list = knowledge_config.data_source.info_list.file_info_list.file_ids # type: ignore
|
||||
for file_id in upload_file_list:
|
||||
file = (
|
||||
db.session.query(UploadFile)
|
||||
@ -854,7 +859,7 @@ class DocumentService:
|
||||
name=file_name,
|
||||
).first()
|
||||
if document:
|
||||
document.dataset_process_rule_id = dataset_process_rule.id
|
||||
document.dataset_process_rule_id = dataset_process_rule.id # type: ignore
|
||||
document.updated_at = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
|
||||
document.created_from = created_from
|
||||
document.doc_form = knowledge_config.doc_form
|
||||
@ -868,7 +873,7 @@ class DocumentService:
|
||||
continue
|
||||
document = DocumentService.build_document(
|
||||
dataset,
|
||||
dataset_process_rule.id,
|
||||
dataset_process_rule.id, # type: ignore
|
||||
knowledge_config.data_source.info_list.data_source_type,
|
||||
knowledge_config.doc_form,
|
||||
knowledge_config.doc_language,
|
||||
@ -886,6 +891,8 @@ class DocumentService:
|
||||
position += 1
|
||||
elif knowledge_config.data_source.info_list.data_source_type == "notion_import":
|
||||
notion_info_list = knowledge_config.data_source.info_list.notion_info_list
|
||||
if not notion_info_list:
|
||||
raise ValueError("No notion info list found.")
|
||||
exist_page_ids = []
|
||||
exist_document = {}
|
||||
documents = Document.query.filter_by(
|
||||
@ -921,7 +928,7 @@ class DocumentService:
|
||||
}
|
||||
document = DocumentService.build_document(
|
||||
dataset,
|
||||
dataset_process_rule.id,
|
||||
dataset_process_rule.id, # type: ignore
|
||||
knowledge_config.data_source.info_list.data_source_type,
|
||||
knowledge_config.doc_form,
|
||||
knowledge_config.doc_language,
|
||||
@ -944,6 +951,8 @@ class DocumentService:
|
||||
clean_notion_document_task.delay(list(exist_document.values()), dataset.id)
|
||||
elif knowledge_config.data_source.info_list.data_source_type == "website_crawl":
|
||||
website_info = knowledge_config.data_source.info_list.website_info_list
|
||||
if not website_info:
|
||||
raise ValueError("No website info list found.")
|
||||
urls = website_info.urls
|
||||
for url in urls:
|
||||
data_source_info = {
|
||||
@ -959,7 +968,7 @@ class DocumentService:
|
||||
document_name = url
|
||||
document = DocumentService.build_document(
|
||||
dataset,
|
||||
dataset_process_rule.id,
|
||||
dataset_process_rule.id, # type: ignore
|
||||
knowledge_config.data_source.info_list.data_source_type,
|
||||
knowledge_config.doc_form,
|
||||
knowledge_config.doc_language,
|
||||
@ -1054,7 +1063,7 @@ class DocumentService:
|
||||
dataset_process_rule = DatasetProcessRule(
|
||||
dataset_id=dataset.id,
|
||||
mode=process_rule.mode,
|
||||
rules=process_rule.rules.model_dump_json(),
|
||||
rules=process_rule.rules.model_dump_json() if process_rule.rules else None,
|
||||
created_by=account.id,
|
||||
)
|
||||
elif process_rule.mode == "automatic":
|
||||
@ -1073,6 +1082,8 @@ class DocumentService:
|
||||
file_name = ""
|
||||
data_source_info = {}
|
||||
if document_data.data_source.info_list.data_source_type == "upload_file":
|
||||
if not document_data.data_source.info_list.file_info_list:
|
||||
raise ValueError("No file info list found.")
|
||||
upload_file_list = document_data.data_source.info_list.file_info_list.file_ids
|
||||
for file_id in upload_file_list:
|
||||
file = (
|
||||
@ -1090,6 +1101,8 @@ class DocumentService:
|
||||
"upload_file_id": file_id,
|
||||
}
|
||||
elif document_data.data_source.info_list.data_source_type == "notion_import":
|
||||
if not document_data.data_source.info_list.notion_info_list:
|
||||
raise ValueError("No notion info list found.")
|
||||
notion_info_list = document_data.data_source.info_list.notion_info_list
|
||||
for notion_info in notion_info_list:
|
||||
workspace_id = notion_info.workspace_id
|
||||
@ -1107,20 +1120,21 @@ class DocumentService:
|
||||
data_source_info = {
|
||||
"notion_workspace_id": workspace_id,
|
||||
"notion_page_id": page.page_id,
|
||||
"notion_page_icon": page.page_icon,
|
||||
"notion_page_icon": page.page_icon.model_dump() if page.page_icon else None, # type: ignore
|
||||
"type": page.type,
|
||||
}
|
||||
elif document_data.data_source.info_list.data_source_type == "website_crawl":
|
||||
website_info = document_data.data_source.info_list.website_info_list
|
||||
urls = website_info.urls
|
||||
for url in urls:
|
||||
data_source_info = {
|
||||
"url": url,
|
||||
"provider": website_info.provider,
|
||||
"job_id": website_info.job_id,
|
||||
"only_main_content": website_info.only_main_content,
|
||||
"mode": "crawl",
|
||||
}
|
||||
if website_info:
|
||||
urls = website_info.urls
|
||||
for url in urls:
|
||||
data_source_info = {
|
||||
"url": url,
|
||||
"provider": website_info.provider,
|
||||
"job_id": website_info.job_id,
|
||||
"only_main_content": website_info.only_main_content, # type: ignore
|
||||
"mode": "crawl",
|
||||
}
|
||||
document.data_source_type = document_data.data_source.info_list.data_source_type
|
||||
document.data_source_info = json.dumps(data_source_info)
|
||||
document.name = file_name
|
||||
@ -1155,15 +1169,21 @@ class DocumentService:
|
||||
if features.billing.enabled:
|
||||
count = 0
|
||||
if knowledge_config.data_source.info_list.data_source_type == "upload_file":
|
||||
upload_file_list = knowledge_config.data_source.info_list.file_info_list.file_ids
|
||||
upload_file_list = (
|
||||
knowledge_config.data_source.info_list.file_info_list.file_ids
|
||||
if knowledge_config.data_source.info_list.file_info_list
|
||||
else []
|
||||
)
|
||||
count = len(upload_file_list)
|
||||
elif knowledge_config.data_source.info_list.data_source_type == "notion_import":
|
||||
notion_info_list = knowledge_config.data_source.info_list.notion_info_list
|
||||
for notion_info in notion_info_list:
|
||||
count = count + len(notion_info.pages)
|
||||
if notion_info_list:
|
||||
for notion_info in notion_info_list:
|
||||
count = count + len(notion_info.pages)
|
||||
elif knowledge_config.data_source.info_list.data_source_type == "website_crawl":
|
||||
website_info = knowledge_config.data_source.info_list.website_info_list
|
||||
count = len(website_info.urls)
|
||||
if website_info:
|
||||
count = len(website_info.urls)
|
||||
batch_upload_limit = int(dify_config.BATCH_UPLOAD_LIMIT)
|
||||
if count > batch_upload_limit:
|
||||
raise ValueError(f"You have reached the batch upload limit of {batch_upload_limit}.")
|
||||
@ -1174,20 +1194,20 @@ class DocumentService:
|
||||
retrieval_model = None
|
||||
if knowledge_config.indexing_technique == "high_quality":
|
||||
dataset_collection_binding = DatasetCollectionBindingService.get_dataset_collection_binding(
|
||||
knowledge_config.embedding_model_provider, knowledge_config.embedding_model
|
||||
knowledge_config.embedding_model_provider, # type: ignore
|
||||
knowledge_config.embedding_model, # type: ignore
|
||||
)
|
||||
dataset_collection_binding_id = dataset_collection_binding.id
|
||||
if knowledge_config.retrieval_model:
|
||||
retrieval_model = knowledge_config.retrieval_model
|
||||
else:
|
||||
default_retrieval_model = {
|
||||
"search_method": RetrievalMethod.SEMANTIC_SEARCH.value,
|
||||
"reranking_enable": False,
|
||||
"reranking_model": {"reranking_provider_name": "", "reranking_model_name": ""},
|
||||
"top_k": 2,
|
||||
"score_threshold_enabled": False,
|
||||
}
|
||||
retrieval_model = RetrievalModel(**default_retrieval_model)
|
||||
retrieval_model = RetrievalModel(
|
||||
search_method=RetrievalMethod.SEMANTIC_SEARCH.value,
|
||||
reranking_enable=False,
|
||||
reranking_model=RerankingModel(reranking_provider_name="", reranking_model_name=""),
|
||||
top_k=2,
|
||||
score_threshold_enabled=False,
|
||||
)
|
||||
# save dataset
|
||||
dataset = Dataset(
|
||||
tenant_id=tenant_id,
|
||||
@ -1557,12 +1577,12 @@ class SegmentService:
|
||||
raise ValueError("Can't update disabled segment")
|
||||
try:
|
||||
word_count_change = segment.word_count
|
||||
content = args.content
|
||||
content = args.content or segment.content
|
||||
if segment.content == content:
|
||||
segment.word_count = len(content)
|
||||
if document.doc_form == "qa_model":
|
||||
segment.answer = args.answer
|
||||
segment.word_count += len(args.answer)
|
||||
segment.word_count += len(args.answer) if args.answer else 0
|
||||
word_count_change = segment.word_count - word_count_change
|
||||
if args.keywords:
|
||||
segment.keywords = args.keywords
|
||||
@ -1577,7 +1597,12 @@ class SegmentService:
|
||||
db.session.add(document)
|
||||
# update segment index task
|
||||
if args.enabled:
|
||||
VectorService.create_segments_vector([args.keywords], [segment], dataset)
|
||||
VectorService.create_segments_vector(
|
||||
[args.keywords] if args.keywords else None,
|
||||
[segment],
|
||||
dataset,
|
||||
document.doc_form,
|
||||
)
|
||||
if document.doc_form == IndexType.PARENT_CHILD_INDEX and args.regenerate_child_chunks:
|
||||
# regenerate child chunks
|
||||
# get embedding model instance
|
||||
@ -1605,6 +1630,8 @@ class SegmentService:
|
||||
.filter(DatasetProcessRule.id == document.dataset_process_rule_id)
|
||||
.first()
|
||||
)
|
||||
if not processing_rule:
|
||||
raise ValueError("No processing rule found.")
|
||||
VectorService.generate_child_chunks(
|
||||
segment, document, dataset, embedding_model_instance, processing_rule, True
|
||||
)
|
||||
@ -1639,7 +1666,7 @@ class SegmentService:
|
||||
segment.disabled_by = None
|
||||
if document.doc_form == "qa_model":
|
||||
segment.answer = args.answer
|
||||
segment.word_count += len(args.answer)
|
||||
segment.word_count += len(args.answer) if args.answer else 0
|
||||
word_count_change = segment.word_count - word_count_change
|
||||
# update document word count
|
||||
if word_count_change != 0:
|
||||
@ -1673,6 +1700,8 @@ class SegmentService:
|
||||
.filter(DatasetProcessRule.id == document.dataset_process_rule_id)
|
||||
.first()
|
||||
)
|
||||
if not processing_rule:
|
||||
raise ValueError("No processing rule found.")
|
||||
VectorService.generate_child_chunks(
|
||||
segment, document, dataset, embedding_model_instance, processing_rule, True
|
||||
)
|
||||
|
||||
@ -97,7 +97,7 @@ class KnowledgeConfig(BaseModel):
|
||||
original_document_id: Optional[str] = None
|
||||
duplicate: bool = True
|
||||
indexing_technique: Literal["high_quality", "economy"]
|
||||
data_source: Optional[DataSource] = None
|
||||
data_source: DataSource
|
||||
process_rule: Optional[ProcessRule] = None
|
||||
retrieval_model: Optional[RetrievalModel] = None
|
||||
doc_form: str = "text_model"
|
||||
|
||||
@ -69,7 +69,7 @@ class HitTestingService:
|
||||
db.session.add(dataset_query)
|
||||
db.session.commit()
|
||||
|
||||
return cls.compact_retrieve_response(query, all_documents)
|
||||
return cls.compact_retrieve_response(query, all_documents) # type: ignore
|
||||
|
||||
@classmethod
|
||||
def external_retrieve(
|
||||
|
||||
@ -29,6 +29,8 @@ class VectorService:
|
||||
.filter(DatasetProcessRule.id == document.dataset_process_rule_id)
|
||||
.first()
|
||||
)
|
||||
if not processing_rule:
|
||||
raise ValueError("No processing rule found.")
|
||||
# get embedding model instance
|
||||
if dataset.indexing_technique == "high_quality":
|
||||
# check embedding model setting
|
||||
@ -98,7 +100,7 @@ class VectorService:
|
||||
def generate_child_chunks(
|
||||
cls,
|
||||
segment: DocumentSegment,
|
||||
dataset_document: Document,
|
||||
dataset_document: DatasetDocument,
|
||||
dataset: Dataset,
|
||||
embedding_model_instance: ModelInstance,
|
||||
processing_rule: DatasetProcessRule,
|
||||
@ -130,7 +132,7 @@ class VectorService:
|
||||
doc_language=dataset_document.doc_language,
|
||||
)
|
||||
# save child chunks
|
||||
if len(documents) > 0 and len(documents[0].children) > 0:
|
||||
if documents and documents[0].children:
|
||||
index_processor.load(dataset, documents)
|
||||
|
||||
for position, child_chunk in enumerate(documents[0].children, start=1):
|
||||
|
||||
@ -2,13 +2,14 @@ import json
|
||||
import time
|
||||
from collections.abc import Sequence
|
||||
from datetime import UTC, datetime
|
||||
from typing import Any, Optional, cast
|
||||
from typing import Optional, cast
|
||||
from uuid import uuid4
|
||||
|
||||
from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfigManager
|
||||
from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager
|
||||
from core.model_runtime.utils.encoders import jsonable_encoder
|
||||
from core.variables import Variable
|
||||
from core.workflow.entities.node_entities import NodeRunResult
|
||||
from core.workflow.entities.node_entities import NodeRunMetadataKey, NodeRunResult
|
||||
from core.workflow.errors import WorkflowNodeRunFailedError
|
||||
from core.workflow.nodes import NodeType
|
||||
from core.workflow.nodes.base.entities import BaseNodeData
|
||||
@ -242,29 +243,7 @@ class WorkflowService:
|
||||
raise ValueError("Node run failed with no run result")
|
||||
# single step debug mode error handling return
|
||||
if node_run_result.status == WorkflowNodeExecutionStatus.FAILED and node_instance.should_continue_on_error:
|
||||
node_error_args: dict[str, Any] = {
|
||||
"status": WorkflowNodeExecutionStatus.EXCEPTION,
|
||||
"error": node_run_result.error,
|
||||
"inputs": node_run_result.inputs,
|
||||
"metadata": {"error_strategy": node_instance.node_data.error_strategy},
|
||||
}
|
||||
if node_instance.node_data.error_strategy is ErrorStrategy.DEFAULT_VALUE:
|
||||
node_run_result = NodeRunResult(
|
||||
**node_error_args,
|
||||
outputs={
|
||||
**node_instance.node_data.default_value_dict,
|
||||
"error_message": node_run_result.error,
|
||||
"error_type": node_run_result.error_type,
|
||||
},
|
||||
)
|
||||
else:
|
||||
node_run_result = NodeRunResult(
|
||||
**node_error_args,
|
||||
outputs={
|
||||
"error_message": node_run_result.error,
|
||||
"error_type": node_run_result.error_type,
|
||||
},
|
||||
)
|
||||
node_run_result = self._handle_continue_on_error(node_instance, node_run_result)
|
||||
run_succeeded = node_run_result.status in (
|
||||
WorkflowNodeExecutionStatus.SUCCEEDED,
|
||||
WorkflowNodeExecutionStatus.EXCEPTION,
|
||||
@ -277,6 +256,7 @@ class WorkflowService:
|
||||
error = e.error
|
||||
|
||||
workflow_node_execution = WorkflowNodeExecution()
|
||||
workflow_node_execution.id = str(uuid4())
|
||||
workflow_node_execution.tenant_id = app_model.tenant_id
|
||||
workflow_node_execution.app_id = app_model.id
|
||||
workflow_node_execution.workflow_id = draft_workflow.id
|
||||
@ -360,3 +340,149 @@ class WorkflowService:
|
||||
)
|
||||
else:
|
||||
raise ValueError(f"Invalid app mode: {app_model.mode}")
|
||||
|
||||
def run_retriable_draft_workflow_node(
|
||||
self, app_model: App, node_id: str, user_inputs: dict, account: Account
|
||||
) -> list[WorkflowNodeExecution]:
|
||||
"""
|
||||
Run draft retry workflow node
|
||||
"""
|
||||
# fetch draft workflow by app_model
|
||||
draft_workflow = self.get_draft_workflow(app_model=app_model)
|
||||
if not draft_workflow:
|
||||
raise ValueError("Workflow not initialized")
|
||||
# init retry variables
|
||||
start_at = time.perf_counter()
|
||||
should_retry = True
|
||||
retries = 0
|
||||
max_retries = 0
|
||||
retry_interval = 0.0
|
||||
list_node_executions = []
|
||||
|
||||
while retries <= max_retries and should_retry:
|
||||
reties_start_at = time.perf_counter()
|
||||
should_retry = False
|
||||
try:
|
||||
# run draft workflow node
|
||||
node_instance, generator = WorkflowEntry.single_step_run(
|
||||
workflow=draft_workflow,
|
||||
node_id=node_id,
|
||||
user_inputs=user_inputs,
|
||||
user_id=account.id,
|
||||
)
|
||||
node_instance = cast(BaseNode[BaseNodeData], node_instance)
|
||||
max_retries = node_instance.node_data.retry_config.max_retries
|
||||
retry_interval = node_instance.node_data.retry_config.retry_interval_seconds
|
||||
node_run_result: NodeRunResult | None = None
|
||||
for event in generator:
|
||||
if isinstance(event, RunCompletedEvent):
|
||||
node_run_result = event.run_result
|
||||
|
||||
# sign output files
|
||||
node_run_result.outputs = WorkflowEntry.handle_special_values(node_run_result.outputs)
|
||||
break
|
||||
|
||||
if not node_run_result:
|
||||
raise ValueError("Node run failed with no run result")
|
||||
# single step debug mode error handling return
|
||||
if node_run_result.status == WorkflowNodeExecutionStatus.FAILED:
|
||||
if node_instance.should_retry and retries < max_retries:
|
||||
retries += 1
|
||||
should_retry = True
|
||||
node_run_result.status = WorkflowNodeExecutionStatus.RETRY
|
||||
|
||||
elif node_instance.should_continue_on_error:
|
||||
node_run_result = self._handle_continue_on_error(node_instance, node_run_result)
|
||||
|
||||
elif node_instance.node_type == NodeType.HTTP_REQUEST and node_run_result.outputs:
|
||||
node_run_result.status = WorkflowNodeExecutionStatus.SUCCEEDED
|
||||
|
||||
error = node_run_result.error or None
|
||||
except WorkflowNodeRunFailedError as e:
|
||||
node_instance = e.node_instance
|
||||
node_run_result = None
|
||||
error = e.error
|
||||
|
||||
start_at = (
|
||||
reties_start_at
|
||||
if not node_run_result or node_run_result.status == WorkflowNodeExecutionStatus.RETRY
|
||||
else start_at
|
||||
)
|
||||
|
||||
workflow_node_execution = WorkflowNodeExecution()
|
||||
workflow_node_execution.tenant_id = app_model.tenant_id
|
||||
workflow_node_execution.app_id = app_model.id
|
||||
workflow_node_execution.workflow_id = draft_workflow.id
|
||||
workflow_node_execution.triggered_from = WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP.value
|
||||
workflow_node_execution.index = 1
|
||||
workflow_node_execution.node_id = node_id
|
||||
workflow_node_execution.node_type = node_instance.node_type
|
||||
workflow_node_execution.title = node_instance.node_data.title
|
||||
workflow_node_execution.elapsed_time = time.perf_counter() - start_at
|
||||
workflow_node_execution.created_by_role = CreatedByRole.ACCOUNT.value
|
||||
workflow_node_execution.created_by = account.id
|
||||
workflow_node_execution.created_at = datetime.now(UTC).replace(tzinfo=None)
|
||||
workflow_node_execution.finished_at = datetime.now(UTC).replace(tzinfo=None)
|
||||
workflow_node_execution.status = WorkflowNodeExecutionStatus.FAILED.value
|
||||
workflow_node_execution.error = error or None
|
||||
|
||||
if node_run_result:
|
||||
# create workflow node execution
|
||||
inputs = WorkflowEntry.handle_special_values(node_run_result.inputs) if node_run_result.inputs else None
|
||||
process_data = (
|
||||
WorkflowEntry.handle_special_values(node_run_result.process_data)
|
||||
if node_run_result.process_data
|
||||
else None
|
||||
)
|
||||
outputs = (
|
||||
WorkflowEntry.handle_special_values(node_run_result.outputs) if node_run_result.outputs else None
|
||||
)
|
||||
workflow_node_execution.status = node_run_result.status.value
|
||||
workflow_node_execution.inputs = json.dumps(inputs)
|
||||
workflow_node_execution.process_data = json.dumps(process_data)
|
||||
workflow_node_execution.outputs = json.dumps(outputs)
|
||||
workflow_node_execution.execution_metadata = (
|
||||
json.dumps(jsonable_encoder(node_run_result.metadata)) if node_run_result.metadata else None
|
||||
)
|
||||
|
||||
db.session.add(workflow_node_execution)
|
||||
list_node_executions.append(workflow_node_execution)
|
||||
db.session.commit()
|
||||
if should_retry and retry_interval:
|
||||
time.sleep(retry_interval)
|
||||
|
||||
return list_node_executions
|
||||
|
||||
def _handle_continue_on_error(
|
||||
self, node_instance: BaseNode[BaseNodeData], node_run_result: NodeRunResult
|
||||
) -> NodeRunResult:
|
||||
node_error_args = {
|
||||
"status": WorkflowNodeExecutionStatus.EXCEPTION,
|
||||
"error": node_run_result.error,
|
||||
"inputs": node_run_result.inputs,
|
||||
"metadata": {NodeRunMetadataKey.ERROR_STRATEGY: node_instance.node_data.error_strategy},
|
||||
}
|
||||
if node_instance.node_data.error_strategy is ErrorStrategy.DEFAULT_VALUE:
|
||||
node_run_result = NodeRunResult(
|
||||
status=WorkflowNodeExecutionStatus.EXCEPTION,
|
||||
error=node_run_result.error,
|
||||
inputs=node_run_result.inputs,
|
||||
metadata={NodeRunMetadataKey.ERROR_STRATEGY: node_instance.node_data.error_strategy},
|
||||
outputs={
|
||||
**node_instance.node_data.default_value_dict,
|
||||
"error_message": node_run_result.error,
|
||||
"error_type": node_run_result.error_type,
|
||||
},
|
||||
)
|
||||
else:
|
||||
node_run_result = NodeRunResult(
|
||||
status=WorkflowNodeExecutionStatus.EXCEPTION,
|
||||
error=node_run_result.error,
|
||||
inputs=node_run_result.inputs,
|
||||
metadata={NodeRunMetadataKey.ERROR_STRATEGY: node_instance.node_data.error_strategy},
|
||||
outputs={
|
||||
"error_message": node_run_result.error,
|
||||
"error_type": node_run_result.error_type,
|
||||
},
|
||||
)
|
||||
return node_run_result
|
||||
|
||||
@ -2,7 +2,7 @@ import logging
|
||||
import time
|
||||
|
||||
import click
|
||||
from celery import shared_task
|
||||
from celery import shared_task # type: ignore
|
||||
|
||||
from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
|
||||
from core.tools.utils.web_reader_tool import get_image_upload_file_ids
|
||||
@ -44,7 +44,8 @@ def batch_clean_document_task(document_ids: list[str], dataset_id: str, doc_form
|
||||
for upload_file_id in image_upload_file_ids:
|
||||
image_file = db.session.query(UploadFile).filter(UploadFile.id == upload_file_id).first()
|
||||
try:
|
||||
storage.delete(image_file.key)
|
||||
if image_file and image_file.key:
|
||||
storage.delete(image_file.key)
|
||||
except Exception:
|
||||
logging.exception(
|
||||
"Delete image_files failed when storage deleted, \
|
||||
|
||||
@ -2,7 +2,7 @@ import logging
|
||||
import time
|
||||
|
||||
import click
|
||||
from celery import shared_task
|
||||
from celery import shared_task # type: ignore
|
||||
|
||||
from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
|
||||
from extensions.ext_database import db
|
||||
|
||||
@ -3,7 +3,7 @@ import logging
|
||||
import time
|
||||
|
||||
import click
|
||||
from celery import shared_task
|
||||
from celery import shared_task # type: ignore
|
||||
|
||||
from core.rag.index_processor.constant.index_type import IndexType
|
||||
from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
|
||||
|
||||
@ -147,7 +147,7 @@ const Apps = ({
|
||||
if (onSuccess)
|
||||
onSuccess()
|
||||
localStorage.setItem(NEED_REFRESH_APP_LIST_KEY, '1')
|
||||
getRedirection(isCurrentWorkspaceEditor, app, push)
|
||||
getRedirection(isCurrentWorkspaceEditor, { id: app.app_id }, push)
|
||||
}
|
||||
catch (e) {
|
||||
Toast.notify({ type: 'error', message: t('app.newApp.appCreateFailed') })
|
||||
|
||||
Reference in New Issue
Block a user