mirror of
https://github.com/langgenius/dify.git
synced 2026-05-27 20:36:18 +08:00
Compare commits
4 Commits
feat/add-a
...
refactor/c
| Author | SHA1 | Date | |
|---|---|---|---|
| 6be223ac3f | |||
| 13ac79780e | |||
| 69b6a4ca5a | |||
| 5e9d9f091e |
@ -38,8 +38,6 @@ from clients.agent_backend.request_builder import (
|
||||
AgentBackendOutputConfig,
|
||||
AgentBackendRunRequestBuilder,
|
||||
AgentBackendWorkflowNodeRunInput,
|
||||
CleanupLayerSpec,
|
||||
extract_cleanup_layer_specs,
|
||||
redact_for_agent_backend_log,
|
||||
)
|
||||
|
||||
@ -70,11 +68,9 @@ __all__ = [
|
||||
"AgentBackendTransportError",
|
||||
"AgentBackendValidationError",
|
||||
"AgentBackendWorkflowNodeRunInput",
|
||||
"CleanupLayerSpec",
|
||||
"DifyAgentBackendRunClient",
|
||||
"FakeAgentBackendRunClient",
|
||||
"FakeAgentBackendScenario",
|
||||
"create_agent_backend_run_client",
|
||||
"extract_cleanup_layer_specs",
|
||||
"redact_for_agent_backend_log",
|
||||
]
|
||||
|
||||
@ -20,8 +20,6 @@ from dify_agent.protocol import (
|
||||
RunEvent,
|
||||
RunFailedEvent,
|
||||
RunFailedEventData,
|
||||
RunPausedEvent,
|
||||
RunPausedEventData,
|
||||
RunStartedEvent,
|
||||
RunStatusResponse,
|
||||
RunSucceededEvent,
|
||||
@ -36,7 +34,6 @@ class FakeAgentBackendScenario(StrEnum):
|
||||
|
||||
SUCCESS = "success"
|
||||
FAILED = "failed"
|
||||
PAUSED = "paused"
|
||||
|
||||
|
||||
class FakeAgentBackendRunClient:
|
||||
@ -92,13 +89,6 @@ class FakeAgentBackendRunClient:
|
||||
updated_at=_FIXED_TIME,
|
||||
error="fake failure",
|
||||
)
|
||||
case FakeAgentBackendScenario.PAUSED:
|
||||
return RunStatusResponse(
|
||||
run_id=run_id,
|
||||
status="paused",
|
||||
created_at=_FIXED_TIME,
|
||||
updated_at=_FIXED_TIME,
|
||||
)
|
||||
|
||||
def _events(self, run_id: str) -> tuple[RunEvent, ...]:
|
||||
match self.scenario:
|
||||
@ -125,17 +115,3 @@ class FakeAgentBackendRunClient:
|
||||
data=RunFailedEventData(error="fake failure", reason="unit_test"),
|
||||
),
|
||||
)
|
||||
case FakeAgentBackendScenario.PAUSED:
|
||||
return (
|
||||
RunStartedEvent(id="1-0", run_id=run_id, created_at=_FIXED_TIME),
|
||||
RunPausedEvent(
|
||||
id="2-0",
|
||||
run_id=run_id,
|
||||
created_at=_FIXED_TIME,
|
||||
data=RunPausedEventData(
|
||||
reason="human_input_required",
|
||||
message="Agent requested human input.",
|
||||
session_snapshot=CompositorSessionSnapshot(layers=[]),
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
@ -11,13 +11,11 @@ composition-driven.
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import ClassVar, cast
|
||||
from typing import ClassVar
|
||||
|
||||
from agenton.compositor import CompositorSessionSnapshot
|
||||
from agenton.compositor.schemas import LayerSessionSnapshot
|
||||
from agenton.layers import ExitIntent
|
||||
from agenton_collections.layers.plain import PLAIN_PROMPT_LAYER_TYPE_ID, PromptLayerConfig
|
||||
from agenton_collections.layers.pydantic_ai import PYDANTIC_AI_HISTORY_LAYER_TYPE_ID
|
||||
from dify_agent.layers.dify_plugin import (
|
||||
DIFY_PLUGIN_LLM_LAYER_TYPE_ID,
|
||||
DIFY_PLUGIN_TOOLS_LAYER_TYPE_ID,
|
||||
@ -31,7 +29,6 @@ from dify_agent.layers.execution_context import (
|
||||
)
|
||||
from dify_agent.layers.output import DIFY_OUTPUT_LAYER_TYPE_ID, DifyOutputLayerConfig
|
||||
from dify_agent.protocol import (
|
||||
DIFY_AGENT_HISTORY_LAYER_ID,
|
||||
DIFY_AGENT_MODEL_LAYER_ID,
|
||||
DIFY_AGENT_OUTPUT_LAYER_ID,
|
||||
CreateRunRequest,
|
||||
@ -48,84 +45,6 @@ WORKFLOW_USER_PROMPT_LAYER_ID = "workflow_user_prompt"
|
||||
DIFY_EXECUTION_CONTEXT_LAYER_ID = "execution_context"
|
||||
DIFY_PLUGIN_TOOLS_LAYER_ID = "tools"
|
||||
|
||||
# Layer types that hold credentials in their per-run config. These are excluded
|
||||
# from the cleanup-replay composition (and from the snapshot that is sent with
|
||||
# the cleanup request) because we deliberately do not persist plaintext
|
||||
# credentials between runs.
|
||||
_CLEANUP_EXCLUDED_LAYER_TYPES: tuple[str, ...] = (
|
||||
DIFY_PLUGIN_LLM_LAYER_TYPE_ID,
|
||||
DIFY_PLUGIN_TOOLS_LAYER_TYPE_ID,
|
||||
)
|
||||
|
||||
|
||||
class CleanupLayerSpec(BaseModel):
|
||||
"""One layer node replayed by an Agent backend cleanup-only run.
|
||||
|
||||
Cleanup composition cannot include credential-bearing plugin layers, so we
|
||||
persist only the non-plugin layer specs together with the original config.
|
||||
Storing the config (rather than just ``name``/``type``) means cleanup does
|
||||
not depend on the original build-time inputs being re-derivable.
|
||||
"""
|
||||
|
||||
name: str
|
||||
type: str
|
||||
deps: dict[str, str] = Field(default_factory=dict)
|
||||
metadata: dict[str, JsonValue] = Field(default_factory=dict)
|
||||
config: JsonValue = None
|
||||
|
||||
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
|
||||
|
||||
|
||||
def extract_cleanup_layer_specs(composition: RunComposition) -> list[CleanupLayerSpec]:
|
||||
"""Project the in-flight composition into the persistable cleanup spec list.
|
||||
|
||||
Plugin layers are intentionally dropped (their configs hold credentials and
|
||||
the lifecycle contract says "do not include an LLM layer" during cleanup).
|
||||
The filtered names must later drive snapshot filtering so the agenton
|
||||
compositor's name-order check still passes for the cleanup run.
|
||||
"""
|
||||
excluded = set(_CLEANUP_EXCLUDED_LAYER_TYPES)
|
||||
specs: list[CleanupLayerSpec] = []
|
||||
for layer in composition.layers:
|
||||
if layer.type in excluded:
|
||||
continue
|
||||
config_value: JsonValue = None
|
||||
if isinstance(layer.config, BaseModel):
|
||||
config_value = layer.config.model_dump(mode="json", warnings=False)
|
||||
else:
|
||||
# ``RunLayerSpec.config`` is typed as ``LayerConfigInput`` which
|
||||
# includes ``Mapping[str, object] | bytes``. In the cleanup-replay
|
||||
# pipeline our builder only emits BaseModel-derived configs or
|
||||
# ``None``, so the wider input alias narrows safely here.
|
||||
config_value = cast(JsonValue, layer.config)
|
||||
specs.append(
|
||||
CleanupLayerSpec(
|
||||
name=layer.name,
|
||||
type=layer.type,
|
||||
deps=dict(layer.deps),
|
||||
metadata=dict(layer.metadata),
|
||||
config=config_value,
|
||||
)
|
||||
)
|
||||
return specs
|
||||
|
||||
|
||||
def _filter_snapshot_to_specs(
|
||||
snapshot: CompositorSessionSnapshot,
|
||||
specs: list[CleanupLayerSpec],
|
||||
) -> CompositorSessionSnapshot:
|
||||
"""Keep only snapshot layers whose names appear in the cleanup spec list.
|
||||
|
||||
The agenton compositor rejects a snapshot whose layer-name sequence does
|
||||
not match the active composition exactly. Cleanup-replay drops plugin
|
||||
layers, so we must drop the matching snapshot entries here.
|
||||
"""
|
||||
kept_names = {spec.name for spec in specs}
|
||||
filtered_layers: list[LayerSessionSnapshot] = [layer for layer in snapshot.layers if layer.name in kept_names]
|
||||
if len(filtered_layers) == len(snapshot.layers):
|
||||
return snapshot
|
||||
return CompositorSessionSnapshot(schema_version=snapshot.schema_version, layers=filtered_layers)
|
||||
|
||||
|
||||
class AgentBackendModelConfig(BaseModel):
|
||||
"""API-side model/plugin selection before it is converted to Dify Agent layers."""
|
||||
@ -167,8 +86,7 @@ class AgentBackendWorkflowNodeRunInput(BaseModel):
|
||||
output: AgentBackendOutputConfig | None = None
|
||||
tools: DifyPluginToolsLayerConfig | None = None
|
||||
session_snapshot: CompositorSessionSnapshot | None = None
|
||||
include_history: bool = True
|
||||
suspend_on_exit: bool = True
|
||||
suspend_on_exit: bool = False
|
||||
metadata: dict[str, JsonValue] = Field(default_factory=dict)
|
||||
|
||||
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid", arbitrary_types_allowed=True)
|
||||
@ -184,50 +102,6 @@ class AgentBackendWorkflowNodeRunInput(BaseModel):
|
||||
class AgentBackendRunRequestBuilder:
|
||||
"""Converts API product state into the public ``dify-agent`` run protocol."""
|
||||
|
||||
def build_cleanup_request(
|
||||
self,
|
||||
*,
|
||||
session_snapshot: CompositorSessionSnapshot,
|
||||
composition_layer_specs: list[CleanupLayerSpec],
|
||||
idempotency_key: str | None = None,
|
||||
metadata: dict[str, JsonValue] | None = None,
|
||||
) -> CreateRunRequest:
|
||||
"""Build a lifecycle-only cleanup request that replays the prior layers.
|
||||
|
||||
The agenton compositor enforces that the session snapshot's layer names
|
||||
match the active composition in order, so cleanup must replay the same
|
||||
non-plugin layer graph that produced the snapshot. Plugin layers
|
||||
(``dify.plugin.llm``, ``dify.plugin.tools``) are excluded from both the
|
||||
composition and the snapshot before submission because their configs
|
||||
require credentials that are not persisted between runs.
|
||||
"""
|
||||
if not composition_layer_specs:
|
||||
raise ValueError(
|
||||
"build_cleanup_request requires composition_layer_specs; an empty "
|
||||
"composition would fail the agent backend's snapshot validation."
|
||||
)
|
||||
request_metadata = dict(metadata or {})
|
||||
request_metadata["agent_backend_lifecycle"] = "session_cleanup"
|
||||
layers = [
|
||||
RunLayerSpec(
|
||||
name=spec.name,
|
||||
type=spec.type,
|
||||
deps=dict(spec.deps),
|
||||
metadata=dict(spec.metadata),
|
||||
config=spec.config,
|
||||
)
|
||||
for spec in composition_layer_specs
|
||||
]
|
||||
filtered_snapshot = _filter_snapshot_to_specs(session_snapshot, composition_layer_specs)
|
||||
return CreateRunRequest(
|
||||
composition=RunComposition(layers=layers),
|
||||
purpose="workflow_node",
|
||||
idempotency_key=idempotency_key,
|
||||
metadata=request_metadata,
|
||||
session_snapshot=filtered_snapshot,
|
||||
on_exit=LayerExitSignals(default=ExitIntent.DELETE),
|
||||
)
|
||||
|
||||
def build_for_workflow_node(self, run_input: AgentBackendWorkflowNodeRunInput) -> CreateRunRequest:
|
||||
"""Build a workflow Agent Node run request without defining another wire schema."""
|
||||
layers: list[RunLayerSpec] = []
|
||||
@ -261,20 +135,6 @@ class AgentBackendRunRequestBuilder:
|
||||
metadata=run_input.metadata,
|
||||
config=run_input.execution_context,
|
||||
),
|
||||
]
|
||||
)
|
||||
|
||||
if run_input.include_history:
|
||||
layers.append(
|
||||
RunLayerSpec(
|
||||
name=DIFY_AGENT_HISTORY_LAYER_ID,
|
||||
type=PYDANTIC_AI_HISTORY_LAYER_TYPE_ID,
|
||||
metadata={**run_input.metadata, "origin": "agent_session_history"},
|
||||
)
|
||||
)
|
||||
|
||||
layers.extend(
|
||||
[
|
||||
RunLayerSpec(
|
||||
name=DIFY_AGENT_MODEL_LAYER_ID,
|
||||
type=DIFY_PLUGIN_LLM_LAYER_TYPE_ID,
|
||||
|
||||
@ -27,7 +27,6 @@ from core.moderation.base import ModerationError
|
||||
from core.moderation.input_moderation import InputModeration
|
||||
from core.repositories.factory import WorkflowExecutionRepository, WorkflowNodeExecutionRepository
|
||||
from core.workflow.node_factory import get_default_root_node_id
|
||||
from core.workflow.nodes.agent_v2.session_cleanup_layer import build_workflow_agent_session_cleanup_layer
|
||||
from core.workflow.system_variables import (
|
||||
build_bootstrap_variables,
|
||||
build_system_variables,
|
||||
@ -240,7 +239,6 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
|
||||
)
|
||||
|
||||
workflow_entry.graph_engine.layer(persistence_layer)
|
||||
workflow_entry.graph_engine.layer(build_workflow_agent_session_cleanup_layer())
|
||||
conversation_variable_layer = ConversationVariablePersistenceLayer(
|
||||
ConversationVariableUpdater(session_factory.get_session_maker())
|
||||
)
|
||||
|
||||
@ -10,7 +10,6 @@ from core.app.entities.app_invoke_entities import InvokeFrom, WorkflowAppGenerat
|
||||
from core.app.workflow.layers.persistence import PersistenceWorkflowInfo, WorkflowPersistenceLayer
|
||||
from core.repositories.factory import WorkflowExecutionRepository, WorkflowNodeExecutionRepository
|
||||
from core.workflow.node_factory import get_default_root_node_id
|
||||
from core.workflow.nodes.agent_v2.session_cleanup_layer import build_workflow_agent_session_cleanup_layer
|
||||
from core.workflow.system_variables import build_bootstrap_variables, build_system_variables
|
||||
from core.workflow.variable_pool_initializer import add_node_inputs_to_pool, add_variables_to_pool
|
||||
from core.workflow.workflow_entry import WorkflowEntry
|
||||
@ -167,7 +166,6 @@ class WorkflowAppRunner(WorkflowBasedAppRunner):
|
||||
)
|
||||
|
||||
workflow_entry.graph_engine.layer(persistence_layer)
|
||||
workflow_entry.graph_engine.layer(build_workflow_agent_session_cleanup_layer())
|
||||
for layer in self._graph_engine_layers:
|
||||
workflow_entry.graph_engine.layer(layer)
|
||||
|
||||
|
||||
@ -475,7 +475,6 @@ class DifyNodeFactory(NodeFactory):
|
||||
from core.workflow.nodes.agent_v2.file_tenant_validator import UploadFileTenantValidator
|
||||
from core.workflow.nodes.agent_v2.output_failure_orchestrator import OutputFailureOrchestrator
|
||||
from core.workflow.nodes.agent_v2.output_type_checker import PerOutputTypeChecker
|
||||
from core.workflow.nodes.agent_v2.session_store import WorkflowAgentRuntimeSessionStore
|
||||
|
||||
return {
|
||||
"binding_resolver": WorkflowAgentBindingResolver(),
|
||||
@ -495,7 +494,6 @@ class DifyNodeFactory(NodeFactory):
|
||||
# outputs contain no file refs.
|
||||
"type_checker": PerOutputTypeChecker(file_validator=UploadFileTenantValidator()),
|
||||
"failure_orchestrator": OutputFailureOrchestrator(),
|
||||
"session_store": WorkflowAgentRuntimeSessionStore(),
|
||||
}
|
||||
return {
|
||||
"strategy_resolver": self._agent_strategy_resolver,
|
||||
|
||||
@ -1,11 +1,8 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from collections.abc import Generator, Mapping, Sequence
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
from agenton.compositor import CompositorSessionSnapshot
|
||||
|
||||
from clients.agent_backend import (
|
||||
AgentBackendError,
|
||||
AgentBackendHTTPError,
|
||||
@ -20,14 +17,11 @@ from clients.agent_backend import (
|
||||
AgentBackendStreamInternalEvent,
|
||||
AgentBackendTransportError,
|
||||
AgentBackendValidationError,
|
||||
CleanupLayerSpec,
|
||||
extract_cleanup_layer_specs,
|
||||
)
|
||||
from core.app.entities.app_invoke_entities import DIFY_RUN_CONTEXT_KEY, DifyRunContext
|
||||
from core.workflow.system_variables import SystemVariableKey, get_system_text
|
||||
from graphon.entities.pause_reason import SchedulingPause
|
||||
from graphon.enums import BuiltinNodeTypes, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
|
||||
from graphon.node_events import NodeEventBase, NodeRunResult, PauseRequestedEvent, StreamCompletedEvent
|
||||
from graphon.node_events import NodeEventBase, NodeRunResult, StreamCompletedEvent
|
||||
from graphon.nodes.base.node import Node
|
||||
from models.agent_config_entities import WorkflowNodeJobConfig
|
||||
|
||||
@ -46,14 +40,11 @@ from .runtime_request_builder import (
|
||||
WorkflowAgentRuntimeRequestBuilder,
|
||||
WorkflowAgentRuntimeRequestBuildError,
|
||||
)
|
||||
from .session_store import WorkflowAgentRuntimeSessionStore, WorkflowAgentSessionScope
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from graphon.entities import GraphInitParams
|
||||
from graphon.runtime import GraphRuntimeState
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# Stage 4 §5+§7: the terminal events that `_consume_event_stream` may return.
|
||||
# Stream + started events are filtered out before we yield; transport errors
|
||||
@ -83,7 +74,6 @@ class DifyAgentNode(Node[DifyAgentNodeData]):
|
||||
output_adapter: WorkflowAgentOutputAdapter,
|
||||
type_checker: PerOutputTypeChecker,
|
||||
failure_orchestrator: OutputFailureOrchestrator,
|
||||
session_store: WorkflowAgentRuntimeSessionStore | None = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
node_id=node_id,
|
||||
@ -98,7 +88,6 @@ class DifyAgentNode(Node[DifyAgentNodeData]):
|
||||
self._output_adapter = output_adapter
|
||||
self._type_checker = type_checker
|
||||
self._failure_orchestrator = failure_orchestrator
|
||||
self._session_store = session_store
|
||||
|
||||
@classmethod
|
||||
def version(cls) -> str:
|
||||
@ -145,17 +134,6 @@ class DifyAgentNode(Node[DifyAgentNodeData]):
|
||||
"agent_config_snapshot_id": bundle.snapshot.id,
|
||||
"binding_id": bundle.binding.id,
|
||||
}
|
||||
session_scope = WorkflowAgentSessionScope(
|
||||
tenant_id=dify_ctx.tenant_id,
|
||||
app_id=dify_ctx.app_id,
|
||||
workflow_id=workflow_id,
|
||||
workflow_run_id=workflow_run_id,
|
||||
node_id=self._node_id,
|
||||
node_execution_id=self.id,
|
||||
binding_id=bundle.binding.id,
|
||||
agent_id=bundle.agent.id,
|
||||
agent_config_snapshot_id=bundle.snapshot.id,
|
||||
)
|
||||
|
||||
# Stage 4 §4.1 (D-3): use effective outputs so defaults flow through both
|
||||
# the backend request and the post-run type check.
|
||||
@ -169,9 +147,6 @@ class DifyAgentNode(Node[DifyAgentNodeData]):
|
||||
attempt = 0
|
||||
while True:
|
||||
try:
|
||||
session_snapshot = None
|
||||
if self._session_store is not None:
|
||||
session_snapshot = self._session_store.load_active_snapshot(session_scope)
|
||||
runtime_request = self._runtime_request_builder.build(
|
||||
WorkflowAgentRuntimeBuildContext(
|
||||
dify_context=dify_ctx,
|
||||
@ -184,7 +159,6 @@ class DifyAgentNode(Node[DifyAgentNodeData]):
|
||||
agent=bundle.agent,
|
||||
snapshot=bundle.snapshot,
|
||||
attempt=attempt,
|
||||
session_snapshot=session_snapshot,
|
||||
)
|
||||
)
|
||||
except WorkflowAgentRuntimeRequestBuildError as error:
|
||||
@ -247,35 +221,9 @@ class DifyAgentNode(Node[DifyAgentNodeData]):
|
||||
)
|
||||
return
|
||||
|
||||
if isinstance(terminal_event, AgentBackendRunPausedInternalEvent):
|
||||
self._save_session_snapshot(
|
||||
session_scope=session_scope,
|
||||
backend_run_id=terminal_event.run_id,
|
||||
snapshot=terminal_event.session_snapshot,
|
||||
composition_layer_specs=extract_cleanup_layer_specs(runtime_request.request.composition),
|
||||
metadata=metadata,
|
||||
)
|
||||
yield PauseRequestedEvent(
|
||||
reason=SchedulingPause(
|
||||
message=terminal_event.message
|
||||
or "Agent backend run requested workflow pause for external input."
|
||||
)
|
||||
)
|
||||
return
|
||||
|
||||
# Non-success terminal (failed / cancelled) skips per-output
|
||||
# post-processing — the backend itself already failed. We also retire
|
||||
# the local ACTIVE session row so a workflow loop back into the same
|
||||
# Agent node cannot resume from a stale snapshot. The failed agent
|
||||
# backend layers (suspended per ``on_exit``) are left for agent
|
||||
# backend's own GC; this row will no longer be picked up by the
|
||||
# workflow-terminal cleanup layer.
|
||||
# Non-success terminal (failed / cancelled / paused) skips per-output
|
||||
# post-processing — the backend itself already failed.
|
||||
if not isinstance(terminal_event, AgentBackendRunSucceededInternalEvent):
|
||||
self._mark_session_cleaned_on_failure(
|
||||
session_scope=session_scope,
|
||||
backend_run_id=terminal_event.run_id,
|
||||
metadata=metadata,
|
||||
)
|
||||
yield StreamCompletedEvent(
|
||||
node_run_result=self._output_adapter.build_failure_result(
|
||||
event=terminal_event,
|
||||
@ -286,14 +234,6 @@ class DifyAgentNode(Node[DifyAgentNodeData]):
|
||||
)
|
||||
return
|
||||
|
||||
self._save_session_snapshot(
|
||||
session_scope=session_scope,
|
||||
backend_run_id=terminal_event.run_id,
|
||||
snapshot=terminal_event.session_snapshot,
|
||||
composition_layer_specs=extract_cleanup_layer_specs(runtime_request.request.composition),
|
||||
metadata=metadata,
|
||||
)
|
||||
|
||||
# ──── Stage 4: per-output type check ────
|
||||
type_check = self._type_checker.check(
|
||||
declared_outputs=effective_outputs,
|
||||
@ -444,75 +384,6 @@ class DifyAgentNode(Node[DifyAgentNodeData]):
|
||||
],
|
||||
}
|
||||
|
||||
def _save_session_snapshot(
|
||||
self,
|
||||
*,
|
||||
session_scope: WorkflowAgentSessionScope,
|
||||
backend_run_id: str,
|
||||
snapshot: CompositorSessionSnapshot | None,
|
||||
composition_layer_specs: list[CleanupLayerSpec],
|
||||
metadata: dict[str, Any],
|
||||
) -> None:
|
||||
if self._session_store is None:
|
||||
return
|
||||
try:
|
||||
self._session_store.save_active_snapshot(
|
||||
scope=session_scope,
|
||||
backend_run_id=backend_run_id,
|
||||
snapshot=snapshot,
|
||||
composition_layer_specs=composition_layer_specs,
|
||||
)
|
||||
agent_backend = dict(metadata.get("agent_backend") or {})
|
||||
agent_backend["session_snapshot_persisted"] = snapshot is not None
|
||||
metadata["agent_backend"] = agent_backend
|
||||
except Exception:
|
||||
logger.warning(
|
||||
"Failed to persist workflow Agent runtime session snapshot: "
|
||||
"tenant_id=%s workflow_run_id=%s node_id=%s binding_id=%s agent_id=%s backend_run_id=%s",
|
||||
session_scope.tenant_id,
|
||||
session_scope.workflow_run_id,
|
||||
session_scope.node_id,
|
||||
session_scope.binding_id,
|
||||
session_scope.agent_id,
|
||||
backend_run_id,
|
||||
exc_info=True,
|
||||
)
|
||||
agent_backend = dict(metadata.get("agent_backend") or {})
|
||||
agent_backend["session_snapshot_persisted"] = False
|
||||
agent_backend["session_snapshot_persist_error"] = "workflow_agent_runtime_session_store_error"
|
||||
metadata["agent_backend"] = agent_backend
|
||||
|
||||
def _mark_session_cleaned_on_failure(
|
||||
self,
|
||||
*,
|
||||
session_scope: WorkflowAgentSessionScope,
|
||||
backend_run_id: str,
|
||||
metadata: dict[str, Any],
|
||||
) -> None:
|
||||
if self._session_store is None:
|
||||
return
|
||||
try:
|
||||
self._session_store.mark_cleaned(scope=session_scope, backend_run_id=backend_run_id)
|
||||
agent_backend = dict(metadata.get("agent_backend") or {})
|
||||
agent_backend["session_snapshot_cleaned_on_failure"] = True
|
||||
metadata["agent_backend"] = agent_backend
|
||||
except Exception:
|
||||
logger.warning(
|
||||
"Failed to mark workflow Agent runtime session cleaned on agent run failure: "
|
||||
"tenant_id=%s workflow_run_id=%s node_id=%s binding_id=%s agent_id=%s backend_run_id=%s",
|
||||
session_scope.tenant_id,
|
||||
session_scope.workflow_run_id,
|
||||
session_scope.node_id,
|
||||
session_scope.binding_id,
|
||||
session_scope.agent_id,
|
||||
backend_run_id,
|
||||
exc_info=True,
|
||||
)
|
||||
agent_backend = dict(metadata.get("agent_backend") or {})
|
||||
agent_backend["session_snapshot_cleaned_on_failure"] = False
|
||||
agent_backend["session_snapshot_cleanup_error"] = "workflow_agent_runtime_session_store_error"
|
||||
metadata["agent_backend"] = agent_backend
|
||||
|
||||
@staticmethod
|
||||
def _patch_event_with_defaults(
|
||||
event: AgentBackendRunSucceededInternalEvent,
|
||||
|
||||
@ -4,7 +4,6 @@ from collections.abc import Mapping, Sequence
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Literal, Protocol, cast
|
||||
|
||||
from agenton.compositor import CompositorSessionSnapshot
|
||||
from dify_agent.layers.execution_context import DifyExecutionContextLayerConfig
|
||||
from dify_agent.protocol import CreateRunRequest
|
||||
|
||||
@ -29,7 +28,6 @@ from models.agent_config_entities import (
|
||||
from models.agent_config_entities import (
|
||||
effective_declared_outputs as _effective_declared_outputs,
|
||||
)
|
||||
from models.provider_ids import ModelProviderID
|
||||
|
||||
from .output_failure_orchestrator import retry_idempotency_key
|
||||
from .plugin_tools_builder import WorkflowAgentPluginToolsBuilder, WorkflowAgentPluginToolsBuildError
|
||||
@ -68,7 +66,6 @@ class WorkflowAgentRuntimeBuildContext:
|
||||
# Stage 4 §7 / D-4: 0 for the first run, then incremented per retry. Drives the
|
||||
# idempotency key so the backend treats each retry as a fresh request.
|
||||
attempt: int = 0
|
||||
session_snapshot: CompositorSessionSnapshot | None = None
|
||||
|
||||
|
||||
@dataclass(frozen=True, slots=True)
|
||||
@ -132,14 +129,11 @@ class WorkflowAgentRuntimeRequestBuilder:
|
||||
request = self._request_builder.build_for_workflow_node(
|
||||
AgentBackendWorkflowNodeRunInput(
|
||||
model=AgentBackendModelConfig(
|
||||
plugin_id=self._plugin_daemon_plugin_id(
|
||||
plugin_id=agent_soul.model.plugin_id,
|
||||
model_provider=agent_soul.model.model_provider,
|
||||
),
|
||||
model_provider=self._plugin_daemon_provider_name(agent_soul.model.model_provider),
|
||||
plugin_id=agent_soul.model.plugin_id,
|
||||
model_provider=agent_soul.model.model_provider,
|
||||
model=agent_soul.model.model,
|
||||
credentials=self._normalize_credentials(credentials),
|
||||
model_settings=agent_soul.model.model_settings,
|
||||
model_settings=cast(dict[str, Any], agent_soul.model.model_settings),
|
||||
),
|
||||
# The execution-context layer is now the only public protocol
|
||||
# carrier for Dify tenant/user/run identifiers. ``user_id`` must
|
||||
@ -164,7 +158,6 @@ class WorkflowAgentRuntimeRequestBuilder:
|
||||
user_prompt=user_prompt,
|
||||
output=self._build_output_config(node_job.declared_outputs),
|
||||
tools=tools_layer,
|
||||
session_snapshot=context.session_snapshot,
|
||||
idempotency_key=self._idempotency_key(context),
|
||||
metadata=metadata,
|
||||
)
|
||||
@ -184,20 +177,6 @@ class WorkflowAgentRuntimeRequestBuilder:
|
||||
return "single_step"
|
||||
return "workflow_run"
|
||||
|
||||
@staticmethod
|
||||
def _plugin_daemon_plugin_id(*, plugin_id: str, model_provider: str) -> str:
|
||||
"""Return the transport plugin id expected by plugin-daemon headers."""
|
||||
if plugin_id.count("/") == 1:
|
||||
return plugin_id
|
||||
if plugin_id:
|
||||
return ModelProviderID(plugin_id).plugin_id
|
||||
return ModelProviderID(model_provider).plugin_id
|
||||
|
||||
@staticmethod
|
||||
def _plugin_daemon_provider_name(model_provider: str) -> str:
|
||||
"""Return the provider name expected by plugin-daemon dispatch payloads."""
|
||||
return ModelProviderID(model_provider).provider_name
|
||||
|
||||
@staticmethod
|
||||
def _idempotency_key(context: WorkflowAgentRuntimeBuildContext) -> str:
|
||||
# Stage 4 §7 / D-4: retries get distinct keys (``...:retry-{attempt}``) so
|
||||
|
||||
@ -1,247 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from typing import override
|
||||
|
||||
from clients.agent_backend import AgentBackendError, AgentBackendRunClient, AgentBackendRunRequestBuilder
|
||||
from clients.agent_backend.factory import create_agent_backend_run_client
|
||||
from configs import dify_config
|
||||
from core.workflow.system_variables import SystemVariableKey, get_system_text
|
||||
from graphon.graph_engine.layers import GraphEngineLayer
|
||||
from graphon.graph_events import (
|
||||
GraphEngineEvent,
|
||||
GraphRunAbortedEvent,
|
||||
GraphRunFailedEvent,
|
||||
GraphRunPartialSucceededEvent,
|
||||
GraphRunSucceededEvent,
|
||||
)
|
||||
|
||||
from .session_store import StoredWorkflowAgentSession, WorkflowAgentRuntimeSessionStore
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# Upper bound on how long a cleanup-only run is allowed to settle before the
|
||||
# layer gives up and leaves the row ACTIVE so it can be retried later. Cleanup
|
||||
# work is mostly local agent-backend bookkeeping (no LLM inference), so 30s is
|
||||
# generous; a hung backend should never block workflow termination beyond this.
|
||||
_CLEANUP_WAIT_TIMEOUT_SECONDS = 30.0
|
||||
|
||||
|
||||
class WorkflowAgentSessionCleanupLayer(GraphEngineLayer):
|
||||
"""Retires workflow Agent session snapshots when a workflow reaches a terminal state.
|
||||
|
||||
Implementation notes — there are two failure modes the cleanup path has to
|
||||
avoid simultaneously:
|
||||
|
||||
1. The agenton compositor on the agent-backend side validates the cleanup
|
||||
request's session snapshot against the replayed composition before
|
||||
running any lifecycle hook. If the snapshot's layer names diverge from
|
||||
the composition, the run fails asynchronously with ``run_failed`` — but
|
||||
the initial ``POST /runs`` already returned 202, so the API side has no
|
||||
visibility of the failure unless it waits for terminal status. The
|
||||
``composition_layer_specs`` persistence in A.1–A.4 plus the
|
||||
``_filter_snapshot_to_specs`` shape in ``build_cleanup_request`` keeps
|
||||
the two name lists in sync.
|
||||
|
||||
2. The current agent backend's ``runner.py::_run_agent`` always invokes
|
||||
``run.get_layer("llm")`` and the structured-output / history validators
|
||||
before exiting any slot — there is no ``purpose: "cleanup"`` branch
|
||||
yet. A truly cleanup-only request (no LLM layer) therefore still
|
||||
crashes inside the runner with ``Layer 'llm' is not defined in this
|
||||
compositor run.``. Until the backend grows a cleanup-only purpose,
|
||||
this layer **does not issue an HTTP cleanup run**: it simply retires
|
||||
the local snapshot row so stale state cannot be re-resumed, and lets
|
||||
the agent backend's own retention TTL release the suspended layers.
|
||||
|
||||
The HTTP-cleanup machinery (``build_cleanup_request`` + ``wait_run``) is
|
||||
intentionally still wired into the request builder + integration tests so
|
||||
that when the agent backend supports cleanup runs we can flip the switch
|
||||
here with a one-line change (see ``_HTTP_CLEANUP_SUPPORTED``).
|
||||
"""
|
||||
|
||||
# Flip to True once dify-agent's runner has a ``purpose=cleanup`` branch
|
||||
# that skips the LLM/output/user-prompt invariants. Until then we only
|
||||
# update the local row; the spec list is still persisted so the future
|
||||
# HTTP cleanup path has everything it needs.
|
||||
_HTTP_CLEANUP_SUPPORTED: bool = False
|
||||
|
||||
_TERMINAL_EVENTS = (
|
||||
GraphRunSucceededEvent,
|
||||
GraphRunPartialSucceededEvent,
|
||||
GraphRunFailedEvent,
|
||||
GraphRunAbortedEvent,
|
||||
)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
session_store: WorkflowAgentRuntimeSessionStore,
|
||||
request_builder: AgentBackendRunRequestBuilder,
|
||||
agent_backend_client: AgentBackendRunClient | None,
|
||||
cleanup_wait_timeout_seconds: float = _CLEANUP_WAIT_TIMEOUT_SECONDS,
|
||||
) -> None:
|
||||
super().__init__()
|
||||
self._session_store = session_store
|
||||
self._request_builder = request_builder
|
||||
self._agent_backend_client = agent_backend_client
|
||||
self._cleanup_wait_timeout_seconds = cleanup_wait_timeout_seconds
|
||||
|
||||
@override
|
||||
def on_graph_start(self) -> None:
|
||||
return
|
||||
|
||||
@override
|
||||
def on_event(self, event: GraphEngineEvent) -> None:
|
||||
if not isinstance(event, self._TERMINAL_EVENTS):
|
||||
return
|
||||
workflow_run_id = get_system_text(
|
||||
self.graph_runtime_state.variable_pool,
|
||||
SystemVariableKey.WORKFLOW_EXECUTION_ID,
|
||||
)
|
||||
if not workflow_run_id:
|
||||
logger.warning("Skipping workflow Agent session cleanup: workflow_run_id is missing.")
|
||||
return
|
||||
|
||||
for stored_session in self._session_store.list_active_sessions(workflow_run_id=workflow_run_id):
|
||||
self._cleanup_session(stored_session)
|
||||
|
||||
@override
|
||||
def on_graph_end(self, error: Exception | None) -> None:
|
||||
return
|
||||
|
||||
def _cleanup_session(self, stored_session: StoredWorkflowAgentSession) -> None:
|
||||
scope = stored_session.scope
|
||||
if not self._HTTP_CLEANUP_SUPPORTED:
|
||||
# Agent backend has no cleanup-only run mode yet (see class
|
||||
# docstring). Retire the local row so future re-entries do not
|
||||
# resume from stale state, and let the backend's retention TTL
|
||||
# release the suspended layers on its own schedule.
|
||||
logger.info(
|
||||
"Workflow Agent session retired locally; HTTP cleanup is disabled "
|
||||
"until the agent backend supports a cleanup-only run mode. "
|
||||
"workflow_run_id=%s node_id=%s binding_id=%s agent_id=%s previous_run_id=%s",
|
||||
scope.workflow_run_id,
|
||||
scope.node_id,
|
||||
scope.binding_id,
|
||||
scope.agent_id,
|
||||
stored_session.backend_run_id,
|
||||
)
|
||||
self._session_store.mark_cleaned(scope=scope, backend_run_id=stored_session.backend_run_id)
|
||||
return
|
||||
|
||||
if self._agent_backend_client is None:
|
||||
# HTTP cleanup was enabled by the caller but no client was wired
|
||||
# in (e.g. the API runs without AGENT_BACKEND_BASE_URL configured).
|
||||
# Leave the row ACTIVE so an operator restart with proper config
|
||||
# can drive the cleanup; do not silently retire it.
|
||||
logger.warning(
|
||||
"Skipping Agent backend cleanup: HTTP cleanup is enabled but no agent "
|
||||
"backend client is wired in. workflow_run_id=%s node_id=%s agent_id=%s",
|
||||
scope.workflow_run_id,
|
||||
scope.node_id,
|
||||
scope.agent_id,
|
||||
)
|
||||
return
|
||||
|
||||
if not stored_session.composition_layer_specs:
|
||||
# Sessions persisted before A.1 landed do not carry the spec list,
|
||||
# so we cannot replay a valid cleanup composition. Leave the row
|
||||
# ACTIVE and warn so the absence shows up in observability rather
|
||||
# than being silently swallowed by a doomed cleanup run.
|
||||
logger.warning(
|
||||
"Skipping Agent backend cleanup: no composition_layer_specs persisted. "
|
||||
"workflow_run_id=%s node_id=%s agent_id=%s",
|
||||
scope.workflow_run_id,
|
||||
scope.node_id,
|
||||
scope.agent_id,
|
||||
)
|
||||
return
|
||||
|
||||
request = self._request_builder.build_cleanup_request(
|
||||
session_snapshot=stored_session.session_snapshot,
|
||||
composition_layer_specs=stored_session.composition_layer_specs,
|
||||
idempotency_key=f"{scope.workflow_run_id}:{scope.node_id}:{scope.binding_id}:agent-session-cleanup",
|
||||
metadata={
|
||||
"tenant_id": scope.tenant_id,
|
||||
"app_id": scope.app_id,
|
||||
"workflow_id": scope.workflow_id,
|
||||
"workflow_run_id": scope.workflow_run_id,
|
||||
"node_id": scope.node_id,
|
||||
"node_execution_id": scope.node_execution_id,
|
||||
"binding_id": scope.binding_id,
|
||||
"agent_id": scope.agent_id,
|
||||
"agent_config_snapshot_id": scope.agent_config_snapshot_id,
|
||||
"previous_agent_backend_run_id": stored_session.backend_run_id,
|
||||
},
|
||||
)
|
||||
try:
|
||||
response = self._agent_backend_client.create_run(request)
|
||||
except AgentBackendError:
|
||||
logger.warning(
|
||||
"Agent backend session cleanup request failed: workflow_run_id=%s node_id=%s agent_id=%s",
|
||||
scope.workflow_run_id,
|
||||
scope.node_id,
|
||||
scope.agent_id,
|
||||
exc_info=True,
|
||||
)
|
||||
return
|
||||
|
||||
try:
|
||||
status_response = self._agent_backend_client.wait_run(
|
||||
response.run_id, timeout_seconds=self._cleanup_wait_timeout_seconds
|
||||
)
|
||||
except AgentBackendError:
|
||||
logger.warning(
|
||||
"Agent backend session cleanup wait_run failed: "
|
||||
"workflow_run_id=%s node_id=%s agent_id=%s cleanup_run_id=%s",
|
||||
scope.workflow_run_id,
|
||||
scope.node_id,
|
||||
scope.agent_id,
|
||||
response.run_id,
|
||||
exc_info=True,
|
||||
)
|
||||
return
|
||||
|
||||
if status_response.status != "succeeded":
|
||||
logger.warning(
|
||||
"Agent backend session cleanup did not succeed: status=%s error=%s "
|
||||
"workflow_run_id=%s node_id=%s agent_id=%s cleanup_run_id=%s",
|
||||
status_response.status,
|
||||
status_response.error,
|
||||
scope.workflow_run_id,
|
||||
scope.node_id,
|
||||
scope.agent_id,
|
||||
response.run_id,
|
||||
)
|
||||
return
|
||||
|
||||
self._session_store.mark_cleaned(scope=scope, backend_run_id=response.run_id)
|
||||
|
||||
|
||||
def build_workflow_agent_session_cleanup_layer() -> WorkflowAgentSessionCleanupLayer:
|
||||
"""Wire the cleanup layer with the standard production dependencies.
|
||||
|
||||
The agent backend client is constructed only when ``AGENT_BACKEND_BASE_URL``
|
||||
is configured (or the deterministic fake is explicitly enabled). When
|
||||
neither is set — for example unit tests that bring up the workflow runner
|
||||
without an Agent node — we pass ``None`` so the layer stays harmless. With
|
||||
``_HTTP_CLEANUP_SUPPORTED = False`` the local-retire branch never touches
|
||||
the client anyway, but keeping it ``None`` avoids importing httpx and lets
|
||||
test harnesses skip backend configuration.
|
||||
"""
|
||||
agent_backend_client: AgentBackendRunClient | None
|
||||
if dify_config.AGENT_BACKEND_USE_FAKE or dify_config.AGENT_BACKEND_BASE_URL:
|
||||
agent_backend_client = create_agent_backend_run_client(
|
||||
base_url=dify_config.AGENT_BACKEND_BASE_URL,
|
||||
use_fake=dify_config.AGENT_BACKEND_USE_FAKE,
|
||||
fake_scenario=dify_config.AGENT_BACKEND_FAKE_SCENARIO,
|
||||
)
|
||||
else:
|
||||
agent_backend_client = None
|
||||
|
||||
return WorkflowAgentSessionCleanupLayer(
|
||||
session_store=WorkflowAgentRuntimeSessionStore(),
|
||||
request_builder=AgentBackendRunRequestBuilder(),
|
||||
agent_backend_client=agent_backend_client,
|
||||
)
|
||||
@ -1,179 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
from agenton.compositor import CompositorSessionSnapshot
|
||||
from pydantic import TypeAdapter
|
||||
from sqlalchemy import select
|
||||
|
||||
from clients.agent_backend.request_builder import CleanupLayerSpec
|
||||
from core.db.session_factory import session_factory
|
||||
from libs.datetime_utils import naive_utc_now
|
||||
from models.agent import (
|
||||
WorkflowAgentRuntimeSession,
|
||||
WorkflowAgentRuntimeSessionStatus,
|
||||
)
|
||||
|
||||
_SPECS_ADAPTER: TypeAdapter[list[CleanupLayerSpec]] = TypeAdapter(list[CleanupLayerSpec])
|
||||
|
||||
|
||||
def _serialize_specs(specs: list[CleanupLayerSpec]) -> str:
|
||||
return _SPECS_ADAPTER.dump_json(specs).decode()
|
||||
|
||||
|
||||
def _deserialize_specs(value: str | None) -> list[CleanupLayerSpec]:
|
||||
if not value:
|
||||
return []
|
||||
return _SPECS_ADAPTER.validate_json(value)
|
||||
|
||||
|
||||
@dataclass(frozen=True, slots=True)
|
||||
class WorkflowAgentSessionScope:
|
||||
tenant_id: str
|
||||
app_id: str
|
||||
workflow_id: str
|
||||
workflow_run_id: str | None
|
||||
node_id: str
|
||||
node_execution_id: str
|
||||
binding_id: str
|
||||
agent_id: str
|
||||
agent_config_snapshot_id: str
|
||||
|
||||
|
||||
@dataclass(frozen=True, slots=True)
|
||||
class StoredWorkflowAgentSession:
|
||||
scope: WorkflowAgentSessionScope
|
||||
session_snapshot: CompositorSessionSnapshot
|
||||
backend_run_id: str | None
|
||||
composition_layer_specs: list[CleanupLayerSpec] = field(default_factory=list)
|
||||
|
||||
|
||||
class WorkflowAgentRuntimeSessionStore:
|
||||
"""Stores Agent backend session snapshots for workflow Agent node re-entry."""
|
||||
|
||||
def load_active_snapshot(self, scope: WorkflowAgentSessionScope) -> CompositorSessionSnapshot | None:
|
||||
if scope.workflow_run_id is None:
|
||||
return None
|
||||
|
||||
with session_factory.create_session() as session:
|
||||
row = session.scalar(
|
||||
select(WorkflowAgentRuntimeSession).where(
|
||||
WorkflowAgentRuntimeSession.tenant_id == scope.tenant_id,
|
||||
WorkflowAgentRuntimeSession.workflow_run_id == scope.workflow_run_id,
|
||||
WorkflowAgentRuntimeSession.node_id == scope.node_id,
|
||||
WorkflowAgentRuntimeSession.binding_id == scope.binding_id,
|
||||
WorkflowAgentRuntimeSession.agent_id == scope.agent_id,
|
||||
WorkflowAgentRuntimeSession.status == WorkflowAgentRuntimeSessionStatus.ACTIVE,
|
||||
)
|
||||
)
|
||||
if row is None:
|
||||
return None
|
||||
return CompositorSessionSnapshot.model_validate_json(row.session_snapshot)
|
||||
|
||||
def list_active_sessions(self, *, workflow_run_id: str) -> list[StoredWorkflowAgentSession]:
|
||||
with session_factory.create_session() as session:
|
||||
rows = session.scalars(
|
||||
select(WorkflowAgentRuntimeSession).where(
|
||||
WorkflowAgentRuntimeSession.workflow_run_id == workflow_run_id,
|
||||
WorkflowAgentRuntimeSession.status == WorkflowAgentRuntimeSessionStatus.ACTIVE,
|
||||
)
|
||||
).all()
|
||||
return [
|
||||
StoredWorkflowAgentSession(
|
||||
scope=WorkflowAgentSessionScope(
|
||||
tenant_id=row.tenant_id,
|
||||
app_id=row.app_id,
|
||||
workflow_id=row.workflow_id,
|
||||
workflow_run_id=row.workflow_run_id,
|
||||
node_id=row.node_id,
|
||||
node_execution_id=row.node_execution_id or "",
|
||||
binding_id=row.binding_id,
|
||||
agent_id=row.agent_id,
|
||||
agent_config_snapshot_id=row.agent_config_snapshot_id,
|
||||
),
|
||||
session_snapshot=CompositorSessionSnapshot.model_validate_json(row.session_snapshot),
|
||||
backend_run_id=row.backend_run_id,
|
||||
composition_layer_specs=_deserialize_specs(row.composition_layer_specs),
|
||||
)
|
||||
for row in rows
|
||||
]
|
||||
|
||||
def save_active_snapshot(
|
||||
self,
|
||||
*,
|
||||
scope: WorkflowAgentSessionScope,
|
||||
backend_run_id: str,
|
||||
snapshot: CompositorSessionSnapshot | None,
|
||||
composition_layer_specs: list[CleanupLayerSpec],
|
||||
) -> None:
|
||||
if scope.workflow_run_id is None or snapshot is None:
|
||||
return
|
||||
|
||||
snapshot_json = snapshot.model_dump_json()
|
||||
specs_json = _serialize_specs(composition_layer_specs)
|
||||
with session_factory.create_session() as session:
|
||||
row = session.scalar(
|
||||
select(WorkflowAgentRuntimeSession).where(
|
||||
WorkflowAgentRuntimeSession.tenant_id == scope.tenant_id,
|
||||
WorkflowAgentRuntimeSession.workflow_run_id == scope.workflow_run_id,
|
||||
WorkflowAgentRuntimeSession.node_id == scope.node_id,
|
||||
WorkflowAgentRuntimeSession.binding_id == scope.binding_id,
|
||||
WorkflowAgentRuntimeSession.agent_id == scope.agent_id,
|
||||
)
|
||||
)
|
||||
if row is None:
|
||||
row = WorkflowAgentRuntimeSession(
|
||||
tenant_id=scope.tenant_id,
|
||||
app_id=scope.app_id,
|
||||
workflow_id=scope.workflow_id,
|
||||
workflow_run_id=scope.workflow_run_id,
|
||||
node_id=scope.node_id,
|
||||
node_execution_id=scope.node_execution_id,
|
||||
binding_id=scope.binding_id,
|
||||
agent_id=scope.agent_id,
|
||||
agent_config_snapshot_id=scope.agent_config_snapshot_id,
|
||||
backend_run_id=backend_run_id,
|
||||
session_snapshot=snapshot_json,
|
||||
composition_layer_specs=specs_json,
|
||||
status=WorkflowAgentRuntimeSessionStatus.ACTIVE,
|
||||
)
|
||||
session.add(row)
|
||||
else:
|
||||
row.node_execution_id = scope.node_execution_id
|
||||
row.agent_config_snapshot_id = scope.agent_config_snapshot_id
|
||||
row.backend_run_id = backend_run_id
|
||||
row.session_snapshot = snapshot_json
|
||||
row.composition_layer_specs = specs_json
|
||||
row.status = WorkflowAgentRuntimeSessionStatus.ACTIVE
|
||||
row.cleaned_at = None
|
||||
session.commit()
|
||||
|
||||
def mark_cleaned(self, *, scope: WorkflowAgentSessionScope, backend_run_id: str | None = None) -> None:
|
||||
if scope.workflow_run_id is None:
|
||||
return
|
||||
|
||||
with session_factory.create_session() as session:
|
||||
row = session.scalar(
|
||||
select(WorkflowAgentRuntimeSession).where(
|
||||
WorkflowAgentRuntimeSession.tenant_id == scope.tenant_id,
|
||||
WorkflowAgentRuntimeSession.workflow_run_id == scope.workflow_run_id,
|
||||
WorkflowAgentRuntimeSession.node_id == scope.node_id,
|
||||
WorkflowAgentRuntimeSession.binding_id == scope.binding_id,
|
||||
WorkflowAgentRuntimeSession.agent_id == scope.agent_id,
|
||||
WorkflowAgentRuntimeSession.status == WorkflowAgentRuntimeSessionStatus.ACTIVE,
|
||||
)
|
||||
)
|
||||
if row is None:
|
||||
return
|
||||
if backend_run_id is not None:
|
||||
row.backend_run_id = backend_run_id
|
||||
row.status = WorkflowAgentRuntimeSessionStatus.CLEANED
|
||||
row.cleaned_at = naive_utc_now()
|
||||
session.commit()
|
||||
|
||||
|
||||
__all__ = [
|
||||
"StoredWorkflowAgentSession",
|
||||
"WorkflowAgentRuntimeSessionStore",
|
||||
"WorkflowAgentSessionScope",
|
||||
]
|
||||
@ -1,90 +0,0 @@
|
||||
"""add workflow agent runtime sessions
|
||||
|
||||
Revision ID: 7885bd53f9a9
|
||||
Revises: d4a5e1f3c9b7
|
||||
Create Date: 2026-05-27 09:53:54.711805
|
||||
|
||||
"""
|
||||
|
||||
import sqlalchemy as sa
|
||||
from alembic import op
|
||||
|
||||
import models as models
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = "7885bd53f9a9"
|
||||
down_revision = "d4a5e1f3c9b7"
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def _is_pg() -> bool:
|
||||
return op.get_bind().dialect.name == "postgresql"
|
||||
|
||||
|
||||
def _uuid_column(name: str, *, nullable: bool = False, primary_key: bool = False) -> sa.Column:
|
||||
"""Match the ``uuidv7()`` default that other tables on Postgres rely on,
|
||||
while staying portable on MySQL where the ORM supplies the id."""
|
||||
kwargs: dict[str, object] = {"nullable": nullable, "primary_key": primary_key}
|
||||
if primary_key and _is_pg():
|
||||
kwargs["server_default"] = sa.text("uuidv7()")
|
||||
return sa.Column(name, models.types.StringUUID(), **kwargs)
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
op.create_table(
|
||||
"workflow_agent_runtime_sessions",
|
||||
_uuid_column("id", primary_key=True),
|
||||
sa.Column("tenant_id", models.types.StringUUID(), nullable=False),
|
||||
sa.Column("app_id", models.types.StringUUID(), nullable=False),
|
||||
sa.Column("workflow_id", models.types.StringUUID(), nullable=False),
|
||||
sa.Column("workflow_run_id", models.types.StringUUID(), nullable=False),
|
||||
sa.Column("node_id", sa.String(length=255), nullable=False),
|
||||
sa.Column("node_execution_id", sa.String(length=255), nullable=True),
|
||||
sa.Column("binding_id", models.types.StringUUID(), nullable=False),
|
||||
sa.Column("agent_id", models.types.StringUUID(), nullable=False),
|
||||
sa.Column("agent_config_snapshot_id", models.types.StringUUID(), nullable=False),
|
||||
sa.Column("backend_run_id", sa.String(length=255), nullable=True),
|
||||
sa.Column("session_snapshot", models.types.LongText(), nullable=False),
|
||||
# MySQL rejects ``server_default`` on TEXT/BLOB columns. The JSON
|
||||
# payload is always populated at the ORM layer via
|
||||
# ``WorkflowAgentRuntimeSessionStore.save_active_snapshot`` so the
|
||||
# missing DB-level default cannot leave new rows uninitialized.
|
||||
sa.Column("composition_layer_specs", models.types.LongText(), nullable=False),
|
||||
sa.Column(
|
||||
"status",
|
||||
sa.String(length=32),
|
||||
server_default=sa.text("'active'"),
|
||||
nullable=False,
|
||||
),
|
||||
sa.Column("cleaned_at", sa.DateTime(), nullable=True),
|
||||
sa.Column("created_at", sa.DateTime(), server_default=sa.func.current_timestamp(), nullable=False),
|
||||
sa.Column("updated_at", sa.DateTime(), server_default=sa.func.current_timestamp(), nullable=False),
|
||||
sa.PrimaryKeyConstraint("id", name=op.f("workflow_agent_runtime_session_pkey")),
|
||||
sa.UniqueConstraint(
|
||||
"tenant_id",
|
||||
"workflow_run_id",
|
||||
"node_id",
|
||||
"binding_id",
|
||||
"agent_id",
|
||||
name=op.f("workflow_agent_runtime_session_scope_unique"),
|
||||
),
|
||||
)
|
||||
with op.batch_alter_table("workflow_agent_runtime_sessions", schema=None) as batch_op:
|
||||
batch_op.create_index(
|
||||
"workflow_agent_runtime_session_lookup_idx",
|
||||
["tenant_id", "workflow_run_id", "node_id", "status"],
|
||||
unique=False,
|
||||
)
|
||||
batch_op.create_index(
|
||||
"workflow_agent_runtime_session_backend_run_idx",
|
||||
["backend_run_id"],
|
||||
unique=False,
|
||||
)
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
with op.batch_alter_table("workflow_agent_runtime_sessions", schema=None) as batch_op:
|
||||
batch_op.drop_index("workflow_agent_runtime_session_backend_run_idx")
|
||||
batch_op.drop_index("workflow_agent_runtime_session_lookup_idx")
|
||||
op.drop_table("workflow_agent_runtime_sessions")
|
||||
@ -20,8 +20,6 @@ from .agent import (
|
||||
AgentStatus,
|
||||
WorkflowAgentBindingType,
|
||||
WorkflowAgentNodeBinding,
|
||||
WorkflowAgentRuntimeSession,
|
||||
WorkflowAgentRuntimeSessionStatus,
|
||||
)
|
||||
from .api_based_extension import APIBasedExtension, APIBasedExtensionPoint
|
||||
from .comment import (
|
||||
@ -237,8 +235,6 @@ __all__ = [
|
||||
"Workflow",
|
||||
"WorkflowAgentBindingType",
|
||||
"WorkflowAgentNodeBinding",
|
||||
"WorkflowAgentRuntimeSession",
|
||||
"WorkflowAgentRuntimeSessionStatus",
|
||||
"WorkflowAppLog",
|
||||
"WorkflowAppLogCreatedFrom",
|
||||
"WorkflowArchiveLog",
|
||||
|
||||
@ -92,15 +92,6 @@ class WorkflowAgentBindingType(StrEnum):
|
||||
INLINE_AGENT = "inline_agent"
|
||||
|
||||
|
||||
class WorkflowAgentRuntimeSessionStatus(StrEnum):
|
||||
"""Lifecycle state of an Agent backend session snapshot owned by a workflow run."""
|
||||
|
||||
# Snapshot can be reused by a later Agent run in the same workflow run.
|
||||
ACTIVE = "active"
|
||||
# Snapshot has been retired and must not be submitted to Agent backend again.
|
||||
CLEANED = "cleaned"
|
||||
|
||||
|
||||
class Agent(DefaultFieldsMixin, Base):
|
||||
"""Workspace-scoped Agent identity used by Agent Roster and workflow-only agents."""
|
||||
|
||||
@ -282,56 +273,3 @@ class WorkflowAgentNodeBinding(DefaultFieldsMixin, Base):
|
||||
if isinstance(self.node_job_config, str):
|
||||
return json.loads(self.node_job_config)
|
||||
return dict(self.node_job_config)
|
||||
|
||||
|
||||
class WorkflowAgentRuntimeSession(DefaultFieldsMixin, Base):
|
||||
"""Persisted Agent backend session snapshot for one workflow Agent node execution scope.
|
||||
|
||||
The snapshot is runtime state returned by Agent backend. It is intentionally
|
||||
separate from Agent Soul snapshots and workflow node-job config.
|
||||
"""
|
||||
|
||||
__tablename__ = "workflow_agent_runtime_sessions"
|
||||
__table_args__ = (
|
||||
sa.PrimaryKeyConstraint("id", name="workflow_agent_runtime_session_pkey"),
|
||||
UniqueConstraint(
|
||||
"tenant_id",
|
||||
"workflow_run_id",
|
||||
"node_id",
|
||||
"binding_id",
|
||||
"agent_id",
|
||||
name="workflow_agent_runtime_session_scope_unique",
|
||||
),
|
||||
Index(
|
||||
"workflow_agent_runtime_session_lookup_idx",
|
||||
"tenant_id",
|
||||
"workflow_run_id",
|
||||
"node_id",
|
||||
"status",
|
||||
),
|
||||
Index("workflow_agent_runtime_session_backend_run_idx", "backend_run_id"),
|
||||
)
|
||||
|
||||
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
workflow_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
workflow_run_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
node_id: Mapped[str] = mapped_column(String(255), nullable=False)
|
||||
node_execution_id: Mapped[str | None] = mapped_column(String(255), nullable=True)
|
||||
binding_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
agent_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
agent_config_snapshot_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
backend_run_id: Mapped[str | None] = mapped_column(String(255), nullable=True)
|
||||
session_snapshot: Mapped[str] = mapped_column(LongText, nullable=False)
|
||||
# JSON-encoded list of ``WorkflowAgentSessionLayerSpec`` ({name, type, deps,
|
||||
# config}). Drives Agent backend cleanup-only runs: the agenton compositor
|
||||
# rejects a session snapshot whose layer names do not match the cleanup
|
||||
# composition, so we must replay the same layer graph (minus credential-
|
||||
# bearing plugin layers) when issuing the cleanup request.
|
||||
composition_layer_specs: Mapped[str] = mapped_column(LongText, nullable=False, server_default="[]")
|
||||
status: Mapped[WorkflowAgentRuntimeSessionStatus] = mapped_column(
|
||||
EnumText(WorkflowAgentRuntimeSessionStatus, length=32),
|
||||
nullable=False,
|
||||
default=WorkflowAgentRuntimeSessionStatus.ACTIVE,
|
||||
)
|
||||
cleaned_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
|
||||
|
||||
@ -1,134 +0,0 @@
|
||||
"""Integration test for the cleanup request against the real agenton compositor.
|
||||
|
||||
The bug fixed by A+D was invisible to unit tests that use ``FakeAgentBackendRunClient``
|
||||
because the fake client never runs agenton's ``_validate_session_snapshot``. This
|
||||
test plugs a cleanup request through the real ``Compositor`` (with the same
|
||||
providers the agent backend wires in production) so that the snapshot-vs-
|
||||
composition name-order check would fail loudly if the cleanup builder ever
|
||||
regressed back to the empty-composition shape.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import cast
|
||||
|
||||
import pytest
|
||||
from agenton.compositor import Compositor, CompositorSessionSnapshot, LayerProvider
|
||||
from agenton.compositor.schemas import LayerSessionSnapshot
|
||||
from agenton.layers.base import LifecycleState
|
||||
from agenton_collections.layers.plain import PLAIN_PROMPT_LAYER_TYPE_ID
|
||||
from agenton_collections.layers.plain.basic import PromptLayer
|
||||
from agenton_collections.layers.pydantic_ai import PYDANTIC_AI_HISTORY_LAYER_TYPE_ID, PydanticAIHistoryLayer
|
||||
|
||||
from clients.agent_backend import AgentBackendRunRequestBuilder, CleanupLayerSpec
|
||||
|
||||
|
||||
def test_cleanup_request_passes_agenton_snapshot_validation():
|
||||
"""The cleanup request's composition layer names must match the (filtered)
|
||||
snapshot's layer names exactly — agenton's compositor enforces this and
|
||||
the agent backend rejects mismatches as ``run_failed`` asynchronously,
|
||||
which is the trap A/D fixed."""
|
||||
# Persisted (non-plugin) layer specs — these are what cleanup will replay.
|
||||
# We exclude the dify.execution_context layer from this integration check
|
||||
# because its real provider needs a plugin-daemon HTTP client; the cleanup
|
||||
# validation we are exercising is the snapshot-vs-composition name check,
|
||||
# which is purely structural and does not depend on which non-plugin layer
|
||||
# types appear.
|
||||
persisted_specs = [
|
||||
CleanupLayerSpec(
|
||||
name="workflow_node_job_prompt",
|
||||
type=PLAIN_PROMPT_LAYER_TYPE_ID,
|
||||
config={"prefix": "Do the cleanup."},
|
||||
),
|
||||
CleanupLayerSpec(name="history", type=PYDANTIC_AI_HISTORY_LAYER_TYPE_ID),
|
||||
]
|
||||
# Saved snapshot still carries the LLM layer entry — cleanup's
|
||||
# ``_filter_snapshot_to_specs`` must drop it so names match.
|
||||
full_snapshot = CompositorSessionSnapshot(
|
||||
layers=[
|
||||
LayerSessionSnapshot(
|
||||
name="workflow_node_job_prompt",
|
||||
lifecycle_state=LifecycleState.SUSPENDED,
|
||||
runtime_state={},
|
||||
),
|
||||
LayerSessionSnapshot(
|
||||
name="history",
|
||||
lifecycle_state=LifecycleState.SUSPENDED,
|
||||
runtime_state={"messages": []},
|
||||
),
|
||||
LayerSessionSnapshot(
|
||||
name="llm",
|
||||
lifecycle_state=LifecycleState.SUSPENDED,
|
||||
runtime_state={},
|
||||
),
|
||||
]
|
||||
)
|
||||
|
||||
cleanup_request = AgentBackendRunRequestBuilder().build_cleanup_request(
|
||||
session_snapshot=full_snapshot,
|
||||
composition_layer_specs=persisted_specs,
|
||||
)
|
||||
|
||||
# Drive the real agenton compositor through ``from_config`` + ``_create_run``
|
||||
# the same way the agent backend's RunScheduler does. ``_create_run`` is the
|
||||
# private path that calls ``_validate_session_snapshot``; we use it directly
|
||||
# to keep the test synchronous (no async ``enter()`` lifecycle needed —
|
||||
# validation is the only thing under test).
|
||||
config = {
|
||||
"schema_version": 1,
|
||||
"layers": [
|
||||
{"name": layer.name, "type": layer.type, "deps": dict(layer.deps), "metadata": dict(layer.metadata)}
|
||||
for layer in cleanup_request.composition.layers
|
||||
],
|
||||
}
|
||||
compositor = Compositor.from_config(
|
||||
config,
|
||||
providers=[
|
||||
LayerProvider.from_layer_type(PromptLayer),
|
||||
LayerProvider.from_layer_type(PydanticAIHistoryLayer),
|
||||
],
|
||||
)
|
||||
|
||||
layer_configs = {layer.name: layer.config for layer in cleanup_request.composition.layers}
|
||||
# This is the call that would raise ``ValueError`` if the cleanup snapshot
|
||||
# and composition disagreed on layer names — the exact failure mode the
|
||||
# original ``layers=[]`` cleanup hit.
|
||||
run = compositor._create_run( # type: ignore[reportPrivateUsage]
|
||||
configs=cast(dict[str, object], layer_configs),
|
||||
session_snapshot=cleanup_request.session_snapshot,
|
||||
)
|
||||
assert list(run.slots.keys()) == ["workflow_node_job_prompt", "history"]
|
||||
|
||||
|
||||
def test_cleanup_request_with_mismatched_specs_would_be_rejected_by_agenton():
|
||||
"""Regression sentinel: if a future refactor stops filtering the snapshot,
|
||||
agenton would reject the request — and that rejection is what the runtime
|
||||
fix is preventing. We confirm the validator does fail when given the
|
||||
pre-fix shape so the previous test's success is not a coincidence."""
|
||||
snapshot_with_extra = CompositorSessionSnapshot(
|
||||
layers=[
|
||||
LayerSessionSnapshot(
|
||||
name="history",
|
||||
lifecycle_state=LifecycleState.SUSPENDED,
|
||||
runtime_state={},
|
||||
),
|
||||
LayerSessionSnapshot(
|
||||
name="llm", # extra layer not in composition
|
||||
lifecycle_state=LifecycleState.SUSPENDED,
|
||||
runtime_state={},
|
||||
),
|
||||
]
|
||||
)
|
||||
compositor = Compositor.from_config(
|
||||
{
|
||||
"schema_version": 1,
|
||||
"layers": [{"name": "history", "type": PYDANTIC_AI_HISTORY_LAYER_TYPE_ID, "deps": {}, "metadata": {}}],
|
||||
},
|
||||
providers=[LayerProvider.from_layer_type(PydanticAIHistoryLayer)],
|
||||
)
|
||||
|
||||
with pytest.raises(ValueError, match="layer names must match"):
|
||||
compositor._create_run( # type: ignore[reportPrivateUsage]
|
||||
configs={},
|
||||
session_snapshot=snapshot_with_extra,
|
||||
)
|
||||
@ -63,25 +63,3 @@ def test_fake_client_cancel_run_returns_cancelled_status():
|
||||
|
||||
assert cancelled.run_id == "fake-run-1"
|
||||
assert cancelled.status == "cancelled"
|
||||
|
||||
|
||||
def test_fake_client_paused_scenario_returns_paused_status_and_event():
|
||||
"""The paused scenario exists for HITL-style flows; both ``wait_run`` and
|
||||
the event stream must report the pause so consumers can branch on it."""
|
||||
client = FakeAgentBackendRunClient(scenario=FakeAgentBackendScenario.PAUSED)
|
||||
|
||||
status = client.wait_run("fake-run-1")
|
||||
events = list(client.stream_events("fake-run-1"))
|
||||
|
||||
assert status.status == "paused"
|
||||
assert status.error is None
|
||||
assert events[-1].type == "run_paused"
|
||||
assert events[-1].data.reason == "human_input_required"
|
||||
|
||||
|
||||
def test_fake_client_success_wait_run_returns_succeeded_status():
|
||||
"""Covers the default SUCCESS branch of ``wait_run`` directly."""
|
||||
status = FakeAgentBackendRunClient().wait_run("fake-run-1")
|
||||
|
||||
assert status.status == "succeeded"
|
||||
assert status.error is None
|
||||
|
||||
@ -1,23 +1,15 @@
|
||||
from typing import Any, cast
|
||||
|
||||
import pytest
|
||||
from agenton.compositor import CompositorSessionSnapshot
|
||||
from agenton.compositor.schemas import LayerSessionSnapshot
|
||||
from agenton.layers import ExitIntent
|
||||
from agenton.layers.base import LifecycleState
|
||||
from agenton_collections.layers.plain import PLAIN_PROMPT_LAYER_TYPE_ID, PromptLayerConfig
|
||||
from agenton_collections.layers.pydantic_ai import PYDANTIC_AI_HISTORY_LAYER_TYPE_ID
|
||||
from agenton_collections.layers.plain import PLAIN_PROMPT_LAYER_TYPE_ID
|
||||
from dify_agent.layers.dify_plugin import (
|
||||
DIFY_PLUGIN_LLM_LAYER_TYPE_ID,
|
||||
DIFY_PLUGIN_TOOLS_LAYER_TYPE_ID,
|
||||
DifyPluginLLMLayerConfig,
|
||||
DifyPluginToolConfig,
|
||||
DifyPluginToolsLayerConfig,
|
||||
)
|
||||
from dify_agent.layers.execution_context import DIFY_EXECUTION_CONTEXT_LAYER_TYPE_ID, DifyExecutionContextLayerConfig
|
||||
from dify_agent.layers.output import DIFY_OUTPUT_LAYER_TYPE_ID
|
||||
from dify_agent.protocol import (
|
||||
DIFY_AGENT_HISTORY_LAYER_ID,
|
||||
DIFY_AGENT_MODEL_LAYER_ID,
|
||||
DIFY_AGENT_OUTPUT_LAYER_ID,
|
||||
CreateRunRequest,
|
||||
@ -34,7 +26,6 @@ from clients.agent_backend import (
|
||||
AgentBackendOutputConfig,
|
||||
AgentBackendRunRequestBuilder,
|
||||
AgentBackendWorkflowNodeRunInput,
|
||||
CleanupLayerSpec,
|
||||
redact_for_agent_backend_log,
|
||||
)
|
||||
|
||||
@ -80,11 +71,10 @@ def test_request_builder_outputs_dify_agent_create_run_request():
|
||||
WORKFLOW_NODE_JOB_PROMPT_LAYER_ID,
|
||||
WORKFLOW_USER_PROMPT_LAYER_ID,
|
||||
DIFY_EXECUTION_CONTEXT_LAYER_ID,
|
||||
DIFY_AGENT_HISTORY_LAYER_ID,
|
||||
DIFY_AGENT_MODEL_LAYER_ID,
|
||||
DIFY_AGENT_OUTPUT_LAYER_ID,
|
||||
]
|
||||
assert request.on_exit.default is ExitIntent.SUSPEND
|
||||
assert request.on_exit.default is ExitIntent.DELETE
|
||||
assert request.idempotency_key == "workflow-run-1:node-execution-1"
|
||||
assert request.metadata == {"workflow_id": "workflow-1", "node_id": "node-1"}
|
||||
|
||||
@ -109,10 +99,9 @@ def test_request_builder_sets_model_and_output_layer_contract_ids():
|
||||
layers = {layer.name: layer for layer in request.composition.layers}
|
||||
|
||||
assert layers[DIFY_EXECUTION_CONTEXT_LAYER_ID].type == DIFY_EXECUTION_CONTEXT_LAYER_TYPE_ID
|
||||
assert cast(DifyExecutionContextLayerConfig, layers[DIFY_EXECUTION_CONTEXT_LAYER_ID].config).user_id == "user-1"
|
||||
assert layers[DIFY_AGENT_HISTORY_LAYER_ID].type == PYDANTIC_AI_HISTORY_LAYER_TYPE_ID
|
||||
assert layers[DIFY_EXECUTION_CONTEXT_LAYER_ID].config.user_id == "user-1"
|
||||
assert layers[DIFY_AGENT_MODEL_LAYER_ID].type == DIFY_PLUGIN_LLM_LAYER_TYPE_ID
|
||||
assert cast(DifyPluginLLMLayerConfig, layers[DIFY_AGENT_MODEL_LAYER_ID].config).plugin_id == "langgenius/openai"
|
||||
assert layers[DIFY_AGENT_MODEL_LAYER_ID].config.plugin_id == "langgenius/openai"
|
||||
assert layers[DIFY_AGENT_MODEL_LAYER_ID].deps == {"execution_context": DIFY_EXECUTION_CONTEXT_LAYER_ID}
|
||||
assert layers[DIFY_AGENT_OUTPUT_LAYER_ID].type == DIFY_OUTPUT_LAYER_TYPE_ID
|
||||
|
||||
@ -141,92 +130,16 @@ def test_request_builder_adds_dify_plugin_tools_layer_when_configured():
|
||||
|
||||
assert layers[DIFY_PLUGIN_TOOLS_LAYER_ID].type == DIFY_PLUGIN_TOOLS_LAYER_TYPE_ID
|
||||
assert layers[DIFY_PLUGIN_TOOLS_LAYER_ID].deps == {"execution_context": DIFY_EXECUTION_CONTEXT_LAYER_ID}
|
||||
tools_config = cast(DifyPluginToolsLayerConfig, layers[DIFY_PLUGIN_TOOLS_LAYER_ID].config)
|
||||
assert tools_config.tools[0].tool_name == "current_time"
|
||||
assert layers[DIFY_PLUGIN_TOOLS_LAYER_ID].config.tools[0].tool_name == "current_time"
|
||||
|
||||
|
||||
def test_request_builder_can_delete_on_exit_for_cleanup_paths():
|
||||
def test_request_builder_can_suspend_on_exit_for_resume_or_babysit_paths():
|
||||
run_input = _run_input()
|
||||
run_input.suspend_on_exit = False
|
||||
run_input.suspend_on_exit = True
|
||||
|
||||
request = AgentBackendRunRequestBuilder().build_for_workflow_node(run_input)
|
||||
|
||||
assert request.on_exit.default is ExitIntent.DELETE
|
||||
|
||||
|
||||
def test_request_builder_builds_cleanup_request_replays_persisted_layer_specs():
|
||||
"""The cleanup request must replay the persisted (non-plugin) layer specs
|
||||
and filter the snapshot to match so the agenton compositor's
|
||||
snapshot-vs-composition name-order validator passes."""
|
||||
session_snapshot = CompositorSessionSnapshot(
|
||||
layers=[
|
||||
LayerSessionSnapshot(name="history", lifecycle_state=LifecycleState.SUSPENDED, runtime_state={"k": 1}),
|
||||
LayerSessionSnapshot(name="llm", lifecycle_state=LifecycleState.SUSPENDED, runtime_state={}),
|
||||
]
|
||||
)
|
||||
specs = [CleanupLayerSpec(name="history", type="pydantic_ai.history")]
|
||||
|
||||
request = AgentBackendRunRequestBuilder().build_cleanup_request(
|
||||
session_snapshot=session_snapshot,
|
||||
composition_layer_specs=specs,
|
||||
idempotency_key="run-1:node-1:binding-1:agent-session-cleanup",
|
||||
metadata={"workflow_run_id": "run-1"},
|
||||
)
|
||||
|
||||
assert [layer.name for layer in request.composition.layers] == ["history"]
|
||||
assert request.session_snapshot is not None
|
||||
assert [layer.name for layer in request.session_snapshot.layers] == ["history"]
|
||||
assert request.on_exit.default is ExitIntent.DELETE
|
||||
assert request.idempotency_key == "run-1:node-1:binding-1:agent-session-cleanup"
|
||||
assert request.metadata["agent_backend_lifecycle"] == "session_cleanup"
|
||||
|
||||
|
||||
def test_request_builder_rejects_empty_composition_layer_specs():
|
||||
"""Empty specs would put us back in the original ``layers=[]`` trap that
|
||||
fails on agenton's snapshot-vs-composition validation."""
|
||||
with pytest.raises(ValueError, match="composition_layer_specs"):
|
||||
AgentBackendRunRequestBuilder().build_cleanup_request(
|
||||
session_snapshot=CompositorSessionSnapshot(layers=[]),
|
||||
composition_layer_specs=[],
|
||||
)
|
||||
|
||||
|
||||
def test_extract_cleanup_layer_specs_drops_plugin_layers_keeps_configs():
|
||||
from dify_agent.protocol import RunComposition, RunLayerSpec
|
||||
|
||||
from clients.agent_backend import extract_cleanup_layer_specs
|
||||
|
||||
composition = RunComposition(
|
||||
layers=[
|
||||
RunLayerSpec(
|
||||
name="agent_soul_prompt",
|
||||
type="plain.prompt",
|
||||
config=PromptLayerConfig(prefix="hello"),
|
||||
),
|
||||
RunLayerSpec(
|
||||
name="llm",
|
||||
type="dify.plugin.llm",
|
||||
config=None, # protocol allows None; the redacted config is what matters
|
||||
),
|
||||
RunLayerSpec(
|
||||
name="tools",
|
||||
type="dify.plugin.tools",
|
||||
),
|
||||
RunLayerSpec(
|
||||
name="history",
|
||||
type="pydantic_ai.history",
|
||||
),
|
||||
]
|
||||
)
|
||||
|
||||
specs = extract_cleanup_layer_specs(composition)
|
||||
|
||||
assert [spec.name for spec in specs] == ["agent_soul_prompt", "history"]
|
||||
# Non-plugin configs are dumped as JSON-compatible dicts so the persisted
|
||||
# row can be replayed without holding live pydantic instances.
|
||||
soul_config = specs[0].config
|
||||
assert isinstance(soul_config, dict)
|
||||
assert soul_config.get("prefix") == "hello"
|
||||
assert request.on_exit.default is ExitIntent.SUSPEND
|
||||
|
||||
|
||||
def test_request_builder_rejects_blank_prompts():
|
||||
@ -246,6 +159,6 @@ def test_request_builder_rejects_blank_prompts():
|
||||
def test_redact_for_agent_backend_log_hides_credentials():
|
||||
request = AgentBackendRunRequestBuilder().build_for_workflow_node(_run_input())
|
||||
|
||||
redacted = cast(dict[str, Any], redact_for_agent_backend_log(request))
|
||||
redacted = redact_for_agent_backend_log(request)
|
||||
|
||||
assert redacted["composition"]["layers"][5]["config"]["credentials"] == "[REDACTED]"
|
||||
assert redacted["composition"]["layers"][4]["config"]["credentials"] == "[REDACTED]"
|
||||
|
||||
@ -1,12 +1,9 @@
|
||||
from types import SimpleNamespace
|
||||
from typing import cast
|
||||
|
||||
from agenton.compositor import CompositorSessionSnapshot
|
||||
|
||||
from clients.agent_backend import (
|
||||
AgentBackendRunEventAdapter,
|
||||
AgentBackendStreamInternalEvent,
|
||||
CleanupLayerSpec,
|
||||
FakeAgentBackendRunClient,
|
||||
FakeAgentBackendScenario,
|
||||
)
|
||||
@ -16,10 +13,9 @@ from core.workflow.nodes.agent_v2.binding_resolver import WorkflowAgentBindingBu
|
||||
from core.workflow.nodes.agent_v2.entities import DifyAgentNodeData
|
||||
from core.workflow.nodes.agent_v2.output_adapter import WorkflowAgentOutputAdapter
|
||||
from core.workflow.nodes.agent_v2.runtime_request_builder import WorkflowAgentRuntimeRequestBuilder
|
||||
from core.workflow.nodes.agent_v2.session_store import WorkflowAgentRuntimeSessionStore, WorkflowAgentSessionScope
|
||||
from graphon.entities import GraphInitParams
|
||||
from graphon.enums import BuiltinNodeTypes, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
|
||||
from graphon.node_events import PauseRequestedEvent, StreamCompletedEvent
|
||||
from graphon.node_events import StreamCompletedEvent
|
||||
from graphon.runtime import GraphRuntimeState
|
||||
from graphon.variables.segments import StringSegment
|
||||
from models.agent import Agent, AgentConfigSnapshot, WorkflowAgentNodeBinding
|
||||
@ -88,47 +84,7 @@ class FakeBindingResolver(WorkflowAgentBindingResolver):
|
||||
return WorkflowAgentBindingBundle(binding=self.binding, agent=self.agent, snapshot=self.snapshot)
|
||||
|
||||
|
||||
class FakeSessionStore:
|
||||
def __init__(self, snapshot: CompositorSessionSnapshot | None = None) -> None:
|
||||
self.loaded_snapshot = snapshot
|
||||
self.saved: list[
|
||||
tuple[
|
||||
WorkflowAgentSessionScope,
|
||||
str,
|
||||
CompositorSessionSnapshot | None,
|
||||
list[CleanupLayerSpec],
|
||||
]
|
||||
] = []
|
||||
self.cleaned: list[tuple[WorkflowAgentSessionScope, str | None]] = []
|
||||
|
||||
def load_active_snapshot(self, scope: WorkflowAgentSessionScope) -> CompositorSessionSnapshot | None:
|
||||
return self.loaded_snapshot
|
||||
|
||||
def save_active_snapshot(
|
||||
self,
|
||||
*,
|
||||
scope: WorkflowAgentSessionScope,
|
||||
backend_run_id: str,
|
||||
snapshot: CompositorSessionSnapshot | None,
|
||||
composition_layer_specs: list[CleanupLayerSpec],
|
||||
) -> None:
|
||||
self.saved.append((scope, backend_run_id, snapshot, list(composition_layer_specs)))
|
||||
|
||||
def mark_cleaned(
|
||||
self,
|
||||
*,
|
||||
scope: WorkflowAgentSessionScope,
|
||||
backend_run_id: str | None = None,
|
||||
) -> None:
|
||||
self.cleaned.append((scope, backend_run_id))
|
||||
|
||||
|
||||
def _node(
|
||||
*,
|
||||
scenario: FakeAgentBackendScenario = FakeAgentBackendScenario.SUCCESS,
|
||||
agent_backend_client: FakeAgentBackendRunClient | None = None,
|
||||
session_store: FakeSessionStore | None = None,
|
||||
) -> DifyAgentNode:
|
||||
def _node(*, scenario: FakeAgentBackendScenario = FakeAgentBackendScenario.SUCCESS) -> DifyAgentNode:
|
||||
graph_init_params = GraphInitParams(
|
||||
workflow_id="workflow-1",
|
||||
graph_config={"nodes": [], "edges": []},
|
||||
@ -150,7 +106,6 @@ def _node(
|
||||
def is_owned_by_tenant(self, *, file_id: str, tenant_id: str) -> bool:
|
||||
return True
|
||||
|
||||
client = agent_backend_client or FakeAgentBackendRunClient(scenario=scenario)
|
||||
return DifyAgentNode(
|
||||
node_id="agent-node",
|
||||
data=DifyAgentNodeData.model_validate({"type": BuiltinNodeTypes.AGENT, "version": "2"}),
|
||||
@ -158,12 +113,11 @@ def _node(
|
||||
graph_runtime_state=cast(GraphRuntimeState, SimpleNamespace(variable_pool=FakeVariablePool())),
|
||||
binding_resolver=FakeBindingResolver(),
|
||||
runtime_request_builder=WorkflowAgentRuntimeRequestBuilder(credentials_provider=FakeCredentialsProvider()),
|
||||
agent_backend_client=client,
|
||||
agent_backend_client=FakeAgentBackendRunClient(scenario=scenario),
|
||||
event_adapter=AgentBackendRunEventAdapter(),
|
||||
output_adapter=WorkflowAgentOutputAdapter(),
|
||||
type_checker=PerOutputTypeChecker(file_validator=_AlwaysAllowFileValidator()),
|
||||
failure_orchestrator=OutputFailureOrchestrator(),
|
||||
session_store=cast(WorkflowAgentRuntimeSessionStore | None, session_store),
|
||||
)
|
||||
|
||||
|
||||
@ -178,7 +132,7 @@ def test_agent_node_run_maps_successful_agent_backend_run_to_node_result():
|
||||
assert agent_log["agent_backend"]["run_id"] == "fake-run-1"
|
||||
assert agent_log["agent_backend"]["status"] == "succeeded"
|
||||
assert result.process_data["agent_id"] == "agent-1"
|
||||
assert result.inputs["agent_backend_request"]["composition"]["layers"][5]["config"]["credentials"] == "[REDACTED]"
|
||||
assert result.inputs["agent_backend_request"]["composition"]["layers"][4]["config"]["credentials"] == "[REDACTED]"
|
||||
|
||||
|
||||
def test_agent_node_run_maps_failed_agent_backend_run_to_node_result():
|
||||
@ -191,126 +145,6 @@ def test_agent_node_run_maps_failed_agent_backend_run_to_node_result():
|
||||
assert result.error_type == "unit_test"
|
||||
|
||||
|
||||
def test_agent_node_failed_run_marks_session_cleaned_to_prevent_stale_reuse():
|
||||
"""A failed agent run must retire the local ACTIVE session row so a workflow
|
||||
loop back into the same Agent node does not resume from a stale snapshot."""
|
||||
existing_snapshot = CompositorSessionSnapshot(layers=[])
|
||||
store = FakeSessionStore(snapshot=existing_snapshot)
|
||||
|
||||
events = list(_node(scenario=FakeAgentBackendScenario.FAILED, session_store=store)._run())
|
||||
|
||||
assert len(events) == 1
|
||||
assert store.cleaned, "failed agent run should mark the session cleaned"
|
||||
cleaned_scope, cleaned_backend_run_id = store.cleaned[0]
|
||||
assert cleaned_scope.workflow_run_id == "workflow-run-1"
|
||||
assert cleaned_backend_run_id == "fake-run-1"
|
||||
# A failed run does not produce a fresh snapshot to persist.
|
||||
assert store.saved == []
|
||||
|
||||
|
||||
def test_agent_node_saves_success_snapshot_and_reuses_existing_snapshot():
|
||||
existing_snapshot = CompositorSessionSnapshot(layers=[])
|
||||
store = FakeSessionStore(snapshot=existing_snapshot)
|
||||
client = FakeAgentBackendRunClient()
|
||||
node = _node(agent_backend_client=client, session_store=store)
|
||||
|
||||
events = list(node._run())
|
||||
|
||||
assert len(events) == 1
|
||||
assert store.saved
|
||||
scope, backend_run_id, saved_snapshot, saved_specs = store.saved[0]
|
||||
assert scope.workflow_run_id == "workflow-run-1"
|
||||
assert backend_run_id == "fake-run-1"
|
||||
assert saved_snapshot is not None
|
||||
assert client.request is not None
|
||||
assert client.request.session_snapshot is existing_snapshot
|
||||
# Persist enough composition shape to replay a cleanup run; plugin layers
|
||||
# (which would carry credentials) are intentionally absent.
|
||||
saved_layer_names = [spec.name for spec in saved_specs]
|
||||
assert saved_layer_names, "cleanup specs must persist at least the non-plugin layers"
|
||||
plugin_types = {"dify.plugin.llm", "dify.plugin.tools"}
|
||||
assert not {spec.type for spec in saved_specs} & plugin_types
|
||||
|
||||
|
||||
def test_agent_node_run_when_session_store_save_raises_records_persist_error_in_metadata():
|
||||
"""A DB-side write failure must not crash the node; it should set
|
||||
``session_snapshot_persist_error`` in the agent_backend metadata so the
|
||||
incident is observable from the workflow_node_executions record."""
|
||||
|
||||
class _ExplodingSessionStore(FakeSessionStore):
|
||||
def save_active_snapshot(self, **kwargs): # type: ignore[override]
|
||||
del kwargs
|
||||
raise RuntimeError("simulated DB failure")
|
||||
|
||||
store = _ExplodingSessionStore()
|
||||
events = list(_node(session_store=store)._run())
|
||||
|
||||
assert len(events) == 1
|
||||
result = cast(StreamCompletedEvent, events[0]).node_run_result
|
||||
assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED
|
||||
agent_backend = result.metadata[WorkflowNodeExecutionMetadataKey.AGENT_LOG]["agent_backend"]
|
||||
assert agent_backend["session_snapshot_persisted"] is False
|
||||
assert agent_backend["session_snapshot_persist_error"] == "workflow_agent_runtime_session_store_error"
|
||||
|
||||
|
||||
def test_agent_node_failed_run_when_mark_cleaned_raises_records_cleanup_error_in_metadata():
|
||||
"""Same defensive pattern: a DB-side mark_cleaned failure must surface as
|
||||
a ``session_snapshot_cleanup_error`` in metadata, not as a node crash."""
|
||||
|
||||
class _ExplodingMarkCleanedStore(FakeSessionStore):
|
||||
def mark_cleaned(self, **kwargs): # type: ignore[override]
|
||||
del kwargs
|
||||
raise RuntimeError("simulated DB failure")
|
||||
|
||||
store = _ExplodingMarkCleanedStore()
|
||||
events = list(_node(scenario=FakeAgentBackendScenario.FAILED, session_store=store)._run())
|
||||
|
||||
assert len(events) == 1
|
||||
result = cast(StreamCompletedEvent, events[0]).node_run_result
|
||||
assert result.status == WorkflowNodeExecutionStatus.FAILED
|
||||
agent_backend = result.metadata[WorkflowNodeExecutionMetadataKey.AGENT_LOG]["agent_backend"]
|
||||
assert agent_backend["session_snapshot_cleaned_on_failure"] is False
|
||||
assert agent_backend["session_snapshot_cleanup_error"] == "workflow_agent_runtime_session_store_error"
|
||||
|
||||
|
||||
def test_agent_node_success_run_without_session_store_skips_persistence():
|
||||
"""When ``session_store`` is None the node still completes successfully —
|
||||
the lifecycle branch is a no-op and the run result is unaffected."""
|
||||
events = list(_node(session_store=None)._run())
|
||||
|
||||
assert len(events) == 1
|
||||
result = cast(StreamCompletedEvent, events[0]).node_run_result
|
||||
assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED
|
||||
agent_backend = result.metadata[WorkflowNodeExecutionMetadataKey.AGENT_LOG]["agent_backend"]
|
||||
# No persistence metadata is attached when the store is missing.
|
||||
assert "session_snapshot_persisted" not in agent_backend
|
||||
|
||||
|
||||
def test_agent_node_failed_run_without_session_store_skips_mark_cleaned():
|
||||
"""``session_store=None`` + failed terminal must remain a no-op for
|
||||
the cleanup branch — the node failure path still surfaces correctly."""
|
||||
events = list(_node(scenario=FakeAgentBackendScenario.FAILED, session_store=None)._run())
|
||||
|
||||
assert len(events) == 1
|
||||
result = cast(StreamCompletedEvent, events[0]).node_run_result
|
||||
assert result.status == WorkflowNodeExecutionStatus.FAILED
|
||||
agent_backend = result.metadata[WorkflowNodeExecutionMetadataKey.AGENT_LOG]["agent_backend"]
|
||||
assert "session_snapshot_cleaned_on_failure" not in agent_backend
|
||||
|
||||
|
||||
def test_agent_node_paused_run_requests_workflow_pause_and_persists_snapshot():
|
||||
store = FakeSessionStore()
|
||||
node = _node(scenario=FakeAgentBackendScenario.PAUSED, session_store=store)
|
||||
|
||||
events = list(node._run())
|
||||
|
||||
assert len(events) == 1
|
||||
assert isinstance(events[0], PauseRequestedEvent)
|
||||
assert store.saved
|
||||
assert store.saved[0][1] == "fake-run-1"
|
||||
assert store.saved[0][3], "paused agent run should still persist replayable layer specs"
|
||||
|
||||
|
||||
def test_agent_node_records_stream_usage_metadata():
|
||||
metadata = {"agent_backend": {"run_id": "run-1"}}
|
||||
|
||||
|
||||
@ -1,14 +1,10 @@
|
||||
from dataclasses import replace
|
||||
from typing import cast
|
||||
|
||||
import pytest
|
||||
from agenton.compositor import CompositorSessionSnapshot
|
||||
from dify_agent.layers.dify_plugin import DifyPluginToolConfig, DifyPluginToolsLayerConfig
|
||||
from dify_agent.protocol import DIFY_AGENT_HISTORY_LAYER_ID, DIFY_AGENT_MODEL_LAYER_ID
|
||||
|
||||
from clients.agent_backend import DIFY_EXECUTION_CONTEXT_LAYER_ID, DIFY_PLUGIN_TOOLS_LAYER_ID
|
||||
from core.app.entities.app_invoke_entities import DifyRunContext, InvokeFrom, UserFrom
|
||||
from core.workflow.nodes.agent_v2.plugin_tools_builder import WorkflowAgentPluginToolsBuilder
|
||||
from core.workflow.nodes.agent_v2.runtime_request_builder import (
|
||||
WorkflowAgentRuntimeBuildContext,
|
||||
WorkflowAgentRuntimeRequestBuilder,
|
||||
@ -31,17 +27,6 @@ class FakeCredentialsProvider:
|
||||
return {"api_key": "secret-key"}
|
||||
|
||||
|
||||
class CapturingCredentialsProvider:
|
||||
def __init__(self) -> None:
|
||||
self.provider_name: str | None = None
|
||||
self.model_name: str | None = None
|
||||
|
||||
def fetch(self, provider_name: str, model_name: str) -> dict[str, object]:
|
||||
self.provider_name = provider_name
|
||||
self.model_name = model_name
|
||||
return {"api_key": "secret-key"}
|
||||
|
||||
|
||||
class FakePluginToolsBuilder:
|
||||
def __init__(self) -> None:
|
||||
# Capture the runtime invocation source so tests can assert it was
|
||||
@ -151,31 +136,7 @@ def test_builds_create_run_request_from_agent_soul_and_node_job():
|
||||
assert dumped["composition"]["layers"][1]["config"]["prefix"] == "Use the previous output."
|
||||
assert "Previous result" in dumped["composition"]["layers"][2]["config"]["user"]
|
||||
assert dumped["composition"]["layers"][-1]["config"]["json_schema"]["properties"]["summary"]["type"] == "string"
|
||||
assert DIFY_AGENT_HISTORY_LAYER_ID in layers
|
||||
assert result.redacted_request["composition"]["layers"][5]["config"]["credentials"] == "[REDACTED]"
|
||||
|
||||
|
||||
def test_normalizes_langgenius_model_provider_for_agent_backend_transport():
|
||||
context = _context()
|
||||
context.snapshot.config_snapshot = AgentSoulConfig(
|
||||
prompt={"system_prompt": "You are careful."},
|
||||
model=AgentSoulModelConfig(
|
||||
plugin_id="langgenius/openai/openai",
|
||||
model_provider="langgenius/openai/openai",
|
||||
model="gpt-test",
|
||||
),
|
||||
)
|
||||
credentials_provider = CapturingCredentialsProvider()
|
||||
|
||||
result = WorkflowAgentRuntimeRequestBuilder(credentials_provider=credentials_provider).build(context)
|
||||
|
||||
dumped = result.request.model_dump(mode="json")
|
||||
layers = {layer["name"]: layer for layer in dumped["composition"]["layers"]}
|
||||
model_config = layers[DIFY_AGENT_MODEL_LAYER_ID]["config"]
|
||||
assert credentials_provider.provider_name == "langgenius/openai/openai"
|
||||
assert credentials_provider.model_name == "gpt-test"
|
||||
assert model_config["plugin_id"] == "langgenius/openai"
|
||||
assert model_config["model_provider"] == "openai"
|
||||
assert result.redacted_request["composition"]["layers"][4]["config"]["credentials"] == "[REDACTED]"
|
||||
|
||||
|
||||
def test_builds_workflow_run_request_with_file_output_schema_and_reserved_metadata():
|
||||
@ -226,7 +187,7 @@ def test_builds_workflow_run_request_with_file_output_schema_and_reserved_metada
|
||||
assert output_schema["properties"]["report"]["properties"]["file_id"]["type"] == "string"
|
||||
assert output_schema["properties"]["confidence"]["type"] == "number"
|
||||
assert output_schema["required"] == ["report"]
|
||||
assert dumped["composition"]["layers"][5]["config"]["model_settings"] == {"temperature": 0.2}
|
||||
assert dumped["composition"]["layers"][4]["config"]["model_settings"] == {"temperature": 0.2}
|
||||
assert result.metadata["runtime_support"]["reserved_status"]["tools.dify_tools"] == "supported_when_config_valid"
|
||||
assert result.metadata["runtime_support"]["reserved_status"]["tools.cli_tools"] == "reserved_not_executed"
|
||||
warnings = result.metadata["runtime_support"]["unsupported_runtime_warnings"]
|
||||
@ -263,7 +224,7 @@ def test_builds_workflow_run_request_with_dify_plugin_tools_layer():
|
||||
plugin_tools_builder = FakePluginToolsBuilder()
|
||||
result = WorkflowAgentRuntimeRequestBuilder(
|
||||
credentials_provider=FakeCredentialsProvider(),
|
||||
plugin_tools_builder=cast(WorkflowAgentPluginToolsBuilder, plugin_tools_builder),
|
||||
plugin_tools_builder=plugin_tools_builder,
|
||||
).build(context)
|
||||
|
||||
dumped = result.request.model_dump(mode="json")
|
||||
@ -283,15 +244,6 @@ def test_builds_workflow_run_request_with_dify_plugin_tools_layer():
|
||||
assert plugin_tools_builder.last_invoke_from == context.dify_context.invoke_from
|
||||
|
||||
|
||||
def test_build_passes_saved_session_snapshot_to_agent_backend_request():
|
||||
session_snapshot = CompositorSessionSnapshot(layers=[])
|
||||
context = replace(_context(), session_snapshot=session_snapshot)
|
||||
|
||||
result = WorkflowAgentRuntimeRequestBuilder(credentials_provider=FakeCredentialsProvider()).build(context)
|
||||
|
||||
assert result.request.session_snapshot is session_snapshot
|
||||
|
||||
|
||||
def test_requires_agent_soul_model_config():
|
||||
context = _context()
|
||||
snapshot = AgentConfigSnapshot(
|
||||
|
||||
@ -1,412 +0,0 @@
|
||||
from datetime import UTC
|
||||
from typing import cast
|
||||
|
||||
import pytest
|
||||
from agenton.compositor import CompositorSessionSnapshot
|
||||
from agenton.compositor.schemas import LayerSessionSnapshot
|
||||
from agenton.layers.base import LifecycleState
|
||||
from dify_agent.protocol import CancelRunRequest, RunEvent, RunStatusResponse
|
||||
|
||||
from clients.agent_backend import AgentBackendRunRequestBuilder, CleanupLayerSpec, FakeAgentBackendRunClient
|
||||
from clients.agent_backend.errors import AgentBackendHTTPError
|
||||
from core.workflow.nodes.agent_v2.session_cleanup_layer import WorkflowAgentSessionCleanupLayer
|
||||
from core.workflow.nodes.agent_v2.session_store import (
|
||||
StoredWorkflowAgentSession,
|
||||
WorkflowAgentRuntimeSessionStore,
|
||||
WorkflowAgentSessionScope,
|
||||
)
|
||||
from core.workflow.system_variables import build_system_variables
|
||||
from graphon.entities.pause_reason import SchedulingPause
|
||||
from graphon.graph_engine.command_channels import CommandChannel
|
||||
from graphon.graph_events import (
|
||||
GraphRunAbortedEvent,
|
||||
GraphRunFailedEvent,
|
||||
GraphRunPartialSucceededEvent,
|
||||
GraphRunPausedEvent,
|
||||
GraphRunStartedEvent,
|
||||
GraphRunSucceededEvent,
|
||||
)
|
||||
from graphon.runtime import GraphRuntimeState, ReadOnlyGraphRuntimeStateWrapper, VariablePool
|
||||
|
||||
|
||||
def _layer_snapshot(name: str) -> LayerSessionSnapshot:
|
||||
return LayerSessionSnapshot(
|
||||
name=name,
|
||||
lifecycle_state=LifecycleState.SUSPENDED,
|
||||
runtime_state={},
|
||||
)
|
||||
|
||||
|
||||
def _stored_session(scope: WorkflowAgentSessionScope, *, index: int = 1) -> StoredWorkflowAgentSession:
|
||||
"""A typical stored session with prompt + execution_context + history + llm specs.
|
||||
|
||||
The LLM layer is *not* in ``composition_layer_specs`` because the cleanup
|
||||
contract excludes credential-bearing plugin layers, but it *is* present in
|
||||
the saved snapshot so the layer's filter logic gets exercised.
|
||||
"""
|
||||
return StoredWorkflowAgentSession(
|
||||
scope=scope,
|
||||
session_snapshot=CompositorSessionSnapshot(
|
||||
layers=[
|
||||
_layer_snapshot("workflow_node_job_prompt"),
|
||||
_layer_snapshot("execution_context"),
|
||||
_layer_snapshot("history"),
|
||||
_layer_snapshot("llm"),
|
||||
]
|
||||
),
|
||||
backend_run_id=f"agent-run-{index}",
|
||||
composition_layer_specs=[
|
||||
CleanupLayerSpec(name="workflow_node_job_prompt", type="plain.prompt", config={"prefix": "ok"}),
|
||||
CleanupLayerSpec(name="execution_context", type="dify.execution_context", config={"tenant_id": "t"}),
|
||||
CleanupLayerSpec(name="history", type="pydantic_ai.history"),
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
class FakeSessionStore:
|
||||
"""In-memory stand-in for ``WorkflowAgentRuntimeSessionStore``."""
|
||||
|
||||
def __init__(self, *, stored: list[StoredWorkflowAgentSession] | None = None) -> None:
|
||||
self._stored = stored if stored is not None else [_stored_session(_default_scope())]
|
||||
self.list_calls: list[str] = []
|
||||
self.cleaned: list[tuple[WorkflowAgentSessionScope, str | None]] = []
|
||||
|
||||
def list_active_sessions(self, *, workflow_run_id: str) -> list[StoredWorkflowAgentSession]:
|
||||
self.list_calls.append(workflow_run_id)
|
||||
return list(self._stored)
|
||||
|
||||
def mark_cleaned(self, *, scope: WorkflowAgentSessionScope, backend_run_id: str | None = None) -> None:
|
||||
self.cleaned.append((scope, backend_run_id))
|
||||
|
||||
|
||||
def _default_scope() -> WorkflowAgentSessionScope:
|
||||
return WorkflowAgentSessionScope(
|
||||
tenant_id="tenant-1",
|
||||
app_id="app-1",
|
||||
workflow_id="workflow-1",
|
||||
workflow_run_id="workflow-run-1",
|
||||
node_id="agent-node",
|
||||
node_execution_id="node-exec-1",
|
||||
binding_id="binding-1",
|
||||
agent_id="agent-1",
|
||||
agent_config_snapshot_id="snapshot-1",
|
||||
)
|
||||
|
||||
|
||||
class _WaitableFakeAgentBackendRunClient(FakeAgentBackendRunClient):
|
||||
"""``FakeAgentBackendRunClient`` plus the ``wait_run`` hook the layer needs."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
run_id: str = "cleanup-run-1",
|
||||
wait_status: str = "succeeded",
|
||||
wait_error: str | None = None,
|
||||
wait_raises: Exception | None = None,
|
||||
) -> None:
|
||||
super().__init__(run_id=run_id)
|
||||
self._wait_status = wait_status
|
||||
self._wait_error = wait_error
|
||||
self._wait_raises = wait_raises
|
||||
self.wait_calls: list[tuple[str, float | None]] = []
|
||||
|
||||
def wait_run(self, run_id: str, *, timeout_seconds: float | None = None) -> RunStatusResponse:
|
||||
self.wait_calls.append((run_id, timeout_seconds))
|
||||
if self._wait_raises is not None:
|
||||
raise self._wait_raises
|
||||
from datetime import datetime
|
||||
|
||||
return RunStatusResponse(
|
||||
run_id=run_id,
|
||||
status=cast(object, self._wait_status), # protocol Literal; cast keeps tests flexible
|
||||
created_at=datetime(2026, 1, 1, tzinfo=UTC),
|
||||
updated_at=datetime(2026, 1, 1, tzinfo=UTC),
|
||||
error=self._wait_error,
|
||||
)
|
||||
|
||||
# Inherit ``create_run`` from FakeAgentBackendRunClient; the missing protocol
|
||||
# methods below are stub-only because the cleanup layer never calls them.
|
||||
def cancel_run(self, run_id: str, request: CancelRunRequest | None = None): # pragma: no cover
|
||||
del run_id, request
|
||||
raise NotImplementedError
|
||||
|
||||
def stream_events(self, run_id: str, *, after: str | None = None): # pragma: no cover
|
||||
del run_id, after
|
||||
if False:
|
||||
yield cast(RunEvent, None)
|
||||
|
||||
|
||||
def _build_layer(
|
||||
*,
|
||||
session_store: FakeSessionStore,
|
||||
agent_backend_client: _WaitableFakeAgentBackendRunClient,
|
||||
http_cleanup_supported: bool = True,
|
||||
) -> WorkflowAgentSessionCleanupLayer:
|
||||
variable_pool = VariablePool.from_bootstrap(
|
||||
system_variables=build_system_variables(workflow_execution_id="workflow-run-1"),
|
||||
user_inputs={},
|
||||
conversation_variables=[],
|
||||
)
|
||||
runtime_state = GraphRuntimeState(variable_pool=variable_pool, start_at=0.0)
|
||||
layer = WorkflowAgentSessionCleanupLayer(
|
||||
session_store=cast(WorkflowAgentRuntimeSessionStore, session_store),
|
||||
request_builder=AgentBackendRunRequestBuilder(),
|
||||
agent_backend_client=agent_backend_client,
|
||||
)
|
||||
# Tests opt in to the future HTTP-cleanup branch; the production default
|
||||
# (False) is exercised by the dedicated tests below.
|
||||
layer._HTTP_CLEANUP_SUPPORTED = http_cleanup_supported # type: ignore[reportPrivateUsage]
|
||||
layer.initialize(ReadOnlyGraphRuntimeStateWrapper(runtime_state), cast(CommandChannel, object()))
|
||||
return layer
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"terminal_event",
|
||||
[
|
||||
GraphRunSucceededEvent(outputs={}),
|
||||
GraphRunPartialSucceededEvent(exceptions_count=1, outputs={}),
|
||||
GraphRunFailedEvent(error="boom"),
|
||||
GraphRunAbortedEvent(reason="user cancelled", outputs={}),
|
||||
],
|
||||
ids=["succeeded", "partial_succeeded", "failed", "aborted"],
|
||||
)
|
||||
def test_cleanup_layer_triggers_cleanup_only_run_on_each_terminal_event(terminal_event):
|
||||
session_store = FakeSessionStore()
|
||||
agent_backend_client = _WaitableFakeAgentBackendRunClient()
|
||||
layer = _build_layer(session_store=session_store, agent_backend_client=agent_backend_client)
|
||||
|
||||
layer.on_event(terminal_event)
|
||||
|
||||
assert session_store.list_calls == ["workflow-run-1"]
|
||||
assert agent_backend_client.request is not None
|
||||
# Cleanup composition replays the persisted (non-plugin) layer specs so the
|
||||
# agent backend's snapshot-vs-composition name match succeeds.
|
||||
layer_names = [layer.name for layer in agent_backend_client.request.composition.layers]
|
||||
assert layer_names == ["workflow_node_job_prompt", "execution_context", "history"]
|
||||
assert agent_backend_client.request.on_exit.default.value == "delete"
|
||||
assert agent_backend_client.request.metadata["agent_backend_lifecycle"] == "session_cleanup"
|
||||
# Snapshot is filtered to drop the plugin layer entry so names match the
|
||||
# cleanup composition.
|
||||
assert agent_backend_client.request.session_snapshot is not None
|
||||
snapshot_names = [layer.name for layer in agent_backend_client.request.session_snapshot.layers]
|
||||
assert snapshot_names == ["workflow_node_job_prompt", "execution_context", "history"]
|
||||
# The layer waited for terminal status and the run succeeded, so the row
|
||||
# is marked CLEANED with the cleanup run id.
|
||||
assert agent_backend_client.wait_calls
|
||||
assert session_store.cleaned == [(_default_scope(), "cleanup-run-1")]
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"non_terminal_event",
|
||||
[
|
||||
GraphRunStartedEvent(),
|
||||
GraphRunPausedEvent(reasons=[SchedulingPause(message="awaiting human input")], outputs={}),
|
||||
],
|
||||
ids=["started", "paused"],
|
||||
)
|
||||
def test_cleanup_layer_ignores_non_terminal_events(non_terminal_event):
|
||||
session_store = FakeSessionStore()
|
||||
agent_backend_client = _WaitableFakeAgentBackendRunClient()
|
||||
layer = _build_layer(session_store=session_store, agent_backend_client=agent_backend_client)
|
||||
|
||||
layer.on_event(non_terminal_event)
|
||||
|
||||
assert session_store.list_calls == []
|
||||
assert agent_backend_client.request is None
|
||||
assert session_store.cleaned == []
|
||||
|
||||
|
||||
def test_cleanup_layer_does_not_mark_cleaned_when_cleanup_run_fails():
|
||||
"""Trap D: cleanup-only run goes ``run_failed`` (e.g. snapshot validation
|
||||
error) — the layer must leave the row ACTIVE so it can be retried instead
|
||||
of silently leaking suspended agent-backend layers."""
|
||||
session_store = FakeSessionStore()
|
||||
agent_backend_client = _WaitableFakeAgentBackendRunClient(
|
||||
wait_status="failed",
|
||||
wait_error="snapshot mismatch",
|
||||
)
|
||||
layer = _build_layer(session_store=session_store, agent_backend_client=agent_backend_client)
|
||||
|
||||
layer.on_event(GraphRunSucceededEvent(outputs={}))
|
||||
|
||||
assert agent_backend_client.wait_calls
|
||||
assert session_store.cleaned == []
|
||||
|
||||
|
||||
def test_cleanup_layer_does_not_mark_cleaned_when_wait_raises():
|
||||
session_store = FakeSessionStore()
|
||||
agent_backend_client = _WaitableFakeAgentBackendRunClient(
|
||||
wait_raises=AgentBackendHTTPError("boom", status_code=500, detail=None),
|
||||
)
|
||||
layer = _build_layer(session_store=session_store, agent_backend_client=agent_backend_client)
|
||||
|
||||
layer.on_event(GraphRunSucceededEvent(outputs={}))
|
||||
|
||||
assert session_store.cleaned == []
|
||||
|
||||
|
||||
def test_cleanup_layer_marks_cleaned_locally_when_http_cleanup_disabled():
|
||||
"""Production default: dify-agent has no cleanup-only run mode yet, so the
|
||||
layer must retire the local row without issuing a doomed HTTP request that
|
||||
would crash inside the agent backend's runner on the missing LLM layer."""
|
||||
session_store = FakeSessionStore()
|
||||
agent_backend_client = _WaitableFakeAgentBackendRunClient()
|
||||
layer = _build_layer(
|
||||
session_store=session_store,
|
||||
agent_backend_client=agent_backend_client,
|
||||
http_cleanup_supported=False,
|
||||
)
|
||||
|
||||
layer.on_event(GraphRunSucceededEvent(outputs={}))
|
||||
|
||||
# No HTTP call goes out — the trap is avoided entirely.
|
||||
assert agent_backend_client.request is None
|
||||
assert agent_backend_client.wait_calls == []
|
||||
# Local row is still retired so a workflow loop cannot resume from stale state.
|
||||
assert session_store.cleaned == [(_default_scope(), "agent-run-1")]
|
||||
|
||||
|
||||
def test_cleanup_layer_skips_sessions_without_persisted_specs():
|
||||
"""Backwards-compatible safety net: a row written before A.1 landed has
|
||||
no composition_layer_specs, so cleanup would unavoidably hit the snapshot-
|
||||
validation trap. The layer must skip such rows instead of issuing a
|
||||
doomed request."""
|
||||
scope = _default_scope()
|
||||
legacy_session = StoredWorkflowAgentSession(
|
||||
scope=scope,
|
||||
session_snapshot=CompositorSessionSnapshot(layers=[_layer_snapshot("history")]),
|
||||
backend_run_id="legacy-run",
|
||||
composition_layer_specs=[],
|
||||
)
|
||||
session_store = FakeSessionStore(stored=[legacy_session])
|
||||
agent_backend_client = _WaitableFakeAgentBackendRunClient()
|
||||
layer = _build_layer(session_store=session_store, agent_backend_client=agent_backend_client)
|
||||
|
||||
layer.on_event(GraphRunSucceededEvent(outputs={}))
|
||||
|
||||
assert agent_backend_client.request is None
|
||||
assert session_store.cleaned == []
|
||||
|
||||
|
||||
def test_cleanup_layer_fans_out_to_every_active_session():
|
||||
scopes = [
|
||||
WorkflowAgentSessionScope(
|
||||
tenant_id="tenant-1",
|
||||
app_id="app-1",
|
||||
workflow_id="workflow-1",
|
||||
workflow_run_id="workflow-run-1",
|
||||
node_id=f"agent-node-{i}",
|
||||
node_execution_id=f"node-exec-{i}",
|
||||
binding_id=f"binding-{i}",
|
||||
agent_id=f"agent-{i}",
|
||||
agent_config_snapshot_id=f"snapshot-{i}",
|
||||
)
|
||||
for i in range(3)
|
||||
]
|
||||
session_store = FakeSessionStore(stored=[_stored_session(scope, index=i) for i, scope in enumerate(scopes, 1)])
|
||||
agent_backend_client = _WaitableFakeAgentBackendRunClient(run_id="cleanup-run-many")
|
||||
layer = _build_layer(session_store=session_store, agent_backend_client=agent_backend_client)
|
||||
|
||||
layer.on_event(GraphRunSucceededEvent(outputs={}))
|
||||
|
||||
# One cleanup row per stored ACTIVE session, all marked cleaned with the
|
||||
# backend run id returned by the agent backend client.
|
||||
assert [entry[0] for entry in session_store.cleaned] == scopes
|
||||
assert {entry[1] for entry in session_store.cleaned} == {"cleanup-run-many"}
|
||||
|
||||
|
||||
def test_cleanup_layer_warns_when_http_enabled_but_client_missing(caplog):
|
||||
"""The HTTP cleanup branch must defensively skip when no client was wired.
|
||||
|
||||
This is the deployment-misconfig path: ``_HTTP_CLEANUP_SUPPORTED`` was
|
||||
flipped to ``True`` but ``AGENT_BACKEND_BASE_URL`` is unset, so the
|
||||
factory returned ``None``. The layer must not crash and must not silently
|
||||
retire the row — the warning surfaces the misconfig.
|
||||
"""
|
||||
import logging
|
||||
|
||||
session_store = FakeSessionStore()
|
||||
layer = WorkflowAgentSessionCleanupLayer(
|
||||
session_store=cast(WorkflowAgentRuntimeSessionStore, session_store),
|
||||
request_builder=AgentBackendRunRequestBuilder(),
|
||||
agent_backend_client=None,
|
||||
)
|
||||
layer._HTTP_CLEANUP_SUPPORTED = True # type: ignore[reportPrivateUsage]
|
||||
variable_pool = VariablePool.from_bootstrap(
|
||||
system_variables=build_system_variables(workflow_execution_id="workflow-run-1"),
|
||||
user_inputs={},
|
||||
conversation_variables=[],
|
||||
)
|
||||
runtime_state = GraphRuntimeState(variable_pool=variable_pool, start_at=0.0)
|
||||
layer.initialize(ReadOnlyGraphRuntimeStateWrapper(runtime_state), cast(CommandChannel, object()))
|
||||
|
||||
with caplog.at_level(logging.WARNING):
|
||||
layer.on_event(GraphRunSucceededEvent(outputs={}))
|
||||
|
||||
assert session_store.cleaned == []
|
||||
assert any("no agent backend client is wired in" in record.message for record in caplog.records)
|
||||
|
||||
|
||||
def test_cleanup_layer_skips_workflow_terminal_when_workflow_run_id_missing(caplog):
|
||||
"""``workflow_run_id`` is the keying field; without it the fanout cannot
|
||||
target a row, so the layer logs a warning and bails."""
|
||||
import logging
|
||||
|
||||
session_store = FakeSessionStore()
|
||||
agent_backend_client = _WaitableFakeAgentBackendRunClient()
|
||||
layer = WorkflowAgentSessionCleanupLayer(
|
||||
session_store=cast(WorkflowAgentRuntimeSessionStore, session_store),
|
||||
request_builder=AgentBackendRunRequestBuilder(),
|
||||
agent_backend_client=agent_backend_client,
|
||||
)
|
||||
# Bootstrap *without* a workflow_execution_id system variable.
|
||||
variable_pool = VariablePool.from_bootstrap(
|
||||
system_variables=build_system_variables(workflow_execution_id=""),
|
||||
user_inputs={},
|
||||
conversation_variables=[],
|
||||
)
|
||||
runtime_state = GraphRuntimeState(variable_pool=variable_pool, start_at=0.0)
|
||||
layer.initialize(ReadOnlyGraphRuntimeStateWrapper(runtime_state), cast(CommandChannel, object()))
|
||||
|
||||
with caplog.at_level(logging.WARNING):
|
||||
layer.on_event(GraphRunSucceededEvent(outputs={}))
|
||||
|
||||
assert session_store.list_calls == []
|
||||
assert session_store.cleaned == []
|
||||
assert any("workflow_run_id is missing" in record.message for record in caplog.records)
|
||||
|
||||
|
||||
def test_build_workflow_agent_session_cleanup_layer_returns_layer_without_client_when_unconfigured(
|
||||
monkeypatch,
|
||||
):
|
||||
"""The production builder must pass ``None`` for the agent backend client
|
||||
when neither AGENT_BACKEND_BASE_URL nor AGENT_BACKEND_USE_FAKE is set, so
|
||||
that unit-test environments without backend config don't crash at runner
|
||||
construction."""
|
||||
from configs import dify_config
|
||||
from core.workflow.nodes.agent_v2.session_cleanup_layer import (
|
||||
build_workflow_agent_session_cleanup_layer,
|
||||
)
|
||||
|
||||
monkeypatch.setattr(dify_config, "AGENT_BACKEND_BASE_URL", None, raising=False)
|
||||
monkeypatch.setattr(dify_config, "AGENT_BACKEND_USE_FAKE", False, raising=False)
|
||||
|
||||
layer = build_workflow_agent_session_cleanup_layer()
|
||||
assert layer._agent_backend_client is None # type: ignore[reportPrivateUsage]
|
||||
|
||||
|
||||
def test_build_workflow_agent_session_cleanup_layer_returns_layer_with_fake_client(monkeypatch):
|
||||
"""With ``AGENT_BACKEND_USE_FAKE`` enabled the helper wires in the
|
||||
deterministic fake client without needing a base_url."""
|
||||
from clients.agent_backend.fake_client import FakeAgentBackendRunClient
|
||||
from configs import dify_config
|
||||
from core.workflow.nodes.agent_v2.session_cleanup_layer import (
|
||||
build_workflow_agent_session_cleanup_layer,
|
||||
)
|
||||
|
||||
monkeypatch.setattr(dify_config, "AGENT_BACKEND_BASE_URL", None, raising=False)
|
||||
monkeypatch.setattr(dify_config, "AGENT_BACKEND_USE_FAKE", True, raising=False)
|
||||
monkeypatch.setattr(dify_config, "AGENT_BACKEND_FAKE_SCENARIO", "success", raising=False)
|
||||
|
||||
layer = build_workflow_agent_session_cleanup_layer()
|
||||
assert isinstance(layer._agent_backend_client, FakeAgentBackendRunClient) # type: ignore[reportPrivateUsage]
|
||||
@ -1,286 +0,0 @@
|
||||
"""Unit tests for :mod:`core.workflow.nodes.agent_v2.session_store`.
|
||||
|
||||
Uses the in-memory SQLite engine configured by the project conftest plus a
|
||||
per-test ``CREATE TABLE`` so the real ORM round-trip exercises every store
|
||||
method. Keeps the suite self-contained — no Postgres / Docker required — while
|
||||
still hitting the actual ``session_factory`` code path that production uses.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Generator
|
||||
|
||||
import pytest
|
||||
from agenton.compositor import CompositorSessionSnapshot
|
||||
from agenton.compositor.schemas import LayerSessionSnapshot
|
||||
from agenton.layers.base import LifecycleState
|
||||
from sqlalchemy import delete
|
||||
|
||||
from clients.agent_backend.request_builder import CleanupLayerSpec
|
||||
from core.db.session_factory import session_factory
|
||||
from core.workflow.nodes.agent_v2.session_store import (
|
||||
StoredWorkflowAgentSession,
|
||||
WorkflowAgentRuntimeSessionStore,
|
||||
WorkflowAgentSessionScope,
|
||||
)
|
||||
from models.agent import WorkflowAgentRuntimeSession, WorkflowAgentRuntimeSessionStatus
|
||||
|
||||
|
||||
def _scope(workflow_run_id: str | None = "wfr-1", binding_id: str = "binding-1") -> WorkflowAgentSessionScope:
|
||||
return WorkflowAgentSessionScope(
|
||||
tenant_id="tenant-1",
|
||||
app_id="app-1",
|
||||
workflow_id="workflow-1",
|
||||
workflow_run_id=workflow_run_id,
|
||||
node_id="agent-node",
|
||||
node_execution_id="node-exec-1",
|
||||
binding_id=binding_id,
|
||||
agent_id="agent-1",
|
||||
agent_config_snapshot_id="snapshot-1",
|
||||
)
|
||||
|
||||
|
||||
def _snapshot(messages: int = 1) -> CompositorSessionSnapshot:
|
||||
return CompositorSessionSnapshot(
|
||||
layers=[
|
||||
LayerSessionSnapshot(
|
||||
name="history",
|
||||
lifecycle_state=LifecycleState.SUSPENDED,
|
||||
runtime_state={"messages": [{"role": "user", "content": f"m{i}"} for i in range(messages)]},
|
||||
)
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
def _specs() -> list[CleanupLayerSpec]:
|
||||
return [
|
||||
CleanupLayerSpec(name="workflow_node_job_prompt", type="plain.prompt", config={"prefix": "ok"}),
|
||||
CleanupLayerSpec(name="history", type="pydantic_ai.history"),
|
||||
]
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _create_table() -> Generator[None, None, None]:
|
||||
"""Create the lifecycle table on the in-memory SQLite engine, drop after."""
|
||||
engine = session_factory.get_session_maker().kw["bind"]
|
||||
WorkflowAgentRuntimeSession.__table__.create(bind=engine, checkfirst=True)
|
||||
yield
|
||||
with session_factory.create_session() as session:
|
||||
session.execute(delete(WorkflowAgentRuntimeSession))
|
||||
session.commit()
|
||||
WorkflowAgentRuntimeSession.__table__.drop(bind=engine, checkfirst=True)
|
||||
|
||||
|
||||
def test_load_active_snapshot_returns_none_when_scope_has_no_workflow_run_id():
|
||||
"""``workflow_run_id`` is the keying column; no row can match without it."""
|
||||
store = WorkflowAgentRuntimeSessionStore()
|
||||
assert store.load_active_snapshot(_scope(workflow_run_id=None)) is None
|
||||
|
||||
|
||||
def test_load_active_snapshot_returns_none_when_no_row_matches():
|
||||
store = WorkflowAgentRuntimeSessionStore()
|
||||
assert store.load_active_snapshot(_scope()) is None
|
||||
|
||||
|
||||
def test_save_active_snapshot_creates_row_and_load_round_trips():
|
||||
store = WorkflowAgentRuntimeSessionStore()
|
||||
snapshot = _snapshot(messages=2)
|
||||
store.save_active_snapshot(
|
||||
scope=_scope(), backend_run_id="run-1", snapshot=snapshot, composition_layer_specs=_specs()
|
||||
)
|
||||
|
||||
loaded = store.load_active_snapshot(_scope())
|
||||
assert loaded is not None
|
||||
assert len(loaded.layers) == 1
|
||||
assert loaded.layers[0].name == "history"
|
||||
assert loaded.layers[0].runtime_state["messages"] == snapshot.layers[0].runtime_state["messages"]
|
||||
|
||||
|
||||
def test_save_active_snapshot_skips_when_workflow_run_id_missing():
|
||||
"""Without a workflow_run_id the row cannot be keyed; save is a no-op."""
|
||||
store = WorkflowAgentRuntimeSessionStore()
|
||||
store.save_active_snapshot(
|
||||
scope=_scope(workflow_run_id=None),
|
||||
backend_run_id="run-skipped",
|
||||
snapshot=_snapshot(),
|
||||
composition_layer_specs=_specs(),
|
||||
)
|
||||
with session_factory.create_session() as session:
|
||||
assert session.query(WorkflowAgentRuntimeSession).count() == 0
|
||||
|
||||
|
||||
def test_save_active_snapshot_skips_when_snapshot_missing():
|
||||
"""A run that produced no snapshot (e.g. failed agent run) does not write."""
|
||||
store = WorkflowAgentRuntimeSessionStore()
|
||||
store.save_active_snapshot(
|
||||
scope=_scope(),
|
||||
backend_run_id="run-empty",
|
||||
snapshot=None,
|
||||
composition_layer_specs=_specs(),
|
||||
)
|
||||
with session_factory.create_session() as session:
|
||||
assert session.query(WorkflowAgentRuntimeSession).count() == 0
|
||||
|
||||
|
||||
def test_save_active_snapshot_updates_existing_row_on_re_entry():
|
||||
"""A second save under the same scope must update in place, not insert."""
|
||||
store = WorkflowAgentRuntimeSessionStore()
|
||||
store.save_active_snapshot(
|
||||
scope=_scope(),
|
||||
backend_run_id="run-1",
|
||||
snapshot=_snapshot(messages=1),
|
||||
composition_layer_specs=_specs(),
|
||||
)
|
||||
# Second call with new snapshot + backend_run_id.
|
||||
store.save_active_snapshot(
|
||||
scope=_scope(),
|
||||
backend_run_id="run-2",
|
||||
snapshot=_snapshot(messages=2),
|
||||
composition_layer_specs=_specs(),
|
||||
)
|
||||
|
||||
with session_factory.create_session() as session:
|
||||
rows = session.query(WorkflowAgentRuntimeSession).all()
|
||||
assert len(rows) == 1
|
||||
assert rows[0].backend_run_id == "run-2"
|
||||
assert rows[0].status == WorkflowAgentRuntimeSessionStatus.ACTIVE
|
||||
assert rows[0].cleaned_at is None
|
||||
|
||||
|
||||
def test_save_active_snapshot_resurrects_cleaned_row():
|
||||
"""If a prior cleanup retired the row, a re-entry flips it back to ACTIVE."""
|
||||
store = WorkflowAgentRuntimeSessionStore()
|
||||
store.save_active_snapshot(
|
||||
scope=_scope(),
|
||||
backend_run_id="run-1",
|
||||
snapshot=_snapshot(),
|
||||
composition_layer_specs=_specs(),
|
||||
)
|
||||
store.mark_cleaned(scope=_scope(), backend_run_id="cleanup-1")
|
||||
# Save again — the existing row was CLEANED; should be revived.
|
||||
store.save_active_snapshot(
|
||||
scope=_scope(),
|
||||
backend_run_id="run-2",
|
||||
snapshot=_snapshot(messages=3),
|
||||
composition_layer_specs=_specs(),
|
||||
)
|
||||
|
||||
with session_factory.create_session() as session:
|
||||
rows = session.query(WorkflowAgentRuntimeSession).all()
|
||||
assert len(rows) == 1
|
||||
assert rows[0].status == WorkflowAgentRuntimeSessionStatus.ACTIVE
|
||||
assert rows[0].cleaned_at is None
|
||||
assert rows[0].backend_run_id == "run-2"
|
||||
|
||||
|
||||
def test_list_active_sessions_returns_specs_and_snapshot():
|
||||
store = WorkflowAgentRuntimeSessionStore()
|
||||
store.save_active_snapshot(
|
||||
scope=_scope(binding_id="binding-A"),
|
||||
backend_run_id="run-A",
|
||||
snapshot=_snapshot(),
|
||||
composition_layer_specs=_specs(),
|
||||
)
|
||||
store.save_active_snapshot(
|
||||
scope=_scope(binding_id="binding-B"),
|
||||
backend_run_id="run-B",
|
||||
snapshot=_snapshot(messages=2),
|
||||
composition_layer_specs=_specs(),
|
||||
)
|
||||
|
||||
listed = store.list_active_sessions(workflow_run_id="wfr-1")
|
||||
assert {s.backend_run_id for s in listed} == {"run-A", "run-B"}
|
||||
by_run = {s.backend_run_id: s for s in listed}
|
||||
assert isinstance(by_run["run-A"], StoredWorkflowAgentSession)
|
||||
# Specs round-trip through pydantic TypeAdapter — ensure deserialize works.
|
||||
assert by_run["run-A"].composition_layer_specs[0].name == "workflow_node_job_prompt"
|
||||
assert by_run["run-A"].composition_layer_specs[1].type == "pydantic_ai.history"
|
||||
# node_execution_id default-replaces NULL with "" when the DB column is None.
|
||||
assert by_run["run-A"].scope.node_execution_id == "node-exec-1"
|
||||
|
||||
|
||||
def test_list_active_sessions_skips_cleaned_rows():
|
||||
store = WorkflowAgentRuntimeSessionStore()
|
||||
store.save_active_snapshot(
|
||||
scope=_scope(binding_id="binding-A"),
|
||||
backend_run_id="run-A",
|
||||
snapshot=_snapshot(),
|
||||
composition_layer_specs=_specs(),
|
||||
)
|
||||
store.save_active_snapshot(
|
||||
scope=_scope(binding_id="binding-B"),
|
||||
backend_run_id="run-B",
|
||||
snapshot=_snapshot(),
|
||||
composition_layer_specs=_specs(),
|
||||
)
|
||||
store.mark_cleaned(scope=_scope(binding_id="binding-A"), backend_run_id="cleanup-A")
|
||||
|
||||
listed = store.list_active_sessions(workflow_run_id="wfr-1")
|
||||
assert {s.backend_run_id for s in listed} == {"run-B"}
|
||||
|
||||
|
||||
def test_list_active_sessions_handles_legacy_rows_without_specs():
|
||||
"""Rows persisted before composition_layer_specs landed have an empty string."""
|
||||
# Insert a legacy-shape row directly: empty specs payload simulates a row
|
||||
# written before the spec persistence feature landed in A.1.
|
||||
store = WorkflowAgentRuntimeSessionStore()
|
||||
store.save_active_snapshot(
|
||||
scope=_scope(),
|
||||
backend_run_id="run-legacy",
|
||||
snapshot=_snapshot(),
|
||||
composition_layer_specs=[],
|
||||
)
|
||||
listed = store.list_active_sessions(workflow_run_id="wfr-1")
|
||||
assert len(listed) == 1
|
||||
assert listed[0].composition_layer_specs == []
|
||||
|
||||
|
||||
def test_mark_cleaned_sets_status_and_cleaned_at_with_backend_run_id():
|
||||
store = WorkflowAgentRuntimeSessionStore()
|
||||
store.save_active_snapshot(
|
||||
scope=_scope(),
|
||||
backend_run_id="run-1",
|
||||
snapshot=_snapshot(),
|
||||
composition_layer_specs=_specs(),
|
||||
)
|
||||
store.mark_cleaned(scope=_scope(), backend_run_id="cleanup-1")
|
||||
|
||||
with session_factory.create_session() as session:
|
||||
row = session.query(WorkflowAgentRuntimeSession).one()
|
||||
assert row.status == WorkflowAgentRuntimeSessionStatus.CLEANED
|
||||
assert row.cleaned_at is not None
|
||||
assert row.backend_run_id == "cleanup-1"
|
||||
|
||||
|
||||
def test_mark_cleaned_preserves_existing_backend_run_id_when_none_given():
|
||||
"""``backend_run_id=None`` means "leave the previous one in place"."""
|
||||
store = WorkflowAgentRuntimeSessionStore()
|
||||
store.save_active_snapshot(
|
||||
scope=_scope(),
|
||||
backend_run_id="run-1",
|
||||
snapshot=_snapshot(),
|
||||
composition_layer_specs=_specs(),
|
||||
)
|
||||
store.mark_cleaned(scope=_scope(), backend_run_id=None)
|
||||
|
||||
with session_factory.create_session() as session:
|
||||
row = session.query(WorkflowAgentRuntimeSession).one()
|
||||
assert row.status == WorkflowAgentRuntimeSessionStatus.CLEANED
|
||||
assert row.backend_run_id == "run-1"
|
||||
|
||||
|
||||
def test_mark_cleaned_is_a_noop_when_no_active_row():
|
||||
"""No matching ACTIVE row → no-op (already-cleaned rows are not re-touched)."""
|
||||
store = WorkflowAgentRuntimeSessionStore()
|
||||
store.mark_cleaned(scope=_scope(), backend_run_id="cleanup-1")
|
||||
with session_factory.create_session() as session:
|
||||
assert session.query(WorkflowAgentRuntimeSession).count() == 0
|
||||
|
||||
|
||||
def test_mark_cleaned_is_a_noop_when_workflow_run_id_missing():
|
||||
"""Without a workflow_run_id we cannot key the row; ignore the call."""
|
||||
store = WorkflowAgentRuntimeSessionStore()
|
||||
store.mark_cleaned(scope=_scope(workflow_run_id=None), backend_run_id="cleanup-1")
|
||||
# Sanity — no rows created or touched.
|
||||
with session_factory.create_session() as session:
|
||||
assert session.query(WorkflowAgentRuntimeSession).count() == 0
|
||||
@ -6,8 +6,8 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
|
||||
import { startMock } from '../../test/fixtures/dify-mock/server.js'
|
||||
import { loadAppInfoCache } from '../cache/app-info.js'
|
||||
import { createClient } from '../http/client.js'
|
||||
import { CACHE_APP_INFO, cachePath } from '../store/manager.js'
|
||||
import { YamlStore } from '../store/store.js'
|
||||
import { ENV_CACHE_DIR } from '../store/dir.js'
|
||||
import { CACHE_APP_INFO, getCache } from '../store/manager.js'
|
||||
import { FieldInfo, FieldParameters } from '../types/app-meta.js'
|
||||
import { AppMetaClient } from './app-meta.js'
|
||||
import { AppsClient } from './apps.js'
|
||||
@ -15,17 +15,24 @@ import { AppsClient } from './apps.js'
|
||||
describe('AppMetaClient', () => {
|
||||
let mock: DifyMock
|
||||
let dir: string
|
||||
let prevCacheDir: string | undefined
|
||||
beforeEach(async () => {
|
||||
mock = await startMock({ scenario: 'happy' })
|
||||
dir = await mkdtemp(join(tmpdir(), 'difyctl-meta-'))
|
||||
prevCacheDir = process.env[ENV_CACHE_DIR]
|
||||
process.env[ENV_CACHE_DIR] = dir
|
||||
})
|
||||
afterEach(async () => {
|
||||
if (prevCacheDir === undefined)
|
||||
delete process.env[ENV_CACHE_DIR]
|
||||
else
|
||||
process.env[ENV_CACHE_DIR] = prevCacheDir
|
||||
await mock.stop()
|
||||
await rm(dir, { recursive: true, force: true })
|
||||
})
|
||||
|
||||
it('cache miss → fetch → populate; warm hit skips network', async () => {
|
||||
const cache = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
|
||||
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
|
||||
const apps = new AppsClient(createClient({ host: mock.url, bearer: 'dfoa_test' }))
|
||||
const spy = vi.spyOn(apps, 'describe')
|
||||
const client = new AppMetaClient({ apps, host: mock.url, cache })
|
||||
@ -40,7 +47,7 @@ describe('AppMetaClient', () => {
|
||||
})
|
||||
|
||||
it('slim hit + full request triggers fresh fetch + merges', async () => {
|
||||
const cache = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
|
||||
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
|
||||
const apps = new AppsClient(createClient({ host: mock.url, bearer: 'dfoa_test' }))
|
||||
const spy = vi.spyOn(apps, 'describe')
|
||||
const client = new AppMetaClient({ apps, host: mock.url, cache })
|
||||
@ -54,7 +61,7 @@ describe('AppMetaClient', () => {
|
||||
})
|
||||
|
||||
it('expired cache entry refetches', async () => {
|
||||
const cache = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)), ttlMs: 100, now: () => new Date('2026-05-09T00:00:00Z') })
|
||||
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO), ttlMs: 100, now: () => new Date('2026-05-09T00:00:00Z') })
|
||||
const apps = new AppsClient(createClient({ host: mock.url, bearer: 'dfoa_test' }))
|
||||
const spy = vi.spyOn(apps, 'describe')
|
||||
const client = new AppMetaClient({ apps, host: mock.url, cache, now: () => new Date('2026-05-09T00:00:00Z') })
|
||||
@ -68,7 +75,7 @@ describe('AppMetaClient', () => {
|
||||
})
|
||||
|
||||
it('invalidate forces next get to fetch', async () => {
|
||||
const cache = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
|
||||
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
|
||||
const apps = new AppsClient(createClient({ host: mock.url, bearer: 'dfoa_test' }))
|
||||
const spy = vi.spyOn(apps, 'describe')
|
||||
const client = new AppMetaClient({ apps, host: mock.url, cache })
|
||||
|
||||
@ -1,101 +0,0 @@
|
||||
import { mkdtemp, rm, stat, writeFile } from 'node:fs/promises'
|
||||
import { tmpdir } from 'node:os'
|
||||
import { join } from 'node:path'
|
||||
import { afterEach, beforeEach, describe, expect, it } from 'vitest'
|
||||
import { FILE_PERM } from '../store/dir.js'
|
||||
import { FileBackend, TOKENS_FILE_NAME } from './file-backend.js'
|
||||
|
||||
describe('FileBackend', () => {
|
||||
let dir: string
|
||||
let backend: FileBackend
|
||||
|
||||
beforeEach(async () => {
|
||||
dir = await mkdtemp(join(tmpdir(), 'difyctl-tokens-'))
|
||||
backend = new FileBackend(dir)
|
||||
})
|
||||
|
||||
afterEach(async () => {
|
||||
await rm(dir, { recursive: true, force: true })
|
||||
})
|
||||
|
||||
it('returns undefined when file is missing', async () => {
|
||||
expect(await backend.get('cloud.dify.ai', 'acct-1')).toBeUndefined()
|
||||
})
|
||||
|
||||
it('returns empty list when file is missing', async () => {
|
||||
expect(await backend.list('cloud.dify.ai')).toEqual([])
|
||||
})
|
||||
|
||||
it('round-trips put/get for a single token', async () => {
|
||||
await backend.put('cloud.dify.ai', 'acct-1', 'dfoa_abc')
|
||||
expect(await backend.get('cloud.dify.ai', 'acct-1')).toBe('dfoa_abc')
|
||||
})
|
||||
|
||||
it('list returns accountIds for the given host', async () => {
|
||||
await backend.put('cloud.dify.ai', 'acct-1', 'dfoa_a')
|
||||
await backend.put('cloud.dify.ai', 'acct-2', 'dfoa_b')
|
||||
await backend.put('self.example.com', 'acct-3', 'dfoa_c')
|
||||
const ids = await backend.list('cloud.dify.ai')
|
||||
expect([...ids].sort()).toEqual(['acct-1', 'acct-2'])
|
||||
})
|
||||
|
||||
it('list returns empty array for unknown host', async () => {
|
||||
await backend.put('cloud.dify.ai', 'acct-1', 'dfoa_a')
|
||||
expect(await backend.list('other.example.com')).toEqual([])
|
||||
})
|
||||
|
||||
it('delete removes the entry', async () => {
|
||||
await backend.put('cloud.dify.ai', 'acct-1', 'dfoa_a')
|
||||
await backend.delete('cloud.dify.ai', 'acct-1')
|
||||
expect(await backend.get('cloud.dify.ai', 'acct-1')).toBeUndefined()
|
||||
})
|
||||
|
||||
it('delete is a no-op for missing entries', async () => {
|
||||
await expect(backend.delete('cloud.dify.ai', 'missing')).resolves.toBeUndefined()
|
||||
})
|
||||
|
||||
it('delete prunes empty host entries', async () => {
|
||||
await backend.put('cloud.dify.ai', 'acct-1', 'dfoa_a')
|
||||
await backend.delete('cloud.dify.ai', 'acct-1')
|
||||
expect(await backend.list('cloud.dify.ai')).toEqual([])
|
||||
})
|
||||
|
||||
it('overwrites existing token for same host+accountId', async () => {
|
||||
await backend.put('cloud.dify.ai', 'acct-1', 'dfoa_old')
|
||||
await backend.put('cloud.dify.ai', 'acct-1', 'dfoa_new')
|
||||
expect(await backend.get('cloud.dify.ai', 'acct-1')).toBe('dfoa_new')
|
||||
})
|
||||
|
||||
it('writes file with mode 0600', async () => {
|
||||
await backend.put('cloud.dify.ai', 'acct-1', 'dfoa_a')
|
||||
const info = await stat(join(dir, TOKENS_FILE_NAME))
|
||||
expect(info.mode & 0o777).toBe(FILE_PERM)
|
||||
})
|
||||
|
||||
it('rewrites existing file with mode 0600 even if previously permissive', async () => {
|
||||
const path = join(dir, TOKENS_FILE_NAME)
|
||||
await writeFile(path, 'hosts: {}\n', { mode: 0o644 })
|
||||
await backend.put('cloud.dify.ai', 'acct-1', 'dfoa_a')
|
||||
const info = await stat(path)
|
||||
expect(info.mode & 0o777).toBe(FILE_PERM)
|
||||
})
|
||||
|
||||
it('writes valid YAML readable by a fresh backend', async () => {
|
||||
await backend.put('cloud.dify.ai', 'acct-1', 'dfoa_a')
|
||||
const fresh = new FileBackend(dir)
|
||||
expect(await fresh.get('cloud.dify.ai', 'acct-1')).toBe('dfoa_a')
|
||||
})
|
||||
|
||||
it('persists multiple hosts simultaneously', async () => {
|
||||
await backend.put('cloud.dify.ai', 'acct-1', 'dfoa_a')
|
||||
await backend.put('self.example.com', 'acct-2', 'dfoa_b')
|
||||
expect(await backend.get('cloud.dify.ai', 'acct-1')).toBe('dfoa_a')
|
||||
expect(await backend.get('self.example.com', 'acct-2')).toBe('dfoa_b')
|
||||
})
|
||||
|
||||
it('treats malformed YAML as empty', async () => {
|
||||
const path = join(dir, TOKENS_FILE_NAME)
|
||||
await writeFile(path, 'not: valid: yaml: [\n', { mode: FILE_PERM })
|
||||
expect(await backend.get('cloud.dify.ai', 'acct-1')).toBeUndefined()
|
||||
})
|
||||
})
|
||||
@ -1,99 +0,0 @@
|
||||
import type { TokenStore } from './store.js'
|
||||
import { mkdir, readFile, rename, stat, unlink, writeFile } from 'node:fs/promises'
|
||||
import { join } from 'node:path'
|
||||
import yaml from 'js-yaml'
|
||||
import { DIR_PERM, FILE_PERM } from '../store/dir.js'
|
||||
|
||||
export const TOKENS_FILE_NAME = 'tokens.yml'
|
||||
|
||||
type AccountMap = Record<string, string>
|
||||
type HostMap = Record<string, AccountMap>
|
||||
type TokensFile = { hosts?: HostMap }
|
||||
|
||||
export class FileBackend implements TokenStore {
|
||||
private readonly dir: string
|
||||
private readonly path: string
|
||||
|
||||
constructor(dir: string) {
|
||||
this.dir = dir
|
||||
this.path = join(dir, TOKENS_FILE_NAME)
|
||||
}
|
||||
|
||||
async put(host: string, accountId: string, token: string): Promise<void> {
|
||||
const file = await this.read()
|
||||
const hosts = file.hosts ?? {}
|
||||
const accounts = hosts[host] ?? {}
|
||||
accounts[accountId] = token
|
||||
hosts[host] = accounts
|
||||
await this.write({ hosts })
|
||||
}
|
||||
|
||||
async get(host: string, accountId: string): Promise<string | undefined> {
|
||||
const file = await this.read()
|
||||
return file.hosts?.[host]?.[accountId]
|
||||
}
|
||||
|
||||
async delete(host: string, accountId: string): Promise<void> {
|
||||
const file = await this.read()
|
||||
const accounts = file.hosts?.[host]
|
||||
if (accounts === undefined || !(accountId in accounts))
|
||||
return
|
||||
delete accounts[accountId]
|
||||
if (Object.keys(accounts).length === 0 && file.hosts !== undefined)
|
||||
delete file.hosts[host]
|
||||
await this.write(file)
|
||||
}
|
||||
|
||||
async list(host: string): Promise<readonly string[]> {
|
||||
const file = await this.read()
|
||||
const accounts = file.hosts?.[host]
|
||||
return accounts === undefined ? [] : Object.keys(accounts)
|
||||
}
|
||||
|
||||
private async read(): Promise<TokensFile> {
|
||||
let raw: string
|
||||
try {
|
||||
raw = await readFile(this.path, 'utf8')
|
||||
}
|
||||
catch (err) {
|
||||
if ((err as NodeJS.ErrnoException).code === 'ENOENT')
|
||||
return {}
|
||||
throw err
|
||||
}
|
||||
let parsed: unknown
|
||||
try {
|
||||
parsed = yaml.load(raw)
|
||||
}
|
||||
catch {
|
||||
return {}
|
||||
}
|
||||
if (parsed === null || typeof parsed !== 'object')
|
||||
return {}
|
||||
return parsed as TokensFile
|
||||
}
|
||||
|
||||
private async write(file: TokensFile): Promise<void> {
|
||||
await mkdir(this.dir, { recursive: true, mode: DIR_PERM })
|
||||
const body = yaml.dump(file, { lineWidth: -1, noRefs: true })
|
||||
const tmp = `${this.path}.tmp.${process.pid}.${Date.now()}`
|
||||
try {
|
||||
await writeFile(tmp, body, { mode: FILE_PERM })
|
||||
await rename(tmp, this.path)
|
||||
}
|
||||
catch (err) {
|
||||
try {
|
||||
await unlink(tmp)
|
||||
}
|
||||
catch { /* tmp may not exist */ }
|
||||
throw err
|
||||
}
|
||||
try {
|
||||
const info = await stat(this.path)
|
||||
if ((info.mode & 0o777) !== FILE_PERM) {
|
||||
const { chmod } = await import('node:fs/promises')
|
||||
await chmod(this.path, FILE_PERM)
|
||||
}
|
||||
}
|
||||
catch { /* best-effort permission tighten */ }
|
||||
}
|
||||
}
|
||||
@ -1,9 +1,9 @@
|
||||
import { mkdtemp, readFile, rm, stat, writeFile } from 'node:fs/promises'
|
||||
import { mkdtemp, rm } from 'node:fs/promises'
|
||||
import { tmpdir } from 'node:os'
|
||||
import { join } from 'node:path'
|
||||
import { afterEach, beforeEach, describe, expect, it } from 'vitest'
|
||||
import { FILE_PERM } from '../store/dir.js'
|
||||
import { HOSTS_FILE_NAME, HostsBundleSchema, loadHosts, saveHosts } from './hosts.js'
|
||||
import { ENV_CONFIG_DIR } from '../store/dir.js'
|
||||
import { HostsBundleSchema, loadHosts, saveHosts } from './hosts.js'
|
||||
|
||||
describe('HostsBundleSchema', () => {
|
||||
it('parses a minimal logged-out bundle', () => {
|
||||
@ -46,86 +46,86 @@ describe('HostsBundleSchema', () => {
|
||||
})
|
||||
expect(parsed.available_workspaces).toHaveLength(2)
|
||||
})
|
||||
|
||||
it('drops unknown top-level fields on parse', () => {
|
||||
const parsed = HostsBundleSchema.parse({
|
||||
current_host: 'cloud.dify.ai',
|
||||
future_field: 42,
|
||||
token_storage: 'file',
|
||||
})
|
||||
expect(parsed.current_host).toBe('cloud.dify.ai')
|
||||
expect((parsed as Record<string, unknown>).future_field).toBeUndefined()
|
||||
})
|
||||
})
|
||||
|
||||
describe('loadHosts/saveHosts', () => {
|
||||
let dir: string
|
||||
let prevConfigDir: string | undefined
|
||||
|
||||
beforeEach(async () => {
|
||||
dir = await mkdtemp(join(tmpdir(), 'difyctl-hosts-'))
|
||||
prevConfigDir = process.env[ENV_CONFIG_DIR]
|
||||
process.env[ENV_CONFIG_DIR] = dir
|
||||
})
|
||||
|
||||
afterEach(async () => {
|
||||
if (prevConfigDir === undefined)
|
||||
delete process.env[ENV_CONFIG_DIR]
|
||||
else
|
||||
process.env[ENV_CONFIG_DIR] = prevConfigDir
|
||||
await rm(dir, { recursive: true, force: true })
|
||||
})
|
||||
|
||||
it('returns undefined when file is missing', async () => {
|
||||
expect(await loadHosts(dir)).toBeUndefined()
|
||||
it('returns undefined when nothing was saved', () => {
|
||||
expect(loadHosts()).toBeUndefined()
|
||||
})
|
||||
|
||||
it('round-trips bundle through YAML', async () => {
|
||||
await saveHosts(dir, {
|
||||
it('round-trips a fully-populated bundle', () => {
|
||||
saveHosts({
|
||||
current_host: 'cloud.dify.ai',
|
||||
scheme: 'https',
|
||||
account: { id: 'acct-1', email: 'a@b.c', name: 'A' },
|
||||
workspace: { id: 'ws-1', name: 'My Space', role: 'owner' },
|
||||
available_workspaces: [
|
||||
{ id: 'ws-1', name: 'My Space', role: 'owner' },
|
||||
{ id: 'ws-2', name: 'Other', role: 'normal' },
|
||||
],
|
||||
token_storage: 'keychain',
|
||||
token_id: 'tok_xyz',
|
||||
})
|
||||
const loaded = await loadHosts(dir)
|
||||
const loaded = loadHosts()
|
||||
expect(loaded?.current_host).toBe('cloud.dify.ai')
|
||||
expect(loaded?.scheme).toBe('https')
|
||||
expect(loaded?.account?.email).toBe('a@b.c')
|
||||
expect(loaded?.workspace?.id).toBe('ws-1')
|
||||
expect(loaded?.available_workspaces).toHaveLength(2)
|
||||
expect(loaded?.token_storage).toBe('keychain')
|
||||
expect(loaded?.token_id).toBe('tok_xyz')
|
||||
})
|
||||
|
||||
it('round-trips a file-mode bundle with bearer token', () => {
|
||||
saveHosts({
|
||||
current_host: 'self.example.com',
|
||||
token_storage: 'file',
|
||||
tokens: { bearer: 'dfoa_test' },
|
||||
})
|
||||
const loaded = loadHosts()
|
||||
expect(loaded?.tokens?.bearer).toBe('dfoa_test')
|
||||
expect(loaded?.token_storage).toBe('file')
|
||||
})
|
||||
|
||||
it('overwrites previous bundle on save', () => {
|
||||
saveHosts({ current_host: 'old.example.com', token_storage: 'file' })
|
||||
saveHosts({ current_host: 'new.example.com', token_storage: 'keychain' })
|
||||
const loaded = loadHosts()
|
||||
expect(loaded?.current_host).toBe('new.example.com')
|
||||
expect(loaded?.token_storage).toBe('keychain')
|
||||
})
|
||||
|
||||
it('writes file with mode 0600', async () => {
|
||||
await saveHosts(dir, { current_host: 'cloud.dify.ai', token_storage: 'file' })
|
||||
const info = await stat(join(dir, HOSTS_FILE_NAME))
|
||||
expect(info.mode & 0o777).toBe(FILE_PERM)
|
||||
})
|
||||
|
||||
it('rewrites permissive existing file with mode 0600', async () => {
|
||||
const path = join(dir, HOSTS_FILE_NAME)
|
||||
await writeFile(path, 'current_host: ""\ntoken_storage: file\n', { mode: 0o644 })
|
||||
await saveHosts(dir, { current_host: 'cloud.dify.ai', token_storage: 'file' })
|
||||
const info = await stat(path)
|
||||
expect(info.mode & 0o777).toBe(FILE_PERM)
|
||||
})
|
||||
|
||||
it('atomic write: temp file does not survive on success', async () => {
|
||||
await saveHosts(dir, { current_host: 'cloud.dify.ai', token_storage: 'file' })
|
||||
const { readdir } = await import('node:fs/promises')
|
||||
const entries = await readdir(dir)
|
||||
expect(entries.filter(n => n.includes('.tmp.'))).toHaveLength(0)
|
||||
})
|
||||
|
||||
it('drops unknown top-level fields', async () => {
|
||||
const path = join(dir, HOSTS_FILE_NAME)
|
||||
await writeFile(path, 'current_host: cloud.dify.ai\nfuture_field: 42\ntoken_storage: file\n', { mode: FILE_PERM })
|
||||
const loaded = await loadHosts(dir)
|
||||
expect(loaded?.current_host).toBe('cloud.dify.ai')
|
||||
expect((loaded as Record<string, unknown> | undefined)?.future_field).toBeUndefined()
|
||||
})
|
||||
|
||||
it('throws on malformed YAML', async () => {
|
||||
const path = join(dir, HOSTS_FILE_NAME)
|
||||
await writeFile(path, ': : :\n', { mode: FILE_PERM })
|
||||
await expect(loadHosts(dir)).rejects.toThrow()
|
||||
})
|
||||
|
||||
it('throws when YAML contradicts schema', async () => {
|
||||
const path = join(dir, HOSTS_FILE_NAME)
|
||||
await writeFile(path, 'token_storage: cloud\n', { mode: FILE_PERM })
|
||||
await expect(loadHosts(dir)).rejects.toThrow()
|
||||
})
|
||||
|
||||
it('produces YAML with stable keys', async () => {
|
||||
await saveHosts(dir, {
|
||||
it('rejects invalid input at save time', () => {
|
||||
expect(() => saveHosts({
|
||||
current_host: 'cloud.dify.ai',
|
||||
token_storage: 'file',
|
||||
tokens: { bearer: 'dfoa_x' },
|
||||
})
|
||||
const raw = await readFile(join(dir, HOSTS_FILE_NAME), 'utf8')
|
||||
expect(raw).toContain('current_host: cloud.dify.ai')
|
||||
expect(raw).toContain('bearer: dfoa_x')
|
||||
token_storage: 'cloud',
|
||||
} as never)).toThrow()
|
||||
})
|
||||
})
|
||||
|
||||
@ -1,10 +1,5 @@
|
||||
import { mkdir, readFile, rename, unlink, writeFile } from 'node:fs/promises'
|
||||
import { join } from 'node:path'
|
||||
import yaml from 'js-yaml'
|
||||
import { z } from 'zod'
|
||||
import { DIR_PERM, FILE_PERM } from '../store/dir.js'
|
||||
|
||||
export const HOSTS_FILE_NAME = 'hosts.yml'
|
||||
import { getHostStore } from '../store/manager.js'
|
||||
|
||||
const StorageModeSchema = z.enum(['keychain', 'file'])
|
||||
export type StorageMode = z.infer<typeof StorageModeSchema>
|
||||
@ -48,53 +43,14 @@ export const HostsBundleSchema = z.object({
|
||||
})
|
||||
export type HostsBundle = z.infer<typeof HostsBundleSchema>
|
||||
|
||||
export async function loadHosts(dir: string): Promise<HostsBundle | undefined> {
|
||||
const path = join(dir, HOSTS_FILE_NAME)
|
||||
let raw: string
|
||||
try {
|
||||
raw = await readFile(path, 'utf8')
|
||||
}
|
||||
catch (err) {
|
||||
if ((err as NodeJS.ErrnoException).code === 'ENOENT')
|
||||
return undefined
|
||||
throw err
|
||||
}
|
||||
const parsed = yaml.load(raw)
|
||||
return HostsBundleSchema.parse(parsed ?? {})
|
||||
export function loadHosts(): HostsBundle | undefined {
|
||||
const raw = getHostStore().getTyped<Record<string, unknown>>()
|
||||
if (raw === null)
|
||||
return undefined
|
||||
return HostsBundleSchema.parse(raw)
|
||||
}
|
||||
|
||||
export async function saveHosts(dir: string, bundle: HostsBundle): Promise<void> {
|
||||
await mkdir(dir, { recursive: true, mode: DIR_PERM })
|
||||
export function saveHosts(bundle: HostsBundle): void {
|
||||
const validated = HostsBundleSchema.parse(bundle)
|
||||
const body = yaml.dump(stripUndefined(validated), { lineWidth: -1, noRefs: true, sortKeys: false })
|
||||
const target = join(dir, HOSTS_FILE_NAME)
|
||||
const tmp = `${target}.tmp.${process.pid}.${Date.now()}`
|
||||
try {
|
||||
await writeFile(tmp, body, { mode: FILE_PERM })
|
||||
await rename(tmp, target)
|
||||
}
|
||||
catch (err) {
|
||||
try {
|
||||
await unlink(tmp)
|
||||
}
|
||||
catch { /* tmp may not exist */ }
|
||||
throw err
|
||||
}
|
||||
const { chmod, stat } = await import('node:fs/promises')
|
||||
try {
|
||||
const info = await stat(target)
|
||||
if ((info.mode & 0o777) !== FILE_PERM)
|
||||
await chmod(target, FILE_PERM)
|
||||
}
|
||||
catch { /* best-effort */ }
|
||||
}
|
||||
|
||||
function stripUndefined<T extends Record<string, unknown>>(input: T): Record<string, unknown> {
|
||||
const out: Record<string, unknown> = {}
|
||||
for (const [k, v] of Object.entries(input)) {
|
||||
if (v === undefined)
|
||||
continue
|
||||
out[k] = v
|
||||
}
|
||||
return out
|
||||
getHostStore().setTyped(validated)
|
||||
}
|
||||
|
||||
@ -1,111 +0,0 @@
|
||||
import { beforeEach, describe, expect, it, vi } from 'vitest'
|
||||
|
||||
const passwords = new Map<string, string>()
|
||||
const setPassword = vi.fn()
|
||||
const getPassword = vi.fn()
|
||||
const deletePassword = vi.fn()
|
||||
|
||||
class FakeAsyncEntry {
|
||||
private readonly key: string
|
||||
constructor(service: string, username: string) {
|
||||
this.key = `${service}::${username}`
|
||||
}
|
||||
|
||||
async setPassword(value: string): Promise<void> {
|
||||
setPassword(this.key, value)
|
||||
passwords.set(this.key, value)
|
||||
}
|
||||
|
||||
async getPassword(): Promise<string | undefined> {
|
||||
getPassword(this.key)
|
||||
return passwords.get(this.key)
|
||||
}
|
||||
|
||||
async deletePassword(): Promise<boolean> {
|
||||
deletePassword(this.key)
|
||||
if (!passwords.has(this.key))
|
||||
return false
|
||||
passwords.delete(this.key)
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
vi.mock('@napi-rs/keyring', () => ({
|
||||
AsyncEntry: FakeAsyncEntry,
|
||||
}))
|
||||
|
||||
const { KEYRING_SERVICE, KeyringBackend } = await import('./keyring-backend.js')
|
||||
|
||||
beforeEach(() => {
|
||||
passwords.clear()
|
||||
setPassword.mockClear()
|
||||
getPassword.mockClear()
|
||||
deletePassword.mockClear()
|
||||
})
|
||||
|
||||
describe('KeyringBackend', () => {
|
||||
it('uses service name "difyctl"', () => {
|
||||
expect(KEYRING_SERVICE).toBe('difyctl')
|
||||
})
|
||||
|
||||
it('returns undefined when no password is stored', async () => {
|
||||
const k = new KeyringBackend()
|
||||
expect(await k.get('cloud.dify.ai', 'acct-1')).toBeUndefined()
|
||||
})
|
||||
|
||||
it('round-trips put/get', async () => {
|
||||
const k = new KeyringBackend()
|
||||
await k.put('cloud.dify.ai', 'acct-1', 'dfoa_x')
|
||||
expect(await k.get('cloud.dify.ai', 'acct-1')).toBe('dfoa_x')
|
||||
})
|
||||
|
||||
it('keys by host::accountId', async () => {
|
||||
const k = new KeyringBackend()
|
||||
await k.put('cloud.dify.ai', 'acct-1', 'A')
|
||||
await k.put('cloud.dify.ai', 'acct-2', 'B')
|
||||
expect(await k.get('cloud.dify.ai', 'acct-1')).toBe('A')
|
||||
expect(await k.get('cloud.dify.ai', 'acct-2')).toBe('B')
|
||||
})
|
||||
|
||||
it('delete removes the entry', async () => {
|
||||
const k = new KeyringBackend()
|
||||
await k.put('cloud.dify.ai', 'acct-1', 'A')
|
||||
await k.delete('cloud.dify.ai', 'acct-1')
|
||||
expect(await k.get('cloud.dify.ai', 'acct-1')).toBeUndefined()
|
||||
})
|
||||
|
||||
it('delete is a no-op for missing entries', async () => {
|
||||
const k = new KeyringBackend()
|
||||
await expect(k.delete('cloud.dify.ai', 'gone')).resolves.toBeUndefined()
|
||||
})
|
||||
|
||||
it('list returns empty array (keyring does not enumerate)', async () => {
|
||||
const k = new KeyringBackend()
|
||||
await k.put('cloud.dify.ai', 'acct-1', 'A')
|
||||
expect(await k.list('cloud.dify.ai')).toEqual([])
|
||||
})
|
||||
|
||||
it('swallows getPassword exceptions and returns undefined', async () => {
|
||||
const k = new KeyringBackend()
|
||||
getPassword.mockImplementationOnce(() => {
|
||||
throw new Error('NoEntry')
|
||||
})
|
||||
expect(await k.get('cloud.dify.ai', 'acct-1')).toBeUndefined()
|
||||
})
|
||||
|
||||
it('swallows delete exceptions', async () => {
|
||||
const k = new KeyringBackend()
|
||||
deletePassword.mockImplementationOnce(() => {
|
||||
throw new Error('NoEntry')
|
||||
})
|
||||
await expect(k.delete('cloud.dify.ai', 'acct-1')).resolves.toBeUndefined()
|
||||
})
|
||||
|
||||
it('lets put propagate exceptions (caller decides fallback)', async () => {
|
||||
const k = new KeyringBackend()
|
||||
setPassword.mockImplementationOnce(() => {
|
||||
throw new Error('keyring locked')
|
||||
})
|
||||
await expect(k.put('cloud.dify.ai', 'acct-1', 'tok')).rejects.toThrow(/keyring locked/)
|
||||
})
|
||||
})
|
||||
@ -1,35 +0,0 @@
|
||||
import type { TokenStore } from './store.js'
|
||||
import { AsyncEntry } from '@napi-rs/keyring'
|
||||
|
||||
export const KEYRING_SERVICE = 'difyctl'
|
||||
|
||||
function username(host: string, accountId: string): string {
|
||||
return `${host}::${accountId}`
|
||||
}
|
||||
|
||||
export class KeyringBackend implements TokenStore {
|
||||
async put(host: string, accountId: string, token: string): Promise<void> {
|
||||
await new AsyncEntry(KEYRING_SERVICE, username(host, accountId)).setPassword(token)
|
||||
}
|
||||
|
||||
async get(host: string, accountId: string): Promise<string | undefined> {
|
||||
try {
|
||||
const v = await new AsyncEntry(KEYRING_SERVICE, username(host, accountId)).getPassword()
|
||||
return v ?? undefined
|
||||
}
|
||||
catch {
|
||||
return undefined
|
||||
}
|
||||
}
|
||||
|
||||
async delete(host: string, accountId: string): Promise<void> {
|
||||
try {
|
||||
await new AsyncEntry(KEYRING_SERVICE, username(host, accountId)).deletePassword()
|
||||
}
|
||||
catch { /* missing entry is fine */ }
|
||||
}
|
||||
|
||||
async list(_host: string): Promise<readonly string[]> {
|
||||
return []
|
||||
}
|
||||
}
|
||||
@ -1,75 +0,0 @@
|
||||
import type { TokenStore } from './store.js'
|
||||
import { describe, expect, it, vi } from 'vitest'
|
||||
import { selectStore } from './store.js'
|
||||
|
||||
function memBackend(label: string): TokenStore & { _label: string } {
|
||||
const map = new Map<string, string>()
|
||||
const k = (h: string, a: string) => `${h}::${a}`
|
||||
return {
|
||||
_label: label,
|
||||
async put(h, a, t) { map.set(k(h, a), t) },
|
||||
async get(h, a) { return map.get(k(h, a)) },
|
||||
async delete(h, a) { map.delete(k(h, a)) },
|
||||
async list() { return [] },
|
||||
}
|
||||
}
|
||||
|
||||
describe('selectStore', () => {
|
||||
it('returns keychain when probe succeeds', async () => {
|
||||
const k = memBackend('keyring')
|
||||
const f = memBackend('file')
|
||||
const result = await selectStore({
|
||||
configDir: '/tmp/x',
|
||||
factory: { keyring: () => k, file: () => f },
|
||||
})
|
||||
expect(result.mode).toBe('keychain')
|
||||
expect(result.store).toBe(k)
|
||||
})
|
||||
|
||||
it('falls back to file when keyring put throws', async () => {
|
||||
const k = memBackend('keyring')
|
||||
const f = memBackend('file')
|
||||
k.put = vi.fn().mockRejectedValue(new Error('locked'))
|
||||
const result = await selectStore({
|
||||
configDir: '/tmp/x',
|
||||
factory: { keyring: () => k, file: () => f },
|
||||
})
|
||||
expect(result.mode).toBe('file')
|
||||
expect(result.store).toBe(f)
|
||||
})
|
||||
|
||||
it('falls back to file when probe round-trip mismatches', async () => {
|
||||
const k = memBackend('keyring')
|
||||
const f = memBackend('file')
|
||||
k.get = vi.fn().mockResolvedValue('something-else')
|
||||
const result = await selectStore({
|
||||
configDir: '/tmp/x',
|
||||
factory: { keyring: () => k, file: () => f },
|
||||
})
|
||||
expect(result.mode).toBe('file')
|
||||
expect(result.store).toBe(f)
|
||||
})
|
||||
|
||||
it('falls back to file when keyring constructor throws', async () => {
|
||||
const f = memBackend('file')
|
||||
const result = await selectStore({
|
||||
configDir: '/tmp/x',
|
||||
factory: {
|
||||
keyring: () => { throw new Error('no backend') },
|
||||
file: () => f,
|
||||
},
|
||||
})
|
||||
expect(result.mode).toBe('file')
|
||||
expect(result.store).toBe(f)
|
||||
})
|
||||
|
||||
it('cleans up probe entry after successful probe', async () => {
|
||||
const k = memBackend('keyring')
|
||||
const f = memBackend('file')
|
||||
await selectStore({
|
||||
configDir: '/tmp/x',
|
||||
factory: { keyring: () => k, file: () => f },
|
||||
})
|
||||
expect(await k.get('__difyctl_probe__', '__probe__')).toBeUndefined()
|
||||
})
|
||||
})
|
||||
@ -1,40 +0,0 @@
|
||||
import { FileBackend } from './file-backend.js'
|
||||
import { KeyringBackend } from './keyring-backend.js'
|
||||
|
||||
export type TokenStore = {
|
||||
put: (host: string, accountId: string, token: string) => Promise<void>
|
||||
get: (host: string, accountId: string) => Promise<string | undefined>
|
||||
delete: (host: string, accountId: string) => Promise<void>
|
||||
list: (host: string) => Promise<readonly string[]>
|
||||
}
|
||||
|
||||
export type StorageMode = 'keychain' | 'file'
|
||||
|
||||
export type SelectStoreOptions = {
|
||||
readonly configDir: string
|
||||
readonly factory?: {
|
||||
readonly keyring?: () => TokenStore
|
||||
readonly file?: (dir: string) => TokenStore
|
||||
}
|
||||
}
|
||||
|
||||
const PROBE_HOST = '__difyctl_probe__'
|
||||
const PROBE_ACCOUNT = '__probe__'
|
||||
const PROBE_VALUE = 'probe-v1'
|
||||
|
||||
export async function selectStore(opts: SelectStoreOptions): Promise<{ store: TokenStore, mode: StorageMode }> {
|
||||
const fileFactory = opts.factory?.file ?? ((dir: string) => new FileBackend(dir))
|
||||
const keyringFactory = opts.factory?.keyring ?? (() => new KeyringBackend())
|
||||
try {
|
||||
const k = keyringFactory()
|
||||
await k.put(PROBE_HOST, PROBE_ACCOUNT, PROBE_VALUE)
|
||||
const got = await k.get(PROBE_HOST, PROBE_ACCOUNT)
|
||||
await k.delete(PROBE_HOST, PROBE_ACCOUNT)
|
||||
if (got !== PROBE_VALUE)
|
||||
throw new Error('keyring round-trip mismatch')
|
||||
return { store: k, mode: 'keychain' }
|
||||
}
|
||||
catch {
|
||||
return { store: fileFactory(opts.configDir), mode: 'file' }
|
||||
}
|
||||
}
|
||||
31
cli/src/cache/app-info.test.ts
vendored
31
cli/src/cache/app-info.test.ts
vendored
@ -4,8 +4,8 @@ import { tmpdir } from 'node:os'
|
||||
import { join } from 'node:path'
|
||||
import yaml from 'js-yaml'
|
||||
import { afterEach, beforeEach, describe, expect, it } from 'vitest'
|
||||
import { CACHE_APP_INFO, cachePath } from '../store/manager.js'
|
||||
import { YamlStore } from '../store/store.js'
|
||||
import { ENV_CACHE_DIR } from '../store/dir.js'
|
||||
import { CACHE_APP_INFO, cachePath, getCache } from '../store/manager.js'
|
||||
import { platform } from '../sys/index.js'
|
||||
import { FieldInfo, FieldParameters } from '../types/app-meta.js'
|
||||
import { APP_INFO_TTL_MS, loadAppInfoCache } from './app-info.js'
|
||||
@ -35,18 +35,25 @@ function metaInfoOnly(): AppMeta {
|
||||
|
||||
describe('app-info disk cache', () => {
|
||||
let dir: string
|
||||
let prevCacheDir: string | undefined
|
||||
beforeEach(async () => {
|
||||
dir = await mkdtemp(join(tmpdir(), 'difyctl-cache-'))
|
||||
prevCacheDir = process.env[ENV_CACHE_DIR]
|
||||
process.env[ENV_CACHE_DIR] = dir
|
||||
})
|
||||
afterEach(async () => {
|
||||
if (prevCacheDir === undefined)
|
||||
delete process.env[ENV_CACHE_DIR]
|
||||
else
|
||||
process.env[ENV_CACHE_DIR] = prevCacheDir
|
||||
await rm(dir, { recursive: true, force: true })
|
||||
})
|
||||
|
||||
it('round-trips an entry across reloads', async () => {
|
||||
const c1 = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
|
||||
const c1 = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
|
||||
await c1.set('http://localhost:9999', 'app-1', metaInfoOnly())
|
||||
|
||||
const c2 = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
|
||||
const c2 = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
|
||||
const got = c2.get('http://localhost:9999', 'app-1')
|
||||
expect(got).toBeDefined()
|
||||
expect(got?.meta.info?.id).toBe('app-1')
|
||||
@ -55,7 +62,7 @@ describe('app-info disk cache', () => {
|
||||
|
||||
it('isFresh respects TTL', async () => {
|
||||
const now = new Date('2026-05-09T00:00:00Z')
|
||||
const c = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)), now: () => now })
|
||||
const c = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO), now: () => now })
|
||||
await c.set('h', 'app-1', metaInfoOnly())
|
||||
const r = c.get('h', 'app-1')
|
||||
expect(r).toBeDefined()
|
||||
@ -66,23 +73,23 @@ describe('app-info disk cache', () => {
|
||||
})
|
||||
|
||||
it('keys by (host, app_id) — different hosts isolate', async () => {
|
||||
const c = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
|
||||
const c = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
|
||||
await c.set('h1', 'app-1', metaInfoOnly())
|
||||
expect(c.get('h2', 'app-1')).toBeUndefined()
|
||||
expect(c.get('h1', 'app-1')).toBeDefined()
|
||||
})
|
||||
|
||||
it('delete removes entry from disk', async () => {
|
||||
const c1 = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
|
||||
const c1 = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
|
||||
await c1.set('h', 'app-1', metaInfoOnly())
|
||||
await c1.delete('h', 'app-1')
|
||||
|
||||
const c2 = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
|
||||
const c2 = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
|
||||
expect(c2.get('h', 'app-1')).toBeUndefined()
|
||||
})
|
||||
|
||||
it('writes file with 0600 permission', async () => {
|
||||
const c = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
|
||||
const c = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
|
||||
await c.set('h', 'app-1', metaInfoOnly())
|
||||
const { stat } = await import('node:fs/promises')
|
||||
const s = await stat(appInfoPath(dir))
|
||||
@ -91,19 +98,19 @@ describe('app-info disk cache', () => {
|
||||
})
|
||||
|
||||
it('missing cache file is not an error', async () => {
|
||||
const c = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
|
||||
const c = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
|
||||
expect(c.get('h', 'app-1')).toBeUndefined()
|
||||
})
|
||||
|
||||
it('corrupt cache file is treated as empty', async () => {
|
||||
const { writeFile } = await import('node:fs/promises')
|
||||
await writeFile(appInfoPath(dir), ': : not valid yaml', 'utf8')
|
||||
const c = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
|
||||
const c = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
|
||||
expect(c.get('h', 'app-1')).toBeUndefined()
|
||||
})
|
||||
|
||||
it('updates same key in place (no growth)', async () => {
|
||||
const c = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
|
||||
const c = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
|
||||
await c.set('h', 'app-1', metaInfoOnly())
|
||||
const slim: AppMeta = {
|
||||
...metaInfoOnly(),
|
||||
|
||||
31
cli/src/cache/nudge-store.test.ts
vendored
31
cli/src/cache/nudge-store.test.ts
vendored
@ -3,8 +3,8 @@ import { tmpdir } from 'node:os'
|
||||
import { dirname, join } from 'node:path'
|
||||
import yaml from 'js-yaml'
|
||||
import { afterEach, beforeEach, describe, expect, it } from 'vitest'
|
||||
import { CACHE_NUDGE, cachePath } from '../store/manager.js'
|
||||
import { YamlStore } from '../store/store.js'
|
||||
import { ENV_CACHE_DIR } from '../store/dir.js'
|
||||
import { CACHE_NUDGE, cachePath, getCache } from '../store/manager.js'
|
||||
import { loadNudgeStore, WARN_INTERVAL_MS } from './nudge-store.js'
|
||||
|
||||
function nudgeStorePath(dir: string): string {
|
||||
@ -15,21 +15,28 @@ const HOST = 'https://cloud.dify.ai'
|
||||
|
||||
describe('NudgeStore', () => {
|
||||
let dir: string
|
||||
let prevCacheDir: string | undefined
|
||||
beforeEach(async () => {
|
||||
dir = await mkdtemp(join(tmpdir(), 'difyctl-nudge-'))
|
||||
prevCacheDir = process.env[ENV_CACHE_DIR]
|
||||
process.env[ENV_CACHE_DIR] = dir
|
||||
})
|
||||
afterEach(async () => {
|
||||
if (prevCacheDir === undefined)
|
||||
delete process.env[ENV_CACHE_DIR]
|
||||
else
|
||||
process.env[ENV_CACHE_DIR] = prevCacheDir
|
||||
await rm(dir, { recursive: true, force: true })
|
||||
})
|
||||
|
||||
it('canWarn=true when no prior record exists', async () => {
|
||||
const store = await loadNudgeStore({ store: new YamlStore(cachePath(dir, CACHE_NUDGE)) })
|
||||
const store = await loadNudgeStore({ store: getCache(CACHE_NUDGE) })
|
||||
expect(store.canWarn(HOST)).toBe(true)
|
||||
})
|
||||
|
||||
it('canWarn=false within the silence window, true past it', async () => {
|
||||
const t0 = new Date('2026-05-19T12:00:00.000Z')
|
||||
const store = await loadNudgeStore({ store: new YamlStore(cachePath(dir, CACHE_NUDGE)), now: () => t0 })
|
||||
const store = await loadNudgeStore({ store: getCache(CACHE_NUDGE), now: () => t0 })
|
||||
await store.markWarned(HOST)
|
||||
expect(store.canWarn(HOST, new Date('2026-05-19T18:00:00.000Z'))).toBe(false)
|
||||
expect(store.canWarn(HOST, new Date('2026-05-20T12:00:00.000Z'))).toBe(true)
|
||||
@ -37,7 +44,7 @@ describe('NudgeStore', () => {
|
||||
|
||||
it('canWarn clamps negative elapsed under clock skew (treats as still in window)', async () => {
|
||||
const t0 = new Date('2026-05-19T12:00:00.000Z')
|
||||
const store = await loadNudgeStore({ store: new YamlStore(cachePath(dir, CACHE_NUDGE)), now: () => t0 })
|
||||
const store = await loadNudgeStore({ store: getCache(CACHE_NUDGE), now: () => t0 })
|
||||
await store.markWarned(HOST)
|
||||
const pastClock = new Date('2026-05-19T11:00:00.000Z') // clock moved backwards 1h
|
||||
expect(store.canWarn(HOST, pastClock)).toBe(false)
|
||||
@ -45,22 +52,22 @@ describe('NudgeStore', () => {
|
||||
|
||||
it('markWarned persists across store reloads', async () => {
|
||||
const t0 = new Date('2026-05-19T12:00:00.000Z')
|
||||
const s1 = await loadNudgeStore({ store: new YamlStore(cachePath(dir, CACHE_NUDGE)), now: () => t0 })
|
||||
const s1 = await loadNudgeStore({ store: getCache(CACHE_NUDGE), now: () => t0 })
|
||||
await s1.markWarned(HOST)
|
||||
const s2 = await loadNudgeStore({ store: new YamlStore(cachePath(dir, CACHE_NUDGE)), now: () => t0 })
|
||||
const s2 = await loadNudgeStore({ store: getCache(CACHE_NUDGE), now: () => t0 })
|
||||
expect(s2.canWarn(HOST)).toBe(false)
|
||||
})
|
||||
|
||||
it('treats a corrupt cache file as empty', async () => {
|
||||
const path = nudgeStorePath(dir)
|
||||
await writeCacheFile(path, '{ not valid json')
|
||||
const store = await loadNudgeStore({ store: new YamlStore(cachePath(dir, CACHE_NUDGE)) })
|
||||
const store = await loadNudgeStore({ store: getCache(CACHE_NUDGE) })
|
||||
expect(store.canWarn(HOST)).toBe(true)
|
||||
})
|
||||
|
||||
it('writes ISO timestamps under warned/<host> on disk', async () => {
|
||||
const t = new Date('2026-05-19T12:00:00.000Z')
|
||||
const store = await loadNudgeStore({ store: new YamlStore(cachePath(dir, CACHE_NUDGE)), now: () => t })
|
||||
const store = await loadNudgeStore({ store: getCache(CACHE_NUDGE), now: () => t })
|
||||
await store.markWarned(HOST)
|
||||
const raw = await readFile(nudgeStorePath(dir), 'utf8')
|
||||
const parsed = yaml.load(raw) as Record<string, unknown>
|
||||
@ -72,11 +79,11 @@ describe('NudgeStore', () => {
|
||||
// warns about a different host. Without merge-on-write the second writer
|
||||
// would clobber the first.
|
||||
const t = new Date('2026-05-19T12:00:00.000Z')
|
||||
const a = await loadNudgeStore({ store: new YamlStore(cachePath(dir, CACHE_NUDGE)), now: () => t })
|
||||
const b = await loadNudgeStore({ store: new YamlStore(cachePath(dir, CACHE_NUDGE)), now: () => t })
|
||||
const a = await loadNudgeStore({ store: getCache(CACHE_NUDGE), now: () => t })
|
||||
const b = await loadNudgeStore({ store: getCache(CACHE_NUDGE), now: () => t })
|
||||
await a.markWarned('https://a.example')
|
||||
await b.markWarned('https://b.example')
|
||||
const reread = await loadNudgeStore({ store: new YamlStore(cachePath(dir, CACHE_NUDGE)), now: () => t })
|
||||
const reread = await loadNudgeStore({ store: getCache(CACHE_NUDGE), now: () => t })
|
||||
expect(reread.canWarn('https://a.example')).toBe(false)
|
||||
expect(reread.canWarn('https://b.example')).toBe(false)
|
||||
})
|
||||
|
||||
@ -12,7 +12,6 @@ import { BaseError } from '../../errors/base.js'
|
||||
import { ErrorCode } from '../../errors/codes.js'
|
||||
import { formatErrorForCli } from '../../errors/format.js'
|
||||
import { createClient } from '../../http/client.js'
|
||||
import { resolveConfigDir } from '../../store/dir.js'
|
||||
import { realStreams } from '../../sys/io/streams'
|
||||
import { hostWithScheme } from '../../util/host.js'
|
||||
import { versionInfo } from '../../version/info.js'
|
||||
@ -24,7 +23,6 @@ export type AuthedContext = {
|
||||
readonly http: KyInstance
|
||||
readonly host: string
|
||||
readonly io: IOStreams
|
||||
readonly configDir: string
|
||||
readonly cache?: AppInfoCache
|
||||
}
|
||||
|
||||
@ -38,9 +36,8 @@ export async function buildAuthedContext(
|
||||
cmd: Pick<Command, 'error'>,
|
||||
opts: AuthedContextOptions,
|
||||
): Promise<AuthedContext> {
|
||||
const configDir = resolveConfigDir()
|
||||
const io = realStreams(opts.format ?? '')
|
||||
const bundle = await loadHosts(configDir)
|
||||
const bundle = loadHosts()
|
||||
if (bundle === undefined || bundle.tokens?.bearer === undefined || bundle.tokens.bearer === '') {
|
||||
const err = new BaseError({
|
||||
code: ErrorCode.NotLoggedIn,
|
||||
@ -61,7 +58,7 @@ export async function buildAuthedContext(
|
||||
|
||||
await runCompatNudge({ host, io })
|
||||
|
||||
return { bundle, http, host, io, configDir, cache }
|
||||
return { bundle, http, host, io, cache }
|
||||
}
|
||||
|
||||
// Best-effort nudge: never throws, never blocks. Lives here so every authed
|
||||
|
||||
@ -2,7 +2,7 @@ import type { SessionListResponse, SessionRow } from '@dify/contracts/api/openap
|
||||
import type { DifyMock } from '../../../../../test/fixtures/dify-mock/server.js'
|
||||
import type { AccountSessionsClient } from '../../../../api/account-sessions.js'
|
||||
import type { HostsBundle } from '../../../../auth/hosts.js'
|
||||
import type { TokenStore } from '../../../../auth/store.js'
|
||||
import type { Key, Store } from '../../../../store/store.js'
|
||||
import { mkdtemp, readFile, rm } from 'node:fs/promises'
|
||||
import { tmpdir } from 'node:os'
|
||||
import { join } from 'node:path'
|
||||
@ -10,26 +10,23 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
|
||||
import { startMock } from '../../../../../test/fixtures/dify-mock/server.js'
|
||||
import { saveHosts } from '../../../../auth/hosts.js'
|
||||
import { createClient } from '../../../../http/client.js'
|
||||
import { ENV_CONFIG_DIR, resolveConfigDir } from '../../../../store/dir.js'
|
||||
import { tokenKey } from '../../../../store/manager.js'
|
||||
import { bufferStreams } from '../../../../sys/io/streams'
|
||||
import { listAllSessions, runDevicesList, runDevicesRevoke } from './devices.js'
|
||||
|
||||
class MemStore implements TokenStore {
|
||||
readonly entries = new Map<string, string>()
|
||||
async put(host: string, accountId: string, token: string): Promise<void> {
|
||||
this.entries.set(`${host}::${accountId}`, token)
|
||||
class MemStore implements Store {
|
||||
readonly entries = new Map<string, unknown>()
|
||||
get<T>(key: Key<T>): T {
|
||||
return (this.entries.get(key.key) as T | undefined) ?? key.default
|
||||
}
|
||||
|
||||
async get(host: string, accountId: string): Promise<string | undefined> {
|
||||
return this.entries.get(`${host}::${accountId}`)
|
||||
set<T>(key: Key<T>, value: T): void {
|
||||
this.entries.set(key.key, value)
|
||||
}
|
||||
|
||||
async delete(host: string, accountId: string): Promise<void> {
|
||||
this.entries.delete(`${host}::${accountId}`)
|
||||
}
|
||||
|
||||
async list(host: string): Promise<readonly string[]> {
|
||||
const prefix = `${host}::`
|
||||
return Array.from(this.entries.keys()).filter(k => k.startsWith(prefix))
|
||||
unset<T>(key: Key<T>): void {
|
||||
this.entries.delete(key.key)
|
||||
}
|
||||
}
|
||||
|
||||
@ -93,11 +90,18 @@ describe('runDevicesList', () => {
|
||||
describe('runDevicesRevoke', () => {
|
||||
let mock: DifyMock
|
||||
let configDir: string
|
||||
let prevConfigDir: string | undefined
|
||||
beforeEach(async () => {
|
||||
mock = await startMock({ scenario: 'happy' })
|
||||
configDir = await mkdtemp(join(tmpdir(), 'difyctl-devrevoke-'))
|
||||
prevConfigDir = process.env[ENV_CONFIG_DIR]
|
||||
process.env[ENV_CONFIG_DIR] = configDir
|
||||
})
|
||||
afterEach(async () => {
|
||||
if (prevConfigDir === undefined)
|
||||
delete process.env[ENV_CONFIG_DIR]
|
||||
else
|
||||
process.env[ENV_CONFIG_DIR] = prevConfigDir
|
||||
await mock.stop()
|
||||
await rm(configDir, { recursive: true, force: true })
|
||||
})
|
||||
@ -106,11 +110,11 @@ describe('runDevicesRevoke', () => {
|
||||
const io = bufferStreams()
|
||||
const store = new MemStore()
|
||||
const b = bundleFor(mock.url, 'tok-1')
|
||||
await store.put(b.current_host, 'acct-1', 'dfoa_test')
|
||||
await saveHosts(configDir, b)
|
||||
store.set(tokenKey(b.current_host, 'acct-1'), 'dfoa_test')
|
||||
saveHosts(b)
|
||||
const http = createClient({ host: mock.url, bearer: 'dfoa_test' })
|
||||
|
||||
await runDevicesRevoke({ configDir, io, bundle: b, http, store, target: 'difyctl on desktop', all: false })
|
||||
await runDevicesRevoke({ io, bundle: b, http, store, target: 'difyctl on desktop', all: false })
|
||||
expect(io.outBuf()).toContain('Revoked 1 session(s)')
|
||||
expect(store.entries.size).toBe(1)
|
||||
})
|
||||
@ -121,7 +125,7 @@ describe('runDevicesRevoke', () => {
|
||||
const b = bundleFor(mock.url, 'tok-1')
|
||||
const http = createClient({ host: mock.url, bearer: 'dfoa_test' })
|
||||
|
||||
await runDevicesRevoke({ configDir, io, bundle: b, http, store, target: 'tok-2', all: false })
|
||||
await runDevicesRevoke({ io, bundle: b, http, store, target: 'tok-2', all: false })
|
||||
expect(io.outBuf()).toContain('Revoked 1 session(s)')
|
||||
})
|
||||
|
||||
@ -131,7 +135,7 @@ describe('runDevicesRevoke', () => {
|
||||
const b = bundleFor(mock.url, 'tok-1')
|
||||
const http = createClient({ host: mock.url, bearer: 'dfoa_test' })
|
||||
|
||||
await runDevicesRevoke({ configDir, io, bundle: b, http, store, target: 'web', all: false })
|
||||
await runDevicesRevoke({ io, bundle: b, http, store, target: 'web', all: false })
|
||||
expect(io.outBuf()).toContain('Revoked 1 session(s)')
|
||||
})
|
||||
|
||||
@ -141,7 +145,7 @@ describe('runDevicesRevoke', () => {
|
||||
const b = bundleFor(mock.url, 'tok-1')
|
||||
const http = createClient({ host: mock.url, bearer: 'dfoa_test' })
|
||||
|
||||
await expect(runDevicesRevoke({ configDir, io, bundle: b, http, store, target: 'difyctl', all: false }))
|
||||
await expect(runDevicesRevoke({ io, bundle: b, http, store, target: 'difyctl', all: false }))
|
||||
.rejects
|
||||
.toThrow(/matches multiple/)
|
||||
})
|
||||
@ -152,7 +156,7 @@ describe('runDevicesRevoke', () => {
|
||||
const b = bundleFor(mock.url, 'tok-1')
|
||||
const http = createClient({ host: mock.url, bearer: 'dfoa_test' })
|
||||
|
||||
await expect(runDevicesRevoke({ configDir, io, bundle: b, http, store, target: 'nonexistent', all: false }))
|
||||
await expect(runDevicesRevoke({ io, bundle: b, http, store, target: 'nonexistent', all: false }))
|
||||
.rejects
|
||||
.toThrow(/no session matches/)
|
||||
})
|
||||
@ -163,7 +167,7 @@ describe('runDevicesRevoke', () => {
|
||||
const b = bundleFor(mock.url, 'tok-1')
|
||||
const http = createClient({ host: mock.url, bearer: 'dfoa_test' })
|
||||
|
||||
await runDevicesRevoke({ configDir, io, bundle: b, http, store, all: true })
|
||||
await runDevicesRevoke({ io, bundle: b, http, store, all: true })
|
||||
expect(io.outBuf()).toContain('Revoked 2 session(s)')
|
||||
})
|
||||
|
||||
@ -171,20 +175,20 @@ describe('runDevicesRevoke', () => {
|
||||
const io = bufferStreams()
|
||||
const store = new MemStore()
|
||||
const b = bundleFor(mock.url, 'tok-1')
|
||||
await store.put(b.current_host, 'acct-1', 'dfoa_test')
|
||||
await saveHosts(configDir, b)
|
||||
store.set(tokenKey(b.current_host, 'acct-1'), 'dfoa_test')
|
||||
saveHosts(b)
|
||||
const http = createClient({ host: mock.url, bearer: 'dfoa_test' })
|
||||
|
||||
await runDevicesRevoke({ configDir, io, bundle: b, http, store, target: 'tok-1', all: false })
|
||||
await runDevicesRevoke({ io, bundle: b, http, store, target: 'tok-1', all: false })
|
||||
expect(store.entries.size).toBe(0)
|
||||
await expect(readFile(join(configDir, 'hosts.yml'), 'utf8')).rejects.toThrow(/ENOENT/)
|
||||
await expect(readFile(join(resolveConfigDir(), 'hosts.yml'), 'utf8')).rejects.toThrow(/ENOENT/)
|
||||
})
|
||||
|
||||
it('no target + no --all: throws UsageMissingArg', async () => {
|
||||
const io = bufferStreams()
|
||||
const store = new MemStore()
|
||||
const http = createClient({ host: mock.url, bearer: 'dfoa_test' })
|
||||
await expect(runDevicesRevoke({ configDir, io, bundle: bundleFor(mock.url), http, store, all: false }))
|
||||
await expect(runDevicesRevoke({ io, bundle: bundleFor(mock.url), http, store, all: false }))
|
||||
.rejects
|
||||
.toThrow(/specify a device label/)
|
||||
})
|
||||
|
||||
@ -1,15 +1,13 @@
|
||||
import type { SessionRow } from '@dify/contracts/api/openapi/types.gen'
|
||||
import type { KyInstance } from 'ky'
|
||||
import type { HostsBundle } from '../../../../auth/hosts.js'
|
||||
import type { TokenStore } from '../../../../auth/store.js'
|
||||
import type { Store } from '../../../../store/store.js'
|
||||
import type { IOStreams } from '../../../../sys/io/streams'
|
||||
import { unlink } from 'node:fs/promises'
|
||||
import { join } from 'node:path'
|
||||
import { AccountSessionsClient } from '../../../../api/account-sessions.js'
|
||||
import { HOSTS_FILE_NAME } from '../../../../auth/hosts.js'
|
||||
import { BaseError } from '../../../../errors/base.js'
|
||||
import { ErrorCode } from '../../../../errors/codes.js'
|
||||
import { LIMIT_DEFAULT, LIMIT_MAX, parseLimit } from '../../../../limit/limit.js'
|
||||
import { getHostStore, getTokenStore, tokenKey } from '../../../../store/manager.js'
|
||||
import { colorEnabled, colorScheme } from '../../../../sys/io/color.js'
|
||||
import { runWithSpinner } from '../../../../sys/io/spinner.js'
|
||||
|
||||
@ -72,11 +70,11 @@ export async function listAllSessions(client: AccountSessionsClient): Promise<re
|
||||
}
|
||||
|
||||
export type DevicesRevokeOptions = {
|
||||
readonly configDir: string
|
||||
readonly io: IOStreams
|
||||
readonly bundle: HostsBundle | undefined
|
||||
readonly http: KyInstance
|
||||
readonly store: TokenStore
|
||||
/** Optional override for tests; production code resolves via `getTokenStore`. */
|
||||
readonly store?: Store
|
||||
readonly target?: string
|
||||
readonly all: boolean
|
||||
readonly yes?: boolean
|
||||
@ -104,8 +102,10 @@ export async function runDevicesRevoke(opts: DevicesRevokeOptions): Promise<void
|
||||
for (const id of ids)
|
||||
await sessions.revoke(id)
|
||||
|
||||
if (selfHit)
|
||||
await clearLocal(opts.configDir, b, opts.store)
|
||||
if (selfHit) {
|
||||
const tokens = opts.store ?? (await getTokenStore()).store
|
||||
clearLocal(b, tokens)
|
||||
}
|
||||
|
||||
opts.io.out.write(`${cs.successIcon()} Revoked ${ids.length} session(s)\n`)
|
||||
}
|
||||
@ -179,17 +179,10 @@ function renderTable(rows: readonly SessionRow[], currentId: string): string {
|
||||
return body.length === 0 ? `${fmt(header)}\n` : `${[fmt(header), ...body.map(fmt)].join('\n')}\n`
|
||||
}
|
||||
|
||||
async function clearLocal(configDir: string, bundle: HostsBundle, store: TokenStore): Promise<void> {
|
||||
function clearLocal(bundle: HostsBundle, store: Store): void {
|
||||
const accountId = bundle.account?.id ?? bundle.external_subject?.email ?? 'default'
|
||||
try {
|
||||
await store.delete(bundle.current_host, accountId)
|
||||
store.unset(tokenKey(bundle.current_host, accountId))
|
||||
}
|
||||
catch { /* best-effort */ }
|
||||
try {
|
||||
await unlink(join(configDir, HOSTS_FILE_NAME))
|
||||
}
|
||||
catch (err) {
|
||||
if ((err as NodeJS.ErrnoException).code !== 'ENOENT')
|
||||
throw err
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,4 +1,3 @@
|
||||
import { selectStore } from '../../../../auth/store.js'
|
||||
import { Args, Flags } from '../../../../framework/flags.js'
|
||||
import { DifyCommand } from '../../../_shared/dify-command.js'
|
||||
import { httpRetryFlag } from '../../../_shared/global-flags.js'
|
||||
@ -25,13 +24,10 @@ export default class DevicesRevoke extends DifyCommand {
|
||||
async run(argv: string[]): Promise<void> {
|
||||
const { args, flags } = this.parse(DevicesRevoke, argv)
|
||||
const ctx = await this.authedCtx({ retryFlag: flags['http-retry'] })
|
||||
const { store } = await selectStore({ configDir: ctx.configDir })
|
||||
await runDevicesRevoke({
|
||||
configDir: ctx.configDir,
|
||||
io: ctx.io,
|
||||
bundle: ctx.bundle,
|
||||
http: ctx.http,
|
||||
store,
|
||||
target: args.target,
|
||||
all: flags.all,
|
||||
yes: flags.yes,
|
||||
|
||||
@ -1,5 +1,4 @@
|
||||
import { Flags } from '../../../framework/flags.js'
|
||||
import { resolveConfigDir } from '../../../store/dir.js'
|
||||
import { realStreams } from '../../../sys/io/streams'
|
||||
import { DifyCommand } from '../../_shared/dify-command.js'
|
||||
import { runLogin } from './login.js'
|
||||
@ -31,7 +30,6 @@ export default class Login extends DifyCommand {
|
||||
async run(argv: string[]): Promise<void> {
|
||||
const { flags } = this.parse(Login, argv)
|
||||
await runLogin({
|
||||
configDir: resolveConfigDir(),
|
||||
io: realStreams(),
|
||||
host: flags.host,
|
||||
noBrowser: flags['no-browser'],
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
import type { DifyMock } from '../../../../test/fixtures/dify-mock/server.js'
|
||||
import type { TokenStore } from '../../../auth/store.js'
|
||||
import type { Key, Store } from '../../../store/store.js'
|
||||
import type { Clock } from './device-flow.js'
|
||||
import { mkdtemp, readFile, rm } from 'node:fs/promises'
|
||||
import { tmpdir } from 'node:os'
|
||||
@ -8,6 +8,8 @@ import { afterEach, beforeEach, describe, expect, it } from 'vitest'
|
||||
import { startMock } from '../../../../test/fixtures/dify-mock/server.js'
|
||||
import { DeviceFlowApi } from '../../../api/oauth-device.js'
|
||||
import { createClient } from '../../../http/client.js'
|
||||
import { ENV_CONFIG_DIR } from '../../../store/dir.js'
|
||||
import { tokenKey } from '../../../store/manager.js'
|
||||
import { bufferStreams } from '../../../sys/io/streams'
|
||||
import { runLogin } from './login.js'
|
||||
|
||||
@ -18,38 +20,38 @@ const noopClock: Clock = {
|
||||
|
||||
const noopBrowser = async (): Promise<void> => { /* skip OS open */ }
|
||||
|
||||
class MemStore implements TokenStore {
|
||||
readonly entries = new Map<string, string>()
|
||||
async put(host: string, accountId: string, token: string): Promise<void> {
|
||||
this.entries.set(`${host}::${accountId}`, token)
|
||||
class MemStore implements Store {
|
||||
readonly entries = new Map<string, unknown>()
|
||||
get<T>(key: Key<T>): T {
|
||||
return (this.entries.get(key.key) as T | undefined) ?? key.default
|
||||
}
|
||||
|
||||
async get(host: string, accountId: string): Promise<string | undefined> {
|
||||
return this.entries.get(`${host}::${accountId}`)
|
||||
set<T>(key: Key<T>, value: T): void {
|
||||
this.entries.set(key.key, value)
|
||||
}
|
||||
|
||||
async delete(host: string, accountId: string): Promise<void> {
|
||||
this.entries.delete(`${host}::${accountId}`)
|
||||
}
|
||||
|
||||
async list(host: string): Promise<readonly string[]> {
|
||||
const prefix = `${host}::`
|
||||
return Array.from(this.entries.keys())
|
||||
.filter(k => k.startsWith(prefix))
|
||||
.map(k => k.slice(prefix.length))
|
||||
unset<T>(key: Key<T>): void {
|
||||
this.entries.delete(key.key)
|
||||
}
|
||||
}
|
||||
|
||||
describe('runLogin', () => {
|
||||
let mock: DifyMock
|
||||
let configDir: string
|
||||
let prevConfigDir: string | undefined
|
||||
|
||||
beforeEach(async () => {
|
||||
mock = await startMock({ scenario: 'happy' })
|
||||
configDir = await mkdtemp(join(tmpdir(), 'difyctl-login-'))
|
||||
prevConfigDir = process.env[ENV_CONFIG_DIR]
|
||||
process.env[ENV_CONFIG_DIR] = configDir
|
||||
})
|
||||
|
||||
afterEach(async () => {
|
||||
if (prevConfigDir === undefined)
|
||||
delete process.env[ENV_CONFIG_DIR]
|
||||
else
|
||||
process.env[ENV_CONFIG_DIR] = prevConfigDir
|
||||
await mock.stop()
|
||||
await rm(configDir, { recursive: true, force: true })
|
||||
})
|
||||
@ -58,7 +60,6 @@ describe('runLogin', () => {
|
||||
const io = bufferStreams()
|
||||
const store = new MemStore()
|
||||
const bundle = await runLogin({
|
||||
configDir,
|
||||
io,
|
||||
host: mock.url,
|
||||
noBrowser: true,
|
||||
@ -73,7 +74,7 @@ describe('runLogin', () => {
|
||||
expect(bundle.account?.email).toBe('tester@dify.ai')
|
||||
expect(bundle.workspace?.id).toBe('ws-1')
|
||||
expect(bundle.available_workspaces).toHaveLength(2)
|
||||
const stored = await store.get(bundle.current_host, 'acct-1')
|
||||
const stored = store.get(tokenKey(bundle.current_host, 'acct-1'))
|
||||
expect(stored).toBe('dfoa_test')
|
||||
|
||||
const hostsRaw = await readFile(join(configDir, 'hosts.yml'), 'utf8')
|
||||
@ -91,7 +92,6 @@ describe('runLogin', () => {
|
||||
const io = bufferStreams()
|
||||
const store = new MemStore()
|
||||
const bundle = await runLogin({
|
||||
configDir,
|
||||
io,
|
||||
host: mock.url,
|
||||
noBrowser: true,
|
||||
@ -115,7 +115,6 @@ describe('runLogin', () => {
|
||||
const io = bufferStreams()
|
||||
const store = new MemStore()
|
||||
await expect(runLogin({
|
||||
configDir,
|
||||
io,
|
||||
host: mock.url,
|
||||
noBrowser: true,
|
||||
@ -135,7 +134,6 @@ describe('runLogin', () => {
|
||||
const io = bufferStreams()
|
||||
const store = new MemStore()
|
||||
await expect(runLogin({
|
||||
configDir,
|
||||
io,
|
||||
host: mock.url,
|
||||
noBrowser: true,
|
||||
@ -152,7 +150,6 @@ describe('runLogin', () => {
|
||||
const io = bufferStreams()
|
||||
const store = new MemStore()
|
||||
await expect(runLogin({
|
||||
configDir,
|
||||
io,
|
||||
host: mock.url,
|
||||
noBrowser: true,
|
||||
@ -169,7 +166,6 @@ describe('runLogin', () => {
|
||||
const io = bufferStreams()
|
||||
const store = new MemStore()
|
||||
await runLogin({
|
||||
configDir,
|
||||
io,
|
||||
host: mock.url,
|
||||
noBrowser: true,
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
import type { CodeResponse, PollSuccess } from '../../../api/oauth-device.js'
|
||||
import type { HostsBundle, StorageMode, Workspace } from '../../../auth/hosts.js'
|
||||
import type { TokenStore } from '../../../auth/store.js'
|
||||
import type { HostsBundle, Workspace } from '../../../auth/hosts.js'
|
||||
import type { StorageMode, Store } from '../../../store/store.js'
|
||||
import type { IOStreams } from '../../../sys/io/streams'
|
||||
import type { BrowserEnv, BrowserOpener } from '../../../util/browser.js'
|
||||
import type { Clock } from './device-flow.js'
|
||||
@ -8,21 +8,20 @@ import * as os from 'node:os'
|
||||
import * as readline from 'node:readline'
|
||||
import { DeviceFlowApi } from '../../../api/oauth-device.js'
|
||||
import { saveHosts } from '../../../auth/hosts.js'
|
||||
import { selectStore } from '../../../auth/store.js'
|
||||
import { createClient } from '../../../http/client.js'
|
||||
import { getTokenStore, tokenKey } from '../../../store/manager.js'
|
||||
import { colorEnabled, colorScheme } from '../../../sys/io/color.js'
|
||||
import { decideOpen, OpenDecision, openUrl, realEnv } from '../../../util/browser.js'
|
||||
import { bareHost, DEFAULT_HOST, resolveHost, validateVerificationURI } from '../../../util/host.js'
|
||||
import { awaitAuthorization, realClock } from './device-flow.js'
|
||||
|
||||
export type LoginOptions = {
|
||||
readonly configDir: string
|
||||
readonly io: IOStreams
|
||||
readonly host?: string
|
||||
readonly noBrowser?: boolean
|
||||
readonly insecure?: boolean
|
||||
readonly deviceLabel?: string
|
||||
readonly store?: { readonly store: TokenStore, readonly mode: StorageMode }
|
||||
readonly store?: { readonly store: Store, readonly mode: StorageMode }
|
||||
readonly api?: DeviceFlowApi
|
||||
readonly browserEnv?: BrowserEnv
|
||||
readonly browserOpener?: BrowserOpener
|
||||
@ -59,11 +58,11 @@ export async function runLogin(opts: LoginOptions): Promise<HostsBundle> {
|
||||
|
||||
const success = await awaitAuthorization(api, code, { clock: opts.clock ?? realClock() })
|
||||
|
||||
const storeBundle = opts.store ?? await selectStore({ configDir: opts.configDir })
|
||||
const storeBundle = opts.store ?? await getTokenStore()
|
||||
const bundle = bundleFromSuccess(host, success, storeBundle.mode)
|
||||
|
||||
await storeBundle.store.put(bundle.current_host, accountKey(bundle), success.token)
|
||||
await saveHosts(opts.configDir, bundle)
|
||||
storeBundle.store.set(tokenKey(bundle.current_host, accountKey(bundle)), success.token)
|
||||
saveHosts(bundle)
|
||||
|
||||
renderLoggedIn(opts.io.out, cs, host, success)
|
||||
return bundle
|
||||
|
||||
@ -1,8 +1,6 @@
|
||||
import type { KyInstance } from 'ky'
|
||||
import { loadHosts } from '../../../auth/hosts.js'
|
||||
import { selectStore } from '../../../auth/store.js'
|
||||
import { createClient } from '../../../http/client.js'
|
||||
import { resolveConfigDir } from '../../../store/dir.js'
|
||||
import { runWithSpinner } from '../../../sys/io/spinner.js'
|
||||
import { realStreams } from '../../../sys/io/streams'
|
||||
import { hostWithScheme } from '../../../util/host.js'
|
||||
@ -18,9 +16,7 @@ export default class Logout extends DifyCommand {
|
||||
|
||||
async run(argv: string[]): Promise<void> {
|
||||
this.parse(Logout, argv)
|
||||
const configDir = resolveConfigDir()
|
||||
const bundle = await loadHosts(configDir)
|
||||
const { store } = await selectStore({ configDir })
|
||||
const bundle = loadHosts()
|
||||
|
||||
let http: KyInstance | undefined
|
||||
if (bundle !== undefined && bundle.current_host !== '' && bundle.tokens?.bearer !== undefined && bundle.tokens.bearer !== '') {
|
||||
@ -34,7 +30,7 @@ export default class Logout extends DifyCommand {
|
||||
const io = realStreams()
|
||||
await runWithSpinner(
|
||||
{ io, label: 'Signing out', enabled: true, style: 'dify-dim' },
|
||||
() => runLogout({ configDir, io, bundle, http, store }),
|
||||
() => runLogout({ io, bundle, http }),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
import type { DifyMock } from '../../../../test/fixtures/dify-mock/server.js'
|
||||
import type { HostsBundle } from '../../../auth/hosts.js'
|
||||
import type { TokenStore } from '../../../auth/store.js'
|
||||
import type { Key, Store } from '../../../store/store.js'
|
||||
import { mkdtemp, readFile, rm, writeFile } from 'node:fs/promises'
|
||||
import { tmpdir } from 'node:os'
|
||||
import { join } from 'node:path'
|
||||
@ -8,28 +8,23 @@ import { afterEach, beforeEach, describe, expect, it } from 'vitest'
|
||||
import { startMock } from '../../../../test/fixtures/dify-mock/server.js'
|
||||
import { saveHosts } from '../../../auth/hosts.js'
|
||||
import { createClient } from '../../../http/client.js'
|
||||
import { ENV_CONFIG_DIR } from '../../../store/dir.js'
|
||||
import { tokenKey } from '../../../store/manager.js'
|
||||
import { bufferStreams } from '../../../sys/io/streams'
|
||||
import { runLogout } from './logout.js'
|
||||
|
||||
class MemStore implements TokenStore {
|
||||
readonly entries = new Map<string, string>()
|
||||
async put(host: string, accountId: string, token: string): Promise<void> {
|
||||
this.entries.set(`${host}::${accountId}`, token)
|
||||
class MemStore implements Store {
|
||||
readonly entries = new Map<string, unknown>()
|
||||
get<T>(key: Key<T>): T {
|
||||
return (this.entries.get(key.key) as T | undefined) ?? key.default
|
||||
}
|
||||
|
||||
async get(host: string, accountId: string): Promise<string | undefined> {
|
||||
return this.entries.get(`${host}::${accountId}`)
|
||||
set<T>(key: Key<T>, value: T): void {
|
||||
this.entries.set(key.key, value)
|
||||
}
|
||||
|
||||
async delete(host: string, accountId: string): Promise<void> {
|
||||
this.entries.delete(`${host}::${accountId}`)
|
||||
}
|
||||
|
||||
async list(host: string): Promise<readonly string[]> {
|
||||
const prefix = `${host}::`
|
||||
return Array.from(this.entries.keys())
|
||||
.filter(k => k.startsWith(prefix))
|
||||
.map(k => k.slice(prefix.length))
|
||||
unset<T>(key: Key<T>): void {
|
||||
this.entries.delete(key.key)
|
||||
}
|
||||
}
|
||||
|
||||
@ -52,13 +47,20 @@ function fixtureBundle(host: string): HostsBundle {
|
||||
describe('runLogout', () => {
|
||||
let mock: DifyMock
|
||||
let configDir: string
|
||||
let prevConfigDir: string | undefined
|
||||
|
||||
beforeEach(async () => {
|
||||
mock = await startMock({ scenario: 'happy' })
|
||||
configDir = await mkdtemp(join(tmpdir(), 'difyctl-logout-'))
|
||||
prevConfigDir = process.env[ENV_CONFIG_DIR]
|
||||
process.env[ENV_CONFIG_DIR] = configDir
|
||||
})
|
||||
|
||||
afterEach(async () => {
|
||||
if (prevConfigDir === undefined)
|
||||
delete process.env[ENV_CONFIG_DIR]
|
||||
else
|
||||
process.env[ENV_CONFIG_DIR] = prevConfigDir
|
||||
await mock.stop()
|
||||
await rm(configDir, { recursive: true, force: true })
|
||||
})
|
||||
@ -67,11 +69,11 @@ describe('runLogout', () => {
|
||||
const io = bufferStreams()
|
||||
const store = new MemStore()
|
||||
const bundle = fixtureBundle(mock.url)
|
||||
await store.put(bundle.current_host, 'acct-1', 'dfoa_test')
|
||||
await saveHosts(configDir, bundle)
|
||||
store.set(tokenKey(bundle.current_host, 'acct-1'), 'dfoa_test')
|
||||
saveHosts(bundle)
|
||||
const http = createClient({ host: mock.url, bearer: 'dfoa_test' })
|
||||
|
||||
await runLogout({ configDir, io, bundle, http, store })
|
||||
await runLogout({ io, bundle, http, store })
|
||||
|
||||
expect(store.entries.size).toBe(0)
|
||||
await expect(readFile(join(configDir, 'hosts.yml'), 'utf8')).rejects.toThrow(/ENOENT/)
|
||||
@ -82,7 +84,7 @@ describe('runLogout', () => {
|
||||
it('not-logged-in: throws BaseError', async () => {
|
||||
const io = bufferStreams()
|
||||
const store = new MemStore()
|
||||
await expect(runLogout({ configDir, io, bundle: undefined, store })).rejects.toThrow(/not logged in/)
|
||||
await expect(runLogout({ io, bundle: undefined, store })).rejects.toThrow(/not logged in/)
|
||||
})
|
||||
|
||||
it('hosts.yml absent: still completes locally + emits success', async () => {
|
||||
@ -91,7 +93,7 @@ describe('runLogout', () => {
|
||||
const bundle = fixtureBundle(mock.url)
|
||||
const http = createClient({ host: mock.url, bearer: 'dfoa_test' })
|
||||
|
||||
await runLogout({ configDir, io, bundle, http, store })
|
||||
await runLogout({ io, bundle, http, store })
|
||||
|
||||
expect(io.outBuf()).toContain('Logged out of')
|
||||
})
|
||||
@ -100,12 +102,12 @@ describe('runLogout', () => {
|
||||
const io = bufferStreams()
|
||||
const store = new MemStore()
|
||||
const bundle = fixtureBundle(mock.url)
|
||||
await store.put(bundle.current_host, 'acct-1', 'dfoa_test')
|
||||
await saveHosts(configDir, bundle)
|
||||
store.set(tokenKey(bundle.current_host, 'acct-1'), 'dfoa_test')
|
||||
saveHosts(bundle)
|
||||
mock.setScenario('server-5xx')
|
||||
const http = createClient({ host: mock.url, bearer: 'dfoa_test', retryAttempts: 0 })
|
||||
|
||||
await runLogout({ configDir, io, bundle, http, store })
|
||||
await runLogout({ io, bundle, http, store })
|
||||
|
||||
expect(store.entries.size).toBe(0)
|
||||
expect(io.errBuf()).toContain('server revoke failed')
|
||||
@ -117,11 +119,11 @@ describe('runLogout', () => {
|
||||
const store = new MemStore()
|
||||
const bundle = fixtureBundle(mock.url)
|
||||
bundle.tokens = { bearer: 'dfp_personal_token' }
|
||||
await store.put(bundle.current_host, 'acct-1', 'dfp_personal_token')
|
||||
await saveHosts(configDir, bundle)
|
||||
store.set(tokenKey(bundle.current_host, 'acct-1'), 'dfp_personal_token')
|
||||
saveHosts(bundle)
|
||||
const http = createClient({ host: mock.url, bearer: 'dfp_personal_token' })
|
||||
|
||||
await runLogout({ configDir, io, bundle, http, store })
|
||||
await runLogout({ io, bundle, http, store })
|
||||
|
||||
expect(io.errBuf()).toBe('')
|
||||
expect(store.entries.size).toBe(0)
|
||||
@ -131,11 +133,11 @@ describe('runLogout', () => {
|
||||
const io = bufferStreams()
|
||||
const store = new MemStore()
|
||||
const bundle = fixtureBundle(mock.url)
|
||||
await saveHosts(configDir, bundle)
|
||||
saveHosts(bundle)
|
||||
await writeFile(join(configDir, 'config.yml'), 'foo: bar\n', 'utf8')
|
||||
const http = createClient({ host: mock.url, bearer: 'dfoa_test' })
|
||||
|
||||
await runLogout({ configDir, io, bundle, http, store })
|
||||
await runLogout({ io, bundle, http, store })
|
||||
|
||||
const cfg = await readFile(join(configDir, 'config.yml'), 'utf8')
|
||||
expect(cfg).toContain('foo: bar')
|
||||
|
||||
@ -1,21 +1,19 @@
|
||||
import type { KyInstance } from 'ky'
|
||||
import type { HostsBundle } from '../../../auth/hosts.js'
|
||||
import type { TokenStore } from '../../../auth/store.js'
|
||||
import type { Store } from '../../../store/store.js'
|
||||
import type { IOStreams } from '../../../sys/io/streams'
|
||||
import { unlink } from 'node:fs/promises'
|
||||
import { join } from 'node:path'
|
||||
import { AccountSessionsClient } from '../../../api/account-sessions.js'
|
||||
import { HOSTS_FILE_NAME } from '../../../auth/hosts.js'
|
||||
import { BaseError } from '../../../errors/base.js'
|
||||
import { ErrorCode } from '../../../errors/codes.js'
|
||||
import { getHostStore, getTokenStore, tokenKey } from '../../../store/manager.js'
|
||||
import { colorEnabled, colorScheme } from '../../../sys/io/color.js'
|
||||
|
||||
export type LogoutOptions = {
|
||||
readonly configDir: string
|
||||
readonly io: IOStreams
|
||||
readonly bundle: HostsBundle | undefined
|
||||
readonly http?: KyInstance
|
||||
readonly store: TokenStore
|
||||
/** Optional override for tests; production code resolves via `getTokenStore`. */
|
||||
readonly store?: Store
|
||||
}
|
||||
|
||||
export async function runLogout(opts: LogoutOptions): Promise<void> {
|
||||
@ -40,7 +38,8 @@ export async function runLogout(opts: LogoutOptions): Promise<void> {
|
||||
}
|
||||
}
|
||||
|
||||
await clearLocal(opts.configDir, bundle, opts.store)
|
||||
const tokens = opts.store ?? (await getTokenStore()).store
|
||||
clearLocal(bundle, tokens)
|
||||
|
||||
if (revokeWarning !== '')
|
||||
opts.io.err.write(revokeWarning)
|
||||
@ -53,18 +52,10 @@ function revokeAllowed(bearer: string): boolean {
|
||||
return REVOCABLE_PREFIXES.some(p => bearer.startsWith(p))
|
||||
}
|
||||
|
||||
async function clearLocal(configDir: string, bundle: HostsBundle, store: TokenStore): Promise<void> {
|
||||
function clearLocal(bundle: HostsBundle, store: Store): void {
|
||||
const accountId = bundle.account?.id ?? bundle.external_subject?.email ?? 'default'
|
||||
try {
|
||||
await store.delete(bundle.current_host, accountId)
|
||||
store.unset(tokenKey(bundle.current_host, accountId))
|
||||
}
|
||||
catch { /* best-effort */ }
|
||||
const hostsPath = join(configDir, HOSTS_FILE_NAME)
|
||||
try {
|
||||
await unlink(hostsPath)
|
||||
}
|
||||
catch (err) {
|
||||
if ((err as NodeJS.ErrnoException).code !== 'ENOENT')
|
||||
throw err
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,6 +1,5 @@
|
||||
import { loadHosts } from '../../../auth/hosts.js'
|
||||
import { Flags } from '../../../framework/flags.js'
|
||||
import { resolveConfigDir } from '../../../store/dir.js'
|
||||
import { realStreams } from '../../../sys/io/streams'
|
||||
import { DifyCommand } from '../../_shared/dify-command.js'
|
||||
import { runStatus } from './status.js'
|
||||
@ -21,8 +20,7 @@ export default class Status extends DifyCommand {
|
||||
|
||||
async run(argv: string[]): Promise<void> {
|
||||
const { flags } = this.parse(Status, argv)
|
||||
const configDir = resolveConfigDir()
|
||||
const bundle = await loadHosts(configDir)
|
||||
const bundle = loadHosts()
|
||||
await runStatus({ io: realStreams(), bundle, verbose: flags.verbose, json: flags.json })
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,6 +1,5 @@
|
||||
import { loadHosts } from '../../../auth/hosts.js'
|
||||
import { Flags } from '../../../framework/flags.js'
|
||||
import { resolveConfigDir } from '../../../store/dir.js'
|
||||
import { realStreams } from '../../../sys/io/streams'
|
||||
import { DifyCommand } from '../../_shared/dify-command.js'
|
||||
import { runWhoami } from './whoami.js'
|
||||
@ -19,8 +18,7 @@ export default class Whoami extends DifyCommand {
|
||||
|
||||
async run(argv: string[]): Promise<void> {
|
||||
const { flags } = this.parse(Whoami, argv)
|
||||
const configDir = resolveConfigDir()
|
||||
const bundle = await loadHosts(configDir)
|
||||
const bundle = loadHosts()
|
||||
await runWhoami({ io: realStreams(), bundle, json: flags.json })
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,43 +1,49 @@
|
||||
import { mkdtemp, writeFile } from 'node:fs/promises'
|
||||
import { mkdtemp, rm } from 'node:fs/promises'
|
||||
import { tmpdir } from 'node:os'
|
||||
import { join } from 'node:path'
|
||||
import { beforeEach, describe, expect, it } from 'vitest'
|
||||
import { FILE_NAME } from '../../../config/schema.js'
|
||||
import { afterEach, beforeEach, describe, expect, it } from 'vitest'
|
||||
import { isBaseError } from '../../../errors/base.js'
|
||||
import { ErrorCode } from '../../../errors/codes.js'
|
||||
import { YamlStore } from '../../../store/store.js'
|
||||
import { ENV_CONFIG_DIR } from '../../../store/dir.js'
|
||||
import { getConfigurationStore } from '../../../store/manager.js'
|
||||
import { runConfigGet } from './run.js'
|
||||
|
||||
function makeStore(dir: string): YamlStore {
|
||||
return new YamlStore(join(dir, FILE_NAME))
|
||||
}
|
||||
|
||||
describe('runConfigGet', () => {
|
||||
let dir: string
|
||||
let prevConfigDir: string | undefined
|
||||
|
||||
beforeEach(async () => {
|
||||
dir = await mkdtemp(join(tmpdir(), 'difyctl-get-'))
|
||||
prevConfigDir = process.env[ENV_CONFIG_DIR]
|
||||
process.env[ENV_CONFIG_DIR] = dir
|
||||
})
|
||||
|
||||
it('returns set value with trailing newline', async () => {
|
||||
await writeFile(
|
||||
join(dir, FILE_NAME),
|
||||
'schema_version: 1\ndefaults:\n format: yaml\n',
|
||||
'utf8',
|
||||
)
|
||||
const out = runConfigGet({ store: makeStore(dir), key: 'defaults.format' })
|
||||
afterEach(async () => {
|
||||
if (prevConfigDir === undefined)
|
||||
delete process.env[ENV_CONFIG_DIR]
|
||||
else
|
||||
process.env[ENV_CONFIG_DIR] = prevConfigDir
|
||||
await rm(dir, { recursive: true, force: true })
|
||||
})
|
||||
|
||||
it('returns set value with trailing newline', () => {
|
||||
getConfigurationStore().setTyped({
|
||||
schema_version: 1,
|
||||
defaults: { format: 'yaml' },
|
||||
})
|
||||
const out = runConfigGet({ store: getConfigurationStore(), key: 'defaults.format' })
|
||||
expect(out).toBe('yaml\n')
|
||||
})
|
||||
|
||||
it('returns empty line when key is unset (matches Go fmt.Fprintln)', () => {
|
||||
const out = runConfigGet({ store: makeStore(dir), key: 'defaults.format' })
|
||||
const out = runConfigGet({ store: getConfigurationStore(), key: 'defaults.format' })
|
||||
expect(out).toBe('\n')
|
||||
})
|
||||
|
||||
it('throws BaseError(config_invalid_key) on unknown key', () => {
|
||||
let caught: unknown
|
||||
try {
|
||||
runConfigGet({ store: makeStore(dir), key: 'bogus.key' })
|
||||
runConfigGet({ store: getConfigurationStore(), key: 'bogus.key' })
|
||||
}
|
||||
catch (err) { caught = err }
|
||||
expect(isBaseError(caught)).toBe(true)
|
||||
@ -45,13 +51,12 @@ describe('runConfigGet', () => {
|
||||
expect(caught.code).toBe(ErrorCode.ConfigInvalidKey)
|
||||
})
|
||||
|
||||
it('returns numeric limit as string', async () => {
|
||||
await writeFile(
|
||||
join(dir, FILE_NAME),
|
||||
'schema_version: 1\ndefaults:\n limit: 75\n',
|
||||
'utf8',
|
||||
)
|
||||
const out = runConfigGet({ store: makeStore(dir), key: 'defaults.limit' })
|
||||
it('returns numeric limit as string', () => {
|
||||
getConfigurationStore().setTyped({
|
||||
schema_version: 1,
|
||||
defaults: { limit: 75 },
|
||||
})
|
||||
const out = runConfigGet({ store: getConfigurationStore(), key: 'defaults.limit' })
|
||||
expect(out).toBe('75\n')
|
||||
})
|
||||
})
|
||||
|
||||
@ -1,7 +1,8 @@
|
||||
import { join } from 'node:path'
|
||||
import { raw } from '../../../framework/output.js'
|
||||
import { resolveConfigDir } from '../../../store/dir.js'
|
||||
import { CONFIG_FILE_NAME } from '../../../store/manager.js'
|
||||
import { DifyCommand } from '../../_shared/dify-command.js'
|
||||
import { runConfigPath } from './run.js'
|
||||
|
||||
export default class ConfigPath extends DifyCommand {
|
||||
static override description = 'Print the resolved config.yml path'
|
||||
@ -12,6 +13,8 @@ export default class ConfigPath extends DifyCommand {
|
||||
|
||||
async run(argv: string[]) {
|
||||
this.parse(ConfigPath, argv)
|
||||
return raw(runConfigPath({ dir: resolveConfigDir() }))
|
||||
return raw(
|
||||
join(resolveConfigDir(), CONFIG_FILE_NAME),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,14 +0,0 @@
|
||||
import { describe, expect, it } from 'vitest'
|
||||
import { runConfigPath } from './run.js'
|
||||
|
||||
describe('runConfigPath', () => {
|
||||
it('joins dir and config.yml with trailing newline', () => {
|
||||
const out = runConfigPath({ dir: '/tmp/x' })
|
||||
expect(out).toBe('/tmp/x/config.yml\n')
|
||||
})
|
||||
|
||||
it('handles trailing slash on dir', () => {
|
||||
const out = runConfigPath({ dir: '/tmp/x/' })
|
||||
expect(out).toBe('/tmp/x/config.yml\n')
|
||||
})
|
||||
})
|
||||
@ -1,10 +0,0 @@
|
||||
import { join } from 'node:path'
|
||||
import { FILE_NAME } from '../../../config/schema.js'
|
||||
|
||||
export type RunConfigPathOptions = {
|
||||
readonly dir: string
|
||||
}
|
||||
|
||||
export function runConfigPath(opts: RunConfigPathOptions): string {
|
||||
return `${join(opts.dir, FILE_NAME)}\n`
|
||||
}
|
||||
@ -1,35 +1,46 @@
|
||||
import { mkdtemp, readFile } from 'node:fs/promises'
|
||||
import { mkdtemp, rm } from 'node:fs/promises'
|
||||
import { tmpdir } from 'node:os'
|
||||
import { join } from 'node:path'
|
||||
import { beforeEach, describe, expect, it } from 'vitest'
|
||||
import { FILE_NAME } from '../../../config/schema.js'
|
||||
import { afterEach, beforeEach, describe, expect, it } from 'vitest'
|
||||
import { loadConfig } from '../../../config/config-loader.js'
|
||||
import { isBaseError } from '../../../errors/base.js'
|
||||
import { ErrorCode, ExitCode } from '../../../errors/codes.js'
|
||||
import { YamlStore } from '../../../store/store.js'
|
||||
import { ENV_CONFIG_DIR } from '../../../store/dir.js'
|
||||
import { getConfigurationStore } from '../../../store/manager.js'
|
||||
import { runConfigSet } from './run.js'
|
||||
|
||||
function makeStore(dir: string): YamlStore {
|
||||
return new YamlStore(join(dir, FILE_NAME))
|
||||
}
|
||||
|
||||
describe('runConfigSet', () => {
|
||||
let dir: string
|
||||
let prevConfigDir: string | undefined
|
||||
|
||||
beforeEach(async () => {
|
||||
dir = await mkdtemp(join(tmpdir(), 'difyctl-set-'))
|
||||
prevConfigDir = process.env[ENV_CONFIG_DIR]
|
||||
process.env[ENV_CONFIG_DIR] = dir
|
||||
})
|
||||
|
||||
it('writes config.yml and returns "set k = v\\n"', async () => {
|
||||
const out = runConfigSet({ store: makeStore(dir), key: 'defaults.format', value: 'json' })
|
||||
afterEach(async () => {
|
||||
if (prevConfigDir === undefined)
|
||||
delete process.env[ENV_CONFIG_DIR]
|
||||
else
|
||||
process.env[ENV_CONFIG_DIR] = prevConfigDir
|
||||
await rm(dir, { recursive: true, force: true })
|
||||
})
|
||||
|
||||
it('persists the value and returns "set k = v\\n"', () => {
|
||||
const out = runConfigSet({ store: getConfigurationStore(), key: 'defaults.format', value: 'json' })
|
||||
expect(out).toBe('set defaults.format = json\n')
|
||||
const raw = await readFile(join(dir, FILE_NAME), 'utf8')
|
||||
expect(raw).toContain('format: json')
|
||||
|
||||
const r = loadConfig(getConfigurationStore())
|
||||
expect(r.found).toBe(true)
|
||||
if (r.found)
|
||||
expect(r.config.defaults.format).toBe('json')
|
||||
})
|
||||
|
||||
it('rejects invalid format value with config_invalid_value', async () => {
|
||||
it('rejects invalid format value with config_invalid_value', () => {
|
||||
let caught: unknown
|
||||
try {
|
||||
runConfigSet({ store: makeStore(dir), key: 'defaults.format', value: 'csv' })
|
||||
runConfigSet({ store: getConfigurationStore(), key: 'defaults.format', value: 'csv' })
|
||||
}
|
||||
catch (err) { caught = err }
|
||||
expect(isBaseError(caught)).toBe(true)
|
||||
@ -40,7 +51,7 @@ describe('runConfigSet', () => {
|
||||
it('rejects unknown key with config_invalid_key', () => {
|
||||
let caught: unknown
|
||||
try {
|
||||
runConfigSet({ store: makeStore(dir), key: 'bogus', value: 'x' })
|
||||
runConfigSet({ store: getConfigurationStore(), key: 'bogus', value: 'x' })
|
||||
}
|
||||
catch (err) { caught = err }
|
||||
expect(isBaseError(caught)).toBe(true)
|
||||
@ -48,18 +59,22 @@ describe('runConfigSet', () => {
|
||||
expect(caught.code).toBe(ErrorCode.ConfigInvalidKey)
|
||||
})
|
||||
|
||||
it('preserves prior keys when setting a new one', async () => {
|
||||
runConfigSet({ store: makeStore(dir), key: 'defaults.format', value: 'yaml' })
|
||||
runConfigSet({ store: makeStore(dir), key: 'defaults.limit', value: '40' })
|
||||
const raw = await readFile(join(dir, FILE_NAME), 'utf8')
|
||||
expect(raw).toContain('format: yaml')
|
||||
expect(raw).toContain('limit: 40')
|
||||
it('preserves prior keys when setting a new one', () => {
|
||||
runConfigSet({ store: getConfigurationStore(), key: 'defaults.format', value: 'yaml' })
|
||||
runConfigSet({ store: getConfigurationStore(), key: 'defaults.limit', value: '40' })
|
||||
|
||||
const r = loadConfig(getConfigurationStore())
|
||||
expect(r.found).toBe(true)
|
||||
if (r.found) {
|
||||
expect(r.config.defaults.format).toBe('yaml')
|
||||
expect(r.config.defaults.limit).toBe(40)
|
||||
}
|
||||
})
|
||||
|
||||
it('exit code for invalid value is Usage (2)', () => {
|
||||
let caught: unknown
|
||||
try {
|
||||
runConfigSet({ store: makeStore(dir), key: 'defaults.format', value: 'csv' })
|
||||
runConfigSet({ store: getConfigurationStore(), key: 'defaults.format', value: 'csv' })
|
||||
}
|
||||
catch (err) { caught = err }
|
||||
expect(isBaseError(caught)).toBe(true)
|
||||
@ -70,7 +85,7 @@ describe('runConfigSet', () => {
|
||||
it('exit code for unknown key is Usage (2)', () => {
|
||||
let caught: unknown
|
||||
try {
|
||||
runConfigSet({ store: makeStore(dir), key: 'bogus', value: 'x' })
|
||||
runConfigSet({ store: getConfigurationStore(), key: 'bogus', value: 'x' })
|
||||
}
|
||||
catch (err) { caught = err }
|
||||
expect(isBaseError(caught)).toBe(true)
|
||||
@ -81,7 +96,7 @@ describe('runConfigSet', () => {
|
||||
it('typed wrap chain: invalid defaults.limit surfaces ConfigInvalidValue (not UsageInvalidFlag)', () => {
|
||||
let caught: unknown
|
||||
try {
|
||||
runConfigSet({ store: makeStore(dir), key: 'defaults.limit', value: 'abc' })
|
||||
runConfigSet({ store: getConfigurationStore(), key: 'defaults.limit', value: 'abc' })
|
||||
}
|
||||
catch (err) { caught = err }
|
||||
expect(isBaseError(caught)).toBe(true)
|
||||
|
||||
@ -1,48 +1,61 @@
|
||||
import { mkdtemp, readFile, writeFile } from 'node:fs/promises'
|
||||
import { mkdtemp, rm } from 'node:fs/promises'
|
||||
import { tmpdir } from 'node:os'
|
||||
import { join } from 'node:path'
|
||||
import { beforeEach, describe, expect, it } from 'vitest'
|
||||
import { FILE_NAME } from '../../../config/schema.js'
|
||||
import { afterEach, beforeEach, describe, expect, it } from 'vitest'
|
||||
import { loadConfig } from '../../../config/config-loader.js'
|
||||
import { isBaseError } from '../../../errors/base.js'
|
||||
import { ErrorCode } from '../../../errors/codes.js'
|
||||
import { YamlStore } from '../../../store/store.js'
|
||||
import { ENV_CONFIG_DIR } from '../../../store/dir.js'
|
||||
import { getConfigurationStore } from '../../../store/manager.js'
|
||||
import { runConfigUnset } from './run.js'
|
||||
|
||||
function makeStore(dir: string): YamlStore {
|
||||
return new YamlStore(join(dir, FILE_NAME))
|
||||
}
|
||||
|
||||
describe('runConfigUnset', () => {
|
||||
let dir: string
|
||||
let prevConfigDir: string | undefined
|
||||
|
||||
beforeEach(async () => {
|
||||
dir = await mkdtemp(join(tmpdir(), 'difyctl-unset-'))
|
||||
prevConfigDir = process.env[ENV_CONFIG_DIR]
|
||||
process.env[ENV_CONFIG_DIR] = dir
|
||||
})
|
||||
|
||||
it('clears the requested key, leaves others intact', async () => {
|
||||
await writeFile(
|
||||
join(dir, FILE_NAME),
|
||||
'schema_version: 1\ndefaults:\n format: json\n limit: 25\n',
|
||||
'utf8',
|
||||
)
|
||||
const out = runConfigUnset({ store: makeStore(dir), key: 'defaults.format' })
|
||||
expect(out).toBe('unset defaults.format\n')
|
||||
const raw = await readFile(join(dir, FILE_NAME), 'utf8')
|
||||
expect(raw).not.toContain('format:')
|
||||
expect(raw).toContain('limit: 25')
|
||||
afterEach(async () => {
|
||||
if (prevConfigDir === undefined)
|
||||
delete process.env[ENV_CONFIG_DIR]
|
||||
else
|
||||
process.env[ENV_CONFIG_DIR] = prevConfigDir
|
||||
await rm(dir, { recursive: true, force: true })
|
||||
})
|
||||
|
||||
it('is a no-op (writes empty config) when key was already unset', async () => {
|
||||
const out = runConfigUnset({ store: makeStore(dir), key: 'defaults.format' })
|
||||
it('clears the requested key, leaves others intact', () => {
|
||||
getConfigurationStore().setTyped({
|
||||
schema_version: 1,
|
||||
defaults: { format: 'json', limit: 25 },
|
||||
})
|
||||
const out = runConfigUnset({ store: getConfigurationStore(), key: 'defaults.format' })
|
||||
expect(out).toBe('unset defaults.format\n')
|
||||
const raw = await readFile(join(dir, FILE_NAME), 'utf8')
|
||||
expect(raw).toContain('schema_version: 1')
|
||||
|
||||
const r = loadConfig(getConfigurationStore())
|
||||
expect(r.found).toBe(true)
|
||||
if (r.found) {
|
||||
expect(r.config.defaults.format).not.toBe('json')
|
||||
expect(r.config.defaults.limit).toBe(25)
|
||||
}
|
||||
})
|
||||
|
||||
it('is a no-op (writes empty config) when key was already unset', () => {
|
||||
const out = runConfigUnset({ store: getConfigurationStore(), key: 'defaults.format' })
|
||||
expect(out).toBe('unset defaults.format\n')
|
||||
const r = loadConfig(getConfigurationStore())
|
||||
expect(r.found).toBe(true)
|
||||
if (r.found)
|
||||
expect(r.config.schema_version).toBe(1)
|
||||
})
|
||||
|
||||
it('rejects unknown key', () => {
|
||||
let caught: unknown
|
||||
try {
|
||||
runConfigUnset({ store: makeStore(dir), key: 'bogus' })
|
||||
runConfigUnset({ store: getConfigurationStore(), key: 'bogus' })
|
||||
}
|
||||
catch (err) { caught = err }
|
||||
expect(isBaseError(caught)).toBe(true)
|
||||
|
||||
@ -1,67 +1,69 @@
|
||||
import { mkdtemp, writeFile } from 'node:fs/promises'
|
||||
import { mkdtemp, rm } from 'node:fs/promises'
|
||||
import { tmpdir } from 'node:os'
|
||||
import { join } from 'node:path'
|
||||
import { afterEach, beforeEach, describe, expect, it } from 'vitest'
|
||||
import { FILE_NAME } from '../../../config/schema.js'
|
||||
import { YamlStore } from '../../../store/store.js'
|
||||
import { ENV_CONFIG_DIR } from '../../../store/dir.js'
|
||||
import { getConfigurationStore } from '../../../store/manager.js'
|
||||
import { runConfigView } from './run.js'
|
||||
|
||||
function makeStore(dir: string): YamlStore {
|
||||
return new YamlStore(join(dir, FILE_NAME))
|
||||
}
|
||||
|
||||
describe('runConfigView', () => {
|
||||
let dir: string
|
||||
let prevConfigDir: string | undefined
|
||||
|
||||
beforeEach(async () => {
|
||||
dir = await mkdtemp(join(tmpdir(), 'difyctl-view-'))
|
||||
prevConfigDir = process.env[ENV_CONFIG_DIR]
|
||||
process.env[ENV_CONFIG_DIR] = dir
|
||||
})
|
||||
|
||||
afterEach(async () => {
|
||||
// tmpdir cleanup is best-effort
|
||||
if (prevConfigDir === undefined)
|
||||
delete process.env[ENV_CONFIG_DIR]
|
||||
else
|
||||
process.env[ENV_CONFIG_DIR] = prevConfigDir
|
||||
await rm(dir, { recursive: true, force: true })
|
||||
})
|
||||
|
||||
it('text format: empty config returns empty string', () => {
|
||||
const out = runConfigView({ store: makeStore(dir) })
|
||||
const out = runConfigView({ store: getConfigurationStore() })
|
||||
expect(out).toBe('')
|
||||
})
|
||||
|
||||
it('text format: emits "key = value" lines for set keys only', async () => {
|
||||
await writeFile(
|
||||
join(dir, FILE_NAME),
|
||||
'schema_version: 1\ndefaults:\n format: json\n limit: 50\nstate:\n current_app: app-1\n',
|
||||
'utf8',
|
||||
)
|
||||
const out = runConfigView({ store: makeStore(dir) })
|
||||
it('text format: emits "key = value" lines for set keys only', () => {
|
||||
getConfigurationStore().setTyped({
|
||||
schema_version: 1,
|
||||
defaults: { format: 'json', limit: 50 },
|
||||
state: { current_app: 'app-1' },
|
||||
})
|
||||
const out = runConfigView({ store: getConfigurationStore() })
|
||||
expect(out).toBe(
|
||||
'defaults.format = json\ndefaults.limit = 50\nstate.current_app = app-1\n',
|
||||
)
|
||||
})
|
||||
|
||||
it('text format: skips unset keys', async () => {
|
||||
await writeFile(
|
||||
join(dir, FILE_NAME),
|
||||
'schema_version: 1\ndefaults:\n format: yaml\n',
|
||||
'utf8',
|
||||
)
|
||||
const out = runConfigView({ store: makeStore(dir) })
|
||||
it('text format: skips unset keys', () => {
|
||||
getConfigurationStore().setTyped({
|
||||
schema_version: 1,
|
||||
defaults: { format: 'yaml' },
|
||||
})
|
||||
const out = runConfigView({ store: getConfigurationStore() })
|
||||
expect(out).toBe('defaults.format = yaml\n')
|
||||
expect(out).not.toContain('defaults.limit')
|
||||
expect(out).not.toContain('state.current_app')
|
||||
})
|
||||
|
||||
it('json format: empty config returns "{}\\n"', () => {
|
||||
const out = runConfigView({ store: makeStore(dir), json: true })
|
||||
const out = runConfigView({ store: getConfigurationStore(), json: true })
|
||||
expect(out).toBe('{}\n')
|
||||
})
|
||||
|
||||
it('json format: defaults.limit is numeric, others are strings', async () => {
|
||||
await writeFile(
|
||||
join(dir, FILE_NAME),
|
||||
'schema_version: 1\ndefaults:\n format: table\n limit: 100\nstate:\n current_app: app-x\n',
|
||||
'utf8',
|
||||
)
|
||||
const out = runConfigView({ store: makeStore(dir), json: true })
|
||||
it('json format: defaults.limit is numeric, others are strings', () => {
|
||||
getConfigurationStore().setTyped({
|
||||
schema_version: 1,
|
||||
defaults: { format: 'table', limit: 100 },
|
||||
state: { current_app: 'app-x' },
|
||||
})
|
||||
const out = runConfigView({ store: getConfigurationStore(), json: true })
|
||||
const parsed = JSON.parse(out) as Record<string, unknown>
|
||||
expect(parsed['defaults.format']).toBe('table')
|
||||
expect(parsed['defaults.limit']).toBe(100)
|
||||
@ -69,7 +71,7 @@ describe('runConfigView', () => {
|
||||
})
|
||||
|
||||
it('json format: trailing newline matches Go encoder.Encode', () => {
|
||||
const out = runConfigView({ store: makeStore(dir), json: true })
|
||||
const out = runConfigView({ store: getConfigurationStore(), json: true })
|
||||
expect(out.endsWith('\n')).toBe(true)
|
||||
})
|
||||
})
|
||||
|
||||
@ -8,8 +8,8 @@ import { startMock } from '../../../../test/fixtures/dify-mock/server.js'
|
||||
import { loadAppInfoCache } from '../../../cache/app-info.js'
|
||||
import { formatted, stringifyOutput } from '../../../framework/output.js'
|
||||
import { createClient } from '../../../http/client.js'
|
||||
import { CACHE_APP_INFO, cachePath } from '../../../store/manager.js'
|
||||
import { YamlStore } from '../../../store/store.js'
|
||||
import { ENV_CACHE_DIR } from '../../../store/dir.js'
|
||||
import { CACHE_APP_INFO, getCache } from '../../../store/manager.js'
|
||||
import { runDescribeApp } from './run.js'
|
||||
|
||||
function bundle(): HostsBundle {
|
||||
@ -29,17 +29,24 @@ function bundle(): HostsBundle {
|
||||
describe('runDescribeApp', () => {
|
||||
let mock: DifyMock
|
||||
let dir: string
|
||||
let prevCacheDir: string | undefined
|
||||
beforeEach(async () => {
|
||||
mock = await startMock({ scenario: 'happy' })
|
||||
dir = await mkdtemp(join(tmpdir(), 'difyctl-desc-'))
|
||||
prevCacheDir = process.env[ENV_CACHE_DIR]
|
||||
process.env[ENV_CACHE_DIR] = dir
|
||||
})
|
||||
afterEach(async () => {
|
||||
if (prevCacheDir === undefined)
|
||||
delete process.env[ENV_CACHE_DIR]
|
||||
else
|
||||
process.env[ENV_CACHE_DIR] = prevCacheDir
|
||||
await mock.stop()
|
||||
await rm(dir, { recursive: true, force: true })
|
||||
})
|
||||
|
||||
async function render(opts: Parameters<typeof runDescribeApp>[0]): Promise<string> {
|
||||
const cache = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
|
||||
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
|
||||
const data = await runDescribeApp(
|
||||
opts,
|
||||
{ bundle: bundle(), http: createClient({ host: mock.url, bearer: 'dfoa_test' }), host: mock.url, cache },
|
||||
@ -82,7 +89,7 @@ describe('runDescribeApp', () => {
|
||||
})
|
||||
|
||||
it('refresh: bypasses cache', async () => {
|
||||
const cache = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
|
||||
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
|
||||
await runDescribeApp(
|
||||
{ appId: 'app-1' },
|
||||
{ bundle: bundle(), http: createClient({ host: mock.url, bearer: 'dfoa_test' }), host: mock.url, cache },
|
||||
|
||||
@ -7,8 +7,8 @@ import { afterEach, beforeEach, describe, expect, it } from 'vitest'
|
||||
import { startMock } from '../../../../test/fixtures/dify-mock/server.js'
|
||||
import { loadAppInfoCache } from '../../../cache/app-info.js'
|
||||
import { createClient } from '../../../http/client.js'
|
||||
import { CACHE_APP_INFO, cachePath } from '../../../store/manager.js'
|
||||
import { YamlStore } from '../../../store/store.js'
|
||||
import { ENV_CACHE_DIR } from '../../../store/dir.js'
|
||||
import { CACHE_APP_INFO, getCache } from '../../../store/manager.js'
|
||||
import { bufferStreams } from '../../../sys/io/streams'
|
||||
import { resumeApp } from '../../resume/app/run.js'
|
||||
import { runApp } from './run.js'
|
||||
@ -30,18 +30,25 @@ function bundle(): HostsBundle {
|
||||
describe('runApp', () => {
|
||||
let mock: DifyMock
|
||||
let dir: string
|
||||
let prevCacheDir: string | undefined
|
||||
beforeEach(async () => {
|
||||
mock = await startMock({ scenario: 'happy' })
|
||||
dir = await mkdtemp(join(tmpdir(), 'difyctl-runapp-'))
|
||||
prevCacheDir = process.env[ENV_CACHE_DIR]
|
||||
process.env[ENV_CACHE_DIR] = dir
|
||||
})
|
||||
afterEach(async () => {
|
||||
if (prevCacheDir === undefined)
|
||||
delete process.env[ENV_CACHE_DIR]
|
||||
else
|
||||
process.env[ENV_CACHE_DIR] = prevCacheDir
|
||||
await mock.stop()
|
||||
await rm(dir, { recursive: true, force: true })
|
||||
})
|
||||
|
||||
it('chat: prints answer + conversation hint to stderr', async () => {
|
||||
const io = bufferStreams()
|
||||
const cache = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
|
||||
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
|
||||
await runApp(
|
||||
{ appId: 'app-1', message: 'hi' },
|
||||
{ bundle: bundle(), http: createClient({ host: mock.url, bearer: 'dfoa_test' }), host: mock.url, io, cache },
|
||||
@ -52,7 +59,7 @@ describe('runApp', () => {
|
||||
|
||||
it('workflow: rejects positional message with usage error', async () => {
|
||||
const io = bufferStreams()
|
||||
const cache = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
|
||||
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
|
||||
await expect(runApp(
|
||||
{ appId: 'app-2', message: 'hi' },
|
||||
{ bundle: bundle(), http: createClient({ host: mock.url, bearer: 'dfoa_test' }), host: mock.url, io, cache },
|
||||
@ -61,7 +68,7 @@ describe('runApp', () => {
|
||||
|
||||
it('workflow: prints single-string output as plain text', async () => {
|
||||
const io = bufferStreams()
|
||||
const cache = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
|
||||
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
|
||||
await runApp(
|
||||
{ appId: 'app-2', inputs: { x: '1' } },
|
||||
{ bundle: bundle(), http: createClient({ host: mock.url, bearer: 'dfoa_test' }), host: mock.url, io, cache },
|
||||
@ -71,7 +78,7 @@ describe('runApp', () => {
|
||||
|
||||
it('json: passes through full envelope', async () => {
|
||||
const io = bufferStreams()
|
||||
const cache = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
|
||||
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
|
||||
await runApp(
|
||||
{ appId: 'app-1', message: 'hi', format: 'json' },
|
||||
{ bundle: bundle(), http: createClient({ host: mock.url, bearer: 'dfoa_test' }), host: mock.url, io, cache },
|
||||
@ -104,7 +111,7 @@ describe('runApp', () => {
|
||||
|
||||
it('--stream chat: streams answer to stdout and hint to stderr', async () => {
|
||||
const io = bufferStreams()
|
||||
const cache = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
|
||||
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
|
||||
await runApp(
|
||||
{ appId: 'app-1', message: 'hi', stream: true },
|
||||
{ bundle: bundle(), http: createClient({ host: mock.url, bearer: 'dfoa_test' }), host: mock.url, io, cache },
|
||||
@ -116,7 +123,7 @@ describe('runApp', () => {
|
||||
|
||||
it('--stream -o json chat: aggregates into blocking-shape envelope', async () => {
|
||||
const io = bufferStreams()
|
||||
const cache = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
|
||||
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
|
||||
await runApp(
|
||||
{ appId: 'app-1', message: 'hi', stream: true, format: 'json' },
|
||||
{ bundle: bundle(), http: createClient({ host: mock.url, bearer: 'dfoa_test' }), host: mock.url, io, cache },
|
||||
@ -129,7 +136,7 @@ describe('runApp', () => {
|
||||
|
||||
it('agent-chat without --stream: collects and prints answer', async () => {
|
||||
const io = bufferStreams()
|
||||
const cache = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
|
||||
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
|
||||
await runApp(
|
||||
{ appId: 'app-4', workspace: 'ws-2', message: 'do research' },
|
||||
{ bundle: bundle(), http: createClient({ host: mock.url, bearer: 'dfoa_test' }), host: mock.url, io, cache },
|
||||
@ -140,7 +147,7 @@ describe('runApp', () => {
|
||||
|
||||
it('agent-chat with --stream: live-prints answer and thoughts to stderr', async () => {
|
||||
const io = bufferStreams()
|
||||
const cache = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
|
||||
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
|
||||
await runApp(
|
||||
{ appId: 'app-4', workspace: 'ws-2', message: 'go', stream: true },
|
||||
{ bundle: bundle(), http: createClient({ host: mock.url, bearer: 'dfoa_test' }), host: mock.url, io, cache },
|
||||
@ -151,7 +158,7 @@ describe('runApp', () => {
|
||||
|
||||
it('--stream workflow -o json: aggregates from workflow_finished', async () => {
|
||||
const io = bufferStreams()
|
||||
const cache = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
|
||||
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
|
||||
await runApp(
|
||||
{ appId: 'app-2', inputs: { x: '1' }, stream: true, format: 'json' },
|
||||
{ bundle: bundle(), http: createClient({ host: mock.url, bearer: 'dfoa_test' }), host: mock.url, io, cache },
|
||||
@ -164,7 +171,7 @@ describe('runApp', () => {
|
||||
it('stream-error scenario: error event surfaces typed BaseError', async () => {
|
||||
mock.setScenario('stream-error')
|
||||
const io = bufferStreams()
|
||||
const cache = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
|
||||
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
|
||||
await expect(runApp(
|
||||
{ appId: 'app-1', message: 'hi', stream: true },
|
||||
{ bundle: bundle(), http: createClient({ host: mock.url, bearer: 'dfoa_test', retryAttempts: 0 }), host: mock.url, io, cache },
|
||||
@ -173,7 +180,7 @@ describe('runApp', () => {
|
||||
|
||||
it('--inputs-file: reads inputs from file', async () => {
|
||||
const io = bufferStreams()
|
||||
const cache = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
|
||||
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
|
||||
const inputsFile = join(dir, 'inputs.json')
|
||||
const { writeFile } = await import('node:fs/promises')
|
||||
await writeFile(inputsFile, JSON.stringify({ x: 'from-file' }))
|
||||
@ -197,7 +204,7 @@ describe('runApp', () => {
|
||||
|
||||
it('--inputs: accepts JSON object string', async () => {
|
||||
const io = bufferStreams()
|
||||
const cache = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
|
||||
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
|
||||
await runApp(
|
||||
{ appId: 'app-2', inputsJson: '{"x":"hello"}' },
|
||||
{ bundle: bundle(), http: createClient({ host: mock.url, bearer: 'dfoa_test' }), host: mock.url, io, cache },
|
||||
@ -219,7 +226,7 @@ describe('runApp', () => {
|
||||
it('hitl pause (text): writes readable block to stdout, hint to stderr, exits 0', async () => {
|
||||
mock.setScenario('hitl-pause')
|
||||
const io = bufferStreams()
|
||||
const cache = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
|
||||
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
|
||||
let exitCode = -1
|
||||
await expect(runApp(
|
||||
{ appId: 'app-2', inputs: {} },
|
||||
@ -248,7 +255,7 @@ describe('runApp', () => {
|
||||
it('hitl pause (json): writes JSON envelope to stdout, exits 0', async () => {
|
||||
mock.setScenario('hitl-pause')
|
||||
const io = bufferStreams()
|
||||
const cache = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
|
||||
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
|
||||
let exitCode = -1
|
||||
await expect(runApp(
|
||||
{ appId: 'app-2', inputs: {}, format: 'json' },
|
||||
@ -274,7 +281,7 @@ describe('runApp', () => {
|
||||
it('resume: withHistory: false completes successfully', async () => {
|
||||
mock.setScenario('hitl-resume')
|
||||
const io = bufferStreams()
|
||||
const cache = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
|
||||
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
|
||||
await resumeApp(
|
||||
{ appId: 'app-2', formToken: 'ft-hitl-1', workflowRunId: 'wf-run-hitl-1', action: 'submit', inputs: {}, withHistory: false },
|
||||
{ bundle: bundle(), http: createClient({ host: mock.url, bearer: 'dfoa_test' }), host: mock.url, io, cache },
|
||||
@ -285,7 +292,7 @@ describe('runApp', () => {
|
||||
it('resume: submits form and streams workflow to completion', async () => {
|
||||
mock.setScenario('hitl-resume')
|
||||
const io = bufferStreams()
|
||||
const cache = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
|
||||
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
|
||||
await resumeApp(
|
||||
{ appId: 'app-2', formToken: 'ft-hitl-1', workflowRunId: 'wf-run-hitl-1', action: 'submit', inputs: {} },
|
||||
{ bundle: bundle(), http: createClient({ host: mock.url, bearer: 'dfoa_test' }), host: mock.url, io, cache },
|
||||
@ -296,7 +303,7 @@ describe('runApp', () => {
|
||||
it('resume --stream: live-prints workflow node progress to stderr', async () => {
|
||||
mock.setScenario('hitl-resume')
|
||||
const io = bufferStreams()
|
||||
const cache = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
|
||||
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
|
||||
await resumeApp(
|
||||
{ appId: 'app-2', formToken: 'ft-hitl-1', workflowRunId: 'wf-run-hitl-1', action: 'submit', inputs: {}, stream: true },
|
||||
{ bundle: bundle(), http: createClient({ host: mock.url, bearer: 'dfoa_test' }), host: mock.url, io, cache },
|
||||
@ -307,7 +314,7 @@ describe('runApp', () => {
|
||||
|
||||
it('workflow: --file remote URL is passed as remote_url input variable', async () => {
|
||||
const io = bufferStreams()
|
||||
const cache = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
|
||||
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
|
||||
await runApp(
|
||||
{ appId: 'app-2', files: ['doc=https://example.com/report.pdf'] },
|
||||
{ bundle: bundle(), http: createClient({ host: mock.url, bearer: 'dfoa_test' }), host: mock.url, io, cache },
|
||||
@ -326,7 +333,7 @@ describe('runApp', () => {
|
||||
it('workflow: --file @path uploads file and passes local_file input variable', async () => {
|
||||
const { writeFile } = await import('node:fs/promises')
|
||||
const io = bufferStreams()
|
||||
const cache = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
|
||||
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
|
||||
const filePath = join(dir, 'test.pdf')
|
||||
await writeFile(filePath, 'fake pdf content')
|
||||
await runApp(
|
||||
@ -345,7 +352,7 @@ describe('runApp', () => {
|
||||
|
||||
it('workflow: --file overrides same-named key from --inputs (file wins)', async () => {
|
||||
const io = bufferStreams()
|
||||
const cache = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
|
||||
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
|
||||
await runApp(
|
||||
{ appId: 'app-2', inputs: { doc: 'old-value' }, files: ['doc=https://example.com/override.pdf'] },
|
||||
{ bundle: bundle(), http: createClient({ host: mock.url, bearer: 'dfoa_test' }), host: mock.url, io, cache },
|
||||
|
||||
@ -22,7 +22,6 @@ export default class UseWorkspace extends DifyCommand {
|
||||
const { args, flags } = this.parse(UseWorkspace, argv)
|
||||
const ctx = await this.authedCtx({ retryFlag: flags['http-retry'] })
|
||||
await runUseWorkspace({ workspaceId: args.workspaceId }, {
|
||||
configDir: ctx.configDir,
|
||||
bundle: ctx.bundle,
|
||||
http: ctx.http,
|
||||
io: ctx.io,
|
||||
|
||||
@ -9,6 +9,7 @@ import { tmpdir } from 'node:os'
|
||||
import { join } from 'node:path'
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
|
||||
import { loadHosts, saveHosts } from '../../../auth/hosts.js'
|
||||
import { ENV_CONFIG_DIR } from '../../../store/dir.js'
|
||||
import { bufferStreams } from '../../../sys/io/streams.js'
|
||||
import { runUseWorkspace } from './use.js'
|
||||
|
||||
@ -51,23 +52,29 @@ function fakeClient(opts: {
|
||||
describe('runUseWorkspace', () => {
|
||||
let configDir: string
|
||||
|
||||
let prevConfigDir: string | undefined
|
||||
beforeEach(async () => {
|
||||
configDir = await mkdtemp(join(tmpdir(), 'difyctl-use-workspace-'))
|
||||
prevConfigDir = process.env[ENV_CONFIG_DIR]
|
||||
process.env[ENV_CONFIG_DIR] = configDir
|
||||
})
|
||||
afterEach(async () => {
|
||||
if (prevConfigDir === undefined)
|
||||
delete process.env[ENV_CONFIG_DIR]
|
||||
else
|
||||
process.env[ENV_CONFIG_DIR] = prevConfigDir
|
||||
await rm(configDir, { recursive: true, force: true })
|
||||
})
|
||||
|
||||
it('happy path: POST /switch → GET /workspaces → write hosts.yml', async () => {
|
||||
const io = bufferStreams()
|
||||
const b = bundle()
|
||||
await saveHosts(configDir, b)
|
||||
saveHosts(b)
|
||||
const client = fakeClient({})
|
||||
|
||||
const next = await runUseWorkspace(
|
||||
{ workspaceId: 'ws-2' },
|
||||
{
|
||||
configDir,
|
||||
bundle: b,
|
||||
http: {} as KyInstance,
|
||||
io,
|
||||
@ -82,7 +89,7 @@ describe('runUseWorkspace', () => {
|
||||
{ id: 'ws-1', name: 'Default', role: 'owner' },
|
||||
{ id: 'ws-2', name: 'Switched', role: 'normal' },
|
||||
])
|
||||
const reloaded = await loadHosts(configDir)
|
||||
const reloaded = loadHosts()
|
||||
expect(reloaded?.workspace?.id).toBe('ws-2')
|
||||
expect(reloaded?.workspace?.name).toBe('Switched')
|
||||
expect(io.outBuf()).toMatch(/Switched to Switched \(ws-2\)/)
|
||||
@ -93,15 +100,15 @@ describe('runUseWorkspace', () => {
|
||||
// We expect saveHosts to record the fresh name from the server.
|
||||
const io = bufferStreams()
|
||||
const b = bundle()
|
||||
await saveHosts(configDir, b)
|
||||
saveHosts(b)
|
||||
const client = fakeClient({})
|
||||
|
||||
await runUseWorkspace(
|
||||
{ workspaceId: 'ws-2' },
|
||||
{ configDir, bundle: b, http: {} as KyInstance, io, workspacesFactory: () => client as never },
|
||||
{ bundle: b, http: {} as KyInstance, io, workspacesFactory: () => client as never },
|
||||
)
|
||||
|
||||
const reloaded = await loadHosts(configDir)
|
||||
const reloaded = loadHosts()
|
||||
expect(reloaded?.workspace?.name).toBe('Switched')
|
||||
expect(reloaded?.available_workspaces?.find(w => w.id === 'ws-2')?.name).toBe('Switched')
|
||||
})
|
||||
@ -109,8 +116,8 @@ describe('runUseWorkspace', () => {
|
||||
it('does NOT mutate hosts.yml when POST /switch fails', async () => {
|
||||
const io = bufferStreams()
|
||||
const b = bundle()
|
||||
await saveHosts(configDir, b)
|
||||
const before = await loadHosts(configDir)
|
||||
saveHosts(b)
|
||||
const before = loadHosts()
|
||||
|
||||
const client = fakeClient({
|
||||
switch: () => Promise.reject(new Error('forbidden')),
|
||||
@ -120,7 +127,6 @@ describe('runUseWorkspace', () => {
|
||||
runUseWorkspace(
|
||||
{ workspaceId: 'ws-2' },
|
||||
{
|
||||
configDir,
|
||||
bundle: b,
|
||||
http: {} as KyInstance,
|
||||
io,
|
||||
@ -130,7 +136,7 @@ describe('runUseWorkspace', () => {
|
||||
).rejects.toThrow(/forbidden/)
|
||||
|
||||
expect(client.list).not.toHaveBeenCalled()
|
||||
const after = await loadHosts(configDir)
|
||||
const after = loadHosts()
|
||||
expect(after).toEqual(before)
|
||||
expect(after?.workspace?.id).toBe('ws-1')
|
||||
})
|
||||
@ -138,8 +144,8 @@ describe('runUseWorkspace', () => {
|
||||
it('does NOT mutate hosts.yml when GET /workspaces fails after switch', async () => {
|
||||
const io = bufferStreams()
|
||||
const b = bundle()
|
||||
await saveHosts(configDir, b)
|
||||
const before = await loadHosts(configDir)
|
||||
saveHosts(b)
|
||||
const before = loadHosts()
|
||||
|
||||
const client = fakeClient({
|
||||
list: () => Promise.reject(new Error('transient list failure')),
|
||||
@ -149,7 +155,6 @@ describe('runUseWorkspace', () => {
|
||||
runUseWorkspace(
|
||||
{ workspaceId: 'ws-2' },
|
||||
{
|
||||
configDir,
|
||||
bundle: b,
|
||||
http: {} as KyInstance,
|
||||
io,
|
||||
@ -158,14 +163,14 @@ describe('runUseWorkspace', () => {
|
||||
),
|
||||
).rejects.toThrow(/transient list failure/)
|
||||
|
||||
const after = await loadHosts(configDir)
|
||||
const after = loadHosts()
|
||||
expect(after).toEqual(before)
|
||||
})
|
||||
|
||||
it('throws when server returns switch=<id> but id is missing from /workspaces list', async () => {
|
||||
const io = bufferStreams()
|
||||
const b = bundle()
|
||||
await saveHosts(configDir, b)
|
||||
saveHosts(b)
|
||||
|
||||
const client = fakeClient({
|
||||
switch: () => Promise.resolve({
|
||||
@ -187,7 +192,6 @@ describe('runUseWorkspace', () => {
|
||||
runUseWorkspace(
|
||||
{ workspaceId: 'ws-7' },
|
||||
{
|
||||
configDir,
|
||||
bundle: b,
|
||||
http: {} as KyInstance,
|
||||
io,
|
||||
|
||||
@ -13,7 +13,6 @@ export type UseWorkspaceOptions = {
|
||||
}
|
||||
|
||||
export type UseWorkspaceDeps = {
|
||||
readonly configDir: string
|
||||
readonly bundle: HostsBundle
|
||||
readonly http: KyInstance
|
||||
readonly io: IOStreams
|
||||
@ -70,7 +69,7 @@ export async function runUseWorkspace(
|
||||
role: w.role,
|
||||
})),
|
||||
}
|
||||
await saveHosts(deps.configDir, next)
|
||||
saveHosts(next)
|
||||
deps.io.out.write(`${cs.successIcon()} Switched to ${matched.name} (${matched.id})\n`)
|
||||
return next
|
||||
}
|
||||
|
||||
@ -1,48 +1,52 @@
|
||||
import { mkdir, mkdtemp, writeFile } from 'node:fs/promises'
|
||||
import type { YamlStore } from '../store/store'
|
||||
import { mkdtemp, rm } from 'node:fs/promises'
|
||||
import { tmpdir } from 'node:os'
|
||||
import { join } from 'node:path'
|
||||
import { afterEach, beforeEach, describe, expect, it } from 'vitest'
|
||||
import { isBaseError } from '../errors/base'
|
||||
import { ErrorCode } from '../errors/codes'
|
||||
import { YamlStore } from '../store/store'
|
||||
import { ENV_CONFIG_DIR } from '../store/dir'
|
||||
import { getConfigurationStore } from '../store/manager'
|
||||
import { loadConfig } from './config-loader'
|
||||
import { FILE_NAME } from './schema'
|
||||
|
||||
function makeStore(dir: string): YamlStore {
|
||||
return new YamlStore(join(dir, FILE_NAME))
|
||||
}
|
||||
|
||||
describe('loadConfig', () => {
|
||||
let dir: string
|
||||
let prevConfigDir: string | undefined
|
||||
|
||||
beforeEach(async () => {
|
||||
dir = await mkdtemp(join(tmpdir(), 'difyctl-cfg-'))
|
||||
prevConfigDir = process.env[ENV_CONFIG_DIR]
|
||||
process.env[ENV_CONFIG_DIR] = dir
|
||||
})
|
||||
|
||||
afterEach(async () => {
|
||||
await mkdir(dir, { recursive: true }).catch(() => {})
|
||||
if (prevConfigDir === undefined)
|
||||
delete process.env[ENV_CONFIG_DIR]
|
||||
else
|
||||
process.env[ENV_CONFIG_DIR] = prevConfigDir
|
||||
await rm(dir, { recursive: true, force: true })
|
||||
})
|
||||
|
||||
it('returns found:false when config.yml is missing', () => {
|
||||
const r = loadConfig(makeStore(dir))
|
||||
it('returns found:false when config is missing', () => {
|
||||
const r = loadConfig(getConfigurationStore())
|
||||
expect(r.found).toBe(false)
|
||||
})
|
||||
|
||||
it('parses a minimal valid config.yml', async () => {
|
||||
await writeFile(join(dir, FILE_NAME), 'schema_version: 1\n', 'utf8')
|
||||
const r = loadConfig(makeStore(dir))
|
||||
it('parses a minimal valid config', () => {
|
||||
getConfigurationStore().setTyped({ schema_version: 1 })
|
||||
const r = loadConfig(getConfigurationStore())
|
||||
expect(r.found).toBe(true)
|
||||
if (r.found)
|
||||
expect(r.config.schema_version).toBe(1)
|
||||
})
|
||||
|
||||
it('parses defaults + state', async () => {
|
||||
await writeFile(
|
||||
join(dir, FILE_NAME),
|
||||
'schema_version: 1\ndefaults:\n format: json\n limit: 100\nstate:\n current_app: app-1\n',
|
||||
'utf8',
|
||||
)
|
||||
const r = loadConfig(makeStore(dir))
|
||||
it('parses defaults + state', () => {
|
||||
getConfigurationStore().setTyped({
|
||||
schema_version: 1,
|
||||
defaults: { format: 'json', limit: 100 },
|
||||
state: { current_app: 'app-1' },
|
||||
})
|
||||
const r = loadConfig(getConfigurationStore())
|
||||
expect(r.found).toBe(true)
|
||||
if (r.found) {
|
||||
expect(r.config.defaults.format).toBe('json')
|
||||
@ -51,11 +55,29 @@ describe('loadConfig', () => {
|
||||
}
|
||||
})
|
||||
|
||||
it('throws BaseError(config_schema_unsupported) when YAML is malformed', async () => {
|
||||
await writeFile(join(dir, FILE_NAME), '::not yaml::: {{[', 'utf8')
|
||||
it('throws BaseError(config_schema_unsupported) when the store fails to parse the file', () => {
|
||||
// Simulate a corrupt on-disk file via a fake store; loadConfig must wrap
|
||||
// the underlying error as ConfigSchemaUnsupported.
|
||||
const throwingStore = {
|
||||
getTyped: () => { throw new Error('YAML parse failure') },
|
||||
} as unknown as YamlStore
|
||||
let caught: unknown
|
||||
try {
|
||||
loadConfig(makeStore(dir))
|
||||
loadConfig(throwingStore)
|
||||
}
|
||||
catch (err) { caught = err }
|
||||
expect(isBaseError(caught)).toBe(true)
|
||||
if (isBaseError(caught)) {
|
||||
expect(caught.code).toBe(ErrorCode.ConfigSchemaUnsupported)
|
||||
expect(caught.hint).toMatch(/not valid YAML/)
|
||||
}
|
||||
})
|
||||
|
||||
it('throws BaseError(config_schema_unsupported) when zod validation fails', () => {
|
||||
getConfigurationStore().setTyped({ defaults: { limit: 9999 } })
|
||||
let caught: unknown
|
||||
try {
|
||||
loadConfig(getConfigurationStore())
|
||||
}
|
||||
catch (err) { caught = err }
|
||||
expect(isBaseError(caught)).toBe(true)
|
||||
@ -63,23 +85,11 @@ describe('loadConfig', () => {
|
||||
expect(caught.code).toBe(ErrorCode.ConfigSchemaUnsupported)
|
||||
})
|
||||
|
||||
it('throws BaseError(config_schema_unsupported) when zod validation fails', async () => {
|
||||
await writeFile(join(dir, FILE_NAME), 'defaults:\n limit: 9999\n', 'utf8')
|
||||
it('throws BaseError(config_schema_unsupported) when schema_version > 1 (forward-refuse)', () => {
|
||||
getConfigurationStore().setTyped({ schema_version: 2 })
|
||||
let caught: unknown
|
||||
try {
|
||||
loadConfig(makeStore(dir))
|
||||
}
|
||||
catch (err) { caught = err }
|
||||
expect(isBaseError(caught)).toBe(true)
|
||||
if (isBaseError(caught))
|
||||
expect(caught.code).toBe(ErrorCode.ConfigSchemaUnsupported)
|
||||
})
|
||||
|
||||
it('throws BaseError(config_schema_unsupported) when schema_version > 1 (forward-refuse)', async () => {
|
||||
await writeFile(join(dir, FILE_NAME), 'schema_version: 2\n', 'utf8')
|
||||
let caught: unknown
|
||||
try {
|
||||
loadConfig(makeStore(dir))
|
||||
loadConfig(getConfigurationStore())
|
||||
}
|
||||
catch (err) { caught = err }
|
||||
expect(isBaseError(caught)).toBe(true)
|
||||
|
||||
@ -1,10 +1,10 @@
|
||||
import { describe, expect, it } from 'vitest'
|
||||
import { CONFIG_FILE_NAME } from '../store/manager.js'
|
||||
import {
|
||||
ALLOWED_FORMATS,
|
||||
ConfigFileSchema,
|
||||
CURRENT_SCHEMA_VERSION,
|
||||
emptyConfig,
|
||||
FILE_NAME,
|
||||
} from './schema.js'
|
||||
|
||||
describe('config schema', () => {
|
||||
@ -12,8 +12,8 @@ describe('config schema', () => {
|
||||
expect(CURRENT_SCHEMA_VERSION).toBe(1)
|
||||
})
|
||||
|
||||
it('FILE_NAME is config.yml', () => {
|
||||
expect(FILE_NAME).toBe('config.yml')
|
||||
it('CONFIG_FILE_NAME is config.yml', () => {
|
||||
expect(CONFIG_FILE_NAME).toBe('config.yml')
|
||||
})
|
||||
|
||||
it('ALLOWED_FORMATS matches Go set (json/yaml/table/wide/name/text)', () => {
|
||||
|
||||
@ -1,7 +1,6 @@
|
||||
import { z } from 'zod'
|
||||
|
||||
export const CURRENT_SCHEMA_VERSION = 1
|
||||
export const FILE_NAME = 'config.yml'
|
||||
|
||||
export const ALLOWED_FORMATS = ['json', 'yaml', 'table', 'wide', 'name', 'text'] as const
|
||||
export type AllowedFormat = (typeof ALLOWED_FORMATS)[number]
|
||||
|
||||
@ -1,45 +1,57 @@
|
||||
import { mkdtemp, readdir, readFile, stat } from 'node:fs/promises'
|
||||
import { mkdtemp, rm } from 'node:fs/promises'
|
||||
import { tmpdir } from 'node:os'
|
||||
import { join } from 'node:path'
|
||||
import { beforeEach, describe, expect, it } from 'vitest'
|
||||
import { afterEach, beforeEach, describe, expect, it } from 'vitest'
|
||||
import { loadConfig } from '../config/config-loader'
|
||||
import { emptyConfig, FILE_NAME } from '../config/schema'
|
||||
import { platform } from '../sys'
|
||||
import { emptyConfig } from '../config/schema'
|
||||
import { saveConfig } from './config-writer'
|
||||
import { YamlStore } from './store'
|
||||
|
||||
function makeStore(dir: string): YamlStore {
|
||||
return new YamlStore(join(dir, FILE_NAME))
|
||||
}
|
||||
import { ENV_CONFIG_DIR } from './dir'
|
||||
import { getConfigurationStore } from './manager'
|
||||
|
||||
describe('saveConfig', () => {
|
||||
let dir: string
|
||||
let prevConfigDir: string | undefined
|
||||
|
||||
beforeEach(async () => {
|
||||
dir = await mkdtemp(join(tmpdir(), 'difyctl-w-'))
|
||||
prevConfigDir = process.env[ENV_CONFIG_DIR]
|
||||
process.env[ENV_CONFIG_DIR] = dir
|
||||
})
|
||||
|
||||
it('writes config.yml in the target dir', async () => {
|
||||
saveConfig(makeStore(dir), { ...emptyConfig(), schema_version: 1 })
|
||||
const stats = await stat(join(dir, FILE_NAME))
|
||||
expect(stats.isFile()).toBe(true)
|
||||
afterEach(async () => {
|
||||
if (prevConfigDir === undefined)
|
||||
delete process.env[ENV_CONFIG_DIR]
|
||||
else
|
||||
process.env[ENV_CONFIG_DIR] = prevConfigDir
|
||||
await rm(dir, { recursive: true, force: true })
|
||||
})
|
||||
|
||||
it('stamps schema_version=1 even if caller passed 0', () => {
|
||||
saveConfig(makeStore(dir), { ...emptyConfig() })
|
||||
const r = loadConfig(makeStore(dir))
|
||||
saveConfig(getConfigurationStore(), { ...emptyConfig() })
|
||||
const r = loadConfig(getConfigurationStore())
|
||||
expect(r.found).toBe(true)
|
||||
if (r.found)
|
||||
expect(r.config.schema_version).toBe(1)
|
||||
})
|
||||
|
||||
it('round-trips defaults + state through YAML', () => {
|
||||
saveConfig(makeStore(dir), {
|
||||
it('overrides a stale schema_version on save', () => {
|
||||
saveConfig(getConfigurationStore(), {
|
||||
...emptyConfig(),
|
||||
schema_version: 999 as never,
|
||||
})
|
||||
const r = loadConfig(getConfigurationStore())
|
||||
expect(r.found).toBe(true)
|
||||
if (r.found)
|
||||
expect(r.config.schema_version).toBe(1)
|
||||
})
|
||||
|
||||
it('round-trips defaults + state', () => {
|
||||
saveConfig(getConfigurationStore(), {
|
||||
schema_version: 1,
|
||||
defaults: { format: 'wide', limit: 75 },
|
||||
state: { current_app: 'app-xyz' },
|
||||
})
|
||||
const r = loadConfig(makeStore(dir))
|
||||
const r = loadConfig(getConfigurationStore())
|
||||
expect(r.found).toBe(true)
|
||||
if (r.found) {
|
||||
expect(r.config.defaults.format).toBe('wide')
|
||||
@ -48,39 +60,22 @@ describe('saveConfig', () => {
|
||||
}
|
||||
})
|
||||
|
||||
it('writes file with mode 0o600 (POSIX)', async () => {
|
||||
if (platform() === 'win32')
|
||||
return
|
||||
saveConfig(makeStore(dir), emptyConfig())
|
||||
const s = await stat(join(dir, FILE_NAME))
|
||||
expect(s.mode & 0o777).toBe(0o600)
|
||||
})
|
||||
|
||||
it('does not leave a tmp file on success', async () => {
|
||||
saveConfig(makeStore(dir), emptyConfig())
|
||||
const entries = await readdir(dir)
|
||||
expect(entries.filter(f => f.endsWith('.tmp'))).toHaveLength(0)
|
||||
expect(entries.filter(f => f.includes('.tmp.'))).toHaveLength(0)
|
||||
})
|
||||
|
||||
it('creates parent dir at 0o700 if absent', async () => {
|
||||
if (platform() === 'win32')
|
||||
return
|
||||
const nested = join(dir, 'nested', 'sub')
|
||||
saveConfig(makeStore(nested), emptyConfig())
|
||||
const s = await stat(nested)
|
||||
expect(s.isDirectory()).toBe(true)
|
||||
expect(s.mode & 0o777).toBe(0o700)
|
||||
})
|
||||
|
||||
it('emits parseable YAML (round-trip via fs.readFile + js-yaml)', async () => {
|
||||
saveConfig(makeStore(dir), {
|
||||
it('overwrites the previous config on resave', () => {
|
||||
saveConfig(getConfigurationStore(), {
|
||||
schema_version: 1,
|
||||
defaults: { format: 'json' },
|
||||
state: {},
|
||||
})
|
||||
const raw = await readFile(join(dir, FILE_NAME), 'utf8')
|
||||
expect(raw).toMatch(/^schema_version:/m)
|
||||
expect(raw).toMatch(/format: json/)
|
||||
saveConfig(getConfigurationStore(), {
|
||||
schema_version: 1,
|
||||
defaults: { format: 'table' },
|
||||
state: { current_app: 'app-2' },
|
||||
})
|
||||
const r = loadConfig(getConfigurationStore())
|
||||
expect(r.found).toBe(true)
|
||||
if (r.found) {
|
||||
expect(r.config.defaults.format).toBe('table')
|
||||
expect(r.config.state.current_app).toBe('app-2')
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
109
cli/src/store/keyring-based-store.test.ts
Normal file
109
cli/src/store/keyring-based-store.test.ts
Normal file
@ -0,0 +1,109 @@
|
||||
import { beforeEach, describe, expect, it, vi } from 'vitest'
|
||||
|
||||
const passwords = new Map<string, string>()
|
||||
const setPassword = vi.fn()
|
||||
const getPassword = vi.fn()
|
||||
const deletePassword = vi.fn()
|
||||
|
||||
class FakeEntry {
|
||||
private readonly key: string
|
||||
constructor(service: string, username: string) {
|
||||
this.key = `${service}::${username}`
|
||||
}
|
||||
|
||||
setPassword(value: string): void {
|
||||
setPassword(this.key, value)
|
||||
passwords.set(this.key, value)
|
||||
}
|
||||
|
||||
getPassword(): string | null {
|
||||
getPassword(this.key)
|
||||
return passwords.get(this.key) ?? null
|
||||
}
|
||||
|
||||
deletePassword(): boolean {
|
||||
deletePassword(this.key)
|
||||
if (!passwords.has(this.key))
|
||||
return false
|
||||
passwords.delete(this.key)
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
vi.mock('@napi-rs/keyring', () => ({
|
||||
Entry: FakeEntry,
|
||||
}))
|
||||
|
||||
const { KeyringBasedStore } = await import('./store.js')
|
||||
|
||||
const SERVICE = 'difyctl-test'
|
||||
|
||||
beforeEach(() => {
|
||||
passwords.clear()
|
||||
setPassword.mockClear()
|
||||
getPassword.mockClear()
|
||||
deletePassword.mockClear()
|
||||
})
|
||||
|
||||
describe('KeyringBasedStore', () => {
|
||||
it('returns default when entry missing', () => {
|
||||
const s = new KeyringBasedStore(SERVICE)
|
||||
expect(s.get({ key: 'k', default: 'fallback' })).toBe('fallback')
|
||||
})
|
||||
|
||||
it('round-trips strings via JSON encoding', () => {
|
||||
const s = new KeyringBasedStore(SERVICE)
|
||||
s.set({ key: 'k', default: '' }, 'tok-abc')
|
||||
expect(s.get({ key: 'k', default: '' })).toBe('tok-abc')
|
||||
})
|
||||
|
||||
it('isolates entries by key', () => {
|
||||
const s = new KeyringBasedStore(SERVICE)
|
||||
s.set({ key: 'a', default: '' }, 'A')
|
||||
s.set({ key: 'b', default: '' }, 'B')
|
||||
expect(s.get({ key: 'a', default: '' })).toBe('A')
|
||||
expect(s.get({ key: 'b', default: '' })).toBe('B')
|
||||
})
|
||||
|
||||
it('unset removes the entry', () => {
|
||||
const s = new KeyringBasedStore(SERVICE)
|
||||
s.set({ key: 'k', default: '' }, 'v')
|
||||
s.unset({ key: 'k', default: '' })
|
||||
expect(s.get({ key: 'k', default: '' })).toBe('')
|
||||
})
|
||||
|
||||
it('unset is a no-op when entry missing', () => {
|
||||
const s = new KeyringBasedStore(SERVICE)
|
||||
expect(() => s.unset({ key: 'gone', default: '' })).not.toThrow()
|
||||
})
|
||||
|
||||
it('swallows getPassword exceptions and returns default', () => {
|
||||
const s = new KeyringBasedStore(SERVICE)
|
||||
getPassword.mockImplementationOnce(
|
||||
() => {
|
||||
throw new Error('NoEntry')
|
||||
},
|
||||
)
|
||||
expect(s.get({ key: 'k', default: 'd' })).toBe('d')
|
||||
})
|
||||
|
||||
it('swallows unset exceptions', () => {
|
||||
const s = new KeyringBasedStore(SERVICE)
|
||||
deletePassword.mockImplementationOnce(
|
||||
() => {
|
||||
throw new Error('NoEntry')
|
||||
},
|
||||
)
|
||||
expect(() => s.unset({ key: 'k', default: '' })).not.toThrow()
|
||||
})
|
||||
|
||||
it('lets set propagate exceptions (caller decides fallback)', () => {
|
||||
const s = new KeyringBasedStore(SERVICE)
|
||||
setPassword.mockImplementationOnce(
|
||||
() => {
|
||||
throw new Error('keyring locked')
|
||||
},
|
||||
)
|
||||
expect(() => s.set({ key: 'k', default: '' }, 'v')).toThrow(/keyring locked/)
|
||||
})
|
||||
})
|
||||
78
cli/src/store/manager.test.ts
Normal file
78
cli/src/store/manager.test.ts
Normal file
@ -0,0 +1,78 @@
|
||||
import type { Key, Store } from './store.js'
|
||||
import { describe, expect, it, vi } from 'vitest'
|
||||
import { getTokenStore } from './manager.js'
|
||||
|
||||
function memStore(label: string): Store & { _label: string } {
|
||||
const map = new Map<string, unknown>()
|
||||
return {
|
||||
_label: label,
|
||||
get<T>(key: Key<T>): T {
|
||||
return (map.get(key.key) as T | undefined) ?? key.default
|
||||
},
|
||||
set<T>(key: Key<T>, value: T): void {
|
||||
map.set(key.key, value)
|
||||
},
|
||||
unset<T>(key: Key<T>): void {
|
||||
map.delete(key.key)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
describe('getTokenStore', () => {
|
||||
it('returns keychain store when probe succeeds', async () => {
|
||||
const k = memStore('keyring')
|
||||
const f = memStore('file')
|
||||
const result = await getTokenStore({
|
||||
factory: { keyring: () => k, file: () => f },
|
||||
})
|
||||
expect(result.mode).toBe('keychain')
|
||||
expect(result.store).toBe(k)
|
||||
})
|
||||
|
||||
it('falls back to file when keyring set throws', async () => {
|
||||
const k = memStore('keyring')
|
||||
const f = memStore('file')
|
||||
k.set = vi.fn(
|
||||
() => {
|
||||
throw new Error('locked')
|
||||
},
|
||||
)
|
||||
const result = await getTokenStore({
|
||||
factory: { keyring: () => k, file: () => f },
|
||||
})
|
||||
expect(result.mode).toBe('file')
|
||||
expect(result.store).toBe(f)
|
||||
})
|
||||
|
||||
it('falls back to file when probe round-trip mismatches', async () => {
|
||||
const k = memStore('keyring')
|
||||
const f = memStore('file')
|
||||
k.get = vi.fn(() => 'something-else')
|
||||
const result = await getTokenStore({
|
||||
factory: { keyring: () => k, file: () => f },
|
||||
})
|
||||
expect(result.mode).toBe('file')
|
||||
expect(result.store).toBe(f)
|
||||
})
|
||||
|
||||
it('falls back to file when keyring constructor throws', async () => {
|
||||
const f = memStore('file')
|
||||
const result = await getTokenStore({
|
||||
factory: {
|
||||
keyring: () => { throw new Error('no backend') },
|
||||
file: () => f,
|
||||
},
|
||||
})
|
||||
expect(result.mode).toBe('file')
|
||||
expect(result.store).toBe(f)
|
||||
})
|
||||
|
||||
it('cleans up probe entry after successful probe', async () => {
|
||||
const k = memStore('keyring')
|
||||
const f = memStore('file')
|
||||
await getTokenStore({
|
||||
factory: { keyring: () => k, file: () => f },
|
||||
})
|
||||
expect(k.get({ key: '__difyctl_probe__', default: '' })).toBe('')
|
||||
})
|
||||
})
|
||||
@ -1,28 +1,77 @@
|
||||
import type { Store } from './store'
|
||||
import type { Key, StorageMode, Store } from './store'
|
||||
import { join } from 'node:path'
|
||||
import { FILE_NAME } from '../config/schema'
|
||||
import { resolveCacheDir, resolveConfigDir } from './dir'
|
||||
import { YamlStore } from './store'
|
||||
import { KeyringBasedStore, YamlStore } from './store'
|
||||
|
||||
export const CACHE_APP_INFO = 'app-info'
|
||||
export const CACHE_NUDGE = 'nudge'
|
||||
const HOSTS_FILE = 'hosts.yml'
|
||||
const TOKENS_FILE = 'tokens.yml'
|
||||
export const CONFIG_FILE_NAME = 'config.yml'
|
||||
|
||||
const KEYRING_SERVICE = 'difyctl'
|
||||
|
||||
function getStore(filePath: string): YamlStore {
|
||||
return new YamlStore(filePath)
|
||||
}
|
||||
|
||||
function resolveConfigurationPath(): string {
|
||||
return join(resolveConfigDir(), FILE_NAME)
|
||||
}
|
||||
|
||||
export function cachePath(cacheDir: string, name: string): string {
|
||||
return join(cacheDir, `${name}.yml`)
|
||||
}
|
||||
|
||||
export function getConfigurationStore(): YamlStore {
|
||||
return getStore(resolveConfigurationPath())
|
||||
return getStore(join(resolveConfigDir(), CONFIG_FILE_NAME))
|
||||
}
|
||||
|
||||
export function getCache(cacheName: string): Store {
|
||||
return getStore(cachePath(resolveCacheDir(), cacheName))
|
||||
}
|
||||
|
||||
export function getHostStore(): YamlStore {
|
||||
return getStore(join(resolveConfigDir(), HOSTS_FILE))
|
||||
}
|
||||
|
||||
const PROBE_KEY: Key<string> = { key: '__difyctl_probe__', default: '' }
|
||||
const PROBE_VALUE = 'probe-v1'
|
||||
|
||||
export type GetTokenStoreOptions = {
|
||||
readonly factory?: {
|
||||
readonly keyring?: () => Store
|
||||
readonly file?: () => Store
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Single entry point for the credential store. Probes the OS keyring; if it
|
||||
* round-trips a value, returns the keychain-backed store. Otherwise falls
|
||||
* back to the YAML file at `<configDir>/tokens.yml`. Both implementations
|
||||
* satisfy the `Store` interface, so callers interact uniformly.
|
||||
*
|
||||
* Business logic should always obtain the token store through this factory
|
||||
* rather than constructing one directly.
|
||||
*/
|
||||
export async function getTokenStore(opts: GetTokenStoreOptions = {}): Promise<{ store: Store, mode: StorageMode }> {
|
||||
const fileFactory = opts.factory?.file ?? (() => getStore(join(resolveConfigDir(), TOKENS_FILE)))
|
||||
const keyringFactory = opts.factory?.keyring ?? (() => new KeyringBasedStore(KEYRING_SERVICE))
|
||||
try {
|
||||
const k = keyringFactory()
|
||||
k.set(PROBE_KEY, PROBE_VALUE)
|
||||
const got = k.get(PROBE_KEY)
|
||||
k.unset(PROBE_KEY)
|
||||
if (got !== PROBE_VALUE)
|
||||
throw new Error('keyring round-trip mismatch')
|
||||
return { store: k, mode: 'keychain' }
|
||||
}
|
||||
catch {
|
||||
return { store: fileFactory(), mode: 'file' }
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Maps an auth identity (host + accountId) to a `Store` key. All token store
|
||||
* reads/writes in business logic go through this helper so the on-disk /
|
||||
* keyring layout stays consistent.
|
||||
*/
|
||||
export function tokenKey(host: string, accountId: string): Key<string> {
|
||||
return { key: `tokens.${host}.${accountId}`, default: '' }
|
||||
}
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
import type { Platform } from '../sys'
|
||||
import fs from 'node:fs'
|
||||
import { dirname } from 'node:path'
|
||||
import { Entry } from '@napi-rs/keyring'
|
||||
import yaml from 'js-yaml'
|
||||
import lockfile from 'lockfile'
|
||||
import { pid, resolvePlatform } from '../sys'
|
||||
@ -8,7 +9,7 @@ import { pid, resolvePlatform } from '../sys'
|
||||
const FILE_PERM = 0o600
|
||||
const DIR_PERM = 0o700
|
||||
|
||||
type Key<T> = {
|
||||
export type Key<T> = {
|
||||
default: T
|
||||
key: string
|
||||
}
|
||||
@ -16,8 +17,11 @@ type Key<T> = {
|
||||
export type Store = {
|
||||
get: <T>(key: Key<T>) => T
|
||||
set: <T>(key: Key<T>, value: T) => void
|
||||
unset: <T>(key: Key<T>) => void
|
||||
}
|
||||
|
||||
export type StorageMode = 'keychain' | 'file'
|
||||
|
||||
export class ConcurrentAccessError extends Error {
|
||||
constructor(filePath: string) {
|
||||
super(`Another process is modifying the file ${filePath}. remove ${filePath}.lock to reset lock.`)
|
||||
@ -87,7 +91,6 @@ abstract class FileBasedStore implements Store {
|
||||
protected withLock<R>(body: () => R): R {
|
||||
this.lock()
|
||||
try {
|
||||
this.load()
|
||||
return body()
|
||||
}
|
||||
finally {
|
||||
@ -96,18 +99,44 @@ abstract class FileBasedStore implements Store {
|
||||
}
|
||||
|
||||
get<T>(key: Key<T>): T {
|
||||
return this.withLock(() => this.doGet(key))
|
||||
return this.withLock(() => {
|
||||
this.load()
|
||||
return this.doGet(key)
|
||||
})
|
||||
}
|
||||
|
||||
set<T>(key: Key<T>, value: T) {
|
||||
this.withLock(() => {
|
||||
this.load()
|
||||
this.doSet(key, value)
|
||||
this.flush()
|
||||
})
|
||||
}
|
||||
|
||||
unset<T>(key: Key<T>): void {
|
||||
this.withLock(() => {
|
||||
this.load()
|
||||
this.doUnset(key)
|
||||
this.flush()
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove the underlying file of the store. No-op if file doesn't exist.
|
||||
*/
|
||||
rm(): void {
|
||||
try {
|
||||
fs.unlinkSync(this.file_path)
|
||||
}
|
||||
catch (err) {
|
||||
if ((err as NodeJS.ErrnoException).code !== 'ENOENT')
|
||||
throw err
|
||||
}
|
||||
}
|
||||
|
||||
abstract doGet<T>(key: Key<T>): T
|
||||
abstract doSet<T>(key: Key<T>, value: T): void
|
||||
abstract doUnset<T>(key: Key<T>): void
|
||||
}
|
||||
|
||||
export class YamlStore extends FileBasedStore {
|
||||
@ -136,6 +165,7 @@ export class YamlStore extends FileBasedStore {
|
||||
|
||||
setTyped<T>(data: T): void {
|
||||
this.withLock(() => {
|
||||
this.load()
|
||||
this.raw_content = yaml.dump(data, { lineWidth: -1, noRefs: true })
|
||||
this.flush()
|
||||
})
|
||||
@ -156,6 +186,25 @@ export class YamlStore extends FileBasedStore {
|
||||
current[lastKey] = value
|
||||
this.raw_content = yaml.dump(data, { lineWidth: -1, noRefs: true })
|
||||
}
|
||||
|
||||
doUnset<T>(key: Key<T>): void {
|
||||
const data = loadYaml(this.raw_content) || {}
|
||||
const parts = key.key.split('.')
|
||||
const lastKey = parts.pop()
|
||||
if (lastKey === undefined)
|
||||
return
|
||||
let current: Record<string, unknown> = data
|
||||
for (const part of parts) {
|
||||
const next = current[part]
|
||||
if (next === null || next === undefined || typeof next !== 'object')
|
||||
return
|
||||
current = next as Record<string, unknown>
|
||||
}
|
||||
if (!(lastKey in current))
|
||||
return
|
||||
delete current[lastKey]
|
||||
this.raw_content = yaml.dump(data, { lineWidth: -1, noRefs: true })
|
||||
}
|
||||
}
|
||||
|
||||
function loadYaml(raw: string | undefined): Record<string, unknown> | null {
|
||||
@ -163,3 +212,39 @@ function loadYaml(raw: string | undefined): Record<string, unknown> | null {
|
||||
return null
|
||||
return (yaml.load(raw) ?? {}) as Record<string, unknown>
|
||||
}
|
||||
|
||||
/**
|
||||
* OS-keyring-based storage primitive. Sits at the same layer as
|
||||
* `FileBasedStore`: implements `Store` with each `Key<T>` corresponding to a
|
||||
* single keyring entry under the configured service. Values are JSON-encoded.
|
||||
*/
|
||||
export class KeyringBasedStore implements Store {
|
||||
private readonly service: string
|
||||
|
||||
constructor(service: string) {
|
||||
this.service = service
|
||||
}
|
||||
|
||||
get<T>(key: Key<T>): T {
|
||||
try {
|
||||
const v = new Entry(this.service, key.key).getPassword()
|
||||
if (v === null || v === undefined || v === '')
|
||||
return key.default
|
||||
return JSON.parse(v) as T
|
||||
}
|
||||
catch {
|
||||
return key.default
|
||||
}
|
||||
}
|
||||
|
||||
set<T>(key: Key<T>, value: T): void {
|
||||
new Entry(this.service, key.key).setPassword(JSON.stringify(value))
|
||||
}
|
||||
|
||||
unset<T>(key: Key<T>): void {
|
||||
try {
|
||||
new Entry(this.service, key.key).deletePassword()
|
||||
}
|
||||
catch { /* missing entry is fine */ }
|
||||
}
|
||||
}
|
||||
|
||||
@ -5,8 +5,8 @@ import { tmpdir } from 'node:os'
|
||||
import { join } from 'node:path'
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
|
||||
import { loadNudgeStore } from '../cache/nudge-store.js'
|
||||
import { CACHE_NUDGE, cachePath } from '../store/manager.js'
|
||||
import { YamlStore } from '../store/store.js'
|
||||
import { ENV_CACHE_DIR } from '../store/dir.js'
|
||||
import { CACHE_NUDGE, getCache } from '../store/manager.js'
|
||||
import { maybeNudgeCompat } from './nudge.js'
|
||||
|
||||
const HOST = 'https://cloud.dify.ai'
|
||||
@ -44,11 +44,18 @@ describe('maybeNudgeCompat', () => {
|
||||
let dir: string
|
||||
let store: NudgeStore
|
||||
|
||||
let prevCacheDir: string | undefined
|
||||
beforeEach(async () => {
|
||||
dir = await mkdtemp(join(tmpdir(), 'difyctl-nudge-'))
|
||||
store = await loadNudgeStore({ store: new YamlStore(cachePath(dir, CACHE_NUDGE)), now: fixedNow })
|
||||
prevCacheDir = process.env[ENV_CACHE_DIR]
|
||||
process.env[ENV_CACHE_DIR] = dir
|
||||
store = await loadNudgeStore({ store: getCache(CACHE_NUDGE), now: fixedNow })
|
||||
})
|
||||
afterEach(async () => {
|
||||
if (prevCacheDir === undefined)
|
||||
delete process.env[ENV_CACHE_DIR]
|
||||
else
|
||||
process.env[ENV_CACHE_DIR] = prevCacheDir
|
||||
await rm(dir, { recursive: true, force: true })
|
||||
})
|
||||
|
||||
@ -78,12 +85,12 @@ describe('maybeNudgeCompat', () => {
|
||||
|
||||
it('warns again after the silence window has elapsed', async () => {
|
||||
const yesterday = new Date(NOW.getTime() - 25 * 60 * 60 * 1000)
|
||||
const tStore = await loadNudgeStore({ store: new YamlStore(cachePath(dir, CACHE_NUDGE)), now: () => yesterday })
|
||||
const tStore = await loadNudgeStore({ store: getCache(CACHE_NUDGE), now: () => yesterday })
|
||||
await tStore.markWarned(HOST)
|
||||
const probe = vi.fn(async () => UNSUPPORTED)
|
||||
const { emit, lines } = emitterSpy()
|
||||
|
||||
const freshStore = await loadNudgeStore({ store: new YamlStore(cachePath(dir, CACHE_NUDGE)), now: fixedNow })
|
||||
const freshStore = await loadNudgeStore({ store: getCache(CACHE_NUDGE), now: fixedNow })
|
||||
await maybeNudgeCompat(HOST, baseDeps({ store: freshStore, probe, emit }))
|
||||
|
||||
expect(probe).toHaveBeenCalledOnce()
|
||||
|
||||
@ -160,7 +160,8 @@ describe('runVersionProbe', () => {
|
||||
const url = new URL(mock.url)
|
||||
const prevConfig = process.env[ENV_CONFIG_DIR]
|
||||
try {
|
||||
await saveHosts(configDir, {
|
||||
process.env[ENV_CONFIG_DIR] = configDir
|
||||
saveHosts({
|
||||
current_host: url.host,
|
||||
scheme: url.protocol.replace(':', ''),
|
||||
token_storage: 'file',
|
||||
|
||||
@ -5,7 +5,6 @@ import type { Channel } from './info.js'
|
||||
import { META_PROBE_TIMEOUT_MS, MetaClient } from '../api/meta.js'
|
||||
import { loadHosts } from '../auth/hosts.js'
|
||||
import { createClient } from '../http/client.js'
|
||||
import { resolveConfigDir } from '../store/dir.js'
|
||||
import { arch, platform } from '../sys/index.js'
|
||||
import { hostWithScheme } from '../util/host.js'
|
||||
import { difyCompat, evaluateCompat } from './compat.js'
|
||||
@ -48,7 +47,7 @@ export type RunVersionProbeOptions = {
|
||||
readonly probe?: MetaProbe
|
||||
}
|
||||
|
||||
const defaultLoadBundle = async (): Promise<HostsBundle | undefined> => loadHosts(resolveConfigDir())
|
||||
const defaultLoadBundle = async (): Promise<HostsBundle | undefined> => loadHosts()
|
||||
|
||||
const defaultProbe: MetaProbe = async (endpoint) => {
|
||||
const http = createClient({ host: endpoint, timeoutMs: META_PROBE_TIMEOUT_MS, retryAttempts: 0 })
|
||||
|
||||
Reference in New Issue
Block a user