import os
import pickle
import string
import sys
from copy import copy
from pathlib import Path

from bs4 import BeautifulSoup as bs, Tag
from bs4.element import NavigableString
from ebooklib import ITEM_DOCUMENT, epub
from rich import print
from tqdm import tqdm

from book_maker.utils import num_tokens_from_text, prompt_config_to_kwargs

from .base_loader import BaseBookLoader
from .helper import EPUBBookLoaderHelper, not_trans, is_text_link


class EPUBBookLoader(BaseBookLoader):
    def __init__(
        self,
        epub_name,
        model,
        key,
        resume,
        language,
        model_api_base=None,
        is_test=False,
        test_num=5,
        prompt_config=None,
        single_translate=False,
        context_flag=False,
        temperature=1.0,
    ):
        self.epub_name = epub_name
        self.new_epub = epub.EpubBook()
        self.translate_model = model(
            key,
            language,
            api_base=model_api_base,
            context_flag=context_flag,
            temperature=temperature,
            **prompt_config_to_kwargs(prompt_config),
        )
        self.is_test = is_test
        self.test_num = test_num
        self.translate_tags = "p"
        self.exclude_translate_tags = "sup"
        self.allow_navigable_strings = False
        self.accumulated_num = 1
        self.translation_style = ""
        self.context_flag = context_flag
        self.helper = EPUBBookLoaderHelper(
            self.translate_model,
            self.accumulated_num,
            self.translation_style,
            self.context_flag,
        )
        self.retranslate = None
        self.exclude_filelist = ""
        self.only_filelist = ""
        self.single_translate = single_translate

        # monkey patch for #173
        def _write_items_patch(obj):
            for item in obj.book.get_items():
                if isinstance(item, epub.EpubNcx):
                    obj.out.writestr(
                        "%s/%s" % (obj.book.FOLDER_NAME, item.file_name),
                        obj._get_ncx(),
                    )
                elif isinstance(item, epub.EpubNav):
                    obj.out.writestr(
                        "%s/%s" % (obj.book.FOLDER_NAME, item.file_name),
                        obj._get_nav(item),
                    )
                elif item.manifest:
                    obj.out.writestr(
                        "%s/%s" % (obj.book.FOLDER_NAME, item.file_name),
                        item.content,
                    )
                else:
                    obj.out.writestr("%s" % item.file_name, item.content)

        epub.EpubWriter._write_items = _write_items_patch

        try:
            self.origin_book = epub.read_epub(self.epub_name)
        except Exception:
            # tricky monkey patch for #71; if you don't know why,
            # please check the issue and ignore this
            # TODO: remove once this is fixed upstream
            def _load_spine(obj):
                spine = obj.container.find(
                    "{%s}%s" % (epub.NAMESPACES["OPF"], "spine")
                )

                obj.book.spine = [
                    (t.get("idref"), t.get("linear", "yes")) for t in spine
                ]
                obj.book.set_direction(spine.get("page-progression-direction", None))

            epub.EpubReader._load_spine = _load_spine
            self.origin_book = epub.read_epub(self.epub_name)

        self.p_to_save = []
        self.resume = resume
        self.bin_path = f"{Path(epub_name).parent}/.{Path(epub_name).stem}.temp.bin"
        if self.resume:
            self.load_state()

    @staticmethod
    def _is_special_text(text):
        return (
            text.isdigit()
            or text.isspace()
            or is_text_link(text)
            or all(char in string.punctuation for char in text)
        )

    def _make_new_book(self, book):
        new_book = epub.EpubBook()
        new_book.metadata = book.metadata
        new_book.spine = book.spine
        new_book.toc = book.toc
        return new_book

    def _process_paragraph(self, p, index, p_to_save_len):
        if not p.text or self._is_special_text(p.text):
            return index

        new_p = copy(p)

        for p_exclude in self.exclude_translate_tags.split(","):
            # for issue #280
            if type(p) == NavigableString:
                continue
            for pt in new_p.find_all(p_exclude):
                pt.extract()

        if self.resume and index < p_to_save_len:
            new_p.string = self.p_to_save[index]
        else:
            if type(p) == NavigableString:
                new_p = self.translate_model.translate(new_p.text)
                self.p_to_save.append(new_p)
            else:
                new_p.string = self.translate_model.translate(new_p.text)
                self.p_to_save.append(new_p.text)

        self.helper.insert_trans(
            p, new_p.string, self.translation_style, self.single_translate
        )

        index += 1
        if index % 20 == 0:
            self._save_progress()
        return index

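    # Accumulation mode: `translate_paragraphs_acc` batches paragraphs until
    # the running token count would reach `send_num`, then flushes the batch
    # through the helper (`deal_old`); a single paragraph whose token count
    # exceeds `send_num` on its own is handed to `deal_new` instead.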
    def translate_paragraphs_acc(self, p_list, send_num):
        count = 0
        wait_p_list = []
        for i in range(len(p_list)):
            p = p_list[i]
            temp_p = copy(p)

            for p_exclude in self.exclude_translate_tags.split(","):
                # for issue #280
                if type(p) == NavigableString:
                    continue
                for pt in temp_p.find_all(p_exclude):
                    pt.extract()

            if any(
                [
                    not p.text,
                    self._is_special_text(temp_p.text),
                    not_trans(temp_p.text),
                ]
            ):
                if i == len(p_list) - 1:
                    self.helper.deal_old(wait_p_list, self.single_translate)
                continue

            length = num_tokens_from_text(temp_p.text)
            if length > send_num:
                self.helper.deal_new(p, wait_p_list, self.single_translate)
                continue

            if i == len(p_list) - 1:
                if count + length < send_num:
                    wait_p_list.append(p)
                    self.helper.deal_old(wait_p_list, self.single_translate)
                else:
                    self.helper.deal_new(p, wait_p_list, self.single_translate)
                break

            if count + length < send_num:
                count += length
                wait_p_list.append(p)
                # The more paragraphs we accumulate, the more likely the model
                # is to return a different number of paragraphs than it was
                # sent; 15 and 2 below are heuristics, and better values may exist.
                # if len(wait_p_list) > 15 and count > send_num / 2:
                #     self.helper.deal_old(wait_p_list)
                #     count = 0
            else:
                self.helper.deal_old(wait_p_list, self.single_translate)
                wait_p_list.append(p)
                count = length

    def get_item(self, book, name):
        for item in book.get_items():
            if item.file_name == name:
                return item

    def find_items_containing_string(self, book, search_string):
        matching_items = []

        for item in book.get_items_of_type(ITEM_DOCUMENT):
            content = item.get_content().decode("utf-8")
            if search_string in content:
                matching_items.append(item)

        return matching_items

    def retranslate_book(self, index, p_to_save_len, pbar, trans_taglist, retranslate):
        complete_book_name = retranslate[0]
        fixname = retranslate[1]
        fixstart = retranslate[2]
        fixend = retranslate[3]

        if fixend == "":
            fixend = fixstart

        name_fix = complete_book_name

        complete_book = epub.read_epub(complete_book_name)

        if fixname == "":
            fixname = self.find_items_containing_string(complete_book, fixstart)[
                0
            ].file_name
            print(f"auto find fixname: {fixname}")

        new_book = self._make_new_book(complete_book)

        complete_item = self.get_item(complete_book, fixname)
        if complete_item is None:
            return

        ori_item = self.get_item(self.origin_book, fixname)
        if ori_item is None:
            return

        soup_complete = bs(complete_item.content, "html.parser")
        soup_ori = bs(ori_item.content, "html.parser")

        p_list_complete = soup_complete.findAll(trans_taglist)
        p_list_ori = soup_ori.findAll(trans_taglist)

        target = None
        tagl = []

        # extract the fixstart..fixend range from the translated book
        find_end = False
        find_start = False
        for tag in p_list_complete:
            if find_end:
                tagl.append(tag)
                break

            if fixend in tag.text:
                find_end = True
            if fixstart in tag.text:
                find_start = True

            if find_start:
                if not target:
                    target = tag.previous_sibling
                tagl.append(tag)

        for t in tagl:
            t.extract()

        flag = False
        extract_p_list_ori = []
        for p in p_list_ori:
            if fixstart in p.text:
                flag = True
            if flag:
                extract_p_list_ori.append(p)
            if fixend in p.text:
                break

        for t in extract_p_list_ori:
            if target:
                target.insert_after(t)
                target = t

        for item in complete_book.get_items():
            if item.file_name != fixname:
                new_book.add_item(item)
        if soup_complete:
            complete_item.content = soup_complete.encode()

        index = self.process_item(
            complete_item,
            index,
            p_to_save_len,
            pbar,
            new_book,
            trans_taglist,
            fixstart,
            fixend,
        )
        epub.write_epub(name_fix, new_book, {})

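    # Elements that nest other translatable tags (e.g. a <div> containing a
    # <p> when both are in `trans_taglist`) would have their text translated
    # twice, so only the innermost matching elements are kept.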
    def has_nest_child(self, element, trans_taglist):
        if isinstance(element, Tag):
            for child in element.children:
                if child.name in trans_taglist:
                    return True
                if self.has_nest_child(child, trans_taglist):
                    return True
        return False

    def filter_nest_list(self, p_list, trans_taglist):
        filtered_list = [p for p in p_list if not self.has_nest_child(p, trans_taglist)]
        return filtered_list

    def process_item(
        self,
        item,
        index,
        p_to_save_len,
        pbar,
        new_book,
        trans_taglist,
        fixstart=None,
        fixend=None,
    ):
        if self.only_filelist != "" and item.file_name not in self.only_filelist.split(
            ","
        ):
            return index
        elif self.only_filelist == "" and item.file_name in self.exclude_filelist.split(
            ","
        ):
            new_book.add_item(item)
            return index

        if not os.path.exists("log"):
            os.makedirs("log")

        soup = bs(item.content, "html.parser")
        p_list = soup.findAll(trans_taglist)

        p_list = self.filter_nest_list(p_list, trans_taglist)

        if self.retranslate:
            if fixstart is None or fixend is None:
                return index

            new_p_list = []
            start_append = False

            for p in p_list:
                text = p.get_text()
                if fixstart in text or fixend in text or start_append:
                    start_append = True
                    new_p_list.append(p)
                if fixend in text:
                    p_list = new_p_list
                    break

        if self.allow_navigable_strings:
            p_list.extend(soup.findAll(text=True))

        send_num = self.accumulated_num
        if send_num > 1:
            with open("log/buglog.txt", "a") as f:
                print(f"------------- {item.file_name} -------------", file=f)

            print("------------------------------------------------------")
            print(f"dealing {item.file_name} ...")
            self.translate_paragraphs_acc(p_list, send_num)
        else:
            is_test_done = self.is_test and index > self.test_num
            for p in p_list:
                if is_test_done:
                    break
                index = self._process_paragraph(p, index, p_to_save_len)
                # pbar.update(delta) not pbar.update(index)?
                pbar.update(1)
                print()
                if self.is_test and index >= self.test_num:
                    break

        if soup:
            item.content = soup.encode()
        new_book.add_item(item)

        return index

    def make_bilingual_book(self):
        self.helper = EPUBBookLoaderHelper(
            self.translate_model,
            self.accumulated_num,
            self.translation_style,
            self.context_flag,
        )
        new_book = self._make_new_book(self.origin_book)
        all_items = list(self.origin_book.get_items())
        trans_taglist = self.translate_tags.split(",")
        all_p_length = sum(
            0
            if (
                (i.get_type() != ITEM_DOCUMENT)
                or (i.file_name in self.exclude_filelist.split(","))
                or (
                    self.only_filelist
                    and i.file_name not in self.only_filelist.split(",")
                )
            )
            else len(bs(i.content, "html.parser").findAll(trans_taglist))
            for i in all_items
        )
        all_p_length += self.allow_navigable_strings * sum(
            0
            if (
                (i.get_type() != ITEM_DOCUMENT)
                or (i.file_name in self.exclude_filelist.split(","))
                or (
                    self.only_filelist
                    and i.file_name not in self.only_filelist.split(",")
                )
            )
            else len(bs(i.content, "html.parser").findAll(text=True))
            for i in all_items
        )
        pbar = tqdm(total=self.test_num) if self.is_test else tqdm(total=all_p_length)
        print()
        index = 0
        p_to_save_len = len(self.p_to_save)
        try:
            if self.retranslate:
                self.retranslate_book(
                    index, p_to_save_len, pbar, trans_taglist, self.retranslate
                )
                sys.exit(0)
            # Add the items that need no translation first, so that images
            # are already in place if the run is interrupted.
            for item in self.origin_book.get_items():
                if item.get_type() != ITEM_DOCUMENT:
                    new_book.add_item(item)
            for item in self.origin_book.get_items_of_type(ITEM_DOCUMENT):
                index = self.process_item(
                    item, index, p_to_save_len, pbar, new_book, trans_taglist
                )
                if self.accumulated_num > 1:
                    name, _ = os.path.splitext(self.epub_name)
                    epub.write_epub(f"{name}_bilingual.epub", new_book, {})
            name, _ = os.path.splitext(self.epub_name)
            epub.write_epub(f"{name}_bilingual.epub", new_book, {})
            if self.accumulated_num == 1:
                pbar.close()
        except (KeyboardInterrupt, Exception) as e:
            print(e)
            if self.accumulated_num == 1:
                print("you can resume it next time")
                self._save_progress()
                self._save_temp_book()
            sys.exit(0)

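    # Resume support: the translated strings collected in `p_to_save` are
    # pickled to a hidden `.<book name>.temp.bin` file next to the source
    # epub (see `bin_path` in `__init__`).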
    def load_state(self):
        try:
            with open(self.bin_path, "rb") as f:
                self.p_to_save = pickle.load(f)
        except Exception as e:
            raise Exception("cannot load resume file") from e

    def _save_temp_book(self):
        # TODO: refactor this logic
        origin_book_temp = epub.read_epub(self.epub_name)
        new_temp_book = self._make_new_book(origin_book_temp)
        p_to_save_len = len(self.p_to_save)
        trans_taglist = self.translate_tags.split(",")
        index = 0

        try:
            for item in origin_book_temp.get_items():
                if (
                    item.get_type() == ITEM_DOCUMENT
                    and (item.file_name not in self.exclude_filelist.split(","))
                    and (
                        item.file_name in self.only_filelist.split(",")
                        or self.only_filelist == ""
                    )
                ):
                    soup = bs(item.content, "html.parser")
                    p_list = soup.findAll(trans_taglist)
                    if self.allow_navigable_strings:
                        p_list.extend(soup.findAll(text=True))
                    for p in p_list:
                        if not p.text or self._is_special_text(p.text):
                            continue
                        # TODO: batch the paragraphs for translation, then
                        # recombine them; PRs welcome here
                        if index < p_to_save_len:
                            new_p = copy(p)
                            if type(p) == NavigableString:
                                new_p = self.p_to_save[index]
                            else:
                                new_p.string = self.p_to_save[index]
                            self.helper.insert_trans(
                                p,
                                new_p.string,
                                self.translation_style,
                                self.single_translate,
                            )
                            index += 1
                        else:
                            break
                    # write the partially translated content into the temp book
                    if soup:
                        item.content = soup.encode()
                new_temp_book.add_item(item)
            name, _ = os.path.splitext(self.epub_name)
            epub.write_epub(f"{name}_bilingual_temp.epub", new_temp_book, {})
        except Exception as e:
            # TODO: handle this properly
            print(e)

    def _save_progress(self):
        try:
            with open(self.bin_path, "wb") as f:
                pickle.dump(self.p_to_save, f)
        except Exception as e:
            raise Exception("cannot save resume file") from e
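

# A minimal usage sketch (illustration only, not part of the original module).
# `ChatGPTAPI` and its import path are assumptions here; any translator class
# whose constructor accepts the arguments that `__init__` passes to `model(...)`
# (key, language, api_base=..., context_flag=..., temperature=..., plus the
# prompt-config kwargs) should work in its place.
if __name__ == "__main__":
    from book_maker.translator import ChatGPTAPI  # hypothetical import path

    loader = EPUBBookLoader(
        "example.epub",
        ChatGPTAPI,
        key="sk-...",  # placeholder API key
        resume=False,
        language="zh-hans",
    )
    # Writes example_bilingual.epub next to the source file.
    loader.make_bilingual_book()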