fetch.rs 39 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083
  1. use crate::range_set::{Range, RangeSet};
  2. use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
  3. use bytes::Bytes;
  4. use futures::sync::{mpsc, oneshot};
  5. use futures::Stream;
  6. use futures::{Async, Future, Poll};
  7. use std::cmp::{max, min};
  8. use std::fs;
  9. use std::io::{self, Read, Seek, SeekFrom, Write};
  10. use std::sync::{Arc, Condvar, Mutex};
  11. use std::time::{Duration, Instant};
  12. use tempfile::NamedTempFile;
  13. use futures::sync::mpsc::unbounded;
  14. use librespot_core::channel::{Channel, ChannelData, ChannelError, ChannelHeaders};
  15. use librespot_core::session::Session;
  16. use librespot_core::spotify_id::FileId;
  17. use std::sync::atomic;
  18. use std::sync::atomic::AtomicUsize;
  /// The minimum size of a block that is requested from the Spotify servers in one request.
  /// This is the block size that is typically requested while doing a seek() on a file.
  /// Note: smaller requests can happen if part of the block is downloaded already.
  const MINIMUM_DOWNLOAD_SIZE: usize = 16 * 1024;

  /// The amount of data that is requested when initially opening a file.
  /// Note: if the file is opened to play from the beginning, the amount of data to
  /// read ahead is requested in addition to this amount. If the file is opened to seek to
  /// another position, then only this amount is requested on the first request.
  const INITIAL_DOWNLOAD_SIZE: usize = 16 * 1024;

  /// The ping time that is used for calculations before a ping time was actually measured.
  const INITIAL_PING_TIME_ESTIMATE_SECONDS: f64 = 0.5;

  /// If the measured ping time to the Spotify server is larger than this value, it is capped
  /// to avoid run-away block sizes and pre-fetching.
  const MAXIMUM_ASSUMED_PING_TIME_SECONDS: f64 = 1.5;

  /// Before playback starts, this many seconds of data must be present.
  /// Note: the calculations are done using the nominal bitrate of the file. The actual amount
  /// of audio data may be larger or smaller.
  pub const READ_AHEAD_BEFORE_PLAYBACK_SECONDS: f64 = 1.0;

  /// Same as READ_AHEAD_BEFORE_PLAYBACK_SECONDS, but the time is taken as a factor of the ping
  /// time to the Spotify server.
  /// Both READ_AHEAD_BEFORE_PLAYBACK_SECONDS and READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS are
  /// obeyed.
  /// Note: the calculations are done using the nominal bitrate of the file. The actual amount
  /// of audio data may be larger or smaller.
  pub const READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS: f64 = 2.0;

  /// While playing back, this many seconds of data ahead of the current read position are
  /// requested.
  /// Note: the calculations are done using the nominal bitrate of the file. The actual amount
  /// of audio data may be larger or smaller.
  pub const READ_AHEAD_DURING_PLAYBACK_SECONDS: f64 = 5.0;

  /// Same as READ_AHEAD_DURING_PLAYBACK_SECONDS, but the time is taken as a factor of the ping
  /// time to the Spotify server.
  /// Note: the calculations are done using the nominal bitrate of the file. The actual amount
  /// of audio data may be larger or smaller.
  pub const READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS: f64 = 10.0;

  /// If the amount of data that is pending (requested but not received) is less than a certain
  /// amount, data is pre-fetched in addition to the read ahead settings above. The threshold for
  /// requesting more data is calculated as
  /// `<pending bytes> < PREFETCH_THRESHOLD_FACTOR * <ping time> * <nominal data rate>`
  const PREFETCH_THRESHOLD_FACTOR: f64 = 4.0;

  /// Similar to PREFETCH_THRESHOLD_FACTOR, but it also takes the current download rate into
  /// account. The formula used is
  /// `<pending bytes> < FAST_PREFETCH_THRESHOLD_FACTOR * <ping time> * <measured download rate>`
  /// This mechanism allows for fast downloading of the remainder of the file. The number should
  /// be larger than 1 so the download rate ramps up until the bandwidth is saturated. The larger
  /// the value, the faster the download rate ramps up. However, this comes at the cost that it
  /// might hurt ping-time if a seek is performed while downloading. Values smaller than 1 cause
  /// the download rate to collapse and effectively only PREFETCH_THRESHOLD_FACTOR is in effect.
  /// Thus, set to zero if bandwidth saturation is not wanted.
  const FAST_PREFETCH_THRESHOLD_FACTOR: f64 = 1.5;

  /// Limit the number of requests that are pending simultaneously before pre-fetching data.
  /// Pending requests share bandwidth. Thus, having too many requests can lead to the one that
  /// is needed next for playback to be delayed leading to a buffer underrun. This limit has the
  /// effect that a new pre-fetch request is only sent if less than MAX_PREFETCH_REQUESTS are
  /// pending.
  const MAX_PREFETCH_REQUESTS: usize = 4;
  /// An opened audio file, backed either by a cache file or by an ongoing download.
  pub enum AudioFile {
      /// The file was found in the session cache and is read directly from there.
      Cached(fs::File),
      /// The file is being downloaded; see `AudioFileStreaming`.
      Streaming(AudioFileStreaming),
  }
  /// Future returned by `AudioFile::open`; it resolves to an `AudioFile`.
  pub enum AudioFileOpen {
      /// Cache hit. The file handle is wrapped in an `Option` so `poll` can move it out.
      Cached(Option<fs::File>),
      /// Cache miss. The inner future resolves once the file size header has been received.
      Streaming(AudioFileOpenStreaming),
  }
  /// State kept while waiting for the headers of the initial range request.
  pub struct AudioFileOpenStreaming {
      // Session used to spawn the background fetcher.
      session: Session,
      // Data half of the channel of the initial request; moved into the fetcher by `finish`.
      initial_data_rx: Option<ChannelData>,
      // Number of bytes asked for in the initial request; moved out by `finish`.
      initial_data_length: Option<usize>,
      // Instant recorded when this value was built by `AudioFile::open`.
      initial_request_sent_time: Instant,
      // Header half of the channel of the initial request; polled for the file size header.
      headers: ChannelHeaders,
      file_id: FileId,
      // Sender handed to the fetcher, which uses it to pass on the completed temporary file.
      complete_tx: Option<oneshot::Sender<NamedTempFile>>,
      // The `bytes_per_second` value that was passed to `AudioFile::open`.
      streaming_data_rate: usize,
  }
  /// Commands sent from a `StreamLoaderController` to the background `AudioFileFetch` task.
  enum StreamLoaderCommand {
      /// Signal the stream loader to fetch a range of the file.
      Fetch(Range),
      /// Optimise download strategy for random access.
      RandomAccessMode(),
      /// Optimise download strategy for streaming.
      StreamMode(),
      /// Terminate and don't load any more data.
      Close(),
  }
  97. #[derive(Clone)]
  98. pub struct StreamLoaderController {
  99. channel_tx: Option<mpsc::UnboundedSender<StreamLoaderCommand>>,
  100. stream_shared: Option<Arc<AudioFileShared>>,
  101. file_size: usize,
  102. }
  103. impl StreamLoaderController {
  104. pub fn len(&self) -> usize {
  105. return self.file_size;
  106. }
  107. pub fn range_available(&self, range: Range) -> bool {
  108. if let Some(ref shared) = self.stream_shared {
  109. let download_status = shared.download_status.lock().unwrap();
  110. if range.length
  111. <= download_status
  112. .downloaded
  113. .contained_length_from_value(range.start)
  114. {
  115. return true;
  116. } else {
  117. return false;
  118. }
  119. } else {
  120. if range.length <= self.len() - range.start {
  121. return true;
  122. } else {
  123. return false;
  124. }
  125. }
  126. }
  127. pub fn ping_time_ms(&self) -> usize {
  128. if let Some(ref shared) = self.stream_shared {
  129. return shared.ping_time_ms.load(atomic::Ordering::Relaxed);
  130. } else {
  131. return 0;
  132. }
  133. }
  134. fn send_stream_loader_command(&mut self, command: StreamLoaderCommand) {
  135. if let Some(ref mut channel) = self.channel_tx {
  136. // ignore the error in case the channel has been closed already.
  137. let _ = channel.unbounded_send(command);
  138. }
  139. }
  140. pub fn fetch(&mut self, range: Range) {
  141. // signal the stream loader to fetch a range of the file
  142. self.send_stream_loader_command(StreamLoaderCommand::Fetch(range));
  143. }
  144. pub fn fetch_blocking(&mut self, mut range: Range) {
  145. // signal the stream loader to tech a range of the file and block until it is loaded.
  146. // ensure the range is within the file's bounds.
  147. if range.start >= self.len() {
  148. range.length = 0;
  149. } else if range.end() > self.len() {
  150. range.length = self.len() - range.start;
  151. }
  152. self.fetch(range);
  153. if let Some(ref shared) = self.stream_shared {
  154. let mut download_status = shared.download_status.lock().unwrap();
  155. while range.length
  156. > download_status
  157. .downloaded
  158. .contained_length_from_value(range.start)
  159. {
  160. download_status = shared
  161. .cond
  162. .wait_timeout(download_status, Duration::from_millis(1000))
  163. .unwrap()
  164. .0;
  165. if range.length
  166. > (download_status
  167. .downloaded
  168. .union(&download_status.requested)
  169. .contained_length_from_value(range.start))
  170. {
  171. // For some reason, the requested range is neither downloaded nor requested.
  172. // This could be due to a network error. Request it again.
  173. // We can't use self.fetch here because self can't be borrowed mutably, so we access the channel directly.
  174. if let Some(ref mut channel) = self.channel_tx {
  175. // ignore the error in case the channel has been closed already.
  176. let _ = channel.unbounded_send(StreamLoaderCommand::Fetch(range));
  177. }
  178. }
  179. }
  180. }
  181. }
  182. pub fn fetch_next(&mut self, length: usize) {
  183. let range: Range = if let Some(ref shared) = self.stream_shared {
  184. Range {
  185. start: shared.read_position.load(atomic::Ordering::Relaxed),
  186. length: length,
  187. }
  188. } else {
  189. return;
  190. };
  191. self.fetch(range);
  192. }
  193. pub fn fetch_next_blocking(&mut self, length: usize) {
  194. let range: Range = if let Some(ref shared) = self.stream_shared {
  195. Range {
  196. start: shared.read_position.load(atomic::Ordering::Relaxed),
  197. length: length,
  198. }
  199. } else {
  200. return;
  201. };
  202. self.fetch_blocking(range);
  203. }
  204. pub fn set_random_access_mode(&mut self) {
  205. // optimise download strategy for random access
  206. self.send_stream_loader_command(StreamLoaderCommand::RandomAccessMode());
  207. }
  208. pub fn set_stream_mode(&mut self) {
  209. // optimise download strategy for streaming
  210. self.send_stream_loader_command(StreamLoaderCommand::StreamMode());
  211. }
  212. pub fn close(&mut self) {
  213. // terminate stream loading and don't load any more data for this file.
  214. self.send_stream_loader_command(StreamLoaderCommand::Close());
  215. }
  216. }
  /// A file that is being downloaded in the background while it is read.
  pub struct AudioFileStreaming {
      // Read handle obtained by reopening the temporary file the fetcher writes into.
      read_file: fs::File,
      // Initialised to 0 by `finish`; presumably the current read offset (its users are
      // outside this part of the file).
      position: u64,
      // Sending side of the command channel to the background fetcher.
      stream_loader_command_tx: mpsc::UnboundedSender<StreamLoaderCommand>,
      // State shared with the fetcher and with `StreamLoaderController` instances.
      shared: Arc<AudioFileShared>,
  }
  /// Bookkeeping of which byte ranges of the file were asked for and which have arrived.
  struct AudioFileDownloadStatus {
      // Ranges for which a request was sent. When a receiver stops before all of its data
      // arrived, the missing remainder is removed from this set again.
      requested: RangeSet,
      // Ranges that have been written to the output file.
      downloaded: RangeSet,
  }
  /// Download mode selected through `StreamLoaderCommand::RandomAccessMode` / `StreamMode`.
  #[derive(Copy, Clone)]
  enum DownloadStrategy {
      /// The mode every download starts in.
      RandomAccess(),
      /// Mode in which the fetcher also considers pre-fetching further data.
      Streaming(),
  }
  /// State shared between the file handle, its controllers and the background download tasks.
  struct AudioFileShared {
      file_id: FileId,
      // Total file size in bytes, as announced by the server's size header.
      file_size: usize,
      // The `bytes_per_second` value given to `AudioFile::open`.
      stream_data_rate: usize,
      // Notified whenever `download_status` changes; `fetch_blocking` waits on it.
      cond: Condvar,
      download_status: Mutex<AudioFileDownloadStatus>,
      download_strategy: Mutex<DownloadStrategy>,
      // Number of data receivers that have been created and have not finished yet.
      number_of_open_requests: AtomicUsize,
      // Latest ping time estimate in milliseconds; 0 until a first measurement arrived.
      ping_time_ms: AtomicUsize,
      // Offset used as start for `fetch_next` and preferred by pre-fetching. It is only
      // read in this part of the file; the code that updates it is not visible here.
      read_position: AtomicUsize,
  }
  243. impl AudioFileOpenStreaming {
  244. fn finish(&mut self, size: usize) -> AudioFileStreaming {
  245. let shared = Arc::new(AudioFileShared {
  246. file_id: self.file_id,
  247. file_size: size,
  248. stream_data_rate: self.streaming_data_rate,
  249. cond: Condvar::new(),
  250. download_status: Mutex::new(AudioFileDownloadStatus {
  251. requested: RangeSet::new(),
  252. downloaded: RangeSet::new(),
  253. }),
  254. download_strategy: Mutex::new(DownloadStrategy::RandomAccess()), // start with random access mode until someone tells us otherwise
  255. number_of_open_requests: AtomicUsize::new(0),
  256. ping_time_ms: AtomicUsize::new(0),
  257. read_position: AtomicUsize::new(0),
  258. });
  259. let mut write_file = NamedTempFile::new().unwrap();
  260. write_file.as_file().set_len(size as u64).unwrap();
  261. write_file.seek(SeekFrom::Start(0)).unwrap();
  262. let read_file = write_file.reopen().unwrap();
  263. let initial_data_rx = self.initial_data_rx.take().unwrap();
  264. let initial_data_length = self.initial_data_length.take().unwrap();
  265. let complete_tx = self.complete_tx.take().unwrap();
  266. //let (seek_tx, seek_rx) = mpsc::unbounded();
  267. let (stream_loader_command_tx, stream_loader_command_rx) =
  268. mpsc::unbounded::<StreamLoaderCommand>();
  269. let fetcher = AudioFileFetch::new(
  270. self.session.clone(),
  271. shared.clone(),
  272. initial_data_rx,
  273. self.initial_request_sent_time,
  274. initial_data_length,
  275. write_file,
  276. stream_loader_command_rx,
  277. complete_tx,
  278. );
  279. self.session.spawn(move |_| fetcher);
  280. AudioFileStreaming {
  281. read_file: read_file,
  282. position: 0,
  283. //seek: seek_tx,
  284. stream_loader_command_tx: stream_loader_command_tx,
  285. shared: shared,
  286. }
  287. }
  288. }
  289. impl Future for AudioFileOpen {
  290. type Item = AudioFile;
  291. type Error = ChannelError;
  292. fn poll(&mut self) -> Poll<AudioFile, ChannelError> {
  293. match *self {
  294. AudioFileOpen::Streaming(ref mut open) => {
  295. let file = try_ready!(open.poll());
  296. Ok(Async::Ready(AudioFile::Streaming(file)))
  297. }
  298. AudioFileOpen::Cached(ref mut file) => {
  299. let file = file.take().unwrap();
  300. Ok(Async::Ready(AudioFile::Cached(file)))
  301. }
  302. }
  303. }
  304. }
  305. impl Future for AudioFileOpenStreaming {
  306. type Item = AudioFileStreaming;
  307. type Error = ChannelError;
  308. fn poll(&mut self) -> Poll<AudioFileStreaming, ChannelError> {
  309. loop {
  310. let (id, data) = try_ready!(self.headers.poll()).unwrap();
  311. if id == 0x3 {
  312. let size = BigEndian::read_u32(&data) as usize * 4;
  313. let file = self.finish(size);
  314. return Ok(Async::Ready(file));
  315. }
  316. }
  317. }
  318. }
  319. impl AudioFile {
  320. pub fn open(
  321. session: &Session,
  322. file_id: FileId,
  323. bytes_per_second: usize,
  324. play_from_beginning: bool,
  325. ) -> AudioFileOpen {
  326. let cache = session.cache().cloned();
  327. if let Some(file) = cache.as_ref().and_then(|cache| cache.file(file_id)) {
  328. debug!("File {} already in cache", file_id);
  329. return AudioFileOpen::Cached(Some(file));
  330. }
  331. debug!("Downloading file {}", file_id);
  332. let (complete_tx, complete_rx) = oneshot::channel();
  333. let mut initial_data_length = if play_from_beginning {
  334. INITIAL_DOWNLOAD_SIZE
  335. + max(
  336. (READ_AHEAD_DURING_PLAYBACK_SECONDS * bytes_per_second as f64) as usize,
  337. (INITIAL_PING_TIME_ESTIMATE_SECONDS
  338. * READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS
  339. * bytes_per_second as f64) as usize,
  340. )
  341. } else {
  342. INITIAL_DOWNLOAD_SIZE
  343. };
  344. if initial_data_length % 4 != 0 {
  345. initial_data_length += 4 - (initial_data_length % 4);
  346. }
  347. let (headers, data) = request_range(session, file_id, 0, initial_data_length).split();
  348. let open = AudioFileOpenStreaming {
  349. session: session.clone(),
  350. file_id: file_id,
  351. headers: headers,
  352. initial_data_rx: Some(data),
  353. initial_data_length: Some(initial_data_length),
  354. initial_request_sent_time: Instant::now(),
  355. complete_tx: Some(complete_tx),
  356. streaming_data_rate: bytes_per_second,
  357. };
  358. let session_ = session.clone();
  359. session.spawn(move |_| {
  360. complete_rx
  361. .map(move |mut file| {
  362. if let Some(cache) = session_.cache() {
  363. cache.save_file(file_id, &mut file);
  364. debug!("File {} complete, saving to cache", file_id);
  365. } else {
  366. debug!("File {} complete", file_id);
  367. }
  368. })
  369. .or_else(|oneshot::Canceled| Ok(()))
  370. });
  371. return AudioFileOpen::Streaming(open);
  372. }
  373. pub fn get_stream_loader_controller(&self) -> StreamLoaderController {
  374. match self {
  375. AudioFile::Streaming(ref stream) => {
  376. return StreamLoaderController {
  377. channel_tx: Some(stream.stream_loader_command_tx.clone()),
  378. stream_shared: Some(stream.shared.clone()),
  379. file_size: stream.shared.file_size,
  380. };
  381. }
  382. AudioFile::Cached(ref file) => {
  383. return StreamLoaderController {
  384. channel_tx: None,
  385. stream_shared: None,
  386. file_size: file.metadata().unwrap().len() as usize,
  387. };
  388. }
  389. }
  390. }
  391. }
  392. fn request_range(session: &Session, file: FileId, offset: usize, length: usize) -> Channel {
  393. assert!(
  394. offset % 4 == 0,
  395. "Range request start positions must be aligned by 4 bytes."
  396. );
  397. assert!(
  398. length % 4 == 0,
  399. "Range request range lengths must be aligned by 4 bytes."
  400. );
  401. let start = offset / 4;
  402. let end = (offset + length) / 4;
  403. let (id, channel) = session.channel().allocate();
  404. let mut data: Vec<u8> = Vec::new();
  405. data.write_u16::<BigEndian>(id).unwrap();
  406. data.write_u8(0).unwrap();
  407. data.write_u8(1).unwrap();
  408. data.write_u16::<BigEndian>(0x0000).unwrap();
  409. data.write_u32::<BigEndian>(0x00000000).unwrap();
  410. data.write_u32::<BigEndian>(0x00009C40).unwrap();
  411. data.write_u32::<BigEndian>(0x00020000).unwrap();
  412. data.write(&file.0).unwrap();
  413. data.write_u32::<BigEndian>(start as u32).unwrap();
  414. data.write_u32::<BigEndian>(end as u32).unwrap();
  415. session.send_packet(0x8, data);
  416. channel
  417. }
  /// A chunk of file content received from the server.
  struct PartialFileData {
      // Position in the file at which `data` is written.
      offset: usize,
      data: Bytes,
  }
  /// Messages sent from the data receivers to the `AudioFileFetch` task.
  enum ReceivedData {
      /// Time in milliseconds between sending a request and receiving its first data.
      ResponseTimeMs(usize),
      /// A chunk of file content.
      Data(PartialFileData),
  }
  /// Future that receives the response of one range request and forwards it to the fetcher.
  struct AudioFileFetchDataReceiver {
      shared: Arc<AudioFileShared>,
      // Channel to the `AudioFileFetch` task that writes the data to disk.
      file_data_tx: mpsc::UnboundedSender<ReceivedData>,
      // Data half of the channel the response arrives on.
      data_rx: ChannelData,
      // Offset and length of the request as originally issued; only used in log messages.
      initial_data_offset: usize,
      initial_request_length: usize,
      // File offset of the next chunk; advanced by the size of every chunk received.
      data_offset: usize,
      // Number of requested bytes that have not been received yet.
      request_length: usize,
      request_sent_time: Option<Instant>,
      // Whether this receiver reports the response time of its first chunk.
      measure_ping_time: bool,
  }
  437. impl AudioFileFetchDataReceiver {
  438. fn new(
  439. shared: Arc<AudioFileShared>,
  440. file_data_tx: mpsc::UnboundedSender<ReceivedData>,
  441. data_rx: ChannelData,
  442. data_offset: usize,
  443. request_length: usize,
  444. request_sent_time: Instant,
  445. ) -> AudioFileFetchDataReceiver {
  446. let measure_ping_time = shared
  447. .number_of_open_requests
  448. .load(atomic::Ordering::SeqCst)
  449. == 0;
  450. shared
  451. .number_of_open_requests
  452. .fetch_add(1, atomic::Ordering::SeqCst);
  453. AudioFileFetchDataReceiver {
  454. shared: shared,
  455. data_rx: data_rx,
  456. file_data_tx: file_data_tx,
  457. initial_data_offset: data_offset,
  458. initial_request_length: request_length,
  459. data_offset: data_offset,
  460. request_length: request_length,
  461. request_sent_time: Some(request_sent_time),
  462. measure_ping_time: measure_ping_time,
  463. }
  464. }
  465. }
  466. impl AudioFileFetchDataReceiver {
  467. fn finish(&mut self) {
  468. if self.request_length > 0 {
  469. let missing_range = Range::new(self.data_offset, self.request_length);
  470. let mut download_status = self.shared.download_status.lock().unwrap();
  471. download_status.requested.subtract_range(&missing_range);
  472. self.shared.cond.notify_all();
  473. }
  474. self.shared
  475. .number_of_open_requests
  476. .fetch_sub(1, atomic::Ordering::SeqCst);
  477. }
  478. }
  479. impl Future for AudioFileFetchDataReceiver {
  480. type Item = ();
  481. type Error = ();
  482. fn poll(&mut self) -> Poll<(), ()> {
  483. loop {
  484. match self.data_rx.poll() {
  485. Ok(Async::Ready(Some(data))) => {
  486. if self.measure_ping_time {
  487. if let Some(request_sent_time) = self.request_sent_time {
  488. let duration = Instant::now() - request_sent_time;
  489. let duration_ms: u64;
  490. if 0.001 * (duration.as_millis() as f64)
  491. > MAXIMUM_ASSUMED_PING_TIME_SECONDS
  492. {
  493. duration_ms = (MAXIMUM_ASSUMED_PING_TIME_SECONDS * 1000.0) as u64;
  494. } else {
  495. duration_ms = duration.as_millis() as u64;
  496. }
  497. let _ = self
  498. .file_data_tx
  499. .unbounded_send(ReceivedData::ResponseTimeMs(duration_ms as usize));
  500. self.measure_ping_time = false;
  501. }
  502. }
  503. let data_size = data.len();
  504. let _ = self
  505. .file_data_tx
  506. .unbounded_send(ReceivedData::Data(PartialFileData {
  507. offset: self.data_offset,
  508. data: data,
  509. }));
  510. self.data_offset += data_size;
  511. if self.request_length < data_size {
  512. warn!("Data receiver for range {} (+{}) received more data from server than requested.", self.initial_data_offset, self.initial_request_length);
  513. self.request_length = 0;
  514. } else {
  515. self.request_length -= data_size;
  516. }
  517. if self.request_length == 0 {
  518. self.finish();
  519. return Ok(Async::Ready(()));
  520. }
  521. }
  522. Ok(Async::Ready(None)) => {
  523. if self.request_length > 0 {
  524. warn!("Data receiver for range {} (+{}) received less data from server than requested.", self.initial_data_offset, self.initial_request_length);
  525. }
  526. self.finish();
  527. return Ok(Async::Ready(()));
  528. }
  529. Ok(Async::NotReady) => {
  530. return Ok(Async::NotReady);
  531. }
  532. Err(ChannelError) => {
  533. warn!(
  534. "Error from channel for data receiver for range {} (+{}).",
  535. self.initial_data_offset, self.initial_request_length
  536. );
  537. self.finish();
  538. return Ok(Async::Ready(()));
  539. }
  540. }
  541. }
  542. }
  543. }
  /// Background task that issues range requests and writes received data to the output file.
  struct AudioFileFetch {
      session: Session,
      shared: Arc<AudioFileShared>,
      // Temporary file being filled; moved out and sent through `complete_tx` by `finish`.
      output: Option<NamedTempFile>,
      // Sender cloned into every data receiver.
      file_data_tx: mpsc::UnboundedSender<ReceivedData>,
      // Receives data chunks and response time measurements from the data receivers.
      file_data_rx: mpsc::UnboundedReceiver<ReceivedData>,
      // Receives commands sent through `StreamLoaderController`.
      stream_loader_command_rx: mpsc::UnboundedReceiver<StreamLoaderCommand>,
      complete_tx: Option<oneshot::Sender<NamedTempFile>>,
      // The most recent response time measurements (at most three are kept).
      network_response_times_ms: Vec<usize>,
  }
  554. impl AudioFileFetch {
  555. fn new(
  556. session: Session,
  557. shared: Arc<AudioFileShared>,
  558. initial_data_rx: ChannelData,
  559. initial_request_sent_time: Instant,
  560. initial_data_length: usize,
  561. output: NamedTempFile,
  562. stream_loader_command_rx: mpsc::UnboundedReceiver<StreamLoaderCommand>,
  563. complete_tx: oneshot::Sender<NamedTempFile>,
  564. ) -> AudioFileFetch {
  565. let (file_data_tx, file_data_rx) = unbounded::<ReceivedData>();
  566. {
  567. let requested_range = Range::new(0, initial_data_length);
  568. let mut download_status = shared.download_status.lock().unwrap();
  569. download_status.requested.add_range(&requested_range);
  570. }
  571. let initial_data_receiver = AudioFileFetchDataReceiver::new(
  572. shared.clone(),
  573. file_data_tx.clone(),
  574. initial_data_rx,
  575. 0,
  576. initial_data_length,
  577. initial_request_sent_time,
  578. );
  579. session.spawn(move |_| initial_data_receiver);
  580. AudioFileFetch {
  581. session: session,
  582. shared: shared,
  583. output: Some(output),
  584. file_data_tx: file_data_tx,
  585. file_data_rx: file_data_rx,
  586. stream_loader_command_rx: stream_loader_command_rx,
  587. complete_tx: Some(complete_tx),
  588. network_response_times_ms: Vec::new(),
  589. }
  590. }
  591. fn get_download_strategy(&mut self) -> DownloadStrategy {
  592. *(self.shared.download_strategy.lock().unwrap())
  593. }
  594. fn download_range(&mut self, mut offset: usize, mut length: usize) {
  595. if length < MINIMUM_DOWNLOAD_SIZE {
  596. length = MINIMUM_DOWNLOAD_SIZE;
  597. }
  598. // ensure the values are within the bounds and align them by 4 for the spotify protocol.
  599. if offset >= self.shared.file_size {
  600. return;
  601. }
  602. if length <= 0 {
  603. return;
  604. }
  605. if offset + length > self.shared.file_size {
  606. length = self.shared.file_size - offset;
  607. }
  608. if offset % 4 != 0 {
  609. length += offset % 4;
  610. offset -= offset % 4;
  611. }
  612. if length % 4 != 0 {
  613. length += 4 - (length % 4);
  614. }
  615. let mut ranges_to_request = RangeSet::new();
  616. ranges_to_request.add_range(&Range::new(offset, length));
  617. let mut download_status = self.shared.download_status.lock().unwrap();
  618. ranges_to_request.subtract_range_set(&download_status.downloaded);
  619. ranges_to_request.subtract_range_set(&download_status.requested);
  620. for range in ranges_to_request.iter() {
  621. let (_headers, data) = request_range(
  622. &self.session,
  623. self.shared.file_id,
  624. range.start,
  625. range.length,
  626. )
  627. .split();
  628. download_status.requested.add_range(range);
  629. let receiver = AudioFileFetchDataReceiver::new(
  630. self.shared.clone(),
  631. self.file_data_tx.clone(),
  632. data,
  633. range.start,
  634. range.length,
  635. Instant::now(),
  636. );
  637. self.session.spawn(move |_| receiver);
  638. }
  639. }
  640. fn pre_fetch_more_data(&mut self, bytes: usize, max_requests_to_send: usize) {
  641. let mut bytes_to_go = bytes;
  642. let mut requests_to_go = max_requests_to_send;
  643. while bytes_to_go > 0 && requests_to_go > 0 {
  644. // determine what is still missing
  645. let mut missing_data = RangeSet::new();
  646. missing_data.add_range(&Range::new(0, self.shared.file_size));
  647. {
  648. let download_status = self.shared.download_status.lock().unwrap();
  649. missing_data.subtract_range_set(&download_status.downloaded);
  650. missing_data.subtract_range_set(&download_status.requested);
  651. }
  652. // download data from after the current read position first
  653. let mut tail_end = RangeSet::new();
  654. let read_position = self.shared.read_position.load(atomic::Ordering::Relaxed);
  655. tail_end.add_range(&Range::new(
  656. read_position,
  657. self.shared.file_size - read_position,
  658. ));
  659. let tail_end = tail_end.intersection(&missing_data);
  660. if !tail_end.is_empty() {
  661. let range = tail_end.get_range(0);
  662. let offset = range.start;
  663. let length = min(range.length, bytes_to_go);
  664. self.download_range(offset, length);
  665. requests_to_go -= 1;
  666. bytes_to_go -= length;
  667. } else if !missing_data.is_empty() {
  668. // ok, the tail is downloaded, download something fom the beginning.
  669. let range = missing_data.get_range(0);
  670. let offset = range.start;
  671. let length = min(range.length, bytes_to_go);
  672. self.download_range(offset, length);
  673. requests_to_go -= 1;
  674. bytes_to_go -= length;
  675. } else {
  676. return;
  677. }
  678. }
  679. }
  680. fn poll_file_data_rx(&mut self) -> Poll<(), ()> {
  681. loop {
  682. match self.file_data_rx.poll() {
  683. Ok(Async::Ready(None)) => {
  684. return Ok(Async::Ready(()));
  685. }
  686. Ok(Async::Ready(Some(ReceivedData::ResponseTimeMs(response_time_ms)))) => {
  687. trace!("Ping time estimated as: {} ms.", response_time_ms);
  688. // record the response time
  689. self.network_response_times_ms.push(response_time_ms);
  690. // prune old response times. Keep at most three.
  691. while self.network_response_times_ms.len() > 3 {
  692. self.network_response_times_ms.remove(0);
  693. }
  694. // stats::median is experimental. So we calculate the median of up to three ourselves.
  695. let ping_time_ms: usize = match self.network_response_times_ms.len() {
  696. 1 => self.network_response_times_ms[0] as usize,
  697. 2 => {
  698. ((self.network_response_times_ms[0]
  699. + self.network_response_times_ms[1])
  700. / 2) as usize
  701. }
  702. 3 => {
  703. let mut times = self.network_response_times_ms.clone();
  704. times.sort();
  705. times[1]
  706. }
  707. _ => unreachable!(),
  708. };
  709. // store our new estimate for everyone to see
  710. self.shared
  711. .ping_time_ms
  712. .store(ping_time_ms, atomic::Ordering::Relaxed);
  713. }
  714. Ok(Async::Ready(Some(ReceivedData::Data(data)))) => {
  715. self.output
  716. .as_mut()
  717. .unwrap()
  718. .seek(SeekFrom::Start(data.offset as u64))
  719. .unwrap();
  720. self.output
  721. .as_mut()
  722. .unwrap()
  723. .write_all(data.data.as_ref())
  724. .unwrap();
  725. let mut full = false;
  726. {
  727. let mut download_status = self.shared.download_status.lock().unwrap();
  728. let received_range = Range::new(data.offset, data.data.len());
  729. download_status.downloaded.add_range(&received_range);
  730. self.shared.cond.notify_all();
  731. if download_status.downloaded.contained_length_from_value(0)
  732. >= self.shared.file_size
  733. {
  734. full = true;
  735. }
  736. drop(download_status);
  737. }
  738. if full {
  739. self.finish();
  740. return Ok(Async::Ready(()));
  741. }
  742. }
  743. Ok(Async::NotReady) => {
  744. return Ok(Async::NotReady);
  745. }
  746. Err(()) => unreachable!(),
  747. }
  748. }
  749. }
  750. fn poll_stream_loader_command_rx(&mut self) -> Poll<(), ()> {
  751. loop {
  752. match self.stream_loader_command_rx.poll() {
  753. Ok(Async::Ready(None)) => {
  754. return Ok(Async::Ready(()));
  755. }
  756. Ok(Async::Ready(Some(StreamLoaderCommand::Fetch(request)))) => {
  757. self.download_range(request.start, request.length);
  758. }
  759. Ok(Async::Ready(Some(StreamLoaderCommand::RandomAccessMode()))) => {
  760. *(self.shared.download_strategy.lock().unwrap()) =
  761. DownloadStrategy::RandomAccess();
  762. }
  763. Ok(Async::Ready(Some(StreamLoaderCommand::StreamMode()))) => {
  764. *(self.shared.download_strategy.lock().unwrap()) =
  765. DownloadStrategy::Streaming();
  766. }
  767. Ok(Async::Ready(Some(StreamLoaderCommand::Close()))) => {
  768. return Ok(Async::Ready(()));
  769. }
  770. Ok(Async::NotReady) => return Ok(Async::NotReady),
  771. Err(()) => unreachable!(),
  772. }
  773. }
  774. }
  775. fn finish(&mut self) {
  776. let mut output = self.output.take().unwrap();
  777. let complete_tx = self.complete_tx.take().unwrap();
  778. output.seek(SeekFrom::Start(0)).unwrap();
  779. let _ = complete_tx.send(output);
  780. }
  781. }
  782. impl Future for AudioFileFetch {
  783. type Item = ();
  784. type Error = ();
  785. fn poll(&mut self) -> Poll<(), ()> {
  786. match self.poll_stream_loader_command_rx() {
  787. Ok(Async::NotReady) => (),
  788. Ok(Async::Ready(_)) => {
  789. return Ok(Async::Ready(()));
  790. }
  791. Err(()) => unreachable!(),
  792. }
  793. match self.poll_file_data_rx() {
  794. Ok(Async::NotReady) => (),
  795. Ok(Async::Ready(_)) => {
  796. return Ok(Async::Ready(()));
  797. }
  798. Err(()) => unreachable!(),
  799. }
  800. if let DownloadStrategy::Streaming() = self.get_download_strategy() {
  801. let number_of_open_requests = self
  802. .shared
  803. .number_of_open_requests
  804. .load(atomic::Ordering::SeqCst);
  805. let max_requests_to_send =
  806. MAX_PREFETCH_REQUESTS - min(MAX_PREFETCH_REQUESTS, number_of_open_requests);
  807. if max_requests_to_send > 0 {
  808. let bytes_pending: usize = {
  809. let download_status = self.shared.download_status.lock().unwrap();
  810. download_status
  811. .requested
  812. .minus(&download_status.downloaded)
  813. .len()
  814. };
  815. let ping_time_seconds =
  816. 0.001 * self.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64;
  817. let download_rate = self.session.channel().get_download_rate_estimate();
  818. let desired_pending_bytes = max(
  819. (PREFETCH_THRESHOLD_FACTOR
  820. * ping_time_seconds
  821. * self.shared.stream_data_rate as f64) as usize,
  822. (FAST_PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * download_rate as f64)
  823. as usize,
  824. );
  825. if bytes_pending < desired_pending_bytes {
  826. self.pre_fetch_more_data(
  827. desired_pending_bytes - bytes_pending,
  828. max_requests_to_send,
  829. );
  830. }
  831. }
  832. }
  833. return Ok(Async::NotReady);
  834. }
  835. }
  836. impl Read for AudioFileStreaming {
  837. fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
  838. let offset = self.position as usize;
  839. if offset >= self.shared.file_size {
  840. return Ok(0);
  841. }
  842. let length = min(output.len(), self.shared.file_size - offset);
  843. let length_to_request = match *(self.shared.download_strategy.lock().unwrap()) {
  844. DownloadStrategy::RandomAccess() => length,
  845. DownloadStrategy::Streaming() => {
  846. // Due to the read-ahead stuff, we potentially request more than the actual reqeust demanded.
  847. let ping_time_seconds =
  848. 0.0001 * self.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64;
  849. let length_to_request = length
  850. + max(
  851. (READ_AHEAD_DURING_PLAYBACK_SECONDS * self.shared.stream_data_rate as f64)
  852. as usize,
  853. (READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS
  854. * ping_time_seconds
  855. * self.shared.stream_data_rate as f64) as usize,
  856. );
  857. min(length_to_request, self.shared.file_size - offset)
  858. }
  859. };
  860. let mut ranges_to_request = RangeSet::new();
  861. ranges_to_request.add_range(&Range::new(offset, length_to_request));
  862. let mut download_status = self.shared.download_status.lock().unwrap();
  863. ranges_to_request.subtract_range_set(&download_status.downloaded);
  864. ranges_to_request.subtract_range_set(&download_status.requested);
  865. for range in ranges_to_request.iter() {
  866. self.stream_loader_command_tx
  867. .unbounded_send(StreamLoaderCommand::Fetch(range.clone()))
  868. .unwrap();
  869. }
  870. if length == 0 {
  871. return Ok(0);
  872. }
  873. let mut download_message_printed = false;
  874. while !download_status.downloaded.contains(offset) {
  875. if let DownloadStrategy::Streaming() = *self.shared.download_strategy.lock().unwrap() {
  876. if !download_message_printed {
  877. debug!("Stream waiting for download of file position {}. Downloaded ranges: {}. Pending ranges: {}", offset, download_status.downloaded, download_status.requested.minus(&download_status.downloaded));
  878. download_message_printed = true;
  879. }
  880. }
  881. download_status = self
  882. .shared
  883. .cond
  884. .wait_timeout(download_status, Duration::from_millis(1000))
  885. .unwrap()
  886. .0;
  887. }
  888. let available_length = download_status
  889. .downloaded
  890. .contained_length_from_value(offset);
  891. assert!(available_length > 0);
  892. drop(download_status);
  893. self.position = self.read_file.seek(SeekFrom::Start(offset as u64)).unwrap();
  894. let read_len = min(length, available_length);
  895. let read_len = self.read_file.read(&mut output[..read_len])?;
  896. if download_message_printed {
  897. debug!(
  898. "Read at postion {} completed. {} bytes returned, {} bytes were requested.",
  899. offset,
  900. read_len,
  901. output.len()
  902. );
  903. }
  904. self.position += read_len as u64;
  905. self.shared
  906. .read_position
  907. .store(self.position as usize, atomic::Ordering::Relaxed);
  908. return Ok(read_len);
  909. }
  910. }
  911. impl Seek for AudioFileStreaming {
  912. fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
  913. self.position = self.read_file.seek(pos)?;
  914. // Do not seek past EOF
  915. self.shared
  916. .read_position
  917. .store(self.position as usize, atomic::Ordering::Relaxed);
  918. Ok(self.position)
  919. }
  920. }
  921. impl Read for AudioFile {
  922. fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
  923. match *self {
  924. AudioFile::Cached(ref mut file) => file.read(output),
  925. AudioFile::Streaming(ref mut file) => file.read(output),
  926. }
  927. }
  928. }
  929. impl Seek for AudioFile {
  930. fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
  931. match *self {
  932. AudioFile::Cached(ref mut file) => file.seek(pos),
  933. AudioFile::Streaming(ref mut file) => file.seek(pos),
  934. }
  935. }
  936. }