This commit is contained in:
^_^ 2026-01-16 11:09:46 +01:00 committed by GitHub
commit 550a2b325d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 1329 additions and 241 deletions

136
README.md
View File

@ -50,22 +50,150 @@ grawlix --username "user@example.com" --password "SuperSecretPassword" <url>
**Config file example**
```toml
[sources.name]
# Global settings
write_metadata_to_epub = true
output = "~/ebooks/{series}/{index} - {title}.{ext}"
[sources.storytel]
username = "user@example.com"
password = "SuperSecretPassword"
```
Config file should be placed in `~/.config/grawlix/grawlix.toml`
Config file should be placed in:
- Linux: `~/.config/grawlix/grawlix.toml`
- macOS: `~/Library/Application Support/grawlix/grawlix.toml`
- Windows: `%LOCALAPPDATA%\jo1gi\grawlix\grawlix.toml`
### Cookies
Some sources can be authenticated with Netscape cookie files. I use
[this extension](https://github,com/rotemdan/ExportCookies) to export my
[this extension](https://github.com/rotemdan/ExportCookies) to export my
cookies from my browser.
Cookies can be placed in current dir as `cookies.txt` or be given with the
`--cookie` argument.
`--cookies` argument.
## Configuration
### Global Settings
The following settings can be added to your config file (before any `[sources.*]` sections):
| Setting | Type | Description | Example |
|---------|------|-------------|---------|
| `write_metadata_to_epub` | boolean | Automatically write metadata to EPUB files (supports Storytel and Nextory) | `true` or `false` |
| `output` | string | Default output path template (supports `~`, environment variables, and template variables) | `"~/ebooks/{title}.{ext}"` |
### Output Templates
The `output` setting supports template variables that are replaced with book metadata:
| Variable | Description | Example |
|----------|-------------|---------|
| `{title}` | Book title | "The Witcher" |
| `{series}` | Series name | "The Witcher Saga" |
| `{index}` | Series index/number | "1" |
| `{authors}` | Authors (semicolon-separated) | "Andrzej Sapkowski" |
| `{publisher}` | Publisher name | "Orbit" |
| `{language}` | Language code | "en" |
| `{release_date}` | Release date | "2020-01-15" |
| `{source}` | Source/service name | "Storytel", "Marvel", etc. |
| `{ext}` | File extension (auto-detected from source) | "epub" |
**Example templates:**
```toml
# Simple (auto-detect format)
output = "~/books/{title}.{ext}"
# Force EPUB format
output = "~/books/{title}.epub"
# Organized by source
output = "~/books/{source}/{title}.{ext}"
# Organized by series (auto-detect format)
output = "~/books/{series}/{index} - {title}.{ext}"
# Force EPUB with series organization
output = "~/books/{series}/{index} - {title}.epub"
# Organized by source and series
output = "~/books/{source}/{series}/{index} - {title}.epub"
```
**Note:** The file extension in your template determines the output format:
- Use `.epub` to force EPUB output
- Use `.cbz` to force CBZ (comic book) output
- Use `{ext}` to auto-detect the best format for each source
**Path expansion:**
- `~` expands to home directory
- Environment variables work: `$HOME` (Unix) or `%USERPROFILE%` (Windows)
- Absolute paths: `/path/to/books` or `C:\Books`
- Relative paths: `downloads/{title}.{ext}` (relative to current directory)
## Download books
To download a book run:
```shell
grawlix [options] <book url>
```
### Command Line Options
| Option | Short | Description |
|--------|-------|-------------|
| `--version` | `-v` | Show version number |
| `--file <path>` | `-f` | File with URLs (one per line) |
| `--username <email>` | `-u` | Username for authentication |
| `--password <password>` | `-p` | Password for authentication |
| `--library <name>` | | Library name (for sources that require it) |
| `--cookies <path>` | `-c` | Path to Netscape cookie file |
| `--output <template>` | `-o` | Output path template (overrides config) |
| `--write-metadata-to-epub` | | Write metadata to EPUB files (overrides config) |
| `--debug` | | Enable debug messages |
**Examples:**
```shell
# Download to specific location
grawlix -o "~/downloads/{title}.{ext}" <url>
# Download with metadata writing
grawlix --write-metadata-to-epub <url>
# Batch download from file
grawlix -f urls.txt
# With authentication
grawlix -u user@example.com -p password <url>
# Debug mode
grawlix --debug <url>
```
## Metadata Writing
For supported sources (Storytel and Nextory), grawlix can write rich metadata to EPUB files including:
- Title and original title
- Authors and translators
- Series information (Calibre-compatible)
- Publisher, ISBN, language
- Description and categories/tags
- Release date
### Supported Sources
| Source | Title | Authors | Translators | Series | Publisher | ISBN | Language | Description | Release Date |
|--------|-------|---------|-------------|--------|-----------|------|----------|-------------|--------------|
| Storytel | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
| Nextory | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
Enable globally in config:
```toml
write_metadata_to_epub = true
```
Or use the CLI flag for one-time use:
```shell
grawlix --write-metadata-to-epub <url>
```

View File

@ -1,4 +1,4 @@
from .encryption import Encryption, AESEncryption
from .book import Book, SingleFile, OnlineFile, Metadata
__version__ = "0.2.1"
__version__ = "0.2.2"

View File

@ -12,6 +12,10 @@ from functools import partial
import os
import asyncio
import traceback
import warnings
# Suppress deprecation warnings from dependencies
warnings.filterwarnings("ignore", category=UserWarning, module="google.protobuf")
def get_or_ask(attr: str, hidden: bool, source_config: Optional[SourceConfig], options) -> str:
@ -107,10 +111,13 @@ async def main() -> None:
result = await source.download(url)
if isinstance(result, Book):
with logging.progress(result.metadata.title, source.name) as progress:
template: str = args.output or "{title}.{ext}"
await download_with_progress(result, progress, template)
# Check CLI flag first, then config file, then default
template: str = args.output or config.output or "{title}.{ext}"
# Check both CLI flag and config file
write_metadata = args.write_metadata_to_epub or config.write_metadata_to_epub
await download_with_progress(result, progress, template, write_metadata)
elif isinstance(result, Series):
await download_series(source, result, args)
await download_series(source, result, args, config)
logging.info("")
except GrawlixError as error:
error.print_error()
@ -119,34 +126,77 @@ async def main() -> None:
exit(1)
async def download_series(source: Source, series: Series, args) -> None:
async def download_series(source: Source, series: Series, args, config: Config) -> None:
"""
Download books in series
:param series: Series to download
:param args: CLI arguments
:param config: Configuration
"""
template = args.output or "{series}/{title}.{ext}"
# Check CLI flag first, then config file, then default
template = args.output or config.output or "{series}/{title}.{ext}"
# Check both CLI flag and config file
write_metadata = args.write_metadata_to_epub or config.write_metadata_to_epub
with logging.progress(series.title, source.name, len(series.book_ids)) as progress:
for book_id in series.book_ids:
try:
book: Book = await source.download_book_from_id(book_id)
await download_with_progress(book, progress, template)
await download_with_progress(book, progress, template, write_metadata)
except AccessDenied as error:
logging.info("Skipping - Access Denied")
async def download_with_progress(book: Book, progress: Progress, template: str):
async def download_with_progress(book: Book, progress: Progress, template: str, write_metadata: bool = False):
    """
    Download book with progress bar in cli

    :param book: Book to download
    :param progress: Progress object
    :param template: Output template
    :param write_metadata: Whether to write metadata to EPUB files
    """
    task = logging.add_book(progress, book)
    # Bind the task id so the downloader can report progress with a single float
    update_function = partial(progress.advance, task)
    # Download the book
    await download_book(book, update_function, template)
    # Convert PDF-in-epub to PDF if needed (Nextory wraps PDFs in epub containers)
    if book.metadata.source == "Nextory":
        from .output import format_output_location, get_default_format
        from .output.pdf_converter import convert_pdf_epub_to_pdf, is_pdf_in_epub
        output_format = get_default_format(book)
        location = format_output_location(book, output_format, template)
        # Only convert when the written file exists and really is a PDF wrapper
        if location.endswith('.epub') and os.path.exists(location) and is_pdf_in_epub(location):
            convert_pdf_epub_to_pdf(location)
            logging.debug(f"Converted PDF-in-epub to PDF: {location}")
    # Write metadata if requested
    if write_metadata:
        from .output import format_output_location, get_default_format, find_output_format, get_valid_extensions
        from .output.metadata import epub_metadata
        # Determine output file location by re-deriving it from the template,
        # mirroring what the downloader did (NOTE(review): assumes the template
        # resolves to the same path as during download — confirm)
        _, ext = os.path.splitext(template)
        ext = ext[1:]
        # Handle {ext} placeholder - use default format for the book type
        if ext and ext not in ['{ext}', 'ext'] and ext in get_valid_extensions():
            output_format = find_output_format(book, ext)()
        else:
            output_format = get_default_format(book)
        location = format_output_location(book, output_format, template)
        logging.debug(f"Output location: {location}, exists={os.path.exists(location)}, ends_with_epub={location.endswith('.epub')}")
        # Write metadata if it's an EPUB file
        if location.endswith('.epub') and os.path.exists(location):
            epub_metadata.write_metadata_to_epub(book.metadata, location)
    progress.advance(task, 1)

View File

@ -59,4 +59,11 @@ def parse_arguments() -> argparse.Namespace:
dest = "debug",
action="store_true",
)
# Metadata
parser.add_argument(
'--write-metadata-to-epub',
help = "Write metadata to EPUB files when downloading",
dest = "write_metadata_to_epub",
action="store_true",
)
return parser.parse_args()

View File

@ -0,0 +1,5 @@
[red]ERROR: Unsupported output format[/red]
The requested output format is not compatible with the downloaded content type.
Please check your output template or try using {{ext}} to auto-detect the correct format.
If this error persists, please create an issue at {issue}

View File

@ -12,21 +12,35 @@ class Metadata:
authors: list[str] = field(default_factory=list)
language: Optional[str] = None
publisher: Optional[str] = None
identifier: Optional[str] = None
isbn: Optional[str] = None
description: Optional[str] = None
release_date: Optional[date] = None
source: Optional[str] = None
original_title: Optional[str] = None
translators: list[str] = field(default_factory=list)
category: Optional[str] = None
tags: list[str] = field(default_factory=list)
# EPUB 3 rendition properties (fixed-layout support)
rendition_layout: Optional[str] = None # "pre-paginated" or "reflowable"
rendition_spread: Optional[str] = None # "none", "auto", "landscape", "portrait", "both"
rendition_orientation: Optional[str] = None # "auto", "landscape", "portrait"
def as_dict(self) -> dict:
return {
"title": self.title,
"series": self.series or "UNKNOWN",
"index": self.index or "UNKNOWN",
"index": str(self.index) if self.index is not None else "UNKNOWN",
"publisher": self.publisher or "UNKNOWN",
"identifier": self.identifier or "UNKNOWN",
"isbn": self.isbn or "UNKNOWN",
"language": self.language or "UNKNOWN",
"authors": "; ".join(self.authors),
"description": self.description or "UNKNOWN",
"release_date": self.release_date.isoformat() if self.release_date else "UNKNOWN",
"source": self.source or "UNKNOWN",
"original_title": self.original_title or "UNKNOWN",
"translators": "; ".join(self.translators),
"category": self.category or "UNKNOWN",
"tags": "; ".join(self.tags),
}

View File

@ -1,7 +1,7 @@
from dataclasses import dataclass
from typing import Optional
import tomli
import appdirs
from platformdirs import user_config_dir
import os
@ -16,6 +16,8 @@ class SourceConfig:
class Config:
"""Grawlix configuration"""
sources: dict[str, SourceConfig]
write_metadata_to_epub: bool = False
output: Optional[str] = None
def load_config() -> Config:
@ -24,11 +26,20 @@ def load_config() -> Config:
:returns: Config object
"""
config_dir = appdirs.user_config_dir("grawlix", "jo1gi")
config_dir = user_config_dir("grawlix", "jo1gi")
config_file = os.path.join(config_dir, "grawlix.toml")
if os.path.exists(config_file):
with open(config_file, "rb") as f:
config_dict = tomli.load(f)
try:
with open(config_file, "rb") as f:
config_dict = tomli.load(f)
except tomli.TOMLDecodeError as e:
print(f"Error parsing config file: {config_file}")
print(f" {e}")
print("\nPlease check your TOML syntax. Common issues:")
print(" - Strings must be quoted: output = \"{title}.{ext}\" not output = {title}.{ext}")
print(" - Booleans are lowercase: write_metadata_to_epub = true (not True)")
print(" - Use double quotes for strings containing special characters")
raise
else:
config_dict = {}
sources = {}
@ -38,4 +49,9 @@ def load_config() -> Config:
username = values.get("username"),
password = values.get("password"),
)
return Config(sources)
# Load general settings
write_metadata_to_epub = config_dict.get("write_metadata_to_epub", False)
output = config_dict.get("output")
return Config(sources, write_metadata_to_epub, output)

View File

@ -6,6 +6,7 @@ from .output_format import OutputFormat
from .acsm import Acsm
from .cbz import Cbz
from .epub import Epub
from .pdf import Pdf
from typing import Callable, Iterable
from pathlib import Path
@ -41,11 +42,21 @@ def format_output_location(book: Book, output_format: OutputFormat, template: st
:param book: Book to download
:param output_format: Output format of book
:param template: Template for output path
:param template: Template for output path (supports ~, environment variables, and absolute paths)
:returns: Output path
"""
values = { key: remove_unwanted_chars(value) for key, value in book.metadata.as_dict().items() }
path = template.format(**values, ext = output_format.extension)
# Expand user home directory (~/... or ~user/...)
path = os.path.expanduser(path)
# Expand environment variables ($VAR or %VAR% depending on OS)
path = os.path.expandvars(path)
# Normalize path separators for current OS
path = os.path.normpath(path)
return path
@ -64,15 +75,65 @@ def remove_strings(input: str, strings: Iterable[str]) -> str:
def remove_unwanted_chars(input: str) -> str:
"""
Remove chars from string that are not supported in output path
Sanitize string for use in file paths across all operating systems.
Replaces forbidden characters with safe alternatives and handles edge cases.
:param input: The string to remove chars from
:returns: input without unsupported chars
:param input: The string to sanitize
:returns: Safe filename string
"""
import re
# Replace null bytes and control characters
output = re.sub(r'[\x00-\x1f\x7f]', '', input)
# Platform-specific forbidden characters - replace with underscore
if platform.system() == "Windows":
return remove_strings(input, "<>:\"/\\|?*")
# Windows forbidden: < > : " / \ | ? *
forbidden_chars = '<>:"|?*'
for char in forbidden_chars:
output = output.replace(char, '_')
# Replace slashes with dash for better readability
output = output.replace('/', '-')
output = output.replace('\\', '-')
# Windows reserved names (case-insensitive)
reserved_names = {
'CON', 'PRN', 'AUX', 'NUL',
'COM1', 'COM2', 'COM3', 'COM4', 'COM5', 'COM6', 'COM7', 'COM8', 'COM9',
'LPT1', 'LPT2', 'LPT3', 'LPT4', 'LPT5', 'LPT6', 'LPT7', 'LPT8', 'LPT9'
}
# Check if the name (without extension) is reserved
name_part = output.split('.')[0].upper()
if name_part in reserved_names:
output = f"_{output}"
# Remove trailing spaces and periods (Windows doesn't allow these)
output = output.rstrip('. ')
else:
return remove_strings(input, "/")
# Unix-like systems (macOS, Linux)
# Only / is truly forbidden, but : can cause issues on macOS
output = output.replace('/', '-')
# Some versions of macOS have issues with :
output = output.replace(':', '-')
# Remove leading/trailing whitespace
output = output.strip()
# Limit filename length (most filesystems have 255 byte limit)
# Reserve some space for extensions and numbering
max_length = 200
if len(output.encode('utf-8')) > max_length:
# Truncate while respecting UTF-8 character boundaries
output_bytes = output.encode('utf-8')[:max_length]
# Decode, ignoring partial characters at the end
output = output_bytes.decode('utf-8', errors='ignore').rstrip()
# Ensure we don't return an empty string
if not output:
output = "untitled"
return output
def get_default_format(book: Book) -> OutputFormat:
@ -124,4 +185,5 @@ def get_output_formats() -> list[type[OutputFormat]]:
Acsm,
Cbz,
Epub,
Pdf,
]

View File

@ -3,12 +3,162 @@ from grawlix.exceptions import UnsupportedOutputFormat
from .output_format import OutputFormat, Update
import asyncio
from bs4 import BeautifulSoup
import os
from ebooklib import epub
import re
import xml.etree.ElementTree as ET
from zipfile import ZipFile
from bs4 import BeautifulSoup
from ebooklib import epub
import rich
def _fix_fixed_layout_page(html_content: bytes, css_content: bytes = None) -> bytes:
"""
Fix fixed-layout XHTML pages by adding viewport and fixing broken styles.
Extracts dimensions from CSS and applies them to viewport and inline styles.
"""
try:
html_str = html_content.decode('utf-8')
except UnicodeDecodeError:
return html_content
# Extract dimensions from CSS if provided
width = None
height = None
if css_content:
try:
css_str = css_content.decode('utf-8')
# Look for body width/height
width_match = re.search(r'body\s*\{[^}]*width:\s*(\d+)px', css_str)
height_match = re.search(r'body\s*\{[^}]*height:\s*(\d+)px', css_str)
if width_match:
width = width_match.group(1)
if height_match:
height = height_match.group(1)
except UnicodeDecodeError:
pass
if not width or not height:
return html_content
# Add viewport meta tag if missing
if 'name="viewport"' not in html_str and '<head>' in html_str:
viewport_tag = f'<meta name="viewport" content="width={width}, height={height}"/>'
html_str = html_str.replace('<head>', f'<head>\n {viewport_tag}', 1)
# Fix broken inline styles (width:px; height:px;)
html_str = re.sub(
r'style="width:px;\s*height:px;"',
f'style="width:{width}px; height:{height}px;"',
html_str
)
return html_str.encode('utf-8')
def _get_css_rule_key(rule_text: str) -> str | None:
"""Get unique key for a CSS rule. For @font-face, include font-family."""
selector = rule_text.split('{')[0].strip()
if selector == '@font-face':
# Extract font-family to distinguish different font-faces
match = re.search(r'font-family:\s*["\']?([^"\';}]+)', rule_text)
if match:
return f'@font-face:{match.group(1).strip()}'
return None # Skip font-face without font-family
return selector if selector else None
def _extract_opf_metadata(opf_content: bytes) -> dict:
    """
    Extract rendition properties, cover info, and spine properties from OPF content.

    Returns dict with keys: rendition_layout, rendition_spread,
    rendition_orientation, cover_id, cover_href, spine_properties.
    Malformed XML yields the dict with all values left at their defaults.
    """
    result = {
        'rendition_layout': None,
        'rendition_spread': None,
        'rendition_orientation': None,
        'cover_id': None,
        'cover_href': None,
        'spine_properties': {},  # Maps href -> properties (e.g., 'page-spread-left')
    }
    try:
        root = ET.fromstring(opf_content)
        ns = {
            'opf': 'http://www.idpf.org/2007/opf',
            'dc': 'http://purl.org/dc/elements/1.1/',
        }
        # Find metadata element (fall back to an explicit Clark-notation lookup
        # in case the prefixed search fails)
        metadata = root.find('opf:metadata', ns)
        if metadata is None:
            metadata = root.find('{http://www.idpf.org/2007/opf}metadata')
        if metadata is None:
            return result
        # Extract rendition properties from <meta property="rendition:X">.
        # Tag comparison handles both namespaced ('{...}meta') and bare tags.
        for meta in metadata.iter():
            if meta.tag.endswith('}meta') or meta.tag == 'meta':
                prop = meta.get('property', '')
                if prop == 'rendition:layout':
                    result['rendition_layout'] = meta.text
                elif prop == 'rendition:spread':
                    result['rendition_spread'] = meta.text
                elif prop == 'rendition:orientation':
                    result['rendition_orientation'] = meta.text
                # Cover reference: <meta name="cover" content="image-id"/> (EPUB 2 style)
                name = meta.get('name', '')
                if name == 'cover':
                    result['cover_id'] = meta.get('content')
        # Parse manifest once for cover info and id->href mapping
        manifest = root.find('opf:manifest', ns)
        if manifest is None:
            manifest = root.find('{http://www.idpf.org/2007/opf}manifest')
        id_to_href = {}
        if manifest is not None:
            for item in manifest.iter():
                item_id = item.get('id')
                item_href = item.get('href')
                if item_id and item_href:
                    id_to_href[item_id] = item_href
                    # Check for cover by ID match (first match wins)
                    if result['cover_id'] and item_id == result['cover_id'] and not result['cover_href']:
                        result['cover_href'] = item_href
                # Check for cover-image property (EPUB 3 style); first match wins
                props = item.get('properties', '')
                if 'cover-image' in props and not result['cover_href']:
                    result['cover_href'] = item_href
                    result['cover_id'] = item_id
        # Extract spine properties (page-spread-left, page-spread-right)
        spine = root.find('opf:spine', ns)
        if spine is None:
            spine = root.find('{http://www.idpf.org/2007/opf}spine')
        if spine is not None:
            # Extract spine itemref properties, resolving idref -> href via the
            # manifest mapping built above
            for itemref in spine.iter():
                if itemref.tag.endswith('}itemref') or itemref.tag == 'itemref':
                    idref = itemref.get('idref')
                    props = itemref.get('properties')
                    if idref and props and idref in id_to_href:
                        href = id_to_href[idref]
                        result['spine_properties'][href] = props
    except ET.ParseError:
        # Best effort: unparseable OPF simply produces default values
        pass
    return result
class Epub(OutputFormat):
extension = "epub"
input_types = [SingleFile, HtmlFiles, EpubInParts]
@ -88,22 +238,102 @@ class Epub(OutputFormat):
progress = 1/(file_count)
temporary_file_location = f"{location}.tmp"
added_files: set[str] = set()
def get_new_files(zipfile: ZipFile):
"""Returns files in zipfile not already added to file"""
for filename in zipfile.namelist():
if filename in added_files or filename.endswith(".opf") or filename.endswith(".ncx"):
continue
yield filename
added_files: dict[str, int] = {} # Track filepath -> content size
opf_metadata: dict = {}
css_cache: dict[str, bytes] = {} # Store CSS content for fixing HTML pages
cover_href: str = None # Store cover image path from OPF
spine_properties: dict[str, str] = {} # Store spine properties (href -> properties)
def should_add_file(zipfile: ZipFile, filename: str) -> bool:
"""Check if file should be added (new or larger than existing)"""
# Skip directory entries, container files (ebooklib handles these), and OPF/NCX
if filename.endswith("/"):
return False
if filename == "mimetype" or filename.startswith("META-INF/"):
return False
if filename.endswith(".opf") or filename.endswith(".ncx"):
return False
if filename not in added_files:
return True
# If file exists, only replace if new version is larger (non-empty beats empty)
new_size = zipfile.getinfo(filename).file_size
return new_size > added_files[filename]
output = epub.EpubBook()
opf_extracted = False
for file in files:
await self._download_and_write_file(file, temporary_file_location)
with ZipFile(temporary_file_location, "r") as zipfile:
for filepath in get_new_files(zipfile):
# Extract OPF metadata from first OPF file (before skipping)
if not opf_extracted:
for filename in zipfile.namelist():
if filename.endswith(".opf"):
opf_content = zipfile.read(filename)
opf_metadata = _extract_opf_metadata(opf_content)
# Store rendition properties in metadata
if opf_metadata.get('rendition_layout'):
metadata.rendition_layout = opf_metadata['rendition_layout']
if opf_metadata.get('rendition_spread'):
metadata.rendition_spread = opf_metadata['rendition_spread']
if opf_metadata.get('rendition_orientation'):
metadata.rendition_orientation = opf_metadata['rendition_orientation']
if opf_metadata.get('cover_href'):
cover_href = opf_metadata['cover_href']
if opf_metadata.get('spine_properties'):
spine_properties.update(opf_metadata['spine_properties'])
opf_extracted = True
break
# Collect CSS files, merging content from all parts
for filepath in zipfile.namelist():
if filepath.endswith(".css"):
content = zipfile.read(filepath)
if not content:
continue # Skip empty files
if filepath not in css_cache:
css_cache[filepath] = content
else:
# Merge: combine rules, keeping the longer version for duplicate selectors
existing_str = css_cache[filepath].decode('utf-8', errors='ignore')
new_str = content.decode('utf-8', errors='ignore')
# Parse existing rules into dict: key -> full rule
existing_rules = {}
for rule in existing_str.split('}'):
if '{' in rule:
rule_key = _get_css_rule_key(rule)
if rule_key:
existing_rules[rule_key] = rule.strip() + '}'
# Process new rules: add new ones, replace if longer
for rule in new_str.split('}'):
if '{' in rule:
rule_key = _get_css_rule_key(rule)
if rule_key:
new_rule = rule.strip() + '}'
if rule_key not in existing_rules or len(new_rule) > len(existing_rules[rule_key]):
existing_rules[rule_key] = new_rule
# Rebuild CSS from merged rules
css_cache[filepath] = '\n'.join(existing_rules.values()).encode('utf-8')
for filepath in zipfile.namelist():
# Skip CSS files here - they'll be added after all parts are merged
if filepath.endswith(".css"):
continue
if not should_add_file(zipfile, filepath):
continue
content = zipfile.read(filepath)
file_size = len(content)
if filepath.endswith("html"):
filename = os.path.basename(filepath)
# Fix fixed-layout pages if we have rendition:layout
if metadata.rendition_layout == 'pre-paginated':
# Find matching CSS (e.g., page1.xhtml -> page1.css)
css_path = filepath.replace('.xhtml', '.css').replace('.html', '.css')
css_content = css_cache.get(css_path)
if css_content:
content = _fix_fixed_layout_page(content, css_content)
is_in_toc = False
title = None
for key, value in data.files_in_toc.items():
@ -112,13 +342,28 @@ class Epub(OutputFormat):
title = value
is_in_toc = True
break
epub_file = epub.EpubHtml(
title = title,
# Use EpubItem to preserve original content (link tags, viewport, etc.)
# EpubHtml parses and regenerates HTML, stripping these
epub_file = epub.EpubItem(
file_name = filepath,
content = content
content = content,
media_type = 'application/xhtml+xml'
)
output.add_item(epub_file)
output.spine.append(epub_file)
# Skip nav.xhtml from spine for fixed-layout (causes blank first page)
is_nav = any(x in filepath.lower() for x in ['nav.xhtml', 'nav.html', 'toc.xhtml', 'toc.html'])
if not (is_nav and metadata.rendition_layout == 'pre-paginated'):
# Check for spine properties (page-spread-left/right)
# Try matching with different path variations
props = None
for href, prop_value in spine_properties.items():
if filepath.endswith(href) or href.endswith(os.path.basename(filepath)):
props = prop_value
break
if props:
output.spine.append((epub_file, props))
else:
output.spine.append(epub_file)
if is_in_toc:
output.toc.append(epub_file)
else:
@ -127,12 +372,96 @@ class Epub(OutputFormat):
content = content
)
output.add_item(epub_file)
added_files.add(filepath)
added_files[filepath] = file_size
if update:
update(progress)
os.remove(temporary_file_location)
# Add merged CSS files after all parts have been processed
for css_path, css_content in css_cache.items():
css_item = epub.EpubItem(
file_name=css_path,
content=css_content,
media_type='text/css'
)
output.add_item(css_item)
# Set cover image if found in source OPF, or detect from first page for fixed-layout
if not cover_href and metadata.rendition_layout == 'pre-paginated':
# Find first content page from spine (excluding nav/toc)
first_page = None
for spine_item in output.spine:
item = spine_item[0] if isinstance(spine_item, tuple) else spine_item
if hasattr(item, 'file_name') and item.file_name:
fname = item.file_name.lower()
# Skip nav and toc files
if 'nav.' in fname or 'toc.' in fname:
continue
if fname.endswith('.xhtml') or fname.endswith('.html'):
first_page = item
break
if first_page and hasattr(first_page, 'content') and first_page.content:
# Parse HTML to find all images and pick the largest one
try:
content = first_page.content.decode('utf-8') if isinstance(first_page.content, bytes) else first_page.content
img_matches = re.findall(r'<img[^>]+src=["\']([^"\']+)["\']', content)
if img_matches:
page_dir = os.path.dirname(first_page.file_name)
# Build lookup dict for item sizes
item_sizes = {
item.file_name: len(item.content)
for item in output.items
if hasattr(item, 'file_name') and item.file_name
and hasattr(item, 'content') and item.content
}
best_img = None
best_size = 0
for img_src in img_matches:
img_path = os.path.normpath(os.path.join(page_dir, img_src))
# Find matching item by suffix
for file_name, size in item_sizes.items():
if file_name.endswith(img_path):
if size > best_size:
best_size = size
best_img = img_path
break
if best_img:
cover_href = best_img
except (UnicodeDecodeError, AttributeError):
pass
if cover_href:
# Find the cover image item and mark it as cover
for item in output.items:
if hasattr(item, 'file_name') and item.file_name and item.file_name.endswith(cover_href):
# Get or create item ID
item_id = item.id if hasattr(item, 'id') and item.id else os.path.basename(cover_href).replace('.', '-')
if not item.id:
item.id = item_id
# Add EPUB 2 cover metadata: <meta name="cover" content="image-id"/>
output.add_metadata('OPF', 'meta', '', {'name': 'cover', 'content': item_id})
# Mark item with EPUB 3 cover-image property
if not hasattr(item, 'properties') or item.properties is None:
item.properties = []
if 'cover-image' not in item.properties:
item.properties.append('cover-image')
break
# Apply rendition properties to output (fixed-layout support)
if metadata.rendition_layout:
output.add_metadata(None, 'meta', metadata.rendition_layout, {'property': 'rendition:layout'})
if metadata.rendition_spread:
output.add_metadata(None, 'meta', metadata.rendition_spread, {'property': 'rendition:spread'})
if metadata.rendition_orientation:
output.add_metadata(None, 'meta', metadata.rendition_orientation, {'property': 'rendition:orientation'})
output.add_item(epub.EpubNcx())
output.add_item(epub.EpubNav())
nav = epub.EpubNav()
output.add_item(nav)
# For fixed-layout, remove nav from spine (it shouldn't be in reading order)
if metadata.rendition_layout == 'pre-paginated':
output.spine = [item for item in output.spine if item != nav and not (isinstance(item, tuple) and item[0] == nav)]
epub.write_epub(location, output)
exit()

View File

@ -0,0 +1,263 @@
"""
Generic EPUB metadata writer
Handles writing metadata to EPUB files from book.Metadata
"""
from grawlix import logging
from grawlix.book import Metadata
import zipfile
import tempfile
import os
import shutil
def write_metadata_to_epub(metadata: Metadata, epub_path: str) -> None:
    """
    Write metadata to EPUB file.

    Extracts the EPUB to a temporary directory, rewrites the metadata section
    of its OPF package document in place, and repacks the archive.  Prefers
    lxml (tolerant parsing, pretty printing) and falls back to the stdlib
    ElementTree when lxml is not installed.  Best effort: if no OPF or
    metadata element is found the file is left unchanged.

    :param metadata: Metadata object from book
    :param epub_path: Path to the EPUB file (modified in place)
    """
    try:
        from lxml import etree as ET
        using_lxml = True
    except ImportError:
        import xml.etree.ElementTree as ET
        using_lxml = False
    # EPUB namespaces
    NAMESPACES = {
        'opf': 'http://www.idpf.org/2007/opf',
        'dc': 'http://purl.org/dc/elements/1.1/',
        'dcterms': 'http://purl.org/dc/terms/',
    }
    # Register namespaces for ElementTree so serialization keeps the prefixes
    if not using_lxml:
        for prefix, uri in NAMESPACES.items():
            ET.register_namespace(prefix, uri)
    # Create temporary directory for EPUB extraction
    temp_dir = tempfile.mkdtemp()
    try:
        # Extract EPUB (an EPUB is a zip archive)
        with zipfile.ZipFile(epub_path, 'r') as zip_ref:
            zip_ref.extractall(temp_dir)
        # Find OPF file
        opf_path = _find_opf_file(temp_dir)
        if not opf_path:
            logging.debug("Could not find OPF file in EPUB")
            return
        # Parse OPF file; lxml's recover=True tolerates mildly broken XML
        if using_lxml:
            parser = ET.XMLParser(recover=True, encoding='utf-8')
            tree = ET.parse(opf_path, parser)
        else:
            tree = ET.parse(opf_path)
        root = tree.getroot()
        # Find metadata element ('.//': lxml's find does not match from the root
        # the same way as stdlib ElementTree here)
        if using_lxml:
            metadata_elem = root.find('.//opf:metadata', NAMESPACES)
        else:
            metadata_elem = root.find('opf:metadata', NAMESPACES)
        if metadata_elem is None:
            logging.debug("Could not find metadata element in OPF")
            return
        # Update metadata in the parsed tree
        _update_epub_metadata(metadata_elem, metadata, NAMESPACES, using_lxml)
        # Write updated OPF
        if using_lxml:
            tree.write(opf_path, encoding='utf-8', xml_declaration=True, pretty_print=True)
        else:
            tree.write(opf_path, encoding='utf-8', xml_declaration=True)
        # Repack EPUB from the modified extraction directory
        _repack_epub(temp_dir, epub_path)
        logging.debug("Successfully wrote metadata to EPUB")
    finally:
        # Cleanup the temporary extraction directory in all cases
        shutil.rmtree(temp_dir)
def _find_opf_file(epub_dir: str) -> str:
"""Find the OPF file in extracted EPUB directory"""
container_path = os.path.join(epub_dir, 'META-INF', 'container.xml')
if os.path.exists(container_path):
try:
from lxml import etree as ET
except ImportError:
import xml.etree.ElementTree as ET
tree = ET.parse(container_path)
root = tree.getroot()
rootfile = root.find('.//{urn:oasis:names:tc:opendocument:xmlns:container}rootfile')
if rootfile is not None:
opf_relative_path = rootfile.get('full-path')
return os.path.join(epub_dir, opf_relative_path)
# Fallback: search for .opf file
for root_dir, dirs, files in os.walk(epub_dir):
for file in files:
if file.endswith('.opf'):
return os.path.join(root_dir, file)
return None
def _update_epub_metadata(metadata_elem, metadata: Metadata, ns: dict, using_lxml: bool) -> None:
    """Update EPUB metadata elements from Metadata object

    Empty/None metadata fields are skipped so existing values in the
    OPF are preserved for them.

    :param metadata_elem: The <metadata> element of the OPF document
    :param metadata: Source of the new metadata values
    :param ns: Namespace map (must contain 'opf' and 'dc' keys)
    :param using_lxml: True when *metadata_elem* was parsed with lxml;
        the matching ElementTree implementation must be used to build
        new sub-elements
    """
    if using_lxml:
        from lxml import etree as ET
    else:
        import xml.etree.ElementTree as ET
    # Helper function to create/update element: removes any existing
    # elements with the same tag before adding the replacement
    def update_or_create_element(tag: str, text: str, attribs: "dict | None" = None):
        if not text:
            return
        # Remove existing elements with this tag
        for elem in list(metadata_elem.findall(tag, ns)):
            metadata_elem.remove(elem)
        # Create new element
        elem = ET.SubElement(metadata_elem, tag)
        elem.text = str(text)
        if attribs:
            for key, value in attribs.items():
                elem.set(key, value)
    # Helper to create meta element (legacy name/content pair)
    def create_meta(name: str, content):
        if content is None:
            return
        meta = ET.SubElement(metadata_elem, f"{{{ns['opf']}}}meta")
        meta.set('name', name)
        meta.set('content', str(content))
    # Title
    update_or_create_element(f"{{{ns['dc']}}}title", metadata.title)
    # Original Title (EPUB 3 with refinements)
    if metadata.original_title:
        # Create title with ID for main title
        for elem in list(metadata_elem.findall(f"{{{ns['dc']}}}title", ns)):
            elem.set('id', 'main-title')
        # Add original title
        orig_title = ET.SubElement(metadata_elem, f"{{{ns['dc']}}}title")
        orig_title.set('id', 'original-title')
        orig_title.text = metadata.original_title
        # Add meta refinement for original title
        meta = ET.SubElement(metadata_elem, f"{{{ns['opf']}}}meta")
        meta.set('refines', '#original-title')
        meta.set('property', 'title-type')
        meta.text = 'original'
    # Authors (appended, not replaced)
    for author in metadata.authors:
        creator = ET.SubElement(metadata_elem, f"{{{ns['dc']}}}creator")
        creator.text = author
        creator.set(f"{{{ns['opf']}}}role", "aut")
    # Translators
    for translator in metadata.translators:
        contributor = ET.SubElement(metadata_elem, f"{{{ns['dc']}}}contributor")
        contributor.text = translator
        contributor.set(f"{{{ns['opf']}}}role", "trl")
    # Description
    update_or_create_element(f"{{{ns['dc']}}}description", metadata.description)
    # Language
    update_or_create_element(f"{{{ns['dc']}}}language", metadata.language)
    # Publisher
    update_or_create_element(f"{{{ns['dc']}}}publisher", metadata.publisher)
    # ISBN (from identifier field)
    if metadata.isbn:
        # Remove existing ISBN identifiers (other identifier schemes
        # are left in place)
        for elem in list(metadata_elem.findall(f"{{{ns['dc']}}}identifier", ns)):
            scheme = elem.get(f"{{{ns['opf']}}}scheme")
            if scheme and scheme.upper() == "ISBN":
                metadata_elem.remove(elem)
        # Add new ISBN
        identifier = ET.SubElement(metadata_elem, f"{{{ns['dc']}}}identifier")
        identifier.text = metadata.isbn
        identifier.set(f"{{{ns['opf']}}}scheme", "ISBN")
    # Release Date (convert date to string)
    release_date_str = metadata.release_date.isoformat() if metadata.release_date else None
    update_or_create_element(f"{{{ns['dc']}}}date", release_date_str)
    # Category
    if metadata.category:
        subject = ET.SubElement(metadata_elem, f"{{{ns['dc']}}}subject")
        subject.text = metadata.category
    # Tags
    for tag in metadata.tags:
        subject = ET.SubElement(metadata_elem, f"{{{ns['dc']}}}subject")
        subject.text = tag
    # Series info (Calibre format) - using series and index fields
    if metadata.series:
        create_meta("calibre:series", metadata.series)
        create_meta("calibre:series_index", metadata.index)
    # EPUB 3 rendition properties (fixed-layout support)
    # These use <meta property="...">value</meta> format, not name/content
    def create_meta_property(property_name: str, value: str):
        if not value:
            return
        # Remove existing property if present
        for elem in list(metadata_elem):
            if elem.get('property') == property_name:
                metadata_elem.remove(elem)
        meta = ET.SubElement(metadata_elem, 'meta')
        meta.set('property', property_name)
        meta.text = value
    if metadata.rendition_layout:
        create_meta_property('rendition:layout', metadata.rendition_layout)
    if metadata.rendition_spread:
        create_meta_property('rendition:spread', metadata.rendition_spread)
    if metadata.rendition_orientation:
        create_meta_property('rendition:orientation', metadata.rendition_orientation)
def _repack_epub(epub_dir: str, output_path: str) -> None:
"""Repack EPUB directory into ZIP file"""
# Remove old EPUB
if os.path.exists(output_path):
os.remove(output_path)
with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as epub_zip:
# mimetype must be first and uncompressed
mimetype_path = os.path.join(epub_dir, 'mimetype')
if os.path.exists(mimetype_path):
epub_zip.write(mimetype_path, 'mimetype', compress_type=zipfile.ZIP_STORED)
# Add all other files
for root, dirs, files in os.walk(epub_dir):
for file in files:
if file == 'mimetype':
continue
file_path = os.path.join(root, file)
arcname = os.path.relpath(file_path, epub_dir)
epub_zip.write(file_path, arcname)

10
grawlix/output/pdf.py Normal file
View File

@ -0,0 +1,10 @@
from grawlix.book import Book, SingleFile
from .output_format import OutputFormat, Update
class Pdf(OutputFormat):
    """Output format that stores a book as a single PDF file."""

    extension = "pdf"
    input_types = [SingleFile]

    async def download(self, book: Book, location: str, update_func: Update) -> None:
        """Write *book* to *location* as one PDF file, reporting progress via *update_func*."""
        await self._download_single_file(book, location, update_func)

View File

@ -0,0 +1,68 @@
"""
Convert PDF-in-epub files to proper PDF format.
Some sources (like Nextory) wrap PDF pages in epub containers.
"""
import os
import re
import zipfile
from io import BytesIO
from pypdf import PdfWriter, PdfReader
def convert_pdf_epub_to_pdf(epub_path: str) -> str:
    """
    Extract embedded PDFs from an epub and merge them into a single PDF.

    The original epub is deleted only after the merged PDF has been
    written to disk.

    :param epub_path: Path to the epub file containing embedded PDFs
    :return: Path to the created PDF file
    :raises ValueError: If the epub contains no PDF files
    """
    # os.path.splitext only touches the final extension, so a dot in a
    # parent directory name cannot corrupt the output path (rsplit('.')
    # on the full path would)
    pdf_path = os.path.splitext(epub_path)[0] + '.pdf'
    with zipfile.ZipFile(epub_path, 'r') as zf:
        # Find all PDF files in the epub
        pdf_files = [f for f in zf.namelist() if f.endswith('.pdf')]
        if not pdf_files:
            raise ValueError("No PDF files found in epub")
        # Sort by numeric order (1.pdf, 2.pdf, ..., 10.pdf, 11.pdf, ...)
        # instead of lexicographic order; also matches archive-root
        # entries like "1.pdf" with no directory prefix
        def extract_number(path: str) -> int:
            match = re.search(r'(?:^|/)(\d+)\.pdf$', path)
            return int(match.group(1)) if match else 0
        pdf_files.sort(key=extract_number)
        # Merge all PDFs page by page
        writer = PdfWriter()
        for pdf_file in pdf_files:
            pdf_data = zf.read(pdf_file)
            reader = PdfReader(BytesIO(pdf_data))
            for page in reader.pages:
                writer.add_page(page)
    # Write merged PDF
    with open(pdf_path, 'wb') as out_file:
        writer.write(out_file)
    # Remove the original epub (only after the PDF exists on disk)
    os.remove(epub_path)
    return pdf_path
def is_pdf_in_epub(epub_path: str) -> bool:
    """
    Check if an epub contains embedded PDF files instead of HTML.

    Unreadable archives (missing file, not a zip) are reported as
    containing no PDFs.

    :param epub_path: Path to the epub file
    :return: True if the epub contains PDF files
    """
    try:
        with zipfile.ZipFile(epub_path, 'r') as zf:
            return any(entry.endswith('.pdf') for entry in zf.namelist())
    except (zipfile.BadZipFile, FileNotFoundError):
        return False

View File

@ -23,7 +23,7 @@ class Ereolen(Source):
.read_text("utf8") \
.split("\n")
match: list[str] = [
rf"https://(www.)?({"|".join(library_domains)})/reader\?orderid=.+$",
rf"https://(www.)?({'|'.join(library_domains)})/reader\?orderid=.+$",
]
_authentication_methods = [ "login" ]

View File

@ -122,7 +122,6 @@ class Flipp(Source):
metadata = Metadata(
title = f"{metadata['series_name']} {metadata['issueName']}",
series = metadata["series_name"],
identifier = issue_id
),
)

View File

@ -1,11 +1,11 @@
from grawlix.book import Book, Metadata, OnlineFile, BookData, OnlineFile, SingleFile, EpubInParts, Result, Series
from grawlix.book import Book, Metadata, OnlineFile, BookData, EpubInParts, Result, Series
from grawlix.encryption import AESEncryption
from grawlix.exceptions import InvalidUrl
from .source import Source
from typing import Optional
from typing import Tuple
from datetime import date
import uuid
import rich
import base64
LOCALE = "en_GB"
@ -17,12 +17,7 @@ class Nextory(Source):
]
_authentication_methods = [ "login" ]
@staticmethod
def _create_device_id() -> str:
"""Create unique device id"""
return str(uuid.uuid3(uuid.NAMESPACE_DNS, "audiobook-dl"))
# Authentication methods
async def login(self, url: str, username: str, password: str) -> None:
# Set permanent headers
@ -30,12 +25,12 @@ class Nextory(Source):
self._client.headers.update(
{
"X-Application-Id": "200",
"X-App-Version": "5.4.1",
"X-App-Version": "2025.12.1",
"X-Locale": LOCALE,
"X-Model": "Personal Computer",
"X-Device-Id": device_id,
"X-Os-Info": "Android",
"appid": "200",
"X-OS-INFO": "Personal Computer",
"locale": LOCALE,
}
)
# Login for account
@ -47,7 +42,6 @@ class Nextory(Source):
},
)
session_response = session_response.json()
rich.print(session_response)
login_token = session_response["login_token"]
country = session_response["country"]
self._client.headers.update(
@ -62,7 +56,6 @@ class Nextory(Source):
"https://api.nextory.com/user/v1/me/profiles",
)
profiles_response = profiles_response.json()
rich.print(profiles_response)
profile = profiles_response["profiles"][0]
login_key = profile["login_key"]
authorize_response = await self._client.post(
@ -72,19 +65,24 @@ class Nextory(Source):
}
)
authorize_response = authorize_response.json()
rich.print(authorize_response)
profile_token = authorize_response["profile_token"]
self._client.headers.update({"X-Profile-Token": profile_token})
self._client.headers.update({"X-Profile-Token": profile_token})
@staticmethod
def _find_epub_id(product_data) -> str:
"""Find id of book format of type epub for given book"""
for format in product_data["formats"]:
if format["type"] == "epub":
return format["identifier"]
raise InvalidUrl
def _create_device_id() -> str:
"""Create unique device id"""
return str(uuid.uuid3(uuid.NAMESPACE_DNS, "audiobook-dl"))
# Main download methods
async def download(self, url: str) -> Result:
    """Download a single book or a whole series from a Nextory url."""
    url_id = self._extract_id_from_url(url)
    # Series pages contain "serier" in their url
    if "serier" not in url:
        return await self._download_book(url_id)
    return await self._download_series(url_id)
@staticmethod
@ -107,10 +105,156 @@ class Nextory(Source):
return await self._download_book(url_id)
async def download_book_from_id(self, book_id: str) -> Book:
return await self._download_book(book_id)
# Book download path
async def _download_book(self, book_id: str) -> Book:
    """Fetch product info and content for *book_id* and assemble a Book."""
    product = await self._get_product_data(book_id)
    # Nextory serves all books via the epub endpoint regardless of the
    # original format, so only the format identifier is needed here
    _format_type, format_id = self._find_format(product)
    return Book(
        data = await self._get_epub_data(format_id),
        metadata = self._extract_metadata(product),
    )
async def _get_product_data(self, book_id: str) -> dict:
    """
    Fetch product data from Nextory API

    :param book_id: Id of book (can be URL id or internal id)
    :return: Product data dictionary
    """
    url = f"https://api.nextory.com/library/v1/products/{book_id}"
    response = await self._client.get(url)
    return response.json()
@staticmethod
def _find_format(product_data) -> Tuple[str, str]:
"""Find a supported book format (epub or pdf)"""
for format_type in ("epub", "pdf"):
for fmt in product_data["formats"]:
if fmt["type"] == format_type:
return (format_type, fmt["identifier"])
raise InvalidUrl
def _extract_metadata(self, product_data: dict) -> Metadata:
    """
    Extract metadata from Nextory product data

    Missing fields are tolerated throughout (``.get`` with defaults),
    so a partial API response still yields a usable Metadata object.

    :param product_data: Product data from Nextory API
    :return: Metadata object
    """
    # Find epub or pdf format for format-specific metadata
    # (epub preferred; break out of both loops on first hit)
    ebook_format = None
    for fmt_type in ("epub", "pdf"):
        for fmt in product_data.get("formats", []):
            if fmt.get("type") == fmt_type:
                ebook_format = fmt
                break
        if ebook_format:
            break
    # Basic metadata
    title = product_data.get("title", "Unknown")
    authors = [author["name"] for author in product_data.get("authors", [])]
    description = product_data.get("description_full")
    language = product_data.get("language")
    # Format-specific metadata
    publisher = None
    isbn = None
    release_date = None
    translators = []
    if ebook_format:
        publisher = ebook_format.get("publisher", {}).get("name") if ebook_format.get("publisher") else None
        isbn = ebook_format.get("isbn")
        translators = [t["name"] for t in ebook_format.get("translators", [])]
        pub_date = ebook_format.get("publication_date")
        if pub_date:
            # Format is YYYY-MM-DD
            release_date = date.fromisoformat(pub_date)
    # Series info
    series = None
    index = None
    series_info = product_data.get("series")
    if series_info:
        series = series_info.get("name")
        # NOTE(review): "volume" is read from the product itself, not
        # the series object -- assumed to be the position in the series
        volume = product_data.get("volume")
        if volume:
            index = volume
    return Metadata(
        title=title,
        authors=authors,
        translators=translators,
        language=language,
        publisher=publisher,
        isbn=isbn,
        description=description,
        release_date=release_date,
        series=series,
        index=index,
        source="Nextory"
    )
async def _get_epub_data(self, epub_id: str) -> BookData:
    """
    Download epub data for book

    :param epub_id: Id of epub file
    :return: Epub data
    """
    # Nextory books are for some reason split up into multiple epub files -
    # one for each chapter file. All of these files have to be decrypted and
    # combined afterwards. Many of the provided epub files contain the same
    # files and some of them contain the same file names but with variation
    # in the content and comments that describe what should have been there
    # if the book was whole from the start.
    response = await self._client.get(
        f"https://api.nextory.com/reader/books/{epub_id}/packages/epub"
    )
    epub_data = response.json()
    # A single AES key/iv pair (delivered with the package) is shared
    # by every part
    encryption = AESEncryption(
        key = self._fix_key(epub_data["crypt_key"]),
        iv = self._fix_key(epub_data["crypt_iv"])
    )
    files = [
        OnlineFile(
            url = part["spine_url"],
            extension = "epub",
            encryption = encryption
        )
        for part in epub_data["spines"]
    ]
    # Map spine file paths to their chapter titles
    files_in_toc = {}
    for item in epub_data["toc"]["childrens"]:  # sic: the API really names the key "childrens"
        files_in_toc[item["src"]] = item["name"]
    return EpubInParts(
        files,
        files_in_toc
    )
@staticmethod
def _fix_key(value: str) -> bytes:
"""Remove unused data and decode key"""
return base64.b64decode(value[:-1])
# Series download path
async def _download_series(self, series_id: str) -> Series:
"""
Download series from Nextory
@ -127,96 +271,8 @@ class Nextory(Source):
}
)
series_data = response.json()
book_ids = []
for book in series_data["products"]:
book_id = book["id"]
book_ids.append(book_id)
book_ids = [book["id"] for book in series_data["products"]]
return Series(
title = series_data["products"][0]["series"]["name"],
book_ids = book_ids,
)
@staticmethod
def _extract_series_name(product_info: dict) -> Optional[str]:
if not "series" in product_info:
return None
return product_info["series"]["name"]
async def _get_book_id_from_url_id(self, url_id: str) -> str:
"""
Download book id from url id
:param url_id: Id of book from url
:return: Book id
"""
response = await self._client.get(
f"https://api.nextory.se/api/app/product/7.5/bookinfo",
params = { "id": url_id },
)
rich.print(response.url)
rich.print(response.content)
exit()
async def _download_book(self, book_id: str) -> Book:
product_data = await self._client.get(
f"https://api.nextory.com/library/v1/products/{book_id}"
)
product_data = product_data.json()
epub_id = self._find_epub_id(product_data)
pages = await self._get_pages(epub_id)
return Book(
data = pages,
metadata = Metadata(
title = product_data["title"],
authors = [author["name"] for author in product_data["authors"]],
series = self._extract_series_name(product_data),
)
)
@staticmethod
def _fix_key(value: str) -> bytes:
"""Remove unused data and decode key"""
return base64.b64decode(value[:-1])
async def _get_pages(self, epub_id: str) -> BookData:
"""
Download page information for book
:param epub_id: Id of epub file
:return: Page data
"""
# Nextory books are for some reason split up into multiple epub files -
# one for each chapter file. All of these files has to be decrypted and
# combined afterwards. Many of the provided epub files contain the same
# files and some of them contain the same file names but with variation
# in the content and comments that describe what should have been there
# if the book was whole from the start.
response = await self._client.get(
f"https://api.nextory.com/reader/books/{epub_id}/packages/epub"
)
epub_data = response.json()
encryption = AESEncryption(
key = self._fix_key(epub_data["crypt_key"]),
iv = self._fix_key(epub_data["crypt_iv"])
)
files = []
for part in epub_data["spines"]:
files.append(
OnlineFile(
url = part["spine_url"],
extension = "epub",
encryption = encryption
)
)
files_in_toc = {}
for item in epub_data["toc"]["childrens"]: # Why is it "childrens"?
files_in_toc[item["src"]] = item["name"]
return EpubInParts(
files,
files_in_toc
)

View File

@ -9,6 +9,7 @@ from urllib3.util import parse_url
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad
from typing import Any
from datetime import datetime
class Storytel(Source):
name: str = "Storytel"
@ -18,6 +19,57 @@ class Storytel(Source):
_authentication_methods = [ "login" ]
__download_counter = 0
# Authentication methods
async def login(self, url: str, username: str, password: str) -> None:
    """Store credentials (password pre-encrypted) and authenticate with Storytel.

    :param url: Unused here; part of the shared Source login interface
    :param username: Storytel account name
    :param password: Plaintext password; encrypted before being stored
    """
    self.__username = username
    self.__password = self.encrypt_password(password)
    # Present ourselves as the official Android client
    self._client.headers.update({"User-Agent": "Storytel/23.49 (Android 13; Pixel 6) Release/2288481"})
    await self.authenticate()
@staticmethod
def encrypt_password(password: str) -> str:
    """
    Encrypt password with predefined keys.
    This encrypted password is used for login.

    :param password: User defined password
    :returns: Encrypted password as a hex string
    """
    # Thanks to https://github.com/javsanpar/storytel-tui
    # Static key/iv pair taken from the project linked above
    key = b"VQZBJ6TD8M9WBUWT"
    iv = b"joiwef08u23j341a"
    # Pad to the AES block size, then encrypt with AES-CBC
    msg = pad(password.encode(), AES.block_size)
    cipher = AES.new(key, AES.MODE_CBC, iv)
    cipher_text = cipher.encrypt(msg)
    return cipher_text.hex()
async def authenticate(self) -> None:
    """Authenticate with storytel

    Posts the stored (encrypted) credentials and installs the returned
    JWT as a bearer token on the shared HTTP client.

    :raises SourceNotAuthenticated: If the login request does not return 200
    """
    response = await self._client.post(
        f"https://www.storytel.com/api/login.action?m=1&token=guestsv&userid=-1&version=23.49&terminal=android&locale=sv&deviceId=995f2562-0e44-4410-b1b9-8d08261f33c4&kidsMode=false",
        data = {
            "uid": self.__username,
            "pwd": self.__password
        }
    )
    if response.status_code != 200:
        raise SourceNotAuthenticated
    user_data = response.json()
    jwt = user_data["accountInfo"]["jwt"]
    self._client.headers.update({"authorization": f"Bearer {jwt}"})
async def reauthenticate(self) -> None:
    """Reauthenticate if required

    Refreshes the JWT on every 10th download (presumably the token
    expires after some use -- TODO confirm the actual lifetime).
    """
    if self.__download_counter > 0 and self.__download_counter % 10 == 0:
        await self.authenticate()
# Main download methods
async def download(self, url: str) -> Result:
await self.reauthenticate()
@ -35,6 +87,22 @@ class Storytel(Source):
raise InvalidUrl
@staticmethod
def extract_id_from_url(url: str) -> str:
    """
    Extract id from url

    The id is taken as the last dash-separated component of the url
    path, e.g. ".../some-title-12345" -> "12345".

    :param url: Url containing id
    :return: Id
    :raises DataNotFound: If the url has no path component
    """
    parsed = parse_url(url)
    if parsed.path is None:
        raise DataNotFound
    return parsed.path.split("-")[-1]
# Book download path
async def download_book_from_id(self, book_id: str) -> Book:
# Epub location
response = await self._client.get(
@ -48,11 +116,13 @@ class Storytel(Source):
f"https://api.storytel.net/book-details/consumables/{book_id}?kidsMode=false&configVariant=default"
)
details = response.json()
logging.debug(f"Full book details JSON: {json.dumps(details, indent=2)}")
return Book(
metadata = Metadata(
title = details["title"]
),
# Extract metadata from details
metadata = self._extract_metadata(details)
book = Book(
metadata = metadata,
data = SingleFile(
OnlineFile(
url = epub_url,
@ -61,8 +131,73 @@ class Storytel(Source):
)
)
)
return book
def _extract_metadata(self, details: dict) -> Metadata:
    """
    Extract metadata from Storytel book details JSON

    Missing fields are tolerated (``.get`` with defaults), so a partial
    API response still yields a usable Metadata object.

    :param details: Book details from Storytel API
    :return: Metadata object
    """
    # Extract ebook-specific format data
    ebook_format = None
    for fmt in details.get("formats", []):
        if fmt.get("type") == "ebook":
            ebook_format = fmt
            break
    # Extract basic metadata
    title = details.get("title", "Unknown")
    original_title = details.get("originalTitle")
    authors = [author["name"] for author in details.get("authors", [])]
    translators = [translator["name"] for translator in details.get("translators", [])]
    language = details.get("language")
    description = details.get("description")
    category = details.get("category", {}).get("name") if details.get("category") else None
    # Only the first 10 tags are kept
    tags = [tag["name"] for tag in details.get("tags", [])[:10]]
    # Extract ebook-specific publisher, ISBN, and release date
    publisher = None
    isbn = None
    release_date = None
    if ebook_format:
        publisher = ebook_format.get("publisher", {}).get("name")
        isbn = ebook_format.get("isbn")
        release_date_str = ebook_format.get("releaseDate")
        if release_date_str:
            # Parse ISO format date; normalize a trailing "Z" since
            # fromisoformat does not accept it before Python 3.11
            release_date = datetime.fromisoformat(release_date_str.replace("Z", "+00:00")).date()
    # Extract series information
    series = None
    index = None
    series_info = details.get("seriesInfo")
    if series_info:
        series = series_info.get("name")
        index = series_info.get("orderInSeries")
    return Metadata(
        title=title,
        original_title=original_title,
        authors=authors,
        translators=translators,
        language=language,
        publisher=publisher,
        isbn=isbn,
        description=description,
        release_date=release_date,
        series=series,
        index=index,
        category=category,
        tags=tags,
        source="Storytel"
    )
# List download path
async def download_list(self, url: str, list_type: str, language: str) -> Series:
"""
Download list of books
@ -97,23 +232,22 @@ class Storytel(Source):
) -> dict[str, Any]:
"""Download details about book list
:param formats: comma serapted list of formats (abook,ebook,podcast)
:param languages: comma seperated list of languages (en,de,tr,ar,ru,pl,it,es,sv,fr,nl)
:param formats: comma separated list of formats (abook,ebook,podcast)
:param languages: comma separated list of languages (en,de,tr,ar,ru,pl,it,es,sv,fr,nl)
"""
nextPageToken = 0
# API returns only 10 items per request, so we need to paginate
# Start with None to ensure we enter the loop and make the first request
result: dict[str, Any] = {"nextPageToken": None}
is_first_page = True
# API returns only 10 items per request
# if the nextPageToken
result: dict[str, Any] = {"nextPageToken": False}
while result["nextPageToken"] is not None:
while result["nextPageToken"] is not None or is_first_page:
params: dict[str, str] = {
"includeListDetails": "true", # include listMetadata,filterOptions,sortOption sections
"includeFormats": formats,
"includeLanguages": languages,
"kidsMode": "false",
}
if result["nextPageToken"]:
if result.get("nextPageToken"):
params["nextPageToken"] = result["nextPageToken"]
response = await self._client.get(
@ -122,72 +256,12 @@ class Storytel(Source):
)
data = response.json()
if result["nextPageToken"] == 0:
if is_first_page:
result = data
is_first_page = False
else:
result["items"].extend(data["items"])
result["nextPageToken"] = data["nextPageToken"]
logging.debug(f"{result=}")
return result
@staticmethod
def extract_id_from_url(url: str) -> str:
"""
Extract id from url
:param url: Url containing id
:return: Id
"""
parsed = parse_url(url)
if parsed.path is None:
raise DataNotFound
return parsed.path.split("-")[-1]
@staticmethod
def encrypt_password(password: str) -> str:
"""
Encrypt password with predefined keys.
This encrypted password is used for login.
:param password: User defined password
:returns: Encrypted password
"""
# Thanks to https://github.com/javsanpar/storytel-tui
key = b"VQZBJ6TD8M9WBUWT"
iv = b"joiwef08u23j341a"
msg = pad(password.encode(), AES.block_size)
cipher = AES.new(key, AES.MODE_CBC, iv)
cipher_text = cipher.encrypt(msg)
return cipher_text.hex()
async def reauthenticate(self) -> None:
"""Reauthenticate if required"""
if self.__download_counter > 0 and self.__download_counter % 10 == 0:
await self.authenticate()
async def authenticate(self) -> None:
"""Authenticate with storytel"""
response = await self._client.post(
f"https://www.storytel.com/api/login.action?m=1&token=guestsv&userid=-1&version=23.49&terminal=android&locale=sv&deviceId=995f2562-0e44-4410-b1b9-8d08261f33c4&kidsMode=false",
data = {
"uid": self.__username,
"pwd": self.__password
}
)
if response.status_code != 200:
raise SourceNotAuthenticated
user_data = response.json()
jwt = user_data["accountInfo"]["jwt"]
self._client.headers.update({"authorization": f"Bearer {jwt}"})
async def login(self, username: str, password: str, **kwargs) -> None:
self.__username = username
self.__password = self.encrypt_password(password)
self._client.headers.update({"User-Agent": "Storytel/23.49 (Android 13; Pixel 6) Release/2288481"})
await self.authenticate()

View File

@ -6,21 +6,28 @@ authors = [
description = "CLI tool for downloading ebooks"
readme = "README.md"
keywords = ["ebook", "cli", "downloader"]
requires-python = ">=3.9"
classifiers = [
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"License :: OSI Approved :: GNU General Public License v3 (GPLv3)",
]
dependencies = [
"appdirs",
"beautifulsoup4",
"blackboxprotobuf",
"EbookLib",
"httpx",
"importlib-resources",
"lxml",
"pycryptodome",
"rich",
"tomli",
"beautifulsoup4>=4.9.0",
"bbpb>=1.0.0",
"EbookLib>=0.17",
"httpx>=0.23.0",
"importlib-resources>=5.0",
"lxml>=4.6.0",
"platformdirs>=3.0.0",
"pycryptodome>=3.10.0",
"pypdf>=3.0.0",
"rich>=10.0.0",
"tomli>=1.0.0; python_version<'3.11'",
]
dynamic = ["version"]