Compare commits

..

4 Commits

Author SHA1 Message Date
6be223ac3f prevent removing entire host file 2026-05-27 18:55:55 +08:00
13ac79780e prevent double load 2026-05-27 18:52:00 +08:00
69b6a4ca5a prevent direct instantiation of YamlStore 2026-05-27 18:21:05 +08:00
5e9d9f091e refactor token store 2026-05-27 18:15:35 +08:00
66 changed files with 899 additions and 3114 deletions

View File

@ -38,8 +38,6 @@ from clients.agent_backend.request_builder import (
AgentBackendOutputConfig,
AgentBackendRunRequestBuilder,
AgentBackendWorkflowNodeRunInput,
CleanupLayerSpec,
extract_cleanup_layer_specs,
redact_for_agent_backend_log,
)
@ -70,11 +68,9 @@ __all__ = [
"AgentBackendTransportError",
"AgentBackendValidationError",
"AgentBackendWorkflowNodeRunInput",
"CleanupLayerSpec",
"DifyAgentBackendRunClient",
"FakeAgentBackendRunClient",
"FakeAgentBackendScenario",
"create_agent_backend_run_client",
"extract_cleanup_layer_specs",
"redact_for_agent_backend_log",
]

View File

@ -20,8 +20,6 @@ from dify_agent.protocol import (
RunEvent,
RunFailedEvent,
RunFailedEventData,
RunPausedEvent,
RunPausedEventData,
RunStartedEvent,
RunStatusResponse,
RunSucceededEvent,
@ -36,7 +34,6 @@ class FakeAgentBackendScenario(StrEnum):
SUCCESS = "success"
FAILED = "failed"
PAUSED = "paused"
class FakeAgentBackendRunClient:
@ -92,13 +89,6 @@ class FakeAgentBackendRunClient:
updated_at=_FIXED_TIME,
error="fake failure",
)
case FakeAgentBackendScenario.PAUSED:
return RunStatusResponse(
run_id=run_id,
status="paused",
created_at=_FIXED_TIME,
updated_at=_FIXED_TIME,
)
def _events(self, run_id: str) -> tuple[RunEvent, ...]:
match self.scenario:
@ -125,17 +115,3 @@ class FakeAgentBackendRunClient:
data=RunFailedEventData(error="fake failure", reason="unit_test"),
),
)
case FakeAgentBackendScenario.PAUSED:
return (
RunStartedEvent(id="1-0", run_id=run_id, created_at=_FIXED_TIME),
RunPausedEvent(
id="2-0",
run_id=run_id,
created_at=_FIXED_TIME,
data=RunPausedEventData(
reason="human_input_required",
message="Agent requested human input.",
session_snapshot=CompositorSessionSnapshot(layers=[]),
),
),
)

View File

@ -11,13 +11,11 @@ composition-driven.
from __future__ import annotations
from typing import ClassVar, cast
from typing import ClassVar
from agenton.compositor import CompositorSessionSnapshot
from agenton.compositor.schemas import LayerSessionSnapshot
from agenton.layers import ExitIntent
from agenton_collections.layers.plain import PLAIN_PROMPT_LAYER_TYPE_ID, PromptLayerConfig
from agenton_collections.layers.pydantic_ai import PYDANTIC_AI_HISTORY_LAYER_TYPE_ID
from dify_agent.layers.dify_plugin import (
DIFY_PLUGIN_LLM_LAYER_TYPE_ID,
DIFY_PLUGIN_TOOLS_LAYER_TYPE_ID,
@ -31,7 +29,6 @@ from dify_agent.layers.execution_context import (
)
from dify_agent.layers.output import DIFY_OUTPUT_LAYER_TYPE_ID, DifyOutputLayerConfig
from dify_agent.protocol import (
DIFY_AGENT_HISTORY_LAYER_ID,
DIFY_AGENT_MODEL_LAYER_ID,
DIFY_AGENT_OUTPUT_LAYER_ID,
CreateRunRequest,
@ -48,84 +45,6 @@ WORKFLOW_USER_PROMPT_LAYER_ID = "workflow_user_prompt"
DIFY_EXECUTION_CONTEXT_LAYER_ID = "execution_context"
DIFY_PLUGIN_TOOLS_LAYER_ID = "tools"
# Layer types that hold credentials in their per-run config. These are excluded
# from the cleanup-replay composition (and from the snapshot that is sent with
# the cleanup request) because we deliberately do not persist plaintext
# credentials between runs.
_CLEANUP_EXCLUDED_LAYER_TYPES: tuple[str, ...] = (
DIFY_PLUGIN_LLM_LAYER_TYPE_ID,
DIFY_PLUGIN_TOOLS_LAYER_TYPE_ID,
)
class CleanupLayerSpec(BaseModel):
"""One layer node replayed by an Agent backend cleanup-only run.
Cleanup composition cannot include credential-bearing plugin layers, so we
persist only the non-plugin layer specs together with the original config.
Storing the config (rather than just ``name``/``type``) means cleanup does
not depend on the original build-time inputs being re-derivable.
"""
name: str
type: str
deps: dict[str, str] = Field(default_factory=dict)
metadata: dict[str, JsonValue] = Field(default_factory=dict)
config: JsonValue = None
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
def extract_cleanup_layer_specs(composition: RunComposition) -> list[CleanupLayerSpec]:
"""Project the in-flight composition into the persistable cleanup spec list.
Plugin layers are intentionally dropped (their configs hold credentials and
the lifecycle contract says "do not include an LLM layer" during cleanup).
The filtered names must later drive snapshot filtering so the agenton
compositor's name-order check still passes for the cleanup run.
"""
excluded = set(_CLEANUP_EXCLUDED_LAYER_TYPES)
specs: list[CleanupLayerSpec] = []
for layer in composition.layers:
if layer.type in excluded:
continue
config_value: JsonValue = None
if isinstance(layer.config, BaseModel):
config_value = layer.config.model_dump(mode="json", warnings=False)
else:
# ``RunLayerSpec.config`` is typed as ``LayerConfigInput`` which
# includes ``Mapping[str, object] | bytes``. In the cleanup-replay
# pipeline our builder only emits BaseModel-derived configs or
# ``None``, so the wider input alias narrows safely here.
config_value = cast(JsonValue, layer.config)
specs.append(
CleanupLayerSpec(
name=layer.name,
type=layer.type,
deps=dict(layer.deps),
metadata=dict(layer.metadata),
config=config_value,
)
)
return specs
def _filter_snapshot_to_specs(
snapshot: CompositorSessionSnapshot,
specs: list[CleanupLayerSpec],
) -> CompositorSessionSnapshot:
"""Keep only snapshot layers whose names appear in the cleanup spec list.
The agenton compositor rejects a snapshot whose layer-name sequence does
not match the active composition exactly. Cleanup-replay drops plugin
layers, so we must drop the matching snapshot entries here.
"""
kept_names = {spec.name for spec in specs}
filtered_layers: list[LayerSessionSnapshot] = [layer for layer in snapshot.layers if layer.name in kept_names]
if len(filtered_layers) == len(snapshot.layers):
return snapshot
return CompositorSessionSnapshot(schema_version=snapshot.schema_version, layers=filtered_layers)
class AgentBackendModelConfig(BaseModel):
"""API-side model/plugin selection before it is converted to Dify Agent layers."""
@ -167,8 +86,7 @@ class AgentBackendWorkflowNodeRunInput(BaseModel):
output: AgentBackendOutputConfig | None = None
tools: DifyPluginToolsLayerConfig | None = None
session_snapshot: CompositorSessionSnapshot | None = None
include_history: bool = True
suspend_on_exit: bool = True
suspend_on_exit: bool = False
metadata: dict[str, JsonValue] = Field(default_factory=dict)
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid", arbitrary_types_allowed=True)
@ -184,50 +102,6 @@ class AgentBackendWorkflowNodeRunInput(BaseModel):
class AgentBackendRunRequestBuilder:
"""Converts API product state into the public ``dify-agent`` run protocol."""
def build_cleanup_request(
self,
*,
session_snapshot: CompositorSessionSnapshot,
composition_layer_specs: list[CleanupLayerSpec],
idempotency_key: str | None = None,
metadata: dict[str, JsonValue] | None = None,
) -> CreateRunRequest:
"""Build a lifecycle-only cleanup request that replays the prior layers.
The agenton compositor enforces that the session snapshot's layer names
match the active composition in order, so cleanup must replay the same
non-plugin layer graph that produced the snapshot. Plugin layers
(``dify.plugin.llm``, ``dify.plugin.tools``) are excluded from both the
composition and the snapshot before submission because their configs
require credentials that are not persisted between runs.
"""
if not composition_layer_specs:
raise ValueError(
"build_cleanup_request requires composition_layer_specs; an empty "
"composition would fail the agent backend's snapshot validation."
)
request_metadata = dict(metadata or {})
request_metadata["agent_backend_lifecycle"] = "session_cleanup"
layers = [
RunLayerSpec(
name=spec.name,
type=spec.type,
deps=dict(spec.deps),
metadata=dict(spec.metadata),
config=spec.config,
)
for spec in composition_layer_specs
]
filtered_snapshot = _filter_snapshot_to_specs(session_snapshot, composition_layer_specs)
return CreateRunRequest(
composition=RunComposition(layers=layers),
purpose="workflow_node",
idempotency_key=idempotency_key,
metadata=request_metadata,
session_snapshot=filtered_snapshot,
on_exit=LayerExitSignals(default=ExitIntent.DELETE),
)
def build_for_workflow_node(self, run_input: AgentBackendWorkflowNodeRunInput) -> CreateRunRequest:
"""Build a workflow Agent Node run request without defining another wire schema."""
layers: list[RunLayerSpec] = []
@ -261,20 +135,6 @@ class AgentBackendRunRequestBuilder:
metadata=run_input.metadata,
config=run_input.execution_context,
),
]
)
if run_input.include_history:
layers.append(
RunLayerSpec(
name=DIFY_AGENT_HISTORY_LAYER_ID,
type=PYDANTIC_AI_HISTORY_LAYER_TYPE_ID,
metadata={**run_input.metadata, "origin": "agent_session_history"},
)
)
layers.extend(
[
RunLayerSpec(
name=DIFY_AGENT_MODEL_LAYER_ID,
type=DIFY_PLUGIN_LLM_LAYER_TYPE_ID,

View File

@ -27,7 +27,6 @@ from core.moderation.base import ModerationError
from core.moderation.input_moderation import InputModeration
from core.repositories.factory import WorkflowExecutionRepository, WorkflowNodeExecutionRepository
from core.workflow.node_factory import get_default_root_node_id
from core.workflow.nodes.agent_v2.session_cleanup_layer import build_workflow_agent_session_cleanup_layer
from core.workflow.system_variables import (
build_bootstrap_variables,
build_system_variables,
@ -240,7 +239,6 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
)
workflow_entry.graph_engine.layer(persistence_layer)
workflow_entry.graph_engine.layer(build_workflow_agent_session_cleanup_layer())
conversation_variable_layer = ConversationVariablePersistenceLayer(
ConversationVariableUpdater(session_factory.get_session_maker())
)

View File

@ -10,7 +10,6 @@ from core.app.entities.app_invoke_entities import InvokeFrom, WorkflowAppGenerat
from core.app.workflow.layers.persistence import PersistenceWorkflowInfo, WorkflowPersistenceLayer
from core.repositories.factory import WorkflowExecutionRepository, WorkflowNodeExecutionRepository
from core.workflow.node_factory import get_default_root_node_id
from core.workflow.nodes.agent_v2.session_cleanup_layer import build_workflow_agent_session_cleanup_layer
from core.workflow.system_variables import build_bootstrap_variables, build_system_variables
from core.workflow.variable_pool_initializer import add_node_inputs_to_pool, add_variables_to_pool
from core.workflow.workflow_entry import WorkflowEntry
@ -167,7 +166,6 @@ class WorkflowAppRunner(WorkflowBasedAppRunner):
)
workflow_entry.graph_engine.layer(persistence_layer)
workflow_entry.graph_engine.layer(build_workflow_agent_session_cleanup_layer())
for layer in self._graph_engine_layers:
workflow_entry.graph_engine.layer(layer)

View File

@ -475,7 +475,6 @@ class DifyNodeFactory(NodeFactory):
from core.workflow.nodes.agent_v2.file_tenant_validator import UploadFileTenantValidator
from core.workflow.nodes.agent_v2.output_failure_orchestrator import OutputFailureOrchestrator
from core.workflow.nodes.agent_v2.output_type_checker import PerOutputTypeChecker
from core.workflow.nodes.agent_v2.session_store import WorkflowAgentRuntimeSessionStore
return {
"binding_resolver": WorkflowAgentBindingResolver(),
@ -495,7 +494,6 @@ class DifyNodeFactory(NodeFactory):
# outputs contain no file refs.
"type_checker": PerOutputTypeChecker(file_validator=UploadFileTenantValidator()),
"failure_orchestrator": OutputFailureOrchestrator(),
"session_store": WorkflowAgentRuntimeSessionStore(),
}
return {
"strategy_resolver": self._agent_strategy_resolver,

View File

@ -1,11 +1,8 @@
from __future__ import annotations
import logging
from collections.abc import Generator, Mapping, Sequence
from typing import TYPE_CHECKING, Any
from agenton.compositor import CompositorSessionSnapshot
from clients.agent_backend import (
AgentBackendError,
AgentBackendHTTPError,
@ -20,14 +17,11 @@ from clients.agent_backend import (
AgentBackendStreamInternalEvent,
AgentBackendTransportError,
AgentBackendValidationError,
CleanupLayerSpec,
extract_cleanup_layer_specs,
)
from core.app.entities.app_invoke_entities import DIFY_RUN_CONTEXT_KEY, DifyRunContext
from core.workflow.system_variables import SystemVariableKey, get_system_text
from graphon.entities.pause_reason import SchedulingPause
from graphon.enums import BuiltinNodeTypes, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
from graphon.node_events import NodeEventBase, NodeRunResult, PauseRequestedEvent, StreamCompletedEvent
from graphon.node_events import NodeEventBase, NodeRunResult, StreamCompletedEvent
from graphon.nodes.base.node import Node
from models.agent_config_entities import WorkflowNodeJobConfig
@ -46,14 +40,11 @@ from .runtime_request_builder import (
WorkflowAgentRuntimeRequestBuilder,
WorkflowAgentRuntimeRequestBuildError,
)
from .session_store import WorkflowAgentRuntimeSessionStore, WorkflowAgentSessionScope
if TYPE_CHECKING:
from graphon.entities import GraphInitParams
from graphon.runtime import GraphRuntimeState
logger = logging.getLogger(__name__)
# Stage 4 §5+§7: the terminal events that `_consume_event_stream` may return.
# Stream + started events are filtered out before we yield; transport errors
@ -83,7 +74,6 @@ class DifyAgentNode(Node[DifyAgentNodeData]):
output_adapter: WorkflowAgentOutputAdapter,
type_checker: PerOutputTypeChecker,
failure_orchestrator: OutputFailureOrchestrator,
session_store: WorkflowAgentRuntimeSessionStore | None = None,
) -> None:
super().__init__(
node_id=node_id,
@ -98,7 +88,6 @@ class DifyAgentNode(Node[DifyAgentNodeData]):
self._output_adapter = output_adapter
self._type_checker = type_checker
self._failure_orchestrator = failure_orchestrator
self._session_store = session_store
@classmethod
def version(cls) -> str:
@ -145,17 +134,6 @@ class DifyAgentNode(Node[DifyAgentNodeData]):
"agent_config_snapshot_id": bundle.snapshot.id,
"binding_id": bundle.binding.id,
}
session_scope = WorkflowAgentSessionScope(
tenant_id=dify_ctx.tenant_id,
app_id=dify_ctx.app_id,
workflow_id=workflow_id,
workflow_run_id=workflow_run_id,
node_id=self._node_id,
node_execution_id=self.id,
binding_id=bundle.binding.id,
agent_id=bundle.agent.id,
agent_config_snapshot_id=bundle.snapshot.id,
)
# Stage 4 §4.1 (D-3): use effective outputs so defaults flow through both
# the backend request and the post-run type check.
@ -169,9 +147,6 @@ class DifyAgentNode(Node[DifyAgentNodeData]):
attempt = 0
while True:
try:
session_snapshot = None
if self._session_store is not None:
session_snapshot = self._session_store.load_active_snapshot(session_scope)
runtime_request = self._runtime_request_builder.build(
WorkflowAgentRuntimeBuildContext(
dify_context=dify_ctx,
@ -184,7 +159,6 @@ class DifyAgentNode(Node[DifyAgentNodeData]):
agent=bundle.agent,
snapshot=bundle.snapshot,
attempt=attempt,
session_snapshot=session_snapshot,
)
)
except WorkflowAgentRuntimeRequestBuildError as error:
@ -247,35 +221,9 @@ class DifyAgentNode(Node[DifyAgentNodeData]):
)
return
if isinstance(terminal_event, AgentBackendRunPausedInternalEvent):
self._save_session_snapshot(
session_scope=session_scope,
backend_run_id=terminal_event.run_id,
snapshot=terminal_event.session_snapshot,
composition_layer_specs=extract_cleanup_layer_specs(runtime_request.request.composition),
metadata=metadata,
)
yield PauseRequestedEvent(
reason=SchedulingPause(
message=terminal_event.message
or "Agent backend run requested workflow pause for external input."
)
)
return
# Non-success terminal (failed / cancelled) skips per-output
# post-processing — the backend itself already failed. We also retire
# the local ACTIVE session row so a workflow loop back into the same
# Agent node cannot resume from a stale snapshot. The failed agent
# backend layers (suspended per ``on_exit``) are left for agent
# backend's own GC; this row will no longer be picked up by the
# workflow-terminal cleanup layer.
# Non-success terminal (failed / cancelled / paused) skips per-output
# post-processing — the backend itself already failed.
if not isinstance(terminal_event, AgentBackendRunSucceededInternalEvent):
self._mark_session_cleaned_on_failure(
session_scope=session_scope,
backend_run_id=terminal_event.run_id,
metadata=metadata,
)
yield StreamCompletedEvent(
node_run_result=self._output_adapter.build_failure_result(
event=terminal_event,
@ -286,14 +234,6 @@ class DifyAgentNode(Node[DifyAgentNodeData]):
)
return
self._save_session_snapshot(
session_scope=session_scope,
backend_run_id=terminal_event.run_id,
snapshot=terminal_event.session_snapshot,
composition_layer_specs=extract_cleanup_layer_specs(runtime_request.request.composition),
metadata=metadata,
)
# ──── Stage 4: per-output type check ────
type_check = self._type_checker.check(
declared_outputs=effective_outputs,
@ -444,75 +384,6 @@ class DifyAgentNode(Node[DifyAgentNodeData]):
],
}
def _save_session_snapshot(
self,
*,
session_scope: WorkflowAgentSessionScope,
backend_run_id: str,
snapshot: CompositorSessionSnapshot | None,
composition_layer_specs: list[CleanupLayerSpec],
metadata: dict[str, Any],
) -> None:
if self._session_store is None:
return
try:
self._session_store.save_active_snapshot(
scope=session_scope,
backend_run_id=backend_run_id,
snapshot=snapshot,
composition_layer_specs=composition_layer_specs,
)
agent_backend = dict(metadata.get("agent_backend") or {})
agent_backend["session_snapshot_persisted"] = snapshot is not None
metadata["agent_backend"] = agent_backend
except Exception:
logger.warning(
"Failed to persist workflow Agent runtime session snapshot: "
"tenant_id=%s workflow_run_id=%s node_id=%s binding_id=%s agent_id=%s backend_run_id=%s",
session_scope.tenant_id,
session_scope.workflow_run_id,
session_scope.node_id,
session_scope.binding_id,
session_scope.agent_id,
backend_run_id,
exc_info=True,
)
agent_backend = dict(metadata.get("agent_backend") or {})
agent_backend["session_snapshot_persisted"] = False
agent_backend["session_snapshot_persist_error"] = "workflow_agent_runtime_session_store_error"
metadata["agent_backend"] = agent_backend
def _mark_session_cleaned_on_failure(
self,
*,
session_scope: WorkflowAgentSessionScope,
backend_run_id: str,
metadata: dict[str, Any],
) -> None:
if self._session_store is None:
return
try:
self._session_store.mark_cleaned(scope=session_scope, backend_run_id=backend_run_id)
agent_backend = dict(metadata.get("agent_backend") or {})
agent_backend["session_snapshot_cleaned_on_failure"] = True
metadata["agent_backend"] = agent_backend
except Exception:
logger.warning(
"Failed to mark workflow Agent runtime session cleaned on agent run failure: "
"tenant_id=%s workflow_run_id=%s node_id=%s binding_id=%s agent_id=%s backend_run_id=%s",
session_scope.tenant_id,
session_scope.workflow_run_id,
session_scope.node_id,
session_scope.binding_id,
session_scope.agent_id,
backend_run_id,
exc_info=True,
)
agent_backend = dict(metadata.get("agent_backend") or {})
agent_backend["session_snapshot_cleaned_on_failure"] = False
agent_backend["session_snapshot_cleanup_error"] = "workflow_agent_runtime_session_store_error"
metadata["agent_backend"] = agent_backend
@staticmethod
def _patch_event_with_defaults(
event: AgentBackendRunSucceededInternalEvent,

View File

@ -4,7 +4,6 @@ from collections.abc import Mapping, Sequence
from dataclasses import dataclass
from typing import Any, Literal, Protocol, cast
from agenton.compositor import CompositorSessionSnapshot
from dify_agent.layers.execution_context import DifyExecutionContextLayerConfig
from dify_agent.protocol import CreateRunRequest
@ -29,7 +28,6 @@ from models.agent_config_entities import (
from models.agent_config_entities import (
effective_declared_outputs as _effective_declared_outputs,
)
from models.provider_ids import ModelProviderID
from .output_failure_orchestrator import retry_idempotency_key
from .plugin_tools_builder import WorkflowAgentPluginToolsBuilder, WorkflowAgentPluginToolsBuildError
@ -68,7 +66,6 @@ class WorkflowAgentRuntimeBuildContext:
# Stage 4 §7 / D-4: 0 for the first run, then incremented per retry. Drives the
# idempotency key so the backend treats each retry as a fresh request.
attempt: int = 0
session_snapshot: CompositorSessionSnapshot | None = None
@dataclass(frozen=True, slots=True)
@ -132,14 +129,11 @@ class WorkflowAgentRuntimeRequestBuilder:
request = self._request_builder.build_for_workflow_node(
AgentBackendWorkflowNodeRunInput(
model=AgentBackendModelConfig(
plugin_id=self._plugin_daemon_plugin_id(
plugin_id=agent_soul.model.plugin_id,
model_provider=agent_soul.model.model_provider,
),
model_provider=self._plugin_daemon_provider_name(agent_soul.model.model_provider),
plugin_id=agent_soul.model.plugin_id,
model_provider=agent_soul.model.model_provider,
model=agent_soul.model.model,
credentials=self._normalize_credentials(credentials),
model_settings=agent_soul.model.model_settings,
model_settings=cast(dict[str, Any], agent_soul.model.model_settings),
),
# The execution-context layer is now the only public protocol
# carrier for Dify tenant/user/run identifiers. ``user_id`` must
@ -164,7 +158,6 @@ class WorkflowAgentRuntimeRequestBuilder:
user_prompt=user_prompt,
output=self._build_output_config(node_job.declared_outputs),
tools=tools_layer,
session_snapshot=context.session_snapshot,
idempotency_key=self._idempotency_key(context),
metadata=metadata,
)
@ -184,20 +177,6 @@ class WorkflowAgentRuntimeRequestBuilder:
return "single_step"
return "workflow_run"
@staticmethod
def _plugin_daemon_plugin_id(*, plugin_id: str, model_provider: str) -> str:
"""Return the transport plugin id expected by plugin-daemon headers."""
if plugin_id.count("/") == 1:
return plugin_id
if plugin_id:
return ModelProviderID(plugin_id).plugin_id
return ModelProviderID(model_provider).plugin_id
@staticmethod
def _plugin_daemon_provider_name(model_provider: str) -> str:
"""Return the provider name expected by plugin-daemon dispatch payloads."""
return ModelProviderID(model_provider).provider_name
@staticmethod
def _idempotency_key(context: WorkflowAgentRuntimeBuildContext) -> str:
# Stage 4 §7 / D-4: retries get distinct keys (``...:retry-{attempt}``) so

View File

@ -1,247 +0,0 @@
from __future__ import annotations
import logging
from typing import override
from clients.agent_backend import AgentBackendError, AgentBackendRunClient, AgentBackendRunRequestBuilder
from clients.agent_backend.factory import create_agent_backend_run_client
from configs import dify_config
from core.workflow.system_variables import SystemVariableKey, get_system_text
from graphon.graph_engine.layers import GraphEngineLayer
from graphon.graph_events import (
GraphEngineEvent,
GraphRunAbortedEvent,
GraphRunFailedEvent,
GraphRunPartialSucceededEvent,
GraphRunSucceededEvent,
)
from .session_store import StoredWorkflowAgentSession, WorkflowAgentRuntimeSessionStore
logger = logging.getLogger(__name__)
# Upper bound on how long a cleanup-only run is allowed to settle before the
# layer gives up and leaves the row ACTIVE so it can be retried later. Cleanup
# work is mostly local agent-backend bookkeeping (no LLM inference), so 30s is
# generous; a hung backend should never block workflow termination beyond this.
_CLEANUP_WAIT_TIMEOUT_SECONDS = 30.0
class WorkflowAgentSessionCleanupLayer(GraphEngineLayer):
"""Retires workflow Agent session snapshots when a workflow reaches a terminal state.
Implementation notes — there are two failure modes the cleanup path has to
avoid simultaneously:
1. The agenton compositor on the agent-backend side validates the cleanup
request's session snapshot against the replayed composition before
running any lifecycle hook. If the snapshot's layer names diverge from
the composition, the run fails asynchronously with ``run_failed`` — but
the initial ``POST /runs`` already returned 202, so the API side has no
visibility of the failure unless it waits for terminal status. The
``composition_layer_specs`` persistence in A.1A.4 plus the
``_filter_snapshot_to_specs`` shape in ``build_cleanup_request`` keeps
the two name lists in sync.
2. The current agent backend's ``runner.py::_run_agent`` always invokes
``run.get_layer("llm")`` and the structured-output / history validators
before exiting any slot — there is no ``purpose: "cleanup"`` branch
yet. A truly cleanup-only request (no LLM layer) therefore still
crashes inside the runner with ``Layer 'llm' is not defined in this
compositor run.``. Until the backend grows a cleanup-only purpose,
this layer **does not issue an HTTP cleanup run**: it simply retires
the local snapshot row so stale state cannot be re-resumed, and lets
the agent backend's own retention TTL release the suspended layers.
The HTTP-cleanup machinery (``build_cleanup_request`` + ``wait_run``) is
intentionally still wired into the request builder + integration tests so
that when the agent backend supports cleanup runs we can flip the switch
here with a one-line change (see ``_HTTP_CLEANUP_SUPPORTED``).
"""
# Flip to True once dify-agent's runner has a ``purpose=cleanup`` branch
# that skips the LLM/output/user-prompt invariants. Until then we only
# update the local row; the spec list is still persisted so the future
# HTTP cleanup path has everything it needs.
_HTTP_CLEANUP_SUPPORTED: bool = False
_TERMINAL_EVENTS = (
GraphRunSucceededEvent,
GraphRunPartialSucceededEvent,
GraphRunFailedEvent,
GraphRunAbortedEvent,
)
def __init__(
self,
*,
session_store: WorkflowAgentRuntimeSessionStore,
request_builder: AgentBackendRunRequestBuilder,
agent_backend_client: AgentBackendRunClient | None,
cleanup_wait_timeout_seconds: float = _CLEANUP_WAIT_TIMEOUT_SECONDS,
) -> None:
super().__init__()
self._session_store = session_store
self._request_builder = request_builder
self._agent_backend_client = agent_backend_client
self._cleanup_wait_timeout_seconds = cleanup_wait_timeout_seconds
@override
def on_graph_start(self) -> None:
return
@override
def on_event(self, event: GraphEngineEvent) -> None:
if not isinstance(event, self._TERMINAL_EVENTS):
return
workflow_run_id = get_system_text(
self.graph_runtime_state.variable_pool,
SystemVariableKey.WORKFLOW_EXECUTION_ID,
)
if not workflow_run_id:
logger.warning("Skipping workflow Agent session cleanup: workflow_run_id is missing.")
return
for stored_session in self._session_store.list_active_sessions(workflow_run_id=workflow_run_id):
self._cleanup_session(stored_session)
@override
def on_graph_end(self, error: Exception | None) -> None:
return
def _cleanup_session(self, stored_session: StoredWorkflowAgentSession) -> None:
scope = stored_session.scope
if not self._HTTP_CLEANUP_SUPPORTED:
# Agent backend has no cleanup-only run mode yet (see class
# docstring). Retire the local row so future re-entries do not
# resume from stale state, and let the backend's retention TTL
# release the suspended layers on its own schedule.
logger.info(
"Workflow Agent session retired locally; HTTP cleanup is disabled "
"until the agent backend supports a cleanup-only run mode. "
"workflow_run_id=%s node_id=%s binding_id=%s agent_id=%s previous_run_id=%s",
scope.workflow_run_id,
scope.node_id,
scope.binding_id,
scope.agent_id,
stored_session.backend_run_id,
)
self._session_store.mark_cleaned(scope=scope, backend_run_id=stored_session.backend_run_id)
return
if self._agent_backend_client is None:
# HTTP cleanup was enabled by the caller but no client was wired
# in (e.g. the API runs without AGENT_BACKEND_BASE_URL configured).
# Leave the row ACTIVE so an operator restart with proper config
# can drive the cleanup; do not silently retire it.
logger.warning(
"Skipping Agent backend cleanup: HTTP cleanup is enabled but no agent "
"backend client is wired in. workflow_run_id=%s node_id=%s agent_id=%s",
scope.workflow_run_id,
scope.node_id,
scope.agent_id,
)
return
if not stored_session.composition_layer_specs:
# Sessions persisted before A.1 landed do not carry the spec list,
# so we cannot replay a valid cleanup composition. Leave the row
# ACTIVE and warn so the absence shows up in observability rather
# than being silently swallowed by a doomed cleanup run.
logger.warning(
"Skipping Agent backend cleanup: no composition_layer_specs persisted. "
"workflow_run_id=%s node_id=%s agent_id=%s",
scope.workflow_run_id,
scope.node_id,
scope.agent_id,
)
return
request = self._request_builder.build_cleanup_request(
session_snapshot=stored_session.session_snapshot,
composition_layer_specs=stored_session.composition_layer_specs,
idempotency_key=f"{scope.workflow_run_id}:{scope.node_id}:{scope.binding_id}:agent-session-cleanup",
metadata={
"tenant_id": scope.tenant_id,
"app_id": scope.app_id,
"workflow_id": scope.workflow_id,
"workflow_run_id": scope.workflow_run_id,
"node_id": scope.node_id,
"node_execution_id": scope.node_execution_id,
"binding_id": scope.binding_id,
"agent_id": scope.agent_id,
"agent_config_snapshot_id": scope.agent_config_snapshot_id,
"previous_agent_backend_run_id": stored_session.backend_run_id,
},
)
try:
response = self._agent_backend_client.create_run(request)
except AgentBackendError:
logger.warning(
"Agent backend session cleanup request failed: workflow_run_id=%s node_id=%s agent_id=%s",
scope.workflow_run_id,
scope.node_id,
scope.agent_id,
exc_info=True,
)
return
try:
status_response = self._agent_backend_client.wait_run(
response.run_id, timeout_seconds=self._cleanup_wait_timeout_seconds
)
except AgentBackendError:
logger.warning(
"Agent backend session cleanup wait_run failed: "
"workflow_run_id=%s node_id=%s agent_id=%s cleanup_run_id=%s",
scope.workflow_run_id,
scope.node_id,
scope.agent_id,
response.run_id,
exc_info=True,
)
return
if status_response.status != "succeeded":
logger.warning(
"Agent backend session cleanup did not succeed: status=%s error=%s "
"workflow_run_id=%s node_id=%s agent_id=%s cleanup_run_id=%s",
status_response.status,
status_response.error,
scope.workflow_run_id,
scope.node_id,
scope.agent_id,
response.run_id,
)
return
self._session_store.mark_cleaned(scope=scope, backend_run_id=response.run_id)
def build_workflow_agent_session_cleanup_layer() -> WorkflowAgentSessionCleanupLayer:
"""Wire the cleanup layer with the standard production dependencies.
The agent backend client is constructed only when ``AGENT_BACKEND_BASE_URL``
is configured (or the deterministic fake is explicitly enabled). When
neither is set — for example unit tests that bring up the workflow runner
without an Agent node — we pass ``None`` so the layer stays harmless. With
``_HTTP_CLEANUP_SUPPORTED = False`` the local-retire branch never touches
the client anyway, but keeping it ``None`` avoids importing httpx and lets
test harnesses skip backend configuration.
"""
agent_backend_client: AgentBackendRunClient | None
if dify_config.AGENT_BACKEND_USE_FAKE or dify_config.AGENT_BACKEND_BASE_URL:
agent_backend_client = create_agent_backend_run_client(
base_url=dify_config.AGENT_BACKEND_BASE_URL,
use_fake=dify_config.AGENT_BACKEND_USE_FAKE,
fake_scenario=dify_config.AGENT_BACKEND_FAKE_SCENARIO,
)
else:
agent_backend_client = None
return WorkflowAgentSessionCleanupLayer(
session_store=WorkflowAgentRuntimeSessionStore(),
request_builder=AgentBackendRunRequestBuilder(),
agent_backend_client=agent_backend_client,
)

View File

@ -1,179 +0,0 @@
from __future__ import annotations
from dataclasses import dataclass, field
from agenton.compositor import CompositorSessionSnapshot
from pydantic import TypeAdapter
from sqlalchemy import select
from clients.agent_backend.request_builder import CleanupLayerSpec
from core.db.session_factory import session_factory
from libs.datetime_utils import naive_utc_now
from models.agent import (
WorkflowAgentRuntimeSession,
WorkflowAgentRuntimeSessionStatus,
)
_SPECS_ADAPTER: TypeAdapter[list[CleanupLayerSpec]] = TypeAdapter(list[CleanupLayerSpec])
def _serialize_specs(specs: list[CleanupLayerSpec]) -> str:
return _SPECS_ADAPTER.dump_json(specs).decode()
def _deserialize_specs(value: str | None) -> list[CleanupLayerSpec]:
if not value:
return []
return _SPECS_ADAPTER.validate_json(value)
@dataclass(frozen=True, slots=True)
class WorkflowAgentSessionScope:
tenant_id: str
app_id: str
workflow_id: str
workflow_run_id: str | None
node_id: str
node_execution_id: str
binding_id: str
agent_id: str
agent_config_snapshot_id: str
@dataclass(frozen=True, slots=True)
class StoredWorkflowAgentSession:
scope: WorkflowAgentSessionScope
session_snapshot: CompositorSessionSnapshot
backend_run_id: str | None
composition_layer_specs: list[CleanupLayerSpec] = field(default_factory=list)
class WorkflowAgentRuntimeSessionStore:
"""Stores Agent backend session snapshots for workflow Agent node re-entry."""
def load_active_snapshot(self, scope: WorkflowAgentSessionScope) -> CompositorSessionSnapshot | None:
if scope.workflow_run_id is None:
return None
with session_factory.create_session() as session:
row = session.scalar(
select(WorkflowAgentRuntimeSession).where(
WorkflowAgentRuntimeSession.tenant_id == scope.tenant_id,
WorkflowAgentRuntimeSession.workflow_run_id == scope.workflow_run_id,
WorkflowAgentRuntimeSession.node_id == scope.node_id,
WorkflowAgentRuntimeSession.binding_id == scope.binding_id,
WorkflowAgentRuntimeSession.agent_id == scope.agent_id,
WorkflowAgentRuntimeSession.status == WorkflowAgentRuntimeSessionStatus.ACTIVE,
)
)
if row is None:
return None
return CompositorSessionSnapshot.model_validate_json(row.session_snapshot)
def list_active_sessions(self, *, workflow_run_id: str) -> list[StoredWorkflowAgentSession]:
with session_factory.create_session() as session:
rows = session.scalars(
select(WorkflowAgentRuntimeSession).where(
WorkflowAgentRuntimeSession.workflow_run_id == workflow_run_id,
WorkflowAgentRuntimeSession.status == WorkflowAgentRuntimeSessionStatus.ACTIVE,
)
).all()
return [
StoredWorkflowAgentSession(
scope=WorkflowAgentSessionScope(
tenant_id=row.tenant_id,
app_id=row.app_id,
workflow_id=row.workflow_id,
workflow_run_id=row.workflow_run_id,
node_id=row.node_id,
node_execution_id=row.node_execution_id or "",
binding_id=row.binding_id,
agent_id=row.agent_id,
agent_config_snapshot_id=row.agent_config_snapshot_id,
),
session_snapshot=CompositorSessionSnapshot.model_validate_json(row.session_snapshot),
backend_run_id=row.backend_run_id,
composition_layer_specs=_deserialize_specs(row.composition_layer_specs),
)
for row in rows
]
def save_active_snapshot(
self,
*,
scope: WorkflowAgentSessionScope,
backend_run_id: str,
snapshot: CompositorSessionSnapshot | None,
composition_layer_specs: list[CleanupLayerSpec],
) -> None:
if scope.workflow_run_id is None or snapshot is None:
return
snapshot_json = snapshot.model_dump_json()
specs_json = _serialize_specs(composition_layer_specs)
with session_factory.create_session() as session:
row = session.scalar(
select(WorkflowAgentRuntimeSession).where(
WorkflowAgentRuntimeSession.tenant_id == scope.tenant_id,
WorkflowAgentRuntimeSession.workflow_run_id == scope.workflow_run_id,
WorkflowAgentRuntimeSession.node_id == scope.node_id,
WorkflowAgentRuntimeSession.binding_id == scope.binding_id,
WorkflowAgentRuntimeSession.agent_id == scope.agent_id,
)
)
if row is None:
row = WorkflowAgentRuntimeSession(
tenant_id=scope.tenant_id,
app_id=scope.app_id,
workflow_id=scope.workflow_id,
workflow_run_id=scope.workflow_run_id,
node_id=scope.node_id,
node_execution_id=scope.node_execution_id,
binding_id=scope.binding_id,
agent_id=scope.agent_id,
agent_config_snapshot_id=scope.agent_config_snapshot_id,
backend_run_id=backend_run_id,
session_snapshot=snapshot_json,
composition_layer_specs=specs_json,
status=WorkflowAgentRuntimeSessionStatus.ACTIVE,
)
session.add(row)
else:
row.node_execution_id = scope.node_execution_id
row.agent_config_snapshot_id = scope.agent_config_snapshot_id
row.backend_run_id = backend_run_id
row.session_snapshot = snapshot_json
row.composition_layer_specs = specs_json
row.status = WorkflowAgentRuntimeSessionStatus.ACTIVE
row.cleaned_at = None
session.commit()
def mark_cleaned(self, *, scope: WorkflowAgentSessionScope, backend_run_id: str | None = None) -> None:
if scope.workflow_run_id is None:
return
with session_factory.create_session() as session:
row = session.scalar(
select(WorkflowAgentRuntimeSession).where(
WorkflowAgentRuntimeSession.tenant_id == scope.tenant_id,
WorkflowAgentRuntimeSession.workflow_run_id == scope.workflow_run_id,
WorkflowAgentRuntimeSession.node_id == scope.node_id,
WorkflowAgentRuntimeSession.binding_id == scope.binding_id,
WorkflowAgentRuntimeSession.agent_id == scope.agent_id,
WorkflowAgentRuntimeSession.status == WorkflowAgentRuntimeSessionStatus.ACTIVE,
)
)
if row is None:
return
if backend_run_id is not None:
row.backend_run_id = backend_run_id
row.status = WorkflowAgentRuntimeSessionStatus.CLEANED
row.cleaned_at = naive_utc_now()
session.commit()
__all__ = [
"StoredWorkflowAgentSession",
"WorkflowAgentRuntimeSessionStore",
"WorkflowAgentSessionScope",
]

View File

@ -1,90 +0,0 @@
"""add workflow agent runtime sessions
Revision ID: 7885bd53f9a9
Revises: d4a5e1f3c9b7
Create Date: 2026-05-27 09:53:54.711805
"""
import sqlalchemy as sa
from alembic import op
import models as models
# revision identifiers, used by Alembic.
revision = "7885bd53f9a9"
down_revision = "d4a5e1f3c9b7"
branch_labels = None
depends_on = None
def _is_pg() -> bool:
return op.get_bind().dialect.name == "postgresql"
def _uuid_column(name: str, *, nullable: bool = False, primary_key: bool = False) -> sa.Column:
"""Match the ``uuidv7()`` default that other tables on Postgres rely on,
while staying portable on MySQL where the ORM supplies the id."""
kwargs: dict[str, object] = {"nullable": nullable, "primary_key": primary_key}
if primary_key and _is_pg():
kwargs["server_default"] = sa.text("uuidv7()")
return sa.Column(name, models.types.StringUUID(), **kwargs)
def upgrade() -> None:
op.create_table(
"workflow_agent_runtime_sessions",
_uuid_column("id", primary_key=True),
sa.Column("tenant_id", models.types.StringUUID(), nullable=False),
sa.Column("app_id", models.types.StringUUID(), nullable=False),
sa.Column("workflow_id", models.types.StringUUID(), nullable=False),
sa.Column("workflow_run_id", models.types.StringUUID(), nullable=False),
sa.Column("node_id", sa.String(length=255), nullable=False),
sa.Column("node_execution_id", sa.String(length=255), nullable=True),
sa.Column("binding_id", models.types.StringUUID(), nullable=False),
sa.Column("agent_id", models.types.StringUUID(), nullable=False),
sa.Column("agent_config_snapshot_id", models.types.StringUUID(), nullable=False),
sa.Column("backend_run_id", sa.String(length=255), nullable=True),
sa.Column("session_snapshot", models.types.LongText(), nullable=False),
# MySQL rejects ``server_default`` on TEXT/BLOB columns. The JSON
# payload is always populated at the ORM layer via
# ``WorkflowAgentRuntimeSessionStore.save_active_snapshot`` so the
# missing DB-level default cannot leave new rows uninitialized.
sa.Column("composition_layer_specs", models.types.LongText(), nullable=False),
sa.Column(
"status",
sa.String(length=32),
server_default=sa.text("'active'"),
nullable=False,
),
sa.Column("cleaned_at", sa.DateTime(), nullable=True),
sa.Column("created_at", sa.DateTime(), server_default=sa.func.current_timestamp(), nullable=False),
sa.Column("updated_at", sa.DateTime(), server_default=sa.func.current_timestamp(), nullable=False),
sa.PrimaryKeyConstraint("id", name=op.f("workflow_agent_runtime_session_pkey")),
sa.UniqueConstraint(
"tenant_id",
"workflow_run_id",
"node_id",
"binding_id",
"agent_id",
name=op.f("workflow_agent_runtime_session_scope_unique"),
),
)
with op.batch_alter_table("workflow_agent_runtime_sessions", schema=None) as batch_op:
batch_op.create_index(
"workflow_agent_runtime_session_lookup_idx",
["tenant_id", "workflow_run_id", "node_id", "status"],
unique=False,
)
batch_op.create_index(
"workflow_agent_runtime_session_backend_run_idx",
["backend_run_id"],
unique=False,
)
def downgrade() -> None:
with op.batch_alter_table("workflow_agent_runtime_sessions", schema=None) as batch_op:
batch_op.drop_index("workflow_agent_runtime_session_backend_run_idx")
batch_op.drop_index("workflow_agent_runtime_session_lookup_idx")
op.drop_table("workflow_agent_runtime_sessions")

View File

@ -20,8 +20,6 @@ from .agent import (
AgentStatus,
WorkflowAgentBindingType,
WorkflowAgentNodeBinding,
WorkflowAgentRuntimeSession,
WorkflowAgentRuntimeSessionStatus,
)
from .api_based_extension import APIBasedExtension, APIBasedExtensionPoint
from .comment import (
@ -237,8 +235,6 @@ __all__ = [
"Workflow",
"WorkflowAgentBindingType",
"WorkflowAgentNodeBinding",
"WorkflowAgentRuntimeSession",
"WorkflowAgentRuntimeSessionStatus",
"WorkflowAppLog",
"WorkflowAppLogCreatedFrom",
"WorkflowArchiveLog",

View File

@ -92,15 +92,6 @@ class WorkflowAgentBindingType(StrEnum):
INLINE_AGENT = "inline_agent"
class WorkflowAgentRuntimeSessionStatus(StrEnum):
"""Lifecycle state of an Agent backend session snapshot owned by a workflow run."""
# Snapshot can be reused by a later Agent run in the same workflow run.
ACTIVE = "active"
# Snapshot has been retired and must not be submitted to Agent backend again.
CLEANED = "cleaned"
class Agent(DefaultFieldsMixin, Base):
"""Workspace-scoped Agent identity used by Agent Roster and workflow-only agents."""
@ -282,56 +273,3 @@ class WorkflowAgentNodeBinding(DefaultFieldsMixin, Base):
if isinstance(self.node_job_config, str):
return json.loads(self.node_job_config)
return dict(self.node_job_config)
class WorkflowAgentRuntimeSession(DefaultFieldsMixin, Base):
"""Persisted Agent backend session snapshot for one workflow Agent node execution scope.
The snapshot is runtime state returned by Agent backend. It is intentionally
separate from Agent Soul snapshots and workflow node-job config.
"""
__tablename__ = "workflow_agent_runtime_sessions"
__table_args__ = (
sa.PrimaryKeyConstraint("id", name="workflow_agent_runtime_session_pkey"),
UniqueConstraint(
"tenant_id",
"workflow_run_id",
"node_id",
"binding_id",
"agent_id",
name="workflow_agent_runtime_session_scope_unique",
),
Index(
"workflow_agent_runtime_session_lookup_idx",
"tenant_id",
"workflow_run_id",
"node_id",
"status",
),
Index("workflow_agent_runtime_session_backend_run_idx", "backend_run_id"),
)
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
workflow_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
workflow_run_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
node_id: Mapped[str] = mapped_column(String(255), nullable=False)
node_execution_id: Mapped[str | None] = mapped_column(String(255), nullable=True)
binding_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
agent_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
agent_config_snapshot_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
backend_run_id: Mapped[str | None] = mapped_column(String(255), nullable=True)
session_snapshot: Mapped[str] = mapped_column(LongText, nullable=False)
# JSON-encoded list of ``WorkflowAgentSessionLayerSpec`` ({name, type, deps,
# config}). Drives Agent backend cleanup-only runs: the agenton compositor
# rejects a session snapshot whose layer names do not match the cleanup
# composition, so we must replay the same layer graph (minus credential-
# bearing plugin layers) when issuing the cleanup request.
composition_layer_specs: Mapped[str] = mapped_column(LongText, nullable=False, server_default="[]")
status: Mapped[WorkflowAgentRuntimeSessionStatus] = mapped_column(
EnumText(WorkflowAgentRuntimeSessionStatus, length=32),
nullable=False,
default=WorkflowAgentRuntimeSessionStatus.ACTIVE,
)
cleaned_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)

View File

@ -1,134 +0,0 @@
"""Integration test for the cleanup request against the real agenton compositor.
The bug fixed by A+D was invisible to unit tests that use ``FakeAgentBackendRunClient``
because the fake client never runs agenton's ``_validate_session_snapshot``. This
test plugs a cleanup request through the real ``Compositor`` (with the same
providers the agent backend wires in production) so that the snapshot-vs-
composition name-order check would fail loudly if the cleanup builder ever
regressed back to the empty-composition shape.
"""
from __future__ import annotations
from typing import cast
import pytest
from agenton.compositor import Compositor, CompositorSessionSnapshot, LayerProvider
from agenton.compositor.schemas import LayerSessionSnapshot
from agenton.layers.base import LifecycleState
from agenton_collections.layers.plain import PLAIN_PROMPT_LAYER_TYPE_ID
from agenton_collections.layers.plain.basic import PromptLayer
from agenton_collections.layers.pydantic_ai import PYDANTIC_AI_HISTORY_LAYER_TYPE_ID, PydanticAIHistoryLayer
from clients.agent_backend import AgentBackendRunRequestBuilder, CleanupLayerSpec
def test_cleanup_request_passes_agenton_snapshot_validation():
"""The cleanup request's composition layer names must match the (filtered)
snapshot's layer names exactly — agenton's compositor enforces this and
the agent backend rejects mismatches as ``run_failed`` asynchronously,
which is the trap A/D fixed."""
# Persisted (non-plugin) layer specs — these are what cleanup will replay.
# We exclude the dify.execution_context layer from this integration check
# because its real provider needs a plugin-daemon HTTP client; the cleanup
# validation we are exercising is the snapshot-vs-composition name check,
# which is purely structural and does not depend on which non-plugin layer
# types appear.
persisted_specs = [
CleanupLayerSpec(
name="workflow_node_job_prompt",
type=PLAIN_PROMPT_LAYER_TYPE_ID,
config={"prefix": "Do the cleanup."},
),
CleanupLayerSpec(name="history", type=PYDANTIC_AI_HISTORY_LAYER_TYPE_ID),
]
# Saved snapshot still carries the LLM layer entry — cleanup's
# ``_filter_snapshot_to_specs`` must drop it so names match.
full_snapshot = CompositorSessionSnapshot(
layers=[
LayerSessionSnapshot(
name="workflow_node_job_prompt",
lifecycle_state=LifecycleState.SUSPENDED,
runtime_state={},
),
LayerSessionSnapshot(
name="history",
lifecycle_state=LifecycleState.SUSPENDED,
runtime_state={"messages": []},
),
LayerSessionSnapshot(
name="llm",
lifecycle_state=LifecycleState.SUSPENDED,
runtime_state={},
),
]
)
cleanup_request = AgentBackendRunRequestBuilder().build_cleanup_request(
session_snapshot=full_snapshot,
composition_layer_specs=persisted_specs,
)
# Drive the real agenton compositor through ``from_config`` + ``_create_run``
# the same way the agent backend's RunScheduler does. ``_create_run`` is the
# private path that calls ``_validate_session_snapshot``; we use it directly
# to keep the test synchronous (no async ``enter()`` lifecycle needed —
# validation is the only thing under test).
config = {
"schema_version": 1,
"layers": [
{"name": layer.name, "type": layer.type, "deps": dict(layer.deps), "metadata": dict(layer.metadata)}
for layer in cleanup_request.composition.layers
],
}
compositor = Compositor.from_config(
config,
providers=[
LayerProvider.from_layer_type(PromptLayer),
LayerProvider.from_layer_type(PydanticAIHistoryLayer),
],
)
layer_configs = {layer.name: layer.config for layer in cleanup_request.composition.layers}
# This is the call that would raise ``ValueError`` if the cleanup snapshot
# and composition disagreed on layer names — the exact failure mode the
# original ``layers=[]`` cleanup hit.
run = compositor._create_run( # type: ignore[reportPrivateUsage]
configs=cast(dict[str, object], layer_configs),
session_snapshot=cleanup_request.session_snapshot,
)
assert list(run.slots.keys()) == ["workflow_node_job_prompt", "history"]
def test_cleanup_request_with_mismatched_specs_would_be_rejected_by_agenton():
"""Regression sentinel: if a future refactor stops filtering the snapshot,
agenton would reject the request — and that rejection is what the runtime
fix is preventing. We confirm the validator does fail when given the
pre-fix shape so the previous test's success is not a coincidence."""
snapshot_with_extra = CompositorSessionSnapshot(
layers=[
LayerSessionSnapshot(
name="history",
lifecycle_state=LifecycleState.SUSPENDED,
runtime_state={},
),
LayerSessionSnapshot(
name="llm", # extra layer not in composition
lifecycle_state=LifecycleState.SUSPENDED,
runtime_state={},
),
]
)
compositor = Compositor.from_config(
{
"schema_version": 1,
"layers": [{"name": "history", "type": PYDANTIC_AI_HISTORY_LAYER_TYPE_ID, "deps": {}, "metadata": {}}],
},
providers=[LayerProvider.from_layer_type(PydanticAIHistoryLayer)],
)
with pytest.raises(ValueError, match="layer names must match"):
compositor._create_run( # type: ignore[reportPrivateUsage]
configs={},
session_snapshot=snapshot_with_extra,
)

View File

@ -63,25 +63,3 @@ def test_fake_client_cancel_run_returns_cancelled_status():
assert cancelled.run_id == "fake-run-1"
assert cancelled.status == "cancelled"
def test_fake_client_paused_scenario_returns_paused_status_and_event():
"""The paused scenario exists for HITL-style flows; both ``wait_run`` and
the event stream must report the pause so consumers can branch on it."""
client = FakeAgentBackendRunClient(scenario=FakeAgentBackendScenario.PAUSED)
status = client.wait_run("fake-run-1")
events = list(client.stream_events("fake-run-1"))
assert status.status == "paused"
assert status.error is None
assert events[-1].type == "run_paused"
assert events[-1].data.reason == "human_input_required"
def test_fake_client_success_wait_run_returns_succeeded_status():
"""Covers the default SUCCESS branch of ``wait_run`` directly."""
status = FakeAgentBackendRunClient().wait_run("fake-run-1")
assert status.status == "succeeded"
assert status.error is None

View File

@ -1,23 +1,15 @@
from typing import Any, cast
import pytest
from agenton.compositor import CompositorSessionSnapshot
from agenton.compositor.schemas import LayerSessionSnapshot
from agenton.layers import ExitIntent
from agenton.layers.base import LifecycleState
from agenton_collections.layers.plain import PLAIN_PROMPT_LAYER_TYPE_ID, PromptLayerConfig
from agenton_collections.layers.pydantic_ai import PYDANTIC_AI_HISTORY_LAYER_TYPE_ID
from agenton_collections.layers.plain import PLAIN_PROMPT_LAYER_TYPE_ID
from dify_agent.layers.dify_plugin import (
DIFY_PLUGIN_LLM_LAYER_TYPE_ID,
DIFY_PLUGIN_TOOLS_LAYER_TYPE_ID,
DifyPluginLLMLayerConfig,
DifyPluginToolConfig,
DifyPluginToolsLayerConfig,
)
from dify_agent.layers.execution_context import DIFY_EXECUTION_CONTEXT_LAYER_TYPE_ID, DifyExecutionContextLayerConfig
from dify_agent.layers.output import DIFY_OUTPUT_LAYER_TYPE_ID
from dify_agent.protocol import (
DIFY_AGENT_HISTORY_LAYER_ID,
DIFY_AGENT_MODEL_LAYER_ID,
DIFY_AGENT_OUTPUT_LAYER_ID,
CreateRunRequest,
@ -34,7 +26,6 @@ from clients.agent_backend import (
AgentBackendOutputConfig,
AgentBackendRunRequestBuilder,
AgentBackendWorkflowNodeRunInput,
CleanupLayerSpec,
redact_for_agent_backend_log,
)
@ -80,11 +71,10 @@ def test_request_builder_outputs_dify_agent_create_run_request():
WORKFLOW_NODE_JOB_PROMPT_LAYER_ID,
WORKFLOW_USER_PROMPT_LAYER_ID,
DIFY_EXECUTION_CONTEXT_LAYER_ID,
DIFY_AGENT_HISTORY_LAYER_ID,
DIFY_AGENT_MODEL_LAYER_ID,
DIFY_AGENT_OUTPUT_LAYER_ID,
]
assert request.on_exit.default is ExitIntent.SUSPEND
assert request.on_exit.default is ExitIntent.DELETE
assert request.idempotency_key == "workflow-run-1:node-execution-1"
assert request.metadata == {"workflow_id": "workflow-1", "node_id": "node-1"}
@ -109,10 +99,9 @@ def test_request_builder_sets_model_and_output_layer_contract_ids():
layers = {layer.name: layer for layer in request.composition.layers}
assert layers[DIFY_EXECUTION_CONTEXT_LAYER_ID].type == DIFY_EXECUTION_CONTEXT_LAYER_TYPE_ID
assert cast(DifyExecutionContextLayerConfig, layers[DIFY_EXECUTION_CONTEXT_LAYER_ID].config).user_id == "user-1"
assert layers[DIFY_AGENT_HISTORY_LAYER_ID].type == PYDANTIC_AI_HISTORY_LAYER_TYPE_ID
assert layers[DIFY_EXECUTION_CONTEXT_LAYER_ID].config.user_id == "user-1"
assert layers[DIFY_AGENT_MODEL_LAYER_ID].type == DIFY_PLUGIN_LLM_LAYER_TYPE_ID
assert cast(DifyPluginLLMLayerConfig, layers[DIFY_AGENT_MODEL_LAYER_ID].config).plugin_id == "langgenius/openai"
assert layers[DIFY_AGENT_MODEL_LAYER_ID].config.plugin_id == "langgenius/openai"
assert layers[DIFY_AGENT_MODEL_LAYER_ID].deps == {"execution_context": DIFY_EXECUTION_CONTEXT_LAYER_ID}
assert layers[DIFY_AGENT_OUTPUT_LAYER_ID].type == DIFY_OUTPUT_LAYER_TYPE_ID
@ -141,92 +130,16 @@ def test_request_builder_adds_dify_plugin_tools_layer_when_configured():
assert layers[DIFY_PLUGIN_TOOLS_LAYER_ID].type == DIFY_PLUGIN_TOOLS_LAYER_TYPE_ID
assert layers[DIFY_PLUGIN_TOOLS_LAYER_ID].deps == {"execution_context": DIFY_EXECUTION_CONTEXT_LAYER_ID}
tools_config = cast(DifyPluginToolsLayerConfig, layers[DIFY_PLUGIN_TOOLS_LAYER_ID].config)
assert tools_config.tools[0].tool_name == "current_time"
assert layers[DIFY_PLUGIN_TOOLS_LAYER_ID].config.tools[0].tool_name == "current_time"
def test_request_builder_can_delete_on_exit_for_cleanup_paths():
def test_request_builder_can_suspend_on_exit_for_resume_or_babysit_paths():
run_input = _run_input()
run_input.suspend_on_exit = False
run_input.suspend_on_exit = True
request = AgentBackendRunRequestBuilder().build_for_workflow_node(run_input)
assert request.on_exit.default is ExitIntent.DELETE
def test_request_builder_builds_cleanup_request_replays_persisted_layer_specs():
"""The cleanup request must replay the persisted (non-plugin) layer specs
and filter the snapshot to match so the agenton compositor's
snapshot-vs-composition name-order validator passes."""
session_snapshot = CompositorSessionSnapshot(
layers=[
LayerSessionSnapshot(name="history", lifecycle_state=LifecycleState.SUSPENDED, runtime_state={"k": 1}),
LayerSessionSnapshot(name="llm", lifecycle_state=LifecycleState.SUSPENDED, runtime_state={}),
]
)
specs = [CleanupLayerSpec(name="history", type="pydantic_ai.history")]
request = AgentBackendRunRequestBuilder().build_cleanup_request(
session_snapshot=session_snapshot,
composition_layer_specs=specs,
idempotency_key="run-1:node-1:binding-1:agent-session-cleanup",
metadata={"workflow_run_id": "run-1"},
)
assert [layer.name for layer in request.composition.layers] == ["history"]
assert request.session_snapshot is not None
assert [layer.name for layer in request.session_snapshot.layers] == ["history"]
assert request.on_exit.default is ExitIntent.DELETE
assert request.idempotency_key == "run-1:node-1:binding-1:agent-session-cleanup"
assert request.metadata["agent_backend_lifecycle"] == "session_cleanup"
def test_request_builder_rejects_empty_composition_layer_specs():
"""Empty specs would put us back in the original ``layers=[]`` trap that
fails on agenton's snapshot-vs-composition validation."""
with pytest.raises(ValueError, match="composition_layer_specs"):
AgentBackendRunRequestBuilder().build_cleanup_request(
session_snapshot=CompositorSessionSnapshot(layers=[]),
composition_layer_specs=[],
)
def test_extract_cleanup_layer_specs_drops_plugin_layers_keeps_configs():
from dify_agent.protocol import RunComposition, RunLayerSpec
from clients.agent_backend import extract_cleanup_layer_specs
composition = RunComposition(
layers=[
RunLayerSpec(
name="agent_soul_prompt",
type="plain.prompt",
config=PromptLayerConfig(prefix="hello"),
),
RunLayerSpec(
name="llm",
type="dify.plugin.llm",
config=None, # protocol allows None; the redacted config is what matters
),
RunLayerSpec(
name="tools",
type="dify.plugin.tools",
),
RunLayerSpec(
name="history",
type="pydantic_ai.history",
),
]
)
specs = extract_cleanup_layer_specs(composition)
assert [spec.name for spec in specs] == ["agent_soul_prompt", "history"]
# Non-plugin configs are dumped as JSON-compatible dicts so the persisted
# row can be replayed without holding live pydantic instances.
soul_config = specs[0].config
assert isinstance(soul_config, dict)
assert soul_config.get("prefix") == "hello"
assert request.on_exit.default is ExitIntent.SUSPEND
def test_request_builder_rejects_blank_prompts():
@ -246,6 +159,6 @@ def test_request_builder_rejects_blank_prompts():
def test_redact_for_agent_backend_log_hides_credentials():
request = AgentBackendRunRequestBuilder().build_for_workflow_node(_run_input())
redacted = cast(dict[str, Any], redact_for_agent_backend_log(request))
redacted = redact_for_agent_backend_log(request)
assert redacted["composition"]["layers"][5]["config"]["credentials"] == "[REDACTED]"
assert redacted["composition"]["layers"][4]["config"]["credentials"] == "[REDACTED]"

View File

@ -1,12 +1,9 @@
from types import SimpleNamespace
from typing import cast
from agenton.compositor import CompositorSessionSnapshot
from clients.agent_backend import (
AgentBackendRunEventAdapter,
AgentBackendStreamInternalEvent,
CleanupLayerSpec,
FakeAgentBackendRunClient,
FakeAgentBackendScenario,
)
@ -16,10 +13,9 @@ from core.workflow.nodes.agent_v2.binding_resolver import WorkflowAgentBindingBu
from core.workflow.nodes.agent_v2.entities import DifyAgentNodeData
from core.workflow.nodes.agent_v2.output_adapter import WorkflowAgentOutputAdapter
from core.workflow.nodes.agent_v2.runtime_request_builder import WorkflowAgentRuntimeRequestBuilder
from core.workflow.nodes.agent_v2.session_store import WorkflowAgentRuntimeSessionStore, WorkflowAgentSessionScope
from graphon.entities import GraphInitParams
from graphon.enums import BuiltinNodeTypes, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
from graphon.node_events import PauseRequestedEvent, StreamCompletedEvent
from graphon.node_events import StreamCompletedEvent
from graphon.runtime import GraphRuntimeState
from graphon.variables.segments import StringSegment
from models.agent import Agent, AgentConfigSnapshot, WorkflowAgentNodeBinding
@ -88,47 +84,7 @@ class FakeBindingResolver(WorkflowAgentBindingResolver):
return WorkflowAgentBindingBundle(binding=self.binding, agent=self.agent, snapshot=self.snapshot)
class FakeSessionStore:
def __init__(self, snapshot: CompositorSessionSnapshot | None = None) -> None:
self.loaded_snapshot = snapshot
self.saved: list[
tuple[
WorkflowAgentSessionScope,
str,
CompositorSessionSnapshot | None,
list[CleanupLayerSpec],
]
] = []
self.cleaned: list[tuple[WorkflowAgentSessionScope, str | None]] = []
def load_active_snapshot(self, scope: WorkflowAgentSessionScope) -> CompositorSessionSnapshot | None:
return self.loaded_snapshot
def save_active_snapshot(
self,
*,
scope: WorkflowAgentSessionScope,
backend_run_id: str,
snapshot: CompositorSessionSnapshot | None,
composition_layer_specs: list[CleanupLayerSpec],
) -> None:
self.saved.append((scope, backend_run_id, snapshot, list(composition_layer_specs)))
def mark_cleaned(
self,
*,
scope: WorkflowAgentSessionScope,
backend_run_id: str | None = None,
) -> None:
self.cleaned.append((scope, backend_run_id))
def _node(
*,
scenario: FakeAgentBackendScenario = FakeAgentBackendScenario.SUCCESS,
agent_backend_client: FakeAgentBackendRunClient | None = None,
session_store: FakeSessionStore | None = None,
) -> DifyAgentNode:
def _node(*, scenario: FakeAgentBackendScenario = FakeAgentBackendScenario.SUCCESS) -> DifyAgentNode:
graph_init_params = GraphInitParams(
workflow_id="workflow-1",
graph_config={"nodes": [], "edges": []},
@ -150,7 +106,6 @@ def _node(
def is_owned_by_tenant(self, *, file_id: str, tenant_id: str) -> bool:
return True
client = agent_backend_client or FakeAgentBackendRunClient(scenario=scenario)
return DifyAgentNode(
node_id="agent-node",
data=DifyAgentNodeData.model_validate({"type": BuiltinNodeTypes.AGENT, "version": "2"}),
@ -158,12 +113,11 @@ def _node(
graph_runtime_state=cast(GraphRuntimeState, SimpleNamespace(variable_pool=FakeVariablePool())),
binding_resolver=FakeBindingResolver(),
runtime_request_builder=WorkflowAgentRuntimeRequestBuilder(credentials_provider=FakeCredentialsProvider()),
agent_backend_client=client,
agent_backend_client=FakeAgentBackendRunClient(scenario=scenario),
event_adapter=AgentBackendRunEventAdapter(),
output_adapter=WorkflowAgentOutputAdapter(),
type_checker=PerOutputTypeChecker(file_validator=_AlwaysAllowFileValidator()),
failure_orchestrator=OutputFailureOrchestrator(),
session_store=cast(WorkflowAgentRuntimeSessionStore | None, session_store),
)
@ -178,7 +132,7 @@ def test_agent_node_run_maps_successful_agent_backend_run_to_node_result():
assert agent_log["agent_backend"]["run_id"] == "fake-run-1"
assert agent_log["agent_backend"]["status"] == "succeeded"
assert result.process_data["agent_id"] == "agent-1"
assert result.inputs["agent_backend_request"]["composition"]["layers"][5]["config"]["credentials"] == "[REDACTED]"
assert result.inputs["agent_backend_request"]["composition"]["layers"][4]["config"]["credentials"] == "[REDACTED]"
def test_agent_node_run_maps_failed_agent_backend_run_to_node_result():
@ -191,126 +145,6 @@ def test_agent_node_run_maps_failed_agent_backend_run_to_node_result():
assert result.error_type == "unit_test"
def test_agent_node_failed_run_marks_session_cleaned_to_prevent_stale_reuse():
"""A failed agent run must retire the local ACTIVE session row so a workflow
loop back into the same Agent node does not resume from a stale snapshot."""
existing_snapshot = CompositorSessionSnapshot(layers=[])
store = FakeSessionStore(snapshot=existing_snapshot)
events = list(_node(scenario=FakeAgentBackendScenario.FAILED, session_store=store)._run())
assert len(events) == 1
assert store.cleaned, "failed agent run should mark the session cleaned"
cleaned_scope, cleaned_backend_run_id = store.cleaned[0]
assert cleaned_scope.workflow_run_id == "workflow-run-1"
assert cleaned_backend_run_id == "fake-run-1"
# A failed run does not produce a fresh snapshot to persist.
assert store.saved == []
def test_agent_node_saves_success_snapshot_and_reuses_existing_snapshot():
existing_snapshot = CompositorSessionSnapshot(layers=[])
store = FakeSessionStore(snapshot=existing_snapshot)
client = FakeAgentBackendRunClient()
node = _node(agent_backend_client=client, session_store=store)
events = list(node._run())
assert len(events) == 1
assert store.saved
scope, backend_run_id, saved_snapshot, saved_specs = store.saved[0]
assert scope.workflow_run_id == "workflow-run-1"
assert backend_run_id == "fake-run-1"
assert saved_snapshot is not None
assert client.request is not None
assert client.request.session_snapshot is existing_snapshot
# Persist enough composition shape to replay a cleanup run; plugin layers
# (which would carry credentials) are intentionally absent.
saved_layer_names = [spec.name for spec in saved_specs]
assert saved_layer_names, "cleanup specs must persist at least the non-plugin layers"
plugin_types = {"dify.plugin.llm", "dify.plugin.tools"}
assert not {spec.type for spec in saved_specs} & plugin_types
def test_agent_node_run_when_session_store_save_raises_records_persist_error_in_metadata():
"""A DB-side write failure must not crash the node; it should set
``session_snapshot_persist_error`` in the agent_backend metadata so the
incident is observable from the workflow_node_executions record."""
class _ExplodingSessionStore(FakeSessionStore):
def save_active_snapshot(self, **kwargs): # type: ignore[override]
del kwargs
raise RuntimeError("simulated DB failure")
store = _ExplodingSessionStore()
events = list(_node(session_store=store)._run())
assert len(events) == 1
result = cast(StreamCompletedEvent, events[0]).node_run_result
assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED
agent_backend = result.metadata[WorkflowNodeExecutionMetadataKey.AGENT_LOG]["agent_backend"]
assert agent_backend["session_snapshot_persisted"] is False
assert agent_backend["session_snapshot_persist_error"] == "workflow_agent_runtime_session_store_error"
def test_agent_node_failed_run_when_mark_cleaned_raises_records_cleanup_error_in_metadata():
"""Same defensive pattern: a DB-side mark_cleaned failure must surface as
a ``session_snapshot_cleanup_error`` in metadata, not as a node crash."""
class _ExplodingMarkCleanedStore(FakeSessionStore):
def mark_cleaned(self, **kwargs): # type: ignore[override]
del kwargs
raise RuntimeError("simulated DB failure")
store = _ExplodingMarkCleanedStore()
events = list(_node(scenario=FakeAgentBackendScenario.FAILED, session_store=store)._run())
assert len(events) == 1
result = cast(StreamCompletedEvent, events[0]).node_run_result
assert result.status == WorkflowNodeExecutionStatus.FAILED
agent_backend = result.metadata[WorkflowNodeExecutionMetadataKey.AGENT_LOG]["agent_backend"]
assert agent_backend["session_snapshot_cleaned_on_failure"] is False
assert agent_backend["session_snapshot_cleanup_error"] == "workflow_agent_runtime_session_store_error"
def test_agent_node_success_run_without_session_store_skips_persistence():
"""When ``session_store`` is None the node still completes successfully —
the lifecycle branch is a no-op and the run result is unaffected."""
events = list(_node(session_store=None)._run())
assert len(events) == 1
result = cast(StreamCompletedEvent, events[0]).node_run_result
assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED
agent_backend = result.metadata[WorkflowNodeExecutionMetadataKey.AGENT_LOG]["agent_backend"]
# No persistence metadata is attached when the store is missing.
assert "session_snapshot_persisted" not in agent_backend
def test_agent_node_failed_run_without_session_store_skips_mark_cleaned():
"""``session_store=None`` + failed terminal must remain a no-op for
the cleanup branch — the node failure path still surfaces correctly."""
events = list(_node(scenario=FakeAgentBackendScenario.FAILED, session_store=None)._run())
assert len(events) == 1
result = cast(StreamCompletedEvent, events[0]).node_run_result
assert result.status == WorkflowNodeExecutionStatus.FAILED
agent_backend = result.metadata[WorkflowNodeExecutionMetadataKey.AGENT_LOG]["agent_backend"]
assert "session_snapshot_cleaned_on_failure" not in agent_backend
def test_agent_node_paused_run_requests_workflow_pause_and_persists_snapshot():
store = FakeSessionStore()
node = _node(scenario=FakeAgentBackendScenario.PAUSED, session_store=store)
events = list(node._run())
assert len(events) == 1
assert isinstance(events[0], PauseRequestedEvent)
assert store.saved
assert store.saved[0][1] == "fake-run-1"
assert store.saved[0][3], "paused agent run should still persist replayable layer specs"
def test_agent_node_records_stream_usage_metadata():
metadata = {"agent_backend": {"run_id": "run-1"}}

View File

@ -1,14 +1,10 @@
from dataclasses import replace
from typing import cast
import pytest
from agenton.compositor import CompositorSessionSnapshot
from dify_agent.layers.dify_plugin import DifyPluginToolConfig, DifyPluginToolsLayerConfig
from dify_agent.protocol import DIFY_AGENT_HISTORY_LAYER_ID, DIFY_AGENT_MODEL_LAYER_ID
from clients.agent_backend import DIFY_EXECUTION_CONTEXT_LAYER_ID, DIFY_PLUGIN_TOOLS_LAYER_ID
from core.app.entities.app_invoke_entities import DifyRunContext, InvokeFrom, UserFrom
from core.workflow.nodes.agent_v2.plugin_tools_builder import WorkflowAgentPluginToolsBuilder
from core.workflow.nodes.agent_v2.runtime_request_builder import (
WorkflowAgentRuntimeBuildContext,
WorkflowAgentRuntimeRequestBuilder,
@ -31,17 +27,6 @@ class FakeCredentialsProvider:
return {"api_key": "secret-key"}
class CapturingCredentialsProvider:
def __init__(self) -> None:
self.provider_name: str | None = None
self.model_name: str | None = None
def fetch(self, provider_name: str, model_name: str) -> dict[str, object]:
self.provider_name = provider_name
self.model_name = model_name
return {"api_key": "secret-key"}
class FakePluginToolsBuilder:
def __init__(self) -> None:
# Capture the runtime invocation source so tests can assert it was
@ -151,31 +136,7 @@ def test_builds_create_run_request_from_agent_soul_and_node_job():
assert dumped["composition"]["layers"][1]["config"]["prefix"] == "Use the previous output."
assert "Previous result" in dumped["composition"]["layers"][2]["config"]["user"]
assert dumped["composition"]["layers"][-1]["config"]["json_schema"]["properties"]["summary"]["type"] == "string"
assert DIFY_AGENT_HISTORY_LAYER_ID in layers
assert result.redacted_request["composition"]["layers"][5]["config"]["credentials"] == "[REDACTED]"
def test_normalizes_langgenius_model_provider_for_agent_backend_transport():
context = _context()
context.snapshot.config_snapshot = AgentSoulConfig(
prompt={"system_prompt": "You are careful."},
model=AgentSoulModelConfig(
plugin_id="langgenius/openai/openai",
model_provider="langgenius/openai/openai",
model="gpt-test",
),
)
credentials_provider = CapturingCredentialsProvider()
result = WorkflowAgentRuntimeRequestBuilder(credentials_provider=credentials_provider).build(context)
dumped = result.request.model_dump(mode="json")
layers = {layer["name"]: layer for layer in dumped["composition"]["layers"]}
model_config = layers[DIFY_AGENT_MODEL_LAYER_ID]["config"]
assert credentials_provider.provider_name == "langgenius/openai/openai"
assert credentials_provider.model_name == "gpt-test"
assert model_config["plugin_id"] == "langgenius/openai"
assert model_config["model_provider"] == "openai"
assert result.redacted_request["composition"]["layers"][4]["config"]["credentials"] == "[REDACTED]"
def test_builds_workflow_run_request_with_file_output_schema_and_reserved_metadata():
@ -226,7 +187,7 @@ def test_builds_workflow_run_request_with_file_output_schema_and_reserved_metada
assert output_schema["properties"]["report"]["properties"]["file_id"]["type"] == "string"
assert output_schema["properties"]["confidence"]["type"] == "number"
assert output_schema["required"] == ["report"]
assert dumped["composition"]["layers"][5]["config"]["model_settings"] == {"temperature": 0.2}
assert dumped["composition"]["layers"][4]["config"]["model_settings"] == {"temperature": 0.2}
assert result.metadata["runtime_support"]["reserved_status"]["tools.dify_tools"] == "supported_when_config_valid"
assert result.metadata["runtime_support"]["reserved_status"]["tools.cli_tools"] == "reserved_not_executed"
warnings = result.metadata["runtime_support"]["unsupported_runtime_warnings"]
@ -263,7 +224,7 @@ def test_builds_workflow_run_request_with_dify_plugin_tools_layer():
plugin_tools_builder = FakePluginToolsBuilder()
result = WorkflowAgentRuntimeRequestBuilder(
credentials_provider=FakeCredentialsProvider(),
plugin_tools_builder=cast(WorkflowAgentPluginToolsBuilder, plugin_tools_builder),
plugin_tools_builder=plugin_tools_builder,
).build(context)
dumped = result.request.model_dump(mode="json")
@ -283,15 +244,6 @@ def test_builds_workflow_run_request_with_dify_plugin_tools_layer():
assert plugin_tools_builder.last_invoke_from == context.dify_context.invoke_from
def test_build_passes_saved_session_snapshot_to_agent_backend_request():
session_snapshot = CompositorSessionSnapshot(layers=[])
context = replace(_context(), session_snapshot=session_snapshot)
result = WorkflowAgentRuntimeRequestBuilder(credentials_provider=FakeCredentialsProvider()).build(context)
assert result.request.session_snapshot is session_snapshot
def test_requires_agent_soul_model_config():
context = _context()
snapshot = AgentConfigSnapshot(

View File

@ -1,412 +0,0 @@
from datetime import UTC
from typing import cast
import pytest
from agenton.compositor import CompositorSessionSnapshot
from agenton.compositor.schemas import LayerSessionSnapshot
from agenton.layers.base import LifecycleState
from dify_agent.protocol import CancelRunRequest, RunEvent, RunStatusResponse
from clients.agent_backend import AgentBackendRunRequestBuilder, CleanupLayerSpec, FakeAgentBackendRunClient
from clients.agent_backend.errors import AgentBackendHTTPError
from core.workflow.nodes.agent_v2.session_cleanup_layer import WorkflowAgentSessionCleanupLayer
from core.workflow.nodes.agent_v2.session_store import (
StoredWorkflowAgentSession,
WorkflowAgentRuntimeSessionStore,
WorkflowAgentSessionScope,
)
from core.workflow.system_variables import build_system_variables
from graphon.entities.pause_reason import SchedulingPause
from graphon.graph_engine.command_channels import CommandChannel
from graphon.graph_events import (
GraphRunAbortedEvent,
GraphRunFailedEvent,
GraphRunPartialSucceededEvent,
GraphRunPausedEvent,
GraphRunStartedEvent,
GraphRunSucceededEvent,
)
from graphon.runtime import GraphRuntimeState, ReadOnlyGraphRuntimeStateWrapper, VariablePool
def _layer_snapshot(name: str) -> LayerSessionSnapshot:
return LayerSessionSnapshot(
name=name,
lifecycle_state=LifecycleState.SUSPENDED,
runtime_state={},
)
def _stored_session(scope: WorkflowAgentSessionScope, *, index: int = 1) -> StoredWorkflowAgentSession:
"""A typical stored session with prompt + execution_context + history + llm specs.
The LLM layer is *not* in ``composition_layer_specs`` because the cleanup
contract excludes credential-bearing plugin layers, but it *is* present in
the saved snapshot so the layer's filter logic gets exercised.
"""
return StoredWorkflowAgentSession(
scope=scope,
session_snapshot=CompositorSessionSnapshot(
layers=[
_layer_snapshot("workflow_node_job_prompt"),
_layer_snapshot("execution_context"),
_layer_snapshot("history"),
_layer_snapshot("llm"),
]
),
backend_run_id=f"agent-run-{index}",
composition_layer_specs=[
CleanupLayerSpec(name="workflow_node_job_prompt", type="plain.prompt", config={"prefix": "ok"}),
CleanupLayerSpec(name="execution_context", type="dify.execution_context", config={"tenant_id": "t"}),
CleanupLayerSpec(name="history", type="pydantic_ai.history"),
],
)
class FakeSessionStore:
"""In-memory stand-in for ``WorkflowAgentRuntimeSessionStore``."""
def __init__(self, *, stored: list[StoredWorkflowAgentSession] | None = None) -> None:
self._stored = stored if stored is not None else [_stored_session(_default_scope())]
self.list_calls: list[str] = []
self.cleaned: list[tuple[WorkflowAgentSessionScope, str | None]] = []
def list_active_sessions(self, *, workflow_run_id: str) -> list[StoredWorkflowAgentSession]:
self.list_calls.append(workflow_run_id)
return list(self._stored)
def mark_cleaned(self, *, scope: WorkflowAgentSessionScope, backend_run_id: str | None = None) -> None:
self.cleaned.append((scope, backend_run_id))
def _default_scope() -> WorkflowAgentSessionScope:
return WorkflowAgentSessionScope(
tenant_id="tenant-1",
app_id="app-1",
workflow_id="workflow-1",
workflow_run_id="workflow-run-1",
node_id="agent-node",
node_execution_id="node-exec-1",
binding_id="binding-1",
agent_id="agent-1",
agent_config_snapshot_id="snapshot-1",
)
class _WaitableFakeAgentBackendRunClient(FakeAgentBackendRunClient):
"""``FakeAgentBackendRunClient`` plus the ``wait_run`` hook the layer needs."""
def __init__(
self,
*,
run_id: str = "cleanup-run-1",
wait_status: str = "succeeded",
wait_error: str | None = None,
wait_raises: Exception | None = None,
) -> None:
super().__init__(run_id=run_id)
self._wait_status = wait_status
self._wait_error = wait_error
self._wait_raises = wait_raises
self.wait_calls: list[tuple[str, float | None]] = []
def wait_run(self, run_id: str, *, timeout_seconds: float | None = None) -> RunStatusResponse:
self.wait_calls.append((run_id, timeout_seconds))
if self._wait_raises is not None:
raise self._wait_raises
from datetime import datetime
return RunStatusResponse(
run_id=run_id,
status=cast(object, self._wait_status), # protocol Literal; cast keeps tests flexible
created_at=datetime(2026, 1, 1, tzinfo=UTC),
updated_at=datetime(2026, 1, 1, tzinfo=UTC),
error=self._wait_error,
)
# Inherit ``create_run`` from FakeAgentBackendRunClient; the missing protocol
# methods below are stub-only because the cleanup layer never calls them.
def cancel_run(self, run_id: str, request: CancelRunRequest | None = None): # pragma: no cover
del run_id, request
raise NotImplementedError
def stream_events(self, run_id: str, *, after: str | None = None): # pragma: no cover
del run_id, after
if False:
yield cast(RunEvent, None)
def _build_layer(
*,
session_store: FakeSessionStore,
agent_backend_client: _WaitableFakeAgentBackendRunClient,
http_cleanup_supported: bool = True,
) -> WorkflowAgentSessionCleanupLayer:
variable_pool = VariablePool.from_bootstrap(
system_variables=build_system_variables(workflow_execution_id="workflow-run-1"),
user_inputs={},
conversation_variables=[],
)
runtime_state = GraphRuntimeState(variable_pool=variable_pool, start_at=0.0)
layer = WorkflowAgentSessionCleanupLayer(
session_store=cast(WorkflowAgentRuntimeSessionStore, session_store),
request_builder=AgentBackendRunRequestBuilder(),
agent_backend_client=agent_backend_client,
)
# Tests opt in to the future HTTP-cleanup branch; the production default
# (False) is exercised by the dedicated tests below.
layer._HTTP_CLEANUP_SUPPORTED = http_cleanup_supported # type: ignore[reportPrivateUsage]
layer.initialize(ReadOnlyGraphRuntimeStateWrapper(runtime_state), cast(CommandChannel, object()))
return layer
@pytest.mark.parametrize(
"terminal_event",
[
GraphRunSucceededEvent(outputs={}),
GraphRunPartialSucceededEvent(exceptions_count=1, outputs={}),
GraphRunFailedEvent(error="boom"),
GraphRunAbortedEvent(reason="user cancelled", outputs={}),
],
ids=["succeeded", "partial_succeeded", "failed", "aborted"],
)
def test_cleanup_layer_triggers_cleanup_only_run_on_each_terminal_event(terminal_event):
session_store = FakeSessionStore()
agent_backend_client = _WaitableFakeAgentBackendRunClient()
layer = _build_layer(session_store=session_store, agent_backend_client=agent_backend_client)
layer.on_event(terminal_event)
assert session_store.list_calls == ["workflow-run-1"]
assert agent_backend_client.request is not None
# Cleanup composition replays the persisted (non-plugin) layer specs so the
# agent backend's snapshot-vs-composition name match succeeds.
layer_names = [layer.name for layer in agent_backend_client.request.composition.layers]
assert layer_names == ["workflow_node_job_prompt", "execution_context", "history"]
assert agent_backend_client.request.on_exit.default.value == "delete"
assert agent_backend_client.request.metadata["agent_backend_lifecycle"] == "session_cleanup"
# Snapshot is filtered to drop the plugin layer entry so names match the
# cleanup composition.
assert agent_backend_client.request.session_snapshot is not None
snapshot_names = [layer.name for layer in agent_backend_client.request.session_snapshot.layers]
assert snapshot_names == ["workflow_node_job_prompt", "execution_context", "history"]
# The layer waited for terminal status and the run succeeded, so the row
# is marked CLEANED with the cleanup run id.
assert agent_backend_client.wait_calls
assert session_store.cleaned == [(_default_scope(), "cleanup-run-1")]
@pytest.mark.parametrize(
"non_terminal_event",
[
GraphRunStartedEvent(),
GraphRunPausedEvent(reasons=[SchedulingPause(message="awaiting human input")], outputs={}),
],
ids=["started", "paused"],
)
def test_cleanup_layer_ignores_non_terminal_events(non_terminal_event):
session_store = FakeSessionStore()
agent_backend_client = _WaitableFakeAgentBackendRunClient()
layer = _build_layer(session_store=session_store, agent_backend_client=agent_backend_client)
layer.on_event(non_terminal_event)
assert session_store.list_calls == []
assert agent_backend_client.request is None
assert session_store.cleaned == []
def test_cleanup_layer_does_not_mark_cleaned_when_cleanup_run_fails():
"""Trap D: cleanup-only run goes ``run_failed`` (e.g. snapshot validation
error) — the layer must leave the row ACTIVE so it can be retried instead
of silently leaking suspended agent-backend layers."""
session_store = FakeSessionStore()
agent_backend_client = _WaitableFakeAgentBackendRunClient(
wait_status="failed",
wait_error="snapshot mismatch",
)
layer = _build_layer(session_store=session_store, agent_backend_client=agent_backend_client)
layer.on_event(GraphRunSucceededEvent(outputs={}))
assert agent_backend_client.wait_calls
assert session_store.cleaned == []
def test_cleanup_layer_does_not_mark_cleaned_when_wait_raises():
session_store = FakeSessionStore()
agent_backend_client = _WaitableFakeAgentBackendRunClient(
wait_raises=AgentBackendHTTPError("boom", status_code=500, detail=None),
)
layer = _build_layer(session_store=session_store, agent_backend_client=agent_backend_client)
layer.on_event(GraphRunSucceededEvent(outputs={}))
assert session_store.cleaned == []
def test_cleanup_layer_marks_cleaned_locally_when_http_cleanup_disabled():
"""Production default: dify-agent has no cleanup-only run mode yet, so the
layer must retire the local row without issuing a doomed HTTP request that
would crash inside the agent backend's runner on the missing LLM layer."""
session_store = FakeSessionStore()
agent_backend_client = _WaitableFakeAgentBackendRunClient()
layer = _build_layer(
session_store=session_store,
agent_backend_client=agent_backend_client,
http_cleanup_supported=False,
)
layer.on_event(GraphRunSucceededEvent(outputs={}))
# No HTTP call goes out — the trap is avoided entirely.
assert agent_backend_client.request is None
assert agent_backend_client.wait_calls == []
# Local row is still retired so a workflow loop cannot resume from stale state.
assert session_store.cleaned == [(_default_scope(), "agent-run-1")]
def test_cleanup_layer_skips_sessions_without_persisted_specs():
"""Backwards-compatible safety net: a row written before A.1 landed has
no composition_layer_specs, so cleanup would unavoidably hit the snapshot-
validation trap. The layer must skip such rows instead of issuing a
doomed request."""
scope = _default_scope()
legacy_session = StoredWorkflowAgentSession(
scope=scope,
session_snapshot=CompositorSessionSnapshot(layers=[_layer_snapshot("history")]),
backend_run_id="legacy-run",
composition_layer_specs=[],
)
session_store = FakeSessionStore(stored=[legacy_session])
agent_backend_client = _WaitableFakeAgentBackendRunClient()
layer = _build_layer(session_store=session_store, agent_backend_client=agent_backend_client)
layer.on_event(GraphRunSucceededEvent(outputs={}))
assert agent_backend_client.request is None
assert session_store.cleaned == []
def test_cleanup_layer_fans_out_to_every_active_session():
scopes = [
WorkflowAgentSessionScope(
tenant_id="tenant-1",
app_id="app-1",
workflow_id="workflow-1",
workflow_run_id="workflow-run-1",
node_id=f"agent-node-{i}",
node_execution_id=f"node-exec-{i}",
binding_id=f"binding-{i}",
agent_id=f"agent-{i}",
agent_config_snapshot_id=f"snapshot-{i}",
)
for i in range(3)
]
session_store = FakeSessionStore(stored=[_stored_session(scope, index=i) for i, scope in enumerate(scopes, 1)])
agent_backend_client = _WaitableFakeAgentBackendRunClient(run_id="cleanup-run-many")
layer = _build_layer(session_store=session_store, agent_backend_client=agent_backend_client)
layer.on_event(GraphRunSucceededEvent(outputs={}))
# One cleanup row per stored ACTIVE session, all marked cleaned with the
# backend run id returned by the agent backend client.
assert [entry[0] for entry in session_store.cleaned] == scopes
assert {entry[1] for entry in session_store.cleaned} == {"cleanup-run-many"}
def test_cleanup_layer_warns_when_http_enabled_but_client_missing(caplog):
"""The HTTP cleanup branch must defensively skip when no client was wired.
This is the deployment-misconfig path: ``_HTTP_CLEANUP_SUPPORTED`` was
flipped to ``True`` but ``AGENT_BACKEND_BASE_URL`` is unset, so the
factory returned ``None``. The layer must not crash and must not silently
retire the row — the warning surfaces the misconfig.
"""
import logging
session_store = FakeSessionStore()
layer = WorkflowAgentSessionCleanupLayer(
session_store=cast(WorkflowAgentRuntimeSessionStore, session_store),
request_builder=AgentBackendRunRequestBuilder(),
agent_backend_client=None,
)
layer._HTTP_CLEANUP_SUPPORTED = True # type: ignore[reportPrivateUsage]
variable_pool = VariablePool.from_bootstrap(
system_variables=build_system_variables(workflow_execution_id="workflow-run-1"),
user_inputs={},
conversation_variables=[],
)
runtime_state = GraphRuntimeState(variable_pool=variable_pool, start_at=0.0)
layer.initialize(ReadOnlyGraphRuntimeStateWrapper(runtime_state), cast(CommandChannel, object()))
with caplog.at_level(logging.WARNING):
layer.on_event(GraphRunSucceededEvent(outputs={}))
assert session_store.cleaned == []
assert any("no agent backend client is wired in" in record.message for record in caplog.records)
def test_cleanup_layer_skips_workflow_terminal_when_workflow_run_id_missing(caplog):
"""``workflow_run_id`` is the keying field; without it the fanout cannot
target a row, so the layer logs a warning and bails."""
import logging
session_store = FakeSessionStore()
agent_backend_client = _WaitableFakeAgentBackendRunClient()
layer = WorkflowAgentSessionCleanupLayer(
session_store=cast(WorkflowAgentRuntimeSessionStore, session_store),
request_builder=AgentBackendRunRequestBuilder(),
agent_backend_client=agent_backend_client,
)
# Bootstrap *without* a workflow_execution_id system variable.
variable_pool = VariablePool.from_bootstrap(
system_variables=build_system_variables(workflow_execution_id=""),
user_inputs={},
conversation_variables=[],
)
runtime_state = GraphRuntimeState(variable_pool=variable_pool, start_at=0.0)
layer.initialize(ReadOnlyGraphRuntimeStateWrapper(runtime_state), cast(CommandChannel, object()))
with caplog.at_level(logging.WARNING):
layer.on_event(GraphRunSucceededEvent(outputs={}))
assert session_store.list_calls == []
assert session_store.cleaned == []
assert any("workflow_run_id is missing" in record.message for record in caplog.records)
def test_build_workflow_agent_session_cleanup_layer_returns_layer_without_client_when_unconfigured(
monkeypatch,
):
"""The production builder must pass ``None`` for the agent backend client
when neither AGENT_BACKEND_BASE_URL nor AGENT_BACKEND_USE_FAKE is set, so
that unit-test environments without backend config don't crash at runner
construction."""
from configs import dify_config
from core.workflow.nodes.agent_v2.session_cleanup_layer import (
build_workflow_agent_session_cleanup_layer,
)
monkeypatch.setattr(dify_config, "AGENT_BACKEND_BASE_URL", None, raising=False)
monkeypatch.setattr(dify_config, "AGENT_BACKEND_USE_FAKE", False, raising=False)
layer = build_workflow_agent_session_cleanup_layer()
assert layer._agent_backend_client is None # type: ignore[reportPrivateUsage]
def test_build_workflow_agent_session_cleanup_layer_returns_layer_with_fake_client(monkeypatch):
"""With ``AGENT_BACKEND_USE_FAKE`` enabled the helper wires in the
deterministic fake client without needing a base_url."""
from clients.agent_backend.fake_client import FakeAgentBackendRunClient
from configs import dify_config
from core.workflow.nodes.agent_v2.session_cleanup_layer import (
build_workflow_agent_session_cleanup_layer,
)
monkeypatch.setattr(dify_config, "AGENT_BACKEND_BASE_URL", None, raising=False)
monkeypatch.setattr(dify_config, "AGENT_BACKEND_USE_FAKE", True, raising=False)
monkeypatch.setattr(dify_config, "AGENT_BACKEND_FAKE_SCENARIO", "success", raising=False)
layer = build_workflow_agent_session_cleanup_layer()
assert isinstance(layer._agent_backend_client, FakeAgentBackendRunClient) # type: ignore[reportPrivateUsage]

View File

@ -1,286 +0,0 @@
"""Unit tests for :mod:`core.workflow.nodes.agent_v2.session_store`.
Uses the in-memory SQLite engine configured by the project conftest plus a
per-test ``CREATE TABLE`` so the real ORM round-trip exercises every store
method. Keeps the suite self-contained — no Postgres / Docker required — while
still hitting the actual ``session_factory`` code path that production uses.
"""
from __future__ import annotations
from collections.abc import Generator
import pytest
from agenton.compositor import CompositorSessionSnapshot
from agenton.compositor.schemas import LayerSessionSnapshot
from agenton.layers.base import LifecycleState
from sqlalchemy import delete
from clients.agent_backend.request_builder import CleanupLayerSpec
from core.db.session_factory import session_factory
from core.workflow.nodes.agent_v2.session_store import (
StoredWorkflowAgentSession,
WorkflowAgentRuntimeSessionStore,
WorkflowAgentSessionScope,
)
from models.agent import WorkflowAgentRuntimeSession, WorkflowAgentRuntimeSessionStatus
def _scope(workflow_run_id: str | None = "wfr-1", binding_id: str = "binding-1") -> WorkflowAgentSessionScope:
return WorkflowAgentSessionScope(
tenant_id="tenant-1",
app_id="app-1",
workflow_id="workflow-1",
workflow_run_id=workflow_run_id,
node_id="agent-node",
node_execution_id="node-exec-1",
binding_id=binding_id,
agent_id="agent-1",
agent_config_snapshot_id="snapshot-1",
)
def _snapshot(messages: int = 1) -> CompositorSessionSnapshot:
return CompositorSessionSnapshot(
layers=[
LayerSessionSnapshot(
name="history",
lifecycle_state=LifecycleState.SUSPENDED,
runtime_state={"messages": [{"role": "user", "content": f"m{i}"} for i in range(messages)]},
)
]
)
def _specs() -> list[CleanupLayerSpec]:
return [
CleanupLayerSpec(name="workflow_node_job_prompt", type="plain.prompt", config={"prefix": "ok"}),
CleanupLayerSpec(name="history", type="pydantic_ai.history"),
]
@pytest.fixture(autouse=True)
def _create_table() -> Generator[None, None, None]:
"""Create the lifecycle table on the in-memory SQLite engine, drop after."""
engine = session_factory.get_session_maker().kw["bind"]
WorkflowAgentRuntimeSession.__table__.create(bind=engine, checkfirst=True)
yield
with session_factory.create_session() as session:
session.execute(delete(WorkflowAgentRuntimeSession))
session.commit()
WorkflowAgentRuntimeSession.__table__.drop(bind=engine, checkfirst=True)
def test_load_active_snapshot_returns_none_when_scope_has_no_workflow_run_id():
"""``workflow_run_id`` is the keying column; no row can match without it."""
store = WorkflowAgentRuntimeSessionStore()
assert store.load_active_snapshot(_scope(workflow_run_id=None)) is None
def test_load_active_snapshot_returns_none_when_no_row_matches():
store = WorkflowAgentRuntimeSessionStore()
assert store.load_active_snapshot(_scope()) is None
def test_save_active_snapshot_creates_row_and_load_round_trips():
store = WorkflowAgentRuntimeSessionStore()
snapshot = _snapshot(messages=2)
store.save_active_snapshot(
scope=_scope(), backend_run_id="run-1", snapshot=snapshot, composition_layer_specs=_specs()
)
loaded = store.load_active_snapshot(_scope())
assert loaded is not None
assert len(loaded.layers) == 1
assert loaded.layers[0].name == "history"
assert loaded.layers[0].runtime_state["messages"] == snapshot.layers[0].runtime_state["messages"]
def test_save_active_snapshot_skips_when_workflow_run_id_missing():
"""Without a workflow_run_id the row cannot be keyed; save is a no-op."""
store = WorkflowAgentRuntimeSessionStore()
store.save_active_snapshot(
scope=_scope(workflow_run_id=None),
backend_run_id="run-skipped",
snapshot=_snapshot(),
composition_layer_specs=_specs(),
)
with session_factory.create_session() as session:
assert session.query(WorkflowAgentRuntimeSession).count() == 0
def test_save_active_snapshot_skips_when_snapshot_missing():
"""A run that produced no snapshot (e.g. failed agent run) does not write."""
store = WorkflowAgentRuntimeSessionStore()
store.save_active_snapshot(
scope=_scope(),
backend_run_id="run-empty",
snapshot=None,
composition_layer_specs=_specs(),
)
with session_factory.create_session() as session:
assert session.query(WorkflowAgentRuntimeSession).count() == 0
def test_save_active_snapshot_updates_existing_row_on_re_entry():
"""A second save under the same scope must update in place, not insert."""
store = WorkflowAgentRuntimeSessionStore()
store.save_active_snapshot(
scope=_scope(),
backend_run_id="run-1",
snapshot=_snapshot(messages=1),
composition_layer_specs=_specs(),
)
# Second call with new snapshot + backend_run_id.
store.save_active_snapshot(
scope=_scope(),
backend_run_id="run-2",
snapshot=_snapshot(messages=2),
composition_layer_specs=_specs(),
)
with session_factory.create_session() as session:
rows = session.query(WorkflowAgentRuntimeSession).all()
assert len(rows) == 1
assert rows[0].backend_run_id == "run-2"
assert rows[0].status == WorkflowAgentRuntimeSessionStatus.ACTIVE
assert rows[0].cleaned_at is None
def test_save_active_snapshot_resurrects_cleaned_row():
"""If a prior cleanup retired the row, a re-entry flips it back to ACTIVE."""
store = WorkflowAgentRuntimeSessionStore()
store.save_active_snapshot(
scope=_scope(),
backend_run_id="run-1",
snapshot=_snapshot(),
composition_layer_specs=_specs(),
)
store.mark_cleaned(scope=_scope(), backend_run_id="cleanup-1")
# Save again — the existing row was CLEANED; should be revived.
store.save_active_snapshot(
scope=_scope(),
backend_run_id="run-2",
snapshot=_snapshot(messages=3),
composition_layer_specs=_specs(),
)
with session_factory.create_session() as session:
rows = session.query(WorkflowAgentRuntimeSession).all()
assert len(rows) == 1
assert rows[0].status == WorkflowAgentRuntimeSessionStatus.ACTIVE
assert rows[0].cleaned_at is None
assert rows[0].backend_run_id == "run-2"
def test_list_active_sessions_returns_specs_and_snapshot():
store = WorkflowAgentRuntimeSessionStore()
store.save_active_snapshot(
scope=_scope(binding_id="binding-A"),
backend_run_id="run-A",
snapshot=_snapshot(),
composition_layer_specs=_specs(),
)
store.save_active_snapshot(
scope=_scope(binding_id="binding-B"),
backend_run_id="run-B",
snapshot=_snapshot(messages=2),
composition_layer_specs=_specs(),
)
listed = store.list_active_sessions(workflow_run_id="wfr-1")
assert {s.backend_run_id for s in listed} == {"run-A", "run-B"}
by_run = {s.backend_run_id: s for s in listed}
assert isinstance(by_run["run-A"], StoredWorkflowAgentSession)
# Specs round-trip through pydantic TypeAdapter — ensure deserialize works.
assert by_run["run-A"].composition_layer_specs[0].name == "workflow_node_job_prompt"
assert by_run["run-A"].composition_layer_specs[1].type == "pydantic_ai.history"
# node_execution_id default-replaces NULL with "" when the DB column is None.
assert by_run["run-A"].scope.node_execution_id == "node-exec-1"
def test_list_active_sessions_skips_cleaned_rows():
store = WorkflowAgentRuntimeSessionStore()
store.save_active_snapshot(
scope=_scope(binding_id="binding-A"),
backend_run_id="run-A",
snapshot=_snapshot(),
composition_layer_specs=_specs(),
)
store.save_active_snapshot(
scope=_scope(binding_id="binding-B"),
backend_run_id="run-B",
snapshot=_snapshot(),
composition_layer_specs=_specs(),
)
store.mark_cleaned(scope=_scope(binding_id="binding-A"), backend_run_id="cleanup-A")
listed = store.list_active_sessions(workflow_run_id="wfr-1")
assert {s.backend_run_id for s in listed} == {"run-B"}
def test_list_active_sessions_handles_legacy_rows_without_specs():
"""Rows persisted before composition_layer_specs landed have an empty string."""
# Insert a legacy-shape row directly: empty specs payload simulates a row
# written before the spec persistence feature landed in A.1.
store = WorkflowAgentRuntimeSessionStore()
store.save_active_snapshot(
scope=_scope(),
backend_run_id="run-legacy",
snapshot=_snapshot(),
composition_layer_specs=[],
)
listed = store.list_active_sessions(workflow_run_id="wfr-1")
assert len(listed) == 1
assert listed[0].composition_layer_specs == []
def test_mark_cleaned_sets_status_and_cleaned_at_with_backend_run_id():
store = WorkflowAgentRuntimeSessionStore()
store.save_active_snapshot(
scope=_scope(),
backend_run_id="run-1",
snapshot=_snapshot(),
composition_layer_specs=_specs(),
)
store.mark_cleaned(scope=_scope(), backend_run_id="cleanup-1")
with session_factory.create_session() as session:
row = session.query(WorkflowAgentRuntimeSession).one()
assert row.status == WorkflowAgentRuntimeSessionStatus.CLEANED
assert row.cleaned_at is not None
assert row.backend_run_id == "cleanup-1"
def test_mark_cleaned_preserves_existing_backend_run_id_when_none_given():
"""``backend_run_id=None`` means "leave the previous one in place"."""
store = WorkflowAgentRuntimeSessionStore()
store.save_active_snapshot(
scope=_scope(),
backend_run_id="run-1",
snapshot=_snapshot(),
composition_layer_specs=_specs(),
)
store.mark_cleaned(scope=_scope(), backend_run_id=None)
with session_factory.create_session() as session:
row = session.query(WorkflowAgentRuntimeSession).one()
assert row.status == WorkflowAgentRuntimeSessionStatus.CLEANED
assert row.backend_run_id == "run-1"
def test_mark_cleaned_is_a_noop_when_no_active_row():
"""No matching ACTIVE row → no-op (already-cleaned rows are not re-touched)."""
store = WorkflowAgentRuntimeSessionStore()
store.mark_cleaned(scope=_scope(), backend_run_id="cleanup-1")
with session_factory.create_session() as session:
assert session.query(WorkflowAgentRuntimeSession).count() == 0
def test_mark_cleaned_is_a_noop_when_workflow_run_id_missing():
"""Without a workflow_run_id we cannot key the row; ignore the call."""
store = WorkflowAgentRuntimeSessionStore()
store.mark_cleaned(scope=_scope(workflow_run_id=None), backend_run_id="cleanup-1")
# Sanity — no rows created or touched.
with session_factory.create_session() as session:
assert session.query(WorkflowAgentRuntimeSession).count() == 0

View File

@ -6,8 +6,8 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
import { startMock } from '../../test/fixtures/dify-mock/server.js'
import { loadAppInfoCache } from '../cache/app-info.js'
import { createClient } from '../http/client.js'
import { CACHE_APP_INFO, cachePath } from '../store/manager.js'
import { YamlStore } from '../store/store.js'
import { ENV_CACHE_DIR } from '../store/dir.js'
import { CACHE_APP_INFO, getCache } from '../store/manager.js'
import { FieldInfo, FieldParameters } from '../types/app-meta.js'
import { AppMetaClient } from './app-meta.js'
import { AppsClient } from './apps.js'
@ -15,17 +15,24 @@ import { AppsClient } from './apps.js'
describe('AppMetaClient', () => {
let mock: DifyMock
let dir: string
let prevCacheDir: string | undefined
beforeEach(async () => {
mock = await startMock({ scenario: 'happy' })
dir = await mkdtemp(join(tmpdir(), 'difyctl-meta-'))
prevCacheDir = process.env[ENV_CACHE_DIR]
process.env[ENV_CACHE_DIR] = dir
})
afterEach(async () => {
if (prevCacheDir === undefined)
delete process.env[ENV_CACHE_DIR]
else
process.env[ENV_CACHE_DIR] = prevCacheDir
await mock.stop()
await rm(dir, { recursive: true, force: true })
})
it('cache miss → fetch → populate; warm hit skips network', async () => {
const cache = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
const apps = new AppsClient(createClient({ host: mock.url, bearer: 'dfoa_test' }))
const spy = vi.spyOn(apps, 'describe')
const client = new AppMetaClient({ apps, host: mock.url, cache })
@ -40,7 +47,7 @@ describe('AppMetaClient', () => {
})
it('slim hit + full request triggers fresh fetch + merges', async () => {
const cache = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
const apps = new AppsClient(createClient({ host: mock.url, bearer: 'dfoa_test' }))
const spy = vi.spyOn(apps, 'describe')
const client = new AppMetaClient({ apps, host: mock.url, cache })
@ -54,7 +61,7 @@ describe('AppMetaClient', () => {
})
it('expired cache entry refetches', async () => {
const cache = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)), ttlMs: 100, now: () => new Date('2026-05-09T00:00:00Z') })
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO), ttlMs: 100, now: () => new Date('2026-05-09T00:00:00Z') })
const apps = new AppsClient(createClient({ host: mock.url, bearer: 'dfoa_test' }))
const spy = vi.spyOn(apps, 'describe')
const client = new AppMetaClient({ apps, host: mock.url, cache, now: () => new Date('2026-05-09T00:00:00Z') })
@ -68,7 +75,7 @@ describe('AppMetaClient', () => {
})
it('invalidate forces next get to fetch', async () => {
const cache = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
const apps = new AppsClient(createClient({ host: mock.url, bearer: 'dfoa_test' }))
const spy = vi.spyOn(apps, 'describe')
const client = new AppMetaClient({ apps, host: mock.url, cache })

View File

@ -1,101 +0,0 @@
import { mkdtemp, rm, stat, writeFile } from 'node:fs/promises'
import { tmpdir } from 'node:os'
import { join } from 'node:path'
import { afterEach, beforeEach, describe, expect, it } from 'vitest'
import { FILE_PERM } from '../store/dir.js'
import { FileBackend, TOKENS_FILE_NAME } from './file-backend.js'
describe('FileBackend', () => {
let dir: string
let backend: FileBackend
beforeEach(async () => {
dir = await mkdtemp(join(tmpdir(), 'difyctl-tokens-'))
backend = new FileBackend(dir)
})
afterEach(async () => {
await rm(dir, { recursive: true, force: true })
})
it('returns undefined when file is missing', async () => {
expect(await backend.get('cloud.dify.ai', 'acct-1')).toBeUndefined()
})
it('returns empty list when file is missing', async () => {
expect(await backend.list('cloud.dify.ai')).toEqual([])
})
it('round-trips put/get for a single token', async () => {
await backend.put('cloud.dify.ai', 'acct-1', 'dfoa_abc')
expect(await backend.get('cloud.dify.ai', 'acct-1')).toBe('dfoa_abc')
})
it('list returns accountIds for the given host', async () => {
await backend.put('cloud.dify.ai', 'acct-1', 'dfoa_a')
await backend.put('cloud.dify.ai', 'acct-2', 'dfoa_b')
await backend.put('self.example.com', 'acct-3', 'dfoa_c')
const ids = await backend.list('cloud.dify.ai')
expect([...ids].sort()).toEqual(['acct-1', 'acct-2'])
})
it('list returns empty array for unknown host', async () => {
await backend.put('cloud.dify.ai', 'acct-1', 'dfoa_a')
expect(await backend.list('other.example.com')).toEqual([])
})
it('delete removes the entry', async () => {
await backend.put('cloud.dify.ai', 'acct-1', 'dfoa_a')
await backend.delete('cloud.dify.ai', 'acct-1')
expect(await backend.get('cloud.dify.ai', 'acct-1')).toBeUndefined()
})
it('delete is a no-op for missing entries', async () => {
await expect(backend.delete('cloud.dify.ai', 'missing')).resolves.toBeUndefined()
})
it('delete prunes empty host entries', async () => {
await backend.put('cloud.dify.ai', 'acct-1', 'dfoa_a')
await backend.delete('cloud.dify.ai', 'acct-1')
expect(await backend.list('cloud.dify.ai')).toEqual([])
})
it('overwrites existing token for same host+accountId', async () => {
await backend.put('cloud.dify.ai', 'acct-1', 'dfoa_old')
await backend.put('cloud.dify.ai', 'acct-1', 'dfoa_new')
expect(await backend.get('cloud.dify.ai', 'acct-1')).toBe('dfoa_new')
})
it('writes file with mode 0600', async () => {
await backend.put('cloud.dify.ai', 'acct-1', 'dfoa_a')
const info = await stat(join(dir, TOKENS_FILE_NAME))
expect(info.mode & 0o777).toBe(FILE_PERM)
})
it('rewrites existing file with mode 0600 even if previously permissive', async () => {
const path = join(dir, TOKENS_FILE_NAME)
await writeFile(path, 'hosts: {}\n', { mode: 0o644 })
await backend.put('cloud.dify.ai', 'acct-1', 'dfoa_a')
const info = await stat(path)
expect(info.mode & 0o777).toBe(FILE_PERM)
})
it('writes valid YAML readable by a fresh backend', async () => {
await backend.put('cloud.dify.ai', 'acct-1', 'dfoa_a')
const fresh = new FileBackend(dir)
expect(await fresh.get('cloud.dify.ai', 'acct-1')).toBe('dfoa_a')
})
it('persists multiple hosts simultaneously', async () => {
await backend.put('cloud.dify.ai', 'acct-1', 'dfoa_a')
await backend.put('self.example.com', 'acct-2', 'dfoa_b')
expect(await backend.get('cloud.dify.ai', 'acct-1')).toBe('dfoa_a')
expect(await backend.get('self.example.com', 'acct-2')).toBe('dfoa_b')
})
it('treats malformed YAML as empty', async () => {
const path = join(dir, TOKENS_FILE_NAME)
await writeFile(path, 'not: valid: yaml: [\n', { mode: FILE_PERM })
expect(await backend.get('cloud.dify.ai', 'acct-1')).toBeUndefined()
})
})

View File

@ -1,99 +0,0 @@
import type { TokenStore } from './store.js'
import { mkdir, readFile, rename, stat, unlink, writeFile } from 'node:fs/promises'
import { join } from 'node:path'
import yaml from 'js-yaml'
import { DIR_PERM, FILE_PERM } from '../store/dir.js'
export const TOKENS_FILE_NAME = 'tokens.yml'
type AccountMap = Record<string, string>
type HostMap = Record<string, AccountMap>
type TokensFile = { hosts?: HostMap }
export class FileBackend implements TokenStore {
private readonly dir: string
private readonly path: string
constructor(dir: string) {
this.dir = dir
this.path = join(dir, TOKENS_FILE_NAME)
}
async put(host: string, accountId: string, token: string): Promise<void> {
const file = await this.read()
const hosts = file.hosts ?? {}
const accounts = hosts[host] ?? {}
accounts[accountId] = token
hosts[host] = accounts
await this.write({ hosts })
}
async get(host: string, accountId: string): Promise<string | undefined> {
const file = await this.read()
return file.hosts?.[host]?.[accountId]
}
async delete(host: string, accountId: string): Promise<void> {
const file = await this.read()
const accounts = file.hosts?.[host]
if (accounts === undefined || !(accountId in accounts))
return
delete accounts[accountId]
if (Object.keys(accounts).length === 0 && file.hosts !== undefined)
delete file.hosts[host]
await this.write(file)
}
async list(host: string): Promise<readonly string[]> {
const file = await this.read()
const accounts = file.hosts?.[host]
return accounts === undefined ? [] : Object.keys(accounts)
}
private async read(): Promise<TokensFile> {
let raw: string
try {
raw = await readFile(this.path, 'utf8')
}
catch (err) {
if ((err as NodeJS.ErrnoException).code === 'ENOENT')
return {}
throw err
}
let parsed: unknown
try {
parsed = yaml.load(raw)
}
catch {
return {}
}
if (parsed === null || typeof parsed !== 'object')
return {}
return parsed as TokensFile
}
private async write(file: TokensFile): Promise<void> {
await mkdir(this.dir, { recursive: true, mode: DIR_PERM })
const body = yaml.dump(file, { lineWidth: -1, noRefs: true })
const tmp = `${this.path}.tmp.${process.pid}.${Date.now()}`
try {
await writeFile(tmp, body, { mode: FILE_PERM })
await rename(tmp, this.path)
}
catch (err) {
try {
await unlink(tmp)
}
catch { /* tmp may not exist */ }
throw err
}
try {
const info = await stat(this.path)
if ((info.mode & 0o777) !== FILE_PERM) {
const { chmod } = await import('node:fs/promises')
await chmod(this.path, FILE_PERM)
}
}
catch { /* best-effort permission tighten */ }
}
}

View File

@ -1,9 +1,9 @@
import { mkdtemp, readFile, rm, stat, writeFile } from 'node:fs/promises'
import { mkdtemp, rm } from 'node:fs/promises'
import { tmpdir } from 'node:os'
import { join } from 'node:path'
import { afterEach, beforeEach, describe, expect, it } from 'vitest'
import { FILE_PERM } from '../store/dir.js'
import { HOSTS_FILE_NAME, HostsBundleSchema, loadHosts, saveHosts } from './hosts.js'
import { ENV_CONFIG_DIR } from '../store/dir.js'
import { HostsBundleSchema, loadHosts, saveHosts } from './hosts.js'
describe('HostsBundleSchema', () => {
it('parses a minimal logged-out bundle', () => {
@ -46,86 +46,86 @@ describe('HostsBundleSchema', () => {
})
expect(parsed.available_workspaces).toHaveLength(2)
})
it('drops unknown top-level fields on parse', () => {
const parsed = HostsBundleSchema.parse({
current_host: 'cloud.dify.ai',
future_field: 42,
token_storage: 'file',
})
expect(parsed.current_host).toBe('cloud.dify.ai')
expect((parsed as Record<string, unknown>).future_field).toBeUndefined()
})
})
describe('loadHosts/saveHosts', () => {
let dir: string
let prevConfigDir: string | undefined
beforeEach(async () => {
dir = await mkdtemp(join(tmpdir(), 'difyctl-hosts-'))
prevConfigDir = process.env[ENV_CONFIG_DIR]
process.env[ENV_CONFIG_DIR] = dir
})
afterEach(async () => {
if (prevConfigDir === undefined)
delete process.env[ENV_CONFIG_DIR]
else
process.env[ENV_CONFIG_DIR] = prevConfigDir
await rm(dir, { recursive: true, force: true })
})
it('returns undefined when file is missing', async () => {
expect(await loadHosts(dir)).toBeUndefined()
it('returns undefined when nothing was saved', () => {
expect(loadHosts()).toBeUndefined()
})
it('round-trips bundle through YAML', async () => {
await saveHosts(dir, {
it('round-trips a fully-populated bundle', () => {
saveHosts({
current_host: 'cloud.dify.ai',
scheme: 'https',
account: { id: 'acct-1', email: 'a@b.c', name: 'A' },
workspace: { id: 'ws-1', name: 'My Space', role: 'owner' },
available_workspaces: [
{ id: 'ws-1', name: 'My Space', role: 'owner' },
{ id: 'ws-2', name: 'Other', role: 'normal' },
],
token_storage: 'keychain',
token_id: 'tok_xyz',
})
const loaded = await loadHosts(dir)
const loaded = loadHosts()
expect(loaded?.current_host).toBe('cloud.dify.ai')
expect(loaded?.scheme).toBe('https')
expect(loaded?.account?.email).toBe('a@b.c')
expect(loaded?.workspace?.id).toBe('ws-1')
expect(loaded?.available_workspaces).toHaveLength(2)
expect(loaded?.token_storage).toBe('keychain')
expect(loaded?.token_id).toBe('tok_xyz')
})
it('round-trips a file-mode bundle with bearer token', () => {
saveHosts({
current_host: 'self.example.com',
token_storage: 'file',
tokens: { bearer: 'dfoa_test' },
})
const loaded = loadHosts()
expect(loaded?.tokens?.bearer).toBe('dfoa_test')
expect(loaded?.token_storage).toBe('file')
})
it('overwrites previous bundle on save', () => {
saveHosts({ current_host: 'old.example.com', token_storage: 'file' })
saveHosts({ current_host: 'new.example.com', token_storage: 'keychain' })
const loaded = loadHosts()
expect(loaded?.current_host).toBe('new.example.com')
expect(loaded?.token_storage).toBe('keychain')
})
it('writes file with mode 0600', async () => {
await saveHosts(dir, { current_host: 'cloud.dify.ai', token_storage: 'file' })
const info = await stat(join(dir, HOSTS_FILE_NAME))
expect(info.mode & 0o777).toBe(FILE_PERM)
})
it('rewrites permissive existing file with mode 0600', async () => {
const path = join(dir, HOSTS_FILE_NAME)
await writeFile(path, 'current_host: ""\ntoken_storage: file\n', { mode: 0o644 })
await saveHosts(dir, { current_host: 'cloud.dify.ai', token_storage: 'file' })
const info = await stat(path)
expect(info.mode & 0o777).toBe(FILE_PERM)
})
it('atomic write: temp file does not survive on success', async () => {
await saveHosts(dir, { current_host: 'cloud.dify.ai', token_storage: 'file' })
const { readdir } = await import('node:fs/promises')
const entries = await readdir(dir)
expect(entries.filter(n => n.includes('.tmp.'))).toHaveLength(0)
})
it('drops unknown top-level fields', async () => {
const path = join(dir, HOSTS_FILE_NAME)
await writeFile(path, 'current_host: cloud.dify.ai\nfuture_field: 42\ntoken_storage: file\n', { mode: FILE_PERM })
const loaded = await loadHosts(dir)
expect(loaded?.current_host).toBe('cloud.dify.ai')
expect((loaded as Record<string, unknown> | undefined)?.future_field).toBeUndefined()
})
it('throws on malformed YAML', async () => {
const path = join(dir, HOSTS_FILE_NAME)
await writeFile(path, ': : :\n', { mode: FILE_PERM })
await expect(loadHosts(dir)).rejects.toThrow()
})
it('throws when YAML contradicts schema', async () => {
const path = join(dir, HOSTS_FILE_NAME)
await writeFile(path, 'token_storage: cloud\n', { mode: FILE_PERM })
await expect(loadHosts(dir)).rejects.toThrow()
})
it('produces YAML with stable keys', async () => {
await saveHosts(dir, {
it('rejects invalid input at save time', () => {
expect(() => saveHosts({
current_host: 'cloud.dify.ai',
token_storage: 'file',
tokens: { bearer: 'dfoa_x' },
})
const raw = await readFile(join(dir, HOSTS_FILE_NAME), 'utf8')
expect(raw).toContain('current_host: cloud.dify.ai')
expect(raw).toContain('bearer: dfoa_x')
token_storage: 'cloud',
} as never)).toThrow()
})
})

View File

@ -1,10 +1,5 @@
import { mkdir, readFile, rename, unlink, writeFile } from 'node:fs/promises'
import { join } from 'node:path'
import yaml from 'js-yaml'
import { z } from 'zod'
import { DIR_PERM, FILE_PERM } from '../store/dir.js'
export const HOSTS_FILE_NAME = 'hosts.yml'
import { getHostStore } from '../store/manager.js'
const StorageModeSchema = z.enum(['keychain', 'file'])
export type StorageMode = z.infer<typeof StorageModeSchema>
@ -48,53 +43,14 @@ export const HostsBundleSchema = z.object({
})
export type HostsBundle = z.infer<typeof HostsBundleSchema>
export async function loadHosts(dir: string): Promise<HostsBundle | undefined> {
const path = join(dir, HOSTS_FILE_NAME)
let raw: string
try {
raw = await readFile(path, 'utf8')
}
catch (err) {
if ((err as NodeJS.ErrnoException).code === 'ENOENT')
return undefined
throw err
}
const parsed = yaml.load(raw)
return HostsBundleSchema.parse(parsed ?? {})
export function loadHosts(): HostsBundle | undefined {
const raw = getHostStore().getTyped<Record<string, unknown>>()
if (raw === null)
return undefined
return HostsBundleSchema.parse(raw)
}
export async function saveHosts(dir: string, bundle: HostsBundle): Promise<void> {
await mkdir(dir, { recursive: true, mode: DIR_PERM })
export function saveHosts(bundle: HostsBundle): void {
const validated = HostsBundleSchema.parse(bundle)
const body = yaml.dump(stripUndefined(validated), { lineWidth: -1, noRefs: true, sortKeys: false })
const target = join(dir, HOSTS_FILE_NAME)
const tmp = `${target}.tmp.${process.pid}.${Date.now()}`
try {
await writeFile(tmp, body, { mode: FILE_PERM })
await rename(tmp, target)
}
catch (err) {
try {
await unlink(tmp)
}
catch { /* tmp may not exist */ }
throw err
}
const { chmod, stat } = await import('node:fs/promises')
try {
const info = await stat(target)
if ((info.mode & 0o777) !== FILE_PERM)
await chmod(target, FILE_PERM)
}
catch { /* best-effort */ }
}
function stripUndefined<T extends Record<string, unknown>>(input: T): Record<string, unknown> {
const out: Record<string, unknown> = {}
for (const [k, v] of Object.entries(input)) {
if (v === undefined)
continue
out[k] = v
}
return out
getHostStore().setTyped(validated)
}

View File

@ -1,111 +0,0 @@
import { beforeEach, describe, expect, it, vi } from 'vitest'
const passwords = new Map<string, string>()
const setPassword = vi.fn()
const getPassword = vi.fn()
const deletePassword = vi.fn()
class FakeAsyncEntry {
private readonly key: string
constructor(service: string, username: string) {
this.key = `${service}::${username}`
}
async setPassword(value: string): Promise<void> {
setPassword(this.key, value)
passwords.set(this.key, value)
}
async getPassword(): Promise<string | undefined> {
getPassword(this.key)
return passwords.get(this.key)
}
async deletePassword(): Promise<boolean> {
deletePassword(this.key)
if (!passwords.has(this.key))
return false
passwords.delete(this.key)
return true
}
}
vi.mock('@napi-rs/keyring', () => ({
AsyncEntry: FakeAsyncEntry,
}))
const { KEYRING_SERVICE, KeyringBackend } = await import('./keyring-backend.js')
beforeEach(() => {
passwords.clear()
setPassword.mockClear()
getPassword.mockClear()
deletePassword.mockClear()
})
describe('KeyringBackend', () => {
it('uses service name "difyctl"', () => {
expect(KEYRING_SERVICE).toBe('difyctl')
})
it('returns undefined when no password is stored', async () => {
const k = new KeyringBackend()
expect(await k.get('cloud.dify.ai', 'acct-1')).toBeUndefined()
})
it('round-trips put/get', async () => {
const k = new KeyringBackend()
await k.put('cloud.dify.ai', 'acct-1', 'dfoa_x')
expect(await k.get('cloud.dify.ai', 'acct-1')).toBe('dfoa_x')
})
it('keys by host::accountId', async () => {
const k = new KeyringBackend()
await k.put('cloud.dify.ai', 'acct-1', 'A')
await k.put('cloud.dify.ai', 'acct-2', 'B')
expect(await k.get('cloud.dify.ai', 'acct-1')).toBe('A')
expect(await k.get('cloud.dify.ai', 'acct-2')).toBe('B')
})
it('delete removes the entry', async () => {
const k = new KeyringBackend()
await k.put('cloud.dify.ai', 'acct-1', 'A')
await k.delete('cloud.dify.ai', 'acct-1')
expect(await k.get('cloud.dify.ai', 'acct-1')).toBeUndefined()
})
it('delete is a no-op for missing entries', async () => {
const k = new KeyringBackend()
await expect(k.delete('cloud.dify.ai', 'gone')).resolves.toBeUndefined()
})
it('list returns empty array (keyring does not enumerate)', async () => {
const k = new KeyringBackend()
await k.put('cloud.dify.ai', 'acct-1', 'A')
expect(await k.list('cloud.dify.ai')).toEqual([])
})
it('swallows getPassword exceptions and returns undefined', async () => {
const k = new KeyringBackend()
getPassword.mockImplementationOnce(() => {
throw new Error('NoEntry')
})
expect(await k.get('cloud.dify.ai', 'acct-1')).toBeUndefined()
})
it('swallows delete exceptions', async () => {
const k = new KeyringBackend()
deletePassword.mockImplementationOnce(() => {
throw new Error('NoEntry')
})
await expect(k.delete('cloud.dify.ai', 'acct-1')).resolves.toBeUndefined()
})
it('lets put propagate exceptions (caller decides fallback)', async () => {
const k = new KeyringBackend()
setPassword.mockImplementationOnce(() => {
throw new Error('keyring locked')
})
await expect(k.put('cloud.dify.ai', 'acct-1', 'tok')).rejects.toThrow(/keyring locked/)
})
})

View File

@ -1,35 +0,0 @@
import type { TokenStore } from './store.js'
import { AsyncEntry } from '@napi-rs/keyring'
export const KEYRING_SERVICE = 'difyctl'
function username(host: string, accountId: string): string {
return `${host}::${accountId}`
}
export class KeyringBackend implements TokenStore {
async put(host: string, accountId: string, token: string): Promise<void> {
await new AsyncEntry(KEYRING_SERVICE, username(host, accountId)).setPassword(token)
}
async get(host: string, accountId: string): Promise<string | undefined> {
try {
const v = await new AsyncEntry(KEYRING_SERVICE, username(host, accountId)).getPassword()
return v ?? undefined
}
catch {
return undefined
}
}
async delete(host: string, accountId: string): Promise<void> {
try {
await new AsyncEntry(KEYRING_SERVICE, username(host, accountId)).deletePassword()
}
catch { /* missing entry is fine */ }
}
async list(_host: string): Promise<readonly string[]> {
return []
}
}

View File

@ -1,75 +0,0 @@
import type { TokenStore } from './store.js'
import { describe, expect, it, vi } from 'vitest'
import { selectStore } from './store.js'
function memBackend(label: string): TokenStore & { _label: string } {
const map = new Map<string, string>()
const k = (h: string, a: string) => `${h}::${a}`
return {
_label: label,
async put(h, a, t) { map.set(k(h, a), t) },
async get(h, a) { return map.get(k(h, a)) },
async delete(h, a) { map.delete(k(h, a)) },
async list() { return [] },
}
}
describe('selectStore', () => {
it('returns keychain when probe succeeds', async () => {
const k = memBackend('keyring')
const f = memBackend('file')
const result = await selectStore({
configDir: '/tmp/x',
factory: { keyring: () => k, file: () => f },
})
expect(result.mode).toBe('keychain')
expect(result.store).toBe(k)
})
it('falls back to file when keyring put throws', async () => {
const k = memBackend('keyring')
const f = memBackend('file')
k.put = vi.fn().mockRejectedValue(new Error('locked'))
const result = await selectStore({
configDir: '/tmp/x',
factory: { keyring: () => k, file: () => f },
})
expect(result.mode).toBe('file')
expect(result.store).toBe(f)
})
it('falls back to file when probe round-trip mismatches', async () => {
const k = memBackend('keyring')
const f = memBackend('file')
k.get = vi.fn().mockResolvedValue('something-else')
const result = await selectStore({
configDir: '/tmp/x',
factory: { keyring: () => k, file: () => f },
})
expect(result.mode).toBe('file')
expect(result.store).toBe(f)
})
it('falls back to file when keyring constructor throws', async () => {
const f = memBackend('file')
const result = await selectStore({
configDir: '/tmp/x',
factory: {
keyring: () => { throw new Error('no backend') },
file: () => f,
},
})
expect(result.mode).toBe('file')
expect(result.store).toBe(f)
})
it('cleans up probe entry after successful probe', async () => {
const k = memBackend('keyring')
const f = memBackend('file')
await selectStore({
configDir: '/tmp/x',
factory: { keyring: () => k, file: () => f },
})
expect(await k.get('__difyctl_probe__', '__probe__')).toBeUndefined()
})
})

View File

@ -1,40 +0,0 @@
import { FileBackend } from './file-backend.js'
import { KeyringBackend } from './keyring-backend.js'
export type TokenStore = {
put: (host: string, accountId: string, token: string) => Promise<void>
get: (host: string, accountId: string) => Promise<string | undefined>
delete: (host: string, accountId: string) => Promise<void>
list: (host: string) => Promise<readonly string[]>
}
export type StorageMode = 'keychain' | 'file'
export type SelectStoreOptions = {
readonly configDir: string
readonly factory?: {
readonly keyring?: () => TokenStore
readonly file?: (dir: string) => TokenStore
}
}
const PROBE_HOST = '__difyctl_probe__'
const PROBE_ACCOUNT = '__probe__'
const PROBE_VALUE = 'probe-v1'
export async function selectStore(opts: SelectStoreOptions): Promise<{ store: TokenStore, mode: StorageMode }> {
const fileFactory = opts.factory?.file ?? ((dir: string) => new FileBackend(dir))
const keyringFactory = opts.factory?.keyring ?? (() => new KeyringBackend())
try {
const k = keyringFactory()
await k.put(PROBE_HOST, PROBE_ACCOUNT, PROBE_VALUE)
const got = await k.get(PROBE_HOST, PROBE_ACCOUNT)
await k.delete(PROBE_HOST, PROBE_ACCOUNT)
if (got !== PROBE_VALUE)
throw new Error('keyring round-trip mismatch')
return { store: k, mode: 'keychain' }
}
catch {
return { store: fileFactory(opts.configDir), mode: 'file' }
}
}

View File

@ -4,8 +4,8 @@ import { tmpdir } from 'node:os'
import { join } from 'node:path'
import yaml from 'js-yaml'
import { afterEach, beforeEach, describe, expect, it } from 'vitest'
import { CACHE_APP_INFO, cachePath } from '../store/manager.js'
import { YamlStore } from '../store/store.js'
import { ENV_CACHE_DIR } from '../store/dir.js'
import { CACHE_APP_INFO, cachePath, getCache } from '../store/manager.js'
import { platform } from '../sys/index.js'
import { FieldInfo, FieldParameters } from '../types/app-meta.js'
import { APP_INFO_TTL_MS, loadAppInfoCache } from './app-info.js'
@ -35,18 +35,25 @@ function metaInfoOnly(): AppMeta {
describe('app-info disk cache', () => {
let dir: string
let prevCacheDir: string | undefined
beforeEach(async () => {
dir = await mkdtemp(join(tmpdir(), 'difyctl-cache-'))
prevCacheDir = process.env[ENV_CACHE_DIR]
process.env[ENV_CACHE_DIR] = dir
})
afterEach(async () => {
if (prevCacheDir === undefined)
delete process.env[ENV_CACHE_DIR]
else
process.env[ENV_CACHE_DIR] = prevCacheDir
await rm(dir, { recursive: true, force: true })
})
it('round-trips an entry across reloads', async () => {
const c1 = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
const c1 = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
await c1.set('http://localhost:9999', 'app-1', metaInfoOnly())
const c2 = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
const c2 = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
const got = c2.get('http://localhost:9999', 'app-1')
expect(got).toBeDefined()
expect(got?.meta.info?.id).toBe('app-1')
@ -55,7 +62,7 @@ describe('app-info disk cache', () => {
it('isFresh respects TTL', async () => {
const now = new Date('2026-05-09T00:00:00Z')
const c = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)), now: () => now })
const c = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO), now: () => now })
await c.set('h', 'app-1', metaInfoOnly())
const r = c.get('h', 'app-1')
expect(r).toBeDefined()
@ -66,23 +73,23 @@ describe('app-info disk cache', () => {
})
it('keys by (host, app_id) — different hosts isolate', async () => {
const c = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
const c = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
await c.set('h1', 'app-1', metaInfoOnly())
expect(c.get('h2', 'app-1')).toBeUndefined()
expect(c.get('h1', 'app-1')).toBeDefined()
})
it('delete removes entry from disk', async () => {
const c1 = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
const c1 = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
await c1.set('h', 'app-1', metaInfoOnly())
await c1.delete('h', 'app-1')
const c2 = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
const c2 = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
expect(c2.get('h', 'app-1')).toBeUndefined()
})
it('writes file with 0600 permission', async () => {
const c = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
const c = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
await c.set('h', 'app-1', metaInfoOnly())
const { stat } = await import('node:fs/promises')
const s = await stat(appInfoPath(dir))
@ -91,19 +98,19 @@ describe('app-info disk cache', () => {
})
it('missing cache file is not an error', async () => {
const c = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
const c = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
expect(c.get('h', 'app-1')).toBeUndefined()
})
it('corrupt cache file is treated as empty', async () => {
const { writeFile } = await import('node:fs/promises')
await writeFile(appInfoPath(dir), ': : not valid yaml', 'utf8')
const c = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
const c = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
expect(c.get('h', 'app-1')).toBeUndefined()
})
it('updates same key in place (no growth)', async () => {
const c = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
const c = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
await c.set('h', 'app-1', metaInfoOnly())
const slim: AppMeta = {
...metaInfoOnly(),

View File

@ -3,8 +3,8 @@ import { tmpdir } from 'node:os'
import { dirname, join } from 'node:path'
import yaml from 'js-yaml'
import { afterEach, beforeEach, describe, expect, it } from 'vitest'
import { CACHE_NUDGE, cachePath } from '../store/manager.js'
import { YamlStore } from '../store/store.js'
import { ENV_CACHE_DIR } from '../store/dir.js'
import { CACHE_NUDGE, cachePath, getCache } from '../store/manager.js'
import { loadNudgeStore, WARN_INTERVAL_MS } from './nudge-store.js'
function nudgeStorePath(dir: string): string {
@ -15,21 +15,28 @@ const HOST = 'https://cloud.dify.ai'
describe('NudgeStore', () => {
let dir: string
let prevCacheDir: string | undefined
beforeEach(async () => {
dir = await mkdtemp(join(tmpdir(), 'difyctl-nudge-'))
prevCacheDir = process.env[ENV_CACHE_DIR]
process.env[ENV_CACHE_DIR] = dir
})
afterEach(async () => {
if (prevCacheDir === undefined)
delete process.env[ENV_CACHE_DIR]
else
process.env[ENV_CACHE_DIR] = prevCacheDir
await rm(dir, { recursive: true, force: true })
})
it('canWarn=true when no prior record exists', async () => {
const store = await loadNudgeStore({ store: new YamlStore(cachePath(dir, CACHE_NUDGE)) })
const store = await loadNudgeStore({ store: getCache(CACHE_NUDGE) })
expect(store.canWarn(HOST)).toBe(true)
})
it('canWarn=false within the silence window, true past it', async () => {
const t0 = new Date('2026-05-19T12:00:00.000Z')
const store = await loadNudgeStore({ store: new YamlStore(cachePath(dir, CACHE_NUDGE)), now: () => t0 })
const store = await loadNudgeStore({ store: getCache(CACHE_NUDGE), now: () => t0 })
await store.markWarned(HOST)
expect(store.canWarn(HOST, new Date('2026-05-19T18:00:00.000Z'))).toBe(false)
expect(store.canWarn(HOST, new Date('2026-05-20T12:00:00.000Z'))).toBe(true)
@ -37,7 +44,7 @@ describe('NudgeStore', () => {
it('canWarn clamps negative elapsed under clock skew (treats as still in window)', async () => {
const t0 = new Date('2026-05-19T12:00:00.000Z')
const store = await loadNudgeStore({ store: new YamlStore(cachePath(dir, CACHE_NUDGE)), now: () => t0 })
const store = await loadNudgeStore({ store: getCache(CACHE_NUDGE), now: () => t0 })
await store.markWarned(HOST)
const pastClock = new Date('2026-05-19T11:00:00.000Z') // clock moved backwards 1h
expect(store.canWarn(HOST, pastClock)).toBe(false)
@ -45,22 +52,22 @@ describe('NudgeStore', () => {
it('markWarned persists across store reloads', async () => {
const t0 = new Date('2026-05-19T12:00:00.000Z')
const s1 = await loadNudgeStore({ store: new YamlStore(cachePath(dir, CACHE_NUDGE)), now: () => t0 })
const s1 = await loadNudgeStore({ store: getCache(CACHE_NUDGE), now: () => t0 })
await s1.markWarned(HOST)
const s2 = await loadNudgeStore({ store: new YamlStore(cachePath(dir, CACHE_NUDGE)), now: () => t0 })
const s2 = await loadNudgeStore({ store: getCache(CACHE_NUDGE), now: () => t0 })
expect(s2.canWarn(HOST)).toBe(false)
})
it('treats a corrupt cache file as empty', async () => {
const path = nudgeStorePath(dir)
await writeCacheFile(path, '{ not valid json')
const store = await loadNudgeStore({ store: new YamlStore(cachePath(dir, CACHE_NUDGE)) })
const store = await loadNudgeStore({ store: getCache(CACHE_NUDGE) })
expect(store.canWarn(HOST)).toBe(true)
})
it('writes ISO timestamps under warned/<host> on disk', async () => {
const t = new Date('2026-05-19T12:00:00.000Z')
const store = await loadNudgeStore({ store: new YamlStore(cachePath(dir, CACHE_NUDGE)), now: () => t })
const store = await loadNudgeStore({ store: getCache(CACHE_NUDGE), now: () => t })
await store.markWarned(HOST)
const raw = await readFile(nudgeStorePath(dir), 'utf8')
const parsed = yaml.load(raw) as Record<string, unknown>
@ -72,11 +79,11 @@ describe('NudgeStore', () => {
// warns about a different host. Without merge-on-write the second writer
// would clobber the first.
const t = new Date('2026-05-19T12:00:00.000Z')
const a = await loadNudgeStore({ store: new YamlStore(cachePath(dir, CACHE_NUDGE)), now: () => t })
const b = await loadNudgeStore({ store: new YamlStore(cachePath(dir, CACHE_NUDGE)), now: () => t })
const a = await loadNudgeStore({ store: getCache(CACHE_NUDGE), now: () => t })
const b = await loadNudgeStore({ store: getCache(CACHE_NUDGE), now: () => t })
await a.markWarned('https://a.example')
await b.markWarned('https://b.example')
const reread = await loadNudgeStore({ store: new YamlStore(cachePath(dir, CACHE_NUDGE)), now: () => t })
const reread = await loadNudgeStore({ store: getCache(CACHE_NUDGE), now: () => t })
expect(reread.canWarn('https://a.example')).toBe(false)
expect(reread.canWarn('https://b.example')).toBe(false)
})

View File

@ -12,7 +12,6 @@ import { BaseError } from '../../errors/base.js'
import { ErrorCode } from '../../errors/codes.js'
import { formatErrorForCli } from '../../errors/format.js'
import { createClient } from '../../http/client.js'
import { resolveConfigDir } from '../../store/dir.js'
import { realStreams } from '../../sys/io/streams'
import { hostWithScheme } from '../../util/host.js'
import { versionInfo } from '../../version/info.js'
@ -24,7 +23,6 @@ export type AuthedContext = {
readonly http: KyInstance
readonly host: string
readonly io: IOStreams
readonly configDir: string
readonly cache?: AppInfoCache
}
@ -38,9 +36,8 @@ export async function buildAuthedContext(
cmd: Pick<Command, 'error'>,
opts: AuthedContextOptions,
): Promise<AuthedContext> {
const configDir = resolveConfigDir()
const io = realStreams(opts.format ?? '')
const bundle = await loadHosts(configDir)
const bundle = loadHosts()
if (bundle === undefined || bundle.tokens?.bearer === undefined || bundle.tokens.bearer === '') {
const err = new BaseError({
code: ErrorCode.NotLoggedIn,
@ -61,7 +58,7 @@ export async function buildAuthedContext(
await runCompatNudge({ host, io })
return { bundle, http, host, io, configDir, cache }
return { bundle, http, host, io, cache }
}
// Best-effort nudge: never throws, never blocks. Lives here so every authed

View File

@ -2,7 +2,7 @@ import type { SessionListResponse, SessionRow } from '@dify/contracts/api/openap
import type { DifyMock } from '../../../../../test/fixtures/dify-mock/server.js'
import type { AccountSessionsClient } from '../../../../api/account-sessions.js'
import type { HostsBundle } from '../../../../auth/hosts.js'
import type { TokenStore } from '../../../../auth/store.js'
import type { Key, Store } from '../../../../store/store.js'
import { mkdtemp, readFile, rm } from 'node:fs/promises'
import { tmpdir } from 'node:os'
import { join } from 'node:path'
@ -10,26 +10,23 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
import { startMock } from '../../../../../test/fixtures/dify-mock/server.js'
import { saveHosts } from '../../../../auth/hosts.js'
import { createClient } from '../../../../http/client.js'
import { ENV_CONFIG_DIR, resolveConfigDir } from '../../../../store/dir.js'
import { tokenKey } from '../../../../store/manager.js'
import { bufferStreams } from '../../../../sys/io/streams'
import { listAllSessions, runDevicesList, runDevicesRevoke } from './devices.js'
class MemStore implements TokenStore {
readonly entries = new Map<string, string>()
async put(host: string, accountId: string, token: string): Promise<void> {
this.entries.set(`${host}::${accountId}`, token)
class MemStore implements Store {
readonly entries = new Map<string, unknown>()
get<T>(key: Key<T>): T {
return (this.entries.get(key.key) as T | undefined) ?? key.default
}
async get(host: string, accountId: string): Promise<string | undefined> {
return this.entries.get(`${host}::${accountId}`)
set<T>(key: Key<T>, value: T): void {
this.entries.set(key.key, value)
}
async delete(host: string, accountId: string): Promise<void> {
this.entries.delete(`${host}::${accountId}`)
}
async list(host: string): Promise<readonly string[]> {
const prefix = `${host}::`
return Array.from(this.entries.keys()).filter(k => k.startsWith(prefix))
unset<T>(key: Key<T>): void {
this.entries.delete(key.key)
}
}
@ -93,11 +90,18 @@ describe('runDevicesList', () => {
describe('runDevicesRevoke', () => {
let mock: DifyMock
let configDir: string
let prevConfigDir: string | undefined
beforeEach(async () => {
mock = await startMock({ scenario: 'happy' })
configDir = await mkdtemp(join(tmpdir(), 'difyctl-devrevoke-'))
prevConfigDir = process.env[ENV_CONFIG_DIR]
process.env[ENV_CONFIG_DIR] = configDir
})
afterEach(async () => {
if (prevConfigDir === undefined)
delete process.env[ENV_CONFIG_DIR]
else
process.env[ENV_CONFIG_DIR] = prevConfigDir
await mock.stop()
await rm(configDir, { recursive: true, force: true })
})
@ -106,11 +110,11 @@ describe('runDevicesRevoke', () => {
const io = bufferStreams()
const store = new MemStore()
const b = bundleFor(mock.url, 'tok-1')
await store.put(b.current_host, 'acct-1', 'dfoa_test')
await saveHosts(configDir, b)
store.set(tokenKey(b.current_host, 'acct-1'), 'dfoa_test')
saveHosts(b)
const http = createClient({ host: mock.url, bearer: 'dfoa_test' })
await runDevicesRevoke({ configDir, io, bundle: b, http, store, target: 'difyctl on desktop', all: false })
await runDevicesRevoke({ io, bundle: b, http, store, target: 'difyctl on desktop', all: false })
expect(io.outBuf()).toContain('Revoked 1 session(s)')
expect(store.entries.size).toBe(1)
})
@ -121,7 +125,7 @@ describe('runDevicesRevoke', () => {
const b = bundleFor(mock.url, 'tok-1')
const http = createClient({ host: mock.url, bearer: 'dfoa_test' })
await runDevicesRevoke({ configDir, io, bundle: b, http, store, target: 'tok-2', all: false })
await runDevicesRevoke({ io, bundle: b, http, store, target: 'tok-2', all: false })
expect(io.outBuf()).toContain('Revoked 1 session(s)')
})
@ -131,7 +135,7 @@ describe('runDevicesRevoke', () => {
const b = bundleFor(mock.url, 'tok-1')
const http = createClient({ host: mock.url, bearer: 'dfoa_test' })
await runDevicesRevoke({ configDir, io, bundle: b, http, store, target: 'web', all: false })
await runDevicesRevoke({ io, bundle: b, http, store, target: 'web', all: false })
expect(io.outBuf()).toContain('Revoked 1 session(s)')
})
@ -141,7 +145,7 @@ describe('runDevicesRevoke', () => {
const b = bundleFor(mock.url, 'tok-1')
const http = createClient({ host: mock.url, bearer: 'dfoa_test' })
await expect(runDevicesRevoke({ configDir, io, bundle: b, http, store, target: 'difyctl', all: false }))
await expect(runDevicesRevoke({ io, bundle: b, http, store, target: 'difyctl', all: false }))
.rejects
.toThrow(/matches multiple/)
})
@ -152,7 +156,7 @@ describe('runDevicesRevoke', () => {
const b = bundleFor(mock.url, 'tok-1')
const http = createClient({ host: mock.url, bearer: 'dfoa_test' })
await expect(runDevicesRevoke({ configDir, io, bundle: b, http, store, target: 'nonexistent', all: false }))
await expect(runDevicesRevoke({ io, bundle: b, http, store, target: 'nonexistent', all: false }))
.rejects
.toThrow(/no session matches/)
})
@ -163,7 +167,7 @@ describe('runDevicesRevoke', () => {
const b = bundleFor(mock.url, 'tok-1')
const http = createClient({ host: mock.url, bearer: 'dfoa_test' })
await runDevicesRevoke({ configDir, io, bundle: b, http, store, all: true })
await runDevicesRevoke({ io, bundle: b, http, store, all: true })
expect(io.outBuf()).toContain('Revoked 2 session(s)')
})
@ -171,20 +175,20 @@ describe('runDevicesRevoke', () => {
const io = bufferStreams()
const store = new MemStore()
const b = bundleFor(mock.url, 'tok-1')
await store.put(b.current_host, 'acct-1', 'dfoa_test')
await saveHosts(configDir, b)
store.set(tokenKey(b.current_host, 'acct-1'), 'dfoa_test')
saveHosts(b)
const http = createClient({ host: mock.url, bearer: 'dfoa_test' })
await runDevicesRevoke({ configDir, io, bundle: b, http, store, target: 'tok-1', all: false })
await runDevicesRevoke({ io, bundle: b, http, store, target: 'tok-1', all: false })
expect(store.entries.size).toBe(0)
await expect(readFile(join(configDir, 'hosts.yml'), 'utf8')).rejects.toThrow(/ENOENT/)
await expect(readFile(join(resolveConfigDir(), 'hosts.yml'), 'utf8')).rejects.toThrow(/ENOENT/)
})
it('no target + no --all: throws UsageMissingArg', async () => {
const io = bufferStreams()
const store = new MemStore()
const http = createClient({ host: mock.url, bearer: 'dfoa_test' })
await expect(runDevicesRevoke({ configDir, io, bundle: bundleFor(mock.url), http, store, all: false }))
await expect(runDevicesRevoke({ io, bundle: bundleFor(mock.url), http, store, all: false }))
.rejects
.toThrow(/specify a device label/)
})

View File

@ -1,15 +1,13 @@
import type { SessionRow } from '@dify/contracts/api/openapi/types.gen'
import type { KyInstance } from 'ky'
import type { HostsBundle } from '../../../../auth/hosts.js'
import type { TokenStore } from '../../../../auth/store.js'
import type { Store } from '../../../../store/store.js'
import type { IOStreams } from '../../../../sys/io/streams'
import { unlink } from 'node:fs/promises'
import { join } from 'node:path'
import { AccountSessionsClient } from '../../../../api/account-sessions.js'
import { HOSTS_FILE_NAME } from '../../../../auth/hosts.js'
import { BaseError } from '../../../../errors/base.js'
import { ErrorCode } from '../../../../errors/codes.js'
import { LIMIT_DEFAULT, LIMIT_MAX, parseLimit } from '../../../../limit/limit.js'
import { getHostStore, getTokenStore, tokenKey } from '../../../../store/manager.js'
import { colorEnabled, colorScheme } from '../../../../sys/io/color.js'
import { runWithSpinner } from '../../../../sys/io/spinner.js'
@ -72,11 +70,11 @@ export async function listAllSessions(client: AccountSessionsClient): Promise<re
}
export type DevicesRevokeOptions = {
readonly configDir: string
readonly io: IOStreams
readonly bundle: HostsBundle | undefined
readonly http: KyInstance
readonly store: TokenStore
/** Optional override for tests; production code resolves via `getTokenStore`. */
readonly store?: Store
readonly target?: string
readonly all: boolean
readonly yes?: boolean
@ -104,8 +102,10 @@ export async function runDevicesRevoke(opts: DevicesRevokeOptions): Promise<void
for (const id of ids)
await sessions.revoke(id)
if (selfHit)
await clearLocal(opts.configDir, b, opts.store)
if (selfHit) {
const tokens = opts.store ?? (await getTokenStore()).store
clearLocal(b, tokens)
}
opts.io.out.write(`${cs.successIcon()} Revoked ${ids.length} session(s)\n`)
}
@ -179,17 +179,10 @@ function renderTable(rows: readonly SessionRow[], currentId: string): string {
return body.length === 0 ? `${fmt(header)}\n` : `${[fmt(header), ...body.map(fmt)].join('\n')}\n`
}
async function clearLocal(configDir: string, bundle: HostsBundle, store: TokenStore): Promise<void> {
function clearLocal(bundle: HostsBundle, store: Store): void {
const accountId = bundle.account?.id ?? bundle.external_subject?.email ?? 'default'
try {
await store.delete(bundle.current_host, accountId)
store.unset(tokenKey(bundle.current_host, accountId))
}
catch { /* best-effort */ }
try {
await unlink(join(configDir, HOSTS_FILE_NAME))
}
catch (err) {
if ((err as NodeJS.ErrnoException).code !== 'ENOENT')
throw err
}
}

View File

@ -1,4 +1,3 @@
import { selectStore } from '../../../../auth/store.js'
import { Args, Flags } from '../../../../framework/flags.js'
import { DifyCommand } from '../../../_shared/dify-command.js'
import { httpRetryFlag } from '../../../_shared/global-flags.js'
@ -25,13 +24,10 @@ export default class DevicesRevoke extends DifyCommand {
async run(argv: string[]): Promise<void> {
const { args, flags } = this.parse(DevicesRevoke, argv)
const ctx = await this.authedCtx({ retryFlag: flags['http-retry'] })
const { store } = await selectStore({ configDir: ctx.configDir })
await runDevicesRevoke({
configDir: ctx.configDir,
io: ctx.io,
bundle: ctx.bundle,
http: ctx.http,
store,
target: args.target,
all: flags.all,
yes: flags.yes,

View File

@ -1,5 +1,4 @@
import { Flags } from '../../../framework/flags.js'
import { resolveConfigDir } from '../../../store/dir.js'
import { realStreams } from '../../../sys/io/streams'
import { DifyCommand } from '../../_shared/dify-command.js'
import { runLogin } from './login.js'
@ -31,7 +30,6 @@ export default class Login extends DifyCommand {
async run(argv: string[]): Promise<void> {
const { flags } = this.parse(Login, argv)
await runLogin({
configDir: resolveConfigDir(),
io: realStreams(),
host: flags.host,
noBrowser: flags['no-browser'],

View File

@ -1,5 +1,5 @@
import type { DifyMock } from '../../../../test/fixtures/dify-mock/server.js'
import type { TokenStore } from '../../../auth/store.js'
import type { Key, Store } from '../../../store/store.js'
import type { Clock } from './device-flow.js'
import { mkdtemp, readFile, rm } from 'node:fs/promises'
import { tmpdir } from 'node:os'
@ -8,6 +8,8 @@ import { afterEach, beforeEach, describe, expect, it } from 'vitest'
import { startMock } from '../../../../test/fixtures/dify-mock/server.js'
import { DeviceFlowApi } from '../../../api/oauth-device.js'
import { createClient } from '../../../http/client.js'
import { ENV_CONFIG_DIR } from '../../../store/dir.js'
import { tokenKey } from '../../../store/manager.js'
import { bufferStreams } from '../../../sys/io/streams'
import { runLogin } from './login.js'
@ -18,38 +20,38 @@ const noopClock: Clock = {
const noopBrowser = async (): Promise<void> => { /* skip OS open */ }
class MemStore implements TokenStore {
readonly entries = new Map<string, string>()
async put(host: string, accountId: string, token: string): Promise<void> {
this.entries.set(`${host}::${accountId}`, token)
class MemStore implements Store {
readonly entries = new Map<string, unknown>()
get<T>(key: Key<T>): T {
return (this.entries.get(key.key) as T | undefined) ?? key.default
}
async get(host: string, accountId: string): Promise<string | undefined> {
return this.entries.get(`${host}::${accountId}`)
set<T>(key: Key<T>, value: T): void {
this.entries.set(key.key, value)
}
async delete(host: string, accountId: string): Promise<void> {
this.entries.delete(`${host}::${accountId}`)
}
async list(host: string): Promise<readonly string[]> {
const prefix = `${host}::`
return Array.from(this.entries.keys())
.filter(k => k.startsWith(prefix))
.map(k => k.slice(prefix.length))
unset<T>(key: Key<T>): void {
this.entries.delete(key.key)
}
}
describe('runLogin', () => {
let mock: DifyMock
let configDir: string
let prevConfigDir: string | undefined
beforeEach(async () => {
mock = await startMock({ scenario: 'happy' })
configDir = await mkdtemp(join(tmpdir(), 'difyctl-login-'))
prevConfigDir = process.env[ENV_CONFIG_DIR]
process.env[ENV_CONFIG_DIR] = configDir
})
afterEach(async () => {
if (prevConfigDir === undefined)
delete process.env[ENV_CONFIG_DIR]
else
process.env[ENV_CONFIG_DIR] = prevConfigDir
await mock.stop()
await rm(configDir, { recursive: true, force: true })
})
@ -58,7 +60,6 @@ describe('runLogin', () => {
const io = bufferStreams()
const store = new MemStore()
const bundle = await runLogin({
configDir,
io,
host: mock.url,
noBrowser: true,
@ -73,7 +74,7 @@ describe('runLogin', () => {
expect(bundle.account?.email).toBe('tester@dify.ai')
expect(bundle.workspace?.id).toBe('ws-1')
expect(bundle.available_workspaces).toHaveLength(2)
const stored = await store.get(bundle.current_host, 'acct-1')
const stored = store.get(tokenKey(bundle.current_host, 'acct-1'))
expect(stored).toBe('dfoa_test')
const hostsRaw = await readFile(join(configDir, 'hosts.yml'), 'utf8')
@ -91,7 +92,6 @@ describe('runLogin', () => {
const io = bufferStreams()
const store = new MemStore()
const bundle = await runLogin({
configDir,
io,
host: mock.url,
noBrowser: true,
@ -115,7 +115,6 @@ describe('runLogin', () => {
const io = bufferStreams()
const store = new MemStore()
await expect(runLogin({
configDir,
io,
host: mock.url,
noBrowser: true,
@ -135,7 +134,6 @@ describe('runLogin', () => {
const io = bufferStreams()
const store = new MemStore()
await expect(runLogin({
configDir,
io,
host: mock.url,
noBrowser: true,
@ -152,7 +150,6 @@ describe('runLogin', () => {
const io = bufferStreams()
const store = new MemStore()
await expect(runLogin({
configDir,
io,
host: mock.url,
noBrowser: true,
@ -169,7 +166,6 @@ describe('runLogin', () => {
const io = bufferStreams()
const store = new MemStore()
await runLogin({
configDir,
io,
host: mock.url,
noBrowser: true,

View File

@ -1,6 +1,6 @@
import type { CodeResponse, PollSuccess } from '../../../api/oauth-device.js'
import type { HostsBundle, StorageMode, Workspace } from '../../../auth/hosts.js'
import type { TokenStore } from '../../../auth/store.js'
import type { HostsBundle, Workspace } from '../../../auth/hosts.js'
import type { StorageMode, Store } from '../../../store/store.js'
import type { IOStreams } from '../../../sys/io/streams'
import type { BrowserEnv, BrowserOpener } from '../../../util/browser.js'
import type { Clock } from './device-flow.js'
@ -8,21 +8,20 @@ import * as os from 'node:os'
import * as readline from 'node:readline'
import { DeviceFlowApi } from '../../../api/oauth-device.js'
import { saveHosts } from '../../../auth/hosts.js'
import { selectStore } from '../../../auth/store.js'
import { createClient } from '../../../http/client.js'
import { getTokenStore, tokenKey } from '../../../store/manager.js'
import { colorEnabled, colorScheme } from '../../../sys/io/color.js'
import { decideOpen, OpenDecision, openUrl, realEnv } from '../../../util/browser.js'
import { bareHost, DEFAULT_HOST, resolveHost, validateVerificationURI } from '../../../util/host.js'
import { awaitAuthorization, realClock } from './device-flow.js'
export type LoginOptions = {
readonly configDir: string
readonly io: IOStreams
readonly host?: string
readonly noBrowser?: boolean
readonly insecure?: boolean
readonly deviceLabel?: string
readonly store?: { readonly store: TokenStore, readonly mode: StorageMode }
readonly store?: { readonly store: Store, readonly mode: StorageMode }
readonly api?: DeviceFlowApi
readonly browserEnv?: BrowserEnv
readonly browserOpener?: BrowserOpener
@ -59,11 +58,11 @@ export async function runLogin(opts: LoginOptions): Promise<HostsBundle> {
const success = await awaitAuthorization(api, code, { clock: opts.clock ?? realClock() })
const storeBundle = opts.store ?? await selectStore({ configDir: opts.configDir })
const storeBundle = opts.store ?? await getTokenStore()
const bundle = bundleFromSuccess(host, success, storeBundle.mode)
await storeBundle.store.put(bundle.current_host, accountKey(bundle), success.token)
await saveHosts(opts.configDir, bundle)
storeBundle.store.set(tokenKey(bundle.current_host, accountKey(bundle)), success.token)
saveHosts(bundle)
renderLoggedIn(opts.io.out, cs, host, success)
return bundle

View File

@ -1,8 +1,6 @@
import type { KyInstance } from 'ky'
import { loadHosts } from '../../../auth/hosts.js'
import { selectStore } from '../../../auth/store.js'
import { createClient } from '../../../http/client.js'
import { resolveConfigDir } from '../../../store/dir.js'
import { runWithSpinner } from '../../../sys/io/spinner.js'
import { realStreams } from '../../../sys/io/streams'
import { hostWithScheme } from '../../../util/host.js'
@ -18,9 +16,7 @@ export default class Logout extends DifyCommand {
async run(argv: string[]): Promise<void> {
this.parse(Logout, argv)
const configDir = resolveConfigDir()
const bundle = await loadHosts(configDir)
const { store } = await selectStore({ configDir })
const bundle = loadHosts()
let http: KyInstance | undefined
if (bundle !== undefined && bundle.current_host !== '' && bundle.tokens?.bearer !== undefined && bundle.tokens.bearer !== '') {
@ -34,7 +30,7 @@ export default class Logout extends DifyCommand {
const io = realStreams()
await runWithSpinner(
{ io, label: 'Signing out', enabled: true, style: 'dify-dim' },
() => runLogout({ configDir, io, bundle, http, store }),
() => runLogout({ io, bundle, http }),
)
}
}

View File

@ -1,6 +1,6 @@
import type { DifyMock } from '../../../../test/fixtures/dify-mock/server.js'
import type { HostsBundle } from '../../../auth/hosts.js'
import type { TokenStore } from '../../../auth/store.js'
import type { Key, Store } from '../../../store/store.js'
import { mkdtemp, readFile, rm, writeFile } from 'node:fs/promises'
import { tmpdir } from 'node:os'
import { join } from 'node:path'
@ -8,28 +8,23 @@ import { afterEach, beforeEach, describe, expect, it } from 'vitest'
import { startMock } from '../../../../test/fixtures/dify-mock/server.js'
import { saveHosts } from '../../../auth/hosts.js'
import { createClient } from '../../../http/client.js'
import { ENV_CONFIG_DIR } from '../../../store/dir.js'
import { tokenKey } from '../../../store/manager.js'
import { bufferStreams } from '../../../sys/io/streams'
import { runLogout } from './logout.js'
class MemStore implements TokenStore {
readonly entries = new Map<string, string>()
async put(host: string, accountId: string, token: string): Promise<void> {
this.entries.set(`${host}::${accountId}`, token)
class MemStore implements Store {
readonly entries = new Map<string, unknown>()
get<T>(key: Key<T>): T {
return (this.entries.get(key.key) as T | undefined) ?? key.default
}
async get(host: string, accountId: string): Promise<string | undefined> {
return this.entries.get(`${host}::${accountId}`)
set<T>(key: Key<T>, value: T): void {
this.entries.set(key.key, value)
}
async delete(host: string, accountId: string): Promise<void> {
this.entries.delete(`${host}::${accountId}`)
}
async list(host: string): Promise<readonly string[]> {
const prefix = `${host}::`
return Array.from(this.entries.keys())
.filter(k => k.startsWith(prefix))
.map(k => k.slice(prefix.length))
unset<T>(key: Key<T>): void {
this.entries.delete(key.key)
}
}
@ -52,13 +47,20 @@ function fixtureBundle(host: string): HostsBundle {
describe('runLogout', () => {
let mock: DifyMock
let configDir: string
let prevConfigDir: string | undefined
beforeEach(async () => {
mock = await startMock({ scenario: 'happy' })
configDir = await mkdtemp(join(tmpdir(), 'difyctl-logout-'))
prevConfigDir = process.env[ENV_CONFIG_DIR]
process.env[ENV_CONFIG_DIR] = configDir
})
afterEach(async () => {
if (prevConfigDir === undefined)
delete process.env[ENV_CONFIG_DIR]
else
process.env[ENV_CONFIG_DIR] = prevConfigDir
await mock.stop()
await rm(configDir, { recursive: true, force: true })
})
@ -67,11 +69,11 @@ describe('runLogout', () => {
const io = bufferStreams()
const store = new MemStore()
const bundle = fixtureBundle(mock.url)
await store.put(bundle.current_host, 'acct-1', 'dfoa_test')
await saveHosts(configDir, bundle)
store.set(tokenKey(bundle.current_host, 'acct-1'), 'dfoa_test')
saveHosts(bundle)
const http = createClient({ host: mock.url, bearer: 'dfoa_test' })
await runLogout({ configDir, io, bundle, http, store })
await runLogout({ io, bundle, http, store })
expect(store.entries.size).toBe(0)
await expect(readFile(join(configDir, 'hosts.yml'), 'utf8')).rejects.toThrow(/ENOENT/)
@ -82,7 +84,7 @@ describe('runLogout', () => {
it('not-logged-in: throws BaseError', async () => {
const io = bufferStreams()
const store = new MemStore()
await expect(runLogout({ configDir, io, bundle: undefined, store })).rejects.toThrow(/not logged in/)
await expect(runLogout({ io, bundle: undefined, store })).rejects.toThrow(/not logged in/)
})
it('hosts.yml absent: still completes locally + emits success', async () => {
@ -91,7 +93,7 @@ describe('runLogout', () => {
const bundle = fixtureBundle(mock.url)
const http = createClient({ host: mock.url, bearer: 'dfoa_test' })
await runLogout({ configDir, io, bundle, http, store })
await runLogout({ io, bundle, http, store })
expect(io.outBuf()).toContain('Logged out of')
})
@ -100,12 +102,12 @@ describe('runLogout', () => {
const io = bufferStreams()
const store = new MemStore()
const bundle = fixtureBundle(mock.url)
await store.put(bundle.current_host, 'acct-1', 'dfoa_test')
await saveHosts(configDir, bundle)
store.set(tokenKey(bundle.current_host, 'acct-1'), 'dfoa_test')
saveHosts(bundle)
mock.setScenario('server-5xx')
const http = createClient({ host: mock.url, bearer: 'dfoa_test', retryAttempts: 0 })
await runLogout({ configDir, io, bundle, http, store })
await runLogout({ io, bundle, http, store })
expect(store.entries.size).toBe(0)
expect(io.errBuf()).toContain('server revoke failed')
@ -117,11 +119,11 @@ describe('runLogout', () => {
const store = new MemStore()
const bundle = fixtureBundle(mock.url)
bundle.tokens = { bearer: 'dfp_personal_token' }
await store.put(bundle.current_host, 'acct-1', 'dfp_personal_token')
await saveHosts(configDir, bundle)
store.set(tokenKey(bundle.current_host, 'acct-1'), 'dfp_personal_token')
saveHosts(bundle)
const http = createClient({ host: mock.url, bearer: 'dfp_personal_token' })
await runLogout({ configDir, io, bundle, http, store })
await runLogout({ io, bundle, http, store })
expect(io.errBuf()).toBe('')
expect(store.entries.size).toBe(0)
@ -131,11 +133,11 @@ describe('runLogout', () => {
const io = bufferStreams()
const store = new MemStore()
const bundle = fixtureBundle(mock.url)
await saveHosts(configDir, bundle)
saveHosts(bundle)
await writeFile(join(configDir, 'config.yml'), 'foo: bar\n', 'utf8')
const http = createClient({ host: mock.url, bearer: 'dfoa_test' })
await runLogout({ configDir, io, bundle, http, store })
await runLogout({ io, bundle, http, store })
const cfg = await readFile(join(configDir, 'config.yml'), 'utf8')
expect(cfg).toContain('foo: bar')

View File

@ -1,21 +1,19 @@
import type { KyInstance } from 'ky'
import type { HostsBundle } from '../../../auth/hosts.js'
import type { TokenStore } from '../../../auth/store.js'
import type { Store } from '../../../store/store.js'
import type { IOStreams } from '../../../sys/io/streams'
import { unlink } from 'node:fs/promises'
import { join } from 'node:path'
import { AccountSessionsClient } from '../../../api/account-sessions.js'
import { HOSTS_FILE_NAME } from '../../../auth/hosts.js'
import { BaseError } from '../../../errors/base.js'
import { ErrorCode } from '../../../errors/codes.js'
import { getHostStore, getTokenStore, tokenKey } from '../../../store/manager.js'
import { colorEnabled, colorScheme } from '../../../sys/io/color.js'
export type LogoutOptions = {
readonly configDir: string
readonly io: IOStreams
readonly bundle: HostsBundle | undefined
readonly http?: KyInstance
readonly store: TokenStore
/** Optional override for tests; production code resolves via `getTokenStore`. */
readonly store?: Store
}
export async function runLogout(opts: LogoutOptions): Promise<void> {
@ -40,7 +38,8 @@ export async function runLogout(opts: LogoutOptions): Promise<void> {
}
}
await clearLocal(opts.configDir, bundle, opts.store)
const tokens = opts.store ?? (await getTokenStore()).store
clearLocal(bundle, tokens)
if (revokeWarning !== '')
opts.io.err.write(revokeWarning)
@ -53,18 +52,10 @@ function revokeAllowed(bearer: string): boolean {
return REVOCABLE_PREFIXES.some(p => bearer.startsWith(p))
}
async function clearLocal(configDir: string, bundle: HostsBundle, store: TokenStore): Promise<void> {
function clearLocal(bundle: HostsBundle, store: Store): void {
const accountId = bundle.account?.id ?? bundle.external_subject?.email ?? 'default'
try {
await store.delete(bundle.current_host, accountId)
store.unset(tokenKey(bundle.current_host, accountId))
}
catch { /* best-effort */ }
const hostsPath = join(configDir, HOSTS_FILE_NAME)
try {
await unlink(hostsPath)
}
catch (err) {
if ((err as NodeJS.ErrnoException).code !== 'ENOENT')
throw err
}
}

View File

@ -1,6 +1,5 @@
import { loadHosts } from '../../../auth/hosts.js'
import { Flags } from '../../../framework/flags.js'
import { resolveConfigDir } from '../../../store/dir.js'
import { realStreams } from '../../../sys/io/streams'
import { DifyCommand } from '../../_shared/dify-command.js'
import { runStatus } from './status.js'
@ -21,8 +20,7 @@ export default class Status extends DifyCommand {
async run(argv: string[]): Promise<void> {
const { flags } = this.parse(Status, argv)
const configDir = resolveConfigDir()
const bundle = await loadHosts(configDir)
const bundle = loadHosts()
await runStatus({ io: realStreams(), bundle, verbose: flags.verbose, json: flags.json })
}
}

View File

@ -1,6 +1,5 @@
import { loadHosts } from '../../../auth/hosts.js'
import { Flags } from '../../../framework/flags.js'
import { resolveConfigDir } from '../../../store/dir.js'
import { realStreams } from '../../../sys/io/streams'
import { DifyCommand } from '../../_shared/dify-command.js'
import { runWhoami } from './whoami.js'
@ -19,8 +18,7 @@ export default class Whoami extends DifyCommand {
async run(argv: string[]): Promise<void> {
const { flags } = this.parse(Whoami, argv)
const configDir = resolveConfigDir()
const bundle = await loadHosts(configDir)
const bundle = loadHosts()
await runWhoami({ io: realStreams(), bundle, json: flags.json })
}
}

View File

@ -1,43 +1,49 @@
import { mkdtemp, writeFile } from 'node:fs/promises'
import { mkdtemp, rm } from 'node:fs/promises'
import { tmpdir } from 'node:os'
import { join } from 'node:path'
import { beforeEach, describe, expect, it } from 'vitest'
import { FILE_NAME } from '../../../config/schema.js'
import { afterEach, beforeEach, describe, expect, it } from 'vitest'
import { isBaseError } from '../../../errors/base.js'
import { ErrorCode } from '../../../errors/codes.js'
import { YamlStore } from '../../../store/store.js'
import { ENV_CONFIG_DIR } from '../../../store/dir.js'
import { getConfigurationStore } from '../../../store/manager.js'
import { runConfigGet } from './run.js'
function makeStore(dir: string): YamlStore {
return new YamlStore(join(dir, FILE_NAME))
}
describe('runConfigGet', () => {
let dir: string
let prevConfigDir: string | undefined
beforeEach(async () => {
dir = await mkdtemp(join(tmpdir(), 'difyctl-get-'))
prevConfigDir = process.env[ENV_CONFIG_DIR]
process.env[ENV_CONFIG_DIR] = dir
})
it('returns set value with trailing newline', async () => {
await writeFile(
join(dir, FILE_NAME),
'schema_version: 1\ndefaults:\n format: yaml\n',
'utf8',
)
const out = runConfigGet({ store: makeStore(dir), key: 'defaults.format' })
afterEach(async () => {
if (prevConfigDir === undefined)
delete process.env[ENV_CONFIG_DIR]
else
process.env[ENV_CONFIG_DIR] = prevConfigDir
await rm(dir, { recursive: true, force: true })
})
it('returns set value with trailing newline', () => {
getConfigurationStore().setTyped({
schema_version: 1,
defaults: { format: 'yaml' },
})
const out = runConfigGet({ store: getConfigurationStore(), key: 'defaults.format' })
expect(out).toBe('yaml\n')
})
it('returns empty line when key is unset (matches Go fmt.Fprintln)', () => {
const out = runConfigGet({ store: makeStore(dir), key: 'defaults.format' })
const out = runConfigGet({ store: getConfigurationStore(), key: 'defaults.format' })
expect(out).toBe('\n')
})
it('throws BaseError(config_invalid_key) on unknown key', () => {
let caught: unknown
try {
runConfigGet({ store: makeStore(dir), key: 'bogus.key' })
runConfigGet({ store: getConfigurationStore(), key: 'bogus.key' })
}
catch (err) { caught = err }
expect(isBaseError(caught)).toBe(true)
@ -45,13 +51,12 @@ describe('runConfigGet', () => {
expect(caught.code).toBe(ErrorCode.ConfigInvalidKey)
})
it('returns numeric limit as string', async () => {
await writeFile(
join(dir, FILE_NAME),
'schema_version: 1\ndefaults:\n limit: 75\n',
'utf8',
)
const out = runConfigGet({ store: makeStore(dir), key: 'defaults.limit' })
it('returns numeric limit as string', () => {
getConfigurationStore().setTyped({
schema_version: 1,
defaults: { limit: 75 },
})
const out = runConfigGet({ store: getConfigurationStore(), key: 'defaults.limit' })
expect(out).toBe('75\n')
})
})

View File

@ -1,7 +1,8 @@
import { join } from 'node:path'
import { raw } from '../../../framework/output.js'
import { resolveConfigDir } from '../../../store/dir.js'
import { CONFIG_FILE_NAME } from '../../../store/manager.js'
import { DifyCommand } from '../../_shared/dify-command.js'
import { runConfigPath } from './run.js'
export default class ConfigPath extends DifyCommand {
static override description = 'Print the resolved config.yml path'
@ -12,6 +13,8 @@ export default class ConfigPath extends DifyCommand {
async run(argv: string[]) {
this.parse(ConfigPath, argv)
return raw(runConfigPath({ dir: resolveConfigDir() }))
return raw(
join(resolveConfigDir(), CONFIG_FILE_NAME),
)
}
}

View File

@ -1,14 +0,0 @@
import { describe, expect, it } from 'vitest'
import { runConfigPath } from './run.js'
describe('runConfigPath', () => {
it('joins dir and config.yml with trailing newline', () => {
const out = runConfigPath({ dir: '/tmp/x' })
expect(out).toBe('/tmp/x/config.yml\n')
})
it('handles trailing slash on dir', () => {
const out = runConfigPath({ dir: '/tmp/x/' })
expect(out).toBe('/tmp/x/config.yml\n')
})
})

View File

@ -1,10 +0,0 @@
import { join } from 'node:path'
import { FILE_NAME } from '../../../config/schema.js'
export type RunConfigPathOptions = {
readonly dir: string
}
export function runConfigPath(opts: RunConfigPathOptions): string {
return `${join(opts.dir, FILE_NAME)}\n`
}

View File

@ -1,35 +1,46 @@
import { mkdtemp, readFile } from 'node:fs/promises'
import { mkdtemp, rm } from 'node:fs/promises'
import { tmpdir } from 'node:os'
import { join } from 'node:path'
import { beforeEach, describe, expect, it } from 'vitest'
import { FILE_NAME } from '../../../config/schema.js'
import { afterEach, beforeEach, describe, expect, it } from 'vitest'
import { loadConfig } from '../../../config/config-loader.js'
import { isBaseError } from '../../../errors/base.js'
import { ErrorCode, ExitCode } from '../../../errors/codes.js'
import { YamlStore } from '../../../store/store.js'
import { ENV_CONFIG_DIR } from '../../../store/dir.js'
import { getConfigurationStore } from '../../../store/manager.js'
import { runConfigSet } from './run.js'
function makeStore(dir: string): YamlStore {
return new YamlStore(join(dir, FILE_NAME))
}
describe('runConfigSet', () => {
let dir: string
let prevConfigDir: string | undefined
beforeEach(async () => {
dir = await mkdtemp(join(tmpdir(), 'difyctl-set-'))
prevConfigDir = process.env[ENV_CONFIG_DIR]
process.env[ENV_CONFIG_DIR] = dir
})
it('writes config.yml and returns "set k = v\\n"', async () => {
const out = runConfigSet({ store: makeStore(dir), key: 'defaults.format', value: 'json' })
afterEach(async () => {
if (prevConfigDir === undefined)
delete process.env[ENV_CONFIG_DIR]
else
process.env[ENV_CONFIG_DIR] = prevConfigDir
await rm(dir, { recursive: true, force: true })
})
it('persists the value and returns "set k = v\\n"', () => {
const out = runConfigSet({ store: getConfigurationStore(), key: 'defaults.format', value: 'json' })
expect(out).toBe('set defaults.format = json\n')
const raw = await readFile(join(dir, FILE_NAME), 'utf8')
expect(raw).toContain('format: json')
const r = loadConfig(getConfigurationStore())
expect(r.found).toBe(true)
if (r.found)
expect(r.config.defaults.format).toBe('json')
})
it('rejects invalid format value with config_invalid_value', async () => {
it('rejects invalid format value with config_invalid_value', () => {
let caught: unknown
try {
runConfigSet({ store: makeStore(dir), key: 'defaults.format', value: 'csv' })
runConfigSet({ store: getConfigurationStore(), key: 'defaults.format', value: 'csv' })
}
catch (err) { caught = err }
expect(isBaseError(caught)).toBe(true)
@ -40,7 +51,7 @@ describe('runConfigSet', () => {
it('rejects unknown key with config_invalid_key', () => {
let caught: unknown
try {
runConfigSet({ store: makeStore(dir), key: 'bogus', value: 'x' })
runConfigSet({ store: getConfigurationStore(), key: 'bogus', value: 'x' })
}
catch (err) { caught = err }
expect(isBaseError(caught)).toBe(true)
@ -48,18 +59,22 @@ describe('runConfigSet', () => {
expect(caught.code).toBe(ErrorCode.ConfigInvalidKey)
})
it('preserves prior keys when setting a new one', async () => {
runConfigSet({ store: makeStore(dir), key: 'defaults.format', value: 'yaml' })
runConfigSet({ store: makeStore(dir), key: 'defaults.limit', value: '40' })
const raw = await readFile(join(dir, FILE_NAME), 'utf8')
expect(raw).toContain('format: yaml')
expect(raw).toContain('limit: 40')
it('preserves prior keys when setting a new one', () => {
runConfigSet({ store: getConfigurationStore(), key: 'defaults.format', value: 'yaml' })
runConfigSet({ store: getConfigurationStore(), key: 'defaults.limit', value: '40' })
const r = loadConfig(getConfigurationStore())
expect(r.found).toBe(true)
if (r.found) {
expect(r.config.defaults.format).toBe('yaml')
expect(r.config.defaults.limit).toBe(40)
}
})
it('exit code for invalid value is Usage (2)', () => {
let caught: unknown
try {
runConfigSet({ store: makeStore(dir), key: 'defaults.format', value: 'csv' })
runConfigSet({ store: getConfigurationStore(), key: 'defaults.format', value: 'csv' })
}
catch (err) { caught = err }
expect(isBaseError(caught)).toBe(true)
@ -70,7 +85,7 @@ describe('runConfigSet', () => {
it('exit code for unknown key is Usage (2)', () => {
let caught: unknown
try {
runConfigSet({ store: makeStore(dir), key: 'bogus', value: 'x' })
runConfigSet({ store: getConfigurationStore(), key: 'bogus', value: 'x' })
}
catch (err) { caught = err }
expect(isBaseError(caught)).toBe(true)
@ -81,7 +96,7 @@ describe('runConfigSet', () => {
it('typed wrap chain: invalid defaults.limit surfaces ConfigInvalidValue (not UsageInvalidFlag)', () => {
let caught: unknown
try {
runConfigSet({ store: makeStore(dir), key: 'defaults.limit', value: 'abc' })
runConfigSet({ store: getConfigurationStore(), key: 'defaults.limit', value: 'abc' })
}
catch (err) { caught = err }
expect(isBaseError(caught)).toBe(true)

View File

@ -1,48 +1,61 @@
import { mkdtemp, readFile, writeFile } from 'node:fs/promises'
import { mkdtemp, rm } from 'node:fs/promises'
import { tmpdir } from 'node:os'
import { join } from 'node:path'
import { beforeEach, describe, expect, it } from 'vitest'
import { FILE_NAME } from '../../../config/schema.js'
import { afterEach, beforeEach, describe, expect, it } from 'vitest'
import { loadConfig } from '../../../config/config-loader.js'
import { isBaseError } from '../../../errors/base.js'
import { ErrorCode } from '../../../errors/codes.js'
import { YamlStore } from '../../../store/store.js'
import { ENV_CONFIG_DIR } from '../../../store/dir.js'
import { getConfigurationStore } from '../../../store/manager.js'
import { runConfigUnset } from './run.js'
function makeStore(dir: string): YamlStore {
return new YamlStore(join(dir, FILE_NAME))
}
describe('runConfigUnset', () => {
let dir: string
let prevConfigDir: string | undefined
beforeEach(async () => {
dir = await mkdtemp(join(tmpdir(), 'difyctl-unset-'))
prevConfigDir = process.env[ENV_CONFIG_DIR]
process.env[ENV_CONFIG_DIR] = dir
})
it('clears the requested key, leaves others intact', async () => {
await writeFile(
join(dir, FILE_NAME),
'schema_version: 1\ndefaults:\n format: json\n limit: 25\n',
'utf8',
)
const out = runConfigUnset({ store: makeStore(dir), key: 'defaults.format' })
expect(out).toBe('unset defaults.format\n')
const raw = await readFile(join(dir, FILE_NAME), 'utf8')
expect(raw).not.toContain('format:')
expect(raw).toContain('limit: 25')
afterEach(async () => {
if (prevConfigDir === undefined)
delete process.env[ENV_CONFIG_DIR]
else
process.env[ENV_CONFIG_DIR] = prevConfigDir
await rm(dir, { recursive: true, force: true })
})
it('is a no-op (writes empty config) when key was already unset', async () => {
const out = runConfigUnset({ store: makeStore(dir), key: 'defaults.format' })
it('clears the requested key, leaves others intact', () => {
getConfigurationStore().setTyped({
schema_version: 1,
defaults: { format: 'json', limit: 25 },
})
const out = runConfigUnset({ store: getConfigurationStore(), key: 'defaults.format' })
expect(out).toBe('unset defaults.format\n')
const raw = await readFile(join(dir, FILE_NAME), 'utf8')
expect(raw).toContain('schema_version: 1')
const r = loadConfig(getConfigurationStore())
expect(r.found).toBe(true)
if (r.found) {
expect(r.config.defaults.format).not.toBe('json')
expect(r.config.defaults.limit).toBe(25)
}
})
it('is a no-op (writes empty config) when key was already unset', () => {
const out = runConfigUnset({ store: getConfigurationStore(), key: 'defaults.format' })
expect(out).toBe('unset defaults.format\n')
const r = loadConfig(getConfigurationStore())
expect(r.found).toBe(true)
if (r.found)
expect(r.config.schema_version).toBe(1)
})
it('rejects unknown key', () => {
let caught: unknown
try {
runConfigUnset({ store: makeStore(dir), key: 'bogus' })
runConfigUnset({ store: getConfigurationStore(), key: 'bogus' })
}
catch (err) { caught = err }
expect(isBaseError(caught)).toBe(true)

View File

@ -1,67 +1,69 @@
import { mkdtemp, writeFile } from 'node:fs/promises'
import { mkdtemp, rm } from 'node:fs/promises'
import { tmpdir } from 'node:os'
import { join } from 'node:path'
import { afterEach, beforeEach, describe, expect, it } from 'vitest'
import { FILE_NAME } from '../../../config/schema.js'
import { YamlStore } from '../../../store/store.js'
import { ENV_CONFIG_DIR } from '../../../store/dir.js'
import { getConfigurationStore } from '../../../store/manager.js'
import { runConfigView } from './run.js'
function makeStore(dir: string): YamlStore {
return new YamlStore(join(dir, FILE_NAME))
}
describe('runConfigView', () => {
let dir: string
let prevConfigDir: string | undefined
beforeEach(async () => {
dir = await mkdtemp(join(tmpdir(), 'difyctl-view-'))
prevConfigDir = process.env[ENV_CONFIG_DIR]
process.env[ENV_CONFIG_DIR] = dir
})
afterEach(async () => {
// tmpdir cleanup is best-effort
if (prevConfigDir === undefined)
delete process.env[ENV_CONFIG_DIR]
else
process.env[ENV_CONFIG_DIR] = prevConfigDir
await rm(dir, { recursive: true, force: true })
})
it('text format: empty config returns empty string', () => {
const out = runConfigView({ store: makeStore(dir) })
const out = runConfigView({ store: getConfigurationStore() })
expect(out).toBe('')
})
it('text format: emits "key = value" lines for set keys only', async () => {
await writeFile(
join(dir, FILE_NAME),
'schema_version: 1\ndefaults:\n format: json\n limit: 50\nstate:\n current_app: app-1\n',
'utf8',
)
const out = runConfigView({ store: makeStore(dir) })
it('text format: emits "key = value" lines for set keys only', () => {
getConfigurationStore().setTyped({
schema_version: 1,
defaults: { format: 'json', limit: 50 },
state: { current_app: 'app-1' },
})
const out = runConfigView({ store: getConfigurationStore() })
expect(out).toBe(
'defaults.format = json\ndefaults.limit = 50\nstate.current_app = app-1\n',
)
})
it('text format: skips unset keys', async () => {
await writeFile(
join(dir, FILE_NAME),
'schema_version: 1\ndefaults:\n format: yaml\n',
'utf8',
)
const out = runConfigView({ store: makeStore(dir) })
it('text format: skips unset keys', () => {
getConfigurationStore().setTyped({
schema_version: 1,
defaults: { format: 'yaml' },
})
const out = runConfigView({ store: getConfigurationStore() })
expect(out).toBe('defaults.format = yaml\n')
expect(out).not.toContain('defaults.limit')
expect(out).not.toContain('state.current_app')
})
it('json format: empty config returns "{}\\n"', () => {
const out = runConfigView({ store: makeStore(dir), json: true })
const out = runConfigView({ store: getConfigurationStore(), json: true })
expect(out).toBe('{}\n')
})
it('json format: defaults.limit is numeric, others are strings', async () => {
await writeFile(
join(dir, FILE_NAME),
'schema_version: 1\ndefaults:\n format: table\n limit: 100\nstate:\n current_app: app-x\n',
'utf8',
)
const out = runConfigView({ store: makeStore(dir), json: true })
it('json format: defaults.limit is numeric, others are strings', () => {
getConfigurationStore().setTyped({
schema_version: 1,
defaults: { format: 'table', limit: 100 },
state: { current_app: 'app-x' },
})
const out = runConfigView({ store: getConfigurationStore(), json: true })
const parsed = JSON.parse(out) as Record<string, unknown>
expect(parsed['defaults.format']).toBe('table')
expect(parsed['defaults.limit']).toBe(100)
@ -69,7 +71,7 @@ describe('runConfigView', () => {
})
it('json format: trailing newline matches Go encoder.Encode', () => {
const out = runConfigView({ store: makeStore(dir), json: true })
const out = runConfigView({ store: getConfigurationStore(), json: true })
expect(out.endsWith('\n')).toBe(true)
})
})

View File

@ -8,8 +8,8 @@ import { startMock } from '../../../../test/fixtures/dify-mock/server.js'
import { loadAppInfoCache } from '../../../cache/app-info.js'
import { formatted, stringifyOutput } from '../../../framework/output.js'
import { createClient } from '../../../http/client.js'
import { CACHE_APP_INFO, cachePath } from '../../../store/manager.js'
import { YamlStore } from '../../../store/store.js'
import { ENV_CACHE_DIR } from '../../../store/dir.js'
import { CACHE_APP_INFO, getCache } from '../../../store/manager.js'
import { runDescribeApp } from './run.js'
function bundle(): HostsBundle {
@ -29,17 +29,24 @@ function bundle(): HostsBundle {
describe('runDescribeApp', () => {
let mock: DifyMock
let dir: string
let prevCacheDir: string | undefined
beforeEach(async () => {
mock = await startMock({ scenario: 'happy' })
dir = await mkdtemp(join(tmpdir(), 'difyctl-desc-'))
prevCacheDir = process.env[ENV_CACHE_DIR]
process.env[ENV_CACHE_DIR] = dir
})
afterEach(async () => {
if (prevCacheDir === undefined)
delete process.env[ENV_CACHE_DIR]
else
process.env[ENV_CACHE_DIR] = prevCacheDir
await mock.stop()
await rm(dir, { recursive: true, force: true })
})
async function render(opts: Parameters<typeof runDescribeApp>[0]): Promise<string> {
const cache = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
const data = await runDescribeApp(
opts,
{ bundle: bundle(), http: createClient({ host: mock.url, bearer: 'dfoa_test' }), host: mock.url, cache },
@ -82,7 +89,7 @@ describe('runDescribeApp', () => {
})
it('refresh: bypasses cache', async () => {
const cache = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
await runDescribeApp(
{ appId: 'app-1' },
{ bundle: bundle(), http: createClient({ host: mock.url, bearer: 'dfoa_test' }), host: mock.url, cache },

View File

@ -7,8 +7,8 @@ import { afterEach, beforeEach, describe, expect, it } from 'vitest'
import { startMock } from '../../../../test/fixtures/dify-mock/server.js'
import { loadAppInfoCache } from '../../../cache/app-info.js'
import { createClient } from '../../../http/client.js'
import { CACHE_APP_INFO, cachePath } from '../../../store/manager.js'
import { YamlStore } from '../../../store/store.js'
import { ENV_CACHE_DIR } from '../../../store/dir.js'
import { CACHE_APP_INFO, getCache } from '../../../store/manager.js'
import { bufferStreams } from '../../../sys/io/streams'
import { resumeApp } from '../../resume/app/run.js'
import { runApp } from './run.js'
@ -30,18 +30,25 @@ function bundle(): HostsBundle {
describe('runApp', () => {
let mock: DifyMock
let dir: string
let prevCacheDir: string | undefined
beforeEach(async () => {
mock = await startMock({ scenario: 'happy' })
dir = await mkdtemp(join(tmpdir(), 'difyctl-runapp-'))
prevCacheDir = process.env[ENV_CACHE_DIR]
process.env[ENV_CACHE_DIR] = dir
})
afterEach(async () => {
if (prevCacheDir === undefined)
delete process.env[ENV_CACHE_DIR]
else
process.env[ENV_CACHE_DIR] = prevCacheDir
await mock.stop()
await rm(dir, { recursive: true, force: true })
})
it('chat: prints answer + conversation hint to stderr', async () => {
const io = bufferStreams()
const cache = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
await runApp(
{ appId: 'app-1', message: 'hi' },
{ bundle: bundle(), http: createClient({ host: mock.url, bearer: 'dfoa_test' }), host: mock.url, io, cache },
@ -52,7 +59,7 @@ describe('runApp', () => {
it('workflow: rejects positional message with usage error', async () => {
const io = bufferStreams()
const cache = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
await expect(runApp(
{ appId: 'app-2', message: 'hi' },
{ bundle: bundle(), http: createClient({ host: mock.url, bearer: 'dfoa_test' }), host: mock.url, io, cache },
@ -61,7 +68,7 @@ describe('runApp', () => {
it('workflow: prints single-string output as plain text', async () => {
const io = bufferStreams()
const cache = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
await runApp(
{ appId: 'app-2', inputs: { x: '1' } },
{ bundle: bundle(), http: createClient({ host: mock.url, bearer: 'dfoa_test' }), host: mock.url, io, cache },
@ -71,7 +78,7 @@ describe('runApp', () => {
it('json: passes through full envelope', async () => {
const io = bufferStreams()
const cache = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
await runApp(
{ appId: 'app-1', message: 'hi', format: 'json' },
{ bundle: bundle(), http: createClient({ host: mock.url, bearer: 'dfoa_test' }), host: mock.url, io, cache },
@ -104,7 +111,7 @@ describe('runApp', () => {
it('--stream chat: streams answer to stdout and hint to stderr', async () => {
const io = bufferStreams()
const cache = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
await runApp(
{ appId: 'app-1', message: 'hi', stream: true },
{ bundle: bundle(), http: createClient({ host: mock.url, bearer: 'dfoa_test' }), host: mock.url, io, cache },
@ -116,7 +123,7 @@ describe('runApp', () => {
it('--stream -o json chat: aggregates into blocking-shape envelope', async () => {
const io = bufferStreams()
const cache = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
await runApp(
{ appId: 'app-1', message: 'hi', stream: true, format: 'json' },
{ bundle: bundle(), http: createClient({ host: mock.url, bearer: 'dfoa_test' }), host: mock.url, io, cache },
@ -129,7 +136,7 @@ describe('runApp', () => {
it('agent-chat without --stream: collects and prints answer', async () => {
const io = bufferStreams()
const cache = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
await runApp(
{ appId: 'app-4', workspace: 'ws-2', message: 'do research' },
{ bundle: bundle(), http: createClient({ host: mock.url, bearer: 'dfoa_test' }), host: mock.url, io, cache },
@ -140,7 +147,7 @@ describe('runApp', () => {
it('agent-chat with --stream: live-prints answer and thoughts to stderr', async () => {
const io = bufferStreams()
const cache = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
await runApp(
{ appId: 'app-4', workspace: 'ws-2', message: 'go', stream: true },
{ bundle: bundle(), http: createClient({ host: mock.url, bearer: 'dfoa_test' }), host: mock.url, io, cache },
@ -151,7 +158,7 @@ describe('runApp', () => {
it('--stream workflow -o json: aggregates from workflow_finished', async () => {
const io = bufferStreams()
const cache = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
await runApp(
{ appId: 'app-2', inputs: { x: '1' }, stream: true, format: 'json' },
{ bundle: bundle(), http: createClient({ host: mock.url, bearer: 'dfoa_test' }), host: mock.url, io, cache },
@ -164,7 +171,7 @@ describe('runApp', () => {
it('stream-error scenario: error event surfaces typed BaseError', async () => {
mock.setScenario('stream-error')
const io = bufferStreams()
const cache = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
await expect(runApp(
{ appId: 'app-1', message: 'hi', stream: true },
{ bundle: bundle(), http: createClient({ host: mock.url, bearer: 'dfoa_test', retryAttempts: 0 }), host: mock.url, io, cache },
@ -173,7 +180,7 @@ describe('runApp', () => {
it('--inputs-file: reads inputs from file', async () => {
const io = bufferStreams()
const cache = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
const inputsFile = join(dir, 'inputs.json')
const { writeFile } = await import('node:fs/promises')
await writeFile(inputsFile, JSON.stringify({ x: 'from-file' }))
@ -197,7 +204,7 @@ describe('runApp', () => {
it('--inputs: accepts JSON object string', async () => {
const io = bufferStreams()
const cache = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
await runApp(
{ appId: 'app-2', inputsJson: '{"x":"hello"}' },
{ bundle: bundle(), http: createClient({ host: mock.url, bearer: 'dfoa_test' }), host: mock.url, io, cache },
@ -219,7 +226,7 @@ describe('runApp', () => {
it('hitl pause (text): writes readable block to stdout, hint to stderr, exits 0', async () => {
mock.setScenario('hitl-pause')
const io = bufferStreams()
const cache = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
let exitCode = -1
await expect(runApp(
{ appId: 'app-2', inputs: {} },
@ -248,7 +255,7 @@ describe('runApp', () => {
it('hitl pause (json): writes JSON envelope to stdout, exits 0', async () => {
mock.setScenario('hitl-pause')
const io = bufferStreams()
const cache = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
let exitCode = -1
await expect(runApp(
{ appId: 'app-2', inputs: {}, format: 'json' },
@ -274,7 +281,7 @@ describe('runApp', () => {
it('resume: withHistory: false completes successfully', async () => {
mock.setScenario('hitl-resume')
const io = bufferStreams()
const cache = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
await resumeApp(
{ appId: 'app-2', formToken: 'ft-hitl-1', workflowRunId: 'wf-run-hitl-1', action: 'submit', inputs: {}, withHistory: false },
{ bundle: bundle(), http: createClient({ host: mock.url, bearer: 'dfoa_test' }), host: mock.url, io, cache },
@ -285,7 +292,7 @@ describe('runApp', () => {
it('resume: submits form and streams workflow to completion', async () => {
mock.setScenario('hitl-resume')
const io = bufferStreams()
const cache = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
await resumeApp(
{ appId: 'app-2', formToken: 'ft-hitl-1', workflowRunId: 'wf-run-hitl-1', action: 'submit', inputs: {} },
{ bundle: bundle(), http: createClient({ host: mock.url, bearer: 'dfoa_test' }), host: mock.url, io, cache },
@ -296,7 +303,7 @@ describe('runApp', () => {
it('resume --stream: live-prints workflow node progress to stderr', async () => {
mock.setScenario('hitl-resume')
const io = bufferStreams()
const cache = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
await resumeApp(
{ appId: 'app-2', formToken: 'ft-hitl-1', workflowRunId: 'wf-run-hitl-1', action: 'submit', inputs: {}, stream: true },
{ bundle: bundle(), http: createClient({ host: mock.url, bearer: 'dfoa_test' }), host: mock.url, io, cache },
@ -307,7 +314,7 @@ describe('runApp', () => {
it('workflow: --file remote URL is passed as remote_url input variable', async () => {
const io = bufferStreams()
const cache = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
await runApp(
{ appId: 'app-2', files: ['doc=https://example.com/report.pdf'] },
{ bundle: bundle(), http: createClient({ host: mock.url, bearer: 'dfoa_test' }), host: mock.url, io, cache },
@ -326,7 +333,7 @@ describe('runApp', () => {
it('workflow: --file @path uploads file and passes local_file input variable', async () => {
const { writeFile } = await import('node:fs/promises')
const io = bufferStreams()
const cache = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
const filePath = join(dir, 'test.pdf')
await writeFile(filePath, 'fake pdf content')
await runApp(
@ -345,7 +352,7 @@ describe('runApp', () => {
it('workflow: --file overrides same-named key from --inputs (file wins)', async () => {
const io = bufferStreams()
const cache = await loadAppInfoCache({ store: new YamlStore(cachePath(dir, CACHE_APP_INFO)) })
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
await runApp(
{ appId: 'app-2', inputs: { doc: 'old-value' }, files: ['doc=https://example.com/override.pdf'] },
{ bundle: bundle(), http: createClient({ host: mock.url, bearer: 'dfoa_test' }), host: mock.url, io, cache },

View File

@ -22,7 +22,6 @@ export default class UseWorkspace extends DifyCommand {
const { args, flags } = this.parse(UseWorkspace, argv)
const ctx = await this.authedCtx({ retryFlag: flags['http-retry'] })
await runUseWorkspace({ workspaceId: args.workspaceId }, {
configDir: ctx.configDir,
bundle: ctx.bundle,
http: ctx.http,
io: ctx.io,

View File

@ -9,6 +9,7 @@ import { tmpdir } from 'node:os'
import { join } from 'node:path'
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
import { loadHosts, saveHosts } from '../../../auth/hosts.js'
import { ENV_CONFIG_DIR } from '../../../store/dir.js'
import { bufferStreams } from '../../../sys/io/streams.js'
import { runUseWorkspace } from './use.js'
@ -51,23 +52,29 @@ function fakeClient(opts: {
describe('runUseWorkspace', () => {
let configDir: string
let prevConfigDir: string | undefined
beforeEach(async () => {
configDir = await mkdtemp(join(tmpdir(), 'difyctl-use-workspace-'))
prevConfigDir = process.env[ENV_CONFIG_DIR]
process.env[ENV_CONFIG_DIR] = configDir
})
afterEach(async () => {
if (prevConfigDir === undefined)
delete process.env[ENV_CONFIG_DIR]
else
process.env[ENV_CONFIG_DIR] = prevConfigDir
await rm(configDir, { recursive: true, force: true })
})
it('happy path: POST /switch → GET /workspaces → write hosts.yml', async () => {
const io = bufferStreams()
const b = bundle()
await saveHosts(configDir, b)
saveHosts(b)
const client = fakeClient({})
const next = await runUseWorkspace(
{ workspaceId: 'ws-2' },
{
configDir,
bundle: b,
http: {} as KyInstance,
io,
@ -82,7 +89,7 @@ describe('runUseWorkspace', () => {
{ id: 'ws-1', name: 'Default', role: 'owner' },
{ id: 'ws-2', name: 'Switched', role: 'normal' },
])
const reloaded = await loadHosts(configDir)
const reloaded = loadHosts()
expect(reloaded?.workspace?.id).toBe('ws-2')
expect(reloaded?.workspace?.name).toBe('Switched')
expect(io.outBuf()).toMatch(/Switched to Switched \(ws-2\)/)
@ -93,15 +100,15 @@ describe('runUseWorkspace', () => {
// We expect saveHosts to record the fresh name from the server.
const io = bufferStreams()
const b = bundle()
await saveHosts(configDir, b)
saveHosts(b)
const client = fakeClient({})
await runUseWorkspace(
{ workspaceId: 'ws-2' },
{ configDir, bundle: b, http: {} as KyInstance, io, workspacesFactory: () => client as never },
{ bundle: b, http: {} as KyInstance, io, workspacesFactory: () => client as never },
)
const reloaded = await loadHosts(configDir)
const reloaded = loadHosts()
expect(reloaded?.workspace?.name).toBe('Switched')
expect(reloaded?.available_workspaces?.find(w => w.id === 'ws-2')?.name).toBe('Switched')
})
@ -109,8 +116,8 @@ describe('runUseWorkspace', () => {
it('does NOT mutate hosts.yml when POST /switch fails', async () => {
const io = bufferStreams()
const b = bundle()
await saveHosts(configDir, b)
const before = await loadHosts(configDir)
saveHosts(b)
const before = loadHosts()
const client = fakeClient({
switch: () => Promise.reject(new Error('forbidden')),
@ -120,7 +127,6 @@ describe('runUseWorkspace', () => {
runUseWorkspace(
{ workspaceId: 'ws-2' },
{
configDir,
bundle: b,
http: {} as KyInstance,
io,
@ -130,7 +136,7 @@ describe('runUseWorkspace', () => {
).rejects.toThrow(/forbidden/)
expect(client.list).not.toHaveBeenCalled()
const after = await loadHosts(configDir)
const after = loadHosts()
expect(after).toEqual(before)
expect(after?.workspace?.id).toBe('ws-1')
})
@ -138,8 +144,8 @@ describe('runUseWorkspace', () => {
it('does NOT mutate hosts.yml when GET /workspaces fails after switch', async () => {
const io = bufferStreams()
const b = bundle()
await saveHosts(configDir, b)
const before = await loadHosts(configDir)
saveHosts(b)
const before = loadHosts()
const client = fakeClient({
list: () => Promise.reject(new Error('transient list failure')),
@ -149,7 +155,6 @@ describe('runUseWorkspace', () => {
runUseWorkspace(
{ workspaceId: 'ws-2' },
{
configDir,
bundle: b,
http: {} as KyInstance,
io,
@ -158,14 +163,14 @@ describe('runUseWorkspace', () => {
),
).rejects.toThrow(/transient list failure/)
const after = await loadHosts(configDir)
const after = loadHosts()
expect(after).toEqual(before)
})
it('throws when server returns switch=<id> but id is missing from /workspaces list', async () => {
const io = bufferStreams()
const b = bundle()
await saveHosts(configDir, b)
saveHosts(b)
const client = fakeClient({
switch: () => Promise.resolve({
@ -187,7 +192,6 @@ describe('runUseWorkspace', () => {
runUseWorkspace(
{ workspaceId: 'ws-7' },
{
configDir,
bundle: b,
http: {} as KyInstance,
io,

View File

@ -13,7 +13,6 @@ export type UseWorkspaceOptions = {
}
export type UseWorkspaceDeps = {
readonly configDir: string
readonly bundle: HostsBundle
readonly http: KyInstance
readonly io: IOStreams
@ -70,7 +69,7 @@ export async function runUseWorkspace(
role: w.role,
})),
}
await saveHosts(deps.configDir, next)
saveHosts(next)
deps.io.out.write(`${cs.successIcon()} Switched to ${matched.name} (${matched.id})\n`)
return next
}

View File

@ -1,48 +1,52 @@
import { mkdir, mkdtemp, writeFile } from 'node:fs/promises'
import type { YamlStore } from '../store/store'
import { mkdtemp, rm } from 'node:fs/promises'
import { tmpdir } from 'node:os'
import { join } from 'node:path'
import { afterEach, beforeEach, describe, expect, it } from 'vitest'
import { isBaseError } from '../errors/base'
import { ErrorCode } from '../errors/codes'
import { YamlStore } from '../store/store'
import { ENV_CONFIG_DIR } from '../store/dir'
import { getConfigurationStore } from '../store/manager'
import { loadConfig } from './config-loader'
import { FILE_NAME } from './schema'
function makeStore(dir: string): YamlStore {
return new YamlStore(join(dir, FILE_NAME))
}
describe('loadConfig', () => {
let dir: string
let prevConfigDir: string | undefined
beforeEach(async () => {
dir = await mkdtemp(join(tmpdir(), 'difyctl-cfg-'))
prevConfigDir = process.env[ENV_CONFIG_DIR]
process.env[ENV_CONFIG_DIR] = dir
})
afterEach(async () => {
await mkdir(dir, { recursive: true }).catch(() => {})
if (prevConfigDir === undefined)
delete process.env[ENV_CONFIG_DIR]
else
process.env[ENV_CONFIG_DIR] = prevConfigDir
await rm(dir, { recursive: true, force: true })
})
it('returns found:false when config.yml is missing', () => {
const r = loadConfig(makeStore(dir))
it('returns found:false when config is missing', () => {
const r = loadConfig(getConfigurationStore())
expect(r.found).toBe(false)
})
it('parses a minimal valid config.yml', async () => {
await writeFile(join(dir, FILE_NAME), 'schema_version: 1\n', 'utf8')
const r = loadConfig(makeStore(dir))
it('parses a minimal valid config', () => {
getConfigurationStore().setTyped({ schema_version: 1 })
const r = loadConfig(getConfigurationStore())
expect(r.found).toBe(true)
if (r.found)
expect(r.config.schema_version).toBe(1)
})
it('parses defaults + state', async () => {
await writeFile(
join(dir, FILE_NAME),
'schema_version: 1\ndefaults:\n format: json\n limit: 100\nstate:\n current_app: app-1\n',
'utf8',
)
const r = loadConfig(makeStore(dir))
it('parses defaults + state', () => {
getConfigurationStore().setTyped({
schema_version: 1,
defaults: { format: 'json', limit: 100 },
state: { current_app: 'app-1' },
})
const r = loadConfig(getConfigurationStore())
expect(r.found).toBe(true)
if (r.found) {
expect(r.config.defaults.format).toBe('json')
@ -51,11 +55,29 @@ describe('loadConfig', () => {
}
})
it('throws BaseError(config_schema_unsupported) when YAML is malformed', async () => {
await writeFile(join(dir, FILE_NAME), '::not yaml::: {{[', 'utf8')
it('throws BaseError(config_schema_unsupported) when the store fails to parse the file', () => {
// Simulate a corrupt on-disk file via a fake store; loadConfig must wrap
// the underlying error as ConfigSchemaUnsupported.
const throwingStore = {
getTyped: () => { throw new Error('YAML parse failure') },
} as unknown as YamlStore
let caught: unknown
try {
loadConfig(makeStore(dir))
loadConfig(throwingStore)
}
catch (err) { caught = err }
expect(isBaseError(caught)).toBe(true)
if (isBaseError(caught)) {
expect(caught.code).toBe(ErrorCode.ConfigSchemaUnsupported)
expect(caught.hint).toMatch(/not valid YAML/)
}
})
it('throws BaseError(config_schema_unsupported) when zod validation fails', () => {
getConfigurationStore().setTyped({ defaults: { limit: 9999 } })
let caught: unknown
try {
loadConfig(getConfigurationStore())
}
catch (err) { caught = err }
expect(isBaseError(caught)).toBe(true)
@ -63,23 +85,11 @@ describe('loadConfig', () => {
expect(caught.code).toBe(ErrorCode.ConfigSchemaUnsupported)
})
it('throws BaseError(config_schema_unsupported) when zod validation fails', async () => {
await writeFile(join(dir, FILE_NAME), 'defaults:\n limit: 9999\n', 'utf8')
it('throws BaseError(config_schema_unsupported) when schema_version > 1 (forward-refuse)', () => {
getConfigurationStore().setTyped({ schema_version: 2 })
let caught: unknown
try {
loadConfig(makeStore(dir))
}
catch (err) { caught = err }
expect(isBaseError(caught)).toBe(true)
if (isBaseError(caught))
expect(caught.code).toBe(ErrorCode.ConfigSchemaUnsupported)
})
it('throws BaseError(config_schema_unsupported) when schema_version > 1 (forward-refuse)', async () => {
await writeFile(join(dir, FILE_NAME), 'schema_version: 2\n', 'utf8')
let caught: unknown
try {
loadConfig(makeStore(dir))
loadConfig(getConfigurationStore())
}
catch (err) { caught = err }
expect(isBaseError(caught)).toBe(true)

View File

@ -1,10 +1,10 @@
import { describe, expect, it } from 'vitest'
import { CONFIG_FILE_NAME } from '../store/manager.js'
import {
ALLOWED_FORMATS,
ConfigFileSchema,
CURRENT_SCHEMA_VERSION,
emptyConfig,
FILE_NAME,
} from './schema.js'
describe('config schema', () => {
@ -12,8 +12,8 @@ describe('config schema', () => {
expect(CURRENT_SCHEMA_VERSION).toBe(1)
})
it('FILE_NAME is config.yml', () => {
expect(FILE_NAME).toBe('config.yml')
it('CONFIG_FILE_NAME is config.yml', () => {
expect(CONFIG_FILE_NAME).toBe('config.yml')
})
it('ALLOWED_FORMATS matches Go set (json/yaml/table/wide/name/text)', () => {

View File

@ -1,7 +1,6 @@
import { z } from 'zod'
export const CURRENT_SCHEMA_VERSION = 1
export const FILE_NAME = 'config.yml'
export const ALLOWED_FORMATS = ['json', 'yaml', 'table', 'wide', 'name', 'text'] as const
export type AllowedFormat = (typeof ALLOWED_FORMATS)[number]

View File

@ -1,45 +1,57 @@
import { mkdtemp, readdir, readFile, stat } from 'node:fs/promises'
import { mkdtemp, rm } from 'node:fs/promises'
import { tmpdir } from 'node:os'
import { join } from 'node:path'
import { beforeEach, describe, expect, it } from 'vitest'
import { afterEach, beforeEach, describe, expect, it } from 'vitest'
import { loadConfig } from '../config/config-loader'
import { emptyConfig, FILE_NAME } from '../config/schema'
import { platform } from '../sys'
import { emptyConfig } from '../config/schema'
import { saveConfig } from './config-writer'
import { YamlStore } from './store'
function makeStore(dir: string): YamlStore {
return new YamlStore(join(dir, FILE_NAME))
}
import { ENV_CONFIG_DIR } from './dir'
import { getConfigurationStore } from './manager'
describe('saveConfig', () => {
let dir: string
let prevConfigDir: string | undefined
beforeEach(async () => {
dir = await mkdtemp(join(tmpdir(), 'difyctl-w-'))
prevConfigDir = process.env[ENV_CONFIG_DIR]
process.env[ENV_CONFIG_DIR] = dir
})
it('writes config.yml in the target dir', async () => {
saveConfig(makeStore(dir), { ...emptyConfig(), schema_version: 1 })
const stats = await stat(join(dir, FILE_NAME))
expect(stats.isFile()).toBe(true)
afterEach(async () => {
if (prevConfigDir === undefined)
delete process.env[ENV_CONFIG_DIR]
else
process.env[ENV_CONFIG_DIR] = prevConfigDir
await rm(dir, { recursive: true, force: true })
})
it('stamps schema_version=1 even if caller passed 0', () => {
saveConfig(makeStore(dir), { ...emptyConfig() })
const r = loadConfig(makeStore(dir))
saveConfig(getConfigurationStore(), { ...emptyConfig() })
const r = loadConfig(getConfigurationStore())
expect(r.found).toBe(true)
if (r.found)
expect(r.config.schema_version).toBe(1)
})
it('round-trips defaults + state through YAML', () => {
saveConfig(makeStore(dir), {
it('overrides a stale schema_version on save', () => {
saveConfig(getConfigurationStore(), {
...emptyConfig(),
schema_version: 999 as never,
})
const r = loadConfig(getConfigurationStore())
expect(r.found).toBe(true)
if (r.found)
expect(r.config.schema_version).toBe(1)
})
it('round-trips defaults + state', () => {
saveConfig(getConfigurationStore(), {
schema_version: 1,
defaults: { format: 'wide', limit: 75 },
state: { current_app: 'app-xyz' },
})
const r = loadConfig(makeStore(dir))
const r = loadConfig(getConfigurationStore())
expect(r.found).toBe(true)
if (r.found) {
expect(r.config.defaults.format).toBe('wide')
@ -48,39 +60,22 @@ describe('saveConfig', () => {
}
})
it('writes file with mode 0o600 (POSIX)', async () => {
if (platform() === 'win32')
return
saveConfig(makeStore(dir), emptyConfig())
const s = await stat(join(dir, FILE_NAME))
expect(s.mode & 0o777).toBe(0o600)
})
it('does not leave a tmp file on success', async () => {
saveConfig(makeStore(dir), emptyConfig())
const entries = await readdir(dir)
expect(entries.filter(f => f.endsWith('.tmp'))).toHaveLength(0)
expect(entries.filter(f => f.includes('.tmp.'))).toHaveLength(0)
})
it('creates parent dir at 0o700 if absent', async () => {
if (platform() === 'win32')
return
const nested = join(dir, 'nested', 'sub')
saveConfig(makeStore(nested), emptyConfig())
const s = await stat(nested)
expect(s.isDirectory()).toBe(true)
expect(s.mode & 0o777).toBe(0o700)
})
it('emits parseable YAML (round-trip via fs.readFile + js-yaml)', async () => {
saveConfig(makeStore(dir), {
it('overwrites the previous config on resave', () => {
saveConfig(getConfigurationStore(), {
schema_version: 1,
defaults: { format: 'json' },
state: {},
})
const raw = await readFile(join(dir, FILE_NAME), 'utf8')
expect(raw).toMatch(/^schema_version:/m)
expect(raw).toMatch(/format: json/)
saveConfig(getConfigurationStore(), {
schema_version: 1,
defaults: { format: 'table' },
state: { current_app: 'app-2' },
})
const r = loadConfig(getConfigurationStore())
expect(r.found).toBe(true)
if (r.found) {
expect(r.config.defaults.format).toBe('table')
expect(r.config.state.current_app).toBe('app-2')
}
})
})

View File

@ -0,0 +1,109 @@
import { beforeEach, describe, expect, it, vi } from 'vitest'
const passwords = new Map<string, string>()
const setPassword = vi.fn()
const getPassword = vi.fn()
const deletePassword = vi.fn()
class FakeEntry {
private readonly key: string
constructor(service: string, username: string) {
this.key = `${service}::${username}`
}
setPassword(value: string): void {
setPassword(this.key, value)
passwords.set(this.key, value)
}
getPassword(): string | null {
getPassword(this.key)
return passwords.get(this.key) ?? null
}
deletePassword(): boolean {
deletePassword(this.key)
if (!passwords.has(this.key))
return false
passwords.delete(this.key)
return true
}
}
vi.mock('@napi-rs/keyring', () => ({
Entry: FakeEntry,
}))
const { KeyringBasedStore } = await import('./store.js')
const SERVICE = 'difyctl-test'
beforeEach(() => {
passwords.clear()
setPassword.mockClear()
getPassword.mockClear()
deletePassword.mockClear()
})
describe('KeyringBasedStore', () => {
it('returns default when entry missing', () => {
const s = new KeyringBasedStore(SERVICE)
expect(s.get({ key: 'k', default: 'fallback' })).toBe('fallback')
})
it('round-trips strings via JSON encoding', () => {
const s = new KeyringBasedStore(SERVICE)
s.set({ key: 'k', default: '' }, 'tok-abc')
expect(s.get({ key: 'k', default: '' })).toBe('tok-abc')
})
it('isolates entries by key', () => {
const s = new KeyringBasedStore(SERVICE)
s.set({ key: 'a', default: '' }, 'A')
s.set({ key: 'b', default: '' }, 'B')
expect(s.get({ key: 'a', default: '' })).toBe('A')
expect(s.get({ key: 'b', default: '' })).toBe('B')
})
it('unset removes the entry', () => {
const s = new KeyringBasedStore(SERVICE)
s.set({ key: 'k', default: '' }, 'v')
s.unset({ key: 'k', default: '' })
expect(s.get({ key: 'k', default: '' })).toBe('')
})
it('unset is a no-op when entry missing', () => {
const s = new KeyringBasedStore(SERVICE)
expect(() => s.unset({ key: 'gone', default: '' })).not.toThrow()
})
it('swallows getPassword exceptions and returns default', () => {
const s = new KeyringBasedStore(SERVICE)
getPassword.mockImplementationOnce(
() => {
throw new Error('NoEntry')
},
)
expect(s.get({ key: 'k', default: 'd' })).toBe('d')
})
it('swallows unset exceptions', () => {
const s = new KeyringBasedStore(SERVICE)
deletePassword.mockImplementationOnce(
() => {
throw new Error('NoEntry')
},
)
expect(() => s.unset({ key: 'k', default: '' })).not.toThrow()
})
it('lets set propagate exceptions (caller decides fallback)', () => {
const s = new KeyringBasedStore(SERVICE)
setPassword.mockImplementationOnce(
() => {
throw new Error('keyring locked')
},
)
expect(() => s.set({ key: 'k', default: '' }, 'v')).toThrow(/keyring locked/)
})
})

View File

@ -0,0 +1,78 @@
import type { Key, Store } from './store.js'
import { describe, expect, it, vi } from 'vitest'
import { getTokenStore } from './manager.js'
function memStore(label: string): Store & { _label: string } {
const map = new Map<string, unknown>()
return {
_label: label,
get<T>(key: Key<T>): T {
return (map.get(key.key) as T | undefined) ?? key.default
},
set<T>(key: Key<T>, value: T): void {
map.set(key.key, value)
},
unset<T>(key: Key<T>): void {
map.delete(key.key)
},
}
}
describe('getTokenStore', () => {
it('returns keychain store when probe succeeds', async () => {
const k = memStore('keyring')
const f = memStore('file')
const result = await getTokenStore({
factory: { keyring: () => k, file: () => f },
})
expect(result.mode).toBe('keychain')
expect(result.store).toBe(k)
})
it('falls back to file when keyring set throws', async () => {
const k = memStore('keyring')
const f = memStore('file')
k.set = vi.fn(
() => {
throw new Error('locked')
},
)
const result = await getTokenStore({
factory: { keyring: () => k, file: () => f },
})
expect(result.mode).toBe('file')
expect(result.store).toBe(f)
})
it('falls back to file when probe round-trip mismatches', async () => {
const k = memStore('keyring')
const f = memStore('file')
k.get = vi.fn(() => 'something-else')
const result = await getTokenStore({
factory: { keyring: () => k, file: () => f },
})
expect(result.mode).toBe('file')
expect(result.store).toBe(f)
})
it('falls back to file when keyring constructor throws', async () => {
const f = memStore('file')
const result = await getTokenStore({
factory: {
keyring: () => { throw new Error('no backend') },
file: () => f,
},
})
expect(result.mode).toBe('file')
expect(result.store).toBe(f)
})
it('cleans up probe entry after successful probe', async () => {
const k = memStore('keyring')
const f = memStore('file')
await getTokenStore({
factory: { keyring: () => k, file: () => f },
})
expect(k.get({ key: '__difyctl_probe__', default: '' })).toBe('')
})
})

View File

@ -1,28 +1,77 @@
import type { Store } from './store'
import type { Key, StorageMode, Store } from './store'
import { join } from 'node:path'
import { FILE_NAME } from '../config/schema'
import { resolveCacheDir, resolveConfigDir } from './dir'
import { YamlStore } from './store'
import { KeyringBasedStore, YamlStore } from './store'
export const CACHE_APP_INFO = 'app-info'
export const CACHE_NUDGE = 'nudge'
const HOSTS_FILE = 'hosts.yml'
const TOKENS_FILE = 'tokens.yml'
export const CONFIG_FILE_NAME = 'config.yml'
const KEYRING_SERVICE = 'difyctl'
function getStore(filePath: string): YamlStore {
return new YamlStore(filePath)
}
function resolveConfigurationPath(): string {
return join(resolveConfigDir(), FILE_NAME)
}
export function cachePath(cacheDir: string, name: string): string {
return join(cacheDir, `${name}.yml`)
}
export function getConfigurationStore(): YamlStore {
return getStore(resolveConfigurationPath())
return getStore(join(resolveConfigDir(), CONFIG_FILE_NAME))
}
export function getCache(cacheName: string): Store {
return getStore(cachePath(resolveCacheDir(), cacheName))
}
export function getHostStore(): YamlStore {
return getStore(join(resolveConfigDir(), HOSTS_FILE))
}
const PROBE_KEY: Key<string> = { key: '__difyctl_probe__', default: '' }
const PROBE_VALUE = 'probe-v1'
export type GetTokenStoreOptions = {
readonly factory?: {
readonly keyring?: () => Store
readonly file?: () => Store
}
}
/**
* Single entry point for the credential store. Probes the OS keyring; if it
* round-trips a value, returns the keychain-backed store. Otherwise falls
* back to the YAML file at `<configDir>/tokens.yml`. Both implementations
* satisfy the `Store` interface, so callers interact uniformly.
*
* Business logic should always obtain the token store through this factory
* rather than constructing one directly.
*/
export async function getTokenStore(opts: GetTokenStoreOptions = {}): Promise<{ store: Store, mode: StorageMode }> {
const fileFactory = opts.factory?.file ?? (() => getStore(join(resolveConfigDir(), TOKENS_FILE)))
const keyringFactory = opts.factory?.keyring ?? (() => new KeyringBasedStore(KEYRING_SERVICE))
try {
const k = keyringFactory()
k.set(PROBE_KEY, PROBE_VALUE)
const got = k.get(PROBE_KEY)
k.unset(PROBE_KEY)
if (got !== PROBE_VALUE)
throw new Error('keyring round-trip mismatch')
return { store: k, mode: 'keychain' }
}
catch {
return { store: fileFactory(), mode: 'file' }
}
}
/**
* Maps an auth identity (host + accountId) to a `Store` key. All token store
* reads/writes in business logic go through this helper so the on-disk /
* keyring layout stays consistent.
*/
export function tokenKey(host: string, accountId: string): Key<string> {
return { key: `tokens.${host}.${accountId}`, default: '' }
}

View File

@ -1,6 +1,7 @@
import type { Platform } from '../sys'
import fs from 'node:fs'
import { dirname } from 'node:path'
import { Entry } from '@napi-rs/keyring'
import yaml from 'js-yaml'
import lockfile from 'lockfile'
import { pid, resolvePlatform } from '../sys'
@ -8,7 +9,7 @@ import { pid, resolvePlatform } from '../sys'
const FILE_PERM = 0o600
const DIR_PERM = 0o700
type Key<T> = {
export type Key<T> = {
default: T
key: string
}
@ -16,8 +17,11 @@ type Key<T> = {
export type Store = {
get: <T>(key: Key<T>) => T
set: <T>(key: Key<T>, value: T) => void
unset: <T>(key: Key<T>) => void
}
export type StorageMode = 'keychain' | 'file'
export class ConcurrentAccessError extends Error {
constructor(filePath: string) {
super(`Another process is modifying the file ${filePath}. remove ${filePath}.lock to reset lock.`)
@ -87,7 +91,6 @@ abstract class FileBasedStore implements Store {
protected withLock<R>(body: () => R): R {
this.lock()
try {
this.load()
return body()
}
finally {
@ -96,18 +99,44 @@ abstract class FileBasedStore implements Store {
}
get<T>(key: Key<T>): T {
return this.withLock(() => this.doGet(key))
return this.withLock(() => {
this.load()
return this.doGet(key)
})
}
set<T>(key: Key<T>, value: T) {
this.withLock(() => {
this.load()
this.doSet(key, value)
this.flush()
})
}
unset<T>(key: Key<T>): void {
this.withLock(() => {
this.load()
this.doUnset(key)
this.flush()
})
}
/**
* Remove the underlying file of the store. No-op if file doesn't exist.
*/
rm(): void {
try {
fs.unlinkSync(this.file_path)
}
catch (err) {
if ((err as NodeJS.ErrnoException).code !== 'ENOENT')
throw err
}
}
abstract doGet<T>(key: Key<T>): T
abstract doSet<T>(key: Key<T>, value: T): void
abstract doUnset<T>(key: Key<T>): void
}
export class YamlStore extends FileBasedStore {
@ -136,6 +165,7 @@ export class YamlStore extends FileBasedStore {
setTyped<T>(data: T): void {
this.withLock(() => {
this.load()
this.raw_content = yaml.dump(data, { lineWidth: -1, noRefs: true })
this.flush()
})
@ -156,6 +186,25 @@ export class YamlStore extends FileBasedStore {
current[lastKey] = value
this.raw_content = yaml.dump(data, { lineWidth: -1, noRefs: true })
}
doUnset<T>(key: Key<T>): void {
const data = loadYaml(this.raw_content) || {}
const parts = key.key.split('.')
const lastKey = parts.pop()
if (lastKey === undefined)
return
let current: Record<string, unknown> = data
for (const part of parts) {
const next = current[part]
if (next === null || next === undefined || typeof next !== 'object')
return
current = next as Record<string, unknown>
}
if (!(lastKey in current))
return
delete current[lastKey]
this.raw_content = yaml.dump(data, { lineWidth: -1, noRefs: true })
}
}
function loadYaml(raw: string | undefined): Record<string, unknown> | null {
@ -163,3 +212,39 @@ function loadYaml(raw: string | undefined): Record<string, unknown> | null {
return null
return (yaml.load(raw) ?? {}) as Record<string, unknown>
}
/**
* OS-keyring-based storage primitive. Sits at the same layer as
* `FileBasedStore`: implements `Store` with each `Key<T>` corresponding to a
* single keyring entry under the configured service. Values are JSON-encoded.
*/
export class KeyringBasedStore implements Store {
private readonly service: string
constructor(service: string) {
this.service = service
}
get<T>(key: Key<T>): T {
try {
const v = new Entry(this.service, key.key).getPassword()
if (v === null || v === undefined || v === '')
return key.default
return JSON.parse(v) as T
}
catch {
return key.default
}
}
set<T>(key: Key<T>, value: T): void {
new Entry(this.service, key.key).setPassword(JSON.stringify(value))
}
unset<T>(key: Key<T>): void {
try {
new Entry(this.service, key.key).deletePassword()
}
catch { /* missing entry is fine */ }
}
}

View File

@ -5,8 +5,8 @@ import { tmpdir } from 'node:os'
import { join } from 'node:path'
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
import { loadNudgeStore } from '../cache/nudge-store.js'
import { CACHE_NUDGE, cachePath } from '../store/manager.js'
import { YamlStore } from '../store/store.js'
import { ENV_CACHE_DIR } from '../store/dir.js'
import { CACHE_NUDGE, getCache } from '../store/manager.js'
import { maybeNudgeCompat } from './nudge.js'
const HOST = 'https://cloud.dify.ai'
@ -44,11 +44,18 @@ describe('maybeNudgeCompat', () => {
let dir: string
let store: NudgeStore
let prevCacheDir: string | undefined
beforeEach(async () => {
dir = await mkdtemp(join(tmpdir(), 'difyctl-nudge-'))
store = await loadNudgeStore({ store: new YamlStore(cachePath(dir, CACHE_NUDGE)), now: fixedNow })
prevCacheDir = process.env[ENV_CACHE_DIR]
process.env[ENV_CACHE_DIR] = dir
store = await loadNudgeStore({ store: getCache(CACHE_NUDGE), now: fixedNow })
})
afterEach(async () => {
if (prevCacheDir === undefined)
delete process.env[ENV_CACHE_DIR]
else
process.env[ENV_CACHE_DIR] = prevCacheDir
await rm(dir, { recursive: true, force: true })
})
@ -78,12 +85,12 @@ describe('maybeNudgeCompat', () => {
it('warns again after the silence window has elapsed', async () => {
const yesterday = new Date(NOW.getTime() - 25 * 60 * 60 * 1000)
const tStore = await loadNudgeStore({ store: new YamlStore(cachePath(dir, CACHE_NUDGE)), now: () => yesterday })
const tStore = await loadNudgeStore({ store: getCache(CACHE_NUDGE), now: () => yesterday })
await tStore.markWarned(HOST)
const probe = vi.fn(async () => UNSUPPORTED)
const { emit, lines } = emitterSpy()
const freshStore = await loadNudgeStore({ store: new YamlStore(cachePath(dir, CACHE_NUDGE)), now: fixedNow })
const freshStore = await loadNudgeStore({ store: getCache(CACHE_NUDGE), now: fixedNow })
await maybeNudgeCompat(HOST, baseDeps({ store: freshStore, probe, emit }))
expect(probe).toHaveBeenCalledOnce()

View File

@ -160,7 +160,8 @@ describe('runVersionProbe', () => {
const url = new URL(mock.url)
const prevConfig = process.env[ENV_CONFIG_DIR]
try {
await saveHosts(configDir, {
process.env[ENV_CONFIG_DIR] = configDir
saveHosts({
current_host: url.host,
scheme: url.protocol.replace(':', ''),
token_storage: 'file',

View File

@ -5,7 +5,6 @@ import type { Channel } from './info.js'
import { META_PROBE_TIMEOUT_MS, MetaClient } from '../api/meta.js'
import { loadHosts } from '../auth/hosts.js'
import { createClient } from '../http/client.js'
import { resolveConfigDir } from '../store/dir.js'
import { arch, platform } from '../sys/index.js'
import { hostWithScheme } from '../util/host.js'
import { difyCompat, evaluateCompat } from './compat.js'
@ -48,7 +47,7 @@ export type RunVersionProbeOptions = {
readonly probe?: MetaProbe
}
const defaultLoadBundle = async (): Promise<HostsBundle | undefined> => loadHosts(resolveConfigDir())
const defaultLoadBundle = async (): Promise<HostsBundle | undefined> => loadHosts()
const defaultProbe: MetaProbe = async (endpoint) => {
const http = createClient({ host: endpoint, timeoutMs: META_PROBE_TIMEOUT_MS, retryAttempts: 0 })