mirror of https://github.com/yihong0618/bilingual_book_maker.git

feat: refactor and we have no type hint never (#97)
commit cdeaaea6ce · parent 0bed9959f3
book_maker/__init__.py
Normal file
0
book_maker/__init__.py
Normal file
1
book_maker/__main__.py
Normal file
1
book_maker/__main__.py
Normal file
@ -0,0 +1 @@
|
||||
from cli import main
|
book_maker/cli.py (new file, 127 lines)
@@ -0,0 +1,127 @@
import argparse
import os
from os import environ as env

from book_maker.loader import BOOK_LOADER_DICT
from book_maker.translator import MODEL_DICT
from book_maker.utils import LANGUAGES, TO_LANGUAGE_CODE


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--book_name",
        dest="book_name",
        type=str,
        help="path of the epub book to translate",
    )
    parser.add_argument(
        "--openai_key",
        dest="openai_key",
        type=str,
        default="",
        help="OpenAI API key; if you have more than one key, separate them"
        " with commas so requests rotate across keys and spread the rate limit",
    )
    parser.add_argument(
        "--no_limit",
        dest="no_limit",
        action="store_true",
        help="skip the rate-limit sleep (add this if you are a paying API customer)",
    )
    parser.add_argument(
        "--test",
        dest="test",
        action="store_true",
        help="in test mode, only the first test_num paragraphs are translated, for a quick check",
    )
    parser.add_argument(
        "--test_num",
        dest="test_num",
        type=int,
        default=10,
        help="number of paragraphs to translate in test mode (default 10)",
    )
    parser.add_argument(
        "-m",
        "--model",
        dest="model",
        type=str,
        default="chatgptapi",
        choices=["chatgptapi", "gpt3"],  # support DeepL later
        metavar="MODEL",
        help="which model to use, available: {%(choices)s}",
    )
    parser.add_argument(
        "--language",
        type=str,
        choices=sorted(LANGUAGES.keys())
        + sorted([k.title() for k in TO_LANGUAGE_CODE.keys()]),
        default="zh-hans",
        metavar="LANGUAGE",
        help="language to translate to, available: {%(choices)s}",
    )
    parser.add_argument(
        "--resume",
        dest="resume",
        action="store_true",
        help="resume from saved progress if the program stopped unexpectedly",
    )
    parser.add_argument(
        "-p",
        "--proxy",
        dest="proxy",
        type=str,
        default="",
        help="use a proxy, e.g. http://127.0.0.1:7890",
    )
    # args to change api_base
    parser.add_argument(
        "--api_base",
        dest="api_base",
        type=str,
        help="replace the OpenAI API base URL",
    )

    options = parser.parse_args()
    PROXY = options.proxy
    if PROXY != "":
        os.environ["http_proxy"] = PROXY
        os.environ["https_proxy"] = PROXY

    OPENAI_API_KEY = options.openai_key or env.get("OPENAI_API_KEY")
    if not OPENAI_API_KEY:
        raise Exception(
            "OpenAI API key is required: pass --openai_key or set OPENAI_API_KEY"
        )

    book_type = options.book_name.split(".")[-1]
    support_type_list = list(BOOK_LOADER_DICT.keys())
    if book_type not in support_type_list:
        raise Exception(
            f"only {', '.join(support_type_list)} files are supported for now"
        )
    translate_model = MODEL_DICT.get(options.model)
    assert translate_model is not None, "unsupported model"

    book_loader = BOOK_LOADER_DICT.get(book_type)
    assert book_loader is not None, "unsupported loader"
    language = options.language
    if options.language in LANGUAGES:
        # use the full language name in the prompt
        language = LANGUAGES.get(language, language)

    # change api_base for issue #42
    model_api_base = options.api_base

    e = book_loader(
        options.book_name,
        translate_model,
        OPENAI_API_KEY,
        options.resume,
        language=language,
        model_api_base=model_api_base,
        is_test=options.test,
        test_num=options.test_num,
    )
    e.make_bilingual_book()


if __name__ == "__main__":
    main()
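Note: after this refactor, cli.py is just glue around the two registries added below (BOOK_LOADER_DICT and MODEL_DICT). A minimal sketch of the same flow driven directly from Python instead of argparse; the book path and API key here are hypothetical:

    from book_maker.loader import BOOK_LOADER_DICT
    from book_maker.translator import MODEL_DICT

    loader_cls = BOOK_LOADER_DICT["epub"]  # -> EPUBBookLoader
    model_cls = MODEL_DICT["chatgptapi"]   # -> ChatGPTAPI
    loader = loader_cls(
        "animal_farm.epub",                # hypothetical book path
        model_cls,
        "sk-xxx",                          # hypothetical API key
        False,                             # resume
        language="Simplified Chinese",
        is_test=True,                      # translate only a few paragraphs
        test_num=3,
    )
    loader.make_bilingual_book()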
book_maker/loader/__init__.py (new file, 6 lines)
@@ -0,0 +1,6 @@
from book_maker.loader.epub_loader import EPUBBookLoader

BOOK_LOADER_DICT = {
    "epub": EPUBBookLoader,
    # TODO add more here
}
book_maker/loader/base_loader.py (new file, 40 lines)
@@ -0,0 +1,40 @@
from abc import abstractmethod


class BaseBookLoader:
    def __init__(
        self,
        epub_name,
        model,
        key,
        resume,
        language,
        model_api_base=None,
        is_test=False,
        test_num=5,
    ):
        pass

    @staticmethod
    def _is_special_text(text):
        return text.isdigit() or text.isspace()

    @abstractmethod
    def _make_new_book(self, book):
        pass

    @abstractmethod
    def make_bilingual_book(self):
        pass

    @abstractmethod
    def load_state(self):
        pass

    @abstractmethod
    def _save_temp_book(self):
        pass

    @abstractmethod
    def _save_progress(self):
        pass
book_maker/loader/epub_loader.py (new file, 168 lines)
@@ -0,0 +1,168 @@
import os
import pickle
import sys
from copy import copy
from pathlib import Path

from bs4 import BeautifulSoup as bs
from ebooklib import ITEM_DOCUMENT, epub
from rich import print
from tqdm import tqdm

from .base_loader import BaseBookLoader


class EPUBBookLoader(BaseBookLoader):
    def __init__(
        self,
        epub_name,
        model,
        key,
        resume,
        language,
        model_api_base=None,
        is_test=False,
        test_num=5,
    ):
        self.epub_name = epub_name
        self.new_epub = epub.EpubBook()
        self.translate_model = model(key, language, model_api_base)
        self.is_test = is_test
        self.test_num = test_num

        try:
            self.origin_book = epub.read_epub(self.epub_name)
        except Exception:
            # tricky workaround for #71; see the issue for details
            # TODO remove once upstream ebooklib fixes this
            def _load_spine(self):
                spine = self.container.find(
                    "{%s}%s" % (epub.NAMESPACES["OPF"], "spine")
                )

                self.book.spine = [
                    (t.get("idref"), t.get("linear", "yes")) for t in spine
                ]
                self.book.set_direction(spine.get("page-progression-direction", None))

            epub.EpubReader._load_spine = _load_spine
            self.origin_book = epub.read_epub(self.epub_name)

        self.p_to_save = []
        self.resume = resume
        self.bin_path = f"{Path(epub_name).parent}/.{Path(epub_name).stem}.temp.bin"
        if self.resume:
            self.load_state()

    @staticmethod
    def _is_special_text(text):
        return text.isdigit() or text.isspace()

    def _make_new_book(self, book):
        new_book = epub.EpubBook()
        new_book.metadata = book.metadata
        new_book.spine = book.spine
        new_book.toc = book.toc
        return new_book

    def make_bilingual_book(self):
        new_book = self._make_new_book(self.origin_book)
        all_items = list(self.origin_book.get_items())
        all_p_length = sum(
            0
            if i.get_type() != ITEM_DOCUMENT
            else len(bs(i.content, "html.parser").findAll("p"))
            for i in all_items
        )
        pbar = tqdm(total=self.test_num) if self.is_test else tqdm(total=all_p_length)
        index = 0
        p_to_save_len = len(self.p_to_save)
        try:
            for item in self.origin_book.get_items():
                if item.get_type() == ITEM_DOCUMENT:
                    soup = bs(item.content, "html.parser")
                    p_list = soup.findAll("p")
                    is_test_done = self.is_test and index > self.test_num
                    for p in p_list:
                        if is_test_done or not p.text or self._is_special_text(p.text):
                            continue
                        new_p = copy(p)
                        # TODO batch paragraphs to translate, then combine
                        # PR welcome here
                        if self.resume and index < p_to_save_len:
                            new_p.string = self.p_to_save[index]
                        else:
                            new_p.string = self.translate_model.translate(p.text)
                            self.p_to_save.append(new_p.text)
                        p.insert_after(new_p)
                        index += 1
                        if index % 20 == 0:
                            self._save_progress()
                        # pbar.update(delta) not pbar.update(index)?
                        pbar.update(1)
                        if self.is_test and index >= self.test_num:
                            break
                    item.content = soup.prettify().encode()
                new_book.add_item(item)
            name, _ = os.path.splitext(self.epub_name)
            epub.write_epub(f"{name}_bilingual.epub", new_book, {})
            pbar.close()
        except (KeyboardInterrupt, Exception) as e:
            print(e)
            print("you can resume it next time")
            self._save_progress()
            self._save_temp_book()
            sys.exit(0)

    def load_state(self):
        try:
            with open(self.bin_path, "rb") as f:
                self.p_to_save = pickle.load(f)
        except Exception:
            raise Exception("cannot load the resume file")

    def _save_temp_book(self):
        origin_book_temp = epub.read_epub(
            self.epub_name
        )  # we need a fresh instance for the temp save
        new_temp_book = self._make_new_book(origin_book_temp)
        p_to_save_len = len(self.p_to_save)
        index = 0
        try:
            for item in self.origin_book.get_items():
                if item.get_type() == ITEM_DOCUMENT:
                    soup = (
                        bs(item.content, "xml")
                        if item.file_name.endswith(".xhtml")
                        else bs(item.content, "html.parser")
                    )
                    p_list = soup.findAll("p")
                    for p in p_list:
                        if not p.text or self._is_special_text(p.text):
                            continue
                        # TODO batch paragraphs to translate, then combine
                        # PR welcome here
                        if index < p_to_save_len:
                            new_p = copy(p)
                            new_p.string = self.p_to_save[index]
                            print(new_p.string)
                            p.insert_after(new_p)
                            index += 1
                        else:
                            break
                    # for saving the temp book
                    item.content = soup.prettify().encode()
                new_temp_book.add_item(item)
            name, _ = os.path.splitext(self.epub_name)
            epub.write_epub(f"{name}_bilingual_temp.epub", new_temp_book, {})
        except Exception as e:
            # TODO handle it properly
            print(e)

    def _save_progress(self):
        try:
            with open(self.bin_path, "wb") as f:
                pickle.dump(self.p_to_save, f)
        except Exception:
            raise Exception("cannot save the resume file")
book_maker/loader/srt_loader.py (new file, 1 line)
@@ -0,0 +1 @@
"""TODO"""

book_maker/loader/txt_loader.py (new file, 1 line)
@@ -0,0 +1 @@
"""TODO"""
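Note: srt_loader.py and txt_loader.py are placeholders for the loader contract defined in base_loader.py above. A sketch of what a future plain-text loader might look like under that contract; TXTBookLoader is hypothetical and not part of this diff, and would be registered as BOOK_LOADER_DICT["txt"]:

    from book_maker.loader.base_loader import BaseBookLoader


    class TXTBookLoader(BaseBookLoader):  # hypothetical loader
        def __init__(
            self,
            txt_name,
            model,
            key,
            resume,
            language,
            model_api_base=None,
            is_test=False,
            test_num=5,
        ):
            self.txt_name = txt_name
            self.translate_model = model(key, language, model_api_base)
            self.is_test = is_test
            self.test_num = test_num

        def _make_new_book(self, book):
            pass  # plain text has no epub object to clone

        def make_bilingual_book(self):
            pass  # translate line by line, interleaving original and translation

        def load_state(self):
            pass

        def _save_temp_book(self):
            pass

        def _save_progress(self):
            pass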
book_maker/translator/__init__.py (new file, 8 lines)
@@ -0,0 +1,8 @@
from book_maker.translator.chatgptapi_translator import ChatGPTAPI
from book_maker.translator.gpt3_translator import GPT3

MODEL_DICT = {
    "chatgptapi": ChatGPTAPI,
    "gpt3": GPT3,
    # add more here
}
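Note: the "# add more here" hook is how new engines are meant to plug in: import the class and register it under a CLI name. A sketch using the DeepL stub from this diff; the registration itself is hypothetical, since cli.py does not list deepl in its choices yet:

    from book_maker.translator import MODEL_DICT
    from book_maker.translator.deepl_translator import DeepL

    MODEL_DICT["deepl"] = DeepL  # hypothetical; DeepL is still a stub below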
book_maker/translator/base_translator.py (new file, 18 lines)
@@ -0,0 +1,18 @@
from abc import abstractmethod


class Base:
    def __init__(self, key, language, api_base=None):
        self.key = key
        self.language = language
        self.current_key_index = 0

    def get_key(self, key_str):
        keys = key_str.split(",")
        key = keys[self.current_key_index]
        self.current_key_index = (self.current_key_index + 1) % len(keys)
        return key

    @abstractmethod
    def translate(self, text):
        pass
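Note: get_key is a simple round-robin over comma-separated keys; it is what lets --openai_key accept several keys at once. A quick sketch of the rotation (the keys are hypothetical):

    from book_maker.translator.base_translator import Base

    b = Base("key1,key2,key3", "Simplified Chinese")
    print(b.get_key(b.key))  # key1
    print(b.get_key(b.key))  # key2
    print(b.get_key(b.key))  # key3
    print(b.get_key(b.key))  # key1 again -- wraps around via the modulo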
book_maker/translator/chatgptapi_translator.py (new file, 61 lines)
@@ -0,0 +1,61 @@
import time

import openai

from .base_translator import Base


class ChatGPTAPI(Base):
    def __init__(self, key, language, api_base=None):
        super().__init__(key, language, api_base=api_base)
        self.key = key
        self.language = language
        if api_base:
            openai.api_base = api_base

    def translate(self, text):
        print(text)
        openai.api_key = self.get_key(self.key)
        try:
            completion = openai.ChatCompletion.create(
                model="gpt-3.5-turbo",
                messages=[
                    {
                        "role": "user",
                        # English prompt here to save tokens
                        "content": f"Please help me to translate,`{text}` to {self.language}, please return only translated content not include the origin text",
                    }
                ],
            )
            t_text = (
                completion["choices"][0]
                .get("message")
                .get("content")
                .encode("utf8")
                .decode()
            )
        except Exception as e:
            # rate limited by the API; sleep, then retry once with the next key
            key_len = self.key.count(",") + 1
            sleep_time = int(60 / key_len)
            time.sleep(sleep_time)
            print(e, f"will sleep {sleep_time} seconds")
            openai.api_key = self.get_key(self.key)
            completion = openai.ChatCompletion.create(
                model="gpt-3.5-turbo",
                messages=[
                    {
                        "role": "user",
                        "content": f"Please help me to translate,`{text}` to {self.language}, please return only translated content not include the origin text",
                    }
                ],
            )
            t_text = (
                completion["choices"][0]
                .get("message")
                .get("content")
                .encode("utf8")
                .decode()
            )
        print(t_text)
        return t_text
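Note: on failure the translator divides a one-minute budget across however many keys were supplied, so more keys mean a shorter sleep before the single retry. The arithmetic above, worked through:

    key = "k1,k2,k3"              # three hypothetical comma-separated keys
    key_len = key.count(",") + 1  # 3
    sleep_time = int(60 / key_len)
    print(sleep_time)             # 20 -> each retry waits 20 seconds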
book_maker/translator/deepl_translator.py (new file, 9 lines)
@@ -0,0 +1,9 @@
from .base_translator import Base


class DeepL(Base):
    def __init__(self, session, key, api_base=None):
        super().__init__(session, key, api_base=api_base)

    def translate(self, text):
        return super().translate(text)
book_maker/translator/gpt3_translator.py (new file, 39 lines)
@@ -0,0 +1,39 @@
import requests
from rich import print

from .base_translator import Base


class GPT3(Base):
    def __init__(self, key, language, api_base=None):
        super().__init__(key, language)
        self.api_key = key
        self.api_url = (
            f"{api_base}v1/completions"
            if api_base
            else "https://api.openai.com/v1/completions"
        )
        self.headers = {
            "Content-Type": "application/json",
        }
        # TODO support more models here
        self.data = {
            "prompt": "",
            "model": "text-davinci-003",
            "max_tokens": 1024,
            "temperature": 1,
            "top_p": 1,
        }
        self.session = requests.session()
        self.language = language

    def translate(self, text):
        print(text)
        self.headers["Authorization"] = f"Bearer {self.get_key(self.api_key)}"
        self.data["prompt"] = f"Please help me to translate,`{text}` to {self.language}"
        r = self.session.post(self.api_url, headers=self.headers, json=self.data)
        if not r.ok:
            return text
        t_text = r.json().get("choices")[0].get("text", "").strip()
        print(t_text)
        return t_text
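Note: the f"{api_base}v1/completions" concatenation inserts no separator, so --api_base is expected to end with a trailing slash. For example (hypothetical proxy URL):

    api_base = "https://my-openai-proxy.example/"  # note the trailing slash
    print(f"{api_base}v1/completions")  # https://my-openai-proxy.example/v1/completions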
make_book.py (396 lines removed, 4 lines added)
@@ -1,396 +1,4 @@
-import argparse
-import os
-import pickle
-import sys
-import time
-from abc import abstractmethod
-from copy import copy
-from os import environ as env
-from pathlib import Path
-
-import openai
-import requests
-from bs4 import BeautifulSoup as bs
-from ebooklib import epub, ITEM_DOCUMENT
-from rich import print
-from tqdm import tqdm
-
-from utils import LANGUAGES, TO_LANGUAGE_CODE
-
-NO_LIMIT = False
-IS_TEST = False
-RESUME = False
-
-
-class Base:
-    def __init__(self, key, language, api_base=None):
-        self.key = key
-        self.language = language
-        self.current_key_index = 0
-
-    def get_key(self, key_str):
-        keys = key_str.split(",")
-        key = keys[self.current_key_index]
-        self.current_key_index = (self.current_key_index + 1) % len(keys)
-        return key
-
-    @abstractmethod
-    def translate(self, text):
-        pass
-
-
-class GPT3(Base):
-    def __init__(self, key, language, api_base=None):
-        super().__init__(key, language)
-        self.api_key = key
-        self.api_url = (
-            f"{api_base}v1/completions"
-            if api_base
-            else "https://api.openai.com/v1/completions"
-        )
-        self.headers = {
-            "Content-Type": "application/json",
-        }
-        # TODO support more models here
-        self.data = {
-            "prompt": "",
-            "model": "text-davinci-003",
-            "max_tokens": 1024,
-            "temperature": 1,
-            "top_p": 1,
-        }
-        self.session = requests.session()
-        self.language = language
-
-    def translate(self, text):
-        print(text)
-        self.headers["Authorization"] = f"Bearer {self.get_key(self.api_key)}"
-        self.data["prompt"] = f"Please help me to translate,`{text}` to {self.language}"
-        r = self.session.post(self.api_url, headers=self.headers, json=self.data)
-        if not r.ok:
-            return text
-        t_text = r.json().get("choices")[0].get("text", "").strip()
-        print(t_text)
-        return t_text
-
-
-class DeepL(Base):
-    def __init__(self, session, key, api_base=None):
-        super().__init__(session, key, api_base=api_base)
-
-    def translate(self, text):
-        return super().translate(text)
-
-
-class ChatGPT(Base):
-    def __init__(self, key, language, api_base=None):
-        super().__init__(key, language, api_base=api_base)
-        self.key = key
-        self.language = language
-        if api_base:
-            openai.api_base = api_base
-
-    def translate(self, text):
-        print(text)
-        openai.api_key = self.get_key(self.key)
-        try:
-            completion = openai.ChatCompletion.create(
-                model="gpt-3.5-turbo",
-                messages=[
-                    {
-                        "role": "user",
-                        # english prompt here to save tokens
-                        "content": f"Please help me to translate,`{text}` to {self.language}, please return only translated content not include the origin text",
-                    }
-                ],
-            )
-            t_text = (
-                completion["choices"][0]
-                .get("message")
-                .get("content")
-                .encode("utf8")
-                .decode()
-            )
-            if not NO_LIMIT:
-                # for time limit
-                time.sleep(3)
-        except Exception as e:
-            # TIME LIMIT for open api please pay
-            key_len = self.key.count(",") + 1
-            sleep_time = int(60 / key_len)
-            time.sleep(sleep_time)
-            print(e, f"will sleep {sleep_time} seconds")
-            openai.api_key = self.get_key(self.key)
-            completion = openai.ChatCompletion.create(
-                model="gpt-3.5-turbo",
-                messages=[
-                    {
-                        "role": "user",
-                        "content": f"Please help me to translate,`{text}` to {self.language}, please return only translated content not include the origin text",
-                    }
-                ],
-            )
-            t_text = (
-                completion["choices"][0]
-                .get("message")
-                .get("content")
-                .encode("utf8")
-                .decode()
-            )
-        print(t_text)
-        return t_text
-
-
-class BEPUB:
-    def __init__(self, epub_name, model, key, resume, language, model_api_base=None):
-        self.epub_name = epub_name
-        self.new_epub = epub.EpubBook()
-        self.translate_model = model(key, language, model_api_base)
-
-        try:
-            self.origin_book = epub.read_epub(self.epub_name)
-        except:
-            # tricky for #71 if you don't know why please check the issue and ignore this
-            # when upstream change will TODO fix this
-            def _load_spine(self):
-                spine = self.container.find(
-                    "{%s}%s" % (epub.NAMESPACES["OPF"], "spine")
-                )
-
-                self.book.spine = [
-                    (t.get("idref"), t.get("linear", "yes")) for t in spine
-                ]
-                self.book.set_direction(spine.get("page-progression-direction", None))
-
-            epub.EpubReader._load_spine = _load_spine
-            self.origin_book = epub.read_epub(self.epub_name)
-
-        self.p_to_save = []
-        self.resume = resume
-        self.bin_path = f"{Path(epub_name).parent}/.{Path(epub_name).stem}.temp.bin"
-        if self.resume:
-            self.load_state()
-
-    @staticmethod
-    def _is_special_text(text):
-        return text.isdigit() or text.isspace()
-
-    def _make_new_book(self, book):
-        new_book = epub.EpubBook()
-        new_book.metadata = book.metadata
-        new_book.spine = book.spine
-        new_book.toc = book.toc
-        return new_book
-
-    def make_bilingual_book(self):
-        new_book = self._make_new_book(self.origin_book)
-        all_items = list(self.origin_book.get_items())
-        all_p_length = sum(
-            0
-            if i.get_type() != ITEM_DOCUMENT
-            else len(bs(i.content, "html.parser").findAll("p"))
-            for i in all_items
-        )
-        pbar = tqdm(total=TEST_NUM) if IS_TEST else tqdm(total=all_p_length)
-        index = 0
-        p_to_save_len = len(self.p_to_save)
-        try:
-            for item in self.origin_book.get_items():
-                if item.get_type() == ITEM_DOCUMENT:
-                    soup = bs(item.content, "html.parser")
-                    p_list = soup.findAll("p")
-                    is_test_done = IS_TEST and index > TEST_NUM
-                    for p in p_list:
-                        if is_test_done or not p.text or self._is_special_text(p.text):
-                            continue
-                        new_p = copy(p)
-                        # TODO banch of p to translate then combine
-                        # PR welcome here
-                        if self.resume and index < p_to_save_len:
-                            new_p.string = self.p_to_save[index]
-                        else:
-                            new_p.string = self.translate_model.translate(p.text)
-                            self.p_to_save.append(new_p.text)
-                        p.insert_after(new_p)
-                        index += 1
-                        if index % 50 == 0:
-                            self._save_progress()
-                        # pbar.update(delta) not pbar.update(index)?
-                        pbar.update(1)
-                        if IS_TEST and index >= TEST_NUM:
-                            break
-                    item.content = soup.prettify().encode()
-                new_book.add_item(item)
-            name, _ = os.path.splitext(self.epub_name)
-            epub.write_epub(f"{name}_bilingual.epub", new_book, {})
-            pbar.close()
-        except (KeyboardInterrupt, Exception) as e:
-            print(e)
-            print("you can resume it next time")
-            self._save_progress()
-            self._save_temp_book()
-            sys.exit(0)
-
-    def load_state(self):
-        try:
-            with open(self.bin_path, "rb") as f:
-                self.p_to_save = pickle.load(f)
-        except:
-            raise Exception("can not load resume file")
-
-    def _save_temp_book(self):
-        origin_book_temp = epub.read_epub(
-            self.epub_name
-        )  # we need a new instance for temp save
-        new_temp_book = self._make_new_book(origin_book_temp)
-        p_to_save_len = len(self.p_to_save)
-        index = 0
-        # items clear
-        try:
-            for item in self.origin_book.get_items():
-                if item.get_type() == ITEM_DOCUMENT:
-                    soup = (
-                        bs(item.content, "xml")
-                        if item.file_name.endswith(".xhtml")
-                        else bs(item.content, "html.parser")
-                    )
-                    p_list = soup.findAll("p")
-                    for p in p_list:
-                        if not p.text or self._is_special_text(p.text):
-                            continue
-                        # TODO banch of p to translate then combine
-                        # PR welcome here
-                        if index < p_to_save_len:
-                            new_p = copy(p)
-                            new_p.string = self.p_to_save[index]
-                            print(new_p.string)
-                            p.insert_after(new_p)
-                            index += 1
-                        else:
-                            break
-                    # for save temp book
-                    item.content = soup.prettify().encode()
-                new_temp_book.add_item(item)
-            name, _ = os.path.splitext(self.epub_name)
-            epub.write_epub(f"{name}_bilingual_temp.epub", new_temp_book, {})
-        except Exception as e:
-            # TODO handle it
-            print(e)
-
-    def _save_progress(self):
-        try:
-            with open(self.bin_path, "wb") as f:
-                pickle.dump(self.p_to_save, f)
-        except:
-            raise Exception("can not save resume file")
-
-
-if __name__ == "__main__":
-    MODEL_DICT = {"gpt3": GPT3, "chatgpt": ChatGPT}
-    parser = argparse.ArgumentParser()
-    parser.add_argument(
-        "--book_name",
-        dest="book_name",
-        type=str,
-        help="your epub book file path",
-    )
-    parser.add_argument(
-        "--openai_key",
-        dest="openai_key",
-        type=str,
-        default="",
-        help="openai api key,if you have more than one key,you can use comma"
-        " to split them and you can break through the limitation",
-    )
-    parser.add_argument(
-        "--no_limit",
-        dest="no_limit",
-        action="store_true",
-        help="If you are a paying customer you can add it",
-    )
-    parser.add_argument(
-        "--test",
-        dest="test",
-        action="store_true",
-        help="if test we only translat 10 contents you can easily check",
-    )
-    parser.add_argument(
-        "--test_num",
-        dest="test_num",
-        type=int,
-        default=10,
-        help="test num for the test",
-    )
-    parser.add_argument(
-        "-m",
-        "--model",
-        dest="model",
-        type=str,
-        default="chatgpt",
-        choices=["chatgpt", "gpt3"],  # support DeepL later
-        metavar="MODEL",
-        help="Which model to use, available: {%(choices)s}",
-    )
-    parser.add_argument(
-        "--language",
-        type=str,
-        choices=sorted(LANGUAGES.keys())
-        + sorted([k.title() for k in TO_LANGUAGE_CODE.keys()]),
-        default="zh-hans",
-        metavar="LANGUAGE",
-        help="language to translate to, available: {%(choices)s}",
-    )
-    parser.add_argument(
-        "--resume",
-        dest="resume",
-        action="store_true",
-        help="if program accidentally stop you can use this to resume",
-    )
-    parser.add_argument(
-        "-p",
-        "--proxy",
-        dest="proxy",
-        type=str,
-        default="",
-        help="use proxy like http://127.0.0.1:7890",
-    )
-    # args to change api_base
-    parser.add_argument(
-        "--api_base",
-        dest="api_base",
-        type=str,
-        help="replace base url from openapi",
-    )
-
-    options = parser.parse_args()
-    NO_LIMIT = options.no_limit
-    IS_TEST = options.test
-    TEST_NUM = options.test_num
-    PROXY = options.proxy
-    if PROXY != "":
-        os.environ["http_proxy"] = PROXY
-        os.environ["https_proxy"] = PROXY
-
-    OPENAI_API_KEY = options.openai_key or env.get("OPENAI_API_KEY")
-    RESUME = options.resume
-    if not OPENAI_API_KEY:
-        raise Exception("Need openai API key, please google how to")
-    if not options.book_name.lower().endswith(".epub"):
-        raise Exception("please use epub file")
-    model = MODEL_DICT.get(options.model, "chatgpt")
-    language = options.language
-    if options.language in LANGUAGES:
-        # use the value for prompt
-        language = LANGUAGES.get(language, language)
-
-    # change api_base for issue #42
-    model_api_base = options.api_base
-    e = BEPUB(
-        options.book_name,
-        model,
-        OPENAI_API_KEY,
-        RESUME,
-        language=language,
-        model_api_base=model_api_base,
-    )
-    e.make_bilingual_book()
+from book_maker.cli import main
+
+if __name__ == "__main__":
+    main()