跳转至

Retriever Module

pipelines.pipelines.nodes.retriever.dense

DensePassageRetriever

Retriever that uses a bi-encoder (one transformer for query, one transformer for passage).

Source code in pipelines/pipelines/nodes/retriever/dense.py
class DensePassageRetriever(BaseRetriever):
    """
    Retriever that uses a bi-encoder (one transformer for query, one transformer for passage).

    Queries and passages are embedded separately; retrieval embeds the query and asks the
    DocumentStore for the closest documents via ``query_by_embedding``.
    """

    def __init__(
        self,
        document_store: BaseDocumentStore,
        query_embedding_model: Union[Path, str] = "rocketqa-zh-dureader-query-encoder",
        passage_embedding_model: Union[Path, str] = "rocketqa-zh-dureader-para-encoder",
        params_path: Optional[str] = "",
        model_version: Optional[str] = None,
        output_emb_size: Optional[int] = None,
        reinitialize: bool = False,
        share_parameters: bool = False,
        max_seq_len_query: int = 64,
        max_seq_len_passage: int = 384,
        top_k: int = 10,
        use_gpu: bool = True,
        batch_size: int = 16,
        embed_title: bool = True,
        similarity_function: str = "dot_product",
        progress_bar: bool = True,
        mode: Literal["snippets", "raw_documents", "preprocessed_documents"] = "preprocessed_documents",
        **kwargs
    ):
        """
        Init the Retriever incl. the two encoder models from a local or remote model checkpoint.

        **Example:**

                ```python
                |    # remote model name
                |    DensePassageRetriever(document_store=your_doc_store,
                |                          query_embedding_model="rocketqa-zh-dureader-query-encoder",
                |                          passage_embedding_model="rocketqa-zh-dureader-para-encoder")
                |    # or from local path
                |    DensePassageRetriever(document_store=your_doc_store,
                |                          query_embedding_model="model_directory/question-encoder",
                |                          passage_embedding_model="model_directory/context-encoder")
                ```
        :param document_store: An instance of DocumentStore from which to retrieve documents.
        :param query_embedding_model: Local path or remote name of question encoder checkpoint. The format equals the
                                      one used by paddlenlp transformers' models
                                      Currently available remote names: ``"rocketqa-zh-dureader-query-encoder"``
        :param passage_embedding_model: Local path or remote name of passage encoder checkpoint. The format equals the
                                        one used by paddlenlp transformers' models
                                        Currently available remote names: ``"rocketqa-zh-dureader-para-encoder"``
        :param params_path: Path to a saved parameter file. If it points to an existing path, a
                            ``SemanticIndexBatchNeg`` model is built on top of ``query_embedding_model`` and the
                            parameters are loaded into it; otherwise (empty, ``None`` or missing path) two
                            ``Taskflow("feature_extraction")`` encoders are created.
        :param model_version: Only recorded in the component config by this constructor.
        :param output_emb_size: Forwarded to ``SemanticIndexBatchNeg`` or to both ``Taskflow`` encoders.
        :param reinitialize: Forwarded to both ``Taskflow`` encoders.
        :param share_parameters: Forwarded to both ``Taskflow`` encoders.
        :param max_seq_len_query: Longest length of each query sequence. Maximum number of tokens for the query text. Longer ones will be cut down.
        :param max_seq_len_passage: Longest length of each passage/context sequence. Maximum number of tokens for the passage text. Longer ones will be cut down.
        :param top_k: How many documents to return per query.
        :param use_gpu: Whether to use all available GPUs or the CPU. Falls back on CPU if no GPU is available.
        :param batch_size: Number of questions or passages to encode at once. In case of multiple gpus, this will be the total batch size.
        :param embed_title: Whether to concatenate title and passage to a text pair that is then used to create the embedding.
                            This is the approach used in the original paper and is likely to improve performance if your
                            titles contain meaningful information for retrieval (topic, entities etc.).
                            The title is expected to be present in doc.meta["name"] and can be supplied in the documents
                            before writing them to the DocumentStore like this:
                            {"text": "my text", "meta": {"name": "my title"}}.
        :param similarity_function: Which function to apply for calculating the similarity of query and passage embeddings during training.
                                    Options: `dot_product` (Default) or `cosine`
        :param progress_bar: Whether to show a tqdm progress bar or not.
                             Can be helpful to disable in production deployments to keep the logs clean.
        :param mode: Stored on the instance as ``self.mode``; not otherwise used by this constructor.
        :param kwargs: Extra keyword arguments forwarded to both ``Taskflow`` encoders.
        """
        # Save init parameters to enable export of component config as YAML
        self.set_config(
            document_store=document_store,
            query_embedding_model=query_embedding_model,
            passage_embedding_model=passage_embedding_model,
            model_version=model_version,
            max_seq_len_query=max_seq_len_query,
            max_seq_len_passage=max_seq_len_passage,
            top_k=top_k,
            use_gpu=use_gpu,
            batch_size=batch_size,
            embed_title=embed_title,
            reinitialize=reinitialize,
            share_parameters=share_parameters,
            output_emb_size=output_emb_size,
            similarity_function=similarity_function,
            progress_bar=progress_bar,
        )

        self.devices, _ = initialize_device_settings(use_cuda=use_gpu, multi_gpu=True)
        if batch_size < len(self.devices):
            logger.warning("Batch size is less than the number of devices. All gpus will not be utilized.")

        self.document_store = document_store
        self.batch_size = batch_size
        self.progress_bar = progress_bar
        self.top_k = top_k
        self.embed_title = embed_title
        self.mode = mode

        if document_store is None:
            logger.warning("DensePassageRetriever initialized without a document store. ")
        elif document_store.similarity != "dot_product":
            logger.warning(
                f"You are using a Dense Passage Retriever model with the {document_store.similarity} function. "
                "We recommend you use dot_product instead. "
                "This can be set when initializing the DocumentStore"
            )

        # Init & Load Encoders
        # `params_path` is Optional: guard against None before touching the filesystem,
        # since os.path.exists(None) raises TypeError.
        if params_path and os.path.exists(params_path):
            # NOTE(review): this branch does not define self.query_encoder / self.passage_encoder,
            # which _get_predictions() calls - confirm how embeddings are meant to be produced
            # when custom parameters are loaded.
            pretrained_model = AutoModel.from_pretrained(query_embedding_model)
            self.ernie_dual_encoder = SemanticIndexBatchNeg(pretrained_model, output_emb_size=output_emb_size)
            # Load Custom models
            logger.info("Loading Parameters from:{}".format(params_path))
            state_dict = paddle.load(params_path)
            self.ernie_dual_encoder.set_dict(state_dict)
            self.query_tokenizer = AutoTokenizer.from_pretrained(query_embedding_model)
            self.passage_tokenizer = AutoTokenizer.from_pretrained(query_embedding_model)
        else:
            self.query_encoder = Taskflow(
                "feature_extraction",
                model=query_embedding_model,
                batch_size=self.batch_size,
                return_tensors="np",
                max_len=max_seq_len_query,
                output_emb_size=output_emb_size,
                reinitialize=reinitialize,
                share_parameters=share_parameters,
                device_id=0 if use_gpu else -1,
                **kwargs,
            )
            self.passage_encoder = Taskflow(
                "feature_extraction",
                model=passage_embedding_model,
                batch_size=self.batch_size,
                return_tensors="np",
                max_len=max_seq_len_passage,
                output_emb_size=output_emb_size,
                reinitialize=reinitialize,
                share_parameters=share_parameters,
                device_id=0 if use_gpu else -1,
                **kwargs,
            )

    def retrieve(
        self,
        query: str,
        query_type: Optional[ContentTypes] = None,
        filters: Optional[dict] = None,
        top_k: Optional[int] = None,
        index: Optional[str] = None,
        headers: Optional[Dict[str, str]] = None,
        **kwargs,
    ) -> List[Document]:
        """
        Scan through documents in DocumentStore and return a small number of documents
        that are most relevant to the query.

        :param query: The query
        :param query_type: Not used by this implementation.
        :param filters: A dictionary where the keys specify a metadata field and the value is a list of accepted values for that field
        :param top_k: How many documents to return per query. Defaults to ``self.top_k`` when None.
        :param index: The name of the index in the DocumentStore from which to retrieve documents.
                      Defaults to ``self.document_store.index`` when None.
        :param headers: Passed through unchanged to ``document_store.query_by_embedding``.
        :param kwargs: Forwarded to ``embed_queries``.
        :return: The result of ``document_store.query_by_embedding``, or an empty list when no
                 document store is configured.
        """
        if top_k is None:
            top_k = self.top_k
        if not self.document_store:
            logger.error("Cannot perform retrieve() since DensePassageRetriever initialized with document_store=None")
            return []
        if index is None:
            index = self.document_store.index

        query_emb = self.embed_queries(texts=[query], **kwargs)
        documents = self.document_store.query_by_embedding(
            query_emb=query_emb[0], top_k=top_k, filters=filters, index=index, headers=headers, return_embedding=False
        )
        return documents

    def retrieve_batch(
        self,
        queries: List[str],
        queries_type: Optional[ContentTypes] = None,
        filters: Optional[
            Union[
                Dict[str, Union[Dict, List, str, int, float, bool]],
                List[Dict[str, Union[Dict, List, str, int, float, bool]]],
            ]
        ] = None,
        top_k: Optional[int] = None,
        index: Optional[str] = None,
        headers: Optional[Dict[str, str]] = None,
        batch_size: Optional[int] = None,
        scale_score: Optional[bool] = None,
        **kwargs,
    ) -> List[List[Document]]:
        """
        Retrieve documents for several queries: embed the queries in batches, then query the
        DocumentStore once per query embedding.

        :param queries: The queries to run.
        :param queries_type: Not used by this implementation.
        :param filters: Either a single filter dict applied to every query, or a list holding one
                        filter dict per query. ``None`` is replaced by an empty dict for each query.
        :param top_k: How many documents to return per query. Defaults to ``self.top_k`` when None.
        :param index: Index to query. Defaults to ``self.document_store.index`` when None.
        :param headers: Passed through unchanged to ``document_store.query_by_embedding``.
        :param batch_size: Number of queries embedded at once. Defaults to ``self.batch_size`` when None.
        :param scale_score: Not used by this implementation.
        :param kwargs: Forwarded to ``embed_queries``.
        :return: One list of documents per query, in query order. When no document store is
                 configured, one empty list per query.
        :raises ValueError: If ``filters`` is a list whose length differs from ``queries``.
        """
        if top_k is None:
            top_k = self.top_k
        if batch_size is None:
            batch_size = self.batch_size

        if isinstance(filters, list):
            if len(filters) != len(queries):
                # ValueError is a subclass of Exception, so existing `except Exception` callers still work.
                raise ValueError(
                    "Number of filters does not match number of queries. Please provide as many filters"
                    " as queries or a single filter that will be applied to each query."
                )
        else:
            shared_filter = filters if filters is not None else {}
            filters = [shared_filter] * len(queries)

        # The store must be checked BEFORE reading `document_store.index`; otherwise a missing
        # store fails with AttributeError instead of reaching this guard.
        if not self.document_store:
            logger.error(
                "Cannot perform retrieve_batch() since DensePassageRetriever initialized with document_store=None"
            )
            # One (empty) result list per query, matching the shape of a normal return value.
            return [[] for _ in queries]
        if index is None:
            index = self.document_store.index

        documents = []
        query_embs: List[np.ndarray] = []
        for batch in self._get_batches(queries=queries, batch_size=batch_size):
            query_embs.extend(self.embed_queries(texts=batch, **kwargs))
        for query_emb, cur_filters in tqdm(
            zip(query_embs, filters), total=len(query_embs), disable=not self.progress_bar, desc="Querying"
        ):
            cur_docs = self.document_store.query_by_embedding(
                query_emb=query_emb,
                top_k=top_k,
                filters=cur_filters,
                index=index,
                headers=headers,
                return_embedding=False,
            )
            documents.append(cur_docs)
        return documents

    def _get_predictions(self, dicts, **kwargs):
        """
        Feed a preprocessed dataset to the model and get the actual predictions (forward pass + formatting).

        :param dicts: list of dictionaries
        examples:[{'query': "where is florida?"}, {'query': "who wrote lord of the rings?"}, ...]
                [{'passages': [{
                    "title": 'Big Little Lies (TV series)',
                    "text": 'series garnered several accolades. It received..',
                    "label": 'positive',
                    "external_id": '18768923'},
                    {"title": 'Framlingham Castle',
                    "text": 'Castle on the Hill "Castle on the Hill" is a song by English..',
                    "label": 'positive',
                    "external_id": '19930582'}, ...]
        :param kwargs: Forwarded to the encoder call for every batch.
        :return: dictionary of embeddings for "passages" and "query". An entry with results is a
                 single concatenated numpy array; an entry without results stays an empty list.
        """
        all_embeddings = {"query": [], "passages": []}
        if not dicts:
            # Nothing to embed; avoids an IndexError on dicts[0] below.
            return all_embeddings

        # The first element decides whether the batch holds queries or passages.
        has_passages = "passages" in dicts[0]
        has_query = "query" in dicts[0]

        datasets = []
        if has_passages:
            # dicts is a list of passages
            for passages in dicts:
                for item in passages["passages"]:
                    if self.embed_title:
                        datasets.append(item["title"] + item["text"])
                    else:
                        datasets.append(item["text"])
        elif has_query:
            # dicts is a list of queries
            for queries in dicts:
                datasets.append(queries["query"])

        # When running evaluations etc., we don't want a progress bar for every single query
        if len(datasets) == 1:
            disable_tqdm = True
        else:
            disable_tqdm = not self.progress_bar
        # The bar counts texts (unit " Docs"), so total and update() use the same unit.
        with tqdm(
            total=len(datasets),
            unit=" Docs",
            desc="Create embeddings",
            position=1,
            leave=False,
            disable=disable_tqdm,
        ) as progress_bar:
            for start in range(0, len(datasets), self.batch_size):
                batch = datasets[start : start + self.batch_size]

                if has_query:
                    cls_embeddings = self.query_encoder(batch, **kwargs)
                    all_embeddings["query"].append(cls_embeddings["features"])
                if has_passages:
                    cls_embeddings = self.passage_encoder(batch, **kwargs)
                    all_embeddings["passages"].append(cls_embeddings["features"])
                progress_bar.update(len(batch))

        if all_embeddings["passages"]:
            all_embeddings["passages"] = np.concatenate(all_embeddings["passages"])
        if all_embeddings["query"]:
            all_embeddings["query"] = np.concatenate(all_embeddings["query"])
        return all_embeddings

    def embed_queries(self, texts: List[str], **kwargs) -> List[np.ndarray]:
        """
        Create embeddings for a list of queries using the query encoder

        :param texts: Queries to embed
        :param kwargs: Forwarded to ``_get_predictions``.
        :return: Embeddings, one per input query
        """
        queries = [{"query": q} for q in texts]
        result = self._get_predictions(queries, **kwargs)["query"]
        return result

    def embed_documents(self, docs: List[Document], **kwargs) -> List[np.ndarray]:
        """
        Create embeddings for a list of documents using the passage encoder

        :param docs: List of Document objects used to represent documents / passages in a standardized way within pipelines.
        :param kwargs: Forwarded to ``_get_predictions``.
        :return: Embeddings of documents / passages shape (batch_size, embedding_dim)
        """
        passages = [
            {
                "passages": [
                    {
                        # Title comes from meta["name"]; empty when meta or the key is missing.
                        "title": d.meta["name"] if d.meta and "name" in d.meta else "",
                        "text": d.content,
                        "label": d.meta["label"] if d.meta and "label" in d.meta else "positive",
                        "external_id": d.id,
                    }
                ]
            }
            for d in docs
        ]
        embeddings = self._get_predictions(passages, **kwargs)["passages"]

        return embeddings

__init__

__init__(document_store: BaseDocumentStore, query_embedding_model: Union[Path, str] = 'rocketqa-zh-dureader-query-encoder', passage_embedding_model: Union[Path, str] = 'rocketqa-zh-dureader-para-encoder', params_path: Optional[str] = '', model_version: Optional[str] = None, output_emb_size: Optional[int] = None, reinitialize: bool = False, share_parameters: bool = False, max_seq_len_query: int = 64, max_seq_len_passage: int = 384, top_k: int = 10, use_gpu: bool = True, batch_size: int = 16, embed_title: bool = True, similarity_function: str = 'dot_product', progress_bar: bool = True, mode: Literal['snippets', 'raw_documents', 'preprocessed_documents'] = 'preprocessed_documents', **kwargs)

Init the Retriever incl. the two encoder models from a local or remote model checkpoint.

Example:

    ```python
    |    # remote model from FAIR
    |    DensePassageRetriever(document_store=your_doc_store,
    |                          query_embedding_model="rocketqa-zh-dureader-query-encoder",
    |                          passage_embedding_model="rocketqa-zh-dureader-para-encoder")
    |    # or from local path
    |    DensePassageRetriever(document_store=your_doc_store,
    |                          query_embedding_model="model_directory/question-encoder",
    |                          passage_embedding_model="model_directory/context-encoder")
    ```

Parameters:

Name Type Description Default
document_store BaseDocumentStore

An instance of DocumentStore from which to retrieve documents.

required
query_embedding_model Union[Path, str]

Local path or remote name of question encoder checkpoint. The format equals the one used by paddlenlp transformers' models Currently available remote names: "rocketqa-zh-dureader-query-encoder"

'rocketqa-zh-dureader-query-encoder'
passage_embedding_model Union[Path, str]

Local path or remote name of passage encoder checkpoint. The format equals the one used by paddlenlp transformers' models Currently available remote names: "rocketqa-zh-dureader-para-encoder"

'rocketqa-zh-dureader-para-encoder'
max_seq_len_query int

Longest length of each query sequence. Maximum number of tokens for the query text. Longer ones will be cut down.

64
max_seq_len_passage int

Longest length of each passage/context sequence. Maximum number of tokens for the passage text. Longer ones will be cut down.

384
top_k int

How many documents to return per query.

10
use_gpu bool

Whether to use all available GPUs or the CPU. Falls back on CPU if no GPU is available.

True
batch_size int

Number of questions or passages to encode at once. In case of multiple gpus, this will be the total batch size.

16
embed_title bool

Whether to concatenate title and passage to a text pair that is then used to create the embedding. This is the approach used in the original paper and is likely to improve performance if your titles contain meaningful information for retrieval (topic, entities etc.). The title is expected to be present in doc.meta["name"] and can be supplied in the documents before writing them to the DocumentStore like this: {"text": "my text", "meta": {"name": "my title"}}.

True
similarity_function str

Which function to apply for calculating the similarity of query and passage embeddings during training. Options: dot_product (Default) or cosine

'dot_product'
progress_bar bool

Whether to show a tqdm progress bar or not. Can be helpful to disable in production deployments to keep the logs clean.

True
Source code in pipelines/pipelines/nodes/retriever/dense.py
def __init__(
    self,
    document_store: BaseDocumentStore,
    query_embedding_model: Union[Path, str] = "rocketqa-zh-dureader-query-encoder",
    passage_embedding_model: Union[Path, str] = "rocketqa-zh-dureader-para-encoder",
    params_path: Optional[str] = "",
    model_version: Optional[str] = None,
    output_emb_size: Optional[int] = None,
    reinitialize: bool = False,
    share_parameters: bool = False,
    max_seq_len_query: int = 64,
    max_seq_len_passage: int = 384,
    top_k: int = 10,
    use_gpu: bool = True,
    batch_size: int = 16,
    embed_title: bool = True,
    similarity_function: str = "dot_product",
    progress_bar: bool = True,
    mode: Literal["snippets", "raw_documents", "preprocessed_documents"] = "preprocessed_documents",
    **kwargs
):
    """
    Init the Retriever incl. the two encoder models from a local or remote model checkpoint.

    **Example:**

            ```python
            |    # remote model name
            |    DensePassageRetriever(document_store=your_doc_store,
            |                          query_embedding_model="rocketqa-zh-dureader-query-encoder",
            |                          passage_embedding_model="rocketqa-zh-dureader-para-encoder")
            |    # or from local path
            |    DensePassageRetriever(document_store=your_doc_store,
            |                          query_embedding_model="model_directory/question-encoder",
            |                          passage_embedding_model="model_directory/context-encoder")
            ```
    :param document_store: An instance of DocumentStore from which to retrieve documents.
    :param query_embedding_model: Local path or remote name of question encoder checkpoint. The format equals the
                                  one used by paddlenlp transformers' models
                                  Currently available remote names: ``"rocketqa-zh-dureader-query-encoder"``
    :param passage_embedding_model: Local path or remote name of passage encoder checkpoint. The format equals the
                                    one used by paddlenlp transformers' models
                                    Currently available remote names: ``"rocketqa-zh-dureader-para-encoder"``
    :param params_path: Path to a saved parameter file. If it points to an existing path, a
                        ``SemanticIndexBatchNeg`` model is built on top of ``query_embedding_model`` and the
                        parameters are loaded into it; otherwise (empty, ``None`` or missing path) two
                        ``Taskflow("feature_extraction")`` encoders are created.
    :param model_version: Only recorded in the component config by this constructor.
    :param output_emb_size: Forwarded to ``SemanticIndexBatchNeg`` or to both ``Taskflow`` encoders.
    :param reinitialize: Forwarded to both ``Taskflow`` encoders.
    :param share_parameters: Forwarded to both ``Taskflow`` encoders.
    :param max_seq_len_query: Longest length of each query sequence. Maximum number of tokens for the query text. Longer ones will be cut down.
    :param max_seq_len_passage: Longest length of each passage/context sequence. Maximum number of tokens for the passage text. Longer ones will be cut down.
    :param top_k: How many documents to return per query.
    :param use_gpu: Whether to use all available GPUs or the CPU. Falls back on CPU if no GPU is available.
    :param batch_size: Number of questions or passages to encode at once. In case of multiple gpus, this will be the total batch size.
    :param embed_title: Whether to concatenate title and passage to a text pair that is then used to create the embedding.
                        This is the approach used in the original paper and is likely to improve performance if your
                        titles contain meaningful information for retrieval (topic, entities etc.).
                        The title is expected to be present in doc.meta["name"] and can be supplied in the documents
                        before writing them to the DocumentStore like this:
                        {"text": "my text", "meta": {"name": "my title"}}.
    :param similarity_function: Which function to apply for calculating the similarity of query and passage embeddings during training.
                                Options: `dot_product` (Default) or `cosine`
    :param progress_bar: Whether to show a tqdm progress bar or not.
                         Can be helpful to disable in production deployments to keep the logs clean.
    :param mode: Stored on the instance as ``self.mode``; not otherwise used by this constructor.
    :param kwargs: Extra keyword arguments forwarded to both ``Taskflow`` encoders.
    """
    # Save init parameters to enable export of component config as YAML
    self.set_config(
        document_store=document_store,
        query_embedding_model=query_embedding_model,
        passage_embedding_model=passage_embedding_model,
        model_version=model_version,
        max_seq_len_query=max_seq_len_query,
        max_seq_len_passage=max_seq_len_passage,
        top_k=top_k,
        use_gpu=use_gpu,
        batch_size=batch_size,
        embed_title=embed_title,
        reinitialize=reinitialize,
        share_parameters=share_parameters,
        output_emb_size=output_emb_size,
        similarity_function=similarity_function,
        progress_bar=progress_bar,
    )

    self.devices, _ = initialize_device_settings(use_cuda=use_gpu, multi_gpu=True)
    if batch_size < len(self.devices):
        logger.warning("Batch size is less than the number of devices. All gpus will not be utilized.")

    self.document_store = document_store
    self.batch_size = batch_size
    self.progress_bar = progress_bar
    self.top_k = top_k
    self.embed_title = embed_title
    self.mode = mode

    if document_store is None:
        logger.warning("DensePassageRetriever initialized without a document store. ")
    elif document_store.similarity != "dot_product":
        logger.warning(
            f"You are using a Dense Passage Retriever model with the {document_store.similarity} function. "
            "We recommend you use dot_product instead. "
            "This can be set when initializing the DocumentStore"
        )

    # Init & Load Encoders
    # `params_path` is Optional: guard against None before touching the filesystem,
    # since os.path.exists(None) raises TypeError.
    if params_path and os.path.exists(params_path):
        # NOTE(review): this branch sets no query_encoder / passage_encoder attributes -
        # confirm how embeddings are meant to be produced when custom parameters are loaded.
        pretrained_model = AutoModel.from_pretrained(query_embedding_model)
        self.ernie_dual_encoder = SemanticIndexBatchNeg(pretrained_model, output_emb_size=output_emb_size)
        # Load Custom models
        logger.info("Loading Parameters from:{}".format(params_path))
        state_dict = paddle.load(params_path)
        self.ernie_dual_encoder.set_dict(state_dict)
        self.query_tokenizer = AutoTokenizer.from_pretrained(query_embedding_model)
        self.passage_tokenizer = AutoTokenizer.from_pretrained(query_embedding_model)
    else:
        self.query_encoder = Taskflow(
            "feature_extraction",
            model=query_embedding_model,
            batch_size=self.batch_size,
            return_tensors="np",
            max_len=max_seq_len_query,
            output_emb_size=output_emb_size,
            reinitialize=reinitialize,
            share_parameters=share_parameters,
            device_id=0 if use_gpu else -1,
            **kwargs,
        )
        self.passage_encoder = Taskflow(
            "feature_extraction",
            model=passage_embedding_model,
            batch_size=self.batch_size,
            return_tensors="np",
            max_len=max_seq_len_passage,
            output_emb_size=output_emb_size,
            reinitialize=reinitialize,
            share_parameters=share_parameters,
            device_id=0 if use_gpu else -1,
            **kwargs,
        )

embed_documents

embed_documents(docs: List[Document], **kwargs) -> List[np.ndarray]

Create embeddings for a list of documents using the passage encoder

Parameters:

Name Type Description Default
docs List[Document]

List of Document objects used to represent documents / passages in a standardized way within pipelines.

required

Returns:

Type Description
List[ndarray]

Embeddings of documents / passages shape (batch_size, embedding_dim)

Source code in pipelines/pipelines/nodes/retriever/dense.py
def embed_documents(self, docs: List[Document], **kwargs) -> List[np.ndarray]:
    """
    Embed documents with the passage encoder.

    Each document is wrapped in a one-element "passages" record and the whole list is
    handed to ``_get_predictions``.

    :param docs: Document objects to embed.
    :param kwargs: Forwarded to ``_get_predictions``.
    :return: The "passages" entry of the ``_get_predictions`` result.
    """

    def _meta_or(doc, key, fallback):
        # A missing/empty meta and a missing key both yield the fallback value.
        meta = doc.meta
        return meta[key] if meta and key in meta else fallback

    payload = []
    for doc in docs:
        record = {
            "title": _meta_or(doc, "name", ""),
            "text": doc.content,
            "label": _meta_or(doc, "label", "positive"),
            "external_id": doc.id,
        }
        payload.append({"passages": [record]})

    return self._get_predictions(payload, **kwargs)["passages"]

embed_queries

embed_queries(texts: List[str], **kwargs) -> List[np.ndarray]

Create embeddings for a list of queries using the query encoder

Parameters:

Name Type Description Default
texts List[str]

Queries to embed

required

Returns:

Type Description
List[ndarray]

Embeddings, one per input query

Source code in pipelines/pipelines/nodes/retriever/dense.py
def embed_queries(self, texts: List[str], **kwargs) -> List[np.ndarray]:
    """
    Embed query strings with the query encoder.

    :param texts: Queries to embed.
    :param kwargs: Forwarded to ``_get_predictions``.
    :return: The "query" entry of the ``_get_predictions`` result.
    """
    wrapped = []
    for text in texts:
        # _get_predictions expects one {"query": ...} dict per query.
        wrapped.append({"query": text})
    predictions = self._get_predictions(wrapped, **kwargs)
    return predictions["query"]

retrieve

retrieve(query: str, query_type: Optional[ContentTypes] = None, filters: dict = None, top_k: Optional[int] = None, index: str = None, headers: Optional[Dict[str, str]] = None, **kwargs) -> List[Document]

Scan through documents in DocumentStore and return a small number of documents that are most relevant to the query.

Parameters:

Name Type Description Default
query str

The query

required
filters dict

A dictionary where the keys specify a metadata field and the value is a list of accepted values for that field

None
top_k Optional[int]

How many documents to return per query.

None
index str

The name of the index in the DocumentStore from which to retrieve documents

None
Source code in pipelines/pipelines/nodes/retriever/dense.py
def retrieve(
    self,
    query: str,
    query_type: Optional[ContentTypes] = None,
    filters: dict = None,
    top_k: Optional[int] = None,
    index: str = None,
    headers: Optional[Dict[str, str]] = None,
    **kwargs,
) -> List[Document]:
    """
    Embed ``query`` and fetch the closest documents from the DocumentStore.

    :param query: The query
    :param query_type: Not used by this implementation.
    :param filters: A dictionary where the keys specify a metadata field and the value is a list of accepted values for that field
    :param top_k: How many documents to return per query; ``self.top_k`` is used when None.
    :param index: The name of the index in the DocumentStore from which to retrieve documents;
                  ``self.document_store.index`` is used when None.
    :param headers: Passed through unchanged to ``document_store.query_by_embedding``.
    :param kwargs: Forwarded to ``embed_queries``.
    :return: The result of ``document_store.query_by_embedding``, or an empty list when no
             document store is configured.
    """
    store = self.document_store
    if not store:
        logger.error("Cannot perform retrieve() since DensePassageRetriever initialized with document_store=None")
        return []

    limit = self.top_k if top_k is None else top_k
    target_index = store.index if index is None else index

    # embed_queries returns one embedding per text; only a single query is sent.
    query_vector = self.embed_queries(texts=[query], **kwargs)[0]
    return store.query_by_embedding(
        query_emb=query_vector,
        top_k=limit,
        filters=filters,
        index=target_index,
        headers=headers,
        return_embedding=False,
    )

DenseRetriever

Base class for all dense retrievers.

Source code in pipelines/pipelines/nodes/retriever/dense.py
class DenseRetriever(BaseRetriever):
    """
    Base class for all dense retrievers.

    Subclasses implement ``embed_queries`` and ``embed_documents``; ``run_indexing`` uses
    ``embed_documents`` to attach an embedding to every incoming document dict.
    """

    @abstractmethod
    def embed_queries(self, queries: List[str]) -> np.ndarray:
        """
        Create embeddings for a list of queries.

        :param queries: List of queries to embed.
        :return: Embeddings, one per input query, shape: (queries, embedding_dim)
        """
        pass

    @abstractmethod
    def embed_documents(self, documents: List[Document]) -> np.ndarray:
        """
        Create embeddings for a list of documents.

        :param documents: List of documents to embed.
        :return: Embeddings of documents, one per input document, shape: (documents, embedding_dim)
        """
        pass

    def run_indexing(self, documents: List[dict]):
        """
        Compute an embedding for every document and store it under the "embedding" key.

        :param documents: Documents in dict form. Each one is converted with
                          ``Document.from_dict`` for embedding, and the original dict is
                          updated in place with its embedding.
        :return: Tuple of ``({"documents": documents}, "output_1")``.
        """
        # The items are dicts (not Document objects): they are converted here and
        # written to with item assignment below.
        documents_objects = [Document.from_dict(doc) for doc in documents]
        embeddings = self.embed_documents(documents_objects)
        for doc, emb in zip(documents, embeddings):
            doc["embedding"] = emb
        output = {"documents": documents}
        return output, "output_1"

embed_documents abstractmethod

embed_documents(documents: List[Document]) -> np.ndarray

Create embeddings for a list of documents.

Parameters:

Name Type Description Default
documents List[Document]

List of documents to embed.

required

Returns:

Type Description
ndarray

Embeddings of documents, one per input document, shape: (documents, embedding_dim)

Source code in pipelines/pipelines/nodes/retriever/dense.py
@abstractmethod
def embed_documents(self, documents: List[Document]) -> np.ndarray:
    """
    Embed a list of documents; concrete retrievers must override this.

    :param documents: Documents to embed.
    :return: One embedding per input document, shape: (documents, embedding_dim)
    """
    return None

embed_queries abstractmethod

embed_queries(queries: List[str]) -> np.ndarray

Create embeddings for a list of queries.

Parameters:

Name Type Description Default
queries List[str]

List of queries to embed.

required

Returns:

Type Description
ndarray

Embeddings, one per input query, shape: (queries, embedding_dim)

Source code in pipelines/pipelines/nodes/retriever/dense.py
@abstractmethod
def embed_queries(self, queries: List[str]) -> np.ndarray:
    """
    Embed a list of query strings.

    :param queries: Queries to embed.
    :return: One embedding per input query, shape: (queries, embedding_dim)
    """

EmbeddingRetriever

Retriever that uses a bi-encoder (query model for query, passage model for passage).

Source code in pipelines/pipelines/nodes/retriever/dense.py
class EmbeddingRetriever(DenseRetriever):

    """
    Retriever that uses a bi-encoder (query model for query, passage model for passage).
    """

    def __init__(
        self,
        document_store: BaseDocumentStore,
        embedding_model: Union[Path, str] = "ernie-embedding-v1",
        max_seq_len: int = 384,
        top_k: int = 10,
        batch_size: int = 16,
        embed_title: bool = True,
        similarity_function: str = "dot_product",
        api_key: Optional[str] = None,
        secret_key: Optional[str] = None,
        scale_score: bool = True,
        progress_bar: bool = True,
        embed_meta_fields: Optional[List[str]] = None,
        mode: Literal["snippets", "raw_documents", "preprocessed_documents"] = "preprocessed_documents",
        **kwargs
    ):
        """
        Init the Retriever and the encoder registered for ``embedding_model``.

        :param document_store: An instance of DocumentStore from which to retrieve documents.
        :param embedding_model: Name used to look up the encoder class in ``_EMBEDDING_ENCODERS``.
                                Currently available remote names: ``"ernie-embedding-v1"``
        :param max_seq_len: Stored on the retriever as ``max_seq_len``; presumably the maximum
                            input length used by the encoder - confirm against the encoder.
        :param top_k: How many documents to return per query.
        :param batch_size: Number of questions or passages to encode at once. In case of multiple gpus, this will be the total batch size.
        :param embed_title: Whether to concatenate title and passage to a text pair that is then used to create the embedding.
                            The title is expected to be present in doc.meta["name"] and can be supplied in the documents
                            before writing them to the DocumentStore like this:
                            {"text": "my text", "meta": {"name": "my title"}}.
        :param similarity_function: Which function to apply for calculating the similarity of query and passage embeddings.
                                    Options: `dot_product` (Default) or `cosine`.
                                    Accepted for interface compatibility; this constructor does not store it.
        :param api_key: API key for the embedding service. Required.
        :param secret_key: Secret key for the embedding service. Required.
        :param scale_score: Default for the ``scale_score`` argument of ``retrieve()`` / ``retrieve_batch()``.
        :param progress_bar: Whether to show a tqdm progress bar or not.
                             Can be helpful to disable in production deployments to keep the logs clean.
        :param embed_meta_fields: Names of meta fields whose values are prepended to the document content
                                  before embedding. ``None`` is treated as an empty list.
        :param mode: Accepted for interface compatibility; this constructor does not use it.
        :param kwargs: Accepted and ignored by this constructor.
        :raises ValueError: If ``api_key`` or ``secret_key`` is ``None``.
        """
        if api_key is None or secret_key is None:
            # ValueError matches the error style of retrieve()/retrieve_batch() and is
            # still caught by any existing `except Exception` handler.
            raise ValueError(
                "Please apply api_key and secret_key from https://cloud.baidu.com/doc/WENXINWORKSHOP/s/alj562vvu"
            )
        if embed_meta_fields is None:
            embed_meta_fields = []
        super().__init__()
        self.api_key = api_key
        self.secret_key = secret_key
        self.batch_size = batch_size
        self.progress_bar = progress_bar
        self.document_store = document_store
        self.top_k = top_k
        self.embed_title = embed_title
        self.embedding_model = embedding_model
        self.max_seq_len = max_seq_len
        self.embed_meta_fields = embed_meta_fields
        self.scale_score = scale_score
        # Created last: the encoder receives this retriever and may read the attributes set above.
        self.embedding_encoder = _EMBEDDING_ENCODERS[self.embedding_model](retriever=self)

    def retrieve(
        self,
        query: str,
        filters: Optional[FilterType] = None,
        top_k: Optional[int] = None,
        index: Optional[str] = None,
        headers: Optional[Dict[str, str]] = None,
        scale_score: Optional[bool] = None,
        document_store: Optional[BaseDocumentStore] = None,
    ) -> List[Document]:
        """
        Embed the query and return the documents the DocumentStore finds most relevant to it.

        :param query: The query
        :param filters: Optional filters to narrow down the search space to documents whose metadata fulfill certain
                        conditions.
                        Filters are nested dictionaries. A key is either a logical operator
                        (`"$and"`, `"$or"`, `"$not"`), a comparison operator (`"$eq"`, `"$in"`, `"$gt"`,
                        `"$gte"`, `"$lt"`, `"$lte"`) or a metadata field name.
                        A logical operator maps to a dictionary of metadata field names and/or logical operators.
                        A metadata field name maps to a dictionary of comparison operators. A comparison
                        operator maps to a single value or (for `"$in"`) a list of values.
                        Without a logical operator, `"$and"` is the default. Without a comparison
                        operator, `"$eq"` (or `"$in"` if the comparison value is a list) is the default.

                            __Example__:

                            ```python
                            filters = {
                                "$and": {
                                    "type": {"$eq": "article"},
                                    "date": {"$gte": "2015-01-01", "$lt": "2021-01-01"},
                                    "rating": {"$gte": 3},
                                    "$or": {
                                        "genre": {"$in": ["economy", "politics"]},
                                        "publisher": {"$eq": "nytimes"}
                                    }
                                }
                            }
                            # or simpler using default operators
                            filters = {
                                "type": "article",
                                "date": {"$gte": "2015-01-01", "$lt": "2021-01-01"},
                                "rating": {"$gte": 3},
                                "$or": {
                                    "genre": ["economy", "politics"],
                                    "publisher": "nytimes"
                                }
                            }
                            ```

                            To use the same logical operator several times on one level, give it a list
                            of dictionaries as value.

                            __Example__:

                            ```python
                            filters = {
                                "$or": [
                                    {
                                        "$and": {
                                            "Type": "News Paper",
                                            "Date": {
                                                "$lt": "2019-01-01"
                                            }
                                        }
                                    },
                                    {
                                        "$and": {
                                            "Type": "Blog Post",
                                            "Date": {
                                                "$gte": "2019-01-01"
                                            }
                                        }
                                    }
                                ]
                            }
                            ```
        :param top_k: How many documents to return per query. Defaults to the value given in `__init__`.
        :param index: The name of the index in the DocumentStore from which to retrieve documents.
                      Defaults to the document store's own index.
        :param headers: Custom HTTP headers to pass to document store client if supported (e.g. {'Authorization': 'Basic API_KEY'} for basic authentication)
        :param scale_score: Whether to scale the similarity score to the unit interval (range of [0,1]).
                            Defaults to the value given in `__init__`.
                            NOTE(review): the resolved value is not forwarded to the document store
                            by this method - confirm whether that is intended.
        :param document_store: the docstore to use for retrieval. If `None`, the one given in the `__init__` is used instead.
        :raises ValueError: If no document store is available.
        """
        store = document_store or self.document_store
        if store is None:
            raise ValueError(
                "This Retriever was not initialized with a Document Store. Provide one to the retrieve() method."
            )

        # Fall back to the instance-level or store-level defaults for anything not given.
        top_k = self.top_k if top_k is None else top_k
        index = store.index if index is None else index
        # Resolved like in retrieve_batch(), but not passed on to the store below.
        scale_score = self.scale_score if scale_score is None else scale_score

        # embed_queries works on a list; take the single resulting embedding.
        query_vector = self.embed_queries(queries=[query])[0]
        return store.query_by_embedding(
            query_emb=query_vector, filters=filters, top_k=top_k, index=index, headers=headers
        )

    def retrieve_batch(
        self,
        queries: List[str],
        filters: Optional[Union[FilterType, List[Optional[FilterType]]]] = None,
        top_k: Optional[int] = None,
        index: Optional[str] = None,
        headers: Optional[Dict[str, str]] = None,
        batch_size: Optional[int] = None,
        scale_score: Optional[bool] = None,
        document_store: Optional[BaseDocumentStore] = None,
    ) -> List[List[Document]]:
        """
        Embed all queries and return, for each of them, the documents the DocumentStore
        finds most relevant.

        Returns a list of lists of Documents (one per query).

        :param queries: List of query strings.
        :param filters: Optional filters to narrow down the search space to documents whose metadata fulfill certain
                        conditions. Can be a single filter that will be applied to each query or a list of filters
                        (one filter per query).

                        Filters are nested dictionaries. A key is either a logical operator
                        (`"$and"`, `"$or"`, `"$not"`), a comparison operator (`"$eq"`, `"$in"`, `"$gt"`,
                        `"$gte"`, `"$lt"`, `"$lte"`) or a metadata field name.
                        A logical operator maps to a dictionary of metadata field names and/or logical operators.
                        A metadata field name maps to a dictionary of comparison operators. A comparison
                        operator maps to a single value or (for `"$in"`) a list of values.
                        Without a logical operator, `"$and"` is the default. Without a comparison
                        operator, `"$eq"` (or `"$in"` if the comparison value is a list) is the default.

                            __Example__:

                            ```python
                            filters = {
                                "$and": {
                                    "type": {"$eq": "article"},
                                    "date": {"$gte": "2015-01-01", "$lt": "2021-01-01"},
                                    "rating": {"$gte": 3},
                                    "$or": {
                                        "genre": {"$in": ["economy", "politics"]},
                                        "publisher": {"$eq": "nytimes"}
                                    }
                                }
                            }
                            # or simpler using default operators
                            filters = {
                                "type": "article",
                                "date": {"$gte": "2015-01-01", "$lt": "2021-01-01"},
                                "rating": {"$gte": 3},
                                "$or": {
                                    "genre": ["economy", "politics"],
                                    "publisher": "nytimes"
                                }
                            }
                            ```

                            To use the same logical operator several times on one level, give it a list
                            of dictionaries as value.

                            __Example__:

                            ```python
                            filters = {
                                "$or": [
                                    {
                                        "$and": {
                                            "Type": "News Paper",
                                            "Date": {
                                                "$lt": "2019-01-01"
                                            }
                                        }
                                    },
                                    {
                                        "$and": {
                                            "Type": "Blog Post",
                                            "Date": {
                                                "$gte": "2019-01-01"
                                            }
                                        }
                                    }
                                ]
                            }
                            ```
        :param top_k: How many documents to return per query. Defaults to the value given in `__init__`.
        :param index: The name of the index in the DocumentStore from which to retrieve documents.
                      Defaults to the document store's own index.
        :param headers: Custom HTTP headers to pass to document store client if supported (e.g. {'Authorization': 'Basic API_KEY'} for basic authentication)
        :param batch_size: Chunk size used when collecting the query embeddings into a list.
                           Defaults to the value given in `__init__`.
        :param scale_score: Whether to scale the similarity score to the unit interval (range of [0,1]).
                            If true similarity scores (e.g. cosine or dot_product) which naturally have a different
                            value range will be scaled to a range of [0,1], where 1 means extremely relevant.
                            Otherwise raw similarity scores (e.g. cosine or dot_product) will be used.
        :param document_store: the docstore to use for retrieval. If `None`, the one given in the `__init__` is used instead.
        :raises ValueError: If no document store is available.
        """
        store = document_store or self.document_store
        if store is None:
            raise ValueError(
                "This Retriever was not initialized with a Document Store. Provide one to the retrieve_batch() method."
            )

        # Fall back to the instance-level or store-level defaults for anything not given.
        top_k = self.top_k if top_k is None else top_k
        batch_size = self.batch_size if batch_size is None else batch_size
        index = store.index if index is None else index
        scale_score = self.scale_score if scale_score is None else scale_score

        # All queries are embedded in one call; presumably embed_queries batches
        # internally (per the original note) - confirm against the encoder.
        all_embeddings: np.ndarray = self.embed_queries(queries=queries)

        # Walk the embeddings in chunks of batch_size and collect them, in order,
        # into one flat list with one entry per query.
        flat_embeddings: List[np.ndarray] = [
            embedding
            for start in range(0, len(all_embeddings), batch_size)
            for embedding in all_embeddings[start : start + batch_size]
        ]

        return store.query_by_embedding_batch(
            query_embs=flat_embeddings,
            top_k=top_k,
            filters=filters,
            index=index,
            headers=headers,
            scale_score=scale_score,
        )

    def embed_queries(self, queries: List[str]) -> np.ndarray:
        """
        Embed a list of query strings with the configured embedding encoder.

        :param queries: Queries to embed. A single string is also accepted and is wrapped in a list.
        :return: Embeddings, one per input query, shape: (queries, embedding_dim)
        """
        # Backward compatibility: a bare string counts as a one-element list.
        query_list = [queries] if isinstance(queries, str) else queries
        assert isinstance(query_list, list), "Expecting a list of texts, i.e. create_embeddings(texts=['text1',...])"
        return self.embedding_encoder.embed_queries(query_list)

    def embed_documents(self, documents: List[Document]) -> np.ndarray:
        """
        Embed a list of documents with the configured embedding encoder.

        The documents are first run through ``_preprocess_documents``; the encoder
        receives the preprocessed copies.

        :param documents: List of documents to embed.
        :return: Embeddings, one per input document, shape: (docs, embedding_dim)
        """
        prepared = self._preprocess_documents(documents)
        return self.embedding_encoder.embed_documents(prepared)

    def _preprocess_documents(self, docs: List[Document]) -> List[Document]:
        """
        Build embedding-ready copies of the given documents.

        Table documents are converted to text by rendering their DataFrame as csv, which
        allows text embedding models to be used for table retrieval. The values of the
        meta fields listed in ``self.embed_meta_fields`` are then put in front of the
        content, one per line. The input documents are not modified; deep copies are returned.

        :param docs: List of documents to linearize. Non-table documents keep their content.
        :return: List of copied documents with meta data + linearized tables or original content.
        :raises Exception: If a document of type 'table' does not hold a pd.DataFrame as content.
        """
        prepared = []
        for source_doc in docs:
            doc = deepcopy(source_doc)

            if doc.content_type == "table":
                if not isinstance(doc.content, pd.DataFrame):
                    raise Exception("Documents of type 'table' need to have a pd.DataFrame as content field")
                doc.content = doc.content.to_csv(index=False)

            # Collect the values of all configured meta fields that are present and non-empty.
            meta_values = []
            for key in self.embed_meta_fields:
                if key not in doc.meta or not doc.meta[key]:
                    continue
                value = doc.meta[key]
                if isinstance(value, list):
                    # List-valued fields contribute one line per item.
                    meta_values.extend(value)
                else:
                    meta_values.append(value)

            # Stringify (e.g. ints or floats) and prepend to the content.
            lines = [str(value) for value in meta_values]
            lines.append(doc.content)
            doc.content = "\n".join(lines)
            prepared.append(doc)
        return prepared

__init__

__init__(document_store: BaseDocumentStore, embedding_model: Union[Path, str] = 'ernie-embedding-v1', max_seq_len: int = 384, top_k: int = 10, batch_size: int = 16, embed_title: bool = True, similarity_function: str = 'dot_product', api_key: Optional[str] = None, secret_key: Optional[str] = None, scale_score: bool = True, progress_bar: bool = True, embed_meta_fields: Optional[List[str]] = None, mode: Literal['snippets', 'raw_documents', 'preprocessed_documents'] = 'preprocessed_documents', **kwargs)

Initialize the retriever, including its embedding encoder, from a local or remote model checkpoint.

Parameters:

Name Type Description Default
document_store BaseDocumentStore

An instance of DocumentStore from which to retrieve documents.

required
embedding_model Union[Path, str]

Local path or remote name of question encoder checkpoint. The format equals the one used by paddlenlp transformers' models. Currently available remote names: "ernie-embedding-v1"

'ernie-embedding-v1'
top_k int

How many documents to return per query.

10
batch_size int

Number of questions or passages to encode at once. In case of multiple gpus, this will be the total batch size.

16
embed_title bool

Whether to concatenate title and passage to a text pair that is then used to create the embedding. This is the approach used in the original paper and is likely to improve performance if your titles contain meaningful information for retrieval (topic, entities, etc.). The title is expected to be present in doc.meta["name"] and can be supplied in the documents before writing them to the DocumentStore like this: {"text": "my text", "meta": {"name": "my title"}}.

True
similarity_function str

Which function to apply for calculating the similarity of query and passage embeddings during training. Options: dot_product (Default) or cosine

'dot_product'
progress_bar bool

Whether to show a tqdm progress bar or not. Can be helpful to disable in production deployments to keep the logs clean.

True
Source code in pipelines/pipelines/nodes/retriever/dense.py
def __init__(
    self,
    document_store: BaseDocumentStore,
    embedding_model: Union[Path, str] = "ernie-embedding-v1",
    max_seq_len: int = 384,
    top_k: int = 10,
    batch_size: int = 16,
    embed_title: bool = True,
    similarity_function: str = "dot_product",
    api_key: Optional[str] = None,
    secret_key: Optional[str] = None,
    scale_score: bool = True,
    progress_bar: bool = True,
    embed_meta_fields: Optional[List[str]] = None,
    mode: Literal["snippets", "raw_documents", "preprocessed_documents"] = "preprocessed_documents",
    **kwargs
):
    """
    Init the Retriever and the encoder registered for ``embedding_model``.

    :param document_store: An instance of DocumentStore from which to retrieve documents.
    :param embedding_model: Name used to look up the encoder class in ``_EMBEDDING_ENCODERS``.
                            Currently available remote names: ``"ernie-embedding-v1"``
    :param max_seq_len: Stored on the retriever as ``max_seq_len``; presumably the maximum
                        input length used by the encoder - confirm against the encoder.
    :param top_k: How many documents to return per query.
    :param batch_size: Number of questions or passages to encode at once. In case of multiple gpus, this will be the total batch size.
    :param embed_title: Whether to concatenate title and passage to a text pair that is then used to create the embedding.
                        The title is expected to be present in doc.meta["name"] and can be supplied in the documents
                        before writing them to the DocumentStore like this:
                        {"text": "my text", "meta": {"name": "my title"}}.
    :param similarity_function: Which function to apply for calculating the similarity of query and passage embeddings.
                                Options: `dot_product` (Default) or `cosine`.
                                Accepted for interface compatibility; this constructor does not store it.
    :param api_key: API key for the embedding service. Required.
    :param secret_key: Secret key for the embedding service. Required.
    :param scale_score: Stored on the retriever as the default ``scale_score`` setting.
    :param progress_bar: Whether to show a tqdm progress bar or not.
                         Can be helpful to disable in production deployments to keep the logs clean.
    :param embed_meta_fields: Names of meta fields to embed together with the document content.
                              ``None`` is treated as an empty list.
    :param mode: Accepted for interface compatibility; this constructor does not use it.
    :param kwargs: Accepted and ignored by this constructor.
    :raises ValueError: If ``api_key`` or ``secret_key`` is ``None``.
    """
    if api_key is None or secret_key is None:
        # ValueError is more specific than a bare Exception and is still caught
        # by any existing `except Exception` handler.
        raise ValueError(
            "Please apply api_key and secret_key from https://cloud.baidu.com/doc/WENXINWORKSHOP/s/alj562vvu"
        )
    if embed_meta_fields is None:
        embed_meta_fields = []
    super().__init__()
    self.api_key = api_key
    self.secret_key = secret_key
    self.batch_size = batch_size
    self.progress_bar = progress_bar
    self.document_store = document_store
    self.top_k = top_k
    self.embed_title = embed_title
    self.embedding_model = embedding_model
    self.max_seq_len = max_seq_len
    self.embed_meta_fields = embed_meta_fields
    self.scale_score = scale_score
    # Created last: the encoder receives this retriever and may read the attributes set above.
    self.embedding_encoder = _EMBEDDING_ENCODERS[self.embedding_model](retriever=self)

embed_documents

embed_documents(documents: List[Document]) -> np.ndarray

Create embeddings for a list of documents.

Parameters:

Name Type Description Default
documents List[Document]

List of documents to embed.

required

Returns:

Type Description
ndarray

Embeddings, one per input document, shape: (docs, embedding_dim)

Source code in pipelines/pipelines/nodes/retriever/dense.py
def embed_documents(self, documents: List[Document]) -> np.ndarray:
    """
    Embed a list of documents with the configured embedding encoder.

    The documents are first run through ``_preprocess_documents``; the encoder
    receives the preprocessed result.

    :param documents: List of documents to embed.
    :return: Embeddings, one per input document, shape: (docs, embedding_dim)
    """
    prepared = self._preprocess_documents(documents)
    return self.embedding_encoder.embed_documents(prepared)

embed_queries

embed_queries(queries: List[str]) -> np.ndarray

Create embeddings for a list of queries.

Parameters:

Name Type Description Default
queries List[str]

List of queries to embed.

required

Returns:

Type Description
ndarray

Embeddings, one per input query, shape: (queries, embedding_dim)

Source code in pipelines/pipelines/nodes/retriever/dense.py
def embed_queries(self, queries: List[str]) -> np.ndarray:
    """
    Embed a list of query strings with the configured embedding encoder.

    :param queries: Queries to embed. A single string is also accepted and is wrapped in a list.
    :return: Embeddings, one per input query, shape: (queries, embedding_dim)
    """
    # Backward compatibility: a bare string counts as a one-element list.
    query_list = [queries] if isinstance(queries, str) else queries
    assert isinstance(query_list, list), "Expecting a list of texts, i.e. create_embeddings(texts=['text1',...])"
    return self.embedding_encoder.embed_queries(query_list)

retrieve

retrieve(query: str, filters: Optional[FilterType] = None, top_k: Optional[int] = None, index: Optional[str] = None, headers: Optional[Dict[str, str]] = None, scale_score: Optional[bool] = None, document_store: Optional[BaseDocumentStore] = None) -> List[Document]

Scan through the documents in a DocumentStore and return a small number of documents that are most relevant to the query.

Parameters:

Name Type Description Default
query str

The query

required
filters Optional[FilterType]

Optional filters to narrow down the search space to documents whose metadata fulfill certain conditions. Filters are defined as nested dictionaries. The keys of the dictionaries can be a logical operator ("$and", "$or", "$not"), a comparison operator ("$eq", "$in", "$gt", "$gte", "$lt", "$lte") or a metadata field name. Logical operator keys take a dictionary of metadata field names and/or logical operators as value. Metadata field names take a dictionary of comparison operators as value. Comparison operator keys take a single value or (in case of "$in") a list of values as value. If no logical operator is provided, "$and" is used as default operation. If no comparison operator is provided, "$eq" (or "$in" if the comparison value is a list) is used as default operation. Example: python filters = { "$and": { "type": {"$eq": "article"}, "date": {"$gte": "2015-01-01", "$lt": "2021-01-01"}, "rating": {"$gte": 3}, "$or": { "genre": {"$in": ["economy", "politics"]}, "publisher": {"$eq": "nytimes"} } } } # or simpler using default operators filters = { "type": "article", "date": {"$gte": "2015-01-01", "$lt": "2021-01-01"}, "rating": {"$gte": 3}, "$or": { "genre": ["economy", "politics"], "publisher": "nytimes" } } To use the same logical operator multiple times on the same level, logical operators take optionally a list of dictionaries as value. Example: python filters = { "$or": [ { "$and": { "Type": "News Paper", "Date": { "$lt": "2019-01-01" } } }, { "$and": { "Type": "Blog Post", "Date": { "$gte": "2019-01-01" } } } ] }

None
top_k Optional[int]

How many documents to return per query.

None
index Optional[str]

The name of the index in the DocumentStore from which to retrieve documents

None
headers Optional[Dict[str, str]]

Custom HTTP headers to pass to document store client if supported (e.g. {'Authorization': 'Basic API_KEY'} for basic authentication)

None
scale_score Optional[bool]

Whether to scale the similarity score to the unit interval (range of [0,1]). If true similarity scores (e.g. cosine or dot_product) which naturally have a different value range will be scaled to a range of [0,1], where 1 means extremely relevant. Otherwise raw similarity scores (e.g. cosine or dot_product) will be used.

None
document_store Optional[BaseDocumentStore]

The document store to use for retrieval. If None, the one given in `__init__` is used instead.

None
Source code in pipelines/pipelines/nodes/retriever/dense.py
def retrieve(
    self,
    query: str,
    filters: Optional[FilterType] = None,
    top_k: Optional[int] = None,
    index: Optional[str] = None,
    headers: Optional[Dict[str, str]] = None,
    scale_score: Optional[bool] = None,
    document_store: Optional[BaseDocumentStore] = None,
) -> List[Document]:
    """
    Embed the query and return the documents of a DocumentStore that are most relevant to it.

    :param query: The query
    :param filters: Optional filters to narrow down the search space to documents whose metadata fulfill certain
                    conditions.
                    Filters are defined as nested dictionaries. The keys of the dictionaries can be a logical
                    operator (`"$and"`, `"$or"`, `"$not"`), a comparison operator (`"$eq"`, `"$in"`, `"$gt"`,
                    `"$gte"`, `"$lt"`, `"$lte"`) or a metadata field name.
                    Logical operator keys take a dictionary of metadata field names and/or logical operators as
                    value. Metadata field names take a dictionary of comparison operators as value. Comparison
                    operator keys take a single value or (in case of `"$in"`) a list of values as value.
                    If no logical operator is provided, `"$and"` is used as default operation. If no comparison
                    operator is provided, `"$eq"` (or `"$in"` if the comparison value is a list) is used as default
                    operation.

                        __Example__:

                        ```python
                        filters = {
                            "$and": {
                                "type": {"$eq": "article"},
                                "date": {"$gte": "2015-01-01", "$lt": "2021-01-01"},
                                "rating": {"$gte": 3},
                                "$or": {
                                    "genre": {"$in": ["economy", "politics"]},
                                    "publisher": {"$eq": "nytimes"}
                                }
                            }
                        }
                        # or simpler using default operators
                        filters = {
                            "type": "article",
                            "date": {"$gte": "2015-01-01", "$lt": "2021-01-01"},
                            "rating": {"$gte": 3},
                            "$or": {
                                "genre": ["economy", "politics"],
                                "publisher": "nytimes"
                            }
                        }
                        ```

                        To use the same logical operator multiple times on the same level, logical operators take
                        optionally a list of dictionaries as value.

                        __Example__:

                        ```python
                        filters = {
                            "$or": [
                                {
                                    "$and": {
                                        "Type": "News Paper",
                                        "Date": {
                                            "$lt": "2019-01-01"
                                        }
                                    }
                                },
                                {
                                    "$and": {
                                        "Type": "Blog Post",
                                        "Date": {
                                            "$gte": "2019-01-01"
                                        }
                                    }
                                }
                            ]
                        }
                        ```
    :param top_k: How many documents to return per query. Defaults to `self.top_k` when `None`.
    :param index: The name of the index in the DocumentStore from which to retrieve documents.
                  Defaults to the `index` attribute of the document store when `None`.
    :param headers: Custom HTTP headers to pass to document store client if supported
                    (e.g. {'Authorization': 'Basic API_KEY'} for basic authentication)
    :param scale_score: Whether to scale the similarity score to the unit interval (range of [0,1]).
                        If true, similarity scores (e.g. cosine or dot_product) which naturally have a different
                        value range will be scaled to a range of [0,1], where 1 means extremely relevant.
                        Otherwise raw similarity scores (e.g. cosine or dot_product) will be used.
                        Defaults to `self.scale_score` when `None`.
    :param document_store: The document store to use for retrieval. If `None`, the one given in the `__init__`
                           is used instead.
    :raises ValueError: If no document store was passed and the retriever has none of its own.
    :return: The documents returned by the document store's `query_by_embedding()`.
    """
    # Compare against None explicitly: a store object that happens to be falsy (for example one that defines
    # __len__ and is currently empty) must still be honoured when the caller passes it in.
    if document_store is None:
        document_store = self.document_store
    if document_store is None:
        raise ValueError(
            "This Retriever was not initialized with a Document Store. Provide one to the retrieve() method."
        )
    if top_k is None:
        top_k = self.top_k
    if index is None:
        index = document_store.index
    if scale_score is None:
        scale_score = self.scale_score
    # NOTE(review): scale_score is resolved above but is not forwarded to query_by_embedding(), whereas
    # retrieve_batch() does forward it to query_by_embedding_batch(). Confirm whether the single-query store
    # API accepts a scale_score argument before wiring it through.

    # embed_queries() works on a list of queries; take the embedding of our single query.
    query_emb = self.embed_queries(queries=[query])
    documents = document_store.query_by_embedding(
        query_emb=query_emb[0], filters=filters, top_k=top_k, index=index, headers=headers
    )
    return documents

retrieve_batch

retrieve_batch(queries: List[str], filters: Optional[Union[FilterType, List[Optional[FilterType]]]] = None, top_k: Optional[int] = None, index: Optional[str] = None, headers: Optional[Dict[str, str]] = None, batch_size: Optional[int] = None, scale_score: Optional[bool] = None, document_store: Optional[BaseDocumentStore] = None) -> List[List[Document]]

Scan through the documents in a DocumentStore and return a small number of documents that are most relevant to the supplied queries.

Returns a list of lists of Documents (one per query).

Parameters:

Name Type Description Default
queries List[str]

List of query strings.

required
filters Optional[Union[FilterType, List[Optional[FilterType]]]]

Optional filters to narrow down the search space to documents whose metadata fulfill certain conditions. Can be a single filter that will be applied to each query or a list of filters (one filter per query). Filters are defined as nested dictionaries. The keys of the dictionaries can be a logical operator ("$and", "$or", "$not"), a comparison operator ("$eq", "$in", "$gt", "$gte", "$lt", "$lte") or a metadata field name. Logical operator keys take a dictionary of metadata field names and/or logical operators as value. Metadata field names take a dictionary of comparison operators as value. Comparison operator keys take a single value or (in case of "$in") a list of values as value. If no logical operator is provided, "$and" is used as default operation. If no comparison operator is provided, "$eq" (or "$in" if the comparison value is a list) is used as default operation. Example: python filters = { "$and": { "type": {"$eq": "article"}, "date": {"$gte": "2015-01-01", "$lt": "2021-01-01"}, "rating": {"$gte": 3}, "$or": { "genre": {"$in": ["economy", "politics"]}, "publisher": {"$eq": "nytimes"} } } } # or simpler using default operators filters = { "type": "article", "date": {"$gte": "2015-01-01", "$lt": "2021-01-01"}, "rating": {"$gte": 3}, "$or": { "genre": ["economy", "politics"], "publisher": "nytimes" } } To use the same logical operator multiple times on the same level, logical operators take optionally a list of dictionaries as value. Example: python filters = { "$or": [ { "$and": { "Type": "News Paper", "Date": { "$lt": "2019-01-01" } } }, { "$and": { "Type": "Blog Post", "Date": { "$gte": "2019-01-01" } } } ] }

None
top_k Optional[int]

How many documents to return per query.

None
index Optional[str]

The name of the index in the DocumentStore from which to retrieve documents

None
headers Optional[Dict[str, str]]

Custom HTTP headers to pass to document store client if supported (e.g. {'Authorization': 'Basic API_KEY'} for basic authentication)

None
batch_size Optional[int]

Number of queries to embed at a time.

None
scale_score Optional[bool]

Whether to scale the similarity score to the unit interval (range of [0,1]). If true, similarity scores (e.g. cosine or dot_product), which naturally have a different value range, will be scaled to a range of [0,1], where 1 means extremely relevant. Otherwise, raw similarity scores (e.g. cosine or dot_product) will be used.

None
document_store Optional[BaseDocumentStore]

The document store to use for retrieval. If None, the one given in the __init__ is used instead.

None
Source code in pipelines/pipelines/nodes/retriever/dense.py
def retrieve_batch(
    self,
    queries: List[str],
    filters: Optional[Union[FilterType, List[Optional[FilterType]]]] = None,
    top_k: Optional[int] = None,
    index: Optional[str] = None,
    headers: Optional[Dict[str, str]] = None,
    batch_size: Optional[int] = None,
    scale_score: Optional[bool] = None,
    document_store: Optional[BaseDocumentStore] = None,
) -> List[List[Document]]:
    """
    Embed the supplied queries and return, for each of them, the documents of a DocumentStore
    that are most relevant to it.

    Returns a list of lists of Documents (one per query).

    :param queries: List of query strings.
    :param filters: Optional filters to narrow down the search space to documents whose metadata fulfill certain
                    conditions. Can be a single filter that will be applied to each query or a list of filters
                    (one filter per query).

                    Filters are defined as nested dictionaries. The keys of the dictionaries can be a logical
                    operator (`"$and"`, `"$or"`, `"$not"`), a comparison operator (`"$eq"`, `"$in"`, `"$gt"`,
                    `"$gte"`, `"$lt"`, `"$lte"`) or a metadata field name.
                    Logical operator keys take a dictionary of metadata field names and/or logical operators as
                    value. Metadata field names take a dictionary of comparison operators as value. Comparison
                    operator keys take a single value or (in case of `"$in"`) a list of values as value.
                    If no logical operator is provided, `"$and"` is used as default operation. If no comparison
                    operator is provided, `"$eq"` (or `"$in"` if the comparison value is a list) is used as default
                    operation.

                        __Example__:

                        ```python
                        filters = {
                            "$and": {
                                "type": {"$eq": "article"},
                                "date": {"$gte": "2015-01-01", "$lt": "2021-01-01"},
                                "rating": {"$gte": 3},
                                "$or": {
                                    "genre": {"$in": ["economy", "politics"]},
                                    "publisher": {"$eq": "nytimes"}
                                }
                            }
                        }
                        # or simpler using default operators
                        filters = {
                            "type": "article",
                            "date": {"$gte": "2015-01-01", "$lt": "2021-01-01"},
                            "rating": {"$gte": 3},
                            "$or": {
                                "genre": ["economy", "politics"],
                                "publisher": "nytimes"
                            }
                        }
                        ```

                        To use the same logical operator multiple times on the same level, logical operators take
                        optionally a list of dictionaries as value.

                        __Example__:

                        ```python
                        filters = {
                            "$or": [
                                {
                                    "$and": {
                                        "Type": "News Paper",
                                        "Date": {
                                            "$lt": "2019-01-01"
                                        }
                                    }
                                },
                                {
                                    "$and": {
                                        "Type": "Blog Post",
                                        "Date": {
                                            "$gte": "2019-01-01"
                                        }
                                    }
                                }
                            ]
                        }
                        ```
    :param top_k: How many documents to return per query. Defaults to `self.top_k` when `None`.
    :param index: The name of the index in the DocumentStore from which to retrieve documents.
                  Defaults to the `index` attribute of the document store when `None`.
    :param headers: Custom HTTP headers to pass to document store client if supported
                    (e.g. {'Authorization': 'Basic API_KEY'} for basic authentication)
    :param batch_size: Accepted for interface compatibility. It does not influence the result: all queries are
                       handed to a single `embed_queries()` call and all embeddings to a single
                       `query_by_embedding_batch()` call.
    :param scale_score: Whether to scale the similarity score to the unit interval (range of [0,1]).
                        If true, similarity scores (e.g. cosine or dot_product) which naturally have a different
                        value range will be scaled to a range of [0,1], where 1 means extremely relevant.
                        Otherwise raw similarity scores (e.g. cosine or dot_product) will be used.
                        Defaults to `self.scale_score` when `None`.
    :param document_store: The document store to use for retrieval. If `None`, the one given in the `__init__`
                           is used instead.
    :raises ValueError: If no document store was passed and the retriever has none of its own.
    :return: The result of the document store's `query_by_embedding_batch()`.
    """
    # Compare against None explicitly: a store object that happens to be falsy (for example one that defines
    # __len__ and is currently empty) must still be honoured when the caller passes it in.
    if document_store is None:
        document_store = self.document_store
    if document_store is None:
        raise ValueError(
            "This Retriever was not initialized with a Document Store. Provide one to the retrieve_batch() method."
        )
    if top_k is None:
        top_k = self.top_k
    if index is None:
        index = document_store.index
    if scale_score is None:
        scale_score = self.scale_score

    # embed_queries is already batched within by batch_size, so no need to batch the input here
    query_embs = self.embed_queries(queries=queries)

    # The store expects one embedding per query. The former loop sliced the embeddings into chunks of
    # batch_size only to concatenate them again (and failed on batch_size=0); a plain list() is equivalent.
    documents = document_store.query_by_embedding_batch(
        query_embs=list(query_embs),
        top_k=top_k,
        filters=filters,
        index=index,
        headers=headers,
        scale_score=scale_score,
    )

    return documents

pipelines.pipelines.nodes.retriever.embedder

MultiModalEmbedder

Source code in pipelines/pipelines/nodes/retriever/embedder.py
class MultiModalEmbedder:
    """
    Embeds documents of the content types "text" and "image", using one `Taskflow` feature-extraction
    model per content type.
    """

    def __init__(
        self,
        embedding_models: Dict[str, Union[Path, str]],  # replace str with ContentTypes starting from Python3.8
        feature_extractors_params: Optional[Dict[str, Dict[str, Any]]] = None,
        batch_size: int = 16,
        embed_meta_fields: List[str] = ["name"],
        progress_bar: bool = True,
    ):
        """
        Init the Retriever and all its models from a local or remote model checkpoint.
        :param embedding_models: A dictionary matching a local path or remote name of encoder checkpoint with
            the content type it should handle ("text",  "image", etc...).
            Expected input format: `{'text': 'name_or_path_to_text_model', 'image': 'name_or_path_to_image_model', ... }`
            Keep in mind that the models should output in the same embedding space for this retriever to work.
        :param feature_extractors_params: A dictionary matching a content type ("text",  "image" and so on) with the
            parameters of its own feature extractor if the model requires one.
            Expected input format: `{'text': {'param_name': 'param_value', ...}, 'image': {'param_name': 'param_value', ...}, ...}`
        :param batch_size: Number of questions or passages to encode at once. In case of multiple GPUs, this will be the total batch size.
        :param embed_meta_fields: Concatenate the provided meta fields and text passage / image to a text pair that is
                                  then used to create the embedding.
                                  This is the approach used in the original paper and is likely to improve
                                  performance if your titles contain meaningful information for retrieval
                                  (topic, entities etc.).
        :param progress_bar: Whether to show a tqdm progress bar or not.
                             Can be helpful to disable in production deployments to keep the logs clean.
        :raises ValueError: If a content type other than "text" or "image" is requested, or if the models report
                            different embedding sizes.
        """
        super().__init__()

        self.batch_size = batch_size
        self.progress_bar = progress_bar
        # Store a copy: the default list is created once at definition time and would otherwise be shared
        # (and mutable) across every instance that relies on the default.
        self.embed_meta_fields = list(embed_meta_fields)

        # NOTE(review): these normalized parameters are not passed on to the models created below - confirm
        # whether Taskflow should receive them.
        feature_extractors_params = {
            content_type: {"max_length": 256, **(feature_extractors_params or {}).get(content_type, {})}
            for content_type in ["text", "image"]  # FIXME get_args(ContentTypes) from Python3.8 on
        }

        self.models = {}  # replace str with ContentTypes starting from Python3.8
        for content_type, embedding_model in embedding_models.items():
            if content_type in ["text", "image"]:
                self.models[content_type] = Taskflow("feature_extraction", model=embedding_model)
            else:
                raise ValueError(f"{content_type} is not a supported content.")

        # Check embedding sizes for models: they must all match
        if len(self.models) > 1:
            sizes = {model.embedding_dim for model in self.models.values()}
            if None in sizes:
                # At least one model does not expose its dimension, so the check is skipped with a warning.
                logger.warning(
                    "Pipelines could not find the output embedding dimensions for '%s'. "
                    "Dimensions won't be checked before computing the embeddings.",
                    ", ".join(
                        {
                            str(model.model_name_or_path)
                            for model in self.models.values()
                            if model.embedding_dim is None
                        }
                    ),
                )
            elif len(sizes) > 1:
                # Group the model names by embedding size to produce a readable error message.
                embedding_sizes: Dict[int, List[str]] = {}
                for model in self.models.values():
                    embedding_sizes[model.embedding_dim] = embedding_sizes.get(model.embedding_dim, []) + [
                        str(model.model_name_or_path)
                    ]
                raise ValueError(f"Not all models have the same embedding size: {embedding_sizes}")

    def embed(self, documents: List[Document], batch_size: Optional[int] = None) -> np.ndarray:
        """
        Create embeddings for a list of documents using the relevant encoder for their content type.
        :param documents: Documents to embed.
        :param batch_size: Number of documents to process per step. Falls back to `self.batch_size` when `None`.
        :return: Embeddings, one per document, in the form of a np.array
        :raises Exception: If a batch contains a content type without a model, or if the models of a batch
                           return different embedding sizes.
        """
        batch_size = batch_size if batch_size is not None else self.batch_size

        all_embeddings = []
        for batch_index in tqdm(
            iterable=range(0, len(documents), batch_size),
            unit=" Docs",
            desc="Create embeddings",
            position=1,
            leave=False,
            disable=not self.progress_bar,
        ):
            docs_batch = documents[batch_index : batch_index + batch_size]
            data_by_type = self._docs_to_data(documents=docs_batch)

            # Get output for each model
            outputs_by_type: Dict[str, paddle.Tensor] = {}  # replace str with ContentTypes starting Python3.8
            for data_type, data in data_by_type.items():

                model = self.models.get(data_type)
                if not model:
                    raise Exception(
                        f"Some data of type {data_type} was passed, but no model capable of handling such data was "
                        f"initialized. Initialized models: {', '.join(self.models.keys())}"
                    )
                outputs_by_type[data_type] = model(data)["features"]
            # Check the output sizes
            embedding_sizes = [output.shape[-1] for output in outputs_by_type.values()]

            if not all(embedding_size == embedding_sizes[0] for embedding_size in embedding_sizes):
                raise Exception(
                    "Some of the models are using a different embedding size. They should all match. "
                    f"Embedding sizes by model: "
                    f"{ {name: output.shape[-1] for name, output in outputs_by_type.items()} }"
                )

            # Combine the outputs in a single matrix
            outputs = paddle.stack(list(outputs_by_type.values()))
            embeddings = outputs.reshape([-1, embedding_sizes[0]])
            embeddings = embeddings.cpu()
            all_embeddings.append(embeddings)
        return np.concatenate(all_embeddings)

    def _docs_to_data(
        self, documents: List[Document]
    ) -> Dict[str, List[Any]]:  # FIXME replace str to ContentTypes from Python3.8
        """
        Extract the data to embed from each document and return them classified by content type.
        :param documents: The documents to prepare for multimodal embedding.
        :return: A dictionary containing one key for each content type, and a list of data extracted
            from each document, ready to be passed to the feature extractor (for example the content
            of a text document, a linearized table, a PIL image object, and so on).
            Content types without any data are left out.
        :raises Exception: If a document has a content type without a registered converter.
        """
        docs_data: Dict[str, List[Any]] = {  # FIXME replace str to ContentTypes from Python3.8
            key: [] for key in ["text", "image"]
        }  # FIXME get_args(ContentTypes) from Python3.8 on
        for doc in documents:
            try:
                document_converter = DOCUMENT_CONVERTERS[doc.content_type]
            except KeyError as err:
                raise Exception(
                    f"Unknown content type '{doc.content_type}'. Known types: 'text', 'image'."  # FIXME {', '.join(get_args(ContentTypes))}"  from Python3.8 on
                ) from err

            data = document_converter(doc)

            if doc.content_type in CAN_EMBED_META:
                # Meta values are not guaranteed to be strings (e.g. a page number), so stringify them
                # before joining; missing (None) values are skipped.
                meta = [
                    str(value)
                    for key, value in (doc.meta or {}).items()
                    if key in self.embed_meta_fields and value is not None
                ]
                data = f"{' '.join(meta)} {data}" if meta else data

            docs_data[doc.content_type].append(data)

        return {key: values for key, values in docs_data.items() if values}

__init__

__init__(embedding_models: Dict[str, Union[Path, str]], feature_extractors_params: Optional[Dict[str, Dict[str, Any]]] = None, batch_size: int = 16, embed_meta_fields: List[str] = ['name'], progress_bar: bool = True)

Init the Retriever and all its models from a local or remote model checkpoint.

Parameters:

Name Type Description Default
embedding_models Dict[str, Union[Path, str]]

A dictionary matching a local path or remote name of encoder checkpoint with the content type it should handle ("text", "image", etc...). Expected input format: {'text': 'name_or_path_to_text_model', 'image': 'name_or_path_to_image_model', ... } Keep in mind that the models should output in the same embedding space for this retriever to work.

required
feature_extractors_params Optional[Dict[str, Dict[str, Any]]]

A dictionary matching a content type ("text", "image" and so on) with the parameters of its own feature extractor if the model requires one. Expected input format: {'text': {'param_name': 'param_value', ...}, 'image': {'param_name': 'param_value', ...}, ...}

None
batch_size int

Number of questions or passages to encode at once. In case of multiple GPUs, this will be the total batch size.

16
embed_meta_fields List[str]

Concatenate the provided meta fields and text passage / image to a text pair that is then used to create the embedding. This is the approach used in the original paper and is likely to improve performance if your titles contain meaningful information for retrieval (topic, entities etc.).

['name']
progress_bar bool

Whether to show a tqdm progress bar or not. Can be helpful to disable in production deployments to keep the logs clean.

True
Source code in pipelines/pipelines/nodes/retriever/embedder.py
def __init__(
    self,
    embedding_models: Dict[str, Union[Path, str]],  # replace str with ContentTypes starting from Python3.8
    feature_extractors_params: Optional[Dict[str, Dict[str, Any]]] = None,
    batch_size: int = 16,
    embed_meta_fields: List[str] = ["name"],
    progress_bar: bool = True,
):
    """
    Init the Retriever and all its models from a local or remote model checkpoint.
    :param embedding_models: A dictionary matching a local path or remote name of encoder checkpoint with
        the content type it should handle ("text",  "image", etc...).
        Expected input format: `{'text': 'name_or_path_to_text_model', 'image': 'name_or_path_to_image_model', ... }`
        Keep in mind that the models should output in the same embedding space for this retriever to work.
    :param feature_extractors_params: A dictionary matching a content type ("text",  "image" and so on) with the
        parameters of its own feature extractor if the model requires one.
        Expected input format: `{'text': {'param_name': 'param_value', ...}, 'image': {'param_name': 'param_value', ...}, ...}`
    :param batch_size: Number of questions or passages to encode at once. In case of multiple GPUs, this will be the total batch size.
    :param embed_meta_fields: Concatenate the provided meta fields and text passage / image to a text pair that is
                              then used to create the embedding.
                              This is the approach used in the original paper and is likely to improve
                              performance if your titles contain meaningful information for retrieval
                              (topic, entities etc.).
    :param progress_bar: Whether to show a tqdm progress bar or not.
                         Can be helpful to disable in production deployments to keep the logs clean.
    :raises ValueError: If a content type other than "text" or "image" is requested, or if the models report
                        different embedding sizes.
    """
    super().__init__()

    self.batch_size = batch_size
    self.progress_bar = progress_bar
    # Store a copy: the default list is created once at definition time and would otherwise be shared
    # (and mutable) across every instance that relies on the default.
    self.embed_meta_fields = list(embed_meta_fields)

    # NOTE(review): these normalized parameters are not passed on to the models created below - confirm
    # whether Taskflow should receive them.
    feature_extractors_params = {
        content_type: {"max_length": 256, **(feature_extractors_params or {}).get(content_type, {})}
        for content_type in ["text", "image"]  # FIXME get_args(ContentTypes) from Python3.8 on
    }

    self.models = {}  # replace str with ContentTypes starting from Python3.8
    for content_type, embedding_model in embedding_models.items():
        if content_type in ["text", "image"]:
            self.models[content_type] = Taskflow("feature_extraction", model=embedding_model)
        else:
            raise ValueError(f"{content_type} is not a supported content.")

    # Check embedding sizes for models: they must all match
    if len(self.models) > 1:
        sizes = {model.embedding_dim for model in self.models.values()}
        if None in sizes:
            # At least one model does not expose its dimension, so the check is skipped with a warning.
            logger.warning(
                "Pipelines could not find the output embedding dimensions for '%s'. "
                "Dimensions won't be checked before computing the embeddings.",
                ", ".join(
                    {
                        str(model.model_name_or_path)
                        for model in self.models.values()
                        if model.embedding_dim is None
                    }
                ),
            )
        elif len(sizes) > 1:
            # Group the model names by embedding size to produce a readable error message.
            embedding_sizes: Dict[int, List[str]] = {}
            for model in self.models.values():
                embedding_sizes[model.embedding_dim] = embedding_sizes.get(model.embedding_dim, []) + [
                    str(model.model_name_or_path)
                ]
            raise ValueError(f"Not all models have the same embedding size: {embedding_sizes}")

embed

embed(documents: List[Document], batch_size: Optional[int] = None) -> np.ndarray

Create embeddings for a list of documents using the relevant encoder for their content type.

Parameters:

Name Type Description Default
documents List[Document]

Documents to embed.

required

Returns:

Type Description
ndarray

Embeddings, one per document, in the form of a np.array

Source code in pipelines/pipelines/nodes/retriever/embedder.py
def embed(self, documents: List[Document], batch_size: Optional[int] = None) -> np.ndarray:
    """
    Compute embeddings for the given documents, sending each one to the encoder registered for its content type.
    :param documents: Documents to embed.
    :param batch_size: Number of documents to process per step. Falls back to `self.batch_size` when `None`.
    :return: Embeddings, one per document, in the form of a np.array
    """
    if batch_size is None:
        batch_size = self.batch_size

    batch_starts = tqdm(
        iterable=range(0, len(documents), batch_size),
        unit=" Docs",
        desc="Create embeddings",
        position=1,
        leave=False,
        disable=not self.progress_bar,
    )

    collected = []
    for start in batch_starts:
        inputs_by_type = self._docs_to_data(documents=documents[start : start + batch_size])

        # Run every content type present in this batch through its own model
        features_by_type: Dict[str, paddle.Tensor] = {}  # replace str with ContentTypes starting Python3.8
        for content_type, inputs in inputs_by_type.items():
            encoder = self.models.get(content_type)
            if not encoder:
                raise Exception(
                    f"Some data of type {content_type} was passed, but no model capable of handling such data was "
                    f"initialized. Initialized models: {', '.join(self.models.keys())}"
                )
            features_by_type[content_type] = encoder(inputs)["features"]

        # Every model has to produce vectors of the same width
        widths = [features.shape[-1] for features in features_by_type.values()]
        if any(width != widths[0] for width in widths):
            sizes_by_type = {name: features.shape[-1] for name, features in features_by_type.items()}
            raise Exception(
                "Some of the models are using a different embedding size. They should all match. "
                f"Embedding sizes by model: "
                f"{sizes_by_type}"
            )

        # Merge the per-type outputs into one 2D matrix and move it to the CPU
        stacked = paddle.stack(list(features_by_type.values()))
        collected.append(stacked.reshape([-1, widths[0]]).cpu())

    return np.concatenate(collected)

pipelines.pipelines.nodes.retriever.ernie_encoder

ErnieEmbeddingEncoder

Source code in pipelines/pipelines/nodes/retriever/ernie_encoder.py
class ErnieEmbeddingEncoder(_BaseEmbeddingEncoder):
    """
    Embedding encoder that obtains embeddings from a remote ERNIE embedding HTTP endpoint.

    An access token is requested once at construction time and reused for every embedding request.
    """

    def __init__(self, retriever: "EmbeddingRetriever"):
        """
        :param retriever: Retriever supplying `api_key`, `secret_key`, `batch_size`, `progress_bar`,
            `embedding_model` and `max_seq_len`.
        """
        self.api_key = retriever.api_key
        self.secret_key = retriever.secret_key
        # Never send more than 16 texts per request, whatever the retriever's batch size is.
        self.batch_size = min(16, retriever.batch_size)
        self.progress_bar = retriever.progress_bar
        self.token = self._apply_token(self.api_key, self.secret_key)
        self._setup_encoding_models(retriever.embedding_model, retriever.max_seq_len)

    def _setup_encoding_models(self, model_name: str, max_seq_len: int):
        """
        Setup the encoding models for the retriever.

        :param model_name: Name of the embedding model; used for both queries and documents.
        :param max_seq_len: Requested maximum text length; capped at 384.
        """
        # NOTE(review): for names that do not start with "ernie" none of the attributes below are set,
        # so later calls that read them will fail with AttributeError - confirm whether that is intended.
        if model_name.startswith("ernie"):
            self.query_encoder_model = model_name
            self.doc_encoder_model = model_name
            self.max_seq_len = min(384, max_seq_len)

    def _apply_token(self, api_key, secret_key):
        """
        Request an access token for the embedding service.

        :param api_key: Client id sent to the token endpoint.
        :param secret_key: Client secret sent to the token endpoint.
        :return: The value of the `access_token` field of the JSON response.
        :raises RuntimeError: If the HTTP response evaluates as falsy.
        """
        headers = {"Content-Type": "application/json", "Accept": "application/json"}
        payload = ""
        token_host = f"https://aip.baidubce.com/oauth/2.0/token?grant_type=client_credentials&client_id={api_key}&client_secret={secret_key}"
        response = requests.request("POST", token_host, headers=headers, data=payload)

        if not response:
            raise RuntimeError("Request access token error.")

        return response.json()["access_token"]

    def _ensure_text_limit(self, text: str) -> str:
        """
        Ensure that the length of the text is within the maximum length configured for the model.

        The length is measured in characters (`len(text)`), not in tokenizer tokens. Longer texts are cut to
        their first `self.max_seq_len` characters and a warning is logged.

        :param text: The text to check.
        :return: The text itself, or its truncated prefix.
        """
        n_tokens = len(text)
        if n_tokens <= self.max_seq_len:
            return text

        logger.warning(
            "The prompt has been truncated from %s tokens to %s tokens to fit within the max token limit."
            " Reduce the length of the prompt to prevent it from being cut off.",
            n_tokens,
            self.max_seq_len,
        )

        return text[: self.max_seq_len]

    def embed(self, model: str, text: List[str]) -> List[Any]:
        """
        Embed one batch of texts with a single request to the embedding endpoint.

        :param model: Name of the model. Currently unused: the endpoint URL is fixed.
        :param text: The texts to embed.
        :return: One embedding per entry of the response's `data` list.
        :raises RuntimeError: If the request fails or the response cannot be parsed.
        """
        payload = json.dumps(
            {
                "input": text,
            }
        )
        headers: Dict[str, str] = {"Content-Type": "application/json"}
        url = "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/embeddings/embedding-v1?access_token={}".format(
            self.token
        )
        # Pre-initialised so the error handler can log it even when the request itself fails.
        response_json = None
        try:
            response = requests.request("POST", url, headers=headers, data=payload)
            response_json = json.loads(response.text)
            embedding_data = response_json["data"]
        except Exception as e:
            logger.error(e)
            logger.error(response_json)
            # Surface the real cause instead of failing later on an unbound local variable.
            raise RuntimeError("Request embeddings error.") from e

        return [item["embedding"] for item in embedding_data]

    def embed_batch(self, model: str, text: List[str]) -> np.ndarray:
        """
        Embed an arbitrary number of texts by sending them in chunks of `self.batch_size`.

        :param model: Name of the model, forwarded to `embed()`.
        :param text: The texts to embed; each one is truncated with `_ensure_text_limit()` first.
        :return: The embeddings of all chunks concatenated into one array.
        """
        all_embeddings = []
        for i in tqdm(
            range(0, len(text), self.batch_size), disable=not self.progress_bar, desc="Calculating embeddings"
        ):
            batch = text[i : i + self.batch_size]
            batch_limited = [self._ensure_text_limit(content) for content in batch]
            generated_embeddings = self.embed(model, batch_limited)
            all_embeddings.append(generated_embeddings)
        return np.concatenate(all_embeddings)

    def embed_queries(self, queries: List[str]) -> np.ndarray:
        """
        Embed query strings with the query encoder model.

        :param queries: The queries to embed.
        :return: The query embeddings.
        """
        return self.embed_batch(self.query_encoder_model, queries)

    def embed_documents(self, docs: List[Document]) -> np.ndarray:
        """
        Embed the `content` of each document with the document encoder model.

        :param docs: The documents to embed.
        :return: The document embeddings.
        """
        return self.embed_batch(self.doc_encoder_model, [d.content for d in docs])

pipelines.pipelines.nodes.retriever.multimodal_retriever

MultiModalRetriever

Source code in pipelines/pipelines/nodes/retriever/multimodal_retriever.py
class MultiModalRetriever(BaseRetriever):
    """
    Retriever that embeds queries and documents with `MultiModalEmbedder` instances and looks documents up
    by embedding in a document store.
    """

    def __init__(
        self,
        document_store: BaseDocumentStore,
        query_embedding_model: Union[Path, str],
        document_embedding_models: Dict[str, Union[Path, str]],  # Replace str with ContentTypes starting Python3.8
        query_type: str = "text",  # Replace str with ContentTypes starting Python3.8
        query_feature_extractor_params: Dict[str, Any] = {"max_length": 64},
        document_feature_extractors_params: Dict[str, Dict[str, Any]] = {"text": {"max_length": 256}},
        top_k: int = 10,
        batch_size: int = 16,
        embed_meta_fields: List[str] = ["name"],
        similarity_function: str = "dot_product",
        progress_bar: bool = True,
        scale_score: bool = True,
    ):
        """
        Retriever that uses multiple encoders to jointly retrieve from a database consisting of different
        data types.
        :param document_store: An instance of DocumentStore from which to retrieve documents.
        :param query_embedding_model: Local path or remote name of question encoder checkpoint. The format equals the
            one used by Hugging Face transformers' modelhub models.
        :param document_embedding_models: Dictionary matching a local path or remote name of document encoder
            checkpoint with the content type it should handle ("text", "table", "image", and so on).
            The format equals the one used by Hugging Face transformers' modelhub models.
        :param query_type: The content type of the query ("text", "image" and so on).
        :param query_feature_extractor_params: The parameters to pass to the feature extractor of the query.
        :param document_feature_extractors_params: The parameters to pass to the feature extractors of the
            documents, keyed by content type.
        :param top_k: How many documents to return per query.
        :param batch_size: Number of questions or documents to encode at once. For multiple GPUs, this is
            the total batch size.
        :param embed_meta_fields: Concatenate the provided meta fields to a (text) pair that is then used to create
            the embedding. This is likely to improve performance if your titles contain meaningful information
            for retrieval (topic, entities, and so on). Note that only text and table documents support this feature.
        :param similarity_function: Which function to apply for calculating the similarity of query and document
            embeddings during training. Options: `dot_product` (default) or `cosine`.
        :param progress_bar: Whether to show a tqdm progress bar or not.
            Can be helpful to disable in production deployments to keep the logs clean.
        :param scale_score: Whether to scale the similarity score to the unit interval (range of [0,1]).
            If true (default) similarity scores (e.g. cosine or dot_product) which naturally have a different value
            range are scaled to a range of [0,1], where 1 means extremely relevant.
            Otherwise raw similarity scores (for example, cosine or dot_product) are used.
        """
        # NOTE: the dict/list defaults above are shared between calls; they are only passed on, never mutated here.
        super().__init__()

        self.similarity_function = similarity_function
        self.progress_bar = progress_bar
        self.top_k = top_k
        self.scale_score = scale_score
        self.query_type = query_type

        self.document_embedder = MultiModalEmbedder(
            embedding_models=document_embedding_models,
            feature_extractors_params=document_feature_extractors_params,
            batch_size=batch_size,
            embed_meta_fields=embed_meta_fields,
            progress_bar=progress_bar,
        )

        # Try to reuse the same embedder for queries if there is overlap
        if document_embedding_models.get(query_type, None) == query_embedding_model:
            self.query_embedder = self.document_embedder
        else:
            self.query_embedder = MultiModalEmbedder(
                embedding_models={query_type: query_embedding_model},
                feature_extractors_params={query_type: query_feature_extractor_params},
                batch_size=batch_size,
                embed_meta_fields=embed_meta_fields,
                progress_bar=progress_bar,
            )

        self.document_store = document_store

    def retrieve(  # type: ignore
        self,
        query: Any,
        query_type: Optional[ContentTypes] = None,
        filters: Optional[FilterType] = None,
        top_k: Optional[int] = None,
        index: Optional[str] = None,
        headers: Optional[Dict[str, str]] = None,
        scale_score: Optional[bool] = None,
        document_store: Optional[BaseDocumentStore] = None,
    ) -> List[Document]:
        """
        Scan through documents in DocumentStore and return a small number of documents that are most relevant to the
        supplied query. Returns a list of Documents.

        The call is delegated to `retrieve_batch()` with a single-element batch (batch size 1).
        :param query: Query value. It might be text, a path, a table, and so on.
        :param query_type: Type of the query ("text", "table", "image" and so on). Defaults to the retriever's
                           `query_type`.
        :param filters: Optional filters to narrow down the search space to documents whose metadata fulfill certain
                        conditions.
        :param top_k: How many documents to return per query. Must be > 0.
        :param index: The name of the index in the DocumentStore from which to retrieve documents.
        :param headers: Forwarded unchanged to `retrieve_batch()`.
        :param scale_score: Whether to scale the similarity score to the unit interval (range of [0,1]).
                            If true, similarity scores (for example, cosine or dot_product) which naturally have a different
                            value range are scaled to a range of [0,1], where 1 means extremely relevant.
                            Otherwise raw similarity scores (for example, cosine or dot_product) are used.
        :param document_store: Document store to query instead of the one given at construction time.
        """
        if query_type is None:
            query_type = self.query_type
        return self.retrieve_batch(
            queries=[query],
            queries_type=query_type,
            filters=[filters],
            top_k=top_k,
            index=index,
            headers=headers,
            batch_size=1,
            scale_score=scale_score,
            document_store=document_store,
        )[0]

    def retrieve_batch(  # type: ignore
        self,
        queries: List[Any],
        queries_type: Optional[ContentTypes] = None,
        filters: Optional[Union[FilterType, List[Optional[FilterType]]]] = None,
        top_k: Optional[int] = None,
        index: Optional[str] = None,
        headers: Optional[Dict[str, str]] = None,
        batch_size: Optional[int] = None,
        scale_score: Optional[bool] = None,
        document_store: Optional[BaseDocumentStore] = None,
    ) -> List[List[Document]]:
        """
        Scan through documents in DocumentStore and return a small number of documents that are most relevant to the
        supplied queries. Returns a list of lists of Documents (one list per query).
        This method assumes all queries are of the same data type. Mixed-type query batches (for example one image and one text)
        are currently not supported. Group the queries by type and call `retrieve_batch()` on uniform batches only.
        :param queries: List of query values. They might be text, paths, tables, and so on.
        :param queries_type: Type of the query ("text", "table", "image" and so on)
        :param filters: Optional filters to narrow down the search space to documents whose metadata fulfill certain
                        conditions. It can be a single filter that will be applied to each query or a list of filters
                        (one filter per query).
        :param top_k: How many documents to return per query. Must be > 0.
        :param index: The name of the index in the DocumentStore from which to retrieve documents.
        :param headers: Forwarded unchanged to the document store's `query_by_embedding_batch()`.
        :param batch_size: Number of queries to embed at a time. Must be > 0.
        :param scale_score: Whether to scale the similarity score to the unit interval (range of [0,1]).
                            If True, similarity scores (for example, cosine or dot_product) which naturally have a different
                            value range are scaled to a range of [0,1], where 1 means extremely relevant.
                            Otherwise raw similarity scores (for example, cosine or dot_product) are used.
        :param document_store: Document store to query instead of the one given at construction time.
        :raises ValueError: If neither this call nor the constructor provided a document store.
        """
        top_k = top_k or self.top_k
        document_store = document_store or self.document_store
        if not document_store:
            raise ValueError(
                "This Retriever was not initialized with a Document Store. Provide one to the retrieve() or retrieve_batch() method."
            )
        index = index or document_store.index
        # `or` would silently replace an explicit False with the instance default.
        # NOTE(review): the resolved value is not forwarded to `query_by_embedding_batch` below - confirm intent.
        if scale_score is None:
            scale_score = self.scale_score

        # Embed the queries - we need them into Document format to leverage MultiModalEmbedder.embed()
        query_docs = [Document(content=query, content_type=queries_type) for query in queries]
        query_embeddings = self.query_embedder.embed(documents=query_docs, batch_size=batch_size)
        # Query documents by embedding (the actual retrieval step)
        documents = document_store.query_by_embedding_batch(
            query_embs=query_embeddings,
            top_k=top_k,
            filters=filters,
            index=index,
            headers=headers,
        )
        return documents

    def embed_documents(self, docs: List[Document]) -> np.ndarray:
        """
        Embed documents with the document embedder.

        :param docs: The documents to embed.
        :return: The result of `self.document_embedder.embed()`.
        """
        return self.document_embedder.embed(documents=docs)

    def embed_queries(self, queries: List[str]) -> np.ndarray:
        """
        Embed text queries with the query embedder.

        Each query is wrapped in a `Document` with content type "text" before embedding.

        :param queries: The query strings to embed.
        :return: The result of `self.query_embedder.embed()`.
        """
        query_documents = [Document(content=query, content_type="text") for query in queries]
        return self.query_embedder.embed(documents=query_documents)

__init__

__init__(document_store: BaseDocumentStore, query_embedding_model: Union[Path, str], document_embedding_models: Dict[str, Union[Path, str]], query_type: str = 'text', query_feature_extractor_params: Dict[str, Any] = {'max_length': 64}, document_feature_extractors_params: Dict[str, Dict[str, Any]] = {'text': {'max_length': 256}}, top_k: int = 10, batch_size: int = 16, embed_meta_fields: List[str] = ['name'], similarity_function: str = 'dot_product', progress_bar: bool = True, scale_score: bool = True)

Retriever that uses multiple encoders to jointly retrieve from a database consisting of different data types.

Parameters:

Name Type Description Default
document_store BaseDocumentStore

An instance of DocumentStore from which to retrieve documents.

required
query_embedding_model Union[Path, str]

Local path or remote name of question encoder checkpoint. The format equals the one used by Hugging Face transformers' modelhub models.

required
document_embedding_models Dict[str, Union[Path, str]]

Dictionary matching a local path or remote name of document encoder checkpoint with the content type it should handle ("text", "table", "image", and so on). The format equals the one used by Hugging Face transformers' modelhub models.

required
query_type str

The content type of the query ("text", "image" and so on).

'text'
query_feature_extractor_params Dict[str, Any]

The parameters to pass to the feature extractor of the query.

{'max_length': 64}
document_feature_extractors_params Dict[str, Dict[str, Any]]

The parameters to pass to the feature extractors of the documents, keyed by content type.

{'text': {'max_length': 256}}
top_k int

How many documents to return per query.

10
batch_size int

Number of questions or documents to encode at once. For multiple GPUs, this is the total batch size.

16
embed_meta_fields List[str]

Concatenate the provided meta fields to a (text) pair that is then used to create the embedding. This is likely to improve performance if your titles contain meaningful information for retrieval (topic, entities, and so on). Note that only text and table documents support this feature.

['name']
similarity_function str

Which function to apply for calculating the similarity of query and document embeddings during training. Options: dot_product (default) or cosine.

'dot_product'
progress_bar bool

Whether to show a tqdm progress bar or not. Can be helpful to disable in production deployments to keep the logs clean.

True
scale_score bool

Whether to scale the similarity score to the unit interval (range of [0,1]). If true (default) similarity scores (e.g. cosine or dot_product) which naturally have a different value range are scaled to a range of [0,1], where 1 means extremely relevant. Otherwise raw similarity scores (for example, cosine or dot_product) are used.

True
Source code in pipelines/pipelines/nodes/retriever/multimodal_retriever.py
def __init__(
    self,
    document_store: BaseDocumentStore,
    query_embedding_model: Union[Path, str],
    document_embedding_models: Dict[str, Union[Path, str]],  # Replace str with ContentTypes starting Python3.8
    query_type: str = "text",  # Replace str with ContentTypes starting Python3.8
    query_feature_extractor_params: Dict[str, Any] = {"max_length": 64},
    document_feature_extractors_params: Dict[str, Dict[str, Any]] = {"text": {"max_length": 256}},
    top_k: int = 10,
    batch_size: int = 16,
    embed_meta_fields: List[str] = ["name"],
    similarity_function: str = "dot_product",
    progress_bar: bool = True,
    scale_score: bool = True,
):
    """
    Set up a retriever that embeds queries and documents of possibly different content types and
    retrieves documents from a document store.

    :param document_store: An instance of DocumentStore from which to retrieve documents.
    :param query_embedding_model: Local path or remote name of the question encoder checkpoint.
    :param document_embedding_models: Dictionary mapping each content type ("text", "table", "image",
        and so on) to the local path or remote name of the document encoder checkpoint handling it.
    :param query_type: The content type of the query ("text", "image" and so on).
    :param query_feature_extractor_params: The parameters to pass to the feature extractor of the query.
    :param document_feature_extractors_params: The parameters to pass to the feature extractors of the
        documents, keyed by content type.
    :param top_k: How many documents to return per query.
    :param batch_size: Number of questions or documents to encode at once.
    :param embed_meta_fields: Meta fields forwarded to the embedders to be embedded together with the content.
    :param similarity_function: Similarity function name, `dot_product` (default) or `cosine`.
    :param progress_bar: Whether to show a tqdm progress bar or not.
    :param scale_score: Default for whether similarity scores are scaled to the range [0,1].
    """
    super().__init__()

    # Plain configuration kept on the instance.
    self.query_type = query_type
    self.scale_score = scale_score
    self.top_k = top_k
    self.progress_bar = progress_bar
    self.similarity_function = similarity_function

    # Settings common to the document embedder and the query embedder.
    common_embedder_kwargs = dict(
        batch_size=batch_size,
        embed_meta_fields=embed_meta_fields,
        progress_bar=progress_bar,
    )

    self.document_embedder = MultiModalEmbedder(
        embedding_models=document_embedding_models,
        feature_extractors_params=document_feature_extractors_params,
        **common_embedder_kwargs,
    )

    # The document embedder doubles as query embedder when it already holds the query model
    # for the query's content type; otherwise a dedicated embedder is built.
    query_model_is_shared = document_embedding_models.get(query_type, None) == query_embedding_model
    if not query_model_is_shared:
        self.query_embedder = MultiModalEmbedder(
            embedding_models={query_type: query_embedding_model},
            feature_extractors_params={query_type: query_feature_extractor_params},
            **common_embedder_kwargs,
        )
    else:
        self.query_embedder = self.document_embedder

    self.document_store = document_store
retrieve

retrieve(query: Any, query_type: Optional[ContentTypes] = None, filters: Optional[FilterType] = None, top_k: Optional[int] = None, index: Optional[str] = None, headers: Optional[Dict[str, str]] = None, scale_score: Optional[bool] = None, document_store: Optional[BaseDocumentStore] = None) -> List[Document]

Scan through documents in DocumentStore and return a small number of documents that are most relevant to the supplied query. Returns a list of Documents.

Parameters:

Name Type Description Default
query Any

Query value. It might be text, a path, a table, and so on.

required
query_type Optional[ContentTypes]

Type of the query ("text", "table", "image" and so on).

None
filters Optional[FilterType]

Optional filters to narrow down the search space to documents whose metadata fulfill certain conditions. It can be a single filter applied to each query or a list of filters (one filter per query).

None
top_k Optional[int]

How many documents to return per query. Must be > 0.

None
index Optional[str]

The name of the index in the DocumentStore from which to retrieve documents.

None
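batch_size

Not a parameter of retrieve(): the single query is always embedded with a batch size of 1. Use retrieve_batch() to control the number of queries embedded at a time.

not accepted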
scale_score Optional[bool]

Whether to scale the similarity score to the unit interval (range of [0,1]). If true, similarity scores (for example, cosine or dot_product) which naturally have a different value range are scaled to a range of [0,1], where 1 means extremely relevant. Otherwise raw similarity scores (for example, cosine or dot_product) are used.

None
Source code in pipelines/pipelines/nodes/retriever/multimodal_retriever.py
def retrieve(  # type: ignore
    self,
    query: Any,
    query_type: Optional[ContentTypes] = None,
    filters: Optional[FilterType] = None,
    top_k: Optional[int] = None,
    index: Optional[str] = None,
    headers: Optional[Dict[str, str]] = None,
    scale_score: Optional[bool] = None,
    document_store: Optional[BaseDocumentStore] = None,
) -> List[Document]:
    """
    Return the documents most relevant to a single query.

    The query is wrapped into a one-element batch and handed to `retrieve_batch()` with a batch size
    of 1; the first (and only) result list is returned.
    :param query: Query value. It might be text, a path, a table, and so on.
    :param query_type: Type of the query ("text", "table", "image" and so on). Falls back to the
                       retriever's `query_type` when None.
    :param filters: Optional filters to narrow down the search space to documents whose metadata fulfill
                    certain conditions.
    :param top_k: How many documents to return per query. Must be > 0.
    :param index: The name of the index in the DocumentStore from which to retrieve documents.
    :param headers: Forwarded unchanged to `retrieve_batch()`.
    :param scale_score: Whether to scale the similarity score to the unit interval (range of [0,1]).
                        Forwarded unchanged to `retrieve_batch()`.
    :param document_store: Forwarded unchanged to `retrieve_batch()`.
    """
    effective_type = self.query_type if query_type is None else query_type
    per_query_results = self.retrieve_batch(
        queries=[query],
        queries_type=effective_type,
        filters=[filters],
        top_k=top_k,
        index=index,
        headers=headers,
        batch_size=1,
        scale_score=scale_score,
        document_store=document_store,
    )
    return per_query_results[0]

retrieve_batch

retrieve_batch(queries: List[Any], queries_type: Optional[ContentTypes] = None, filters: Optional[Union[FilterType, List[Optional[FilterType]]]] = None, top_k: Optional[int] = None, index: Optional[str] = None, headers: Optional[Dict[str, str]] = None, batch_size: Optional[int] = None, scale_score: Optional[bool] = None, document_store: Optional[BaseDocumentStore] = None) -> List[List[Document]]

Scan through documents in DocumentStore and return a small number of documents that are most relevant to the supplied queries. Returns a list of lists of Documents (one list per query). This method assumes all queries are of the same data type. Mixed-type query batches (for example one image and one text) are currently not supported. Group the queries by type and call retrieve_batch() on uniform batches only.

Parameters:

Name Type Description Default
queries List[Any]

List of query values. They might be text, paths, tables, and so on.

required
queries_type Optional[ContentTypes]

Type of the query ("text", "table", "image" and so on)

None
filters Optional[Union[FilterType, List[Optional[FilterType]]]]

Optional filters to narrow down the search space to documents whose metadata fulfill certain conditions. It can be a single filter that will be applied to each query or a list of filters (one filter per query).

None
top_k Optional[int]

How many documents to return per query. Must be > 0.

None
index Optional[str]

The name of the index in the DocumentStore from which to retrieve documents.

None
batch_size Optional[int]

Number of queries to embed at a time. Must be > 0.

None
scale_score Optional[bool]

Whether to scale the similarity score to the unit interval (range of [0,1]). If True, similarity scores (for example, cosine or dot_product) which naturally have a different value range are scaled to a range of [0,1], where 1 means extremely relevant. Otherwise raw similarity scores (for example, cosine or dot_product) are used.

None
Source code in pipelines/pipelines/nodes/retriever/multimodal_retriever.py
def retrieve_batch(  # type: ignore
    self,
    queries: List[Any],
    queries_type: Optional[ContentTypes] = None,
    filters: Optional[Union[FilterType, List[Optional[FilterType]]]] = None,
    top_k: Optional[int] = None,
    index: Optional[str] = None,
    headers: Optional[Dict[str, str]] = None,
    batch_size: Optional[int] = None,
    scale_score: Optional[bool] = None,
    document_store: Optional[BaseDocumentStore] = None,
) -> List[List[Document]]:
    """
    Scan through documents in DocumentStore and return a small number of documents that are most relevant to the
    supplied queries. Returns a list of lists of Documents (one list per query).
    This method assumes all queries are of the same data type. Mixed-type query batches (for example one image and one text)
    are currently not supported. Group the queries by type and call `retrieve_batch()` on uniform batches only.
    :param queries: List of query values. They might be text, paths, tables, and so on.
    :param queries_type: Type of the query ("text", "table", "image" and so on)
    :param filters: Optional filters to narrow down the search space to documents whose metadata fulfill certain
                    conditions. It can be a single filter that will be applied to each query or a list of filters
                    (one filter per query).
    :param top_k: How many documents to return per query. Must be > 0.
    :param index: The name of the index in the DocumentStore from which to retrieve documents.
    :param headers: Forwarded unchanged to the document store's `query_by_embedding_batch()`.
    :param batch_size: Number of queries to embed at a time. Must be > 0.
    :param scale_score: Whether to scale the similarity score to the unit interval (range of [0,1]).
                        If True, similarity scores (for example, cosine or dot_product) which naturally have a different
                        value range are scaled to a range of [0,1], where 1 means extremely relevant.
                        Otherwise raw similarity scores (for example, cosine or dot_product) are used.
    :param document_store: Document store to query instead of the one given at construction time.
    :raises ValueError: If neither this call nor the constructor provided a document store.
    """
    top_k = top_k or self.top_k
    document_store = document_store or self.document_store
    if not document_store:
        raise ValueError(
            "This Retriever was not initialized with a Document Store. Provide one to the retrieve() or retrieve_batch() method."
        )
    index = index or document_store.index
    # `or` would silently replace an explicit False with the instance default.
    # NOTE(review): the resolved value is not forwarded to `query_by_embedding_batch` below - confirm intent.
    if scale_score is None:
        scale_score = self.scale_score

    # Embed the queries - we need them into Document format to leverage MultiModalEmbedder.embed()
    query_docs = [Document(content=query, content_type=queries_type) for query in queries]
    query_embeddings = self.query_embedder.embed(documents=query_docs, batch_size=batch_size)
    # Query documents by embedding (the actual retrieval step)
    documents = document_store.query_by_embedding_batch(
        query_embs=query_embeddings,
        top_k=top_k,
        filters=filters,
        index=index,
        headers=headers,
    )
    return documents

pipelines.pipelines.nodes.retriever.parallel_retriever

ParallelRetriever

Source code in pipelines/pipelines/nodes/retriever/parallel_retriever.py
class ParallelRetriever(BaseRetriever):
    """
    Dense retriever that delegates embedding computation to an external service.

    Queries are embedded through ``run_main_query`` and documents through ``embeddings_multi_doc``
    (both are given ``url``); matching documents are then fetched from the document store with
    ``query_by_embedding``.
    """

    def __init__(
        self,
        document_store: BaseDocumentStore,
        model_version: Optional[str] = None,
        output_emb_size: Optional[int] = None,
        reinitialize: bool = False,
        share_parameters: bool = False,
        max_seq_len_query: int = 64,
        max_seq_len_passage: int = 384,
        top_k: int = 10,
        use_gpu: bool = True,
        batch_size: int = 16,
        embed_title: bool = True,
        similarity_function: str = "dot_product",
        progress_bar: bool = True,
        mode: Literal["snippets", "raw_documents", "preprocessed_documents"] = "preprocessed_documents",
        url="0.0.0.0:8082",
        num_process=10,
        **kwargs
    ):
        """
        :param document_store: Document store that is queried by embedding. May be None, in which case
                               retrieve() and retrieve_batch() log an error and return empty results.
        :param top_k: Default number of documents to return per query.
        :param batch_size: Default number of queries embedded per call in retrieve_batch(); also forwarded
                           to the document embedding helper in run_indexing().
        :param progress_bar: Whether retrieve_batch() shows a progress bar while querying.
        :param url: the address (host:port) of the HTTP service, passed to the embedding helpers
        :param num_process: the number of processes, forwarded to the document embedding helper
        :param kwargs: ``model_name`` (default "m3e") is read from here and forwarded to the document
                       embedding helper; other entries are ignored.

        The remaining parameters are recorded through set_config() and/or stored on the instance, but
        are not otherwise used inside this class.
        """
        self.set_config(
            document_store=document_store,
            model_version=model_version,
            max_seq_len_query=max_seq_len_query,
            max_seq_len_passage=max_seq_len_passage,
            top_k=top_k,
            use_gpu=use_gpu,
            batch_size=batch_size,
            embed_title=embed_title,
            reinitialize=reinitialize,
            share_parameters=share_parameters,
            output_emb_size=output_emb_size,
            similarity_function=similarity_function,
            progress_bar=progress_bar,
        )

        self.document_store = document_store
        self.batch_size = batch_size
        self.progress_bar = progress_bar
        self.top_k = top_k
        self.embed_title = embed_title
        self.mode = mode
        self.url = url
        self.num_process = num_process
        self.model_name = kwargs.get("model_name", "m3e")

        if document_store is None:
            logger.warning("DensePassageRetriever initialized without a document store. ")
        elif document_store.similarity != "dot_product":
            logger.warning(
                f"You are using a Dense Passage Retriever model with the {document_store.similarity} function. "
                "We recommend you use dot_product instead. "
                "This can be set when initializing the DocumentStore"
            )

    def retrieve(
        self,
        query: str,
        query_type: Optional[ContentTypes] = None,
        filters: dict = None,
        top_k: Optional[int] = None,
        index: str = None,
        headers: Optional[Dict[str, str]] = None,
        **kwargs,
    ) -> List[Document]:
        """
        Scan through documents in DocumentStore and return a small number of documents
        that are most relevant to the query.

        :param query: The query
        :param query_type: Accepted for interface compatibility; not used by this method.
        :param filters: A dictionary where the keys specify a metadata field and the value is a list of accepted values for that field
        :param top_k: How many documents to return per query. Defaults to the value given at init.
        :param index: The name of the index in the DocumentStore from which to retrieve documents.
                      Defaults to the document store's own index.
        :param headers: Forwarded unchanged to the document store query.
        :return: The retrieved documents, or an empty list when no document store is configured.
        """
        if top_k is None:
            top_k = self.top_k
        if not self.document_store:
            logger.error("Cannot perform retrieve() since DensePassageRetriever initialized with document_store=None")
            return []
        if index is None:
            index = self.document_store.index

        query_emb = self.embed_queries(texts=[query], **kwargs)
        documents = self.document_store.query_by_embedding(
            query_emb=query_emb[0], top_k=top_k, filters=filters, index=index, headers=headers, return_embedding=False
        )
        return documents

    def retrieve_batch(
        self,
        queries: List[str],
        queries_type: Optional[ContentTypes] = None,
        filters: Optional[
            Union[
                Dict[str, Union[Dict, List, str, int, float, bool]],
                List[Dict[str, Union[Dict, List, str, int, float, bool]]],
            ]
        ] = None,
        top_k: Optional[int] = None,
        index: str = None,
        headers: Optional[Dict[str, str]] = None,
        batch_size: Optional[int] = None,
        scale_score: bool = None,
        **kwargs,
    ) -> List[List[Document]]:
        """
        Retrieve the most relevant documents for each query in ``queries``.

        :param queries: List of query strings.
        :param queries_type: Accepted for interface compatibility; not used by this method.
        :param filters: Either a single filter applied to every query or a list with exactly one
                        filter per query. When None, an empty filter is used for every query.
        :param top_k: How many documents to return per query. Defaults to the value given at init.
        :param index: The name of the index in the DocumentStore from which to retrieve documents.
                      Defaults to the document store's own index.
        :param headers: Forwarded unchanged to each document store query.
        :param batch_size: Number of queries embedded per call. Defaults to the value given at init.
        :param scale_score: Accepted for interface compatibility; not used by this method.
        :return: One list of documents per query, in the order of ``queries``. When no document store
                 is configured, one empty list per query.
        :raises Exception: If ``filters`` is a list whose length differs from ``len(queries)``.
        """
        if top_k is None:
            top_k = self.top_k
        if batch_size is None:
            batch_size = self.batch_size

        if isinstance(filters, list):
            if len(filters) != len(queries):
                raise Exception(
                    "Number of filters does not match number of queries. Please provide as many filters"
                    " as queries or a single filter that will be applied to each query."
                )
        else:
            filters = [filters] * len(queries) if filters is not None else [{}] * len(queries)
        # The store must be checked before it is dereferenced for its default index.
        if not self.document_store:
            logger.error(
                "Cannot perform retrieve_batch() since DensePassageRetriever initialized with document_store=None"
            )
            # One (independent) empty result list per query keeps the return shape aligned with `queries`.
            return [[] for _ in queries]
        if index is None:
            index = self.document_store.index
        documents = []
        query_embs: List[np.ndarray] = []
        for batch in self._get_batches(queries=queries, batch_size=batch_size):
            query_embs.extend(self.embed_queries(texts=batch, **kwargs))
        for query_emb, cur_filters in tqdm(
            zip(query_embs, filters), total=len(query_embs), disable=not self.progress_bar, desc="Querying"
        ):
            cur_docs = self.document_store.query_by_embedding(
                query_emb=query_emb,
                top_k=top_k,
                filters=cur_filters,
                index=index,
                headers=headers,
                return_embedding=False,
            )
            documents.append(cur_docs)

        return documents

    def run_indexing(self, documents: List[dict], **kwargs):
        """
        Compute embeddings for ``documents`` through ``embeddings_multi_doc`` and flatten the result.

        :param documents: The documents to embed.
        :return: Tuple of ``{"documents": <flattened list>}`` and the output edge name ``"output_1"``.
        """
        start = time.time()
        documents_list = embeddings_multi_doc(
            documents,
            batch_size=self.batch_size,
            num_process=self.num_process,
            url=self.url,
            model_name=self.model_name,
        )
        # The helper returns a list of chunks; merge them into one flat list.
        flattened = []
        for chunk in documents_list:
            flattened.extend(chunk)
        output = {"documents": flattened}
        elapsed = time.time() - start
        logger.info(f"The time cost of create docs: {elapsed:.3f} s")
        return output, "output_1"

    def embed_queries(self, texts: List[str], **kwargs) -> List[np.ndarray]:
        """Return the embeddings of ``texts`` as produced by ``run_main_query``; ``kwargs`` are ignored."""
        return run_main_query(texts, self.url)

    def embed_documents(self, docs: List[Document], **kwargs):
        """Return the embeddings of each document's ``content`` as produced by ``run_main_query``."""
        return run_main_query([d.content for d in docs], self.url)

__init__

__init__(document_store: BaseDocumentStore, model_version: Optional[str] = None, output_emb_size: Optional[int] = None, reinitialize: bool = False, share_parameters: bool = False, max_seq_len_query: int = 64, max_seq_len_passage: int = 384, top_k: int = 10, use_gpu: bool = True, batch_size: int = 16, embed_title: bool = True, similarity_function: str = 'dot_product', progress_bar: bool = True, mode: Literal['snippets', 'raw_documents', 'preprocessed_documents'] = 'preprocessed_documents', url='0.0.0.0:8082', num_process=10, **kwargs)

Parameters:

Name Type Description Default
url

the address (host:port) of the HTTP service

'0.0.0.0:8082'
num_process

the number of processes

10
Source code in pipelines/pipelines/nodes/retriever/parallel_retriever.py
def __init__(
    self,
    document_store: BaseDocumentStore,
    model_version: Optional[str] = None,
    output_emb_size: Optional[int] = None,
    reinitialize: bool = False,
    share_parameters: bool = False,
    max_seq_len_query: int = 64,
    max_seq_len_passage: int = 384,
    top_k: int = 10,
    use_gpu: bool = True,
    batch_size: int = 16,
    embed_title: bool = True,
    similarity_function: str = "dot_product",
    progress_bar: bool = True,
    mode: Literal["snippets", "raw_documents", "preprocessed_documents"] = "preprocessed_documents",
    url="0.0.0.0:8082",
    num_process=10,
    **kwargs
):
    """
    Record the component configuration and store the settings used for retrieval and indexing.

    :param url: the address (host:port) of the HTTP service
    :param num_process: the number of processes
    :param kwargs: only ``model_name`` is read (default "m3e")
    """
    # Keyword order is kept as-is so the recorded configuration looks the same as before.
    config = dict(
        document_store=document_store,
        model_version=model_version,
        max_seq_len_query=max_seq_len_query,
        max_seq_len_passage=max_seq_len_passage,
        top_k=top_k,
        use_gpu=use_gpu,
        batch_size=batch_size,
        embed_title=embed_title,
        reinitialize=reinitialize,
        share_parameters=share_parameters,
        output_emb_size=output_emb_size,
        similarity_function=similarity_function,
        progress_bar=progress_bar,
    )
    self.set_config(**config)

    # Retrieval settings
    self.document_store = document_store
    self.top_k = top_k
    self.batch_size = batch_size
    self.progress_bar = progress_bar
    self.embed_title = embed_title
    self.mode = mode

    # Embedding service settings
    self.url = url
    self.num_process = num_process
    self.model_name = kwargs.get("model_name", "m3e")

    if document_store is None:
        logger.warning("DensePassageRetriever initialized without a document store. ")
        return

    similarity = document_store.similarity
    if similarity != "dot_product":
        logger.warning(
            f"You are using a Dense Passage Retriever model with the {similarity} function. "
            "We recommend you use dot_product instead. "
            "This can be set when initializing the DocumentStore"
        )

retrieve

retrieve(query: str, query_type: Optional[ContentTypes] = None, filters: dict = None, top_k: Optional[int] = None, index: str = None, headers: Optional[Dict[str, str]] = None, **kwargs) -> List[Document]

Scan through documents in DocumentStore and return a small number of documents that are most relevant to the query.

Parameters:

Name Type Description Default
query str

The query

required
filters dict

A dictionary where the keys specify a metadata field and the value is a list of accepted values for that field

None
top_k Optional[int]

How many documents to return per query.

None
index str

The name of the index in the DocumentStore from which to retrieve documents

None
Source code in pipelines/pipelines/nodes/retriever/parallel_retriever.py
def retrieve(
    self,
    query: str,
    query_type: Optional[ContentTypes] = None,
    filters: dict = None,
    top_k: Optional[int] = None,
    index: str = None,
    headers: Optional[Dict[str, str]] = None,
    **kwargs,
) -> List[Document]:
    """
    Embed the query and return the documents from the DocumentStore that are
    most relevant to it.

    :param query: The query
    :param filters: A dictionary where the keys specify a metadata field and the value is a list of accepted values for that field
    :param top_k: How many documents to return per query; falls back to the retriever's own top_k when None.
    :param index: The name of the index in the DocumentStore from which to retrieve documents;
                  falls back to the store's own index when None.
    :return: The retrieved documents, or an empty list when no document store is configured.
    """
    store = self.document_store
    if not store:
        logger.error("Cannot perform retrieve() since DensePassageRetriever initialized with document_store=None")
        return []

    limit = self.top_k if top_k is None else top_k
    target_index = store.index if index is None else index

    # embed_queries works on a list of texts, so wrap the single query and unwrap the single result.
    embeddings = self.embed_queries(texts=[query], **kwargs)
    return store.query_by_embedding(
        query_emb=embeddings[0],
        top_k=limit,
        filters=filters,
        index=target_index,
        headers=headers,
        return_embedding=False,
    )

TritonRunner

Source code in pipelines/pipelines/nodes/retriever/parallel_retriever.py
class TritonRunner:
    """
    Client wrapper that sends text to a model served by a Triton server and returns the
    model's "embedding" output.
    """

    DEFAULT_MAX_RESP_WAIT_S = 120

    def __init__(
        self, server_url: str, model_name: str, model_version: str, verbose=False, resp_wait_s: Optional[float] = None
    ):
        """
        :param server_url: The address of the server, passed to InferenceServerClient
        :param model_name: The model name needs to match the name in config.txt
        :param model_version: Model version number
        :param verbose: Forwarded to InferenceServerClient as its ``verbose`` flag
        :param resp_wait_s: the response waiting time; DEFAULT_MAX_RESP_WAIT_S is stored when None
        :raises RuntimeError: If the server or the model is reported as not live / not ready
        """
        self._server_url = server_url
        self._model_name = model_name
        self._model_version = model_version
        self._verbose = verbose
        # NOTE(review): this value is stored but not read anywhere in this class.
        self._response_wait_t = self.DEFAULT_MAX_RESP_WAIT_S if resp_wait_s is None else resp_wait_s

        self._client = InferenceServerClient(self._server_url, verbose=self._verbose)
        error = self._verify_triton_state(self._client)
        if error:
            raise RuntimeError(f"Could not communicate to Triton Server: {error}")
        model_metadata = self._client.get_model_metadata(self._model_name, self._model_version)
        self._inputs = {tm["name"]: tm for tm in model_metadata["inputs"]}
        self._input_names = list(self._inputs)
        self._outputs = {tm["name"]: tm for tm in model_metadata["outputs"]}
        self._output_names = list(self._outputs)
        self._outputs_req = [InferRequestedOutput(name) for name in self._outputs]

    def _infer_embeddings(self, texts):
        """
        Send ``texts`` as the model's first input and return its "embedding" output.

        :param texts: list of str; each string is UTF-8 encoded into one row of an (N, 1) BYTES input
        :return: the "embedding" output converted with ``as_numpy``
        :raises Exception: whatever the inference call raised, after it has been logged
        """
        data = np.array([[text.encode("utf-8")] for text in texts], dtype=np.object_)
        infer_input = InferInput(self._input_names[0], [len(data), 1], "BYTES")
        infer_input.set_data_from_numpy(data)
        try:
            response = self._client.infer(
                model_name=self._model_name,
                model_version=self._model_version,
                inputs=[infer_input],
                outputs=self._outputs_req,
            )
        except Exception as e:
            logger.error("InferenceServerClient infer error {}".format(e))
            # Propagate the real failure; continuing would only fail later on the missing response.
            raise
        results = {name: response.as_numpy(name) for name in self._output_names}
        return results["embedding"]

    def Run_docs(self, documents, embed_title=False):
        """
        Compute an embedding for every document dict and attach it under the "embedding" key.

        :param documents: list of document dicts accepted by ``Document.from_dict``; the input is
                          deep-copied and not modified
        :param embed_title: if True, the ``meta["name"]`` value (empty string when absent) is
                            prepended to the content before embedding
        :return: the copied document dicts, each with an added "embedding" entry
        """
        documents = deepcopy(documents)
        docs = [Document.from_dict(doc) for doc in documents]
        texts = []
        for d in docs:
            if embed_title:
                title = d.meta["name"] if d.meta and "name" in d.meta else ""
                texts.append(title + d.content)
            else:
                texts.append(d.content)
        embeddings = self._infer_embeddings(texts)
        for doc, emb in zip(documents, embeddings):
            doc["embedding"] = emb
        return documents

    def Run_query(self, query):
        """
        Args:
            query: list of str, the query texts to embed
        Returns:
            numpy.array, the model's "embedding" output
        """
        return self._infer_embeddings(query)

    def _verify_triton_state(self, triton_client):
        """Return a description of the first failed liveness/readiness check, or None if all pass."""
        if not triton_client.is_server_live():
            return f"Triton server {self._server_url} is not live"
        elif not triton_client.is_server_ready():
            return f"Triton server {self._server_url} is not ready"
        elif not triton_client.is_model_ready(self._model_name, self._model_version):
            return f"Model {self._model_name}:{self._model_version} is not ready"
        return None

Run_query

Run_query(query)

Embed a list of query strings. Args: query: list of str, the texts sent as the model's first input. Returns: numpy.array, the model's "embedding" output.

Source code in pipelines/pipelines/nodes/retriever/parallel_retriever.py
def Run_query(self, query):
    """
    Embed a list of query strings with the served model.

    Args:
        query: list of str; each string is UTF-8 encoded into one row of an (N, 1) BYTES
            tensor that is sent as the model's first input
    Returns:
        numpy.array, the model's "embedding" output
    Raises:
        Whatever the inference call raised, after it has been logged.
    """
    data = np.array([[text.encode("utf-8")] for text in query], dtype=np.object_)
    infer_input = InferInput(self._input_names[0], [len(data), 1], "BYTES")
    infer_input.set_data_from_numpy(data)
    try:
        response = self._client.infer(
            model_name=self._model_name,
            model_version=self._model_version,
            inputs=[infer_input],
            outputs=self._outputs_req,
        )
    except Exception as e:
        logger.error("InferenceServerClient infer error {}".format(e))
        # Propagate the real failure; continuing would only fail later on the missing response.
        raise
    results = {name: response.as_numpy(name) for name in self._output_names}
    return results["embedding"]

__init__

__init__(server_url: str, model_name: str, model_version: str, verbose=False, resp_wait_s: Optional[float] = None)

Parameters:

Name Type Description Default
server_url str

The address of the server

required
model_name str

The model name needs to match the name in config.txt

required
model_version str

Model version number

required
resp_wait_s Optional[float]

the response waiting time

None
Source code in pipelines/pipelines/nodes/retriever/parallel_retriever.py
def __init__(
    self, server_url: str, model_name: str, model_version: str, verbose=False, resp_wait_s: Optional[float] = None
):
    """
    Connect to the server, check that it can serve the model, and cache the model's
    input/output metadata.

    :param server_url: The address of the server, passed to InferenceServerClient
    :param model_name: The model name needs to match the name in config.txt
    :param model_version: Model version number
    :param verbose: Forwarded to InferenceServerClient as its ``verbose`` flag
    :param resp_wait_s: the response waiting time; DEFAULT_MAX_RESP_WAIT_S is stored when None
    :raises RuntimeError: If _verify_triton_state reports a problem
    """
    self._server_url = server_url
    self._model_name = model_name
    self._model_version = model_version
    self._verbose = verbose
    if resp_wait_s is None:
        self._response_wait_t = self.DEFAULT_MAX_RESP_WAIT_S
    else:
        self._response_wait_t = resp_wait_s

    self._client = InferenceServerClient(self._server_url, verbose=self._verbose)
    problem = self._verify_triton_state(self._client)
    if problem:
        raise RuntimeError(f"Could not communicate to Triton Server: {problem}")

    metadata = self._client.get_model_metadata(self._model_name, self._model_version)

    # Index the tensor descriptions by name; the name lists keep the metadata order.
    inputs_by_name = {}
    for tensor in metadata["inputs"]:
        inputs_by_name[tensor["name"]] = tensor
    self._inputs = inputs_by_name
    self._input_names = list(inputs_by_name)

    outputs_by_name = {}
    for tensor in metadata["outputs"]:
        outputs_by_name[tensor["name"]] = tensor
    self._outputs = outputs_by_name
    self._output_names = list(outputs_by_name)

    self._outputs_req = [InferRequestedOutput(output_name) for output_name in self._outputs]

pipelines.pipelines.nodes.retriever.sparse

BM25Retriever

Source code in pipelines/pipelines/nodes/retriever/sparse.py
class BM25Retriever(BaseRetriever):
    def __init__(
        self,
        document_store: Optional[KeywordDocumentStore] = None,
        top_k: int = 10,
        all_terms_must_match: bool = False,
        custom_query: Optional[str] = None,
    ):
        """
        Store the document store and the query settings used by the retrieve methods.

        :param document_store: An instance of one of the following DocumentStores to retrieve from:
            ElasticsearchDocumentStore, OpenSearchDocumentStore and OpenDistroElasticsearchDocumentStore.
            If None, a document store must be passed to the retrieve method for this Retriever to work.
        :param top_k: How many documents to return per query.
        :param all_terms_must_match: Whether all terms of the query must match the document.
            If True, every query term must be present in a document for it to be retrieved, i.e. the AND
            operator is used implicitly between query terms:
            "cozy fish restaurant" -> "cozy AND fish AND restaurant".
            Otherwise at least one query term must be present, i.e. the OR operator is used implicitly:
            "cozy fish restaurant" -> "cozy OR fish OR restaurant".
            Defaults to False.
        :param custom_query: Query string as per Elasticsearch DSL with a mandatory query placeholder
            (``${query}``).
            Optionally, an ES `filter` clause can be added where the values of `terms` are placeholders
            that get substituted during runtime. The placeholder names (${filter_name_1}, ${filter_name_2}, ...)
            must match the keys of the filters dict supplied in self.retrieve().

            **An example custom_query:**
            ```python
            |    {
            |        "size": 10,
            |        "query": {
            |            "bool": {
            |                "should": [{"multi_match": {
            |                    "query": ${query},                 // mandatory query placeholder
            |                    "type": "most_fields",
            |                    "fields": ["content", "title"]}}],
            |                "filter": [                                 // optional custom filters
            |                    {"terms": {"year": ${years}}},
            |                    {"terms": {"quarter": ${quarters}}},
            |                    {"range": {"date": {"gte": ${date}}}}
            |                    ],
            |            }
            |        },
            |    }
            ```

            **For this custom_query, a sample retrieve() could be:**
            ```python
            |    self.retrieve(query="Why did the revenue increase?",
            |                  filters={"years": ["2019"], "quarters": ["Q1", "Q2"]})
            ```

            Optionally, highlighting can be defined by specifying Elasticsearch's highlight settings.
            See https://www.elastic.co/guide/en/elasticsearch/reference/current/highlighting.html.
            You will find the highlighted output in the returned Document's meta field by key "highlighted".

            **Example custom_query with highlighting:**
            ```python
            |    {
            |        "size": 10,
            |        "query": {
            |            "bool": {
            |                "should": [{"multi_match": {
            |                    "query": ${query},                 // mandatory query placeholder
            |                    "type": "most_fields",
            |                    "fields": ["content", "title"]}}],
            |            }
            |        },
            |        "highlight": {             // enable highlighting
            |            "fields": {            // for fields content and title
            |                "content": {},
            |                "title": {}
            |            }
            |        },
            |    }
            ```

            **For this custom_query, highlighting info can be accessed by:**
            ```python
            |    docs = self.retrieve(query="Why did the revenue increase?")
            |    highlighted_content = docs[0].meta["highlighted"]["content"]
            |    highlighted_title = docs[0].meta["highlighted"]["title"]
            ```
        """
        super().__init__()
        # Query behaviour
        self.top_k = top_k
        self.all_terms_must_match = all_terms_must_match
        self.custom_query = custom_query
        # Default store; may be overridden per call in the retrieve methods
        self.document_store: Optional[KeywordDocumentStore] = document_store

    def retrieve(
        self,
        query: str,
        query_type: ContentTypes = "text",
        filters: Optional[Dict[str, Union[Dict, List, str, int, float, bool]]] = None,
        top_k: Optional[int] = None,
        index: Optional[str] = None,
        headers: Optional[Dict[str, str]] = None,
        document_store: Optional[BaseDocumentStore] = None,
        **kwargs
    ) -> List[Document]:
        """
        Query the DocumentStore and return the small set of documents that are most
        relevant to the query.

        :param query: The query
        :param filters: Optional filters to narrow down the search space to documents whose metadata fulfill
                        certain conditions.
                        Filters are nested dictionaries. A key is either a logical operator
                        (`"$and"`, `"$or"`, `"$not"`), a comparison operator (`"$eq"`, `"$in"`, `"$gt"`,
                        `"$gte"`, `"$lt"`, `"$lte"`) or a metadata field name.
                        A logical operator maps to a dictionary of metadata field names and/or further
                        logical operators. A metadata field name maps to a dictionary of comparison
                        operators. A comparison operator maps to a single value or, for `"$in"`, a list
                        of values.
                        Without an explicit logical operator, `"$and"` is applied. Without an explicit
                        comparison operator, `"$eq"` is applied (or `"$in"` when the comparison value is
                        a list).
                            __Example__:
                            ```python
                            filters = {
                                "$and": {
                                    "type": {"$eq": "article"},
                                    "date": {"$gte": "2015-01-01", "$lt": "2021-01-01"},
                                    "rating": {"$gte": 3},
                                    "$or": {
                                        "genre": {"$in": ["economy", "politics"]},
                                        "publisher": {"$eq": "nytimes"}
                                    }
                                }
                            }
                            # or simpler using default operators
                            filters = {
                                "type": "article",
                                "date": {"$gte": "2015-01-01", "$lt": "2021-01-01"},
                                "rating": {"$gte": 3},
                                "$or": {
                                    "genre": ["economy", "politics"],
                                    "publisher": "nytimes"
                                }
                            }
                            ```
                            To use the same logical operator several times on one level, a logical
                            operator may also take a list of dictionaries as its value.
                            __Example__:
                            ```python
                            filters = {
                                "$or": [
                                    {
                                        "$and": {
                                            "Type": "News Paper",
                                            "Date": {
                                                "$lt": "2019-01-01"
                                            }
                                        }
                                    },
                                    {
                                        "$and": {
                                            "Type": "Blog Post",
                                            "Date": {
                                                "$gte": "2019-01-01"
                                            }
                                        }
                                    }
                                ]
                            }
                            ```
        :param top_k: How many documents to return per query. Defaults to the value given in `__init__`.
        :param index: The name of the index in the DocumentStore from which to retrieve documents.
                      Defaults to the index of the document store in use.
        :param headers: Custom HTTP headers to pass to elasticsearch client (e.g. {'Authorization': 'Basic YWRtaW46cm9vdA=='})
                Check out https://www.elastic.co/guide/en/elasticsearch/reference/current/http-clients.html for more information.
        :param document_store: the docstore to use for retrieval. If `None`, the one given in the `__init__` is used instead.
        :raises ValueError: If no document store is available, or if it is not a KeywordDocumentStore.
        """
        store = document_store or self.document_store
        if store is None:
            raise ValueError(
                "This Retriever was not initialized with a Document Store. Provide one to the retrieve() method."
            )
        if not isinstance(store, KeywordDocumentStore):
            raise ValueError("document_store must be a subclass of KeywordDocumentStore.")

        # Per-call arguments win; otherwise fall back to the retriever's / store's defaults.
        limit = self.top_k if top_k is None else top_k
        target_index = store.index if index is None else index

        return store.query(
            query=query,
            filters=filters,
            top_k=limit,
            all_terms_must_match=self.all_terms_must_match,
            custom_query=self.custom_query,
            index=target_index,
            headers=headers,
        )

    def retrieve_batch(
        self,
        queries: List[str],
        queries_type: ContentTypes = "text",
        filters: Optional[
            Union[
                Dict[str, Union[Dict, List, str, int, float, bool]],
                List[Dict[str, Union[Dict, List, str, int, float, bool]]],
            ]
        ] = None,
        top_k: Optional[int] = None,
        index: Optional[str] = None,
        headers: Optional[Dict[str, str]] = None,
        batch_size: Optional[int] = None,
        document_store: Optional[BaseDocumentStore] = None,
    ) -> List[List[Document]]:
        """
        Run a keyword search for several queries at once and return, for each query, a small number of
        documents from the DocumentStore that are most relevant to it.

        The search itself is delegated to `query_batch()` of the document store, which also receives the
        `all_terms_must_match` and `custom_query` settings stored on this retriever.

        :param queries: List of query strings.
        :param queries_type: Not read by this method.
        :param filters: Optional metadata conditions that narrow down the documents to search. Either a single
                        filter dictionary or a list of filter dictionaries.
                        A filter is a nested dictionary. Each key is one of the following:
                        a logical operator (`"$and"`, `"$or"`, `"$not"`), whose value is a dictionary of metadata
                        field names and/or logical operators;
                        a metadata field name, whose value is a dictionary of comparison operators;
                        a comparison operator (`"$eq"`, `"$in"`, `"$gt"`, `"$gte"`, `"$lt"`, `"$lte"`), whose
                        value is a single value or (for `"$in"`) a list of values.
                        When the logical operator is left out, `"$and"` is used. When the comparison operator is
                        left out, `"$eq"` is used (or `"$in"` if the comparison value is a list).
                            __Example__:
                            ```python
                            filters = {
                                "$and": {
                                    "type": {"$eq": "article"},
                                    "date": {"$gte": "2015-01-01", "$lt": "2021-01-01"},
                                    "rating": {"$gte": 3},
                                    "$or": {
                                        "genre": {"$in": ["economy", "politics"]},
                                        "publisher": {"$eq": "nytimes"}
                                    }
                                }
                            }
                            # or simpler using default operators
                            filters = {
                                "type": "article",
                                "date": {"$gte": "2015-01-01", "$lt": "2021-01-01"},
                                "rating": {"$gte": 3},
                                "$or": {
                                    "genre": ["economy", "politics"],
                                    "publisher": "nytimes"
                                }
                            }
                            ```
                            A logical operator may also take a list of dictionaries as value, which allows
                            using the same logical operator several times on the same level.
                            __Example__:
                            ```python
                            filters = {
                                "$or": [
                                    {
                                        "$and": {
                                            "Type": "News Paper",
                                            "Date": {
                                                "$lt": "2019-01-01"
                                            }
                                        }
                                    },
                                    {
                                        "$and": {
                                            "Type": "Blog Post",
                                            "Date": {
                                                "$gte": "2019-01-01"
                                            }
                                        }
                                    }
                                ]
                            }
                            ```
        :param top_k: How many documents to return per query. If `None`, the `top_k` of this retriever is used.
        :param index: The name of the index in the DocumentStore from which to retrieve documents.
                      If `None`, the `index` of the document store is used.
        :param headers: Custom HTTP headers to pass to elasticsearch client (e.g. {'Authorization': 'Basic YWRtaW46cm9vdA=='})
                Check out https://www.elastic.co/guide/en/elasticsearch/reference/current/http-clients.html for more information.
        :param batch_size: Not applicable; this method does not read it.
        :param document_store: The docstore to use for retrieval. If `None` (or otherwise falsy), the one given
                               in the `__init__` is used instead.
        :return: Whatever `query_batch()` of the document store returns: a list of lists of Documents
                 (one per query).
        :raises ValueError: If no document store is available, or if the document store is not a
                            KeywordDocumentStore.
        """
        # An explicitly passed store takes precedence over the one configured on the retriever.
        store = document_store or self.document_store
        if store is None:
            raise ValueError(
                "This Retriever was not initialized with a Document Store. Provide one to the retrieve_batch() method."
            )
        if not isinstance(store, KeywordDocumentStore):
            raise ValueError("document_store must be a subclass of KeywordDocumentStore.")

        # Fall back to the retriever / store defaults for everything the caller left unset.
        effective_top_k = self.top_k if top_k is None else top_k
        target_index = store.index if index is None else index

        return store.query_batch(
            queries=queries,
            filters=filters,
            top_k=effective_top_k,
            all_terms_must_match=self.all_terms_must_match,
            custom_query=self.custom_query,
            index=target_index,
            headers=headers,
        )

__init__

__init__(document_store: Optional[KeywordDocumentStore] = None, top_k: int = 10, all_terms_must_match: bool = False, custom_query: Optional[str] = None)

Parameters:

Name Type Description Default
document_store Optional[KeywordDocumentStore]

an instance of one of the following DocumentStores to retrieve from: ElasticsearchDocumentStore, OpenSearchDocumentStore and OpenDistroElasticsearchDocumentStore. If None, a document store must be passed to the retrieve method for this Retriever to work.

None
all_terms_must_match bool

Whether all terms of the query must match the document. If true, all query terms must be present in a document in order for it to be retrieved (i.e., the AND operator is used implicitly between query terms: "cozy fish restaurant" -> "cozy AND fish AND restaurant"). Otherwise, at least one query term must be present in a document in order for it to be retrieved (i.e., the OR operator is used implicitly between query terms: "cozy fish restaurant" -> "cozy OR fish OR restaurant"). Defaults to False.

False
custom_query Optional[str]

query string as per Elasticsearch DSL with a mandatory query placeholder(query). Optionally, ES filter clause can be added where the values of terms are placeholders that get substituted during runtime. The placeholder(${filter_name_1}, ${filter_name_2}..) names must match with the filters dict supplied in self.retrieve(). :: An example custom_query: python | { | "size": 10, | "query": { | "bool": { | "should": [{"multi_match": { | "query": ${query}, // mandatory query placeholder | "type": "most_fields", | "fields": ["content", "title"]}}], | "filter": [ // optional custom filters | {"terms": {"year": ${years}}}, | {"terms": {"quarter": ${quarters}}}, | {"range": {"date": {"gte": ${date}}}} | ], | } | }, | } For this custom_query, a sample retrieve() could be: python | self.retrieve(query="Why did the revenue increase?", | filters={"years": ["2019"], "quarters": ["Q1", "Q2"]}) Optionally, highlighting can be defined by specifying Elasticsearch's highlight settings. See https://www.elastic.co/guide/en/elasticsearch/reference/current/highlighting.html. You will find the highlighted output in the returned Document's meta field by key "highlighted". :: Example custom_query with highlighting: python | { | "size": 10, | "query": { | "bool": { | "should": [{"multi_match": { | "query": ${query}, // mandatory query placeholder | "type": "most_fields", | "fields": ["content", "title"]}}], | } | }, | "highlight": { // enable highlighting | "fields": { // for fields content and title | "content": {}, | "title": {} | } | }, | } For this custom_query, highlighting info can be accessed by: python | docs = self.retrieve(query="Why did the revenue increase?") | highlighted_content = docs[0].meta["highlighted"]["content"] | highlighted_title = docs[0].meta["highlighted"]["title"]

None
top_k int

How many documents to return per query.

10
Source code in pipelines/pipelines/nodes/retriever/sparse.py
def __init__(
    self,
    document_store: Optional[KeywordDocumentStore] = None,
    top_k: int = 10,
    all_terms_must_match: bool = False,
    custom_query: Optional[str] = None,
):
    """
    Create a keyword-based retriever and remember its search configuration.

    :param document_store: The store to retrieve from. It must be an instance of one of the following
        DocumentStores: ElasticsearchDocumentStore, OpenSearchDocumentStore and OpenDistroElasticsearchDocumentStore.
        If None, a document store must be passed to the retrieve method for this Retriever to work.
    :param top_k: How many documents to return per query.
    :param all_terms_must_match: Whether all terms of the query must match the document.
                                 If true, every query term has to occur in a document for it to be retrieved (i.e., the AND operator is used implicitly between query terms: "cozy fish restaurant" -> "cozy AND fish AND restaurant").
                                 Otherwise, a single matching query term is enough for a document to be retrieved (i.e., the OR operator is used implicitly between query terms: "cozy fish restaurant" -> "cozy OR fish OR restaurant").
                                 Defaults to False.
    :param custom_query: Query string as per Elasticsearch DSL with a mandatory query placeholder (${query}).
                         An ES `filter` clause can optionally be added in which the values of `terms` are
                         placeholders that are substituted at runtime. The placeholder names
                         (${filter_name_1}, ${filter_name_2}..) must match the keys of the filters dict
                         supplied in self.retrieve().
                         ::
                             **An example custom_query:**
                             ```python
                            |    {
                            |        "size": 10,
                            |        "query": {
                            |            "bool": {
                            |                "should": [{"multi_match": {
                            |                    "query": ${query},                 // mandatory query placeholder
                            |                    "type": "most_fields",
                            |                    "fields": ["content", "title"]}}],
                            |                "filter": [                                 // optional custom filters
                            |                    {"terms": {"year": ${years}}},
                            |                    {"terms": {"quarter": ${quarters}}},
                            |                    {"range": {"date": {"gte": ${date}}}}
                            |                    ],
                            |            }
                            |        },
                            |    }
                             ```
                         **For this custom_query, a sample retrieve() could be:**
                         ```python
                        |    self.retrieve(query="Why did the revenue increase?",
                        |                  filters={"years": ["2019"], "quarters": ["Q1", "Q2"]})
                        ```
                         Highlighting can optionally be enabled through Elasticsearch's highlight settings.
                         See https://www.elastic.co/guide/en/elasticsearch/reference/current/highlighting.html.
                         The highlighted output is placed in the returned Document's meta field under the key "highlighted".
                         ::
                             **Example custom_query with highlighting:**
                             ```python
                            |    {
                            |        "size": 10,
                            |        "query": {
                            |            "bool": {
                            |                "should": [{"multi_match": {
                            |                    "query": ${query},                 // mandatory query placeholder
                            |                    "type": "most_fields",
                            |                    "fields": ["content", "title"]}}],
                            |            }
                            |        },
                            |        "highlight": {             // enable highlighting
                            |            "fields": {            // for fields content and title
                            |                "content": {},
                            |                "title": {}
                            |            }
                            |        },
                            |    }
                             ```
                             **For this custom_query, highlighting info can be accessed by:**
                            ```python
                            |    docs = self.retrieve(query="Why did the revenue increase?")
                            |    highlighted_content = docs[0].meta["highlighted"]["content"]
                            |    highlighted_title = docs[0].meta["highlighted"]["title"]
                            ```
    """
    super().__init__()
    # Only store the configuration here; nothing is queried at construction time.
    self.all_terms_must_match = all_terms_must_match
    self.custom_query = custom_query
    self.top_k = top_k
    self.document_store: Optional[KeywordDocumentStore] = document_store

retrieve

retrieve(query: str, query_type: ContentTypes = 'text', filters: Optional[Dict[str, Union[Dict, List, str, int, float, bool]]] = None, top_k: Optional[int] = None, index: Optional[str] = None, headers: Optional[Dict[str, str]] = None, document_store: Optional[BaseDocumentStore] = None, **kwargs) -> List[Document]

Scan through documents in DocumentStore and return a small number of documents that are most relevant to the query.

Parameters:

Name Type Description Default
query str

The query

required
filters Optional[Dict[str, Union[Dict, List, str, int, float, bool]]]

Optional filters to narrow down the search space to documents whose metadata fulfill certain conditions. Filters are defined as nested dictionaries. The keys of the dictionaries can be a logical operator ("$and", "$or", "$not"), a comparison operator ("$eq", "$in", "$gt", "$gte", "$lt", "$lte") or a metadata field name. Logical operator keys take a dictionary of metadata field names and/or logical operators as value. Metadata field names take a dictionary of comparison operators as value. Comparison operator keys take a single value or (in case of "$in") a list of values as value. If no logical operator is provided, "$and" is used as default operation. If no comparison operator is provided, "$eq" (or "$in" if the comparison value is a list) is used as default operation. Example: python filters = { "$and": { "type": {"$eq": "article"}, "date": {"$gte": "2015-01-01", "$lt": "2021-01-01"}, "rating": {"$gte": 3}, "$or": { "genre": {"$in": ["economy", "politics"]}, "publisher": {"$eq": "nytimes"} } } } # or simpler using default operators filters = { "type": "article", "date": {"$gte": "2015-01-01", "$lt": "2021-01-01"}, "rating": {"$gte": 3}, "$or": { "genre": ["economy", "politics"], "publisher": "nytimes" } } To use the same logical operator multiple times on the same level, logical operators take optionally a list of dictionaries as value. Example: python filters = { "$or": [ { "$and": { "Type": "News Paper", "Date": { "$lt": "2019-01-01" } } }, { "$and": { "Type": "Blog Post", "Date": { "$gte": "2019-01-01" } } } ] }

None
top_k Optional[int]

How many documents to return per query.

None
index Optional[str]

The name of the index in the DocumentStore from which to retrieve documents

None
headers Optional[Dict[str, str]]

Custom HTTP headers to pass to elasticsearch client (e.g. {'Authorization': 'Basic YWRtaW46cm9vdA=='}) Check out https://www.elastic.co/guide/en/elasticsearch/reference/current/http-clients.html for more information.

None
document_store Optional[BaseDocumentStore]

the docstore to use for retrieval. If None, the one given in the __init__ is used instead.

None
Source code in pipelines/pipelines/nodes/retriever/sparse.py
def retrieve(
    self,
    query: str,
    query_type: ContentTypes = "text",
    filters: Optional[Dict[str, Union[Dict, List, str, int, float, bool]]] = None,
    top_k: Optional[int] = None,
    index: Optional[str] = None,
    headers: Optional[Dict[str, str]] = None,
    document_store: Optional[BaseDocumentStore] = None,
    **kwargs
) -> List[Document]:
    """
    Run a keyword search for a single query and return a small number of documents from the
    DocumentStore that are most relevant to it.

    The search itself is delegated to `query()` of the document store, which also receives the
    `all_terms_must_match` and `custom_query` settings stored on this retriever.

    :param query: The query
    :param query_type: Not read by this method.
    :param filters: Optional metadata conditions that narrow down the documents to search.
                    A filter is a nested dictionary. Each key is one of the following:
                    a logical operator (`"$and"`, `"$or"`, `"$not"`), whose value is a dictionary of metadata
                    field names and/or logical operators;
                    a metadata field name, whose value is a dictionary of comparison operators;
                    a comparison operator (`"$eq"`, `"$in"`, `"$gt"`, `"$gte"`, `"$lt"`, `"$lte"`), whose
                    value is a single value or (for `"$in"`) a list of values.
                    When the logical operator is left out, `"$and"` is used. When the comparison operator is
                    left out, `"$eq"` is used (or `"$in"` if the comparison value is a list).
                        __Example__:
                        ```python
                        filters = {
                            "$and": {
                                "type": {"$eq": "article"},
                                "date": {"$gte": "2015-01-01", "$lt": "2021-01-01"},
                                "rating": {"$gte": 3},
                                "$or": {
                                    "genre": {"$in": ["economy", "politics"]},
                                    "publisher": {"$eq": "nytimes"}
                                }
                            }
                        }
                        # or simpler using default operators
                        filters = {
                            "type": "article",
                            "date": {"$gte": "2015-01-01", "$lt": "2021-01-01"},
                            "rating": {"$gte": 3},
                            "$or": {
                                "genre": ["economy", "politics"],
                                "publisher": "nytimes"
                            }
                        }
                        ```
                        A logical operator may also take a list of dictionaries as value, which allows
                        using the same logical operator several times on the same level.
                        __Example__:
                        ```python
                        filters = {
                            "$or": [
                                {
                                    "$and": {
                                        "Type": "News Paper",
                                        "Date": {
                                            "$lt": "2019-01-01"
                                        }
                                    }
                                },
                                {
                                    "$and": {
                                        "Type": "Blog Post",
                                        "Date": {
                                            "$gte": "2019-01-01"
                                        }
                                    }
                                }
                            ]
                        }
                        ```
    :param top_k: How many documents to return per query. If `None`, the `top_k` of this retriever is used.
    :param index: The name of the index in the DocumentStore from which to retrieve documents.
                  If `None`, the `index` of the document store is used.
    :param headers: Custom HTTP headers to pass to elasticsearch client (e.g. {'Authorization': 'Basic YWRtaW46cm9vdA=='})
            Check out https://www.elastic.co/guide/en/elasticsearch/reference/current/http-clients.html for more information.
    :param document_store: The docstore to use for retrieval. If `None` (or otherwise falsy), the one given
                           in the `__init__` is used instead.
    :param kwargs: Extra keyword arguments are accepted but not read by this method.
    :return: Whatever `query()` of the document store returns: a list of Documents.
    :raises ValueError: If no document store is available, or if the document store is not a
                        KeywordDocumentStore.
    """
    # An explicitly passed store takes precedence over the one configured on the retriever.
    store = document_store or self.document_store
    if store is None:
        raise ValueError(
            "This Retriever was not initialized with a Document Store. Provide one to the retrieve() method."
        )
    if not isinstance(store, KeywordDocumentStore):
        raise ValueError("document_store must be a subclass of KeywordDocumentStore.")

    # Fall back to the retriever / store defaults for everything the caller left unset.
    effective_top_k = self.top_k if top_k is None else top_k
    target_index = store.index if index is None else index

    return store.query(
        query=query,
        filters=filters,
        top_k=effective_top_k,
        all_terms_must_match=self.all_terms_must_match,
        custom_query=self.custom_query,
        index=target_index,
        headers=headers,
    )

retrieve_batch

retrieve_batch(queries: List[str], queries_type: ContentTypes = 'text', filters: Optional[Union[Dict[str, Union[Dict, List, str, int, float, bool]], List[Dict[str, Union[Dict, List, str, int, float, bool]]]]] = None, top_k: Optional[int] = None, index: Optional[str] = None, headers: Optional[Dict[str, str]] = None, batch_size: Optional[int] = None, document_store: Optional[BaseDocumentStore] = None) -> List[List[Document]]

Scan through documents in DocumentStore and return a small number of documents that are most relevant to the supplied queries. Returns a list of lists of Documents (one per query).

Parameters:

Name Type Description Default
queries List[str]

List of query strings.

required
filters Optional[Union[Dict[str, Union[Dict, List, str, int, float, bool]], List[Dict[str, Union[Dict, List, str, int, float, bool]]]]]

Optional filters to narrow down the search space to documents whose metadata fulfill certain conditions. Filters are defined as nested dictionaries. The keys of the dictionaries can be a logical operator ("$and", "$or", "$not"), a comparison operator ("$eq", "$in", "$gt", "$gte", "$lt", "$lte") or a metadata field name. Logical operator keys take a dictionary of metadata field names and/or logical operators as value. Metadata field names take a dictionary of comparison operators as value. Comparison operator keys take a single value or (in case of "$in") a list of values as value. If no logical operator is provided, "$and" is used as default operation. If no comparison operator is provided, "$eq" (or "$in" if the comparison value is a list) is used as default operation. Example: python filters = { "$and": { "type": {"$eq": "article"}, "date": {"$gte": "2015-01-01", "$lt": "2021-01-01"}, "rating": {"$gte": 3}, "$or": { "genre": {"$in": ["economy", "politics"]}, "publisher": {"$eq": "nytimes"} } } } # or simpler using default operators filters = { "type": "article", "date": {"$gte": "2015-01-01", "$lt": "2021-01-01"}, "rating": {"$gte": 3}, "$or": { "genre": ["economy", "politics"], "publisher": "nytimes" } } To use the same logical operator multiple times on the same level, logical operators take optionally a list of dictionaries as value. Example: python filters = { "$or": [ { "$and": { "Type": "News Paper", "Date": { "$lt": "2019-01-01" } } }, { "$and": { "Type": "Blog Post", "Date": { "$gte": "2019-01-01" } } } ] }

None
top_k Optional[int]

How many documents to return per query.

None
index Optional[str]

The name of the index in the DocumentStore from which to retrieve documents

None
headers Optional[Dict[str, str]]

Custom HTTP headers to pass to elasticsearch client (e.g. {'Authorization': 'Basic YWRtaW46cm9vdA=='}) Check out https://www.elastic.co/guide/en/elasticsearch/reference/current/http-clients.html for more information.

None
batch_size Optional[int]

Not applicable.

None
document_store Optional[BaseDocumentStore]

the docstore to use for retrieval. If None, the one given in the __init__ is used instead.

None
Source code in pipelines/pipelines/nodes/retriever/sparse.py
def retrieve_batch(
    self,
    queries: List[str],
    queries_type: ContentTypes = "text",
    filters: Optional[
        Union[
            Dict[str, Union[Dict, List, str, int, float, bool]],
            List[Dict[str, Union[Dict, List, str, int, float, bool]]],
        ]
    ] = None,
    top_k: Optional[int] = None,
    index: Optional[str] = None,
    headers: Optional[Dict[str, str]] = None,
    batch_size: Optional[int] = None,
    document_store: Optional[BaseDocumentStore] = None,
) -> List[List[Document]]:
    """
    Run a keyword search for several queries at once and return, for each query, a small number of
    documents from the DocumentStore that are most relevant to it.

    The search itself is delegated to `query_batch()` of the document store, which also receives the
    `all_terms_must_match` and `custom_query` settings stored on this retriever.

    :param queries: List of query strings.
    :param queries_type: Not read by this method.
    :param filters: Optional metadata conditions that narrow down the documents to search. Either a single
                    filter dictionary or a list of filter dictionaries.
                    A filter is a nested dictionary. Each key is one of the following:
                    a logical operator (`"$and"`, `"$or"`, `"$not"`), whose value is a dictionary of metadata
                    field names and/or logical operators;
                    a metadata field name, whose value is a dictionary of comparison operators;
                    a comparison operator (`"$eq"`, `"$in"`, `"$gt"`, `"$gte"`, `"$lt"`, `"$lte"`), whose
                    value is a single value or (for `"$in"`) a list of values.
                    When the logical operator is left out, `"$and"` is used. When the comparison operator is
                    left out, `"$eq"` is used (or `"$in"` if the comparison value is a list).
                        __Example__:
                        ```python
                        filters = {
                            "$and": {
                                "type": {"$eq": "article"},
                                "date": {"$gte": "2015-01-01", "$lt": "2021-01-01"},
                                "rating": {"$gte": 3},
                                "$or": {
                                    "genre": {"$in": ["economy", "politics"]},
                                    "publisher": {"$eq": "nytimes"}
                                }
                            }
                        }
                        # or simpler using default operators
                        filters = {
                            "type": "article",
                            "date": {"$gte": "2015-01-01", "$lt": "2021-01-01"},
                            "rating": {"$gte": 3},
                            "$or": {
                                "genre": ["economy", "politics"],
                                "publisher": "nytimes"
                            }
                        }
                        ```
                        A logical operator may also take a list of dictionaries as value, which allows
                        using the same logical operator several times on the same level.
                        __Example__:
                        ```python
                        filters = {
                            "$or": [
                                {
                                    "$and": {
                                        "Type": "News Paper",
                                        "Date": {
                                            "$lt": "2019-01-01"
                                        }
                                    }
                                },
                                {
                                    "$and": {
                                        "Type": "Blog Post",
                                        "Date": {
                                            "$gte": "2019-01-01"
                                        }
                                    }
                                }
                            ]
                        }
                        ```
    :param top_k: How many documents to return per query. If `None`, the `top_k` of this retriever is used.
    :param index: The name of the index in the DocumentStore from which to retrieve documents.
                  If `None`, the `index` of the document store is used.
    :param headers: Custom HTTP headers to pass to elasticsearch client (e.g. {'Authorization': 'Basic YWRtaW46cm9vdA=='})
            Check out https://www.elastic.co/guide/en/elasticsearch/reference/current/http-clients.html for more information.
    :param batch_size: Not applicable; this method does not read it.
    :param document_store: The docstore to use for retrieval. If `None` (or otherwise falsy), the one given
                           in the `__init__` is used instead.
    :return: Whatever `query_batch()` of the document store returns: a list of lists of Documents
             (one per query).
    :raises ValueError: If no document store is available, or if the document store is not a
                        KeywordDocumentStore.
    """
    # An explicitly passed store takes precedence over the one configured on the retriever.
    store = document_store or self.document_store
    if store is None:
        raise ValueError(
            "This Retriever was not initialized with a Document Store. Provide one to the retrieve_batch() method."
        )
    if not isinstance(store, KeywordDocumentStore):
        raise ValueError("document_store must be a subclass of KeywordDocumentStore.")

    # Fall back to the retriever / store defaults for everything the caller left unset.
    effective_top_k = self.top_k if top_k is None else top_k
    target_index = store.index if index is None else index

    return store.query_batch(
        queries=queries,
        filters=filters,
        top_k=effective_top_k,
        all_terms_must_match=self.all_terms_must_match,
        custom_query=self.custom_query,
        index=target_index,
        headers=headers,
    )

pipelines.pipelines.nodes.retriever.web

WebRetriever

WebRetriever makes it possible to query the web for relevant documents. It downloads web page results returned by WebSearch, strips HTML, and extracts raw text, which is then split into smaller documents using the optional PreProcessor.

WebRetriever operates in three modes:

  • snippets mode: WebRetriever returns a list of Documents. Each Document is a snippet of the search result.
  • raw_documents mode: WebRetriever returns a list of Documents. Each Document is a full website returned by the search, stripped of HTML.
  • preprocessed_documents mode: WebRetriever returns a list of Documents. Each Document is a preprocessed split of the full website stripped of HTML.

In the preprocessed_documents mode, after WebSearch receives the query through the run() method, it fetches the top_k URLs relevant to the query. WebSearch then downloads and processes these URLs. The processing involves stripping HTML tags and producing a clean, raw text wrapped in the Document objects. WebRetriever then splits raw text into Documents according to the PreProcessor settings. Finally, WebRetriever returns the top_k preprocessed Documents.

Finding the right balance between top_k and top_p is crucial to obtain high-quality and diverse results in the document mode. To explore potential results, we recommend that you set top_k for WebSearch close to 10. However, keep in mind that setting a high top_k value results in fetching and processing numerous web pages and is heavier on the resources.

We recommend you use the default value for top_k and adjust it based on your specific use case. The default value is 5. This means WebRetriever returns at most five of the most relevant processed documents, ensuring the search results are diverse but still of high quality. To get more results, increase top_k.

Source code in pipelines/pipelines/nodes/retriever/web.py
class WebRetriever(BaseRetriever):
    """
    WebRetriever makes it possible to query the web for relevant documents. It downloads web page results returned
    by WebSearch, strips HTML, and extracts raw text, which is then split into smaller documents using the
    optional PreProcessor.

    WebRetriever operates in three modes:

    - snippets mode: WebRetriever returns a list of Documents. Each Document is a snippet of the search result.
    - raw_documents mode: WebRetriever returns a list of Documents. Each Document is a full website returned by
      the search, stripped of HTML.
    - preprocessed_documents mode: WebRetriever returns a list of Documents. Each Document is a preprocessed split
      of the full website stripped of HTML.

    In the preprocessed_documents mode, after WebSearch receives the query, it returns up to `top_search_results`
    results. WebRetriever downloads the linked pages, strips the HTML and wraps the clean, raw text in Document
    objects. It then splits the raw text into Documents according to the PreProcessor settings and finally returns
    at most `top_k` of them.

    A high `top_search_results` value means more pages are fetched and processed, which is heavier on resources.
    `top_k` (default 5) only caps how many of the resulting Documents are returned; increase it to get more results.
    """

    def __init__(
        self,
        api_key: str,
        search_engine_provider: Union[str, SearchEngine] = "SerpAPI",
        engine: Optional[str] = "google",
        top_search_results: Optional[int] = 10,
        search_engine_kwargs: Optional[Dict[str, Any]] = None,
        top_k: Optional[int] = 5,
        mode: Literal["snippets", "raw_documents", "preprocessed_documents"] = "snippets",
        preprocessor: Optional[PreProcessor] = None,
        cache_document_store: Optional[BaseDocumentStore] = None,
        cache_index: Optional[str] = None,
        cache_headers: Optional[Dict[str, str]] = None,
        cache_time: int = 1 * 24 * 60 * 60,
    ):
        """
        :param api_key: API key forwarded to WebSearch.
        :param search_engine_provider: Search engine provider forwarded to WebSearch. Defaults to "SerpAPI".
        :param engine: Search engine name forwarded to WebSearch. Defaults to "google".
        :param top_search_results: Number of search results requested from WebSearch (passed as its `top_k`).
        :param search_engine_kwargs: Additional keyword arguments forwarded to WebSearch.
        :param top_k: Top k documents to be returned by the retriever.
        :param mode: Whether to return snippets, raw documents, or preprocessed documents. Snippets are the default.
        :param preprocessor: Optional PreProcessor to be used to split documents into paragraphs. If not provided, the default PreProcessor is used.
        :param cache_document_store: DocumentStore to be used to cache search results.
        :param cache_index: Index name to be used to cache search results.
        :param cache_headers: Headers to be used to cache search results.
        :param cache_time: Time in seconds to cache search results. Defaults to 24 hours.
        """
        super().__init__()
        self.web_search = WebSearch(
            api_key=api_key,
            top_k=top_search_results,
            search_engine_provider=search_engine_provider,
            engine=engine,
            search_engine_kwargs=search_engine_kwargs,
        )
        self.mode = mode
        self.cache_document_store = cache_document_store
        self.cache_index = cache_index
        self.cache_headers = cache_headers
        self.cache_time = cache_time
        self.top_k = top_k
        self.preprocessor = preprocessor if preprocessor is not None else PreProcessor()

    def _normalize_query(self, query: str) -> str:
        """Lower-case the query, apply NFKD normalization and drop combining characters (e.g. accents)."""
        return "".join(c for c in normalize("NFKD", query.lower()) if not combining(c))

    def _check_cache(
        self,
        query: str,
        cache_index: Optional[str] = None,
        cache_headers: Optional[Dict[str, str]] = None,
        cache_time: Optional[int] = None,
        cache_document_store: Optional[BaseDocumentStore] = None,
    ) -> List[Document]:
        """
        Check documents retrieved based on the query in cache.

        :param query: The query string. It is normalized before being used as the cache key.
        :param cache_index: Index name to look the documents up in.
        :param cache_headers: Headers passed to the DocumentStore.
        :param cache_time: If set and greater than 0, only documents whose `timestamp` is newer than
            `now - cache_time` seconds are returned.
        :param cache_document_store: DocumentStore to read from. If None, the store configured on the
            instance is used.
        :return: The cached documents, or an empty list if no cache store is available or nothing matched.
        """
        if cache_document_store is None:
            cache_document_store = self.cache_document_store

        documents = []

        if cache_document_store is not None:
            query_norm = self._normalize_query(query)
            cache_filter: FilterType = {"$and": {"search.query": query_norm}}

            if cache_time is not None and cache_time > 0:
                # NOTE(review): utcnow() is naive, so .timestamp() interprets it as local time. The same
                # expression is used when documents are stamped, so the comparison stays consistent.
                cache_filter["timestamp"] = {
                    "$gt": int((datetime.utcnow() - timedelta(seconds=cache_time)).timestamp())
                }
                logger.debug("Cache filter: %s", cache_filter)

            documents = cache_document_store.get_all_documents(
                filters=cache_filter, index=cache_index, headers=cache_headers, return_embedding=False
            )

        logger.debug("Found %d documents in cache", len(documents))

        return documents

    def _save_cache(
        self,
        query: str,
        documents: List[Document],
        cache_index: Optional[str] = None,
        cache_headers: Optional[Dict[str, str]] = None,
        cache_time: Optional[int] = None,
        cache_document_store: Optional[BaseDocumentStore] = None,
    ) -> bool:
        """
        Write documents to the cache and, if `cache_time` is given, delete expired entries for the query.

        :param query: The (already normalized) query string used as the cache key for the expiry filter.
        :param documents: Documents to write; existing duplicates are overwritten.
        :param cache_index: Index name to save the documents to.
        :param cache_headers: Headers passed to the DocumentStore.
        :param cache_time: If set and greater than 0, documents for this query whose `timestamp` is older than
            `now - cache_time` seconds are deleted after writing.
        :param cache_document_store: DocumentStore to write to. If None, the store configured on the
            instance is used.
        :return: True if a cache store was available and the documents were written, False otherwise.
        """
        if cache_document_store is None:
            cache_document_store = self.cache_document_store

        if cache_document_store is None:
            return False

        cache_document_store.write_documents(
            documents=documents, index=cache_index, headers=cache_headers, duplicate_documents="overwrite"
        )

        logger.debug("Saved %d documents in the cache", len(documents))

        cache_filter: FilterType = {"$and": {"search.query": query}}

        if cache_time is not None and cache_time > 0:
            cache_filter["timestamp"] = {
                "$lt": int((datetime.utcnow() - timedelta(seconds=cache_time)).timestamp())
            }

            cache_document_store.delete_documents(index=cache_index, headers=cache_headers, filters=cache_filter)

            logger.debug("Deleted documents in the cache using filter: %s", cache_filter)

        return True

    def retrieve(  # type: ignore[override]
        self,
        query: str,
        top_k: Optional[int] = None,
        preprocessor: Optional[PreProcessor] = None,
        cache_document_store: Optional[BaseDocumentStore] = None,
        cache_index: Optional[str] = None,
        cache_headers: Optional[Dict[str, str]] = None,
        cache_time: Optional[int] = None,
        **kwargs,
    ) -> List[Document]:
        """
        Retrieve documents based on the list of URLs from the WebSearchEngine. The documents are scraped from the web
        at real-time. You can cache them in a DocumentStore to improve retrieval time.

        The cache is consulted first. On a cache miss, WebSearch is run; in snippets mode its results are returned
        as they are (not cached and not truncated to top_k). In the other modes every search result that carries a
        link is downloaded; if no text can be extracted from a page, the search snippet is used instead.

        :param query: The query string.
        :param top_k: The number of documents to be returned by the retriever. If None, the default value is used.
        :param preprocessor: The PreProcessor to be used to split documents into paragraphs.
        :param cache_document_store: The DocumentStore to cache the documents to.
        :param cache_index: The index name to save the documents to.
        :param cache_headers: The headers to save the documents to.
        :param cache_time: The time limit in seconds to check the cache. The default is 24 hours.
        :param kwargs: Additional keyword arguments are accepted and ignored.
        :return: At most top_k documents (see the snippets-mode exception above).
        """

        # Falsy per-call values (None, 0, empty) fall back to the instance defaults.
        preprocessor = preprocessor or self.preprocessor
        cache_document_store = cache_document_store or self.cache_document_store
        cache_index = cache_index or self.cache_index
        cache_headers = cache_headers or self.cache_headers
        cache_time = cache_time or self.cache_time
        top_k = top_k or self.top_k

        query_norm = self._normalize_query(query)

        extracted_docs = self._check_cache(
            query_norm,
            cache_index=cache_index,
            cache_headers=cache_headers,
            cache_time=cache_time,
            cache_document_store=cache_document_store,
        )

        # cache miss
        if not extracted_docs:
            search_results, _ = self.web_search.run(query=query)
            search_results = search_results["documents"]
            if self.mode == "snippets":
                return search_results  # type: ignore

            # Only results with a link can be scraped. Keep the filtered list so that each scraped page is
            # paired with the search result it came from (zipping against the unfiltered list misaligns them).
            linked_results = [r for r in search_results if r.meta.get("link")]
            links: List[SearchResult] = [
                SearchResult(r.meta["link"], r.meta.get("score", None), r.meta.get("position", None))
                for r in linked_results
            ]
            logger.debug("Starting to fetch %d links from WebSearch results", len(links))

            def scrape_direct(link: SearchResult) -> Dict[str, Any]:
                """Download one link and return its extracted text and search metadata, or {} on failure."""
                extractor = extractors.ArticleExtractor(raise_on_failure=False)
                try:
                    extracted_doc = {}
                    response = requests.get(link.url, headers=self._request_headers(), timeout=10)
                    if response.status_code == 200 and len(response.text) > 0:
                        extracted_content = extractor.get_content(response.text)
                        if extracted_content:
                            extracted_doc = {
                                "text": extracted_content,
                                "url": link.url,
                                "search.score": link.score,
                                "search.position": link.position,
                            }
                    return extracted_doc

                except Exception as e:
                    # Best effort: a failing page must not abort the whole retrieval; the snippet is used instead.
                    logger.error("Error retrieving URL %s: %s", link.url, e)
                    return {}

            failed = 0
            extracted_docs = []
            # ThreadPoolExecutor rejects max_workers=0, so skip scraping entirely when there is nothing to fetch.
            if links:
                thread_count = min(cpu_count(), len(links))
                with ThreadPoolExecutor(max_workers=thread_count) as executor:
                    # executor.map yields results in the order of `links`, which matches `linked_results`.
                    scraped_pages: Iterator[Dict[str, Any]] = executor.map(scrape_direct, links)

                    for scraped_page, search_result_doc in zip(scraped_pages, linked_results):
                        if scraped_page and "text" in scraped_page:
                            document = self._document_from_scraped_page(search_result_doc, scraped_page, query_norm)
                            extracted_docs.append(document)
                        else:
                            logger.debug(
                                "Could not extract text from URL %s. Using search snippet.",
                                search_result_doc.meta["link"],
                            )
                            snippet_doc = self._document_from_snippet(search_result_doc, query_norm)
                            extracted_docs.append(snippet_doc)
                            failed += 1

            logger.debug(
                "Extracted %d documents / %s snippets from %s URLs.",
                len(extracted_docs) - failed,
                failed,
                len(links),
            )

        # Nothing to cache when the search produced no scrapeable results.
        if cache_document_store and extracted_docs:
            # NOTE(review): cache_time is not forwarded here, so _save_cache never deletes expired entries.
            # Confirm whether that is intended before changing it.
            cached = self._save_cache(
                query_norm,
                extracted_docs,
                cache_index=cache_index,
                cache_headers=cache_headers,
                cache_document_store=cache_document_store,
            )
            if not cached:
                logger.warning(
                    "Could not save retrieved documents to the DocumentStore cache. "
                    "Check your document store configuration."
                )

        processed_docs = (
            [t for d in extracted_docs for t in preprocessor.process([d])]
            if self.mode == "preprocessed_documents"
            else extracted_docs
        )

        logger.debug("Processed %d documents resulting in %s documents", len(extracted_docs), len(processed_docs))
        return processed_docs[:top_k]

    def retrieve_batch(  # type: ignore[override]
        self,
        queries: List[str],
        top_p: Optional[int] = None,
        top_k: Optional[int] = None,
        preprocessor: Optional[PreProcessor] = None,
        cache_document_store: Optional[BaseDocumentStore] = None,
        cache_index: Optional[str] = None,
        cache_headers: Optional[Dict[str, str]] = None,
        cache_time: Optional[int] = None,
    ) -> List[List[Document]]:
        """
        Run `retrieve` for every query, one after another.

        :param queries: The query strings.
        :param top_p: Forwarded to `retrieve`, where it is absorbed by `**kwargs` and not used.
        :param top_k: The number of documents to be returned per query. If None, the default value is used.
        :param preprocessor: The PreProcessor to be used to split documents into paragraphs.
        :param cache_document_store: The DocumentStore to cache the documents to.
        :param cache_index: The index name to save the documents to.
        :param cache_headers: The headers to save the documents to.
        :param cache_time: The time limit in seconds to check the cache.
        :return: A list containing a single inner list with the documents of all queries concatenated.
        """
        documents = []

        # NOTE(review): results of all queries are flattened into one inner list rather than one list per
        # query. Kept as is because callers may rely on it; confirm against the pipeline's expectations.
        # TODO: parallelize using ProcessPoolExecutor and use Lock at document store methods
        for q in queries:
            documents.extend(
                self.retrieve(
                    q,
                    top_p=top_p,
                    top_k=top_k,
                    preprocessor=preprocessor,
                    cache_document_store=cache_document_store,
                    cache_index=cache_index,
                    cache_headers=cache_headers,
                    cache_time=cache_time,
                )
            )

        return [documents]

    def _request_headers(self):
        """Return the HTTP headers sent with every page download."""
        return {
            "accept": "*/*",
            "User-Agent": "pipelines/WebRetriever/dev",
            "Accept-Language": "en-US,en;q=0.9,it;q=0.8,es;q=0.7",
            "referer": "https://www.google.com/",
        }

    def _document_from_snippet(self, doc, query_norm):
        """
        Build a cacheable Document from a search result snippet.

        :param doc: The search result Document; its content and `meta["link"]` are used.
        :param query_norm: The normalized query, stored as `search.query`.
        :return: A new Document flagged with `search.snippet = 1`.
        """
        doc_dict = {
            "text": doc.content,
            "url": doc.meta["link"],
            "id_hash_keys": ["meta.url", "meta.search.query"],
            "search.query": query_norm,
        }
        d = Document.from_dict(doc_dict, field_map={"text": "content"})
        d.meta["timestamp"] = int(datetime.utcnow().timestamp())
        # Read, don't pop: the search result passed in must not be modified.
        d.meta["search.position"] = doc.meta.get("position", None)
        d.meta["search.snippet"] = 1
        return d

    def _document_from_scraped_page(self, search_result_doc, scraped_page, query_norm):
        """
        Build a cacheable Document from a scraped page.

        :param search_result_doc: The search result the page was scraped for; supplies the position.
        :param scraped_page: Dict with the extracted text and search metadata. It is updated in place.
        :param query_norm: The normalized query, stored as `search.query`.
        :return: A new Document whose content is the scraped text.
        """
        scraped_page["id_hash_keys"] = ["meta.url", "meta.search.query"]
        scraped_page["search.query"] = query_norm
        scraped_page.pop("description", None)
        document = Document.from_dict(scraped_page, field_map={"text": "content"})
        document.meta["timestamp"] = int(datetime.utcnow().timestamp())
        document.meta["search.position"] = search_result_doc.meta.get("position")
        return document

__init__

__init__(api_key: str, search_engine_provider: Union[str, SearchEngine] = 'SerpAPI', engine: Optional[str] = 'google', top_search_results: Optional[int] = 10, search_engine_kwargs: Optional[Dict[str, Any]] = None, top_k: Optional[int] = 5, mode: Literal['snippets', 'raw_documents', 'preprocessed_documents'] = 'snippets', preprocessor: Optional[PreProcessor] = None, cache_document_store: Optional[BaseDocumentStore] = None, cache_index: Optional[str] = None, cache_headers: Optional[Dict[str, str]] = None, cache_time: int = 1 * 24 * 60 * 60)

Parameters:

Name Type Description Default
top_k Optional[int]

Top k documents to be returned by the retriever.

5
mode Literal['snippets', 'raw_documents', 'preprocessed_documents']

Whether to return snippets, raw documents, or preprocessed documents. Snippets are the default.

'snippets'
preprocessor Optional[PreProcessor]

Optional PreProcessor to be used to split documents into paragraphs. If not provided, the default PreProcessor is used.

None
cache_document_store Optional[BaseDocumentStore]

DocumentStore to be used to cache search results.

None
cache_index Optional[str]

Index name to be used to cache search results.

None
cache_headers Optional[Dict[str, str]]

Headers to be used to cache search results.

None
cache_time int

Time in seconds to cache search results. Defaults to 24 hours.

1 * 24 * 60 * 60
Source code in pipelines/pipelines/nodes/retriever/web.py
def __init__(
    self,
    api_key: str,
    search_engine_provider: Union[str, SearchEngine] = "SerpAPI",
    engine: Optional[str] = "google",
    top_search_results: Optional[int] = 10,
    search_engine_kwargs: Optional[Dict[str, Any]] = None,
    top_k: Optional[int] = 5,
    mode: Literal["snippets", "raw_documents", "preprocessed_documents"] = "snippets",
    preprocessor: Optional[PreProcessor] = None,
    cache_document_store: Optional[BaseDocumentStore] = None,
    cache_index: Optional[str] = None,
    cache_headers: Optional[Dict[str, str]] = None,
    cache_time: int = 1 * 24 * 60 * 60,
):
    """
    Set up the underlying WebSearch and store the retrieval and caching defaults.

    :param api_key: API key forwarded to WebSearch.
    :param search_engine_provider: Search engine provider forwarded to WebSearch.
    :param engine: Search engine name forwarded to WebSearch.
    :param top_search_results: Number of search results requested from WebSearch (passed as its `top_k`).
    :param search_engine_kwargs: Additional keyword arguments forwarded to WebSearch.
    :param top_k: Maximum number of documents the retriever returns.
    :param mode: One of "snippets", "raw_documents" or "preprocessed_documents". "snippets" is the default.
    :param preprocessor: PreProcessor used to split documents. A default PreProcessor is created when None.
    :param cache_document_store: DocumentStore used to cache search results.
    :param cache_index: Index name used for the cache.
    :param cache_headers: Headers used for the cache.
    :param cache_time: How long, in seconds, search results stay cached. Defaults to 24 hours.
    """
    super().__init__()

    # Search backend; note that its `top_k` is this retriever's `top_search_results`.
    self.web_search = WebSearch(
        api_key=api_key,
        search_engine_provider=search_engine_provider,
        engine=engine,
        search_engine_kwargs=search_engine_kwargs,
        top_k=top_search_results,
    )

    # Retrieval settings.
    self.top_k = top_k
    self.mode = mode

    # Cache settings.
    self.cache_time = cache_time
    self.cache_headers = cache_headers
    self.cache_index = cache_index
    self.cache_document_store = cache_document_store

    self.preprocessor = PreProcessor() if preprocessor is None else preprocessor

retrieve

retrieve(query: str, top_k: Optional[int] = None, preprocessor: Optional[PreProcessor] = None, cache_document_store: Optional[BaseDocumentStore] = None, cache_index: Optional[str] = None, cache_headers: Optional[Dict[str, str]] = None, cache_time: Optional[int] = None, **kwargs) -> List[Document]

Retrieve documents based on the list of URLs from the WebSearchEngine. The documents are scraped from the web at real-time. You can then store the documents in a DocumentStore for later use. You can cache them in a DocumentStore to improve retrieval time.

Parameters:

Name Type Description Default
query str

The query string.

required
top_k Optional[int]

The number of documents to be returned by the retriever. If None, the default value is used.

None
preprocessor Optional[PreProcessor]

The PreProcessor to be used to split documents into paragraphs.

None
cache_document_store Optional[BaseDocumentStore]

The DocumentStore to cache the documents to.

None
cache_index Optional[str]

The index name to save the documents to.

None
cache_headers Optional[Dict[str, str]]

The headers to save the documents to.

None
cache_time Optional[int]

The time limit in seconds to check the cache. The default is 24 hours.

None
Source code in pipelines/pipelines/nodes/retriever/web.py
def retrieve(  # type: ignore[override]
    self,
    query: str,
    top_k: Optional[int] = None,
    preprocessor: Optional[PreProcessor] = None,
    cache_document_store: Optional[BaseDocumentStore] = None,
    cache_index: Optional[str] = None,
    cache_headers: Optional[Dict[str, str]] = None,
    cache_time: Optional[int] = None,
    **kwargs,
) -> List[Document]:
    """
    Retrieve documents based on the list of URLs from the WebSearchEngine. The documents are scraped from the web
    at real-time. You can cache them in a DocumentStore to improve retrieval time.

    The cache is consulted first. On a cache miss, WebSearch is run; in snippets mode its results are returned
    as they are (not cached and not truncated to top_k). In the other modes every search result that carries a
    link is downloaded; if no text can be extracted from a page, the search snippet is used instead.

    :param query: The query string.
    :param top_k: The number of documents to be returned by the retriever. If None, the default value is used.
    :param preprocessor: The PreProcessor to be used to split documents into paragraphs.
    :param cache_document_store: The DocumentStore to cache the documents to.
    :param cache_index: The index name to save the documents to.
    :param cache_headers: The headers to save the documents to.
    :param cache_time: The time limit in seconds to check the cache. The default is 24 hours.
    :param kwargs: Additional keyword arguments are accepted and ignored.
    :return: At most top_k documents (see the snippets-mode exception above).
    """

    # Falsy per-call values (None, 0, empty) fall back to the instance defaults.
    preprocessor = preprocessor or self.preprocessor
    cache_document_store = cache_document_store or self.cache_document_store
    cache_index = cache_index or self.cache_index
    cache_headers = cache_headers or self.cache_headers
    cache_time = cache_time or self.cache_time
    top_k = top_k or self.top_k

    query_norm = self._normalize_query(query)

    extracted_docs = self._check_cache(
        query_norm, cache_index=cache_index, cache_headers=cache_headers, cache_time=cache_time
    )

    # cache miss
    if not extracted_docs:
        search_results, _ = self.web_search.run(query=query)
        search_results = search_results["documents"]
        if self.mode == "snippets":
            return search_results  # type: ignore

        # Only results with a link can be scraped. Keep the filtered list so that each scraped page is
        # paired with the search result it came from (zipping against the unfiltered list misaligns them).
        linked_results = [r for r in search_results if r.meta.get("link")]
        links: List[SearchResult] = [
            SearchResult(r.meta["link"], r.meta.get("score", None), r.meta.get("position", None))
            for r in linked_results
        ]
        logger.debug("Starting to fetch %d links from WebSearch results", len(links))

        def scrape_direct(link: SearchResult) -> Dict[str, Any]:
            """Download one link and return its extracted text and search metadata, or {} on failure."""
            extractor = extractors.ArticleExtractor(raise_on_failure=False)
            try:
                extracted_doc = {}
                response = requests.get(link.url, headers=self._request_headers(), timeout=10)
                if response.status_code == 200 and len(response.text) > 0:
                    extracted_content = extractor.get_content(response.text)
                    if extracted_content:
                        extracted_doc = {
                            "text": extracted_content,
                            "url": link.url,
                            "search.score": link.score,
                            "search.position": link.position,
                        }
                return extracted_doc

            except Exception as e:
                # Best effort: a failing page must not abort the whole retrieval; the snippet is used instead.
                logger.error("Error retrieving URL %s: %s", link.url, e)
                return {}

        failed = 0
        extracted_docs = []
        # ThreadPoolExecutor rejects max_workers=0, so skip scraping entirely when there is nothing to fetch.
        if links:
            thread_count = min(cpu_count(), len(links))
            with ThreadPoolExecutor(max_workers=thread_count) as executor:
                # executor.map yields results in the order of `links`, which matches `linked_results`.
                scraped_pages: Iterator[Dict[str, Any]] = executor.map(scrape_direct, links)

                for scraped_page, search_result_doc in zip(scraped_pages, linked_results):
                    if scraped_page and "text" in scraped_page:
                        document = self._document_from_scraped_page(search_result_doc, scraped_page, query_norm)
                        extracted_docs.append(document)
                    else:
                        logger.debug(
                            "Could not extract text from URL %s. Using search snippet.",
                            search_result_doc.meta["link"],
                        )
                        snippet_doc = self._document_from_snippet(search_result_doc, query_norm)
                        extracted_docs.append(snippet_doc)
                        failed += 1

        logger.debug(
            "Extracted %d documents / %s snippets from %s URLs.",
            len(extracted_docs) - failed,
            failed,
            len(links),
        )

    # Nothing to cache when the search produced no scrapeable results.
    if cache_document_store and extracted_docs:
        # NOTE(review): cache_time is not forwarded here, so _save_cache never deletes expired entries.
        # Confirm whether that is intended before changing it.
        cached = self._save_cache(query_norm, extracted_docs, cache_index=cache_index, cache_headers=cache_headers)
        if not cached:
            logger.warning(
                "Could not save retrieved documents to the DocumentStore cache. "
                "Check your document store configuration."
            )

    processed_docs = (
        [t for d in extracted_docs for t in preprocessor.process([d])]
        if self.mode == "preprocessed_documents"
        else extracted_docs
    )

    logger.debug("Processed %d documents resulting in %s documents", len(extracted_docs), len(processed_docs))
    return processed_docs[:top_k]