fetch.rs 38 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059
  1. use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
  2. use bytes::Bytes;
  3. use futures::sync::{mpsc, oneshot};
  4. use futures::Stream;
  5. use futures::{Async, Future, Poll};
  6. use range_set::{Range, RangeSet};
  7. use std::cmp::{max, min};
  8. use std::fs;
  9. use std::io::{self, Read, Seek, SeekFrom, Write};
  10. use std::sync::{Arc, Condvar, Mutex};
  11. use std::time::{Duration, Instant};
  12. use tempfile::NamedTempFile;
  13. use futures::sync::mpsc::unbounded;
  14. use librespot_core::channel::{Channel, ChannelData, ChannelError, ChannelHeaders};
  15. use librespot_core::session::Session;
  16. use librespot_core::spotify_id::FileId;
  17. use std::sync::atomic;
  18. use std::sync::atomic::AtomicUsize;
  /// The minimum size of a block that is requested from the Spotify servers in one request.
  /// This is the block size that is typically requested while doing a seek() on a file.
  /// Note: smaller requests can happen if part of the block is downloaded already.
  const MINIMUM_DOWNLOAD_SIZE: usize = 16 * 1024;

  /// The amount of data that is requested when initially opening a file.
  /// Note: if the file is opened to play from the beginning, the amount of data to
  /// read ahead is requested in addition to this amount. If the file is opened to seek to
  /// another position, then only this amount is requested on the first request.
  const INITIAL_DOWNLOAD_SIZE: usize = 16 * 1024;

  /// The ping time that is used for calculations before a ping time was actually measured.
  const INITIAL_PING_TIME_ESTIMATE_SECONDS: f64 = 0.5;

  /// If the measured ping time to the Spotify server is larger than this value, it is capped
  /// to avoid run-away block sizes and pre-fetching.
  const MAXIMUM_ASSUMED_PING_TIME_SECONDS: f64 = 1.5;

  /// Before playback starts, this many seconds of data must be present.
  /// Note: the calculations are done using the nominal bitrate of the file. The actual amount
  /// of audio data may be larger or smaller.
  pub const READ_AHEAD_BEFORE_PLAYBACK_SECONDS: f64 = 1.0;

  /// Same as READ_AHEAD_BEFORE_PLAYBACK_SECONDS, but the time is taken as a factor of the ping
  /// time to the Spotify server.
  /// Both READ_AHEAD_BEFORE_PLAYBACK_SECONDS and READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS are
  /// obeyed.
  /// Note: the calculations are done using the nominal bitrate of the file. The actual amount
  /// of audio data may be larger or smaller.
  pub const READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS: f64 = 2.0;

  /// While playing back, this many seconds of data ahead of the current read position are
  /// requested.
  /// Note: the calculations are done using the nominal bitrate of the file. The actual amount
  /// of audio data may be larger or smaller.
  pub const READ_AHEAD_DURING_PLAYBACK_SECONDS: f64 = 5.0;

  /// Same as READ_AHEAD_DURING_PLAYBACK_SECONDS, but the time is taken as a factor of the ping
  /// time to the Spotify server.
  /// Note: the calculations are done using the nominal bitrate of the file. The actual amount
  /// of audio data may be larger or smaller.
  pub const READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS: f64 = 10.0;

  /// If the amount of data that is pending (requested but not received) is less than a certain
  /// amount, data is pre-fetched in addition to the read ahead settings above. The threshold for
  /// requesting more data is calculated as
  /// `<pending bytes> < PREFETCH_THRESHOLD_FACTOR * <ping time> * <nominal data rate>`
  const PREFETCH_THRESHOLD_FACTOR: f64 = 4.0;

  /// Similar to PREFETCH_THRESHOLD_FACTOR, but it also takes the current download rate into
  /// account. The formula used is
  /// `<pending bytes> < FAST_PREFETCH_THRESHOLD_FACTOR * <ping time> * <measured download rate>`
  /// This mechanism allows for fast downloading of the remainder of the file. The number should
  /// be larger than 1 so the download rate ramps up until the bandwidth is saturated. The larger
  /// the value, the faster the download rate ramps up. However, this comes at the cost that it
  /// might hurt ping-time if a seek is performed while downloading. Values smaller than 1 cause
  /// the download rate to collapse and effectively only PREFETCH_THRESHOLD_FACTOR is in effect.
  /// Thus, set to zero if bandwidth saturation is not wanted.
  const FAST_PREFETCH_THRESHOLD_FACTOR: f64 = 1.5;

  /// Limit the number of requests that are pending simultaneously before pre-fetching data.
  /// Pending requests share bandwidth. Thus, having too many requests can lead to the one that
  /// is needed next for playback to be delayed leading to a buffer underrun. This limit has the
  /// effect that a new pre-fetch request is only sent if less than MAX_PREFETCH_REQUESTS are
  /// pending.
  const MAX_PREFETCH_REQUESTS: usize = 4;
  73. pub enum AudioFile {
  74. Cached(fs::File),
  75. Streaming(AudioFileStreaming),
  76. }
  77. pub enum AudioFileOpen {
  78. Cached(Option<fs::File>),
  79. Streaming(AudioFileOpenStreaming),
  80. }
  81. pub struct AudioFileOpenStreaming {
  82. session: Session,
  83. initial_data_rx: Option<ChannelData>,
  84. initial_data_length: Option<usize>,
  85. initial_request_sent_time: Instant,
  86. headers: ChannelHeaders,
  87. file_id: FileId,
  88. complete_tx: Option<oneshot::Sender<NamedTempFile>>,
  89. streaming_data_rate: usize,
  90. }
  91. enum StreamLoaderCommand {
  92. Fetch(Range), // signal the stream loader to fetch a range of the file
  93. RandomAccessMode(), // optimise download strategy for random access
  94. StreamMode(), // optimise download strategy for streaming
  95. Close(), // terminate and don't load any more data
  96. }
  97. #[derive(Clone)]
  98. pub struct StreamLoaderController {
  99. channel_tx: Option<mpsc::UnboundedSender<StreamLoaderCommand>>,
  100. stream_shared: Option<Arc<AudioFileShared>>,
  101. file_size: usize,
  102. }
  103. impl StreamLoaderController {
  104. pub fn len(&self) -> usize {
  105. return self.file_size;
  106. }
  107. pub fn range_available(&self, range: Range) -> bool {
  108. if let Some(ref shared) = self.stream_shared {
  109. let download_status = shared.download_status.lock().unwrap();
  110. if range.length
  111. <= download_status
  112. .downloaded
  113. .contained_length_from_value(range.start)
  114. {
  115. return true;
  116. } else {
  117. return false;
  118. }
  119. } else {
  120. if range.length <= self.len() - range.start {
  121. return true;
  122. } else {
  123. return false;
  124. }
  125. }
  126. }
  127. pub fn ping_time_ms(&self) -> usize {
  128. if let Some(ref shared) = self.stream_shared {
  129. return shared.ping_time_ms.load(atomic::Ordering::Relaxed);
  130. } else {
  131. return 0;
  132. }
  133. }
  134. fn send_stream_loader_command(&mut self, command: StreamLoaderCommand) {
  135. if let Some(ref mut channel) = self.channel_tx {
  136. // ignore the error in case the channel has been closed already.
  137. let _ = channel.unbounded_send(command);
  138. }
  139. }
  140. pub fn fetch(&mut self, range: Range) {
  141. // signal the stream loader to fetch a range of the file
  142. self.send_stream_loader_command(StreamLoaderCommand::Fetch(range));
  143. }
  144. pub fn fetch_blocking(&mut self, mut range: Range) {
  145. // signal the stream loader to tech a range of the file and block until it is loaded.
  146. // ensure the range is within the file's bounds.
  147. if range.start >= self.len() {
  148. range.length = 0;
  149. } else if range.end() > self.len() {
  150. range.length = self.len() - range.start;
  151. }
  152. self.fetch(range);
  153. if let Some(ref shared) = self.stream_shared {
  154. let mut download_status = shared.download_status.lock().unwrap();
  155. while range.length
  156. > download_status
  157. .downloaded
  158. .contained_length_from_value(range.start)
  159. {
  160. download_status = shared
  161. .cond
  162. .wait_timeout(download_status, Duration::from_millis(1000))
  163. .unwrap()
  164. .0;
  165. if range.length
  166. > (download_status
  167. .downloaded
  168. .union(&download_status.requested)
  169. .contained_length_from_value(range.start))
  170. {
  171. // For some reason, the requested range is neither downloaded nor requested.
  172. // This could be due to a network error. Request it again.
  173. // We can't use self.fetch here because self can't be borrowed mutably, so we access the channel directly.
  174. if let Some(ref mut channel) = self.channel_tx {
  175. // ignore the error in case the channel has been closed already.
  176. let _ = channel.unbounded_send(StreamLoaderCommand::Fetch(range));
  177. }
  178. }
  179. }
  180. }
  181. }
  182. pub fn fetch_next(&mut self, length: usize) {
  183. let range: Range = if let Some(ref shared) = self.stream_shared {
  184. Range {
  185. start: shared.read_position.load(atomic::Ordering::Relaxed),
  186. length: length,
  187. }
  188. } else {
  189. return;
  190. };
  191. self.fetch(range);
  192. }
  193. pub fn fetch_next_blocking(&mut self, length: usize) {
  194. let range: Range = if let Some(ref shared) = self.stream_shared {
  195. Range {
  196. start: shared.read_position.load(atomic::Ordering::Relaxed),
  197. length: length,
  198. }
  199. } else {
  200. return;
  201. };
  202. self.fetch_blocking(range);
  203. }
  204. pub fn set_random_access_mode(&mut self) {
  205. // optimise download strategy for random access
  206. self.send_stream_loader_command(StreamLoaderCommand::RandomAccessMode());
  207. }
  208. pub fn set_stream_mode(&mut self) {
  209. // optimise download strategy for streaming
  210. self.send_stream_loader_command(StreamLoaderCommand::StreamMode());
  211. }
  212. pub fn close(&mut self) {
  213. // terminate stream loading and don't load any more data for this file.
  214. self.send_stream_loader_command(StreamLoaderCommand::Close());
  215. }
  216. }
  217. pub struct AudioFileStreaming {
  218. read_file: fs::File,
  219. position: u64,
  220. stream_loader_command_tx: mpsc::UnboundedSender<StreamLoaderCommand>,
  221. shared: Arc<AudioFileShared>,
  222. }
  223. struct AudioFileDownloadStatus {
  224. requested: RangeSet,
  225. downloaded: RangeSet,
  226. }
  /// How the fetcher schedules downloads.
  #[derive(Copy, Clone)]
  enum DownloadStrategy {
      /// Optimised for random access; this is the mode a freshly opened file starts in.
      RandomAccess(),
      /// Optimised for streaming.
      Streaming(),
  }
  232. struct AudioFileShared {
  233. file_id: FileId,
  234. file_size: usize,
  235. stream_data_rate: usize,
  236. cond: Condvar,
  237. download_status: Mutex<AudioFileDownloadStatus>,
  238. download_strategy: Mutex<DownloadStrategy>,
  239. number_of_open_requests: AtomicUsize,
  240. ping_time_ms: AtomicUsize,
  241. read_position: AtomicUsize,
  242. }
  243. impl AudioFileOpenStreaming {
  244. fn finish(&mut self, size: usize) -> AudioFileStreaming {
  245. let shared = Arc::new(AudioFileShared {
  246. file_id: self.file_id,
  247. file_size: size,
  248. stream_data_rate: self.streaming_data_rate,
  249. cond: Condvar::new(),
  250. download_status: Mutex::new(AudioFileDownloadStatus {
  251. requested: RangeSet::new(),
  252. downloaded: RangeSet::new(),
  253. }),
  254. download_strategy: Mutex::new(DownloadStrategy::RandomAccess()), // start with random access mode until someone tells us otherwise
  255. number_of_open_requests: AtomicUsize::new(0),
  256. ping_time_ms: AtomicUsize::new(0),
  257. read_position: AtomicUsize::new(0),
  258. });
  259. let mut write_file = NamedTempFile::new().unwrap();
  260. write_file.as_file().set_len(size as u64).unwrap();
  261. write_file.seek(SeekFrom::Start(0)).unwrap();
  262. let read_file = write_file.reopen().unwrap();
  263. let initial_data_rx = self.initial_data_rx.take().unwrap();
  264. let initial_data_length = self.initial_data_length.take().unwrap();
  265. let complete_tx = self.complete_tx.take().unwrap();
  266. //let (seek_tx, seek_rx) = mpsc::unbounded();
  267. let (stream_loader_command_tx, stream_loader_command_rx) =
  268. mpsc::unbounded::<StreamLoaderCommand>();
  269. let fetcher = AudioFileFetch::new(
  270. self.session.clone(),
  271. shared.clone(),
  272. initial_data_rx,
  273. self.initial_request_sent_time,
  274. initial_data_length,
  275. write_file,
  276. stream_loader_command_rx,
  277. complete_tx,
  278. );
  279. self.session.spawn(move |_| fetcher);
  280. AudioFileStreaming {
  281. read_file: read_file,
  282. position: 0,
  283. //seek: seek_tx,
  284. stream_loader_command_tx: stream_loader_command_tx,
  285. shared: shared,
  286. }
  287. }
  288. }
  289. impl Future for AudioFileOpen {
  290. type Item = AudioFile;
  291. type Error = ChannelError;
  292. fn poll(&mut self) -> Poll<AudioFile, ChannelError> {
  293. match *self {
  294. AudioFileOpen::Streaming(ref mut open) => {
  295. let file = try_ready!(open.poll());
  296. Ok(Async::Ready(AudioFile::Streaming(file)))
  297. }
  298. AudioFileOpen::Cached(ref mut file) => {
  299. let file = file.take().unwrap();
  300. Ok(Async::Ready(AudioFile::Cached(file)))
  301. }
  302. }
  303. }
  304. }
  305. impl Future for AudioFileOpenStreaming {
  306. type Item = AudioFileStreaming;
  307. type Error = ChannelError;
  308. fn poll(&mut self) -> Poll<AudioFileStreaming, ChannelError> {
  309. loop {
  310. let (id, data) = try_ready!(self.headers.poll()).unwrap();
  311. if id == 0x3 {
  312. let size = BigEndian::read_u32(&data) as usize * 4;
  313. let file = self.finish(size);
  314. return Ok(Async::Ready(file));
  315. }
  316. }
  317. }
  318. }
  319. impl AudioFile {
  320. pub fn open(
  321. session: &Session,
  322. file_id: FileId,
  323. bytes_per_second: usize,
  324. play_from_beginning: bool,
  325. ) -> AudioFileOpen {
  326. let cache = session.cache().cloned();
  327. if let Some(file) = cache.as_ref().and_then(|cache| cache.file(file_id)) {
  328. debug!("File {} already in cache", file_id);
  329. return AudioFileOpen::Cached(Some(file));
  330. }
  331. debug!("Downloading file {}", file_id);
  332. let (complete_tx, complete_rx) = oneshot::channel();
  333. let mut initial_data_length = if play_from_beginning {
  334. INITIAL_DOWNLOAD_SIZE
  335. + max(
  336. (READ_AHEAD_DURING_PLAYBACK_SECONDS * bytes_per_second as f64) as usize,
  337. (INITIAL_PING_TIME_ESTIMATE_SECONDS
  338. * READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS
  339. * bytes_per_second as f64) as usize,
  340. )
  341. } else {
  342. INITIAL_DOWNLOAD_SIZE
  343. };
  344. if initial_data_length % 4 != 0 {
  345. initial_data_length += 4 - (initial_data_length % 4);
  346. }
  347. let (headers, data) = request_range(session, file_id, 0, initial_data_length).split();
  348. let open = AudioFileOpenStreaming {
  349. session: session.clone(),
  350. file_id: file_id,
  351. headers: headers,
  352. initial_data_rx: Some(data),
  353. initial_data_length: Some(initial_data_length),
  354. initial_request_sent_time: Instant::now(),
  355. complete_tx: Some(complete_tx),
  356. streaming_data_rate: bytes_per_second,
  357. };
  358. let session_ = session.clone();
  359. session.spawn(move |_| {
  360. complete_rx
  361. .map(move |mut file| {
  362. if let Some(cache) = session_.cache() {
  363. cache.save_file(file_id, &mut file);
  364. debug!("File {} complete, saving to cache", file_id);
  365. } else {
  366. debug!("File {} complete", file_id);
  367. }
  368. })
  369. .or_else(|oneshot::Canceled| Ok(()))
  370. });
  371. return AudioFileOpen::Streaming(open);
  372. }
  373. pub fn get_stream_loader_controller(&self) -> StreamLoaderController {
  374. match self {
  375. AudioFile::Streaming(ref stream) => {
  376. return StreamLoaderController {
  377. channel_tx: Some(stream.stream_loader_command_tx.clone()),
  378. stream_shared: Some(stream.shared.clone()),
  379. file_size: stream.shared.file_size,
  380. };
  381. }
  382. AudioFile::Cached(ref file) => {
  383. return StreamLoaderController {
  384. channel_tx: None,
  385. stream_shared: None,
  386. file_size: file.metadata().unwrap().len() as usize,
  387. }
  388. }
  389. }
  390. }
  391. }
  392. fn request_range(session: &Session, file: FileId, offset: usize, length: usize) -> Channel {
  393. assert!(
  394. offset % 4 == 0,
  395. "Range request start positions must be aligned by 4 bytes."
  396. );
  397. assert!(
  398. length % 4 == 0,
  399. "Range request range lengths must be aligned by 4 bytes."
  400. );
  401. let start = offset / 4;
  402. let end = (offset + length) / 4;
  403. let (id, channel) = session.channel().allocate();
  404. let mut data: Vec<u8> = Vec::new();
  405. data.write_u16::<BigEndian>(id).unwrap();
  406. data.write_u8(0).unwrap();
  407. data.write_u8(1).unwrap();
  408. data.write_u16::<BigEndian>(0x0000).unwrap();
  409. data.write_u32::<BigEndian>(0x00000000).unwrap();
  410. data.write_u32::<BigEndian>(0x00009C40).unwrap();
  411. data.write_u32::<BigEndian>(0x00020000).unwrap();
  412. data.write(&file.0).unwrap();
  413. data.write_u32::<BigEndian>(start as u32).unwrap();
  414. data.write_u32::<BigEndian>(end as u32).unwrap();
  415. session.send_packet(0x8, data);
  416. channel
  417. }
  418. struct PartialFileData {
  419. offset: usize,
  420. data: Bytes,
  421. }
  422. enum ReceivedData {
  423. ResponseTimeMs(usize),
  424. Data(PartialFileData),
  425. }
  426. struct AudioFileFetchDataReceiver {
  427. shared: Arc<AudioFileShared>,
  428. file_data_tx: mpsc::UnboundedSender<ReceivedData>,
  429. data_rx: ChannelData,
  430. initial_data_offset: usize,
  431. initial_request_length: usize,
  432. data_offset: usize,
  433. request_length: usize,
  434. request_sent_time: Option<Instant>,
  435. measure_ping_time: bool,
  436. }
  437. impl AudioFileFetchDataReceiver {
  438. fn new(
  439. shared: Arc<AudioFileShared>,
  440. file_data_tx: mpsc::UnboundedSender<ReceivedData>,
  441. data_rx: ChannelData,
  442. data_offset: usize,
  443. request_length: usize,
  444. request_sent_time: Instant,
  445. ) -> AudioFileFetchDataReceiver {
  446. let measure_ping_time = shared.number_of_open_requests.load(atomic::Ordering::SeqCst) == 0;
  447. shared
  448. .number_of_open_requests
  449. .fetch_add(1, atomic::Ordering::SeqCst);
  450. AudioFileFetchDataReceiver {
  451. shared: shared,
  452. data_rx: data_rx,
  453. file_data_tx: file_data_tx,
  454. initial_data_offset: data_offset,
  455. initial_request_length: request_length,
  456. data_offset: data_offset,
  457. request_length: request_length,
  458. request_sent_time: Some(request_sent_time),
  459. measure_ping_time: measure_ping_time,
  460. }
  461. }
  462. }
  463. impl AudioFileFetchDataReceiver {
  464. fn finish(&mut self) {
  465. if self.request_length > 0 {
  466. let missing_range = Range::new(self.data_offset, self.request_length);
  467. let mut download_status = self.shared.download_status.lock().unwrap();
  468. download_status.requested.subtract_range(&missing_range);
  469. self.shared.cond.notify_all();
  470. }
  471. self.shared
  472. .number_of_open_requests
  473. .fetch_sub(1, atomic::Ordering::SeqCst);
  474. }
  475. }
  476. impl Future for AudioFileFetchDataReceiver {
  477. type Item = ();
  478. type Error = ();
  479. fn poll(&mut self) -> Poll<(), ()> {
  480. loop {
  481. match self.data_rx.poll() {
  482. Ok(Async::Ready(Some(data))) => {
  483. if self.measure_ping_time {
  484. if let Some(request_sent_time) = self.request_sent_time {
  485. let duration = Instant::now() - request_sent_time;
  486. let duration_ms: u64;
  487. if 0.001 * (duration.as_millis() as f64) > MAXIMUM_ASSUMED_PING_TIME_SECONDS
  488. {
  489. duration_ms = (MAXIMUM_ASSUMED_PING_TIME_SECONDS * 1000.0) as u64;
  490. } else {
  491. duration_ms = duration.as_millis() as u64;
  492. }
  493. let _ = self
  494. .file_data_tx
  495. .unbounded_send(ReceivedData::ResponseTimeMs(duration_ms as usize));
  496. self.measure_ping_time = false;
  497. }
  498. }
  499. let data_size = data.len();
  500. let _ = self
  501. .file_data_tx
  502. .unbounded_send(ReceivedData::Data(PartialFileData {
  503. offset: self.data_offset,
  504. data: data,
  505. }));
  506. self.data_offset += data_size;
  507. if self.request_length < data_size {
  508. warn!("Data receiver for range {} (+{}) received more data from server than requested.", self.initial_data_offset, self.initial_request_length);
  509. self.request_length = 0;
  510. } else {
  511. self.request_length -= data_size;
  512. }
  513. if self.request_length == 0 {
  514. self.finish();
  515. return Ok(Async::Ready(()));
  516. }
  517. }
  518. Ok(Async::Ready(None)) => {
  519. if self.request_length > 0 {
  520. warn!("Data receiver for range {} (+{}) received less data from server than requested.", self.initial_data_offset, self.initial_request_length);
  521. }
  522. self.finish();
  523. return Ok(Async::Ready(()));
  524. }
  525. Ok(Async::NotReady) => {
  526. return Ok(Async::NotReady);
  527. }
  528. Err(ChannelError) => {
  529. warn!(
  530. "Error from channel for data receiver for range {} (+{}).",
  531. self.initial_data_offset, self.initial_request_length
  532. );
  533. self.finish();
  534. return Ok(Async::Ready(()));
  535. }
  536. }
  537. }
  538. }
  539. }
  540. struct AudioFileFetch {
  541. session: Session,
  542. shared: Arc<AudioFileShared>,
  543. output: Option<NamedTempFile>,
  544. file_data_tx: mpsc::UnboundedSender<ReceivedData>,
  545. file_data_rx: mpsc::UnboundedReceiver<ReceivedData>,
  546. stream_loader_command_rx: mpsc::UnboundedReceiver<StreamLoaderCommand>,
  547. complete_tx: Option<oneshot::Sender<NamedTempFile>>,
  548. network_response_times_ms: Vec<usize>,
  549. }
  550. impl AudioFileFetch {
  551. fn new(
  552. session: Session,
  553. shared: Arc<AudioFileShared>,
  554. initial_data_rx: ChannelData,
  555. initial_request_sent_time: Instant,
  556. initial_data_length: usize,
  557. output: NamedTempFile,
  558. stream_loader_command_rx: mpsc::UnboundedReceiver<StreamLoaderCommand>,
  559. complete_tx: oneshot::Sender<NamedTempFile>,
  560. ) -> AudioFileFetch {
  561. let (file_data_tx, file_data_rx) = unbounded::<ReceivedData>();
  562. {
  563. let requested_range = Range::new(0, initial_data_length);
  564. let mut download_status = shared.download_status.lock().unwrap();
  565. download_status.requested.add_range(&requested_range);
  566. }
  567. let initial_data_receiver = AudioFileFetchDataReceiver::new(
  568. shared.clone(),
  569. file_data_tx.clone(),
  570. initial_data_rx,
  571. 0,
  572. initial_data_length,
  573. initial_request_sent_time,
  574. );
  575. session.spawn(move |_| initial_data_receiver);
  576. AudioFileFetch {
  577. session: session,
  578. shared: shared,
  579. output: Some(output),
  580. file_data_tx: file_data_tx,
  581. file_data_rx: file_data_rx,
  582. stream_loader_command_rx: stream_loader_command_rx,
  583. complete_tx: Some(complete_tx),
  584. network_response_times_ms: Vec::new(),
  585. }
  586. }
  587. fn get_download_strategy(&mut self) -> DownloadStrategy {
  588. *(self.shared.download_strategy.lock().unwrap())
  589. }
  590. fn download_range(&mut self, mut offset: usize, mut length: usize) {
  591. if length < MINIMUM_DOWNLOAD_SIZE {
  592. length = MINIMUM_DOWNLOAD_SIZE;
  593. }
  594. // ensure the values are within the bounds and align them by 4 for the spotify protocol.
  595. if offset >= self.shared.file_size {
  596. return;
  597. }
  598. if length <= 0 {
  599. return;
  600. }
  601. if offset + length > self.shared.file_size {
  602. length = self.shared.file_size - offset;
  603. }
  604. if offset % 4 != 0 {
  605. length += offset % 4;
  606. offset -= offset % 4;
  607. }
  608. if length % 4 != 0 {
  609. length += 4 - (length % 4);
  610. }
  611. let mut ranges_to_request = RangeSet::new();
  612. ranges_to_request.add_range(&Range::new(offset, length));
  613. let mut download_status = self.shared.download_status.lock().unwrap();
  614. ranges_to_request.subtract_range_set(&download_status.downloaded);
  615. ranges_to_request.subtract_range_set(&download_status.requested);
  616. for range in ranges_to_request.iter() {
  617. let (_headers, data) =
  618. request_range(&self.session, self.shared.file_id, range.start, range.length).split();
  619. download_status.requested.add_range(range);
  620. let receiver = AudioFileFetchDataReceiver::new(
  621. self.shared.clone(),
  622. self.file_data_tx.clone(),
  623. data,
  624. range.start,
  625. range.length,
  626. Instant::now(),
  627. );
  628. self.session.spawn(move |_| receiver);
  629. }
  630. }
  631. fn pre_fetch_more_data(&mut self, bytes: usize, max_requests_to_send: usize) {
  632. let mut bytes_to_go = bytes;
  633. let mut requests_to_go = max_requests_to_send;
  634. while bytes_to_go > 0 && requests_to_go > 0 {
  635. // determine what is still missing
  636. let mut missing_data = RangeSet::new();
  637. missing_data.add_range(&Range::new(0, self.shared.file_size));
  638. {
  639. let download_status = self.shared.download_status.lock().unwrap();
  640. missing_data.subtract_range_set(&download_status.downloaded);
  641. missing_data.subtract_range_set(&download_status.requested);
  642. }
  643. // download data from after the current read position first
  644. let mut tail_end = RangeSet::new();
  645. let read_position = self.shared.read_position.load(atomic::Ordering::Relaxed);
  646. tail_end.add_range(&Range::new(read_position, self.shared.file_size - read_position));
  647. let tail_end = tail_end.intersection(&missing_data);
  648. if !tail_end.is_empty() {
  649. let range = tail_end.get_range(0);
  650. let offset = range.start;
  651. let length = min(range.length, bytes_to_go);
  652. self.download_range(offset, length);
  653. requests_to_go -= 1;
  654. bytes_to_go -= length;
  655. } else if !missing_data.is_empty() {
  656. // ok, the tail is downloaded, download something fom the beginning.
  657. let range = missing_data.get_range(0);
  658. let offset = range.start;
  659. let length = min(range.length, bytes_to_go);
  660. self.download_range(offset, length);
  661. requests_to_go -= 1;
  662. bytes_to_go -= length;
  663. } else {
  664. return;
  665. }
  666. }
  667. }
  668. fn poll_file_data_rx(&mut self) -> Poll<(), ()> {
  669. loop {
  670. match self.file_data_rx.poll() {
  671. Ok(Async::Ready(None)) => {
  672. return Ok(Async::Ready(()));
  673. }
  674. Ok(Async::Ready(Some(ReceivedData::ResponseTimeMs(response_time_ms)))) => {
  675. trace!("Ping time estimated as: {} ms.", response_time_ms);
  676. // record the response time
  677. self.network_response_times_ms.push(response_time_ms);
  678. // prune old response times. Keep at most three.
  679. while self.network_response_times_ms.len() > 3 {
  680. self.network_response_times_ms.remove(0);
  681. }
  682. // stats::median is experimental. So we calculate the median of up to three ourselves.
  683. let ping_time_ms: usize = match self.network_response_times_ms.len() {
  684. 1 => self.network_response_times_ms[0] as usize,
  685. 2 => {
  686. ((self.network_response_times_ms[0] + self.network_response_times_ms[1]) / 2)
  687. as usize
  688. }
  689. 3 => {
  690. let mut times = self.network_response_times_ms.clone();
  691. times.sort();
  692. times[1]
  693. }
  694. _ => unreachable!(),
  695. };
  696. // store our new estimate for everyone to see
  697. self.shared
  698. .ping_time_ms
  699. .store(ping_time_ms, atomic::Ordering::Relaxed);
  700. }
  701. Ok(Async::Ready(Some(ReceivedData::Data(data)))) => {
  702. self.output
  703. .as_mut()
  704. .unwrap()
  705. .seek(SeekFrom::Start(data.offset as u64))
  706. .unwrap();
  707. self.output
  708. .as_mut()
  709. .unwrap()
  710. .write_all(data.data.as_ref())
  711. .unwrap();
  712. let mut full = false;
  713. {
  714. let mut download_status = self.shared.download_status.lock().unwrap();
  715. let received_range = Range::new(data.offset, data.data.len());
  716. download_status.downloaded.add_range(&received_range);
  717. self.shared.cond.notify_all();
  718. if download_status.downloaded.contained_length_from_value(0)
  719. >= self.shared.file_size
  720. {
  721. full = true;
  722. }
  723. drop(download_status);
  724. }
  725. if full {
  726. self.finish();
  727. return Ok(Async::Ready(()));
  728. }
  729. }
  730. Ok(Async::NotReady) => {
  731. return Ok(Async::NotReady);
  732. }
  733. Err(()) => unreachable!(),
  734. }
  735. }
  736. }
  737. fn poll_stream_loader_command_rx(&mut self) -> Poll<(), ()> {
  738. loop {
  739. match self.stream_loader_command_rx.poll() {
  740. Ok(Async::Ready(None)) => {
  741. return Ok(Async::Ready(()));
  742. }
  743. Ok(Async::Ready(Some(StreamLoaderCommand::Fetch(request)))) => {
  744. self.download_range(request.start, request.length);
  745. }
  746. Ok(Async::Ready(Some(StreamLoaderCommand::RandomAccessMode()))) => {
  747. *(self.shared.download_strategy.lock().unwrap()) = DownloadStrategy::RandomAccess();
  748. }
  749. Ok(Async::Ready(Some(StreamLoaderCommand::StreamMode()))) => {
  750. *(self.shared.download_strategy.lock().unwrap()) = DownloadStrategy::Streaming();
  751. }
  752. Ok(Async::Ready(Some(StreamLoaderCommand::Close()))) => {
  753. return Ok(Async::Ready(()));
  754. }
  755. Ok(Async::NotReady) => return Ok(Async::NotReady),
  756. Err(()) => unreachable!(),
  757. }
  758. }
  759. }
  760. fn finish(&mut self) {
  761. let mut output = self.output.take().unwrap();
  762. let complete_tx = self.complete_tx.take().unwrap();
  763. output.seek(SeekFrom::Start(0)).unwrap();
  764. let _ = complete_tx.send(output);
  765. }
  766. }
  767. impl Future for AudioFileFetch {
  768. type Item = ();
  769. type Error = ();
  770. fn poll(&mut self) -> Poll<(), ()> {
  771. match self.poll_stream_loader_command_rx() {
  772. Ok(Async::NotReady) => (),
  773. Ok(Async::Ready(_)) => {
  774. return Ok(Async::Ready(()));
  775. }
  776. Err(()) => unreachable!(),
  777. }
  778. match self.poll_file_data_rx() {
  779. Ok(Async::NotReady) => (),
  780. Ok(Async::Ready(_)) => {
  781. return Ok(Async::Ready(()));
  782. }
  783. Err(()) => unreachable!(),
  784. }
  785. if let DownloadStrategy::Streaming() = self.get_download_strategy() {
  786. let number_of_open_requests =
  787. self.shared.number_of_open_requests.load(atomic::Ordering::SeqCst);
  788. let max_requests_to_send =
  789. MAX_PREFETCH_REQUESTS - min(MAX_PREFETCH_REQUESTS, number_of_open_requests);
  790. if max_requests_to_send > 0 {
  791. let bytes_pending: usize = {
  792. let download_status = self.shared.download_status.lock().unwrap();
  793. download_status.requested.minus(&download_status.downloaded).len()
  794. };
  795. let ping_time_seconds =
  796. 0.001 * self.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64;
  797. let download_rate = self.session.channel().get_download_rate_estimate();
  798. let desired_pending_bytes = max(
  799. (PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * self.shared.stream_data_rate as f64)
  800. as usize,
  801. (FAST_PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * download_rate as f64) as usize,
  802. );
  803. if bytes_pending < desired_pending_bytes {
  804. self.pre_fetch_more_data(
  805. desired_pending_bytes - bytes_pending,
  806. max_requests_to_send,
  807. );
  808. }
  809. }
  810. }
  811. return Ok(Async::NotReady);
  812. }
  813. }
  814. impl Read for AudioFileStreaming {
  815. fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
  816. let offset = self.position as usize;
  817. if offset >= self.shared.file_size {
  818. return Ok(0);
  819. }
  820. let length = min(output.len(), self.shared.file_size - offset);
  821. let length_to_request = match *(self.shared.download_strategy.lock().unwrap()) {
  822. DownloadStrategy::RandomAccess() => length,
  823. DownloadStrategy::Streaming() => {
  824. // Due to the read-ahead stuff, we potentially request more than the actual reqeust demanded.
  825. let ping_time_seconds =
  826. 0.0001 * self.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64;
  827. let length_to_request = length
  828. + max(
  829. (READ_AHEAD_DURING_PLAYBACK_SECONDS * self.shared.stream_data_rate as f64)
  830. as usize,
  831. (READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS
  832. * ping_time_seconds
  833. * self.shared.stream_data_rate as f64) as usize,
  834. );
  835. min(length_to_request, self.shared.file_size - offset)
  836. }
  837. };
  838. let mut ranges_to_request = RangeSet::new();
  839. ranges_to_request.add_range(&Range::new(offset, length_to_request));
  840. let mut download_status = self.shared.download_status.lock().unwrap();
  841. ranges_to_request.subtract_range_set(&download_status.downloaded);
  842. ranges_to_request.subtract_range_set(&download_status.requested);
  843. for range in ranges_to_request.iter() {
  844. self.stream_loader_command_tx
  845. .unbounded_send(StreamLoaderCommand::Fetch(range.clone()))
  846. .unwrap();
  847. }
  848. if length == 0 {
  849. return Ok(0);
  850. }
  851. let mut download_message_printed = false;
  852. while !download_status.downloaded.contains(offset) {
  853. if let DownloadStrategy::Streaming() = *self.shared.download_strategy.lock().unwrap() {
  854. if !download_message_printed {
  855. debug!("Stream waiting for download of file position {}. Downloaded ranges: {}. Pending ranges: {}", offset, download_status.downloaded, download_status.requested.minus(&download_status.downloaded));
  856. download_message_printed = true;
  857. }
  858. }
  859. download_status = self
  860. .shared
  861. .cond
  862. .wait_timeout(download_status, Duration::from_millis(1000))
  863. .unwrap()
  864. .0;
  865. }
  866. let available_length = download_status.downloaded.contained_length_from_value(offset);
  867. assert!(available_length > 0);
  868. drop(download_status);
  869. self.position = self.read_file.seek(SeekFrom::Start(offset as u64)).unwrap();
  870. let read_len = min(length, available_length);
  871. let read_len = self.read_file.read(&mut output[..read_len])?;
  872. if download_message_printed {
  873. debug!(
  874. "Read at postion {} completed. {} bytes returned, {} bytes were requested.",
  875. offset,
  876. read_len,
  877. output.len()
  878. );
  879. }
  880. self.position += read_len as u64;
  881. self.shared
  882. .read_position
  883. .store(self.position as usize, atomic::Ordering::Relaxed);
  884. return Ok(read_len);
  885. }
  886. }
  887. impl Seek for AudioFileStreaming {
  888. fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
  889. self.position = self.read_file.seek(pos)?;
  890. // Do not seek past EOF
  891. self.shared
  892. .read_position
  893. .store(self.position as usize, atomic::Ordering::Relaxed);
  894. Ok(self.position)
  895. }
  896. }
  897. impl Read for AudioFile {
  898. fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
  899. match *self {
  900. AudioFile::Cached(ref mut file) => file.read(output),
  901. AudioFile::Streaming(ref mut file) => file.read(output),
  902. }
  903. }
  904. }
  905. impl Seek for AudioFile {
  906. fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
  907. match *self {
  908. AudioFile::Cached(ref mut file) => file.seek(pos),
  909. AudioFile::Streaming(ref mut file) => file.seek(pos),
  910. }
  911. }
  912. }