mirror of
https://github.com/langgenius/dify.git
synced 2026-05-03 17:08:03 +08:00
refactor(api): rename celery queue for workflow & chatflow execution
Rename the celery queue name from `chatflow_execute` to `workflow_base_app_execution` to better reflect it is used by workflow based applications, not only chatflow.
This commit is contained in:
@ -22,7 +22,7 @@ from libs.exception import BaseHTTPException
|
||||
from models.human_input import RecipientType
|
||||
from models.model import App, AppMode
|
||||
from repositories.factory import DifyAPIRepositoryFactory
|
||||
from tasks.app_generate.workflow_execute_task import APP_EXECUTE_QUEUE, resume_app_execution
|
||||
from tasks.app_generate.workflow_execute_task import WORKFLOW_BASED_APP_EXECUTION_QUEUE, resume_app_execution
|
||||
|
||||
|
||||
class Form:
|
||||
@ -230,7 +230,7 @@ class HumanInputService:
|
||||
try:
|
||||
resume_app_execution.apply_async(
|
||||
kwargs={"payload": payload},
|
||||
queue=APP_EXECUTE_QUEUE,
|
||||
queue=WORKFLOW_BASED_APP_EXECUTION_QUEUE,
|
||||
)
|
||||
except Exception: # pragma: no cover
|
||||
logger.exception("Failed to enqueue resume task for workflow run %s", workflow_run_id)
|
||||
|
||||
@ -32,7 +32,7 @@ from repositories.factory import DifyAPIRepositoryFactory
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
APP_EXECUTE_QUEUE = "chatflow_execute"
|
||||
WORKFLOW_BASED_APP_EXECUTION_QUEUE = "workflow_base_app_execution"
|
||||
|
||||
|
||||
class _UserType(StrEnum):
|
||||
@ -267,7 +267,7 @@ def _publish_streaming_response(response_stream: Iterable[Any], workflow_run_id:
|
||||
topic.publish(payload.encode())
|
||||
|
||||
|
||||
@shared_task(queue=APP_EXECUTE_QUEUE)
|
||||
@shared_task(queue=WORKFLOW_BASED_APP_EXECUTION_QUEUE)
|
||||
def chatflow_execute_task(payload: str) -> Mapping[str, Any] | None:
|
||||
exec_params = AppExecutionParams.model_validate_json(payload)
|
||||
|
||||
@ -511,11 +511,11 @@ def _resume_workflow(
|
||||
workflow_run_repo.delete_workflow_pause(pause_entity)
|
||||
|
||||
|
||||
@shared_task(queue=APP_EXECUTE_QUEUE, name="resume_app_execution")
|
||||
@shared_task(queue=WORKFLOW_BASED_APP_EXECUTION_QUEUE, name="resume_app_execution")
|
||||
def resume_app_execution(payload: dict[str, Any]) -> None:
|
||||
_resume_app_execution(payload)
|
||||
|
||||
|
||||
@shared_task(queue=APP_EXECUTE_QUEUE, name="resume_chatflow_execution")
|
||||
@shared_task(queue=WORKFLOW_BASED_APP_EXECUTION_QUEUE, name="resume_chatflow_execution")
|
||||
def resume_chatflow_execution(payload: dict[str, Any]) -> None:
|
||||
_resume_app_execution(payload)
|
||||
|
||||
Reference in New Issue
Block a user