Source code for minos.plugins.aiopg.factories.aggregate.events

from collections.abc import (
    Iterable,
)
from datetime import (
    datetime,
)
from typing import (
    Any,
    Optional,
)
from uuid import (
    UUID,
)

from psycopg2.sql import (
    SQL,
    Composable,
    Identifier,
    Literal,
    Placeholder,
)

from minos.aggregate import (
    Action,
    EventDatabaseOperationFactory,
)
from minos.common import (
    ComposedDatabaseOperation,
    DatabaseOperation,
)

from ...clients import (
    AiopgDatabaseClient,
)
from ...operations import (
    AiopgDatabaseOperation,
)


# noinspection SqlNoDataSourceInspection,SqlResolve,PyMethodMayBeStatic
class AiopgEventDatabaseOperationFactory(EventDatabaseOperationFactory):
    """Aiopg Event Database Operation Factory class.

    Builds the ``AiopgDatabaseOperation`` instances (table creation, row submission and row selection) that
    target the event table named by :meth:`build_table_name`.
    """

    def build_table_name(self) -> str:
        """Get the table name.

        :return: A ``str`` value.
        """
        return "aggregate_event"

    def build_create(self) -> DatabaseOperation:
        """Build the database operation to create the event table.

        The composed operation contains, in order: the creation of the ``uuid-ossp`` extension, the creation of
        the ``action_type`` enum (only if it does not exist in the current schema) and the creation of the
        event table itself (only if it does not exist).

        :return: A ``DatabaseOperation`` instance.
        """
        table_name = self.build_table_name()

        return ComposedDatabaseOperation(
            [
                AiopgDatabaseOperation(
                    'CREATE EXTENSION IF NOT EXISTS "uuid-ossp";',
                    lock="uuid-ossp",
                ),
                # ``CREATE TYPE`` has no ``IF NOT EXISTS`` form, hence the explicit catalog lookup.
                AiopgDatabaseOperation(
                    """
                    DO
                    $$
                        BEGIN
                            IF NOT EXISTS(SELECT *
                                          FROM pg_type typ
                                                   INNER JOIN pg_namespace nsp
                                                              ON nsp.oid = typ.typnamespace
                                          WHERE nsp.nspname = current_schema()
                                            AND typ.typname = 'action_type') THEN
                                CREATE TYPE action_type AS ENUM ('create', 'update', 'delete');
                            END IF;
                        END;
                    $$
                    LANGUAGE plpgsql;
                    """,
                    lock=table_name,
                ),
                AiopgDatabaseOperation(
                    f"""
                    CREATE TABLE IF NOT EXISTS {table_name} (
                        id BIGSERIAL PRIMARY KEY,
                        action ACTION_TYPE NOT NULL,
                        uuid UUID NOT NULL,
                        name TEXT NOT NULL,
                        version INT NOT NULL,
                        data BYTEA NOT NULL,
                        created_at TIMESTAMPTZ NOT NULL,
                        transaction_uuid UUID NOT NULL DEFAULT uuid_nil(),
                        UNIQUE (uuid, version, transaction_uuid)
                    );
                    """,
                    lock=table_name,
                ),
            ]
        )

    def build_submit(
        self,
        transaction_uuids: Iterable[UUID],
        uuid: UUID,
        action: Action,
        name: str,
        version: int,
        data: bytes,
        created_at: datetime,
        transaction_uuid: UUID,
        lock: Optional[str],
        **kwargs,
    ) -> DatabaseOperation:
        """Build the database operation to submit a row into the event table.

        The statement fills in some values on the database side:

        * ``uuid``: a ``uuid_nil()`` value is replaced by ``uuid_generate_v4()``.
        * ``version``: a ``NULL`` value is replaced by one plus the highest version found for ``uuid`` in the
          first matching transaction of ``transaction_uuids`` (later entries take precedence), or ``1`` if none.
        * ``created_at``: a ``NULL`` value is replaced by ``NOW()``.

        :param transaction_uuids: The sequence of nested transactions on top of the current event's transaction.
        :param uuid: The identifier of the entity.
        :param action: The action of the event.
        :param name: The name of the entity.
        :param version: The version of the entity.
        :param data: The data of the event.
        :param created_at: The creation datetime.
        :param transaction_uuid: The identifier of the transaction.
        :param lock: The lock identifier.
        :param kwargs: Additional named arguments (ignored).
        :return: A ``DatabaseOperation`` instance whose statement returns ``id, uuid, version, created_at``.
        """
        insert_values = SQL(
            """
            INSERT INTO {table_name} (id, action, uuid, name, version, data, created_at, transaction_uuid)
            VALUES (
                default,
                %(action)s,
                CASE %(uuid)s WHEN uuid_nil() THEN uuid_generate_v4() ELSE %(uuid)s END,
                %(name)s,
                (
                    SELECT (CASE WHEN %(version)s IS NULL THEN 1 + COALESCE(MAX(t2.version), 0) ELSE %(version)s END)
                    FROM (
                             SELECT DISTINCT ON (t1.uuid) t1.version
                             FROM ( {from_parts} ) AS t1
                             ORDER BY t1.uuid, t1.transaction_index DESC
                         ) AS t2
                ),
                %(data)s,
                (CASE WHEN %(created_at)s IS NULL THEN NOW() ELSE %(created_at)s END),
                %(transaction_uuid)s
            )
            RETURNING id, uuid, version, created_at;
            """
        )
        insert_parameters = {
            "uuid": uuid,
            "action": action,
            "name": name,
            "version": version,
            "data": data,
            "created_at": created_at,
            "transaction_uuid": transaction_uuid,
        }

        from_sql, from_parameters = self._build_submit_from(transaction_uuids)

        query = insert_values.format(from_parts=from_sql, table_name=Identifier(self.build_table_name()))
        # The insert parameters go last so that they win on any key collision.
        parameters = from_parameters | insert_parameters

        return AiopgDatabaseOperation(query, parameters, lock)

    def _build_submit_from(self, transaction_uuids: Iterable[UUID]) -> tuple[Composable, dict[str, Any]]:
        """Build the ``FROM`` sub-query used by :meth:`build_submit` to resolve the next version.

        One ``SELECT`` is generated per transaction identifier, each tagged with its 1-based position as
        ``transaction_index``, and all of them are combined with ``UNION ALL``.

        NOTE(review): an empty ``transaction_uuids`` produces an empty sub-query, which leaves the enclosing
        ``FROM ( ) AS t1`` syntactically invalid; presumably callers always pass at least one identifier.

        :param transaction_uuids: The transaction identifiers to look into.
        :return: A tuple containing the composed sub-query and the parameters it needs (``transaction_uuid_<i>``).
        """
        select_transaction = SQL(
            """
            SELECT {index} AS transaction_index, uuid, MAX(version) AS version
            FROM {table_name}
            WHERE uuid = %(uuid)s AND transaction_uuid = {transaction_uuid}
            GROUP BY uuid
            """
        )
        table_name = Identifier(self.build_table_name())

        from_query_parts = []
        parameters = {}
        for index, transaction_uuid in enumerate(transaction_uuids, start=1):
            name = f"transaction_uuid_{index}"
            parameters[name] = transaction_uuid
            from_query_parts.append(
                select_transaction.format(
                    index=Literal(index),
                    transaction_uuid=Placeholder(name),
                    table_name=table_name,
                ),
            )

        query = SQL(" UNION ALL ").join(from_query_parts)
        return query, parameters

    # noinspection PyShadowingBuiltins
    def build_query(
        self,
        uuid: Optional[UUID] = None,
        name: Optional[str] = None,
        version: Optional[int] = None,
        version_lt: Optional[int] = None,
        version_gt: Optional[int] = None,
        version_le: Optional[int] = None,
        version_ge: Optional[int] = None,
        id: Optional[int] = None,
        id_lt: Optional[int] = None,
        id_gt: Optional[int] = None,
        id_le: Optional[int] = None,
        id_ge: Optional[int] = None,
        transaction_uuid: Optional[UUID] = None,
        transaction_uuid_ne: Optional[UUID] = None,
        transaction_uuid_in: Optional[Iterable[UUID]] = None,
        **kwargs,
    ) -> DatabaseOperation:
        """Build the database operation to select rows.

        Only the filters whose value is not ``None`` are applied, combined with ``AND``. Rows are ordered by
        ``id``.

        :param uuid: The identifier must be equal to the given value.
        :param name: The classname must be equal to the given value.
        :param version: The version must be equal to the given value.
        :param version_lt: The version must be lower than the given value.
        :param version_gt: The version must be greater than the given value.
        :param version_le: The version must be lower or equal to the given value.
        :param version_ge: The version must be greater or equal to the given value.
        :param id: The entry identifier must be equal to the given value.
        :param id_lt: The entry identifier must be lower than the given value.
        :param id_gt: The entry identifier must be greater than the given value.
        :param id_le: The entry identifier must be lower or equal to the given value.
        :param id_ge: The entry identifier must be greater or equal to the given value.
        :param transaction_uuid: The transaction identifier must be equal to the given value.
        :param transaction_uuid_ne: The transaction identifier must be distinct of the given value.
        :param transaction_uuid_in: The destination transaction identifier must be equal to one of the given
            values. An empty iterable matches no rows.
        :param kwargs: Additional named arguments (ignored).
        :return: A ``DatabaseOperation`` instance.
        """
        if transaction_uuid_in is not None:
            # Materialize once: the iterable may be single-use and psycopg2 renders tuples as ``IN`` lists.
            transaction_uuid_in = tuple(transaction_uuid_in)

        _select_all = f"""
            SELECT uuid, name, version, data, id, action, created_at, transaction_uuid
            FROM {self.build_table_name()}
        """

        parameters = {
            "uuid": uuid,
            "name": name,
            "version": version,
            "version_lt": version_lt,
            "version_gt": version_gt,
            "version_le": version_le,
            "version_ge": version_ge,
            "id": id,
            "id_lt": id_lt,
            "id_gt": id_gt,
            "id_le": id_le,
            "id_ge": id_ge,
            "transaction_uuid": transaction_uuid,
            "transaction_uuid_ne": transaction_uuid_ne,
            "transaction_uuid_in": transaction_uuid_in,
        }

        # Ordered (parameter, clause) pairs; ``transaction_uuid_in`` is handled separately below.
        clauses = (
            ("uuid", "uuid = %(uuid)s"),
            ("name", "name = %(name)s"),
            ("version", "version = %(version)s"),
            ("version_lt", "version < %(version_lt)s"),
            ("version_gt", "version > %(version_gt)s"),
            ("version_le", "version <= %(version_le)s"),
            ("version_ge", "version >= %(version_ge)s"),
            ("id", "id = %(id)s"),
            ("id_lt", "id < %(id_lt)s"),
            ("id_gt", "id > %(id_gt)s"),
            ("id_le", "id <= %(id_le)s"),
            ("id_ge", "id >= %(id_ge)s"),
            ("transaction_uuid", "transaction_uuid = %(transaction_uuid)s"),
            ("transaction_uuid_ne", "transaction_uuid <> %(transaction_uuid_ne)s"),
        )
        conditions = [clause for key, clause in clauses if parameters[key] is not None]

        if transaction_uuid_in is not None:
            if transaction_uuid_in:
                conditions.append("transaction_uuid IN %(transaction_uuid_in)s")
            else:
                # An empty tuple would be rendered as ``IN ()``, which is a SQL syntax error; membership in
                # an empty set is never satisfied, so use a constant false predicate instead.
                conditions.append("FALSE")

        if not conditions:
            return AiopgDatabaseOperation(f"{_select_all} ORDER BY id;")

        return AiopgDatabaseOperation(
            f"{_select_all} WHERE {' AND '.join(conditions)} ORDER BY id;",
            parameters,
        )

    def build_query_offset(self) -> DatabaseOperation:
        """Build the database operation to get the maximum identifier.

        :return: A ``DatabaseOperation`` instance.
        """
        return AiopgDatabaseOperation(f"SELECT MAX(id) FROM {self.build_table_name()};")
# Executed at import time: hands ``AiopgEventDatabaseOperationFactory`` to ``AiopgDatabaseClient`` under the
# ``EventDatabaseOperationFactory`` key (presumably so the client resolves the abstract factory to this
# implementation - confirm against ``set_factory``).
AiopgDatabaseClient.set_factory(EventDatabaseOperationFactory, AiopgEventDatabaseOperationFactory)