Source code for minos.networks.specs.asyncapi

from itertools import (
    chain,
)

from minos.common import (
    Config,
)

from ..decorators import (
    EnrouteCollector,
    enroute,
)
from ..requests import (
    Request,
    Response,
)


[docs]class AsyncAPIService:
[docs] def __init__(self, config: Config): self.config = config self.spec = SPECIFICATION_SCHEMA.copy()
[docs] @enroute.rest.command("/spec/asyncapi", "GET") def generate_specification(self, request: Request) -> Response: events = self.get_events() for event in events: topic: str = event["topic"] event_spec = {} self.spec["channels"][topic] = event_spec return Response(self.spec)
[docs] def get_events(self) -> list[dict]: events = list() for name in self.config.get_services(): decorators = EnrouteCollector(name, self.config).get_broker_event() events += [{"topic": decorator.topic} for decorator in set(chain(*decorators.values()))] return events
SPECIFICATION_SCHEMA = { "asyncapi": "2.3.0", "info": {"title": "", "version": ""}, "channels": {}, }