// gstreamer_utils/streamproducer.rs

1use std::{
2    collections::HashMap,
3    mem,
4    sync::{Arc, Mutex, MutexGuard, atomic},
5};
6
7use gst::{glib, prelude::*};
8use std::sync::LazyLock;
9use thiserror::Error;
10
11pub const DEFAULT_PRODUCER_SYNC: bool = true;
12
13pub const DEFAULT_CONSUMER_MAX_BUFFERS: u64 = 0;
14pub const DEFAULT_CONSUMER_MAX_BYTES: gst::format::Bytes = gst::format::Bytes::ZERO;
15pub const DEFAULT_CONSUMER_MAX_TIME: gst::ClockTime = gst::ClockTime::from_mseconds(500);
16
/// 64-bit counter usable on every target.
///
/// Where the target supports 64-bit atomics the value is stored in an
/// [`atomic::AtomicU64`]; everywhere else it falls back to a `u64` guarded by a
/// [`Mutex`]. Only the operations this module needs are wrapped, not the whole
/// atomic API.
#[derive(Debug)]
struct WrappedAtomicU64 {
    #[cfg(target_has_atomic = "64")]
    atomic: atomic::AtomicU64,
    #[cfg(not(target_has_atomic = "64"))]
    atomic: Mutex<u64>,
}
27
28#[cfg(target_has_atomic = "64")]
29impl WrappedAtomicU64 {
30    fn new(value: u64) -> WrappedAtomicU64 {
31        WrappedAtomicU64 {
32            atomic: atomic::AtomicU64::new(value),
33        }
34    }
35    fn fetch_add(&self, value: u64, order: atomic::Ordering) -> u64 {
36        self.atomic.fetch_add(value, order)
37    }
38    fn store(&self, value: u64, order: atomic::Ordering) {
39        self.atomic.store(value, order);
40    }
41
42    fn load(&self, order: atomic::Ordering) -> u64 {
43        self.atomic.load(order)
44    }
45}
46
/// Fallback implementation for targets without native 64-bit atomics.
///
/// The value is protected by a [`Mutex`]; the `atomic::Ordering` arguments are accepted
/// only to keep the same signatures as the atomic-backed implementation and are ignored.
///
/// # Panics
///
/// Every method panics if the mutex is poisoned.
#[cfg(not(target_has_atomic = "64"))]
impl WrappedAtomicU64 {
    /// Creates a counter holding `value`.
    fn new(value: u64) -> Self {
        Self {
            atomic: Mutex::new(value),
        }
    }

    /// Adds `value` to the counter and returns the value it held before.
    fn fetch_add(&self, value: u64, _order: atomic::Ordering) -> u64 {
        let mut guard = self.atomic.lock().unwrap();
        let previous = *guard;
        // Wrap around on overflow exactly like `AtomicU64::fetch_add` does. A plain
        // `+=` would panic in debug builds while the lock is held, poisoning the
        // mutex and turning every later access into a panic as well.
        *guard = previous.wrapping_add(value);
        previous
    }

    /// Overwrites the counter with `value`.
    fn store(&self, value: u64, _order: atomic::Ordering) {
        *self.atomic.lock().unwrap() = value;
    }

    /// Returns the current value of the counter.
    fn load(&self, _order: atomic::Ordering) -> u64 {
        *self.atomic.lock().unwrap()
    }
}
67
68static CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| {
69    gst::DebugCategory::new(
70        "utilsrs-stream-producer",
71        gst::DebugColorFlags::empty(),
72        Some("gst_app Stream Producer interface"),
73    )
74});
75
76/// The interface for transporting media data from one node
77/// to another.
78///
79/// A producer is essentially a GStreamer `appsink` whose output
80/// is sent to a set of consumers, who are essentially `appsrc` wrappers
81#[derive(Debug, Clone)]
82pub struct StreamProducer(Arc<StreamProducerInner>);
83
84impl PartialEq for StreamProducer {
85    fn eq(&self, other: &Self) -> bool {
86        self.0.appsink.eq(&other.0.appsink)
87    }
88}
89
90impl Eq for StreamProducer {}
91
92#[derive(Debug)]
93struct StreamProducerInner {
94    /// The appsink to dispatch data for
95    appsink: gst_app::AppSink,
96    /// The pad probe on the appsink=
97    appsink_probe_id: Option<gst::PadProbeId>,
98    /// The consumers to dispatch data to
99    consumers: Arc<Mutex<StreamConsumers>>,
100}
101
102impl Drop for StreamProducerInner {
103    fn drop(&mut self) {
104        if let Some(probe_id) = self.appsink_probe_id.take() {
105            let pad = self.appsink.static_pad("sink").unwrap();
106            pad.remove_probe(probe_id);
107        }
108
109        self.appsink
110            .set_callbacks(gst_app::AppSinkCallbacks::builder().build());
111    }
112}
113
/// User defined producer settings
///
/// Defaults to:
///
/// * `sync` <- `true` (sync on the clock)
///
/// The field is public: build the struct directly and pass it to
/// [`StreamProducer::with`] if you need a different value.
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct ProducerSettings {
    /// Whether the producer `appsink` syncs on the clock
    pub sync: bool,
}
125
126impl Default for ProducerSettings {
127    fn default() -> Self {
128        ProducerSettings {
129            sync: DEFAULT_PRODUCER_SYNC,
130        }
131    }
132}
133
134/// Link between a `StreamProducer` and a consumer, disconnecting the link on `Drop`.
135/// The producer and consumer will stay alive while the link is.
136#[derive(Debug)]
137#[must_use]
138pub struct ConsumptionLink {
139    consumer: gst_app::AppSrc,
140    settings: ConsumerSettings,
141    producer: Option<StreamProducer>,
142    /// number of buffers dropped because `consumer` internal queue was full
143    dropped: Arc<WrappedAtomicU64>,
144    /// number of buffers pushed through `consumer`
145    pushed: Arc<WrappedAtomicU64>,
146    /// if buffers should not be pushed to the `consumer` right now
147    discard: Arc<atomic::AtomicBool>,
148    /// whether the link will drop delta frames until next keyframe on discont
149    wait_for_keyframe: Arc<atomic::AtomicBool>,
150}
151
152impl ConsumptionLink {
153    /// Create a new disconnected `ConsumptionLink`.
154    ///
155    /// The consumer will use the default configuration (see [StreamProducer::configure_consumer]).
156    /// If you need different settings, call [ConsumptionLink::disconnected_with] instead.
157    pub fn disconnected(consumer: gst_app::AppSrc) -> ConsumptionLink {
158        StreamProducer::configure_consumer(&consumer);
159
160        ConsumptionLink {
161            consumer,
162            settings: ConsumerSettings::default(),
163            producer: None,
164            dropped: Arc::new(WrappedAtomicU64::new(0)),
165            pushed: Arc::new(WrappedAtomicU64::new(0)),
166            discard: Arc::new(atomic::AtomicBool::new(false)),
167            wait_for_keyframe: Arc::new(atomic::AtomicBool::new(true)),
168        }
169    }
170
171    /// Create a new disconnected `ConsumptionLink`.
172    pub fn disconnected_with(
173        consumer: gst_app::AppSrc,
174        settings: ConsumerSettings,
175    ) -> ConsumptionLink {
176        StreamProducer::configure_consumer(&consumer);
177
178        ConsumptionLink {
179            consumer,
180            settings,
181            producer: None,
182            dropped: Arc::new(WrappedAtomicU64::new(0)),
183            pushed: Arc::new(WrappedAtomicU64::new(0)),
184            discard: Arc::new(atomic::AtomicBool::new(false)),
185            wait_for_keyframe: Arc::new(atomic::AtomicBool::new(true)),
186        }
187    }
188
189    /// Replace the producer by a new one, keeping the existing consumer.
190    pub fn change_producer(
191        &mut self,
192        new_producer: &StreamProducer,
193        reset_stats: bool,
194    ) -> Result<(), AddConsumerError> {
195        self.disconnect();
196        if reset_stats {
197            self.dropped.store(0, atomic::Ordering::SeqCst);
198            self.pushed.store(0, atomic::Ordering::SeqCst);
199        }
200        new_producer.add_consumer_internal(
201            &self.consumer,
202            self.settings.clone(),
203            self.dropped.clone(),
204            self.pushed.clone(),
205            self.discard.clone(),
206            self.wait_for_keyframe.clone(),
207        )?;
208        self.producer = Some(new_producer.clone());
209        Ok(())
210    }
211
212    /// Disconnect the consumer from the producer
213    pub fn disconnect(&mut self) {
214        if let Some(producer) = self.producer.take() {
215            producer.remove_consumer(&self.consumer);
216        }
217    }
218
219    /// number of dropped buffers because the consumer internal queue was full
220    pub fn dropped(&self) -> u64 {
221        self.dropped.load(atomic::Ordering::SeqCst)
222    }
223
224    /// number of buffers pushed through this link
225    pub fn pushed(&self) -> u64 {
226        self.pushed.load(atomic::Ordering::SeqCst)
227    }
228
229    /// if buffers are currently pushed through this link
230    pub fn discard(&self) -> bool {
231        self.discard.load(atomic::Ordering::SeqCst)
232    }
233
234    /// If set to `true` then no buffers will be pushed through this link
235    pub fn set_discard(&self, discard: bool) {
236        self.discard.store(discard, atomic::Ordering::SeqCst)
237    }
238
239    /// if the link will drop frames until the next keyframe on discont
240    pub fn wait_for_keyframe(&self) -> bool {
241        self.wait_for_keyframe.load(atomic::Ordering::SeqCst)
242    }
243
244    /// If set to `true` then the link will drop delta-frames until the next
245    /// keyframe on discont (default behavior).
246    pub fn set_wait_for_keyframe(&self, wait: bool) {
247        self.wait_for_keyframe.store(wait, atomic::Ordering::SeqCst)
248    }
249
250    /// Get the GStreamer `appsrc` wrapped by this link
251    pub fn appsrc(&self) -> &gst_app::AppSrc {
252        &self.consumer
253    }
254
255    /// Get the `StreamProducer` currently by this link, if any.
256    pub fn stream_producer(&self) -> Option<&StreamProducer> {
257        self.producer.as_ref()
258    }
259
260    /// Get the settings for this Consumer.
261    pub fn settings(&self) -> ConsumerSettings {
262        self.settings.clone()
263    }
264}
265
266impl Drop for ConsumptionLink {
267    fn drop(&mut self) {
268        self.disconnect();
269    }
270}
271
272/// User defined consumer settings
273///
274/// Defaults to:
275///
276/// * `max-buffers` <- `0` (unlimited)
277/// * `max-bytes` <- `0` (unlimited)
278/// * `max-time` <- `500ms`
279/// * `event-types` <- `gst::UpstreamForceKeyUnitEvent`
280///   Note: force-key-unit events are always forwarded in addition to any
281///   of the `gst::EventType` set through this property
282///
283/// Use `ConsumerSettings::builder()` if you need different values.
284#[derive(Debug, Clone, Eq, PartialEq, Hash)]
285pub struct ConsumerSettings {
286    pub max_buffer: u64,
287    pub max_bytes: gst::format::Bytes,
288    pub max_time: gst::ClockTime,
289    pub event_types: Vec<gst::EventType>,
290}
291
292impl Default for ConsumerSettings {
293    fn default() -> Self {
294        ConsumerSettings {
295            max_buffer: DEFAULT_CONSUMER_MAX_BUFFERS,
296            max_bytes: DEFAULT_CONSUMER_MAX_BYTES,
297            max_time: DEFAULT_CONSUMER_MAX_TIME,
298            event_types: Vec::new(),
299        }
300    }
301}
302
303#[derive(Debug, Error)]
304/// Error type returned when adding consumers to producers.
305pub enum AddConsumerError {
306    #[error("Consumer already added")]
307    /// Consumer has already been added to this producer.
308    AlreadyAdded,
309}
310
311impl StreamProducer {
312    /// Configures a consumer `appsrc` for later use in a `StreamProducer`.
313    ///
314    /// This function configures the `appsrc` in a suitable state to act as a consumer
315    /// and also sets the internal queue properties as follows:
316    ///
317    /// * `max-buffers` <- `0` (unlimited)
318    /// * `max-bytes` <- `0` (unlimited)
319    /// * `max-time` <- `500ms`
320    ///
321    /// If you need different settings, call [`StreamProducer::configure_consumer_with`] instead.
322    ///
323    /// This is automatically invoked when calling [`StreamProducer::add_consumer`].
324    pub fn configure_consumer(consumer: &gst_app::AppSrc) {
325        Self::configure_consumer_with(consumer, ConsumerSettings::default());
326    }
327
328    /// Configures a consumer `appsrc` for later use in a `StreamProducer`.
329    ///
330    /// This function configures the `appsrc` in a suitable state to act as a consumer
331    /// and applies the provided settings.
332    ///
333    /// If unsure, call [`StreamProducer::configure_consumer`] instead.
334    pub fn configure_consumer_with(consumer: &gst_app::AppSrc, settings: ConsumerSettings) {
335        // Latency on the appsrc is set by the publisher before the first buffer
336        // and whenever it changes
337        consumer.set_latency(gst::ClockTime::ZERO, gst::ClockTime::NONE);
338        consumer.set_format(gst::Format::Time);
339        consumer.set_is_live(true);
340        consumer.set_handle_segment_change(true);
341        consumer.set_leaky_type(gst_app::AppLeakyType::Downstream);
342        consumer.set_automatic_eos(false);
343
344        consumer.set_max_buffers(settings.max_buffer);
345        consumer.set_max_bytes(settings.max_bytes.into());
346        consumer.set_max_time(settings.max_time);
347    }
348
349    /// Adds an `appsrc` to dispatch data to.
350    ///
351    /// This function configures the `appsrc` in a suitable state to act as a consumer
352    /// and also sets the internal queue properties as follows:
353    ///
354    /// * `max-buffers` <- `0` (unlimited)
355    /// * `max-bytes` <- `0` (unlimited)
356    /// * `max-time` <- `500ms`
357    ///
358    /// If you need different values, call [`StreamProducer::add_consumer_with`] instead.
359    ///
360    /// Dropping the returned `ConsumptionLink` will automatically disconnect the consumer from the producer.
361    pub fn add_consumer(
362        &self,
363        consumer: &gst_app::AppSrc,
364    ) -> Result<ConsumptionLink, AddConsumerError> {
365        let dropped = Arc::new(WrappedAtomicU64::new(0));
366        let pushed = Arc::new(WrappedAtomicU64::new(0));
367        let discard = Arc::new(atomic::AtomicBool::new(false));
368        let wait_for_keyframe = Arc::new(atomic::AtomicBool::new(true));
369
370        self.add_consumer_internal(
371            consumer,
372            ConsumerSettings::default(),
373            dropped.clone(),
374            pushed.clone(),
375            discard.clone(),
376            wait_for_keyframe.clone(),
377        )?;
378
379        Ok(ConsumptionLink {
380            consumer: consumer.clone(),
381            settings: ConsumerSettings::default(),
382            producer: Some(self.clone()),
383            dropped,
384            pushed,
385            discard,
386            wait_for_keyframe,
387        })
388    }
389
390    /// Adds a configured `appsrc` to dispatch data to.
391    ///
392    /// This function configures the `appsrc` in a suitable state to act as a consumer
393    /// and applies the provided settings.
394    ///
395    /// If unsure, call [`StreamProducer::add_consumer`] instead.
396    ///
397    /// Dropping the returned `ConsumptionLink` will automatically disconnect the consumer from the producer.
398    pub fn add_consumer_with(
399        &self,
400        consumer: &gst_app::AppSrc,
401        settings: ConsumerSettings,
402    ) -> Result<ConsumptionLink, AddConsumerError> {
403        let dropped = Arc::new(WrappedAtomicU64::new(0));
404        let pushed = Arc::new(WrappedAtomicU64::new(0));
405        let discard = Arc::new(atomic::AtomicBool::new(false));
406        let wait_for_keyframe = Arc::new(atomic::AtomicBool::new(true));
407
408        self.add_consumer_internal(
409            consumer,
410            settings.clone(),
411            dropped.clone(),
412            pushed.clone(),
413            discard.clone(),
414            wait_for_keyframe.clone(),
415        )?;
416
417        Ok(ConsumptionLink {
418            consumer: consumer.clone(),
419            settings,
420            producer: Some(self.clone()),
421            dropped,
422            pushed,
423            discard,
424            wait_for_keyframe,
425        })
426    }
427
428    fn add_consumer_internal(
429        &self,
430        consumer: &gst_app::AppSrc,
431        settings: ConsumerSettings,
432        dropped: Arc<WrappedAtomicU64>,
433        pushed: Arc<WrappedAtomicU64>,
434        discard: Arc<atomic::AtomicBool>,
435        wait_for_keyframe: Arc<atomic::AtomicBool>,
436    ) -> Result<(), AddConsumerError> {
437        let mut consumers = self.0.consumers.lock().unwrap();
438        if consumers.consumers.contains_key(consumer) {
439            gst::error!(
440                CAT,
441                obj = &self.0.appsink,
442                "Consumer {} ({:?}) already added",
443                consumer.name(),
444                consumer
445            );
446            return Err(AddConsumerError::AlreadyAdded);
447        }
448
449        gst::debug!(
450            CAT,
451            obj = &self.0.appsink,
452            "Adding consumer {} ({:?})",
453            consumer.name(),
454            consumer
455        );
456
457        let settings_clone = settings.clone();
458        Self::configure_consumer_with(consumer, settings);
459
460        // Forward force-keyunit events upstream to the appsink
461        let srcpad = consumer.static_pad("src").unwrap();
462        let fku_probe_id = srcpad
463            .add_probe(
464                gst::PadProbeType::EVENT_UPSTREAM,
465                glib::clone!(
466                    #[weak(rename_to = appsink)]
467                    self.0.appsink,
468                    #[upgrade_or_panic]
469                    move |_pad, info| {
470                        let Some(event) = info.event() else {
471                            return gst::PadProbeReturn::Ok;
472                        };
473
474                        if gst_video::UpstreamForceKeyUnitEvent::parse(event).is_ok()
475                            || settings_clone.event_types.contains(&event.type_())
476                        {
477                            if gst_video::ForceKeyUnitEvent::is(event) {
478                                gst::debug!(CAT, obj = &appsink, "Requesting keyframe");
479                            } else {
480                                gst::debug!(
481                                    CAT,
482                                    obj = &appsink,
483                                    "pushing upstream event {:?}",
484                                    event
485                                );
486                            }
487                            // Do not use `gst_element_send_event()` as it takes the state lock which may lead to dead locks.
488                            let pad = appsink.static_pad("sink").unwrap();
489                            let _ = pad.push_event(event.clone());
490                        }
491
492                        gst::PadProbeReturn::Ok
493                    }
494                ),
495            )
496            .unwrap();
497
498        let stream_consumer = StreamConsumer::new(
499            consumer,
500            fku_probe_id,
501            dropped,
502            pushed,
503            discard,
504            wait_for_keyframe,
505        );
506
507        consumers
508            .consumers
509            .insert(consumer.clone(), stream_consumer);
510
511        // forward selected sticky events. We can send those now as appsrc will delay the events
512        // until stream-start, caps and segment are sent.
513        let events_to_forward = consumers.events_to_forward.clone();
514        // drop the lock before sending events
515        drop(consumers);
516
517        let appsink_pad = self.0.appsink.static_pad("sink").unwrap();
518        appsink_pad.sticky_events_foreach(|event| {
519            if events_to_forward.contains(&event.type_()) {
520                gst::debug!(
521                    CAT,
522                    obj = &self.0.appsink,
523                    "forward sticky event {:?}",
524                    event
525                );
526                consumer.send_event(event.clone());
527            }
528
529            std::ops::ControlFlow::Continue(gst::EventForeachAction::Keep)
530        });
531
532        Ok(())
533    }
534
535    fn process_sample(
536        sample: gst::Sample,
537        appsink: &gst_app::AppSink,
538        mut consumers: MutexGuard<StreamConsumers>,
539    ) -> Result<gst::FlowSuccess, gst::FlowError> {
540        let (is_discont, is_keyframe) = if let Some(buf) = sample.buffer() {
541            let flags = buf.flags();
542
543            (
544                flags.contains(gst::BufferFlags::DISCONT),
545                !flags.contains(gst::BufferFlags::DELTA_UNIT),
546            )
547        } else {
548            (false, true)
549        };
550
551        gst::trace!(
552            CAT,
553            obj = appsink,
554            "processing sample {:?}",
555            sample.buffer()
556        );
557
558        let latency = consumers.current_latency;
559        let latency_updated = mem::replace(&mut consumers.latency_updated, false);
560
561        let mut needs_keyframe_request = false;
562
563        let current_consumers = consumers
564            .consumers
565            .values()
566            .filter_map(|consumer| {
567                if let Some(latency) = latency
568                    && (consumer
569                        .forwarded_latency
570                        .compare_exchange(
571                            false,
572                            true,
573                            atomic::Ordering::SeqCst,
574                            atomic::Ordering::SeqCst,
575                        )
576                        .is_ok()
577                        || latency_updated)
578                {
579                    gst::info!(CAT, obj = appsink, "setting new latency: {latency}");
580                    consumer.appsrc.set_latency(latency, gst::ClockTime::NONE);
581                }
582
583                if consumer.discard.load(atomic::Ordering::SeqCst) {
584                    consumer
585                        .needs_keyframe
586                        .store(true, atomic::Ordering::SeqCst);
587                    return None;
588                }
589
590                if is_discont
591                    && !is_keyframe
592                    && consumer.wait_for_keyframe.load(atomic::Ordering::SeqCst)
593                {
594                    // Whenever we have a discontinuity, we need a new keyframe
595                    consumer
596                        .needs_keyframe
597                        .store(true, atomic::Ordering::SeqCst);
598                }
599
600                if !is_keyframe && consumer.needs_keyframe.load(atomic::Ordering::SeqCst) {
601                    // If we need a keyframe (and this one isn't) request a keyframe upstream
602                    if !needs_keyframe_request {
603                        gst::debug!(CAT, obj = appsink, "Requesting keyframe for first buffer");
604                        needs_keyframe_request = true;
605                    }
606
607                    consumer.dropped.fetch_add(1, atomic::Ordering::SeqCst);
608
609                    gst::error!(
610                        CAT,
611                        obj = appsink,
612                        "Ignoring frame for {} while waiting for a keyframe",
613                        consumer.appsrc.name()
614                    );
615                    None
616                } else {
617                    consumer
618                        .needs_keyframe
619                        .store(false, atomic::Ordering::SeqCst);
620                    consumer.pushed.fetch_add(1, atomic::Ordering::SeqCst);
621
622                    Some(consumer.appsrc.clone())
623                }
624            })
625            .collect::<Vec<_>>();
626
627        drop(consumers);
628
629        if needs_keyframe_request {
630            // Do not use `gst_element_send_event()` as it takes the state lock which may lead to dead locks.
631            let pad = appsink.static_pad("sink").unwrap();
632            pad.push_event(
633                gst_video::UpstreamForceKeyUnitEvent::builder()
634                    .all_headers(true)
635                    .build(),
636            );
637        }
638
639        for consumer in current_consumers {
640            if let Err(err) = consumer.push_sample(&sample) {
641                gst::warning!(CAT, obj = appsink, "Failed to push sample: {}", err);
642            }
643        }
644        Ok(gst::FlowSuccess::Ok)
645    }
646
647    /// Remove a consumer appsrc by id
648    pub fn remove_consumer(&self, consumer: &gst_app::AppSrc) {
649        let name = consumer.name();
650        if self
651            .0
652            .consumers
653            .lock()
654            .unwrap()
655            .consumers
656            .remove(consumer)
657            .is_some()
658        {
659            gst::debug!(
660                CAT,
661                obj = &self.0.appsink,
662                "Removed consumer {} ({:?})",
663                name,
664                consumer
665            );
666            consumer.set_callbacks(gst_app::AppSrcCallbacks::builder().build());
667        } else {
668            gst::debug!(
669                CAT,
670                obj = &self.0.appsink,
671                "Consumer {} ({:?}) not found",
672                name,
673                consumer
674            );
675        }
676    }
677
678    /// configure event types the appsink should forward to all its consumers (default: `Eos`).
679    pub fn set_forward_events(&self, events_to_forward: impl IntoIterator<Item = gst::EventType>) {
680        self.0.consumers.lock().unwrap().events_to_forward =
681            events_to_forward.into_iter().collect();
682    }
683
684    /// get event types the appsink should forward to all its consumers
685    pub fn get_forwarded_events(&self) -> Vec<gst::EventType> {
686        self.0.consumers.lock().unwrap().events_to_forward.clone()
687    }
688
689    /// configure whether the preroll sample should be forwarded (default: `true`)
690    pub fn set_forward_preroll(&self, forward_preroll: bool) {
691        self.0.consumers.lock().unwrap().forward_preroll = forward_preroll;
692    }
693
694    /// Get the GStreamer `appsink` wrapped by this producer
695    pub fn appsink(&self) -> &gst_app::AppSink {
696        &self.0.appsink
697    }
698
699    /// Signals an error on all consumers
700    pub fn error(&self, error: &gst::glib::Error, debug: Option<&str>) {
701        let consumers = self.0.consumers.lock().unwrap();
702
703        for consumer in consumers.consumers.keys() {
704            let mut msg_builder =
705                gst::message::Error::builder_from_error(error.clone()).src(consumer);
706            if let Some(debug) = debug {
707                msg_builder = msg_builder.debug(debug);
708            }
709
710            let _ = consumer.post_message(msg_builder.build());
711        }
712    }
713
714    /// The last sample produced by this producer.
715    pub fn last_sample(&self) -> Option<gst::Sample> {
716        self.0.appsink.property("last-sample")
717    }
718}
719
720impl StreamProducer {
721    /// Adds an `appsink` to dispatch data from.
722    ///
723    /// This function configures the `appsink` in a suitable state to act as a producer
724    /// and also sets the properties as follows:
725    ///
726    /// * `sync` <- `true` (sync on the clock)
727    ///
728    /// If you need a different value, use [`StreamProducer::with`] instead.
729    pub fn from(appsink: &gst_app::AppSink) -> Self {
730        Self::with(appsink, ProducerSettings::default())
731    }
732
733    /// Adds an `appsink` to dispatch data from.
734    ///
735    /// This function configures the `appsink` in a suitable state to act as a producer
736    /// and applies the provided settings.
737    ///
738    /// If unsure, use [`StreamProducer::from`] instead.
739    pub fn with(appsink: &gst_app::AppSink, settings: ProducerSettings) -> Self {
740        let consumers = Arc::new(Mutex::new(StreamConsumers {
741            current_latency: None,
742            latency_updated: false,
743            consumers: HashMap::new(),
744            // it would make sense to automatically forward more events such as Tag but that would break
745            // with older GStreamer, see https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/4297
746            events_to_forward: vec![gst::EventType::Eos, gst::EventType::Gap],
747            forward_preroll: true,
748            just_forwarded_preroll: false,
749        }));
750
751        appsink.set_sync(settings.sync);
752
753        appsink.set_callbacks(
754            gst_app::AppSinkCallbacks::builder()
755                .new_sample(glib::clone!(
756                    #[strong]
757                    consumers,
758                    move |appsink| {
759                        let mut consumers = consumers.lock().unwrap();
760
761                        let sample = match appsink.pull_sample() {
762                            Ok(sample) => sample,
763                            Err(_err) => {
764                                gst::debug!(CAT, obj = appsink, "Failed to pull sample");
765                                return Err(gst::FlowError::Flushing);
766                            }
767                        };
768
769                        let just_forwarded_preroll =
770                            mem::replace(&mut consumers.just_forwarded_preroll, false);
771
772                        if just_forwarded_preroll {
773                            return Ok(gst::FlowSuccess::Ok);
774                        }
775
776                        StreamProducer::process_sample(sample, appsink, consumers)
777                    }
778                ))
779                .new_preroll(glib::clone!(
780                    #[strong]
781                    consumers,
782                    move |appsink| {
783                        let mut consumers = consumers.lock().unwrap();
784
785                        let sample = match appsink.pull_preroll() {
786                            Ok(sample) => sample,
787                            Err(_err) => {
788                                gst::debug!(CAT, obj = appsink, "Failed to pull preroll");
789                                return Err(gst::FlowError::Flushing);
790                            }
791                        };
792
793                        if consumers.forward_preroll {
794                            consumers.just_forwarded_preroll = true;
795
796                            StreamProducer::process_sample(sample, appsink, consumers)
797                        } else {
798                            Ok(gst::FlowSuccess::Ok)
799                        }
800                    }
801                ))
802                .new_event(glib::clone!(
803                    #[strong]
804                    consumers,
805                    move |appsink| {
806                        match appsink
807                            .pull_object()
808                            .map(|obj| obj.downcast::<gst::Event>())
809                        {
810                            Ok(Ok(event)) => {
811                                let (events_to_forward, appsrcs) = {
812                                    // clone so we don't keep the lock while pushing events
813                                    let consumers = consumers.lock().unwrap();
814                                    let events = consumers.events_to_forward.clone();
815                                    let appsrcs =
816                                        consumers.consumers.keys().cloned().collect::<Vec<_>>();
817
818                                    (events, appsrcs)
819                                };
820
821                                if events_to_forward.contains(&event.type_()) {
822                                    for appsrc in appsrcs {
823                                        appsrc.send_event(event.clone());
824                                    }
825                                }
826                            }
827                            Ok(Err(_)) => {} // pulled another unsupported object type, ignore
828                            Err(_err) => gst::warning!(CAT, obj = appsink, "Failed to pull event"),
829                        }
830
831                        false
832                    }
833                ))
834                .eos(glib::clone!(
835                    #[strong]
836                    consumers,
837                    move |appsink| {
838                        let stream_consumers = consumers.lock().unwrap();
839
840                        if stream_consumers
841                            .events_to_forward
842                            .contains(&gst::EventType::Eos)
843                        {
844                            let current_consumers = stream_consumers
845                                .consumers
846                                .values()
847                                .map(|c| c.appsrc.clone())
848                                .collect::<Vec<_>>();
849                            drop(stream_consumers);
850
851                            for consumer in current_consumers {
852                                gst::debug!(
853                                    CAT,
854                                    obj = appsink,
855                                    "set EOS on consumer {}",
856                                    consumer.name()
857                                );
858                                let _ = consumer.end_of_stream();
859                            }
860                        } else {
861                            gst::debug!(CAT, obj = appsink, "don't forward EOS to consumers");
862                        }
863                    }
864                ))
865                .build(),
866        );
867
868        let sinkpad = appsink.static_pad("sink").unwrap();
869        let appsink_probe_id = if settings.sync {
870            // If appsink syncs on the clock,
871            // we need to propagate the latency consolidated by the producer's pipeline,
872            // which uses the max latency among all the branches. This value can be
873            // captured by observing the Latency event posted by the pipeline
874            // upon latency recalculation.
875            sinkpad
876                .add_probe(
877                    gst::PadProbeType::EVENT_UPSTREAM,
878                    glib::clone!(
879                        #[strong]
880                        consumers,
881                        move |_pad, info| {
882                            let Some(event) = info.event() else {
883                                unreachable!();
884                            };
885                            let gst::EventView::Latency(event) = event.view() else {
886                                return gst::PadProbeReturn::Ok;
887                            };
888
889                            let mut consumers = consumers.lock().unwrap();
890                            consumers.current_latency = Some(event.latency());
891                            consumers.latency_updated = true;
892
893                            gst::PadProbeReturn::Ok
894                        }
895                    ),
896                )
897                .unwrap()
898        } else {
899            // If appsink doesn't sync on the clock,
900            // only the latency from current branch needs to be considered.
901            // In this case, the Latency event doesn't take into account current branch.
902            // We can still capture current branch's latency by observing the result
903            // of the Latency query set by upstream elements.
904            //
905            // For reference:
906            //
907            // * BaseSink reports being non-live if the sink doesn't sync on the clock:
908            //   https://gitlab.freedesktop.org/gstreamer/gstreamer/-/blob/9669136207c6a2d35c45c9460f9a838cb5a97379/subprojects/gstreamer/libs/gst/base/gstbasesink.c#L1297
909            // * Bin-level consolidation skips non-live branches:
910            //   https://gitlab.freedesktop.org/gstreamer/gstreamer/-/blob/9669136207c6a2d35c45c9460f9a838cb5a97379/subprojects/gstreamer/gst/gstbin.c#L4166
911            sinkpad
912                .add_probe(
913                    gst::PadProbeType::QUERY_UPSTREAM | gst::PadProbeType::PULL,
914                    glib::clone!(
915                        #[strong]
916                        consumers,
917                        move |_pad, info| {
918                            let Some(query) = info.query() else {
919                                unreachable!();
920                            };
921                            let gst::QueryView::Latency(query) = query.view() else {
922                                return gst::PadProbeReturn::Ok;
923                            };
924
925                            let mut consumers = consumers.lock().unwrap();
926                            consumers.current_latency = Some(query.result().1);
927                            consumers.latency_updated = true;
928
929                            gst::PadProbeReturn::Ok
930                        }
931                    ),
932                )
933                .unwrap()
934        };
935
936        StreamProducer(Arc::new(StreamProducerInner {
937            appsink: appsink.clone(),
938            appsink_probe_id: Some(appsink_probe_id),
939            consumers,
940        }))
941    }
942}
943
/// Wrapper around a HashMap of consumers, exists for thread safety
/// and also protects some of the producer state.
///
/// The appsink callbacks and the sink-pad probes of the producer reach this
/// state through `consumers.lock()`, so every field below is read and written
/// with that lock held.
#[derive(Debug)]
struct StreamConsumers {
    /// The currently-observed latency, if any. Written by the producer's
    /// sink-pad probe, either from a Latency event or from the result of a
    /// Latency query.
    current_latency: Option<gst::ClockTime>,
    /// Whether the consumers' appsrc latency needs updating. Set to `true`
    /// every time the pad probe records a new `current_latency`.
    latency_updated: bool,
    /// The consumers, AppSrc pointer value -> consumer
    consumers: HashMap<gst_app::AppSrc, StreamConsumer>,
    /// What events should be forwarded to consumers. Consulted by the
    /// `new_event` callback for every pulled event and by the `eos` callback
    /// (for `gst::EventType::Eos`).
    events_to_forward: Vec<gst::EventType>,
    /// Whether the preroll sample should be forwarded at all
    forward_preroll: bool,
    /// Whether we just forwarded the preroll sample. When we did we want to
    /// discard the next sample from on_new_sample as it would cause us to
    /// otherwise push out the same sample twice to consumers.
    just_forwarded_preroll: bool,
}
963
/// Wrapper around a consumer's `appsrc`.
///
/// Holds the per-consumer bookkeeping of the producer. Equality, hashing and
/// `Borrow<gst_app::AppSrc>` are all delegated to the wrapped `appsrc`.
#[derive(Debug)]
struct StreamConsumer {
    /// The GStreamer `appsrc` of the consumer
    appsrc: gst_app::AppSrc,
    /// The id of a pad probe that intercepts force-key-unit events.
    /// It is taken and removed from the `appsrc` "src" pad when the consumer
    /// is dropped.
    fku_probe_id: Option<gst::PadProbeId>,
    /// Whether an initial latency was forwarded to the `appsrc`
    /// (starts out as `false`)
    forwarded_latency: atomic::AtomicBool,
    /// Whether a first buffer has made it through, used to determine
    /// whether a new key unit should be requested. Only useful for encoded
    /// streams.
    ///
    /// Initialised from `wait_for_keyframe` and reset to its current value
    /// each time the `appsrc` `enough_data` callback fires.
    needs_keyframe: Arc<atomic::AtomicBool>,
    /// number of buffers dropped because `appsrc` internal queue was full
    /// (incremented by one per `enough_data` callback)
    dropped: Arc<WrappedAtomicU64>,
    /// number of buffers pushed through `appsrc`
    pushed: Arc<WrappedAtomicU64>,
    /// if buffers should not be pushed to the `appsrc` right now
    discard: Arc<atomic::AtomicBool>,
    /// whether the consumer should drop delta frames until next keyframe on discont
    wait_for_keyframe: Arc<atomic::AtomicBool>,
}
986
987impl StreamConsumer {
988    /// Create a new consumer
989    fn new(
990        appsrc: &gst_app::AppSrc,
991        fku_probe_id: gst::PadProbeId,
992        dropped: Arc<WrappedAtomicU64>,
993        pushed: Arc<WrappedAtomicU64>,
994        discard: Arc<atomic::AtomicBool>,
995        wait_for_keyframe: Arc<atomic::AtomicBool>,
996    ) -> Self {
997        let needs_keyframe = Arc::new(atomic::AtomicBool::new(
998            wait_for_keyframe.load(atomic::Ordering::SeqCst),
999        ));
1000        let needs_keyframe_clone = needs_keyframe.clone();
1001        let wait_for_keyframe_clone = wait_for_keyframe.clone();
1002        let dropped_clone = dropped.clone();
1003
1004        appsrc.set_callbacks(
1005            gst_app::AppSrcCallbacks::builder()
1006                .enough_data(move |appsrc| {
1007                    gst::debug!(
1008                        CAT,
1009                        obj = appsrc,
1010                        "consumer {} ({appsrc:?}) is not consuming fast enough, old samples are getting dropped",
1011                        appsrc.name(),
1012                    );
1013
1014                    needs_keyframe_clone.store(wait_for_keyframe_clone.load(atomic::Ordering::SeqCst), atomic::Ordering::SeqCst);
1015                    dropped_clone.fetch_add(1, atomic::Ordering::SeqCst);
1016
1017                    let _  = appsrc.post_message(gst::message::Element::builder(
1018                        gst::Structure::new_empty("dropped-buffer")).src(appsrc).build()
1019                    );
1020                })
1021                .build(),
1022        );
1023
1024        StreamConsumer {
1025            appsrc: appsrc.clone(),
1026            fku_probe_id: Some(fku_probe_id),
1027            forwarded_latency: atomic::AtomicBool::new(false),
1028            needs_keyframe,
1029            dropped,
1030            pushed,
1031            discard,
1032            wait_for_keyframe,
1033        }
1034    }
1035}
1036
1037impl Drop for StreamConsumer {
1038    fn drop(&mut self) {
1039        if let Some(fku_probe_id) = self.fku_probe_id.take() {
1040            let srcpad = self.appsrc.static_pad("src").unwrap();
1041            srcpad.remove_probe(fku_probe_id);
1042        }
1043    }
1044}
1045
1046impl PartialEq for StreamConsumer {
1047    fn eq(&self, other: &Self) -> bool {
1048        self.appsrc.eq(&other.appsrc)
1049    }
1050}
1051
// Marker impl: the `PartialEq` impl compares only the wrapped `appsrc`.
impl Eq for StreamConsumer {}
1053
1054impl std::hash::Hash for StreamConsumer {
1055    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
1056        std::hash::Hash::hash(&self.appsrc, state);
1057    }
1058}
1059
1060impl std::borrow::Borrow<gst_app::AppSrc> for StreamConsumer {
1061    #[inline]
1062    fn borrow(&self) -> &gst_app::AppSrc {
1063        &self.appsrc
1064    }
1065}
1066
1067#[cfg(test)]
1068mod tests {
1069    use std::{
1070        str::FromStr,
1071        sync::{Arc, Mutex},
1072    };
1073
1074    use futures::{
1075        SinkExt, StreamExt,
1076        channel::{mpsc, mpsc::Receiver},
1077    };
1078    use gst::prelude::*;
1079
1080    use crate::{ConsumptionLink, StreamProducer, streamproducer::ConsumerSettings};
1081
1082    fn create_producer() -> (
1083        gst::Pipeline,
1084        gst_app::AppSrc,
1085        gst_app::AppSink,
1086        StreamProducer,
1087    ) {
1088        let producer_pipe =
1089            gst::parse::launch("appsrc name=producer_src ! appsink name=producer_sink")
1090                .unwrap()
1091                .downcast::<gst::Pipeline>()
1092                .unwrap();
1093        let producer_sink = producer_pipe
1094            .by_name("producer_sink")
1095            .unwrap()
1096            .downcast::<gst_app::AppSink>()
1097            .unwrap();
1098
1099        (
1100            producer_pipe.clone(),
1101            producer_pipe
1102                .by_name("producer_src")
1103                .unwrap()
1104                .downcast::<gst_app::AppSrc>()
1105                .unwrap(),
1106            producer_sink.clone(),
1107            StreamProducer::from(&producer_sink),
1108        )
1109    }
1110
    /// Test helper: an `appsrc ! appsink` pipeline that can be attached to a
    /// `StreamProducer` and exposes the samples reaching its sink.
    struct Consumer {
        /// The consumer pipeline
        pipeline: gst::Pipeline,
        /// The pipeline's `appsrc`; this is the element handed to the producer
        /// by `connect` and `disconnect`
        src: gst_app::AppSrc,
        /// The pipeline's `appsink` (named `sink`)
        sink: gst_app::AppSink,
        /// Receiving end of the channel that the sink's `new_sample` callback
        /// sends every pulled sample into
        receiver: Mutex<Receiver<gst::Sample>>,
        /// Set to `true` by `connect` and back to `false` by `disconnect`
        connected: Mutex<bool>,
    }
1118
1119    impl Consumer {
1120        fn new(id: &str) -> Self {
1121            let pipeline = gst::parse::launch(&format!("appsrc name={id} ! appsink name=sink"))
1122                .unwrap()
1123                .downcast::<gst::Pipeline>()
1124                .unwrap();
1125
1126            let (sender, receiver) = mpsc::channel::<gst::Sample>(1000);
1127            let sender = Arc::new(Mutex::new(sender));
1128            let sink = pipeline
1129                .by_name("sink")
1130                .unwrap()
1131                .downcast::<gst_app::AppSink>()
1132                .unwrap();
1133
1134            sink.set_callbacks(
1135                gst_app::AppSinkCallbacks::builder()
1136                    // Add a handler to the "new-sample" signal.
1137                    .new_sample(move |appsink| {
1138                        // Pull the sample in question out of the appsink's buffer.
1139                        let sender_clone = sender.clone();
1140                        futures::executor::block_on(
1141                            sender_clone
1142                                .lock()
1143                                .unwrap()
1144                                .send(appsink.pull_sample().unwrap()),
1145                        )
1146                        .unwrap();
1147
1148                        Ok(gst::FlowSuccess::Ok)
1149                    })
1150                    .build(),
1151            );
1152
1153            Self {
1154                pipeline: pipeline.clone(),
1155                src: pipeline
1156                    .by_name(id)
1157                    .unwrap()
1158                    .downcast::<gst_app::AppSrc>()
1159                    .unwrap(),
1160                sink,
1161                receiver: Mutex::new(receiver),
1162                connected: Mutex::new(false),
1163            }
1164        }
1165
1166        fn connect(&self, producer: &StreamProducer) -> ConsumptionLink {
1167            {
1168                let mut connected = self.connected.lock().unwrap();
1169                *connected = true;
1170            }
1171
1172            producer.add_consumer(&self.src).unwrap()
1173        }
1174
1175        fn disconnect(&self, producer: &StreamProducer) {
1176            {
1177                let mut connected = self.connected.lock().unwrap();
1178                *connected = false;
1179            }
1180
1181            producer.remove_consumer(&self.src);
1182        }
1183    }
1184
1185    #[test]
1186    fn simple() {
1187        gst::init().unwrap();
1188
1189        let (producer_pipe, producer_src, _producer_sink, producer) = create_producer();
1190        producer_pipe
1191            .set_state(gst::State::Playing)
1192            .expect("Couldn't set producer pipeline state");
1193
1194        let mut consumers: Vec<Consumer> = Vec::new();
1195        let consumer = Consumer::new("consumer1");
1196        let link1 = consumer.connect(&producer);
1197        consumer
1198            .pipeline
1199            .set_state(gst::State::Playing)
1200            .expect("Couldn't set producer pipeline state");
1201        consumers.push(consumer);
1202
1203        let consumer = Consumer::new("consumer2");
1204        let link2 = consumer.connect(&producer);
1205        consumer
1206            .pipeline
1207            .set_state(gst::State::Playing)
1208            .expect("Couldn't set producer pipeline state");
1209        consumers.push(consumer);
1210
1211        assert!(producer.last_sample().is_none());
1212
1213        for i in 0..10 {
1214            let caps = gst::Caps::from_str(&format!("test,n={i}")).unwrap();
1215            producer_src.set_caps(Some(&caps));
1216            producer_src.push_buffer(gst::Buffer::new()).unwrap();
1217
1218            for consumer in &consumers {
1219                if *consumer.connected.lock().unwrap() {
1220                    let sample =
1221                        futures::executor::block_on(consumer.receiver.lock().unwrap().next())
1222                            .expect("Received an empty buffer?");
1223                    sample.buffer().expect("No buffer on the sample?");
1224                    assert_eq!(sample.caps(), Some(caps.as_ref()));
1225                } else {
1226                    debug_assert!(
1227                        consumer
1228                            .sink
1229                            .try_pull_sample(gst::ClockTime::from_nseconds(0))
1230                            .is_none(),
1231                        "Disconnected consumer got a new sample?!"
1232                    );
1233                }
1234            }
1235
1236            if i == 5 {
1237                consumers.first().unwrap().disconnect(&producer);
1238            }
1239        }
1240
1241        assert!(producer.last_sample().is_some());
1242
1243        assert_eq!(link1.pushed(), 6);
1244        assert_eq!(link1.dropped(), 0);
1245        assert_eq!(link2.pushed(), 10);
1246        assert_eq!(link2.dropped(), 0);
1247    }
1248
1249    fn check_consumer_commons(consumer: &gst_app::AppSrc) {
1250        assert_eq!(
1251            consumer.latency(),
1252            (Some(gst::ClockTime::ZERO), gst::ClockTime::NONE)
1253        );
1254        assert_eq!(consumer.format(), gst::Format::Time);
1255        assert!(consumer.is_live());
1256        assert!(consumer.is_handle_segment_change());
1257        assert_eq!(consumer.leaky_type(), gst_app::AppLeakyType::Downstream);
1258        assert!(!consumer.property::<bool>("automatic-eos"));
1259    }
1260
1261    #[test]
1262    fn configure_consumer_defaults() {
1263        gst::init().unwrap();
1264
1265        let consumer = gst_app::AppSrc::builder().build();
1266        StreamProducer::configure_consumer(&consumer);
1267        check_consumer_commons(&consumer);
1268
1269        assert_eq!(consumer.max_buffers(), 0);
1270        assert_eq!(consumer.max_bytes(), 0);
1271        assert_eq!(consumer.max_time(), 500.mseconds());
1272    }
1273
1274    #[test]
1275    fn configure_consumer_with_defaults() {
1276        gst::init().unwrap();
1277
1278        let consumer = gst_app::AppSrc::builder().build();
1279        StreamProducer::configure_consumer_with(&consumer, ConsumerSettings::default());
1280        check_consumer_commons(&consumer);
1281
1282        assert_eq!(consumer.max_buffers(), 0);
1283        assert_eq!(consumer.max_bytes(), 0);
1284        assert_eq!(consumer.max_time(), 500.mseconds());
1285    }
1286
1287    #[test]
1288    fn configure_consumer_with_specifics() {
1289        gst::init().unwrap();
1290
1291        let consumer = gst_app::AppSrc::builder().build();
1292
1293        StreamProducer::configure_consumer_with(
1294            &consumer,
1295            ConsumerSettings {
1296                max_buffer: 50,
1297                ..Default::default()
1298            },
1299        );
1300        check_consumer_commons(&consumer);
1301
1302        assert_eq!(consumer.max_buffers(), 50);
1303        assert_eq!(consumer.max_bytes(), 0);
1304        assert_eq!(consumer.max_time(), 500.mseconds());
1305
1306        StreamProducer::configure_consumer_with(
1307            &consumer,
1308            ConsumerSettings {
1309                max_buffer: 10,
1310                max_bytes: 2.mebibytes(),
1311                ..Default::default()
1312            },
1313        );
1314        check_consumer_commons(&consumer);
1315
1316        assert_eq!(consumer.max_buffers(), 10);
1317        assert_eq!(consumer.max_bytes(), 2 * 1024 * 1024);
1318        assert_eq!(consumer.max_time(), 500.mseconds());
1319
1320        StreamProducer::configure_consumer_with(
1321            &consumer,
1322            ConsumerSettings {
1323                max_time: gst::ClockTime::ZERO,
1324                ..Default::default()
1325            },
1326        );
1327        check_consumer_commons(&consumer);
1328
1329        assert_eq!(consumer.max_buffers(), 0);
1330        assert_eq!(consumer.max_bytes(), 0);
1331        assert!(consumer.max_time().is_zero());
1332
1333        StreamProducer::configure_consumer_with(
1334            &consumer,
1335            ConsumerSettings {
1336                max_time: 750.mseconds(),
1337                ..Default::default()
1338            },
1339        );
1340        check_consumer_commons(&consumer);
1341
1342        assert_eq!(consumer.max_buffers(), 0);
1343        assert_eq!(consumer.max_bytes(), 0);
1344        assert_eq!(consumer.max_time(), 750.mseconds());
1345    }
1346}