From 4efb9631688e2574124cf40e22cd3a8319cd6707 Mon Sep 17 00:00:00 2001 From: ^_^ <8480595+ballaballaballa@users.noreply.github.com> Date: Wed, 29 Oct 2025 10:11:20 +0100 Subject: [PATCH] feat: add metadata writing support for Nextory and refactor source files - Fix Nextory SSL certificate error (api.nextory.se -> api.nextory.com) - Add missing X-OS-INFO header for Nextory API compatibility - Implement full metadata writing for Nextory (title, authors, translators, series, publisher, ISBN, language, description, release date) - Remove debug JSON output from Nextory authentication - Fix exit() call in epub.py that prevented metadata writing for EpubInParts Refactoring improvements: - Reorganize functions in nextory.py and storytel.py by execution flow - Add section comments for better code organization (Authentication, Main download, Book path, Series path) - Remove duplicate imports (OnlineFile in nextory.py, datetime in storytel.py) - Remove duplicate header update in storytel.py - Improve pagination logic in storytel.py with clearer variable names - Optimize API calls in nextory.py (eliminate redundant call) - Use list comprehensions for cleaner code Documentation: - Update README with Nextory metadata support - Add supported sources comparison table for metadata fields - Include Nextory configuration example --- README.md | 38 +++- grawlix/__main__.py | 7 +- .../errors/unsupported_output_format.txt | 5 + grawlix/book.py | 2 + grawlix/epub_metadata_writers.py | 45 +++++ grawlix/output/epub.py | 1 - grawlix/sources/nextory.py | 175 +++++++++--------- grawlix/sources/storytel.py | 155 ++++++++-------- 8 files changed, 259 insertions(+), 169 deletions(-) create mode 100644 grawlix/assets/errors/unsupported_output_format.txt diff --git a/README.md b/README.md index b327f00..c76e524 100644 --- a/README.md +++ b/README.md @@ -80,7 +80,7 @@ The following settings can be added to your config file (before any `[sources.*] | Setting | Type | Description | Example | 
|---------|------|-------------|---------| -| `write_metadata_to_epub` | boolean | Automatically write metadata to EPUB files (currently supports Storytel) | `true` or `false` | +| `write_metadata_to_epub` | boolean | Automatically write metadata to EPUB files (supports Storytel and Nextory) | `true` or `false` | | `output` | string | Default output path template (supports `~`, environment variables, and template variables) | `"~/ebooks/{title}.{ext}"` | ### Output Templates @@ -96,20 +96,35 @@ The `output` setting supports template variables that are replaced with book met | `{publisher}` | Publisher name | "Orbit" | | `{language}` | Language code | "en" | | `{release_date}` | Release date | "2020-01-15" | -| `{ext}` | File extension | "epub" | +| `{source}` | Source/service name | "Storytel", "Marvel", etc. | +| `{ext}` | File extension (auto-detected from source) | "epub" | **Example templates:** ```toml -# Simple +# Simple (auto-detect format) output = "~/books/{title}.{ext}" -# Organized by series +# Force EPUB format +output = "~/books/{title}.epub" + +# Organized by source +output = "~/books/{source}/{title}.{ext}" + +# Organized by series (auto-detect format) output = "~/books/{series}/{index} - {title}.{ext}" -# With author -output = "~/books/{authors}/{series}/{title}.{ext}" +# Force EPUB with series organization +output = "~/books/{series}/{index} - {title}.epub" + +# Organized by source and series +output = "~/books/{source}/{series}/{index} - {title}.epub" ``` +**Note:** The file extension in your template determines the output format: +- Use `.epub` to force EPUB output +- Use `.cbz` to force CBZ (comic book) output +- Use `{ext}` to auto-detect the best format for each source + **Path expansion:** - `~` expands to home directory - Environment variables work: `$HOME` (Unix) or `%USERPROFILE%` (Windows) @@ -157,15 +172,22 @@ grawlix --debug ## Metadata Writing -For supported sources (currently Storytel), grawlix can write rich metadata to EPUB files 
including: +For supported sources (Storytel and Nextory), grawlix can write rich metadata to EPUB files including: - Title and original title - Authors and translators - Series information (Calibre-compatible) - Publisher, ISBN, language -- Description and categories +- Description and categories/tags - Release date +### Supported Sources + +| Source | Title | Authors | Translators | Series | Publisher | ISBN | Language | Description | Release Date | +|--------|-------|---------|-------------|--------|-----------|------|----------|-------------|--------------| +| Storytel | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | +| Nextory | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | + Enable globally in config: ```toml write_metadata_to_epub = true diff --git a/grawlix/__main__.py b/grawlix/__main__.py index da49a32..5ad8b7e 100644 --- a/grawlix/__main__.py +++ b/grawlix/__main__.py @@ -165,18 +165,21 @@ async def download_with_progress(book: Book, progress: Progress, template: str, # Write metadata if requested and available if write_metadata and book.source_data: - from .output import format_output_location, get_default_format, find_output_format + from .output import format_output_location, get_default_format, find_output_format, get_valid_extensions from . 
import epub_metadata, epub_metadata_writers # Determine output file location _, ext = os.path.splitext(template) ext = ext[1:] - if ext: + + # Handle {ext} placeholder - use default format for the book type + if ext and ext not in ['{ext}', 'ext'] and ext in get_valid_extensions(): output_format = find_output_format(book, ext)() else: output_format = get_default_format(book) location = format_output_location(book, output_format, template) + logging.debug(f"Output location: {location}, exists={os.path.exists(location)}, ends_with_epub={location.endswith('.epub')}") # Write metadata if it's an EPUB file if location.endswith('.epub') and os.path.exists(location): diff --git a/grawlix/assets/errors/unsupported_output_format.txt b/grawlix/assets/errors/unsupported_output_format.txt new file mode 100644 index 0000000..7b6a192 --- /dev/null +++ b/grawlix/assets/errors/unsupported_output_format.txt @@ -0,0 +1,5 @@ +[red]ERROR: Unsupported output format[/red] + +The requested output format is not compatible with the downloaded content type. +Please check your output template or try using {{ext}} to auto-detect the correct format. 
+If this error persists, please create an issue at {issue} diff --git a/grawlix/book.py b/grawlix/book.py index 7c24369..981e5a7 100644 --- a/grawlix/book.py +++ b/grawlix/book.py @@ -15,6 +15,7 @@ class Metadata: identifier: Optional[str] = None description: Optional[str] = None release_date: Optional[date] = None + source: Optional[str] = None def as_dict(self) -> dict: return { @@ -27,6 +28,7 @@ class Metadata: "authors": "; ".join(self.authors), "description": self.description or "UNKNOWN", "release_date": self.release_date.isoformat() if self.release_date else "UNKNOWN", + "source": self.source or "UNKNOWN", } diff --git a/grawlix/epub_metadata_writers.py b/grawlix/epub_metadata_writers.py index fb202fb..5a8b063 100644 --- a/grawlix/epub_metadata_writers.py +++ b/grawlix/epub_metadata_writers.py @@ -54,9 +54,54 @@ def storytel_transformer(details: dict) -> dict: return metadata +def nextory_transformer(details: dict) -> dict: + """ + Transform Nextory book details JSON into standardized EPUB metadata format + + :param details: Nextory book details JSON + :return: Standardized metadata dict + """ + # Extract epub format + epub_format = None + for fmt in details.get("formats", []): + if fmt.get("type") == "epub": + epub_format = fmt + break + + metadata = { + "title": details.get("title"), + "authors": [author.get("name", "") for author in details.get("authors", [])], + "translators": [translator.get("name", "") for translator in (epub_format or {}).get("translators", [])], + "description": details.get("description_full"), + "language": details.get("language"), + } + + # Epub-specific metadata + if epub_format: + metadata["publisher"] = (epub_format.get("publisher") or {}).get("name") + metadata["isbn"] = epub_format.get("isbn") + + publication_date = epub_format.get("publication_date") + if publication_date: + # Already in YYYY-MM-DD format + metadata["release_date"] = publication_date + + # Series info + series_info = details.get("series") + if series_info: 
+ metadata["series_name"] = series_info.get("name") + # Nextory uses "volume" at top level, not in series info + volume = details.get("volume") + if volume: + metadata["series_index"] = volume + + return metadata + + # Registry of transformers by source name TRANSFORMERS = { "storytel": storytel_transformer, + "nextory": nextory_transformer, # Add more sources here as they're implemented } diff --git a/grawlix/output/epub.py b/grawlix/output/epub.py index a73a23e..333ebc4 100644 --- a/grawlix/output/epub.py +++ b/grawlix/output/epub.py @@ -135,4 +135,3 @@ class Epub(OutputFormat): output.add_item(epub.EpubNcx()) output.add_item(epub.EpubNav()) epub.write_epub(location, output) - exit() diff --git a/grawlix/sources/nextory.py b/grawlix/sources/nextory.py index b68761d..9f09861 100644 --- a/grawlix/sources/nextory.py +++ b/grawlix/sources/nextory.py @@ -1,11 +1,10 @@ -from grawlix.book import Book, Metadata, OnlineFile, BookData, OnlineFile, SingleFile, EpubInParts, Result, Series +from grawlix.book import Book, Metadata, OnlineFile, BookData, EpubInParts, Result, Series from grawlix.encryption import AESEncryption from grawlix.exceptions import InvalidUrl from .source import Source from typing import Optional import uuid -import rich import base64 LOCALE = "en_GB" @@ -17,12 +16,7 @@ class Nextory(Source): ] _authentication_methods = [ "login" ] - - @staticmethod - def _create_device_id() -> str: - """Create unique device id""" - return str(uuid.uuid3(uuid.NAMESPACE_DNS, "audiobook-dl")) - + # Authentication methods async def login(self, url: str, username: str, password: str) -> None: # Set permanent headers @@ -30,11 +24,13 @@ class Nextory(Source): self._client.headers.update( { "X-Application-Id": "200", - "X-App-Version": "5.4.1", + "X-App-Version": "5.47.0", "X-Locale": LOCALE, "X-Model": "Personal Computer", "X-Device-Id": device_id, - "X-Os-Info": "Android", + "X-OS-INFO": "Personal Computer", + "locale": LOCALE, + "device": device_id, "appid": "200", } ) @@ 
-47,7 +43,6 @@ class Nextory(Source): }, ) session_response = session_response.json() - rich.print(session_response) login_token = session_response["login_token"] country = session_response["country"] self._client.headers.update( @@ -62,7 +57,6 @@ class Nextory(Source): "https://api.nextory.com/user/v1/me/profiles", ) profiles_response = profiles_response.json() - rich.print(profiles_response) profile = profiles_response["profiles"][0] login_key = profile["login_key"] authorize_response = await self._client.post( @@ -72,19 +66,24 @@ class Nextory(Source): } ) authorize_response = authorize_response.json() - rich.print(authorize_response) profile_token = authorize_response["profile_token"] self._client.headers.update({"X-Profile-Token": profile_token}) - self._client.headers.update({"X-Profile-Token": profile_token}) @staticmethod - def _find_epub_id(product_data) -> str: - """Find id of book format of type epub for given book""" - for format in product_data["formats"]: - if format["type"] == "epub": - return format["identifier"] - raise InvalidUrl + def _create_device_id() -> str: + """Create unique device id""" + return str(uuid.uuid3(uuid.NAMESPACE_DNS, "audiobook-dl")) + + + # Main download methods + + async def download(self, url: str) -> Result: + url_id = self._extract_id_from_url(url) + if "serier" in url: + return await self._download_series(url_id) + else: + return await self._download_book(url_id) @staticmethod @@ -107,64 +106,15 @@ class Nextory(Source): return await self._download_book(url_id) + async def download_book_from_id(self, book_id: str) -> Book: return await self._download_book(book_id) - async def _download_series(self, series_id: str) -> Series: - """ - Download series from Nextory - - :param series_id: Id of series on Nextory - :returns: Series data - """ - response = await self._client.get( - f"https://api.nextory.com/discovery/v1/series/{series_id}/products", - params = { - "content_type": "book", - "page": 0, - "per": 100, - } - ) - 
series_data = response.json() - book_ids = [] - for book in series_data["products"]: - book_id = book["id"] - book_ids.append(book_id) - return Series( - title = series_data["products"][0]["series"]["name"], - book_ids = book_ids, - ) - - - @staticmethod - def _extract_series_name(product_info: dict) -> Optional[str]: - if not "series" in product_info: - return None - return product_info["series"]["name"] - - - async def _get_book_id_from_url_id(self, url_id: str) -> str: - """ - Download book id from url id - - :param url_id: Id of book from url - :return: Book id - """ - response = await self._client.get( - f"https://api.nextory.se/api/app/product/7.5/bookinfo", - params = { "id": url_id }, - ) - rich.print(response.url) - rich.print(response.content) - exit() - + # Book download path async def _download_book(self, book_id: str) -> Book: - product_data = await self._client.get( - f"https://api.nextory.com/library/v1/products/{book_id}" - ) - product_data = product_data.json() + product_data = await self._get_product_data(book_id) epub_id = self._find_epub_id(product_data) pages = await self._get_pages(epub_id) return Book( @@ -173,14 +123,41 @@ class Nextory(Source): title = product_data["title"], authors = [author["name"] for author in product_data["authors"]], series = self._extract_series_name(product_data), - ) + ), + source_data = { + "source_name": "nextory", + "details": product_data + } ) + async def _get_product_data(self, book_id: str) -> dict: + """ + Fetch product data from Nextory API + + :param book_id: Id of book (can be URL id or internal id) + :return: Product data dictionary + """ + response = await self._client.get( + f"https://api.nextory.com/library/v1/products/{book_id}", + ) + return response.json() + + @staticmethod - def _fix_key(value: str) -> bytes: - """Remove unused data and decode key""" - return base64.b64decode(value[:-1]) + def _find_epub_id(product_data) -> str: + """Find id of book format of type epub for given book""" + for 
format in product_data["formats"]: + if format["type"] == "epub": + return format["identifier"] + raise InvalidUrl + + + @staticmethod + def _extract_series_name(product_info: dict) -> Optional[str]: + if "series" not in product_info: + return None + return product_info["series"]["name"] async def _get_pages(self, epub_id: str) -> BookData: @@ -204,15 +181,14 @@ class Nextory(Source): key = self._fix_key(epub_data["crypt_key"]), iv = self._fix_key(epub_data["crypt_iv"]) ) - files = [] - for part in epub_data["spines"]: - files.append( - OnlineFile( - url = part["spine_url"], - extension = "epub", - encryption = encryption - ) + files = [ + OnlineFile( + url = part["spine_url"], + extension = "epub", + encryption = encryption ) + for part in epub_data["spines"] + ] files_in_toc = {} for item in epub_data["toc"]["childrens"]: # Why is it "childrens"? files_in_toc[item["src"]] = item["name"] @@ -220,3 +196,34 @@ class Nextory(Source): files, files_in_toc ) + + + @staticmethod + def _fix_key(value: str) -> bytes: + """Remove unused data and decode key""" + return base64.b64decode(value[:-1]) + + + # Series download path + + async def _download_series(self, series_id: str) -> Series: + """ + Download series from Nextory + + :param series_id: Id of series on Nextory + :returns: Series data + """ + response = await self._client.get( + f"https://api.nextory.com/discovery/v1/series/{series_id}/products", + params = { + "content_type": "book", + "page": 0, + "per": 100, + } + ) + series_data = response.json() + book_ids = [book["id"] for book in series_data["products"]] + return Series( + title = series_data["products"][0]["series"]["name"], + book_ids = book_ids, + ) diff --git a/grawlix/sources/storytel.py b/grawlix/sources/storytel.py index a4ab5bd..23af290 100644 --- a/grawlix/sources/storytel.py +++ b/grawlix/sources/storytel.py @@ -19,6 +19,57 @@ class Storytel(Source): _authentication_methods = [ "login" ] __download_counter = 0 + # Authentication methods + + async 
def login(self, username: str, password: str, **kwargs) -> None: + self.__username = username + self.__password = self.encrypt_password(password) + self._client.headers.update({"User-Agent": "Storytel/23.49 (Android 13; Pixel 6) Release/2288481"}) + await self.authenticate() + + + @staticmethod + def encrypt_password(password: str) -> str: + """ + Encrypt password with predefined keys. + This encrypted password is used for login. + + :param password: User defined password + :returns: Encrypted password + """ + # Thanks to https://github.com/javsanpar/storytel-tui + key = b"VQZBJ6TD8M9WBUWT" + iv = b"joiwef08u23j341a" + msg = pad(password.encode(), AES.block_size) + cipher = AES.new(key, AES.MODE_CBC, iv) + cipher_text = cipher.encrypt(msg) + return cipher_text.hex() + + + async def authenticate(self) -> None: + """Authenticate with storytel""" + response = await self._client.post( + f"https://www.storytel.com/api/login.action?m=1&token=guestsv&userid=-1&version=23.49&terminal=android&locale=sv&deviceId=995f2562-0e44-4410-b1b9-8d08261f33c4&kidsMode=false", + data = { + "uid": self.__username, + "pwd": self.__password + } + ) + if response.status_code != 200: + raise SourceNotAuthenticated + user_data = response.json() + jwt = user_data["accountInfo"]["jwt"] + self._client.headers.update({"authorization": f"Bearer {jwt}"}) + + + async def reauthenticate(self) -> None: + """Reauthenticate if required""" + if self.__download_counter > 0 and self.__download_counter % 10 == 0: + await self.authenticate() + + + # Main download methods + async def download(self, url: str) -> Result: await self.reauthenticate() @@ -36,6 +87,22 @@ class Storytel(Source): raise InvalidUrl + @staticmethod + def extract_id_from_url(url: str) -> str: + """ + Extract id from url + + :param url: Url containing id + :return: Id + """ + parsed = parse_url(url) + if parsed.path is None: + raise DataNotFound + return parsed.path.split("-")[-1] + + + # Book download path + async def 
download_book_from_id(self, book_id: str) -> Book: # Epub location response = await self._client.get( @@ -78,8 +145,6 @@ class Storytel(Source): :param details: Book details from Storytel API :return: Metadata object """ - from datetime import datetime - # Extract ebook-specific format data ebook_format = None for fmt in details.get("formats", []): @@ -119,10 +184,13 @@ class Storytel(Source): description=description, release_date=release_date, series=series, - index=index + index=index, + source="Storytel" ) + # List download path + async def download_list(self, url: str, list_type: str, language: str) -> Series: """ Download list of books @@ -157,23 +225,22 @@ class Storytel(Source): ) -> dict[str, Any]: """Download details about book list - :param formats: comma serapted list of formats (abook,ebook,podcast) - :param languages: comma seperated list of languages (en,de,tr,ar,ru,pl,it,es,sv,fr,nl) + :param formats: comma separated list of formats (abook,ebook,podcast) + :param languages: comma separated list of languages (en,de,tr,ar,ru,pl,it,es,sv,fr,nl) """ - nextPageToken = 0 + # API returns only 10 items per request, so we need to paginate + # Start with None to ensure we enter the loop and make the first request + result: dict[str, Any] = {"nextPageToken": None} + is_first_page = True - # API returns only 10 items per request - # if the nextPageToken - result: dict[str, Any] = {"nextPageToken": False} - - while result["nextPageToken"] is not None: + while result["nextPageToken"] is not None or is_first_page: params: dict[str, str] = { "includeListDetails": "true", # include listMetadata,filterOptions,sortOption sections "includeFormats": formats, "includeLanguages": languages, "kidsMode": "false", } - if result["nextPageToken"]: + if result.get("nextPageToken"): params["nextPageToken"] = result["nextPageToken"] response = await self._client.get( @@ -182,72 +249,12 @@ class Storytel(Source): ) data = response.json() - if result["nextPageToken"] == 0: + if 
is_first_page: result = data + is_first_page = False else: result["items"].extend(data["items"]) result["nextPageToken"] = data["nextPageToken"] logging.debug(f"{result=}") return result - - - @staticmethod - def extract_id_from_url(url: str) -> str: - """ - Extract id from url - - :param url: Url containing id - :return: Id - """ - parsed = parse_url(url) - if parsed.path is None: - raise DataNotFound - return parsed.path.split("-")[-1] - - - @staticmethod - def encrypt_password(password: str) -> str: - """ - Encrypt password with predefined keys. - This encrypted password is used for login. - - :param password: User defined password - :returns: Encrypted password - """ - # Thanks to https://github.com/javsanpar/storytel-tui - key = b"VQZBJ6TD8M9WBUWT" - iv = b"joiwef08u23j341a" - msg = pad(password.encode(), AES.block_size) - cipher = AES.new(key, AES.MODE_CBC, iv) - cipher_text = cipher.encrypt(msg) - return cipher_text.hex() - - - async def reauthenticate(self) -> None: - """Reauthenticate if required""" - if self.__download_counter > 0 and self.__download_counter % 10 == 0: - await self.authenticate() - - - async def authenticate(self) -> None: - """Authenticate with storytel""" - response = await self._client.post( - f"https://www.storytel.com/api/login.action?m=1&token=guestsv&userid=-1&version=23.49&terminal=android&locale=sv&deviceId=995f2562-0e44-4410-b1b9-8d08261f33c4&kidsMode=false", - data = { - "uid": self.__username, - "pwd": self.__password - } - ) - if response.status_code != 200: - raise SourceNotAuthenticated - user_data = response.json() - jwt = user_data["accountInfo"]["jwt"] - self._client.headers.update({"authorization": f"Bearer {jwt}"}) - - - async def login(self, username: str, password: str, **kwargs) -> None: - self.__username = username - self.__password = self.encrypt_password(password) - self._client.headers.update({"User-Agent": "Storytel/23.49 (Android 13; Pixel 6) Release/2288481"}) - await self.authenticate()