mirror of
https://github.com/langgenius/dify.git
synced 2026-05-04 09:28:04 +08:00
add skill markdown file collaboration
This commit is contained in:
@ -96,6 +96,9 @@ def handle_collaboration_event(sid, data):
|
||||
6. workflow_update
|
||||
7. comments_update
|
||||
8. node_panel_presence
|
||||
9. skill_file_active
|
||||
10. skill_sync_request
|
||||
11. skill_resync_request
|
||||
"""
|
||||
return collaboration_service.relay_collaboration_event(sid, data)
|
||||
|
||||
@ -106,3 +109,11 @@ def handle_graph_event(sid, data):
|
||||
Handle graph events - simple broadcast relay.
|
||||
"""
|
||||
return collaboration_service.relay_graph_event(sid, data)
|
||||
|
||||
|
||||
@_sio_on("skill_event")
|
||||
def handle_skill_event(sid, data):
|
||||
"""
|
||||
Handle skill events - simple broadcast relay.
|
||||
"""
|
||||
return collaboration_service.relay_skill_event(sid, data)
|
||||
|
||||
@ -8,6 +8,7 @@ from extensions.ext_redis import redis_client
|
||||
SESSION_STATE_TTL_SECONDS = 3600
|
||||
WORKFLOW_ONLINE_USERS_PREFIX = "workflow_online_users:"
|
||||
WORKFLOW_LEADER_PREFIX = "workflow_leader:"
|
||||
WORKFLOW_SKILL_LEADER_PREFIX = "workflow_skill_leader:"
|
||||
WS_SID_MAP_PREFIX = "ws_sid_map:"
|
||||
|
||||
|
||||
@ -18,6 +19,7 @@ class WorkflowSessionInfo(TypedDict):
|
||||
sid: str
|
||||
connected_at: int
|
||||
graph_active: bool
|
||||
active_skill_file_id: str | None
|
||||
|
||||
|
||||
class SidMapping(TypedDict):
|
||||
@ -40,6 +42,10 @@ class WorkflowCollaborationRepository:
|
||||
def leader_key(workflow_id: str) -> str:
|
||||
return f"{WORKFLOW_LEADER_PREFIX}{workflow_id}"
|
||||
|
||||
@staticmethod
|
||||
def skill_leader_key(workflow_id: str, file_id: str) -> str:
|
||||
return f"{WORKFLOW_SKILL_LEADER_PREFIX}{workflow_id}:{file_id}"
|
||||
|
||||
@staticmethod
|
||||
def sid_key(sid: str) -> str:
|
||||
return f"{WS_SID_MAP_PREFIX}{sid}"
|
||||
@ -92,6 +98,7 @@ class WorkflowCollaborationRepository:
|
||||
"sid": str(session_info["sid"]),
|
||||
"connected_at": int(session_info.get("connected_at") or 0),
|
||||
"graph_active": bool(session_info.get("graph_active")),
|
||||
"active_skill_file_id": session_info.get("active_skill_file_id"),
|
||||
}
|
||||
|
||||
def set_graph_active(self, workflow_id: str, sid: str, active: bool) -> None:
|
||||
@ -108,6 +115,20 @@ class WorkflowCollaborationRepository:
|
||||
return False
|
||||
return bool(session_info.get("graph_active") or False)
|
||||
|
||||
def set_active_skill_file(self, workflow_id: str, sid: str, file_id: str | None) -> None:
|
||||
session_info = self.get_session_info(workflow_id, sid)
|
||||
if not session_info:
|
||||
return
|
||||
session_info["active_skill_file_id"] = file_id
|
||||
self._redis.hset(self.workflow_key(workflow_id), sid, json.dumps(session_info))
|
||||
self.refresh_session_state(workflow_id, sid)
|
||||
|
||||
def get_active_skill_file_id(self, workflow_id: str, sid: str) -> str | None:
|
||||
session_info = self.get_session_info(workflow_id, sid)
|
||||
if not session_info:
|
||||
return None
|
||||
return session_info.get("active_skill_file_id")
|
||||
|
||||
def get_sid_mapping(self, sid: str) -> SidMapping | None:
|
||||
raw = self._redis.get(self.sid_key(sid))
|
||||
if not raw:
|
||||
@ -165,6 +186,7 @@ class WorkflowCollaborationRepository:
|
||||
"sid": str(session_info["sid"]),
|
||||
"connected_at": int(session_info.get("connected_at") or 0),
|
||||
"graph_active": bool(session_info.get("graph_active")),
|
||||
"active_skill_file_id": session_info.get("active_skill_file_id"),
|
||||
}
|
||||
)
|
||||
|
||||
@ -174,14 +196,31 @@ class WorkflowCollaborationRepository:
|
||||
raw = self._redis.get(self.leader_key(workflow_id))
|
||||
return self._decode(raw)
|
||||
|
||||
def get_skill_leader(self, workflow_id: str, file_id: str) -> str | None:
|
||||
raw = self._redis.get(self.skill_leader_key(workflow_id, file_id))
|
||||
return self._decode(raw)
|
||||
|
||||
def set_leader_if_absent(self, workflow_id: str, sid: str) -> bool:
|
||||
return bool(self._redis.set(self.leader_key(workflow_id), sid, nx=True, ex=SESSION_STATE_TTL_SECONDS))
|
||||
|
||||
def set_leader(self, workflow_id: str, sid: str) -> None:
|
||||
self._redis.set(self.leader_key(workflow_id), sid, ex=SESSION_STATE_TTL_SECONDS)
|
||||
|
||||
def set_skill_leader(self, workflow_id: str, file_id: str, sid: str) -> None:
|
||||
self._redis.set(self.skill_leader_key(workflow_id, file_id), sid, ex=SESSION_STATE_TTL_SECONDS)
|
||||
|
||||
def delete_leader(self, workflow_id: str) -> None:
|
||||
self._redis.delete(self.leader_key(workflow_id))
|
||||
|
||||
def delete_skill_leader(self, workflow_id: str, file_id: str) -> None:
|
||||
self._redis.delete(self.skill_leader_key(workflow_id, file_id))
|
||||
|
||||
def expire_leader(self, workflow_id: str) -> None:
|
||||
self._redis.expire(self.leader_key(workflow_id), SESSION_STATE_TTL_SECONDS)
|
||||
|
||||
def expire_skill_leader(self, workflow_id: str, file_id: str) -> None:
|
||||
self._redis.expire(self.skill_leader_key(workflow_id, file_id), SESSION_STATE_TTL_SECONDS)
|
||||
|
||||
def get_active_skill_session_sids(self, workflow_id: str, file_id: str) -> list[str]:
|
||||
sessions = self.list_sessions(workflow_id)
|
||||
return [session["sid"] for session in sessions if session.get("active_skill_file_id") == file_id]
|
||||
|
||||
@ -39,6 +39,7 @@ class WorkflowCollaborationService:
|
||||
"sid": sid,
|
||||
"connected_at": int(time.time()),
|
||||
"graph_active": True,
|
||||
"active_skill_file_id": None,
|
||||
}
|
||||
|
||||
self._repository.set_session_info(workflow_id, session_info)
|
||||
@ -59,9 +60,12 @@ class WorkflowCollaborationService:
|
||||
return
|
||||
|
||||
workflow_id = mapping["workflow_id"]
|
||||
active_skill_file_id = self._repository.get_active_skill_file_id(workflow_id, sid)
|
||||
self._repository.delete_session(workflow_id, sid)
|
||||
|
||||
self.handle_leader_disconnect(workflow_id, sid)
|
||||
if active_skill_file_id:
|
||||
self.handle_skill_leader_disconnect(workflow_id, active_skill_file_id, sid)
|
||||
self.broadcast_online_users(workflow_id)
|
||||
|
||||
def relay_collaboration_event(self, sid: str, data: Mapping[str, object]) -> tuple[dict[str, str], int]:
|
||||
@ -89,6 +93,33 @@ class WorkflowCollaborationService:
|
||||
self.broadcast_online_users(workflow_id)
|
||||
return {"msg": "graph_view_active_updated"}, 200
|
||||
|
||||
if event_type == "skill_file_active":
|
||||
file_id = None
|
||||
is_active = False
|
||||
if isinstance(event_data, dict):
|
||||
file_id = event_data.get("file_id")
|
||||
is_active = bool(event_data.get("active") or False)
|
||||
|
||||
if not file_id or not isinstance(file_id, str):
|
||||
return {"msg": "invalid skill_file_active payload"}, 400
|
||||
|
||||
previous_file_id = self._repository.get_active_skill_file_id(workflow_id, sid)
|
||||
next_file_id = file_id if is_active else None
|
||||
|
||||
if previous_file_id == next_file_id:
|
||||
self.refresh_session_state(workflow_id, sid)
|
||||
return {"msg": "skill_file_active_unchanged"}, 200
|
||||
|
||||
self._repository.set_active_skill_file(workflow_id, sid, next_file_id)
|
||||
self.refresh_session_state(workflow_id, sid)
|
||||
|
||||
if previous_file_id:
|
||||
self._ensure_skill_leader(workflow_id, previous_file_id)
|
||||
if next_file_id:
|
||||
self._ensure_skill_leader(workflow_id, next_file_id, preferred_sid=sid)
|
||||
|
||||
return {"msg": "skill_file_active_updated"}, 200
|
||||
|
||||
self._socketio.emit(
|
||||
"collaboration_update",
|
||||
{"type": event_type, "userId": user_id, "data": event_data, "timestamp": timestamp},
|
||||
@ -110,6 +141,18 @@ class WorkflowCollaborationService:
|
||||
|
||||
return {"msg": "graph_update_broadcasted"}, 200
|
||||
|
||||
def relay_skill_event(self, sid: str, data: object) -> tuple[dict[str, str], int]:
|
||||
mapping = self._repository.get_sid_mapping(sid)
|
||||
if not mapping:
|
||||
return {"msg": "unauthorized"}, 401
|
||||
|
||||
workflow_id = mapping["workflow_id"]
|
||||
self.refresh_session_state(workflow_id, sid)
|
||||
|
||||
self._socketio.emit("skill_update", data, room=workflow_id, skip_sid=sid)
|
||||
|
||||
return {"msg": "skill_update_broadcasted"}, 200
|
||||
|
||||
def get_or_set_leader(self, workflow_id: str, sid: str) -> str | None:
|
||||
current_leader = self._repository.get_current_leader(workflow_id)
|
||||
|
||||
@ -154,6 +197,22 @@ class WorkflowCollaborationService:
|
||||
self._repository.delete_leader(workflow_id)
|
||||
self.broadcast_leader_change(workflow_id, None)
|
||||
|
||||
def handle_skill_leader_disconnect(self, workflow_id: str, file_id: str, disconnected_sid: str) -> None:
|
||||
current_leader = self._repository.get_skill_leader(workflow_id, file_id)
|
||||
if not current_leader:
|
||||
return
|
||||
|
||||
if current_leader != disconnected_sid:
|
||||
return
|
||||
|
||||
new_leader_sid = self._select_skill_leader(workflow_id, file_id)
|
||||
if new_leader_sid:
|
||||
self._repository.set_skill_leader(workflow_id, file_id, new_leader_sid)
|
||||
self.broadcast_skill_leader_change(workflow_id, file_id, new_leader_sid)
|
||||
else:
|
||||
self._repository.delete_skill_leader(workflow_id, file_id)
|
||||
self.broadcast_skill_leader_change(workflow_id, file_id, None)
|
||||
|
||||
def broadcast_leader_change(self, workflow_id: str, new_leader_sid: str | None) -> None:
|
||||
for sid in self._repository.get_session_sids(workflow_id):
|
||||
try:
|
||||
@ -162,6 +221,14 @@ class WorkflowCollaborationService:
|
||||
except Exception:
|
||||
logging.exception("Failed to emit leader status to session %s", sid)
|
||||
|
||||
def broadcast_skill_leader_change(self, workflow_id: str, file_id: str, new_leader_sid: str | None) -> None:
|
||||
for sid in self._repository.get_session_sids(workflow_id):
|
||||
try:
|
||||
is_leader = new_leader_sid is not None and sid == new_leader_sid
|
||||
self._socketio.emit("skill_status", {"file_id": file_id, "isLeader": is_leader}, room=sid)
|
||||
except Exception:
|
||||
logging.exception("Failed to emit skill leader status to session %s", sid)
|
||||
|
||||
def get_current_leader(self, workflow_id: str) -> str | None:
|
||||
return self._repository.get_current_leader(workflow_id)
|
||||
|
||||
@ -180,6 +247,9 @@ class WorkflowCollaborationService:
|
||||
def refresh_session_state(self, workflow_id: str, sid: str) -> None:
|
||||
self._repository.refresh_session_state(workflow_id, sid)
|
||||
self._ensure_leader(workflow_id, sid)
|
||||
active_skill_file_id = self._repository.get_active_skill_file_id(workflow_id, sid)
|
||||
if active_skill_file_id:
|
||||
self._ensure_skill_leader(workflow_id, active_skill_file_id, preferred_sid=sid)
|
||||
|
||||
def _ensure_leader(self, workflow_id: str, sid: str) -> None:
|
||||
current_leader = self._repository.get_current_leader(workflow_id)
|
||||
@ -200,6 +270,25 @@ class WorkflowCollaborationService:
|
||||
self._repository.set_leader(workflow_id, new_leader_sid)
|
||||
self.broadcast_leader_change(workflow_id, new_leader_sid)
|
||||
|
||||
def _ensure_skill_leader(self, workflow_id: str, file_id: str, preferred_sid: str | None = None) -> None:
|
||||
current_leader = self._repository.get_skill_leader(workflow_id, file_id)
|
||||
active_sids = self._repository.get_active_skill_session_sids(workflow_id, file_id)
|
||||
if current_leader and self.is_session_active(workflow_id, current_leader):
|
||||
if current_leader in active_sids or not active_sids:
|
||||
self._repository.expire_skill_leader(workflow_id, file_id)
|
||||
return
|
||||
|
||||
if current_leader:
|
||||
self._repository.delete_skill_leader(workflow_id, file_id)
|
||||
|
||||
new_leader_sid = self._select_skill_leader(workflow_id, file_id, preferred_sid=preferred_sid)
|
||||
if not new_leader_sid:
|
||||
self.broadcast_skill_leader_change(workflow_id, file_id, None)
|
||||
return
|
||||
|
||||
self._repository.set_skill_leader(workflow_id, file_id, new_leader_sid)
|
||||
self.broadcast_skill_leader_change(workflow_id, file_id, new_leader_sid)
|
||||
|
||||
def _select_graph_leader(self, workflow_id: str, preferred_sid: str | None = None) -> str | None:
|
||||
session_sids = [
|
||||
session["sid"]
|
||||
@ -212,6 +301,20 @@ class WorkflowCollaborationService:
|
||||
return preferred_sid
|
||||
return session_sids[0]
|
||||
|
||||
def _select_skill_leader(
|
||||
self, workflow_id: str, file_id: str, preferred_sid: str | None = None
|
||||
) -> str | None:
|
||||
session_sids = [
|
||||
sid
|
||||
for sid in self._repository.get_active_skill_session_sids(workflow_id, file_id)
|
||||
if self.is_session_active(workflow_id, sid)
|
||||
]
|
||||
if not session_sids:
|
||||
return None
|
||||
if preferred_sid and preferred_sid in session_sids:
|
||||
return preferred_sid
|
||||
return session_sids[0]
|
||||
|
||||
def is_session_active(self, workflow_id: str, sid: str) -> bool:
|
||||
if not sid:
|
||||
return False
|
||||
|
||||
Reference in New Issue
Block a user