mirror of
https://github.com/langgenius/dify.git
synced 2026-05-03 08:58:09 +08:00
Merge commit '9c339239' into sandboxed-agent-rebase
Made-with: Cursor # Conflicts: # api/README.md # api/controllers/console/app/workflow_draft_variable.py # api/core/agent/cot_agent_runner.py # api/core/agent/fc_agent_runner.py # api/core/app/apps/advanced_chat/app_runner.py # api/core/plugin/backwards_invocation/model.py # api/core/prompt/advanced_prompt_transform.py # api/core/workflow/nodes/base/node.py # api/core/workflow/nodes/llm/llm_utils.py # api/core/workflow/nodes/llm/node.py # api/core/workflow/nodes/parameter_extractor/parameter_extractor_node.py # api/core/workflow/nodes/question_classifier/question_classifier_node.py # api/core/workflow/runtime/graph_runtime_state.py # api/extensions/storage/base_storage.py # api/factories/variable_factory.py # api/pyproject.toml # api/services/variable_truncator.py # api/uv.lock # web/app/account/oauth/authorize/page.tsx # web/app/components/app/configuration/config-var/config-modal/field.tsx # web/app/components/base/alert.tsx # web/app/components/base/chat/chat/answer/human-input-content/executed-action.tsx # web/app/components/base/chat/chat/answer/more.tsx # web/app/components/base/chat/chat/answer/operation.tsx # web/app/components/base/chat/chat/answer/workflow-process.tsx # web/app/components/base/chat/chat/citation/index.tsx # web/app/components/base/chat/chat/citation/popup.tsx # web/app/components/base/chat/chat/citation/progress-tooltip.tsx # web/app/components/base/chat/chat/citation/tooltip.tsx # web/app/components/base/chat/chat/question.tsx # web/app/components/base/chat/embedded-chatbot/inputs-form/index.tsx # web/app/components/base/chat/embedded-chatbot/inputs-form/view-form-dropdown.tsx # web/app/components/base/markdown-blocks/form.tsx # web/app/components/base/prompt-editor/plugins/hitl-input-block/component-ui.tsx # web/app/components/base/tag-management/panel.tsx # web/app/components/base/tag-management/trigger.tsx # web/app/components/header/account-setting/index.tsx # web/app/components/header/account-setting/members-page/transfer-ownership-modal/index.tsx # web/app/components/header/account-setting/model-provider-page/provider-added-card/index.tsx # web/app/signin/utils/post-login-redirect.ts # web/eslint-suppressions.json # web/package.json # web/pnpm-lock.yaml
This commit is contained in:
@ -1,9 +1,11 @@
|
||||
"""Workflow-level GraphEngine layers that depend on outer infrastructure."""
|
||||
|
||||
from .llm_quota import LLMQuotaLayer
|
||||
from .observability import ObservabilityLayer
|
||||
from .persistence import PersistenceWorkflowInfo, WorkflowPersistenceLayer
|
||||
|
||||
__all__ = [
|
||||
"LLMQuotaLayer",
|
||||
"ObservabilityLayer",
|
||||
"PersistenceWorkflowInfo",
|
||||
"WorkflowPersistenceLayer",
|
||||
|
||||
128
api/core/app/workflow/layers/llm_quota.py
Normal file
128
api/core/app/workflow/layers/llm_quota.py
Normal file
@ -0,0 +1,128 @@
|
||||
"""
|
||||
LLM quota deduction layer for GraphEngine.
|
||||
|
||||
This layer centralizes model-quota deduction outside node implementations.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, cast, final
|
||||
|
||||
from typing_extensions import override
|
||||
|
||||
from core.app.llm import deduct_llm_quota, ensure_llm_quota_available
|
||||
from core.errors.error import QuotaExceededError
|
||||
from core.model_manager import ModelInstance
|
||||
from core.workflow.enums import NodeType
|
||||
from core.workflow.graph_engine.entities.commands import AbortCommand, CommandType
|
||||
from core.workflow.graph_engine.layers.base import GraphEngineLayer
|
||||
from core.workflow.graph_events import GraphEngineEvent, GraphNodeEventBase
|
||||
from core.workflow.graph_events.node import NodeRunSucceededEvent
|
||||
from core.workflow.nodes.base.node import Node
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from core.workflow.nodes.llm.node import LLMNode
|
||||
from core.workflow.nodes.parameter_extractor.parameter_extractor_node import ParameterExtractorNode
|
||||
from core.workflow.nodes.question_classifier.question_classifier_node import QuestionClassifierNode
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@final
|
||||
class LLMQuotaLayer(GraphEngineLayer):
|
||||
"""Graph layer that applies LLM quota deduction after node execution."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self._abort_sent = False
|
||||
|
||||
@override
|
||||
def on_graph_start(self) -> None:
|
||||
self._abort_sent = False
|
||||
|
||||
@override
|
||||
def on_event(self, event: GraphEngineEvent) -> None:
|
||||
_ = event
|
||||
|
||||
@override
|
||||
def on_graph_end(self, error: Exception | None) -> None:
|
||||
_ = error
|
||||
|
||||
@override
|
||||
def on_node_run_start(self, node: Node) -> None:
|
||||
if self._abort_sent:
|
||||
return
|
||||
|
||||
model_instance = self._extract_model_instance(node)
|
||||
if model_instance is None:
|
||||
return
|
||||
|
||||
try:
|
||||
ensure_llm_quota_available(model_instance=model_instance)
|
||||
except QuotaExceededError as exc:
|
||||
self._set_stop_event(node)
|
||||
self._send_abort_command(reason=str(exc))
|
||||
logger.warning("LLM quota check failed, node_id=%s, error=%s", node.id, exc)
|
||||
|
||||
@override
|
||||
def on_node_run_end(
|
||||
self, node: Node, error: Exception | None, result_event: GraphNodeEventBase | None = None
|
||||
) -> None:
|
||||
if error is not None or not isinstance(result_event, NodeRunSucceededEvent):
|
||||
return
|
||||
|
||||
model_instance = self._extract_model_instance(node)
|
||||
if model_instance is None:
|
||||
return
|
||||
|
||||
try:
|
||||
deduct_llm_quota(
|
||||
tenant_id=node.tenant_id,
|
||||
model_instance=model_instance,
|
||||
usage=result_event.node_run_result.llm_usage,
|
||||
)
|
||||
except QuotaExceededError as exc:
|
||||
self._set_stop_event(node)
|
||||
self._send_abort_command(reason=str(exc))
|
||||
logger.warning("LLM quota deduction exceeded, node_id=%s, error=%s", node.id, exc)
|
||||
except Exception:
|
||||
logger.exception("LLM quota deduction failed, node_id=%s", node.id)
|
||||
|
||||
@staticmethod
|
||||
def _set_stop_event(node: Node) -> None:
|
||||
stop_event = getattr(node.graph_runtime_state, "stop_event", None)
|
||||
if stop_event is not None:
|
||||
stop_event.set()
|
||||
|
||||
def _send_abort_command(self, *, reason: str) -> None:
|
||||
if not self.command_channel or self._abort_sent:
|
||||
return
|
||||
|
||||
try:
|
||||
self.command_channel.send_command(
|
||||
AbortCommand(
|
||||
command_type=CommandType.ABORT,
|
||||
reason=reason,
|
||||
)
|
||||
)
|
||||
self._abort_sent = True
|
||||
except Exception:
|
||||
logger.exception("Failed to send quota abort command")
|
||||
|
||||
@staticmethod
|
||||
def _extract_model_instance(node: Node) -> ModelInstance | None:
|
||||
try:
|
||||
match node.node_type:
|
||||
case NodeType.LLM:
|
||||
return cast("LLMNode", node).model_instance
|
||||
case NodeType.PARAMETER_EXTRACTOR:
|
||||
return cast("ParameterExtractorNode", node).model_instance
|
||||
case NodeType.QUESTION_CLASSIFIER:
|
||||
return cast("QuestionClassifierNode", node).model_instance
|
||||
case _:
|
||||
return None
|
||||
except AttributeError:
|
||||
logger.warning(
|
||||
"LLMQuotaLayer skipped quota deduction because node does not expose a model instance, node_id=%s",
|
||||
node.id,
|
||||
)
|
||||
return None
|
||||
Reference in New Issue
Block a user