Skip to content

Other Module

pipelines.pipelines.nodes.other.docs2answers

Docs2Answers

This node converts retrieved documents into the predicted-answers format. It is useful when you call a Retriever-only pipeline via the REST API, because it ensures that your output is in a compatible format.

Source code in pipelines/pipelines/nodes/other/docs2answers.py
class Docs2Answers(BaseComponent):
    """
    Convert retrieved documents into the predicted-answers format.

    Useful when a Retriever-only pipeline is called through the REST API: the node makes the
    output look like the output of a QA pipeline.
    """

    outgoing_edges = 1

    def __init__(self):
        # The node takes no parameters; an empty config is still recorded for YAML export.
        self.set_config()

    def run(self, query: str, documents: List[Document]):  # type: ignore
        """
        Turn every document into an `Answer`.

        Documents whose meta contains an ``"answer"`` key are treated as FAQ entries: the stored
        answer becomes the answer text and the context, and the document content (the FAQ
        question) is written into ``doc.meta["query"]`` in place. All other documents become
        answers with empty text and the document content as context.

        :param query: The query string; passed through unchanged.
        :param documents: Documents produced by an upstream node.
        :return: ``({"query": query, "answers": answers}, "output_1")``
        """
        answers: List[Answer] = []
        for doc in documents:
            meta = doc.meta
            if "answer" not in meta:
                # Regular document: no answer text, the whole content serves as context.
                answers.append(
                    Answer(
                        answer="",
                        type="other",
                        score=doc.score,
                        context=doc.content,
                        document_id=doc.id,
                        meta=meta,
                    )
                )
                continue

            # FAQ-style document: content is the stored question, meta["answer"] its answer.
            meta["query"] = doc.content
            faq_answer = meta["answer"]
            answers.append(
                Answer(
                    answer=faq_answer,
                    type="other",
                    score=doc.score,
                    context=faq_answer,
                    # The answer spans its entire context.
                    offsets_in_context=[Span(start=0, end=len(faq_answer))],
                    document_id=doc.id,
                    meta=meta,
                )
            )

        return {"query": query, "answers": answers}, "output_1"

pipelines.pipelines.nodes.other.join_answers

JoinAnswers

A node to join Answers produced by multiple Reader nodes.

Source code in pipelines/pipelines/nodes/other/join_answers.py
class JoinAnswers(BaseComponent):
    """
    A node to join `Answer`s produced by multiple `Reader` nodes.
    """

    # Like the other join nodes, this node emits a single output stream. Pipeline validation reads this
    # attribute from input nodes, so it must be defined on the class.
    outgoing_edges = 1

    def __init__(
        self, join_mode: str = "concatenate", weights: Optional[List[float]] = None, top_k_join: Optional[int] = None
    ):
        """
        :param join_mode: `"concatenate"` to combine the `Answer`s from multiple `Reader`s. `"merge"` to aggregate
            scores of individual `Answer`s.
        :param weights: A node-wise list (length of list must be equal to the number of input nodes) of weights for
            adjusting `Answer` scores when using the `"merge"` join_mode. By default, equal weight is assigned to each
            `Reader` score. This parameter is not compatible with the `"concatenate"` join_mode.
            The weights are normalized so that they sum to 1.
        :param top_k_join: Limit `Answer`s to top_k based on the resulting scores of the join.
        """

        assert join_mode in ["concatenate", "merge"], f"JoinAnswers node does not support '{join_mode}' join_mode."
        assert not (
            weights is not None and join_mode == "concatenate"
        ), "Weights are not compatible with 'concatenate' join_mode"

        # Save init parameters to enable export of component config as YAML
        self.set_config(join_mode=join_mode, weights=weights, top_k_join=top_k_join)

        self.join_mode = join_mode
        if weights:
            # Normalize once; the sum does not need to be recomputed per element.
            total = sum(weights)
            self.weights = [float(w) / total for w in weights]
        else:
            self.weights = None
        self.top_k_join = top_k_join

    def run(self, inputs: List[Dict], top_k_join: Optional[int] = None) -> Tuple[Dict, str]:  # type: ignore
        """
        Join the `answers` lists of all incoming nodes into one list, highest first.

        :param inputs: One output dict per incoming node; each must contain an `"answers"` list.
        :param top_k_join: Per-call limit; a falsy value falls back to the limit set at init.
            If no limit is set at all, every answer is returned.
        :return: `({"answers": answers, "labels": labels_of_first_input}, "output_1")`
        """
        reader_results = [inp["answers"] for inp in inputs]

        if not top_k_join:
            top_k_join = self.top_k_join

        if self.join_mode == "concatenate":
            # `sorted` requires `Answer` to be orderable (presumably by score; defined outside this file).
            joined_answers = sorted(
                (answer for cur_reader_result in reader_results for answer in cur_reader_result), reverse=True
            )
        elif self.join_mode == "merge":
            joined_answers = self._merge_answers(reader_results)
        else:
            raise ValueError(f"Invalid join_mode: {self.join_mode}")

        # Slicing with None keeps all answers.
        return {"answers": joined_answers[:top_k_join], "labels": inputs[0].get("labels", None)}, "output_1"

    def _merge_answers(self, reader_results: List[List[Answer]]) -> List[Answer]:
        """
        Scale each answer's score in place by the weight of the reader that produced it, then return all
        answers sorted in descending order.

        Without explicit weights, every reader gets weight `1 / len(reader_results)`. Answers whose score is
        `None` are kept but not rescaled.
        """
        weights = self.weights if self.weights else [1 / len(reader_results)] * len(reader_results)

        for result, weight in zip(reader_results, weights):
            for answer in result:
                # Accept any numeric score (int, float, NumPy scalar), not only the built-in `float`,
                # otherwise such answers would silently escape weighting.
                if answer.score is not None:
                    answer.score *= weight

        return sorted([answer for cur_reader_result in reader_results for answer in cur_reader_result], reverse=True)

__init__

__init__(join_mode: str = 'concatenate', weights: Optional[List[float]] = None, top_k_join: Optional[int] = None)

Parameters:

Name Type Description Default
join_mode str

"concatenate" to combine the Answers from multiple Readers. "merge" to aggregate scores of individual Answers.

'concatenate'
weights Optional[List[float]]

A node-wise list (length of list must be equal to the number of input nodes) of weights for adjusting Answer scores when using the "merge" join_mode. By default, equal weight is assigned to each Reader score. This parameter is not compatible with the "concatenate" join_mode.

None
top_k_join Optional[int]

Limit Answers to top_k based on the resulting scores of the join.

None
Source code in pipelines/pipelines/nodes/other/join_answers.py
def __init__(
    self, join_mode: str = "concatenate", weights: Optional[List[float]] = None, top_k_join: Optional[int] = None
):
    """
    :param join_mode: `"concatenate"` to combine the `Answer`s from multiple `Reader`s. `"merge"` to aggregate
        scores of individual `Answer`s.
    :param weights: A node-wise list (length of list must be equal to the number of input nodes) of weights for
        adjusting `Answer` scores when using the `"merge"` join_mode. By default, equal weight is assigned to each
        `Reader` score. This parameter is not compatible with the `"concatenate"` join_mode.
        The weights are normalized so that they sum to 1.
    :param top_k_join: Limit `Answer`s to top_k based on the resulting scores of the join.
    """

    assert join_mode in ["concatenate", "merge"], f"JoinAnswers node does not support '{join_mode}' join_mode."
    assert not (
        weights is not None and join_mode == "concatenate"
    ), "Weights are not compatible with 'concatenate' join_mode"

    # Save init parameters to enable export of component config as YAML
    self.set_config(join_mode=join_mode, weights=weights, top_k_join=top_k_join)

    self.join_mode = join_mode
    if weights:
        # Normalize once; the sum does not need to be recomputed per element.
        total = sum(weights)
        self.weights = [float(w) / total for w in weights]
    else:
        self.weights = None
    self.top_k_join = top_k_join

pipelines.pipelines.nodes.other.join_docs

JoinDocuments

A node to join documents outputted by multiple retriever nodes.

The node allows multiple join modes:

* concatenate: combine the documents from multiple nodes. Any duplicate documents are discarded.
* merge: merge scores of documents from multiple nodes. Optionally, each input score can be given a different weight & a top_k limit can be set. This mode can also be used for "reranking" retrieved documents.
* reciprocal_rank_fusion: combines the documents based on their rank in multiple nodes.

Source code in pipelines/pipelines/nodes/other/join_docs.py
class JoinDocuments(BaseComponent):
    """
    A node to join documents outputted by multiple retriever nodes.

    The node allows multiple join modes:
    * concatenate: combine the documents from multiple nodes. Any duplicate documents are discarded.
    * merge: merge scores of documents from multiple nodes. Optionally, each input score can be given a different
             `weight` & a `top_k` limit can be set. This mode can also be used for "reranking" retrieved documents.
    * reciprocal_rank_fusion: combines the documents based on their rank in multiple nodes.
    """

    outgoing_edges = 1

    def __init__(
        self, join_mode: str = "concatenate", weights: Optional[List[float]] = None, top_k_join: Optional[int] = None
    ):
        """
        :param join_mode: `concatenate` to combine documents from multiple retrievers, `merge` to aggregate scores of
                          individual documents, or `reciprocal_rank_fusion` to apply rank-based scoring.
        :param weights: A node-wise list (length of list must be equal to the number of input nodes) of weights for
                        adjusting document scores when using the `merge` join_mode. By default, equal weight is given
                        to each retriever score. This param is not compatible with the `concatenate` join_mode and is
                        not used by `reciprocal_rank_fusion`. The weights are normalized so that they sum to 1.
        :param top_k_join: Limit documents to top_k based on the resulting scores of the join.
        """
        assert join_mode in [
            "concatenate",
            "merge",
            "reciprocal_rank_fusion",
        ], f"JoinDocuments node does not support '{join_mode}' join_mode."

        assert not (
            weights is not None and join_mode == "concatenate"
        ), "Weights are not compatible with 'concatenate' join_mode."

        # save init parameters to enable export of component config as YAML
        self.set_config(join_mode=join_mode, weights=weights, top_k_join=top_k_join)

        self.join_mode = join_mode
        if weights:
            # Normalize once; the sum does not need to be recomputed per element.
            total = sum(weights)
            self.weights = [float(w) / total for w in weights]
        else:
            self.weights = None
        self.top_k_join = top_k_join

    def run(self, inputs: List[dict], top_k_join: Optional[int] = None):  # type: ignore
        """
        Join the `documents` lists of all incoming nodes into one list ranked by the joined score.

        :param inputs: One output dict per incoming node; each must contain a `"documents"` list.
        :param top_k_join: Per-call limit; a falsy value falls back to the limit set at init. If that is falsy
                           too, all joined documents are returned.
        :return: `({"documents": docs, "labels": labels_of_first_input}, "output_1")`. The returned documents are
                 the input `Document` objects with their `score` overwritten by the joined score.
        """
        results = [inp["documents"] for inp in inputs]
        # When the same id appears in several inputs, the object from the last input is kept.
        document_map = {doc.id: doc for result in results for doc in result}

        if self.join_mode == "concatenate":
            scores_map = self._concatenate_results(results)
        elif self.join_mode == "merge":
            scores_map = self._calculate_comb_sum(results)
        elif self.join_mode == "reciprocal_rank_fusion":
            scores_map = self._calculate_rrf(results)
        else:
            raise ValueError(f"Invalid join_mode: {self.join_mode}")

        sorted_docs = sorted(scores_map.items(), key=lambda item: item[1], reverse=True)

        # A falsy per-call limit falls back to the init limit; if that is falsy as well, keep everything.
        top_k_join = top_k_join or self.top_k_join or len(sorted_docs)

        docs = []
        for doc_id, score in sorted_docs[:top_k_join]:
            doc = document_map[doc_id]
            doc.score = score
            docs.append(doc)

        output = {"documents": docs, "labels": inputs[0].get("labels", None)}

        return output, "output_1"

    def _concatenate_results(self, results):
        """
        Map each document id to its score across all result lists.

        Duplicate ids collapse into one entry; the score of the last occurrence is kept.
        """
        return {doc.id: doc.score for result in results for doc in result}

    def _calculate_comb_sum(self, results):
        """
        Calculate a weighted combination sum: each document's score is the sum of `score * weight` over every
        input list it appears in. Without explicit weights, every input gets weight `1 / len(results)`.
        """
        scores_map = defaultdict(int)
        weights = self.weights if self.weights else [1 / len(results)] * len(results)

        for result, weight in zip(results, weights):
            for doc in result:
                scores_map[doc.id] += doc.score * weight

        return scores_map

    def _calculate_rrf(self, results):
        """
        Calculates the reciprocal rank fusion. The constant K is set to 61 (60 was suggested by the original paper,
        plus 1 as python lists are 0-based and the paper used 1-based ranking).
        """
        K = 61

        scores_map = defaultdict(int)
        for result in results:
            for rank, doc in enumerate(result):
                scores_map[doc.id] += 1 / (K + rank)

        return scores_map

__init__

__init__(join_mode: str = 'concatenate', weights: Optional[List[float]] = None, top_k_join: Optional[int] = None)

Parameters:

Name Type Description Default
join_mode str

concatenate to combine documents from multiple retrievers, merge to aggregate scores of individual documents, or reciprocal_rank_fusion to apply rank-based scoring.

'concatenate'
weights Optional[List[float]]

A node-wise list (length of list must be equal to the number of input nodes) of weights for adjusting document scores when using the merge join_mode. By default, equal weight is given to each retriever score. This param is not compatible with the concatenate join_mode.

None
top_k_join Optional[int]

Limit documents to top_k based on the resulting scores of the join.

None
Source code in pipelines/pipelines/nodes/other/join_docs.py
def __init__(
    self, join_mode: str = "concatenate", weights: Optional[List[float]] = None, top_k_join: Optional[int] = None
):
    """
    :param join_mode: `concatenate` to combine documents from multiple retrievers, `merge` to aggregate scores of
                      individual documents, or `reciprocal_rank_fusion` to apply rank-based scoring.
    :param weights: A node-wise list (length of list must be equal to the number of input nodes) of weights for
                    adjusting document scores when using the `merge` join_mode. By default, equal weight is given
                    to each retriever score. This param is not compatible with the `concatenate` join_mode.
                    The weights are normalized so that they sum to 1.
    :param top_k_join: Limit documents to top_k based on the resulting scores of the join.
    """
    assert join_mode in [
        "concatenate",
        "merge",
        "reciprocal_rank_fusion",
    ], f"JoinDocuments node does not support '{join_mode}' join_mode."

    assert not (
        weights is not None and join_mode == "concatenate"
    ), "Weights are not compatible with 'concatenate' join_mode."

    # save init parameters to enable export of component config as YAML
    self.set_config(join_mode=join_mode, weights=weights, top_k_join=top_k_join)

    self.join_mode = join_mode
    if weights:
        # Normalize once; the sum does not need to be recomputed per element.
        total = sum(weights)
        self.weights = [float(w) / total for w in weights]
    else:
        self.weights = None
    self.top_k_join = top_k_join

pipelines.pipelines.nodes.other.route_documents

RouteDocuments

A node to split a list of Documents by content_type or by the values of a metadata field and route them to different nodes.

Source code in pipelines/pipelines/nodes/other/route_documents.py
class RouteDocuments(BaseComponent):
    """
    A node to split a list of `Document`s by `content_type` or by the values of a metadata field and route them to
    different nodes.
    """

    # By default (split_by == "content_type"), the node has two outgoing edges.
    outgoing_edges = 2

    def __init__(self, split_by: str = "content_type", metadata_values: Optional[List[str]] = None):
        """
        :param split_by: Field to split the documents by, either `"content_type"` or a metadata field name.
            If this parameter is set to `"content_type"`, the list of `Document`s will be split into a list containing
            only `Document`s of type `"text"` (will be routed to `"output_1"`) and a list containing only `Document`s of
            type `"table"` (will be routed to `"output_2"`). `Document`s of any other type are dropped.
            If this parameter is set to a metadata field name, you need to specify the parameter `metadata_values` as
            well.
        :param metadata_values: If the parameter `split_by` is set to a metadata field name, you need to provide a list
            of values to group the `Document`s to. `Document`s whose metadata field is equal to the first value of the
            provided list will be routed to `"output_1"`, `Document`s whose metadata field is equal to the second
            value of the provided list will be routed to `"output_2"`, etc. `Document`s that lack the field, or whose
            value is not in the list, are dropped.
        """

        assert split_by == "content_type" or metadata_values is not None, (
            "If split_by is set to the name of a metadata field, you must provide metadata_values "
            "to group the documents to."
        )

        # Save init parameters to enable export of component config as YAML
        self.set_config(split_by=split_by, metadata_values=metadata_values)

        self.split_by = split_by
        self.metadata_values = metadata_values

        # If we split list of Documents by a metadata field, number of outgoing edges might change
        if split_by != "content_type" and metadata_values is not None:
            self.outgoing_edges = len(metadata_values)

    def run(self, documents: List[Document]) -> Tuple[Dict, str]:  # type: ignore
        """
        Split `documents` into one list per output edge.

        :param documents: The documents to route.
        :return: A dict mapping each output name (`"output_1"`, `"output_2"`, ...) to its list of documents,
            together with the stream id `"split_documents"`. Presumably the pipeline uses that id to fan the
            lists out to the connected nodes; this is handled outside this file.
        """
        if self.split_by == "content_type":
            split_documents: Dict[str, List[Document]] = {"output_1": [], "output_2": []}

            for doc in documents:
                if doc.content_type == "text":
                    split_documents["output_1"].append(doc)
                elif doc.content_type == "table":
                    split_documents["output_2"].append(doc)
                # Documents of any other content_type are dropped.

        else:
            assert isinstance(self.metadata_values, list), (
                "You need to provide metadata_values if you want to split" " a list of Documents by a metadata field."
            )
            split_documents = {f"output_{i+1}": [] for i in range(len(self.metadata_values))}
            for doc in documents:
                current_metadata_value = doc.meta.get(self.split_by, None)
                # Disregard current document if it does not contain the provided metadata field
                if current_metadata_value is not None:
                    try:
                        # First matching position wins if metadata_values contains duplicates.
                        index = self.metadata_values.index(current_metadata_value)
                    except ValueError:
                        # Disregard current document if current_metadata_value is not in the provided metadata_values
                        continue

                    split_documents[f"output_{index+1}"].append(doc)

        return split_documents, "split_documents"

__init__

__init__(split_by: str = 'content_type', metadata_values: Optional[List[str]] = None)

Parameters:

Name Type Description Default
split_by str

Field to split the documents by, either "content_type" or a metadata field name. If this parameter is set to "content_type", the list of Documents will be split into a list containing only Documents of type "text" (will be routed to "output_1") and a list containing only Documents of type "table" (will be routed to "output_2"). If this parameter is set to a metadata field name, you need to specify the parameter metadata_values as well.

'content_type'
metadata_values Optional[List[str]]

If the parameter split_by is set to a metadata field name, you need to provide a list of values to group the Documents to. Documents whose metadata field is equal to the first value of the provided list will be routed to "output_1", Documents whose metadata field is equal to the second value of the provided list will be routed to "output_2", etc.

None
Source code in pipelines/pipelines/nodes/other/route_documents.py
def __init__(self, split_by: str = "content_type", metadata_values: Optional[List[str]] = None):
    """
    :param split_by: Field to split the documents by, either `"content_type"` or a metadata field name.
        If this parameter is set to `"content_type"`, the list of `Document`s will be split into a list containing
        only `Document`s of type `"text"` (will be routed to `"output_1"`) and a list containing only `Document`s of
        type `"table"` (will be routed to `"output_2"`).
        If this parameter is set to a metadata field name, you need to specify the parameter `metadata_values` as
        well.
    :param metadata_values: If the parameter `split_by` is set to a metadata field name, you need to provide a list
        of values to group the `Document`s to. `Document`s whose metadata field is equal to the first value of the
        provided list will be routed to `"output_1"`, `Document`s whose metadata field is equal to the second
        value of the provided list will be routed to `"output_2"`, etc.
    """

    assert split_by == "content_type" or metadata_values is not None, (
        "If split_by is set to the name of a metadata field, you must provide metadata_values "
        "to group the documents to."
    )

    # Save init parameters to enable export of component config as YAML
    self.set_config(split_by=split_by, metadata_values=metadata_values)

    self.split_by = split_by
    self.metadata_values = metadata_values

    # If we split list of Documents by a metadata field, number of outgoing edges might change:
    # one edge per metadata value instead of the default two.
    if split_by != "content_type" and metadata_values is not None:
        self.outgoing_edges = len(metadata_values)