Mirror of https://github.com/yihong0618/bilingual_book_maker.git, synced 2025-06-05 19:15:34 +00:00
Merge branch 'main' of https://github.com/yihong0618/bilingual_book_maker
This commit is contained in: commit c28ae78119
.gitignore (vendored): 3 changes
@ -131,4 +131,5 @@ dmypy.json
# Pyre type checker
.pyre/

/test_books/*.epub
log/
@ -42,9 +42,6 @@ bilingual_book_maker is an AI translation tool that uses ChatGPT to help users make

If you need to set the `system` role, you can configure it like this: `--prompt '{"user":"Translate {text} to {language}", "system": "You are a professional translator."}'`, or `--prompt prompt_template_sample.json` (a sample JSON file can be found at [./prompt_template_sample.json](./prompt_template_sample.json)).

You can also configure the `system` and `user` role prompts with the following environment variables: `BBM_CHATGPTAPI_USER_MSG_TEMPLATE` and `BBM_CHATGPTAPI_SYS_MSG`.

This parameter can be a prompt template string, or the path to a template `.txt` file.

- When the translation finishes, a bilingual book named ${book_name}_bilingual.epub is generated.
- If an error occurs, or you interrupt the command with `CTRL+C` and don't want to continue, a book named ${book_name}_bilingual_temp.epub is generated; just rename it to whatever you want.
- If you want to translate the untagged strings in an e-book, use the `--allow_navigable_strings` parameter to add navigable strings to the translation queue. **Note: when possible, look for a more standards-compliant e-book instead.**
- Use the `--batch_size` parameter to specify the number of lines per translation batch (default is 10; currently only effective for txt files).
README.md: 18 changes
@ -16,13 +16,13 @@ The bilingual_book_maker is an AI translation tool that uses ChatGPT to assist u

## Use

- `pip install -r requirements.txt` or `pip install -U bbook_maker`.
- Use the `--openai_key` option to specify the OpenAI API key. If you have multiple keys, separate them with commas (xxx,xxx,xxx) to reduce errors caused by API call limits.
  Or, just set the environment variable `BMM_OPENAI_API_KEY` instead.
- A sample book, `test_books/animal_farm.epub`, is provided for testing purposes.
- The default underlying model is [GPT-3.5-turbo](https://openai.com/blog/introducing-chatgpt-and-whisper-apis), which is what ChatGPT currently uses. Use `--model gpt3` to change the underlying model to `GPT3`.
- Support for the [DeepL Translator](https://rapidapi.com/splintPRO/api/deepl-translator) model: you need to pay to get the token; use `--model deepl --deepl_key ${deepl_key}`.
- Use the `--test` option to preview the result if you haven't paid for the service. Note that there is a limit and it may take some time.
- Set the target language like `--language "Simplified Chinese"`. The default target language is `"Simplified Chinese"`.
  Read the available languages in the help message: `python make_book.py --help`.
- Use the `--proxy` option to specify a proxy server for internet access, e.g. `http://127.0.0.1:7890`.
- Use the `--resume` option to manually resume the process after an interruption.
@ -30,19 +30,19 @@ The bilingual_book_maker is an AI translation tool that uses ChatGPT to assist u

Use `--translate-tags` to specify the tags that need translation. Use commas to separate multiple tags, for example:
`--translate-tags h1,h2,h3,p,div`
- Use the `--book_from` option to specify the e-reader type (currently only `kobo` is available), and use `--device_path` to specify the mount point.
- If you want to change the api_base, for example to use Cloudflare Workers, use `--api_base <URL>`.
  **Note: the api url should be '`https://xxxx/v1`'. Quotation marks are required.**
- To tweak the prompt, use the `--prompt` parameter. Valid placeholders for the `user` role template include `{text}` and `{language}`. It supports a few ways to configure the prompt (a parsing sketch follows this list):
  If you don't need to set the `system` role content, you can simply set it up like this: `--prompt "Translate {text} to {language}."` or `--prompt prompt_template_sample.txt` (an example text file can be found at [./prompt_template_sample.txt](./prompt_template_sample.txt)).
  If you need to set the `system` role content, use the following format: `--prompt '{"user":"Translate {text} to {language}", "system": "You are a professional translator."}'` or `--prompt prompt_template_sample.json` (an example JSON file can be found at [./prompt_template_sample.json](./prompt_template_sample.json)).
  You can also set the `user` and `system` role prompts by setting environment variables: `BBM_CHATGPTAPI_USER_MSG_TEMPLATE` and `BBM_CHATGPTAPI_SYS_MSG`.
- Once the translation is complete, a bilingual book named `${book_name}_bilingual.epub` will be generated.
- If there are any errors, or you interrupt the translation by pressing `CTRL+C`, a book named `${book_name}_bilingual_temp.epub` will be generated. You can simply rename it to any desired name.
- If you want to translate strings in an e-book that aren't labeled with any tags, you can use the `--allow_navigable_strings` parameter. This will add those strings to the translation queue. **Note that it's best to look for more standardized e-books if possible.**
- Use the `--batch_size` parameter to specify the number of lines for batch translation (default is 10; currently only effective for txt files).
- `--accumulated_num`: wait until this many tokens have accumulated before starting the translation. gpt-3.5 limits total_token to 4090. For example, if you use `--accumulated_num 1600`, openai might output 2200 tokens, and another 200 tokens or so might go to the system and user messages; 1600 + 2200 + 200 = 4000, so you would be close to the limit. You have to choose your own value; there is no way to know whether the limit will be reached before sending (see the token-count sketch below).
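
As a rough sketch of that token arithmetic (a minimal example assuming the `cl100k_base` encoding used by gpt-3.5-turbo; the 2200 and 200 figures are just the estimates from the bullet above):

```python
import tiktoken

enc = tiktoken.get_encoding("cl100k_base")  # tokenizer used by gpt-3.5-turbo


def batch_fits(batch_text, expected_reply=2200, message_overhead=200, limit=4090):
    # e.g. 1600 accumulated + 2200 reply + 200 overhead = 4000 < 4090
    accumulated = len(enc.encode(batch_text))
    return accumulated + expected_reply + message_overhead < limit
```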
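Returning to the `--prompt` parameter above: as a minimal sketch, the argument shapes it accepts could be dispatched like this. The `load_prompt_config` helper below is hypothetical and written only to illustrate the shapes; the repository's own logic lives in `parse_prompt_arg`:

```python
import json
import os


def load_prompt_config(prompt_arg):
    """Hypothetical sketch: map a --prompt argument to a user/system dict."""
    if prompt_arg is None:
        return None
    # Path to a JSON template file with "user" and optional "system" keys.
    if prompt_arg.endswith(".json") and os.path.exists(prompt_arg):
        with open(prompt_arg) as f:
            return json.load(f)
    # Path to a plain-text template file: the whole file is the user template.
    if prompt_arg.endswith(".txt") and os.path.exists(prompt_arg):
        with open(prompt_arg) as f:
            return {"user": f.read()}
    # Inline JSON object, e.g. '{"user": "...", "system": "..."}'.
    if prompt_arg.strip().startswith("{"):
        return json.loads(prompt_arg)
    # Otherwise treat the argument itself as the user template string.
    return {"user": prompt_arg}
```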
### Examples
@ -170,6 +170,18 @@ def main():
        metavar="PROMPT_ARG",
        help="used for customizing the prompt. It can be the prompt template string, or a path to the template file. The valid placeholders are `{text}` and `{language}`.",
    )
    parser.add_argument(
        "--accumulated_num",
        dest="accumulated_num",
        type=int,
        default=1,
        help="""Wait until this many tokens have accumulated before starting the translation.
gpt3.5 limits the total_token to 4090.
For example, if you use --accumulated_num 1600, openai may output 2200 tokens
and another 200 tokens may be used by the system and user messages; 1600+2200+200=4000,
so you would be close to the limit. You have to choose your own value; there is
no way to know whether the limit will be reached before sending.""",
    )
    parser.add_argument(
        "--batch_size",
        dest="batch_size",
@ -250,6 +262,7 @@ def main():
        test_num=options.test_num,
        translate_tags=options.translate_tags,
        allow_navigable_strings=options.allow_navigable_strings,
        accumulated_num=options.accumulated_num,
        prompt_config=parse_prompt_arg(options.prompt_arg),
        batch_size=options.batch_size,
    )

@ -1,5 +1,7 @@
import os
import re
import pickle
import tiktoken
import sys
from copy import copy
from pathlib import Path
@ -15,6 +17,95 @@ from book_maker.utils import prompt_config_to_kwargs
from .base_loader import BaseBookLoader


class EPUBBookLoaderHelper:
    def __init__(self, translate_model, accumulated_num):
        self.translate_model = translate_model
        self.accumulated_num = accumulated_num

    def deal_new(self, p, wait_p_list):
        # Flush the queued paragraphs first, then translate this oversized
        # paragraph on its own.
        self.deal_old(wait_p_list)
        new_p = copy(p)
        new_p.string = self.translate_model.translate(p.text)
        p.insert_after(new_p)

    def deal_old(self, wait_p_list):
        # Translate all queued paragraphs in one batched request and insert
        # each translation right after its source paragraph.
        if len(wait_p_list) == 0:
            return

        result_txt_list = self.translate_model.translate_list(wait_p_list)

        for i in range(len(wait_p_list)):
            if i < len(result_txt_list):
                p = wait_p_list[i]
                new_p = copy(p)
                new_p.string = result_txt_list[i]
                p.insert_after(new_p)

        wait_p_list.clear()


# ref: https://platform.openai.com/docs/guides/chat/introduction
def num_tokens_from_text(text, model="gpt-3.5-turbo-0301"):
    """Returns the number of tokens used when `text` is sent as a single user message."""
    messages = (
        {
            "role": "user",
            "content": text,
        },
    )

    try:
        encoding = tiktoken.encoding_for_model(model)
    except KeyError:
        encoding = tiktoken.get_encoding("cl100k_base")
    if model == "gpt-3.5-turbo-0301":  # note: future models may deviate from this
        num_tokens = 0
        for message in messages:
            num_tokens += (
                4  # every message follows <im_start>{role/name}\n{content}<im_end>\n
            )
            for key, value in message.items():
                num_tokens += len(encoding.encode(value))
                if key == "name":  # if there's a name, the role is omitted
                    num_tokens += -1  # role is always required and always 1 token
        num_tokens += 2  # every reply is primed with <im_start>assistant
        return num_tokens
    else:
        raise NotImplementedError(
            f"""num_tokens_from_text() is not presently implemented for model {model}.
See https://github.com/openai/openai-python/blob/main/chatml.md for information on how messages are converted to tokens."""
        )
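

# A quick sanity check (assumed numbers): with the rules above, a one-message
# prompt costs 4 (message overhead) + tokens("user") + tokens(text) + 2
# (reply priming). "Hello world" encodes to 2 tokens under cl100k_base and
# "user" to 1, so num_tokens_from_text("Hello world") == 4 + 1 + 2 + 2 == 9.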


def is_link(text):
    url_pattern = re.compile(
        r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+"
    )
    return bool(url_pattern.match(text.strip()))


def is_tail_Link(text, num=100):
    text = text.strip()
    url_pattern = re.compile(
        r".*http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+$"
    )
    return bool(url_pattern.match(text)) and len(text) < num


def is_source(text):
    return text.strip().startswith("Source: ")


def is_list(text, num=80):
    text = text.strip()
    return re.match(r"^Listing\s*\d+", text) and len(text) < num


def is_figure(text, num=80):
    text = text.strip()
    return re.match(r"^Figure\s*\d+", text) and len(text) < num

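
# Illustrative inputs for the filters above (assumed examples, not from the
# repository's tests):
#   is_link("https://example.com")                       -> True
#   is_tail_Link("see the docs at https://example.com")  -> True (short, ends with a URL)
#   is_source("Source: The Guardian")                    -> True
#   is_list("Listing 3 A sample program")                -> True (short "Listing <n>" caption)
#   is_figure("Figure 12 System overview")               -> True (short "Figure <n>" caption)
# Paragraphs matching any of these are skipped by the accumulated translation path.

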
class EPUBBookLoader(BaseBookLoader):
    def __init__(
        self,
@ -29,6 +120,8 @@ class EPUBBookLoader(BaseBookLoader):
        test_num=5,
        translate_tags="p",
        allow_navigable_strings=False,
        accumulated_num=1,
        prompt_template=None,
        prompt_config=None,
    ):
        self.epub_name = epub_name
@ -43,6 +136,8 @@ class EPUBBookLoader(BaseBookLoader):
        self.test_num = test_num
        self.translate_tags = translate_tags
        self.allow_navigable_strings = allow_navigable_strings
        self.accumulated_num = accumulated_num
        self.helper = EPUBBookLoaderHelper(self.translate_model, self.accumulated_num)

        try:
            self.origin_book = epub.read_epub(self.epub_name)
@ -70,7 +165,7 @@ class EPUBBookLoader(BaseBookLoader):

    @staticmethod
    def _is_special_text(text):
        return text.isdigit() or text.isspace() or is_link(text)

    def _make_new_book(self, book):
        new_book = epub.EpubBook()
@ -79,6 +174,70 @@ class EPUBBookLoader(BaseBookLoader):
        new_book.toc = book.toc
        return new_book

    def _process_paragraph(self, p, index, p_to_save_len):
        if not p.text or self._is_special_text(p.text):
            return index

        new_p = copy(p)

        if self.resume and index < p_to_save_len:
            new_p.string = self.p_to_save[index]
        else:
            if type(p) == NavigableString:
                new_p = self.translate_model.translate(p.text)
                self.p_to_save.append(new_p)
            else:
                new_p.string = self.translate_model.translate(p.text)
                self.p_to_save.append(new_p.text)

        p.insert_after(new_p)
        index += 1

        if index % 20 == 0:
            self._save_progress()

        return index

    def translate_paragraphs_acc(self, p_list, send_num):
        count = 0
        wait_p_list = []
        for i in range(len(p_list)):
            p = p_list[i]
            temp_p = copy(p)
            for sup in temp_p.find_all("sup"):
                sup.extract()
            if (
                not p.text
                or self._is_special_text(temp_p.text)
                or is_source(temp_p.text)
                or is_list(temp_p.text)
                or is_figure(temp_p.text)
                or is_tail_Link(temp_p.text)
            ):
                continue
            length = num_tokens_from_text(temp_p.text)
            if length > send_num:
                # A single paragraph already exceeds the budget: flush the
                # queue, then translate this paragraph on its own.
                self.helper.deal_new(p, wait_p_list)
                continue
            if i == len(p_list) - 1:
                if count + length < send_num:
                    wait_p_list.append(p)
                    self.helper.deal_old(wait_p_list)
                else:
                    self.helper.deal_new(p, wait_p_list)
                break
            if count + length < send_num:
                count += length
                wait_p_list.append(p)
                # The more paragraphs a batch holds, the more likely the model
                # returns a different number of paragraphs; better values than
                # 15 and 2 may exist.
                # if len(wait_p_list) > 15 and count > send_num / 2:
                #     self.helper.deal_old(wait_p_list)
                #     count = 0
            else:
                self.helper.deal_old(wait_p_list)
                wait_p_list.append(p)
                count = length
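
    # A worked example of the accumulation rule above (assumed numbers): with
    # send_num=1600 and successive paragraph sizes of 900, 500 and 400 tokens,
    # the first two paragraphs queue up (900 + 500 = 1400 < 1600); the third
    # would exceed the budget, so the queued pair is flushed through
    # deal_old() and the 400-token paragraph starts a new batch with count=400.
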
    def make_bilingual_book(self):
        new_book = self._make_new_book(self.origin_book)
        all_items = list(self.origin_book.get_items())
@ -99,46 +258,56 @@ class EPUBBookLoader(BaseBookLoader):
        index = 0
        p_to_save_len = len(self.p_to_save)
        try:
            # Add the items that don't need translation first, so that you can
            # still see the images after an interruption.
            for item in self.origin_book.get_items():
                if item.get_type() != ITEM_DOCUMENT:
                    new_book.add_item(item)

            for item in self.origin_book.get_items_of_type(ITEM_DOCUMENT):
                # if item.file_name != "OEBPS/ch01.xhtml":
                #     continue
                if not os.path.exists("log"):
                    os.makedirs("log")

                soup = bs(item.content, "html.parser")
                p_list = soup.findAll(trans_taglist)
                if self.allow_navigable_strings:
                    p_list.extend(soup.findAll(text=True))

                send_num = self.accumulated_num
                if send_num > 1:
                    with open("log/buglog.txt", "a") as f:
                        print(f"------------- {item.file_name} -------------", file=f)

                    print("------------------------------------------------------")
                    print(f"dealing {item.file_name} ...")
                    self.translate_paragraphs_acc(p_list, send_num)
                else:
                    is_test_done = self.is_test and index > self.test_num
                    for p in p_list:
                        if is_test_done or not p.text or self._is_special_text(p.text):
                            continue
                        index = self._process_paragraph(p, index, p_to_save_len)
                        # pbar.update(delta) not pbar.update(index)?
                        pbar.update(1)
                        if self.is_test and index >= self.test_num:
                            break

                item.content = soup.prettify().encode()
                new_book.add_item(item)
                if self.accumulated_num > 1:
                    name, _ = os.path.splitext(self.epub_name)
                    epub.write_epub(f"{name}_bilingual.epub", new_book, {})
            name, _ = os.path.splitext(self.epub_name)
            epub.write_epub(f"{name}_bilingual.epub", new_book, {})
            if self.accumulated_num == 1:
                pbar.close()
        except (KeyboardInterrupt, Exception) as e:
            print(e)
            if self.accumulated_num == 1:
                print("you can resume it next time")
                self._save_progress()
                self._save_temp_book()
            sys.exit(0)

    def load_state(self):

@ -20,6 +20,8 @@ class TXTBookLoader(BaseBookLoader):
        model_api_base=None,
        is_test=False,
        test_num=5,
        accumulated_num=1,
        prompt_template=None,
        prompt_config=None,
    ):
        self.txt_name = txt_name
@ -102,7 +104,7 @@ class TXTBookLoader(BaseBookLoader):
            for i in range(0, len(self.origin_book), self.batch_size)
        ]

        for i in range(len(sliced_list)):
            batch_text = "".join(sliced_list[i])
            self.bilingual_temp_result.append(batch_text)
            if self._is_special_text(self.origin_book[i]):
@ -1,4 +1,6 @@
import time
import re
from copy import copy
from os import environ

import openai
@ -38,57 +40,210 @@ class ChatGPTAPI(Base):
                "OPENAI_API_SYS_MSG"
            )  # XXX: for backward compatibility, deprecate soon
            or environ.get(PROMPT_ENV_MAP["system"])
            or ""
        )
        self.system_content = environ.get("OPENAI_API_SYS_MSG") or ""

    max_num_token = -1

    def rotate_key(self):
        openai.api_key = next(self.keys)

    def create_chat_completion(self, text):
        content = self.prompt_template.format(text=text, language=self.language)
        sys_content = self.prompt_sys_msg
        if self.system_content:
            sys_content = self.system_content
        messages = [
            {"role": "system", "content": sys_content},
            {"role": "user", "content": content},
        ]

        return openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=messages,
        )

    def get_translation(self, text):
        self.rotate_key()

        completion = {}
        try:
            completion = self.create_chat_completion(text)
        except Exception:
            # If there is no usable completion body, or the failure was not a
            # cut-off caused by the length limit, re-raise.
            if (
                "choices" not in completion
                or not isinstance(completion["choices"], list)
                or len(completion["choices"]) == 0
            ):
                raise
            if completion["choices"][0]["finish_reason"] != "length":
                raise

        # Either the call worked, or the exception was a finish by length limit.
        choice = completion["choices"][0]

        t_text = choice.get("message").get("content").encode("utf8").decode()

        if choice["finish_reason"] == "length":
            with open("long_text.txt", "a") as f:
                print(
                    f"""==================================================
The total token is too long and cannot be completely translated\n
{text}
""",
                    file=f,
                )

        # usage = completion["usage"]
        # print(f"total_token: {usage['total_tokens']}")
        # if int(usage["total_tokens"]) > self.max_num_token:
        #     self.max_num_token = int(usage["total_tokens"])
        #     print(
        #         f"{usage['total_tokens']} {usage['prompt_tokens']} {usage['completion_tokens']} {self.max_num_token} (total_token, prompt_token, completion_tokens, max_history_total_token)"
        #     )

        return t_text

    def translate(self, text, needprint=True):
        # print("=================================================")
        start_time = time.time()
        # todo: determine whether to print according to the cli option
        if needprint:
            print(re.sub("\n{3,}", "\n\n", text))

        attempt_count = 0
        max_attempts = 3
        t_text = ""

        while attempt_count < max_attempts:
            try:
                t_text = self.get_translation(text)
                break
            except Exception as e:
                # todo: better sleep time? why does the sleep always depend on key_len?
                # 1. openai server error or own network interruption: sleep for a fixed time
                # 2. an apikey has no money or reached its limit: don't sleep, just replace it with another apikey
                # 3. all apikeys reached their limit: then use the current sleep
                sleep_time = int(60 / self.key_len)
                print(e, f"will sleep {sleep_time} seconds")
                time.sleep(sleep_time)

                attempt_count += 1
                if attempt_count == max_attempts:
                    print(f"Got {attempt_count} consecutive exceptions")
                    raise

        # todo: determine whether to print according to the cli option
        if needprint:
            print(re.sub("\n{3,}", "\n\n", t_text))

        elapsed_time = time.time() - start_time
        # print(f"translation time: {elapsed_time:.1f}s")

        return t_text

    def translate_and_split_lines(self, text):
        result_str = self.translate(text, False)
        lines = result_str.split("\n")
        lines = [line.strip() for line in lines if line.strip() != ""]
        return lines

    def get_best_result_list(
        self, plist_len, new_str, sleep_dur, result_list, max_retries=15
    ):
        if len(result_list) == plist_len:
            return result_list, 0

        best_result_list = result_list
        retry_count = 0

        while retry_count < max_retries and len(result_list) != plist_len:
            print(
                f"bug: {plist_len} -> {len(result_list)} : number of paragraphs before and after translation"
            )
            print(f"sleep for {sleep_dur}s and retry {retry_count + 1} ...")
            time.sleep(sleep_dur)
            retry_count += 1
            result_list = self.translate_and_split_lines(new_str)
            if (
                len(result_list) == plist_len
                or len(best_result_list) < len(result_list) <= plist_len
                or (
                    len(result_list) < len(best_result_list)
                    and len(best_result_list) > plist_len
                )
            ):
                best_result_list = result_list

        return best_result_list, retry_count

    def log_retry(self, state, retry_count, elapsed_time, log_path="log/buglog.txt"):
        if retry_count == 0:
            return
        print(f"retry {state}")
        with open(log_path, "a") as f:
            print(
                f"retry {state}, count = {retry_count}, time = {elapsed_time:.1f}s",
                file=f,
            )

    def log_translation_mismatch(
        self, plist_len, result_list, new_str, sep, log_path="log/buglog.txt"
    ):
        if len(result_list) == plist_len:
            return
        newlist = new_str.split(sep)
        with open(log_path, "a") as f:
            print(f"problem size: {plist_len - len(result_list)}", file=f)
            for i in range(len(newlist)):
                print(newlist[i], file=f)
                print(file=f)
                if i < len(result_list):
                    print(result_list[i], file=f)
                    print(file=f)
                print("=============================", file=f)
        print(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")

        print(
            f"bug: {plist_len} paragraphs of text translated into {len(result_list)} paragraphs"
        )
        print("continue")

    def translate_list(self, plist):
        sep = "\n\n\n\n\n"
        # new_str = sep.join([item.text for item in plist])

        new_str = ""
        i = 1
        for p in plist:
            temp_p = copy(p)
            for sup in temp_p.find_all("sup"):
                sup.extract()
            new_str += f"({i}) " + temp_p.get_text().strip() + sep
            i = i + 1

        if new_str.endswith(sep):
            new_str = new_str[: -len(sep)]

        plist_len = len(plist)

        print(f"plist len = {len(plist)}")

        result_list = self.translate_and_split_lines(new_str)

        start_time = time.time()

        result_list, retry_count = self.get_best_result_list(
            plist_len, new_str, 6, result_list
        )

        end_time = time.time()

        state = "fail" if len(result_list) != plist_len else "success"
        log_path = "log/buglog.txt"

        self.log_retry(state, retry_count, end_time - start_time, log_path)
        self.log_translation_mismatch(plist_len, result_list, new_str, sep, log_path)

        # Strip leading "(num)" or "num." markers; sometimes "(num)" gets translated to "num."
        result_list = [re.sub(r"^(\(\d+\)|\d+\.)\s*", "", s) for s in result_list]
        return result_list
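
    # Illustration (assumed paragraphs): three paragraphs are joined as
    #   "(1) first...\n\n\n\n\n(2) second...\n\n\n\n\n(3) third..."
    # and sent as one request; translate_and_split_lines() then drops blank
    # lines, and the final re.sub strips any echoed "(n)" or "n." prefixes.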
@ -3,4 +3,5 @@ openai
requests
ebooklib
rich
tqdm
tiktoken