mirror of
https://codeberg.org/reflect/reflect-client-python.git
synced 2025-09-14 09:33:41 +02:00
100 lines
3.9 KiB
Python
100 lines
3.9 KiB
Python
from typing import TypeVar, Union
|
|
from enum import Enum
|
|
|
|
import json
|
|
import socket
|
|
import requests
|
|
import validators
|
|
|
|
# Allowed HTTP verbs
|
|
class Method(Enum):
|
|
GET = "GET"
|
|
POST = "POST"
|
|
PUT = "PUT"
|
|
DELETE = "DELETE"
|
|
PATCH = "PATCH"
|
|
OPTIONS = "OPTIONS"
|
|
|
|
# Supported connection methods
|
|
class Connection(Enum):
|
|
HTTP = 1
|
|
AF_UNIX = 2
|
|
|
|
class Client:
|
|
# Use this HTTP method if no method specified to call()
|
|
HTTP_DEFAULT_METHOD = Method.GET
|
|
# The amount of bytes to read for each chunk from socket
|
|
SOCKET_READ_BYTES = 2048
|
|
|
|
# Bind current class name to a variable we can reference elsewhere in the class
|
|
# without having to type the name of the class each time
|
|
#__CLASS__ = TypeVar("__CLASS__", bound="Client")
|
|
|
|
def __init__(self, endpoint: str, key: str = None, con: Connection = None, https_peer_verify: bool = True):
|
|
self._con = con if isinstance(con, Connection) else self.resolve_connection(endpoint)
|
|
self._endpoint = endpoint
|
|
self._key = key
|
|
|
|
if (self._con == Connection.AF_UNIX):
|
|
# Connect to Reflect UNIX socket
|
|
self._socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
|
self._socket.connect(endpoint)
|
|
elif (self._con == Connection.HTTP):
|
|
# Append tailing "/" for HTTP if absent
|
|
self._endpoint = self._endpoint if self._endpoint[-1] == "/" else self._endpoint + "/"
|
|
# Flag which enables or disables SSL peer validation (for self-signed certificates)
|
|
self._https_peer_verify = https_peer_verify
|
|
|
|
# Close socket connection when class instance removed
|
|
def __del__(self):
|
|
# Close socket if connected
|
|
if (self._con == Connection.AF_UNIX):
|
|
self._socket.close()
|
|
|
|
# Resolve connection type from endpoint string.
|
|
# If the string is a valid URL we will treat it as HTTP otherwise we will assume it's a path on disk to a UNIX socket file.
|
|
def resolve_connection(self, endpoint: str) -> Connection:
|
|
return Connection.HTTP if validators.url(endpoint) else Connection.AF_UNIX
|
|
|
|
# Check if provided "method" is an instance of the Method enum. Else return default
|
|
def resolve_method(self, method: Union[Method, str, None]) -> Method:
|
|
return method if isinstance(method, Method) else __class__.HTTP_DEFAULT_METHOD
|
|
|
|
# Construct list of headers to send with HTTP request
|
|
def http_headers(self) -> list:
|
|
headers = {
|
|
"Content-Type": "application/json"
|
|
}
|
|
|
|
# Append Authentication header if API key is provided
|
|
if (self._key):
|
|
headers["Authentication"] = f"Bearer {self._key}"
|
|
|
|
return headers
|
|
|
|
# Make request and return response over HTTP
|
|
def http_call(self, endpoint: str, method: Method, payload: list = None) -> tuple:
|
|
# Remove leading "/" if present, as it's already present in self._endpoint
|
|
endpoint = endpoint if endpoint[-1] != "/" else endpoint[:-1]
|
|
|
|
resp = requests.request(method.name, self._endpoint + endpoint, headers=self.http_headers(), data=json.dumps(payload))
|
|
# Return response as tuple of response code and response body as plain text
|
|
return (resp.status_code, resp.text)
|
|
|
|
def socket_txn(self, endpoint: str, method: Union[Method, str] = None, payload: list = None) -> tuple:
|
|
data = f'["{endpoint}","{method.name}","{json.dumps(payload)}"]'
|
|
tx = self._socket.sendall(b'["order","GET",""]')
|
|
rx = self._socket.recv(__class__.SOCKET_READ_BYTES)
|
|
|
|
return tuple(json.loads(rx))
|
|
|
|
def call(self, endpoint: str, method: Union[Method, str] = None, payload: list = None) -> tuple:
|
|
# Get default method if not provided
|
|
method = self.resolve_method(method)
|
|
|
|
# Call endpoint via UNIX socket
|
|
if (self._con == Connection.AF_UNIX):
|
|
return self.socket_txn(endpoint, method, payload)
|
|
|
|
# Call endpoint over HTTP
|
|
return self.http_call(endpoint, method, payload)
|