
Embeddings

EndpointEmbeddings

Bases: BaseEmbeddings

An Embeddings component that uses an OpenAI API compatible endpoint.

Attributes:

Name          Type  Description
endpoint_url  str   The URL of an OpenAI API compatible endpoint.

Source code in libs/kotaemon/kotaemon/embeddings/endpoint_based.py
class EndpointEmbeddings(BaseEmbeddings):
    """
    An Embeddings component that uses an OpenAI API compatible endpoint.

    Attributes:
        endpoint_url (str): The URL of an OpenAI API compatible endpoint.
    """

    endpoint_url: str

    def run(
        self, text: str | list[str] | Document | list[Document]
    ) -> list[DocumentWithEmbedding]:
        """
        Generate embeddings from text Args:
            text (str | list[str] | Document | list[Document]): text to generate
            embeddings from
        Returns:
            list[DocumentWithEmbedding]: embeddings
        """
        if not isinstance(text, list):
            text = [text]

        outputs = []

        for item in text:
            response = requests.post(
                self.endpoint_url, json={"input": str(item)}
            ).json()
            outputs.append(
                DocumentWithEmbedding(
                    text=str(item),
                    embedding=response["data"][0]["embedding"],
                    total_tokens=response["usage"]["total_tokens"],
                    prompt_tokens=response["usage"]["prompt_tokens"],
                )
            )

        return outputs
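
A minimal usage sketch (the localhost URL is a placeholder; the target endpoint must accept an OpenAI-style {"input": ...} JSON payload and return data[0].embedding plus usage token counts, as run() expects):

embedder = EndpointEmbeddings(endpoint_url="http://localhost:8000/v1/embeddings")  # placeholder URL
docs = embedder.run(["hello world", "another sentence"])
print(len(docs), len(docs[0].embedding))  # 2 documents, each with an embedding vector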

run

run(text)
Generate embeddings from text.

Parameters:

text (str | list[str] | Document | list[Document]): text to generate embeddings from

Returns:

list[DocumentWithEmbedding]: embeddings

Source code in libs/kotaemon/kotaemon/embeddings/endpoint_based.py
def run(
    self, text: str | list[str] | Document | list[Document]
) -> list[DocumentWithEmbedding]:
    """
    Generate embeddings from text Args:
        text (str | list[str] | Document | list[Document]): text to generate
        embeddings from
    Returns:
        list[DocumentWithEmbedding]: embeddings
    """
    if not isinstance(text, list):
        text = [text]

    outputs = []

    for item in text:
        response = requests.post(
            self.endpoint_url, json={"input": str(item)}
        ).json()
        outputs.append(
            DocumentWithEmbedding(
                text=str(item),
                embedding=response["data"][0]["embedding"],
                total_tokens=response["usage"]["total_tokens"],
                prompt_tokens=response["usage"]["prompt_tokens"],
            )
        )

    return outputs

FastEmbedEmbeddings

Bases: BaseEmbeddings

Use the fastembed library to compute embeddings locally, without a GPU.

Supported models: https://qdrant.github.io/fastembed/examples/Supported_Models/
Code: https://github.com/qdrant/fastembed

Source code in libs/kotaemon/kotaemon/embeddings/fastembed.py
class FastEmbedEmbeddings(BaseEmbeddings):
    """Utilize fastembed library for embeddings locally without GPU.

    Supported model: https://qdrant.github.io/fastembed/examples/Supported_Models/
    Code: https://github.com/qdrant/fastembed
    """

    model_name: str = Param(
        "BAAI/bge-small-en-v1.5",
        help=(
            "Model name for fastembed. Please refer "
            "[here](https://qdrant.github.io/fastembed/examples/Supported_Models/) "
            "for the list of supported models."
        ),
        required=True,
    )
    batch_size: int = Param(
        256,
        help="Batch size for embeddings. Higher values use more memory, but are faster",
    )
    parallel: Optional[int] = Param(
        None,
        help=(
            "Number of threads to use for embeddings. "
            "If > 1, data-parallel encoding will be used. "
            "If 0, use all available CPUs. "
            "If None, use default onnxruntime threading. "
            "Defaults to None."
        ),
    )

    @Param.auto()
    def client_(self) -> "TextEmbedding":
        try:
            from fastembed import TextEmbedding
        except ImportError:
            raise ImportError("Please install FastEmbed: `pip install fastembed`")

        return TextEmbedding(model_name=self.model_name)

    def invoke(
        self, text: str | list[str] | Document | list[Document], *args, **kwargs
    ) -> list[DocumentWithEmbedding]:
        input_ = self.prepare_input(text)
        embeddings = self.client_.embed(
            [_.content for _ in input_],
            batch_size=self.batch_size,
            parallel=self.parallel,
        )
        return [
            DocumentWithEmbedding(
                content=doc,
                embedding=list(embedding),
            )
            for doc, embedding in zip(input_, embeddings)
        ]

    async def ainvoke(
        self, text: str | list[str] | Document | list[Document], *args, **kwargs
    ) -> list[DocumentWithEmbedding]:
        """Fastembed does not support async API."""
        return self.invoke(text, *args, **kwargs)
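
A minimal usage sketch: with no arguments the default BAAI/bge-small-en-v1.5 model is used, and its ONNX weights are downloaded on first use:

embedder = FastEmbedEmbeddings()  # defaults to BAAI/bge-small-en-v1.5
docs = embedder.invoke(["what is a rainbow?"])
print(len(docs[0].embedding))  # 384 dimensions for the default model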

ainvoke async

ainvoke(text, *args, **kwargs)

fastembed does not provide an async API; this falls back to the synchronous invoke.

Source code in libs/kotaemon/kotaemon/embeddings/fastembed.py
async def ainvoke(
    self, text: str | list[str] | Document | list[Document], *args, **kwargs
) -> list[DocumentWithEmbedding]:
    """Fastembed does not support async API."""
    return self.invoke(text, *args, **kwargs)
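
Because fastembed has no native async API, ainvoke simply delegates to the synchronous invoke; it can still be awaited from async code. A sketch:

import asyncio

embedder = FastEmbedEmbeddings()
docs = asyncio.run(embedder.ainvoke(["awaited, but computed synchronously"]))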

LCAzureOpenAIEmbeddings

Bases: LCEmbeddingMixin, BaseEmbeddings

Wrapper around Langchain's AzureOpenAI embedding, focusing on key parameters

Source code in libs/kotaemon/kotaemon/embeddings/langchain_based.py
class LCAzureOpenAIEmbeddings(LCEmbeddingMixin, BaseEmbeddings):
    """Wrapper around Langchain's AzureOpenAI embedding, focusing on key parameters"""

    def __init__(
        self,
        azure_endpoint: Optional[str] = None,
        deployment: Optional[str] = None,
        openai_api_key: Optional[str] = None,
        api_version: Optional[str] = None,
        request_timeout: Optional[float] = None,
        **params,
    ):
        super().__init__(
            azure_endpoint=azure_endpoint,
            deployment=deployment,
            api_version=api_version,
            openai_api_key=openai_api_key,
            request_timeout=request_timeout,
            **params,
        )

    def _get_lc_class(self):
        try:
            from langchain_openai import AzureOpenAIEmbeddings
        except ImportError:
            from langchain.embeddings import AzureOpenAIEmbeddings

        return AzureOpenAIEmbeddings
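
A usage sketch with placeholder credentials; azure_endpoint, deployment, and api_version must identify an embedding deployment on your Azure resource, and run() is assumed to be the shared BaseEmbeddings entry point:

embedder = LCAzureOpenAIEmbeddings(
    azure_endpoint="https://<your-resource>.openai.azure.com/",  # placeholder
    deployment="<your-embedding-deployment>",                    # placeholder
    openai_api_key="<your-api-key>",                             # placeholder
    api_version="2024-02-01",
)
docs = embedder.run("some text to embed")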

LCCohereEmbeddings

Bases: LCEmbeddingMixin, BaseEmbeddings

Wrapper around Langchain's Cohere embedding, focusing on key parameters

Source code in libs/kotaemon/kotaemon/embeddings/langchain_based.py
class LCCohereEmbeddings(LCEmbeddingMixin, BaseEmbeddings):
    """Wrapper around Langchain's Cohere embedding, focusing on key parameters"""

    cohere_api_key: str = Param(
        help="API key (https://dashboard.cohere.com/api-keys)",
        default=None,
        required=True,
    )
    model: str = Param(
        help="Model name to use (https://docs.cohere.com/docs/models)",
        default=None,
        required=True,
    )
    user_agent: str = Param(
        help="User agent (leave default)", default="default", required=True
    )

    def __init__(
        self,
        model: str = "embed-english-v2.0",
        cohere_api_key: Optional[str] = None,
        truncate: Optional[str] = None,
        request_timeout: Optional[float] = None,
        **params,
    ):
        super().__init__(
            model=model,
            cohere_api_key=cohere_api_key,
            truncate=truncate,
            request_timeout=request_timeout,
            **params,
        )

    def _get_lc_class(self):
        try:
            from langchain_cohere import CohereEmbeddings
        except ImportError:
            from langchain.embeddings import CohereEmbeddings

        return CohereEmbeddings
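
A usage sketch with a placeholder API key; embed-english-v2.0 is the constructor's default model, and run() is assumed to be the shared BaseEmbeddings entry point:

embedder = LCCohereEmbeddings(
    model="embed-english-v2.0",
    cohere_api_key="<your-api-key>",  # placeholder
)
docs = embedder.run("some text to embed")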

LCHuggingFaceEmbeddings

Bases: LCEmbeddingMixin, BaseEmbeddings

Wrapper around Langchain's HuggingFace embedding, focusing on key parameters

Source code in libs/kotaemon/kotaemon/embeddings/langchain_based.py
class LCHuggingFaceEmbeddings(LCEmbeddingMixin, BaseEmbeddings):
    """Wrapper around Langchain's HuggingFace embedding, focusing on key parameters"""

    model_name: str = Param(
        help=(
            "Model name to use (https://huggingface.co/models?"
            "pipeline_tag=sentence-similarity&sort=trending)"
        ),
        default=None,
        required=True,
    )

    def __init__(
        self,
        model_name: str = "sentence-transformers/all-mpnet-base-v2",
        **params,
    ):
        super().__init__(
            model_name=model_name,
            **params,
        )

    def _get_lc_class(self):
        try:
            from langchain_community.embeddings import HuggingFaceBgeEmbeddings
        except ImportError:
            from langchain.embeddings import HuggingFaceBgeEmbeddings

        return HuggingFaceBgeEmbeddings
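
A usage sketch; because the wrapped HuggingFaceBgeEmbeddings runs locally, the langchain HuggingFace integration and its sentence-transformers dependency are assumed to be installed:

embedder = LCHuggingFaceEmbeddings(
    model_name="sentence-transformers/all-mpnet-base-v2",  # the constructor's default
)
docs = embedder.run("some text to embed")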

LCOpenAIEmbeddings

Bases: LCEmbeddingMixin, BaseEmbeddings

Wrapper around Langchain's OpenAI embedding, focusing on key parameters

Source code in libs/kotaemon/kotaemon/embeddings/langchain_based.py
class LCOpenAIEmbeddings(LCEmbeddingMixin, BaseEmbeddings):
    """Wrapper around Langchain's OpenAI embedding, focusing on key parameters"""

    def __init__(
        self,
        model: str = "text-embedding-ada-002",
        openai_api_version: Optional[str] = None,
        openai_api_base: Optional[str] = None,
        openai_api_type: Optional[str] = None,
        openai_api_key: Optional[str] = None,
        request_timeout: Optional[float] = None,
        **params,
    ):
        super().__init__(
            model=model,
            openai_api_version=openai_api_version,
            openai_api_base=openai_api_base,
            openai_api_type=openai_api_type,
            openai_api_key=openai_api_key,
            request_timeout=request_timeout,
            **params,
        )

    def _get_lc_class(self):
        try:
            from langchain_openai import OpenAIEmbeddings
        except ImportError:
            from langchain.embeddings import OpenAIEmbeddings

        return OpenAIEmbeddings
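
A usage sketch with a placeholder API key; text-embedding-ada-002 is the constructor's default model:

embedder = LCOpenAIEmbeddings(
    model="text-embedding-ada-002",
    openai_api_key="<your-api-key>",  # placeholder
)
docs = embedder.run("some text to embed")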

AzureOpenAIEmbeddings

Bases: BaseOpenAIEmbeddings

Source code in libs/kotaemon/kotaemon/embeddings/openai.py
class AzureOpenAIEmbeddings(BaseOpenAIEmbeddings):
    azure_endpoint: str = Param(
        None,
        help=(
            "HTTPS endpoint for the Azure OpenAI model. The azure_endpoint, "
            "azure_deployment, and api_version parameters are used to construct "
            "the full URL for the Azure OpenAI model."
        ),
        required=True,
    )
    azure_deployment: str = Param(None, help="Azure deployment name", required=True)
    api_version: str = Param(None, help="Azure model version", required=True)
    azure_ad_token: Optional[str] = Param(None, help="Azure AD token")
    azure_ad_token_provider: Optional[str] = Param(None, help="Azure AD token provider")

    @Param.auto(depends_on=["azure_ad_token_provider"])
    def azure_ad_token_provider_(self):
        if isinstance(self.azure_ad_token_provider, str):
            return import_dotted_string(self.azure_ad_token_provider, safe=False)

    def prepare_client(self, async_version: bool = False):
        """Get the OpenAI client

        Args:
            async_version (bool): Whether to get the async version of the client
        """
        params = {
            "azure_endpoint": self.azure_endpoint,
            "api_version": self.api_version,
            "api_key": self.api_key,
            "azure_ad_token": self.azure_ad_token,
            "azure_ad_token_provider": self.azure_ad_token_provider_,
            "timeout": self.timeout,
            "max_retries": self.max_retries_,
        }
        if async_version:
            from openai import AsyncAzureOpenAI

            return AsyncAzureOpenAI(**params)

        from openai import AzureOpenAI

        return AzureOpenAI(**params)

    @retry(
        retry=retry_if_not_exception_type(
            (openai.NotFoundError, openai.BadRequestError)
        ),
        wait=wait_random_exponential(min=1, max=40),
        stop=stop_after_attempt(6),
    )
    def openai_response(self, client, **kwargs):
        """Get the openai response"""
        params: dict = {
            "model": self.azure_deployment,
        }
        if self.dimensions:
            params["dimensions"] = self.dimensions
        params.update(kwargs)

        return client.embeddings.create(**params)
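
A usage sketch with placeholder Azure values; api_key, timeout, and max_retries are fields assumed to come from BaseOpenAIEmbeddings (prepare_client reads them via self), and run() is the shared entry point:

embedder = AzureOpenAIEmbeddings(
    azure_endpoint="https://<your-resource>.openai.azure.com/",  # placeholder
    azure_deployment="<your-embedding-deployment>",              # placeholder
    api_version="2024-02-01",
    api_key="<your-api-key>",                                    # placeholder, inherited field
)
docs = embedder.run("some text to embed")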

prepare_client

prepare_client(async_version=False)

Get the OpenAI client

Parameters:

Name           Type  Description                                      Default
async_version  bool  Whether to get the async version of the client  False
Source code in libs/kotaemon/kotaemon/embeddings/openai.py
def prepare_client(self, async_version: bool = False):
    """Get the OpenAI client

    Args:
        async_version (bool): Whether to get the async version of the client
    """
    params = {
        "azure_endpoint": self.azure_endpoint,
        "api_version": self.api_version,
        "api_key": self.api_key,
        "azure_ad_token": self.azure_ad_token,
        "azure_ad_token_provider": self.azure_ad_token_provider_,
        "timeout": self.timeout,
        "max_retries": self.max_retries_,
    }
    if async_version:
        from openai import AsyncAzureOpenAI

        return AsyncAzureOpenAI(**params)

    from openai import AzureOpenAI

    return AzureOpenAI(**params)

openai_response

openai_response(client, **kwargs)

Get the openai response

Source code in libs/kotaemon/kotaemon/embeddings/openai.py
@retry(
    retry=retry_if_not_exception_type(
        (openai.NotFoundError, openai.BadRequestError)
    ),
    wait=wait_random_exponential(min=1, max=40),
    stop=stop_after_attempt(6),
)
def openai_response(self, client, **kwargs):
    """Get the openai response"""
    params: dict = {
        "model": self.azure_deployment,
    }
    if self.dimensions:
        params["dimensions"] = self.dimensions
    params.update(kwargs)

    return client.embeddings.create(**params)

OpenAIEmbeddings

Bases: BaseOpenAIEmbeddings

OpenAI embedding models

Source code in libs/kotaemon/kotaemon/embeddings/openai.py
class OpenAIEmbeddings(BaseOpenAIEmbeddings):
    """OpenAI chat model"""

    base_url: Optional[str] = Param(None, help="OpenAI base URL")
    organization: Optional[str] = Param(None, help="OpenAI organization")
    model: str = Param(
        None,
        help=(
            "ID of the model to use. You can go to [Model overview](https://platform."
            "openai.com/docs/models/overview) to see the available models."
        ),
        required=True,
    )

    def prepare_client(self, async_version: bool = False):
        """Get the OpenAI client

        Args:
            async_version (bool): Whether to get the async version of the client
        """
        params = {
            "api_key": self.api_key,
            "organization": self.organization,
            "base_url": self.base_url,
            "timeout": self.timeout,
            "max_retries": self.max_retries_,
        }
        if async_version:
            from openai import AsyncOpenAI

            return AsyncOpenAI(**params)

        from openai import OpenAI

        return OpenAI(**params)

    @retry(
        retry=retry_if_not_exception_type(
            (openai.NotFoundError, openai.BadRequestError)
        ),
        wait=wait_random_exponential(min=1, max=40),
        stop=stop_after_attempt(6),
    )
    def openai_response(self, client, **kwargs):
        """Get the openai response"""
        params: dict = {
            "model": self.model,
        }
        if self.dimensions:
            params["dimensions"] = self.dimensions
        params.update(kwargs)

        return client.embeddings.create(**params)
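
A usage sketch with a placeholder API key; the dimensions field read by openai_response is assumed to come from BaseOpenAIEmbeddings and is only honored by models that support it (e.g. the text-embedding-3 family):

embedder = OpenAIEmbeddings(
    model="text-embedding-3-small",
    api_key="<your-api-key>",  # placeholder, inherited field
)
docs = embedder.run(["first text", "second text"])
print(len(docs[0].embedding))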

prepare_client

prepare_client(async_version=False)

Get the OpenAI client

Parameters:

Name           Type  Description                                      Default
async_version  bool  Whether to get the async version of the client  False
Source code in libs/kotaemon/kotaemon/embeddings/openai.py
def prepare_client(self, async_version: bool = False):
    """Get the OpenAI client

    Args:
        async_version (bool): Whether to get the async version of the client
    """
    params = {
        "api_key": self.api_key,
        "organization": self.organization,
        "base_url": self.base_url,
        "timeout": self.timeout,
        "max_retries": self.max_retries_,
    }
    if async_version:
        from openai import AsyncOpenAI

        return AsyncOpenAI(**params)

    from openai import OpenAI

    return OpenAI(**params)

openai_response

openai_response(client, **kwargs)

Get the openai response

Source code in libs/kotaemon/kotaemon/embeddings/openai.py
@retry(
    retry=retry_if_not_exception_type(
        (openai.NotFoundError, openai.BadRequestError)
    ),
    wait=wait_random_exponential(min=1, max=40),
    stop=stop_after_attempt(6),
)
def openai_response(self, client, **kwargs):
    """Get the openai response"""
    params: dict = {
        "model": self.model,
    }
    if self.dimensions:
        params["dimensions"] = self.dimensions
    params.update(kwargs)

    return client.embeddings.create(**params)

TeiEndpointEmbeddings

Bases: BaseEmbeddings

An Embeddings component that uses a TEI (Text Embeddings Inference) API compatible endpoint.

Ref: https://github.com/huggingface/text-embeddings-inference

Attributes:

Name          Type  Description
endpoint_url  str   The URL of a TEI (Text Embeddings Inference) API compatible endpoint.
normalize     bool  Whether to normalize embeddings to unit length.
truncate      bool  Whether to truncate inputs that exceed the model's maximum sequence length.

Source code in libs/kotaemon/kotaemon/embeddings/tei_endpoint_embed.py
class TeiEndpointEmbeddings(BaseEmbeddings):
    """An Embeddings component that uses an
    TEI (Text-Embedding-Inference) API compatible endpoint.

    Ref: https://github.com/huggingface/text-embeddings-inference

    Attributes:
        endpoint_url (str): The url of an TEI
            (Text-Embedding-Inference) API compatible endpoint.
        normalize (bool): Whether to normalize embeddings to unit length.
        truncate (bool): Whether to truncate embeddings
            to a fixed/default length.
    """

    endpoint_url: str = Param(None, help="TEI embedding service api base URL")
    normalize: bool = Param(
        True,
        help="Normalize embeddings to unit length",
    )
    truncate: bool = Param(
        True,
        help="Truncate embeddings to a fixed/default length",
    )

    async def client_(self, inputs: list[str]):
        async with aiohttp.ClientSession() as session:
            async with session.post(
                url=self.endpoint_url,
                json={
                    "inputs": inputs,
                    "normalize": self.normalize,
                    "truncate": self.truncate,
                },
            ) as resp:
                embeddings = await resp.json()
        return embeddings

    async def ainvoke(
        self, text: str | list[str] | Document | list[Document], *args, **kwargs
    ) -> list[DocumentWithEmbedding]:
        if not isinstance(text, list):
            text = [text]
        text = self.prepare_input(text)

        outputs = []
        batch_size = 6
        num_batch = max(len(text) // batch_size, 1)
        for i in range(num_batch):
            if i == num_batch - 1:
                mini_batch = text[batch_size * i :]
            else:
                mini_batch = text[batch_size * i : batch_size * (i + 1)]
            mini_batch = [x.content for x in mini_batch]
            embeddings = await self.client_(mini_batch)  # type: ignore
            outputs.extend(
                [
                    DocumentWithEmbedding(content=doc, embedding=embedding)
                    for doc, embedding in zip(mini_batch, embeddings)
                ]
            )

        return outputs

    def invoke(
        self, text: str | list[str] | Document | list[Document], *args, **kwargs
    ) -> list[DocumentWithEmbedding]:
        if not isinstance(text, list):
            text = [text]

        text = self.prepare_input(text)

        outputs = []
        batch_size = 6
        num_batch = max(len(text) // batch_size, 1)
        for i in range(num_batch):
            if i == num_batch - 1:
                mini_batch = text[batch_size * i :]
            else:
                mini_batch = text[batch_size * i : batch_size * (i + 1)]
            mini_batch = [x.content for x in mini_batch]
            # synchronous POST per mini-batch (the async path above uses aiohttp)
            embeddings = requests.post(
                url=self.endpoint_url,
                json={
                    "inputs": mini_batch,
                    "normalize": self.normalize,
                    "truncate": self.truncate,
                },
            ).json()
            outputs.extend(
                [
                    DocumentWithEmbedding(content=doc, embedding=embedding)
                    for doc, embedding in zip(mini_batch, embeddings)
                ]
            )
        return outputs
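
A usage sketch against a local TEI server (the URL is a placeholder; TEI serves embeddings at its /embed route with the {"inputs": ...} payload used above). invoke sends inputs in mini-batches of 6:

embedder = TeiEndpointEmbeddings(endpoint_url="http://localhost:8080/embed")  # placeholder URL
docs = embedder.invoke("some text to embed")
print(len(docs[0].embedding))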