Source code for minos.plugins.kong.discovery

from __future__ import (
    annotations,
)

import logging
from collections.abc import (
    Iterable,
)
from functools import (
    partial,
)
from typing import (
    Optional,
)

import httpx as httpx

from minos.common import (
    CircuitBreakerMixin,
    Config,
)
from minos.networks import (
    DiscoveryClient,
)

from .client import (
    KongClient,
)
from .utils import (
    Endpoint,
)

logger = logging.getLogger(__name__)


[docs]class KongDiscoveryClient(DiscoveryClient, CircuitBreakerMixin): """Kong Discovery Client class."""
[docs] def __init__( self, host: Optional[str] = None, port: Optional[int] = None, circuit_breaker_exceptions: Iterable[type] = tuple(), client: KongClient = None, **kwargs, ): if host is None: host = "localhost" if port is None: port = 5567 super().__init__( host, port, circuit_breaker_exceptions=(httpx.HTTPStatusError, *circuit_breaker_exceptions), **kwargs ) if client is None: client = KongClient(host=host, port=port) self.client = client self.auth_type = None if "auth_type" in kwargs: self.auth_type = kwargs["auth_type"]
@classmethod def _from_config(cls, config: Config, **kwargs) -> KongDiscoveryClient: if "auth_type" in kwargs: auth_type = kwargs["auth_type"] kwargs.pop("auth_type") else: try: auth_type = config.get_by_key("discovery.auth-type") except Exception: auth_type = None client = KongClient.from_config(config) return super()._from_config(config, auth_type=auth_type, client=client, **kwargs)
[docs] async def subscribe( self, host: str, port: int, name: str, endpoints: list[dict[str, str]], *args, **kwargs ) -> httpx.Response: """Perform the subscription query. :param host: The ip of the microservice to be subscribed. :param port: The port of the microservice to be subscribed. :param name: The name of the microservice to be subscribed. :param endpoints: List of endpoints exposed by the microservice. :param args: Additional positional arguments. :param kwargs: Additional named arguments. :return: This method does not return anything. """ fnsr = partial(self.client.register_service, self.route, name, host, port) response_service = await self.with_circuit_breaker(fnsr) # register a service if response_service.status_code == 409: # service already exist # if service already exist, delete and add again fn_delete = partial(self.client.delete_service, self.route, name) await self.with_circuit_breaker(fn_delete) # delete the service fnsr = partial(self.client.register_service, self.route, name, host, port) response_service = await self.with_circuit_breaker(fnsr) # send the servie subscription again content_service = response_service.json() # get the servie information like the id for endpoint in endpoints: endpointClass = Endpoint(endpoint["url"]) regex_priority = 0 if "regex_priority" in endpoint: regex_priority = endpoint["regex_priority"] fn = partial( self.client.create_route, self.route, ["http"], [endpoint["method"]], [endpointClass.path_as_regex], content_service["id"], regex_priority, False, ) response = await self.with_circuit_breaker(fn) # send the route request resp = response.json() if "authenticated" in endpoint and self.auth_type: if self.auth_type == "basic-auth": fn = partial(self.client.activate_basic_auth_plugin_on_route, route_id=resp["id"]) await self.with_circuit_breaker(fn) elif self.auth_type == "jwt": fn = partial(self.client.activate_jwt_plugin_on_route, route_id=resp["id"]) await self.with_circuit_breaker(fn) if "authorized_groups" in endpoint: fn = partial( self.client.activate_acl_plugin_on_route, route_id=resp["id"], allow=endpoint["authorized_groups"] ) await self.with_circuit_breaker(fn) return response
[docs] async def unsubscribe(self, name: str, *args, **kwargs) -> httpx.Response: """Perform the unsubscription query. :param name: The name of the microservice to be unsubscribed. :param args: Additional positional arguments. :param kwargs: Additional named arguments. :return: This method does not return anything. """ fn = partial(self.client.delete_service, self.route, name) response = await self.with_circuit_breaker(fn) return response