mirror of
https://github.com/comfyanonymous/ComfyUI.git
synced 2026-06-16 20:06:04 +08:00
Compare commits
11 Commits
v0.25.0
...
dynamictyp
| Author | SHA1 | Date | |
|---|---|---|---|
| c20a04fef0 | |||
| cc3e2abd7f | |||
| 8cac12afa8 | |||
| 346ee898cb | |||
| 16dd7d115c | |||
| d91c1d8d48 | |||
| 0e4a15b7fb | |||
| 004ac8820b | |||
| 15f55f1b24 | |||
| e01b335e39 | |||
| 19390c112a |
@ -1090,7 +1090,7 @@ class Autogrow(ComfyTypeI):
|
||||
self.template.validate()
|
||||
|
||||
@staticmethod
|
||||
def _expand_schema_for_dynamic(out_dict: dict[str, Any], live_inputs: dict[str, Any], value: tuple[str, dict[str, Any]], input_type: str, curr_prefix: list[str] | None):
|
||||
def _expand_schema_for_dynamic(out_dict: dict[str, Any], live_inputs: dict[str, Any], value: tuple[str, dict[str, Any]], input_type: str, curr_prefix: list[str] | None, live_input_types: dict[str, str] | None = None):
|
||||
# NOTE: purposely do not include self in out_dict; instead use only the template inputs
|
||||
# need to figure out names based on template type
|
||||
is_names = ("names" in value[1]["template"])
|
||||
@ -1139,7 +1139,7 @@ class Autogrow(ComfyTypeI):
|
||||
finalized_prefix = finalize_prefix(curr_prefix)
|
||||
out_dict["dynamic_paths"][finalized_prefix] = finalized_prefix
|
||||
out_dict["dynamic_paths_default_value"][finalized_prefix] = DynamicPathsDefaultValue.EMPTY_DICT
|
||||
parse_class_inputs(out_dict, live_inputs, new_dict, curr_prefix)
|
||||
parse_class_inputs(out_dict, live_inputs, new_dict, curr_prefix, live_input_types)
|
||||
|
||||
@comfytype(io_type="COMFY_DYNAMICCOMBO_V3")
|
||||
class DynamicCombo(ComfyTypeI):
|
||||
@ -1177,7 +1177,7 @@ class DynamicCombo(ComfyTypeI):
|
||||
input.validate()
|
||||
|
||||
@staticmethod
|
||||
def _expand_schema_for_dynamic(out_dict: dict[str, Any], live_inputs: dict[str, Any], value: tuple[str, dict[str, Any]], input_type: str, curr_prefix: list[str] | None):
|
||||
def _expand_schema_for_dynamic(out_dict: dict[str, Any], live_inputs: dict[str, Any], value: tuple[str, dict[str, Any]], input_type: str, curr_prefix: list[str] | None, live_input_types: dict[str, str] | None = None):
|
||||
finalized_id = finalize_prefix(curr_prefix)
|
||||
if finalized_id in live_inputs:
|
||||
key = live_inputs[finalized_id]
|
||||
@ -1189,57 +1189,206 @@ class DynamicCombo(ComfyTypeI):
|
||||
selected_option = option
|
||||
break
|
||||
if selected_option is not None:
|
||||
parse_class_inputs(out_dict, live_inputs, selected_option["inputs"], curr_prefix)
|
||||
parse_class_inputs(out_dict, live_inputs, selected_option["inputs"], curr_prefix, live_input_types)
|
||||
# add self to inputs
|
||||
out_dict[input_type][finalized_id] = value
|
||||
out_dict["dynamic_paths"][finalized_id] = finalize_prefix(curr_prefix, curr_prefix[-1])
|
||||
|
||||
@comfytype(io_type="COMFY_DYNAMICSLOT_V3")
|
||||
class DynamicSlot(ComfyTypeI):
|
||||
"""A slot whose revealed inputs depend on the type connected upstream.
|
||||
|
||||
Each ``Option`` declares a ``when`` condition; the first option whose
|
||||
condition matches the slot's resolved upstream type (or whose
|
||||
``when=None`` matches an empty slot) decides which child inputs are
|
||||
exposed.
|
||||
|
||||
Each concrete type may appear in at most one option's ``when``, so the
|
||||
matching branch is unambiguous. The unconnected case (``when=None``) is
|
||||
its own bucket and may also appear at most once.
|
||||
|
||||
The AnyType limitation documented in
|
||||
:py:mod:`comfy_execution.type_resolver` applies: an upstream output
|
||||
declared as ``AnyType`` resolves to ``"*"`` and will only match a
|
||||
``when=io.AnyType`` option, never a concrete-type one.
|
||||
"""
|
||||
Type = dict[str, Any]
|
||||
|
||||
class Input(DynamicInput):
|
||||
def __init__(self, slot: Input, inputs: list[Input],
|
||||
display_name: str=None, tooltip: str=None, lazy: bool=None, extra_dict=None):
|
||||
assert(not isinstance(slot, DynamicInput))
|
||||
self.slot = copy.copy(slot)
|
||||
self.slot.display_name = slot.display_name if slot.display_name is not None else display_name
|
||||
optional = True
|
||||
self.slot.tooltip = slot.tooltip if slot.tooltip is not None else tooltip
|
||||
self.slot.lazy = slot.lazy if slot.lazy is not None else lazy
|
||||
self.slot.extra_dict = slot.extra_dict if slot.extra_dict is not None else extra_dict
|
||||
super().__init__(slot.id, self.slot.display_name, optional, self.slot.tooltip, self.slot.lazy, self.slot.extra_dict)
|
||||
class Option:
|
||||
"""One branch of inputs revealed when the slot's resolved type matches ``when``.
|
||||
|
||||
``when`` accepts:
|
||||
* ``None`` — no link present
|
||||
* ``io.AnyType`` — upstream resolved type is literally ``"*"``
|
||||
* a single ComfyType class (e.g. ``io.Image``)
|
||||
* a list of ComfyType classes (shared branch across multiple types)
|
||||
* a ``MultiType.Input`` instance (parsed via its ``.io_types``)
|
||||
"""
|
||||
def __init__(self, when: Any, inputs: list[Input]):
|
||||
self.when = when
|
||||
self.inputs = inputs
|
||||
self.force_input = None
|
||||
# force widget inputs to have no widgets, otherwise this would be awkward
|
||||
if isinstance(self.slot, WidgetInput):
|
||||
self.force_input = True
|
||||
self.slot.force_input = True
|
||||
# ``_when_types`` is the ordered tuple of io_types (deterministic);
|
||||
# ``_when_set`` is the same content as a set for fast matching.
|
||||
self._when_types = self._normalize_when(when)
|
||||
self._when_set: frozenset[str] | None = (
|
||||
None if self._when_types is None else frozenset(self._when_types)
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _normalize_when(when: Any) -> tuple[str, ...] | None:
|
||||
"""Normalize ``when`` to an ordered, deduplicated tuple of io_types, or ``None`` for the unconnected case."""
|
||||
if when is None:
|
||||
return None
|
||||
if isinstance(when, type) and issubclass(when, _ComfyType):
|
||||
return (when.io_type,)
|
||||
if isinstance(when, MultiType.Input):
|
||||
result = tuple(dict.fromkeys(t.io_type for t in when.io_types))
|
||||
if "*" in result and len(result) > 1:
|
||||
raise ValueError(
|
||||
"DynamicSlot.Option: AnyType cannot be grouped with concrete types; "
|
||||
"use a separate Option(when=io.AnyType, ...) instead"
|
||||
)
|
||||
return result
|
||||
if isinstance(when, Iterable) and not isinstance(when, str):
|
||||
types: list[str] = []
|
||||
for t in when:
|
||||
if not (isinstance(t, type) and issubclass(t, _ComfyType)):
|
||||
raise ValueError(
|
||||
f"DynamicSlot.Option: list entries must be ComfyType classes, got {t!r}"
|
||||
)
|
||||
if t.io_type not in types:
|
||||
types.append(t.io_type)
|
||||
if not types:
|
||||
raise ValueError("DynamicSlot.Option: when=[] is not allowed; use when=None instead")
|
||||
if "*" in types and len(types) > 1:
|
||||
raise ValueError(
|
||||
"DynamicSlot.Option: AnyType cannot be grouped with concrete types; "
|
||||
"use a separate Option(when=io.AnyType, ...) instead"
|
||||
)
|
||||
return tuple(types)
|
||||
raise ValueError(
|
||||
"DynamicSlot.Option: when must be None, a ComfyType class, a list of ComfyType classes, "
|
||||
f"or a MultiType.Input; got {when!r}"
|
||||
)
|
||||
|
||||
def as_dict(self):
|
||||
return {
|
||||
"when": None if self._when_types is None else list(self._when_types),
|
||||
"inputs": create_input_dict_v1(self.inputs),
|
||||
}
|
||||
|
||||
class Input(DynamicInput):
|
||||
def __init__(self, id: str, options: list[DynamicSlot.Option],
|
||||
display_name: str=None, optional: bool=True, tooltip: str=None, lazy: bool=None, extra_dict=None):
|
||||
if not options:
|
||||
raise ValueError("DynamicSlot.Input: at least one Option is required")
|
||||
for opt in options:
|
||||
if not isinstance(opt, DynamicSlot.Option):
|
||||
raise ValueError(
|
||||
f"DynamicSlot.Input: options must be DynamicSlot.Option instances, got {opt!r}"
|
||||
)
|
||||
super().__init__(id, display_name, optional, tooltip, lazy, extra_dict)
|
||||
self.options = options
|
||||
# Enforce uniqueness: each io_type (and the unconnected case) may
|
||||
# appear in at most one option's ``when``. Also derive the slot's
|
||||
# declared connection type as the ordered union of every non-None
|
||||
# option's ``when`` set so authors control displayed precedence.
|
||||
seen_types: set[str] = set()
|
||||
seen_none = False
|
||||
connected_types: list[str] = []
|
||||
for opt in options:
|
||||
if opt._when_types is None:
|
||||
if seen_none:
|
||||
raise ValueError("DynamicSlot.Input: only one Option may declare when=None")
|
||||
seen_none = True
|
||||
continue
|
||||
for t in opt._when_types:
|
||||
if t in seen_types:
|
||||
raise ValueError(
|
||||
f"DynamicSlot.Input: type {t!r} appears in more than one Option's `when`; "
|
||||
"each type must be claimed by exactly one option"
|
||||
)
|
||||
seen_types.add(t)
|
||||
connected_types.append(t)
|
||||
if not connected_types:
|
||||
raise ValueError(
|
||||
"DynamicSlot.Input: at least one Option must have a non-None `when`; "
|
||||
"a slot with only a `when=None` option can never be connected"
|
||||
)
|
||||
# A required slot demands a link, so the when=None branch is unreachable.
|
||||
if not optional and seen_none:
|
||||
raise ValueError(
|
||||
"DynamicSlot.Input: optional=False forbids when=None options; "
|
||||
"the unconnected branch is unreachable when a link is required"
|
||||
)
|
||||
self._slot_io_type = ",".join(connected_types)
|
||||
|
||||
# parse_class_inputs dispatches on the class io_type (COMFY_DYNAMICSLOT_V3),
|
||||
# so get_all/get_io_type must not be overridden; slotType is published via as_dict.
|
||||
|
||||
def get_all(self) -> list[Input]:
|
||||
return [self.slot] + self.inputs
|
||||
seen_ids: set[str] = set()
|
||||
children: list[Input] = []
|
||||
for opt in self.options:
|
||||
for inp in opt.inputs:
|
||||
if inp.id in seen_ids:
|
||||
continue
|
||||
seen_ids.add(inp.id)
|
||||
children.append(inp)
|
||||
return [self] + children
|
||||
|
||||
def as_dict(self):
|
||||
return super().as_dict() | prune_dict({
|
||||
"slotType": str(self.slot.get_io_type()),
|
||||
"inputs": create_input_dict_v1(self.inputs),
|
||||
"forceInput": self.force_input,
|
||||
"slotType": self._slot_io_type,
|
||||
"options": [o.as_dict() for o in self.options],
|
||||
# Always render as a connector — slotType may include widget-capable
|
||||
# types (INT/STRING/etc.) but a DynamicSlot is a connection point.
|
||||
"forceInput": True,
|
||||
})
|
||||
|
||||
def validate(self):
|
||||
self.slot.validate()
|
||||
for input in self.inputs:
|
||||
input.validate()
|
||||
for opt in self.options:
|
||||
for inp in opt.inputs:
|
||||
inp.validate()
|
||||
|
||||
@staticmethod
|
||||
def _expand_schema_for_dynamic(out_dict: dict[str, Any], live_inputs: dict[str, Any], value: tuple[str, dict[str, Any]], input_type: str, curr_prefix: list[str] | None):
|
||||
def _select_option(options: list[dict[str, Any]], live_input_types: dict[str, str] | None,
|
||||
finalized_id: str, has_link: bool) -> dict[str, Any] | None:
|
||||
"""Pick the first option whose ``when`` matches the slot's state.
|
||||
|
||||
Connected: pick the first option whose ``when`` set intersects the
|
||||
comma-split resolved type. Unconnected: pick the first ``when=None``.
|
||||
With per-option type uniqueness, at most one connected option can match
|
||||
any single concrete type; ordering only matters when upstream declares
|
||||
a multi-type union (e.g. ``"IMAGE,MASK"``).
|
||||
"""
|
||||
if not has_link:
|
||||
for opt in options:
|
||||
if opt["when"] is None:
|
||||
return opt
|
||||
return None
|
||||
resolved = (live_input_types or {}).get(finalized_id, "*")
|
||||
resolved_set = {t.strip() for t in resolved.split(",")}
|
||||
for opt in options:
|
||||
when = opt["when"]
|
||||
if when is None:
|
||||
continue
|
||||
if resolved_set & set(when):
|
||||
return opt
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _expand_schema_for_dynamic(out_dict: dict[str, Any], live_inputs: dict[str, Any], value: tuple[str, dict[str, Any]], input_type: str, curr_prefix: list[str] | None, live_input_types: dict[str, str] | None = None):
|
||||
finalized_id = finalize_prefix(curr_prefix)
|
||||
if finalized_id in live_inputs:
|
||||
inputs = value[1]["inputs"]
|
||||
parse_class_inputs(out_dict, live_inputs, inputs, curr_prefix)
|
||||
# add self to inputs
|
||||
out_dict[input_type][finalized_id] = value
|
||||
out_dict["dynamic_paths"][finalized_id] = finalize_prefix(curr_prefix, curr_prefix[-1])
|
||||
options: list[dict[str, Any]] = value[1].get("options", [])
|
||||
has_link = finalized_id in live_inputs and live_inputs[finalized_id] is not None
|
||||
selected = DynamicSlot._select_option(options, live_input_types, finalized_id, has_link)
|
||||
if selected is not None:
|
||||
parse_class_inputs(out_dict, live_inputs, selected["inputs"], curr_prefix, live_input_types)
|
||||
# Always advertise the slot itself so the connector renders even when no
|
||||
# option matched (unmatched concrete + no AnyType option).
|
||||
out_dict[input_type][finalized_id] = value
|
||||
out_dict["dynamic_paths"][finalized_id] = finalize_prefix(curr_prefix, curr_prefix[-1])
|
||||
|
||||
@comfytype(io_type="IMAGECOMPARE")
|
||||
class ImageCompare(ComfyTypeI):
|
||||
@ -1357,11 +1506,18 @@ class Range(ComfyTypeIO):
|
||||
})
|
||||
|
||||
|
||||
DYNAMIC_INPUT_LOOKUP: dict[str, Callable[[dict[str, Any], dict[str, Any], tuple[str, dict[str, Any]], str, list[str] | None], None]] = {}
|
||||
def register_dynamic_input_func(io_type: str, func: Callable[[dict[str, Any], dict[str, Any], tuple[str, dict[str, Any]], str, list[str] | None], None]):
|
||||
# Signature: (out_dict, live_inputs, value, input_type, curr_prefix, live_input_types).
|
||||
# live_input_types is {input_id: resolved_io_type} from TypeResolver; existing
|
||||
# expanders ignore it, future type-discriminated ones use it as discriminator.
|
||||
_DynamicInputFunc = Callable[
|
||||
[dict[str, Any], dict[str, Any], tuple[str, dict[str, Any]], str, list[str] | None, dict[str, str] | None],
|
||||
None,
|
||||
]
|
||||
DYNAMIC_INPUT_LOOKUP: dict[str, _DynamicInputFunc] = {}
|
||||
def register_dynamic_input_func(io_type: str, func: _DynamicInputFunc):
|
||||
DYNAMIC_INPUT_LOOKUP[io_type] = func
|
||||
|
||||
def get_dynamic_input_func(io_type: str) -> Callable[[dict[str, Any], dict[str, Any], tuple[str, dict[str, Any]], str, list[str] | None], None]:
|
||||
def get_dynamic_input_func(io_type: str) -> _DynamicInputFunc:
|
||||
return DYNAMIC_INPUT_LOOKUP[io_type]
|
||||
|
||||
def setup_dynamic_input_funcs():
|
||||
@ -1709,7 +1865,12 @@ class Schema:
|
||||
)
|
||||
return info
|
||||
|
||||
def get_finalized_class_inputs(d: dict[str, Any], live_inputs: dict[str, Any], include_hidden=False) -> tuple[dict[str, Any], V3Data]:
|
||||
def get_finalized_class_inputs(d: dict[str, Any], live_inputs: dict[str, Any], include_hidden=False, live_input_types: dict[str, str] | None = None) -> tuple[dict[str, Any], V3Data]:
|
||||
"""Expand a node's V3 schema against a concrete prompt.
|
||||
|
||||
``live_input_types`` is an optional ``{input_id: resolved_io_type}`` map
|
||||
(from ``TypeResolver``) used by future type-discriminated dynamic inputs.
|
||||
"""
|
||||
out_dict = {
|
||||
"required": {},
|
||||
"optional": {},
|
||||
@ -1719,7 +1880,7 @@ def get_finalized_class_inputs(d: dict[str, Any], live_inputs: dict[str, Any], i
|
||||
d = d.copy()
|
||||
# ignore hidden for parsing
|
||||
hidden = d.pop("hidden", None)
|
||||
parse_class_inputs(out_dict, live_inputs, d)
|
||||
parse_class_inputs(out_dict, live_inputs, d, None, live_input_types)
|
||||
if hidden is not None and include_hidden:
|
||||
out_dict["hidden"] = hidden
|
||||
v3_data = {}
|
||||
@ -1732,7 +1893,7 @@ def get_finalized_class_inputs(d: dict[str, Any], live_inputs: dict[str, Any], i
|
||||
v3_data["dynamic_paths_default_value"] = dynamic_paths_default_value
|
||||
return out_dict, hidden, v3_data
|
||||
|
||||
def parse_class_inputs(out_dict: dict[str, Any], live_inputs: dict[str, Any], curr_dict: dict[str, Any], curr_prefix: list[str] | None=None) -> None:
|
||||
def parse_class_inputs(out_dict: dict[str, Any], live_inputs: dict[str, Any], curr_dict: dict[str, Any], curr_prefix: list[str] | None=None, live_input_types: dict[str, str] | None = None) -> None:
|
||||
for input_type, inner_d in curr_dict.items():
|
||||
for id, value in inner_d.items():
|
||||
io_type = value[0]
|
||||
@ -1740,7 +1901,7 @@ def parse_class_inputs(out_dict: dict[str, Any], live_inputs: dict[str, Any], cu
|
||||
# dynamic inputs need to be handled with lookup functions
|
||||
dynamic_input_func = get_dynamic_input_func(io_type)
|
||||
new_prefix = handle_prefix(curr_prefix, id)
|
||||
dynamic_input_func(out_dict, live_inputs, value, input_type, new_prefix)
|
||||
dynamic_input_func(out_dict, live_inputs, value, input_type, new_prefix, live_input_types)
|
||||
else:
|
||||
# non-dynamic inputs get directly transferred
|
||||
finalized_id = finalize_prefix(curr_prefix, id)
|
||||
|
||||
@ -26,6 +26,7 @@ class DynamicPrompt:
|
||||
self.ephemeral_prompt = {}
|
||||
self.ephemeral_parents = {}
|
||||
self.ephemeral_display = {}
|
||||
self._type_resolver = None # lazy; invalidated by add_ephemeral_node
|
||||
|
||||
def get_node(self, node_id):
|
||||
if node_id in self.ephemeral_prompt:
|
||||
@ -41,6 +42,17 @@ class DynamicPrompt:
|
||||
self.ephemeral_prompt[node_id] = node_info
|
||||
self.ephemeral_parents[node_id] = parent_id
|
||||
self.ephemeral_display[node_id] = display_id
|
||||
# Selective downstream invalidation would need topological info; the
|
||||
# cache is small, so just wipe it.
|
||||
if self._type_resolver is not None:
|
||||
self._type_resolver.invalidate()
|
||||
|
||||
def get_type_resolver(self):
|
||||
"""Lazily build and return the per-prompt TypeResolver."""
|
||||
if self._type_resolver is None:
|
||||
from comfy_execution.type_resolver import TypeResolver
|
||||
self._type_resolver = TypeResolver(self)
|
||||
return self._type_resolver
|
||||
|
||||
def get_real_node_id(self, node_id):
|
||||
while node_id in self.ephemeral_parents:
|
||||
|
||||
369
comfy_execution/type_resolver.py
Normal file
369
comfy_execution/type_resolver.py
Normal file
@ -0,0 +1,369 @@
|
||||
"""Server-side type resolver for prompt graphs.
|
||||
|
||||
Resolves the concrete io_type of any output/input slot by walking the prompt
|
||||
graph. Handles V1/V3 ``RETURN_TYPES``, V3 ``MatchType`` template chains, and
|
||||
falls back to ``AnyType`` (with a one-shot warning) on cycles, depth overflow,
|
||||
or unresolvable wildcards.
|
||||
|
||||
Works against either a raw prompt dict or a ``DynamicPrompt``. All resolved
|
||||
values are strings, so resolver state is cross-process serializable.
|
||||
|
||||
Known limitation: when an upstream node declares its output as ``AnyType``
|
||||
(``"*"``) — Reroute, generic If/Else, many V1 utility nodes — the resolver
|
||||
returns ``"*"``. It has no way to introspect the runtime value to recover a
|
||||
more specific type. Downstream consumers (e.g. :py:class:`DynamicSlot`) will
|
||||
treat such links as AnyType and select their ``AnyType`` branch (or none),
|
||||
not a concrete-type branch.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
from comfy_api.latest import _io as io
|
||||
from comfy_api.internal import _ComfyNodeInternal
|
||||
|
||||
|
||||
def _parse_link(val: Any) -> tuple[str, int] | None:
|
||||
"""Return ``(src_node_id, src_slot_idx)`` if ``val`` is a well-formed link, else ``None``.
|
||||
|
||||
A link is ``[node_id: str, slot_idx: int]``. Malformed shapes return ``None``
|
||||
so callers can fall back to AnyType rather than raise.
|
||||
"""
|
||||
if not isinstance(val, (list, tuple)) or len(val) != 2:
|
||||
return None
|
||||
src_node, src_slot = val[0], val[1]
|
||||
if not isinstance(src_node, str):
|
||||
return None
|
||||
# bool is a subclass of int — reject so True/False aren't read as slot 1/0.
|
||||
if isinstance(src_slot, bool) or not isinstance(src_slot, int):
|
||||
return None
|
||||
return src_node, src_slot
|
||||
|
||||
ANY_TYPE: str = io.AnyType.io_type
|
||||
MAX_RESOLVE_DEPTH: int = 64 # belt-and-suspenders cap; real MatchType chains stay tiny
|
||||
|
||||
|
||||
class TypeResolver:
|
||||
"""Resolves concrete io_types for a prompt graph.
|
||||
|
||||
Instantiate once per prompt (or per ``DynamicPrompt``) and reuse; results
|
||||
are cached. Call :py:meth:`invalidate` (or :py:meth:`invalidate_node`) when
|
||||
the underlying graph mutates (e.g. when an ephemeral node is added).
|
||||
"""
|
||||
|
||||
def __init__(self, prompt_source: Any):
|
||||
"""Args:
|
||||
prompt_source: Either a ``DynamicPrompt`` (anything with
|
||||
``get_node(node_id)`` / ``has_node(node_id)``) or a plain
|
||||
``dict[node_id, {"class_type", "inputs"}]``.
|
||||
"""
|
||||
self._source = prompt_source
|
||||
self._output_cache: dict[tuple[str, int], str] = {}
|
||||
self._is_output_list_cache: dict[tuple[str, int], bool] = {}
|
||||
self._warned: set[tuple[str, Any, str]] = set()
|
||||
|
||||
# ---- prompt access ----------------------------------------------------
|
||||
def _has_node(self, node_id: str) -> bool:
|
||||
if hasattr(self._source, "has_node"):
|
||||
return self._source.has_node(node_id)
|
||||
return node_id in self._source
|
||||
|
||||
def _get_node(self, node_id: str) -> dict[str, Any] | None:
|
||||
try:
|
||||
if hasattr(self._source, "get_node"):
|
||||
return self._source.get_node(node_id)
|
||||
return self._source[node_id]
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _get_class_def(class_type: str):
|
||||
# Local import: nodes <-> comfy_execution would cycle at import time.
|
||||
import nodes
|
||||
return nodes.NODE_CLASS_MAPPINGS.get(class_type)
|
||||
|
||||
def _get_class_def_for_node(self, node_id: str):
|
||||
"""Return (node_dict, class_def) for ``node_id``, or ``(None, None)``."""
|
||||
if not self._has_node(node_id):
|
||||
return None, None
|
||||
node = self._get_node(node_id)
|
||||
if node is None:
|
||||
return None, None
|
||||
class_type = node.get("class_type")
|
||||
if not isinstance(class_type, str):
|
||||
return node, None
|
||||
return node, self._get_class_def(class_type)
|
||||
|
||||
# ---- cache management -------------------------------------------------
|
||||
def invalidate(self) -> None:
|
||||
"""Clear all cached resolutions. Cheap; call after any graph mutation."""
|
||||
self._output_cache.clear()
|
||||
self._is_output_list_cache.clear()
|
||||
# Keep self._warned: re-emitting already-logged warnings would just spam.
|
||||
|
||||
def invalidate_node(self, node_id: str) -> None:
|
||||
"""Clear cached entries for a single node (e.g. after node-level expand)."""
|
||||
for key in [k for k in self._output_cache if k[0] == node_id]:
|
||||
del self._output_cache[key]
|
||||
for key in [k for k in self._is_output_list_cache if k[0] == node_id]:
|
||||
del self._is_output_list_cache[key]
|
||||
|
||||
# ---- output resolution -----------------------------------------------
|
||||
def resolve_output_type(self, node_id: str, slot_idx: int,
|
||||
_stack: frozenset[tuple[str, int]] | None = None) -> str:
|
||||
"""Return the resolved io_type string of ``node_id``'s output slot.
|
||||
|
||||
Falls back to ``ANY_TYPE`` on cycle, depth-overflow, unknown class,
|
||||
out-of-range slot, missing node, malformed link, or unresolved
|
||||
MatchType template.
|
||||
"""
|
||||
# Degrade gracefully on non-int slot_idx (e.g. malformed API JSON).
|
||||
if isinstance(slot_idx, bool) or not isinstance(slot_idx, int):
|
||||
return ANY_TYPE
|
||||
|
||||
cache_key = (node_id, slot_idx)
|
||||
if cache_key in self._output_cache:
|
||||
return self._output_cache[cache_key]
|
||||
|
||||
if _stack is None:
|
||||
_stack = frozenset()
|
||||
if cache_key in _stack:
|
||||
self._warn(node_id, slot_idx, "cycle detected during type resolution; defaulting to AnyType")
|
||||
return ANY_TYPE
|
||||
if len(_stack) >= MAX_RESOLVE_DEPTH:
|
||||
self._warn(node_id, slot_idx, f"exceeded MAX_RESOLVE_DEPTH={MAX_RESOLVE_DEPTH}; defaulting to AnyType")
|
||||
return ANY_TYPE
|
||||
next_stack = _stack | {cache_key}
|
||||
|
||||
node, class_def = self._get_class_def_for_node(node_id)
|
||||
if class_def is None:
|
||||
return ANY_TYPE
|
||||
class_type = node.get("class_type")
|
||||
|
||||
try:
|
||||
return_types = class_def.RETURN_TYPES
|
||||
except Exception:
|
||||
return ANY_TYPE
|
||||
if return_types is None or slot_idx < 0 or slot_idx >= len(return_types):
|
||||
return ANY_TYPE
|
||||
|
||||
declared = return_types[slot_idx]
|
||||
|
||||
# Only V3 schemas carry MatchType template info; V1 RETURN_TYPES are
|
||||
# always concrete strings.
|
||||
resolved = declared
|
||||
if isinstance(class_def, type) and issubclass(class_def, _ComfyNodeInternal):
|
||||
schema = getattr(class_def, "SCHEMA", None)
|
||||
if schema is None:
|
||||
# RETURN_TYPES access above usually populates SCHEMA — be defensive.
|
||||
try:
|
||||
schema = class_def.GET_SCHEMA()
|
||||
except Exception:
|
||||
schema = None
|
||||
if schema is not None and slot_idx < len(schema.outputs):
|
||||
out = schema.outputs[slot_idx]
|
||||
if isinstance(out, io.MatchType.Output):
|
||||
resolved = self._resolve_match_template(
|
||||
node_id, schema, out.template.template_id, next_stack
|
||||
)
|
||||
|
||||
# Warn only for V1 wildcards declared as "*"; unresolved MatchType
|
||||
# templates warn separately in _resolve_match_template, avoiding double-warns.
|
||||
if isinstance(resolved, str) and resolved == ANY_TYPE and declared == ANY_TYPE:
|
||||
self._warn(
|
||||
node_id, slot_idx,
|
||||
f"node '{class_type}' output slot {slot_idx} is wildcard; defaulting to AnyType",
|
||||
)
|
||||
|
||||
if not isinstance(resolved, str):
|
||||
# e.g. legacy combo declared as a list of options.
|
||||
self._warn(node_id, slot_idx,
|
||||
f"node '{class_type}' output slot {slot_idx} has non-string return type {type(resolved).__name__}; defaulting to AnyType")
|
||||
resolved = ANY_TYPE
|
||||
|
||||
self._output_cache[cache_key] = resolved
|
||||
return resolved
|
||||
|
||||
def _resolve_match_template(self, node_id: str, schema, template_id: str,
|
||||
stack: frozenset[tuple[str, int]]) -> str:
|
||||
"""Walk MatchType.Inputs sharing ``template_id``; return first concrete resolution or ``ANY_TYPE``."""
|
||||
node = self._get_node(node_id)
|
||||
inputs_dict = (node or {}).get("inputs", {}) or {}
|
||||
any_input_seen = False
|
||||
for inp in schema.inputs:
|
||||
if not isinstance(inp, io.MatchType.Input):
|
||||
continue
|
||||
if inp.template.template_id != template_id:
|
||||
continue
|
||||
any_input_seen = True
|
||||
val = inputs_dict.get(inp.id)
|
||||
if val is None:
|
||||
continue
|
||||
link = _parse_link(val)
|
||||
if link is not None:
|
||||
t = self.resolve_output_type(link[0], link[1], stack)
|
||||
if t != ANY_TYPE:
|
||||
return t
|
||||
# Literal or malformed link: MatchType slots have no declared concrete type.
|
||||
if not any_input_seen:
|
||||
# Node-author bug: output template has no matching Input.
|
||||
self._warn(node_id, None,
|
||||
f"MatchType output template '{template_id}' has no matching Input on the node; defaulting to AnyType")
|
||||
else:
|
||||
self._warn(node_id, None,
|
||||
f"MatchType template '{template_id}' has no bound concrete upstream input; defaulting to AnyType")
|
||||
return ANY_TYPE
|
||||
|
||||
def is_output_list(self, node_id: str, slot_idx: int) -> bool:
|
||||
"""Whether the source slot is declared as a list output (``OUTPUT_IS_LIST[idx]``)."""
|
||||
if isinstance(slot_idx, bool) or not isinstance(slot_idx, int):
|
||||
return False
|
||||
cache_key = (node_id, slot_idx)
|
||||
if cache_key in self._is_output_list_cache:
|
||||
return self._is_output_list_cache[cache_key]
|
||||
result = False
|
||||
_, class_def = self._get_class_def_for_node(node_id)
|
||||
if class_def is not None:
|
||||
lst = getattr(class_def, "OUTPUT_IS_LIST", None)
|
||||
if lst is not None and 0 <= slot_idx < len(lst):
|
||||
result = bool(lst[slot_idx])
|
||||
self._is_output_list_cache[cache_key] = result
|
||||
return result
|
||||
|
||||
# ---- input resolution ------------------------------------------------
|
||||
def resolve_input_type(self, node_id: str, input_id: str) -> str:
|
||||
"""Resolve the io_type of the value currently bound to a node's input.
|
||||
|
||||
* If the value is a link, return the resolved type of the source slot.
|
||||
* If the value is a literal, return the declared slot's effective
|
||||
io_type (peeling dynamic-input wrappers — e.g. an Autogrow-of-Image
|
||||
slot resolves to ``IMAGE``, not ``COMFY_AUTOGROW_V3``).
|
||||
* If the value is missing, malformed, or the slot is unknown, return
|
||||
``ANY_TYPE``.
|
||||
"""
|
||||
node = self._get_node(node_id)
|
||||
if node is None:
|
||||
return ANY_TYPE
|
||||
inputs = node.get("inputs", {}) or {}
|
||||
if input_id not in inputs:
|
||||
return ANY_TYPE
|
||||
link = _parse_link(inputs[input_id])
|
||||
if link is not None:
|
||||
return self.resolve_output_type(link[0], link[1])
|
||||
return self.get_declared_slot_io_type(node_id, input_id)
|
||||
|
||||
def is_input_list(self, node_id: str, input_id: str) -> bool:
|
||||
"""Whether the value bound to ``input_id`` originates from a list output."""
|
||||
node = self._get_node(node_id)
|
||||
if node is None:
|
||||
return False
|
||||
link = _parse_link((node.get("inputs", {}) or {}).get(input_id))
|
||||
if link is None:
|
||||
return False
|
||||
return self.is_output_list(link[0], link[1])
|
||||
|
||||
def get_declared_slot_io_type(self, node_id: str, input_id: str) -> str:
|
||||
"""Return the effective declared io_type of a node's input slot.
|
||||
|
||||
Peels dynamic-input wrappers so that the user-facing element type is
|
||||
returned:
|
||||
|
||||
* Autogrow → wrapped template input's io_type
|
||||
* DynamicSlot → underlying slot's io_type
|
||||
* Anything else → the slot's own io_type
|
||||
* DynamicCombo / unsupported → ``ANY_TYPE`` (the combo key is itself
|
||||
dynamic, not a meaningful type for consumers)
|
||||
"""
|
||||
_, class_def = self._get_class_def_for_node(node_id)
|
||||
if class_def is None:
|
||||
return ANY_TYPE
|
||||
|
||||
# Prefer V3 schema (carries dynamic-input wrapper info).
|
||||
if isinstance(class_def, type) and issubclass(class_def, _ComfyNodeInternal):
|
||||
schema = getattr(class_def, "SCHEMA", None)
|
||||
if schema is None:
|
||||
try:
|
||||
class_def.GET_SCHEMA()
|
||||
schema = getattr(class_def, "SCHEMA", None)
|
||||
except Exception:
|
||||
schema = None
|
||||
if schema is not None:
|
||||
# Top-level input id.
|
||||
for inp in schema.inputs:
|
||||
if inp.id == input_id:
|
||||
return self._effective_io_type(inp)
|
||||
# Nested (DynamicSlot / DynamicCombo `parent.child`).
|
||||
if "." in input_id:
|
||||
top, _, _ = input_id.partition(".")
|
||||
for inp in schema.inputs:
|
||||
if inp.id != top:
|
||||
continue
|
||||
for child in inp.get_all():
|
||||
if child is inp:
|
||||
continue
|
||||
if child.id == input_id.split(".", 1)[1]:
|
||||
return self._effective_io_type(child)
|
||||
# Fall through to V1 dict (hidden inputs, etc.).
|
||||
|
||||
try:
|
||||
inputs = class_def.INPUT_TYPES()
|
||||
except Exception:
|
||||
return ANY_TYPE
|
||||
for section in ("required", "optional"):
|
||||
section_d = inputs.get(section, {})
|
||||
if input_id in section_d:
|
||||
entry = section_d[input_id]
|
||||
if not entry:
|
||||
return ANY_TYPE
|
||||
t = entry[0]
|
||||
if isinstance(t, str):
|
||||
return t
|
||||
if isinstance(t, list): # legacy combo declared as list of options
|
||||
return io.Combo.io_type
|
||||
return ANY_TYPE
|
||||
return ANY_TYPE
|
||||
|
||||
@staticmethod
|
||||
def _effective_io_type(inp) -> str:
|
||||
"""Return the consumer-facing io_type of a (possibly dynamic) input."""
|
||||
# Autogrow / DynamicSlot wrap a real element type; that's what consumers care about.
|
||||
if isinstance(inp, io.Autogrow.Input):
|
||||
try:
|
||||
return inp.template.input.get_io_type()
|
||||
except Exception:
|
||||
return ANY_TYPE
|
||||
if isinstance(inp, io.DynamicSlot.Input):
|
||||
# Auto-derived slot type — comma-joined union of all option `when` types.
|
||||
return getattr(inp, "_slot_io_type", ANY_TYPE)
|
||||
# DynamicCombo's "type" is a key selector, not a connection type.
|
||||
if isinstance(inp, io.DynamicCombo.Input):
|
||||
return ANY_TYPE
|
||||
try:
|
||||
return inp.get_io_type()
|
||||
except Exception:
|
||||
return ANY_TYPE
|
||||
|
||||
# ---- bulk helpers ----------------------------------------------------
|
||||
def compute_live_input_types(self, node_id: str) -> dict[str, str]:
|
||||
"""Build the ``{input_id: resolved_io_type}`` map for a node.
|
||||
|
||||
Consumed by ``_io.get_finalized_class_inputs`` so future per-type
|
||||
dynamic-input expansion can branch on what was actually connected.
|
||||
"""
|
||||
node = self._get_node(node_id)
|
||||
if node is None:
|
||||
return {}
|
||||
out: dict[str, str] = {}
|
||||
for input_id in (node.get("inputs", {}) or {}).keys():
|
||||
out[input_id] = self.resolve_input_type(node_id, input_id)
|
||||
return out
|
||||
|
||||
# ---- diagnostics -----------------------------------------------------
|
||||
def _warn(self, node_id: str, slot_idx: Any, msg: str) -> None:
|
||||
key = (node_id, slot_idx, msg)
|
||||
if key in self._warned:
|
||||
return
|
||||
self._warned.add(key)
|
||||
logging.warning("TypeResolver: node=%s slot=%s %s", node_id, slot_idx, msg)
|
||||
55
execution.py
55
execution.py
@ -83,8 +83,8 @@ class IsChangedCache:
|
||||
self.is_changed[node_id] = node["is_changed"]
|
||||
return self.is_changed[node_id]
|
||||
|
||||
# Intentionally do not use cached outputs here. We only want constants in IS_CHANGED
|
||||
input_data_all, _, v3_data = get_input_data(node["inputs"], class_def, node_id, None)
|
||||
# Intentionally do not use cached outputs here. We only want constants in IS_CHANGED.
|
||||
input_data_all, _, v3_data = get_input_data(node["inputs"], class_def, node_id, None, self.dynprompt)
|
||||
try:
|
||||
is_changed = await _async_map_node_over_list(self.prompt_id, node_id, class_def, input_data_all, is_changed_name, v3_data=v3_data)
|
||||
is_changed = await resolve_map_node_over_list_results(is_changed)
|
||||
@ -152,13 +152,16 @@ class CacheSet:
|
||||
|
||||
SENSITIVE_EXTRA_DATA_KEYS = ("auth_token_comfy_org", "api_key_comfy_org")
|
||||
|
||||
def get_input_data(inputs, class_def, unique_id, execution_list=None, dynprompt=None, extra_data={}):
|
||||
def get_input_data(inputs, class_def, unique_id, execution_list=None, dynprompt=None, extra_data={}, live_input_types=None):
|
||||
is_v3 = issubclass(class_def, _ComfyNodeInternal)
|
||||
v3_data: io.V3Data = {}
|
||||
hidden_inputs_v3 = {}
|
||||
valid_inputs = class_def.INPUT_TYPES()
|
||||
if is_v3:
|
||||
valid_inputs, hidden, v3_data = _io.get_finalized_class_inputs(valid_inputs, inputs)
|
||||
# Let dynamic schemas branch on resolved upstream types, not just literal values.
|
||||
if live_input_types is None and dynprompt is not None and hasattr(dynprompt, "get_type_resolver"):
|
||||
live_input_types = dynprompt.get_type_resolver().compute_live_input_types(unique_id)
|
||||
valid_inputs, hidden, v3_data = _io.get_finalized_class_inputs(valid_inputs, inputs, live_input_types=live_input_types)
|
||||
input_data_all = {}
|
||||
missing_keys = {}
|
||||
for x in inputs:
|
||||
@ -821,9 +824,17 @@ class PromptExecutor:
|
||||
self._notify_prompt_lifecycle("end", prompt_id)
|
||||
|
||||
|
||||
async def validate_inputs(prompt_id, prompt, item, validated, visiting=None):
|
||||
async def validate_inputs(prompt_id, prompt, item, validated, visiting=None, type_resolver=None):
|
||||
"""Validate inputs for a single node, recursing into upstream nodes.
|
||||
|
||||
``type_resolver`` is built once at the top of recursion and shared so
|
||||
MatchType chains are only walked once per prompt.
|
||||
"""
|
||||
if visiting is None:
|
||||
visiting = []
|
||||
if type_resolver is None:
|
||||
from comfy_execution.type_resolver import TypeResolver
|
||||
type_resolver = TypeResolver(prompt)
|
||||
|
||||
unique_id = item
|
||||
if unique_id in validated:
|
||||
@ -855,10 +866,12 @@ async def validate_inputs(prompt_id, prompt, item, validated, visiting=None):
|
||||
v3_data = None
|
||||
validate_function_inputs = []
|
||||
validate_has_kwargs = False
|
||||
live_input_types = None
|
||||
if issubclass(obj_class, _ComfyNodeInternal):
|
||||
obj_class: _io._ComfyNodeBaseInternal
|
||||
class_inputs = obj_class.INPUT_TYPES()
|
||||
class_inputs, _, v3_data = _io.get_finalized_class_inputs(class_inputs, inputs)
|
||||
live_input_types = type_resolver.compute_live_input_types(unique_id)
|
||||
class_inputs, _, v3_data = _io.get_finalized_class_inputs(class_inputs, inputs, live_input_types=live_input_types)
|
||||
validate_function_name = "validate_inputs"
|
||||
validate_function = first_real_override(obj_class, validate_function_name)
|
||||
else:
|
||||
@ -908,11 +921,20 @@ async def validate_inputs(prompt_id, prompt, item, validated, visiting=None):
|
||||
continue
|
||||
|
||||
o_id = val[0]
|
||||
o_class_type = prompt[o_id]['class_type']
|
||||
r = nodes.NODE_CLASS_MAPPINGS[o_class_type].RETURN_TYPES
|
||||
received_type = r[val[1]]
|
||||
# Walks MatchType/template chains so API workflows without
|
||||
# frontend-injected type metadata get the same answer as the UI.
|
||||
received_type = type_resolver.resolve_output_type(o_id, val[1])
|
||||
received_types[x] = received_type
|
||||
if 'input_types' not in validate_function_inputs and not validate_node_input(received_type, input_type):
|
||||
# DynamicSlot's declared input_type is just the dispatch tag
|
||||
# (COMFY_DYNAMICSLOT_V3); a link is valid iff some Option would
|
||||
# actually claim the resolved upstream type.
|
||||
if input_type == _io.DynamicSlot.io_type and isinstance(extra_info, dict):
|
||||
link_valid = _io.DynamicSlot._select_option(
|
||||
extra_info.get("options", []), {x: received_type}, x, has_link=True
|
||||
) is not None
|
||||
else:
|
||||
link_valid = validate_node_input(received_type, input_type)
|
||||
if 'input_types' not in validate_function_inputs and not link_valid:
|
||||
details = f"{x}, received_type({received_type}) mismatch input_type({input_type})"
|
||||
error = {
|
||||
"type": "return_type_mismatch",
|
||||
@ -930,7 +952,7 @@ async def validate_inputs(prompt_id, prompt, item, validated, visiting=None):
|
||||
try:
|
||||
visiting.append(unique_id)
|
||||
try:
|
||||
r = await validate_inputs(prompt_id, prompt, o_id, validated, visiting)
|
||||
r = await validate_inputs(prompt_id, prompt, o_id, validated, visiting, type_resolver)
|
||||
finally:
|
||||
visiting.pop()
|
||||
if r[0] is False:
|
||||
@ -1058,7 +1080,11 @@ async def validate_inputs(prompt_id, prompt, item, validated, visiting=None):
|
||||
continue
|
||||
|
||||
if len(validate_function_inputs) > 0 or validate_has_kwargs:
|
||||
input_data_all, _, v3_data = get_input_data(inputs, obj_class, unique_id)
|
||||
# Reuse the precomputed live_input_types so a custom validate_inputs()
|
||||
# sees the same DynamicSlot branch that finalization picked above.
|
||||
input_data_all, _, v3_data = get_input_data(
|
||||
inputs, obj_class, unique_id, live_input_types=live_input_types
|
||||
)
|
||||
input_filtered = {}
|
||||
for x in input_data_all:
|
||||
if x in validate_function_inputs or validate_has_kwargs:
|
||||
@ -1155,11 +1181,14 @@ async def validate_prompt(prompt_id, prompt, partial_execution_list: Union[list[
|
||||
errors = []
|
||||
node_errors = {}
|
||||
validated = {}
|
||||
# Shared across output validations so MatchType chains walk only once.
|
||||
from comfy_execution.type_resolver import TypeResolver
|
||||
type_resolver = TypeResolver(prompt)
|
||||
for o in outputs:
|
||||
valid = False
|
||||
reasons = []
|
||||
try:
|
||||
m = await validate_inputs(prompt_id, prompt, o, validated)
|
||||
m = await validate_inputs(prompt_id, prompt, o, validated, None, type_resolver)
|
||||
valid = m[0]
|
||||
reasons = m[1]
|
||||
except Exception as ex:
|
||||
|
||||
350
tests-unit/comfy_api_test/test_dynamic_slot.py
Normal file
350
tests-unit/comfy_api_test/test_dynamic_slot.py
Normal file
@ -0,0 +1,350 @@
|
||||
"""Unit tests for the redesigned ``DynamicSlot`` with type-keyed options."""
|
||||
|
||||
import pytest
|
||||
|
||||
from comfy_api.latest import _io as io
|
||||
|
||||
|
||||
def _opt(when, ids=None):
|
||||
"""Build an Option whose inputs are placeholder String widgets named after ids."""
|
||||
ids = ids or []
|
||||
inputs = [io.String.Input(name) for name in ids]
|
||||
return io.DynamicSlot.Option(when=when, inputs=inputs)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Option.when normalization
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_option_when_none():
|
||||
o = _opt(None, ["a"])
|
||||
assert o._when_types is None
|
||||
assert o._when_set is None
|
||||
assert o.as_dict()["when"] is None
|
||||
|
||||
|
||||
def test_option_when_single_type():
|
||||
o = _opt(io.Image)
|
||||
assert o._when_types == ("IMAGE",)
|
||||
assert o._when_set == frozenset({"IMAGE"})
|
||||
assert o.as_dict()["when"] == ["IMAGE"]
|
||||
|
||||
|
||||
def test_option_when_anytype():
|
||||
o = _opt(io.AnyType)
|
||||
assert o._when_types == ("*",)
|
||||
assert o.as_dict()["when"] == ["*"]
|
||||
|
||||
|
||||
def test_option_when_list_preserves_order():
|
||||
"""Declaration order is preserved in both the tuple and the serialized form."""
|
||||
o = _opt([io.Mask, io.Image])
|
||||
assert o._when_types == ("MASK", "IMAGE")
|
||||
assert o.as_dict()["when"] == ["MASK", "IMAGE"]
|
||||
|
||||
|
||||
def test_option_when_list_dedups_within_option():
|
||||
o = _opt([io.Image, io.Image, io.Mask])
|
||||
assert o._when_types == ("IMAGE", "MASK")
|
||||
|
||||
|
||||
def test_option_when_multitype_input():
|
||||
mt = io.MultiType.Input("x", types=[io.Image, io.Latent])
|
||||
o = _opt(mt)
|
||||
assert o._when_types == ("IMAGE", "LATENT")
|
||||
|
||||
|
||||
def test_option_when_empty_list_rejected():
|
||||
with pytest.raises(ValueError, match="when=\\[\\]"):
|
||||
io.DynamicSlot.Option(when=[], inputs=[])
|
||||
|
||||
|
||||
def test_option_when_garbage_rejected():
|
||||
with pytest.raises(ValueError, match="when must be"):
|
||||
io.DynamicSlot.Option(when="IMAGE", inputs=[])
|
||||
|
||||
|
||||
def test_option_when_list_with_non_comfytype_rejected():
|
||||
with pytest.raises(ValueError, match="list entries"):
|
||||
io.DynamicSlot.Option(when=[io.Image, "MASK"], inputs=[])
|
||||
|
||||
|
||||
def test_option_when_list_with_anytype_rejected():
|
||||
"""AnyType must stand alone — it represents the unresolvable-wildcard
|
||||
state, not a concrete type that can share a branch with concrete types."""
|
||||
with pytest.raises(ValueError, match="AnyType cannot be grouped"):
|
||||
io.DynamicSlot.Option(when=[io.Image, io.AnyType], inputs=[])
|
||||
|
||||
|
||||
def test_option_when_multitype_with_anytype_rejected():
|
||||
mt = io.MultiType.Input("x", types=[io.Image, io.AnyType])
|
||||
with pytest.raises(ValueError, match="AnyType cannot be grouped"):
|
||||
io.DynamicSlot.Option(when=mt, inputs=[])
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# DynamicSlot.Input construction and serialization
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_input_requires_at_least_one_option():
|
||||
with pytest.raises(ValueError, match="at least one Option"):
|
||||
io.DynamicSlot.Input("x", options=[])
|
||||
|
||||
|
||||
def test_input_requires_non_none_option():
|
||||
with pytest.raises(ValueError, match="non-None `when`"):
|
||||
io.DynamicSlot.Input("x", options=[_opt(None, ["a"])])
|
||||
|
||||
|
||||
def test_input_auto_derives_slot_type():
|
||||
inp = io.DynamicSlot.Input("x", options=[
|
||||
_opt(io.Image, ["a"]),
|
||||
_opt(io.Mask, ["b"]),
|
||||
_opt(None, ["c"]),
|
||||
])
|
||||
# Declared order preserved across non-None options; None contributes nothing.
|
||||
assert inp._slot_io_type == "IMAGE,MASK"
|
||||
d = inp.as_dict()
|
||||
assert d["slotType"] == "IMAGE,MASK"
|
||||
assert len(d["options"]) == 3
|
||||
|
||||
|
||||
def test_input_includes_anytype_in_slot_type():
|
||||
inp = io.DynamicSlot.Input("x", options=[
|
||||
_opt(io.Image, ["a"]),
|
||||
_opt(io.AnyType, ["b"]),
|
||||
])
|
||||
assert inp._slot_io_type == "IMAGE,*"
|
||||
|
||||
|
||||
def test_input_rejects_duplicate_type_across_options():
|
||||
with pytest.raises(ValueError, match="appears in more than one"):
|
||||
io.DynamicSlot.Input("x", options=[
|
||||
_opt(io.Image, ["a"]),
|
||||
_opt([io.Image, io.Mask], ["b"]),
|
||||
])
|
||||
|
||||
|
||||
def test_input_rejects_duplicate_anytype_across_options():
|
||||
with pytest.raises(ValueError, match="appears in more than one"):
|
||||
io.DynamicSlot.Input("x", options=[
|
||||
_opt(io.AnyType, ["a"]),
|
||||
_opt(io.AnyType, ["b"]),
|
||||
])
|
||||
|
||||
|
||||
def test_input_rejects_duplicate_when_none():
|
||||
with pytest.raises(ValueError, match="only one Option may declare when=None"):
|
||||
io.DynamicSlot.Input("x", options=[
|
||||
_opt(io.Image, ["a"]),
|
||||
_opt(None, ["b"]),
|
||||
_opt(None, ["c"]),
|
||||
])
|
||||
|
||||
|
||||
def test_input_rejects_non_option_entry():
|
||||
with pytest.raises(ValueError, match="must be DynamicSlot.Option instances"):
|
||||
io.DynamicSlot.Input("x", options=[_opt(io.Image, ["a"]), "not an option"])
|
||||
|
||||
|
||||
def test_input_defaults_to_optional_and_always_force_input():
|
||||
"""The slot is always rendered as a connector, never as a widget, even
|
||||
when slotType includes widget-capable types like INT/STRING."""
|
||||
inp = io.DynamicSlot.Input("x", options=[_opt(io.Int, ["n"])])
|
||||
d = inp.as_dict()
|
||||
assert d["forceInput"] is True
|
||||
# default optional=True → slot lives in optional bucket via DynamicInput
|
||||
assert inp.optional is True
|
||||
|
||||
|
||||
def test_input_required_slot_allowed_without_when_none():
|
||||
inp = io.DynamicSlot.Input("x", optional=False, options=[_opt(io.Image, ["a"])])
|
||||
assert inp.optional is False
|
||||
|
||||
|
||||
def test_input_required_slot_rejects_when_none_option():
|
||||
with pytest.raises(ValueError, match="optional=False forbids when=None"):
|
||||
io.DynamicSlot.Input(
|
||||
"x",
|
||||
optional=False,
|
||||
options=[_opt(io.Image, ["a"]), _opt(None, ["b"])],
|
||||
)
|
||||
|
||||
|
||||
def test_input_get_all_prepends_self_and_dedups_children():
|
||||
inp = io.DynamicSlot.Input("x", options=[
|
||||
_opt(io.Image, ["shared", "image_only"]),
|
||||
_opt(io.Mask, ["shared", "mask_only"]),
|
||||
])
|
||||
items = inp.get_all()
|
||||
# Convention shared with Autogrow / DynamicCombo: parent first, then children.
|
||||
assert items[0] is inp
|
||||
assert [i.id for i in items[1:]] == ["shared", "image_only", "mask_only"]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Option selection
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _select(options, live_input_types, has_link, finalized_id="x"):
|
||||
"""Convenience wrapper that runs the dispatch through the dict form (post-as_dict)."""
|
||||
serialized = [o.as_dict() for o in options]
|
||||
return io.DynamicSlot._select_option(
|
||||
serialized, live_input_types, finalized_id, has_link
|
||||
)
|
||||
|
||||
|
||||
def test_select_unconnected_picks_none_option():
|
||||
options = [_opt(io.Image, ["img_widgets"]), _opt(None, ["empty_widgets"])]
|
||||
sel = _select(options, {}, has_link=False)
|
||||
assert sel is not None
|
||||
assert sel["when"] is None
|
||||
|
||||
|
||||
def test_select_unconnected_with_no_none_option_returns_none():
|
||||
options = [_opt(io.Image, ["x"])]
|
||||
assert _select(options, {}, has_link=False) is None
|
||||
|
||||
|
||||
def test_select_concrete_type_match():
|
||||
options = [
|
||||
_opt(io.Image, ["a"]),
|
||||
_opt(io.Mask, ["b"]),
|
||||
_opt(io.AnyType, ["c"]),
|
||||
]
|
||||
sel = _select(options, {"x": "MASK"}, has_link=True)
|
||||
assert sel["when"] == ["MASK"]
|
||||
|
||||
|
||||
def test_select_anytype_matches_wildcard_resolved():
|
||||
options = [_opt(io.Image, ["a"]), _opt(io.AnyType, ["c"])]
|
||||
sel = _select(options, {"x": "*"}, has_link=True)
|
||||
assert sel["when"] == ["*"]
|
||||
|
||||
|
||||
def test_select_anytype_does_not_match_concrete():
|
||||
options = [_opt(io.AnyType, ["c"])]
|
||||
# MASK isn't in any option's set; AnyType only matches "*". No expansion.
|
||||
assert _select(options, {"x": "MASK"}, has_link=True) is None
|
||||
|
||||
|
||||
def test_select_anytype_branch_does_not_swallow_unenumerated_concrete():
|
||||
"""Regression: a slot exposing IMAGE + AnyType must reject LATENT upstream
|
||||
instead of expanding the AnyType branch. validate_inputs relies on this
|
||||
to compute link validity (slotType="IMAGE,*" alone would over-accept)."""
|
||||
options = [_opt(io.Image, ["image_widget"]), _opt(io.AnyType, ["any_widget"])]
|
||||
assert _select(options, {"x": "LATENT"}, has_link=True) is None
|
||||
# Sanity: IMAGE still matches the IMAGE branch and "*" still matches AnyType.
|
||||
assert _select(options, {"x": "IMAGE"}, has_link=True)["when"] == ["IMAGE"]
|
||||
assert _select(options, {"x": "*"}, has_link=True)["when"] == ["*"]
|
||||
|
||||
|
||||
def test_select_first_match_wins_on_union_upstream():
|
||||
"""Ordering only matters when upstream declares a multi-type union; with
|
||||
per-option type uniqueness, single concrete types can never match two
|
||||
options."""
|
||||
options = [
|
||||
_opt([io.Image, io.Mask], ["image_or_mask"]),
|
||||
_opt(io.Latent, ["latent_only"]),
|
||||
]
|
||||
# Upstream union "IMAGE,LATENT" intersects both options; first option wins.
|
||||
sel = _select(options, {"x": "IMAGE,LATENT"}, has_link=True)
|
||||
first_input_id = next(iter(sel["inputs"]["required"].keys()))
|
||||
assert first_input_id == "image_or_mask"
|
||||
|
||||
|
||||
def test_select_multitype_upstream_intersects_option_set():
|
||||
"""When upstream declares MultiType like 'IMAGE,MASK', any option that
|
||||
intersects with that set matches (first wins)."""
|
||||
options = [
|
||||
_opt(io.Latent, ["latent_only"]),
|
||||
_opt(io.Mask, ["mask_only"]),
|
||||
]
|
||||
sel = _select(options, {"x": "IMAGE,MASK"}, has_link=True)
|
||||
assert sel["when"] == ["MASK"]
|
||||
|
||||
|
||||
def test_select_missing_resolved_falls_through_to_anytype():
|
||||
"""If live_input_types lacks an entry for this slot but a link exists,
|
||||
we treat it as '*' (resolver default for unresolvable links)."""
|
||||
options = [_opt(io.Image, ["a"]), _opt(io.AnyType, ["c"])]
|
||||
sel = _select(options, {}, has_link=True)
|
||||
assert sel["when"] == ["*"]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# End-to-end expansion via _expand_schema_for_dynamic
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_expand_unconnected_path():
|
||||
"""An unconnected slot with a `when=None` option expands that option's children."""
|
||||
inp = io.DynamicSlot.Input("x", options=[
|
||||
_opt(io.Image, ["image_widget"]),
|
||||
_opt(None, ["empty_widget"]),
|
||||
])
|
||||
d = inp.as_dict()
|
||||
value = (io.DynamicSlot.io_type, d)
|
||||
out_dict = {
|
||||
"required": {}, "optional": {}, "hidden": {},
|
||||
"dynamic_paths": {}, "dynamic_paths_default_value": {},
|
||||
}
|
||||
io.DynamicSlot._expand_schema_for_dynamic(
|
||||
out_dict=out_dict,
|
||||
live_inputs={}, # no entry for "x" → unconnected
|
||||
value=value,
|
||||
input_type="optional",
|
||||
curr_prefix=["x"],
|
||||
live_input_types=None,
|
||||
)
|
||||
# The slot itself is always advertised in the caller's bucket.
|
||||
assert "x" in out_dict["optional"]
|
||||
# Children land in their own buckets (required by default) with
|
||||
# parent-prefixed ids.
|
||||
assert "x.empty_widget" in out_dict["required"]
|
||||
assert "x.image_widget" not in out_dict["required"]
|
||||
|
||||
|
||||
def test_expand_typed_path():
|
||||
"""A connected slot expands the matching type's children."""
|
||||
inp = io.DynamicSlot.Input("x", options=[
|
||||
_opt(io.Image, ["image_widget"]),
|
||||
_opt(io.Mask, ["mask_widget"]),
|
||||
])
|
||||
d = inp.as_dict()
|
||||
value = (io.DynamicSlot.io_type, d)
|
||||
out_dict = {
|
||||
"required": {}, "optional": {}, "hidden": {},
|
||||
"dynamic_paths": {}, "dynamic_paths_default_value": {},
|
||||
}
|
||||
io.DynamicSlot._expand_schema_for_dynamic(
|
||||
out_dict=out_dict,
|
||||
live_inputs={"x": ["src_node", 0]}, # link present
|
||||
value=value,
|
||||
input_type="optional",
|
||||
curr_prefix=["x"],
|
||||
live_input_types={"x": "MASK"},
|
||||
)
|
||||
assert "x" in out_dict["optional"]
|
||||
assert "x.mask_widget" in out_dict["required"]
|
||||
assert "x.image_widget" not in out_dict["required"]
|
||||
|
||||
|
||||
def test_expand_unmatched_concrete_still_advertises_slot():
|
||||
"""Resolved type not in any option → no children, but the slot itself stays."""
|
||||
inp = io.DynamicSlot.Input("x", options=[_opt(io.Image, ["image_widget"])])
|
||||
d = inp.as_dict()
|
||||
value = (io.DynamicSlot.io_type, d)
|
||||
out_dict = {
|
||||
"required": {}, "optional": {}, "hidden": {},
|
||||
"dynamic_paths": {}, "dynamic_paths_default_value": {},
|
||||
}
|
||||
io.DynamicSlot._expand_schema_for_dynamic(
|
||||
out_dict=out_dict,
|
||||
live_inputs={"x": ["src_node", 0]},
|
||||
value=value,
|
||||
input_type="optional",
|
||||
curr_prefix=["x"],
|
||||
live_input_types={"x": "LATENT"},
|
||||
)
|
||||
assert "x" in out_dict["optional"]
|
||||
assert "x.image_widget" not in out_dict["required"]
|
||||
482
tests-unit/execution_test/test_type_resolver.py
Normal file
482
tests-unit/execution_test/test_type_resolver.py
Normal file
@ -0,0 +1,482 @@
|
||||
"""Unit tests for :mod:`comfy_execution.type_resolver`.
|
||||
|
||||
These tests stand up a small in-memory ``NODE_CLASS_MAPPINGS`` for the test
|
||||
node classes (V1 and V3) and a fake DynamicPrompt-like dict, then verify the
|
||||
resolver's behaviour for:
|
||||
|
||||
* Static V1 ``RETURN_TYPES`` resolution.
|
||||
* V1 wildcard outputs (must yield ``AnyType`` and warn once).
|
||||
* V3 ``MatchType`` chains resolved via the downstream node's bound inputs.
|
||||
* ``MatchType`` with no upstream bound (fall back to ``AnyType`` + warn).
|
||||
* ``MatchType`` cycles (termination at ``AnyType`` + warn, no recursion blow-up).
|
||||
* Deep chains capped by ``MAX_RESOLVE_DEPTH``.
|
||||
* Input-type resolution for both literal values and links.
|
||||
* Effective slot io_type peeling for ``Autogrow`` (returns the wrapped type).
|
||||
* ``compute_live_input_types`` produces the right shape.
|
||||
* Cache invalidation.
|
||||
|
||||
The tests deliberately patch ``nodes.NODE_CLASS_MAPPINGS`` so they don't need
|
||||
the whole ComfyUI bootstrap.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import sys
|
||||
import types as _pytypes
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Lightweight V1 test node factory
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _v1_node(return_types: tuple[str, ...], input_types_dict: dict | None = None,
|
||||
output_is_list: tuple[bool, ...] | None = None):
|
||||
"""Build a V1 node class with the given RETURN_TYPES / INPUT_TYPES()."""
|
||||
if input_types_dict is None:
|
||||
input_types_dict = {"required": {}}
|
||||
|
||||
class _V1:
|
||||
RETURN_TYPES = return_types
|
||||
if output_is_list is not None:
|
||||
OUTPUT_IS_LIST = output_is_list
|
||||
|
||||
@classmethod
|
||||
def INPUT_TYPES(cls):
|
||||
return input_types_dict
|
||||
|
||||
return _V1
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fixture: install fake nodes module before importing the resolver
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@pytest.fixture
|
||||
def fake_nodes_module():
|
||||
"""Install a synthetic ``nodes`` module with an empty mappings dict.
|
||||
|
||||
Yields the mappings dict so tests can populate it per case. Cleans up
|
||||
afterwards. We also have to make sure comfy_execution.type_resolver picks
|
||||
up our fake module on its local import.
|
||||
"""
|
||||
real_nodes = sys.modules.get("nodes")
|
||||
fake = _pytypes.ModuleType("nodes")
|
||||
fake.NODE_CLASS_MAPPINGS = {}
|
||||
sys.modules["nodes"] = fake
|
||||
try:
|
||||
yield fake.NODE_CLASS_MAPPINGS
|
||||
finally:
|
||||
if real_nodes is not None:
|
||||
sys.modules["nodes"] = real_nodes
|
||||
else:
|
||||
del sys.modules["nodes"]
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def TypeResolver(fake_nodes_module):
|
||||
# Late import so it picks up our fake `nodes` module.
|
||||
from comfy_execution.type_resolver import TypeResolver as TR
|
||||
return TR
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# V1 resolution
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_v1_static_return_types_resolves(fake_nodes_module, TypeResolver):
|
||||
fake_nodes_module["AddNode"] = _v1_node(("INT",))
|
||||
prompt = {"n1": {"class_type": "AddNode", "inputs": {}}}
|
||||
r = TypeResolver(prompt)
|
||||
assert r.resolve_output_type("n1", 0) == "INT"
|
||||
|
||||
|
||||
def test_v1_wildcard_warns_once_and_returns_any(fake_nodes_module, TypeResolver, caplog):
|
||||
fake_nodes_module["WildNode"] = _v1_node(("*",))
|
||||
prompt = {"n1": {"class_type": "WildNode", "inputs": {}}}
|
||||
r = TypeResolver(prompt)
|
||||
with caplog.at_level(logging.WARNING, logger="root"):
|
||||
assert r.resolve_output_type("n1", 0) == "*"
|
||||
# second call should still return * but not produce a second warning
|
||||
assert r.resolve_output_type("n1", 0) == "*"
|
||||
warnings = [rec for rec in caplog.records if "TypeResolver" in rec.message]
|
||||
assert len(warnings) == 1, f"expected exactly one warning, got {warnings}"
|
||||
|
||||
|
||||
def test_unknown_node_returns_any(fake_nodes_module, TypeResolver):
|
||||
prompt = {"n1": {"class_type": "NopeNode", "inputs": {}}}
|
||||
r = TypeResolver(prompt)
|
||||
assert r.resolve_output_type("n1", 0) == "*"
|
||||
|
||||
|
||||
def test_out_of_range_slot_returns_any(fake_nodes_module, TypeResolver):
|
||||
fake_nodes_module["AddNode"] = _v1_node(("INT",))
|
||||
prompt = {"n1": {"class_type": "AddNode", "inputs": {}}}
|
||||
r = TypeResolver(prompt)
|
||||
assert r.resolve_output_type("n1", 5) == "*"
|
||||
|
||||
|
||||
def test_missing_node_returns_any(fake_nodes_module, TypeResolver):
|
||||
fake_nodes_module["AddNode"] = _v1_node(("INT",))
|
||||
prompt = {"n1": {"class_type": "AddNode", "inputs": {}}}
|
||||
r = TypeResolver(prompt)
|
||||
assert r.resolve_output_type("nonexistent", 0) == "*"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# is_output_list / is_input_list
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_is_output_list(fake_nodes_module, TypeResolver):
|
||||
fake_nodes_module["ListNode"] = _v1_node(("IMAGE", "MASK"), output_is_list=(True, False))
|
||||
prompt = {"n1": {"class_type": "ListNode", "inputs": {}}}
|
||||
r = TypeResolver(prompt)
|
||||
assert r.is_output_list("n1", 0) is True
|
||||
assert r.is_output_list("n1", 1) is False
|
||||
|
||||
|
||||
def test_is_input_list_follows_link(fake_nodes_module, TypeResolver):
|
||||
fake_nodes_module["ListNode"] = _v1_node(("IMAGE",), output_is_list=(True,))
|
||||
fake_nodes_module["Consumer"] = _v1_node(
|
||||
("INT",),
|
||||
{"required": {"img": ("IMAGE",)}},
|
||||
)
|
||||
prompt = {
|
||||
"src": {"class_type": "ListNode", "inputs": {}},
|
||||
"dst": {"class_type": "Consumer", "inputs": {"img": ["src", 0]}},
|
||||
}
|
||||
r = TypeResolver(prompt)
|
||||
assert r.is_input_list("dst", "img") is True
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# V3 MatchType resolution
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _make_switch_node_class():
|
||||
"""Build a V3 Switch-like node with MatchType inputs/outputs."""
|
||||
from comfy_api.latest import io
|
||||
|
||||
class Switch(io.ComfyNode):
|
||||
@classmethod
|
||||
def define_schema(cls):
|
||||
template = io.MatchType.Template("switch")
|
||||
return io.Schema(
|
||||
node_id="TestSwitch",
|
||||
inputs=[
|
||||
io.Boolean.Input("switch"),
|
||||
io.MatchType.Input("on_false", template=template, optional=True),
|
||||
io.MatchType.Input("on_true", template=template, optional=True),
|
||||
],
|
||||
outputs=[io.MatchType.Output(template=template)],
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def execute(cls, switch, on_false=None, on_true=None):
|
||||
return io.NodeOutput(on_true if switch else on_false)
|
||||
|
||||
# Force schema computation so SCHEMA / RETURN_TYPES are populated.
|
||||
Switch.GET_SCHEMA()
|
||||
return Switch
|
||||
|
||||
|
||||
def test_matchtype_resolves_to_upstream_concrete(fake_nodes_module, TypeResolver):
|
||||
fake_nodes_module["TestSwitch"] = _make_switch_node_class()
|
||||
fake_nodes_module["ImageSrc"] = _v1_node(("IMAGE",))
|
||||
prompt = {
|
||||
"img": {"class_type": "ImageSrc", "inputs": {}},
|
||||
"sw": {
|
||||
"class_type": "TestSwitch",
|
||||
"inputs": {"switch": True, "on_true": ["img", 0]},
|
||||
},
|
||||
}
|
||||
r = TypeResolver(prompt)
|
||||
assert r.resolve_output_type("sw", 0) == "IMAGE"
|
||||
|
||||
|
||||
def test_matchtype_first_concrete_wins(fake_nodes_module, TypeResolver):
|
||||
fake_nodes_module["TestSwitch"] = _make_switch_node_class()
|
||||
fake_nodes_module["ImageSrc"] = _v1_node(("IMAGE",))
|
||||
fake_nodes_module["LatentSrc"] = _v1_node(("LATENT",))
|
||||
prompt = {
|
||||
"img": {"class_type": "ImageSrc", "inputs": {}},
|
||||
"lat": {"class_type": "LatentSrc", "inputs": {}},
|
||||
"sw": {
|
||||
"class_type": "TestSwitch",
|
||||
"inputs": {
|
||||
"switch": False,
|
||||
"on_false": ["img", 0], # listed first in schema → wins
|
||||
"on_true": ["lat", 0],
|
||||
},
|
||||
},
|
||||
}
|
||||
r = TypeResolver(prompt)
|
||||
assert r.resolve_output_type("sw", 0) == "IMAGE"
|
||||
|
||||
|
||||
def test_matchtype_no_bound_input_returns_any(fake_nodes_module, TypeResolver, caplog):
|
||||
fake_nodes_module["TestSwitch"] = _make_switch_node_class()
|
||||
prompt = {"sw": {"class_type": "TestSwitch", "inputs": {"switch": True}}}
|
||||
r = TypeResolver(prompt)
|
||||
with caplog.at_level(logging.WARNING, logger="root"):
|
||||
assert r.resolve_output_type("sw", 0) == "*"
|
||||
assert any("MatchType" in rec.message for rec in caplog.records)
|
||||
|
||||
|
||||
def test_matchtype_skips_wildcard_input(fake_nodes_module, TypeResolver):
|
||||
"""If the first matched input resolves to AnyType, the resolver tries the next."""
|
||||
fake_nodes_module["TestSwitch"] = _make_switch_node_class()
|
||||
fake_nodes_module["WildNode"] = _v1_node(("*",))
|
||||
fake_nodes_module["ImageSrc"] = _v1_node(("IMAGE",))
|
||||
prompt = {
|
||||
"wild": {"class_type": "WildNode", "inputs": {}},
|
||||
"img": {"class_type": "ImageSrc", "inputs": {}},
|
||||
"sw": {
|
||||
"class_type": "TestSwitch",
|
||||
"inputs": {
|
||||
"switch": True,
|
||||
"on_false": ["wild", 0],
|
||||
"on_true": ["img", 0],
|
||||
},
|
||||
},
|
||||
}
|
||||
r = TypeResolver(prompt)
|
||||
assert r.resolve_output_type("sw", 0) == "IMAGE"
|
||||
|
||||
|
||||
def test_matchtype_cycle_terminates_at_any(fake_nodes_module, TypeResolver):
|
||||
"""Two switches that feed each other must not recurse forever."""
|
||||
fake_nodes_module["TestSwitch"] = _make_switch_node_class()
|
||||
prompt = {
|
||||
"a": {"class_type": "TestSwitch", "inputs": {"switch": True, "on_true": ["b", 0]}},
|
||||
"b": {"class_type": "TestSwitch", "inputs": {"switch": True, "on_true": ["a", 0]}},
|
||||
}
|
||||
r = TypeResolver(prompt)
|
||||
# Must not raise / recurse forever; both resolve to AnyType.
|
||||
assert r.resolve_output_type("a", 0) == "*"
|
||||
assert r.resolve_output_type("b", 0) == "*"
|
||||
|
||||
|
||||
def test_matchtype_chain_resolves_through(fake_nodes_module, TypeResolver):
|
||||
"""A → B → C → IMAGE: chain must walk all the way."""
|
||||
fake_nodes_module["TestSwitch"] = _make_switch_node_class()
|
||||
fake_nodes_module["ImageSrc"] = _v1_node(("IMAGE",))
|
||||
prompt = {
|
||||
"src": {"class_type": "ImageSrc", "inputs": {}},
|
||||
"a": {"class_type": "TestSwitch", "inputs": {"switch": True, "on_true": ["src", 0]}},
|
||||
"b": {"class_type": "TestSwitch", "inputs": {"switch": True, "on_true": ["a", 0]}},
|
||||
"c": {"class_type": "TestSwitch", "inputs": {"switch": True, "on_true": ["b", 0]}},
|
||||
}
|
||||
r = TypeResolver(prompt)
|
||||
assert r.resolve_output_type("c", 0) == "IMAGE"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Input resolution and effective io_type peeling
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_resolve_input_type_literal_uses_declared(fake_nodes_module, TypeResolver):
|
||||
fake_nodes_module["Sink"] = _v1_node(("INT",), {"required": {"steps": ("INT",)}})
|
||||
prompt = {"n1": {"class_type": "Sink", "inputs": {"steps": 20}}}
|
||||
r = TypeResolver(prompt)
|
||||
assert r.resolve_input_type("n1", "steps") == "INT"
|
||||
|
||||
|
||||
def test_resolve_input_type_link(fake_nodes_module, TypeResolver):
|
||||
fake_nodes_module["Src"] = _v1_node(("LATENT",))
|
||||
fake_nodes_module["Sink"] = _v1_node(("INT",), {"required": {"x": ("*",)}})
|
||||
prompt = {
|
||||
"src": {"class_type": "Src", "inputs": {}},
|
||||
"sink": {"class_type": "Sink", "inputs": {"x": ["src", 0]}},
|
||||
}
|
||||
r = TypeResolver(prompt)
|
||||
assert r.resolve_input_type("sink", "x") == "LATENT"
|
||||
|
||||
|
||||
def test_effective_slot_type_peels_autogrow(fake_nodes_module, TypeResolver):
|
||||
from comfy_api.latest import io
|
||||
|
||||
class AutogrowImg(io.ComfyNode):
|
||||
@classmethod
|
||||
def define_schema(cls):
|
||||
template = io.Autogrow.TemplatePrefix(
|
||||
input=io.Image.Input("img"),
|
||||
prefix="img",
|
||||
min=1,
|
||||
)
|
||||
return io.Schema(
|
||||
node_id="AutogrowImg",
|
||||
inputs=[io.Autogrow.Input("imgs", template=template)],
|
||||
outputs=[io.Image.Output()],
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def execute(cls, imgs):
|
||||
return io.NodeOutput(None)
|
||||
|
||||
AutogrowImg.GET_SCHEMA()
|
||||
fake_nodes_module["AutogrowImg"] = AutogrowImg
|
||||
prompt = {"n1": {"class_type": "AutogrowImg", "inputs": {}}}
|
||||
r = TypeResolver(prompt)
|
||||
# The user-facing element type, not the autogrow wrapper.
|
||||
assert r.get_declared_slot_io_type("n1", "imgs") == "IMAGE"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# compute_live_input_types
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_effective_slot_type_on_v3_plain_input(fake_nodes_module, TypeResolver):
|
||||
"""V3 input that is neither Autogrow/DynamicSlot/DynamicCombo must still resolve.
|
||||
|
||||
Regression test: importing ``io`` from the public re-export skipped
|
||||
``DynamicSlot``, so an ``isinstance`` chain in ``_effective_io_type`` raised
|
||||
``AttributeError`` the first time it ran against a plain V3 input.
|
||||
"""
|
||||
from comfy_api.latest import io
|
||||
|
||||
class BoolSink(io.ComfyNode):
|
||||
@classmethod
|
||||
def define_schema(cls):
|
||||
return io.Schema(
|
||||
node_id="BoolSink",
|
||||
inputs=[io.Boolean.Input("flag")],
|
||||
outputs=[io.Boolean.Output()],
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def execute(cls, flag):
|
||||
return io.NodeOutput(flag)
|
||||
|
||||
BoolSink.GET_SCHEMA()
|
||||
fake_nodes_module["BoolSink"] = BoolSink
|
||||
prompt = {"n": {"class_type": "BoolSink", "inputs": {"flag": True}}}
|
||||
r = TypeResolver(prompt)
|
||||
assert r.get_declared_slot_io_type("n", "flag") == "BOOLEAN"
|
||||
assert r.compute_live_input_types("n") == {"flag": "BOOLEAN"}
|
||||
|
||||
|
||||
def test_effective_slot_type_peels_dynamic_slot(fake_nodes_module, TypeResolver):
|
||||
"""A DynamicSlot input reports its auto-derived slotType (union of `when` types)."""
|
||||
from comfy_api.latest import _io as io
|
||||
|
||||
class DSNode(io.ComfyNode):
|
||||
@classmethod
|
||||
def define_schema(cls):
|
||||
return io.Schema(
|
||||
node_id="DSNode",
|
||||
inputs=[
|
||||
io.DynamicSlot.Input("slot", options=[
|
||||
io.DynamicSlot.Option(when=io.Image, inputs=[]),
|
||||
io.DynamicSlot.Option(when=io.Latent, inputs=[]),
|
||||
]),
|
||||
],
|
||||
outputs=[io.String.Output()],
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def execute(cls, **kwargs):
|
||||
return io.NodeOutput("")
|
||||
|
||||
DSNode.GET_SCHEMA()
|
||||
fake_nodes_module["DSNode"] = DSNode
|
||||
prompt = {"n": {"class_type": "DSNode", "inputs": {}}}
|
||||
r = TypeResolver(prompt)
|
||||
assert r.get_declared_slot_io_type("n", "slot") == "IMAGE,LATENT"
|
||||
|
||||
|
||||
def test_compute_live_input_types_mixes_links_and_literals(fake_nodes_module, TypeResolver):
|
||||
fake_nodes_module["Src"] = _v1_node(("MODEL",))
|
||||
fake_nodes_module["Sink"] = _v1_node(
|
||||
("INT",),
|
||||
{"required": {"model": ("MODEL",), "steps": ("INT",)}},
|
||||
)
|
||||
prompt = {
|
||||
"src": {"class_type": "Src", "inputs": {}},
|
||||
"sink": {
|
||||
"class_type": "Sink",
|
||||
"inputs": {"model": ["src", 0], "steps": 20},
|
||||
},
|
||||
}
|
||||
r = TypeResolver(prompt)
|
||||
assert r.compute_live_input_types("sink") == {"model": "MODEL", "steps": "INT"}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Cache invalidation
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_invalidate_clears_cache(fake_nodes_module, TypeResolver):
|
||||
fake_nodes_module["Src"] = _v1_node(("IMAGE",))
|
||||
prompt = {"n1": {"class_type": "Src", "inputs": {}}}
|
||||
r = TypeResolver(prompt)
|
||||
assert r.resolve_output_type("n1", 0) == "IMAGE"
|
||||
# Mutate the underlying class and invalidate; the resolver must re-read.
|
||||
fake_nodes_module["Src"] = _v1_node(("LATENT",))
|
||||
r.invalidate()
|
||||
assert r.resolve_output_type("n1", 0) == "LATENT"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Malformed input robustness
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_malformed_link_does_not_crash(fake_nodes_module, TypeResolver):
|
||||
"""A link with a non-int slot index must not raise; resolver returns AnyType."""
|
||||
fake_nodes_module["Src"] = _v1_node(("IMAGE",))
|
||||
fake_nodes_module["Sink"] = _v1_node(("INT",), {"required": {"x": ("*",)}})
|
||||
prompt = {
|
||||
"src": {"class_type": "Src", "inputs": {}},
|
||||
# slot index sent as a string (common API JSON mistake)
|
||||
"sink": {"class_type": "Sink", "inputs": {"x": ["src", "0"]}},
|
||||
}
|
||||
r = TypeResolver(prompt)
|
||||
# Falls back to declared slot type (still "*"), no exception.
|
||||
assert r.resolve_input_type("sink", "x") == "*"
|
||||
|
||||
|
||||
def test_malformed_link_wrong_arity_does_not_crash(fake_nodes_module, TypeResolver):
|
||||
fake_nodes_module["Src"] = _v1_node(("IMAGE",))
|
||||
fake_nodes_module["Sink"] = _v1_node(("INT",), {"required": {"x": ("*",)}})
|
||||
prompt = {
|
||||
"src": {"class_type": "Src", "inputs": {}},
|
||||
"sink": {"class_type": "Sink", "inputs": {"x": ["src"]}}, # arity 1
|
||||
}
|
||||
r = TypeResolver(prompt)
|
||||
assert r.resolve_input_type("sink", "x") == "*"
|
||||
|
||||
|
||||
def test_direct_resolve_output_type_with_bad_slot_idx_returns_any(fake_nodes_module, TypeResolver):
|
||||
fake_nodes_module["Src"] = _v1_node(("IMAGE",))
|
||||
prompt = {"src": {"class_type": "Src", "inputs": {}}}
|
||||
r = TypeResolver(prompt)
|
||||
# type-wise these should be unreachable through normal validation but the
|
||||
# resolver must still degrade gracefully.
|
||||
assert r.resolve_output_type("src", "0") == "*"
|
||||
assert r.resolve_output_type("src", True) == "*" # bool is a subclass of int
|
||||
assert r.is_output_list("src", "0") is False
|
||||
|
||||
|
||||
def test_non_string_class_type_returns_any(fake_nodes_module, TypeResolver):
|
||||
prompt = {"n1": {"class_type": 42, "inputs": {}}}
|
||||
r = TypeResolver(prompt)
|
||||
assert r.resolve_output_type("n1", 0) == "*"
|
||||
|
||||
|
||||
def test_invalidate_node_only_clears_that_node(fake_nodes_module, TypeResolver):
|
||||
fake_nodes_module["SrcA"] = _v1_node(("IMAGE",))
|
||||
fake_nodes_module["SrcB"] = _v1_node(("LATENT",))
|
||||
prompt = {
|
||||
"a": {"class_type": "SrcA", "inputs": {}},
|
||||
"b": {"class_type": "SrcB", "inputs": {}},
|
||||
}
|
||||
r = TypeResolver(prompt)
|
||||
r.resolve_output_type("a", 0)
|
||||
r.resolve_output_type("b", 0)
|
||||
fake_nodes_module["SrcA"] = _v1_node(("MASK",))
|
||||
r.invalidate_node("a")
|
||||
assert r.resolve_output_type("a", 0) == "MASK"
|
||||
# b's cached result survives even though SrcB was unchanged
|
||||
assert ("b", 0) in r._output_cache
|
||||
Reference in New Issue
Block a user