# Source code for minos.networks.testing.brokers.subscribers.queues

from collections.abc import (
    Iterable,
)

from minos.common import (
    DatabaseOperation,
)
from minos.common.testing import (
    MockedDatabaseClient,
    MockedDatabaseOperation,
)

from ....brokers import (
    BrokerSubscriberQueueDatabaseOperationFactory,
)
from ..collections import (
    MockedBrokerQueueDatabaseOperationFactory,
)


class MockedBrokerSubscriberQueueDatabaseOperationFactory(
    BrokerSubscriberQueueDatabaseOperationFactory,
    MockedBrokerQueueDatabaseOperationFactory,
):
    """Mocked subscriber-queue operation factory, intended for tests only.

    Every builder defined here ignores its arguments and returns a
    ``MockedDatabaseOperation`` constructed from a fixed string.
    """

    def build_count(self, retry: int, topics: Iterable[str] = tuple(), *args, **kwargs) -> DatabaseOperation:
        """Build the mocked "count" operation (testing only).

        :param retry: Unused by this mock.
        :param topics: Unused by this mock.
        :param args: Additional positional arguments; unused.
        :param kwargs: Additional named arguments; unused.
        :return: ``MockedDatabaseOperation("count_not_processed")``.
        """
        operation = MockedDatabaseOperation("count_not_processed")
        return operation

    def build_query(
        self,
        retry: int,
        records: int,
        topics: Iterable[str] = tuple(),
        *args,
        **kwargs,
    ) -> DatabaseOperation:
        """Build the mocked "query" operation (testing only).

        :param retry: Unused by this mock.
        :param records: Unused by this mock.
        :param topics: Unused by this mock.
        :param args: Additional positional arguments; unused.
        :param kwargs: Additional named arguments; unused.
        :return: ``MockedDatabaseOperation("select_not_processed")``.
        """
        operation = MockedDatabaseOperation("select_not_processed")
        return operation
# Import-time registration: pair the base subscriber-queue factory with its
# mocked counterpart on ``MockedDatabaseClient``.
# NOTE(review): presumably this lets the mocked client resolve the base factory
# to the mock during tests -- confirm against ``MockedDatabaseClient.set_factory``.
MockedDatabaseClient.set_factory(
    BrokerSubscriberQueueDatabaseOperationFactory,
    MockedBrokerSubscriberQueueDatabaseOperationFactory,
)