1use std::{
2 collections::HashMap,
3 mem,
4 sync::{atomic, Arc, Mutex, MutexGuard},
5};
6
7use gst::{glib, prelude::*};
8use std::sync::LazyLock;
9use thiserror::Error;
10
/// 64-bit counter that can be shared between threads on every target.
///
/// On targets with native 64-bit atomics this wraps [`atomic::AtomicU64`];
/// on all other targets the value is protected by a [`Mutex`] instead. Both
/// variants expose the same `new` / `fetch_add` / `store` / `load` methods.
#[derive(Debug)]
struct WrappedAtomicU64 {
    #[cfg(not(target_has_atomic = "64"))]
    atomic: Mutex<u64>,
    #[cfg(target_has_atomic = "64")]
    atomic: atomic::AtomicU64,
}

#[cfg(target_has_atomic = "64")]
impl WrappedAtomicU64 {
    /// Creates a counter initialised to `value`.
    fn new(value: u64) -> WrappedAtomicU64 {
        WrappedAtomicU64 {
            atomic: atomic::AtomicU64::new(value),
        }
    }

    /// Adds `value` (wrapping on overflow) and returns the previous value.
    fn fetch_add(&self, value: u64, order: atomic::Ordering) -> u64 {
        self.atomic.fetch_add(value, order)
    }

    /// Overwrites the counter with `value`.
    fn store(&self, value: u64, order: atomic::Ordering) {
        self.atomic.store(value, order);
    }

    /// Returns the current value.
    fn load(&self, order: atomic::Ordering) -> u64 {
        self.atomic.load(order)
    }
}

#[cfg(not(target_has_atomic = "64"))]
impl WrappedAtomicU64 {
    /// Creates a counter initialised to `value`.
    fn new(value: u64) -> WrappedAtomicU64 {
        WrappedAtomicU64 {
            atomic: Mutex::new(value),
        }
    }

    /// Adds `value` (wrapping on overflow) and returns the previous value.
    ///
    /// The ordering argument is ignored: the mutex already serialises access.
    fn fetch_add(&self, value: u64, _order: atomic::Ordering) -> u64 {
        let mut guard = self.atomic.lock().unwrap();
        let old = *guard;
        // Wrap instead of using `+=`, which panics on overflow in debug
        // builds, so that both variants behave like `AtomicU64::fetch_add`.
        *guard = old.wrapping_add(value);
        old
    }

    /// Overwrites the counter with `value`; the ordering argument is ignored.
    fn store(&self, value: u64, _order: atomic::Ordering) {
        *self.atomic.lock().unwrap() = value;
    }

    /// Returns the current value; the ordering argument is ignored.
    fn load(&self, _order: atomic::Ordering) -> u64 {
        *self.atomic.lock().unwrap()
    }
}
61
62static CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| {
63 gst::DebugCategory::new(
64 "utilsrs-stream-producer",
65 gst::DebugColorFlags::empty(),
66 Some("gst_app Stream Producer interface"),
67 )
68});
69
/// Handle around an `appsink` together with the consumers (`appsrc` elements)
/// registered on it.
///
/// Clones share the same consumer registry through the inner [`Arc`].
#[derive(Debug, Clone)]
pub struct StreamProducer {
    /// The `appsink` this producer wraps.
    appsink: gst_app::AppSink,
    /// Registered consumers plus forwarding settings, shared between clones.
    consumers: Arc<Mutex<StreamConsumers>>,
}
82
83impl PartialEq for StreamProducer {
84 fn eq(&self, other: &Self) -> bool {
85 self.appsink.eq(&other.appsink)
86 }
87}
88
89impl Eq for StreamProducer {}
90
/// Link between a consumer `appsrc` and, optionally, the [`StreamProducer`]
/// feeding it.
///
/// Dropping the link disconnects the consumer from its producer. The counters
/// and flags are shared (via [`Arc`]) with the producer-side bookkeeping, so
/// they can be read and changed while samples are flowing.
#[derive(Debug)]
#[must_use]
pub struct ConsumptionLink {
    /// The `appsrc` receiving samples.
    consumer: gst_app::AppSrc,
    /// Producer the consumer is currently registered on; `None` while disconnected.
    producer: Option<StreamProducer>,
    /// Number of samples that were not delivered to this consumer.
    dropped: Arc<WrappedAtomicU64>,
    /// Number of samples handed to this consumer.
    pushed: Arc<WrappedAtomicU64>,
    /// While `true`, the producer skips this consumer when fanning out samples.
    discard: Arc<atomic::AtomicBool>,
    /// Whether the consumer has to (re)start on a keyframe: initially, after a
    /// discontinuity, and after the `appsrc` signalled it has enough data.
    wait_for_keyframe: Arc<atomic::AtomicBool>,
}
107
108impl ConsumptionLink {
109 pub fn disconnected(consumer: gst_app::AppSrc) -> ConsumptionLink {
111 ConsumptionLink {
112 consumer,
113 producer: None,
114 dropped: Arc::new(WrappedAtomicU64::new(0)),
115 pushed: Arc::new(WrappedAtomicU64::new(0)),
116 discard: Arc::new(atomic::AtomicBool::new(false)),
117 wait_for_keyframe: Arc::new(atomic::AtomicBool::new(true)),
118 }
119 }
120
121 pub fn change_producer(
123 &mut self,
124 new_producer: &StreamProducer,
125 reset_stats: bool,
126 ) -> Result<(), AddConsumerError> {
127 self.disconnect();
128 if reset_stats {
129 self.dropped.store(0, atomic::Ordering::SeqCst);
130 self.pushed.store(0, atomic::Ordering::SeqCst);
131 }
132 new_producer.add_consumer_internal(
133 &self.consumer,
134 self.dropped.clone(),
135 self.pushed.clone(),
136 self.discard.clone(),
137 self.wait_for_keyframe.clone(),
138 )?;
139 self.producer = Some(new_producer.clone());
140 Ok(())
141 }
142
143 pub fn disconnect(&mut self) {
145 if let Some(producer) = self.producer.take() {
146 producer.remove_consumer(&self.consumer);
147 }
148 }
149
150 pub fn dropped(&self) -> u64 {
152 self.dropped.load(atomic::Ordering::SeqCst)
153 }
154
155 pub fn pushed(&self) -> u64 {
157 self.pushed.load(atomic::Ordering::SeqCst)
158 }
159
160 pub fn discard(&self) -> bool {
162 self.discard.load(atomic::Ordering::SeqCst)
163 }
164
165 pub fn set_discard(&self, discard: bool) {
167 self.discard.store(discard, atomic::Ordering::SeqCst)
168 }
169
170 pub fn wait_for_keyframe(&self) -> bool {
172 self.wait_for_keyframe.load(atomic::Ordering::SeqCst)
173 }
174
175 pub fn set_wait_for_keyframe(&self, wait: bool) {
178 self.wait_for_keyframe.store(wait, atomic::Ordering::SeqCst)
179 }
180
181 pub fn appsrc(&self) -> &gst_app::AppSrc {
183 &self.consumer
184 }
185}
186
187impl Drop for ConsumptionLink {
188 fn drop(&mut self) {
189 self.disconnect();
190 }
191}
192
/// Error returned when registering a consumer on a [`StreamProducer`] fails.
#[derive(Debug, Error)]
pub enum AddConsumerError {
    /// The consumer is already registered on that producer.
    #[error("Consumer already added")]
    AlreadyAdded,
}
200
201impl StreamProducer {
202 pub fn configure_consumer(consumer: &gst_app::AppSrc) {
206 consumer.set_latency(gst::ClockTime::ZERO, gst::ClockTime::NONE);
209 consumer.set_format(gst::Format::Time);
210 consumer.set_is_live(true);
211 consumer.set_handle_segment_change(true);
212 consumer.set_max_buffers(0);
213 consumer.set_max_bytes(0);
214 consumer.set_max_time(500 * gst::ClockTime::MSECOND);
215 consumer.set_leaky_type(gst_app::AppLeakyType::Downstream);
216 consumer.set_automatic_eos(false);
217 }
218
219 pub fn add_consumer(
223 &self,
224 consumer: &gst_app::AppSrc,
225 ) -> Result<ConsumptionLink, AddConsumerError> {
226 let dropped = Arc::new(WrappedAtomicU64::new(0));
227 let pushed = Arc::new(WrappedAtomicU64::new(0));
228 let discard = Arc::new(atomic::AtomicBool::new(false));
229 let wait_for_keyframe = Arc::new(atomic::AtomicBool::new(true));
230
231 self.add_consumer_internal(
232 consumer,
233 dropped.clone(),
234 pushed.clone(),
235 discard.clone(),
236 wait_for_keyframe.clone(),
237 )?;
238
239 Ok(ConsumptionLink {
240 consumer: consumer.clone(),
241 producer: Some(self.clone()),
242 dropped,
243 pushed,
244 discard,
245 wait_for_keyframe,
246 })
247 }
248
    /// Registers `consumer` in the registry, wiring it up with the shared
    /// counters and flags passed in.
    ///
    /// Steps, in order:
    /// 1. reject consumers that are already registered,
    /// 2. apply [`Self::configure_consumer`],
    /// 3. install a probe on the consumer's `src` pad that relays upstream
    ///    force-key-unit events to the producer's `sink` pad,
    /// 4. store the new `StreamConsumer`,
    /// 5. replay the producer's sticky events whose type is in
    ///    `events_to_forward` to the consumer.
    ///
    /// # Errors
    ///
    /// Returns [`AddConsumerError::AlreadyAdded`] when `consumer` is already
    /// in the registry.
    ///
    /// # Panics
    ///
    /// Panics if the registry mutex is poisoned, if the consumer has no `src`
    /// pad or the `appsink` no `sink` pad, or if the probe cannot be added.
    fn add_consumer_internal(
        &self,
        consumer: &gst_app::AppSrc,
        dropped: Arc<WrappedAtomicU64>,
        pushed: Arc<WrappedAtomicU64>,
        discard: Arc<atomic::AtomicBool>,
        wait_for_keyframe: Arc<atomic::AtomicBool>,
    ) -> Result<(), AddConsumerError> {
        let mut consumers = self.consumers.lock().unwrap();
        if consumers.consumers.contains_key(consumer) {
            gst::error!(
                CAT,
                obj = &self.appsink,
                "Consumer {} ({:?}) already added",
                consumer.name(),
                consumer
            );
            return Err(AddConsumerError::AlreadyAdded);
        }

        gst::debug!(
            CAT,
            obj = &self.appsink,
            "Adding consumer {} ({:?})",
            consumer.name(),
            consumer
        );

        Self::configure_consumer(consumer);

        // Relay force-key-unit requests travelling upstream through the
        // consumer to the producer's sink pad.
        let srcpad = consumer.static_pad("src").unwrap();
        let appsink = &self.appsink;
        let fku_probe_id = srcpad
            .add_probe(
                gst::PadProbeType::EVENT_UPSTREAM,
                glib::clone!(
                    // Only a weak reference to the appsink is captured by the probe.
                    #[weak]
                    appsink,
                    #[upgrade_or_panic]
                    move |_pad, info| {
                        let Some(event) = info.event() else {
                            return gst::PadProbeReturn::Ok;
                        };

                        if gst_video::UpstreamForceKeyUnitEvent::parse(event).is_ok() {
                            gst::debug!(CAT, obj = &appsink, "Requesting keyframe");
                            let pad = appsink.static_pad("sink").unwrap();
                            // Best effort: the result of pushing the event is ignored.
                            let _ = pad.push_event(event.clone());
                        }

                        gst::PadProbeReturn::Ok
                    }
                ),
            )
            .unwrap();

        let stream_consumer = StreamConsumer::new(
            consumer,
            fku_probe_id,
            dropped,
            pushed,
            discard,
            wait_for_keyframe,
        );

        consumers
            .consumers
            .insert(consumer.clone(), stream_consumer);

        // Copy the event filter and release the registry lock before sending
        // events to the consumer.
        let events_to_forward = consumers.events_to_forward.clone();
        drop(consumers);

        // Replay the matching sticky events of the producer's sink pad to the
        // consumer; every sticky event is kept on the pad.
        let appsink_pad = self.appsink.static_pad("sink").unwrap();
        appsink_pad.sticky_events_foreach(|event| {
            if events_to_forward.contains(&event.type_()) {
                gst::debug!(CAT, obj = &self.appsink, "forward sticky event {:?}", event);
                consumer.send_event(event.clone());
            }

            std::ops::ControlFlow::Continue(gst::EventForeachAction::Keep)
        });

        Ok(())
    }
338
339 fn process_sample(
340 sample: gst::Sample,
341 appsink: &gst_app::AppSink,
342 mut consumers: MutexGuard<StreamConsumers>,
343 ) -> Result<gst::FlowSuccess, gst::FlowError> {
344 let (is_discont, is_keyframe) = if let Some(buf) = sample.buffer() {
345 let flags = buf.flags();
346
347 (
348 flags.contains(gst::BufferFlags::DISCONT),
349 !flags.contains(gst::BufferFlags::DELTA_UNIT),
350 )
351 } else {
352 (false, true)
353 };
354
355 gst::trace!(
356 CAT,
357 obj = appsink,
358 "processing sample {:?}",
359 sample.buffer()
360 );
361
362 let latency = consumers.current_latency;
363 let latency_updated = mem::replace(&mut consumers.latency_updated, false);
364
365 let mut needs_keyframe_request = false;
366
367 let current_consumers = consumers
368 .consumers
369 .values()
370 .filter_map(|consumer| {
371 if let Some(latency) = latency {
372 if consumer
373 .forwarded_latency
374 .compare_exchange(
375 false,
376 true,
377 atomic::Ordering::SeqCst,
378 atomic::Ordering::SeqCst,
379 )
380 .is_ok()
381 || latency_updated
382 {
383 gst::info!(CAT, obj = appsink, "setting new latency: {latency}");
384 consumer.appsrc.set_latency(latency, gst::ClockTime::NONE);
385 }
386 }
387
388 if consumer.discard.load(atomic::Ordering::SeqCst) {
389 consumer
390 .needs_keyframe
391 .store(false, atomic::Ordering::SeqCst);
392 return None;
393 }
394
395 if is_discont
396 && !is_keyframe
397 && consumer.wait_for_keyframe.load(atomic::Ordering::SeqCst)
398 {
399 consumer
401 .needs_keyframe
402 .store(true, atomic::Ordering::SeqCst);
403 }
404
405 if !is_keyframe && consumer.needs_keyframe.load(atomic::Ordering::SeqCst) {
406 if !needs_keyframe_request {
408 gst::debug!(CAT, obj = appsink, "Requesting keyframe for first buffer");
409 needs_keyframe_request = true;
410 }
411
412 consumer.dropped.fetch_add(1, atomic::Ordering::SeqCst);
413
414 gst::error!(
415 CAT,
416 obj = appsink,
417 "Ignoring frame for {} while waiting for a keyframe",
418 consumer.appsrc.name()
419 );
420 None
421 } else {
422 consumer
423 .needs_keyframe
424 .store(false, atomic::Ordering::SeqCst);
425 consumer.pushed.fetch_add(1, atomic::Ordering::SeqCst);
426
427 Some(consumer.appsrc.clone())
428 }
429 })
430 .collect::<Vec<_>>();
431
432 drop(consumers);
433
434 if needs_keyframe_request {
435 let pad = appsink.static_pad("sink").unwrap();
437 pad.push_event(
438 gst_video::UpstreamForceKeyUnitEvent::builder()
439 .all_headers(true)
440 .build(),
441 );
442 }
443
444 for consumer in current_consumers {
445 if let Err(err) = consumer.push_sample(&sample) {
446 gst::warning!(CAT, obj = appsink, "Failed to push sample: {}", err);
447 }
448 }
449 Ok(gst::FlowSuccess::Ok)
450 }
451
452 pub fn remove_consumer(&self, consumer: &gst_app::AppSrc) {
454 let name = consumer.name();
455 if self
456 .consumers
457 .lock()
458 .unwrap()
459 .consumers
460 .remove(consumer)
461 .is_some()
462 {
463 gst::debug!(
464 CAT,
465 obj = &self.appsink,
466 "Removed consumer {} ({:?})",
467 name,
468 consumer
469 );
470 consumer.set_callbacks(gst_app::AppSrcCallbacks::builder().build());
471 } else {
472 gst::debug!(
473 CAT,
474 obj = &self.appsink,
475 "Consumer {} ({:?}) not found",
476 name,
477 consumer
478 );
479 }
480 }
481
482 pub fn set_forward_events(&self, events_to_forward: impl IntoIterator<Item = gst::EventType>) {
484 self.consumers.lock().unwrap().events_to_forward = events_to_forward.into_iter().collect();
485 }
486
487 pub fn get_forwarded_events(&self) -> Vec<gst::EventType> {
489 self.consumers.lock().unwrap().events_to_forward.clone()
490 }
491
492 pub fn set_forward_preroll(&self, forward_preroll: bool) {
494 self.consumers.lock().unwrap().forward_preroll = forward_preroll;
495 }
496
    /// Returns the `appsink` this producer wraps.
    pub fn appsink(&self) -> &gst_app::AppSink {
        &self.appsink
    }
501
502 pub fn error(&self, error: &gst::glib::Error, debug: Option<&str>) {
504 let consumers = self.consumers.lock().unwrap();
505
506 for consumer in consumers.consumers.keys() {
507 let mut msg_builder =
508 gst::message::Error::builder_from_error(error.clone()).src(consumer);
509 if let Some(debug) = debug {
510 msg_builder = msg_builder.debug(debug);
511 }
512
513 let _ = consumer.post_message(msg_builder.build());
514 }
515 }
516
517 pub fn last_sample(&self) -> Option<gst::Sample> {
519 self.appsink.property("last-sample")
520 }
521}
522
/// Builds a [`StreamProducer`] around an existing `appsink`.
///
/// The conversion installs the `new_sample`, `new_preroll`, `new_event` and
/// `eos` callbacks on the `appsink`, plus an upstream-event probe on its
/// `sink` pad that records latency events. By default `Eos` and `Gap` events
/// and preroll samples are forwarded to consumers.
///
/// Panics if the `appsink` has no `sink` pad.
impl<'a> From<&'a gst_app::AppSink> for StreamProducer {
    fn from(appsink: &'a gst_app::AppSink) -> Self {
        let consumers = Arc::new(Mutex::new(StreamConsumers {
            current_latency: None,
            latency_updated: false,
            consumers: HashMap::new(),
            events_to_forward: vec![gst::EventType::Eos, gst::EventType::Gap],
            forward_preroll: true,
            just_forwarded_preroll: false,
        }));

        appsink.set_callbacks(
            gst_app::AppSinkCallbacks::builder()
                .new_sample(glib::clone!(
                    #[strong]
                    consumers,
                    move |appsink| {
                        let mut consumers = consumers.lock().unwrap();

                        let sample = match appsink.pull_sample() {
                            Ok(sample) => sample,
                            Err(_err) => {
                                gst::debug!(CAT, obj = appsink, "Failed to pull sample");
                                return Err(gst::FlowError::Flushing);
                            }
                        };

                        // Skip the first sample after a forwarded preroll and
                        // reset the flag so later samples are processed.
                        let just_forwarded_preroll =
                            mem::replace(&mut consumers.just_forwarded_preroll, false);

                        if just_forwarded_preroll {
                            return Ok(gst::FlowSuccess::Ok);
                        }

                        StreamProducer::process_sample(sample, appsink, consumers)
                    }
                ))
                .new_preroll(glib::clone!(
                    #[strong]
                    consumers,
                    move |appsink| {
                        let mut consumers = consumers.lock().unwrap();

                        let sample = match appsink.pull_preroll() {
                            Ok(sample) => sample,
                            Err(_err) => {
                                gst::debug!(CAT, obj = appsink, "Failed to pull preroll");
                                return Err(gst::FlowError::Flushing);
                            }
                        };

                        if consumers.forward_preroll {
                            // Remember the forwarding so `new_sample` skips
                            // its next sample.
                            consumers.just_forwarded_preroll = true;

                            StreamProducer::process_sample(sample, appsink, consumers)
                        } else {
                            Ok(gst::FlowSuccess::Ok)
                        }
                    }
                ))
                .new_event(glib::clone!(
                    #[strong]
                    consumers,
                    move |appsink| {
                        match appsink
                            .pull_object()
                            .map(|obj| obj.downcast::<gst::Event>())
                        {
                            Ok(Ok(event)) => {
                                // Snapshot filter and consumers so the lock is
                                // not held while events are sent.
                                let (events_to_forward, appsrcs) = {
                                    let consumers = consumers.lock().unwrap();
                                    let events = consumers.events_to_forward.clone();
                                    let appsrcs =
                                        consumers.consumers.keys().cloned().collect::<Vec<_>>();

                                    (events, appsrcs)
                                };

                                if events_to_forward.contains(&event.type_()) {
                                    for appsrc in appsrcs {
                                        appsrc.send_event(event.clone());
                                    }
                                }
                            }
                            // The pulled object was not an event: nothing to forward.
                            Ok(Err(_)) => {}
                            Err(_err) => gst::warning!(CAT, obj = appsink, "Failed to pull event"),
                        }

                        // NOTE(review): presumably tells appsink that the event
                        // was not consumed here — confirm against the
                        // `AppSinkCallbacks::new_event` documentation.
                        false
                    }
                ))
                .eos(glib::clone!(
                    #[strong]
                    consumers,
                    move |appsink| {
                        let stream_consumers = consumers.lock().unwrap();

                        if stream_consumers
                            .events_to_forward
                            .contains(&gst::EventType::Eos)
                        {
                            // Collect the consumers and release the lock
                            // before signalling end-of-stream on each.
                            let current_consumers = stream_consumers
                                .consumers
                                .values()
                                .map(|c| c.appsrc.clone())
                                .collect::<Vec<_>>();
                            drop(stream_consumers);

                            for consumer in current_consumers {
                                gst::debug!(
                                    CAT,
                                    obj = appsink,
                                    "set EOS on consumer {}",
                                    consumer.name()
                                );
                                let _ = consumer.end_of_stream();
                            }
                        } else {
                            gst::debug!(CAT, obj = appsink, "don't forward EOS to consumers");
                        }
                    }
                ))
                .build(),
        );

        // Record latency events travelling upstream through the sink pad; the
        // value is applied to the consumers when the next sample is processed.
        let sinkpad = appsink.static_pad("sink").unwrap();
        sinkpad.add_probe(
            gst::PadProbeType::EVENT_UPSTREAM,
            glib::clone!(
                #[strong]
                consumers,
                move |_pad, info| {
                    let Some(event) = info.event() else {
                        return gst::PadProbeReturn::Ok;
                    };

                    let gst::EventView::Latency(event) = event.view() else {
                        return gst::PadProbeReturn::Ok;
                    };

                    let latency = event.latency();
                    let mut consumers = consumers.lock().unwrap();
                    consumers.current_latency = Some(latency);
                    consumers.latency_updated = true;

                    gst::PadProbeReturn::Ok
                }
            ),
        );

        StreamProducer {
            appsink: appsink.clone(),
            consumers,
        }
    }
}
682
/// State shared between a [`StreamProducer`], its clones and the `appsink`
/// callbacks: the registered consumers plus forwarding settings.
#[derive(Debug)]
struct StreamConsumers {
    /// Latency taken from the most recent latency event seen on the sink pad, if any.
    current_latency: Option<gst::ClockTime>,
    /// Set when `current_latency` changes; cleared when the next sample is processed.
    latency_updated: bool,
    /// Registered consumers, keyed by their `appsrc`.
    consumers: HashMap<gst_app::AppSrc, StreamConsumer>,
    /// Event types that are forwarded to the consumers.
    events_to_forward: Vec<gst::EventType>,
    /// Whether preroll samples are forwarded to the consumers.
    forward_preroll: bool,
    /// Set after a preroll sample was forwarded; makes the `new_sample`
    /// callback skip its next sample.
    just_forwarded_preroll: bool,
}
702
/// Producer-side bookkeeping for one registered consumer.
#[derive(Debug)]
struct StreamConsumer {
    /// The consumer element samples are pushed to.
    appsrc: gst_app::AppSrc,
    /// Probe installed on the `appsrc`'s `src` pad; removed again on drop.
    fku_probe_id: Option<gst::PadProbeId>,
    /// Whether the producer's latency has been set on this consumer at least once.
    forwarded_latency: atomic::AtomicBool,
    /// Whether delta frames are currently withheld until a keyframe arrives.
    needs_keyframe: Arc<atomic::AtomicBool>,
    /// Count of samples not delivered to this consumer.
    dropped: Arc<WrappedAtomicU64>,
    /// Count of samples handed to this consumer.
    pushed: Arc<WrappedAtomicU64>,
    /// While `true`, samples are not forwarded to this consumer.
    discard: Arc<atomic::AtomicBool>,
    /// Whether this consumer must (re)start on a keyframe.
    wait_for_keyframe: Arc<atomic::AtomicBool>,
}
725
726impl StreamConsumer {
727 fn new(
729 appsrc: &gst_app::AppSrc,
730 fku_probe_id: gst::PadProbeId,
731 dropped: Arc<WrappedAtomicU64>,
732 pushed: Arc<WrappedAtomicU64>,
733 discard: Arc<atomic::AtomicBool>,
734 wait_for_keyframe: Arc<atomic::AtomicBool>,
735 ) -> Self {
736 let needs_keyframe = Arc::new(atomic::AtomicBool::new(
737 wait_for_keyframe.load(atomic::Ordering::SeqCst),
738 ));
739 let needs_keyframe_clone = needs_keyframe.clone();
740 let wait_for_keyframe_clone = wait_for_keyframe.clone();
741 let dropped_clone = dropped.clone();
742
743 appsrc.set_callbacks(
744 gst_app::AppSrcCallbacks::builder()
745 .enough_data(move |appsrc| {
746 gst::debug!(
747 CAT,
748 obj = appsrc,
749 "consumer {} ({:?}) is not consuming fast enough, old samples are getting dropped",
750 appsrc.name(),
751 appsrc,
752 );
753
754 needs_keyframe_clone.store(wait_for_keyframe_clone.load(atomic::Ordering::SeqCst), atomic::Ordering::SeqCst);
755 dropped_clone.fetch_add(1, atomic::Ordering::SeqCst);
756 })
757 .build(),
758 );
759
760 StreamConsumer {
761 appsrc: appsrc.clone(),
762 fku_probe_id: Some(fku_probe_id),
763 forwarded_latency: atomic::AtomicBool::new(false),
764 needs_keyframe,
765 dropped,
766 pushed,
767 discard,
768 wait_for_keyframe,
769 }
770 }
771}
772
773impl Drop for StreamConsumer {
774 fn drop(&mut self) {
775 if let Some(fku_probe_id) = self.fku_probe_id.take() {
776 let srcpad = self.appsrc.static_pad("src").unwrap();
777 srcpad.remove_probe(fku_probe_id);
778 }
779 }
780}
781
782impl PartialEq for StreamConsumer {
783 fn eq(&self, other: &Self) -> bool {
784 self.appsrc.eq(&other.appsrc)
785 }
786}
787
788impl Eq for StreamConsumer {}
789
790impl std::hash::Hash for StreamConsumer {
791 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
792 std::hash::Hash::hash(&self.appsrc, state);
793 }
794}
795
/// Lets a `StreamConsumer` be borrowed as the `appsrc` it wraps.
impl std::borrow::Borrow<gst_app::AppSrc> for StreamConsumer {
    #[inline]
    fn borrow(&self) -> &gst_app::AppSrc {
        &self.appsrc
    }
}
802
803#[cfg(test)]
804mod tests {
805 use std::{
806 str::FromStr,
807 sync::{Arc, Mutex},
808 };
809
810 use futures::{
811 channel::{mpsc, mpsc::Receiver},
812 SinkExt, StreamExt,
813 };
814 use gst::prelude::*;
815
816 use crate::{ConsumptionLink, StreamProducer};
817
818 fn create_producer() -> (
819 gst::Pipeline,
820 gst_app::AppSrc,
821 gst_app::AppSink,
822 StreamProducer,
823 ) {
824 let producer_pipe =
825 gst::parse::launch("appsrc name=producer_src ! appsink name=producer_sink")
826 .unwrap()
827 .downcast::<gst::Pipeline>()
828 .unwrap();
829 let producer_sink = producer_pipe
830 .by_name("producer_sink")
831 .unwrap()
832 .downcast::<gst_app::AppSink>()
833 .unwrap();
834
835 (
836 producer_pipe.clone(),
837 producer_pipe
838 .by_name("producer_src")
839 .unwrap()
840 .downcast::<gst_app::AppSrc>()
841 .unwrap(),
842 producer_sink.clone(),
843 StreamProducer::from(&producer_sink),
844 )
845 }
846
    /// Test helper: an `appsrc ! appsink` pipeline whose sink sends every
    /// sample it receives into a channel.
    struct Consumer {
        /// The consumer pipeline.
        pipeline: gst::Pipeline,
        /// The pipeline's `appsrc`, which gets registered on the producer.
        src: gst_app::AppSrc,
        /// The pipeline's `appsink`.
        sink: gst_app::AppSink,
        /// Receiving end of the channel fed by the sink's `new_sample` callback.
        receiver: Mutex<Receiver<gst::Sample>>,
        /// Whether the test currently considers this consumer connected.
        connected: Mutex<bool>,
    }
854
855 impl Consumer {
856 fn new(id: &str) -> Self {
857 let pipeline = gst::parse::launch(&format!("appsrc name={id} ! appsink name=sink"))
858 .unwrap()
859 .downcast::<gst::Pipeline>()
860 .unwrap();
861
862 let (sender, receiver) = mpsc::channel::<gst::Sample>(1000);
863 let sender = Arc::new(Mutex::new(sender));
864 let sink = pipeline
865 .by_name("sink")
866 .unwrap()
867 .downcast::<gst_app::AppSink>()
868 .unwrap();
869
870 sink.set_callbacks(
871 gst_app::AppSinkCallbacks::builder()
872 .new_sample(move |appsink| {
874 let sender_clone = sender.clone();
876 futures::executor::block_on(
877 sender_clone
878 .lock()
879 .unwrap()
880 .send(appsink.pull_sample().unwrap()),
881 )
882 .unwrap();
883
884 Ok(gst::FlowSuccess::Ok)
885 })
886 .build(),
887 );
888
889 Self {
890 pipeline: pipeline.clone(),
891 src: pipeline
892 .by_name(id)
893 .unwrap()
894 .downcast::<gst_app::AppSrc>()
895 .unwrap(),
896 sink,
897 receiver: Mutex::new(receiver),
898 connected: Mutex::new(false),
899 }
900 }
901
902 fn connect(&self, producer: &StreamProducer) -> ConsumptionLink {
903 {
904 let mut connected = self.connected.lock().unwrap();
905 *connected = true;
906 }
907
908 producer.add_consumer(&self.src).unwrap()
909 }
910
911 fn disconnect(&self, producer: &StreamProducer) {
912 {
913 let mut connected = self.connected.lock().unwrap();
914 *connected = false;
915 }
916
917 producer.remove_consumer(&self.src);
918 }
919 }
920
921 #[test]
922 fn simple() {
923 gst::init().unwrap();
924
925 let (producer_pipe, producer_src, _producer_sink, producer) = create_producer();
926 producer_pipe
927 .set_state(gst::State::Playing)
928 .expect("Couldn't set producer pipeline state");
929
930 let mut consumers: Vec<Consumer> = Vec::new();
931 let consumer = Consumer::new("consumer1");
932 let link1 = consumer.connect(&producer);
933 consumer
934 .pipeline
935 .set_state(gst::State::Playing)
936 .expect("Couldn't set producer pipeline state");
937 consumers.push(consumer);
938
939 let consumer = Consumer::new("consumer2");
940 let link2 = consumer.connect(&producer);
941 consumer
942 .pipeline
943 .set_state(gst::State::Playing)
944 .expect("Couldn't set producer pipeline state");
945 consumers.push(consumer);
946
947 assert!(producer.last_sample().is_none());
948
949 for i in 0..10 {
950 let caps = gst::Caps::from_str(&format!("test,n={i}")).unwrap();
951 producer_src.set_caps(Some(&caps));
952 producer_src.push_buffer(gst::Buffer::new()).unwrap();
953
954 for consumer in &consumers {
955 if *consumer.connected.lock().unwrap() {
956 let sample =
957 futures::executor::block_on(consumer.receiver.lock().unwrap().next())
958 .expect("Received an empty buffer?");
959 sample.buffer().expect("No buffer on the sample?");
960 assert_eq!(sample.caps(), Some(caps.as_ref()));
961 } else {
962 debug_assert!(
963 consumer
964 .sink
965 .try_pull_sample(gst::ClockTime::from_nseconds(0))
966 .is_none(),
967 "Disconnected consumer got a new sample?!"
968 );
969 }
970 }
971
972 if i == 5 {
973 consumers.first().unwrap().disconnect(&producer);
974 }
975 }
976
977 assert!(producer.last_sample().is_some());
978
979 assert_eq!(link1.pushed(), 6);
980 assert_eq!(link1.dropped(), 0);
981 assert_eq!(link2.pushed(), 10);
982 assert_eq!(link2.dropped(), 0);
983 }
984}