mirror of
https://github.com/yihong0618/bilingual_book_maker.git
synced 2025-06-02 09:30:24 +00:00
Add options for books on kobo e-reader (#138)
Add obok.py as file selector for kobo device --------- Co-authored-by: Hsieh Chin Fan <pham@topo.tw>
This commit is contained in:
parent
c0b3e0c2d5
commit
2384fb3fe2
14
README-CN.md
14
README-CN.md
@ -28,11 +28,12 @@ bilingual_book_maker 是一个 AI 翻译工具,使用 ChatGPT 帮助用户制
|
||||
9. epub 由 html 文件组成。默认情况下,我们只翻译 `<p>` 中的内容。
|
||||
使用 `--translate-tags` 指定需要翻译的标签。使用逗号分隔多个标签。例如:
|
||||
`--translate-tags h1,h2,h3,p,div`
|
||||
10. 如果你遇到了墙需要用 Cloudflare Workers 替换 api_base 请使用 `--api_base ${url}` 来替换。
|
||||
10. 请使用 --book_from 选项指定电子阅读器类型(现在只有 kobo 可用),并使用 --device_path 指定挂载点。
|
||||
11. 如果你遇到了墙需要用 Cloudflare Workers 替换 api_base 请使用 `--api_base ${url}` 来替换。
|
||||
**请注意,此处你输入的api应该是'`https://xxxx/v1`'的字样,域名需要用引号包裹**
|
||||
11. 翻译完会生成一本 ${book_name}_bilingual.epub 的双语书
|
||||
12. 如果出现了错误或使用 `CTRL+C` 中断命令,不想接下来继续翻译了,会生成一本 ${book_name}_bilingual_temp.epub 的书,直接改成你想要的名字就可以了
|
||||
13. 如果你想要翻译电子书中的无标签字符串,可以使用 `--allow_navigable_strings` 参数,会将可遍历字符串加入翻译队列,**注意,在条件允许情况下,请寻找更规范的电子书**
|
||||
12. 翻译完会生成一本 ${book_name}_bilingual.epub 的双语书
|
||||
13. 如果出现了错误或使用 `CTRL+C` 中断命令,不想接下来继续翻译了,会生成一本 ${book_name}_bilingual_temp.epub 的书,直接改成你想要的名字就可以了
|
||||
14. 如果你想要翻译电子书中的无标签字符串,可以使用 `--allow_navigable_strings` 参数,会将可遍历字符串加入翻译队列,**注意,在条件允许情况下,请寻找更规范的电子书**
|
||||
|
||||
e.g.
|
||||
```shell
|
||||
@ -51,8 +52,11 @@ python3 make_book.py --book_name test_books/animal_farm.epub --model gpt3 --lang
|
||||
# Translate contents in <div> and <p>
|
||||
python3 make_book.py --book_name test_books/animal_farm.epub --translate-tags div,p
|
||||
|
||||
# 翻译 kobo e-reader 中,来自 Rakuten Kobo 的书籍
|
||||
python3 make_book.py --book_from kobo --device_path /tmp/kobo
|
||||
|
||||
# 翻译 txt 文件
|
||||
python3 make_book.py --book_name test_books/the_little_prince.txt -openai_key ${openai_key} --test
|
||||
python3 make_book.py --book_name test_books/the_little_prince.txt --test
|
||||
```
|
||||
|
||||
更加小白的示例
|
||||
|
14
README.md
14
README.md
@ -32,11 +32,12 @@ The bilingual_book_maker is an AI translation tool that uses ChatGPT to assist u
|
||||
9. epub is made of html files. By default, we only translate contents in `<p>`.
|
||||
Use `--translate-tags` to specify the tags that need translation. Use commas to separate multiple tags. For example:
|
||||
`--translate-tags h1,h2,h3,p,div`
|
||||
10. If you want to change api_base like using Cloudflare Workers, use `--api_base <URL>` to support it.
|
||||
10. Use the `--book_from` option to specify the e-reader type (currently only `kobo` is available), and use `--device_path` to specify its mount point.
|
||||
11. If you want to change api_base like using Cloudflare Workers, use `--api_base <URL>` to support it.
|
||||
**Note: the api url should be '`https://xxxx/v1`'. Quotation marks are required.**
|
||||
11. Once the translation is complete, a bilingual book named `${book_name}_bilingual.epub` would be generated.
|
||||
12. If there are any errors or you wish to interrupt the translation by pressing `CTRL+C`. A book named `${book_name}_bilingual_temp.epub` would be generated. You can simply rename it to any desired name.
|
||||
13. If you want to translate strings in an e-book that aren't labeled with any tags, you can use the `--allow_navigable_strings` parameter. This will add the strings to the translation queue. **Note that it's best to look for e-books that are more standardized if possible.**
|
||||
12. Once the translation is complete, a bilingual book named `${book_name}_bilingual.epub` would be generated.
|
||||
13. If an error occurs or you interrupt the translation by pressing `CTRL+C`, a book named `${book_name}_bilingual_temp.epub` will be generated. You can simply rename it to any desired name.
|
||||
14. If you want to translate strings in an e-book that aren't labeled with any tags, you can use the `--allow_navigable_strings` parameter. This will add the strings to the translation queue. **Note that it's best to look for e-books that are more standardized if possible.**
|
||||
|
||||
### Examples
|
||||
|
||||
@ -56,8 +57,11 @@ python3 make_book.py --book_name test_books/animal_farm.epub --model gpt3 --lang
|
||||
# Translate contents in <div> and <p>
|
||||
python3 make_book.py --book_name test_books/animal_farm.epub --translate-tags div,p
|
||||
|
||||
# Translate books downloaded from Rakuten Kobo on a Kobo e-reader
|
||||
python3 make_book.py --book_from kobo --device_path /tmp/kobo
|
||||
|
||||
# translate txt file
|
||||
python3 make_book.py --book_name test_books/the_little_prince.txt -openai_key ${openai_key} --test --language zh-hans
|
||||
python3 make_book.py --book_name test_books/the_little_prince.txt --test --language zh-hans
|
||||
```
|
||||
|
||||
More understandable example
|
||||
|
@ -5,6 +5,7 @@ from os import environ as env
|
||||
from book_maker.loader import BOOK_LOADER_DICT
|
||||
from book_maker.translator import MODEL_DICT
|
||||
from book_maker.utils import LANGUAGES, TO_LANGUAGE_CODE
|
||||
import book_maker.obok as obok
|
||||
|
||||
|
||||
def main():
|
||||
@ -15,6 +16,20 @@ def main():
|
||||
type=str,
|
||||
help="path of the epub file to be translated",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--book_from",
|
||||
dest="book_from",
|
||||
type=str,
|
||||
choices=["kobo"], # support kindle later
|
||||
metavar="E-READER",
|
||||
help="e-reader type, available: {%(choices)s}",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--device_path",
|
||||
dest="device_path",
|
||||
type=str,
|
||||
help="Path of e-reader device",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--openai_key",
|
||||
dest="openai_key",
|
||||
@ -108,6 +123,14 @@ def main():
|
||||
else:
|
||||
OPENAI_API_KEY = ""
|
||||
|
||||
if options.book_from == "kobo":
|
||||
device_path = options.device_path
|
||||
if device_path is None:
|
||||
raise Exception(
|
||||
"Device path is not given, please specify the path by --device_path <DEVICE_PATH>"
|
||||
)
|
||||
options.book_name = obok.cli_main(device_path)
|
||||
|
||||
book_type = options.book_name.split(".")[-1]
|
||||
support_type_list = list(BOOK_LOADER_DICT.keys())
|
||||
if book_type not in support_type_list:
|
||||
|
866
book_maker/obok.py
Normal file
866
book_maker/obok.py
Normal file
@ -0,0 +1,866 @@
|
||||
# The original code comes from:
|
||||
# https://github.com/apprenticeharper/DeDRM_tools
|
||||
|
||||
# Version 4.1.1 March 2023
|
||||
# Make obok.py work as a file selector
|
||||
|
||||
# Version 4.1.0 February 2021
|
||||
# Add detection for Kobo directory location on Linux
|
||||
|
||||
# Version 4.0.0 September 2020
|
||||
# Python 3.0
|
||||
#
|
||||
# Version 3.2.5 December 2016
|
||||
# Improve detection of good text decryption.
|
||||
#
|
||||
# Version 3.2.4 December 2016
|
||||
# Remove incorrect support for Kobo Desktop under Wine
|
||||
#
|
||||
# Version 3.2.3 October 2016
|
||||
# Fix for windows network user and more xml fixes
|
||||
#
|
||||
# Version 3.2.2 October 2016
|
||||
# Change to the way the new database version is handled.
|
||||
#
|
||||
# Version 3.2.1 September 2016
|
||||
# Update for v4.0 of Windows Desktop app.
|
||||
#
|
||||
# Version 3.2.0 January 2016
|
||||
# Update for latest version of Windows Desktop app.
|
||||
# Support Kobo devices in the command line version.
|
||||
#
|
||||
# Version 3.1.9 November 2015
|
||||
# Handle Kobo Desktop under wine on Linux
|
||||
#
|
||||
# Version 3.1.8 November 2015
|
||||
# Handle the case of Kobo Arc or Vox device (i.e. don't crash).
|
||||
#
|
||||
# Version 3.1.7 October 2015
|
||||
# Handle the case of no device or database more gracefully.
|
||||
#
|
||||
# Version 3.1.6 September 2015
|
||||
# Enable support for Kobo devices
|
||||
# More character encoding fixes (unicode strings)
|
||||
#
|
||||
# Version 3.1.5 September 2015
|
||||
# Removed requirement that a purchase has been made.
|
||||
# Also add in character encoding fixes
|
||||
#
|
||||
# Version 3.1.4 September 2015
|
||||
# Updated for version 3.17 of the Windows Desktop app.
|
||||
#
|
||||
# Version 3.1.3 August 2015
|
||||
# Add translations for Portuguese and Arabic
|
||||
#
|
||||
# Version 3.1.2 January 2015
|
||||
# Add coding, version number and version announcement
|
||||
#
|
||||
# Version 3.05 October 2014
|
||||
# Identifies DRM-free books in the dialog
|
||||
#
|
||||
# Version 3.04 September 2014
|
||||
# Handles DRM-free books as well (sometimes Kobo Library doesn't
|
||||
# show download link for DRM-free books)
|
||||
#
|
||||
# Version 3.03 August 2014
|
||||
# If PyCrypto is unavailable try to use libcrypto for AES_ECB.
|
||||
#
|
||||
# Version 3.02 August 2014
|
||||
# Relax checking of application/xhtml+xml and image/jpeg content.
|
||||
#
|
||||
# Version 3.01 June 2014
|
||||
# Check image/jpeg as well as application/xhtml+xml content. Fix typo
|
||||
# in Windows ipconfig parsing.
|
||||
#
|
||||
# Version 3.0 June 2014
|
||||
# Made portable for Mac and Windows, and the only module dependency
|
||||
# not part of python core is PyCrypto. Major code cleanup/rewrite.
|
||||
# No longer tries the first MAC address; tries them all if it detects
|
||||
# the decryption failed.
|
||||
#
|
||||
# Updated September 2013 by Anon
|
||||
# Version 2.02
|
||||
# Incorporated minor fixes posted at Apprentice Alf's.
|
||||
#
|
||||
# Updates July 2012 by Michael Newton
|
||||
# PWSD ID is no longer a MAC address, but should always
|
||||
# be stored in the registry. Script now works with OS X
|
||||
# and checks plist for values instead of registry. Must
|
||||
# have biplist installed for OS X support.
|
||||
#
|
||||
# Original comments left below; note the "AUTOPSY" is inaccurate. See
|
||||
# KoboLibrary.userkeys and KoboFile.decrypt()
|
||||
#
|
||||
##########################################################
|
||||
# KOBO DRM CRACK BY #
|
||||
# PHYSISTICATED #
|
||||
##########################################################
|
||||
# This app was made for Python 2.7 on Windows 32-bit
|
||||
#
|
||||
# This app needs pycrypto - get from here:
|
||||
# http://www.voidspace.org.uk/python/modules.shtml
|
||||
#
|
||||
# Usage: obok.py
|
||||
# Choose the book you want to decrypt
|
||||
#
|
||||
# Shouts to my krew - you know who you are - and one in
|
||||
# particular who gave me a lot of help with this - thank
|
||||
# you so much!
|
||||
#
|
||||
# Kopimi /K\
|
||||
# Keep sharing, keep copying, but remember that nothing is
|
||||
# for free - make sure you compensate your favorite
|
||||
# authors - and cut out the middle man whenever possible
|
||||
# ;) ;) ;)
|
||||
#
|
||||
# DRM AUTOPSY
|
||||
# The Kobo DRM was incredibly easy to crack, but it took
|
||||
# me months to get around to making this. Here's the
|
||||
# basics of how it works:
|
||||
# 1: Get MAC address of first NIC in ipconfig (sometimes
|
||||
# stored in registry as pwsdid)
|
||||
# 2: Get user ID (stored in tons of places, this gets it
|
||||
# from HKEY_CURRENT_USER\Software\Kobo\Kobo Desktop
|
||||
# Edition\Browser\cookies)
|
||||
# 3: Concatenate and SHA256, take the second half - this
|
||||
# is your master key
|
||||
# 4: Open %LOCALAPPDATA%\Kobo Desktop Editions\Kobo.sqlite
|
||||
# and dump content_keys
|
||||
# 5: Unbase64 the keys, then decode these with the master
|
||||
# key - these are your page keys
|
||||
# 6: Unzip EPUB of your choice, decrypt each page with its
|
||||
# page key, then zip back up again
|
||||
#
|
||||
# WHY USE THIS WHEN INEPT WORKS FINE? (adobe DRM stripper)
|
||||
# Inept works very well, but authors on Kobo can choose
|
||||
# what DRM they want to use - and some have chosen not to
|
||||
# let people download them with Adobe Digital Editions -
|
||||
# they would rather lock you into a single platform.
|
||||
#
|
||||
# With Obok, you can sync Kobo Desktop, decrypt all your
|
||||
# ebooks, and then use them on whatever device you want
|
||||
# - you bought them, you own them, you can do what you
|
||||
# like with them.
|
||||
#
|
||||
# Obok is Kobo backwards, but it also means "next to"
|
||||
# in Polish.
|
||||
# When you buy a real book, it is right next to you. You
|
||||
# can read it at home, at work, on a train, you can lend
|
||||
# it to a friend, you can scribble on it, and add your own
|
||||
# explanations/translations.
|
||||
#
|
||||
# Obok gives you this power over your ebooks - no longer
|
||||
# are you restricted to one device. This allows you to
|
||||
# embed foreign fonts into your books, as older Kobo's
|
||||
# can't display them properly. You can read your books
|
||||
# on your phones, in different PC readers, and different
|
||||
# ereader devices. You can share them with your friends
|
||||
# too, if you like - you can do that with a real book
|
||||
# after all.
|
||||
#
|
||||
"""Manage all Kobo books, either encrypted or DRM-free."""
|
||||
from __future__ import print_function
|
||||
|
||||
__version__ = "4.0.0"
|
||||
__about__ = "Obok v{0}\nCopyright © 2012-2020 Physisticated et al.".format(__version__)
|
||||
|
||||
import sys
|
||||
import os
|
||||
import subprocess
|
||||
import sqlite3
|
||||
import base64
|
||||
import binascii
|
||||
import re
|
||||
import zipfile
|
||||
import hashlib
|
||||
import xml.etree.ElementTree as ET
|
||||
import string
|
||||
import shutil
|
||||
import tempfile
|
||||
|
||||
# Record whether ElementTree can be imported. When it cannot, extraction of
# device serial numbers (which needs an XML parser) is disabled.
try:
    from xml.etree import ElementTree as ET
except ImportError:
    can_parse_xml = False
else:
    can_parse_xml = True

# List of all known hash keys
KOBO_HASH_KEYS = ["88b3a2e13", "XzUhGYdFp", "NoCanLook", "QJhwzAtXL"]
|
||||
|
||||
|
||||
class ENCRYPTIONError(Exception):
    """Raised when a crypto backend cannot be loaded or an AES operation fails."""
|
||||
|
||||
|
||||
def _load_crypto_libcrypto():
    """Build an AES-ECB decryptor class on top of the system libcrypto.

    The shared library is located with ``ctypes.util.find_library``
    (``libeay32`` on Windows, ``crypto`` elsewhere) and driven through ctypes.

    Returns:
        A class ``AES``: ``AES(userkey)`` prepares a decryption key from a
        16-, 24- or 32-byte key, and ``AES(userkey).decrypt(data)`` returns
        the ECB-decrypted bytes.

    Raises:
        ENCRYPTIONError: if libcrypto cannot be found.
    """
    from ctypes import (
        CDLL,
        POINTER,
        Structure,
        c_char_p,
        c_int,
        c_long,
        create_string_buffer,
    )
    from ctypes.util import find_library

    if sys.platform.startswith("win"):
        libcrypto = find_library("libeay32")
    else:
        libcrypto = find_library("crypto")

    if libcrypto is None:
        raise ENCRYPTIONError("libcrypto not found")
    libcrypto = CDLL(libcrypto)

    AES_MAXNR = 14

    class AES_KEY(Structure):
        _fields_ = [("rd_key", c_long * (4 * (AES_MAXNR + 1))), ("rounds", c_int)]

    AES_KEY_p = POINTER(AES_KEY)

    def F(restype, name, argtypes):
        # Look up a libcrypto symbol and attach its C signature.
        func = getattr(libcrypto, name)
        func.restype = restype
        func.argtypes = argtypes
        return func

    AES_set_decrypt_key = F(c_int, "AES_set_decrypt_key", [c_char_p, c_int, AES_KEY_p])
    AES_ecb_encrypt = F(None, "AES_ecb_encrypt", [c_char_p, c_char_p, AES_KEY_p, c_int])

    class AES(object):
        def __init__(self, userkey):
            self._blocksize = len(userkey)
            if self._blocksize not in [16, 24, 32]:
                # Plain string: no gettext-style `_` helper is defined here,
                # so wrapping the message would raise NameError instead.
                raise ENCRYPTIONError("AES improper key used")
            key = self._key = AES_KEY()
            rv = AES_set_decrypt_key(userkey, len(userkey) * 8, key)
            if rv < 0:
                raise ENCRYPTIONError("Failed to initialize AES key")

        def decrypt(self, data):
            # AES_ecb_encrypt is declared with restype None, so there is no
            # status to inspect; the last argument 0 selects decryption.
            out = create_string_buffer(16)
            blocks = []
            for i in range(0, len(data), 16):
                AES_ecb_encrypt(data[i : i + 16], out, self._key, 0)
                blocks.append(out.raw)
            return b"".join(blocks)

    return AES
|
||||
|
||||
|
||||
def _load_crypto_pycrypto():
|
||||
from Crypto.Cipher import AES as _AES
|
||||
|
||||
class AES(object):
|
||||
def __init__(self, key):
|
||||
self._aes = _AES.new(key, _AES.MODE_ECB)
|
||||
|
||||
def decrypt(self, data):
|
||||
return self._aes.decrypt(data)
|
||||
|
||||
return AES
|
||||
|
||||
|
||||
def _load_crypto():
    """Return the first AES backend that loads, or None if none does.

    The ``Crypto``-package backend is tried first, then the libcrypto one.
    Loading is best-effort: a backend that is missing (ImportError,
    ENCRYPTIONError), whose shared library fails to load (OSError), or whose
    library lacks a required symbol (AttributeError) is skipped, so importing
    this module never fails because of crypto availability.
    """
    for loader in (_load_crypto_pycrypto, _load_crypto_libcrypto):
        try:
            return loader()
        except (ImportError, ENCRYPTIONError, OSError, AttributeError):
            continue
    return None


# AES implementation used for decryption; None when no backend is available.
AES = _load_crypto()
|
||||
|
||||
|
||||
# Wrap a stream so that output gets flushed immediately
|
||||
# and also make sure that any unicode strings get
|
||||
# encoded using "replace" before writing them.
|
||||
class SafeUnbuffered:
    """Stream proxy that writes through to ``stream.buffer`` and flushes at once.

    Text is encoded with the stream's encoding (UTF-8 when the stream reports
    none) using the "replace" error handler; any other data is written as-is.
    Every attribute not defined here is forwarded to the wrapped stream.
    """

    def __init__(self, stream):
        self.stream = stream
        declared = stream.encoding
        self.encoding = "utf-8" if declared is None else declared

    def write(self, data):
        payload = data
        if isinstance(payload, str):
            payload = payload.encode(self.encoding, "replace")
        sink = self.stream.buffer
        sink.write(payload)
        sink.flush()

    def __getattr__(self, attr):
        return getattr(self.stream, attr)
|
||||
|
||||
|
||||
class KoboLibrary(object):
    """The Kobo library.

    This class represents all the information available from the data
    written by the Kobo Desktop Edition application (or stored on a mounted
    Kobo device), including the list of books, their titles, and the user's
    encryption key(s).
    """

    def __init__(self, serials=None, device_path=None, desktopkobodir=""):
        """Locate the Kobo database and open a private copy of it.

        Args:
            serials: optional list of device serial numbers. When empty and a
                device is found, the serial read from the device is appended.
            device_path: mount point of a Kobo device, if any.
            desktopkobodir: explicit Kobo Desktop data directory; when empty
                the platform default location is used.

        After construction ``self.kobodir`` is "" when nothing usable was
        found; in that case no database is opened.
        """
        if serials is None:
            serials = []
        print(__about__)
        self.kobodir = ""
        kobodb = ""

        # Defined up front so that `books`, `userkeys` and `close()` do not
        # fail with AttributeError when no database could be located.
        self.bookdir = ""
        self.newdb = None
        self.__sqlite = None
        self.__cursor = None
        self._userkeys = []
        self._books = []
        self._volumeID = []
        self._serials = serials

        # Order of checks
        # 1. first check if a device_path has been passed in, and whether
        #    we can find the sqlite db in the respective place
        # 2. if 1., and we got some serials passed in (from saved
        #    settings in calibre), just use it
        # 3. if 1. worked, but we didn't get serials, try to parse them
        #    from the device, if this didn't work, unset everything
        # 4. if by now we don't have kobodir set, give up on device and
        #    try to use the Desktop app.

        # step 1. check whether this looks like a real device
        if device_path:
            self.kobodir = os.path.join(device_path, ".kobo")
            # devices use KoboReader.sqlite
            kobodb = os.path.join(self.kobodir, "KoboReader.sqlite")
            if not os.path.isfile(kobodb):
                # device path seems to be wrong, unset it
                device_path = ""
                self.kobodir = ""
                kobodb = ""

        # step 3. we found a device but didn't get serials: try to read the
        # serial from device_path/.adobe-digital-editions/device.xml
        if self.kobodir and len(serials) == 0 and can_parse_xml:
            devicexml = os.path.join(
                device_path, ".adobe-digital-editions", "device.xml"
            )
            if os.path.exists(devicexml):
                xmltree = ET.parse(devicexml)
                for node in xmltree.iter():
                    if "deviceSerial" in node.tag:
                        serials.append(node.text)
                        break
            else:
                # cannot get serials from device
                device_path = ""
                self.kobodir = ""
                kobodb = ""

        if self.kobodir == "":
            # step 4. we haven't found a device with serials, so try desktop apps
            if desktopkobodir != "":
                self.kobodir = desktopkobodir

            if self.kobodir == "":
                self.kobodir = KoboLibrary._find_desktop_kobodir()

            # desktop versions use Kobo.sqlite
            kobodb = os.path.join(self.kobodir, "Kobo.sqlite")
            if not os.path.isfile(kobodb):
                # give up here, we haven't found anything useful
                self.kobodir = ""
                kobodb = ""

        if self.kobodir != "":
            self.bookdir = os.path.join(self.kobodir, "kepub")
            # make a copy of the database in a temporary file
            # so we can ensure it's not using WAL logging which sqlite3 can't do.
            self.newdb = tempfile.NamedTemporaryFile(mode="wb", delete=False)
            print(self.newdb.name)
            try:
                with open(kobodb, "rb") as olddb:
                    # Copy the first 18 bytes, replace the next two with
                    # 0x01 0x01, then stream the rest without loading the
                    # whole database into memory.
                    self.newdb.write(olddb.read(18))
                    self.newdb.write(b"\x01\x01")
                    olddb.read(2)
                    shutil.copyfileobj(olddb, self.newdb)
            finally:
                self.newdb.close()
            self.__sqlite = sqlite3.connect(self.newdb.name)
            self.__cursor = self.__sqlite.cursor()

    @staticmethod
    def _find_desktop_kobodir():
        """Return the platform's default Kobo Desktop data directory ("" if unknown)."""
        kobodir = ""
        if sys.platform.startswith("win"):
            import winreg

            if sys.getwindowsversion().major > 5 and "LOCALAPPDATA" in os.environ:
                kobodir = winreg.ExpandEnvironmentStrings("%LOCALAPPDATA%")
            if kobodir == "" and "USERPROFILE" in os.environ:
                kobodir = os.path.join(
                    winreg.ExpandEnvironmentStrings("%USERPROFILE%"),
                    "Local Settings",
                    "Application Data",
                )
            kobodir = os.path.join(kobodir, "Kobo", "Kobo Desktop Edition")
        elif sys.platform.startswith("darwin"):
            kobodir = os.path.join(
                os.environ["HOME"],
                "Library",
                "Application Support",
                "Kobo",
                "Kobo Desktop Edition",
            )
        elif sys.platform.startswith("linux"):
            kobodir = KoboLibrary._find_linux_kobodir()
        return kobodir

    @staticmethod
    def _find_linux_kobodir():
        """Return the directory holding Kobo.sqlite on Linux ("" if none is found).

        The location is cached in ~/.config/calibre/kobo_location. When that
        file does not exist the filesystem is walked from "/" and the last
        directory containing Kobo.sqlite is stored there, so the walk can be
        skipped in the future.
        """
        kobodir_cache_dir = os.path.join(os.environ["HOME"], ".config", "calibre")
        # makedirs also creates ~/.config when it is missing
        os.makedirs(kobodir_cache_dir, exist_ok=True)
        kobodir_cache_file = f"{str(kobodir_cache_dir)}/kobo_location"

        if not os.path.isfile(kobodir_cache_file):
            kobo_linux_path = None
            for root, _dirs, files in os.walk("/"):
                if "Kobo.sqlite" in files:
                    kobo_linux_path = str(root)
            if kobo_linux_path is None:
                # nothing found: leave the cache untouched and report "not found"
                return ""
            with open(kobodir_cache_file, "w") as f:
                f.write(kobo_linux_path)

        with open(kobodir_cache_file, "r") as f:
            return f.read()

    def close(self):
        """Closes the database used by the library and removes its temporary copy."""
        if self.__cursor is not None:
            self.__cursor.close()
        if self.__sqlite is not None:
            self.__sqlite.close()
        # delete the temporary copy of the database
        if self.newdb is not None and os.path.exists(self.newdb.name):
            os.remove(self.newdb.name)

    @property
    def userkeys(self):
        """The list of potential userkeys being used by this library.
        Only one of these will be valid.
        """
        if len(self._userkeys) != 0:
            return self._userkeys
        for macaddr in self.__getmacaddrs():
            self._userkeys.extend(self.__getuserkeys(macaddr))
        return self._userkeys

    @property
    def books(self):
        """The list of KoboBook objects in the library, sorted by title.

        Returns an empty list when no database was found.
        """
        if len(self._books) != 0:
            return self._books
        if self.__cursor is None:
            return self._books

        # DRM-ed kepub
        for row in self.__cursor.execute(
            "SELECT DISTINCT volumeid, Title, Attribution, Series FROM content_keys, content WHERE contentid = volumeid"
        ):
            self._books.append(
                KoboBook(
                    row[0],
                    row[1],
                    self.__bookfile(row[0]),
                    "kepub",
                    self.__cursor,
                    author=row[2],
                    series=row[3],
                )
            )
            self._volumeID.append(row[0])

        # DRM-free: files in the kepub directory that have no content keys
        if os.path.isdir(self.bookdir):
            for f in os.listdir(self.bookdir):
                if f in self._volumeID:
                    continue
                # Bound parameter: file names may contain quotes.
                row = self.__cursor.execute(
                    "SELECT Title, Attribution, Series FROM content WHERE ContentID = ?",
                    (f,),
                ).fetchone()
                if row is not None:
                    self._books.append(
                        KoboBook(
                            f,
                            row[0],
                            self.__bookfile(f),
                            "drm-free",
                            self.__cursor,
                            author=row[1],
                            series=row[2],
                        )
                    )
                    self._volumeID.append(f)

        self._books.sort(key=lambda x: x.title)
        return self._books

    def __bookfile(self, volumeid):
        """The filename needed to open a given book."""
        return os.path.join(self.kobodir, "kepub", volumeid)

    def __getmacaddrs(self):
        """The list of all MAC addresses on this machine, plus the known serials."""
        macaddrs = []
        if sys.platform.startswith("win"):
            c = re.compile(
                r"\s?(" + r"[0-9a-f]{2}[:\-]" * 5 + r"[0-9a-f]{2})(\s|$)",
                re.IGNORECASE,
            )
            with subprocess.Popen(
                "wmic nic where PhysicalAdapter=True get MACAddress",
                shell=True,
                stdout=subprocess.PIPE,
                text=True,
            ) as proc:
                for line in proc.stdout:
                    if m := c.search(line):
                        macaddrs.append(m[1].replace("-", ":").upper())
        elif sys.platform.startswith("darwin"):
            c = re.compile(
                r"\s(" + r"[0-9a-f]{2}:" * 5 + r"[0-9a-f]{2})(\s|$)", re.IGNORECASE
            )
            output = subprocess.check_output(
                "/sbin/ifconfig -a", shell=True, encoding="utf-8"
            )
            # the pattern has two groups, so findall yields tuples
            macaddrs.extend(m[0].upper() for m in c.findall(output))
        else:
            # probably linux

            # let's try ip
            c = re.compile(
                r"\s(" + r"[0-9a-f]{2}:" * 5 + r"[0-9a-f]{2})(\s|$)", re.IGNORECASE
            )
            with os.popen("ip -br link") as pipe:
                for line in pipe:
                    if m := c.search(line):
                        macaddrs.append(m[1].upper())

            # let's try ipconfig under wine
            c = re.compile(
                r"\s(" + r"[0-9a-f]{2}-" * 5 + r"[0-9a-f]{2})(\s|$)", re.IGNORECASE
            )
            with os.popen("ipconfig /all") as pipe:
                for line in pipe:
                    if m := c.search(line):
                        macaddrs.append(m[1].replace("-", ":").upper())

        # extend the list of macaddrs in any case with the serials
        # cannot hurt ;-)
        macaddrs.extend(self._serials)

        return macaddrs

    def __getuserids(self):
        """The UserID values stored in the database's user table."""
        if self.__cursor is None:
            return []
        return [row[0] for row in self.__cursor.execute("SELECT UserID FROM user")]

    def __getuserkeys(self, macaddr):
        """Candidate user keys derived from one MAC address (or serial).

        For every known hash key and every user id, the key is the second
        half of sha256(sha256(hash_key + macaddr) + userid).
        """
        userids = self.__getuserids()
        userkeys = []
        for hash_key in KOBO_HASH_KEYS:
            deviceid = hashlib.sha256((hash_key + macaddr).encode("ascii")).hexdigest()
            for userid in userids:
                userkey = hashlib.sha256(
                    (deviceid + userid).encode("ascii")
                ).hexdigest()
                userkeys.append(binascii.a2b_hex(userkey[32:]))
        return userkeys
|
||||
|
||||
|
||||
class KoboBook(object):
    """A Kobo book.

    A Kobo book contains a number of unencrypted and encrypted files.
    This class provides a list of the encrypted files.

    Each book has the following instance variables:
    volumeid - a UUID which uniquely refers to the book in this library.
    title - the human-readable book title.
    filename - the complete path and filename of the book.
    type - either kepub or drm-free
    """

    def __init__(
        self, volumeid, title, filename, type, cursor, author=None, series=None
    ):
        self.volumeid = volumeid
        self.title = title
        self.author = author
        self.series = series
        self.series_index = None
        self.filename = filename
        self.type = type
        # database cursor used to look up this book's content keys
        self.__cursor = cursor
        # cache filled on first access to `encryptedfiles`
        self._encryptedfiles = {}

    @property
    def encryptedfiles(self):
        """A dictionary of KoboFiles inside the book.

        The dictionary keys are the relative pathnames, which are
        the same as the pathnames inside the book 'zip' file.
        Always empty for drm-free books.
        """
        if self.type == "drm-free":
            return self._encryptedfiles
        if len(self._encryptedfiles) != 0:
            return self._encryptedfiles

        # Read the list of encrypted files from the DB
        for row in self.__cursor.execute(
            "SELECT elementid,elementkey FROM content_keys,content WHERE volumeid = ? AND volumeid = contentid",
            (self.volumeid,),
        ):
            self._encryptedfiles[row[0]] = KoboFile(
                row[0], None, base64.b64decode(row[1])
            )

        # Read the list of files from the kepub OPF manifest so that
        # we can get their proper MIME type.
        # NOTE: this requires that the OPF file is unencrypted!
        xmlns = {
            "ocf": "urn:oasis:names:tc:opendocument:xmlns:container",
            "opf": "http://www.idpf.org/2007/opf",
        }
        # `with` closes the archive even when the container or OPF parse fails
        with zipfile.ZipFile(self.filename, "r") as zin:
            ocf = ET.fromstring(zin.read("META-INF/container.xml"))
            opffile = ocf.find(".//ocf:rootfile", xmlns).attrib["full-path"]
            opf = ET.fromstring(zin.read(opffile))
        # directory part of the OPF path, including the trailing slash
        basedir = re.sub("[^/]+$", "", opffile)

        for item in opf.findall(".//opf:item", xmlns):
            # Convert relative URIs
            href = item.attrib["href"]
            if not href.startswith("/"):
                href = "".join((basedir, href))

            # Update books we've found from the DB.
            if href in self._encryptedfiles:
                self._encryptedfiles[href].mimetype = item.attrib["media-type"]
        return self._encryptedfiles

    @property
    def has_drm(self):
        """True unless the book's type is drm-free."""
        return self.type != "drm-free"
|
||||
|
||||
|
||||
class KoboFile(object):
|
||||
"""An encrypted file in a KoboBook.
|
||||
|
||||
Each file has the following instance variables:
|
||||
filename - the relative pathname inside the book zip file.
|
||||
mimetype - the file's MIME type, e.g. 'image/jpeg'
|
||||
key - the encrypted page key."""
|
||||
|
||||
def __init__(self, filename, mimetype, key):
|
||||
self.filename = filename
|
||||
self.mimetype = mimetype
|
||||
self.key = key
|
||||
|
||||
def decrypt(self, userkey, contents):
    """
    Decrypt `contents` in two stages: the user key unlocks this file's
    page key (self.key), and the unlocked page key decrypts the contents,
    after which the AES padding is removed. Whether the result is correct
    is for the caller to determine."""
    page_key = AES(userkey).decrypt(self.key)
    padded = AES(page_key).decrypt(contents)
    return self.__removeaespadding(padded)
|
||||
|
||||
def check(self, contents):
|
||||
"""
|
||||
If the contents uses some known MIME types, check if it
|
||||
conforms to the type. Throw a ValueError exception if not.
|
||||
If the contents uses an uncheckable MIME type, don't check
|
||||
it and don't throw an exception.
|
||||
Returns True if the content was checked, False if it was not
|
||||
checked."""
|
||||
if self.mimetype == "application/xhtml+xml":
|
||||
# assume utf-8 with no BOM
|
||||
textoffset = 0
|
||||
stride = 1
|
||||
print("Checking text:{0}:".format(contents[:10]))
|
||||
# check for byte order mark
|
||||
if contents[:3] == b"\xef\xbb\xbf":
|
||||
# seems to be utf-8 with BOM
|
||||
print("Could be utf-8 with BOM")
|
||||
textoffset = 3
|
||||
elif contents[:2] == b"\xfe\xff":
|
||||
# seems to be utf-16BE
|
||||
print("Could be utf-16BE")
|
||||
textoffset = 3
|
||||
stride = 2
|
||||
elif contents[:2] == b"\xff\xfe":
|
||||
# seems to be utf-16LE
|
||||
print("Could be utf-16LE")
|
||||
textoffset = 2
|
||||
stride = 2
|
||||
else:
|
||||
print("Perhaps utf-8 without BOM")
|
||||
|
||||
# now check that the first few characters are in the ASCII range
|
||||
for i in range(textoffset, textoffset + 5 * stride, stride):
|
||||
if contents[i] < 32 or contents[i] > 127:
|
||||
# Non-ascii, so decryption probably failed
|
||||
print("Bad character at {0}, value {1}".format(i, contents[i]))
|
||||
raise ValueError
|
||||
print("Seems to be good text")
|
||||
return True
|
||||
if contents[:5] == b"<?xml" or contents[:8] == b"\xef\xbb\xbf<?xml":
|
||||
# utf-8
|
||||
return True
|
||||
elif contents[:14] == b"\xfe\xff\x00<\x00?\x00x\x00m\x00l":
|
||||
# utf-16BE
|
||||
return True
|
||||
elif contents[:14] == b"\xff\xfe<\x00?\x00x\x00m\x00l\x00":
|
||||
# utf-16LE
|
||||
return True
|
||||
elif (
|
||||
contents[:9] == b"<!DOCTYPE"
|
||||
or contents[:12] == b"\xef\xbb\xbf<!DOCTYPE"
|
||||
):
|
||||
# utf-8 of weird <!DOCTYPE start
|
||||
return True
|
||||
elif (
|
||||
contents[:22]
|
||||
== b"\xfe\xff\x00<\x00!\x00D\x00O\x00C\x00T\x00Y\x00P\x00E"
|
||||
):
|
||||
# utf-16BE of weird <!DOCTYPE start
|
||||
return True
|
||||
elif (
|
||||
contents[:22]
|
||||
== b"\xff\xfe<\x00!\x00D\x00O\x00C\x00T\x00Y\x00P\x00E\x00"
|
||||
):
|
||||
# utf-16LE of weird <!DOCTYPE start
|
||||
return True
|
||||
else:
|
||||
print("Bad XML: {0}".format(contents[:8]))
|
||||
raise ValueError
|
||||
if self.mimetype == "image/jpeg":
|
||||
if contents[:3] == b"\xff\xd8\xff":
|
||||
return True
|
||||
print("Bad JPEG: {0}".format(contents[:3].hex()))
|
||||
raise ValueError()
|
||||
return False
|
||||
|
||||
def __removeaespadding(self, contents):
|
||||
"""
|
||||
Remove the trailing padding, using what appears to be the CMS
|
||||
algorithm from RFC 5652 6.3"""
|
||||
lastchar = binascii.b2a_hex(contents[-1:])
|
||||
strlen = int(lastchar, 16)
|
||||
padding = strlen
|
||||
if strlen == 1:
|
||||
return contents[:-1]
|
||||
if strlen < 16:
|
||||
for _ in range(strlen):
|
||||
testchar = binascii.b2a_hex(contents[-strlen : -(strlen - 1)])
|
||||
if testchar != lastchar:
|
||||
padding = 0
|
||||
if padding > 0:
|
||||
contents = contents[:-padding]
|
||||
return contents
|
||||
|
||||
|
||||
def decrypt_book(book, lib):
    """Write a DRM-free copy of ``book`` into the current working directory.

    The output file is named after the book title, with every character
    that is neither whitespace nor a word character replaced by "_", plus
    the ".epub" extension.

    A book whose ``type`` is "drm-free" is simply copied. Otherwise each
    key in ``lib.userkeys`` is tried in turn: every entry of the book's zip
    file is copied to the output, and entries listed in
    ``book.encryptedfiles`` are decrypted and checked first. A ValueError
    raised while processing an entry is treated as "wrong key": the partial
    output is deleted and the next key is tried.

    Returns the absolute path of the output file. NOTE: the path is
    returned even when no key worked, in which case the file does not exist.
    """
    print("Converting {0}".format(book.title))
    # make filename out of Unicode alphanumeric and whitespace equivalents from title
    outname = "{0}.epub".format(re.sub(r"[^\s\w]", "_", book.title, flags=re.UNICODE))
    outpath = os.path.join(os.getcwd(), outname)

    if book.type == "drm-free":
        print("DRM-free book, conversion is not needed")
        shutil.copyfile(book.filename, outpath)
        print("Book saved as {0}".format(outpath))
        return outpath

    # Context managers close both archives on every exit path.
    with zipfile.ZipFile(book.filename, "r") as zin:
        for userkey in lib.userkeys:
            print("Trying key: {0}".format(userkey.hex()))
            try:
                with zipfile.ZipFile(outpath, "w", zipfile.ZIP_DEFLATED) as zout:
                    for filename in zin.namelist():
                        contents = zin.read(filename)
                        if filename in book.encryptedfiles:
                            file = book.encryptedfiles[filename]
                            contents = file.decrypt(userkey, contents)
                            # Parse failures mean the key is probably wrong.
                            file.check(contents)
                        zout.writestr(filename, contents)
            except ValueError:
                print("Decryption failed.")
                # The archive is already closed here; drop the partial output.
                os.remove(outpath)
                continue
            print("Decryption succeeded.")
            print("Book saved as {0}".format(outpath))
            break

    return outpath
|
||||
|
||||
|
||||
def cli_main(devicedir):
    """Let the user pick one book from a Kobo library and convert it.

    Builds a KoboLibrary from ``devicedir`` with an empty serial list,
    prints a 1-based numbered list of its books, reads the user's choice
    from standard input and passes the chosen book to decrypt_book().

    Returns whatever decrypt_book() returns for the chosen book. Exits the
    process via sys.exit() when the input is not a number between 1 and the
    number of books. The library is closed on every exit path.
    """
    serials = []

    lib = KoboLibrary(serials, devicedir)
    try:
        books = lib.books
        for number, book in enumerate(books, start=1):
            print("{0}: {1}".format(number, book.title))

        choice = input("Convert book number... ")
        try:
            num = int(choice)
        except ValueError:
            num = 0
        # Reject 0 and negative numbers explicitly: plain indexing would
        # silently pick a book counted from the end of the list.
        if not 1 <= num <= len(books):
            print("Invalid choice. Exiting...")
            sys.exit()

        return decrypt_book(books[num - 1], lib)
    finally:
        lib.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
    sys.stdout = SafeUnbuffered(sys.stdout)
    sys.stderr = SafeUnbuffered(sys.stderr)
    # cli_main() needs the device mount point; take it from the command line.
    if len(sys.argv) != 2:
        sys.exit("usage: {0} <kobo device path>".format(sys.argv[0]))
    # cli_main() returns the output path, which must not be handed to
    # sys.exit(): a string argument is printed to stderr and sets status 1.
    cli_main(sys.argv[1])
    sys.exit(0)
|
Loading…
x
Reference in New Issue
Block a user