gstreamer_utils/
streamproducer.rs

1use std::{
2    collections::HashMap,
3    mem,
4    sync::{atomic, Arc, Mutex, MutexGuard},
5};
6
7use gst::{glib, prelude::*};
8use std::sync::LazyLock;
9use thiserror::Error;
10
/// Counter exposing the small subset of the `AtomicU64` API that this file needs.
///
/// On targets with native 64-bit atomics the value lives in a real
/// `AtomicU64`; everywhere else it falls back to a `Mutex<u64>`. Both variants
/// are meant to be observably identical: in particular `fetch_add` wraps
/// around on overflow and none of the operations can panic.
#[derive(Debug)]
struct WrappedAtomicU64 {
    #[cfg(not(target_has_atomic = "64"))]
    atomic: Mutex<u64>,
    #[cfg(target_has_atomic = "64")]
    atomic: atomic::AtomicU64,
}

#[cfg(target_has_atomic = "64")]
impl WrappedAtomicU64 {
    /// Creates a counter initialised to `value`.
    fn new(value: u64) -> WrappedAtomicU64 {
        WrappedAtomicU64 {
            atomic: atomic::AtomicU64::new(value),
        }
    }

    /// Adds `value` to the counter (wrapping on overflow) and returns the
    /// previous value.
    fn fetch_add(&self, value: u64, order: atomic::Ordering) -> u64 {
        self.atomic.fetch_add(value, order)
    }

    /// Overwrites the counter with `value`.
    fn store(&self, value: u64, order: atomic::Ordering) {
        self.atomic.store(value, order);
    }

    /// Returns the current value of the counter.
    fn load(&self, order: atomic::Ordering) -> u64 {
        self.atomic.load(order)
    }
}

#[cfg(not(target_has_atomic = "64"))]
impl WrappedAtomicU64 {
    /// Creates a counter initialised to `value`.
    fn new(value: u64) -> WrappedAtomicU64 {
        WrappedAtomicU64 {
            atomic: Mutex::new(value),
        }
    }

    /// Locks the fallback mutex.
    ///
    /// A poisoned mutex is recovered instead of propagating the panic: a plain
    /// `u64` cannot be left in an inconsistent state, and the atomic variant
    /// of this type never panics either.
    fn lock(&self) -> MutexGuard<'_, u64> {
        self.atomic
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Adds `value` to the counter (wrapping on overflow) and returns the
    /// previous value. The ordering is irrelevant as the mutex serialises
    /// all accesses.
    fn fetch_add(&self, value: u64, _order: atomic::Ordering) -> u64 {
        let mut guard = self.lock();
        let old = *guard;
        // Wrap like `AtomicU64::fetch_add` does instead of panicking on
        // overflow in debug builds.
        *guard = old.wrapping_add(value);
        old
    }

    /// Overwrites the counter with `value`.
    fn store(&self, value: u64, _order: atomic::Ordering) {
        *self.lock() = value;
    }

    /// Returns the current value of the counter.
    fn load(&self, _order: atomic::Ordering) -> u64 {
        *self.lock()
    }
}
61
62static CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| {
63    gst::DebugCategory::new(
64        "utilsrs-stream-producer",
65        gst::DebugColorFlags::empty(),
66        Some("gst_app Stream Producer interface"),
67    )
68});
69
70/// The interface for transporting media data from one node
71/// to another.
72///
73/// A producer is essentially a GStreamer `appsink` whose output
74/// is sent to a set of consumers, who are essentially `appsrc` wrappers
75#[derive(Debug, Clone)]
76pub struct StreamProducer {
77    /// The appsink to dispatch data for
78    appsink: gst_app::AppSink,
79    /// The consumers to dispatch data to
80    consumers: Arc<Mutex<StreamConsumers>>,
81}
82
83impl PartialEq for StreamProducer {
84    fn eq(&self, other: &Self) -> bool {
85        self.appsink.eq(&other.appsink)
86    }
87}
88
89impl Eq for StreamProducer {}
90
91/// Link between a `StreamProducer` and a consumer, disconnecting the link on `Drop`.
92/// The producer and consumer will stay alive while the link is.
93#[derive(Debug)]
94#[must_use]
95pub struct ConsumptionLink {
96    consumer: gst_app::AppSrc,
97    producer: Option<StreamProducer>,
98    /// number of buffers dropped because `consumer` internal queue was full
99    dropped: Arc<WrappedAtomicU64>,
100    /// number of buffers pushed through `consumer`
101    pushed: Arc<WrappedAtomicU64>,
102    /// if buffers should not be pushed to the `consumer` right now
103    discard: Arc<atomic::AtomicBool>,
104    /// whether the link will drop delta frames until next keyframe on discont
105    wait_for_keyframe: Arc<atomic::AtomicBool>,
106}
107
108impl ConsumptionLink {
109    /// Create a new disconnected `ConsumptionLink`.
110    pub fn disconnected(consumer: gst_app::AppSrc) -> ConsumptionLink {
111        ConsumptionLink {
112            consumer,
113            producer: None,
114            dropped: Arc::new(WrappedAtomicU64::new(0)),
115            pushed: Arc::new(WrappedAtomicU64::new(0)),
116            discard: Arc::new(atomic::AtomicBool::new(false)),
117            wait_for_keyframe: Arc::new(atomic::AtomicBool::new(true)),
118        }
119    }
120
121    /// Replace the producer by a new one, keeping the existing consumer.
122    pub fn change_producer(
123        &mut self,
124        new_producer: &StreamProducer,
125        reset_stats: bool,
126    ) -> Result<(), AddConsumerError> {
127        self.disconnect();
128        if reset_stats {
129            self.dropped.store(0, atomic::Ordering::SeqCst);
130            self.pushed.store(0, atomic::Ordering::SeqCst);
131        }
132        new_producer.add_consumer_internal(
133            &self.consumer,
134            self.dropped.clone(),
135            self.pushed.clone(),
136            self.discard.clone(),
137            self.wait_for_keyframe.clone(),
138        )?;
139        self.producer = Some(new_producer.clone());
140        Ok(())
141    }
142
143    /// Disconnect the consumer from the producer
144    pub fn disconnect(&mut self) {
145        if let Some(producer) = self.producer.take() {
146            producer.remove_consumer(&self.consumer);
147        }
148    }
149
150    /// number of dropped buffers because the consumer internal queue was full
151    pub fn dropped(&self) -> u64 {
152        self.dropped.load(atomic::Ordering::SeqCst)
153    }
154
155    /// number of buffers pushed through this link
156    pub fn pushed(&self) -> u64 {
157        self.pushed.load(atomic::Ordering::SeqCst)
158    }
159
160    /// if buffers are currently pushed through this link
161    pub fn discard(&self) -> bool {
162        self.discard.load(atomic::Ordering::SeqCst)
163    }
164
165    /// If set to `true` then no buffers will be pushed through this link
166    pub fn set_discard(&self, discard: bool) {
167        self.discard.store(discard, atomic::Ordering::SeqCst)
168    }
169
170    /// if the link will drop frames until the next keyframe on discont
171    pub fn wait_for_keyframe(&self) -> bool {
172        self.wait_for_keyframe.load(atomic::Ordering::SeqCst)
173    }
174
175    /// If set to `true` then the link will drop delta-frames until the next
176    /// keyframe on discont (default behavior).
177    pub fn set_wait_for_keyframe(&self, wait: bool) {
178        self.wait_for_keyframe.store(wait, atomic::Ordering::SeqCst)
179    }
180
181    /// Get the GStreamer `appsrc` wrapped by this link
182    pub fn appsrc(&self) -> &gst_app::AppSrc {
183        &self.consumer
184    }
185}
186
187impl Drop for ConsumptionLink {
188    fn drop(&mut self) {
189        self.disconnect();
190    }
191}
192
193#[derive(Debug, Error)]
194/// Error type returned when adding consumers to producers.
195pub enum AddConsumerError {
196    #[error("Consumer already added")]
197    /// Consumer has already been added to this producer.
198    AlreadyAdded,
199}
200
201impl StreamProducer {
202    /// Configure a consumer `appsrc` for later use in a `StreamProducer`
203    ///
204    /// This is automatically called when calling `add_consumer()`.
205    pub fn configure_consumer(consumer: &gst_app::AppSrc) {
206        // Latency on the appsrc is set by the publisher before the first buffer
207        // and whenever it changes
208        consumer.set_latency(gst::ClockTime::ZERO, gst::ClockTime::NONE);
209        consumer.set_format(gst::Format::Time);
210        consumer.set_is_live(true);
211        consumer.set_handle_segment_change(true);
212        consumer.set_max_buffers(0);
213        consumer.set_max_bytes(0);
214        consumer.set_max_time(500 * gst::ClockTime::MSECOND);
215        consumer.set_leaky_type(gst_app::AppLeakyType::Downstream);
216        consumer.set_automatic_eos(false);
217    }
218
219    /// Add an appsrc to dispatch data to.
220    ///
221    /// Dropping the returned `ConsumptionLink` will automatically disconnect the consumer from the producer.
222    pub fn add_consumer(
223        &self,
224        consumer: &gst_app::AppSrc,
225    ) -> Result<ConsumptionLink, AddConsumerError> {
226        let dropped = Arc::new(WrappedAtomicU64::new(0));
227        let pushed = Arc::new(WrappedAtomicU64::new(0));
228        let discard = Arc::new(atomic::AtomicBool::new(false));
229        let wait_for_keyframe = Arc::new(atomic::AtomicBool::new(true));
230
231        self.add_consumer_internal(
232            consumer,
233            dropped.clone(),
234            pushed.clone(),
235            discard.clone(),
236            wait_for_keyframe.clone(),
237        )?;
238
239        Ok(ConsumptionLink {
240            consumer: consumer.clone(),
241            producer: Some(self.clone()),
242            dropped,
243            pushed,
244            discard,
245            wait_for_keyframe,
246        })
247    }
248
249    fn add_consumer_internal(
250        &self,
251        consumer: &gst_app::AppSrc,
252        dropped: Arc<WrappedAtomicU64>,
253        pushed: Arc<WrappedAtomicU64>,
254        discard: Arc<atomic::AtomicBool>,
255        wait_for_keyframe: Arc<atomic::AtomicBool>,
256    ) -> Result<(), AddConsumerError> {
257        let mut consumers = self.consumers.lock().unwrap();
258        if consumers.consumers.contains_key(consumer) {
259            gst::error!(
260                CAT,
261                obj = &self.appsink,
262                "Consumer {} ({:?}) already added",
263                consumer.name(),
264                consumer
265            );
266            return Err(AddConsumerError::AlreadyAdded);
267        }
268
269        gst::debug!(
270            CAT,
271            obj = &self.appsink,
272            "Adding consumer {} ({:?})",
273            consumer.name(),
274            consumer
275        );
276
277        Self::configure_consumer(consumer);
278
279        // Forward force-keyunit events upstream to the appsink
280        let srcpad = consumer.static_pad("src").unwrap();
281        let appsink = &self.appsink;
282        let fku_probe_id = srcpad
283            .add_probe(
284                gst::PadProbeType::EVENT_UPSTREAM,
285                glib::clone!(
286                    #[weak]
287                    appsink,
288                    #[upgrade_or_panic]
289                    move |_pad, info| {
290                        let Some(event) = info.event() else {
291                            return gst::PadProbeReturn::Ok;
292                        };
293
294                        if gst_video::UpstreamForceKeyUnitEvent::parse(event).is_ok() {
295                            gst::debug!(CAT, obj = &appsink, "Requesting keyframe");
296                            // Do not use `gst_element_send_event()` as it takes the state lock which may lead to dead locks.
297                            let pad = appsink.static_pad("sink").unwrap();
298                            let _ = pad.push_event(event.clone());
299                        }
300
301                        gst::PadProbeReturn::Ok
302                    }
303                ),
304            )
305            .unwrap();
306
307        let stream_consumer = StreamConsumer::new(
308            consumer,
309            fku_probe_id,
310            dropped,
311            pushed,
312            discard,
313            wait_for_keyframe,
314        );
315
316        consumers
317            .consumers
318            .insert(consumer.clone(), stream_consumer);
319
320        // forward selected sticky events. We can send those now as appsrc will delay the events
321        // until stream-start, caps and segment are sent.
322        let events_to_forward = consumers.events_to_forward.clone();
323        // drop the lock before sending events
324        drop(consumers);
325
326        let appsink_pad = self.appsink.static_pad("sink").unwrap();
327        appsink_pad.sticky_events_foreach(|event| {
328            if events_to_forward.contains(&event.type_()) {
329                gst::debug!(CAT, obj = &self.appsink, "forward sticky event {:?}", event);
330                consumer.send_event(event.clone());
331            }
332
333            std::ops::ControlFlow::Continue(gst::EventForeachAction::Keep)
334        });
335
336        Ok(())
337    }
338
339    fn process_sample(
340        sample: gst::Sample,
341        appsink: &gst_app::AppSink,
342        mut consumers: MutexGuard<StreamConsumers>,
343    ) -> Result<gst::FlowSuccess, gst::FlowError> {
344        let (is_discont, is_keyframe) = if let Some(buf) = sample.buffer() {
345            let flags = buf.flags();
346
347            (
348                flags.contains(gst::BufferFlags::DISCONT),
349                !flags.contains(gst::BufferFlags::DELTA_UNIT),
350            )
351        } else {
352            (false, true)
353        };
354
355        gst::trace!(
356            CAT,
357            obj = appsink,
358            "processing sample {:?}",
359            sample.buffer()
360        );
361
362        let latency = consumers.current_latency;
363        let latency_updated = mem::replace(&mut consumers.latency_updated, false);
364
365        let mut needs_keyframe_request = false;
366
367        let current_consumers = consumers
368            .consumers
369            .values()
370            .filter_map(|consumer| {
371                if let Some(latency) = latency {
372                    if consumer
373                        .forwarded_latency
374                        .compare_exchange(
375                            false,
376                            true,
377                            atomic::Ordering::SeqCst,
378                            atomic::Ordering::SeqCst,
379                        )
380                        .is_ok()
381                        || latency_updated
382                    {
383                        gst::info!(CAT, obj = appsink, "setting new latency: {latency}");
384                        consumer.appsrc.set_latency(latency, gst::ClockTime::NONE);
385                    }
386                }
387
388                if consumer.discard.load(atomic::Ordering::SeqCst) {
389                    consumer
390                        .needs_keyframe
391                        .store(false, atomic::Ordering::SeqCst);
392                    return None;
393                }
394
395                if is_discont
396                    && !is_keyframe
397                    && consumer.wait_for_keyframe.load(atomic::Ordering::SeqCst)
398                {
399                    // Whenever we have a discontinuity, we need a new keyframe
400                    consumer
401                        .needs_keyframe
402                        .store(true, atomic::Ordering::SeqCst);
403                }
404
405                if !is_keyframe && consumer.needs_keyframe.load(atomic::Ordering::SeqCst) {
406                    // If we need a keyframe (and this one isn't) request a keyframe upstream
407                    if !needs_keyframe_request {
408                        gst::debug!(CAT, obj = appsink, "Requesting keyframe for first buffer");
409                        needs_keyframe_request = true;
410                    }
411
412                    consumer.dropped.fetch_add(1, atomic::Ordering::SeqCst);
413
414                    gst::error!(
415                        CAT,
416                        obj = appsink,
417                        "Ignoring frame for {} while waiting for a keyframe",
418                        consumer.appsrc.name()
419                    );
420                    None
421                } else {
422                    consumer
423                        .needs_keyframe
424                        .store(false, atomic::Ordering::SeqCst);
425                    consumer.pushed.fetch_add(1, atomic::Ordering::SeqCst);
426
427                    Some(consumer.appsrc.clone())
428                }
429            })
430            .collect::<Vec<_>>();
431
432        drop(consumers);
433
434        if needs_keyframe_request {
435            // Do not use `gst_element_send_event()` as it takes the state lock which may lead to dead locks.
436            let pad = appsink.static_pad("sink").unwrap();
437            pad.push_event(
438                gst_video::UpstreamForceKeyUnitEvent::builder()
439                    .all_headers(true)
440                    .build(),
441            );
442        }
443
444        for consumer in current_consumers {
445            if let Err(err) = consumer.push_sample(&sample) {
446                gst::warning!(CAT, obj = appsink, "Failed to push sample: {}", err);
447            }
448        }
449        Ok(gst::FlowSuccess::Ok)
450    }
451
452    /// Remove a consumer appsrc by id
453    pub fn remove_consumer(&self, consumer: &gst_app::AppSrc) {
454        let name = consumer.name();
455        if self
456            .consumers
457            .lock()
458            .unwrap()
459            .consumers
460            .remove(consumer)
461            .is_some()
462        {
463            gst::debug!(
464                CAT,
465                obj = &self.appsink,
466                "Removed consumer {} ({:?})",
467                name,
468                consumer
469            );
470            consumer.set_callbacks(gst_app::AppSrcCallbacks::builder().build());
471        } else {
472            gst::debug!(
473                CAT,
474                obj = &self.appsink,
475                "Consumer {} ({:?}) not found",
476                name,
477                consumer
478            );
479        }
480    }
481
482    /// configure event types the appsink should forward to all its consumers (default: `Eos`).
483    pub fn set_forward_events(&self, events_to_forward: impl IntoIterator<Item = gst::EventType>) {
484        self.consumers.lock().unwrap().events_to_forward = events_to_forward.into_iter().collect();
485    }
486
487    /// get event types the appsink should forward to all its consumers
488    pub fn get_forwarded_events(&self) -> Vec<gst::EventType> {
489        self.consumers.lock().unwrap().events_to_forward.clone()
490    }
491
492    /// configure whether the preroll sample should be forwarded (default: `true`)
493    pub fn set_forward_preroll(&self, forward_preroll: bool) {
494        self.consumers.lock().unwrap().forward_preroll = forward_preroll;
495    }
496
497    /// Get the GStreamer `appsink` wrapped by this producer
498    pub fn appsink(&self) -> &gst_app::AppSink {
499        &self.appsink
500    }
501
502    /// Signals an error on all consumers
503    pub fn error(&self, error: &gst::glib::Error, debug: Option<&str>) {
504        let consumers = self.consumers.lock().unwrap();
505
506        for consumer in consumers.consumers.keys() {
507            let mut msg_builder =
508                gst::message::Error::builder_from_error(error.clone()).src(consumer);
509            if let Some(debug) = debug {
510                msg_builder = msg_builder.debug(debug);
511            }
512
513            let _ = consumer.post_message(msg_builder.build());
514        }
515    }
516
517    /// The last sample produced by this producer.
518    pub fn last_sample(&self) -> Option<gst::Sample> {
519        self.appsink.property("last-sample")
520    }
521}
522
523impl<'a> From<&'a gst_app::AppSink> for StreamProducer {
524    fn from(appsink: &'a gst_app::AppSink) -> Self {
525        let consumers = Arc::new(Mutex::new(StreamConsumers {
526            current_latency: None,
527            latency_updated: false,
528            consumers: HashMap::new(),
529            // it would make sense to automatically forward more events such as Tag but that would break
530            // with older GStreamer, see https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/4297
531            events_to_forward: vec![gst::EventType::Eos, gst::EventType::Gap],
532            forward_preroll: true,
533            just_forwarded_preroll: false,
534        }));
535
536        appsink.set_callbacks(
537            gst_app::AppSinkCallbacks::builder()
538                .new_sample(glib::clone!(
539                    #[strong]
540                    consumers,
541                    move |appsink| {
542                        let mut consumers = consumers.lock().unwrap();
543
544                        let sample = match appsink.pull_sample() {
545                            Ok(sample) => sample,
546                            Err(_err) => {
547                                gst::debug!(CAT, obj = appsink, "Failed to pull sample");
548                                return Err(gst::FlowError::Flushing);
549                            }
550                        };
551
552                        let just_forwarded_preroll =
553                            mem::replace(&mut consumers.just_forwarded_preroll, false);
554
555                        if just_forwarded_preroll {
556                            return Ok(gst::FlowSuccess::Ok);
557                        }
558
559                        StreamProducer::process_sample(sample, appsink, consumers)
560                    }
561                ))
562                .new_preroll(glib::clone!(
563                    #[strong]
564                    consumers,
565                    move |appsink| {
566                        let mut consumers = consumers.lock().unwrap();
567
568                        let sample = match appsink.pull_preroll() {
569                            Ok(sample) => sample,
570                            Err(_err) => {
571                                gst::debug!(CAT, obj = appsink, "Failed to pull preroll");
572                                return Err(gst::FlowError::Flushing);
573                            }
574                        };
575
576                        if consumers.forward_preroll {
577                            consumers.just_forwarded_preroll = true;
578
579                            StreamProducer::process_sample(sample, appsink, consumers)
580                        } else {
581                            Ok(gst::FlowSuccess::Ok)
582                        }
583                    }
584                ))
585                .new_event(glib::clone!(
586                    #[strong]
587                    consumers,
588                    move |appsink| {
589                        match appsink
590                            .pull_object()
591                            .map(|obj| obj.downcast::<gst::Event>())
592                        {
593                            Ok(Ok(event)) => {
594                                let (events_to_forward, appsrcs) = {
595                                    // clone so we don't keep the lock while pushing events
596                                    let consumers = consumers.lock().unwrap();
597                                    let events = consumers.events_to_forward.clone();
598                                    let appsrcs =
599                                        consumers.consumers.keys().cloned().collect::<Vec<_>>();
600
601                                    (events, appsrcs)
602                                };
603
604                                if events_to_forward.contains(&event.type_()) {
605                                    for appsrc in appsrcs {
606                                        appsrc.send_event(event.clone());
607                                    }
608                                }
609                            }
610                            Ok(Err(_)) => {} // pulled another unsupported object type, ignore
611                            Err(_err) => gst::warning!(CAT, obj = appsink, "Failed to pull event"),
612                        }
613
614                        false
615                    }
616                ))
617                .eos(glib::clone!(
618                    #[strong]
619                    consumers,
620                    move |appsink| {
621                        let stream_consumers = consumers.lock().unwrap();
622
623                        if stream_consumers
624                            .events_to_forward
625                            .contains(&gst::EventType::Eos)
626                        {
627                            let current_consumers = stream_consumers
628                                .consumers
629                                .values()
630                                .map(|c| c.appsrc.clone())
631                                .collect::<Vec<_>>();
632                            drop(stream_consumers);
633
634                            for consumer in current_consumers {
635                                gst::debug!(
636                                    CAT,
637                                    obj = appsink,
638                                    "set EOS on consumer {}",
639                                    consumer.name()
640                                );
641                                let _ = consumer.end_of_stream();
642                            }
643                        } else {
644                            gst::debug!(CAT, obj = appsink, "don't forward EOS to consumers");
645                        }
646                    }
647                ))
648                .build(),
649        );
650
651        let sinkpad = appsink.static_pad("sink").unwrap();
652        sinkpad.add_probe(
653            gst::PadProbeType::EVENT_UPSTREAM,
654            glib::clone!(
655                #[strong]
656                consumers,
657                move |_pad, info| {
658                    let Some(event) = info.event() else {
659                        return gst::PadProbeReturn::Ok;
660                    };
661
662                    let gst::EventView::Latency(event) = event.view() else {
663                        return gst::PadProbeReturn::Ok;
664                    };
665
666                    let latency = event.latency();
667                    let mut consumers = consumers.lock().unwrap();
668                    consumers.current_latency = Some(latency);
669                    consumers.latency_updated = true;
670
671                    gst::PadProbeReturn::Ok
672                }
673            ),
674        );
675
676        StreamProducer {
677            appsink: appsink.clone(),
678            consumers,
679        }
680    }
681}
682
683/// Wrapper around a HashMap of consumers, exists for thread safety
684/// and also protects some of the producer state
685#[derive(Debug)]
686struct StreamConsumers {
687    /// The currently-observed latency
688    current_latency: Option<gst::ClockTime>,
689    /// Whether the consumers' appsrc latency needs updating
690    latency_updated: bool,
691    /// The consumers, AppSrc pointer value -> consumer
692    consumers: HashMap<gst_app::AppSrc, StreamConsumer>,
693    /// What events should be forwarded to consumers
694    events_to_forward: Vec<gst::EventType>,
695    /// Whether the preroll sample should be forwarded at all
696    forward_preroll: bool,
697    /// Whether we just forwarded the preroll sample. When we did we want to
698    /// discard the next sample from on_new_sample as it would cause us to
699    /// otherwise push out the same sample twice to consumers.
700    just_forwarded_preroll: bool,
701}
702
703/// Wrapper around a consumer's `appsrc`
704#[derive(Debug)]
705struct StreamConsumer {
706    /// The GStreamer `appsrc` of the consumer
707    appsrc: gst_app::AppSrc,
708    /// The id of a pad probe that intercepts force-key-unit events
709    fku_probe_id: Option<gst::PadProbeId>,
710    /// Whether an initial latency was forwarded to the `appsrc`
711    forwarded_latency: atomic::AtomicBool,
712    /// Whether a first buffer has made it through, used to determine
713    /// whether a new key unit should be requested. Only useful for encoded
714    /// streams.
715    needs_keyframe: Arc<atomic::AtomicBool>,
716    /// number of buffers dropped because `appsrc` internal queue was full
717    dropped: Arc<WrappedAtomicU64>,
718    /// number of buffers pushed through `appsrc`
719    pushed: Arc<WrappedAtomicU64>,
720    /// if buffers should not be pushed to the `appsrc` right now
721    discard: Arc<atomic::AtomicBool>,
722    /// whether the consumer should drop delta frames until next keyframe on discont
723    wait_for_keyframe: Arc<atomic::AtomicBool>,
724}
725
726impl StreamConsumer {
727    /// Create a new consumer
728    fn new(
729        appsrc: &gst_app::AppSrc,
730        fku_probe_id: gst::PadProbeId,
731        dropped: Arc<WrappedAtomicU64>,
732        pushed: Arc<WrappedAtomicU64>,
733        discard: Arc<atomic::AtomicBool>,
734        wait_for_keyframe: Arc<atomic::AtomicBool>,
735    ) -> Self {
736        let needs_keyframe = Arc::new(atomic::AtomicBool::new(
737            wait_for_keyframe.load(atomic::Ordering::SeqCst),
738        ));
739        let needs_keyframe_clone = needs_keyframe.clone();
740        let wait_for_keyframe_clone = wait_for_keyframe.clone();
741        let dropped_clone = dropped.clone();
742
743        appsrc.set_callbacks(
744            gst_app::AppSrcCallbacks::builder()
745                .enough_data(move |appsrc| {
746                    gst::debug!(
747                        CAT,
748                        obj = appsrc,
749                        "consumer {} ({:?}) is not consuming fast enough, old samples are getting dropped",
750                        appsrc.name(),
751                        appsrc,
752                    );
753
754                    needs_keyframe_clone.store(wait_for_keyframe_clone.load(atomic::Ordering::SeqCst), atomic::Ordering::SeqCst);
755                    dropped_clone.fetch_add(1, atomic::Ordering::SeqCst);
756                })
757                .build(),
758        );
759
760        StreamConsumer {
761            appsrc: appsrc.clone(),
762            fku_probe_id: Some(fku_probe_id),
763            forwarded_latency: atomic::AtomicBool::new(false),
764            needs_keyframe,
765            dropped,
766            pushed,
767            discard,
768            wait_for_keyframe,
769        }
770    }
771}
772
773impl Drop for StreamConsumer {
774    fn drop(&mut self) {
775        if let Some(fku_probe_id) = self.fku_probe_id.take() {
776            let srcpad = self.appsrc.static_pad("src").unwrap();
777            srcpad.remove_probe(fku_probe_id);
778        }
779    }
780}
781
782impl PartialEq for StreamConsumer {
783    fn eq(&self, other: &Self) -> bool {
784        self.appsrc.eq(&other.appsrc)
785    }
786}
787
788impl Eq for StreamConsumer {}
789
790impl std::hash::Hash for StreamConsumer {
791    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
792        std::hash::Hash::hash(&self.appsrc, state);
793    }
794}
795
796impl std::borrow::Borrow<gst_app::AppSrc> for StreamConsumer {
797    #[inline]
798    fn borrow(&self) -> &gst_app::AppSrc {
799        &self.appsrc
800    }
801}
802
803#[cfg(test)]
804mod tests {
805    use std::{
806        str::FromStr,
807        sync::{Arc, Mutex},
808    };
809
810    use futures::{
811        channel::{mpsc, mpsc::Receiver},
812        SinkExt, StreamExt,
813    };
814    use gst::prelude::*;
815
816    use crate::{ConsumptionLink, StreamProducer};
817
818    fn create_producer() -> (
819        gst::Pipeline,
820        gst_app::AppSrc,
821        gst_app::AppSink,
822        StreamProducer,
823    ) {
824        let producer_pipe =
825            gst::parse::launch("appsrc name=producer_src ! appsink name=producer_sink")
826                .unwrap()
827                .downcast::<gst::Pipeline>()
828                .unwrap();
829        let producer_sink = producer_pipe
830            .by_name("producer_sink")
831            .unwrap()
832            .downcast::<gst_app::AppSink>()
833            .unwrap();
834
835        (
836            producer_pipe.clone(),
837            producer_pipe
838                .by_name("producer_src")
839                .unwrap()
840                .downcast::<gst_app::AppSrc>()
841                .unwrap(),
842            producer_sink.clone(),
843            StreamProducer::from(&producer_sink),
844        )
845    }
846
    /// Test helper bundling one consumer pipeline (`appsrc ! appsink`) with the state
    /// needed to observe what it receives.
    struct Consumer {
        /// The consumer's own pipeline.
        pipeline: gst::Pipeline,
        /// The pipeline's `appsrc`; this is what gets added to / removed from a producer.
        src: gst_app::AppSrc,
        /// The pipeline's `appsink` (element named `sink`).
        sink: gst_app::AppSink,
        /// Receiving end of the channel into which the sink's `new_sample` callback
        /// forwards every sample.
        receiver: Mutex<Receiver<gst::Sample>>,
        /// Set to `true` by `connect` and to `false` by `disconnect`.
        connected: Mutex<bool>,
    }
854
855    impl Consumer {
856        fn new(id: &str) -> Self {
857            let pipeline = gst::parse::launch(&format!("appsrc name={id} ! appsink name=sink"))
858                .unwrap()
859                .downcast::<gst::Pipeline>()
860                .unwrap();
861
862            let (sender, receiver) = mpsc::channel::<gst::Sample>(1000);
863            let sender = Arc::new(Mutex::new(sender));
864            let sink = pipeline
865                .by_name("sink")
866                .unwrap()
867                .downcast::<gst_app::AppSink>()
868                .unwrap();
869
870            sink.set_callbacks(
871                gst_app::AppSinkCallbacks::builder()
872                    // Add a handler to the "new-sample" signal.
873                    .new_sample(move |appsink| {
874                        // Pull the sample in question out of the appsink's buffer.
875                        let sender_clone = sender.clone();
876                        futures::executor::block_on(
877                            sender_clone
878                                .lock()
879                                .unwrap()
880                                .send(appsink.pull_sample().unwrap()),
881                        )
882                        .unwrap();
883
884                        Ok(gst::FlowSuccess::Ok)
885                    })
886                    .build(),
887            );
888
889            Self {
890                pipeline: pipeline.clone(),
891                src: pipeline
892                    .by_name(id)
893                    .unwrap()
894                    .downcast::<gst_app::AppSrc>()
895                    .unwrap(),
896                sink,
897                receiver: Mutex::new(receiver),
898                connected: Mutex::new(false),
899            }
900        }
901
902        fn connect(&self, producer: &StreamProducer) -> ConsumptionLink {
903            {
904                let mut connected = self.connected.lock().unwrap();
905                *connected = true;
906            }
907
908            producer.add_consumer(&self.src).unwrap()
909        }
910
911        fn disconnect(&self, producer: &StreamProducer) {
912            {
913                let mut connected = self.connected.lock().unwrap();
914                *connected = false;
915            }
916
917            producer.remove_consumer(&self.src);
918        }
919    }
920
921    #[test]
922    fn simple() {
923        gst::init().unwrap();
924
925        let (producer_pipe, producer_src, _producer_sink, producer) = create_producer();
926        producer_pipe
927            .set_state(gst::State::Playing)
928            .expect("Couldn't set producer pipeline state");
929
930        let mut consumers: Vec<Consumer> = Vec::new();
931        let consumer = Consumer::new("consumer1");
932        let link1 = consumer.connect(&producer);
933        consumer
934            .pipeline
935            .set_state(gst::State::Playing)
936            .expect("Couldn't set producer pipeline state");
937        consumers.push(consumer);
938
939        let consumer = Consumer::new("consumer2");
940        let link2 = consumer.connect(&producer);
941        consumer
942            .pipeline
943            .set_state(gst::State::Playing)
944            .expect("Couldn't set producer pipeline state");
945        consumers.push(consumer);
946
947        assert!(producer.last_sample().is_none());
948
949        for i in 0..10 {
950            let caps = gst::Caps::from_str(&format!("test,n={i}")).unwrap();
951            producer_src.set_caps(Some(&caps));
952            producer_src.push_buffer(gst::Buffer::new()).unwrap();
953
954            for consumer in &consumers {
955                if *consumer.connected.lock().unwrap() {
956                    let sample =
957                        futures::executor::block_on(consumer.receiver.lock().unwrap().next())
958                            .expect("Received an empty buffer?");
959                    sample.buffer().expect("No buffer on the sample?");
960                    assert_eq!(sample.caps(), Some(caps.as_ref()));
961                } else {
962                    debug_assert!(
963                        consumer
964                            .sink
965                            .try_pull_sample(gst::ClockTime::from_nseconds(0))
966                            .is_none(),
967                        "Disconnected consumer got a new sample?!"
968                    );
969                }
970            }
971
972            if i == 5 {
973                consumers.first().unwrap().disconnect(&producer);
974            }
975        }
976
977        assert!(producer.last_sample().is_some());
978
979        assert_eq!(link1.pushed(), 6);
980        assert_eq!(link1.dropped(), 0);
981        assert_eq!(link2.pushed(), 10);
982        assert_eq!(link2.dropped(), 0);
983    }
984}