diff --git a/api/core/app/apps/agent_app/__init__.py b/api/core/app/apps/agent_app/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/api/core/app/apps/agent_app/session_store.py b/api/core/app/apps/agent_app/session_store.py new file mode 100644 index 0000000000..13927b3056 --- /dev/null +++ b/api/core/app/apps/agent_app/session_store.py @@ -0,0 +1,103 @@ +"""Conversation-keyed Agent backend session store for the Agent App type. + +Shares the unified ``agent_runtime_sessions`` table with the workflow Agent +Node store, but owns rows with ``owner_type = conversation``: one Agent App +conversation maps to one Agent session, so multi-turn chat re-enters the same +``session_snapshot``. Cross-conversation memory (PRD Global / Per app) is a +phase-2 concern and not modeled here. +""" + +from __future__ import annotations + +from dataclasses import dataclass + +from agenton.compositor import CompositorSessionSnapshot +from sqlalchemy import select + +from core.db.session_factory import session_factory +from libs.datetime_utils import naive_utc_now +from models.agent import ( + AgentRuntimeSession, + AgentRuntimeSessionOwnerType, + AgentRuntimeSessionStatus, +) + + +@dataclass(frozen=True, slots=True) +class AgentAppSessionScope: + """Identity of one Agent App conversation session.""" + + tenant_id: str + app_id: str + conversation_id: str + agent_id: str + + +class AgentAppRuntimeSessionStore: + """Persists Agent backend session snapshots for Agent App conversations.""" + + def load_active_snapshot(self, scope: AgentAppSessionScope) -> CompositorSessionSnapshot | None: + with session_factory.create_session() as session: + row = session.scalar(self._active_stmt(scope)) + if row is None: + return None + return CompositorSessionSnapshot.model_validate_json(row.session_snapshot) + + def save_active_snapshot( + self, + *, + scope: AgentAppSessionScope, + backend_run_id: str, + snapshot: CompositorSessionSnapshot | None, + ) -> None: + if snapshot is None: + return + snapshot_json = snapshot.model_dump_json() + with session_factory.create_session() as session: + row = session.scalar(self._scope_stmt(scope)) + if row is None: + row = AgentRuntimeSession( + tenant_id=scope.tenant_id, + app_id=scope.app_id, + owner_type=AgentRuntimeSessionOwnerType.CONVERSATION, + agent_id=scope.agent_id, + conversation_id=scope.conversation_id, + backend_run_id=backend_run_id, + session_snapshot=snapshot_json, + composition_layer_specs="[]", + status=AgentRuntimeSessionStatus.ACTIVE, + ) + session.add(row) + else: + row.backend_run_id = backend_run_id + row.session_snapshot = snapshot_json + row.status = AgentRuntimeSessionStatus.ACTIVE + row.cleaned_at = None + session.commit() + + def mark_cleaned(self, *, scope: AgentAppSessionScope, backend_run_id: str | None = None) -> None: + with session_factory.create_session() as session: + row = session.scalar(self._active_stmt(scope)) + if row is None: + return + if backend_run_id is not None: + row.backend_run_id = backend_run_id + row.status = AgentRuntimeSessionStatus.CLEANED + row.cleaned_at = naive_utc_now() + session.commit() + + @staticmethod + def _scope_stmt(scope: AgentAppSessionScope): + return select(AgentRuntimeSession).where( + AgentRuntimeSession.owner_type == AgentRuntimeSessionOwnerType.CONVERSATION, + AgentRuntimeSession.tenant_id == scope.tenant_id, + AgentRuntimeSession.conversation_id == scope.conversation_id, + AgentRuntimeSession.agent_id == scope.agent_id, + ) + + @classmethod + def _active_stmt(cls, scope: AgentAppSessionScope): + return cls._scope_stmt(scope).where(AgentRuntimeSession.status == AgentRuntimeSessionStatus.ACTIVE) + + +__all__ = ["AgentAppRuntimeSessionStore", "AgentAppSessionScope"] diff --git a/api/core/workflow/nodes/agent_v2/session_store.py b/api/core/workflow/nodes/agent_v2/session_store.py index 9c45742489..01bc031268 100644 --- a/api/core/workflow/nodes/agent_v2/session_store.py +++ b/api/core/workflow/nodes/agent_v2/session_store.py @@ -10,6 +10,7 @@ from clients.agent_backend.request_builder import CleanupLayerSpec from core.db.session_factory import session_factory from libs.datetime_utils import naive_utc_now from models.agent import ( + AgentRuntimeSessionOwnerType, WorkflowAgentRuntimeSession, WorkflowAgentRuntimeSessionStatus, ) @@ -125,6 +126,7 @@ class WorkflowAgentRuntimeSessionStore: row = WorkflowAgentRuntimeSession( tenant_id=scope.tenant_id, app_id=scope.app_id, + owner_type=AgentRuntimeSessionOwnerType.WORKFLOW_RUN, workflow_id=scope.workflow_id, workflow_run_id=scope.workflow_run_id, node_id=scope.node_id, diff --git a/api/migrations/versions/2026_05_29_1054-121e7346074d_unify_agent_runtime_sessions_table.py b/api/migrations/versions/2026_05_29_1054-121e7346074d_unify_agent_runtime_sessions_table.py new file mode 100644 index 0000000000..4fca1e1c2d --- /dev/null +++ b/api/migrations/versions/2026_05_29_1054-121e7346074d_unify_agent_runtime_sessions_table.py @@ -0,0 +1,140 @@ +"""unify agent runtime sessions table + +Revision ID: 121e7346074d +Revises: 7885bd53f9a9 +Create Date: 2026-05-29 10:54:19.400054 + +Unifies the workflow-only ``workflow_agent_runtime_sessions`` table into an +owner-agnostic ``agent_runtime_sessions`` table that serves both workflow +Agent Node runs (owner_type=workflow_run) and Agent App conversations +(owner_type=conversation). The feature is unreleased, so the old table is +dropped rather than migrated (no data to preserve). +""" + +import sqlalchemy as sa +from alembic import op + +import models as models + +# revision identifiers, used by Alembic. +revision = "121e7346074d" +down_revision = "7885bd53f9a9" +branch_labels = None +depends_on = None + + +def _is_pg() -> bool: + return op.get_bind().dialect.name == "postgresql" + + +def _uuid_column(name: str, *, nullable: bool = False, primary_key: bool = False) -> sa.Column: + kwargs: dict[str, object] = {"nullable": nullable, "primary_key": primary_key} + if primary_key and _is_pg(): + kwargs["server_default"] = sa.text("uuidv7()") + return sa.Column(name, models.types.StringUUID(), **kwargs) + + +def upgrade() -> None: + # Drop the unreleased workflow-only table; recreate as the unified table. + op.drop_table("workflow_agent_runtime_sessions") + + op.create_table( + "agent_runtime_sessions", + _uuid_column("id", primary_key=True), + sa.Column("tenant_id", models.types.StringUUID(), nullable=False), + sa.Column("app_id", models.types.StringUUID(), nullable=False), + sa.Column("owner_type", sa.String(length=32), nullable=False), + sa.Column("agent_id", models.types.StringUUID(), nullable=False), + sa.Column("backend_run_id", sa.String(length=255), nullable=True), + sa.Column("session_snapshot", models.types.LongText(), nullable=False), + # Workflow-owner columns (NULL for conversation owner). + sa.Column("workflow_id", models.types.StringUUID(), nullable=True), + sa.Column("workflow_run_id", models.types.StringUUID(), nullable=True), + sa.Column("node_id", sa.String(length=255), nullable=True), + sa.Column("node_execution_id", sa.String(length=255), nullable=True), + sa.Column("binding_id", models.types.StringUUID(), nullable=True), + sa.Column("agent_config_snapshot_id", models.types.StringUUID(), nullable=True), + # MySQL rejects defaults on TEXT; the ORM always supplies this value. + sa.Column("composition_layer_specs", models.types.LongText(), nullable=False), + # Conversation-owner column (NULL for workflow owner). + sa.Column("conversation_id", models.types.StringUUID(), nullable=True), + sa.Column("status", sa.String(length=32), server_default=sa.text("'active'"), nullable=False), + sa.Column("cleaned_at", sa.DateTime(), nullable=True), + sa.Column("created_at", sa.DateTime(), server_default=sa.func.current_timestamp(), nullable=False), + sa.Column("updated_at", sa.DateTime(), server_default=sa.func.current_timestamp(), nullable=False), + sa.PrimaryKeyConstraint("id", name=op.f("agent_runtime_session_pkey")), + ) + with op.batch_alter_table("agent_runtime_sessions", schema=None) as batch_op: + batch_op.create_index( + "agent_runtime_session_workflow_scope_unique", + ["tenant_id", "workflow_run_id", "node_id", "binding_id", "agent_id"], + unique=True, + postgresql_where=sa.text("workflow_run_id IS NOT NULL"), + ) + batch_op.create_index( + "agent_runtime_session_conversation_scope_unique", + ["tenant_id", "conversation_id", "agent_id"], + unique=True, + postgresql_where=sa.text("conversation_id IS NOT NULL"), + ) + batch_op.create_index( + "agent_runtime_session_workflow_lookup_idx", + ["tenant_id", "workflow_run_id", "node_id", "status"], + ) + batch_op.create_index( + "agent_runtime_session_conversation_lookup_idx", + ["tenant_id", "conversation_id", "status"], + ) + batch_op.create_index("agent_runtime_session_backend_run_idx", ["backend_run_id"]) + + +def downgrade() -> None: + with op.batch_alter_table("agent_runtime_sessions", schema=None) as batch_op: + batch_op.drop_index("agent_runtime_session_backend_run_idx") + batch_op.drop_index("agent_runtime_session_conversation_lookup_idx") + batch_op.drop_index("agent_runtime_session_workflow_lookup_idx") + batch_op.drop_index( + "agent_runtime_session_conversation_scope_unique", + postgresql_where=sa.text("conversation_id IS NOT NULL"), + ) + batch_op.drop_index( + "agent_runtime_session_workflow_scope_unique", + postgresql_where=sa.text("workflow_run_id IS NOT NULL"), + ) + op.drop_table("agent_runtime_sessions") + + op.create_table( + "workflow_agent_runtime_sessions", + _uuid_column("id", primary_key=True), + sa.Column("tenant_id", models.types.StringUUID(), nullable=False), + sa.Column("app_id", models.types.StringUUID(), nullable=False), + sa.Column("workflow_id", models.types.StringUUID(), nullable=False), + sa.Column("workflow_run_id", models.types.StringUUID(), nullable=False), + sa.Column("node_id", sa.String(length=255), nullable=False), + sa.Column("node_execution_id", sa.String(length=255), nullable=True), + sa.Column("binding_id", models.types.StringUUID(), nullable=False), + sa.Column("agent_id", models.types.StringUUID(), nullable=False), + sa.Column("agent_config_snapshot_id", models.types.StringUUID(), nullable=False), + sa.Column("backend_run_id", sa.String(length=255), nullable=True), + sa.Column("session_snapshot", models.types.LongText(), nullable=False), + sa.Column("composition_layer_specs", models.types.LongText(), nullable=False), + sa.Column("status", sa.String(length=32), server_default=sa.text("'active'"), nullable=False), + sa.Column("cleaned_at", sa.DateTime(), nullable=True), + sa.Column("created_at", sa.DateTime(), server_default=sa.func.current_timestamp(), nullable=False), + sa.Column("updated_at", sa.DateTime(), server_default=sa.func.current_timestamp(), nullable=False), + sa.PrimaryKeyConstraint("id", name=op.f("workflow_agent_runtime_session_pkey")), + sa.UniqueConstraint( + "tenant_id", + "workflow_run_id", + "node_id", + "binding_id", + "agent_id", + name=op.f("workflow_agent_runtime_session_scope_unique"), + ), + ) + with op.batch_alter_table("workflow_agent_runtime_sessions", schema=None) as batch_op: + batch_op.create_index( + "workflow_agent_runtime_session_lookup_idx", + ["tenant_id", "workflow_run_id", "node_id", "status"], + ) + batch_op.create_index("workflow_agent_runtime_session_backend_run_idx", ["backend_run_id"]) diff --git a/api/models/__init__.py b/api/models/__init__.py index 47962cad21..6cd50bd0db 100644 --- a/api/models/__init__.py +++ b/api/models/__init__.py @@ -15,6 +15,9 @@ from .agent import ( AgentConfigSnapshot, AgentIconType, AgentKind, + AgentRuntimeSession, + AgentRuntimeSessionOwnerType, + AgentRuntimeSessionStatus, AgentScope, AgentSource, AgentStatus, @@ -146,6 +149,9 @@ __all__ = [ "AgentConfigSnapshot", "AgentIconType", "AgentKind", + "AgentRuntimeSession", + "AgentRuntimeSessionOwnerType", + "AgentRuntimeSessionStatus", "AgentScope", "AgentSource", "AgentStatus", diff --git a/api/models/agent.py b/api/models/agent.py index 684efd513d..25fb990837 100644 --- a/api/models/agent.py +++ b/api/models/agent.py @@ -92,15 +92,33 @@ class WorkflowAgentBindingType(StrEnum): INLINE_AGENT = "inline_agent" -class WorkflowAgentRuntimeSessionStatus(StrEnum): - """Lifecycle state of an Agent backend session snapshot owned by a workflow run.""" +class AgentRuntimeSessionStatus(StrEnum): + """Lifecycle state of an Agent backend session snapshot. - # Snapshot can be reused by a later Agent run in the same workflow run. + Owner-agnostic: applies both to workflow Agent Node runs (owner = + workflow_run) and to Agent App conversations (owner = conversation). + """ + + # Snapshot can be reused by a later Agent run in the same session. ACTIVE = "active" # Snapshot has been retired and must not be submitted to Agent backend again. CLEANED = "cleaned" +class AgentRuntimeSessionOwnerType(StrEnum): + """Which product surface owns an Agent runtime session row.""" + + # Owned by one workflow Agent Node execution scope. + WORKFLOW_RUN = "workflow_run" + # Owned by one Agent App conversation (multi-turn chat). + CONVERSATION = "conversation" + + +# Back-compat alias: the workflow lifecycle code (shipped in PR #36724) imports +# the old name. Kept so unifying the table does not churn that path. +WorkflowAgentRuntimeSessionStatus = AgentRuntimeSessionStatus + + class Agent(DefaultFieldsMixin, Base): """Workspace-scoped Agent identity used by Agent Roster and workflow-only agents.""" @@ -284,54 +302,88 @@ class WorkflowAgentNodeBinding(DefaultFieldsMixin, Base): return dict(self.node_job_config) -class WorkflowAgentRuntimeSession(DefaultFieldsMixin, Base): - """Persisted Agent backend session snapshot for one workflow Agent node execution scope. +class AgentRuntimeSession(DefaultFieldsMixin, Base): + """Persisted Agent backend session snapshot, owner-agnostic. - The snapshot is runtime state returned by Agent backend. It is intentionally - separate from Agent Soul snapshots and workflow node-job config. + One unified table serves both owners (decision Q2): + - workflow Agent Node runs: ``owner_type = workflow_run``; the + ``workflow_id / workflow_run_id / node_id / binding_id / + agent_config_snapshot_id / composition_layer_specs`` columns are set. + - Agent App conversations: ``owner_type = conversation``; the + ``conversation_id`` column is set and the workflow columns stay NULL. + + The snapshot is runtime state returned by Agent backend, kept separate from + Agent Soul snapshots and workflow node-job config. """ - __tablename__ = "workflow_agent_runtime_sessions" + __tablename__ = "agent_runtime_sessions" __table_args__ = ( - sa.PrimaryKeyConstraint("id", name="workflow_agent_runtime_session_pkey"), - UniqueConstraint( + sa.PrimaryKeyConstraint("id", name="agent_runtime_session_pkey"), + # Workflow owner uniqueness (partial: only rows with a workflow_run_id). + Index( + "agent_runtime_session_workflow_scope_unique", "tenant_id", "workflow_run_id", "node_id", "binding_id", "agent_id", - name="workflow_agent_runtime_session_scope_unique", + unique=True, + postgresql_where=sa.text("workflow_run_id IS NOT NULL"), + ), + # Conversation owner uniqueness (partial: only rows with a conversation_id). + Index( + "agent_runtime_session_conversation_scope_unique", + "tenant_id", + "conversation_id", + "agent_id", + unique=True, + postgresql_where=sa.text("conversation_id IS NOT NULL"), ), Index( - "workflow_agent_runtime_session_lookup_idx", + "agent_runtime_session_workflow_lookup_idx", "tenant_id", "workflow_run_id", "node_id", "status", ), - Index("workflow_agent_runtime_session_backend_run_idx", "backend_run_id"), + Index( + "agent_runtime_session_conversation_lookup_idx", + "tenant_id", + "conversation_id", + "status", + ), + Index("agent_runtime_session_backend_run_idx", "backend_run_id"), ) tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False) app_id: Mapped[str] = mapped_column(StringUUID, nullable=False) - workflow_id: Mapped[str] = mapped_column(StringUUID, nullable=False) - workflow_run_id: Mapped[str] = mapped_column(StringUUID, nullable=False) - node_id: Mapped[str] = mapped_column(String(255), nullable=False) - node_execution_id: Mapped[str | None] = mapped_column(String(255), nullable=True) - binding_id: Mapped[str] = mapped_column(StringUUID, nullable=False) + owner_type: Mapped[AgentRuntimeSessionOwnerType] = mapped_column( + EnumText(AgentRuntimeSessionOwnerType, length=32), nullable=False + ) agent_id: Mapped[str] = mapped_column(StringUUID, nullable=False) - agent_config_snapshot_id: Mapped[str] = mapped_column(StringUUID, nullable=False) backend_run_id: Mapped[str | None] = mapped_column(String(255), nullable=True) session_snapshot: Mapped[str] = mapped_column(LongText, nullable=False) - # JSON-encoded list of ``WorkflowAgentSessionLayerSpec`` ({name, type, deps, - # config}). Drives Agent backend cleanup-only runs: the agenton compositor - # rejects a session snapshot whose layer names do not match the cleanup - # composition, so we must replay the same layer graph (minus credential- - # bearing plugin layers) when issuing the cleanup request. + # Workflow-owner columns (NULL for conversation owner). + workflow_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True) + workflow_run_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True) + node_id: Mapped[str | None] = mapped_column(String(255), nullable=True) + node_execution_id: Mapped[str | None] = mapped_column(String(255), nullable=True) + binding_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True) + agent_config_snapshot_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True) + # JSON-encoded list of cleanup layer specs ({name, type, deps, config}). + # Drives Agent backend cleanup-only runs: the agenton compositor rejects a + # session snapshot whose layer names do not match the cleanup composition, + # so we replay the same layer graph (minus credential-bearing plugin layers). composition_layer_specs: Mapped[str] = mapped_column(LongText, nullable=False, server_default="[]") - status: Mapped[WorkflowAgentRuntimeSessionStatus] = mapped_column( - EnumText(WorkflowAgentRuntimeSessionStatus, length=32), + # Conversation-owner column (NULL for workflow owner). + conversation_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True) + status: Mapped[AgentRuntimeSessionStatus] = mapped_column( + EnumText(AgentRuntimeSessionStatus, length=32), nullable=False, - default=WorkflowAgentRuntimeSessionStatus.ACTIVE, + default=AgentRuntimeSessionStatus.ACTIVE, ) cleaned_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True) + + +# Back-compat alias for the shipped workflow lifecycle code (PR #36724). +WorkflowAgentRuntimeSession = AgentRuntimeSession diff --git a/api/tests/unit_tests/core/app/apps/agent_app/__init__.py b/api/tests/unit_tests/core/app/apps/agent_app/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/api/tests/unit_tests/core/app/apps/agent_app/test_session_store.py b/api/tests/unit_tests/core/app/apps/agent_app/test_session_store.py new file mode 100644 index 0000000000..fb5c789df7 --- /dev/null +++ b/api/tests/unit_tests/core/app/apps/agent_app/test_session_store.py @@ -0,0 +1,115 @@ +"""Unit tests for the conversation-keyed Agent App session store. + +Exercises the real ORM round-trip against the project's in-memory SQLite engine +(per-test create/drop of the unified ``agent_runtime_sessions`` table), so the +conversation owner path is verified without Postgres. +""" + +from __future__ import annotations + +from collections.abc import Generator + +import pytest +from agenton.compositor import CompositorSessionSnapshot +from agenton.compositor.schemas import LayerSessionSnapshot +from agenton.layers.base import LifecycleState +from sqlalchemy import delete + +from core.app.apps.agent_app.session_store import AgentAppRuntimeSessionStore, AgentAppSessionScope +from core.db.session_factory import session_factory +from models.agent import AgentRuntimeSession, AgentRuntimeSessionOwnerType, AgentRuntimeSessionStatus + + +def _scope(conversation_id: str = "conv-1", agent_id: str = "agent-1") -> AgentAppSessionScope: + return AgentAppSessionScope( + tenant_id="tenant-1", + app_id="app-1", + conversation_id=conversation_id, + agent_id=agent_id, + ) + + +def _snapshot(messages: int = 1) -> CompositorSessionSnapshot: + return CompositorSessionSnapshot( + layers=[ + LayerSessionSnapshot( + name="history", + lifecycle_state=LifecycleState.SUSPENDED, + runtime_state={"messages": [{"role": "user", "content": f"m{i}"} for i in range(messages)]}, + ) + ] + ) + + +@pytest.fixture(autouse=True) +def _create_table() -> Generator[None, None, None]: + engine = session_factory.get_session_maker().kw["bind"] + AgentRuntimeSession.__table__.create(bind=engine, checkfirst=True) + yield + with session_factory.create_session() as session: + session.execute(delete(AgentRuntimeSession)) + session.commit() + AgentRuntimeSession.__table__.drop(bind=engine, checkfirst=True) + + +def test_load_returns_none_when_no_row(): + assert AgentAppRuntimeSessionStore().load_active_snapshot(_scope()) is None + + +def test_save_creates_conversation_owned_row_and_round_trips(): + store = AgentAppRuntimeSessionStore() + store.save_active_snapshot(scope=_scope(), backend_run_id="run-1", snapshot=_snapshot(messages=2)) + + loaded = store.load_active_snapshot(_scope()) + assert loaded is not None + assert loaded.layers[0].runtime_state["messages"] == [ + {"role": "user", "content": "m0"}, + {"role": "user", "content": "m1"}, + ] + with session_factory.create_session() as session: + row = session.query(AgentRuntimeSession).one() + assert row.owner_type == AgentRuntimeSessionOwnerType.CONVERSATION + assert row.conversation_id == "conv-1" + assert row.workflow_run_id is None # conversation owner leaves workflow cols NULL + assert row.backend_run_id == "run-1" + + +def test_save_is_noop_when_snapshot_missing(): + store = AgentAppRuntimeSessionStore() + store.save_active_snapshot(scope=_scope(), backend_run_id="run-x", snapshot=None) + with session_factory.create_session() as session: + assert session.query(AgentRuntimeSession).count() == 0 + + +def test_second_turn_updates_same_conversation_row(): + store = AgentAppRuntimeSessionStore() + store.save_active_snapshot(scope=_scope(), backend_run_id="run-1", snapshot=_snapshot(messages=1)) + store.save_active_snapshot(scope=_scope(), backend_run_id="run-2", snapshot=_snapshot(messages=3)) + with session_factory.create_session() as session: + rows = session.query(AgentRuntimeSession).all() + assert len(rows) == 1 + assert rows[0].backend_run_id == "run-2" + + +def test_mark_cleaned_then_load_returns_none_and_save_resurrects(): + store = AgentAppRuntimeSessionStore() + store.save_active_snapshot(scope=_scope(), backend_run_id="run-1", snapshot=_snapshot()) + store.mark_cleaned(scope=_scope(), backend_run_id="cleanup-1") + assert store.load_active_snapshot(_scope()) is None + # Re-entry revives the row. + store.save_active_snapshot(scope=_scope(), backend_run_id="run-2", snapshot=_snapshot(messages=2)) + with session_factory.create_session() as session: + row = session.query(AgentRuntimeSession).one() + assert row.status == AgentRuntimeSessionStatus.ACTIVE + assert row.cleaned_at is None + assert row.backend_run_id == "run-2" + + +def test_distinct_conversations_do_not_collide(): + store = AgentAppRuntimeSessionStore() + store.save_active_snapshot(scope=_scope(conversation_id="conv-A"), backend_run_id="a", snapshot=_snapshot()) + store.save_active_snapshot(scope=_scope(conversation_id="conv-B"), backend_run_id="b", snapshot=_snapshot()) + assert store.load_active_snapshot(_scope(conversation_id="conv-A")) is not None + assert store.load_active_snapshot(_scope(conversation_id="conv-B")) is not None + with session_factory.create_session() as session: + assert session.query(AgentRuntimeSession).count() == 2