Browse Source

Remove busy waiting in SpircManager.

Paul Lietar 9 years ago
parent
commit
94503e351b
3 changed files with 109 additions and 76 deletions
  1. 1 1
      src/lib.rs
  2. 84 47
      src/player.rs
  3. 24 28
      src/spirc.rs

+ 1 - 1
src/lib.rs

@@ -1,6 +1,6 @@
 #![crate_name = "librespot"]
 
-#![feature(plugin,scoped,zero_one,iter_arith,slice_position_elem,slice_bytes,bitset,arc_weak,append,future)]
+#![feature(plugin,scoped,zero_one,iter_arith,slice_position_elem,slice_bytes,bitset,arc_weak,append,future,mpsc_select)]
 #![allow(deprecated)]
 //#![allow(unused_imports,dead_code)]
 

+ 84 - 47
src/player.rs

@@ -76,28 +76,25 @@ impl <'s> PlayerInternal<'s> {
         portaudio::initialize().unwrap();
 
         let stream = portaudio::stream::Stream::<i16>::open_default(
-                0,
-                2,
-                44100.0,
+                0, 2, 44100.0,
                 portaudio::stream::FRAMES_PER_BUFFER_UNSPECIFIED,
                 None
-                ).unwrap();
+        ).unwrap();
 
         let mut decoder = None;
 
         loop {
             match self.commands.try_recv() {
                 Ok(PlayerCommand::Load(id, play, position)) => {
-                    println!("Load");
-                    let mut h = self.state.0.lock().unwrap();
-                    if h.status == PlayStatus::kPlayStatusPlay {
-                        stream.stop().unwrap();
-                    }
-                    h.status = PlayStatus::kPlayStatusLoading;
-                    h.position_ms = position;
-                    h.position_measured_at = util::now_ms();
-                    h.update_time = util::now_ms();
-                    drop(h);
+                    self.update(|state| {
+                        if state.status == PlayStatus::kPlayStatusPlay {
+                            stream.stop().unwrap();
+                        }
+                        state.status = PlayStatus::kPlayStatusLoading;
+                        state.position_ms = position;
+                        state.position_measured_at = util::now_ms();
+                        return true;
+                    });
 
                     let track : TrackRef = self.session.metadata(id);
                     let file_id = *track.wait().unwrap().files.first().unwrap();
@@ -109,48 +106,54 @@ impl <'s> PlayerInternal<'s> {
                         self.session.audio_file(file_id)), 0xa7)).unwrap());
                     decoder.as_mut().unwrap().time_seek(position as f64 / 1000f64).unwrap();
 
-                    let mut h = self.state.0.lock().unwrap();
-                    h.status = if play {
-                        stream.start().unwrap();
-                        PlayStatus::kPlayStatusPlay
-                    } else {
-                        PlayStatus::kPlayStatusPause
-                    };
-                    h.position_ms = position;
-                    h.position_measured_at = util::now_ms();
-                    h.update_time = util::now_ms();
+                    self.update(|state| {
+                        state.status = if play {
+                            stream.start().unwrap();
+                            PlayStatus::kPlayStatusPlay
+                        } else {
+                            PlayStatus::kPlayStatusPause
+                        };
+                        state.position_ms = position;
+                        state.position_measured_at = util::now_ms();
+
+                        return true;
+                    });
                     println!("Load Done");
                 }
                 Ok(PlayerCommand::Seek(ms)) => {
-                    let mut h = self.state.0.lock().unwrap();
                     decoder.as_mut().unwrap().time_seek(ms as f64 / 1000f64).unwrap();
-                    h.position_ms = (decoder.as_mut().unwrap().time_tell().unwrap() * 1000f64) as u32;
-                    h.position_measured_at = util::now_ms();
-                    h.update_time = util::now_ms();
+                    self.update(|state| {
+                        state.position_ms = (decoder.as_mut().unwrap().time_tell().unwrap() * 1000f64) as u32;
+                        state.position_measured_at = util::now_ms();
+                        return true;
+                    });
                 },
                 Ok(PlayerCommand::Play) => {
-                    println!("Play");
-                    let mut h = self.state.0.lock().unwrap();
-                    h.status = PlayStatus::kPlayStatusPlay;
-                    h.update_time = util::now_ms();
+                    self.update(|state| {
+                        state.status = PlayStatus::kPlayStatusPlay;
+                        return true;
+                    });
 
                     stream.start().unwrap();
                 },
                 Ok(PlayerCommand::Pause) => {
-                    let mut h = self.state.0.lock().unwrap();
-                    h.status = PlayStatus::kPlayStatusPause;
-                    h.update_time = util::now_ms();
+                    self.update(|state| {
+                        state.status = PlayStatus::kPlayStatusPause;
+                        state.update_time = util::now_ms();
+                        return true;
+                    });
 
                     stream.stop().unwrap();
                 },
                 Ok(PlayerCommand::Stop) => {
-                    let mut h = self.state.0.lock().unwrap();
-                    if h.status == PlayStatus::kPlayStatusPlay {
-                        stream.stop().unwrap();
-                    }
+                    self.update(|state| {
+                        if state.status == PlayStatus::kPlayStatusPlay {
+                            state.status = PlayStatus::kPlayStatusPause;
+                        }
+                        return true;
+                    });
 
-                    h.status = PlayStatus::kPlayStatusPause;
-                    h.update_time = util::now_ms();
+                    stream.stop().unwrap();
                     decoder = None;
                 },
                 Err(..) => (),
@@ -170,9 +173,17 @@ impl <'s> PlayerInternal<'s> {
                     Err(e) => panic!("Vorbis error {:?}", e)
                 }
 
-                let mut h = self.state.0.lock().unwrap();
-                h.position_ms = (decoder.as_mut().unwrap().time_tell().unwrap() * 1000f64) as u32;
-                h.position_measured_at = util::now_ms();
+                self.update(|state| {
+                    let now = util::now_ms();
+
+                    if now - state.position_measured_at > 5000 {
+                        state.position_ms = (decoder.as_mut().unwrap().time_tell().unwrap() * 1000f64) as u32;
+                        state.position_measured_at = now;
+                        return true;
+                    } else {
+                        return false;
+                    }
+                });
             }
         }
 
@@ -180,6 +191,17 @@ impl <'s> PlayerInternal<'s> {
 
         portaudio::terminate().unwrap();
     }
+
+    fn update<F>(&self, f: F)
+        where F: FnOnce(&mut MutexGuard<PlayerState>) -> bool {
+        let mut guard = self.state.0.lock().unwrap();
+        let update = f(&mut guard);
+        if update {
+            guard.update_time = util::now_ms();
+            self.state.1.notify_all();
+
+        }
+    }
 }
 
 impl <'s> SpircDelegate for Player<'s> {
@@ -210,9 +232,24 @@ impl <'s> SpircDelegate for Player<'s> {
         self.state.0.lock().unwrap()
     }
 
-    fn wait_update<'a>(&'a self, guard: MutexGuard<'a, Self::State>)
-        -> MutexGuard<'a, Self::State> {
-        self.state.1.wait(guard).unwrap()
+    fn updates(&self) -> mpsc::Receiver<i64> {
+        let state = self.state.clone();
+        let (update_tx, update_rx) = mpsc::channel();
+
+        thread::spawn(move || {
+            let mut guard = state.0.lock().unwrap();
+            let mut last_update;
+            loop {
+                last_update = guard.update_time;
+                update_tx.send(guard.update_time).unwrap();
+
+                while last_update >= guard.update_time {
+                    guard = state.1.wait(guard).unwrap();
+                }
+            }
+        });
+
+        return update_rx;
     }
 }
 

+ 24 - 28
src/spirc.rs

@@ -1,11 +1,11 @@
 use protobuf::{self, Message};
+use std::sync::{mpsc, MutexGuard};
 
 use util;
 use session::Session;
 use util::SpotifyId;
 use util::version::version_string;
 use mercury::{MercuryRequest, MercuryMethod};
-use std::sync::MutexGuard;
 
 use librespot_protocol as protocol;
 pub use librespot_protocol::spirc::PlayStatus;
@@ -47,8 +47,7 @@ pub trait SpircDelegate {
     fn stop(&self);
 
     fn state(&self) -> MutexGuard<Self::State>;
-    fn wait_update<'a>(&'a self, guard: MutexGuard<'a, Self::State>)
-        -> MutexGuard<'a, Self::State>;
+    fn updates(&self) -> mpsc::Receiver<i64>;
 }
 
 pub trait SpircState {
@@ -89,33 +88,30 @@ impl <'s, D: SpircDelegate> SpircManager<'s, D> {
 
     pub fn run(&mut self) {
         let rx = self.session.mercury_sub(format!("hm://remote/user/{}/v23", self.username));
-
-        self.notify(None);
+        let updates = self.delegate.updates();
 
         loop {
-            if let Ok(pkt) = rx.try_recv() {
-                let frame = protobuf::parse_from_bytes::<protocol::spirc::Frame>(
-                    pkt.payload.front().unwrap()).unwrap();
-                println!("{:?} {} {} {} {}",
-                         frame.get_typ(),
-                         frame.get_device_state().get_name(),
-                         frame.get_ident(),
-                         frame.get_seq_nr(),
-                         frame.get_state_update_id());
-                if frame.get_ident() != self.ident &&
-                    (frame.get_recipient().len() == 0 ||
-                     frame.get_recipient().contains(&self.ident)) {
-                    self.handle(frame);
+            select! {
+                pkt = rx.recv() => {
+                    let frame = protobuf::parse_from_bytes::<protocol::spirc::Frame>(
+                        pkt.unwrap().payload.front().unwrap()).unwrap();
+                    println!("{:?} {} {} {} {}",
+                             frame.get_typ(),
+                             frame.get_device_state().get_name(),
+                             frame.get_ident(),
+                             frame.get_seq_nr(),
+                             frame.get_state_update_id());
+                    if frame.get_ident() != self.ident &&
+                        (frame.get_recipient().len() == 0 ||
+                         frame.get_recipient().contains(&self.ident)) {
+                            self.handle(frame);
+                        }
+                },
+                update_time = updates.recv() => {
+                    self.state_update_id = update_time.unwrap();
+                    self.notify(None);
                 }
             }
-
-            if {
-                let state = self.delegate.state();
-                state.update_time() > self.state_update_id
-            } {
-                self.state_update_id = util::now_ms();
-                self.notify(None);
-            }
         }
     }
 
@@ -170,7 +166,7 @@ impl <'s, D: SpircDelegate> SpircManager<'s, D> {
             recipient: protobuf::RepeatedField::from_vec(
                 recipient.map(|r| vec![r.to_string()] ).unwrap_or(vec![])
             ),
-            state_update_id: self.state_update_id
+            state_update_id: self.state_update_id as i64
         });
 
         if self.is_active {
@@ -219,7 +215,7 @@ impl <'s, D: SpircDelegate> SpircManager<'s, D> {
             volume: self.volume as u32,
             name: self.name.clone(),
             error_code: 0,
-            became_active_at: if self.is_active { self.became_active_at } else { 0 },
+            became_active_at: if self.is_active { self.became_active_at as i64 } else { 0 },
             capabilities => [
                 @{
                     typ: protocol::spirc::CapabilityType::kCanBePlayer,