Skip to main content

gstreamer_app/
app_src.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{
4    mem, panic,
5    pin::Pin,
6    ptr,
7    sync::{Arc, Mutex},
8    task::{Context, Poll, Waker},
9};
10
11#[cfg(not(panic = "abort"))]
12use std::sync::atomic::{AtomicBool, Ordering};
13
14use futures_sink::Sink;
15use glib::{
16    ffi::{gboolean, gpointer},
17    prelude::*,
18    translate::*,
19};
20
21use crate::{AppSrc, ffi};
22
23#[allow(clippy::type_complexity)]
24pub struct AppSrcCallbacks {
25    need_data: Option<Box<dyn FnMut(&AppSrc, u32) + Send + 'static>>,
26    enough_data: Option<Box<dyn Fn(&AppSrc) + Send + Sync + 'static>>,
27    seek_data: Option<Box<dyn Fn(&AppSrc, u64) -> bool + Send + Sync + 'static>>,
28    #[cfg(not(panic = "abort"))]
29    panicked: AtomicBool,
30    callbacks: ffi::GstAppSrcCallbacks,
31}
32
33unsafe impl Send for AppSrcCallbacks {}
34unsafe impl Sync for AppSrcCallbacks {}
35
36impl AppSrcCallbacks {
37    pub fn builder() -> AppSrcCallbacksBuilder {
38        skip_assert_initialized!();
39
40        AppSrcCallbacksBuilder {
41            need_data: None,
42            enough_data: None,
43            seek_data: None,
44        }
45    }
46}
47
48#[allow(clippy::type_complexity)]
49#[must_use = "The builder must be built to be used"]
50pub struct AppSrcCallbacksBuilder {
51    need_data: Option<Box<dyn FnMut(&AppSrc, u32) + Send + 'static>>,
52    enough_data: Option<Box<dyn Fn(&AppSrc) + Send + Sync + 'static>>,
53    seek_data: Option<Box<dyn Fn(&AppSrc, u64) -> bool + Send + Sync + 'static>>,
54}
55
56impl AppSrcCallbacksBuilder {
57    pub fn need_data<F: FnMut(&AppSrc, u32) + Send + 'static>(self, need_data: F) -> Self {
58        Self {
59            need_data: Some(Box::new(need_data)),
60            ..self
61        }
62    }
63
64    pub fn need_data_if<F: FnMut(&AppSrc, u32) + Send + 'static>(
65        self,
66        need_data: F,
67        predicate: bool,
68    ) -> Self {
69        if predicate {
70            self.need_data(need_data)
71        } else {
72            self
73        }
74    }
75
76    pub fn need_data_if_some<F: FnMut(&AppSrc, u32) + Send + 'static>(
77        self,
78        need_data: Option<F>,
79    ) -> Self {
80        if let Some(need_data) = need_data {
81            self.need_data(need_data)
82        } else {
83            self
84        }
85    }
86
87    pub fn enough_data<F: Fn(&AppSrc) + Send + Sync + 'static>(self, enough_data: F) -> Self {
88        Self {
89            enough_data: Some(Box::new(enough_data)),
90            ..self
91        }
92    }
93
94    pub fn enough_data_if<F: Fn(&AppSrc) + Send + Sync + 'static>(
95        self,
96        enough_data: F,
97        predicate: bool,
98    ) -> Self {
99        if predicate {
100            self.enough_data(enough_data)
101        } else {
102            self
103        }
104    }
105
106    pub fn enough_data_if_some<F: Fn(&AppSrc) + Send + Sync + 'static>(
107        self,
108        enough_data: Option<F>,
109    ) -> Self {
110        if let Some(enough_data) = enough_data {
111            self.enough_data(enough_data)
112        } else {
113            self
114        }
115    }
116
117    pub fn seek_data<F: Fn(&AppSrc, u64) -> bool + Send + Sync + 'static>(
118        self,
119        seek_data: F,
120    ) -> Self {
121        Self {
122            seek_data: Some(Box::new(seek_data)),
123            ..self
124        }
125    }
126
127    pub fn seek_data_if<F: Fn(&AppSrc, u64) -> bool + Send + Sync + 'static>(
128        self,
129        seek_data: F,
130        predicate: bool,
131    ) -> Self {
132        if predicate {
133            self.seek_data(seek_data)
134        } else {
135            self
136        }
137    }
138
139    pub fn seek_data_if_some<F: Fn(&AppSrc, u64) -> bool + Send + Sync + 'static>(
140        self,
141        seek_data: Option<F>,
142    ) -> Self {
143        if let Some(seek_data) = seek_data {
144            self.seek_data(seek_data)
145        } else {
146            self
147        }
148    }
149
150    #[must_use = "Building the callbacks without using them has no effect"]
151    pub fn build(self) -> AppSrcCallbacks {
152        let have_need_data = self.need_data.is_some();
153        let have_enough_data = self.enough_data.is_some();
154        let have_seek_data = self.seek_data.is_some();
155
156        AppSrcCallbacks {
157            need_data: self.need_data,
158            enough_data: self.enough_data,
159            seek_data: self.seek_data,
160            #[cfg(not(panic = "abort"))]
161            panicked: AtomicBool::new(false),
162            callbacks: ffi::GstAppSrcCallbacks {
163                need_data: if have_need_data {
164                    Some(trampoline_need_data)
165                } else {
166                    None
167                },
168                enough_data: if have_enough_data {
169                    Some(trampoline_enough_data)
170                } else {
171                    None
172                },
173                seek_data: if have_seek_data {
174                    Some(trampoline_seek_data)
175                } else {
176                    None
177                },
178                _gst_reserved: [
179                    ptr::null_mut(),
180                    ptr::null_mut(),
181                    ptr::null_mut(),
182                    ptr::null_mut(),
183                ],
184            },
185        }
186    }
187}
188
189unsafe extern "C" fn trampoline_need_data(
190    appsrc: *mut ffi::GstAppSrc,
191    length: u32,
192    callbacks: gpointer,
193) {
194    unsafe {
195        let callbacks = callbacks as *mut AppSrcCallbacks;
196        let element: Borrowed<AppSrc> = from_glib_borrow(appsrc);
197
198        #[cfg(not(panic = "abort"))]
199        if (*callbacks).panicked.load(Ordering::Relaxed) {
200            let element: Borrowed<AppSrc> = from_glib_borrow(appsrc);
201            gst::subclass::post_panic_error_message(
202                element.upcast_ref(),
203                element.upcast_ref(),
204                None,
205            );
206            return;
207        }
208
209        if let Some(ref mut need_data) = (*callbacks).need_data {
210            let result =
211                panic::catch_unwind(panic::AssertUnwindSafe(|| need_data(&element, length)));
212            match result {
213                Ok(result) => result,
214                Err(err) => {
215                    #[cfg(panic = "abort")]
216                    {
217                        unreachable!("{err:?}");
218                    }
219                    #[cfg(not(panic = "abort"))]
220                    {
221                        (*callbacks).panicked.store(true, Ordering::Relaxed);
222                        gst::subclass::post_panic_error_message(
223                            element.upcast_ref(),
224                            element.upcast_ref(),
225                            Some(err),
226                        );
227                    }
228                }
229            }
230        }
231    }
232}
233
234unsafe extern "C" fn trampoline_enough_data(appsrc: *mut ffi::GstAppSrc, callbacks: gpointer) {
235    unsafe {
236        let callbacks = callbacks as *const AppSrcCallbacks;
237        let element: Borrowed<AppSrc> = from_glib_borrow(appsrc);
238
239        #[cfg(not(panic = "abort"))]
240        if (*callbacks).panicked.load(Ordering::Relaxed) {
241            let element: Borrowed<AppSrc> = from_glib_borrow(appsrc);
242            gst::subclass::post_panic_error_message(
243                element.upcast_ref(),
244                element.upcast_ref(),
245                None,
246            );
247            return;
248        }
249
250        if let Some(ref enough_data) = (*callbacks).enough_data {
251            let result = panic::catch_unwind(panic::AssertUnwindSafe(|| enough_data(&element)));
252            match result {
253                Ok(result) => result,
254                Err(err) => {
255                    #[cfg(panic = "abort")]
256                    {
257                        unreachable!("{err:?}");
258                    }
259                    #[cfg(not(panic = "abort"))]
260                    {
261                        (*callbacks).panicked.store(true, Ordering::Relaxed);
262                        gst::subclass::post_panic_error_message(
263                            element.upcast_ref(),
264                            element.upcast_ref(),
265                            Some(err),
266                        );
267                    }
268                }
269            }
270        }
271    }
272}
273
274unsafe extern "C" fn trampoline_seek_data(
275    appsrc: *mut ffi::GstAppSrc,
276    offset: u64,
277    callbacks: gpointer,
278) -> gboolean {
279    unsafe {
280        let callbacks = callbacks as *const AppSrcCallbacks;
281        let element: Borrowed<AppSrc> = from_glib_borrow(appsrc);
282
283        #[cfg(not(panic = "abort"))]
284        if (*callbacks).panicked.load(Ordering::Relaxed) {
285            let element: Borrowed<AppSrc> = from_glib_borrow(appsrc);
286            gst::subclass::post_panic_error_message(
287                element.upcast_ref(),
288                element.upcast_ref(),
289                None,
290            );
291            return false.into_glib();
292        }
293
294        let ret = if let Some(ref seek_data) = (*callbacks).seek_data {
295            let result =
296                panic::catch_unwind(panic::AssertUnwindSafe(|| seek_data(&element, offset)));
297            match result {
298                Ok(result) => result,
299                Err(err) => {
300                    #[cfg(panic = "abort")]
301                    {
302                        unreachable!("{err:?}");
303                    }
304                    #[cfg(not(panic = "abort"))]
305                    {
306                        (*callbacks).panicked.store(true, Ordering::Relaxed);
307                        gst::subclass::post_panic_error_message(
308                            element.upcast_ref(),
309                            element.upcast_ref(),
310                            Some(err),
311                        );
312
313                        false
314                    }
315                }
316            }
317        } else {
318            false
319        };
320
321        ret.into_glib()
322    }
323}
324
325unsafe extern "C" fn destroy_callbacks(ptr: gpointer) {
326    unsafe {
327        let _ = Box::<AppSrcCallbacks>::from_raw(ptr as *mut _);
328    }
329}
330
331impl AppSrc {
332    // rustdoc-stripper-ignore-next
333    /// Creates a new builder-pattern struct instance to construct [`AppSrc`] objects.
334    ///
335    /// This method returns an instance of [`AppSrcBuilder`](crate::builders::AppSrcBuilder) which can be used to create [`AppSrc`] objects.
336    pub fn builder<'a>() -> AppSrcBuilder<'a> {
337        assert_initialized_main_thread!();
338        AppSrcBuilder {
339            builder: gst::Object::builder(),
340            callbacks: None,
341            automatic_eos: None,
342        }
343    }
344
345    /// Set callbacks which will be executed when data is needed, enough data has
346    /// been collected or when a seek should be performed.
347    /// This is an alternative to using the signals, it has lower overhead and is thus
348    /// less expensive, but also less flexible.
349    ///
350    /// If callbacks are installed, no signals will be emitted for performance
351    /// reasons.
352    ///
353    /// Before 1.16.3 it was not possible to change the callbacks in a thread-safe
354    /// way.
355    /// ## `callbacks`
356    /// the callbacks
357    /// ## `notify`
358    /// a destroy notify function
359    #[doc(alias = "gst_app_src_set_callbacks")]
360    pub fn set_callbacks(&self, callbacks: AppSrcCallbacks) {
361        unsafe {
362            let src = self.to_glib_none().0;
363            #[allow(clippy::manual_dangling_ptr)]
364            #[cfg(not(feature = "v1_18"))]
365            {
366                static SET_ONCE_QUARK: std::sync::OnceLock<glib::Quark> =
367                    std::sync::OnceLock::new();
368
369                let set_once_quark = SET_ONCE_QUARK
370                    .get_or_init(|| glib::Quark::from_str("gstreamer-rs-app-src-callbacks"));
371
372                // This is not thread-safe before 1.16.3, see
373                // https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/merge_requests/570
374                if gst::version() < (1, 16, 3, 0) {
375                    if !glib::gobject_ffi::g_object_get_qdata(
376                        src as *mut _,
377                        set_once_quark.into_glib(),
378                    )
379                    .is_null()
380                    {
381                        panic!("AppSrc callbacks can only be set once");
382                    }
383
384                    glib::gobject_ffi::g_object_set_qdata(
385                        src as *mut _,
386                        set_once_quark.into_glib(),
387                        1 as *mut _,
388                    );
389                }
390            }
391
392            ffi::gst_app_src_set_callbacks(
393                src,
394                mut_override(&callbacks.callbacks),
395                Box::into_raw(Box::new(callbacks)) as *mut _,
396                Some(destroy_callbacks),
397            );
398        }
399    }
400
401    /// Configure the `min` and `max` latency in `src`. If `min` is set to -1, the
402    /// default latency calculations for pseudo-live sources will be used.
403    /// ## `min`
404    /// the min latency
405    /// ## `max`
406    /// the max latency
407    #[doc(alias = "gst_app_src_set_latency")]
408    pub fn set_latency(
409        &self,
410        min: impl Into<Option<gst::ClockTime>>,
411        max: impl Into<Option<gst::ClockTime>>,
412    ) {
413        unsafe {
414            ffi::gst_app_src_set_latency(
415                self.to_glib_none().0,
416                min.into().into_glib(),
417                max.into().into_glib(),
418            );
419        }
420    }
421
422    /// Retrieve the min and max latencies in `min` and `max` respectively.
423    ///
424    /// # Returns
425    ///
426    ///
427    /// ## `min`
428    /// the min latency
429    ///
430    /// ## `max`
431    /// the max latency
432    #[doc(alias = "get_latency")]
433    #[doc(alias = "gst_app_src_get_latency")]
434    pub fn latency(&self) -> (Option<gst::ClockTime>, Option<gst::ClockTime>) {
435        unsafe {
436            let mut min = mem::MaybeUninit::uninit();
437            let mut max = mem::MaybeUninit::uninit();
438            ffi::gst_app_src_get_latency(self.to_glib_none().0, min.as_mut_ptr(), max.as_mut_ptr());
439            (from_glib(min.assume_init()), from_glib(max.assume_init()))
440        }
441    }
442
443    #[doc(alias = "do-timestamp")]
444    #[doc(alias = "gst_base_src_set_do_timestamp")]
445    pub fn set_do_timestamp(&self, timestamp: bool) {
446        unsafe {
447            gst_base::ffi::gst_base_src_set_do_timestamp(
448                self.as_ptr() as *mut gst_base::ffi::GstBaseSrc,
449                timestamp.into_glib(),
450            );
451        }
452    }
453
454    #[doc(alias = "do-timestamp")]
455    #[doc(alias = "gst_base_src_get_do_timestamp")]
456    pub fn do_timestamp(&self) -> bool {
457        unsafe {
458            from_glib(gst_base::ffi::gst_base_src_get_do_timestamp(
459                self.as_ptr() as *mut gst_base::ffi::GstBaseSrc
460            ))
461        }
462    }
463
464    #[doc(alias = "do-timestamp")]
465    pub fn connect_do_timestamp_notify<F: Fn(&Self) + Send + Sync + 'static>(
466        &self,
467        f: F,
468    ) -> glib::SignalHandlerId {
469        unsafe extern "C" fn notify_do_timestamp_trampoline<
470            F: Fn(&AppSrc) + Send + Sync + 'static,
471        >(
472            this: *mut ffi::GstAppSrc,
473            _param_spec: glib::ffi::gpointer,
474            f: glib::ffi::gpointer,
475        ) {
476            unsafe {
477                let f: &F = &*(f as *const F);
478                f(&AppSrc::from_glib_borrow(this))
479            }
480        }
481        unsafe {
482            let f: Box<F> = Box::new(f);
483            glib::signal::connect_raw(
484                self.as_ptr() as *mut _,
485                b"notify::do-timestamp\0".as_ptr() as *const _,
486                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
487                    notify_do_timestamp_trampoline::<F> as *const (),
488                )),
489                Box::into_raw(f),
490            )
491        }
492    }
493
494    #[doc(alias = "set-automatic-eos")]
495    #[doc(alias = "gst_base_src_set_automatic_eos")]
496    pub fn set_automatic_eos(&self, automatic_eos: bool) {
497        unsafe {
498            gst_base::ffi::gst_base_src_set_automatic_eos(
499                self.as_ptr() as *mut gst_base::ffi::GstBaseSrc,
500                automatic_eos.into_glib(),
501            );
502        }
503    }
504
505    pub fn sink(&self) -> AppSrcSink {
506        AppSrcSink::new(self)
507    }
508}
509
510// rustdoc-stripper-ignore-next
511/// A [builder-pattern] type to construct [`AppSrc`] objects.
512///
513/// [builder-pattern]: https://doc.rust-lang.org/1.0.0/style/ownership/builders.html
514#[must_use = "The builder must be built to be used"]
515pub struct AppSrcBuilder<'a> {
516    builder: gst::gobject::GObjectBuilder<'a, AppSrc>,
517    callbacks: Option<AppSrcCallbacks>,
518    automatic_eos: Option<bool>,
519}
520
521impl<'a> AppSrcBuilder<'a> {
522    // rustdoc-stripper-ignore-next
523    /// Build the [`AppSrc`].
524    ///
525    /// # Panics
526    ///
527    /// This panics if the [`AppSrc`] doesn't have all the given properties or
528    /// property values of the wrong type are provided.
529    #[must_use = "Building the object from the builder is usually expensive and is not expected to have side effects"]
530    pub fn build(self) -> AppSrc {
531        let appsrc = self.builder.build().unwrap();
532
533        if let Some(callbacks) = self.callbacks {
534            appsrc.set_callbacks(callbacks);
535        }
536
537        if let Some(automatic_eos) = self.automatic_eos {
538            appsrc.set_automatic_eos(automatic_eos);
539        }
540
541        appsrc
542    }
543
544    pub fn automatic_eos(self, automatic_eos: bool) -> Self {
545        Self {
546            automatic_eos: Some(automatic_eos),
547            ..self
548        }
549    }
550
551    pub fn block(self, block: bool) -> Self {
552        Self {
553            builder: self.builder.property("block", block),
554            ..self
555        }
556    }
557
558    pub fn callbacks(self, callbacks: AppSrcCallbacks) -> Self {
559        Self {
560            callbacks: Some(callbacks),
561            ..self
562        }
563    }
564
565    pub fn caps(self, caps: &'a gst::Caps) -> Self {
566        Self {
567            builder: self.builder.property("caps", caps),
568            ..self
569        }
570    }
571
572    pub fn do_timestamp(self, do_timestamp: bool) -> Self {
573        Self {
574            builder: self.builder.property("do-timestamp", do_timestamp),
575            ..self
576        }
577    }
578
579    pub fn duration(self, duration: u64) -> Self {
580        Self {
581            builder: self.builder.property("duration", duration),
582            ..self
583        }
584    }
585
586    pub fn format(self, format: gst::Format) -> Self {
587        Self {
588            builder: self.builder.property("format", format),
589            ..self
590        }
591    }
592
593    #[cfg(feature = "v1_18")]
594    #[cfg_attr(docsrs, doc(cfg(feature = "v1_18")))]
595    pub fn handle_segment_change(self, handle_segment_change: bool) -> Self {
596        Self {
597            builder: self
598                .builder
599                .property("handle-segment-change", handle_segment_change),
600            ..self
601        }
602    }
603
604    pub fn is_live(self, is_live: bool) -> Self {
605        Self {
606            builder: self.builder.property("is-live", is_live),
607            ..self
608        }
609    }
610
611    #[cfg(feature = "v1_20")]
612    #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
613    pub fn leaky_type(self, leaky_type: crate::AppLeakyType) -> Self {
614        Self {
615            builder: self.builder.property("leaky-type", leaky_type),
616            ..self
617        }
618    }
619
620    #[cfg(feature = "v1_20")]
621    #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
622    pub fn max_buffers(self, max_buffers: u64) -> Self {
623        Self {
624            builder: self.builder.property("max-buffers", max_buffers),
625            ..self
626        }
627    }
628
629    pub fn max_bytes(self, max_bytes: u64) -> Self {
630        Self {
631            builder: self.builder.property("max-bytes", max_bytes),
632            ..self
633        }
634    }
635
636    pub fn max_latency(self, max_latency: i64) -> Self {
637        Self {
638            builder: self.builder.property("max-latency", max_latency),
639            ..self
640        }
641    }
642
643    #[cfg(feature = "v1_20")]
644    #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
645    pub fn max_time(self, max_time: gst::ClockTime) -> Self {
646        Self {
647            builder: self.builder.property("max-time", max_time),
648            ..self
649        }
650    }
651
652    pub fn min_latency(self, min_latency: i64) -> Self {
653        Self {
654            builder: self.builder.property("min-latency", min_latency),
655            ..self
656        }
657    }
658
659    pub fn min_percent(self, min_percent: u32) -> Self {
660        Self {
661            builder: self.builder.property("min-percent", min_percent),
662            ..self
663        }
664    }
665
666    pub fn size(self, size: i64) -> Self {
667        Self {
668            builder: self.builder.property("size", size),
669            ..self
670        }
671    }
672
673    pub fn stream_type(self, stream_type: crate::AppStreamType) -> Self {
674        Self {
675            builder: self.builder.property("stream-type", stream_type),
676            ..self
677        }
678    }
679
680    #[cfg(feature = "v1_28")]
681    #[cfg_attr(docsrs, doc(cfg(feature = "v1_28")))]
682    pub fn silent(self, silent: bool) -> Self {
683        Self {
684            builder: self.builder.property("silent", silent),
685            ..self
686        }
687    }
688
689    // rustdoc-stripper-ignore-next
690    /// Sets property `name` to the given value `value`.
691    ///
692    /// Overrides any default or previously defined value for `name`.
693    #[inline]
694    pub fn property(self, name: &'a str, value: impl Into<glib::Value> + 'a) -> Self {
695        Self {
696            builder: self.builder.property(name, value),
697            ..self
698        }
699    }
700
701    // rustdoc-stripper-ignore-next
702    /// Sets property `name` to the given string value `value`.
703    #[inline]
704    pub fn property_from_str(self, name: &'a str, value: &'a str) -> Self {
705        Self {
706            builder: self.builder.property_from_str(name, value),
707            ..self
708        }
709    }
710
711    gst::impl_builder_gvalue_extra_setters!(property_and_name);
712}
713
714#[derive(Debug)]
715pub struct AppSrcSink {
716    app_src: glib::WeakRef<AppSrc>,
717    waker_reference: Arc<Mutex<Option<Waker>>>,
718}
719
720impl AppSrcSink {
721    fn new(app_src: &AppSrc) -> Self {
722        skip_assert_initialized!();
723
724        let waker_reference = Arc::new(Mutex::new(None as Option<Waker>));
725
726        app_src.set_callbacks(
727            AppSrcCallbacks::builder()
728                .need_data({
729                    let waker_reference = Arc::clone(&waker_reference);
730
731                    move |_, _| {
732                        if let Some(waker) = waker_reference.lock().unwrap().take() {
733                            waker.wake();
734                        }
735                    }
736                })
737                .build(),
738        );
739
740        Self {
741            app_src: app_src.downgrade(),
742            waker_reference,
743        }
744    }
745}
746
747impl Drop for AppSrcSink {
748    fn drop(&mut self) {
749        #[cfg(not(feature = "v1_18"))]
750        {
751            // This is not thread-safe before 1.16.3, see
752            // https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/merge_requests/570
753            if gst::version() >= (1, 16, 3, 0)
754                && let Some(app_src) = self.app_src.upgrade()
755            {
756                app_src.set_callbacks(AppSrcCallbacks::builder().build());
757            }
758        }
759    }
760}
761
762impl Sink<gst::Sample> for AppSrcSink {
763    type Error = gst::FlowError;
764
765    fn poll_ready(self: Pin<&mut Self>, context: &mut Context) -> Poll<Result<(), Self::Error>> {
766        let mut waker = self.waker_reference.lock().unwrap();
767
768        let Some(app_src) = self.app_src.upgrade() else {
769            return Poll::Ready(Err(gst::FlowError::Eos));
770        };
771
772        let current_level_bytes = app_src.current_level_bytes();
773        let max_bytes = app_src.max_bytes();
774
775        if current_level_bytes >= max_bytes && max_bytes != 0 {
776            waker.replace(context.waker().to_owned());
777
778            Poll::Pending
779        } else {
780            Poll::Ready(Ok(()))
781        }
782    }
783
784    fn start_send(self: Pin<&mut Self>, sample: gst::Sample) -> Result<(), Self::Error> {
785        let Some(app_src) = self.app_src.upgrade() else {
786            return Err(gst::FlowError::Eos);
787        };
788
789        app_src.push_sample(&sample)?;
790
791        Ok(())
792    }
793
794    fn poll_flush(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
795        Poll::Ready(Ok(()))
796    }
797
798    fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
799        let Some(app_src) = self.app_src.upgrade() else {
800            return Poll::Ready(Ok(()));
801        };
802
803        app_src.end_of_stream()?;
804
805        Poll::Ready(Ok(()))
806    }
807}
808
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};

    use futures_util::{sink::SinkExt, stream::StreamExt};
    use gst::prelude::*;

    use super::*;

    // Sends a handful of samples through `AppSrcSink` into a fakesink and
    // checks that every one of them triggered a `handoff`.
    #[test]
    fn test_app_src_sink() {
        gst::init().unwrap();

        let source = gst::ElementFactory::make("appsrc").build().unwrap();
        let fakesink = gst::ElementFactory::make("fakesink")
            .property("signal-handoffs", true)
            .build()
            .unwrap();

        let pipeline = gst::Pipeline::new();
        pipeline.add(&source).unwrap();
        pipeline.add(&fakesink).unwrap();

        source.link(&fakesink).unwrap();

        let mut bus_stream = pipeline.bus().unwrap().stream();
        let mut app_src_sink = source.dynamic_cast::<AppSrc>().unwrap().sink();

        let sample_quantity = 5;

        let samples: Vec<gst::Sample> =
            std::iter::repeat_with(|| gst::Sample::builder().buffer(&gst::Buffer::new()).build())
                .take(sample_quantity)
                .collect();

        let mut sample_stream = futures_util::stream::iter(samples).map(Ok);

        let handoff_count_reference = Arc::new(AtomicUsize::new(0));
        let handoff_counter = Arc::clone(&handoff_count_reference);

        fakesink.connect("handoff", false, move |_| {
            handoff_counter.fetch_add(1, Ordering::AcqRel);

            None
        });

        pipeline.set_state(gst::State::Playing).unwrap();

        futures_executor::block_on(app_src_sink.send_all(&mut sample_stream)).unwrap();
        futures_executor::block_on(app_src_sink.close()).unwrap();

        // Drain the bus until EOS shows up; an error message fails the test.
        loop {
            let Some(message) = futures_executor::block_on(bus_stream.next()) else {
                break;
            };

            match message.view() {
                gst::MessageView::Eos(_) => break,
                gst::MessageView::Error(_) => unreachable!(),
                _ => {}
            }
        }

        pipeline.set_state(gst::State::Null).unwrap();

        assert_eq!(
            handoff_count_reference.load(Ordering::Acquire),
            sample_quantity
        );
    }

    // Exercises the builder's lifetime parameter: the borrowed property name
    // source and the borrowed caps may each outlive the other.
    #[test]
    fn builder_caps_lt() {
        // Shared assertions for both scoping variants below.
        fn assert_random_access_with_any_caps(appsrc: &AppSrc) {
            assert_eq!(
                appsrc.property::<crate::AppStreamType>("stream-type"),
                crate::AppStreamType::RandomAccess
            );
            assert!(appsrc.property::<gst::Caps>("caps").is_any());
        }

        gst::init().unwrap();

        // Caps live longer than the stream-type string.
        let outer_caps = &gst::Caps::new_any();
        {
            let inner_stream_type = "random-access".to_owned();
            let appsrc = AppSrc::builder()
                .property_from_str("stream-type", &inner_stream_type)
                .caps(outer_caps)
                .build();
            assert_random_access_with_any_caps(&appsrc);
        }

        // Stream-type string lives longer than the caps.
        let outer_stream_type = &"random-access".to_owned();
        {
            let inner_caps = &gst::Caps::new_any();
            let appsrc = AppSrc::builder()
                .property_from_str("stream-type", outer_stream_type)
                .caps(inner_caps)
                .build();
            assert_random_access_with_any_caps(&appsrc);
        }
    }
}