Python मध्ये Async Redis Client + Django कसे वापरावे?
मी माझ्या Django ऍप्लिकेशनमध्ये वापरण्यासाठी Redis वापरून वितरित सेमाफोर तयार करण्याचा प्रयत्न करत आहे. हे API वर समवर्ती विनंत्या मर्यादित करण्यासाठी आहे. मी redis-py मध्ये asyncio वापरत आहे. तथापि, मी विनंत्यांमध्ये सामायिक करण्यासाठी एक कनेक्शन पूल तयार करू इच्छितो कारण मला “क्लायंटची कमाल संख्या गाठली” त्रुटी प्राप्त होत आहे. म्हणून, मी सामायिक कनेक्शन पूल तयार केला settings.py
जो मी माझ्या सेमाफोर वर्गात वापरतो. तथापि, मला नंतर एक त्रुटी येते got Future attached to a different loop
जेव्हा मी समवर्ती विनंत्या करतो. हा माझा कोड आहे:
import os
import uuid
import asyncio
import time
from typing import Any
import random
from django.conf import settings
from redis import asyncio as aioredis
STARTING_BACKOFF_S = 4
MAX_BACKOFF_S = 16
class SemaphoreTimeoutError(Exception):
"""Exception raised when a semaphore acquisition times out."""
def __init__(self, message: str) -> None:
super().__init__(message)
class RedisSemaphore:
def __init__(
self,
key: str,
max_locks: int,
timeout: int = 30,
wait_timeout: int = 30,
) -> None:
"""
Initialize the RedisSemaphore.
:param redis_url: URL of the Redis server.
:param key: Redis key for the semaphore.
:param max_locks: Maximum number of concurrent locks.
:param timeout: How long until the lock should automatically be timed out in seconds.
:param wait_timeout: How long to wait before aborting attempting to acquire a lock.
"""
self.redis_url = os.environ["REDIS_URL"]
self.key = key
self.max_locks = max_locks
self.timeout = timeout
self.wait_timeout = wait_timeout
self.redis = aioredis.Redis(connection_pool=settings.REDIS_POOL)
self.identifier = "Only identifier"
async def acquire(self) -> str:
"""
Acquire a lock from the semaphore.
:raises SemaphoreTimeoutError: If the semaphore acquisition times out.
:return: The identifier for the acquired semaphore.
"""
czset = f"{self.key}:owner"
ctr = f"{self.key}:counter"
identifier = str(uuid.uuid4())
now = time.time()
start_time = now
backoff = STARTING_BACKOFF_S
while True:
# TODO: Redundant?
if time.time() - start_time > self.wait_timeout:
raise SemaphoreTimeoutError("Waited too long to acquire the semaphore.")
async with self.redis.pipeline(transaction=True) as pipe:
pipe.zremrangebyscore(self.key, "-inf", now - self.timeout)
pipe.zinterstore(czset, {czset: 1, self.key: 0})
pipe.incr(ctr)
counter = (await pipe.execute())[-1]
pipe.zadd(self.key, {identifier: now})
pipe.zadd(czset, {identifier: counter})
pipe.zrank(czset, identifier)
rank = (await pipe.execute())[-1]
print(rank)
if rank < self.max_locks:
return identifier
pipe.zrem(self.key, identifier)
pipe.zrem(czset, identifier)
await pipe.execute()
# Exponential backoff with randomness
sleep_time = backoff * (1 + random.random() * 0.3)
if (sleep_time + time.time() - start_time) > self.wait_timeout:
raise SemaphoreTimeoutError("Waited too long to acquire the semaphore.")
await asyncio.sleep(sleep_time)
backoff = min(backoff * 2, MAX_BACKOFF_S)
async def release(self, identifier: str) -> bool:
"""
Release a lock from the semaphore.
:param identifier: The identifier for the lock to be released.
:return: True if the semaphore was properly released, False if it had timed out.
"""
czset = f"{self.key}:owner"
async with self.redis.pipeline(transaction=True) as pipe:
pipe.zrem(self.key, identifier)
pipe.zrem(czset, identifier)
result = await pipe.execute()
return result[0] > 0
class RedisSemaphoreContext:
def __init__(self, semaphore: RedisSemaphore) -> None:
"""
Initialize the RedisSemaphoreContext.
:param semaphore: An instance of RedisSemaphore.
"""
self.semaphore = semaphore
self.identifier = None
async def __aenter__(self) -> "RedisSemaphoreContext":
"""Enter the async context manager."""
self.identifier = await self.semaphore.acquire()
return self
async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
"""Exit the async context manager."""
await self.semaphore.release(self.identifier)
जे मी माझ्या ADRF असिंक्रोनस दृश्यांमध्ये वापरतो.
मी काय चूक करत आहे? हे शक्य आहे का?
Leave a Reply