class MathpixPDFReader(BaseReader):
    """Load `PDF` files using the `Mathpix` conversion service."""

    def __init__(
        self,
        processed_file_format: str = "md",
        max_wait_time_seconds: int = 900,
        should_clean_pdf: bool = True,
        **kwargs: Any,
    ) -> None:
        """Initialize the reader.

        Args:
            processed_file_format: format of the processed file. Default is "md".
            max_wait_time_seconds: maximum time to wait for the response from
                the server, in seconds. Default is 900.
            should_clean_pdf: whether to clean up the converted contents.
                Default is True.
            **kwargs: additional keyword arguments; may carry the Mathpix
                credentials as ``mathpix_api_key`` / ``mathpix_api_id``.
        """
        # Credentials fall back to env vars MATHPIX_API_KEY / MATHPIX_API_ID,
        # then to the placeholder "empty".
        self.mathpix_api_key = get_from_dict_or_env(
            kwargs, "mathpix_api_key", "MATHPIX_API_KEY", default="empty"
        )
        self.mathpix_api_id = get_from_dict_or_env(
            kwargs, "mathpix_api_id", "MATHPIX_API_ID", default="empty"
        )
        self.processed_file_format = processed_file_format
        self.max_wait_time_seconds = max_wait_time_seconds
        self.should_clean_pdf = should_clean_pdf
        super().__init__()

    @property
    def _mathpix_headers(self) -> Dict[str, str]:
        """Auth headers expected by the Mathpix API."""
        return {"app_id": self.mathpix_api_id, "app_key": self.mathpix_api_key}

    @property
    def url(self) -> str:
        """Base endpoint for Mathpix PDF conversion."""
        return "https://api.mathpix.com/v3/pdf"

    @property
    def data(self) -> dict:
        """Form payload requesting the configured output format."""
        options = {
            "conversion_formats": {self.processed_file_format: True},
            "enable_tables_fallback": True,
        }
        return {"options_json": json.dumps(options)}

    def send_pdf(self, file_path) -> str:
        """Upload the PDF at ``file_path`` and return the Mathpix ``pdf_id``.

        Raises:
            ValueError: if the response does not contain a ``pdf_id``.
        """
        with open(file_path, "rb") as f:
            files = {"file": f}
            response = requests.post(
                self.url, headers=self._mathpix_headers, files=files, data=self.data
            )
        response_data = response.json()
        if "pdf_id" in response_data:
            return response_data["pdf_id"]
        raise ValueError("Unable to send PDF to Mathpix.")

    def wait_for_processing(self, pdf_id: str) -> None:
        """Wait for processing to complete.

        Polls the conversion status every 5 seconds, up to
        ``self.max_wait_time_seconds``.

        Args:
            pdf_id: a PDF id.

        Returns:
            None

        Raises:
            ValueError: if Mathpix reports an error status.
            TimeoutError: if processing does not finish in time.
        """
        url = self.url + "/" + pdf_id
        for _ in range(0, self.max_wait_time_seconds, 5):
            response = requests.get(url, headers=self._mathpix_headers)
            response_data = response.json()
            status = response_data.get("status", None)
            print(
                f"Processing status: {status},"
                f"Progress: {response_data.get('percent_done', 0)}%"
            )
            if status == "completed":
                return
            elif status == "error":
                raise ValueError(f"Mathpix processing error: {response_data}")
            elif status in ["split", "processing"]:
                # Intermediate states: keep polling.
                time.sleep(5)
                continue
            else:
                print(f"Unknown status: {response_data}")
                time.sleep(5)
        raise TimeoutError(
            f"Processing did not complete within {self.max_wait_time_seconds} seconds"
        )

    def get_processed_pdf(self, pdf_id: str) -> str:
        """Block until conversion finishes, then download the converted text.

        Raises:
            ValueError: if the download request does not return HTTP 200.
        """
        self.wait_for_processing(pdf_id)
        url = f"{self.url}/{pdf_id}.{self.processed_file_format}"
        response = requests.get(url, headers=self._mathpix_headers)
        if response.status_code != 200:
            raise ValueError(f"Failed to get processed PDF: {response.text}")
        content = response.content.decode("utf-8")
        print(f"Retrieved content length: {len(content)}")  # Debug print
        return content

    def clean_pdf(self, contents: str) -> str:
        """Clean the converted PDF contents.

        Removes image-only lines, turns LaTeX ``\\section`` commands into
        markdown headings, strips Mathpix escape backslashes and array
        markup, and deletes empty markdown links.

        Args:
            contents: a PDF file contents.

        Returns:
            The cleaned contents.
        """
        contents = "\n".join(
            [line for line in contents.split("\n") if not line.startswith("![]")]
        )
        # replace \section{Title} with # Title
        contents = contents.replace("\\section{", "# ")
        # replace the "\" slash that Mathpix adds to escape $, %, (, etc.
        # http:// or https:// followed by anything but a closing paren
        url_regex = "http[s]?://[^)]+"
        markup_regex = r"\[]\(\s*({0})\s*\)".format(url_regex)
        contents = (
            contents.replace(r"\$", "$")
            .replace(r"\%", "%")
            .replace(r"\(", "(")
            .replace(r"\)", ")")
            .replace("$\\begin{array}", "")
            .replace("\\end{array}$", "")
            .replace("\\\\", "")
            .replace("\\text", "")
            .replace("}", "")
            .replace("{", "")
            .replace("\\mathrm", "")
        )
        contents = re.sub(markup_regex, "", contents)
        return contents

    def parse_markdown_text_to_tables(
        self, content: str
    ) -> tuple[list[tuple[int, str]], list[tuple[int, str]]]:
        """Parse markdown text to get tables and texts separately.

        Returns:
            Tuple of (tables, texts) where each is a list of
            (page_num, content) tuples
        """
        print("Starting markdown parsing...")
        print(f"Content length: {len(content)}")
        # Split by page markers if present
        pages = re.split(r"(?m)^# Page \d+\n", content)
        tables: list[tuple[int, str]] = []
        texts: list[tuple[int, str]] = []
        for page_num, page_content in enumerate(pages, 1):
            if not page_content.strip():
                continue
            # Extract pipe-delimited table blocks from the page
            table_matches = re.findall(r"(\|[^\n]+\|(?:\n\|[^\n]+\|)*)", page_content)
            if table_matches:
                for table in table_matches:
                    tables.append((page_num, table.strip()))
            # Remove tables from page content
            page_content = re.sub(r"(\|[^\n]+\|(?:\n\|[^\n]+\|)*)", "", page_content)
            # Split remaining content into meaningful chunks on blank lines
            chunks = re.split(r"\n\s*\n", page_content)
            for chunk in chunks:
                if chunk.strip():
                    texts.append((page_num, chunk.strip()))
        print(f"Found {len(tables)} tables and {len(texts)} text sections")
        return tables, texts

    def load_data(
        self,
        file: Union[str, List[str], Path],
        extra_info: Optional[Dict] = None,
        **load_kwargs: Any,
    ) -> List[Document]:
        """Load data from file path.

        If ``response_content`` is supplied in ``load_kwargs`` it is used
        directly instead of calling the Mathpix service.
        """
        file_path = Path(file) if isinstance(file, str) else file
        if "response_content" in load_kwargs:
            content = load_kwargs["response_content"]
        else:
            pdf_id = self.send_pdf(file_path)
            content = self.get_processed_pdf(pdf_id)
        if self.should_clean_pdf:
            content = self.clean_pdf(content)
        tables, texts = self.parse_markdown_text_to_tables(content)
        documents: List[Document] = []
        # Handle tables
        for page_num, table_content in tables:
            text = strip_special_chars_markdown(table_content)
            metadata = {
                "table_origin": table_content,
                "type": "table",
                "page_label": page_num,
                "page_number": page_num,
            }
            if extra_info:
                metadata.update(extra_info)
            documents.append(
                Document(
                    text=text,
                    metadata=metadata,
                    metadata_template="",
                    metadata_seperator="",
                )
            )
        # Handle text sections
        for page_num, text_content in texts:
            if not text_content.strip():
                continue
            metadata = {
                "source": str(file_path),
                "type": "text",
                "page_label": page_num,
                "page_number": page_num,
            }
            if extra_info:
                metadata.update(extra_info)
            documents.append(Document(text=text_content, metadata=metadata))
        # Fallback if no content was parsed
        if not documents and content.strip():
            metadata = {
                "source": str(file_path),
                "type": "text",
                "page_label": 1,
                "page_number": 1,
            }
            if extra_info:
                metadata.update(extra_info)
            documents.append(Document(text=content.strip(), metadata=metadata))
        return documents

    def lazy_load_data(
        self,
        file: Union[str, List[str], Path],
        extra_info: Optional[Dict] = None,
        **load_kwargs: Any,
    ) -> Generator[Document, None, None]:
        """Lazy load data from file path, yielding one Document at a time."""
        file_path = Path(file) if isinstance(file, str) else file
        if "response_content" in load_kwargs:
            content = load_kwargs["response_content"]
        else:
            pdf_id = self.send_pdf(file_path)
            print(f"PDF ID: {pdf_id}")
            content = self.get_processed_pdf(pdf_id)
        if self.should_clean_pdf:
            content = self.clean_pdf(content)
        tables, texts = self.parse_markdown_text_to_tables(content)
        # Handle tables
        for page_num, table_content in tables:
            text = strip_special_chars_markdown(table_content)
            metadata = {
                "table_origin": table_content,
                "type": "table",
                "page_label": page_num,
                "page_number": page_num,
            }
            if extra_info:
                metadata.update(extra_info)
            yield Document(
                text=text,
                metadata=metadata,
                metadata_template="",
                metadata_seperator="",
            )
        # Handle text sections
        for page_num, text_content in texts:
            if not text_content.strip():
                continue
            metadata = {
                "source": str(file_path),
                "type": "text",
                "page_label": page_num,
                "page_number": page_num,
            }
            if extra_info:
                metadata.update(extra_info)
            yield Document(text=text_content, metadata=metadata)
        # Fallback if no content was parsed
        if not (tables or texts) and content.strip():
            metadata = {
                "source": str(file_path),
                "type": "text",
                "page_label": 1,
                "page_number": 1,
            }
            if extra_info:
                metadata.update(extra_info)
            yield Document(text=content.strip(), metadata=metadata)
        print(f"Completed processing PDF: {file_path}")
def wait_for_processing(self, pdf_id: str) -> None:
    """Poll Mathpix until the uploaded PDF finishes processing.

    Checks the status every 5 seconds for up to
    ``self.max_wait_time_seconds`` seconds.

    Args:
        pdf_id: a PDF id.

    Returns:
        None

    Raises:
        ValueError: if Mathpix reports an error status.
        TimeoutError: if processing does not finish within the limit.
    """
    status_url = self.url + "/" + pdf_id
    for _elapsed in range(0, self.max_wait_time_seconds, 5):
        reply = requests.get(status_url, headers=self._mathpix_headers)
        info = reply.json()
        state = info.get("status", None)
        print(
            f"Processing status: {state},"
            f"Progress: {info.get('percent_done', 0)}%"
        )
        if state == "completed":
            return
        if state == "error":
            raise ValueError(f"Mathpix processing error: {info}")
        if state in ["split", "processing"]:
            # Intermediate states: wait and poll again.
            time.sleep(5)
            continue
        print(f"Unknown status: {info}")
        time.sleep(5)
    raise TimeoutError(
        f"Processing did not complete within {self.max_wait_time_seconds} seconds"
    )
def clean_pdf(self, contents: str) -> str:
    """Clean converted PDF contents.

    Drops image-only lines, rewrites LaTeX ``\\section`` commands as
    markdown headings, strips Mathpix escape backslashes and array
    markup, and removes empty markdown links.

    Args:
        contents: a PDF file contents.

    Returns:
        The cleaned contents.
    """
    kept_lines = [ln for ln in contents.split("\n") if not ln.startswith("![]")]
    contents = "\n".join(kept_lines)
    # replace \section{Title} with # Title (the "}" is stripped below)
    contents = contents.replace("\\section{", "# ")
    # empty markdown links wrapping http:// or https:// URLs
    link_pattern = r"\[]\(\s*({0})\s*\)".format("http[s]?://[^)]+")
    # undo the "\" escapes Mathpix adds and drop leftover LaTeX markup,
    # applied strictly in this order
    replacements = [
        (r"\$", "$"),
        (r"\%", "%"),
        (r"\(", "("),
        (r"\)", ")"),
        ("$\\begin{array}", ""),
        ("\\end{array}$", ""),
        ("\\\\", ""),
        ("\\text", ""),
        ("}", ""),
        ("{", ""),
        ("\\mathrm", ""),
    ]
    for old, new in replacements:
        contents = contents.replace(old, new)
    return re.sub(link_pattern, "", contents)
def parse_markdown_text_to_tables(
    self, content: str
) -> tuple[list[tuple[int, str]], list[tuple[int, str]]]:
    """Parse markdown text to get tables and texts separately.

    Returns:
        Tuple of (tables, texts) where each is a list of (page_num, content) tuples
    """
    print("Starting markdown parsing...")
    print(f"Content length: {len(content)}")
    table_pattern = r"(\|[^\n]+\|(?:\n\|[^\n]+\|)*)"
    # Split by page markers if present
    pages = re.split(r"(?m)^# Page \d+\n", content)
    tables: list[tuple[int, str]] = []
    texts: list[tuple[int, str]] = []
    for page_num, page_content in enumerate(pages, 1):
        if not page_content.strip():
            continue
        # Collect pipe-delimited table blocks, tagged with their page number.
        for table in re.findall(table_pattern, page_content):
            tables.append((page_num, table.strip()))
        # Drop the tables, then break the remaining prose on blank lines.
        remainder = re.sub(table_pattern, "", page_content)
        for chunk in re.split(r"\n\s*\n", remainder):
            if chunk.strip():
                texts.append((page_num, chunk.strip()))
    print(f"Found {len(tables)} tables and {len(texts)} text sections")
    return tables, texts
def load_data(
    self,
    file: Union[str, List[str], Path],
    extra_info: Optional[Dict] = None,
    **load_kwargs: Any,
) -> List[Document]:
    """Load data from file path.

    A pre-fetched ``response_content`` keyword argument bypasses the
    Mathpix round trip.
    """
    file_path = Path(file) if isinstance(file, str) else file
    if "response_content" in load_kwargs:
        content = load_kwargs["response_content"]
    else:
        content = self.get_processed_pdf(self.send_pdf(file_path))
    if self.should_clean_pdf:
        content = self.clean_pdf(content)
    tables, texts = self.parse_markdown_text_to_tables(content)
    documents: List[Document] = []
    # Table sections become table-typed documents.
    for page_num, table_content in tables:
        meta = {
            "table_origin": table_content,
            "type": "table",
            "page_label": page_num,
            "page_number": page_num,
        }
        if extra_info:
            meta.update(extra_info)
        documents.append(
            Document(
                text=strip_special_chars_markdown(table_content),
                metadata=meta,
                metadata_template="",
                metadata_seperator="",
            )
        )
    # Remaining prose becomes text-typed documents.
    for page_num, text_content in texts:
        if not text_content.strip():
            continue
        meta = {
            "source": str(file_path),
            "type": "text",
            "page_label": page_num,
            "page_number": page_num,
        }
        if extra_info:
            meta.update(extra_info)
        documents.append(Document(text=text_content, metadata=meta))
    # Fallback: if parsing produced nothing, emit the whole content as one doc.
    if not documents and content.strip():
        meta = {
            "source": str(file_path),
            "type": "text",
            "page_label": 1,
            "page_number": 1,
        }
        if extra_info:
            meta.update(extra_info)
        documents.append(Document(text=content.strip(), metadata=meta))
    return documents
def lazy_load_data(
    self,
    file: Union[str, List[str], Path],
    extra_info: Optional[Dict] = None,
    **load_kwargs: Any,
) -> Generator[Document, None, None]:
    """Lazy load data from file path, yielding Documents one at a time.

    A pre-fetched ``response_content`` keyword argument bypasses the
    Mathpix round trip.
    """
    file_path = Path(file) if isinstance(file, str) else file
    if "response_content" in load_kwargs:
        content = load_kwargs["response_content"]
    else:
        pdf_id = self.send_pdf(file_path)
        print(f"PDF ID: {pdf_id}")
        content = self.get_processed_pdf(pdf_id)
    if self.should_clean_pdf:
        content = self.clean_pdf(content)
    tables, texts = self.parse_markdown_text_to_tables(content)
    # Table sections become table-typed documents.
    for page_num, table_content in tables:
        meta = {
            "table_origin": table_content,
            "type": "table",
            "page_label": page_num,
            "page_number": page_num,
        }
        if extra_info:
            meta.update(extra_info)
        yield Document(
            text=strip_special_chars_markdown(table_content),
            metadata=meta,
            metadata_template="",
            metadata_seperator="",
        )
    # Remaining prose becomes text-typed documents.
    for page_num, text_content in texts:
        if not text_content.strip():
            continue
        meta = {
            "source": str(file_path),
            "type": "text",
            "page_label": page_num,
            "page_number": page_num,
        }
        if extra_info:
            meta.update(extra_info)
        yield Document(text=text_content, metadata=meta)
    # Fallback: if parsing produced nothing, emit the whole content as one doc.
    if not (tables or texts) and content.strip():
        meta = {
            "source": str(file_path),
            "type": "text",
            "page_label": 1,
            "page_number": 1,
        }
        if extra_info:
            meta.update(extra_info)
        yield Document(text=content.strip(), metadata=meta)
    print(f"Completed processing PDF: {file_path}")