feat(api): Agent App type S2a — unified agent_runtime_sessions table

Q2 decision: unify the workflow-only ``workflow_agent_runtime_sessions`` into
an owner-agnostic ``agent_runtime_sessions`` table serving both owners. Feature
is unreleased, so the old table is dropped (no data migration).

* ``AgentRuntimeSession`` model (table ``agent_runtime_sessions``) with an
  ``owner_type`` discriminator (workflow_run | conversation): workflow columns
  (workflow_id/run_id/node_id/binding_id/agent_config_snapshot_id/
  composition_layer_specs) and ``conversation_id`` are mutually-exclusive,
  enforced by two partial unique indexes. Back-compat aliases
  ``WorkflowAgentRuntimeSession`` / ``WorkflowAgentRuntimeSessionStatus`` keep
  the shipped lifecycle path (PR #36724) unchanged; the workflow store now sets
  ``owner_type=workflow_run``.
* New ``AgentAppRuntimeSessionStore`` (conversation-keyed) for the Agent App
  side of the same table: one conversation = one Agent session for multi-turn.
* Migration 121e7346074d (drop old + create unified) — applies and
  downgrade/upgrade round-trips clean on Postgres.

Tests: 6 new conversation-store ORM round-trip tests; 154 existing workflow
lifecycle + agent_backend tests still green against the unified table.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Yansong Zhang
2026-05-29 19:00:39 +08:00
parent 71512ad2be
commit 1fc9a7802c
8 changed files with 445 additions and 27 deletions

View File

View File

@ -0,0 +1,103 @@
"""Conversation-keyed Agent backend session store for the Agent App type.
Shares the unified ``agent_runtime_sessions`` table with the workflow Agent
Node store, but owns rows with ``owner_type = conversation``: one Agent App
conversation maps to one Agent session, so multi-turn chat re-enters the same
``session_snapshot``. Cross-conversation memory (PRD Global / Per app) is a
phase-2 concern and not modeled here.
"""
from __future__ import annotations
from dataclasses import dataclass
from agenton.compositor import CompositorSessionSnapshot
from sqlalchemy import select
from core.db.session_factory import session_factory
from libs.datetime_utils import naive_utc_now
from models.agent import (
AgentRuntimeSession,
AgentRuntimeSessionOwnerType,
AgentRuntimeSessionStatus,
)
@dataclass(frozen=True, slots=True)
class AgentAppSessionScope:
"""Identity of one Agent App conversation session."""
tenant_id: str
app_id: str
conversation_id: str
agent_id: str
class AgentAppRuntimeSessionStore:
"""Persists Agent backend session snapshots for Agent App conversations."""
def load_active_snapshot(self, scope: AgentAppSessionScope) -> CompositorSessionSnapshot | None:
with session_factory.create_session() as session:
row = session.scalar(self._active_stmt(scope))
if row is None:
return None
return CompositorSessionSnapshot.model_validate_json(row.session_snapshot)
def save_active_snapshot(
self,
*,
scope: AgentAppSessionScope,
backend_run_id: str,
snapshot: CompositorSessionSnapshot | None,
) -> None:
if snapshot is None:
return
snapshot_json = snapshot.model_dump_json()
with session_factory.create_session() as session:
row = session.scalar(self._scope_stmt(scope))
if row is None:
row = AgentRuntimeSession(
tenant_id=scope.tenant_id,
app_id=scope.app_id,
owner_type=AgentRuntimeSessionOwnerType.CONVERSATION,
agent_id=scope.agent_id,
conversation_id=scope.conversation_id,
backend_run_id=backend_run_id,
session_snapshot=snapshot_json,
composition_layer_specs="[]",
status=AgentRuntimeSessionStatus.ACTIVE,
)
session.add(row)
else:
row.backend_run_id = backend_run_id
row.session_snapshot = snapshot_json
row.status = AgentRuntimeSessionStatus.ACTIVE
row.cleaned_at = None
session.commit()
def mark_cleaned(self, *, scope: AgentAppSessionScope, backend_run_id: str | None = None) -> None:
with session_factory.create_session() as session:
row = session.scalar(self._active_stmt(scope))
if row is None:
return
if backend_run_id is not None:
row.backend_run_id = backend_run_id
row.status = AgentRuntimeSessionStatus.CLEANED
row.cleaned_at = naive_utc_now()
session.commit()
@staticmethod
def _scope_stmt(scope: AgentAppSessionScope):
return select(AgentRuntimeSession).where(
AgentRuntimeSession.owner_type == AgentRuntimeSessionOwnerType.CONVERSATION,
AgentRuntimeSession.tenant_id == scope.tenant_id,
AgentRuntimeSession.conversation_id == scope.conversation_id,
AgentRuntimeSession.agent_id == scope.agent_id,
)
@classmethod
def _active_stmt(cls, scope: AgentAppSessionScope):
return cls._scope_stmt(scope).where(AgentRuntimeSession.status == AgentRuntimeSessionStatus.ACTIVE)
__all__ = ["AgentAppRuntimeSessionStore", "AgentAppSessionScope"]

View File

@ -10,6 +10,7 @@ from clients.agent_backend.request_builder import CleanupLayerSpec
from core.db.session_factory import session_factory
from libs.datetime_utils import naive_utc_now
from models.agent import (
AgentRuntimeSessionOwnerType,
WorkflowAgentRuntimeSession,
WorkflowAgentRuntimeSessionStatus,
)
@ -125,6 +126,7 @@ class WorkflowAgentRuntimeSessionStore:
row = WorkflowAgentRuntimeSession(
tenant_id=scope.tenant_id,
app_id=scope.app_id,
owner_type=AgentRuntimeSessionOwnerType.WORKFLOW_RUN,
workflow_id=scope.workflow_id,
workflow_run_id=scope.workflow_run_id,
node_id=scope.node_id,

View File

@ -0,0 +1,140 @@
"""unify agent runtime sessions table
Revision ID: 121e7346074d
Revises: 7885bd53f9a9
Create Date: 2026-05-29 10:54:19.400054
Unifies the workflow-only ``workflow_agent_runtime_sessions`` table into an
owner-agnostic ``agent_runtime_sessions`` table that serves both workflow
Agent Node runs (owner_type=workflow_run) and Agent App conversations
(owner_type=conversation). The feature is unreleased, so the old table is
dropped rather than migrated (no data to preserve).
"""
import sqlalchemy as sa
from alembic import op
import models as models
# revision identifiers, used by Alembic.
revision = "121e7346074d"
down_revision = "7885bd53f9a9"
branch_labels = None
depends_on = None
def _is_pg() -> bool:
return op.get_bind().dialect.name == "postgresql"
def _uuid_column(name: str, *, nullable: bool = False, primary_key: bool = False) -> sa.Column:
kwargs: dict[str, object] = {"nullable": nullable, "primary_key": primary_key}
if primary_key and _is_pg():
kwargs["server_default"] = sa.text("uuidv7()")
return sa.Column(name, models.types.StringUUID(), **kwargs)
def upgrade() -> None:
# Drop the unreleased workflow-only table; recreate as the unified table.
op.drop_table("workflow_agent_runtime_sessions")
op.create_table(
"agent_runtime_sessions",
_uuid_column("id", primary_key=True),
sa.Column("tenant_id", models.types.StringUUID(), nullable=False),
sa.Column("app_id", models.types.StringUUID(), nullable=False),
sa.Column("owner_type", sa.String(length=32), nullable=False),
sa.Column("agent_id", models.types.StringUUID(), nullable=False),
sa.Column("backend_run_id", sa.String(length=255), nullable=True),
sa.Column("session_snapshot", models.types.LongText(), nullable=False),
# Workflow-owner columns (NULL for conversation owner).
sa.Column("workflow_id", models.types.StringUUID(), nullable=True),
sa.Column("workflow_run_id", models.types.StringUUID(), nullable=True),
sa.Column("node_id", sa.String(length=255), nullable=True),
sa.Column("node_execution_id", sa.String(length=255), nullable=True),
sa.Column("binding_id", models.types.StringUUID(), nullable=True),
sa.Column("agent_config_snapshot_id", models.types.StringUUID(), nullable=True),
# MySQL rejects defaults on TEXT; the ORM always supplies this value.
sa.Column("composition_layer_specs", models.types.LongText(), nullable=False),
# Conversation-owner column (NULL for workflow owner).
sa.Column("conversation_id", models.types.StringUUID(), nullable=True),
sa.Column("status", sa.String(length=32), server_default=sa.text("'active'"), nullable=False),
sa.Column("cleaned_at", sa.DateTime(), nullable=True),
sa.Column("created_at", sa.DateTime(), server_default=sa.func.current_timestamp(), nullable=False),
sa.Column("updated_at", sa.DateTime(), server_default=sa.func.current_timestamp(), nullable=False),
sa.PrimaryKeyConstraint("id", name=op.f("agent_runtime_session_pkey")),
)
with op.batch_alter_table("agent_runtime_sessions", schema=None) as batch_op:
batch_op.create_index(
"agent_runtime_session_workflow_scope_unique",
["tenant_id", "workflow_run_id", "node_id", "binding_id", "agent_id"],
unique=True,
postgresql_where=sa.text("workflow_run_id IS NOT NULL"),
)
batch_op.create_index(
"agent_runtime_session_conversation_scope_unique",
["tenant_id", "conversation_id", "agent_id"],
unique=True,
postgresql_where=sa.text("conversation_id IS NOT NULL"),
)
batch_op.create_index(
"agent_runtime_session_workflow_lookup_idx",
["tenant_id", "workflow_run_id", "node_id", "status"],
)
batch_op.create_index(
"agent_runtime_session_conversation_lookup_idx",
["tenant_id", "conversation_id", "status"],
)
batch_op.create_index("agent_runtime_session_backend_run_idx", ["backend_run_id"])
def downgrade() -> None:
with op.batch_alter_table("agent_runtime_sessions", schema=None) as batch_op:
batch_op.drop_index("agent_runtime_session_backend_run_idx")
batch_op.drop_index("agent_runtime_session_conversation_lookup_idx")
batch_op.drop_index("agent_runtime_session_workflow_lookup_idx")
batch_op.drop_index(
"agent_runtime_session_conversation_scope_unique",
postgresql_where=sa.text("conversation_id IS NOT NULL"),
)
batch_op.drop_index(
"agent_runtime_session_workflow_scope_unique",
postgresql_where=sa.text("workflow_run_id IS NOT NULL"),
)
op.drop_table("agent_runtime_sessions")
op.create_table(
"workflow_agent_runtime_sessions",
_uuid_column("id", primary_key=True),
sa.Column("tenant_id", models.types.StringUUID(), nullable=False),
sa.Column("app_id", models.types.StringUUID(), nullable=False),
sa.Column("workflow_id", models.types.StringUUID(), nullable=False),
sa.Column("workflow_run_id", models.types.StringUUID(), nullable=False),
sa.Column("node_id", sa.String(length=255), nullable=False),
sa.Column("node_execution_id", sa.String(length=255), nullable=True),
sa.Column("binding_id", models.types.StringUUID(), nullable=False),
sa.Column("agent_id", models.types.StringUUID(), nullable=False),
sa.Column("agent_config_snapshot_id", models.types.StringUUID(), nullable=False),
sa.Column("backend_run_id", sa.String(length=255), nullable=True),
sa.Column("session_snapshot", models.types.LongText(), nullable=False),
sa.Column("composition_layer_specs", models.types.LongText(), nullable=False),
sa.Column("status", sa.String(length=32), server_default=sa.text("'active'"), nullable=False),
sa.Column("cleaned_at", sa.DateTime(), nullable=True),
sa.Column("created_at", sa.DateTime(), server_default=sa.func.current_timestamp(), nullable=False),
sa.Column("updated_at", sa.DateTime(), server_default=sa.func.current_timestamp(), nullable=False),
sa.PrimaryKeyConstraint("id", name=op.f("workflow_agent_runtime_session_pkey")),
sa.UniqueConstraint(
"tenant_id",
"workflow_run_id",
"node_id",
"binding_id",
"agent_id",
name=op.f("workflow_agent_runtime_session_scope_unique"),
),
)
with op.batch_alter_table("workflow_agent_runtime_sessions", schema=None) as batch_op:
batch_op.create_index(
"workflow_agent_runtime_session_lookup_idx",
["tenant_id", "workflow_run_id", "node_id", "status"],
)
batch_op.create_index("workflow_agent_runtime_session_backend_run_idx", ["backend_run_id"])

View File

@ -15,6 +15,9 @@ from .agent import (
AgentConfigSnapshot,
AgentIconType,
AgentKind,
AgentRuntimeSession,
AgentRuntimeSessionOwnerType,
AgentRuntimeSessionStatus,
AgentScope,
AgentSource,
AgentStatus,
@ -146,6 +149,9 @@ __all__ = [
"AgentConfigSnapshot",
"AgentIconType",
"AgentKind",
"AgentRuntimeSession",
"AgentRuntimeSessionOwnerType",
"AgentRuntimeSessionStatus",
"AgentScope",
"AgentSource",
"AgentStatus",

View File

@ -92,15 +92,33 @@ class WorkflowAgentBindingType(StrEnum):
INLINE_AGENT = "inline_agent"
class WorkflowAgentRuntimeSessionStatus(StrEnum):
"""Lifecycle state of an Agent backend session snapshot owned by a workflow run."""
class AgentRuntimeSessionStatus(StrEnum):
"""Lifecycle state of an Agent backend session snapshot.
# Snapshot can be reused by a later Agent run in the same workflow run.
Owner-agnostic: applies both to workflow Agent Node runs (owner =
workflow_run) and to Agent App conversations (owner = conversation).
"""
# Snapshot can be reused by a later Agent run in the same session.
ACTIVE = "active"
# Snapshot has been retired and must not be submitted to Agent backend again.
CLEANED = "cleaned"
class AgentRuntimeSessionOwnerType(StrEnum):
"""Which product surface owns an Agent runtime session row."""
# Owned by one workflow Agent Node execution scope.
WORKFLOW_RUN = "workflow_run"
# Owned by one Agent App conversation (multi-turn chat).
CONVERSATION = "conversation"
# Back-compat alias: the workflow lifecycle code (shipped in PR #36724) imports
# the old name. Kept so unifying the table does not churn that path.
WorkflowAgentRuntimeSessionStatus = AgentRuntimeSessionStatus
class Agent(DefaultFieldsMixin, Base):
"""Workspace-scoped Agent identity used by Agent Roster and workflow-only agents."""
@ -284,54 +302,88 @@ class WorkflowAgentNodeBinding(DefaultFieldsMixin, Base):
return dict(self.node_job_config)
class WorkflowAgentRuntimeSession(DefaultFieldsMixin, Base):
"""Persisted Agent backend session snapshot for one workflow Agent node execution scope.
class AgentRuntimeSession(DefaultFieldsMixin, Base):
"""Persisted Agent backend session snapshot, owner-agnostic.
The snapshot is runtime state returned by Agent backend. It is intentionally
separate from Agent Soul snapshots and workflow node-job config.
One unified table serves both owners (decision Q2):
- workflow Agent Node runs: ``owner_type = workflow_run``; the
``workflow_id / workflow_run_id / node_id / binding_id /
agent_config_snapshot_id / composition_layer_specs`` columns are set.
- Agent App conversations: ``owner_type = conversation``; the
``conversation_id`` column is set and the workflow columns stay NULL.
The snapshot is runtime state returned by Agent backend, kept separate from
Agent Soul snapshots and workflow node-job config.
"""
__tablename__ = "workflow_agent_runtime_sessions"
__tablename__ = "agent_runtime_sessions"
__table_args__ = (
sa.PrimaryKeyConstraint("id", name="workflow_agent_runtime_session_pkey"),
UniqueConstraint(
sa.PrimaryKeyConstraint("id", name="agent_runtime_session_pkey"),
# Workflow owner uniqueness (partial: only rows with a workflow_run_id).
Index(
"agent_runtime_session_workflow_scope_unique",
"tenant_id",
"workflow_run_id",
"node_id",
"binding_id",
"agent_id",
name="workflow_agent_runtime_session_scope_unique",
unique=True,
postgresql_where=sa.text("workflow_run_id IS NOT NULL"),
),
# Conversation owner uniqueness (partial: only rows with a conversation_id).
Index(
"agent_runtime_session_conversation_scope_unique",
"tenant_id",
"conversation_id",
"agent_id",
unique=True,
postgresql_where=sa.text("conversation_id IS NOT NULL"),
),
Index(
"workflow_agent_runtime_session_lookup_idx",
"agent_runtime_session_workflow_lookup_idx",
"tenant_id",
"workflow_run_id",
"node_id",
"status",
),
Index("workflow_agent_runtime_session_backend_run_idx", "backend_run_id"),
Index(
"agent_runtime_session_conversation_lookup_idx",
"tenant_id",
"conversation_id",
"status",
),
Index("agent_runtime_session_backend_run_idx", "backend_run_id"),
)
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
workflow_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
workflow_run_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
node_id: Mapped[str] = mapped_column(String(255), nullable=False)
node_execution_id: Mapped[str | None] = mapped_column(String(255), nullable=True)
binding_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
owner_type: Mapped[AgentRuntimeSessionOwnerType] = mapped_column(
EnumText(AgentRuntimeSessionOwnerType, length=32), nullable=False
)
agent_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
agent_config_snapshot_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
backend_run_id: Mapped[str | None] = mapped_column(String(255), nullable=True)
session_snapshot: Mapped[str] = mapped_column(LongText, nullable=False)
# JSON-encoded list of ``WorkflowAgentSessionLayerSpec`` ({name, type, deps,
# config}). Drives Agent backend cleanup-only runs: the agenton compositor
# rejects a session snapshot whose layer names do not match the cleanup
# composition, so we must replay the same layer graph (minus credential-
# bearing plugin layers) when issuing the cleanup request.
# Workflow-owner columns (NULL for conversation owner).
workflow_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True)
workflow_run_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True)
node_id: Mapped[str | None] = mapped_column(String(255), nullable=True)
node_execution_id: Mapped[str | None] = mapped_column(String(255), nullable=True)
binding_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True)
agent_config_snapshot_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True)
# JSON-encoded list of cleanup layer specs ({name, type, deps, config}).
# Drives Agent backend cleanup-only runs: the agenton compositor rejects a
# session snapshot whose layer names do not match the cleanup composition,
# so we replay the same layer graph (minus credential-bearing plugin layers).
composition_layer_specs: Mapped[str] = mapped_column(LongText, nullable=False, server_default="[]")
status: Mapped[WorkflowAgentRuntimeSessionStatus] = mapped_column(
EnumText(WorkflowAgentRuntimeSessionStatus, length=32),
# Conversation-owner column (NULL for workflow owner).
conversation_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True)
status: Mapped[AgentRuntimeSessionStatus] = mapped_column(
EnumText(AgentRuntimeSessionStatus, length=32),
nullable=False,
default=WorkflowAgentRuntimeSessionStatus.ACTIVE,
default=AgentRuntimeSessionStatus.ACTIVE,
)
cleaned_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
# Back-compat alias for the shipped workflow lifecycle code (PR #36724).
WorkflowAgentRuntimeSession = AgentRuntimeSession

View File

@ -0,0 +1,115 @@
"""Unit tests for the conversation-keyed Agent App session store.
Exercises the real ORM round-trip against the project's in-memory SQLite engine
(per-test create/drop of the unified ``agent_runtime_sessions`` table), so the
conversation owner path is verified without Postgres.
"""
from __future__ import annotations
from collections.abc import Generator
import pytest
from agenton.compositor import CompositorSessionSnapshot
from agenton.compositor.schemas import LayerSessionSnapshot
from agenton.layers.base import LifecycleState
from sqlalchemy import delete
from core.app.apps.agent_app.session_store import AgentAppRuntimeSessionStore, AgentAppSessionScope
from core.db.session_factory import session_factory
from models.agent import AgentRuntimeSession, AgentRuntimeSessionOwnerType, AgentRuntimeSessionStatus
def _scope(conversation_id: str = "conv-1", agent_id: str = "agent-1") -> AgentAppSessionScope:
return AgentAppSessionScope(
tenant_id="tenant-1",
app_id="app-1",
conversation_id=conversation_id,
agent_id=agent_id,
)
def _snapshot(messages: int = 1) -> CompositorSessionSnapshot:
return CompositorSessionSnapshot(
layers=[
LayerSessionSnapshot(
name="history",
lifecycle_state=LifecycleState.SUSPENDED,
runtime_state={"messages": [{"role": "user", "content": f"m{i}"} for i in range(messages)]},
)
]
)
@pytest.fixture(autouse=True)
def _create_table() -> Generator[None, None, None]:
engine = session_factory.get_session_maker().kw["bind"]
AgentRuntimeSession.__table__.create(bind=engine, checkfirst=True)
yield
with session_factory.create_session() as session:
session.execute(delete(AgentRuntimeSession))
session.commit()
AgentRuntimeSession.__table__.drop(bind=engine, checkfirst=True)
def test_load_returns_none_when_no_row():
assert AgentAppRuntimeSessionStore().load_active_snapshot(_scope()) is None
def test_save_creates_conversation_owned_row_and_round_trips():
store = AgentAppRuntimeSessionStore()
store.save_active_snapshot(scope=_scope(), backend_run_id="run-1", snapshot=_snapshot(messages=2))
loaded = store.load_active_snapshot(_scope())
assert loaded is not None
assert loaded.layers[0].runtime_state["messages"] == [
{"role": "user", "content": "m0"},
{"role": "user", "content": "m1"},
]
with session_factory.create_session() as session:
row = session.query(AgentRuntimeSession).one()
assert row.owner_type == AgentRuntimeSessionOwnerType.CONVERSATION
assert row.conversation_id == "conv-1"
assert row.workflow_run_id is None # conversation owner leaves workflow cols NULL
assert row.backend_run_id == "run-1"
def test_save_is_noop_when_snapshot_missing():
store = AgentAppRuntimeSessionStore()
store.save_active_snapshot(scope=_scope(), backend_run_id="run-x", snapshot=None)
with session_factory.create_session() as session:
assert session.query(AgentRuntimeSession).count() == 0
def test_second_turn_updates_same_conversation_row():
store = AgentAppRuntimeSessionStore()
store.save_active_snapshot(scope=_scope(), backend_run_id="run-1", snapshot=_snapshot(messages=1))
store.save_active_snapshot(scope=_scope(), backend_run_id="run-2", snapshot=_snapshot(messages=3))
with session_factory.create_session() as session:
rows = session.query(AgentRuntimeSession).all()
assert len(rows) == 1
assert rows[0].backend_run_id == "run-2"
def test_mark_cleaned_then_load_returns_none_and_save_resurrects():
store = AgentAppRuntimeSessionStore()
store.save_active_snapshot(scope=_scope(), backend_run_id="run-1", snapshot=_snapshot())
store.mark_cleaned(scope=_scope(), backend_run_id="cleanup-1")
assert store.load_active_snapshot(_scope()) is None
# Re-entry revives the row.
store.save_active_snapshot(scope=_scope(), backend_run_id="run-2", snapshot=_snapshot(messages=2))
with session_factory.create_session() as session:
row = session.query(AgentRuntimeSession).one()
assert row.status == AgentRuntimeSessionStatus.ACTIVE
assert row.cleaned_at is None
assert row.backend_run_id == "run-2"
def test_distinct_conversations_do_not_collide():
store = AgentAppRuntimeSessionStore()
store.save_active_snapshot(scope=_scope(conversation_id="conv-A"), backend_run_id="a", snapshot=_snapshot())
store.save_active_snapshot(scope=_scope(conversation_id="conv-B"), backend_run_id="b", snapshot=_snapshot())
assert store.load_active_snapshot(_scope(conversation_id="conv-A")) is not None
assert store.load_active_snapshot(_scope(conversation_id="conv-B")) is not None
with session_factory.create_session() as session:
assert session.query(AgentRuntimeSession).count() == 2