فهرست منبع

Create event loop in main

Paul Lietar 8 سال پیش
والد
کامیت
d27063d5da
9فایلهای تغییر یافته به همراه130 افزوده شده و 133 حذف شده
  1. 2 2
      src/audio_backend/alsa.rs
  2. 4 5
      src/audio_backend/mod.rs
  3. 1 1
      src/audio_backend/pipe.rs
  4. 2 2
      src/audio_backend/portaudio.rs
  5. 1 1
      src/audio_backend/pulseaudio.rs
  6. 20 46
      src/connection/adaptor.rs
  7. 51 26
      src/main.rs
  8. 48 49
      src/session.rs
  9. 1 1
      src/spirc.rs

+ 2 - 2
src/audio_backend/alsa.rs

@@ -5,10 +5,10 @@ use alsa::{PCM, Stream, Mode, Format, Access};
 pub struct AlsaSink(Option<PCM>, String);
 
 impl Open for AlsaSink {
-   fn open(device: Option<&str>) -> AlsaSink {
+   fn open(device: Option<String>) -> AlsaSink {
         info!("Using alsa sink");
 
-        let name = device.unwrap_or("default").to_string();
+        let name = device.unwrap_or("default".to_string());
 
         AlsaSink(None, name)
     }

+ 4 - 5
src/audio_backend/mod.rs

@@ -1,7 +1,7 @@
 use std::io;
 
 pub trait Open {
-    fn open(Option<&str>) -> Self;
+    fn open(Option<String>) -> Self;
 }
 
 pub trait Sink {
@@ -49,8 +49,7 @@ macro_rules! _declare_backends {
     )
 }
 
-#[allow(dead_code)]
-fn mk_sink<S: Sink + Open + 'static>(device: Option<&str>) -> Box<Sink> {
+fn mk_sink<S: Sink + Open + 'static>(device: Option<String>) -> Box<Sink> {
     Box::new(S::open(device))
 }
 
@@ -75,7 +74,7 @@ use self::pipe::StdoutSink;
 declare_backends! {
     pub const BACKENDS : &'static [
         (&'static str,
-         &'static (Fn(Option<&str>) -> Box<Sink> + Sync + Send + 'static))
+         &'static (Fn(Option<String>) -> Box<Sink> + Sync + Send + 'static))
     ] = &[
         #[cfg(feature = "alsa-backend")]
         ("alsa", &mk_sink::<AlsaSink>),
@@ -87,7 +86,7 @@ declare_backends! {
     ];
 }
 
-pub fn find<T: AsRef<str>>(name: Option<T>) -> Option<&'static (Fn(Option<&str>) -> Box<Sink> + Send + Sync)> {
+pub fn find<T: AsRef<str>>(name: Option<T>) -> Option<&'static (Fn(Option<String>) -> Box<Sink> + Send + Sync)> {
     if let Some(name) = name.as_ref().map(AsRef::as_ref) {
         BACKENDS.iter().find(|backend| name == backend.0).map(|backend| backend.1)
     } else {

+ 1 - 1
src/audio_backend/pipe.rs

@@ -7,7 +7,7 @@ use std::slice;
 pub struct StdoutSink(Box<Write>);
 
 impl Open for StdoutSink {
-    fn open(path: Option<&str>) -> StdoutSink {
+    fn open(path: Option<String>) -> StdoutSink {
         if let Some(path) = path {
             let file = OpenOptions::new().write(true).open(path).unwrap();
             StdoutSink(Box::new(file))

+ 2 - 2
src/audio_backend/portaudio.rs

@@ -39,13 +39,13 @@ fn find_output(device: &str) -> Option<DeviceIndex> {
 }
 
 impl <'a> Open for PortAudioSink<'a> {
-    fn open(device: Option<&str>) -> PortAudioSink<'a> {
+    fn open(device: Option<String>) -> PortAudioSink<'a> {
 
         debug!("Using PortAudio sink");
 
         portaudio::initialize().unwrap();
 
-        let device_idx = match device {
+        let device_idx = match device.as_ref().map(AsRef::as_ref) {
             Some("?") => {
                 list_outputs();
                 exit(0)

+ 1 - 1
src/audio_backend/pulseaudio.rs

@@ -8,7 +8,7 @@ use std::ffi::CString;
 pub struct PulseAudioSink(*mut pa_simple);
 
 impl Open for PulseAudioSink {
-   fn open(device: Option<&str>) -> PulseAudioSink {
+   fn open(device: Option<String>) -> PulseAudioSink {
         debug!("Using PulseAudio sink");
 
         if device.is_some() {

+ 20 - 46
src/connection/adaptor.rs

@@ -6,8 +6,8 @@ use std::thread;
 use tokio_core::reactor::Core;
 use tokio_core::reactor::Handle;
 
-pub struct SinkAdaptor<T>(Option<mpsc::Sender<T>>);
-pub struct StreamAdaptor<T, E>(Option<mpsc::Receiver<Result<T, E>>>);
+pub struct SinkAdaptor<T>(pub Option<mpsc::Sender<T>>);
+pub struct StreamAdaptor<T, E>(pub Option<mpsc::Receiver<Result<T, E>>>);
 
 impl <T> SinkAdaptor<T> {
     pub fn send(&mut self, item: T) {
@@ -30,59 +30,33 @@ impl <T, E> StreamAdaptor<T, E> {
     }
 }
 
-fn adapt_sink<S>(sink: S, rx: mpsc::Receiver<S::SinkItem>) -> BoxFuture<(), ()>
-    where S: Sink + Send + 'static,
-          S::SinkItem: Send,
-          S::SinkError: Send,
-{
-    rx.map_err(|_| -> S::SinkError { panic!("") })
-      .forward(sink)
-      .map(|_| ()).map_err(|_| ())
-      .boxed()
-}
-
-fn adapt_stream<S>(stream: S, tx: mpsc::Sender<Result<S::Item, S::Error>>) -> BoxFuture<(), ()>
-    where S: Stream + Send + 'static,
-          S::Item: Send,
-          S::Error: Send,
-{
-    stream.then(ok::<_, mpsc::SendError<_>>)
-        .forward(tx)
-        .map(|_| ()).map_err(|_| ())
-        .boxed()
-}
-
-pub fn adapt<F, U, S>(f: F) -> (SinkAdaptor<S::SinkItem>, StreamAdaptor<S::Item, S::Error>)
-    where F: FnOnce(Handle) -> U + Send + 'static,
-          U: IntoFuture<Item=S>,
-          S: Sink + Stream + Send + 'static,
+pub fn adapt<S, E>(transport: S) -> (SinkAdaptor<S::SinkItem>,
+                                     StreamAdaptor<S::Item, E>,
+                                     BoxFuture<(), E>)
+    where S: Sink<SinkError=E> + Stream<Error=E> + Send + 'static,
           S::Item: Send + 'static,
-          S::Error: Send + 'static,
           S::SinkItem: Send + 'static,
-          S::SinkError: Send + 'static,
+          E: Send + 'static,
 {
-
     let (receiver_tx, receiver_rx) = mpsc::channel(0);
     let (sender_tx, sender_rx) = mpsc::channel(0);
 
+    let (sink, stream) = transport.split();
 
-    thread::spawn(move || {
-        let mut core = Core::new().unwrap();
-        let handle = core.handle();
-        let task =
-            f(handle).into_future()
-            .map(|connection| connection.split())
-            .map_err(|_| ())
-            .and_then(|(sink, stream)| {
-                (adapt_sink(sink, sender_rx),
-                 adapt_stream(stream, receiver_tx))
-            });
-
-        core.run(task).unwrap();
-    });
+    let receiver_task = stream
+        .then(ok::<_, mpsc::SendError<_>>)
+        .forward(receiver_tx).map(|_| ())
+        .map_err(|e| -> E { panic!(e) });
+
+    let sender_task = sender_rx
+        .map_err(|e| -> E { panic!(e) })
+        .forward(sink).map(|_| ());
+
+    let task = (receiver_task, sender_task).into_future()
+        .map(|((), ())| ()).boxed();
 
     (SinkAdaptor(Some(sender_tx)),
-     StreamAdaptor(Some(receiver_rx)))
+     StreamAdaptor(Some(receiver_rx)), task)
 }
 
 pub fn adapt_future<F, U>(f: F) -> oneshot::Receiver<Result<U::Item, U::Error>>

+ 51 - 26
src/main.rs

@@ -3,6 +3,8 @@ extern crate getopts;
 extern crate librespot;
 extern crate ctrlc;
 extern crate env_logger;
+extern crate futures;
+extern crate tokio_core;
 
 use env_logger::LogBuilder;
 use std::io::{stderr, Write};
@@ -11,10 +13,12 @@ use std::thread;
 use std::env;
 use std::path::PathBuf;
 use std::str::FromStr;
+use futures::Future;
+use tokio_core::reactor::Core;
 
 use librespot::spirc::SpircManager;
-use librespot::authentication::get_credentials;
-use librespot::audio_backend::{self, BACKENDS};
+use librespot::authentication::{get_credentials, Credentials};
+use librespot::audio_backend::{self, Sink, BACKENDS};
 use librespot::cache::{Cache, DefaultCache, NoCache};
 use librespot::player::Player;
 use librespot::session::{Bitrate, Config, Session};
@@ -59,7 +63,15 @@ fn list_backends() {
     }
 }
 
-fn setup(args: &[String]) -> (Session, Player) {
+struct Setup {
+    backend: &'static (Fn(Option<String>) -> Box<Sink> + Send + Sync),
+    cache: Box<Cache + Send + Sync>,
+    config: Config,
+    credentials: Credentials,
+    device: Option<String>,
+}
+
+fn setup(args: &[String]) -> Setup {
     let mut opts = getopts::Options::new();
     opts.optopt("c", "cache", "Path to a directory where files will be cached.", "CACHE")
         .reqopt("n", "name", "Device name", "NAME")
@@ -101,8 +113,8 @@ fn setup(args: &[String]) -> (Session, Player) {
         .map(|bitrate| Bitrate::from_str(bitrate).expect("Invalid bitrate"))
         .unwrap_or(Bitrate::Bitrate160);
 
-    let device_name = matches.opt_str("name").unwrap();
-    let device_id = librespot::session::device_id(&device_name);
+    let name = matches.opt_str("name").unwrap();
+    let device_id = librespot::session::device_id(&name);
 
     let cache = matches.opt_str("c").map(|cache_location| {
         Box::new(DefaultCache::new(PathBuf::from(cache_location)).unwrap()) 
@@ -111,46 +123,59 @@ fn setup(args: &[String]) -> (Session, Player) {
 
     let cached_credentials = cache.get_credentials();
 
-    let credentials = get_credentials(&device_name, &device_id,
+    let credentials = get_credentials(&name, &device_id,
                                       matches.opt_str("username"),
                                       matches.opt_str("password"),
                                       cached_credentials);
 
     let config = Config {
         user_agent: version::version_string(),
-        device_name: device_name,
+        name: name,
         device_id: device_id,
         bitrate: bitrate,
         onstart: matches.opt_str("onstart"),
         onstop: matches.opt_str("onstop"),
     };
 
-    let session = Session::new(config, cache);
+    let device = matches.opt_str("device");
 
-    session.login(credentials).unwrap();
-
-    let device_name = matches.opt_str("device");
-    let player = Player::new(session.clone(), move || {
-        (backend)(device_name.as_ref().map(AsRef::as_ref))
-    });
-
-    (session, player)
+    Setup {
+        backend: backend,
+        cache: cache,
+        config: config,
+        credentials: credentials,
+        device: device,
+    }
 }
 
 fn main() {
+    let mut core = Core::new().unwrap();
+    let handle = core.handle();
+
     let args: Vec<String> = std::env::args().collect();
-    let (session, player) = setup(&args);
 
-    let spirc = SpircManager::new(session.clone(), player);
-    let spirc_signal = spirc.clone();
-    thread::spawn(move || spirc.run());
+    let Setup { backend, cache, config, credentials, device } = setup(&args);
 
-    ctrlc::set_handler(move || {
-        spirc_signal.send_goodbye();
-        exit(0);
+    let connection = Session::connect(config, credentials, cache, handle);
+
+    let task = connection.and_then(move |(session, task)| {
+        let player = Player::new(session.clone(), move || {
+            (backend)(device)
+        });
+
+        let spirc = SpircManager::new(session.clone(), player);
+        let spirc_signal = spirc.clone();
+
+        ctrlc::set_handler(move || {
+            spirc_signal.send_goodbye();
+            exit(0);
+        });
+
+        thread::spawn(move || spirc.run());
+        thread::spawn(move || loop { session.poll() });
+
+        task
     });
 
-    loop {
-        session.poll();
-    }
+    core.run(task).unwrap()
 }

+ 48 - 49
src/session.rs

@@ -9,7 +9,7 @@ use std::sync::{Mutex, RwLock, Arc, mpsc};
 use std::str::FromStr;
 use futures::Future as Future_;
 use futures::Stream;
-use futures::sync::oneshot;
+use tokio_core::reactor::Handle;
 
 use album_cover::AlbumCover;
 use apresolve::apresolve_or_fallback;
@@ -45,7 +45,7 @@ impl FromStr for Bitrate {
 
 pub struct Config {
     pub user_agent: String,
-    pub device_name: String,
+    pub name: String,
     pub device_id: String,
     pub bitrate: Bitrate,
     pub onstart: Option<String>,
@@ -66,75 +66,74 @@ pub struct SessionInternal {
     metadata: Mutex<MetadataManager>,
     stream: Mutex<StreamManager>,
     audio_key: Mutex<AudioKeyManager>,
-    rx_connection: Mutex<Option<adaptor::StreamAdaptor<(u8, Vec<u8>), io::Error>>>,
-    tx_connection: Mutex<Option<adaptor::SinkAdaptor<(u8, Vec<u8>)>>>,
+    rx_connection: Mutex<adaptor::StreamAdaptor<(u8, Vec<u8>), io::Error>>,
+    tx_connection: Mutex<adaptor::SinkAdaptor<(u8, Vec<u8>)>>,
 }
 
 #[derive(Clone)]
 pub struct Session(pub Arc<SessionInternal>);
 
-pub fn device_id(device_name: &str) -> String {
+pub fn device_id(name: &str) -> String {
     let mut h = Sha1::new();
-    h.input_str(&device_name);
+    h.input_str(&name);
     h.result_str()
 }
 
 impl Session {
-    pub fn new(config: Config, cache: Box<Cache + Send + Sync>) -> Session {
-        Session(Arc::new(SessionInternal {
+    pub fn connect(config: Config, credentials: Credentials,
+                   cache: Box<Cache + Send + Sync>, handle: Handle)
+        -> Box<Future_<Item=(Session, Box<Future_<Item=(), Error=io::Error>>), Error=io::Error>>
+    {
+        let access_point = apresolve_or_fallback::<io::Error>(&handle);
+
+        let connection = access_point.and_then(move |addr| {
+            info!("Connecting to AP \"{}\"", addr);
+            connection::connect::<&str>(&addr, &handle)
+        });
+
+        let device_id = config.device_id.clone();
+        let authentication = connection.and_then(move |connection| {
+            connection::authenticate(connection, credentials, device_id)
+        });
+
+        let result = authentication.map(move |(transport, reusable_credentials)| {
+            info!("Authenticated !");
+            cache.put_credentials(&reusable_credentials);
+
+            let (session, task) = Session::create(transport, config, cache, reusable_credentials.username.clone());
+            (session, task)
+        });
+        
+        Box::new(result)
+    }
+
+    fn create(transport: connection::Transport, config: Config,
+              cache: Box<Cache + Send + Sync>, username: String) -> (Session, Box<Future_<Item=(), Error=io::Error>>)
+    {
+        let transport = transport.map(|(cmd, data)| (cmd, data.as_ref().to_owned()));
+        let (tx, rx, task) = adaptor::adapt(transport);
+
+        let session = Session(Arc::new(SessionInternal {
             config: config,
             data: RwLock::new(SessionData {
                 country: String::new(),
-                canonical_username: String::new(),
+                canonical_username: username,
             }),
 
-            rx_connection: Mutex::new(None),
-            tx_connection: Mutex::new(None),
+            rx_connection: Mutex::new(rx),
+            tx_connection: Mutex::new(tx),
 
             cache: cache,
             mercury: Mutex::new(MercuryManager::new()),
             metadata: Mutex::new(MetadataManager::new()),
             stream: Mutex::new(StreamManager::new()),
             audio_key: Mutex::new(AudioKeyManager::new()),
-        }))
-    }
-
-    pub fn login(&self, credentials: Credentials) -> Result<Credentials, ()> {
-        let device_id = self.device_id().to_owned();
-
-        let (creds_tx, creds_rx) = oneshot::channel();
+        }));
 
-        let (tx, rx) = adaptor::adapt(move |handle| {
-            let access_point = apresolve_or_fallback::<io::Error>(&handle);
-
-            let connection = access_point.and_then(move |addr| {
-                info!("Connecting to AP \"{}\"", addr);
-                connection::connect::<&str>(&addr, &handle)
-            });
-
-            let authentication = connection.and_then(move |connection| {
-                connection::authenticate(connection, credentials, device_id)
-            });
-
-            authentication.map(|(transport, creds)| {
-                creds_tx.complete(creds);
-                transport.map(|(cmd, data)| (cmd, data.as_ref().to_owned()))
-            })
-        });
-
-        let reusable_credentials: Credentials = creds_rx.wait().unwrap();
-
-        self.0.data.write().unwrap().canonical_username = reusable_credentials.username.clone();
-        *self.0.rx_connection.lock().unwrap() = Some(rx);
-        *self.0.tx_connection.lock().unwrap() = Some(tx);
-
-        info!("Authenticated !");
-
-        self.0.cache.put_credentials(&reusable_credentials);
-
-        Ok(reusable_credentials)
+        (session, task)
     }
 
+
     pub fn poll(&self) {
         let (cmd, data) = self.recv();
 
@@ -152,11 +151,11 @@ impl Session {
     }
 
     pub fn recv(&self) -> (u8, Vec<u8>) {
-        self.0.rx_connection.lock().unwrap().as_mut().unwrap().recv().unwrap()
+        self.0.rx_connection.lock().unwrap().recv().unwrap()
     }
 
     pub fn send_packet(&self, cmd: u8, data: Vec<u8>) {
-        self.0.tx_connection.lock().unwrap().as_mut().unwrap().send((cmd, data))
+        self.0.tx_connection.lock().unwrap().send((cmd, data))
     }
 
     pub fn audio_key(&self, track: SpotifyId, file_id: FileId) -> Future<AudioKey, AudioKeyError> {

+ 1 - 1
src/spirc.rs

@@ -46,7 +46,7 @@ struct SpircInternal {
 impl SpircManager {
     pub fn new(session: Session, player: Player) -> SpircManager {
         let ident = session.device_id().to_owned();
-        let name = session.config().device_name.clone();
+        let name = session.config().name.clone();
 
         SpircManager(Arc::new(Mutex::new(SpircInternal {
             player: player,