1 Commits

Author SHA1 Message Date
deesiigneer
c08394addf v3 asyncio/aiohttp alpha 2022-08-15 23:26:36 +03:00
34 changed files with 485 additions and 654 deletions

View File

@@ -2,10 +2,7 @@ version: 2
formats: []
build:
os: ubuntu-lts-latest
tools:
python: '3.8'
image: latest
sphinx:
configuration: docs/conf.py
@@ -13,6 +10,7 @@ sphinx:
builder: html
python:
version: "3.8"
install:
- method: pip
path: .

View File

@@ -1,6 +1,6 @@
MIT License
Copyright (c) 2022 deesiigneer
Copyright (c) 2022 Aleksey
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal

View File

@@ -37,7 +37,7 @@ Installation
.. code:: sh
pip3 install pyspapi
sudo apt pip3 install pyspapi
Quick example
--------------
@@ -46,17 +46,9 @@ Checking the balance
~~~~~~~~~~~~~~~~~~~~~
.. code:: py
from pyspapi import SPAPI
from asyncio import get_event_loop
import pyspapi
spapi = SPAPI(card_id='CARD_ID', token='TOKEN')
async def main():
print(await spapi.balance)
loop = get_event_loop()
loop.run_until_complete(main())
print(await pyspapi.API(card_id='card_id', token='token').balance)
More examples can be found in the `examples <https://github.com/deesiigneer/pyspapi/tree/main/examples>`_
@@ -65,5 +57,6 @@ Links
- `Discord server <https://discord.gg/VbyHaKRAaN>`_
- `pyspapi documentation <https://pyspapi.readthedocs.io/>`_
- `API documentation <https://spworlds.readthedocs.io>`_
- `PyPi <https://pypi.org/project/pyspapi/>`_
- `API documentation for SP sites <https://github.com/sp-worlds/api-docs>`_

View File

@@ -1,13 +1,13 @@
/* Background of stable should be green */
.version-switcher__container a[data-version-name*="stable"] {
#version_switcher a[data-version-name*="stable"] {
position: relative;
}
.version-switcher__container a[data-version-name*="stable"] span {
#version_switcher a[data-version-name*="stable"] span {
color: var(--pst-color-success);
}
.version-switcher__container a[data-version-name*="stable"] span:before {
#version_switcher a[data-version-name*="stable"] span:before {
content: "";
width: 100%;
height: 100%;

View File

@@ -2,16 +2,11 @@
{
"name": "latest",
"version": "latest",
"url": "https://pyspapi.readthedocs.io/ru/latest/"
"url": "https://pyspapi.readthedocs.io/en/latest/"
},
{
"name": "stable",
"version": "stable",
"url": "https://pyspapi.readthedocs.io/ru/stable/"
},
{
"name": "v3-asyncio",
"version": "v3-asyncio",
"url": "https://pyspapi.readthedocs.io/ru/v3-asyncio/"
"url": "https://pyspapi.readthedocs.io/en/stable/"
}
]

View File

@@ -1,4 +1,4 @@
.. currentmodule:: pyspapi
.. py:currentmodule:: pyspapi
API Reference
===============
@@ -25,6 +25,50 @@ There are two main ways to query version information.
-----------
``SPAPI``
~~~~~~~~~
~~~~~
.. autoclass:: SPAPI
:members:
.. automethod:: SPAPI.event()
:decorator:
.. automethod:: SPAPI.check_user_access
:decorator:
.. automethod:: SPAPI.get_user
:decorator:
.. automethod:: SPAPI.get_users
:decorator:
.. automethod:: SPAPI.payment
:decorator:
.. automethod:: SPAPI.transaction
:decorator:
.. automethod:: SPAPI.webhook_verify
:decorator:
MojangAPI
~~~~~
.. autoclass:: MojangAPI
:members:
.. automethod:: SPAPI.event()
:decorator:
.. automethod:: SPAPI.get_name_history
:decorator:
.. automethod:: SPAPI.get_profile
:decorator:
.. automethod:: SPAPI.get_username
:decorator:
.. automethod:: SPAPI.get_uuid
:decorator:
.. automethod:: SPAPI.get_uuids
:decorator:

View File

@@ -1,7 +1,4 @@
from re import search, MULTILINE
import os
import sys
project = 'pyspapi'
copyright = '2022, deesiigneer'
@@ -19,9 +16,6 @@ release = version
# -- General configuration
sys.path.insert(0, os.path.abspath(".."))
extensions = [
'sphinx.ext.duration',
'sphinx.ext.doctest',
@@ -30,19 +24,12 @@ extensions = [
'sphinx.ext.intersphinx',
]
autodoc_member_order = "bysource"
autodoc_typehinta = "none"
intersphinx_mapping = {
'python': ('https://docs.python.org/3/', None),
'sphinx': ('https://www.sphinx-doc.org/en/master/', None),
}
version_match = os.environ.get("READTHEDOCS_VERSION")
json_url = f"https://pyspapi.readthedocs.io/ru/{version_match}/_static/switcher.json"
intersphinx_disabled_domains = ['std']
language = 'en'
language = None
locale_dirs = ["locale/"]
exclude_patterns = []
html_static_path = ["_static"]
@@ -78,7 +65,12 @@ html_theme_options = {
],
"header_links_before_dropdown": 4,
"show_toc_level": 1,
"navbar_start": ["navbar-logo"],
"navbar_start": ["navbar-logo", "version-switcher"],
"switcher": {
"json_url": "https://pyspapi.readthedocs.io/en/latest/_static/switcher.json",
"version_match": "latest"
},
"navigation_with_keys": True,
}
html_css_files = ["custom.css"]

View File

@@ -10,7 +10,7 @@ Quickstart
This page gives a brief introduction to the library.
Checking balance
----------------
-------------
Let's output the amount of money remaining in the card account to the console.
@@ -18,17 +18,9 @@ It looks something like this:
.. code-block:: python
from pyspapi import SPAPI
from asyncio import get_event_loop
import pyspapi
spapi = SPAPI(card_id='CARD_ID', token='TOKEN')
async def main():
print(await spapi.balance)
loop = get_event_loop()
loop.run_until_complete(main())
print(pyspapi.SPAPI(card_id='card_id', token='token').balance)
Make sure not to name it ``pyspapi`` as that'll conflict with the library.

View File

@@ -0,0 +1,12 @@
import pyspapi
import asyncio
api = pyspapi.API(card_id='card_id', token='token')
async def main():
print(await api.get_name_history(uuid='63ed47877aa3470fbfc46c5356c3d797'))
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

15
examples/get_profile.py Normal file
View File

@@ -0,0 +1,15 @@
import pyspapi
import asyncio
api = pyspapi.API(card_id='card_id', token='token')
async def main():
mojang_profile = await api.get_profile(uuid='63ed47877aa3470fbfc46c5356c3d797')
print(mojang_profile)
print(mojang_profile.id, mojang_profile.timestamp)
print(mojang_profile.skin, mojang_profile.skin.model, mojang_profile.skin.cape_url, mojang_profile.skin.url)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

View File

@@ -0,0 +1,21 @@
import pyspapi
import asyncio
api = pyspapi.API(card_id='card_id', token='token')
async def main():
user = await api.get_user(264329096920563714)
print(user)
print(user.access)
# У API есть лимиты, каждый user = 1 запрос, учитывайте это при использовании get_users
# https://spworlds.readthedocs.io/ru/latest/index.html#id3
users = await api.get_users([262632724928397312, 264329096920563714])
for user in users:
print(user)
if user is not None:
print(user.access)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

View File

@@ -1,15 +0,0 @@
from pyspapi import SPAPI
from asyncio import get_event_loop
spapi = SPAPI(card_id='CARD_ID', token='TOKEN')
async def main():
user = await spapi.get_user(262632724928397312)
print(user.username, user.uuid)
for card in user.cards:
print(card.name, card.number)
loop = get_event_loop()
loop.run_until_complete(main())

View File

@@ -0,0 +1,17 @@
import pyspapi
import pyspapi
import asyncio
api = pyspapi.API(card_id='card_id', token='token')
async def main():
uuid = await pyspapi.API(card_id='card_id', token='token').get_uuid(username='deesiigneer')
print(uuid)
print(uuid.id, uuid.name)
print(await pyspapi.API(card_id='card_id', token='token').get_uuids(['deesiigneer', '5opka', 'OsterMiner']))
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

View File

@@ -1,12 +0,0 @@
from pyspapi import SPAPI
from asyncio import get_event_loop
spapi = SPAPI(card_id='CARD_ID', token='TOKEN')
async def main():
me = await spapi.me
print(me)
loop = get_event_loop()
loop.run_until_complete(main())

View File

@@ -1,21 +1,16 @@
from pyspapi import SPAPI
from pyspapi.types import Item
from asyncio import get_event_loop
import pyspapi
import asyncio
spapi = SPAPI(card_id='CARD_ID', token='TOKEN')
items = [Item('first item', 1, 2, 'first item comment').to_json(),
Item('second item', 3, 4, 'second item comment').to_json()]
api = pyspapi.API(card_id='card_id', token='token')
async def main():
print(await spapi.create_payment(items=items,
redirect_url='https://www.google.com/',
webhook_url='https://www.google.com/',
data='some-data'
)
print(await api.payment(amount=1,
redirect_url='https://www.google.com/',
webhook_url='https://www.google.com/',
data='some-data'
)
)
loop = get_event_loop()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

View File

@@ -1,15 +1,15 @@
from asyncio import get_event_loop
import pyspapi
import asyncio
from pyspapi import SPAPI
spapi = SPAPI(card_id='CARD_ID', token='TOKEN')
api = pyspapi.API(card_id='CARD_ID', token='TOKEN')
async def main():
new_balance = await spapi.create_transaction(receiver='77552',
amount=1,
comment='test')
print(new_balance)
print(await api.transaction(receiver=12345,
amount=1,
comment="test"
)
)
loop = get_event_loop()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

View File

@@ -1,13 +0,0 @@
from pyspapi import SPAPI
from asyncio import get_event_loop
spapi = SPAPI(card_id='CARD_ID', token='TOKEN')
# print(spapi.webhook_verify(data='webhook_data', header='webhook_header'))
async def main():
print(await spapi.update_webhook(url='https://example.com/webhook'))
loop = get_event_loop()
loop.run_until_complete(main())

View File

@@ -0,0 +1,5 @@
import pyspapi
api = pyspapi.API(card_id='your_card_id', token='your_token')
api.listener(host='myhost.com', port=80, webhook_path='/webhook/')

View File

@@ -0,0 +1,10 @@
import pyspapi
import asyncio
api = pyspapi.API(card_id='your_card_id', token='your_token')
async def main():
print(await api.webhook_verify(data='webhook_data', header='webhook_header'))
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

View File

@@ -1,9 +1,4 @@
from .api import *
from .spworlds import *
from .types import *
from .api import API
from .types import SPUser, MojangProfile, Skin, UsernameToUUID
__author__ = 'deesiigneer'
__url__ = 'https://github.com/deesiigneer/pyspapi'
__description__ = 'API wrapper for SP servers written in Python.'
__license__ = 'MIT'
__version__ = "3.2.0"
__version__ = "3.0.0a0"

182
pyspapi/api.py Normal file
View File

@@ -0,0 +1,182 @@
import ast
import warnings
import asyncio
from aiohttp import web
from sys import version_info
from base64 import b64encode, b64decode
from hmac import new, compare_digest
from hashlib import sha256
from logging import getLogger
from typing import Any, Dict, List, Optional, Union
from .types import SPUser, MojangProfile, UsernameToUUID
from .request import Request
from .errors import Error
log = getLogger('pyspapi')
class _BaseAPI:
_SPWORLDS = "https://spworlds.ru/api/public"
_API_MOJANG = "https://api.mojang.com"
_SESSIONSERVER_MOJANG = "https://sessionserver.mojang.com"
def __init__(self, card_id: str, token: str):
"""
:param card_id:
:param token:
"""
self._id = card_id
self._token = token
self._HEADER = {
'Authorization': f"Bearer {str(b64encode(str(f'{self._id}:{self._token}').encode('utf-8')), 'utf-8')}",
'User-Agent': f'pyspapi (https://github.com/deesiigneer/pyspapi) '
f'Python {version_info.major}.{version_info.minor}.{version_info.micro}'
}
async def webhook_verify(self, data: str, header) -> bool:
"""
Проверяет достоверность webhook'а. \n
:param data: data из webhook.
:param header: header X-Body-Hash из webhook.
:return: True если header из webhook'а достоверен, иначе False
"""
print()
hmac_data = b64encode(new(self._token.encode('utf-8'), data, sha256).digest())
return compare_digest(hmac_data, header.encode('utf-8'))
def listener(self, host: str = '127.0.0.1', port: int = 80, webhook_path: str = '/webhook/'):
app = web.Application()
async def handle(request):
request_data = await request.read()
header = request.headers.get('X-Body-Hash')
if header is not None:
if await self.webhook_verify(data=request_data, header=header) is True:
web.json_response(status=202)
return True
else:
web.json_response(status=400)
else:
return web.json_response(status=404)
app.add_routes([web.get(webhook_path, handle)])
web.run_app(app, port=port, host=host)
class API(_BaseAPI):
"""
class API
"""
async def get_user(self, user_id: int) -> Optional[SPUser]:
"""
Получение информации об игроке SP \n
:param user_id: ID пользователя в Discord.
:return: Class User.
"""
sp_user = await Request.get(f'{self._SPWORLDS}/users/{str(user_id)}', self._HEADER)
if sp_user is not None:
return SPUser(await Request.get(f'{self._SPWORLDS}/users/{str(user_id)}', self._HEADER))
else:
return None
async def get_users(self, user_ids: List[int]) -> Union[SPUser, Any]:
"""
Получение никнеймов игроков в майнкрафте. **Максимально можно указать 60 user_ids, не используйте эту функцию
чаще 1 раза в минуту если указали больше 60 user_ids**\n
https://spworlds.readthedocs.io/ru/latest/index.html#id3\n
:param user_ids: List[int] ID пользователей в Discord.
:return: List[str] который содержит майнкрафт никнеймы игроков в том же порядке, который был задан, None если
пользователь не найден или нет проходки.
"""
if len(user_ids) > 60:
user_ids = user_ids[:60]
warnings.warn('user_ids больше чем 60. Уменьшено до 60.')
tasks = []
for user_id in user_ids:
tasks.append(self.get_user(user_id))
return await asyncio.gather(*tasks, return_exceptions=True)
async def get_uuid(self, username: str) -> Optional[UsernameToUUID]:
"""
Получить UUID игрока Minecraft.\n
:param username: str никнейм игрока Minecraft.
:return: Optional[str] UUID игрока Minecraft.
"""
response = await Request.get(f'{self._API_MOJANG}/users/profiles/minecraft/{username}')
return UsernameToUUID(await Request.get(f'{self._API_MOJANG}/users/profiles/minecraft/{username}'))
async def get_uuids(self, usernames: list[str]) -> Dict[str, str]:
"""
Получить UUID's игроков Minecraft. **Не больше 10**\n
:param usernames: List[str] Список с никнеймами игроков Minecraft.
:return: Dict[str, str] UUID игроков Minecraft.
"""
if len(usernames) > 10:
usernames = usernames[:10]
warnings.warn('usernames больше чем 10. Уменьшено до 10.')
return await Request.post(f'{self._API_MOJANG}/profiles/minecraft', payload=usernames)
async def get_name_history(self, uuid: str) -> List[Dict[str, Any]]:
"""
История никнеймов в Minecraft.\n
:param uuid: UUID игрока Minecraft.
:return: List[Dict[str, Any]] который содержит name и changed_to_at
"""
requests = await Request.get(f"{self._API_MOJANG}/user/profiles/{uuid}/names")
name_data = []
for data in requests:
name_data_dict = {"name": data["name"]}
if data.get("changedToAt"):
name_data_dict["changed_to_at"] = data["changedToAt"]
else:
name_data_dict["changed_to_at"] = 0
name_data.append(name_data_dict)
return name_data
async def get_profile(self, uuid: str) -> MojangProfile:
response = await Request.get(f'{self._SESSIONSERVER_MOJANG}/session/minecraft/profile/{uuid}')
return MojangProfile(ast.literal_eval(b64decode(response["properties"][0]["value"]).decode()))
async def payment(self, amount: int, redirect_url: str, webhook_url: str, data: str) -> Optional[str]:
"""
Создание ссылки для оплаты.\n
:param amount: Стоимость покупки в АРах.
:param redirect_url: URL страницы, на которую попадет пользователь после оплаты.
:param webhook_url: URL, куда наш сервер направит запрос, чтобы оповестить ваш сервер об успешной оплате.
:param data: Строка до 100 символов, сюда можно поместить любые полезные данных.
:return: Ссылку на страницу оплаты, на которую стоит перенаправить пользователя.
"""
if len(data) > 100:
raise Error('В data больше 100 символов')
return await Request.post(f'{self._SPWORLDS}/payment',
payload={
'amount': amount,
'redirectUrl': redirect_url,
'webhookUrl': webhook_url,
'data': data},
headers=self._HEADER)
async def transaction(self, receiver: int, amount: int, comment: str) -> Optional[str]:
"""
Перевод АР на карту. \n
:param receiver: Номер карты получателя.
:param amount: Количество АР для перевода.
:param comment: Комментарий для перевода.
:return: True если перевод успешен, иначе False.
"""
return 'Удачно' if await Request.post(f'{self._SPWORLDS}/transactions',
payload={'receiver': receiver,
'amount': amount,
'comment': comment},
headers=self._HEADER) else 'Что-то пошло не так...'
@property
async def balance(self) -> Optional[int]:
"""
Проверка баланса карты \n
:return: Количество АР на карте.
"""
balance = await Request.get(f'{self._SPWORLDS}/card', headers=self._HEADER)
return balance['balance']

View File

@@ -1,2 +0,0 @@
from .api import APISession

View File

@@ -1,87 +0,0 @@
import asyncio
import json
from base64 import b64encode
from logging import getLogger
from typing import Optional, Any, Dict
import aiohttp
from ..exceptions import ValidationError, SPAPIError
log = getLogger('pyspapi')
class APISession(object):
def __init__(self, card_id: str,
token: str,
timeout: int = 5,
sleep_time: float = 0.2,
retries: int = 0,
raise_exception: bool = False):
self.__url = "https://spworlds.ru/api/public/"
self.__id = card_id
self.__token = token
self.__sleep_timeout = sleep_time
self.__retries = retries
self.__timeout = timeout
self.__raise_exception = raise_exception
self.session: Optional[aiohttp.ClientSession] = None
async def __aenter__(self):
self.session = aiohttp.ClientSession(
json_serialize=json.dumps,
timeout=aiohttp.ClientTimeout(total=self.__timeout))
return self
async def __aexit__(self, *err):
await self.session.close()
self.session = None
async def request(self, method: str, endpoint: str, data: Optional[Dict] = None) -> Any:
url = self.__url + endpoint
headers = {
'Authorization': f"Bearer {str(b64encode(str(f'{self.__id}:{self.__token}').encode('utf-8')), 'utf-8')}",
'User-Agent': 'https://github.com/deesiigneer/pyspapi',
"Content-Type": "application/json",
}
attempt = 0
while True:
attempt += 1
if attempt > 1:
log.warning(f'[pyspapi] Repeat attempt {attempt}: {method.upper()} {url}')
try:
async with self.session.request(method, url, json=data, headers=headers) as resp:
if resp.status == 422:
errors = await resp.json()
log.error(f"[pyspapi] Validation error: {errors}")
if self.__raise_exception:
raise ValidationError(errors)
return None
if resp.status >= 400:
content = await resp.text()
log.error(f"[pyspapi] API error {resp.status}: {content}")
if self.__raise_exception:
raise SPAPIError(resp.status, content)
return None
return await resp.json()
except (aiohttp.ClientError, asyncio.TimeoutError) as e:
log.exception(f"[pyspapi] Connection error: {e}")
if attempt > self.__retries:
return None
await asyncio.sleep(self.__sleep_timeout)
async def get(self, endpoint: str) -> Any:
async with self:
return await self.request("GET", endpoint)
async def post(self, endpoint: str, data: Optional[Dict] = None) -> Any:
async with self:
return await self.request("POST", endpoint, data)
async def put(self, endpoint: str, data: Optional[Dict] = None) -> Any:
async with self:
return await self.request("PUT", endpoint, data)

29
pyspapi/errors.py Normal file
View File

@@ -0,0 +1,29 @@
class Error(Exception):
pass
class Unauthorized(Exception):
pass
class NotFound(Exception):
pass
class TooManyRequests(Exception):
pass
class UserNotFound(Exception):
pass
def handle(response):
if response['error'] == 'Unauthorized':
raise Unauthorized(response['message'])
elif response['error'] == 'Not Found':
raise NotFound(response['message'])
elif response['error'] == 'Too Many Requests':
raise TooManyRequests(response['message'])
else:
raise Exception(response)

View File

@@ -1,25 +0,0 @@
class SPAPIError(Exception):
"""
Базовая ошибка для всех исключений, связанных с API SPWorlds.
"""
def __init__(self, status_code: int, message: str):
self.status_code = status_code
self.message = message
super().__init__(f"[{status_code}] {message}")
def __str__(self):
return f"SPAPIError: [{self.status_code}] {self.message}"
class ValidationError(SPAPIError):
"""
Ошибка валидации.
"""
def __init__(self, errors):
self.errors = errors
super().__init__(422, f"Validation failed: {errors}")
def __str__(self):
return f"ValidationError: {self.errors}"

31
pyspapi/request.py Normal file
View File

@@ -0,0 +1,31 @@
import aiohttp
from .errors import handle
from typing import Coroutine, Any, TypeVar
T = TypeVar("T")
Response = Coroutine[Any, Any, T]
class Request:
async def get(url: str, headers=None):
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers) as session_response:
try:
response = await session_response.json()
except:
response = await session_response.text()
if session_response.status == 404:
return None
if not session_response.ok:
handle(response)
return response
async def post(url: str, payload, headers=None):
async with aiohttp.ClientSession() as session:
session_response = await session.post(url, json=payload, headers=headers)
try:
response = await session_response.json()
except:
response = await session_response.text()
if not session_response.ok:
handle(response)
return response

View File

@@ -1,185 +0,0 @@
from base64 import b64encode
from hashlib import sha256
from hmac import new, compare_digest
from typing import Optional
from .api import APISession
from .types import User
from .types.me import Account
from .types.payment import Item
__all__ = ['SPAPI']
class SPAPI(APISession):
"""
pyspapi — высокоуровневый клиент для взаимодействия с SPWorldsAPI.
Предоставляет удобные методы для работы с балансом карты, вебхуками,
информацией о пользователе, транзакциями и платежами, а также верификацией вебхуков.
"""
def __init__(self, card_id: str,
token: str,
timeout: int = 5,
sleep_time: float = 0.2,
retries: int = 0,
raise_exception: bool = False):
"""
Инициализирует объект SPAPI.
:param card_id: Идентификатор карты.
:type card_id: str
:param token: Токен API.
:type token: str
:param timeout: Таймаут для запросов API в секундах. По умолчанию 5.
:type timeout: int
:param sleep_time: Время ожидания между повторными запросами в секундах. По умолчанию 0.2.
:type sleep_time: float
:param retries: Количество повторных попыток для неудачных запросов. По умолчанию 0.
:type retries: int
:param raise_exception: Поднимать исключения при ошибке, если True.
:type raise_exception: bool
"""
super().__init__(card_id, token, timeout, sleep_time, retries, raise_exception)
self.__card_id = card_id
self.__token = token
def __repr__(self):
"""
Возвращает строковое представление объекта SPAPI.
"""
return f"{self.__class__.__name__}({vars(self)})"
@property
async def balance(self) -> Optional[int]:
"""
Получает текущий баланс карты.
:return: Текущий баланс карты.
:rtype: int
"""
return int((await super().get('card'))['balance'])
@property
async def webhook(self) -> Optional[str]:
"""
Получает URL вебхука, связанного с картой.
:return: URL вебхука.
:rtype: str
"""
return str((await super().get('card'))['webhook'])
@property
async def me(self) -> Optional[Account]:
"""
Получает информацию об аккаунте текущего пользователя.
:return: Объект Account, представляющий аккаунт текущего пользователя.
:rtype: :class:`Account`
"""
me = await super().get('accounts/me')
return Account(
account_id=me['id'],
username=me['username'],
minecraftuuid=me['minecraftUUID'],
status=me['status'],
roles=me['roles'],
cities=me['cities'],
cards=me['cards'],
created_at=me['createdAt'])
async def get_user(self, discord_id: int) -> Optional[User]:
"""
Получает информацию о пользователе по его ID в Discord.
:param discord_id: ID пользователя в Discord.
:type discord_id: int
:return: Объект User, представляющий пользователя.
:rtype: :class:`User`
"""
user = await super().get(f'users/{discord_id}')
cards = await super().get(f"accounts/{user['username']}/cards")
return User(user['username'], user['uuid'], cards)
async def create_transaction(self, receiver: str, amount: int, comment: str) -> Optional[int]:
"""
Создает транзакцию.
:param receiver: Получатель транзакции.
:type receiver: str
:param amount: Сумма транзакции.
:type amount: int
:param comment: Комментарий к транзакции.
:type comment: str
:return: Баланс после транзакции.
:rtype: int
"""
data = {
'receiver': receiver,
'amount': amount,
'comment': comment
}
return int((await super().post('transactions', data))['balance'])
async def create_payment(self, webhook_url: str, redirect_url: str, data: str, items: list[Item]) -> Optional[str]:
"""
Создает платеж.
:param webhook_url: URL вебхука для платежа.
:type webhook_url: str
:param redirect_url: URL для перенаправления после платежа.
:type redirect_url: str
:param data: Дополнительные данные для платежа.
:type data: str
:param items: Элементы, включаемые в платеж.
:return: URL для платежа или None при ошибке.
:rtype: str
"""
data = {
'items': items,
'redirectUrl': redirect_url,
'webhookUrl': webhook_url,
'data': data
}
return str((await super().post('payments', data))['url'])
async def update_webhook(self, url: str) -> Optional[dict]:
"""
Обновляет URL вебхука, связанного с картой.
:param url: Новый URL вебхука.
:return: Ответ API в виде словаря или None при ошибке.
"""
data = {'url': url}
return await super().put('card/webhook', data)
def webhook_verify(self, data: str, header: str) -> bool:
"""
Проверяет достоверность вебхука.
:param data: Данные из вебхука.
:type data: str
:param header: Заголовок X-Body-Hash из вебхука.
:return: True, если заголовок из вебхука достоверен, иначе False.
:rtype: bool
"""
hmac_data = b64encode(new(self.__token.encode('utf-8'), data, sha256).digest())
return compare_digest(hmac_data, header.encode('utf-8'))
def to_dict(self) -> dict:
"""
Преобразует объект SPAPI в словарь.
:return: Словарное представление объекта SPAPI.
:rtype: dict
"""
return self.__dict__.copy()

63
pyspapi/types.py Normal file
View File

@@ -0,0 +1,63 @@
class SPUser:
def __init__(self, data: dict):
self.__data = data
@property
def access(self) -> bool:
return True if self.__data['username'] is not None else False
def __repr__(self):
return self.__data['username']
class MojangProfile:
def __init__(self, data: dict):
self.__data = data
self.skin = Skin(data)
@property
def id(self) -> str:
return self.__data['profileId']
@property
def timestamp(self):
return self.__data['timestamp']
def __repr__(self):
return self.__data['profileName']
class Skin:
def __init__(self, data: dict):
self.__data = data
@property
def url(self) -> str:
return self.__data['textures']['SKIN']['url']
@property
def cape_url(self) -> str:
return self.__data['textures']['CAPE']['url']
@property
def model(self) -> str:
return 'classic' if self.__data['textures']['SKIN'].get('metadata') is None else 'slim'
def __repr__(self):
return str(self.__data['textures']['SKIN'])
class UsernameToUUID:
def __init__(self, data: dict):
self.__data = data
@property
def id(self):
return self.__data['id']
@property
def name(self):
return self.__data['name']
def __repr__(self):
return str(self.__data['id'])

View File

@@ -1,3 +0,0 @@
from .payment import Item
from .users import User
from .me import Account

View File

@@ -1,152 +0,0 @@
class City:
def __init__(self, city_id=None, name=None, x=None, z=None, nether_x=None, nether_z=None, lane=None, role=None,
created_at=None):
self._id = city_id
self._name = name
self._x = x
self._z = z
self._nether_x = nether_x
self._nether_z = nether_z
self._lane = lane
self._role = role
self._created_at = created_at
@property
def id(self):
return self._id
@property
def name(self):
return self._name
@property
def x(self):
return self._x
@property
def z(self):
return self._z
@property
def nether_x(self):
return self._nether_x
@property
def nether_z(self):
return self._nether_z
@property
def lane(self):
return self._lane
@property
def role(self):
return self._role
@property
def created_at(self):
return self._created_at
def __repr__(self):
return (
f"City(id={self._id}, name={self._name}, x={self._x}, z={self._z}, "
f"nether_x={self._nether_x}, nether_z={self._nether_z}, lane={self._lane}, role={self._role}, "
f"created_at={self._created_at})"
)
class Card:
def __init__(self, card_id=None, name=None, number=None, color=None):
self._id = card_id
self._name = name
self._number = number
self._color = color
@property
def id(self):
return self._id
@property
def name(self):
return self._name
@property
def number(self):
return self._number
@property
def color(self):
return self._color
def __repr__(self):
return f"Card(id={self._id}, name={self._name}, number={self._number}, color={self._color})"
class Account:
def __init__(self, account_id, username, minecraftuuid, status, roles, created_at, cards, cities):
self._id = account_id
self._username = username
self._minecraftuuid = minecraftuuid
self._status = status
self._roles = roles
self._cities = [
City(
city_id=city['city_id'],
name=city['name'],
x=city['x'],
z=city['z'],
nether_x=city['nether_x'],
nether_z=city['nether_z'],
lane=city['lane'],
role=city['role'],
created_at=city['created_at'],
)
for city in cities
]
self._cards = [
Card(
card_id=card["id"],
name=card["name"],
number=card["number"],
color=card["color"],
)
for card in cards
]
self._created_at = created_at
@property
def id(self):
return self._id
@property
def username(self):
return self._username
@property
def minecraftuuid(self):
return self._minecraftuuid
@property
def status(self):
return self._status
@property
def roles(self):
return self._roles
@property
def cities(self):
return self._cities
@property
def cards(self):
return self._cards
@property
def created_at(self):
return self._created_at
def __repr__(self):
return (f"Account(id={self._id}, username={self._username}, minecraftUUID={self._minecraftuuid}, "
f"status={self._status}, roles={self._roles}, cities={self._cities}, cards={self._cards}, "
f"created_at={self._created_at})")

View File

@@ -1,18 +0,0 @@
class Item:
def __init__(self, name: str, count: int, price: int, comment: str):
self._name = name
self._count = count
self._price = price
self._comment = comment
@property
def name(self):
return self._name
def to_json(self):
return {
"name": self._name,
"count": self._count,
"price": self._price,
"comment": self._comment
}

View File

@@ -1,51 +0,0 @@
class Cards:
def __init__(self, name, number):
self._name: str = name
self._number: str = number
@property
def name(self):
return self._name
@property
def number(self):
return self._number
# def __repr__(self):
# return "%s(%s)" % (
# self.__class__.__name__,
# self.__dict__
# )
class User:
def __init__(self, username, uuid, cards):
self._username: str = username
self._uuid: str = uuid
self._cards = [
Cards(
name=card["name"],
number=card["number"],
)
for card in cards
]
@property
def username(self):
return self._username
@property
def uuid(self):
return self._uuid
@property
def cards(self):
return self._cards
def __repr__(self):
return "%s(%s)" % (
self.__class__.__name__,
self.__dict__
)

View File

@@ -1 +1,2 @@
aiohttp>=3.8.0
requests==2.28.1
aiohttp>=3.8.0,<4.0.0

View File

@@ -1,6 +1,6 @@
import re
from setuptools import setup, find_packages
from setuptools import setup
requirements = []
with open("requirements.txt") as f:
@@ -21,6 +21,10 @@ readme = ""
with open("README.rst") as f:
readme = f.read()
packages = [
"pyspapi"
]
setup(
name='pyspapi',
license='MIT',
@@ -28,15 +32,15 @@ setup(
version=version,
url='https://github.com/deesiigneer/pyspapi',
project_urls={
"Documentation": "https://pyspapi.readthedocs.io/ru/latest/",
"pyspapi documentation": "https://pyspapi.readthedocs.io/",
"api documentation": "https://spworlds.readthedocs.io/",
"GitHub": "https://github.com/deesiigneer/pyspapi",
"Discord": "https://discord.com/invite/VbyHaKRAaN"
},
description='API wrapper for SP servers written in Python',
long_description=readme,
long_description_content_type='text/x-rst',
packages=find_packages(),
package_data={'pyspapi': ['types/*', 'api/*']}, # Включаем дополнительные файлы и папки
packages=packages,
include_package_data=True,
install_requires=requirements,
python_requires='>=3.8.0',