Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1import hashlib 

2import json 

3import logging 

4import math 

5import multiprocessing 

6import os 

7import pickle 

8import random 

9import string 

10import tempfile 

11from datetime import datetime, timedelta 

12from pathlib import Path 

13from time import sleep 

14from typing import ( 

15 Any, 

16 cast, 

17 Dict, 

18 Iterable, 

19 List, 

20 Optional, 

21 Sequence, 

22 Set, 

23 Tuple, 

24 Union, 

25) 

26from urllib.parse import urlencode, urlparse 

27 

28import requests 

29import semver 

30from gi.repository import Gtk 

31 

32from sublime_music.util import resolve_path 

33 

34from .api_objects import Directory, Response 

35from .. import ( 

36 Adapter, 

37 AlbumSearchQuery, 

38 api_objects as API, 

39 ConfigParamDescriptor, 

40 ConfigurationStore, 

41 ConfigureServerForm, 

42 UIInfo, 

43) 

44 

# NetworkManager bindings are optional. They are only needed for the "Local Network
# SSID" feature, so record whether the import worked instead of failing at import
# time when the bindings are unavailable.
try:
    import gi

    gi.require_version("NM", "1.0")
    from gi.repository import NM

    networkmanager_imported = True
except Exception:
    # I really don't care what kind of exception it is, all that matters is the
    # import failed for some reason.
    networkmanager_imported = False

56 

# Debugging aid: when the REQUEST_DELAY environment variable is set, every request is
# preceded by a random pause drawn from this pair of bounds. The variable is either a
# single number of seconds, or two comma-separated numbers.
REQUEST_DELAY: Optional[Tuple[float, float]] = None
delay_str = os.environ.get("REQUEST_DELAY")
if delay_str:
    if "," not in delay_str:
        REQUEST_DELAY = (float(delay_str), float(delay_str))
    else:
        high, low = (float(bound) for bound in delay_str.split(","))
        REQUEST_DELAY = (high, low)

# Debugging aid: any non-empty NETWORK_ALWAYS_ERROR environment variable makes every
# request fail.
always_error = os.environ.get("NETWORK_ALWAYS_ERROR")
NETWORK_ALWAYS_ERROR: bool = bool(always_error)

68 

69 

class ServerError(Exception):
    """
    Raised when the server reports a failure.

    :param status_code: the numeric code describing the failure; kept on the
        exception as ``status_code``.
    :param message: the human readable description; becomes ``str(exception)``.
    """

    def __init__(self, status_code: int, message: str):
        super().__init__(message)
        self.status_code = status_code

74 

75 

76class SubsonicAdapter(Adapter): 

77 """ 

78 Defines an adapter which retrieves its data from a Subsonic server 

79 """ 

80 

81 # Configuration and Initialization Properties 

82 # ================================================================================== 

83 @staticmethod 

84 def get_ui_info() -> UIInfo: 

85 return UIInfo( 

86 name="Subsonic", 

87 description="Connect to a Subsonic-compatible server", 

88 icon_basename="subsonic", 

89 icon_dir=resolve_path("adapters/subsonic/icons"), 

90 ) 

91 

92 @staticmethod 

93 def get_configuration_form(config_store: ConfigurationStore) -> Gtk.Box: 

94 configs = { 

95 "server_address": ConfigParamDescriptor(str, "Server Address"), 

96 "username": ConfigParamDescriptor(str, "Username"), 

97 "password": ConfigParamDescriptor("password", "Password"), 

98 "verify_cert": ConfigParamDescriptor( 

99 bool, 

100 "Verify Certificate", 

101 default=True, 

102 advanced=True, 

103 helptext="Whether or not to verify the SSL certificate of the server.", 

104 ), 

105 "sync_enabled": ConfigParamDescriptor( 

106 bool, 

107 "Sync Play Queue", 

108 default=True, 

109 advanced=True, 

110 helptext="If toggled, Sublime Music will periodically save the play " 

111 "queue state so that you can resume on other devices.", 

112 ), 

113 "salt_auth": ConfigParamDescriptor( 

114 bool, 

115 "Use Salt Authentication", 

116 default=True, 

117 advanced=True, 

118 helptext="If toggled, Sublime Music will use salted hash tokens " 

119 "instead of the plain password in the request urls (only supported on " 

120 "Subsonic API 1.13.0+)", 

121 ), 

122 } 

123 

124 if networkmanager_imported: 

125 configs.update( 

126 { 

127 "local_network_ssid": ConfigParamDescriptor( 

128 str, 

129 "Local Network SSID", 

130 advanced=True, 

131 required=False, 

132 helptext="If Sublime Music is connected to the given SSID, the " 

133 "Local Network Address will be used instead of the Server " 

134 "address when making network requests.", 

135 ), 

136 "local_network_address": ConfigParamDescriptor( 

137 str, 

138 "Local Network Address", 

139 advanced=True, 

140 required=False, 

141 helptext="If Sublime Music is connected to the given Local " 

142 "Network SSID, this URL will be used instead of the Server " 

143 "address when making network requests.", 

144 ), 

145 } 

146 ) 

147 

148 def verify_configuration() -> Dict[str, Optional[str]]: 

149 errors: Dict[str, Optional[str]] = {} 

150 

151 with tempfile.TemporaryDirectory() as tmp_dir_name: 

152 try: 

153 tmp_adapter = SubsonicAdapter(config_store, Path(tmp_dir_name)) 

154 tmp_adapter._get_json( 

155 tmp_adapter._make_url("ping"), 

156 timeout=2, 

157 is_exponential_backoff_ping=True, 

158 ) 

159 except requests.exceptions.SSLError: 

160 errors["__ping__"] = ( 

161 "<b>Error connecting to the server.</b>\n" 

162 "An SSL error occurred while connecting to the server.\n" 

163 "You may need to explicitly specify http://." 

164 ) 

165 except requests.ConnectionError: 

166 errors["__ping__"] = ( 

167 "<b>Unable to connect to the server.</b>\n" 

168 "Double check the server address." 

169 ) 

170 except ServerError as e: 

171 if e.status_code in (10, 40, 41) and config_store["salt_auth"]: 

172 # status code 10: if salt auth is not enabled, server will 

173 # return error server error with status_code 10 since it'll 

174 # interpret it as a missing (password) parameter 

175 # status code 41: returned by ampache 

176 # status code 41: as per subsonic api docs, description of 

177 # status_code 41 is "Token authentication not supported for 

178 # LDAP users." so fall back to password auth 

179 try: 

180 config_store["salt_auth"] = False 

181 tmp_adapter = SubsonicAdapter( 

182 config_store, Path(tmp_dir_name) 

183 ) 

184 tmp_adapter._get_json( 

185 tmp_adapter._make_url("ping"), 

186 timeout=2, 

187 is_exponential_backoff_ping=True, 

188 ) 

189 logging.warn( 

190 "Salted auth not supported, falling back to regular " 

191 "password auth" 

192 ) 

193 except ServerError as retry_e: 

194 config_store["salt_auth"] = True 

195 errors["__ping__"] = ( 

196 "<b>Error connecting to the server.</b>\n" 

197 f"Error {retry_e.status_code}: {str(retry_e)}" 

198 ) 

199 else: 

200 errors["__ping__"] = ( 

201 "<b>Error connecting to the server.</b>\n" 

202 f"Error {e.status_code}: {str(e)}" 

203 ) 

204 except Exception as e: 

205 errors["__ping__"] = str(e) 

206 

207 return errors 

208 

209 return ConfigureServerForm(config_store, configs, verify_configuration) 

210 

211 @staticmethod 

212 def migrate_configuration(config_store: ConfigurationStore): 

213 if "salt_auth" not in config_store: 

214 config_store["salt_auth"] = True 

215 

216 def __init__(self, config: ConfigurationStore, data_directory: Path): 

217 self.data_directory = data_directory 

218 self.ignored_articles_cache_file = self.data_directory.joinpath( 

219 "ignored_articles.pickle" 

220 ) 

221 

222 self.hostname = config["server_address"] 

223 if ( 

224 (ssid := config.get("local_network_ssid")) 

225 and (lan_address := config.get("local_network_address")) 

226 and networkmanager_imported 

227 ): 

228 networkmanager_client = NM.Client.new() 

229 

230 # Only look at the active WiFi connections. 

231 for ac in networkmanager_client.get_active_connections(): 

232 if ac.get_connection_type() != "802-11-wireless": 

233 continue 

234 devs = ac.get_devices() 

235 if len(devs) != 1: 

236 continue 

237 if devs[0].get_device_type() != NM.DeviceType.WIFI: 

238 continue 

239 

240 # If connected to the Local Network SSID, then change the hostname to 

241 # the Local Network Address. 

242 if ssid == ac.get_id(): 

243 self.hostname = lan_address 

244 break 

245 

246 parsed_hostname = urlparse(self.hostname) 

247 if not parsed_hostname.scheme: 

248 self.hostname = "https://" + self.hostname 

249 

250 self.username = config["username"] 

251 self.password = cast(str, config.get_secret("password")) 

252 self.verify_cert = config["verify_cert"] 

253 self.use_salt_auth = config["salt_auth"] 

254 

255 self.is_shutting_down = False 

256 self._ping_process: Optional[multiprocessing.Process] = None 

257 self._version = multiprocessing.Array("c", 20) 

258 self._offline_mode = False 

259 

260 # TODO (#112): support XML? 

261 

262 def initial_sync(self): 

263 # Try to ping the server five times using exponential backoff (2^5 = 32s). 

264 self._exponential_backoff(5) 

265 

266 def shutdown(self): 

267 if self._ping_process: 

268 self._ping_process.terminate() 

269 

    # Availability Properties
    # ==================================================================================
    # Both values are defined on the class, so every SubsonicAdapter instance shares
    # them. They are multiprocessing shared values, presumably so that the ping
    # process started by _exponential_backoff can report back to the parent process
    # -- TODO confirm.
    #
    # Set to True when a request succeeds and to False when one fails (see _get).
    _server_available = multiprocessing.Value("b", False)
    # Timestamp (seconds since the epoch) of the last request that succeeded or failed.
    _last_ping_timestamp = multiprocessing.Value("d", 0.0)

274 

275 def _exponential_backoff(self, n: int): 

276 logging.info(f"Starting Exponential Backoff: n={n}") 

277 if self._ping_process: 

278 self._ping_process.terminate() 

279 

280 self._ping_process = multiprocessing.Process( 

281 target=self._check_ping_thread, args=(n,) 

282 ) 

283 self._ping_process.start() 

284 

285 def _check_ping_thread(self, n: int): 

286 i = 0 

287 while i < n and not self._offline_mode and not self._server_available.value: 

288 try: 

289 self._set_ping_status(timeout=2 * (i + 1)) 

290 except Exception: 

291 pass 

292 sleep(2 ** i) 

293 i += 1 

294 

295 def _set_ping_status(self, timeout: int = 2): 

296 logging.info(f"SET PING STATUS timeout={timeout}") 

297 now = datetime.now().timestamp() 

298 if now - self._last_ping_timestamp.value < 15: 

299 return 

300 

301 # Try to ping the server. 

302 self._get_json( 

303 self._make_url("ping"), 

304 timeout=timeout, 

305 is_exponential_backoff_ping=True, 

306 ) 

307 

308 def on_offline_mode_change(self, offline_mode: bool): 

309 self._offline_mode = offline_mode 

310 

311 @property 

312 def ping_status(self) -> bool: 

313 return self._server_available.value 

314 

    # Capability flags which are always on for this adapter. Capabilities that depend
    # on the server's API version are exposed as properties instead.
    can_create_playlist = True
    can_delete_playlist = True
    can_get_album = True
    can_get_albums = True
    can_get_artist = True
    can_get_artists = True
    can_get_cover_art_uri = True
    can_get_directory = True
    can_get_ignored_articles = True
    can_get_playlist_details = True
    can_get_playlists = True
    can_get_song_details = True
    can_get_song_file_uri = True
    can_get_song_stream_uri = True
    can_scrobble_song = True
    can_search = True
    can_stream = True
    can_update_playlist = True

333 

334 def version_at_least(self, version: str) -> bool: 

335 if not self._version.value: 

336 return False 

337 return semver.VersionInfo.parse(self._version.value.decode()) >= version 

338 

339 @property 

340 def can_get_genres(self) -> bool: 

341 return self.version_at_least("1.9.0") 

342 

343 @property 

344 def can_get_play_queue(self) -> bool: 

345 return self.version_at_least("1.12.0") 

346 

347 @property 

348 def can_save_play_queue(self) -> bool: 

349 return self.version_at_least("1.12.0") 

350 

351 _schemes = None 

352 

353 @property 

354 def supported_schemes(self) -> Iterable[str]: 

355 if not self._schemes: 

356 self._schemes = (urlparse(self.hostname)[0],) 

357 return self._schemes 

358 

359 @property 

360 def supported_artist_query_types(self) -> Set[AlbumSearchQuery.Type]: 

361 supported = { 

362 AlbumSearchQuery.Type.RANDOM, 

363 AlbumSearchQuery.Type.NEWEST, 

364 AlbumSearchQuery.Type.FREQUENT, 

365 AlbumSearchQuery.Type.RECENT, 

366 AlbumSearchQuery.Type.STARRED, 

367 AlbumSearchQuery.Type.ALPHABETICAL_BY_NAME, 

368 AlbumSearchQuery.Type.ALPHABETICAL_BY_ARTIST, 

369 } 

370 if self.version_at_least("1.10.1"): 

371 supported.add(AlbumSearchQuery.Type.YEAR_RANGE) 

372 supported.add(AlbumSearchQuery.Type.GENRE) 

373 

374 return supported 

375 

    # Helper methods for making requests

377 # ================================================================================== 

378 def _get_params(self) -> Dict[str, str]: 

379 """ 

380 Gets the parameters that are needed for all requests to the Subsonic API. See 

381 Subsonic API Introduction for details. 

382 """ 

383 params = { 

384 "u": self.username, 

385 "c": "Sublime Music", 

386 "f": "json", 

387 "v": self._version.value.decode() or "1.8.0", 

388 } 

389 

390 if self.use_salt_auth: 

391 salt, token = self._generate_auth_token() 

392 params["s"] = salt 

393 params["t"] = token 

394 else: 

395 params["p"] = self.password 

396 

397 return params 

398 

399 def _generate_auth_token(self) -> Tuple[str, str]: 

400 """ 

401 Generates the necessary authentication data to call the Subsonic API See the 

402 Authentication section of www.subsonic.org/pages/api.jsp for more information 

403 """ 

404 salt = "".join(random.choices(string.ascii_letters + string.digits, k=8)) 

405 token = hashlib.md5(f"{self.password}{salt}".encode()).hexdigest() 

406 return (salt, token) 

407 

408 def _make_url(self, endpoint: str) -> str: 

409 return f"{self.hostname}/rest/{endpoint}.view" 

410 

411 # TODO (#196) figure out some way of rate limiting requests. They often come in too 

412 # fast. 

413 def _get( 

414 self, 

415 url: str, 

416 timeout: Union[float, Tuple[float, float], None] = None, 

417 is_exponential_backoff_ping: bool = False, 

418 **params, 

419 ) -> Any: 

420 params = {**self._get_params(), **params} 

421 logging.info(f"[START] get: {url}") 

422 

423 try: 

424 if REQUEST_DELAY is not None: 

425 delay = random.uniform(*REQUEST_DELAY) 

426 logging.info(f"REQUEST_DELAY enabled. Pausing for {delay} seconds") 

427 sleep(delay) 

428 if timeout: 

429 if type(timeout) == tuple: 

430 if delay > cast(Tuple[float, float], timeout)[0]: 

431 raise TimeoutError("DUMMY TIMEOUT ERROR") 

432 else: 

433 if delay > cast(float, timeout): 

434 raise TimeoutError("DUMMY TIMEOUT ERROR") 

435 

436 if NETWORK_ALWAYS_ERROR: 

437 raise ServerError(69, "NETWORK_ALWAYS_ERROR enabled") 

438 

439 # Deal with datetime parameters (convert to milliseconds since 1970) 

440 for k, v in params.items(): 

441 if isinstance(v, datetime): 

442 params[k] = int(v.timestamp() * 1000) 

443 

444 if self._is_mock: 

445 logging.info("Using mock data") 

446 result = self._get_mock_data() 

447 else: 

448 if url.startswith("http://") or url.startswith("https://"): 

449 result = requests.get( 

450 url, 

451 params=params, 

452 verify=self.verify_cert, 

453 timeout=timeout, 

454 ) 

455 else: 

456 # if user creates a serverconf address w/o protocol, we'll 

457 # attempt to fix it and store it in hostname 

458 # TODO (#305) #hostname currently preprends https:// if 

459 # protocol isn't defined this might be able to be taken out 

460 try: 

461 logging.info("Hostname: %r has no protocol", self.hostname) 

462 result = requests.get( 

463 "https://" + url, 

464 params=params, 

465 verify=self.verify_cert, 

466 timeout=timeout, 

467 ) 

468 self.hostname = "https://" + url.split("/")[0] 

469 except Exception: 

470 result = requests.get( 

471 "http://" + url, 

472 params=params, 

473 verify=self.verify_cert, 

474 timeout=timeout, 

475 ) 

476 

477 self.hostname = "http://" + url.split("/")[0] 

478 

479 if result.status_code != 200: 

480 raise ServerError( 

481 result.status_code, f"{url} returned status={result.status_code}." 

482 ) 

483 # Any time that a server request succeeds, then we win. 

484 self._server_available.value = True 

485 self._last_ping_timestamp.value = datetime.now().timestamp() 

486 

487 except Exception: 

488 logging.exception(f"[FAIL] get: {url} failed") 

489 self._server_available.value = False 

490 self._last_ping_timestamp.value = datetime.now().timestamp() 

491 if not is_exponential_backoff_ping: 

492 self._exponential_backoff(5) 

493 raise 

494 

495 logging.info(f"[FINISH] get: {url}") 

496 return result 

497 

498 def _get_json( 

499 self, 

500 url: str, 

501 timeout: Union[float, Tuple[float, float], None] = None, 

502 is_exponential_backoff_ping: bool = False, 

503 **params: Union[None, str, datetime, int, Sequence[int], Sequence[str]], 

504 ) -> Response: 

505 """ 

506 Make a get request to a *Sonic REST API. Handle all types of errors including 

507 *Sonic ``<error>`` responses. 

508 

509 :returns: a dictionary of the subsonic response. 

510 :raises Exception: needs some work 

511 """ 

512 result = self._get( 

513 url, 

514 timeout=timeout, 

515 is_exponential_backoff_ping=is_exponential_backoff_ping, 

516 **params, 

517 ) 

518 subsonic_response = result.json().get("subsonic-response") 

519 

520 if not subsonic_response: 

521 raise ServerError(500, f"{url} returned invalid JSON.") 

522 

523 if subsonic_response["status"] != "ok": 

524 raise ServerError( 

525 subsonic_response["error"].get("code"), 

526 subsonic_response["error"].get("message"), 

527 ) 

528 

529 self._version.value = subsonic_response["version"].encode() 

530 

531 logging.debug(f"Response from {url}: {subsonic_response}") 

532 return Response.from_dict(subsonic_response) 

533 

    # Helper Methods for Testing
    # Callable producing the mock result; installed by _set_mock_data.
    _get_mock_data: Any = None
    # When True, _get returns mock data instead of making a network request.
    _is_mock: bool = False

537 

538 def _set_mock_data(self, data: Any): 

539 class MockResult: 

540 status_code = 200 

541 

542 def __init__(self, content: Any): 

543 self._content = content 

544 

545 def content(self) -> Any: 

546 return self._content 

547 

548 def json(self) -> Any: 

549 return json.loads(self._content) 

550 

551 def get_mock_data() -> Any: 

552 if type(data) == Exception: 

553 raise data 

554 if hasattr(data, "__next__"): 

555 if d := next(data): 

556 logging.info("MOCK DATA: %s", d) 

557 return MockResult(d) 

558 

559 logging.info("MOCK DATA: %s", data) 

560 return MockResult(data) 

561 

562 self._get_mock_data = get_mock_data 

563 

564 # Data Retrieval Methods 

565 # ================================================================================== 

566 def get_playlists(self) -> Sequence[API.Playlist]: 

567 if playlists := self._get_json(self._make_url("getPlaylists")).playlists: 

568 return sorted(playlists.playlist, key=lambda p: p.name.lower()) 

569 return [] 

570 

571 def get_playlist_details(self, playlist_id: str) -> API.Playlist: 

572 result = self._get_json(self._make_url("getPlaylist"), id=playlist_id).playlist 

573 assert result, f"Error getting playlist {playlist_id}" 

574 return result 

575 

576 def create_playlist( 

577 self, name: str, songs: Sequence[API.Song] = None 

578 ) -> Optional[API.Playlist]: 

579 return self._get_json( 

580 self._make_url("createPlaylist"), 

581 name=name, 

582 songId=[s.id for s in songs or []], 

583 ).playlist 

584 

585 def update_playlist( 

586 self, 

587 playlist_id: str, 

588 name: str = None, 

589 comment: str = None, 

590 public: bool = None, 

591 song_ids: Sequence[str] = None, 

592 append_song_ids: Sequence[str] = None, 

593 ) -> API.Playlist: 

594 if any(x is not None for x in (name, comment, public, append_song_ids)): 

595 self._get_json( 

596 self._make_url("updatePlaylist"), 

597 playlistId=playlist_id, 

598 name=name, 

599 comment=comment, 

600 public=public, 

601 songIdToAdd=append_song_ids, 

602 ) 

603 

604 playlist = None 

605 if song_ids is not None: 

606 playlist = self._get_json( 

607 self._make_url("createPlaylist"), 

608 playlistId=playlist_id, 

609 songId=song_ids, 

610 ).playlist 

611 

612 # If the call to createPlaylist to update the song IDs returned the playlist, 

613 # return it. 

614 return playlist or self.get_playlist_details(playlist_id) 

615 

616 def delete_playlist(self, playlist_id: str): 

617 self._get_json(self._make_url("deletePlaylist"), id=playlist_id) 

618 

619 def get_cover_art_uri(self, cover_art: str, scheme: str, size: int) -> str: 

620 # Some servers return a full URL instead of an ID 

621 if cover_art.startswith("http://") or cover_art.startswith("https://"): 

622 return cover_art 

623 

624 params = {"id": cover_art, "size": size, **self._get_params()} 

625 return self._make_url("getCoverArt") + "?" + urlencode(params) 

626 

627 def get_song_file_uri(self, song_id: str, schemes: Iterable[str]) -> str: 

628 assert any(s in schemes for s in self.supported_schemes) 

629 params = {"id": song_id, **self._get_params()} 

630 return self._make_url("download") + "?" + urlencode(params) 

631 

632 def get_song_stream_uri(self, song_id: str) -> str: 

633 params = {"id": song_id, **self._get_params()} 

634 return self._make_url("stream") + "?" + urlencode(params) 

635 

636 def get_song_details(self, song_id: str) -> API.Song: 

637 song = self._get_json(self._make_url("getSong"), id=song_id).song 

638 assert song, f"Error getting song {song_id}" 

639 return song 

640 

641 def scrobble_song(self, song: API.Song): 

642 self._get(self._make_url("scrobble"), id=song.id) 

643 

644 def get_artists(self) -> Sequence[API.Artist]: 

645 if artist_index := self._get_json(self._make_url("getArtists")).artists: 

646 with open(self.ignored_articles_cache_file, "wb+") as f: 

647 pickle.dump(artist_index.ignored_articles, f) 

648 

649 artists = [] 

650 for index in artist_index.index: 

651 artists.extend(index.artist) 

652 return cast(Sequence[API.Artist], artists) 

653 return [] 

654 

655 def get_artist(self, artist_id: str) -> API.Artist: 

656 artist = self._get_json(self._make_url("getArtist"), id=artist_id).artist 

657 assert artist, f"Error getting artist {artist_id}" 

658 if self.version_at_least("1.11.0"): 

659 try: 

660 artist_info = self._get_json( 

661 self._make_url("getArtistInfo2"), id=artist_id 

662 ) 

663 artist.augment_with_artist_info(artist_info.artist_info) 

664 except Exception: 

665 pass 

666 return artist 

667 

668 def get_ignored_articles(self) -> Set[str]: 

669 ignored_articles = "The El La Los Las Le Les" 

670 try: 

671 # If we already got the ignored articles from the get_artists, do that here. 

672 with open(self.ignored_articles_cache_file, "rb+") as f: 

673 if ia := pickle.load(f): 

674 ignored_articles = ia 

675 except Exception: 

676 try: 

677 # Whatever the exception, fall back on getting from the server. 

678 if artists := self._get_json(self._make_url("getArtists")).artists: 

679 if ia := artists.ignored_articles: 

680 ignored_articles = ia 

681 except Exception: 

682 # Use the default ignored articles. 

683 pass 

684 

685 return set(ignored_articles.split()) 

686 

687 def get_albums( 

688 self, query: AlbumSearchQuery, sort_direction: str = "ascending" 

689 ) -> Sequence[API.Album]: 

690 type_ = { 

691 AlbumSearchQuery.Type.RANDOM: "random", 

692 AlbumSearchQuery.Type.NEWEST: "newest", 

693 AlbumSearchQuery.Type.FREQUENT: "frequent", 

694 AlbumSearchQuery.Type.RECENT: "recent", 

695 AlbumSearchQuery.Type.STARRED: "starred", 

696 AlbumSearchQuery.Type.ALPHABETICAL_BY_NAME: "alphabeticalByName", 

697 AlbumSearchQuery.Type.ALPHABETICAL_BY_ARTIST: "alphabeticalByArtist", 

698 AlbumSearchQuery.Type.YEAR_RANGE: "byYear", 

699 AlbumSearchQuery.Type.GENRE: "byGenre", 

700 }[query.type] 

701 

702 extra_args: Dict[str, Any] = {} 

703 if query.type == AlbumSearchQuery.Type.YEAR_RANGE: 

704 assert (year_range := query.year_range) 

705 extra_args = { 

706 "fromYear": year_range[0], 

707 "toYear": year_range[1], 

708 } 

709 elif query.type == AlbumSearchQuery.Type.GENRE: 

710 assert (genre := query.genre) 

711 extra_args = {"genre": genre.name} 

712 

713 albums: List[API.Album] = [] 

714 page_size = 50 if query.type == AlbumSearchQuery.Type.RANDOM else 500 

715 offset = 0 

716 

717 def get_page(offset: int) -> Sequence[API.Album]: 

718 album_list = self._get_json( 

719 self._make_url("getAlbumList2"), 

720 type=type_, 

721 size=page_size, 

722 offset=offset, 

723 **extra_args, 

724 ).albums 

725 return album_list.album if album_list else [] 

726 

727 # Get all pages. 

728 while len(next_page := get_page(offset)) > 0: 

729 albums.extend(next_page) 

730 if query.type == AlbumSearchQuery.Type.RANDOM: 

731 break 

732 offset += page_size 

733 

734 return albums 

735 

736 def get_album(self, album_id: str) -> API.Album: 

737 album = self._get_json(self._make_url("getAlbum"), id=album_id).album 

738 assert album, f"Error getting album {album_id}" 

739 return album 

740 

741 def _get_indexes(self) -> API.Directory: 

742 indexes = self._get_json(self._make_url("getIndexes")).indexes 

743 assert indexes, "Error getting indexes" 

744 with open(self.ignored_articles_cache_file, "wb+") as f: 

745 pickle.dump(indexes.ignored_articles, f) 

746 

747 root_dir_items: List[Dict[str, Any]] = [] 

748 for index in indexes.index: 

749 root_dir_items.extend(map(lambda x: {**x, "isDir": True}, index.artist)) 

750 return Directory(id="root", _children=root_dir_items) 

751 

752 def get_directory(self, directory_id: str) -> API.Directory: 

753 if directory_id == "root": 

754 return self._get_indexes() 

755 

756 # TODO (#187) make sure to filter out all non-song files 

757 directory = self._get_json( 

758 self._make_url("getMusicDirectory"), id=directory_id 

759 ).directory 

760 assert directory, f"Error getting directory {directory_id}" 

761 return directory 

762 

763 def get_genres(self) -> Sequence[API.Genre]: 

764 if genres := self._get_json(self._make_url("getGenres")).genres: 

765 return genres.genre 

766 return [] 

767 

768 def get_play_queue(self) -> Optional[API.PlayQueue]: 

769 return self._get_json(self._make_url("getPlayQueue")).play_queue 

770 

771 def save_play_queue( 

772 self, 

773 song_ids: Sequence[str], 

774 current_song_index: int = None, 

775 position: timedelta = None, 

776 ): 

777 # TODO (sonic-extensions-api/specification#1) make an extension that allows you 

778 # to save the play queue position by index instead of id. 

779 self._get( 

780 self._make_url("savePlayQueue"), 

781 id=song_ids, 

782 timeout=2, 

783 current=song_ids[current_song_index] 

784 if current_song_index is not None 

785 else None, 

786 position=math.floor(position.total_seconds() * 1000) if position else None, 

787 ) 

788 

789 def search(self, query: str) -> API.SearchResult: 

790 result = self._get_json(self._make_url("search3"), query=query).search_result 

791 if not result: 

792 return API.SearchResult(query) 

793 

794 search_result = API.SearchResult(query) 

795 search_result.add_results("albums", result.album) 

796 search_result.add_results("artists", result.artist) 

797 search_result.add_results("songs", result.song) 

798 return search_result