from abc import (
ABC,
abstractmethod,
)
from collections.abc import (
Iterable,
)
from psycopg2.sql import (
SQL,
)
from minos.common import (
DatabaseOperation,
)
from minos.networks import (
BrokerQueueDatabaseOperationFactory,
)
from ....clients import (
AiopgDatabaseOperation,
)
# noinspection SqlResolve,SqlNoDataSourceInspection,SqlNoDataSourceInspection,SqlResolve
[docs]class AiopgBrokerQueueDatabaseOperationFactory(BrokerQueueDatabaseOperationFactory, ABC):
"""Aiopg Broker Queue Database Operation Factory class."""
[docs] @abstractmethod
def build_table_name(self) -> str:
"""Get the table name.
:return: A ``str`` value.
"""
raise NotImplementedError
[docs] def build_create(self) -> DatabaseOperation:
"""Build the "create table" query.
:return: A ``SQL`` instance.
"""
return AiopgDatabaseOperation(
SQL(
f"CREATE TABLE IF NOT EXISTS {self.build_table_name()} ("
"id BIGSERIAL NOT NULL PRIMARY KEY, "
"topic VARCHAR(255) NOT NULL, "
"data BYTEA NOT NULL, "
"retry INTEGER NOT NULL DEFAULT 0, "
"processing BOOL NOT NULL DEFAULT FALSE, "
"created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), "
"updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW())"
),
lock=self.build_table_name(),
)
[docs] def build_mark_processed(self, id_: int) -> DatabaseOperation:
"""Build the "update not processed" query.
:return: A ``SQL`` instance.
"""
return AiopgDatabaseOperation(
SQL(
f"UPDATE {self.build_table_name()} "
"SET processing = FALSE, retry = retry + 1, updated_at = NOW() WHERE id = %(id)s"
),
{"id": id_},
)
[docs] def build_delete(self, id_: int) -> DatabaseOperation:
"""Build the "delete processed" query.
:return: A ``SQL`` instance.
"""
return AiopgDatabaseOperation(
SQL(f"DELETE FROM {self.build_table_name()} WHERE id = %(id)s"),
{"id": id_},
)
[docs] def build_mark_processing(self, ids: Iterable[int]) -> DatabaseOperation:
"""
:return: A ``SQL`` instance.
"""
return AiopgDatabaseOperation(
SQL(f"UPDATE {self.build_table_name()} SET processing = TRUE WHERE id IN %(ids)s"),
{"ids": tuple(ids)},
)
[docs] def build_count(self, retry: int, *args, **kwargs) -> DatabaseOperation:
"""Build the "count not processed" query.
:return:
"""
return AiopgDatabaseOperation(
SQL(
f"SELECT COUNT(*) FROM (SELECT id FROM {self.build_table_name()} "
"WHERE NOT processing AND retry < %(retry)s FOR UPDATE SKIP LOCKED) s"
),
{"retry": retry},
)
[docs] def build_submit(self, topic: str, data: bytes) -> DatabaseOperation:
"""Build the "insert" query.
:return: A ``SQL`` instance.
"""
return AiopgDatabaseOperation(
SQL(f"INSERT INTO {self.build_table_name()} (topic, data) VALUES (%(topic)s, %(data)s) RETURNING id"),
{"topic": topic, "data": data},
)
[docs] def build_query(self, retry: int, records: int, *args, **kwargs) -> DatabaseOperation:
"""Build the "select not processed" query.
:return: A ``SQL`` instance.
"""
return AiopgDatabaseOperation(
SQL(
"SELECT id, data "
f"FROM {self.build_table_name()} "
"WHERE NOT processing AND retry < %(retry)s "
"ORDER BY created_at "
"LIMIT %(records)s "
"FOR UPDATE "
"SKIP LOCKED"
),
{
"retry": retry,
"records": records,
},
)