player.rs 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442
  1. use eventual::{self, Async};
  2. use std::borrow::Cow;
  3. use std::sync::{mpsc, Mutex, Arc, MutexGuard};
  4. use std::thread;
  5. use std::io::{Read, Seek};
  6. use vorbis;
  7. use audio_decrypt::AudioDecrypt;
  8. use audio_backend::Sink;
  9. use metadata::{FileFormat, Track, TrackRef};
  10. use session::{Bitrate, Session};
  11. use util::{self, ReadSeek, SpotifyId, Subfile};
  12. pub use spirc::PlayStatus;
  13. #[cfg(not(feature = "with-tremor"))]
  14. fn vorbis_time_seek_ms<R>(decoder: &mut vorbis::Decoder<R>, ms: i64) -> Result<(), vorbis::VorbisError> where R: Read + Seek {
  15. decoder.time_seek(ms as f64 / 1000f64)
  16. }
  17. #[cfg(not(feature = "with-tremor"))]
  18. fn vorbis_time_tell_ms<R>(decoder: &mut vorbis::Decoder<R>) -> Result<i64, vorbis::VorbisError> where R: Read + Seek {
  19. decoder.time_tell().map(|t| (t * 1000f64) as i64)
  20. }
  /// Seeks `decoder` to the position `ms`, given in milliseconds.
  ///
  /// With the `with-tremor` feature the value is forwarded to the decoder
  /// unchanged (presumably the Tremor bindings work in integer milliseconds;
  /// confirm against the `vorbis` crate).
  #[cfg(feature = "with-tremor")]
  fn vorbis_time_seek_ms<R: Read + Seek>(decoder: &mut vorbis::Decoder<R>, ms: i64) -> Result<(), vorbis::VorbisError> {
      decoder.time_seek(ms)
  }
  /// Reports the current position of `decoder` in milliseconds.
  ///
  /// With the `with-tremor` feature the decoder's answer is returned unchanged
  /// (presumably already integer milliseconds; confirm against the `vorbis`
  /// crate).
  #[cfg(feature = "with-tremor")]
  fn vorbis_time_tell_ms<R: Read + Seek>(decoder: &mut vorbis::Decoder<R>) -> Result<i64, vorbis::VorbisError> {
      decoder.time_tell()
  }
  29. pub type PlayerObserver = Box<Fn(&PlayerState) + Send>;
  30. #[derive(Clone)]
  31. pub struct Player {
  32. state: Arc<Mutex<PlayerState>>,
  33. observers: Arc<Mutex<Vec<PlayerObserver>>>,
  34. commands: mpsc::Sender<PlayerCommand>,
  35. }
  36. #[derive(Clone)]
  37. pub struct PlayerState {
  38. pub status: PlayStatus,
  39. pub position_ms: u32,
  40. pub position_measured_at: i64,
  41. pub update_time: i64,
  42. pub volume: u16,
  43. pub track: Option<SpotifyId>,
  44. pub end_of_track: bool,
  45. }
  46. struct PlayerInternal {
  47. state: Arc<Mutex<PlayerState>>,
  48. observers: Arc<Mutex<Vec<PlayerObserver>>>,
  49. session: Session,
  50. commands: mpsc::Receiver<PlayerCommand>,
  51. }
  52. #[derive(Debug)]
  53. enum PlayerCommand {
  54. Load(SpotifyId, bool, u32),
  55. Play,
  56. Pause,
  57. Volume(u16),
  58. Stop,
  59. Seek(u32),
  60. SeekAt(u32, i64),
  61. }
  62. impl Player {
  63. pub fn new<F>(session: Session, sink_builder: F) -> Player
  64. where F: FnOnce() -> Box<Sink> + Send + 'static {
  65. let (cmd_tx, cmd_rx) = mpsc::channel();
  66. let state = Arc::new(Mutex::new(PlayerState {
  67. status: PlayStatus::kPlayStatusStop,
  68. position_ms: 0,
  69. position_measured_at: 0,
  70. update_time: util::now_ms(),
  71. volume: 0xFFFF,
  72. track: None,
  73. end_of_track: false,
  74. }));
  75. let observers = Arc::new(Mutex::new(Vec::new()));
  76. let internal = PlayerInternal {
  77. session: session,
  78. commands: cmd_rx,
  79. state: state.clone(),
  80. observers: observers.clone(),
  81. };
  82. thread::spawn(move || internal.run(sink_builder()));
  83. Player {
  84. commands: cmd_tx,
  85. state: state,
  86. observers: observers,
  87. }
  88. }
  89. fn command(&self, cmd: PlayerCommand) {
  90. self.commands.send(cmd).unwrap();
  91. }
  92. pub fn load(&self, track: SpotifyId, start_playing: bool, position_ms: u32) {
  93. self.command(PlayerCommand::Load(track, start_playing, position_ms));
  94. }
  95. pub fn play(&self) {
  96. self.command(PlayerCommand::Play)
  97. }
  98. pub fn pause(&self) {
  99. self.command(PlayerCommand::Pause)
  100. }
  101. pub fn stop(&self) {
  102. self.command(PlayerCommand::Stop)
  103. }
  104. pub fn seek(&self, position_ms: u32) {
  105. self.command(PlayerCommand::Seek(position_ms));
  106. }
  107. pub fn seek_at(&self, position_ms: u32, measured_at: i64) {
  108. self.command(PlayerCommand::SeekAt(position_ms, measured_at));
  109. }
  110. pub fn state(&self) -> PlayerState {
  111. self.state.lock().unwrap().clone()
  112. }
  113. pub fn volume(&self, vol: u16) {
  114. self.command(PlayerCommand::Volume(vol));
  115. }
  116. pub fn add_observer(&self, observer: PlayerObserver) {
  117. self.observers.lock().unwrap().push(observer);
  118. }
  119. }
  /// Scales every sample in `data` by `volume / 0xFFFF`.
  ///
  /// At full volume (`0xFFFF`) the input slice is returned borrowed, without
  /// copying. Otherwise a new buffer is produced; each sample is multiplied in
  /// 32-bit arithmetic and the quotient is truncated toward zero.
  fn apply_volume(volume: u16, data: &[i16]) -> Cow<[i16]> {
      // Full volume leaves the samples untouched, so skip the copy entirely.
      if volume == 0xFFFF {
          return Cow::Borrowed(data);
      }

      let gain = volume as i32;
      let mut scaled = Vec::with_capacity(data.len());
      for &sample in data {
          scaled.push((sample as i32 * gain / 0xFFFF) as i16);
      }
      Cow::Owned(scaled)
  }
  134. fn find_available_alternative<'a>(session: &Session, track: &'a Track) -> Option<Cow<'a, Track>> {
  135. if track.available {
  136. Some(Cow::Borrowed(track))
  137. } else {
  138. let alternatives = track.alternatives
  139. .iter()
  140. .map(|alt_id| {
  141. session.metadata::<Track>(*alt_id)
  142. })
  143. .collect::<Vec<TrackRef>>();
  144. eventual::sequence(alternatives.into_iter()).iter().find(|alt| alt.available).map(Cow::Owned)
  145. }
  146. }
  147. fn load_track(session: &Session, track_id: SpotifyId) -> Option<vorbis::Decoder<Subfile<AudioDecrypt<Box<ReadSeek>>>>> {
  148. let track = session.metadata::<Track>(track_id).await().unwrap();
  149. info!("Loading track \"{}\"", track.name);
  150. let track = match find_available_alternative(session, &track) {
  151. Some(track) => track,
  152. None => {
  153. warn!("Track \"{}\" is not available", track.name);
  154. return None;
  155. }
  156. };
  157. let format = match session.config().bitrate {
  158. Bitrate::Bitrate96 => FileFormat::OGG_VORBIS_96,
  159. Bitrate::Bitrate160 => FileFormat::OGG_VORBIS_160,
  160. Bitrate::Bitrate320 => FileFormat::OGG_VORBIS_320,
  161. };
  162. let file_id = match track.files.get(&format) {
  163. Some(&file_id) => file_id,
  164. None => {
  165. warn!("Track \"{}\" is not available in format {:?}", track.name, format);
  166. return None;
  167. }
  168. };
  169. let key = session.audio_key(track.id, file_id).await().unwrap();
  170. let audio_file = Subfile::new(AudioDecrypt::new(key, session.audio_file(file_id)), 0xa7);
  171. let decoder = vorbis::Decoder::new(audio_file).unwrap();
  172. Some(decoder)
  173. }
  174. fn run_onstart(session: &Session) {
  175. match session.config().onstart {
  176. Some(ref program) => util::run_program(program),
  177. None => {},
  178. };
  179. }
  180. fn run_onstop(session: &Session) {
  181. match session.config().onstop {
  182. Some(ref program) => util::run_program(program),
  183. None => {},
  184. };
  185. }
  186. impl PlayerInternal {
  187. fn run(self, mut sink: Box<Sink>) {
  188. let mut decoder = None;
  189. loop {
  190. let playing = self.state.lock().unwrap().status == PlayStatus::kPlayStatusPlay;
  191. let cmd = if playing {
  192. self.commands.try_recv().ok()
  193. } else {
  194. Some(self.commands.recv().unwrap())
  195. };
  196. match cmd {
  197. Some(PlayerCommand::Load(track_id, play, position)) => {
  198. self.update(|state| {
  199. if state.status == PlayStatus::kPlayStatusPlay {
  200. sink.stop().unwrap();
  201. run_onstop(&self.session);
  202. }
  203. state.end_of_track = false;
  204. state.status = PlayStatus::kPlayStatusPause;
  205. state.position_ms = position;
  206. state.position_measured_at = util::now_ms();
  207. state.track = Some(track_id);
  208. true
  209. });
  210. drop(decoder);
  211. decoder = match load_track(&self.session, track_id) {
  212. Some(mut decoder) => {
  213. match vorbis_time_seek_ms(&mut decoder, position as i64) {
  214. Ok(_) => (),
  215. Err(err) => error!("Vorbis error: {:?}", err),
  216. }
  217. self.update(|state| {
  218. state.status = if play {
  219. run_onstart(&self.session);
  220. sink.start().unwrap();
  221. PlayStatus::kPlayStatusPlay
  222. } else {
  223. PlayStatus::kPlayStatusPause
  224. };
  225. state.position_ms = position;
  226. state.position_measured_at = util::now_ms();
  227. true
  228. });
  229. info!("Load Done");
  230. Some(decoder)
  231. }
  232. None => {
  233. self.update(|state| {
  234. state.status = PlayStatus::kPlayStatusStop;
  235. state.end_of_track = true;
  236. true
  237. });
  238. None
  239. }
  240. }
  241. }
  242. Some(PlayerCommand::Seek(position)) => {
  243. match vorbis_time_seek_ms(decoder.as_mut().unwrap(), position as i64) {
  244. Ok(_) => (),
  245. Err(err) => error!("Vorbis error: {:?}", err),
  246. }
  247. self.update(|state| {
  248. state.position_ms = vorbis_time_tell_ms(decoder.as_mut().unwrap()).unwrap() as u32;
  249. state.position_measured_at = util::now_ms();
  250. true
  251. });
  252. }
  253. Some(PlayerCommand::SeekAt(position, measured_at)) => {
  254. let position = (util::now_ms() - measured_at + position as i64) as u32;
  255. match vorbis_time_seek_ms(decoder.as_mut().unwrap(), position as i64) {
  256. Ok(_) => (),
  257. Err(err) => error!("Vorbis error: {:?}", err),
  258. }
  259. self.update(|state| {
  260. state.position_ms = vorbis_time_tell_ms(decoder.as_mut().unwrap()).unwrap() as u32;
  261. state.position_measured_at = util::now_ms();
  262. true
  263. });
  264. }
  265. Some(PlayerCommand::Play) => {
  266. self.update(|state| {
  267. state.status = PlayStatus::kPlayStatusPlay;
  268. state.position_ms = vorbis_time_tell_ms(decoder.as_mut().unwrap()).unwrap() as u32;
  269. state.position_measured_at = util::now_ms();
  270. true
  271. });
  272. run_onstart(&self.session);
  273. sink.start().unwrap();
  274. }
  275. Some(PlayerCommand::Pause) => {
  276. self.update(|state| {
  277. state.status = PlayStatus::kPlayStatusPause;
  278. state.update_time = util::now_ms();
  279. state.position_ms = decoder.as_mut().map(|d| vorbis_time_tell_ms(d).unwrap()).unwrap_or(0) as u32;
  280. state.position_measured_at = util::now_ms();
  281. true
  282. });
  283. sink.stop().unwrap();
  284. run_onstop(&self.session);
  285. }
  286. Some(PlayerCommand::Volume(vol)) => {
  287. self.update(|state| {
  288. state.volume = vol;
  289. true
  290. });
  291. }
  292. Some(PlayerCommand::Stop) => {
  293. self.update(|state| {
  294. if state.status == PlayStatus::kPlayStatusPlay {
  295. state.status = PlayStatus::kPlayStatusPause;
  296. }
  297. state.position_ms = 0;
  298. state.position_measured_at = util::now_ms();
  299. true
  300. });
  301. sink.stop().unwrap();
  302. run_onstop(&self.session);
  303. decoder = None;
  304. }
  305. None => (),
  306. }
  307. if self.state.lock().unwrap().status == PlayStatus::kPlayStatusPlay {
  308. let packet = decoder.as_mut().unwrap().packets().next();
  309. match packet {
  310. Some(Ok(packet)) => {
  311. let buffer = apply_volume(self.state.lock().unwrap().volume,
  312. &packet.data);
  313. sink.write(&buffer).unwrap();
  314. self.update(|state| {
  315. state.position_ms = vorbis_time_tell_ms(decoder.as_mut().unwrap()).unwrap() as u32;
  316. state.position_measured_at = util::now_ms();
  317. false
  318. });
  319. }
  320. Some(Err(vorbis::VorbisError::Hole)) => (),
  321. Some(Err(e)) => panic!("Vorbis error {:?}", e),
  322. None => {
  323. self.update(|state| {
  324. state.status = PlayStatus::kPlayStatusStop;
  325. state.end_of_track = true;
  326. true
  327. });
  328. sink.stop().unwrap();
  329. run_onstop(&self.session);
  330. decoder = None;
  331. }
  332. }
  333. }
  334. }
  335. }
  336. fn update<F>(&self, f: F)
  337. where F: FnOnce(&mut MutexGuard<PlayerState>) -> bool
  338. {
  339. let mut guard = self.state.lock().unwrap();
  340. let update = f(&mut guard);
  341. let observers = self.observers.lock().unwrap();
  342. if update {
  343. guard.update_time = util::now_ms();
  344. let state = guard.clone();
  345. drop(guard);
  346. for observer in observers.iter() {
  347. observer(&state);
  348. }
  349. }
  350. }
  351. }
  352. impl PlayerState {
  353. pub fn status(&self) -> PlayStatus {
  354. self.status
  355. }
  356. pub fn position(&self) -> (u32, i64) {
  357. (self.position_ms, self.position_measured_at)
  358. }
  359. pub fn volume(&self) -> u16 {
  360. self.volume
  361. }
  362. pub fn update_time(&self) -> i64 {
  363. self.update_time
  364. }
  365. pub fn end_of_track(&self) -> bool {
  366. self.end_of_track
  367. }
  368. }