Feat: optimize ingestion pipeline with preprocess (#13211)

### What problem does this PR solve?

This PR adds a preprocessing pass to the ingestion pipeline: the PDF parser can optionally flag author and abstract boxes (opt-in via new `author`/`abstract` flags on `ParserFromUpstream`), the Word and Markdown parsers mark title sections using heading-level detection, and `HierarchicalMerger` now tolerates upstream chunks that arrive as bare strings or with missing fields.

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
Author: Magicbook1108
Date: 2026-02-26 10:24:13 +08:00 (committed by GitHub)
Parent: b7eca981d4
Commit: 158503a1aa
6 changed files with 745 additions and 535 deletions


@@ -18,6 +18,7 @@ dependencies = [
     "bio==1.7.1",
     "boxsdk>=10.1.0",
     "captcha>=0.7.1",
+    "chardet>=5.2.0,<6.0.0",
     "cn2an==0.5.22",
     "cohere==5.6.2",
     "Crawl4AI>=0.4.0,<1.0.0",
@@ -118,13 +119,13 @@ dependencies = [
     "xpinyin==0.7.6",
     "yfinance==0.2.65",
     "zhipuai==2.0.1",
+    "peewee>=3.17.1,<4.0.0",
     # following modules aren't necessary
     # "nltk==3.9.1",
     # "numpy>=1.26.0,<2.0.0",
     # "openai>=1.45.0",
     # "openpyxl>=3.1.0,<4.0.0",
     # "pandas>=2.2.0,<3.0.0",
-    # "peewee==3.17.1",
     # "pillow>=10.4.0,<13.0.0",
     # "protobuf==5.27.2",
     # "pymysql>=1.1.1,<2.0.0",


@@ -99,10 +99,15 @@ class Pdf(PdfParser):
                         title = ""
                     break
                 for j in range(3):
-                    if _begin(self.boxes[i + j]["text"]):
+                    next_idx = i + j
+                    if next_idx >= len(self.boxes):
                         break
-                    authors.append(self.boxes[i + j]["text"])
-                    break
+                    candidate = self.boxes[next_idx]["text"]
+                    if _begin(candidate):
+                        break
+                    if "@" in candidate:
+                        break
+                    authors.append(candidate)
                 break
             # get abstract
             abstr = ""


@@ -69,11 +69,23 @@ class HierarchicalMerger(ProcessBase):
             lines = [ln for ln in payload.split("\n") if ln]
         else:
             arr = from_upstream.chunks if from_upstream.output_format == "chunks" else from_upstream.json_result
-            lines = [o.get("text", "") for o in arr]
+            arr = arr or []
             sections, section_images = [], []
-            for o in arr or []:
-                sections.append((o.get("text", ""), o.get("position_tag", "")))
-                section_images.append(o.get("img_id"))
+            lines = []
+            for o in arr:
+                if isinstance(o, dict):
+                    raw_text = o.get("text")
+                    position_tag = o.get("position_tag", "")
+                    img_id = o.get("img_id")
+                else:
+                    raw_text = o
+                    position_tag = ""
+                    img_id = None
+                txt = raw_text if isinstance(raw_text, str) else ("" if raw_text is None else str(raw_text))
+                lines.append(txt)
+                sections.append((txt, position_tag))
+                section_images.append(img_id)
         matches = []
         for txt in lines:
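In effect, upstream chunks may now be dicts or bare strings, and `text` may be missing or non-string; everything is coerced before merging. A quick illustration of the normalization with hypothetical inputs:

```python
# Hypothetical upstream chunks mixing dicts and bare strings.
arr = [{"text": "Intro", "position_tag": "p1", "img_id": "img-1"},
       "plain string chunk",
       {"text": None}]

lines, sections, section_images = [], [], []
for o in arr:
    if isinstance(o, dict):
        raw_text, position_tag, img_id = o.get("text"), o.get("position_tag", ""), o.get("img_id")
    else:
        raw_text, position_tag, img_id = o, "", None
    # Coerce anything non-string to a safe string; None becomes "".
    txt = raw_text if isinstance(raw_text, str) else ("" if raw_text is None else str(raw_text))
    lines.append(txt)
    sections.append((txt, position_tag))
    section_images.append(img_id)

print(lines)           # ['Intro', 'plain string chunk', '']
print(sections[1])     # ('plain string chunk', '')
print(section_images)  # ['img-1', None, None]
```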


@@ -37,6 +37,7 @@ from rag.app.naive import Docx
 from rag.flow.base import ProcessBase, ProcessParamBase
 from rag.flow.parser.schema import ParserFromUpstream
 from rag.llm.cv_model import Base as VLM
+from rag.nlp import BULLET_PATTERN, bullets_category, docx_question_level, not_bullet
 from rag.utils.base64_image import image2id
@@ -223,10 +224,88 @@ class ParserParam(ProcessParamBase):
 class Parser(ProcessBase):
     component_name = "Parser"
 
-    def _pdf(self, name, blob):
+    @staticmethod
+    def _extract_word_title_lines(doc, to_page=100000):
+        lines = []
+        if not doc or not getattr(doc, "paragraphs", None):
+            return lines
+        pn = 0
+        bull = bullets_category([p.text for p in doc.paragraphs])
+        for p in doc.paragraphs:
+            if pn > to_page:
+                break
+            question_level, p_text = docx_question_level(p, bull)
+            lines.append((question_level, p_text))
+            for run in p.runs:
+                if "lastRenderedPageBreak" in run._element.xml:
+                    pn += 1
+                    continue
+                if "w:br" in run._element.xml and 'type="page"' in run._element.xml:
+                    pn += 1
+        return lines
+
+    @staticmethod
+    def _extract_markdown_title_lines(sections):
+        lines = []
+        if not sections:
+            return lines
+        section_texts = []
+        for section in sections:
+            text = section[0] if isinstance(section, tuple) else section
+            if not isinstance(text, str):
+                continue
+            text = text.strip()
+            if text:
+                section_texts.append(text)
+        if not section_texts:
+            return lines
+        bull = bullets_category(section_texts)
+        if bull < 0:
+            return lines
+        bullet_patterns = BULLET_PATTERN[bull]
+        default_level = len(bullet_patterns) + 1
+        for text in section_texts:
+            level = default_level
+            for idx, pattern in enumerate(bullet_patterns, start=1):
+                if re.match(pattern, text) and not not_bullet(text):
+                    level = idx
+                    break
+            lines.append((level, text))
+        return lines
+
+    @staticmethod
+    def _extract_title_texts(lines):
+        normalized_lines = []
+        level_set = set()
+        for level, txt in lines or []:
+            if not isinstance(txt, str):
+                continue
+            txt = txt.strip()
+            if not txt:
+                continue
+            normalized_lines.append((level, txt))
+            level_set.add(level)
+        if not normalized_lines or not level_set:
+            return set()
+        sorted_levels = sorted(level_set)
+        h2_level = sorted_levels[1] if len(sorted_levels) > 1 else 1
+        h2_level = sorted_levels[-2] if h2_level == sorted_levels[-1] and len(sorted_levels) > 2 else h2_level
+        return {txt for level, txt in normalized_lines if level <= h2_level}
+
+    def _pdf(self, name, blob, **kwargs):
         self.callback(random.randint(1, 5) / 100.0, "Start to work on a PDF.")
         conf = self._param.setups["pdf"]
         self.set_output("output_format", conf["output_format"])
+        abstract_enabled = kwargs.get("abstract", False)
+        author_enabled = kwargs.get("author", False)
         raw_parse_method = conf.get("parse_method", "")
         parser_model_name = None
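The cutoff logic in `_extract_title_texts` keeps everything at or above the second-smallest heading level seen (an "h2" cutoff), backing off one level when that cutoff coincides with the deepest level. A standalone walk-through with hypothetical `(level, text)` pairs:

```python
# Hypothetical (level, text) pairs, as produced by the extractors above;
# a smaller level means a more prominent heading.
lines = [(1, "1 Introduction"), (2, "1.1 Motivation"),
         (3, "Some body paragraph"), (2, "1.2 Related Work")]

normalized, level_set = [], set()
for level, txt in lines:
    txt = txt.strip()
    if txt:
        normalized.append((level, txt))
        level_set.add(level)

sorted_levels = sorted(level_set)  # [1, 2, 3]
h2_level = sorted_levels[1] if len(sorted_levels) > 1 else 1
# If the cutoff is also the deepest level, everything would count as a
# title, so back off one step when there are enough distinct levels.
if h2_level == sorted_levels[-1] and len(sorted_levels) > 2:
    h2_level = sorted_levels[-2]

titles = {txt for level, txt in normalized if level <= h2_level}
print(sorted(titles))  # ['1 Introduction', '1.1 Motivation', '1.2 Related Work']
```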
@@ -399,6 +478,69 @@ class Parser(ProcessBase):
             elif layout == "table":
                 b["doc_type_kwd"] = "table"
 
+        # Get authors
+        if author_enabled:
+            def _begin(txt):
+                if not isinstance(txt, str):
+                    return False
+                return re.match(
+                    r"[0-9. 一、i]*(introduction|abstract|摘要|引言|keywords|key words|关键词|background|背景|目录|前言|contents)",
+                    txt.lower().strip(),
+                )
+
+            i = 0
+            while i < min(32, len(bboxes) - 1):
+                b = bboxes[i]
+                i += 1
+                layout_type = b.get("layout_type", "")
+                layoutno = b.get("layoutno", "")
+                is_title = "title" in str(layout_type).lower() or "title" in str(layoutno).lower()
+                if not is_title:
+                    continue
+                title_txt = b.get("text", "")
+                if _begin(title_txt):
+                    break
+                for j in range(3):
+                    next_idx = i + j
+                    if next_idx >= len(bboxes):
+                        break
+                    candidate = bboxes[next_idx].get("text", "")
+                    if _begin(candidate):
+                        break
+                    if isinstance(candidate, str) and "@" in candidate:
+                        break
+                    bboxes[next_idx]["author"] = True
+                break
+
+        # Get abstract
+        if abstract_enabled:
+            i = 0
+            abstract_idx = None
+            while i + 1 < min(32, len(bboxes)):
+                b = bboxes[i]
+                i += 1
+                txt = b.get("text", "")
+                if not isinstance(txt, str):
+                    continue
+                txt = txt.lower().strip()
+                if re.match(r"(abstract|摘要)", txt):
+                    if len(txt.split()) > 32 or len(txt) > 64:
+                        abstract_idx = i - 1
+                        break
+                    next_txt = bboxes[i].get("text", "") if i < len(bboxes) else ""
+                    if isinstance(next_txt, str):
+                        next_txt = next_txt.lower().strip()
+                    if len(next_txt.split()) > 32 or len(next_txt) > 64:
+                        abstract_idx = i
+                        i += 1
+                    break
+            if abstract_idx is not None:
+                bboxes[abstract_idx]["abstract"] = True
+
         if conf.get("output_format") == "json":
             self.set_output("json", bboxes)
         if conf.get("output_format") == "markdown":
@@ -412,7 +554,7 @@ class Parser(ProcessBase):
                 mkdn += b.get("text", "") + "\n"
             self.set_output("markdown", mkdn)
 
-    def _spreadsheet(self, name, blob):
+    def _spreadsheet(self, name, blob, **kwargs):
         self.callback(random.randint(1, 5) / 100.0, "Start to work on a Spreadsheet.")
         conf = self._param.setups["spreadsheet"]
         self.set_output("output_format", conf["output_format"])
@@ -497,7 +639,7 @@ class Parser(ProcessBase):
         elif conf.get("output_format") == "markdown":
             self.set_output("markdown", spreadsheet_parser.markdown(blob))
 
-    def _word(self, name, blob):
+    def _word(self, name, blob, **kwargs):
         self.callback(random.randint(1, 5) / 100.0, "Start to work on a Word Processor Document")
         conf = self._param.setups["word"]
         self.set_output("output_format", conf["output_format"])
@@ -505,13 +647,18 @@ class Parser(ProcessBase):
         if conf.get("output_format") == "json":
             main_sections = docx_parser(name, binary=blob)
+            title_lines = self._extract_word_title_lines(getattr(docx_parser, "doc", None))
+            title_texts = self._extract_title_texts(title_lines)
             sections = []
             tbls = []
             for text, image, html in main_sections:
-                sections.append((text, image))
+                section = {"text": text, "image": image}
+                text_key = text.strip() if isinstance(text, str) else ""
+                if text_key and text_key in title_texts:
+                    section["title"] = True
+                sections.append(section)
                 tbls.append(((None, html), ""))
-            sections = [{"text": section[0], "image": section[1]} for section in sections if section]
             sections.extend([{"text": tb, "image": None, "doc_type_kwd": "table"} for ((_, tb), _) in tbls])
             self.set_output("json", sections)
@@ -519,7 +666,8 @@ class Parser(ProcessBase):
             markdown_text = docx_parser.to_markdown(name, binary=blob)
             self.set_output("markdown", markdown_text)
 
-    def _slides(self, name, blob):
+    def _slides(self, name, blob, **kwargs):
         self.callback(random.randint(1, 5) / 100.0, "Start to work on a PowerPoint Document")
         conf = self._param.setups["slides"]
@@ -584,7 +732,7 @@ class Parser(ProcessBase):
         if conf.get("output_format") == "json":
             self.set_output("json", sections)
 
-    def _markdown(self, name, blob):
+    def _markdown(self, name, blob, **kwargs):
         from functools import reduce
 
         from rag.app.naive import Markdown as naive_markdown_parser
@@ -605,11 +753,16 @@ class Parser(ProcessBase):
         if conf.get("output_format") == "json":
             json_results = []
+            title_lines = self._extract_markdown_title_lines(sections)
+            title_texts = self._extract_title_texts(title_lines)
             for idx, (section_text, _) in enumerate(sections):
                 json_result = {
                     "text": section_text,
                 }
+                text_key = section_text.strip() if isinstance(section_text, str) else ""
+                if text_key and text_key in title_texts:
+                    json_result["title"] = True
                 images = []
                 if section_images and len(section_images) > idx and section_images[idx] is not None:
@@ -625,7 +778,7 @@ class Parser(ProcessBase):
         else:
             self.set_output("text", "\n".join([section_text for section_text, _ in sections]))
 
-    def _image(self, name, blob):
+    def _image(self, name, blob, **kwargs):
         from deepdoc.vision import OCR
 
         self.callback(random.randint(1, 5) / 100.0, "Start to work on an image.")
@@ -660,7 +813,7 @@ class Parser(ProcessBase):
             }]
             self.set_output("json", json_result)
 
-    def _audio(self, name, blob):
+    def _audio(self, name, blob, **kwargs):
         import os
         import tempfile
@@ -679,7 +832,7 @@ class Parser(ProcessBase):
         self.set_output("text", txt)
 
-    def _video(self, name, blob):
+    def _video(self, name, blob, **kwargs):
         self.callback(random.randint(1, 5) / 100.0, "Start to work on an video.")
         conf = self._param.setups["video"]
@@ -691,7 +844,7 @@ class Parser(ProcessBase):
         self.set_output("text", txt)
 
-    def _email(self, name, blob):
+    def _email(self, name, blob, **kwargs):
         self.callback(random.randint(1, 5) / 100.0, "Start to work on an email.")
 
         email_content = {}
@@ -857,7 +1010,10 @@ class Parser(ProcessBase):
             for p_type, conf in self._param.setups.items():
                 if from_upstream.name.split(".")[-1].lower() not in conf.get("suffix", []):
                     continue
-                await thread_pool_exec(function_map[p_type], name, blob)
+                call_kwargs = dict(kwargs)
+                call_kwargs.pop("name", None)
+                call_kwargs.pop("blob", None)
+                await thread_pool_exec(function_map[p_type], name, blob, **call_kwargs)
                 done = True
                 break
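This dispatch change is why every `_xxx` handler above gained `**kwargs`: upstream flags such as `abstract` and `author` are forwarded to whichever handler matches, while `name` and `blob` are popped so they are never passed both positionally and as keywords. A minimal sketch of the pattern, with a hypothetical handler and flags:

```python
def _pdf(name, blob, **kwargs):
    # Hypothetical handler: reads only the flags it understands.
    return {"author": kwargs.get("author", False),
            "abstract": kwargs.get("abstract", False)}

kwargs = {"name": "x.pdf", "blob": b"%PDF", "author": True}
call_kwargs = dict(kwargs)
call_kwargs.pop("name", None)  # already passed positionally below
call_kwargs.pop("blob", None)
print(_pdf("x.pdf", b"%PDF", **call_kwargs))  # {'author': True, 'abstract': False}
```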


@@ -21,4 +21,6 @@ class ParserFromUpstream(BaseModel):
     name: str
     file: dict | None = Field(default=None)
+    abstract: bool = False
+    author: bool = False
 
     model_config = ConfigDict(populate_by_name=True, extra="forbid")
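Both new fields default to `False`, so existing callers are unaffected, and `extra="forbid"` still rejects unknown keys. A quick usage sketch (pydantic v2; the model is reduced to the fields visible in this hunk):

```python
from pydantic import BaseModel, ConfigDict, Field

class ParserFromUpstream(BaseModel):
    name: str
    file: dict | None = Field(default=None)
    abstract: bool = False
    author: bool = False

    model_config = ConfigDict(populate_by_name=True, extra="forbid")

msg = ParserFromUpstream(name="paper.pdf", abstract=True, author=True)
print(msg.abstract, msg.author)  # True True
# ParserFromUpstream(name="x", unknown=1) raises a ValidationError (extra="forbid").
```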

uv.lock (generated): diff suppressed because it is too large.