This repository has been archived on 2024-05-31. You can view files and clone it, but cannot push or open issues or pull requests.
devicehub-teal/ereuse_devicehub/client.py
2018-05-11 18:58:48 +02:00

66 lines
3 KiB
Python

from typing import Tuple, Type, Union

from boltons.typeutils import issubclass
from ereuse_utils.test import JSON
from flask import Response
from werkzeug.exceptions import HTTPException

from ereuse_devicehub.resources.models import Thing
from teal.client import Client as TealClient
class Client(TealClient):
    """
    A teal ``Client`` that also accepts ``Thing`` subclasses as resources.

    Wherever a resource (``res``) can be given, either the resource name
    or the ``Thing`` subclass itself may be passed; ``open`` converts the
    class to its ``__name__`` before delegating to the parent client.
    """

    def __init__(self, application, response_wrapper=None, use_cookies=False,
                 allow_subdomain_redirects=False):
        """Forward every constructor argument unchanged to the parent client."""
        super().__init__(application, response_wrapper, use_cookies, allow_subdomain_redirects)

    def open(self, uri: str, res: Union[str, Type[Thing]] = None,
             status: Union[int, HTTPException] = 200, query: dict = None, accept=JSON,
             content_type=JSON, item=None, headers: dict = None, token: str = None,
             **kw) -> Tuple[Union[dict, str], Response]:
        """
        Perform a request, resolving ``res`` to a resource name first.

        :param res: A resource name or a ``Thing`` subclass; a subclass is
                    replaced by its ``__name__``.
        :param query: Forwarded to the parent client. ``None`` (the
                      default) is replaced by a new empty dict.
        :return: Whatever the parent client's ``open`` returns; annotated
                 as a ``(data, response)`` pair.

        All remaining parameters are forwarded untouched to the parent.
        """
        # A ``{}`` default would be one dict shared by every call; build a
        # fresh one per call instead.
        if query is None:
            query = {}
        # boltons' ``issubclass`` is used instead of the builtin; ``res``
        # may be a plain string or None here.
        # NOTE(review): presumably it returns False for non-class values
        # instead of raising -- confirm against boltons.
        if issubclass(res, Thing):
            res = res.__name__
        return super().open(uri, res, status, query, accept, content_type, item, headers, token,
                            **kw)

    def get(self, uri: str = '', res: Union[Type[Thing], str] = None, query: dict = None,
            status: Union[int, HTTPException] = 200, item: Union[int, str] = None,
            accept: str = JSON, headers: dict = None, token: str = None,
            **kw) -> Tuple[Union[dict, str], Response]:
        """
        Delegate to the parent client's ``get`` with the same arguments.

        ``query`` defaults to a new empty dict on every call.
        """
        if query is None:
            query = {}
        return super().get(uri, res, query, status, item, accept, headers, token, **kw)

    def post(self, data: Union[str, dict], uri: str = '',
             res: Union[Type[Thing], str] = None, query: dict = None,
             status: Union[int, HTTPException] = 201, content_type: str = JSON,
             accept: str = JSON, headers: dict = None, token: str = None,
             **kw) -> Tuple[Union[dict, str], Response]:
        """
        Delegate to the parent client's ``post`` with the same arguments.

        The default expected ``status`` is 201. ``query`` defaults to a
        new empty dict on every call.
        """
        if query is None:
            query = {}
        return super().post(data, uri, res, query, status, content_type, accept, headers, token,
                            **kw)

    def login(self, email: str, password: str):
        """
        POST the credentials to ``/users/login``, expecting status 200.

        :return: The result of ``post`` for that request.
        """
        assert isinstance(email, str)
        assert isinstance(password, str)
        return self.post({'email': email, 'password': password}, '/users/login', status=200)
class UserClient(Client):
    """
    A client that identifies its requests with a specific user.

    While ``user`` is set, its ``'token'`` entry is sent with every
    request in place of any ``token`` argument given to ``open``.

    NOTE(review): this docstring used to state that login is performed
    automatically on the first request. Nothing in this class does so:
    ``user`` starts as ``None`` and must be assigned by the caller
    (presumably from the payload of a login request -- confirm).
    """

    def __init__(self, application,
                 email: str,
                 password: str,
                 response_wrapper=None,
                 use_cookies=False,
                 allow_subdomain_redirects=False):
        """
        Store the user's credentials and start without a logged-in user.

        The remaining arguments are forwarded to ``Client.__init__``.
        """
        super().__init__(application, response_wrapper, use_cookies, allow_subdomain_redirects)
        self.email = email  # type: str
        self.password = password  # type: str
        # None until a caller assigns a dict; ``open`` reads its 'token' key.
        self.user = None  # type: dict

    def open(self, uri: str, res: str = None, status: Union[int, HTTPException] = 200,
             query: dict = None, accept=JSON, content_type=JSON, item=None,
             headers: dict = None, token: str = None,
             **kw) -> Tuple[Union[dict, str], Response]:
        """
        Perform a request, authenticating with the stored user if any.

        :param query: Forwarded to ``Client.open``. ``None`` (the default)
                      is replaced by a new empty dict.
        :param token: Used only while ``self.user`` is falsy; otherwise
                      ``self.user['token']`` takes precedence.

        All remaining parameters are forwarded untouched.
        """
        # A ``{}`` default would be one dict shared by every call; build a
        # fresh one per call instead.
        if query is None:
            query = {}
        return super().open(uri, res, status, query, accept, content_type, item, headers,
                            self.user['token'] if self.user else token, **kw)