Skip to content

Commit

Permalink
Merge dev Branch Into master (#187)
Browse files Browse the repository at this point in the history
* Add geetest v4 support, refactor geetest server code

* Don't use pipe for older python versions compatibility

* feat: Add create geetest functionality

* Use client cookies in `create_geetest` method

* Refactor and modelify auth component

* Don't use pipe for older python versions compatibility (2)

* feat: Add CNWebLoginResult and MobileLoginResult

* refactor: Rename methods for better clarity

* Add geetest proxying to bypass referrer check

* Add hmac hashing function

* Implement game login

* Add regions decorators

* Add error handling to `create_mmt` method

* Add HI3 support

* Refactor qr code models

* Use game-specific `app_id`

* refactor: Remove unnecessary cn login headers

* refactor: Remove unnecessary headers

* style: Format imports

* feat: Use client's game when requesting qr code

* Don't use explicit defaults in overloads

* Use proper RSA key for cn routes

* Merge captcha-related HTML

* Reformat and fix type errors

* Fix QR code imports

* Add ZZZ support in game login

* fix: Fix mypy errors

Fixes error: "type" expects no type arguments, but 1 given [type-arg]
Fixes error: "dict" is not subscriptable, use "typing.Dict" instead  [misc]

* Fix HI3 new battle suit type

* Fix icon assertion in tests

* Fix QR code login invalid cookies error

* Fix test failing because icon URL changed

* Fix Signora test icon URL causing test to fail

* Remove AccountNotFound exception test

* Add `archon_quest_progress` to genshin notes

* Add missing models to __all__

* Add value to ArchonQuestStatus enum

* Fix daily check-in for Miyoushe Genshin not working

* Bypass referer check without using proxy

* Allow passing encrypted credentials to all auth funcs

* Fix cn daily check-in and update salt

* Auto-detect account server during daily check-in

* Rename HSR character path field and use enum

* Add merged `login_with_password` method

* Add missing `encrypted` arg

* Rename `encrypt_geetest_credentials` into `encrypt_credentials`

* Remove usage of pydantic V2 stuff

* Ensure Pydantic V1 compatibility

* Add exceptions for auth component

* Prevent type check from failing

---------

Co-authored-by: JokelBaf <[email protected]>
Co-authored-by: Lalatina <[email protected]>
  • Loading branch information
3 people authored May 23, 2024
1 parent c32ed4b commit 1a1c03b
Show file tree
Hide file tree
Showing 45 changed files with 2,262 additions and 992 deletions.
6 changes: 3 additions & 3 deletions genshin/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ async def honkai_stats(client: genshin.Client, uid: int) -> None:
for k, v in data.stats.as_dict(lang=client.lang).items():
if isinstance(v, dict):
click.echo(f"{k}:")
for nested_k, nested_v in typing.cast("dict[str, object]", v).items():
for nested_k, nested_v in typing.cast("typing.Dict[str, object]", v).items():
click.echo(f" {nested_k}: {click.style(str(nested_v), bold=True)}")
else:
click.echo(f"{k}: {click.style(str(v), bold=True)}")
Expand Down Expand Up @@ -338,8 +338,8 @@ def authkey() -> None:
async def login(account: str, password: str, port: int) -> None:
"""Login with a password."""
client = genshin.Client()
cookies = await client.login_with_password(account, password, port=port)
cookies = await genshin.complete_cookies(cookies)
result = await client.os_login_with_password(account, password, port=port)
cookies = await genshin.complete_cookies(result.dict())

base: http.cookies.BaseCookie[str] = http.cookies.BaseCookie(cookies)
click.echo(f"Your cookies are: {click.style(base.output(header='', sep=';'), bold=True)}")
Expand Down
4 changes: 2 additions & 2 deletions genshin/client/clients.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
"""A simple HTTP client for API endpoints."""

from .components import (
auth,
calculator,
chronicle,
daily,
diary,
gacha,
geetest,
hoyolab,
lineup,
teapot,
Expand All @@ -28,6 +28,6 @@ class Client(
wiki.WikiClient,
gacha.WishClient,
transaction.TransactionClient,
geetest.GeetestClient,
auth.AuthClient,
):
"""A simple HTTP client for API endpoints."""
10 changes: 10 additions & 0 deletions genshin/client/components/auth/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
"""Auth-related utility.
Credits to:
- JokelBaf - https://github.com/jokelbaf
- Seria - https://github.com/seriaati
- M-307 - https://github.com/mrwan200
- gsuid_core - https://github.com/Genshin-bots/gsuid_core
"""

from .client import *
343 changes: 343 additions & 0 deletions genshin/client/components/auth/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,343 @@
"""Main auth client."""

import asyncio
import logging
import typing

import aiohttp

from genshin import errors, types
from genshin.client import routes
from genshin.client.components import base
from genshin.client.manager import managers
from genshin.client.manager.cookie import fetch_cookie_token_with_game_token, fetch_stoken_with_game_token
from genshin.models.auth.cookie import (
AppLoginResult,
CNWebLoginResult,
GameLoginResult,
MobileLoginResult,
QRLoginResult,
WebLoginResult,
)
from genshin.models.auth.geetest import MMT, RiskyCheckMMT, RiskyCheckMMTResult, SessionMMT, SessionMMTResult
from genshin.models.auth.qrcode import QRCodeStatus
from genshin.models.auth.verification import ActionTicket
from genshin.types import Game
from genshin.utility import auth as auth_utility
from genshin.utility import ds as ds_utility

from . import server, subclients

__all__ = ["AuthClient"]

LOGGER_ = logging.getLogger(__name__)


class AuthClient(subclients.AppAuthClient, subclients.WebAuthClient, subclients.GameAuthClient):
"""Auth client component."""

async def login_with_password(
self,
account: str,
password: str,
*,
port: int = 5000,
encrypted: bool = False,
geetest_solver: typing.Optional[typing.Callable[[SessionMMT], typing.Awaitable[SessionMMTResult]]] = None,
) -> typing.Union[WebLoginResult, CNWebLoginResult]:
"""Login with a password via web endpoint.
Endpoint is chosen based on client region.
Note that this will start a webserver if captcha is
triggered and `geetest_solver` is not passed.
Raises
------
- AccountLoginFail: Invalid password provided.
- AccountDoesNotExist: Invalid email/username.
"""
if self.region is types.Region.CHINESE:
return await self.cn_login_with_password(
account, password, encrypted=encrypted, port=port, geetest_solver=geetest_solver
)

return await self.os_login_with_password(
account, password, port=port, encrypted=encrypted, geetest_solver=geetest_solver
)

@base.region_specific(types.Region.OVERSEAS)
async def os_login_with_password(
self,
account: str,
password: str,
*,
port: int = 5000,
encrypted: bool = False,
token_type: typing.Optional[int] = 6,
geetest_solver: typing.Optional[typing.Callable[[SessionMMT], typing.Awaitable[SessionMMTResult]]] = None,
) -> WebLoginResult:
"""Login with a password via web endpoint.
Note that this will start a webserver if captcha is
triggered and `geetest_solver` is not passed.
Raises
------
- AccountLoginFail: Invalid password provided.
- AccountDoesNotExist: Invalid email/username.
"""
result = await self._os_web_login(account, password, encrypted=encrypted, token_type=token_type)

if not isinstance(result, SessionMMT):
# Captcha not triggered
return result

if geetest_solver:
mmt_result = await geetest_solver(result)
else:
mmt_result = await server.solve_geetest(result, port=port)

return await self._os_web_login(
account, password, encrypted=encrypted, token_type=token_type, mmt_result=mmt_result
)

@base.region_specific(types.Region.CHINESE)
async def cn_login_with_password(
self,
account: str,
password: str,
*,
encrypted: bool = False,
port: int = 5000,
geetest_solver: typing.Optional[typing.Callable[[SessionMMT], typing.Awaitable[SessionMMTResult]]] = None,
) -> CNWebLoginResult:
"""Login with a password via Miyoushe loginByPassword endpoint.
Note that this will start a webserver if captcha is
triggered and `geetest_solver` is not passed.
"""
result = await self._cn_web_login(account, password, encrypted=encrypted)

if not isinstance(result, SessionMMT):
# Captcha not triggered
return result

if geetest_solver:
mmt_result = await geetest_solver(result)
else:
mmt_result = await server.solve_geetest(result, port=port)

return await self._cn_web_login(account, password, encrypted=encrypted, mmt_result=mmt_result)

@base.region_specific(types.Region.OVERSEAS)
async def check_mobile_number_validity(self, mobile: str) -> bool:
"""Check if a mobile number is valid (it's registered on Miyoushe).
Returns True if the mobile number is valid, False otherwise.
"""
async with aiohttp.ClientSession() as session:
async with session.get(
routes.CHECK_MOBILE_VALIDITY_URL.get_url(),
params={"mobile": mobile},
) as r:
data = await r.json()

return data["data"]["status"] != data["data"]["is_registable"]

@base.region_specific(types.Region.CHINESE)
async def login_with_mobile_number(
self,
mobile: str,
*,
encrypted: bool = False,
port: int = 5000,
) -> MobileLoginResult:
"""Login with mobile number, returns cookies.
Only works for Chinese region (Miyoushe) users, do not include
area code (+86) in the mobile number.
Steps:
1. Sends OTP to the provided mobile number.
2. If captcha is triggered, prompts the user to solve it.
3. Lets user enter the OTP.
4. Logs in with the OTP.
5. Returns cookies.
"""
result = await self._send_mobile_otp(mobile, encrypted=encrypted)

if isinstance(result, SessionMMT):
# Captcha triggered
mmt_result = await server.solve_geetest(result, port=port)
await self._send_mobile_otp(mobile, encrypted=encrypted, mmt_result=mmt_result)

otp = await server.enter_code(port=port)
return await self._login_with_mobile_otp(mobile, otp, encrypted=encrypted)

@base.region_specific(types.Region.OVERSEAS)
async def login_with_app_password(
self,
account: str,
password: str,
*,
encrypted: bool = False,
port: int = 5000,
geetest_solver: typing.Optional[typing.Callable[[SessionMMT], typing.Awaitable[SessionMMTResult]]] = None,
) -> AppLoginResult:
"""Login with a password via HoYoLab app endpoint.
Note that this will start a webserver if either of the
following happens:
1. Captcha is triggered and `geetest_solver` is not passed.
2. Email verification is triggered (can happen if you
first login with a new device).
Raises
------
- AccountLoginFail: Invalid password provided.
- AccountDoesNotExist: Invalid email/username.
- VerificationCodeRateLimited: Too many verification code requests.
"""
result = await self._app_login(account, password, encrypted=encrypted)

if isinstance(result, SessionMMT):
# Captcha triggered
if geetest_solver:
mmt_result = await geetest_solver(result)
else:
mmt_result = await server.solve_geetest(result, port=port)

result = await self._app_login(account, password, encrypted=encrypted, mmt_result=mmt_result)

if isinstance(result, ActionTicket):
# Email verification required
mmt = await self._send_verification_email(result)
if mmt:
if geetest_solver:
mmt_result = await geetest_solver(mmt)
else:
mmt_result = await server.solve_geetest(mmt, port=port)

await asyncio.sleep(2) # Add delay to prevent [-3206]
await self._send_verification_email(result, mmt_result=mmt_result)

code = await server.enter_code(port=port)
await self._verify_email(code, result)

result = await self._app_login(account, password, encrypted=encrypted, ticket=result)

return result

@base.region_specific(types.Region.CHINESE)
async def login_with_qrcode(self) -> QRLoginResult:
"""Login with QR code, only available for Miyoushe users."""
import qrcode
import qrcode.image.pil
from qrcode.constants import ERROR_CORRECT_L

creation_result = await self._create_qrcode()
qrcode_: qrcode.image.pil.PilImage = qrcode.make(creation_result.url, error_correction=ERROR_CORRECT_L) # type: ignore
qrcode_.show()

scanned = False
while True:
check_result = await self._check_qrcode(
creation_result.app_id, creation_result.device_id, creation_result.ticket
)
if check_result.status == QRCodeStatus.SCANNED and not scanned:
LOGGER_.info("QR code scanned")
scanned = True
elif check_result.status == QRCodeStatus.CONFIRMED:
LOGGER_.info("QR code login confirmed")
break

await asyncio.sleep(2)

raw_data = check_result.payload.raw
assert raw_data is not None

cookie_token = await fetch_cookie_token_with_game_token(
game_token=raw_data.game_token, account_id=raw_data.account_id
)
stoken = await fetch_stoken_with_game_token(game_token=raw_data.game_token, account_id=int(raw_data.account_id))

cookies = {
"stoken_v2": stoken.token,
"ltuid": stoken.aid,
"account_id": stoken.aid,
"ltmid": stoken.mid,
"cookie_token": cookie_token,
}
self.set_cookies(cookies)
return QRLoginResult(**cookies)

@base.region_specific(types.Region.CHINESE)
@managers.no_multi
async def create_mmt(self) -> MMT:
"""Create a geetest challenge."""
is_genshin = self.game is Game.GENSHIN
headers = {
"DS": ds_utility.generate_create_geetest_ds(),
"x-rpc-challenge_game": "2" if is_genshin else "6",
"x-rpc-page": "v4.1.5-ys_#ys" if is_genshin else "v1.4.1-rpg_#/rpg",
"x-rpc-tool-verison": "v4.1.5-ys" if is_genshin else "v1.4.1-rpg",
**auth_utility.CREATE_MMT_HEADERS,
}

assert isinstance(self.cookie_manager, managers.CookieManager)
async with self.cookie_manager.create_session() as session:
async with session.get(
routes.CREATE_MMT_URL.get_url(), headers=headers, cookies=self.cookie_manager.cookies
) as r:
data = await r.json()

if not data["data"]:
errors.raise_for_retcode(data)

return MMT(**data["data"])

@base.region_specific(types.Region.OVERSEAS)
async def os_game_login(
self,
account: str,
password: str,
*,
encrypted: bool = False,
port: int = 5000,
geetest_solver: typing.Optional[typing.Callable[[RiskyCheckMMT], typing.Awaitable[RiskyCheckMMTResult]]] = None,
) -> GameLoginResult:
"""Perform a login to the game.
Raises
------
- IncorrectGameAccount: Invalid account provided.
- IncorrectGamePassword: Invalid password provided.
"""
result = await self._shield_login(account, password, encrypted=encrypted)

if isinstance(result, RiskyCheckMMT):
if geetest_solver:
mmt_result = await geetest_solver(result)
else:
mmt_result = await server.solve_geetest(result, port=port)

result = await self._shield_login(account, password, encrypted=encrypted, mmt_result=mmt_result)

if not result.device_grant_required:
return await self._os_game_login(result.account.uid, result.account.token)

mmt = await self._send_game_verification_email(result.account.device_grant_ticket)
if mmt:
if geetest_solver:
mmt_result = await geetest_solver(mmt)
else:
mmt_result = await server.solve_geetest(mmt, port=port)

await self._send_game_verification_email(result.account.device_grant_ticket, mmt_result=mmt_result)

code = await server.enter_code()
verification_result = await self._verify_game_email(code, result.account.device_grant_ticket)

return await self._os_game_login(result.account.uid, verification_result.game_token)
Loading

0 comments on commit 1a1c03b

Please sign in to comment.