import os
import pickle
import string
import sys
import time
from copy import copy
from pathlib import Path

from bs4 import BeautifulSoup as bs
from bs4 import Tag
from bs4.element import NavigableString
from ebooklib import ITEM_DOCUMENT, epub
from rich import print
from tqdm import tqdm

from book_maker.utils import num_tokens_from_text, prompt_config_to_kwargs

from .base_loader import BaseBookLoader
from .helper import EPUBBookLoaderHelper, is_text_link, not_trans


class EPUBBookLoader(BaseBookLoader):
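    """Bilingual EPUB builder.

    Walks the document items of an EPUB, translates the text of the
    configured tags (default: ``p``) with the supplied model class, and
    writes a ``*_bilingual.epub`` where each translation accompanies its
    source paragraph (or replaces it when ``single_translate`` is set).
    Translated text is pickled along the way so interrupted runs can be
    resumed with ``resume=True``.
    """
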
    def __init__(
        self,
        epub_name,
        model,
        key,
        resume,
        language,
        model_api_base=None,
        is_test=False,
        test_num=5,
        prompt_config=None,
        single_translate=False,
        context_flag=False,
        temperature=1.0,
        context_paragraph_limit=0,
    ):
        self.epub_name = epub_name
        self.new_epub = epub.EpubBook()
        self.translate_model = model(
            key,
            language,
            api_base=model_api_base,
            context_flag=context_flag,
            context_paragraph_limit=context_paragraph_limit,
            temperature=temperature,
            **prompt_config_to_kwargs(prompt_config),
        )
        self.is_test = is_test
        self.test_num = test_num
        self.translate_tags = "p"
        self.exclude_translate_tags = "sup"
        self.allow_navigable_strings = False
        self.accumulated_num = 1
        self.translation_style = ""
        self.context_flag = context_flag
        self.helper = EPUBBookLoaderHelper(
            self.translate_model,
            self.accumulated_num,
            self.translation_style,
            self.context_flag,
        )
        self.retranslate = None
        self.exclude_filelist = ""
        self.only_filelist = ""
        self.single_translate = single_translate
        self.block_size = -1
        self.batch_use_flag = False
        self.batch_flag = False

        # monkey patch for #173
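        # (judging from the else branch below, the patch also writes items
        # that are missing from the manifest instead of skipping them)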
        def _write_items_patch(obj):
            for item in obj.book.get_items():
                if isinstance(item, epub.EpubNcx):
                    obj.out.writestr(
                        "%s/%s" % (obj.book.FOLDER_NAME, item.file_name), obj._get_ncx()
                    )
                elif isinstance(item, epub.EpubNav):
                    obj.out.writestr(
                        "%s/%s" % (obj.book.FOLDER_NAME, item.file_name),
                        obj._get_nav(item),
                    )
                elif item.manifest:
                    obj.out.writestr(
                        "%s/%s" % (obj.book.FOLDER_NAME, item.file_name), item.content
                    )
                else:
                    obj.out.writestr("%s" % item.file_name, item.content)

        def _check_deprecated(obj):
            pass

        epub.EpubWriter._write_items = _write_items_patch
        epub.EpubReader._check_deprecated = _check_deprecated

        try:
            self.origin_book = epub.read_epub(self.epub_name)
        except Exception:
            # tricky monkey patch for #71; if you don't know why, check the
            # issue and ignore this
            # TODO: fix this when upstream changes
            def _load_spine(obj):
                spine = obj.container.find("{%s}%s" % (epub.NAMESPACES["OPF"], "spine"))

                obj.book.spine = [
                    (t.get("idref"), t.get("linear", "yes")) for t in spine
                ]
                obj.book.set_direction(spine.get("page-progression-direction", None))

            epub.EpubReader._load_spine = _load_spine
            self.origin_book = epub.read_epub(self.epub_name)

        self.p_to_save = []
        self.resume = resume
        self.bin_path = f"{Path(epub_name).parent}/.{Path(epub_name).stem}.temp.bin"
        if self.resume:
            self.load_state()

    @staticmethod
    def _is_special_text(text):
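        """Return True for text that should not be translated: digits only,
        whitespace only, a bare link, or punctuation only."""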
        return (
            text.isdigit()
            or text.isspace()
            or is_text_link(text)
            or all(char in string.punctuation for char in text)
        )

    def _make_new_book(self, book):
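        """Create an empty EpubBook that reuses the metadata, spine, and TOC
        of ``book``."""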
        new_book = epub.EpubBook()
        new_book.metadata = book.metadata
        new_book.spine = book.spine
        new_book.toc = book.toc
        return new_book

    def _extract_paragraph(self, p):
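        """Strip the tags listed in ``exclude_translate_tags`` out of ``p``
        (in place) and return it; callers pass in a copy of the paragraph."""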
        for p_exclude in self.exclude_translate_tags.split(","):
            # for issue #280
            if type(p) == NavigableString:
                continue
            for pt in p.find_all(p_exclude):
                pt.extract()
        return p

    def _process_paragraph(self, p, new_p, index, p_to_save_len):
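        """Translate a single paragraph (or reuse the saved translation when
        resuming), record it in ``p_to_save``, and insert the result into the
        soup.  Progress is pickled every 20 paragraphs."""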
        if self.resume and index < p_to_save_len:
            p.string = self.p_to_save[index]
        else:
            t_text = ""
            if self.batch_flag:
                self.translate_model.add_to_batch_translate_queue(index, new_p.text)
            elif self.batch_use_flag:
                t_text = self.translate_model.batch_translate(index)
            else:
                t_text = self.translate_model.translate(new_p.text)
            if type(p) == NavigableString:
                new_p = t_text
                self.p_to_save.append(new_p)
            else:
                new_p.string = t_text
                self.p_to_save.append(new_p.text)

            self.helper.insert_trans(
                p, new_p.string, self.translation_style, self.single_translate
            )
        index += 1

        if index % 20 == 0:
            self._save_progress()
        return index

    def _process_combined_paragraph(self, p_block, index, p_to_save_len):
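        """Join a block of paragraphs with newlines, translate them in one
        request, and map the translated lines back onto the paragraphs
        (extra translated lines are attached to the last paragraph)."""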
        text = []

        for p in p_block:
            if self.resume and index < p_to_save_len:
                p.string = self.p_to_save[index]
            else:
                p_text = p.text.rstrip()
                text.append(p_text)

                if self.is_test and index >= self.test_num:
                    break

            index += 1

        if len(text) > 0:
            translated_text = self.translate_model.translate("\n".join(text))
            translated_text = translated_text.split("\n")
            text_len = len(translated_text)

            for i in range(text_len):
                t = translated_text[i]

                if i >= len(p_block):
                    p = p_block[-1]
                else:
                    p = p_block[i]

                if type(p) == NavigableString:
                    p = t
                else:
                    p.string = t

                self.helper.insert_trans(
                    p, p.string, self.translation_style, self.single_translate
                )

        self._save_progress()
        return index

    def translate_paragraphs_acc(self, p_list, send_num):
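        """Accumulate paragraphs until the pending batch would exceed
        ``send_num`` tokens, then flush it through the helper; a single
        paragraph that is already over the limit is handed to ``deal_new``
        on its own."""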
        count = 0
        wait_p_list = []
        for i in range(len(p_list)):
            p = p_list[i]
            print(f"translating {i}/{len(p_list)}")
            temp_p = copy(p)

            for p_exclude in self.exclude_translate_tags.split(","):
                # for issue #280
                if type(p) == NavigableString:
                    continue
                for pt in temp_p.find_all(p_exclude):
                    pt.extract()

            if any(
                [not p.text, self._is_special_text(temp_p.text), not_trans(temp_p.text)]
            ):
                if i == len(p_list) - 1:
                    self.helper.deal_old(wait_p_list, self.single_translate)
                continue
            length = num_tokens_from_text(temp_p.text)
            if length > send_num:
                self.helper.deal_new(p, wait_p_list, self.single_translate)
                continue
            if i == len(p_list) - 1:
                if count + length < send_num:
                    wait_p_list.append(p)
                    self.helper.deal_old(wait_p_list, self.single_translate)
                else:
                    self.helper.deal_new(p, wait_p_list, self.single_translate)
                break
            if count + length < send_num:
                count += length
                wait_p_list.append(p)
            else:
                self.helper.deal_old(wait_p_list, self.single_translate)
                wait_p_list.append(p)
                count = length

    def get_item(self, book, name):
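        """Return the item in ``book`` whose file name is ``name`` (or None)."""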
        for item in book.get_items():
            if item.file_name == name:
                return item

    def find_items_containing_string(self, book, search_string):
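        """Return all document items whose decoded content contains
        ``search_string``."""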
        matching_items = []

        for item in book.get_items_of_type(ITEM_DOCUMENT):
            content = item.get_content().decode("utf-8")
            if search_string in content:
                matching_items.append(item)

        return matching_items

    def retranslate_book(self, index, p_to_save_len, pbar, trans_taglist, retranslate):
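        """Re-translate one slice of an already translated book.

        ``retranslate`` is ``[complete_book_name, fixname, fixstart, fixend]``:
        the finished bilingual epub, the file inside it to fix (auto-detected
        from ``fixstart`` when empty), and the first/last paragraph text of
        the broken range.  The translated range is cut out, the original
        paragraphs are spliced back in, the item is run through
        ``process_item`` again, and the fixed book is written back to
        ``complete_book_name``."""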
        complete_book_name = retranslate[0]
        fixname = retranslate[1]
        fixstart = retranslate[2]
        fixend = retranslate[3]

        if fixend == "":
            fixend = fixstart

        name_fix = complete_book_name

        complete_book = epub.read_epub(complete_book_name)

        if fixname == "":
            fixname = self.find_items_containing_string(complete_book, fixstart)[
                0
            ].file_name
            print(f"auto find fixname: {fixname}")

        new_book = self._make_new_book(complete_book)

        complete_item = self.get_item(complete_book, fixname)
        if complete_item is None:
            return

        ori_item = self.get_item(self.origin_book, fixname)
        if ori_item is None:
            return

        soup_complete = bs(complete_item.content, "html.parser")
        soup_ori = bs(ori_item.content, "html.parser")

        p_list_complete = soup_complete.findAll(trans_taglist)
        p_list_ori = soup_ori.findAll(trans_taglist)

        target = None
        tagl = []

        # extract from range
        find_end = False
        find_start = False
        for tag in p_list_complete:
            if find_end:
                tagl.append(tag)
                break

            if fixend in tag.text:
                find_end = True
            if fixstart in tag.text:
                find_start = True

            if find_start:
                if not target:
                    target = tag.previous_sibling
                tagl.append(tag)

        for t in tagl:
            t.extract()

        flag = False
        extract_p_list_ori = []
        for p in p_list_ori:
            if fixstart in p.text:
                flag = True
            if flag:
                extract_p_list_ori.append(p)
            if fixend in p.text:
                break

        for t in extract_p_list_ori:
            if target:
                target.insert_after(t)
                target = t

        for item in complete_book.get_items():
            if item.file_name != fixname:
                new_book.add_item(item)
        if soup_complete:
            complete_item.content = soup_complete.encode()

        index = self.process_item(
            complete_item,
            index,
            p_to_save_len,
            pbar,
            new_book,
            trans_taglist,
            fixstart,
            fixend,
        )
        epub.write_epub(f"{name_fix}", new_book, {})

    def has_nest_child(self, element, trans_taglist):
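        """Return True if ``element`` contains a descendant whose tag is in
        ``trans_taglist`` (such wrappers are filtered out so the same text is
        not translated twice)."""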
        if isinstance(element, Tag):
            for child in element.children:
                if child.name in trans_taglist:
                    return True
                if self.has_nest_child(child, trans_taglist):
                    return True
        return False

    def filter_nest_list(self, p_list, trans_taglist):
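        """Drop elements that nest another translatable tag, keeping only
        the innermost ones."""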
        filtered_list = [p for p in p_list if not self.has_nest_child(p, trans_taglist)]
        return filtered_list

    def process_item(
        self,
        item,
        index,
        p_to_save_len,
        pbar,
        new_book,
        trans_taglist,
        fixstart=None,
        fixend=None,
    ):
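        """Translate one document item and add it to ``new_book``.

        Honors ``only_filelist``/``exclude_filelist``, then dispatches to the
        accumulated path (``accumulated_num > 1``), the block path
        (``single_translate`` with ``block_size > 0``), or plain
        paragraph-by-paragraph translation.  Returns the updated paragraph
        ``index``."""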
        if self.only_filelist != "" and item.file_name not in self.only_filelist.split(
            ","
        ):
            return index
        elif self.only_filelist == "" and item.file_name in self.exclude_filelist.split(
            ","
        ):
            new_book.add_item(item)
            return index

        if not os.path.exists("log"):
            os.makedirs("log")

        soup = bs(item.content, "html.parser")
        p_list = soup.findAll(trans_taglist)

        p_list = self.filter_nest_list(p_list, trans_taglist)

        if self.retranslate:
            new_p_list = []

            if fixstart is None or fixend is None:
                return

            start_append = False
            for p in p_list:
                text = p.get_text()
                if fixstart in text or fixend in text or start_append:
                    start_append = True
                    new_p_list.append(p)
                if fixend in text:
                    p_list = new_p_list
                    break

        if self.allow_navigable_strings:
            p_list.extend(soup.findAll(text=True))

        send_num = self.accumulated_num
        if send_num > 1:
            with open("log/buglog.txt", "a") as f:
                print(f"------------- {item.file_name} -------------", file=f)

            print("------------------------------------------------------")
            print(f"dealing with {item.file_name} ...")
            self.translate_paragraphs_acc(p_list, send_num)
        else:
            is_test_done = self.is_test and index > self.test_num
            p_block = []
            block_len = 0
            for p in p_list:
                if is_test_done:
                    break
                if not p.text or self._is_special_text(p.text):
                    pbar.update(1)
                    continue

                new_p = self._extract_paragraph(copy(p))
                if self.single_translate and self.block_size > 0:
                    p_len = num_tokens_from_text(new_p.text)
                    block_len += p_len
                    if block_len > self.block_size:
                        index = self._process_combined_paragraph(
                            p_block, index, p_to_save_len
                        )
                        p_block = [p]
                        block_len = p_len
                        print()
                    else:
                        p_block.append(p)
                else:
                    index = self._process_paragraph(p, new_p, index, p_to_save_len)
                    print()

                # pbar.update(delta) not pbar.update(index)?
                pbar.update(1)

                if self.is_test and index >= self.test_num:
                    break
            if self.single_translate and self.block_size > 0 and len(p_block) > 0:
                index = self._process_combined_paragraph(p_block, index, p_to_save_len)

        if soup:
            item.content = soup.encode()
        new_book.add_item(item)

        return index

    def batch_init_then_wait(self):
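        """Initialize batch translation; in batch-use mode, poll until the
        batch completes (giving up after 5 minutes)."""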
        name, _ = os.path.splitext(self.epub_name)
        if self.batch_flag or self.batch_use_flag:
            self.translate_model.batch_init(name)
            if self.batch_use_flag:
                start_time = time.time()
                while not self.translate_model.is_completed_batch():
                    print("Batch translation is not completed yet")
                    time.sleep(2)
                    if time.time() - start_time > 300:  # 5 minutes
                        raise Exception("Batch translation timed out after 5 minutes")

    def make_bilingual_book(self):
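        """Entry point: translate the whole book and write
        ``<name>_bilingual.epub``.  Non-document items are copied over first
        so images survive an interrupted run; on error or Ctrl-C the progress
        pickle and a ``*_bilingual_temp.epub`` are saved for resuming."""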
        self.helper = EPUBBookLoaderHelper(
            self.translate_model,
            self.accumulated_num,
            self.translation_style,
            self.context_flag,
        )
        self.batch_init_then_wait()
        new_book = self._make_new_book(self.origin_book)
        all_items = list(self.origin_book.get_items())
        trans_taglist = self.translate_tags.split(",")
        all_p_length = sum(
            (
                0
                if (
                    (i.get_type() != ITEM_DOCUMENT)
                    or (i.file_name in self.exclude_filelist.split(","))
                    or (
                        self.only_filelist
                        and i.file_name not in self.only_filelist.split(",")
                    )
                )
                else len(bs(i.content, "html.parser").findAll(trans_taglist))
            )
            for i in all_items
        )
        all_p_length += self.allow_navigable_strings * sum(
            (
                0
                if (
                    (i.get_type() != ITEM_DOCUMENT)
                    or (i.file_name in self.exclude_filelist.split(","))
                    or (
                        self.only_filelist
                        and i.file_name not in self.only_filelist.split(",")
                    )
                )
                else len(bs(i.content, "html.parser").findAll(text=True))
            )
            for i in all_items
        )
        pbar = tqdm(total=self.test_num) if self.is_test else tqdm(total=all_p_length)
        print()
        index = 0
        p_to_save_len = len(self.p_to_save)
        try:
            if self.retranslate:
                self.retranslate_book(
                    index, p_to_save_len, pbar, trans_taglist, self.retranslate
                )
                exit(0)
            # Add the items that don't need translation first, so images are
            # already visible if the run is interrupted
            for item in self.origin_book.get_items():
                if item.get_type() != ITEM_DOCUMENT:
                    new_book.add_item(item)

            for item in self.origin_book.get_items_of_type(ITEM_DOCUMENT):
                index = self.process_item(
                    item, index, p_to_save_len, pbar, new_book, trans_taglist
                )

            if self.accumulated_num > 1:
                name, _ = os.path.splitext(self.epub_name)
                epub.write_epub(f"{name}_bilingual.epub", new_book, {})
            name, _ = os.path.splitext(self.epub_name)
            if self.batch_flag:
                self.translate_model.batch()
            else:
                epub.write_epub(f"{name}_bilingual.epub", new_book, {})
            if self.accumulated_num == 1:
                pbar.close()
        except (KeyboardInterrupt, Exception) as e:
            print(e)
            if self.accumulated_num == 1:
                print("you can resume the translation next time")
                self._save_progress()
                self._save_temp_book()
            sys.exit(0)

    def load_state(self):
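        """Load previously saved translations from the progress pickle."""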
        try:
            with open(self.bin_path, "rb") as f:
                self.p_to_save = pickle.load(f)
        except Exception:
            raise Exception("cannot load resume file")

    def _save_temp_book(self):
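        """Rebuild the book from the original epub, splice in every saved
        translation, and write ``<name>_bilingual_temp.epub``."""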
        # TODO refactor this logic
        origin_book_temp = epub.read_epub(self.epub_name)
        new_temp_book = self._make_new_book(origin_book_temp)
        p_to_save_len = len(self.p_to_save)
        trans_taglist = self.translate_tags.split(",")
        index = 0
        try:
            for item in origin_book_temp.get_items():
                if item.get_type() == ITEM_DOCUMENT:
                    soup = bs(item.content, "html.parser")
                    p_list = soup.findAll(trans_taglist)
                    if self.allow_navigable_strings:
                        p_list.extend(soup.findAll(text=True))
                    for p in p_list:
                        if not p.text or self._is_special_text(p.text):
                            continue
                        # TODO: batch the paragraphs, translate, then combine
                        # (PR welcome here)
                        if index < p_to_save_len:
                            new_p = copy(p)
                            if type(p) == NavigableString:
                                new_p = self.p_to_save[index]
                            else:
                                new_p.string = self.p_to_save[index]
                            self.helper.insert_trans(
                                p,
                                new_p.string,
                                self.translation_style,
                                self.single_translate,
                            )
                            index += 1
                        else:
                            break
                    # for the saved temp book
                    if soup:
                        item.content = soup.encode()
                new_temp_book.add_item(item)
            name, _ = os.path.splitext(self.epub_name)
            epub.write_epub(f"{name}_bilingual_temp.epub", new_temp_book, {})
        except Exception as e:
            # TODO handle it
            print(e)

    def _save_progress(self):
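        """Pickle the translations collected so far to ``self.bin_path``."""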
        try:
            with open(self.bin_path, "wb") as f:
                pickle.dump(self.p_to_save, f)
        except Exception:
            raise Exception("cannot save resume file")
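
# A minimal usage sketch (the ChatGPTAPI import below is an assumption based
# on upstream bilingual_book_maker's book_maker/translator package; swap in
# whatever model class your tree provides -- anything callable as
# model(key, language, api_base=..., ...), as translate_model is built above):
#
#     from book_maker.translator import ChatGPTAPI  # hypothetical import path
#
#     loader = EPUBBookLoader(
#         "animal_farm.epub",
#         ChatGPTAPI,
#         "sk-...",            # API key
#         resume=False,
#         language="zh-hans",
#     )
#     loader.make_bilingual_book()  # writes animal_farm_bilingual.epub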