
Python Rate Limiting That Works Across Workers

Python's ecosystem has plenty of per-process rate limiters — ratelimit, slowapi, limits. They work fine for a single process. But when you run Celery workers, FastAPI instances, or async services at scale, each process has its own counter. The shared API key doesn't.

The Single-Process Trap

Decorators like @sleep_and_retry and @limits track state in memory — state that only exists within that one process. Run 4 Celery workers and you have 4 independent counters, each thinking it owns the full limit. Collectively, they're at 4x the limit, and none of them know it.

Scaling workers makes the problem worse, not better. The solution requires a counter that lives outside any single process — one that all workers read from and write to atomically.
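The failure mode is easy to reproduce without any framework. Below is a minimal sketch (hypothetical InMemoryLimiter class, no external dependencies) modeling four workers that each hold their own in-memory sliding-window limiter for the same 10-calls-per-minute API key:

```python
import time

class InMemoryLimiter:
    """Minimal per-process limiter: the counter lives only in this object."""
    def __init__(self, calls: int, period: float):
        self.calls = calls
        self.period = period
        self.timestamps: list[float] = []

    def try_acquire(self) -> bool:
        now = time.monotonic()
        # Drop timestamps that have aged out of the window.
        self.timestamps = [t for t in self.timestamps if now - t < self.period]
        if len(self.timestamps) < self.calls:
            self.timestamps.append(now)
            return True
        return False

# Simulate 4 workers, each with its own limiter for a "10 calls/min" key.
workers = [InMemoryLimiter(calls=10, period=60.0) for _ in range(4)]
granted = sum(w.try_acquire() for w in workers for _ in range(10))
print(granted)  # 40 — four independent counters each grant the full limit
```

Each limiter behaves correctly in isolation; the bug only appears in the aggregate, which is exactly why it tends to surface in production rather than in tests.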

RateQueue's Python SDK

The ratequeue.aio package exposes acquire, an async context manager. Call it from any number of processes and they all coordinate against the same limit automatically.

import ratequeue.aio as rq
import os

RATEQUEUE_API_KEY = os.environ["RATEQUEUE_API_KEY"]

# estimate_tokens() and openai_client are assumed to be defined elsewhere in your app.
async def call_openai(messages: list, priority: int = 10):
    async with rq.acquire(
        "openai-gpt4",                     # resource name configured in RateQueue
        load=estimate_tokens(messages),    # weight this request by its token count
        priority=priority,
        api_key=RATEQUEUE_API_KEY,
    ):
        # The slot is held only while the request is in flight.
        return await openai_client.chat.completions.create(
            model="gpt-4o",
            messages=messages,
        )
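The load argument lets the limit count tokens rather than requests. The estimate_tokens helper is assumed above; a rough stand-in (hypothetical, using the common rule of thumb of roughly 4 characters per token for English text) might look like:

```python
def estimate_tokens(messages: list[dict]) -> int:
    # Rough heuristic: ~4 characters per token for English text.
    # Swap in an exact tokenizer (e.g. tiktoken) if accuracy matters.
    chars = sum(len(m.get("content", "")) for m in messages)
    return max(1, chars // 4)

print(estimate_tokens([{"role": "user", "content": "Hello, world!"}]))  # 3
```

Overestimating slightly is usually the safer direction: you burn a little headroom instead of tripping the provider's limit.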

Call this from 10 workers simultaneously — they'll all coordinate against the same limit. No shared memory, no inter-process communication.
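The fan-out itself is ordinary asyncio code. The sketch below shows the pattern; to keep it runnable without the SDK, a local semaphore stands in for rq.acquire (the real context manager would instead wait for a grant from the shared RateQueue service):

```python
import asyncio
from contextlib import asynccontextmanager

def make_acquire(max_in_flight: int):
    # Stand-in for rq.acquire: a local semaphore caps concurrency where the
    # real version coordinates through the shared service.
    gate = asyncio.Semaphore(max_in_flight)

    @asynccontextmanager
    async def acquire(resource: str, *, load: int = 1, priority: int = 10):
        async with gate:
            yield

    return acquire

async def call_api(acquire, i: int) -> int:
    async with acquire("openai-gpt4", load=1000):
        await asyncio.sleep(0.01)  # stand-in for the real API call
        return i

async def main() -> list[int]:
    acquire = make_acquire(2)
    # Launch 10 calls at once; only 2 are ever in flight.
    return await asyncio.gather(*(call_api(acquire, i) for i in range(10)))

print(asyncio.run(main()))  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

With the real SDK, the gating happens server-side, so the same pattern holds whether the 10 calls come from one process or ten.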

Celery Example

Celery tasks are synchronous by default but can run async code. Wrap the async RateQueue call in asyncio.run():

from celery import Celery
import ratequeue.aio as rq
import asyncio
import os

app = Celery("tasks")
RATEQUEUE_API_KEY = os.environ["RATEQUEUE_API_KEY"]

@app.task
def process_with_openai(prompt: str, priority: int = 10):
    async def _run():
        # load=2000 is a flat per-call token budget; use a real estimate if you have one.
        async with rq.acquire("openai", load=2000, priority=priority, api_key=RATEQUEUE_API_KEY):
            return await openai_client.chat.completions.create(
                model="gpt-4o",
                messages=[{"role": "user", "content": prompt}]
            )
    # Each task invocation runs a short-lived event loop for the async SDK call.
    return asyncio.run(_run())

Every Celery worker that processes this task will coordinate through the same RateQueue resource, regardless of how many workers are running.

Drop-In for Any Python Codebase

Works with any Python code that makes outbound API calls:

FastAPI
Flask
Django
Celery
asyncio scripts
Airflow operators
LangChain agents
Background jobs
Data pipelines

The ratequeue.aio package is async-first. For synchronous codebases, a sync wrapper is also available — the same coordination, without needing an event loop.
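For intuition, here is how such a sync wrapper might be built on top of the async API. All names here are hypothetical and the real SDK's sync surface may differ; a stub stands in for the async acquire so the sketch runs on its own:

```python
import asyncio
from contextlib import asynccontextmanager, contextmanager

# Hypothetical stand-in for the async acquire; the real one talks to the
# coordination service before and after the yield.
@asynccontextmanager
async def _async_acquire(resource: str, **kwargs):
    yield

@contextmanager
def acquire_sync(resource: str, **kwargs):
    """Drive the async acquire/release protocol from synchronous code."""
    loop = asyncio.new_event_loop()
    cm = _async_acquire(resource, **kwargs)
    try:
        loop.run_until_complete(cm.__aenter__())
        try:
            yield
        finally:
            # Release the slot even if the wrapped call raised.
            loop.run_until_complete(cm.__aexit__(None, None, None))
    finally:
        loop.close()

with acquire_sync("openai", load=2000):
    print("slot granted")
```

The try/finally layering matters: the slot must be released even when the caller's code raises, otherwise a crashed request would hold capacity until a timeout.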

Coordinate Python workers in minutes

Install the SDK, create a resource, and replace your per-process rate limiter with one that actually works at scale.