Compare commits

..

5 Commits

Author SHA1 Message Date
aad3446c17 DynamicOutputs: allow selectors to reach nested DynamicCombo/DynamicSlot
Schema.validate() now walks DynamicCombo.Input / DynamicSlot.Input options
recursively when building the selector-target map, so DynamicOutputs.ByKey /
BySlot can address a dynamic input nested inside another dynamic option via
its dotted path (e.g. selector="outer.inner_slot"). The runtime dispatch /
type resolver already handled dotted-path nested inputs; this just removes
the schema-level rejection.

Amp-Thread-ID: https://ampcode.com/threads/T-019e8568-f382-743d-a97f-0de3ff29d501
Co-authored-by: Amp <amp@ampcode.com>
2026-06-02 08:13:16 -07:00
22d467dc84 DynamicOutputs: replace FromInput with BySlot; outputs always declared in Schema.outputs
ByKey already covered literal-driven dispatch (Combo/DynamicCombo/String);
add BySlot as the symmetric resolved-type-driven form (mirrors DynamicSlot).

Inputs no longer carry output declarations. DynamicCombo.Option / DynamicSlot.Option
go back to {key|when, inputs} only — outputs always live on the corresponding
DynamicOutputs entry in Schema.outputs. Validation enforces that ByKey option keys
align with the referenced DynamicCombo's keys and BySlot option 'when' types are a
subset of the referenced DynamicSlot's accepted types (including when=None).

Removes FromInput/_select_from_input_outputs/_from_input_as_dict and the
option-level output serialization helpers.

Amp-Thread-ID: https://ampcode.com/threads/T-019e8568-f382-743d-a97f-0de3ff29d501
Co-authored-by: Amp <amp@ampcode.com>
2026-06-01 22:39:16 -07:00
cca4119fdf DynamicOutputs: tighter integration with DynamicCombo / DynamicSlot via FromInput
Lets dynamic-input nodes co-declare per-option outputs so the option list is a
single source of truth for both inputs and outputs.

* DynamicCombo.Option / DynamicSlot.Option gain an optional outputs=[...] list.
* DynamicOutputs.FromInput(input_id) is a positional placeholder in outputs[]
  that resolves to the referenced input's active-option outputs at finalize
  time. DynamicCombo selects by literal value; DynamicSlot selects by the
  upstream slot's resolved type (or when=None when unlinked).
* get_finalized_class_outputs gains schema_inputs / live_input_types and the
  TypeResolver / execute() compute live_input_types only when at least one
  FromInput → DynamicSlot is present.
* Schema.validate(): FromInput must reference an existing DynamicCombo /
  DynamicSlot input, each input may be referenced at most once, and option
  output ids stay globally unique.
* V1 info synthesizes the dynamic_outputs entry as kind='by_key' for combos
  and kind='by_slot' for slots, inlining the option outputs.

Tests: combo + slot FromInput finalization, FromInput validation (missing /
duplicate / id collision), and TypeResolver end-to-end picks for both kinds.

Amp-Thread-ID: https://ampcode.com/threads/T-019e8568-f382-743d-a97f-0de3ff29d501
Co-authored-by: Amp <amp@ampcode.com>
2026-06-01 21:10:17 -07:00
188065c011 DynamicOutputs: fix block_execution sizing, harden validation, trim comments
Address code-review feedback:

* Fix pre-existing bug surfaced by this work: a V3 node returning a bare
  ExecutionBlocker (normalized to NodeOutput(block_execution=msg) with no
  positional/named result) was silently dropped in get_output_from_returns.
  block_execution now takes precedence and shapes the blocker tuple to the
  active output count first.
* Reject malformed linked-input source node ids (non-string) up front in
  validate_inputs as 'bad_linked_input' instead of letting them fall into
  the type resolver.
* Schema.validate(): reject duplicate DynamicOutputs group ids.
* Drop a couple of over-defensive / restating comments.

New tests: blocker sizing through both the NodeOutput and bare-blocker paths
for dynamic nodes, and duplicate-group-id schema rejection.

Amp-Thread-ID: https://ampcode.com/threads/T-019e8568-f382-743d-a97f-0de3ff29d501
Co-authored-by: Amp <amp@ampcode.com>
2026-06-01 20:48:08 -07:00
54f87a09a5 DynamicOutputs.ByKey: schema, type resolver, and execution wiring
Introduce a per-prompt finalized output list for V3 nodes whose active
output shape depends on a literal input. First slice:

* Schema:
  - io.DynamicOutputs.ByKey(id, selector, options=[io.DynamicOutputs.Option(...)])
  - Validates: selector input exists; output ids unique within and across
    static + dynamic branches; no overlap between branches.
  - get_v1_info() advertises dynamic groups separately via NodeInfoV1.dynamic_outputs;
    static RETURN_TYPES / OUTPUT_IS_LIST stay as the always-present prefix.
* NodeOutput:
  - New NodeOutput.from_named({id: value}) for dynamic nodes; engine reorders
    against the finalized active output list. Mixing positional+named raises.
* Execution:
  - execute() computes FinalizedOutputs once per attempt and threads it
    through initial / pending-async / pending-subgraph output paths.
  - merge_result_data / get_output_data / get_output_from_returns size
    blocker tuples and concat behaviour against the finalized list.
  - validate_inputs rejects positional links whose slot index is outside
    the upstream node's active output count as 'bad_linked_output'.
* TypeResolver:
  - resolve_output_type and is_output_list consult the finalized output
    list when present; finalized_output_count() exposes the active count
    for validation.

Covered by new unit tests under tests-unit/comfy_api_test and
tests-unit/execution_test.

Amp-Thread-ID: https://ampcode.com/threads/T-019e8568-f382-743d-a97f-0de3ff29d501
Co-authored-by: Amp <amp@ampcode.com>
2026-06-01 20:34:27 -07:00
5 changed files with 1549 additions and 64 deletions

View File

@ -1224,9 +1224,9 @@ class DynamicSlot(ComfyTypeI):
* a list of ComfyType classes (shared branch across multiple types)
* a ``MultiType.Input`` instance (parsed via its ``.io_types``)
"""
def __init__(self, when: Any, inputs: list[Input]):
def __init__(self, when: Any, inputs: list[Input] = None):
self.when = when
self.inputs = inputs
self.inputs = inputs or []
# ``_when_types`` is the ordered tuple of io_types (deterministic);
# ``_when_set`` is the same content as a set for fast matching.
self._when_types = self._normalize_when(when)
@ -1390,6 +1390,288 @@ class DynamicSlot(ComfyTypeI):
out_dict[input_type][finalized_id] = value
out_dict["dynamic_paths"][finalized_id] = finalize_prefix(curr_prefix, curr_prefix[-1])
@dataclass
class FinalizedOutputs:
"""Resolved set of active output slots for a single prompt execution.
Produced by :py:func:`get_finalized_class_outputs`; held alongside V1's
``RETURN_TYPES`` view so the execution engine can size results, validate
link indices, and reorder named ``NodeOutput`` results against it.
"""
outputs: list[Output]
output_ids: list[str]
return_types: list[str]
return_names: list[str]
output_is_list: list[bool]
output_tooltips: list[str | None]
def __len__(self) -> int:
return len(self.outputs)
class DynamicOutputs:
"""Container namespace for dynamic output group declarations.
Place an instance of one of the inner classes directly inside
``Schema.outputs`` to declare a set of outputs whose shape depends on
prompt data. The active branch is chosen at execution time by
:py:func:`get_finalized_class_outputs`.
Forms:
* :py:class:`DynamicOutputs.ByKey` — selector is a literal input
(``Combo`` / ``DynamicCombo`` / ``String`` / etc.) and the active
option is chosen by direct value comparison against ``Option.key``.
* :py:class:`DynamicOutputs.BySlot` — selector is a
:py:class:`DynamicSlot.Input` and the active option is chosen by
the slot's resolved upstream type, matching ``Option.when``.
Inactive options do not produce placeholder slots — downstream links to
nonexistent finalized slots are rejected by validation.
"""
class Option:
"""A branch of outputs for :py:class:`DynamicOutputs.ByKey`."""
def __init__(self, key: str, outputs: list[Output]):
if not isinstance(key, str) or not key:
raise ValueError("DynamicOutputs.Option: key must be a non-empty string")
_validate_outputs("DynamicOutputs.Option", outputs)
self.key = key
self.outputs = outputs
def as_dict(self):
return {"key": self.key, "outputs": _serialize_outputs(self.outputs)}
class SlotOption:
"""A branch of outputs for :py:class:`DynamicOutputs.BySlot`.
``when`` accepts the same shapes as :py:class:`DynamicSlot.Option.when`
(``None``, ``io.AnyType``, a single ComfyType class, a list of classes,
or a ``MultiType.Input``); validation requires it to be a subset of the
target ``DynamicSlot.Input``'s declared types.
"""
def __init__(self, when: Any, outputs: list[Output]):
_validate_outputs("DynamicOutputs.SlotOption", outputs)
self.when = when
self.outputs = outputs
self._when_types = DynamicSlot.Option._normalize_when(when)
self._when_set: frozenset[str] | None = (
None if self._when_types is None else frozenset(self._when_types)
)
def as_dict(self):
return {
"when": None if self._when_types is None else list(self._when_types),
"outputs": _serialize_outputs(self.outputs),
}
class ByKey:
"""Active outputs picked by the literal value of a same-node input."""
kind = "by_key"
def __init__(self, id: str, selector: str, options: list[DynamicOutputs.Option]):
if not isinstance(id, str) or not id:
raise ValueError("DynamicOutputs.ByKey: id must be a non-empty string")
if not isinstance(selector, str) or not selector:
raise ValueError("DynamicOutputs.ByKey: selector must be a non-empty string input id")
if not options:
raise ValueError("DynamicOutputs.ByKey: at least one Option is required")
seen_keys: set[str] = set()
seen_ids: set[str] = set()
for opt in options:
if not isinstance(opt, DynamicOutputs.Option):
raise ValueError(
f"DynamicOutputs.ByKey: options must be DynamicOutputs.Option, got {opt!r}"
)
if opt.key in seen_keys:
raise ValueError(f"DynamicOutputs.ByKey: duplicate option key {opt.key!r}")
seen_keys.add(opt.key)
for o in opt.outputs:
if o.id in seen_ids:
raise ValueError(
f"DynamicOutputs.ByKey: output id {o.id!r} appears in more than one option; "
"each output id must be unique within the group"
)
seen_ids.add(o.id)
self.id = id
self.selector = selector
self.options = options
def as_dict(self):
return {
"id": self.id,
"kind": self.kind,
"selector": self.selector,
"options": [opt.as_dict() for opt in self.options],
}
def select(self, prompt_inputs: dict[str, Any]) -> DynamicOutputs.Option | None:
"""Pick the matching ``Option`` for the prompt's selector value, or ``None``."""
value = prompt_inputs.get(self.selector)
# DynamicCombo wraps the literal in a nested dict {selector_id: {selector_id: key, ...}};
# unwrap so authors don't have to special-case it.
if isinstance(value, dict):
value = value.get(self.selector)
# Links are ``[node_id, slot_idx]`` lists; only literals finalize.
if isinstance(value, list):
return None
for opt in self.options:
if opt.key == value:
return opt
return None
class BySlot:
"""Active outputs picked by the resolved upstream type of a DynamicSlot input."""
kind = "by_slot"
def __init__(self, id: str, selector: str, options: list[DynamicOutputs.SlotOption]):
if not isinstance(id, str) or not id:
raise ValueError("DynamicOutputs.BySlot: id must be a non-empty string")
if not isinstance(selector, str) or not selector:
raise ValueError("DynamicOutputs.BySlot: selector must be a non-empty string input id")
if not options:
raise ValueError("DynamicOutputs.BySlot: at least one Option is required")
seen_types: set[str] = set()
seen_none = False
seen_ids: set[str] = set()
for opt in options:
if not isinstance(opt, DynamicOutputs.SlotOption):
raise ValueError(
f"DynamicOutputs.BySlot: options must be DynamicOutputs.SlotOption, got {opt!r}"
)
if opt._when_types is None:
if seen_none:
raise ValueError("DynamicOutputs.BySlot: only one option may declare when=None")
seen_none = True
else:
for t in opt._when_types:
if t in seen_types:
raise ValueError(
f"DynamicOutputs.BySlot: type {t!r} appears in more than one option; "
"each type must be claimed by exactly one option"
)
seen_types.add(t)
for o in opt.outputs:
if o.id in seen_ids:
raise ValueError(
f"DynamicOutputs.BySlot: output id {o.id!r} appears in more than one option; "
"each output id must be unique within the group"
)
seen_ids.add(o.id)
self.id = id
self.selector = selector
self.options = options
def as_dict(self):
return {
"id": self.id,
"kind": self.kind,
"selector": self.selector,
"options": [opt.as_dict() for opt in self.options],
}
def select(self, prompt_inputs: dict[str, Any],
live_input_types: dict[str, str] | None) -> DynamicOutputs.SlotOption | None:
"""Pick the matching ``SlotOption`` for the slot's resolved type."""
raw = prompt_inputs.get(self.selector)
has_link = isinstance(raw, list) and raw is not None
if not has_link:
for opt in self.options:
if opt._when_types is None:
return opt
return None
resolved = (live_input_types or {}).get(self.selector, "*")
resolved_set = {t.strip() for t in resolved.split(",")}
for opt in self.options:
if opt._when_types is None:
continue
if resolved_set & opt._when_set:
return opt
return None
def _validate_outputs(label: str, outputs: list[Output]) -> None:
"""Sanity-check a list of outputs at option construction."""
if not outputs:
return
for o in outputs:
if not isinstance(o, Output):
raise ValueError(f"{label}: outputs must contain Output instances, got {o!r}")
if o.id is None:
raise ValueError(f"{label}: every output must declare an id")
def _serialize_outputs(outputs: list[Output]) -> list[dict]:
"""Inline-serialize outputs for V1 ``dynamic_outputs`` entries."""
return [{"id": o.id, "type": o.get_io_type(), **o.as_dict()} for o in outputs]
def _output_metadata(o: Output) -> tuple[str, str, str, bool, str | None]:
"""Return (id, return_type, display_name, is_output_list, tooltip) for an Output."""
rt = o.get_io_type()
name = o.display_name if o.display_name else rt
return o.id, rt, name, o.is_output_list, (o.tooltip if o.tooltip else None)
def get_finalized_class_outputs(
schema_outputs: list,
prompt_inputs: dict[str, Any] | None,
live_input_types: dict[str, str] | None = None,
) -> FinalizedOutputs:
"""Resolve the active output list for a node.
Expands :py:class:`DynamicOutputs.ByKey` against ``prompt_inputs`` and
:py:class:`DynamicOutputs.BySlot` against ``live_input_types``. Inactive
options contribute no slots — downstream links to ranges that only existed
under a different branch are caught by validation rather than silently
filled with ``AnyType`` placeholders.
"""
inputs = prompt_inputs or {}
outputs: list[Output] = []
ids: list[str] = []
types: list[str] = []
names: list[str] = []
is_list: list[bool] = []
tooltips: list[str | None] = []
def append_output(o: Output):
oid, rt, name, isl, tt = _output_metadata(o)
outputs.append(o)
ids.append(oid)
types.append(rt)
names.append(name)
is_list.append(isl)
tooltips.append(tt)
for entry in schema_outputs or []:
if isinstance(entry, Output):
append_output(entry)
elif isinstance(entry, DynamicOutputs.ByKey):
selected = entry.select(inputs)
if selected is not None:
for o in selected.outputs:
append_output(o)
elif isinstance(entry, DynamicOutputs.BySlot):
selected = entry.select(inputs, live_input_types)
if selected is not None:
for o in selected.outputs:
append_output(o)
# else: ignore unknown entries (future-proofing for new dynamic kinds)
return FinalizedOutputs(
outputs=outputs,
output_ids=ids,
return_types=types,
return_names=names,
output_is_list=is_list,
output_tooltips=tooltips,
)
@comfytype(io_type="IMAGECOMPARE")
class ImageCompare(ComfyTypeI):
Type = dict
@ -1621,6 +1903,11 @@ class NodeInfoV1:
search_aliases: list[str]=None
essentials_category: str=None
has_intermediate_output: bool=None
dynamic_outputs: list[dict] | None = None
"""Templates for dynamic output groups (``DynamicOutputs.ByKey`` etc.). The active
output list depends on prompt data and is finalized per execution; static
``output`` / ``output_name`` / ``output_is_list`` arrays cover only always-present
outputs."""
@dataclass
@ -1757,29 +2044,120 @@ class Schema:
def validate(self):
'''Validate the schema:
- verify ids on inputs and outputs are unique - both internally and in relation to each other
- input/output ids are unique within and across each other
- DynamicOutputs.ByKey / BySlot have unique group ids
- ByKey selectors reference a real input (literal alignment validated when
the selector is a DynamicCombo)
- BySlot selectors reference a real DynamicSlot input and every option's
``when`` is accepted by that slot
'''
nested_inputs: list[Input] = []
for input in self.inputs:
if not isinstance(input, DynamicInput):
nested_inputs.extend(input.get_all())
input_ids = [i.id for i in nested_inputs]
output_ids = [o.id for o in self.outputs]
input_set = set(input_ids)
# Walk DynamicCombo / DynamicSlot options so a DynamicOutputs selector can
# target a nested dynamic input by dotted path (e.g. "outer.inner_slot").
# Other DynamicInput kinds (e.g. Autogrow) are skipped — their nested ids
# only exist at prompt time and aren't statically addressable.
selector_targets: dict[str, Input] = {}
def _walk(inputs: list[Input], prefix: str | None) -> None:
for inp in inputs:
dotted = inp.id if prefix is None else f"{prefix}.{inp.id}"
selector_targets[dotted] = inp
if isinstance(inp, (DynamicCombo.Input, DynamicSlot.Input)):
for opt in inp.options:
_walk(opt.inputs, dotted)
_walk(self.inputs, None)
combo_inputs = {sid: inp for sid, inp in selector_targets.items() if isinstance(inp, DynamicCombo.Input)}
slot_inputs = {sid: inp for sid, inp in selector_targets.items() if isinstance(inp, DynamicSlot.Input)}
all_selectable_ids = input_set | set(selector_targets)
# ``output_ids`` covers every id that may ever appear in a finalized
# output list — static outputs + every option's outputs across every
# dynamic group — so collisions between branches are caught up front.
output_ids: list[str] = []
for o in self.outputs:
if isinstance(o, Output):
output_ids.append(o.id)
elif isinstance(o, (DynamicOutputs.ByKey, DynamicOutputs.BySlot)):
for opt in o.options:
output_ids.extend(child.id for child in opt.outputs)
output_set = set(output_ids)
issues: list[str] = []
# verify ids are unique per list
if len(input_set) != len(input_ids):
issues.append(f"Input ids must be unique, but {[item for item, count in Counter(input_ids).items() if count > 1]} are not.")
if len(output_set) != len(output_ids):
issues.append(f"Output ids must be unique, but {[item for item, count in Counter(output_ids).items() if count > 1]} are not.")
group_ids: list[str] = []
for o in self.outputs:
if isinstance(o, DynamicOutputs.ByKey):
group_ids.append(o.id)
if o.selector not in all_selectable_ids:
issues.append(
f"DynamicOutputs.ByKey(id={o.id!r}) selector input {o.selector!r} "
f"does not exist on the schema."
)
# If the selector is a DynamicCombo, every option key must match one of its keys.
if o.selector in combo_inputs:
valid_keys = {opt.key for opt in combo_inputs[o.selector].options}
stray = [opt.key for opt in o.options if opt.key not in valid_keys]
if stray:
issues.append(
f"DynamicOutputs.ByKey(id={o.id!r}) option key(s) {stray!r} are not "
f"declared on DynamicCombo input {o.selector!r} (valid: {sorted(valid_keys)!r})."
)
elif isinstance(o, DynamicOutputs.BySlot):
group_ids.append(o.id)
if o.selector not in slot_inputs:
issues.append(
f"DynamicOutputs.BySlot(id={o.id!r}) selector {o.selector!r} must reference "
f"a DynamicSlot input on the schema."
)
else:
slot = slot_inputs[o.selector]
slot_accepted = set()
slot_has_none = False
for sopt in slot.options:
if sopt._when_types is None:
slot_has_none = True
else:
slot_accepted.update(sopt._when_types)
for opt in o.options:
if opt._when_types is None:
if not slot_has_none:
issues.append(
f"DynamicOutputs.BySlot(id={o.id!r}) option(when=None) requires "
f"DynamicSlot {o.selector!r} to declare a when=None option."
)
else:
stray_types = [t for t in opt._when_types if t not in slot_accepted]
if stray_types:
issues.append(
f"DynamicOutputs.BySlot(id={o.id!r}) option type(s) {stray_types!r} "
f"are not accepted by DynamicSlot {o.selector!r} "
f"(accepted: {sorted(slot_accepted)!r})."
)
if len(set(group_ids)) != len(group_ids):
issues.append(
f"DynamicOutputs group ids must be unique, but "
f"{[i for i, c in Counter(group_ids).items() if c > 1]} are not."
)
if len(issues) > 0:
raise ValueError("\n".join(issues))
# validate inputs and outputs
# per-element validation
for input in self.inputs:
input.validate()
for output in self.outputs:
output.validate()
if isinstance(output, Output):
output.validate()
elif isinstance(output, (DynamicOutputs.ByKey, DynamicOutputs.BySlot)):
for opt in output.options:
for child in opt.outputs:
child.validate()
if self.price_badge is not None:
self.price_badge.validate()
@ -1804,9 +2182,10 @@ class Schema:
self.hidden.append(Hidden.prompt)
if Hidden.extra_pnginfo not in self.hidden:
self.hidden.append(Hidden.extra_pnginfo)
# give outputs without ids default ids
# give outputs without ids default ids (dynamic groups require explicit ids
# so we can never accidentally collide synthesized names across branches).
for i, output in enumerate(self.outputs):
if output.id is None:
if isinstance(output, Output) and output.id is None:
output.id = f"_{i}_{output.io_type}_"
def get_v1_info(self, cls) -> NodeInfoV1:
@ -1815,15 +2194,21 @@ class Schema:
if self.hidden:
for hidden in self.hidden:
input.setdefault("hidden", {})[hidden.name] = (hidden.value,)
# create separate lists from output fields
# create separate lists from output fields (static outputs only — dynamic
# groups are advertised separately via ``dynamic_outputs`` so the frontend
# and any V1 consumer see a stable always-present prefix).
output = []
output_is_list = []
output_name = []
output_tooltips = []
output_matchtypes = []
any_matchtypes = False
dynamic_outputs: list[dict[str, Any]] = []
if self.outputs:
for o in self.outputs:
if isinstance(o, (DynamicOutputs.ByKey, DynamicOutputs.BySlot)):
dynamic_outputs.append(o.as_dict())
continue
output.append(o.io_type)
output_is_list.append(o.is_output_list)
output_name.append(o.display_name if o.display_name else o.io_type)
@ -1862,6 +2247,7 @@ class Schema:
price_badge=self.price_badge.as_dict(self.inputs) if self.price_badge is not None else None,
search_aliases=self.search_aliases if self.search_aliases else None,
essentials_category=self.essentials_category,
dynamic_outputs=dynamic_outputs or None,
)
return info
@ -2260,12 +2646,17 @@ class _ComfyNodeBaseInternal(_ComfyNodeInternal):
cls._ACCEPT_ALL_INPUTS = schema.accept_all_inputs
if cls._RETURN_TYPES is None:
# Class-level RETURN_TYPES / RETURN_NAMES / OUTPUT_IS_LIST cover the
# always-present static outputs only; dynamic groups are finalized
# per-prompt in get_finalized_class_outputs.
output = []
output_name = []
output_is_list = []
output_tooltips = []
if schema.outputs:
for o in schema.outputs:
if not isinstance(o, Output):
continue
output.append(o.io_type)
output_name.append(o.display_name if o.display_name else o.io_type)
output_is_list.append(o.is_output_list)
@ -2332,17 +2723,32 @@ class ComfyNode(_ComfyNodeBaseInternal):
class NodeOutput(_NodeOutputInternal):
'''
Standardized output of a node; can pass in any number of args and/or a UIOutput into 'ui' kwarg.
For nodes whose active output list depends on prompt data (e.g. those using
:py:class:`DynamicOutputs`), pass ``named={output_id: value, ...}`` instead
of positional args. The execution engine reorders against the finalized
output list at run time; unknown or missing ids raise.
'''
def __init__(self, *args: Any, ui: _UIOutput | dict=None, expand: dict=None, block_execution: str=None):
def __init__(self, *args: Any, named: dict[str, Any]=None, ui: _UIOutput | dict=None, expand: dict=None, block_execution: str=None):
if args and named is not None:
raise ValueError("NodeOutput: cannot mix positional args with named=...; choose one form")
self.args = args
self.named = named
self.ui = ui
self.expand = expand
self.block_execution = block_execution
@property
def result(self):
# Positional tuple only; named results live in ``self.named`` and are
# ordered against the finalized output list by the execution engine.
return self.args if len(self.args) > 0 else None
@classmethod
def from_named(cls, named: dict[str, Any], *, ui: _UIOutput | dict=None, expand: dict=None, block_execution: str=None) -> NodeOutput:
"""Build a NodeOutput keyed by output id, for dynamic-output nodes."""
return cls(named=named, ui=ui, expand=expand, block_execution=block_execution)
@classmethod
def from_dict(cls, data: dict[str, Any]) -> NodeOutput:
args = ()

View File

@ -142,32 +142,48 @@ class TypeResolver:
return ANY_TYPE
class_type = node.get("class_type")
try:
return_types = class_def.RETURN_TYPES
except Exception:
return ANY_TYPE
if return_types is None or slot_idx < 0 or slot_idx >= len(return_types):
return ANY_TYPE
# V3 schemas may declare DynamicOutputs groups whose active slots are
# determined by prompt inputs and do not appear in class RETURN_TYPES;
# resolve types against the finalized output list when present.
finalized = self._get_finalized_outputs(node_id, node, class_def)
if finalized is not None:
if slot_idx < 0 or slot_idx >= len(finalized):
return ANY_TYPE
declared = finalized.return_types[slot_idx]
resolved_output = finalized.outputs[slot_idx]
resolved = declared
if isinstance(resolved_output, io.MatchType.Output):
schema = getattr(class_def, "SCHEMA", None) or class_def.GET_SCHEMA()
resolved = self._resolve_match_template(
node_id, schema, resolved_output.template.template_id, next_stack
)
else:
try:
return_types = class_def.RETURN_TYPES
except Exception:
return ANY_TYPE
if return_types is None or slot_idx < 0 or slot_idx >= len(return_types):
return ANY_TYPE
declared = return_types[slot_idx]
declared = return_types[slot_idx]
# Only V3 schemas carry MatchType template info; V1 RETURN_TYPES are
# always concrete strings.
resolved = declared
if isinstance(class_def, type) and issubclass(class_def, _ComfyNodeInternal):
schema = getattr(class_def, "SCHEMA", None)
if schema is None:
# RETURN_TYPES access above usually populates SCHEMA — be defensive.
try:
schema = class_def.GET_SCHEMA()
except Exception:
schema = None
if schema is not None and slot_idx < len(schema.outputs):
out = schema.outputs[slot_idx]
if isinstance(out, io.MatchType.Output):
resolved = self._resolve_match_template(
node_id, schema, out.template.template_id, next_stack
)
# Only V3 schemas carry MatchType template info; V1 RETURN_TYPES are
# always concrete strings.
resolved = declared
if isinstance(class_def, type) and issubclass(class_def, _ComfyNodeInternal):
schema = getattr(class_def, "SCHEMA", None)
if schema is None:
# RETURN_TYPES access above usually populates SCHEMA — be defensive.
try:
schema = class_def.GET_SCHEMA()
except Exception:
schema = None
if schema is not None and slot_idx < len(schema.outputs):
out = schema.outputs[slot_idx]
if isinstance(out, io.MatchType.Output):
resolved = self._resolve_match_template(
node_id, schema, out.template.template_id, next_stack
)
# Warn only for V1 wildcards declared as "*"; unresolved MatchType
# templates warn separately in _resolve_match_template, avoiding double-warns.
@ -216,6 +232,52 @@ class TypeResolver:
f"MatchType template '{template_id}' has no bound concrete upstream input; defaulting to AnyType")
return ANY_TYPE
def _get_finalized_outputs(self, node_id: str, node: dict | None, class_def) -> io.FinalizedOutputs | None:
"""Return ``FinalizedOutputs`` for V3 nodes with DynamicOutputs groups, else ``None``.
``BySlot`` groups need ``live_input_types`` (computed lazily from the
resolver itself) so the active option can be picked by resolved type.
"""
if not (isinstance(class_def, type) and issubclass(class_def, _ComfyNodeInternal)):
return None
try:
schema = class_def.GET_SCHEMA()
except Exception:
return None
has_dynamic = any(
isinstance(o, (io.DynamicOutputs.ByKey, io.DynamicOutputs.BySlot))
for o in schema.outputs
)
if not has_dynamic:
return None
prompt_inputs = (node or {}).get("inputs", {}) or {}
# live_input_types is only needed for BySlot — skip the resolver pass
# otherwise to keep the hot path cheap.
needs_live_types = any(isinstance(o, io.DynamicOutputs.BySlot) for o in schema.outputs)
live_input_types = self.compute_live_input_types(node_id) if needs_live_types else None
return io.get_finalized_class_outputs(
schema.outputs, prompt_inputs, live_input_types=live_input_types,
)
def finalized_output_count(self, node_id: str) -> int:
"""Number of active output slots on ``node_id``'s schema for the current prompt.
For V3 nodes with :py:class:`comfy_api.latest._io.DynamicOutputs` groups
the count is computed against the node's prompt inputs; for static V3
/ V1 nodes it falls back to ``len(RETURN_TYPES)``. Unknown nodes
report ``0``.
"""
node, class_def = self._get_class_def_for_node(node_id)
if class_def is None:
return 0
finalized = self._get_finalized_outputs(node_id, node, class_def)
if finalized is not None:
return len(finalized)
try:
return len(class_def.RETURN_TYPES)
except Exception:
return 0
def is_output_list(self, node_id: str, slot_idx: int) -> bool:
"""Whether the source slot is declared as a list output (``OUTPUT_IS_LIST[idx]``)."""
if isinstance(slot_idx, bool) or not isinstance(slot_idx, int):
@ -224,11 +286,16 @@ class TypeResolver:
if cache_key in self._is_output_list_cache:
return self._is_output_list_cache[cache_key]
result = False
_, class_def = self._get_class_def_for_node(node_id)
node, class_def = self._get_class_def_for_node(node_id)
if class_def is not None:
lst = getattr(class_def, "OUTPUT_IS_LIST", None)
if lst is not None and 0 <= slot_idx < len(lst):
result = bool(lst[slot_idx])
finalized = self._get_finalized_outputs(node_id, node, class_def)
if finalized is not None:
if 0 <= slot_idx < len(finalized):
result = bool(finalized.output_is_list[slot_idx])
else:
lst = getattr(class_def, "OUTPUT_IS_LIST", None)
if lst is not None and 0 <= slot_idx < len(lst):
result = bool(lst[slot_idx])
self._is_output_list_cache[cache_key] = result
return result

View File

@ -314,15 +314,56 @@ async def _async_map_node_over_list(prompt_id, unique_id, obj, input_data_all, f
return results
def merge_result_data(results, obj):
def _expected_output_count(obj, finalized_outputs=None):
"""Size results / blocker tuples by the finalized output list when present."""
if finalized_outputs is not None:
return len(finalized_outputs)
return len(getattr(obj, "RETURN_TYPES", ()))
def _normalize_named_result(node_output, finalized_outputs):
"""Convert a ``NodeOutput.from_named({...})`` payload into an ordered tuple.
Strict by design: an active output id missing from the payload is an
error; an unknown id is an error. Use the finalized output list as the
single source of truth for ordering.
"""
if finalized_outputs is None:
raise Exception(
"NodeOutput(named=...) is only supported for V3 nodes with a finalized "
"output schema (e.g. nodes using DynamicOutputs)."
)
expected_ids = finalized_outputs.output_ids
payload = node_output.named
missing = [oid for oid in expected_ids if oid not in payload]
unknown = [oid for oid in payload if oid not in expected_ids]
if missing or unknown:
raise Exception(
f"NodeOutput(named=...) ids do not match active outputs: "
f"missing={missing}, unknown={unknown}, expected={expected_ids}"
)
return tuple(payload[oid] for oid in expected_ids)
def merge_result_data(results, obj, finalized_outputs=None):
# check which outputs need concatenating
output = []
output_is_list = [False] * len(results[0])
if hasattr(obj, "OUTPUT_IS_LIST"):
output_is_list = obj.OUTPUT_IS_LIST
expected_count = _expected_output_count(obj, finalized_outputs)
if finalized_outputs is not None:
output_is_list = finalized_outputs.output_is_list
else:
output_is_list = [False] * len(results[0])
if hasattr(obj, "OUTPUT_IS_LIST"):
output_is_list = obj.OUTPUT_IS_LIST
for r in results:
if len(r) != expected_count:
raise Exception(
f"Node returned {len(r)} outputs but active schema has {expected_count}"
)
# merge node execution results
for i, is_list in zip(range(len(results[0])), output_is_list):
for i, is_list in zip(range(expected_count), output_is_list):
if is_list:
value = []
for o in results:
@ -335,19 +376,20 @@ def merge_result_data(results, obj):
output.append([o[i] for o in results])
return output
async def get_output_data(prompt_id, unique_id, obj, input_data_all, execution_block_cb=None, pre_execute_cb=None, v3_data=None):
async def get_output_data(prompt_id, unique_id, obj, input_data_all, execution_block_cb=None, pre_execute_cb=None, v3_data=None, finalized_outputs=None):
return_values = await _async_map_node_over_list(prompt_id, unique_id, obj, input_data_all, obj.FUNCTION, allow_interrupt=True, execution_block_cb=execution_block_cb, pre_execute_cb=pre_execute_cb, v3_data=v3_data)
has_pending_task = any(isinstance(r, asyncio.Task) and not r.done() for r in return_values)
if has_pending_task:
return return_values, {}, False, has_pending_task
output, ui, has_subgraph = get_output_from_returns(return_values, obj)
output, ui, has_subgraph = get_output_from_returns(return_values, obj, finalized_outputs=finalized_outputs)
return output, ui, has_subgraph, False
def get_output_from_returns(return_values, obj):
def get_output_from_returns(return_values, obj, finalized_outputs=None):
results = []
uis = []
subgraph_results = []
has_subgraph = False
expected_count = _expected_output_count(obj, finalized_outputs)
for i in range(len(return_values)):
r = return_values[i]
if isinstance(r, dict):
@ -359,12 +401,12 @@ def get_output_from_returns(return_values, obj):
new_graph = r['expand']
result = r.get("result", None)
if isinstance(result, ExecutionBlocker):
result = tuple([result] * len(obj.RETURN_TYPES))
result = tuple([result] * expected_count)
subgraph_results.append((new_graph, result))
elif 'result' in r:
result = r.get("result", None)
if isinstance(result, ExecutionBlocker):
result = tuple([result] * len(obj.RETURN_TYPES))
result = tuple([result] * expected_count)
results.append(result)
subgraph_results.append((None, result))
elif isinstance(r, _NodeOutputInternal):
@ -374,29 +416,42 @@ def get_output_from_returns(return_values, obj):
uis.append(r.ui)
else:
uis.append(r.ui.as_dict())
if r.expand is not None:
# Named NodeOutput → reorder against the finalized output list before
# downstream code treats this as a fixed-shape tuple.
named_result = (
_normalize_named_result(r, finalized_outputs)
if r.named is not None
else None
)
# block_execution takes precedence: EXECUTE_NORMALIZED converts a bare
# ExecutionBlocker return into NodeOutput(block_execution=msg) with no
# positional / named result, so we must shape the tuple ourselves.
if r.block_execution is not None:
result = tuple([ExecutionBlocker(r.block_execution)] * expected_count)
if r.expand is not None:
has_subgraph = True
subgraph_results.append((r.expand, result))
else:
results.append(result)
subgraph_results.append((None, result))
elif r.expand is not None:
has_subgraph = True
new_graph = r.expand
result = r.result
if r.block_execution is not None:
result = tuple([ExecutionBlocker(r.block_execution)] * len(obj.RETURN_TYPES))
subgraph_results.append((new_graph, result))
elif r.result is not None:
result = r.result
if r.block_execution is not None:
result = tuple([ExecutionBlocker(r.block_execution)] * len(obj.RETURN_TYPES))
result = named_result if named_result is not None else r.result
subgraph_results.append((r.expand, result))
elif named_result is not None or r.result is not None:
result = named_result if named_result is not None else r.result
results.append(result)
subgraph_results.append((None, result))
else:
if isinstance(r, ExecutionBlocker):
r = tuple([r] * len(obj.RETURN_TYPES))
r = tuple([r] * expected_count)
results.append(r)
subgraph_results.append((None, r))
if has_subgraph:
output = subgraph_results
elif len(results) > 0:
output = merge_result_data(results, obj)
output = merge_result_data(results, obj, finalized_outputs=finalized_outputs)
else:
output = []
ui = dict()
@ -444,6 +499,24 @@ async def execute(server, dynprompt, caches, current_item, extra_data, executed,
execution_list.cache_update(unique_id, cached)
return (ExecutionResult.SUCCESS, None, None)
# Finalize active outputs once so initial / async-resume / subgraph-resume paths agree.
finalized_outputs = None
if issubclass(class_def, _ComfyNodeInternal):
schema = class_def.GET_SCHEMA()
has_dynamic = any(
isinstance(o, (_io.DynamicOutputs.ByKey, _io.DynamicOutputs.BySlot))
for o in schema.outputs
)
if has_dynamic:
# live_input_types is only needed for BySlot finalization.
needs_live_types = any(isinstance(o, _io.DynamicOutputs.BySlot) for o in schema.outputs)
live_input_types = None
if needs_live_types and hasattr(dynprompt, "get_type_resolver"):
live_input_types = dynprompt.get_type_resolver().compute_live_input_types(unique_id)
finalized_outputs = _io.get_finalized_class_outputs(
schema.outputs, inputs, live_input_types=live_input_types,
)
input_data_all = None
try:
if unique_id in pending_async_nodes:
@ -459,7 +532,7 @@ async def execute(server, dynprompt, caches, current_item, extra_data, executed,
else:
results.append(r)
del pending_async_nodes[unique_id]
output_data, output_ui, has_subgraph = get_output_from_returns(results, class_def)
output_data, output_ui, has_subgraph = get_output_from_returns(results, class_def, finalized_outputs=finalized_outputs)
elif unique_id in pending_subgraph_results:
cached_results = pending_subgraph_results[unique_id]
resolved_outputs = []
@ -478,7 +551,7 @@ async def execute(server, dynprompt, caches, current_item, extra_data, executed,
else:
resolved_output.append(r)
resolved_outputs.append(tuple(resolved_output))
output_data = merge_result_data(resolved_outputs, class_def)
output_data = merge_result_data(resolved_outputs, class_def, finalized_outputs=finalized_outputs)
output_ui = []
del pending_subgraph_results[unique_id]
has_subgraph = False
@ -536,7 +609,7 @@ async def execute(server, dynprompt, caches, current_item, extra_data, executed,
GraphBuilder.set_default_prefix(unique_id, call_index, 0)
try:
output_data, output_ui, has_subgraph, has_pending_tasks = await get_output_data(prompt_id, unique_id, obj, input_data_all, execution_block_cb=execution_block_cb, pre_execute_cb=pre_execute_cb, v3_data=v3_data)
output_data, output_ui, has_subgraph, has_pending_tasks = await get_output_data(prompt_id, unique_id, obj, input_data_all, execution_block_cb=execution_block_cb, pre_execute_cb=pre_execute_cb, v3_data=v3_data, finalized_outputs=finalized_outputs)
finally:
if comfy.memory_management.aimdo_enabled:
if args.verbose == "DEBUG":
@ -921,6 +994,32 @@ async def validate_inputs(prompt_id, prompt, item, validated, visiting=None, typ
continue
o_id = val[0]
if not isinstance(o_id, str):
errors.append({
"type": "bad_linked_input",
"message": "Bad linked input, source node id must be a string",
"details": f"{x}, linked_node({val})",
"extra_info": {"input_name": x, "linked_node": val}
})
continue
# Reject links pointing at slot indices outside the upstream node's
# active output count (e.g. stale link after a DynamicOutputs branch
# change).
upstream_output_count = type_resolver.finalized_output_count(o_id)
if not isinstance(val[1], int) or isinstance(val[1], bool) or val[1] < 0 or val[1] >= upstream_output_count:
error = {
"type": "bad_linked_output",
"message": "Linked output slot does not exist on the source node",
"details": f"{x}, linked_node({o_id}), output_index({val[1]}), active_output_count({upstream_output_count})",
"extra_info": {
"input_name": x,
"linked_node": val,
"output_index": val[1],
"active_output_count": upstream_output_count,
}
}
errors.append(error)
continue
# Walks MatchType/template chains so API workflows without
# frontend-injected type metadata get the same answer as the UI.
received_type = type_resolver.resolve_output_type(o_id, val[1])

View File

@ -0,0 +1,537 @@
"""Unit tests for ``DynamicOutputs.ByKey`` and the finalized-outputs path."""
import pytest
from comfy_api.latest import _io as io
# ---------------------------------------------------------------------------
# Schema-level construction and validation
# ---------------------------------------------------------------------------
def _byke():
return io.DynamicOutputs.ByKey(
id="result",
selector="mode",
options=[
io.DynamicOutputs.Option(key="image",
outputs=[io.Image.Output("image"), io.Mask.Output("mask")]),
io.DynamicOutputs.Option(key="latent",
outputs=[io.Latent.Output("latent")]),
],
)
def test_option_rejects_empty_key():
with pytest.raises(ValueError, match="non-empty string"):
io.DynamicOutputs.Option(key="", outputs=[])
def test_option_rejects_non_output_entry():
with pytest.raises(ValueError, match="Output instances"):
io.DynamicOutputs.Option(key="x", outputs=["not an output"])
def test_option_requires_explicit_output_ids():
with pytest.raises(ValueError, match="declare an id"):
io.DynamicOutputs.Option(key="x", outputs=[io.Image.Output()]) # no id
def test_bykey_rejects_empty_options():
with pytest.raises(ValueError, match="at least one Option"):
io.DynamicOutputs.ByKey(id="r", selector="m", options=[])
def test_bykey_rejects_duplicate_keys():
with pytest.raises(ValueError, match="duplicate option key"):
io.DynamicOutputs.ByKey(
id="r", selector="m",
options=[
io.DynamicOutputs.Option(key="x", outputs=[io.Image.Output("a")]),
io.DynamicOutputs.Option(key="x", outputs=[io.Latent.Output("b")]),
],
)
def test_bykey_rejects_duplicate_output_ids_across_options():
with pytest.raises(ValueError, match="appears in more than one option"):
io.DynamicOutputs.ByKey(
id="r", selector="m",
options=[
io.DynamicOutputs.Option(key="x", outputs=[io.Image.Output("dup")]),
io.DynamicOutputs.Option(key="y", outputs=[io.Latent.Output("dup")]),
],
)
# ---------------------------------------------------------------------------
# Schema integration
# ---------------------------------------------------------------------------
def _make_node(extra_outputs=None):
"""Build a V3 node class with a selector input + DynamicOutputs group."""
extras = extra_outputs or []
class DynNode(io.ComfyNode):
@classmethod
def define_schema(cls):
return io.Schema(
node_id="DynNode",
inputs=[io.Combo.Input("mode", options=["image", "latent"], default="image")],
outputs=[*extras, _byke()],
)
@classmethod
def execute(cls, **kwargs):
return io.NodeOutput.from_named({"image": None, "mask": None})
return DynNode
def test_schema_validate_rejects_unknown_selector():
class BadSelector(io.ComfyNode):
@classmethod
def define_schema(cls):
return io.Schema(
node_id="BadSelector",
inputs=[io.Combo.Input("not_mode", options=["a"])],
outputs=[
io.DynamicOutputs.ByKey(
id="r", selector="mode",
options=[io.DynamicOutputs.Option(key="a", outputs=[io.Image.Output("a")])],
),
],
)
@classmethod
def execute(cls, **kwargs):
return io.NodeOutput.from_named({"a": None})
with pytest.raises(ValueError, match="selector input 'mode' does not exist"):
BadSelector.GET_SCHEMA()
def test_schema_validate_rejects_id_collision_with_static_output():
class Collision(io.ComfyNode):
@classmethod
def define_schema(cls):
return io.Schema(
node_id="Collision",
inputs=[io.Combo.Input("mode", options=["a"])],
outputs=[
io.Image.Output("shared"),
io.DynamicOutputs.ByKey(
id="r", selector="mode",
options=[io.DynamicOutputs.Option(key="a", outputs=[io.Latent.Output("shared")])],
),
],
)
@classmethod
def execute(cls, **kwargs):
return io.NodeOutput.from_named({"shared": None})
with pytest.raises(ValueError, match="Output ids must be unique"):
Collision.GET_SCHEMA()
def test_schema_get_v1_info_emits_dynamic_outputs_field():
DynNode = _make_node()
DynNode.GET_SCHEMA()
info = DynNode.SCHEMA.get_v1_info(DynNode)
assert info.dynamic_outputs is not None and len(info.dynamic_outputs) == 1
group = info.dynamic_outputs[0]
assert group["kind"] == "by_key"
assert group["selector"] == "mode"
assert {opt["key"] for opt in group["options"]} == {"image", "latent"}
# Static output arrays are empty — only the dynamic group is declared.
assert info.output == []
assert info.output_is_list == []
def test_schema_static_outputs_stable_prefix_in_v1_arrays():
"""A static output before a dynamic group still surfaces in RETURN_TYPES etc."""
DynNode = _make_node(extra_outputs=[io.String.Output("status")])
DynNode.GET_SCHEMA()
# Class-level static arrays are the always-present prefix.
assert list(DynNode.RETURN_TYPES) == ["STRING"]
assert list(DynNode.RETURN_NAMES) == ["status"]
assert list(DynNode.OUTPUT_IS_LIST) == [False]
# ---------------------------------------------------------------------------
# get_finalized_class_outputs
# ---------------------------------------------------------------------------
def test_finalize_picks_active_branch():
schema_outputs = [_byke()]
finalized = io.get_finalized_class_outputs(schema_outputs, {"mode": "latent"})
assert finalized.output_ids == ["latent"]
assert finalized.return_types == ["LATENT"]
assert finalized.output_is_list == [False]
def test_finalize_unknown_selector_yields_empty():
schema_outputs = [_byke()]
finalized = io.get_finalized_class_outputs(schema_outputs, {"mode": "nonexistent"})
assert len(finalized) == 0
def test_finalize_link_selector_yields_empty():
"""Link as selector value is treated as 'not finalizable' — no branch."""
schema_outputs = [_byke()]
finalized = io.get_finalized_class_outputs(schema_outputs, {"mode": ["src", 0]})
assert len(finalized) == 0
def test_finalize_static_prefix_preserved():
schema_outputs = [io.String.Output("status"), _byke()]
finalized = io.get_finalized_class_outputs(schema_outputs, {"mode": "image"})
assert finalized.output_ids == ["status", "image", "mask"]
assert finalized.return_types == ["STRING", "IMAGE", "MASK"]
# ---------------------------------------------------------------------------
# NodeOutput.from_named
# ---------------------------------------------------------------------------
def test_nodeoutput_from_named_stores_dict():
out = io.NodeOutput.from_named({"a": 1, "b": 2})
assert out.named == {"a": 1, "b": 2}
assert out.args == ()
assert out.result is None # `.result` is the positional tuple
def test_nodeoutput_rejects_mixed_positional_and_named():
with pytest.raises(ValueError, match="cannot mix positional"):
io.NodeOutput(1, 2, named={"a": 1})
# ---------------------------------------------------------------------------
# Group-id uniqueness
# ---------------------------------------------------------------------------
def test_schema_rejects_duplicate_dynamic_group_ids():
class Dup(io.ComfyNode):
@classmethod
def define_schema(cls):
return io.Schema(
node_id="Dup",
inputs=[io.Combo.Input("mode", options=["a"])],
outputs=[
io.DynamicOutputs.ByKey(id="r", selector="mode",
options=[io.DynamicOutputs.Option(key="a", outputs=[io.Image.Output("x")])]),
io.DynamicOutputs.ByKey(id="r", selector="mode",
options=[io.DynamicOutputs.Option(key="a", outputs=[io.Latent.Output("y")])]),
],
)
@classmethod
def execute(cls, **kwargs):
return io.NodeOutput.from_named({"x": None, "y": None})
with pytest.raises(ValueError, match="DynamicOutputs group ids must be unique"):
Dup.GET_SCHEMA()
# ---------------------------------------------------------------------------
# DynamicOutputs.ByKey with a DynamicCombo selector
# ---------------------------------------------------------------------------
def _combo_input():
return io.DynamicCombo.Input("mode", options=[
io.DynamicCombo.Option(key="image", inputs=[io.Image.Input("img")]),
io.DynamicCombo.Option(key="latent", inputs=[io.Latent.Input("lat")]),
])
def _bykey_outputs():
return io.DynamicOutputs.ByKey(id="result", selector="mode", options=[
io.DynamicOutputs.Option(key="image", outputs=[io.Image.Output("processed"), io.Mask.Output("alpha")]),
io.DynamicOutputs.Option(key="latent", outputs=[io.Latent.Output("denoised")]),
])
def test_bykey_with_dynamic_combo_finalizes_branch():
finalized = io.get_finalized_class_outputs(
[io.String.Output("status"), _bykey_outputs()],
{"mode": {"mode": "image", "img": None}}, # DynamicCombo dispatch shape
)
assert finalized.output_ids == ["status", "processed", "alpha"]
assert finalized.return_types == ["STRING", "IMAGE", "MASK"]
def test_bykey_with_dynamic_combo_other_branch():
finalized = io.get_finalized_class_outputs(
[_bykey_outputs()],
{"mode": {"mode": "latent", "lat": None}},
)
assert finalized.output_ids == ["denoised"]
def test_schema_rejects_bykey_key_not_on_dynamic_combo():
class StrayKey(io.ComfyNode):
@classmethod
def define_schema(cls):
return io.Schema(
node_id="StrayKey",
inputs=[_combo_input()],
outputs=[io.DynamicOutputs.ByKey(id="r", selector="mode", options=[
io.DynamicOutputs.Option(key="image", outputs=[io.Image.Output("a")]),
io.DynamicOutputs.Option(key="audio", outputs=[io.String.Output("b")]),
])],
)
@classmethod
def execute(cls, **kwargs):
return io.NodeOutput.from_named({})
with pytest.raises(ValueError, match=r"option key\(s\) \['audio'\] are not declared"):
StrayKey.GET_SCHEMA()
# ---------------------------------------------------------------------------
# DynamicOutputs.BySlot
# ---------------------------------------------------------------------------
def _slot_input():
return io.DynamicSlot.Input("slot", options=[
io.DynamicSlot.Option(when=io.Image),
io.DynamicSlot.Option(when=io.Latent),
io.DynamicSlot.Option(when=None, inputs=[io.Int.Input("seed")]),
])
def _byslot_outputs():
return io.DynamicOutputs.BySlot(id="slot_out", selector="slot", options=[
io.DynamicOutputs.SlotOption(when=io.Image, outputs=[io.Image.Output("processed"), io.Mask.Output("alpha")]),
io.DynamicOutputs.SlotOption(when=io.Latent, outputs=[io.Latent.Output("denoised")]),
io.DynamicOutputs.SlotOption(when=None, outputs=[]),
])
def test_byslot_finalizes_by_resolved_type():
finalized = io.get_finalized_class_outputs(
[_byslot_outputs()],
{"slot": ["upstream", 0]},
live_input_types={"slot": "IMAGE"},
)
assert finalized.output_ids == ["processed", "alpha"]
finalized = io.get_finalized_class_outputs(
[_byslot_outputs()],
{"slot": ["upstream", 0]},
live_input_types={"slot": "LATENT"},
)
assert finalized.output_ids == ["denoised"]
def test_byslot_unconnected_uses_when_none():
finalized = io.get_finalized_class_outputs([_byslot_outputs()], {})
# when=None option declares outputs=[] → no active outputs
assert finalized.output_ids == []
def test_byslot_unmatched_type_yields_empty():
finalized = io.get_finalized_class_outputs(
[_byslot_outputs()],
{"slot": ["upstream", 0]},
live_input_types={"slot": "AUDIO"},
)
assert finalized.output_ids == []
def test_byslot_rejects_duplicate_when_types():
with pytest.raises(ValueError, match="appears in more than one option"):
io.DynamicOutputs.BySlot(id="r", selector="slot", options=[
io.DynamicOutputs.SlotOption(when=io.Image, outputs=[io.Image.Output("a")]),
io.DynamicOutputs.SlotOption(when=io.Image, outputs=[io.Mask.Output("b")]),
])
def test_byslot_rejects_duplicate_when_none():
with pytest.raises(ValueError, match="only one option may declare when=None"):
io.DynamicOutputs.BySlot(id="r", selector="slot", options=[
io.DynamicOutputs.SlotOption(when=None, outputs=[]),
io.DynamicOutputs.SlotOption(when=None, outputs=[io.Image.Output("x")]),
])
def test_schema_rejects_byslot_selector_not_a_dynamic_slot():
class WrongSel(io.ComfyNode):
@classmethod
def define_schema(cls):
return io.Schema(
node_id="WrongSel",
inputs=[io.Combo.Input("not_a_slot", options=["a"])],
outputs=[io.DynamicOutputs.BySlot(id="r", selector="not_a_slot", options=[
io.DynamicOutputs.SlotOption(when=io.Image, outputs=[io.Image.Output("x")]),
])],
)
@classmethod
def execute(cls, **kwargs):
return io.NodeOutput.from_named({})
with pytest.raises(ValueError, match="must reference a DynamicSlot input"):
WrongSel.GET_SCHEMA()
def test_schema_rejects_byslot_when_type_not_on_slot():
class StrayWhen(io.ComfyNode):
@classmethod
def define_schema(cls):
return io.Schema(
node_id="StrayWhen",
inputs=[_slot_input()],
outputs=[io.DynamicOutputs.BySlot(id="r", selector="slot", options=[
io.DynamicOutputs.SlotOption(when=io.Audio, outputs=[io.Audio.Output("x")]),
])],
)
@classmethod
def execute(cls, **kwargs):
return io.NodeOutput.from_named({})
with pytest.raises(ValueError, match=r"type\(s\) \['AUDIO'\] are not accepted"):
StrayWhen.GET_SCHEMA()
def test_schema_rejects_byslot_when_none_without_slot_when_none():
class NoNone(io.ComfyNode):
@classmethod
def define_schema(cls):
return io.Schema(
node_id="NoNone",
inputs=[io.DynamicSlot.Input("slot", optional=False, options=[
io.DynamicSlot.Option(when=io.Image),
])],
outputs=[io.DynamicOutputs.BySlot(id="r", selector="slot", options=[
io.DynamicOutputs.SlotOption(when=None, outputs=[]),
])],
)
@classmethod
def execute(cls, **kwargs):
return io.NodeOutput.from_named({})
with pytest.raises(ValueError, match="requires DynamicSlot 'slot' to declare a when=None"):
NoNone.GET_SCHEMA()
def test_v1_info_emits_byslot_entry():
class N(io.ComfyNode):
@classmethod
def define_schema(cls):
return io.Schema(
node_id="SlotV1",
inputs=[_slot_input()],
outputs=[_byslot_outputs()],
)
@classmethod
def execute(cls, **kwargs):
return io.NodeOutput.from_named({})
N.GET_SCHEMA()
info = N.SCHEMA.get_v1_info(N)
assert info.dynamic_outputs is not None and len(info.dynamic_outputs) == 1
entry = info.dynamic_outputs[0]
assert entry["kind"] == "by_slot"
assert entry["selector"] == "slot"
whens = [opt["when"] for opt in entry["options"]]
assert whens == [["IMAGE"], ["LATENT"], None]
# ---------------------------------------------------------------------------
# Nested dynamic selectors (selector reaches into a DynamicCombo / DynamicSlot
# option's nested inputs via a dotted path).
# ---------------------------------------------------------------------------
def test_schema_accepts_byslot_selector_into_nested_dynamic_slot():
"""A BySlot selector may target a DynamicSlot nested inside a DynamicCombo option."""
class Nested(io.ComfyNode):
@classmethod
def define_schema(cls):
return io.Schema(
node_id="NestedBySlot",
inputs=[
io.DynamicCombo.Input("outer", options=[
io.DynamicCombo.Option(key="a", inputs=[
io.DynamicSlot.Input("inner_slot", options=[
io.DynamicSlot.Option(when=io.Image),
io.DynamicSlot.Option(when=None),
]),
]),
]),
],
outputs=[io.DynamicOutputs.BySlot(id="r", selector="outer.inner_slot", options=[
io.DynamicOutputs.SlotOption(when=io.Image, outputs=[io.Image.Output("img")]),
io.DynamicOutputs.SlotOption(when=None, outputs=[]),
])],
)
@classmethod
def execute(cls, **kwargs):
return io.NodeOutput.from_named({})
# Must not raise — selector resolves to the nested DynamicSlot via dotted path.
Nested.GET_SCHEMA()
def test_schema_accepts_bykey_selector_into_nested_dynamic_combo():
"""A ByKey selector may target a DynamicCombo nested inside another DynamicCombo option."""
class Nested(io.ComfyNode):
@classmethod
def define_schema(cls):
return io.Schema(
node_id="NestedByKey",
inputs=[
io.DynamicCombo.Input("outer", options=[
io.DynamicCombo.Option(key="branch", inputs=[
io.DynamicCombo.Input("inner", options=[
io.DynamicCombo.Option(key="x", inputs=[]),
io.DynamicCombo.Option(key="y", inputs=[]),
]),
]),
]),
],
outputs=[io.DynamicOutputs.ByKey(id="r", selector="outer.inner", options=[
io.DynamicOutputs.Option(key="x", outputs=[io.Int.Output("ix")]),
io.DynamicOutputs.Option(key="y", outputs=[io.String.Output("iy")]),
])],
)
@classmethod
def execute(cls, **kwargs):
return io.NodeOutput.from_named({})
Nested.GET_SCHEMA()
def test_schema_rejects_byslot_nested_when_type_not_on_target_slot():
"""The when-type alignment check follows the dotted selector into the nested slot."""
class BadNested(io.ComfyNode):
@classmethod
def define_schema(cls):
return io.Schema(
node_id="BadNested",
inputs=[
io.DynamicCombo.Input("outer", options=[
io.DynamicCombo.Option(key="a", inputs=[
io.DynamicSlot.Input("inner_slot", options=[
io.DynamicSlot.Option(when=io.Image),
]),
]),
]),
],
outputs=[io.DynamicOutputs.BySlot(id="r", selector="outer.inner_slot", options=[
io.DynamicOutputs.SlotOption(when=io.Audio, outputs=[io.Audio.Output("x")]),
])],
)
@classmethod
def execute(cls, **kwargs):
return io.NodeOutput.from_named({})
with pytest.raises(ValueError, match=r"type\(s\) \['AUDIO'\] are not accepted"):
BadNested.GET_SCHEMA()

View File

@ -0,0 +1,376 @@
"""TypeResolver + execution-helper tests for ``DynamicOutputs.ByKey``.
Covers the wiring between the per-prompt finalized output list and the
execution layer:
* type resolver returns the active branch's declared type
* type resolver reports the active output count for stale-link validation
* ``is_output_list`` reflects the active branch
* execution helpers refuse to consume ``NodeOutput(named=...)`` against a
non-dynamic node, and reorder against the finalized list for dynamic ones
"""
from __future__ import annotations
import sys
import types as _pytypes
import pytest
# ---------------------------------------------------------------------------
# Shared fixtures (mirror tests-unit/execution_test/test_type_resolver.py)
# ---------------------------------------------------------------------------
@pytest.fixture
def fake_nodes_module():
real_nodes = sys.modules.get("nodes")
fake = _pytypes.ModuleType("nodes")
fake.NODE_CLASS_MAPPINGS = {}
sys.modules["nodes"] = fake
try:
yield fake.NODE_CLASS_MAPPINGS
finally:
if real_nodes is not None:
sys.modules["nodes"] = real_nodes
else:
del sys.modules["nodes"]
@pytest.fixture
def TypeResolver(fake_nodes_module):
from comfy_execution.type_resolver import TypeResolver as TR
return TR
def _v1_node(return_types: tuple[str, ...]):
class _V1:
RETURN_TYPES = return_types
@classmethod
def INPUT_TYPES(cls):
return {"required": {}}
return _V1
def _make_dyn_node():
"""V3 node: ``mode`` selector with two branches."""
from comfy_api.latest import _io as io
class DynBranch(io.ComfyNode):
@classmethod
def define_schema(cls):
return io.Schema(
node_id="DynBranch",
inputs=[io.Combo.Input("mode", options=["image", "latent"], default="image")],
outputs=[
io.DynamicOutputs.ByKey(
id="result", selector="mode",
options=[
io.DynamicOutputs.Option(key="image", outputs=[
io.Image.Output("image"),
io.Mask.Output("mask"),
]),
io.DynamicOutputs.Option(key="latent", outputs=[
io.Latent.Output("latent"),
]),
],
),
],
)
@classmethod
def execute(cls, mode):
if mode == "latent":
return io.NodeOutput.from_named({"latent": None})
return io.NodeOutput.from_named({"image": None, "mask": None})
DynBranch.GET_SCHEMA()
return DynBranch
# ---------------------------------------------------------------------------
# TypeResolver against finalized outputs
# ---------------------------------------------------------------------------
def test_dynamic_resolve_picks_active_branch_image(fake_nodes_module, TypeResolver):
fake_nodes_module["DynBranch"] = _make_dyn_node()
prompt = {"n1": {"class_type": "DynBranch", "inputs": {"mode": "image"}}}
r = TypeResolver(prompt)
assert r.resolve_output_type("n1", 0) == "IMAGE"
assert r.resolve_output_type("n1", 1) == "MASK"
def test_dynamic_resolve_picks_active_branch_latent(fake_nodes_module, TypeResolver):
fake_nodes_module["DynBranch"] = _make_dyn_node()
prompt = {"n1": {"class_type": "DynBranch", "inputs": {"mode": "latent"}}}
r = TypeResolver(prompt)
assert r.resolve_output_type("n1", 0) == "LATENT"
def test_dynamic_finalized_output_count(fake_nodes_module, TypeResolver):
fake_nodes_module["DynBranch"] = _make_dyn_node()
fake_nodes_module["Static"] = _v1_node(("INT", "FLOAT"))
prompt = {
"img": {"class_type": "DynBranch", "inputs": {"mode": "image"}},
"lat": {"class_type": "DynBranch", "inputs": {"mode": "latent"}},
"stat": {"class_type": "Static", "inputs": {}},
}
r = TypeResolver(prompt)
assert r.finalized_output_count("img") == 2 # image + mask
assert r.finalized_output_count("lat") == 1
assert r.finalized_output_count("stat") == 2 # static V1 falls through
def test_dynamic_is_output_list_reflects_branch(fake_nodes_module, TypeResolver):
from comfy_api.latest import _io as io
class DynList(io.ComfyNode):
@classmethod
def define_schema(cls):
return io.Schema(
node_id="DynList",
inputs=[io.Combo.Input("mode", options=["one", "many"], default="one")],
outputs=[
io.DynamicOutputs.ByKey(
id="r", selector="mode",
options=[
io.DynamicOutputs.Option(key="one", outputs=[
io.Image.Output("img"),
]),
io.DynamicOutputs.Option(key="many", outputs=[
io.Image.Output("imgs", is_output_list=True),
]),
],
),
],
)
@classmethod
def execute(cls, mode):
return io.NodeOutput.from_named({"img": None} if mode == "one" else {"imgs": [None]})
DynList.GET_SCHEMA()
fake_nodes_module["DynList"] = DynList
prompt = {
"one": {"class_type": "DynList", "inputs": {"mode": "one"}},
"many": {"class_type": "DynList", "inputs": {"mode": "many"}},
}
r = TypeResolver(prompt)
assert r.is_output_list("one", 0) is False
assert r.is_output_list("many", 0) is True
def test_dynamic_out_of_range_returns_any(fake_nodes_module, TypeResolver):
"""Slot index beyond the finalized branch resolves to AnyType (validation rejects separately)."""
fake_nodes_module["DynBranch"] = _make_dyn_node()
prompt = {"n1": {"class_type": "DynBranch", "inputs": {"mode": "latent"}}}
r = TypeResolver(prompt)
assert r.resolve_output_type("n1", 5) == "*"
# ---------------------------------------------------------------------------
# Execution-side helpers
# ---------------------------------------------------------------------------
def test_normalize_named_result_reorders_to_finalized():
from comfy_api.latest import _io as io
from execution import _normalize_named_result
finalized = io.get_finalized_class_outputs(
[io.DynamicOutputs.ByKey(
id="r", selector="mode",
options=[io.DynamicOutputs.Option(key="x", outputs=[
io.Image.Output("a"), io.Mask.Output("b"), io.Latent.Output("c"),
])],
)],
{"mode": "x"},
)
node_output = io.NodeOutput.from_named({"c": 30, "a": 10, "b": 20})
assert _normalize_named_result(node_output, finalized) == (10, 20, 30)
def test_normalize_named_result_rejects_unknown_or_missing_ids():
from comfy_api.latest import _io as io
from execution import _normalize_named_result
finalized = io.get_finalized_class_outputs(
[io.DynamicOutputs.ByKey(
id="r", selector="mode",
options=[io.DynamicOutputs.Option(key="x", outputs=[
io.Image.Output("a"), io.Mask.Output("b"),
])],
)],
{"mode": "x"},
)
with pytest.raises(Exception, match="missing"):
_normalize_named_result(io.NodeOutput.from_named({"a": 1}), finalized)
with pytest.raises(Exception, match="unknown"):
_normalize_named_result(io.NodeOutput.from_named({"a": 1, "b": 2, "z": 3}), finalized)
def test_normalize_named_result_requires_dynamic_node():
from comfy_api.latest import _io as io
from execution import _normalize_named_result
with pytest.raises(Exception, match="DynamicOutputs"):
_normalize_named_result(io.NodeOutput.from_named({"a": 1}), None)
# ---------------------------------------------------------------------------
# Blocker / output-shape paths through get_output_from_returns
# ---------------------------------------------------------------------------
def _dyn_finalized(branch_outputs):
from comfy_api.latest import _io as io
return io.get_finalized_class_outputs(
[io.DynamicOutputs.ByKey(id="r", selector="mode", options=[
io.DynamicOutputs.Option(key="x", outputs=branch_outputs),
])],
{"mode": "x"},
)
def test_blocker_sized_to_finalized_outputs_for_node_output():
"""V3 node returning a bare ``ExecutionBlocker`` must yield blocker tuples
sized to the active output count, not the empty static RETURN_TYPES."""
from comfy_api.latest import _io as io
from comfy_execution.graph_utils import ExecutionBlocker
from execution import get_output_from_returns
finalized = _dyn_finalized([io.Image.Output("a"), io.Mask.Output("b")])
class _Obj:
RETURN_TYPES = () # only static outputs — dynamic group lives in schema
out = io.NodeOutput(block_execution="paused")
output, _ui, has_subgraph = get_output_from_returns([out], _Obj(), finalized_outputs=finalized)
assert has_subgraph is False
# merge_result_data flattens per-slot, one input → list-of-one per slot
assert len(output) == 2
for slot in output:
assert len(slot) == 1
assert isinstance(slot[0], ExecutionBlocker)
assert slot[0].message == "paused"
# ---------------------------------------------------------------------------
# DynamicOutputs.ByKey driven by a DynamicCombo selector (end-to-end resolver)
# ---------------------------------------------------------------------------
def _make_combo_bykey_node():
from comfy_api.latest import _io as io
class ComboBK(io.ComfyNode):
@classmethod
def define_schema(cls):
return io.Schema(
node_id="ComboBK",
inputs=[
io.DynamicCombo.Input("mode", options=[
io.DynamicCombo.Option(key="image", inputs=[io.Image.Input("img")]),
io.DynamicCombo.Option(key="latent", inputs=[io.Latent.Input("lat")]),
]),
],
outputs=[io.DynamicOutputs.ByKey(id="result", selector="mode", options=[
io.DynamicOutputs.Option(key="image",
outputs=[io.Image.Output("processed"), io.Mask.Output("alpha")]),
io.DynamicOutputs.Option(key="latent",
outputs=[io.Latent.Output("denoised")]),
])],
)
@classmethod
def execute(cls, mode, **kwargs):
if mode["mode"] == "latent":
return io.NodeOutput.from_named({"denoised": None})
return io.NodeOutput.from_named({"processed": None, "alpha": None})
ComboBK.GET_SCHEMA()
return ComboBK
def _make_slot_byslot_node():
from comfy_api.latest import _io as io
class SlotBS(io.ComfyNode):
@classmethod
def define_schema(cls):
return io.Schema(
node_id="SlotBS",
inputs=[
io.DynamicSlot.Input("slot", options=[
io.DynamicSlot.Option(when=io.Image),
io.DynamicSlot.Option(when=io.Latent),
io.DynamicSlot.Option(when=None),
]),
],
outputs=[io.DynamicOutputs.BySlot(id="slot_out", selector="slot", options=[
io.DynamicOutputs.SlotOption(when=io.Image,
outputs=[io.Image.Output("processed"), io.Mask.Output("alpha")]),
io.DynamicOutputs.SlotOption(when=io.Latent,
outputs=[io.Latent.Output("denoised")]),
io.DynamicOutputs.SlotOption(when=None, outputs=[]),
])],
)
@classmethod
def execute(cls, **kwargs):
return io.NodeOutput.from_named({})
SlotBS.GET_SCHEMA()
return SlotBS
def test_combo_bykey_resolver_picks_branch(fake_nodes_module, TypeResolver):
fake_nodes_module["ComboBK"] = _make_combo_bykey_node()
prompt = {
"img": {"class_type": "ComboBK", "inputs": {"mode": {"mode": "image", "img": None}}},
"lat": {"class_type": "ComboBK", "inputs": {"mode": {"mode": "latent", "lat": None}}},
}
r = TypeResolver(prompt)
assert r.resolve_output_type("img", 0) == "IMAGE"
assert r.resolve_output_type("img", 1) == "MASK"
assert r.resolve_output_type("lat", 0) == "LATENT"
assert r.finalized_output_count("img") == 2
assert r.finalized_output_count("lat") == 1
def test_slot_byslot_resolver_picks_by_resolved_type(fake_nodes_module, TypeResolver):
fake_nodes_module["SlotBS"] = _make_slot_byslot_node()
fake_nodes_module["ImageSrc"] = _v1_node(("IMAGE",))
fake_nodes_module["LatentSrc"] = _v1_node(("LATENT",))
prompt = {
"img_src": {"class_type": "ImageSrc", "inputs": {}},
"lat_src": {"class_type": "LatentSrc", "inputs": {}},
"image_consumer": {"class_type": "SlotBS", "inputs": {"slot": ["img_src", 0]}},
"latent_consumer": {"class_type": "SlotBS", "inputs": {"slot": ["lat_src", 0]}},
"unconnected": {"class_type": "SlotBS", "inputs": {}},
}
r = TypeResolver(prompt)
assert r.resolve_output_type("image_consumer", 0) == "IMAGE"
assert r.resolve_output_type("image_consumer", 1) == "MASK"
assert r.resolve_output_type("latent_consumer", 0) == "LATENT"
# Unconnected → when=None branch declares outputs=[]
assert r.finalized_output_count("unconnected") == 0
def test_bare_execution_blocker_sized_to_finalized_outputs():
"""The non-NodeOutput path (bare ``ExecutionBlocker`` from V1-style returns)
also sizes against the finalized list."""
from comfy_api.latest import _io as io
from comfy_execution.graph_utils import ExecutionBlocker
from execution import get_output_from_returns
finalized = _dyn_finalized([io.Image.Output("a"), io.Mask.Output("b"), io.Latent.Output("c")])
class _Obj:
RETURN_TYPES = ()
blocker = ExecutionBlocker("stopped")
output, _ui, has_subgraph = get_output_from_returns([blocker], _Obj(), finalized_outputs=finalized)
assert has_subgraph is False
assert len(output) == 3
for slot in output:
assert slot[0] is blocker