|
@@ -1,7 +1,8 @@
|
|
|
use byteorder::{BigEndian, ByteOrder, ReadBytesExt, WriteBytesExt};
|
|
|
use std::collections::HashMap;
|
|
|
+use std::collections::hash_map::Entry;
|
|
|
use std::io::{Cursor, Seek, SeekFrom, Write};
|
|
|
-use std::sync::mpsc;
|
|
|
+use eventual::{self, Async};
|
|
|
|
|
|
use util::{ArcVec, FileId};
|
|
|
use connection::PacketHandler;
|
|
@@ -13,7 +14,10 @@ pub enum StreamEvent {
|
|
|
Data(ArcVec<u8>),
|
|
|
}
|
|
|
|
|
|
-type ChannelId = u16;
|
|
|
+#[derive(Debug,Hash,PartialEq,Eq,Copy,Clone)]
|
|
|
+pub struct StreamError;
|
|
|
+
|
|
|
+pub type ChannelId = u16;
|
|
|
|
|
|
enum ChannelMode {
|
|
|
Header,
|
|
@@ -22,7 +26,7 @@ enum ChannelMode {
|
|
|
|
|
|
struct Channel {
|
|
|
mode: ChannelMode,
|
|
|
- callback: mpsc::Sender<StreamEvent>,
|
|
|
+ callback: Option<eventual::Sender<StreamEvent, StreamError>>,
|
|
|
}
|
|
|
|
|
|
pub struct StreamManager {
|
|
@@ -38,17 +42,29 @@ impl StreamManager {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ pub fn allocate_stream(&mut self) -> (ChannelId, eventual::Stream<StreamEvent, StreamError>) {
|
|
|
+ let (tx, rx) = eventual::Stream::pair();
|
|
|
+
|
|
|
+ let channel_id = self.next_id;
|
|
|
+ self.next_id += 1;
|
|
|
+
|
|
|
+ self.channels.insert(channel_id,
|
|
|
+ Channel {
|
|
|
+ mode: ChannelMode::Header,
|
|
|
+ callback: Some(tx),
|
|
|
+ });
|
|
|
+
|
|
|
+ (channel_id, rx)
|
|
|
+ }
|
|
|
+
|
|
|
pub fn request(&mut self,
|
|
|
session: &Session,
|
|
|
file: FileId,
|
|
|
offset: u32,
|
|
|
size: u32)
|
|
|
- -> mpsc::Receiver<StreamEvent> {
|
|
|
+ -> eventual::Stream<StreamEvent, StreamError> {
|
|
|
|
|
|
- let (tx, rx) = mpsc::channel();
|
|
|
-
|
|
|
- let channel_id = self.next_id;
|
|
|
- self.next_id += 1;
|
|
|
+ let (channel_id, rx) = self.allocate_stream();
|
|
|
|
|
|
let mut data: Vec<u8> = Vec::new();
|
|
|
data.write_u16::<BigEndian>(channel_id).unwrap();
|
|
@@ -64,76 +80,69 @@ impl StreamManager {
|
|
|
|
|
|
session.send_packet(0x8, &data).unwrap();
|
|
|
|
|
|
- self.channels.insert(channel_id,
|
|
|
- Channel {
|
|
|
- mode: ChannelMode::Header,
|
|
|
- callback: tx,
|
|
|
- });
|
|
|
-
|
|
|
rx
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-impl PacketHandler for StreamManager {
|
|
|
- fn handle(&mut self, _cmd: u8, data: Vec<u8>) {
|
|
|
+impl Channel {
|
|
|
+ fn handle_packet(&mut self, cmd: u8, data: Vec<u8>) {
|
|
|
let data = ArcVec::new(data);
|
|
|
let mut packet = Cursor::new(&data as &[u8]);
|
|
|
+ packet.read_u16::<BigEndian>().unwrap(); // Skip channel id
|
|
|
|
|
|
- let id: ChannelId = packet.read_u16::<BigEndian>().unwrap();
|
|
|
- let mut close = false;
|
|
|
- {
|
|
|
- let channel = match self.channels.get_mut(&id) {
|
|
|
- Some(ch) => ch,
|
|
|
- None => {
|
|
|
- return;
|
|
|
- }
|
|
|
- };
|
|
|
-
|
|
|
- match channel.mode {
|
|
|
+ if cmd == 0xa {
|
|
|
+ self.callback.take().map(|c| c.fail(StreamError));
|
|
|
+ } else {
|
|
|
+ match self.mode {
|
|
|
ChannelMode::Header => {
|
|
|
let mut length = 0;
|
|
|
|
|
|
- while packet.position() < data.len() as u64 && !close {
|
|
|
+ while packet.position() < data.len() as u64 {
|
|
|
length = packet.read_u16::<BigEndian>().unwrap();
|
|
|
if length > 0 {
|
|
|
let header_id = packet.read_u8().unwrap();
|
|
|
- channel.callback
|
|
|
- .send(StreamEvent::Header(
|
|
|
- header_id,
|
|
|
- data.clone()
|
|
|
- .offset(packet.position() as usize)
|
|
|
- .limit(length as usize - 1)
|
|
|
- ))
|
|
|
- .unwrap_or_else(|_| {
|
|
|
- close = true;
|
|
|
- });
|
|
|
+ let header_data = data.clone()
|
|
|
+ .offset(packet.position() as usize)
|
|
|
+ .limit(length as usize - 1);
|
|
|
+
|
|
|
+ let header = StreamEvent::Header(header_id, header_data);
|
|
|
+
|
|
|
+ self.callback = self.callback.take().and_then(|c| c.send(header).await().ok());
|
|
|
|
|
|
packet.seek(SeekFrom::Current(length as i64 - 1)).unwrap();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
if length == 0 {
|
|
|
- channel.mode = ChannelMode::Data;
|
|
|
+ self.mode = ChannelMode::Data;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
ChannelMode::Data => {
|
|
|
if packet.position() < data.len() as u64 {
|
|
|
- channel.callback
|
|
|
- .send(StreamEvent::Data(data.clone()
|
|
|
- .offset(packet.position() as usize)))
|
|
|
- .unwrap_or_else(|_| {
|
|
|
- close = true;
|
|
|
- });
|
|
|
+ let event_data = data.clone().offset(packet.position() as usize);
|
|
|
+ let event = StreamEvent::Data(event_data);
|
|
|
+
|
|
|
+ self.callback = self.callback.take().and_then(|c| c.send(event).await().ok());
|
|
|
} else {
|
|
|
- close = true;
|
|
|
+ self.callback = None;
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+ }
|
|
|
+}
|
|
|
|
|
|
- if close {
|
|
|
- self.channels.remove(&id);
|
|
|
+impl PacketHandler for StreamManager {
|
|
|
+ fn handle(&mut self, cmd: u8, data: Vec<u8>) {
|
|
|
+ let id: ChannelId = BigEndian::read_u16(&data[0..2]);
|
|
|
+
|
|
|
+ if let Entry::Occupied(mut entry) = self.channels.entry(id) {
|
|
|
+ entry.get_mut().handle_packet(cmd, data);
|
|
|
+
|
|
|
+ if entry.get().callback.is_none() {
|
|
|
+ entry.remove();
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|