Skip to main content

gstreamer_app/
app_sink.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{
4    mem, panic,
5    pin::Pin,
6    ptr,
7    sync::{Arc, Mutex},
8    task::{Context, Poll, Waker},
9};
10
11#[cfg(not(panic = "abort"))]
12use std::sync::atomic::{AtomicBool, Ordering};
13
14use futures_core::Stream;
15use glib::{ffi::gpointer, prelude::*, translate::*};
16
17use crate::{AppSink, ffi};
18
/// Set of Rust closures that can be installed on an [`AppSink`] with
/// [`AppSink::set_callbacks`].
///
/// Instances are created through [`AppSinkCallbacks::builder`]. Every closure is optional; the
/// matching entry of the C callback table in `callbacks` is only filled in when the closure is
/// present (see `AppSinkCallbacksBuilder::build`).
#[allow(clippy::type_complexity)]
pub struct AppSinkCallbacks {
    // Invoked by `trampoline_eos`.
    eos: Option<Box<dyn FnMut(&AppSink) + Send + 'static>>,
    // Invoked by `trampoline_new_preroll`; the result is converted into a `gst::FlowReturn`.
    new_preroll: Option<
        Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
    >,
    // Invoked by `trampoline_new_sample`; the result is converted into a `gst::FlowReturn`.
    new_sample: Option<
        Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
    >,
    // Invoked by `trampoline_new_event`; the returned `bool` is handed back to C as a `gboolean`.
    new_event: Option<Box<dyn FnMut(&AppSink) -> bool + Send + 'static>>,
    // Invoked by `trampoline_propose_allocation` with the allocation query received from C.
    propose_allocation:
        Option<Box<dyn FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static>>,
    // Set by a trampoline after one of the closures panicked. While it is set, every trampoline
    // posts a panic error message and returns without calling into user code again.
    #[cfg(not(panic = "abort"))]
    panicked: AtomicBool,
    // C callback table whose address is passed to `gst_app_sink_set_callbacks`.
    callbacks: ffi::GstAppSinkCallbacks,
}
35
// The struct embeds `ffi::GstAppSinkCallbacks`, whose `_gst_reserved` field holds raw pointers,
// so `Send`/`Sync` are not derived automatically and are asserted by hand here.
// NOTE(review): the boxed closures are only `Send` and the trampolines take `&mut` access to
// them through a raw pointer; `Sync` therefore presumably relies on GStreamer never running
// the same callback concurrently — confirm against the appsink implementation.
unsafe impl Send for AppSinkCallbacks {}
unsafe impl Sync for AppSinkCallbacks {}
38
39impl AppSinkCallbacks {
40    pub fn builder() -> AppSinkCallbacksBuilder {
41        skip_assert_initialized!();
42        AppSinkCallbacksBuilder {
43            eos: None,
44            new_preroll: None,
45            new_sample: None,
46            new_event: None,
47            propose_allocation: None,
48        }
49    }
50}
51
/// Builder for [`AppSinkCallbacks`], obtained from [`AppSinkCallbacks::builder`].
///
/// Each field mirrors the field of the same name in [`AppSinkCallbacks`] and stays `None`
/// until the corresponding setter is called.
#[allow(clippy::type_complexity)]
#[must_use = "The builder must be built to be used"]
pub struct AppSinkCallbacksBuilder {
    // Closure for the `eos` callback, if one was registered.
    eos: Option<Box<dyn FnMut(&AppSink) + Send + 'static>>,
    // Closure for the `new_preroll` callback, if one was registered.
    new_preroll: Option<
        Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
    >,
    // Closure for the `new_sample` callback, if one was registered.
    new_sample: Option<
        Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
    >,
    // Closure for the `new_event` callback; its setters are only compiled with feature `v1_20`.
    new_event: Option<Box<dyn FnMut(&AppSink) -> bool + Send + 'static>>,
    // Closure for the `propose_allocation` callback; its setters are only compiled with
    // feature `v1_24`.
    propose_allocation:
        Option<Box<dyn FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static>>,
}
66
67impl AppSinkCallbacksBuilder {
68    pub fn eos<F: FnMut(&AppSink) + Send + 'static>(self, eos: F) -> Self {
69        Self {
70            eos: Some(Box::new(eos)),
71            ..self
72        }
73    }
74
75    pub fn eos_if<F: FnMut(&AppSink) + Send + 'static>(self, eos: F, predicate: bool) -> Self {
76        if predicate { self.eos(eos) } else { self }
77    }
78
79    pub fn eos_if_some<F: FnMut(&AppSink) + Send + 'static>(self, eos: Option<F>) -> Self {
80        if let Some(eos) = eos {
81            self.eos(eos)
82        } else {
83            self
84        }
85    }
86
87    pub fn new_preroll<
88        F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
89    >(
90        self,
91        new_preroll: F,
92    ) -> Self {
93        Self {
94            new_preroll: Some(Box::new(new_preroll)),
95            ..self
96        }
97    }
98
99    pub fn new_preroll_if<
100        F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
101    >(
102        self,
103        new_preroll: F,
104        predicate: bool,
105    ) -> Self {
106        if predicate {
107            self.new_preroll(new_preroll)
108        } else {
109            self
110        }
111    }
112
113    pub fn new_preroll_if_some<
114        F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
115    >(
116        self,
117        new_preroll: Option<F>,
118    ) -> Self {
119        if let Some(new_preroll) = new_preroll {
120            self.new_preroll(new_preroll)
121        } else {
122            self
123        }
124    }
125
126    pub fn new_sample<
127        F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
128    >(
129        self,
130        new_sample: F,
131    ) -> Self {
132        Self {
133            new_sample: Some(Box::new(new_sample)),
134            ..self
135        }
136    }
137
138    pub fn new_sample_if<
139        F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
140    >(
141        self,
142        new_sample: F,
143        predicate: bool,
144    ) -> Self {
145        if predicate {
146            self.new_sample(new_sample)
147        } else {
148            self
149        }
150    }
151
152    pub fn new_sample_if_some<
153        F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
154    >(
155        self,
156        new_sample: Option<F>,
157    ) -> Self {
158        if let Some(new_sample) = new_sample {
159            self.new_sample(new_sample)
160        } else {
161            self
162        }
163    }
164
165    #[cfg(feature = "v1_20")]
166    #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
167    pub fn new_event<F: FnMut(&AppSink) -> bool + Send + 'static>(self, new_event: F) -> Self {
168        Self {
169            new_event: Some(Box::new(new_event)),
170            ..self
171        }
172    }
173
174    #[cfg(feature = "v1_20")]
175    #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
176    pub fn new_event_if<F: FnMut(&AppSink) -> bool + Send + 'static>(
177        self,
178        new_event: F,
179        predicate: bool,
180    ) -> Self {
181        if predicate {
182            self.new_event(new_event)
183        } else {
184            self
185        }
186    }
187
188    #[cfg(feature = "v1_20")]
189    #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
190    pub fn new_event_if_some<F: FnMut(&AppSink) -> bool + Send + 'static>(
191        self,
192        new_event: Option<F>,
193    ) -> Self {
194        if let Some(new_event) = new_event {
195            self.new_event(new_event)
196        } else {
197            self
198        }
199    }
200
201    #[cfg(feature = "v1_24")]
202    #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
203    pub fn propose_allocation<
204        F: FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static,
205    >(
206        self,
207        propose_allocation: F,
208    ) -> Self {
209        Self {
210            propose_allocation: Some(Box::new(propose_allocation)),
211            ..self
212        }
213    }
214
215    #[cfg(feature = "v1_24")]
216    #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
217    pub fn propose_allocation_if<
218        F: FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static,
219    >(
220        self,
221        propose_allocation: F,
222        predicate: bool,
223    ) -> Self {
224        if predicate {
225            self.propose_allocation(propose_allocation)
226        } else {
227            self
228        }
229    }
230
231    #[cfg(feature = "v1_24")]
232    #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
233    pub fn propose_allocation_if_some<
234        F: FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static,
235    >(
236        self,
237        propose_allocation: Option<F>,
238    ) -> Self {
239        if let Some(propose_allocation) = propose_allocation {
240            self.propose_allocation(propose_allocation)
241        } else {
242            self
243        }
244    }
245
246    #[must_use = "Building the callbacks without using them has no effect"]
247    pub fn build(self) -> AppSinkCallbacks {
248        let have_eos = self.eos.is_some();
249        let have_new_preroll = self.new_preroll.is_some();
250        let have_new_sample = self.new_sample.is_some();
251        let have_new_event = self.new_event.is_some();
252        let have_propose_allocation = self.propose_allocation.is_some();
253
254        AppSinkCallbacks {
255            eos: self.eos,
256            new_preroll: self.new_preroll,
257            new_sample: self.new_sample,
258            new_event: self.new_event,
259            propose_allocation: self.propose_allocation,
260            #[cfg(not(panic = "abort"))]
261            panicked: AtomicBool::new(false),
262            callbacks: ffi::GstAppSinkCallbacks {
263                eos: if have_eos { Some(trampoline_eos) } else { None },
264                new_preroll: if have_new_preroll {
265                    Some(trampoline_new_preroll)
266                } else {
267                    None
268                },
269                new_sample: if have_new_sample {
270                    Some(trampoline_new_sample)
271                } else {
272                    None
273                },
274                new_event: if have_new_event {
275                    Some(trampoline_new_event)
276                } else {
277                    None
278                },
279                propose_allocation: if have_propose_allocation {
280                    Some(trampoline_propose_allocation)
281                } else {
282                    None
283                },
284                _gst_reserved: [ptr::null_mut(), ptr::null_mut()],
285            },
286        }
287    }
288}
289
290unsafe extern "C" fn trampoline_eos(appsink: *mut ffi::GstAppSink, callbacks: gpointer) {
291    unsafe {
292        let callbacks = callbacks as *mut AppSinkCallbacks;
293        let element: Borrowed<AppSink> = from_glib_borrow(appsink);
294
295        #[cfg(not(panic = "abort"))]
296        if (*callbacks).panicked.load(Ordering::Relaxed) {
297            let element: Borrowed<AppSink> = from_glib_borrow(appsink);
298            gst::subclass::post_panic_error_message(
299                element.upcast_ref(),
300                element.upcast_ref(),
301                None,
302            );
303            return;
304        }
305
306        if let Some(ref mut eos) = (*callbacks).eos {
307            let result = panic::catch_unwind(panic::AssertUnwindSafe(|| eos(&element)));
308            match result {
309                Ok(result) => result,
310                Err(err) => {
311                    #[cfg(panic = "abort")]
312                    {
313                        unreachable!("{err:?}");
314                    }
315                    #[cfg(not(panic = "abort"))]
316                    {
317                        (*callbacks).panicked.store(true, Ordering::Relaxed);
318                        gst::subclass::post_panic_error_message(
319                            element.upcast_ref(),
320                            element.upcast_ref(),
321                            Some(err),
322                        );
323                    }
324                }
325            }
326        }
327    }
328}
329
330unsafe extern "C" fn trampoline_new_preroll(
331    appsink: *mut ffi::GstAppSink,
332    callbacks: gpointer,
333) -> gst::ffi::GstFlowReturn {
334    unsafe {
335        let callbacks = callbacks as *mut AppSinkCallbacks;
336        let element: Borrowed<AppSink> = from_glib_borrow(appsink);
337
338        #[cfg(not(panic = "abort"))]
339        if (*callbacks).panicked.load(Ordering::Relaxed) {
340            let element: Borrowed<AppSink> = from_glib_borrow(appsink);
341            gst::subclass::post_panic_error_message(
342                element.upcast_ref(),
343                element.upcast_ref(),
344                None,
345            );
346            return gst::FlowReturn::Error.into_glib();
347        }
348
349        let ret = if let Some(ref mut new_preroll) = (*callbacks).new_preroll {
350            let result =
351                panic::catch_unwind(panic::AssertUnwindSafe(|| new_preroll(&element).into()));
352            match result {
353                Ok(result) => result,
354                Err(err) => {
355                    #[cfg(panic = "abort")]
356                    {
357                        unreachable!("{err:?}");
358                    }
359                    #[cfg(not(panic = "abort"))]
360                    {
361                        (*callbacks).panicked.store(true, Ordering::Relaxed);
362                        gst::subclass::post_panic_error_message(
363                            element.upcast_ref(),
364                            element.upcast_ref(),
365                            Some(err),
366                        );
367
368                        gst::FlowReturn::Error
369                    }
370                }
371            }
372        } else {
373            gst::FlowReturn::Error
374        };
375
376        ret.into_glib()
377    }
378}
379
380unsafe extern "C" fn trampoline_new_sample(
381    appsink: *mut ffi::GstAppSink,
382    callbacks: gpointer,
383) -> gst::ffi::GstFlowReturn {
384    unsafe {
385        let callbacks = callbacks as *mut AppSinkCallbacks;
386        let element: Borrowed<AppSink> = from_glib_borrow(appsink);
387
388        #[cfg(not(panic = "abort"))]
389        if (*callbacks).panicked.load(Ordering::Relaxed) {
390            let element: Borrowed<AppSink> = from_glib_borrow(appsink);
391            gst::subclass::post_panic_error_message(
392                element.upcast_ref(),
393                element.upcast_ref(),
394                None,
395            );
396            return gst::FlowReturn::Error.into_glib();
397        }
398
399        let ret = if let Some(ref mut new_sample) = (*callbacks).new_sample {
400            let result =
401                panic::catch_unwind(panic::AssertUnwindSafe(|| new_sample(&element).into()));
402            match result {
403                Ok(result) => result,
404                Err(err) => {
405                    #[cfg(panic = "abort")]
406                    {
407                        unreachable!("{err:?}");
408                    }
409                    #[cfg(not(panic = "abort"))]
410                    {
411                        (*callbacks).panicked.store(true, Ordering::Relaxed);
412                        gst::subclass::post_panic_error_message(
413                            element.upcast_ref(),
414                            element.upcast_ref(),
415                            Some(err),
416                        );
417
418                        gst::FlowReturn::Error
419                    }
420                }
421            }
422        } else {
423            gst::FlowReturn::Error
424        };
425
426        ret.into_glib()
427    }
428}
429
430unsafe extern "C" fn trampoline_new_event(
431    appsink: *mut ffi::GstAppSink,
432    callbacks: gpointer,
433) -> glib::ffi::gboolean {
434    unsafe {
435        let callbacks = callbacks as *mut AppSinkCallbacks;
436        let element: Borrowed<AppSink> = from_glib_borrow(appsink);
437
438        #[cfg(not(panic = "abort"))]
439        if (*callbacks).panicked.load(Ordering::Relaxed) {
440            let element: Borrowed<AppSink> = from_glib_borrow(appsink);
441            gst::subclass::post_panic_error_message(
442                element.upcast_ref(),
443                element.upcast_ref(),
444                None,
445            );
446            return false.into_glib();
447        }
448
449        let ret = if let Some(ref mut new_event) = (*callbacks).new_event {
450            let result = panic::catch_unwind(panic::AssertUnwindSafe(|| new_event(&element)));
451            match result {
452                Ok(result) => result,
453                Err(err) => {
454                    #[cfg(panic = "abort")]
455                    {
456                        unreachable!("{err:?}");
457                    }
458                    #[cfg(not(panic = "abort"))]
459                    {
460                        (*callbacks).panicked.store(true, Ordering::Relaxed);
461                        gst::subclass::post_panic_error_message(
462                            element.upcast_ref(),
463                            element.upcast_ref(),
464                            Some(err),
465                        );
466
467                        false
468                    }
469                }
470            }
471        } else {
472            false
473        };
474
475        ret.into_glib()
476    }
477}
478
479unsafe extern "C" fn trampoline_propose_allocation(
480    appsink: *mut ffi::GstAppSink,
481    query: *mut gst::ffi::GstQuery,
482    callbacks: gpointer,
483) -> glib::ffi::gboolean {
484    unsafe {
485        let callbacks = callbacks as *mut AppSinkCallbacks;
486        let element: Borrowed<AppSink> = from_glib_borrow(appsink);
487
488        #[cfg(not(panic = "abort"))]
489        if (*callbacks).panicked.load(Ordering::Relaxed) {
490            let element: Borrowed<AppSink> = from_glib_borrow(appsink);
491            gst::subclass::post_panic_error_message(
492                element.upcast_ref(),
493                element.upcast_ref(),
494                None,
495            );
496            return false.into_glib();
497        }
498
499        let ret = if let Some(ref mut propose_allocation) = (*callbacks).propose_allocation {
500            let query = match gst::QueryRef::from_mut_ptr(query).view_mut() {
501                gst::QueryViewMut::Allocation(allocation) => allocation,
502                _ => unreachable!(),
503            };
504            let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
505                propose_allocation(&element, query)
506            }));
507            match result {
508                Ok(result) => result,
509                Err(err) => {
510                    #[cfg(panic = "abort")]
511                    {
512                        unreachable!("{err:?}");
513                    }
514                    #[cfg(not(panic = "abort"))]
515                    {
516                        (*callbacks).panicked.store(true, Ordering::Relaxed);
517                        gst::subclass::post_panic_error_message(
518                            element.upcast_ref(),
519                            element.upcast_ref(),
520                            Some(err),
521                        );
522                        false
523                    }
524                }
525            }
526        } else {
527            false
528        };
529
530        ret.into_glib()
531    }
532}
533
534unsafe extern "C" fn destroy_callbacks(ptr: gpointer) {
535    unsafe {
536        let _ = Box::<AppSinkCallbacks>::from_raw(ptr as *mut _);
537    }
538}
539
540impl AppSink {
541    // rustdoc-stripper-ignore-next
542    /// Creates a new builder-pattern struct instance to construct [`AppSink`] objects.
543    ///
544    /// This method returns an instance of [`AppSinkBuilder`](crate::builders::AppSinkBuilder) which can be used to create [`AppSink`] objects.
545    pub fn builder<'a>() -> AppSinkBuilder<'a> {
546        assert_initialized_main_thread!();
547        AppSinkBuilder {
548            builder: gst::Object::builder(),
549            callbacks: None,
550            drop_out_of_segment: None,
551        }
552    }
553
554    /// Set callbacks which will be executed for each new preroll, new sample and eos.
555    /// This is an alternative to using the signals, it has lower overhead and is thus
556    /// less expensive, but also less flexible.
557    ///
558    /// If callbacks are installed, no signals will be emitted for performance
559    /// reasons.
560    ///
561    /// Before 1.16.3 it was not possible to change the callbacks in a thread-safe
562    /// way.
563    ///
564    /// Note that `gst_app_sink_set_callbacks()` and
565    /// `gst_app_sink_set_simple_callbacks()` are mutually exclusive and setting one
566    /// will unset the other.
567    /// ## `callbacks`
568    /// the callbacks
569    /// ## `notify`
570    /// a destroy notify function
571    #[doc(alias = "gst_app_sink_set_callbacks")]
572    pub fn set_callbacks(&self, callbacks: AppSinkCallbacks) {
573        unsafe {
574            let sink = self.to_glib_none().0;
575
576            #[allow(clippy::manual_dangling_ptr)]
577            #[cfg(not(feature = "v1_18"))]
578            {
579                static SET_ONCE_QUARK: std::sync::OnceLock<glib::Quark> =
580                    std::sync::OnceLock::new();
581
582                let set_once_quark = SET_ONCE_QUARK
583                    .get_or_init(|| glib::Quark::from_str("gstreamer-rs-app-sink-callbacks"));
584
585                // This is not thread-safe before 1.16.3, see
586                // https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/merge_requests/570
587                if gst::version() < (1, 16, 3, 0) {
588                    if !glib::gobject_ffi::g_object_get_qdata(
589                        sink as *mut _,
590                        set_once_quark.into_glib(),
591                    )
592                    .is_null()
593                    {
594                        panic!("AppSink callbacks can only be set once");
595                    }
596
597                    glib::gobject_ffi::g_object_set_qdata(
598                        sink as *mut _,
599                        set_once_quark.into_glib(),
600                        1 as *mut _,
601                    );
602                }
603            }
604
605            ffi::gst_app_sink_set_callbacks(
606                sink,
607                mut_override(&callbacks.callbacks),
608                Box::into_raw(Box::new(callbacks)) as *mut _,
609                Some(destroy_callbacks),
610            );
611        }
612    }
613
614    #[doc(alias = "drop-out-of-segment")]
615    pub fn drops_out_of_segment(&self) -> bool {
616        unsafe {
617            from_glib(gst_base::ffi::gst_base_sink_get_drop_out_of_segment(
618                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
619            ))
620        }
621    }
622
623    #[doc(alias = "max-bitrate")]
624    #[doc(alias = "gst_base_sink_get_max_bitrate")]
625    pub fn max_bitrate(&self) -> u64 {
626        unsafe {
627            gst_base::ffi::gst_base_sink_get_max_bitrate(
628                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
629            )
630        }
631    }
632
633    #[doc(alias = "max-lateness")]
634    #[doc(alias = "gst_base_sink_get_max_lateness")]
635    pub fn max_lateness(&self) -> i64 {
636        unsafe {
637            gst_base::ffi::gst_base_sink_get_max_lateness(
638                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
639            )
640        }
641    }
642
643    #[doc(alias = "processing-deadline")]
644    #[cfg(feature = "v1_16")]
645    #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
646    #[doc(alias = "gst_base_sink_get_processing_deadline")]
647    pub fn processing_deadline(&self) -> gst::ClockTime {
648        unsafe {
649            try_from_glib(gst_base::ffi::gst_base_sink_get_processing_deadline(
650                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
651            ))
652            .expect("undefined processing_deadline")
653        }
654    }
655
656    #[doc(alias = "render-delay")]
657    #[doc(alias = "gst_base_sink_get_render_delay")]
658    pub fn render_delay(&self) -> gst::ClockTime {
659        unsafe {
660            try_from_glib(gst_base::ffi::gst_base_sink_get_render_delay(
661                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
662            ))
663            .expect("undefined render_delay")
664        }
665    }
666
667    #[cfg(feature = "v1_18")]
668    #[cfg_attr(docsrs, doc(cfg(feature = "v1_18")))]
669    #[doc(alias = "gst_base_sink_get_stats")]
670    pub fn stats(&self) -> gst::Structure {
671        unsafe {
672            from_glib_full(gst_base::ffi::gst_base_sink_get_stats(
673                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
674            ))
675        }
676    }
677
678    #[doc(alias = "sync")]
679    pub fn is_sync(&self) -> bool {
680        unsafe {
681            from_glib(gst_base::ffi::gst_base_sink_get_sync(
682                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
683            ))
684        }
685    }
686
687    #[doc(alias = "throttle-time")]
688    #[doc(alias = "gst_base_sink_get_throttle_time")]
689    pub fn throttle_time(&self) -> u64 {
690        unsafe {
691            gst_base::ffi::gst_base_sink_get_throttle_time(
692                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
693            )
694        }
695    }
696
697    #[doc(alias = "ts-offset")]
698    #[doc(alias = "gst_base_sink_get_ts_offset")]
699    pub fn ts_offset(&self) -> gst::ClockTimeDiff {
700        unsafe {
701            gst_base::ffi::gst_base_sink_get_ts_offset(
702                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
703            )
704        }
705    }
706
707    #[doc(alias = "async")]
708    #[doc(alias = "gst_base_sink_is_async_enabled")]
709    pub fn is_async(&self) -> bool {
710        unsafe {
711            from_glib(gst_base::ffi::gst_base_sink_is_async_enabled(
712                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
713            ))
714        }
715    }
716
717    #[doc(alias = "last-sample")]
718    pub fn enables_last_sample(&self) -> bool {
719        unsafe {
720            from_glib(gst_base::ffi::gst_base_sink_is_last_sample_enabled(
721                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
722            ))
723        }
724    }
725
726    #[doc(alias = "qos")]
727    #[doc(alias = "gst_base_sink_is_qos_enabled")]
728    pub fn is_qos(&self) -> bool {
729        unsafe {
730            from_glib(gst_base::ffi::gst_base_sink_is_qos_enabled(
731                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
732            ))
733        }
734    }
735
736    #[doc(alias = "async")]
737    #[doc(alias = "gst_base_sink_set_async_enabled")]
738    pub fn set_async(&self, enabled: bool) {
739        unsafe {
740            gst_base::ffi::gst_base_sink_set_async_enabled(
741                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
742                enabled.into_glib(),
743            );
744        }
745    }
746
747    #[doc(alias = "drop-out-of-segment")]
748    #[doc(alias = "gst_base_sink_set_drop_out_of_segment")]
749    pub fn set_drop_out_of_segment(&self, drop_out_of_segment: bool) {
750        unsafe {
751            gst_base::ffi::gst_base_sink_set_drop_out_of_segment(
752                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
753                drop_out_of_segment.into_glib(),
754            );
755        }
756    }
757
758    #[doc(alias = "last-sample")]
759    pub fn set_enable_last_sample(&self, enabled: bool) {
760        unsafe {
761            gst_base::ffi::gst_base_sink_set_last_sample_enabled(
762                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
763                enabled.into_glib(),
764            );
765        }
766    }
767
768    #[doc(alias = "max-bitrate")]
769    #[doc(alias = "gst_base_sink_set_max_bitrate")]
770    pub fn set_max_bitrate(&self, max_bitrate: u64) {
771        unsafe {
772            gst_base::ffi::gst_base_sink_set_max_bitrate(
773                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
774                max_bitrate,
775            );
776        }
777    }
778
779    #[doc(alias = "max-lateness")]
780    #[doc(alias = "gst_base_sink_set_max_lateness")]
781    pub fn set_max_lateness(&self, max_lateness: i64) {
782        unsafe {
783            gst_base::ffi::gst_base_sink_set_max_lateness(
784                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
785                max_lateness,
786            );
787        }
788    }
789
790    #[doc(alias = "processing-deadline")]
791    #[cfg(feature = "v1_16")]
792    #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
793    #[doc(alias = "gst_base_sink_set_processing_deadline")]
794    pub fn set_processing_deadline(&self, processing_deadline: gst::ClockTime) {
795        unsafe {
796            gst_base::ffi::gst_base_sink_set_processing_deadline(
797                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
798                processing_deadline.into_glib(),
799            );
800        }
801    }
802
803    #[doc(alias = "qos")]
804    #[doc(alias = "gst_base_sink_set_qos_enabled")]
805    pub fn set_qos(&self, enabled: bool) {
806        unsafe {
807            gst_base::ffi::gst_base_sink_set_qos_enabled(
808                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
809                enabled.into_glib(),
810            );
811        }
812    }
813
814    #[doc(alias = "render-delay")]
815    #[doc(alias = "gst_base_sink_set_render_delay")]
816    pub fn set_render_delay(&self, delay: gst::ClockTime) {
817        unsafe {
818            gst_base::ffi::gst_base_sink_set_render_delay(
819                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
820                delay.into_glib(),
821            );
822        }
823    }
824
825    #[doc(alias = "sync")]
826    #[doc(alias = "gst_base_sink_set_sync")]
827    pub fn set_sync(&self, sync: bool) {
828        unsafe {
829            gst_base::ffi::gst_base_sink_set_sync(
830                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
831                sync.into_glib(),
832            );
833        }
834    }
835
836    #[doc(alias = "throttle-time")]
837    #[doc(alias = "gst_base_sink_set_throttle_time")]
838    pub fn set_throttle_time(&self, throttle: u64) {
839        unsafe {
840            gst_base::ffi::gst_base_sink_set_throttle_time(
841                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
842                throttle,
843            );
844        }
845    }
846
847    #[doc(alias = "ts-offset")]
848    #[doc(alias = "gst_base_sink_set_ts_offset")]
849    pub fn set_ts_offset(&self, offset: gst::ClockTimeDiff) {
850        unsafe {
851            gst_base::ffi::gst_base_sink_set_ts_offset(
852                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
853                offset,
854            );
855        }
856    }
857
858    #[doc(alias = "async")]
859    pub fn connect_async_notify<F: Fn(&Self) + Send + Sync + 'static>(
860        &self,
861        f: F,
862    ) -> glib::SignalHandlerId {
863        unsafe extern "C" fn notify_async_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
864            this: *mut ffi::GstAppSink,
865            _param_spec: glib::ffi::gpointer,
866            f: glib::ffi::gpointer,
867        ) {
868            unsafe {
869                let f: &F = &*(f as *const F);
870                f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
871            }
872        }
873        unsafe {
874            let f: Box<F> = Box::new(f);
875            glib::signal::connect_raw(
876                self.as_ptr() as *mut _,
877                b"notify::async\0".as_ptr() as *const _,
878                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
879                    notify_async_trampoline::<F> as *const (),
880                )),
881                Box::into_raw(f),
882            )
883        }
884    }
885
886    #[doc(alias = "blocksize")]
887    pub fn connect_blocksize_notify<F: Fn(&Self) + Send + Sync + 'static>(
888        &self,
889        f: F,
890    ) -> glib::SignalHandlerId {
891        unsafe extern "C" fn notify_blocksize_trampoline<
892            F: Fn(&AppSink) + Send + Sync + 'static,
893        >(
894            this: *mut ffi::GstAppSink,
895            _param_spec: glib::ffi::gpointer,
896            f: glib::ffi::gpointer,
897        ) {
898            unsafe {
899                let f: &F = &*(f as *const F);
900                f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
901            }
902        }
903        unsafe {
904            let f: Box<F> = Box::new(f);
905            glib::signal::connect_raw(
906                self.as_ptr() as *mut _,
907                b"notify::blocksize\0".as_ptr() as *const _,
908                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
909                    notify_blocksize_trampoline::<F> as *const (),
910                )),
911                Box::into_raw(f),
912            )
913        }
914    }
915
916    #[doc(alias = "enable-last-sample")]
917    pub fn connect_enable_last_sample_notify<F: Fn(&Self) + Send + Sync + 'static>(
918        &self,
919        f: F,
920    ) -> glib::SignalHandlerId {
921        unsafe extern "C" fn notify_enable_last_sample_trampoline<
922            F: Fn(&AppSink) + Send + Sync + 'static,
923        >(
924            this: *mut ffi::GstAppSink,
925            _param_spec: glib::ffi::gpointer,
926            f: glib::ffi::gpointer,
927        ) {
928            unsafe {
929                let f: &F = &*(f as *const F);
930                f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
931            }
932        }
933        unsafe {
934            let f: Box<F> = Box::new(f);
935            glib::signal::connect_raw(
936                self.as_ptr() as *mut _,
937                b"notify::enable-last-sample\0".as_ptr() as *const _,
938                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
939                    notify_enable_last_sample_trampoline::<F> as *const (),
940                )),
941                Box::into_raw(f),
942            )
943        }
944    }
945
946    #[doc(alias = "last-sample")]
947    pub fn connect_last_sample_notify<F: Fn(&Self) + Send + Sync + 'static>(
948        &self,
949        f: F,
950    ) -> glib::SignalHandlerId {
951        unsafe extern "C" fn notify_last_sample_trampoline<
952            F: Fn(&AppSink) + Send + Sync + 'static,
953        >(
954            this: *mut ffi::GstAppSink,
955            _param_spec: glib::ffi::gpointer,
956            f: glib::ffi::gpointer,
957        ) {
958            unsafe {
959                let f: &F = &*(f as *const F);
960                f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
961            }
962        }
963        unsafe {
964            let f: Box<F> = Box::new(f);
965            glib::signal::connect_raw(
966                self.as_ptr() as *mut _,
967                b"notify::last-sample\0".as_ptr() as *const _,
968                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
969                    notify_last_sample_trampoline::<F> as *const (),
970                )),
971                Box::into_raw(f),
972            )
973        }
974    }
975
976    #[doc(alias = "max-bitrate")]
977    pub fn connect_max_bitrate_notify<F: Fn(&Self) + Send + Sync + 'static>(
978        &self,
979        f: F,
980    ) -> glib::SignalHandlerId {
981        unsafe extern "C" fn notify_max_bitrate_trampoline<
982            F: Fn(&AppSink) + Send + Sync + 'static,
983        >(
984            this: *mut ffi::GstAppSink,
985            _param_spec: glib::ffi::gpointer,
986            f: glib::ffi::gpointer,
987        ) {
988            unsafe {
989                let f: &F = &*(f as *const F);
990                f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
991            }
992        }
993        unsafe {
994            let f: Box<F> = Box::new(f);
995            glib::signal::connect_raw(
996                self.as_ptr() as *mut _,
997                b"notify::max-bitrate\0".as_ptr() as *const _,
998                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
999                    notify_max_bitrate_trampoline::<F> as *const (),
1000                )),
1001                Box::into_raw(f),
1002            )
1003        }
1004    }
1005
1006    #[doc(alias = "max-lateness")]
1007    pub fn connect_max_lateness_notify<F: Fn(&Self) + Send + Sync + 'static>(
1008        &self,
1009        f: F,
1010    ) -> glib::SignalHandlerId {
1011        unsafe extern "C" fn notify_max_lateness_trampoline<
1012            F: Fn(&AppSink) + Send + Sync + 'static,
1013        >(
1014            this: *mut ffi::GstAppSink,
1015            _param_spec: glib::ffi::gpointer,
1016            f: glib::ffi::gpointer,
1017        ) {
1018            unsafe {
1019                let f: &F = &*(f as *const F);
1020                f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1021            }
1022        }
1023        unsafe {
1024            let f: Box<F> = Box::new(f);
1025            glib::signal::connect_raw(
1026                self.as_ptr() as *mut _,
1027                b"notify::max-lateness\0".as_ptr() as *const _,
1028                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1029                    notify_max_lateness_trampoline::<F> as *const (),
1030                )),
1031                Box::into_raw(f),
1032            )
1033        }
1034    }
1035
1036    #[cfg(feature = "v1_16")]
1037    #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
1038    #[doc(alias = "processing-deadline")]
1039    pub fn connect_processing_deadline_notify<F: Fn(&Self) + Send + Sync + 'static>(
1040        &self,
1041        f: F,
1042    ) -> glib::SignalHandlerId {
1043        unsafe extern "C" fn notify_processing_deadline_trampoline<
1044            F: Fn(&AppSink) + Send + Sync + 'static,
1045        >(
1046            this: *mut ffi::GstAppSink,
1047            _param_spec: glib::ffi::gpointer,
1048            f: glib::ffi::gpointer,
1049        ) {
1050            unsafe {
1051                let f: &F = &*(f as *const F);
1052                f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1053            }
1054        }
1055        unsafe {
1056            let f: Box<F> = Box::new(f);
1057            glib::signal::connect_raw(
1058                self.as_ptr() as *mut _,
1059                b"notify::processing-deadline\0".as_ptr() as *const _,
1060                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1061                    notify_processing_deadline_trampoline::<F> as *const (),
1062                )),
1063                Box::into_raw(f),
1064            )
1065        }
1066    }
1067
1068    #[doc(alias = "qos")]
1069    pub fn connect_qos_notify<F: Fn(&Self) + Send + Sync + 'static>(
1070        &self,
1071        f: F,
1072    ) -> glib::SignalHandlerId {
1073        unsafe extern "C" fn notify_qos_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
1074            this: *mut ffi::GstAppSink,
1075            _param_spec: glib::ffi::gpointer,
1076            f: glib::ffi::gpointer,
1077        ) {
1078            unsafe {
1079                let f: &F = &*(f as *const F);
1080                f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1081            }
1082        }
1083        unsafe {
1084            let f: Box<F> = Box::new(f);
1085            glib::signal::connect_raw(
1086                self.as_ptr() as *mut _,
1087                b"notify::qos\0".as_ptr() as *const _,
1088                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1089                    notify_qos_trampoline::<F> as *const (),
1090                )),
1091                Box::into_raw(f),
1092            )
1093        }
1094    }
1095
1096    #[doc(alias = "render-delay")]
1097    pub fn connect_render_delay_notify<F: Fn(&Self) + Send + Sync + 'static>(
1098        &self,
1099        f: F,
1100    ) -> glib::SignalHandlerId {
1101        unsafe extern "C" fn notify_render_delay_trampoline<
1102            F: Fn(&AppSink) + Send + Sync + 'static,
1103        >(
1104            this: *mut ffi::GstAppSink,
1105            _param_spec: glib::ffi::gpointer,
1106            f: glib::ffi::gpointer,
1107        ) {
1108            unsafe {
1109                let f: &F = &*(f as *const F);
1110                f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1111            }
1112        }
1113        unsafe {
1114            let f: Box<F> = Box::new(f);
1115            glib::signal::connect_raw(
1116                self.as_ptr() as *mut _,
1117                b"notify::render-delay\0".as_ptr() as *const _,
1118                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1119                    notify_render_delay_trampoline::<F> as *const (),
1120                )),
1121                Box::into_raw(f),
1122            )
1123        }
1124    }
1125
1126    #[cfg(feature = "v1_18")]
1127    #[cfg_attr(docsrs, doc(cfg(feature = "v1_18")))]
1128    #[doc(alias = "stats")]
1129    pub fn connect_stats_notify<F: Fn(&Self) + Send + Sync + 'static>(
1130        &self,
1131        f: F,
1132    ) -> glib::SignalHandlerId {
1133        unsafe extern "C" fn notify_stats_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
1134            this: *mut ffi::GstAppSink,
1135            _param_spec: glib::ffi::gpointer,
1136            f: glib::ffi::gpointer,
1137        ) {
1138            unsafe {
1139                let f: &F = &*(f as *const F);
1140                f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1141            }
1142        }
1143        unsafe {
1144            let f: Box<F> = Box::new(f);
1145            glib::signal::connect_raw(
1146                self.as_ptr() as *mut _,
1147                b"notify::stats\0".as_ptr() as *const _,
1148                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1149                    notify_stats_trampoline::<F> as *const (),
1150                )),
1151                Box::into_raw(f),
1152            )
1153        }
1154    }
1155
1156    #[doc(alias = "sync")]
1157    pub fn connect_sync_notify<F: Fn(&Self) + Send + Sync + 'static>(
1158        &self,
1159        f: F,
1160    ) -> glib::SignalHandlerId {
1161        unsafe extern "C" fn notify_sync_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
1162            this: *mut ffi::GstAppSink,
1163            _param_spec: glib::ffi::gpointer,
1164            f: glib::ffi::gpointer,
1165        ) {
1166            unsafe {
1167                let f: &F = &*(f as *const F);
1168                f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1169            }
1170        }
1171        unsafe {
1172            let f: Box<F> = Box::new(f);
1173            glib::signal::connect_raw(
1174                self.as_ptr() as *mut _,
1175                b"notify::sync\0".as_ptr() as *const _,
1176                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1177                    notify_sync_trampoline::<F> as *const (),
1178                )),
1179                Box::into_raw(f),
1180            )
1181        }
1182    }
1183
1184    #[doc(alias = "throttle-time")]
1185    pub fn connect_throttle_time_notify<F: Fn(&Self) + Send + Sync + 'static>(
1186        &self,
1187        f: F,
1188    ) -> glib::SignalHandlerId {
1189        unsafe extern "C" fn notify_throttle_time_trampoline<
1190            F: Fn(&AppSink) + Send + Sync + 'static,
1191        >(
1192            this: *mut ffi::GstAppSink,
1193            _param_spec: glib::ffi::gpointer,
1194            f: glib::ffi::gpointer,
1195        ) {
1196            unsafe {
1197                let f: &F = &*(f as *const F);
1198                f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1199            }
1200        }
1201        unsafe {
1202            let f: Box<F> = Box::new(f);
1203            glib::signal::connect_raw(
1204                self.as_ptr() as *mut _,
1205                b"notify::throttle-time\0".as_ptr() as *const _,
1206                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1207                    notify_throttle_time_trampoline::<F> as *const (),
1208                )),
1209                Box::into_raw(f),
1210            )
1211        }
1212    }
1213
1214    #[doc(alias = "ts-offset")]
1215    pub fn connect_ts_offset_notify<F: Fn(&Self) + Send + Sync + 'static>(
1216        &self,
1217        f: F,
1218    ) -> glib::SignalHandlerId {
1219        unsafe extern "C" fn notify_ts_offset_trampoline<
1220            F: Fn(&AppSink) + Send + Sync + 'static,
1221        >(
1222            this: *mut ffi::GstAppSink,
1223            _param_spec: glib::ffi::gpointer,
1224            f: glib::ffi::gpointer,
1225        ) {
1226            unsafe {
1227                let f: &F = &*(f as *const F);
1228                f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1229            }
1230        }
1231        unsafe {
1232            let f: Box<F> = Box::new(f);
1233            glib::signal::connect_raw(
1234                self.as_ptr() as *mut _,
1235                b"notify::ts-offset\0".as_ptr() as *const _,
1236                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1237                    notify_ts_offset_trampoline::<F> as *const (),
1238                )),
1239                Box::into_raw(f),
1240            )
1241        }
1242    }
1243
1244    pub fn stream(&self) -> AppSinkStream {
1245        AppSinkStream::new(self)
1246    }
1247}
1248
1249// rustdoc-stripper-ignore-next
1250/// A [builder-pattern] type to construct [`AppSink`] objects.
1251///
1252/// [builder-pattern]: https://doc.rust-lang.org/1.0.0/style/ownership/builders.html
1253#[must_use = "The builder must be built to be used"]
1254pub struct AppSinkBuilder<'a> {
1255    builder: gst::gobject::GObjectBuilder<'a, AppSink>,
1256    callbacks: Option<AppSinkCallbacks>,
1257    drop_out_of_segment: Option<bool>,
1258}
1259
1260impl<'a> AppSinkBuilder<'a> {
1261    // rustdoc-stripper-ignore-next
1262    /// Build the [`AppSink`].
1263    ///
1264    /// # Panics
1265    ///
1266    /// This panics if the [`AppSink`] doesn't have all the given properties or
1267    /// property values of the wrong type are provided.
1268    #[must_use = "Building the object from the builder is usually expensive and is not expected to have side effects"]
1269    pub fn build(self) -> AppSink {
1270        let appsink = self.builder.build().unwrap();
1271
1272        if let Some(callbacks) = self.callbacks {
1273            appsink.set_callbacks(callbacks);
1274        }
1275
1276        if let Some(drop_out_of_segment) = self.drop_out_of_segment {
1277            appsink.set_drop_out_of_segment(drop_out_of_segment);
1278        }
1279
1280        appsink
1281    }
1282
1283    pub fn async_(self, async_: bool) -> Self {
1284        Self {
1285            builder: self.builder.property("async", async_),
1286            ..self
1287        }
1288    }
1289
1290    pub fn buffer_list(self, buffer_list: bool) -> Self {
1291        Self {
1292            builder: self.builder.property("buffer-list", buffer_list),
1293            ..self
1294        }
1295    }
1296
1297    pub fn callbacks(self, callbacks: AppSinkCallbacks) -> Self {
1298        Self {
1299            callbacks: Some(callbacks),
1300            ..self
1301        }
1302    }
1303
1304    pub fn caps(self, caps: &'a gst::Caps) -> Self {
1305        Self {
1306            builder: self.builder.property("caps", caps),
1307            ..self
1308        }
1309    }
1310
1311    #[cfg_attr(feature = "v1_28", deprecated = "Since 1.28")]
1312    #[allow(deprecated)]
1313    pub fn drop(self, drop: bool) -> Self {
1314        Self {
1315            builder: self.builder.property("drop", drop),
1316            ..self
1317        }
1318    }
1319
1320    pub fn drop_out_of_segment(self, drop_out_of_segment: bool) -> Self {
1321        Self {
1322            drop_out_of_segment: Some(drop_out_of_segment),
1323            ..self
1324        }
1325    }
1326
1327    pub fn enable_last_sample(self, enable_last_sample: bool) -> Self {
1328        Self {
1329            builder: self
1330                .builder
1331                .property("enable-last-sample", enable_last_sample),
1332            ..self
1333        }
1334    }
1335
1336    pub fn max_bitrate(self, max_bitrate: u64) -> Self {
1337        Self {
1338            builder: self.builder.property("max-bitrate", max_bitrate),
1339            ..self
1340        }
1341    }
1342
1343    pub fn max_buffers(self, max_buffers: u32) -> Self {
1344        Self {
1345            builder: self.builder.property("max-buffers", max_buffers),
1346            ..self
1347        }
1348    }
1349
1350    pub fn max_lateness(self, max_lateness: i64) -> Self {
1351        Self {
1352            builder: self.builder.property("max-lateness", max_lateness),
1353            ..self
1354        }
1355    }
1356
1357    #[cfg(feature = "v1_16")]
1358    #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
1359    pub fn processing_deadline(self, processing_deadline: gst::ClockTime) -> Self {
1360        Self {
1361            builder: self
1362                .builder
1363                .property("processing-deadline", processing_deadline),
1364            ..self
1365        }
1366    }
1367
1368    pub fn qos(self, qos: bool) -> Self {
1369        Self {
1370            builder: self.builder.property("qos", qos),
1371            ..self
1372        }
1373    }
1374
1375    pub fn render_delay(self, render_delay: Option<gst::ClockTime>) -> Self {
1376        Self {
1377            builder: self.builder.property("render-delay", render_delay),
1378            ..self
1379        }
1380    }
1381
1382    pub fn sync(self, sync: bool) -> Self {
1383        Self {
1384            builder: self.builder.property("sync", sync),
1385            ..self
1386        }
1387    }
1388
1389    pub fn throttle_time(self, throttle_time: u64) -> Self {
1390        Self {
1391            builder: self.builder.property("throttle-time", throttle_time),
1392            ..self
1393        }
1394    }
1395
1396    pub fn ts_offset(self, ts_offset: gst::ClockTimeDiff) -> Self {
1397        Self {
1398            builder: self.builder.property("ts-offset", ts_offset),
1399            ..self
1400        }
1401    }
1402
1403    pub fn wait_on_eos(self, wait_on_eos: bool) -> Self {
1404        Self {
1405            builder: self.builder.property("wait-on-eos", wait_on_eos),
1406            ..self
1407        }
1408    }
1409
1410    #[cfg(feature = "v1_24")]
1411    #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
1412    pub fn max_time(self, max_time: Option<gst::ClockTime>) -> Self {
1413        Self {
1414            builder: self.builder.property("max-time", max_time),
1415            ..self
1416        }
1417    }
1418
1419    #[cfg(feature = "v1_24")]
1420    #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
1421    pub fn max_bytes(self, max_bytes: u64) -> Self {
1422        Self {
1423            builder: self.builder.property("max-bytes", max_bytes),
1424            ..self
1425        }
1426    }
1427
1428    #[cfg(feature = "v1_28")]
1429    #[cfg_attr(docsrs, doc(cfg(feature = "v1_28")))]
1430    pub fn leaky_type(self, leaky_type: crate::AppLeakyType) -> Self {
1431        Self {
1432            builder: self.builder.property("leaky-type", leaky_type),
1433            ..self
1434        }
1435    }
1436
1437    #[cfg(feature = "v1_28")]
1438    #[cfg_attr(docsrs, doc(cfg(feature = "v1_28")))]
1439    pub fn silent(self, silent: bool) -> Self {
1440        Self {
1441            builder: self.builder.property("silent", silent),
1442            ..self
1443        }
1444    }
1445
1446    // rustdoc-stripper-ignore-next
1447    /// Sets property `name` to the given value `value`.
1448    ///
1449    /// Overrides any default or previously defined value for `name`.
1450    #[inline]
1451    pub fn property(self, name: &'a str, value: impl Into<glib::Value> + 'a) -> Self {
1452        Self {
1453            builder: self.builder.property(name, value),
1454            ..self
1455        }
1456    }
1457
1458    // rustdoc-stripper-ignore-next
1459    /// Sets property `name` to the given string value `value`.
1460    #[inline]
1461    pub fn property_from_str(self, name: &'a str, value: &'a str) -> Self {
1462        Self {
1463            builder: self.builder.property_from_str(name, value),
1464            ..self
1465        }
1466    }
1467
1468    gst::impl_builder_gvalue_extra_setters!(property_and_name);
1469}
1470
1471#[derive(Debug)]
1472pub struct AppSinkStream {
1473    app_sink: glib::WeakRef<AppSink>,
1474    waker_reference: Arc<Mutex<Option<Waker>>>,
1475}
1476
1477impl AppSinkStream {
1478    fn new(app_sink: &AppSink) -> Self {
1479        skip_assert_initialized!();
1480
1481        let waker_reference = Arc::new(Mutex::new(None as Option<Waker>));
1482
1483        app_sink.set_callbacks(
1484            AppSinkCallbacks::builder()
1485                .new_sample({
1486                    let waker_reference = Arc::clone(&waker_reference);
1487
1488                    move |_| {
1489                        if let Some(waker) = waker_reference.lock().unwrap().take() {
1490                            waker.wake();
1491                        }
1492
1493                        Ok(gst::FlowSuccess::Ok)
1494                    }
1495                })
1496                .eos({
1497                    let waker_reference = Arc::clone(&waker_reference);
1498
1499                    move |_| {
1500                        if let Some(waker) = waker_reference.lock().unwrap().take() {
1501                            waker.wake();
1502                        }
1503                    }
1504                })
1505                .build(),
1506        );
1507
1508        Self {
1509            app_sink: app_sink.downgrade(),
1510            waker_reference,
1511        }
1512    }
1513}
1514
1515impl Drop for AppSinkStream {
1516    fn drop(&mut self) {
1517        #[cfg(not(feature = "v1_18"))]
1518        {
1519            // This is not thread-safe before 1.16.3, see
1520            // https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/merge_requests/570
1521            if gst::version() >= (1, 16, 3, 0)
1522                && let Some(app_sink) = self.app_sink.upgrade()
1523            {
1524                app_sink.set_callbacks(AppSinkCallbacks::builder().build());
1525            }
1526        }
1527    }
1528}
1529
1530impl Stream for AppSinkStream {
1531    type Item = gst::Sample;
1532
1533    fn poll_next(self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>> {
1534        let mut waker = self.waker_reference.lock().unwrap();
1535
1536        let Some(app_sink) = self.app_sink.upgrade() else {
1537            return Poll::Ready(None);
1538        };
1539
1540        app_sink
1541            .try_pull_sample(gst::ClockTime::ZERO)
1542            .map(|sample| Poll::Ready(Some(sample)))
1543            .unwrap_or_else(|| {
1544                if app_sink.is_eos() {
1545                    return Poll::Ready(None);
1546                }
1547
1548                waker.replace(context.waker().to_owned());
1549
1550                Poll::Pending
1551            })
1552    }
1553}
1554
#[cfg(test)]
mod tests {
    use futures_util::StreamExt;
    use gst::prelude::*;

    use super::*;

    // Runs `videotestsrc num-buffers=5 ! appsink` and checks that the stream
    // yields exactly five samples before it terminates.
    #[test]
    fn test_app_sink_stream() {
        gst::init().unwrap();

        let source = gst::ElementFactory::make("videotestsrc")
            .property("num-buffers", 5)
            .build()
            .unwrap();
        let sink = gst::ElementFactory::make("appsink").build().unwrap();

        let pipeline = gst::Pipeline::new();
        for element in [&source, &sink] {
            pipeline.add(element).unwrap();
        }

        source.link(&sink).unwrap();

        // The stream is created before the pipeline starts.
        let collect_samples = sink
            .dynamic_cast::<AppSink>()
            .unwrap()
            .stream()
            .collect::<Vec<gst::Sample>>();

        pipeline.set_state(gst::State::Playing).unwrap();
        let samples = futures_executor::block_on(collect_samples);
        pipeline.set_state(gst::State::Null).unwrap();

        assert_eq!(samples.len(), 5);
    }
}