Ver código fonte

add audio seeking support

Paul Lietar 9 anos atrás
pai
commit
4835d25370
7 arquivos alterados com 206 adições e 123 exclusões
  1. 22 12
      src/audio_decrypt.rs
  2. 93 56
      src/audio_file.rs
  3. 1 2
      src/connection.rs
  4. 9 12
      src/player.rs
  5. 39 32
      src/stream.rs
  6. 2 9
      src/util/mod.rs
  7. 40 0
      src/util/subfile.rs

+ 22 - 12
src/audio_decrypt.rs

@@ -1,7 +1,9 @@
 use crypto::aes;
 use crypto::symmetriccipher::SynchronousStreamCipher;
-use readall::ReadAllExt;
 use std::io;
+use std::ops::Add;
+use num::FromPrimitive;
+use gmp::Mpz;
 
 use audio_key::AudioKey;
 
@@ -16,16 +18,10 @@ pub struct AudioDecrypt<T : io::Read> {
 }
 
 impl <T : io::Read> AudioDecrypt<T> {
-    pub fn new(key: AudioKey, mut reader: T) -> AudioDecrypt<T> {
-        let mut cipher = aes::ctr(aes::KeySize::KeySize128,
+    pub fn new(key: AudioKey, reader: T) -> AudioDecrypt<T> {
+        let cipher = aes::ctr(aes::KeySize::KeySize128,
                               &key,
                               AUDIO_AESIV);
-
-        let mut buf = [0; 0xa7];
-        let mut buf2 = [0; 0xa7];
-        reader.read_all(&mut buf).unwrap();
-        cipher.process(&buf, &mut buf2);
-
         AudioDecrypt {
             cipher: cipher,
             key: key,
@@ -45,9 +41,23 @@ impl <T : io::Read> io::Read for AudioDecrypt<T> {
     }
 }
 
-impl <T : io::Read> io::Seek for AudioDecrypt<T> {
-    fn seek(&mut self, _pos: io::SeekFrom) -> io::Result<u64> {
-        Err(io::Error::new(io::ErrorKind::Other, "Cannot seek"))
+impl <T : io::Read + io::Seek> io::Seek for AudioDecrypt<T> {
+    fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
+        let newpos = try!(self.reader.seek(pos));
+        let skip = newpos % 16;
+
+        let iv = Mpz::from_bytes_be(AUDIO_AESIV)
+                    .add(Mpz::from_u64(newpos / 16).unwrap())
+                    .to_bytes_be();
+        self.cipher = aes::ctr(aes::KeySize::KeySize128,
+                               &self.key,
+                               &iv);
+
+        let buf = vec![0u8; skip as usize];
+        let mut buf2 = vec![0u8; skip as usize];
+        self.cipher.process(&buf, &mut buf2);
+
+        Ok(newpos as u64)
     }
 }
 

+ 93 - 56
src/audio_file.rs

@@ -1,23 +1,24 @@
 use byteorder::{ByteOrder, BigEndian};
 use std::cmp::min;
 use std::collections::BitSet;
-use std::io;
+use std::io::{self, SeekFrom};
 use std::slice::bytes::copy_memory;
-use std::sync::{mpsc, Arc, Condvar, Mutex};
+use std::sync::{Arc, Condvar, Mutex};
+use std::sync::mpsc::{self, TryRecvError};
 
 use stream::{StreamRequest, StreamEvent};
 use util::FileId;
+use std::thread;
 
-const CHUNK_SIZE: usize = 0x40000;
-#[derive(Clone)]
-pub struct AudioFileRef(Arc<AudioFile>);
+const CHUNK_SIZE : usize = 0x40000;
 
-struct AudioFile {
+#[derive(Clone)]
+pub struct AudioFile {
     file: FileId,
     size: usize,
+    seek: mpsc::Sender<u64>,
 
-    data: Mutex<AudioFileData>,
-    cond: Condvar
+    data: Arc<(Mutex<AudioFileData>, Condvar)>,
 }
 
 struct AudioFileData {
@@ -25,8 +26,8 @@ struct AudioFileData {
     bitmap: BitSet,
 }
 
-impl AudioFileRef {
-    pub fn new(file: FileId, streams: mpsc::Sender<StreamRequest>) -> AudioFileRef {
+impl AudioFile {
+    pub fn new(file: FileId, streams: mpsc::Sender<StreamRequest>) -> AudioFile {
         let (tx, rx) = mpsc::channel();
 
         streams.send(StreamRequest {
@@ -51,70 +52,99 @@ impl AudioFileRef {
             }
             size.unwrap() as usize
         };
+
+        let bufsize = size + (CHUNK_SIZE - size % CHUNK_SIZE); 
+        let (tx, rx) = mpsc::channel();
         
-        AudioFileRef(Arc::new(AudioFile {
+        let ret = AudioFile {
             file: file,
             size: size,
+            seek: tx,
 
-            data: Mutex::new(AudioFileData {
-                buffer: vec![0u8; size + (CHUNK_SIZE - size % CHUNK_SIZE)],
-                bitmap: BitSet::with_capacity(size / CHUNK_SIZE)
-            }),
-            cond: Condvar::new(),
-        }))
-    }
-    
-    pub fn fetch(&self, streams: mpsc::Sender<StreamRequest>) {
-        let &AudioFileRef(ref inner) = self;
+            data: Arc::new((Mutex::new(AudioFileData {
+                buffer: vec![0u8; bufsize],
+                bitmap: BitSet::with_capacity(bufsize / CHUNK_SIZE as usize)
+            }), Condvar::new())),
+        };
 
-        let mut index : usize = 0;
+        let f = ret.clone();
 
-        while index * CHUNK_SIZE < inner.size {
-            let (tx, rx) = mpsc::channel();
+        thread::spawn( move || { f.fetch(streams, rx); });
 
-            streams.send(StreamRequest {
-                id: inner.file,
-                offset: (index * CHUNK_SIZE / 4) as u32,
-                size: (CHUNK_SIZE / 4) as u32,
-                callback: tx
-            }).unwrap();
+        ret
+    }
+    
+    fn fetch_chunk(&self, streams: &mpsc::Sender<StreamRequest>, index: usize) {
+        let (tx, rx) = mpsc::channel();
+        streams.send(StreamRequest {
+            id: self.file,
+            offset: (index * CHUNK_SIZE / 4) as u32,
+            size: (CHUNK_SIZE / 4) as u32,
+            callback: tx
+        }).unwrap();
 
-            let mut offset = 0;
-            for event in rx.iter() {
-                match event {
-                    StreamEvent::Header(_,_) => (),
-                    StreamEvent::Data(data) => {
-                        let mut handle = inner.data.lock().unwrap();
-                        copy_memory(&data, &mut handle.buffer[index * CHUNK_SIZE + offset..]);
-                        offset += data.len();
-
-                        if offset >= CHUNK_SIZE {
-                            break
-                        }
+        let mut offset = 0usize;
+        for event in rx.iter() {
+            match event {
+                StreamEvent::Header(_,_) => (),
+                StreamEvent::Data(data) => {
+                    let mut handle = self.data.0.lock().unwrap();
+                    copy_memory(&data, &mut handle.buffer[index * CHUNK_SIZE + offset ..]);
+                    offset += data.len();
+
+                    if offset >= CHUNK_SIZE {
+                        break
                     }
                 }
             }
-            
+        }
+        
+        {
+            let mut handle = self.data.0.lock().unwrap();
+            handle.bitmap.insert(index as usize);
+            self.data.1.notify_all();
+        }
+    }
+
+    fn fetch(&self, streams: mpsc::Sender<StreamRequest>, seek: mpsc::Receiver<u64>) {
+        let mut index = 0;
+        loop {
+            index = if index * CHUNK_SIZE < self.size {
+                match seek.try_recv() {
+                    Ok(position) => position as usize / CHUNK_SIZE,
+                    Err(TryRecvError::Empty) => index,
+                    Err(TryRecvError::Disconnected) => break
+                }
+            } else {
+                match seek.recv() {
+                    Ok(position) => position as usize / CHUNK_SIZE,
+                    Err(_) => break
+                }
+            };
+
             {
-                let mut handle = inner.data.lock().unwrap();
-                handle.bitmap.insert(index);
-                inner.cond.notify_all();
+                let handle = self.data.0.lock().unwrap();
+                while handle.bitmap.contains(&index) && index * CHUNK_SIZE < self.size {
+                    index += 1;
+                }
             }
 
-            index += 1;
+            if index * CHUNK_SIZE < self.size {
+                self.fetch_chunk(&streams, index) 
+            }
         }
     }
 }
 
 pub struct AudioFileReader {
-    file: AudioFileRef,
-    position: usize
+    file: AudioFile,
+    position: usize,
 }
 
 impl AudioFileReader {
-    pub fn new(file: &AudioFileRef) -> AudioFileReader {
+    pub fn new(file: AudioFile) -> AudioFileReader {
         AudioFileReader {
-            file: file.clone(),
+            file: file,
             position: 0
         }
     }
@@ -126,11 +156,10 @@ impl io::Read for AudioFileReader {
         let offset = self.position % CHUNK_SIZE;
         let len = min(output.len(), CHUNK_SIZE-offset);
 
-        let &AudioFileRef(ref inner) = &self.file;
-        let mut handle = inner.data.lock().unwrap();
+        let mut handle = self.file.data.0.lock().unwrap();
 
         while !handle.bitmap.contains(&index) {
-            handle = inner.cond.wait(handle).unwrap();
+            handle = self.file.data.1.wait(handle).unwrap();
         }
 
         copy_memory(&handle.buffer[self.position..self.position+len], output);
@@ -141,8 +170,16 @@ impl io::Read for AudioFileReader {
 }
 
 impl io::Seek for AudioFileReader {
-    fn seek(&mut self, _pos: io::SeekFrom) -> io::Result<u64> {
-        Err(io::Error::new(io::ErrorKind::Other, "Cannot seek"))
+    fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
+        let newpos = match pos {
+            SeekFrom::Start(offset) => offset as i64,
+            SeekFrom::End(offset) => self.file.size as i64 + offset,
+            SeekFrom::Current(offset) => self.position as i64 + offset,
+        };
+
+        self.position = min(newpos as usize, self.file.size);
+        self.file.seek.send(self.position as u64).unwrap();
+        Ok(self.position as u64)
     }
 }
 

+ 1 - 2
src/connection.rs

@@ -9,7 +9,6 @@ use std::result;
 use std::sync::mpsc;
 
 use keys::SharedKeys;
-use util;
 
 #[derive(Debug)]
 pub enum Error {
@@ -69,7 +68,7 @@ impl PlainConnection {
 
     pub fn recv_packet(&mut self) -> Result<Vec<u8>> {
         let size = try!(self.stream.read_u32::<BigEndian>()) as usize;
-        let mut buffer = util::alloc_buffer(size);
+        let mut buffer = vec![0u8; size];
 
         BigEndian::write_u32(&mut buffer, size as u32);
         try!(self.stream.read_all(&mut buffer[4..]));

+ 9 - 12
src/player.rs

@@ -1,13 +1,13 @@
 use portaudio;
 use std::sync::mpsc;
-use std::thread;
 use vorbis;
 
 use audio_key::{AudioKeyRequest, AudioKeyResponse};
 use metadata::TrackRef;
 use session::Session;
-use audio_file::{AudioFileRef, AudioFileReader};
+use audio_file::{AudioFile, AudioFileReader};
 use audio_decrypt::AudioDecrypt;
+use util::Subfile;
 
 pub struct Player;
 
@@ -28,14 +28,13 @@ impl Player {
             key
         };
 
-        let reader = {
-            let file = AudioFileRef::new(file_id, session.stream.clone());
-            let f = file.clone();
-            let s = session.stream.clone();
-            thread::spawn( move || { f.fetch(s) });
-            AudioDecrypt::new(key, AudioFileReader::new(&file))
-        };
-
+        let mut decoder = 
+            vorbis::Decoder::new(
+                Subfile::new(
+                        AudioDecrypt::new(key,
+                            AudioFileReader::new(
+                                AudioFile::new(file_id, session.stream.clone()))), 0xa7)).unwrap();
+        //decoder.time_seek(60f64).unwrap();
 
         portaudio::initialize().unwrap();
 
@@ -48,8 +47,6 @@ impl Player {
                 ).unwrap();
         stream.start().unwrap();
 
-        let mut decoder = vorbis::Decoder::new(reader).unwrap();
-
         for pkt in decoder.packets() {
             match pkt {
                 Ok(packet) => {

+ 39 - 32
src/stream.rs

@@ -92,44 +92,51 @@ impl StreamManager {
         let mut packet = Cursor::new(&data as &[u8]);
 
         let id : ChannelId = packet.read_u16::<BigEndian>().unwrap();
-        let channel = match self.channels.get_mut(&id) {
-            Some(ch) => ch,
-            None => { return; }
-        };
-
-        match channel.mode {
-            ChannelMode::Header => {
-                let mut length = 0;
-
-                while packet.position() < data.len() as u64 {
-                    length = packet.read_u16::<BigEndian>().unwrap();
-                    if length > 0 {
-                        let header_id = packet.read_u8().unwrap();
-                        channel.callback.send(StreamEvent::Header(
-                                header_id,
-                                data.clone()
-                                    .offset(packet.position() as usize)
-                                    .limit(length as usize - 1)
-                            )).unwrap();
-
-                        packet.seek(SeekFrom::Current(length as i64 - 1)).unwrap();
+        let mut close = false;
+        {
+            let channel = match self.channels.get_mut(&id) {
+                Some(ch) => ch,
+                None => { return; }
+            };
+
+            match channel.mode {
+                ChannelMode::Header => {
+                    let mut length = 0;
+
+                    while packet.position() < data.len() as u64 {
+                        length = packet.read_u16::<BigEndian>().unwrap();
+                        if length > 0 {
+                            let header_id = packet.read_u8().unwrap();
+                            channel.callback.send(StreamEvent::Header(
+                                    header_id,
+                                    data.clone()
+                                        .offset(packet.position() as usize)
+                                        .limit(length as usize - 1)
+                                )).unwrap();
+
+                            packet.seek(SeekFrom::Current(length as i64 - 1)).unwrap();
+                        }
+                    }
+                    
+                    if length == 0 {
+                        channel.mode = ChannelMode::Data;
                     }
                 }
-                
-                if length == 0 {
-                    channel.mode = ChannelMode::Data;
-                }
-            }
 
-            ChannelMode::Data => {
-                if packet.position() < data.len() as u64 {
-                    channel.callback.send(StreamEvent::Data(
-                            data.clone().offset(packet.position() as usize))).unwrap();
-                } else {
-                    // TODO: close the channel
+                ChannelMode::Data => {
+                    if packet.position() < data.len() as u64 {
+                        channel.callback.send(StreamEvent::Data(
+                                data.clone().offset(packet.position() as usize))).unwrap();
+                    } else {
+                        close = true;
+                    }
                 }
             }
         }
+
+        if close {
+            self.channels.remove(&id);
+        }
     }
 }
 

+ 2 - 9
src/util/mod.rs

@@ -3,10 +3,12 @@ use rand::{Rng,Rand};
 mod int128;
 mod spotify_id;
 mod arcvec;
+mod subfile;
 
 pub use util::int128::u128;
 pub use util::spotify_id::{SpotifyId, FileId};
 pub use util::arcvec::ArcVec;
+pub use util::subfile::Subfile;
 
 #[macro_export]
 macro_rules! eprintln(
@@ -37,15 +39,6 @@ pub fn rand_vec<G: Rng, R: Rand>(rng: &mut G, size: usize) -> Vec<R> {
     return vec
 }
 
-pub fn alloc_buffer(size: usize) -> Vec<u8> {
-    let mut vec = Vec::with_capacity(size);
-    unsafe {
-        vec.set_len(size);
-    }
-
-    vec
-}
-
 pub mod version {
     include!(concat!(env!("OUT_DIR"), "/version.rs"));
 

+ 40 - 0
src/util/subfile.rs

@@ -0,0 +1,40 @@
+use std::io::{Read, Seek, SeekFrom, Result};
+
+pub struct Subfile<T: Read + Seek> {
+    stream: T,
+    offset: u64
+}
+
+impl <T: Read + Seek> Subfile<T> {
+    pub fn new(mut stream: T, offset: u64) -> Subfile<T> {
+        stream.seek(SeekFrom::Start(offset)).unwrap();
+        Subfile {
+            stream: stream,
+            offset: offset
+        }
+    }
+}
+
+impl <T: Read + Seek> Read for Subfile<T> {
+    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
+        self.stream.read(buf)
+    }
+}
+
+impl <T: Read + Seek> Seek for Subfile<T> {
+    fn seek(&mut self, mut pos: SeekFrom) -> Result<u64> {
+        pos = match pos {
+            SeekFrom::Start(offset) => SeekFrom::Start(offset + self.offset),
+            x => x
+        };
+
+        let newpos = try!(self.stream.seek(pos));
+
+        if newpos > self.offset {
+            return Ok(newpos - self.offset)
+        } else {
+            return Ok(0)
+        }
+    }
+}
+