Asynchronous operations with redis-py
Use redis-py with asyncio for non-blocking Redis access
redis-py provides an asyncio-compatible API under the
redis.asyncio
namespace. It mirrors the synchronous client API, so most code patterns
translate directly — you await commands instead of calling them.
Use the async client for I/O-bound workloads, for integration with async web frameworks (such as FastAPI, Starlette, aiohttp, or Sanic), or when you need to run many concurrent Redis operations from a single process. For simple scripts, CPU-bound work, or codebases without an existing event loop, the synchronous client is usually a better choice.
The examples on the other pages in this section use the synchronous client, but you can translate any of them to async by following the rules in Translating sync examples below.
Basic connection
Import the async client from redis.asyncio and await each command.
Construction is synchronous and doesn't open a connection — the pool
establishes one lazily the first time you issue a command. Call aclose()
when you're done to release the underlying socket.
import asyncio
import redis.asyncio as redis
async def basic_example():
    """Set and read back one key, closing the client explicitly.

    The client is constructed without awaiting; each command is awaited,
    and ``aclose()`` is awaited at the end to release the connection.
    """
    r = redis.Redis(host='localhost', port=6379, decode_responses=True)

    await r.set('foo', 'bar')
    print(await r.get('foo'))
    # bar

    await r.aclose()


asyncio.run(basic_example())
async def context_example():
    """Set and read back one key inside an ``async with`` client block.

    The context manager takes care of closing the client, so no explicit
    ``aclose()`` call appears here.
    """
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        await r.set('foo', 'bar')
        print(await r.get('foo'))
        # bar


asyncio.run(context_example())
async def shared_client_example():
    """Show the lifecycle of one long-lived client: create, use, close."""
    # One client for the whole application; max_connections sizes the
    # pool that the client owns internally.
    r = redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
        max_connections=10,
    )
    try:
        # Every request and task shares `r` for the app's lifetime.
        await r.set('foo', 'bar')
        print(await r.get('foo'))
        # bar
    finally:
        # Shutdown: close the client so pooled connections are released.
        await r.aclose()


asyncio.run(shared_client_example())
async def gather_example():
    """Read three keys concurrently with ``asyncio.gather()``."""
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        await r.mset({'a': '1', 'b': '2', 'c': '3'})

        # Create the three GET coroutines first, then await them together.
        pending = [r.get('a'), r.get('b'), r.get('c')]
        a, b, c = await asyncio.gather(*pending)
        print(a, b, c)
        # 1 2 3


asyncio.run(gather_example())
async def pipeline_example():
    """Queue four commands in a transactional pipeline, then execute."""
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        async with r.pipeline(transaction=True) as pipe:
            # Queued commands are not awaited; only execute() is.
            pipe.set('a', '1')
            pipe.set('b', '2')
            pipe.get('a')
            pipe.get('b')
            print(await pipe.execute())
            # [True, True, '1', '2']


asyncio.run(pipeline_example())
from redis.exceptions import WatchError
async def watch_example():
    """Increment ``counter`` with WATCH/MULTI/EXEC, retrying on conflict."""
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        await r.set('counter', '0')

        async with r.pipeline(transaction=True) as pipe:
            while True:
                try:
                    # watch() and the read are awaited; multi() is a plain
                    # call, and the queued set() runs on execute().
                    await pipe.watch('counter')
                    current = int(await pipe.get('counter'))
                    pipe.multi()
                    pipe.set('counter', str(current + 1))
                    await pipe.execute()
                except WatchError:
                    # The optimistic lock failed; start the attempt over.
                    continue
                break

        print(await r.get('counter'))
        # 1


asyncio.run(watch_example())
async def pubsub_example():
    """Subscribe to ``channel-1``, publish one message, and print it."""
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        async with r.pubsub() as pubsub:
            await pubsub.subscribe('channel-1')

            async def reader():
                """Print the first 'message' event, then stop listening."""
                async for message in pubsub.listen():
                    if message['type'] != 'message':
                        continue
                    print(message['data'])
                    # hello
                    break

            reader_task = asyncio.create_task(reader())
            # Yield to the event loop so the reader task can start
            # before the message is published.
            await asyncio.sleep(0.1)
            await r.publish('channel-1', 'hello')
            await reader_task


asyncio.run(pubsub_example())
from redis.asyncio.cluster import RedisCluster
async def cluster_example():
    """Set and read back one key through the async cluster client.

    The client is closed in a ``finally`` block so that ``aclose()`` still
    runs if one of the commands raises.
    """
    rc = RedisCluster(host='localhost', port=16379, decode_responses=True)
    try:
        await rc.set('foo', 'bar')
        value = await rc.get('foo')
        print(value)
        # bar
    finally:
        await rc.aclose()


asyncio.run(cluster_example())
async def timeout_example():
    """Cancel a blocking command by bounding it with ``asyncio.wait_for()``."""
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        # BLPOP blocks waiting for a value, so the timeout reliably fires.
        blocked_pop = r.blpop('empty-queue')
        try:
            await asyncio.wait_for(blocked_pop, timeout=0.1)
        except asyncio.TimeoutError:
            print('command canceled')


asyncio.run(timeout_example())
The recommended pattern is to use the client as an async context manager,
which ensures aclose() runs even if an exception is raised:
import asyncio
import redis.asyncio as redis
async def basic_example():
    """Set and read back one key, closing the client explicitly.

    The client is constructed without awaiting; each command is awaited,
    and ``aclose()`` is awaited at the end to release the connection.
    """
    r = redis.Redis(host='localhost', port=6379, decode_responses=True)

    await r.set('foo', 'bar')
    print(await r.get('foo'))
    # bar

    await r.aclose()


asyncio.run(basic_example())
async def context_example():
    """Set and read back one key inside an ``async with`` client block.

    The context manager takes care of closing the client, so no explicit
    ``aclose()`` call appears here.
    """
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        await r.set('foo', 'bar')
        print(await r.get('foo'))
        # bar


asyncio.run(context_example())
async def shared_client_example():
    """Show the lifecycle of one long-lived client: create, use, close."""
    # One client for the whole application; max_connections sizes the
    # pool that the client owns internally.
    r = redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
        max_connections=10,
    )
    try:
        # Every request and task shares `r` for the app's lifetime.
        await r.set('foo', 'bar')
        print(await r.get('foo'))
        # bar
    finally:
        # Shutdown: close the client so pooled connections are released.
        await r.aclose()


asyncio.run(shared_client_example())
async def gather_example():
    """Read three keys concurrently with ``asyncio.gather()``."""
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        await r.mset({'a': '1', 'b': '2', 'c': '3'})

        # Create the three GET coroutines first, then await them together.
        pending = [r.get('a'), r.get('b'), r.get('c')]
        a, b, c = await asyncio.gather(*pending)
        print(a, b, c)
        # 1 2 3


asyncio.run(gather_example())
async def pipeline_example():
    """Queue four commands in a transactional pipeline, then execute."""
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        async with r.pipeline(transaction=True) as pipe:
            # Queued commands are not awaited; only execute() is.
            pipe.set('a', '1')
            pipe.set('b', '2')
            pipe.get('a')
            pipe.get('b')
            print(await pipe.execute())
            # [True, True, '1', '2']


asyncio.run(pipeline_example())
from redis.exceptions import WatchError
async def watch_example():
    """Increment ``counter`` with WATCH/MULTI/EXEC, retrying on conflict."""
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        await r.set('counter', '0')

        async with r.pipeline(transaction=True) as pipe:
            while True:
                try:
                    # watch() and the read are awaited; multi() is a plain
                    # call, and the queued set() runs on execute().
                    await pipe.watch('counter')
                    current = int(await pipe.get('counter'))
                    pipe.multi()
                    pipe.set('counter', str(current + 1))
                    await pipe.execute()
                except WatchError:
                    # The optimistic lock failed; start the attempt over.
                    continue
                break

        print(await r.get('counter'))
        # 1


asyncio.run(watch_example())
async def pubsub_example():
    """Subscribe to ``channel-1``, publish one message, and print it."""
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        async with r.pubsub() as pubsub:
            await pubsub.subscribe('channel-1')

            async def reader():
                """Print the first 'message' event, then stop listening."""
                async for message in pubsub.listen():
                    if message['type'] != 'message':
                        continue
                    print(message['data'])
                    # hello
                    break

            reader_task = asyncio.create_task(reader())
            # Yield to the event loop so the reader task can start
            # before the message is published.
            await asyncio.sleep(0.1)
            await r.publish('channel-1', 'hello')
            await reader_task


asyncio.run(pubsub_example())
from redis.asyncio.cluster import RedisCluster
async def cluster_example():
    """Set and read back one key through the async cluster client.

    The client is closed in a ``finally`` block so that ``aclose()`` still
    runs if one of the commands raises.
    """
    rc = RedisCluster(host='localhost', port=16379, decode_responses=True)
    try:
        await rc.set('foo', 'bar')
        value = await rc.get('foo')
        print(value)
        # bar
    finally:
        await rc.aclose()


asyncio.run(cluster_example())
async def timeout_example():
    """Cancel a blocking command by bounding it with ``asyncio.wait_for()``."""
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        # BLPOP blocks waiting for a value, so the timeout reliably fires.
        blocked_pop = r.blpop('empty-queue')
        try:
            await asyncio.wait_for(blocked_pop, timeout=0.1)
        except asyncio.TimeoutError:
            print('command canceled')


asyncio.run(timeout_example())
Connection pools
For production usage, you should manage connections with a connection pool rather than opening and closing them individually. See Connection pools and multiplexing for more information about how this works.
A Redis client instance already creates and manages its own connection
pool internally, so in a long-running async application the recommended
pattern is to create a single client at startup, share it across requests
and tasks, and close it at shutdown. Avoid creating a new Redis() per
request — it defeats pooling and pays the connection cost on every call.
import asyncio
import redis.asyncio as redis
async def basic_example():
    """Set and read back one key, closing the client explicitly.

    The client is constructed without awaiting; each command is awaited,
    and ``aclose()`` is awaited at the end to release the connection.
    """
    r = redis.Redis(host='localhost', port=6379, decode_responses=True)

    await r.set('foo', 'bar')
    print(await r.get('foo'))
    # bar

    await r.aclose()


asyncio.run(basic_example())
async def context_example():
    """Set and read back one key inside an ``async with`` client block.

    The context manager takes care of closing the client, so no explicit
    ``aclose()`` call appears here.
    """
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        await r.set('foo', 'bar')
        print(await r.get('foo'))
        # bar


asyncio.run(context_example())
async def shared_client_example():
    """Show the lifecycle of one long-lived client: create, use, close."""
    # One client for the whole application; max_connections sizes the
    # pool that the client owns internally.
    r = redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
        max_connections=10,
    )
    try:
        # Every request and task shares `r` for the app's lifetime.
        await r.set('foo', 'bar')
        print(await r.get('foo'))
        # bar
    finally:
        # Shutdown: close the client so pooled connections are released.
        await r.aclose()


asyncio.run(shared_client_example())
async def gather_example():
    """Read three keys concurrently with ``asyncio.gather()``."""
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        await r.mset({'a': '1', 'b': '2', 'c': '3'})

        # Create the three GET coroutines first, then await them together.
        pending = [r.get('a'), r.get('b'), r.get('c')]
        a, b, c = await asyncio.gather(*pending)
        print(a, b, c)
        # 1 2 3


asyncio.run(gather_example())
async def pipeline_example():
    """Queue four commands in a transactional pipeline, then execute."""
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        async with r.pipeline(transaction=True) as pipe:
            # Queued commands are not awaited; only execute() is.
            pipe.set('a', '1')
            pipe.set('b', '2')
            pipe.get('a')
            pipe.get('b')
            print(await pipe.execute())
            # [True, True, '1', '2']


asyncio.run(pipeline_example())
from redis.exceptions import WatchError
async def watch_example():
    """Increment ``counter`` with WATCH/MULTI/EXEC, retrying on conflict."""
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        await r.set('counter', '0')

        async with r.pipeline(transaction=True) as pipe:
            while True:
                try:
                    # watch() and the read are awaited; multi() is a plain
                    # call, and the queued set() runs on execute().
                    await pipe.watch('counter')
                    current = int(await pipe.get('counter'))
                    pipe.multi()
                    pipe.set('counter', str(current + 1))
                    await pipe.execute()
                except WatchError:
                    # The optimistic lock failed; start the attempt over.
                    continue
                break

        print(await r.get('counter'))
        # 1


asyncio.run(watch_example())
async def pubsub_example():
    """Subscribe to ``channel-1``, publish one message, and print it."""
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        async with r.pubsub() as pubsub:
            await pubsub.subscribe('channel-1')

            async def reader():
                """Print the first 'message' event, then stop listening."""
                async for message in pubsub.listen():
                    if message['type'] != 'message':
                        continue
                    print(message['data'])
                    # hello
                    break

            reader_task = asyncio.create_task(reader())
            # Yield to the event loop so the reader task can start
            # before the message is published.
            await asyncio.sleep(0.1)
            await r.publish('channel-1', 'hello')
            await reader_task


asyncio.run(pubsub_example())
from redis.asyncio.cluster import RedisCluster
async def cluster_example():
    """Set and read back one key through the async cluster client.

    The client is closed in a ``finally`` block so that ``aclose()`` still
    runs if one of the commands raises.
    """
    rc = RedisCluster(host='localhost', port=16379, decode_responses=True)
    try:
        await rc.set('foo', 'bar')
        value = await rc.get('foo')
        print(value)
        # bar
    finally:
        await rc.aclose()


asyncio.run(cluster_example())
async def timeout_example():
    """Cancel a blocking command by bounding it with ``asyncio.wait_for()``."""
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        # BLPOP blocks waiting for a value, so the timeout reliably fires.
        blocked_pop = r.blpop('empty-queue')
        try:
            await asyncio.wait_for(blocked_pop, timeout=0.1)
        except asyncio.TimeoutError:
            print('command canceled')


asyncio.run(timeout_example())
Tune max_connections to the maximum number of concurrent Redis operations
you expect from the process. If you'd rather block on pool exhaustion than
raise an error, construct the client with a BlockingConnectionPool.
Don't share a single ConnectionPool across multiple Redis(connection_pool=...)
instances. Closing any one of those clients also closes the shared pool,
which silently invalidates the connections held by every other client using
it. Share the Redis client object instead.
Awaiting commands
Every command method on the async client returns a coroutine — each call
must be awaited. Forgetting await returns a coroutine object instead of
a result, which is the most common async mistake to watch for.
Because each command is a coroutine, you can run several concurrently with
asyncio.gather():
import asyncio
import redis.asyncio as redis
async def basic_example():
    """Set and read back one key, closing the client explicitly.

    The client is constructed without awaiting; each command is awaited,
    and ``aclose()`` is awaited at the end to release the connection.
    """
    r = redis.Redis(host='localhost', port=6379, decode_responses=True)

    await r.set('foo', 'bar')
    print(await r.get('foo'))
    # bar

    await r.aclose()


asyncio.run(basic_example())
async def context_example():
    """Set and read back one key inside an ``async with`` client block.

    The context manager takes care of closing the client, so no explicit
    ``aclose()`` call appears here.
    """
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        await r.set('foo', 'bar')
        print(await r.get('foo'))
        # bar


asyncio.run(context_example())
async def shared_client_example():
    """Show the lifecycle of one long-lived client: create, use, close."""
    # One client for the whole application; max_connections sizes the
    # pool that the client owns internally.
    r = redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
        max_connections=10,
    )
    try:
        # Every request and task shares `r` for the app's lifetime.
        await r.set('foo', 'bar')
        print(await r.get('foo'))
        # bar
    finally:
        # Shutdown: close the client so pooled connections are released.
        await r.aclose()


asyncio.run(shared_client_example())
async def gather_example():
    """Read three keys concurrently with ``asyncio.gather()``."""
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        await r.mset({'a': '1', 'b': '2', 'c': '3'})

        # Create the three GET coroutines first, then await them together.
        pending = [r.get('a'), r.get('b'), r.get('c')]
        a, b, c = await asyncio.gather(*pending)
        print(a, b, c)
        # 1 2 3


asyncio.run(gather_example())
async def pipeline_example():
    """Queue four commands in a transactional pipeline, then execute."""
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        async with r.pipeline(transaction=True) as pipe:
            # Queued commands are not awaited; only execute() is.
            pipe.set('a', '1')
            pipe.set('b', '2')
            pipe.get('a')
            pipe.get('b')
            print(await pipe.execute())
            # [True, True, '1', '2']


asyncio.run(pipeline_example())
from redis.exceptions import WatchError
async def watch_example():
    """Increment ``counter`` with WATCH/MULTI/EXEC, retrying on conflict."""
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        await r.set('counter', '0')

        async with r.pipeline(transaction=True) as pipe:
            while True:
                try:
                    # watch() and the read are awaited; multi() is a plain
                    # call, and the queued set() runs on execute().
                    await pipe.watch('counter')
                    current = int(await pipe.get('counter'))
                    pipe.multi()
                    pipe.set('counter', str(current + 1))
                    await pipe.execute()
                except WatchError:
                    # The optimistic lock failed; start the attempt over.
                    continue
                break

        print(await r.get('counter'))
        # 1


asyncio.run(watch_example())
async def pubsub_example():
    """Subscribe to ``channel-1``, publish one message, and print it."""
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        async with r.pubsub() as pubsub:
            await pubsub.subscribe('channel-1')

            async def reader():
                """Print the first 'message' event, then stop listening."""
                async for message in pubsub.listen():
                    if message['type'] != 'message':
                        continue
                    print(message['data'])
                    # hello
                    break

            reader_task = asyncio.create_task(reader())
            # Yield to the event loop so the reader task can start
            # before the message is published.
            await asyncio.sleep(0.1)
            await r.publish('channel-1', 'hello')
            await reader_task


asyncio.run(pubsub_example())
from redis.asyncio.cluster import RedisCluster
async def cluster_example():
    """Set and read back one key through the async cluster client.

    The client is closed in a ``finally`` block so that ``aclose()`` still
    runs if one of the commands raises.
    """
    rc = RedisCluster(host='localhost', port=16379, decode_responses=True)
    try:
        await rc.set('foo', 'bar')
        value = await rc.get('foo')
        print(value)
        # bar
    finally:
        await rc.aclose()


asyncio.run(cluster_example())
async def timeout_example():
    """Cancel a blocking command by bounding it with ``asyncio.wait_for()``."""
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        # BLPOP blocks waiting for a value, so the timeout reliably fires.
        blocked_pop = r.blpop('empty-queue')
        try:
            await asyncio.wait_for(blocked_pop, timeout=0.1)
        except asyncio.TimeoutError:
            print('command canceled')


asyncio.run(timeout_example())
Each in-flight command consumes one pooled connection while it's executing,
so size max_connections to match your peak concurrency. (If you instantiate
the client with single_connection_client=True, all commands serialize
through a single connection instead.)
Pipelines and transactions
Pipelines and transactions work the same way as in the synchronous client
(see Pipelines and transactions
for the conceptual background). The only difference is that you create the
pipeline inside an async with block and await pipe.execute().
import asyncio
import redis.asyncio as redis
async def basic_example():
    """Set and read back one key, closing the client explicitly.

    The client is constructed without awaiting; each command is awaited,
    and ``aclose()`` is awaited at the end to release the connection.
    """
    r = redis.Redis(host='localhost', port=6379, decode_responses=True)

    await r.set('foo', 'bar')
    print(await r.get('foo'))
    # bar

    await r.aclose()


asyncio.run(basic_example())
async def context_example():
    """Set and read back one key inside an ``async with`` client block.

    The context manager takes care of closing the client, so no explicit
    ``aclose()`` call appears here.
    """
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        await r.set('foo', 'bar')
        print(await r.get('foo'))
        # bar


asyncio.run(context_example())
async def shared_client_example():
    """Show the lifecycle of one long-lived client: create, use, close."""
    # One client for the whole application; max_connections sizes the
    # pool that the client owns internally.
    r = redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
        max_connections=10,
    )
    try:
        # Every request and task shares `r` for the app's lifetime.
        await r.set('foo', 'bar')
        print(await r.get('foo'))
        # bar
    finally:
        # Shutdown: close the client so pooled connections are released.
        await r.aclose()


asyncio.run(shared_client_example())
async def gather_example():
    """Read three keys concurrently with ``asyncio.gather()``."""
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        await r.mset({'a': '1', 'b': '2', 'c': '3'})

        # Create the three GET coroutines first, then await them together.
        pending = [r.get('a'), r.get('b'), r.get('c')]
        a, b, c = await asyncio.gather(*pending)
        print(a, b, c)
        # 1 2 3


asyncio.run(gather_example())
async def pipeline_example():
    """Queue four commands in a transactional pipeline, then execute."""
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        async with r.pipeline(transaction=True) as pipe:
            # Queued commands are not awaited; only execute() is.
            pipe.set('a', '1')
            pipe.set('b', '2')
            pipe.get('a')
            pipe.get('b')
            print(await pipe.execute())
            # [True, True, '1', '2']


asyncio.run(pipeline_example())
from redis.exceptions import WatchError
async def watch_example():
    """Increment ``counter`` with WATCH/MULTI/EXEC, retrying on conflict."""
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        await r.set('counter', '0')

        async with r.pipeline(transaction=True) as pipe:
            while True:
                try:
                    # watch() and the read are awaited; multi() is a plain
                    # call, and the queued set() runs on execute().
                    await pipe.watch('counter')
                    current = int(await pipe.get('counter'))
                    pipe.multi()
                    pipe.set('counter', str(current + 1))
                    await pipe.execute()
                except WatchError:
                    # The optimistic lock failed; start the attempt over.
                    continue
                break

        print(await r.get('counter'))
        # 1


asyncio.run(watch_example())
async def pubsub_example():
    """Subscribe to ``channel-1``, publish one message, and print it."""
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        async with r.pubsub() as pubsub:
            await pubsub.subscribe('channel-1')

            async def reader():
                """Print the first 'message' event, then stop listening."""
                async for message in pubsub.listen():
                    if message['type'] != 'message':
                        continue
                    print(message['data'])
                    # hello
                    break

            reader_task = asyncio.create_task(reader())
            # Yield to the event loop so the reader task can start
            # before the message is published.
            await asyncio.sleep(0.1)
            await r.publish('channel-1', 'hello')
            await reader_task


asyncio.run(pubsub_example())
from redis.asyncio.cluster import RedisCluster
async def cluster_example():
    """Set and read back one key through the async cluster client.

    The client is closed in a ``finally`` block so that ``aclose()`` still
    runs if one of the commands raises.
    """
    rc = RedisCluster(host='localhost', port=16379, decode_responses=True)
    try:
        await rc.set('foo', 'bar')
        value = await rc.get('foo')
        print(value)
        # bar
    finally:
        await rc.aclose()


asyncio.run(cluster_example())
async def timeout_example():
    """Cancel a blocking command by bounding it with ``asyncio.wait_for()``."""
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        # BLPOP blocks waiting for a value, so the timeout reliably fires.
        blocked_pop = r.blpop('empty-queue')
        try:
            await asyncio.wait_for(blocked_pop, timeout=0.1)
        except asyncio.TimeoutError:
            print('command canceled')


asyncio.run(timeout_example())
WATCH/MULTI/EXEC for optimistic locking also has an async form.
watch() and execute() are coroutines; multi() remains synchronous
because it only toggles internal pipeline state.
import asyncio
import redis.asyncio as redis
async def basic_example():
    """Set and read back one key, closing the client explicitly.

    The client is constructed without awaiting; each command is awaited,
    and ``aclose()`` is awaited at the end to release the connection.
    """
    r = redis.Redis(host='localhost', port=6379, decode_responses=True)

    await r.set('foo', 'bar')
    print(await r.get('foo'))
    # bar

    await r.aclose()


asyncio.run(basic_example())
async def context_example():
    """Set and read back one key inside an ``async with`` client block.

    The context manager takes care of closing the client, so no explicit
    ``aclose()`` call appears here.
    """
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        await r.set('foo', 'bar')
        print(await r.get('foo'))
        # bar


asyncio.run(context_example())
async def shared_client_example():
    """Show the lifecycle of one long-lived client: create, use, close."""
    # One client for the whole application; max_connections sizes the
    # pool that the client owns internally.
    r = redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
        max_connections=10,
    )
    try:
        # Every request and task shares `r` for the app's lifetime.
        await r.set('foo', 'bar')
        print(await r.get('foo'))
        # bar
    finally:
        # Shutdown: close the client so pooled connections are released.
        await r.aclose()


asyncio.run(shared_client_example())
async def gather_example():
    """Read three keys concurrently with ``asyncio.gather()``."""
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        await r.mset({'a': '1', 'b': '2', 'c': '3'})

        # Create the three GET coroutines first, then await them together.
        pending = [r.get('a'), r.get('b'), r.get('c')]
        a, b, c = await asyncio.gather(*pending)
        print(a, b, c)
        # 1 2 3


asyncio.run(gather_example())
async def pipeline_example():
    """Queue four commands in a transactional pipeline, then execute."""
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        async with r.pipeline(transaction=True) as pipe:
            # Queued commands are not awaited; only execute() is.
            pipe.set('a', '1')
            pipe.set('b', '2')
            pipe.get('a')
            pipe.get('b')
            print(await pipe.execute())
            # [True, True, '1', '2']


asyncio.run(pipeline_example())
from redis.exceptions import WatchError
async def watch_example():
    """Increment ``counter`` with WATCH/MULTI/EXEC, retrying on conflict."""
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        await r.set('counter', '0')

        async with r.pipeline(transaction=True) as pipe:
            while True:
                try:
                    # watch() and the read are awaited; multi() is a plain
                    # call, and the queued set() runs on execute().
                    await pipe.watch('counter')
                    current = int(await pipe.get('counter'))
                    pipe.multi()
                    pipe.set('counter', str(current + 1))
                    await pipe.execute()
                except WatchError:
                    # The optimistic lock failed; start the attempt over.
                    continue
                break

        print(await r.get('counter'))
        # 1


asyncio.run(watch_example())
async def pubsub_example():
    """Subscribe to ``channel-1``, publish one message, and print it."""
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        async with r.pubsub() as pubsub:
            await pubsub.subscribe('channel-1')

            async def reader():
                """Print the first 'message' event, then stop listening."""
                async for message in pubsub.listen():
                    if message['type'] != 'message':
                        continue
                    print(message['data'])
                    # hello
                    break

            reader_task = asyncio.create_task(reader())
            # Yield to the event loop so the reader task can start
            # before the message is published.
            await asyncio.sleep(0.1)
            await r.publish('channel-1', 'hello')
            await reader_task


asyncio.run(pubsub_example())
from redis.asyncio.cluster import RedisCluster
async def cluster_example():
    """Set and read back one key through the async cluster client.

    The client is closed in a ``finally`` block so that ``aclose()`` still
    runs if one of the commands raises.
    """
    rc = RedisCluster(host='localhost', port=16379, decode_responses=True)
    try:
        await rc.set('foo', 'bar')
        value = await rc.get('foo')
        print(value)
        # bar
    finally:
        await rc.aclose()


asyncio.run(cluster_example())
async def timeout_example():
    """Cancel a blocking command by bounding it with ``asyncio.wait_for()``."""
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        # BLPOP blocks waiting for a value, so the timeout reliably fires.
        blocked_pop = r.blpop('empty-queue')
        try:
            await asyncio.wait_for(blocked_pop, timeout=0.1)
        except asyncio.TimeoutError:
            print('command canceled')


asyncio.run(timeout_example())
Pub/Sub
The async pub/sub object follows the same shape as the sync version. Call
await pubsub.subscribe(...) to register channels, then iterate messages
with async for message in pubsub.listen():. Use the PubSub object as an
async context manager (async with r.pubsub() as pubsub:) or call
await pubsub.aclose() explicitly to release the connection.
import asyncio
import redis.asyncio as redis
async def basic_example():
    """Set and read back one key, closing the client explicitly.

    The client is constructed without awaiting; each command is awaited,
    and ``aclose()`` is awaited at the end to release the connection.
    """
    r = redis.Redis(host='localhost', port=6379, decode_responses=True)

    await r.set('foo', 'bar')
    print(await r.get('foo'))
    # bar

    await r.aclose()


asyncio.run(basic_example())
async def context_example():
    """Set and read back one key inside an ``async with`` client block.

    The context manager takes care of closing the client, so no explicit
    ``aclose()`` call appears here.
    """
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        await r.set('foo', 'bar')
        print(await r.get('foo'))
        # bar


asyncio.run(context_example())
async def shared_client_example():
    """Show the lifecycle of one long-lived client: create, use, close."""
    # One client for the whole application; max_connections sizes the
    # pool that the client owns internally.
    r = redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
        max_connections=10,
    )
    try:
        # Every request and task shares `r` for the app's lifetime.
        await r.set('foo', 'bar')
        print(await r.get('foo'))
        # bar
    finally:
        # Shutdown: close the client so pooled connections are released.
        await r.aclose()


asyncio.run(shared_client_example())
async def gather_example():
    """Read three keys concurrently with ``asyncio.gather()``."""
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        await r.mset({'a': '1', 'b': '2', 'c': '3'})

        # Create the three GET coroutines first, then await them together.
        pending = [r.get('a'), r.get('b'), r.get('c')]
        a, b, c = await asyncio.gather(*pending)
        print(a, b, c)
        # 1 2 3


asyncio.run(gather_example())
async def pipeline_example():
    """Queue four commands in a transactional pipeline, then execute."""
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        async with r.pipeline(transaction=True) as pipe:
            # Queued commands are not awaited; only execute() is.
            pipe.set('a', '1')
            pipe.set('b', '2')
            pipe.get('a')
            pipe.get('b')
            print(await pipe.execute())
            # [True, True, '1', '2']


asyncio.run(pipeline_example())
from redis.exceptions import WatchError
async def watch_example():
    """Increment ``counter`` with WATCH/MULTI/EXEC, retrying on conflict."""
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        await r.set('counter', '0')

        async with r.pipeline(transaction=True) as pipe:
            while True:
                try:
                    # watch() and the read are awaited; multi() is a plain
                    # call, and the queued set() runs on execute().
                    await pipe.watch('counter')
                    current = int(await pipe.get('counter'))
                    pipe.multi()
                    pipe.set('counter', str(current + 1))
                    await pipe.execute()
                except WatchError:
                    # The optimistic lock failed; start the attempt over.
                    continue
                break

        print(await r.get('counter'))
        # 1


asyncio.run(watch_example())
async def pubsub_example():
    """Subscribe to ``channel-1``, publish one message, and print it."""
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        async with r.pubsub() as pubsub:
            await pubsub.subscribe('channel-1')

            async def reader():
                """Print the first 'message' event, then stop listening."""
                async for message in pubsub.listen():
                    if message['type'] != 'message':
                        continue
                    print(message['data'])
                    # hello
                    break

            reader_task = asyncio.create_task(reader())
            # Yield to the event loop so the reader task can start
            # before the message is published.
            await asyncio.sleep(0.1)
            await r.publish('channel-1', 'hello')
            await reader_task


asyncio.run(pubsub_example())
from redis.asyncio.cluster import RedisCluster
async def cluster_example():
    """Set and read back one key through the async cluster client.

    The client is closed in a ``finally`` block so that ``aclose()`` still
    runs if one of the commands raises.
    """
    rc = RedisCluster(host='localhost', port=16379, decode_responses=True)
    try:
        await rc.set('foo', 'bar')
        value = await rc.get('foo')
        print(value)
        # bar
    finally:
        await rc.aclose()


asyncio.run(cluster_example())
async def timeout_example():
    """Cancel a blocking command by bounding it with ``asyncio.wait_for()``."""
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        # BLPOP blocks waiting for a value, so the timeout reliably fires.
        blocked_pop = r.blpop('empty-queue')
        try:
            await asyncio.wait_for(blocked_pop, timeout=0.1)
        except asyncio.TimeoutError:
            print('command canceled')


asyncio.run(timeout_example())
A single PubSub object isn't safe to share across tasks — give each
consuming task its own subscription.
Cluster connections
To connect to a Redis cluster asynchronously, import RedisCluster from
redis.asyncio.cluster. The API matches the synchronous cluster client
(see Connect to a Redis cluster),
with await in front of each command.
import asyncio
import redis.asyncio as redis
async def basic_example():
    """Set and read back one key, closing the client explicitly.

    The client is constructed without awaiting; each command is awaited,
    and ``aclose()`` is awaited at the end to release the connection.
    """
    r = redis.Redis(host='localhost', port=6379, decode_responses=True)

    await r.set('foo', 'bar')
    print(await r.get('foo'))
    # bar

    await r.aclose()


asyncio.run(basic_example())
async def context_example():
    """Set and read back one key inside an ``async with`` client block.

    The context manager takes care of closing the client, so no explicit
    ``aclose()`` call appears here.
    """
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        await r.set('foo', 'bar')
        print(await r.get('foo'))
        # bar


asyncio.run(context_example())
async def shared_client_example():
    """Show the lifecycle of one long-lived client: create, use, close."""
    # One client for the whole application; max_connections sizes the
    # pool that the client owns internally.
    r = redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
        max_connections=10,
    )
    try:
        # Every request and task shares `r` for the app's lifetime.
        await r.set('foo', 'bar')
        print(await r.get('foo'))
        # bar
    finally:
        # Shutdown: close the client so pooled connections are released.
        await r.aclose()


asyncio.run(shared_client_example())
async def gather_example():
    """Read three keys concurrently with ``asyncio.gather()``."""
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        await r.mset({'a': '1', 'b': '2', 'c': '3'})

        # Create the three GET coroutines first, then await them together.
        pending = [r.get('a'), r.get('b'), r.get('c')]
        a, b, c = await asyncio.gather(*pending)
        print(a, b, c)
        # 1 2 3


asyncio.run(gather_example())
async def pipeline_example():
    """Queue four commands in a transactional pipeline, then execute."""
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        async with r.pipeline(transaction=True) as pipe:
            # Queued commands are not awaited; only execute() is.
            pipe.set('a', '1')
            pipe.set('b', '2')
            pipe.get('a')
            pipe.get('b')
            print(await pipe.execute())
            # [True, True, '1', '2']


asyncio.run(pipeline_example())
from redis.exceptions import WatchError
async def watch_example():
    """Increment ``counter`` with WATCH/MULTI/EXEC, retrying on conflict."""
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        await r.set('counter', '0')

        async with r.pipeline(transaction=True) as pipe:
            while True:
                try:
                    # watch() and the read are awaited; multi() is a plain
                    # call, and the queued set() runs on execute().
                    await pipe.watch('counter')
                    current = int(await pipe.get('counter'))
                    pipe.multi()
                    pipe.set('counter', str(current + 1))
                    await pipe.execute()
                except WatchError:
                    # The optimistic lock failed; start the attempt over.
                    continue
                break

        print(await r.get('counter'))
        # 1


asyncio.run(watch_example())
async def pubsub_example():
    """Subscribe to ``channel-1``, publish one message, and print it."""
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        async with r.pubsub() as pubsub:
            await pubsub.subscribe('channel-1')

            async def reader():
                """Print the first 'message' event, then stop listening."""
                async for message in pubsub.listen():
                    if message['type'] != 'message':
                        continue
                    print(message['data'])
                    # hello
                    break

            reader_task = asyncio.create_task(reader())
            # Yield to the event loop so the reader task can start
            # before the message is published.
            await asyncio.sleep(0.1)
            await r.publish('channel-1', 'hello')
            await reader_task


asyncio.run(pubsub_example())
from redis.asyncio.cluster import RedisCluster
async def cluster_example():
    """Set and read back one key through the async cluster client.

    The client is closed in a ``finally`` block so that ``aclose()`` still
    runs if one of the commands raises.
    """
    rc = RedisCluster(host='localhost', port=16379, decode_responses=True)
    try:
        await rc.set('foo', 'bar')
        value = await rc.get('foo')
        print(value)
        # bar
    finally:
        await rc.aclose()


asyncio.run(cluster_example())
async def timeout_example():
    """Cancel a blocking command by bounding it with ``asyncio.wait_for()``."""
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        # BLPOP blocks waiting for a value, so the timeout reliably fires.
        blocked_pop = r.blpop('empty-queue')
        try:
            await asyncio.wait_for(blocked_pop, timeout=0.1)
        except asyncio.TimeoutError:
            print('command canceled')


asyncio.run(timeout_example())
Cleanup and lifecycle
Always close clients and pools when you're done:
- Use `async with Redis(...) as r:` whenever the client's lifetime fits a single scope.
- For longer-lived clients, call `await r.aclose()` explicitly. (The older `close()` method is deprecated.)
- For frameworks with startup/shutdown hooks — for example FastAPI's `lifespan` — create the client or pool at startup and close it at shutdown so connections aren't leaked between process restarts.
Cancellation and timeouts
You can cancel an in-flight command by wrapping it in asyncio.wait_for()
(or asyncio.timeout() on Python 3.11+). If the command is canceled
mid-flight, redis-py disconnects the underlying connection to avoid
response/request misalignment on subsequent reads. The next command
transparently picks up a new connection from the pool.
import asyncio
import redis.asyncio as redis
async def basic_example():
    """Set and read back one key, closing the client explicitly.

    The client is constructed without awaiting; each command is awaited,
    and ``aclose()`` is awaited at the end to release the connection.
    """
    r = redis.Redis(host='localhost', port=6379, decode_responses=True)

    await r.set('foo', 'bar')
    print(await r.get('foo'))
    # bar

    await r.aclose()


asyncio.run(basic_example())
async def context_example():
    """Set and read back one key inside an ``async with`` client block.

    The context manager takes care of closing the client, so no explicit
    ``aclose()`` call appears here.
    """
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        await r.set('foo', 'bar')
        print(await r.get('foo'))
        # bar


asyncio.run(context_example())
async def shared_client_example():
    """Show the lifecycle of one long-lived client: create, use, close."""
    # One client for the whole application; max_connections sizes the
    # pool that the client owns internally.
    r = redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
        max_connections=10,
    )
    try:
        # Every request and task shares `r` for the app's lifetime.
        await r.set('foo', 'bar')
        print(await r.get('foo'))
        # bar
    finally:
        # Shutdown: close the client so pooled connections are released.
        await r.aclose()


asyncio.run(shared_client_example())
async def gather_example():
    """Read three keys concurrently with ``asyncio.gather()``."""
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        await r.mset({'a': '1', 'b': '2', 'c': '3'})

        # Create the three GET coroutines first, then await them together.
        pending = [r.get('a'), r.get('b'), r.get('c')]
        a, b, c = await asyncio.gather(*pending)
        print(a, b, c)
        # 1 2 3


asyncio.run(gather_example())
async def pipeline_example():
    """Queue four commands in a transactional pipeline, then execute."""
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        async with r.pipeline(transaction=True) as pipe:
            # Queued commands are not awaited; only execute() is.
            pipe.set('a', '1')
            pipe.set('b', '2')
            pipe.get('a')
            pipe.get('b')
            print(await pipe.execute())
            # [True, True, '1', '2']


asyncio.run(pipeline_example())
from redis.exceptions import WatchError
async def watch_example():
    """Increment ``counter`` with WATCH/MULTI/EXEC, retrying on conflict."""
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        await r.set('counter', '0')

        async with r.pipeline(transaction=True) as pipe:
            while True:
                try:
                    # watch() and the read are awaited; multi() is a plain
                    # call, and the queued set() runs on execute().
                    await pipe.watch('counter')
                    current = int(await pipe.get('counter'))
                    pipe.multi()
                    pipe.set('counter', str(current + 1))
                    await pipe.execute()
                except WatchError:
                    # The optimistic lock failed; start the attempt over.
                    continue
                break

        print(await r.get('counter'))
        # 1


asyncio.run(watch_example())
async def pubsub_example():
    """Subscribe to ``channel-1``, publish one message, and print it."""
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        async with r.pubsub() as pubsub:
            await pubsub.subscribe('channel-1')

            async def reader():
                """Print the first 'message' event, then stop listening."""
                async for message in pubsub.listen():
                    if message['type'] != 'message':
                        continue
                    print(message['data'])
                    # hello
                    break

            reader_task = asyncio.create_task(reader())
            # Yield to the event loop so the reader task can start
            # before the message is published.
            await asyncio.sleep(0.1)
            await r.publish('channel-1', 'hello')
            await reader_task


asyncio.run(pubsub_example())
from redis.asyncio.cluster import RedisCluster
async def cluster_example():
    """Set and read back one key through the async cluster client.

    The client is closed in a ``finally`` block so that ``aclose()`` still
    runs if one of the commands raises.
    """
    rc = RedisCluster(host='localhost', port=16379, decode_responses=True)
    try:
        await rc.set('foo', 'bar')
        value = await rc.get('foo')
        print(value)
        # bar
    finally:
        await rc.aclose()


asyncio.run(cluster_example())
async def timeout_example():
    """Cancel a blocking command by bounding it with ``asyncio.wait_for()``."""
    async with redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
    ) as r:
        # BLPOP blocks waiting for a value, so the timeout reliably fires.
        blocked_pop = r.blpop('empty-queue')
        try:
            await asyncio.wait_for(blocked_pop, timeout=0.1)
        except asyncio.TimeoutError:
            print('command canceled')


asyncio.run(timeout_example())
A Redis client instance is safe to share across tasks (the pool handles
concurrency), but stateful objects derived from it — pipelines and pub/sub
subscriptions — are not. Give each task its own pipeline or PubSub.
Translating sync examples
To adapt any synchronous example elsewhere in this guide to the async client, apply these rules:
- Replace `import redis` with `import redis.asyncio as redis`.
- Wrap the example in an `async def` function and call it with `asyncio.run(...)`.
- Add `await` in front of every command call. (Exception: buffered pipeline commands such as `pipe.set(...)` stay un-awaited; only `pipe.watch(...)`, any reads issued before `pipe.multi()`, and `pipe.execute()` are awaited.)
- Replace `with r.pipeline(...) as pipe:` with `async with r.pipeline(...) as pipe:`, and `await pipe.execute()`.
- Replace `r.close()` with `await r.aclose()`, or use `async with redis.Redis(...) as r:` as a context manager.
More information
- The redis-py asyncio examples on Read the Docs cover further patterns.
- See Error handling and Client-side geographic failover for resiliency patterns that apply to both sync and async clients.