发布于 

测试API并发

测试API并发

import requests
import threading

class gpt:
def __init__(self, api_keys, urls):
self.api_keys = api_keys
self.urls = urls

def send_request(self, messages, thread_id=None, api_num=0):
proxies = {
"http": "http://127.0.0.1:7890",
"https": "http://127.0.0.1:7890"
}
url = self.urls[api_num]
api_key = self.api_keys[api_num]
parameters = {
"model": "gpt-3.5-turbo",
"messages": messages
}
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}"
}
response = requests.post(url, headers=headers, json=parameters, proxies=proxies)
if response.status_code == 200:
data = response.json()
text = data["choices"][0]["message"]
if thread_id is not None:
print(f"Thread {thread_id} using API {api_num + 1}: {text['content']}")
return text
else:
print(response)
return "Sorry, something went wrong."

def test_concurrency(self, num_threads):
threads = []
for i in range(num_threads):
api_num = i % len(self.api_keys) # This will cycle through all the APIs
t = threading.Thread(target=self.send_request, args=([{"role": "user", "content": f"Thread {i} message"}], i, api_num))
threads.append(t)
t.start()
for t in threads:
t.join()

if __name__ == '__main__':
num_apis = int(input("Enter the number of APIs: "))
api_keys = []
urls = []
for i in range(num_apis):
api_key = input(f"Enter API key {i + 1}: ")
api_keys.append(api_key)
url = input(f"Enter URL for API {i + 1}: ")
urls.append(url)

g = gpt(api_keys, urls)
num_threads = int(input("Enter the number of threads: "))
g.test_concurrency(num_threads)