Python मध्ये Async Redis Client + Django कसे वापरावे?

Python मध्ये Async Redis Client + Django कसे वापरावे?

मी माझ्या Django ऍप्लिकेशनमध्ये वापरण्यासाठी Redis वापरून वितरित सेमाफोर तयार करण्याचा प्रयत्न करत आहे. हे API वर समवर्ती विनंत्या मर्यादित करण्यासाठी आहे. मी redis-py मध्ये asyncio वापरत आहे. तथापि, मी विनंत्यांमध्ये सामायिक करण्यासाठी एक कनेक्शन पूल तयार करू इच्छितो कारण मला “क्लायंटची कमाल संख्या गाठली” त्रुटी प्राप्त होत आहे. म्हणून, मी सामायिक कनेक्शन पूल तयार केला settings.py जो मी माझ्या सेमाफोर वर्गात वापरतो. तथापि, मला नंतर एक त्रुटी येते got Future attached to a different loop जेव्हा मी समवर्ती विनंत्या करतो. हा माझा कोड आहे:

import os
import uuid
import asyncio
import time
from typing import Any
import random
from django.conf import settings
from redis import asyncio as aioredis

STARTING_BACKOFF_S = 4
MAX_BACKOFF_S = 16


class SemaphoreTimeoutError(Exception):
    """Exception raised when a semaphore acquisition times out."""

    def __init__(self, message: str) -> None:
        super().__init__(message)


class RedisSemaphore:
    def __init__(
        self,
        key: str,
        max_locks: int,
        timeout: int = 30,
        wait_timeout: int = 30,
    ) -> None:
        """
        Initialize the RedisSemaphore.

        :param redis_url: URL of the Redis server.
        :param key: Redis key for the semaphore.
        :param max_locks: Maximum number of concurrent locks.
        :param timeout: How long until the lock should automatically be timed out in seconds.
        :param wait_timeout: How long to wait before aborting attempting to acquire a lock.
        """
        self.redis_url = os.environ["REDIS_URL"]
        self.key = key
        self.max_locks = max_locks
        self.timeout = timeout
        self.wait_timeout = wait_timeout
        self.redis = aioredis.Redis(connection_pool=settings.REDIS_POOL)
        self.identifier = "Only identifier"

    async def acquire(self) -> str:
        """
        Acquire a lock from the semaphore.

        :raises SemaphoreTimeoutError: If the semaphore acquisition times out.
        :return: The identifier for the acquired semaphore.
        """
        czset = f"{self.key}:owner"
        ctr = f"{self.key}:counter"
        identifier = str(uuid.uuid4())
        now = time.time()
        start_time = now
        backoff = STARTING_BACKOFF_S

        while True:
            # TODO: Redundant?
            if time.time() - start_time > self.wait_timeout:
                raise SemaphoreTimeoutError("Waited too long to acquire the semaphore.")

            async with self.redis.pipeline(transaction=True) as pipe:
                pipe.zremrangebyscore(self.key, "-inf", now - self.timeout)
                pipe.zinterstore(czset, {czset: 1, self.key: 0})
                pipe.incr(ctr)
                counter = (await pipe.execute())[-1]

                pipe.zadd(self.key, {identifier: now})
                pipe.zadd(czset, {identifier: counter})
                pipe.zrank(czset, identifier)
                rank = (await pipe.execute())[-1]

                print(rank)
                if rank < self.max_locks:
                    return identifier

                pipe.zrem(self.key, identifier)
                pipe.zrem(czset, identifier)
                await pipe.execute()

            # Exponential backoff with randomness
            sleep_time = backoff * (1 + random.random() * 0.3)
            if (sleep_time + time.time() - start_time) > self.wait_timeout:
                raise SemaphoreTimeoutError("Waited too long to acquire the semaphore.")
            await asyncio.sleep(sleep_time)
            backoff = min(backoff * 2, MAX_BACKOFF_S)

    async def release(self, identifier: str) -> bool:
        """
        Release a lock from the semaphore.

        :param identifier: The identifier for the lock to be released.
        :return: True if the semaphore was properly released, False if it had timed out.
        """
        czset = f"{self.key}:owner"
        async with self.redis.pipeline(transaction=True) as pipe:
            pipe.zrem(self.key, identifier)
            pipe.zrem(czset, identifier)
            result = await pipe.execute()
        return result[0] > 0


class RedisSemaphoreContext:
    def __init__(self, semaphore: RedisSemaphore) -> None:
        """
        Initialize the RedisSemaphoreContext.

        :param semaphore: An instance of RedisSemaphore.
        """
        self.semaphore = semaphore
        self.identifier = None

    async def __aenter__(self) -> "RedisSemaphoreContext":
        """Enter the async context manager."""
        self.identifier = await self.semaphore.acquire()
        return self

    async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Exit the async context manager."""
        await self.semaphore.release(self.identifier)

जे मी माझ्या ADRF असिंक्रोनस दृश्यांमध्ये वापरतो.

मी काय चूक करत आहे? हे शक्य आहे का?

admin
admin
https://www.thefullstack.co.in

Leave a Reply