函数调用
与聊天完成API类似,助理API支持函数调用。函数调用允许您向Assistants API描述函数,并让它智能返回需要调用的函数及其参数。
快速启动
在本例中,我们将创建一个天气助理,并定义两个函数,get_current_temperature和get_rain_probability,作为助理可以调用的工具。根据用户查询,如果使用我们在2023年11月6日或之后发布的最新模型,该模型将调用并行函数调用。
在我们使用并行函数调用的例子中,我们将询问助理今天旧金山的天气如何,以及下雨的可能性。我们还展示了如何通过流媒体输出助理的响应。
第1步:定义函数
在创建助理时,您将首先在助理tools参数下定义功能。
from openai import OpenAI
client = OpenAI()
assistant = client.beta.assistants.create(
  instructions="你是一个天气机器人。使用提供的函数来回答问题。",
  model="gpt-4o",
  tools=[
    {
      "type": "function",
      "function": {
        "name": "get_current_temperature",
        "description": "获取特定地点的当前温度",
        "parameters": {
          "type": "object",
          "properties": {
            "location": {
              "type": "string",
              "description": "获取特定地点(例如,旧金山,加利福尼亚州)的当前温度"
            },
            "unit": {
              "type": "string",
              "enum": ["Celsius", "Fahrenheit"],
              "description": "根据用户的所在地推断所使用的温度单位来获取特定地点的当前温度(例如,旧金山,加利福尼亚州)。"
            }
          },
          "required": ["location", "unit"]
        }
      }
    },
    {
      "type": "function",
      "function": {
        "name": "get_rain_probability",
        "description": "获取特定地点的降雨概率",
        "parameters": {
          "type": "object",
          "properties": {
            "location": {
              "type": "string",
              "description": "获取特定地点(例如,旧金山,加利福尼亚州)的降雨概率"
            }
          },
          "required": ["location"]
        }
      }
    }
  ]
)
第2步:创建线程并添加消息
当用户开始对话时创建一个线程,并在用户提问时将消息添加到线程中。
thread = client.beta.threads.create()
message = client.beta.threads.messages.create(
  thread_id=thread.id,
  role="user",
  content="“旧金山今天的天气如何,以及降雨的可能性是多少?”"
)
第3步:启动运行
当您在包含触发一个或多个功能的用户消息的线程上启动运行时,运行将进入pending状态。处理后,运行将进入
requiresrequires_action状态,您可以通过检查运行status进行验证。这表明您需要运行工具并将其输出提交给助理
才能继续运行执行。在我们的案例中,我们将看到两个tool_calls,这表明用户查询导致并行函数调用。
{
  "id": "run_qJL1kI9xxWlfE0z1yfL0fGg9",
  ...
  "status": "requires_action",
  "required_action": {
    "submit_tool_outputs": {
      "tool_calls": [
        {
          "id": "call_FthC9qRpsL5kBpwwyw6c7j4k",
          "function": {
            "arguments": "{\"location\": \"San Francisco, CA\"}",
            "name": "get_rain_probability"
          },
          "type": "function"
        },
        {
          "id": "call_RpEDoB8O0FTL9JoKTuCVFOyR",
          "function": {
            "arguments": "{\"location\": \"San Francisco, CA\", \"unit\": \"Fahrenheit\"}",
            "name": "get_current_temperature"
          },
          "type": "function"
        }
      ]
    },
    ...
    "type": "submit_tool_outputs"
  }
}
使用流
对于流的情况,我们创建一个 EventHandler 类来处理响应流中的事件,并使用Python和Node SDK中的“提交工具输出流”助手同时提交所有工具输出。
from typing_extensions import override
from openai import AssistantEventHandler
class EventHandler(AssistantEventHandler):
  @override
  def on_event(self, event):
    # Retrieve events that are denoted with 'requires_action'
    # since these will have our tool_calls
    if event.event == 'thread.run.requires_action':
      run_id = event.data.id  # Retrieve the run ID from the event data
      self.handle_requires_action(event.data, run_id)
  def handle_requires_action(self, data, run_id):
    tool_outputs = []
    for tool in data.required_action.submit_tool_outputs.tool_calls:
      if tool.function.name == "get_current_temperature":
        tool_outputs.append({"tool_call_id": tool.id, "output": "57"})
      elif tool.function.name == "get_rain_probability":
        tool_outputs.append({"tool_call_id": tool.id, "output": "0.06"})
    # Submit all tool_outputs at the same time
    self.submit_tool_outputs(tool_outputs, run_id)
  def submit_tool_outputs(self, tool_outputs, run_id):
    # Use the submit_tool_outputs_stream helper
    with client.beta.threads.runs.submit_tool_outputs_stream(
      thread_id=self.current_run.thread_id,
      run_id=self.current_run.id,
      tool_outputs=tool_outputs,
      event_handler=EventHandler(),
    ) as stream:
      for text in stream.text_deltas:
        print(text, end="", flush=True)
      print()
with client.beta.threads.runs.stream(
  thread_id=thread.id,
  assistant_id=assistant.id,
  event_handler=EventHandler()
) as stream:
  stream.until_done()