Lightweight, concurrency-safe credential orchestration for AI API systems.
KeyMesh is a high-performance, framework-agnostic runtime designed to multiplex multiple API keys (e.g., OpenAI, Anthropic, Gemini) across highly concurrent workloads. It maximizes aggregate throughput by managing rate limits, cooldowns, and scheduling strategies, acting purely as a routing scheduler and cooldown manager.
- Maximized Throughput: Pool multiple lower-tier keys to act as a single high-throughput endpoint.
- Concurrency Safe: Native asyncio and multi-threaded synchronous support, with granular locks for safe high-frequency key acquisition.
- Sync & Async Native: Identical features available in both async-first runtimes and standard synchronous/threaded architectures.
- Pluggable Schedulers: Choose between RoundRobin, LeastBusy, or Weighted strategies.
- Smart Cooldowns: Automatically detects rate limits (HTTP 429), parses Retry-After headers, and temporarily cools down keys.
- Health Monitoring: Tracks latency using an Exponential Moving Average (EMA), success rates, and consecutive failures to prune dead credentials.
- Flexible Storage: In-memory and JSON persistent backends for both async (MemoryStorage, JSONStorage) and sync (SyncMemoryStorage, SyncJSONStorage) runtimes.
- Zero Heavy Couplings: No hard runtime dependencies on specific client SDKs. Integrates natively via HTTP client adapters.
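
The health-monitoring item above mentions an exponential moving average of latency, a success rate, and a consecutive-failure count. A minimal sketch of that bookkeeping might look like the following. The `KeyHealth` class, the `alpha` smoothing factor, and the `max_failures` threshold are hypothetical names chosen for illustration; they are not KeyMesh's actual internals.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class KeyHealth:
    """Hypothetical per-key health record (illustration only)."""

    alpha: float = 0.2                     # assumed smoothing factor; higher reacts faster
    max_failures: int = 3                  # assumed threshold before a key counts as dead
    ema_latency: Optional[float] = None    # None until the first successful request
    successes: int = 0
    failures: int = 0
    consecutive_failures: int = 0

    def record_success(self, latency: float) -> None:
        # The first sample seeds the average; later samples are blended in.
        if self.ema_latency is None:
            self.ema_latency = latency
        else:
            self.ema_latency = self.alpha * latency + (1 - self.alpha) * self.ema_latency
        self.successes += 1
        self.consecutive_failures = 0      # any success resets the failure streak

    def record_failure(self) -> None:
        self.failures += 1
        self.consecutive_failures += 1

    @property
    def success_rate(self) -> float:
        total = self.successes + self.failures
        return self.successes / total if total else 1.0

    @property
    def is_dead(self) -> bool:
        return self.consecutive_failures >= self.max_failures
```

With `alpha=0.5`, latencies of 1.0 s and then 3.0 s give an average of 2.0 s: recent samples count for more than older ones, but a single slow request does not dominate the estimate.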
KeyMesh is optimized for the uv package manager.
# Core package
uv add keymesh
pip install keymesh
# With OpenAI SDK integration support
uv add "keymesh[openai]"
pip install "keymesh[openai]"

The easiest, most robust way to integrate KeyMesh with the OpenAI SDK is to use the built-in OpenAIHandler and AsyncOpenAIHandler.
These handlers subclass httpx.Client and httpx.AsyncClient respectively. When passed directly into the OpenAI SDK client constructor as the http_client, they intercept outgoing requests transparently to:
- Acquire a key from the pool automatically before the request starts.
- Inject the key dynamically into the request's Authorization header.
- Measure the latency of the request and record it on the key's stats upon success.
- Cool down the key if the server returns HTTP 429 (automatically parsing the Retry-After header if present).
- Prune or mark the key as failed if connection errors or other exceptions occur during transmission.
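
The cooldown step above depends on interpreting the Retry-After header, which HTTP allows in two forms: a delay in seconds or an HTTP-date. As a rough sketch of how such parsing could work with only the standard library (the `cooldown_seconds` helper and its `default` fallback are hypothetical, not part of KeyMesh's API):

```python
import math
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def cooldown_seconds(
    retry_after: Optional[str],
    default: float = 60.0,
    now: Optional[datetime] = None,
) -> float:
    """Turn a Retry-After header value into a cooldown in seconds.

    Hypothetical helper: falls back to `default` when the header is
    missing or cannot be parsed.
    """
    if not retry_after:
        return default
    value = retry_after.strip()

    # Form 1: a delay in seconds, e.g. "Retry-After: 120".
    try:
        seconds = float(value)
    except ValueError:
        seconds = None
    if seconds is not None:
        return max(0.0, seconds) if math.isfinite(seconds) else default

    # Form 2: an HTTP-date, e.g. "Retry-After: Wed, 21 Oct 2015 07:28:00 GMT".
    try:
        when = parsedate_to_datetime(value)
    except (TypeError, ValueError):
        return default
    if when.tzinfo is None:
        when = when.replace(tzinfo=timezone.utc)   # treat naive dates as UTC
    now = now or datetime.now(timezone.utc)
    return max(0.0, (when - now).total_seconds())
```

Clamping at zero means a date in the past results in no cooldown instead of a negative one.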
Important
This approach keeps your code clean. You do not need to call pool.acquire(), pool.release(), or handle try/except blocks around key status updates manually. KeyMesh manages everything at the HTTP transport layer!

import asyncio

from openai import AsyncOpenAI
from keymesh import AsyncOpenAIHandler, SchedulerStrategy


async def main():
    # 1. Initialize the AsyncOpenAIHandler with your keys
    handler = AsyncOpenAIHandler(
        keys=["sk-key-1", "sk-key-2", "sk-key-3"],
        strategy=SchedulerStrategy.LEAST_BUSY,
        default_cooldown=60.0,
    )

    # 2. Pass the handler directly as the http_client to AsyncOpenAI
    client = AsyncOpenAI(
        api_key="dummy-key",  # Placeholder; overridden dynamically per request
        http_client=handler,
    )

    try:
        # 3. Call the SDK normally; key rotation and state management are transparent
        response = await client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": "Hello KeyMesh Async!"}],
        )
        print(f"Response: {response.choices[0].message.content}")
    finally:
        # 4. Gracefully close the handler to persist metrics/storage
        await handler.aclose()


asyncio.run(main())

from openai import OpenAI
from keymesh import OpenAIHandler, SchedulerStrategy


def main():
    # 1. Initialize the thread-safe OpenAIHandler
    handler = OpenAIHandler(
        keys=["sk-key-1", "sk-key-2", "sk-key-3"],
        strategy=SchedulerStrategy.ROUND_ROBIN,
    )

    # 2. Pass the handler directly as the http_client to OpenAI
    client = OpenAI(
        api_key="dummy-key",  # Placeholder; overridden dynamically per request
        http_client=handler,
    )

    try:
        # 3. Use the SDK as usual
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": "Hello KeyMesh Sync!"}],
        )
        print(f"Response: {response.choices[0].message.content}")
    finally:
        # 4. Gracefully close the handler
        handler.close()


if __name__ == "__main__":
    main()

If you are using a custom HTTP client, a different LLM SDK (like Anthropic, Gemini, or Cohere), or need manual control over the lifecycle of your credentials, you can interface directly with KeyPool or SyncKeyPool.
Warning
Strict Concurrency Rule: Never mutate a shared client's API key globally (e.g. client.api_key = key) in concurrent loops as it causes race conditions. Instead, use one of the patterns below to scope the key to the request context.
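
To see why the rule above matters, the following self-contained sketch reproduces the race with a stand-in client. `FakeClient`, `unsafe_call`, and `demo` are hypothetical names used only for this illustration; no real SDK or KeyMesh API is involved. Each task sets the shared key and then awaits, so by the time the "request" reads the key, another task may already have overwritten it.

```python
import asyncio
from typing import List, Tuple


class FakeClient:
    """Hypothetical stand-in for an SDK client that stores its key as shared state."""

    def __init__(self) -> None:
        self.api_key = ""

    async def send(self) -> str:
        # Yield to the event loop once, as real network I/O would.
        await asyncio.sleep(0)
        # The key is read only after the await, so another task may have changed it.
        return self.api_key


async def unsafe_call(client: FakeClient, key: str) -> Tuple[str, str]:
    client.api_key = key          # global mutation shared by every task
    used = await client.send()
    return key, used              # (key this task intended, key actually sent)


async def demo() -> List[Tuple[str, str]]:
    client = FakeClient()
    return await asyncio.gather(
        *(unsafe_call(client, f"key-{i}") for i in range(3))
    )


results = asyncio.run(demo())
mismatched = [(wanted, used) for wanted, used in results if wanted != used]
print(f"{len(mismatched)} of {len(results)} requests used a key they did not set")
```

In a real workload the consequence is that latency, failures, and cooldowns get attributed to the wrong key, which is why the key should be scoped to the individual request.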
Modern SDKs support copying a client configuration with an overridden API key while sharing the underlying connection pool.

# Async (run inside a coroutine; assumes `pool` and `client` already exist)
import time

key = await pool.acquire()
start = time.monotonic()
try:
    scoped_client = client.with_options(api_key=key)
    response = await scoped_client.chat.completions.create(...)
    await pool.release(key, latency=time.monotonic() - start)
except Exception:
    await pool.mark_failed(key)
    raise

Alternatively, pass the key as an HTTP header directly in the API call, bypassing global client state.

# Async (run inside a coroutine; assumes `pool` and `client` already exist)
import time

key = await pool.acquire()
start = time.monotonic()
try:
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": "Query"}],
        extra_headers={"Authorization": f"Bearer {key}"},
    )
    await pool.release(key, latency=time.monotonic() - start)
except Exception:
    await pool.mark_failed(key)
    raise

Encapsulate the acquire/release/fail lifecycle in a Python context manager:

import contextlib
import time

from keymesh import KeyPool  # assumed top-level export; adjust to your install


@contextlib.asynccontextmanager
async def key_lifecycle(pool: KeyPool):
    key = await pool.acquire()
    start = time.monotonic()
    try:
        yield key
        # Reached only if the body of the `async with` block did not raise
        await pool.release(key, latency=time.monotonic() - start)
    except Exception:
        await pool.mark_failed(key)
        raise


# Usage (inside a coroutine; assumes `pool` and `client` already exist)
async with key_lifecycle(pool) as key:
    scoped_client = client.with_options(api_key=key)
    response = await scoped_client.chat.completions.create(...)

KeyMesh follows a modular, thread-safe, and async-safe design:
- KeyPool / SyncKeyPool: The central async / sync orchestrators.
- Scheduler: Stateless selection logic for choosing the next key (e.g. RoundRobin, LeastBusy, Weighted).
- KeyState / SyncKeyState: Lock-guarded runtime diagnostics tracked per API key (failures, latency average, cooldown timers, active requests).
- Storage: Pluggable persistence layers (In-Memory or JSON-backed) for both asynchronous and synchronous runtimes.
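
To make the division of labor above concrete, here is a toy synchronous sketch of round-robin and least-busy selection over lock-guarded per-key state, skipping keys that are cooling down. `KeySlot` and `TinyPool` are hypothetical stand-ins written for this example; they do not mirror KeyMesh's real KeyState, Scheduler, or SyncKeyPool classes, and a single coarse lock is used purely to keep the sketch short.

```python
import threading
import time
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class KeySlot:
    """Hypothetical stand-in for per-key state (not KeyMesh's KeyState)."""

    key: str
    active_requests: int = 0
    cooldown_until: float = 0.0   # time.monotonic() deadline; 0.0 means available

    def available(self, now: float) -> bool:
        return now >= self.cooldown_until


class TinyPool:
    """Toy synchronous pool showing two selection strategies."""

    def __init__(self, keys: List[str]) -> None:
        self._slots = [KeySlot(k) for k in keys]
        self._lock = threading.Lock()   # one coarse lock keeps the sketch simple
        self._cursor = 0                # next round-robin position

    def acquire_round_robin(self) -> Optional[str]:
        with self._lock:
            now = time.monotonic()
            for offset in range(len(self._slots)):
                index = (self._cursor + offset) % len(self._slots)
                slot = self._slots[index]
                if slot.available(now):
                    self._cursor = (index + 1) % len(self._slots)
                    slot.active_requests += 1
                    return slot.key
            return None                 # every key is cooling down

    def acquire_least_busy(self) -> Optional[str]:
        with self._lock:
            now = time.monotonic()
            candidates = [s for s in self._slots if s.available(now)]
            if not candidates:
                return None
            slot = min(candidates, key=lambda s: s.active_requests)
            slot.active_requests += 1
            return slot.key

    def release(self, key: str) -> None:
        with self._lock:
            slot = self._find(key)
            slot.active_requests = max(0, slot.active_requests - 1)

    def cool_down(self, key: str, seconds: float) -> None:
        # Ends the in-flight request and blocks the key until the deadline passes.
        with self._lock:
            slot = self._find(key)
            slot.active_requests = max(0, slot.active_requests - 1)
            slot.cooldown_until = time.monotonic() + seconds

    def _find(self, key: str) -> KeySlot:
        return next(s for s in self._slots if s.key == key)
```

Returning `None` when every key is cooling down leaves the caller free to decide whether to wait, retry, or fail fast.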
This project uses uv for development.
# Install dependencies
uv sync
# Run tests
uv run pytest
# Lint and type-check
uv run ruff check .
uv run mypy .

MIT License. See LICENSE for details.