Procházet zdrojové kódy

Working proof of concept with up to date gstreamer bindings and Rust 2018.

Sean McNamara před 5 roky
rodič
revize
ed04c049cc
1 změnil soubory, kde provedl 11 přidání a 40 odebrání
  1. 11 40
      playback/src/audio_backend/gstreamer.rs

+ 11 - 40
playback/src/audio_backend/gstreamer.rs

@@ -8,7 +8,7 @@ use glib::MainLoop;
 use zerocopy::*;
 
 pub struct GstreamerSink {
-    tx: SyncSender<Vec<i16>>,
+    tx: SyncSender<Vec<u8>>,
     pipeline: gst::Pipeline
 }
 
@@ -21,6 +21,7 @@ impl Open for GstreamerSink {
             Some(x) => format!("{}{}", pipeline_str_preamble, x),
             None => format!("{}{}", pipeline_str_preamble, pipeline_str_rest)
         };
+        println!("Pipeline: {}", pipeline_str);
 
         gst::init().unwrap();
         let pipelinee = gst::parse_launch(&*pipeline_str).expect("New Pipeline error");
@@ -29,47 +30,20 @@ impl Open for GstreamerSink {
         let mut mainloop = glib::MainLoop::new(None, false);
         let mut appsrce : gst::Element = pipeline.get_by_name("appsrc0").expect("Couldn't get appsrc from pipeline");
         let mut appsrc : gst_app::AppSrc = appsrce.dynamic_cast::<gst_app::AppSrc>().expect("Couldnt cast AppSrc element at runtime!");
-        //let mut appsrc = gst_app::AppSrc::new_from_element(appsrc_element.to_element());
         let bufferpool = gst::BufferPool::new();
         let appsrc_caps = appsrc.get_caps().expect("get appsrc caps failed");
         let mut conf = bufferpool.get_config();
-        conf.set_params(Some(&appsrc_caps), 2048 * 2, 0, 0);
-        if bufferpool.set_active(true).is_err(){
-            panic!("Couldn't activate buffer pool");
-        }
-
-        /*
-        thread::spawn(move || {
-        let bus_receiver = bus.receiver();
-            for message in bus_receiver.iter() {
-                match message.parse() {
-                    gst::message::StateChanged(x) =>
-                        println!("element `{}` state changed", message.src_name()),
-                    gst::message::Error(x) => {
-                        println!("error msg from element `{}`: {}, quitting", message.src_name(), error.message());
-                        break;
-                    },
-                    gst::message::Eos(ref _msg) => {
-                        println!("eos received; quitting");
-                        break;
-                    },
-                    _ =>
-                        println!("Pipe message: {} from {} at {}", message.type_name(), message.src_name(), message.timestamp())
-                }
-            }
-        });*/
+        conf.set_params(Some(&appsrc_caps), 8192, 0, 0);
+        bufferpool.set_config(conf).expect("Couldn't configure the buffer pool");
+        bufferpool.set_active(true).expect("Couldn't activate buffer pool");
 
-        let (tx, rx) = sync_channel::<Vec<i16>>(64);
+        let (tx, rx) = sync_channel::<Vec<u8>>(128);
         thread::spawn(move || {
             for data in rx {
                 let mut buffer = bufferpool.acquire_buffer(None).expect("acquire buffer");
-
-                //assert!(data.len() <= buffer.len::<i16>());
                 let mutbuf = buffer.make_mut();
-                mutbuf.set_size(data.len() * 2);
-                mutbuf.map_writable().unwrap().as_mut_slice().clone_from_slice(&data[..].as_bytes());
-
-                //buffer.set_live(true);
+                mutbuf.set_size(data.len());
+                mutbuf.copy_from_slice(0, data.as_bytes());
                 let res = appsrc.push_buffer(buffer).expect("Failed to push buffer");
             }
         });
@@ -110,20 +84,17 @@ impl Open for GstreamerSink {
 
 impl Sink for GstreamerSink {
     fn start(&mut self) -> io::Result<()> {
-        //self.pipeline.play();
         self.pipeline.set_state(gst::State::Playing).expect("Unable to set the pipeline to the `Playing` state");
         Ok(())
     }
     fn stop(&mut self) -> io::Result<()> {
-        //self.pipeline.pause();
         self.pipeline.set_state(gst::State::Paused).expect("Unable to set the pipeline to the `Paused` state");
         Ok(())
     }
     fn write(&mut self, data: &[i16]) -> io::Result<()> {
-        // Copy expensively to avoid thread synchronization
-        let data = data.to_vec();
-        self.tx.send(data).expect("tx send failed in write function");
-
+        // Copy expensively (in to_vec()) to avoid thread synchronization
+        let deighta : &[u8] = data.as_bytes();
+        self.tx.send(deighta.to_vec()).expect("tx send failed in write function");
         Ok(())
     }
 }