mirror of
https://github.com/jo1gi/grawlix.git
synced 2026-03-28 14:40:00 -06:00
feat: add metadata writing support for Nextory and refactor source files
- Fix Nextory SSL certificate error (api.nextory.se -> api.nextory.com) - Add missing X-OS-INFO header for Nextory API compatibility - Implement full metadata writing for Nextory (title, authors, translators, series, publisher, ISBN, language, description, release date) - Remove debug JSON output from Nextory authentication - Fix exit() call in epub.py that prevented metadata writing for EpubInParts Refactoring improvements: - Reorganize functions in nextory.py and storytel.py by execution flow - Add section comments for better code organization (Authentication, Main download, Book path, Series path) - Remove duplicate imports (OnlineFile in nextory.py, datetime in storytel.py) - Remove duplicate header update in storytel.py - Improve pagination logic in storytel.py with clearer variable names - Optimize API calls in nextory.py (eliminate redundant call) - Use list comprehensions for cleaner code Documentation: - Update README with Nextory metadata support - Add supported sources comparison table for metadata fields - Include Nextory configuration example
This commit is contained in:
parent
ea7ca71408
commit
4efb963168
38
README.md
38
README.md
@ -80,7 +80,7 @@ The following settings can be added to your config file (before any `[sources.*]
|
||||
|
||||
| Setting | Type | Description | Example |
|
||||
|---------|------|-------------|---------|
|
||||
| `write_metadata_to_epub` | boolean | Automatically write metadata to EPUB files (currently supports Storytel) | `true` or `false` |
|
||||
| `write_metadata_to_epub` | boolean | Automatically write metadata to EPUB files (supports Storytel and Nextory) | `true` or `false` |
|
||||
| `output` | string | Default output path template (supports `~`, environment variables, and template variables) | `"~/ebooks/{title}.{ext}"` |
|
||||
|
||||
### Output Templates
|
||||
@ -96,20 +96,35 @@ The `output` setting supports template variables that are replaced with book met
|
||||
| `{publisher}` | Publisher name | "Orbit" |
|
||||
| `{language}` | Language code | "en" |
|
||||
| `{release_date}` | Release date | "2020-01-15" |
|
||||
| `{ext}` | File extension | "epub" |
|
||||
| `{source}` | Source/service name | "Storytel", "Marvel", etc. |
|
||||
| `{ext}` | File extension (auto-detected from source) | "epub" |
|
||||
|
||||
**Example templates:**
|
||||
```toml
|
||||
# Simple
|
||||
# Simple (auto-detect format)
|
||||
output = "~/books/{title}.{ext}"
|
||||
|
||||
# Organized by series
|
||||
# Force EPUB format
|
||||
output = "~/books/{title}.epub"
|
||||
|
||||
# Organized by source
|
||||
output = "~/books/{source}/{title}.{ext}"
|
||||
|
||||
# Organized by series (auto-detect format)
|
||||
output = "~/books/{series}/{index} - {title}.{ext}"
|
||||
|
||||
# With author
|
||||
output = "~/books/{authors}/{series}/{title}.{ext}"
|
||||
# Force EPUB with series organization
|
||||
output = "~/books/{series}/{index} - {title}.epub"
|
||||
|
||||
# Organized by source and series
|
||||
output = "~/books/{source}/{series}/{index} - {title}.epub"
|
||||
```
|
||||
|
||||
**Note:** The file extension in your template determines the output format:
|
||||
- Use `.epub` to force EPUB output
|
||||
- Use `.cbz` to force CBZ (comic book) output
|
||||
- Use `{ext}` to auto-detect the best format for each source
|
||||
|
||||
**Path expansion:**
|
||||
- `~` expands to home directory
|
||||
- Environment variables work: `$HOME` (Unix) or `%USERPROFILE%` (Windows)
|
||||
@ -157,15 +172,22 @@ grawlix --debug <url>
|
||||
|
||||
## Metadata Writing
|
||||
|
||||
For supported sources (currently Storytel), grawlix can write rich metadata to EPUB files including:
|
||||
For supported sources (Storytel and Nextory), grawlix can write rich metadata to EPUB files including:
|
||||
|
||||
- Title and original title
|
||||
- Authors and translators
|
||||
- Series information (Calibre-compatible)
|
||||
- Publisher, ISBN, language
|
||||
- Description and categories
|
||||
- Description and categories/tags
|
||||
- Release date
|
||||
|
||||
### Supported Sources
|
||||
|
||||
| Source | Title | Authors | Translators | Series | Publisher | ISBN | Language | Description | Release Date |
|
||||
|--------|-------|---------|-------------|--------|-----------|------|----------|-------------|--------------|
|
||||
| Storytel | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
|
||||
| Nextory | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
|
||||
|
||||
Enable globally in config:
|
||||
```toml
|
||||
write_metadata_to_epub = true
|
||||
|
||||
@ -165,18 +165,21 @@ async def download_with_progress(book: Book, progress: Progress, template: str,
|
||||
|
||||
# Write metadata if requested and available
|
||||
if write_metadata and book.source_data:
|
||||
from .output import format_output_location, get_default_format, find_output_format
|
||||
from .output import format_output_location, get_default_format, find_output_format, get_valid_extensions
|
||||
from . import epub_metadata, epub_metadata_writers
|
||||
|
||||
# Determine output file location
|
||||
_, ext = os.path.splitext(template)
|
||||
ext = ext[1:]
|
||||
if ext:
|
||||
|
||||
# Handle {ext} placeholder - use default format for the book type
|
||||
if ext and ext not in ['{ext}', 'ext'] and ext in get_valid_extensions():
|
||||
output_format = find_output_format(book, ext)()
|
||||
else:
|
||||
output_format = get_default_format(book)
|
||||
|
||||
location = format_output_location(book, output_format, template)
|
||||
logging.debug(f"Output location: {location}, exists={os.path.exists(location)}, ends_with_epub={location.endswith('.epub')}")
|
||||
|
||||
# Write metadata if it's an EPUB file
|
||||
if location.endswith('.epub') and os.path.exists(location):
|
||||
|
||||
5
grawlix/assets/errors/unsupported_output_format.txt
Normal file
5
grawlix/assets/errors/unsupported_output_format.txt
Normal file
@ -0,0 +1,5 @@
|
||||
[red]ERROR: Unsupported output format[/red]
|
||||
|
||||
The requested output format is not compatible with the downloaded content type.
|
||||
Please check your output template or try using {{ext}} to auto-detect the correct format.
|
||||
If this error persists, please create an issue at {issue}
|
||||
@ -15,6 +15,7 @@ class Metadata:
|
||||
identifier: Optional[str] = None
|
||||
description: Optional[str] = None
|
||||
release_date: Optional[date] = None
|
||||
source: Optional[str] = None
|
||||
|
||||
def as_dict(self) -> dict:
|
||||
return {
|
||||
@ -27,6 +28,7 @@ class Metadata:
|
||||
"authors": "; ".join(self.authors),
|
||||
"description": self.description or "UNKNOWN",
|
||||
"release_date": self.release_date.isoformat() if self.release_date else "UNKNOWN",
|
||||
"source": self.source or "UNKNOWN",
|
||||
}
|
||||
|
||||
|
||||
|
||||
@ -54,9 +54,54 @@ def storytel_transformer(details: dict) -> dict:
|
||||
return metadata
|
||||
|
||||
|
||||
def nextory_transformer(details: dict) -> dict:
|
||||
"""
|
||||
Transform Nextory book details JSON into standardized EPUB metadata format
|
||||
|
||||
:param details: Nextory book details JSON
|
||||
:return: Standardized metadata dict
|
||||
"""
|
||||
# Extract epub format
|
||||
epub_format = None
|
||||
for fmt in details.get("formats", []):
|
||||
if fmt.get("type") == "epub":
|
||||
epub_format = fmt
|
||||
break
|
||||
|
||||
metadata = {
|
||||
"title": details.get("title"),
|
||||
"authors": [author.get("name", "") for author in details.get("authors", [])],
|
||||
"translators": [translator.get("name", "") for translator in epub_format.get("translators", []) if epub_format],
|
||||
"description": details.get("description_full"),
|
||||
"language": details.get("language"),
|
||||
}
|
||||
|
||||
# Epub-specific metadata
|
||||
if epub_format:
|
||||
metadata["publisher"] = epub_format.get("publisher", {}).get("name")
|
||||
metadata["isbn"] = epub_format.get("isbn")
|
||||
|
||||
publication_date = epub_format.get("publication_date")
|
||||
if publication_date:
|
||||
# Already in YYYY-MM-DD format
|
||||
metadata["release_date"] = publication_date
|
||||
|
||||
# Series info
|
||||
series_info = details.get("series")
|
||||
if series_info:
|
||||
metadata["series_name"] = series_info.get("name")
|
||||
# Nextory uses "volume" at top level, not in series info
|
||||
volume = details.get("volume")
|
||||
if volume:
|
||||
metadata["series_index"] = volume
|
||||
|
||||
return metadata
|
||||
|
||||
|
||||
# Registry of transformers by source name
|
||||
TRANSFORMERS = {
|
||||
"storytel": storytel_transformer,
|
||||
"nextory": nextory_transformer,
|
||||
# Add more sources here as they're implemented
|
||||
}
|
||||
|
||||
|
||||
@ -135,4 +135,3 @@ class Epub(OutputFormat):
|
||||
output.add_item(epub.EpubNcx())
|
||||
output.add_item(epub.EpubNav())
|
||||
epub.write_epub(location, output)
|
||||
exit()
|
||||
|
||||
@ -1,11 +1,10 @@
|
||||
from grawlix.book import Book, Metadata, OnlineFile, BookData, OnlineFile, SingleFile, EpubInParts, Result, Series
|
||||
from grawlix.book import Book, Metadata, OnlineFile, BookData, EpubInParts, Result, Series
|
||||
from grawlix.encryption import AESEncryption
|
||||
from grawlix.exceptions import InvalidUrl
|
||||
from .source import Source
|
||||
|
||||
from typing import Optional
|
||||
import uuid
|
||||
import rich
|
||||
import base64
|
||||
|
||||
LOCALE = "en_GB"
|
||||
@ -17,12 +16,7 @@ class Nextory(Source):
|
||||
]
|
||||
_authentication_methods = [ "login" ]
|
||||
|
||||
|
||||
@staticmethod
|
||||
def _create_device_id() -> str:
|
||||
"""Create unique device id"""
|
||||
return str(uuid.uuid3(uuid.NAMESPACE_DNS, "audiobook-dl"))
|
||||
|
||||
# Authentication methods
|
||||
|
||||
async def login(self, url: str, username: str, password: str) -> None:
|
||||
# Set permanent headers
|
||||
@ -30,11 +24,13 @@ class Nextory(Source):
|
||||
self._client.headers.update(
|
||||
{
|
||||
"X-Application-Id": "200",
|
||||
"X-App-Version": "5.4.1",
|
||||
"X-App-Version": "5.47.0",
|
||||
"X-Locale": LOCALE,
|
||||
"X-Model": "Personal Computer",
|
||||
"X-Device-Id": device_id,
|
||||
"X-Os-Info": "Android",
|
||||
"X-OS-INFO": "Personal Computer",
|
||||
"locale": LOCALE,
|
||||
"device": device_id,
|
||||
"appid": "200",
|
||||
}
|
||||
)
|
||||
@ -47,7 +43,6 @@ class Nextory(Source):
|
||||
},
|
||||
)
|
||||
session_response = session_response.json()
|
||||
rich.print(session_response)
|
||||
login_token = session_response["login_token"]
|
||||
country = session_response["country"]
|
||||
self._client.headers.update(
|
||||
@ -62,7 +57,6 @@ class Nextory(Source):
|
||||
"https://api.nextory.com/user/v1/me/profiles",
|
||||
)
|
||||
profiles_response = profiles_response.json()
|
||||
rich.print(profiles_response)
|
||||
profile = profiles_response["profiles"][0]
|
||||
login_key = profile["login_key"]
|
||||
authorize_response = await self._client.post(
|
||||
@ -72,19 +66,24 @@ class Nextory(Source):
|
||||
}
|
||||
)
|
||||
authorize_response = authorize_response.json()
|
||||
rich.print(authorize_response)
|
||||
profile_token = authorize_response["profile_token"]
|
||||
self._client.headers.update({"X-Profile-Token": profile_token})
|
||||
self._client.headers.update({"X-Profile-Token": profile_token})
|
||||
|
||||
|
||||
@staticmethod
|
||||
def _find_epub_id(product_data) -> str:
|
||||
"""Find id of book format of type epub for given book"""
|
||||
for format in product_data["formats"]:
|
||||
if format["type"] == "epub":
|
||||
return format["identifier"]
|
||||
raise InvalidUrl
|
||||
def _create_device_id() -> str:
|
||||
"""Create unique device id"""
|
||||
return str(uuid.uuid3(uuid.NAMESPACE_DNS, "audiobook-dl"))
|
||||
|
||||
|
||||
# Main download methods
|
||||
|
||||
async def download(self, url: str) -> Result:
|
||||
url_id = self._extract_id_from_url(url)
|
||||
if "serier" in url:
|
||||
return await self._download_series(url_id)
|
||||
else:
|
||||
return await self._download_book(url_id)
|
||||
|
||||
|
||||
@staticmethod
|
||||
@ -107,64 +106,15 @@ class Nextory(Source):
|
||||
return await self._download_book(url_id)
|
||||
|
||||
|
||||
|
||||
async def download_book_from_id(self, book_id: str) -> Book:
|
||||
return await self._download_book(book_id)
|
||||
|
||||
|
||||
async def _download_series(self, series_id: str) -> Series:
|
||||
"""
|
||||
Download series from Nextory
|
||||
|
||||
:param series_id: Id of series on Nextory
|
||||
:returns: Series data
|
||||
"""
|
||||
response = await self._client.get(
|
||||
f"https://api.nextory.com/discovery/v1/series/{series_id}/products",
|
||||
params = {
|
||||
"content_type": "book",
|
||||
"page": 0,
|
||||
"per": 100,
|
||||
}
|
||||
)
|
||||
series_data = response.json()
|
||||
book_ids = []
|
||||
for book in series_data["products"]:
|
||||
book_id = book["id"]
|
||||
book_ids.append(book_id)
|
||||
return Series(
|
||||
title = series_data["products"][0]["series"]["name"],
|
||||
book_ids = book_ids,
|
||||
)
|
||||
|
||||
|
||||
@staticmethod
|
||||
def _extract_series_name(product_info: dict) -> Optional[str]:
|
||||
if not "series" in product_info:
|
||||
return None
|
||||
return product_info["series"]["name"]
|
||||
|
||||
|
||||
async def _get_book_id_from_url_id(self, url_id: str) -> str:
|
||||
"""
|
||||
Download book id from url id
|
||||
|
||||
:param url_id: Id of book from url
|
||||
:return: Book id
|
||||
"""
|
||||
response = await self._client.get(
|
||||
f"https://api.nextory.se/api/app/product/7.5/bookinfo",
|
||||
params = { "id": url_id },
|
||||
)
|
||||
rich.print(response.url)
|
||||
rich.print(response.content)
|
||||
exit()
|
||||
|
||||
# Book download path
|
||||
|
||||
async def _download_book(self, book_id: str) -> Book:
|
||||
product_data = await self._client.get(
|
||||
f"https://api.nextory.com/library/v1/products/{book_id}"
|
||||
)
|
||||
product_data = product_data.json()
|
||||
product_data = await self._get_product_data(book_id)
|
||||
epub_id = self._find_epub_id(product_data)
|
||||
pages = await self._get_pages(epub_id)
|
||||
return Book(
|
||||
@ -173,14 +123,41 @@ class Nextory(Source):
|
||||
title = product_data["title"],
|
||||
authors = [author["name"] for author in product_data["authors"]],
|
||||
series = self._extract_series_name(product_data),
|
||||
)
|
||||
),
|
||||
source_data = {
|
||||
"source_name": "nextory",
|
||||
"details": product_data
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
async def _get_product_data(self, book_id: str) -> dict:
|
||||
"""
|
||||
Fetch product data from Nextory API
|
||||
|
||||
:param book_id: Id of book (can be URL id or internal id)
|
||||
:return: Product data dictionary
|
||||
"""
|
||||
response = await self._client.get(
|
||||
f"https://api.nextory.com/library/v1/products/{book_id}",
|
||||
)
|
||||
return response.json()
|
||||
|
||||
|
||||
@staticmethod
|
||||
def _fix_key(value: str) -> bytes:
|
||||
"""Remove unused data and decode key"""
|
||||
return base64.b64decode(value[:-1])
|
||||
def _find_epub_id(product_data) -> str:
|
||||
"""Find id of book format of type epub for given book"""
|
||||
for format in product_data["formats"]:
|
||||
if format["type"] == "epub":
|
||||
return format["identifier"]
|
||||
raise InvalidUrl
|
||||
|
||||
|
||||
@staticmethod
|
||||
def _extract_series_name(product_info: dict) -> Optional[str]:
|
||||
if "series" not in product_info:
|
||||
return None
|
||||
return product_info["series"]["name"]
|
||||
|
||||
|
||||
async def _get_pages(self, epub_id: str) -> BookData:
|
||||
@ -204,15 +181,14 @@ class Nextory(Source):
|
||||
key = self._fix_key(epub_data["crypt_key"]),
|
||||
iv = self._fix_key(epub_data["crypt_iv"])
|
||||
)
|
||||
files = []
|
||||
for part in epub_data["spines"]:
|
||||
files.append(
|
||||
OnlineFile(
|
||||
url = part["spine_url"],
|
||||
extension = "epub",
|
||||
encryption = encryption
|
||||
)
|
||||
files = [
|
||||
OnlineFile(
|
||||
url = part["spine_url"],
|
||||
extension = "epub",
|
||||
encryption = encryption
|
||||
)
|
||||
for part in epub_data["spines"]
|
||||
]
|
||||
files_in_toc = {}
|
||||
for item in epub_data["toc"]["childrens"]: # Why is it "childrens"?
|
||||
files_in_toc[item["src"]] = item["name"]
|
||||
@ -220,3 +196,34 @@ class Nextory(Source):
|
||||
files,
|
||||
files_in_toc
|
||||
)
|
||||
|
||||
|
||||
@staticmethod
|
||||
def _fix_key(value: str) -> bytes:
|
||||
"""Remove unused data and decode key"""
|
||||
return base64.b64decode(value[:-1])
|
||||
|
||||
|
||||
# Series download path
|
||||
|
||||
async def _download_series(self, series_id: str) -> Series:
|
||||
"""
|
||||
Download series from Nextory
|
||||
|
||||
:param series_id: Id of series on Nextory
|
||||
:returns: Series data
|
||||
"""
|
||||
response = await self._client.get(
|
||||
f"https://api.nextory.com/discovery/v1/series/{series_id}/products",
|
||||
params = {
|
||||
"content_type": "book",
|
||||
"page": 0,
|
||||
"per": 100,
|
||||
}
|
||||
)
|
||||
series_data = response.json()
|
||||
book_ids = [book["id"] for book in series_data["products"]]
|
||||
return Series(
|
||||
title = series_data["products"][0]["series"]["name"],
|
||||
book_ids = book_ids,
|
||||
)
|
||||
|
||||
@ -19,6 +19,57 @@ class Storytel(Source):
|
||||
_authentication_methods = [ "login" ]
|
||||
__download_counter = 0
|
||||
|
||||
# Authentication methods
|
||||
|
||||
async def login(self, username: str, password: str, **kwargs) -> None:
|
||||
self.__username = username
|
||||
self.__password = self.encrypt_password(password)
|
||||
self._client.headers.update({"User-Agent": "Storytel/23.49 (Android 13; Pixel 6) Release/2288481"})
|
||||
await self.authenticate()
|
||||
|
||||
|
||||
@staticmethod
|
||||
def encrypt_password(password: str) -> str:
|
||||
"""
|
||||
Encrypt password with predefined keys.
|
||||
This encrypted password is used for login.
|
||||
|
||||
:param password: User defined password
|
||||
:returns: Encrypted password
|
||||
"""
|
||||
# Thanks to https://github.com/javsanpar/storytel-tui
|
||||
key = b"VQZBJ6TD8M9WBUWT"
|
||||
iv = b"joiwef08u23j341a"
|
||||
msg = pad(password.encode(), AES.block_size)
|
||||
cipher = AES.new(key, AES.MODE_CBC, iv)
|
||||
cipher_text = cipher.encrypt(msg)
|
||||
return cipher_text.hex()
|
||||
|
||||
|
||||
async def authenticate(self) -> None:
|
||||
"""Authenticate with storytel"""
|
||||
response = await self._client.post(
|
||||
f"https://www.storytel.com/api/login.action?m=1&token=guestsv&userid=-1&version=23.49&terminal=android&locale=sv&deviceId=995f2562-0e44-4410-b1b9-8d08261f33c4&kidsMode=false",
|
||||
data = {
|
||||
"uid": self.__username,
|
||||
"pwd": self.__password
|
||||
}
|
||||
)
|
||||
if response.status_code != 200:
|
||||
raise SourceNotAuthenticated
|
||||
user_data = response.json()
|
||||
jwt = user_data["accountInfo"]["jwt"]
|
||||
self._client.headers.update({"authorization": f"Bearer {jwt}"})
|
||||
|
||||
|
||||
async def reauthenticate(self) -> None:
|
||||
"""Reauthenticate if required"""
|
||||
if self.__download_counter > 0 and self.__download_counter % 10 == 0:
|
||||
await self.authenticate()
|
||||
|
||||
|
||||
# Main download methods
|
||||
|
||||
async def download(self, url: str) -> Result:
|
||||
await self.reauthenticate()
|
||||
|
||||
@ -36,6 +87,22 @@ class Storytel(Source):
|
||||
raise InvalidUrl
|
||||
|
||||
|
||||
@staticmethod
|
||||
def extract_id_from_url(url: str) -> str:
|
||||
"""
|
||||
Extract id from url
|
||||
|
||||
:param url: Url containing id
|
||||
:return: Id
|
||||
"""
|
||||
parsed = parse_url(url)
|
||||
if parsed.path is None:
|
||||
raise DataNotFound
|
||||
return parsed.path.split("-")[-1]
|
||||
|
||||
|
||||
# Book download path
|
||||
|
||||
async def download_book_from_id(self, book_id: str) -> Book:
|
||||
# Epub location
|
||||
response = await self._client.get(
|
||||
@ -78,8 +145,6 @@ class Storytel(Source):
|
||||
:param details: Book details from Storytel API
|
||||
:return: Metadata object
|
||||
"""
|
||||
from datetime import datetime
|
||||
|
||||
# Extract ebook-specific format data
|
||||
ebook_format = None
|
||||
for fmt in details.get("formats", []):
|
||||
@ -119,10 +184,13 @@ class Storytel(Source):
|
||||
description=description,
|
||||
release_date=release_date,
|
||||
series=series,
|
||||
index=index
|
||||
index=index,
|
||||
source="Storytel"
|
||||
)
|
||||
|
||||
|
||||
# List download path
|
||||
|
||||
async def download_list(self, url: str, list_type: str, language: str) -> Series:
|
||||
"""
|
||||
Download list of books
|
||||
@ -157,23 +225,22 @@ class Storytel(Source):
|
||||
) -> dict[str, Any]:
|
||||
"""Download details about book list
|
||||
|
||||
:param formats: comma serapted list of formats (abook,ebook,podcast)
|
||||
:param languages: comma seperated list of languages (en,de,tr,ar,ru,pl,it,es,sv,fr,nl)
|
||||
:param formats: comma separated list of formats (abook,ebook,podcast)
|
||||
:param languages: comma separated list of languages (en,de,tr,ar,ru,pl,it,es,sv,fr,nl)
|
||||
"""
|
||||
nextPageToken = 0
|
||||
# API returns only 10 items per request, so we need to paginate
|
||||
# Start with None to ensure we enter the loop and make the first request
|
||||
result: dict[str, Any] = {"nextPageToken": None}
|
||||
is_first_page = True
|
||||
|
||||
# API returns only 10 items per request
|
||||
# if the nextPageToken
|
||||
result: dict[str, Any] = {"nextPageToken": False}
|
||||
|
||||
while result["nextPageToken"] is not None:
|
||||
while result["nextPageToken"] is not None or is_first_page:
|
||||
params: dict[str, str] = {
|
||||
"includeListDetails": "true", # include listMetadata,filterOptions,sortOption sections
|
||||
"includeFormats": formats,
|
||||
"includeLanguages": languages,
|
||||
"kidsMode": "false",
|
||||
}
|
||||
if result["nextPageToken"]:
|
||||
if result.get("nextPageToken"):
|
||||
params["nextPageToken"] = result["nextPageToken"]
|
||||
|
||||
response = await self._client.get(
|
||||
@ -182,72 +249,12 @@ class Storytel(Source):
|
||||
)
|
||||
|
||||
data = response.json()
|
||||
if result["nextPageToken"] == 0:
|
||||
if is_first_page:
|
||||
result = data
|
||||
is_first_page = False
|
||||
else:
|
||||
result["items"].extend(data["items"])
|
||||
result["nextPageToken"] = data["nextPageToken"]
|
||||
logging.debug(f"{result=}")
|
||||
|
||||
return result
|
||||
|
||||
|
||||
@staticmethod
|
||||
def extract_id_from_url(url: str) -> str:
|
||||
"""
|
||||
Extract id from url
|
||||
|
||||
:param url: Url containing id
|
||||
:return: Id
|
||||
"""
|
||||
parsed = parse_url(url)
|
||||
if parsed.path is None:
|
||||
raise DataNotFound
|
||||
return parsed.path.split("-")[-1]
|
||||
|
||||
|
||||
@staticmethod
|
||||
def encrypt_password(password: str) -> str:
|
||||
"""
|
||||
Encrypt password with predefined keys.
|
||||
This encrypted password is used for login.
|
||||
|
||||
:param password: User defined password
|
||||
:returns: Encrypted password
|
||||
"""
|
||||
# Thanks to https://github.com/javsanpar/storytel-tui
|
||||
key = b"VQZBJ6TD8M9WBUWT"
|
||||
iv = b"joiwef08u23j341a"
|
||||
msg = pad(password.encode(), AES.block_size)
|
||||
cipher = AES.new(key, AES.MODE_CBC, iv)
|
||||
cipher_text = cipher.encrypt(msg)
|
||||
return cipher_text.hex()
|
||||
|
||||
|
||||
async def reauthenticate(self) -> None:
|
||||
"""Reauthenticate if required"""
|
||||
if self.__download_counter > 0 and self.__download_counter % 10 == 0:
|
||||
await self.authenticate()
|
||||
|
||||
|
||||
async def authenticate(self) -> None:
|
||||
"""Authenticate with storytel"""
|
||||
response = await self._client.post(
|
||||
f"https://www.storytel.com/api/login.action?m=1&token=guestsv&userid=-1&version=23.49&terminal=android&locale=sv&deviceId=995f2562-0e44-4410-b1b9-8d08261f33c4&kidsMode=false",
|
||||
data = {
|
||||
"uid": self.__username,
|
||||
"pwd": self.__password
|
||||
}
|
||||
)
|
||||
if response.status_code != 200:
|
||||
raise SourceNotAuthenticated
|
||||
user_data = response.json()
|
||||
jwt = user_data["accountInfo"]["jwt"]
|
||||
self._client.headers.update({"authorization": f"Bearer {jwt}"})
|
||||
|
||||
|
||||
async def login(self, username: str, password: str, **kwargs) -> None:
|
||||
self.__username = username
|
||||
self.__password = self.encrypt_password(password)
|
||||
self._client.headers.update({"User-Agent": "Storytel/23.49 (Android 13; Pixel 6) Release/2288481"})
|
||||
await self.authenticate()
|
||||
|
||||
Loading…
Reference in New Issue
Block a user