1use std::{
4 mem, panic,
5 pin::Pin,
6 ptr,
7 sync::{Arc, Mutex},
8 task::{Context, Poll, Waker},
9};
10
11#[cfg(not(panic = "abort"))]
12use std::sync::atomic::{AtomicBool, Ordering};
13
14use futures_core::Stream;
15use glib::{ffi::gpointer, prelude::*, translate::*};
16
17use crate::{ffi, AppSink};
18
19#[allow(clippy::type_complexity)]
20pub struct AppSinkCallbacks {
21 eos: Option<Box<dyn FnMut(&AppSink) + Send + 'static>>,
22 new_preroll: Option<
23 Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
24 >,
25 new_sample: Option<
26 Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
27 >,
28 new_event: Option<Box<dyn FnMut(&AppSink) -> bool + Send + 'static>>,
29 propose_allocation:
30 Option<Box<dyn FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static>>,
31 #[cfg(not(panic = "abort"))]
32 panicked: AtomicBool,
33 callbacks: ffi::GstAppSinkCallbacks,
34}
35
36unsafe impl Send for AppSinkCallbacks {}
37unsafe impl Sync for AppSinkCallbacks {}
38
39impl AppSinkCallbacks {
40 pub fn builder() -> AppSinkCallbacksBuilder {
41 skip_assert_initialized!();
42 AppSinkCallbacksBuilder {
43 eos: None,
44 new_preroll: None,
45 new_sample: None,
46 new_event: None,
47 propose_allocation: None,
48 }
49 }
50}
51
52#[allow(clippy::type_complexity)]
53#[must_use = "The builder must be built to be used"]
54pub struct AppSinkCallbacksBuilder {
55 eos: Option<Box<dyn FnMut(&AppSink) + Send + 'static>>,
56 new_preroll: Option<
57 Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
58 >,
59 new_sample: Option<
60 Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
61 >,
62 new_event: Option<Box<dyn FnMut(&AppSink) -> bool + Send + 'static>>,
63 propose_allocation:
64 Option<Box<dyn FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static>>,
65}
66
67impl AppSinkCallbacksBuilder {
68 pub fn eos<F: FnMut(&AppSink) + Send + 'static>(self, eos: F) -> Self {
69 Self {
70 eos: Some(Box::new(eos)),
71 ..self
72 }
73 }
74
75 pub fn eos_if<F: FnMut(&AppSink) + Send + 'static>(self, eos: F, predicate: bool) -> Self {
76 if predicate {
77 self.eos(eos)
78 } else {
79 self
80 }
81 }
82
83 pub fn eos_if_some<F: FnMut(&AppSink) + Send + 'static>(self, eos: Option<F>) -> Self {
84 if let Some(eos) = eos {
85 self.eos(eos)
86 } else {
87 self
88 }
89 }
90
91 pub fn new_preroll<
92 F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
93 >(
94 self,
95 new_preroll: F,
96 ) -> Self {
97 Self {
98 new_preroll: Some(Box::new(new_preroll)),
99 ..self
100 }
101 }
102
103 pub fn new_preroll_if<
104 F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
105 >(
106 self,
107 new_preroll: F,
108 predicate: bool,
109 ) -> Self {
110 if predicate {
111 self.new_preroll(new_preroll)
112 } else {
113 self
114 }
115 }
116
117 pub fn new_preroll_if_some<
118 F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
119 >(
120 self,
121 new_preroll: Option<F>,
122 ) -> Self {
123 if let Some(new_preroll) = new_preroll {
124 self.new_preroll(new_preroll)
125 } else {
126 self
127 }
128 }
129
130 pub fn new_sample<
131 F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
132 >(
133 self,
134 new_sample: F,
135 ) -> Self {
136 Self {
137 new_sample: Some(Box::new(new_sample)),
138 ..self
139 }
140 }
141
142 pub fn new_sample_if<
143 F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
144 >(
145 self,
146 new_sample: F,
147 predicate: bool,
148 ) -> Self {
149 if predicate {
150 self.new_sample(new_sample)
151 } else {
152 self
153 }
154 }
155
156 pub fn new_sample_if_some<
157 F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
158 >(
159 self,
160 new_sample: Option<F>,
161 ) -> Self {
162 if let Some(new_sample) = new_sample {
163 self.new_sample(new_sample)
164 } else {
165 self
166 }
167 }
168
169 #[cfg(feature = "v1_20")]
170 #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
171 pub fn new_event<F: FnMut(&AppSink) -> bool + Send + 'static>(self, new_event: F) -> Self {
172 Self {
173 new_event: Some(Box::new(new_event)),
174 ..self
175 }
176 }
177
178 #[cfg(feature = "v1_20")]
179 #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
180 pub fn new_event_if<F: FnMut(&AppSink) -> bool + Send + 'static>(
181 self,
182 new_event: F,
183 predicate: bool,
184 ) -> Self {
185 if predicate {
186 self.new_event(new_event)
187 } else {
188 self
189 }
190 }
191
192 #[cfg(feature = "v1_20")]
193 #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
194 pub fn new_event_if_some<F: FnMut(&AppSink) -> bool + Send + 'static>(
195 self,
196 new_event: Option<F>,
197 ) -> Self {
198 if let Some(new_event) = new_event {
199 self.new_event(new_event)
200 } else {
201 self
202 }
203 }
204
205 #[cfg(feature = "v1_24")]
206 #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
207 pub fn propose_allocation<
208 F: FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static,
209 >(
210 self,
211 propose_allocation: F,
212 ) -> Self {
213 Self {
214 propose_allocation: Some(Box::new(propose_allocation)),
215 ..self
216 }
217 }
218
219 #[cfg(feature = "v1_24")]
220 #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
221 pub fn propose_allocation_if<
222 F: FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static,
223 >(
224 self,
225 propose_allocation: F,
226 predicate: bool,
227 ) -> Self {
228 if predicate {
229 self.propose_allocation(propose_allocation)
230 } else {
231 self
232 }
233 }
234
235 #[cfg(feature = "v1_24")]
236 #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
237 pub fn propose_allocation_if_some<
238 F: FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static,
239 >(
240 self,
241 propose_allocation: Option<F>,
242 ) -> Self {
243 if let Some(propose_allocation) = propose_allocation {
244 self.propose_allocation(propose_allocation)
245 } else {
246 self
247 }
248 }
249
250 #[must_use = "Building the callbacks without using them has no effect"]
251 pub fn build(self) -> AppSinkCallbacks {
252 let have_eos = self.eos.is_some();
253 let have_new_preroll = self.new_preroll.is_some();
254 let have_new_sample = self.new_sample.is_some();
255 let have_new_event = self.new_event.is_some();
256 let have_propose_allocation = self.propose_allocation.is_some();
257
258 AppSinkCallbacks {
259 eos: self.eos,
260 new_preroll: self.new_preroll,
261 new_sample: self.new_sample,
262 new_event: self.new_event,
263 propose_allocation: self.propose_allocation,
264 #[cfg(not(panic = "abort"))]
265 panicked: AtomicBool::new(false),
266 callbacks: ffi::GstAppSinkCallbacks {
267 eos: if have_eos { Some(trampoline_eos) } else { None },
268 new_preroll: if have_new_preroll {
269 Some(trampoline_new_preroll)
270 } else {
271 None
272 },
273 new_sample: if have_new_sample {
274 Some(trampoline_new_sample)
275 } else {
276 None
277 },
278 new_event: if have_new_event {
279 Some(trampoline_new_event)
280 } else {
281 None
282 },
283 propose_allocation: if have_propose_allocation {
284 Some(trampoline_propose_allocation)
285 } else {
286 None
287 },
288 _gst_reserved: [ptr::null_mut(), ptr::null_mut()],
289 },
290 }
291 }
292}
293
294unsafe extern "C" fn trampoline_eos(appsink: *mut ffi::GstAppSink, callbacks: gpointer) {
295 let callbacks = callbacks as *mut AppSinkCallbacks;
296 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
297
298 #[cfg(not(panic = "abort"))]
299 if (*callbacks).panicked.load(Ordering::Relaxed) {
300 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
301 gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
302 return;
303 }
304
305 if let Some(ref mut eos) = (*callbacks).eos {
306 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| eos(&element)));
307 match result {
308 Ok(result) => result,
309 Err(err) => {
310 #[cfg(panic = "abort")]
311 {
312 unreachable!("{err:?}");
313 }
314 #[cfg(not(panic = "abort"))]
315 {
316 (*callbacks).panicked.store(true, Ordering::Relaxed);
317 gst::subclass::post_panic_error_message(
318 element.upcast_ref(),
319 element.upcast_ref(),
320 Some(err),
321 );
322 }
323 }
324 }
325 }
326}
327
328unsafe extern "C" fn trampoline_new_preroll(
329 appsink: *mut ffi::GstAppSink,
330 callbacks: gpointer,
331) -> gst::ffi::GstFlowReturn {
332 let callbacks = callbacks as *mut AppSinkCallbacks;
333 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
334
335 #[cfg(not(panic = "abort"))]
336 if (*callbacks).panicked.load(Ordering::Relaxed) {
337 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
338 gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
339 return gst::FlowReturn::Error.into_glib();
340 }
341
342 let ret = if let Some(ref mut new_preroll) = (*callbacks).new_preroll {
343 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| new_preroll(&element).into()));
344 match result {
345 Ok(result) => result,
346 Err(err) => {
347 #[cfg(panic = "abort")]
348 {
349 unreachable!("{err:?}");
350 }
351 #[cfg(not(panic = "abort"))]
352 {
353 (*callbacks).panicked.store(true, Ordering::Relaxed);
354 gst::subclass::post_panic_error_message(
355 element.upcast_ref(),
356 element.upcast_ref(),
357 Some(err),
358 );
359
360 gst::FlowReturn::Error
361 }
362 }
363 }
364 } else {
365 gst::FlowReturn::Error
366 };
367
368 ret.into_glib()
369}
370
371unsafe extern "C" fn trampoline_new_sample(
372 appsink: *mut ffi::GstAppSink,
373 callbacks: gpointer,
374) -> gst::ffi::GstFlowReturn {
375 let callbacks = callbacks as *mut AppSinkCallbacks;
376 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
377
378 #[cfg(not(panic = "abort"))]
379 if (*callbacks).panicked.load(Ordering::Relaxed) {
380 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
381 gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
382 return gst::FlowReturn::Error.into_glib();
383 }
384
385 let ret = if let Some(ref mut new_sample) = (*callbacks).new_sample {
386 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| new_sample(&element).into()));
387 match result {
388 Ok(result) => result,
389 Err(err) => {
390 #[cfg(panic = "abort")]
391 {
392 unreachable!("{err:?}");
393 }
394 #[cfg(not(panic = "abort"))]
395 {
396 (*callbacks).panicked.store(true, Ordering::Relaxed);
397 gst::subclass::post_panic_error_message(
398 element.upcast_ref(),
399 element.upcast_ref(),
400 Some(err),
401 );
402
403 gst::FlowReturn::Error
404 }
405 }
406 }
407 } else {
408 gst::FlowReturn::Error
409 };
410
411 ret.into_glib()
412}
413
414unsafe extern "C" fn trampoline_new_event(
415 appsink: *mut ffi::GstAppSink,
416 callbacks: gpointer,
417) -> glib::ffi::gboolean {
418 let callbacks = callbacks as *mut AppSinkCallbacks;
419 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
420
421 #[cfg(not(panic = "abort"))]
422 if (*callbacks).panicked.load(Ordering::Relaxed) {
423 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
424 gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
425 return false.into_glib();
426 }
427
428 let ret = if let Some(ref mut new_event) = (*callbacks).new_event {
429 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| new_event(&element)));
430 match result {
431 Ok(result) => result,
432 Err(err) => {
433 #[cfg(panic = "abort")]
434 {
435 unreachable!("{err:?}");
436 }
437 #[cfg(not(panic = "abort"))]
438 {
439 (*callbacks).panicked.store(true, Ordering::Relaxed);
440 gst::subclass::post_panic_error_message(
441 element.upcast_ref(),
442 element.upcast_ref(),
443 Some(err),
444 );
445
446 false
447 }
448 }
449 }
450 } else {
451 false
452 };
453
454 ret.into_glib()
455}
456
457unsafe extern "C" fn trampoline_propose_allocation(
458 appsink: *mut ffi::GstAppSink,
459 query: *mut gst::ffi::GstQuery,
460 callbacks: gpointer,
461) -> glib::ffi::gboolean {
462 let callbacks = callbacks as *mut AppSinkCallbacks;
463 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
464
465 #[cfg(not(panic = "abort"))]
466 if (*callbacks).panicked.load(Ordering::Relaxed) {
467 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
468 gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
469 return false.into_glib();
470 }
471
472 let ret = if let Some(ref mut propose_allocation) = (*callbacks).propose_allocation {
473 let query = match gst::QueryRef::from_mut_ptr(query).view_mut() {
474 gst::QueryViewMut::Allocation(allocation) => allocation,
475 _ => unreachable!(),
476 };
477 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
478 propose_allocation(&element, query)
479 }));
480 match result {
481 Ok(result) => result,
482 Err(err) => {
483 #[cfg(panic = "abort")]
484 {
485 unreachable!("{err:?}");
486 }
487 #[cfg(not(panic = "abort"))]
488 {
489 (*callbacks).panicked.store(true, Ordering::Relaxed);
490 gst::subclass::post_panic_error_message(
491 element.upcast_ref(),
492 element.upcast_ref(),
493 Some(err),
494 );
495 false
496 }
497 }
498 }
499 } else {
500 false
501 };
502
503 ret.into_glib()
504}
505
506unsafe extern "C" fn destroy_callbacks(ptr: gpointer) {
507 let _ = Box::<AppSinkCallbacks>::from_raw(ptr as *mut _);
508}
509
510impl AppSink {
511 pub fn builder<'a>() -> AppSinkBuilder<'a> {
516 assert_initialized_main_thread!();
517 AppSinkBuilder {
518 builder: gst::Object::builder(),
519 callbacks: None,
520 drop_out_of_segment: None,
521 }
522 }
523
524 #[doc(alias = "gst_app_sink_set_callbacks")]
538 pub fn set_callbacks(&self, callbacks: AppSinkCallbacks) {
539 unsafe {
540 let sink = self.to_glib_none().0;
541
542 #[cfg(not(feature = "v1_18"))]
543 {
544 static SET_ONCE_QUARK: std::sync::OnceLock<glib::Quark> =
545 std::sync::OnceLock::new();
546
547 let set_once_quark = SET_ONCE_QUARK
548 .get_or_init(|| glib::Quark::from_str("gstreamer-rs-app-sink-callbacks"));
549
550 if gst::version() < (1, 16, 3, 0) {
553 if !glib::gobject_ffi::g_object_get_qdata(
554 sink as *mut _,
555 set_once_quark.into_glib(),
556 )
557 .is_null()
558 {
559 panic!("AppSink callbacks can only be set once");
560 }
561
562 glib::gobject_ffi::g_object_set_qdata(
563 sink as *mut _,
564 set_once_quark.into_glib(),
565 1 as *mut _,
566 );
567 }
568 }
569
570 ffi::gst_app_sink_set_callbacks(
571 sink,
572 mut_override(&callbacks.callbacks),
573 Box::into_raw(Box::new(callbacks)) as *mut _,
574 Some(destroy_callbacks),
575 );
576 }
577 }
578
579 #[doc(alias = "drop-out-of-segment")]
580 pub fn drops_out_of_segment(&self) -> bool {
581 unsafe {
582 from_glib(gst_base::ffi::gst_base_sink_get_drop_out_of_segment(
583 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
584 ))
585 }
586 }
587
588 #[doc(alias = "max-bitrate")]
589 #[doc(alias = "gst_base_sink_get_max_bitrate")]
590 pub fn max_bitrate(&self) -> u64 {
591 unsafe {
592 gst_base::ffi::gst_base_sink_get_max_bitrate(
593 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
594 )
595 }
596 }
597
598 #[doc(alias = "max-lateness")]
599 #[doc(alias = "gst_base_sink_get_max_lateness")]
600 pub fn max_lateness(&self) -> i64 {
601 unsafe {
602 gst_base::ffi::gst_base_sink_get_max_lateness(
603 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
604 )
605 }
606 }
607
608 #[doc(alias = "processing-deadline")]
609 #[cfg(feature = "v1_16")]
610 #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
611 #[doc(alias = "gst_base_sink_get_processing_deadline")]
612 pub fn processing_deadline(&self) -> gst::ClockTime {
613 unsafe {
614 try_from_glib(gst_base::ffi::gst_base_sink_get_processing_deadline(
615 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
616 ))
617 .expect("undefined processing_deadline")
618 }
619 }
620
621 #[doc(alias = "render-delay")]
622 #[doc(alias = "gst_base_sink_get_render_delay")]
623 pub fn render_delay(&self) -> gst::ClockTime {
624 unsafe {
625 try_from_glib(gst_base::ffi::gst_base_sink_get_render_delay(
626 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
627 ))
628 .expect("undefined render_delay")
629 }
630 }
631
632 #[cfg(feature = "v1_18")]
633 #[cfg_attr(docsrs, doc(cfg(feature = "v1_18")))]
634 #[doc(alias = "gst_base_sink_get_stats")]
635 pub fn stats(&self) -> gst::Structure {
636 unsafe {
637 from_glib_full(gst_base::ffi::gst_base_sink_get_stats(
638 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
639 ))
640 }
641 }
642
643 #[doc(alias = "sync")]
644 pub fn is_sync(&self) -> bool {
645 unsafe {
646 from_glib(gst_base::ffi::gst_base_sink_get_sync(
647 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
648 ))
649 }
650 }
651
652 #[doc(alias = "throttle-time")]
653 #[doc(alias = "gst_base_sink_get_throttle_time")]
654 pub fn throttle_time(&self) -> u64 {
655 unsafe {
656 gst_base::ffi::gst_base_sink_get_throttle_time(
657 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
658 )
659 }
660 }
661
662 #[doc(alias = "ts-offset")]
663 #[doc(alias = "gst_base_sink_get_ts_offset")]
664 pub fn ts_offset(&self) -> gst::ClockTimeDiff {
665 unsafe {
666 gst_base::ffi::gst_base_sink_get_ts_offset(
667 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
668 )
669 }
670 }
671
672 #[doc(alias = "async")]
673 #[doc(alias = "gst_base_sink_is_async_enabled")]
674 pub fn is_async(&self) -> bool {
675 unsafe {
676 from_glib(gst_base::ffi::gst_base_sink_is_async_enabled(
677 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
678 ))
679 }
680 }
681
682 #[doc(alias = "last-sample")]
683 pub fn enables_last_sample(&self) -> bool {
684 unsafe {
685 from_glib(gst_base::ffi::gst_base_sink_is_last_sample_enabled(
686 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
687 ))
688 }
689 }
690
691 #[doc(alias = "qos")]
692 #[doc(alias = "gst_base_sink_is_qos_enabled")]
693 pub fn is_qos(&self) -> bool {
694 unsafe {
695 from_glib(gst_base::ffi::gst_base_sink_is_qos_enabled(
696 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
697 ))
698 }
699 }
700
701 #[doc(alias = "async")]
702 #[doc(alias = "gst_base_sink_set_async_enabled")]
703 pub fn set_async(&self, enabled: bool) {
704 unsafe {
705 gst_base::ffi::gst_base_sink_set_async_enabled(
706 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
707 enabled.into_glib(),
708 );
709 }
710 }
711
712 #[doc(alias = "drop-out-of-segment")]
713 #[doc(alias = "gst_base_sink_set_drop_out_of_segment")]
714 pub fn set_drop_out_of_segment(&self, drop_out_of_segment: bool) {
715 unsafe {
716 gst_base::ffi::gst_base_sink_set_drop_out_of_segment(
717 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
718 drop_out_of_segment.into_glib(),
719 );
720 }
721 }
722
723 #[doc(alias = "last-sample")]
724 pub fn set_enable_last_sample(&self, enabled: bool) {
725 unsafe {
726 gst_base::ffi::gst_base_sink_set_last_sample_enabled(
727 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
728 enabled.into_glib(),
729 );
730 }
731 }
732
733 #[doc(alias = "max-bitrate")]
734 #[doc(alias = "gst_base_sink_set_max_bitrate")]
735 pub fn set_max_bitrate(&self, max_bitrate: u64) {
736 unsafe {
737 gst_base::ffi::gst_base_sink_set_max_bitrate(
738 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
739 max_bitrate,
740 );
741 }
742 }
743
744 #[doc(alias = "max-lateness")]
745 #[doc(alias = "gst_base_sink_set_max_lateness")]
746 pub fn set_max_lateness(&self, max_lateness: i64) {
747 unsafe {
748 gst_base::ffi::gst_base_sink_set_max_lateness(
749 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
750 max_lateness,
751 );
752 }
753 }
754
755 #[doc(alias = "processing-deadline")]
756 #[cfg(feature = "v1_16")]
757 #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
758 #[doc(alias = "gst_base_sink_set_processing_deadline")]
759 pub fn set_processing_deadline(&self, processing_deadline: gst::ClockTime) {
760 unsafe {
761 gst_base::ffi::gst_base_sink_set_processing_deadline(
762 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
763 processing_deadline.into_glib(),
764 );
765 }
766 }
767
768 #[doc(alias = "qos")]
769 #[doc(alias = "gst_base_sink_set_qos_enabled")]
770 pub fn set_qos(&self, enabled: bool) {
771 unsafe {
772 gst_base::ffi::gst_base_sink_set_qos_enabled(
773 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
774 enabled.into_glib(),
775 );
776 }
777 }
778
779 #[doc(alias = "render-delay")]
780 #[doc(alias = "gst_base_sink_set_render_delay")]
781 pub fn set_render_delay(&self, delay: gst::ClockTime) {
782 unsafe {
783 gst_base::ffi::gst_base_sink_set_render_delay(
784 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
785 delay.into_glib(),
786 );
787 }
788 }
789
790 #[doc(alias = "sync")]
791 #[doc(alias = "gst_base_sink_set_sync")]
792 pub fn set_sync(&self, sync: bool) {
793 unsafe {
794 gst_base::ffi::gst_base_sink_set_sync(
795 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
796 sync.into_glib(),
797 );
798 }
799 }
800
801 #[doc(alias = "throttle-time")]
802 #[doc(alias = "gst_base_sink_set_throttle_time")]
803 pub fn set_throttle_time(&self, throttle: u64) {
804 unsafe {
805 gst_base::ffi::gst_base_sink_set_throttle_time(
806 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
807 throttle,
808 );
809 }
810 }
811
812 #[doc(alias = "ts-offset")]
813 #[doc(alias = "gst_base_sink_set_ts_offset")]
814 pub fn set_ts_offset(&self, offset: gst::ClockTimeDiff) {
815 unsafe {
816 gst_base::ffi::gst_base_sink_set_ts_offset(
817 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
818 offset,
819 );
820 }
821 }
822
823 #[doc(alias = "async")]
824 pub fn connect_async_notify<F: Fn(&Self) + Send + Sync + 'static>(
825 &self,
826 f: F,
827 ) -> glib::SignalHandlerId {
828 unsafe extern "C" fn notify_async_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
829 this: *mut ffi::GstAppSink,
830 _param_spec: glib::ffi::gpointer,
831 f: glib::ffi::gpointer,
832 ) {
833 let f: &F = &*(f as *const F);
834 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
835 }
836 unsafe {
837 let f: Box<F> = Box::new(f);
838 glib::signal::connect_raw(
839 self.as_ptr() as *mut _,
840 b"notify::async\0".as_ptr() as *const _,
841 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
842 notify_async_trampoline::<F> as *const (),
843 )),
844 Box::into_raw(f),
845 )
846 }
847 }
848
849 #[doc(alias = "blocksize")]
850 pub fn connect_blocksize_notify<F: Fn(&Self) + Send + Sync + 'static>(
851 &self,
852 f: F,
853 ) -> glib::SignalHandlerId {
854 unsafe extern "C" fn notify_blocksize_trampoline<
855 F: Fn(&AppSink) + Send + Sync + 'static,
856 >(
857 this: *mut ffi::GstAppSink,
858 _param_spec: glib::ffi::gpointer,
859 f: glib::ffi::gpointer,
860 ) {
861 let f: &F = &*(f as *const F);
862 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
863 }
864 unsafe {
865 let f: Box<F> = Box::new(f);
866 glib::signal::connect_raw(
867 self.as_ptr() as *mut _,
868 b"notify::blocksize\0".as_ptr() as *const _,
869 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
870 notify_blocksize_trampoline::<F> as *const (),
871 )),
872 Box::into_raw(f),
873 )
874 }
875 }
876
877 #[doc(alias = "enable-last-sample")]
878 pub fn connect_enable_last_sample_notify<F: Fn(&Self) + Send + Sync + 'static>(
879 &self,
880 f: F,
881 ) -> glib::SignalHandlerId {
882 unsafe extern "C" fn notify_enable_last_sample_trampoline<
883 F: Fn(&AppSink) + Send + Sync + 'static,
884 >(
885 this: *mut ffi::GstAppSink,
886 _param_spec: glib::ffi::gpointer,
887 f: glib::ffi::gpointer,
888 ) {
889 let f: &F = &*(f as *const F);
890 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
891 }
892 unsafe {
893 let f: Box<F> = Box::new(f);
894 glib::signal::connect_raw(
895 self.as_ptr() as *mut _,
896 b"notify::enable-last-sample\0".as_ptr() as *const _,
897 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
898 notify_enable_last_sample_trampoline::<F> as *const (),
899 )),
900 Box::into_raw(f),
901 )
902 }
903 }
904
905 #[doc(alias = "last-sample")]
906 pub fn connect_last_sample_notify<F: Fn(&Self) + Send + Sync + 'static>(
907 &self,
908 f: F,
909 ) -> glib::SignalHandlerId {
910 unsafe extern "C" fn notify_last_sample_trampoline<
911 F: Fn(&AppSink) + Send + Sync + 'static,
912 >(
913 this: *mut ffi::GstAppSink,
914 _param_spec: glib::ffi::gpointer,
915 f: glib::ffi::gpointer,
916 ) {
917 let f: &F = &*(f as *const F);
918 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
919 }
920 unsafe {
921 let f: Box<F> = Box::new(f);
922 glib::signal::connect_raw(
923 self.as_ptr() as *mut _,
924 b"notify::last-sample\0".as_ptr() as *const _,
925 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
926 notify_last_sample_trampoline::<F> as *const (),
927 )),
928 Box::into_raw(f),
929 )
930 }
931 }
932
933 #[doc(alias = "max-bitrate")]
934 pub fn connect_max_bitrate_notify<F: Fn(&Self) + Send + Sync + 'static>(
935 &self,
936 f: F,
937 ) -> glib::SignalHandlerId {
938 unsafe extern "C" fn notify_max_bitrate_trampoline<
939 F: Fn(&AppSink) + Send + Sync + 'static,
940 >(
941 this: *mut ffi::GstAppSink,
942 _param_spec: glib::ffi::gpointer,
943 f: glib::ffi::gpointer,
944 ) {
945 let f: &F = &*(f as *const F);
946 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
947 }
948 unsafe {
949 let f: Box<F> = Box::new(f);
950 glib::signal::connect_raw(
951 self.as_ptr() as *mut _,
952 b"notify::max-bitrate\0".as_ptr() as *const _,
953 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
954 notify_max_bitrate_trampoline::<F> as *const (),
955 )),
956 Box::into_raw(f),
957 )
958 }
959 }
960
961 #[doc(alias = "max-lateness")]
962 pub fn connect_max_lateness_notify<F: Fn(&Self) + Send + Sync + 'static>(
963 &self,
964 f: F,
965 ) -> glib::SignalHandlerId {
966 unsafe extern "C" fn notify_max_lateness_trampoline<
967 F: Fn(&AppSink) + Send + Sync + 'static,
968 >(
969 this: *mut ffi::GstAppSink,
970 _param_spec: glib::ffi::gpointer,
971 f: glib::ffi::gpointer,
972 ) {
973 let f: &F = &*(f as *const F);
974 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
975 }
976 unsafe {
977 let f: Box<F> = Box::new(f);
978 glib::signal::connect_raw(
979 self.as_ptr() as *mut _,
980 b"notify::max-lateness\0".as_ptr() as *const _,
981 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
982 notify_max_lateness_trampoline::<F> as *const (),
983 )),
984 Box::into_raw(f),
985 )
986 }
987 }
988
989 #[cfg(feature = "v1_16")]
990 #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
991 #[doc(alias = "processing-deadline")]
992 pub fn connect_processing_deadline_notify<F: Fn(&Self) + Send + Sync + 'static>(
993 &self,
994 f: F,
995 ) -> glib::SignalHandlerId {
996 unsafe extern "C" fn notify_processing_deadline_trampoline<
997 F: Fn(&AppSink) + Send + Sync + 'static,
998 >(
999 this: *mut ffi::GstAppSink,
1000 _param_spec: glib::ffi::gpointer,
1001 f: glib::ffi::gpointer,
1002 ) {
1003 let f: &F = &*(f as *const F);
1004 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1005 }
1006 unsafe {
1007 let f: Box<F> = Box::new(f);
1008 glib::signal::connect_raw(
1009 self.as_ptr() as *mut _,
1010 b"notify::processing-deadline\0".as_ptr() as *const _,
1011 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1012 notify_processing_deadline_trampoline::<F> as *const (),
1013 )),
1014 Box::into_raw(f),
1015 )
1016 }
1017 }
1018
1019 #[doc(alias = "qos")]
1020 pub fn connect_qos_notify<F: Fn(&Self) + Send + Sync + 'static>(
1021 &self,
1022 f: F,
1023 ) -> glib::SignalHandlerId {
1024 unsafe extern "C" fn notify_qos_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
1025 this: *mut ffi::GstAppSink,
1026 _param_spec: glib::ffi::gpointer,
1027 f: glib::ffi::gpointer,
1028 ) {
1029 let f: &F = &*(f as *const F);
1030 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1031 }
1032 unsafe {
1033 let f: Box<F> = Box::new(f);
1034 glib::signal::connect_raw(
1035 self.as_ptr() as *mut _,
1036 b"notify::qos\0".as_ptr() as *const _,
1037 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1038 notify_qos_trampoline::<F> as *const (),
1039 )),
1040 Box::into_raw(f),
1041 )
1042 }
1043 }
1044
1045 #[doc(alias = "render-delay")]
1046 pub fn connect_render_delay_notify<F: Fn(&Self) + Send + Sync + 'static>(
1047 &self,
1048 f: F,
1049 ) -> glib::SignalHandlerId {
1050 unsafe extern "C" fn notify_render_delay_trampoline<
1051 F: Fn(&AppSink) + Send + Sync + 'static,
1052 >(
1053 this: *mut ffi::GstAppSink,
1054 _param_spec: glib::ffi::gpointer,
1055 f: glib::ffi::gpointer,
1056 ) {
1057 let f: &F = &*(f as *const F);
1058 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1059 }
1060 unsafe {
1061 let f: Box<F> = Box::new(f);
1062 glib::signal::connect_raw(
1063 self.as_ptr() as *mut _,
1064 b"notify::render-delay\0".as_ptr() as *const _,
1065 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1066 notify_render_delay_trampoline::<F> as *const (),
1067 )),
1068 Box::into_raw(f),
1069 )
1070 }
1071 }
1072
1073 #[cfg(feature = "v1_18")]
1074 #[cfg_attr(docsrs, doc(cfg(feature = "v1_18")))]
1075 #[doc(alias = "stats")]
1076 pub fn connect_stats_notify<F: Fn(&Self) + Send + Sync + 'static>(
1077 &self,
1078 f: F,
1079 ) -> glib::SignalHandlerId {
1080 unsafe extern "C" fn notify_stats_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
1081 this: *mut ffi::GstAppSink,
1082 _param_spec: glib::ffi::gpointer,
1083 f: glib::ffi::gpointer,
1084 ) {
1085 let f: &F = &*(f as *const F);
1086 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1087 }
1088 unsafe {
1089 let f: Box<F> = Box::new(f);
1090 glib::signal::connect_raw(
1091 self.as_ptr() as *mut _,
1092 b"notify::stats\0".as_ptr() as *const _,
1093 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1094 notify_stats_trampoline::<F> as *const (),
1095 )),
1096 Box::into_raw(f),
1097 )
1098 }
1099 }
1100
1101 #[doc(alias = "sync")]
1102 pub fn connect_sync_notify<F: Fn(&Self) + Send + Sync + 'static>(
1103 &self,
1104 f: F,
1105 ) -> glib::SignalHandlerId {
1106 unsafe extern "C" fn notify_sync_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
1107 this: *mut ffi::GstAppSink,
1108 _param_spec: glib::ffi::gpointer,
1109 f: glib::ffi::gpointer,
1110 ) {
1111 let f: &F = &*(f as *const F);
1112 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1113 }
1114 unsafe {
1115 let f: Box<F> = Box::new(f);
1116 glib::signal::connect_raw(
1117 self.as_ptr() as *mut _,
1118 b"notify::sync\0".as_ptr() as *const _,
1119 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1120 notify_sync_trampoline::<F> as *const (),
1121 )),
1122 Box::into_raw(f),
1123 )
1124 }
1125 }
1126
1127 #[doc(alias = "throttle-time")]
1128 pub fn connect_throttle_time_notify<F: Fn(&Self) + Send + Sync + 'static>(
1129 &self,
1130 f: F,
1131 ) -> glib::SignalHandlerId {
1132 unsafe extern "C" fn notify_throttle_time_trampoline<
1133 F: Fn(&AppSink) + Send + Sync + 'static,
1134 >(
1135 this: *mut ffi::GstAppSink,
1136 _param_spec: glib::ffi::gpointer,
1137 f: glib::ffi::gpointer,
1138 ) {
1139 let f: &F = &*(f as *const F);
1140 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1141 }
1142 unsafe {
1143 let f: Box<F> = Box::new(f);
1144 glib::signal::connect_raw(
1145 self.as_ptr() as *mut _,
1146 b"notify::throttle-time\0".as_ptr() as *const _,
1147 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1148 notify_throttle_time_trampoline::<F> as *const (),
1149 )),
1150 Box::into_raw(f),
1151 )
1152 }
1153 }
1154
1155 #[doc(alias = "ts-offset")]
1156 pub fn connect_ts_offset_notify<F: Fn(&Self) + Send + Sync + 'static>(
1157 &self,
1158 f: F,
1159 ) -> glib::SignalHandlerId {
1160 unsafe extern "C" fn notify_ts_offset_trampoline<
1161 F: Fn(&AppSink) + Send + Sync + 'static,
1162 >(
1163 this: *mut ffi::GstAppSink,
1164 _param_spec: glib::ffi::gpointer,
1165 f: glib::ffi::gpointer,
1166 ) {
1167 let f: &F = &*(f as *const F);
1168 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1169 }
1170 unsafe {
1171 let f: Box<F> = Box::new(f);
1172 glib::signal::connect_raw(
1173 self.as_ptr() as *mut _,
1174 b"notify::ts-offset\0".as_ptr() as *const _,
1175 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1176 notify_ts_offset_trampoline::<F> as *const (),
1177 )),
1178 Box::into_raw(f),
1179 )
1180 }
1181 }
1182
1183 pub fn stream(&self) -> AppSinkStream {
1184 AppSinkStream::new(self)
1185 }
1186}
1187
1188#[must_use = "The builder must be built to be used"]
1193pub struct AppSinkBuilder<'a> {
1194 builder: gst::gobject::GObjectBuilder<'a, AppSink>,
1195 callbacks: Option<AppSinkCallbacks>,
1196 drop_out_of_segment: Option<bool>,
1197}
1198
1199impl<'a> AppSinkBuilder<'a> {
1200 #[must_use = "Building the object from the builder is usually expensive and is not expected to have side effects"]
1208 pub fn build(self) -> AppSink {
1209 let appsink = self.builder.build().unwrap();
1210
1211 if let Some(callbacks) = self.callbacks {
1212 appsink.set_callbacks(callbacks);
1213 }
1214
1215 if let Some(drop_out_of_segment) = self.drop_out_of_segment {
1216 appsink.set_drop_out_of_segment(drop_out_of_segment);
1217 }
1218
1219 appsink
1220 }
1221
1222 pub fn async_(self, async_: bool) -> Self {
1223 Self {
1224 builder: self.builder.property("async", async_),
1225 ..self
1226 }
1227 }
1228
1229 pub fn buffer_list(self, buffer_list: bool) -> Self {
1230 Self {
1231 builder: self.builder.property("buffer-list", buffer_list),
1232 ..self
1233 }
1234 }
1235
1236 pub fn callbacks(self, callbacks: AppSinkCallbacks) -> Self {
1237 Self {
1238 callbacks: Some(callbacks),
1239 ..self
1240 }
1241 }
1242
1243 pub fn caps(self, caps: &'a gst::Caps) -> Self {
1244 Self {
1245 builder: self.builder.property("caps", caps),
1246 ..self
1247 }
1248 }
1249
1250 pub fn drop(self, drop: bool) -> Self {
1251 Self {
1252 builder: self.builder.property("drop", drop),
1253 ..self
1254 }
1255 }
1256
1257 pub fn drop_out_of_segment(self, drop_out_of_segment: bool) -> Self {
1258 Self {
1259 builder: self
1260 .builder
1261 .property("drop-out-of-segment", drop_out_of_segment),
1262 ..self
1263 }
1264 }
1265
1266 pub fn enable_last_sample(self, enable_last_sample: bool) -> Self {
1267 Self {
1268 builder: self
1269 .builder
1270 .property("enable-last-sample", enable_last_sample),
1271 ..self
1272 }
1273 }
1274
1275 pub fn max_bitrate(self, max_bitrate: u64) -> Self {
1276 Self {
1277 builder: self.builder.property("max-bitrate", max_bitrate),
1278 ..self
1279 }
1280 }
1281
1282 pub fn max_buffers(self, max_buffers: u32) -> Self {
1283 Self {
1284 builder: self.builder.property("max-buffers", max_buffers),
1285 ..self
1286 }
1287 }
1288
1289 pub fn max_lateness(self, max_lateness: i64) -> Self {
1290 Self {
1291 builder: self.builder.property("max-lateness", max_lateness),
1292 ..self
1293 }
1294 }
1295
1296 #[cfg(feature = "v1_16")]
1297 #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
1298 pub fn processing_deadline(self, processing_deadline: gst::ClockTime) -> Self {
1299 Self {
1300 builder: self
1301 .builder
1302 .property("processing-deadline", processing_deadline),
1303 ..self
1304 }
1305 }
1306
1307 pub fn qos(self, qos: bool) -> Self {
1308 Self {
1309 builder: self.builder.property("qos", qos),
1310 ..self
1311 }
1312 }
1313
1314 pub fn render_delay(self, render_delay: Option<gst::ClockTime>) -> Self {
1315 Self {
1316 builder: self.builder.property("render-delay", render_delay),
1317 ..self
1318 }
1319 }
1320
1321 pub fn sync(self, sync: bool) -> Self {
1322 Self {
1323 builder: self.builder.property("sync", sync),
1324 ..self
1325 }
1326 }
1327
1328 pub fn throttle_time(self, throttle_time: u64) -> Self {
1329 Self {
1330 builder: self.builder.property("throttle-time", throttle_time),
1331 ..self
1332 }
1333 }
1334
1335 pub fn ts_offset(self, ts_offset: gst::ClockTimeDiff) -> Self {
1336 Self {
1337 builder: self.builder.property("ts-offset", ts_offset),
1338 ..self
1339 }
1340 }
1341
1342 pub fn wait_on_eos(self, wait_on_eos: bool) -> Self {
1343 Self {
1344 builder: self.builder.property("wait-on-eos", wait_on_eos),
1345 ..self
1346 }
1347 }
1348
1349 #[cfg(feature = "v1_24")]
1350 #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
1351 pub fn max_time(self, max_time: Option<gst::ClockTime>) -> Self {
1352 Self {
1353 builder: self.builder.property("max-time", max_time),
1354 ..self
1355 }
1356 }
1357
1358 #[cfg(feature = "v1_24")]
1359 #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
1360 pub fn max_bytes(self, max_bytes: u64) -> Self {
1361 Self {
1362 builder: self.builder.property("max-bytes", max_bytes),
1363 ..self
1364 }
1365 }
1366
1367 #[inline]
1372 pub fn property(self, name: &'a str, value: impl Into<glib::Value> + 'a) -> Self {
1373 Self {
1374 builder: self.builder.property(name, value),
1375 ..self
1376 }
1377 }
1378
1379 #[inline]
1382 pub fn property_from_str(self, name: &'a str, value: &'a str) -> Self {
1383 Self {
1384 builder: self.builder.property_from_str(name, value),
1385 ..self
1386 }
1387 }
1388
1389 gst::impl_builder_gvalue_extra_setters!(property_and_name);
1390}
1391
1392#[derive(Debug)]
1393pub struct AppSinkStream {
1394 app_sink: glib::WeakRef<AppSink>,
1395 waker_reference: Arc<Mutex<Option<Waker>>>,
1396}
1397
1398impl AppSinkStream {
1399 fn new(app_sink: &AppSink) -> Self {
1400 skip_assert_initialized!();
1401
1402 let waker_reference = Arc::new(Mutex::new(None as Option<Waker>));
1403
1404 app_sink.set_callbacks(
1405 AppSinkCallbacks::builder()
1406 .new_sample({
1407 let waker_reference = Arc::clone(&waker_reference);
1408
1409 move |_| {
1410 if let Some(waker) = waker_reference.lock().unwrap().take() {
1411 waker.wake();
1412 }
1413
1414 Ok(gst::FlowSuccess::Ok)
1415 }
1416 })
1417 .eos({
1418 let waker_reference = Arc::clone(&waker_reference);
1419
1420 move |_| {
1421 if let Some(waker) = waker_reference.lock().unwrap().take() {
1422 waker.wake();
1423 }
1424 }
1425 })
1426 .build(),
1427 );
1428
1429 Self {
1430 app_sink: app_sink.downgrade(),
1431 waker_reference,
1432 }
1433 }
1434}
1435
1436impl Drop for AppSinkStream {
1437 fn drop(&mut self) {
1438 #[cfg(not(feature = "v1_18"))]
1439 {
1440 if gst::version() >= (1, 16, 3, 0) {
1443 if let Some(app_sink) = self.app_sink.upgrade() {
1444 app_sink.set_callbacks(AppSinkCallbacks::builder().build());
1445 }
1446 }
1447 }
1448 }
1449}
1450
1451impl Stream for AppSinkStream {
1452 type Item = gst::Sample;
1453
1454 fn poll_next(self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>> {
1455 let mut waker = self.waker_reference.lock().unwrap();
1456
1457 let Some(app_sink) = self.app_sink.upgrade() else {
1458 return Poll::Ready(None);
1459 };
1460
1461 app_sink
1462 .try_pull_sample(gst::ClockTime::ZERO)
1463 .map(|sample| Poll::Ready(Some(sample)))
1464 .unwrap_or_else(|| {
1465 if app_sink.is_eos() {
1466 return Poll::Ready(None);
1467 }
1468
1469 waker.replace(context.waker().to_owned());
1470
1471 Poll::Pending
1472 })
1473 }
1474}
1475
1476#[cfg(test)]
1477mod tests {
1478 use futures_util::StreamExt;
1479 use gst::prelude::*;
1480
1481 use super::*;
1482
1483 #[test]
1484 fn test_app_sink_stream() {
1485 gst::init().unwrap();
1486
1487 let videotestsrc = gst::ElementFactory::make("videotestsrc")
1488 .property("num-buffers", 5)
1489 .build()
1490 .unwrap();
1491 let appsink = gst::ElementFactory::make("appsink").build().unwrap();
1492
1493 let pipeline = gst::Pipeline::new();
1494 pipeline.add(&videotestsrc).unwrap();
1495 pipeline.add(&appsink).unwrap();
1496
1497 videotestsrc.link(&appsink).unwrap();
1498
1499 let app_sink_stream = appsink.dynamic_cast::<AppSink>().unwrap().stream();
1500 let samples_future = app_sink_stream.collect::<Vec<gst::Sample>>();
1501
1502 pipeline.set_state(gst::State::Playing).unwrap();
1503 let samples = futures_executor::block_on(samples_future);
1504 pipeline.set_state(gst::State::Null).unwrap();
1505
1506 assert_eq!(samples.len(), 5);
1507 }
1508}