player.rs 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318
  1. use eventual::{self, Async};
  2. use portaudio;
  3. use std::sync::{mpsc, Mutex, Arc, Condvar, MutexGuard};
  4. use std::thread;
  5. use vorbis;
  6. use metadata::{FileFormat, Track, TrackRef};
  7. use session::{Bitrate, Session};
  8. use audio_decrypt::AudioDecrypt;
  9. use util::{self, SpotifyId, Subfile};
  10. use spirc::{SpircState, SpircDelegate, PlayStatus};
  11. pub struct Player {
  12. state: Arc<(Mutex<PlayerState>, Condvar)>,
  13. commands: mpsc::Sender<PlayerCommand>,
  14. }
  15. pub struct PlayerState {
  16. status: PlayStatus,
  17. position_ms: u32,
  18. position_measured_at: i64,
  19. update_time: i64,
  20. end_of_track: bool,
  21. }
  22. struct PlayerInternal {
  23. state: Arc<(Mutex<PlayerState>, Condvar)>,
  24. session: Session,
  25. commands: mpsc::Receiver<PlayerCommand>,
  26. }
  27. enum PlayerCommand {
  28. Load(SpotifyId, bool, u32),
  29. Play,
  30. Pause,
  31. Stop,
  32. Seek(u32),
  33. }
  34. impl Player {
  35. pub fn new(session: Session) -> Player {
  36. let (cmd_tx, cmd_rx) = mpsc::channel();
  37. let state = Arc::new((Mutex::new(PlayerState {
  38. status: PlayStatus::kPlayStatusStop,
  39. position_ms: 0,
  40. position_measured_at: 0,
  41. update_time: util::now_ms(),
  42. end_of_track: false,
  43. }),
  44. Condvar::new()));
  45. let internal = PlayerInternal {
  46. session: session,
  47. commands: cmd_rx,
  48. state: state.clone(),
  49. };
  50. thread::spawn(move || internal.run());
  51. Player {
  52. commands: cmd_tx,
  53. state: state,
  54. }
  55. }
  56. fn command(&self, cmd: PlayerCommand) {
  57. self.commands.send(cmd).unwrap();
  58. }
  59. }
  60. impl PlayerInternal {
  61. fn run(self) {
  62. portaudio::initialize().unwrap();
  63. let stream = portaudio::stream::Stream::<i16, i16>::open_default(
  64. 0, 2, 44100.0,
  65. portaudio::stream::FRAMES_PER_BUFFER_UNSPECIFIED,
  66. None
  67. ).unwrap();
  68. let mut decoder = None;
  69. loop {
  70. let playing = self.state.0.lock().unwrap().status == PlayStatus::kPlayStatusPlay;
  71. let cmd = if playing {
  72. self.commands.try_recv().ok()
  73. } else {
  74. Some(self.commands.recv().unwrap())
  75. };
  76. match cmd {
  77. Some(PlayerCommand::Load(track_id, play, position)) => {
  78. self.update(|state| {
  79. if state.status == PlayStatus::kPlayStatusPlay {
  80. stream.stop().unwrap();
  81. }
  82. state.end_of_track = false;
  83. state.status = if play {
  84. PlayStatus::kPlayStatusPlay
  85. } else {
  86. PlayStatus::kPlayStatusPause
  87. };
  88. state.position_ms = position;
  89. state.position_measured_at = util::now_ms();
  90. true
  91. });
  92. drop(decoder);
  93. let mut track = self.session.metadata::<Track>(track_id).await().unwrap();
  94. if !track.available {
  95. let alternatives = track.alternatives
  96. .iter()
  97. .map(|alt_id| {
  98. self.session.metadata::<Track>(*alt_id)
  99. })
  100. .collect::<Vec<TrackRef>>();
  101. track = eventual::sequence(alternatives.into_iter())
  102. .iter()
  103. .find(|alt| alt.available)
  104. .unwrap();
  105. }
  106. let format = match self.session.0.config.bitrate {
  107. Bitrate::Bitrate96 => FileFormat::OGG_VORBIS_96,
  108. Bitrate::Bitrate160 => FileFormat::OGG_VORBIS_160,
  109. Bitrate::Bitrate320 => FileFormat::OGG_VORBIS_320,
  110. };
  111. let (file_id, _) = track.files.into_iter().find(|&(_, f)| f == format).unwrap();
  112. let key = self.session.audio_key(track.id, file_id).await().unwrap();
  113. decoder = Some(
  114. vorbis::Decoder::new(
  115. Subfile::new(
  116. AudioDecrypt::new(key,
  117. self.session.audio_file(file_id)), 0xa7)).unwrap());
  118. decoder.as_mut().unwrap().time_seek(position as f64 / 1000f64).unwrap();
  119. self.update(|state| {
  120. state.status = if play {
  121. stream.start().unwrap();
  122. PlayStatus::kPlayStatusPlay
  123. } else {
  124. PlayStatus::kPlayStatusPause
  125. };
  126. state.position_ms = position;
  127. state.position_measured_at = util::now_ms();
  128. true
  129. });
  130. println!("Load Done");
  131. }
  132. Some(PlayerCommand::Seek(ms)) => {
  133. decoder.as_mut().unwrap().time_seek(ms as f64 / 1000f64).unwrap();
  134. self.update(|state| {
  135. state.position_ms =
  136. (decoder.as_mut().unwrap().time_tell().unwrap() * 1000f64) as u32;
  137. state.position_measured_at = util::now_ms();
  138. true
  139. });
  140. }
  141. Some(PlayerCommand::Play) => {
  142. self.update(|state| {
  143. state.status = PlayStatus::kPlayStatusPlay;
  144. true
  145. });
  146. stream.start().unwrap();
  147. }
  148. Some(PlayerCommand::Pause) => {
  149. self.update(|state| {
  150. state.status = PlayStatus::kPlayStatusPause;
  151. state.update_time = util::now_ms();
  152. true
  153. });
  154. stream.stop().unwrap();
  155. }
  156. Some(PlayerCommand::Stop) => {
  157. self.update(|state| {
  158. if state.status == PlayStatus::kPlayStatusPlay {
  159. state.status = PlayStatus::kPlayStatusPause;
  160. }
  161. true
  162. });
  163. stream.stop().unwrap();
  164. decoder = None;
  165. }
  166. None => (),
  167. }
  168. if self.state.0.lock().unwrap().status == PlayStatus::kPlayStatusPlay {
  169. match decoder.as_mut().unwrap().packets().next() {
  170. Some(Ok(packet)) => {
  171. match stream.write(&packet.data) {
  172. Ok(_) => (),
  173. Err(portaudio::PaError::OutputUnderflowed) => eprintln!("Underflow"),
  174. Err(e) => panic!("PA Error {}", e),
  175. };
  176. }
  177. Some(Err(vorbis::VorbisError::Hole)) => (),
  178. Some(Err(e)) => panic!("Vorbis error {:?}", e),
  179. None => {
  180. self.update(|state| {
  181. state.status = PlayStatus::kPlayStatusStop;
  182. state.end_of_track = true;
  183. true
  184. });
  185. stream.stop().unwrap();
  186. decoder = None;
  187. }
  188. }
  189. self.update(|state| {
  190. let now = util::now_ms();
  191. if now - state.position_measured_at > 5000 {
  192. state.position_ms =
  193. (decoder.as_mut().unwrap().time_tell().unwrap() * 1000f64) as u32;
  194. state.position_measured_at = now;
  195. true
  196. } else {
  197. false
  198. }
  199. });
  200. }
  201. }
  202. drop(stream);
  203. portaudio::terminate().unwrap();
  204. }
  205. fn update<F>(&self, f: F)
  206. where F: FnOnce(&mut MutexGuard<PlayerState>) -> bool
  207. {
  208. let mut guard = self.state.0.lock().unwrap();
  209. let update = f(&mut guard);
  210. if update {
  211. guard.update_time = util::now_ms();
  212. self.state.1.notify_all();
  213. }
  214. }
  215. }
  216. impl SpircDelegate for Player {
  217. type State = PlayerState;
  218. fn load(&self, track: SpotifyId, start_playing: bool, position_ms: u32) {
  219. self.command(PlayerCommand::Load(track, start_playing, position_ms));
  220. }
  221. fn play(&self) {
  222. self.command(PlayerCommand::Play)
  223. }
  224. fn pause(&self) {
  225. self.command(PlayerCommand::Pause)
  226. }
  227. fn stop(&self) {
  228. self.command(PlayerCommand::Stop)
  229. }
  230. fn seek(&self, position_ms: u32) {
  231. self.command(PlayerCommand::Seek(position_ms));
  232. }
  233. fn state(&self) -> MutexGuard<Self::State> {
  234. self.state.0.lock().unwrap()
  235. }
  236. fn updates(&self) -> mpsc::Receiver<i64> {
  237. let state = self.state.clone();
  238. let (update_tx, update_rx) = mpsc::channel();
  239. thread::spawn(move || {
  240. let mut guard = state.0.lock().unwrap();
  241. let mut last_update;
  242. loop {
  243. last_update = guard.update_time;
  244. update_tx.send(guard.update_time).unwrap();
  245. while last_update >= guard.update_time {
  246. guard = state.1.wait(guard).unwrap();
  247. }
  248. }
  249. });
  250. update_rx
  251. }
  252. }
  253. impl SpircState for PlayerState {
  254. fn status(&self) -> PlayStatus {
  255. self.status
  256. }
  257. fn position(&self) -> (u32, i64) {
  258. (self.position_ms, self.position_measured_at)
  259. }
  260. fn update_time(&self) -> i64 {
  261. self.update_time
  262. }
  263. fn end_of_track(&self) -> bool {
  264. self.end_of_track
  265. }
  266. }