feat(cli): app run overhaul — always stream, --inputs JSON, HITL pause/resume, Ctrl+C stop

- Remove blocking mode; all apps stream SSE, --stream controls live vs collect output
- Replace --input k=v with --inputs '{json}' (single object, mutually exclusive with --inputs-file)
- Add --workflow-id, --file flags
- HITL: human_input_required → pause JSON to stdout + hint to stderr + exit 2
- Ctrl+C: captures task_id from SSE, calls stop-task, exits 1
- New difyctl run app resume subcommand: POST form, reconnect SSE, stream to completion
- resume: --action (auto-select), --with-history (include_state_snapshot), --stream flags
- Delete BlockingStrategy; simplify pickStrategy(isText, livePrint)
- Add HitlPauseError, SILENT_EVENTS handling in sse-collector and stream-handlers
- Update dify-mock: always SSE, hitl-pause/hitl-resume scenarios, stop/form/events handlers
- Update agent guide: --inputs JSON syntax, HITL pause/resume instructions
This commit is contained in:
GareArc
2026-05-15 02:53:19 -07:00
parent 8be6665d22
commit 31cf656b35
20 changed files with 895 additions and 267 deletions

View File

@ -5,21 +5,21 @@ import { createClient } from '../http/client.js'
import { AppRunClient, buildRunBody } from './app-run.js'
describe('buildRunBody', () => {
it('sets response_mode=blocking by default', () => {
expect(buildRunBody({}).response_mode).toBe('blocking')
it('does not include response_mode', () => {
expect('response_mode' in buildRunBody({})).toBe(false)
})
it('omits query when message empty', () => {
expect('query' in buildRunBody({})).toBe(false)
})
it('includes query when message present', () => {
it('maps message → query', () => {
expect(buildRunBody({ message: 'hi' }).query).toBe('hi')
})
it('passes through inputs', () => {
const body = buildRunBody({ inputs: { a: '1' } })
expect(body.inputs).toEqual({ a: '1' })
const body = buildRunBody({ inputs: { a: '1', b: 42 } })
expect(body.inputs).toEqual({ a: '1', b: 42 })
})
it('omits conversation_id when missing/empty', () => {
@ -29,39 +29,22 @@ describe('buildRunBody', () => {
it('includes workspace_id when set', () => {
expect(buildRunBody({ workspaceId: 'ws-1' }).workspace_id).toBe('ws-1')
})
})
describe('AppRunClient.runBlocking', () => {
let mock: DifyMock
beforeEach(async () => {
mock = await startMock({ scenario: 'happy' })
})
afterEach(async () => {
await mock.stop()
it('includes workflow_id when workflowId provided', () => {
expect(buildRunBody({ workflowId: 'wf-abc' }).workflow_id).toBe('wf-abc')
})
it('returns chat-shaped envelope for chat app', async () => {
const c = new AppRunClient(createClient({ host: mock.url, bearer: 'dfoa_test' }))
const out = await c.runBlocking('app-1', buildRunBody({ message: 'hi' }))
expect(out.mode).toBe('chat')
expect(out.answer).toBe('echo: hi')
it('omits workflow_id when workflowId empty', () => {
expect('workflow_id' in buildRunBody({ workflowId: '' })).toBe(false)
})
it('returns workflow-shaped envelope for workflow app', async () => {
const c = new AppRunClient(createClient({ host: mock.url, bearer: 'dfoa_test' }))
const out = await c.runBlocking('app-2', buildRunBody({ inputs: { x: '1' } }))
expect((out.data as { status: string }).status).toBe('succeeded')
it('includes files when provided and non-empty', () => {
const files = [{ type: 'image', url: 'http://example.com/img.png' }]
expect(buildRunBody({ files }).files).toEqual(files)
})
it('404 unknown app surfaces as error', async () => {
const c = new AppRunClient(createClient({ host: mock.url, bearer: 'dfoa_test', retryAttempts: 0 }))
await expect(c.runBlocking('nope', buildRunBody({}))).rejects.toThrow()
})
})
describe('buildRunBody response_mode override', () => {
it('sets response_mode=streaming when requested', () => {
expect(buildRunBody({ responseMode: 'streaming' }).response_mode).toBe('streaming')
it('omits files when empty array', () => {
expect('files' in buildRunBody({ files: [] })).toBe(false)
})
})
@ -76,7 +59,7 @@ describe('AppRunClient.runStream', () => {
it('yields events for chat app', async () => {
const c = new AppRunClient(createClient({ host: mock.url, bearer: 'dfoa_test' }))
const iter = await c.runStream('app-1', buildRunBody({ message: 'hi', responseMode: 'streaming' }))
const iter = await c.runStream('app-1', buildRunBody({ message: 'hi' }))
const dec = new TextDecoder()
const names: string[] = []
const datas: string[] = []
@ -93,7 +76,7 @@ describe('AppRunClient.runStream', () => {
mock.setScenario('server-5xx')
const c = new AppRunClient(createClient({ host: mock.url, bearer: 'dfoa_test', retryAttempts: 0 }))
await expect(
c.runStream('app-1', buildRunBody({ message: 'hi', responseMode: 'streaming' })),
c.runStream('app-1', buildRunBody({ message: 'hi' })),
).rejects.toMatchObject({ code: 'server_5xx' })
})
@ -101,7 +84,7 @@ describe('AppRunClient.runStream', () => {
expect.assertions(1)
const c = new AppRunClient(createClient({ host: mock.url, bearer: 'dfoa_test' }))
const ctrl = new AbortController()
const iter = await c.runStream('app-1', buildRunBody({ message: 'hi', responseMode: 'streaming' }), { signal: ctrl.signal })
const iter = await c.runStream('app-1', buildRunBody({ message: 'hi' }), { signal: ctrl.signal })
ctrl.abort()
try {
for await (const _ of iter) { /* drain */ }
@ -113,10 +96,42 @@ describe('AppRunClient.runStream', () => {
it('derives event name from JSON event field when SSE event line absent', async () => {
const c = new AppRunClient(createClient({ host: mock.url, bearer: 'dfoa_test' }))
const iter = await c.runStream('app-2', buildRunBody({ inputs: { x: '1' }, responseMode: 'streaming' }))
const iter = await c.runStream('app-2', buildRunBody({ inputs: { x: '1' } }))
const names: string[] = []
for await (const ev of iter)
names.push(ev.name)
expect(names).toEqual(['workflow_started', 'node_started', 'node_finished', 'workflow_finished'])
})
})
describe('AppRunClient.stopTask', () => {
let mock: DifyMock
beforeEach(async () => {
mock = await startMock({ scenario: 'happy' })
})
afterEach(async () => {
await mock.stop()
})
it('resolves without error for known app and task', async () => {
const c = new AppRunClient(createClient({ host: mock.url, bearer: 'dfoa_test' }))
await expect(c.stopTask('app-1', 'task-42')).resolves.toBeUndefined()
})
})
describe('AppRunClient.submitHumanInput', () => {
let mock: DifyMock
beforeEach(async () => {
mock = await startMock({ scenario: 'happy' })
})
afterEach(async () => {
await mock.stop()
})
it('resolves without error', async () => {
const c = new AppRunClient(createClient({ host: mock.url, bearer: 'dfoa_test' }))
await expect(
c.submitHumanInput('app-1', 'tok-abc', 'approve', { comment: 'looks good' }),
).resolves.toBeUndefined()
})
})

View File

@ -3,22 +3,18 @@ import type { SseEvent } from '../http/sse.js'
import { normalizeDifyStream } from '../http/sse-dify.js'
import { parseSSE } from '../http/sse.js'
export type RunResponse = Record<string, unknown>
export type ResponseMode = 'blocking' | 'streaming'
export type RunBodyArgs = {
readonly message?: string
readonly inputs?: Readonly<Record<string, string>>
readonly inputs?: Readonly<Record<string, unknown>>
readonly conversationId?: string
readonly workspaceId?: string
readonly responseMode?: ResponseMode
readonly workflowId?: string
readonly files?: readonly Record<string, unknown>[]
}
export function buildRunBody(args: RunBodyArgs): Record<string, unknown> {
const body: Record<string, unknown> = {
inputs: args.inputs ?? {},
response_mode: args.responseMode ?? 'blocking',
}
if (args.message !== undefined && args.message !== '')
body.query = args.message
@ -26,11 +22,16 @@ export function buildRunBody(args: RunBodyArgs): Record<string, unknown> {
body.conversation_id = args.conversationId
if (args.workspaceId !== undefined && args.workspaceId !== '')
body.workspace_id = args.workspaceId
if (args.workflowId !== undefined && args.workflowId !== '')
body.workflow_id = args.workflowId
if (args.files !== undefined && args.files.length > 0)
body.files = args.files
return body
}
export type RunStreamOptions = {
export type StreamOptions = {
signal?: AbortSignal
includeStateSnapshot?: boolean
}
export class AppRunClient {
@ -40,25 +41,60 @@ export class AppRunClient {
this.http = http
}
async runBlocking(appId: string, body: Record<string, unknown>): Promise<RunResponse> {
const raw = await this.http.post(`apps/${encodeURIComponent(appId)}/run`, { json: body }).json()
return raw as RunResponse
}
async runStream(
appId: string,
body: Record<string, unknown>,
opts: RunStreamOptions = {},
opts: StreamOptions = {},
): Promise<AsyncIterable<SseEvent>> {
const path = `apps/${encodeURIComponent(appId)}/run`
const res = await this.http.post(path, {
const res = await this.http.post(`apps/${encodeURIComponent(appId)}/run`, {
json: body,
headers: { Accept: 'text/event-stream' },
retry: { limit: 0 },
timeout: false,
signal: opts.signal,
})
if (res.body === null)
throw new Error('streaming response body missing')
return normalizeDifyStream(parseSSE(res.body, opts.signal))
}
async stopTask(appId: string, taskId: string): Promise<void> {
await this.http.post(`apps/${encodeURIComponent(appId)}/tasks/${encodeURIComponent(taskId)}/stop`, {
json: {},
timeout: 30_000,
})
}
async submitHumanInput(
appId: string,
formToken: string,
action: string,
inputs: Record<string, unknown>,
): Promise<void> {
await this.http.post(
`apps/${encodeURIComponent(appId)}/form/human_input/${encodeURIComponent(formToken)}`,
{ json: { action, inputs }, timeout: 30_000 },
)
}
async reconnectStream(
appId: string,
workflowRunId: string,
opts: StreamOptions = {},
): Promise<AsyncIterable<SseEvent>> {
const url = `apps/${encodeURIComponent(appId)}/tasks/${encodeURIComponent(workflowRunId)}/events`
const res = await this.http.get(url, {
searchParams: {
include_state_snapshot: opts.includeStateSnapshot === true ? 'true' : 'false',
continue_on_pause: 'false',
},
headers: { Accept: 'text/event-stream' },
retry: { limit: 0 },
timeout: false,
signal: opts.signal,
})
if (res.body === null)
throw new Error('reconnect stream body missing')
return normalizeDifyStream(parseSSE(res.body, opts.signal))
}
}

View File

@ -1,29 +0,0 @@
import type { RunContext, RunStrategy } from './index.js'
import { buildRunBody } from '../../../../api/app-run.js'
import { runWithSpinner } from '../../../../io/spinner.js'
import { chatConversationHint, newAppRunObject, RUN_MODES } from '../handlers.js'
const CHAT_MODES: ReadonlySet<string> = new Set([RUN_MODES.Chat, RUN_MODES.AgentChat, RUN_MODES.AdvancedChat])
export class BlockingStrategy implements RunStrategy {
async execute(ctx: RunContext): Promise<void> {
const { opts, deps, mode, format, printFlags } = ctx
const body = buildRunBody({
message: opts.message,
inputs: opts.inputs,
conversationId: opts.conversationId,
workspaceId: opts.workspace,
})
const resp = await runWithSpinner(
{ io: deps.io, label: 'Running app', enabled: ctx.isText },
() => ctx.runClient.runBlocking(opts.appId, body),
)
const respMode = typeof resp.mode === 'string' && resp.mode !== '' ? resp.mode : mode
deps.io.out.write(printFlags.toPrinter(format).print(newAppRunObject(respMode, resp)))
if (ctx.isText && CHAT_MODES.has(respMode)) {
const hint = chatConversationHint(resp)
if (hint !== undefined)
deps.io.err.write(hint)
}
}
}

View File

@ -1,31 +1,28 @@
import type { AppRunClient } from '../../../../api/app-run.js'
import type { AppRunPrintFlags } from '../print-flags.js'
import type { RunAppDeps, RunAppOptions } from '../run.js'
import { BlockingStrategy } from './blocking.js'
import { StreamingStructuredStrategy } from './streaming-structured.js'
import { StreamingTextStrategy } from './streaming-text.js'
export type RunContext = {
readonly opts: RunAppOptions
readonly opts: RunAppOptions & { inputs: Record<string, unknown> }
readonly deps: RunAppDeps
readonly mode: string
readonly isAgent: boolean
readonly format: string
readonly isText: boolean
readonly livePrint: boolean
readonly runClient: AppRunClient
readonly printFlags: AppRunPrintFlags
readonly exit: (code: number) => never
}
export type RunStrategy = {
execute: (ctx: RunContext) => Promise<void>
}
const blocking = new BlockingStrategy()
const streamingText = new StreamingTextStrategy()
const streamingStructured = new StreamingStructuredStrategy()
export function pickStrategy(useStream: boolean, isText: boolean): RunStrategy {
if (!useStream)
return blocking
return isText ? streamingText : streamingStructured
export function pickStrategy(isText: boolean, livePrint: boolean): RunStrategy {
return isText && livePrint ? streamingText : streamingStructured
}

View File

@ -1,28 +1,103 @@
import type { SseEvent } from '../../../../http/sse.js'
import type { HitlPausePayload } from '../sse-collector.js'
import type { RunContext, RunStrategy } from './index.js'
import { buildRunBody } from '../../../../api/app-run.js'
import { newAppRunObject } from '../handlers.js'
import { collect } from '../sse-collector.js'
import { chatConversationHint, newAppRunObject, RUN_MODES } from '../handlers.js'
import { collect, HitlPauseError } from '../sse-collector.js'
const CHAT_MODES: ReadonlySet<string> = new Set([RUN_MODES.Chat, RUN_MODES.AgentChat, RUN_MODES.AdvancedChat])
function buildHitlExitJson(appId: string, payload: HitlPausePayload): string {
return JSON.stringify({
status: 'paused',
app_id: appId,
task_id: payload.task_id,
workflow_run_id: payload.workflow_run_id,
form_token: payload.form_token,
form_content: payload.form_content,
inputs: payload.inputs,
resolved_default_values: payload.resolved_default_values,
user_actions: payload.user_actions,
expiration_time: payload.expiration_time,
})
}
function hitlResumeHint(appId: string, payload: HitlPausePayload): string {
let hint = `hint: workflow paused — resume with: difyctl run app resume ${appId} ${payload.form_token} --workflow-run-id ${payload.workflow_run_id}`
const actions = payload.user_actions as { id: string }[]
if (actions.length > 1) {
const firstAction = actions[0]?.id
if (firstAction !== undefined)
hint += ` --action ${firstAction}`
}
return `${hint}\n`
}
async function* captureTaskId(
iter: AsyncIterable<SseEvent>,
onCapture: (id: string) => void,
): AsyncIterable<SseEvent> {
const dec = new TextDecoder()
for await (const ev of iter) {
if (ev.data.byteLength > 0) {
try {
const parsed = JSON.parse(dec.decode(ev.data)) as Record<string, unknown>
if (typeof parsed.task_id === 'string' && parsed.task_id !== '')
onCapture(parsed.task_id)
}
catch { /* ignore parse errors */ }
}
yield ev
}
}
export class StreamingStructuredStrategy implements RunStrategy {
async execute(ctx: RunContext): Promise<void> {
const { opts, deps, mode, format, printFlags } = ctx
const { opts, deps, mode, format, isText, printFlags, exit } = ctx
const ctrl = new AbortController()
const body = buildRunBody({
message: opts.message,
inputs: opts.inputs,
inputs: opts.inputs as Record<string, unknown>,
conversationId: opts.conversationId,
workspaceId: opts.workspace,
responseMode: 'streaming',
workflowId: opts.workflowId,
})
let taskId: string | undefined
const cleanup = () => {
if (taskId !== undefined)
void ctx.runClient.stopTask(opts.appId, taskId).catch(() => {})
ctrl.abort()
exit(1)
}
process.once('SIGINT', cleanup)
let resp: Record<string, unknown>
try {
const events = await ctx.runClient.runStream(opts.appId, body, { signal: ctrl.signal })
resp = await collect(events, mode)
const wrappedEvents = captureTaskId(events, (id) => {
taskId = id
})
resp = await collect(wrappedEvents, mode)
}
catch (err) {
ctrl.abort()
if (err instanceof HitlPauseError) {
deps.io.out.write(`${buildHitlExitJson(opts.appId, err.pausePayload)}\n`)
deps.io.err.write(hitlResumeHint(opts.appId, err.pausePayload))
exit(2)
}
throw err
}
deps.io.out.write(printFlags.toPrinter(format).print(newAppRunObject(mode, resp)))
finally {
process.off('SIGINT', cleanup)
}
const respMode = typeof resp.mode === 'string' && resp.mode !== '' ? resp.mode : mode
deps.io.out.write(printFlags.toPrinter(format).print(newAppRunObject(respMode, resp)))
if (isText && CHAT_MODES.has(respMode)) {
const hint = chatConversationHint(resp)
if (hint !== undefined)
deps.io.err.write(hint)
}
}
}

View File

@ -1,27 +1,83 @@
import type { HitlPausePayload } from '../sse-collector.js'
import type { RunContext, RunStrategy } from './index.js'
import { buildRunBody } from '../../../../api/app-run.js'
import { decodeStreamError } from '../sse-collector.js'
import { decodeStreamError, HitlPauseError } from '../sse-collector.js'
function buildHitlExitJson(appId: string, payload: HitlPausePayload): string {
return JSON.stringify({
status: 'paused',
app_id: appId,
task_id: payload.task_id,
workflow_run_id: payload.workflow_run_id,
form_token: payload.form_token,
form_content: payload.form_content,
inputs: payload.inputs,
resolved_default_values: payload.resolved_default_values,
user_actions: payload.user_actions,
expiration_time: payload.expiration_time,
})
}
function hitlResumeHint(appId: string, payload: HitlPausePayload): string {
let hint = `hint: workflow paused — resume with: difyctl run app resume ${appId} ${payload.form_token} --workflow-run-id ${payload.workflow_run_id}`
const actions = payload.user_actions as { id: string }[]
if (actions.length > 1) {
const firstAction = actions[0]?.id
if (firstAction !== undefined)
hint += ` --action ${firstAction}`
}
return `${hint}\n`
}
export class StreamingTextStrategy implements RunStrategy {
async execute(ctx: RunContext): Promise<void> {
const { opts, deps, mode, printFlags } = ctx
const { opts, deps, mode, printFlags, exit } = ctx
const ctrl = new AbortController()
const body = buildRunBody({
message: opts.message,
inputs: opts.inputs,
inputs: opts.inputs as Record<string, unknown>,
conversationId: opts.conversationId,
workspaceId: opts.workspace,
responseMode: 'streaming',
workflowId: opts.workflowId,
})
let taskId: string | undefined
const cleanup = () => {
if (taskId !== undefined)
void ctx.runClient.stopTask(opts.appId, taskId).catch(() => {})
ctrl.abort()
exit(1)
}
process.once('SIGINT', cleanup)
try {
const events = await ctx.runClient.runStream(opts.appId, body, { signal: ctrl.signal })
const sp = printFlags.toStreamPrinter(mode)
const dec = new TextDecoder()
for await (const ev of events) {
if (ev.name === 'ping')
continue
if (ev.name === 'error')
throw decodeStreamError(ev.data)
sp.onEvent(deps.io.out, deps.io.err, ev)
if (ev.data.byteLength > 0) {
try {
const parsed = JSON.parse(dec.decode(ev.data)) as Record<string, unknown>
if (typeof parsed.task_id === 'string' && parsed.task_id !== '' && taskId === undefined)
taskId = parsed.task_id
}
catch { /* ignore */ }
}
try {
sp.onEvent(deps.io.out, deps.io.err, ev)
}
catch (err) {
if (err instanceof HitlPauseError) {
deps.io.out.write(`${buildHitlExitJson(opts.appId, err.pausePayload)}\n`)
deps.io.err.write(hitlResumeHint(opts.appId, err.pausePayload))
exit(2)
}
throw err
}
}
sp.onEnd(deps.io.out, deps.io.err)
}
@ -29,5 +85,8 @@ export class StreamingTextStrategy implements RunStrategy {
ctrl.abort()
throw err
}
finally {
process.off('SIGINT', cleanup)
}
}
}

View File

@ -6,25 +6,33 @@ WORKFLOW
2. Run the app:
difyctl run app <id> "your message"
difyctl run app <id> "your message" -o json
difyctl run app <id> --inputs '{"key":"value"}' -o json
APP MODES
chat / advanced-chat Conversational. Accepts --conversation <id> to
resume an existing thread.
completion Single-turn. Ignores --conversation.
workflow Multi-step graph. Use --input key=val for each
input variable the workflow declares.
agent-chat Always streams regardless of --stream flag.
workflow Multi-step graph. Pass all input variables as a
JSON object via --inputs.
agent-chat Conversational with autonomous tool use.
FLAGS
--input key=val Pass named inputs. Repeatable. Required for
workflow apps that declare input variables.
--input language=English --input topic="AI safety"
--stream Request SSE streaming. Recommended for runs
exceeding ~30s. Agent apps stream regardless.
--inputs '{"k":"v"}' All input variables as one JSON object.
--inputs '{"language":"English","topic":"AI safety"}'
--inputs-file path Load inputs from a JSON file. Mutually exclusive
with --inputs.
--stream Print output live as tokens/events arrive.
--conversation <id> Resume a conversation (chat/advanced-chat only).
--workspace <id> Target a specific workspace.
HITL PAUSE (exit code 2)
When a workflow pauses for human input, stdout receives a JSON object
with status "paused", form_token, workflow_run_id, and resolved_default_values.
Resume with:
difyctl run app resume <app_id> <form_token> --workflow-run-id <id>
You can supply form values by:
difyctl run app resume <app_id> <form_token> --workflow-run-id <id> --inputs '{"name":"Alice"}'
ERROR RECOVERY
not logged in difyctl auth login
app not found (404) difyctl get app

View File

@ -1,6 +1,4 @@
import { Args, Flags } from '@oclif/core'
import { BaseError } from '../../../errors/base.js'
import { ErrorCode } from '../../../errors/codes.js'
import { DifyCommand } from '../../_shared/dify-command.js'
import { httpRetryFlag } from '../../_shared/global-flags.js'
import { agentGuide } from './guide.js'
@ -12,7 +10,8 @@ export default class RunApp extends DifyCommand {
static override examples = [
'<%= config.bin %> run app app-1 "hello"',
'<%= config.bin %> run app app-1 --input name=world',
'<%= config.bin %> run app app-1 --inputs \'{"name":"world"}\'',
'<%= config.bin %> run app app-1 --inputs-file inputs.json',
'<%= config.bin %> run app app-1 --stream',
'<%= config.bin %> run app app-1 -o json',
]
@ -23,50 +22,34 @@ export default class RunApp extends DifyCommand {
}
static override flags = {
'input': Flags.string({ description: 'app input (--input k=v, repeatable)', multiple: true, default: [] }),
'conversation': Flags.string({ description: 'resume a chat conversation by id' }),
'workspace': Flags.string({ description: 'workspace id (overrides DIFY_WORKSPACE_ID and stored default)' }),
'stream': Flags.boolean({
description: 'request streaming SSE; recommended for runs that may exceed ~30s. Agent apps stream regardless.',
default: false,
}),
'inputs': Flags.string({ description: 'Input variables as a JSON object, e.g. --inputs \'{"key":"value"}\'. Mutually exclusive with --inputs-file.' }),
'inputs-file': Flags.string({ description: 'Path to a JSON file containing the inputs object. Mutually exclusive with --inputs.' }),
'file': Flags.string({ description: 'Named file input (--file key=@path, repeatable)', multiple: true, default: [] }),
'conversation': Flags.string({ description: 'Resume a chat conversation by id' }),
'workflow-id': Flags.string({ description: 'Pin to a specific published workflow version' }),
'workspace': Flags.string({ description: 'Workspace id (overrides DIFY_WORKSPACE_ID and stored default)' }),
'stream': Flags.boolean({ description: 'Print output live as tokens/events arrive (default: collect and print at end)', default: false }),
'http-retry': httpRetryFlag,
'output': Flags.string({ char: 'o', description: 'output format (json|yaml|text)', default: '' }),
'output': Flags.string({ char: 'o', description: 'Output format (json|yaml|text)', default: '' }),
}
async run(): Promise<void> {
const { args, flags, raw } = await this.parse(RunApp)
const { args, flags } = await this.parse(RunApp)
const format = flags.output
const ctx = await this.authedCtx({ retryFlag: flags['http-retry'], withCache: true, format })
const inputs = parseInputs(flags.input)
const streamSetExplicitly = raw.some(t => t.type === 'flag' && t.flag === 'stream')
await runApp(
{
appId: args.id,
message: args.message,
inputs,
inputsJson: flags.inputs,
inputsFile: flags['inputs-file'],
conversationId: flags.conversation,
workflowId: flags['workflow-id'],
workspace: flags.workspace,
format,
stream: flags.stream,
streamSetExplicitly,
},
{ bundle: ctx.bundle, http: ctx.http, host: ctx.host, io: ctx.io, cache: ctx.cache },
)
}
}
function parseInputs(raw: readonly string[]): Record<string, string> {
const out: Record<string, string> = {}
for (const item of raw) {
const eq = item.indexOf('=')
if (eq <= 0) {
throw new BaseError({
code: ErrorCode.UsageInvalidFlag,
message: `--input expects key=value, got ${JSON.stringify(item)}`,
})
}
out[item.slice(0, eq)] = item.slice(eq + 1)
}
return out
}

View File

@ -0,0 +1,52 @@
import { Args, Flags } from '@oclif/core'
import { DifyCommand } from '../../../_shared/dify-command.js'
import { httpRetryFlag } from '../../../_shared/global-flags.js'
import { resumeApp } from './run.js'
export default class RunAppResume extends DifyCommand {
static override description = 'Resume a paused workflow app after submitting a human input form'
static override examples = [
'<%= config.bin %> run app resume app-1 ft-abc --workflow-run-id wf-run-1 --action submit --inputs \'{"name":"Alice"}\'',
'<%= config.bin %> run app resume app-1 ft-abc --workflow-run-id wf-run-1 --inputs-file form.json',
]
static override args = {
id: Args.string({ description: 'app id', required: true }),
formToken: Args.string({ description: 'form token from the HITL pause JSON', required: true }),
}
static override flags = {
'workflow-run-id': Flags.string({ description: 'workflow_run_id from the HITL pause JSON', required: true }),
'action': Flags.string({ description: 'user action id (auto-selected when form has exactly one action)' }),
'inputs': Flags.string({ description: 'Input variables as a JSON object, e.g. --inputs \'{"key":"value"}\'. Mutually exclusive with --inputs-file.' }),
'inputs-file': Flags.string({ description: 'Path to a JSON file containing the inputs object. Mutually exclusive with --inputs.' }),
'workspace': Flags.string({ description: 'workspace id override' }),
'with-history': Flags.boolean({ description: 'Replay executed-node history before attaching to live stream.', default: false }),
'stream': Flags.boolean({ description: 'Print output live as tokens/events arrive. Default: collect and print at end.', default: false }),
'output': Flags.string({ char: 'o', description: 'output format (json|yaml|text)', default: '' }),
'http-retry': httpRetryFlag,
}
async run(): Promise<void> {
const { args, flags } = await this.parse(RunAppResume)
const format = flags.output
const ctx = await this.authedCtx({ retryFlag: flags['http-retry'], withCache: true, format })
await resumeApp(
{
appId: args.id,
formToken: args.formToken,
workflowRunId: flags['workflow-run-id'],
action: flags.action,
inputsJson: flags.inputs,
inputsFile: flags['inputs-file'],
format,
workspace: flags.workspace,
withHistory: flags['with-history'],
stream: flags.stream,
},
{ bundle: ctx.bundle, http: ctx.http, host: ctx.host, io: ctx.io, cache: ctx.cache },
)
}
}

View File

@ -0,0 +1,144 @@
import type { KyInstance } from 'ky'
import type { HostsBundle } from '../../../../auth/hosts.js'
import type { AppInfoCache } from '../../../../cache/app-info.js'
import type { IOStreams } from '../../../../io/streams.js'
import type { RunContext } from '../_strategies/index.js'
import { AppMetaClient } from '../../../../api/app-meta.js'
import { AppRunClient } from '../../../../api/app-run.js'
import { AppsClient } from '../../../../api/apps.js'
import { FieldInfo } from '../../../../types/app-meta.js'
import { resolveWorkspaceId } from '../../../../workspace/resolver.js'
import { pickStrategy } from '../_strategies/index.js'
import { RUN_MODES } from '../handlers.js'
import { AppRunPrintFlags } from '../print-flags.js'
export type ResumeAppOptions = {
readonly appId: string
readonly formToken: string
readonly workflowRunId: string
readonly action?: string
readonly inputs?: Readonly<Record<string, unknown>>
readonly inputsJson?: string
readonly inputsFile?: string
readonly format?: string
readonly workspace?: string
readonly withHistory?: boolean
readonly stream?: boolean
}
export type ResumeAppDeps = {
readonly bundle: HostsBundle
readonly http: KyInstance
readonly host: string
readonly io: IOStreams
readonly cache?: AppInfoCache
readonly envLookup?: (k: string) => string | undefined
readonly exit?: (code: number) => never
}
const TEXT_FORMATS = new Set(['', 'text'])
async function resolveInputs(
inputsJson: string | undefined,
inputsFile: string | undefined,
directInputs: Readonly<Record<string, unknown>> | undefined,
): Promise<Record<string, unknown>> {
if (inputsJson !== undefined && inputsFile !== undefined)
throw new Error('--inputs and --inputs-file are mutually exclusive')
if (inputsJson !== undefined) {
let parsed: unknown
try {
parsed = JSON.parse(inputsJson)
}
catch {
throw new Error('--inputs must be valid JSON')
}
if (parsed === null || typeof parsed !== 'object' || Array.isArray(parsed))
throw new Error('--inputs must be a JSON object')
return parsed as Record<string, unknown>
}
if (inputsFile !== undefined) {
const { readFile } = await import('node:fs/promises')
let parsed: unknown
try {
parsed = JSON.parse(await readFile(inputsFile, 'utf8'))
}
catch {
throw new Error('--inputs-file must contain valid JSON')
}
if (parsed === null || typeof parsed !== 'object' || Array.isArray(parsed))
throw new Error('--inputs-file must be a JSON object')
return parsed as Record<string, unknown>
}
return { ...(directInputs ?? {}) }
}
export async function resumeApp(opts: ResumeAppOptions, deps: ResumeAppDeps): Promise<void> {
const env = deps.envLookup ?? ((k: string) => process.env[k])
const wsId = resolveWorkspaceId({ flag: opts.workspace, env: env('DIFY_WORKSPACE_ID'), bundle: deps.bundle })
const apps = new AppsClient(deps.http)
const meta = new AppMetaClient({ apps, host: deps.host, cache: deps.cache })
const m = await meta.get(opts.appId, wsId, [FieldInfo])
const mode = m.info?.mode ?? RUN_MODES.Workflow
const runClient = new AppRunClient(deps.http)
const exit = deps.exit ?? ((code: number) => process.exit(code) as never)
let action = opts.action
if (action === undefined) {
const formResp = await deps.http.get(
`apps/${encodeURIComponent(opts.appId)}/form/human_input/${encodeURIComponent(opts.formToken)}`,
).json<{ user_actions: { id: string }[] }>()
if (formResp.user_actions.length === 1) {
action = formResp.user_actions[0]?.id ?? ''
}
else if (formResp.user_actions.length === 0) {
action = ''
}
else {
throw new Error('--action required: form has multiple user actions')
}
}
const inputs = await resolveInputs(opts.inputsJson, opts.inputsFile, opts.inputs)
await runClient.submitHumanInput(opts.appId, opts.formToken, action, inputs)
const format = opts.format ?? ''
const isText = TEXT_FORMATS.has(format)
const livePrint = opts.stream === true
const printFlags = new AppRunPrintFlags()
const adaptedRunClient = {
runStream: (_appId: string, _body: unknown, streamOpts?: { signal?: AbortSignal }) =>
runClient.reconnectStream(opts.appId, opts.workflowRunId, {
signal: streamOpts?.signal,
includeStateSnapshot: opts.withHistory === true,
}),
stopTask: (appId: string, taskId: string) => runClient.stopTask(appId, taskId),
submitHumanInput: runClient.submitHumanInput.bind(runClient),
reconnectStream: runClient.reconnectStream.bind(runClient),
}
const runCtx: RunContext = {
opts: {
appId: opts.appId,
inputs: inputs as Record<string, unknown>,
conversationId: undefined,
workflowId: undefined,
workspace: opts.workspace,
format,
stream: opts.stream,
},
deps,
mode,
format,
isText,
livePrint,
runClient: adaptedRunClient as unknown as AppRunClient,
printFlags,
exit,
}
await pickStrategy(isText, livePrint).execute(runCtx)
}

View File

@ -8,6 +8,7 @@ import { startMock } from '../../../../test/fixtures/dify-mock/server.js'
import { loadAppInfoCache } from '../../../cache/app-info.js'
import { createClient } from '../../../http/client.js'
import { bufferStreams } from '../../../io/streams.js'
import { resumeApp } from './resume/run.js'
import { runApp } from './run.js'
function bundle(): HostsBundle {
@ -104,7 +105,7 @@ describe('runApp', () => {
const io = bufferStreams()
const cache = await loadAppInfoCache({ configDir: dir })
await runApp(
{ appId: 'app-1', message: 'hi', stream: true, streamSetExplicitly: true },
{ appId: 'app-1', message: 'hi', stream: true },
{ bundle: bundle(), http: createClient({ host: mock.url, bearer: 'dfoa_test' }), host: mock.url, io, cache },
)
expect(io.outBuf()).toContain('echo: ')
@ -116,7 +117,7 @@ describe('runApp', () => {
const io = bufferStreams()
const cache = await loadAppInfoCache({ configDir: dir })
await runApp(
{ appId: 'app-1', message: 'hi', stream: true, streamSetExplicitly: true, format: 'json' },
{ appId: 'app-1', message: 'hi', stream: true, format: 'json' },
{ bundle: bundle(), http: createClient({ host: mock.url, bearer: 'dfoa_test' }), host: mock.url, io, cache },
)
const parsed = JSON.parse(io.outBuf()) as { mode: string, answer: string, conversation_id: string }
@ -125,7 +126,7 @@ describe('runApp', () => {
expect(parsed.conversation_id).toBe('conv-1')
})
it('agent-chat forces streaming without --stream', async () => {
it('agent-chat without --stream: collects and prints answer', async () => {
const io = bufferStreams()
const cache = await loadAppInfoCache({ configDir: dir })
await runApp(
@ -133,17 +134,16 @@ describe('runApp', () => {
{ bundle: bundle(), http: createClient({ host: mock.url, bearer: 'dfoa_test' }), host: mock.url, io, cache },
)
expect(io.outBuf()).toContain('do research')
expect(io.errBuf()).toContain('thought:')
expect(io.errBuf()).toContain('--conversation conv-1')
})
it('agent-chat with --stream=false explicitly: warns then streams', async () => {
it('agent-chat with --stream: live-prints answer and thoughts to stderr', async () => {
const io = bufferStreams()
const cache = await loadAppInfoCache({ configDir: dir })
await runApp(
{ appId: 'app-4', workspace: 'ws-2', message: 'go', stream: false, streamSetExplicitly: true },
{ appId: 'app-4', workspace: 'ws-2', message: 'go', stream: true },
{ bundle: bundle(), http: createClient({ host: mock.url, bearer: 'dfoa_test' }), host: mock.url, io, cache },
)
expect(io.errBuf()).toContain('agent apps require streaming')
expect(io.outBuf()).toContain('go')
expect(io.errBuf()).toContain('thought:')
})
@ -152,7 +152,7 @@ describe('runApp', () => {
const io = bufferStreams()
const cache = await loadAppInfoCache({ configDir: dir })
await runApp(
{ appId: 'app-2', inputs: { x: '1' }, stream: true, streamSetExplicitly: true, format: 'json' },
{ appId: 'app-2', inputs: { x: '1' }, stream: true, format: 'json' },
{ bundle: bundle(), http: createClient({ host: mock.url, bearer: 'dfoa_test' }), host: mock.url, io, cache },
)
const parsed = JSON.parse(io.outBuf()) as { mode: string, data: { status: string } }
@ -165,8 +165,118 @@ describe('runApp', () => {
const io = bufferStreams()
const cache = await loadAppInfoCache({ configDir: dir })
await expect(runApp(
{ appId: 'app-1', message: 'hi', stream: true, streamSetExplicitly: true },
{ appId: 'app-1', message: 'hi', stream: true },
{ bundle: bundle(), http: createClient({ host: mock.url, bearer: 'dfoa_test', retryAttempts: 0 }), host: mock.url, io, cache },
)).rejects.toMatchObject({ code: 'server_5xx' })
})
it('--inputs-file: reads inputs from file', async () => {
const io = bufferStreams()
const cache = await loadAppInfoCache({ configDir: dir })
const inputsFile = join(dir, 'inputs.json')
const { writeFile } = await import('node:fs/promises')
await writeFile(inputsFile, JSON.stringify({ x: 'from-file' }))
await runApp(
{ appId: 'app-2', inputsFile },
{ bundle: bundle(), http: createClient({ host: mock.url, bearer: 'dfoa_test' }), host: mock.url, io, cache },
)
const out = JSON.parse(io.outBuf().trim()) as { result: string }
expect(out.result).toBe('echo: ')
})
it('--inputs-file: rejects non-object JSON', async () => {
const io = bufferStreams()
const { writeFile } = await import('node:fs/promises')
const inputsFile = join(dir, 'bad.json')
await writeFile(inputsFile, JSON.stringify([1, 2, 3]))
await expect(runApp(
{ appId: 'app-2', inputsFile },
{ bundle: bundle(), http: createClient({ host: mock.url, bearer: 'dfoa_test' }), host: mock.url, io },
)).rejects.toThrow(/must be a JSON object/)
})
it('--inputs: accepts JSON object string', async () => {
const io = bufferStreams()
const cache = await loadAppInfoCache({ configDir: dir })
await runApp(
{ appId: 'app-2', inputsJson: '{"x":"hello"}' },
{ bundle: bundle(), http: createClient({ host: mock.url, bearer: 'dfoa_test' }), host: mock.url, io, cache },
)
const out = JSON.parse(io.outBuf().trim()) as { result: string }
expect(out.result).toBe('echo: ')
})
it('--inputs and --inputs-file are mutually exclusive', async () => {
const io = bufferStreams()
const { writeFile } = await import('node:fs/promises')
const inputsFile = join(dir, 'f.json')
await writeFile(inputsFile, '{}')
await expect(runApp(
{ appId: 'app-2', inputsJson: '{}', inputsFile },
{ bundle: bundle(), http: createClient({ host: mock.url, bearer: 'dfoa_test' }), host: mock.url, io },
)).rejects.toThrow(/mutually exclusive/)
})
it('hitl pause: writes pause JSON to stdout, hint to stderr, exits 2', async () => {
mock.setScenario('hitl-pause')
const io = bufferStreams()
const cache = await loadAppInfoCache({ configDir: dir })
let exitCode = -1
await expect(runApp(
{ appId: 'app-2', inputs: {} },
{
bundle: bundle(),
http: createClient({ host: mock.url, bearer: 'dfoa_test' }),
host: mock.url,
io,
cache,
exit: (code) => {
exitCode = code
throw new Error(`exit:${code}`)
},
},
)).rejects.toThrow('exit:2')
expect(exitCode).toBe(2)
const payload = JSON.parse(io.outBuf()) as { status: string, form_token: string, workflow_run_id: string }
expect(payload.status).toBe('paused')
expect(payload.form_token).toBe('ft-hitl-1')
expect(payload.workflow_run_id).toBe('wf-run-hitl-1')
expect(io.errBuf()).toContain('difyctl run app resume')
})
it('resume: withHistory: false completes successfully', async () => {
mock.setScenario('hitl-resume')
const io = bufferStreams()
const cache = await loadAppInfoCache({ configDir: dir })
await resumeApp(
{ appId: 'app-2', formToken: 'ft-hitl-1', workflowRunId: 'wf-run-hitl-1', action: 'submit', inputs: {}, withHistory: false },
{ bundle: bundle(), http: createClient({ host: mock.url, bearer: 'dfoa_test' }), host: mock.url, io, cache },
)
const out = JSON.parse(io.outBuf().trim()) as { result: string }
expect(out.result).toBe('echo: resumed')
})
it('resume: submits form and streams workflow to completion', async () => {
mock.setScenario('hitl-resume')
const io = bufferStreams()
const cache = await loadAppInfoCache({ configDir: dir })
await resumeApp(
{ appId: 'app-2', formToken: 'ft-hitl-1', workflowRunId: 'wf-run-hitl-1', action: 'submit', inputs: {} },
{ bundle: bundle(), http: createClient({ host: mock.url, bearer: 'dfoa_test' }), host: mock.url, io, cache },
)
const out = JSON.parse(io.outBuf().trim()) as { result: string }
expect(out.result).toBe('echo: resumed')
})
it('resume --stream: live-prints workflow node progress to stderr', async () => {
mock.setScenario('hitl-resume')
const io = bufferStreams()
const cache = await loadAppInfoCache({ configDir: dir })
await resumeApp(
{ appId: 'app-2', formToken: 'ft-hitl-1', workflowRunId: 'wf-run-hitl-1', action: 'submit', inputs: {}, stream: true },
{ bundle: bundle(), http: createClient({ host: mock.url, bearer: 'dfoa_test' }), host: mock.url, io, cache },
)
// stream mode for workflow: node_started → "→ <title>" on stderr
expect(io.errBuf()).toContain('After Resume')
})
})

View File

@ -16,12 +16,14 @@ import { AppRunPrintFlags } from './print-flags.js'
export type RunAppOptions = {
readonly appId: string
readonly message?: string
readonly inputs?: Readonly<Record<string, string>>
readonly inputs?: Readonly<Record<string, unknown>>
readonly inputsJson?: string
readonly inputsFile?: string
readonly conversationId?: string
readonly workflowId?: string
readonly workspace?: string
readonly format?: string
readonly stream?: boolean
readonly streamSetExplicitly?: boolean
}
export type RunAppDeps = {
@ -31,10 +33,46 @@ export type RunAppDeps = {
readonly io: IOStreams
readonly cache?: AppInfoCache
readonly envLookup?: (k: string) => string | undefined
readonly exit?: (code: number) => never
}
const TEXT_FORMATS = new Set(['', 'text'])
async function resolveInputs(
inputsJson: string | undefined,
inputsFile: string | undefined,
directInputs: Readonly<Record<string, unknown>> | undefined,
): Promise<Record<string, unknown>> {
if (inputsJson !== undefined && inputsFile !== undefined)
throw new BaseError({ code: ErrorCode.UsageInvalidFlag, message: '--inputs and --inputs-file are mutually exclusive' })
if (inputsJson !== undefined) {
let parsed: unknown
try {
parsed = JSON.parse(inputsJson)
}
catch {
throw new BaseError({ code: ErrorCode.UsageInvalidFlag, message: '--inputs must be valid JSON' })
}
if (parsed === null || typeof parsed !== 'object' || Array.isArray(parsed))
throw new BaseError({ code: ErrorCode.UsageInvalidFlag, message: '--inputs must be a JSON object' })
return parsed as Record<string, unknown>
}
if (inputsFile !== undefined) {
const { readFile } = await import('node:fs/promises')
let parsed: unknown
try {
parsed = JSON.parse(await readFile(inputsFile, 'utf8'))
}
catch {
throw new BaseError({ code: ErrorCode.UsageInvalidFlag, message: '--inputs-file must contain valid JSON' })
}
if (parsed === null || typeof parsed !== 'object' || Array.isArray(parsed))
throw new BaseError({ code: ErrorCode.UsageInvalidFlag, message: '--inputs-file must be a JSON object' })
return parsed as Record<string, unknown>
}
return { ...(directInputs ?? {}) }
}
export async function runApp(opts: RunAppOptions, deps: RunAppDeps): Promise<void> {
const env = deps.envLookup ?? ((k: string) => process.env[k])
const wsId = resolveWorkspaceId({ flag: opts.workspace, env: env('DIFY_WORKSPACE_ID'), bundle: deps.bundle })
@ -49,20 +87,18 @@ export async function runApp(opts: RunAppOptions, deps: RunAppDeps): Promise<voi
throw new BaseError({
code: ErrorCode.UsageInvalidFlag,
message: 'workflow apps do not accept a positional message',
hint: 'pass workflow inputs via --input key=value (repeatable)',
hint: 'pass workflow inputs via --inputs \'{"key":"value"}\'',
})
}
const isAgent = m.info?.is_agent === true || mode === RUN_MODES.AgentChat
const useStream = opts.stream === true || isAgent
if (isAgent && opts.streamSetExplicitly === true && opts.stream === false)
deps.io.err.write('note: agent apps require streaming; output is collected before printing\n')
const inputs = await resolveInputs(opts.inputsJson, opts.inputsFile, opts.inputs)
const format = opts.format ?? ''
const isText = TEXT_FORMATS.has(format)
const livePrint = opts.stream === true
const runClient = new AppRunClient(deps.http)
const printFlags = new AppRunPrintFlags()
const ctx = { opts, deps, mode, isAgent, format, isText, runClient, printFlags }
await pickStrategy(useStream, isText).execute(ctx)
const exit = deps.exit ?? ((code: number) => process.exit(code) as never)
const ctx = { opts: { ...opts, inputs }, deps, mode, format, isText, livePrint, runClient, printFlags, exit }
await pickStrategy(isText, livePrint).execute(ctx)
}

View File

@ -1,6 +1,6 @@
import type { SseEvent } from '../../../http/sse.js'
import { describe, expect, it } from 'vitest'
import { collect, collectorFor, decodeStreamError } from './sse-collector.js'
import { collect, collectorFor, decodeStreamError, HitlPauseError } from './sse-collector.js'
const enc = new TextEncoder()
function ev(name: string, data: object): SseEvent {
@ -120,3 +120,73 @@ describe('decodeStreamError', () => {
expect(err.message).toMatch(/error event/i)
})
})
describe('collect — human_input_required', () => {
it('throws HitlPauseError when human_input_required arrives', async () => {
const hitlData = {
task_id: 'task-1',
workflow_run_id: 'wf-run-1',
form_token: 'ft-1',
form_content: 'Please fill in',
inputs: [],
resolved_default_values: {},
user_actions: [{ id: 'submit', title: 'Submit' }],
expiration_time: 9999999999,
}
await expect(collect(iterOf(
ev('workflow_started', {}),
ev('human_input_required', hitlData),
), 'workflow')).rejects.toBeInstanceOf(HitlPauseError)
})
it('HitlPauseError carries the pause payload', async () => {
const hitlData = {
task_id: 'task-1',
workflow_run_id: 'wf-run-1',
form_token: 'ft-abc',
form_content: 'form',
inputs: [],
resolved_default_values: { name: 'Alice' },
user_actions: [],
expiration_time: 9999999999,
}
let caught: HitlPauseError | undefined
try {
await collect(iterOf(ev('human_input_required', hitlData)), 'workflow')
}
catch (e) {
if (e instanceof HitlPauseError)
caught = e
}
expect(caught).toBeDefined()
expect(caught!.pausePayload.form_token).toBe('ft-abc')
expect(caught!.pausePayload.resolved_default_values).toEqual({ name: 'Alice' })
})
})
describe('collect — silent events', () => {
it('silently ignores iteration_started and loop_started', async () => {
const got = await collect(iterOf(
ev('iteration_started', { id: 'iter-1' }),
ev('loop_started', { id: 'loop-1' }),
ev('node_started', {}),
ev('message', { answer: 'x' }),
), 'chat')
expect(got.answer).toBe('x')
})
it('silently ignores node_retry', async () => {
const got = await collect(iterOf(
ev('node_retry', { id: 'n1', retry: 1 }),
ev('message', { answer: 'ok' }),
), 'chat')
expect(got.answer).toBe('ok')
})
it('workflow_paused without prior HITL throws a plain error', async () => {
await expect(collect(iterOf(
ev('workflow_started', {}),
ev('workflow_paused', { reasons: [] }),
), 'workflow')).rejects.toThrow(/paused/)
})
})

View File

@ -4,6 +4,26 @@ import { newError } from '../../../errors/base.js'
import { ErrorCode } from '../../../errors/codes.js'
import { RUN_MODES } from './handlers.js'
export type HitlPausePayload = {
task_id: string
workflow_run_id: string
form_token: string
form_content: string
inputs: unknown[]
resolved_default_values: Record<string, string>
user_actions: unknown[]
expiration_time: number
}
export class HitlPauseError extends Error {
readonly pausePayload: HitlPausePayload
constructor(payload: HitlPausePayload) {
super('workflow paused for human input')
this.name = 'HitlPauseError'
this.pausePayload = payload
}
}
export type Collector = {
consume: (ev: SseEvent) => void
finalize: () => Record<string, unknown>
@ -149,16 +169,32 @@ export function decodeStreamError(data: Uint8Array): BaseError {
return err
}
const SILENT_EVENTS = new Set([
'node_retry',
'iteration_started',
'iteration_next',
'iteration_completed',
'loop_started',
'loop_next',
'loop_completed',
])
export async function collect(
iter: AsyncIterable<SseEvent>,
mode: string,
): Promise<Record<string, unknown>> {
const c = collectorFor(mode)
for await (const ev of iter) {
if (ev.name === 'ping')
if (ev.name === 'ping' || SILENT_EVENTS.has(ev.name))
continue
if (ev.name === 'error')
throw decodeStreamError(ev.data)
if (ev.name === 'human_input_required') {
throw new HitlPauseError(parseJson(ev.data) as unknown as HitlPausePayload)
}
if (ev.name === 'workflow_paused') {
throw newError(ErrorCode.Unknown, 'workflow paused (non-interactive pause; check server logs)')
}
c.consume(ev)
}
return c.finalize()

View File

@ -1,7 +1,8 @@
import type { SseEvent } from '../../../http/sse.js'
import { Buffer } from 'node:buffer'
import { PassThrough } from 'node:stream'
import { PassThrough, Writable } from 'node:stream'
import { describe, expect, it } from 'vitest'
import { HitlPauseError } from './sse-collector.js'
import { streamPrinterFor } from './stream-handlers.js'
const enc = new TextEncoder()
@ -78,3 +79,46 @@ describe('streamPrinterFor — unknown mode', () => {
expect(() => streamPrinterFor('whatever')).toThrow()
})
})
function capture(): { stream: Writable, buf: () => string } {
const chunks: Buffer[] = []
const stream = new Writable({
write(chunk, _enc, cb) {
chunks.push(Buffer.from(chunk as ArrayBuffer))
cb()
},
})
return { stream, buf: () => Buffer.concat(chunks).toString() }
}
describe('streamPrinterFor — HITL events', () => {
it('throws HitlPauseError on human_input_required', () => {
const sp = streamPrinterFor('workflow')
const { stream } = capture()
const hitl = {
task_id: 't-1',
workflow_run_id: 'wf-1',
form_token: 'ft-1',
form_content: 'fill',
inputs: [],
resolved_default_values: {},
user_actions: [],
expiration_time: 999,
}
expect(() => sp.onEvent(stream, stream, ev('human_input_required', hitl))).toThrow(HitlPauseError)
})
})
describe('streamPrinterFor — silent events', () => {
it('silently ignores iteration_started', () => {
const sp = streamPrinterFor('workflow')
const { stream } = capture()
expect(() => sp.onEvent(stream, stream, ev('iteration_started', { id: 'i-1' }))).not.toThrow()
})
it('silently ignores node_retry', () => {
const sp = streamPrinterFor('chat')
const { stream } = capture()
expect(() => sp.onEvent(stream, stream, ev('node_retry', { id: 'n1' }))).not.toThrow()
})
})

View File

@ -1,8 +1,10 @@
import type { SseEvent } from '../../../http/sse.js'
import type { StreamPrinter } from '../../../printers/stream-printer.js'
import type { HitlPausePayload } from './sse-collector.js'
import { newError } from '../../../errors/base.js'
import { ErrorCode } from '../../../errors/codes.js'
import { RUN_MODES } from './handlers.js'
import { HitlPauseError } from './sse-collector.js'
const dec = new TextDecoder()
@ -17,9 +19,30 @@ function parseJson(data: Uint8Array): Record<string, unknown> {
}
}
const SILENT_EVENTS = new Set([
'node_retry',
'iteration_started',
'iteration_next',
'iteration_completed',
'loop_started',
'loop_next',
'loop_completed',
])
function handleCommonEvents(ev: SseEvent): boolean {
if (SILENT_EVENTS.has(ev.name))
return true
if (ev.name === 'human_input_required') {
throw new HitlPauseError(parseJson(ev.data) as unknown as HitlPausePayload)
}
return false
}
class ChatStreamPrinter implements StreamPrinter {
private convoId = ''
onEvent(out: NodeJS.WritableStream, errOut: NodeJS.WritableStream, ev: SseEvent): void {
if (handleCommonEvents(ev))
return
const c = parseJson(ev.data)
switch (ev.name) {
case 'message':
@ -49,6 +72,8 @@ class ChatStreamPrinter implements StreamPrinter {
class CompletionStreamPrinter implements StreamPrinter {
onEvent(out: NodeJS.WritableStream, _errOut: NodeJS.WritableStream, ev: SseEvent): void {
if (handleCommonEvents(ev))
return
if (ev.name !== 'message')
return
const c = parseJson(ev.data)
@ -64,6 +89,8 @@ class CompletionStreamPrinter implements StreamPrinter {
class WorkflowStreamPrinter implements StreamPrinter {
private final: Record<string, unknown> | undefined
onEvent(_out: NodeJS.WritableStream, errOut: NodeJS.WritableStream, ev: SseEvent): void {
if (handleCommonEvents(ev))
return
const c = parseJson(ev.data)
switch (ev.name) {
case 'node_started': {

View File

@ -197,54 +197,12 @@ export type AppRunRequest = {
inputs: Record<string, any>
/** Query */
query?: string
/** Response Mode */
response_mode?: 'blocking' | 'streaming'
/** Workflow Id */
workflow_id?: string
/** Workspace Id */
workspace_id?: string
}
/** ChatMessageResponse */
export type ChatMessageResponse = {
/** Answer */
answer: string
/** Conversation Id */
conversation_id: string
/** Created At */
created_at: number
/** Event */
event: string
/** Id */
id: string
/** Message Id */
message_id: string
metadata?: MessageMetadata
/** Mode */
mode: string
/** Task Id */
task_id: string
}
/** CompletionMessageResponse */
export type CompletionMessageResponse = {
/** Answer */
answer: string
/** Created At */
created_at: number
/** Event */
event: string
/** Id */
id: string
/** Message Id */
message_id: string
metadata?: MessageMetadata
/** Mode */
mode: string
/** Task Id */
task_id: string
}
/** DeviceCodeRequest */
export type DeviceCodeRequest = {
/** Client Id */
@ -306,6 +264,16 @@ export type DevicePollRequest = {
device_code: string
}
/** HumanInputFormSubmitPayload */
export type HumanInputFormSubmitPayload = {
/** Action */
action: string
/** Inputs */
inputs: Record<string, JsonValue>
}
export type JsonValue = any
/** MessageMetadata */
export type MessageMetadata = {
/**
@ -440,20 +408,6 @@ export type WorkflowRunData = {
workflow_id: string
}
/** WorkflowRunResponse */
export type WorkflowRunResponse = {
data: WorkflowRunData
/**
* Mode
* @default "workflow"
*/
mode?: 'workflow'
/** Task Id */
task_id: string
/** Workflow Run Id */
workflow_run_id: string
}
/** WorkspaceDetailResponse */
export type WorkspaceDetailResponse = {
/** Created At */