Rate Limits
Rate limits ensure fair usage of the OpenTyphoon.ai API and maintain service quality for all users. This page explains our rate limit policies, the current limits for each model, and how to handle rate limit errors.
Rate Limit Policies
Each API request is counted against your rate limits. We enforce two types of rate limits:
- Requests per second (RPS): The maximum number of requests you can make each second
- Requests per minute (RPM): The maximum number of requests you can make each minute
These limits vary by model, with larger models typically having more restrictive limits due to their higher computational requirements.
Current Rate Limits
Below are the current rate limits for each model:
| Model | Requests per Second | Requests per Minute |
|---|---|---|
| typhoon-v2-8b-instruct | 5 | 50 |
| typhoon-v2-70b-instruct | 5 | 50 |
Handling Rate Limit Errors
When you exceed your rate limits, the API will return a 429 Too Many Requests error. The response will include information about when you can retry your request.
Example rate limit error response:
```json
{
  "error": {
    "message": "Rate limit exceeded. Please retry after 2024-01-01T12:00:00Z",
    "type": "rate_limit_error",
    "param": null,
    "code": "rate_limit_exceeded"
  }
}
```
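With the official OpenAI-compatible Python client, this error surfaces as a RateLimitError. The sketch below shows one way to catch it and read the retry information; whether a Retry-After header is present depends on the deployment, so treat that part as an assumption:

```python
from openai import OpenAI, RateLimitError

client = OpenAI(
    api_key="<YOUR_API_KEY>",
    base_url="https://api.opentyphoon.ai/v1",
)

try:
    response = client.chat.completions.create(
        model="typhoon-v2-8b-instruct",
        messages=[{"role": "user", "content": "Hello"}],
    )
except RateLimitError as e:
    # The exception message mirrors the JSON body shown above.
    print(f"Rate limited: {e}")
    retry_after = e.response.headers.get("Retry-After")  # may be absent
    if retry_after:
        print(f"Server suggests retrying after {retry_after} seconds")
```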
Best Practices for Handling Rate Limits
To work within rate limits effectively:
- Implement exponential backoff: When you receive a rate limit error, wait before retrying, and increase the wait time exponentially with each subsequent error.
```python
import time
import random
from openai import OpenAI, RateLimitError

client = OpenAI(
    api_key="<YOUR_API_KEY>",
    base_url="https://api.opentyphoon.ai/v1",
)

def call_api_with_backoff(max_retries=5):
    retries = 0
    while retries < max_retries:
        try:
            response = client.chat.completions.create(
                model="typhoon-v2-70b-instruct",
                messages=[{"role": "user", "content": "Hello"}],
            )
            return response
        except RateLimitError as e:
            retries += 1
            if retries >= max_retries:
                raise e

            # Calculate backoff time: 2^retries + random jitter
            backoff_time = (2 ** retries) + random.random()
            print(f"Rate limit exceeded. Retrying in {backoff_time:.2f} seconds...")
            time.sleep(backoff_time)
```
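The helper can then replace a direct call:

```python
# Retries automatically on 429s, waiting longer after each failure.
response = call_api_with_backoff(max_retries=5)
print(response.choices[0].message.content)
```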
- Queue and batch requests: Instead of sending API requests immediately, queue them and send them at a controlled rate.
```python
import time
import queue
import threading

class APIRequestQueue:
    def __init__(self, requests_per_minute):
        self.queue = queue.Queue()
        self.requests_per_minute = requests_per_minute
        self.interval = 60 / requests_per_minute
        self.lock = threading.Lock()
        self.last_request_time = 0

    def add_request(self, messages):
        self.queue.put(messages)

    def process_queue(self):
        while True:
            messages = self.queue.get()
            self._make_request(messages)
            self.queue.task_done()

    def _make_request(self, messages):
        with self.lock:
            # Ensure we don't exceed rate limits
            current_time = time.time()
            time_since_last_request = current_time - self.last_request_time

            if time_since_last_request < self.interval:
                sleep_time = self.interval - time_since_last_request
                time.sleep(sleep_time)

            # Make the API request
            try:
                response = client.chat.completions.create(
                    model="typhoon-v2-70b-instruct",
                    messages=messages
                )
                print(f"Response: {response.choices[0].message.content}")
            except Exception as e:
                print(f"Error: {e}")

            self.last_request_time = time.time()
```
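One way to wire the queue up is with a single background worker thread. This sketch reuses the client created in the earlier example:

```python
# Drain the queue at roughly 50 requests per minute with one background worker.
api_queue = APIRequestQueue(requests_per_minute=50)
worker = threading.Thread(target=api_queue.process_queue, daemon=True)
worker.start()

api_queue.add_request([{"role": "user", "content": "Hello"}])
api_queue.add_request([{"role": "user", "content": "Explain rate limiting in one sentence."}])

# Block until every queued request has been processed.
api_queue.queue.join()
```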
- Monitor your usage: Keep track of your API usage to ensure you stay within limits.
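There is no single required way to track usage; as one illustration, here is a minimal in-process counter (the UsageTracker class and its window sizes are our own, chosen to mirror the limits in the table above):

```python
import time
from collections import deque

class UsageTracker:
    """Keeps recent request timestamps so you can see how close you are to the limits."""

    def __init__(self, rps_limit=5, rpm_limit=50):
        self.rps_limit = rps_limit
        self.rpm_limit = rpm_limit
        self.timestamps = deque()

    def record_request(self):
        now = time.time()
        self.timestamps.append(now)
        # Drop timestamps older than the 60-second window.
        while self.timestamps and now - self.timestamps[0] > 60:
            self.timestamps.popleft()

    def current_usage(self):
        now = time.time()
        return {
            "requests_last_second": sum(1 for t in self.timestamps if now - t <= 1),
            "rps_limit": self.rps_limit,
            "requests_last_minute": len(self.timestamps),
            "rpm_limit": self.rpm_limit,
        }
```

Call record_request() right after each API call and inspect current_usage() before deciding whether to send more.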
Tips for Working with Rate Limits
- Choose the right model: If you're experiencing rate limit issues, consider using a smaller model with higher throughput limits.
- Optimize your prompts: More efficient prompts can reduce the number of API calls needed to achieve your goals.
- Cache responses: For identical requests, cache the responses to avoid unnecessary API calls.
```python
import hashlib
import json
import pickle
import os

class SimpleCache:
    def __init__(self, cache_file="typhoon_cache.pkl"):
        self.cache_file = cache_file
        self.cache = {}
        self._load_cache()

    def _load_cache(self):
        if os.path.exists(self.cache_file):
            with open(self.cache_file, "rb") as f:
                self.cache = pickle.load(f)

    def _save_cache(self):
        with open(self.cache_file, "wb") as f:
            pickle.dump(self.cache, f)

    def get_response(self, model, messages, **kwargs):
        # Create a unique key from the request parameters
        key_dict = {
            "model": model,
            "messages": messages,
            **kwargs
        }
        key = hashlib.md5(json.dumps(key_dict, sort_keys=True).encode()).hexdigest()

        # Check if we have a cached response
        if key in self.cache:
            print("Using cached response")
            return self.cache[key]

        # Make the API call
        response = client.chat.completions.create(
            model=model,
            messages=messages,
            **kwargs
        )

        # Cache the response
        self.cache[key] = response
        self._save_cache()

        return response
```
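Using the cache then looks like this; repeated identical requests are served from disk instead of the API (this reuses the client configured earlier):

```python
cache = SimpleCache()

# The first call hits the API; the second identical call is returned from the cache.
response = cache.get_response(
    model="typhoon-v2-8b-instruct",
    messages=[{"role": "user", "content": "What is Typhoon?"}],
)
response_again = cache.get_response(
    model="typhoon-v2-8b-instruct",
    messages=[{"role": "user", "content": "What is Typhoon?"}],
)
print(response_again.choices[0].message.content)
```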
- Implement request pooling: Group multiple user queries together into a single API call when appropriate (see the pooling sketch after this list).
- Use streaming wisely: Streaming responses count as a single request but allow you to start processing the response sooner.
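A simple form of pooling is to fold several short, independent questions into one prompt and split the answer afterwards. This is only a sketch under our own conventions (the numbering scheme and prompt wording are not part of the API), and it reuses the client configured earlier:

```python
# Combine several pending user questions into one request.
questions = [
    "What is the capital of Thailand?",
    "Translate 'hello' into Thai.",
    "What is 12 * 7?",
]
pooled_prompt = "Answer each question on its own numbered line:\n" + "\n".join(
    f"{i + 1}. {q}" for i, q in enumerate(questions)
)

response = client.chat.completions.create(
    model="typhoon-v2-8b-instruct",
    messages=[{"role": "user", "content": pooled_prompt}],
)
print(response.choices[0].message.content)
```

For streaming, the standard stream=True flag of the OpenAI-compatible Chat Completions API applies; the whole stream still counts as one request against your limits:

```python
stream = client.chat.completions.create(
    model="typhoon-v2-70b-instruct",
    messages=[{"role": "user", "content": "Write a short poem about rate limits."}],
    stream=True,
)
for chunk in stream:
    # Each chunk carries an incremental piece of the reply.
    if chunk.choices and chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)
```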
Requesting Higher Rate Limits
If you need higher rate limits for your production application, please contact us at contact@opentyphoon.ai with details about your use case, expected volume, and requirements.