Async Patterns
Memori works with async/await in both Python and TypeScript. This page covers patterns for the runtimes you're most likely to use.
When to Use Async
| Scenario | Python Async? | TypeScript Async? | Why |
|---|---|---|---|
| Web servers | Yes | Yes | Concurrent request handling |
| Chatbots with many users | Yes | Yes | Non-blocking I/O |
| CLI scripts | No | Yes (always) | TypeScript is always async |
| Jupyter notebooks | No | — | Event loop already running |
Basic Setup
Async Setup
import os
import asyncio
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from memori import Memori
from openai import AsyncOpenAI
engine = create_engine("sqlite:///memori.db")
SessionLocal = sessionmaker(bind=engine)
async def main():
client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
mem = Memori(conn=SessionLocal).llm.register(client)
mem.attribution(entity_id="user_123", process_id="async_agent")
mem.config.storage.build()
response = await client.chat.completions.create(
model="gpt-4.1-mini",
messages=[{"role": "user", "content": "I prefer async Python."}]
)
print(response.choices[0].message.content)
mem.augmentation.wait()
asyncio.run(main())
Works identically with AsyncAnthropic or other async clients — just swap the client.
Web Server Examples
FastAPI (Python)
import os
from fastapi import FastAPI
from pydantic import BaseModel
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from memori import Memori
from openai import AsyncOpenAI
app = FastAPI()
engine = create_engine("sqlite:///memori.db", connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(bind=engine)
Memori(conn=SessionLocal).config.storage.build()
class ChatRequest(BaseModel):
message: str
@app.post("/chat/{user_id}")
async def chat(user_id: str, req: ChatRequest):
client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
mem = Memori(conn=SessionLocal).llm.register(client)
mem.attribution(entity_id=user_id, process_id="fastapi_async")
response = await client.chat.completions.create(
model="gpt-4.1-mini",
messages=[{"role": "user", "content": req.message}]
)
return {"response": response.choices[0].message.content}
Express (TypeScript)
In a long-running server, omit augmentation.wait() — augmentation continues in the background without blocking the response. Create a new Memori instance per request so each request gets its own attribution and session.
import 'dotenv/config';
import express from 'express';
import pg from 'pg';
import { OpenAI } from 'openai';
import { Memori } from '@memorilabs/memori';
const app = express();
app.use(express.json());
const pool = new pg.Pool({ connectionString: process.env.DATABASE_URL });
const client = new OpenAI();
// Run once on startup
const bootstrapMem = new Memori({ conn: () => pool });
if (!bootstrapMem.config.storage) {
throw new Error('Storage not initialized');
}
await bootstrapMem.config.storage.build();
app.post('/chat/:userId', async (req, res) => {
const mem = new Memori({ conn: () => pool }).llm.register(client);
mem.attribution(req.params.userId, 'express-chat');
const response = await client.chat.completions.create({
model: 'gpt-4.1-mini',
messages: [{ role: 'user', content: req.body.message }],
});
// Don't await augmentation — let it run in the background
res.json({ response: response.choices[0]?.message?.content });
});
app.listen(3000);
Fastify (TypeScript)
import 'dotenv/config';
import Fastify from 'fastify';
import pg from 'pg';
import { OpenAI } from 'openai';
import { Memori } from '@memorilabs/memori';
const fastify = Fastify();
const pool = new pg.Pool({ connectionString: process.env.DATABASE_URL });
const client = new OpenAI();
fastify.addHook('onReady', async () => {
const mem = new Memori({ conn: () => pool });
if (!mem.config.storage) {
throw new Error('Storage not initialized');
}
await mem.config.storage.build();
});
fastify.post<{ Params: { userId: string }; Body: { message: string } }>(
'/chat/:userId',
async (request, reply) => {
const mem = new Memori({ conn: () => pool }).llm.register(client);
mem.attribution(request.params.userId, 'fastify-chat');
const response = await client.chat.completions.create({
model: 'gpt-4.1-mini',
messages: [{ role: 'user', content: request.body.message }],
});
return { response: response.choices[0]?.message?.content };
}
);
await fastify.listen({ port: 3000 });
Thread Safety (Python)
| Pattern | Safe? | Why |
|---|---|---|
conn=SessionLocal (factory) | Yes | New session per operation |
conn=lambda: existing_session | No | Shares one session |
For production async apps, use PostgreSQL with larger pools:
engine = create_engine(
"postgresql+psycopg://user:pass@host/db",
pool_pre_ping=True,
pool_size=20,
max_overflow=40,
pool_recycle=300
)
Connection Factory Pattern (TypeScript)
The conn option takes a factory function — not a connection directly. Memori calls it once per StorageManager instance to borrow the connection.
| Pattern | Safe? | Why |
|---|---|---|
conn: () => pool | Yes | Pool manages concurrent borrows internally |
conn: () => db | Yes | Pre-opened Database; pass by reference, not recreated each call |
conn: () => sharedClient | No | Single client shared across concurrent calls |
When to Call augmentation.wait()
| Context | Python | TypeScript |
|---|---|---|
| Short-lived script | mem.augmentation.wait() | await mem.augmentation.wait() |
| Web server | Not needed | Not needed |
| Test suite | mem.augmentation.wait() | await mem.augmentation.wait() |
| Serverless function | mem.augmentation.wait() | await mem.augmentation.wait() |