Ninzore 40aaa9b090
Feat: combine multiple lines into one block, add a new option --block_size (#370)
* Feat: combine multiple lines into one block

bug: some text is not replaced with its translation

* Fix: some text is not translated

known issues:
1. sometimes the original text still shows up
2. the resume function does not work

* Style: clean up code
2024-01-19 12:39:50 +08:00


import os
import pickle
import string
import sys
from copy import copy
from pathlib import Path

from bs4 import BeautifulSoup as bs
from bs4 import Tag
from bs4.element import NavigableString
from ebooklib import ITEM_DOCUMENT, epub
from rich import print
from tqdm import tqdm

from book_maker.utils import num_tokens_from_text, prompt_config_to_kwargs

from .base_loader import BaseBookLoader
from .helper import EPUBBookLoaderHelper, is_text_link, not_trans

class EPUBBookLoader(BaseBookLoader):
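    """Load an EPUB, translate its translatable tags, and assemble a bilingual copy."""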
def __init__(
self,
epub_name,
model,
key,
resume,
language,
model_api_base=None,
is_test=False,
test_num=5,
prompt_config=None,
single_translate=False,
context_flag=False,
temperature=1.0,
):
self.epub_name = epub_name
self.new_epub = epub.EpubBook()
self.translate_model = model(
key,
language,
api_base=model_api_base,
context_flag=context_flag,
temperature=temperature,
**prompt_config_to_kwargs(prompt_config),
)
self.is_test = is_test
self.test_num = test_num
self.translate_tags = "p"
self.exclude_translate_tags = "sup"
self.allow_navigable_strings = False
self.accumulated_num = 1
self.translation_style = ""
self.context_flag = context_flag
self.helper = EPUBBookLoaderHelper(
self.translate_model,
self.accumulated_num,
self.translation_style,
self.context_flag,
)
self.retranslate = None
self.exclude_filelist = ""
self.only_filelist = ""
self.single_translate = single_translate
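        # -1 disables block combining; set via the new --block_size option (#370)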
self.block_size = -1
        # monkey patch for #173
def _write_items_patch(obj):
for item in obj.book.get_items():
if isinstance(item, epub.EpubNcx):
obj.out.writestr(
"%s/%s" % (obj.book.FOLDER_NAME, item.file_name), obj._get_ncx()
)
elif isinstance(item, epub.EpubNav):
obj.out.writestr(
"%s/%s" % (obj.book.FOLDER_NAME, item.file_name),
obj._get_nav(item),
)
elif item.manifest:
obj.out.writestr(
"%s/%s" % (obj.book.FOLDER_NAME, item.file_name), item.content
)
else:
obj.out.writestr("%s" % item.file_name, item.content)
def _check_deprecated(obj):
pass
epub.EpubWriter._write_items = _write_items_patch
epub.EpubReader._check_deprecated = _check_deprecated
try:
self.origin_book = epub.read_epub(self.epub_name)
except Exception:
            # tricky monkey patch for #71; if you don't know why, check the issue and ignore this
            # TODO: remove once upstream fixes this
def _load_spine(obj):
spine = obj.container.find("{%s}%s" % (epub.NAMESPACES["OPF"], "spine"))
obj.book.spine = [
(t.get("idref"), t.get("linear", "yes")) for t in spine
]
obj.book.set_direction(spine.get("page-progression-direction", None))
epub.EpubReader._load_spine = _load_spine
self.origin_book = epub.read_epub(self.epub_name)
self.p_to_save = []
self.resume = resume
self.bin_path = f"{Path(epub_name).parent}/.{Path(epub_name).stem}.temp.bin"
if self.resume:
self.load_state()
@staticmethod
def _is_special_text(text):
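        """Return True for text that needs no translation: digits, whitespace, bare links, or pure punctuation."""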
return (
text.isdigit()
or text.isspace()
or is_text_link(text)
or all(char in string.punctuation for char in text)
)
def _make_new_book(self, book):
new_book = epub.EpubBook()
new_book.metadata = book.metadata
new_book.spine = book.spine
new_book.toc = book.toc
return new_book
def _extract_paragraph(self, p):
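        """Strip the tags listed in self.exclude_translate_tags (default "sup") out of a paragraph."""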
for p_exclude in self.exclude_translate_tags.split(","):
# for issue #280
if type(p) == NavigableString:
continue
for pt in p.find_all(p_exclude):
pt.extract()
return p
def _process_paragraph(self, p, new_p, index, p_to_save_len):
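        """Translate one paragraph, reusing saved translations when resuming; progress is saved every 20 paragraphs."""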
if self.resume and index < p_to_save_len:
p.string = self.p_to_save[index]
else:
if type(p) == NavigableString:
new_p = self.translate_model.translate(new_p.text)
self.p_to_save.append(new_p)
else:
new_p.string = self.translate_model.translate(new_p.text)
self.p_to_save.append(new_p.text)
self.helper.insert_trans(
p, new_p.string, self.translation_style, self.single_translate
)
index += 1
if index % 20 == 0:
self._save_progress()
return index
def _process_combined_paragraph(self, p_block, index, p_to_save_len):
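        """Translate a block of paragraphs in one request, then split the result on newlines and write each line back."""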
text = []
for p in p_block:
if self.resume and index < p_to_save_len:
p.string = self.p_to_save[index]
else:
p_text = p.text.rstrip()
text.append(p_text)
if self.is_test and index >= self.test_num:
break
index += 1
if len(text) > 0:
translated_text = self.translate_model.translate("\n".join(text))
translated_text = translated_text.split("\n")
text_len = len(translated_text)
for i in range(text_len):
t = translated_text[i]
                p = p_block[i] if i < len(p_block) else p_block[-1]
if type(p) == NavigableString:
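                    # known issue: rebinding the local name leaves the original
                    # NavigableString untouched, so the source text can still show up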
p = t
else:
p.string = t
self.helper.insert_trans(
p, p.string, self.translation_style, self.single_translate
)
self._save_progress()
return index
def translate_paragraphs_acc(self, p_list, send_num):
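        """Accumulate paragraphs until they approach send_num tokens, then translate them together."""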
count = 0
wait_p_list = []
for i in range(len(p_list)):
p = p_list[i]
print(f"translating {i}/{len(p_list)}")
temp_p = copy(p)
for p_exclude in self.exclude_translate_tags.split(","):
# for issue #280
if type(p) == NavigableString:
continue
for pt in temp_p.find_all(p_exclude):
pt.extract()
if any(
[not p.text, self._is_special_text(temp_p.text), not_trans(temp_p.text)]
):
if i == len(p_list) - 1:
self.helper.deal_old(wait_p_list, self.single_translate)
continue
length = num_tokens_from_text(temp_p.text)
if length > send_num:
self.helper.deal_new(p, wait_p_list, self.single_translate)
continue
if i == len(p_list) - 1:
if count + length < send_num:
wait_p_list.append(p)
self.helper.deal_old(wait_p_list, self.single_translate)
else:
self.helper.deal_new(p, wait_p_list, self.single_translate)
break
if count + length < send_num:
count += length
wait_p_list.append(p)
else:
self.helper.deal_old(wait_p_list, self.single_translate)
wait_p_list.append(p)
count = length
def get_item(self, book, name):
for item in book.get_items():
if item.file_name == name:
return item
def find_items_containing_string(self, book, search_string):
matching_items = []
for item in book.get_items_of_type(ITEM_DOCUMENT):
content = item.get_content().decode("utf-8")
if search_string in content:
matching_items.append(item)
return matching_items
def retranslate_book(self, index, p_to_save_len, pbar, trans_taglist, retranslate):
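        """Re-translate the paragraphs between fixstart and fixend in an already-translated book."""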
complete_book_name = retranslate[0]
fixname = retranslate[1]
fixstart = retranslate[2]
fixend = retranslate[3]
if fixend == "":
fixend = fixstart
name_fix = complete_book_name
complete_book = epub.read_epub(complete_book_name)
if fixname == "":
fixname = self.find_items_containing_string(complete_book, fixstart)[
0
].file_name
print(f"auto find fixname: {fixname}")
new_book = self._make_new_book(complete_book)
complete_item = self.get_item(complete_book, fixname)
if complete_item is None:
return
ori_item = self.get_item(self.origin_book, fixname)
if ori_item is None:
return
soup_complete = bs(complete_item.content, "html.parser")
soup_ori = bs(ori_item.content, "html.parser")
p_list_complete = soup_complete.findAll(trans_taglist)
p_list_ori = soup_ori.findAll(trans_taglist)
target = None
tagl = []
# extract from range
find_end = False
find_start = False
for tag in p_list_complete:
if find_end:
tagl.append(tag)
break
if fixend in tag.text:
find_end = True
if fixstart in tag.text:
find_start = True
if find_start:
if not target:
target = tag.previous_sibling
tagl.append(tag)
for t in tagl:
t.extract()
flag = False
extract_p_list_ori = []
for p in p_list_ori:
if fixstart in p.text:
flag = True
if flag:
extract_p_list_ori.append(p)
if fixend in p.text:
break
for t in extract_p_list_ori:
if target:
target.insert_after(t)
target = t
for item in complete_book.get_items():
if item.file_name != fixname:
new_book.add_item(item)
if soup_complete:
complete_item.content = soup_complete.encode()
index = self.process_item(
complete_item,
index,
p_to_save_len,
pbar,
new_book,
trans_taglist,
fixstart,
fixend,
)
epub.write_epub(f"{name_fix}", new_book, {})
def has_nest_child(self, element, trans_taglist):
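        """Return True if element contains a nested tag that is itself in trans_taglist."""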
if isinstance(element, Tag):
for child in element.children:
if child.name in trans_taglist:
return True
if self.has_nest_child(child, trans_taglist):
return True
return False
def filter_nest_list(self, p_list, trans_taglist):
filtered_list = [p for p in p_list if not self.has_nest_child(p, trans_taglist)]
return filtered_list
def process_item(
self,
item,
index,
p_to_save_len,
pbar,
new_book,
trans_taglist,
fixstart=None,
fixend=None,
):
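        """Translate one document item into new_book and return the updated paragraph index."""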
if self.only_filelist != "" and not item.file_name in self.only_filelist.split(
","
):
return index
elif self.only_filelist == "" and item.file_name in self.exclude_filelist.split(
","
):
new_book.add_item(item)
return index
if not os.path.exists("log"):
os.makedirs("log")
soup = bs(item.content, "html.parser")
p_list = soup.findAll(trans_taglist)
p_list = self.filter_nest_list(p_list, trans_taglist)
if self.retranslate:
new_p_list = []
if fixstart is None or fixend is None:
                return index
start_append = False
for p in p_list:
text = p.get_text()
if fixstart in text or fixend in text or start_append:
start_append = True
new_p_list.append(p)
if fixend in text:
p_list = new_p_list
break
if self.allow_navigable_strings:
p_list.extend(soup.findAll(text=True))
send_num = self.accumulated_num
if send_num > 1:
with open("log/buglog.txt", "a") as f:
print(f"------------- {item.file_name} -------------", file=f)
print("------------------------------------------------------")
print(f"dealing {item.file_name} ...")
self.translate_paragraphs_acc(p_list, send_num)
else:
is_test_done = self.is_test and index > self.test_num
p_block = []
block_len = 0
for p in p_list:
if is_test_done:
break
if not p.text or self._is_special_text(p.text):
pbar.update(1)
continue
new_p = self._extract_paragraph(copy(p))
if self.single_translate and self.block_size > 0:
p_len = num_tokens_from_text(new_p.text)
block_len += p_len
if block_len > self.block_size:
index = self._process_combined_paragraph(
p_block, index, p_to_save_len
)
p_block = [p]
block_len = p_len
print()
else:
p_block.append(p)
else:
index = self._process_paragraph(p, new_p, index, p_to_save_len)
print()
                    # TODO: should this be pbar.update(delta) rather than pbar.update(1)?
pbar.update(1)
if self.is_test and index >= self.test_num:
break
if self.single_translate and self.block_size > 0 and len(p_block) > 0:
index = self._process_combined_paragraph(p_block, index, p_to_save_len)
if soup:
item.content = soup.encode()
new_book.add_item(item)
return index
def make_bilingual_book(self):
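        """Translate the whole book and write <name>_bilingual.epub, saving progress on interruption."""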
self.helper = EPUBBookLoaderHelper(
self.translate_model,
self.accumulated_num,
self.translation_style,
self.context_flag,
)
new_book = self._make_new_book(self.origin_book)
all_items = list(self.origin_book.get_items())
trans_taglist = self.translate_tags.split(",")
all_p_length = sum(
0
if (
(i.get_type() != ITEM_DOCUMENT)
or (i.file_name in self.exclude_filelist.split(","))
or (
self.only_filelist
and i.file_name not in self.only_filelist.split(",")
)
)
else len(bs(i.content, "html.parser").findAll(trans_taglist))
for i in all_items
)
all_p_length += self.allow_navigable_strings * sum(
0
if (
(i.get_type() != ITEM_DOCUMENT)
or (i.file_name in self.exclude_filelist.split(","))
or (
self.only_filelist
and i.file_name not in self.only_filelist.split(",")
)
)
else len(bs(i.content, "html.parser").findAll(text=True))
for i in all_items
)
pbar = tqdm(total=self.test_num) if self.is_test else tqdm(total=all_p_length)
print()
index = 0
p_to_save_len = len(self.p_to_save)
try:
if self.retranslate:
self.retranslate_book(
index, p_to_save_len, pbar, trans_taglist, self.retranslate
)
                sys.exit(0)
            # Add the items that don't need translation first, so images are already visible after an interruption
for item in self.origin_book.get_items():
if item.get_type() != ITEM_DOCUMENT:
new_book.add_item(item)
for item in self.origin_book.get_items_of_type(ITEM_DOCUMENT):
index = self.process_item(
item, index, p_to_save_len, pbar, new_book, trans_taglist
)
            name, _ = os.path.splitext(self.epub_name)
            epub.write_epub(f"{name}_bilingual.epub", new_book, {})
if self.accumulated_num == 1:
pbar.close()
except (KeyboardInterrupt, Exception) as e:
print(e)
if self.accumulated_num == 1:
print("you can resume it next time")
self._save_progress()
self._save_temp_book()
sys.exit(0)
def load_state(self):
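        """Restore previously translated paragraphs from the pickle resume file."""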
try:
with open(self.bin_path, "rb") as f:
self.p_to_save = pickle.load(f)
except Exception:
raise Exception("can not load resume file")
def _save_temp_book(self):
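        """Write a partial bilingual EPUB containing the paragraphs translated so far."""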
# TODO refactor this logic
origin_book_temp = epub.read_epub(self.epub_name)
new_temp_book = self._make_new_book(origin_book_temp)
p_to_save_len = len(self.p_to_save)
trans_taglist = self.translate_tags.split(",")
index = 0
try:
for item in origin_book_temp.get_items():
if item.get_type() == ITEM_DOCUMENT:
soup = bs(item.content, "html.parser")
p_list = soup.findAll(trans_taglist)
if self.allow_navigable_strings:
p_list.extend(soup.findAll(text=True))
for p in p_list:
if not p.text or self._is_special_text(p.text):
continue
                        # TODO batch the paragraphs to translate, then combine
                        # PR welcome here
if index < p_to_save_len:
new_p = copy(p)
if type(p) == NavigableString:
new_p = self.p_to_save[index]
else:
new_p.string = self.p_to_save[index]
self.helper.insert_trans(
p,
new_p.string,
self.translation_style,
self.single_translate,
)
index += 1
else:
break
                    # write the updated content back for the temp book
if soup:
item.content = soup.encode()
new_temp_book.add_item(item)
name, _ = os.path.splitext(self.epub_name)
epub.write_epub(f"{name}_bilingual_temp.epub", new_temp_book, {})
except Exception as e:
# TODO handle it
print(e)
def _save_progress(self):
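        """Pickle the translated paragraphs to the resume file."""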
try:
with open(self.bin_path, "wb") as f:
pickle.dump(self.p_to_save, f)
except Exception:
raise Exception("can not save resume file")