Skip to content

Agents

BaseAgent

Bases: BaseComponent

Define base agent interface

Source code in libs/kotaemon/kotaemon/agents/base.py
class BaseAgent(BaseComponent):
    """Define base agent interface.

    Concrete agents implement ``run``; they may also override ``add_tools``
    to refresh any state derived from ``plugins``.
    """

    name: str = Param(help="Name of the agent.")
    agent_type: AgentType = Param(help="Agent type, must be one of AgentType")
    description: str = Param(
        help=(
            "Description used to tell the model how/when/why to use the agent. You can"
            " provide few-shot examples as a part of the description. This will be"
            " input to the prompt of LLM."
        )
    )
    llm: Optional[BaseLLM] = Node(
        help=(
            "LLM to be used for the agent (optional). LLM must implement BaseLLM"
            " interface."
        )
    )
    prompt_template: Optional[Union[PromptTemplate, dict[str, PromptTemplate]]] = Param(
        help="A prompt template or a dict to supply different prompt to the agent"
    )
    plugins: list[BaseTool] = Param(
        default_callback=lambda _: [],
        help="List of plugins / tools to be used in the agent",
    )

    @staticmethod
    def safeguard_run(run_func, *args, **kwargs):
        """Decorate a ``run`` method so exceptions become a failed AgentOutput.

        Any ``Exception`` raised by ``run_func`` is caught and returned as an
        ``AgentOutput`` with ``status="failed"``, empty text and the exception
        message in ``error``, instead of propagating to the caller.

        Args:
            run_func: The unbound ``run`` method to wrap.
            *args, **kwargs: Accepted for backward compatibility; unused.

        Returns:
            The wrapping function. It keeps ``run_func``'s name and docstring
            via ``functools.wraps``.
        """
        import functools

        @functools.wraps(run_func)
        def wrapper(self, *args, **kwargs):
            try:
                return run_func(self, *args, **kwargs)
            except Exception as e:
                return AgentOutput(
                    text="",
                    agent_type=self.agent_type,
                    status="failed",
                    error=str(e),
                )

        return wrapper

    def add_tools(self, tools: list[BaseTool]) -> None:
        """Helper method to add tools and update agent state if needed.

        The base implementation only extends ``self.plugins`` in place.
        """
        self.plugins.extend(tools)

    def run(self, *args, **kwargs) -> AgentOutput | list[AgentOutput]:
        """Run the component.

        Raises:
            NotImplementedError: always; concrete agents must override this.
        """
        raise NotImplementedError()

add_tools

add_tools(tools)

Helper method to add tools and update agent state if needed

Source code in libs/kotaemon/kotaemon/agents/base.py
def add_tools(self, tools: list[BaseTool]) -> None:
    """Append ``tools`` to this agent's ``plugins`` list in place.

    Subclasses can override this to refresh state derived from the tools.
    """
    current_plugins = self.plugins
    current_plugins.extend(tools)

run

run(*args, **kwargs)

Run the component.

Source code in libs/kotaemon/kotaemon/agents/base.py
def run(self, *args, **kwargs) -> AgentOutput | list[AgentOutput]:
    """Run the component.

    This base implementation is abstract: concrete agents must override it.

    Raises:
        NotImplementedError: always.
    """
    raise NotImplementedError

AgentFinish

Bases: NamedTuple

Agent's return value when finishing execution.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `return_values` | `dict` | The return values of the agent. | required |
| `log` | `str` | The log message. | required |
Source code in libs/kotaemon/kotaemon/agents/io/base.py
class AgentFinish(NamedTuple):
    """Agent's return value when finishing execution.

    Args:
        return_values: The return values of the agent. ReactAgent stores its
            final answer under the ``"output"`` key.
        log: The log message. ReactAgent passes the raw LLM response text.
    """

    # Field order defines the tuple layout: AgentFinish(return_values, log).
    return_values: dict
    log: str

AgentOutput

Bases: LLMInterface

Output from an agent.

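Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `text` | `str` | The text output from the agent. | required |
| `agent_type` | `AgentType` | The type of agent. | required |
| `status` | `Literal["thinking", "finished", "stopped", "failed"]` | The status after executing the agent. | required |
| `error` | `Optional[str]` | The error message, if any. | `None` |
| `type` | `str` | Output type marker. | `"agent"` |
| `intermediate_steps` | `Optional[list]` | Intermediate steps recorded by the agent, if any. | `None` |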
Source code in libs/kotaemon/kotaemon/agents/io/base.py
class AgentOutput(LLMInterface):
    """Output from an agent.

    Args:
        text: The text output from the agent.
        agent_type: The type of agent.
        status: The status after executing the agent; one of "thinking",
            "finished", "stopped" or "failed".
        error: The error message if any (defaults to None).
        intermediate_steps: Intermediate steps recorded by the agent, if any
            (defaults to None).
    """

    # Accept extra keyword fields that are not declared on this model.
    model_config = ConfigDict(extra="allow")

    text: str
    # Defaults to "agent" for every AgentOutput.
    type: str = "agent"
    agent_type: AgentType
    status: Literal["thinking", "finished", "stopped", "failed"]
    error: Optional[str] = None
    intermediate_steps: Optional[list] = None

AgentType

Bases: Enum

Enumerated type for agent types.

Source code in libs/kotaemon/kotaemon/agents/io/base.py
class AgentType(Enum):
    """
    Enumerated type for agent types.

    Each member's value is its own lowercase name as a string. Iteration
    follows declaration order.
    """

    # LangchainAgent supports only openai, openai_multi, react and self_ask
    # (see its AGENT_TYPE_MAP); ReactAgent defaults to ``react``.
    openai = "openai"
    openai_multi = "openai_multi"
    openai_tool = "openai_tool"
    self_ask = "self_ask"
    react = "react"
    rewoo = "rewoo"
    vanilla = "vanilla"

BaseScratchPad

Base class for output handlers.

Attributes:

logger : the `logging` module, used to log messages.

Methods:

stop(): Stop the output.

update_status(output: str, **kwargs): Update the status of the output.

thinking(name: str): Log that a process is thinking.

done(_all=False): Log that the process is done.

stream_print(item: str): Not implemented.

json_print(item: Dict[str, Any]): Log a JSON object.

panel_print(item: Any, title: str = "Output", stream: bool = False): Log a panel output.

clear(): Not implemented.

print(content: str, **kwargs): Log arbitrary content.

format_json(json_obj: str): Format a JSON object.

debug(content: str, **kwargs): Log a debug message.

info(content: str, **kwargs): Log an informational message.

warning(content: str, **kwargs): Log a warning message.

error(content: str, **kwargs): Log an error message.

critical(content: str, **kwargs): Log a critical message.

Source code in libs/kotaemon/kotaemon/agents/io/base.py
class BaseScratchPad:
    """
    Base class for output handlers.

    Every logging helper only emits when ``check_log()`` returns a truthy
    value. ``print`` and non-streaming ``panel_print`` calls also record their
    content in ``self.log``.

    Attributes:
    -----------
    logger : module
        The ``logging`` module, used to log messages.
    log : list
        Items recorded by ``print`` and non-streaming ``panel_print`` calls.

    Methods:
    --------
    stop():
        Stop the output (no-op here).

    update_status(output: str, **kwargs):
        Update the status of the output.

    thinking(name: str):
        Log that a process is thinking.

    done(_all=False):
        Log that the process is done.

    stream_print(item: str):
        Not implemented.

    json_print(item: Dict[str, Any]):
        Log a JSON object.

    panel_print(item: Any, title: str = "Output", stream: bool = False):
        Log a titled panel output.

    clear():
        Not implemented.

    print(content: str, **kwargs):
        Log arbitrary content.

    format_json(json_obj: Any):
        Format a JSON-serializable object.

    debug(content: str, **kwargs):
        Log a debug message.

    info(content: str, **kwargs):
        Log an informational message.

    warning(content: str, **kwargs):
        Log a warning message.

    error(content: str, **kwargs):
        Log an error message.

    critical(content: str, **kwargs):
        Log a critical message.
    """

    def __init__(self):
        """
        Initialize the BaseScratchPad object.

        """
        # The ``logging`` module itself (root-logger helpers), not a Logger.
        self.logger = logging
        self.log = []

    def stop(self):
        """
        Stop the output.
        """

    def update_status(self, output: str, **kwargs):
        """
        Update the status of the output.
        """
        if check_log():
            self.logger.info(output)

    def thinking(self, name: str):
        """
        Log that a process is thinking.
        """
        if check_log():
            self.logger.info(f"{name} is thinking...")

    def done(self, _all=False):
        """
        Log that the process is done.
        """

        if check_log():
            self.logger.info("Done")

    def stream_print(self, item: str):
        """
        Stream print.
        """

    def json_print(self, item: Dict[str, Any]):
        """
        Log a JSON object.
        """
        if check_log():
            self.logger.info(json.dumps(item, indent=2))

    def panel_print(self, item: Any, title: str = "Output", stream: bool = False):
        """
        Log a panel output.

        The panel is framed by dashed lines, and the top line embeds ``title``.

        Args:
            item : Any
                The item to log.
            title : str, optional
                The title of the panel, defaults to "Output".
            stream : bool, optional
                If False (the default), ``item`` is also appended to ``self.log``.
        """
        if not stream:
            self.log.append(item)
        if check_log():
            # Embed the title in the header so panels can be told apart.
            self.logger.info(f" {title} ".center(20, "-"))
            self.logger.info(item)
            self.logger.info("-" * 20)

    def clear(self):
        """
        Not implemented.
        """

    def print(self, content: str, **kwargs):
        """
        Log arbitrary content.
        """
        self.log.append(content)
        if check_log():
            self.logger.info(content)

    def format_json(self, json_obj: Any):
        """
        Format a JSON-serializable object as JSON text indented by 2 spaces.

        Note: a ``str`` argument is encoded as a JSON string literal, not
        parsed and re-indented.
        """
        formatted_json = json.dumps(json_obj, indent=2)
        return formatted_json

    def debug(self, content: str, **kwargs):
        """
        Log a debug message.
        """
        if check_log():
            self.logger.debug(content, **kwargs)

    def info(self, content: str, **kwargs):
        """
        Log an informational message.
        """
        if check_log():
            self.logger.info(content, **kwargs)

    def warning(self, content: str, **kwargs):
        """
        Log a warning message.
        """
        if check_log():
            self.logger.warning(content, **kwargs)

    def error(self, content: str, **kwargs):
        """
        Log an error message.
        """
        if check_log():
            self.logger.error(content, **kwargs)

    def critical(self, content: str, **kwargs):
        """
        Log a critical message.
        """
        if check_log():
            self.logger.critical(content, **kwargs)

stop

stop()

Stop the output.

Source code in libs/kotaemon/kotaemon/agents/io/base.py
def stop(self):
    """Stop the output.

    This base implementation does nothing.
    """
    return None

update_status

update_status(output, **kwargs)

Update the status of the output.

Source code in libs/kotaemon/kotaemon/agents/io/base.py
def update_status(self, output: str, **kwargs):
    """Report a status update by logging ``output`` at INFO level.

    Extra keyword arguments are accepted but ignored. Nothing is logged
    unless ``check_log()`` is truthy.
    """
    if not check_log():
        return
    self.logger.info(output)

thinking

thinking(name)

Log that a process is thinking.

Source code in libs/kotaemon/kotaemon/agents/io/base.py
def thinking(self, name: str):
    """Log at INFO level that ``name`` is thinking.

    Only logs when ``check_log()`` is truthy.
    """
    if not check_log():
        return
    self.logger.info(f"{name} is thinking...")

done

done(_all=False)

Log that the process is done.

Source code in libs/kotaemon/kotaemon/agents/io/base.py
def done(self, _all=False):
    """Log "Done" at INFO level when ``check_log()`` is truthy.

    ``_all`` is accepted but not used here.
    """
    if not check_log():
        return
    self.logger.info("Done")

stream_print

stream_print(item)

Stream print.

Source code in libs/kotaemon/kotaemon/agents/io/base.py
def stream_print(self, item: str):
    """Stream-print hook; this base implementation ignores ``item``."""
    return None

json_print

json_print(item)

Log a JSON object.

Source code in libs/kotaemon/kotaemon/agents/io/base.py
def json_print(self, item: Dict[str, Any]):
    """Log ``item`` as JSON indented by two spaces, at INFO level.

    Serialization only happens when ``check_log()`` is truthy.
    """
    if not check_log():
        return
    pretty = json.dumps(item, indent=2)
    self.logger.info(pretty)

panel_print

panel_print(item, title='Output', stream=False)

Log a panel output.

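Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `item` | `Any` | The item to log. | required |
| `title` | `str` | The title of the panel. | `'Output'` |
| `stream` | `bool` | When false, the item is also appended to `self.log`. | `False` |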
Source code in libs/kotaemon/kotaemon/agents/io/base.py
def panel_print(self, item: Any, title: str = "Output", stream: bool = False):
    """
    Log a panel output.

    The panel is framed by dashed lines, and the top line embeds ``title``.

    Args:
        item : Any
            The item to log.
        title : str, optional
            The title of the panel, defaults to "Output".
        stream : bool, optional
            If False (the default), ``item`` is also appended to ``self.log``.
    """
    if not stream:
        self.log.append(item)
    if check_log():
        # Embed the title in the header so panels can be told apart.
        self.logger.info(f" {title} ".center(20, "-"))
        self.logger.info(item)
        self.logger.info("-" * 20)

clear

clear()

Not implemented.

Source code in libs/kotaemon/kotaemon/agents/io/base.py
def clear(self):
    """Not implemented: this base implementation does nothing."""
    return None

print

print(content, **kwargs)

Log arbitrary content.

Source code in libs/kotaemon/kotaemon/agents/io/base.py
def print(self, content: str, **kwargs):
    """Record ``content`` in ``self.log`` and, if enabled, log it at INFO level.

    Keyword arguments are accepted but ignored.
    """
    history = self.log
    history.append(content)
    if not check_log():
        return
    self.logger.info(content)

format_json

format_json(json_obj)

Format a JSON object.

Source code in libs/kotaemon/kotaemon/agents/io/base.py
def format_json(self, json_obj: Any):
    """
    Format a JSON-serializable object as JSON text indented by 2 spaces.

    Args:
        json_obj: Any JSON-serializable value (dict, list, number, ...).
            A ``str`` argument is encoded as a JSON string literal (quoted),
            not parsed and re-indented.

    Returns:
        str: The indented JSON text.
    """
    formatted_json = json.dumps(json_obj, indent=2)
    return formatted_json

debug

debug(content, **kwargs)

Log a debug message.

Source code in libs/kotaemon/kotaemon/agents/io/base.py
def debug(self, content: str, **kwargs):
    """Emit ``content`` at DEBUG level via ``self.logger``.

    ``kwargs`` are forwarded to the logger call. Nothing is emitted unless
    ``check_log()`` is truthy.
    """
    if not check_log():
        return
    self.logger.debug(content, **kwargs)

info

info(content, **kwargs)

Log an informational message.

Source code in libs/kotaemon/kotaemon/agents/io/base.py
def info(self, content: str, **kwargs):
    """Emit ``content`` at INFO level via ``self.logger``.

    ``kwargs`` are forwarded to the logger call. Nothing is emitted unless
    ``check_log()`` is truthy.
    """
    if not check_log():
        return
    self.logger.info(content, **kwargs)

warning

warning(content, **kwargs)

Log a warning message.

Source code in libs/kotaemon/kotaemon/agents/io/base.py
def warning(self, content: str, **kwargs):
    """Emit ``content`` at WARNING level via ``self.logger``.

    ``kwargs`` are forwarded to the logger call. Nothing is emitted unless
    ``check_log()`` is truthy.
    """
    if not check_log():
        return
    self.logger.warning(content, **kwargs)

error

error(content, **kwargs)

Log an error message.

Source code in libs/kotaemon/kotaemon/agents/io/base.py
def error(self, content: str, **kwargs):
    """Emit ``content`` at ERROR level via ``self.logger``.

    ``kwargs`` are forwarded to the logger call. Nothing is emitted unless
    ``check_log()`` is truthy.
    """
    if not check_log():
        return
    self.logger.error(content, **kwargs)

critical

critical(content, **kwargs)

Log a critical message.

Source code in libs/kotaemon/kotaemon/agents/io/base.py
def critical(self, content: str, **kwargs):
    """Emit ``content`` at CRITICAL level via ``self.logger``.

    ``kwargs`` are forwarded to the logger call. Nothing is emitted unless
    ``check_log()`` is truthy.
    """
    if not check_log():
        return
    self.logger.critical(content, **kwargs)

LangchainAgent

Bases: BaseAgent

Wrapper for Langchain Agent

Source code in libs/kotaemon/kotaemon/agents/langchain_based.py
class LangchainAgent(BaseAgent):
    """Wrapper for Langchain Agent.

    Converts the agent's plugins and LLM to LangChain objects and delegates
    execution to a LangChain AgentExecutor created by ``initialize_agent``.
    Only the agent types listed in ``AGENT_TYPE_MAP`` are supported.
    """

    name: str = "LangchainAgent"
    agent_type: AgentType
    description: str = "LangchainAgent for answering multi-step reasoning questions"
    # kotaemon AgentType -> LangChain AgentType for the supported agent kinds.
    AGENT_TYPE_MAP = {
        AgentType.openai: LCAgentType.OPENAI_FUNCTIONS,
        AgentType.openai_multi: LCAgentType.OPENAI_MULTI_FUNCTIONS,
        AgentType.react: LCAgentType.ZERO_SHOT_REACT_DESCRIPTION,
        AgentType.self_ask: LCAgentType.SELF_ASK_WITH_SEARCH,
    }
    # Built by ``update_agent_tools``; stays None until an executor is created.
    agent: Optional[LCAgentExecutor] = None

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        if self.agent_type not in self.AGENT_TYPE_MAP:
            raise NotImplementedError(
                f"AgentType {self.agent_type} not supported by Langchain wrapper"
            )
        self.update_agent_tools()

    def update_agent_tools(self):
        """(Re)build the LangChain AgentExecutor from ``plugins`` and ``llm``.

        For self-ask agents, the first tool named ``search_doc`` is renamed to
        "Intermediate Answer" and becomes the only tool passed to LangChain.
        If no such tool exists, ``self.agent`` is left unchanged.

        Raises:
            AssertionError: if ``self.llm`` is neither a ChatLLM nor an LLM.
        """
        assert isinstance(self.llm, (ChatLLM, LLM))
        langchain_plugins = [tool.to_langchain_format() for tool in self.plugins]

        # a fix for search_doc tool name:
        # use "Intermediate Answer" for self-ask agent
        found_search_tool = False
        if self.agent_type == AgentType.self_ask:
            for plugin in langchain_plugins:
                if plugin.name == "search_doc":
                    plugin.name = "Intermediate Answer"
                    langchain_plugins = [plugin]
                    found_search_tool = True
                    break

        if self.agent_type != AgentType.self_ask or found_search_tool:
            # reinit Langchain AgentExecutor
            self.agent = initialize_agent(
                langchain_plugins,
                self.llm.to_langchain_format(),
                agent=self.AGENT_TYPE_MAP[self.agent_type],
                handle_parsing_errors=True,
                verbose=True,
            )

    def add_tools(self, tools: list[BaseTool]) -> None:
        """Add ``tools`` to the plugins and rebuild the LangChain executor."""
        super().add_tools(tools)
        self.update_agent_tools()

    def run(self, instruction: str) -> AgentOutput:
        """Run the LangChain executor on ``instruction``.

        Returns:
            AgentOutput with status "finished" whose text is the executor's
            ``"output"`` value.

        Raises:
            AssertionError: if the executor has not been initialized.
        """
        assert (
            self.agent is not None
        ), "Langchain AgentExecutor is not correctly initialized"

        # Langchain AgentExecutor call
        output = self.agent(instruction)["output"]

        return AgentOutput(
            text=output,
            agent_type=self.agent_type,
            status="finished",
        )

ReactAgent

Bases: BaseAgent

Sequential ReAct agent derived from BaseAgent, implementing the ReAct agent paradigm (https://arxiv.org/pdf/2210.03629.pdf).

Source code in libs/kotaemon/kotaemon/agents/react/agent.py
class ReactAgent(BaseAgent):
    """
    Sequential ReAct agent derived from BaseAgent.

    Implements the ReAct agent paradigm https://arxiv.org/pdf/2210.03629.pdf.
    At each step, the LLM is prompted with the tool descriptions and a
    scratchpad of earlier actions and observations. Its response is parsed
    into either a tool call (``AgentAction``) or a final answer
    (``AgentFinish``). Tool results are fed back as observations until a final
    answer is produced or the iteration limit is reached.
    """

    name: str = "ReactAgent"
    agent_type: AgentType = AgentType.react
    description: str = "ReactAgent for answering multi-step reasoning questions"
    llm: BaseLLM
    prompt_template: Optional[PromptTemplate] = None
    output_lang: str = "English"
    plugins: list[BaseTool] = Param(
        default_callback=lambda _: [], help="List of tools to be used in the agent. "
    )
    examples: dict[str, str | list[str]] = Param(
        default_callback=lambda _: {}, help="Examples to be used in the agent. "
    )
    intermediate_steps: list[tuple[AgentAction | AgentFinish, str]] = Param(
        default_callback=lambda _: [],
        help="List of AgentAction and observation (tool) output",
    )
    # Iteration budget used by ``run``/``stream`` when none is passed.
    max_iterations: int = 5
    # If True, LLM output with neither an action nor a final answer raises
    # instead of being treated as the final answer.
    strict_decode: bool = False
    max_context_length: int = Param(
        default=3000,
        help="Max context length for each tool output.",
    )
    # Optional custom splitter for ``_trim``; when None, a tiktoken-based
    # TokenSplitter is built on each call.
    trim_func: TokenSplitter | None = None

    def _compose_plugin_description(self) -> str:
        """
        Compose the worker prompt from the workers.

        Example:
        toolname1[input]: tool1 description
        toolname2[input]: tool2 description

        Raises:
            ValueError: if reading a plugin's name or description fails.
        """
        prompt = ""
        try:
            for plugin in self.plugins:
                prompt += f"{plugin.name}[input]: {plugin.description}\n"
        except Exception as e:
            raise ValueError("Worker must have a name and description.") from e
        return prompt

    def _construct_scratchpad(
        self,
        intermediate_steps: Optional[
            list[tuple[AgentAction | AgentFinish, str]]
        ] = None,
    ) -> str:
        """Construct the scratchpad that lets the agent continue its thought process.

        Each step contributes the action's raw log, followed by an
        ``Observation:`` line with the tool output and a trailing ``Thought:``.
        Passing None (the default) yields an empty scratchpad.
        """
        if intermediate_steps is None:
            intermediate_steps = []
        thoughts = ""
        for action, observation in intermediate_steps:
            thoughts += action.log
            thoughts += f"\nObservation: {observation}\nThought:"
        return thoughts

    def _parse_output(self, text: str) -> Optional[AgentAction | AgentFinish]:
        """
        Parse text output from LLM for the next Action or Final Answer
        Using Regex to parse "Action:\n Action Input:\n" for the next Action
        Using FINAL_ANSWER_ACTION to parse Final Answer

        Args:
            text[str]: input text to parse

        Returns:
            An AgentAction for a tool call, or an AgentFinish for a final
            answer. When ``strict_decode`` is False, unparseable text is also
            returned as an AgentFinish.

        Raises:
            Exception: if the text holds both an action and a final answer,
                or cannot be parsed while ``strict_decode`` is True.
        """
        includes_answer = FINAL_ANSWER_ACTION in text
        regex = (
            r"Action\s*\d*\s*:[\s]*(.*?)[\s]*Action\s*\d*\s*Input\s*\d*\s*:[\s]*(.*)"
        )
        action_match = re.search(regex, text, re.DOTALL)
        action_output: Optional[AgentAction | AgentFinish] = None
        if action_match:
            if includes_answer:
                raise Exception(
                    "Parsing LLM output produced both a final answer "
                    f"and a parse-able action: {text}"
                )
            action = action_match.group(1).strip()
            action_input = action_match.group(2)
            tool_input = action_input.strip(" ")
            # ensure if its a well formed SQL query we don't remove any trailing " chars
            if not tool_input.startswith("SELECT "):
                tool_input = tool_input.strip('"')

            action_output = AgentAction(action, tool_input, text)

        elif includes_answer:
            action_output = AgentFinish(
                {"output": text.split(FINAL_ANSWER_ACTION)[-1].strip()}, text
            )
        else:
            if self.strict_decode:
                raise Exception(f"Could not parse LLM output: `{text}`")
            else:
                action_output = AgentFinish({"output": text}, text)

        return action_output

    def _compose_prompt(self, instruction) -> str:
        """
        Compose the prompt from template, worker description, examples and instruction.

        If no ``prompt_template`` is set, ``zero_shot_react_prompt`` is assigned
        to ``self.prompt_template`` on first use.
        """
        agent_scratchpad = self._construct_scratchpad(self.intermediate_steps)
        tool_description = self._compose_plugin_description()
        tool_names = ", ".join([plugin.name for plugin in self.plugins])
        if self.prompt_template is None:
            from .prompt import zero_shot_react_prompt

            self.prompt_template = zero_shot_react_prompt
        return self.prompt_template.populate(
            instruction=instruction,
            agent_scratchpad=agent_scratchpad,
            tool_description=tool_description,
            tool_names=tool_names,
            lang=self.output_lang,
        )

    def _format_function_map(self) -> dict[str, BaseTool]:
        """Map each plugin's name to the plugin object.

        Return:
            dict[str, BaseTool]: The function map. For duplicate names, the
            last plugin wins.
        """
        return {plugin.name: plugin for plugin in self.plugins}

    def _trim(self, text: str | Document) -> str:
        """
        Trim the text to the maximum token length.

        Uses ``trim_func`` if set; otherwise a TokenSplitter with
        ``chunk_size=max_context_length`` and the gpt-3.5-turbo tiktoken
        encoding. Returns the text of the first resulting chunk.

        Raises:
            ValueError: if ``text`` is neither a str nor a Document.
        """
        evidence_trim_func = (
            self.trim_func
            if self.trim_func
            else TokenSplitter(
                chunk_size=self.max_context_length,
                chunk_overlap=0,
                separator=" ",
                tokenizer=partial(
                    tiktoken.encoding_for_model("gpt-3.5-turbo").encode,
                    allowed_special=set(),
                    disallowed_special="all",
                ),
            )
        )
        if isinstance(text, str):
            texts = evidence_trim_func([Document(text=text)])
        elif isinstance(text, Document):
            texts = evidence_trim_func([text])
        else:
            raise ValueError("Invalid text type to trim")
        trim_text = texts[0].text
        logging.info(f"len (trimmed): {len(trim_text)}")
        return trim_text

    def clear(self):
        """
        Clear and reset the agent.
        """
        self.intermediate_steps = []

    def run(self, instruction, max_iterations=None) -> AgentOutput:
        """
        Run the agent with the given instruction.

        Args:
            instruction: Instruction to run the agent with.
            max_iterations: Maximum number of iterations of reasoning steps.
                When None (or 0), ``self.max_iterations`` is used.

        Return:
            AgentOutput object. Its status is "finished" if a final answer was
            parsed, otherwise "stopped". Its text is the last raw LLM response.

        Raises:
            ValueError: if the LLM output cannot be turned into an action.
            KeyError: if the LLM names a tool that is not in ``plugins``.
        """
        if not max_iterations:
            max_iterations = self.max_iterations
        assert max_iterations > 0

        self.clear()
        logging.info(f"Running {self.name} with instruction: {instruction}")
        # NOTE(review): never updated below, so 0 cost/tokens are reported.
        total_cost = 0.0
        total_token = 0
        status = "failed"
        response_text = None

        for step_count in range(1, max_iterations + 1):
            prompt = self._compose_prompt(instruction)
            logging.info(f"Prompt: {prompt}")
            response = self.llm(
                prompt, stop=["Observation:"]
            )  # could cause bugs if llm doesn't have `stop` as a parameter
            response_text = response.text
            logging.info(f"Response: {response_text}")
            action_step = self._parse_output(response_text)
            if action_step is None:
                raise ValueError("Invalid action")
            is_finished_chain = isinstance(action_step, AgentFinish)
            if is_finished_chain:
                result = ""
            else:
                assert isinstance(action_step, AgentAction)
                action_name = action_step.tool
                tool_input = action_step.tool_input
                logging.info(f"Action: {action_name}")
                logging.info(f"Tool Input: {tool_input}")
                result = self._format_function_map()[action_name](tool_input)

                # trim the worker output to max_context_length tokens, as we
                # are appending all workers' logs and it can exceed the token
                # limit if we don't limit each. Tune it to the LLM capacity.
                result = self._trim(result)
                logging.info(f"Result: {result}")

            self.intermediate_steps.append((action_step, result))
            if is_finished_chain:
                logging.info(f"Finished after {step_count} steps.")
                status = "finished"
                break
        else:
            status = "stopped"

        return AgentOutput(
            text=response_text,
            agent_type=self.agent_type,
            status=status,
            total_tokens=total_token,
            total_cost=total_cost,
            intermediate_steps=self.intermediate_steps,
            max_iterations=max_iterations,
        )

    def stream(self, instruction, max_iterations=None):
        """
        Stream the agent with the given instruction.

        Args:
            instruction: Instruction to run the agent with.
            max_iterations: Maximum number of iterations of reasoning steps.
                When None (or 0), ``self.max_iterations`` is used.

        Yields:
            One AgentOutput per step. Tool steps have status "thinking". The
            final step has status "finished" and the answer text, or "stopped"
            if the iteration limit is reached.

        Return:
            A summary AgentOutput, carried by the generator's StopIteration.
        """
        if not max_iterations:
            max_iterations = self.max_iterations
        assert max_iterations > 0

        self.clear()
        logging.info(f"Running {self.name} with instruction: {instruction}")
        print(f"Running {self.name} with instruction: {instruction}")
        # NOTE(review): never updated below, so 0 cost/tokens are reported.
        total_cost = 0.0
        total_token = 0
        status = "failed"
        response_text = None

        for step_count in range(1, max_iterations + 1):
            prompt = self._compose_prompt(instruction)
            logging.info(f"Prompt: {prompt}")
            print(f"Prompt: {prompt}")
            response = self.llm(
                prompt, stop=["Observation:"]
            )  # TODO: could cause bugs if llm doesn't have `stop` as a parameter
            response_text = response.text
            logging.info(f"Response: {response_text}")
            print(f"Response: {response_text}")
            action_step = self._parse_output(response_text)
            if action_step is None:
                raise ValueError("Invalid action")
            is_finished_chain = isinstance(action_step, AgentFinish)
            if is_finished_chain:
                result = response_text
                if "Final Answer:" in response_text:
                    result = response_text.split("Final Answer:")[-1].strip()
            else:
                assert isinstance(action_step, AgentAction)
                action_name = action_step.tool
                tool_input = action_step.tool_input
                logging.info(f"Action: {action_name}")
                print(f"Action: {action_name}")
                logging.info(f"Tool Input: {tool_input}")
                print(f"Tool Input: {tool_input}")
                result = self._format_function_map()[action_name](tool_input)

                # trim the worker output to max_context_length tokens, as we
                # are appending all workers' logs and it can exceed the token
                # limit if we don't limit each. Tune it to the LLM capacity.
                result = self._trim(result)
                logging.info(f"Result: {result}")
                print(f"Result: {result}")

            self.intermediate_steps.append((action_step, result))
            if is_finished_chain:
                logging.info(f"Finished after {step_count} steps.")
                status = "finished"
                yield AgentOutput(
                    text=result,
                    agent_type=self.agent_type,
                    status=status,
                    intermediate_steps=self.intermediate_steps[-1],
                )
                break
            else:
                yield AgentOutput(
                    text="",
                    agent_type=self.agent_type,
                    status="thinking",
                    intermediate_steps=self.intermediate_steps[-1],
                )

        else:
            status = "stopped"
            yield AgentOutput(
                text="",
                agent_type=self.agent_type,
                status=status,
                intermediate_steps=self.intermediate_steps[-1],
            )

        return AgentOutput(
            text=response_text,
            agent_type=self.agent_type,
            status=status,
            total_tokens=total_token,
            total_cost=total_cost,
            intermediate_steps=self.intermediate_steps,
            max_iterations=max_iterations,
        )

clear

clear()

Clear and reset the agent.

Source code in libs/kotaemon/kotaemon/agents/react/agent.py
def clear(self):
    """Reset the agent by discarding every recorded intermediate step."""
    fresh_steps = []
    self.intermediate_steps = fresh_steps

run

run(instruction, max_iterations=None)

Run the agent with the given instruction.

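Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `instruction` | | Instruction to run the agent with. | required |
| `max_iterations` | | Maximum number of reasoning iterations. When `None` (or 0), the agent's `max_iterations` field is used (5 by default). | `None` |

Returns:

An `AgentOutput` object.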

Source code in libs/kotaemon/kotaemon/agents/react/agent.py
def run(self, instruction, max_iterations=None) -> AgentOutput:
    """
    Run the agent with the given instruction.

    Each iteration composes a prompt from the scratchpad, asks the LLM, and
    parses the reply. The loop ends on a final answer or after
    ``max_iterations`` steps.

    Args:
        instruction: Instruction to run the agent with.
        max_iterations: Maximum number of iterations of reasoning steps.
            When None (or 0), ``self.max_iterations`` is used.

    Return:
        AgentOutput object. Its status is "finished" if a final answer was
        parsed, otherwise "stopped". Its text is the last raw LLM response.

    Raises:
        ValueError: if the LLM output cannot be turned into an action.
        KeyError: if the LLM names a tool that is not in ``plugins``.
    """
    if not max_iterations:
        max_iterations = self.max_iterations
    assert max_iterations > 0

    self.clear()
    logging.info(f"Running {self.name} with instruction: {instruction}")
    # NOTE(review): never updated below, so 0 cost/tokens are reported.
    total_cost = 0.0
    total_token = 0
    status = "failed"
    response_text = None

    for step_count in range(1, max_iterations + 1):
        prompt = self._compose_prompt(instruction)
        logging.info(f"Prompt: {prompt}")
        response = self.llm(
            prompt, stop=["Observation:"]
        )  # could cause bugs if llm doesn't have `stop` as a parameter
        response_text = response.text
        logging.info(f"Response: {response_text}")
        action_step = self._parse_output(response_text)
        if action_step is None:
            raise ValueError("Invalid action")
        is_finished_chain = isinstance(action_step, AgentFinish)
        if is_finished_chain:
            result = ""
        else:
            assert isinstance(action_step, AgentAction)
            action_name = action_step.tool
            tool_input = action_step.tool_input
            logging.info(f"Action: {action_name}")
            logging.info(f"Tool Input: {tool_input}")
            # A tool name not present among the plugins raises KeyError here.
            result = self._format_function_map()[action_name](tool_input)

            # trim the worker output to max_context_length tokens, as we are
            # appending all workers' logs and it can exceed the token limit if
            # we don't limit each. Tune this limit to the LLM capacity.
            result = self._trim(result)
            logging.info(f"Result: {result}")

        self.intermediate_steps.append((action_step, result))
        if is_finished_chain:
            logging.info(f"Finished after {step_count} steps.")
            status = "finished"
            break
    else:
        # Loop exhausted without a final answer.
        status = "stopped"

    return AgentOutput(
        text=response_text,
        agent_type=self.agent_type,
        status=status,
        total_tokens=total_token,
        total_cost=total_cost,
        intermediate_steps=self.intermediate_steps,
        max_iterations=max_iterations,
    )

stream

stream(instruction, max_iterations=None)

Stream the agent with the given instruction.

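Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `instruction` | | Instruction to run the agent with. | required |
| `max_iterations` | | Maximum number of reasoning iterations. When `None` (or 0), the agent's `max_iterations` field is used (5 by default). | `None` |

Yields:

One `AgentOutput` per reasoning step: status `thinking` for tool steps, then `finished` with the final answer, or `stopped` if the iteration limit is reached.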

Source code in libs/kotaemon/kotaemon/agents/react/agent.py
def stream(self, instruction, max_iterations=None):
    """
    Stream the agent with the given instruction.

    One AgentOutput is yielded per reasoning step:
    ``status="thinking"`` after a tool call, ``status="finished"`` (with the
    final answer as ``text``) once the LLM produces a final answer, or
    ``status="stopped"`` when ``max_iterations`` steps pass without one.

    Args:
        instruction: Instruction to run the agent with.
        max_iterations: Maximum number of iterations of reasoning steps;
            falls back to ``self.max_iterations`` when not given (or falsy).

    Returns:
        A generator of AgentOutput objects. Its return value (the
        ``StopIteration`` value) is a summary AgentOutput carrying the last raw
        LLM response and all intermediate steps.

    Raises:
        ValueError: if an LLM response cannot be parsed into an action.
    """
    if not max_iterations:
        max_iterations = self.max_iterations
    assert max_iterations > 0

    self.clear()
    logging.info(f"Running {self.name} with instruction: {instruction}")
    total_cost = 0.0
    total_token = 0
    status = "failed"
    response_text = None

    for step_count in range(1, max_iterations + 1):
        prompt = self._compose_prompt(instruction)
        logging.info(f"Prompt: {prompt}")
        response = self.llm(
            prompt, stop=["Observation:"]
        )  # TODO: could cause bugs if llm doesn't have `stop` as a parameter
        response_text = response.text
        logging.info(f"Response: {response_text}")
        action_step = self._parse_output(response_text)
        if action_step is None:
            raise ValueError("Invalid action")
        is_finished_chain = isinstance(action_step, AgentFinish)
        if is_finished_chain:
            # Expose only the answer part of the response when it is marked.
            result = response_text
            if "Final Answer:" in response_text:
                result = response_text.split("Final Answer:")[-1].strip()
        else:
            assert isinstance(action_step, AgentAction)
            action_name = action_step.tool
            tool_input = action_step.tool_input
            logging.info(f"Action: {action_name}")
            logging.info(f"Tool Input: {tool_input}")
            result = self._format_function_map()[action_name](tool_input)

            # trim the worker output to 1000 tokens, as we are appending
            # all workers' logs and it can exceed the token limit if we
            # don't limit each. Fix this number regarding to the LLM capacity.
            result = self._trim(result)
            logging.info(f"Result: {result}")

        self.intermediate_steps.append((action_step, result))
        if is_finished_chain:
            logging.info(f"Finished after {step_count} steps.")
            status = "finished"
            yield AgentOutput(
                text=result,
                agent_type=self.agent_type,
                status=status,
                intermediate_steps=self.intermediate_steps[-1],
            )
            break
        else:
            yield AgentOutput(
                text="",
                agent_type=self.agent_type,
                status="thinking",
                intermediate_steps=self.intermediate_steps[-1],
            )

    else:
        # The loop ran out of iterations without reaching a final answer.
        status = "stopped"
        yield AgentOutput(
            text="",
            agent_type=self.agent_type,
            status=status,
            intermediate_steps=self.intermediate_steps[-1],
        )

    return AgentOutput(
        text=response_text,
        agent_type=self.agent_type,
        status=status,
        total_tokens=total_token,
        total_cost=total_cost,
        intermediate_steps=self.intermediate_steps,
        max_iterations=max_iterations,
    )

RewooAgent

Bases: BaseAgent

Distributed RewooAgent class inheriting from BaseAgent. It implements the ReWOO paradigm (https://arxiv.org/pdf/2305.18323.pdf).

Source code in libs/kotaemon/kotaemon/agents/rewoo/agent.py
class RewooAgent(BaseAgent):
    """ReWOO agent built on BaseAgent.

    Implements the ReWOO paradigm (https://arxiv.org/pdf/2305.18323.pdf): a
    planner LLM writes the whole plan up front, with evidence placeholders
    (#E1, #E2, ...) standing in for tool results; the tool calls are then run
    level by level following their dependencies (calls within a level run in
    parallel), and a solver LLM composes the final answer from the evidence.
    """

    name: str = "RewooAgent"
    agent_type: AgentType = AgentType.rewoo
    description: str = "RewooAgent for answering multi-step reasoning questions"
    output_lang: str = "English"
    planner_llm: BaseLLM
    solver_llm: BaseLLM
    prompt_template: dict[str, PromptTemplate] = Param(
        default_callback=lambda _: {},
        help="A dict to supply different prompt to the agent.",
    )
    plugins: list[BaseTool] = Param(
        default_callback=lambda _: [], help="A list of plugins to be used in the model."
    )
    examples: dict[str, str | list[str]] = Param(
        default_callback=lambda _: {}, help="Examples to be used in the agent."
    )
    max_context_length: int = Param(
        default=3000,
        help="Max context length for each tool output.",
    )
    trim_func: TokenSplitter | None = None

    @Node.auto(depends_on=["planner_llm", "plugins", "prompt_template", "examples"])
    def planner(self):
        """Planner node using ``planner_llm`` and the "Planner" prompt/examples."""
        return Planner(
            model=self.planner_llm,
            plugins=self.plugins,
            prompt_template=self.prompt_template.get("Planner", None),
            examples=self.examples.get("Planner", None),
        )

    @Node.auto(depends_on=["solver_llm", "prompt_template", "examples"])
    def solver(self):
        """Solver node using ``solver_llm`` and the "Solver" prompt/examples."""
        return Solver(
            model=self.solver_llm,
            prompt_template=self.prompt_template.get("Solver", None),
            examples=self.examples.get("Solver", None),
            output_lang=self.output_lang,
        )

    def _parse_plan_map(
        self, planner_response: str
    ) -> tuple[dict[str, list[str]], dict[str, str]]:
        """
        Parse the planner output into a mapping from plans to evidences.

        The LLM does not always follow the strict one-plan-one-evidence format,
        so every #E line is attached to the most recent #Plan line.
        Example:
            #Plan1
            #E1
            #E2
        should result in: {"#Plan1": ["#E1", "#E2"]}
        Or:
            #Plan1
            #Plan2
            #E1
        should result in: {"#Plan1": [], "#Plan2": ["#E1"]}

        Only lines starting with "#Plan" or "#E" are considered; each must
        contain a ":" separating the key from its description (otherwise a
        ValueError is raised). An #E line before any #Plan line raises KeyError.

        Returns:
            tuple[dict[str, list[str]], dict[str, str]]: the plan -> evidences
            mapping and the plan -> plan description mapping.
        """
        valid_chunk = [
            line
            for line in planner_response.splitlines()
            if line.startswith("#Plan") or line.startswith("#E")
        ]

        plan_to_es: dict[str, list[str]] = dict()
        plans: dict[str, str] = dict()
        prev_key = ""
        for line in valid_chunk:
            key, description = line.split(":", 1)
            key = key.strip()
            if key.startswith("#Plan"):
                plans[key] = description.strip()
                plan_to_es[key] = []
                prev_key = key
            elif key.startswith("#E"):
                plan_to_es[prev_key].append(key)

        return plan_to_es, plans

    def _parse_planner_evidences(
        self, planner_response: str
    ) -> tuple[dict[str, str], list[list[str]]]:
        """
        Parse planner output into evidence tool calls and dependency levels.

        Each "#E<n>: <tool call>" line becomes an entry of the evidence map. A
        tool call mentioning an earlier #E variable depends on it; the
        dependency graph is layered so that every evidence in a level depends
        only on evidences of previous levels.
        Example:
            {
            "#E1": "Tool1", "#E2": "Tool2",
            "#E3": "Tool3", "#E4": "Tool4"
            }, [[#E1, #E2], [#E3, #E4]]

        Returns:
            tuple[dict[str, str], List[List[str]]]:
            A mapping from #E to tool call and a list of levels.

        Raises:
            ValueError: if an evidence line has no ":" separator, or the
                dependencies contain a cycle.
        """
        evidences: dict[str, str] = dict()
        dependence: dict[str, list[str]] = dict()
        for line in planner_response.splitlines():
            # Slice instead of index so a bare "#E" line cannot raise IndexError.
            if line.startswith("#E") and line[2:3].isdigit():
                e, tool_call = line.split(":", 1)
                e, tool_call = e.strip(), tool_call.strip()
                # Any numeric id is valid (#E1 ... #E12 ...); other keys such as
                # "#E1a" get a placeholder and are never executed.
                if re.fullmatch(r"#E\d+", e):
                    dependence[e] = []
                    evidences[e] = tool_call
                    for var in re.findall(r"#E\d+", tool_call):
                        # Only earlier evidences count; record each dependency
                        # once so a repeated reference is not mistaken for a cycle.
                        if var in evidences and var not in dependence[e]:
                            dependence[e].append(var)
                else:
                    evidences[e] = "No evidence found"
        level = []
        while dependence:
            select = [i for i in dependence if not dependence[i]]
            if len(select) == 0:
                raise ValueError("Circular dependency detected.")
            level.append(select)
            for item in select:
                dependence.pop(item)
            # Drop every reference to the evidences resolved in this level.
            for deps in dependence.values():
                deps[:] = [d for d in deps if d not in select]

        return evidences, level

    def _run_plugin(
        self,
        e: str,
        planner_evidences: dict[str, str],
        worker_evidences: dict[str, str],
        output: BaseScratchPad | None = None,
    ):
        """
        Run a plugin for a given evidence.

        A tool call of the form ``Tool[input]`` has every ``#E<n>`` reference
        in ``input`` replaced by the already collected evidence before the
        plugin is called; a call without ``[`` is used verbatim as evidence.
        Unknown plugins, or plugins raising ValueError, yield
        "No evidence found.".

        Returns:
            dict with keys ``e``, ``plugin_cost``, ``plugin_token`` and
            ``evidence``.
        """
        if output is None:
            output = BaseScratchPad()
        result = dict(e=e, plugin_cost=0, plugin_token=0, evidence="")
        tool_call = planner_evidences[e]
        if "[" not in tool_call:
            result["evidence"] = tool_call
        else:
            tool, tool_input = tool_call.split("[", 1)
            tool = tool.strip()
            # Only drop the closing bracket if it is actually there.
            tool_input = tool_input.removesuffix("]")

            def _substitute(match):
                var = match.group(0)
                if var in worker_evidences:
                    return worker_evidences.get(var, "") or ""
                return var

            # Single regex pass: "#E1" must not clobber the prefix of "#E10",
            # and substituted evidence text is not rescanned.
            tool_input = re.sub(r"#E\d+", _substitute, tool_input)
            logging.debug(f"Tool input: {tool_input}")
            try:
                selected_plugin = self._find_plugin(tool)
                if selected_plugin is None:
                    raise ValueError("Invalid plugin detected")
                tool_response = selected_plugin(tool_input)
                result["evidence"] = get_plugin_response_content(tool_response)
            except ValueError:
                result["evidence"] = "No evidence found."
            finally:
                output.panel_print(
                    result["evidence"], f"[green] Function Response of [blue]{tool}: "
                )
        return result

    def _get_worker_evidence(
        self,
        planner_evidences: dict[str, str],
        evidences_level: list[list[str]],
        output: BaseScratchPad | None = None,
    ) -> tuple[dict[str, str], float, float]:
        """
        Parallel execution of plugins in DAG for speedup.
        This is one of core benefits of ReWOO agents.

        Evidences of one level are submitted to a thread pool together; a level
        is only started after all results of the previous level were collected,
        so its tool inputs can reference them.

        Args:
            planner_evidences: A mapping from #E to tool call.
            evidences_level: A list of levels of evidences.
                Calculated from DAG of plugin calls.
            output: Scratch pad used to report progress; a fresh
                BaseScratchPad is created per call when omitted.

        Returns:
            A tuple (mapping from #E to trimmed evidence, accumulated plugin
            cost, accumulated plugin tokens).
        """
        if output is None:
            output = BaseScratchPad()
        worker_evidences: dict[str, str] = dict()
        plugin_cost, plugin_token = 0.0, 0.0
        with ThreadPoolExecutor() as pool:
            for level in evidences_level:
                results = []
                for e in level:
                    results.append(
                        pool.submit(
                            self._run_plugin,
                            e,
                            planner_evidences,
                            worker_evidences,
                            output,
                        )
                    )
                if len(results) > 1:
                    output.update_status(f"Running tasks {level} in parallel.")
                else:
                    output.update_status(f"Running task {level[0]}.")
                for r in results:
                    resp = r.result()
                    plugin_cost += resp["plugin_cost"]
                    plugin_token += resp["plugin_token"]
                    worker_evidences[resp["e"]] = self._trim_evidence(resp["evidence"])
                output.done()

        return worker_evidences, plugin_cost, plugin_token

    def _find_plugin(self, name: str):
        """Return the plugin whose ``name`` matches, or None if there is none."""
        for p in self.plugins:
            if p.name == name:
                return p

    def _trim_evidence(self, evidence: str):
        """
        Trim ``evidence`` to at most ``max_context_length`` tokens.

        Uses ``self.trim_func`` when set, otherwise a TokenSplitter with the
        gpt-3.5-turbo tiktoken encoding; only the first chunk is kept. Empty
        evidence is returned unchanged.
        """
        if not evidence:
            return evidence
        evidence_trim_func = (
            self.trim_func
            if self.trim_func
            else TokenSplitter(
                chunk_size=self.max_context_length,
                chunk_overlap=0,
                separator=" ",
                tokenizer=partial(
                    tiktoken.encoding_for_model("gpt-3.5-turbo").encode,
                    allowed_special=set(),
                    disallowed_special="all",
                ),
            )
        )
        texts = evidence_trim_func([Document(text=evidence)])
        evidence = texts[0].text
        logging.info(f"len (trimmed): {len(evidence)}")
        return evidence

    @BaseAgent.safeguard_run
    def run(self, instruction: str, use_citation: bool = False) -> AgentOutput:
        """
        Run the agent with a given instruction.

        Plans with the planner LLM, executes the planned tool calls, then asks
        the solver LLM for the answer. Exceptions are turned into a
        ``status="failed"`` AgentOutput by ``BaseAgent.safeguard_run``.

        Args:
            instruction: The instruction / question to answer.
            use_citation: If True, also run a CitationPipeline over the worker
                log.

        Returns:
            AgentOutput with the solver answer; citation and worker log are
            included in ``metadata``.
        """
        logging.info(f"Running {self.name} with instruction: {instruction}")
        total_cost = 0.0
        total_token = 0

        # Plan
        planner_output = self.planner(instruction)
        planner_text_output = planner_output.text
        plan_to_es, plans = self._parse_plan_map(planner_text_output)
        planner_evidences, evidence_level = self._parse_planner_evidences(
            planner_text_output
        )

        # Work
        worker_evidences, plugin_cost, plugin_token = self._get_worker_evidence(
            planner_evidences, evidence_level
        )
        worker_log = ""
        for plan in plan_to_es:
            worker_log += f"{plan}: {plans[plan]}\n"
            for e in plan_to_es[plan]:
                # Planned #E keys that were never executed (malformed ids)
                # have no collected evidence.
                worker_log += f"{e}: {worker_evidences.get(e, 'No evidence found.')}\n"

        # Solve
        solver_output = self.solver(instruction, worker_log)
        solver_output_text = solver_output.text
        if use_citation:
            citation_pipeline = CitationPipeline(llm=self.solver_llm)
            citation = citation_pipeline(context=worker_log, question=instruction)
        else:
            citation = None

        return AgentOutput(
            text=solver_output_text,
            agent_type=self.agent_type,
            status="finished",
            total_tokens=total_token,
            total_cost=total_cost,
            citation=citation,
            metadata={"citation": citation, "worker_log": worker_log},
        )

    def stream(self, instruction: str, use_citation: bool = False):
        """
        Stream the agent with a given instruction.

        Yields a "thinking" AgentOutput with the planner log, one per plan
        with its worker log, and one per solver chunk. The generator's return
        value is a "finished" AgentOutput carrying citation and worker log.
        """
        logging.info(f"Streaming {self.name} with instruction: {instruction}")
        total_cost = 0.0
        total_token = 0

        # Plan
        planner_output = self.planner(instruction)
        planner_text_output = planner_output.text
        plan_to_es, plans = self._parse_plan_map(planner_text_output)
        planner_evidences, evidence_level = self._parse_planner_evidences(
            planner_text_output
        )

        logging.info(f"Planner output: {planner_text_output}")
        # output planner to info panel
        yield AgentOutput(
            text="",
            agent_type=self.agent_type,
            status="thinking",
            intermediate_steps=[{"planner_log": planner_text_output}],
        )

        # Work
        worker_evidences, plugin_cost, plugin_token = self._get_worker_evidence(
            planner_evidences, evidence_level
        )
        worker_log = ""
        for plan in plan_to_es:
            worker_log += f"{plan}: {plans[plan]}\n"
            current_progress = f"{plan}: {plans[plan]}\n"
            for e in plan_to_es[plan]:
                evidence = worker_evidences.get(e, "No evidence found.")
                worker_log += f"#Action: {planner_evidences.get(e, None)}\n"
                worker_log += f"{e}: {evidence}\n"
                current_progress += f"#Action: {planner_evidences.get(e, None)}\n"
                current_progress += f"{e}: {evidence}\n"

            yield AgentOutput(
                text="",
                agent_type=self.agent_type,
                status="thinking",
                intermediate_steps=[{"worker_log": current_progress}],
            )

        # Solve
        for solver_output in self.solver.stream(instruction, worker_log):
            yield AgentOutput(
                text=solver_output.text,
                agent_type=self.agent_type,
                status="thinking",
            )
        if use_citation:
            citation_pipeline = CitationPipeline(llm=self.solver_llm)
            citation = citation_pipeline.invoke(
                context=worker_log, question=instruction
            )
        else:
            citation = None

        return AgentOutput(
            text="",
            agent_type=self.agent_type,
            status="finished",
            total_tokens=total_token,
            total_cost=total_cost,
            citation=citation,
            metadata={"citation": citation, "worker_log": worker_log},
        )

run

run(instruction, use_citation=False)

Run the agent with a given instruction.

Source code in libs/kotaemon/kotaemon/agents/rewoo/agent.py
@BaseAgent.safeguard_run
def run(self, instruction: str, use_citation: bool = False) -> AgentOutput:
    """
    Run the agent with a given instruction.

    Plans with the planner LLM, executes the planned tool calls, then asks the
    solver LLM for the answer. Exceptions are turned into a
    ``status="failed"`` AgentOutput by ``BaseAgent.safeguard_run``.

    Args:
        instruction: The instruction / question to answer.
        use_citation: If True, also run a CitationPipeline over the worker log.

    Returns:
        AgentOutput with the solver answer; citation and worker log are
        included in ``metadata``.
    """
    logging.info(f"Running {self.name} with instruction: {instruction}")
    total_cost = 0.0
    total_token = 0

    # Plan
    planner_output = self.planner(instruction)
    planner_text_output = planner_output.text
    plan_to_es, plans = self._parse_plan_map(planner_text_output)
    planner_evidences, evidence_level = self._parse_planner_evidences(
        planner_text_output
    )

    # Work
    worker_evidences, plugin_cost, plugin_token = self._get_worker_evidence(
        planner_evidences, evidence_level
    )
    worker_log = ""
    for plan in plan_to_es:
        worker_log += f"{plan}: {plans[plan]}\n"
        for e in plan_to_es[plan]:
            # Planned #E keys that were never executed have no collected evidence.
            worker_log += f"{e}: {worker_evidences.get(e, 'No evidence found.')}\n"

    # Solve
    solver_output = self.solver(instruction, worker_log)
    solver_output_text = solver_output.text
    if use_citation:
        citation_pipeline = CitationPipeline(llm=self.solver_llm)
        citation = citation_pipeline(context=worker_log, question=instruction)
    else:
        citation = None

    return AgentOutput(
        text=solver_output_text,
        agent_type=self.agent_type,
        status="finished",
        total_tokens=total_token,
        total_cost=total_cost,
        citation=citation,
        metadata={"citation": citation, "worker_log": worker_log},
    )

stream

stream(instruction, use_citation=False)

Stream the agent with a given instruction.

Source code in libs/kotaemon/kotaemon/agents/rewoo/agent.py
def stream(self, instruction: str, use_citation: bool = False):
    """
    Stream the agent with a given instruction.

    Yields a "thinking" AgentOutput with the planner log, one per plan with
    its worker log, and one per solver chunk. The generator's return value is
    a "finished" AgentOutput carrying citation and worker log.
    """
    logging.info(f"Streaming {self.name} with instruction: {instruction}")
    total_cost = 0.0
    total_token = 0

    # Plan
    planner_output = self.planner(instruction)
    planner_text_output = planner_output.text
    plan_to_es, plans = self._parse_plan_map(planner_text_output)
    planner_evidences, evidence_level = self._parse_planner_evidences(
        planner_text_output
    )

    logging.info(f"Planner output: {planner_text_output}")
    # output planner to info panel
    yield AgentOutput(
        text="",
        agent_type=self.agent_type,
        status="thinking",
        intermediate_steps=[{"planner_log": planner_text_output}],
    )

    # Work
    worker_evidences, plugin_cost, plugin_token = self._get_worker_evidence(
        planner_evidences, evidence_level
    )
    worker_log = ""
    for plan in plan_to_es:
        worker_log += f"{plan}: {plans[plan]}\n"
        current_progress = f"{plan}: {plans[plan]}\n"
        for e in plan_to_es[plan]:
            # Planned #E keys that were never executed have no collected evidence.
            evidence = worker_evidences.get(e, "No evidence found.")
            worker_log += f"#Action: {planner_evidences.get(e, None)}\n"
            worker_log += f"{e}: {evidence}\n"
            current_progress += f"#Action: {planner_evidences.get(e, None)}\n"
            current_progress += f"{e}: {evidence}\n"

        yield AgentOutput(
            text="",
            agent_type=self.agent_type,
            status="thinking",
            intermediate_steps=[{"worker_log": current_progress}],
        )

    # Solve
    for solver_output in self.solver.stream(instruction, worker_log):
        yield AgentOutput(
            text=solver_output.text,
            agent_type=self.agent_type,
            status="thinking",
        )
    if use_citation:
        citation_pipeline = CitationPipeline(llm=self.solver_llm)
        citation = citation_pipeline.invoke(
            context=worker_log, question=instruction
        )
    else:
        citation = None

    return AgentOutput(
        text="",
        agent_type=self.agent_type,
        status="finished",
        total_tokens=total_token,
        total_cost=total_cost,
        citation=citation,
        metadata={"citation": citation, "worker_log": worker_log},
    )

BaseTool

Bases: BaseComponent

Source code in libs/kotaemon/kotaemon/agents/tools/base.py
class BaseTool(BaseComponent):
    """Base class for tools an agent can call.

    Subclasses implement ``_run_tool``. ``run`` validates the input against
    ``args_schema``, dispatches to ``_run_tool`` and converts a
    ``ToolException`` into an observation according to ``handle_tool_error``.
    """

    name: str
    """The unique name of the tool that clearly communicates its purpose."""
    description: str
    """Description used to tell the model how/when/why to use the tool.
    You can provide few-shot examples as a part of the description. This will be
    input to the prompt of LLM.
    """
    args_schema: Optional[Type[BaseModel]] = None
    """Pydantic model class to validate and parse the tool's input arguments."""
    verbose: bool = False
    """Whether to log the tool's progress."""
    handle_tool_error: Optional[
        Union[bool, str, Callable[[ToolException], str]]
    ] = False
    """Handle the content of the ToolException thrown."""

    def _parse_input(
        self,
        tool_input: Union[str, Dict],
    ) -> Union[str, Dict[str, Any]]:
        """Validate ``tool_input`` against ``args_schema`` when one is set.

        A string is validated as the value of the schema's first field and
        returned unchanged. A dict is parsed by the schema and only the keys
        the caller supplied are returned (schema defaults are dropped).
        Without a schema the input is returned as is.
        """
        args_schema = self.args_schema
        if args_schema is None:
            return tool_input
        if isinstance(tool_input, str):
            key_ = next(iter(args_schema.model_fields.keys()))
            # pydantic v2 API (the v1 ``validate`` alias is deprecated).
            args_schema.model_validate({key_: tool_input})
            return tool_input
        result = args_schema.model_validate(tool_input)
        return {k: v for k, v in result.model_dump().items() if k in tool_input}

    def _run_tool(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> Any:
        """Call tool."""
        raise NotImplementedError(f"_run_tool is not implemented for {self.name}")

    def _to_args_and_kwargs(self, tool_input: Union[str, Dict]) -> Tuple[Tuple, Dict]:
        # For backwards compatibility, if run_input is a string,
        # pass as a positional argument.
        if isinstance(tool_input, str):
            return (tool_input,), {}
        else:
            return (), tool_input

    def _handle_tool_error(self, e: ToolException) -> Any:
        """Handle the content of the ToolException thrown.

        Re-raises ``e`` when ``handle_tool_error`` is falsy; otherwise returns
        the exception message (``True``), the configured string, or the result
        of the configured callable.
        """
        observation = None
        if not self.handle_tool_error:
            raise e
        elif isinstance(self.handle_tool_error, bool):
            if e.args:
                observation = e.args[0]
            else:
                observation = "Tool execution error"
        elif isinstance(self.handle_tool_error, str):
            observation = self.handle_tool_error
        elif callable(self.handle_tool_error):
            observation = self.handle_tool_error(e)
        else:
            raise ValueError(
                f"Got unexpected type of `handle_tool_error`. Expected bool, str "
                f"or callable. Received: {self.handle_tool_error}"
            )
        return observation

    def to_langchain_format(self) -> LCTool:
        """Convert this tool to Langchain format to use with its agent"""
        return LCTool(name=self.name, description=self.description, func=self.run)

    def run(
        self,
        tool_input: Union[str, Dict],
        verbose: Optional[bool] = None,
        **kwargs: Any,
    ) -> Any:
        """Run the tool.

        ``verbose`` is currently unused. A ``ToolException`` raised by the tool
        is handled by ``_handle_tool_error``.
        """
        parsed_input = self._parse_input(tool_input)
        # TODO (verbose_): Add logging
        try:
            tool_args, tool_kwargs = self._to_args_and_kwargs(parsed_input)
            call_kwargs = {**kwargs, **tool_kwargs}
            return self._run_tool(*tool_args, **call_kwargs)
        except ToolException as e:
            return self._handle_tool_error(e)

    @classmethod
    def from_langchain_format(cls, langchain_tool: LCTool) -> "BaseTool":
        """Wrapper for Langchain Tool"""
        new_tool = BaseTool(
            name=langchain_tool.name, description=langchain_tool.description
        )
        new_tool._run_tool = langchain_tool._run  # type: ignore
        return new_tool

name instance-attribute

name

The unique name of the tool that clearly communicates its purpose.

description instance-attribute

description

Description used to tell the model how/when/why to use the tool. You can provide few-shot examples as a part of the description. This will be input to the prompt of LLM.

args_schema class-attribute instance-attribute

args_schema = None

Pydantic model class to validate and parse the tool's input arguments.

verbose class-attribute instance-attribute

verbose = False

Whether to log the tool's progress.

handle_tool_error class-attribute instance-attribute

handle_tool_error = False

Handle the content of the ToolException thrown.

to_langchain_format

to_langchain_format()

Convert this tool to Langchain format to use with its agent

Source code in libs/kotaemon/kotaemon/agents/tools/base.py
def to_langchain_format(self) -> LCTool:
    """Build a Langchain ``Tool`` whose function delegates to ``self.run``."""
    lc_kwargs = dict(name=self.name, description=self.description, func=self.run)
    return LCTool(**lc_kwargs)

run

run(tool_input, verbose=None, **kwargs)

Run the tool.

Source code in libs/kotaemon/kotaemon/agents/tools/base.py
def run(
    self,
    tool_input: Union[str, Dict],
    verbose: Optional[bool] = None,
    **kwargs: Any,
) -> Any:
    """Run the tool.

    The input is validated first; a ``ToolException`` raised while running
    the tool is converted into an observation by ``_handle_tool_error``.
    """
    validated = self._parse_input(tool_input)
    # TODO (verbose_): Add logging
    try:
        positional, keyword = self._to_args_and_kwargs(validated)
        return self._run_tool(*positional, **{**kwargs, **keyword})
    except ToolException as exc:
        return self._handle_tool_error(exc)

from_langchain_format classmethod

from_langchain_format(langchain_tool)

Wrapper for Langchain Tool

Source code in libs/kotaemon/kotaemon/agents/tools/base.py
@classmethod
def from_langchain_format(cls, langchain_tool: LCTool) -> "BaseTool":
    """Wrap a Langchain tool as a plain ``BaseTool``.

    The wrapper copies name and description and delegates execution to the
    Langchain tool's ``_run``.
    """
    wrapped = BaseTool(
        description=langchain_tool.description,
        name=langchain_tool.name,
    )
    wrapped._run_tool = langchain_tool._run  # type: ignore
    return wrapped

ComponentTool

Bases: BaseTool

Wrapper that lets another BaseComponent be used as a tool.

Parameters:

Name Type Description Default
component

BaseComponent-based component to wrap

required
postprocessor

Optional postprocessor for the component output

None
Source code in libs/kotaemon/kotaemon/agents/tools/base.py
class ComponentTool(BaseTool):
    """Wrapper around other BaseComponent to use it as a tool

    Args:
        component: BaseComponent-based component to wrap
        postprocessor: Optional postprocessor for the component output
    """

    component: BaseComponent
    postprocessor: Optional[Callable] = None

    def _run_tool(self, *args: Any, **kwargs: Any) -> Any:
        # Forward the call unchanged; post-process only when a hook is set.
        raw_output = self.component(*args, **kwargs)
        return self.postprocessor(raw_output) if self.postprocessor else raw_output

WikipediaTool

Bases: BaseTool

Tool that adds the capability to query the Wikipedia API.

Source code in libs/kotaemon/kotaemon/agents/tools/wikipedia.py
class WikipediaTool(BaseTool):
    """Tool that adds the capability to query the Wikipedia API."""

    name: str = "wikipedia"
    description: str = (
        "Search engine from Wikipedia, retrieving relevant wiki page. "
        "Useful when you need to get holistic knowledge about people, "
        "places, companies, historical events, or other subjects. "
        "Input should be a search query."
    )
    args_schema: Optional[Type[BaseModel]] = WikipediaArgs
    doc_store: Any = None

    def _run_tool(self, query: AnyStr) -> AnyStr:
        # Create the Wikipedia client lazily on first use and keep it.
        if not self.doc_store:
            self.doc_store = Wiki()
        return self.doc_store.search(query)