Mathpix Loader

MathpixPDFReader

Bases: BaseReader

Load PDF files using Mathpix service.
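A minimal usage sketch (this assumes the loader is importable from `kotaemon.loaders` and that the `MATHPIX_API_ID` and `MATHPIX_API_KEY` environment variables hold valid credentials; the file name is illustrative):

from kotaemon.loaders import MathpixPDFReader

# Credentials are picked up from MATHPIX_API_ID / MATHPIX_API_KEY
# when not passed as keyword arguments.
reader = MathpixPDFReader(processed_file_format="md", should_clean_pdf=True)

# Uploads the PDF, polls Mathpix until conversion completes, then splits
# the converted Markdown into table and text Documents.
documents = reader.load_data("paper.pdf")
for doc in documents:
    print(doc.metadata["type"], doc.metadata["page_number"])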

Source code in libs/kotaemon/kotaemon/loaders/mathpix_loader.py
class MathpixPDFReader(BaseReader):
    """Load `PDF` files using `Mathpix` service."""

    def __init__(
        self,
        processed_file_format: str = "md",
        max_wait_time_seconds: int = 900,
        should_clean_pdf: bool = True,
        **kwargs: Any,
    ) -> None:
        """Initialize with a file path.

        Args:
            processed_file_format: a format of the processed file. Default is   "mmd".
            max_wait_time_seconds: a maximum time to wait for the response from
                the server. Default is 500.
            should_clean_pdf: a flag to clean the PDF file. Default is False.
            **kwargs: additional keyword arguments.
        """
        self.mathpix_api_key = get_from_dict_or_env(
            kwargs, "mathpix_api_key", "MATHPIX_API_KEY", default="empty"
        )
        self.mathpix_api_id = get_from_dict_or_env(
            kwargs, "mathpix_api_id", "MATHPIX_API_ID", default="empty"
        )
        self.processed_file_format = processed_file_format
        self.max_wait_time_seconds = max_wait_time_seconds
        self.should_clean_pdf = should_clean_pdf
        super().__init__()

    @property
    def _mathpix_headers(self) -> Dict[str, str]:
        return {"app_id": self.mathpix_api_id, "app_key": self.mathpix_api_key}

    @property
    def url(self) -> str:
        return "https://api.mathpix.com/v3/pdf"

    @property
    def data(self) -> dict:
        options = {
            "conversion_formats": {self.processed_file_format: True},
            "enable_tables_fallback": True,
        }
        return {"options_json": json.dumps(options)}

    def send_pdf(self, file_path) -> str:
        with open(file_path, "rb") as f:
            files = {"file": f}
            response = requests.post(
                self.url, headers=self._mathpix_headers, files=files, data=self.data
            )
        response_data = response.json()
        if "pdf_id" in response_data:
            pdf_id = response_data["pdf_id"]
            return pdf_id
        else:
            raise ValueError("Unable to send PDF to Mathpix.")

    def wait_for_processing(self, pdf_id: str) -> None:
        """Wait for processing to complete.

        Args:
            pdf_id: a PDF id.

        Returns: None
        """
        url = self.url + "/" + pdf_id
        for _ in range(0, self.max_wait_time_seconds, 5):
            response = requests.get(url, headers=self._mathpix_headers)
            response_data = response.json()
            status = response_data.get("status", None)
            print(
                f"Processing status: {status}, "
                f"Progress: {response_data.get('percent_done', 0)}%"
            )

            if status == "completed":
                return
            elif status == "error":
                raise ValueError(f"Mathpix processing error: {response_data}")
            elif status in [
                "split",
                "processing",
            ]:  # conversion still in progress; poll again
                time.sleep(5)
                continue
            else:
                print(f"Unknown status: {response_data}")
                time.sleep(5)

        raise TimeoutError(
            f"Processing did not complete within {self.max_wait_time_seconds} seconds"
        )

    def get_processed_pdf(self, pdf_id: str) -> str:
        self.wait_for_processing(pdf_id)
        url = f"{self.url}/{pdf_id}.{self.processed_file_format}"
        response = requests.get(url, headers=self._mathpix_headers)
        if response.status_code != 200:
            raise ValueError(f"Failed to get processed PDF: {response.text}")
        content = response.content.decode("utf-8")
        print(f"Retrieved content length: {len(content)}")  # Debug print
        return content

    def clean_pdf(self, contents: str) -> str:
        """Clean the PDF file.

        Args:
            contents: a PDF file contents.

        Returns:

        """
        contents = "\n".join(
            [line for line in contents.split("\n") if not line.startswith("![]")]
        )
        # replace \section{Title} with # Title
        contents = contents.replace("\\section{", "# ")
        # replace the "\" slash that Mathpix adds to escape $, %, (, etc.

        # http:// or https:// followed by anything but a closing paren
        url_regex = "http[s]?://[^)]+"
        markup_regex = r"\[]\(\s*({0})\s*\)".format(url_regex)
        contents = (
            contents.replace(r"\$", "$")
            .replace(r"\%", "%")
            .replace(r"\(", "(")
            .replace(r"\)", ")")
            .replace("$\\begin{array}", "")
            .replace("\\end{array}$", "")
            .replace("\\\\", "")
            .replace("\\text", "")
            .replace("}", "")
            .replace("{", "")
            .replace("\\mathrm", "")
        )
        contents = re.sub(markup_regex, "", contents)
        return contents

    def parse_markdown_text_to_tables(
        self, content: str
    ) -> tuple[list[tuple[int, str]], list[tuple[int, str]]]:
        """Parse markdown text to get tables and texts separately.

        Returns:
            Tuple of (tables, texts) where each is a list of (page_num, content) tuples
        """
        print("Starting markdown parsing...")
        print(f"Content length: {len(content)}")

        # Split by page markers if present
        pages = re.split(r"(?m)^# Page \d+\n", content)

        tables: list[tuple[int, str]] = []
        texts: list[tuple[int, str]] = []

        for page_num, page_content in enumerate(pages, 1):
            if not page_content.strip():
                continue

            # Extract tables from the page
            table_matches = re.findall(r"(\|[^\n]+\|(?:\n\|[^\n]+\|)*)", page_content)
            if table_matches:
                for table in table_matches:
                    tables.append(
                        (page_num, table.strip())
                    )  # Store as tuple with page number
                # Remove tables from page content
                page_content = re.sub(
                    r"(\|[^\n]+\|(?:\n\|[^\n]+\|)*)", "", page_content
                )

            # Split remaining content into meaningful chunks
            chunks = re.split(r"\n\s*\n", page_content)
            for chunk in chunks:
                if chunk.strip():
                    texts.append(
                        (page_num, chunk.strip())
                    )  # Store as tuple with page number

        print(f"Found {len(tables)} tables and {len(texts)} text sections")
        return tables, texts

    def load_data(
        self,
        file: Union[str, List[str], Path],
        extra_info: Optional[Dict] = None,
        **load_kwargs: Any,
    ) -> List[Document]:
        """Load data from file path."""
        file_path = Path(file) if isinstance(file, str) else file

        if "response_content" in load_kwargs:
            content = load_kwargs["response_content"]
        else:
            pdf_id = self.send_pdf(file_path)
            content = self.get_processed_pdf(pdf_id)

        if self.should_clean_pdf:
            content = self.clean_pdf(content)

        tables, texts = self.parse_markdown_text_to_tables(content)
        documents = []

        # Handle tables
        for page_num, table_content in tables:
            text = strip_special_chars_markdown(table_content)
            metadata = {
                "table_origin": table_content,
                "type": "table",
                "page_label": page_num,
                "page_number": page_num,
            }
            if extra_info:
                metadata.update(extra_info)
            documents.append(
                Document(
                    text=text,
                    metadata=metadata,
                    metadata_template="",
                    metadata_seperator="",
                )
            )

        # Handle text sections
        for page_num, text_content in texts:
            if not text_content.strip():
                continue
            metadata = {
                "source": str(file_path),
                "type": "text",
                "page_label": page_num,
                "page_number": page_num,
            }
            if extra_info:
                metadata.update(extra_info)
            documents.append(Document(text=text_content, metadata=metadata))

        # Fallback if no content was parsed
        if not documents and content.strip():
            metadata = {
                "source": str(file_path),
                "type": "text",
                "page_label": 1,
                "page_number": 1,
            }
            if extra_info:
                metadata.update(extra_info)
            documents.append(Document(text=content.strip(), metadata=metadata))

        return documents

    def lazy_load_data(
        self,
        file: Union[str, List[str], Path],
        extra_info: Optional[Dict] = None,
        **load_kwargs: Any,
    ) -> Generator[Document, None, None]:
        """Lazy load data from file path."""
        file_path = Path(file) if isinstance(file, str) else file

        if "response_content" in load_kwargs:
            content = load_kwargs["response_content"]
        else:
            pdf_id = self.send_pdf(file_path)
            print(f"PDF ID: {pdf_id}")
            content = self.get_processed_pdf(pdf_id)

        if self.should_clean_pdf:
            content = self.clean_pdf(content)

        tables, texts = self.parse_markdown_text_to_tables(content)

        # Handle tables
        for page_num, table_content in tables:
            text = strip_special_chars_markdown(table_content)
            metadata = {
                "table_origin": table_content,
                "type": "table",
                "page_label": page_num,
                "page_number": page_num,
            }
            if extra_info:
                metadata.update(extra_info)
            yield Document(
                text=text,
                metadata=metadata,
                metadata_template="",
                metadata_seperator="",
            )

        # Handle text sections
        for page_num, text_content in texts:
            if not text_content.strip():
                continue
            metadata = {
                "source": str(file_path),
                "type": "text",
                "page_label": page_num,
                "page_number": page_num,
            }
            if extra_info:
                metadata.update(extra_info)
            yield Document(text=text_content, metadata=metadata)

        # Fallback if no content was parsed
        if not (tables or texts) and content.strip():
            metadata = {
                "source": str(file_path),
                "type": "text",
                "page_label": 1,
                "page_number": 1,
            }
            if extra_info:
                metadata.update(extra_info)
            yield Document(text=content.strip(), metadata=metadata)

        print(f"Completed processing PDF: {file_path}")

wait_for_processing

wait_for_processing(pdf_id)

Wait for processing to complete.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| pdf_id | str | a PDF id. | required |

Returns: None

Source code in libs/kotaemon/kotaemon/loaders/mathpix_loader.py
def wait_for_processing(self, pdf_id: str) -> None:
    """Wait for processing to complete.

    Args:
        pdf_id: a PDF id.

    Returns: None
    """
    url = self.url + "/" + pdf_id
    for _ in range(0, self.max_wait_time_seconds, 5):
        response = requests.get(url, headers=self._mathpix_headers)
        response_data = response.json()
        status = response_data.get("status", None)
        print(
            f"Processing status: {status}, "
            f"Progress: {response_data.get('percent_done', 0)}%"
        )

        if status == "completed":
            return
        elif status == "error":
            raise ValueError(f"Mathpix processing error: {response_data}")
        elif status in [
            "split",
            "processing",
        ]:  # conversion still in progress; poll again
            time.sleep(5)
            continue
        else:
            print(f"Unknown status: {response_data}")
            time.sleep(5)

    raise TimeoutError(
        f"Processing did not complete within {self.max_wait_time_seconds} seconds"
    )
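Note that `get_processed_pdf` calls `wait_for_processing` internally, so the polling rarely needs to be driven by hand. For reference, a sketch of the flow (file name illustrative; assumes valid Mathpix credentials):

reader = MathpixPDFReader()
pdf_id = reader.send_pdf("paper.pdf")       # upload; returns the Mathpix pdf_id
content = reader.get_processed_pdf(pdf_id)  # polls every 5 seconds, then downloads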

clean_pdf

clean_pdf(contents)

Clean up artifacts in the converted PDF contents.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| contents | str | the converted PDF contents (Markdown/MMD text). | required |

Returns: the cleaned contents.

Source code in libs/kotaemon/kotaemon/loaders/mathpix_loader.py
def clean_pdf(self, contents: str) -> str:
    """Clean the PDF file.

    Args:
        contents: a PDF file contents.

    Returns:

    """
    contents = "\n".join(
        [line for line in contents.split("\n") if not line.startswith("![]")]
    )
    # replace \section{Title} with # Title
    contents = contents.replace("\\section{", "# ")
    # replace the "\" slash that Mathpix adds to escape $, %, (, etc.

    # http:// or https:// followed by anything but a closing paren
    url_regex = "http[s]?://[^)]+"
    markup_regex = r"\[]\(\s*({0})\s*\)".format(url_regex)
    contents = (
        contents.replace(r"\$", "$")
        .replace(r"\%", "%")
        .replace(r"\(", "(")
        .replace(r"\)", ")")
        .replace("$\\begin{array}", "")
        .replace("\\end{array}$", "")
        .replace("\\\\", "")
        .replace("\\text", "")
        .replace("}", "")
        .replace("{", "")
        .replace("\\mathrm", "")
    )
    contents = re.sub(markup_regex, "", contents)
    return contents
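To illustrate the effect, a made-up before/after (the image stub, `\section{}` heading, and escaped characters are typical of Mathpix Markdown output):

raw = "\n".join([
    "![](https://cdn.mathpix.com/cropped/img.jpg)",  # image stub: line is dropped
    "\\section{Introduction}",                       # becomes "# Introduction"
    "Prices rose \\$5 (about 10\\%).",               # escape slashes removed
])
print(MathpixPDFReader().clean_pdf(raw))
# # Introduction
# Prices rose $5 (about 10%).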

parse_markdown_text_to_tables

parse_markdown_text_to_tables(content)

Parse markdown text to get tables and texts separately.

Returns:

| Type | Description |
| --- | --- |
| tuple[list[tuple[int, str]], list[tuple[int, str]]] | Tuple of (tables, texts) where each is a list of (page_num, content) tuples. |

Source code in libs/kotaemon/kotaemon/loaders/mathpix_loader.py
def parse_markdown_text_to_tables(
    self, content: str
) -> tuple[list[tuple[int, str]], list[tuple[int, str]]]:
    """Parse markdown text to get tables and texts separately.

    Returns:
        Tuple of (tables, texts) where each is a list of (page_num, content) tuples
    """
    print("Starting markdown parsing...")
    print(f"Content length: {len(content)}")

    # Split by page markers if present
    pages = re.split(r"(?m)^# Page \d+\n", content)

    tables: list[tuple[int, str]] = []
    texts: list[tuple[int, str]] = []

    for page_num, page_content in enumerate(pages, 1):
        if not page_content.strip():
            continue

        # Extract tables from the page
        table_matches = re.findall(r"(\|[^\n]+\|(?:\n\|[^\n]+\|)*)", page_content)
        if table_matches:
            for table in table_matches:
                tables.append(
                    (page_num, table.strip())
                )  # Store as tuple with page number
            # Remove tables from page content
            page_content = re.sub(
                r"(\|[^\n]+\|(?:\n\|[^\n]+\|)*)", "", page_content
            )

        # Split remaining content into meaningful chunks
        chunks = re.split(r"\n\s*\n", page_content)
        for chunk in chunks:
            if chunk.strip():
                texts.append(
                    (page_num, chunk.strip())
                )  # Store as tuple with page number

    print(f"Found {len(tables)} tables and {len(texts)} text sections")
    return tables, texts
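A small worked example (contents are made up; without `# Page N` markers the whole document is treated as page 1):

content = "Intro paragraph.\n\n| h1 | h2 |\n| a | b |\n\nClosing paragraph."
tables, texts = MathpixPDFReader().parse_markdown_text_to_tables(content)
# tables == [(1, "| h1 | h2 |\n| a | b |")]
# texts  == [(1, "Intro paragraph."), (1, "Closing paragraph.")]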

load_data

load_data(file, extra_info=None, **load_kwargs)

Load data from file path.

Source code in libs/kotaemon/kotaemon/loaders/mathpix_loader.py
def load_data(
    self,
    file: Union[str, List[str], Path],
    extra_info: Optional[Dict] = None,
    **load_kwargs: Any,
) -> List[Document]:
    """Load data from file path."""
    file_path = Path(file) if isinstance(file, str) else file

    if "response_content" in load_kwargs:
        content = load_kwargs["response_content"]
    else:
        pdf_id = self.send_pdf(file_path)
        content = self.get_processed_pdf(pdf_id)

    if self.should_clean_pdf:
        content = self.clean_pdf(content)

    tables, texts = self.parse_markdown_text_to_tables(content)
    documents = []

    # Handle tables
    for page_num, table_content in tables:
        text = strip_special_chars_markdown(table_content)
        metadata = {
            "table_origin": table_content,
            "type": "table",
            "page_label": page_num,
            "page_number": page_num,
        }
        if extra_info:
            metadata.update(extra_info)
        documents.append(
            Document(
                text=text,
                metadata=metadata,
                metadata_template="",
                metadata_seperator="",
            )
        )

    # Handle text sections
    for page_num, text_content in texts:
        if not text_content.strip():
            continue
        metadata = {
            "source": str(file_path),
            "type": "text",
            "page_label": page_num,
            "page_number": page_num,
        }
        if extra_info:
            metadata.update(extra_info)
        documents.append(Document(text=text_content, metadata=metadata))

    # Fallback if no content was parsed
    if not documents and content.strip():
        metadata = {
            "source": str(file_path),
            "type": "text",
            "page_label": 1,
            "page_number": 1,
        }
        if extra_info:
            metadata.update(extra_info)
        documents.append(Document(text=content.strip(), metadata=metadata))

    return documents
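A hedged call sketch (file name and extra metadata are illustrative):

reader = MathpixPDFReader()
docs = reader.load_data("paper.pdf", extra_info={"collection": "demo"})

# Each Document carries type ("table" or "text") and page metadata.
tables = [d for d in docs if d.metadata["type"] == "table"]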

lazy_load_data

lazy_load_data(file, extra_info=None, **load_kwargs)

Lazy load data from file path.

Source code in libs/kotaemon/kotaemon/loaders/mathpix_loader.py
def lazy_load_data(
    self,
    file: Union[str, List[str], Path],
    extra_info: Optional[Dict] = None,
    **load_kwargs: Any,
) -> Generator[Document, None, None]:
    """Lazy load data from file path."""
    file_path = Path(file) if isinstance(file, str) else file

    if "response_content" in load_kwargs:
        content = load_kwargs["response_content"]
    else:
        pdf_id = self.send_pdf(file_path)
        print(f"PDF ID: {pdf_id}")
        content = self.get_processed_pdf(pdf_id)

    if self.should_clean_pdf:
        content = self.clean_pdf(content)

    tables, texts = self.parse_markdown_text_to_tables(content)

    # Handle tables
    for page_num, table_content in tables:
        text = strip_special_chars_markdown(table_content)
        metadata = {
            "table_origin": table_content,
            "type": "table",
            "page_label": page_num,
            "page_number": page_num,
        }
        if extra_info:
            metadata.update(extra_info)
        yield Document(
            text=text,
            metadata=metadata,
            metadata_template="",
            metadata_seperator="",
        )

    # Handle text sections
    for page_num, text_content in texts:
        if not text_content.strip():
            continue
        metadata = {
            "source": str(file_path),
            "type": "text",
            "page_label": page_num,
            "page_number": page_num,
        }
        if extra_info:
            metadata.update(extra_info)
        yield Document(text=text_content, metadata=metadata)

    # Fallback if no content was parsed
    if not (tables or texts) and content.strip():
        metadata = {
            "source": str(file_path),
            "type": "text",
            "page_label": 1,
            "page_number": 1,
        }
        if extra_info:
            metadata.update(extra_info)
        yield Document(text=content.strip(), metadata=metadata)

    print(f"Completed processing PDF: {file_path}")
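Usage mirrors `load_data`, but documents are consumed as a generator (file name illustrative):

reader = MathpixPDFReader()
for doc in reader.lazy_load_data("paper.pdf"):
    print(doc.metadata["page_number"], doc.metadata["type"])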