Move request logic to separate method

As preparations for calling other endpoints as well.
This commit is contained in:
Mikkel Munch Mortensen 2024-08-22 09:29:30 +02:00
parent cf46a76452
commit 98748f2ff6

View file

@ -2,10 +2,11 @@
import base64 import base64
import getopt import getopt
from http.client import HTTPResponse
import json import json
import re import re
from urllib import error, parse, request from urllib import error, parse, request
from typing import Any, Dict, Tuple from typing import Any, Dict, Optional, Tuple
class Gateway: class Gateway:
@ -56,6 +57,20 @@ class Gateway:
if gateway_base_url: if gateway_base_url:
self.gateway_base_url = gateway_base_url self.gateway_base_url = gateway_base_url
def request(self, path: str, data: Optional[dict] = None) -> dict:
"""Make a request to the gateway and return the response data."""
# Construct the authentication header.
auth_header = "{}:{}".format(self.username, self.password)
auth_header = base64.b64encode(auth_header.encode()).decode()
# Prepare and make the request.
http_request = request.Request(self.gateway_base_url + path, data=data)
http_request.add_header("Authorization", "Basic {}".format(auth_header))
response = request.urlopen(http_request)
# Parse and return the response data.
return json.loads(response.read().decode())
def send(self, to: str = None, message: str = None) -> Tuple[bool, str]: def send(self, to: str = None, message: str = None) -> Tuple[bool, str]:
""" """
Send a message to a recipient. Send a message to a recipient.
@ -100,12 +115,4 @@ class Gateway:
options["flash"] = int(options["flash"]) options["flash"] = int(options["flash"])
data = json.dumps(options).encode() data = json.dumps(options).encode()
# Prepare authentication. return self.request("/v2/send", data=data)
auth_header = "{}:{}".format(self.username, self.password)
auth_header = base64.b64encode(auth_header.encode()).decode()
http_request = request.Request(self.gateway_base_url + "/v2/send", data=data)
http_request.add_header("Authorization", "Basic {}".format(auth_header))
response = request.urlopen(http_request)
return json.loads(response.read().decode())