Source code for minos.networks.brokers.subscribers.queued.queues.database.impl

from __future__ import (
    annotations,
)

import logging
from typing import (
    Any,
)

from minos.common import (
    DatabaseClient,
)

from .....collections import (
    DatabaseBrokerQueue,
    DatabaseBrokerQueueBuilder,
)
from ..abc import (
    BrokerSubscriberQueue,
    BrokerSubscriberQueueBuilder,
)
from .factories import (
    BrokerSubscriberQueueDatabaseOperationFactory,
)

logger = logging.getLogger(__name__)


[docs]class DatabaseBrokerSubscriberQueue( DatabaseBrokerQueue[BrokerSubscriberQueueDatabaseOperationFactory], BrokerSubscriberQueue ): """Database Broker Subscriber Queue class.""" async def _get_count(self) -> int: # noinspection PyTypeChecker operation = self.database_operation_factory.build_count(self._retry, self.topics) row = await self.execute_on_database_and_fetch_one(operation) count = row[0] return count async def _dequeue_rows(self, client: DatabaseClient) -> list[Any]: operation = self.database_operation_factory.build_query(self._retry, self._records, self.topics) await client.execute(operation) return [row async for row in client.fetch_all()]
[docs]class DatabaseBrokerSubscriberQueueBuilder( BrokerSubscriberQueueBuilder[DatabaseBrokerSubscriberQueue], DatabaseBrokerQueueBuilder ): """Database Broker Subscriber Queue Builder class."""
DatabaseBrokerSubscriberQueue.set_builder(DatabaseBrokerSubscriberQueueBuilder)