mirror of
https://github.com/langgenius/dify.git
synced 2026-02-22 19:15:47 +08:00
switch to skills tab, keep ws connected and ensure has leader
This commit is contained in:
@ -17,6 +17,7 @@ class WorkflowSessionInfo(TypedDict):
|
||||
avatar: str | None
|
||||
sid: str
|
||||
connected_at: int
|
||||
graph_active: bool
|
||||
|
||||
|
||||
class SidMapping(TypedDict):
|
||||
@ -69,6 +70,44 @@ class WorkflowCollaborationRepository:
|
||||
)
|
||||
self.refresh_session_state(workflow_id, session_info["sid"])
|
||||
|
||||
def get_session_info(self, workflow_id: str, sid: str) -> WorkflowSessionInfo | None:
|
||||
raw = self._redis.hget(self.workflow_key(workflow_id), sid)
|
||||
value = self._decode(raw)
|
||||
if not value:
|
||||
return None
|
||||
try:
|
||||
session_info = json.loads(value)
|
||||
except (TypeError, json.JSONDecodeError):
|
||||
return None
|
||||
|
||||
if not isinstance(session_info, dict):
|
||||
return None
|
||||
if "user_id" not in session_info or "username" not in session_info or "sid" not in session_info:
|
||||
return None
|
||||
|
||||
return {
|
||||
"user_id": str(session_info["user_id"]),
|
||||
"username": str(session_info["username"]),
|
||||
"avatar": session_info.get("avatar"),
|
||||
"sid": str(session_info["sid"]),
|
||||
"connected_at": int(session_info.get("connected_at") or 0),
|
||||
"graph_active": bool(session_info.get("graph_active")),
|
||||
}
|
||||
|
||||
def set_graph_active(self, workflow_id: str, sid: str, active: bool) -> None:
|
||||
session_info = self.get_session_info(workflow_id, sid)
|
||||
if not session_info:
|
||||
return
|
||||
session_info["graph_active"] = bool(active)
|
||||
self._redis.hset(self.workflow_key(workflow_id), sid, json.dumps(session_info))
|
||||
self.refresh_session_state(workflow_id, sid)
|
||||
|
||||
def is_graph_active(self, workflow_id: str, sid: str) -> bool:
|
||||
session_info = self.get_session_info(workflow_id, sid)
|
||||
if not session_info:
|
||||
return False
|
||||
return bool(session_info.get("graph_active") or False)
|
||||
|
||||
def get_sid_mapping(self, sid: str) -> SidMapping | None:
|
||||
raw = self._redis.get(self.sid_key(sid))
|
||||
if not raw:
|
||||
@ -125,6 +164,7 @@ class WorkflowCollaborationRepository:
|
||||
"avatar": session_info.get("avatar"),
|
||||
"sid": str(session_info["sid"]),
|
||||
"connected_at": int(session_info.get("connected_at") or 0),
|
||||
"graph_active": bool(session_info.get("graph_active")),
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@ -38,12 +38,13 @@ class WorkflowCollaborationService:
|
||||
"avatar": session.get("avatar"),
|
||||
"sid": sid,
|
||||
"connected_at": int(time.time()),
|
||||
"graph_active": True,
|
||||
}
|
||||
|
||||
self._repository.set_session_info(workflow_id, session_info)
|
||||
|
||||
leader_sid = self.get_or_set_leader(workflow_id, sid)
|
||||
is_leader = leader_sid == sid
|
||||
is_leader = leader_sid == sid if leader_sid else False
|
||||
|
||||
self._socketio.enter_room(sid, workflow_id)
|
||||
self.broadcast_online_users(workflow_id)
|
||||
@ -79,6 +80,15 @@ class WorkflowCollaborationService:
|
||||
if not event_type:
|
||||
return {"msg": "invalid event type"}, 400
|
||||
|
||||
if event_type == "graph_view_active":
|
||||
is_active = False
|
||||
if isinstance(event_data, dict):
|
||||
is_active = bool(event_data.get("active") or False)
|
||||
self._repository.set_graph_active(workflow_id, sid, is_active)
|
||||
self.refresh_session_state(workflow_id, sid)
|
||||
self.broadcast_online_users(workflow_id)
|
||||
return {"msg": "graph_view_active_updated"}, 200
|
||||
|
||||
self._socketio.emit(
|
||||
"collaboration_update",
|
||||
{"type": event_type, "userId": user_id, "data": event_data, "timestamp": timestamp},
|
||||
@ -100,27 +110,33 @@ class WorkflowCollaborationService:
|
||||
|
||||
return {"msg": "graph_update_broadcasted"}, 200
|
||||
|
||||
def get_or_set_leader(self, workflow_id: str, sid: str) -> str:
|
||||
def get_or_set_leader(self, workflow_id: str, sid: str) -> str | None:
|
||||
current_leader = self._repository.get_current_leader(workflow_id)
|
||||
|
||||
if current_leader:
|
||||
if self.is_session_active(workflow_id, current_leader):
|
||||
if self.is_session_active(workflow_id, current_leader) and self._repository.is_graph_active(
|
||||
workflow_id, current_leader
|
||||
):
|
||||
return current_leader
|
||||
self._repository.delete_session(workflow_id, current_leader)
|
||||
self._repository.delete_leader(workflow_id)
|
||||
|
||||
was_set = self._repository.set_leader_if_absent(workflow_id, sid)
|
||||
new_leader_sid = self._select_graph_leader(workflow_id, preferred_sid=sid)
|
||||
if not new_leader_sid:
|
||||
return None
|
||||
|
||||
was_set = self._repository.set_leader_if_absent(workflow_id, new_leader_sid)
|
||||
|
||||
if was_set:
|
||||
if current_leader:
|
||||
self.broadcast_leader_change(workflow_id, sid)
|
||||
return sid
|
||||
self.broadcast_leader_change(workflow_id, new_leader_sid)
|
||||
return new_leader_sid
|
||||
|
||||
current_leader = self._repository.get_current_leader(workflow_id)
|
||||
if current_leader:
|
||||
return current_leader
|
||||
|
||||
return sid
|
||||
return new_leader_sid
|
||||
|
||||
def handle_leader_disconnect(self, workflow_id: str, disconnected_sid: str) -> None:
|
||||
current_leader = self._repository.get_current_leader(workflow_id)
|
||||
@ -130,18 +146,18 @@ class WorkflowCollaborationService:
|
||||
if current_leader != disconnected_sid:
|
||||
return
|
||||
|
||||
session_sids = self._repository.get_session_sids(workflow_id)
|
||||
if session_sids:
|
||||
new_leader_sid = session_sids[0]
|
||||
new_leader_sid = self._select_graph_leader(workflow_id)
|
||||
if new_leader_sid:
|
||||
self._repository.set_leader(workflow_id, new_leader_sid)
|
||||
self.broadcast_leader_change(workflow_id, new_leader_sid)
|
||||
else:
|
||||
self._repository.delete_leader(workflow_id)
|
||||
self.broadcast_leader_change(workflow_id, None)
|
||||
|
||||
def broadcast_leader_change(self, workflow_id: str, new_leader_sid: str) -> None:
|
||||
def broadcast_leader_change(self, workflow_id: str, new_leader_sid: str | None) -> None:
|
||||
for sid in self._repository.get_session_sids(workflow_id):
|
||||
try:
|
||||
is_leader = sid == new_leader_sid
|
||||
is_leader = new_leader_sid is not None and sid == new_leader_sid
|
||||
self._socketio.emit("status", {"isLeader": is_leader}, room=sid)
|
||||
except Exception:
|
||||
logging.exception("Failed to emit leader status to session %s", sid)
|
||||
@ -167,15 +183,34 @@ class WorkflowCollaborationService:
|
||||
|
||||
def _ensure_leader(self, workflow_id: str, sid: str) -> None:
|
||||
current_leader = self._repository.get_current_leader(workflow_id)
|
||||
if current_leader and self.is_session_active(workflow_id, current_leader):
|
||||
if current_leader and self.is_session_active(workflow_id, current_leader) and self._repository.is_graph_active(
|
||||
workflow_id, current_leader
|
||||
):
|
||||
self._repository.expire_leader(workflow_id)
|
||||
return
|
||||
|
||||
if current_leader:
|
||||
self._repository.delete_leader(workflow_id)
|
||||
|
||||
self._repository.set_leader(workflow_id, sid)
|
||||
self.broadcast_leader_change(workflow_id, sid)
|
||||
new_leader_sid = self._select_graph_leader(workflow_id, preferred_sid=sid)
|
||||
if not new_leader_sid:
|
||||
self.broadcast_leader_change(workflow_id, None)
|
||||
return
|
||||
|
||||
self._repository.set_leader(workflow_id, new_leader_sid)
|
||||
self.broadcast_leader_change(workflow_id, new_leader_sid)
|
||||
|
||||
def _select_graph_leader(self, workflow_id: str, preferred_sid: str | None = None) -> str | None:
|
||||
session_sids = [
|
||||
session["sid"]
|
||||
for session in self._repository.list_sessions(workflow_id)
|
||||
if session.get("graph_active")
|
||||
]
|
||||
if not session_sids:
|
||||
return None
|
||||
if preferred_sid and preferred_sid in session_sids:
|
||||
return preferred_sid
|
||||
return session_sids[0]
|
||||
|
||||
def is_session_active(self, workflow_id: str, sid: str) -> bool:
|
||||
if not sid:
|
||||
|
||||
Reference in New Issue
Block a user