import os
import pickle
import string
import sys
import time
from copy import copy
from pathlib import Path

from bs4 import BeautifulSoup as bs
from bs4 import Tag
from bs4.element import NavigableString
from ebooklib import ITEM_DOCUMENT, epub
from rich import print
from tqdm import tqdm

from book_maker.utils import num_tokens_from_text, prompt_config_to_kwargs

from .base_loader import BaseBookLoader
from .helper import EPUBBookLoaderHelper, is_text_link, not_trans


class EPUBBookLoader(BaseBookLoader):
    def __init__(
        self,
        epub_name,
        model,
        key,
        resume,
        language,
        model_api_base=None,
        is_test=False,
        test_num=5,
        prompt_config=None,
        single_translate=False,
        context_flag=False,
        context_paragraph_limit=0,
        temperature=1.0,
    ):
        self.epub_name = epub_name
        self.new_epub = epub.EpubBook()
        self.translate_model = model(
            key,
            language,
            api_base=model_api_base,
            context_flag=context_flag,
            context_paragraph_limit=context_paragraph_limit,
            temperature=temperature,
            **prompt_config_to_kwargs(prompt_config),
        )
        self.is_test = is_test
        self.test_num = test_num
        self.translate_tags = "p"
        self.exclude_translate_tags = "sup"
        self.allow_navigable_strings = False
        self.accumulated_num = 1
        self.translation_style = ""
        self.context_flag = context_flag
        self.helper = EPUBBookLoaderHelper(
            self.translate_model,
            self.accumulated_num,
            self.translation_style,
            self.context_flag,
        )
        self.retranslate = None
        self.exclude_filelist = ""
        self.only_filelist = ""
        self.single_translate = single_translate
        self.block_size = -1
        self.batch_use_flag = False
        self.batch_flag = False

        # monkey patch for #173
        def _write_items_patch(obj):
            for item in obj.book.get_items():
                if isinstance(item, epub.EpubNcx):
                    obj.out.writestr(
                        "%s/%s" % (obj.book.FOLDER_NAME, item.file_name), obj._get_ncx()
                    )
                elif isinstance(item, epub.EpubNav):
                    obj.out.writestr(
                        "%s/%s" % (obj.book.FOLDER_NAME, item.file_name),
                        obj._get_nav(item),
                    )
                elif item.manifest:
                    obj.out.writestr(
                        "%s/%s" % (obj.book.FOLDER_NAME, item.file_name), item.content
                    )
                else:
                    obj.out.writestr("%s" % item.file_name, item.content)

        def _check_deprecated(obj):
            pass

        epub.EpubWriter._write_items = _write_items_patch
        epub.EpubReader._check_deprecated = _check_deprecated

        try:
            self.origin_book = epub.read_epub(self.epub_name)
        except Exception:
            # Tricky monkey patch for #71; if you don't know why, please check
            # the issue and ignore this. TODO: remove once upstream changes.
            def _load_spine(obj):
                spine = obj.container.find("{%s}%s" % (epub.NAMESPACES["OPF"], "spine"))

                obj.book.spine = [
                    (t.get("idref"), t.get("linear", "yes")) for t in spine
                ]
                obj.book.set_direction(spine.get("page-progression-direction", None))

            epub.EpubReader._load_spine = _load_spine
            self.origin_book = epub.read_epub(self.epub_name)

        self.p_to_save = []
        self.resume = resume
        self.bin_path = f"{Path(epub_name).parent}/.{Path(epub_name).stem}.temp.bin"
        if self.resume:
            self.load_state()
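    # A quick sketch of what `_is_special_text` (below) skips instead of
    # sending to the model; the sample values are illustrative assumptions,
    # not taken from the source:
    #   "42"  -> True (digits)      "   "   -> True (whitespace)
    #   "..." -> True (punctuation) "Hello" -> False (real text)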
    @staticmethod
    def _is_special_text(text):
        return (
            text.isdigit()
            or text.isspace()
            or is_text_link(text)
            or all(char in string.punctuation for char in text)
        )

    def _make_new_book(self, book):
        new_book = epub.EpubBook()
        new_book.metadata = book.metadata
        new_book.spine = book.spine
        new_book.toc = book.toc
        return new_book

    def _extract_paragraph(self, p):
        for p_exclude in self.exclude_translate_tags.split(","):
            # for issue #280
            if type(p) == NavigableString:
                continue
            for pt in p.find_all(p_exclude):
                pt.extract()
        return p

    def _process_paragraph(self, p, new_p, index, p_to_save_len):
        if self.resume and index < p_to_save_len:
            p.string = self.p_to_save[index]
        else:
            t_text = ""
            if self.batch_flag:
                self.translate_model.add_to_batch_translate_queue(index, new_p.text)
            elif self.batch_use_flag:
                t_text = self.translate_model.batch_translate(index)
            else:
                t_text = self.translate_model.translate(new_p.text)
            if type(p) == NavigableString:
                new_p = t_text
                self.p_to_save.append(new_p)
            else:
                new_p.string = t_text
                self.p_to_save.append(new_p.text)

            # Pass t_text directly: it equals new_p.string for Tags, and new_p
            # is a plain str (with no .string) when p was a NavigableString.
            self.helper.insert_trans(
                p, t_text, self.translation_style, self.single_translate
            )

        index += 1

        if index % 20 == 0:
            self._save_progress()
        return index

    def _process_combined_paragraph(self, p_block, index, p_to_save_len):
        text = []
        for p in p_block:
            if self.resume and index < p_to_save_len:
                p.string = self.p_to_save[index]
            else:
                p_text = p.text.rstrip()
                text.append(p_text)

            if self.is_test and index >= self.test_num:
                break

            index += 1

        if len(text) > 0:
            translated_text = self.translate_model.translate("\n".join(text))
            translated_text = translated_text.split("\n")
            text_len = len(translated_text)

            for i in range(text_len):
                t = translated_text[i]
                if i >= len(p_block):
                    p = p_block[-1]
                else:
                    p = p_block[i]

                if type(p) == NavigableString:
                    p = t
                else:
                    p.string = t

                # Pass t directly: it equals p.string for Tags, and p is a
                # plain str (with no .string) in the NavigableString case.
                self.helper.insert_trans(
                    p, t, self.translation_style, self.single_translate
                )

            self._save_progress()
        return index

    def translate_paragraphs_acc(self, p_list, send_num):
        count = 0
        wait_p_list = []
        for i in range(len(p_list)):
            p = p_list[i]
            print(f"translating {i}/{len(p_list)}")
            temp_p = copy(p)

            for p_exclude in self.exclude_translate_tags.split(","):
                # for issue #280
                if type(p) == NavigableString:
                    continue
                for pt in temp_p.find_all(p_exclude):
                    pt.extract()

            if any(
                [not p.text, self._is_special_text(temp_p.text), not_trans(temp_p.text)]
            ):
                if i == len(p_list) - 1:
                    self.helper.deal_old(wait_p_list, self.single_translate)
                continue

            length = num_tokens_from_text(temp_p.text)
            if length > send_num:
                self.helper.deal_new(p, wait_p_list, self.single_translate)
                continue

            if i == len(p_list) - 1:
                if count + length < send_num:
                    wait_p_list.append(p)
                    self.helper.deal_old(wait_p_list, self.single_translate)
                else:
                    self.helper.deal_new(p, wait_p_list, self.single_translate)
                break

            if count + length < send_num:
                count += length
                wait_p_list.append(p)
            else:
                self.helper.deal_old(wait_p_list, self.single_translate)
                wait_p_list.append(p)
                count = length

    def get_item(self, book, name):
        for item in book.get_items():
            if item.file_name == name:
                return item

    def find_items_containing_string(self, book, search_string):
        matching_items = []

        for item in book.get_items_of_type(ITEM_DOCUMENT):
            content = item.get_content().decode("utf-8")
            if search_string in content:
                matching_items.append(item)

        return matching_items
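    # `retranslate` is expected to be a four-item sequence:
    #   (path_of_translated_epub, file_name_inside_epub, fixstart_text, fixend_text)
    # An empty file_name is resolved automatically via
    # `find_items_containing_string`; an empty fixend means "just the
    # paragraph containing fixstart".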
    def retranslate_book(self, index, p_to_save_len, pbar, trans_taglist, retranslate):
        complete_book_name = retranslate[0]
        fixname = retranslate[1]
        fixstart = retranslate[2]
        fixend = retranslate[3]

        if fixend == "":
            fixend = fixstart

        name_fix = complete_book_name

        complete_book = epub.read_epub(complete_book_name)

        if fixname == "":
            fixname = self.find_items_containing_string(complete_book, fixstart)[
                0
            ].file_name
            print(f"auto find fixname: {fixname}")

        new_book = self._make_new_book(complete_book)

        complete_item = self.get_item(complete_book, fixname)
        if complete_item is None:
            return

        ori_item = self.get_item(self.origin_book, fixname)
        if ori_item is None:
            return

        soup_complete = bs(complete_item.content, "html.parser")
        soup_ori = bs(ori_item.content, "html.parser")

        p_list_complete = soup_complete.findAll(trans_taglist)
        p_list_ori = soup_ori.findAll(trans_taglist)

        target = None
        tagl = []

        # extract the tags in the fix range from the translated book
        find_end = False
        find_start = False
        for tag in p_list_complete:
            if find_end:
                tagl.append(tag)
                break

            if fixend in tag.text:
                find_end = True
            if fixstart in tag.text:
                find_start = True

            if find_start:
                if not target:
                    target = tag.previous_sibling
                tagl.append(tag)

        for t in tagl:
            t.extract()

        flag = False
        extract_p_list_ori = []
        for p in p_list_ori:
            if fixstart in p.text:
                flag = True
            if flag:
                extract_p_list_ori.append(p)
            if fixend in p.text:
                break

        for t in extract_p_list_ori:
            if target:
                target.insert_after(t)
                target = t

        for item in complete_book.get_items():
            if item.file_name != fixname:
                new_book.add_item(item)
        if soup_complete:
            complete_item.content = soup_complete.encode()

        index = self.process_item(
            complete_item,
            index,
            p_to_save_len,
            pbar,
            new_book,
            trans_taglist,
            fixstart,
            fixend,
        )
        epub.write_epub(f"{name_fix}", new_book, {})
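    # `has_nest_child`/`filter_nest_list` (below) keep only the innermost
    # translatable tags. For example (hypothetical markup, assuming
    # trans_taglist = ["div", "p"]):
    #   <div><p>hello</p></div>
    # keeps the inner <p> and drops the outer <div>, so "hello" is sent to
    # the model once instead of twice.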
    def has_nest_child(self, element, trans_taglist):
        if isinstance(element, Tag):
            for child in element.children:
                if child.name in trans_taglist:
                    return True
                if self.has_nest_child(child, trans_taglist):
                    return True
        return False

    def filter_nest_list(self, p_list, trans_taglist):
        filtered_list = [p for p in p_list if not self.has_nest_child(p, trans_taglist)]
        return filtered_list

    def process_item(
        self,
        item,
        index,
        p_to_save_len,
        pbar,
        new_book,
        trans_taglist,
        fixstart=None,
        fixend=None,
    ):
        if self.only_filelist != "" and item.file_name not in self.only_filelist.split(
            ","
        ):
            return index
        elif self.only_filelist == "" and item.file_name in self.exclude_filelist.split(
            ","
        ):
            new_book.add_item(item)
            return index

        if not os.path.exists("log"):
            os.makedirs("log")

        soup = bs(item.content, "html.parser")
        p_list = soup.findAll(trans_taglist)

        p_list = self.filter_nest_list(p_list, trans_taglist)

        if self.retranslate:
            new_p_list = []

            if fixstart is None or fixend is None:
                return

            start_append = False
            for p in p_list:
                text = p.get_text()
                if fixstart in text or fixend in text or start_append:
                    start_append = True
                    new_p_list.append(p)
                if fixend in text:
                    p_list = new_p_list
                    break

        if self.allow_navigable_strings:
            p_list.extend(soup.findAll(text=True))

        send_num = self.accumulated_num
        if send_num > 1:
            with open("log/buglog.txt", "a") as f:
                print(f"------------- {item.file_name} -------------", file=f)

            print("------------------------------------------------------")
            print(f"dealing {item.file_name} ...")
            self.translate_paragraphs_acc(p_list, send_num)
        else:
            is_test_done = self.is_test and index > self.test_num
            p_block = []
            block_len = 0
            for p in p_list:
                if is_test_done:
                    break
                if not p.text or self._is_special_text(p.text):
                    pbar.update(1)
                    continue
                new_p = self._extract_paragraph(copy(p))
                if self.single_translate and self.block_size > 0:
                    p_len = num_tokens_from_text(new_p.text)
                    block_len += p_len
                    if block_len > self.block_size:
                        index = self._process_combined_paragraph(
                            p_block, index, p_to_save_len
                        )
                        p_block = [p]
                        block_len = p_len
                        print()
                    else:
                        p_block.append(p)
                else:
                    index = self._process_paragraph(p, new_p, index, p_to_save_len)
                    print()
                    # pbar.update(delta) not pbar.update(index)?
                    pbar.update(1)
                    if self.is_test and index >= self.test_num:
                        break
            if self.single_translate and self.block_size > 0 and len(p_block) > 0:
                index = self._process_combined_paragraph(p_block, index, p_to_save_len)

        if soup:
            item.content = soup.encode()
        new_book.add_item(item)
        return index

    def batch_init_then_wait(self):
        name, _ = os.path.splitext(self.epub_name)
        if self.batch_flag or self.batch_use_flag:
            self.translate_model.batch_init(name)
            if self.batch_use_flag:
                start_time = time.time()
                while not self.translate_model.is_completed_batch():
                    print("Batch translation is not completed yet")
                    time.sleep(2)
                    if time.time() - start_time > 300:  # 5 minutes
                        raise Exception("Batch translation timed out after 5 minutes")

    def make_bilingual_book(self):
        self.helper = EPUBBookLoaderHelper(
            self.translate_model,
            self.accumulated_num,
            self.translation_style,
            self.context_flag,
        )
        self.batch_init_then_wait()
        new_book = self._make_new_book(self.origin_book)
        all_items = list(self.origin_book.get_items())
        trans_taglist = self.translate_tags.split(",")
        all_p_length = sum(
            (
                0
                if (
                    (i.get_type() != ITEM_DOCUMENT)
                    or (i.file_name in self.exclude_filelist.split(","))
                    or (
                        self.only_filelist
                        and i.file_name not in self.only_filelist.split(",")
                    )
                )
                else len(bs(i.content, "html.parser").findAll(trans_taglist))
            )
            for i in all_items
        )
        all_p_length += self.allow_navigable_strings * sum(
            (
                0
                if (
                    (i.get_type() != ITEM_DOCUMENT)
                    or (i.file_name in self.exclude_filelist.split(","))
                    or (
                        self.only_filelist
                        and i.file_name not in self.only_filelist.split(",")
                    )
                )
                else len(bs(i.content, "html.parser").findAll(text=True))
            )
            for i in all_items
        )
        pbar = tqdm(total=self.test_num) if self.is_test else tqdm(total=all_p_length)
        print()
        index = 0
        p_to_save_len = len(self.p_to_save)
        try:
            if self.retranslate:
                self.retranslate_book(
                    index, p_to_save_len, pbar, trans_taglist, self.retranslate
                )
                sys.exit(0)
            # Add the items that don't need translation first, so the images
            # are already visible after an interruption.
            for item in self.origin_book.get_items():
                if item.get_type() != ITEM_DOCUMENT:
                    new_book.add_item(item)
            for item in self.origin_book.get_items_of_type(ITEM_DOCUMENT):
                index = self.process_item(
                    item, index, p_to_save_len, pbar, new_book, trans_taglist
                )

                if self.accumulated_num > 1:
                    name, _ = os.path.splitext(self.epub_name)
                    epub.write_epub(f"{name}_bilingual.epub", new_book, {})
            name, _ = os.path.splitext(self.epub_name)
            if self.batch_flag:
                self.translate_model.batch()
            else:
                epub.write_epub(f"{name}_bilingual.epub", new_book, {})
            if self.accumulated_num == 1:
                pbar.close()
        except (KeyboardInterrupt, Exception) as e:
            print(e)
            if self.accumulated_num == 1:
                print("you can resume it next time")
                self._save_progress()
                self._save_temp_book()
            sys.exit(0)

    def load_state(self):
        try:
            with open(self.bin_path, "rb") as f:
                self.p_to_save = pickle.load(f)
        except Exception:
            raise Exception("cannot load resume file")
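    # The resume file (`self.bin_path`, a hidden `.{stem}.temp.bin` next to
    # the source epub) is just a pickled list of translated strings in
    # document order; `_save_temp_book` below replays that list onto a fresh
    # copy of the book.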
    def _save_temp_book(self):
        # TODO: refactor this logic
        origin_book_temp = epub.read_epub(self.epub_name)
        new_temp_book = self._make_new_book(origin_book_temp)
        p_to_save_len = len(self.p_to_save)
        trans_taglist = self.translate_tags.split(",")
        index = 0
        try:
            for item in origin_book_temp.get_items():
                if item.get_type() == ITEM_DOCUMENT:
                    soup = bs(item.content, "html.parser")
                    p_list = soup.findAll(trans_taglist)
                    if self.allow_navigable_strings:
                        p_list.extend(soup.findAll(text=True))
                    for p in p_list:
                        if not p.text or self._is_special_text(p.text):
                            continue
                        # TODO: batch the paragraphs to translate, then
                        # combine them; PR welcome here
                        if index < p_to_save_len:
                            new_p = copy(p)
                            if type(p) == NavigableString:
                                new_p = self.p_to_save[index]
                            else:
                                new_p.string = self.p_to_save[index]
                            # Pass the saved string directly: it equals
                            # new_p.string for Tags, and new_p is a plain str
                            # in the NavigableString case.
                            self.helper.insert_trans(
                                p,
                                self.p_to_save[index],
                                self.translation_style,
                                self.single_translate,
                            )
                            index += 1
                        else:
                            break
                    # save the temp book
                    if soup:
                        item.content = soup.encode()
                new_temp_book.add_item(item)
            name, _ = os.path.splitext(self.epub_name)
            epub.write_epub(f"{name}_bilingual_temp.epub", new_temp_book, {})
        except Exception as e:
            # TODO: handle this properly
            print(e)

    def _save_progress(self):
        try:
            with open(self.bin_path, "wb") as f:
                pickle.dump(self.p_to_save, f)
        except Exception:
            raise Exception("cannot save resume file")
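
# A minimal usage sketch, kept as a comment. Assumptions not defined in this
# file: `ChatGPTAPI` stands in for any model class accepted by the loader,
# and "sk-..." for a valid API key.
#
#   loader = EPUBBookLoader(
#       "animal_farm.epub",
#       ChatGPTAPI,
#       "sk-...",
#       resume=False,
#       language="zh-hans",
#   )
#   loader.make_bilingual_book()  # writes animal_farm_bilingual.epub next to the source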