跳转至

LLM Module

pipelines.pipelines.nodes.llm.chatglm

ChatGLMBot

Source code in pipelines/pipelines/nodes/llm/chatglm.py
class ChatGLMBot(BaseComponent):
    def __init__(
        self,
        model_name_or_path="THUDM/chatglm-6b-v1.1",
        batch_size: int = 2,
        max_seq_length: int = 2048,
        tgt_length: int = 2048,
        **kwargs
    ):
        """
        Initialize the ChatGLMBot instance.

        :param batch_size: batch_size for chatglm prediction.
        :param max_seq_length: max_seq_length for the processing input.
        :param tgt_length: tgt_length for models output
        """
        self.kwargs = kwargs
        self.chatglm = Taskflow(
            "text2text_generation",
            model=model_name_or_path,
            batch_size=batch_size,
            max_seq_length=max_seq_length,
            tgt_length=tgt_length,
            **self.kwargs,
        )

    def predict(self, query, stream=False):
        result = self.chatglm(query)
        return result

    def run(self, query, stream=False, **kwargs):
        """
        Using the chatbot to generate the answers
        :param query: The user's input/query to be sent to the chatGLM.
        :param stream: Whether to use streaming mode when making the request. Currently not in use. Defaults to False.
        """
        debug = kwargs.get("debug", False)
        if debug:
            logger.debug(f"Query: {query}")
        result = self.predict(query=query, stream=stream)
        return result, "output_1"

__init__

__init__(model_name_or_path='THUDM/chatglm-6b-v1.1', batch_size: int = 2, max_seq_length: int = 2048, tgt_length: int = 2048, **kwargs)

Initialize the ChatGLMBot instance.

Parameters:

Name Type Description Default
batch_size int

batch_size for chatglm prediction.

2
max_seq_length int

max_seq_length for the processing input.

2048
tgt_length int

tgt_length for models output

2048
Source code in pipelines/pipelines/nodes/llm/chatglm.py
def __init__(
    self,
    model_name_or_path="THUDM/chatglm-6b-v1.1",
    batch_size: int = 2,
    max_seq_length: int = 2048,
    tgt_length: int = 2048,
    **kwargs
):
    """
    Initialize the ChatGLMBot instance.

    :param batch_size: batch_size for chatglm prediction.
    :param max_seq_length: max_seq_length for the processing input.
    :param tgt_length: tgt_length for models output
    """
    self.kwargs = kwargs
    self.chatglm = Taskflow(
        "text2text_generation",
        model=model_name_or_path,
        batch_size=batch_size,
        max_seq_length=max_seq_length,
        tgt_length=tgt_length,
        **self.kwargs,
    )

run

run(query, stream=False, **kwargs)

Using the chatbot to generate the answers

Parameters:

Name Type Description Default
query

The user's input/query to be sent to the chatGLM.

required
stream

Whether to use streaming mode when making the request. Currently not in use. Defaults to False.

False
Source code in pipelines/pipelines/nodes/llm/chatglm.py
def run(self, query, stream=False, **kwargs):
    """
    Using the chatbot to generate the answers
    :param query: The user's input/query to be sent to the chatGLM.
    :param stream: Whether to use streaming mode when making the request. Currently not in use. Defaults to False.
    """
    debug = kwargs.get("debug", False)
    if debug:
        logger.debug(f"Query: {query}")
    result = self.predict(query=query, stream=stream)
    return result, "output_1"

pipelines.pipelines.nodes.llm.ernie_bot

ErnieBot

The ErnieBot class is a subclass of the BaseComponent class, which is designed to interface with the Ernie Bot API for generating AI chatbot responses. It handles the interaction with the API using the provided api_key, secret_key . It allows you to make a request with a given query and optional conversation history, receiving a response from the chatbot and extending the conversation history accordingly.

Source code in pipelines/pipelines/nodes/llm/ernie_bot.py
class ErnieBot(BaseComponent):
    """
    The ErnieBot class is a subclass of the BaseComponent class, which is designed to interface with
    the Ernie Bot API for generating AI chatbot responses. It handles the interaction with the API using
    the provided api_key, secret_key . It allows you to make a request with a given query and optional conversation
    history, receiving a response from the chatbot and extending the conversation history accordingly.
    """

    outgoing_edges = 1
    headers = {"Content-Type": "application/json", "Accept": "application/json"}

    def __init__(self, api_key=None, secret_key=None, model_name="ERNIE-Bot-turbo"):
        """
        Initialize the ErnieBot instance with the provided api_key and secret_key.

        :param api_key: api_key for applying token to request wenxin api.
        :param secret_key: secret_key for applying token to request wenxin api.
        """
        api_key = api_key or os.environ.get("ERNIE_BOT_API_KEY", None)
        secret_key = secret_key or os.environ.get("ERNIE_BOT_SECRET_KEY", None)
        self.model_name = model_name
        if api_key is None or secret_key is None:
            raise Exception(
                "Please apply api_key and secret_key from https://cloud.baidu.com/doc/WENXINWORKSHOP/s/flfmc9do2"
            )
        self.api_key = api_key
        self.secret_key = secret_key
        self.token = self._apply_token(self.api_key, self.secret_key)

    def _apply_token(self, api_key, secret_key):
        payload = ""
        self.token_host = f"https://aip.baidubce.com/oauth/2.0/token?grant_type=client_credentials&client_id={api_key}&client_secret={secret_key}"
        response = requests.request("POST", self.token_host, headers=self.headers, data=payload)
        if response:
            res = response.json()
        else:
            raise RuntimeError("Request access token error.")

        return res["access_token"]

    def predict(self, query, history=None, stream=False, api_key=None, secret_key=None):
        if api_key is not None and secret_key is not None:
            self.token = self._apply_token(api_key, secret_key)
        payload = {"messages": []}
        if history is not None:
            if len(history) % 2 == 0:
                for past_msg in history:
                    if past_msg["role"] not in ["user", "assistant"]:
                        raise ValueError(
                            "Invalid history: The `role` in each message in history must be `user` or `assistant`."
                        )
                payload["messages"].extend(history)
            else:
                raise ValueError("Invalid history: an even number of `messages` is expected!")

        payload["messages"].append({"role": "user", "content": f"{query}"})
        # Do not use stream for now
        if stream:
            payload["stream"] = True
        chat_url = ernie_dict[self.model_name].format(self.token)
        response = requests.request("POST", chat_url, headers=self.headers, data=json.dumps(payload))
        response_json = json.loads(response.text)
        if history is None:
            return_history = []
        else:
            return_history = copy.deepcopy(history)
        try:
            return_history.extend(
                [{"role": "user", "content": query}, {"role": "assistant", "content": response_json["result"]}]
            )
            response_json["history"] = return_history
        except Exception as e:
            logger.error(e)
            logger.error(response_json)
        return response_json

    def run(self, query, history=None, stream=False, api_key=None, secret_key=None, **kwargs):
        """
        Send a request to the Ernie Bot API with the given query and optional conversation history.
        Returns the chatbot response and updates the conversation history accordingly.

        :param query: The user's input/query to be sent to the Ernie Bot API.
        :param history: A list of dictionaries representing the conversation history,
        :param stream: Whether to use streaming mode when making the request. Currently not in use. Defaults to False.
        """
        debug = kwargs.get("debug", False)
        if debug:
            logger.debug(f"Query: {query}")
        response_json = self.predict(query, history, stream)
        return response_json, "output_1"

__init__

__init__(api_key=None, secret_key=None, model_name='ERNIE-Bot-turbo')

Initialize the ErnieBot instance with the provided api_key and secret_key.

Parameters:

Name Type Description Default
api_key

api_key for applying token to request wenxin api.

None
secret_key

secret_key for applying token to request wenxin api.

None
Source code in pipelines/pipelines/nodes/llm/ernie_bot.py
def __init__(self, api_key=None, secret_key=None, model_name="ERNIE-Bot-turbo"):
    """
    Initialize the ErnieBot instance with the provided api_key and secret_key.

    :param api_key: api_key for applying token to request wenxin api.
    :param secret_key: secret_key for applying token to request wenxin api.
    """
    api_key = api_key or os.environ.get("ERNIE_BOT_API_KEY", None)
    secret_key = secret_key or os.environ.get("ERNIE_BOT_SECRET_KEY", None)
    self.model_name = model_name
    if api_key is None or secret_key is None:
        raise Exception(
            "Please apply api_key and secret_key from https://cloud.baidu.com/doc/WENXINWORKSHOP/s/flfmc9do2"
        )
    self.api_key = api_key
    self.secret_key = secret_key
    self.token = self._apply_token(self.api_key, self.secret_key)

run

run(query, history=None, stream=False, api_key=None, secret_key=None, **kwargs)

Send a request to the Ernie Bot API with the given query and optional conversation history. Returns the chatbot response and updates the conversation history accordingly.

Parameters:

Name Type Description Default
query

The user's input/query to be sent to the Ernie Bot API.

required
history

A list of dictionaries representing the conversation history,

None
stream

Whether to use streaming mode when making the request. Currently not in use. Defaults to False.

False
Source code in pipelines/pipelines/nodes/llm/ernie_bot.py
def run(self, query, history=None, stream=False, api_key=None, secret_key=None, **kwargs):
    """
    Send a request to the Ernie Bot API with the given query and optional conversation history.
    Returns the chatbot response and updates the conversation history accordingly.

    :param query: The user's input/query to be sent to the Ernie Bot API.
    :param history: A list of dictionaries representing the conversation history,
    :param stream: Whether to use streaming mode when making the request. Currently not in use. Defaults to False.
    """
    debug = kwargs.get("debug", False)
    if debug:
        logger.debug(f"Query: {query}")
    response_json = self.predict(query, history, stream)
    return response_json, "output_1"

pipelines.pipelines.nodes.llm.history

TruncatedConversationHistory

This class represents a component that truncates conversation history to a specified maximum length.

Source code in pipelines/pipelines/nodes/llm/history.py
class TruncatedConversationHistory(BaseComponent):
    """This class represents a component that truncates conversation history to a specified maximum length."""

    outgoing_edges = 1

    def __init__(self, max_length):
        """Initializes the TruncatedConversationHistory class with the specified maximum length."""
        self.max_length = max_length

    def run(self, query, history=None):
        """Truncates the conversation history to the maximum allowed length, then returns the modified history along with the query."""
        if history is None:
            return {"query": query}, "output_1"
        for past_msg in history:
            if len(past_msg["content"]) > self.max_length:
                past_msg["content"] = past_msg["content"][: self.max_length]
        return {"query": query, "history": history}, "output_1"

__init__

__init__(max_length)

Initializes the TruncatedConversationHistory class with the specified maximum length.

Source code in pipelines/pipelines/nodes/llm/history.py
def __init__(self, max_length):
    """Initializes the TruncatedConversationHistory class with the specified maximum length."""
    self.max_length = max_length

run

run(query, history=None)

Truncates the conversation history to the maximum allowed length, then returns the modified history along with the query.

Source code in pipelines/pipelines/nodes/llm/history.py
def run(self, query, history=None):
    """Truncates the conversation history to the maximum allowed length, then returns the modified history along with the query."""
    if history is None:
        return {"query": query}, "output_1"
    for past_msg in history:
        if len(past_msg["content"]) > self.max_length:
            past_msg["content"] = past_msg["content"][: self.max_length]
    return {"query": query, "history": history}, "output_1"

pipelines.pipelines.nodes.llm.prompt_template

LLMPromptTemplate

Source code in pipelines/pipelines/nodes/llm/prompt_template.py
class LLMPromptTemplate(BaseComponent):
    outgoing_edges = 1

    def __init__(self, template):
        self.template = template

    def run(self, query=None, documents=None, history=None):
        if documents is not None:
            documents = [i.content for i in documents]
            context = "".join(documents)
            result = {"documents": context, "query": query}
        elif history is not None:
            chat_history = "\n".join(history)
            question = query
            result = {"chat_history": chat_history, "question": question}
        else:
            raise NotImplementedError("This prompt template is not implemented!")

        return {"query": self.template.format(**result)}, "output_1"