Skip to content

Langchain Based

LCChatMixin ¶

Mixin for langchain based chat models

Source code in libs/kotaemon/kotaemon/llms/chats/langchain_based.py
class LCChatMixin:
    """Mixin for langchain based chat models

    The mixin wraps an instance of a Langchain chat model class (supplied by
    the subclass through `_get_lc_class`) and keeps the constructor keyword
    arguments in `self._kwargs`. Assigning an attribute whose name is a field
    of the Langchain class rebuilds the wrapped object from the updated
    keyword arguments; reading an unknown attribute falls back to the stored
    keyword arguments and then to the wrapped object.
    """

    def _get_lc_class(self):
        """Return the Langchain chat model class to wrap.

        Raises:
            NotImplementedError: always; subclasses must override this method.
        """
        raise NotImplementedError(
            "Please return the relevant Langchain class in _get_lc_class"
        )

    def _get_tool_call_kwargs(self):
        """Return extra keyword arguments for the tool-calling `invoke` call."""
        return {}

    def __init__(self, stream: bool = False, **params):
        """Build the wrapped Langchain object.

        Args:
            stream: if True, `run` delegates to `stream` instead of `invoke`
            **params: keyword arguments used to construct the Langchain class
        """
        # `_lc_class` has to be assigned first: `__setattr__` consults it for
        # every other attribute assignment.
        self._lc_class = self._get_lc_class()
        self._obj = self._lc_class(**params)
        self._kwargs: dict = params
        self._stream = stream

        super().__init__()

    def run(
        self, messages: str | BaseMessage | list[BaseMessage], **kwargs
    ) -> LLMInterface:
        """Dispatch to `stream` or `invoke` depending on the `stream` flag.

        Args:
            messages: history of messages to generate response from
            **kwargs: additional arguments forwarded to `stream` / `invoke`

        Returns:
            The result of `invoke`, or the generator returned by `stream` when
            the instance was created with `stream=True`.
        """
        if self._stream:
            return self.stream(messages, **kwargs)  # type: ignore
        return self.invoke(messages, **kwargs)

    def prepare_message(self, messages: str | BaseMessage | list[BaseMessage]):
        """Normalise the accepted input forms into a list of messages.

        A string is wrapped in a `HumanMessage`, a single `BaseMessage` is
        wrapped in a list, and anything else is returned unchanged.
        """
        if isinstance(messages, str):
            return [HumanMessage(content=messages)]
        if isinstance(messages, BaseMessage):
            return [messages]
        return messages

    def prepare_response(self, pred):
        """Convert the result of `generate` / `agenerate` into an LLMInterface.

        Only the first entry of `pred.generations` is used. Token counts are
        read from `pred.llm_output["token_usage"]` when available and default
        to 0 otherwise.
        """
        candidates = pred.generations[0]
        all_text = [each.text for each in candidates]
        all_messages = [each.message for each in candidates]

        completion_tokens, total_tokens, prompt_tokens = 0, 0, 0
        try:
            if pred.llm_output is not None:
                usage = pred.llm_output["token_usage"]
                completion_tokens = usage["completion_tokens"]
                total_tokens = usage["total_tokens"]
                prompt_tokens = usage["prompt_tokens"]
        except (KeyError, TypeError):
            # Token usage is best-effort: a missing key or a non-subscriptable
            # value keeps whatever counts were read so far (0 by default).
            pass

        return LLMInterface(
            text=all_text[0] if all_text else "",
            candidates=all_text,
            completion_tokens=completion_tokens,
            total_tokens=total_tokens,
            prompt_tokens=prompt_tokens,
            messages=all_messages,
            logits=[],
        )

    def invoke(
        self, messages: str | BaseMessage | list[BaseMessage], **kwargs
    ) -> LLMInterface:
        """Generate response from messages

        Args:
            messages: history of messages to generate response from
            **kwargs: additional arguments to pass to the langchain chat model.
                When `tools_pydantic` is present, the tools are bound to the
                model and the returned LLMInterface carries the tool calls in
                `additional_kwargs["tool_calls"]`; the remaining kwargs are
                not forwarded in that case.

        Returns:
            LLMInterface: generated response
        """
        input_ = self.prepare_message(messages)

        if "tools_pydantic" in kwargs:
            tools = kwargs.pop("tools_pydantic")
            lc_tool_call = self._obj.bind_tools(tools)
            pred = lc_tool_call.invoke(
                input_,
                **self._get_tool_call_kwargs(),
            )
            # Fall back to the raw `additional_kwargs` entry when the message
            # exposes no (or empty) `tool_calls`.
            if pred.tool_calls:
                tool_calls = pred.tool_calls
            else:
                tool_calls = pred.additional_kwargs.get("tool_calls", [])

            output = LLMInterface(
                content="",
                additional_kwargs={"tool_calls": tool_calls},
            )
        else:
            pred = self._obj.generate(messages=[input_], **kwargs)
            output = self.prepare_response(pred)

        return output

    async def ainvoke(
        self, messages: str | BaseMessage | list[BaseMessage], **kwargs
    ) -> LLMInterface:
        """Asynchronous counterpart of `invoke` (without tool binding).

        Args:
            messages: history of messages to generate response from
            **kwargs: additional arguments to pass to `agenerate`

        Returns:
            LLMInterface: generated response
        """
        input_ = self.prepare_message(messages)
        pred = await self._obj.agenerate(messages=[input_], **kwargs)
        return self.prepare_response(pred)

    def stream(
        self, messages: str | BaseMessage | list[BaseMessage], **kwargs
    ) -> Iterator[LLMInterface]:
        """Yield one LLMInterface per chunk streamed by the wrapped model.

        Args:
            messages: history of messages to generate response from
            **kwargs: additional arguments to pass to the model's `stream`
        """
        # Normalise the input the same way `invoke` does, so a single
        # BaseMessage is wrapped in a list before reaching the model.
        input_ = self.prepare_message(messages)
        for response in self._obj.stream(input=input_, **kwargs):
            yield LLMInterface(content=response.content)

    async def astream(
        self, messages: str | BaseMessage | list[BaseMessage], **kwargs
    ) -> AsyncGenerator[LLMInterface, None]:
        """Asynchronous counterpart of `stream`.

        Args:
            messages: history of messages to generate response from
            **kwargs: additional arguments to pass to the model's `astream`
        """
        input_ = self.prepare_message(messages)
        async for response in self._obj.astream(input=input_, **kwargs):
            yield LLMInterface(content=response.content)

    def to_langchain_format(self):
        """Return the wrapped Langchain object."""
        return self._obj

    def __repr__(self):
        """Return `ClassName(key=repr(value), ...)` for the stored kwargs."""
        kwargs_repr = ", ".join(
            f"{key}={value_obj!r}" for key, value_obj in self._kwargs.items()
        )
        return f"{self.__class__.__name__}({kwargs_repr})"

    def __str__(self):
        """Like `__repr__`, but with `str` values shortened for display."""
        kwargs = []
        for key, value_obj in self._kwargs.items():
            value = str(value_obj)
            # Values longer than 20 characters are cut to 15 plus an ellipsis.
            if len(value) > 20:
                value = f"{value[:15]}..."
            kwargs.append(f"{key}={value}")
        kwargs_repr = ", ".join(kwargs)
        return f"{self.__class__.__name__}({kwargs_repr})"

    def __setattr__(self, name, value):
        """Set an attribute, rebuilding the wrapped object for model fields."""
        if name == "_lc_class":
            return super().__setattr__(name, value)

        if name in self._lc_class.__fields__:
            # A field of the Langchain class: record it and re-create the
            # wrapped object from the updated keyword arguments.
            self._kwargs[name] = value
            self._obj = self._lc_class(**self._kwargs)
        else:
            super().__setattr__(name, value)

    def __getattr__(self, name):
        """Resolve unknown attributes from the kwargs, then the wrapped object.

        Raises:
            AttributeError: if the attribute cannot be resolved, including
                when the internal state has not been initialised yet.
        """
        # This hook only runs when normal lookup failed. If one of the
        # internal attributes itself is missing (instance created without
        # `__init__`, e.g. through `__new__`, copy or pickle), looking it up
        # below would re-enter this method forever.
        if name in ("_kwargs", "_obj", "_lc_class"):
            raise AttributeError(name)

        if name in self._kwargs:
            return self._kwargs[name]
        return getattr(self._obj, name)

    def dump(self, *args, **kwargs):
        """Return a dict with the class path and the serialised kwargs.

        The positional and keyword arguments are accepted but not used.
        """
        from theflow.utils.modules import serialize

        params = {key: serialize(value) for key, value in self._kwargs.items()}
        return {
            "__type__": f"{self.__module__}.{self.__class__.__qualname__}",
            **params,
        }

    def specs(self, path: str):
        """Return the parameter spec for a field of the Langchain class.

        Args:
            path: name of the parameter; leading/trailing dots are stripped

        Returns:
            dict describing the parameter as a `theflow.base.ParamAttr`

        Raises:
            ValueError: if `path` still contains a dot after stripping, or if
                it is not a field of the Langchain class
        """
        path = path.strip(".")
        if "." in path:
            raise ValueError("path should not contain '.'")

        if path in self._lc_class.__fields__:
            return {
                "__type__": "theflow.base.ParamAttr",
                "refresh_on_set": True,
                "strict_type": True,
            }

        raise ValueError(f"Invalid param {path}")

invoke ¶

invoke(messages, **kwargs)

Generate response from messages

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `messages` | `str \| BaseMessage \| list[BaseMessage]` | history of messages to generate response from | required |
| `**kwargs` | | additional arguments to pass to the langchain chat model | `{}` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `LLMInterface` | `LLMInterface` | generated response |

Source code in libs/kotaemon/kotaemon/llms/chats/langchain_based.py
def invoke(
    self, messages: str | BaseMessage | list[BaseMessage], **kwargs
) -> LLMInterface:
    """Produce a response for the given conversation.

    Args:
        messages: a string, a single message, or the message history to
            generate the response from
        **kwargs: extra arguments for the langchain chat model; when
            `tools_pydantic` is present the tools are bound to the model and
            the tool calls are returned instead of generated text

    Returns:
        LLMInterface: generated response
    """
    prepared = self.prepare_message(messages)

    # Plain generation: no tools requested.
    if "tools_pydantic" not in kwargs:
        generation = self._obj.generate(messages=[prepared], **kwargs)
        return self.prepare_response(generation)

    # Tool calling: bind the tools and collect the calls from the reply.
    tool_specs = kwargs.pop("tools_pydantic")
    tool_model = self._obj.bind_tools(tool_specs)
    reply = tool_model.invoke(prepared, **self._get_tool_call_kwargs())

    # Prefer the parsed `tool_calls`; otherwise use the raw entry, if any.
    requested_calls = reply.tool_calls or reply.additional_kwargs.get(
        "tool_calls", []
    )
    return LLMInterface(
        content="",
        additional_kwargs={"tool_calls": requested_calls},
    )