Skip to content

Commit 356289e

Browse files
author
Kavyansh Chourasia
committed
Ingestion Pipeline: LLM JSON Extractor
1 parent afc0853 commit 356289e

File tree

2 files changed

+426
-0
lines changed

2 files changed

+426
-0
lines changed
Lines changed: 351 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,351 @@
1+
import base64
2+
import io
3+
import json
4+
import logging
5+
from typing import List, Dict, Any, Optional
6+
import os
7+
8+
from pdf2image import convert_from_path
9+
from PIL import Image
10+
from litellm import completion
11+
from ingestion_pipeline.base.text_extractor import TextExtractor
12+
13+
14+
class LLMJSONExtractor(TextExtractor):
    """
    A text extractor that uses LLMs to extract text from PDF documents in JSON format.

    This implementation uses Azure OpenAI via litellm to process PDF pages as images
    and convert them into structured JSON format with sections.
    """

    # System prompt template for image analysis
    _SYSTEM_PROMPT = """You are an expert document transcriber.
Your task is to accurately convert the uploaded document image into structured JSON format.
The document may contain multiple columns, diagrams, tables, maps, math equations, and images.

You must analyze the document and extract it into a JSON array where each object represents a section with:
- "section_title": The title or heading of the section (if no clear heading exists, create a descriptive title)
- "section_content": The content of that section in markdown format

For the section content:
- Convert regular text into markdown paragraphs
- Format tables in markdown table format
- Describe images, diagrams, and maps within ![description](image) syntax
- Render mathematical equations using LaTeX notation within $$ for display equations or $ for inline equations
- Preserve lists (numbered and bulleted) in appropriate markdown format
- Maintain the document's original structure

Output ONLY valid JSON in the following format:
[{
"section_title": "Introduction",
"section_content": "This is the introduction content in markdown..."
}, {
"section_title": "Method",
"section_content": "This is the method section content..."
}]

Do NOT wrap the output in any code block or add any extra commentary. The output must be PURE JSON content."""

    # System prompt template for batch image analysis
    _BATCH_SYSTEM_PROMPT = """You are an expert document transcriber.
Your task is to accurately convert the current document image into structured JSON format.
The document may contain multiple columns, diagrams, tables, maps, math equations, and images.

The current image is part of a larger document. I'll provide you with the JSON transcription of previous pages
to give you context. Focus on transcribing ONLY THE CURRENT IMAGE into JSON while maintaining
consistency with the previous transcriptions.

You must analyze the current image and extract it into a JSON array where each object represents a section with:
- "section_title": The title or heading of the section (if no clear heading exists, create a descriptive title)
- "section_content": The content of that section in markdown format

For the section content:
- Convert regular text into markdown paragraphs
- Format tables in markdown table format
- Describe images, diagrams, and maps within ![description](image) syntax
- Render mathematical equations using LaTeX notation within $$ for display equations or $ for inline equations
- Preserve lists (numbered and bulleted) in appropriate markdown format
- Maintain the document's original structure

Output ONLY valid JSON in the following format for the current image:
[{
"section_title": "Section Title",
"section_content": "Section content in markdown..."
}]

Do NOT wrap the output in any code block or add any extra commentary. The output must be PURE JSON content."""

    def __init__(
        self,
        api_key: Optional[str] = None,
        api_base: Optional[str] = None,
        api_version: Optional[str] = None,
        deployment_name: Optional[str] = None,
        azure_ad_token: Optional[str] = None,
    ):
        """
        Initialize the LLM-based JSON text extractor with Azure OpenAI credentials.

        Args:
            api_key: Azure OpenAI API key
            api_base: Azure OpenAI API base URL
            api_version: Azure OpenAI API version
            deployment_name: Azure OpenAI deployment name
            azure_ad_token: Azure AD token for authentication (alternative to API key)
        """
        self.api_key = api_key
        self.api_base = api_base
        self.api_version = api_version
        self.deployment_name = deployment_name
        self.azure_ad_token = azure_ad_token
        self.logger = logging.getLogger(__name__)

    def _encode_image(self, image: Image.Image) -> str:
        """
        Encode a PIL Image as base64 for API transmission.

        Args:
            image: PIL Image object

        Returns:
            Base64 encoded string of the image
        """
        buffered = io.BytesIO()
        # Save as PNG for better quality
        image.save(buffered, format="PNG")
        img_str = base64.b64encode(buffered.getvalue()).decode("utf-8")
        return img_str

    @staticmethod
    def _strip_code_fences(text: str) -> str:
        """
        Strip a markdown code fence (e.g. ```json ... ```) from an LLM response.

        Despite the prompt instructing pure JSON output, models frequently wrap
        the payload in a fenced code block; without this cleanup such pages
        would always hit the JSON-parse fallback path.

        Args:
            text: Raw response content from the LLM

        Returns:
            The content with any surrounding code fence removed
        """
        cleaned = text.strip()
        if cleaned.startswith("```"):
            # Drop the opening fence line (which may carry a language tag).
            newline_index = cleaned.find("\n")
            cleaned = cleaned[newline_index + 1:] if newline_index != -1 else ""
            cleaned = cleaned.rstrip()
            if cleaned.endswith("```"):
                cleaned = cleaned[:-3]
        return cleaned.strip()

    def _process_page_with_llm(
        self,
        image: Image.Image,
        previous_pages_json: str = "",
        page_number: int = 1,
        total_pages: int = 1,
        document_structure_hint: str = "",
    ) -> List[Dict[str, str]]:
        """
        Process a single page image with the LLM to extract text in JSON format.

        Args:
            image: PIL Image object of the page
            previous_pages_json: JSON from previous pages to provide context
            page_number: Current page number
            total_pages: Total number of pages in the document
            document_structure_hint: Additional context about the document structure

        Returns:
            List of dictionaries containing section titles and content.
            On API or parse failure, returns a single fallback section
            describing the error instead of raising.
        """
        encoded_image = self._encode_image(image)

        try:
            # First page uses the standalone prompt; later pages use the
            # batch prompt that explains the previous-page context.
            base_system_prompt = (
                self._SYSTEM_PROMPT
                if not previous_pages_json
                else self._BATCH_SYSTEM_PROMPT
            )

            # Enhance system prompt with document structure hints if provided
            system_prompt = base_system_prompt
            if document_structure_hint:
                system_prompt = f"{base_system_prompt}\n\nAdditional document structure information:\n{document_structure_hint}"

            messages = [{"role": "system", "content": system_prompt}]

            user_content = [
                {
                    "type": "text",
                    "text": f"Extract the text from this document page (page {page_number} of {total_pages}) and convert it to JSON format.",
                }
            ]

            # Add previous pages content as context if available
            if previous_pages_json:
                user_content.append(
                    {
                        "type": "text",
                        "text": f"Previous pages transcription for context:\n\n{previous_pages_json}\n\nNow transcribe ONLY the current image to JSON:",
                    }
                )

            # Add the current image
            user_content.append(
                {
                    "type": "image_url",
                    "image_url": {"url": f"data:image/png;base64,{encoded_image}"},
                }
            )

            messages.append({"role": "user", "content": user_content})

            completion_args = {
                "model": f"azure/{self.deployment_name}",
                "api_base": self.api_base,
                "api_version": self.api_version,
                "messages": messages,
                "max_tokens": 4096,
            }

            # Use the appropriate authentication method: Azure AD token takes
            # precedence over a plain API key when both are configured.
            if self.azure_ad_token:
                completion_args["azure_ad_token"] = self.azure_ad_token
            else:
                completion_args["api_key"] = self.api_key

            # Make the API call
            response = completion(**completion_args)

            # Extract the response content and tolerate code-fenced output.
            extracted_json_text = self._strip_code_fences(
                response["choices"][0]["message"]["content"]
            )

            # Parse and validate the JSON response
            try:
                page_sections = json.loads(extracted_json_text)
                if not isinstance(page_sections, list):
                    raise ValueError("Response is not a JSON array")

                # Validate the structure of every section object
                for section in page_sections:
                    if not isinstance(section, dict) or "section_title" not in section or "section_content" not in section:
                        raise ValueError("Invalid section structure")

                return page_sections
            except (json.JSONDecodeError, ValueError) as e:
                self.logger.error(
                    "Error parsing JSON response for page %s: %s", page_number, e
                )
                # Return a fallback structure so the pipeline keeps going
                return [{
                    "section_title": f"Page {page_number} (Parse Error)",
                    "section_content": f"*Error parsing JSON response: {str(e)}*\n\nRaw response:\n{extracted_json_text}"
                }]

        except Exception as e:
            # Best-effort: never let one bad page abort the whole document.
            self.logger.error(
                "Error processing page %s with LLM: %s", page_number, e
            )
            return [{
                "section_title": f"Page {page_number} (Processing Error)",
                "section_content": f"*Error processing page {page_number}: {str(e)}*"
            }]

    def _process_pages_in_batches(
        self,
        images: List[Image.Image],
        batch_size: int = 5,
        document_structure_hint: str = "",
    ) -> List[Dict[str, str]]:
        """
        Process pages in batches, providing context from previous pages.

        Args:
            images: List of PIL Image objects
            batch_size: Number of pages to process in each batch
            document_structure_hint: Additional context about the document structure

        Returns:
            Combined list of sections from all pages; each section gains a
            "page_number" key identifying its source page.
        """
        total_pages = len(images)
        all_sections = []
        accumulated_context_sections = []

        self.logger.info(
            "Processing %s pages in batches of %s", total_pages, batch_size
        )

        for i, image in enumerate(images):
            page_number = i + 1
            self.logger.info("Processing page %s/%s", page_number, total_pages)

            # Determine context to use - limit to recent pages to prevent context overflow
            if i >= batch_size:
                # Keep only the latest batch_size pages worth of sections
                # (approximate: assumes ~10 sections per page).
                context_sections = accumulated_context_sections[-batch_size * 10:]
            else:
                context_sections = accumulated_context_sections

            # Convert context sections to JSON string
            context_json = json.dumps(context_sections, indent=2) if context_sections else ""

            # Process the current page with context
            page_sections = self._process_page_with_llm(
                image,
                previous_pages_json=context_json,
                page_number=page_number,
                total_pages=total_pages,
                document_structure_hint=document_structure_hint,
            )

            # Tag every section with its source page
            for section in page_sections:
                section["page_number"] = page_number

            # Add to results and to the rolling context window
            all_sections.extend(page_sections)
            accumulated_context_sections.extend(page_sections)

        return all_sections

    def extract_text(self, file_path: str, **kwargs) -> str:
        """
        Extract text from a PDF file using an LLM to process page images and return JSON format.

        The function converts each PDF page to an image, sends it to the LLM,
        and combines the results as a JSON string containing structured sections.

        Credential overrides supplied via **kwargs apply to this call only;
        the instance's configured credentials are restored afterwards.

        Args:
            file_path: Path to the PDF file
            **kwargs: Additional parameters that can include:
                - api_key: Override the default Azure OpenAI API key
                - api_base: Override the default Azure OpenAI API base URL
                - api_version: Override the default Azure OpenAI API version
                - deployment_name: Override the default Azure OpenAI deployment name
                - azure_ad_token: Override the default Azure AD token
                - dpi: DPI for PDF to image conversion (default: 300)
                - batch_size: Number of pages to process in each batch (default: 5)
                - document_structure_hint: Additional context about the document structure
                  to help the LLM better understand the document format (e.g., "This is a
                  two-column scientific paper with mathematical equations and tables.")

        Returns:
            JSON string containing extracted sections from all pages

        Raises:
            ValueError: If deployment_name or api_base is missing, or if
                neither api_key nor azure_ad_token is available.
        """
        # Resolve per-call overrides, falling back to instance configuration
        api_key = kwargs.get("api_key", self.api_key)
        api_base = kwargs.get("api_base", self.api_base)
        api_version = kwargs.get("api_version", self.api_version)
        deployment_name = kwargs.get("deployment_name", self.deployment_name)
        azure_ad_token = kwargs.get("azure_ad_token", self.azure_ad_token)

        # Validate required parameters
        if not deployment_name:
            raise ValueError("Azure OpenAI deployment_name is required")
        if not api_base:
            raise ValueError("Azure OpenAI api_base is required")
        if not (api_key or azure_ad_token):
            raise ValueError("Either api_key or azure_ad_token must be provided")

        # Temporarily install the resolved credentials on the instance for the
        # internal helpers, then restore the originals so a per-call override
        # cannot permanently reconfigure this extractor.
        original_credentials = (
            self.api_key,
            self.api_base,
            self.api_version,
            self.deployment_name,
            self.azure_ad_token,
        )
        self.api_key = api_key
        self.api_base = api_base
        self.api_version = api_version
        self.deployment_name = deployment_name
        self.azure_ad_token = azure_ad_token

        try:
            # Get batch size for processing
            batch_size = kwargs.get("batch_size", 5)

            # Get document structure hint if provided
            document_structure_hint = kwargs.get("document_structure_hint", "")

            # Convert PDF to images
            dpi = kwargs.get("dpi", 300)  # Higher DPI for better quality
            self.logger.info(
                "Converting PDF to images with DPI %s: %s", dpi, file_path
            )
            images = convert_from_path(file_path, dpi=dpi)

            # Process pages in batches with context from previous pages
            all_sections = self._process_pages_in_batches(
                images,
                batch_size=batch_size,
                document_structure_hint=document_structure_hint,
            )

            # Return as JSON string
            return json.dumps(all_sections, indent=2, ensure_ascii=False)
        finally:
            (
                self.api_key,
                self.api_base,
                self.api_version,
                self.deployment_name,
                self.azure_ad_token,
            ) = original_credentials

0 commit comments

Comments
 (0)