from __future__ import (
annotations,
)
from collections.abc import (
Iterable,
)
from datetime import (
datetime,
)
from typing import (
Optional,
)
from uuid import (
UUID,
)
from minos.aggregate import (
TransactionDatabaseOperationFactory,
TransactionStatus,
)
from minos.common import (
ComposedDatabaseOperation,
DatabaseOperation,
)
from ...clients import (
AiopgDatabaseClient,
)
from ...operations import (
AiopgDatabaseOperation,
)
# noinspection SqlNoDataSourceInspection,SqlResolve,PyMethodMayBeStatic
class AiopgTransactionDatabaseOperationFactory(TransactionDatabaseOperationFactory):
    """Aiopg Transaction Database Operation Factory class.

    Builds the ``AiopgDatabaseOperation`` instances (SQL text plus bound parameters) used to create, write and
    query the transaction table.
    """

    def build_table_name(self) -> str:
        """Get the table name.

        :return: A ``str`` value.
        """
        return "aggregate_transaction"

    def build_create(self) -> DatabaseOperation:
        """Build the database operation to create the transaction table.

        The composed operation runs three statements in order: it creates the ``uuid-ossp`` extension, creates the
        ``transaction_status`` enum type when it does not exist yet in the current schema, and finally creates the
        table itself. Every statement is idempotent (``IF NOT EXISTS``) and is given its own ``lock`` name.

        :return: A ``DatabaseOperation`` instance.
        """
        table_name = self.build_table_name()

        create_extension = AiopgDatabaseOperation(
            'CREATE EXTENSION IF NOT EXISTS "uuid-ossp";',
            lock="uuid-ossp",
        )

        # ``CREATE TYPE`` has no ``IF NOT EXISTS`` form, so the existence check is done by hand in a ``DO`` block.
        create_status_type = AiopgDatabaseOperation(
            """
            DO
            $$
                BEGIN
                    IF NOT EXISTS(SELECT *
                                  FROM pg_type typ
                                           INNER JOIN pg_namespace nsp
                                                      ON nsp.oid = typ.typnamespace
                                  WHERE nsp.nspname = current_schema()
                                    AND typ.typname = 'transaction_status') THEN
                        CREATE TYPE transaction_status AS ENUM (
                            'pending', 'reserving', 'reserved', 'committing', 'committed', 'rejected'
                            );
                    END IF;
                END;
            $$
            LANGUAGE plpgsql;
            """,
            lock="transaction_status",
        )

        create_table = AiopgDatabaseOperation(
            f"""
            CREATE TABLE IF NOT EXISTS {table_name} (
                uuid UUID PRIMARY KEY,
                destination_uuid UUID NOT NULL,
                status TRANSACTION_STATUS NOT NULL,
                event_offset INTEGER,
                updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
            );
            """,
            lock=table_name,
        )

        return ComposedDatabaseOperation([create_extension, create_status_type, create_table])

    def build_submit(
        self,
        uuid: UUID,
        destination_uuid: UUID,
        status: TransactionStatus,
        event_offset: int,
        **kwargs,
    ) -> DatabaseOperation:
        """Build the database operation to submit a row.

        The statement inserts the row, or updates ``status``, ``event_offset`` and ``updated_at`` of the existing
        row with the same ``uuid``. The update is only applied when the stored ``destination_uuid`` matches and the
        status change is one of the permitted transitions:

        * ``pending`` -> ``pending``, ``reserving`` or ``rejected``
        * ``reserving`` -> ``reserved`` or ``rejected``
        * ``reserved`` -> ``committing`` or ``rejected``
        * ``committing`` -> ``committed``
        * ``committed`` and ``rejected`` rows are never updated.

        :param uuid: The identifier of the transaction.
        :param destination_uuid: The identifier of the destination transaction.
        :param status: The status of the transaction.
        :param event_offset: The event offset of the transaction.
        :param kwargs: Additional named arguments.
        :return: A ``DatabaseOperation`` instance.
        """
        params = {
            "uuid": uuid,
            "destination_uuid": destination_uuid,
            "status": status,
            "event_offset": event_offset,
        }

        # The lock key is the low 32 bits of the transaction identifier.
        lock = uuid.int & ((1 << 32) - 1)

        return AiopgDatabaseOperation(
            f"""
            INSERT INTO {self.build_table_name()} AS t (uuid, destination_uuid, status, event_offset)
            VALUES (%(uuid)s, %(destination_uuid)s, %(status)s, %(event_offset)s)
            ON CONFLICT (uuid)
            DO
               UPDATE SET status = %(status)s, event_offset = %(event_offset)s, updated_at = NOW()
            WHERE (t.destination_uuid = %(destination_uuid)s)
              AND (NOT (t.status = 'pending' AND %(status)s NOT IN ('pending', 'reserving', 'rejected')))
              AND (NOT (t.status = 'reserving' AND %(status)s NOT IN ('reserved', 'rejected')))
              AND (NOT (t.status = 'reserved' AND %(status)s NOT IN ('committing', 'rejected')))
              AND (NOT (t.status = 'committing' AND %(status)s NOT IN ('committed')))
              AND (NOT (t.status = 'committed'))
              AND (NOT (t.status = 'rejected'))
            RETURNING updated_at;
            """,
            params,
            lock=lock,
        )

    def build_query(
        self,
        uuid: Optional[UUID] = None,
        uuid_ne: Optional[UUID] = None,
        uuid_in: Optional[Iterable[UUID]] = None,
        destination_uuid: Optional[UUID] = None,
        status: Optional[str] = None,
        status_in: Optional[Iterable[str]] = None,
        event_offset: Optional[int] = None,
        event_offset_lt: Optional[int] = None,
        event_offset_gt: Optional[int] = None,
        event_offset_le: Optional[int] = None,
        event_offset_ge: Optional[int] = None,
        updated_at: Optional[datetime] = None,
        updated_at_lt: Optional[datetime] = None,
        updated_at_gt: Optional[datetime] = None,
        updated_at_le: Optional[datetime] = None,
        updated_at_ge: Optional[datetime] = None,
        **kwargs,
    ) -> DatabaseOperation:
        """Build the database operation to select rows.

        Every argument that is not ``None`` adds one condition; all conditions are joined with ``AND`` and the
        result is ordered by ``event_offset``. Without any condition the whole table is selected.

        :param uuid: Transaction identifier equal to the given value.
        :param uuid_ne: Transaction identifier not equal to the given value
        :param uuid_in: Transaction identifier within the given values. An empty collection matches no rows.
        :param destination_uuid: Destination Transaction identifier equal to the given value.
        :param status: Transaction status equal to the given value.
        :param status_in: Transaction status within the given values. An empty collection matches no rows.
        :param event_offset: Event offset equal to the given value.
        :param event_offset_lt: Event Offset lower than the given value
        :param event_offset_gt: Event Offset greater than the given value
        :param event_offset_le: Event Offset lower or equal to the given value
        :param event_offset_ge: Event Offset greater or equal to the given value
        :param updated_at: Updated at equal to the given value.
        :param updated_at_lt: Updated at lower than the given value.
        :param updated_at_gt: Updated at greater than the given value.
        :param updated_at_le: Updated at lower or equal to the given value.
        :param updated_at_ge: Updated at greater or equal to the given value.
        :param kwargs: Additional named arguments.
        :return: A ``DatabaseOperation`` instance.
        """
        # Materialize the iterables once: tuples are what gets bound to the ``IN`` placeholders, and one-shot
        # iterators must not be consumed by the emptiness check below.
        if uuid_in is not None:
            uuid_in = tuple(uuid_in)
        if status_in is not None:
            status_in = tuple(status_in)

        # Parameter name -> (bound value, SQL condition), in the order the conditions are emitted.
        filters = {
            "uuid": (uuid, "uuid = %(uuid)s"),
            "uuid_ne": (uuid_ne, "uuid <> %(uuid_ne)s"),
            "uuid_in": (uuid_in, "uuid IN %(uuid_in)s"),
            "destination_uuid": (destination_uuid, "destination_uuid = %(destination_uuid)s"),
            "status": (status, "status = %(status)s"),
            "status_in": (status_in, "status IN %(status_in)s"),
            "event_offset": (event_offset, "event_offset = %(event_offset)s"),
            "event_offset_lt": (event_offset_lt, "event_offset < %(event_offset_lt)s"),
            "event_offset_gt": (event_offset_gt, "event_offset > %(event_offset_gt)s"),
            "event_offset_le": (event_offset_le, "event_offset <= %(event_offset_le)s"),
            "event_offset_ge": (event_offset_ge, "event_offset >= %(event_offset_ge)s"),
            "updated_at": (updated_at, "updated_at = %(updated_at)s"),
            "updated_at_lt": (updated_at_lt, "updated_at < %(updated_at_lt)s"),
            "updated_at_gt": (updated_at_gt, "updated_at > %(updated_at_gt)s"),
            "updated_at_le": (updated_at_le, "updated_at <= %(updated_at_le)s"),
            "updated_at_ge": (updated_at_ge, "updated_at >= %(updated_at_ge)s"),
        }

        conditions = []
        for value, condition in filters.values():
            if value is None:
                continue
            if isinstance(value, tuple) and not value:
                # An empty tuple would be rendered as ``IN ()``, which is a SQL syntax error. Nothing can be a
                # member of an empty collection, so the condition is simply never satisfied.
                condition = "FALSE"
            conditions.append(condition)

        select_all = f"""
            SELECT uuid, status, event_offset, destination_uuid, updated_at
            FROM {self.build_table_name()}
        """.strip()

        if not conditions:
            return AiopgDatabaseOperation(f"{select_all} ORDER BY event_offset;")

        # All parameters are passed, including the ``None`` ones: only the names referenced by the emitted
        # conditions are actually substituted.
        parameters = {name: value for name, (value, _) in filters.items()}
        return AiopgDatabaseOperation(
            f"{select_all} WHERE {' AND '.join(conditions)} ORDER BY event_offset;",
            parameters,
        )
# Pair the abstract transaction operation factory with its aiopg implementation on the aiopg client class.
AiopgDatabaseClient.set_factory(
    TransactionDatabaseOperationFactory,
    AiopgTransactionDatabaseOperationFactory,
)