fetch.rs 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897
  1. use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
  2. use bytes::Bytes;
  3. use futures::sync::{mpsc, oneshot};
  4. use futures::Stream;
  5. use futures::{Async, Future, Poll};
  6. use std::cmp::min;
  7. use std::fs;
  8. use std::io::{self, Read, Seek, SeekFrom, Write};
  9. use std::sync::{Arc, Condvar, Mutex};
  10. use std::time::{Duration, Instant};
  11. use tempfile::NamedTempFile;
  12. use range_set::{Range, RangeSet};
  13. use librespot_core::channel::{Channel, ChannelData, ChannelError, ChannelHeaders};
  14. use librespot_core::session::Session;
  15. use librespot_core::spotify_id::FileId;
  16. use futures::sync::mpsc::unbounded;
  17. use std::sync::atomic;
  18. use std::sync::atomic::AtomicUsize;
// Smallest amount of data (in bytes) requested from the server in one go.
// Shorter requests are enlarged to this size; it is also the length of the
// initial request issued when a file is opened.
const MINIMUM_CHUNK_SIZE: usize = 1024 * 16;
// Upper bound (in bytes) for a single request issued while pre-fetching.
const MAXIMUM_CHUNK_SIZE: usize = 1024 * 128;
// Response times measured above this many seconds are reported as this value.
const MAXIMUM_ASSUMED_PING_TIME_SECONDS: u64 = 5;
/// A readable, seekable audio file.
pub enum AudioFile {
    /// The file was found in the session cache and is read straight from disk.
    Cached(fs::File),
    /// The file is being downloaded; reads block until the data is available.
    Streaming(AudioFileStreaming),
}
/// Future returned by `AudioFile::open`; resolves to an `AudioFile`.
pub enum AudioFileOpen {
    /// Cache hit. The file is taken out of the `Option` on the first poll.
    Cached(Option<fs::File>),
    /// Download in progress; resolves once the file size header has arrived.
    Streaming(AudioFileOpenStreaming),
}
/// State of a file that is being opened for streaming: the initial request
/// has been sent and we are waiting for the header that carries the file size.
pub struct AudioFileOpenStreaming {
    // Session used to spawn the background fetch task.
    session: Session,
    // Data half of the initial request's channel; handed to the fetcher in `finish`.
    initial_data_rx: Option<ChannelData>,
    // Number of bytes asked for in the initial request; taken in `finish`.
    initial_data_length: Option<usize>,
    // When the initial request was sent (used for the response time measurement).
    initial_request_sent_time: Instant,
    // Header half of the initial request's channel.
    headers: ChannelHeaders,
    // Identifier of the file being downloaded.
    file_id: FileId,
    // Sender through which the fully downloaded temp file is handed over; taken in `finish`.
    complete_tx: Option<oneshot::Sender<NamedTempFile>>,
}
// Commands sent from a `StreamLoaderController` (or from a blocking read) to
// the background `AudioFileFetch` task.
enum StreamLoaderCommand{
    Fetch(Range), // signal the stream loader to fetch a range of the file
    RandomAccessMode(), // optimise download strategy for random access
    StreamMode(), // optimise download strategy for streaming
    StreamDataRate(usize), // when optimising for streaming, assume a streaming rate of this many bytes per second.
    Close(), // terminate and don't load any more data
}
/// Handle for steering the download of an `AudioFile`.
///
/// For cached files both `channel_tx` and `stream_shared` are `None`, and the
/// commands are silently ignored.
#[derive(Clone)]
pub struct StreamLoaderController {
    // Command channel to the background fetch task (`None` for cached files).
    channel_tx: Option<mpsc::UnboundedSender<StreamLoaderCommand>>,
    // State shared with the fetch task (`None` for cached files).
    stream_shared: Option<Arc<AudioFileShared>>,
    // Total size of the file in bytes.
    file_size: usize,
}
  52. impl StreamLoaderController {
  53. pub fn len(&self) -> usize {
  54. return self.file_size;
  55. }
  56. pub fn range_available(&self, range: Range) -> bool {
  57. if let Some(ref shared) = self.stream_shared {
  58. let download_status = shared.download_status.lock().unwrap();
  59. if range.length <= download_status.downloaded.contained_length_from_value(range.start) {
  60. return true;
  61. } else {
  62. return false;
  63. }
  64. } else {
  65. if range.length <= self.len() - range.start {
  66. return true;
  67. } else {
  68. return false;
  69. }
  70. }
  71. }
  72. pub fn ping_time_ms(&self) -> usize {
  73. if let Some(ref shared) = self.stream_shared {
  74. return shared.ping_time_ms.load(atomic::Ordering::Relaxed);
  75. } else {
  76. return 0;
  77. }
  78. }
  79. fn send_stream_loader_command(&mut self, command: StreamLoaderCommand) {
  80. if let Some(ref mut channel) = self.channel_tx {
  81. // ignore the error in case the channel has been closed already.
  82. let _ = channel.unbounded_send(command);
  83. }
  84. }
  85. pub fn fetch(&mut self, range: Range) {
  86. // signal the stream loader to fetch a range of the file
  87. self.send_stream_loader_command(StreamLoaderCommand::Fetch(range));
  88. }
  89. pub fn fetch_blocking(&mut self, mut range: Range) {
  90. // signal the stream loader to tech a range of the file and block until it is loaded.
  91. // ensure the range is within the file's bounds.
  92. if range.start >= self.len() {
  93. range.length = 0;
  94. } else if range.end() > self.len() {
  95. range.length = self.len() - range.start;
  96. }
  97. self.fetch(range);
  98. if let Some(ref shared) = self.stream_shared {
  99. let mut download_status = shared.download_status.lock().unwrap();
  100. while range.length > download_status.downloaded.contained_length_from_value(range.start) {
  101. download_status = shared.cond.wait_timeout(download_status, Duration::from_millis(1000)).unwrap().0;
  102. if range.length > (download_status.downloaded.union(&download_status.requested).contained_length_from_value(range.start)) {
  103. // For some reason, the requested range is neither downloaded nor requested.
  104. // This could be due to a network error. Request it again.
  105. // We can't use self.fetch here because self can't borrowed mutably, so we access the channel directly.
  106. if let Some(ref mut channel) = self.channel_tx {
  107. // ignore the error in case the channel has been closed already.
  108. let _ = channel.unbounded_send(StreamLoaderCommand::Fetch(range));
  109. }
  110. }
  111. }
  112. }
  113. }
  114. pub fn fetch_next(&mut self, length: usize) {
  115. let range:Range = if let Some(ref shared) = self.stream_shared {
  116. Range {
  117. start: shared.read_position.load(atomic::Ordering::Relaxed),
  118. length: length,
  119. }
  120. } else {
  121. return;
  122. };
  123. self.fetch(range);
  124. }
  125. pub fn fetch_next_blocking(&mut self, length: usize) {
  126. let range:Range = if let Some(ref shared) = self.stream_shared {
  127. Range {
  128. start: shared.read_position.load(atomic::Ordering::Relaxed),
  129. length: length,
  130. }
  131. } else {
  132. return;
  133. };
  134. self.fetch_blocking(range);
  135. }
  136. pub fn set_random_access_mode(&mut self) {
  137. // optimise download strategy for random access
  138. self.send_stream_loader_command(StreamLoaderCommand::RandomAccessMode());
  139. }
  140. pub fn set_stream_mode(&mut self) {
  141. // optimise download strategy for streaming
  142. self.send_stream_loader_command(StreamLoaderCommand::StreamMode());
  143. }
  144. pub fn set_stream_data_rate(&mut self, bytes_per_second: usize) {
  145. // when optimising for streaming, assume a streaming rate of this many bytes per second.
  146. self.send_stream_loader_command(StreamLoaderCommand::StreamDataRate(bytes_per_second));
  147. }
  148. pub fn close(&mut self) {
  149. // terminate stream loading and don't load any more data for this file.
  150. self.send_stream_loader_command(StreamLoaderCommand::Close());
  151. }
  152. }
/// Reader side of a file that is downloaded in the background.
pub struct AudioFileStreaming {
    // Read handle onto the temp file the fetch task writes into.
    read_file: fs::File,
    // Current read position in bytes.
    position: u64,
    // Used by `read` to request ranges that are neither downloaded nor requested yet.
    stream_loader_command_tx: mpsc::UnboundedSender<StreamLoaderCommand>,
    // State shared with the fetch task and any controllers.
    shared: Arc<AudioFileShared>,
}
// Download bookkeeping, protected by the mutex in `AudioFileShared`.
struct AudioFileDownloadStatus {
    // Ranges that have been requested from the server.
    requested: RangeSet,
    // Ranges that have been received and written to the temp file.
    downloaded: RangeSet,
}
// State shared between the reader, the controllers and the fetch tasks.
struct AudioFileShared {
    // Identifier of the file being downloaded.
    file_id: FileId,
    // Total size of the file in bytes.
    file_size: usize,
    // Notified whenever `download_status` changes (data written or a request dropped).
    cond: Condvar,
    // Which ranges have been requested and which have been downloaded.
    download_status: Mutex<AudioFileDownloadStatus>,
    // Latest response time estimate in milliseconds (0 until the first measurement).
    ping_time_ms: AtomicUsize,
    // Reader position in bytes, updated by `read` and `seek`.
    read_position: AtomicUsize,
}
  171. impl AudioFileOpenStreaming {
  172. fn finish(&mut self, size: usize) -> AudioFileStreaming {
  173. let shared = Arc::new(AudioFileShared {
  174. file_id: self.file_id,
  175. file_size: size,
  176. cond: Condvar::new(),
  177. download_status: Mutex::new(AudioFileDownloadStatus {requested: RangeSet::new(), downloaded: RangeSet::new()}),
  178. ping_time_ms: AtomicUsize::new(0),
  179. read_position: AtomicUsize::new(0),
  180. });
  181. let mut write_file = NamedTempFile::new().unwrap();
  182. write_file.as_file().set_len(size as u64).unwrap();
  183. write_file.seek(SeekFrom::Start(0)).unwrap();
  184. let read_file = write_file.reopen().unwrap();
  185. let initial_data_rx = self.initial_data_rx.take().unwrap();
  186. let initial_data_length = self.initial_data_length.take().unwrap();
  187. let complete_tx = self.complete_tx.take().unwrap();
  188. //let (seek_tx, seek_rx) = mpsc::unbounded();
  189. let (stream_loader_command_tx, stream_loader_command_rx) = mpsc::unbounded::<StreamLoaderCommand>();
  190. let fetcher = AudioFileFetch::new(
  191. self.session.clone(),
  192. shared.clone(),
  193. initial_data_rx,
  194. self.initial_request_sent_time,
  195. initial_data_length,
  196. write_file,
  197. stream_loader_command_rx,
  198. complete_tx,
  199. );
  200. self.session.spawn(move |_| fetcher);
  201. AudioFileStreaming {
  202. read_file: read_file,
  203. position: 0,
  204. //seek: seek_tx,
  205. stream_loader_command_tx: stream_loader_command_tx,
  206. shared: shared,
  207. }
  208. }
  209. }
  210. impl Future for AudioFileOpen {
  211. type Item = AudioFile;
  212. type Error = ChannelError;
  213. fn poll(&mut self) -> Poll<AudioFile, ChannelError> {
  214. match *self {
  215. AudioFileOpen::Streaming(ref mut open) => {
  216. let file = try_ready!(open.poll());
  217. Ok(Async::Ready(AudioFile::Streaming(file)))
  218. }
  219. AudioFileOpen::Cached(ref mut file) => {
  220. let file = file.take().unwrap();
  221. Ok(Async::Ready(AudioFile::Cached(file)))
  222. }
  223. }
  224. }
  225. }
  226. impl Future for AudioFileOpenStreaming {
  227. type Item = AudioFileStreaming;
  228. type Error = ChannelError;
  229. fn poll(&mut self) -> Poll<AudioFileStreaming, ChannelError> {
  230. loop {
  231. let (id, data) = try_ready!(self.headers.poll()).unwrap();
  232. if id == 0x3 {
  233. let size = BigEndian::read_u32(&data) as usize * 4;
  234. let file = self.finish(size);
  235. return Ok(Async::Ready(file));
  236. }
  237. }
  238. }
  239. }
  240. impl AudioFile {
  241. pub fn open(session: &Session, file_id: FileId) -> AudioFileOpen {
  242. let cache = session.cache().cloned();
  243. if let Some(file) = cache.as_ref().and_then(|cache| cache.file(file_id)) {
  244. debug!("File {} already in cache", file_id);
  245. return AudioFileOpen::Cached(Some(file));
  246. }
  247. debug!("Downloading file {}", file_id);
  248. let (complete_tx, complete_rx) = oneshot::channel();
  249. let initial_data_length = MINIMUM_CHUNK_SIZE;
  250. let (headers, data) = request_range(session, file_id, 0, initial_data_length).split();
  251. let open = AudioFileOpenStreaming {
  252. session: session.clone(),
  253. file_id: file_id,
  254. headers: headers,
  255. initial_data_rx: Some(data),
  256. initial_data_length: Some(initial_data_length),
  257. initial_request_sent_time: Instant::now(),
  258. complete_tx: Some(complete_tx),
  259. };
  260. let session_ = session.clone();
  261. session.spawn(move |_| {
  262. complete_rx
  263. .map(move |mut file| {
  264. if let Some(cache) = session_.cache() {
  265. cache.save_file(file_id, &mut file);
  266. debug!("File {} complete, saving to cache", file_id);
  267. } else {
  268. debug!("File {} complete", file_id);
  269. }
  270. })
  271. .or_else(|oneshot::Canceled| Ok(()))
  272. });
  273. AudioFileOpen::Streaming(open)
  274. }
  275. pub fn get_stream_loader_controller(&self) -> StreamLoaderController {
  276. match self {
  277. AudioFile::Streaming(stream) => {
  278. return StreamLoaderController {
  279. channel_tx: Some(stream.stream_loader_command_tx.clone()),
  280. stream_shared: Some(stream.shared.clone()),
  281. file_size: stream.shared.file_size,
  282. }
  283. }
  284. AudioFile::Cached(ref file) => {
  285. return StreamLoaderController {
  286. channel_tx: None,
  287. stream_shared: None,
  288. file_size: file.metadata().unwrap().len() as usize,
  289. }
  290. }
  291. }
  292. }
  293. }
  294. fn request_range(session: &Session, file: FileId, offset: usize, length: usize) -> Channel {
  295. trace!("requesting range starting at {} of length {}", offset, length);
  296. let start = offset / 4;
  297. let mut end = (offset+length) / 4;
  298. if (offset+length) % 4 != 0 {
  299. end += 1;
  300. }
  301. let (id, channel) = session.channel().allocate();
  302. let mut data: Vec<u8> = Vec::new();
  303. data.write_u16::<BigEndian>(id).unwrap();
  304. data.write_u8(0).unwrap();
  305. data.write_u8(1).unwrap();
  306. data.write_u16::<BigEndian>(0x0000).unwrap();
  307. data.write_u32::<BigEndian>(0x00000000).unwrap();
  308. data.write_u32::<BigEndian>(0x00009C40).unwrap();
  309. data.write_u32::<BigEndian>(0x00020000).unwrap();
  310. data.write(&file.0).unwrap();
  311. data.write_u32::<BigEndian>(start as u32).unwrap();
  312. data.write_u32::<BigEndian>(end as u32).unwrap();
  313. session.send_packet(0x8, data);
  314. channel
  315. }
// A piece of file data received from the server.
struct PartialFileData {
    // Position in the file (in bytes) at which `data` belongs.
    offset: usize,
    // The received bytes.
    data: Bytes,
}
// Messages sent from the data receivers to the `AudioFileFetch` task.
enum ReceivedData {
    // Measured time between sending a request and receiving data, in milliseconds.
    ResponseTimeMs(usize),
    // File data to be written to the temp file.
    Data(PartialFileData),
}
// Future that receives the response to one range request and forwards the
// data to the `AudioFileFetch` task.
struct AudioFileFetchDataReceiver {
    // Shared download state; used to un-request data that never arrived.
    shared: Arc<AudioFileShared>,
    // Channel to the `AudioFileFetch` task.
    file_data_tx: mpsc::UnboundedSender<ReceivedData>,
    // Incoming data for this request.
    data_rx: ChannelData,
    // File offset at which the next received chunk belongs.
    data_offset: usize,
    // Number of requested bytes that have not been received yet.
    request_length: usize,
    // When the request was sent; used to measure the response time.
    request_sent_time: Option<Instant>,
}
  332. impl AudioFileFetchDataReceiver {
  333. fn new(
  334. shared: Arc<AudioFileShared>,
  335. file_data_tx: mpsc::UnboundedSender<ReceivedData>,
  336. data_rx: ChannelData,
  337. data_offset: usize,
  338. request_length: usize,
  339. request_sent_time: Instant,
  340. ) -> AudioFileFetchDataReceiver {
  341. AudioFileFetchDataReceiver {
  342. shared: shared,
  343. data_rx: data_rx,
  344. file_data_tx: file_data_tx,
  345. data_offset: data_offset,
  346. request_length: request_length,
  347. request_sent_time: Some(request_sent_time),
  348. }
  349. }
  350. }
  351. impl AudioFileFetchDataReceiver {
  352. fn finish(&mut self) {
  353. if self.request_length > 0 {
  354. let missing_range = Range::new(self.data_offset, self.request_length);
  355. let mut download_status = self.shared.download_status.lock().unwrap();
  356. download_status.requested.subtract_range(&missing_range);
  357. self.shared.cond.notify_all();
  358. }
  359. }
  360. }
  361. impl Future for AudioFileFetchDataReceiver {
  362. type Item = ();
  363. type Error = ();
  364. fn poll(&mut self) -> Poll<(), ()> {
  365. loop {
  366. trace!("Looping data_receiver for offset {} and length {}", self.data_offset, self.request_length);
  367. match self.data_rx.poll() {
  368. Ok(Async::Ready(Some(data))) => {
  369. if let Some(request_sent_time) = self.request_sent_time {
  370. let duration = Instant::now() - request_sent_time;
  371. let duration_ms: u64;
  372. if duration.as_secs() > MAXIMUM_ASSUMED_PING_TIME_SECONDS {
  373. duration_ms = MAXIMUM_ASSUMED_PING_TIME_SECONDS * 1000;
  374. }else {
  375. duration_ms = duration.as_secs() *1000 + duration.subsec_millis() as u64;
  376. }
  377. let _ = self.file_data_tx.unbounded_send(ReceivedData::ResponseTimeMs(duration_ms as usize));
  378. }
  379. let data_size = data.len();
  380. trace!("data_receiver got {} bytes of data", data_size);
  381. let _ = self.file_data_tx.unbounded_send(ReceivedData::Data(PartialFileData { offset: self.data_offset, data: data, }));
  382. self.data_offset += data_size;
  383. if self.request_length < data_size {
  384. warn!("Received more data from server than requested.");
  385. self.request_length = 0;
  386. } else {
  387. self.request_length -= data_size;
  388. }
  389. if self.request_length == 0 {
  390. trace!("Data receiver completed at position {}", self.data_offset);
  391. return Ok(Async::Ready(()));
  392. }
  393. }
  394. Ok(Async::Ready(None)) => {
  395. if self.request_length > 0 {
  396. warn!("Received less data from server than requested.");
  397. self.finish();
  398. }
  399. return Ok(Async::Ready(()));
  400. }
  401. Ok(Async::NotReady) => {
  402. //trace!("No more data for data_receiver at the moment.");
  403. return Ok(Async::NotReady);
  404. }
  405. Err(ChannelError) => {
  406. warn!("error from channel");
  407. self.finish();
  408. return Ok(Async::Ready(()));
  409. }
  410. }
  411. }
  412. }
  413. }
// How the fetch task decides what to download.
enum DownloadStrategy {
    // Only download what is explicitly requested.
    RandomAccess(),
    // Additionally pre-fetch data ahead of the read position.
    Streaming(),
}
// Background task that issues range requests, writes received data to the
// temp file and keeps the shared download status up to date.
struct AudioFileFetch {
    // Session used to send requests and spawn data receivers.
    session: Session,
    // State shared with the reader and the controllers.
    shared: Arc<AudioFileShared>,
    // Temp file being written; taken when the download is complete.
    output: Option<NamedTempFile>,
    // Sender handed (as a clone) to every data receiver.
    file_data_tx: mpsc::UnboundedSender<ReceivedData>,
    // Data and response time measurements coming from the data receivers.
    file_data_rx: mpsc::UnboundedReceiver<ReceivedData>,
    // Commands coming from the reader and the controllers.
    stream_loader_command_rx: mpsc::UnboundedReceiver<StreamLoaderCommand>,
    // Used to hand over the temp file once it is complete.
    complete_tx: Option<oneshot::Sender<NamedTempFile>>,
    // Current download strategy.
    download_strategy: DownloadStrategy,
    // Data rate assumed when deciding how much to pre-fetch.
    streaming_data_rate: usize,
    // The most recent response time measurements (at most three are kept).
    network_response_times_ms: Vec<usize>,
}
  430. impl AudioFileFetch {
  431. fn new(
  432. session: Session,
  433. shared: Arc<AudioFileShared>,
  434. initial_data_rx: ChannelData,
  435. initial_request_sent_time: Instant,
  436. initial_data_length: usize,
  437. output: NamedTempFile,
  438. stream_loader_command_rx: mpsc::UnboundedReceiver<StreamLoaderCommand>,
  439. complete_tx: oneshot::Sender<NamedTempFile>,
  440. ) -> AudioFileFetch {
  441. let (file_data_tx, file_data_rx) = unbounded::<ReceivedData>();
  442. {
  443. let requested_range = Range::new(0, initial_data_length);
  444. let mut download_status = shared.download_status.lock().unwrap();
  445. download_status.requested.add_range(&requested_range);
  446. }
  447. let initial_data_receiver = AudioFileFetchDataReceiver::new(
  448. shared.clone(),
  449. file_data_tx.clone(),
  450. initial_data_rx,
  451. 0,
  452. initial_data_length,
  453. initial_request_sent_time,
  454. );
  455. session.spawn(move |_| initial_data_receiver);
  456. AudioFileFetch {
  457. session: session,
  458. shared: shared,
  459. output: Some(output),
  460. file_data_tx: file_data_tx,
  461. file_data_rx: file_data_rx,
  462. stream_loader_command_rx: stream_loader_command_rx,
  463. complete_tx: Some(complete_tx),
  464. download_strategy: DownloadStrategy::RandomAccess(), // start with random access mode until someone tells us otherwise
  465. streaming_data_rate: 40, // assume 360 kbit per second unless someone tells us otherwise.
  466. network_response_times_ms: Vec::new(),
  467. }
  468. }
  469. fn download_range(&mut self, mut offset: usize, mut length: usize) {
  470. if length < MINIMUM_CHUNK_SIZE {
  471. length = MINIMUM_CHUNK_SIZE;
  472. }
  473. // ensure the values are within the bounds and align them by 4 for the spotify protocol.
  474. if offset >= self.shared.file_size {
  475. return;
  476. }
  477. if length <= 0 {
  478. return;
  479. }
  480. if offset + length > self.shared.file_size {
  481. length = self.shared.file_size - offset;
  482. }
  483. if offset % 4 != 0 {
  484. length += offset % 4;
  485. offset -= offset % 4;
  486. }
  487. if length % 4 != 0 {
  488. length += 4 - (length % 4);
  489. }
  490. let mut ranges_to_request = RangeSet::new();
  491. ranges_to_request.add_range(&Range::new(offset, length));
  492. let mut download_status = self.shared.download_status.lock().unwrap();
  493. ranges_to_request.subtract_range_set(&download_status.downloaded);
  494. ranges_to_request.subtract_range_set(&download_status.requested);
  495. for range in ranges_to_request.iter() {
  496. let (_headers, data) = request_range(&self.session, self.shared.file_id, range.start, range.length).split();
  497. download_status.requested.add_range(range);
  498. let receiver = AudioFileFetchDataReceiver::new(
  499. self.shared.clone(),
  500. self.file_data_tx.clone(),
  501. data,
  502. range.start,
  503. range.length,
  504. Instant::now(),
  505. );
  506. self.session.spawn(move |_| receiver);
  507. }
  508. }
  509. fn pre_fetch_more_data(&mut self) {
  510. // determine what is still missing
  511. let mut missing_data = RangeSet::new();
  512. missing_data.add_range(&Range::new(0,self.shared.file_size));
  513. {
  514. let download_status = self.shared.download_status.lock().unwrap();
  515. missing_data.subtract_range_set(&download_status.downloaded);
  516. missing_data.subtract_range_set(&download_status.requested);
  517. }
  518. // download data from after the current read position first
  519. let mut tail_end = RangeSet::new();
  520. let read_position = self.shared.read_position.load(atomic::Ordering::Relaxed);
  521. tail_end.add_range(&Range::new(read_position, self.shared.file_size - read_position));
  522. let tail_end = tail_end.intersection(&missing_data);
  523. if ! tail_end.is_empty() {
  524. let range = tail_end.get_range(0);
  525. let offset = range.start;
  526. let length = min(range.length, MAXIMUM_CHUNK_SIZE);
  527. self.download_range(offset, length);
  528. } else if ! missing_data.is_empty() {
  529. // ok, the tail is downloaded, download something fom the beginning.
  530. let range = missing_data.get_range(0);
  531. let offset = range.start;
  532. let length = min(range.length, MAXIMUM_CHUNK_SIZE);
  533. self.download_range(offset, length);
  534. }
  535. }
  536. fn poll_file_data_rx(&mut self) -> Poll<(), ()> {
  537. loop {
  538. match self.file_data_rx.poll() {
  539. Ok(Async::Ready(None)) => {
  540. trace!("File data channel closed.");
  541. return Ok(Async::Ready(()));
  542. }
  543. Ok(Async::Ready(Some(ReceivedData::ResponseTimeMs(response_time_ms)))) => {
  544. trace!("Received ping time information: {} ms.", response_time_ms);
  545. // record the response time
  546. self.network_response_times_ms.push(response_time_ms);
  547. // prone old response times. Keep at most three.
  548. while self.network_response_times_ms.len() > 3 {
  549. self.network_response_times_ms.remove(0);
  550. }
  551. // stats::median is experimental. So we calculate the median of up to three ourselves.
  552. let ping_time_ms: usize = match self.network_response_times_ms.len() {
  553. 1 => self.network_response_times_ms[0] as usize,
  554. 2 => ((self.network_response_times_ms[0] + self.network_response_times_ms[1]) / 2) as usize,
  555. 3 => {
  556. let mut times = self.network_response_times_ms.clone();
  557. times.sort();
  558. times[1]
  559. }
  560. _ => unreachable!(),
  561. };
  562. // store our new estimate for everyone to see
  563. self.shared.ping_time_ms.store(ping_time_ms, atomic::Ordering::Relaxed);
  564. },
  565. Ok(Async::Ready(Some(ReceivedData::Data(data)))) => {
  566. trace!("Writing data to file: offset {}, length {}", data.offset, data.data.len());
  567. self.output
  568. .as_mut()
  569. .unwrap()
  570. .seek(SeekFrom::Start(data.offset as u64))
  571. .unwrap();
  572. self.output.as_mut().unwrap().write_all(data.data.as_ref()).unwrap();
  573. let mut full = false;
  574. {
  575. let mut download_status = self.shared.download_status.lock().unwrap();
  576. let received_range = Range::new(data.offset, data.data.len());
  577. download_status.downloaded.add_range(&received_range);
  578. self.shared.cond.notify_all();
  579. if download_status.downloaded.contained_length_from_value(0) >= self.shared.file_size {
  580. full = true;
  581. }
  582. drop(download_status);
  583. }
  584. if full {
  585. self.finish();
  586. return Ok(Async::Ready(()));
  587. }
  588. }
  589. Ok(Async::NotReady) => {
  590. return Ok(Async::NotReady);
  591. },
  592. Err(()) => unreachable!(),
  593. }
  594. }
  595. }
  596. fn poll_stream_loader_command_rx(&mut self) -> Poll<(), ()> {
  597. loop {
  598. match self.stream_loader_command_rx.poll() {
  599. Ok(Async::Ready(None)) => {}
  600. Ok(Async::Ready(Some(StreamLoaderCommand::Fetch(request)))) => {
  601. self.download_range(request.start, request.length);
  602. }
  603. Ok(Async::Ready(Some(StreamLoaderCommand::RandomAccessMode()))) => {
  604. self.download_strategy = DownloadStrategy::RandomAccess();
  605. }
  606. Ok(Async::Ready(Some(StreamLoaderCommand::StreamMode()))) => {
  607. self.download_strategy = DownloadStrategy::Streaming();
  608. }
  609. Ok(Async::Ready(Some(StreamLoaderCommand::StreamDataRate(rate)))) => {
  610. self.streaming_data_rate = rate;
  611. }
  612. Ok(Async::Ready(Some(StreamLoaderCommand::Close()))) => {
  613. return Ok(Async::Ready(()));
  614. }
  615. Ok(Async::NotReady) => {
  616. return Ok(Async::NotReady)
  617. },
  618. Err(()) => unreachable!(),
  619. }
  620. }
  621. }
  622. fn finish(&mut self) {
  623. trace!("====== FINISHED DOWNLOADING FILE! ======");
  624. let mut output = self.output.take().unwrap();
  625. let complete_tx = self.complete_tx.take().unwrap();
  626. output.seek(SeekFrom::Start(0)).unwrap();
  627. let _ = complete_tx.send(output);
  628. }
  629. }
  630. impl Future for AudioFileFetch {
  631. type Item = ();
  632. type Error = ();
  633. fn poll(&mut self) -> Poll<(), ()> {
  634. trace!("Polling AudioFileFetch");
  635. match self.poll_stream_loader_command_rx() {
  636. Ok(Async::NotReady) => (),
  637. Ok(Async::Ready(_)) => {
  638. return Ok(Async::Ready(()));
  639. }
  640. Err(()) => unreachable!(),
  641. }
  642. match self.poll_file_data_rx() {
  643. Ok(Async::NotReady) => (),
  644. Ok(Async::Ready(_)) => {
  645. return Ok(Async::Ready(()));
  646. }
  647. Err(()) => unreachable!(),
  648. }
  649. if let DownloadStrategy::Streaming() = self.download_strategy {
  650. let bytes_pending: usize = {
  651. let download_status = self.shared.download_status.lock().unwrap();
  652. download_status.requested.minus(&download_status.downloaded).len()
  653. };
  654. let ping_time = self.shared.ping_time_ms.load(atomic::Ordering::Relaxed);
  655. if bytes_pending < 2 * ping_time * self.streaming_data_rate {
  656. self.pre_fetch_more_data();
  657. }
  658. }
  659. return Ok(Async::NotReady)
  660. }
  661. }
  662. impl Read for AudioFileStreaming {
  663. fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
  664. let offset = self.position as usize;
  665. if offset >= self.shared.file_size {
  666. return Ok(0);
  667. }
  668. let length = min(output.len(), self.shared.file_size - offset);
  669. if length == 0 {
  670. return Ok(0);
  671. }
  672. let mut ranges_to_request = RangeSet::new();
  673. ranges_to_request.add_range(&Range::new(offset, length));
  674. let mut download_status = self.shared.download_status.lock().unwrap();
  675. ranges_to_request.subtract_range_set(&download_status.downloaded);
  676. ranges_to_request.subtract_range_set(&download_status.requested);
  677. for range in ranges_to_request.iter() {
  678. debug!("requesting data at position {} (length : {})", range.start, range.length);
  679. self.stream_loader_command_tx.unbounded_send(StreamLoaderCommand::Fetch(range.clone())).unwrap();
  680. }
  681. while !download_status.downloaded.contains(offset) {
  682. download_status = self.shared.cond.wait_timeout(download_status, Duration::from_millis(1000)).unwrap().0;
  683. }
  684. let available_length = download_status.downloaded.contained_length_from_value(offset);
  685. assert!(available_length > 0);
  686. drop(download_status);
  687. self.position = self.read_file.seek(SeekFrom::Start(offset as u64)).unwrap();
  688. let read_len = min(length, available_length);
  689. let read_len = try!(self.read_file.read(&mut output[..read_len]));
  690. self.position += read_len as u64;
  691. self.shared.read_position.store(self.position as usize, atomic::Ordering::Relaxed);
  692. return Ok(read_len);
  693. }
  694. }
  695. impl Seek for AudioFileStreaming {
  696. fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
  697. self.position = try!(self.read_file.seek(pos));
  698. // Do not seek past EOF
  699. self.shared.read_position.store(self.position as usize, atomic::Ordering::Relaxed);
  700. Ok(self.position)
  701. }
  702. }
  703. impl Read for AudioFile {
  704. fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
  705. match *self {
  706. AudioFile::Cached(ref mut file) => file.read(output),
  707. AudioFile::Streaming(ref mut file) => file.read(output),
  708. }
  709. }
  710. }
  711. impl Seek for AudioFile {
  712. fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
  713. match *self {
  714. AudioFile::Cached(ref mut file) => file.seek(pos),
  715. AudioFile::Streaming(ref mut file) => file.seek(pos),
  716. }
  717. }
  718. }