Ver código fonte

Merge pull request #6 from brain0/work

Make librespot more robust against audio failures
Colm 7 anos atrás
pai
commit
5302bd1072
7 arquivos alterados com 153 adições e 49 exclusões
  1. 1 0
      Cargo.lock
  2. 2 1
      Cargo.toml
  3. 1 0
      core/src/lib.in.rs
  4. 66 26
      src/audio_backend/pulseaudio.rs
  5. 1 0
      src/lib.in.rs
  6. 3 0
      src/lib.rs
  7. 79 22
      src/player.rs

+ 1 - 0
Cargo.lock

@@ -264,6 +264,7 @@ dependencies = [
  "futures 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)",
  "getopts 0.2.14 (registry+https://github.com/rust-lang/crates.io-index)",
  "hyper 0.11.2 (registry+https://github.com/rust-lang/crates.io-index)",
+ "libc 0.2.29 (registry+https://github.com/rust-lang/crates.io-index)",
  "libpulse-sys 0.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "librespot-audio 0.1.0",
  "librespot-core 0.1.0",

+ 2 - 1
Cargo.toml

@@ -52,6 +52,7 @@ url = "1.3"
 alsa            = { git = "https://github.com/plietar/rust-alsa", optional = true }
 portaudio-rs    = { version = "0.3.0", optional = true }
 libpulse-sys    = { version = "0.0.0", optional = true }
+libc            = { version = "0.2", optional = true }
 
 [build-dependencies]
 rand            = "0.3.13"
@@ -61,7 +62,7 @@ protobuf_macros = { git = "https://github.com/plietar/rust-protobuf-macros", fea
 [features]
 alsa-backend = ["alsa"]
 portaudio-backend = ["portaudio-rs"]
-pulseaudio-backend = ["libpulse-sys"]
+pulseaudio-backend = ["libpulse-sys", "libc"]
 
 with-tremor = ["librespot-audio/with-tremor"]
 with-lewton = ["librespot-audio/with-lewton"]

+ 1 - 0
core/src/lib.in.rs

@@ -1 +1,2 @@
+#[allow(unused_mut)]
 pub mod connection;

+ 66 - 26
src/audio_backend/pulseaudio.rs

@@ -2,8 +2,10 @@ use super::{Open, Sink};
 use std::io;
 use libpulse_sys::*;
 use std::ptr::{null, null_mut};
-use std::mem::{transmute};
 use std::ffi::CString;
+use std::ffi::CStr;
+use std::mem;
+use libc;
 
 pub struct PulseAudioSink {
     s    : *mut pa_simple,
@@ -12,6 +14,39 @@ pub struct PulseAudioSink {
     desc : CString
 }
 
+fn call_pulseaudio<T, F, FailCheck>(f: F, fail_check: FailCheck, kind: io::ErrorKind) -> io::Result<T> where
+    T: Copy,
+    F: Fn(*mut libc::c_int) -> T,
+    FailCheck: Fn(T) -> bool,
+{
+    let mut error: libc::c_int = 0;
+    let ret = f(&mut error);
+    if fail_check(ret) {
+        let err_cstr = unsafe { CStr::from_ptr(pa_strerror(error)) };
+        let errstr =  err_cstr.to_string_lossy().into_owned();
+        Err(io::Error::new(kind, errstr))
+    } else {
+        Ok(ret)
+    }
+}
+
+impl PulseAudioSink {
+    fn free_connection(&mut self) {
+        if self.s != null_mut() {
+            unsafe {
+                pa_simple_free(self.s);
+            }
+            self.s = null_mut();
+        }
+    }
+}
+
+impl Drop for PulseAudioSink {
+    fn drop(&mut self) {
+        self.free_connection();
+    }
+}
+
 impl Open for PulseAudioSink {
    fn open(device: Option<String>) -> PulseAudioSink {
         debug!("Using PulseAudio sink");
@@ -27,7 +62,7 @@ impl Open for PulseAudioSink {
         };
         
         let name = CString::new("librespot").unwrap();
-        let description = CString::new("A spoty client library").unwrap();
+        let description = CString::new("Spotify endpoint").unwrap();
 
         PulseAudioSink {
             s: null_mut(),
@@ -41,38 +76,43 @@ impl Open for PulseAudioSink {
 impl Sink for PulseAudioSink {
     fn start(&mut self) -> io::Result<()> {
         if self.s == null_mut() {
-            self.s = unsafe {
-                pa_simple_new(null(),               // Use the default server.
-                              self.name.as_ptr(),   // Our application's name.
-                              PA_STREAM_PLAYBACK,
-                              null(),               // Use the default device.
-                              self.desc.as_ptr(),   // desc of our stream.
-                              &self.ss,             // Our sample format.
-                              null(),               // Use default channel map
-                              null(),               // Use default buffering attributes.
-                              null_mut(),           // Ignore error code.
-                )
-            };
-            assert!(self.s != null_mut());
+            self.s = call_pulseaudio(
+                |err| unsafe {
+                    pa_simple_new(null(),               // Use the default server.
+                                  self.name.as_ptr(),   // Our application's name.
+                                  PA_STREAM_PLAYBACK,
+                                  null(),               // Use the default device.
+                                  self.desc.as_ptr(),   // desc of our stream.
+                                  &self.ss,             // Our sample format.
+                                  null(),               // Use default channel map
+                                  null(),               // Use default buffering attributes.
+                                  err)
+                },
+                |ptr| ptr == null_mut(),
+                io::ErrorKind::ConnectionRefused)?;
         }
         Ok(())
     }
 
     fn stop(&mut self) -> io::Result<()> {
-        unsafe {
-            pa_simple_free(self.s);
-        }
-        self.s = null_mut();
+        self.free_connection();
         Ok(())
     }
 
     fn write(&mut self, data: &[i16]) -> io::Result<()> {
-        unsafe {
-            let ptr = transmute(data.as_ptr());
-            let bytes = data.len() as usize * 2;
-            pa_simple_write(self.s, ptr, bytes, null_mut());
-        };
-        
-        Ok(())
+        if self.s == null_mut() {
+            Err(io::Error::new(io::ErrorKind::NotConnected, "Not connected to pulseaudio"))
+        }
+        else {
+            let ptr = data.as_ptr() as *const libc::c_void;
+            let len = data.len() as usize * mem::size_of::<i16>();
+            call_pulseaudio(
+                |err| unsafe {
+                    pa_simple_write(self.s, ptr, len, err)
+                },
+                |ret| ret < 0,
+                io::ErrorKind::BrokenPipe)?;
+            Ok(())
+        }
     }
 }

+ 1 - 0
src/lib.in.rs

@@ -1 +1,2 @@
+#[allow(unused_mut)]
 pub mod spirc;

+ 3 - 0
src/lib.rs

@@ -34,6 +34,9 @@ extern crate portaudio_rs;
 #[cfg(feature = "libpulse-sys")]
 extern crate libpulse_sys;
 
+#[cfg(feature = "libc")]
+extern crate libc;
+
 pub mod audio_backend;
 pub mod discovery;
 pub mod keymaster;

+ 79 - 22
src/player.rs

@@ -2,8 +2,9 @@ use futures::sync::oneshot;
 use futures::{future, Future};
 use std::borrow::Cow;
 use std::mem;
-use std::sync::mpsc::{RecvError, TryRecvError};
+use std::sync::mpsc::{RecvError, TryRecvError, RecvTimeoutError};
 use std::thread;
+use std::time::Duration;
 use std;
 
 use core::config::{Bitrate, PlayerConfig};
@@ -16,9 +17,9 @@ use audio::{VorbisDecoder, VorbisPacket};
 use metadata::{FileFormat, Track, Metadata};
 use mixer::AudioFilter;
 
-#[derive(Clone)]
 pub struct Player {
-    commands: std::sync::mpsc::Sender<PlayerCommand>,
+    commands: Option<std::sync::mpsc::Sender<PlayerCommand>>,
+    thread_handle: Option<thread::JoinHandle<()>>,
 }
 
 struct PlayerInternal {
@@ -28,6 +29,7 @@ struct PlayerInternal {
 
     state: PlayerState,
     sink: Box<Sink>,
+    sink_running: bool,
     audio_filter: Option<Box<AudioFilter + Send>>,
 }
 
@@ -47,7 +49,7 @@ impl Player {
     {
         let (cmd_tx, cmd_rx) = std::sync::mpsc::channel();
 
-        thread::spawn(move || {
+        let handle = thread::spawn(move || {
             debug!("new Player[{}]", session.session_id());
 
             let internal = PlayerInternal {
@@ -57,6 +59,7 @@ impl Player {
 
                 state: PlayerState::Stopped,
                 sink: sink_builder(),
+                sink_running: false,
                 audio_filter: audio_filter,
             };
 
@@ -64,12 +67,13 @@ impl Player {
         });
 
         Player {
-            commands: cmd_tx,
+            commands: Some(cmd_tx),
+            thread_handle: Some(handle),
         }
     }
 
     fn command(&self, cmd: PlayerCommand) {
-        self.commands.send(cmd).unwrap();
+        self.commands.as_ref().unwrap().send(cmd).unwrap();
     }
 
     pub fn load(&self, track: SpotifyId, start_playing: bool, position_ms: u32)
@@ -98,6 +102,19 @@ impl Player {
     }
 }
 
+impl Drop for Player {
+    fn drop(&mut self) {
+        debug!("Shutting down player thread ...");
+        self.commands = None;
+        if let Some(handle) = self.thread_handle.take() {
+            match handle.join() {
+                Ok(_) => (),
+                Err(_) => error!("Player thread panicked!")
+            }
+        }
+    }
+}
+
 type Decoder = VorbisDecoder<Subfile<AudioDecrypt<AudioFile>>>;
 enum PlayerState {
     Stopped,
@@ -177,10 +194,21 @@ impl PlayerInternal {
     fn run(mut self) {
         loop {
             let cmd = if self.state.is_playing() {
-                match self.commands.try_recv() {
-                    Ok(cmd) => Some(cmd),
-                    Err(TryRecvError::Empty) => None,
-                    Err(TryRecvError::Disconnected) => return,
+                if self.sink_running
+                {
+                    match self.commands.try_recv() {
+                        Ok(cmd) => Some(cmd),
+                        Err(TryRecvError::Empty) => None,
+                        Err(TryRecvError::Disconnected) => return,
+                    }
+                }
+                else
+                {
+                    match self.commands.recv_timeout(Duration::from_secs(5)) {
+                        Ok(cmd) => Some(cmd),
+                        Err(RecvTimeoutError::Timeout) => None,
+                        Err(RecvTimeoutError::Disconnected) => return,
+                    }
                 }
             } else {
                 match self.commands.recv() {
@@ -193,16 +221,42 @@ impl PlayerInternal {
                 self.handle_command(cmd);
             }
 
-            let packet = if let PlayerState::Playing { ref mut decoder, .. } = self.state {
-                Some(decoder.next_packet().expect("Vorbis error"))
-            } else { None };
+            if self.state.is_playing() && ! self.sink_running {
+                self.start_sink();
+            }
+
+            if self.sink_running {
+                let packet = if let PlayerState::Playing { ref mut decoder, .. } = self.state {
+                    Some(decoder.next_packet().expect("Vorbis error"))
+                } else {
+                    None
+                };
 
-            if let Some(packet) = packet {
-                self.handle_packet(packet);
+                if let Some(packet) = packet {
+                    self.handle_packet(packet);
+                }
             }
         }
     }
 
+    fn start_sink(&mut self) {
+        match self.sink.start() {
+            Ok(()) => self.sink_running = true,
+            Err(err) => error!("Could not start audio: {}", err),
+        }
+    }
+
+    fn stop_sink_if_running(&mut self) {
+        if self.sink_running {
+            self.stop_sink();
+        }
+    }
+
+    fn stop_sink(&mut self) {
+        self.sink.stop().unwrap();
+        self.sink_running = false;
+    }
+
     fn handle_packet(&mut self, packet: Option<VorbisPacket>) {
         match packet {
             Some(mut packet) => {
@@ -210,11 +264,14 @@ impl PlayerInternal {
                     editor.modify_stream(&mut packet.data_mut())
                 };
 
-                self.sink.write(&packet.data()).unwrap();
+                if let Err(err) = self.sink.write(&packet.data()) {
+                    error!("Could not write audio: {}", err);
+                    self.stop_sink();
+                }
             }
 
             None => {
-                self.sink.stop().unwrap();
+                self.stop_sink();
                 self.run_onstop();
 
                 let old_state = mem::replace(&mut self.state, PlayerState::Stopped);
@@ -228,7 +285,7 @@ impl PlayerInternal {
         match cmd {
             PlayerCommand::Load(track_id, play, position, end_of_track) => {
                 if self.state.is_playing() {
-                    self.sink.stop().unwrap();
+                    self.stop_sink_if_running();
                 }
 
                 match self.load_track(track_id, position as i64) {
@@ -237,7 +294,7 @@ impl PlayerInternal {
                             if !self.state.is_playing() {
                                 self.run_onstart();
                             }
-                            self.sink.start().unwrap();
+                            self.start_sink();
 
                             self.state = PlayerState::Playing {
                                 decoder: decoder,
@@ -280,7 +337,7 @@ impl PlayerInternal {
                     self.state.paused_to_playing();
 
                     self.run_onstart();
-                    self.sink.start().unwrap();
+                    self.start_sink();
                 } else {
                     warn!("Player::play called from invalid state");
                 }
@@ -290,7 +347,7 @@ impl PlayerInternal {
                 if let PlayerState::Playing { .. } = self.state {
                     self.state.playing_to_paused();
 
-                    self.sink.stop().unwrap();
+                    self.stop_sink_if_running();
                     self.run_onstop();
                 } else {
                     warn!("Player::pause called from invalid state");
@@ -300,7 +357,7 @@ impl PlayerInternal {
             PlayerCommand::Stop => {
                 match self.state {
                     PlayerState::Playing { .. } => {
-                        self.sink.stop().unwrap();
+                        self.stop_sink_if_running();
                         self.run_onstop();
                         self.state = PlayerState::Stopped;
                     }