Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import functools
2import re
3from datetime import timedelta
4from typing import (
5 Any,
6 Callable,
7 cast,
8 Iterable,
9 List,
10 Match,
11 Optional,
12 Tuple,
13 Union,
14)
16from deepdiff import DeepDiff
17from gi.repository import Gdk, GLib, Gtk
19from ..adapters import AdapterManager, CacheMissError, Result, SongCacheStatus
20from ..adapters.api_objects import Playlist, Song
21from ..config import AppConfiguration
23deep_diff_exclude_regexp = re.compile(r"root\[\d+\]\.props")
26def format_song_duration(duration_secs: Union[int, timedelta, None]) -> str:
27 """
28 Formats the song duration as mins:seconds with the seconds being
29 zero-padded if necessary.
31 >>> format_song_duration(80)
32 '1:20'
33 >>> format_song_duration(62)
34 '1:02'
35 >>> format_song_duration(timedelta(seconds=68.2))
36 '1:08'
37 >>> format_song_duration(None)
38 '-:--'
39 """
40 if isinstance(duration_secs, timedelta):
41 duration_secs = round(duration_secs.total_seconds())
42 if duration_secs is None:
43 return "-:--"
45 duration_secs = max(duration_secs, 0)
47 return f"{duration_secs // 60}:{duration_secs % 60:02}"
50def pluralize(string: str, number: int, pluralized_form: str = None) -> str:
51 """
52 Pluralize the given string given the count as a number.
54 >>> pluralize('foo', 1)
55 'foo'
56 >>> pluralize('foo', 2)
57 'foos'
58 >>> pluralize('foo', 0)
59 'foos'
60 """
61 if number != 1:
62 return pluralized_form or f"{string}s"
63 return string
66def format_sequence_duration(duration: Optional[timedelta]) -> str:
67 """
68 Formats duration in English.
70 >>> format_sequence_duration(timedelta(seconds=90))
71 '1 minute, 30 seconds'
72 >>> format_sequence_duration(timedelta(seconds=(60 * 60 + 120)))
73 '1 hour, 2 minutes'
74 >>> format_sequence_duration(None)
75 '0 seconds'
76 """
77 duration_secs = round(duration.total_seconds()) if duration else 0
78 duration_mins = (duration_secs // 60) % 60
79 duration_hrs = duration_secs // 60 // 60
80 duration_secs = duration_secs % 60
82 format_components = []
83 if duration_hrs > 0:
84 hrs = "{} {}".format(duration_hrs, pluralize("hour", duration_hrs))
85 format_components.append(hrs)
87 if duration_mins > 0:
88 mins = "{} {}".format(duration_mins, pluralize("minute", duration_mins))
89 format_components.append(mins)
91 # Show seconds if there are no hours.
92 if duration_hrs == 0:
93 secs = "{} {}".format(duration_secs, pluralize("second", duration_secs))
94 format_components.append(secs)
96 return ", ".join(format_components)
99def dot_join(*items: Any) -> str:
100 """
101 Joins the given strings with a dot character. Filters out ``None`` values.
103 >>> dot_join(None, "foo", "bar", None, "baz")
104 'foo • bar • baz'
105 """
106 return " • ".join(map(str, filter(lambda x: x is not None, items)))
109def get_cached_status_icons(song_ids: List[str]) -> List[str]:
110 cache_icon = {
111 SongCacheStatus.CACHED: "folder-download-symbolic",
112 SongCacheStatus.PERMANENTLY_CACHED: "view-pin-symbolic",
113 SongCacheStatus.DOWNLOADING: "emblem-synchronizing-symbolic",
114 }
115 return [
116 cache_icon.get(cache_status, "")
117 for cache_status in AdapterManager.get_cached_statuses(song_ids)
118 ]
121def _parse_diff_location(location: str) -> Tuple:
122 """
123 Parses a diff location as returned by deepdiff.
125 >>> _parse_diff_location("root[22]")
126 ('22',)
127 >>> _parse_diff_location("root[22][4]")
128 ('22', '4')
129 >>> _parse_diff_location("root[22].foo")
130 ('22', 'foo')
131 """
132 match = re.match(r"root\[(\d*)\](?:\[(\d*)\]|\.(.*))?", location)
133 return tuple(g for g in cast(Match, match).groups() if g is not None)
136def diff_song_store(store_to_edit: Any, new_store: Iterable[Any]):
137 """
138 Diffing song stores is nice, because we can easily make edits by modifying
139 the underlying store.
140 """
141 old_store = [row[:] for row in store_to_edit]
143 # Diff the lists to determine what needs to be changed.
144 diff = DeepDiff(old_store, new_store)
145 changed = diff.get("values_changed", {})
146 added = diff.get("iterable_item_added", {})
147 removed = diff.get("iterable_item_removed", {})
149 for edit_location, diff in changed.items():
150 idx, field = _parse_diff_location(edit_location)
151 store_to_edit[int(idx)][int(field)] = diff["new_value"]
153 for remove_location, _ in reversed(list(removed.items())):
154 remove_at = int(_parse_diff_location(remove_location)[0])
155 del store_to_edit[remove_at]
157 for _, value in added.items():
158 store_to_edit.append(value)
161def diff_model_store(store_to_edit: Any, new_store: Iterable[Any]):
162 """
163 The diff here is that if there are any differences, then we refresh the
164 entire list. This is because it is too hard to do editing.
165 """
166 old_store = store_to_edit[:]
168 diff = DeepDiff(old_store, new_store, exclude_regex_paths=deep_diff_exclude_regexp)
169 if diff == {}:
170 return
172 store_to_edit.splice(0, len(store_to_edit), new_store)
175def show_song_popover(
176 song_ids: List[str],
177 x: int,
178 y: int,
179 relative_to: Any,
180 offline_mode: bool,
181 position: Gtk.PositionType = Gtk.PositionType.BOTTOM,
182 on_download_state_change: Callable[[str], None] = lambda _: None,
183 on_remove_downloads_click: Callable[[], Any] = lambda: None,
184 on_playlist_state_change: Callable[[], None] = lambda: None,
185 show_remove_from_playlist_button: bool = False,
186 extra_menu_items: List[Tuple[Gtk.ModelButton, Any]] = None,
187):
188 def on_download_songs_click(_: Any):
189 AdapterManager.batch_download_songs(
190 song_ids,
191 before_download=on_download_state_change,
192 on_song_download_complete=on_download_state_change,
193 )
195 def do_on_remove_downloads_click(_: Any):
196 AdapterManager.cancel_download_songs(song_ids)
197 AdapterManager.batch_delete_cached_songs(
198 song_ids,
199 on_song_delete=on_download_state_change,
200 )
201 on_remove_downloads_click()
203 def on_add_to_playlist_click(_: Any, playlist: Playlist):
204 update_playlist_result = AdapterManager.update_playlist(
205 playlist_id=playlist.id, append_song_ids=song_ids
206 )
207 update_playlist_result.add_done_callback(lambda _: on_playlist_state_change())
209 popover = Gtk.PopoverMenu()
210 vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
212 # Add all of the menu items to the popover.
213 song_count = len(song_ids)
215 play_next_button = Gtk.ModelButton(text="Play next", sensitive=False)
216 add_to_queue_button = Gtk.ModelButton(text="Add to queue", sensitive=False)
217 if not offline_mode:
218 play_next_button.set_action_name("app.play-next")
219 play_next_button.set_action_target_value(GLib.Variant("as", song_ids))
220 add_to_queue_button.set_action_name("app.add-to-queue")
221 add_to_queue_button.set_action_target_value(GLib.Variant("as", song_ids))
223 go_to_album_button = Gtk.ModelButton(text="Go to album", sensitive=False)
224 go_to_artist_button = Gtk.ModelButton(text="Go to artist", sensitive=False)
225 browse_to_song = Gtk.ModelButton(
226 text=f"Browse to {pluralize('song', song_count)}", sensitive=False
227 )
228 download_song_button = Gtk.ModelButton(
229 text=f"Download {pluralize('song', song_count)}", sensitive=False
230 )
231 remove_download_button = Gtk.ModelButton(
232 text=f"Remove {pluralize('download', song_count)}", sensitive=False
233 )
235 # Retrieve songs and set the buttons as sensitive later.
236 def on_get_song_details_done(songs: List[Song]):
237 song_cache_statuses = AdapterManager.get_cached_statuses([s.id for s in songs])
238 if not offline_mode and any(
239 status == SongCacheStatus.NOT_CACHED for status in song_cache_statuses
240 ):
241 download_song_button.set_sensitive(True)
242 if any(
243 status
244 in (
245 SongCacheStatus.CACHED,
246 SongCacheStatus.PERMANENTLY_CACHED,
247 SongCacheStatus.DOWNLOADING,
248 )
249 for status in song_cache_statuses
250 ):
251 remove_download_button.set_sensitive(True)
252 play_next_button.set_action_target_value(GLib.Variant("as", song_ids))
253 play_next_button.set_action_name("app.play-next")
254 add_to_queue_button.set_action_target_value(GLib.Variant("as", song_ids))
255 add_to_queue_button.set_action_name("app.add-to-queue")
257 albums, artists, parents = set(), set(), set()
258 for song in songs:
259 parents.add(parent_id if (parent_id := song.parent_id) else None)
261 if (al := song.album) and (id_ := al.id) and not id_.startswith("invalid:"):
262 albums.add(id_)
264 if (a := song.artist) and (id_ := a.id) and not id_.startswith("invalid:"):
265 artists.add(id_)
267 if len(albums) == 1 and list(albums)[0] is not None:
268 go_to_album_button.set_action_target_value(
269 GLib.Variant("s", list(albums)[0])
270 )
271 go_to_album_button.set_action_name("app.go-to-album")
272 if len(artists) == 1 and list(artists)[0] is not None:
273 go_to_artist_button.set_action_target_value(
274 GLib.Variant("s", list(artists)[0])
275 )
276 go_to_artist_button.set_action_name("app.go-to-artist")
277 if len(parents) == 1 and list(parents)[0] is not None:
278 browse_to_song.set_action_target_value(GLib.Variant("s", list(parents)[0]))
279 browse_to_song.set_action_name("app.browse-to")
281 def batch_get_song_details() -> List[Song]:
282 return [
283 AdapterManager.get_song_details(song_id).result() for song_id in song_ids
284 ]
286 get_song_details_result: Result[List[Song]] = Result(batch_get_song_details)
287 get_song_details_result.add_done_callback(
288 lambda f: GLib.idle_add(on_get_song_details_done, f.result())
289 )
291 menu_items = [
292 play_next_button,
293 add_to_queue_button,
294 Gtk.Separator(orientation=Gtk.Orientation.HORIZONTAL),
295 go_to_album_button,
296 go_to_artist_button,
297 browse_to_song,
298 Gtk.Separator(orientation=Gtk.Orientation.HORIZONTAL),
299 (download_song_button, on_download_songs_click),
300 (remove_download_button, do_on_remove_downloads_click),
301 Gtk.Separator(orientation=Gtk.Orientation.HORIZONTAL),
302 Gtk.ModelButton(
303 text=f"Add {pluralize('song', song_count)} to playlist",
304 menu_name="add-to-playlist",
305 name="menu-item-add-to-playlist",
306 sensitive=not offline_mode,
307 ),
308 *(extra_menu_items or []),
309 ]
311 for item in menu_items:
312 if type(item) == tuple:
313 el, fn = item
314 el.connect("clicked", fn)
315 el.get_style_context().add_class("menu-button")
316 vbox.pack_start(item[0], False, True, 0)
317 else:
318 item.get_style_context().add_class("menu-button")
319 vbox.pack_start(item, False, True, 0)
321 popover.add(vbox)
323 # Create the "Add song(s) to playlist" sub-menu.
324 playlists_vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
326 if not offline_mode:
327 # Back button
328 playlists_vbox.add(
329 Gtk.ModelButton(inverted=True, centered=True, menu_name="main")
330 )
332 # Loading indicator
333 loading_indicator = Gtk.Spinner(name="menu-item-spinner")
334 loading_indicator.start()
335 playlists_vbox.add(loading_indicator)
337 # Create a future to make the actual playlist buttons
338 def on_get_playlists_done(f: Result[List[Playlist]]):
339 playlists_vbox.remove(loading_indicator)
341 for playlist in f.result():
342 button = Gtk.ModelButton(text=playlist.name)
343 button.get_style_context().add_class("menu-button")
344 button.connect("clicked", on_add_to_playlist_click, playlist)
345 button.show()
346 playlists_vbox.pack_start(button, False, True, 0)
348 playlists_result = AdapterManager.get_playlists()
349 playlists_result.add_done_callback(on_get_playlists_done)
351 popover.add(playlists_vbox)
352 popover.child_set_property(playlists_vbox, "submenu", "add-to-playlist")
354 # Positioning of the popover.
355 rect = Gdk.Rectangle()
356 rect.x, rect.y, rect.width, rect.height = x, y, 1, 1
357 popover.set_pointing_to(rect)
358 popover.set_position(position)
359 popover.set_relative_to(relative_to)
361 popover.popup()
362 popover.show_all()
365def async_callback(
366 future_fn: Callable[..., Result],
367 before_download: Callable[[Any], None] = None,
368 on_failure: Callable[[Any, Exception], None] = None,
369) -> Callable[[Callable], Callable]:
370 """
371 Defines the ``async_callback`` decorator.
373 When a function is annotated with this decorator, the function becomes the done
374 callback for the given result-generating lambda function. The annotated function
375 will be called with the result of the Result generated by said lambda function.
377 :param future_fn: a function which generates an :class:`AdapterManager.Result`.
378 """
380 def decorator(callback_fn: Callable) -> Callable:
381 @functools.wraps(callback_fn)
382 def wrapper(
383 self: Any,
384 *args,
385 app_config: AppConfiguration = None,
386 force: bool = False,
387 order_token: int = None,
388 **kwargs,
389 ):
390 def on_before_download():
391 if before_download:
392 GLib.idle_add(before_download, self)
394 def future_callback(is_immediate: bool, f: Result):
395 try:
396 result = f.result()
397 is_partial = False
398 except CacheMissError as e:
399 result = e.partial_data
400 if result is None:
401 if on_failure:
402 GLib.idle_add(on_failure, self, e)
403 return
405 is_partial = True
406 except Exception as e:
407 if on_failure:
408 GLib.idle_add(on_failure, self, e)
409 return
411 fn = functools.partial(
412 callback_fn,
413 self,
414 result,
415 app_config=app_config,
416 force=force,
417 order_token=order_token,
418 is_partial=is_partial,
419 )
421 if is_immediate:
422 # The data is available now, no need to wait for the future to
423 # finish, and no need to incur the overhead of adding to the GLib
424 # event queue.
425 fn()
426 else:
427 # We don't have the data yet, meaning that it is a future, and we
428 # have to idle add so that we don't seg fault GTK.
429 GLib.idle_add(fn)
431 result: Result = future_fn(
432 *args,
433 before_download=on_before_download,
434 force=force,
435 **kwargs,
436 )
437 result.add_done_callback(
438 functools.partial(future_callback, result.data_is_available)
439 )
441 return wrapper
443 return decorator