Python SDK

Python Rate Limiting That Works Across Processes

Python has no shortage of rate limiting libraries — ratelimit, limits, slowapi. They work well for a single process. But when you scale to Celery workers, FastAPI replicas, or any multi-process setup, per-process counters silently multiply your effective rate. The ratequeue package is built for this case.

Install

pip install ratequeue

Basic Usage

The core API is an async context manager. While inside the async with block, your request has an active slot. When the block exits — normally or on exception — the slot is released.

import ratequeue.aio as rq
import os

RATEQUEUE_API_KEY = os.environ["RATEQUEUE_API_KEY"]

async def call_openai(messages: list, priority: int = 10):
    # estimate_tokens and openai_client come from your own code
    async with rq.acquire(
        "openai-gpt4",
        load=estimate_tokens(messages),  # counts against token budget
        priority=priority,
        api_key=RATEQUEUE_API_KEY
    ):
        return await openai_client.chat.completions.create(
            model="gpt-4o",
            messages=messages
        )
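The example above calls an estimate_tokens helper that is not part of the SDK — you supply your own. A minimal sketch using the common ~4-characters-per-token heuristic (for exact counts, use a real tokenizer such as tiktoken):

```python
def estimate_tokens(messages: list) -> int:
    """Rough token estimate: ~4 characters per token for English text.

    Good enough for load accounting; swap in a real tokenizer
    (e.g. tiktoken) when you need exact counts.
    """
    chars = sum(len(m.get("content", "")) for m in messages)
    return max(1, chars // 4)  # never report zero load
```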

resource_name

Identifies the resource to acquire. Must match the name of a resource you created in the dashboard.

load

The weight of this request against load-based limits. Use token count, bytes, or any numeric unit.

priority

Higher values are served first when capacity is constrained. Default is 10.

lane

Isolate traffic into separate queue segments to prevent one class from starving others.
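One way to keep priority and lane choices consistent across call sites is a small lookup table. A minimal sketch — the job class names and values below are illustrative assumptions, not part of the SDK:

```python
# Illustrative priority/lane assignments per job class (not part of the SDK).
JOB_CLASSES = {
    "user-facing": {"priority": 100, "lane": "user-facing"},
    "batch":       {"priority": 10,  "lane": "batch"},
    "backfill":    {"priority": 1,   "lane": "backfill"},
}

def acquire_kwargs(job_class: str) -> dict:
    """Look up the priority/lane settings for a job class."""
    return JOB_CLASSES[job_class]
```

Call sites can then spread the result into the acquire call, e.g. rq.acquire("openai-gpt4", load=..., api_key=..., **acquire_kwargs("batch")).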

Celery Integration

Run 20 Celery workers with this task — they all coordinate against the same RateQueue resource. No per-worker configuration, no Redis coordination layer to manage.

from celery import Celery
import ratequeue.aio as rq
import asyncio
import os

app = Celery("tasks")
RATEQUEUE_API_KEY = os.environ["RATEQUEUE_API_KEY"]

@app.task
def enrich_record(record_id: str, priority: int = 10):
    # get_prompt and openai_client come from your own code
    async def _run():
        async with rq.acquire(
            "openai",
            load=2000,
            priority=priority,
            api_key=RATEQUEUE_API_KEY
        ):
            return await openai_client.chat.completions.create(
                model="gpt-4o",
                messages=[{"role": "user", "content": get_prompt(record_id)}]
            )
    return asyncio.run(_run())

FastAPI Integration

from fastapi import FastAPI
import ratequeue.aio as rq
import os

app = FastAPI()
RATEQUEUE_API_KEY = os.environ["RATEQUEUE_API_KEY"]

@app.post("/generate")
async def generate(prompt: str):
    async with rq.acquire(
        "openai-gpt4",
        load=len(prompt) * 2,   # generous upper-bound token estimate
        priority=100,           # user-facing, high priority
        lane="user-facing",
        api_key=RATEQUEUE_API_KEY
    ):
        response = await openai_client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}]
        )
        return {"text": response.choices[0].message.content}

Works with Any Async Framework

asyncio, FastAPI, Starlette, aiohttp, Celery (via asyncio.run), Airflow operators — anything that supports async/await. Synchronous support available for non-async codebases.

Install ratequeue and coordinate your first API call

Create a free resource, wrap your first API call, and see distributed rate limiting work across all your workers.