mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-05-18 23:46:21 +08:00
### What problem does this PR solve? Fixes #14196 ## Problem When using DeepDOC to parse large PDFs (over 1000 pages), the parser silently truncated processing at 300 pages due to a hardcoded default `page_to=299` in `RAGFlowPdfParser.__images__()`. This caused: - **Errors** on pages beyond the limit - **Poor image quality** as the parser attempted to compensate with missing page data - **Inconsistent chunk splitting** between full PDF imports and partial imports Additionally, the codebase scattered magic numbers (`299`, `600`, `10000`, `100000`, `100000000`, `10000000000`, `10**9`) across 22 files as sentinel values for "parse all pages", making future maintenance error-prone. ## Root Cause ```python # deepdoc/parser/pdf_parser.py (before) def __images__(self, fnm, zoomin=3, page_from=0, page_to=299, callback=None): # Only the first 300 pages were rendered; everything beyond was silently dropped ``` While most callers in `rag/app/*.py` correctly passed `to_page=100000`, the base class `RAGFlowPdfParser.__call__()` and `parse_into_bboxes()` invoked `__images__` **without** forwarding `page_from`/`page_to`, falling back to the restrictive default of 299. ## Solution ### 1. Define constants in `common/constants.py` ```python MAXIMUM_PAGE_NUMBER = 100000 # Used by the parsing layer MAXIMUM_TASK_PAGE_NUMBER = MAXIMUM_PAGE_NUMBER * 1000 # Used by the task/DB layer ``` ### 2. Replace all hardcoded sentinel values | Layer | Files Changed | Old Values | New Value | |---|---|---|---| | **Deepdoc parsers** | `pdf_parser.py`, `mineru_parser.py`, `docling_parser.py`, `opendataloader_parser.py`, `paddleocr_parser.py`, `docx_parser.py` | `299`, `600`, `10**9`, `100000000` | `MAXIMUM_PAGE_NUMBER` | | **Chunk parsers** | `naive.py`, `book.py`, `qa.py`, `one.py`, `manual.py`, `paper.py`, `presentation.py`, `laws.py`, `resume.py`, `email.py`, `table.py` | `100000`, `10000`, `10000000000` | `MAXIMUM_PAGE_NUMBER` | | **Task/DB layer** | `db_models.py`, `task_service.py`, `document_service.py`, `file_service.py` | `100000000` | `MAXIMUM_TASK_PAGE_NUMBER` | ### 3. Fix `parse_into_bboxes()` missing parameters Added `from_page`/`to_page` parameters to `parse_into_bboxes()` so that the `rag/flow/parser/parser.py` DeepDOC path no longer falls back to the restrictive default. ## Files Changed (22) - `common/constants.py` - `deepdoc/parser/pdf_parser.py` - `deepdoc/parser/mineru_parser.py` - `deepdoc/parser/docling_parser.py` - `deepdoc/parser/opendataloader_parser.py` - `deepdoc/parser/paddleocr_parser.py` - `deepdoc/parser/docx_parser.py` - `rag/app/naive.py` - `rag/app/book.py` - `rag/app/qa.py` - `rag/app/one.py` - `rag/app/manual.py` - `rag/app/paper.py` - `rag/app/presentation.py` - `rag/app/laws.py` - `rag/app/resume.py` - `rag/app/email.py` - `rag/app/table.py` - `api/db/db_models.py` - `api/db/services/task_service.py` - `api/db/services/document_service.py` - `api/db/services/file_service.py` ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue) - [x] Refactoring --------- Signed-off-by: noob <yixiao121314@outlook.com>
434 lines
16 KiB
Python
434 lines
16 KiB
Python
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import os
|
|
import re
|
|
from dataclasses import dataclass
|
|
from enum import Enum
|
|
from io import BytesIO
|
|
from os import PathLike
|
|
from pathlib import Path
|
|
from typing import Any, Callable, Iterable, Optional
|
|
|
|
import pdfplumber
|
|
import requests
|
|
from PIL import Image
|
|
|
|
from common.constants import MAXIMUM_PAGE_NUMBER
|
|
|
|
try:
|
|
from deepdoc.parser.pdf_parser import RAGFlowPdfParser
|
|
except Exception:
|
|
class RAGFlowPdfParser:
|
|
pass
|
|
|
|
from deepdoc.parser.utils import extract_pdf_outlines
|
|
|
|
|
|
class OpenDataLoaderContentType(str, Enum):
|
|
IMAGE = "image"
|
|
TABLE = "table"
|
|
TEXT = "text"
|
|
EQUATION = "equation"
|
|
|
|
|
|
@dataclass
|
|
class _BBox:
|
|
page_no: int
|
|
x0: float
|
|
y0: float
|
|
x1: float
|
|
y1: float
|
|
|
|
|
|
_TEXT_TYPES = {"heading", "title", "paragraph", "text", "list", "list_item", "caption"}
|
|
_TABLE_TYPES = {"table"}
|
|
_IMAGE_TYPES = {"image", "picture", "figure"}
|
|
_FORMULA_TYPES = {"formula", "equation"}
|
|
|
|
|
|
def _as_float(v) -> Optional[float]:
|
|
try:
|
|
return float(v)
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def _bbox_from_element(el: dict) -> Optional[_BBox]:
|
|
bb = el.get("bounding box") or el.get("bounding_box") or el.get("bbox")
|
|
pn = el.get("page number")
|
|
if pn is None:
|
|
pn = el.get("page_number")
|
|
if pn is None:
|
|
pn = el.get("page")
|
|
if bb is None or pn is None:
|
|
return None
|
|
if not isinstance(bb, (list, tuple)) or len(bb) < 4:
|
|
return None
|
|
coords = [_as_float(x) for x in bb[:4]]
|
|
if any(c is None for c in coords):
|
|
return None
|
|
try:
|
|
page_no = int(pn)
|
|
except Exception:
|
|
return None
|
|
# OpenDataLoader emits [left, bottom, right, top] in PDF points.
|
|
left, bottom, right, top = coords
|
|
x0, x1 = min(left, right), max(left, right)
|
|
y0, y1 = min(bottom, top), max(bottom, top)
|
|
return _BBox(page_no=page_no, x0=x0, y0=y0, x1=x1, y1=y1)
|
|
|
|
|
|
def _iter_elements(node: Any) -> Iterable[dict]:
|
|
if isinstance(node, dict):
|
|
if "type" in node and ("content" in node or "text" in node or "cells" in node):
|
|
yield node
|
|
for v in node.values():
|
|
yield from _iter_elements(v)
|
|
elif isinstance(node, list):
|
|
for item in node:
|
|
yield from _iter_elements(item)
|
|
|
|
|
|
def _element_text(el: dict) -> str:
|
|
content = el.get("content")
|
|
if isinstance(content, str):
|
|
return content
|
|
text = el.get("text")
|
|
if isinstance(text, str):
|
|
return text
|
|
# tables may expose cells; join row-wise if needed
|
|
cells = el.get("cells")
|
|
if isinstance(cells, list):
|
|
rows: dict[int, list[str]] = {}
|
|
for c in cells:
|
|
if not isinstance(c, dict):
|
|
continue
|
|
row = c.get("row") or c.get("row_index") or 0
|
|
rows.setdefault(int(row), []).append(str(c.get("content") or c.get("text") or ""))
|
|
return "\n".join(" | ".join(v) for _, v in sorted(rows.items()))
|
|
return ""
|
|
|
|
|
|
def _element_html(el: dict) -> str:
|
|
for key in ("html", "html_content"):
|
|
v = el.get(key)
|
|
if isinstance(v, str) and v.strip():
|
|
return v
|
|
return ""
|
|
|
|
|
|
class OpenDataLoaderParser(RAGFlowPdfParser):
|
|
def __init__(self):
|
|
self.logger = logging.getLogger(self.__class__.__name__)
|
|
self.page_images: list[Image.Image] = []
|
|
self.page_from = 0
|
|
self.page_to = 10_000
|
|
self.outlines = []
|
|
self.api_url = os.environ.get("OPENDATALOADER_APISERVER", "").rstrip("/")
|
|
self.api_key = os.environ.get("OPENDATALOADER_API_KEY", "").strip()
|
|
try:
|
|
self.timeout = int(os.environ.get("OPENDATALOADER_TIMEOUT", "600") or "600")
|
|
except ValueError:
|
|
self.logger.warning("[OpenDataLoader] Invalid OPENDATALOADER_TIMEOUT, falling back to 600s")
|
|
self.timeout = 600
|
|
|
|
def check_installation(self) -> bool:
|
|
"""Return True when the OpenDataLoader service is reachable."""
|
|
if not self.api_url:
|
|
self.logger.warning(
|
|
"[OpenDataLoader] OPENDATALOADER_APISERVER is not set. "
|
|
"Start the opendataloader service and set the env var."
|
|
)
|
|
return False
|
|
try:
|
|
headers = {"Authorization": f"Bearer {self.api_key}"} if self.api_key else {}
|
|
resp = requests.get(f"{self.api_url}/health", timeout=5, headers=headers)
|
|
if resp.status_code == 200:
|
|
return True
|
|
self.logger.warning(
|
|
f"[OpenDataLoader] Health check returned {resp.status_code}: {resp.text[:200]}"
|
|
)
|
|
return False
|
|
except Exception as exc:
|
|
self.logger.warning(f"[OpenDataLoader] Health check failed: {exc}")
|
|
return False
|
|
|
|
def __images__(self, fnm, zoomin: int = 1, page_from=0, page_to=MAXIMUM_PAGE_NUMBER, callback=None):
|
|
self.page_from = page_from
|
|
self.page_to = page_to
|
|
bytes_io = None
|
|
try:
|
|
if not isinstance(fnm, (str, PathLike)):
|
|
bytes_io = fnm if isinstance(fnm, BytesIO) else BytesIO(fnm)
|
|
opener = pdfplumber.open(fnm) if isinstance(fnm, (str, PathLike)) else pdfplumber.open(bytes_io)
|
|
with opener as pdf:
|
|
pages = pdf.pages[page_from:page_to]
|
|
self.page_images = [p.to_image(resolution=72 * zoomin, antialias=True).original for p in pages]
|
|
except Exception as e:
|
|
self.page_images = []
|
|
self.logger.exception(e)
|
|
finally:
|
|
if bytes_io:
|
|
bytes_io.close()
|
|
|
|
def _make_line_tag(self, bbox: _BBox) -> str:
|
|
if bbox is None:
|
|
return ""
|
|
# Guard: only emit a crop tag when the page was actually rendered.
|
|
if not self.page_images or bbox.page_no <= 0 or len(self.page_images) < bbox.page_no:
|
|
return ""
|
|
x0, x1 = bbox.x0, bbox.x1
|
|
# OpenDataLoader bbox uses PDF coordinate space (origin bottom-left).
|
|
# Convert to image-space (origin top-left) by subtracting from page height.
|
|
_, page_height = self.page_images[bbox.page_no - 1].size
|
|
top = page_height - bbox.y1
|
|
bott = page_height - bbox.y0
|
|
return "@@{}\t{:.1f}\t{:.1f}\t{:.1f}\t{:.1f}##".format(
|
|
bbox.page_no, x0, x1, top, bott
|
|
)
|
|
|
|
@staticmethod
|
|
def extract_positions(txt: str) -> list[tuple[list[int], float, float, float, float]]:
|
|
poss = []
|
|
for tag in re.findall(r"@@[0-9-]+\t[0-9.\t]+##", txt):
|
|
pn, left, right, top, bottom = tag.strip("#").strip("@").split("\t")
|
|
left, right, top, bottom = float(left), float(right), float(top), float(bottom)
|
|
poss.append(([int(p) - 1 for p in pn.split("-")], left, right, top, bottom))
|
|
return poss
|
|
|
|
def crop(self, text: str, ZM: int = 1, need_position: bool = False):
|
|
if not self.page_images:
|
|
return (None, None) if need_position else None
|
|
imgs = []
|
|
poss = self.extract_positions(text)
|
|
if not poss:
|
|
return (None, None) if need_position else None
|
|
# Drop positions whose page indices fall outside the rendered range.
|
|
max_page = len(self.page_images) - 1
|
|
poss = [p for p in poss if all(0 <= pn <= max_page for pn in p[0])]
|
|
if not poss:
|
|
return (None, None) if need_position else None
|
|
GAP = 6
|
|
pos = poss[0]
|
|
poss.insert(0, ([pos[0][0]], pos[1], pos[2], max(0, pos[3] - 120), max(pos[3] - GAP, 0)))
|
|
pos = poss[-1]
|
|
poss.append(([pos[0][-1]], pos[1], pos[2], min(self.page_images[pos[0][-1]].size[1], pos[4] + GAP), min(self.page_images[pos[0][-1]].size[1], pos[4] + 120)))
|
|
positions = []
|
|
for ii, (pns, left, right, top, bottom) in enumerate(poss):
|
|
if bottom <= top:
|
|
bottom = top + 4
|
|
img0 = self.page_images[pns[0]]
|
|
x0, y0, x1, y1 = int(left), int(top), int(right), int(min(bottom, img0.size[1]))
|
|
crop0 = img0.crop((x0, y0, x1, y1))
|
|
imgs.append(crop0)
|
|
if 0 < ii < len(poss) - 1:
|
|
positions.append((pns[0] + self.page_from, x0, x1, y0, y1))
|
|
remain_bottom = bottom - img0.size[1]
|
|
for pn in pns[1:]:
|
|
if remain_bottom <= 0:
|
|
break
|
|
page = self.page_images[pn]
|
|
x0, y0, x1, y1 = int(left), 0, int(right), int(min(remain_bottom, page.size[1]))
|
|
cimgp = page.crop((x0, y0, x1, y1))
|
|
imgs.append(cimgp)
|
|
if 0 < ii < len(poss) - 1:
|
|
positions.append((pn + self.page_from, x0, x1, y0, y1))
|
|
remain_bottom -= page.size[1]
|
|
if not imgs:
|
|
return (None, None) if need_position else None
|
|
height = sum(i.size[1] + GAP for i in imgs)
|
|
width = max(i.size[0] for i in imgs)
|
|
pic = Image.new("RGB", (width, int(height)), (245, 245, 245))
|
|
h = 0
|
|
for ii, img in enumerate(imgs):
|
|
if ii == 0 or ii + 1 == len(imgs):
|
|
img = img.convert("RGBA")
|
|
overlay = Image.new("RGBA", img.size, (0, 0, 0, 0))
|
|
overlay.putalpha(128)
|
|
img = Image.alpha_composite(img, overlay).convert("RGB")
|
|
pic.paste(img, (0, int(h)))
|
|
h += img.size[1] + GAP
|
|
return (pic, positions) if need_position else pic
|
|
|
|
def _cropout_region(self, bbox: _BBox, zoomin: int = 1):
|
|
if not self.page_images:
|
|
return None, ""
|
|
idx = (bbox.page_no - 1) - self.page_from
|
|
if idx < 0 or idx >= len(self.page_images):
|
|
return None, ""
|
|
page_img = self.page_images[idx]
|
|
W, H = page_img.size
|
|
x0 = max(0.0, min(float(bbox.x0), W - 1))
|
|
y0 = max(0.0, min(float(H - bbox.y1), H - 1))
|
|
x1 = max(x0 + 1.0, min(float(bbox.x1), W))
|
|
y1 = max(y0 + 1.0, min(float(H - bbox.y0), H))
|
|
try:
|
|
crop = page_img.crop((int(x0), int(y0), int(x1), int(y1))).convert("RGB")
|
|
except Exception:
|
|
return None, ""
|
|
pos = (bbox.page_no - 1 if bbox.page_no > 0 else 0, x0, x1, y0, y1)
|
|
return crop, [pos]
|
|
|
|
def _classify(self, el_type: str) -> str:
|
|
t = (el_type or "").lower()
|
|
if t in _TABLE_TYPES:
|
|
return OpenDataLoaderContentType.TABLE.value
|
|
if t in _IMAGE_TYPES:
|
|
return OpenDataLoaderContentType.IMAGE.value
|
|
if t in _FORMULA_TYPES:
|
|
return OpenDataLoaderContentType.EQUATION.value
|
|
# Preserve the original structural type (heading, title, paragraph,
|
|
# list, caption, …) so downstream parsers can apply heading/title heuristics.
|
|
return t if t else OpenDataLoaderContentType.TEXT.value
|
|
|
|
def _transfer_from_json(self, root: Any, parse_method: str):
|
|
sections: list[tuple[str, ...]] = []
|
|
tables: list = []
|
|
for el in _iter_elements(root):
|
|
el_type = self._classify(el.get("type", ""))
|
|
bbox = _bbox_from_element(el)
|
|
tag = self._make_line_tag(bbox) if bbox else ""
|
|
|
|
if el_type == OpenDataLoaderContentType.TABLE.value:
|
|
html = _element_html(el) or _element_text(el)
|
|
img = None
|
|
positions = ""
|
|
if bbox:
|
|
img, positions = self._cropout_region(bbox)
|
|
tables.append(((img, html), positions if positions else ""))
|
|
continue
|
|
|
|
if el_type == OpenDataLoaderContentType.IMAGE.value:
|
|
img = None
|
|
positions = ""
|
|
if bbox:
|
|
img, positions = self._cropout_region(bbox)
|
|
caption = _element_text(el)
|
|
tables.append(((img, [caption] if caption else [""]), positions if positions else ""))
|
|
continue
|
|
|
|
text = _element_text(el).strip()
|
|
if not text:
|
|
continue
|
|
if parse_method in {"manual", "pipeline"}:
|
|
sections.append((text, el_type, tag))
|
|
elif parse_method == "paper":
|
|
sections.append((text + tag, el_type))
|
|
else:
|
|
sections.append((text, tag))
|
|
return sections, tables
|
|
|
|
@staticmethod
|
|
def _sections_from_markdown(md: str, parse_method: str) -> list[tuple[str, ...]]:
|
|
txt = (md or "").strip()
|
|
if not txt:
|
|
return []
|
|
if parse_method in {"manual", "pipeline"}:
|
|
return [(txt, OpenDataLoaderContentType.TEXT.value, "")]
|
|
if parse_method == "paper":
|
|
return [(txt, OpenDataLoaderContentType.TEXT.value)]
|
|
return [(txt, "")]
|
|
|
|
def parse_pdf(
|
|
self,
|
|
filepath: str | PathLike[str],
|
|
binary: BytesIO | bytes | None = None,
|
|
callback: Optional[Callable] = None,
|
|
*,
|
|
parse_method: str = "raw",
|
|
hybrid: Optional[str] = None,
|
|
image_output: Optional[str] = None,
|
|
sanitize: Optional[bool] = None,
|
|
):
|
|
self.outlines = extract_pdf_outlines(binary if binary is not None else filepath)
|
|
|
|
if not self.api_url:
|
|
raise RuntimeError(
|
|
"[OpenDataLoader] OPENDATALOADER_APISERVER is not configured. "
|
|
"Please start the opendataloader service and set the env var."
|
|
)
|
|
|
|
# Render page images locally — used by _make_line_tag() and crop().
|
|
# The image rendering stays on the RAGFlow host; only the Java conversion
|
|
# runs inside the opendataloader service container.
|
|
try:
|
|
if binary is not None:
|
|
src = BytesIO(binary) if isinstance(binary, (bytes, bytearray)) else binary
|
|
self.__images__(src, zoomin=1)
|
|
else:
|
|
self.__images__(str(filepath), zoomin=1)
|
|
except Exception as e:
|
|
self.logger.warning(f"[OpenDataLoader] render pages failed: {e}")
|
|
|
|
# Read PDF bytes for the multipart upload
|
|
if binary is not None:
|
|
pdf_bytes = binary if isinstance(binary, (bytes, bytearray)) else binary.getvalue()
|
|
else:
|
|
with open(filepath, "rb") as fh:
|
|
pdf_bytes = fh.read()
|
|
|
|
filename = Path(str(filepath)).name or "input.pdf"
|
|
|
|
if callback:
|
|
callback(0.1, f"[OpenDataLoader] Sending '{filename}' to service")
|
|
|
|
form_data: dict[str, str] = {}
|
|
if hybrid:
|
|
form_data["hybrid"] = hybrid
|
|
if image_output:
|
|
form_data["image_output"] = image_output
|
|
if sanitize is not None:
|
|
form_data["sanitize"] = "true" if sanitize else "false"
|
|
|
|
headers = {"Authorization": f"Bearer {self.api_key}"} if self.api_key else {}
|
|
last_exc: Exception | None = None
|
|
for attempt in range(1, 4):
|
|
try:
|
|
self.logger.info(f"[OpenDataLoader] POST {self.api_url}/file_parse for '{filename}' (attempt {attempt})")
|
|
resp = requests.post(
|
|
url=f"{self.api_url}/file_parse",
|
|
files={"file": (filename, pdf_bytes, "application/pdf")},
|
|
data=form_data,
|
|
headers=headers,
|
|
timeout=self.timeout,
|
|
)
|
|
resp.raise_for_status()
|
|
result = resp.json()
|
|
break
|
|
except Exception as exc:
|
|
last_exc = exc
|
|
self.logger.warning(f"[OpenDataLoader] attempt {attempt} failed: {exc}")
|
|
else:
|
|
raise RuntimeError(f"[OpenDataLoader] service call failed after 3 attempts: {last_exc}") from last_exc
|
|
|
|
if callback:
|
|
callback(0.7, "[OpenDataLoader] Processing response")
|
|
|
|
# Service response structure:
|
|
# {
|
|
# "json_doc": {...} | null, # structured parse tree (preferred)
|
|
# "md_text": "..." | null # markdown fallback when json_doc is absent
|
|
# }
|
|
json_doc = result.get("json_doc")
|
|
md_text = result.get("md_text")
|
|
|
|
sections: list[tuple[str, ...]] = []
|
|
tables: list = []
|
|
if json_doc is not None:
|
|
sections, tables = self._transfer_from_json(json_doc, parse_method=parse_method)
|
|
if not sections and md_text:
|
|
sections = self._sections_from_markdown(md_text, parse_method=parse_method)
|
|
|
|
if callback:
|
|
callback(1.0, f"[OpenDataLoader] Done. Sections: {len(sections)}, Tables: {len(tables)}")
|
|
|
|
return sections, tables
|
|
|
|
|
|
if __name__ == "__main__":
|
|
logging.basicConfig(level=logging.INFO)
|
|
parser = OpenDataLoaderParser()
|
|
print("OpenDataLoader service reachable:", parser.check_installation())
|