#!/usr/bin/env python3
"""Model-free PDF → markdown chapters for agent grep (text + images + tables)."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import re
|
|
import shutil
|
|
from pathlib import Path
|
|
|
|
import fitz
|
|
import pdfplumber
|
|
|
|
# --- Line classifiers ------------------------------------------------------
# Page furniture repeated on most pages of this spec family: the part number,
# a 2002 revision date, a "Rev.x.y" tag, or a bare 1-3 digit page number.
HEADER_FOOTER = re.compile(r"^(RTL8169|2002/\d{2}/\d{2}|Rev\.\d+\.\d+|\d{1,3})$", re.MULTILINE)
# Numbered top-level heading, e.g. "5. Register Descriptions".
MAJOR_SECTION = re.compile(r"^(\d{1,2})\.\s+([A-Z][^\n]{2,})$")
# Table-of-contents entry: a dot leader followed by a page number.
TOC_LINE = re.compile(r"\.{4,}\s*\d+\s*$")
# A lone capital, or capitals separated by single spaces ("R S V D").
SPACED_LETTERS = re.compile(r"^([A-Z](?: [A-Z]){1,}|[A-Z])$")

# --- Table thresholds ------------------------------------------------------
# Minimum number of non-empty rows a table must have.
MIN_TABLE_ROWS = 2
# Minimum number of non-empty cells a table must have.
MIN_TABLE_CELLS = 6
# Cells shorter than this are ignored when checking overlap with page text.
MIN_CELL_LEN_FOR_DEDUP = 4

# --- Rendering -------------------------------------------------------------
# Resolution used for full-page PNG fallbacks unless overridden.
DEFAULT_PAGE_DPI = 150
|
|
|
|
|
|
def slugify(title: str) -> str:
    """Return a lowercase, hyphen-separated slug for *title* (at most 80 chars).

    Characters other than word characters, whitespace and hyphens are dropped;
    runs of whitespace/hyphens collapse to a single ``-``. Falls back to
    ``"section"`` when nothing usable remains.
    """
    kept = re.sub(r"[^\w\s-]", "", title)
    slug = re.sub(r"[-\s]+", "-", kept.strip()).strip("-").lower()
    # Truncation can cut right after a separator; strip again so the slug
    # (used as a file name stem) never ends in a dangling "-".
    return slug[:80].rstrip("-") or "section"
|
|
|
|
|
|
def clean_page_text(raw: str) -> str:
    """Strip header/footer lines and excess blank lines from one page of text.

    Lines whose stripped form matches ``HEADER_FOOTER`` are dropped, trailing
    whitespace is removed from the rest, whitespace-only lines become empty,
    and runs of three or more newlines collapse to a single blank line.
    """
    kept: list[str] = []
    for original in raw.splitlines():
        stripped = original.strip()
        if stripped and HEADER_FOOTER.match(stripped):
            continue
        # Whitespace-only lines are kept as empty separators.
        kept.append(original.rstrip() if stripped else "")
    joined = "\n".join(kept)
    return re.sub(r"\n{3,}", "\n\n", joined).strip()
|
|
|
|
|
|
def is_toc_page(text: str) -> bool:
    """Return True when enough lines on the page look like TOC entries.

    A page qualifies when the number of non-blank lines matching ``TOC_LINE``
    is at least 3 and at least a quarter (integer division) of all non-blank
    lines. An empty page is never a TOC page.
    """
    entries = [s for s in (ln.strip() for ln in text.splitlines()) if s]
    if not entries:
        return False
    leader_count = len([entry for entry in entries if TOC_LINE.search(entry)])
    threshold = max(3, len(entries) // 4)
    return leader_count >= threshold
|
|
|
|
|
|
def _max_vertical_letter_run(text: str) -> int:
    """Return the longest run of consecutive lines that are a single letter.

    Each line is stripped first; any other line (including a blank one)
    resets the run.
    """
    longest = 0
    streak = 0
    for raw_line in text.splitlines():
        stripped = raw_line.strip()
        is_lone_letter = len(stripped) == 1 and stripped.isalpha()
        streak = streak + 1 if is_lone_letter else 0
        longest = max(longest, streak)
    return longest
|
|
|
|
|
|
def is_broken_page(text: str) -> bool:
    """Detect layout-heavy pages where get_text() yields vertical single letters.

    Pages with fewer than 12 non-blank lines are never flagged. Otherwise the
    page is flagged when any of these holds:

    * four or more consecutive single-letter lines;
    * at least 20 one-character lines;
    * at least 12 one-character lines that outnumber 45% of the "normal" lines;
    * at least 25 lines of length <= 2 that outnumber the normal lines and
      make up at least 35% of the page.

    A "normal" line has 10+ characters, or 5+ characters and a space.
    """
    tokens = [s for s in (ln.strip() for ln in text.splitlines()) if s]
    total = len(tokens)
    if total < 12:
        return False
    if _max_vertical_letter_run(text) >= 4:
        return True

    lengths = [len(tok) for tok in tokens]
    one_char = lengths.count(1)
    tiny = len([n for n in lengths if n <= 2])
    readable = len(
        [tok for tok in tokens if len(tok) >= 10 or (len(tok) >= 5 and " " in tok)]
    )

    # Bitfield/diagram pages: many one-letter lines stacked vertically.
    many_single = one_char >= 20
    single_dominant = one_char >= 12 and one_char > readable * 0.45
    # Mostly fragmented tokens, little readable prose.
    fragmented = tiny >= 25 and tiny > readable and tiny / total >= 0.35
    return many_single or single_dominant or fragmented
|
|
|
|
|
|
def is_layout_table(rows: list[list]) -> bool:
    """Reject bitfield/diagram tables made of spaced letters and empty cells.

    Returns True (reject) when the table has no text, fewer than
    ``MIN_TABLE_ROWS`` rows, fewer than ``MIN_TABLE_CELLS`` non-empty cells,
    more than 55% cells of three characters or fewer, or more than 25% cells
    matching ``SPACED_LETTERS``.
    """
    texts = []
    for row in rows:
        for cell in row:
            stripped = (cell or "").strip()
            if stripped:
                texts.append(stripped)

    filled = len(texts)
    if filled == 0 or len(rows) < MIN_TABLE_ROWS or filled < MIN_TABLE_CELLS:
        return True

    short_ratio = len([t for t in texts if len(t) <= 3]) / filled
    spaced_ratio = len([t for t in texts if SPACED_LETTERS.match(t)]) / filled
    if short_ratio > 0.55 or spaced_ratio > 0.25:
        return True

    # Single-row "tables" from layout heuristics (one data row + header).
    # This only fires if MIN_TABLE_CELLS is lowered to 4 or below, because
    # `filled` is already known to be >= MIN_TABLE_CELLS here.
    return len(rows) == 2 and filled <= 4
|
|
|
|
|
|
def table_overlaps_page_text(rows: list[list], page_text: str) -> bool:
    """Skip markdown table when its cell text is already on the page.

    Cells and page text are compared case-insensitively after collapsing every
    whitespace run to a single space, so a cell wrapped or padded differently
    from the page text still matches. Cells shorter than
    ``MIN_CELL_LEN_FOR_DEDUP`` are ignored.

    Returns True when at least three cells were checked and at least half of
    them (or their first 40 characters, for long cells) occur in the page text.
    """
    page_norm = re.sub(r"\s+", " ", page_text.lower())
    hits = 0
    checked = 0
    for row in rows:
        for cell in row:
            # Normalize exactly like the page text; replacing only "\n" left
            # doubled spaces/tabs in the cell that could never match.
            needle = re.sub(r"\s+", " ", (cell or "").strip())
            if len(needle) < MIN_CELL_LEN_FOR_DEDUP:
                continue
            checked += 1
            if needle.lower() in page_norm:
                hits += 1
            # Long descriptions: match if a substantial prefix is present.
            elif len(needle) >= 40 and needle[:40].lower() in page_norm:
                hits += 1
    return checked >= 3 and hits / checked >= 0.5
|
|
|
|
|
|
def table_to_markdown(table: list[list], page_text: str) -> str | None:
    """Render an extracted table as a pipe-style markdown table.

    Args:
        table: Rows of cells; cells may be ``None`` or strings.
        page_text: Text of the page the table came from, used to drop tables
            whose content is already present as plain text.

    Returns:
        The markdown table, or ``None`` when the table is too small, has
        fewer than two columns, is classified as a layout table by
        ``is_layout_table``, or duplicates the page text according to
        ``table_overlaps_page_text``.
    """
    rows = []
    for row in table:
        cells = [(c or "").strip().replace("\n", " ") for c in row]
        if any(cells):
            rows.append(cells)
    if len(rows) < MIN_TABLE_ROWS:
        return None

    width = max(len(r) for r in rows)
    if width < 2:
        return None
    if is_layout_table(rows) or table_overlaps_page_text(rows, page_text):
        return None
    if sum(1 for r in rows for c in r if c) < MIN_TABLE_CELLS:
        return None

    def render(cells: list[str]) -> str:
        # Pad ragged rows to the full width and escape literal pipes, which
        # would otherwise be read as column separators.
        padded = cells + [""] * (width - len(cells))
        return "| " + " | ".join(c.replace("|", "\\|") for c in padded) + " |"

    # Every kept row has at least one non-empty cell, so the first row can
    # always serve as the header.
    header, *body = rows
    lines = [render(header), "| " + " | ".join(["---"] * width) + " |"]
    lines.extend(render(r) for r in body)
    return "\n".join(lines)
|
|
|
|
|
|
def extract_images(doc: fitz.Document, page_index: int, assets_dir: Path, prefix: str) -> list[str]:
    """Write every embedded image of one page into *assets_dir*.

    Files are named ``{prefix}-pNNN-imgK.{ext}`` with a 1-based page number
    and a 1-based image index from the page's image list.

    Returns:
        ``assets/<name>`` references for the images that were written. Images
        that cannot be extracted are skipped.
    """
    refs: list[str] = []
    page = doc[page_index]
    for img_index, img in enumerate(page.get_images(full=True)):
        xref = img[0]
        try:
            info = doc.extract_image(xref)
        except Exception:
            # Best effort: one unreadable image must not abort the whole page.
            continue
        # Skip results without an image payload instead of raising KeyError.
        data = info.get("image") if info else None
        if not data:
            continue
        ext = info.get("ext", "png")
        name = f"{prefix}-p{page_index + 1:03d}-img{img_index + 1}.{ext}"
        (assets_dir / name).write_bytes(data)
        refs.append(f"assets/{name}")
    return refs
|
|
|
|
|
|
def render_page_png(
    doc: fitz.Document, page_index: int, assets_dir: Path, prefix: str, dpi: int
) -> str:
    """Rasterize one page to ``{prefix}-pNNN-page.png`` inside *assets_dir*.

    The page is scaled by ``dpi / 72`` on both axes (72 is presumably the
    renderer's base resolution — confirm against the PyMuPDF docs).

    Returns:
        The ``assets/<name>`` reference of the written PNG.
    """
    scale = dpi / 72
    filename = f"{prefix}-p{page_index + 1:03d}-page.png"
    pixmap = doc[page_index].get_pixmap(matrix=fitz.Matrix(scale, scale))
    pixmap.save(str(assets_dir / filename))
    return f"assets/{filename}"
|
|
|
|
|
|
def find_major_section(line: str) -> tuple[str, str] | None:
    """Parse *line* as a numbered top-level heading.

    Returns:
        ``(number, title)`` when the stripped line matches ``MAJOR_SECTION``
        and the title is not a TOC entry (dot leader), does not end with a
        colon, and is at most 70 characters long; otherwise ``None``.
    """
    match = MAJOR_SECTION.match(line.strip())
    if match is None:
        return None
    number = match.group(1)
    heading = match.group(2).strip()
    looks_like_toc = bool(TOC_LINE.search(heading)) or "..." in heading
    if looks_like_toc or heading.endswith(":") or len(heading) > 70:
        return None
    return number, heading
|
|
|
|
|
|
def should_start_chapter(num: str, current: dict | None) -> bool:
    """Decide whether heading number *num* opens a new chapter.

    A new chapter starts when there is no current chapter, when the current
    chapter is the ``"00"`` front-matter placeholder, or when *num* is
    numerically greater than the current chapter's number. Non-numeric values
    never start a chapter.
    """
    if current is None or current.get("num") == "00":
        return True
    try:
        candidate, active = int(num), int(current["num"])
    except ValueError:
        return False
    return candidate > active
|
|
|
|
|
|
def convert(
    pdf_path: Path,
    out_dir: Path,
    *,
    extract_tables: bool = False,
    page_dpi: int = DEFAULT_PAGE_DPI,
    single_file: bool = False,
) -> None:
    """Convert one PDF into grep-friendly markdown plus extracted assets.

    Any existing *out_dir* is deleted and recreated. Pages detected as
    table-of-contents pages are skipped. For every other page the embedded
    images are written to ``assets/``; pages detected as broken layouts are
    rendered to a full-page PNG and their text is dropped. Pages are grouped
    into chapters by numbered top-level headings; pages before the first
    heading go into a ``00-front-matter`` chapter.

    Args:
        pdf_path: PDF to read.
        out_dir: Output directory (replaced if it exists).
        extract_tables: Also emit pdfplumber tables as markdown tables.
        page_dpi: Resolution of the full-page PNG fallback.
        single_file: Write one ``{stem}.md`` instead of ``INDEX.md`` plus one
            file per chapter under ``chapters/``.
    """
    # Open the PDF before touching out_dir so an unreadable or missing input
    # cannot wipe the output of a previous run.
    doc = fitz.open(pdf_path)
    try:
        if out_dir.exists():
            shutil.rmtree(out_dir)
        assets_dir = out_dir / "assets"
        chapters_dir = out_dir / "chapters"
        assets_dir.mkdir(parents=True)
        if not single_file:
            chapters_dir.mkdir(parents=True)

        stem = pdf_path.stem
        # Asset references are relative to out_dir; chapter files live one
        # directory below it, so their links need to step up one level.
        link_prefix = "" if single_file else "../"

        sections: list[dict] = []
        current: dict | None = None
        page_png_count = 0
        image_count = 0

        with pdfplumber.open(pdf_path) as plumber:
            for page_index in range(doc.page_count):
                raw = doc[page_index].get_text()
                text = clean_page_text(raw)
                if is_toc_page(text):
                    continue

                img_refs = extract_images(doc, page_index, assets_dir, stem)
                image_count += len(img_refs)
                page_png_ref: str | None = None
                if is_broken_page(text):
                    page_png_ref = render_page_png(doc, page_index, assets_dir, stem, page_dpi)
                    page_png_count += 1
                    # The extracted text is unreadable; the PNG replaces it.
                    text = ""

                tables_md: list[str] = []
                if extract_tables:
                    page = plumber.pages[page_index]
                    for table in page.extract_tables() or []:
                        md = table_to_markdown(table, text or raw)
                        if md:
                            tables_md.append(md)

                page_block = [f"<!-- page {page_index + 1} -->"]
                if text:
                    page_block.append(text)
                if page_png_ref:
                    page_block.append(f"![page {page_index + 1}]({link_prefix}{page_png_ref})")
                if tables_md:
                    page_block.append("\n\n".join(tables_md))
                if img_refs:
                    page_block.append("\n".join(f"![]({link_prefix}{ref})" for ref in img_refs))
                page_content = "\n\n".join(x for x in page_block if x)

                # The last heading found on the page decides the chapter.
                section_hit = None
                source = text or raw
                for line in source.splitlines():
                    hit = find_major_section(line)
                    if hit:
                        section_hit = hit

                if section_hit and should_start_chapter(section_hit[0], current):
                    num, title = section_hit
                    if current:
                        sections.append(current)
                    current = {
                        "num": num,
                        "title": title,
                        "slug": f"{int(num):02d}-{slugify(title)}",
                        "pages": [],
                    }

                if current is None:
                    current = {
                        "num": "00",
                        "title": "front-matter",
                        "slug": "00-front-matter",
                        "pages": [],
                    }
                current["pages"].append(page_content)

        if current:
            sections.append(current)
    finally:
        # Release the PDF handle even when a page fails to convert.
        doc.close()

    if single_file:
        full_parts = []
        for sec in sections:
            full_parts.append(f"# {sec['num']}. {sec['title']}\n\n")
            full_parts.append("\n\n---\n\n".join(sec["pages"]))
            full_parts.append("\n\n")
        (out_dir / f"{stem}.md").write_text("".join(full_parts), encoding="utf-8")
    else:
        index_lines = [f"# {stem}\n", f"Source: `{pdf_path.name}`\n", "## Chapters\n"]
        for sec in sections:
            fname = f"{sec['slug']}.md"
            body = "\n\n---\n\n".join(sec["pages"])
            chapter_path = chapters_dir / fname
            header = f"# {sec['num']}. {sec['title']}\n\n"
            chapter_path.write_text(header + body + "\n", encoding="utf-8")
            index_lines.append(f"- [{sec['num']}. {sec['title']}](chapters/{fname})\n")
        (out_dir / "INDEX.md").write_text("".join(index_lines), encoding="utf-8")

    print(f"Wrote {out_dir}")
    if single_file:
        print(f"  full md: {out_dir / f'{stem}.md'}")
    else:
        print(f"  chapters: {len(sections)}")
        print(f"  index: {out_dir / 'INDEX.md'}")
    # Embedded images only; full-page fallbacks are reported separately.
    print(f"  images: {image_count}")
    print(f"  page png: {page_png_count}")
|
|
|
|
|
|
def main() -> None:
    """Parse the command line and run ``convert``.

    The output directory defaults to ``out/<pdf stem>``. Invalid input (a
    missing PDF or a non-positive ``--page-dpi``) is reported through
    ``parser.error`` before anything is written or deleted.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("pdf", type=Path)
    parser.add_argument("-o", "--output", type=Path, default=None)
    parser.add_argument(
        "--tables",
        action="store_true",
        help="extract pdfplumber tables into markdown (off by default)",
    )
    parser.add_argument(
        "--page-dpi",
        type=int,
        default=DEFAULT_PAGE_DPI,
        metavar="N",
        help=f"DPI for full-page PNG fallback on broken layout pages (default {DEFAULT_PAGE_DPI})",
    )
    parser.add_argument(
        "--single-file",
        action="store_true",
        help="write one {stem}.md instead of INDEX.md + chapters/",
    )
    args = parser.parse_args()

    # Fail fast with a usage message: convert() replaces the output directory,
    # so bad arguments should be caught before it runs.
    if not args.pdf.is_file():
        parser.error(f"PDF not found: {args.pdf}")
    if args.page_dpi <= 0:
        parser.error("--page-dpi must be a positive integer")

    out = args.output or Path("out") / args.pdf.stem
    convert(
        args.pdf.resolve(),
        out.resolve(),
        extract_tables=args.tables,
        page_dpi=args.page_dpi,
        single_file=args.single_file,
    )
|
|
|
|
|
|
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|