Browse Source

Merge branch 'feature/mixer' of https://github.com/romerod/librespot into romerod-feature/mixer

Paul Lietar 7 years ago
parent
commit
387c2598e0
6 changed files with 161 additions and 95 deletions
  1. 1 0
      src/lib.rs
  2. 13 6
      src/main.rs
  3. 23 0
      src/mixer/mod.rs
  4. 48 0
      src/mixer/softmixer.rs
  5. 9 47
      src/player.rs
  6. 67 42
      src/spirc.rs

+ 1 - 0
src/lib.rs

@@ -60,6 +60,7 @@ pub mod player;
 pub mod stream;
 pub mod stream;
 pub mod util;
 pub mod util;
 pub mod version;
 pub mod version;
+pub mod mixer;
 
 
 #[cfg(feature = "with-syntex")] include!(concat!(env!("OUT_DIR"), "/lib.rs"));
 #[cfg(feature = "with-syntex")] include!(concat!(env!("OUT_DIR"), "/lib.rs"));
 #[cfg(not(feature = "with-syntex"))] include!("lib.in.rs");
 #[cfg(not(feature = "with-syntex"))] include!("lib.in.rs");

+ 13 - 6
src/main.rs

@@ -18,6 +18,8 @@ use librespot::audio_backend::{self, BACKENDS};
 use librespot::cache::{Cache, DefaultCache, NoCache};
 use librespot::cache::{Cache, DefaultCache, NoCache};
 use librespot::player::Player;
 use librespot::player::Player;
 use librespot::session::{Bitrate, Config, Session};
 use librespot::session::{Bitrate, Config, Session};
+use librespot::mixer::{self, Mixer};
+
 use librespot::version;
 use librespot::version;
 
 
 fn usage(program: &str, opts: &getopts::Options) -> String {
 fn usage(program: &str, opts: &getopts::Options) -> String {
@@ -59,7 +61,7 @@ fn list_backends() {
     }
     }
 }
 }
 
 
-fn setup(args: &[String]) -> (Session, Player) {
+fn setup(args: &[String]) -> (Session, Player, Box<Mixer + Send>) {
     let mut opts = getopts::Options::new();
     let mut opts = getopts::Options::new();
     opts.optopt("c", "cache", "Path to a directory where files will be cached.", "CACHE")
     opts.optopt("c", "cache", "Path to a directory where files will be cached.", "CACHE")
         .reqopt("n", "name", "Device name", "NAME")
         .reqopt("n", "name", "Device name", "NAME")
@@ -70,7 +72,8 @@ fn setup(args: &[String]) -> (Session, Player) {
         .optopt("u", "username", "Username to sign in with", "USERNAME")
         .optopt("u", "username", "Username to sign in with", "USERNAME")
         .optopt("p", "password", "Password", "PASSWORD")
         .optopt("p", "password", "Password", "PASSWORD")
         .optopt("", "backend", "Audio backend to use. Use '?' to list options", "BACKEND")
         .optopt("", "backend", "Audio backend to use. Use '?' to list options", "BACKEND")
-        .optopt("", "device", "Audio device to use. Use '?' to list options", "DEVICE");
+        .optopt("", "device", "Audio device to use. Use '?' to list options", "DEVICE")
+        .optopt("", "mixer", "Mixer to use", "MIXER");
 
 
     let matches = match opts.parse(&args[1..]) {
     let matches = match opts.parse(&args[1..]) {
         Ok(m) => m,
         Ok(m) => m,
@@ -119,20 +122,24 @@ fn setup(args: &[String]) -> (Session, Player) {
     let credentials = get_credentials(&session, matches.opt_str("username"),
     let credentials = get_credentials(&session, matches.opt_str("username"),
     matches.opt_str("password"));
     matches.opt_str("password"));
     session.login(credentials).unwrap();
     session.login(credentials).unwrap();
+ 
+    let mixer_name = matches.opt_str("mixer");
+    let mixer = mixer::find(mixer_name.as_ref()).expect("Invalid mixer");
+    let audio_filter = mixer.get_audio_filter();
 
 
     let device_name = matches.opt_str("device");
     let device_name = matches.opt_str("device");
-    let player = Player::new(session.clone(), move || {
+    let player = Player::new(session.clone(), audio_filter, move || {
         (backend)(device_name.as_ref().map(AsRef::as_ref))
         (backend)(device_name.as_ref().map(AsRef::as_ref))
     });
     });
 
 
-    (session, player)
+    (session, player, mixer)
 }
 }
 
 
 fn main() {
 fn main() {
     let args: Vec<String> = std::env::args().collect();
     let args: Vec<String> = std::env::args().collect();
-    let (session, player) = setup(&args);
+    let (session, player, mixer) = setup(&args);
 
 
-    let spirc = SpircManager::new(session.clone(), player);
+    let spirc = SpircManager::new(session.clone(), player, mixer);
     let spirc_signal = spirc.clone();
     let spirc_signal = spirc.clone();
     thread::spawn(move || spirc.run());
     thread::spawn(move || spirc.run());
 
 

+ 23 - 0
src/mixer/mod.rs

@@ -0,0 +1,23 @@
+use self::softmixer::SoftMixer;
+
+pub mod softmixer;
+
+pub trait Mixer {
+    fn start(&self);
+    fn stop(&self);
+    fn set_volume(&self, volume: u16);
+    fn volume(&self) -> u16;
+    fn get_audio_filter(&self) -> Option<Box<AudioFilter + Send>> {
+        None
+    }
+}
+
+pub trait AudioFilter {
+  fn modify_stream(&self, data: &mut [i16]);
+}
+
+pub fn find<T: AsRef<str>>(name: Option<T>) -> Option<Box<Mixer + Send>> {
+  match name {
+    _ => Some(Box::new(SoftMixer::new())),
+  }
+}

+ 48 - 0
src/mixer/softmixer.rs

@@ -0,0 +1,48 @@
+use std::sync::Arc;
+use std::sync::atomic::{AtomicUsize, Ordering};
+
+use super::Mixer;
+use super::AudioFilter;
+
+pub struct SoftMixer {
+  volume: Arc<AtomicUsize>
+}
+
+impl SoftMixer {
+    pub fn new() -> SoftMixer {
+        SoftMixer {
+            volume: Arc::new(AtomicUsize::new(0xFFFF))
+        }
+    }
+}
+
+impl Mixer for SoftMixer {
+    fn start(&self) {
+    }
+    fn stop(&self) {
+    }
+    fn volume(&self) -> u16 {
+        self.volume.load(Ordering::Relaxed) as u16
+    }
+    fn set_volume(&self, volume: u16) {
+        self.volume.store(volume as usize, Ordering::Relaxed);
+    }
+    fn get_audio_filter(&self) -> Option<Box<AudioFilter + Send>> {
+        Some(Box::new(SoftVolumeApplier { volume: self.volume.clone() }))
+    }
+}
+
+struct SoftVolumeApplier {
+  volume: Arc<AtomicUsize>
+}
+
+impl AudioFilter for SoftVolumeApplier {
+    fn modify_stream(&self, data: &mut [i16]) {
+        let volume = self.volume.load(Ordering::Relaxed) as u16;
+        if volume != 0xFFFF {
+            for x in data.iter_mut() {
+                *x = (*x as i32 * volume as i32 / 0xFFFF) as i16;
+            }
+        }
+    }
+}

+ 9 - 47
src/player.rs

@@ -9,6 +9,7 @@ use audio_decrypt::AudioDecrypt;
 use audio_backend::Sink;
 use audio_backend::Sink;
 use metadata::{FileFormat, Track, TrackRef};
 use metadata::{FileFormat, Track, TrackRef};
 use session::{Bitrate, Session};
 use session::{Bitrate, Session};
+use mixer::AudioFilter;
 use util::{self, ReadSeek, SpotifyId, Subfile};
 use util::{self, ReadSeek, SpotifyId, Subfile};
 pub use spirc::PlayStatus;
 pub use spirc::PlayStatus;
 
 
@@ -47,8 +48,6 @@ pub struct PlayerState {
     pub status: PlayStatus,
     pub status: PlayStatus,
     pub position_ms: u32,
     pub position_ms: u32,
     pub position_measured_at: i64,
     pub position_measured_at: i64,
-    pub update_time: i64,
-    pub volume: u16,
     pub track: Option<SpotifyId>,
     pub track: Option<SpotifyId>,
 
 
     pub end_of_track: bool,
     pub end_of_track: bool,
@@ -67,14 +66,13 @@ enum PlayerCommand {
     Load(SpotifyId, bool, u32),
     Load(SpotifyId, bool, u32),
     Play,
     Play,
     Pause,
     Pause,
-    Volume(u16),
     Stop,
     Stop,
     Seek(u32),
     Seek(u32),
     SeekAt(u32, i64),
     SeekAt(u32, i64),
 }
 }
 
 
 impl Player {
 impl Player {
-    pub fn new<F>(session: Session, sink_builder: F) -> Player
+    pub fn new<F>(session: Session, stream_editor: Option<Box<AudioFilter + Send>>, sink_builder: F) -> Player
         where F: FnOnce() -> Box<Sink> + Send + 'static {
         where F: FnOnce() -> Box<Sink> + Send + 'static {
         let (cmd_tx, cmd_rx) = mpsc::channel();
         let (cmd_tx, cmd_rx) = mpsc::channel();
 
 
@@ -82,8 +80,6 @@ impl Player {
             status: PlayStatus::kPlayStatusStop,
             status: PlayStatus::kPlayStatusStop,
             position_ms: 0,
             position_ms: 0,
             position_measured_at: 0,
             position_measured_at: 0,
-            update_time: util::now_ms(),
-            volume: 0xFFFF,
             track: None,
             track: None,
             end_of_track: false,
             end_of_track: false,
         }));
         }));
@@ -97,7 +93,7 @@ impl Player {
             observers: observers.clone(),
             observers: observers.clone(),
         };
         };
 
 
-        thread::spawn(move || internal.run(sink_builder()));
+        thread::spawn(move || internal.run(sink_builder(), stream_editor));
 
 
         Player {
         Player {
             commands: cmd_tx,
             commands: cmd_tx,
@@ -138,30 +134,11 @@ impl Player {
         self.state.lock().unwrap().clone()
         self.state.lock().unwrap().clone()
     }
     }
 
 
-    pub fn volume(&self, vol: u16) {
-        self.command(PlayerCommand::Volume(vol));
-    }
-
     pub fn add_observer(&self, observer: PlayerObserver) {
     pub fn add_observer(&self, observer: PlayerObserver) {
         self.observers.lock().unwrap().push(observer);
         self.observers.lock().unwrap().push(observer);
     }
     }
 }
 }
 
 
-fn apply_volume(volume: u16, data: &[i16]) -> Cow<[i16]> {
-    // Fast path when volume is 100%
-    if volume == 0xFFFF {
-        Cow::Borrowed(data)
-    } else {
-        Cow::Owned(data.iter()
-                       .map(|&x| {
-                           (x as i32
-                            * volume as i32
-                            / 0xFFFF) as i16
-                       })
-                       .collect())
-    }
-}
-
 fn find_available_alternative<'a>(session: &Session, track: &'a Track) -> Option<Cow<'a, Track>> {
 fn find_available_alternative<'a>(session: &Session, track: &'a Track) -> Option<Cow<'a, Track>> {
     if track.available {
     if track.available {
         Some(Cow::Borrowed(track))
         Some(Cow::Borrowed(track))
@@ -229,7 +206,7 @@ fn run_onstop(session: &Session) {
 }
 }
 
 
 impl PlayerInternal {
 impl PlayerInternal {
-    fn run(self, mut sink: Box<Sink>) {
+    fn run(self, mut sink: Box<Sink>, stream_editor: Option<Box<AudioFilter + Send>>) {
         let mut decoder = None;
         let mut decoder = None;
 
 
         loop {
         loop {
@@ -334,7 +311,6 @@ impl PlayerInternal {
                 Some(PlayerCommand::Pause) => {
                 Some(PlayerCommand::Pause) => {
                     self.update(|state| {
                     self.update(|state| {
                         state.status = PlayStatus::kPlayStatusPause;
                         state.status = PlayStatus::kPlayStatusPause;
-                        state.update_time = util::now_ms();
                         state.position_ms = decoder.as_mut().map(|d| vorbis_time_tell_ms(d).unwrap()).unwrap_or(0) as u32;
                         state.position_ms = decoder.as_mut().map(|d| vorbis_time_tell_ms(d).unwrap()).unwrap_or(0) as u32;
                         state.position_measured_at = util::now_ms();
                         state.position_measured_at = util::now_ms();
                         true
                         true
@@ -343,12 +319,6 @@ impl PlayerInternal {
                     sink.stop().unwrap();
                     sink.stop().unwrap();
                     run_onstop(&self.session);
                     run_onstop(&self.session);
                 }
                 }
-                Some(PlayerCommand::Volume(vol)) => {
-                    self.update(|state| {
-                        state.volume = vol;
-                        true
-                    });
-                }
                 Some(PlayerCommand::Stop) => {
                 Some(PlayerCommand::Stop) => {
                     self.update(|state| {
                     self.update(|state| {
                         if state.status == PlayStatus::kPlayStatusPlay {
                         if state.status == PlayStatus::kPlayStatusPlay {
@@ -370,10 +340,11 @@ impl PlayerInternal {
                 let packet = decoder.as_mut().unwrap().packets().next();
                 let packet = decoder.as_mut().unwrap().packets().next();
 
 
                 match packet {
                 match packet {
-                    Some(Ok(packet)) => {
-                        let buffer = apply_volume(self.state.lock().unwrap().volume,
-                                                  &packet.data);
-                        sink.write(&buffer).unwrap();
+                    Some(Ok(mut packet)) => {
+                        if let Some(ref editor) = stream_editor {
+                            editor.modify_stream(&mut packet.data)
+                        };
+                        sink.write(&packet.data).unwrap();
 
 
                         self.update(|state| {
                         self.update(|state| {
                             state.position_ms = vorbis_time_tell_ms(decoder.as_mut().unwrap()).unwrap() as u32;
                             state.position_ms = vorbis_time_tell_ms(decoder.as_mut().unwrap()).unwrap() as u32;
@@ -408,7 +379,6 @@ impl PlayerInternal {
 
 
         let observers = self.observers.lock().unwrap();
         let observers = self.observers.lock().unwrap();
         if update {
         if update {
-            guard.update_time = util::now_ms();
             let state = guard.clone();
             let state = guard.clone();
             drop(guard);
             drop(guard);
 
 
@@ -428,14 +398,6 @@ impl PlayerState {
         (self.position_ms, self.position_measured_at)
         (self.position_ms, self.position_measured_at)
     }
     }
 
 
-    pub fn volume(&self) -> u16 {
-        self.volume
-    }
-
-    pub fn update_time(&self) -> i64 {
-        self.update_time
-    }
-
     pub fn end_of_track(&self) -> bool {
     pub fn end_of_track(&self) -> bool {
         self.end_of_track
         self.end_of_track
     }
     }

+ 67 - 42
src/spirc.rs

@@ -1,11 +1,11 @@
 use eventual::Async;
 use eventual::Async;
 use protobuf::{self, Message, RepeatedField};
 use protobuf::{self, Message, RepeatedField};
-use std::borrow::Cow;
 use std::sync::{Mutex, Arc};
 use std::sync::{Mutex, Arc};
 use std::collections::HashMap;
 use std::collections::HashMap;
 
 
 use mercury::{MercuryRequest, MercuryMethod};
 use mercury::{MercuryRequest, MercuryMethod};
 use player::{Player, PlayerState};
 use player::{Player, PlayerState};
+use mixer::Mixer;
 use session::Session;
 use session::Session;
 use util;
 use util;
 use util::SpotifyId;
 use util::SpotifyId;
@@ -20,6 +20,7 @@ pub struct SpircManager(Arc<Mutex<SpircInternal>>);
 struct SpircInternal {
 struct SpircInternal {
     player: Player,
     player: Player,
     session: Session,
     session: Session,
+    mixer: Box<Mixer + Send>,
 
 
     seq_nr: u32,
     seq_nr: u32,
 
 
@@ -43,14 +44,62 @@ struct SpircInternal {
     devices: HashMap<String, String>,
     devices: HashMap<String, String>,
 }
 }
 
 
+#[derive(Clone)]
+pub struct State {
+    pub status: PlayStatus,
+    pub position_ms: u32,
+    pub position_measured_at: i64,
+    pub update_time: i64,
+    pub volume: u16,
+    pub track: Option<SpotifyId>,
+    pub end_of_track: bool,
+}
+
+impl State {
+    pub fn new() -> State {
+        let state = State {
+            status: PlayStatus::kPlayStatusStop,
+            position_ms: 0,
+            position_measured_at: 0,
+            update_time: 0,
+            volume: 0,
+            track: None,
+            end_of_track: false,
+        };
+        state.update_time()
+    }
+
+    pub fn update_from_player(mut self, player: &Player) -> State {
+        let player_state = player.state();
+        let (position_ms, position_measured_at) = player_state.position();
+        self.status = player_state.status();
+        self.position_ms = position_ms;
+        self.position_measured_at = position_measured_at;
+        self.track = player_state.track;
+        self.end_of_track = player_state.end_of_track();
+        self.update_time()
+    }
+
+    pub fn update_from_mixer(mut self, mixer: &Box<Mixer + Send>) -> State {
+        self.volume = mixer.volume();
+        self.update_time()
+    }
+
+    fn update_time(mut self) -> State {
+        self.update_time = util::now_ms();
+        self
+    }
+}
+
 impl SpircManager {
 impl SpircManager {
-    pub fn new(session: Session, player: Player) -> SpircManager {
+    pub fn new(session: Session, player: Player, mixer: Box<Mixer + Send>) -> SpircManager {
         let ident = session.device_id().to_owned();
         let ident = session.device_id().to_owned();
         let name = session.config().device_name.clone();
         let name = session.config().device_name.clone();
 
 
         SpircManager(Arc::new(Mutex::new(SpircInternal {
         SpircManager(Arc::new(Mutex::new(SpircInternal {
             player: player,
             player: player,
             session: session,
             session: session,
+            mixer: mixer,
 
 
             seq_nr: 0,
             seq_nr: 0,
 
 
@@ -184,7 +233,7 @@ impl SpircInternal {
             let track = self.tracks[self.index as usize];
             let track = self.tracks[self.index as usize];
             self.player.load(track, true, 0);
             self.player.load(track, true, 0);
         } else {
         } else {
-            self.notify_with_player_state(false, None, player_state);
+            self.notify(false, None);
         }
         }
     }
     }
 
 
@@ -226,9 +275,11 @@ impl SpircInternal {
             }
             }
             MessageType::kMessageTypePlay => {
             MessageType::kMessageTypePlay => {
                 self.player.play();
                 self.player.play();
+                self.mixer.start();
             }
             }
             MessageType::kMessageTypePause => {
             MessageType::kMessageTypePause => {
                 self.player.pause();
                 self.player.pause();
+                self.mixer.stop();
             }
             }
             MessageType::kMessageTypeNext => {
             MessageType::kMessageTypeNext => {
                 self.index = (self.index + 1) % self.tracks.len() as u32;
                 self.index = (self.index + 1) % self.tracks.len() as u32;
@@ -250,10 +301,12 @@ impl SpircInternal {
                 if self.is_active && frame.get_device_state().get_is_active() {
                 if self.is_active && frame.get_device_state().get_is_active() {
                     self.is_active = false;
                     self.is_active = false;
                     self.player.stop();
                     self.player.stop();
+                    self.mixer.stop();
                 }
                 }
             }
             }
             MessageType::kMessageTypeVolume => {
             MessageType::kMessageTypeVolume => {
-                self.player.volume(frame.get_volume() as u16);
+                self.mixer.set_volume(frame.get_volume() as u16);
+                self.notify(false, None);
             }
             }
             MessageType::kMessageTypeGoodbye => {
             MessageType::kMessageTypeGoodbye => {
                 if frame.has_ident() {
                 if frame.has_ident() {
@@ -287,30 +340,11 @@ impl SpircInternal {
         cs.send();
         cs.send();
     }
     }
 
 
-    fn notify_with_player_state(&mut self,
-                                hello: bool,
-                                recipient: Option<&str>,
-                                player_state: &PlayerState) {
-        let mut cs = CommandSender::new(self,
-                                        if hello {
-                                            MessageType::kMessageTypeHello
-                                        } else {
-                                            MessageType::kMessageTypeNotify
-                                        })
-                         .player_state(player_state);
-        if let Some(s) = recipient {
-            cs = cs.recipient(&s);
-        }
-        cs.send();
-    }
-
-    fn spirc_state(&self, player_state: &PlayerState) -> protocol::spirc::State {
-        let (position_ms, position_measured_at) = player_state.position();
-
+    fn spirc_state(&self, state: &State) -> protocol::spirc::State {
         protobuf_init!(protocol::spirc::State::new(), {
         protobuf_init!(protocol::spirc::State::new(), {
-            status: player_state.status(),
-            position_ms: position_ms,
-            position_measured_at: position_measured_at as u64,
+            status: state.status,
+            position_ms: state.position_ms,
+            position_measured_at: state.position_measured_at as u64,
 
 
             playing_track_index: self.index,
             playing_track_index: self.index,
             track: self.tracks.iter().map(|track| {
             track: self.tracks.iter().map(|track| {
@@ -329,12 +363,12 @@ impl SpircInternal {
         })
         })
     }
     }
 
 
-    fn device_state(&self, player_state: &PlayerState) -> protocol::spirc::DeviceState {
+    fn device_state(&self, state: &State) -> protocol::spirc::DeviceState {
         protobuf_init!(protocol::spirc::DeviceState::new(), {
         protobuf_init!(protocol::spirc::DeviceState::new(), {
             sw_version: version::version_string(),
             sw_version: version::version_string(),
             is_active: self.is_active,
             is_active: self.is_active,
             can_play: self.can_play,
             can_play: self.can_play,
-            volume: player_state.volume() as u32,
+            volume: state.volume as u32,
             name: self.name.clone(),
             name: self.name.clone(),
             error_code: 0,
             error_code: 0,
             became_active_at: if self.is_active { self.became_active_at as i64 } else { 0 },
             became_active_at: if self.is_active { self.became_active_at as i64 } else { 0 },
@@ -398,7 +432,6 @@ struct CommandSender<'a> {
     spirc_internal: &'a mut SpircInternal,
     spirc_internal: &'a mut SpircInternal,
     cmd: MessageType,
     cmd: MessageType,
     recipient: Option<&'a str>,
     recipient: Option<&'a str>,
-    player_state: Option<&'a PlayerState>,
     state: Option<protocol::spirc::State>,
     state: Option<protocol::spirc::State>,
 }
 }
 
 
@@ -408,7 +441,6 @@ impl<'a> CommandSender<'a> {
             spirc_internal: spirc_internal,
             spirc_internal: spirc_internal,
             cmd: cmd,
             cmd: cmd,
             recipient: None,
             recipient: None,
-            player_state: None,
             state: None,
             state: None,
         }
         }
     }
     }
@@ -418,22 +450,15 @@ impl<'a> CommandSender<'a> {
         self
         self
     }
     }
 
 
-    fn player_state(mut self, s: &'a PlayerState) -> CommandSender {
-        self.player_state = Some(s);
-        self
-    }
-
     fn state(mut self, s: protocol::spirc::State) -> CommandSender<'a> {
     fn state(mut self, s: protocol::spirc::State) -> CommandSender<'a> {
         self.state = Some(s);
         self.state = Some(s);
         self
         self
     }
     }
 
 
     fn send(self) {
     fn send(self) {
-        let state = self.player_state.map_or_else(|| {
-            Cow::Owned(self.spirc_internal.player.state())
-        }, |s| {
-            Cow::Borrowed(s)
-        });
+        let state = State::new()
+                         .update_from_player(&self.spirc_internal.player)
+                         .update_from_mixer(&self.spirc_internal.mixer);
 
 
         let mut pkt = protobuf_init!(protocol::spirc::Frame::new(), {
         let mut pkt = protobuf_init!(protocol::spirc::Frame::new(), {
             version: 1,
             version: 1,
@@ -445,7 +470,7 @@ impl<'a> CommandSender<'a> {
                 self.recipient.map(|r| vec![r.to_owned()] ).unwrap_or(vec![])
                 self.recipient.map(|r| vec![r.to_owned()] ).unwrap_or(vec![])
                 ),
                 ),
             device_state: self.spirc_internal.device_state(&state),
             device_state: self.spirc_internal.device_state(&state),
-            state_update_id: state.update_time()
+            state_update_id: state.update_time
         });
         });
 
 
         if self.spirc_internal.is_active {
         if self.spirc_internal.is_active {