bilingual_book_maker/make_book.py
2023-03-07 17:57:34 +08:00

397 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import argparse
import os
import pickle
import sys
import time
from abc import abstractmethod
from copy import copy
from os import environ as env
from pathlib import Path
import openai
import requests
from bs4 import BeautifulSoup as bs
from ebooklib import epub, ITEM_DOCUMENT
from rich import print
from tqdm import tqdm
from utils import LANGUAGES, TO_LANGUAGE_CODE
NO_LIMIT = False
IS_TEST = False
RESUME = False
class Base:
def __init__(self, key, language, api_base=None):
self.key = key
self.language = language
self.current_key_index = 0
def get_key(self, key_str):
keys = key_str.split(",")
key = keys[self.current_key_index]
self.current_key_index = (self.current_key_index + 1) % len(keys)
return key
@abstractmethod
def translate(self, text):
pass
class GPT3(Base):
def __init__(self, key, language, api_base=None):
super().__init__(key, language)
self.api_key = key
self.api_url = (
f"{api_base}v1/completions"
if api_base
else "https://api.openai.com/v1/completions"
)
self.headers = {
"Content-Type": "application/json",
}
# TODO support more models here
self.data = {
"prompt": "",
"model": "text-davinci-003",
"max_tokens": 1024,
"temperature": 1,
"top_p": 1,
}
self.session = requests.session()
self.language = language
def translate(self, text):
print(text)
self.headers["Authorization"] = f"Bearer {self.get_key(self.api_key)}"
self.data["prompt"] = f"Please help me to translate`{text}` to {self.language}"
r = self.session.post(self.api_url, headers=self.headers, json=self.data)
if not r.ok:
return text
t_text = r.json().get("choices")[0].get("text", "").strip()
print(t_text)
return t_text
class DeepL(Base):
def __init__(self, session, key, api_base=None):
super().__init__(session, key, api_base=api_base)
def translate(self, text):
return super().translate(text)
class ChatGPT(Base):
def __init__(self, key, language, api_base=None):
super().__init__(key, language, api_base=api_base)
self.key = key
self.language = language
if api_base:
openai.api_base = api_base
def translate(self, text):
print(text)
openai.api_key = self.get_key(self.key)
try:
completion = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[
{
"role": "user",
# english prompt here to save tokens
"content": f"Please help me to translate,`{text}` to {self.language}, please return only translated content not include the origin text",
}
],
)
t_text = (
completion["choices"][0]
.get("message")
.get("content")
.encode("utf8")
.decode()
)
if not NO_LIMIT:
# for time limit
time.sleep(3)
except Exception as e:
# TIME LIMIT for open api please pay
key_len = self.key.count(",") + 1
sleep_time = int(60 / key_len)
time.sleep(sleep_time)
print(e, f"will sleep {sleep_time} seconds")
openai.api_key = self.get_key(self.key)
completion = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[
{
"role": "user",
"content": f"Please help me to translate,`{text}` to {self.language}, please return only translated content not include the origin text",
}
],
)
t_text = (
completion["choices"][0]
.get("message")
.get("content")
.encode("utf8")
.decode()
)
print(t_text)
return t_text
class BEPUB:
def __init__(self, epub_name, model, key, resume, language, model_api_base=None):
self.epub_name = epub_name
self.new_epub = epub.EpubBook()
self.translate_model = model(key, language, model_api_base)
try:
self.origin_book = epub.read_epub(self.epub_name)
except:
# tricky for #71 if you don't know why please check the issue and ignore this
# when upstream change will TODO fix this
def _load_spine(self):
spine = self.container.find(
"{%s}%s" % (epub.NAMESPACES["OPF"], "spine")
)
self.book.spine = [
(t.get("idref"), t.get("linear", "yes")) for t in spine
]
self.book.set_direction(spine.get("page-progression-direction", None))
epub.EpubReader._load_spine = _load_spine
self.origin_book = epub.read_epub(self.epub_name)
self.p_to_save = []
self.resume = resume
self.bin_path = f"{Path(epub_name).parent}/.{Path(epub_name).stem}.temp.bin"
if self.resume:
self.load_state()
@staticmethod
def _is_special_text(text):
return text.isdigit() or text.isspace()
def _make_new_book(self, book):
new_book = epub.EpubBook()
new_book.metadata = book.metadata
new_book.spine = book.spine
new_book.toc = book.toc
return new_book
def make_bilingual_book(self):
new_book = self._make_new_book(self.origin_book)
all_items = list(self.origin_book.get_items())
all_p_length = sum(
len(bs(i.content, "html.parser").findAll("p"))
if i.file_name.endswith(".xhtml")
else len(bs(i.content, "xml").findAll("p"))
for i in all_items
)
pbar = tqdm(total=TEST_NUM) if IS_TEST else tqdm(total=all_p_length)
index = 0
p_to_save_len = len(self.p_to_save)
try:
for item in self.origin_book.get_items():
if item.get_type() == ITEM_DOCUMENT:
soup = bs(item.content, "html.parser")
p_list = soup.findAll("p")
is_test_done = IS_TEST and index > TEST_NUM
for p in p_list:
if is_test_done or not p.text or self._is_special_text(p.text):
continue
new_p = copy(p)
# TODO banch of p to translate then combine
# PR welcome here
if self.resume and index < p_to_save_len:
new_p.string = self.p_to_save[index]
else:
new_p.string = self.translate_model.translate(p.text)
self.p_to_save.append(new_p.text)
p.insert_after(new_p)
index += 1
if index % 50 == 0:
self._save_progress()
# pbar.update(delta) not pbar.update(index)?
pbar.update(1)
if IS_TEST and index > TEST_NUM:
break
item.content = soup.prettify().encode()
new_book.add_item(item)
name, _ = os.path.splitext(self.epub_name)
epub.write_epub(f"{name}_bilingual.epub", new_book, {})
pbar.close()
except (KeyboardInterrupt, Exception) as e:
print(e)
print("you can resume it next time")
self._save_progress()
self._save_temp_book()
sys.exit(0)
def load_state(self):
try:
with open(self.bin_path, "rb") as f:
self.p_to_save = pickle.load(f)
except:
raise Exception("can not load resume file")
def _save_temp_book(self):
origin_book_temp = epub.read_epub(
self.epub_name
) # we need a new instance for temp save
new_temp_book = self._make_new_book(origin_book_temp)
p_to_save_len = len(self.p_to_save)
index = 0
# items clear
try:
for item in self.origin_book.get_items():
if item.get_type() == ITEM_DOCUMENT:
soup = (
bs(item.content, "xml")
if item.file_name.endswith(".xhtml")
else bs(item.content, "html.parser")
)
p_list = soup.findAll("p")
for p in p_list:
if not p.text or self._is_special_text(p.text):
continue
# TODO banch of p to translate then combine
# PR welcome here
if index < p_to_save_len:
new_p = copy(p)
new_p.string = self.p_to_save[index]
print(new_p.string)
p.insert_after(new_p)
index += 1
else:
break
# for save temp book
item.content = soup.prettify().encode()
new_temp_book.add_item(item)
name, _ = os.path.splitext(self.epub_name)
epub.write_epub(f"{name}_bilingual_temp.epub", new_temp_book, {})
except Exception as e:
# TODO handle it
print(e)
def _save_progress(self):
try:
with open(self.bin_path, "wb") as f:
pickle.dump(self.p_to_save, f)
except:
raise Exception("can not save resume file")
if __name__ == "__main__":
MODEL_DICT = {"gpt3": GPT3, "chatgpt": ChatGPT}
parser = argparse.ArgumentParser()
parser.add_argument(
"--book_name",
dest="book_name",
type=str,
help="your epub book file path",
)
parser.add_argument(
"--openai_key",
dest="openai_key",
type=str,
default="",
help="openai api key,if you have more than one key,you can use comma"
" to split them and you can break through the limitation",
)
parser.add_argument(
"--no_limit",
dest="no_limit",
action="store_true",
help="If you are a paying customer you can add it",
)
parser.add_argument(
"--test",
dest="test",
action="store_true",
help="if test we only translat 10 contents you can easily check",
)
parser.add_argument(
"--test_num",
dest="test_num",
type=int,
default=10,
help="test num for the test",
)
parser.add_argument(
"-m",
"--model",
dest="model",
type=str,
default="chatgpt",
choices=["chatgpt", "gpt3"], # support DeepL later
metavar="MODEL",
help="Which model to use, available: {%(choices)s}",
)
parser.add_argument(
"--language",
type=str,
choices=sorted(LANGUAGES.keys())
+ sorted([k.title() for k in TO_LANGUAGE_CODE.keys()]),
default="zh-hans",
metavar="LANGUAGE",
help="language to translate to, available: {%(choices)s}",
)
parser.add_argument(
"--resume",
dest="resume",
action="store_true",
help="if program accidentally stop you can use this to resume",
)
parser.add_argument(
"-p",
"--proxy",
dest="proxy",
type=str,
default="",
help="use proxy like http://127.0.0.1:7890",
)
# args to change api_base
parser.add_argument(
"--api_base",
dest="api_base",
type=str,
help="replace base url from openapi",
)
options = parser.parse_args()
NO_LIMIT = options.no_limit
IS_TEST = options.test
TEST_NUM = options.test_num
PROXY = options.proxy
if PROXY != "":
os.environ["http_proxy"] = PROXY
os.environ["https_proxy"] = PROXY
OPENAI_API_KEY = options.openai_key or env.get("OPENAI_API_KEY")
RESUME = options.resume
if not OPENAI_API_KEY:
raise Exception("Need openai API key, please google how to")
if not options.book_name.lower().endswith(".epub"):
raise Exception("please use epub file")
model = MODEL_DICT.get(options.model, "chatgpt")
language = options.language
if options.language in LANGUAGES:
# use the value for prompt
language = LANGUAGES.get(language, language)
# change api_base for issue #42
model_api_base = options.api_base
e = BEPUB(
options.book_name,
model,
OPENAI_API_KEY,
RESUME,
language=language,
model_api_base=model_api_base,
)
e.make_bilingual_book()