mirror of
https://github.com/yihong0618/bilingual_book_maker.git
synced 2025-06-02 09:30:24 +00:00

* feat(translator): 优化 Gemini 翻译模板并支持新模型 更新翻译提示模板以提供更专业的中文翻译输出,主要变更包括: - 添加新的 Gemini 2.0 flash 实验模型支持 - 修改翻译提示模板,采用三步翻译流程提升翻译质量 - 增加标签提取功能,只返回最终优化后的翻译内容 - 移除对 {language} 参数的强制要求检查 优化后的翻译流程包含初次翻译、反思改进和最终润色三个步骤, 显著提升翻译结果的准确性和可读性。 * support md file type --------- Co-authored-by: zhonghua.zhu <zhonghua.zhu@riveretech.com>
177 lines
6.1 KiB
Python
177 lines
6.1 KiB
Python
import sys
|
|
from pathlib import Path
|
|
|
|
from book_maker.utils import prompt_config_to_kwargs
|
|
|
|
from .base_loader import BaseBookLoader
|
|
|
|
|
|
class MarkdownBookLoader(BaseBookLoader):
    """Load a Markdown file and produce a bilingual (or single-language) copy.

    The source file is split into markdown-aware paragraphs (headings become
    standalone paragraphs), translated in batches of ``batch_size`` paragraphs
    per model call, and written next to the source as ``<stem>_bilingual.md``.
    Progress can be persisted to a hidden ``.bin`` file and resumed later.
    """

    def __init__(
        self,
        md_name,
        model,
        key,
        resume,
        language,
        model_api_base=None,
        is_test=False,
        test_num=5,
        prompt_config=None,
        single_translate=False,
        context_flag=False,
        context_paragraph_limit=0,
        temperature=1.0,
    ) -> None:
        """Read *md_name*, build the translate model, and pre-split paragraphs.

        :param md_name: path to the markdown file to translate
        :param model: translate-model class (instantiated here, not an instance)
        :param key: API key forwarded to the model
        :param resume: if True, reload previously saved progress from disk
        :param language: target language forwarded to the model
        :param model_api_base: optional custom API base URL
        :param is_test: if True, stop after roughly ``test_num`` paragraphs
        :param test_num: paragraph budget used in test mode
        :param prompt_config: optional prompt template config for the model
        :param single_translate: if True, emit only the translation (no原文
            interleaving — the source batch is omitted from the output)
        :param context_flag: accepted for interface parity; unused here
        :param context_paragraph_limit: accepted for interface parity; unused here
        :param temperature: sampling temperature forwarded to the model
        :raises Exception: if the markdown file cannot be read
        """
        self.md_name = md_name
        self.translate_model = model(
            key,
            language,
            api_base=model_api_base,
            temperature=temperature,
            **prompt_config_to_kwargs(prompt_config),
        )
        self.is_test = is_test
        self.p_to_save = []
        self.bilingual_result = []
        self.bilingual_temp_result = []
        self.test_num = test_num
        self.batch_size = 10  # paragraphs sent to the model per request
        self.single_translate = single_translate
        self.md_paragraphs = []

        try:
            with open(f"{md_name}", encoding="utf-8") as f:
                self.origin_book = f.read().splitlines()
        except Exception as e:
            raise Exception("can not load file") from e

        self.resume = resume
        # Hidden sibling file used to persist translation progress.
        self.bin_path = f"{Path(md_name).parent}/.{Path(md_name).stem}.temp.bin"
        if self.resume:
            self.load_state()

        self.process_markdown_content()

    def process_markdown_content(self):
        """Split the raw lines into markdown paragraphs.

        Consecutive non-blank lines form one paragraph; a blank line ends the
        current paragraph; a heading line (``#`` prefix) always becomes its own
        paragraph. Results are appended to ``self.md_paragraphs``.
        """
        current_paragraph = []
        for line in self.origin_book:
            # A blank line terminates the paragraph being accumulated.
            if not line.strip() and current_paragraph:
                self.md_paragraphs.append("\n".join(current_paragraph))
                current_paragraph = []
            # A heading is flushed as its own standalone paragraph.
            elif line.strip().startswith("#"):
                if current_paragraph:
                    self.md_paragraphs.append("\n".join(current_paragraph))
                    current_paragraph = []
                self.md_paragraphs.append(line)
            # Anything else extends the current paragraph.
            else:
                current_paragraph.append(line)

        # Flush the trailing paragraph, if any.
        if current_paragraph:
            self.md_paragraphs.append("\n".join(current_paragraph))

    @staticmethod
    def _is_special_text(text):
        """Return True for text not worth translating: digits-only, whitespace, or empty."""
        return text.isdigit() or text.isspace() or len(text) == 0

    def _make_new_book(self, book):
        # Not applicable to flat markdown files; required by the base interface.
        pass

    def make_bilingual_book(self):
        """Translate the paragraphs batch by batch and write the bilingual file.

        On any error (including Ctrl-C) progress is saved so a later run with
        ``resume=True`` can continue, then the process exits with status 1.
        """
        index = 0
        p_to_save_len = len(self.p_to_save)

        try:
            sliced_list = [
                self.md_paragraphs[i : i + self.batch_size]
                for i in range(0, len(self.md_paragraphs), self.batch_size)
            ]
            for paragraphs in sliced_list:
                batch_text = "\n\n".join(paragraphs)
                if self._is_special_text(batch_text):
                    continue
                # Skip batches already translated in a previous (resumed) run.
                if not self.resume or index >= p_to_save_len:
                    try:
                        max_retries = 3
                        retry_count = 0
                        while retry_count < max_retries:
                            try:
                                temp = self.translate_model.translate(batch_text)
                                break
                            except AttributeError as ae:
                                print(f"翻译出错: {ae}")
                                retry_count += 1
                                if retry_count == max_retries:
                                    raise Exception("翻译模型初始化失败") from ae
                    except Exception as e:
                        print(f"翻译过程中出错: {e}")
                        raise Exception("翻译过程中出现错误") from e

                    self.p_to_save.append(temp)
                    if not self.single_translate:
                        self.bilingual_result.append(batch_text)
                    self.bilingual_result.append(temp)
                index += self.batch_size
                if self.is_test and index > self.test_num:
                    break

            self.save_file(
                f"{Path(self.md_name).parent}/{Path(self.md_name).stem}_bilingual.md",
                self.bilingual_result,
            )

        # KeyboardInterrupt is not an Exception subclass, so both are listed
        # deliberately: Ctrl-C must also trigger the progress save below.
        except (KeyboardInterrupt, Exception) as e:
            print(f"发生错误: {e}")
            print("程序将保存进度,您可以稍后继续")
            self._save_progress()
            self._save_temp_book()
            sys.exit(1)  # non-zero exit code signals the error

    def _save_temp_book(self):
        """Write a partial bilingual file interleaving source batches with
        whatever translations have been collected so far.

        NOTE(review): this slices ``origin_book`` (raw lines) while translation
        batches were built from ``md_paragraphs``, so source/translation pairs
        may not align exactly — acceptable for a best-effort temp dump.
        """
        index = 0
        sliced_list = [
            self.origin_book[i : i + self.batch_size]
            for i in range(0, len(self.origin_book), self.batch_size)
        ]

        for i in range(len(sliced_list)):
            batch_text = "".join(sliced_list[i])
            self.bilingual_temp_result.append(batch_text)
            # BUGFIX: previously tested self.origin_book[i] — a *line* indexed
            # by the *batch* counter — so the skip check inspected the wrong
            # text entirely. Test the batch itself, mirroring the skip logic
            # in make_bilingual_book so `index` stays aligned with p_to_save.
            if self._is_special_text(batch_text):
                continue
            if index < len(self.p_to_save):
                self.bilingual_temp_result.append(self.p_to_save[index])
                index += 1

        self.save_file(
            f"{Path(self.md_name).parent}/{Path(self.md_name).stem}_bilingual_temp.txt",
            self.bilingual_temp_result,
        )

    def _save_progress(self):
        """Persist collected translations to the hidden resume file.

        :raises Exception: if the resume file cannot be written
        """
        try:
            with open(self.bin_path, "w", encoding="utf-8") as f:
                f.write("\n".join(self.p_to_save))
        except Exception as e:
            # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
            # propagate, and the original cause is chained for debugging.
            raise Exception("can not save resume file") from e

    def load_state(self):
        """Reload previously saved translations from the hidden resume file.

        :raises Exception: if the resume file cannot be read
        """
        try:
            with open(self.bin_path, encoding="utf-8") as f:
                self.p_to_save = f.read().splitlines()
        except Exception as e:
            raise Exception("can not load resume file") from e

    def save_file(self, book_path, content):
        """Write *content* (a list of strings) to *book_path*, newline-joined.

        :raises Exception: if the file cannot be written
        """
        try:
            with open(book_path, "w", encoding="utf-8") as f:
                f.write("\n".join(content))
        except Exception as e:
            # Narrowed from a bare `except:`; chain the cause.
            raise Exception("can not save file") from e