Skip to content

Loaders

AdobeReader

Bases: BaseReader

Read PDF files using Adobe's PDF Services. Able to extract text, tables, and figures with high accuracy.

Example
1
2
3
>> from kotaemon.loaders import AdobeReader
>> reader = AdobeReader()
>> documents = reader.load_data("path/to/pdf")

Args: vlm_endpoint: URL to the Vision Language Model endpoint. If not provided, the default kotaemon.loaders.adobe_loader.DEFAULT_VLM_ENDPOINT is used.

1
2
max_figures_to_caption: an int that decides how many figures will be captioned.
The remaining figures are indexed without captions.
Source code in libs/kotaemon/kotaemon/loaders/adobe_loader.py
class AdobeReader(BaseReader):
    """Read PDF files using Adobe's PDF Services.

    Extracts text, tables, and figures and wraps each of them in a `Document`.

    Example:
        ```python
        >> from kotaemon.loaders import AdobeReader
        >> reader = AdobeReader()
        >> documents = reader.load_data("path/to/pdf")
        ```
    Args:
        vlm_endpoint: URL to the Vision Language Model endpoint. If not provided,
            will use the default
            `kotaemon.loaders.adobe_loader.DEFAULT_VLM_ENDPOINT`

        max_figures_to_caption: an int that decides how many figures will be
            captioned. The rest will be ignored (are indexed without captions).
    """

    def __init__(
        self,
        vlm_endpoint: Optional[str] = None,
        max_figures_to_caption: int = 100,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Store the captioning settings and the element-path patterns."""
        # NOTE(review): **kwargs is accepted but not forwarded to the parent
        # initializer; kept as-is to preserve the existing behavior.
        super().__init__(*args)
        # Element paths ending in "/Table" or "/Table[n]" (resp. "/Figure...")
        # identify table and figure elements.
        self.table_regex = r"/Table(\[\d+\])?$"
        self.figure_regex = r"/Figure(\[\d+\])?$"
        self.vlm_endpoint = vlm_endpoint or DEFAULT_VLM_ENDPOINT
        self.max_figures_to_caption = max_figures_to_caption

    def load_data(
        self, file: Path, extra_info: Optional[Dict] = None, **kwargs
    ) -> List[Document]:
        """Load data by calling Adobe's API

        Args:
            file (Path): Path to the PDF file. A plain string path is accepted
                as well.
            extra_info (Optional[Dict]): extra metadata merged into the metadata
                of every returned document (its keys take precedence).

        Returns:
            List[Document]: list of documents extracted from the PDF file,
                includes 3 types: text, table, and image. An empty list is
                returned when the service produced no `structuredData.json`.

        """
        from .utils.adobe import (
            generate_figure_captions,
            load_json,
            parse_figure_paths,
            parse_table_paths,
            request_adobe_service,
        )

        # Coerce once so that string paths (as in the class example) work too.
        source = Path(file)
        filename = source.name
        filepath = str(source.resolve())
        output_path = request_adobe_service(file_path=str(file), output_path="")
        results_path = os.path.join(output_path, "structuredData.json")

        if not os.path.exists(results_path):
            # No exception is active here, so `logger.exception` would attach a
            # meaningless "NoneType: None" traceback; log a plain error instead.
            logger.error("Fail to parse the document.")
            return []

        data = load_json(results_path)

        texts = defaultdict(list)
        tables = []
        figures = []

        elements = data["elements"]
        for item_id, item in enumerate(elements):
            page_number = item.get("Page", -1) + 1
            item_path = item["Path"]
            item_text = item.get("Text", "")

            file_paths = [
                Path(output_path) / path for path in item.get("filePaths", [])
            ]
            # The text of the preceding element is used as the title. The first
            # element has no predecessor: index -1 would silently pick the LAST
            # element of the document, so use an empty title instead.
            title = elements[item_id - 1].get("Text", "") if item_id > 0 else ""

            if re.search(self.table_regex, item_path):
                table_content = parse_table_paths(file_paths)
                if not table_content:
                    continue
                # NOTE(review): the caption is collected but the table document
                # below is built from `table_content` only.
                table_caption = (
                    table_content.replace("|", "").replace("---", "")
                    + f"\n(Table in Page {page_number}. {title})"
                )
                tables.append((page_number, table_content, table_caption))

            elif re.search(self.figure_regex, item_path):
                figure_caption = (
                    item_text + f"\n(Figure in Page {page_number}. {title})"
                )
                figure_content = parse_figure_paths(file_paths)
                if not figure_content:
                    continue
                # a list (not a tuple): the caption is extended in place below
                figures.append([page_number, figure_content, figure_caption])

            else:
                # skip text nested inside a table or figure element
                if item_text and "Table" not in item_path and "Figure" not in item_path:
                    texts[page_number].append(item_text)

        # get figure caption from the configured VLM endpoint
        figure_captions = generate_figure_captions(
            self.vlm_endpoint,
            [item[1] for item in figures],
            self.max_figures_to_caption,
        )
        for item, caption in zip(figures, figure_captions):
            # update figure caption
            item[2] += " " + caption

        # Metadata shared by every document; `extra_info` comes last so that
        # its keys override the generated ones.
        shared_metadata = {
            "file_name": filename,
            "file_path": filepath,
            **(extra_info if extra_info else {}),
        }

        # Wrap elements with Document
        documents = []

        # join plain text elements (one document per page)
        for page_number, txts in texts.items():
            documents.append(
                Document(
                    text="\n".join(txts),
                    metadata={"page_label": page_number, **shared_metadata},
                )
            )

        # table elements
        for page_number, table_content, table_caption in tables:
            documents.append(
                Document(
                    text=table_content,
                    metadata={
                        "table_origin": table_content,
                        "type": "table",
                        "page_label": page_number,
                        **shared_metadata,
                    },
                    metadata_template="",
                    metadata_seperator="",
                )
            )

        # figure elements
        for page_number, figure_content, figure_caption in figures:
            documents.append(
                Document(
                    text=figure_caption,
                    metadata={
                        "image_origin": figure_content,
                        "type": "image",
                        "page_label": page_number,
                        **shared_metadata,
                    },
                    metadata_template="",
                    metadata_seperator="",
                )
            )
        return documents

load_data

load_data(file, extra_info=None, **kwargs)

Load data by calling Adobe's API

Parameters:

Name Type Description Default
file Path

Path to the PDF file

required

Returns:

Type Description
List[Document]

List[Document]: list of documents extracted from the PDF file, includes 3 types: text, table, and image

Source code in libs/kotaemon/kotaemon/loaders/adobe_loader.py
def load_data(
    self, file: Path, extra_info: Optional[Dict] = None, **kwargs
) -> List[Document]:
    """Load data by calling Adobe's API

    Args:
        file (Path): Path to the PDF file. A plain string path is accepted
            as well.
        extra_info (Optional[Dict]): extra metadata merged into the metadata
            of every returned document (its keys take precedence).

    Returns:
        List[Document]: list of documents extracted from the PDF file,
            includes 3 types: text, table, and image. An empty list is
            returned when the service produced no `structuredData.json`.

    """
    from .utils.adobe import (
        generate_figure_captions,
        load_json,
        parse_figure_paths,
        parse_table_paths,
        request_adobe_service,
    )

    # Coerce once so that string paths work too.
    source = Path(file)
    filename = source.name
    filepath = str(source.resolve())
    output_path = request_adobe_service(file_path=str(file), output_path="")
    results_path = os.path.join(output_path, "structuredData.json")

    if not os.path.exists(results_path):
        # No exception is active here, so `logger.exception` would attach a
        # meaningless "NoneType: None" traceback; log a plain error instead.
        logger.error("Fail to parse the document.")
        return []

    data = load_json(results_path)

    texts = defaultdict(list)
    tables = []
    figures = []

    elements = data["elements"]
    for item_id, item in enumerate(elements):
        page_number = item.get("Page", -1) + 1
        item_path = item["Path"]
        item_text = item.get("Text", "")

        file_paths = [
            Path(output_path) / path for path in item.get("filePaths", [])
        ]
        # The text of the preceding element is used as the title. The first
        # element has no predecessor: index -1 would silently pick the LAST
        # element of the document, so use an empty title instead.
        title = elements[item_id - 1].get("Text", "") if item_id > 0 else ""

        if re.search(self.table_regex, item_path):
            table_content = parse_table_paths(file_paths)
            if not table_content:
                continue
            # NOTE(review): the caption is collected but the table document
            # below is built from `table_content` only.
            table_caption = (
                table_content.replace("|", "").replace("---", "")
                + f"\n(Table in Page {page_number}. {title})"
            )
            tables.append((page_number, table_content, table_caption))

        elif re.search(self.figure_regex, item_path):
            figure_caption = (
                item_text + f"\n(Figure in Page {page_number}. {title})"
            )
            figure_content = parse_figure_paths(file_paths)
            if not figure_content:
                continue
            # a list (not a tuple): the caption is extended in place below
            figures.append([page_number, figure_content, figure_caption])

        else:
            # skip text nested inside a table or figure element
            if item_text and "Table" not in item_path and "Figure" not in item_path:
                texts[page_number].append(item_text)

    # get figure caption from the configured VLM endpoint
    figure_captions = generate_figure_captions(
        self.vlm_endpoint,
        [item[1] for item in figures],
        self.max_figures_to_caption,
    )
    for item, caption in zip(figures, figure_captions):
        # update figure caption
        item[2] += " " + caption

    # Metadata shared by every document; `extra_info` comes last so that
    # its keys override the generated ones.
    shared_metadata = {
        "file_name": filename,
        "file_path": filepath,
        **(extra_info if extra_info else {}),
    }

    # Wrap elements with Document
    documents = []

    # join plain text elements (one document per page)
    for page_number, txts in texts.items():
        documents.append(
            Document(
                text="\n".join(txts),
                metadata={"page_label": page_number, **shared_metadata},
            )
        )

    # table elements
    for page_number, table_content, table_caption in tables:
        documents.append(
            Document(
                text=table_content,
                metadata={
                    "table_origin": table_content,
                    "type": "table",
                    "page_label": page_number,
                    **shared_metadata,
                },
                metadata_template="",
                metadata_seperator="",
            )
        )

    # figure elements
    for page_number, figure_content, figure_caption in figures:
        documents.append(
            Document(
                text=figure_caption,
                metadata={
                    "image_origin": figure_content,
                    "type": "image",
                    "page_label": page_number,
                    **shared_metadata,
                },
                metadata_template="",
                metadata_seperator="",
            )
        )
    return documents

AzureAIDocumentIntelligenceLoader

Bases: BaseReader

Utilize Azure AI Document Intelligence to parse document

As of April 24, the supported file formats are: pdf, jpeg/jpg, png, bmp, tiff, heif, docx, xlsx, pptx and html.

Source code in libs/kotaemon/kotaemon/loaders/azureai_document_intelligence_loader.py
class AzureAIDocumentIntelligenceLoader(BaseReader):
    """Utilize Azure AI Document Intelligence to parse document

    As of April 24, the supported file formats are: pdf, jpeg/jpg, png, bmp, tiff,
    heif, docx, xlsx, pptx and html.

    `load_data` returns one document holding the text content (with table and
    captioned-figure spans stripped out), followed by one document per
    captioned figure and one per table.
    """

    _dependencies = ["azure-ai-documentintelligence", "PyMuPDF", "Pillow"]

    endpoint: str = Param(
        os.environ.get("AZUREAI_DOCUMENT_INTELLIGENT_ENDPOINT", None),
        help="Endpoint of Azure AI Document Intelligence",
    )
    credential: str = Param(
        os.environ.get("AZUREAI_DOCUMENT_INTELLIGENT_CREDENTIAL", None),
        help="Credential of Azure AI Document Intelligence",
    )
    model: str = Param(
        "prebuilt-layout",
        help=(
            "Model to use for document analysis. Default is prebuilt-layout. "
            "As of April 24, you can view the supported models [here]"
            "(https://learn.microsoft.com/en-us/azure/ai-services/"
            "document-intelligence/concept-model-overview?view=doc-intel-4.0.0"
            "#model-analysis-features)"
        ),
    )
    output_content_format: str = Param(
        "markdown",
        help="Output content format. Can be 'markdown' or 'text'.Default is markdown",
    )
    vlm_endpoint: str = Param(
        help=(
            "Default VLM endpoint for figure captioning. If not provided, will not "
            "caption the figures"
        )
    )
    figure_friendly_filetypes: list[str] = Param(
        [".pdf", ".jpeg", ".jpg", ".png", ".bmp", ".tiff", ".heif", ".tif"],
        help=(
            "File types that we can reliably open and extract figures. "
            "For files like .docx or .html, the visual layout may be different "
            "when viewed from different tools, hence we cannot use Azure DI "
            "location to extract figures."
        ),
    )
    cache_dir: str = Param(
        None,
        help="Directory to cache the downloaded files. Default is None",
    )

    @Param.auto(depends_on=["endpoint", "credential"])
    def client_(self):
        """Build the Document Intelligence client from endpoint and credential."""
        try:
            from azure.ai.documentintelligence import DocumentIntelligenceClient
            from azure.core.credentials import AzureKeyCredential
        except ImportError as exc:
            # chain the original error so the missing module stays visible
            raise ImportError("Please install azure-ai-documentintelligence") from exc

        return DocumentIntelligenceClient(
            self.endpoint, AzureKeyCredential(self.credential)
        )

    def run(
        self, file_path: str | Path, extra_info: Optional[dict] = None, **kwargs
    ) -> list[Document]:
        """Component entry point; delegates to `load_data`."""
        return self.load_data(Path(file_path), extra_info=extra_info, **kwargs)

    def load_data(
        self, file_path: Path, extra_info: Optional[dict] = None, **kwargs
    ) -> list[Document]:
        """Extract the input file, allowing multi-modal extraction

        Args:
            file_path: path to the file to analyze (a string path is accepted
                as well).
            extra_info: extra metadata merged into the metadata of every
                returned document.

        Returns:
            list[Document]: the text document first, then the figure
                documents, then the table documents.
        """
        metadata = extra_info or {}
        # `.suffix` / `.stem` are used below, so make sure we hold a Path even
        # when this method is called directly with a string.
        file_path = Path(file_path)
        with open(file_path, "rb") as fi:
            poller = self.client_.begin_analyze_document(
                self.model,
                analyze_request=fi,
                content_type="application/octet-stream",
                output_content_format=self.output_content_format,
            )
            result = poller.result()

        # the total text content of the document in `output_content_format` format
        text_content = result.content
        removed_spans: list[dict] = []

        # extract the figures
        figures = []
        for figure_desc in result.get("figures", []):
            if not self.vlm_endpoint:
                continue
            if file_path.suffix.lower() not in self.figure_friendly_filetypes:
                continue

            # read & crop the image
            page_number = figure_desc["boundingRegions"][0]["pageNumber"]
            page_width = result.pages[page_number - 1]["width"]
            page_height = result.pages[page_number - 1]["height"]
            # polygon is a flat [x0, y0, x1, y1, ...] list; take its bounding box
            # and express it as fractions of the page size
            polygon = figure_desc["boundingRegions"][0]["polygon"]
            xs = [polygon[i] for i in range(0, len(polygon), 2)]
            ys = [polygon[i] for i in range(1, len(polygon), 2)]
            bbox = [
                min(xs) / page_width,
                min(ys) / page_height,
                max(xs) / page_width,
                max(ys) / page_height,
            ]
            img = crop_image(file_path, bbox, page_number - 1)

            # convert the image into base64
            img_bytes = BytesIO()
            img.save(img_bytes, format="PNG")
            img_base64 = base64.b64encode(img_bytes.getvalue()).decode("utf-8")
            img_base64 = f"data:image/png;base64,{img_base64}"

            # caption the image
            caption = generate_single_figure_caption(
                figure=img_base64, vlm_endpoint=self.vlm_endpoint
            )

            # store the image into document
            figure_metadata = {
                "image_origin": img_base64,
                "type": "image",
                "page_label": page_number,
            }
            figure_metadata.update(metadata)

            figures.append(
                Document(
                    text=caption,
                    metadata=figure_metadata,
                )
            )
            removed_spans += figure_desc["spans"]

        # extract the tables
        tables = []
        for table_desc in result.get("tables", []):
            if not table_desc["spans"]:
                continue

            # fall back to page 1 when the table has no bounding region
            boundingRegions = table_desc["boundingRegions"]
            if boundingRegions:
                page_number = boundingRegions[0]["pageNumber"]
            else:
                page_number = 1

            # store the tables into document; the table text is the slice of the
            # full content covered by the table's first span
            offset = table_desc["spans"][0]["offset"]
            length = table_desc["spans"][0]["length"]
            table_metadata = {
                "type": "table",
                "page_label": page_number,
                "table_origin": text_content[offset : offset + length],
            }
            table_metadata.update(metadata)

            tables.append(
                Document(
                    text=text_content[offset : offset + length],
                    metadata=table_metadata,
                )
            )
            removed_spans += table_desc["spans"]
        # save the full (not yet stripped) text content into markdown format
        if self.cache_dir is not None:
            with open(
                Path(self.cache_dir) / f"{file_path.stem}.md", "w", encoding="utf-8"
            ) as f:
                f.write(text_content)

        # Merge overlapping spans first (e.g. a figure located inside a table).
        # Removing overlapping spans one by one would apply the outer span's
        # original length to an already shortened string and delete unrelated
        # text that follows it.
        merged_spans: list[tuple[int, int]] = []
        for span in sorted(removed_spans, key=lambda x: x["offset"]):
            start = span["offset"]
            end = start + span["length"]
            if merged_spans and start <= merged_spans[-1][1]:
                prev_start, prev_end = merged_spans[-1]
                merged_spans[-1] = (prev_start, max(prev_end, end))
            else:
                merged_spans.append((start, end))

        # cut from the end so that earlier offsets stay valid
        for start, end in reversed(merged_spans):
            text_content = text_content[:start] + text_content[end:]

        return [Document(content=text_content, metadata=metadata)] + figures + tables

load_data

load_data(file_path, extra_info=None, **kwargs)

Extract the input file, allowing multi-modal extraction

Source code in libs/kotaemon/kotaemon/loaders/azureai_document_intelligence_loader.py
def load_data(
    self, file_path: Path, extra_info: Optional[dict] = None, **kwargs
) -> list[Document]:
    """Extract the input file, allowing multi-modal extraction

    Args:
        file_path: path to the file to analyze (a string path is accepted
            as well).
        extra_info: extra metadata merged into the metadata of every
            returned document.

    Returns:
        list[Document]: the text document (with table and captioned-figure
            spans stripped out) first, then the figure documents, then the
            table documents.
    """
    metadata = extra_info or {}
    # `.suffix` / `.stem` are used below, so make sure we hold a Path even
    # when this method is called directly with a string.
    file_path = Path(file_path)
    with open(file_path, "rb") as fi:
        poller = self.client_.begin_analyze_document(
            self.model,
            analyze_request=fi,
            content_type="application/octet-stream",
            output_content_format=self.output_content_format,
        )
        result = poller.result()

    # the total text content of the document in `output_content_format` format
    text_content = result.content
    removed_spans: list[dict] = []

    # extract the figures
    figures = []
    for figure_desc in result.get("figures", []):
        if not self.vlm_endpoint:
            continue
        if file_path.suffix.lower() not in self.figure_friendly_filetypes:
            continue

        # read & crop the image
        page_number = figure_desc["boundingRegions"][0]["pageNumber"]
        page_width = result.pages[page_number - 1]["width"]
        page_height = result.pages[page_number - 1]["height"]
        # polygon is a flat [x0, y0, x1, y1, ...] list; take its bounding box
        # and express it as fractions of the page size
        polygon = figure_desc["boundingRegions"][0]["polygon"]
        xs = [polygon[i] for i in range(0, len(polygon), 2)]
        ys = [polygon[i] for i in range(1, len(polygon), 2)]
        bbox = [
            min(xs) / page_width,
            min(ys) / page_height,
            max(xs) / page_width,
            max(ys) / page_height,
        ]
        img = crop_image(file_path, bbox, page_number - 1)

        # convert the image into base64
        img_bytes = BytesIO()
        img.save(img_bytes, format="PNG")
        img_base64 = base64.b64encode(img_bytes.getvalue()).decode("utf-8")
        img_base64 = f"data:image/png;base64,{img_base64}"

        # caption the image
        caption = generate_single_figure_caption(
            figure=img_base64, vlm_endpoint=self.vlm_endpoint
        )

        # store the image into document
        figure_metadata = {
            "image_origin": img_base64,
            "type": "image",
            "page_label": page_number,
        }
        figure_metadata.update(metadata)

        figures.append(
            Document(
                text=caption,
                metadata=figure_metadata,
            )
        )
        removed_spans += figure_desc["spans"]

    # extract the tables
    tables = []
    for table_desc in result.get("tables", []):
        if not table_desc["spans"]:
            continue

        # fall back to page 1 when the table has no bounding region
        boundingRegions = table_desc["boundingRegions"]
        if boundingRegions:
            page_number = boundingRegions[0]["pageNumber"]
        else:
            page_number = 1

        # store the tables into document; the table text is the slice of the
        # full content covered by the table's first span
        offset = table_desc["spans"][0]["offset"]
        length = table_desc["spans"][0]["length"]
        table_metadata = {
            "type": "table",
            "page_label": page_number,
            "table_origin": text_content[offset : offset + length],
        }
        table_metadata.update(metadata)

        tables.append(
            Document(
                text=text_content[offset : offset + length],
                metadata=table_metadata,
            )
        )
        removed_spans += table_desc["spans"]
    # save the full (not yet stripped) text content into markdown format
    if self.cache_dir is not None:
        with open(
            Path(self.cache_dir) / f"{file_path.stem}.md", "w", encoding="utf-8"
        ) as f:
            f.write(text_content)

    # Merge overlapping spans first (e.g. a figure located inside a table).
    # Removing overlapping spans one by one would apply the outer span's
    # original length to an already shortened string and delete unrelated
    # text that follows it.
    merged_spans: list[tuple[int, int]] = []
    for span in sorted(removed_spans, key=lambda x: x["offset"]):
        start = span["offset"]
        end = start + span["length"]
        if merged_spans and start <= merged_spans[-1][1]:
            prev_start, prev_end = merged_spans[-1]
            merged_spans[-1] = (prev_start, max(prev_end, end))
        else:
            merged_spans.append((start, end))

    # cut from the end so that earlier offsets stay valid
    for start, end in reversed(merged_spans):
        text_content = text_content[:start] + text_content[end:]

    return [Document(content=text_content, metadata=metadata)] + figures + tables

AutoReader

Bases: BaseReader

General auto reader for a variety of files. (based on llama-hub)

Source code in libs/kotaemon/kotaemon/loaders/base.py
class AutoReader(BaseReader):
    """Generic reader that delegates to a llama-hub / llama-index reader.

    The wrapped reader is either looked up by its string identifier or
    instantiated from the reader class that is passed in.
    """

    def __init__(self, reader_type: Union[str, Type["LIBaseReader"]]) -> None:
        """Create the wrapped reader.

        Args:
            reader_type: the reader's string identifier (resolved through
                `download_loader`) or the reader class itself.
        """
        if not isinstance(reader_type, str):
            # a reader class was given: instantiate it directly
            self._reader = reader_type()
        else:
            from llama_index.core import download_loader

            loader_cls = download_loader(reader_type)
            self._reader = loader_cls()
        super().__init__()

    def load_data(self, file: Union[Path, str], **kwargs: Any) -> List[Document]:
        """Read `file` with the wrapped reader and return kotaemon documents."""
        loaded = self._reader.load_data(file=file, **kwargs)

        # round-trip through a dict to rebuild each document as the kotaemon
        # `Document` class
        return [Document.from_dict(item.to_dict()) for item in loaded]

    def run(self, file: Union[Path, str], **kwargs: Any) -> List[Document]:
        """Component entry point; same as `load_data`."""
        return self.load_data(file=file, **kwargs)

BaseReader

Bases: BaseComponent

The base class for all readers

Source code in libs/kotaemon/kotaemon/loaders/base.py
class BaseReader(BaseComponent):
    """The base class for all readers.

    Adds nothing on top of `BaseComponent`; it gives every reader a common
    parent type.
    """

DirectoryReader

Bases: LIReaderMixin, BaseReader

Wrap around llama-index SimpleDirectoryReader

Parameters:

Name Type Description Default
input_dir str

Path to the directory.

required
input_files List

List of file paths to read (Optional; overrides input_dir, exclude)

required
exclude List

glob of python file paths to exclude (Optional)

required
exclude_hidden bool

Whether to exclude hidden files (dotfiles).

required
encoding str

Encoding of the files. Default is utf-8.

required
errors str

how encoding and decoding errors are to be handled, see https://docs.python.org/3/library/functions.html#open

required
recursive bool

Whether to recursively search in subdirectories. False by default.

required
filename_as_id bool

Whether to use the filename as the document id. False by default.

required
required_exts Optional[List[str]]

List of required extensions. Default is None.

required
file_extractor Optional[Dict[str, BaseReader]]

A mapping of file extension to a BaseReader class that specifies how to convert that file to text. If not specified, use default from DEFAULT_FILE_READER_CLS.

required
num_files_limit Optional[int]

Maximum number of files to read. Default is None.

required
file_metadata Optional[Callable[str, Dict]]

A function that takes in a filename and returns a Dict of metadata for the Document. Default is None.

required
Source code in libs/kotaemon/kotaemon/loaders/composite_loader.py
class DirectoryReader(LIReaderMixin, BaseReader):
    """Wrap around llama-index SimpleDirectoryReader

    Every argument is optional; the defaults are listed below.

    Args:
        input_dir (Optional[str]): Path to the directory. Default is None.
        input_files (Optional[List]): List of file paths to read
            (overrides input_dir, exclude). Default is None.
        exclude (Optional[List]): glob patterns of file paths to exclude.
            Default is None.
        exclude_hidden (bool): Whether to exclude hidden files (dotfiles).
            Default is True.
        encoding (str): Encoding of the files.
            Default is utf-8.
        errors (str): how encoding and decoding errors are to be handled,
              see https://docs.python.org/3/library/functions.html#open
              Default is "ignore".
        recursive (bool): Whether to recursively search in subdirectories.
            False by default.
        filename_as_id (bool): Whether to use the filename as the document id.
            False by default.
        required_exts (Optional[List[str]]): List of required extensions.
            Default is None.
        file_extractor (Optional[Dict[str, BaseReader]]): A mapping of file
            extension to a BaseReader class that specifies how to convert that file
            to text. If not specified, use default from DEFAULT_FILE_READER_CLS.
        num_files_limit (Optional[int]): Maximum number of files to read.
            Default is None.
        file_metadata (Optional[Callable[[str], Dict]]): A function that takes
            in a filename and returns a Dict of metadata for the Document.
            Default is None.
    """

    input_dir: Optional[str] = None
    input_files: Optional[List] = None
    exclude: Optional[List] = None
    exclude_hidden: bool = True
    errors: str = "ignore"
    recursive: bool = False
    encoding: str = "utf-8"
    filename_as_id: bool = False
    required_exts: Optional[list[str]] = None
    file_extractor: Optional[dict[str, "LIBaseReader"]] = None
    num_files_limit: Optional[int] = None
    file_metadata: Optional[Callable[[str], dict]] = None

    def _get_wrapped_class(self) -> Type["LIBaseReader"]:
        """Return the llama-index class that this reader wraps."""
        # imported lazily, only when the wrapped class is requested
        from llama_index.core import SimpleDirectoryReader

        return SimpleDirectoryReader

DoclingReader

Bases: BaseReader

Using Docling to extract document structure and content

Source code in libs/kotaemon/kotaemon/loaders/docling_loader.py
class DoclingReader(BaseReader):
    """Using Docling to extract document structure and content

    `load_data` returns one document per page of plain text, followed by one
    document per table and one per captioned figure.
    """

    _dependencies = ["docling"]

    vlm_endpoint: str = Param(
        help=(
            "Default VLM endpoint for figure captioning. "
            "If not provided, will not caption the figures"
        )
    )

    max_figure_to_caption: int = Param(
        100,
        help=(
            "The maximum number of figures to caption. "
            "The rest will be indexed without captions."
        ),
    )

    figure_friendly_filetypes: list[str] = Param(
        [".pdf", ".jpeg", ".jpg", ".png", ".bmp", ".tiff", ".heif", ".tif"],
        help=(
            "File types that we can reliably open and extract figures. "
            "For files like .docx or .html, the visual layout may be different "
            "when viewed from different tools, hence we cannot use Azure DI location "
            "to extract figures."
        ),
    )

    @Param.auto(cache=True)
    def converter_(self):
        """Build the Docling `DocumentConverter`."""
        try:
            from docling.document_converter import DocumentConverter
        except ImportError:
            raise ImportError("Please install docling: 'pip install docling'")

        return DocumentConverter()

    def run(
        self, file_path: str | Path, extra_info: Optional[dict] = None, **kwargs
    ) -> List[Document]:
        """Component entry point; delegates to `load_data`."""
        return self.load_data(file_path, extra_info, **kwargs)

    def load_data(
        self, file_path: str | Path, extra_info: Optional[dict] = None, **kwargs
    ) -> List[Document]:
        """Extract the input file, allowing multi-modal extraction

        Args:
            file_path: path to the file to convert.
            extra_info: extra metadata merged into the metadata of every
                returned document.

        Returns:
            List[Document]: text documents (one per page), then table
                documents, then figure documents.
        """

        metadata = extra_info or {}

        result = self.converter_.convert(file_path)
        result_dict = result.document.export_to_dict()

        file_path = Path(file_path)
        file_name = file_path.name

        # extract the figures
        figures = []
        gen_caption_count = 0
        for figure_obj in result_dict.get("pictures", []):
            if not self.vlm_endpoint:
                continue
            if file_path.suffix.lower() not in self.figure_friendly_filetypes:
                continue

            # a figure without provenance cannot be located on a page
            figure_prov = figure_obj.get("prov") or []
            if not figure_prov:
                continue

            # retrieve extractive captions provided by docling
            extractive_captions = self._extract_captions(figure_obj, result_dict)

            # read & crop image
            page_number = figure_prov[0]["page_no"]

            try:
                page_number_text = str(page_number)
                page_width = result_dict["pages"][page_number_text]["size"]["width"]
                page_height = result_dict["pages"][page_number_text]["size"]["height"]

                bbox_obj = figure_prov[0]["bbox"]
                bbox: list[float] = [
                    bbox_obj["l"],
                    bbox_obj["t"],
                    bbox_obj["r"],
                    bbox_obj["b"],
                ]
                if bbox_obj["coord_origin"] == "BOTTOMLEFT":
                    bbox = self._convert_bbox_bl_tl(bbox, page_width, page_height)
                else:
                    # Already top-left based, but still in page units. The
                    # bottom-left branch hands `crop_image` page-relative
                    # fractions, so normalise here as well.
                    # NOTE(review): assumes `crop_image` expects fractions of
                    # the page size - confirm against its definition.
                    bbox = [
                        bbox[0] / page_width,
                        bbox[1] / page_height,
                        bbox[2] / page_width,
                        bbox[3] / page_height,
                    ]

                img = crop_image(file_path, bbox, page_number - 1)
            except KeyError as e:
                print(e, list(result_dict["pages"].keys()))
                continue

            # convert img to base64
            img_bytes = BytesIO()
            img.save(img_bytes, format="PNG")
            img_base64 = base64.b64encode(img_bytes.getvalue()).decode("utf-8")
            img_base64 = f"data:image/png;base64,{img_base64}"

            # generate the generative caption (only for the first
            # `max_figure_to_caption` figures)
            if gen_caption_count >= self.max_figure_to_caption:
                gen_caption = ""
            else:
                gen_caption_count += 1
                gen_caption = generate_single_figure_caption(
                    img_base64, self.vlm_endpoint
                )

            # join the extractive and generative captions
            caption = "\n".join(extractive_captions + [gen_caption])

            # store the image into document
            figure_metadata = {
                "image_origin": img_base64,
                "type": "image",
                "page_label": page_number,
                "file_name": file_name,
                "file_path": file_path,
            }
            figure_metadata.update(metadata)

            figures.append(
                Document(
                    text=caption,
                    metadata=figure_metadata,
                )
            )

        # extract the tables
        tables = []
        for table_obj in result_dict.get("tables", []):
            # convert the tables into markdown format
            markdown_table = self._parse_table(table_obj)

            # prepend the extractive captions to the table
            caption = "\n".join(self._extract_captions(table_obj, result_dict))
            markdown_table = f"{caption}\n{markdown_table}"

            page_number = self._page_number(table_obj)

            table_metadata = {
                "type": "table",
                "page_label": page_number,
                "table_origin": markdown_table,
                "file_name": file_name,
                "file_path": file_path,
            }
            table_metadata.update(metadata)

            tables.append(
                Document(
                    text=markdown_table,
                    metadata=table_metadata,
                )
            )

        # join plain text elements
        texts = []
        page_number_to_text = defaultdict(list)

        for text_obj in result_dict["texts"]:
            page_number_to_text[self._page_number(text_obj)].append(text_obj["text"])

        for page_number, txts in page_number_to_text.items():
            texts.append(
                Document(
                    text="\n".join(txts),
                    metadata={
                        "page_label": page_number,
                        "file_name": file_name,
                        "file_path": file_path,
                        **metadata,
                    },
                )
            )

        return texts + tables + figures

    def _extract_captions(self, obj: dict, result_dict: dict) -> list[str]:
        """Resolve the caption references of a figure/table into caption texts"""
        caption_refs = [caption["$ref"] for caption in obj["captions"]]
        captions = []
        for caption_ref in caption_refs:
            # the last path segment of the reference is used as the index
            # into `result_dict["texts"]`
            text_id = caption_ref.split("/")[-1]
            try:
                captions.append(result_dict["texts"][int(text_id)]["text"])
            except (ValueError, TypeError, IndexError) as e:
                print(e)
                continue
        return captions

    def _page_number(self, obj: dict) -> int:
        """Return the page of the first provenance entry, or 1 when unknown"""
        # `prov` may be missing or empty; indexing `[0]` directly would raise
        # although a default page of 1 is intended.
        prov = obj.get("prov") or [{}]
        return prov[0].get("page_no", 1)

    def _convert_bbox_bl_tl(
        self, bbox: list[float], page_width: float, page_height: float
    ) -> list[float]:
        """Convert bbox from bottom-left to top-left

        The returned coordinates are additionally divided by the page size,
        i.e. they are fractions of the page width/height.
        """
        x0, y0, x1, y1 = bbox
        return [
            x0 / page_width,
            (page_height - y1) / page_height,
            x1 / page_width,
            (page_height - y0) / page_height,
        ]

    def _parse_table(self, table_obj: dict) -> str:
        """Convert docling table object to markdown table"""
        grid = table_obj["data"]["grid"]
        table_as_list: List[List[str]] = [
            [cell["text"] for cell in row] for row in grid
        ]

        return make_markdown_table(table_as_list)

load_data

load_data(file_path, extra_info=None, **kwargs)

Extract the input file, allowing multi-modal extraction

Source code in libs/kotaemon/kotaemon/loaders/docling_loader.py
def load_data(
    self, file_path: str | Path, extra_info: Optional[dict] = None, **kwargs
) -> List[Document]:
    """Extract the input file, allowing multi-modal extraction

    The file is converted with ``self.converter_`` and three groups of
    documents are built from the exported dictionary: one per page of plain
    text, one per table (markdown, prefixed with its extractive captions) and
    one per figure (captions as text, base64 PNG stored in the metadata).

    Args:
        file_path: path to the file to convert.
        extra_info: extra metadata merged into every returned document; its
            keys override the ones set by this reader.
        **kwargs: accepted for interface compatibility and not used here.

    Returns:
        List[Document]: text documents, followed by tables, then figures.
    """

    metadata = extra_info or {}

    result = self.converter_.convert(file_path)
    result_dict = result.document.export_to_dict()

    file_path = Path(file_path)
    file_name = file_path.name

    def resolve_captions(item: dict) -> List[str]:
        """Return the texts referenced by the ``$ref`` entries of an item."""
        caption_refs = [caption["$ref"] for caption in item["captions"]]
        captions = []
        for caption_ref in caption_refs:
            # the last path segment of the reference indexes result_dict["texts"]
            text_id = caption_ref.split("/")[-1]
            try:
                captions.append(result_dict["texts"][int(text_id)]["text"])
            except (ValueError, TypeError, IndexError) as e:
                # best effort: a dangling reference only loses that caption
                print(e)
        return captions

    # extract the figures
    figures = []
    gen_caption_count = 0
    for figure_obj in result_dict.get("pictures", []):
        if not self.vlm_endpoint:
            continue
        if file_path.suffix.lower() not in self.figure_friendly_filetypes:
            continue

        # retrieve extractive captions provided by docling
        extractive_captions = resolve_captions(figure_obj)

        # read & crop image
        figure_prov = figure_obj.get("prov") or []
        if not figure_prov:
            # without provenance there is no page / bbox to crop the image from
            continue
        page_number = figure_prov[0]["page_no"]

        try:
            page_number_text = str(page_number)
            page_width = result_dict["pages"][page_number_text]["size"]["width"]
            page_height = result_dict["pages"][page_number_text]["size"]["height"]

            bbox_obj = figure_prov[0]["bbox"]
            bbox: list[float] = [
                bbox_obj["l"],
                bbox_obj["t"],
                bbox_obj["r"],
                bbox_obj["b"],
            ]
            if bbox_obj["coord_origin"] == "BOTTOMLEFT":
                bbox = self._convert_bbox_bl_tl(bbox, page_width, page_height)

            # crop_image is given a page index one less than docling's page_no
            img = crop_image(file_path, bbox, page_number - 1)
        except KeyError as e:
            print(e, list(result_dict["pages"].keys()))
            continue

        # convert img to base64
        img_bytes = BytesIO()
        img.save(img_bytes, format="PNG")
        img_base64 = base64.b64encode(img_bytes.getvalue()).decode("utf-8")
        img_base64 = f"data:image/png;base64,{img_base64}"

        # generate the generative caption (only for the first N figures)
        if gen_caption_count >= self.max_figure_to_caption:
            gen_caption = ""
        else:
            gen_caption_count += 1
            gen_caption = generate_single_figure_caption(
                img_base64, self.vlm_endpoint
            )

        # join the extractive and generative captions
        caption = "\n".join(extractive_captions + [gen_caption])

        # store the image into document
        figure_metadata = {
            "image_origin": img_base64,
            "type": "image",
            "page_label": page_number,
            "file_name": file_name,
            "file_path": file_path,
        }
        figure_metadata.update(metadata)

        figures.append(
            Document(
                text=caption,
                metadata=figure_metadata,
            )
        )

    # extract the tables
    tables = []
    for table_obj in result_dict.get("tables", []):
        # convert the tables into markdown format
        markdown_table = self._parse_table(table_obj)

        # prefix the table with its extractive captions
        caption = "\n".join(resolve_captions(table_obj))
        markdown_table = f"{caption}\n{markdown_table}"

        # items may carry an empty provenance list; fall back to page 1
        table_prov = table_obj.get("prov") or [{}]
        page_number = table_prov[0].get("page_no", 1)

        table_metadata = {
            "type": "table",
            "page_label": page_number,
            "table_origin": markdown_table,
            "file_name": file_name,
            "file_path": file_path,
        }
        table_metadata.update(metadata)

        tables.append(
            Document(
                text=markdown_table,
                metadata=table_metadata,
            )
        )

    # join plain text elements, grouped by the page they were found on
    texts = []
    page_number_to_text = defaultdict(list)

    for text_obj in result_dict["texts"]:
        # items may carry an empty provenance list; fall back to page 1
        text_prov = text_obj.get("prov") or [{}]
        page_number = text_prov[0].get("page_no", 1)
        page_number_to_text[page_number].append(text_obj["text"])

    for page_number, txts in page_number_to_text.items():
        texts.append(
            Document(
                text="\n".join(txts),
                metadata={
                    "page_label": page_number,
                    "file_name": file_name,
                    "file_path": file_path,
                    **metadata,
                },
            )
        )

    return texts + tables + figures

DocxReader

Bases: BaseReader

Read Docx files that respect table, using python-docx library

Reader behavior
  • All paragraphs are extracted as a Document
  • Each table is extracted as a Document, rendered as a CSV string
  • The output is a list of Documents, concatenating the above (tables + paragraphs)
Source code in libs/kotaemon/kotaemon/loaders/docx_loader.py
class DocxReader(BaseReader):
    """Read Docx files that respect table, using python-docx library

    Reader behavior:
        - All paragraphs are extracted as a Document
        - Each table is extracted as a Document, rendered as a CSV string
        - The output is a list of Documents, concatenating the above
        (tables + paragraphs)
    """

    def __init__(self, *args, **kwargs):
        # fail early, at construction time, when python-docx is missing
        try:
            import docx  # noqa
        except ImportError as err:
            raise ImportError(
                "docx is not installed. "
                "Please install it using `pip install python-docx`"
            ) from err

    def _load_single_table(self, table) -> List[List[str]]:
        """Extract content from tables. Return a list of columns: list[str]
        Some merged cells will share duplicated content.
        """
        n_row = len(table.rows)
        n_col = len(table.columns)

        # column-major layout: arrays[column][row]
        arrays = [["" for _ in range(n_row)] for _ in range(n_col)]

        for i, row in enumerate(table.rows):
            for j, cell in enumerate(row.cells):
                arrays[j][i] = cell.text

        return arrays

    def load_data(
        self, file_path: Path, extra_info: Optional[dict] = None, **kwargs
    ) -> List[Document]:
        """Load data using Docx reader

        Args:
            file_path (Path): Path to .docx file
            extra_info (dict): extra metadata merged into every Document

        Returns:
            List[Document]: list of documents extracted from the Docx file,
                one per table followed by one holding all paragraph text
        """
        import docx

        file_path = Path(file_path).resolve()

        doc = docx.Document(str(file_path))
        all_text = "\n".join(
            [unicodedata.normalize("NFKC", p.text) for p in doc.paragraphs]
        )
        pages = [all_text]  # 1 page only

        tables = []
        for t in doc.tables:
            # return list of columns: list of string
            arrays = self._load_single_table(t)
            if any(not column for column in arrays):
                # a table without rows has no header row to build columns from
                continue

            # The first row is the header. Build the frame from rows with an
            # explicit column list so that repeated header texts (e.g. merged
            # header cells) keep their own column instead of overwriting
            # each other as keys of a dict.
            headers = [column[0] for column in arrays]
            body_rows = list(zip(*(column[1:] for column in arrays)))
            tables.append(pd.DataFrame(body_rows, columns=headers))

        extra_info = extra_info or {}

        # create output Document with metadata from table
        documents = []
        for table in tables:
            table_csv = table.to_csv(index=False)
            documents.append(
                Document(
                    text=table_csv.strip(),
                    metadata={
                        "table_origin": table_csv,
                        "type": "table",
                        **extra_info,
                    },
                    metadata_template="",
                    metadata_seperator="",
                )
            )

        # create Document from non-table text
        documents.extend(
            Document(
                text=non_table_text.strip(),
                metadata={"page_label": 1, **extra_info},
            )
            for non_table_text in pages
        )

        return documents

load_data

load_data(file_path, extra_info=None, **kwargs)

Load data using Docx reader

Parameters:

Name Type Description Default
file_path Path

Path to .docx file

required

Returns:

Type Description
List[Document]

List[Document]: list of documents extracted from the Docx file

Source code in libs/kotaemon/kotaemon/loaders/docx_loader.py
def load_data(
    self, file_path: Path, extra_info: Optional[dict] = None, **kwargs
) -> List[Document]:
    """Load data using Docx reader

    Args:
        file_path (Path): Path to .docx file
        extra_info (dict): extra metadata merged into every Document

    Returns:
        List[Document]: list of documents extracted from the Docx file,
            one per table followed by one holding all paragraph text
    """
    import docx

    file_path = Path(file_path).resolve()

    doc = docx.Document(str(file_path))
    all_text = "\n".join(
        [unicodedata.normalize("NFKC", p.text) for p in doc.paragraphs]
    )
    pages = [all_text]  # 1 page only

    tables = []
    for t in doc.tables:
        # return list of columns: list of string
        arrays = self._load_single_table(t)
        if any(not column for column in arrays):
            # a table without rows has no header row to build columns from
            continue

        # The first row is the header. Build the frame from rows with an
        # explicit column list so that repeated header texts (e.g. merged
        # header cells) keep their own column instead of overwriting each
        # other as keys of a dict.
        headers = [column[0] for column in arrays]
        body_rows = list(zip(*(column[1:] for column in arrays)))
        tables.append(pd.DataFrame(body_rows, columns=headers))

    extra_info = extra_info or {}

    # create output Document with metadata from table
    documents = []
    for table in tables:
        table_csv = table.to_csv(index=False)
        documents.append(
            Document(
                text=table_csv.strip(),
                metadata={
                    "table_origin": table_csv,
                    "type": "table",
                    **extra_info,
                },
                metadata_template="",
                metadata_seperator="",
            )
        )

    # create Document from non-table text
    documents.extend(
        Document(
            text=non_table_text.strip(),
            metadata={"page_label": 1, **extra_info},
        )
        for non_table_text in pages
    )

    return documents

ExcelReader

Bases: BaseReader

Spreadsheet exporter respecting multiple worksheets

Parses CSVs using the separator detection from Pandas read_csv function. If special parameters are required, use the pandas_config dict.

Args:

1
2
3
4
pandas_config (dict): Options for the `pandas.read_excel` function call.
    Refer to https://pandas.pydata.org/docs/reference/api/pandas.read_excel.html
    for more information. Set to empty dict by default,
    this means defaults will be used.
Source code in libs/kotaemon/kotaemon/loaders/excel_loader.py
class ExcelReader(BaseReader):
    r"""Excel reader that emits one Document per worksheet

    Sheets are loaded with `pandas.read_excel`; rows made up only of empty
    cells are dropped and the remaining cells are joined into plain text.

    Args:

        pandas_config (dict): Options for the `pandas.read_excel` function call.
            Refer to https://pandas.pydata.org/docs/reference/api/pandas.read_excel.html
            for more information. Set to empty dict by default,
            this means defaults will be used.
        row_joiner (str): text placed between rows; "\n" is used when empty.
        col_joiner (str): text placed between the cells of a row; " " is used
            when empty.

    """

    def __init__(
        self,
        *args: Any,
        pandas_config: Optional[dict] = None,
        row_joiner: str = "\n",
        col_joiner: str = " ",
        **kwargs: Any,
    ) -> None:
        """Store the pandas options and the joiners, applying the fallbacks."""
        super().__init__(*args, **kwargs)
        self._pandas_config = pandas_config if pandas_config else {}
        self._row_joiner = row_joiner or "\n"
        self._col_joiner = col_joiner or " "

    def load_data(
        self,
        file: Path,
        include_sheetname: bool = True,
        sheet_name: Optional[Union[str, int, list]] = None,
        extra_info: Optional[dict] = None,
        **kwargs,
    ) -> List[Document]:
        """Read an Excel file and turn every selected sheet into a Document.

        Args:
            file (Path): The path to the Excel file to read.
            include_sheetname (bool): Whether to prepend a line naming the
                sheet and the file to each sheet's text.
            sheet_name (Union[str, int, list, None]): The sheet(s) to read;
                None (the default) reads all sheets.
            extra_info (dict): extra metadata merged into every Document.

        Returns:
            List[Document]: one Document per sheet, with `page_label`
                (1-based position) and `sheet_name` in its metadata.
        """

        try:
            import pandas as pd
        except ImportError:
            raise ImportError(
                "install pandas using `pip3 install pandas` to use this loader"
            )

        # a list (or None) makes read_excel return a mapping of sheets
        if sheet_name is not None and not isinstance(sheet_name, list):
            sheet_name = [sheet_name]

        # clean up input
        file = Path(file)
        shared_metadata = extra_info or {}

        sheets = pd.read_excel(file, sheet_name=sheet_name, **self._pandas_config)
        documents = []

        for page_label, (name, frame) in enumerate(sheets.items(), start=1):
            # drop fully-empty rows, then blank out the remaining missing cells
            frame = frame.dropna(axis=0, how="all").astype("object")
            frame.fillna("", inplace=True)

            lines = [
                self._col_joiner.join(cells).strip()
                for cells in frame.values.astype(str).tolist()
            ]
            content = self._row_joiner.join(lines).strip()
            if include_sheetname:
                content = f"(Sheet {name} of file {file.name})\n{content}"

            documents.append(
                Document(
                    text=content,
                    metadata={
                        "page_label": page_label,
                        "sheet_name": name,
                        **shared_metadata,
                    },
                )
            )

        return documents

load_data

1
2
3
4
5
6
7
load_data(
    file,
    include_sheetname=True,
    sheet_name=None,
    extra_info=None,
    **kwargs
)

Parse file and extract values from a specific column.

Parameters:

Name Type Description Default
file Path

The path to the Excel file to read.

required
include_sheetname bool

Whether to include the sheet name in the output.

True
sheet_name Union[str, int, None]

The specific sheet to read from, default is None which reads all sheets.

None

Returns:

Type Description
List[Document]

List[Document]: A list of `Document` objects, one per sheet read from the Excel file, each containing that sheet's cell values as text.

Source code in libs/kotaemon/kotaemon/loaders/excel_loader.py
def load_data(
    self,
    file: Path,
    include_sheetname: bool = True,
    sheet_name: Optional[Union[str, int, list]] = None,
    extra_info: Optional[dict] = None,
    **kwargs,
) -> List[Document]:
    """Read an Excel file and turn every selected sheet into a Document.

    Args:
        file (Path): The path to the Excel file to read.
        include_sheetname (bool): Whether to prepend a line naming the
            sheet and the file to each sheet's text.
        sheet_name (Union[str, int, list, None]): The sheet(s) to read;
            None (the default) reads all sheets.
        extra_info (dict): extra metadata merged into every Document.

    Returns:
        List[Document]: one Document per sheet, with `page_label`
            (1-based position) and `sheet_name` in its metadata.
    """

    try:
        import pandas as pd
    except ImportError:
        raise ImportError(
            "install pandas using `pip3 install pandas` to use this loader"
        )

    # a list (or None) makes read_excel return a mapping of sheets
    if sheet_name is not None and not isinstance(sheet_name, list):
        sheet_name = [sheet_name]

    # clean up input
    file = Path(file)
    shared_metadata = extra_info or {}

    sheets = pd.read_excel(file, sheet_name=sheet_name, **self._pandas_config)
    documents = []

    for page_label, (name, frame) in enumerate(sheets.items(), start=1):
        # drop fully-empty rows, then blank out the remaining missing cells
        frame = frame.dropna(axis=0, how="all").astype("object")
        frame.fillna("", inplace=True)

        lines = [
            self._col_joiner.join(cells).strip()
            for cells in frame.values.astype(str).tolist()
        ]
        content = self._row_joiner.join(lines).strip()
        if include_sheetname:
            content = f"(Sheet {name} of file {file.name})\n{content}"

        documents.append(
            Document(
                text=content,
                metadata={
                    "page_label": page_label,
                    "sheet_name": name,
                    **shared_metadata,
                },
            )
        )

    return documents

PandasExcelReader

Bases: BaseReader

Pandas-based CSV parser.

Parses CSVs using the separator detection from Pandas read_csv function. If special parameters are required, use the pandas_config dict.

Args:

1
2
3
4
pandas_config (dict): Options for the `pandas.read_excel` function call.
    Refer to https://pandas.pydata.org/docs/reference/api/pandas.read_excel.html
    for more information. Set to empty dict by default,
    this means defaults will be used.
Source code in libs/kotaemon/kotaemon/loaders/excel_loader.py
class PandasExcelReader(BaseReader):
    r"""Pandas-based Excel reader that merges all sheets into one Document

    Sheets are loaded with `pandas.read_excel`; rows made up only of empty
    cells are dropped and the remaining cells are joined into plain text.

    Args:

        pandas_config (dict): Options for the `pandas.read_excel` function call.
            Refer to https://pandas.pydata.org/docs/reference/api/pandas.read_excel.html
            for more information. Set to empty dict by default,
            this means defaults will be used.
        row_joiner (str): text placed between rows; "\n" is used when empty.
        col_joiner (str): text placed between the cells of a row; " " is used
            when empty.

    """

    def __init__(
        self,
        *args: Any,
        pandas_config: Optional[dict] = None,
        row_joiner: str = "\n",
        col_joiner: str = " ",
        **kwargs: Any,
    ) -> None:
        """Store the pandas options and the joiners, applying the fallbacks."""
        super().__init__(*args, **kwargs)
        self._pandas_config = pandas_config if pandas_config else {}
        self._row_joiner = row_joiner or "\n"
        self._col_joiner = col_joiner or " "

    def load_data(
        self,
        file: Path,
        include_sheetname: bool = False,
        sheet_name: Optional[Union[str, int, list]] = None,
        extra_info: Optional[dict] = None,
        **kwargs,
    ) -> List[Document]:
        """Read an Excel file and join the selected sheets into one Document.

        Args:
            file (Path): The path to the Excel file to read.
            include_sheetname (bool): Whether to emit the sheet name as its
                own row ahead of each sheet's rows.
            sheet_name (Union[str, int, list, None]): The sheet(s) to read;
                None (the default) reads all sheets.
            extra_info (dict): metadata attached to the returned Document.

        Returns:
            List[Document]: a single-element list whose Document holds the
                joined text of every sheet that was read.
        """
        import itertools

        try:
            import pandas as pd
        except ImportError:
            raise ImportError(
                "install pandas using `pip3 install pandas` to use this loader"
            )

        # a list (or None) makes read_excel return a mapping of sheets
        if sheet_name is not None and not isinstance(sheet_name, list):
            sheet_name = [sheet_name]

        sheets = pd.read_excel(file, sheet_name=sheet_name, **self._pandas_config)

        rows_per_sheet = []
        for name, frame in sheets.items():
            # drop fully-empty rows, then blank out the remaining missing cells
            frame = frame.dropna(axis=0, how="all")
            frame.fillna("", inplace=True)

            heading = [[name]] if include_sheetname else []
            rows_per_sheet.append(heading + frame.values.astype(str).tolist())

        # walk the rows of all sheets as one flat sequence
        all_rows = itertools.chain.from_iterable(rows_per_sheet)
        text = self._row_joiner.join(
            self._col_joiner.join(cells) for cells in all_rows
        )

        return [Document(text=text, metadata=extra_info or {})]

load_data

1
2
3
4
5
6
7
load_data(
    file,
    include_sheetname=False,
    sheet_name=None,
    extra_info=None,
    **kwargs
)

Parse file and extract values from a specific column.

Parameters:

Name Type Description Default
file Path

The path to the Excel file to read.

required
include_sheetname bool

Whether to include the sheet name in the output.

False
sheet_name Union[str, int, None]

The specific sheet to read from, default is None which reads all sheets.

None

Returns:

Type Description
List[Document]

List[Document]: A list holding a single `Document` whose text joins the cell values of every sheet read from the Excel file.

Source code in libs/kotaemon/kotaemon/loaders/excel_loader.py
def load_data(
    self,
    file: Path,
    include_sheetname: bool = False,
    sheet_name: Optional[Union[str, int, list]] = None,
    extra_info: Optional[dict] = None,
    **kwargs,
) -> List[Document]:
    """Read an Excel file and join the selected sheets into one Document.

    Args:
        file (Path): The path to the Excel file to read.
        include_sheetname (bool): Whether to emit the sheet name as its
            own row ahead of each sheet's rows.
        sheet_name (Union[str, int, list, None]): The sheet(s) to read;
            None (the default) reads all sheets.
        extra_info (dict): metadata attached to the returned Document.

    Returns:
        List[Document]: a single-element list whose Document holds the
            joined text of every sheet that was read.
    """
    import itertools

    try:
        import pandas as pd
    except ImportError:
        raise ImportError(
            "install pandas using `pip3 install pandas` to use this loader"
        )

    # a list (or None) makes read_excel return a mapping of sheets
    if sheet_name is not None and not isinstance(sheet_name, list):
        sheet_name = [sheet_name]

    sheets = pd.read_excel(file, sheet_name=sheet_name, **self._pandas_config)

    rows_per_sheet = []
    for name, frame in sheets.items():
        # drop fully-empty rows, then blank out the remaining missing cells
        frame = frame.dropna(axis=0, how="all")
        frame.fillna("", inplace=True)

        heading = [[name]] if include_sheetname else []
        rows_per_sheet.append(heading + frame.values.astype(str).tolist())

    # walk the rows of all sheets as one flat sequence
    all_rows = itertools.chain.from_iterable(rows_per_sheet)
    text = self._row_joiner.join(
        self._col_joiner.join(cells) for cells in all_rows
    )

    return [Document(text=text, metadata=extra_info or {})]

HtmlReader

Bases: BaseReader

Read HTML using html2text

Reader behavior
  • HTML is read with html2text.
  • All of the texts will be split by page_break_pattern
  • Each page is extracted as a Document
  • The output is a list of Documents

Parameters:

Name Type Description Default
page_break_pattern str

Pattern to split the HTML into pages

None
Source code in libs/kotaemon/kotaemon/loaders/html_loader.py
class HtmlReader(BaseReader):
    """Read HTML using html2text

    Reader behavior:
        - HTML is read with html2text.
        - All of the texts will be split by `page_break_pattern`
        - Each page is extracted as a Document
        - The output is a list of Documents

    Args:
        page_break_pattern (str): Pattern to split the HTML into pages
    """

    def __init__(self, page_break_pattern: Optional[str] = None, *args, **kwargs):
        # fail early, at construction time, when html2text is missing
        try:
            import html2text  # noqa
        except ImportError as err:
            raise ImportError(
                "html2text is not installed. "
                "Please install it using `pip install html2text`"
            ) from err

        self._page_break_pattern: Optional[str] = page_break_pattern
        super().__init__()

    def load_data(
        self, file_path: Path | str, extra_info: Optional[dict] = None, **kwargs
    ) -> list[Document]:
        """Load data using Html reader

        Args:
            file_path: path to HTML file
            extra_info: extra information passed to this reader during extracting data

        Returns:
            list[Document]: list of documents extracted from the HTML file,
                one per page, with a 1-based `page_label` in the metadata
        """
        import html2text

        file_path = Path(file_path).resolve()

        with file_path.open("r") as f:
            # Lines are concatenated without their line terminator. Only the
            # newline is stripped: slicing off the last character would also
            # cut real content from a final line that has no trailing newline.
            html_text = "".join(line.rstrip("\n") for line in f)

        # read HTML
        all_text = html2text.html2text(html_text)
        pages = (
            all_text.split(self._page_break_pattern)
            if self._page_break_pattern
            else [all_text]
        )

        extra_info = extra_info or {}

        # create one Document per page
        documents = [
            Document(
                text=page.strip(),
                metadata={"page_label": page_id + 1, **extra_info},
            )
            for page_id, page in enumerate(pages)
        ]

        return documents

load_data

load_data(file_path, extra_info=None, **kwargs)

Load data using Html reader

Parameters:

Name Type Description Default
file_path Path | str

path to HTML file

required
extra_info Optional[dict]

extra information passed to this reader during extracting data

None

Returns:

Type Description
list[Document]

list[Document]: list of documents extracted from the HTML file

Source code in libs/kotaemon/kotaemon/loaders/html_loader.py
def load_data(
    self, file_path: Path | str, extra_info: Optional[dict] = None, **kwargs
) -> list[Document]:
    """Load data using Html reader

    Args:
        file_path: path to HTML file
        extra_info: extra information passed to this reader during extracting data

    Returns:
        list[Document]: list of documents extracted from the HTML file,
            one per page, with a 1-based `page_label` in the metadata
    """
    import html2text

    file_path = Path(file_path).resolve()

    with file_path.open("r") as f:
        # Lines are concatenated without their line terminator. Only the
        # newline is stripped: slicing off the last character would also cut
        # real content from a final line that has no trailing newline.
        html_text = "".join(line.rstrip("\n") for line in f)

    # read HTML
    all_text = html2text.html2text(html_text)
    pages = (
        all_text.split(self._page_break_pattern)
        if self._page_break_pattern
        else [all_text]
    )

    extra_info = extra_info or {}

    # create one Document per page
    documents = [
        Document(
            text=page.strip(),
            metadata={"page_label": page_id + 1, **extra_info},
        )
        for page_id, page in enumerate(pages)
    ]

    return documents

MhtmlReader

Bases: BaseReader

Parse MHTML files with BeautifulSoup.

Source code in libs/kotaemon/kotaemon/loaders/html_loader.py
class MhtmlReader(BaseReader):
    """Parse `MHTML` files with `BeautifulSoup`."""

    def __init__(
        self,
        cache_dir: Optional[str] = getattr(
            flowsettings, "KH_MARKDOWN_OUTPUT_DIR", None
        ),
        open_encoding: Optional[str] = None,
        bs_kwargs: Optional[dict] = None,
        get_text_separator: str = "",
    ) -> None:
        """Initialize with an optional cache directory, the file encoding to use,
        and any kwargs to pass to the BeautifulSoup object.

        Args:
            cache_dir: Directory where the extracted text is saved as a
                markdown file. Nothing is saved when it is None.
            open_encoding: The encoding to use when opening the file.
            bs_kwargs: Any kwargs to pass to the BeautifulSoup object.
                Defaults to `{"features": "lxml"}`.
            get_text_separator: The separator to use when getting the text
                from the soup.
        """
        try:
            import bs4  # noqa:F401
        except ImportError as err:
            raise ImportError(
                "beautifulsoup4 package not found, please install it with "
                "`pip install beautifulsoup4`"
            ) from err

        self.cache_dir = cache_dir
        self.open_encoding = open_encoding
        if bs_kwargs is None:
            bs_kwargs = {"features": "lxml"}
        self.bs_kwargs = bs_kwargs
        self.get_text_separator = get_text_separator

    def load_data(
        self, file_path: Path | str, extra_info: Optional[dict] = None, **kwargs
    ) -> list[Document]:
        """Load MHTML document into document objects.

        Args:
            file_path: path to the MHTML file
            extra_info: extra metadata merged into the returned Document

        Returns:
            list[Document]: a single Document joining the text of every
                `text/html` part of the file
        """

        from bs4 import BeautifulSoup

        extra_info = extra_info or {}
        # used as-is when the file contains no text/html part
        metadata: dict = extra_info
        page = []
        source_path = Path(file_path)

        with open(file_path, "r", encoding=self.open_encoding) as f:
            message = email.message_from_string(f.read())

        parts = message.get_payload()
        if not isinstance(parts, list):
            # non-multipart message: treat the message itself as the only part
            parts = [message]

        for part in parts:
            if part.get_content_type() != "text/html":
                continue

            html = part.get_payload(decode=True).decode()

            soup = BeautifulSoup(html, **self.bs_kwargs)
            text = soup.get_text(self.get_text_separator)
            title = str(soup.title.string) if soup.title else ""

            metadata = {
                "source": str(file_path),
                "title": title,
                **extra_info,
            }
            # drop blank lines and separate the remaining ones by an empty line
            lines = [line for line in text.split("\n") if line.strip()]
            text = "\n\n".join(lines)
            if text:
                page.append(text)

        # save the page into markdown format
        # NOTE(review): the print calls look like leftover debug output
        print(self.cache_dir)
        # `page` is empty when no text/html part produced text; there is
        # nothing to cache then, and indexing page[0] would raise IndexError
        if self.cache_dir is not None and page:
            markdown_path = Path(self.cache_dir) / f"{source_path.stem}.md"
            print(markdown_path)
            with open(markdown_path, "w") as f:
                f.write(page[0])

        return [Document(text="\n\n".join(page), metadata=metadata)]

load_data

load_data(file_path, extra_info=None, **kwargs)

Load MHTML document into document objects.

Source code in libs/kotaemon/kotaemon/loaders/html_loader.py
def load_data(
    self, file_path: Path | str, extra_info: Optional[dict] = None, **kwargs
) -> list[Document]:
    """Load MHTML document into document objects.

    Args:
        file_path: path to the MHTML file
        extra_info: extra metadata merged into the returned Document

    Returns:
        list[Document]: a single Document joining the text of every
            `text/html` part of the file
    """

    from bs4 import BeautifulSoup

    extra_info = extra_info or {}
    # used as-is when the file contains no text/html part
    metadata: dict = extra_info
    page = []
    source_path = Path(file_path)

    with open(file_path, "r", encoding=self.open_encoding) as f:
        message = email.message_from_string(f.read())

    parts = message.get_payload()
    if not isinstance(parts, list):
        # non-multipart message: treat the message itself as the only part
        parts = [message]

    for part in parts:
        if part.get_content_type() != "text/html":
            continue

        html = part.get_payload(decode=True).decode()

        soup = BeautifulSoup(html, **self.bs_kwargs)
        text = soup.get_text(self.get_text_separator)
        title = str(soup.title.string) if soup.title else ""

        metadata = {
            "source": str(file_path),
            "title": title,
            **extra_info,
        }
        # drop blank lines and separate the remaining ones by an empty line
        lines = [line for line in text.split("\n") if line.strip()]
        text = "\n\n".join(lines)
        if text:
            page.append(text)

    # save the page into markdown format
    # NOTE(review): the print calls look like leftover debug output
    print(self.cache_dir)
    # `page` is empty when no text/html part produced text; there is nothing
    # to cache then, and indexing page[0] would raise IndexError
    if self.cache_dir is not None and page:
        markdown_path = Path(self.cache_dir) / f"{source_path.stem}.md"
        print(markdown_path)
        with open(markdown_path, "w") as f:
            f.write(page[0])

    return [Document(text="\n\n".join(page), metadata=metadata)]

MathpixPDFReader

Bases: BaseReader

Load PDF files using Mathpix service.

Source code in libs/kotaemon/kotaemon/loaders/mathpix_loader.py
class MathpixPDFReader(BaseReader):
    """Load `PDF` files using `Mathpix` service.

    The PDF is uploaded to the Mathpix ``v3/pdf`` endpoint, polled until the
    conversion is reported as completed, downloaded in
    ``processed_file_format`` and split into table and text documents.
    """

    def __init__(
        self,
        processed_file_format: str = "md",
        max_wait_time_seconds: int = 500,
        should_clean_pdf: bool = True,
        **kwargs: Any,
    ) -> None:
        """Initialize the reader.

        Args:
            processed_file_format: a format of the processed file. Default is "md".
            max_wait_time_seconds: a maximum time to wait for the response from
                the server. Default is 500.
            should_clean_pdf: a flag to clean the PDF file. Default is True.
            **kwargs: additional keyword arguments. ``mathpix_api_key`` and
                ``mathpix_api_id`` are resolved through ``get_from_dict_or_env``
                with the names ``MATHPIX_API_KEY`` / ``MATHPIX_API_ID`` and the
                default ``"empty"``.
        """
        self.mathpix_api_key = get_from_dict_or_env(
            kwargs, "mathpix_api_key", "MATHPIX_API_KEY", default="empty"
        )
        self.mathpix_api_id = get_from_dict_or_env(
            kwargs, "mathpix_api_id", "MATHPIX_API_ID", default="empty"
        )
        self.processed_file_format = processed_file_format
        self.max_wait_time_seconds = max_wait_time_seconds
        self.should_clean_pdf = should_clean_pdf
        super().__init__()

    @property
    def _mathpix_headers(self) -> Dict[str, str]:
        """Authentication headers sent with every Mathpix request."""
        return {"app_id": self.mathpix_api_id, "app_key": self.mathpix_api_key}

    @property
    def url(self) -> str:
        """Base URL of the Mathpix PDF endpoint."""
        return "https://api.mathpix.com/v3/pdf"

    @property
    def data(self) -> dict:
        """Form data for the upload request, with the options JSON-encoded."""
        options = {
            "conversion_formats": {self.processed_file_format: True},
            "enable_tables_fallback": True,
        }
        return {"options_json": json.dumps(options)}

    def send_pdf(self, file_path) -> str:
        """Upload the PDF and return the ``pdf_id`` found in the JSON response.

        Raises:
            ValueError: if the response contains no ``pdf_id``.
        """
        with open(file_path, "rb") as f:
            files = {"file": f}
            response = requests.post(
                self.url, headers=self._mathpix_headers, files=files, data=self.data
            )
        response_data = response.json()
        if "pdf_id" in response_data:
            return response_data["pdf_id"]
        raise ValueError("Unable to send PDF to Mathpix.")

    def wait_for_processing(self, pdf_id: str) -> None:
        """Wait for processing to complete.

        The status URL is polled once per 5-second step of
        ``max_wait_time_seconds``, sleeping 5 seconds after each poll that is
        neither completed nor failed.

        Args:
            pdf_id: a PDF id.

        Returns: None

        Raises:
            ValueError: if the reported status is ``"error"``.
            TimeoutError: if the status never becomes ``"completed"`` within
                the allowed polls.
        """
        import logging

        logger = logging.getLogger(__name__)
        url = self.url + "/" + pdf_id
        for _ in range(0, self.max_wait_time_seconds, 5):
            response = requests.get(url, headers=self._mathpix_headers)
            response_data = response.json()
            status = response_data.get("status", None)

            if status == "completed":
                return
            if status == "error":
                raise ValueError("Unable to retrieve PDF from Mathpix")

            # still processing: report progress through logging, not stdout
            logger.info("Mathpix PDF %s not ready (%s): %s", pdf_id, url, response_data)
            time.sleep(5)
        raise TimeoutError(
            f"Mathpix did not finish processing PDF {pdf_id} within "
            f"{self.max_wait_time_seconds} seconds"
        )

    def get_processed_pdf(self, pdf_id: str) -> str:
        """Wait for the conversion, then download it as UTF-8 text."""
        self.wait_for_processing(pdf_id)
        url = f"{self.url}/{pdf_id}.{self.processed_file_format}"
        response = requests.get(url, headers=self._mathpix_headers)
        return response.content.decode("utf-8")

    def clean_pdf(self, contents: str) -> str:
        """Clean the PDF file.

        Args:
            contents: a PDF file contents.

        Returns:
            The contents without image-only lines (those starting with
            ``![]``), with ``\\section{`` turned into ``# ``, escape
            backslashes and a few LaTeX tokens removed, all braces dropped,
            and empty-text links (``[](http...)``) removed.
        """
        contents = "\n".join(
            [line for line in contents.split("\n") if not line.startswith("![]")]
        )
        # replace \section{Title} with # Title (the closing brace is removed
        # by the brace stripping below)
        contents = contents.replace("\\section{", "# ")

        # http:// or https:// followed by anything but a closing paren
        url_regex = "http[s]?://[^)]+"
        markup_regex = r"\[]\(\s*({0})\s*\)".format(url_regex)
        # replace the "\" slash that Mathpix adds to escape $, %, (, etc.
        contents = (
            contents.replace(r"\$", "$")
            .replace(r"\%", "%")
            .replace(r"\(", "(")
            .replace(r"\)", ")")
            .replace("$\\begin{array}", "")
            .replace("\\end{array}$", "")
            .replace("\\\\", "")
            .replace("\\text", "")
            .replace("}", "")
            .replace("{", "")
            .replace("\\mathrm", "")
        )
        contents = re.sub(markup_regex, "", contents)
        return contents

    def load_data(
        self, file_path: Path, extra_info: Optional[dict] = None, **kwargs
    ) -> List[Document]:
        """Convert a PDF into table and text documents.

        Args:
            file_path: path of the PDF file; a plain string is accepted too.
            extra_info: optional metadata merged into every returned document.
            **kwargs: if ``response_content`` is given it is used as the
                converted content and the Mathpix API is not called.

        Returns:
            The table documents followed by the text documents.
        """
        if "response_content" in kwargs:
            # overriding response content if specified
            content = kwargs["response_content"]
        else:
            # call original API
            pdf_id = self.send_pdf(file_path)
            content = self.get_processed_pdf(pdf_id)

        if self.should_clean_pdf:
            content = self.clean_pdf(content)
        tables, texts = parse_markdown_text_to_tables(content)
        documents = []
        for table in tables:
            text = strip_special_chars_markdown(table)
            metadata = {
                "table_origin": table,
                "type": "table",
            }
            if extra_info:
                metadata.update(extra_info)
            documents.append(
                Document(
                    text=text,
                    metadata=metadata,
                    metadata_template="",
                    metadata_seperator="",
                )
            )

        # Path() makes a str file_path work as well
        source_name = Path(file_path).name
        for text in texts:
            metadata = {"source": source_name, "type": "text"}
            # keep caller metadata on text documents too, as for tables
            if extra_info:
                metadata.update(extra_info)
            documents.append(Document(text=text, metadata=metadata))

        return documents

wait_for_processing

wait_for_processing(pdf_id)

Wait for processing to complete.

Parameters:

Name Type Description Default
pdf_id str

a PDF id.

required

Returns: None

Source code in libs/kotaemon/kotaemon/loaders/mathpix_loader.py
def wait_for_processing(self, pdf_id: str) -> None:
    """Wait for processing to complete.

    The status URL is polled once per 5-second step of
    ``max_wait_time_seconds``, sleeping 5 seconds after each poll that is
    neither completed nor failed.

    Args:
        pdf_id: a PDF id.

    Returns: None

    Raises:
        ValueError: if the reported status is ``"error"``.
        TimeoutError: if the status never becomes ``"completed"`` within the
            allowed polls.
    """
    import logging

    logger = logging.getLogger(__name__)
    url = self.url + "/" + pdf_id
    for _ in range(0, self.max_wait_time_seconds, 5):
        response = requests.get(url, headers=self._mathpix_headers)
        response_data = response.json()
        status = response_data.get("status", None)

        if status == "completed":
            return
        if status == "error":
            raise ValueError("Unable to retrieve PDF from Mathpix")

        # still processing: report progress through logging, not stdout
        logger.info("Mathpix PDF %s not ready (%s): %s", pdf_id, url, response_data)
        time.sleep(5)
    raise TimeoutError(
        f"Mathpix did not finish processing PDF {pdf_id} within "
        f"{self.max_wait_time_seconds} seconds"
    )

clean_pdf

clean_pdf(contents)

Clean the PDF file.

Parameters:

Name Type Description Default
contents str

a PDF file contents.

required

Returns:

Source code in libs/kotaemon/kotaemon/loaders/mathpix_loader.py
def clean_pdf(self, contents: str) -> str:
    """Strip Mathpix/LaTeX artefacts from converted text.

    Args:
        contents: the converted document text.

    Returns:
        The text without lines starting with ``![]``, with ``\\section{``
        turned into ``# ``, escape backslashes and a few LaTeX tokens removed,
        all braces dropped, and empty-text links (``[](http...)``) removed.
    """
    # drop image-only lines
    kept_lines = [ln for ln in contents.split("\n") if not ln.startswith("![]")]
    cleaned = "\n".join(kept_lines)

    # \section{Title} becomes "# Title" once the braces are stripped below
    cleaned = cleaned.replace("\\section{", "# ")

    # applied strictly in this order: un-escape $, %, (, ) first, then remove
    # array wrappers, double backslashes, \text, braces and \mathrm
    substitutions = (
        (r"\$", "$"),
        (r"\%", "%"),
        (r"\(", "("),
        (r"\)", ")"),
        ("$\\begin{array}", ""),
        ("\\end{array}$", ""),
        ("\\\\", ""),
        ("\\text", ""),
        ("}", ""),
        ("{", ""),
        ("\\mathrm", ""),
    )
    for old, new in substitutions:
        cleaned = cleaned.replace(old, new)

    # links with empty text: http:// or https:// followed by anything but a
    # closing paren
    link_pattern = r"\[]\(\s*({0})\s*\)".format("http[s]?://[^)]+")
    return re.sub(link_pattern, "", cleaned)

ImageReader

Bases: BaseReader

Read PDF using OCR, with high focus on table extraction

Example
1
2
3
>> from kotaemon.loaders import ImageReader
>> reader = ImageReader()
>> documents = reader.load_data("path/to/pdf")

Parameters:

Name Type Description Default
endpoint Optional[str]

URL to FullOCR endpoint. If not provided, will look for environment variable OCR_READER_ENDPOINT or use the default kotaemon.loaders.ocr_loader.DEFAULT_OCR_ENDPOINT (http://127.0.0.1:8000/v2/ai/infer/)

None
use_ocr

Whether to use OCR to read text (e.g. from images, tables) in the PDF. If False, only the table and text within table cells will be extracted.

required
Source code in libs/kotaemon/kotaemon/loaders/ocr_loader.py
class ImageReader(BaseReader):
    """Read PDF using OCR, with high focus on table extraction

    Example:
        ```python
        >> from kotaemon.loaders import ImageReader
        >> reader = ImageReader()
        >> documents = reader.load_data("path/to/pdf")
        ```

    Args:
        endpoint: URL to FullOCR endpoint. If not provided, will look for
            environment variable `OCR_READER_ENDPOINT` or use the default
            `kotaemon.loaders.ocr_loader.DEFAULT_OCR_ENDPOINT`
            (http://127.0.0.1:8000/v2/ai/infer/)
    """

    def __init__(self, endpoint: Optional[str] = None):
        """Init the OCR reader with OCR endpoint (FullOCR pipeline)"""
        super().__init__()
        self.ocr_endpoint = endpoint or os.getenv(
            "OCR_READER_ENDPOINT", DEFAULT_OCR_ENDPOINT
        )

    def load_data(
        self, file_path: Path, extra_info: Optional[dict] = None, **kwargs
    ) -> List[Document]:
        """Load data using OCR reader

        Args:
            file_path (Path): Path to the file sent to the OCR endpoint
            extra_info (dict): optional metadata copied onto every document
            **kwargs: if ``response_content`` is given it is used as the OCR
                results and the endpoint is not called

        Returns:
            List[Document]: one document per OCR result, whose content is the
                result's ``csv_string``
        """
        file_path = Path(file_path).resolve()

        # call the API from FullOCR endpoint
        if "response_content" in kwargs:
            # overriding response content if specified
            ocr_results = kwargs["response_content"]
        else:
            # call original API
            resp = tenacious_api_post(
                url=self.ocr_endpoint, file_path=file_path, table_only=False
            )
            ocr_results = resp.json()["result"]

        extra_info = extra_info or {}
        # each document gets its own metadata dict so that editing one
        # document's metadata cannot leak into the others or the caller's dict
        return [
            Document(
                content=ocr_result["csv_string"],
                metadata=dict(extra_info),
            )
            for ocr_result in ocr_results
        ]

load_data

load_data(file_path, extra_info=None, **kwargs)

Load data using OCR reader

Parameters:

Name Type Description Default
file_path Path

Path to PDF file

required
debug_path Path

Path to store debug image output

required
artifact_path Path

Path to OCR endpoints artifacts directory

required

Returns:

Type Description
List[Document]

List[Document]: list of documents extracted from the PDF file

Source code in libs/kotaemon/kotaemon/loaders/ocr_loader.py
def load_data(
    self, file_path: Path, extra_info: Optional[dict] = None, **kwargs
) -> List[Document]:
    """Load data using OCR reader

    Args:
        file_path (Path): Path to the file sent to the OCR endpoint
        extra_info (dict): optional metadata copied onto every document
        **kwargs: if ``response_content`` is given it is used as the OCR
            results and the endpoint is not called

    Returns:
        List[Document]: one document per OCR result, whose content is the
            result's ``csv_string``
    """
    file_path = Path(file_path).resolve()

    # call the API from FullOCR endpoint
    if "response_content" in kwargs:
        # overriding response content if specified
        ocr_results = kwargs["response_content"]
    else:
        # call original API
        resp = tenacious_api_post(
            url=self.ocr_endpoint, file_path=file_path, table_only=False
        )
        ocr_results = resp.json()["result"]

    extra_info = extra_info or {}
    # each document gets its own metadata dict so that editing one document's
    # metadata cannot leak into the others or the caller's dict
    return [
        Document(
            content=ocr_result["csv_string"],
            metadata=dict(extra_info),
        )
        for ocr_result in ocr_results
    ]

OCRReader

Bases: BaseReader

Read PDF using OCR, with high focus on table extraction

Example
1
2
3
>> from kotaemon.loaders import OCRReader
>> reader = OCRReader()
>> documents = reader.load_data("path/to/pdf")

Parameters:

Name Type Description Default
endpoint Optional[str]

URL to FullOCR endpoint. If not provided, will look for environment variable OCR_READER_ENDPOINT or use the default kotaemon.loaders.ocr_loader.DEFAULT_OCR_ENDPOINT (http://127.0.0.1:8000/v2/ai/infer/)

None
use_ocr

Whether to use OCR to read text (e.g. from images, tables) in the PDF. If False, only the table and text within table cells will be extracted.

True
Source code in libs/kotaemon/kotaemon/loaders/ocr_loader.py
class OCRReader(BaseReader):
    """Read PDF using OCR, with high focus on table extraction

    Example:
        ```python
        >> from kotaemon.loaders import OCRReader
        >> reader = OCRReader()
        >> documents = reader.load_data("path/to/pdf")
        ```

    Args:
        endpoint: URL to FullOCR endpoint. If not provided, will look for
            environment variable `OCR_READER_ENDPOINT` or use the default
            `kotaemon.loaders.ocr_loader.DEFAULT_OCR_ENDPOINT`
            (http://127.0.0.1:8000/v2/ai/infer/)
        use_ocr: whether to use OCR to read text (e.g: from images, tables) in
            the PDF. If False, only the table and text within table cells will
            be extracted.
    """

    def __init__(self, endpoint: Optional[str] = None, use_ocr=True):
        """Store the FullOCR endpoint and the OCR switch."""
        super().__init__()
        if endpoint:
            self.ocr_endpoint = endpoint
        else:
            # fall back to the environment, then to the module default
            self.ocr_endpoint = os.getenv("OCR_READER_ENDPOINT", DEFAULT_OCR_ENDPOINT)
        self.use_ocr = use_ocr

    def load_data(
        self, file_path: Path, extra_info: Optional[dict] = None, **kwargs
    ) -> List[Document]:
        """Read a PDF and return its tables and remaining text as documents.

        Args:
            file_path (Path): Path to PDF file
            extra_info (dict): optional metadata merged into every document
            **kwargs: ``response_content`` replaces the endpoint call when
                given; ``debug_path`` (Path to store debug image output) and
                ``artifact_path`` (Path to OCR endpoints artifacts directory)
                are forwarded to ``parse_ocr_output``

        Returns:
            List[Document]: table documents followed by non-table text documents
        """
        file_path = Path(file_path).resolve()

        if "response_content" not in kwargs:
            # no canned response supplied: query the FullOCR endpoint
            response = tenacious_api_post(
                url=self.ocr_endpoint, file_path=file_path, table_only=not self.use_ocr
            )
            ocr_results = response.json()["result"]
        else:
            ocr_results = kwargs["response_content"]

        debug_path = kwargs.pop("debug_path", None)
        artifact_path = kwargs.pop("artifact_path", None)

        # plain PDF text (unstructured) is merged with the OCR output
        pdf_page_items = read_pdf_unstructured(file_path)
        tables, texts = parse_ocr_output(
            ocr_results,
            pdf_page_items,
            debug_path=debug_path,
            artifact_path=artifact_path,
        )
        extra_info = extra_info or {}

        documents = []
        # tables: stripped text as content, original table kept in metadata
        for page_id, table_text in tables:
            table_meta = {
                "table_origin": table_text,
                "type": "table",
                "page_label": page_id + 1,
                **extra_info,
            }
            documents.append(
                Document(
                    text=strip_special_chars_markdown(table_text),
                    metadata=table_meta,
                    metadata_template="",
                    metadata_seperator="",
                )
            )

        # everything that is not a table
        for page_id, non_table_text in texts:
            text_meta = {"page_label": page_id + 1, **extra_info}
            documents.append(Document(text=non_table_text, metadata=text_meta))

        return documents

load_data

load_data(file_path, extra_info=None, **kwargs)

Load data using OCR reader

Parameters:

Name Type Description Default
file_path Path

Path to PDF file

required
debug_path Path

Path to store debug image output

required
artifact_path Path

Path to OCR endpoints artifacts directory

required

Returns:

Type Description
List[Document]

List[Document]: list of documents extracted from the PDF file

Source code in libs/kotaemon/kotaemon/loaders/ocr_loader.py
def load_data(
    self, file_path: Path, extra_info: Optional[dict] = None, **kwargs
) -> List[Document]:
    """Read a PDF and return its tables and remaining text as documents.

    Args:
        file_path (Path): Path to PDF file
        extra_info (dict): optional metadata merged into every document
        **kwargs: ``response_content`` replaces the endpoint call when given;
            ``debug_path`` (Path to store debug image output) and
            ``artifact_path`` (Path to OCR endpoints artifacts directory) are
            forwarded to ``parse_ocr_output``

    Returns:
        List[Document]: table documents followed by non-table text documents
    """
    file_path = Path(file_path).resolve()

    if "response_content" not in kwargs:
        # no canned response supplied: query the FullOCR endpoint
        response = tenacious_api_post(
            url=self.ocr_endpoint, file_path=file_path, table_only=not self.use_ocr
        )
        ocr_results = response.json()["result"]
    else:
        ocr_results = kwargs["response_content"]

    debug_path = kwargs.pop("debug_path", None)
    artifact_path = kwargs.pop("artifact_path", None)

    # plain PDF text (unstructured) is merged with the OCR output
    pdf_page_items = read_pdf_unstructured(file_path)
    tables, texts = parse_ocr_output(
        ocr_results,
        pdf_page_items,
        debug_path=debug_path,
        artifact_path=artifact_path,
    )
    extra_info = extra_info or {}

    documents = []
    # tables: stripped text as content, original table kept in metadata
    for page_id, table_text in tables:
        table_meta = {
            "table_origin": table_text,
            "type": "table",
            "page_label": page_id + 1,
            **extra_info,
        }
        documents.append(
            Document(
                text=strip_special_chars_markdown(table_text),
                metadata=table_meta,
                metadata_template="",
                metadata_seperator="",
            )
        )

    # everything that is not a table
    for page_id, non_table_text in texts:
        text_meta = {"page_label": page_id + 1, **extra_info}
        documents.append(Document(text=non_table_text, metadata=text_meta))

    return documents

PDFThumbnailReader

Bases: PDFReader

PDF parser with thumbnail for each page.

Source code in libs/kotaemon/kotaemon/loaders/pdf_loader.py
class PDFThumbnailReader(PDFReader):
    """PDF parser with thumbnail for each page."""

    def __init__(self) -> None:
        """Initialize the parent reader with ``return_full_document=False``."""
        super().__init__(return_full_document=False)

    def load_data(
        self,
        file: Path,
        extra_info: Optional[Dict] = None,
        fs: Optional[AbstractFileSystem] = None,
    ) -> List[Document]:
        """Parse the file and append a thumbnail document per page.

        Documents from the parent reader are kept only when they carry a
        ``page_label`` that ``int()`` accepts. Thumbnails are requested for as
        many page indices as there are labelled documents, and a thumbnail
        document is added for each label that is an integer.

        Returns:
            The kept page documents followed by the thumbnail documents.
        """
        parsed_docs = super().load_data(file, extra_info, fs)

        labels = []
        kept_docs = []
        label_is_int: dict[str, bool] = {}

        for doc in parsed_docs:
            if "page_label" not in doc.metadata:
                continue

            label = doc.metadata["page_label"]
            labels.append(label)
            try:
                int(label)
            except ValueError:
                label_is_int[label] = False
            else:
                label_is_int[label] = True
                kept_docs.append(doc)

        documents = kept_docs
        page_numbers = list(range(len(labels)))

        print("Page numbers:", len(page_numbers))
        page_thumbnails = get_page_thumbnails(file, page_numbers)

        caller_meta = extra_info if extra_info is not None else {}
        for thumbnail, label in zip(page_thumbnails, labels):
            if not label_is_int[label]:
                continue
            documents.append(
                Document(
                    text="Page thumbnail",
                    metadata={
                        "image_origin": thumbnail,
                        "type": "thumbnail",
                        "page_label": label,
                        **caller_meta,
                    },
                )
            )

        return documents

load_data

load_data(file, extra_info=None, fs=None)

Parse file.

Source code in libs/kotaemon/kotaemon/loaders/pdf_loader.py
def load_data(
    self,
    file: Path,
    extra_info: Optional[Dict] = None,
    fs: Optional[AbstractFileSystem] = None,
) -> List[Document]:
    """Parse the file and append a thumbnail document per page.

    Documents from the parent reader are kept only when they carry a
    ``page_label`` that ``int()`` accepts. Thumbnails are requested for as many
    page indices as there are labelled documents, and a thumbnail document is
    added for each label that is an integer.

    Returns:
        The kept page documents followed by the thumbnail documents.
    """
    parsed_docs = super().load_data(file, extra_info, fs)

    labels = []
    kept_docs = []
    label_is_int: dict[str, bool] = {}

    for doc in parsed_docs:
        if "page_label" not in doc.metadata:
            continue

        label = doc.metadata["page_label"]
        labels.append(label)
        try:
            int(label)
        except ValueError:
            label_is_int[label] = False
        else:
            label_is_int[label] = True
            kept_docs.append(doc)

    documents = kept_docs
    page_numbers = list(range(len(labels)))

    print("Page numbers:", len(page_numbers))
    page_thumbnails = get_page_thumbnails(file, page_numbers)

    caller_meta = extra_info if extra_info is not None else {}
    for thumbnail, label in zip(page_thumbnails, labels):
        if not label_is_int[label]:
            continue
        documents.append(
            Document(
                text="Page thumbnail",
                metadata={
                    "image_origin": thumbnail,
                    "type": "thumbnail",
                    "page_label": label,
                    **caller_meta,
                },
            )
        )

    return documents

UnstructuredReader

Bases: BaseReader

General unstructured text reader for a variety of files.

Source code in libs/kotaemon/kotaemon/loaders/unstructured_loader.py
class UnstructuredReader(BaseReader):
    """General unstructured text reader for a variety of files."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Read ``url``, ``api`` and ``api_key`` from the keyword arguments."""
        # kwargs are consumed here; the parent initializer cannot accept them
        super().__init__(*args)

        url_given = "url" in kwargs
        self.server_url = str(kwargs["url"]) if url_given else "http://localhost:8000"
        # local parsing by default; passing a url switches to the API, and an
        # explicit ``api`` keyword overrides either
        self.api = kwargs["api"] if "api" in kwargs else url_given
        self.api_key = kwargs["api_key"] if "api_key" in kwargs else ""

    def load_data(
        self,
        file: Path,
        extra_info: Optional[Dict] = None,
        split_documents: Optional[bool] = False,
        **kwargs,
    ) -> List[Document]:
        """Load data using Unstructured.io.

        If ``self.api`` is set the file is parsed through an API call to
        ``self.server_url``; otherwise it is parsed locally.

        Args:
            file: path of the file to parse.
            extra_info: optional metadata merged into each document's metadata.
            split_documents: if true, return one document per parsed element,
                with the element's metadata fields included; otherwise return
                a single document joining all element texts.
            **kwargs: accepted but not used by this method.

        Returns:
            The list of documents.
        """
        source = str(file)
        if not self.api:
            # parse file locally
            from unstructured.partition.auto import partition

            elements = partition(filename=source)
        else:
            from unstructured.partition.api import partition_via_api

            elements = partition_via_api(
                filename=source,
                api_key=self.api_key,
                api_url=self.server_url + "/general/v0/general",
            )

        # process elements
        file_name = Path(file).name
        file_path = str(Path(file).resolve())

        if not split_documents:
            # collapse whitespace inside each element, then build one document
            chunks = [" ".join(str(element).split()) for element in elements]
            metadata = {"file_name": file_name, "file_path": file_path}
            if extra_info is not None:
                metadata.update(extra_info)
            return [Document(text="\n\n".join(chunks), metadata=metadata)]

        # "_known_field_names" is skipped, "coordinates" does not serialize,
        # and "parent_id" might cause interference
        skipped_fields = ("_known_field_names", "coordinates", "parent_id")
        docs = []
        for node in elements:
            metadata = {"file_name": file_name, "file_path": file_path}
            if hasattr(node, "metadata"):
                for field, val in vars(node.metadata).items():
                    if field in skipped_fields:
                        continue
                    metadata[field] = val

            if extra_info is not None:
                metadata.update(extra_info)

            # re-assert the file name after the merges above
            metadata["file_name"] = file_name
            docs.append(Document(text=node.text, metadata=metadata))

        return docs

load_data

1
2
3
load_data(
    file, extra_info=None, split_documents=False, **kwargs
)

If api is set, parse through api

Source code in libs/kotaemon/kotaemon/loaders/unstructured_loader.py
def load_data(
    self,
    file: Path,
    extra_info: Optional[Dict] = None,
    split_documents: Optional[bool] = False,
    **kwargs,
) -> List[Document]:
    """Parse a file through the API if ``self.api`` is set, else locally.

    Args:
        file: path of the file to parse.
        extra_info: optional metadata merged into each document's metadata.
        split_documents: if true, return one document per parsed element, with
            the element's metadata fields included; otherwise return a single
            document joining all element texts.
        **kwargs: accepted but not used by this method.

    Returns:
        The list of documents.
    """
    source = str(file)
    if not self.api:
        # parse file locally
        from unstructured.partition.auto import partition

        elements = partition(filename=source)
    else:
        from unstructured.partition.api import partition_via_api

        elements = partition_via_api(
            filename=source,
            api_key=self.api_key,
            api_url=self.server_url + "/general/v0/general",
        )

    # process elements
    file_name = Path(file).name
    file_path = str(Path(file).resolve())

    if not split_documents:
        # collapse whitespace inside each element, then build one document
        chunks = [" ".join(str(element).split()) for element in elements]
        metadata = {"file_name": file_name, "file_path": file_path}
        if extra_info is not None:
            metadata.update(extra_info)
        return [Document(text="\n\n".join(chunks), metadata=metadata)]

    # "_known_field_names" is skipped, "coordinates" does not serialize, and
    # "parent_id" might cause interference
    skipped_fields = ("_known_field_names", "coordinates", "parent_id")
    docs = []
    for node in elements:
        metadata = {"file_name": file_name, "file_path": file_path}
        if hasattr(node, "metadata"):
            for field, val in vars(node.metadata).items():
                if field in skipped_fields:
                    continue
                metadata[field] = val

        if extra_info is not None:
            metadata.update(extra_info)

        # re-assert the file name after the merges above
        metadata["file_name"] = file_name
        docs.append(Document(text=node.text, metadata=metadata))

    return docs