Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1import logging 

2from datetime import timedelta 

3from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Type, Union 

4 

5from .base import PlayerDeviceEvent, PlayerEvent 

6from .chromecast import ChromecastPlayer # noqa: F401 

7from .mpv import MPVPlayer # noqa: F401 

8from ..adapters.api_objects import Song 

9 

10 

class PlayerManager:
    """
    Owns one instance of every available player type and forwards playback
    commands to whichever player handles the currently selected device.

    The mapping from device ID to player type is learned from the
    :class:`PlayerDeviceEvent` objects that the players report. Until the
    selected device ID appears in that mapping no player can be resolved, and
    the accessors on this class fall back to neutral defaults (``False``,
    ``set()``, ``100``) while the commands become no-ops.
    """

    # Available Players. Order matters for UI display.
    available_player_types: List[Type] = [MPVPlayer, ChromecastPlayer]

    @staticmethod
    def get_configuration_options() -> Dict[
        str, Dict[str, Union[Type, Tuple[str, ...]]]
    ]:
        """
        :returns: Dictionary of the name of the player -> option configs (see
            :class:`sublime_music.players.base.Player.get_configuration_options` for
            details).
        """
        return {
            p.name: p.get_configuration_options()
            for p in PlayerManager.available_player_types
        }

    # Initialization and Shutdown
    def __init__(
        self,
        on_timepos_change: Callable[[Optional[float]], None],
        on_track_end: Callable[[], None],
        on_player_event: Callable[[PlayerEvent], None],
        player_device_change_callback: Callable[[PlayerDeviceEvent], None],
        config: Dict[str, Dict[str, Union[Type, Tuple[str, ...]]]],
    ):
        """
        Construct one player of every type in ``available_player_types``.

        :param on_timepos_change: handed unchanged to every player.
        :param on_track_end: called from :meth:`_on_track_end`, after the
            manager has recorded that the current track is ending.
        :param on_player_event: only called for events whose ``device_id``
            equals the currently selected device ID; other events are dropped.
        :param player_device_change_callback: called for every device event,
            after the event's ``id`` -> ``player_type`` pair has been recorded.
        :param config: per-player configuration keyed by player name; each
            player receives ``config.get(player_type.name)``.
        """
        self.current_song: Optional[Song] = None
        self.next_song_uri: Optional[str] = None
        self.on_timepos_change = on_timepos_change
        self.on_track_end = on_track_end
        self.config = config
        # Assigned before the players are constructed so that the attribute
        # already exists while the constructors below are running.
        self.players: Dict[Type, Any] = {}
        self.device_id_type_map: Dict[str, Type] = {}
        self._current_device_id: Optional[str] = None
        self._track_ending: bool = False

        def player_event_wrapper(pe: PlayerEvent):
            # Ignore events coming from devices that are not selected.
            if pe.device_id == self._current_device_id:
                on_player_event(pe)

        self.on_player_event = player_event_wrapper

        def callback_wrapper(pde: PlayerDeviceEvent):
            # Remember which player type owns the device before notifying the
            # caller, so the device can be resolved from inside the callback.
            self.device_id_type_map[pde.id] = pde.player_type
            player_device_change_callback(pde)

        self.player_device_change_callback = callback_wrapper

        self.players = {
            player_type: player_type(
                self.on_timepos_change,
                self._on_track_end,
                self.on_player_event,
                self.player_device_change_callback,
                self.config.get(player_type.name),
            )
            for player_type in PlayerManager.available_player_types
        }

    def change_settings(
        self,
        config: Dict[str, Dict[str, Union[Type, Tuple[str, ...]]]],
    ):
        """
        Store the new configuration and pass each player its own section.

        :param config: per-player configuration keyed by player name.
        """
        self.config = config
        for player_type, player in self.players.items():
            player.change_settings(config.get(player_type.name))

    def refresh_players(self):
        """Call ``refresh_players`` on every managed player."""
        for player in self.players.values():
            player.refresh_players()

    def shutdown(self):
        """Call ``shutdown`` on every managed player."""
        for p in self.players.values():
            p.shutdown()

    def _get_current_player_type(self) -> Optional[Type]:
        """
        :returns: the player type recorded for the selected device ID, or
            ``None`` when no device is selected or the selected ID has not
            been recorded in ``device_id_type_map``.
        """
        device_id = self._current_device_id
        if device_id:
            return self.device_id_type_map.get(device_id)
        return None

    def _get_current_player(self) -> Optional[Any]:
        """
        :returns: the player instance for the selected device, or ``None``
            when the device cannot be resolved to a player type.
        """
        if current_player_type := self._get_current_player_type():
            return self.players.get(current_player_type)
        return None

    def _on_track_end(self):
        """
        Track-end callback handed to the players: flag that the track is
        ending, then call the ``on_track_end`` callback given to ``__init__``.
        """
        # The flag is consumed by play_media to detect the gapless hand-over.
        self._track_ending = True
        self.on_track_end()

    @property
    def supported_schemes(self) -> Set[str]:
        """
        :returns: the current player's ``supported_schemes``, or an empty set
            when there is no current player.
        """
        if cp := self._get_current_player():
            return cp.supported_schemes
        return set()

    @property
    def can_start_playing_with_no_latency(self) -> bool:
        """
        :returns: the ``can_start_playing_with_no_latency`` attribute of the
            current player type, or ``False`` when no player type can be
            resolved for the selected device.
        """
        # A device ID can be selected before it has been recorded in
        # device_id_type_map; the lookup then yields None and must not be
        # dereferenced.
        player_type = self._get_current_player_type()
        if player_type is None:
            return False
        return player_type.can_start_playing_with_no_latency

    @property
    def current_device_id(self) -> Optional[str]:
        """:returns: the ID of the selected device, or ``None`` if unset."""
        return self._current_device_id

    def set_current_device_id(self, device_id: str):
        """
        Switch playback to another device.

        The previously selected player (if any) is paused and marked as having
        no song loaded; then the new player (if it can be resolved) is told
        its device ID and is likewise marked as having no song loaded.

        :param device_id: the ID of the device to select.
        """
        logging.info(f"Setting current device id to '{device_id}'")
        if cp := self._get_current_player():
            cp.pause()
            cp.song_loaded = False

        self._current_device_id = device_id

        # Resolved again on purpose: the ID changed, so this may be a
        # different player than the one paused above.
        if cp := self._get_current_player():
            cp.set_current_device_id(device_id)
            cp.song_loaded = False

    def reset(self):
        """Call ``reset`` on the current player, if there is one."""
        if current_player := self._get_current_player():
            current_player.reset()

    @property
    def song_loaded(self) -> bool:
        """
        :returns: the current player's ``song_loaded``, or ``False`` when
            there is no current player.
        """
        if current_player := self._get_current_player():
            return current_player.song_loaded
        return False

    @property
    def playing(self) -> bool:
        """
        :returns: the current player's ``playing``, or ``False`` when there is
            no current player.
        """
        if current_player := self._get_current_player():
            return current_player.playing
        return False

    def get_volume(self) -> float:
        """
        :returns: the current player's volume, or ``100`` when there is no
            current player.
        """
        if current_player := self._get_current_player():
            return current_player.get_volume()
        return 100

    def set_volume(self, volume: float):
        """
        Forward the volume to the current player, if there is one.

        :param volume: the value passed through to the player's ``set_volume``.
        """
        if current_player := self._get_current_player():
            current_player.set_volume(volume)

    def get_is_muted(self) -> bool:
        """
        :returns: the current player's mute state, or ``False`` when there is
            no current player.
        """
        if current_player := self._get_current_player():
            return current_player.get_is_muted()
        return False

    def set_muted(self, muted: bool):
        """
        Forward the mute state to the current player, if there is one.

        :param muted: the value passed through to the player's ``set_muted``.
        """
        if current_player := self._get_current_player():
            current_player.set_muted(muted)

    def play_media(self, uri: str, progress: timedelta, song: Song):
        """
        Start playing ``song`` on the current player; does nothing when there
        is no current player.

        When the player has ``gapless_playback`` set, the URI matches the one
        previously stored by :meth:`next_media_cached`, playback starts from
        zero and the previous track has just ended, the player is not asked to
        play anything: only the manager's bookkeeping is updated.

        :param uri: the URI of the media to play.
        :param progress: the position at which to start playing.
        :param song: the song that ``uri`` belongs to.
        """
        current_player = self._get_current_player()
        if not current_player:
            return

        if (
            current_player.gapless_playback
            and self.next_song_uri
            and uri == self.next_song_uri
            and progress == timedelta(0)
            and self._track_ending
        ):
            # In this case the player already knows about the next
            # song and will automatically play it when the current
            # song is complete.
            self.current_song = song
            self.next_song_uri = None
            self._track_ending = False
            current_player.song_loaded = True
            return

        # If we are changing the current song then the next song
        # should also be invalidated.
        if self.current_song != song:
            self.current_song = song
            self.next_song_uri = None

        self._track_ending = False
        current_player.play_media(uri, progress, song)

    def pause(self):
        """Pause the current player, if there is one."""
        if current_player := self._get_current_player():
            current_player.pause()

    def toggle_play(self):
        """
        Pause the current player if it is playing, otherwise tell it to play.
        Does nothing when there is no current player.
        """
        if current_player := self._get_current_player():
            if self.playing:
                current_player.pause()
            else:
                current_player.play()

    def seek(self, position: timedelta):
        """
        Forward the seek request to the current player, if there is one.

        :param position: the value passed through to the player's ``seek``.
        """
        if current_player := self._get_current_player():
            current_player.seek(position)

    def next_media_cached(self, uri: str, song: Song):
        """
        Tell the current player about the upcoming song; does nothing when
        there is no current player.

        When the player has ``gapless_playback`` set, the URI is also stored
        so that :meth:`play_media` can recognise the hand-over later.

        :param uri: the URI of the upcoming media.
        :param song: the song that ``uri`` belongs to.
        """
        if current_player := self._get_current_player():
            if current_player.gapless_playback:
                self.next_song_uri = uri

            current_player.next_media_cached(uri, song)

102 if cp := self._get_current_player(): 

103 return cp.supported_schemes 

104 return set() 

105 

106 @property 

107 def can_start_playing_with_no_latency(self) -> bool: 

108 if self._current_device_id: 

109 return self._get_current_player_type().can_start_playing_with_no_latency 

110 else: 

111 return False 

112 

113 @property 

114 def current_device_id(self) -> Optional[str]: 

115 return self._current_device_id 

116 

117 def set_current_device_id(self, device_id: str): 

118 logging.info(f"Setting current device id to '{device_id}'") 

119 if cp := self._get_current_player(): 

120 cp.pause() 

121 cp.song_loaded = False 

122 

123 self._current_device_id = device_id 

124 

125 if cp := self._get_current_player(): 

126 cp.set_current_device_id(device_id) 

127 cp.song_loaded = False 

128 

129 def reset(self): 

130 if current_player := self._get_current_player(): 

131 current_player.reset() 

132 

133 @property 

134 def song_loaded(self) -> bool: 

135 if current_player := self._get_current_player(): 

136 return current_player.song_loaded 

137 return False 

138 

139 @property 

140 def playing(self) -> bool: 

141 if current_player := self._get_current_player(): 

142 return current_player.playing 

143 return False 

144 

145 def get_volume(self) -> float: 

146 if current_player := self._get_current_player(): 

147 return current_player.get_volume() 

148 return 100 

149 

150 def set_volume(self, volume: float): 

151 if current_player := self._get_current_player(): 

152 current_player.set_volume(volume) 

153 

154 def get_is_muted(self) -> bool: 

155 if current_player := self._get_current_player(): 

156 return current_player.get_is_muted() 

157 return False 

158 

159 def set_muted(self, muted: bool): 

160 if current_player := self._get_current_player(): 

161 current_player.set_muted(muted) 

162 

163 def play_media(self, uri: str, progress: timedelta, song: Song): 

164 current_player = self._get_current_player() 

165 if not current_player: 

166 return 

167 

168 if ( 

169 current_player.gapless_playback 

170 and self.next_song_uri 

171 and uri == self.next_song_uri 

172 and progress == timedelta(0) 

173 and self._track_ending 

174 ): 

175 # In this case the player already knows about the next 

176 # song and will automatically play it when the current 

177 # song is complete. 

178 self.current_song = song 

179 self.next_song_uri = None 

180 self._track_ending = False 

181 current_player.song_loaded = True 

182 return 

183 

184 # If we are changing the current song then the next song 

185 # should also be invalidated. 

186 if self.current_song != song: 

187 self.current_song = song 

188 self.next_song_uri = None 

189 

190 self._track_ending = False 

191 current_player.play_media(uri, progress, song) 

192 

193 def pause(self): 

194 if current_player := self._get_current_player(): 

195 current_player.pause() 

196 

197 def toggle_play(self): 

198 if current_player := self._get_current_player(): 

199 if self.playing: 

200 current_player.pause() 

201 else: 

202 current_player.play() 

203 

204 def seek(self, position: timedelta): 

205 if current_player := self._get_current_player(): 

206 current_player.seek(position) 

207 

208 def next_media_cached(self, uri: str, song: Song): 

209 if current_player := self._get_current_player(): 

210 if current_player.gapless_playback: 

211 self.next_song_uri = uri 

212 

213 current_player.next_media_cached(uri, song)