Skip to content

Storages

BaseDocumentStore

Bases: ABC

A document store is in charge of storing and managing documents

Source code in libs/kotaemon/kotaemon/storages/docstores/base.py
class BaseDocumentStore(ABC):
    """Abstract interface for document stores.

    A document store is in charge of storing and managing documents (text plus
    metadata), exposing CRUD operations and full-text search over the corpus.
    """

    @abstractmethod
    def __init__(self, *args, **kwargs):
        ...

    @abstractmethod
    def add(
        self,
        docs: Union[Document, List[Document]],
        ids: Optional[Union[List[str], str]] = None,
        **kwargs,
    ):
        """Add document into document store

        Args:
            docs: Document or list of documents
            ids: List of ids of the documents. Optional, if not set will use doc.doc_id
        """
        ...

    @abstractmethod
    def get(self, ids: Union[List[str], str]) -> List[Document]:
        """Get documents by a single id or a list of ids."""
        ...

    @abstractmethod
    def get_all(self) -> List[Document]:
        """Get all documents stored in the store."""
        ...

    @abstractmethod
    def count(self) -> int:
        """Count the number of documents currently in the store."""
        ...

    @abstractmethod
    def query(
        self, query: str, top_k: int = 10, doc_ids: Optional[list] = None
    ) -> List[Document]:
        """Search the document store using a full-text search query.

        Args:
            query: query text
            top_k: maximum number of documents to return
            doc_ids: if given, restrict the search to these document ids
        """
        ...

    @abstractmethod
    def delete(self, ids: Union[List[str], str]):
        """Delete documents by a single id or a list of ids."""
        ...

    @abstractmethod
    def drop(self):
        """Drop the document store entirely."""
        ...

add abstractmethod

add(docs, ids=None, **kwargs)

Add document into document store

Parameters:

Name Type Description Default
docs Union[Document, List[Document]]

Document or list of documents

required
ids Optional[Union[List[str], str]]

List of ids of the documents. Optional, if not set will use doc.doc_id

None
Source code in libs/kotaemon/kotaemon/storages/docstores/base.py
@abstractmethod
def add(
    self,
    docs: Union[Document, List[Document]],
    ids: Optional[Union[List[str], str]] = None,
    **kwargs,
):
    """Add document into document store

    Args:
        docs: Document or list of documents
        ids: List of ids of the documents. Optional, if not set will use doc.doc_id
    """
    ...

get abstractmethod

get(ids)

Get document by id

Source code in libs/kotaemon/kotaemon/storages/docstores/base.py
@abstractmethod
def get(self, ids: Union[List[str], str]) -> List[Document]:
    """Get document by id"""
    ...

get_all abstractmethod

get_all()

Get all documents

Source code in libs/kotaemon/kotaemon/storages/docstores/base.py
@abstractmethod
def get_all(self) -> List[Document]:
    """Get all documents"""
    ...

count abstractmethod

count()

Count number of documents

Source code in libs/kotaemon/kotaemon/storages/docstores/base.py
@abstractmethod
def count(self) -> int:
    """Count number of documents"""
    ...

query abstractmethod

query(query, top_k=10, doc_ids=None)

Search document store using search query

Source code in libs/kotaemon/kotaemon/storages/docstores/base.py
@abstractmethod
def query(
    self, query: str, top_k: int = 10, doc_ids: Optional[list] = None
) -> List[Document]:
    """Search document store using search query"""
    ...

delete abstractmethod

delete(ids)

Delete document by id

Source code in libs/kotaemon/kotaemon/storages/docstores/base.py
@abstractmethod
def delete(self, ids: Union[List[str], str]):
    """Delete document by id"""
    ...

drop abstractmethod

drop()

Drop the document store

Source code in libs/kotaemon/kotaemon/storages/docstores/base.py
@abstractmethod
def drop(self):
    """Drop the document store"""
    ...

ElasticsearchDocumentStore

Bases: BaseDocumentStore

Elasticsearch-backed document store that supports BM25 full-text search

Source code in libs/kotaemon/kotaemon/storages/docstores/elasticsearch.py
class ElasticsearchDocumentStore(BaseDocumentStore):
    """Elasticsearch-backed document store that supports BM25 full-text search.

    Documents are stored in a single Elasticsearch index (``collection_name``)
    with a custom BM25 similarity configured by ``k1`` and ``b``.
    """

    def __init__(
        self,
        collection_name: str = "docstore",
        elasticsearch_url: str = "http://localhost:9200",
        k1: float = 2.0,
        b: float = 0.75,
        **kwargs,
    ):
        try:
            from elasticsearch import Elasticsearch
            from elasticsearch.helpers import bulk
        except ImportError as e:
            # chain the original error so the missing-dependency cause is visible
            raise ImportError(
                "To use ElasticsearchDocstore please install `pip install elasticsearch`"
            ) from e

        self.elasticsearch_url = elasticsearch_url
        self.index_name = collection_name
        self.k1 = k1
        self.b = b

        # Create an Elasticsearch client instance
        self.client = Elasticsearch(elasticsearch_url, **kwargs)
        self.es_bulk = bulk
        # Define the index settings and mappings: custom BM25 similarity on
        # the `content` field, standard analyzer by default.
        settings = {
            "analysis": {"analyzer": {"default": {"type": "standard"}}},
            "similarity": {
                "custom_bm25": {
                    "type": "BM25",
                    "k1": k1,
                    "b": b,
                }
            },
        }
        mappings = {
            "properties": {
                "content": {
                    "type": "text",
                    "similarity": "custom_bm25",  # Use the custom BM25 similarity
                }
            }
        }

        # Create the index with the specified settings and mappings
        if not self.client.indices.exists(index=self.index_name):
            self.client.indices.create(
                index=self.index_name, mappings=mappings, settings=settings
            )

    def add(
        self,
        docs: Union[Document, List[Document]],
        ids: Optional[Union[List[str], str]] = None,
        refresh_indices: bool = True,
        **kwargs,
    ):
        """Add document into document store

        Args:
            docs: list of documents to add
            ids: specify the ids of documents to add or use existing doc.doc_id
            refresh_indices: request Elasticsearch to update its index (default to True)
        """
        if ids and not isinstance(ids, list):
            ids = [ids]
        if not isinstance(docs, list):
            docs = [docs]
        doc_ids = ids if ids else [doc.doc_id for doc in docs]

        requests = []
        for doc_id, doc in zip(doc_ids, docs):
            text = doc.text
            metadata = doc.metadata
            # "index" op upserts: existing documents with the same _id are replaced
            request = {
                "_op_type": "index",
                "_index": self.index_name,
                "content": text,
                "metadata": metadata,
                "_id": doc_id,
            }
            requests.append(request)

        success, failed = self.es_bulk(self.client, requests)
        print("Added/Updated documents to index", success)
        print("Failed documents to index", failed)

        if refresh_indices:
            self.client.indices.refresh(index=self.index_name)

    def query_raw(self, query: dict) -> List[Document]:
        """Query Elasticsearch store using query format of ES client

        Args:
            query (dict): Elasticsearch query format

        Returns:
            List[Document]: List of result documents
        """
        res = self.client.search(index=self.index_name, body=query)
        docs = []
        for r in res["hits"]["hits"]:
            docs.append(
                Document(
                    id_=r["_id"],
                    text=r["_source"]["content"],
                    metadata=r["_source"]["metadata"],
                )
            )
        return docs

    def query(
        self, query: str, top_k: int = 10, doc_ids: Optional[list] = None
    ) -> List[Document]:
        """Search Elasticsearch docstore using search query (BM25)

        Args:
            query (str): query text
            top_k (int, optional): number of
                top documents to return. Defaults to 10.
            doc_ids: if given, restrict the match to these document ids

        Returns:
            List[Document]: List of result documents
        """
        query_dict: dict = {"match": {"content": query}}
        if doc_ids is not None:
            # combine the text match with an id filter
            query_dict = {"bool": {"must": [query_dict, {"terms": {"_id": doc_ids}}]}}
        query_dict = {"query": query_dict, "size": top_k}
        return self.query_raw(query_dict)

    def get(self, ids: Union[List[str], str]) -> List[Document]:
        """Get document by id"""
        if not isinstance(ids, list):
            ids = [ids]
        # use the module-wide cap (consistent with get_all) instead of a magic 10000
        query_dict = {"query": {"terms": {"_id": ids}}, "size": MAX_DOCS_TO_GET}
        return self.query_raw(query_dict)

    def count(self) -> int:
        """Count number of documents"""
        count = int(
            self.client.cat.count(index=self.index_name, format="json")[0]["count"]
        )
        return count

    def get_all(self) -> List[Document]:
        """Get all documents"""
        query_dict = {"query": {"match_all": {}}, "size": MAX_DOCS_TO_GET}
        return self.query_raw(query_dict)

    def delete(self, ids: Union[List[str], str]):
        """Delete document by id"""
        if not isinstance(ids, list):
            ids = [ids]

        query = {"query": {"terms": {"_id": ids}}}
        self.client.delete_by_query(index=self.index_name, body=query)
        self.client.indices.refresh(index=self.index_name)

    def drop(self):
        """Drop the document store"""
        self.client.indices.delete(index=self.index_name)
        self.client.indices.refresh(index=self.index_name)

    def __persist_flow__(self):
        # parameters needed to reconstruct this store in a persisted flow
        return {
            "index_name": self.index_name,
            "elasticsearch_url": self.elasticsearch_url,
            "k1": self.k1,
            "b": self.b,
        }

add

add(docs, ids=None, refresh_indices=True, **kwargs)

Add document into document store

Parameters:

Name Type Description Default
docs Union[Document, List[Document]]

list of documents to add

required
ids Optional[Union[List[str], str]]

specify the ids of documents to add or use existing doc.doc_id

None
refresh_indices bool

request Elasticsearch to update its index (default to True)

True
Source code in libs/kotaemon/kotaemon/storages/docstores/elasticsearch.py
def add(
    self,
    docs: Union[Document, List[Document]],
    ids: Optional[Union[List[str], str]] = None,
    refresh_indices: bool = True,
    **kwargs,
):
    """Add document into document store

    Args:
        docs: list of documents to add
        ids: specify the ids of documents to add or use existing doc.doc_id
        refresh_indices: request Elasticsearch to update its index (default to True)
    """
    if ids and not isinstance(ids, list):
        ids = [ids]
    if not isinstance(docs, list):
        docs = [docs]
    doc_ids = ids if ids else [doc.doc_id for doc in docs]

    requests = []
    for doc_id, doc in zip(doc_ids, docs):
        text = doc.text
        metadata = doc.metadata
        request = {
            "_op_type": "index",
            "_index": self.index_name,
            "content": text,
            "metadata": metadata,
            "_id": doc_id,
        }
        requests.append(request)

    success, failed = self.es_bulk(self.client, requests)
    print("Added/Updated documents to index", success)
    print("Failed documents to index", failed)

    if refresh_indices:
        self.client.indices.refresh(index=self.index_name)

query_raw

query_raw(query)

Query Elasticsearch store using query format of ES client

Parameters:

Name Type Description Default
query dict

Elasticsearch query format

required

Returns:

Type Description
List[Document]

List[Document]: List of result documents

Source code in libs/kotaemon/kotaemon/storages/docstores/elasticsearch.py
def query_raw(self, query: dict) -> List[Document]:
    """Query Elasticsearch store using query format of ES client

    Args:
        query (dict): Elasticsearch query format

    Returns:
        List[Document]: List of result documents
    """
    res = self.client.search(index=self.index_name, body=query)
    docs = []
    for r in res["hits"]["hits"]:
        docs.append(
            Document(
                id_=r["_id"],
                text=r["_source"]["content"],
                metadata=r["_source"]["metadata"],
            )
        )
    return docs

query

query(query, top_k=10, doc_ids=None)

Search Elasticsearch docstore using search query (BM25)

Parameters:

Name Type Description Default
query str

query text

required
top_k int

number of top documents to return. Defaults to 10.

10

Returns:

Type Description
List[Document]

List[Document]: List of result documents

Source code in libs/kotaemon/kotaemon/storages/docstores/elasticsearch.py
def query(
    self, query: str, top_k: int = 10, doc_ids: Optional[list] = None
) -> List[Document]:
    """Search Elasticsearch docstore using search query (BM25)

    Args:
        query (str): query text
        top_k (int, optional): number of
            top documents to return. Defaults to 10.

    Returns:
        List[Document]: List of result documents
    """
    query_dict: dict = {"match": {"content": query}}
    if doc_ids is not None:
        query_dict = {"bool": {"must": [query_dict, {"terms": {"_id": doc_ids}}]}}
    query_dict = {"query": query_dict, "size": top_k}
    return self.query_raw(query_dict)

get

get(ids)

Get document by id

Source code in libs/kotaemon/kotaemon/storages/docstores/elasticsearch.py
def get(self, ids: Union[List[str], str]) -> List[Document]:
    """Get document by id"""
    if not isinstance(ids, list):
        ids = [ids]
    query_dict = {"query": {"terms": {"_id": ids}}, "size": 10000}
    return self.query_raw(query_dict)

count

count()

Count number of documents

Source code in libs/kotaemon/kotaemon/storages/docstores/elasticsearch.py
def count(self) -> int:
    """Count number of documents"""
    count = int(
        self.client.cat.count(index=self.index_name, format="json")[0]["count"]
    )
    return count

get_all

get_all()

Get all documents

Source code in libs/kotaemon/kotaemon/storages/docstores/elasticsearch.py
def get_all(self) -> List[Document]:
    """Get all documents"""
    query_dict = {"query": {"match_all": {}}, "size": MAX_DOCS_TO_GET}
    return self.query_raw(query_dict)

delete

delete(ids)

Delete document by id

Source code in libs/kotaemon/kotaemon/storages/docstores/elasticsearch.py
def delete(self, ids: Union[List[str], str]):
    """Delete document by id"""
    if not isinstance(ids, list):
        ids = [ids]

    query = {"query": {"terms": {"_id": ids}}}
    self.client.delete_by_query(index=self.index_name, body=query)
    self.client.indices.refresh(index=self.index_name)

drop

drop()

Drop the document store

Source code in libs/kotaemon/kotaemon/storages/docstores/elasticsearch.py
def drop(self):
    """Drop the document store"""
    self.client.indices.delete(index=self.index_name)
    self.client.indices.refresh(index=self.index_name)

InMemoryDocumentStore

Bases: BaseDocumentStore

Simple in-memory document store that stores documents in a dictionary

Source code in libs/kotaemon/kotaemon/storages/docstores/in_memory.py
class InMemoryDocumentStore(BaseDocumentStore):
    """In-memory document store backed by a plain dictionary."""

    def __init__(self):
        # mapping of doc_id -> Document
        self._store = {}

    def add(
        self,
        docs: Union[Document, List[Document]],
        ids: Optional[Union[List[str], str]] = None,
        **kwargs,
    ):
        """Add document into document store

        Args:
            docs: list of documents to add
            ids: specify the ids of documents to add or
                use existing doc.doc_id
            exist_ok: raise error when duplicate doc-id
                found in the docstore (default to False)
        """
        exist_ok: bool = kwargs.pop("exist_ok", False)

        doc_list = docs if isinstance(docs, list) else [docs]
        if ids:
            doc_ids = ids if isinstance(ids, list) else [ids]
        else:
            doc_ids = [doc.doc_id for doc in doc_list]

        for doc_id, doc in zip(doc_ids, doc_list):
            # refuse to silently overwrite unless the caller opted in
            if not exist_ok and doc_id in self._store:
                raise ValueError(f"Document with id {doc_id} already exist")
            self._store[doc_id] = doc

    def get(self, ids: Union[List[str], str]) -> List[Document]:
        """Get documents by id; raises KeyError for an unknown id."""
        id_list = ids if isinstance(ids, list) else [ids]
        return [self._store[doc_id] for doc_id in id_list]

    def get_all(self) -> List[Document]:
        """Return every stored document."""
        return [*self._store.values()]

    def count(self) -> int:
        """Return the number of stored documents."""
        return len(self._store)

    def delete(self, ids: Union[List[str], str]):
        """Delete documents by id; raises KeyError for an unknown id."""
        id_list = ids if isinstance(ids, list) else [ids]
        for doc_id in id_list:
            self._store.pop(doc_id)

    def save(self, path: Union[str, Path]):
        """Serialize the whole store to a JSON file at `path`."""
        serialized = {doc_id: doc.to_dict() for doc_id, doc in self._store.items()}
        with open(path, "w") as f:
            json.dump(serialized, f)

    def load(self, path: Union[str, Path]):
        """Replace the store's contents with the JSON file at `path`."""
        with open(path) as f:
            raw = json.load(f)
        # TODO: save and load aren't lossless. A Document-subclass will lose
        # information. Need to edit the `to_dict` and `from_dict` methods in
        # the Document class.
        # For better query support, utilize SQLite as the default document store.
        # Also, for portability, use SQLAlchemy for document store.
        self._store = {doc_id: Document.from_dict(blob) for doc_id, blob in raw.items()}

    def query(
        self, query: str, top_k: int = 10, doc_ids: Optional[list] = None
    ) -> List[Document]:
        """Full-text search is not supported in-memory; always returns []."""
        return []

    def __persist_flow__(self):
        # no constructor parameters to persist
        return {}

    def drop(self):
        """Discard all stored documents."""
        self._store = {}

add

add(docs, ids=None, **kwargs)

Add document into document store

Parameters:

Name Type Description Default
docs Union[Document, List[Document]]

list of documents to add

required
ids Optional[Union[List[str], str]]

specify the ids of documents to add or use existing doc.doc_id

None
exist_ok

raise error when duplicate doc-id found in the docstore (default to False)

required
Source code in libs/kotaemon/kotaemon/storages/docstores/in_memory.py
def add(
    self,
    docs: Union[Document, List[Document]],
    ids: Optional[Union[List[str], str]] = None,
    **kwargs,
):
    """Add document into document store

    Args:
        docs: list of documents to add
        ids: specify the ids of documents to add or
            use existing doc.doc_id
        exist_ok: raise error when duplicate doc-id
            found in the docstore (default to False)
    """
    exist_ok: bool = kwargs.pop("exist_ok", False)

    if ids and not isinstance(ids, list):
        ids = [ids]
    if not isinstance(docs, list):
        docs = [docs]
    doc_ids = ids if ids else [doc.doc_id for doc in docs]

    for doc_id, doc in zip(doc_ids, docs):
        if doc_id in self._store and not exist_ok:
            raise ValueError(f"Document with id {doc_id} already exist")
        self._store[doc_id] = doc

get

get(ids)

Get document by id

Source code in libs/kotaemon/kotaemon/storages/docstores/in_memory.py
def get(self, ids: Union[List[str], str]) -> List[Document]:
    """Get document by id"""
    if not isinstance(ids, list):
        ids = [ids]

    return [self._store[doc_id] for doc_id in ids]

get_all

get_all()

Get all documents

Source code in libs/kotaemon/kotaemon/storages/docstores/in_memory.py
def get_all(self) -> List[Document]:
    """Get all documents"""
    return list(self._store.values())

count

count()

Count number of documents

Source code in libs/kotaemon/kotaemon/storages/docstores/in_memory.py
def count(self) -> int:
    """Count number of documents"""
    return len(self._store)

delete

delete(ids)

Delete document by id

Source code in libs/kotaemon/kotaemon/storages/docstores/in_memory.py
def delete(self, ids: Union[List[str], str]):
    """Delete document by id"""
    if not isinstance(ids, list):
        ids = [ids]

    for doc_id in ids:
        del self._store[doc_id]

save

save(path)

Save document to path

Source code in libs/kotaemon/kotaemon/storages/docstores/in_memory.py
def save(self, path: Union[str, Path]):
    """Save document to path"""
    store = {key: value.to_dict() for key, value in self._store.items()}
    with open(path, "w") as f:
        json.dump(store, f)

load

load(path)

Load document store from path

Source code in libs/kotaemon/kotaemon/storages/docstores/in_memory.py
def load(self, path: Union[str, Path]):
    """Load document store from path"""
    with open(path) as f:
        store = json.load(f)
    # TODO: save and load aren't lossless. A Document-subclass will lose
    # information. Need to edit the `to_dict` and `from_dict` methods in
    # the Document class.
    # For better query support, utilize SQLite as the default document store.
    # Also, for portability, use SQLAlchemy for document store.
    self._store = {key: Document.from_dict(value) for key, value in store.items()}

query

query(query, top_k=10, doc_ids=None)

Perform full-text search on document store

Source code in libs/kotaemon/kotaemon/storages/docstores/in_memory.py
def query(
    self, query: str, top_k: int = 10, doc_ids: Optional[list] = None
) -> List[Document]:
    """Perform full-text search on document store"""
    return []

drop

drop()

Drop the document store

Source code in libs/kotaemon/kotaemon/storages/docstores/in_memory.py
def drop(self):
    """Drop the document store"""
    self._store = {}

LanceDBDocumentStore

Bases: BaseDocumentStore

LanceDB document store which supports full-text search queries

Source code in libs/kotaemon/kotaemon/storages/docstores/lancedb.py
class LanceDBDocumentStore(BaseDocumentStore):
    """LanceDB document store which supports full-text search queries."""

    def __init__(self, path: str = "lancedb", collection_name: str = "docstore"):
        try:
            import lancedb
        except ImportError as e:
            # "tantivy-py" provides the full-text-search tokenizer used below
            raise ImportError(
                "Please install lancedb: 'pip install lancedb tantivy-py'"
            ) from e

        self.db_uri = path
        self.collection_name = collection_name
        self.db_connection = lancedb.connect(self.db_uri)  # type: ignore

    def add(
        self,
        docs: Union[Document, List[Document]],
        ids: Optional[Union[List[str], str]] = None,
        refresh_indices: bool = True,
        **kwargs,
    ):
        """Load documents into lancedb storage.

        Args:
            docs: document or list of documents to add
            ids: specify the ids of documents to add or use existing doc.doc_id
            refresh_indices: rebuild the full-text-search index (default to True)
        """
        # normalize single document / single id into lists, consistent with
        # the other document stores and the BaseDocumentStore contract
        if not isinstance(docs, list):
            docs = [docs]
        if ids and not isinstance(ids, list):
            ids = [ids]
        doc_ids = ids if ids else [doc.doc_id for doc in docs]
        data: list[dict[str, str]] | None = [
            {
                "id": doc_id,
                "text": doc.text,
                "attributes": json.dumps(doc.metadata),
            }
            for doc_id, doc in zip(doc_ids, docs)
        ]

        document_collection = None
        if self.collection_name not in self.db_connection.table_names():
            if data:
                document_collection = self.db_connection.create_table(
                    self.collection_name, data=data, mode="overwrite"
                )
        else:
            # add data to existing table
            document_collection = self.db_connection.open_table(self.collection_name)
            if data:
                document_collection.add(data)

        # guard: with no data and no existing table there is nothing to index
        if refresh_indices and document_collection is not None:
            document_collection.create_fts_index(
                "text",
                tokenizer_name="en_stem",
                replace=True,
            )

    def query(
        self, query: str, top_k: int = 10, doc_ids: Optional[list] = None
    ) -> List[Document]:
        """Full-text search the store; optionally restricted to doc_ids.

        Returns an empty list when the table does not exist yet.
        """
        if doc_ids:
            id_filter = ", ".join([f"'{_id}'" for _id in doc_ids])
            query_filter = f"id in ({id_filter})"
        else:
            query_filter = None
        try:
            document_collection = self.db_connection.open_table(self.collection_name)
            if query_filter:
                docs = (
                    document_collection.search(query, query_type="fts")
                    .where(query_filter, prefilter=True)
                    .limit(top_k)
                    .to_list()
                )
            else:
                docs = (
                    document_collection.search(query, query_type="fts")
                    .limit(top_k)
                    .to_list()
                )
        except (ValueError, FileNotFoundError):
            # missing table / missing FTS index: behave as an empty result
            docs = []
        return [
            Document(
                id_=doc["id"],
                text=doc["text"] if doc["text"] else "<empty>",
                metadata=json.loads(doc["attributes"]),
            )
            for doc in docs
        ]

    def get(self, ids: Union[List[str], str]) -> List[Document]:
        """Get document by id"""
        if not isinstance(ids, list):
            ids = [ids]

        id_filter = ", ".join([f"'{_id}'" for _id in ids])
        try:
            document_collection = self.db_connection.open_table(self.collection_name)
            query_filter = f"id in ({id_filter})"
            docs = (
                document_collection.search()
                .where(query_filter)
                .limit(MAX_DOCS_TO_GET)
                .to_list()
            )
        except (ValueError, FileNotFoundError):
            docs = []
        return [
            Document(
                id_=doc["id"],
                text=doc["text"] if doc["text"] else "<empty>",
                metadata=json.loads(doc["attributes"]),
            )
            for doc in docs
        ]

    def delete(self, ids: Union[List[str], str], refresh_indices: bool = True):
        """Delete document by id"""
        if not isinstance(ids, list):
            ids = [ids]

        document_collection = self.db_connection.open_table(self.collection_name)
        id_filter = ", ".join([f"'{_id}'" for _id in ids])
        query_filter = f"id in ({id_filter})"
        document_collection.delete(query_filter)

        if refresh_indices:
            document_collection.create_fts_index(
                "text",
                tokenizer_name="en_stem",
                replace=True,
            )

    def drop(self):
        """Drop the document store"""
        self.db_connection.drop_table(self.collection_name)

    def count(self) -> int:
        """Counting documents is not implemented for LanceDB."""
        raise NotImplementedError

    def get_all(self) -> List[Document]:
        """Fetching all documents is not implemented for LanceDB."""
        raise NotImplementedError

    def __persist_flow__(self):
        # parameters needed to reconstruct this store in a persisted flow
        return {
            "db_uri": self.db_uri,
            "collection_name": self.collection_name,
        }

add

add(docs, ids=None, refresh_indices=True, **kwargs)

Load documents into lancedb storage.

Source code in libs/kotaemon/kotaemon/storages/docstores/lancedb.py
def add(
    self,
    docs: Union[Document, List[Document]],
    ids: Optional[Union[List[str], str]] = None,
    refresh_indices: bool = True,
    **kwargs,
):
    """Load documents into lancedb storage."""
    doc_ids = ids if ids else [doc.doc_id for doc in docs]
    data: list[dict[str, str]] | None = [
        {
            "id": doc_id,
            "text": doc.text,
            "attributes": json.dumps(doc.metadata),
        }
        for doc_id, doc in zip(doc_ids, docs)
    ]

    if self.collection_name not in self.db_connection.table_names():
        if data:
            document_collection = self.db_connection.create_table(
                self.collection_name, data=data, mode="overwrite"
            )
    else:
        # add data to existing table
        document_collection = self.db_connection.open_table(self.collection_name)
        if data:
            document_collection.add(data)

    if refresh_indices:
        document_collection.create_fts_index(
            "text",
            tokenizer_name="en_stem",
            replace=True,
        )

get

get(ids)

Get document by id

Source code in libs/kotaemon/kotaemon/storages/docstores/lancedb.py
def get(self, ids: Union[List[str], str]) -> List[Document]:
    """Get document by id"""
    if not isinstance(ids, list):
        ids = [ids]

    id_filter = ", ".join([f"'{_id}'" for _id in ids])
    try:
        document_collection = self.db_connection.open_table(self.collection_name)
        query_filter = f"id in ({id_filter})"
        docs = (
            document_collection.search()
            .where(query_filter)
            .limit(MAX_DOCS_TO_GET)
            .to_list()
        )
    except (ValueError, FileNotFoundError):
        docs = []
    return [
        Document(
            id_=doc["id"],
            text=doc["text"] if doc["text"] else "<empty>",
            metadata=json.loads(doc["attributes"]),
        )
        for doc in docs
    ]

delete

delete(ids, refresh_indices=True)

Delete document by id

Source code in libs/kotaemon/kotaemon/storages/docstores/lancedb.py
def delete(self, ids: Union[List[str], str], refresh_indices: bool = True):
    """Delete document by id"""
    if not isinstance(ids, list):
        ids = [ids]

    document_collection = self.db_connection.open_table(self.collection_name)
    id_filter = ", ".join([f"'{_id}'" for _id in ids])
    query_filter = f"id in ({id_filter})"
    document_collection.delete(query_filter)

    if refresh_indices:
        document_collection.create_fts_index(
            "text",
            tokenizer_name="en_stem",
            replace=True,
        )

drop

drop()

Drop the document store

Source code in libs/kotaemon/kotaemon/storages/docstores/lancedb.py
def drop(self):
    """Drop the document store"""
    self.db_connection.drop_table(self.collection_name)

SimpleFileDocumentStore

Bases: InMemoryDocumentStore

Extends InMemoryDocumentStore by automatically saving to disk whenever the corpus is changed

Source code in libs/kotaemon/kotaemon/storages/docstores/simple_file.py
class SimpleFileDocumentStore(InMemoryDocumentStore):
    """InMemoryDocumentStore variant that persists to a JSON file after
    every mutation (add/delete) and reloads when ids are missing."""

    def __init__(self, path: str | Path, collection_name: str = "default"):
        super().__init__()
        self._path = path
        self._collection_name = collection_name

        base_dir = Path(path)
        base_dir.mkdir(parents=True, exist_ok=True)
        # one JSON file per collection inside the base directory
        self._save_path = base_dir / f"{collection_name}.json"
        if self._save_path.is_file():
            self.load(self._save_path)

    def get(self, ids: Union[List[str], str]) -> List[Document]:
        """Get documents by id, reloading from disk if any id is unknown."""
        id_list = ids if isinstance(ids, list) else [ids]

        # another writer may have updated the file since our last load;
        # reload once if any requested id is absent from memory
        if any(doc_id not in self._store for doc_id in id_list):
            self.load(self._save_path)

        return [self._store[doc_id] for doc_id in id_list]

    def add(
        self,
        docs: Union[Document, List[Document]],
        ids: Optional[Union[List[str], str]] = None,
        **kwargs,
    ):
        """Add document into document store

        Args:
            docs: list of documents to add
            ids: specify the ids of documents to add or
                use existing doc.doc_id
            exist_ok: raise error when duplicate doc-id
                found in the docstore (default to False)
        """
        super().add(docs=docs, ids=ids, **kwargs)
        # persist immediately so the on-disk copy never lags behind memory
        self.save(self._save_path)

    def delete(self, ids: Union[List[str], str]):
        """Delete documents by id and persist the change."""
        super().delete(ids=ids)
        self.save(self._save_path)

    def drop(self):
        """Clear the in-memory store and remove the backing file."""
        super().drop()
        self._save_path.unlink(missing_ok=True)

    def __persist_flow__(self):
        from theflow.utils.modules import serialize

        # parameters needed to reconstruct this store in a persisted flow
        return {
            "path": serialize(self._path),
            "collection_name": self._collection_name,
        }

get

get(ids)

Get document by id

Source code in libs/kotaemon/kotaemon/storages/docstores/simple_file.py
def get(self, ids: Union[List[str], str]) -> List[Document]:
    """Get document by id"""
    if not isinstance(ids, list):
        ids = [ids]

    for doc_id in ids:
        if doc_id not in self._store:
            self.load(self._save_path)
            break

    return [self._store[doc_id] for doc_id in ids]

add

add(docs, ids=None, **kwargs)

Add document into document store

Parameters:

Name Type Description Default
docs Union[Document, List[Document]]

list of documents to add

required
ids Optional[Union[List[str], str]]

specify the ids of documents to add or use existing doc.doc_id

None
exist_ok

controls duplicate handling: when False (the default), a duplicate doc-id in the docstore raises an error

required
Source code in libs/kotaemon/kotaemon/storages/docstores/simple_file.py
def add(
    self,
    docs: Union[Document, List[Document]],
    ids: Optional[Union[List[str], str]] = None,
    **kwargs,
):
    """Add document into document store

    Args:
        docs: list of documents to add
        ids: specify the ids of documents to add or
            use existing doc.doc_id
        exist_ok: raise error when duplicate doc-id
            found in the docstore (default to False)
    """
    super().add(docs=docs, ids=ids, **kwargs)
    self.save(self._save_path)

delete

delete(ids)

Delete document by id

Source code in libs/kotaemon/kotaemon/storages/docstores/simple_file.py
def delete(self, ids: Union[List[str], str]):
    """Delete document by id"""
    super().delete(ids=ids)
    self.save(self._save_path)

drop

drop()

Drop the document store

Source code in libs/kotaemon/kotaemon/storages/docstores/simple_file.py
def drop(self):
    """Drop the document store"""
    super().drop()
    self._save_path.unlink(missing_ok=True)

BaseVectorStore

Bases: ABC

Source code in libs/kotaemon/kotaemon/storages/vectorstores/base.py
class BaseVectorStore(ABC):
    """Abstract interface for stores that hold and search embedding vectors."""

    @abstractmethod
    def __init__(self, *args, **kwargs):
        ...

    @abstractmethod
    def add(
        self,
        embeddings: list[list[float]] | list[DocumentWithEmbedding],
        metadatas: Optional[list[dict]] = None,
        ids: Optional[list[str]] = None,
    ) -> list[str]:
        """Add vector embeddings to the vector store

        Args:
            embeddings: list of raw embedding vectors, or of documents that
                carry an embedding
            metadatas: list of metadata dicts, one per embedding
            ids: list of ids for the embeddings; implementations may generate
                ids when not provided

        Returns:
            List of ids of the stored embeddings
        """
        ...

    @abstractmethod
    def delete(self, ids: list[str], **kwargs):
        """Delete vector embeddings from the vector store

        Args:
            ids: List of ids of the embeddings to be deleted
            kwargs: meant for vectorstore-specific parameters
        """
        ...

    @abstractmethod
    def query(
        self,
        embedding: list[float],
        top_k: int = 1,
        ids: Optional[list[str]] = None,
        **kwargs,
    ) -> tuple[list[list[float]], list[float], list[str]]:
        """Return the top k most similar vector embeddings

        Args:
            embedding: the query embedding vector
            top_k: Number of most similar embeddings to return
            ids: restrict the search to the embeddings with these ids

        Returns:
            the matched embeddings, the similarity scores, and the ids
        """
        ...

    @abstractmethod
    def drop(self):
        """Drop the vector store"""
        ...

add abstractmethod

add(embeddings, metadatas=None, ids=None)

Add vector embeddings to vector stores

Parameters:

Name Type Description Default
embeddings list[list[float]] | list[DocumentWithEmbedding]

List of embeddings

required
metadatas Optional[list[dict]]

List of metadata of the embeddings

None
ids Optional[list[str]]

List of ids of the embeddings

None
kwargs

meant for vectorstore-specific parameters

required

Returns:

Type Description
list[str]

List of ids of the embeddings

Source code in libs/kotaemon/kotaemon/storages/vectorstores/base.py
@abstractmethod
def add(
    self,
    embeddings: list[list[float]] | list[DocumentWithEmbedding],
    metadatas: Optional[list[dict]] = None,
    ids: Optional[list[str]] = None,
) -> list[str]:
    """Add vector embeddings to vector stores

    Args:
        embeddings: List of embeddings
        metadatas: List of metadata of the embeddings
        ids: List of ids of the embeddings
        kwargs: meant for vectorstore-specific parameters

    Returns:
        List of ids of the embeddings
    """
    ...

delete abstractmethod

delete(ids, **kwargs)

Delete vector embeddings from vector stores

Parameters:

Name Type Description Default
ids list[str]

List of ids of the embeddings to be deleted

required
kwargs

meant for vectorstore-specific parameters

{}
Source code in libs/kotaemon/kotaemon/storages/vectorstores/base.py
@abstractmethod
def delete(self, ids: list[str], **kwargs):
    """Delete vector embeddings from vector stores

    Args:
        ids: List of ids of the embeddings to be deleted
        kwargs: meant for vectorstore-specific parameters
    """
    ...

query abstractmethod

query(embedding, top_k=1, ids=None, **kwargs)

Return the top k most similar vector embeddings

Parameters:

Name Type Description Default
embedding list[float]

List of embeddings

required
top_k int

Number of most similar embeddings to return

1
ids Optional[list[str]]

List of ids of the embeddings to be queried

None

Returns:

Type Description
tuple[list[list[float]], list[float], list[str]]

the matched embeddings, the similarity scores, and the ids

Source code in libs/kotaemon/kotaemon/storages/vectorstores/base.py
@abstractmethod
def query(
    self,
    embedding: list[float],
    top_k: int = 1,
    ids: Optional[list[str]] = None,
    **kwargs,
) -> tuple[list[list[float]], list[float], list[str]]:
    """Return the top k most similar vector embeddings

    Args:
        embedding: List of embeddings
        top_k: Number of most similar embeddings to return
        ids: List of ids of the embeddings to be queried

    Returns:
        the matched embeddings, the similarity scores, and the ids
    """
    ...

drop abstractmethod

drop()

Drop the vector store

Source code in libs/kotaemon/kotaemon/storages/vectorstores/base.py
@abstractmethod
def drop(self):
    """Drop the vector store"""
    ...

ChromaVectorStore

Bases: LlamaIndexVectorStore

Source code in libs/kotaemon/kotaemon/storages/vectorstores/chroma.py
class ChromaVectorStore(LlamaIndexVectorStore):
    """Vector store backed by a persistent Chroma collection."""

    _li_class: Type[LIChromaVectorStore] = LIChromaVectorStore

    def __init__(
        self,
        path: str = "./chroma",
        collection_name: str = "default",
        host: str = "localhost",
        port: str = "8000",
        ssl: bool = False,
        headers: Optional[Dict[str, str]] = None,
        collection_kwargs: Optional[dict] = None,
        stores_text: bool = True,
        flat_metadata: bool = True,
        **kwargs: Any,
    ):
        """Create (or open) the Chroma collection and wrap it.

        Args:
            path: directory for the persistent Chroma database
            collection_name: name of the collection to get or create
            host/port/ssl/headers: connection settings forwarded to the
                underlying LlamaIndex store
            collection_kwargs: extra kwargs for collection creation
            stores_text / flat_metadata: LlamaIndex store behavior flags
        """
        self._path = path
        self._collection_name = collection_name
        self._host = host
        self._port = port
        self._ssl = ssl
        self._headers = headers
        self._collection_kwargs = collection_kwargs
        self._stores_text = stores_text
        self._flat_metadata = flat_metadata
        self._kwargs = kwargs

        try:
            import chromadb
        except ImportError:
            raise ImportError(
                "ChromaVectorStore requires chromadb. "
                "Please install chromadb first `pip install chromadb`"
            )

        client = chromadb.PersistentClient(path=path)
        collection = client.get_or_create_collection(collection_name)
        # Fix: keep a direct handle to the chroma collection. `count()` reads
        # `self._collection`, which was previously never assigned on this
        # object (only the local variable `collection` existed).
        self._collection = collection

        # pass through for nice IDE support
        super().__init__(
            chroma_collection=collection,
            host=host,
            port=port,
            ssl=ssl,
            headers=headers or {},
            collection_kwargs=collection_kwargs or {},
            stores_text=stores_text,
            flat_metadata=flat_metadata,
            **kwargs,
        )
        self._client = cast(LIChromaVectorStore, self._client)

    def delete(self, ids: List[str], **kwargs):
        """Delete vector embeddings from vector stores

        Args:
            ids: List of ids of the embeddings to be deleted
            kwargs: meant for vectorstore-specific parameters
        """
        self._client.client.delete(ids=ids)

    def drop(self):
        """Delete entire collection from vector stores"""
        self._client.client._client.delete_collection(self._client.client.name)

    def count(self) -> int:
        """Return the number of embeddings in the collection."""
        return self._collection.count()

    def __persist_flow__(self):
        return {
            "path": self._path,
            "collection_name": self._collection_name,
            "host": self._host,
            "port": self._port,
            "ssl": self._ssl,
            "headers": self._headers,
            "collection_kwargs": self._collection_kwargs,
            "stores_text": self._stores_text,
            "flat_metadata": self._flat_metadata,
            **self._kwargs,
        }

delete

delete(ids, **kwargs)

Delete vector embeddings from vector stores

Parameters:

Name Type Description Default
ids List[str]

List of ids of the embeddings to be deleted

required
kwargs

meant for vectorstore-specific parameters

{}
Source code in libs/kotaemon/kotaemon/storages/vectorstores/chroma.py
def delete(self, ids: List[str], **kwargs):
    """Delete vector embeddings from vector stores

    Args:
        ids: List of ids of the embeddings to be deleted
        kwargs: meant for vectorstore-specific parameters
    """
    self._client.client.delete(ids=ids)

drop

drop()

Delete entire collection from vector stores

Source code in libs/kotaemon/kotaemon/storages/vectorstores/chroma.py
def drop(self):
    """Delete entire collection from vector stores"""
    self._client.client._client.delete_collection(self._client.client.name)

InMemoryVectorStore

Bases: LlamaIndexVectorStore

Source code in libs/kotaemon/kotaemon/storages/vectorstores/in_memory.py
class InMemoryVectorStore(LlamaIndexVectorStore):
    """Vector store held entirely in memory, with explicit save/load."""

    _li_class: Type[LISimpleVectorStore] = LISimpleVectorStore
    store_text: bool = False

    def __init__(
        self,
        data: Optional[SimpleVectorStoreData] = None,
        fs: Optional[fsspec.AbstractFileSystem] = None,
        **kwargs: Any,
    ) -> None:
        """Initialize params."""
        self._data = data if data else SimpleVectorStoreData()
        self._fs = fs if fs else fsspec.filesystem("file")

        super().__init__(
            data=data,
            fs=fs,
            **kwargs,
        )

    def save(
        self,
        save_path: str,
        fs: Optional[fsspec.AbstractFileSystem] = None,
        **kwargs,
    ):
        """Persist the vector store to disk.

        Args:
            save_path: destination path for the persisted store
            fs: fsspec filesystem abstraction to write through
        """
        self._client.persist(persist_path=save_path, fs=fs)

    def load(self, load_path: str, fs: Optional[fsspec.AbstractFileSystem] = None):
        """Re-create the underlying store from a persisted path.

        Args:
            load_path: path of the previously persisted store
            fs: fsspec filesystem abstraction to read through
        """
        self._client = self._client.from_persist_path(persist_path=load_path, fs=fs)

    def drop(self):
        """Reset the cached data to an empty store."""
        self._data = SimpleVectorStoreData()

    def __persist_flow__(self):
        payload = self._data.to_dict()
        payload["__type__"] = (
            f"{self._data.__module__}.{self._data.__class__.__qualname__}"
        )
        return {
            "data": payload,
            # "fs": self._fs,
        }

save

save(save_path, fs=None, **kwargs)

Save the SimpleVectorStore to disk at the given path.

Parameters:

Name Type Description Default
save_path str

Path of saving vector to disk.

required
fs Optional[AbstractFileSystem]

An abstract super-class for pythonic file-systems

None
Source code in libs/kotaemon/kotaemon/storages/vectorstores/in_memory.py
def save(
    self,
    save_path: str,
    fs: Optional[fsspec.AbstractFileSystem] = None,
    **kwargs,
):

    """save a simpleVectorStore to a dictionary.

    Args:
        save_path: Path of saving vector to disk.
        fs: An abstract super-class for pythonic file-systems
    """
    self._client.persist(persist_path=save_path, fs=fs)

load

load(load_path, fs=None)

Re-create the SimpleVectorStore from a persisted path.

Parameters:

Name Type Description Default
load_path str

Path of loading vector.

required
fs Optional[AbstractFileSystem]

An abstract super-class for pythonic file-systems

None
Source code in libs/kotaemon/kotaemon/storages/vectorstores/in_memory.py
def load(self, load_path: str, fs: Optional[fsspec.AbstractFileSystem] = None):

    """Create a SimpleKVStore from a load directory.

    Args:
        load_path: Path of loading vector.
        fs: An abstract super-class for pythonic file-systems
    """
    self._client = self._client.from_persist_path(persist_path=load_path, fs=fs)

drop

drop()

Clear the old data

Source code in libs/kotaemon/kotaemon/storages/vectorstores/in_memory.py
def drop(self):
    """Clear the old data"""
    self._data = SimpleVectorStoreData()

LanceDBVectorStore

Bases: LlamaIndexVectorStore

Source code in libs/kotaemon/kotaemon/storages/vectorstores/lancedb.py
class LanceDBVectorStore(LlamaIndexVectorStore):
    """Vector store backed by a LanceDB table."""

    _li_class: Type[LILanceDBVectorStore] = LILanceDBVectorStore

    def __init__(
        self,
        path: str = "./lancedb",
        collection_name: str = "default",
        **kwargs: Any,
    ):
        """Connect to the LanceDB database and open/prepare its table.

        Args:
            path: directory of the LanceDB database
            collection_name: name of the table holding the vectors
        """
        self._path = path
        self._collection_name = collection_name

        try:
            import lancedb
        except ImportError:
            # Fix: the full-text-search extra is published on PyPI as
            # `tantivy` (the message previously said `tanvity-py`, which is
            # not an installable package name).
            raise ImportError(
                "Please install lancedb: 'pip install lancedb tantivy'"
            )

        db_connection = lancedb.connect(path)  # type: ignore
        try:
            # Reuse the existing table when present; otherwise pass None and
            # let the LlamaIndex wrapper create it on first insert.
            table = db_connection.open_table(collection_name)
        except FileNotFoundError:
            table = None

        self._kwargs = kwargs

        # pass through for nice IDE support
        super().__init__(
            uri=path,
            table_name=collection_name,
            table=table,
            **kwargs,
        )
        self._client = cast(LILanceDBVectorStore, self._client)
        self._client._metadata_keys = ["file_id"]

    def delete(self, ids: List[str], **kwargs):
        """Delete vector embeddings from vector stores

        Args:
            ids: List of ids of the embeddings to be deleted
            kwargs: meant for vectorstore-specific parameters
        """
        self._client.delete_nodes(ids)

    def drop(self):
        """Delete entire collection from vector stores"""
        # Fix: use the attribute assigned in __init__; `self.collection_name`
        # is never set on this class and raised AttributeError.
        self._client.client.drop_table(self._collection_name)

    def count(self) -> int:
        """Counting rows is not implemented for LanceDB."""
        raise NotImplementedError

    def __persist_flow__(self):
        return {
            "path": self._path,
            "collection_name": self._collection_name,
        }

delete

delete(ids, **kwargs)

Delete vector embeddings from vector stores

Parameters:

Name Type Description Default
ids List[str]

List of ids of the embeddings to be deleted

required
kwargs

meant for vectorstore-specific parameters

{}
Source code in libs/kotaemon/kotaemon/storages/vectorstores/lancedb.py
def delete(self, ids: List[str], **kwargs):
    """Delete vector embeddings from vector stores

    Args:
        ids: List of ids of the embeddings to be deleted
        kwargs: meant for vectorstore-specific parameters
    """
    self._client.delete_nodes(ids)

drop

drop()

Delete entire collection from vector stores

Source code in libs/kotaemon/kotaemon/storages/vectorstores/lancedb.py
def drop(self):
    """Delete entire collection from vector stores"""
    self._client.client.drop_table(self.collection_name)

MilvusVectorStore

Bases: LlamaIndexVectorStore

Source code in libs/kotaemon/kotaemon/storages/vectorstores/milvus.py
class MilvusVectorStore(LlamaIndexVectorStore):
    """Vector store backed by Milvus (local milvus-lite file or remote server).

    The underlying LlamaIndex store requires the embedding dimension at
    construction time, so initialization is deferred until a call that can
    supply (or infer) it.
    """

    _li_class = None

    def _get_li_class(self):
        """Import the LlamaIndex Milvus store lazily (optional dependency)."""
        try:
            from llama_index.vector_stores.milvus import (
                MilvusVectorStore as LIMilvusVectorStore,
            )
        except ImportError:
            raise ImportError(
                "Please install missing package: "
                "'pip install llama-index-vector-stores-milvus'"
            )

        return LIMilvusVectorStore

    def __init__(
        self,
        uri: str = "./milvus.db",  # or "http://localhost:19530"
        collection_name: str = "default",
        token: Optional[str] = None,
        **kwargs: Any,
    ):
        """Store connection parameters; the client is created lazily.

        Args:
            uri: local milvus-lite db file, or http(s) URI of a Milvus server
            collection_name: name of the Milvus collection
            token: authentication token, if the server requires one
        """
        self._uri = uri
        self._collection_name = collection_name
        self._token = token
        self._kwargs = kwargs
        # Optional base directory used to resolve a relative local uri.
        self._path = kwargs.get("path", None)
        self._inited = False

    def _lazy_init(self, dim: Optional[int] = None):
        """
        Lazy init the client.
        Because the LlamaIndex init method requires the dim parameter,
        we need to try to get the dim from the first embedding.

        Args:
            dim: Dimension of the vectors.
        """
        if not self._inited:
            # Fix: `path` is optional — only resolve the uri against it when
            # it was given and exists. Previously a missing `path` made
            # os.path.isdir(None) raise TypeError.
            if (
                self._path is not None
                and os.path.isdir(self._path)
                and not self._uri.startswith("http")
            ):
                uri = os.path.join(self._path, self._uri)
            else:
                uri = self._uri
            super().__init__(
                uri=uri,
                token=self._token,
                collection_name=self._collection_name,
                dim=dim,
                **self._kwargs,
            )
            from llama_index.vector_stores.milvus import (
                MilvusVectorStore as LIMilvusVectorStore,
            )

            self._client = cast(LIMilvusVectorStore, self._client)
        self._inited = True

    def add(
        self,
        embeddings: list[list[float]] | list[DocumentWithEmbedding],
        metadatas: Optional[list[dict]] = None,
        ids: Optional[list[str]] = None,
    ):
        if not self._inited:
            # On the first add, infer the vector dimension from the first
            # embedding (embeddings must be non-empty for this first call).
            if isinstance(embeddings[0], list):
                dim = len(embeddings[0])
            else:
                dim = len(embeddings[0].embedding)
            self._lazy_init(dim)

        return super().add(embeddings=embeddings, metadatas=metadatas, ids=ids)

    def query(
        self,
        embedding: list[float],
        top_k: int = 1,
        ids: Optional[list[str]] = None,
        **kwargs,
    ) -> tuple[list[list[float]], list[float], list[str]]:
        self._lazy_init(len(embedding))

        return super().query(embedding=embedding, top_k=top_k, ids=ids, **kwargs)

    def delete(self, ids: list[str], **kwargs):
        self._lazy_init()
        super().delete(ids=ids, **kwargs)

    def drop(self):
        # NOTE(review): assumes the client was already initialized — calling
        # drop() before any add/query/delete would hit an uninitialized client.
        self._client.client.drop_collection(self._collection_name)

    def count(self) -> int:
        # Best effort: an unreachable/uninitializable store counts as empty.
        try:
            self._lazy_init()
        except Exception:  # narrowed from bare `except:`
            return 0
        return self._client.client.query(
            collection_name=self._collection_name, output_fields=["count(*)"]
        )[0]["count(*)"]

    def __persist_flow__(self):
        return {
            "uri": self._uri,
            "collection_name": self._collection_name,
            "token": self._token,
            **self._kwargs,
        }

QdrantVectorStore

Bases: LlamaIndexVectorStore

Source code in libs/kotaemon/kotaemon/storages/vectorstores/qdrant.py
class QdrantVectorStore(LlamaIndexVectorStore):
    """Vector store backed by a Qdrant collection."""

    _li_class = None

    def _get_li_class(self):
        """Import the LlamaIndex Qdrant store lazily (optional dependency)."""
        try:
            from llama_index.vector_stores.qdrant import (
                QdrantVectorStore as LIQdrantVectorStore,
            )
        except ImportError:
            raise ImportError(
                "Please install missing package: "
                "'pip install llama-index-vector-stores-qdrant'"
            )

        return LIQdrantVectorStore

    def __init__(
        self,
        collection_name,
        url: Optional[str] = None,
        api_key: Optional[str] = None,
        client_kwargs: Optional[dict] = None,
        **kwargs: Any,
    ):
        """Record connection settings and build the wrapped Qdrant store.

        Args:
            collection_name: name of the Qdrant collection
            url: Qdrant server url
            api_key: api key for the Qdrant server
            client_kwargs: extra kwargs for the Qdrant client
        """
        self._collection_name = collection_name
        self._url = url
        self._api_key = api_key
        self._client_kwargs = client_kwargs
        self._kwargs = kwargs

        super().__init__(
            collection_name=collection_name,
            url=url,
            api_key=api_key,
            client_kwargs=client_kwargs,
            **kwargs,
        )
        from llama_index.vector_stores.qdrant import (
            QdrantVectorStore as LIQdrantVectorStore,
        )

        self._client = cast(LIQdrantVectorStore, self._client)

    def delete(self, ids: List[str], **kwargs):
        """Remove the embeddings with the given ids from the collection.

        Args:
            ids: ids of the embeddings to delete
            kwargs: forwarded to the underlying qdrant delete call
        """
        from qdrant_client import models

        selector = models.PointIdsList(
            points=ids,
        )
        self._client.client.delete(
            collection_name=self._collection_name,
            points_selector=selector,
            **kwargs,
        )

    def drop(self):
        """Delete the entire collection from the vector store."""
        self._client.client.delete_collection(self._collection_name)

    def count(self) -> int:
        """Exact number of points currently stored in the collection."""
        result = self._client.client.count(
            collection_name=self._collection_name, exact=True
        )
        return result.count

    def __persist_flow__(self):
        return {
            "collection_name": self._collection_name,
            "url": self._url,
            "api_key": self._api_key,
            "client_kwargs": self._client_kwargs,
            **self._kwargs,
        }

delete

delete(ids, **kwargs)

Delete vector embeddings from vector stores

Parameters:

Name Type Description Default
ids List[str]

List of ids of the embeddings to be deleted

required
kwargs

meant for vectorstore-specific parameters

{}
Source code in libs/kotaemon/kotaemon/storages/vectorstores/qdrant.py
def delete(self, ids: List[str], **kwargs):
    """Delete vector embeddings from vector stores

    Args:
        ids: List of ids of the embeddings to be deleted
        kwargs: meant for vectorstore-specific parameters
    """
    from qdrant_client import models

    self._client.client.delete(
        collection_name=self._collection_name,
        points_selector=models.PointIdsList(
            points=ids,
        ),
        **kwargs,
    )

drop

drop()

Delete entire collection from vector stores

Source code in libs/kotaemon/kotaemon/storages/vectorstores/qdrant.py
def drop(self):
    """Delete entire collection from vector stores"""
    self._client.client.delete_collection(self._collection_name)

SimpleFileVectorStore

Bases: LlamaIndexVectorStore

Similar to InMemoryVectorStore but is backed by file by default

Source code in libs/kotaemon/kotaemon/storages/vectorstores/simple_file.py
class SimpleFileVectorStore(LlamaIndexVectorStore):
    """Similar to InMemoryVectorStore but is backed by file by default"""

    _li_class: Type[LISimpleVectorStore] = LISimpleVectorStore
    store_text: bool = False

    def __init__(
        self,
        path: str | Path,
        collection_name: str = "default",
        data: Optional[SimpleVectorStoreData] = None,
        fs: Optional[fsspec.AbstractFileSystem] = None,
        **kwargs: Any,
    ) -> None:
        """Initialize params."""
        self._data = data if data else SimpleVectorStoreData()
        self._fs = fs if fs else fsspec.filesystem("file")
        self._collection_name = collection_name
        self._path = path
        # One file per collection inside the given directory.
        self._save_path = Path(path) / collection_name

        super().__init__(
            data=data,
            fs=fs,
            **kwargs,
        )

        # Restore a previously persisted store when one exists on disk.
        if self._save_path.is_file():
            self._client = self._li_class.from_persist_path(
                persist_path=str(self._save_path), fs=self._fs
            )

    def _persist(self) -> None:
        # Mirror the current in-memory state to the backing file.
        self._client.persist(str(self._save_path), self._fs)

    def add(
        self,
        embeddings: list[list[float]] | list[DocumentWithEmbedding],
        metadatas: Optional[list[dict]] = None,
        ids: Optional[list[str]] = None,
    ):
        """Add embeddings, then save the store to its backing file."""
        added = super().add(embeddings, metadatas, ids)
        self._persist()
        return added

    def delete(self, ids: list[str], **kwargs):
        """Delete embeddings by id, then save the store to its backing file."""
        result = super().delete(ids, **kwargs)
        self._persist()
        return result

    def drop(self):
        """Clear the data and remove the backing file."""
        self._data = SimpleVectorStoreData()
        self._save_path.unlink(missing_ok=True)

    def __persist_flow__(self):
        payload = self._data.to_dict()
        payload["__type__"] = (
            f"{self._data.__module__}.{self._data.__class__.__qualname__}"
        )
        return {
            "data": payload,
            "collection_name": self._collection_name,
            "path": str(self._path),
            # "fs": self._fs,
        }