fetch.rs 39 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092
  1. use crate::range_set::{Range, RangeSet};
  2. use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
  3. use bytes::Bytes;
  4. use futures::sync::{mpsc, oneshot};
  5. use futures::Stream;
  6. use futures::{Async, Future, Poll};
  7. use std::cmp::{max, min};
  8. use std::fs;
  9. use std::io::{self, Read, Seek, SeekFrom, Write};
  10. use std::sync::{Arc, Condvar, Mutex};
  11. use std::time::{Duration, Instant};
  12. use tempfile::NamedTempFile;
  13. use futures::sync::mpsc::unbounded;
  14. use librespot_core::channel::{Channel, ChannelData, ChannelError, ChannelHeaders};
  15. use librespot_core::session::Session;
  16. use librespot_core::spotify_id::FileId;
  17. use std::sync::atomic;
  18. use std::sync::atomic::AtomicUsize;
  /// The minimum size of a block that is requested from the Spotify servers in one request.
  /// This is the block size that is typically requested while doing a seek() on a file.
  /// Note: smaller requests can happen if part of the block is downloaded already.
  const MINIMUM_DOWNLOAD_SIZE: usize = 16 * 1024;

  /// The amount of data that is requested when initially opening a file.
  /// Note: if the file is opened to play from the beginning, the amount of data to
  /// read ahead is requested in addition to this amount. If the file is opened to seek to
  /// another position, then only this amount is requested on the first request.
  const INITIAL_DOWNLOAD_SIZE: usize = 16 * 1024;

  /// The ping time that is used for calculations before a ping time was actually measured.
  const INITIAL_PING_TIME_ESTIMATE_SECONDS: f64 = 0.5;

  /// If the measured ping time to the Spotify server is larger than this value, it is capped
  /// to avoid run-away block sizes and pre-fetching.
  const MAXIMUM_ASSUMED_PING_TIME_SECONDS: f64 = 1.5;

  /// Before playback starts, this many seconds of data must be present.
  /// Note: the calculations are done using the nominal bitrate of the file. The actual amount
  /// of audio data may be larger or smaller.
  pub const READ_AHEAD_BEFORE_PLAYBACK_SECONDS: f64 = 1.0;

  /// Same as READ_AHEAD_BEFORE_PLAYBACK_SECONDS, but the time is taken as a factor of the ping
  /// time to the Spotify server.
  /// Both READ_AHEAD_BEFORE_PLAYBACK_SECONDS and READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS are
  /// obeyed.
  /// Note: the calculations are done using the nominal bitrate of the file. The actual amount
  /// of audio data may be larger or smaller.
  pub const READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS: f64 = 2.0;

  /// While playing back, this many seconds of data ahead of the current read position are
  /// requested.
  /// Note: the calculations are done using the nominal bitrate of the file. The actual amount
  /// of audio data may be larger or smaller.
  pub const READ_AHEAD_DURING_PLAYBACK_SECONDS: f64 = 5.0;

  /// Same as READ_AHEAD_DURING_PLAYBACK_SECONDS, but the time is taken as a factor of the ping
  /// time to the Spotify server.
  /// Note: the calculations are done using the nominal bitrate of the file. The actual amount
  /// of audio data may be larger or smaller.
  pub const READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS: f64 = 10.0;

  /// If the amount of data that is pending (requested but not received) is less than a certain
  /// amount, data is pre-fetched in addition to the read ahead settings above. The threshold for
  /// requesting more data is calculated as
  /// `<pending bytes> < PREFETCH_THRESHOLD_FACTOR * <ping time> * <nominal data rate>`
  const PREFETCH_THRESHOLD_FACTOR: f64 = 4.0;

  /// Similar to PREFETCH_THRESHOLD_FACTOR, but it also takes the current download rate into
  /// account. The formula used is
  /// `<pending bytes> < FAST_PREFETCH_THRESHOLD_FACTOR * <ping time> * <measured download rate>`
  /// This mechanism allows for fast downloading of the remainder of the file. The number should
  /// be larger than 1 so the download rate ramps up until the bandwidth is saturated. The larger
  /// the value, the faster the download rate ramps up. However, this comes at the cost that it
  /// might hurt ping-time if a seek is performed while downloading. Values smaller than 1 cause
  /// the download rate to collapse and effectively only PREFETCH_THRESHOLD_FACTOR is in effect.
  /// Thus, set to zero if bandwidth saturation is not wanted.
  const FAST_PREFETCH_THRESHOLD_FACTOR: f64 = 1.5;

  /// Limit the number of requests that are pending simultaneously before pre-fetching data.
  /// Pending requests share bandwidth. Thus, having too many requests can lead to the one that
  /// is needed next for playback being delayed, leading to a buffer underrun. This limit has the
  /// effect that a new pre-fetch request is only sent if fewer than MAX_PREFETCH_REQUESTS are
  /// pending.
  const MAX_PREFETCH_REQUESTS: usize = 4;
  73. pub enum AudioFile {
  74. Cached(fs::File),
  75. Streaming(AudioFileStreaming),
  76. }
  77. pub enum AudioFileOpen {
  78. Cached(Option<fs::File>),
  79. Streaming(AudioFileOpenStreaming),
  80. }
  81. pub struct AudioFileOpenStreaming {
  82. session: Session,
  83. initial_data_rx: Option<ChannelData>,
  84. initial_data_length: Option<usize>,
  85. initial_request_sent_time: Instant,
  86. headers: ChannelHeaders,
  87. file_id: FileId,
  88. complete_tx: Option<oneshot::Sender<NamedTempFile>>,
  89. streaming_data_rate: usize,
  90. }
  91. enum StreamLoaderCommand {
  92. Fetch(Range), // signal the stream loader to fetch a range of the file
  93. RandomAccessMode(), // optimise download strategy for random access
  94. StreamMode(), // optimise download strategy for streaming
  95. Close(), // terminate and don't load any more data
  96. }
  97. #[derive(Clone)]
  98. pub struct StreamLoaderController {
  99. channel_tx: Option<mpsc::UnboundedSender<StreamLoaderCommand>>,
  100. stream_shared: Option<Arc<AudioFileShared>>,
  101. file_size: usize,
  102. }
  103. impl StreamLoaderController {
  104. pub fn len(&self) -> usize {
  105. return self.file_size;
  106. }
  107. pub fn range_available(&self, range: Range) -> bool {
  108. if let Some(ref shared) = self.stream_shared {
  109. let download_status = shared.download_status.lock().unwrap();
  110. if range.length
  111. <= download_status
  112. .downloaded
  113. .contained_length_from_value(range.start)
  114. {
  115. return true;
  116. } else {
  117. return false;
  118. }
  119. } else {
  120. if range.length <= self.len() - range.start {
  121. return true;
  122. } else {
  123. return false;
  124. }
  125. }
  126. }
  127. pub fn range_to_end_available(&self) -> bool {
  128. if let Some(ref shared) = self.stream_shared {
  129. let read_position = shared.read_position.load(atomic::Ordering::Relaxed);
  130. self.range_available(Range::new(read_position, self.len() - read_position))
  131. } else {
  132. true
  133. }
  134. }
  135. pub fn ping_time_ms(&self) -> usize {
  136. if let Some(ref shared) = self.stream_shared {
  137. return shared.ping_time_ms.load(atomic::Ordering::Relaxed);
  138. } else {
  139. return 0;
  140. }
  141. }
  142. fn send_stream_loader_command(&mut self, command: StreamLoaderCommand) {
  143. if let Some(ref mut channel) = self.channel_tx {
  144. // ignore the error in case the channel has been closed already.
  145. let _ = channel.unbounded_send(command);
  146. }
  147. }
  148. pub fn fetch(&mut self, range: Range) {
  149. // signal the stream loader to fetch a range of the file
  150. self.send_stream_loader_command(StreamLoaderCommand::Fetch(range));
  151. }
  152. pub fn fetch_blocking(&mut self, mut range: Range) {
  153. // signal the stream loader to tech a range of the file and block until it is loaded.
  154. // ensure the range is within the file's bounds.
  155. if range.start >= self.len() {
  156. range.length = 0;
  157. } else if range.end() > self.len() {
  158. range.length = self.len() - range.start;
  159. }
  160. self.fetch(range);
  161. if let Some(ref shared) = self.stream_shared {
  162. let mut download_status = shared.download_status.lock().unwrap();
  163. while range.length
  164. > download_status
  165. .downloaded
  166. .contained_length_from_value(range.start)
  167. {
  168. download_status = shared
  169. .cond
  170. .wait_timeout(download_status, Duration::from_millis(1000))
  171. .unwrap()
  172. .0;
  173. if range.length
  174. > (download_status
  175. .downloaded
  176. .union(&download_status.requested)
  177. .contained_length_from_value(range.start))
  178. {
  179. // For some reason, the requested range is neither downloaded nor requested.
  180. // This could be due to a network error. Request it again.
  181. // We can't use self.fetch here because self can't be borrowed mutably, so we access the channel directly.
  182. if let Some(ref mut channel) = self.channel_tx {
  183. // ignore the error in case the channel has been closed already.
  184. let _ = channel.unbounded_send(StreamLoaderCommand::Fetch(range));
  185. }
  186. }
  187. }
  188. }
  189. }
  190. pub fn fetch_next(&mut self, length: usize) {
  191. let range: Range = if let Some(ref shared) = self.stream_shared {
  192. Range {
  193. start: shared.read_position.load(atomic::Ordering::Relaxed),
  194. length: length,
  195. }
  196. } else {
  197. return;
  198. };
  199. self.fetch(range);
  200. }
  201. pub fn fetch_next_blocking(&mut self, length: usize) {
  202. let range: Range = if let Some(ref shared) = self.stream_shared {
  203. Range {
  204. start: shared.read_position.load(atomic::Ordering::Relaxed),
  205. length: length,
  206. }
  207. } else {
  208. return;
  209. };
  210. self.fetch_blocking(range);
  211. }
  212. pub fn set_random_access_mode(&mut self) {
  213. // optimise download strategy for random access
  214. self.send_stream_loader_command(StreamLoaderCommand::RandomAccessMode());
  215. }
  216. pub fn set_stream_mode(&mut self) {
  217. // optimise download strategy for streaming
  218. self.send_stream_loader_command(StreamLoaderCommand::StreamMode());
  219. }
  220. pub fn close(&mut self) {
  221. // terminate stream loading and don't load any more data for this file.
  222. self.send_stream_loader_command(StreamLoaderCommand::Close());
  223. }
  224. }
  225. pub struct AudioFileStreaming {
  226. read_file: fs::File,
  227. position: u64,
  228. stream_loader_command_tx: mpsc::UnboundedSender<StreamLoaderCommand>,
  229. shared: Arc<AudioFileShared>,
  230. }
  231. struct AudioFileDownloadStatus {
  232. requested: RangeSet,
  233. downloaded: RangeSet,
  234. }
  /// Download strategy selected through the stream loader commands.
  #[derive(Copy, Clone)]
  enum DownloadStrategy {
      /// Set by `StreamLoaderCommand::RandomAccessMode`; also the initial strategy.
      RandomAccess(),
      /// Set by `StreamLoaderCommand::StreamMode`.
      Streaming(),
  }
  240. struct AudioFileShared {
  241. file_id: FileId,
  242. file_size: usize,
  243. stream_data_rate: usize,
  244. cond: Condvar,
  245. download_status: Mutex<AudioFileDownloadStatus>,
  246. download_strategy: Mutex<DownloadStrategy>,
  247. number_of_open_requests: AtomicUsize,
  248. ping_time_ms: AtomicUsize,
  249. read_position: AtomicUsize,
  250. }
  251. impl AudioFileOpenStreaming {
  252. fn finish(&mut self, size: usize) -> AudioFileStreaming {
  253. let shared = Arc::new(AudioFileShared {
  254. file_id: self.file_id,
  255. file_size: size,
  256. stream_data_rate: self.streaming_data_rate,
  257. cond: Condvar::new(),
  258. download_status: Mutex::new(AudioFileDownloadStatus {
  259. requested: RangeSet::new(),
  260. downloaded: RangeSet::new(),
  261. }),
  262. download_strategy: Mutex::new(DownloadStrategy::RandomAccess()), // start with random access mode until someone tells us otherwise
  263. number_of_open_requests: AtomicUsize::new(0),
  264. ping_time_ms: AtomicUsize::new(0),
  265. read_position: AtomicUsize::new(0),
  266. });
  267. let mut write_file = NamedTempFile::new().unwrap();
  268. write_file.as_file().set_len(size as u64).unwrap();
  269. write_file.seek(SeekFrom::Start(0)).unwrap();
  270. let read_file = write_file.reopen().unwrap();
  271. let initial_data_rx = self.initial_data_rx.take().unwrap();
  272. let initial_data_length = self.initial_data_length.take().unwrap();
  273. let complete_tx = self.complete_tx.take().unwrap();
  274. //let (seek_tx, seek_rx) = mpsc::unbounded();
  275. let (stream_loader_command_tx, stream_loader_command_rx) =
  276. mpsc::unbounded::<StreamLoaderCommand>();
  277. let fetcher = AudioFileFetch::new(
  278. self.session.clone(),
  279. shared.clone(),
  280. initial_data_rx,
  281. self.initial_request_sent_time,
  282. initial_data_length,
  283. write_file,
  284. stream_loader_command_rx,
  285. complete_tx,
  286. );
  287. self.session.spawn(move |_| fetcher);
  288. AudioFileStreaming {
  289. read_file: read_file,
  290. position: 0,
  291. //seek: seek_tx,
  292. stream_loader_command_tx: stream_loader_command_tx,
  293. shared: shared,
  294. }
  295. }
  296. }
  297. impl Future for AudioFileOpen {
  298. type Item = AudioFile;
  299. type Error = ChannelError;
  300. fn poll(&mut self) -> Poll<AudioFile, ChannelError> {
  301. match *self {
  302. AudioFileOpen::Streaming(ref mut open) => {
  303. let file = try_ready!(open.poll());
  304. Ok(Async::Ready(AudioFile::Streaming(file)))
  305. }
  306. AudioFileOpen::Cached(ref mut file) => {
  307. let file = file.take().unwrap();
  308. Ok(Async::Ready(AudioFile::Cached(file)))
  309. }
  310. }
  311. }
  312. }
  313. impl Future for AudioFileOpenStreaming {
  314. type Item = AudioFileStreaming;
  315. type Error = ChannelError;
  316. fn poll(&mut self) -> Poll<AudioFileStreaming, ChannelError> {
  317. loop {
  318. let (id, data) = try_ready!(self.headers.poll()).unwrap();
  319. if id == 0x3 {
  320. let size = BigEndian::read_u32(&data) as usize * 4;
  321. let file = self.finish(size);
  322. return Ok(Async::Ready(file));
  323. }
  324. }
  325. }
  326. }
  327. impl AudioFile {
  328. pub fn open(
  329. session: &Session,
  330. file_id: FileId,
  331. bytes_per_second: usize,
  332. play_from_beginning: bool,
  333. ) -> AudioFileOpen {
  334. let cache = session.cache().cloned();
  335. if let Some(file) = cache.as_ref().and_then(|cache| cache.file(file_id)) {
  336. debug!("File {} already in cache", file_id);
  337. return AudioFileOpen::Cached(Some(file));
  338. }
  339. debug!("Downloading file {}", file_id);
  340. let (complete_tx, complete_rx) = oneshot::channel();
  341. let mut initial_data_length = if play_from_beginning {
  342. INITIAL_DOWNLOAD_SIZE
  343. + max(
  344. (READ_AHEAD_DURING_PLAYBACK_SECONDS * bytes_per_second as f64) as usize,
  345. (INITIAL_PING_TIME_ESTIMATE_SECONDS
  346. * READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS
  347. * bytes_per_second as f64) as usize,
  348. )
  349. } else {
  350. INITIAL_DOWNLOAD_SIZE
  351. };
  352. if initial_data_length % 4 != 0 {
  353. initial_data_length += 4 - (initial_data_length % 4);
  354. }
  355. let (headers, data) = request_range(session, file_id, 0, initial_data_length).split();
  356. let open = AudioFileOpenStreaming {
  357. session: session.clone(),
  358. file_id: file_id,
  359. headers: headers,
  360. initial_data_rx: Some(data),
  361. initial_data_length: Some(initial_data_length),
  362. initial_request_sent_time: Instant::now(),
  363. complete_tx: Some(complete_tx),
  364. streaming_data_rate: bytes_per_second,
  365. };
  366. let session_ = session.clone();
  367. session.spawn(move |_| {
  368. complete_rx
  369. .map(move |mut file| {
  370. if let Some(cache) = session_.cache() {
  371. cache.save_file(file_id, &mut file);
  372. debug!("File {} complete, saving to cache", file_id);
  373. } else {
  374. debug!("File {} complete", file_id);
  375. }
  376. })
  377. .or_else(|oneshot::Canceled| Ok(()))
  378. });
  379. return AudioFileOpen::Streaming(open);
  380. }
  381. pub fn get_stream_loader_controller(&self) -> StreamLoaderController {
  382. match self {
  383. AudioFile::Streaming(ref stream) => {
  384. return StreamLoaderController {
  385. channel_tx: Some(stream.stream_loader_command_tx.clone()),
  386. stream_shared: Some(stream.shared.clone()),
  387. file_size: stream.shared.file_size,
  388. };
  389. }
  390. AudioFile::Cached(ref file) => {
  391. return StreamLoaderController {
  392. channel_tx: None,
  393. stream_shared: None,
  394. file_size: file.metadata().unwrap().len() as usize,
  395. };
  396. }
  397. }
  398. }
  399. }
  400. fn request_range(session: &Session, file: FileId, offset: usize, length: usize) -> Channel {
  401. assert!(
  402. offset % 4 == 0,
  403. "Range request start positions must be aligned by 4 bytes."
  404. );
  405. assert!(
  406. length % 4 == 0,
  407. "Range request range lengths must be aligned by 4 bytes."
  408. );
  409. let start = offset / 4;
  410. let end = (offset + length) / 4;
  411. let (id, channel) = session.channel().allocate();
  412. let mut data: Vec<u8> = Vec::new();
  413. data.write_u16::<BigEndian>(id).unwrap();
  414. data.write_u8(0).unwrap();
  415. data.write_u8(1).unwrap();
  416. data.write_u16::<BigEndian>(0x0000).unwrap();
  417. data.write_u32::<BigEndian>(0x00000000).unwrap();
  418. data.write_u32::<BigEndian>(0x00009C40).unwrap();
  419. data.write_u32::<BigEndian>(0x00020000).unwrap();
  420. data.write(&file.0).unwrap();
  421. data.write_u32::<BigEndian>(start as u32).unwrap();
  422. data.write_u32::<BigEndian>(end as u32).unwrap();
  423. session.send_packet(0x8, data);
  424. channel
  425. }
  426. struct PartialFileData {
  427. offset: usize,
  428. data: Bytes,
  429. }
  430. enum ReceivedData {
  431. ResponseTimeMs(usize),
  432. Data(PartialFileData),
  433. }
  434. struct AudioFileFetchDataReceiver {
  435. shared: Arc<AudioFileShared>,
  436. file_data_tx: mpsc::UnboundedSender<ReceivedData>,
  437. data_rx: ChannelData,
  438. initial_data_offset: usize,
  439. initial_request_length: usize,
  440. data_offset: usize,
  441. request_length: usize,
  442. request_sent_time: Option<Instant>,
  443. measure_ping_time: bool,
  444. }
  445. impl AudioFileFetchDataReceiver {
  446. fn new(
  447. shared: Arc<AudioFileShared>,
  448. file_data_tx: mpsc::UnboundedSender<ReceivedData>,
  449. data_rx: ChannelData,
  450. data_offset: usize,
  451. request_length: usize,
  452. request_sent_time: Instant,
  453. ) -> AudioFileFetchDataReceiver {
  454. let measure_ping_time = shared
  455. .number_of_open_requests
  456. .load(atomic::Ordering::SeqCst)
  457. == 0;
  458. shared
  459. .number_of_open_requests
  460. .fetch_add(1, atomic::Ordering::SeqCst);
  461. AudioFileFetchDataReceiver {
  462. shared: shared,
  463. data_rx: data_rx,
  464. file_data_tx: file_data_tx,
  465. initial_data_offset: data_offset,
  466. initial_request_length: request_length,
  467. data_offset: data_offset,
  468. request_length: request_length,
  469. request_sent_time: Some(request_sent_time),
  470. measure_ping_time: measure_ping_time,
  471. }
  472. }
  473. }
  474. impl AudioFileFetchDataReceiver {
  475. fn finish(&mut self) {
  476. if self.request_length > 0 {
  477. let missing_range = Range::new(self.data_offset, self.request_length);
  478. let mut download_status = self.shared.download_status.lock().unwrap();
  479. download_status.requested.subtract_range(&missing_range);
  480. self.shared.cond.notify_all();
  481. }
  482. self.shared
  483. .number_of_open_requests
  484. .fetch_sub(1, atomic::Ordering::SeqCst);
  485. }
  486. }
  487. impl Future for AudioFileFetchDataReceiver {
  488. type Item = ();
  489. type Error = ();
  490. fn poll(&mut self) -> Poll<(), ()> {
  491. loop {
  492. match self.data_rx.poll() {
  493. Ok(Async::Ready(Some(data))) => {
  494. if self.measure_ping_time {
  495. if let Some(request_sent_time) = self.request_sent_time {
  496. let duration = Instant::now() - request_sent_time;
  497. let duration_ms: u64;
  498. if 0.001 * (duration.as_millis() as f64)
  499. > MAXIMUM_ASSUMED_PING_TIME_SECONDS
  500. {
  501. duration_ms = (MAXIMUM_ASSUMED_PING_TIME_SECONDS * 1000.0) as u64;
  502. } else {
  503. duration_ms = duration.as_millis() as u64;
  504. }
  505. let _ = self
  506. .file_data_tx
  507. .unbounded_send(ReceivedData::ResponseTimeMs(duration_ms as usize));
  508. self.measure_ping_time = false;
  509. }
  510. }
  511. let data_size = data.len();
  512. let _ = self
  513. .file_data_tx
  514. .unbounded_send(ReceivedData::Data(PartialFileData {
  515. offset: self.data_offset,
  516. data: data,
  517. }));
  518. self.data_offset += data_size;
  519. if self.request_length < data_size {
  520. warn!("Data receiver for range {} (+{}) received more data from server than requested.", self.initial_data_offset, self.initial_request_length);
  521. self.request_length = 0;
  522. } else {
  523. self.request_length -= data_size;
  524. }
  525. if self.request_length == 0 {
  526. self.finish();
  527. return Ok(Async::Ready(()));
  528. }
  529. }
  530. Ok(Async::Ready(None)) => {
  531. if self.request_length > 0 {
  532. warn!("Data receiver for range {} (+{}) received less data from server than requested.", self.initial_data_offset, self.initial_request_length);
  533. }
  534. self.finish();
  535. return Ok(Async::Ready(()));
  536. }
  537. Ok(Async::NotReady) => {
  538. return Ok(Async::NotReady);
  539. }
  540. Err(ChannelError) => {
  541. warn!(
  542. "Error from channel for data receiver for range {} (+{}).",
  543. self.initial_data_offset, self.initial_request_length
  544. );
  545. self.finish();
  546. return Ok(Async::Ready(()));
  547. }
  548. }
  549. }
  550. }
  551. }
  552. struct AudioFileFetch {
  553. session: Session,
  554. shared: Arc<AudioFileShared>,
  555. output: Option<NamedTempFile>,
  556. file_data_tx: mpsc::UnboundedSender<ReceivedData>,
  557. file_data_rx: mpsc::UnboundedReceiver<ReceivedData>,
  558. stream_loader_command_rx: mpsc::UnboundedReceiver<StreamLoaderCommand>,
  559. complete_tx: Option<oneshot::Sender<NamedTempFile>>,
  560. network_response_times_ms: Vec<usize>,
  561. }
  562. impl AudioFileFetch {
  563. fn new(
  564. session: Session,
  565. shared: Arc<AudioFileShared>,
  566. initial_data_rx: ChannelData,
  567. initial_request_sent_time: Instant,
  568. initial_data_length: usize,
  569. output: NamedTempFile,
  570. stream_loader_command_rx: mpsc::UnboundedReceiver<StreamLoaderCommand>,
  571. complete_tx: oneshot::Sender<NamedTempFile>,
  572. ) -> AudioFileFetch {
  573. let (file_data_tx, file_data_rx) = unbounded::<ReceivedData>();
  574. {
  575. let requested_range = Range::new(0, initial_data_length);
  576. let mut download_status = shared.download_status.lock().unwrap();
  577. download_status.requested.add_range(&requested_range);
  578. }
  579. let initial_data_receiver = AudioFileFetchDataReceiver::new(
  580. shared.clone(),
  581. file_data_tx.clone(),
  582. initial_data_rx,
  583. 0,
  584. initial_data_length,
  585. initial_request_sent_time,
  586. );
  587. session.spawn(move |_| initial_data_receiver);
  588. AudioFileFetch {
  589. session: session,
  590. shared: shared,
  591. output: Some(output),
  592. file_data_tx: file_data_tx,
  593. file_data_rx: file_data_rx,
  594. stream_loader_command_rx: stream_loader_command_rx,
  595. complete_tx: Some(complete_tx),
  596. network_response_times_ms: Vec::new(),
  597. }
  598. }
  599. fn get_download_strategy(&mut self) -> DownloadStrategy {
  600. *(self.shared.download_strategy.lock().unwrap())
  601. }
  602. fn download_range(&mut self, mut offset: usize, mut length: usize) {
  603. if length < MINIMUM_DOWNLOAD_SIZE {
  604. length = MINIMUM_DOWNLOAD_SIZE;
  605. }
  606. // ensure the values are within the bounds and align them by 4 for the spotify protocol.
  607. if offset >= self.shared.file_size {
  608. return;
  609. }
  610. if length <= 0 {
  611. return;
  612. }
  613. if offset + length > self.shared.file_size {
  614. length = self.shared.file_size - offset;
  615. }
  616. if offset % 4 != 0 {
  617. length += offset % 4;
  618. offset -= offset % 4;
  619. }
  620. if length % 4 != 0 {
  621. length += 4 - (length % 4);
  622. }
  623. let mut ranges_to_request = RangeSet::new();
  624. ranges_to_request.add_range(&Range::new(offset, length));
  625. let mut download_status = self.shared.download_status.lock().unwrap();
  626. ranges_to_request.subtract_range_set(&download_status.downloaded);
  627. ranges_to_request.subtract_range_set(&download_status.requested);
  628. for range in ranges_to_request.iter() {
  629. let (_headers, data) = request_range(
  630. &self.session,
  631. self.shared.file_id,
  632. range.start,
  633. range.length,
  634. )
  635. .split();
  636. download_status.requested.add_range(range);
  637. let receiver = AudioFileFetchDataReceiver::new(
  638. self.shared.clone(),
  639. self.file_data_tx.clone(),
  640. data,
  641. range.start,
  642. range.length,
  643. Instant::now(),
  644. );
  645. self.session.spawn(move |_| receiver);
  646. }
  647. }
  648. fn pre_fetch_more_data(&mut self, bytes: usize, max_requests_to_send: usize) {
  649. let mut bytes_to_go = bytes;
  650. let mut requests_to_go = max_requests_to_send;
  651. while bytes_to_go > 0 && requests_to_go > 0 {
  652. // determine what is still missing
  653. let mut missing_data = RangeSet::new();
  654. missing_data.add_range(&Range::new(0, self.shared.file_size));
  655. {
  656. let download_status = self.shared.download_status.lock().unwrap();
  657. missing_data.subtract_range_set(&download_status.downloaded);
  658. missing_data.subtract_range_set(&download_status.requested);
  659. }
  660. // download data from after the current read position first
  661. let mut tail_end = RangeSet::new();
  662. let read_position = self.shared.read_position.load(atomic::Ordering::Relaxed);
  663. tail_end.add_range(&Range::new(
  664. read_position,
  665. self.shared.file_size - read_position,
  666. ));
  667. let tail_end = tail_end.intersection(&missing_data);
  668. if !tail_end.is_empty() {
  669. let range = tail_end.get_range(0);
  670. let offset = range.start;
  671. let length = min(range.length, bytes_to_go);
  672. self.download_range(offset, length);
  673. requests_to_go -= 1;
  674. bytes_to_go -= length;
  675. } else if !missing_data.is_empty() {
  676. // ok, the tail is downloaded, download something fom the beginning.
  677. let range = missing_data.get_range(0);
  678. let offset = range.start;
  679. let length = min(range.length, bytes_to_go);
  680. self.download_range(offset, length);
  681. requests_to_go -= 1;
  682. bytes_to_go -= length;
  683. } else {
  684. return;
  685. }
  686. }
  687. }
  688. fn poll_file_data_rx(&mut self) -> Poll<(), ()> {
  689. loop {
  690. match self.file_data_rx.poll() {
  691. Ok(Async::Ready(None)) => {
  692. return Ok(Async::Ready(()));
  693. }
  694. Ok(Async::Ready(Some(ReceivedData::ResponseTimeMs(response_time_ms)))) => {
  695. trace!("Ping time estimated as: {} ms.", response_time_ms);
  696. // record the response time
  697. self.network_response_times_ms.push(response_time_ms);
  698. // prune old response times. Keep at most three.
  699. while self.network_response_times_ms.len() > 3 {
  700. self.network_response_times_ms.remove(0);
  701. }
  702. // stats::median is experimental. So we calculate the median of up to three ourselves.
  703. let ping_time_ms: usize = match self.network_response_times_ms.len() {
  704. 1 => self.network_response_times_ms[0] as usize,
  705. 2 => {
  706. ((self.network_response_times_ms[0]
  707. + self.network_response_times_ms[1])
  708. / 2) as usize
  709. }
  710. 3 => {
  711. let mut times = self.network_response_times_ms.clone();
  712. times.sort();
  713. times[1]
  714. }
  715. _ => unreachable!(),
  716. };
  717. // store our new estimate for everyone to see
  718. self.shared
  719. .ping_time_ms
  720. .store(ping_time_ms, atomic::Ordering::Relaxed);
  721. }
  722. Ok(Async::Ready(Some(ReceivedData::Data(data)))) => {
  723. self.output
  724. .as_mut()
  725. .unwrap()
  726. .seek(SeekFrom::Start(data.offset as u64))
  727. .unwrap();
  728. self.output
  729. .as_mut()
  730. .unwrap()
  731. .write_all(data.data.as_ref())
  732. .unwrap();
  733. let mut full = false;
  734. {
  735. let mut download_status = self.shared.download_status.lock().unwrap();
  736. let received_range = Range::new(data.offset, data.data.len());
  737. download_status.downloaded.add_range(&received_range);
  738. self.shared.cond.notify_all();
  739. if download_status.downloaded.contained_length_from_value(0)
  740. >= self.shared.file_size
  741. {
  742. full = true;
  743. }
  744. drop(download_status);
  745. }
  746. if full {
  747. self.finish();
  748. return Ok(Async::Ready(()));
  749. }
  750. }
  751. Ok(Async::NotReady) => {
  752. return Ok(Async::NotReady);
  753. }
  754. Err(()) => unreachable!(),
  755. }
  756. }
  757. }
  758. fn poll_stream_loader_command_rx(&mut self) -> Poll<(), ()> {
  759. loop {
  760. match self.stream_loader_command_rx.poll() {
  761. Ok(Async::Ready(None)) => {
  762. return Ok(Async::Ready(()));
  763. }
  764. Ok(Async::Ready(Some(StreamLoaderCommand::Fetch(request)))) => {
  765. self.download_range(request.start, request.length);
  766. }
  767. Ok(Async::Ready(Some(StreamLoaderCommand::RandomAccessMode()))) => {
  768. *(self.shared.download_strategy.lock().unwrap()) =
  769. DownloadStrategy::RandomAccess();
  770. }
  771. Ok(Async::Ready(Some(StreamLoaderCommand::StreamMode()))) => {
  772. *(self.shared.download_strategy.lock().unwrap()) =
  773. DownloadStrategy::Streaming();
  774. }
  775. Ok(Async::Ready(Some(StreamLoaderCommand::Close()))) => {
  776. return Ok(Async::Ready(()));
  777. }
  778. Ok(Async::NotReady) => return Ok(Async::NotReady),
  779. Err(()) => unreachable!(),
  780. }
  781. }
  782. }
  783. fn finish(&mut self) {
  784. let mut output = self.output.take().unwrap();
  785. let complete_tx = self.complete_tx.take().unwrap();
  786. output.seek(SeekFrom::Start(0)).unwrap();
  787. let _ = complete_tx.send(output);
  788. }
  789. }
  790. impl Future for AudioFileFetch {
  791. type Item = ();
  792. type Error = ();
  793. fn poll(&mut self) -> Poll<(), ()> {
  794. match self.poll_stream_loader_command_rx() {
  795. Ok(Async::NotReady) => (),
  796. Ok(Async::Ready(_)) => {
  797. return Ok(Async::Ready(()));
  798. }
  799. Err(()) => unreachable!(),
  800. }
  801. match self.poll_file_data_rx() {
  802. Ok(Async::NotReady) => (),
  803. Ok(Async::Ready(_)) => {
  804. return Ok(Async::Ready(()));
  805. }
  806. Err(()) => unreachable!(),
  807. }
  808. if let DownloadStrategy::Streaming() = self.get_download_strategy() {
  809. let number_of_open_requests = self
  810. .shared
  811. .number_of_open_requests
  812. .load(atomic::Ordering::SeqCst);
  813. let max_requests_to_send =
  814. MAX_PREFETCH_REQUESTS - min(MAX_PREFETCH_REQUESTS, number_of_open_requests);
  815. if max_requests_to_send > 0 {
  816. let bytes_pending: usize = {
  817. let download_status = self.shared.download_status.lock().unwrap();
  818. download_status
  819. .requested
  820. .minus(&download_status.downloaded)
  821. .len()
  822. };
  823. let ping_time_seconds =
  824. 0.001 * self.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64;
  825. let download_rate = self.session.channel().get_download_rate_estimate();
  826. let desired_pending_bytes = max(
  827. (PREFETCH_THRESHOLD_FACTOR
  828. * ping_time_seconds
  829. * self.shared.stream_data_rate as f64) as usize,
  830. (FAST_PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * download_rate as f64)
  831. as usize,
  832. );
  833. if bytes_pending < desired_pending_bytes {
  834. self.pre_fetch_more_data(
  835. desired_pending_bytes - bytes_pending,
  836. max_requests_to_send,
  837. );
  838. }
  839. }
  840. }
  841. return Ok(Async::NotReady);
  842. }
  843. }
  844. impl Read for AudioFileStreaming {
  845. fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
  846. let offset = self.position as usize;
  847. if offset >= self.shared.file_size {
  848. return Ok(0);
  849. }
  850. let length = min(output.len(), self.shared.file_size - offset);
  851. let length_to_request = match *(self.shared.download_strategy.lock().unwrap()) {
  852. DownloadStrategy::RandomAccess() => length,
  853. DownloadStrategy::Streaming() => {
  854. // Due to the read-ahead stuff, we potentially request more than the actual reqeust demanded.
  855. let ping_time_seconds =
  856. 0.0001 * self.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64;
  857. let length_to_request = length
  858. + max(
  859. (READ_AHEAD_DURING_PLAYBACK_SECONDS * self.shared.stream_data_rate as f64)
  860. as usize,
  861. (READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS
  862. * ping_time_seconds
  863. * self.shared.stream_data_rate as f64) as usize,
  864. );
  865. min(length_to_request, self.shared.file_size - offset)
  866. }
  867. };
  868. let mut ranges_to_request = RangeSet::new();
  869. ranges_to_request.add_range(&Range::new(offset, length_to_request));
  870. let mut download_status = self.shared.download_status.lock().unwrap();
  871. ranges_to_request.subtract_range_set(&download_status.downloaded);
  872. ranges_to_request.subtract_range_set(&download_status.requested);
  873. for range in ranges_to_request.iter() {
  874. self.stream_loader_command_tx
  875. .unbounded_send(StreamLoaderCommand::Fetch(range.clone()))
  876. .unwrap();
  877. }
  878. if length == 0 {
  879. return Ok(0);
  880. }
  881. let mut download_message_printed = false;
  882. while !download_status.downloaded.contains(offset) {
  883. if let DownloadStrategy::Streaming() = *self.shared.download_strategy.lock().unwrap() {
  884. if !download_message_printed {
  885. debug!("Stream waiting for download of file position {}. Downloaded ranges: {}. Pending ranges: {}", offset, download_status.downloaded, download_status.requested.minus(&download_status.downloaded));
  886. download_message_printed = true;
  887. }
  888. }
  889. download_status = self
  890. .shared
  891. .cond
  892. .wait_timeout(download_status, Duration::from_millis(1000))
  893. .unwrap()
  894. .0;
  895. }
  896. let available_length = download_status
  897. .downloaded
  898. .contained_length_from_value(offset);
  899. assert!(available_length > 0);
  900. drop(download_status);
  901. self.position = self.read_file.seek(SeekFrom::Start(offset as u64)).unwrap();
  902. let read_len = min(length, available_length);
  903. let read_len = self.read_file.read(&mut output[..read_len])?;
  904. if download_message_printed {
  905. debug!(
  906. "Read at postion {} completed. {} bytes returned, {} bytes were requested.",
  907. offset,
  908. read_len,
  909. output.len()
  910. );
  911. }
  912. self.position += read_len as u64;
  913. self.shared
  914. .read_position
  915. .store(self.position as usize, atomic::Ordering::Relaxed);
  916. return Ok(read_len);
  917. }
  918. }
  919. impl Seek for AudioFileStreaming {
  920. fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
  921. self.position = self.read_file.seek(pos)?;
  922. // Do not seek past EOF
  923. self.shared
  924. .read_position
  925. .store(self.position as usize, atomic::Ordering::Relaxed);
  926. Ok(self.position)
  927. }
  928. }
  929. impl Read for AudioFile {
  930. fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
  931. match *self {
  932. AudioFile::Cached(ref mut file) => file.read(output),
  933. AudioFile::Streaming(ref mut file) => file.read(output),
  934. }
  935. }
  936. }
  937. impl Seek for AudioFile {
  938. fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
  939. match *self {
  940. AudioFile::Cached(ref mut file) => file.seek(pos),
  941. AudioFile::Streaming(ref mut file) => file.seek(pos),
  942. }
  943. }
  944. }