gstreamer_app/
app_sink.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{
4    mem, panic,
5    pin::Pin,
6    ptr,
7    sync::{Arc, Mutex},
8    task::{Context, Poll, Waker},
9};
10
11#[cfg(not(panic = "abort"))]
12use std::sync::atomic::{AtomicBool, Ordering};
13
14use futures_core::Stream;
15use glib::{ffi::gpointer, prelude::*, translate::*};
16
17use crate::{ffi, AppSink};
18
19#[allow(clippy::type_complexity)]
20pub struct AppSinkCallbacks {
21    eos: Option<Box<dyn FnMut(&AppSink) + Send + 'static>>,
22    new_preroll: Option<
23        Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
24    >,
25    new_sample: Option<
26        Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
27    >,
28    new_event: Option<Box<dyn FnMut(&AppSink) -> bool + Send + 'static>>,
29    propose_allocation:
30        Option<Box<dyn FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static>>,
31    #[cfg(not(panic = "abort"))]
32    panicked: AtomicBool,
33    callbacks: ffi::GstAppSinkCallbacks,
34}
35
36unsafe impl Send for AppSinkCallbacks {}
37unsafe impl Sync for AppSinkCallbacks {}
38
39impl AppSinkCallbacks {
40    pub fn builder() -> AppSinkCallbacksBuilder {
41        skip_assert_initialized!();
42        AppSinkCallbacksBuilder {
43            eos: None,
44            new_preroll: None,
45            new_sample: None,
46            new_event: None,
47            propose_allocation: None,
48        }
49    }
50}
51
52#[allow(clippy::type_complexity)]
53#[must_use = "The builder must be built to be used"]
54pub struct AppSinkCallbacksBuilder {
55    eos: Option<Box<dyn FnMut(&AppSink) + Send + 'static>>,
56    new_preroll: Option<
57        Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
58    >,
59    new_sample: Option<
60        Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
61    >,
62    new_event: Option<Box<dyn FnMut(&AppSink) -> bool + Send + 'static>>,
63    propose_allocation:
64        Option<Box<dyn FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static>>,
65}
66
67impl AppSinkCallbacksBuilder {
68    pub fn eos<F: FnMut(&AppSink) + Send + 'static>(self, eos: F) -> Self {
69        Self {
70            eos: Some(Box::new(eos)),
71            ..self
72        }
73    }
74
75    pub fn eos_if<F: FnMut(&AppSink) + Send + 'static>(self, eos: F, predicate: bool) -> Self {
76        if predicate {
77            self.eos(eos)
78        } else {
79            self
80        }
81    }
82
83    pub fn eos_if_some<F: FnMut(&AppSink) + Send + 'static>(self, eos: Option<F>) -> Self {
84        if let Some(eos) = eos {
85            self.eos(eos)
86        } else {
87            self
88        }
89    }
90
91    pub fn new_preroll<
92        F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
93    >(
94        self,
95        new_preroll: F,
96    ) -> Self {
97        Self {
98            new_preroll: Some(Box::new(new_preroll)),
99            ..self
100        }
101    }
102
103    pub fn new_preroll_if<
104        F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
105    >(
106        self,
107        new_preroll: F,
108        predicate: bool,
109    ) -> Self {
110        if predicate {
111            self.new_preroll(new_preroll)
112        } else {
113            self
114        }
115    }
116
117    pub fn new_preroll_if_some<
118        F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
119    >(
120        self,
121        new_preroll: Option<F>,
122    ) -> Self {
123        if let Some(new_preroll) = new_preroll {
124            self.new_preroll(new_preroll)
125        } else {
126            self
127        }
128    }
129
130    pub fn new_sample<
131        F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
132    >(
133        self,
134        new_sample: F,
135    ) -> Self {
136        Self {
137            new_sample: Some(Box::new(new_sample)),
138            ..self
139        }
140    }
141
142    pub fn new_sample_if<
143        F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
144    >(
145        self,
146        new_sample: F,
147        predicate: bool,
148    ) -> Self {
149        if predicate {
150            self.new_sample(new_sample)
151        } else {
152            self
153        }
154    }
155
156    pub fn new_sample_if_some<
157        F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
158    >(
159        self,
160        new_sample: Option<F>,
161    ) -> Self {
162        if let Some(new_sample) = new_sample {
163            self.new_sample(new_sample)
164        } else {
165            self
166        }
167    }
168
169    #[cfg(feature = "v1_20")]
170    #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
171    pub fn new_event<F: FnMut(&AppSink) -> bool + Send + 'static>(self, new_event: F) -> Self {
172        Self {
173            new_event: Some(Box::new(new_event)),
174            ..self
175        }
176    }
177
178    #[cfg(feature = "v1_20")]
179    #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
180    pub fn new_event_if<F: FnMut(&AppSink) -> bool + Send + 'static>(
181        self,
182        new_event: F,
183        predicate: bool,
184    ) -> Self {
185        if predicate {
186            self.new_event(new_event)
187        } else {
188            self
189        }
190    }
191
192    #[cfg(feature = "v1_20")]
193    #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
194    pub fn new_event_if_some<F: FnMut(&AppSink) -> bool + Send + 'static>(
195        self,
196        new_event: Option<F>,
197    ) -> Self {
198        if let Some(new_event) = new_event {
199            self.new_event(new_event)
200        } else {
201            self
202        }
203    }
204
205    #[cfg(feature = "v1_24")]
206    #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
207    pub fn propose_allocation<
208        F: FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static,
209    >(
210        self,
211        propose_allocation: F,
212    ) -> Self {
213        Self {
214            propose_allocation: Some(Box::new(propose_allocation)),
215            ..self
216        }
217    }
218
219    #[cfg(feature = "v1_24")]
220    #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
221    pub fn propose_allocation_if<
222        F: FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static,
223    >(
224        self,
225        propose_allocation: F,
226        predicate: bool,
227    ) -> Self {
228        if predicate {
229            self.propose_allocation(propose_allocation)
230        } else {
231            self
232        }
233    }
234
235    #[cfg(feature = "v1_24")]
236    #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
237    pub fn propose_allocation_if_some<
238        F: FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static,
239    >(
240        self,
241        propose_allocation: Option<F>,
242    ) -> Self {
243        if let Some(propose_allocation) = propose_allocation {
244            self.propose_allocation(propose_allocation)
245        } else {
246            self
247        }
248    }
249
250    #[must_use = "Building the callbacks without using them has no effect"]
251    pub fn build(self) -> AppSinkCallbacks {
252        let have_eos = self.eos.is_some();
253        let have_new_preroll = self.new_preroll.is_some();
254        let have_new_sample = self.new_sample.is_some();
255        let have_new_event = self.new_event.is_some();
256        let have_propose_allocation = self.propose_allocation.is_some();
257
258        AppSinkCallbacks {
259            eos: self.eos,
260            new_preroll: self.new_preroll,
261            new_sample: self.new_sample,
262            new_event: self.new_event,
263            propose_allocation: self.propose_allocation,
264            #[cfg(not(panic = "abort"))]
265            panicked: AtomicBool::new(false),
266            callbacks: ffi::GstAppSinkCallbacks {
267                eos: if have_eos { Some(trampoline_eos) } else { None },
268                new_preroll: if have_new_preroll {
269                    Some(trampoline_new_preroll)
270                } else {
271                    None
272                },
273                new_sample: if have_new_sample {
274                    Some(trampoline_new_sample)
275                } else {
276                    None
277                },
278                new_event: if have_new_event {
279                    Some(trampoline_new_event)
280                } else {
281                    None
282                },
283                propose_allocation: if have_propose_allocation {
284                    Some(trampoline_propose_allocation)
285                } else {
286                    None
287                },
288                _gst_reserved: [ptr::null_mut(), ptr::null_mut()],
289            },
290        }
291    }
292}
293
294unsafe extern "C" fn trampoline_eos(appsink: *mut ffi::GstAppSink, callbacks: gpointer) {
295    let callbacks = callbacks as *mut AppSinkCallbacks;
296    let element: Borrowed<AppSink> = from_glib_borrow(appsink);
297
298    #[cfg(not(panic = "abort"))]
299    if (*callbacks).panicked.load(Ordering::Relaxed) {
300        let element: Borrowed<AppSink> = from_glib_borrow(appsink);
301        gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
302        return;
303    }
304
305    if let Some(ref mut eos) = (*callbacks).eos {
306        let result = panic::catch_unwind(panic::AssertUnwindSafe(|| eos(&element)));
307        match result {
308            Ok(result) => result,
309            Err(err) => {
310                #[cfg(panic = "abort")]
311                {
312                    unreachable!("{err:?}");
313                }
314                #[cfg(not(panic = "abort"))]
315                {
316                    (*callbacks).panicked.store(true, Ordering::Relaxed);
317                    gst::subclass::post_panic_error_message(
318                        element.upcast_ref(),
319                        element.upcast_ref(),
320                        Some(err),
321                    );
322                }
323            }
324        }
325    }
326}
327
328unsafe extern "C" fn trampoline_new_preroll(
329    appsink: *mut ffi::GstAppSink,
330    callbacks: gpointer,
331) -> gst::ffi::GstFlowReturn {
332    let callbacks = callbacks as *mut AppSinkCallbacks;
333    let element: Borrowed<AppSink> = from_glib_borrow(appsink);
334
335    #[cfg(not(panic = "abort"))]
336    if (*callbacks).panicked.load(Ordering::Relaxed) {
337        let element: Borrowed<AppSink> = from_glib_borrow(appsink);
338        gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
339        return gst::FlowReturn::Error.into_glib();
340    }
341
342    let ret = if let Some(ref mut new_preroll) = (*callbacks).new_preroll {
343        let result = panic::catch_unwind(panic::AssertUnwindSafe(|| new_preroll(&element).into()));
344        match result {
345            Ok(result) => result,
346            Err(err) => {
347                #[cfg(panic = "abort")]
348                {
349                    unreachable!("{err:?}");
350                }
351                #[cfg(not(panic = "abort"))]
352                {
353                    (*callbacks).panicked.store(true, Ordering::Relaxed);
354                    gst::subclass::post_panic_error_message(
355                        element.upcast_ref(),
356                        element.upcast_ref(),
357                        Some(err),
358                    );
359
360                    gst::FlowReturn::Error
361                }
362            }
363        }
364    } else {
365        gst::FlowReturn::Error
366    };
367
368    ret.into_glib()
369}
370
371unsafe extern "C" fn trampoline_new_sample(
372    appsink: *mut ffi::GstAppSink,
373    callbacks: gpointer,
374) -> gst::ffi::GstFlowReturn {
375    let callbacks = callbacks as *mut AppSinkCallbacks;
376    let element: Borrowed<AppSink> = from_glib_borrow(appsink);
377
378    #[cfg(not(panic = "abort"))]
379    if (*callbacks).panicked.load(Ordering::Relaxed) {
380        let element: Borrowed<AppSink> = from_glib_borrow(appsink);
381        gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
382        return gst::FlowReturn::Error.into_glib();
383    }
384
385    let ret = if let Some(ref mut new_sample) = (*callbacks).new_sample {
386        let result = panic::catch_unwind(panic::AssertUnwindSafe(|| new_sample(&element).into()));
387        match result {
388            Ok(result) => result,
389            Err(err) => {
390                #[cfg(panic = "abort")]
391                {
392                    unreachable!("{err:?}");
393                }
394                #[cfg(not(panic = "abort"))]
395                {
396                    (*callbacks).panicked.store(true, Ordering::Relaxed);
397                    gst::subclass::post_panic_error_message(
398                        element.upcast_ref(),
399                        element.upcast_ref(),
400                        Some(err),
401                    );
402
403                    gst::FlowReturn::Error
404                }
405            }
406        }
407    } else {
408        gst::FlowReturn::Error
409    };
410
411    ret.into_glib()
412}
413
414unsafe extern "C" fn trampoline_new_event(
415    appsink: *mut ffi::GstAppSink,
416    callbacks: gpointer,
417) -> glib::ffi::gboolean {
418    let callbacks = callbacks as *mut AppSinkCallbacks;
419    let element: Borrowed<AppSink> = from_glib_borrow(appsink);
420
421    #[cfg(not(panic = "abort"))]
422    if (*callbacks).panicked.load(Ordering::Relaxed) {
423        let element: Borrowed<AppSink> = from_glib_borrow(appsink);
424        gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
425        return false.into_glib();
426    }
427
428    let ret = if let Some(ref mut new_event) = (*callbacks).new_event {
429        let result = panic::catch_unwind(panic::AssertUnwindSafe(|| new_event(&element)));
430        match result {
431            Ok(result) => result,
432            Err(err) => {
433                #[cfg(panic = "abort")]
434                {
435                    unreachable!("{err:?}");
436                }
437                #[cfg(not(panic = "abort"))]
438                {
439                    (*callbacks).panicked.store(true, Ordering::Relaxed);
440                    gst::subclass::post_panic_error_message(
441                        element.upcast_ref(),
442                        element.upcast_ref(),
443                        Some(err),
444                    );
445
446                    false
447                }
448            }
449        }
450    } else {
451        false
452    };
453
454    ret.into_glib()
455}
456
457unsafe extern "C" fn trampoline_propose_allocation(
458    appsink: *mut ffi::GstAppSink,
459    query: *mut gst::ffi::GstQuery,
460    callbacks: gpointer,
461) -> glib::ffi::gboolean {
462    let callbacks = callbacks as *mut AppSinkCallbacks;
463    let element: Borrowed<AppSink> = from_glib_borrow(appsink);
464
465    #[cfg(not(panic = "abort"))]
466    if (*callbacks).panicked.load(Ordering::Relaxed) {
467        let element: Borrowed<AppSink> = from_glib_borrow(appsink);
468        gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
469        return false.into_glib();
470    }
471
472    let ret = if let Some(ref mut propose_allocation) = (*callbacks).propose_allocation {
473        let query = match gst::QueryRef::from_mut_ptr(query).view_mut() {
474            gst::QueryViewMut::Allocation(allocation) => allocation,
475            _ => unreachable!(),
476        };
477        let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
478            propose_allocation(&element, query)
479        }));
480        match result {
481            Ok(result) => result,
482            Err(err) => {
483                #[cfg(panic = "abort")]
484                {
485                    unreachable!("{err:?}");
486                }
487                #[cfg(not(panic = "abort"))]
488                {
489                    (*callbacks).panicked.store(true, Ordering::Relaxed);
490                    gst::subclass::post_panic_error_message(
491                        element.upcast_ref(),
492                        element.upcast_ref(),
493                        Some(err),
494                    );
495                    false
496                }
497            }
498        }
499    } else {
500        false
501    };
502
503    ret.into_glib()
504}
505
506unsafe extern "C" fn destroy_callbacks(ptr: gpointer) {
507    let _ = Box::<AppSinkCallbacks>::from_raw(ptr as *mut _);
508}
509
510impl AppSink {
511    // rustdoc-stripper-ignore-next
512    /// Creates a new builder-pattern struct instance to construct [`AppSink`] objects.
513    ///
514    /// This method returns an instance of [`AppSinkBuilder`](crate::builders::AppSinkBuilder) which can be used to create [`AppSink`] objects.
515    pub fn builder<'a>() -> AppSinkBuilder<'a> {
516        assert_initialized_main_thread!();
517        AppSinkBuilder {
518            builder: gst::Object::builder(),
519            callbacks: None,
520            drop_out_of_segment: None,
521        }
522    }
523
524    /// Set callbacks which will be executed for each new preroll, new sample and eos.
525    /// This is an alternative to using the signals, it has lower overhead and is thus
526    /// less expensive, but also less flexible.
527    ///
528    /// If callbacks are installed, no signals will be emitted for performance
529    /// reasons.
530    ///
531    /// Before 1.16.3 it was not possible to change the callbacks in a thread-safe
532    /// way.
533    /// ## `callbacks`
534    /// the callbacks
535    /// ## `notify`
536    /// a destroy notify function
537    #[doc(alias = "gst_app_sink_set_callbacks")]
538    pub fn set_callbacks(&self, callbacks: AppSinkCallbacks) {
539        unsafe {
540            let sink = self.to_glib_none().0;
541
542            #[cfg(not(feature = "v1_18"))]
543            {
544                static SET_ONCE_QUARK: std::sync::OnceLock<glib::Quark> =
545                    std::sync::OnceLock::new();
546
547                let set_once_quark = SET_ONCE_QUARK
548                    .get_or_init(|| glib::Quark::from_str("gstreamer-rs-app-sink-callbacks"));
549
550                // This is not thread-safe before 1.16.3, see
551                // https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/merge_requests/570
552                if gst::version() < (1, 16, 3, 0) {
553                    if !glib::gobject_ffi::g_object_get_qdata(
554                        sink as *mut _,
555                        set_once_quark.into_glib(),
556                    )
557                    .is_null()
558                    {
559                        panic!("AppSink callbacks can only be set once");
560                    }
561
562                    glib::gobject_ffi::g_object_set_qdata(
563                        sink as *mut _,
564                        set_once_quark.into_glib(),
565                        1 as *mut _,
566                    );
567                }
568            }
569
570            ffi::gst_app_sink_set_callbacks(
571                sink,
572                mut_override(&callbacks.callbacks),
573                Box::into_raw(Box::new(callbacks)) as *mut _,
574                Some(destroy_callbacks),
575            );
576        }
577    }
578
579    #[doc(alias = "drop-out-of-segment")]
580    pub fn drops_out_of_segment(&self) -> bool {
581        unsafe {
582            from_glib(gst_base::ffi::gst_base_sink_get_drop_out_of_segment(
583                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
584            ))
585        }
586    }
587
588    #[doc(alias = "max-bitrate")]
589    #[doc(alias = "gst_base_sink_get_max_bitrate")]
590    pub fn max_bitrate(&self) -> u64 {
591        unsafe {
592            gst_base::ffi::gst_base_sink_get_max_bitrate(
593                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
594            )
595        }
596    }
597
598    #[doc(alias = "max-lateness")]
599    #[doc(alias = "gst_base_sink_get_max_lateness")]
600    pub fn max_lateness(&self) -> i64 {
601        unsafe {
602            gst_base::ffi::gst_base_sink_get_max_lateness(
603                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
604            )
605        }
606    }
607
608    #[doc(alias = "processing-deadline")]
609    #[cfg(feature = "v1_16")]
610    #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
611    #[doc(alias = "gst_base_sink_get_processing_deadline")]
612    pub fn processing_deadline(&self) -> gst::ClockTime {
613        unsafe {
614            try_from_glib(gst_base::ffi::gst_base_sink_get_processing_deadline(
615                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
616            ))
617            .expect("undefined processing_deadline")
618        }
619    }
620
621    #[doc(alias = "render-delay")]
622    #[doc(alias = "gst_base_sink_get_render_delay")]
623    pub fn render_delay(&self) -> gst::ClockTime {
624        unsafe {
625            try_from_glib(gst_base::ffi::gst_base_sink_get_render_delay(
626                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
627            ))
628            .expect("undefined render_delay")
629        }
630    }
631
632    #[cfg(feature = "v1_18")]
633    #[cfg_attr(docsrs, doc(cfg(feature = "v1_18")))]
634    #[doc(alias = "gst_base_sink_get_stats")]
635    pub fn stats(&self) -> gst::Structure {
636        unsafe {
637            from_glib_full(gst_base::ffi::gst_base_sink_get_stats(
638                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
639            ))
640        }
641    }
642
643    #[doc(alias = "sync")]
644    pub fn is_sync(&self) -> bool {
645        unsafe {
646            from_glib(gst_base::ffi::gst_base_sink_get_sync(
647                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
648            ))
649        }
650    }
651
652    #[doc(alias = "throttle-time")]
653    #[doc(alias = "gst_base_sink_get_throttle_time")]
654    pub fn throttle_time(&self) -> u64 {
655        unsafe {
656            gst_base::ffi::gst_base_sink_get_throttle_time(
657                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
658            )
659        }
660    }
661
662    #[doc(alias = "ts-offset")]
663    #[doc(alias = "gst_base_sink_get_ts_offset")]
664    pub fn ts_offset(&self) -> gst::ClockTimeDiff {
665        unsafe {
666            gst_base::ffi::gst_base_sink_get_ts_offset(
667                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
668            )
669        }
670    }
671
672    #[doc(alias = "async")]
673    #[doc(alias = "gst_base_sink_is_async_enabled")]
674    pub fn is_async(&self) -> bool {
675        unsafe {
676            from_glib(gst_base::ffi::gst_base_sink_is_async_enabled(
677                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
678            ))
679        }
680    }
681
682    #[doc(alias = "last-sample")]
683    pub fn enables_last_sample(&self) -> bool {
684        unsafe {
685            from_glib(gst_base::ffi::gst_base_sink_is_last_sample_enabled(
686                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
687            ))
688        }
689    }
690
691    #[doc(alias = "qos")]
692    #[doc(alias = "gst_base_sink_is_qos_enabled")]
693    pub fn is_qos(&self) -> bool {
694        unsafe {
695            from_glib(gst_base::ffi::gst_base_sink_is_qos_enabled(
696                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
697            ))
698        }
699    }
700
701    #[doc(alias = "async")]
702    #[doc(alias = "gst_base_sink_set_async_enabled")]
703    pub fn set_async(&self, enabled: bool) {
704        unsafe {
705            gst_base::ffi::gst_base_sink_set_async_enabled(
706                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
707                enabled.into_glib(),
708            );
709        }
710    }
711
712    #[doc(alias = "drop-out-of-segment")]
713    #[doc(alias = "gst_base_sink_set_drop_out_of_segment")]
714    pub fn set_drop_out_of_segment(&self, drop_out_of_segment: bool) {
715        unsafe {
716            gst_base::ffi::gst_base_sink_set_drop_out_of_segment(
717                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
718                drop_out_of_segment.into_glib(),
719            );
720        }
721    }
722
723    #[doc(alias = "last-sample")]
724    pub fn set_enable_last_sample(&self, enabled: bool) {
725        unsafe {
726            gst_base::ffi::gst_base_sink_set_last_sample_enabled(
727                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
728                enabled.into_glib(),
729            );
730        }
731    }
732
733    #[doc(alias = "max-bitrate")]
734    #[doc(alias = "gst_base_sink_set_max_bitrate")]
735    pub fn set_max_bitrate(&self, max_bitrate: u64) {
736        unsafe {
737            gst_base::ffi::gst_base_sink_set_max_bitrate(
738                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
739                max_bitrate,
740            );
741        }
742    }
743
744    #[doc(alias = "max-lateness")]
745    #[doc(alias = "gst_base_sink_set_max_lateness")]
746    pub fn set_max_lateness(&self, max_lateness: i64) {
747        unsafe {
748            gst_base::ffi::gst_base_sink_set_max_lateness(
749                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
750                max_lateness,
751            );
752        }
753    }
754
755    #[doc(alias = "processing-deadline")]
756    #[cfg(feature = "v1_16")]
757    #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
758    #[doc(alias = "gst_base_sink_set_processing_deadline")]
759    pub fn set_processing_deadline(&self, processing_deadline: gst::ClockTime) {
760        unsafe {
761            gst_base::ffi::gst_base_sink_set_processing_deadline(
762                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
763                processing_deadline.into_glib(),
764            );
765        }
766    }
767
768    #[doc(alias = "qos")]
769    #[doc(alias = "gst_base_sink_set_qos_enabled")]
770    pub fn set_qos(&self, enabled: bool) {
771        unsafe {
772            gst_base::ffi::gst_base_sink_set_qos_enabled(
773                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
774                enabled.into_glib(),
775            );
776        }
777    }
778
779    #[doc(alias = "render-delay")]
780    #[doc(alias = "gst_base_sink_set_render_delay")]
781    pub fn set_render_delay(&self, delay: gst::ClockTime) {
782        unsafe {
783            gst_base::ffi::gst_base_sink_set_render_delay(
784                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
785                delay.into_glib(),
786            );
787        }
788    }
789
790    #[doc(alias = "sync")]
791    #[doc(alias = "gst_base_sink_set_sync")]
792    pub fn set_sync(&self, sync: bool) {
793        unsafe {
794            gst_base::ffi::gst_base_sink_set_sync(
795                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
796                sync.into_glib(),
797            );
798        }
799    }
800
801    #[doc(alias = "throttle-time")]
802    #[doc(alias = "gst_base_sink_set_throttle_time")]
803    pub fn set_throttle_time(&self, throttle: u64) {
804        unsafe {
805            gst_base::ffi::gst_base_sink_set_throttle_time(
806                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
807                throttle,
808            );
809        }
810    }
811
812    #[doc(alias = "ts-offset")]
813    #[doc(alias = "gst_base_sink_set_ts_offset")]
814    pub fn set_ts_offset(&self, offset: gst::ClockTimeDiff) {
815        unsafe {
816            gst_base::ffi::gst_base_sink_set_ts_offset(
817                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
818                offset,
819            );
820        }
821    }
822
823    #[doc(alias = "async")]
824    pub fn connect_async_notify<F: Fn(&Self) + Send + Sync + 'static>(
825        &self,
826        f: F,
827    ) -> glib::SignalHandlerId {
828        unsafe extern "C" fn notify_async_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
829            this: *mut ffi::GstAppSink,
830            _param_spec: glib::ffi::gpointer,
831            f: glib::ffi::gpointer,
832        ) {
833            let f: &F = &*(f as *const F);
834            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
835        }
836        unsafe {
837            let f: Box<F> = Box::new(f);
838            glib::signal::connect_raw(
839                self.as_ptr() as *mut _,
840                b"notify::async\0".as_ptr() as *const _,
841                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
842                    notify_async_trampoline::<F> as *const (),
843                )),
844                Box::into_raw(f),
845            )
846        }
847    }
848
849    #[doc(alias = "blocksize")]
850    pub fn connect_blocksize_notify<F: Fn(&Self) + Send + Sync + 'static>(
851        &self,
852        f: F,
853    ) -> glib::SignalHandlerId {
854        unsafe extern "C" fn notify_blocksize_trampoline<
855            F: Fn(&AppSink) + Send + Sync + 'static,
856        >(
857            this: *mut ffi::GstAppSink,
858            _param_spec: glib::ffi::gpointer,
859            f: glib::ffi::gpointer,
860        ) {
861            let f: &F = &*(f as *const F);
862            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
863        }
864        unsafe {
865            let f: Box<F> = Box::new(f);
866            glib::signal::connect_raw(
867                self.as_ptr() as *mut _,
868                b"notify::blocksize\0".as_ptr() as *const _,
869                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
870                    notify_blocksize_trampoline::<F> as *const (),
871                )),
872                Box::into_raw(f),
873            )
874        }
875    }
876
877    #[doc(alias = "enable-last-sample")]
878    pub fn connect_enable_last_sample_notify<F: Fn(&Self) + Send + Sync + 'static>(
879        &self,
880        f: F,
881    ) -> glib::SignalHandlerId {
882        unsafe extern "C" fn notify_enable_last_sample_trampoline<
883            F: Fn(&AppSink) + Send + Sync + 'static,
884        >(
885            this: *mut ffi::GstAppSink,
886            _param_spec: glib::ffi::gpointer,
887            f: glib::ffi::gpointer,
888        ) {
889            let f: &F = &*(f as *const F);
890            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
891        }
892        unsafe {
893            let f: Box<F> = Box::new(f);
894            glib::signal::connect_raw(
895                self.as_ptr() as *mut _,
896                b"notify::enable-last-sample\0".as_ptr() as *const _,
897                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
898                    notify_enable_last_sample_trampoline::<F> as *const (),
899                )),
900                Box::into_raw(f),
901            )
902        }
903    }
904
905    #[doc(alias = "last-sample")]
906    pub fn connect_last_sample_notify<F: Fn(&Self) + Send + Sync + 'static>(
907        &self,
908        f: F,
909    ) -> glib::SignalHandlerId {
910        unsafe extern "C" fn notify_last_sample_trampoline<
911            F: Fn(&AppSink) + Send + Sync + 'static,
912        >(
913            this: *mut ffi::GstAppSink,
914            _param_spec: glib::ffi::gpointer,
915            f: glib::ffi::gpointer,
916        ) {
917            let f: &F = &*(f as *const F);
918            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
919        }
920        unsafe {
921            let f: Box<F> = Box::new(f);
922            glib::signal::connect_raw(
923                self.as_ptr() as *mut _,
924                b"notify::last-sample\0".as_ptr() as *const _,
925                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
926                    notify_last_sample_trampoline::<F> as *const (),
927                )),
928                Box::into_raw(f),
929            )
930        }
931    }
932
933    #[doc(alias = "max-bitrate")]
934    pub fn connect_max_bitrate_notify<F: Fn(&Self) + Send + Sync + 'static>(
935        &self,
936        f: F,
937    ) -> glib::SignalHandlerId {
938        unsafe extern "C" fn notify_max_bitrate_trampoline<
939            F: Fn(&AppSink) + Send + Sync + 'static,
940        >(
941            this: *mut ffi::GstAppSink,
942            _param_spec: glib::ffi::gpointer,
943            f: glib::ffi::gpointer,
944        ) {
945            let f: &F = &*(f as *const F);
946            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
947        }
948        unsafe {
949            let f: Box<F> = Box::new(f);
950            glib::signal::connect_raw(
951                self.as_ptr() as *mut _,
952                b"notify::max-bitrate\0".as_ptr() as *const _,
953                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
954                    notify_max_bitrate_trampoline::<F> as *const (),
955                )),
956                Box::into_raw(f),
957            )
958        }
959    }
960
961    #[doc(alias = "max-lateness")]
962    pub fn connect_max_lateness_notify<F: Fn(&Self) + Send + Sync + 'static>(
963        &self,
964        f: F,
965    ) -> glib::SignalHandlerId {
966        unsafe extern "C" fn notify_max_lateness_trampoline<
967            F: Fn(&AppSink) + Send + Sync + 'static,
968        >(
969            this: *mut ffi::GstAppSink,
970            _param_spec: glib::ffi::gpointer,
971            f: glib::ffi::gpointer,
972        ) {
973            let f: &F = &*(f as *const F);
974            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
975        }
976        unsafe {
977            let f: Box<F> = Box::new(f);
978            glib::signal::connect_raw(
979                self.as_ptr() as *mut _,
980                b"notify::max-lateness\0".as_ptr() as *const _,
981                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
982                    notify_max_lateness_trampoline::<F> as *const (),
983                )),
984                Box::into_raw(f),
985            )
986        }
987    }
988
989    #[cfg(feature = "v1_16")]
990    #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
991    #[doc(alias = "processing-deadline")]
992    pub fn connect_processing_deadline_notify<F: Fn(&Self) + Send + Sync + 'static>(
993        &self,
994        f: F,
995    ) -> glib::SignalHandlerId {
996        unsafe extern "C" fn notify_processing_deadline_trampoline<
997            F: Fn(&AppSink) + Send + Sync + 'static,
998        >(
999            this: *mut ffi::GstAppSink,
1000            _param_spec: glib::ffi::gpointer,
1001            f: glib::ffi::gpointer,
1002        ) {
1003            let f: &F = &*(f as *const F);
1004            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1005        }
1006        unsafe {
1007            let f: Box<F> = Box::new(f);
1008            glib::signal::connect_raw(
1009                self.as_ptr() as *mut _,
1010                b"notify::processing-deadline\0".as_ptr() as *const _,
1011                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1012                    notify_processing_deadline_trampoline::<F> as *const (),
1013                )),
1014                Box::into_raw(f),
1015            )
1016        }
1017    }
1018
1019    #[doc(alias = "qos")]
1020    pub fn connect_qos_notify<F: Fn(&Self) + Send + Sync + 'static>(
1021        &self,
1022        f: F,
1023    ) -> glib::SignalHandlerId {
1024        unsafe extern "C" fn notify_qos_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
1025            this: *mut ffi::GstAppSink,
1026            _param_spec: glib::ffi::gpointer,
1027            f: glib::ffi::gpointer,
1028        ) {
1029            let f: &F = &*(f as *const F);
1030            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1031        }
1032        unsafe {
1033            let f: Box<F> = Box::new(f);
1034            glib::signal::connect_raw(
1035                self.as_ptr() as *mut _,
1036                b"notify::qos\0".as_ptr() as *const _,
1037                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1038                    notify_qos_trampoline::<F> as *const (),
1039                )),
1040                Box::into_raw(f),
1041            )
1042        }
1043    }
1044
1045    #[doc(alias = "render-delay")]
1046    pub fn connect_render_delay_notify<F: Fn(&Self) + Send + Sync + 'static>(
1047        &self,
1048        f: F,
1049    ) -> glib::SignalHandlerId {
1050        unsafe extern "C" fn notify_render_delay_trampoline<
1051            F: Fn(&AppSink) + Send + Sync + 'static,
1052        >(
1053            this: *mut ffi::GstAppSink,
1054            _param_spec: glib::ffi::gpointer,
1055            f: glib::ffi::gpointer,
1056        ) {
1057            let f: &F = &*(f as *const F);
1058            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1059        }
1060        unsafe {
1061            let f: Box<F> = Box::new(f);
1062            glib::signal::connect_raw(
1063                self.as_ptr() as *mut _,
1064                b"notify::render-delay\0".as_ptr() as *const _,
1065                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1066                    notify_render_delay_trampoline::<F> as *const (),
1067                )),
1068                Box::into_raw(f),
1069            )
1070        }
1071    }
1072
1073    #[cfg(feature = "v1_18")]
1074    #[cfg_attr(docsrs, doc(cfg(feature = "v1_18")))]
1075    #[doc(alias = "stats")]
1076    pub fn connect_stats_notify<F: Fn(&Self) + Send + Sync + 'static>(
1077        &self,
1078        f: F,
1079    ) -> glib::SignalHandlerId {
1080        unsafe extern "C" fn notify_stats_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
1081            this: *mut ffi::GstAppSink,
1082            _param_spec: glib::ffi::gpointer,
1083            f: glib::ffi::gpointer,
1084        ) {
1085            let f: &F = &*(f as *const F);
1086            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1087        }
1088        unsafe {
1089            let f: Box<F> = Box::new(f);
1090            glib::signal::connect_raw(
1091                self.as_ptr() as *mut _,
1092                b"notify::stats\0".as_ptr() as *const _,
1093                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1094                    notify_stats_trampoline::<F> as *const (),
1095                )),
1096                Box::into_raw(f),
1097            )
1098        }
1099    }
1100
1101    #[doc(alias = "sync")]
1102    pub fn connect_sync_notify<F: Fn(&Self) + Send + Sync + 'static>(
1103        &self,
1104        f: F,
1105    ) -> glib::SignalHandlerId {
1106        unsafe extern "C" fn notify_sync_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
1107            this: *mut ffi::GstAppSink,
1108            _param_spec: glib::ffi::gpointer,
1109            f: glib::ffi::gpointer,
1110        ) {
1111            let f: &F = &*(f as *const F);
1112            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1113        }
1114        unsafe {
1115            let f: Box<F> = Box::new(f);
1116            glib::signal::connect_raw(
1117                self.as_ptr() as *mut _,
1118                b"notify::sync\0".as_ptr() as *const _,
1119                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1120                    notify_sync_trampoline::<F> as *const (),
1121                )),
1122                Box::into_raw(f),
1123            )
1124        }
1125    }
1126
1127    #[doc(alias = "throttle-time")]
1128    pub fn connect_throttle_time_notify<F: Fn(&Self) + Send + Sync + 'static>(
1129        &self,
1130        f: F,
1131    ) -> glib::SignalHandlerId {
1132        unsafe extern "C" fn notify_throttle_time_trampoline<
1133            F: Fn(&AppSink) + Send + Sync + 'static,
1134        >(
1135            this: *mut ffi::GstAppSink,
1136            _param_spec: glib::ffi::gpointer,
1137            f: glib::ffi::gpointer,
1138        ) {
1139            let f: &F = &*(f as *const F);
1140            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1141        }
1142        unsafe {
1143            let f: Box<F> = Box::new(f);
1144            glib::signal::connect_raw(
1145                self.as_ptr() as *mut _,
1146                b"notify::throttle-time\0".as_ptr() as *const _,
1147                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1148                    notify_throttle_time_trampoline::<F> as *const (),
1149                )),
1150                Box::into_raw(f),
1151            )
1152        }
1153    }
1154
1155    #[doc(alias = "ts-offset")]
1156    pub fn connect_ts_offset_notify<F: Fn(&Self) + Send + Sync + 'static>(
1157        &self,
1158        f: F,
1159    ) -> glib::SignalHandlerId {
1160        unsafe extern "C" fn notify_ts_offset_trampoline<
1161            F: Fn(&AppSink) + Send + Sync + 'static,
1162        >(
1163            this: *mut ffi::GstAppSink,
1164            _param_spec: glib::ffi::gpointer,
1165            f: glib::ffi::gpointer,
1166        ) {
1167            let f: &F = &*(f as *const F);
1168            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1169        }
1170        unsafe {
1171            let f: Box<F> = Box::new(f);
1172            glib::signal::connect_raw(
1173                self.as_ptr() as *mut _,
1174                b"notify::ts-offset\0".as_ptr() as *const _,
1175                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1176                    notify_ts_offset_trampoline::<F> as *const (),
1177                )),
1178                Box::into_raw(f),
1179            )
1180        }
1181    }
1182
1183    pub fn stream(&self) -> AppSinkStream {
1184        AppSinkStream::new(self)
1185    }
1186}
1187
1188// rustdoc-stripper-ignore-next
1189/// A [builder-pattern] type to construct [`AppSink`] objects.
1190///
1191/// [builder-pattern]: https://doc.rust-lang.org/1.0.0/style/ownership/builders.html
1192#[must_use = "The builder must be built to be used"]
1193pub struct AppSinkBuilder<'a> {
1194    builder: gst::gobject::GObjectBuilder<'a, AppSink>,
1195    callbacks: Option<AppSinkCallbacks>,
1196    drop_out_of_segment: Option<bool>,
1197}
1198
1199impl<'a> AppSinkBuilder<'a> {
1200    // rustdoc-stripper-ignore-next
1201    /// Build the [`AppSink`].
1202    ///
1203    /// # Panics
1204    ///
1205    /// This panics if the [`AppSink`] doesn't have all the given properties or
1206    /// property values of the wrong type are provided.
1207    #[must_use = "Building the object from the builder is usually expensive and is not expected to have side effects"]
1208    pub fn build(self) -> AppSink {
1209        let appsink = self.builder.build().unwrap();
1210
1211        if let Some(callbacks) = self.callbacks {
1212            appsink.set_callbacks(callbacks);
1213        }
1214
1215        if let Some(drop_out_of_segment) = self.drop_out_of_segment {
1216            appsink.set_drop_out_of_segment(drop_out_of_segment);
1217        }
1218
1219        appsink
1220    }
1221
1222    pub fn async_(self, async_: bool) -> Self {
1223        Self {
1224            builder: self.builder.property("async", async_),
1225            ..self
1226        }
1227    }
1228
1229    pub fn buffer_list(self, buffer_list: bool) -> Self {
1230        Self {
1231            builder: self.builder.property("buffer-list", buffer_list),
1232            ..self
1233        }
1234    }
1235
1236    pub fn callbacks(self, callbacks: AppSinkCallbacks) -> Self {
1237        Self {
1238            callbacks: Some(callbacks),
1239            ..self
1240        }
1241    }
1242
1243    pub fn caps(self, caps: &'a gst::Caps) -> Self {
1244        Self {
1245            builder: self.builder.property("caps", caps),
1246            ..self
1247        }
1248    }
1249
1250    pub fn drop(self, drop: bool) -> Self {
1251        Self {
1252            builder: self.builder.property("drop", drop),
1253            ..self
1254        }
1255    }
1256
1257    pub fn drop_out_of_segment(self, drop_out_of_segment: bool) -> Self {
1258        Self {
1259            builder: self
1260                .builder
1261                .property("drop-out-of-segment", drop_out_of_segment),
1262            ..self
1263        }
1264    }
1265
1266    pub fn enable_last_sample(self, enable_last_sample: bool) -> Self {
1267        Self {
1268            builder: self
1269                .builder
1270                .property("enable-last-sample", enable_last_sample),
1271            ..self
1272        }
1273    }
1274
1275    pub fn max_bitrate(self, max_bitrate: u64) -> Self {
1276        Self {
1277            builder: self.builder.property("max-bitrate", max_bitrate),
1278            ..self
1279        }
1280    }
1281
1282    pub fn max_buffers(self, max_buffers: u32) -> Self {
1283        Self {
1284            builder: self.builder.property("max-buffers", max_buffers),
1285            ..self
1286        }
1287    }
1288
1289    pub fn max_lateness(self, max_lateness: i64) -> Self {
1290        Self {
1291            builder: self.builder.property("max-lateness", max_lateness),
1292            ..self
1293        }
1294    }
1295
1296    #[cfg(feature = "v1_16")]
1297    #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
1298    pub fn processing_deadline(self, processing_deadline: gst::ClockTime) -> Self {
1299        Self {
1300            builder: self
1301                .builder
1302                .property("processing-deadline", processing_deadline),
1303            ..self
1304        }
1305    }
1306
1307    pub fn qos(self, qos: bool) -> Self {
1308        Self {
1309            builder: self.builder.property("qos", qos),
1310            ..self
1311        }
1312    }
1313
1314    pub fn render_delay(self, render_delay: Option<gst::ClockTime>) -> Self {
1315        Self {
1316            builder: self.builder.property("render-delay", render_delay),
1317            ..self
1318        }
1319    }
1320
1321    pub fn sync(self, sync: bool) -> Self {
1322        Self {
1323            builder: self.builder.property("sync", sync),
1324            ..self
1325        }
1326    }
1327
1328    pub fn throttle_time(self, throttle_time: u64) -> Self {
1329        Self {
1330            builder: self.builder.property("throttle-time", throttle_time),
1331            ..self
1332        }
1333    }
1334
1335    pub fn ts_offset(self, ts_offset: gst::ClockTimeDiff) -> Self {
1336        Self {
1337            builder: self.builder.property("ts-offset", ts_offset),
1338            ..self
1339        }
1340    }
1341
1342    pub fn wait_on_eos(self, wait_on_eos: bool) -> Self {
1343        Self {
1344            builder: self.builder.property("wait-on-eos", wait_on_eos),
1345            ..self
1346        }
1347    }
1348
1349    #[cfg(feature = "v1_24")]
1350    #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
1351    pub fn max_time(self, max_time: Option<gst::ClockTime>) -> Self {
1352        Self {
1353            builder: self.builder.property("max-time", max_time),
1354            ..self
1355        }
1356    }
1357
1358    #[cfg(feature = "v1_24")]
1359    #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
1360    pub fn max_bytes(self, max_bytes: u64) -> Self {
1361        Self {
1362            builder: self.builder.property("max-bytes", max_bytes),
1363            ..self
1364        }
1365    }
1366
1367    // rustdoc-stripper-ignore-next
1368    /// Sets property `name` to the given value `value`.
1369    ///
1370    /// Overrides any default or previously defined value for `name`.
1371    #[inline]
1372    pub fn property(self, name: &'a str, value: impl Into<glib::Value> + 'a) -> Self {
1373        Self {
1374            builder: self.builder.property(name, value),
1375            ..self
1376        }
1377    }
1378
1379    // rustdoc-stripper-ignore-next
1380    /// Sets property `name` to the given string value `value`.
1381    #[inline]
1382    pub fn property_from_str(self, name: &'a str, value: &'a str) -> Self {
1383        Self {
1384            builder: self.builder.property_from_str(name, value),
1385            ..self
1386        }
1387    }
1388
1389    gst::impl_builder_gvalue_extra_setters!(property_and_name);
1390}
1391
1392#[derive(Debug)]
1393pub struct AppSinkStream {
1394    app_sink: glib::WeakRef<AppSink>,
1395    waker_reference: Arc<Mutex<Option<Waker>>>,
1396}
1397
1398impl AppSinkStream {
1399    fn new(app_sink: &AppSink) -> Self {
1400        skip_assert_initialized!();
1401
1402        let waker_reference = Arc::new(Mutex::new(None as Option<Waker>));
1403
1404        app_sink.set_callbacks(
1405            AppSinkCallbacks::builder()
1406                .new_sample({
1407                    let waker_reference = Arc::clone(&waker_reference);
1408
1409                    move |_| {
1410                        if let Some(waker) = waker_reference.lock().unwrap().take() {
1411                            waker.wake();
1412                        }
1413
1414                        Ok(gst::FlowSuccess::Ok)
1415                    }
1416                })
1417                .eos({
1418                    let waker_reference = Arc::clone(&waker_reference);
1419
1420                    move |_| {
1421                        if let Some(waker) = waker_reference.lock().unwrap().take() {
1422                            waker.wake();
1423                        }
1424                    }
1425                })
1426                .build(),
1427        );
1428
1429        Self {
1430            app_sink: app_sink.downgrade(),
1431            waker_reference,
1432        }
1433    }
1434}
1435
1436impl Drop for AppSinkStream {
1437    fn drop(&mut self) {
1438        #[cfg(not(feature = "v1_18"))]
1439        {
1440            // This is not thread-safe before 1.16.3, see
1441            // https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/merge_requests/570
1442            if gst::version() >= (1, 16, 3, 0) {
1443                if let Some(app_sink) = self.app_sink.upgrade() {
1444                    app_sink.set_callbacks(AppSinkCallbacks::builder().build());
1445                }
1446            }
1447        }
1448    }
1449}
1450
1451impl Stream for AppSinkStream {
1452    type Item = gst::Sample;
1453
1454    fn poll_next(self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>> {
1455        let mut waker = self.waker_reference.lock().unwrap();
1456
1457        let Some(app_sink) = self.app_sink.upgrade() else {
1458            return Poll::Ready(None);
1459        };
1460
1461        app_sink
1462            .try_pull_sample(gst::ClockTime::ZERO)
1463            .map(|sample| Poll::Ready(Some(sample)))
1464            .unwrap_or_else(|| {
1465                if app_sink.is_eos() {
1466                    return Poll::Ready(None);
1467                }
1468
1469                waker.replace(context.waker().to_owned());
1470
1471                Poll::Pending
1472            })
1473    }
1474}
1475
1476#[cfg(test)]
1477mod tests {
1478    use futures_util::StreamExt;
1479    use gst::prelude::*;
1480
1481    use super::*;
1482
1483    #[test]
1484    fn test_app_sink_stream() {
1485        gst::init().unwrap();
1486
1487        let videotestsrc = gst::ElementFactory::make("videotestsrc")
1488            .property("num-buffers", 5)
1489            .build()
1490            .unwrap();
1491        let appsink = gst::ElementFactory::make("appsink").build().unwrap();
1492
1493        let pipeline = gst::Pipeline::new();
1494        pipeline.add(&videotestsrc).unwrap();
1495        pipeline.add(&appsink).unwrap();
1496
1497        videotestsrc.link(&appsink).unwrap();
1498
1499        let app_sink_stream = appsink.dynamic_cast::<AppSink>().unwrap().stream();
1500        let samples_future = app_sink_stream.collect::<Vec<gst::Sample>>();
1501
1502        pipeline.set_state(gst::State::Playing).unwrap();
1503        let samples = futures_executor::block_on(samples_future);
1504        pipeline.set_state(gst::State::Null).unwrap();
1505
1506        assert_eq!(samples.len(), 5);
1507    }
1508}