mirror of
https://github.com/deesiigneer/pyspapi.git
synced 2026-04-20 12:35:26 +00:00
Compare commits
1 Commits
83d4308906
...
3.0.0a0
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c08394addf |
@@ -48,7 +48,7 @@ Checking the balance
|
||||
|
||||
import pyspapi
|
||||
|
||||
print(pyspapi.SPAPI(card_id='card_id', token='token').balance)
|
||||
print(await pyspapi.API(card_id='card_id', token='token').balance)
|
||||
|
||||
More examples can be found in the `examples <https://github.com/deesiigneer/pyspapi/tree/main/examples>`_
|
||||
|
||||
@@ -57,5 +57,6 @@ Links
|
||||
|
||||
- `Discord server <https://discord.gg/VbyHaKRAaN>`_
|
||||
- `pyspapi documentation <https://pyspapi.readthedocs.io/>`_
|
||||
- `API documentation <https://spworlds.readthedocs.io>`_
|
||||
- `PyPi <https://pypi.org/project/pyspapi/>`_
|
||||
- `API documentation for SP sites <https://github.com/sp-worlds/api-docs>`_
|
||||
|
||||
12
examples/get_name_history.py
Normal file
12
examples/get_name_history.py
Normal file
@@ -0,0 +1,12 @@
|
||||
import pyspapi
|
||||
import asyncio
|
||||
|
||||
api = pyspapi.API(card_id='card_id', token='token')
|
||||
|
||||
|
||||
async def main():
|
||||
print(await api.get_name_history(uuid='63ed47877aa3470fbfc46c5356c3d797'))
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
loop.run_until_complete(main())
|
||||
|
||||
15
examples/get_profile.py
Normal file
15
examples/get_profile.py
Normal file
@@ -0,0 +1,15 @@
|
||||
import pyspapi
|
||||
import asyncio
|
||||
|
||||
api = pyspapi.API(card_id='card_id', token='token')
|
||||
|
||||
|
||||
async def main():
|
||||
mojang_profile = await api.get_profile(uuid='63ed47877aa3470fbfc46c5356c3d797')
|
||||
print(mojang_profile)
|
||||
print(mojang_profile.id, mojang_profile.timestamp)
|
||||
print(mojang_profile.skin, mojang_profile.skin.model, mojang_profile.skin.cape_url, mojang_profile.skin.url)
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
loop.run_until_complete(main())
|
||||
|
||||
21
examples/get_user and get_users.py
Normal file
21
examples/get_user and get_users.py
Normal file
@@ -0,0 +1,21 @@
|
||||
import pyspapi
|
||||
import asyncio
|
||||
|
||||
|
||||
api = pyspapi.API(card_id='card_id', token='token')
|
||||
|
||||
|
||||
async def main():
|
||||
user = await api.get_user(264329096920563714)
|
||||
print(user)
|
||||
print(user.access)
|
||||
# У API есть лимиты, каждый user = 1 запрос, учитывайте это при использовании get_users
|
||||
# https://spworlds.readthedocs.io/ru/latest/index.html#id3
|
||||
users = await api.get_users([262632724928397312, 264329096920563714])
|
||||
for user in users:
|
||||
print(user)
|
||||
if user is not None:
|
||||
print(user.access)
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
loop.run_until_complete(main())
|
||||
17
examples/get_uuid and get_uuids.py
Normal file
17
examples/get_uuid and get_uuids.py
Normal file
@@ -0,0 +1,17 @@
|
||||
import pyspapi
|
||||
|
||||
import pyspapi
|
||||
import asyncio
|
||||
|
||||
api = pyspapi.API(card_id='card_id', token='token')
|
||||
|
||||
|
||||
async def main():
|
||||
uuid = await pyspapi.API(card_id='card_id', token='token').get_uuid(username='deesiigneer')
|
||||
print(uuid)
|
||||
print(uuid.id, uuid.name)
|
||||
print(await pyspapi.API(card_id='card_id', token='token').get_uuids(['deesiigneer', '5opka', 'OsterMiner']))
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
loop.run_until_complete(main())
|
||||
|
||||
@@ -1,3 +0,0 @@
|
||||
import pyspapi
|
||||
|
||||
print(pyspapi.MojangAPI.get_name_history(uuid='63ed47877aa3470fbfc46c5356c3d797'))
|
||||
@@ -1,20 +0,0 @@
|
||||
import pyspapi
|
||||
|
||||
print(pyspapi.MojangAPI.get_profile(uuid='63ed47877aa3470fbfc46c5356c3d797'))
|
||||
|
||||
print(pyspapi.MojangAPI.get_profile(uuid='63ed47877aa3470fbfc46c5356c3d797').timestamp)
|
||||
|
||||
print(pyspapi.MojangAPI.get_profile(uuid='63ed47877aa3470fbfc46c5356c3d797').id)
|
||||
|
||||
print(pyspapi.MojangAPI.get_profile(uuid='63ed47877aa3470fbfc46c5356c3d797').name)
|
||||
|
||||
print(pyspapi.MojangAPI.get_profile(uuid='63ed47877aa3470fbfc46c5356c3d797').is_legacy_profile)
|
||||
|
||||
print(pyspapi.MojangAPI.get_profile(uuid='63ed47877aa3470fbfc46c5356c3d797').cape_url)
|
||||
|
||||
print(pyspapi.MojangAPI.get_profile(uuid='63ed47877aa3470fbfc46c5356c3d797').skin_url)
|
||||
|
||||
print(pyspapi.MojangAPI.get_profile(uuid='63ed47877aa3470fbfc46c5356c3d797').skin_model)
|
||||
|
||||
print(pyspapi.MojangAPI.get_profile(uuid='63ed47877aa3470fbfc46c5356c3d797').skin)
|
||||
|
||||
@@ -1,3 +0,0 @@
|
||||
import pyspapi
|
||||
|
||||
print(pyspapi.MojangAPI.get_username(uuid='63ed47877aa3470fbfc46c5356c3d797'))
|
||||
@@ -1,5 +0,0 @@
|
||||
import pyspapi
|
||||
|
||||
print(pyspapi.MojangAPI.get_uuid(username='deesiigneer'))
|
||||
|
||||
print(pyspapi.MojangAPI.get_uuids(['deesiigneer', '5opka', 'OsterMiner']))
|
||||
16
examples/payments.py
Normal file
16
examples/payments.py
Normal file
@@ -0,0 +1,16 @@
|
||||
import pyspapi
|
||||
import asyncio
|
||||
|
||||
api = pyspapi.API(card_id='card_id', token='token')
|
||||
|
||||
|
||||
async def main():
|
||||
print(await api.payment(amount=1,
|
||||
redirect_url='https://www.google.com/',
|
||||
webhook_url='https://www.google.com/',
|
||||
data='some-data'
|
||||
)
|
||||
)
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
loop.run_until_complete(main())
|
||||
@@ -1,5 +0,0 @@
|
||||
import pyspapi
|
||||
|
||||
spapi = pyspapi.SPAPI(card_id='card_id', token='token')
|
||||
|
||||
print(spapi.check_users_access([262632724928397312, 264329096920563714]))
|
||||
@@ -1,11 +0,0 @@
|
||||
import pyspapi
|
||||
|
||||
spapi = pyspapi.SPAPI(card_id='card_id', token='token')
|
||||
|
||||
print(spapi.get_user(262632724928397312))
|
||||
|
||||
print(spapi.get_user(262632724928397312).username)
|
||||
|
||||
print(spapi.get_user(262632724928397312).access)
|
||||
|
||||
print(spapi.get_users([262632724928397312, 264329096920563714]))
|
||||
@@ -1,10 +0,0 @@
|
||||
import pyspapi
|
||||
|
||||
spapi = pyspapi.SPAPI(card_id='card_id', token='token')
|
||||
|
||||
print(spapi.payment(amount=1,
|
||||
redirect_url='https://www.google.com/',
|
||||
webhook_url='https://www.google.com/',
|
||||
data='some-data'
|
||||
)
|
||||
)
|
||||
@@ -1,9 +0,0 @@
|
||||
import pyspapi
|
||||
|
||||
spapi = pyspapi.SPAPI(card_id='CARD_ID', token='TOKEN')
|
||||
|
||||
print(spapi.transaction(receiver=12345,
|
||||
amount=1,
|
||||
comment="test"
|
||||
)
|
||||
)
|
||||
@@ -1,5 +0,0 @@
|
||||
import pyspapi
|
||||
|
||||
spapi = pyspapi.SPAPI(card_id='your_card_id', token='your_token')
|
||||
|
||||
print(spapi.webhook_verify(data='webhook_data', header='webhook_header'))
|
||||
15
examples/transaction.py
Normal file
15
examples/transaction.py
Normal file
@@ -0,0 +1,15 @@
|
||||
import pyspapi
|
||||
import asyncio
|
||||
|
||||
api = pyspapi.API(card_id='CARD_ID', token='TOKEN')
|
||||
|
||||
|
||||
async def main():
|
||||
print(await api.transaction(receiver=12345,
|
||||
amount=1,
|
||||
comment="test"
|
||||
)
|
||||
)
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
loop.run_until_complete(main())
|
||||
5
examples/webhook_listener.py
Normal file
5
examples/webhook_listener.py
Normal file
@@ -0,0 +1,5 @@
|
||||
import pyspapi
|
||||
|
||||
api = pyspapi.API(card_id='your_card_id', token='your_token')
|
||||
|
||||
api.listener(host='myhost.com', port=80, webhook_path='/webhook/')
|
||||
10
examples/webhook_verify.py
Normal file
10
examples/webhook_verify.py
Normal file
@@ -0,0 +1,10 @@
|
||||
import pyspapi
|
||||
import asyncio
|
||||
|
||||
api = pyspapi.API(card_id='your_card_id', token='your_token')
|
||||
|
||||
async def main():
|
||||
print(await api.webhook_verify(data='webhook_data', header='webhook_header'))
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
loop.run_until_complete(main())
|
||||
@@ -1,3 +1,4 @@
|
||||
from .api import *
|
||||
from .api import API
|
||||
from .types import SPUser, MojangProfile, Skin, UsernameToUUID
|
||||
|
||||
__version__ = "2.1.2"
|
||||
__version__ = "3.0.0a0"
|
||||
|
||||
352
pyspapi/api.py
352
pyspapi/api.py
@@ -1,98 +1,145 @@
|
||||
import json.decoder
|
||||
from sys import version_info
|
||||
import ast
|
||||
import warnings
|
||||
import asyncio
|
||||
from aiohttp import web
|
||||
from sys import version_info
|
||||
from base64 import b64encode, b64decode
|
||||
from hmac import new, compare_digest
|
||||
from hashlib import sha256
|
||||
from logging import getLogger
|
||||
from requests import get, post, Response
|
||||
from typing import Any, Dict, List, Optional
|
||||
from .models import MojangUserProfile, SPUserProfile
|
||||
import warnings
|
||||
from typing import Any, Dict, List, Optional, Union
|
||||
from .types import SPUser, MojangProfile, UsernameToUUID
|
||||
from .request import Request
|
||||
from .errors import Error
|
||||
|
||||
log = getLogger('pyspapi')
|
||||
|
||||
|
||||
class _Error(Exception):
|
||||
"""
|
||||
class _BaseAPI:
|
||||
|
||||
"""
|
||||
def __init__(self, message: Optional[str] = None):
|
||||
self.message = message if message else self.__class__.__doc__
|
||||
super().__init__(self.message)
|
||||
|
||||
|
||||
class SPAPI:
|
||||
"""
|
||||
class SPAPI
|
||||
"""
|
||||
_SPWORLDS_DOMAIN_ = "https://spworlds.ru/api/public"
|
||||
_SPWORLDS = "https://spworlds.ru/api/public"
|
||||
_API_MOJANG = "https://api.mojang.com"
|
||||
_SESSIONSERVER_MOJANG = "https://sessionserver.mojang.com"
|
||||
|
||||
def __init__(self, card_id: str, token: str):
|
||||
self.__id = card_id
|
||||
self.__token = token
|
||||
self.__header = {
|
||||
'Authorization': f"Bearer {str(b64encode(str(f'{self.__id}:{self.__token}').encode('utf-8')), 'utf-8')}",
|
||||
"""
|
||||
:param card_id:
|
||||
:param token:
|
||||
"""
|
||||
self._id = card_id
|
||||
self._token = token
|
||||
|
||||
self._HEADER = {
|
||||
'Authorization': f"Bearer {str(b64encode(str(f'{self._id}:{self._token}').encode('utf-8')), 'utf-8')}",
|
||||
'User-Agent': f'pyspapi (https://github.com/deesiigneer/pyspapi) '
|
||||
f'Python {version_info.major}.{version_info.minor}.{version_info.micro}'
|
||||
}
|
||||
self.balance = self.__check_balance()
|
||||
|
||||
def __make_request(self, method: str, path: str, data: Optional[dict]) -> Optional[Response]:
|
||||
if method == 'GET':
|
||||
response = get(self._SPWORLDS_DOMAIN_ + path, headers=self.__header)
|
||||
return response
|
||||
elif method == 'POST':
|
||||
response = post(self._SPWORLDS_DOMAIN_ + path, headers=self.__header, json=data)
|
||||
return response
|
||||
async def webhook_verify(self, data: str, header) -> bool:
|
||||
"""
|
||||
Проверяет достоверность webhook'а. \n
|
||||
:param data: data из webhook.
|
||||
:param header: header X-Body-Hash из webhook.
|
||||
:return: True если header из webhook'а достоверен, иначе False
|
||||
"""
|
||||
print()
|
||||
hmac_data = b64encode(new(self._token.encode('utf-8'), data, sha256).digest())
|
||||
return compare_digest(hmac_data, header.encode('utf-8'))
|
||||
|
||||
def get_user(self, user_id: int) -> Optional[SPUserProfile]:
|
||||
def listener(self, host: str = '127.0.0.1', port: int = 80, webhook_path: str = '/webhook/'):
|
||||
app = web.Application()
|
||||
async def handle(request):
|
||||
request_data = await request.read()
|
||||
header = request.headers.get('X-Body-Hash')
|
||||
if header is not None:
|
||||
if await self.webhook_verify(data=request_data, header=header) is True:
|
||||
web.json_response(status=202)
|
||||
return True
|
||||
else:
|
||||
web.json_response(status=400)
|
||||
else:
|
||||
return web.json_response(status=404)
|
||||
app.add_routes([web.get(webhook_path, handle)])
|
||||
web.run_app(app, port=port, host=host)
|
||||
|
||||
|
||||
class API(_BaseAPI):
|
||||
"""
|
||||
class API
|
||||
"""
|
||||
|
||||
async def get_user(self, user_id: int) -> Optional[SPUser]:
|
||||
"""
|
||||
Получение информации об игроке SP \n
|
||||
:param user_id: ID пользователя в Discord.
|
||||
:return: Class SPUserProfile.
|
||||
"""
|
||||
response = self.__make_request('GET', f'/users/{str(user_id)}', data=None)
|
||||
if not response.ok:
|
||||
:return: Class User.
|
||||
"""
|
||||
sp_user = await Request.get(f'{self._SPWORLDS}/users/{str(user_id)}', self._HEADER)
|
||||
if sp_user is not None:
|
||||
return SPUser(await Request.get(f'{self._SPWORLDS}/users/{str(user_id)}', self._HEADER))
|
||||
else:
|
||||
return None
|
||||
try:
|
||||
username = response.json()['username']
|
||||
return SPUserProfile(access=True if username is not None else False, username=username)
|
||||
except json.decoder.JSONDecodeError:
|
||||
return
|
||||
|
||||
def get_users(self, user_ids: List[int]) -> List[str]:
|
||||
async def get_users(self, user_ids: List[int]) -> Union[SPUser, Any]:
|
||||
"""
|
||||
Получение никнеймов игроков в майнкрафте. **Не более 10**\n
|
||||
Получение никнеймов игроков в майнкрафте. **Максимально можно указать 60 user_ids, не используйте эту функцию
|
||||
чаще 1 раза в минуту если указали больше 60 user_ids**\n
|
||||
https://spworlds.readthedocs.io/ru/latest/index.html#id3\n
|
||||
:param user_ids: List[int] ID пользователей в Discord.
|
||||
:return: List[str] который содержит майнкрафт никнеймы игроков в том же порядке, который был задан,
|
||||
None если пользователь не найден или нет проходки.
|
||||
"""
|
||||
if len(user_ids) > 10:
|
||||
user_ids = user_ids[:10]
|
||||
warnings.warn('user_ids more than 10. Reduced to 10')
|
||||
nicknames_list = []
|
||||
for user_id in user_ids:
|
||||
nicknames_list.append(self.get_user(user_id).username
|
||||
if self.get_user(user_id) is not None else None)
|
||||
return nicknames_list
|
||||
|
||||
def check_users_access(self, user_ids: List[int]) -> List[bool]:
|
||||
:return: List[str] который содержит майнкрафт никнеймы игроков в том же порядке, который был задан, None если
|
||||
пользователь не найден или нет проходки.
|
||||
"""
|
||||
Проверка наличия проходки у списка пользователей Discord. **Не более 10**\n
|
||||
:param user_ids: Список(List[int]) содержащий ID пользователей в Discord.
|
||||
:return: Список(List[bool]) в том же порядке, который был задан.True - проходка имеется, иначе False.
|
||||
"""
|
||||
if len(user_ids) > 10:
|
||||
user_ids = user_ids[:10]
|
||||
warnings.warn('user_ids more than 10. Reduced to 10')
|
||||
ids_list = []
|
||||
if len(user_ids) > 60:
|
||||
user_ids = user_ids[:60]
|
||||
warnings.warn('user_ids больше чем 60. Уменьшено до 60.')
|
||||
tasks = []
|
||||
for user_id in user_ids:
|
||||
ids_list.append(self.get_user(user_id).access
|
||||
if self.get_user(user_id) is not None else None)
|
||||
return ids_list
|
||||
tasks.append(self.get_user(user_id))
|
||||
return await asyncio.gather(*tasks, return_exceptions=True)
|
||||
|
||||
def payment(self, amount: int, redirect_url: str, webhook_url: str, data: str) -> Optional[str]:
|
||||
async def get_uuid(self, username: str) -> Optional[UsernameToUUID]:
|
||||
"""
|
||||
Получить UUID игрока Minecraft.\n
|
||||
:param username: str никнейм игрока Minecraft.
|
||||
:return: Optional[str] UUID игрока Minecraft.
|
||||
"""
|
||||
response = await Request.get(f'{self._API_MOJANG}/users/profiles/minecraft/{username}')
|
||||
return UsernameToUUID(await Request.get(f'{self._API_MOJANG}/users/profiles/minecraft/{username}'))
|
||||
|
||||
async def get_uuids(self, usernames: list[str]) -> Dict[str, str]:
|
||||
"""
|
||||
Получить UUID's игроков Minecraft. **Не больше 10**\n
|
||||
:param usernames: List[str] Список с никнеймами игроков Minecraft.
|
||||
:return: Dict[str, str] UUID игроков Minecraft.
|
||||
"""
|
||||
if len(usernames) > 10:
|
||||
usernames = usernames[:10]
|
||||
warnings.warn('usernames больше чем 10. Уменьшено до 10.')
|
||||
return await Request.post(f'{self._API_MOJANG}/profiles/minecraft', payload=usernames)
|
||||
|
||||
async def get_name_history(self, uuid: str) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
История никнеймов в Minecraft.\n
|
||||
:param uuid: UUID игрока Minecraft.
|
||||
:return: List[Dict[str, Any]] который содержит name и changed_to_at
|
||||
"""
|
||||
requests = await Request.get(f"{self._API_MOJANG}/user/profiles/{uuid}/names")
|
||||
|
||||
name_data = []
|
||||
for data in requests:
|
||||
name_data_dict = {"name": data["name"]}
|
||||
if data.get("changedToAt"):
|
||||
name_data_dict["changed_to_at"] = data["changedToAt"]
|
||||
else:
|
||||
name_data_dict["changed_to_at"] = 0
|
||||
name_data.append(name_data_dict)
|
||||
return name_data
|
||||
|
||||
async def get_profile(self, uuid: str) -> MojangProfile:
|
||||
response = await Request.get(f'{self._SESSIONSERVER_MOJANG}/session/minecraft/profile/{uuid}')
|
||||
return MojangProfile(ast.literal_eval(b64decode(response["properties"][0]["value"]).decode()))
|
||||
|
||||
async def payment(self, amount: int, redirect_url: str, webhook_url: str, data: str) -> Optional[str]:
|
||||
"""
|
||||
Создание ссылки для оплаты.\n
|
||||
:param amount: Стоимость покупки в АРах.
|
||||
@@ -102,32 +149,16 @@ class SPAPI:
|
||||
:return: Ссылку на страницу оплаты, на которую стоит перенаправить пользователя.
|
||||
"""
|
||||
if len(data) > 100:
|
||||
raise _Error('В data больше 100 символов')
|
||||
body = {
|
||||
'amount': amount,
|
||||
'redirectUrl': redirect_url,
|
||||
'webhookUrl': webhook_url,
|
||||
'data': data
|
||||
}
|
||||
response = self.__make_request('POST', '/payment', data=body)
|
||||
if not response.ok:
|
||||
return None
|
||||
try:
|
||||
return response.json()['url']
|
||||
except json.decoder.JSONDecodeError:
|
||||
return None
|
||||
raise Error('В data больше 100 символов')
|
||||
return await Request.post(f'{self._SPWORLDS}/payment',
|
||||
payload={
|
||||
'amount': amount,
|
||||
'redirectUrl': redirect_url,
|
||||
'webhookUrl': webhook_url,
|
||||
'data': data},
|
||||
headers=self._HEADER)
|
||||
|
||||
def webhook_verify(self, data: str, header) -> bool:
|
||||
"""
|
||||
Проверяет достоверность webhook'а. \n
|
||||
:param data: data из webhook.
|
||||
:param header: header X-Body-Hash из webhook.
|
||||
:return: True если header из webhook'а достоверен, иначе False
|
||||
"""
|
||||
hmac_data = b64encode(new(self.__token.encode('utf-8'), data, sha256).digest())
|
||||
return compare_digest(hmac_data, header.encode('utf-8'))
|
||||
|
||||
def transaction(self, receiver: int, amount: int, comment: str) -> Optional[str]:
|
||||
async def transaction(self, receiver: int, amount: int, comment: str) -> Optional[str]:
|
||||
"""
|
||||
Перевод АР на карту. \n
|
||||
:param receiver: Номер карты получателя.
|
||||
@@ -135,134 +166,17 @@ class SPAPI:
|
||||
:param comment: Комментарий для перевода.
|
||||
:return: True если перевод успешен, иначе False.
|
||||
"""
|
||||
body = {
|
||||
'receiver': receiver,
|
||||
'amount': amount,
|
||||
'comment': comment
|
||||
}
|
||||
response = self.__make_request('POST', '/transactions', data=body)
|
||||
if not response.ok:
|
||||
return None
|
||||
try:
|
||||
return 'Success' if response.status_code == 200 else 'Fail'
|
||||
except json.decoder.JSONDecodeError:
|
||||
return None
|
||||
return 'Удачно' if await Request.post(f'{self._SPWORLDS}/transactions',
|
||||
payload={'receiver': receiver,
|
||||
'amount': amount,
|
||||
'comment': comment},
|
||||
headers=self._HEADER) else 'Что-то пошло не так...'
|
||||
|
||||
def __check_balance(self) -> Optional[int]:
|
||||
@property
|
||||
async def balance(self) -> Optional[int]:
|
||||
"""
|
||||
Проверка баланса карты \n
|
||||
:return: Количество АР на карте.
|
||||
"""
|
||||
response = self.__make_request('GET', '/card', None)
|
||||
if not response.ok:
|
||||
return None
|
||||
try:
|
||||
return response.json()['balance']
|
||||
except json.decoder.JSONDecodeError:
|
||||
return None
|
||||
|
||||
|
||||
class MojangAPI:
|
||||
"""
|
||||
class MojangAPI
|
||||
"""
|
||||
|
||||
_API_DOMAIN_ = "https://api.mojang.com"
|
||||
_SESSIONSERVER_DOMAIN_ = "https://sessionserver.mojang.com"
|
||||
|
||||
@classmethod
|
||||
def __make_request(cls, server: str, method: str, path: str, data=Optional[dict]) -> Optional[Response]:
|
||||
if server == 'API':
|
||||
if method == 'GET':
|
||||
return get(cls._API_DOMAIN_ + path)
|
||||
elif method == 'POST':
|
||||
return post(cls._API_DOMAIN_ + path, json=data)
|
||||
elif server == 'SESSION':
|
||||
if method == 'GET':
|
||||
return get(cls._SESSIONSERVER_DOMAIN_ + path)
|
||||
|
||||
@classmethod
|
||||
def get_uuid(cls, username: str) -> Optional[str]:
|
||||
"""
|
||||
Получить UUID игрока Minecraft.\n
|
||||
:param username: str никнейм игрока Minecraft.
|
||||
:return: Optional[str] UUID игрока Minecraft.
|
||||
"""
|
||||
response = cls.__make_request('API', 'GET', f'/users/profiles/minecraft/{username}')
|
||||
if not response.ok:
|
||||
return None
|
||||
|
||||
try:
|
||||
return response.json()['id']
|
||||
except json.decoder.JSONDecodeError:
|
||||
return None
|
||||
|
||||
@classmethod
|
||||
def get_uuids(cls, names: List[str]) -> Dict[str, str]:
|
||||
"""
|
||||
Получить UUID's игроков Minecraft.\n
|
||||
:param names: List[str] Список с никнеймами игроков Minecraft.
|
||||
:return: Dict[str, str] UUID игрока Minecraft.
|
||||
|
||||
"""
|
||||
if len(names) > 10:
|
||||
names = names[:10]
|
||||
response = cls.__make_request('API', 'POST', '/profiles/minecraft', data=names).json()
|
||||
if not isinstance(response, list):
|
||||
if response.get('error'):
|
||||
raise ValueError(response['errorMessage'])
|
||||
else:
|
||||
raise _Error(response)
|
||||
return {uuids['name']: uuids['id'] for uuids in response}
|
||||
|
||||
@classmethod
|
||||
def get_username(cls, uuid: str) -> Optional[str]:
|
||||
"""
|
||||
Получить никнейм игрока.\n
|
||||
:param uuid: UUID игрока Minecraft.
|
||||
:return: Optional[str] в виде никнейма игрока Minecraft.
|
||||
"""
|
||||
response = cls.__make_request('SESSION', 'GET', f'/session/minecraft/profile/{uuid}', None)
|
||||
if not response.ok:
|
||||
return None
|
||||
try:
|
||||
return response.json()["name"]
|
||||
except json.decoder.JSONDecodeError:
|
||||
return None
|
||||
|
||||
@classmethod
|
||||
def get_profile(cls, uuid: str) -> Optional[MojangUserProfile]:
|
||||
"""
|
||||
Профиль игрока Minecraft.\n
|
||||
:param uuid: UUID игрока Minecraft.
|
||||
:return: Class MojangUserProfile
|
||||
"""
|
||||
response = cls.__make_request('SESSION', 'GET', f'/session/minecraft/profile/{uuid}')
|
||||
if not response.ok:
|
||||
return None
|
||||
try:
|
||||
value = response.json()["properties"][0]["value"]
|
||||
except (KeyError, json.decoder.JSONDecodeError):
|
||||
return None
|
||||
user_profile = ast.literal_eval(b64decode(value).decode())
|
||||
return MojangUserProfile(user_profile)
|
||||
|
||||
@classmethod
|
||||
def get_name_history(cls, uuid: str) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
История никнеймов в Minecraft.\n
|
||||
:param uuid: UUID игрока Minecraft.
|
||||
:return: List[Dict[str, Any]] который содержит name и changed_to_at
|
||||
"""
|
||||
requests = cls.__make_request('API', 'GET', f"/user/profiles/{uuid}/names")
|
||||
name_history = requests.json()
|
||||
|
||||
name_data = []
|
||||
for data in name_history:
|
||||
name_data_dict = {"name": data["name"]}
|
||||
if data.get("changedToAt"):
|
||||
name_data_dict["changed_to_at"] = data["changedToAt"]
|
||||
else:
|
||||
name_data_dict["changed_to_at"] = 0
|
||||
name_data.append(name_data_dict)
|
||||
return name_data
|
||||
balance = await Request.get(f'{self._SPWORLDS}/card', headers=self._HEADER)
|
||||
return balance['balance']
|
||||
|
||||
29
pyspapi/errors.py
Normal file
29
pyspapi/errors.py
Normal file
@@ -0,0 +1,29 @@
|
||||
class Error(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class Unauthorized(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class NotFound(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class TooManyRequests(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class UserNotFound(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def handle(response):
|
||||
if response['error'] == 'Unauthorized':
|
||||
raise Unauthorized(response['message'])
|
||||
elif response['error'] == 'Not Found':
|
||||
raise NotFound(response['message'])
|
||||
elif response['error'] == 'Too Many Requests':
|
||||
raise TooManyRequests(response['message'])
|
||||
else:
|
||||
raise Exception(response)
|
||||
@@ -1,55 +0,0 @@
|
||||
class _SPObject:
|
||||
"""Возвращает словарь всех атрибутов экземпляра"""
|
||||
def to_dict(self) -> dict:
|
||||
return self.__dict__.copy()
|
||||
|
||||
def __repr__(self):
|
||||
return "%s(%s)" % (
|
||||
self.__class__.__name__,
|
||||
self.__dict__
|
||||
)
|
||||
|
||||
|
||||
class SPUserProfile(_SPObject):
|
||||
def __init__(self,
|
||||
access: bool,
|
||||
username: str,
|
||||
):
|
||||
self.access = access
|
||||
self.username = username
|
||||
|
||||
|
||||
class _MojangObject:
|
||||
def to_dict(self) -> dict:
|
||||
"""Возвращает словарь всех атрибутов экземпляра"""
|
||||
return self.__dict__.copy()
|
||||
|
||||
def __repr__(self):
|
||||
return "%s(%s)" % (
|
||||
self.__class__.__name__,
|
||||
self.__dict__
|
||||
)
|
||||
|
||||
|
||||
class MojangUserProfile(_MojangObject):
|
||||
def __init__(self, data: dict):
|
||||
self.timestamp = data['timestamp']
|
||||
self.id = data['profileId']
|
||||
self.name = data['profileName']
|
||||
|
||||
self.is_legacy_profile = data.get('legacy')
|
||||
if self.is_legacy_profile is None:
|
||||
self.is_legacy_profile = False
|
||||
|
||||
self.cape_url = None
|
||||
self.skin_url = None
|
||||
self.skin_model = 'classic'
|
||||
|
||||
if data['textures'].get('CAPE'):
|
||||
self.cape_url = data['textures']['CAPE']['url']
|
||||
|
||||
if data['textures'].get('SKIN'):
|
||||
self.skin_url = data['textures']['SKIN']['url']
|
||||
self.skin = data['textures']['SKIN']
|
||||
if data['textures']['SKIN'].get('metadata'):
|
||||
self.skin_model = 'slim'
|
||||
31
pyspapi/request.py
Normal file
31
pyspapi/request.py
Normal file
@@ -0,0 +1,31 @@
|
||||
import aiohttp
|
||||
from .errors import handle
|
||||
from typing import Coroutine, Any, TypeVar
|
||||
|
||||
T = TypeVar("T")
|
||||
Response = Coroutine[Any, Any, T]
|
||||
|
||||
class Request:
|
||||
async def get(url: str, headers=None):
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(url, headers=headers) as session_response:
|
||||
try:
|
||||
response = await session_response.json()
|
||||
except:
|
||||
response = await session_response.text()
|
||||
if session_response.status == 404:
|
||||
return None
|
||||
if not session_response.ok:
|
||||
handle(response)
|
||||
return response
|
||||
|
||||
async def post(url: str, payload, headers=None):
|
||||
async with aiohttp.ClientSession() as session:
|
||||
session_response = await session.post(url, json=payload, headers=headers)
|
||||
try:
|
||||
response = await session_response.json()
|
||||
except:
|
||||
response = await session_response.text()
|
||||
if not session_response.ok:
|
||||
handle(response)
|
||||
return response
|
||||
63
pyspapi/types.py
Normal file
63
pyspapi/types.py
Normal file
@@ -0,0 +1,63 @@
|
||||
class SPUser:
|
||||
def __init__(self, data: dict):
|
||||
self.__data = data
|
||||
|
||||
@property
|
||||
def access(self) -> bool:
|
||||
return True if self.__data['username'] is not None else False
|
||||
|
||||
def __repr__(self):
|
||||
return self.__data['username']
|
||||
|
||||
|
||||
class MojangProfile:
|
||||
def __init__(self, data: dict):
|
||||
self.__data = data
|
||||
self.skin = Skin(data)
|
||||
|
||||
@property
|
||||
def id(self) -> str:
|
||||
return self.__data['profileId']
|
||||
|
||||
@property
|
||||
def timestamp(self):
|
||||
return self.__data['timestamp']
|
||||
|
||||
def __repr__(self):
|
||||
return self.__data['profileName']
|
||||
|
||||
|
||||
class Skin:
|
||||
def __init__(self, data: dict):
|
||||
self.__data = data
|
||||
|
||||
@property
|
||||
def url(self) -> str:
|
||||
return self.__data['textures']['SKIN']['url']
|
||||
|
||||
@property
|
||||
def cape_url(self) -> str:
|
||||
return self.__data['textures']['CAPE']['url']
|
||||
|
||||
@property
|
||||
def model(self) -> str:
|
||||
return 'classic' if self.__data['textures']['SKIN'].get('metadata') is None else 'slim'
|
||||
|
||||
def __repr__(self):
|
||||
return str(self.__data['textures']['SKIN'])
|
||||
|
||||
|
||||
class UsernameToUUID:
|
||||
def __init__(self, data: dict):
|
||||
self.__data = data
|
||||
|
||||
@property
|
||||
def id(self):
|
||||
return self.__data['id']
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
return self.__data['name']
|
||||
|
||||
def __repr__(self):
|
||||
return str(self.__data['id'])
|
||||
@@ -1 +1,2 @@
|
||||
requests==2.28.1
|
||||
aiohttp>=3.8.0,<4.0.0
|
||||
3
setup.py
3
setup.py
@@ -32,7 +32,8 @@ setup(
|
||||
version=version,
|
||||
url='https://github.com/deesiigneer/pyspapi',
|
||||
project_urls={
|
||||
"Documentation": "https://pyspapi.readthedocs.io/ru/latest/",
|
||||
"pyspapi documentation": "https://pyspapi.readthedocs.io/",
|
||||
"api documentation": "https://spworlds.readthedocs.io/",
|
||||
"GitHub": "https://github.com/deesiigneer/pyspapi",
|
||||
"Discord": "https://discord.com/invite/VbyHaKRAaN"
|
||||
},
|
||||
|
||||
Reference in New Issue
Block a user