AudibleSeriesChecker/connectors/audible_connector.py

56 lines
1.8 KiB
Python

import os
import audible
import json
from getpass import getpass
class AudibleConnector:
    """Thin wrapper around an authenticated ``audible.Client``.

    Constructing the connector authenticates (from a saved auth file when one
    exists, otherwise through an interactive username/password prompt) and
    opens a client. The client is closed when the connector is finalized.
    """

    def __init__(self, authFile):
        """Authenticate and open the API client.

        Args:
            authFile: Path of the saved authentication file. When it is falsy
                or the file does not exist, an interactive login is used
                instead.

        Raises:
            ConnectionError: If authentication fails.
        """
        # Assigned before authenticating so the attribute exists even when
        # _setup_auth raises and the half-built instance is finalized.
        self.client: "audible.Client | None" = None
        self._setup_auth(authFile)

    def __del__(self):
        """Close the client if one was opened; safe to call more than once."""
        # getattr: finalizers also run on instances whose __init__ never ran
        # (e.g. created via __new__), where ``client`` was never assigned.
        client = getattr(self, "client", None)
        if client is not None:
            # Drop the reference first so a repeated call cannot close twice.
            self.client = None
            client.close()

    def _setup_auth(self, authFile=None):
        """Build ``self.auth`` and ``self.client``.

        Loads the authenticator from ``authFile`` when that file exists;
        otherwise prompts for username and password on the console, logs in
        with locale ``"us"`` and, if a path was given, saves the result there.

        Args:
            authFile: Optional path of the authentication file.

        Raises:
            ConnectionError: If an ``OSError`` or
                ``audible.exceptions.AuthFlowError`` occurs while
                authenticating. The original exception is attached as the
                cause.
        """
        try:
            if authFile and os.path.exists(authFile):
                self.auth = audible.Authenticator.from_file(authFile)
            else:
                self.auth = audible.Authenticator.from_login(
                    input("Username "),
                    getpass("Password "),
                    locale="us",
                    with_username=False,
                )
                # Persist the fresh credentials so a later run can load them
                # from the file instead of prompting again.
                if authFile:
                    self.auth.to_file(authFile)
        except (OSError, audible.exceptions.AuthFlowError) as e:
            print(f"Authentication failed: {e}")
            # Chain explicitly so the traceback shows the underlying failure
            # as the direct cause of the ConnectionError.
            raise ConnectionError(f"Failed to authenticate: {e}") from e

        self.client = audible.Client(self.auth)

    def get_produce_from_asin(self, asin):
        """Fetch the catalog product for one ASIN.

        The method name is kept unchanged because it is part of the public
        interface.

        Args:
            asin: Identifier inserted into the ``1.0/catalog/products/<asin>``
                endpoint path.

        Returns:
            The value stored under the ``"product"`` key of the response.
        """
        endpoint = f"1.0/catalog/products/{asin}"
        response = self.client.get(
            endpoint, response_groups="series, relationships, product_attrs"
        )
        return response["product"]
class AudibleConnectorMock(AudibleConnector):
    """Connector that caches product lookups as JSON files on disk.

    A lookup is answered from ``<DUMP_DIR>/products_<asin>.json`` when that
    file exists. Otherwise the product is fetched through the parent class
    and written to that file for later calls.
    """

    # Directory (relative to the working directory) holding the cached files.
    DUMP_DIR = "dumps"

    def get_produce_from_asin(self, asin):
        """Return the product for ``asin``, using the on-disk dump if present.

        Args:
            asin: Identifier used both for the dump file name and for the
                fallback lookup in the parent class.

        Returns:
            The value stored under ``"product"`` in the dump file, or the
            freshly fetched product when no dump file existed.
        """
        dump_path = os.path.join(self.DUMP_DIR, f"products_{asin}.json")
        try:
            with open(dump_path, "r", encoding="utf-8") as f:
                return json.load(f)["product"]
        except FileNotFoundError:
            # Cache miss: fall through to fetch and store the product.
            pass

        data = super().get_produce_from_asin(asin)
        # The dump directory may not exist yet; without this the write below
        # would fail after the product had already been fetched.
        os.makedirs(self.DUMP_DIR, exist_ok=True)
        with open(dump_path, "w", encoding="utf-8") as f:
            json.dump({"product": data}, f, indent=4)
        return data