from __future__ import (
annotations,
)
from datetime import (
datetime,
timedelta,
)
from uuid import (
UUID,
)
import httpx as httpx
import jwt
from minos.common import (
Config,
SetupMixin,
current_datetime,
)
[docs]class KongClient(SetupMixin):
"""Kong Client class."""
[docs] def __init__(
self, protocol: str = None, host: str = None, port: int = None, token_expiration_sec: int = None, **kwargs
):
super().__init__(**kwargs)
if host is None:
host = "localhost"
if port is None:
port = 8001
if protocol is None:
protocol = "http"
if token_expiration_sec is None:
token_expiration_sec = 60 * 5
self.route = f"{protocol}://{host}:{port}"
self.token_expiration_sec = token_expiration_sec
@classmethod
def _from_config(cls, config: Config, **kwargs) -> KongClient:
discovery_config = config.get_discovery()
token_expiration_sec = discovery_config.get("token-exp")
protocol = discovery_config.get("protocol")
host = discovery_config.get("host")
port = discovery_config.get("port")
return cls(protocol=protocol, host=host, port=port, token_expiration_sec=token_expiration_sec, **kwargs)
[docs] @staticmethod
async def register_service(
discovery_route: str, service_name: str, microservice_host: str, microservice_port: int
) -> httpx.Response:
url = f"{discovery_route}/services" # kong url for service POST or add
data = {"name": service_name, "protocol": "http", "host": microservice_host, "port": microservice_port}
async with httpx.AsyncClient() as client:
response = await client.post(url, json=data)
return response
[docs] @staticmethod
async def delete_service(discovery_route: str, service_name) -> httpx.Response:
"""
the delete of a service must be checking before if the service already have the routes
if yes the DELETE routes must be called
:param discovery_route:
:param service_name:
:return:
"""
async with httpx.AsyncClient() as client:
url_get_route = f"{discovery_route}/services/{service_name}/routes" # url to get the routes
response_routes = await client.get(url_get_route)
json_routes_response = response_routes.json()
if len(json_routes_response["data"]) > 0: # service already have route, routes must be deleted
for route in json_routes_response["data"]:
url_delete_route = f"{discovery_route}/routes/{route['id']}" # url for routes delete
await client.delete(url_delete_route)
url_delete_service = f"{discovery_route}/services/{service_name}" # url for service delete
response_delete_service = await client.delete(url_delete_service)
return response_delete_service
[docs] @staticmethod
async def create_route(
endpoint: str,
protocols: list[str],
methods: list[str],
paths: list[str],
service: str,
regex_priority: int,
strip_path: bool = False,
):
url = f"{endpoint}/routes"
payload = {
"protocols": protocols,
"methods": methods,
"paths": paths,
"service": {"id": service},
"regex_priority": regex_priority,
"strip_path": strip_path,
}
async with httpx.AsyncClient() as client:
resp = await client.post(url, json=payload)
return resp
[docs] async def create_consumer(self, username: str, user: UUID, tags: list[str]):
payload = {"username": username, "custom_id": str(user), "tags": tags}
async with httpx.AsyncClient() as client:
resp = await client.post(f"{self.route}/consumers", json=payload)
return resp
[docs] async def add_basic_auth_to_consumer(self, username: str, password: str, consumer: str):
payload = {
"username": username,
"password": password,
}
async with httpx.AsyncClient() as client:
resp = await client.post(f"{self.route}/consumers/{consumer}/basic-auth", json=payload)
return resp
[docs] async def add_jwt_to_consumer(self, consumer: str):
async with httpx.AsyncClient() as client:
resp = await client.post(
f"{self.route}/consumers/{consumer}/jwt",
headers={"content-type": "application/x-www-form-urlencoded"},
)
return resp
[docs] async def add_acl_to_consumer(self, role: str, consumer: str):
payload = {
"group": role,
}
async with httpx.AsyncClient() as client:
resp = await client.post(f"{self.route}/consumers/{consumer}/acls", json=payload)
return resp
[docs] async def activate_acl_plugin_on_service(self, service_name: str, allow: list[str]):
payload = {"name": "acl", "config": {"allow": allow}}
async with httpx.AsyncClient() as client:
resp = await client.post(f"{self.route}/services/{service_name}/plugins", json=payload)
return resp
[docs] async def activate_acl_plugin_on_route(self, route_id: str, allow: list[str]):
payload = {"name": "acl", "config": {"allow": allow}}
async with httpx.AsyncClient() as client:
resp = await client.post(f"{self.route}/routes/{route_id}/plugins", json=payload)
return resp
[docs] async def activate_basic_auth_plugin_on_service(self, service_name: str):
payload = {"name": "basic-auth", "config": {"hide_credentials": False}}
async with httpx.AsyncClient() as client:
resp = await client.post(f"{self.route}/services/{service_name}/plugins", json=payload)
return resp
[docs] async def activate_basic_auth_plugin_on_route(self, route_id: str):
payload = {"name": "basic-auth", "config": {"hide_credentials": False}}
async with httpx.AsyncClient() as client:
resp = await client.post(f"{self.route}/routes/{route_id}/plugins", json=payload)
return resp
[docs] async def activate_jwt_plugin_on_route(self, route_id: str):
payload = {
"name": "jwt",
"config": {"secret_is_base64": False, "run_on_preflight": True, "claims_to_verify": ["exp", "nbf"]},
}
async with httpx.AsyncClient() as client:
resp = await client.post(f"{self.route}/routes/{route_id}/plugins", json=payload)
return resp
[docs] async def generate_jwt_token(
self, key: str, secret: str, algorithm: str = "HS256", exp: datetime = None, nbf: datetime = None
) -> str:
payload = {"iss": key, "exp": exp, "nbf": nbf}
current = current_datetime()
if not exp:
payload["exp"] = current + timedelta(seconds=self.token_expiration_sec)
if not nbf:
payload["nbf"] = current
return jwt.encode(payload, secret, algorithm=algorithm)
[docs] @staticmethod
async def decode_token(token: str, algorithm: str = "HS256"):
return jwt.decode(token, options={"verify_signature": False}, algorithms=[algorithm])
[docs] async def get_jwt_by_id(self, id: str):
async with httpx.AsyncClient() as client:
resp = await client.get(f"{self.route}/jwts/{id}")
return resp
[docs] async def get_consumer_jwts(self, consumer: str):
async with httpx.AsyncClient() as client:
resp = await client.get(f"{self.route}/consumers/{consumer}/jwt")
return resp