player.rs 8.6 KB


  1. use eventual::Async;
  2. use portaudio;
  3. use std::sync::{mpsc, Mutex, Arc, Condvar, MutexGuard};
  4. use std::thread;
  5. use vorbis;
  6. use metadata::TrackRef;
  7. use session::Session;
  8. use audio_decrypt::AudioDecrypt;
  9. use util::{self, SpotifyId, Subfile};
  10. use spirc::{SpircState, SpircDelegate, PlayStatus};
  11. pub struct Player {
  12. state: Arc<(Mutex<PlayerState>, Condvar)>,
  13. commands: mpsc::Sender<PlayerCommand>,
  14. }
  15. pub struct PlayerState {
  16. status: PlayStatus,
  17. position_ms: u32,
  18. position_measured_at: i64,
  19. update_time: i64,
  20. end_of_track: bool
  21. }
  22. struct PlayerInternal {
  23. state: Arc<(Mutex<PlayerState>, Condvar)>,
  24. session: Session,
  25. commands: mpsc::Receiver<PlayerCommand>,
  26. }
  27. enum PlayerCommand {
  28. Load(SpotifyId, bool, u32),
  29. Play,
  30. Pause,
  31. Stop,
  32. Seek(u32)
  33. }
  34. impl Player {
  35. pub fn new(session: &Session) -> Player {
  36. let (cmd_tx, cmd_rx) = mpsc::channel();
  37. let state = Arc::new((Mutex::new(PlayerState {
  38. status: PlayStatus::kPlayStatusStop,
  39. position_ms: 0,
  40. position_measured_at: 0,
  41. update_time: util::now_ms(),
  42. end_of_track: false,
  43. }), Condvar::new()));
  44. let internal = PlayerInternal {
  45. session: session.clone(),
  46. commands: cmd_rx,
  47. state: state.clone()
  48. };
  49. thread::spawn(move || {
  50. internal.run()
  51. });
  52. Player {
  53. commands: cmd_tx,
  54. state: state,
  55. }
  56. }
  57. fn command(&self, cmd: PlayerCommand) {
  58. self.commands.send(cmd).unwrap();
  59. }
  60. }
  61. impl PlayerInternal {
  62. fn run(self) {
  63. portaudio::initialize().unwrap();
  64. let stream = portaudio::stream::Stream::<i16, i16>::open_default(
  65. 0, 2, 44100.0,
  66. portaudio::stream::FRAMES_PER_BUFFER_UNSPECIFIED,
  67. None
  68. ).unwrap();
  69. let mut decoder = None;
  70. loop {
  71. match self.commands.try_recv() {
  72. Ok(PlayerCommand::Load(id, play, position)) => {
  73. self.update(|state| {
  74. if state.status == PlayStatus::kPlayStatusPlay {
  75. stream.stop().unwrap();
  76. }
  77. state.end_of_track = false;
  78. state.status = PlayStatus::kPlayStatusLoading;
  79. state.position_ms = position;
  80. state.position_measured_at = util::now_ms();
  81. return true;
  82. });
  83. let track : TrackRef = self.session.metadata(id);
  84. let file_id = *track.wait().unwrap().files.first().unwrap();
  85. let key = self.session.audio_key(track.id(), file_id).await().unwrap();
  86. decoder = Some(
  87. vorbis::Decoder::new(
  88. Subfile::new(
  89. AudioDecrypt::new(key,
  90. self.session.audio_file(file_id)), 0xa7)).unwrap());
  91. decoder.as_mut().unwrap().time_seek(position as f64 / 1000f64).unwrap();
  92. self.update(|state| {
  93. state.status = if play {
  94. stream.start().unwrap();
  95. PlayStatus::kPlayStatusPlay
  96. } else {
  97. PlayStatus::kPlayStatusPause
  98. };
  99. state.position_ms = position;
  100. state.position_measured_at = util::now_ms();
  101. return true;
  102. });
  103. println!("Load Done");
  104. }
  105. Ok(PlayerCommand::Seek(ms)) => {
  106. decoder.as_mut().unwrap().time_seek(ms as f64 / 1000f64).unwrap();
  107. self.update(|state| {
  108. state.position_ms = (decoder.as_mut().unwrap().time_tell().unwrap() * 1000f64) as u32;
  109. state.position_measured_at = util::now_ms();
  110. return true;
  111. });
  112. },
  113. Ok(PlayerCommand::Play) => {
  114. self.update(|state| {
  115. state.status = PlayStatus::kPlayStatusPlay;
  116. return true;
  117. });
  118. stream.start().unwrap();
  119. },
  120. Ok(PlayerCommand::Pause) => {
  121. self.update(|state| {
  122. state.status = PlayStatus::kPlayStatusPause;
  123. state.update_time = util::now_ms();
  124. return true;
  125. });
  126. stream.stop().unwrap();
  127. },
  128. Ok(PlayerCommand::Stop) => {
  129. self.update(|state| {
  130. if state.status == PlayStatus::kPlayStatusPlay {
  131. state.status = PlayStatus::kPlayStatusPause;
  132. }
  133. return true;
  134. });
  135. stream.stop().unwrap();
  136. decoder = None;
  137. },
  138. Err(..) => (),
  139. }
  140. if self.state.0.lock().unwrap().status == PlayStatus::kPlayStatusPlay {
  141. match decoder.as_mut().unwrap().packets().next() {
  142. Some(Ok(packet)) => {
  143. match stream.write(&packet.data) {
  144. Ok(_) => (),
  145. Err(portaudio::PaError::OutputUnderflowed)
  146. => eprintln!("Underflow"),
  147. Err(e) => panic!("PA Error {}", e)
  148. };
  149. },
  150. Some(Err(vorbis::VorbisError::Hole)) => (),
  151. Some(Err(e)) => panic!("Vorbis error {:?}", e),
  152. None => {
  153. self.update(|state| {
  154. state.status = PlayStatus::kPlayStatusStop;
  155. state.end_of_track = true;
  156. return true;
  157. });
  158. stream.stop().unwrap();
  159. decoder = None;
  160. }
  161. }
  162. self.update(|state| {
  163. let now = util::now_ms();
  164. if now - state.position_measured_at > 5000 {
  165. state.position_ms = (decoder.as_mut().unwrap().time_tell().unwrap() * 1000f64) as u32;
  166. state.position_measured_at = now;
  167. return true;
  168. } else {
  169. return false;
  170. }
  171. });
  172. }
  173. }
  174. drop(stream);
  175. portaudio::terminate().unwrap();
  176. }
  177. fn update<F>(&self, f: F)
  178. where F: FnOnce(&mut MutexGuard<PlayerState>) -> bool {
  179. let mut guard = self.state.0.lock().unwrap();
  180. let update = f(&mut guard);
  181. if update {
  182. guard.update_time = util::now_ms();
  183. self.state.1.notify_all();
  184. }
  185. }
  186. }
  187. impl SpircDelegate for Player {
  188. type State = PlayerState;
  189. fn load(&self, track: SpotifyId,
  190. start_playing: bool, position_ms: u32) {
  191. self.command(PlayerCommand::Load(track, start_playing, position_ms));
  192. }
  193. fn play(&self) {
  194. self.command(PlayerCommand::Play)
  195. }
  196. fn pause(&self) {
  197. self.command(PlayerCommand::Pause)
  198. }
  199. fn stop(&self) {
  200. self.command(PlayerCommand::Stop)
  201. }
  202. fn seek(&self, position_ms: u32) {
  203. self.command(PlayerCommand::Seek(position_ms));
  204. }
  205. fn state(&self) -> MutexGuard<Self::State> {
  206. self.state.0.lock().unwrap()
  207. }
  208. fn updates(&self) -> mpsc::Receiver<i64> {
  209. let state = self.state.clone();
  210. let (update_tx, update_rx) = mpsc::channel();
  211. thread::spawn(move || {
  212. let mut guard = state.0.lock().unwrap();
  213. let mut last_update;
  214. loop {
  215. last_update = guard.update_time;
  216. update_tx.send(guard.update_time).unwrap();
  217. while last_update >= guard.update_time {
  218. guard = state.1.wait(guard).unwrap();
  219. }
  220. }
  221. });
  222. return update_rx;
  223. }
  224. }
  225. impl SpircState for PlayerState {
  226. fn status(&self) -> PlayStatus {
  227. return self.status;
  228. }
  229. fn position(&self) -> (u32, i64) {
  230. return (self.position_ms, self.position_measured_at);
  231. }
  232. fn update_time(&self) -> i64 {
  233. return self.update_time;
  234. }
  235. fn end_of_track(&self) -> bool {
  236. return self.end_of_track;
  237. }
  238. }