1use std::{
2 collections::HashMap,
3 mem,
4 sync::{atomic, Arc, Mutex, MutexGuard},
5};
6
7use gst::{glib, prelude::*};
8use once_cell::sync::Lazy;
9use thiserror::Error;
10
/// 64-bit counter with an atomic-like API.
///
/// On targets with native 64-bit atomics this wraps an `atomic::AtomicU64`; on every other
/// target it falls back to a `Mutex<u64>`. Only the operations used in this file are exposed:
/// `new`, `fetch_add`, `store` and `load`.
#[derive(Debug)]
struct WrappedAtomicU64 {
    #[cfg(not(target_has_atomic = "64"))]
    atomic: Mutex<u64>,
    #[cfg(target_has_atomic = "64")]
    atomic: atomic::AtomicU64,
}

/// Native implementation: every call is forwarded to the inner `AtomicU64`.
#[cfg(target_has_atomic = "64")]
impl WrappedAtomicU64 {
    /// Creates a counter holding `value`.
    fn new(value: u64) -> WrappedAtomicU64 {
        WrappedAtomicU64 {
            atomic: atomic::AtomicU64::new(value),
        }
    }

    /// Adds `value` to the counter and returns the value it held before the addition.
    fn fetch_add(&self, value: u64, order: atomic::Ordering) -> u64 {
        self.atomic.fetch_add(value, order)
    }

    /// Overwrites the counter with `value`.
    fn store(&self, value: u64, order: atomic::Ordering) {
        self.atomic.store(value, order);
    }

    /// Returns the current value of the counter.
    fn load(&self, order: atomic::Ordering) -> u64 {
        self.atomic.load(order)
    }
}

/// Fallback implementation: the value lives behind a mutex and the requested memory ordering
/// is ignored. All methods panic if the mutex is poisoned.
#[cfg(not(target_has_atomic = "64"))]
impl WrappedAtomicU64 {
    /// Creates a counter holding `value`.
    fn new(value: u64) -> WrappedAtomicU64 {
        WrappedAtomicU64 {
            atomic: Mutex::new(value),
        }
    }

    /// Adds `value` to the counter and returns the value it held before the addition.
    fn fetch_add(&self, value: u64, _order: atomic::Ordering) -> u64 {
        let mut guard = self.atomic.lock().unwrap();
        let previous = *guard;
        // `AtomicU64::fetch_add` wraps around on overflow; do the same here so that both
        // back-ends behave identically instead of panicking in debug builds.
        *guard = previous.wrapping_add(value);
        previous
    }

    /// Overwrites the counter with `value`.
    fn store(&self, value: u64, _order: atomic::Ordering) {
        *self.atomic.lock().unwrap() = value;
    }

    /// Returns the current value of the counter.
    fn load(&self, _order: atomic::Ordering) -> u64 {
        *self.atomic.lock().unwrap()
    }
}
61
62static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
63 gst::DebugCategory::new(
64 "utilsrs-stream-producer",
65 gst::DebugColorFlags::empty(),
66 Some("gst_app Stream Producer interface"),
67 )
68});
69
70#[derive(Debug, Clone)]
76pub struct StreamProducer {
77 appsink: gst_app::AppSink,
79 consumers: Arc<Mutex<StreamConsumers>>,
81}
82
83impl PartialEq for StreamProducer {
84 fn eq(&self, other: &Self) -> bool {
85 self.appsink.eq(&other.appsink)
86 }
87}
88
89impl Eq for StreamProducer {}
90
91#[derive(Debug)]
94#[must_use]
95pub struct ConsumptionLink {
96 consumer: gst_app::AppSrc,
97 producer: Option<StreamProducer>,
98 dropped: Arc<WrappedAtomicU64>,
100 pushed: Arc<WrappedAtomicU64>,
102 discard: Arc<atomic::AtomicBool>,
104}
105
106impl ConsumptionLink {
107 pub fn disconnected(consumer: gst_app::AppSrc) -> ConsumptionLink {
109 ConsumptionLink {
110 consumer,
111 producer: None,
112 dropped: Arc::new(WrappedAtomicU64::new(0)),
113 pushed: Arc::new(WrappedAtomicU64::new(0)),
114 discard: Arc::new(atomic::AtomicBool::new(false)),
115 }
116 }
117
118 pub fn change_producer(
120 &mut self,
121 new_producer: &StreamProducer,
122 reset_stats: bool,
123 ) -> Result<(), AddConsumerError> {
124 self.disconnect();
125 if reset_stats {
126 self.dropped.store(0, atomic::Ordering::SeqCst);
127 self.pushed.store(0, atomic::Ordering::SeqCst);
128 }
129 new_producer.add_consumer_internal(
130 &self.consumer,
131 self.dropped.clone(),
132 self.pushed.clone(),
133 self.discard.clone(),
134 )?;
135 self.producer = Some(new_producer.clone());
136 Ok(())
137 }
138
139 pub fn disconnect(&mut self) {
141 if let Some(producer) = self.producer.take() {
142 producer.remove_consumer(&self.consumer);
143 }
144 }
145
146 pub fn dropped(&self) -> u64 {
148 self.dropped.load(atomic::Ordering::SeqCst)
149 }
150
151 pub fn pushed(&self) -> u64 {
153 self.pushed.load(atomic::Ordering::SeqCst)
154 }
155
156 pub fn discard(&self) -> bool {
158 self.discard.load(atomic::Ordering::SeqCst)
159 }
160
161 pub fn set_discard(&self, discard: bool) {
163 self.discard.store(discard, atomic::Ordering::SeqCst)
164 }
165
166 pub fn appsrc(&self) -> &gst_app::AppSrc {
168 &self.consumer
169 }
170}
171
172impl Drop for ConsumptionLink {
173 fn drop(&mut self) {
174 self.disconnect();
175 }
176}
177
178#[derive(Debug, Error)]
179pub enum AddConsumerError {
181 #[error("Consumer already added")]
182 AlreadyAdded,
184}
185
186impl StreamProducer {
187 pub fn configure_consumer(consumer: &gst_app::AppSrc) {
191 consumer.set_latency(gst::ClockTime::ZERO, gst::ClockTime::NONE);
194 consumer.set_format(gst::Format::Time);
195 consumer.set_is_live(true);
196 consumer.set_handle_segment_change(true);
197 consumer.set_max_buffers(0);
198 consumer.set_max_bytes(0);
199 consumer.set_max_time(500 * gst::ClockTime::MSECOND);
200 consumer.set_leaky_type(gst_app::AppLeakyType::Downstream);
201 consumer.set_automatic_eos(false);
202 }
203
204 pub fn add_consumer(
208 &self,
209 consumer: &gst_app::AppSrc,
210 ) -> Result<ConsumptionLink, AddConsumerError> {
211 let dropped = Arc::new(WrappedAtomicU64::new(0));
212 let pushed = Arc::new(WrappedAtomicU64::new(0));
213 let discard = Arc::new(atomic::AtomicBool::new(false));
214
215 self.add_consumer_internal(consumer, dropped.clone(), pushed.clone(), discard.clone())?;
216
217 Ok(ConsumptionLink {
218 consumer: consumer.clone(),
219 producer: Some(self.clone()),
220 dropped,
221 pushed,
222 discard,
223 })
224 }
225
226 fn add_consumer_internal(
227 &self,
228 consumer: &gst_app::AppSrc,
229 dropped: Arc<WrappedAtomicU64>,
230 pushed: Arc<WrappedAtomicU64>,
231 discard: Arc<atomic::AtomicBool>,
232 ) -> Result<(), AddConsumerError> {
233 let mut consumers = self.consumers.lock().unwrap();
234 if consumers.consumers.contains_key(consumer) {
235 gst::error!(
236 CAT,
237 obj = &self.appsink,
238 "Consumer {} ({:?}) already added",
239 consumer.name(),
240 consumer
241 );
242 return Err(AddConsumerError::AlreadyAdded);
243 }
244
245 gst::debug!(
246 CAT,
247 obj = &self.appsink,
248 "Adding consumer {} ({:?})",
249 consumer.name(),
250 consumer
251 );
252
253 Self::configure_consumer(consumer);
254
255 let srcpad = consumer.static_pad("src").unwrap();
257 let appsink = &self.appsink;
258 let fku_probe_id = srcpad
259 .add_probe(
260 gst::PadProbeType::EVENT_UPSTREAM,
261 glib::clone!(
262 #[weak]
263 appsink,
264 #[upgrade_or_panic]
265 move |_pad, info| {
266 let Some(event) = info.event() else {
267 return gst::PadProbeReturn::Ok;
268 };
269
270 if gst_video::UpstreamForceKeyUnitEvent::parse(event).is_ok() {
271 gst::debug!(CAT, obj = &appsink, "Requesting keyframe");
272 let pad = appsink.static_pad("sink").unwrap();
274 let _ = pad.push_event(event.clone());
275 }
276
277 gst::PadProbeReturn::Ok
278 }
279 ),
280 )
281 .unwrap();
282
283 let stream_consumer = StreamConsumer::new(consumer, fku_probe_id, dropped, pushed, discard);
284
285 consumers
286 .consumers
287 .insert(consumer.clone(), stream_consumer);
288
289 let events_to_forward = consumers.events_to_forward.clone();
292 drop(consumers);
294
295 let appsink_pad = self.appsink.static_pad("sink").unwrap();
296 appsink_pad.sticky_events_foreach(|event| {
297 if events_to_forward.contains(&event.type_()) {
298 gst::debug!(CAT, obj = &self.appsink, "forward sticky event {:?}", event);
299 consumer.send_event(event.clone());
300 }
301
302 std::ops::ControlFlow::Continue(gst::EventForeachAction::Keep)
303 });
304
305 Ok(())
306 }
307
308 fn process_sample(
309 sample: gst::Sample,
310 appsink: &gst_app::AppSink,
311 mut consumers: MutexGuard<StreamConsumers>,
312 ) -> Result<gst::FlowSuccess, gst::FlowError> {
313 let (is_discont, is_keyframe) = if let Some(buf) = sample.buffer() {
314 let flags = buf.flags();
315
316 (
317 flags.contains(gst::BufferFlags::DISCONT),
318 !flags.contains(gst::BufferFlags::DELTA_UNIT),
319 )
320 } else {
321 (false, true)
322 };
323
324 gst::trace!(
325 CAT,
326 obj = appsink,
327 "processing sample {:?}",
328 sample.buffer()
329 );
330
331 let latency = consumers.current_latency;
332 let latency_updated = mem::replace(&mut consumers.latency_updated, false);
333
334 let mut needs_keyframe_request = false;
335
336 let current_consumers = consumers
337 .consumers
338 .values()
339 .filter_map(|consumer| {
340 if let Some(latency) = latency {
341 if consumer
342 .forwarded_latency
343 .compare_exchange(
344 false,
345 true,
346 atomic::Ordering::SeqCst,
347 atomic::Ordering::SeqCst,
348 )
349 .is_ok()
350 || latency_updated
351 {
352 gst::info!(CAT, obj = appsink, "setting new latency: {latency}");
353 consumer.appsrc.set_latency(latency, gst::ClockTime::NONE);
354 }
355 }
356
357 if consumer.discard.load(atomic::Ordering::SeqCst) {
358 consumer
359 .needs_keyframe
360 .store(false, atomic::Ordering::SeqCst);
361 return None;
362 }
363
364 if is_discont && !is_keyframe {
365 consumer
367 .needs_keyframe
368 .store(true, atomic::Ordering::SeqCst);
369 }
370
371 if !is_keyframe && consumer.needs_keyframe.load(atomic::Ordering::SeqCst) {
372 if !needs_keyframe_request {
374 gst::debug!(CAT, obj = appsink, "Requesting keyframe for first buffer");
375 needs_keyframe_request = true;
376 }
377
378 consumer.dropped.fetch_add(1, atomic::Ordering::SeqCst);
379
380 gst::debug!(
381 CAT,
382 obj = appsink,
383 "Ignoring frame for {} while waiting for a keyframe",
384 consumer.appsrc.name()
385 );
386 None
387 } else {
388 consumer
389 .needs_keyframe
390 .store(false, atomic::Ordering::SeqCst);
391 consumer.pushed.fetch_add(1, atomic::Ordering::SeqCst);
392
393 Some(consumer.appsrc.clone())
394 }
395 })
396 .collect::<Vec<_>>();
397
398 drop(consumers);
399
400 if needs_keyframe_request {
401 let pad = appsink.static_pad("sink").unwrap();
403 pad.push_event(
404 gst_video::UpstreamForceKeyUnitEvent::builder()
405 .all_headers(true)
406 .build(),
407 );
408 }
409
410 for consumer in current_consumers {
411 if let Err(err) = consumer.push_sample(&sample) {
412 gst::warning!(CAT, obj = appsink, "Failed to push sample: {}", err);
413 }
414 }
415 Ok(gst::FlowSuccess::Ok)
416 }
417
418 pub fn remove_consumer(&self, consumer: &gst_app::AppSrc) {
420 let name = consumer.name();
421 if self
422 .consumers
423 .lock()
424 .unwrap()
425 .consumers
426 .remove(consumer)
427 .is_some()
428 {
429 gst::debug!(
430 CAT,
431 obj = &self.appsink,
432 "Removed consumer {} ({:?})",
433 name,
434 consumer
435 );
436 consumer.set_callbacks(gst_app::AppSrcCallbacks::builder().build());
437 } else {
438 gst::debug!(
439 CAT,
440 obj = &self.appsink,
441 "Consumer {} ({:?}) not found",
442 name,
443 consumer
444 );
445 }
446 }
447
448 pub fn set_forward_events(&self, events_to_forward: impl IntoIterator<Item = gst::EventType>) {
450 self.consumers.lock().unwrap().events_to_forward = events_to_forward.into_iter().collect();
451 }
452
453 pub fn set_forward_preroll(&self, forward_preroll: bool) {
455 self.consumers.lock().unwrap().forward_preroll = forward_preroll;
456 }
457
458 pub fn appsink(&self) -> &gst_app::AppSink {
460 &self.appsink
461 }
462
463 pub fn error(&self, error: &gst::glib::Error, debug: Option<&str>) {
465 let consumers = self.consumers.lock().unwrap();
466
467 for consumer in consumers.consumers.keys() {
468 let mut msg_builder =
469 gst::message::Error::builder_from_error(error.clone()).src(consumer);
470 if let Some(debug) = debug {
471 msg_builder = msg_builder.debug(debug);
472 }
473
474 let _ = consumer.post_message(msg_builder.build());
475 }
476 }
477
478 pub fn last_sample(&self) -> Option<gst::Sample> {
480 self.appsink.property("last-sample")
481 }
482}
483
484impl<'a> From<&'a gst_app::AppSink> for StreamProducer {
485 fn from(appsink: &'a gst_app::AppSink) -> Self {
486 let consumers = Arc::new(Mutex::new(StreamConsumers {
487 current_latency: None,
488 latency_updated: false,
489 consumers: HashMap::new(),
490 events_to_forward: vec![gst::EventType::Eos, gst::EventType::Gap],
493 forward_preroll: true,
494 just_forwarded_preroll: false,
495 }));
496
497 appsink.set_callbacks(
498 gst_app::AppSinkCallbacks::builder()
499 .new_sample(glib::clone!(
500 #[strong]
501 consumers,
502 move |appsink| {
503 let mut consumers = consumers.lock().unwrap();
504
505 let sample = match appsink.pull_sample() {
506 Ok(sample) => sample,
507 Err(_err) => {
508 gst::debug!(CAT, obj = appsink, "Failed to pull sample");
509 return Err(gst::FlowError::Flushing);
510 }
511 };
512
513 let just_forwarded_preroll =
514 mem::replace(&mut consumers.just_forwarded_preroll, false);
515
516 if just_forwarded_preroll {
517 return Ok(gst::FlowSuccess::Ok);
518 }
519
520 StreamProducer::process_sample(sample, appsink, consumers)
521 }
522 ))
523 .new_preroll(glib::clone!(
524 #[strong]
525 consumers,
526 move |appsink| {
527 let mut consumers = consumers.lock().unwrap();
528
529 let sample = match appsink.pull_preroll() {
530 Ok(sample) => sample,
531 Err(_err) => {
532 gst::debug!(CAT, obj = appsink, "Failed to pull preroll");
533 return Err(gst::FlowError::Flushing);
534 }
535 };
536
537 if consumers.forward_preroll {
538 consumers.just_forwarded_preroll = true;
539
540 StreamProducer::process_sample(sample, appsink, consumers)
541 } else {
542 Ok(gst::FlowSuccess::Ok)
543 }
544 }
545 ))
546 .new_event(glib::clone!(
547 #[strong]
548 consumers,
549 move |appsink| {
550 match appsink
551 .pull_object()
552 .map(|obj| obj.downcast::<gst::Event>())
553 {
554 Ok(Ok(event)) => {
555 let (events_to_forward, appsrcs) = {
556 let consumers = consumers.lock().unwrap();
558 let events = consumers.events_to_forward.clone();
559 let appsrcs =
560 consumers.consumers.keys().cloned().collect::<Vec<_>>();
561
562 (events, appsrcs)
563 };
564
565 if events_to_forward.contains(&event.type_()) {
566 for appsrc in appsrcs {
567 appsrc.send_event(event.clone());
568 }
569 }
570 }
571 Ok(Err(_)) => {} Err(_err) => gst::warning!(CAT, obj = appsink, "Failed to pull event"),
573 }
574
575 false
576 }
577 ))
578 .eos(glib::clone!(
579 #[strong]
580 consumers,
581 move |appsink| {
582 let stream_consumers = consumers.lock().unwrap();
583
584 if stream_consumers
585 .events_to_forward
586 .contains(&gst::EventType::Eos)
587 {
588 let current_consumers = stream_consumers
589 .consumers
590 .values()
591 .map(|c| c.appsrc.clone())
592 .collect::<Vec<_>>();
593 drop(stream_consumers);
594
595 for consumer in current_consumers {
596 gst::debug!(
597 CAT,
598 obj = appsink,
599 "set EOS on consumer {}",
600 consumer.name()
601 );
602 let _ = consumer.end_of_stream();
603 }
604 } else {
605 gst::debug!(CAT, obj = appsink, "don't forward EOS to consumers");
606 }
607 }
608 ))
609 .build(),
610 );
611
612 let sinkpad = appsink.static_pad("sink").unwrap();
613 sinkpad.add_probe(
614 gst::PadProbeType::EVENT_UPSTREAM,
615 glib::clone!(
616 #[strong]
617 consumers,
618 move |_pad, info| {
619 let Some(event) = info.event() else {
620 return gst::PadProbeReturn::Ok;
621 };
622
623 let gst::EventView::Latency(event) = event.view() else {
624 return gst::PadProbeReturn::Ok;
625 };
626
627 let latency = event.latency();
628 let mut consumers = consumers.lock().unwrap();
629 consumers.current_latency = Some(latency);
630 consumers.latency_updated = true;
631
632 gst::PadProbeReturn::Ok
633 }
634 ),
635 );
636
637 StreamProducer {
638 appsink: appsink.clone(),
639 consumers,
640 }
641 }
642}
643
644#[derive(Debug)]
647struct StreamConsumers {
648 current_latency: Option<gst::ClockTime>,
650 latency_updated: bool,
652 consumers: HashMap<gst_app::AppSrc, StreamConsumer>,
654 events_to_forward: Vec<gst::EventType>,
656 forward_preroll: bool,
658 just_forwarded_preroll: bool,
662}
663
664#[derive(Debug)]
666struct StreamConsumer {
667 appsrc: gst_app::AppSrc,
669 fku_probe_id: Option<gst::PadProbeId>,
671 forwarded_latency: atomic::AtomicBool,
673 needs_keyframe: Arc<atomic::AtomicBool>,
677 dropped: Arc<WrappedAtomicU64>,
679 pushed: Arc<WrappedAtomicU64>,
681 discard: Arc<atomic::AtomicBool>,
683}
684
685impl StreamConsumer {
686 fn new(
688 appsrc: &gst_app::AppSrc,
689 fku_probe_id: gst::PadProbeId,
690 dropped: Arc<WrappedAtomicU64>,
691 pushed: Arc<WrappedAtomicU64>,
692 discard: Arc<atomic::AtomicBool>,
693 ) -> Self {
694 let needs_keyframe = Arc::new(atomic::AtomicBool::new(true));
695 let needs_keyframe_clone = needs_keyframe.clone();
696 let dropped_clone = dropped.clone();
697
698 appsrc.set_callbacks(
699 gst_app::AppSrcCallbacks::builder()
700 .enough_data(move |appsrc| {
701 gst::debug!(
702 CAT,
703 obj = appsrc,
704 "consumer {} ({:?}) is not consuming fast enough, old samples are getting dropped",
705 appsrc.name(),
706 appsrc,
707 );
708
709 needs_keyframe_clone.store(true, atomic::Ordering::SeqCst);
710 dropped_clone.fetch_add(1, atomic::Ordering::SeqCst);
711 })
712 .build(),
713 );
714
715 StreamConsumer {
716 appsrc: appsrc.clone(),
717 fku_probe_id: Some(fku_probe_id),
718 forwarded_latency: atomic::AtomicBool::new(false),
719 needs_keyframe,
720 dropped,
721 pushed,
722 discard,
723 }
724 }
725}
726
727impl Drop for StreamConsumer {
728 fn drop(&mut self) {
729 if let Some(fku_probe_id) = self.fku_probe_id.take() {
730 let srcpad = self.appsrc.static_pad("src").unwrap();
731 srcpad.remove_probe(fku_probe_id);
732 }
733 }
734}
735
736impl PartialEq for StreamConsumer {
737 fn eq(&self, other: &Self) -> bool {
738 self.appsrc.eq(&other.appsrc)
739 }
740}
741
742impl Eq for StreamConsumer {}
743
744impl std::hash::Hash for StreamConsumer {
745 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
746 std::hash::Hash::hash(&self.appsrc, state);
747 }
748}
749
750impl std::borrow::Borrow<gst_app::AppSrc> for StreamConsumer {
751 #[inline]
752 fn borrow(&self) -> &gst_app::AppSrc {
753 &self.appsrc
754 }
755}
756
#[cfg(test)]
mod tests {
    use std::{
        str::FromStr,
        sync::{Arc, Mutex},
    };

    use futures::{
        channel::{mpsc, mpsc::Receiver},
        SinkExt, StreamExt,
    };
    use gst::prelude::*;

    use crate::{ConsumptionLink, StreamProducer};

    /// Builds an `appsrc ! appsink` pipeline and wraps its appsink in a `StreamProducer`.
    ///
    /// Returns the pipeline, its appsrc, its appsink and the producer.
    fn create_producer() -> (
        gst::Pipeline,
        gst_app::AppSrc,
        gst_app::AppSink,
        StreamProducer,
    ) {
        let producer_pipe =
            gst::parse::launch("appsrc name=producer_src ! appsink name=producer_sink")
                .unwrap()
                .downcast::<gst::Pipeline>()
                .unwrap();
        let producer_sink = producer_pipe
            .by_name("producer_sink")
            .unwrap()
            .downcast::<gst_app::AppSink>()
            .unwrap();

        (
            producer_pipe.clone(),
            producer_pipe
                .by_name("producer_src")
                .unwrap()
                .downcast::<gst_app::AppSrc>()
                .unwrap(),
            producer_sink.clone(),
            StreamProducer::from(&producer_sink),
        )
    }

    /// Test consumer: an `appsrc ! appsink` pipeline whose appsink pushes every received
    /// sample into a channel.
    struct Consumer {
        pipeline: gst::Pipeline,
        src: gst_app::AppSrc,
        sink: gst_app::AppSink,
        /// Receiving end of the channel fed by the appsink callback.
        receiver: Mutex<Receiver<gst::Sample>>,
        /// Whether the test currently expects this consumer to receive samples.
        connected: Mutex<bool>,
    }

    impl Consumer {
        /// Creates a consumer pipeline whose appsrc is named `id`.
        fn new(id: &str) -> Self {
            let pipeline = gst::parse::launch(&format!("appsrc name={id} ! appsink name=sink"))
                .unwrap()
                .downcast::<gst::Pipeline>()
                .unwrap();

            let (sender, receiver) = mpsc::channel::<gst::Sample>(1000);
            let sender = Arc::new(Mutex::new(sender));
            let sink = pipeline
                .by_name("sink")
                .unwrap()
                .downcast::<gst_app::AppSink>()
                .unwrap();

            sink.set_callbacks(
                gst_app::AppSinkCallbacks::builder()
                    .new_sample(move |appsink| {
                        // The closure owns `sender`; no per-sample clone is needed.
                        let sample = appsink.pull_sample().unwrap();
                        futures::executor::block_on(sender.lock().unwrap().send(sample))
                            .unwrap();

                        Ok(gst::FlowSuccess::Ok)
                    })
                    .build(),
            );

            Self {
                pipeline: pipeline.clone(),
                src: pipeline
                    .by_name(id)
                    .unwrap()
                    .downcast::<gst_app::AppSrc>()
                    .unwrap(),
                sink,
                receiver: Mutex::new(receiver),
                connected: Mutex::new(false),
            }
        }

        /// Marks the consumer as connected and registers its appsrc with `producer`.
        fn connect(&self, producer: &StreamProducer) -> ConsumptionLink {
            {
                let mut connected = self.connected.lock().unwrap();
                *connected = true;
            }

            producer.add_consumer(&self.src).unwrap()
        }

        /// Marks the consumer as disconnected and removes its appsrc from `producer`.
        fn disconnect(&self, producer: &StreamProducer) {
            {
                let mut connected = self.connected.lock().unwrap();
                *connected = false;
            }

            producer.remove_consumer(&self.src);
        }
    }

    /// Pushes ten buffers through a producer with two consumers, disconnecting the first
    /// consumer after the sixth buffer, and checks what each consumer received.
    #[test]
    fn simple() {
        gst::init().unwrap();

        let (producer_pipe, producer_src, _producer_sink, producer) = create_producer();
        producer_pipe
            .set_state(gst::State::Playing)
            .expect("Couldn't set producer pipeline state");

        let mut consumers: Vec<Consumer> = Vec::new();
        let consumer = Consumer::new("consumer1");
        let link1 = consumer.connect(&producer);
        consumer
            .pipeline
            .set_state(gst::State::Playing)
            .expect("Couldn't set consumer pipeline state");
        consumers.push(consumer);

        let consumer = Consumer::new("consumer2");
        let link2 = consumer.connect(&producer);
        consumer
            .pipeline
            .set_state(gst::State::Playing)
            .expect("Couldn't set consumer pipeline state");
        consumers.push(consumer);

        assert!(producer.last_sample().is_none());

        for i in 0..10 {
            let caps = gst::Caps::from_str(&format!("test,n={i}")).unwrap();
            producer_src.set_caps(Some(&caps));
            producer_src.push_buffer(gst::Buffer::new()).unwrap();

            for consumer in &consumers {
                if *consumer.connected.lock().unwrap() {
                    let sample =
                        futures::executor::block_on(consumer.receiver.lock().unwrap().next())
                            .expect("Received an empty buffer?");
                    sample.buffer().expect("No buffer on the sample?");
                    assert_eq!(sample.caps(), Some(caps.as_ref()));
                } else {
                    // `assert!` rather than `debug_assert!`: the check must also run when
                    // the tests are built in release mode.
                    assert!(
                        consumer
                            .sink
                            .try_pull_sample(gst::ClockTime::from_nseconds(0))
                            .is_none(),
                        "Disconnected consumer got a new sample?!"
                    );
                }
            }

            if i == 5 {
                consumers.first().unwrap().disconnect(&producer);
            }
        }

        assert!(producer.last_sample().is_some());

        // consumer1 was removed after buffer index 5, i.e. after six buffers.
        assert_eq!(link1.pushed(), 6);
        assert_eq!(link1.dropped(), 0);
        assert_eq!(link2.pushed(), 10);
        assert_eq!(link2.dropped(), 0);
    }
}