From e7ab4c7c81733eba523d660fb15b399cc2311247 Mon Sep 17 00:00:00 2001 From: Daniel Parizher <105245560+Arborym@users.noreply.github.com> Date: Thu, 30 Mar 2023 07:22:36 -0400 Subject: [PATCH] Cleanup (#174) * Clean up some code --- book_maker/cli.py | 49 +++-- book_maker/loader/txt_loader.py | 22 +-- book_maker/obok.py | 178 ++++++++---------- book_maker/translator/base_translator.py | 2 +- book_maker/translator/caiyun_translator.py | 9 +- .../translator/chatgptapi_translator.py | 45 +++-- book_maker/translator/deepl_translator.py | 19 +- book_maker/translator/google_translator.py | 6 +- book_maker/translator/gpt3_translator.py | 7 +- 9 files changed, 167 insertions(+), 170 deletions(-) diff --git a/book_maker/cli.py b/book_maker/cli.py index 2233ad6..69bdb6f 100644 --- a/book_maker/cli.py +++ b/book_maker/cli.py @@ -22,23 +22,20 @@ def parse_prompt_arg(prompt_arg): # if not a json string, treat it as a template string prompt = {"user": prompt_arg} + elif os.path.exists(prompt_arg): + if prompt_arg.endswith(".txt"): + # if it's a txt file, treat it as a template string + with open(prompt_arg, encoding="utf-8") as f: + prompt = {"user": f.read()} + elif prompt_arg.endswith(".json"): + # if it's a json file, treat it as a json object + # eg: --prompt prompt_template_sample.json + with open(prompt_arg, encoding="utf-8") as f: + prompt = json.load(f) else: - if os.path.exists(prompt_arg): - if prompt_arg.endswith(".txt"): - # if it's a txt file, treat it as a template string - with open(prompt_arg, "r") as f: - prompt = {"user": f.read()} - elif prompt_arg.endswith(".json"): - # if it's a json file, treat it as a json object - # eg: --prompt prompt_template_sample.json - with open(prompt_arg, "r") as f: - prompt = json.load(f) - else: - raise FileNotFoundError(f"{prompt_arg} not found") + raise FileNotFoundError(f"{prompt_arg} not found") - if prompt is None or not ( - all(c in prompt["user"] for c in ["{text}", "{language}"]) - ): + if prompt is None or any(c not in prompt["user"] for c in ["{text}", "{language}"]): raise ValueError("prompt must contain `{text}` and `{language}`") if "user" not in prompt: @@ -123,7 +120,7 @@ def main(): "--language", type=str, choices=sorted(LANGUAGES.keys()) - + sorted([k.title() for k in TO_LANGUAGE_CODE.keys()]), + + sorted([k.title() for k in TO_LANGUAGE_CODE]), default="zh-hans", metavar="LANGUAGE", help="language to translate to, available: {%(choices)s}", @@ -227,20 +224,20 @@ So you are close to reaching the limit. You have to choose your own value, there translate_model = MODEL_DICT.get(options.model) assert translate_model is not None, "unsupported model" if options.model in ["gpt3", "chatgptapi"]: - OPENAI_API_KEY = ( + if OPENAI_API_KEY := ( options.openai_key or env.get( - "OPENAI_API_KEY" + "OPENAI_API_KEY", ) # XXX: for backward compatability, deprecate soon or env.get( - "BBM_OPENAI_API_KEY" + "BBM_OPENAI_API_KEY", ) # suggest adding `BBM_` prefix for all the bilingual_book_maker ENVs. - ) - if not OPENAI_API_KEY: + ): + API_KEY = OPENAI_API_KEY + else: raise Exception( - "OpenAI API key not provided, please google how to obtain it" + "OpenAI API key not provided, please google how to obtain it", ) - API_KEY = OPENAI_API_KEY elif options.model == "caiyun": API_KEY = options.caiyun_key or env.get("BBM_CAIYUN_API_KEY") if not API_KEY: @@ -253,12 +250,12 @@ So you are close to reaching the limit. You have to choose your own value, there API_KEY = "" if options.book_from == "kobo": - import book_maker.obok as obok + from book_maker import obok device_path = options.device_path if device_path is None: raise Exception( - "Device path is not given, please specify the path by --device_path " + "Device path is not given, please specify the path by --device_path ", ) options.book_name = obok.cli_main(device_path) @@ -266,7 +263,7 @@ So you are close to reaching the limit. You have to choose your own value, there support_type_list = list(BOOK_LOADER_DICT.keys()) if book_type not in support_type_list: raise Exception( - f"now only support files of these formats: {','.join(support_type_list)}" + f"now only support files of these formats: {','.join(support_type_list)}", ) book_loader = BOOK_LOADER_DICT.get(book_type) diff --git a/book_maker/loader/txt_loader.py b/book_maker/loader/txt_loader.py index eb50bed..5a77016 100644 --- a/book_maker/loader/txt_loader.py +++ b/book_maker/loader/txt_loader.py @@ -18,7 +18,7 @@ class TXTBookLoader(BaseBookLoader): is_test=False, test_num=5, prompt_config=None, - ): + ) -> None: self.txt_name = txt_name self.translate_model = model( key, @@ -34,11 +34,11 @@ class TXTBookLoader(BaseBookLoader): self.batch_size = 10 try: - with open(f"{txt_name}", "r", encoding="utf-8") as f: + with open(f"{txt_name}", encoding="utf-8") as f: self.origin_book = f.read().split("\n") - except Exception: - raise Exception("can not load file") + except Exception as e: + raise Exception("can not load file") from e self.resume = resume self.bin_path = f"{Path(txt_name).parent}/.{Path(txt_name).stem}.temp.bin" @@ -65,14 +65,12 @@ class TXTBookLoader(BaseBookLoader): batch_text = "".join(i) if self._is_special_text(batch_text): continue - if self.resume and index < p_to_save_len: - pass - else: + if not self.resume or index >= p_to_save_len: try: temp = self.translate_model.translate(batch_text) except Exception as e: - print(str(e)) - raise Exception("Something is wrong when translate") + print(e) + raise Exception("Something is wrong when translate") from e self.p_to_save.append(temp) self.bilingual_result.append(batch_text) self.bilingual_result.append(temp) @@ -122,10 +120,10 @@ class TXTBookLoader(BaseBookLoader): def load_state(self): try: - with open(self.bin_path, "r", encoding="utf-8") as f: + with open(self.bin_path, encoding="utf-8") as f: self.p_to_save = f.read().split("\n") - except Exception: - raise Exception("can not load resume file") + except Exception as e: + raise Exception("can not load resume file") from e def save_file(self, book_path, content): try: diff --git a/book_maker/obok.py b/book_maker/obok.py index dd4eeb9..d4a4d18 100644 --- a/book_maker/obok.py +++ b/book_maker/obok.py @@ -162,19 +162,18 @@ # after all. # """Manage all Kobo books, either encrypted or DRM-free.""" -from __future__ import print_function __version__ = "4.1.2" -__about__ = "Obok v{0}\nCopyright © 2012-2020 Physisticated et al.".format(__version__) +__about__ = f"Obok v{__version__}\nCopyright © 2012-2020 Physisticated et al." import base64 import binascii +import contextlib import hashlib import os import re import shutil import sqlite3 -import string import subprocess import sys import tempfile @@ -206,9 +205,6 @@ def _load_crypto_libcrypto(): c_char_p, c_int, c_long, - c_ulong, - c_void_p, - cast, create_string_buffer, ) from ctypes.util import find_library @@ -224,8 +220,8 @@ def _load_crypto_libcrypto(): AES_MAXNR = 14 - c_char_pp = POINTER(c_char_p) - c_int_p = POINTER(c_int) + POINTER(c_char_p) + POINTER(c_int) class AES_KEY(Structure): _fields_ = [("rd_key", c_long * (4 * (AES_MAXNR + 1))), ("rounds", c_int)] @@ -241,12 +237,11 @@ def _load_crypto_libcrypto(): AES_set_decrypt_key = F(c_int, "AES_set_decrypt_key", [c_char_p, c_int, AES_KEY_p]) AES_ecb_encrypt = F(None, "AES_ecb_encrypt", [c_char_p, c_char_p, AES_KEY_p, c_int]) - class AES(object): - def __init__(self, userkey): + class AES: + def __init__(self, userkey) -> None: self._blocksize = len(userkey) if self._blocksize not in [16, 24, 32]: raise ENCRYPTIONError(_("AES improper key used")) - return key = self._key = AES_KEY() rv = AES_set_decrypt_key(userkey, len(userkey) * 8, key) if rv < 0: @@ -268,8 +263,8 @@ def _load_crypto_libcrypto(): def _load_crypto_pycrypto(): from Crypto.Cipher import AES as _AES - class AES(object): - def __init__(self, key): + class AES: + def __init__(self, key) -> None: self._aes = _AES.new(key, _AES.MODE_ECB) def decrypt(self, data): @@ -282,11 +277,9 @@ def _load_crypto(): AES = None cryptolist = (_load_crypto_pycrypto, _load_crypto_libcrypto) for loader in cryptolist: - try: + with contextlib.suppress(ImportError, ENCRYPTIONError): AES = loader() break - except (ImportError, ENCRYPTIONError): - pass return AES @@ -297,7 +290,7 @@ AES = _load_crypto() # and also make sure that any unicode strings get # encoded using "replace" before writing them. class SafeUnbuffered: - def __init__(self, stream): + def __init__(self, stream) -> None: self.stream = stream self.encoding = stream.encoding if self.encoding is None: @@ -313,14 +306,14 @@ class SafeUnbuffered: return getattr(self.stream, attr) -class KoboLibrary(object): +class KoboLibrary: """The Kobo library. This class represents all the information available from the data written by the Kobo Desktop Edition application, including the list of books, their titles, and the user's encryption key(s).""" - def __init__(self, serials=None, device_path=None, desktopkobodir=""): + def __init__(self, serials=None, device_path=None, desktopkobodir="") -> None: if serials is None: serials = [] print(__about__) @@ -343,7 +336,7 @@ class KoboLibrary(object): self.kobodir = os.path.join(device_path, ".kobo") # devices use KoboReader.sqlite kobodb = os.path.join(self.kobodir, "KoboReader.sqlite") - if not (os.path.isfile(kobodb)): + if not os.path.isfile(kobodb): # device path seems to be wrong, unset it device_path = "" self.kobodir = "" @@ -357,7 +350,9 @@ class KoboLibrary(object): if self.kobodir and len(serials) == 0 and can_parse_xml: # print "get_device_settings - device_path = {0}".format(device_path) devicexml = os.path.join( - device_path, ".adobe-digital-editions", "device.xml" + device_path, + ".adobe-digital-editions", + "device.xml", ) # print "trying to load {0}".format(devicexml) if os.path.exists(devicexml): @@ -386,11 +381,11 @@ class KoboLibrary(object): if ( sys.getwindowsversion().major > 5 - and "LOCALAPPDATA" in os.environ.keys() + and "LOCALAPPDATA" in os.environ ): # Python 2.x does not return unicode env. Use Python 3.x self.kobodir = winreg.ExpandEnvironmentStrings("%LOCALAPPDATA%") - if self.kobodir == "" and "USERPROFILE" in os.environ.keys(): + if self.kobodir == "" and "USERPROFILE" in os.environ: # Python 2.x does not return unicode env. Use Python 3.x self.kobodir = os.path.join( winreg.ExpandEnvironmentStrings("%USERPROFILE%"), @@ -398,7 +393,9 @@ class KoboLibrary(object): "Application Data", ) self.kobodir = os.path.join( - self.kobodir, "Kobo", "Kobo Desktop Edition" + self.kobodir, + "Kobo", + "Kobo Desktop Edition", ) elif sys.platform.startswith("darwin"): self.kobodir = os.path.join( @@ -411,7 +408,9 @@ class KoboLibrary(object): elif sys.platform.startswith("linux"): # sets ~/.config/calibre as the location to store the kobodir location info file and creates this directory if necessary kobodir_cache_dir = os.path.join( - os.environ["HOME"], ".config", "calibre" + os.environ["HOME"], + ".config", + "calibre", ) if not os.path.isdir(kobodir_cache_dir): os.mkdir(kobodir_cache_dir) @@ -424,22 +423,26 @@ class KoboLibrary(object): in that file so this loop can be skipped in the future""" original_stdout = sys.stdout if not os.path.isfile(kobodir_cache_file): - for root, dirs, files in os.walk("/"): + for root, _dirs, files in os.walk("/"): for file in files: if file == "Kobo.sqlite": kobo_linux_path = str(root) - with open(kobodir_cache_file, "w") as f: + with open( + kobodir_cache_file, + "w", + encoding="utf-8", + ) as f: sys.stdout = f print(kobo_linux_path, end="") sys.stdout = original_stdout - f = open(kobodir_cache_file, "r") + f = open(kobodir_cache_file, encoding="utf-8") self.kobodir = f.read() # desktop versions use Kobo.sqlite kobodb = os.path.join(self.kobodir, "Kobo.sqlite") # check for existence of file - if not (os.path.isfile(kobodb)): + if not os.path.isfile(kobodb): # give up here, we haven't found anything useful self.kobodir = "" kobodb = "" @@ -450,12 +453,11 @@ class KoboLibrary(object): # so we can ensure it's not using WAL logging which sqlite3 can't do. self.newdb = tempfile.NamedTemporaryFile(mode="wb", delete=False) print(self.newdb.name) - olddb = open(kobodb, "rb") - self.newdb.write(olddb.read(18)) - self.newdb.write(b"\x01\x01") - olddb.read(2) - self.newdb.write(olddb.read()) - olddb.close() + with open(kobodb, "rb") as olddb: + self.newdb.write(olddb.read(18)) + self.newdb.write(b"\x01\x01") + olddb.read(2) + self.newdb.write(olddb.read()) self.newdb.close() self.__sqlite = sqlite3.connect(self.newdb.name) self.__cursor = self.__sqlite.cursor() @@ -489,7 +491,7 @@ class KoboLibrary(object): return self._books """Drm-ed kepub""" for row in self.__cursor.execute( - "SELECT DISTINCT volumeid, Title, Attribution, Series FROM content_keys, content WHERE contentid = volumeid" + "SELECT DISTINCT volumeid, Title, Attribution, Series FROM content_keys, content WHERE contentid = volumeid", ): self._books.append( KoboBook( @@ -500,7 +502,7 @@ class KoboLibrary(object): self.__cursor, author=row[2], series=row[3], - ) + ), ) self._volumeID.append(row[0]) """Drm-free""" @@ -509,7 +511,7 @@ class KoboLibrary(object): row = self.__cursor.execute( "SELECT Title, Attribution, Series FROM content WHERE ContentID = '" + f - + "'" + + "'", ).fetchone() if row is not None: fTitle = row[0] @@ -522,7 +524,7 @@ class KoboLibrary(object): self.__cursor, author=row[1], series=row[2], - ) + ), ) self._volumeID.append(f) """Sort""" @@ -538,7 +540,8 @@ class KoboLibrary(object): macaddrs = [] if sys.platform.startswith("win"): c = re.compile( - "\s?(" + "[0-9a-f]{2}[:\-]" * 5 + "[0-9a-f]{2})(\s|$)", re.IGNORECASE + "\\s?(" + "[0-9a-f]{2}[:\\-]" * 5 + "[0-9a-f]{2})(\\s|$)", + re.IGNORECASE, ) output = subprocess.Popen( "wmic nic where PhysicalAdapter=True get MACAddress", @@ -551,10 +554,13 @@ class KoboLibrary(object): macaddrs.append(re.sub("-", ":", m[1]).upper()) elif sys.platform.startswith("darwin"): c = re.compile( - "\s(" + "[0-9a-f]{2}:" * 5 + "[0-9a-f]{2})(\s|$)", re.IGNORECASE + "\\s(" + "[0-9a-f]{2}:" * 5 + "[0-9a-f]{2})(\\s|$)", + re.IGNORECASE, ) output = subprocess.check_output( - "/sbin/ifconfig -a", shell=True, encoding="utf-8" + "/sbin/ifconfig -a", + shell=True, + encoding="utf-8", ) matches = c.findall(output) macaddrs.extend(m[0].upper() for m in matches) @@ -563,7 +569,8 @@ class KoboLibrary(object): # let's try ip c = re.compile( - "\s(" + "[0-9a-f]{2}:" * 5 + "[0-9a-f]{2})(\s|$)", re.IGNORECASE + "\\s(" + "[0-9a-f]{2}:" * 5 + "[0-9a-f]{2})(\\s|$)", + re.IGNORECASE, ) for line in os.popen("ip -br link"): if m := c.search(line): @@ -571,7 +578,8 @@ class KoboLibrary(object): # let's try ipconfig under wine c = re.compile( - "\s(" + "[0-9a-f]{2}-" * 5 + "[0-9a-f]{2})(\s|$)", re.IGNORECASE + "\\s(" + "[0-9a-f]{2}-" * 5 + "[0-9a-f]{2})(\\s|$)", + re.IGNORECASE, ) for line in os.popen("ipconfig /all"): if m := c.search(line): @@ -588,11 +596,9 @@ class KoboLibrary(object): cursor = self.__cursor.execute("SELECT UserID FROM user") row = cursor.fetchone() while row is not None: - try: + with contextlib.suppress(Exception): userid = row[0] userids.append(userid) - except Exception: - pass row = cursor.fetchone() return userids @@ -603,13 +609,13 @@ class KoboLibrary(object): deviceid = hashlib.sha256((hash + macaddr).encode("ascii")).hexdigest() for userid in userids: userkey = hashlib.sha256( - (deviceid + userid).encode("ascii") + (deviceid + userid).encode("ascii"), ).hexdigest() userkeys.append(binascii.a2b_hex(userkey[32:])) return userkeys -class KoboBook(object): +class KoboBook: """A Kobo book. A Kobo book contains a number of unencrypted and encrypted files. @@ -622,8 +628,15 @@ class KoboBook(object): type - either kepub or drm-free""" def __init__( - self, volumeid, title, filename, type, cursor, author=None, series=None - ): + self, + volumeid, + title, + filename, + type, + cursor, + author=None, + series=None, + ) -> None: self.volumeid = volumeid self.title = title self.author = author @@ -650,7 +663,9 @@ class KoboBook(object): (self.volumeid,), ): self._encryptedfiles[row[0]] = KoboFile( - row[0], None, base64.b64decode(row[1]) + row[0], + None, + base64.b64decode(row[1]), ) # Read the list of files from the kepub OPF manifest so that @@ -685,7 +700,7 @@ class KoboBook(object): return self.type != "drm-free" -class KoboFile(object): +class KoboFile: """An encrypted file in a KoboBook. Each file has the following instance variables: @@ -693,7 +708,7 @@ class KoboFile(object): mimetype - the file's MIME type, e.g. 'image/jpeg' key - the encrypted page key.""" - def __init__(self, filename, mimetype, key): + def __init__(self, filename, mimetype, key) -> None: self.filename = filename self.mimetype = mimetype self.key = key @@ -722,7 +737,7 @@ class KoboFile(object): # assume utf-8 with no BOM textoffset = 0 stride = 1 - print("Checking text:{0}:".format(contents[:10])) + print(f"Checking text:{contents[:10]}:") # check for byte order mark if contents[:3] == b"\xef\xbb\xbf": # seems to be utf-8 with BOM @@ -745,45 +760,15 @@ class KoboFile(object): for i in range(textoffset, textoffset + 5 * stride, stride): if contents[i] < 32 or contents[i] > 127: # Non-ascii, so decryption probably failed - print("Bad character at {0}, value {1}".format(i, contents[i])) + print(f"Bad character at {i}, value {contents[i]}") raise ValueError print("Seems to be good text") return True - if contents[:5] == b" None: self.keys = itertools.cycle(key.split(",")) self.language = language diff --git a/book_maker/translator/caiyun_translator.py b/book_maker/translator/caiyun_translator.py index 4d1c651..c2d9141 100644 --- a/book_maker/translator/caiyun_translator.py +++ b/book_maker/translator/caiyun_translator.py @@ -10,12 +10,12 @@ class Caiyun(Base): caiyun translator """ - def __init__(self, key, language, **kwargs): + def __init__(self, key, language, **kwargs) -> None: super().__init__(key, language) self.api_url = "http://api.interpreter.caiyunai.com/v1/translator" self.headers = { "content-type": "application/json", - "x-authorization": "token " + key, + "x-authorization": f"token {key}", } # caiyun api only supports: zh2en, zh2ja, en2zh, ja2zh self.translate_type = "auto2zh" @@ -36,7 +36,10 @@ class Caiyun(Base): "detect": True, } response = requests.request( - "POST", self.api_url, data=json.dumps(payload), headers=self.headers + "POST", + self.api_url, + data=json.dumps(payload), + headers=self.headers, ) t_text = json.loads(response.text)["target"] print(t_text) diff --git a/book_maker/translator/chatgptapi_translator.py b/book_maker/translator/chatgptapi_translator.py index 196e5ce..476614e 100644 --- a/book_maker/translator/chatgptapi_translator.py +++ b/book_maker/translator/chatgptapi_translator.py @@ -24,7 +24,7 @@ class ChatGPTAPI(Base): prompt_template=None, prompt_sys_msg=None, **kwargs, - ): + ) -> None: super().__init__(key, language) self.key_len = len(key.split(",")) @@ -38,7 +38,7 @@ class ChatGPTAPI(Base): self.prompt_sys_msg = ( prompt_sys_msg or environ.get( - "OPENAI_API_SYS_MSG" + "OPENAI_API_SYS_MSG", ) # XXX: for backward compatability, deprecate soon or environ.get(PROMPT_ENV_MAP["system"]) or "" @@ -51,9 +51,7 @@ class ChatGPTAPI(Base): def create_chat_completion(self, text): content = self.prompt_template.format(text=text, language=self.language) - sys_content = self.prompt_sys_msg - if self.system_content: - sys_content = self.system_content + sys_content = self.system_content or self.prompt_sys_msg messages = [ {"role": "system", "content": sys_content}, {"role": "user", "content": content}, @@ -78,7 +76,7 @@ class ChatGPTAPI(Base): completion = self.create_chat_completion(text) except Exception: if ( - not "choices" in completion + "choices" not in completion or not isinstance(completion["choices"], list) or len(completion["choices"]) == 0 ): @@ -121,7 +119,7 @@ The total token is too long and cannot be completely translated\n except Exception as e: # todo: better sleep time? why sleep alawys about key_len # 1. openai server error or own network interruption, sleep for a fixed time - # 2. an apikey has no money or reach limit, don’t sleep, just replace it with another apikey + # 2. an apikey has no money or reach limit, don`t sleep, just replace it with another apikey # 3. all apikey reach limit, then use current sleep sleep_time = int(60 / self.key_len) print(e, f"will sleep {sleep_time} seconds") @@ -135,7 +133,7 @@ The total token is too long and cannot be completely translated\n if needprint: print(re.sub("\n{3,}", "\n\n", t_text)) - elapsed_time = time.time() - start_time + time.time() - start_time # print(f"translation time: {elapsed_time:.1f}s") return t_text @@ -147,7 +145,12 @@ The total token is too long and cannot be completely translated\n return lines def get_best_result_list( - self, plist_len, new_str, sleep_dur, result_list, max_retries=15 + self, + plist_len, + new_str, + sleep_dur, + result_list, + max_retries=15, ): if len(result_list) == plist_len: return result_list, 0 @@ -157,7 +160,7 @@ The total token is too long and cannot be completely translated\n while retry_count < max_retries and len(result_list) != plist_len: print( - f"bug: {plist_len} -> {len(result_list)} : Number of paragraphs before and after translation" + f"bug: {plist_len} -> {len(result_list)} : Number of paragraphs before and after translation", ) print(f"sleep for {sleep_dur}s and retry {retry_count+1} ...") time.sleep(sleep_dur) @@ -179,19 +182,24 @@ The total token is too long and cannot be completely translated\n if retry_count == 0: return print(f"retry {state}") - with open(log_path, "a") as f: + with open(log_path, "a", encoding="utf-8") as f: print( f"retry {state}, count = {retry_count}, time = {elapsed_time:.1f}s", file=f, ) def log_translation_mismatch( - self, plist_len, result_list, new_str, sep, log_path="log/buglog.txt" + self, + plist_len, + result_list, + new_str, + sep, + log_path="log/buglog.txt", ): if len(result_list) == plist_len: return newlist = new_str.split(sep) - with open(log_path, "a") as f: + with open(log_path, "a", encoding="utf-8") as f: print(f"problem size: {plist_len - len(result_list)}", file=f) for i in range(len(newlist)): print(newlist[i], file=f) @@ -204,7 +212,7 @@ The total token is too long and cannot be completely translated\n print(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>") print( - f"bug: {plist_len} paragraphs of text translated into {len(result_list)} paragraphs" + f"bug: {plist_len} paragraphs of text translated into {len(result_list)} paragraphs", ) print("continue") @@ -246,7 +254,7 @@ The total token is too long and cannot be completely translated\n temp_p = copy(p) for sup in temp_p.find_all("sup"): sup.extract() - new_str += f"({i}) " + temp_p.get_text().strip() + sep + new_str += f"({i}) {temp_p.get_text().strip()}{sep}" i = i + 1 if new_str.endswith(sep): @@ -263,7 +271,10 @@ The total token is too long and cannot be completely translated\n start_time = time.time() result_list, retry_count = self.get_best_result_list( - plist_len, new_str, 6, result_list + plist_len, + new_str, + 6, + result_list, ) end_time = time.time() @@ -275,7 +286,7 @@ The total token is too long and cannot be completely translated\n self.log_translation_mismatch(plist_len, result_list, new_str, sep, log_path) # del (num), num. sometime (num) will translated to num. - result_list = [re.sub(r"^(\(\d+\)|\d+\.|(\d+))\s*", "", s) for s in result_list] + result_list = [re.sub(r"^(\(\d+\)|\d+\.|(\d+))\s*", "", s) for s in result_list] return result_list def set_deployment_id(self, deployment_id): diff --git a/book_maker/translator/deepl_translator.py b/book_maker/translator/deepl_translator.py index 9148e4e..166861d 100644 --- a/book_maker/translator/deepl_translator.py +++ b/book_maker/translator/deepl_translator.py @@ -13,7 +13,7 @@ class DeepL(Base): caiyun translator """ - def __init__(self, key, language, **kwargs): + def __init__(self, key, language, **kwargs) -> None: super().__init__(key, language) self.api_url = "https://deepl-translator.p.rapidapi.com/translate" self.headers = { @@ -22,10 +22,7 @@ class DeepL(Base): "X-RapidAPI-Host": "deepl-translator.p.rapidapi.com", } l = None - if language in LANGUAGES: - l = language - else: - l = TO_LANGUAGE_CODE.get(language) + l = language if language in LANGUAGES else TO_LANGUAGE_CODE.get(language) if l not in [ "bg", "zh", @@ -71,13 +68,19 @@ class DeepL(Base): payload = {"text": text, "source": "EN", "target": self.language} try: response = requests.request( - "POST", self.api_url, data=json.dumps(payload), headers=self.headers + "POST", + self.api_url, + data=json.dumps(payload), + headers=self.headers, ) except Exception as e: - print(str(e)) + print(e) time.sleep(30) response = requests.request( - "POST", self.api_url, data=json.dumps(payload), headers=self.headers + "POST", + self.api_url, + data=json.dumps(payload), + headers=self.headers, ) t_text = response.json().get("text", "") print(t_text) diff --git a/book_maker/translator/google_translator.py b/book_maker/translator/google_translator.py index 936a4b7..203a628 100644 --- a/book_maker/translator/google_translator.py +++ b/book_maker/translator/google_translator.py @@ -8,7 +8,7 @@ class Google(Base): google translate """ - def __init__(self, key, language, **kwargs): + def __init__(self, key, language, **kwargs) -> None: super().__init__(key, language) self.api_url = "https://translate.google.com/translate_a/single?client=it&dt=qca&dt=t&dt=rmt&dt=bd&dt=rms&dt=sos&dt=md&dt=gt&dt=ld&dt=ss&dt=ex&otf=2&dj=1&hl=en&ie=UTF-8&oe=UTF-8&sl=auto&tl=zh-CN" self.headers = { @@ -27,12 +27,12 @@ class Google(Base): r = self.session.post( self.api_url, headers=self.headers, - data="q={text}".format(text=requests.utils.quote(text)), + data=f"q={requests.utils.quote(text)}", ) if not r.ok: return text t_text = "".join( - [sentence.get("trans", "") for sentence in r.json()["sentences"]] + [sentence.get("trans", "") for sentence in r.json()["sentences"]], ) print(t_text) return t_text diff --git a/book_maker/translator/gpt3_translator.py b/book_maker/translator/gpt3_translator.py index c74de5c..b58d013 100644 --- a/book_maker/translator/gpt3_translator.py +++ b/book_maker/translator/gpt3_translator.py @@ -5,7 +5,9 @@ from .base_translator import Base class GPT3(Base): - def __init__(self, key, language, api_base=None, prompt_template=None, **kwargs): + def __init__( + self, key, language, api_base=None, prompt_template=None, **kwargs + ) -> None: super().__init__(key, language) self.api_url = ( f"{api_base}v1/completions" @@ -36,7 +38,8 @@ class GPT3(Base): print(text) self.rotate_key() self.data["prompt"] = self.prompt_template.format( - text=text, language=self.language + text=text, + language=self.language, ) r = self.session.post(self.api_url, headers=self.headers, json=self.data) if not r.ok: