mirror of
https://github.com/langgenius/dify.git
synced 2026-05-02 16:38:04 +08:00
fix: make sure restart server not get ghost online user
This commit is contained in:
@ -130,7 +130,7 @@ class WorkflowCollaborationService:
|
||||
else:
|
||||
if leader_sid:
|
||||
self._repository.delete_leader(workflow_id)
|
||||
target_sid = self._select_active_graph_leader(workflow_id, preferred_sid=sid)
|
||||
target_sid = self._select_graph_leader(workflow_id, preferred_sid=sid)
|
||||
if target_sid:
|
||||
self._repository.set_leader(workflow_id, target_sid)
|
||||
self.broadcast_leader_change(workflow_id, target_sid)
|
||||
@ -256,11 +256,44 @@ class WorkflowCollaborationService:
|
||||
def get_current_leader(self, workflow_id: str) -> str | None:
|
||||
return self._repository.get_current_leader(workflow_id)
|
||||
|
||||
def _prune_inactive_sessions(self, workflow_id: str) -> list[WorkflowSessionInfo]:
|
||||
"""Remove inactive sessions from storage and return active sessions only."""
|
||||
sessions = self._repository.list_sessions(workflow_id)
|
||||
if not sessions:
|
||||
return []
|
||||
|
||||
active_sessions: list[WorkflowSessionInfo] = []
|
||||
stale_sids: list[str] = []
|
||||
for session in sessions:
|
||||
sid = session["sid"]
|
||||
if self.is_session_active(workflow_id, sid):
|
||||
active_sessions.append(session)
|
||||
else:
|
||||
stale_sids.append(sid)
|
||||
|
||||
for sid in stale_sids:
|
||||
self._repository.delete_session(workflow_id, sid)
|
||||
|
||||
return active_sessions
|
||||
|
||||
def broadcast_online_users(self, workflow_id: str) -> None:
|
||||
users = self._repository.list_sessions(workflow_id)
|
||||
users = self._prune_inactive_sessions(workflow_id)
|
||||
users.sort(key=lambda x: x.get("connected_at") or 0)
|
||||
|
||||
leader_sid = self.get_current_leader(workflow_id)
|
||||
previous_leader = leader_sid
|
||||
active_sids = {user["sid"] for user in users}
|
||||
if leader_sid and leader_sid not in active_sids:
|
||||
self._repository.delete_leader(workflow_id)
|
||||
leader_sid = None
|
||||
|
||||
if not leader_sid and users:
|
||||
leader_sid = self._select_graph_leader(workflow_id)
|
||||
if leader_sid:
|
||||
self._repository.set_leader(workflow_id, leader_sid)
|
||||
|
||||
if leader_sid != previous_leader:
|
||||
self.broadcast_leader_change(workflow_id, leader_sid)
|
||||
|
||||
self._socketio.emit(
|
||||
"online_users",
|
||||
@ -316,16 +349,6 @@ class WorkflowCollaborationService:
|
||||
self.broadcast_skill_leader_change(workflow_id, file_id, new_leader_sid)
|
||||
|
||||
def _select_graph_leader(self, workflow_id: str, preferred_sid: str | None = None) -> str | None:
|
||||
session_sids = [
|
||||
session["sid"] for session in self._repository.list_sessions(workflow_id) if session.get("graph_active")
|
||||
]
|
||||
if not session_sids:
|
||||
return None
|
||||
if preferred_sid and preferred_sid in session_sids:
|
||||
return preferred_sid
|
||||
return session_sids[0]
|
||||
|
||||
def _select_active_graph_leader(self, workflow_id: str, preferred_sid: str | None = None) -> str | None:
|
||||
session_sids = [
|
||||
session["sid"]
|
||||
for session in self._repository.list_sessions(workflow_id)
|
||||
|
||||
@ -140,9 +140,19 @@ class TestWorkflowCollaborationService:
|
||||
collaboration_service, repository, _socketio = service
|
||||
repository.get_current_leader.return_value = "sid-1"
|
||||
repository.set_leader_if_absent.return_value = True
|
||||
repository.list_sessions.return_value = [
|
||||
{
|
||||
"user_id": "u-2",
|
||||
"username": "B",
|
||||
"avatar": None,
|
||||
"sid": "sid-2",
|
||||
"connected_at": 1,
|
||||
"graph_active": True,
|
||||
}
|
||||
]
|
||||
|
||||
with (
|
||||
patch.object(collaboration_service, "is_session_active", return_value=False),
|
||||
patch.object(collaboration_service, "is_session_active", side_effect=lambda _wf, sid: sid != "sid-1"),
|
||||
patch.object(collaboration_service, "broadcast_leader_change") as broadcast_leader_change,
|
||||
):
|
||||
# Act
|
||||
@ -161,6 +171,16 @@ class TestWorkflowCollaborationService:
|
||||
collaboration_service, repository, _socketio = service
|
||||
repository.get_current_leader.side_effect = [None, "sid-3"]
|
||||
repository.set_leader_if_absent.return_value = False
|
||||
repository.list_sessions.return_value = [
|
||||
{
|
||||
"user_id": "u-2",
|
||||
"username": "B",
|
||||
"avatar": None,
|
||||
"sid": "sid-2",
|
||||
"connected_at": 1,
|
||||
"graph_active": True,
|
||||
}
|
||||
]
|
||||
|
||||
# Act
|
||||
result = collaboration_service.get_or_set_leader("wf-1", "sid-2")
|
||||
@ -174,9 +194,21 @@ class TestWorkflowCollaborationService:
|
||||
# Arrange
|
||||
collaboration_service, repository, _socketio = service
|
||||
repository.get_current_leader.return_value = "sid-1"
|
||||
repository.get_session_sids.return_value = ["sid-2"]
|
||||
repository.list_sessions.return_value = [
|
||||
{
|
||||
"user_id": "u-2",
|
||||
"username": "B",
|
||||
"avatar": None,
|
||||
"sid": "sid-2",
|
||||
"connected_at": 1,
|
||||
"graph_active": True,
|
||||
}
|
||||
]
|
||||
|
||||
with patch.object(collaboration_service, "broadcast_leader_change") as broadcast_leader_change:
|
||||
with (
|
||||
patch.object(collaboration_service, "is_session_active", return_value=True),
|
||||
patch.object(collaboration_service, "broadcast_leader_change") as broadcast_leader_change,
|
||||
):
|
||||
# Act
|
||||
collaboration_service.handle_leader_disconnect("wf-1", "sid-1")
|
||||
|
||||
@ -190,7 +222,7 @@ class TestWorkflowCollaborationService:
|
||||
# Arrange
|
||||
collaboration_service, repository, _socketio = service
|
||||
repository.get_current_leader.return_value = "sid-1"
|
||||
repository.get_session_sids.return_value = []
|
||||
repository.list_sessions.return_value = []
|
||||
|
||||
# Act
|
||||
collaboration_service.handle_leader_disconnect("wf-1", "sid-1")
|
||||
@ -209,8 +241,9 @@ class TestWorkflowCollaborationService:
|
||||
]
|
||||
repository.get_current_leader.return_value = "sid-1"
|
||||
|
||||
# Act
|
||||
collaboration_service.broadcast_online_users("wf-1")
|
||||
with patch.object(collaboration_service, "is_session_active", return_value=True):
|
||||
# Act
|
||||
collaboration_service.broadcast_online_users("wf-1")
|
||||
|
||||
# Assert
|
||||
socketio.emit.assert_called_once_with(
|
||||
@ -248,8 +281,21 @@ class TestWorkflowCollaborationService:
|
||||
# Arrange
|
||||
collaboration_service, repository, _socketio = service
|
||||
repository.get_current_leader.return_value = None
|
||||
repository.list_sessions.return_value = [
|
||||
{
|
||||
"user_id": "u-2",
|
||||
"username": "B",
|
||||
"avatar": None,
|
||||
"sid": "sid-2",
|
||||
"connected_at": 1,
|
||||
"graph_active": True,
|
||||
}
|
||||
]
|
||||
|
||||
with patch.object(collaboration_service, "broadcast_leader_change") as broadcast_leader_change:
|
||||
with (
|
||||
patch.object(collaboration_service, "is_session_active", return_value=True),
|
||||
patch.object(collaboration_service, "broadcast_leader_change") as broadcast_leader_change,
|
||||
):
|
||||
# Act
|
||||
collaboration_service.refresh_session_state("wf-1", "sid-2")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user