Skip to content

Chats

EndpointChatLLM

Bases: ChatLLM

A ChatLLM that uses an endpoint to generate responses. This expects an OpenAI API compatible endpoint.

Attributes:

Name Type Description
endpoint_url str

The url of a OpenAI API compatible endpoint.

Source code in libs/kotaemon/kotaemon/llms/chats/endpoint_based.py
class EndpointChatLLM(ChatLLM):
    """
    A ChatLLM that uses an endpoint to generate responses. This expects an OpenAI API
    compatible endpoint.

    Attributes:
        endpoint_url (str): The url of a OpenAI API compatible endpoint.
    """

    endpoint_url: str = Param(
        help="URL of the OpenAI API compatible endpoint", required=True
    )

    def run(
        self, messages: str | BaseMessage | list[BaseMessage], **kwargs
    ) -> LLMInterface:
        """
        Generate response from messages
        Args:
            messages (str | BaseMessage | list[BaseMessage]): history of messages to
                generate response from
            **kwargs: additional arguments to pass to the OpenAI API
        Returns:
            LLMInterface: generated response
        """
        if isinstance(messages, str):
            input_ = [HumanMessage(content=messages)]
        elif isinstance(messages, BaseMessage):
            input_ = [messages]
        else:
            input_ = messages

        def decide_role(message: BaseMessage):
            if isinstance(message, SystemMessage):
                return "system"
            elif isinstance(message, AIMessage):
                return "assistant"
            else:
                return "user"

        request_json = {
            "messages": [{"content": m.text, "role": decide_role(m)} for m in input_]
        }

        response = requests.post(self.endpoint_url, json=request_json).json()

        content = ""
        candidates = []
        if response["choices"]:
            candidates = [
                each["message"]["content"]
                for each in response["choices"]
                if each["message"]["content"]
            ]
            content = candidates[0]

        return LLMInterface(
            content=content,
            candidates=candidates,
            completion_tokens=response["usage"]["completion_tokens"],
            total_tokens=response["usage"]["total_tokens"],
            prompt_tokens=response["usage"]["prompt_tokens"],
        )

    def invoke(
        self, messages: str | BaseMessage | list[BaseMessage], **kwargs
    ) -> LLMInterface:
        """Same as run"""
        return self.run(messages, **kwargs)

    async def ainvoke(
        self, messages: str | BaseMessage | list[BaseMessage], **kwargs
    ) -> LLMInterface:
        return self.invoke(messages, **kwargs)

run

run(messages, **kwargs)

Generate response from messages Args: messages (str | BaseMessage | list[BaseMessage]): history of messages to generate response from **kwargs: additional arguments to pass to the OpenAI API Returns: LLMInterface: generated response

Source code in libs/kotaemon/kotaemon/llms/chats/endpoint_based.py
def run(
    self, messages: str | BaseMessage | list[BaseMessage], **kwargs
) -> LLMInterface:
    """
    Generate response from messages
    Args:
        messages (str | BaseMessage | list[BaseMessage]): history of messages to
            generate response from
        **kwargs: additional arguments to pass to the OpenAI API
    Returns:
        LLMInterface: generated response
    """
    if isinstance(messages, str):
        input_ = [HumanMessage(content=messages)]
    elif isinstance(messages, BaseMessage):
        input_ = [messages]
    else:
        input_ = messages

    def decide_role(message: BaseMessage):
        if isinstance(message, SystemMessage):
            return "system"
        elif isinstance(message, AIMessage):
            return "assistant"
        else:
            return "user"

    request_json = {
        "messages": [{"content": m.text, "role": decide_role(m)} for m in input_]
    }

    response = requests.post(self.endpoint_url, json=request_json).json()

    content = ""
    candidates = []
    if response["choices"]:
        candidates = [
            each["message"]["content"]
            for each in response["choices"]
            if each["message"]["content"]
        ]
        content = candidates[0]

    return LLMInterface(
        content=content,
        candidates=candidates,
        completion_tokens=response["usage"]["completion_tokens"],
        total_tokens=response["usage"]["total_tokens"],
        prompt_tokens=response["usage"]["prompt_tokens"],
    )

invoke

invoke(messages, **kwargs)

Same as run

Source code in libs/kotaemon/kotaemon/llms/chats/endpoint_based.py
def invoke(
    self, messages: str | BaseMessage | list[BaseMessage], **kwargs
) -> LLMInterface:
    """Same as run"""
    return self.run(messages, **kwargs)

LCChatMixin

Mixin for langchain based chat models

Source code in libs/kotaemon/kotaemon/llms/chats/langchain_based.py
class LCChatMixin:
    """Mixin for langchain based chat models"""

    def _get_lc_class(self):
        raise NotImplementedError(
            "Please return the relevant Langchain class in in _get_lc_class"
        )

    def _get_tool_call_kwargs(self):
        return {}

    def __init__(self, stream: bool = False, **params):
        self._lc_class = self._get_lc_class()
        self._obj = self._lc_class(**params)
        self._kwargs: dict = params
        self._stream = stream

        super().__init__()

    def run(
        self, messages: str | BaseMessage | list[BaseMessage], **kwargs
    ) -> LLMInterface:
        if self._stream:
            return self.stream(messages, **kwargs)  # type: ignore
        return self.invoke(messages, **kwargs)

    def prepare_message(self, messages: str | BaseMessage | list[BaseMessage]):
        input_: list[BaseMessage] = []

        if isinstance(messages, str):
            input_ = [HumanMessage(content=messages)]
        elif isinstance(messages, BaseMessage):
            input_ = [messages]
        else:
            input_ = messages

        return input_

    def prepare_response(self, pred):
        all_text = [each.text for each in pred.generations[0]]
        all_messages = [each.message for each in pred.generations[0]]

        completion_tokens, total_tokens, prompt_tokens = 0, 0, 0
        try:
            if pred.llm_output is not None:
                completion_tokens = pred.llm_output["token_usage"]["completion_tokens"]
                total_tokens = pred.llm_output["token_usage"]["total_tokens"]
                prompt_tokens = pred.llm_output["token_usage"]["prompt_tokens"]
        except Exception:
            pass

        return LLMInterface(
            text=all_text[0] if len(all_text) > 0 else "",
            candidates=all_text,
            completion_tokens=completion_tokens,
            total_tokens=total_tokens,
            prompt_tokens=prompt_tokens,
            messages=all_messages,
            logits=[],
        )

    def invoke(
        self, messages: str | BaseMessage | list[BaseMessage], **kwargs
    ) -> LLMInterface:
        """Generate response from messages

        Args:
            messages: history of messages to generate response from
            **kwargs: additional arguments to pass to the langchain chat model

        Returns:
            LLMInterface: generated response
        """
        input_ = self.prepare_message(messages)

        if "tools_pydantic" in kwargs:
            tools = kwargs.pop(
                "tools_pydantic",
            )
            lc_tool_call = self._obj.bind_tools(tools)
            pred = lc_tool_call.invoke(
                input_,
                **self._get_tool_call_kwargs(),
            )
            if pred.tool_calls:
                tool_calls = pred.tool_calls
            else:
                tool_calls = pred.additional_kwargs.get("tool_calls", [])

            output = LLMInterface(
                content="",
                additional_kwargs={"tool_calls": tool_calls},
            )
        else:
            pred = self._obj.generate(messages=[input_], **kwargs)
            output = self.prepare_response(pred)

        return output

    async def ainvoke(
        self, messages: str | BaseMessage | list[BaseMessage], **kwargs
    ) -> LLMInterface:
        input_ = self.prepare_message(messages)
        pred = await self._obj.agenerate(messages=[input_], **kwargs)
        return self.prepare_response(pred)

    def stream(
        self, messages: str | BaseMessage | list[BaseMessage], **kwargs
    ) -> Iterator[LLMInterface]:
        for response in self._obj.stream(input=messages, **kwargs):
            yield LLMInterface(content=response.content)

    async def astream(
        self, messages: str | BaseMessage | list[BaseMessage], **kwargs
    ) -> AsyncGenerator[LLMInterface, None]:
        async for response in self._obj.astream(input=messages, **kwargs):
            yield LLMInterface(content=response.content)

    def to_langchain_format(self):
        return self._obj

    def __repr__(self):
        kwargs = []
        for key, value_obj in self._kwargs.items():
            value = repr(value_obj)
            kwargs.append(f"{key}={value}")
        kwargs_repr = ", ".join(kwargs)
        return f"{self.__class__.__name__}({kwargs_repr})"

    def __str__(self):
        kwargs = []
        for key, value_obj in self._kwargs.items():
            value = str(value_obj)
            if len(value) > 20:
                value = f"{value[:15]}..."
            kwargs.append(f"{key}={value}")
        kwargs_repr = ", ".join(kwargs)
        return f"{self.__class__.__name__}({kwargs_repr})"

    def __setattr__(self, name, value):
        if name == "_lc_class":
            return super().__setattr__(name, value)

        if name in self._lc_class.__fields__:
            self._kwargs[name] = value
            self._obj = self._lc_class(**self._kwargs)
        else:
            super().__setattr__(name, value)

    def __getattr__(self, name):
        if name in self._kwargs:
            return self._kwargs[name]
        return getattr(self._obj, name)

    def dump(self, *args, **kwargs):
        from theflow.utils.modules import serialize

        params = {key: serialize(value) for key, value in self._kwargs.items()}
        return {
            "__type__": f"{self.__module__}.{self.__class__.__qualname__}",
            **params,
        }

    def specs(self, path: str):
        path = path.strip(".")
        if "." in path:
            raise ValueError("path should not contain '.'")

        if path in self._lc_class.__fields__:
            return {
                "__type__": "theflow.base.ParamAttr",
                "refresh_on_set": True,
                "strict_type": True,
            }

        raise ValueError(f"Invalid param {path}")

invoke

invoke(messages, **kwargs)

Generate response from messages

Parameters:

Name Type Description Default
messages str | BaseMessage | list[BaseMessage]

history of messages to generate response from

required
**kwargs

additional arguments to pass to the langchain chat model

{}

Returns:

Name Type Description
LLMInterface LLMInterface

generated response

Source code in libs/kotaemon/kotaemon/llms/chats/langchain_based.py
def invoke(
    self, messages: str | BaseMessage | list[BaseMessage], **kwargs
) -> LLMInterface:
    """Generate response from messages

    Args:
        messages: history of messages to generate response from
        **kwargs: additional arguments to pass to the langchain chat model

    Returns:
        LLMInterface: generated response
    """
    input_ = self.prepare_message(messages)

    if "tools_pydantic" in kwargs:
        tools = kwargs.pop(
            "tools_pydantic",
        )
        lc_tool_call = self._obj.bind_tools(tools)
        pred = lc_tool_call.invoke(
            input_,
            **self._get_tool_call_kwargs(),
        )
        if pred.tool_calls:
            tool_calls = pred.tool_calls
        else:
            tool_calls = pred.additional_kwargs.get("tool_calls", [])

        output = LLMInterface(
            content="",
            additional_kwargs={"tool_calls": tool_calls},
        )
    else:
        pred = self._obj.generate(messages=[input_], **kwargs)
        output = self.prepare_response(pred)

    return output

LlamaCppChat

Bases: ChatLLM

Wrapper around the llama-cpp-python's Llama model

Source code in libs/kotaemon/kotaemon/llms/chats/llamacpp.py
class LlamaCppChat(ChatLLM):
    """Wrapper around the llama-cpp-python's Llama model"""

    model_path: Optional[str] = Param(
        help="Path to the model file. This is required to load the model.",
    )
    repo_id: Optional[str] = Param(
        help="Id of a repo on the HuggingFace Hub in the form of `user_name/repo_name`."
    )
    filename: Optional[str] = Param(
        help="A filename or glob pattern to match the model file in the repo."
    )
    chat_format: str = Param(
        help=(
            "Chat format to use. Please refer to llama_cpp.llama_chat_format for a "
            "list of supported formats. If blank, the chat format will be auto-"
            "inferred."
        ),
        required=True,
    )
    lora_base: Optional[str] = Param(None, help="Path to the base Lora model")
    n_ctx: Optional[int] = Param(512, help="Text context, 0 = from model")
    n_gpu_layers: Optional[int] = Param(
        0,
        help="Number of layers to offload to GPU. If -1, all layers are offloaded",
    )
    use_mmap: Optional[bool] = Param(
        True,
        help=(),
    )
    vocab_only: Optional[bool] = Param(
        False,
        help="If True, only the vocabulary is loaded. This is useful for debugging.",
    )

    _role_mapper: dict[str, str] = {
        "human": "user",
        "system": "system",
        "ai": "assistant",
    }

    @Param.auto()
    def client_object(self) -> "Llama":
        """Get the llama-cpp-python client object"""
        try:
            from llama_cpp import Llama
        except ImportError:
            raise ImportError(
                "llama-cpp-python is not installed. "
                "Please install it using `pip install llama-cpp-python`"
            )

        errors = []
        if not self.model_path and (not self.repo_id or not self.filename):
            errors.append(
                "- `model_path` or `repo_id` and `filename` are required to load the"
                " model"
            )

        if not self.chat_format:
            errors.append(
                "- `chat_format` is required to know how to format the chat messages. "
                "Please refer to llama_cpp.llama_chat_format for a list of supported "
                "formats."
            )
        if errors:
            raise ValueError("\n".join(errors))

        if self.model_path:
            return Llama(
                model_path=cast(str, self.model_path),
                chat_format=self.chat_format,
                lora_base=self.lora_base,
                n_ctx=self.n_ctx,
                n_gpu_layers=self.n_gpu_layers,
                use_mmap=self.use_mmap,
                vocab_only=self.vocab_only,
            )
        else:
            return Llama.from_pretrained(
                repo_id=self.repo_id,
                filename=self.filename,
                chat_format=self.chat_format,
                lora_base=self.lora_base,
                n_ctx=self.n_ctx,
                n_gpu_layers=self.n_gpu_layers,
                use_mmap=self.use_mmap,
                vocab_only=self.vocab_only,
            )

    def prepare_message(
        self, messages: str | BaseMessage | list[BaseMessage]
    ) -> list[dict]:
        input_: list[BaseMessage] = []

        if isinstance(messages, str):
            input_ = [HumanMessage(content=messages)]
        elif isinstance(messages, BaseMessage):
            input_ = [messages]
        else:
            input_ = messages

        output_ = [
            {"role": self._role_mapper[each.type], "content": each.content}
            for each in input_
        ]

        return output_

    def invoke(
        self, messages: str | BaseMessage | list[BaseMessage], **kwargs
    ) -> LLMInterface:

        pred: "CCCR" = self.client_object.create_chat_completion(
            messages=self.prepare_message(messages),
            stream=False,
        )

        return LLMInterface(
            content=pred["choices"][0]["message"]["content"] if pred["choices"] else "",
            candidates=[
                c["message"]["content"]
                for c in pred["choices"]
                if c["message"]["content"]
            ],
            completion_tokens=pred["usage"]["completion_tokens"],
            total_tokens=pred["usage"]["total_tokens"],
            prompt_tokens=pred["usage"]["prompt_tokens"],
        )

    def stream(
        self, messages: str | BaseMessage | list[BaseMessage], **kwargs
    ) -> Iterator[LLMInterface]:
        pred = self.client_object.create_chat_completion(
            messages=self.prepare_message(messages),
            stream=True,
        )
        for chunk in pred:
            if not chunk["choices"]:
                continue

            if "content" not in chunk["choices"][0]["delta"]:
                continue

            yield LLMInterface(content=chunk["choices"][0]["delta"]["content"])

client_object

client_object()

Get the llama-cpp-python client object

Source code in libs/kotaemon/kotaemon/llms/chats/llamacpp.py
@Param.auto()
def client_object(self) -> "Llama":
    """Get the llama-cpp-python client object"""
    try:
        from llama_cpp import Llama
    except ImportError:
        raise ImportError(
            "llama-cpp-python is not installed. "
            "Please install it using `pip install llama-cpp-python`"
        )

    errors = []
    if not self.model_path and (not self.repo_id or not self.filename):
        errors.append(
            "- `model_path` or `repo_id` and `filename` are required to load the"
            " model"
        )

    if not self.chat_format:
        errors.append(
            "- `chat_format` is required to know how to format the chat messages. "
            "Please refer to llama_cpp.llama_chat_format for a list of supported "
            "formats."
        )
    if errors:
        raise ValueError("\n".join(errors))

    if self.model_path:
        return Llama(
            model_path=cast(str, self.model_path),
            chat_format=self.chat_format,
            lora_base=self.lora_base,
            n_ctx=self.n_ctx,
            n_gpu_layers=self.n_gpu_layers,
            use_mmap=self.use_mmap,
            vocab_only=self.vocab_only,
        )
    else:
        return Llama.from_pretrained(
            repo_id=self.repo_id,
            filename=self.filename,
            chat_format=self.chat_format,
            lora_base=self.lora_base,
            n_ctx=self.n_ctx,
            n_gpu_layers=self.n_gpu_layers,
            use_mmap=self.use_mmap,
            vocab_only=self.vocab_only,
        )

AzureChatOpenAI

Bases: BaseChatOpenAI

OpenAI chat model provided by Microsoft Azure

Source code in libs/kotaemon/kotaemon/llms/chats/openai.py
class AzureChatOpenAI(BaseChatOpenAI):
    """OpenAI chat model provided by Microsoft Azure"""

    azure_endpoint: str = Param(
        help=(
            "HTTPS endpoint for the Azure OpenAI model. The azure_endpoint, "
            "azure_deployment, and api_version parameters are used to construct "
            "the full URL for the Azure OpenAI model."
        ),
        required=True,
    )
    azure_deployment: str = Param(help="Azure deployment name", required=True)
    api_version: str = Param(help="Azure model version", required=True)
    azure_ad_token: Optional[str] = Param(None, help="Azure AD token")
    azure_ad_token_provider: Optional[str] = Param(None, help="Azure AD token provider")

    @Param.auto(depends_on=["azure_ad_token_provider"])
    def azure_ad_token_provider_(self):
        if isinstance(self.azure_ad_token_provider, str):
            return import_dotted_string(self.azure_ad_token_provider, safe=False)

    def prepare_client(self, async_version: bool = False):
        """Get the OpenAI client

        Args:
            async_version (bool): Whether to get the async version of the client
        """
        params = {
            "azure_endpoint": self.azure_endpoint,
            "api_version": self.api_version,
            "api_key": self.api_key,
            "azure_ad_token": self.azure_ad_token,
            "azure_ad_token_provider": self.azure_ad_token_provider_,
            "timeout": self.timeout,
            "max_retries": self.max_retries_,
        }
        if async_version:
            from openai import AsyncAzureOpenAI

            return AsyncAzureOpenAI(**params)

        from openai import AzureOpenAI

        return AzureOpenAI(**params)

    def openai_response(self, client, **kwargs):
        """Get the openai response"""
        if "tools_pydantic" in kwargs:
            kwargs.pop("tools_pydantic")

        params_ = {
            "model": self.azure_deployment,
            "temperature": self.temperature,
            "max_tokens": self.max_tokens,
            "n": self.n,
            "stop": self.stop,
            "frequency_penalty": self.frequency_penalty,
            "presence_penalty": self.presence_penalty,
            "tool_choice": self.tool_choice,
            "tools": self.tools,
            "logprobs": self.logprobs,
            "logit_bias": self.logit_bias,
            "top_logprobs": self.top_logprobs,
            "top_p": self.top_p,
        }
        params = {k: v for k, v in params_.items() if v is not None}
        params.update(kwargs)

        return client.chat.completions.create(**params)

prepare_client

prepare_client(async_version=False)

Get the OpenAI client

Parameters:

Name Type Description Default
async_version bool

Whether to get the async version of the client

False
Source code in libs/kotaemon/kotaemon/llms/chats/openai.py
def prepare_client(self, async_version: bool = False):
    """Get the OpenAI client

    Args:
        async_version (bool): Whether to get the async version of the client
    """
    params = {
        "azure_endpoint": self.azure_endpoint,
        "api_version": self.api_version,
        "api_key": self.api_key,
        "azure_ad_token": self.azure_ad_token,
        "azure_ad_token_provider": self.azure_ad_token_provider_,
        "timeout": self.timeout,
        "max_retries": self.max_retries_,
    }
    if async_version:
        from openai import AsyncAzureOpenAI

        return AsyncAzureOpenAI(**params)

    from openai import AzureOpenAI

    return AzureOpenAI(**params)

openai_response

openai_response(client, **kwargs)

Get the openai response

Source code in libs/kotaemon/kotaemon/llms/chats/openai.py
def openai_response(self, client, **kwargs):
    """Get the openai response"""
    if "tools_pydantic" in kwargs:
        kwargs.pop("tools_pydantic")

    params_ = {
        "model": self.azure_deployment,
        "temperature": self.temperature,
        "max_tokens": self.max_tokens,
        "n": self.n,
        "stop": self.stop,
        "frequency_penalty": self.frequency_penalty,
        "presence_penalty": self.presence_penalty,
        "tool_choice": self.tool_choice,
        "tools": self.tools,
        "logprobs": self.logprobs,
        "logit_bias": self.logit_bias,
        "top_logprobs": self.top_logprobs,
        "top_p": self.top_p,
    }
    params = {k: v for k, v in params_.items() if v is not None}
    params.update(kwargs)

    return client.chat.completions.create(**params)

ChatOpenAI

Bases: BaseChatOpenAI

OpenAI chat model

Source code in libs/kotaemon/kotaemon/llms/chats/openai.py
class ChatOpenAI(BaseChatOpenAI):
    """OpenAI chat model"""

    base_url: Optional[str] = Param(None, help="OpenAI base URL")
    organization: Optional[str] = Param(None, help="OpenAI organization")
    model: str = Param(help="OpenAI model", required=True)

    def prepare_client(self, async_version: bool = False):
        """Get the OpenAI client

        Args:
            async_version (bool): Whether to get the async version of the client
        """
        params = {
            "api_key": self.api_key,
            "organization": self.organization,
            "base_url": self.base_url,
            "timeout": self.timeout,
            "max_retries": self.max_retries_,
        }
        if async_version:
            from openai import AsyncOpenAI

            return AsyncOpenAI(**params)

        from openai import OpenAI

        return OpenAI(**params)

    def openai_response(self, client, **kwargs):
        """Get the openai response"""
        if "tools_pydantic" in kwargs:
            kwargs.pop("tools_pydantic")

        params_ = {
            "model": self.model,
            "temperature": self.temperature,
            "max_tokens": self.max_tokens,
            "n": self.n,
            "stop": self.stop,
            "frequency_penalty": self.frequency_penalty,
            "presence_penalty": self.presence_penalty,
            "tool_choice": self.tool_choice,
            "tools": self.tools,
            "logprobs": self.logprobs,
            "logit_bias": self.logit_bias,
            "top_logprobs": self.top_logprobs,
            "top_p": self.top_p,
        }
        params = {k: v for k, v in params_.items() if v is not None}
        params.update(kwargs)

        return client.chat.completions.create(**params)

prepare_client

prepare_client(async_version=False)

Get the OpenAI client

Parameters:

Name Type Description Default
async_version bool

Whether to get the async version of the client

False
Source code in libs/kotaemon/kotaemon/llms/chats/openai.py
def prepare_client(self, async_version: bool = False):
    """Get the OpenAI client

    Args:
        async_version (bool): Whether to get the async version of the client
    """
    params = {
        "api_key": self.api_key,
        "organization": self.organization,
        "base_url": self.base_url,
        "timeout": self.timeout,
        "max_retries": self.max_retries_,
    }
    if async_version:
        from openai import AsyncOpenAI

        return AsyncOpenAI(**params)

    from openai import OpenAI

    return OpenAI(**params)

openai_response

openai_response(client, **kwargs)

Get the openai response

Source code in libs/kotaemon/kotaemon/llms/chats/openai.py
def openai_response(self, client, **kwargs):
    """Get the openai response"""
    if "tools_pydantic" in kwargs:
        kwargs.pop("tools_pydantic")

    params_ = {
        "model": self.model,
        "temperature": self.temperature,
        "max_tokens": self.max_tokens,
        "n": self.n,
        "stop": self.stop,
        "frequency_penalty": self.frequency_penalty,
        "presence_penalty": self.presence_penalty,
        "tool_choice": self.tool_choice,
        "tools": self.tools,
        "logprobs": self.logprobs,
        "logit_bias": self.logit_bias,
        "top_logprobs": self.top_logprobs,
        "top_p": self.top_p,
    }
    params = {k: v for k, v in params_.items() if v is not None}
    params.update(kwargs)

    return client.chat.completions.create(**params)