mirror of
https://github.com/langgenius/dify.git
synced 2026-05-04 09:28:04 +08:00
feat: skill markdown cursor pos sync
This commit is contained in:
@ -1,6 +1,7 @@
|
||||
import type { Socket } from 'socket.io-client'
|
||||
import type { CollaborationUpdate } from '@/app/components/workflow/collaboration/types/collaboration'
|
||||
import { LoroDoc } from 'loro-crdt'
|
||||
import { EventEmitter } from '@/app/components/workflow/collaboration/core/event-emitter'
|
||||
import { emitWithAuthGuard, webSocketClient } from '@/app/components/workflow/collaboration/core/websocket-manager'
|
||||
|
||||
type SkillUpdatePayload = {
|
||||
@ -14,6 +15,12 @@ type SkillStatusPayload = {
|
||||
isLeader: boolean
|
||||
}
|
||||
|
||||
type SkillCursorPayload = {
|
||||
file_id: string
|
||||
start?: number | null
|
||||
end?: number | null
|
||||
}
|
||||
|
||||
type SkillDocEntry = {
|
||||
doc: LoroDoc
|
||||
text: ReturnType<LoroDoc['getText']>
|
||||
@ -21,6 +28,15 @@ type SkillDocEntry = {
|
||||
suppressBroadcast: boolean
|
||||
}
|
||||
|
||||
type SkillCursorInfo = {
|
||||
userId: string
|
||||
start: number
|
||||
end: number
|
||||
timestamp: number
|
||||
}
|
||||
|
||||
type SkillCursorMap = Record<string, SkillCursorInfo>
|
||||
|
||||
class SkillCollaborationManager {
|
||||
private appId: string | null = null
|
||||
private socket: Socket | null = null
|
||||
@ -29,11 +45,18 @@ class SkillCollaborationManager {
|
||||
private syncHandlers = new Map<string, Set<() => void>>()
|
||||
private activeFileId: string | null = null
|
||||
private pendingResync = new Set<string>()
|
||||
private cursorByFile = new Map<string, SkillCursorMap>()
|
||||
private cursorEmitter = new EventEmitter()
|
||||
|
||||
private handleSkillUpdate = (payload: SkillUpdatePayload) => {
|
||||
if (!payload || !payload.file_id || !payload.update)
|
||||
return
|
||||
|
||||
if (payload.is_snapshot) {
|
||||
this.replaceEntryWithSnapshot(payload.file_id, payload.update)
|
||||
return
|
||||
}
|
||||
|
||||
const entry = this.docs.get(payload.file_id)
|
||||
if (!entry)
|
||||
return
|
||||
@ -57,6 +80,28 @@ class SkillCollaborationManager {
|
||||
if (!update || !update.type)
|
||||
return
|
||||
|
||||
if (update.type === 'skill_cursor') {
|
||||
const data = update.data as SkillCursorPayload | undefined
|
||||
const fileId = data?.file_id
|
||||
if (!fileId || !update.userId)
|
||||
return
|
||||
|
||||
const start = typeof data?.start === 'number' ? data.start : null
|
||||
const end = typeof data?.end === 'number' ? data.end : null
|
||||
if (start === null || end === null || start < 0 || end < 0) {
|
||||
this.updateCursor(fileId, update.userId, null)
|
||||
return
|
||||
}
|
||||
|
||||
this.updateCursor(fileId, update.userId, {
|
||||
userId: update.userId,
|
||||
start,
|
||||
end,
|
||||
timestamp: update.timestamp,
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
if (update.type === 'skill_resync_request') {
|
||||
const fileId = (update.data as { file_id?: string } | undefined)?.file_id
|
||||
if (!fileId || !this.isLeader(fileId))
|
||||
@ -92,6 +137,8 @@ class SkillCollaborationManager {
|
||||
this.syncHandlers.clear()
|
||||
this.activeFileId = null
|
||||
this.pendingResync.clear()
|
||||
this.cursorByFile.clear()
|
||||
this.cursorEmitter.removeAllListeners()
|
||||
}
|
||||
|
||||
this.appId = appId
|
||||
@ -132,12 +179,8 @@ class SkillCollaborationManager {
|
||||
if (!this.docs.has(fileId)) {
|
||||
const doc = new LoroDoc()
|
||||
const text = doc.getText('content')
|
||||
const entry: SkillDocEntry = {
|
||||
doc,
|
||||
text,
|
||||
subscribers: new Set(),
|
||||
suppressBroadcast: true,
|
||||
}
|
||||
const entry = this.createEntry(fileId, doc, text)
|
||||
entry.suppressBroadcast = true
|
||||
|
||||
if (initialContent)
|
||||
text.update(initialContent)
|
||||
@ -145,19 +188,6 @@ class SkillCollaborationManager {
|
||||
doc.commit()
|
||||
entry.suppressBroadcast = false
|
||||
|
||||
doc.subscribe((event: { by?: string }) => {
|
||||
if (event.by === 'local') {
|
||||
if (entry.suppressBroadcast)
|
||||
return
|
||||
const update = doc.export({ mode: 'update' })
|
||||
this.emitUpdate(fileId, update)
|
||||
return
|
||||
}
|
||||
|
||||
const nextText = text.toString()
|
||||
entry.subscribers.forEach(callback => callback(nextText, 'remote'))
|
||||
})
|
||||
|
||||
this.docs.set(fileId, entry)
|
||||
}
|
||||
|
||||
@ -216,6 +246,16 @@ class SkillCollaborationManager {
|
||||
}
|
||||
}
|
||||
|
||||
onCursorUpdate(fileId: string, callback: (cursors: SkillCursorMap) => void): () => void {
|
||||
if (!fileId)
|
||||
return () => {}
|
||||
|
||||
const eventKey = this.getCursorEventKey(fileId)
|
||||
const off = this.cursorEmitter.on(eventKey, callback)
|
||||
callback({ ...(this.cursorByFile.get(fileId) || {}) })
|
||||
return off
|
||||
}
|
||||
|
||||
isLeader(fileId: string): boolean {
|
||||
return this.leaderByFile.get(fileId) || false
|
||||
}
|
||||
@ -228,6 +268,23 @@ class SkillCollaborationManager {
|
||||
this.emitSyncRequest(fileId)
|
||||
}
|
||||
|
||||
emitCursorUpdate(fileId: string, cursor: { start: number, end: number } | null): void {
|
||||
if (!fileId || !this.socket || !this.socket.connected)
|
||||
return
|
||||
|
||||
const payload: SkillCursorPayload = {
|
||||
file_id: fileId,
|
||||
start: cursor?.start ?? null,
|
||||
end: cursor?.end ?? null,
|
||||
}
|
||||
|
||||
emitWithAuthGuard(this.socket, 'collaboration_event', {
|
||||
type: 'skill_cursor',
|
||||
data: payload,
|
||||
timestamp: Date.now(),
|
||||
})
|
||||
}
|
||||
|
||||
setActiveFile(appId: string, fileId: string, active: boolean): void {
|
||||
if (!appId || !fileId)
|
||||
return
|
||||
@ -293,6 +350,79 @@ class SkillCollaborationManager {
|
||||
timestamp: Date.now(),
|
||||
})
|
||||
}
|
||||
|
||||
private getCursorEventKey(fileId: string): string {
|
||||
return `skill_cursor:${fileId}`
|
||||
}
|
||||
|
||||
private updateCursor(fileId: string, userId: string, cursor: SkillCursorInfo | null): void {
|
||||
const current = this.cursorByFile.get(fileId) || {}
|
||||
if (!cursor) {
|
||||
if (!current[userId])
|
||||
return
|
||||
delete current[userId]
|
||||
this.cursorByFile.set(fileId, current)
|
||||
this.cursorEmitter.emit(this.getCursorEventKey(fileId), { ...current })
|
||||
return
|
||||
}
|
||||
|
||||
current[userId] = cursor
|
||||
this.cursorByFile.set(fileId, current)
|
||||
this.cursorEmitter.emit(this.getCursorEventKey(fileId), { ...current })
|
||||
}
|
||||
|
||||
private subscribeDoc(fileId: string, entry: SkillDocEntry) {
|
||||
entry.doc.subscribe((event: { by?: string }) => {
|
||||
if (event.by === 'local') {
|
||||
if (entry.suppressBroadcast)
|
||||
return
|
||||
const update = entry.doc.export({ mode: 'update' })
|
||||
this.emitUpdate(fileId, update)
|
||||
return
|
||||
}
|
||||
|
||||
const nextText = entry.text.toString()
|
||||
entry.subscribers.forEach(callback => callback(nextText, 'remote'))
|
||||
})
|
||||
}
|
||||
|
||||
private createEntry(fileId: string, doc: LoroDoc, text: ReturnType<LoroDoc['getText']>) {
|
||||
const entry: SkillDocEntry = {
|
||||
doc,
|
||||
text,
|
||||
subscribers: new Set(),
|
||||
suppressBroadcast: false,
|
||||
}
|
||||
|
||||
this.subscribeDoc(fileId, entry)
|
||||
return entry
|
||||
}
|
||||
|
||||
private replaceEntryWithSnapshot(fileId: string, snapshot: Uint8Array) {
|
||||
const existing = this.docs.get(fileId)
|
||||
const subscribers = existing?.subscribers ?? new Set<(text: string, source: 'remote') => void>()
|
||||
const doc = new LoroDoc()
|
||||
try {
|
||||
doc.import(new Uint8Array(snapshot))
|
||||
}
|
||||
catch (error) {
|
||||
console.error('Failed to import skill snapshot:', error)
|
||||
return
|
||||
}
|
||||
|
||||
const text = doc.getText('content')
|
||||
const entry: SkillDocEntry = {
|
||||
doc,
|
||||
text,
|
||||
subscribers,
|
||||
suppressBroadcast: false,
|
||||
}
|
||||
this.subscribeDoc(fileId, entry)
|
||||
this.docs.set(fileId, entry)
|
||||
|
||||
const nextText = text.toString()
|
||||
entry.subscribers.forEach(callback => callback(nextText, 'remote'))
|
||||
}
|
||||
}
|
||||
|
||||
export const skillCollaborationManager = new SkillCollaborationManager()
|
||||
|
||||
@ -64,6 +64,7 @@ export type CollaborationEventType
|
||||
| 'app_publish_update'
|
||||
| 'graph_view_active'
|
||||
| 'skill_file_active'
|
||||
| 'skill_cursor'
|
||||
| 'skill_sync_request'
|
||||
| 'skill_resync_request'
|
||||
| 'graph_resync_request'
|
||||
|
||||
Reference in New Issue
Block a user