mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-05-05 01:37:46 +08:00
Fix: allow document parsing status recovery after transient errors (#13341)
### What problem does this PR solve? Fixes #13285 When an LLM returns a transient error (e.g. overloaded) during parsing, the task progress is set to -1. Previously, the progress could never be updated again, leaving the document permanently stuck in FAIL status even after the task successfully recovered and completed. Three coordinated changes address this: 1. task_service.update_progress: relax the progress update guard to accept prog >= 1 even when current progress is -1, so a task that recovers from a transient failure can report completion. 2. document_service.get_unfinished_docs: include documents that are marked FAIL (progress == -1) but still have at least one non-failed task (task.progress >= 0) in the polling set, so their status can be re-synced once a task recovers. Documents where all tasks have permanently failed are excluded to avoid unnecessary polling. 3. document_service.update_progress: explicitly set document status to RUNNING when not all tasks have finished, instead of preserving whatever stale status (potentially FAIL) the document previously had. ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue)
This commit is contained in:
@ -522,15 +522,21 @@ class DocumentService(CommonService):
|
||||
@classmethod
|
||||
@DB.connection_context()
|
||||
def get_unfinished_docs(cls):
|
||||
fields = [cls.model.id, cls.model.process_begin_at, cls.model.parser_config, cls.model.progress_msg, cls.model.run, cls.model.parser_id]
|
||||
unfinished_task_query = Task.select(Task.doc_id).where((Task.progress >= 0) & (Task.progress < 1))
|
||||
fields = [cls.model.id, cls.model.process_begin_at, cls.model.parser_config, cls.model.progress_msg,
|
||||
cls.model.run, cls.model.parser_id]
|
||||
unfinished_task_query = Task.select(Task.doc_id).where(
|
||||
(Task.progress >= 0) & (Task.progress < 1)
|
||||
)
|
||||
docs_with_non_failed_tasks = Task.select(Task.doc_id).where(Task.progress >= 0).distinct()
|
||||
|
||||
docs = cls.model.select(*fields).where(
|
||||
cls.model.status == StatusEnum.VALID.value,
|
||||
~(cls.model.type == FileType.VIRTUAL.value),
|
||||
((cls.model.run.is_null(True)) | (cls.model.run != TaskStatus.CANCEL.value)),
|
||||
(((cls.model.progress < 1) & (cls.model.progress > 0)) | (cls.model.id.in_(unfinished_task_query))),
|
||||
) # including unfinished tasks like GraphRAG, RAPTOR and Mindmap
|
||||
(((cls.model.progress < 1) & (cls.model.progress > 0)) |
|
||||
(cls.model.id.in_(unfinished_task_query)) |
|
||||
((cls.model.progress == -1) & (cls.model.run == TaskStatus.FAIL.value) &
|
||||
(cls.model.id.in_(docs_with_non_failed_tasks))))) # including GraphRAG/RAPTOR/Mindmap; re-sync failed docs
|
||||
return list(docs.dicts())
|
||||
|
||||
@classmethod
|
||||
@ -850,6 +856,8 @@ class DocumentService(CommonService):
|
||||
elif finished:
|
||||
prg = 1
|
||||
status = TaskStatus.DONE.value
|
||||
elif not finished:
|
||||
status = TaskStatus.RUNNING.value
|
||||
|
||||
# only for special task and parsed docs and unfinished
|
||||
freeze_progress = special_task_running and doc_progress >= 1 and not finished
|
||||
|
||||
@ -304,9 +304,8 @@ class TaskService(CommonService):
|
||||
|
||||
Update Rules:
|
||||
- progress_msg: Always appends the new message to the existing one, and trims the result to max 3000 lines.
|
||||
- progress: Only updates if the current progress is not -1 AND
|
||||
(the new progress is -1 OR greater than the existing progress),
|
||||
to avoid overwriting valid progress with invalid or regressive values.
|
||||
- progress: Updates when (a) new progress >= 1 (allows recovery from -1), or
|
||||
(b) current progress != -1 AND (new progress is -1 OR greater than existing).
|
||||
|
||||
Args:
|
||||
id (str): The unique identifier of the task to update.
|
||||
@ -327,10 +326,8 @@ class TaskService(CommonService):
|
||||
prog = info["progress"]
|
||||
cls.model.update(progress=prog).where(
|
||||
(cls.model.id == id) &
|
||||
(
|
||||
(cls.model.progress != -1) &
|
||||
((prog == -1) | (prog > cls.model.progress))
|
||||
)
|
||||
((prog >= 1) | ((cls.model.progress != -1) &
|
||||
((prog == -1) | (prog > cls.model.progress))))
|
||||
).execute()
|
||||
else:
|
||||
with DB.lock("update_progress", -1):
|
||||
@ -341,10 +338,8 @@ class TaskService(CommonService):
|
||||
prog = info["progress"]
|
||||
cls.model.update(progress=prog).where(
|
||||
(cls.model.id == id) &
|
||||
(
|
||||
(cls.model.progress != -1) &
|
||||
((prog == -1) | (prog > cls.model.progress))
|
||||
)
|
||||
((prog >= 1) | ((cls.model.progress != -1) &
|
||||
((prog == -1) | (prog > cls.model.progress))))
|
||||
).execute()
|
||||
|
||||
process_duration = (datetime.now() - task.begin_at).total_seconds()
|
||||
|
||||
Reference in New Issue
Block a user