player.rs 15 KB


  1. use futures::sync::oneshot;
  2. use futures::{future, Future};
  3. use std;
  4. use std::borrow::Cow;
  5. use std::io::{Read, Seek, SeekFrom, Result};
  6. use std::mem;
  7. use std::process::Command;
  8. use std::sync::mpsc::{RecvError, TryRecvError, RecvTimeoutError};
  9. use std::thread;
  10. use std::time::Duration;
  11. use core::config::{Bitrate, PlayerConfig};
  12. use core::session::Session;
  13. use core::util::SpotifyId;
  14. use audio_backend::Sink;
  15. use audio::{AudioFile, AudioDecrypt};
  16. use audio::{VorbisDecoder, VorbisPacket};
  17. use metadata::{FileFormat, Track, Metadata};
  18. use mixer::AudioFilter;
  19. pub struct Player {
  20. commands: Option<std::sync::mpsc::Sender<PlayerCommand>>,
  21. thread_handle: Option<thread::JoinHandle<()>>,
  22. }
  23. struct PlayerInternal {
  24. session: Session,
  25. config: PlayerConfig,
  26. commands: std::sync::mpsc::Receiver<PlayerCommand>,
  27. state: PlayerState,
  28. sink: Box<Sink>,
  29. sink_running: bool,
  30. audio_filter: Option<Box<AudioFilter + Send>>,
  31. }
  32. enum PlayerCommand {
  33. Load(SpotifyId, bool, u32, oneshot::Sender<()>),
  34. Play,
  35. Pause,
  36. Stop,
  37. Seek(u32),
  38. }
  39. impl Player {
  40. pub fn new<F>(config: PlayerConfig, session: Session,
  41. audio_filter: Option<Box<AudioFilter + Send>>,
  42. sink_builder: F) -> Player
  43. where F: FnOnce() -> Box<Sink> + Send + 'static
  44. {
  45. let (cmd_tx, cmd_rx) = std::sync::mpsc::channel();
  46. let handle = thread::spawn(move || {
  47. debug!("new Player[{}]", session.session_id());
  48. let internal = PlayerInternal {
  49. session: session,
  50. config: config,
  51. commands: cmd_rx,
  52. state: PlayerState::Stopped,
  53. sink: sink_builder(),
  54. sink_running: false,
  55. audio_filter: audio_filter,
  56. };
  57. internal.run();
  58. });
  59. Player {
  60. commands: Some(cmd_tx),
  61. thread_handle: Some(handle),
  62. }
  63. }
  64. fn command(&self, cmd: PlayerCommand) {
  65. self.commands.as_ref().unwrap().send(cmd).unwrap();
  66. }
  67. pub fn load(&self, track: SpotifyId, start_playing: bool, position_ms: u32)
  68. -> oneshot::Receiver<()>
  69. {
  70. let (tx, rx) = oneshot::channel();
  71. self.command(PlayerCommand::Load(track, start_playing, position_ms, tx));
  72. rx
  73. }
  74. pub fn play(&self) {
  75. self.command(PlayerCommand::Play)
  76. }
  77. pub fn pause(&self) {
  78. self.command(PlayerCommand::Pause)
  79. }
  80. pub fn stop(&self) {
  81. self.command(PlayerCommand::Stop)
  82. }
  83. pub fn seek(&self, position_ms: u32) {
  84. self.command(PlayerCommand::Seek(position_ms));
  85. }
  86. }
  87. impl Drop for Player {
  88. fn drop(&mut self) {
  89. debug!("Shutting down player thread ...");
  90. self.commands = None;
  91. if let Some(handle) = self.thread_handle.take() {
  92. match handle.join() {
  93. Ok(_) => (),
  94. Err(_) => error!("Player thread panicked!")
  95. }
  96. }
  97. }
  98. }
  99. type Decoder = VorbisDecoder<Subfile<AudioDecrypt<AudioFile>>>;
  100. enum PlayerState {
  101. Stopped,
  102. Paused {
  103. decoder: Decoder,
  104. end_of_track: oneshot::Sender<()>,
  105. },
  106. Playing {
  107. decoder: Decoder,
  108. end_of_track: oneshot::Sender<()>,
  109. },
  110. Invalid,
  111. }
  112. impl PlayerState {
  113. fn is_playing(&self) -> bool {
  114. use self::PlayerState::*;
  115. match *self {
  116. Stopped | Paused { .. } => false,
  117. Playing { .. } => true,
  118. Invalid => panic!("invalid state"),
  119. }
  120. }
  121. fn decoder(&mut self) -> Option<&mut Decoder> {
  122. use self::PlayerState::*;
  123. match *self {
  124. Stopped => None,
  125. Paused { ref mut decoder, .. } |
  126. Playing { ref mut decoder, .. } => Some(decoder),
  127. Invalid => panic!("invalid state"),
  128. }
  129. }
  130. fn signal_end_of_track(self) {
  131. use self::PlayerState::*;
  132. match self {
  133. Paused { end_of_track, .. } |
  134. Playing { end_of_track, .. } => {
  135. let _ = end_of_track.send(());
  136. }
  137. Stopped => warn!("signal_end_of_track from stopped state"),
  138. Invalid => panic!("invalid state"),
  139. }
  140. }
  141. fn paused_to_playing(&mut self) {
  142. use self::PlayerState::*;
  143. match ::std::mem::replace(self, Invalid) {
  144. Paused { decoder, end_of_track } => {
  145. *self = Playing {
  146. decoder: decoder,
  147. end_of_track: end_of_track,
  148. };
  149. }
  150. _ => panic!("invalid state"),
  151. }
  152. }
  153. fn playing_to_paused(&mut self) {
  154. use self::PlayerState::*;
  155. match ::std::mem::replace(self, Invalid) {
  156. Playing { decoder, end_of_track } => {
  157. *self = Paused {
  158. decoder: decoder,
  159. end_of_track: end_of_track,
  160. };
  161. }
  162. _ => panic!("invalid state"),
  163. }
  164. }
  165. }
  166. impl PlayerInternal {
  167. fn run(mut self) {
  168. loop {
  169. let cmd = if self.state.is_playing() {
  170. if self.sink_running
  171. {
  172. match self.commands.try_recv() {
  173. Ok(cmd) => Some(cmd),
  174. Err(TryRecvError::Empty) => None,
  175. Err(TryRecvError::Disconnected) => return,
  176. }
  177. }
  178. else
  179. {
  180. match self.commands.recv_timeout(Duration::from_secs(5)) {
  181. Ok(cmd) => Some(cmd),
  182. Err(RecvTimeoutError::Timeout) => None,
  183. Err(RecvTimeoutError::Disconnected) => return,
  184. }
  185. }
  186. } else {
  187. match self.commands.recv() {
  188. Ok(cmd) => Some(cmd),
  189. Err(RecvError) => return,
  190. }
  191. };
  192. if let Some(cmd) = cmd {
  193. self.handle_command(cmd);
  194. }
  195. if self.state.is_playing() && ! self.sink_running {
  196. self.start_sink();
  197. }
  198. if self.sink_running {
  199. let packet = if let PlayerState::Playing { ref mut decoder, .. } = self.state {
  200. Some(decoder.next_packet().expect("Vorbis error"))
  201. } else {
  202. None
  203. };
  204. if let Some(packet) = packet {
  205. self.handle_packet(packet);
  206. }
  207. }
  208. }
  209. }
  210. fn start_sink(&mut self) {
  211. match self.sink.start() {
  212. Ok(()) => self.sink_running = true,
  213. Err(err) => error!("Could not start audio: {}", err),
  214. }
  215. }
  216. fn stop_sink_if_running(&mut self) {
  217. if self.sink_running {
  218. self.stop_sink();
  219. }
  220. }
  221. fn stop_sink(&mut self) {
  222. self.sink.stop().unwrap();
  223. self.sink_running = false;
  224. }
  225. fn handle_packet(&mut self, packet: Option<VorbisPacket>) {
  226. match packet {
  227. Some(mut packet) => {
  228. if let Some(ref editor) = self.audio_filter {
  229. editor.modify_stream(&mut packet.data_mut())
  230. };
  231. if let Err(err) = self.sink.write(&packet.data()) {
  232. error!("Could not write audio: {}", err);
  233. self.stop_sink();
  234. }
  235. }
  236. None => {
  237. self.stop_sink();
  238. self.run_onstop();
  239. let old_state = mem::replace(&mut self.state, PlayerState::Stopped);
  240. old_state.signal_end_of_track();
  241. }
  242. }
  243. }
  244. fn handle_command(&mut self, cmd: PlayerCommand) {
  245. debug!("command={:?}", cmd);
  246. match cmd {
  247. PlayerCommand::Load(track_id, play, position, end_of_track) => {
  248. if self.state.is_playing() {
  249. self.stop_sink_if_running();
  250. }
  251. match self.load_track(track_id, position as i64) {
  252. Some(decoder) => {
  253. if play {
  254. if !self.state.is_playing() {
  255. self.run_onstart();
  256. }
  257. self.start_sink();
  258. self.state = PlayerState::Playing {
  259. decoder: decoder,
  260. end_of_track: end_of_track,
  261. };
  262. } else {
  263. if self.state.is_playing() {
  264. self.run_onstop();
  265. }
  266. self.state = PlayerState::Paused {
  267. decoder: decoder,
  268. end_of_track: end_of_track,
  269. };
  270. }
  271. }
  272. None => {
  273. let _ = end_of_track.send(());
  274. if self.state.is_playing() {
  275. self.run_onstop();
  276. }
  277. }
  278. }
  279. }
  280. PlayerCommand::Seek(position) => {
  281. if let Some(decoder) = self.state.decoder() {
  282. match decoder.seek(position as i64) {
  283. Ok(_) => (),
  284. Err(err) => error!("Vorbis error: {:?}", err),
  285. }
  286. } else {
  287. warn!("Player::seek called from invalid state");
  288. }
  289. }
  290. PlayerCommand::Play => {
  291. if let PlayerState::Paused { .. } = self.state {
  292. self.state.paused_to_playing();
  293. self.run_onstart();
  294. self.start_sink();
  295. } else {
  296. warn!("Player::play called from invalid state");
  297. }
  298. }
  299. PlayerCommand::Pause => {
  300. if let PlayerState::Playing { .. } = self.state {
  301. self.state.playing_to_paused();
  302. self.stop_sink_if_running();
  303. self.run_onstop();
  304. } else {
  305. warn!("Player::pause called from invalid state");
  306. }
  307. }
  308. PlayerCommand::Stop => {
  309. match self.state {
  310. PlayerState::Playing { .. } => {
  311. self.stop_sink_if_running();
  312. self.run_onstop();
  313. self.state = PlayerState::Stopped;
  314. }
  315. PlayerState::Paused { .. } => {
  316. self.state = PlayerState::Stopped;
  317. },
  318. PlayerState::Stopped => {
  319. warn!("Player::stop called from invalid state");
  320. }
  321. PlayerState::Invalid => panic!("invalid state"),
  322. }
  323. }
  324. }
  325. }
  326. fn run_onstart(&self) {
  327. if let Some(ref program) = self.config.onstart {
  328. run_program(program)
  329. }
  330. }
  331. fn run_onstop(&self) {
  332. if let Some(ref program) = self.config.onstop {
  333. run_program(program)
  334. }
  335. }
  336. fn find_available_alternative<'a>(&self, track: &'a Track) -> Option<Cow<'a, Track>> {
  337. if track.available {
  338. Some(Cow::Borrowed(track))
  339. } else {
  340. let alternatives = track.alternatives
  341. .iter()
  342. .map(|alt_id| {
  343. Track::get(&self.session, *alt_id)
  344. });
  345. let alternatives = future::join_all(alternatives).wait().unwrap();
  346. alternatives.into_iter().find(|alt| alt.available).map(Cow::Owned)
  347. }
  348. }
  349. fn load_track(&self, track_id: SpotifyId, position: i64) -> Option<Decoder> {
  350. let track = Track::get(&self.session, track_id).wait().unwrap();
  351. info!("Loading track \"{}\"", track.name);
  352. let track = match self.find_available_alternative(&track) {
  353. Some(track) => track,
  354. None => {
  355. warn!("Track \"{}\" is not available", track.name);
  356. return None;
  357. }
  358. };
  359. let format = match self.config.bitrate {
  360. Bitrate::Bitrate96 => FileFormat::OGG_VORBIS_96,
  361. Bitrate::Bitrate160 => FileFormat::OGG_VORBIS_160,
  362. Bitrate::Bitrate320 => FileFormat::OGG_VORBIS_320,
  363. };
  364. let file_id = match track.files.get(&format) {
  365. Some(&file_id) => file_id,
  366. None => {
  367. warn!("Track \"{}\" is not available in format {:?}", track.name, format);
  368. return None;
  369. }
  370. };
  371. let key = self.session.audio_key().request(track.id, file_id).wait().unwrap();
  372. let encrypted_file = AudioFile::open(&self.session, file_id).wait().unwrap();
  373. let audio_file = Subfile::new(AudioDecrypt::new(key, encrypted_file), 0xa7);
  374. let mut decoder = VorbisDecoder::new(audio_file).unwrap();
  375. match decoder.seek(position) {
  376. Ok(_) => (),
  377. Err(err) => error!("Vorbis error: {:?}", err),
  378. }
  379. info!("Track \"{}\" loaded", track.name);
  380. Some(decoder)
  381. }
  382. }
  383. impl Drop for PlayerInternal {
  384. fn drop(&mut self) {
  385. debug!("drop Player[{}]", self.session.session_id());
  386. }
  387. }
  388. impl ::std::fmt::Debug for PlayerCommand {
  389. fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
  390. match *self {
  391. PlayerCommand::Load(track, play, position, _) => {
  392. f.debug_tuple("Load")
  393. .field(&track)
  394. .field(&play)
  395. .field(&position)
  396. .finish()
  397. }
  398. PlayerCommand::Play => {
  399. f.debug_tuple("Play").finish()
  400. }
  401. PlayerCommand::Pause => {
  402. f.debug_tuple("Pause").finish()
  403. }
  404. PlayerCommand::Stop => {
  405. f.debug_tuple("Stop").finish()
  406. }
  407. PlayerCommand::Seek(position) => {
  408. f.debug_tuple("Seek")
  409. .field(&position)
  410. .finish()
  411. }
  412. }
  413. }
  414. }
  415. struct Subfile<T: Read + Seek> {
  416. stream: T,
  417. offset: u64,
  418. }
  419. impl<T: Read + Seek> Subfile<T> {
  420. pub fn new(mut stream: T, offset: u64) -> Subfile<T> {
  421. stream.seek(SeekFrom::Start(offset)).unwrap();
  422. Subfile {
  423. stream: stream,
  424. offset: offset,
  425. }
  426. }
  427. }
  428. impl<T: Read + Seek> Read for Subfile<T> {
  429. fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
  430. self.stream.read(buf)
  431. }
  432. }
  433. impl<T: Read + Seek> Seek for Subfile<T> {
  434. fn seek(&mut self, mut pos: SeekFrom) -> Result<u64> {
  435. pos = match pos {
  436. SeekFrom::Start(offset) => SeekFrom::Start(offset + self.offset),
  437. x => x,
  438. };
  439. let newpos = try!(self.stream.seek(pos));
  440. if newpos > self.offset {
  441. Ok(newpos - self.offset)
  442. } else {
  443. Ok(0)
  444. }
  445. }
  446. }
  447. fn run_program(program: &str) {
  448. info!("Running {}", program);
  449. let mut v: Vec<&str> = program.split_whitespace().collect();
  450. let status = Command::new(&v.remove(0))
  451. .args(&v)
  452. .status()
  453. .expect("program failed to start");
  454. info!("Exit status: {}", status);
  455. }