跳转至

Controller Module

pipelines.rest_api.controller.document

delete_documents

delete_documents(filters: FilterRequest)

This endpoint allows you to delete documents contained in your document store. You can filter the documents to delete by metadata (like the document's name), or provide an empty JSON object to clear the document store.

Example of filters: '{"filters": {"name": ["some", "more"], "category": ["only_one"]}}'

To delete all documents you should provide an empty dict, like: '{"filters": {}}'

Source code in pipelines/rest_api/controller/document.py
@router.post("/documents/delete_by_filters", response_model=bool)
def delete_documents(filters: FilterRequest):
    """
    Delete documents from the document store.

    The request body carries a `filters` object that selects the documents to
    delete by metadata (like the document's name). Providing an empty filter
    object clears the document store.

    Example of filters:
    `'{"filters": {"name": ["some", "more"], "category": ["only_one"]}}'`

    To delete all documents you should provide an empty dict, like:
    `'{"filters": {}}'`

    Returns `True` once the document store call has completed.
    """
    metadata_filters = filters.filters
    DOCUMENT_STORE.delete_documents(filters=metadata_filters)
    return True

get_documents

get_documents(filters: FilterRequest)

This endpoint allows you to retrieve documents contained in your document store. You can filter the documents to retrieve by metadata (like the document's name), or provide an empty JSON object to retrieve all documents.

Example of filters: '{"filters": {"name": ["some", "more"], "category": ["only_one"]}}'

To get all documents you should provide an empty dict, like: '{"filters": {}}'

Source code in pipelines/rest_api/controller/document.py
@router.post("/documents/get_by_filters", response_model=List[DocumentSerialized], response_model_exclude_none=True)
def get_documents(filters: FilterRequest):
    """
    Retrieve documents from the document store.

    The request body carries a `filters` object that selects the documents to
    return by metadata (like the document's name).

    Example of filters:
    `'{"filters": {"name": ["some", "more"], "category": ["only_one"]}}'`

    To get all documents you should provide an empty dict, like:
    `'{"filters": {}}'`

    Each document is returned as a dict with its `embedding` field set to `None`.
    """
    serialized = []
    for document in DOCUMENT_STORE.get_all_documents(filters=filters.filters):
        payload = document.to_dict()
        # Embeddings are blanked out before the documents are returned.
        payload["embedding"] = None
        serialized.append(payload)
    return serialized

pipelines.rest_api.controller.feedback

delete_feedback

delete_feedback()

This endpoint allows the API user to delete all the feedback that has been submitted through the POST /feedback endpoint.

Source code in pipelines/rest_api/controller/feedback.py
@router.delete("/feedback")
def delete_feedback():
    """
    This endpoint allows the API user to delete all the
    feedback that has been submitted through the
    `POST /feedback` endpoint.

    Only labels whose `origin` is `"user-feedback"` are deleted.
    """
    feedback_ids = []
    for label in DOCUMENT_STORE.get_all_labels():
        if label.origin == "user-feedback":
            feedback_ids.append(label.id)
    DOCUMENT_STORE.delete_labels(ids=feedback_ids)

export_feedback

export_feedback(context_size: int = 100000, full_document_context: bool = True, only_positive_labels: bool = False)

This endpoint returns JSON output in the SQuAD format for question/answer pairs that were marked as "relevant" by user feedback through the POST /feedback endpoint.

The context_size param can be used to limit response size for large documents.

Source code in pipelines/rest_api/controller/feedback.py
@router.get("/export-feedback")
def export_feedback(
    context_size: int = 100_000, full_document_context: bool = True, only_positive_labels: bool = False
):
    """
    This endpoint returns JSON output in the SQuAD format for question/answer pairs
    that were marked as "relevant" by user feedback through the `POST /feedback` endpoint.

    The context_size param can be used to limit response size for large documents.

    :param context_size: Target length (in characters, answer included) of the context
        window cut around the answer. Only used when `full_document_context` is False.
    :param full_document_context: If True, the whole document content is used as context.
    :param only_positive_labels: If True, only labels with `is_correct_answer == True` are exported.
    :return: A dict `{"data": [...]}` with one SQuAD entry per exported label.

    Side effect: the list of exported entries is also written to
    `feedback_squad_direct.json` in the current working directory.

    Labels whose answer text does not match the context at the recorded offset are
    logged and left out of the export.
    """
    if only_positive_labels:
        labels = DOCUMENT_STORE.get_all_labels(filters={"is_correct_answer": [True], "origin": ["user-feedback"]})
    else:
        labels = DOCUMENT_STORE.get_all_labels(filters={"origin": ["user-feedback"]})
        # Filter out the labels where the passage is correct but answer is wrong (in SQuAD this matches
        # neither a "positive example" nor a negative "is_impossible" one)
        labels = [
            lbl for lbl in labels if not (lbl.is_correct_document is True and lbl.is_correct_answer is False)
        ]

    export_data = []

    for label in labels:
        text = label.document.content
        answer_offset = label.answer.offsets_in_document[0].start

        if full_document_context:
            context = text
            answer_start = answer_offset
        else:
            # the final length of context (including the answer string) is 'context_size'.
            # we try to add equal characters for context before and after the answer string.
            # if either beginning or end of text is reached, we correspondingly
            # append more context characters at the other end of answer string.
            answer_len = len(label.answer.answer)
            text_len = len(text)
            context_to_add = int((context_size - answer_len) / 2)
            answer_end_with_context = answer_offset + answer_len + context_to_add

            start_pos = max(answer_offset - context_to_add, 0)
            additional_context_at_end = max(context_to_add - answer_offset, 0)
            # Slice ends are exclusive, so the cap is len(text); capping at len(text) - 1
            # would always drop the last character of the document.
            end_pos = min(answer_end_with_context, text_len)
            additional_context_at_start = max(answer_end_with_context - text_len, 0)

            start_pos = max(0, start_pos - additional_context_at_start)
            end_pos = min(text_len, end_pos + additional_context_at_end)
            context = text[start_pos:end_pos]
            answer_start = answer_offset - start_pos

        if label.is_correct_answer is False and label.is_correct_document is False:  # No answer
            squad_label = {
                "paragraphs": [
                    {
                        "context": context,
                        "id": label.document.id,
                        "qas": [{"question": label.query, "id": label.id, "is_impossible": True, "answers": []}],
                    }
                ]
            }
        else:
            answer = label.answer.answer
            squad_label = {
                "paragraphs": [
                    {
                        "context": context,
                        "id": label.document.id,
                        "qas": [
                            {
                                "question": label.query,
                                "id": label.id,
                                "is_impossible": False,
                                "answers": [{"text": answer, "answer_start": answer_start}],
                            }
                        ],
                    }
                ]
            }

            # quality check: the answer text must be found in the context at the recorded offset
            span_from_offsets = context[answer_start : answer_start + len(answer)]
            if span_from_offsets != answer:
                logger.error(
                    f"Skipping invalid squad label as string via offsets "
                    f"('{span_from_offsets}') does not match answer string ('{answer}') "
                )
                # Actually skip the label, as the log message states.
                continue
        export_data.append(squad_label)

    export = {"data": export_data}

    # NOTE(review): the file receives the bare list (`export_data`) while the HTTP response wraps it
    # in {"data": ...}. Left unchanged because existing consumers of the file may rely on this layout.
    with open("feedback_squad_direct.json", "w", encoding="utf8") as f:
        json.dump(export_data, f, ensure_ascii=False, sort_keys=True, indent=4)
    return export

get_feedback

get_feedback()

This endpoint allows the API user to retrieve all the feedback that has been submitted through the POST /feedback endpoint.

Source code in pipelines/rest_api/controller/feedback.py
@router.get("/feedback")
def get_feedback():
    """
    This endpoint allows the API user to retrieve all the feedback that has been submitted
    through the `POST /feedback` endpoint.

    Returns every label held by the document store, without filtering.
    """
    return DOCUMENT_STORE.get_all_labels()

get_feedback_metrics

get_feedback_metrics(filters: FilterRequest = None)

This endpoint returns basic accuracy metrics based on user feedback, e.g., the ratio of correct answers or correctly identified documents. You can filter the output by document or label.

Example:

curl --location --request POST 'http://127.0.0.1:8000/eval-feedback' --header 'Content-Type: application/json' --data-raw '{ "filters": {"document_id": ["XRR3xnEBCYVTkbTystOB"]} }'

Source code in pipelines/rest_api/controller/feedback.py
@router.post("/eval-feedback")
def get_feedback_metrics(filters: Optional[FilterRequest] = None):
    """
    This endpoint returns basic accuracy metrics based on user feedback,
    e.g., the ratio of correct answers or correctly identified documents.
    You can filter the output by document or label.

    Example:

    `curl --location --request POST 'http://127.0.0.1:8000/eval-feedback' \
     --header 'Content-Type: application/json' \
     --data-raw '{ "filters": {"document_id": ["XRR3xnEBCYVTkbTystOB"]} }'`

    :param filters: Optional label filters. The `origin` filter is always forced
        to `["user-feedback"]`, overriding any value supplied by the caller.
    :return: A dict with `answer_accuracy`, `document_accuracy` (both `None` when
        no feedback matches) and `n_feedback`.
    """
    user_feedback_only = {"origin": ["user-feedback"]}
    if filters:
        # Build a new dict instead of writing into the request object's own filter dict.
        filters_content = {**(filters.filters or {}), **user_feedback_only}
    else:
        filters_content = user_feedback_only

    labels = DOCUMENT_STORE.get_all_labels(filters=filters_content)

    res: Dict[str, Optional[Union[float, int]]]
    n_feedback = len(labels)
    if n_feedback > 0:
        correct_answers = sum(1 for label in labels if label.is_correct_answer)
        correct_documents = sum(1 for label in labels if label.is_correct_document)

        res = {
            "answer_accuracy": correct_answers / n_feedback,
            "document_accuracy": correct_documents / n_feedback,
            "n_feedback": n_feedback,
        }
    else:
        res = {"answer_accuracy": None, "document_accuracy": None, "n_feedback": 0}
    return res

post_feedback

post_feedback(feedback: Union[LabelSerialized, CreateLabelSerialized])

This endpoint allows the API user to submit feedback on an answer for a particular query.

For example, the user can send feedback on whether the answer was correct and whether the right snippet was identified as the answer.

Information submitted through this endpoint is used to train the underlying QA model.

Source code in pipelines/rest_api/controller/feedback.py
@router.post("/feedback")
def post_feedback(feedback: Union[LabelSerialized, CreateLabelSerialized]):
    """
    This endpoint allows the API user to submit feedback on an answer for a particular query.

    For example, the user can send feedback on whether the answer was correct and
    whether the right snippet was identified as the answer.

    The feedback is converted to a `Label` and written to the document store.
    """
    # Feedback submitted without an explicit origin is tagged as user feedback.
    if feedback.origin is None:
        feedback.origin = "user-feedback"

    DOCUMENT_STORE.write_labels([Label(**feedback.dict())])

pipelines.rest_api.controller.file_upload

upload_file

upload_file(files: List[UploadFile] = File(...), meta: Optional[str] = Form('null'), fileconverter_params: FileConverterParams = Depends(FileConverterParams.as_form), preprocessor_params: PreprocessorParams = Depends(PreprocessorParams.as_form))

You can use this endpoint to upload a file for indexing

Source code in pipelines/rest_api/controller/file_upload.py
@router.post("/file-upload")
def upload_file(
    files: List[UploadFile] = File(...),
    # JSON serialized string
    meta: Optional[str] = Form("null"),  # type: ignore
    fileconverter_params: FileConverterParams = Depends(FileConverterParams.as_form),  # type: ignore
    preprocessor_params: PreprocessorParams = Depends(PreprocessorParams.as_form),  # type: ignore
):
    """
    You can use this endpoint to upload a file for indexing

    Every uploaded file is stored under `FILE_UPLOAD_PATH` with a random prefix and
    then passed to the indexing pipeline together with its metadata.

    :param files: The uploaded files.
    :param meta: JSON-serialized dict (or `null`) of metadata applied to every file.
        Each file additionally gets a `name` entry holding its own filename.
    :param fileconverter_params: Parameters passed to the file converter nodes.
    :param preprocessor_params: Parameters passed to the preprocessor node.
    :raises HTTPException: 501 if no indexing pipeline is configured, 500 if `meta`
        is not a dict or `null`.
    """
    if not INDEXING_PIPELINE:
        raise HTTPException(status_code=501, detail="Indexing Pipeline is not configured.")
    file_paths: list = []
    file_metas: list = []
    meta_form = json.loads(meta) or {}  # type: ignore
    if not isinstance(meta_form, dict):
        raise HTTPException(status_code=500, detail=f"The meta field must be a dict or None, not {type(meta_form)}")

    for file in files:
        try:
            # Keep only the final path component of the client-supplied name so it
            # cannot point outside FILE_UPLOAD_PATH.
            safe_name = Path(str(file.filename)).name
            file_path = Path(FILE_UPLOAD_PATH) / f"{uuid.uuid4().hex}_{safe_name}"
            with file_path.open("wb") as buffer:
                shutil.copyfileobj(file.file, buffer)

            file_paths.append(file_path)
            # Each file gets its own metadata dict. Sharing one dict would leave
            # every file labelled with the name of the last uploaded file.
            file_metas.append({**meta_form, "name": file.filename})
        finally:
            file.file.close()

    INDEXING_PIPELINE.run(
        file_paths=file_paths,
        meta=file_metas,
        params={
            "TextFileConverter": fileconverter_params.dict(),
            "PDFFileConverter": fileconverter_params.dict(),
            "Preprocessor": preprocessor_params.dict(),
        },
    )
    return {"message": "OK"}

upload_qa_file

upload_qa_file(files: List[UploadFile] = File(...), meta: Optional[str] = Form('null'), fileconverter_params: FileConverterParams = Depends(FileConverterParams.as_form))

You can use this endpoint to upload a file for indexing and question-answer generation.

Source code in pipelines/rest_api/controller/file_upload.py
@router.post("/file-upload-qa-generate")
def upload_qa_file(
    files: List[UploadFile] = File(...),
    # JSON serialized string
    meta: Optional[str] = Form("null"),  # type: ignore
    fileconverter_params: FileConverterParams = Depends(FileConverterParams.as_form),  # type: ignore
):
    """
    You can use this endpoint to upload a file for indexing

    Every uploaded file is stored under `FILE_UPLOAD_PATH` with a random prefix and
    then passed to `INDEXING_QA_GENERATING_PIPELINE` together with its metadata.

    :param files: The uploaded files.
    :param meta: JSON-serialized dict (or `null`) of metadata applied to every file.
        Each file additionally gets a `name` entry holding its own filename.
    :param fileconverter_params: Parameters passed to the file converter nodes.
    :raises HTTPException: 501 if the pipeline is not configured, 500 if `meta`
        is not a dict or `null`.
    """
    if not INDEXING_QA_GENERATING_PIPELINE:
        raise HTTPException(status_code=501, detail="INDEXING_QA_GENERATING_PIPELINE  is not configured.")

    file_paths: list = []
    file_metas: list = []
    meta_form = json.loads(meta) or {}  # type: ignore
    if not isinstance(meta_form, dict):
        raise HTTPException(status_code=500, detail=f"The meta field must be a dict or None, not {type(meta_form)}")

    for file in files:
        try:
            # Keep only the final path component of the client-supplied name so it
            # cannot point outside FILE_UPLOAD_PATH.
            safe_name = Path(str(file.filename)).name
            file_path = Path(FILE_UPLOAD_PATH) / f"{uuid.uuid4().hex}_{safe_name}"
            with file_path.open("wb") as buffer:
                shutil.copyfileobj(file.file, buffer)

            file_paths.append(file_path)
            # Each file gets its own metadata dict. Sharing one dict would leave
            # every file labelled with the name of the last uploaded file.
            file_metas.append({**meta_form, "name": file.filename})
        finally:
            file.file.close()

    INDEXING_QA_GENERATING_PIPELINE.run(
        file_paths=file_paths,
        meta=file_metas,
        params={
            "TextFileConverter": fileconverter_params.dict(),
            "PDFFileConverter": fileconverter_params.dict(),
        },
    )
    return {"message": "OK"}

pipelines.rest_api.controller.router

pipelines.rest_api.controller.search

chatfile_query

chatfile_query(request: QueryRequest)

This endpoint receives the question as a string and allows the requester to set additional parameters that will be passed on to the pipelines pipeline.

Source code in pipelines/rest_api/controller/search.py
@router.post("/chatfile_query", response_model=Chatfile_QueryResponse, response_model_exclude_none=True)
def chatfile_query(request: QueryRequest):
    """
    This endpoint receives the question as a string and allows the requester to set
    additional parameters that will be passed on to the pipeline.

    The request is processed inside the `concurrency_limiter.run()` context.
    """
    with concurrency_limiter.run():
        return _process_request(PIPELINE, request)

check_status

check_status()

This endpoint can be used during startup to understand if the server is ready to take any requests, or is still loading.

The recommended approach is to call this endpoint with a short timeout, like 500ms, and in case of no reply, consider the server busy.

Source code in pipelines/rest_api/controller/search.py
@router.get("/initialized")
def check_status():
    """
    This endpoint can be used during startup to understand if the
    server is ready to take any requests, or is still loading.

    The recommended approach is to call this endpoint with a short timeout,
    like 500ms, and in case of no reply, consider the server busy.

    The handler itself performs no checks and always returns `True`, so any
    reply at all means the application is up and serving requests.
    """
    return True

pipelines_version

pipelines_version()

Get the running pipelines version.

Source code in pipelines/rest_api/controller/search.py
@router.get("/hs_version")
def pipelines_version():
    """
    Get the running pipelines version.

    Returns a dict with the single key `hs_version`, holding `pipelines.__version__`.
    """
    return {"hs_version": pipelines.__version__}

query

query(request: QueryRequest)

This endpoint receives the question as a string and allows the requester to set additional parameters that will be passed on to the pipelines pipeline.

Source code in pipelines/rest_api/controller/search.py
@router.post("/query", response_model=QueryResponse, response_model_exclude_none=True)
def query(request: QueryRequest):
    """
    This endpoint receives the question as a string and allows the requester to set
    additional parameters that will be passed on to the pipeline.

    The request is processed inside the `concurrency_limiter.run()` context.
    """
    with concurrency_limiter.run():
        return _process_request(PIPELINE, request)

query_documents

query_documents(request: DocumentRequest)

This endpoint receives the request metadata (`meta`) and allows the requester to set additional parameters that will be passed on to the pipeline.

Source code in pipelines/rest_api/controller/search.py
@router.post("/query_documents", response_model=DocumentResponse, response_model_exclude_none=True)
def query_documents(request: DocumentRequest):
    """
    Run the pipeline on the request's `meta` and return its results.

    The requester can set additional parameters (`params`) and a `debug` flag
    that are passed on to the pipeline.

    Returns a dict with the request's `meta` and the pipeline's `results`.
    """
    pipeline_output = PIPELINE.run(meta=request.meta, params=request.params or {}, debug=request.debug)
    return {"meta": request.meta, "results": pipeline_output["results"]}

query_images

query_images(request: QueryRequest)

This endpoint receives the question as a string and allows the requester to set additional parameters that will be passed on to the pipelines pipeline.

Source code in pipelines/rest_api/controller/search.py
@router.post("/query_text_to_images", response_model=QueryImageResponse, response_model_exclude_none=True)
def query_images(request: QueryRequest):
    """
    This endpoint receives the question as a string and allows the requester to set
    additional parameters that will be passed on to the pipeline.

    Returns a dict with the original `query`, the pipeline's `results` under
    `answers`, and an empty `documents` list.
    """
    params = request.params or {}
    res = PIPELINE.run(query=request.query, params=params, debug=request.debug)
    # The original "ensure keys exist" guards checked `result` right after it was
    # filled: "answers" was always present and "documents" never was. The outcome
    # below is exactly what those guards always produced.
    return {"query": request.query, "answers": res["results"], "documents": []}

query_images_for_retrieval

query_images_for_retrieval(files: List[UploadFile] = File(...), meta: Optional[str] = Form('null'))

This endpoint receives one or more uploaded files and an optional JSON-serialized `meta` form field. The stored path of the first file is passed to the pipeline as the query, and `meta` is passed on as the pipeline parameters.

Source code in pipelines/rest_api/controller/search.py
@router.post("/query_images", response_model=QueryResponse, response_model_exclude_none=True)
def query_images_for_retrieval(
    files: List[UploadFile] = File(...),
    # JSON serialized string
    meta: Optional[str] = Form("null"),
):
    """
    Store the uploaded files and run the pipeline with the first one as the query.

    Every uploaded file is written to `FILE_UPLOAD_PATH` with a random prefix.
    Only the path of the first file is passed to the pipeline (as `query`).

    :param files: The uploaded files.
    :param meta: JSON-serialized value (or `null`) that is passed to the pipeline as `params`.
    :return: The raw pipeline result (the pipeline is run with `debug=True`).
    """
    file_paths: list = []
    meta_form = json.loads(meta) or {}  # type: ignore

    for file in files:
        try:
            # Keep only the final path component of the client-supplied name so it
            # cannot point outside FILE_UPLOAD_PATH.
            safe_name = Path(str(file.filename)).name
            file_path = Path(FILE_UPLOAD_PATH) / f"{uuid.uuid4().hex}_{safe_name}"
            with file_path.open("wb") as buffer:
                shutil.copyfileobj(file.file, buffer)

            file_paths.append(file_path)
        finally:
            file.file.close()
    result = PIPELINE.run(query=str(file_paths[0]), params=meta_form, debug=True)
    return result

query_qa_pairs

query_qa_pairs(request: QueryQAPairRequest)

This endpoint receives the request metadata (`meta`) and allows the requester to set additional parameters that will be passed on to the QA-pair pipeline.

Source code in pipelines/rest_api/controller/search.py
@router.post("/query_qa_pairs", response_model=QueryQAPairResponse, response_model_exclude_none=True)
def query_qa_pairs(request: QueryQAPairRequest):
    """
    Run the QA-pair pipeline on the request's `meta`.

    The requester can set additional parameters (`params`) and a `debug` flag
    that are passed on to the pipeline.

    Returns a dict with the request's `meta` and the pipeline's `filtered_cqa_triples`.
    """
    # NOTE(review): looks like a leftover debug print; it writes every request to stdout.
    print("request", request)
    pipeline_output = QA_PAIR_PIPELINE.run(meta=request.meta, params=request.params or {}, debug=request.debug)
    return {"meta": request.meta, "filtered_cqa_triples": pipeline_output["filtered_cqa_triples"]}

senta_file

senta_file(request: SentaRequest)

This endpoint receives the request metadata (`meta`) and allows the requester to set additional parameters that will be passed on to the pipeline.

Source code in pipelines/rest_api/controller/search.py
@router.post("/senta_file", response_model=SentaResponse, response_model_exclude_none=True)
def senta_file(request: SentaRequest):
    """
    Run the pipeline on the request's `meta` and return its `img_dict` output.

    The requester can set additional parameters (`params`) and a `debug` flag
    that are passed on to the pipeline.

    Returns a dict with the request's `meta` and the pipeline's `img_dict`.
    """
    pipeline_output = PIPELINE.run(meta=request.meta, params=request.params or {}, debug=request.debug)
    return {"meta": request.meta, "img_dict": pipeline_output["img_dict"]}