From 58fa50f32e0be187e3fd9a76fd4f930329f31526 Mon Sep 17 00:00:00 2001 From: AirportR Date: Wed, 12 Oct 2022 11:13:57 +0800 Subject: [PATCH 1/3] improve netflix test method --- ssrspeed/speedtest/method/st_stream.py | 112 +++++++++++++++++-------- 1 file changed, 78 insertions(+), 34 deletions(-) diff --git a/ssrspeed/speedtest/method/st_stream.py b/ssrspeed/speedtest/method/st_stream.py index 089cfe6..8b92a22 100644 --- a/ssrspeed/speedtest/method/st_stream.py +++ b/ssrspeed/speedtest/method/st_stream.py @@ -55,14 +55,58 @@ async def netflix(cls, host, headers, inner_dict, port, outbound_ip): logger.error(f"Netflix error: {str(e)}") return {} + @classmethod + async def netflix_new(cls, host, headers, inner_dict, port, outbound_ip): + logger.info(f"Performing netflix(new) test LOCAL_PORT: {port}.") + try: + async with aiohttp.ClientSession( + headers=headers, + connector=ProxyConnector(host=host, port=port), + timeout=aiohttp.ClientTimeout(connect=10), + ) as session, session.get( + url="https://www.netflix.com/title/70143836" # "https://www.netflix.com/title/70242311" + ) as response1: + if response1.status == 200: + text = str(await response1.read()) + locate = text.find("preferredLocale") + netflix_ip = nf_ip_re.findall(text)[0].split( + "," + )[0] + logger.info(f"Netflix IP : {netflix_ip}") + if locate: + region = text[locate + 29:locate + 31] + else: + region = "未知" + if outbound_ip == netflix_ip: + logger.info("Netflix test result: Full Native.") + inner_dict["Ntype"] = f"Full Native({region})" + else: + logger.info("Netflix test result: Full DNS.") + inner_dict["Ntype"] = f"Full DNS({region})" + return + async with session.get( + url="https://www.netflix.com/title/70242311" + ) as response2: + rg = "" + if response2.status == 200: + logger.info("Netflix test result: Only Original.") + inner_dict["Ntype"] = "Only Original" + else: + logger.info("Netflix test result: None.") + inner_dict["Ntype"] = "None" + # 测试连接状态 + except Exception as e: + logger.error(f"Netflix error: {str(e)}") + return {} + @classmethod async def hbomax(cls, host, headers, inner_dict, port): logger.info(f"Performing HBO max test LOCAL_PORT: {port}.") try: async with aiohttp.ClientSession( - headers=headers, - connector=ProxyConnector(host=host, port=port), - timeout=aiohttp.ClientTimeout(connect=10), + headers=headers, + connector=ProxyConnector(host=host, port=port), + timeout=aiohttp.ClientTimeout(connect=10), ) as session, session.get( url="https://www.hbomax.com/", allow_redirects=False ) as response: @@ -75,9 +119,9 @@ async def disneyplus(cls, host, headers, inner_dict, port): logger.info(f"Performing Disney plus test LOCAL_PORT: {port}.") try: async with aiohttp.ClientSession( - headers=headers, - connector=ProxyConnector(host=host, port=port), - timeout=aiohttp.ClientTimeout(connect=5), + headers=headers, + connector=ProxyConnector(host=host, port=port), + timeout=aiohttp.ClientTimeout(connect=5), ) as session, session.get( url="https://www.disneyplus.com/" ) as response1, session.get( @@ -102,9 +146,9 @@ async def youtube(cls, host, headers, inner_dict, port): logger.info(f"Performing Youtube Premium test LOCAL_PORT: {port}.") try: async with aiohttp.ClientSession( - headers=headers, - connector=ProxyConnector(host=host, port=port), - timeout=aiohttp.ClientTimeout(connect=10), + headers=headers, + connector=ProxyConnector(host=host, port=port), + timeout=aiohttp.ClientTimeout(connect=10), ) as session, session.get( url="https://music.youtube.com/", allow_redirects=False ) as response: @@ -122,9 +166,9 @@ async def abema(cls, host, headers, inner_dict, port): logger.info(f"Performing Abema test LOCAL_PORT: {port}.") try: async with aiohttp.ClientSession( - headers=headers, - connector=ProxyConnector(host=host, port=port), - timeout=aiohttp.ClientTimeout(connect=10), + headers=headers, + connector=ProxyConnector(host=host, port=port), + timeout=aiohttp.ClientTimeout(connect=10), ) as session, session.get( url="https://api.abema.io/v1/ip/check?device=android", allow_redirects=False, @@ -139,9 +183,9 @@ async def bahamut(cls, host, headers, inner_dict, port): logger.info(f"Performing Bahamut test LOCAL_PORT: {port}.") try: async with aiohttp.ClientSession( - headers=headers, - connector=ProxyConnector(host=host, port=port), - timeout=aiohttp.ClientTimeout(connect=10), + headers=headers, + connector=ProxyConnector(host=host, port=port), + timeout=aiohttp.ClientTimeout(connect=10), ) as session, session.get( url="https://ani.gamer.com.tw/ajax/token.php?adID=89422&sn=14667", allow_redirects=False, @@ -156,9 +200,9 @@ async def indazn(cls, host, headers, inner_dict, port): logger.info(f"Performing Dazn test LOCAL_PORT: {port}.") try: async with aiohttp.ClientSession( - headers=headers, - connector=ProxyConnector(host=host, port=port, verify_ssl=False), - timeout=aiohttp.ClientTimeout(connect=10), + headers=headers, + connector=ProxyConnector(host=host, port=port, verify_ssl=False), + timeout=aiohttp.ClientTimeout(connect=10), ) as session: payload = { "LandingPageKey": "generic", @@ -170,9 +214,9 @@ async def indazn(cls, host, headers, inner_dict, port): "Version": "2", } async with session.post( - url="https://startup.core.indazn.com/misl/v5/Startup", - json=payload, - allow_redirects=False, + url="https://startup.core.indazn.com/misl/v5/Startup", + json=payload, + allow_redirects=False, ) as response: inner_dict["Dztype"] = response.status == 200 except Exception as e: @@ -183,9 +227,9 @@ async def mytvsuper(cls, host, headers, inner_dict, port): logger.info(f"Performing TVB test LOCAL_PORT: {port}.") try: async with aiohttp.ClientSession( - headers=headers, - connector=ProxyConnector(host=host, port=port), - timeout=aiohttp.ClientTimeout(connect=10), + headers=headers, + connector=ProxyConnector(host=host, port=port), + timeout=aiohttp.ClientTimeout(connect=10), ) as session, session.get( url="https://www.mytvsuper.com/iptest.php", allow_redirects=False ) as response: @@ -199,9 +243,9 @@ async def bilibili(cls, host, headers, inner_dict, port): logger.info(f"Performing Bilibili test LOCAL_PORT: {port}.") try: async with aiohttp.ClientSession( - headers=headers, - connector=ProxyConnector(host=host, port=port), - timeout=aiohttp.ClientTimeout(connect=10), + headers=headers, + connector=ProxyConnector(host=host, port=port), + timeout=aiohttp.ClientTimeout(connect=10), ) as session: params = { "avid": 50762638, @@ -216,9 +260,9 @@ async def bilibili(cls, host, headers, inner_dict, port): "module": "bangumi", } async with session.get( - url="https://api.bilibili.com/pgc/player/web/playurl", - params=params, - allow_redirects=False, + url="https://api.bilibili.com/pgc/player/web/playurl", + params=params, + allow_redirects=False, ) as response: if response.status == 200: json_data = await response.json() @@ -239,9 +283,9 @@ async def bilibili(cls, host, headers, inner_dict, port): } session.cookie_jar.clear() async with session.get( - url="https://api.bilibili.com/pgc/player/web/playurl", - params=params, - allow_redirects=False, + url="https://api.bilibili.com/pgc/player/web/playurl", + params=params, + allow_redirects=False, ) as response2: if response2.status == 200: json_data2 = await response2.json() @@ -257,7 +301,7 @@ async def start_stream_test(port, stream_cfg, outbound_ip): host = "127.0.0.1" headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " - "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36" + "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36" } test_list = [] inner_dict = { From fb39213185bdb145bc049914f6bb9df7a59dc099 Mon Sep 17 00:00:00 2001 From: AirportR Date: Wed, 12 Oct 2022 12:20:10 +0800 Subject: [PATCH 2/3] improve netflix test method --- ssrspeed/speedtest/method/st_stream.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ssrspeed/speedtest/method/st_stream.py b/ssrspeed/speedtest/method/st_stream.py index 362e05e..0b9b651 100644 --- a/ssrspeed/speedtest/method/st_stream.py +++ b/ssrspeed/speedtest/method/st_stream.py @@ -73,7 +73,7 @@ async def netflix_new(cls, host, headers, inner_dict, port, outbound_ip): "," )[0] logger.info(f"Netflix IP : {netflix_ip}") - if locate: + if locate > 0: region = text[locate + 29:locate + 31] else: region = "未知" From 4d94d59a655525e969624175dd5bec83151e690f Mon Sep 17 00:00:00 2001 From: AirportR Date: Tue, 1 Aug 2023 00:02:43 +0800 Subject: [PATCH 3/3] :sparkles: Added support for local emoji sources. --- ssrspeed/util/emo.py | 138 +++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 133 insertions(+), 5 deletions(-) diff --git a/ssrspeed/util/emo.py b/ssrspeed/util/emo.py index bbeb42c..1a976ba 100644 --- a/ssrspeed/util/emo.py +++ b/ssrspeed/util/emo.py @@ -1,11 +1,17 @@ +import abc +import asyncio import contextlib +import os import re +import shutil from io import BytesIO from typing import ClassVar, Optional import pilmoji import requests +from aiohttp import ClientSession from emoji import demojize +from loguru import logger from unidecode import unidecode @@ -23,10 +29,10 @@ def get_emoji(self, emoji: str, /) -> Optional[BytesIO]: # type: ignore name = unidecode( demojize(emoji) - .strip(":️") - .replace("_", "-") - .replace("-&-", "-") - .replace(".", "") + .strip(":️") + .replace("_", "-") + .replace("-&-", "-") + .replace(".", "") ) if name[0].isupper(): name = f"flag-{name.lower()}" @@ -105,6 +111,114 @@ class TossFacePediaSource(EmojiPediaSource): STYLE = "toss-face/342/" +class LocalSource(pilmoji.source.BaseSource): + """ + emoji本地源基类 + """ + + def get_emoji(self, emoji: str, /) -> Optional[BytesIO]: + file_path = self.get_file_path(emoji) + try: + with open(file_path, "rb") as file: + return BytesIO(file.read()) + except FileNotFoundError: + pass + return None + + def get_discord_emoji(self, _id: int, /) -> Optional[BytesIO]: + raise NotImplementedError + + @abc.abstractmethod + def get_file_path(self, emoji: str) -> str: + return '' + + @abc.abstractmethod + def download_emoji(self, download_url): + pass + + +class OpenmojiLocalSource(LocalSource): + """ + 图片源:https://github.com/hfg-gmuend/openmoji/tree/master/color/72x72 + 安装路径:./resources/emoji/openmoji + """ + + def get_discord_emoji(self, _id: int, /) -> Optional[BytesIO]: + pass + + def get_file_path(self, emoji: str) -> str: + code_points = [f'{ord(c):04X}' for c in emoji] + return f"./resources/emoji/openmoji/{'-'.join(code_points)}.png" + + def download_emoji(self, download_url): + pass + + +class TwemojiLocalSource(LocalSource): + """ + 图片源:https://github.com/twitter/twemoji/tree/master/assets/72x72 + 安装路径:./resources/emoji/twemoji + """ + + def __init__(self, init: str = None, proxy=None): + """ + 构造函数中,如果init不为none,则提供下载emoji资源包的url地址 + """ + self.savepath = './resources/emoji/twemoji.zip' + self._download_url = 'https://github.com/twitter/twemoji/archive/refs/tags/v14.0.2.zip' + if init is None: + return + self.download_emoji(init, proxy=proxy) + self.init_emoji(self.savepath) + + @property + def download_url(self): + return self._download_url + + @staticmethod + def init_emoji(savepath: str): + # 解压下载好的文件 + shutil.unpack_archive(savepath, './resources/emoji/', format='zip') + # print("解压完成") + # 重命名 + dirs = os.listdir('./resources/emoji/') + for d in dirs: + if d.startswith('twemoji') and not d.endswith('.zip'): + os.rename(os.path.join(os.path.abspath('./resources/emoji/'), d), + os.path.join(os.path.abspath('./resources/emoji/'), 'twemoji')) + break + return os.path.isdir('./resources/emoji/twemoji') + + async def download_emoji(self, download_url: str = None, savepath='./resources/emoji/twemoji.zip', proxy=None): + # 如果本地已存在,便无需重新下载 + if os.path.isdir('./resources/emoji/twemoji'): + return + _url = self.download_url if download_url is None else download_url # 如果没有提供下载地址则用默认的 + print("Download URL:", _url) + # 从网络上下载 + async with ClientSession(headers={'user-agent': 'SSRSpeedN'}) as session: + async with session.get(_url, proxy=proxy, timeout=20) as resp: + if resp.status == 200: + with open(savepath, 'wb') as f: + while True: + block = await resp.content.read(1024) + if not block: + break + f.write(block) + else: + raise Exception(f"NetworkError: {resp.status}==>\t{_url}") + + def get_discord_emoji(self, _id: int, /) -> Optional[BytesIO]: + pass + + def get_file_path(self, emoji: str) -> str: + code_points = [f'{ord(c):x}' for c in emoji] + if emoji == "4️⃣" or emoji == '6️⃣': + del code_points[1] + file_path = f"./resources/emoji/twemoji/assets/72x72/{'-'.join(code_points)}.png" + return file_path + + __all__ = [ "ApplePediaSource", "GooglePediaSource", @@ -117,9 +231,10 @@ class TossFacePediaSource(EmojiPediaSource): "SkypePediaSource", "JoyPixelsPediaSource", "TossFacePediaSource", + "TwemojiLocalSource", + "OpenmojiLocalSource" ] - if __name__ == "__main__": from PIL import Image, ImageFont @@ -127,6 +242,19 @@ class TossFacePediaSource(EmojiPediaSource): Hello, world! 👋 Here are some flags: 🇧🇦 🇷🇪 🇨🇼 🇺🇲 """ + + def check_init(): + if not os.path.isdir('./resources/emoji/twemoji'): + twemoji = TwemojiLocalSource() + print("检测到未安装emoji资源包,正在初始化本地emoji...") + asyncio.get_event_loop().run_until_complete(twemoji.download_emoji(proxy=None)) + if twemoji.init_emoji(twemoji.savepath): + logger.info("初始化emoji成功") + else: + logger.warning("初始化emoji失败") + + + # check_init() with Image.new("RGB", (550, 80), (255, 255, 255)) as image: font = ImageFont.truetype("arial.ttf", 24)