ข้อจำกัดอัตราการใช้งาน
ข้อจำกัดอัตราการใช้งานถูกนำมาใช้เพื่อให้แน่ใจว่ามีการใช้งาน OpenTyphoon.ai API อย่างเป็นธรรมและเพื่อรักษาคุณภาพบริการสำหรับผู้ใช้ทุกคน หน้านี้อธิบายนโยบายข้อจำกัดอัตราการใช้งานของเรา, ข้อจำกัดตามโมเดล และวิธีจัดการกับข้อผิดพลาดเกี่ยวกับข้อจำกัดอัตราการใช้งาน
นโยบายข้อจำกัดอัตราการใช้งาน
การร้องขอ API แต่ละครั้งถูกนับรวมเข้ากับข้อจำกัดอัตราการใช้งานของคุณ เราบังคับใช้ข้อจำกัดอัตราการใช้งานสองประเภท:
- คำขอต่อวินาที (RPS): จำนวนคำขอสูงสุดที่คุณสามารถทำได้ในแต่ละวินาที
- คำขอต่อนาที (RPM): จำนวนคำขอสูงสุดที่คุณสามารถทำได้ในแต่ละนาที
ข้อจำกัดเหล่านี้แตกต่างกันไปตามโมเดล โดยโมเดลที่ใหญ่กว่ามักมีข้อจำกัดที่เข้มงวดมากกว่าเนื่องจากความต้องการด้านการคำนวณที่สูงกว่า
ข้อจำกัดอัตราการใช้งานปัจจุบัน
ด้านล่างเป็นข้อจำกัดอัตราการใช้งานปัจจุบันสำหรับแต่ละโมเดล:
โมเดล | คำขอต่อวินาที | คำขอต่อนาที |
---|---|---|
typhoon-v2-8b-instruct | 5 | 50 |
typhoon-v2-70b-instruct | 5 | 50 |
การจัดการกับข้อผิดพลาดเกี่ยวกับข้อจำกัดอัตราการใช้งาน
เมื่อคุณเกินข้อจำกัดอัตราการใช้งาน API จะส่งคืนข้อผิดพลาด 429 Too Many Requests
การตอบกลับจะรวมข้อมูลเกี่ยวกับเวลาที่คุณสามารถลองร้องขอใหม่
ตัวอย่างการตอบกลับข้อผิดพลาดเกี่ยวกับข้อจำกัดอัตราการใช้งาน:
{ "error": { "message": "เกินข้อจำกัดอัตราการใช้งาน โปรดลองใหม่หลังจาก 2024-01-01T12:00:00Z", "type": "rate_limit_error", "param": null, "code": "rate_limit_exceeded" }}
แนวทางปฏิบัติที่ดีที่สุดสำหรับการจัดการกับข้อจำกัดอัตราการใช้งาน
เพื่อทำงานภายในข้อจำกัดอัตราการใช้งานอย่างมีประสิทธิภาพ:
- ใช้การถอยหลังแบบเอกซ์โพเนนเชียล: เมื่อคุณได้รับข้อผิดพลาดเกี่ยวกับข้อจำกัดอัตราการใช้งาน ให้รอก่อนที่จะลองใหม่ และเพิ่มเวลารอแบบเอกซ์โพเนนเชียลในแต่ละครั้งที่เกิดข้อผิดพลาดต่อเนื่อง
import timeimport randomfrom openai import OpenAI, RateLimitError
client = OpenAI( api_key="<YOUR_API_KEY>", base_url="https://api.opentyphoon.ai/v1")
def call_api_with_backoff(max_retries=5): retries = 0 while retries < max_retries: try: response = client.chat.completions.create( model="typhoon-v2-70b-instruct", messages=[{"role": "user", "content": "Hello"}], ) return response except RateLimitError as e: retries += 1 if retries >= max_retries: raise e
# คำนวณเวลาถอยหลัง: 2^retries + ค่าสุ่ม backoff_time = (2 ** retries) + random.random() print(f"เกินข้อจำกัดอัตราการใช้งาน กำลังลองใหม่ใน {backoff_time:.2f} วินาที...") time.sleep(backoff_time)
- จัดคิวและรวมคำขอเป็นกลุ่ม: แทนที่จะส่งคำขอ API ทันที ให้จัดคิวและส่งในอัตราที่ควบคุมได้
import timeimport queueimport threading
class APIRequestQueue: def __init__(self, requests_per_minute): self.queue = queue.Queue() self.requests_per_minute = requests_per_minute self.interval = 60 / requests_per_minute self.lock = threading.Lock() self.last_request_time = 0
def add_request(self, messages): self.queue.put(messages)
def process_queue(self): while True: messages = self.queue.get() self._make_request(messages) self.queue.task_done()
def _make_request(self, messages): with self.lock: # ตรวจสอบให้แน่ใจว่าไม่เกินข้อจำกัดอัตราการใช้งาน current_time = time.time() time_since_last_request = current_time - self.last_request_time
if time_since_last_request < self.interval: sleep_time = self.interval - time_since_last_request time.sleep(sleep_time)
# ทำคำขอ API try: response = client.chat.completions.create( model="typhoon-v2-70b-instruct", messages=messages ) print(f"การตอบกลับ: {response.choices[0].message.content}") except Exception as e: print(f"ข้อผิดพลาด: {e}")
self.last_request_time = time.time()
- ตรวจสอบการใช้งานของคุณ: ติดตามการใช้งาน API ของคุณเพื่อให้แน่ใจว่าคุณอยู่ภายในข้อจำกัด
เคล็ดลับสำหรับการทำงานกับข้อจำกัดอัตราการใช้งาน
-
เลือกโมเดลที่เหมาะสม: หากคุณกำลังประสบปัญหาเกี่ยวกับข้อจำกัดอัตราการใช้งาน ให้พิจารณาใช้โมเดลที่เล็กกว่าที่มีข้อจำกัดปริมาณงานที่สูงกว่า
-
ปรับคำพรอมต์ของคุณให้เหมาะสม: คำพรอมต์ที่มีประสิทธิภาพมากขึ้นสามารถลดจำนวนการเรียก API ที่จำเป็นในการบรรลุเป้าหมายของคุณ
-
แคชการตอบกลับ: สำหรับคำขอที่เหมือนกัน ให้แคชการตอบกลับเพื่อหลีกเลี่ยงการเรียก API ที่ไม่จำเป็น
import hashlibimport jsonimport pickleimport os
class SimpleCache: def __init__(self, cache_file="typhoon_cache.pkl"): self.cache_file = cache_file self.cache = {} self._load_cache()
def _load_cache(self): if os.path.exists(self.cache_file): with open(self.cache_file, "rb") as f: self.cache = pickle.load(f)
def _save_cache(self): with open(self.cache_file, "wb") as f: pickle.dump(self.cache, f)
def get_response(self, model, messages, **kwargs): # สร้างคีย์ที่ไม่ซ้ำกันจากพารามิเตอร์คำขอ key_dict = { "model": model, "messages": messages, **kwargs } key = hashlib.md5(json.dumps(key_dict, sort_keys=True).encode()).hexdigest()
# ตรวจสอบว่าเรามีการตอบกลับที่แคชไว้หรือไม่ if key in self.cache: print("ใช้การตอบกลับที่แคชไว้") return self.cache[key]
# ทำการเรียก API response = client.chat.completions.create( model=model, messages=messages, **kwargs )
# แคชการตอบกลับ self.cache[key] = response self._save_cache()
return response
-
ใช้การรวมคำขอ: จัดกลุ่มคำถามของผู้ใช้หลายรายการเข้าด้วยกันเป็นการเรียก API เพียงครั้งเดียวเมื่อเหมาะสม
-
ใช้การสตรีมอย่างชาญฉลาด: การตอบกลับแบบสตรีมนับเป็นคำขอเพียงครั้งเดียวแต่ช่วยให้คุณเริ่มประมวลผลการตอบกลับได้เร็วขึ้น
การขอข้อจำกัดอัตราการใช้งานที่สูงขึ้น
หากคุณต้องการข้อจำกัดอัตราการใช้งานที่สูงขึ้นสำหรับแอปพลิเคชันการผลิตของคุณ โปรดติดต่อเราที่ contact@opentyphoon.ai พร้อมรายละเอียดเกี่ยวกับกรณีการใช้งานของคุณ, ปริมาณที่คาดหวัง และความต้องการ