Browse Source

rustfmt: connect

awiouy 7 years ago
parent
commit
b7c32e9d6d
4 changed files with 106 additions and 85 deletions
  1. 0 1
      connect/build.rs
  2. 60 45
      connect/src/discovery.rs
  3. 4 2
      connect/src/lib.rs
  4. 42 37
      connect/src/spirc.rs

+ 0 - 1
connect/build.rs

@@ -10,5 +10,4 @@ fn main() {
 
     println!("cargo:rerun-if-changed=src/lib.in.rs");
     println!("cargo:rerun-if-changed=src/spirc.rs");
-
 }

+ 60 - 45
connect/src/discovery.rs

@@ -1,11 +1,11 @@
 use base64;
+use crypto;
 use crypto::digest::Digest;
 use crypto::mac::Mac;
-use crypto;
+use futures::{Future, Poll, Stream};
 use futures::sync::mpsc;
-use futures::{Future, Stream, Poll};
-use hyper::server::{Service, Request, Response, Http};
 use hyper::{self, Get, Post, StatusCode};
+use hyper::server::{Http, Request, Response, Service};
 
 #[cfg(feature = "with-dns-sd")]
 use dns_sd::DNSService;
@@ -21,10 +21,10 @@ use std::sync::Arc;
 use tokio_core::reactor::Handle;
 use url;
 
-use core::diffie_hellman::{DH_GENERATOR, DH_PRIME};
 use core::authentication::Credentials;
-use core::util;
 use core::config::ConnectConfig;
+use core::diffie_hellman::{DH_GENERATOR, DH_PRIME};
+use core::util;
 
 #[derive(Clone)]
 struct Discovery(Arc<DiscoveryInner>);
@@ -37,9 +37,10 @@ struct DiscoveryInner {
 }
 
 impl Discovery {
-    fn new(config: ConnectConfig, device_id: String)
-        -> (Discovery, mpsc::UnboundedReceiver<Credentials>)
-    {
+    fn new(
+        config: ConnectConfig,
+        device_id: String,
+    ) -> (Discovery, mpsc::UnboundedReceiver<Credentials>) {
         let (tx, rx) = mpsc::unbounded();
 
         let key_data = util::rand_vec(&mut rand::thread_rng(), 95);
@@ -59,9 +60,10 @@ impl Discovery {
 }
 
 impl Discovery {
-    fn handle_get_info(&self, _params: &BTreeMap<String, String>)
-        -> ::futures::Finished<Response, hyper::Error>
-    {
+    fn handle_get_info(
+        &self,
+        _params: &BTreeMap<String, String>,
+    ) -> ::futures::Finished<Response, hyper::Error> {
         let public_key = self.0.public_key.to_bytes_be();
         let public_key = base64::encode(&public_key);
 
@@ -85,9 +87,10 @@ impl Discovery {
         ::futures::finished(Response::new().with_body(body))
     }
 
-    fn handle_add_user(&self, params: &BTreeMap<String, String>)
-        -> ::futures::Finished<Response, hyper::Error>
-    {
+    fn handle_add_user(
+        &self,
+        params: &BTreeMap<String, String>,
+    ) -> ::futures::Finished<Response, hyper::Error> {
         let username = params.get("userName").unwrap();
         let encrypted_blob = params.get("blob").unwrap();
         let client_key = params.get("clientKey").unwrap();
@@ -133,8 +136,8 @@ impl Discovery {
 
         let decrypted = {
             let mut data = vec![0u8; encrypted.len()];
-            let mut cipher = crypto::aes::ctr(crypto::aes::KeySize::KeySize128,
-                                              &encryption_key[0..16], iv);
+            let mut cipher =
+                crypto::aes::ctr(crypto::aes::KeySize::KeySize128, &encryption_key[0..16], iv);
             cipher.process(encrypted, &mut data);
             String::from_utf8(data).unwrap()
         };
@@ -153,9 +156,7 @@ impl Discovery {
         ::futures::finished(Response::new().with_body(body))
     }
 
-    fn not_found(&self)
-        -> ::futures::Finished<Response, hyper::Error>
-    {
+    fn not_found(&self) -> ::futures::Finished<Response, hyper::Error> {
         ::futures::finished(Response::new().with_status(StatusCode::NotFound))
     }
 }
@@ -179,19 +180,22 @@ impl Service for Discovery {
         }
 
         let this = self.clone();
-        Box::new(body.fold(Vec::new(), |mut acc, chunk| {
-            acc.extend_from_slice(chunk.as_ref());
-            Ok::<_, hyper::Error>(acc)
-        }).map(move |body| {
-            params.extend(url::form_urlencoded::parse(&body).into_owned());
-            params
-        }).and_then(move |params| {
-            match (method, params.get("action").map(AsRef::as_ref)) {
-                (Get, Some("getInfo")) => this.handle_get_info(&params),
-                (Post, Some("addUser")) => this.handle_add_user(&params),
-                _ => this.not_found(),
-            }
-        }))
+        Box::new(
+            body.fold(Vec::new(), |mut acc, chunk| {
+                acc.extend_from_slice(chunk.as_ref());
+                Ok::<_, hyper::Error>(acc)
+            }).map(move |body| {
+                    params.extend(url::form_urlencoded::parse(&body).into_owned());
+                    params
+                })
+                .and_then(
+                    move |params| match (method, params.get("action").map(AsRef::as_ref)) {
+                        (Get, Some("getInfo")) => this.handle_get_info(&params),
+                        (Post, Some("addUser")) => this.handle_add_user(&params),
+                        _ => this.not_found(),
+                    },
+                ),
+        )
     }
 }
 
@@ -207,22 +211,30 @@ pub struct DiscoveryStream {
     _svc: mdns::Service,
 }
 
-pub fn discovery(handle: &Handle, config: ConnectConfig, device_id: String, port: u16)
-    -> io::Result<DiscoveryStream>
-{
+pub fn discovery(
+    handle: &Handle,
+    config: ConnectConfig,
+    device_id: String,
+    port: u16,
+) -> io::Result<DiscoveryStream> {
     let (discovery, creds_rx) = Discovery::new(config.clone(), device_id);
 
     let serve = {
         let http = Http::new();
         debug!("Zeroconf server listening on 0.0.0.0:{}", port);
-        http.serve_addr_handle(&format!("0.0.0.0:{}", port).parse().unwrap(), &handle, move || Ok(discovery.clone())).unwrap()
+        http.serve_addr_handle(
+            &format!("0.0.0.0:{}", port).parse().unwrap(),
+            &handle,
+            move || Ok(discovery.clone()),
+        ).unwrap()
     };
 
     let s_port = serve.incoming_ref().local_addr().port();
 
     let server_future = {
         let handle = handle.clone();
-        serve.for_each(move |connection| {
+        serve
+            .for_each(move |connection| {
                 handle.spawn(connection.then(|_| Ok(())));
                 Ok(())
             })
@@ -231,22 +243,25 @@ pub fn discovery(handle: &Handle, config: ConnectConfig, device_id: String, port
     handle.spawn(server_future);
 
     #[cfg(feature = "with-dns-sd")]
-    let svc = DNSService::register(Some(&*config.name),
-       "_spotify-connect._tcp",
-       None,
-       None,
-       s_port,
-       &["VERSION=1.0", "CPath=/"]).unwrap();
+    let svc = DNSService::register(
+        Some(&*config.name),
+        "_spotify-connect._tcp",
+        None,
+        None,
+        s_port,
+        &["VERSION=1.0", "CPath=/"],
+    ).unwrap();
 
     #[cfg(not(feature = "with-dns-sd"))]
     let responder = mdns::Responder::spawn(&handle)?;
-    
+
     #[cfg(not(feature = "with-dns-sd"))]
     let svc = responder.register(
         "_spotify-connect._tcp".to_owned(),
         config.name,
         s_port,
-        &["VERSION=1.0", "CPath=/"]);
+        &["VERSION=1.0", "CPath=/"],
+    );
 
     Ok(DiscoveryStream {
         credentials: creds_rx,

+ 4 - 2
connect/src/lib.rs

@@ -1,5 +1,7 @@
-#[macro_use] extern crate log;
-#[macro_use] extern crate serde_json;
+#[macro_use]
+extern crate log;
+#[macro_use]
+extern crate serde_json;
 
 extern crate base64;
 extern crate crypto;

+ 42 - 37
connect/src/spirc.rs

@@ -1,24 +1,24 @@
+use futures::{Async, Future, Poll, Sink, Stream};
 use futures::future;
-use futures::sync::{oneshot, mpsc};
-use futures::{Future, Stream, Sink, Async, Poll};
+use futures::sync::{mpsc, oneshot};
 use protobuf::{self, Message};
 
 use core::config::ConnectConfig;
 use core::mercury::MercuryError;
 use core::session::Session;
-use core::util::{SpotifyId, SeqGenerator};
+use core::util::{SeqGenerator, SpotifyId};
 use core::version;
 
 use protocol;
-use protocol::spirc::{PlayStatus, State, MessageType, Frame, DeviceState};
+use protocol::spirc::{DeviceState, Frame, MessageType, PlayStatus, State};
 
 use playback::mixer::Mixer;
 use playback::player::Player;
 
-use std;
 use rand;
 use rand::Rng;
-use std::time::{UNIX_EPOCH, SystemTime};
+use std;
+use std::time::{SystemTime, UNIX_EPOCH};
 
 pub struct SpircTask {
     player: Player,
@@ -47,7 +47,7 @@ pub enum SpircCommand {
     Next,
     VolumeUp,
     VolumeDown,
-    Shutdown
+    Shutdown,
 }
 
 pub struct Spirc {
@@ -152,11 +152,13 @@ fn volume_to_mixer(volume: u16) -> u16 {
     val
 }
 
-
 impl Spirc {
-    pub fn new(config: ConnectConfig, session: Session, player: Player, mixer: Box<Mixer>)
-        -> (Spirc, SpircTask)
-    {
+    pub fn new(
+        config: ConnectConfig,
+        session: Session,
+        player: Player,
+        mixer: Box<Mixer>,
+    ) -> (Spirc, SpircTask) {
         debug!("new Spirc[{}]", session.session_id());
 
         let ident = session.device_id().to_owned();
@@ -164,15 +166,20 @@ impl Spirc {
         let uri = format!("hm://remote/3/user/{}/", session.username());
 
         let subscription = session.mercury().subscribe(&uri as &str);
-        let subscription = subscription.map(|stream| stream.map_err(|_| MercuryError)).flatten_stream();
+        let subscription = subscription
+            .map(|stream| stream.map_err(|_| MercuryError))
+            .flatten_stream();
         let subscription = Box::new(subscription.map(|response| -> Frame {
             let data = response.payload.first().unwrap();
             protobuf::parse_from_bytes(data).unwrap()
         }));
 
-        let sender = Box::new(session.mercury().sender(uri).with(|frame: Frame| {
-            Ok(frame.write_to_bytes().unwrap())
-        }));
+        let sender = Box::new(
+            session
+                .mercury()
+                .sender(uri)
+                .with(|frame: Frame| Ok(frame.write_to_bytes().unwrap())),
+        );
 
         let (cmd_tx, cmd_rx) = mpsc::unbounded();
 
@@ -200,9 +207,7 @@ impl Spirc {
             session: session.clone(),
         };
 
-        let spirc = Spirc {
-            commands: cmd_tx,
-        };
+        let spirc = Spirc { commands: cmd_tx };
 
         task.hello();
 
@@ -268,9 +273,7 @@ impl Future for SpircTask {
                         self.handle_end_of_track();
                     }
                     Ok(Async::NotReady) => (),
-                    Err(oneshot::Canceled) => {
-                        self.end_of_track = Box::new(future::empty())
-                    }
+                    Err(oneshot::Canceled) => self.end_of_track = Box::new(future::empty()),
                 }
             }
 
@@ -357,15 +360,18 @@ impl SpircTask {
     }
 
     fn handle_frame(&mut self, frame: Frame) {
-        debug!("{:?} {:?} {} {} {}",
-               frame.get_typ(),
-               frame.get_device_state().get_name(),
-               frame.get_ident(),
-               frame.get_seq_nr(),
-               frame.get_state_update_id());
-
-        if frame.get_ident() == self.ident ||
-           (frame.get_recipient().len() > 0 && !frame.get_recipient().contains(&self.ident)) {
+        debug!(
+            "{:?} {:?} {} {} {}",
+            frame.get_typ(),
+            frame.get_device_state().get_name(),
+            frame.get_ident(),
+            frame.get_seq_nr(),
+            frame.get_state_update_id()
+        );
+
+        if frame.get_ident() == self.ident
+            || (frame.get_recipient().len() > 0 && !frame.get_recipient().contains(&self.ident))
+        {
             return;
         }
 
@@ -383,7 +389,8 @@ impl SpircTask {
                 self.update_tracks(&frame);
 
                 if self.state.get_track().len() > 0 {
-                    self.state.set_position_ms(frame.get_state().get_position_ms());
+                    self.state
+                        .set_position_ms(frame.get_state().get_position_ms());
                     self.state.set_position_measured_at(now_ms() as u64);
 
                     let play = frame.get_state().get_status() == PlayStatus::kPlayStatusPlay;
@@ -437,8 +444,7 @@ impl SpircTask {
 
             MessageType::kMessageTypeShuffle => {
                 self.state.set_shuffle(frame.get_state().get_shuffle());
-                if self.state.get_shuffle()
-                {
+                if self.state.get_shuffle() {
                     let current_index = self.state.get_playing_track_index();
                     {
                         let tracks = self.state.mut_track();
@@ -471,14 +477,13 @@ impl SpircTask {
 
             MessageType::kMessageTypeVolume => {
                 self.device.set_volume(frame.get_volume());
-                self.mixer.set_volume(volume_to_mixer(frame.get_volume() as u16));
+                self.mixer
+                    .set_volume(volume_to_mixer(frame.get_volume() as u16));
                 self.notify(None);
             }
 
             MessageType::kMessageTypeNotify => {
-                if self.device.get_is_active() &&
-                    frame.get_device_state().get_is_active()
-                {
+                if self.device.get_is_active() && frame.get_device_state().get_is_active() {
                     self.device.set_is_active(false);
                     self.state.set_status(PlayStatus::kPlayStatusStop);
                     self.player.stop();