fix: make sure restart server not get ghost online user

This commit is contained in:
hjlarry
2026-04-10 10:50:39 +08:00
parent 43b9bcd4e0
commit 959a9f3456
2 changed files with 103 additions and 18 deletions

View File

@ -86,7 +86,7 @@ class WorkflowCollaborationService:
else:
if leader_sid:
self._repository.delete_leader(workflow_id)
target_sid = self._select_active_leader(workflow_id, preferred_sid=sid)
target_sid = self._select_graph_leader(workflow_id, preferred_sid=sid)
if target_sid:
self._repository.set_leader(workflow_id, target_sid)
self.broadcast_leader_change(workflow_id, target_sid)
@ -153,18 +153,17 @@ class WorkflowCollaborationService:
if current_leader != disconnected_sid:
return
session_sids = self._repository.get_session_sids(workflow_id)
if session_sids:
new_leader_sid = session_sids[0]
new_leader_sid = self._select_graph_leader(workflow_id)
if new_leader_sid:
self._repository.set_leader(workflow_id, new_leader_sid)
self.broadcast_leader_change(workflow_id, new_leader_sid)
else:
self._repository.delete_leader(workflow_id)
def broadcast_leader_change(self, workflow_id: str, new_leader_sid: str) -> None:
def broadcast_leader_change(self, workflow_id: str, new_leader_sid: str | None) -> None:
for sid in self._repository.get_session_sids(workflow_id):
try:
is_leader = sid == new_leader_sid
is_leader = new_leader_sid is not None and sid == new_leader_sid
self._socketio.emit("status", {"isLeader": is_leader}, room=sid)
except Exception:
logging.exception("Failed to emit leader status to session %s", sid)
@ -172,11 +171,44 @@ class WorkflowCollaborationService:
def get_current_leader(self, workflow_id: str) -> str | None:
return self._repository.get_current_leader(workflow_id)
def _prune_inactive_sessions(self, workflow_id: str) -> list[WorkflowSessionInfo]:
"""Remove inactive sessions from storage and return active sessions only."""
sessions = self._repository.list_sessions(workflow_id)
if not sessions:
return []
active_sessions: list[WorkflowSessionInfo] = []
stale_sids: list[str] = []
for session in sessions:
sid = session["sid"]
if self.is_session_active(workflow_id, sid):
active_sessions.append(session)
else:
stale_sids.append(sid)
for sid in stale_sids:
self._repository.delete_session(workflow_id, sid)
return active_sessions
def broadcast_online_users(self, workflow_id: str) -> None:
users = self._repository.list_sessions(workflow_id)
users = self._prune_inactive_sessions(workflow_id)
users.sort(key=lambda x: x.get("connected_at") or 0)
leader_sid = self.get_current_leader(workflow_id)
previous_leader = leader_sid
active_sids = {user["sid"] for user in users}
if leader_sid and leader_sid not in active_sids:
self._repository.delete_leader(workflow_id)
leader_sid = None
if not leader_sid and users:
leader_sid = self._select_graph_leader(workflow_id)
if leader_sid:
self._repository.set_leader(workflow_id, leader_sid)
if leader_sid != previous_leader:
self.broadcast_leader_change(workflow_id, leader_sid)
self._socketio.emit(
"online_users",
@ -200,8 +232,12 @@ class WorkflowCollaborationService:
self._repository.set_leader(workflow_id, sid)
self.broadcast_leader_change(workflow_id, sid)
def _select_active_leader(self, workflow_id: str, preferred_sid: str | None = None) -> str | None:
session_sids = [sid for sid in self._repository.get_session_sids(workflow_id) if self.is_session_active(workflow_id, sid)]
def _select_graph_leader(self, workflow_id: str, preferred_sid: str | None = None) -> str | None:
session_sids = [
session["sid"]
for session in self._repository.list_sessions(workflow_id)
if session.get("graph_active", True) and self.is_session_active(workflow_id, session["sid"])
]
if not session_sids:
return None
if preferred_sid and preferred_sid in session_sids:

View File

@ -107,7 +107,10 @@ class TestWorkflowCollaborationService:
collaboration_service, repository, socketio = service
repository.get_sid_mapping.return_value = {"workflow_id": "wf-1", "user_id": "u-1"}
repository.get_current_leader.return_value = "sid-old"
repository.get_session_sids.return_value = ["sid-2", "sid-3"]
repository.list_sessions.return_value = [
{"user_id": "u-2", "username": "B", "avatar": None, "sid": "sid-2", "connected_at": 1, "graph_active": True},
{"user_id": "u-3", "username": "C", "avatar": None, "sid": "sid-3", "connected_at": 2, "graph_active": True},
]
payload = {"type": "sync_request", "data": {"reason": "join"}, "timestamp": 123}
def _is_session_active(_workflow_id: str, session_sid: str) -> bool:
@ -136,7 +139,7 @@ class TestWorkflowCollaborationService:
collaboration_service, repository, socketio = service
repository.get_sid_mapping.return_value = {"workflow_id": "wf-1", "user_id": "u-1"}
repository.get_current_leader.return_value = "sid-old"
repository.get_session_sids.return_value = []
repository.list_sessions.return_value = []
payload = {"type": "sync_request", "data": {"reason": "join"}, "timestamp": 123}
with (
@ -210,9 +213,19 @@ class TestWorkflowCollaborationService:
collaboration_service, repository, _socketio = service
repository.get_current_leader.return_value = "sid-1"
repository.set_leader_if_absent.return_value = True
repository.list_sessions.return_value = [
{
"user_id": "u-2",
"username": "B",
"avatar": None,
"sid": "sid-2",
"connected_at": 1,
"graph_active": True,
}
]
with (
patch.object(collaboration_service, "is_session_active", return_value=False),
patch.object(collaboration_service, "is_session_active", side_effect=lambda _wf, sid: sid != "sid-1"),
patch.object(collaboration_service, "broadcast_leader_change") as broadcast_leader_change,
):
# Act
@ -231,6 +244,16 @@ class TestWorkflowCollaborationService:
collaboration_service, repository, _socketio = service
repository.get_current_leader.side_effect = [None, "sid-3"]
repository.set_leader_if_absent.return_value = False
repository.list_sessions.return_value = [
{
"user_id": "u-2",
"username": "B",
"avatar": None,
"sid": "sid-2",
"connected_at": 1,
"graph_active": True,
}
]
# Act
result = collaboration_service.get_or_set_leader("wf-1", "sid-2")
@ -244,9 +267,21 @@ class TestWorkflowCollaborationService:
# Arrange
collaboration_service, repository, _socketio = service
repository.get_current_leader.return_value = "sid-1"
repository.get_session_sids.return_value = ["sid-2"]
repository.list_sessions.return_value = [
{
"user_id": "u-2",
"username": "B",
"avatar": None,
"sid": "sid-2",
"connected_at": 1,
"graph_active": True,
}
]
with patch.object(collaboration_service, "broadcast_leader_change") as broadcast_leader_change:
with (
patch.object(collaboration_service, "is_session_active", return_value=True),
patch.object(collaboration_service, "broadcast_leader_change") as broadcast_leader_change,
):
# Act
collaboration_service.handle_leader_disconnect("wf-1", "sid-1")
@ -260,7 +295,7 @@ class TestWorkflowCollaborationService:
# Arrange
collaboration_service, repository, _socketio = service
repository.get_current_leader.return_value = "sid-1"
repository.get_session_sids.return_value = []
repository.list_sessions.return_value = []
# Act
collaboration_service.handle_leader_disconnect("wf-1", "sid-1")
@ -279,8 +314,9 @@ class TestWorkflowCollaborationService:
]
repository.get_current_leader.return_value = "sid-1"
# Act
collaboration_service.broadcast_online_users("wf-1")
with patch.object(collaboration_service, "is_session_active", return_value=True):
# Act
collaboration_service.broadcast_online_users("wf-1")
# Assert
socketio.emit.assert_called_once_with(
@ -318,8 +354,21 @@ class TestWorkflowCollaborationService:
# Arrange
collaboration_service, repository, _socketio = service
repository.get_current_leader.return_value = None
repository.list_sessions.return_value = [
{
"user_id": "u-2",
"username": "B",
"avatar": None,
"sid": "sid-2",
"connected_at": 1,
"graph_active": True,
}
]
with patch.object(collaboration_service, "broadcast_leader_change") as broadcast_leader_change:
with (
patch.object(collaboration_service, "is_session_active", return_value=True),
patch.object(collaboration_service, "broadcast_leader_change") as broadcast_leader_change,
):
# Act
collaboration_service.refresh_session_state("wf-1", "sid-2")