player.rs 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431
  1. use futures::sync::oneshot;
  2. use futures::{future, Future};
  3. use std::borrow::Cow;
  4. use std::io::{Read, Seek};
  5. use std::mem;
  6. use std::sync::mpsc::{RecvError, TryRecvError};
  7. use std::thread;
  8. use std;
  9. use vorbis::{self, VorbisError};
  10. use audio_backend::Sink;
  11. use audio_decrypt::AudioDecrypt;
  12. use audio_file::AudioFile;
  13. use metadata::{FileFormat, Track};
  14. use session::{Bitrate, Session};
  15. use mixer::AudioFilter;
  16. use util::{self, SpotifyId, Subfile};
  17. #[derive(Clone)]
  18. pub struct Player {
  19. commands: std::sync::mpsc::Sender<PlayerCommand>,
  20. }
  21. struct PlayerInternal {
  22. session: Session,
  23. commands: std::sync::mpsc::Receiver<PlayerCommand>,
  24. state: PlayerState,
  25. sink: Box<Sink>,
  26. audio_filter: Option<Box<AudioFilter + Send>>,
  27. }
  28. enum PlayerCommand {
  29. Load(SpotifyId, bool, u32, oneshot::Sender<()>),
  30. Play,
  31. Pause,
  32. Stop,
  33. Seek(u32),
  34. }
  35. impl Player {
  36. pub fn new<F>(session: Session, audio_filter: Option<Box<AudioFilter + Send>>, sink_builder: F) -> Player
  37. where F: FnOnce() -> Box<Sink> + Send + 'static {
  38. let (cmd_tx, cmd_rx) = std::sync::mpsc::channel();
  39. thread::spawn(move || {
  40. debug!("new Player[{}]", session.session_id());
  41. let internal = PlayerInternal {
  42. session: session,
  43. commands: cmd_rx,
  44. state: PlayerState::Stopped,
  45. sink: sink_builder(),
  46. audio_filter: audio_filter,
  47. };
  48. internal.run();
  49. });
  50. Player {
  51. commands: cmd_tx,
  52. }
  53. }
  54. fn command(&self, cmd: PlayerCommand) {
  55. self.commands.send(cmd).unwrap();
  56. }
  57. pub fn load(&self, track: SpotifyId, start_playing: bool, position_ms: u32)
  58. -> oneshot::Receiver<()>
  59. {
  60. let (tx, rx) = oneshot::channel();
  61. self.command(PlayerCommand::Load(track, start_playing, position_ms, tx));
  62. rx
  63. }
  64. pub fn play(&self) {
  65. self.command(PlayerCommand::Play)
  66. }
  67. pub fn pause(&self) {
  68. self.command(PlayerCommand::Pause)
  69. }
  70. pub fn stop(&self) {
  71. self.command(PlayerCommand::Stop)
  72. }
  73. pub fn seek(&self, position_ms: u32) {
  74. self.command(PlayerCommand::Seek(position_ms));
  75. }
  76. }
  77. type Decoder = vorbis::Decoder<Subfile<AudioDecrypt<AudioFile>>>;
  78. enum PlayerState {
  79. Stopped,
  80. Paused {
  81. decoder: Decoder,
  82. end_of_track: oneshot::Sender<()>,
  83. },
  84. Playing {
  85. decoder: Decoder,
  86. end_of_track: oneshot::Sender<()>,
  87. },
  88. Invalid,
  89. }
  90. impl PlayerState {
  91. fn is_playing(&self) -> bool {
  92. use self::PlayerState::*;
  93. match *self {
  94. Stopped | Paused { .. } => false,
  95. Playing { .. } => true,
  96. Invalid => panic!("invalid state"),
  97. }
  98. }
  99. fn decoder(&mut self) -> Option<&mut Decoder> {
  100. use self::PlayerState::*;
  101. match *self {
  102. Stopped => None,
  103. Paused { ref mut decoder, .. } |
  104. Playing { ref mut decoder, .. } => Some(decoder),
  105. Invalid => panic!("invalid state"),
  106. }
  107. }
  108. fn signal_end_of_track(self) {
  109. use self::PlayerState::*;
  110. match self {
  111. Paused { end_of_track, .. } |
  112. Playing { end_of_track, .. } => {
  113. end_of_track.complete(())
  114. }
  115. Stopped => warn!("signal_end_of_track from stopped state"),
  116. Invalid => panic!("invalid state"),
  117. }
  118. }
  119. fn paused_to_playing(&mut self) {
  120. use self::PlayerState::*;
  121. match ::std::mem::replace(self, Invalid) {
  122. Paused { decoder, end_of_track } => {
  123. *self = Playing {
  124. decoder: decoder,
  125. end_of_track: end_of_track,
  126. };
  127. }
  128. _ => panic!("invalid state"),
  129. }
  130. }
  131. fn playing_to_paused(&mut self) {
  132. use self::PlayerState::*;
  133. match ::std::mem::replace(self, Invalid) {
  134. Playing { decoder, end_of_track } => {
  135. *self = Paused {
  136. decoder: decoder,
  137. end_of_track: end_of_track,
  138. };
  139. }
  140. _ => panic!("invalid state"),
  141. }
  142. }
  143. }
  144. impl PlayerInternal {
  145. fn run(mut self) {
  146. loop {
  147. let cmd = if self.state.is_playing() {
  148. match self.commands.try_recv() {
  149. Ok(cmd) => Some(cmd),
  150. Err(TryRecvError::Empty) => None,
  151. Err(TryRecvError::Disconnected) => return,
  152. }
  153. } else {
  154. match self.commands.recv() {
  155. Ok(cmd) => Some(cmd),
  156. Err(RecvError) => return,
  157. }
  158. };
  159. if let Some(cmd) = cmd {
  160. self.handle_command(cmd);
  161. }
  162. let packet = if let PlayerState::Playing { ref mut decoder, .. } = self.state {
  163. Some(decoder.packets().next())
  164. } else { None };
  165. if let Some(packet) = packet {
  166. self.handle_packet(packet);
  167. }
  168. }
  169. }
  170. fn handle_packet(&mut self, packet: Option<Result<vorbis::Packet, VorbisError>>) {
  171. match packet {
  172. Some(Ok(mut packet)) => {
  173. if let Some(ref editor) = self.audio_filter {
  174. editor.modify_stream(&mut packet.data)
  175. };
  176. self.sink.write(&packet.data).unwrap();
  177. }
  178. Some(Err(vorbis::VorbisError::Hole)) => (),
  179. Some(Err(e)) => panic!("Vorbis error {:?}", e),
  180. None => {
  181. self.sink.stop().unwrap();
  182. self.run_onstop();
  183. let old_state = mem::replace(&mut self.state, PlayerState::Stopped);
  184. old_state.signal_end_of_track();
  185. }
  186. }
  187. }
  188. fn handle_command(&mut self, cmd: PlayerCommand) {
  189. debug!("command={:?}", cmd);
  190. match cmd {
  191. PlayerCommand::Load(track_id, play, position, end_of_track) => {
  192. if self.state.is_playing() {
  193. self.sink.stop().unwrap();
  194. }
  195. match self.load_track(track_id, position as i64) {
  196. Some(decoder) => {
  197. if play {
  198. if !self.state.is_playing() {
  199. self.run_onstart();
  200. }
  201. self.sink.start().unwrap();
  202. self.state = PlayerState::Playing {
  203. decoder: decoder,
  204. end_of_track: end_of_track,
  205. };
  206. } else {
  207. if self.state.is_playing() {
  208. self.run_onstop();
  209. }
  210. self.state = PlayerState::Paused {
  211. decoder: decoder,
  212. end_of_track: end_of_track,
  213. };
  214. }
  215. }
  216. None => {
  217. end_of_track.complete(());
  218. if self.state.is_playing() {
  219. self.run_onstop();
  220. }
  221. }
  222. }
  223. }
  224. PlayerCommand::Seek(position) => {
  225. if let Some(decoder) = self.state.decoder() {
  226. match vorbis_time_seek_ms(decoder, position as i64) {
  227. Ok(_) => (),
  228. Err(err) => error!("Vorbis error: {:?}", err),
  229. }
  230. } else {
  231. warn!("Player::seek called from invalid state");
  232. }
  233. }
  234. PlayerCommand::Play => {
  235. if let PlayerState::Paused { .. } = self.state {
  236. self.state.paused_to_playing();
  237. self.run_onstart();
  238. self.sink.start().unwrap();
  239. } else {
  240. warn!("Player::play called from invalid state");
  241. }
  242. }
  243. PlayerCommand::Pause => {
  244. if let PlayerState::Playing { .. } = self.state {
  245. self.state.playing_to_paused();
  246. self.sink.stop().unwrap();
  247. self.run_onstop();
  248. } else {
  249. warn!("Player::pause called from invalid state");
  250. }
  251. }
  252. PlayerCommand::Stop => {
  253. match self.state {
  254. PlayerState::Playing { .. } => {
  255. self.sink.stop().unwrap();
  256. self.run_onstop();
  257. self.state = PlayerState::Stopped;
  258. }
  259. PlayerState::Paused { .. } => {
  260. self.state = PlayerState::Stopped;
  261. },
  262. PlayerState::Stopped => {
  263. warn!("Player::stop called from invalid state");
  264. }
  265. PlayerState::Invalid => panic!("invalid state"),
  266. }
  267. }
  268. }
  269. }
  270. fn run_onstart(&self) {
  271. if let Some(ref program) = self.session.config().onstart {
  272. util::run_program(program)
  273. }
  274. }
  275. fn run_onstop(&self) {
  276. if let Some(ref program) = self.session.config().onstop {
  277. util::run_program(program)
  278. }
  279. }
  280. fn find_available_alternative<'a>(&self, track: &'a Track) -> Option<Cow<'a, Track>> {
  281. if track.available {
  282. Some(Cow::Borrowed(track))
  283. } else {
  284. let alternatives = track.alternatives
  285. .iter()
  286. .map(|alt_id| {
  287. self.session.metadata().get::<Track>(*alt_id)
  288. });
  289. let alternatives = future::join_all(alternatives).wait().unwrap();
  290. alternatives.into_iter().find(|alt| alt.available).map(Cow::Owned)
  291. }
  292. }
  293. fn load_track(&self, track_id: SpotifyId, position: i64) -> Option<Decoder> {
  294. let track = self.session.metadata().get::<Track>(track_id).wait().unwrap();
  295. info!("Loading track \"{}\"", track.name);
  296. let track = match self.find_available_alternative(&track) {
  297. Some(track) => track,
  298. None => {
  299. warn!("Track \"{}\" is not available", track.name);
  300. return None;
  301. }
  302. };
  303. let format = match self.session.config().bitrate {
  304. Bitrate::Bitrate96 => FileFormat::OGG_VORBIS_96,
  305. Bitrate::Bitrate160 => FileFormat::OGG_VORBIS_160,
  306. Bitrate::Bitrate320 => FileFormat::OGG_VORBIS_320,
  307. };
  308. let file_id = match track.files.get(&format) {
  309. Some(&file_id) => file_id,
  310. None => {
  311. warn!("Track \"{}\" is not available in format {:?}", track.name, format);
  312. return None;
  313. }
  314. };
  315. let key = self.session.audio_key().request(track.id, file_id).wait().unwrap();
  316. let open = self.session.audio_file().open(file_id);
  317. let encrypted_file = open.wait().unwrap();
  318. let audio_file = Subfile::new(AudioDecrypt::new(key, encrypted_file), 0xa7);
  319. let mut decoder = vorbis::Decoder::new(audio_file).unwrap();
  320. match vorbis_time_seek_ms(&mut decoder, position) {
  321. Ok(_) => (),
  322. Err(err) => error!("Vorbis error: {:?}", err),
  323. }
  324. info!("Track \"{}\" loaded", track.name);
  325. Some(decoder)
  326. }
  327. }
  328. impl Drop for PlayerInternal {
  329. fn drop(&mut self) {
  330. debug!("drop Player[{}]", self.session.session_id());
  331. }
  332. }
  333. #[cfg(not(feature = "with-tremor"))]
  334. fn vorbis_time_seek_ms<R>(decoder: &mut vorbis::Decoder<R>, ms: i64) -> Result<(), vorbis::VorbisError> where R: Read + Seek {
  335. decoder.time_seek(ms as f64 / 1000f64)
  336. }
  337. #[cfg(feature = "with-tremor")]
  338. fn vorbis_time_seek_ms<R>(decoder: &mut vorbis::Decoder<R>, ms: i64) -> Result<(), vorbis::VorbisError> where R: Read + Seek {
  339. decoder.time_seek(ms)
  340. }
  341. impl ::std::fmt::Debug for PlayerCommand {
  342. fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
  343. match *self {
  344. PlayerCommand::Load(track, play, position, _) => {
  345. f.debug_tuple("Load")
  346. .field(&track)
  347. .field(&play)
  348. .field(&position)
  349. .finish()
  350. }
  351. PlayerCommand::Play => {
  352. f.debug_tuple("Play").finish()
  353. }
  354. PlayerCommand::Pause => {
  355. f.debug_tuple("Pause").finish()
  356. }
  357. PlayerCommand::Stop => {
  358. f.debug_tuple("Stop").finish()
  359. }
  360. PlayerCommand::Seek(position) => {
  361. f.debug_tuple("Seek")
  362. .field(&position)
  363. .finish()
  364. }
  365. }
  366. }
  367. }