player.rs 16 KB


  1. use futures::sync::oneshot;
  2. use futures::{future, Future};
  3. use std;
  4. use std::borrow::Cow;
  5. use std::io::{Read, Seek, SeekFrom, Result};
  6. use std::mem;
  7. use std::sync::mpsc::{RecvError, TryRecvError, RecvTimeoutError};
  8. use std::thread;
  9. use std::time::Duration;
  10. use config::{Bitrate, PlayerConfig, PlayerEvent};
  11. use core::session::Session;
  12. use core::spotify_id::SpotifyId;
  13. use audio_backend::Sink;
  14. use audio::{AudioFile, AudioDecrypt};
  15. use audio::{VorbisDecoder, VorbisPacket};
  16. use metadata::{FileFormat, Track, Metadata};
  17. use mixer::AudioFilter;
  18. pub struct Player {
  19. commands: Option<std::sync::mpsc::Sender<PlayerCommand>>,
  20. thread_handle: Option<thread::JoinHandle<()>>,
  21. }
  22. struct PlayerInternal {
  23. session: Session,
  24. config: PlayerConfig,
  25. commands: std::sync::mpsc::Receiver<PlayerCommand>,
  26. state: PlayerState,
  27. sink: Box<Sink>,
  28. sink_running: bool,
  29. audio_filter: Option<Box<AudioFilter + Send>>,
  30. }
  31. enum PlayerCommand {
  32. Load(SpotifyId, bool, u32, oneshot::Sender<()>),
  33. Play,
  34. Pause,
  35. Stop,
  36. Seek(u32),
  37. }
  38. impl Player {
  39. pub fn new<F>(config: PlayerConfig, session: Session,
  40. audio_filter: Option<Box<AudioFilter + Send>>,
  41. sink_builder: F) -> Player
  42. where F: FnOnce() -> Box<Sink> + Send + 'static
  43. {
  44. let (cmd_tx, cmd_rx) = std::sync::mpsc::channel();
  45. let handle = thread::spawn(move || {
  46. debug!("new Player[{}]", session.session_id());
  47. let internal = PlayerInternal {
  48. session: session,
  49. config: config,
  50. commands: cmd_rx,
  51. state: PlayerState::Stopped,
  52. sink: sink_builder(),
  53. sink_running: false,
  54. audio_filter: audio_filter,
  55. };
  56. internal.run();
  57. });
  58. Player {
  59. commands: Some(cmd_tx),
  60. thread_handle: Some(handle),
  61. }
  62. }
  63. fn command(&self, cmd: PlayerCommand) {
  64. self.commands.as_ref().unwrap().send(cmd).unwrap();
  65. }
  66. pub fn load(&self, track: SpotifyId, start_playing: bool, position_ms: u32)
  67. -> oneshot::Receiver<()>
  68. {
  69. let (tx, rx) = oneshot::channel();
  70. self.command(PlayerCommand::Load(track, start_playing, position_ms, tx));
  71. rx
  72. }
  73. pub fn play(&self) {
  74. self.command(PlayerCommand::Play)
  75. }
  76. pub fn pause(&self) {
  77. self.command(PlayerCommand::Pause)
  78. }
  79. pub fn stop(&self) {
  80. self.command(PlayerCommand::Stop)
  81. }
  82. pub fn seek(&self, position_ms: u32) {
  83. self.command(PlayerCommand::Seek(position_ms));
  84. }
  85. }
  86. impl Drop for Player {
  87. fn drop(&mut self) {
  88. debug!("Shutting down player thread ...");
  89. self.commands = None;
  90. if let Some(handle) = self.thread_handle.take() {
  91. match handle.join() {
  92. Ok(_) => (),
  93. Err(_) => error!("Player thread panicked!")
  94. }
  95. }
  96. }
  97. }
  98. type Decoder = VorbisDecoder<Subfile<AudioDecrypt<AudioFile>>>;
  99. enum PlayerState {
  100. Stopped,
  101. Paused {
  102. track_id: SpotifyId,
  103. decoder: Decoder,
  104. end_of_track: oneshot::Sender<()>,
  105. },
  106. Playing {
  107. track_id: SpotifyId,
  108. decoder: Decoder,
  109. end_of_track: oneshot::Sender<()>,
  110. },
  111. EndOfTrack { track_id: SpotifyId },
  112. Invalid,
  113. }
  114. impl PlayerState {
  115. fn is_playing(&self) -> bool {
  116. use self::PlayerState::*;
  117. match *self {
  118. Stopped | EndOfTrack { .. } | Paused { .. } => false,
  119. Playing { .. } => true,
  120. Invalid => panic!("invalid state"),
  121. }
  122. }
  123. fn decoder(&mut self) -> Option<&mut Decoder> {
  124. use self::PlayerState::*;
  125. match *self {
  126. Stopped | EndOfTrack { .. } => None,
  127. Paused { ref mut decoder, .. } |
  128. Playing { ref mut decoder, .. } => Some(decoder),
  129. Invalid => panic!("invalid state"),
  130. }
  131. }
  132. fn signal_end_of_track(self) {
  133. use self::PlayerState::*;
  134. match self {
  135. Paused { end_of_track, .. } |
  136. Playing { end_of_track, .. } => {
  137. let _ = end_of_track.send(());
  138. }
  139. EndOfTrack { .. } => warn!("signal_end_of_track from end of track state"),
  140. Stopped => warn!("signal_end_of_track from stopped state"),
  141. Invalid => panic!("invalid state"),
  142. }
  143. }
  144. fn paused_to_playing(&mut self) {
  145. use self::PlayerState::*;
  146. match ::std::mem::replace(self, Invalid) {
  147. Paused { decoder, end_of_track, track_id } => {
  148. *self = Playing {
  149. decoder: decoder,
  150. end_of_track: end_of_track,
  151. track_id: track_id,
  152. };
  153. }
  154. _ => panic!("invalid state"),
  155. }
  156. }
  157. fn playing_to_paused(&mut self) {
  158. use self::PlayerState::*;
  159. match ::std::mem::replace(self, Invalid) {
  160. Playing { decoder, end_of_track, track_id } => {
  161. *self = Paused {
  162. decoder: decoder,
  163. end_of_track: end_of_track,
  164. track_id: track_id,
  165. };
  166. }
  167. _ => panic!("invalid state"),
  168. }
  169. }
  170. }
  171. impl PlayerInternal {
  172. fn run(mut self) {
  173. loop {
  174. let cmd = if self.state.is_playing() {
  175. if self.sink_running
  176. {
  177. match self.commands.try_recv() {
  178. Ok(cmd) => Some(cmd),
  179. Err(TryRecvError::Empty) => None,
  180. Err(TryRecvError::Disconnected) => return,
  181. }
  182. }
  183. else
  184. {
  185. match self.commands.recv_timeout(Duration::from_secs(5)) {
  186. Ok(cmd) => Some(cmd),
  187. Err(RecvTimeoutError::Timeout) => None,
  188. Err(RecvTimeoutError::Disconnected) => return,
  189. }
  190. }
  191. } else {
  192. match self.commands.recv() {
  193. Ok(cmd) => Some(cmd),
  194. Err(RecvError) => return,
  195. }
  196. };
  197. if let Some(cmd) = cmd {
  198. self.handle_command(cmd);
  199. }
  200. if self.state.is_playing() && ! self.sink_running {
  201. self.start_sink();
  202. }
  203. if self.sink_running {
  204. let packet = if let PlayerState::Playing { ref mut decoder, .. } = self.state {
  205. Some(decoder.next_packet().expect("Vorbis error"))
  206. } else {
  207. None
  208. };
  209. if let Some(packet) = packet {
  210. self.handle_packet(packet);
  211. }
  212. }
  213. }
  214. }
  215. fn start_sink(&mut self) {
  216. match self.sink.start() {
  217. Ok(()) => self.sink_running = true,
  218. Err(err) => error!("Could not start audio: {}", err),
  219. }
  220. }
  221. fn stop_sink_if_running(&mut self) {
  222. if self.sink_running {
  223. self.stop_sink();
  224. }
  225. }
  226. fn stop_sink(&mut self) {
  227. self.sink.stop().unwrap();
  228. self.sink_running = false;
  229. }
  230. fn handle_packet(&mut self, packet: Option<VorbisPacket>) {
  231. match packet {
  232. Some(mut packet) => {
  233. if let Some(ref editor) = self.audio_filter {
  234. editor.modify_stream(&mut packet.data_mut())
  235. };
  236. if let Err(err) = self.sink.write(&packet.data()) {
  237. error!("Could not write audio: {}", err);
  238. self.stop_sink();
  239. }
  240. }
  241. None => {
  242. self.stop_sink();
  243. let new_state = match self.state {
  244. PlayerState::Playing { track_id, .. }
  245. | PlayerState::Paused { track_id, .. } =>
  246. PlayerState::EndOfTrack { track_id },
  247. _ => PlayerState::Stopped,
  248. };
  249. let old_state = mem::replace(&mut self.state, new_state);
  250. old_state.signal_end_of_track();
  251. }
  252. }
  253. }
  254. fn handle_command(&mut self, cmd: PlayerCommand) {
  255. debug!("command={:?}", cmd);
  256. match cmd {
  257. PlayerCommand::Load(track_id, play, position, end_of_track) => {
  258. if self.state.is_playing() {
  259. self.stop_sink_if_running();
  260. }
  261. match self.load_track(track_id, position as i64) {
  262. Some(decoder) => {
  263. if play {
  264. match self.state {
  265. PlayerState::Playing { track_id: old_track_id, ..}
  266. | PlayerState::EndOfTrack { track_id: old_track_id, .. } =>
  267. self.send_event(PlayerEvent::Changed {
  268. old_track_id: old_track_id,
  269. new_track_id: track_id
  270. }),
  271. _ => self.send_event(PlayerEvent::Started { track_id }),
  272. }
  273. self.start_sink();
  274. self.state = PlayerState::Playing {
  275. track_id: track_id,
  276. decoder: decoder,
  277. end_of_track: end_of_track,
  278. };
  279. } else {
  280. self.state = PlayerState::Paused {
  281. track_id: track_id,
  282. decoder: decoder,
  283. end_of_track: end_of_track,
  284. };
  285. self.send_event(PlayerEvent::Stopped { track_id });
  286. }
  287. }
  288. None => {
  289. let _ = end_of_track.send(());
  290. }
  291. }
  292. }
  293. PlayerCommand::Seek(position) => {
  294. if let Some(decoder) = self.state.decoder() {
  295. match decoder.seek(position as i64) {
  296. Ok(_) => (),
  297. Err(err) => error!("Vorbis error: {:?}", err),
  298. }
  299. } else {
  300. warn!("Player::seek called from invalid state");
  301. }
  302. }
  303. PlayerCommand::Play => {
  304. if let PlayerState::Paused { track_id, .. } = self.state {
  305. self.state.paused_to_playing();
  306. self.send_event(PlayerEvent::Started { track_id });
  307. self.start_sink();
  308. } else {
  309. warn!("Player::play called from invalid state");
  310. }
  311. }
  312. PlayerCommand::Pause => {
  313. if let PlayerState::Playing { track_id, .. } = self.state {
  314. self.state.playing_to_paused();
  315. self.stop_sink_if_running();
  316. self.send_event(PlayerEvent::Stopped { track_id });
  317. } else {
  318. warn!("Player::pause called from invalid state");
  319. }
  320. }
  321. PlayerCommand::Stop => {
  322. match self.state {
  323. PlayerState::Playing { track_id, .. }
  324. | PlayerState::Paused { track_id, .. }
  325. | PlayerState::EndOfTrack { track_id } => {
  326. self.stop_sink_if_running();
  327. self.send_event(PlayerEvent::Stopped { track_id });
  328. self.state = PlayerState::Stopped;
  329. },
  330. PlayerState::Stopped => {
  331. warn!("Player::stop called from invalid state");
  332. }
  333. PlayerState::Invalid => panic!("invalid state"),
  334. }
  335. }
  336. }
  337. }
  338. fn send_event(&mut self, event: PlayerEvent) {
  339. match self.config.event_sender {
  340. Some(ref s) =>
  341. match s.send(event.clone()) {
  342. Ok(_) => info!("Sent event {:?} to event listener.", event),
  343. Err(err) => error!("Failed to send event {:?} to listener: {:?}", event, err)
  344. }
  345. None => ()
  346. }
  347. }
  348. fn find_available_alternative<'a>(&self, track: &'a Track) -> Option<Cow<'a, Track>> {
  349. if track.available {
  350. Some(Cow::Borrowed(track))
  351. } else {
  352. let alternatives = track.alternatives
  353. .iter()
  354. .map(|alt_id| {
  355. Track::get(&self.session, *alt_id)
  356. });
  357. let alternatives = future::join_all(alternatives).wait().unwrap();
  358. alternatives.into_iter().find(|alt| alt.available).map(Cow::Owned)
  359. }
  360. }
  361. fn load_track(&self, track_id: SpotifyId, position: i64) -> Option<Decoder> {
  362. let track = Track::get(&self.session, track_id).wait().unwrap();
  363. info!("Loading track \"{}\"", track.name);
  364. let track = match self.find_available_alternative(&track) {
  365. Some(track) => track,
  366. None => {
  367. warn!("Track \"{}\" is not available", track.name);
  368. return None;
  369. }
  370. };
  371. let format = match self.config.bitrate {
  372. Bitrate::Bitrate96 => FileFormat::OGG_VORBIS_96,
  373. Bitrate::Bitrate160 => FileFormat::OGG_VORBIS_160,
  374. Bitrate::Bitrate320 => FileFormat::OGG_VORBIS_320,
  375. };
  376. let file_id = match track.files.get(&format) {
  377. Some(&file_id) => file_id,
  378. None => {
  379. warn!("Track \"{}\" is not available in format {:?}", track.name, format);
  380. return None;
  381. }
  382. };
  383. let key = self.session.audio_key().request(track.id, file_id).wait().unwrap();
  384. let encrypted_file = AudioFile::open(&self.session, file_id).wait().unwrap();
  385. let audio_file = Subfile::new(AudioDecrypt::new(key, encrypted_file), 0xa7);
  386. let mut decoder = VorbisDecoder::new(audio_file).unwrap();
  387. match decoder.seek(position) {
  388. Ok(_) => (),
  389. Err(err) => error!("Vorbis error: {:?}", err),
  390. }
  391. info!("Track \"{}\" loaded", track.name);
  392. Some(decoder)
  393. }
  394. }
  395. impl Drop for PlayerInternal {
  396. fn drop(&mut self) {
  397. debug!("drop Player[{}]", self.session.session_id());
  398. }
  399. }
  400. impl ::std::fmt::Debug for PlayerCommand {
  401. fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
  402. match *self {
  403. PlayerCommand::Load(track, play, position, _) => {
  404. f.debug_tuple("Load")
  405. .field(&track)
  406. .field(&play)
  407. .field(&position)
  408. .finish()
  409. }
  410. PlayerCommand::Play => {
  411. f.debug_tuple("Play").finish()
  412. }
  413. PlayerCommand::Pause => {
  414. f.debug_tuple("Pause").finish()
  415. }
  416. PlayerCommand::Stop => {
  417. f.debug_tuple("Stop").finish()
  418. }
  419. PlayerCommand::Seek(position) => {
  420. f.debug_tuple("Seek")
  421. .field(&position)
  422. .finish()
  423. }
  424. }
  425. }
  426. }
  /// View over a seekable stream that hides its first `offset` bytes.
  ///
  /// Positions seen through the view are relative to `offset`: position 0 of
  /// the `Subfile` is position `offset` of the wrapped stream.
  struct Subfile<T: Read + Seek> {
      stream: T,
      offset: u64,
  }

  impl<T: Read + Seek> Subfile<T> {
      /// Wraps `stream` and positions it at `offset`, the start of the view.
      ///
      /// Panics if the initial seek on `stream` fails.
      pub fn new(mut stream: T, offset: u64) -> Subfile<T> {
          stream.seek(SeekFrom::Start(offset)).unwrap();
          Subfile {
              stream: stream,
              offset: offset,
          }
      }
  }

  impl<T: Read + Seek> Read for Subfile<T> {
      /// Reads from the wrapped stream at its current position.
      fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
          self.stream.read(buf)
      }
  }

  impl<T: Read + Seek> Seek for Subfile<T> {
      /// Seeks within the view and returns the new position relative to its
      /// start.
      ///
      /// `SeekFrom::Start` positions are shifted by the hidden prefix; a
      /// position that cannot be represented as `u64` after shifting yields an
      /// `InvalidInput` error. Relative seeks are passed to the wrapped stream
      /// unchanged. If one lands inside the hidden prefix, the stream is moved
      /// back to the start of the view so the reported position (0) matches
      /// where the next read happens.
      fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
          let target = match pos {
              SeekFrom::Start(offset) => {
                  match offset.checked_add(self.offset) {
                      Some(absolute) => SeekFrom::Start(absolute),
                      None => {
                          return Err(::std::io::Error::new(
                              ::std::io::ErrorKind::InvalidInput,
                              "seek position overflows u64",
                          ));
                      }
                  }
              }
              relative => relative,
          };

          let newpos = self.stream.seek(target)?;

          if newpos < self.offset {
              // Never leave the stream inside the hidden prefix.
              self.stream.seek(SeekFrom::Start(self.offset))?;
              return Ok(0);
          }

          Ok(newpos - self.offset)
      }
  }
  458. }