1use std::{
4 mem, panic,
5 pin::Pin,
6 ptr,
7 sync::{Arc, Mutex},
8 task::{Context, Poll, Waker},
9};
10
11#[cfg(not(panic = "abort"))]
12use std::sync::atomic::{AtomicBool, Ordering};
13
14use futures_core::Stream;
15use glib::{ffi::gpointer, prelude::*, translate::*};
16
17use crate::{ffi, AppSink};
18
19#[allow(clippy::type_complexity)]
20pub struct AppSinkCallbacks {
21 eos: Option<Box<dyn FnMut(&AppSink) + Send + 'static>>,
22 new_preroll: Option<
23 Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
24 >,
25 new_sample: Option<
26 Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
27 >,
28 new_event: Option<Box<dyn FnMut(&AppSink) -> bool + Send + 'static>>,
29 propose_allocation:
30 Option<Box<dyn FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static>>,
31 #[cfg(not(panic = "abort"))]
32 panicked: AtomicBool,
33 callbacks: ffi::GstAppSinkCallbacks,
34}
35
36unsafe impl Send for AppSinkCallbacks {}
37unsafe impl Sync for AppSinkCallbacks {}
38
39impl AppSinkCallbacks {
40 pub fn builder() -> AppSinkCallbacksBuilder {
41 skip_assert_initialized!();
42 AppSinkCallbacksBuilder {
43 eos: None,
44 new_preroll: None,
45 new_sample: None,
46 new_event: None,
47 propose_allocation: None,
48 }
49 }
50}
51
52#[allow(clippy::type_complexity)]
53#[must_use = "The builder must be built to be used"]
54pub struct AppSinkCallbacksBuilder {
55 eos: Option<Box<dyn FnMut(&AppSink) + Send + 'static>>,
56 new_preroll: Option<
57 Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
58 >,
59 new_sample: Option<
60 Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
61 >,
62 new_event: Option<Box<dyn FnMut(&AppSink) -> bool + Send + 'static>>,
63 propose_allocation:
64 Option<Box<dyn FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static>>,
65}
66
67impl AppSinkCallbacksBuilder {
68 pub fn eos<F: FnMut(&AppSink) + Send + 'static>(self, eos: F) -> Self {
69 Self {
70 eos: Some(Box::new(eos)),
71 ..self
72 }
73 }
74
75 pub fn eos_if<F: FnMut(&AppSink) + Send + 'static>(self, eos: F, predicate: bool) -> Self {
76 if predicate {
77 self.eos(eos)
78 } else {
79 self
80 }
81 }
82
83 pub fn eos_if_some<F: FnMut(&AppSink) + Send + 'static>(self, eos: Option<F>) -> Self {
84 if let Some(eos) = eos {
85 self.eos(eos)
86 } else {
87 self
88 }
89 }
90
91 pub fn new_preroll<
92 F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
93 >(
94 self,
95 new_preroll: F,
96 ) -> Self {
97 Self {
98 new_preroll: Some(Box::new(new_preroll)),
99 ..self
100 }
101 }
102
103 pub fn new_preroll_if<
104 F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
105 >(
106 self,
107 new_preroll: F,
108 predicate: bool,
109 ) -> Self {
110 if predicate {
111 self.new_preroll(new_preroll)
112 } else {
113 self
114 }
115 }
116
117 pub fn new_preroll_if_some<
118 F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
119 >(
120 self,
121 new_preroll: Option<F>,
122 ) -> Self {
123 if let Some(new_preroll) = new_preroll {
124 self.new_preroll(new_preroll)
125 } else {
126 self
127 }
128 }
129
130 pub fn new_sample<
131 F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
132 >(
133 self,
134 new_sample: F,
135 ) -> Self {
136 Self {
137 new_sample: Some(Box::new(new_sample)),
138 ..self
139 }
140 }
141
142 pub fn new_sample_if<
143 F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
144 >(
145 self,
146 new_sample: F,
147 predicate: bool,
148 ) -> Self {
149 if predicate {
150 self.new_sample(new_sample)
151 } else {
152 self
153 }
154 }
155
156 pub fn new_sample_if_some<
157 F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
158 >(
159 self,
160 new_sample: Option<F>,
161 ) -> Self {
162 if let Some(new_sample) = new_sample {
163 self.new_sample(new_sample)
164 } else {
165 self
166 }
167 }
168
169 #[cfg(feature = "v1_20")]
170 #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
171 pub fn new_event<F: FnMut(&AppSink) -> bool + Send + 'static>(self, new_event: F) -> Self {
172 Self {
173 new_event: Some(Box::new(new_event)),
174 ..self
175 }
176 }
177
178 #[cfg(feature = "v1_20")]
179 #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
180 pub fn new_event_if<F: FnMut(&AppSink) -> bool + Send + 'static>(
181 self,
182 new_event: F,
183 predicate: bool,
184 ) -> Self {
185 if predicate {
186 self.new_event(new_event)
187 } else {
188 self
189 }
190 }
191
192 #[cfg(feature = "v1_20")]
193 #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
194 pub fn new_event_if_some<F: FnMut(&AppSink) -> bool + Send + 'static>(
195 self,
196 new_event: Option<F>,
197 ) -> Self {
198 if let Some(new_event) = new_event {
199 self.new_event(new_event)
200 } else {
201 self
202 }
203 }
204
205 #[cfg(feature = "v1_24")]
206 #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
207 pub fn propose_allocation<
208 F: FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static,
209 >(
210 self,
211 propose_allocation: F,
212 ) -> Self {
213 Self {
214 propose_allocation: Some(Box::new(propose_allocation)),
215 ..self
216 }
217 }
218
219 #[cfg(feature = "v1_24")]
220 #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
221 pub fn propose_allocation_if<
222 F: FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static,
223 >(
224 self,
225 propose_allocation: F,
226 predicate: bool,
227 ) -> Self {
228 if predicate {
229 self.propose_allocation(propose_allocation)
230 } else {
231 self
232 }
233 }
234
235 #[cfg(feature = "v1_24")]
236 #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
237 pub fn propose_allocation_if_some<
238 F: FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static,
239 >(
240 self,
241 propose_allocation: Option<F>,
242 ) -> Self {
243 if let Some(propose_allocation) = propose_allocation {
244 self.propose_allocation(propose_allocation)
245 } else {
246 self
247 }
248 }
249
250 #[must_use = "Building the callbacks without using them has no effect"]
251 pub fn build(self) -> AppSinkCallbacks {
252 let have_eos = self.eos.is_some();
253 let have_new_preroll = self.new_preroll.is_some();
254 let have_new_sample = self.new_sample.is_some();
255 let have_new_event = self.new_event.is_some();
256 let have_propose_allocation = self.propose_allocation.is_some();
257
258 AppSinkCallbacks {
259 eos: self.eos,
260 new_preroll: self.new_preroll,
261 new_sample: self.new_sample,
262 new_event: self.new_event,
263 propose_allocation: self.propose_allocation,
264 #[cfg(not(panic = "abort"))]
265 panicked: AtomicBool::new(false),
266 callbacks: ffi::GstAppSinkCallbacks {
267 eos: if have_eos { Some(trampoline_eos) } else { None },
268 new_preroll: if have_new_preroll {
269 Some(trampoline_new_preroll)
270 } else {
271 None
272 },
273 new_sample: if have_new_sample {
274 Some(trampoline_new_sample)
275 } else {
276 None
277 },
278 new_event: if have_new_event {
279 Some(trampoline_new_event)
280 } else {
281 None
282 },
283 propose_allocation: if have_propose_allocation {
284 Some(trampoline_propose_allocation)
285 } else {
286 None
287 },
288 _gst_reserved: [ptr::null_mut(), ptr::null_mut()],
289 },
290 }
291 }
292}
293
294unsafe extern "C" fn trampoline_eos(appsink: *mut ffi::GstAppSink, callbacks: gpointer) {
295 let callbacks = callbacks as *mut AppSinkCallbacks;
296 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
297
298 #[cfg(not(panic = "abort"))]
299 if (*callbacks).panicked.load(Ordering::Relaxed) {
300 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
301 gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
302 return;
303 }
304
305 if let Some(ref mut eos) = (*callbacks).eos {
306 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| eos(&element)));
307 match result {
308 Ok(result) => result,
309 Err(err) => {
310 #[cfg(panic = "abort")]
311 {
312 unreachable!("{err:?}");
313 }
314 #[cfg(not(panic = "abort"))]
315 {
316 (*callbacks).panicked.store(true, Ordering::Relaxed);
317 gst::subclass::post_panic_error_message(
318 element.upcast_ref(),
319 element.upcast_ref(),
320 Some(err),
321 );
322 }
323 }
324 }
325 }
326}
327
328unsafe extern "C" fn trampoline_new_preroll(
329 appsink: *mut ffi::GstAppSink,
330 callbacks: gpointer,
331) -> gst::ffi::GstFlowReturn {
332 let callbacks = callbacks as *mut AppSinkCallbacks;
333 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
334
335 #[cfg(not(panic = "abort"))]
336 if (*callbacks).panicked.load(Ordering::Relaxed) {
337 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
338 gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
339 return gst::FlowReturn::Error.into_glib();
340 }
341
342 let ret = if let Some(ref mut new_preroll) = (*callbacks).new_preroll {
343 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| new_preroll(&element).into()));
344 match result {
345 Ok(result) => result,
346 Err(err) => {
347 #[cfg(panic = "abort")]
348 {
349 unreachable!("{err:?}");
350 }
351 #[cfg(not(panic = "abort"))]
352 {
353 (*callbacks).panicked.store(true, Ordering::Relaxed);
354 gst::subclass::post_panic_error_message(
355 element.upcast_ref(),
356 element.upcast_ref(),
357 Some(err),
358 );
359
360 gst::FlowReturn::Error
361 }
362 }
363 }
364 } else {
365 gst::FlowReturn::Error
366 };
367
368 ret.into_glib()
369}
370
371unsafe extern "C" fn trampoline_new_sample(
372 appsink: *mut ffi::GstAppSink,
373 callbacks: gpointer,
374) -> gst::ffi::GstFlowReturn {
375 let callbacks = callbacks as *mut AppSinkCallbacks;
376 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
377
378 #[cfg(not(panic = "abort"))]
379 if (*callbacks).panicked.load(Ordering::Relaxed) {
380 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
381 gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
382 return gst::FlowReturn::Error.into_glib();
383 }
384
385 let ret = if let Some(ref mut new_sample) = (*callbacks).new_sample {
386 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| new_sample(&element).into()));
387 match result {
388 Ok(result) => result,
389 Err(err) => {
390 #[cfg(panic = "abort")]
391 {
392 unreachable!("{err:?}");
393 }
394 #[cfg(not(panic = "abort"))]
395 {
396 (*callbacks).panicked.store(true, Ordering::Relaxed);
397 gst::subclass::post_panic_error_message(
398 element.upcast_ref(),
399 element.upcast_ref(),
400 Some(err),
401 );
402
403 gst::FlowReturn::Error
404 }
405 }
406 }
407 } else {
408 gst::FlowReturn::Error
409 };
410
411 ret.into_glib()
412}
413
414unsafe extern "C" fn trampoline_new_event(
415 appsink: *mut ffi::GstAppSink,
416 callbacks: gpointer,
417) -> glib::ffi::gboolean {
418 let callbacks = callbacks as *mut AppSinkCallbacks;
419 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
420
421 #[cfg(not(panic = "abort"))]
422 if (*callbacks).panicked.load(Ordering::Relaxed) {
423 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
424 gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
425 return false.into_glib();
426 }
427
428 let ret = if let Some(ref mut new_event) = (*callbacks).new_event {
429 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| new_event(&element)));
430 match result {
431 Ok(result) => result,
432 Err(err) => {
433 #[cfg(panic = "abort")]
434 {
435 unreachable!("{err:?}");
436 }
437 #[cfg(not(panic = "abort"))]
438 {
439 (*callbacks).panicked.store(true, Ordering::Relaxed);
440 gst::subclass::post_panic_error_message(
441 element.upcast_ref(),
442 element.upcast_ref(),
443 Some(err),
444 );
445
446 false
447 }
448 }
449 }
450 } else {
451 false
452 };
453
454 ret.into_glib()
455}
456
457unsafe extern "C" fn trampoline_propose_allocation(
458 appsink: *mut ffi::GstAppSink,
459 query: *mut gst::ffi::GstQuery,
460 callbacks: gpointer,
461) -> glib::ffi::gboolean {
462 let callbacks = callbacks as *mut AppSinkCallbacks;
463 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
464
465 #[cfg(not(panic = "abort"))]
466 if (*callbacks).panicked.load(Ordering::Relaxed) {
467 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
468 gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
469 return false.into_glib();
470 }
471
472 let ret = if let Some(ref mut propose_allocation) = (*callbacks).propose_allocation {
473 let query = match gst::QueryRef::from_mut_ptr(query).view_mut() {
474 gst::QueryViewMut::Allocation(allocation) => allocation,
475 _ => unreachable!(),
476 };
477 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
478 propose_allocation(&element, query)
479 }));
480 match result {
481 Ok(result) => result,
482 Err(err) => {
483 #[cfg(panic = "abort")]
484 {
485 unreachable!("{err:?}");
486 }
487 #[cfg(not(panic = "abort"))]
488 {
489 (*callbacks).panicked.store(true, Ordering::Relaxed);
490 gst::subclass::post_panic_error_message(
491 element.upcast_ref(),
492 element.upcast_ref(),
493 Some(err),
494 );
495 false
496 }
497 }
498 }
499 } else {
500 false
501 };
502
503 ret.into_glib()
504}
505
506unsafe extern "C" fn destroy_callbacks(ptr: gpointer) {
507 let _ = Box::<AppSinkCallbacks>::from_raw(ptr as *mut _);
508}
509
510impl AppSink {
511 pub fn builder() -> AppSinkBuilder {
516 assert_initialized_main_thread!();
517 AppSinkBuilder::new()
518 }
519
520 #[doc(alias = "gst_app_sink_set_callbacks")]
534 pub fn set_callbacks(&self, callbacks: AppSinkCallbacks) {
535 unsafe {
536 let sink = self.to_glib_none().0;
537
538 #[cfg(not(feature = "v1_18"))]
539 {
540 static SET_ONCE_QUARK: std::sync::OnceLock<glib::Quark> =
541 std::sync::OnceLock::new();
542
543 let set_once_quark = SET_ONCE_QUARK
544 .get_or_init(|| glib::Quark::from_str("gstreamer-rs-app-sink-callbacks"));
545
546 if gst::version() < (1, 16, 3, 0) {
549 if !glib::gobject_ffi::g_object_get_qdata(
550 sink as *mut _,
551 set_once_quark.into_glib(),
552 )
553 .is_null()
554 {
555 panic!("AppSink callbacks can only be set once");
556 }
557
558 glib::gobject_ffi::g_object_set_qdata(
559 sink as *mut _,
560 set_once_quark.into_glib(),
561 1 as *mut _,
562 );
563 }
564 }
565
566 ffi::gst_app_sink_set_callbacks(
567 sink,
568 mut_override(&callbacks.callbacks),
569 Box::into_raw(Box::new(callbacks)) as *mut _,
570 Some(destroy_callbacks),
571 );
572 }
573 }
574
575 #[doc(alias = "drop-out-of-segment")]
576 pub fn drops_out_of_segment(&self) -> bool {
577 unsafe {
578 from_glib(gst_base::ffi::gst_base_sink_get_drop_out_of_segment(
579 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
580 ))
581 }
582 }
583
584 #[doc(alias = "max-bitrate")]
585 #[doc(alias = "gst_base_sink_get_max_bitrate")]
586 pub fn max_bitrate(&self) -> u64 {
587 unsafe {
588 gst_base::ffi::gst_base_sink_get_max_bitrate(
589 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
590 )
591 }
592 }
593
594 #[doc(alias = "max-lateness")]
595 #[doc(alias = "gst_base_sink_get_max_lateness")]
596 pub fn max_lateness(&self) -> i64 {
597 unsafe {
598 gst_base::ffi::gst_base_sink_get_max_lateness(
599 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
600 )
601 }
602 }
603
604 #[doc(alias = "processing-deadline")]
605 #[cfg(feature = "v1_16")]
606 #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
607 #[doc(alias = "gst_base_sink_get_processing_deadline")]
608 pub fn processing_deadline(&self) -> gst::ClockTime {
609 unsafe {
610 try_from_glib(gst_base::ffi::gst_base_sink_get_processing_deadline(
611 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
612 ))
613 .expect("undefined processing_deadline")
614 }
615 }
616
617 #[doc(alias = "render-delay")]
618 #[doc(alias = "gst_base_sink_get_render_delay")]
619 pub fn render_delay(&self) -> gst::ClockTime {
620 unsafe {
621 try_from_glib(gst_base::ffi::gst_base_sink_get_render_delay(
622 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
623 ))
624 .expect("undefined render_delay")
625 }
626 }
627
628 #[cfg(feature = "v1_18")]
629 #[cfg_attr(docsrs, doc(cfg(feature = "v1_18")))]
630 #[doc(alias = "gst_base_sink_get_stats")]
631 pub fn stats(&self) -> gst::Structure {
632 unsafe {
633 from_glib_full(gst_base::ffi::gst_base_sink_get_stats(
634 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
635 ))
636 }
637 }
638
639 #[doc(alias = "sync")]
640 pub fn is_sync(&self) -> bool {
641 unsafe {
642 from_glib(gst_base::ffi::gst_base_sink_get_sync(
643 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
644 ))
645 }
646 }
647
648 #[doc(alias = "throttle-time")]
649 #[doc(alias = "gst_base_sink_get_throttle_time")]
650 pub fn throttle_time(&self) -> u64 {
651 unsafe {
652 gst_base::ffi::gst_base_sink_get_throttle_time(
653 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
654 )
655 }
656 }
657
658 #[doc(alias = "ts-offset")]
659 #[doc(alias = "gst_base_sink_get_ts_offset")]
660 pub fn ts_offset(&self) -> gst::ClockTimeDiff {
661 unsafe {
662 gst_base::ffi::gst_base_sink_get_ts_offset(
663 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
664 )
665 }
666 }
667
668 #[doc(alias = "async")]
669 #[doc(alias = "gst_base_sink_is_async_enabled")]
670 pub fn is_async(&self) -> bool {
671 unsafe {
672 from_glib(gst_base::ffi::gst_base_sink_is_async_enabled(
673 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
674 ))
675 }
676 }
677
678 #[doc(alias = "last-sample")]
679 pub fn enables_last_sample(&self) -> bool {
680 unsafe {
681 from_glib(gst_base::ffi::gst_base_sink_is_last_sample_enabled(
682 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
683 ))
684 }
685 }
686
687 #[doc(alias = "qos")]
688 #[doc(alias = "gst_base_sink_is_qos_enabled")]
689 pub fn is_qos(&self) -> bool {
690 unsafe {
691 from_glib(gst_base::ffi::gst_base_sink_is_qos_enabled(
692 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
693 ))
694 }
695 }
696
697 #[doc(alias = "async")]
698 #[doc(alias = "gst_base_sink_set_async_enabled")]
699 pub fn set_async(&self, enabled: bool) {
700 unsafe {
701 gst_base::ffi::gst_base_sink_set_async_enabled(
702 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
703 enabled.into_glib(),
704 );
705 }
706 }
707
708 #[doc(alias = "drop-out-of-segment")]
709 #[doc(alias = "gst_base_sink_set_drop_out_of_segment")]
710 pub fn set_drop_out_of_segment(&self, drop_out_of_segment: bool) {
711 unsafe {
712 gst_base::ffi::gst_base_sink_set_drop_out_of_segment(
713 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
714 drop_out_of_segment.into_glib(),
715 );
716 }
717 }
718
719 #[doc(alias = "last-sample")]
720 pub fn set_enable_last_sample(&self, enabled: bool) {
721 unsafe {
722 gst_base::ffi::gst_base_sink_set_last_sample_enabled(
723 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
724 enabled.into_glib(),
725 );
726 }
727 }
728
729 #[doc(alias = "max-bitrate")]
730 #[doc(alias = "gst_base_sink_set_max_bitrate")]
731 pub fn set_max_bitrate(&self, max_bitrate: u64) {
732 unsafe {
733 gst_base::ffi::gst_base_sink_set_max_bitrate(
734 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
735 max_bitrate,
736 );
737 }
738 }
739
740 #[doc(alias = "max-lateness")]
741 #[doc(alias = "gst_base_sink_set_max_lateness")]
742 pub fn set_max_lateness(&self, max_lateness: i64) {
743 unsafe {
744 gst_base::ffi::gst_base_sink_set_max_lateness(
745 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
746 max_lateness,
747 );
748 }
749 }
750
751 #[doc(alias = "processing-deadline")]
752 #[cfg(feature = "v1_16")]
753 #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
754 #[doc(alias = "gst_base_sink_set_processing_deadline")]
755 pub fn set_processing_deadline(&self, processing_deadline: gst::ClockTime) {
756 unsafe {
757 gst_base::ffi::gst_base_sink_set_processing_deadline(
758 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
759 processing_deadline.into_glib(),
760 );
761 }
762 }
763
764 #[doc(alias = "qos")]
765 #[doc(alias = "gst_base_sink_set_qos_enabled")]
766 pub fn set_qos(&self, enabled: bool) {
767 unsafe {
768 gst_base::ffi::gst_base_sink_set_qos_enabled(
769 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
770 enabled.into_glib(),
771 );
772 }
773 }
774
775 #[doc(alias = "render-delay")]
776 #[doc(alias = "gst_base_sink_set_render_delay")]
777 pub fn set_render_delay(&self, delay: gst::ClockTime) {
778 unsafe {
779 gst_base::ffi::gst_base_sink_set_render_delay(
780 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
781 delay.into_glib(),
782 );
783 }
784 }
785
786 #[doc(alias = "sync")]
787 #[doc(alias = "gst_base_sink_set_sync")]
788 pub fn set_sync(&self, sync: bool) {
789 unsafe {
790 gst_base::ffi::gst_base_sink_set_sync(
791 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
792 sync.into_glib(),
793 );
794 }
795 }
796
797 #[doc(alias = "throttle-time")]
798 #[doc(alias = "gst_base_sink_set_throttle_time")]
799 pub fn set_throttle_time(&self, throttle: u64) {
800 unsafe {
801 gst_base::ffi::gst_base_sink_set_throttle_time(
802 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
803 throttle,
804 );
805 }
806 }
807
808 #[doc(alias = "ts-offset")]
809 #[doc(alias = "gst_base_sink_set_ts_offset")]
810 pub fn set_ts_offset(&self, offset: gst::ClockTimeDiff) {
811 unsafe {
812 gst_base::ffi::gst_base_sink_set_ts_offset(
813 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
814 offset,
815 );
816 }
817 }
818
819 #[doc(alias = "async")]
820 pub fn connect_async_notify<F: Fn(&Self) + Send + Sync + 'static>(
821 &self,
822 f: F,
823 ) -> glib::SignalHandlerId {
824 unsafe extern "C" fn notify_async_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
825 this: *mut ffi::GstAppSink,
826 _param_spec: glib::ffi::gpointer,
827 f: glib::ffi::gpointer,
828 ) {
829 let f: &F = &*(f as *const F);
830 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
831 }
832 unsafe {
833 let f: Box<F> = Box::new(f);
834 glib::signal::connect_raw(
835 self.as_ptr() as *mut _,
836 b"notify::async\0".as_ptr() as *const _,
837 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
838 notify_async_trampoline::<F> as *const (),
839 )),
840 Box::into_raw(f),
841 )
842 }
843 }
844
845 #[doc(alias = "blocksize")]
846 pub fn connect_blocksize_notify<F: Fn(&Self) + Send + Sync + 'static>(
847 &self,
848 f: F,
849 ) -> glib::SignalHandlerId {
850 unsafe extern "C" fn notify_blocksize_trampoline<
851 F: Fn(&AppSink) + Send + Sync + 'static,
852 >(
853 this: *mut ffi::GstAppSink,
854 _param_spec: glib::ffi::gpointer,
855 f: glib::ffi::gpointer,
856 ) {
857 let f: &F = &*(f as *const F);
858 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
859 }
860 unsafe {
861 let f: Box<F> = Box::new(f);
862 glib::signal::connect_raw(
863 self.as_ptr() as *mut _,
864 b"notify::blocksize\0".as_ptr() as *const _,
865 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
866 notify_blocksize_trampoline::<F> as *const (),
867 )),
868 Box::into_raw(f),
869 )
870 }
871 }
872
873 #[doc(alias = "enable-last-sample")]
874 pub fn connect_enable_last_sample_notify<F: Fn(&Self) + Send + Sync + 'static>(
875 &self,
876 f: F,
877 ) -> glib::SignalHandlerId {
878 unsafe extern "C" fn notify_enable_last_sample_trampoline<
879 F: Fn(&AppSink) + Send + Sync + 'static,
880 >(
881 this: *mut ffi::GstAppSink,
882 _param_spec: glib::ffi::gpointer,
883 f: glib::ffi::gpointer,
884 ) {
885 let f: &F = &*(f as *const F);
886 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
887 }
888 unsafe {
889 let f: Box<F> = Box::new(f);
890 glib::signal::connect_raw(
891 self.as_ptr() as *mut _,
892 b"notify::enable-last-sample\0".as_ptr() as *const _,
893 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
894 notify_enable_last_sample_trampoline::<F> as *const (),
895 )),
896 Box::into_raw(f),
897 )
898 }
899 }
900
901 #[doc(alias = "last-sample")]
902 pub fn connect_last_sample_notify<F: Fn(&Self) + Send + Sync + 'static>(
903 &self,
904 f: F,
905 ) -> glib::SignalHandlerId {
906 unsafe extern "C" fn notify_last_sample_trampoline<
907 F: Fn(&AppSink) + Send + Sync + 'static,
908 >(
909 this: *mut ffi::GstAppSink,
910 _param_spec: glib::ffi::gpointer,
911 f: glib::ffi::gpointer,
912 ) {
913 let f: &F = &*(f as *const F);
914 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
915 }
916 unsafe {
917 let f: Box<F> = Box::new(f);
918 glib::signal::connect_raw(
919 self.as_ptr() as *mut _,
920 b"notify::last-sample\0".as_ptr() as *const _,
921 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
922 notify_last_sample_trampoline::<F> as *const (),
923 )),
924 Box::into_raw(f),
925 )
926 }
927 }
928
929 #[doc(alias = "max-bitrate")]
930 pub fn connect_max_bitrate_notify<F: Fn(&Self) + Send + Sync + 'static>(
931 &self,
932 f: F,
933 ) -> glib::SignalHandlerId {
934 unsafe extern "C" fn notify_max_bitrate_trampoline<
935 F: Fn(&AppSink) + Send + Sync + 'static,
936 >(
937 this: *mut ffi::GstAppSink,
938 _param_spec: glib::ffi::gpointer,
939 f: glib::ffi::gpointer,
940 ) {
941 let f: &F = &*(f as *const F);
942 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
943 }
944 unsafe {
945 let f: Box<F> = Box::new(f);
946 glib::signal::connect_raw(
947 self.as_ptr() as *mut _,
948 b"notify::max-bitrate\0".as_ptr() as *const _,
949 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
950 notify_max_bitrate_trampoline::<F> as *const (),
951 )),
952 Box::into_raw(f),
953 )
954 }
955 }
956
957 #[doc(alias = "max-lateness")]
958 pub fn connect_max_lateness_notify<F: Fn(&Self) + Send + Sync + 'static>(
959 &self,
960 f: F,
961 ) -> glib::SignalHandlerId {
962 unsafe extern "C" fn notify_max_lateness_trampoline<
963 F: Fn(&AppSink) + Send + Sync + 'static,
964 >(
965 this: *mut ffi::GstAppSink,
966 _param_spec: glib::ffi::gpointer,
967 f: glib::ffi::gpointer,
968 ) {
969 let f: &F = &*(f as *const F);
970 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
971 }
972 unsafe {
973 let f: Box<F> = Box::new(f);
974 glib::signal::connect_raw(
975 self.as_ptr() as *mut _,
976 b"notify::max-lateness\0".as_ptr() as *const _,
977 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
978 notify_max_lateness_trampoline::<F> as *const (),
979 )),
980 Box::into_raw(f),
981 )
982 }
983 }
984
985 #[cfg(feature = "v1_16")]
986 #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
987 #[doc(alias = "processing-deadline")]
988 pub fn connect_processing_deadline_notify<F: Fn(&Self) + Send + Sync + 'static>(
989 &self,
990 f: F,
991 ) -> glib::SignalHandlerId {
992 unsafe extern "C" fn notify_processing_deadline_trampoline<
993 F: Fn(&AppSink) + Send + Sync + 'static,
994 >(
995 this: *mut ffi::GstAppSink,
996 _param_spec: glib::ffi::gpointer,
997 f: glib::ffi::gpointer,
998 ) {
999 let f: &F = &*(f as *const F);
1000 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1001 }
1002 unsafe {
1003 let f: Box<F> = Box::new(f);
1004 glib::signal::connect_raw(
1005 self.as_ptr() as *mut _,
1006 b"notify::processing-deadline\0".as_ptr() as *const _,
1007 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1008 notify_processing_deadline_trampoline::<F> as *const (),
1009 )),
1010 Box::into_raw(f),
1011 )
1012 }
1013 }
1014
1015 #[doc(alias = "qos")]
1016 pub fn connect_qos_notify<F: Fn(&Self) + Send + Sync + 'static>(
1017 &self,
1018 f: F,
1019 ) -> glib::SignalHandlerId {
1020 unsafe extern "C" fn notify_qos_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
1021 this: *mut ffi::GstAppSink,
1022 _param_spec: glib::ffi::gpointer,
1023 f: glib::ffi::gpointer,
1024 ) {
1025 let f: &F = &*(f as *const F);
1026 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1027 }
1028 unsafe {
1029 let f: Box<F> = Box::new(f);
1030 glib::signal::connect_raw(
1031 self.as_ptr() as *mut _,
1032 b"notify::qos\0".as_ptr() as *const _,
1033 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1034 notify_qos_trampoline::<F> as *const (),
1035 )),
1036 Box::into_raw(f),
1037 )
1038 }
1039 }
1040
1041 #[doc(alias = "render-delay")]
1042 pub fn connect_render_delay_notify<F: Fn(&Self) + Send + Sync + 'static>(
1043 &self,
1044 f: F,
1045 ) -> glib::SignalHandlerId {
1046 unsafe extern "C" fn notify_render_delay_trampoline<
1047 F: Fn(&AppSink) + Send + Sync + 'static,
1048 >(
1049 this: *mut ffi::GstAppSink,
1050 _param_spec: glib::ffi::gpointer,
1051 f: glib::ffi::gpointer,
1052 ) {
1053 let f: &F = &*(f as *const F);
1054 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1055 }
1056 unsafe {
1057 let f: Box<F> = Box::new(f);
1058 glib::signal::connect_raw(
1059 self.as_ptr() as *mut _,
1060 b"notify::render-delay\0".as_ptr() as *const _,
1061 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1062 notify_render_delay_trampoline::<F> as *const (),
1063 )),
1064 Box::into_raw(f),
1065 )
1066 }
1067 }
1068
1069 #[cfg(feature = "v1_18")]
1070 #[cfg_attr(docsrs, doc(cfg(feature = "v1_18")))]
1071 #[doc(alias = "stats")]
1072 pub fn connect_stats_notify<F: Fn(&Self) + Send + Sync + 'static>(
1073 &self,
1074 f: F,
1075 ) -> glib::SignalHandlerId {
1076 unsafe extern "C" fn notify_stats_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
1077 this: *mut ffi::GstAppSink,
1078 _param_spec: glib::ffi::gpointer,
1079 f: glib::ffi::gpointer,
1080 ) {
1081 let f: &F = &*(f as *const F);
1082 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1083 }
1084 unsafe {
1085 let f: Box<F> = Box::new(f);
1086 glib::signal::connect_raw(
1087 self.as_ptr() as *mut _,
1088 b"notify::stats\0".as_ptr() as *const _,
1089 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1090 notify_stats_trampoline::<F> as *const (),
1091 )),
1092 Box::into_raw(f),
1093 )
1094 }
1095 }
1096
1097 #[doc(alias = "sync")]
1098 pub fn connect_sync_notify<F: Fn(&Self) + Send + Sync + 'static>(
1099 &self,
1100 f: F,
1101 ) -> glib::SignalHandlerId {
1102 unsafe extern "C" fn notify_sync_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
1103 this: *mut ffi::GstAppSink,
1104 _param_spec: glib::ffi::gpointer,
1105 f: glib::ffi::gpointer,
1106 ) {
1107 let f: &F = &*(f as *const F);
1108 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1109 }
1110 unsafe {
1111 let f: Box<F> = Box::new(f);
1112 glib::signal::connect_raw(
1113 self.as_ptr() as *mut _,
1114 b"notify::sync\0".as_ptr() as *const _,
1115 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1116 notify_sync_trampoline::<F> as *const (),
1117 )),
1118 Box::into_raw(f),
1119 )
1120 }
1121 }
1122
1123 #[doc(alias = "throttle-time")]
1124 pub fn connect_throttle_time_notify<F: Fn(&Self) + Send + Sync + 'static>(
1125 &self,
1126 f: F,
1127 ) -> glib::SignalHandlerId {
1128 unsafe extern "C" fn notify_throttle_time_trampoline<
1129 F: Fn(&AppSink) + Send + Sync + 'static,
1130 >(
1131 this: *mut ffi::GstAppSink,
1132 _param_spec: glib::ffi::gpointer,
1133 f: glib::ffi::gpointer,
1134 ) {
1135 let f: &F = &*(f as *const F);
1136 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1137 }
1138 unsafe {
1139 let f: Box<F> = Box::new(f);
1140 glib::signal::connect_raw(
1141 self.as_ptr() as *mut _,
1142 b"notify::throttle-time\0".as_ptr() as *const _,
1143 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1144 notify_throttle_time_trampoline::<F> as *const (),
1145 )),
1146 Box::into_raw(f),
1147 )
1148 }
1149 }
1150
1151 #[doc(alias = "ts-offset")]
1152 pub fn connect_ts_offset_notify<F: Fn(&Self) + Send + Sync + 'static>(
1153 &self,
1154 f: F,
1155 ) -> glib::SignalHandlerId {
1156 unsafe extern "C" fn notify_ts_offset_trampoline<
1157 F: Fn(&AppSink) + Send + Sync + 'static,
1158 >(
1159 this: *mut ffi::GstAppSink,
1160 _param_spec: glib::ffi::gpointer,
1161 f: glib::ffi::gpointer,
1162 ) {
1163 let f: &F = &*(f as *const F);
1164 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1165 }
1166 unsafe {
1167 let f: Box<F> = Box::new(f);
1168 glib::signal::connect_raw(
1169 self.as_ptr() as *mut _,
1170 b"notify::ts-offset\0".as_ptr() as *const _,
1171 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1172 notify_ts_offset_trampoline::<F> as *const (),
1173 )),
1174 Box::into_raw(f),
1175 )
1176 }
1177 }
1178
1179 pub fn stream(&self) -> AppSinkStream {
1180 AppSinkStream::new(self)
1181 }
1182}
1183
1184#[must_use = "The builder must be built to be used"]
1189pub struct AppSinkBuilder {
1190 builder: glib::object::ObjectBuilder<'static, AppSink>,
1191 callbacks: Option<AppSinkCallbacks>,
1192 drop_out_of_segment: Option<bool>,
1193}
1194
1195impl AppSinkBuilder {
1196 fn new() -> Self {
1197 Self {
1198 builder: glib::Object::builder(),
1199 callbacks: None,
1200 drop_out_of_segment: None,
1201 }
1202 }
1203
1204 #[must_use = "Building the object from the builder is usually expensive and is not expected to have side effects"]
1207 pub fn build(self) -> AppSink {
1208 let appsink = self.builder.build();
1209
1210 if let Some(callbacks) = self.callbacks {
1211 appsink.set_callbacks(callbacks);
1212 }
1213
1214 if let Some(drop_out_of_segment) = self.drop_out_of_segment {
1215 appsink.set_drop_out_of_segment(drop_out_of_segment);
1216 }
1217
1218 appsink
1219 }
1220
1221 pub fn async_(self, async_: bool) -> Self {
1222 Self {
1223 builder: self.builder.property("async", async_),
1224 ..self
1225 }
1226 }
1227
1228 pub fn buffer_list(self, buffer_list: bool) -> Self {
1229 Self {
1230 builder: self.builder.property("buffer-list", buffer_list),
1231 ..self
1232 }
1233 }
1234
1235 pub fn callbacks(self, callbacks: AppSinkCallbacks) -> Self {
1236 Self {
1237 callbacks: Some(callbacks),
1238 ..self
1239 }
1240 }
1241
1242 pub fn caps(self, caps: &gst::Caps) -> Self {
1243 Self {
1244 builder: self.builder.property("caps", caps),
1245 ..self
1246 }
1247 }
1248
1249 pub fn drop(self, drop: bool) -> Self {
1250 Self {
1251 builder: self.builder.property("drop", drop),
1252 ..self
1253 }
1254 }
1255
1256 pub fn drop_out_of_segment(self, drop_out_of_segment: bool) -> Self {
1257 Self {
1258 builder: self
1259 .builder
1260 .property("drop-out-of-segment", drop_out_of_segment),
1261 ..self
1262 }
1263 }
1264
1265 pub fn enable_last_sample(self, enable_last_sample: bool) -> Self {
1266 Self {
1267 builder: self
1268 .builder
1269 .property("enable-last-sample", enable_last_sample),
1270 ..self
1271 }
1272 }
1273
1274 pub fn max_bitrate(self, max_bitrate: u64) -> Self {
1275 Self {
1276 builder: self.builder.property("max-bitrate", max_bitrate),
1277 ..self
1278 }
1279 }
1280
1281 pub fn max_buffers(self, max_buffers: u32) -> Self {
1282 Self {
1283 builder: self.builder.property("max-buffers", max_buffers),
1284 ..self
1285 }
1286 }
1287
1288 pub fn max_lateness(self, max_lateness: i64) -> Self {
1289 Self {
1290 builder: self.builder.property("max-lateness", max_lateness),
1291 ..self
1292 }
1293 }
1294
1295 #[cfg(feature = "v1_16")]
1296 #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
1297 pub fn processing_deadline(self, processing_deadline: gst::ClockTime) -> Self {
1298 Self {
1299 builder: self
1300 .builder
1301 .property("processing-deadline", processing_deadline),
1302 ..self
1303 }
1304 }
1305
1306 pub fn qos(self, qos: bool) -> Self {
1307 Self {
1308 builder: self.builder.property("qos", qos),
1309 ..self
1310 }
1311 }
1312
1313 pub fn render_delay(self, render_delay: Option<gst::ClockTime>) -> Self {
1314 Self {
1315 builder: self.builder.property("render-delay", render_delay),
1316 ..self
1317 }
1318 }
1319
1320 pub fn sync(self, sync: bool) -> Self {
1321 Self {
1322 builder: self.builder.property("sync", sync),
1323 ..self
1324 }
1325 }
1326
1327 pub fn throttle_time(self, throttle_time: u64) -> Self {
1328 Self {
1329 builder: self.builder.property("throttle-time", throttle_time),
1330 ..self
1331 }
1332 }
1333
1334 pub fn ts_offset(self, ts_offset: gst::ClockTimeDiff) -> Self {
1335 Self {
1336 builder: self.builder.property("ts-offset", ts_offset),
1337 ..self
1338 }
1339 }
1340
1341 pub fn wait_on_eos(self, wait_on_eos: bool) -> Self {
1342 Self {
1343 builder: self.builder.property("wait-on-eos", wait_on_eos),
1344 ..self
1345 }
1346 }
1347
1348 #[cfg(feature = "v1_24")]
1349 #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
1350 pub fn max_time(self, max_time: Option<gst::ClockTime>) -> Self {
1351 Self {
1352 builder: self.builder.property("max-time", max_time),
1353 ..self
1354 }
1355 }
1356
1357 #[cfg(feature = "v1_24")]
1358 #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
1359 pub fn max_bytes(self, max_bytes: u64) -> Self {
1360 Self {
1361 builder: self.builder.property("max-bytes", max_bytes),
1362 ..self
1363 }
1364 }
1365
1366 pub fn name(self, name: impl Into<glib::GString>) -> Self {
1367 Self {
1368 builder: self.builder.property("name", name.into()),
1369 ..self
1370 }
1371 }
1372}
1373
1374#[derive(Debug)]
1375pub struct AppSinkStream {
1376 app_sink: glib::WeakRef<AppSink>,
1377 waker_reference: Arc<Mutex<Option<Waker>>>,
1378}
1379
1380impl AppSinkStream {
1381 fn new(app_sink: &AppSink) -> Self {
1382 skip_assert_initialized!();
1383
1384 let waker_reference = Arc::new(Mutex::new(None as Option<Waker>));
1385
1386 app_sink.set_callbacks(
1387 AppSinkCallbacks::builder()
1388 .new_sample({
1389 let waker_reference = Arc::clone(&waker_reference);
1390
1391 move |_| {
1392 if let Some(waker) = waker_reference.lock().unwrap().take() {
1393 waker.wake();
1394 }
1395
1396 Ok(gst::FlowSuccess::Ok)
1397 }
1398 })
1399 .eos({
1400 let waker_reference = Arc::clone(&waker_reference);
1401
1402 move |_| {
1403 if let Some(waker) = waker_reference.lock().unwrap().take() {
1404 waker.wake();
1405 }
1406 }
1407 })
1408 .build(),
1409 );
1410
1411 Self {
1412 app_sink: app_sink.downgrade(),
1413 waker_reference,
1414 }
1415 }
1416}
1417
1418impl Drop for AppSinkStream {
1419 fn drop(&mut self) {
1420 #[cfg(not(feature = "v1_18"))]
1421 {
1422 if gst::version() >= (1, 16, 3, 0) {
1425 if let Some(app_sink) = self.app_sink.upgrade() {
1426 app_sink.set_callbacks(AppSinkCallbacks::builder().build());
1427 }
1428 }
1429 }
1430 }
1431}
1432
1433impl Stream for AppSinkStream {
1434 type Item = gst::Sample;
1435
1436 fn poll_next(self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>> {
1437 let mut waker = self.waker_reference.lock().unwrap();
1438
1439 let Some(app_sink) = self.app_sink.upgrade() else {
1440 return Poll::Ready(None);
1441 };
1442
1443 app_sink
1444 .try_pull_sample(gst::ClockTime::ZERO)
1445 .map(|sample| Poll::Ready(Some(sample)))
1446 .unwrap_or_else(|| {
1447 if app_sink.is_eos() {
1448 return Poll::Ready(None);
1449 }
1450
1451 waker.replace(context.waker().to_owned());
1452
1453 Poll::Pending
1454 })
1455 }
1456}
1457
#[cfg(test)]
mod tests {
    use futures_util::StreamExt;
    use gst::prelude::*;

    use super::*;

    // Runs a `videotestsrc ! appsink` pipeline limited to five buffers and
    // checks that the stream yields exactly five samples before ending.
    #[test]
    fn test_app_sink_stream() {
        gst::init().unwrap();

        let pipeline = gst::Pipeline::new();

        let source = gst::ElementFactory::make("videotestsrc")
            .property("num-buffers", 5)
            .build()
            .unwrap();
        let sink = gst::ElementFactory::make("appsink").build().unwrap();

        pipeline.add(&source).unwrap();
        pipeline.add(&sink).unwrap();

        source.link(&sink).unwrap();

        // The stream is created before the pipeline starts so its callbacks
        // are installed ahead of the first sample.
        let stream = sink.dynamic_cast::<AppSink>().unwrap().stream();
        let collect_all = stream.collect::<Vec<gst::Sample>>();

        pipeline.set_state(gst::State::Playing).unwrap();
        let samples = futures_executor::block_on(collect_all);
        pipeline.set_state(gst::State::Null).unwrap();

        assert_eq!(samples.len(), 5);
    }
}