Files
ragflow/deepdoc/parser/opendataloader_parser.py
euvre 2846a93998 Fix: Remove hardcoded page limits causing parsing failures on large PDFs (>300 pages) (#14382)
### What problem does this PR solve?

Fixes #14196

## Problem

When using DeepDOC to parse large PDFs (over 1000 pages), the parser
silently truncated processing at 300 pages due to a hardcoded default
`page_to=299` in `RAGFlowPdfParser.__images__()`. This caused:

- **Errors** on pages beyond the limit
- **Poor image quality** as the parser attempted to compensate for
missing page data
- **Inconsistent chunk splitting** between full PDF imports and partial
imports

Additionally, magic numbers (`299`, `600`, `10000`, `100000`,
`100000000`, `10000000000`, `10**9`) were scattered across 22 files
as sentinel values for "parse all pages", making future maintenance
error-prone.

## Root Cause

```python
# deepdoc/parser/pdf_parser.py (before)
def __images__(self, fnm, zoomin=3, page_from=0, page_to=299, callback=None):
    # Only the first 300 pages were rendered; everything beyond was silently dropped
```

While most callers in `rag/app/*.py` correctly passed `to_page=100000`,
the base class `RAGFlowPdfParser.__call__()` and `parse_into_bboxes()`
invoked `__images__` **without** forwarding `page_from`/`page_to`,
falling back to the restrictive default of 299.

## Solution

### 1. Define constants in `common/constants.py`

```python
MAXIMUM_PAGE_NUMBER = 100000                        # Used by the parsing layer
MAXIMUM_TASK_PAGE_NUMBER = MAXIMUM_PAGE_NUMBER * 1000  # Used by the task/DB layer
```

### 2. Replace all hardcoded sentinel values

| Layer | Files Changed | Old Values | New Value |
|---|---|---|---|
| **Deepdoc parsers** | `pdf_parser.py`, `mineru_parser.py`, `docling_parser.py`, `opendataloader_parser.py`, `paddleocr_parser.py`, `docx_parser.py` | `299`, `600`, `10**9`, `100000000` | `MAXIMUM_PAGE_NUMBER` |
| **Chunk parsers** | `naive.py`, `book.py`, `qa.py`, `one.py`, `manual.py`, `paper.py`, `presentation.py`, `laws.py`, `resume.py`, `email.py`, `table.py` | `100000`, `10000`, `10000000000` | `MAXIMUM_PAGE_NUMBER` |
| **Task/DB layer** | `db_models.py`, `task_service.py`, `document_service.py`, `file_service.py` | `100000000` | `MAXIMUM_TASK_PAGE_NUMBER` |

### 3. Fix `parse_into_bboxes()` missing parameters

Added `from_page`/`to_page` parameters to `parse_into_bboxes()` so that
the `rag/flow/parser/parser.py` DeepDOC path no longer falls back to the
restrictive default.

## Files Changed (22)

- `common/constants.py`
- `deepdoc/parser/pdf_parser.py`
- `deepdoc/parser/mineru_parser.py`
- `deepdoc/parser/docling_parser.py`
- `deepdoc/parser/opendataloader_parser.py`
- `deepdoc/parser/paddleocr_parser.py`
- `deepdoc/parser/docx_parser.py`
- `rag/app/naive.py`
- `rag/app/book.py`
- `rag/app/qa.py`
- `rag/app/one.py`
- `rag/app/manual.py`
- `rag/app/paper.py`
- `rag/app/presentation.py`
- `rag/app/laws.py`
- `rag/app/resume.py`
- `rag/app/email.py`
- `rag/app/table.py`
- `api/db/db_models.py`
- `api/db/services/task_service.py`
- `api/db/services/document_service.py`
- `api/db/services/file_service.py`

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
- [x] Refactoring

---------

Signed-off-by: noob <yixiao121314@outlook.com>
2026-04-27 14:57:20 +08:00

434 lines
16 KiB
Python

from __future__ import annotations
import logging
import os
import re
from dataclasses import dataclass
from enum import Enum
from io import BytesIO
from os import PathLike
from pathlib import Path
from typing import Any, Callable, Iterable, Optional
import pdfplumber
import requests
from PIL import Image
from common.constants import MAXIMUM_PAGE_NUMBER
try:
    from deepdoc.parser.pdf_parser import RAGFlowPdfParser
except Exception:
    # Fallback stub: when the real base class cannot be imported for any
    # reason, an empty placeholder with the same name is defined so the
    # `class OpenDataLoaderParser(RAGFlowPdfParser)` statement below still works.
    # NOTE(review): presumably this keeps the module importable when the
    # heavier deepdoc dependencies are missing — confirm the intended use case.
    class RAGFlowPdfParser:
        pass
from deepdoc.parser.utils import extract_pdf_outlines
class OpenDataLoaderContentType(str, Enum):
    """Coarse content categories used when classifying parsed elements.

    The ``str`` mixin makes every member compare equal to its plain string
    value; ``OpenDataLoaderParser._classify`` returns those ``.value`` strings.
    """
    IMAGE = "image"
    TABLE = "table"
    TEXT = "text"
    EQUATION = "equation"
@dataclass
class _BBox:
    """Bounding box of one parsed element, as built by `_bbox_from_element`.

    `_bbox_from_element` normalises the corners so that ``x0 <= x1`` and
    ``y0 <= y1``. The consumers in this file treat ``y`` as measured from the
    bottom of the page (they flip it against the page-image height).
    """
    # Page number as reported by the service; consumers in this file treat it
    # as 1-based (they index rendered pages with ``page_no - 1``).
    page_no: int
    # Smaller horizontal coordinate.
    x0: float
    # Smaller vertical coordinate.
    y0: float
    # Larger horizontal coordinate.
    x1: float
    # Larger vertical coordinate.
    y1: float
# Lower-case element ``type`` names grouped by category.
# `OpenDataLoaderParser._classify` checks the table, image and formula sets.
# NOTE(review): `_TEXT_TYPES` is not referenced anywhere in the visible part
# of this file — confirm whether other modules rely on it.
_TEXT_TYPES = {"heading", "title", "paragraph", "text", "list", "list_item", "caption"}
_TABLE_TYPES = {"table"}
_IMAGE_TYPES = {"image", "picture", "figure"}
_FORMULA_TYPES = {"formula", "equation"}
def _as_float(v) -> Optional[float]:
try:
return float(v)
except Exception:
return None
def _bbox_from_element(el: dict) -> Optional[_BBox]:
bb = el.get("bounding box") or el.get("bounding_box") or el.get("bbox")
pn = el.get("page number")
if pn is None:
pn = el.get("page_number")
if pn is None:
pn = el.get("page")
if bb is None or pn is None:
return None
if not isinstance(bb, (list, tuple)) or len(bb) < 4:
return None
coords = [_as_float(x) for x in bb[:4]]
if any(c is None for c in coords):
return None
try:
page_no = int(pn)
except Exception:
return None
# OpenDataLoader emits [left, bottom, right, top] in PDF points.
left, bottom, right, top = coords
x0, x1 = min(left, right), max(left, right)
y0, y1 = min(bottom, top), max(bottom, top)
return _BBox(page_no=page_no, x0=x0, y0=y0, x1=x1, y1=y1)
def _iter_elements(node: Any) -> Iterable[dict]:
if isinstance(node, dict):
if "type" in node and ("content" in node or "text" in node or "cells" in node):
yield node
for v in node.values():
yield from _iter_elements(v)
elif isinstance(node, list):
for item in node:
yield from _iter_elements(item)
def _element_text(el: dict) -> str:
content = el.get("content")
if isinstance(content, str):
return content
text = el.get("text")
if isinstance(text, str):
return text
# tables may expose cells; join row-wise if needed
cells = el.get("cells")
if isinstance(cells, list):
rows: dict[int, list[str]] = {}
for c in cells:
if not isinstance(c, dict):
continue
row = c.get("row") or c.get("row_index") or 0
rows.setdefault(int(row), []).append(str(c.get("content") or c.get("text") or ""))
return "\n".join(" | ".join(v) for _, v in sorted(rows.items()))
return ""
def _element_html(el: dict) -> str:
for key in ("html", "html_content"):
v = el.get(key)
if isinstance(v, str) and v.strip():
return v
return ""
class OpenDataLoaderParser(RAGFlowPdfParser):
    """PDF parser backed by a remote OpenDataLoader HTTP service.

    Page images are rendered locally with pdfplumber (they feed the position
    tags and the crops), while the document itself is uploaded to
    ``{api_url}/file_parse``. The JSON tree (or markdown fallback) returned by
    the service is converted into the ``(sections, tables)`` pair produced by
    :meth:`parse_pdf`.

    Configuration is read from the environment in ``__init__``:
    ``OPENDATALOADER_APISERVER`` (base URL), ``OPENDATALOADER_API_KEY``
    (optional bearer token) and ``OPENDATALOADER_TIMEOUT`` (seconds,
    default 600).
    """

    def __init__(self):
        """Initialise parser state and read the service configuration."""
        # NOTE(review): the base-class initialiser is not invoked here, exactly
        # as in the original code — presumably on purpose; confirm.
        self.logger = logging.getLogger(self.__class__.__name__)
        # Rendered pages; index 0 corresponds to PDF page `page_from`.
        self.page_images: list[Image.Image] = []
        self.page_from = 0
        # Same "all pages" sentinel that `__images__` uses as its default,
        # instead of a separate hard-coded number.
        self.page_to = MAXIMUM_PAGE_NUMBER
        self.outlines = []
        self.api_url = os.environ.get("OPENDATALOADER_APISERVER", "").rstrip("/")
        self.api_key = os.environ.get("OPENDATALOADER_API_KEY", "").strip()
        try:
            self.timeout = int(os.environ.get("OPENDATALOADER_TIMEOUT", "600") or "600")
        except ValueError:
            self.logger.warning("[OpenDataLoader] Invalid OPENDATALOADER_TIMEOUT, falling back to 600s")
            self.timeout = 600

    def _auth_headers(self) -> dict:
        """Return the bearer-token header dict, or ``{}`` when no key is set."""
        return {"Authorization": f"Bearer {self.api_key}"} if self.api_key else {}

    def check_installation(self) -> bool:
        """Return True when the OpenDataLoader service is reachable.

        Sends ``GET {api_url}/health`` with a 5 second timeout. Any non-200
        status, any exception, or a missing ``api_url`` is logged as a warning
        and reported as ``False``.
        """
        if not self.api_url:
            self.logger.warning(
                "[OpenDataLoader] OPENDATALOADER_APISERVER is not set. "
                "Start the opendataloader service and set the env var."
            )
            return False
        try:
            resp = requests.get(f"{self.api_url}/health", timeout=5, headers=self._auth_headers())
            if resp.status_code == 200:
                return True
            self.logger.warning(
                f"[OpenDataLoader] Health check returned {resp.status_code}: {resp.text[:200]}"
            )
            return False
        except Exception as exc:
            self.logger.warning(f"[OpenDataLoader] Health check failed: {exc}")
            return False

    def __images__(self, fnm, zoomin: int = 1, page_from=0, page_to=MAXIMUM_PAGE_NUMBER, callback=None):
        """Render pages ``[page_from:page_to]`` of a PDF into ``self.page_images``.

        Args:
            fnm: A path (``str`` / ``PathLike``), a ``BytesIO``, or raw PDF
                bytes.
            zoomin: Scale factor; pages are rendered at ``72 * zoomin`` DPI.
            page_from: Index of the first page to render (slice start).
            page_to: Slice end; the default means "all pages".
            callback: Accepted for interface compatibility; not used here.

        Rendering errors are logged and leave ``self.page_images`` empty; no
        exception is propagated.
        """
        self.page_from = page_from
        self.page_to = page_to
        # Only a stream created here is closed here. A BytesIO handed in by the
        # caller stays open so the caller can keep using it afterwards (e.g.
        # call ``getvalue()`` on it).
        owned_stream = None
        try:
            if isinstance(fnm, (str, PathLike)):
                source = fnm
            elif isinstance(fnm, BytesIO):
                source = fnm
            else:
                owned_stream = source = BytesIO(fnm)
            with pdfplumber.open(source) as pdf:
                pages = pdf.pages[page_from:page_to]
                self.page_images = [p.to_image(resolution=72 * zoomin, antialias=True).original for p in pages]
        except Exception as e:
            self.page_images = []
            self.logger.exception(e)
        finally:
            if owned_stream is not None:
                owned_stream.close()

    def _make_line_tag(self, bbox: _BBox) -> str:
        """Return the ``@@page\\tleft\\tright\\ttop\\tbottom##`` tag for *bbox*.

        An empty string is returned when *bbox* is ``None`` or its page was
        not rendered. The page number written into the tag is 1-based and
        relative to the rendered page list, which is what
        :meth:`extract_positions` / :meth:`crop` expect (``crop`` adds
        ``self.page_from`` back when reporting positions).
        """
        if bbox is None or not self.page_images:
            return ""
        # Same page_from offset as `_cropout_region`; with page_from == 0 this
        # is simply ``page_no - 1``.
        idx = bbox.page_no - 1 - self.page_from
        if idx < 0 or idx >= len(self.page_images):
            return ""
        # OpenDataLoader bbox uses PDF coordinate space (origin bottom-left).
        # Convert to image-space (origin top-left) by subtracting from page height.
        _, page_height = self.page_images[idx].size
        # Clamp at 0: the pattern in `extract_positions` has no minus sign, so
        # a negative coordinate would make the whole tag unparseable.
        left = max(0.0, bbox.x0)
        right = max(0.0, bbox.x1)
        top = max(0.0, page_height - bbox.y1)
        bott = max(0.0, page_height - bbox.y0)
        return "@@{}\t{:.1f}\t{:.1f}\t{:.1f}\t{:.1f}##".format(
            idx + 1, left, right, top, bott
        )

    @staticmethod
    def extract_positions(txt: str) -> list[tuple[list[int], float, float, float, float]]:
        """Parse every position tag embedded in *txt*.

        Returns a list of ``(page_indices, left, right, top, bottom)`` tuples.
        The page field of a tag may hold several hyphen-joined numbers; each
        is converted to a 0-based index (tag number minus one).
        """
        poss = []
        for tag in re.findall(r"@@[0-9-]+\t[0-9.\t]+##", txt):
            pn, left, right, top, bottom = tag.strip("#").strip("@").split("\t")
            left, right, top, bottom = float(left), float(right), float(top), float(bottom)
            poss.append(([int(p) - 1 for p in pn.split("-")], left, right, top, bottom))
        return poss

    def crop(self, text: str, ZM: int = 1, need_position: bool = False):
        """Build one stacked image from the regions tagged inside *text*.

        Args:
            text: Text containing position tags (see :meth:`_make_line_tag`).
            ZM: Not used by this implementation.
            need_position: When true, also return the list of
                ``(page, x0, x1, y0, y1)`` tuples of the cropped regions, with
                ``page`` offset by ``self.page_from``.

        Returns:
            The composed ``PIL.Image`` (or ``(image, positions)``). ``None``
            (or ``(None, None)``) when no pages are rendered, *text* holds no
            tags, or every tag points outside the rendered pages.
        """
        empty = (None, None) if need_position else None
        if not self.page_images:
            return empty
        imgs = []
        poss = self.extract_positions(text)
        if not poss:
            return empty
        # Drop positions whose page indices fall outside the rendered range.
        max_page = len(self.page_images) - 1
        poss = [p for p in poss if all(0 <= pn <= max_page for pn in p[0])]
        if not poss:
            return empty
        GAP = 6
        # Context strip above the first region: up to 120px, ending GAP px
        # above the region's top.
        pos = poss[0]
        poss.insert(0, ([pos[0][0]], pos[1], pos[2], max(0, pos[3] - 120), max(pos[3] - GAP, 0)))
        # Context strip below the last region, clamped to that page's height.
        pos = poss[-1]
        last_page_height = self.page_images[pos[0][-1]].size[1]
        poss.append(([pos[0][-1]], pos[1], pos[2], min(last_page_height, pos[4] + GAP), min(last_page_height, pos[4] + 120)))
        positions = []
        for ii, (pns, left, right, top, bottom) in enumerate(poss):
            if bottom <= top:
                # Degenerate box: give it a minimal height.
                bottom = top + 4
            img0 = self.page_images[pns[0]]
            x0, y0, x1, y1 = int(left), int(top), int(right), int(min(bottom, img0.size[1]))
            crop0 = img0.crop((x0, y0, x1, y1))
            imgs.append(crop0)
            # Only the real regions are reported, not the two context strips.
            if 0 < ii < len(poss) - 1:
                positions.append((pns[0] + self.page_from, x0, x1, y0, y1))
            # Whatever extends past the first page continues from the top of
            # the following pages listed in the tag.
            remain_bottom = bottom - img0.size[1]
            for pn in pns[1:]:
                if remain_bottom <= 0:
                    break
                page = self.page_images[pn]
                x0, y0, x1, y1 = int(left), 0, int(right), int(min(remain_bottom, page.size[1]))
                cimgp = page.crop((x0, y0, x1, y1))
                imgs.append(cimgp)
                if 0 < ii < len(poss) - 1:
                    positions.append((pn + self.page_from, x0, x1, y0, y1))
                remain_bottom -= page.size[1]
        if not imgs:
            return empty
        # Stack all pieces vertically, GAP px apart, on a light-grey canvas.
        height = sum(i.size[1] + GAP for i in imgs)
        width = max(i.size[0] for i in imgs)
        pic = Image.new("RGB", (width, int(height)), (245, 245, 245))
        h = 0
        for ii, img in enumerate(imgs):
            if ii == 0 or ii + 1 == len(imgs):
                # First and last pieces get a half-transparent black layer
                # composited on top, so they appear dimmed.
                img = img.convert("RGBA")
                overlay = Image.new("RGBA", img.size, (0, 0, 0, 0))
                overlay.putalpha(128)
                img = Image.alpha_composite(img, overlay).convert("RGB")
            pic.paste(img, (0, int(h)))
            h += img.size[1] + GAP
        return (pic, positions) if need_position else pic

    def _cropout_region(self, bbox: _BBox, zoomin: int = 1):
        """Crop the area of *bbox* out of its rendered page.

        Returns ``(image, [(page_index, x0, x1, y0, y1)])`` on success, where
        ``page_index`` is ``bbox.page_no - 1`` and the coordinates are the
        clamped image-space values. Returns ``(None, "")`` when nothing is
        rendered, the page is outside the rendered range, or cropping fails.
        *zoomin* is not used by this implementation.
        """
        if not self.page_images:
            return None, ""
        idx = (bbox.page_no - 1) - self.page_from
        if idx < 0 or idx >= len(self.page_images):
            return None, ""
        page_img = self.page_images[idx]
        W, H = page_img.size
        # Flip y against the image height (bbox origin is bottom-left) and
        # clamp to the image so the box is at least 1px in each direction.
        x0 = max(0.0, min(float(bbox.x0), W - 1))
        y0 = max(0.0, min(float(H - bbox.y1), H - 1))
        x1 = max(x0 + 1.0, min(float(bbox.x1), W))
        y1 = max(y0 + 1.0, min(float(H - bbox.y0), H))
        try:
            crop = page_img.crop((int(x0), int(y0), int(x1), int(y1))).convert("RGB")
        except Exception:
            return None, ""
        pos = (bbox.page_no - 1 if bbox.page_no > 0 else 0, x0, x1, y0, y1)
        return crop, [pos]

    def _classify(self, el_type: str) -> str:
        """Map a raw element type onto a content category string.

        Table, image and formula type names map to the corresponding
        `OpenDataLoaderContentType` value. Every other non-empty type is
        returned lower-cased; an empty or ``None`` type maps to ``"text"``.
        """
        t = (el_type or "").lower()
        if t in _TABLE_TYPES:
            return OpenDataLoaderContentType.TABLE.value
        if t in _IMAGE_TYPES:
            return OpenDataLoaderContentType.IMAGE.value
        if t in _FORMULA_TYPES:
            return OpenDataLoaderContentType.EQUATION.value
        # Preserve the original structural type (heading, title, paragraph,
        # list, caption, …) so downstream parsers can apply heading/title heuristics.
        return t if t else OpenDataLoaderContentType.TEXT.value

    def _transfer_from_json(self, root: Any, parse_method: str):
        """Convert the service's JSON tree into ``(sections, tables)``.

        Tables contribute ``((image, html_or_text), positions)`` and images
        ``((image, [caption]), positions)`` to ``tables``; ``positions`` is
        ``""`` when no crop could be made. Every other element with non-blank
        text becomes a section whose tuple shape depends on *parse_method*:
        ``(text, type, tag)`` for ``"manual"`` / ``"pipeline"``,
        ``(text + tag, type)`` for ``"paper"``, otherwise ``(text, tag)``.
        """
        sections: list[tuple[str, ...]] = []
        tables: list = []
        for el in _iter_elements(root):
            el_type = self._classify(el.get("type", ""))
            bbox = _bbox_from_element(el)
            tag = self._make_line_tag(bbox) if bbox else ""

            if el_type == OpenDataLoaderContentType.TABLE.value:
                html = _element_html(el) or _element_text(el)
                img, positions = self._cropout_region(bbox) if bbox else (None, "")
                tables.append(((img, html), positions if positions else ""))
                continue

            if el_type == OpenDataLoaderContentType.IMAGE.value:
                img, positions = self._cropout_region(bbox) if bbox else (None, "")
                caption = _element_text(el)
                tables.append(((img, [caption] if caption else [""]), positions if positions else ""))
                continue

            text = _element_text(el).strip()
            if not text:
                continue
            if parse_method in {"manual", "pipeline"}:
                sections.append((text, el_type, tag))
            elif parse_method == "paper":
                sections.append((text + tag, el_type))
            else:
                sections.append((text, tag))
        return sections, tables

    @staticmethod
    def _sections_from_markdown(md: str, parse_method: str) -> list[tuple[str, ...]]:
        """Wrap a markdown document into a single section tuple.

        Returns ``[]`` for blank input; otherwise one tuple shaped for
        *parse_method*, with an empty position tag where the shape has one.
        """
        txt = (md or "").strip()
        if not txt:
            return []
        if parse_method in {"manual", "pipeline"}:
            return [(txt, OpenDataLoaderContentType.TEXT.value, "")]
        if parse_method == "paper":
            return [(txt, OpenDataLoaderContentType.TEXT.value)]
        return [(txt, "")]

    def parse_pdf(
        self,
        filepath: str | PathLike[str],
        binary: BytesIO | bytes | None = None,
        callback: Optional[Callable] = None,
        *,
        parse_method: str = "raw",
        hybrid: Optional[str] = None,
        image_output: Optional[str] = None,
        sanitize: Optional[bool] = None,
    ):
        """Parse a PDF through the OpenDataLoader service.

        Args:
            filepath: Path of the PDF; also supplies the upload file name.
            binary: Optional in-memory PDF (bytes, bytearray or ``BytesIO``)
                used instead of reading *filepath*.
            callback: Optional ``callback(progress, message)`` progress hook.
            parse_method: Selects the section tuple shape (see
                :meth:`_transfer_from_json`).
            hybrid, image_output, sanitize: Forwarded as form fields to the
                service when set.

        Returns:
            ``(sections, tables)``.

        Raises:
            RuntimeError: When the service URL is not configured, when all
                three upload attempts fail, or when the response body is not
                a JSON object.
        """
        self.outlines = extract_pdf_outlines(binary if binary is not None else filepath)
        if not self.api_url:
            raise RuntimeError(
                "[OpenDataLoader] OPENDATALOADER_APISERVER is not configured. "
                "Please start the opendataloader service and set the env var."
            )

        # Read the PDF bytes for the multipart upload BEFORE rendering, and
        # render from those bytes rather than from the caller's stream, so the
        # caller-supplied BytesIO is never consumed or closed by rendering.
        if binary is not None:
            pdf_bytes = binary if isinstance(binary, (bytes, bytearray)) else binary.getvalue()
            render_source = pdf_bytes
        else:
            with open(filepath, "rb") as fh:
                pdf_bytes = fh.read()
            render_source = str(filepath)

        # Render page images locally — used by _make_line_tag() and crop().
        # The image rendering stays on the RAGFlow host; only the Java conversion
        # runs inside the opendataloader service container.
        try:
            self.__images__(render_source, zoomin=1)
        except Exception as e:
            self.logger.warning(f"[OpenDataLoader] render pages failed: {e}")

        filename = Path(str(filepath)).name or "input.pdf"
        if callback:
            callback(0.1, f"[OpenDataLoader] Sending '{filename}' to service")

        form_data: dict[str, str] = {}
        if hybrid:
            form_data["hybrid"] = hybrid
        if image_output:
            form_data["image_output"] = image_output
        if sanitize is not None:
            form_data["sanitize"] = "true" if sanitize else "false"
        headers = self._auth_headers()

        # Up to three attempts; the ``else`` branch of the loop runs only when
        # no attempt reached ``break``.
        last_exc: Exception | None = None
        for attempt in range(1, 4):
            try:
                self.logger.info(f"[OpenDataLoader] POST {self.api_url}/file_parse for '{filename}' (attempt {attempt})")
                resp = requests.post(
                    url=f"{self.api_url}/file_parse",
                    files={"file": (filename, pdf_bytes, "application/pdf")},
                    data=form_data,
                    headers=headers,
                    timeout=self.timeout,
                )
                resp.raise_for_status()
                result = resp.json()
                break
            except Exception as exc:
                last_exc = exc
                self.logger.warning(f"[OpenDataLoader] attempt {attempt} failed: {exc}")
        else:
            raise RuntimeError(f"[OpenDataLoader] service call failed after 3 attempts: {last_exc}") from last_exc

        if not isinstance(result, dict):
            # e.g. a JSON ``null`` or list body; fail with a clear message
            # instead of an AttributeError on ``.get`` below.
            raise RuntimeError(
                f"[OpenDataLoader] unexpected response payload of type {type(result).__name__}"
            )

        if callback:
            callback(0.7, "[OpenDataLoader] Processing response")

        # Service response structure:
        # {
        #   "json_doc": {...} | null,   # structured parse tree (preferred)
        #   "md_text": "..." | null     # markdown fallback when json_doc is absent
        # }
        json_doc = result.get("json_doc")
        md_text = result.get("md_text")
        sections: list[tuple[str, ...]] = []
        tables: list = []
        if json_doc is not None:
            sections, tables = self._transfer_from_json(json_doc, parse_method=parse_method)
        if not sections and md_text:
            sections = self._sections_from_markdown(md_text, parse_method=parse_method)
        if callback:
            callback(1.0, f"[OpenDataLoader] Done. Sections: {len(sections)}, Tables: {len(tables)}")
        return sections, tables
# Manual smoke test: when run as a script, report whether the configured
# OpenDataLoader service answers its health check.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    parser = OpenDataLoaderParser()
    print("OpenDataLoader service reachable:", parser.check_installation())