gstreamer_app/
app_sink.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{
4    mem, panic,
5    pin::Pin,
6    ptr,
7    sync::{Arc, Mutex},
8    task::{Context, Poll, Waker},
9};
10
11#[cfg(not(panic = "abort"))]
12use std::sync::atomic::{AtomicBool, Ordering};
13
14use futures_core::Stream;
15use glib::{ffi::gpointer, prelude::*, translate::*};
16
17use crate::{ffi, AppSink};
18
19#[allow(clippy::type_complexity)]
20pub struct AppSinkCallbacks {
21    eos: Option<Box<dyn FnMut(&AppSink) + Send + 'static>>,
22    new_preroll: Option<
23        Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
24    >,
25    new_sample: Option<
26        Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
27    >,
28    new_event: Option<Box<dyn FnMut(&AppSink) -> bool + Send + 'static>>,
29    propose_allocation:
30        Option<Box<dyn FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static>>,
31    #[cfg(not(panic = "abort"))]
32    panicked: AtomicBool,
33    callbacks: ffi::GstAppSinkCallbacks,
34}
35
36unsafe impl Send for AppSinkCallbacks {}
37unsafe impl Sync for AppSinkCallbacks {}
38
39impl AppSinkCallbacks {
40    pub fn builder() -> AppSinkCallbacksBuilder {
41        skip_assert_initialized!();
42        AppSinkCallbacksBuilder {
43            eos: None,
44            new_preroll: None,
45            new_sample: None,
46            new_event: None,
47            propose_allocation: None,
48        }
49    }
50}
51
52#[allow(clippy::type_complexity)]
53#[must_use = "The builder must be built to be used"]
54pub struct AppSinkCallbacksBuilder {
55    eos: Option<Box<dyn FnMut(&AppSink) + Send + 'static>>,
56    new_preroll: Option<
57        Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
58    >,
59    new_sample: Option<
60        Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
61    >,
62    new_event: Option<Box<dyn FnMut(&AppSink) -> bool + Send + 'static>>,
63    propose_allocation:
64        Option<Box<dyn FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static>>,
65}
66
67impl AppSinkCallbacksBuilder {
68    pub fn eos<F: FnMut(&AppSink) + Send + 'static>(self, eos: F) -> Self {
69        Self {
70            eos: Some(Box::new(eos)),
71            ..self
72        }
73    }
74
75    pub fn eos_if<F: FnMut(&AppSink) + Send + 'static>(self, eos: F, predicate: bool) -> Self {
76        if predicate {
77            self.eos(eos)
78        } else {
79            self
80        }
81    }
82
83    pub fn eos_if_some<F: FnMut(&AppSink) + Send + 'static>(self, eos: Option<F>) -> Self {
84        if let Some(eos) = eos {
85            self.eos(eos)
86        } else {
87            self
88        }
89    }
90
91    pub fn new_preroll<
92        F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
93    >(
94        self,
95        new_preroll: F,
96    ) -> Self {
97        Self {
98            new_preroll: Some(Box::new(new_preroll)),
99            ..self
100        }
101    }
102
103    pub fn new_preroll_if<
104        F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
105    >(
106        self,
107        new_preroll: F,
108        predicate: bool,
109    ) -> Self {
110        if predicate {
111            self.new_preroll(new_preroll)
112        } else {
113            self
114        }
115    }
116
117    pub fn new_preroll_if_some<
118        F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
119    >(
120        self,
121        new_preroll: Option<F>,
122    ) -> Self {
123        if let Some(new_preroll) = new_preroll {
124            self.new_preroll(new_preroll)
125        } else {
126            self
127        }
128    }
129
130    pub fn new_sample<
131        F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
132    >(
133        self,
134        new_sample: F,
135    ) -> Self {
136        Self {
137            new_sample: Some(Box::new(new_sample)),
138            ..self
139        }
140    }
141
142    pub fn new_sample_if<
143        F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
144    >(
145        self,
146        new_sample: F,
147        predicate: bool,
148    ) -> Self {
149        if predicate {
150            self.new_sample(new_sample)
151        } else {
152            self
153        }
154    }
155
156    pub fn new_sample_if_some<
157        F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
158    >(
159        self,
160        new_sample: Option<F>,
161    ) -> Self {
162        if let Some(new_sample) = new_sample {
163            self.new_sample(new_sample)
164        } else {
165            self
166        }
167    }
168
169    #[cfg(feature = "v1_20")]
170    #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
171    pub fn new_event<F: FnMut(&AppSink) -> bool + Send + 'static>(self, new_event: F) -> Self {
172        Self {
173            new_event: Some(Box::new(new_event)),
174            ..self
175        }
176    }
177
178    #[cfg(feature = "v1_20")]
179    #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
180    pub fn new_event_if<F: FnMut(&AppSink) -> bool + Send + 'static>(
181        self,
182        new_event: F,
183        predicate: bool,
184    ) -> Self {
185        if predicate {
186            self.new_event(new_event)
187        } else {
188            self
189        }
190    }
191
192    #[cfg(feature = "v1_20")]
193    #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
194    pub fn new_event_if_some<F: FnMut(&AppSink) -> bool + Send + 'static>(
195        self,
196        new_event: Option<F>,
197    ) -> Self {
198        if let Some(new_event) = new_event {
199            self.new_event(new_event)
200        } else {
201            self
202        }
203    }
204
205    #[cfg(feature = "v1_24")]
206    #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
207    pub fn propose_allocation<
208        F: FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static,
209    >(
210        self,
211        propose_allocation: F,
212    ) -> Self {
213        Self {
214            propose_allocation: Some(Box::new(propose_allocation)),
215            ..self
216        }
217    }
218
219    #[cfg(feature = "v1_24")]
220    #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
221    pub fn propose_allocation_if<
222        F: FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static,
223    >(
224        self,
225        propose_allocation: F,
226        predicate: bool,
227    ) -> Self {
228        if predicate {
229            self.propose_allocation(propose_allocation)
230        } else {
231            self
232        }
233    }
234
235    #[cfg(feature = "v1_24")]
236    #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
237    pub fn propose_allocation_if_some<
238        F: FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static,
239    >(
240        self,
241        propose_allocation: Option<F>,
242    ) -> Self {
243        if let Some(propose_allocation) = propose_allocation {
244            self.propose_allocation(propose_allocation)
245        } else {
246            self
247        }
248    }
249
250    #[must_use = "Building the callbacks without using them has no effect"]
251    pub fn build(self) -> AppSinkCallbacks {
252        let have_eos = self.eos.is_some();
253        let have_new_preroll = self.new_preroll.is_some();
254        let have_new_sample = self.new_sample.is_some();
255        let have_new_event = self.new_event.is_some();
256        let have_propose_allocation = self.propose_allocation.is_some();
257
258        AppSinkCallbacks {
259            eos: self.eos,
260            new_preroll: self.new_preroll,
261            new_sample: self.new_sample,
262            new_event: self.new_event,
263            propose_allocation: self.propose_allocation,
264            #[cfg(not(panic = "abort"))]
265            panicked: AtomicBool::new(false),
266            callbacks: ffi::GstAppSinkCallbacks {
267                eos: if have_eos { Some(trampoline_eos) } else { None },
268                new_preroll: if have_new_preroll {
269                    Some(trampoline_new_preroll)
270                } else {
271                    None
272                },
273                new_sample: if have_new_sample {
274                    Some(trampoline_new_sample)
275                } else {
276                    None
277                },
278                new_event: if have_new_event {
279                    Some(trampoline_new_event)
280                } else {
281                    None
282                },
283                propose_allocation: if have_propose_allocation {
284                    Some(trampoline_propose_allocation)
285                } else {
286                    None
287                },
288                _gst_reserved: [ptr::null_mut(), ptr::null_mut()],
289            },
290        }
291    }
292}
293
294unsafe extern "C" fn trampoline_eos(appsink: *mut ffi::GstAppSink, callbacks: gpointer) {
295    let callbacks = callbacks as *mut AppSinkCallbacks;
296    let element: Borrowed<AppSink> = from_glib_borrow(appsink);
297
298    #[cfg(not(panic = "abort"))]
299    if (*callbacks).panicked.load(Ordering::Relaxed) {
300        let element: Borrowed<AppSink> = from_glib_borrow(appsink);
301        gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
302        return;
303    }
304
305    if let Some(ref mut eos) = (*callbacks).eos {
306        let result = panic::catch_unwind(panic::AssertUnwindSafe(|| eos(&element)));
307        match result {
308            Ok(result) => result,
309            Err(err) => {
310                #[cfg(panic = "abort")]
311                {
312                    unreachable!("{err:?}");
313                }
314                #[cfg(not(panic = "abort"))]
315                {
316                    (*callbacks).panicked.store(true, Ordering::Relaxed);
317                    gst::subclass::post_panic_error_message(
318                        element.upcast_ref(),
319                        element.upcast_ref(),
320                        Some(err),
321                    );
322                }
323            }
324        }
325    }
326}
327
328unsafe extern "C" fn trampoline_new_preroll(
329    appsink: *mut ffi::GstAppSink,
330    callbacks: gpointer,
331) -> gst::ffi::GstFlowReturn {
332    let callbacks = callbacks as *mut AppSinkCallbacks;
333    let element: Borrowed<AppSink> = from_glib_borrow(appsink);
334
335    #[cfg(not(panic = "abort"))]
336    if (*callbacks).panicked.load(Ordering::Relaxed) {
337        let element: Borrowed<AppSink> = from_glib_borrow(appsink);
338        gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
339        return gst::FlowReturn::Error.into_glib();
340    }
341
342    let ret = if let Some(ref mut new_preroll) = (*callbacks).new_preroll {
343        let result = panic::catch_unwind(panic::AssertUnwindSafe(|| new_preroll(&element).into()));
344        match result {
345            Ok(result) => result,
346            Err(err) => {
347                #[cfg(panic = "abort")]
348                {
349                    unreachable!("{err:?}");
350                }
351                #[cfg(not(panic = "abort"))]
352                {
353                    (*callbacks).panicked.store(true, Ordering::Relaxed);
354                    gst::subclass::post_panic_error_message(
355                        element.upcast_ref(),
356                        element.upcast_ref(),
357                        Some(err),
358                    );
359
360                    gst::FlowReturn::Error
361                }
362            }
363        }
364    } else {
365        gst::FlowReturn::Error
366    };
367
368    ret.into_glib()
369}
370
371unsafe extern "C" fn trampoline_new_sample(
372    appsink: *mut ffi::GstAppSink,
373    callbacks: gpointer,
374) -> gst::ffi::GstFlowReturn {
375    let callbacks = callbacks as *mut AppSinkCallbacks;
376    let element: Borrowed<AppSink> = from_glib_borrow(appsink);
377
378    #[cfg(not(panic = "abort"))]
379    if (*callbacks).panicked.load(Ordering::Relaxed) {
380        let element: Borrowed<AppSink> = from_glib_borrow(appsink);
381        gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
382        return gst::FlowReturn::Error.into_glib();
383    }
384
385    let ret = if let Some(ref mut new_sample) = (*callbacks).new_sample {
386        let result = panic::catch_unwind(panic::AssertUnwindSafe(|| new_sample(&element).into()));
387        match result {
388            Ok(result) => result,
389            Err(err) => {
390                #[cfg(panic = "abort")]
391                {
392                    unreachable!("{err:?}");
393                }
394                #[cfg(not(panic = "abort"))]
395                {
396                    (*callbacks).panicked.store(true, Ordering::Relaxed);
397                    gst::subclass::post_panic_error_message(
398                        element.upcast_ref(),
399                        element.upcast_ref(),
400                        Some(err),
401                    );
402
403                    gst::FlowReturn::Error
404                }
405            }
406        }
407    } else {
408        gst::FlowReturn::Error
409    };
410
411    ret.into_glib()
412}
413
414unsafe extern "C" fn trampoline_new_event(
415    appsink: *mut ffi::GstAppSink,
416    callbacks: gpointer,
417) -> glib::ffi::gboolean {
418    let callbacks = callbacks as *mut AppSinkCallbacks;
419    let element: Borrowed<AppSink> = from_glib_borrow(appsink);
420
421    #[cfg(not(panic = "abort"))]
422    if (*callbacks).panicked.load(Ordering::Relaxed) {
423        let element: Borrowed<AppSink> = from_glib_borrow(appsink);
424        gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
425        return false.into_glib();
426    }
427
428    let ret = if let Some(ref mut new_event) = (*callbacks).new_event {
429        let result = panic::catch_unwind(panic::AssertUnwindSafe(|| new_event(&element)));
430        match result {
431            Ok(result) => result,
432            Err(err) => {
433                #[cfg(panic = "abort")]
434                {
435                    unreachable!("{err:?}");
436                }
437                #[cfg(not(panic = "abort"))]
438                {
439                    (*callbacks).panicked.store(true, Ordering::Relaxed);
440                    gst::subclass::post_panic_error_message(
441                        element.upcast_ref(),
442                        element.upcast_ref(),
443                        Some(err),
444                    );
445
446                    false
447                }
448            }
449        }
450    } else {
451        false
452    };
453
454    ret.into_glib()
455}
456
457unsafe extern "C" fn trampoline_propose_allocation(
458    appsink: *mut ffi::GstAppSink,
459    query: *mut gst::ffi::GstQuery,
460    callbacks: gpointer,
461) -> glib::ffi::gboolean {
462    let callbacks = callbacks as *mut AppSinkCallbacks;
463    let element: Borrowed<AppSink> = from_glib_borrow(appsink);
464
465    #[cfg(not(panic = "abort"))]
466    if (*callbacks).panicked.load(Ordering::Relaxed) {
467        let element: Borrowed<AppSink> = from_glib_borrow(appsink);
468        gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
469        return false.into_glib();
470    }
471
472    let ret = if let Some(ref mut propose_allocation) = (*callbacks).propose_allocation {
473        let query = match gst::QueryRef::from_mut_ptr(query).view_mut() {
474            gst::QueryViewMut::Allocation(allocation) => allocation,
475            _ => unreachable!(),
476        };
477        let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
478            propose_allocation(&element, query)
479        }));
480        match result {
481            Ok(result) => result,
482            Err(err) => {
483                #[cfg(panic = "abort")]
484                {
485                    unreachable!("{err:?}");
486                }
487                #[cfg(not(panic = "abort"))]
488                {
489                    (*callbacks).panicked.store(true, Ordering::Relaxed);
490                    gst::subclass::post_panic_error_message(
491                        element.upcast_ref(),
492                        element.upcast_ref(),
493                        Some(err),
494                    );
495                    false
496                }
497            }
498        }
499    } else {
500        false
501    };
502
503    ret.into_glib()
504}
505
506unsafe extern "C" fn destroy_callbacks(ptr: gpointer) {
507    let _ = Box::<AppSinkCallbacks>::from_raw(ptr as *mut _);
508}
509
510impl AppSink {
511    // rustdoc-stripper-ignore-next
512    /// Creates a new builder-pattern struct instance to construct [`AppSink`] objects.
513    ///
514    /// This method returns an instance of [`AppSinkBuilder`](crate::builders::AppSinkBuilder) which can be used to create [`AppSink`] objects.
515    pub fn builder() -> AppSinkBuilder {
516        assert_initialized_main_thread!();
517        AppSinkBuilder::new()
518    }
519
520    /// Set callbacks which will be executed for each new preroll, new sample and eos.
521    /// This is an alternative to using the signals, it has lower overhead and is thus
522    /// less expensive, but also less flexible.
523    ///
524    /// If callbacks are installed, no signals will be emitted for performance
525    /// reasons.
526    ///
527    /// Before 1.16.3 it was not possible to change the callbacks in a thread-safe
528    /// way.
529    /// ## `callbacks`
530    /// the callbacks
531    /// ## `notify`
532    /// a destroy notify function
533    #[doc(alias = "gst_app_sink_set_callbacks")]
534    pub fn set_callbacks(&self, callbacks: AppSinkCallbacks) {
535        unsafe {
536            let sink = self.to_glib_none().0;
537
538            #[cfg(not(feature = "v1_18"))]
539            {
540                static SET_ONCE_QUARK: std::sync::OnceLock<glib::Quark> =
541                    std::sync::OnceLock::new();
542
543                let set_once_quark = SET_ONCE_QUARK
544                    .get_or_init(|| glib::Quark::from_str("gstreamer-rs-app-sink-callbacks"));
545
546                // This is not thread-safe before 1.16.3, see
547                // https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/merge_requests/570
548                if gst::version() < (1, 16, 3, 0) {
549                    if !glib::gobject_ffi::g_object_get_qdata(
550                        sink as *mut _,
551                        set_once_quark.into_glib(),
552                    )
553                    .is_null()
554                    {
555                        panic!("AppSink callbacks can only be set once");
556                    }
557
558                    glib::gobject_ffi::g_object_set_qdata(
559                        sink as *mut _,
560                        set_once_quark.into_glib(),
561                        1 as *mut _,
562                    );
563                }
564            }
565
566            ffi::gst_app_sink_set_callbacks(
567                sink,
568                mut_override(&callbacks.callbacks),
569                Box::into_raw(Box::new(callbacks)) as *mut _,
570                Some(destroy_callbacks),
571            );
572        }
573    }
574
575    #[doc(alias = "drop-out-of-segment")]
576    pub fn drops_out_of_segment(&self) -> bool {
577        unsafe {
578            from_glib(gst_base::ffi::gst_base_sink_get_drop_out_of_segment(
579                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
580            ))
581        }
582    }
583
584    #[doc(alias = "max-bitrate")]
585    #[doc(alias = "gst_base_sink_get_max_bitrate")]
586    pub fn max_bitrate(&self) -> u64 {
587        unsafe {
588            gst_base::ffi::gst_base_sink_get_max_bitrate(
589                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
590            )
591        }
592    }
593
594    #[doc(alias = "max-lateness")]
595    #[doc(alias = "gst_base_sink_get_max_lateness")]
596    pub fn max_lateness(&self) -> i64 {
597        unsafe {
598            gst_base::ffi::gst_base_sink_get_max_lateness(
599                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
600            )
601        }
602    }
603
604    #[doc(alias = "processing-deadline")]
605    #[cfg(feature = "v1_16")]
606    #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
607    #[doc(alias = "gst_base_sink_get_processing_deadline")]
608    pub fn processing_deadline(&self) -> gst::ClockTime {
609        unsafe {
610            try_from_glib(gst_base::ffi::gst_base_sink_get_processing_deadline(
611                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
612            ))
613            .expect("undefined processing_deadline")
614        }
615    }
616
617    #[doc(alias = "render-delay")]
618    #[doc(alias = "gst_base_sink_get_render_delay")]
619    pub fn render_delay(&self) -> gst::ClockTime {
620        unsafe {
621            try_from_glib(gst_base::ffi::gst_base_sink_get_render_delay(
622                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
623            ))
624            .expect("undefined render_delay")
625        }
626    }
627
628    #[cfg(feature = "v1_18")]
629    #[cfg_attr(docsrs, doc(cfg(feature = "v1_18")))]
630    #[doc(alias = "gst_base_sink_get_stats")]
631    pub fn stats(&self) -> gst::Structure {
632        unsafe {
633            from_glib_full(gst_base::ffi::gst_base_sink_get_stats(
634                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
635            ))
636        }
637    }
638
639    #[doc(alias = "sync")]
640    pub fn is_sync(&self) -> bool {
641        unsafe {
642            from_glib(gst_base::ffi::gst_base_sink_get_sync(
643                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
644            ))
645        }
646    }
647
648    #[doc(alias = "throttle-time")]
649    #[doc(alias = "gst_base_sink_get_throttle_time")]
650    pub fn throttle_time(&self) -> u64 {
651        unsafe {
652            gst_base::ffi::gst_base_sink_get_throttle_time(
653                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
654            )
655        }
656    }
657
658    #[doc(alias = "ts-offset")]
659    #[doc(alias = "gst_base_sink_get_ts_offset")]
660    pub fn ts_offset(&self) -> gst::ClockTimeDiff {
661        unsafe {
662            gst_base::ffi::gst_base_sink_get_ts_offset(
663                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
664            )
665        }
666    }
667
668    #[doc(alias = "async")]
669    #[doc(alias = "gst_base_sink_is_async_enabled")]
670    pub fn is_async(&self) -> bool {
671        unsafe {
672            from_glib(gst_base::ffi::gst_base_sink_is_async_enabled(
673                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
674            ))
675        }
676    }
677
678    #[doc(alias = "last-sample")]
679    pub fn enables_last_sample(&self) -> bool {
680        unsafe {
681            from_glib(gst_base::ffi::gst_base_sink_is_last_sample_enabled(
682                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
683            ))
684        }
685    }
686
687    #[doc(alias = "qos")]
688    #[doc(alias = "gst_base_sink_is_qos_enabled")]
689    pub fn is_qos(&self) -> bool {
690        unsafe {
691            from_glib(gst_base::ffi::gst_base_sink_is_qos_enabled(
692                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
693            ))
694        }
695    }
696
697    #[doc(alias = "async")]
698    #[doc(alias = "gst_base_sink_set_async_enabled")]
699    pub fn set_async(&self, enabled: bool) {
700        unsafe {
701            gst_base::ffi::gst_base_sink_set_async_enabled(
702                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
703                enabled.into_glib(),
704            );
705        }
706    }
707
708    #[doc(alias = "drop-out-of-segment")]
709    #[doc(alias = "gst_base_sink_set_drop_out_of_segment")]
710    pub fn set_drop_out_of_segment(&self, drop_out_of_segment: bool) {
711        unsafe {
712            gst_base::ffi::gst_base_sink_set_drop_out_of_segment(
713                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
714                drop_out_of_segment.into_glib(),
715            );
716        }
717    }
718
719    #[doc(alias = "last-sample")]
720    pub fn set_enable_last_sample(&self, enabled: bool) {
721        unsafe {
722            gst_base::ffi::gst_base_sink_set_last_sample_enabled(
723                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
724                enabled.into_glib(),
725            );
726        }
727    }
728
729    #[doc(alias = "max-bitrate")]
730    #[doc(alias = "gst_base_sink_set_max_bitrate")]
731    pub fn set_max_bitrate(&self, max_bitrate: u64) {
732        unsafe {
733            gst_base::ffi::gst_base_sink_set_max_bitrate(
734                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
735                max_bitrate,
736            );
737        }
738    }
739
740    #[doc(alias = "max-lateness")]
741    #[doc(alias = "gst_base_sink_set_max_lateness")]
742    pub fn set_max_lateness(&self, max_lateness: i64) {
743        unsafe {
744            gst_base::ffi::gst_base_sink_set_max_lateness(
745                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
746                max_lateness,
747            );
748        }
749    }
750
751    #[doc(alias = "processing-deadline")]
752    #[cfg(feature = "v1_16")]
753    #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
754    #[doc(alias = "gst_base_sink_set_processing_deadline")]
755    pub fn set_processing_deadline(&self, processing_deadline: gst::ClockTime) {
756        unsafe {
757            gst_base::ffi::gst_base_sink_set_processing_deadline(
758                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
759                processing_deadline.into_glib(),
760            );
761        }
762    }
763
764    #[doc(alias = "qos")]
765    #[doc(alias = "gst_base_sink_set_qos_enabled")]
766    pub fn set_qos(&self, enabled: bool) {
767        unsafe {
768            gst_base::ffi::gst_base_sink_set_qos_enabled(
769                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
770                enabled.into_glib(),
771            );
772        }
773    }
774
775    #[doc(alias = "render-delay")]
776    #[doc(alias = "gst_base_sink_set_render_delay")]
777    pub fn set_render_delay(&self, delay: gst::ClockTime) {
778        unsafe {
779            gst_base::ffi::gst_base_sink_set_render_delay(
780                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
781                delay.into_glib(),
782            );
783        }
784    }
785
786    #[doc(alias = "sync")]
787    #[doc(alias = "gst_base_sink_set_sync")]
788    pub fn set_sync(&self, sync: bool) {
789        unsafe {
790            gst_base::ffi::gst_base_sink_set_sync(
791                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
792                sync.into_glib(),
793            );
794        }
795    }
796
797    #[doc(alias = "throttle-time")]
798    #[doc(alias = "gst_base_sink_set_throttle_time")]
799    pub fn set_throttle_time(&self, throttle: u64) {
800        unsafe {
801            gst_base::ffi::gst_base_sink_set_throttle_time(
802                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
803                throttle,
804            );
805        }
806    }
807
808    #[doc(alias = "ts-offset")]
809    #[doc(alias = "gst_base_sink_set_ts_offset")]
810    pub fn set_ts_offset(&self, offset: gst::ClockTimeDiff) {
811        unsafe {
812            gst_base::ffi::gst_base_sink_set_ts_offset(
813                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
814                offset,
815            );
816        }
817    }
818
819    #[doc(alias = "async")]
820    pub fn connect_async_notify<F: Fn(&Self) + Send + Sync + 'static>(
821        &self,
822        f: F,
823    ) -> glib::SignalHandlerId {
824        unsafe extern "C" fn notify_async_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
825            this: *mut ffi::GstAppSink,
826            _param_spec: glib::ffi::gpointer,
827            f: glib::ffi::gpointer,
828        ) {
829            let f: &F = &*(f as *const F);
830            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
831        }
832        unsafe {
833            let f: Box<F> = Box::new(f);
834            glib::signal::connect_raw(
835                self.as_ptr() as *mut _,
836                b"notify::async\0".as_ptr() as *const _,
837                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
838                    notify_async_trampoline::<F> as *const (),
839                )),
840                Box::into_raw(f),
841            )
842        }
843    }
844
845    #[doc(alias = "blocksize")]
846    pub fn connect_blocksize_notify<F: Fn(&Self) + Send + Sync + 'static>(
847        &self,
848        f: F,
849    ) -> glib::SignalHandlerId {
850        unsafe extern "C" fn notify_blocksize_trampoline<
851            F: Fn(&AppSink) + Send + Sync + 'static,
852        >(
853            this: *mut ffi::GstAppSink,
854            _param_spec: glib::ffi::gpointer,
855            f: glib::ffi::gpointer,
856        ) {
857            let f: &F = &*(f as *const F);
858            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
859        }
860        unsafe {
861            let f: Box<F> = Box::new(f);
862            glib::signal::connect_raw(
863                self.as_ptr() as *mut _,
864                b"notify::blocksize\0".as_ptr() as *const _,
865                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
866                    notify_blocksize_trampoline::<F> as *const (),
867                )),
868                Box::into_raw(f),
869            )
870        }
871    }
872
873    #[doc(alias = "enable-last-sample")]
874    pub fn connect_enable_last_sample_notify<F: Fn(&Self) + Send + Sync + 'static>(
875        &self,
876        f: F,
877    ) -> glib::SignalHandlerId {
878        unsafe extern "C" fn notify_enable_last_sample_trampoline<
879            F: Fn(&AppSink) + Send + Sync + 'static,
880        >(
881            this: *mut ffi::GstAppSink,
882            _param_spec: glib::ffi::gpointer,
883            f: glib::ffi::gpointer,
884        ) {
885            let f: &F = &*(f as *const F);
886            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
887        }
888        unsafe {
889            let f: Box<F> = Box::new(f);
890            glib::signal::connect_raw(
891                self.as_ptr() as *mut _,
892                b"notify::enable-last-sample\0".as_ptr() as *const _,
893                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
894                    notify_enable_last_sample_trampoline::<F> as *const (),
895                )),
896                Box::into_raw(f),
897            )
898        }
899    }
900
901    #[doc(alias = "last-sample")]
902    pub fn connect_last_sample_notify<F: Fn(&Self) + Send + Sync + 'static>(
903        &self,
904        f: F,
905    ) -> glib::SignalHandlerId {
906        unsafe extern "C" fn notify_last_sample_trampoline<
907            F: Fn(&AppSink) + Send + Sync + 'static,
908        >(
909            this: *mut ffi::GstAppSink,
910            _param_spec: glib::ffi::gpointer,
911            f: glib::ffi::gpointer,
912        ) {
913            let f: &F = &*(f as *const F);
914            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
915        }
916        unsafe {
917            let f: Box<F> = Box::new(f);
918            glib::signal::connect_raw(
919                self.as_ptr() as *mut _,
920                b"notify::last-sample\0".as_ptr() as *const _,
921                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
922                    notify_last_sample_trampoline::<F> as *const (),
923                )),
924                Box::into_raw(f),
925            )
926        }
927    }
928
929    #[doc(alias = "max-bitrate")]
930    pub fn connect_max_bitrate_notify<F: Fn(&Self) + Send + Sync + 'static>(
931        &self,
932        f: F,
933    ) -> glib::SignalHandlerId {
934        unsafe extern "C" fn notify_max_bitrate_trampoline<
935            F: Fn(&AppSink) + Send + Sync + 'static,
936        >(
937            this: *mut ffi::GstAppSink,
938            _param_spec: glib::ffi::gpointer,
939            f: glib::ffi::gpointer,
940        ) {
941            let f: &F = &*(f as *const F);
942            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
943        }
944        unsafe {
945            let f: Box<F> = Box::new(f);
946            glib::signal::connect_raw(
947                self.as_ptr() as *mut _,
948                b"notify::max-bitrate\0".as_ptr() as *const _,
949                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
950                    notify_max_bitrate_trampoline::<F> as *const (),
951                )),
952                Box::into_raw(f),
953            )
954        }
955    }
956
957    #[doc(alias = "max-lateness")]
958    pub fn connect_max_lateness_notify<F: Fn(&Self) + Send + Sync + 'static>(
959        &self,
960        f: F,
961    ) -> glib::SignalHandlerId {
962        unsafe extern "C" fn notify_max_lateness_trampoline<
963            F: Fn(&AppSink) + Send + Sync + 'static,
964        >(
965            this: *mut ffi::GstAppSink,
966            _param_spec: glib::ffi::gpointer,
967            f: glib::ffi::gpointer,
968        ) {
969            let f: &F = &*(f as *const F);
970            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
971        }
972        unsafe {
973            let f: Box<F> = Box::new(f);
974            glib::signal::connect_raw(
975                self.as_ptr() as *mut _,
976                b"notify::max-lateness\0".as_ptr() as *const _,
977                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
978                    notify_max_lateness_trampoline::<F> as *const (),
979                )),
980                Box::into_raw(f),
981            )
982        }
983    }
984
985    #[cfg(feature = "v1_16")]
986    #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
987    #[doc(alias = "processing-deadline")]
988    pub fn connect_processing_deadline_notify<F: Fn(&Self) + Send + Sync + 'static>(
989        &self,
990        f: F,
991    ) -> glib::SignalHandlerId {
992        unsafe extern "C" fn notify_processing_deadline_trampoline<
993            F: Fn(&AppSink) + Send + Sync + 'static,
994        >(
995            this: *mut ffi::GstAppSink,
996            _param_spec: glib::ffi::gpointer,
997            f: glib::ffi::gpointer,
998        ) {
999            let f: &F = &*(f as *const F);
1000            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1001        }
1002        unsafe {
1003            let f: Box<F> = Box::new(f);
1004            glib::signal::connect_raw(
1005                self.as_ptr() as *mut _,
1006                b"notify::processing-deadline\0".as_ptr() as *const _,
1007                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1008                    notify_processing_deadline_trampoline::<F> as *const (),
1009                )),
1010                Box::into_raw(f),
1011            )
1012        }
1013    }
1014
1015    #[doc(alias = "qos")]
1016    pub fn connect_qos_notify<F: Fn(&Self) + Send + Sync + 'static>(
1017        &self,
1018        f: F,
1019    ) -> glib::SignalHandlerId {
1020        unsafe extern "C" fn notify_qos_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
1021            this: *mut ffi::GstAppSink,
1022            _param_spec: glib::ffi::gpointer,
1023            f: glib::ffi::gpointer,
1024        ) {
1025            let f: &F = &*(f as *const F);
1026            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1027        }
1028        unsafe {
1029            let f: Box<F> = Box::new(f);
1030            glib::signal::connect_raw(
1031                self.as_ptr() as *mut _,
1032                b"notify::qos\0".as_ptr() as *const _,
1033                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1034                    notify_qos_trampoline::<F> as *const (),
1035                )),
1036                Box::into_raw(f),
1037            )
1038        }
1039    }
1040
1041    #[doc(alias = "render-delay")]
1042    pub fn connect_render_delay_notify<F: Fn(&Self) + Send + Sync + 'static>(
1043        &self,
1044        f: F,
1045    ) -> glib::SignalHandlerId {
1046        unsafe extern "C" fn notify_render_delay_trampoline<
1047            F: Fn(&AppSink) + Send + Sync + 'static,
1048        >(
1049            this: *mut ffi::GstAppSink,
1050            _param_spec: glib::ffi::gpointer,
1051            f: glib::ffi::gpointer,
1052        ) {
1053            let f: &F = &*(f as *const F);
1054            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1055        }
1056        unsafe {
1057            let f: Box<F> = Box::new(f);
1058            glib::signal::connect_raw(
1059                self.as_ptr() as *mut _,
1060                b"notify::render-delay\0".as_ptr() as *const _,
1061                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1062                    notify_render_delay_trampoline::<F> as *const (),
1063                )),
1064                Box::into_raw(f),
1065            )
1066        }
1067    }
1068
1069    #[cfg(feature = "v1_18")]
1070    #[cfg_attr(docsrs, doc(cfg(feature = "v1_18")))]
1071    #[doc(alias = "stats")]
1072    pub fn connect_stats_notify<F: Fn(&Self) + Send + Sync + 'static>(
1073        &self,
1074        f: F,
1075    ) -> glib::SignalHandlerId {
1076        unsafe extern "C" fn notify_stats_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
1077            this: *mut ffi::GstAppSink,
1078            _param_spec: glib::ffi::gpointer,
1079            f: glib::ffi::gpointer,
1080        ) {
1081            let f: &F = &*(f as *const F);
1082            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1083        }
1084        unsafe {
1085            let f: Box<F> = Box::new(f);
1086            glib::signal::connect_raw(
1087                self.as_ptr() as *mut _,
1088                b"notify::stats\0".as_ptr() as *const _,
1089                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1090                    notify_stats_trampoline::<F> as *const (),
1091                )),
1092                Box::into_raw(f),
1093            )
1094        }
1095    }
1096
1097    #[doc(alias = "sync")]
1098    pub fn connect_sync_notify<F: Fn(&Self) + Send + Sync + 'static>(
1099        &self,
1100        f: F,
1101    ) -> glib::SignalHandlerId {
1102        unsafe extern "C" fn notify_sync_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
1103            this: *mut ffi::GstAppSink,
1104            _param_spec: glib::ffi::gpointer,
1105            f: glib::ffi::gpointer,
1106        ) {
1107            let f: &F = &*(f as *const F);
1108            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1109        }
1110        unsafe {
1111            let f: Box<F> = Box::new(f);
1112            glib::signal::connect_raw(
1113                self.as_ptr() as *mut _,
1114                b"notify::sync\0".as_ptr() as *const _,
1115                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1116                    notify_sync_trampoline::<F> as *const (),
1117                )),
1118                Box::into_raw(f),
1119            )
1120        }
1121    }
1122
1123    #[doc(alias = "throttle-time")]
1124    pub fn connect_throttle_time_notify<F: Fn(&Self) + Send + Sync + 'static>(
1125        &self,
1126        f: F,
1127    ) -> glib::SignalHandlerId {
1128        unsafe extern "C" fn notify_throttle_time_trampoline<
1129            F: Fn(&AppSink) + Send + Sync + 'static,
1130        >(
1131            this: *mut ffi::GstAppSink,
1132            _param_spec: glib::ffi::gpointer,
1133            f: glib::ffi::gpointer,
1134        ) {
1135            let f: &F = &*(f as *const F);
1136            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1137        }
1138        unsafe {
1139            let f: Box<F> = Box::new(f);
1140            glib::signal::connect_raw(
1141                self.as_ptr() as *mut _,
1142                b"notify::throttle-time\0".as_ptr() as *const _,
1143                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1144                    notify_throttle_time_trampoline::<F> as *const (),
1145                )),
1146                Box::into_raw(f),
1147            )
1148        }
1149    }
1150
1151    #[doc(alias = "ts-offset")]
1152    pub fn connect_ts_offset_notify<F: Fn(&Self) + Send + Sync + 'static>(
1153        &self,
1154        f: F,
1155    ) -> glib::SignalHandlerId {
1156        unsafe extern "C" fn notify_ts_offset_trampoline<
1157            F: Fn(&AppSink) + Send + Sync + 'static,
1158        >(
1159            this: *mut ffi::GstAppSink,
1160            _param_spec: glib::ffi::gpointer,
1161            f: glib::ffi::gpointer,
1162        ) {
1163            let f: &F = &*(f as *const F);
1164            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1165        }
1166        unsafe {
1167            let f: Box<F> = Box::new(f);
1168            glib::signal::connect_raw(
1169                self.as_ptr() as *mut _,
1170                b"notify::ts-offset\0".as_ptr() as *const _,
1171                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1172                    notify_ts_offset_trampoline::<F> as *const (),
1173                )),
1174                Box::into_raw(f),
1175            )
1176        }
1177    }
1178
1179    pub fn stream(&self) -> AppSinkStream {
1180        AppSinkStream::new(self)
1181    }
1182}
1183
1184// rustdoc-stripper-ignore-next
1185/// A [builder-pattern] type to construct [`AppSink`] objects.
1186///
1187/// [builder-pattern]: https://doc.rust-lang.org/1.0.0/style/ownership/builders.html
1188#[must_use = "The builder must be built to be used"]
1189pub struct AppSinkBuilder {
1190    builder: glib::object::ObjectBuilder<'static, AppSink>,
1191    callbacks: Option<AppSinkCallbacks>,
1192    drop_out_of_segment: Option<bool>,
1193}
1194
1195impl AppSinkBuilder {
1196    fn new() -> Self {
1197        Self {
1198            builder: glib::Object::builder(),
1199            callbacks: None,
1200            drop_out_of_segment: None,
1201        }
1202    }
1203
1204    // rustdoc-stripper-ignore-next
1205    /// Build the [`AppSink`].
1206    #[must_use = "Building the object from the builder is usually expensive and is not expected to have side effects"]
1207    pub fn build(self) -> AppSink {
1208        let appsink = self.builder.build();
1209
1210        if let Some(callbacks) = self.callbacks {
1211            appsink.set_callbacks(callbacks);
1212        }
1213
1214        if let Some(drop_out_of_segment) = self.drop_out_of_segment {
1215            appsink.set_drop_out_of_segment(drop_out_of_segment);
1216        }
1217
1218        appsink
1219    }
1220
1221    pub fn async_(self, async_: bool) -> Self {
1222        Self {
1223            builder: self.builder.property("async", async_),
1224            ..self
1225        }
1226    }
1227
1228    pub fn buffer_list(self, buffer_list: bool) -> Self {
1229        Self {
1230            builder: self.builder.property("buffer-list", buffer_list),
1231            ..self
1232        }
1233    }
1234
1235    pub fn callbacks(self, callbacks: AppSinkCallbacks) -> Self {
1236        Self {
1237            callbacks: Some(callbacks),
1238            ..self
1239        }
1240    }
1241
1242    pub fn caps(self, caps: &gst::Caps) -> Self {
1243        Self {
1244            builder: self.builder.property("caps", caps),
1245            ..self
1246        }
1247    }
1248
1249    pub fn drop(self, drop: bool) -> Self {
1250        Self {
1251            builder: self.builder.property("drop", drop),
1252            ..self
1253        }
1254    }
1255
1256    pub fn drop_out_of_segment(self, drop_out_of_segment: bool) -> Self {
1257        Self {
1258            builder: self
1259                .builder
1260                .property("drop-out-of-segment", drop_out_of_segment),
1261            ..self
1262        }
1263    }
1264
1265    pub fn enable_last_sample(self, enable_last_sample: bool) -> Self {
1266        Self {
1267            builder: self
1268                .builder
1269                .property("enable-last-sample", enable_last_sample),
1270            ..self
1271        }
1272    }
1273
1274    pub fn max_bitrate(self, max_bitrate: u64) -> Self {
1275        Self {
1276            builder: self.builder.property("max-bitrate", max_bitrate),
1277            ..self
1278        }
1279    }
1280
1281    pub fn max_buffers(self, max_buffers: u32) -> Self {
1282        Self {
1283            builder: self.builder.property("max-buffers", max_buffers),
1284            ..self
1285        }
1286    }
1287
1288    pub fn max_lateness(self, max_lateness: i64) -> Self {
1289        Self {
1290            builder: self.builder.property("max-lateness", max_lateness),
1291            ..self
1292        }
1293    }
1294
1295    #[cfg(feature = "v1_16")]
1296    #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
1297    pub fn processing_deadline(self, processing_deadline: gst::ClockTime) -> Self {
1298        Self {
1299            builder: self
1300                .builder
1301                .property("processing-deadline", processing_deadline),
1302            ..self
1303        }
1304    }
1305
1306    pub fn qos(self, qos: bool) -> Self {
1307        Self {
1308            builder: self.builder.property("qos", qos),
1309            ..self
1310        }
1311    }
1312
1313    pub fn render_delay(self, render_delay: Option<gst::ClockTime>) -> Self {
1314        Self {
1315            builder: self.builder.property("render-delay", render_delay),
1316            ..self
1317        }
1318    }
1319
1320    pub fn sync(self, sync: bool) -> Self {
1321        Self {
1322            builder: self.builder.property("sync", sync),
1323            ..self
1324        }
1325    }
1326
1327    pub fn throttle_time(self, throttle_time: u64) -> Self {
1328        Self {
1329            builder: self.builder.property("throttle-time", throttle_time),
1330            ..self
1331        }
1332    }
1333
1334    pub fn ts_offset(self, ts_offset: gst::ClockTimeDiff) -> Self {
1335        Self {
1336            builder: self.builder.property("ts-offset", ts_offset),
1337            ..self
1338        }
1339    }
1340
1341    pub fn wait_on_eos(self, wait_on_eos: bool) -> Self {
1342        Self {
1343            builder: self.builder.property("wait-on-eos", wait_on_eos),
1344            ..self
1345        }
1346    }
1347
1348    #[cfg(feature = "v1_24")]
1349    #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
1350    pub fn max_time(self, max_time: Option<gst::ClockTime>) -> Self {
1351        Self {
1352            builder: self.builder.property("max-time", max_time),
1353            ..self
1354        }
1355    }
1356
1357    #[cfg(feature = "v1_24")]
1358    #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
1359    pub fn max_bytes(self, max_bytes: u64) -> Self {
1360        Self {
1361            builder: self.builder.property("max-bytes", max_bytes),
1362            ..self
1363        }
1364    }
1365
1366    pub fn name(self, name: impl Into<glib::GString>) -> Self {
1367        Self {
1368            builder: self.builder.property("name", name.into()),
1369            ..self
1370        }
1371    }
1372}
1373
1374#[derive(Debug)]
1375pub struct AppSinkStream {
1376    app_sink: glib::WeakRef<AppSink>,
1377    waker_reference: Arc<Mutex<Option<Waker>>>,
1378}
1379
1380impl AppSinkStream {
1381    fn new(app_sink: &AppSink) -> Self {
1382        skip_assert_initialized!();
1383
1384        let waker_reference = Arc::new(Mutex::new(None as Option<Waker>));
1385
1386        app_sink.set_callbacks(
1387            AppSinkCallbacks::builder()
1388                .new_sample({
1389                    let waker_reference = Arc::clone(&waker_reference);
1390
1391                    move |_| {
1392                        if let Some(waker) = waker_reference.lock().unwrap().take() {
1393                            waker.wake();
1394                        }
1395
1396                        Ok(gst::FlowSuccess::Ok)
1397                    }
1398                })
1399                .eos({
1400                    let waker_reference = Arc::clone(&waker_reference);
1401
1402                    move |_| {
1403                        if let Some(waker) = waker_reference.lock().unwrap().take() {
1404                            waker.wake();
1405                        }
1406                    }
1407                })
1408                .build(),
1409        );
1410
1411        Self {
1412            app_sink: app_sink.downgrade(),
1413            waker_reference,
1414        }
1415    }
1416}
1417
1418impl Drop for AppSinkStream {
1419    fn drop(&mut self) {
1420        #[cfg(not(feature = "v1_18"))]
1421        {
1422            // This is not thread-safe before 1.16.3, see
1423            // https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/merge_requests/570
1424            if gst::version() >= (1, 16, 3, 0) {
1425                if let Some(app_sink) = self.app_sink.upgrade() {
1426                    app_sink.set_callbacks(AppSinkCallbacks::builder().build());
1427                }
1428            }
1429        }
1430    }
1431}
1432
1433impl Stream for AppSinkStream {
1434    type Item = gst::Sample;
1435
1436    fn poll_next(self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>> {
1437        let mut waker = self.waker_reference.lock().unwrap();
1438
1439        let Some(app_sink) = self.app_sink.upgrade() else {
1440            return Poll::Ready(None);
1441        };
1442
1443        app_sink
1444            .try_pull_sample(gst::ClockTime::ZERO)
1445            .map(|sample| Poll::Ready(Some(sample)))
1446            .unwrap_or_else(|| {
1447                if app_sink.is_eos() {
1448                    return Poll::Ready(None);
1449                }
1450
1451                waker.replace(context.waker().to_owned());
1452
1453                Poll::Pending
1454            })
1455    }
1456}
1457
1458#[cfg(test)]
1459mod tests {
1460    use futures_util::StreamExt;
1461    use gst::prelude::*;
1462
1463    use super::*;
1464
1465    #[test]
1466    fn test_app_sink_stream() {
1467        gst::init().unwrap();
1468
1469        let videotestsrc = gst::ElementFactory::make("videotestsrc")
1470            .property("num-buffers", 5)
1471            .build()
1472            .unwrap();
1473        let appsink = gst::ElementFactory::make("appsink").build().unwrap();
1474
1475        let pipeline = gst::Pipeline::new();
1476        pipeline.add(&videotestsrc).unwrap();
1477        pipeline.add(&appsink).unwrap();
1478
1479        videotestsrc.link(&appsink).unwrap();
1480
1481        let app_sink_stream = appsink.dynamic_cast::<AppSink>().unwrap().stream();
1482        let samples_future = app_sink_stream.collect::<Vec<gst::Sample>>();
1483
1484        pipeline.set_state(gst::State::Playing).unwrap();
1485        let samples = futures_executor::block_on(samples_future);
1486        pipeline.set_state(gst::State::Null).unwrap();
1487
1488        assert_eq!(samples.len(), 5);
1489    }
1490}