Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1import functools 

2import re 

3from datetime import timedelta 

4from typing import ( 

5 Any, 

6 Callable, 

7 cast, 

8 Iterable, 

9 List, 

10 Match, 

11 Optional, 

12 Tuple, 

13 Union, 

14) 

15 

16from deepdiff import DeepDiff 

17from gi.repository import Gdk, GLib, Gtk 

18 

19from ..adapters import AdapterManager, CacheMissError, Result, SongCacheStatus 

20from ..adapters.api_objects import Playlist, Song 

21from ..config import AppConfiguration 

22 

# DeepDiff paths of the form ``root[<index>].props``. ``diff_model_store`` passes
# this pattern as ``exclude_regex_paths`` so that those paths are left out of
# the comparison.
deep_diff_exclude_regexp = re.compile(r"root\[\d+\]\.props")

24 

25 

def format_song_duration(duration_secs: Union[int, float, timedelta, None]) -> str:
    """
    Formats the song duration as mins:seconds with the seconds being
    zero-padded if necessary.

    Fractional durations (given as a ``float`` or a ``timedelta``) are rounded to
    the nearest whole second, negative durations are clamped to zero, and
    ``None`` is rendered as the placeholder ``'-:--'``.

    :param duration_secs: the duration, either as a number of seconds, as a
        ``timedelta``, or ``None`` if the duration is unknown.
    :returns: the duration formatted as ``m:ss``.

    >>> format_song_duration(80)
    '1:20'
    >>> format_song_duration(62)
    '1:02'
    >>> format_song_duration(timedelta(seconds=68.2))
    '1:08'
    >>> format_song_duration(68.2)
    '1:08'
    >>> format_song_duration(-5)
    '0:00'
    >>> format_song_duration(None)
    '-:--'
    """
    if duration_secs is None:
        return "-:--"

    if isinstance(duration_secs, timedelta):
        duration_secs = duration_secs.total_seconds()

    # Round so that float input never leaks a fractional part into the output,
    # and clamp because a negative duration has no sensible m:ss rendering.
    total_secs = max(round(duration_secs), 0)
    minutes, seconds = divmod(total_secs, 60)
    return f"{minutes}:{seconds:02}"

48 

49 

def pluralize(string: str, number: int, pluralized_form: str = None) -> str:
    """
    Return the singular or plural form of ``string`` depending on ``number``.

    The singular form is used only when ``number`` is exactly 1. Otherwise the
    plural is ``pluralized_form`` if one was given (and is non-empty), or
    ``string`` with an ``s`` appended.

    >>> pluralize('foo', 1)
    'foo'
    >>> pluralize('foo', 2)
    'foos'
    >>> pluralize('foo', 0)
    'foos'
    >>> pluralize('child', 3, 'children')
    'children'
    """
    if number == 1:
        return string

    if pluralized_form:
        return pluralized_form

    return f"{string}s"

64 

65 

def format_sequence_duration(duration: Optional[timedelta]) -> str:
    """
    Formats duration in English.

    Hours and minutes are only listed when they are non-zero. Seconds are listed
    whenever the duration is shorter than an hour (even if they are zero).
    ``None`` and negative durations are treated as a duration of zero.

    :param duration: the total duration, or ``None``.
    :returns: the comma-separated, human-readable duration.

    >>> format_sequence_duration(timedelta(seconds=90))
    '1 minute, 30 seconds'
    >>> format_sequence_duration(timedelta(seconds=(60 * 60 + 120)))
    '1 hour, 2 minutes'
    >>> format_sequence_duration(None)
    '0 seconds'
    >>> format_sequence_duration(timedelta(seconds=-30))
    '0 seconds'
    """
    # Clamp to zero: with floor division a negative duration would otherwise be
    # split into a negative hour count and a bogus positive minute count.
    total_secs = max(round(duration.total_seconds()), 0) if duration else 0
    total_mins, secs = divmod(total_secs, 60)
    hrs, mins = divmod(total_mins, 60)

    format_components = []
    if hrs > 0:
        format_components.append(f"{hrs} {pluralize('hour', hrs)}")

    if mins > 0:
        format_components.append(f"{mins} {pluralize('minute', mins)}")

    # Show seconds if there are no hours.
    if hrs == 0:
        format_components.append(f"{secs} {pluralize('second', secs)}")

    return ", ".join(format_components)

97 

98 

def dot_join(*items: Any) -> str:
    """
    Join the string form of every non-``None`` item with a bullet separator.

    Only ``None`` is dropped; other falsy values such as ``0`` or ``""`` are
    kept.

    >>> dot_join(None, "foo", "bar", None, "baz")
    'foo • bar • baz'
    """
    return " • ".join(str(item) for item in items if item is not None)

107 

108 

def get_cached_status_icons(song_ids: List[str]) -> List[str]:
    """
    Look up the icon name that represents the cache status of each song.

    One entry is produced per status returned by
    ``AdapterManager.get_cached_statuses``. Statuses that have no dedicated icon
    are represented by the empty string.

    :param song_ids: the IDs of the songs to look up.
    :returns: a list of icon names (or empty strings).
    """
    icon_for_status = {
        SongCacheStatus.CACHED: "folder-download-symbolic",
        SongCacheStatus.PERMANENTLY_CACHED: "view-pin-symbolic",
        SongCacheStatus.DOWNLOADING: "emblem-synchronizing-symbolic",
    }
    statuses = AdapterManager.get_cached_statuses(song_ids)
    return [icon_for_status.get(status, "") for status in statuses]

119 

120 

121def _parse_diff_location(location: str) -> Tuple: 

122 """ 

123 Parses a diff location as returned by deepdiff. 

124 

125 >>> _parse_diff_location("root[22]") 

126 ('22',) 

127 >>> _parse_diff_location("root[22][4]") 

128 ('22', '4') 

129 >>> _parse_diff_location("root[22].foo") 

130 ('22', 'foo') 

131 """ 

132 match = re.match(r"root\[(\d*)\](?:\[(\d*)\]|\.(.*))?", location) 

133 return tuple(g for g in cast(Match, match).groups() if g is not None) 

134 

135 

def diff_song_store(store_to_edit: Any, new_store: Iterable[Any]):
    """
    Edit ``store_to_edit`` in place so that it matches ``new_store``.

    Diffing song stores is nice, because we can easily make edits by modifying
    the underlying store: changed cells are overwritten, removed rows are
    deleted, and added rows are appended to the end of the store.

    :param store_to_edit: the store to modify. It must support iteration over
        sliceable rows, ``store[row][column] = value``, ``del store[row]`` and
        ``append``.
    :param new_store: the rows that the store should contain afterwards.
    """
    old_store = [row[:] for row in store_to_edit]

    # Diff the lists to determine what needs to be changed.
    diff = DeepDiff(old_store, new_store)
    changed = diff.get("values_changed", {})
    added = diff.get("iterable_item_added", {})
    removed = diff.get("iterable_item_removed", {})
    # NOTE(review): only the three categories above are consulted. DeepDiff
    # appears to report cells whose type changed under a separate key, which
    # would be ignored here; confirm whether that matters for callers.

    for edit_location, change in changed.items():
        idx, field = _parse_diff_location(edit_location)
        store_to_edit[int(idx)][int(field)] = change["new_value"]

    # Delete from the highest index down so that earlier deletions do not shift
    # the rows that still have to be removed. Sort explicitly rather than
    # relying on the order in which DeepDiff happens to report the removals.
    removed_indices = sorted(
        (int(_parse_diff_location(location)[0]) for location in removed),
        reverse=True,
    )
    for remove_at in removed_indices:
        del store_to_edit[remove_at]

    for value in added.values():
        store_to_edit.append(value)

159 

160 

def diff_model_store(store_to_edit: Any, new_store: Iterable[Any]):
    """
    Replace the entire contents of ``store_to_edit`` if it differs from
    ``new_store``.

    No attempt is made to edit individual items, because that is too hard to do
    for model stores: any difference at all causes the whole store to be
    spliced out and replaced. Paths matching ``deep_diff_exclude_regexp`` are
    ignored by the comparison. If nothing differs, the store is left untouched.
    """
    current_items = store_to_edit[:]
    differences = DeepDiff(
        current_items,
        new_store,
        exclude_regex_paths=deep_diff_exclude_regexp,
    )

    if differences != {}:
        store_to_edit.splice(0, len(store_to_edit), new_store)

173 

174 

def show_song_popover(
    song_ids: List[str],
    x: int,
    y: int,
    relative_to: Any,
    offline_mode: bool,
    position: Gtk.PositionType = Gtk.PositionType.BOTTOM,
    on_download_state_change: Callable[[str], None] = lambda _: None,
    on_remove_downloads_click: Callable[[], Any] = lambda: None,
    on_playlist_state_change: Callable[[], None] = lambda: None,
    show_remove_from_playlist_button: bool = False,
    extra_menu_items: List[Tuple[Gtk.ModelButton, Any]] = None,
):
    """
    Build and show the context-menu popover for the given songs.

    The menu is shown immediately with most entries insensitive. The song
    details are then retrieved in the background, and the entries are enabled
    once it is known which actions apply to the selection.

    :param song_ids: the IDs of the songs that the menu acts on.
    :param x: x coordinate of the 1x1 rectangle that the popover points to.
    :param y: y coordinate of the 1x1 rectangle that the popover points to.
    :param relative_to: the widget that the popover is set relative to.
    :param offline_mode: if ``True``, the "Play next"/"Add to queue" actions are
        not wired up front, the download button is never enabled, the "Add to
        playlist" entry is insensitive and its sub-menu is not populated.
    :param position: the position passed to ``Gtk.Popover.set_position``.
    :param on_download_state_change: passed to the ``AdapterManager`` as the
        ``before_download``/``on_song_download_complete`` callback when
        downloading and as the ``on_song_delete`` callback when removing
        downloads.
    :param on_remove_downloads_click: called (with no arguments) after the
        removal of the downloads has been requested.
    :param on_playlist_state_change: called (with no arguments) when the
        playlist update triggered by "Add to playlist" is done.
    :param show_remove_from_playlist_button: accepted for interface
        compatibility; it is not read by this function.
    :param extra_menu_items: additional entries appended to the end of the
        menu. Each entry is either a widget, or a ``(widget, handler)`` tuple
        in which case ``handler`` is connected to the widget's ``clicked``
        signal.
    """

    def on_download_songs_click(_: Any):
        AdapterManager.batch_download_songs(
            song_ids,
            before_download=on_download_state_change,
            on_song_download_complete=on_download_state_change,
        )

    def do_on_remove_downloads_click(_: Any):
        AdapterManager.cancel_download_songs(song_ids)
        AdapterManager.batch_delete_cached_songs(
            song_ids,
            on_song_delete=on_download_state_change,
        )
        on_remove_downloads_click()

    def on_add_to_playlist_click(_: Any, playlist: Playlist):
        update_playlist_result = AdapterManager.update_playlist(
            playlist_id=playlist.id, append_song_ids=song_ids
        )
        update_playlist_result.add_done_callback(lambda _: on_playlist_state_change())

    def sole_value(values: set) -> Any:
        """Return the only element of ``values``, or ``None`` if there isn't exactly one."""
        return next(iter(values)) if len(values) == 1 else None

    popover = Gtk.PopoverMenu()
    vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)

    # Add all of the menu items to the popover.
    song_count = len(song_ids)

    play_next_button = Gtk.ModelButton(text="Play next", sensitive=False)
    add_to_queue_button = Gtk.ModelButton(text="Add to queue", sensitive=False)
    if not offline_mode:
        play_next_button.set_action_name("app.play-next")
        play_next_button.set_action_target_value(GLib.Variant("as", song_ids))
        add_to_queue_button.set_action_name("app.add-to-queue")
        add_to_queue_button.set_action_target_value(GLib.Variant("as", song_ids))

    go_to_album_button = Gtk.ModelButton(text="Go to album", sensitive=False)
    go_to_artist_button = Gtk.ModelButton(text="Go to artist", sensitive=False)
    browse_to_song = Gtk.ModelButton(
        text=f"Browse to {pluralize('song', song_count)}", sensitive=False
    )
    download_song_button = Gtk.ModelButton(
        text=f"Download {pluralize('song', song_count)}", sensitive=False
    )
    remove_download_button = Gtk.ModelButton(
        text=f"Remove {pluralize('download', song_count)}", sensitive=False
    )

    # Retrieve songs and set the buttons as sensitive later.
    def on_get_song_details_done(songs: List[Song]):
        song_cache_statuses = AdapterManager.get_cached_statuses([s.id for s in songs])

        # Downloading is only offered when online and at least one song is not
        # cached yet.
        if not offline_mode and any(
            status == SongCacheStatus.NOT_CACHED for status in song_cache_statuses
        ):
            download_song_button.set_sensitive(True)

        # Removing is offered as soon as at least one song is cached or is
        # currently being downloaded.
        removable_statuses = (
            SongCacheStatus.CACHED,
            SongCacheStatus.PERMANENTLY_CACHED,
            SongCacheStatus.DOWNLOADING,
        )
        if any(status in removable_statuses for status in song_cache_statuses):
            remove_download_button.set_sensitive(True)

        # The queue actions are (re-)wired here regardless of offline_mode.
        play_next_button.set_action_target_value(GLib.Variant("as", song_ids))
        play_next_button.set_action_name("app.play-next")
        add_to_queue_button.set_action_target_value(GLib.Variant("as", song_ids))
        add_to_queue_button.set_action_name("app.add-to-queue")

        albums, artists, parents = set(), set(), set()
        for song in songs:
            # A missing parent is recorded as None so that a selection which
            # mixes songs with and without a parent does not count as having a
            # single common parent.
            parents.add(song.parent_id or None)

            if (al := song.album) and (id_ := al.id) and not id_.startswith("invalid:"):
                albums.add(id_)

            if (a := song.artist) and (id_ := a.id) and not id_.startswith("invalid:"):
                artists.add(id_)

        # The navigation entries are only wired when all of the songs agree on
        # a single album / artist / parent.
        album_id = sole_value(albums)
        if album_id is not None:
            go_to_album_button.set_action_target_value(GLib.Variant("s", album_id))
            go_to_album_button.set_action_name("app.go-to-album")

        artist_id = sole_value(artists)
        if artist_id is not None:
            go_to_artist_button.set_action_target_value(GLib.Variant("s", artist_id))
            go_to_artist_button.set_action_name("app.go-to-artist")

        parent_id = sole_value(parents)
        if parent_id is not None:
            browse_to_song.set_action_target_value(GLib.Variant("s", parent_id))
            browse_to_song.set_action_name("app.browse-to")

    def batch_get_song_details() -> List[Song]:
        return [
            AdapterManager.get_song_details(song_id).result() for song_id in song_ids
        ]

    get_song_details_result: Result[List[Song]] = Result(batch_get_song_details)
    # Hand the result over via GLib.idle_add so that the widgets are updated
    # from the GLib main loop.
    get_song_details_result.add_done_callback(
        lambda f: GLib.idle_add(on_get_song_details_done, f.result())
    )

    menu_items = [
        play_next_button,
        add_to_queue_button,
        Gtk.Separator(orientation=Gtk.Orientation.HORIZONTAL),
        go_to_album_button,
        go_to_artist_button,
        browse_to_song,
        Gtk.Separator(orientation=Gtk.Orientation.HORIZONTAL),
        (download_song_button, on_download_songs_click),
        (remove_download_button, do_on_remove_downloads_click),
        Gtk.Separator(orientation=Gtk.Orientation.HORIZONTAL),
        Gtk.ModelButton(
            text=f"Add {pluralize('song', song_count)} to playlist",
            menu_name="add-to-playlist",
            name="menu-item-add-to-playlist",
            sensitive=not offline_mode,
        ),
        *(extra_menu_items or []),
    ]

    for item in menu_items:
        # An entry is either a bare widget or a (widget, clicked handler) pair.
        if isinstance(item, tuple):
            el, fn = item
            el.connect("clicked", fn)
        else:
            el = item

        el.get_style_context().add_class("menu-button")
        vbox.pack_start(el, False, True, 0)

    popover.add(vbox)

    # Create the "Add song(s) to playlist" sub-menu.
    playlists_vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)

    if not offline_mode:
        # Back button
        playlists_vbox.add(
            Gtk.ModelButton(inverted=True, centered=True, menu_name="main")
        )

        # Loading indicator
        loading_indicator = Gtk.Spinner(name="menu-item-spinner")
        loading_indicator.start()
        playlists_vbox.add(loading_indicator)

        # Create a future to make the actual playlist buttons
        # NOTE(review): unlike the song details callback, this callback touches
        # widgets directly from the Result's done callback instead of going
        # through GLib.idle_add. Confirm that it is always invoked on the GTK
        # main thread.
        def on_get_playlists_done(f: Result[List[Playlist]]):
            playlists_vbox.remove(loading_indicator)

            for playlist in f.result():
                button = Gtk.ModelButton(text=playlist.name)
                button.get_style_context().add_class("menu-button")
                button.connect("clicked", on_add_to_playlist_click, playlist)
                # Shown explicitly because the button may be created after the
                # popover's show_all() call below has already run.
                button.show()
                playlists_vbox.pack_start(button, False, True, 0)

        playlists_result = AdapterManager.get_playlists()
        playlists_result.add_done_callback(on_get_playlists_done)

    popover.add(playlists_vbox)
    popover.child_set_property(playlists_vbox, "submenu", "add-to-playlist")

    # Positioning of the popover.
    rect = Gdk.Rectangle()
    rect.x, rect.y, rect.width, rect.height = x, y, 1, 1
    popover.set_pointing_to(rect)
    popover.set_position(position)
    popover.set_relative_to(relative_to)

    popover.popup()
    popover.show_all()

363 

364 

def async_callback(
    future_fn: Callable[..., Result],
    before_download: Callable[[Any], None] = None,
    on_failure: Callable[[Any, Exception], None] = None,
) -> Callable[[Callable], Callable]:
    """
    Defines the ``async_callback`` decorator.

    The decorated method becomes the done callback of the
    :class:`AdapterManager.Result` produced by ``future_fn``. Calling the
    decorated method calls ``future_fn`` with the positional and extra keyword
    arguments (plus ``force`` and a ``before_download`` hook). Once the result
    is done, the original method is called as
    ``callback_fn(self, data, app_config=..., force=..., order_token=...,
    is_partial=...)``.

    If retrieving the result raises a ``CacheMissError`` that carries partial
    data, the method is called with that partial data and ``is_partial=True``.
    Any other failure is reported to ``on_failure`` (if given) and the method is
    not called.

    :param future_fn: a function which generates an :class:`AdapterManager.Result`.
    :param before_download: if given, scheduled via ``GLib.idle_add`` with
        ``self`` as its argument when ``future_fn`` invokes its
        ``before_download`` hook.
    :param on_failure: if given, scheduled via ``GLib.idle_add`` with ``self``
        and the exception when the result could not be retrieved.
    """

    def decorator(callback_fn: Callable) -> Callable:
        @functools.wraps(callback_fn)
        def wrapper(
            self: Any,
            *args,
            app_config: AppConfiguration = None,
            force: bool = False,
            order_token: int = None,
            **kwargs,
        ):
            def report_failure(error: Exception):
                if on_failure:
                    GLib.idle_add(on_failure, self, error)

            def notify_before_download():
                if before_download:
                    GLib.idle_add(before_download, self)

            def handle_done(is_immediate: bool, f: Result):
                is_partial = False
                try:
                    data = f.result()
                except CacheMissError as cache_miss:
                    # Fall back to whatever partial data the cache miss carries.
                    # Without any, there is nothing to pass to the callback.
                    data = cache_miss.partial_data
                    if data is None:
                        report_failure(cache_miss)
                        return
                    is_partial = True
                except Exception as error:
                    report_failure(error)
                    return

                invoke_callback = functools.partial(
                    callback_fn,
                    self,
                    data,
                    app_config=app_config,
                    force=force,
                    order_token=order_token,
                    is_partial=is_partial,
                )

                if not is_immediate:
                    # We don't have the data yet, meaning that it is a future,
                    # and we have to idle add so that we don't seg fault GTK.
                    GLib.idle_add(invoke_callback)
                    return

                # The data is available now, no need to wait for the future to
                # finish, and no need to incur the overhead of adding to the
                # GLib event queue.
                invoke_callback()

            pending: Result = future_fn(
                *args,
                before_download=notify_before_download,
                force=force,
                **kwargs,
            )
            # Whether the data is already available is captured here, before
            # the done callback is registered.
            pending.add_done_callback(
                functools.partial(handle_done, pending.data_is_available)
            )

        return wrapper

    return decorator