mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-05-28 11:43:06 +08:00
Declare doc_id, filename, mime_type, and size as separate outputs on the Document Generation component so downstream nodes (e.g., the Code component) can consume them via the variable picker. The existing download JSON blob is preserved unchanged for the Message component's download-chip rendering.

### What problem does this PR solve?

The Document Generation component previously exposed only a single `download` output — a JSON-encoded blob containing the file's `doc_id`, `filename`, `mime_type`, `size`, and base64 payload. On top of that, the variable picker actively hides this `download` entry from every consumer except the Message component (because the embedded base64 is too heavy to splat into arbitrary downstream nodes). The combined effect: users wiring the Doc Generator's output into a Code component had no way to retrieve basic file info such as `filename` or `doc_id` from the picker, blocking workflows that need to post-process the generated file (e.g., registering it elsewhere, custom delivery, follow-up API calls).

This PR declares `doc_id`, `filename`, `mime_type`, and `size` as **discrete outputs** on the Document Generation component, alongside the existing `download` blob. The new fields:

- Appear in the variable picker for **all** downstream nodes, including the Code component, so users can bind them directly to script arguments.
- Are cheap scalars only — no base64 payload leaks into other components.
- Leave the existing `download` JSON blob completely untouched, so the Message component's download-chip rendering (which parses that blob via `_is_download_info`) keeps working with no behavior change.

Changes:

- `agent/component/docs_generator.py` — declare the four new outputs in `DocGeneratorParam` and emit them via `set_output(...)` in `_invoke`.
- `web/src/pages/agent/constant/index.tsx` — extend `initialDocGeneratorValues.outputs` with the new keys.
- `web/src/pages/agent/form/doc-generator-form/index.tsx` — mirror the new outputs in the zod schema so the form is valid.

No changes are needed to the picker's existing `download`-hiding filter — it matches only on the literal output name `download`, so the new metadata entries fall through naturally.

Reported in: https://github.com/infiniflow/ragflow/issues/14461.

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
643 lines
23 KiB
Python
643 lines
23 KiB
Python
import base64
|
|
import logging
|
|
import json
|
|
import os
|
|
import re
|
|
import shutil
|
|
import tempfile
|
|
from abc import ABC
|
|
from datetime import datetime
|
|
from functools import partial
|
|
from io import BytesIO
|
|
from xml.sax.saxutils import escape
|
|
|
|
from agent.component.base import ComponentParamBase
|
|
from api.utils.api_utils import timeout
|
|
from common import settings
|
|
from common.misc_utils import get_uuid
|
|
from .message import Message
|
|
|
|
|
|
def sanitize_filename(name: str, extension: str) -> str:
    """Turn a user-supplied name into a safe ``<base>.<extension>`` filename.

    Path separators, control characters and the characters ``? # % * : | < > "``
    are replaced by spaces, whitespace runs are collapsed, and leading/trailing
    spaces and dots are dropped. Whatever extension the cleaned name carried is
    discarded in favour of ``extension``; the base is capped at 180 characters.
    ``file.<extension>`` is returned when nothing usable remains.
    """
    fallback = f"file.{extension}"
    if not name:
        return fallback

    cleaned = re.sub(r'[\\/\x00-\x1f\?\#\%\*\:\|\<\>"]', " ", str(name).strip())
    cleaned = re.sub(r"\s+", " ", cleaned).strip(" .")
    if not cleaned:
        return fallback

    # splitext drops the last ".suffix" so the requested extension replaces it.
    stem = os.path.splitext(cleaned)[0][:180].rstrip()
    return f"{stem or 'file'}.{extension}"
|
|
|
|
|
|
class DocGeneratorParam(ComponentParamBase):
    """
    Parameters of the Docs Generator component.

    Holds the rendering options (format, content, filename, header/footer/
    watermark text, page-number and timestamp switches, font size) and
    declares the outputs the component publishes.
    """

    def __init__(self):
        super().__init__()
        self.output_format = "pdf"  # pdf, docx, txt, markdown, html
        self.content = ""
        self.filename = ""
        self.header_text = ""
        self.footer_text = ""
        self.watermark_text = ""
        self.add_page_numbers = True
        self.add_timestamp = True
        self.include_download_info_in_content = False
        self.font_size = 12
        # Scalar file metadata is published next to the full "download" blob.
        self.outputs = {
            "doc_id": {"value": "", "type": "string"},
            "filename": {"value": "", "type": "string"},
            "mime_type": {"value": "", "type": "string"},
            "size": {"value": 0, "type": "number"},
            "download": {"value": "", "type": "string"},
        }

    def check(self):
        """Validate content, output format and font size (minimum 12)."""
        supported_formats = ["pdf", "docx", "txt", "markdown", "html"]
        self.check_empty(self.content, "[DocGenerator] Content")
        self.check_valid_value(self.output_format, "[DocGenerator] Output format", supported_formats)
        self.check_positive_number(self.font_size, "[DocGenerator] Font size")
        if self.font_size < 12:
            raise ValueError("[DocGenerator] Font size must be greater than or equal to 12")
|
|
|
|
|
|
class DocGenerator(Message, ABC):
    """Component that renders content to PDF, DOCX, TXT, Markdown or HTML."""

    component_name = "DocGenerator"
    # Scratch directory for generated files; created by _get_output_directory.
    _default_output_directory = os.path.join(tempfile.gettempdir(), "doc_outputs")
    # Margin and font size used when drawing the PDF header/footer overlay.
    _overlay_margin = 36
    _overlay_font_size = 9
    # Fonts passed to pandoc as the "mainfont" / "CJKmainfont" variables.
    _pdf_main_font = "Noto Sans CJK SC"
    _pdf_cjk_font = "Noto Sans CJK SC"
    # Font registered with reportlab for the overlay text.
    _pdf_overlay_font = "STSong-Light"
|
|
|
|
def get_input_form(self) -> dict[str, dict]:
    """Describe the component's single form input: a text field named ``content``."""
    content_field = {"name": "Content", "type": "text"}
    return {"content": content_field}
|
|
|
|
@timeout(int(os.environ.get("COMPONENT_EXEC_TIMEOUT", 10 * 60)))
def _invoke(self, **kwargs):
    """Generate the document, store it, and publish its metadata.

    Resolves the content, renders it with the generator matching
    ``output_format``, stores the bytes under a fresh doc id and sets the
    ``doc_id``, ``filename``, ``mime_type``, ``size`` and ``download`` outputs.

    Returns:
        The download-info dict (metadata plus the base64 payload).

    Raises:
        Exception: any failure is logged once, recorded in the ``_ERROR``
            output and re-raised unchanged.
    """
    # output format -> (generator, MIME type)
    generators = {
        "pdf": (self._generate_pdf, "application/pdf"),
        "docx": (
            self._generate_docx,
            "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        ),
        "txt": (self._generate_txt, "text/plain"),
        "markdown": (self._generate_markdown, "text/markdown"),
        "html": (self._generate_html, "text/html"),
    }

    file_path = None
    output_format = self._param.output_format or "pdf"
    try:
        content = self._resolve_content(kwargs)

        if output_format not in generators:
            raise Exception(f"Unsupported output format: {output_format}")
        generate, mime_type = generators[output_format]
        file_path, file_bytes = generate(content)

        filename = os.path.basename(file_path)
        if not file_bytes:
            raise Exception("Document file is empty")

        file_size = len(file_bytes)
        file_base64 = base64.b64encode(file_bytes).decode("utf-8")
        doc_id = get_uuid()
        settings.STORAGE_IMPL.put(self._canvas.get_tenant_id(), doc_id, file_bytes)

        logging.info(
            "Successfully generated %s: %s (Size: %s bytes)",
            output_format.upper(),
            filename,
            file_size,
        )

        download_info = {
            "doc_id": doc_id,
            "filename": filename,
            "mime_type": mime_type,
            "size": file_size,
            "base64": file_base64,
            "include_download_info_in_content": self._param.include_download_info_in_content,
        }
        # Scalar metadata is exposed separately from the heavy JSON blob.
        self.set_output("doc_id", doc_id)
        self.set_output("filename", filename)
        self.set_output("mime_type", mime_type)
        self.set_output("size", file_size)
        self.set_output("download", json.dumps(download_info))
        return download_info
    except Exception as e:
        # Single handler: nesting two handlers logged each failure twice.
        logging.exception("Error generating %s document", output_format)
        self.set_output("_ERROR", f"Document generation failed: {str(e)}")
        raise
    finally:
        # The bytes are already in storage (or generation failed); drop the temp file.
        if file_path and os.path.exists(file_path):
            os.remove(file_path)
|
|
|
|
def _resolve_content(self, kwargs: dict) -> str:
|
|
content = self._param.content or kwargs.get("content", "") or ""
|
|
logging.info("Starting document generation, content length: %s chars", len(content))
|
|
|
|
if content:
|
|
def _replace_variable(match_obj: re.Match[str]) -> str:
|
|
match = match_obj.group(1)
|
|
try:
|
|
var_value = self._canvas.get_variable_value(match)
|
|
if var_value is None:
|
|
return ""
|
|
if isinstance(var_value, partial):
|
|
resolved_content = ""
|
|
for chunk in var_value():
|
|
resolved_content += chunk
|
|
return resolved_content
|
|
return self._stringify_message_value(var_value, fallback_to_str=True)
|
|
except Exception as e:
|
|
logging.warning("Error resolving variable %s: %s", match, str(e))
|
|
return f"[ERROR: {str(e)}]"
|
|
|
|
content = re.sub(
|
|
self.variable_ref_patt,
|
|
_replace_variable,
|
|
content,
|
|
flags=re.DOTALL,
|
|
)
|
|
|
|
return content
|
|
|
|
def _get_output_directory(self) -> str:
|
|
os.makedirs(self._default_output_directory, exist_ok=True)
|
|
return self._default_output_directory
|
|
|
|
def _build_output_filename(self, output_format: str) -> str:
|
|
import uuid
|
|
|
|
if self._param.filename:
|
|
return sanitize_filename(self._param.filename, output_format.lower())
|
|
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
return f"document_{timestamp}_{uuid.uuid4().hex[:8]}.{output_format}"
|
|
|
|
def _get_timestamp_text(self) -> str:
|
|
return f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
|
|
|
|
def _write_bytes_output(self, content: bytes, extension: str) -> tuple[str, bytes]:
|
|
output_directory = self._get_output_directory()
|
|
filename = self._build_output_filename(extension)
|
|
file_path = os.path.join(output_directory, filename)
|
|
with open(file_path, "wb") as f:
|
|
f.write(content)
|
|
return file_path, content
|
|
|
|
def _build_markdown_source(self, content: str, include_timestamp_in_body: bool = False) -> str:
|
|
if not (include_timestamp_in_body and self._param.add_timestamp):
|
|
return content
|
|
return f"{self._get_timestamp_text()}\n\n{content}"
|
|
|
|
def _get_heading_sizes(self) -> tuple[int, int, int]:
|
|
base = int(self._param.font_size)
|
|
return base + 6, base + 4, base + 2
|
|
|
|
def _generate_pandoc_binary_output(
    self,
    content: str,
    target_format: str,
    extension: str,
    include_timestamp_in_body: bool = False,
    extra_args: list[str] | None = None,
) -> tuple[str, bytes]:
    """Convert markdown ``content`` with pandoc into a file and read it back.

    Pandoc writes straight to a file in the output directory, which is then
    read back in binary mode.

    Returns:
        ``(file_path, file_bytes)`` for the generated file.
    """
    import pypandoc

    file_path = os.path.join(
        self._get_output_directory(),
        self._build_output_filename(extension),
    )
    source = self._build_markdown_source(
        content,
        include_timestamp_in_body=include_timestamp_in_body,
    )
    pandoc_args = extra_args if extra_args else []

    pypandoc.convert_text(
        source,
        to=target_format,
        format="markdown",
        outputfile=file_path,
        extra_args=pandoc_args,
    )

    with open(file_path, "rb") as produced:
        return file_path, produced.read()
|
|
|
|
def _generate_pandoc_text_output(
    self,
    content: str,
    target_format: str,
    extension: str,
    include_timestamp_in_body: bool = True,
) -> tuple[str, bytes]:
    """Convert markdown ``content`` with pandoc to text and write it as UTF-8.

    Unlike the binary variant, the timestamp line is included in the body by
    default.

    Returns:
        ``(file_path, file_bytes)`` for the written file.
    """
    import pypandoc

    source = self._build_markdown_source(
        content,
        include_timestamp_in_body=include_timestamp_in_body,
    )
    rendered = pypandoc.convert_text(source, to=target_format, format="markdown")
    payload = rendered.encode("utf-8")
    return self._write_bytes_output(payload, extension)
|
|
|
|
def _select_pdf_engine(self) -> str:
|
|
if shutil.which("xelatex"):
|
|
return "xelatex"
|
|
raise Exception("No PDF engine found. Install xelatex.")
|
|
|
|
def _get_pdf_font_args(self) -> list[str]:
|
|
return [
|
|
"-V",
|
|
f"mainfont={self._pdf_main_font}",
|
|
"-V",
|
|
f"CJKmainfont={self._pdf_cjk_font}",
|
|
]
|
|
|
|
def _get_pdf_overlay_font_name(self) -> str:
    """Return the overlay font name, registering it with reportlab if needed."""
    from reportlab.pdfbase import pdfmetrics
    from reportlab.pdfbase.cidfonts import UnicodeCIDFont

    font_name = self._pdf_overlay_font
    try:
        pdfmetrics.getFont(font_name)
    except KeyError:
        # Lookup failed with KeyError: register it as a Unicode CID font.
        pdfmetrics.registerFont(UnicodeCIDFont(font_name))
    return font_name
|
|
|
|
def _build_pdf_heading_overrides(self) -> str:
|
|
font_size = int(self._param.font_size)
|
|
leading = round(font_size * 1.2, 1)
|
|
h1_size, h2_size, h3_size = self._get_heading_sizes()
|
|
h1_leading = round(h1_size * 1.2, 1)
|
|
h2_leading = round(h2_size * 1.2, 1)
|
|
h3_leading = round(h3_size * 1.2, 1)
|
|
|
|
return rf"""
|
|
\makeatletter
|
|
\renewcommand\normalsize{{
|
|
\@setfontsize\normalsize{{{font_size}pt}}{{{leading}pt}}
|
|
\abovedisplayskip 12pt plus 3pt minus 7pt
|
|
\abovedisplayshortskip \z@ plus 3pt
|
|
\belowdisplayshortskip 6.5pt plus 3.5pt minus 3pt
|
|
\belowdisplayskip \abovedisplayskip
|
|
\let\@listi\@listI
|
|
}}
|
|
\normalsize
|
|
\renewcommand\section{{\@startsection{{section}}{{1}}{{\z@}}{{-3.5ex \@plus -1ex \@minus -.2ex}}{{2.3ex \@plus .2ex}}{{\normalfont\fontsize{{{h1_size}pt}}{{{h1_leading}pt}}\selectfont\bfseries}}}}
|
|
\renewcommand\subsection{{\@startsection{{subsection}}{{2}}{{\z@}}{{-3.25ex\@plus -1ex \@minus -.2ex}}{{1.5ex \@plus .2ex}}{{\normalfont\fontsize{{{h2_size}pt}}{{{h2_leading}pt}}\selectfont\bfseries}}}}
|
|
\renewcommand\subsubsection{{\@startsection{{subsubsection}}{{3}}{{\z@}}{{-3.25ex\@plus -1ex \@minus -.2ex}}{{1.5ex \@plus .2ex}}{{\normalfont\fontsize{{{h3_size}pt}}{{{h3_leading}pt}}\selectfont\bfseries}}}}
|
|
\makeatother
|
|
""".strip()
|
|
|
|
def _write_temp_tex(self, content: str) -> str:
|
|
output_directory = self._get_output_directory()
|
|
with tempfile.NamedTemporaryFile(
|
|
mode="w",
|
|
encoding="utf-8",
|
|
suffix=".tex",
|
|
dir=output_directory,
|
|
delete=False,
|
|
) as f:
|
|
f.write(content)
|
|
return f.name
|
|
|
|
def _should_apply_pdf_overlay(self) -> bool:
|
|
return any(
|
|
[
|
|
self._param.header_text,
|
|
self._param.footer_text,
|
|
self._param.watermark_text,
|
|
self._param.add_page_numbers,
|
|
self._param.add_timestamp,
|
|
]
|
|
)
|
|
|
|
def _build_pdf_overlay_page(self, width: float, height: float, page_number: int):
|
|
if not self._should_apply_pdf_overlay():
|
|
return None
|
|
|
|
from pypdf import PdfReader
|
|
from reportlab.lib.colors import Color
|
|
from reportlab.pdfgen import canvas as pdf_canvas
|
|
|
|
buffer = BytesIO()
|
|
overlay = pdf_canvas.Canvas(buffer, pagesize=(width, height))
|
|
overlay_font = self._get_pdf_overlay_font_name()
|
|
|
|
if self._param.watermark_text:
|
|
overlay.saveState()
|
|
if hasattr(overlay, "setFillAlpha"):
|
|
overlay.setFillAlpha(0.15)
|
|
overlay.setFillColor(Color(0.6, 0.6, 0.6))
|
|
overlay.setFont(overlay_font, 48)
|
|
overlay.translate(width / 2, height / 2)
|
|
overlay.rotate(45)
|
|
overlay.drawCentredString(0, 0, self._param.watermark_text)
|
|
overlay.restoreState()
|
|
|
|
overlay.setFont(overlay_font, self._overlay_font_size)
|
|
overlay.setFillColor(Color(0.35, 0.35, 0.35))
|
|
|
|
if self._param.header_text:
|
|
overlay.drawString(
|
|
self._overlay_margin,
|
|
height - self._overlay_margin + 8,
|
|
self._param.header_text,
|
|
)
|
|
|
|
if self._param.footer_text:
|
|
overlay.drawString(
|
|
self._overlay_margin,
|
|
self._overlay_margin - 8,
|
|
self._param.footer_text,
|
|
)
|
|
|
|
if self._param.add_timestamp:
|
|
overlay.drawCentredString(
|
|
width / 2,
|
|
self._overlay_margin - 8,
|
|
self._get_timestamp_text(),
|
|
)
|
|
|
|
if self._param.add_page_numbers:
|
|
overlay.drawRightString(
|
|
width - self._overlay_margin,
|
|
self._overlay_margin - 8,
|
|
f"Page {page_number}",
|
|
)
|
|
|
|
overlay.save()
|
|
buffer.seek(0)
|
|
return PdfReader(buffer).pages[0]
|
|
|
|
def _apply_pdf_overlay(self, file_path: str) -> tuple[str, bytes]:
    """Merge the decoration overlay onto every page of the PDF at ``file_path``.

    When no decoration is enabled the file is only read. Otherwise the merged
    document is written to ``<file_path>.overlay`` and moved over the original.

    Returns:
        ``(file_path, file_bytes)`` with the final file contents.
    """
    from pypdf import PdfReader, PdfWriter

    if self._should_apply_pdf_overlay():
        source = PdfReader(file_path)
        merged = PdfWriter()

        for page_number, page in enumerate(source.pages, start=1):
            stamp = self._build_pdf_overlay_page(
                float(page.mediabox.width),
                float(page.mediabox.height),
                page_number,
            )
            if stamp is not None:
                page.merge_page(stamp)
            merged.add_page(page)

        # Write next to the original, then replace it in a single rename.
        staged_path = f"{file_path}.overlay"
        with open(staged_path, "wb") as staged:
            merged.write(staged)
        os.replace(staged_path, file_path)

    with open(file_path, "rb") as final:
        return file_path, final.read()
|
|
|
|
def _clear_docx_container(self, container):
|
|
element = container._element
|
|
for child in list(element):
|
|
element.remove(child)
|
|
|
|
def _append_docx_field(self, run, instruction: str):
    """Append a field (begin marker, instruction text, end marker) to ``run``.

    Args:
        run: the python-docx run whose underlying ``_r`` element receives the
            three field elements.
        instruction: the field instruction text, e.g. ``" PAGE "``.
    """
    from docx.oxml import OxmlElement

    # Qualified attribute name, spelled out directly. The previous
    # `nsmap["w"] and "<name>"` expression always evaluated to this same
    # string but could raise KeyError on the needless namespace lookup.
    fld_char_type = "{http://schemas.openxmlformats.org/wordprocessingml/2006/main}fldCharType"

    begin = OxmlElement("w:fldChar")
    begin.set(fld_char_type, "begin")

    instr = OxmlElement("w:instrText")
    # Keep the leading/trailing spaces of the instruction text.
    instr.set("{http://www.w3.org/XML/1998/namespace}space", "preserve")
    instr.text = instruction

    end = OxmlElement("w:fldChar")
    end.set(fld_char_type, "end")

    run._r.append(begin)
    run._r.append(instr)
    run._r.append(end)
|
|
|
|
def _add_docx_watermark(self, section):
|
|
if not self._param.watermark_text:
|
|
return
|
|
|
|
from docx.enum.text import WD_ALIGN_PARAGRAPH
|
|
from docx.oxml import parse_xml
|
|
|
|
header = section.header
|
|
paragraph = header.add_paragraph()
|
|
paragraph.alignment = WD_ALIGN_PARAGRAPH.CENTER
|
|
run = paragraph.add_run()
|
|
watermark_xml = parse_xml(
|
|
rf"""
|
|
<w:pict
|
|
xmlns:w="http://schemas.openxmlformats.org/wordprocessingml/2006/main"
|
|
xmlns:v="urn:schemas-microsoft-com:vml"
|
|
xmlns:o="urn:schemas-microsoft-com:office:office">
|
|
<v:shape id="PowerPlusWaterMarkObject"
|
|
o:spid="_x0000_s2049"
|
|
type="#_x0000_t136"
|
|
style="position:absolute;
|
|
margin-left:0;
|
|
margin-top:0;
|
|
width:468pt;
|
|
height:117pt;
|
|
rotation:315;
|
|
z-index:-251654144;
|
|
mso-wrap-edited:f;
|
|
mso-position-horizontal:center;
|
|
mso-position-horizontal-relative:margin;
|
|
mso-position-vertical:center;
|
|
mso-position-vertical-relative:margin"
|
|
fillcolor="#d9d9d9"
|
|
stroked="f">
|
|
<v:fill opacity="0.18"/>
|
|
<v:textpath on="t" style="font-family:"Calibri";font-size:1pt" string="{escape(self._param.watermark_text)}"/>
|
|
</v:shape>
|
|
</w:pict>
|
|
"""
|
|
)
|
|
run._r.append(watermark_xml)
|
|
|
|
def _normalize_docx_section_geometry(self, section, default_section):
|
|
for attr in ("page_width", "left_margin", "right_margin"):
|
|
if getattr(section, attr) is None:
|
|
setattr(section, attr, getattr(default_section, attr))
|
|
|
|
def _get_docx_available_width(self, section):
|
|
page_width = section.page_width
|
|
left_margin = section.left_margin
|
|
right_margin = section.right_margin
|
|
|
|
if page_width is None or left_margin is None or right_margin is None:
|
|
raise ValueError("DOCX section geometry is incomplete after normalization.")
|
|
|
|
return page_width - left_margin - right_margin
|
|
|
|
def _decorate_docx(self, file_path: str) -> tuple[str, bytes]:
    """Apply font sizes, header, watermark and footer to the DOCX at ``file_path``.

    Sets the Normal and Heading 1-3 sizes, then rebuilds each section's header
    (header text plus watermark) and footer (footer text, timestamp at a
    centre tab stop, page number at a right tab stop). The document is saved
    in place.

    Returns:
        ``(file_path, file_bytes)`` with the saved file contents.
    """
    from docx import Document
    from docx.enum.text import WD_TAB_ALIGNMENT
    from docx.shared import Pt

    params = self._param
    document = Document(file_path)
    # A blank document supplies fallback geometry for incomplete sections.
    default_section = Document().sections[0]
    h1_size, h2_size, h3_size = self._get_heading_sizes()

    style_sizes = (
        ("Normal", int(params.font_size)),
        ("Heading 1", h1_size),
        ("Heading 2", h2_size),
        ("Heading 3", h3_size),
    )
    for style_name, size in style_sizes:
        try:
            document.styles[style_name].font.size = Pt(size)
        except Exception:
            # Best effort: a style that cannot be updated is skipped.
            continue

    for section in document.sections:
        self._normalize_docx_section_geometry(section, default_section)
        available_width = self._get_docx_available_width(section)

        header = section.header
        header.is_linked_to_previous = False
        self._clear_docx_container(header)
        if params.header_text:
            header.add_paragraph().add_run(params.header_text)

        self._add_docx_watermark(section)

        footer = section.footer
        footer.is_linked_to_previous = False
        self._clear_docx_container(footer)

        footer_parts = [params.footer_text, params.add_timestamp, params.add_page_numbers]
        if not any(footer_parts):
            continue

        line = footer.add_paragraph()
        tab_stops = line.paragraph_format.tab_stops
        tab_stops.add_tab_stop(int(available_width // 2), WD_TAB_ALIGNMENT.CENTER)
        tab_stops.add_tab_stop(int(available_width), WD_TAB_ALIGNMENT.RIGHT)

        if params.footer_text:
            line.add_run(params.footer_text)

        # First tab moves to the centre stop, second to the right stop.
        if params.add_timestamp or params.add_page_numbers:
            line.add_run("\t")

        if params.add_timestamp:
            line.add_run(self._get_timestamp_text())

        if params.add_page_numbers:
            line.add_run("\t")
            self._append_docx_field(line.add_run(), " PAGE ")

    document.save(file_path)
    with open(file_path, "rb") as saved:
        return file_path, saved.read()
|
|
|
|
def thoughts(self) -> str:
    """Return the progress message naming the upper-cased output format."""
    format_label = self._param.output_format.upper()
    return "Generating " + format_label + " document with markdown conversion..."
|
|
|
|
def _generate_pdf(self, content: str) -> tuple[str, bytes]:
|
|
try:
|
|
engine = self._select_pdf_engine()
|
|
header_path = self._write_temp_tex(self._build_pdf_heading_overrides())
|
|
try:
|
|
file_path, _ = self._generate_pandoc_binary_output(
|
|
content,
|
|
"pdf",
|
|
"pdf",
|
|
include_timestamp_in_body=False,
|
|
extra_args=[
|
|
"--standalone",
|
|
f"--pdf-engine={engine}",
|
|
f"--include-in-header={header_path}",
|
|
*self._get_pdf_font_args(),
|
|
],
|
|
)
|
|
finally:
|
|
if os.path.exists(header_path):
|
|
os.remove(header_path)
|
|
return self._apply_pdf_overlay(file_path)
|
|
except Exception as e:
|
|
raise Exception(f"PDF generation failed: {str(e)}")
|
|
|
|
def _generate_docx(self, content: str) -> tuple[str, bytes]:
|
|
try:
|
|
file_path, _ = self._generate_pandoc_binary_output(
|
|
content,
|
|
"docx",
|
|
"docx",
|
|
include_timestamp_in_body=False,
|
|
extra_args=["--standalone"],
|
|
)
|
|
return self._decorate_docx(file_path)
|
|
except Exception as e:
|
|
raise Exception(f"DOCX generation failed: {str(e)}")
|
|
|
|
def _generate_txt(self, content: str) -> tuple[str, bytes]:
|
|
try:
|
|
return self._generate_pandoc_text_output(content, "plain", "txt")
|
|
except Exception as e:
|
|
raise Exception(f"TXT generation failed: {str(e)}")
|
|
|
|
def _generate_markdown(self, content: str) -> tuple[str, bytes]:
|
|
try:
|
|
return self._generate_pandoc_text_output(content, "markdown", "md")
|
|
except Exception as e:
|
|
raise Exception(f"Markdown generation failed: {str(e)}")
|
|
|
|
def _generate_html(self, content: str) -> tuple[str, bytes]:
|
|
try:
|
|
return self._generate_pandoc_text_output(content, "html", "html")
|
|
except Exception as e:
|
|
raise Exception(f"HTML generation failed: {str(e)}")
|