* Clean up some code
This commit is contained in:
Daniel Parizher 2023-03-30 07:22:36 -04:00 committed by GitHub
parent 74bbaf3e8a
commit e7ab4c7c81
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 167 additions and 170 deletions

View File

@ -22,23 +22,20 @@ def parse_prompt_arg(prompt_arg):
# if not a json string, treat it as a template string
prompt = {"user": prompt_arg}
elif os.path.exists(prompt_arg):
if prompt_arg.endswith(".txt"):
# if it's a txt file, treat it as a template string
with open(prompt_arg, encoding="utf-8") as f:
prompt = {"user": f.read()}
elif prompt_arg.endswith(".json"):
# if it's a json file, treat it as a json object
# eg: --prompt prompt_template_sample.json
with open(prompt_arg, encoding="utf-8") as f:
prompt = json.load(f)
else:
if os.path.exists(prompt_arg):
if prompt_arg.endswith(".txt"):
# if it's a txt file, treat it as a template string
with open(prompt_arg, "r") as f:
prompt = {"user": f.read()}
elif prompt_arg.endswith(".json"):
# if it's a json file, treat it as a json object
# eg: --prompt prompt_template_sample.json
with open(prompt_arg, "r") as f:
prompt = json.load(f)
else:
raise FileNotFoundError(f"{prompt_arg} not found")
raise FileNotFoundError(f"{prompt_arg} not found")
if prompt is None or not (
all(c in prompt["user"] for c in ["{text}", "{language}"])
):
if prompt is None or any(c not in prompt["user"] for c in ["{text}", "{language}"]):
raise ValueError("prompt must contain `{text}` and `{language}`")
if "user" not in prompt:
@ -123,7 +120,7 @@ def main():
"--language",
type=str,
choices=sorted(LANGUAGES.keys())
+ sorted([k.title() for k in TO_LANGUAGE_CODE.keys()]),
+ sorted([k.title() for k in TO_LANGUAGE_CODE]),
default="zh-hans",
metavar="LANGUAGE",
help="language to translate to, available: {%(choices)s}",
@ -227,20 +224,20 @@ So you are close to reaching the limit. You have to choose your own value, there
translate_model = MODEL_DICT.get(options.model)
assert translate_model is not None, "unsupported model"
if options.model in ["gpt3", "chatgptapi"]:
OPENAI_API_KEY = (
if OPENAI_API_KEY := (
options.openai_key
or env.get(
"OPENAI_API_KEY"
"OPENAI_API_KEY",
) # XXX: for backward compatibility, deprecate soon
or env.get(
"BBM_OPENAI_API_KEY"
"BBM_OPENAI_API_KEY",
) # suggest adding `BBM_` prefix for all the bilingual_book_maker ENVs.
)
if not OPENAI_API_KEY:
):
API_KEY = OPENAI_API_KEY
else:
raise Exception(
"OpenAI API key not provided, please google how to obtain it"
"OpenAI API key not provided, please google how to obtain it",
)
API_KEY = OPENAI_API_KEY
elif options.model == "caiyun":
API_KEY = options.caiyun_key or env.get("BBM_CAIYUN_API_KEY")
if not API_KEY:
@ -253,12 +250,12 @@ So you are close to reaching the limit. You have to choose your own value, there
API_KEY = ""
if options.book_from == "kobo":
import book_maker.obok as obok
from book_maker import obok
device_path = options.device_path
if device_path is None:
raise Exception(
"Device path is not given, please specify the path by --device_path <DEVICE_PATH>"
"Device path is not given, please specify the path by --device_path <DEVICE_PATH>",
)
options.book_name = obok.cli_main(device_path)
@ -266,7 +263,7 @@ So you are close to reaching the limit. You have to choose your own value, there
support_type_list = list(BOOK_LOADER_DICT.keys())
if book_type not in support_type_list:
raise Exception(
f"now only support files of these formats: {','.join(support_type_list)}"
f"now only support files of these formats: {','.join(support_type_list)}",
)
book_loader = BOOK_LOADER_DICT.get(book_type)

View File

@ -18,7 +18,7 @@ class TXTBookLoader(BaseBookLoader):
is_test=False,
test_num=5,
prompt_config=None,
):
) -> None:
self.txt_name = txt_name
self.translate_model = model(
key,
@ -34,11 +34,11 @@ class TXTBookLoader(BaseBookLoader):
self.batch_size = 10
try:
with open(f"{txt_name}", "r", encoding="utf-8") as f:
with open(f"{txt_name}", encoding="utf-8") as f:
self.origin_book = f.read().split("\n")
except Exception:
raise Exception("can not load file")
except Exception as e:
raise Exception("can not load file") from e
self.resume = resume
self.bin_path = f"{Path(txt_name).parent}/.{Path(txt_name).stem}.temp.bin"
@ -65,14 +65,12 @@ class TXTBookLoader(BaseBookLoader):
batch_text = "".join(i)
if self._is_special_text(batch_text):
continue
if self.resume and index < p_to_save_len:
pass
else:
if not self.resume or index >= p_to_save_len:
try:
temp = self.translate_model.translate(batch_text)
except Exception as e:
print(str(e))
raise Exception("Something is wrong when translate")
print(e)
raise Exception("Something is wrong when translate") from e
self.p_to_save.append(temp)
self.bilingual_result.append(batch_text)
self.bilingual_result.append(temp)
@ -122,10 +120,10 @@ class TXTBookLoader(BaseBookLoader):
def load_state(self):
try:
with open(self.bin_path, "r", encoding="utf-8") as f:
with open(self.bin_path, encoding="utf-8") as f:
self.p_to_save = f.read().split("\n")
except Exception:
raise Exception("can not load resume file")
except Exception as e:
raise Exception("can not load resume file") from e
def save_file(self, book_path, content):
try:

View File

@ -162,19 +162,18 @@
# after all.
#
"""Manage all Kobo books, either encrypted or DRM-free."""
from __future__ import print_function
__version__ = "4.1.2"
__about__ = "Obok v{0}\nCopyright © 2012-2020 Physisticated et al.".format(__version__)
__about__ = f"Obok v{__version__}\nCopyright © 2012-2020 Physisticated et al."
import base64
import binascii
import contextlib
import hashlib
import os
import re
import shutil
import sqlite3
import string
import subprocess
import sys
import tempfile
@ -206,9 +205,6 @@ def _load_crypto_libcrypto():
c_char_p,
c_int,
c_long,
c_ulong,
c_void_p,
cast,
create_string_buffer,
)
from ctypes.util import find_library
@ -224,8 +220,8 @@ def _load_crypto_libcrypto():
AES_MAXNR = 14
c_char_pp = POINTER(c_char_p)
c_int_p = POINTER(c_int)
POINTER(c_char_p)
POINTER(c_int)
class AES_KEY(Structure):
_fields_ = [("rd_key", c_long * (4 * (AES_MAXNR + 1))), ("rounds", c_int)]
@ -241,12 +237,11 @@ def _load_crypto_libcrypto():
AES_set_decrypt_key = F(c_int, "AES_set_decrypt_key", [c_char_p, c_int, AES_KEY_p])
AES_ecb_encrypt = F(None, "AES_ecb_encrypt", [c_char_p, c_char_p, AES_KEY_p, c_int])
class AES(object):
def __init__(self, userkey):
class AES:
def __init__(self, userkey) -> None:
self._blocksize = len(userkey)
if self._blocksize not in [16, 24, 32]:
raise ENCRYPTIONError(_("AES improper key used"))
return
key = self._key = AES_KEY()
rv = AES_set_decrypt_key(userkey, len(userkey) * 8, key)
if rv < 0:
@ -268,8 +263,8 @@ def _load_crypto_libcrypto():
def _load_crypto_pycrypto():
from Crypto.Cipher import AES as _AES
class AES(object):
def __init__(self, key):
class AES:
def __init__(self, key) -> None:
self._aes = _AES.new(key, _AES.MODE_ECB)
def decrypt(self, data):
@ -282,11 +277,9 @@ def _load_crypto():
AES = None
cryptolist = (_load_crypto_pycrypto, _load_crypto_libcrypto)
for loader in cryptolist:
try:
with contextlib.suppress(ImportError, ENCRYPTIONError):
AES = loader()
break
except (ImportError, ENCRYPTIONError):
pass
return AES
@ -297,7 +290,7 @@ AES = _load_crypto()
# and also make sure that any unicode strings get
# encoded using "replace" before writing them.
class SafeUnbuffered:
def __init__(self, stream):
def __init__(self, stream) -> None:
self.stream = stream
self.encoding = stream.encoding
if self.encoding is None:
@ -313,14 +306,14 @@ class SafeUnbuffered:
return getattr(self.stream, attr)
class KoboLibrary(object):
class KoboLibrary:
"""The Kobo library.
This class represents all the information available from the data
written by the Kobo Desktop Edition application, including the list
of books, their titles, and the user's encryption key(s)."""
def __init__(self, serials=None, device_path=None, desktopkobodir=""):
def __init__(self, serials=None, device_path=None, desktopkobodir="") -> None:
if serials is None:
serials = []
print(__about__)
@ -343,7 +336,7 @@ class KoboLibrary(object):
self.kobodir = os.path.join(device_path, ".kobo")
# devices use KoboReader.sqlite
kobodb = os.path.join(self.kobodir, "KoboReader.sqlite")
if not (os.path.isfile(kobodb)):
if not os.path.isfile(kobodb):
# device path seems to be wrong, unset it
device_path = ""
self.kobodir = ""
@ -357,7 +350,9 @@ class KoboLibrary(object):
if self.kobodir and len(serials) == 0 and can_parse_xml:
# print "get_device_settings - device_path = {0}".format(device_path)
devicexml = os.path.join(
device_path, ".adobe-digital-editions", "device.xml"
device_path,
".adobe-digital-editions",
"device.xml",
)
# print "trying to load {0}".format(devicexml)
if os.path.exists(devicexml):
@ -386,11 +381,11 @@ class KoboLibrary(object):
if (
sys.getwindowsversion().major > 5
and "LOCALAPPDATA" in os.environ.keys()
and "LOCALAPPDATA" in os.environ
):
# Python 2.x does not return unicode env. Use Python 3.x
self.kobodir = winreg.ExpandEnvironmentStrings("%LOCALAPPDATA%")
if self.kobodir == "" and "USERPROFILE" in os.environ.keys():
if self.kobodir == "" and "USERPROFILE" in os.environ:
# Python 2.x does not return unicode env. Use Python 3.x
self.kobodir = os.path.join(
winreg.ExpandEnvironmentStrings("%USERPROFILE%"),
@ -398,7 +393,9 @@ class KoboLibrary(object):
"Application Data",
)
self.kobodir = os.path.join(
self.kobodir, "Kobo", "Kobo Desktop Edition"
self.kobodir,
"Kobo",
"Kobo Desktop Edition",
)
elif sys.platform.startswith("darwin"):
self.kobodir = os.path.join(
@ -411,7 +408,9 @@ class KoboLibrary(object):
elif sys.platform.startswith("linux"):
# sets ~/.config/calibre as the location to store the kobodir location info file and creates this directory if necessary
kobodir_cache_dir = os.path.join(
os.environ["HOME"], ".config", "calibre"
os.environ["HOME"],
".config",
"calibre",
)
if not os.path.isdir(kobodir_cache_dir):
os.mkdir(kobodir_cache_dir)
@ -424,22 +423,26 @@ class KoboLibrary(object):
in that file so this loop can be skipped in the future"""
original_stdout = sys.stdout
if not os.path.isfile(kobodir_cache_file):
for root, dirs, files in os.walk("/"):
for root, _dirs, files in os.walk("/"):
for file in files:
if file == "Kobo.sqlite":
kobo_linux_path = str(root)
with open(kobodir_cache_file, "w") as f:
with open(
kobodir_cache_file,
"w",
encoding="utf-8",
) as f:
sys.stdout = f
print(kobo_linux_path, end="")
sys.stdout = original_stdout
f = open(kobodir_cache_file, "r")
f = open(kobodir_cache_file, encoding="utf-8")
self.kobodir = f.read()
# desktop versions use Kobo.sqlite
kobodb = os.path.join(self.kobodir, "Kobo.sqlite")
# check for existence of file
if not (os.path.isfile(kobodb)):
if not os.path.isfile(kobodb):
# give up here, we haven't found anything useful
self.kobodir = ""
kobodb = ""
@ -450,12 +453,11 @@ class KoboLibrary(object):
# so we can ensure it's not using WAL logging which sqlite3 can't do.
self.newdb = tempfile.NamedTemporaryFile(mode="wb", delete=False)
print(self.newdb.name)
olddb = open(kobodb, "rb")
self.newdb.write(olddb.read(18))
self.newdb.write(b"\x01\x01")
olddb.read(2)
self.newdb.write(olddb.read())
olddb.close()
with open(kobodb, "rb") as olddb:
self.newdb.write(olddb.read(18))
self.newdb.write(b"\x01\x01")
olddb.read(2)
self.newdb.write(olddb.read())
self.newdb.close()
self.__sqlite = sqlite3.connect(self.newdb.name)
self.__cursor = self.__sqlite.cursor()
@ -489,7 +491,7 @@ class KoboLibrary(object):
return self._books
"""Drm-ed kepub"""
for row in self.__cursor.execute(
"SELECT DISTINCT volumeid, Title, Attribution, Series FROM content_keys, content WHERE contentid = volumeid"
"SELECT DISTINCT volumeid, Title, Attribution, Series FROM content_keys, content WHERE contentid = volumeid",
):
self._books.append(
KoboBook(
@ -500,7 +502,7 @@ class KoboLibrary(object):
self.__cursor,
author=row[2],
series=row[3],
)
),
)
self._volumeID.append(row[0])
"""Drm-free"""
@ -509,7 +511,7 @@ class KoboLibrary(object):
row = self.__cursor.execute(
"SELECT Title, Attribution, Series FROM content WHERE ContentID = '"
+ f
+ "'"
+ "'",
).fetchone()
if row is not None:
fTitle = row[0]
@ -522,7 +524,7 @@ class KoboLibrary(object):
self.__cursor,
author=row[1],
series=row[2],
)
),
)
self._volumeID.append(f)
"""Sort"""
@ -538,7 +540,8 @@ class KoboLibrary(object):
macaddrs = []
if sys.platform.startswith("win"):
c = re.compile(
"\s?(" + "[0-9a-f]{2}[:\-]" * 5 + "[0-9a-f]{2})(\s|$)", re.IGNORECASE
"\\s?(" + "[0-9a-f]{2}[:\\-]" * 5 + "[0-9a-f]{2})(\\s|$)",
re.IGNORECASE,
)
output = subprocess.Popen(
"wmic nic where PhysicalAdapter=True get MACAddress",
@ -551,10 +554,13 @@ class KoboLibrary(object):
macaddrs.append(re.sub("-", ":", m[1]).upper())
elif sys.platform.startswith("darwin"):
c = re.compile(
"\s(" + "[0-9a-f]{2}:" * 5 + "[0-9a-f]{2})(\s|$)", re.IGNORECASE
"\\s(" + "[0-9a-f]{2}:" * 5 + "[0-9a-f]{2})(\\s|$)",
re.IGNORECASE,
)
output = subprocess.check_output(
"/sbin/ifconfig -a", shell=True, encoding="utf-8"
"/sbin/ifconfig -a",
shell=True,
encoding="utf-8",
)
matches = c.findall(output)
macaddrs.extend(m[0].upper() for m in matches)
@ -563,7 +569,8 @@ class KoboLibrary(object):
# let's try ip
c = re.compile(
"\s(" + "[0-9a-f]{2}:" * 5 + "[0-9a-f]{2})(\s|$)", re.IGNORECASE
"\\s(" + "[0-9a-f]{2}:" * 5 + "[0-9a-f]{2})(\\s|$)",
re.IGNORECASE,
)
for line in os.popen("ip -br link"):
if m := c.search(line):
@ -571,7 +578,8 @@ class KoboLibrary(object):
# let's try ipconfig under wine
c = re.compile(
"\s(" + "[0-9a-f]{2}-" * 5 + "[0-9a-f]{2})(\s|$)", re.IGNORECASE
"\\s(" + "[0-9a-f]{2}-" * 5 + "[0-9a-f]{2})(\\s|$)",
re.IGNORECASE,
)
for line in os.popen("ipconfig /all"):
if m := c.search(line):
@ -588,11 +596,9 @@ class KoboLibrary(object):
cursor = self.__cursor.execute("SELECT UserID FROM user")
row = cursor.fetchone()
while row is not None:
try:
with contextlib.suppress(Exception):
userid = row[0]
userids.append(userid)
except Exception:
pass
row = cursor.fetchone()
return userids
@ -603,13 +609,13 @@ class KoboLibrary(object):
deviceid = hashlib.sha256((hash + macaddr).encode("ascii")).hexdigest()
for userid in userids:
userkey = hashlib.sha256(
(deviceid + userid).encode("ascii")
(deviceid + userid).encode("ascii"),
).hexdigest()
userkeys.append(binascii.a2b_hex(userkey[32:]))
return userkeys
class KoboBook(object):
class KoboBook:
"""A Kobo book.
A Kobo book contains a number of unencrypted and encrypted files.
@ -622,8 +628,15 @@ class KoboBook(object):
type - either kepub or drm-free"""
def __init__(
self, volumeid, title, filename, type, cursor, author=None, series=None
):
self,
volumeid,
title,
filename,
type,
cursor,
author=None,
series=None,
) -> None:
self.volumeid = volumeid
self.title = title
self.author = author
@ -650,7 +663,9 @@ class KoboBook(object):
(self.volumeid,),
):
self._encryptedfiles[row[0]] = KoboFile(
row[0], None, base64.b64decode(row[1])
row[0],
None,
base64.b64decode(row[1]),
)
# Read the list of files from the kepub OPF manifest so that
@ -685,7 +700,7 @@ class KoboBook(object):
return self.type != "drm-free"
class KoboFile(object):
class KoboFile:
"""An encrypted file in a KoboBook.
Each file has the following instance variables:
@ -693,7 +708,7 @@ class KoboFile(object):
mimetype - the file's MIME type, e.g. 'image/jpeg'
key - the encrypted page key."""
def __init__(self, filename, mimetype, key):
def __init__(self, filename, mimetype, key) -> None:
self.filename = filename
self.mimetype = mimetype
self.key = key
@ -722,7 +737,7 @@ class KoboFile(object):
# assume utf-8 with no BOM
textoffset = 0
stride = 1
print("Checking text:{0}:".format(contents[:10]))
print(f"Checking text:{contents[:10]}:")
# check for byte order mark
if contents[:3] == b"\xef\xbb\xbf":
# seems to be utf-8 with BOM
@ -745,45 +760,15 @@ class KoboFile(object):
for i in range(textoffset, textoffset + 5 * stride, stride):
if contents[i] < 32 or contents[i] > 127:
# Non-ascii, so decryption probably failed
print("Bad character at {0}, value {1}".format(i, contents[i]))
print(f"Bad character at {i}, value {contents[i]}")
raise ValueError
print("Seems to be good text")
return True
if contents[:5] == b"<?xml" or contents[:8] == b"\xef\xbb\xbf<?xml":
# utf-8
return True
elif contents[:14] == b"\xfe\xff\x00<\x00?\x00x\x00m\x00l":
# utf-16BE
return True
elif contents[:14] == b"\xff\xfe<\x00?\x00x\x00m\x00l\x00":
# utf-16LE
return True
elif (
contents[:9] == b"<!DOCTYPE"
or contents[:12] == b"\xef\xbb\xbf<!DOCTYPE"
):
# utf-8 of weird <!DOCTYPE start
return True
elif (
contents[:22]
== b"\xfe\xff\x00<\x00!\x00D\x00O\x00C\x00T\x00Y\x00P\x00E"
):
# utf-16BE of weird <!DOCTYPE start
return True
elif (
contents[:22]
== b"\xff\xfe<\x00!\x00D\x00O\x00C\x00T\x00Y\x00P\x00E\x00"
):
# utf-16LE of weird <!DOCTYPE start
return True
else:
print("Bad XML: {0}".format(contents[:8]))
raise ValueError
if self.mimetype == "image/jpeg":
if contents[:3] == b"\xff\xd8\xff":
return True
print("Bad JPEG: {0}".format(contents[:3].hex()))
raise ValueError()
print(f"Bad JPEG: {contents[:3].hex()}")
raise ValueError
return False
def __removeaespadding(self, contents):
@ -806,18 +791,17 @@ class KoboFile(object):
def decrypt_book(book, lib):
print("Converting {0}".format(book.title))
print(f"Converting {book.title}")
zin = zipfile.ZipFile(book.filename, "r")
# make filename out of Unicode alphanumeric and whitespace equivalents from title
outname = "{0}.epub".format(re.sub("[^\s\w]", "_", book.title, 0, re.UNICODE))
outname = "{}.epub".format(re.sub("[^\\s\\w]", "_", book.title, 0, re.UNICODE))
if book.type == "drm-free":
print("DRM-free book, conversion is not needed")
shutil.copyfile(book.filename, outname)
print("Book saved as {0}".format(os.path.join(os.getcwd(), outname)))
print(f"Book saved as {os.path.join(os.getcwd(), outname)}")
return os.path.join(os.getcwd(), outname)
result = 1
for userkey in lib.userkeys:
print("Trying key: {0}".format(userkey.hex()))
print(f"Trying key: {userkey.hex()}")
try:
zout = zipfile.ZipFile(outname, "w", zipfile.ZIP_DEFLATED)
for filename in zin.namelist():
@ -830,8 +814,7 @@ def decrypt_book(book, lib):
zout.writestr(filename, contents)
zout.close()
print("Decryption succeeded.")
print("Book saved as {0}".format(os.path.join(os.getcwd(), outname)))
result = 0
print(f"Book saved as {os.path.join(os.getcwd(), outname)}")
break
except ValueError:
print("Decryption failed.")
@ -842,13 +825,12 @@ def decrypt_book(book, lib):
def cli_main(devicedir):
description = __about__
serials = []
lib = KoboLibrary(serials, devicedir)
for i, book in enumerate(lib.books):
print("{0}: {1}".format(i + 1, book.title))
print(f"{i + 1}: {book.title}")
choice = input("Convert book number... ")
try:
@ -856,7 +838,7 @@ def cli_main(devicedir):
books = [lib.books[num - 1]]
except (ValueError, IndexError):
print("Invalid choice. Exiting...")
exit()
sys.exit()
results = [decrypt_book(book, lib) for book in books]
lib.close()

View File

@ -3,7 +3,7 @@ from abc import ABC, abstractmethod
class Base(ABC):
def __init__(self, key, language):
def __init__(self, key, language) -> None:
self.keys = itertools.cycle(key.split(","))
self.language = language

View File

@ -10,12 +10,12 @@ class Caiyun(Base):
caiyun translator
"""
def __init__(self, key, language, **kwargs):
def __init__(self, key, language, **kwargs) -> None:
super().__init__(key, language)
self.api_url = "http://api.interpreter.caiyunai.com/v1/translator"
self.headers = {
"content-type": "application/json",
"x-authorization": "token " + key,
"x-authorization": f"token {key}",
}
# caiyun api only supports: zh2en, zh2ja, en2zh, ja2zh
self.translate_type = "auto2zh"
@ -36,7 +36,10 @@ class Caiyun(Base):
"detect": True,
}
response = requests.request(
"POST", self.api_url, data=json.dumps(payload), headers=self.headers
"POST",
self.api_url,
data=json.dumps(payload),
headers=self.headers,
)
t_text = json.loads(response.text)["target"]
print(t_text)

View File

@ -24,7 +24,7 @@ class ChatGPTAPI(Base):
prompt_template=None,
prompt_sys_msg=None,
**kwargs,
):
) -> None:
super().__init__(key, language)
self.key_len = len(key.split(","))
@ -38,7 +38,7 @@ class ChatGPTAPI(Base):
self.prompt_sys_msg = (
prompt_sys_msg
or environ.get(
"OPENAI_API_SYS_MSG"
"OPENAI_API_SYS_MSG",
) # XXX: for backward compatibility, deprecate soon
or environ.get(PROMPT_ENV_MAP["system"])
or ""
@ -51,9 +51,7 @@ class ChatGPTAPI(Base):
def create_chat_completion(self, text):
content = self.prompt_template.format(text=text, language=self.language)
sys_content = self.prompt_sys_msg
if self.system_content:
sys_content = self.system_content
sys_content = self.system_content or self.prompt_sys_msg
messages = [
{"role": "system", "content": sys_content},
{"role": "user", "content": content},
@ -78,7 +76,7 @@ class ChatGPTAPI(Base):
completion = self.create_chat_completion(text)
except Exception:
if (
not "choices" in completion
"choices" not in completion
or not isinstance(completion["choices"], list)
or len(completion["choices"]) == 0
):
@ -121,7 +119,7 @@ The total token is too long and cannot be completely translated\n
except Exception as e:
# todo: better sleep time? why is the sleep always based on key_len
# 1. openai server error or own network interruption, sleep for a fixed time
# 2. an apikey has no money or reach limit, dont sleep, just replace it with another apikey
# 2. an apikey has no money or reach limit, don't sleep, just replace it with another apikey
# 3. all apikey reach limit, then use current sleep
sleep_time = int(60 / self.key_len)
print(e, f"will sleep {sleep_time} seconds")
@ -135,7 +133,7 @@ The total token is too long and cannot be completely translated\n
if needprint:
print(re.sub("\n{3,}", "\n\n", t_text))
elapsed_time = time.time() - start_time
time.time() - start_time
# print(f"translation time: {elapsed_time:.1f}s")
return t_text
@ -147,7 +145,12 @@ The total token is too long and cannot be completely translated\n
return lines
def get_best_result_list(
self, plist_len, new_str, sleep_dur, result_list, max_retries=15
self,
plist_len,
new_str,
sleep_dur,
result_list,
max_retries=15,
):
if len(result_list) == plist_len:
return result_list, 0
@ -157,7 +160,7 @@ The total token is too long and cannot be completely translated\n
while retry_count < max_retries and len(result_list) != plist_len:
print(
f"bug: {plist_len} -> {len(result_list)} : Number of paragraphs before and after translation"
f"bug: {plist_len} -> {len(result_list)} : Number of paragraphs before and after translation",
)
print(f"sleep for {sleep_dur}s and retry {retry_count+1} ...")
time.sleep(sleep_dur)
@ -179,19 +182,24 @@ The total token is too long and cannot be completely translated\n
if retry_count == 0:
return
print(f"retry {state}")
with open(log_path, "a") as f:
with open(log_path, "a", encoding="utf-8") as f:
print(
f"retry {state}, count = {retry_count}, time = {elapsed_time:.1f}s",
file=f,
)
def log_translation_mismatch(
self, plist_len, result_list, new_str, sep, log_path="log/buglog.txt"
self,
plist_len,
result_list,
new_str,
sep,
log_path="log/buglog.txt",
):
if len(result_list) == plist_len:
return
newlist = new_str.split(sep)
with open(log_path, "a") as f:
with open(log_path, "a", encoding="utf-8") as f:
print(f"problem size: {plist_len - len(result_list)}", file=f)
for i in range(len(newlist)):
print(newlist[i], file=f)
@ -204,7 +212,7 @@ The total token is too long and cannot be completely translated\n
print(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
print(
f"bug: {plist_len} paragraphs of text translated into {len(result_list)} paragraphs"
f"bug: {plist_len} paragraphs of text translated into {len(result_list)} paragraphs",
)
print("continue")
@ -246,7 +254,7 @@ The total token is too long and cannot be completely translated\n
temp_p = copy(p)
for sup in temp_p.find_all("sup"):
sup.extract()
new_str += f"({i}) " + temp_p.get_text().strip() + sep
new_str += f"({i}) {temp_p.get_text().strip()}{sep}"
i = i + 1
if new_str.endswith(sep):
@ -263,7 +271,10 @@ The total token is too long and cannot be completely translated\n
start_time = time.time()
result_list, retry_count = self.get_best_result_list(
plist_len, new_str, 6, result_list
plist_len,
new_str,
6,
result_list,
)
end_time = time.time()
@ -275,7 +286,7 @@ The total token is too long and cannot be completely translated\n
self.log_translation_mismatch(plist_len, result_list, new_str, sep, log_path)
# del (num), num. sometime (num) will translated to num.
result_list = [re.sub(r"^(\(\d+\)|\d+\.|\d+)\s*", "", s) for s in result_list]
result_list = [re.sub(r"^(\(\d+\)|\d+\.|(\d+))\s*", "", s) for s in result_list]
return result_list
def set_deployment_id(self, deployment_id):

View File

@ -13,7 +13,7 @@ class DeepL(Base):
deepl translator
"""
def __init__(self, key, language, **kwargs):
def __init__(self, key, language, **kwargs) -> None:
super().__init__(key, language)
self.api_url = "https://deepl-translator.p.rapidapi.com/translate"
self.headers = {
@ -22,10 +22,7 @@ class DeepL(Base):
"X-RapidAPI-Host": "deepl-translator.p.rapidapi.com",
}
l = None
if language in LANGUAGES:
l = language
else:
l = TO_LANGUAGE_CODE.get(language)
l = language if language in LANGUAGES else TO_LANGUAGE_CODE.get(language)
if l not in [
"bg",
"zh",
@ -71,13 +68,19 @@ class DeepL(Base):
payload = {"text": text, "source": "EN", "target": self.language}
try:
response = requests.request(
"POST", self.api_url, data=json.dumps(payload), headers=self.headers
"POST",
self.api_url,
data=json.dumps(payload),
headers=self.headers,
)
except Exception as e:
print(str(e))
print(e)
time.sleep(30)
response = requests.request(
"POST", self.api_url, data=json.dumps(payload), headers=self.headers
"POST",
self.api_url,
data=json.dumps(payload),
headers=self.headers,
)
t_text = response.json().get("text", "")
print(t_text)

View File

@ -8,7 +8,7 @@ class Google(Base):
google translate
"""
def __init__(self, key, language, **kwargs):
def __init__(self, key, language, **kwargs) -> None:
super().__init__(key, language)
self.api_url = "https://translate.google.com/translate_a/single?client=it&dt=qca&dt=t&dt=rmt&dt=bd&dt=rms&dt=sos&dt=md&dt=gt&dt=ld&dt=ss&dt=ex&otf=2&dj=1&hl=en&ie=UTF-8&oe=UTF-8&sl=auto&tl=zh-CN"
self.headers = {
@ -27,12 +27,12 @@ class Google(Base):
r = self.session.post(
self.api_url,
headers=self.headers,
data="q={text}".format(text=requests.utils.quote(text)),
data=f"q={requests.utils.quote(text)}",
)
if not r.ok:
return text
t_text = "".join(
[sentence.get("trans", "") for sentence in r.json()["sentences"]]
[sentence.get("trans", "") for sentence in r.json()["sentences"]],
)
print(t_text)
return t_text

View File

@ -5,7 +5,9 @@ from .base_translator import Base
class GPT3(Base):
def __init__(self, key, language, api_base=None, prompt_template=None, **kwargs):
def __init__(
self, key, language, api_base=None, prompt_template=None, **kwargs
) -> None:
super().__init__(key, language)
self.api_url = (
f"{api_base}v1/completions"
@ -36,7 +38,8 @@ class GPT3(Base):
print(text)
self.rotate_key()
self.data["prompt"] = self.prompt_template.format(
text=text, language=self.language
text=text,
language=self.language,
)
r = self.session.post(self.api_url, headers=self.headers, json=self.data)
if not r.ok: