gstreamer_utils/
streamproducer.rs

1use std::{
2    collections::HashMap,
3    mem,
4    sync::{atomic, Arc, Mutex, MutexGuard},
5};
6
7use gst::{glib, prelude::*};
8use once_cell::sync::Lazy;
9use thiserror::Error;
10
11// Small wrapper around AtomicU64 and a Mutex, to allow it to run regular AtomicU64
12// operations where supported, and fallback to a mutex where it is not. The wrapper methods
13// are the ones that are needed, and not all are exposed.
14#[derive(Debug)]
15struct WrappedAtomicU64 {
16    #[cfg(not(target_has_atomic = "64"))]
17    atomic: Mutex<u64>,
18    #[cfg(target_has_atomic = "64")]
19    atomic: atomic::AtomicU64,
20}
21
22#[cfg(target_has_atomic = "64")]
23impl WrappedAtomicU64 {
24    fn new(value: u64) -> WrappedAtomicU64 {
25        WrappedAtomicU64 {
26            atomic: atomic::AtomicU64::new(value),
27        }
28    }
29    fn fetch_add(&self, value: u64, order: atomic::Ordering) -> u64 {
30        self.atomic.fetch_add(value, order)
31    }
32    fn store(&self, value: u64, order: atomic::Ordering) {
33        self.atomic.store(value, order);
34    }
35
36    fn load(&self, order: atomic::Ordering) -> u64 {
37        self.atomic.load(order)
38    }
39}
40
41#[cfg(not(target_has_atomic = "64"))]
42impl WrappedAtomicU64 {
43    fn new(value: u64) -> WrappedAtomicU64 {
44        WrappedAtomicU64 {
45            atomic: Mutex::new(value),
46        }
47    }
48    fn fetch_add(&self, value: u64, _order: atomic::Ordering) -> u64 {
49        let mut guard = self.atomic.lock().unwrap();
50        let old = *guard;
51        *guard += value;
52        old
53    }
54    fn store(&self, value: u64, _order: atomic::Ordering) {
55        *self.atomic.lock().unwrap() = value;
56    }
57    fn load(&self, _order: atomic::Ordering) -> u64 {
58        *self.atomic.lock().unwrap()
59    }
60}
61
62static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
63    gst::DebugCategory::new(
64        "utilsrs-stream-producer",
65        gst::DebugColorFlags::empty(),
66        Some("gst_app Stream Producer interface"),
67    )
68});
69
70/// The interface for transporting media data from one node
71/// to another.
72///
73/// A producer is essentially a GStreamer `appsink` whose output
74/// is sent to a set of consumers, who are essentially `appsrc` wrappers
75#[derive(Debug, Clone)]
76pub struct StreamProducer {
77    /// The appsink to dispatch data for
78    appsink: gst_app::AppSink,
79    /// The consumers to dispatch data to
80    consumers: Arc<Mutex<StreamConsumers>>,
81}
82
83impl PartialEq for StreamProducer {
84    fn eq(&self, other: &Self) -> bool {
85        self.appsink.eq(&other.appsink)
86    }
87}
88
89impl Eq for StreamProducer {}
90
91/// Link between a `StreamProducer` and a consumer, disconnecting the link on `Drop`.
92/// The producer and consumer will stay alive while the link is.
93#[derive(Debug)]
94#[must_use]
95pub struct ConsumptionLink {
96    consumer: gst_app::AppSrc,
97    producer: Option<StreamProducer>,
98    /// number of buffers dropped because `consumer` internal queue was full
99    dropped: Arc<WrappedAtomicU64>,
100    /// number of buffers pushed through `consumer`
101    pushed: Arc<WrappedAtomicU64>,
102    /// if buffers should not be pushed to the `consumer` right now
103    discard: Arc<atomic::AtomicBool>,
104}
105
106impl ConsumptionLink {
107    /// Create a new disconnected `ConsumptionLink`.
108    pub fn disconnected(consumer: gst_app::AppSrc) -> ConsumptionLink {
109        ConsumptionLink {
110            consumer,
111            producer: None,
112            dropped: Arc::new(WrappedAtomicU64::new(0)),
113            pushed: Arc::new(WrappedAtomicU64::new(0)),
114            discard: Arc::new(atomic::AtomicBool::new(false)),
115        }
116    }
117
118    /// Replace the producer by a new one, keeping the existing consumer.
119    pub fn change_producer(
120        &mut self,
121        new_producer: &StreamProducer,
122        reset_stats: bool,
123    ) -> Result<(), AddConsumerError> {
124        self.disconnect();
125        if reset_stats {
126            self.dropped.store(0, atomic::Ordering::SeqCst);
127            self.pushed.store(0, atomic::Ordering::SeqCst);
128        }
129        new_producer.add_consumer_internal(
130            &self.consumer,
131            self.dropped.clone(),
132            self.pushed.clone(),
133            self.discard.clone(),
134        )?;
135        self.producer = Some(new_producer.clone());
136        Ok(())
137    }
138
139    /// Disconnect the consumer from the producer
140    pub fn disconnect(&mut self) {
141        if let Some(producer) = self.producer.take() {
142            producer.remove_consumer(&self.consumer);
143        }
144    }
145
146    /// number of dropped buffers because the consumer internal queue was full
147    pub fn dropped(&self) -> u64 {
148        self.dropped.load(atomic::Ordering::SeqCst)
149    }
150
151    /// number of buffers pushed through this link
152    pub fn pushed(&self) -> u64 {
153        self.pushed.load(atomic::Ordering::SeqCst)
154    }
155
156    /// if buffers are currently pushed through this link
157    pub fn discard(&self) -> bool {
158        self.discard.load(atomic::Ordering::SeqCst)
159    }
160
161    /// If set to `true` then no buffers will be pushed through this link
162    pub fn set_discard(&self, discard: bool) {
163        self.discard.store(discard, atomic::Ordering::SeqCst)
164    }
165
166    /// Get the GStreamer `appsrc` wrapped by this link
167    pub fn appsrc(&self) -> &gst_app::AppSrc {
168        &self.consumer
169    }
170}
171
172impl Drop for ConsumptionLink {
173    fn drop(&mut self) {
174        self.disconnect();
175    }
176}
177
178#[derive(Debug, Error)]
179/// Error type returned when adding consumers to producers.
180pub enum AddConsumerError {
181    #[error("Consumer already added")]
182    /// Consumer has already been added to this producer.
183    AlreadyAdded,
184}
185
186impl StreamProducer {
187    /// Configure a consumer `appsrc` for later use in a `StreamProducer`
188    ///
189    /// This is automatically called when calling `add_consumer()`.
190    pub fn configure_consumer(consumer: &gst_app::AppSrc) {
191        // Latency on the appsrc is set by the publisher before the first buffer
192        // and whenever it changes
193        consumer.set_latency(gst::ClockTime::ZERO, gst::ClockTime::NONE);
194        consumer.set_format(gst::Format::Time);
195        consumer.set_is_live(true);
196        consumer.set_handle_segment_change(true);
197        consumer.set_max_buffers(0);
198        consumer.set_max_bytes(0);
199        consumer.set_max_time(500 * gst::ClockTime::MSECOND);
200        consumer.set_leaky_type(gst_app::AppLeakyType::Downstream);
201        consumer.set_automatic_eos(false);
202    }
203
204    /// Add an appsrc to dispatch data to.
205    ///
206    /// Dropping the returned `ConsumptionLink` will automatically disconnect the consumer from the producer.
207    pub fn add_consumer(
208        &self,
209        consumer: &gst_app::AppSrc,
210    ) -> Result<ConsumptionLink, AddConsumerError> {
211        let dropped = Arc::new(WrappedAtomicU64::new(0));
212        let pushed = Arc::new(WrappedAtomicU64::new(0));
213        let discard = Arc::new(atomic::AtomicBool::new(false));
214
215        self.add_consumer_internal(consumer, dropped.clone(), pushed.clone(), discard.clone())?;
216
217        Ok(ConsumptionLink {
218            consumer: consumer.clone(),
219            producer: Some(self.clone()),
220            dropped,
221            pushed,
222            discard,
223        })
224    }
225
226    fn add_consumer_internal(
227        &self,
228        consumer: &gst_app::AppSrc,
229        dropped: Arc<WrappedAtomicU64>,
230        pushed: Arc<WrappedAtomicU64>,
231        discard: Arc<atomic::AtomicBool>,
232    ) -> Result<(), AddConsumerError> {
233        let mut consumers = self.consumers.lock().unwrap();
234        if consumers.consumers.contains_key(consumer) {
235            gst::error!(
236                CAT,
237                obj = &self.appsink,
238                "Consumer {} ({:?}) already added",
239                consumer.name(),
240                consumer
241            );
242            return Err(AddConsumerError::AlreadyAdded);
243        }
244
245        gst::debug!(
246            CAT,
247            obj = &self.appsink,
248            "Adding consumer {} ({:?})",
249            consumer.name(),
250            consumer
251        );
252
253        Self::configure_consumer(consumer);
254
255        // Forward force-keyunit events upstream to the appsink
256        let srcpad = consumer.static_pad("src").unwrap();
257        let appsink = &self.appsink;
258        let fku_probe_id = srcpad
259            .add_probe(
260                gst::PadProbeType::EVENT_UPSTREAM,
261                glib::clone!(
262                    #[weak]
263                    appsink,
264                    #[upgrade_or_panic]
265                    move |_pad, info| {
266                        let Some(event) = info.event() else {
267                            return gst::PadProbeReturn::Ok;
268                        };
269
270                        if gst_video::UpstreamForceKeyUnitEvent::parse(event).is_ok() {
271                            gst::debug!(CAT, obj = &appsink, "Requesting keyframe");
272                            // Do not use `gst_element_send_event()` as it takes the state lock which may lead to dead locks.
273                            let pad = appsink.static_pad("sink").unwrap();
274                            let _ = pad.push_event(event.clone());
275                        }
276
277                        gst::PadProbeReturn::Ok
278                    }
279                ),
280            )
281            .unwrap();
282
283        let stream_consumer = StreamConsumer::new(consumer, fku_probe_id, dropped, pushed, discard);
284
285        consumers
286            .consumers
287            .insert(consumer.clone(), stream_consumer);
288
289        // forward selected sticky events. We can send those now as appsrc will delay the events
290        // until stream-start, caps and segment are sent.
291        let events_to_forward = consumers.events_to_forward.clone();
292        // drop the lock before sending events
293        drop(consumers);
294
295        let appsink_pad = self.appsink.static_pad("sink").unwrap();
296        appsink_pad.sticky_events_foreach(|event| {
297            if events_to_forward.contains(&event.type_()) {
298                gst::debug!(CAT, obj = &self.appsink, "forward sticky event {:?}", event);
299                consumer.send_event(event.clone());
300            }
301
302            std::ops::ControlFlow::Continue(gst::EventForeachAction::Keep)
303        });
304
305        Ok(())
306    }
307
308    fn process_sample(
309        sample: gst::Sample,
310        appsink: &gst_app::AppSink,
311        mut consumers: MutexGuard<StreamConsumers>,
312    ) -> Result<gst::FlowSuccess, gst::FlowError> {
313        let (is_discont, is_keyframe) = if let Some(buf) = sample.buffer() {
314            let flags = buf.flags();
315
316            (
317                flags.contains(gst::BufferFlags::DISCONT),
318                !flags.contains(gst::BufferFlags::DELTA_UNIT),
319            )
320        } else {
321            (false, true)
322        };
323
324        gst::trace!(
325            CAT,
326            obj = appsink,
327            "processing sample {:?}",
328            sample.buffer()
329        );
330
331        let latency = consumers.current_latency;
332        let latency_updated = mem::replace(&mut consumers.latency_updated, false);
333
334        let mut needs_keyframe_request = false;
335
336        let current_consumers = consumers
337            .consumers
338            .values()
339            .filter_map(|consumer| {
340                if let Some(latency) = latency {
341                    if consumer
342                        .forwarded_latency
343                        .compare_exchange(
344                            false,
345                            true,
346                            atomic::Ordering::SeqCst,
347                            atomic::Ordering::SeqCst,
348                        )
349                        .is_ok()
350                        || latency_updated
351                    {
352                        gst::info!(CAT, obj = appsink, "setting new latency: {latency}");
353                        consumer.appsrc.set_latency(latency, gst::ClockTime::NONE);
354                    }
355                }
356
357                if consumer.discard.load(atomic::Ordering::SeqCst) {
358                    consumer
359                        .needs_keyframe
360                        .store(false, atomic::Ordering::SeqCst);
361                    return None;
362                }
363
364                if is_discont && !is_keyframe {
365                    // Whenever we have a discontinuity, we need a new keyframe
366                    consumer
367                        .needs_keyframe
368                        .store(true, atomic::Ordering::SeqCst);
369                }
370
371                if !is_keyframe && consumer.needs_keyframe.load(atomic::Ordering::SeqCst) {
372                    // If we need a keyframe (and this one isn't) request a keyframe upstream
373                    if !needs_keyframe_request {
374                        gst::debug!(CAT, obj = appsink, "Requesting keyframe for first buffer");
375                        needs_keyframe_request = true;
376                    }
377
378                    consumer.dropped.fetch_add(1, atomic::Ordering::SeqCst);
379
380                    gst::debug!(
381                        CAT,
382                        obj = appsink,
383                        "Ignoring frame for {} while waiting for a keyframe",
384                        consumer.appsrc.name()
385                    );
386                    None
387                } else {
388                    consumer
389                        .needs_keyframe
390                        .store(false, atomic::Ordering::SeqCst);
391                    consumer.pushed.fetch_add(1, atomic::Ordering::SeqCst);
392
393                    Some(consumer.appsrc.clone())
394                }
395            })
396            .collect::<Vec<_>>();
397
398        drop(consumers);
399
400        if needs_keyframe_request {
401            // Do not use `gst_element_send_event()` as it takes the state lock which may lead to dead locks.
402            let pad = appsink.static_pad("sink").unwrap();
403            pad.push_event(
404                gst_video::UpstreamForceKeyUnitEvent::builder()
405                    .all_headers(true)
406                    .build(),
407            );
408        }
409
410        for consumer in current_consumers {
411            if let Err(err) = consumer.push_sample(&sample) {
412                gst::warning!(CAT, obj = appsink, "Failed to push sample: {}", err);
413            }
414        }
415        Ok(gst::FlowSuccess::Ok)
416    }
417
418    /// Remove a consumer appsrc by id
419    pub fn remove_consumer(&self, consumer: &gst_app::AppSrc) {
420        let name = consumer.name();
421        if self
422            .consumers
423            .lock()
424            .unwrap()
425            .consumers
426            .remove(consumer)
427            .is_some()
428        {
429            gst::debug!(
430                CAT,
431                obj = &self.appsink,
432                "Removed consumer {} ({:?})",
433                name,
434                consumer
435            );
436            consumer.set_callbacks(gst_app::AppSrcCallbacks::builder().build());
437        } else {
438            gst::debug!(
439                CAT,
440                obj = &self.appsink,
441                "Consumer {} ({:?}) not found",
442                name,
443                consumer
444            );
445        }
446    }
447
448    /// configure event types the appsink should forward to all its consumers (default: `Eos`).
449    pub fn set_forward_events(&self, events_to_forward: impl IntoIterator<Item = gst::EventType>) {
450        self.consumers.lock().unwrap().events_to_forward = events_to_forward.into_iter().collect();
451    }
452
453    /// configure whether the preroll sample should be forwarded (default: `true`)
454    pub fn set_forward_preroll(&self, forward_preroll: bool) {
455        self.consumers.lock().unwrap().forward_preroll = forward_preroll;
456    }
457
458    /// Get the GStreamer `appsink` wrapped by this producer
459    pub fn appsink(&self) -> &gst_app::AppSink {
460        &self.appsink
461    }
462
463    /// Signals an error on all consumers
464    pub fn error(&self, error: &gst::glib::Error, debug: Option<&str>) {
465        let consumers = self.consumers.lock().unwrap();
466
467        for consumer in consumers.consumers.keys() {
468            let mut msg_builder =
469                gst::message::Error::builder_from_error(error.clone()).src(consumer);
470            if let Some(debug) = debug {
471                msg_builder = msg_builder.debug(debug);
472            }
473
474            let _ = consumer.post_message(msg_builder.build());
475        }
476    }
477
478    /// The last sample produced by this producer.
479    pub fn last_sample(&self) -> Option<gst::Sample> {
480        self.appsink.property("last-sample")
481    }
482}
483
484impl<'a> From<&'a gst_app::AppSink> for StreamProducer {
485    fn from(appsink: &'a gst_app::AppSink) -> Self {
486        let consumers = Arc::new(Mutex::new(StreamConsumers {
487            current_latency: None,
488            latency_updated: false,
489            consumers: HashMap::new(),
490            // it would make sense to automatically forward more events such as Tag but that would break
491            // with older GStreamer, see https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/4297
492            events_to_forward: vec![gst::EventType::Eos, gst::EventType::Gap],
493            forward_preroll: true,
494            just_forwarded_preroll: false,
495        }));
496
497        appsink.set_callbacks(
498            gst_app::AppSinkCallbacks::builder()
499                .new_sample(glib::clone!(
500                    #[strong]
501                    consumers,
502                    move |appsink| {
503                        let mut consumers = consumers.lock().unwrap();
504
505                        let sample = match appsink.pull_sample() {
506                            Ok(sample) => sample,
507                            Err(_err) => {
508                                gst::debug!(CAT, obj = appsink, "Failed to pull sample");
509                                return Err(gst::FlowError::Flushing);
510                            }
511                        };
512
513                        let just_forwarded_preroll =
514                            mem::replace(&mut consumers.just_forwarded_preroll, false);
515
516                        if just_forwarded_preroll {
517                            return Ok(gst::FlowSuccess::Ok);
518                        }
519
520                        StreamProducer::process_sample(sample, appsink, consumers)
521                    }
522                ))
523                .new_preroll(glib::clone!(
524                    #[strong]
525                    consumers,
526                    move |appsink| {
527                        let mut consumers = consumers.lock().unwrap();
528
529                        let sample = match appsink.pull_preroll() {
530                            Ok(sample) => sample,
531                            Err(_err) => {
532                                gst::debug!(CAT, obj = appsink, "Failed to pull preroll");
533                                return Err(gst::FlowError::Flushing);
534                            }
535                        };
536
537                        if consumers.forward_preroll {
538                            consumers.just_forwarded_preroll = true;
539
540                            StreamProducer::process_sample(sample, appsink, consumers)
541                        } else {
542                            Ok(gst::FlowSuccess::Ok)
543                        }
544                    }
545                ))
546                .new_event(glib::clone!(
547                    #[strong]
548                    consumers,
549                    move |appsink| {
550                        match appsink
551                            .pull_object()
552                            .map(|obj| obj.downcast::<gst::Event>())
553                        {
554                            Ok(Ok(event)) => {
555                                let (events_to_forward, appsrcs) = {
556                                    // clone so we don't keep the lock while pushing events
557                                    let consumers = consumers.lock().unwrap();
558                                    let events = consumers.events_to_forward.clone();
559                                    let appsrcs =
560                                        consumers.consumers.keys().cloned().collect::<Vec<_>>();
561
562                                    (events, appsrcs)
563                                };
564
565                                if events_to_forward.contains(&event.type_()) {
566                                    for appsrc in appsrcs {
567                                        appsrc.send_event(event.clone());
568                                    }
569                                }
570                            }
571                            Ok(Err(_)) => {} // pulled another unsupported object type, ignore
572                            Err(_err) => gst::warning!(CAT, obj = appsink, "Failed to pull event"),
573                        }
574
575                        false
576                    }
577                ))
578                .eos(glib::clone!(
579                    #[strong]
580                    consumers,
581                    move |appsink| {
582                        let stream_consumers = consumers.lock().unwrap();
583
584                        if stream_consumers
585                            .events_to_forward
586                            .contains(&gst::EventType::Eos)
587                        {
588                            let current_consumers = stream_consumers
589                                .consumers
590                                .values()
591                                .map(|c| c.appsrc.clone())
592                                .collect::<Vec<_>>();
593                            drop(stream_consumers);
594
595                            for consumer in current_consumers {
596                                gst::debug!(
597                                    CAT,
598                                    obj = appsink,
599                                    "set EOS on consumer {}",
600                                    consumer.name()
601                                );
602                                let _ = consumer.end_of_stream();
603                            }
604                        } else {
605                            gst::debug!(CAT, obj = appsink, "don't forward EOS to consumers");
606                        }
607                    }
608                ))
609                .build(),
610        );
611
612        let sinkpad = appsink.static_pad("sink").unwrap();
613        sinkpad.add_probe(
614            gst::PadProbeType::EVENT_UPSTREAM,
615            glib::clone!(
616                #[strong]
617                consumers,
618                move |_pad, info| {
619                    let Some(event) = info.event() else {
620                        return gst::PadProbeReturn::Ok;
621                    };
622
623                    let gst::EventView::Latency(event) = event.view() else {
624                        return gst::PadProbeReturn::Ok;
625                    };
626
627                    let latency = event.latency();
628                    let mut consumers = consumers.lock().unwrap();
629                    consumers.current_latency = Some(latency);
630                    consumers.latency_updated = true;
631
632                    gst::PadProbeReturn::Ok
633                }
634            ),
635        );
636
637        StreamProducer {
638            appsink: appsink.clone(),
639            consumers,
640        }
641    }
642}
643
644/// Wrapper around a HashMap of consumers, exists for thread safety
645/// and also protects some of the producer state
646#[derive(Debug)]
647struct StreamConsumers {
648    /// The currently-observed latency
649    current_latency: Option<gst::ClockTime>,
650    /// Whether the consumers' appsrc latency needs updating
651    latency_updated: bool,
652    /// The consumers, AppSrc pointer value -> consumer
653    consumers: HashMap<gst_app::AppSrc, StreamConsumer>,
654    /// What events should be forwarded to consumers
655    events_to_forward: Vec<gst::EventType>,
656    /// Whether the preroll sample should be forwarded at all
657    forward_preroll: bool,
658    /// Whether we just forwarded the preroll sample. When we did we want to
659    /// discard the next sample from on_new_sample as it would cause us to
660    /// otherwise push out the same sample twice to consumers.
661    just_forwarded_preroll: bool,
662}
663
664/// Wrapper around a consumer's `appsrc`
665#[derive(Debug)]
666struct StreamConsumer {
667    /// The GStreamer `appsrc` of the consumer
668    appsrc: gst_app::AppSrc,
669    /// The id of a pad probe that intercepts force-key-unit events
670    fku_probe_id: Option<gst::PadProbeId>,
671    /// Whether an initial latency was forwarded to the `appsrc`
672    forwarded_latency: atomic::AtomicBool,
673    /// Whether a first buffer has made it through, used to determine
674    /// whether a new key unit should be requested. Only useful for encoded
675    /// streams.
676    needs_keyframe: Arc<atomic::AtomicBool>,
677    /// number of buffers dropped because `appsrc` internal queue was full
678    dropped: Arc<WrappedAtomicU64>,
679    /// number of buffers pushed through `appsrc`
680    pushed: Arc<WrappedAtomicU64>,
681    /// if buffers should not be pushed to the `appsrc` right now
682    discard: Arc<atomic::AtomicBool>,
683}
684
685impl StreamConsumer {
686    /// Create a new consumer
687    fn new(
688        appsrc: &gst_app::AppSrc,
689        fku_probe_id: gst::PadProbeId,
690        dropped: Arc<WrappedAtomicU64>,
691        pushed: Arc<WrappedAtomicU64>,
692        discard: Arc<atomic::AtomicBool>,
693    ) -> Self {
694        let needs_keyframe = Arc::new(atomic::AtomicBool::new(true));
695        let needs_keyframe_clone = needs_keyframe.clone();
696        let dropped_clone = dropped.clone();
697
698        appsrc.set_callbacks(
699            gst_app::AppSrcCallbacks::builder()
700                .enough_data(move |appsrc| {
701                    gst::debug!(
702                        CAT,
703                        obj = appsrc,
704                        "consumer {} ({:?}) is not consuming fast enough, old samples are getting dropped",
705                        appsrc.name(),
706                        appsrc,
707                    );
708
709                    needs_keyframe_clone.store(true, atomic::Ordering::SeqCst);
710                    dropped_clone.fetch_add(1, atomic::Ordering::SeqCst);
711                })
712                .build(),
713        );
714
715        StreamConsumer {
716            appsrc: appsrc.clone(),
717            fku_probe_id: Some(fku_probe_id),
718            forwarded_latency: atomic::AtomicBool::new(false),
719            needs_keyframe,
720            dropped,
721            pushed,
722            discard,
723        }
724    }
725}
726
727impl Drop for StreamConsumer {
728    fn drop(&mut self) {
729        if let Some(fku_probe_id) = self.fku_probe_id.take() {
730            let srcpad = self.appsrc.static_pad("src").unwrap();
731            srcpad.remove_probe(fku_probe_id);
732        }
733    }
734}
735
736impl PartialEq for StreamConsumer {
737    fn eq(&self, other: &Self) -> bool {
738        self.appsrc.eq(&other.appsrc)
739    }
740}
741
742impl Eq for StreamConsumer {}
743
744impl std::hash::Hash for StreamConsumer {
745    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
746        std::hash::Hash::hash(&self.appsrc, state);
747    }
748}
749
750impl std::borrow::Borrow<gst_app::AppSrc> for StreamConsumer {
751    #[inline]
752    fn borrow(&self) -> &gst_app::AppSrc {
753        &self.appsrc
754    }
755}
756
757#[cfg(test)]
758mod tests {
759    use std::{
760        str::FromStr,
761        sync::{Arc, Mutex},
762    };
763
764    use futures::{
765        channel::{mpsc, mpsc::Receiver},
766        SinkExt, StreamExt,
767    };
768    use gst::prelude::*;
769
770    use crate::{ConsumptionLink, StreamProducer};
771
772    fn create_producer() -> (
773        gst::Pipeline,
774        gst_app::AppSrc,
775        gst_app::AppSink,
776        StreamProducer,
777    ) {
778        let producer_pipe =
779            gst::parse::launch("appsrc name=producer_src ! appsink name=producer_sink")
780                .unwrap()
781                .downcast::<gst::Pipeline>()
782                .unwrap();
783        let producer_sink = producer_pipe
784            .by_name("producer_sink")
785            .unwrap()
786            .downcast::<gst_app::AppSink>()
787            .unwrap();
788
789        (
790            producer_pipe.clone(),
791            producer_pipe
792                .by_name("producer_src")
793                .unwrap()
794                .downcast::<gst_app::AppSrc>()
795                .unwrap(),
796            producer_sink.clone(),
797            StreamProducer::from(&producer_sink),
798        )
799    }
800
801    struct Consumer {
802        pipeline: gst::Pipeline,
803        src: gst_app::AppSrc,
804        sink: gst_app::AppSink,
805        receiver: Mutex<Receiver<gst::Sample>>,
806        connected: Mutex<bool>,
807    }
808
809    impl Consumer {
810        fn new(id: &str) -> Self {
811            let pipeline = gst::parse::launch(&format!("appsrc name={id} ! appsink name=sink"))
812                .unwrap()
813                .downcast::<gst::Pipeline>()
814                .unwrap();
815
816            let (sender, receiver) = mpsc::channel::<gst::Sample>(1000);
817            let sender = Arc::new(Mutex::new(sender));
818            let sink = pipeline
819                .by_name("sink")
820                .unwrap()
821                .downcast::<gst_app::AppSink>()
822                .unwrap();
823
824            sink.set_callbacks(
825                gst_app::AppSinkCallbacks::builder()
826                    // Add a handler to the "new-sample" signal.
827                    .new_sample(move |appsink| {
828                        // Pull the sample in question out of the appsink's buffer.
829                        let sender_clone = sender.clone();
830                        futures::executor::block_on(
831                            sender_clone
832                                .lock()
833                                .unwrap()
834                                .send(appsink.pull_sample().unwrap()),
835                        )
836                        .unwrap();
837
838                        Ok(gst::FlowSuccess::Ok)
839                    })
840                    .build(),
841            );
842
843            Self {
844                pipeline: pipeline.clone(),
845                src: pipeline
846                    .by_name(id)
847                    .unwrap()
848                    .downcast::<gst_app::AppSrc>()
849                    .unwrap(),
850                sink,
851                receiver: Mutex::new(receiver),
852                connected: Mutex::new(false),
853            }
854        }
855
856        fn connect(&self, producer: &StreamProducer) -> ConsumptionLink {
857            {
858                let mut connected = self.connected.lock().unwrap();
859                *connected = true;
860            }
861
862            producer.add_consumer(&self.src).unwrap()
863        }
864
865        fn disconnect(&self, producer: &StreamProducer) {
866            {
867                let mut connected = self.connected.lock().unwrap();
868                *connected = false;
869            }
870
871            producer.remove_consumer(&self.src);
872        }
873    }
874
875    #[test]
876    fn simple() {
877        gst::init().unwrap();
878
879        let (producer_pipe, producer_src, _producer_sink, producer) = create_producer();
880        producer_pipe
881            .set_state(gst::State::Playing)
882            .expect("Couldn't set producer pipeline state");
883
884        let mut consumers: Vec<Consumer> = Vec::new();
885        let consumer = Consumer::new("consumer1");
886        let link1 = consumer.connect(&producer);
887        consumer
888            .pipeline
889            .set_state(gst::State::Playing)
890            .expect("Couldn't set producer pipeline state");
891        consumers.push(consumer);
892
893        let consumer = Consumer::new("consumer2");
894        let link2 = consumer.connect(&producer);
895        consumer
896            .pipeline
897            .set_state(gst::State::Playing)
898            .expect("Couldn't set producer pipeline state");
899        consumers.push(consumer);
900
901        assert!(producer.last_sample().is_none());
902
903        for i in 0..10 {
904            let caps = gst::Caps::from_str(&format!("test,n={i}")).unwrap();
905            producer_src.set_caps(Some(&caps));
906            producer_src.push_buffer(gst::Buffer::new()).unwrap();
907
908            for consumer in &consumers {
909                if *consumer.connected.lock().unwrap() {
910                    let sample =
911                        futures::executor::block_on(consumer.receiver.lock().unwrap().next())
912                            .expect("Received an empty buffer?");
913                    sample.buffer().expect("No buffer on the sample?");
914                    assert_eq!(sample.caps(), Some(caps.as_ref()));
915                } else {
916                    debug_assert!(
917                        consumer
918                            .sink
919                            .try_pull_sample(gst::ClockTime::from_nseconds(0))
920                            .is_none(),
921                        "Disconnected consumer got a new sample?!"
922                    );
923                }
924            }
925
926            if i == 5 {
927                consumers.first().unwrap().disconnect(&producer);
928            }
929        }
930
931        assert!(producer.last_sample().is_some());
932
933        assert_eq!(link1.pushed(), 6);
934        assert_eq!(link1.dropped(), 0);
935        assert_eq!(link2.pushed(), 10);
936        assert_eq!(link2.dropped(), 0);
937    }
938}