1use std::{
4 mem, panic,
5 pin::Pin,
6 ptr,
7 sync::{Arc, Mutex},
8 task::{Context, Poll, Waker},
9};
10
11#[cfg(not(panic = "abort"))]
12use std::sync::atomic::{AtomicBool, Ordering};
13
14use futures_core::Stream;
15use glib::{ffi::gpointer, prelude::*, translate::*};
16
17use crate::{AppSink, ffi};
18
/// Rust-side state backing the callbacks installed on an [`AppSink`].
///
/// Stores the optional user closures together with the C callback table
/// (`ffi::GstAppSinkCallbacks`) whose entries point at the `trampoline_*`
/// functions defined in this file. Values are created through
/// [`AppSinkCallbacks::builder`].
#[allow(clippy::type_complexity)]
pub struct AppSinkCallbacks {
    // Closure run by `trampoline_eos`.
    eos: Option<Box<dyn FnMut(&AppSink) + Send + 'static>>,
    // Closure run by `trampoline_new_preroll`; its result becomes the flow return.
    new_preroll: Option<
        Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
    >,
    // Closure run by `trampoline_new_sample`; its result becomes the flow return.
    new_sample: Option<
        Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
    >,
    // Closure run by `trampoline_new_event`.
    new_event: Option<Box<dyn FnMut(&AppSink) -> bool + Send + 'static>>,
    // Closure run by `trampoline_propose_allocation` with the allocation query.
    propose_allocation:
        Option<Box<dyn FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static>>,
    // Set by a trampoline after a closure panicked; once set, every trampoline
    // returns early instead of calling into user code again.
    #[cfg(not(panic = "abort"))]
    panicked: AtomicBool,
    // C callback table passed to `gst_app_sink_set_callbacks`.
    callbacks: ffi::GstAppSinkCallbacks,
}
35
// The manual impls are needed because `callbacks` (the C callback table)
// carries raw pointers in `_gst_reserved`; the builder initialises them to null.
// Every stored closure is bounded by `Send`.
unsafe impl Send for AppSinkCallbacks {}
// NOTE(review): the closures are only `Send`, not `Sync`. In this file they are
// reached exclusively through the trampolines via the raw user-data pointer;
// presumably soundness relies on no `&self` API exposing them — confirm.
unsafe impl Sync for AppSinkCallbacks {}
38
impl AppSinkCallbacks {
    /// Returns a builder with no callback set.
    ///
    /// Callbacks left unset stay `None` and get no entry in the C callback
    /// table produced by [`AppSinkCallbacksBuilder::build`].
    pub fn builder() -> AppSinkCallbacksBuilder {
        skip_assert_initialized!();
        AppSinkCallbacksBuilder {
            eos: None,
            new_preroll: None,
            new_sample: None,
            new_event: None,
            propose_allocation: None,
        }
    }
}
51
/// Builder collecting the optional closures that make up an [`AppSinkCallbacks`].
///
/// Each field mirrors the field of the same name on [`AppSinkCallbacks`] and
/// is `None` until the corresponding setter is called.
#[allow(clippy::type_complexity)]
#[must_use = "The builder must be built to be used"]
pub struct AppSinkCallbacksBuilder {
    // Closure for the `eos` callback.
    eos: Option<Box<dyn FnMut(&AppSink) + Send + 'static>>,
    // Closure for the `new_preroll` callback.
    new_preroll: Option<
        Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
    >,
    // Closure for the `new_sample` callback.
    new_sample: Option<
        Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
    >,
    // Closure for the `new_event` callback (setter gated on feature `v1_20`).
    new_event: Option<Box<dyn FnMut(&AppSink) -> bool + Send + 'static>>,
    // Closure for the `propose_allocation` callback (setter gated on feature `v1_24`).
    propose_allocation:
        Option<Box<dyn FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static>>,
}
66
67impl AppSinkCallbacksBuilder {
68 pub fn eos<F: FnMut(&AppSink) + Send + 'static>(self, eos: F) -> Self {
69 Self {
70 eos: Some(Box::new(eos)),
71 ..self
72 }
73 }
74
75 pub fn eos_if<F: FnMut(&AppSink) + Send + 'static>(self, eos: F, predicate: bool) -> Self {
76 if predicate { self.eos(eos) } else { self }
77 }
78
79 pub fn eos_if_some<F: FnMut(&AppSink) + Send + 'static>(self, eos: Option<F>) -> Self {
80 if let Some(eos) = eos {
81 self.eos(eos)
82 } else {
83 self
84 }
85 }
86
87 pub fn new_preroll<
88 F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
89 >(
90 self,
91 new_preroll: F,
92 ) -> Self {
93 Self {
94 new_preroll: Some(Box::new(new_preroll)),
95 ..self
96 }
97 }
98
99 pub fn new_preroll_if<
100 F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
101 >(
102 self,
103 new_preroll: F,
104 predicate: bool,
105 ) -> Self {
106 if predicate {
107 self.new_preroll(new_preroll)
108 } else {
109 self
110 }
111 }
112
113 pub fn new_preroll_if_some<
114 F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
115 >(
116 self,
117 new_preroll: Option<F>,
118 ) -> Self {
119 if let Some(new_preroll) = new_preroll {
120 self.new_preroll(new_preroll)
121 } else {
122 self
123 }
124 }
125
126 pub fn new_sample<
127 F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
128 >(
129 self,
130 new_sample: F,
131 ) -> Self {
132 Self {
133 new_sample: Some(Box::new(new_sample)),
134 ..self
135 }
136 }
137
138 pub fn new_sample_if<
139 F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
140 >(
141 self,
142 new_sample: F,
143 predicate: bool,
144 ) -> Self {
145 if predicate {
146 self.new_sample(new_sample)
147 } else {
148 self
149 }
150 }
151
152 pub fn new_sample_if_some<
153 F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
154 >(
155 self,
156 new_sample: Option<F>,
157 ) -> Self {
158 if let Some(new_sample) = new_sample {
159 self.new_sample(new_sample)
160 } else {
161 self
162 }
163 }
164
165 #[cfg(feature = "v1_20")]
166 #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
167 pub fn new_event<F: FnMut(&AppSink) -> bool + Send + 'static>(self, new_event: F) -> Self {
168 Self {
169 new_event: Some(Box::new(new_event)),
170 ..self
171 }
172 }
173
174 #[cfg(feature = "v1_20")]
175 #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
176 pub fn new_event_if<F: FnMut(&AppSink) -> bool + Send + 'static>(
177 self,
178 new_event: F,
179 predicate: bool,
180 ) -> Self {
181 if predicate {
182 self.new_event(new_event)
183 } else {
184 self
185 }
186 }
187
188 #[cfg(feature = "v1_20")]
189 #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
190 pub fn new_event_if_some<F: FnMut(&AppSink) -> bool + Send + 'static>(
191 self,
192 new_event: Option<F>,
193 ) -> Self {
194 if let Some(new_event) = new_event {
195 self.new_event(new_event)
196 } else {
197 self
198 }
199 }
200
201 #[cfg(feature = "v1_24")]
202 #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
203 pub fn propose_allocation<
204 F: FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static,
205 >(
206 self,
207 propose_allocation: F,
208 ) -> Self {
209 Self {
210 propose_allocation: Some(Box::new(propose_allocation)),
211 ..self
212 }
213 }
214
215 #[cfg(feature = "v1_24")]
216 #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
217 pub fn propose_allocation_if<
218 F: FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static,
219 >(
220 self,
221 propose_allocation: F,
222 predicate: bool,
223 ) -> Self {
224 if predicate {
225 self.propose_allocation(propose_allocation)
226 } else {
227 self
228 }
229 }
230
231 #[cfg(feature = "v1_24")]
232 #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
233 pub fn propose_allocation_if_some<
234 F: FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static,
235 >(
236 self,
237 propose_allocation: Option<F>,
238 ) -> Self {
239 if let Some(propose_allocation) = propose_allocation {
240 self.propose_allocation(propose_allocation)
241 } else {
242 self
243 }
244 }
245
246 #[must_use = "Building the callbacks without using them has no effect"]
247 pub fn build(self) -> AppSinkCallbacks {
248 let have_eos = self.eos.is_some();
249 let have_new_preroll = self.new_preroll.is_some();
250 let have_new_sample = self.new_sample.is_some();
251 let have_new_event = self.new_event.is_some();
252 let have_propose_allocation = self.propose_allocation.is_some();
253
254 AppSinkCallbacks {
255 eos: self.eos,
256 new_preroll: self.new_preroll,
257 new_sample: self.new_sample,
258 new_event: self.new_event,
259 propose_allocation: self.propose_allocation,
260 #[cfg(not(panic = "abort"))]
261 panicked: AtomicBool::new(false),
262 callbacks: ffi::GstAppSinkCallbacks {
263 eos: if have_eos { Some(trampoline_eos) } else { None },
264 new_preroll: if have_new_preroll {
265 Some(trampoline_new_preroll)
266 } else {
267 None
268 },
269 new_sample: if have_new_sample {
270 Some(trampoline_new_sample)
271 } else {
272 None
273 },
274 new_event: if have_new_event {
275 Some(trampoline_new_event)
276 } else {
277 None
278 },
279 propose_allocation: if have_propose_allocation {
280 Some(trampoline_propose_allocation)
281 } else {
282 None
283 },
284 _gst_reserved: [ptr::null_mut(), ptr::null_mut()],
285 },
286 }
287 }
288}
289
/// C-ABI trampoline used as the `eos` entry of `ffi::GstAppSinkCallbacks`.
///
/// `callbacks` is the user-data pointer and is reinterpreted as
/// `*mut AppSinkCallbacks`. The stored `eos` closure, if any, is run inside
/// `catch_unwind` so that a panic is not unwound through this `extern "C"` frame.
unsafe extern "C" fn trampoline_eos(appsink: *mut ffi::GstAppSink, callbacks: gpointer) {
    unsafe {
        let callbacks = callbacks as *mut AppSinkCallbacks;
        let element: Borrowed<AppSink> = from_glib_borrow(appsink);

        // A previous closure panicked: post the panic error message again
        // (without payload) and do not call into user code.
        #[cfg(not(panic = "abort"))]
        if (*callbacks).panicked.load(Ordering::Relaxed) {
            let element: Borrowed<AppSink> = from_glib_borrow(appsink);
            gst::subclass::post_panic_error_message(
                element.upcast_ref(),
                element.upcast_ref(),
                None,
            );
            return;
        }

        if let Some(ref mut eos) = (*callbacks).eos {
            let result = panic::catch_unwind(panic::AssertUnwindSafe(|| eos(&element)));
            match result {
                Ok(result) => result,
                Err(err) => {
                    // With `panic = "abort"` a panic never reaches this arm.
                    #[cfg(panic = "abort")]
                    {
                        unreachable!("{err:?}");
                    }
                    // Remember the panic and post it with its payload.
                    #[cfg(not(panic = "abort"))]
                    {
                        (*callbacks).panicked.store(true, Ordering::Relaxed);
                        gst::subclass::post_panic_error_message(
                            element.upcast_ref(),
                            element.upcast_ref(),
                            Some(err),
                        );
                    }
                }
            }
        }
    }
}
329
/// C-ABI trampoline used as the `new_preroll` entry of `ffi::GstAppSinkCallbacks`.
///
/// `callbacks` is the user-data pointer and is reinterpreted as
/// `*mut AppSinkCallbacks`. Returns the closure's result converted to a
/// `GstFlowReturn`, or `gst::FlowReturn::Error` when no closure is stored,
/// when the closure panics, or when an earlier closure already panicked.
unsafe extern "C" fn trampoline_new_preroll(
    appsink: *mut ffi::GstAppSink,
    callbacks: gpointer,
) -> gst::ffi::GstFlowReturn {
    unsafe {
        let callbacks = callbacks as *mut AppSinkCallbacks;
        let element: Borrowed<AppSink> = from_glib_borrow(appsink);

        // A previous closure panicked: post the panic error message again
        // (without payload) and fail without calling into user code.
        #[cfg(not(panic = "abort"))]
        if (*callbacks).panicked.load(Ordering::Relaxed) {
            let element: Borrowed<AppSink> = from_glib_borrow(appsink);
            gst::subclass::post_panic_error_message(
                element.upcast_ref(),
                element.upcast_ref(),
                None,
            );
            return gst::FlowReturn::Error.into_glib();
        }

        let ret = if let Some(ref mut new_preroll) = (*callbacks).new_preroll {
            // Run the closure under `catch_unwind` so a panic is not unwound
            // through this `extern "C"` frame.
            let result =
                panic::catch_unwind(panic::AssertUnwindSafe(|| new_preroll(&element).into()));
            match result {
                Ok(result) => result,
                Err(err) => {
                    // With `panic = "abort"` a panic never reaches this arm.
                    #[cfg(panic = "abort")]
                    {
                        unreachable!("{err:?}");
                    }
                    // Remember the panic, post it with its payload, report an error.
                    #[cfg(not(panic = "abort"))]
                    {
                        (*callbacks).panicked.store(true, Ordering::Relaxed);
                        gst::subclass::post_panic_error_message(
                            element.upcast_ref(),
                            element.upcast_ref(),
                            Some(err),
                        );

                        gst::FlowReturn::Error
                    }
                }
            }
        } else {
            // No closure stored.
            gst::FlowReturn::Error
        };

        ret.into_glib()
    }
}
379
/// C-ABI trampoline used as the `new_sample` entry of `ffi::GstAppSinkCallbacks`.
///
/// `callbacks` is the user-data pointer and is reinterpreted as
/// `*mut AppSinkCallbacks`. Returns the closure's result converted to a
/// `GstFlowReturn`, or `gst::FlowReturn::Error` when no closure is stored,
/// when the closure panics, or when an earlier closure already panicked.
unsafe extern "C" fn trampoline_new_sample(
    appsink: *mut ffi::GstAppSink,
    callbacks: gpointer,
) -> gst::ffi::GstFlowReturn {
    unsafe {
        let callbacks = callbacks as *mut AppSinkCallbacks;
        let element: Borrowed<AppSink> = from_glib_borrow(appsink);

        // A previous closure panicked: post the panic error message again
        // (without payload) and fail without calling into user code.
        #[cfg(not(panic = "abort"))]
        if (*callbacks).panicked.load(Ordering::Relaxed) {
            let element: Borrowed<AppSink> = from_glib_borrow(appsink);
            gst::subclass::post_panic_error_message(
                element.upcast_ref(),
                element.upcast_ref(),
                None,
            );
            return gst::FlowReturn::Error.into_glib();
        }

        let ret = if let Some(ref mut new_sample) = (*callbacks).new_sample {
            // Run the closure under `catch_unwind` so a panic is not unwound
            // through this `extern "C"` frame.
            let result =
                panic::catch_unwind(panic::AssertUnwindSafe(|| new_sample(&element).into()));
            match result {
                Ok(result) => result,
                Err(err) => {
                    // With `panic = "abort"` a panic never reaches this arm.
                    #[cfg(panic = "abort")]
                    {
                        unreachable!("{err:?}");
                    }
                    // Remember the panic, post it with its payload, report an error.
                    #[cfg(not(panic = "abort"))]
                    {
                        (*callbacks).panicked.store(true, Ordering::Relaxed);
                        gst::subclass::post_panic_error_message(
                            element.upcast_ref(),
                            element.upcast_ref(),
                            Some(err),
                        );

                        gst::FlowReturn::Error
                    }
                }
            }
        } else {
            // No closure stored.
            gst::FlowReturn::Error
        };

        ret.into_glib()
    }
}
429
/// C-ABI trampoline used as the `new_event` entry of `ffi::GstAppSinkCallbacks`.
///
/// `callbacks` is the user-data pointer and is reinterpreted as
/// `*mut AppSinkCallbacks`. Returns the closure's `bool` as a `gboolean`, or
/// `false` when no closure is stored, when the closure panics, or when an
/// earlier closure already panicked.
unsafe extern "C" fn trampoline_new_event(
    appsink: *mut ffi::GstAppSink,
    callbacks: gpointer,
) -> glib::ffi::gboolean {
    unsafe {
        let callbacks = callbacks as *mut AppSinkCallbacks;
        let element: Borrowed<AppSink> = from_glib_borrow(appsink);

        // A previous closure panicked: post the panic error message again
        // (without payload) and return `false` without calling into user code.
        #[cfg(not(panic = "abort"))]
        if (*callbacks).panicked.load(Ordering::Relaxed) {
            let element: Borrowed<AppSink> = from_glib_borrow(appsink);
            gst::subclass::post_panic_error_message(
                element.upcast_ref(),
                element.upcast_ref(),
                None,
            );
            return false.into_glib();
        }

        let ret = if let Some(ref mut new_event) = (*callbacks).new_event {
            // Run the closure under `catch_unwind` so a panic is not unwound
            // through this `extern "C"` frame.
            let result = panic::catch_unwind(panic::AssertUnwindSafe(|| new_event(&element)));
            match result {
                Ok(result) => result,
                Err(err) => {
                    // With `panic = "abort"` a panic never reaches this arm.
                    #[cfg(panic = "abort")]
                    {
                        unreachable!("{err:?}");
                    }
                    // Remember the panic, post it with its payload, return `false`.
                    #[cfg(not(panic = "abort"))]
                    {
                        (*callbacks).panicked.store(true, Ordering::Relaxed);
                        gst::subclass::post_panic_error_message(
                            element.upcast_ref(),
                            element.upcast_ref(),
                            Some(err),
                        );

                        false
                    }
                }
            }
        } else {
            // No closure stored.
            false
        };

        ret.into_glib()
    }
}
478
/// C-ABI trampoline used as the `propose_allocation` entry of
/// `ffi::GstAppSinkCallbacks`.
///
/// `callbacks` is the user-data pointer and is reinterpreted as
/// `*mut AppSinkCallbacks`; `query` is viewed as an allocation query and
/// handed to the closure. Returns the closure's `bool` as a `gboolean`, or
/// `false` when no closure is stored, when the closure panics, or when an
/// earlier closure already panicked.
unsafe extern "C" fn trampoline_propose_allocation(
    appsink: *mut ffi::GstAppSink,
    query: *mut gst::ffi::GstQuery,
    callbacks: gpointer,
) -> glib::ffi::gboolean {
    unsafe {
        let callbacks = callbacks as *mut AppSinkCallbacks;
        let element: Borrowed<AppSink> = from_glib_borrow(appsink);

        // A previous closure panicked: post the panic error message again
        // (without payload) and return `false` without calling into user code.
        #[cfg(not(panic = "abort"))]
        if (*callbacks).panicked.load(Ordering::Relaxed) {
            let element: Borrowed<AppSink> = from_glib_borrow(appsink);
            gst::subclass::post_panic_error_message(
                element.upcast_ref(),
                element.upcast_ref(),
                None,
            );
            return false.into_glib();
        }

        let ret = if let Some(ref mut propose_allocation) = (*callbacks).propose_allocation {
            // Any query kind other than `Allocation` hits `unreachable!()`; this
            // match sits outside `catch_unwind`.
            let query = match gst::QueryRef::from_mut_ptr(query).view_mut() {
                gst::QueryViewMut::Allocation(allocation) => allocation,
                _ => unreachable!(),
            };
            // Run the closure under `catch_unwind` so a panic is not unwound
            // through this `extern "C"` frame.
            let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
                propose_allocation(&element, query)
            }));
            match result {
                Ok(result) => result,
                Err(err) => {
                    // With `panic = "abort"` a panic never reaches this arm.
                    #[cfg(panic = "abort")]
                    {
                        unreachable!("{err:?}");
                    }
                    // Remember the panic, post it with its payload, return `false`.
                    #[cfg(not(panic = "abort"))]
                    {
                        (*callbacks).panicked.store(true, Ordering::Relaxed);
                        gst::subclass::post_panic_error_message(
                            element.upcast_ref(),
                            element.upcast_ref(),
                            Some(err),
                        );
                        false
                    }
                }
            }
        } else {
            // No closure stored.
            false
        };

        ret.into_glib()
    }
}
533
534unsafe extern "C" fn destroy_callbacks(ptr: gpointer) {
535 unsafe {
536 let _ = Box::<AppSinkCallbacks>::from_raw(ptr as *mut _);
537 }
538}
539
540impl AppSink {
541 pub fn builder<'a>() -> AppSinkBuilder<'a> {
546 assert_initialized_main_thread!();
547 AppSinkBuilder {
548 builder: gst::Object::builder(),
549 callbacks: None,
550 drop_out_of_segment: None,
551 }
552 }
553
554 #[doc(alias = "gst_app_sink_set_callbacks")]
572 pub fn set_callbacks(&self, callbacks: AppSinkCallbacks) {
573 unsafe {
574 let sink = self.to_glib_none().0;
575
576 #[allow(clippy::manual_dangling_ptr)]
577 #[cfg(not(feature = "v1_18"))]
578 {
579 static SET_ONCE_QUARK: std::sync::OnceLock<glib::Quark> =
580 std::sync::OnceLock::new();
581
582 let set_once_quark = SET_ONCE_QUARK
583 .get_or_init(|| glib::Quark::from_str("gstreamer-rs-app-sink-callbacks"));
584
585 if gst::version() < (1, 16, 3, 0) {
588 if !glib::gobject_ffi::g_object_get_qdata(
589 sink as *mut _,
590 set_once_quark.into_glib(),
591 )
592 .is_null()
593 {
594 panic!("AppSink callbacks can only be set once");
595 }
596
597 glib::gobject_ffi::g_object_set_qdata(
598 sink as *mut _,
599 set_once_quark.into_glib(),
600 1 as *mut _,
601 );
602 }
603 }
604
605 ffi::gst_app_sink_set_callbacks(
606 sink,
607 mut_override(&callbacks.callbacks),
608 Box::into_raw(Box::new(callbacks)) as *mut _,
609 Some(destroy_callbacks),
610 );
611 }
612 }
613
614 #[doc(alias = "drop-out-of-segment")]
615 pub fn drops_out_of_segment(&self) -> bool {
616 unsafe {
617 from_glib(gst_base::ffi::gst_base_sink_get_drop_out_of_segment(
618 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
619 ))
620 }
621 }
622
623 #[doc(alias = "max-bitrate")]
624 #[doc(alias = "gst_base_sink_get_max_bitrate")]
625 pub fn max_bitrate(&self) -> u64 {
626 unsafe {
627 gst_base::ffi::gst_base_sink_get_max_bitrate(
628 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
629 )
630 }
631 }
632
633 #[doc(alias = "max-lateness")]
634 #[doc(alias = "gst_base_sink_get_max_lateness")]
635 pub fn max_lateness(&self) -> i64 {
636 unsafe {
637 gst_base::ffi::gst_base_sink_get_max_lateness(
638 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
639 )
640 }
641 }
642
643 #[doc(alias = "processing-deadline")]
644 #[cfg(feature = "v1_16")]
645 #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
646 #[doc(alias = "gst_base_sink_get_processing_deadline")]
647 pub fn processing_deadline(&self) -> gst::ClockTime {
648 unsafe {
649 try_from_glib(gst_base::ffi::gst_base_sink_get_processing_deadline(
650 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
651 ))
652 .expect("undefined processing_deadline")
653 }
654 }
655
656 #[doc(alias = "render-delay")]
657 #[doc(alias = "gst_base_sink_get_render_delay")]
658 pub fn render_delay(&self) -> gst::ClockTime {
659 unsafe {
660 try_from_glib(gst_base::ffi::gst_base_sink_get_render_delay(
661 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
662 ))
663 .expect("undefined render_delay")
664 }
665 }
666
667 #[cfg(feature = "v1_18")]
668 #[cfg_attr(docsrs, doc(cfg(feature = "v1_18")))]
669 #[doc(alias = "gst_base_sink_get_stats")]
670 pub fn stats(&self) -> gst::Structure {
671 unsafe {
672 from_glib_full(gst_base::ffi::gst_base_sink_get_stats(
673 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
674 ))
675 }
676 }
677
678 #[doc(alias = "sync")]
679 pub fn is_sync(&self) -> bool {
680 unsafe {
681 from_glib(gst_base::ffi::gst_base_sink_get_sync(
682 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
683 ))
684 }
685 }
686
687 #[doc(alias = "throttle-time")]
688 #[doc(alias = "gst_base_sink_get_throttle_time")]
689 pub fn throttle_time(&self) -> u64 {
690 unsafe {
691 gst_base::ffi::gst_base_sink_get_throttle_time(
692 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
693 )
694 }
695 }
696
697 #[doc(alias = "ts-offset")]
698 #[doc(alias = "gst_base_sink_get_ts_offset")]
699 pub fn ts_offset(&self) -> gst::ClockTimeDiff {
700 unsafe {
701 gst_base::ffi::gst_base_sink_get_ts_offset(
702 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
703 )
704 }
705 }
706
707 #[doc(alias = "async")]
708 #[doc(alias = "gst_base_sink_is_async_enabled")]
709 pub fn is_async(&self) -> bool {
710 unsafe {
711 from_glib(gst_base::ffi::gst_base_sink_is_async_enabled(
712 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
713 ))
714 }
715 }
716
717 #[doc(alias = "last-sample")]
718 pub fn enables_last_sample(&self) -> bool {
719 unsafe {
720 from_glib(gst_base::ffi::gst_base_sink_is_last_sample_enabled(
721 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
722 ))
723 }
724 }
725
726 #[doc(alias = "qos")]
727 #[doc(alias = "gst_base_sink_is_qos_enabled")]
728 pub fn is_qos(&self) -> bool {
729 unsafe {
730 from_glib(gst_base::ffi::gst_base_sink_is_qos_enabled(
731 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
732 ))
733 }
734 }
735
736 #[doc(alias = "async")]
737 #[doc(alias = "gst_base_sink_set_async_enabled")]
738 pub fn set_async(&self, enabled: bool) {
739 unsafe {
740 gst_base::ffi::gst_base_sink_set_async_enabled(
741 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
742 enabled.into_glib(),
743 );
744 }
745 }
746
747 #[doc(alias = "drop-out-of-segment")]
748 #[doc(alias = "gst_base_sink_set_drop_out_of_segment")]
749 pub fn set_drop_out_of_segment(&self, drop_out_of_segment: bool) {
750 unsafe {
751 gst_base::ffi::gst_base_sink_set_drop_out_of_segment(
752 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
753 drop_out_of_segment.into_glib(),
754 );
755 }
756 }
757
758 #[doc(alias = "last-sample")]
759 pub fn set_enable_last_sample(&self, enabled: bool) {
760 unsafe {
761 gst_base::ffi::gst_base_sink_set_last_sample_enabled(
762 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
763 enabled.into_glib(),
764 );
765 }
766 }
767
768 #[doc(alias = "max-bitrate")]
769 #[doc(alias = "gst_base_sink_set_max_bitrate")]
770 pub fn set_max_bitrate(&self, max_bitrate: u64) {
771 unsafe {
772 gst_base::ffi::gst_base_sink_set_max_bitrate(
773 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
774 max_bitrate,
775 );
776 }
777 }
778
779 #[doc(alias = "max-lateness")]
780 #[doc(alias = "gst_base_sink_set_max_lateness")]
781 pub fn set_max_lateness(&self, max_lateness: i64) {
782 unsafe {
783 gst_base::ffi::gst_base_sink_set_max_lateness(
784 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
785 max_lateness,
786 );
787 }
788 }
789
790 #[doc(alias = "processing-deadline")]
791 #[cfg(feature = "v1_16")]
792 #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
793 #[doc(alias = "gst_base_sink_set_processing_deadline")]
794 pub fn set_processing_deadline(&self, processing_deadline: gst::ClockTime) {
795 unsafe {
796 gst_base::ffi::gst_base_sink_set_processing_deadline(
797 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
798 processing_deadline.into_glib(),
799 );
800 }
801 }
802
803 #[doc(alias = "qos")]
804 #[doc(alias = "gst_base_sink_set_qos_enabled")]
805 pub fn set_qos(&self, enabled: bool) {
806 unsafe {
807 gst_base::ffi::gst_base_sink_set_qos_enabled(
808 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
809 enabled.into_glib(),
810 );
811 }
812 }
813
814 #[doc(alias = "render-delay")]
815 #[doc(alias = "gst_base_sink_set_render_delay")]
816 pub fn set_render_delay(&self, delay: gst::ClockTime) {
817 unsafe {
818 gst_base::ffi::gst_base_sink_set_render_delay(
819 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
820 delay.into_glib(),
821 );
822 }
823 }
824
825 #[doc(alias = "sync")]
826 #[doc(alias = "gst_base_sink_set_sync")]
827 pub fn set_sync(&self, sync: bool) {
828 unsafe {
829 gst_base::ffi::gst_base_sink_set_sync(
830 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
831 sync.into_glib(),
832 );
833 }
834 }
835
836 #[doc(alias = "throttle-time")]
837 #[doc(alias = "gst_base_sink_set_throttle_time")]
838 pub fn set_throttle_time(&self, throttle: u64) {
839 unsafe {
840 gst_base::ffi::gst_base_sink_set_throttle_time(
841 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
842 throttle,
843 );
844 }
845 }
846
847 #[doc(alias = "ts-offset")]
848 #[doc(alias = "gst_base_sink_set_ts_offset")]
849 pub fn set_ts_offset(&self, offset: gst::ClockTimeDiff) {
850 unsafe {
851 gst_base::ffi::gst_base_sink_set_ts_offset(
852 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
853 offset,
854 );
855 }
856 }
857
858 #[doc(alias = "async")]
859 pub fn connect_async_notify<F: Fn(&Self) + Send + Sync + 'static>(
860 &self,
861 f: F,
862 ) -> glib::SignalHandlerId {
863 unsafe extern "C" fn notify_async_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
864 this: *mut ffi::GstAppSink,
865 _param_spec: glib::ffi::gpointer,
866 f: glib::ffi::gpointer,
867 ) {
868 unsafe {
869 let f: &F = &*(f as *const F);
870 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
871 }
872 }
873 unsafe {
874 let f: Box<F> = Box::new(f);
875 glib::signal::connect_raw(
876 self.as_ptr() as *mut _,
877 b"notify::async\0".as_ptr() as *const _,
878 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
879 notify_async_trampoline::<F> as *const (),
880 )),
881 Box::into_raw(f),
882 )
883 }
884 }
885
886 #[doc(alias = "blocksize")]
887 pub fn connect_blocksize_notify<F: Fn(&Self) + Send + Sync + 'static>(
888 &self,
889 f: F,
890 ) -> glib::SignalHandlerId {
891 unsafe extern "C" fn notify_blocksize_trampoline<
892 F: Fn(&AppSink) + Send + Sync + 'static,
893 >(
894 this: *mut ffi::GstAppSink,
895 _param_spec: glib::ffi::gpointer,
896 f: glib::ffi::gpointer,
897 ) {
898 unsafe {
899 let f: &F = &*(f as *const F);
900 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
901 }
902 }
903 unsafe {
904 let f: Box<F> = Box::new(f);
905 glib::signal::connect_raw(
906 self.as_ptr() as *mut _,
907 b"notify::blocksize\0".as_ptr() as *const _,
908 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
909 notify_blocksize_trampoline::<F> as *const (),
910 )),
911 Box::into_raw(f),
912 )
913 }
914 }
915
916 #[doc(alias = "enable-last-sample")]
917 pub fn connect_enable_last_sample_notify<F: Fn(&Self) + Send + Sync + 'static>(
918 &self,
919 f: F,
920 ) -> glib::SignalHandlerId {
921 unsafe extern "C" fn notify_enable_last_sample_trampoline<
922 F: Fn(&AppSink) + Send + Sync + 'static,
923 >(
924 this: *mut ffi::GstAppSink,
925 _param_spec: glib::ffi::gpointer,
926 f: glib::ffi::gpointer,
927 ) {
928 unsafe {
929 let f: &F = &*(f as *const F);
930 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
931 }
932 }
933 unsafe {
934 let f: Box<F> = Box::new(f);
935 glib::signal::connect_raw(
936 self.as_ptr() as *mut _,
937 b"notify::enable-last-sample\0".as_ptr() as *const _,
938 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
939 notify_enable_last_sample_trampoline::<F> as *const (),
940 )),
941 Box::into_raw(f),
942 )
943 }
944 }
945
946 #[doc(alias = "last-sample")]
947 pub fn connect_last_sample_notify<F: Fn(&Self) + Send + Sync + 'static>(
948 &self,
949 f: F,
950 ) -> glib::SignalHandlerId {
951 unsafe extern "C" fn notify_last_sample_trampoline<
952 F: Fn(&AppSink) + Send + Sync + 'static,
953 >(
954 this: *mut ffi::GstAppSink,
955 _param_spec: glib::ffi::gpointer,
956 f: glib::ffi::gpointer,
957 ) {
958 unsafe {
959 let f: &F = &*(f as *const F);
960 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
961 }
962 }
963 unsafe {
964 let f: Box<F> = Box::new(f);
965 glib::signal::connect_raw(
966 self.as_ptr() as *mut _,
967 b"notify::last-sample\0".as_ptr() as *const _,
968 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
969 notify_last_sample_trampoline::<F> as *const (),
970 )),
971 Box::into_raw(f),
972 )
973 }
974 }
975
976 #[doc(alias = "max-bitrate")]
977 pub fn connect_max_bitrate_notify<F: Fn(&Self) + Send + Sync + 'static>(
978 &self,
979 f: F,
980 ) -> glib::SignalHandlerId {
981 unsafe extern "C" fn notify_max_bitrate_trampoline<
982 F: Fn(&AppSink) + Send + Sync + 'static,
983 >(
984 this: *mut ffi::GstAppSink,
985 _param_spec: glib::ffi::gpointer,
986 f: glib::ffi::gpointer,
987 ) {
988 unsafe {
989 let f: &F = &*(f as *const F);
990 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
991 }
992 }
993 unsafe {
994 let f: Box<F> = Box::new(f);
995 glib::signal::connect_raw(
996 self.as_ptr() as *mut _,
997 b"notify::max-bitrate\0".as_ptr() as *const _,
998 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
999 notify_max_bitrate_trampoline::<F> as *const (),
1000 )),
1001 Box::into_raw(f),
1002 )
1003 }
1004 }
1005
1006 #[doc(alias = "max-lateness")]
1007 pub fn connect_max_lateness_notify<F: Fn(&Self) + Send + Sync + 'static>(
1008 &self,
1009 f: F,
1010 ) -> glib::SignalHandlerId {
1011 unsafe extern "C" fn notify_max_lateness_trampoline<
1012 F: Fn(&AppSink) + Send + Sync + 'static,
1013 >(
1014 this: *mut ffi::GstAppSink,
1015 _param_spec: glib::ffi::gpointer,
1016 f: glib::ffi::gpointer,
1017 ) {
1018 unsafe {
1019 let f: &F = &*(f as *const F);
1020 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1021 }
1022 }
1023 unsafe {
1024 let f: Box<F> = Box::new(f);
1025 glib::signal::connect_raw(
1026 self.as_ptr() as *mut _,
1027 b"notify::max-lateness\0".as_ptr() as *const _,
1028 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1029 notify_max_lateness_trampoline::<F> as *const (),
1030 )),
1031 Box::into_raw(f),
1032 )
1033 }
1034 }
1035
1036 #[cfg(feature = "v1_16")]
1037 #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
1038 #[doc(alias = "processing-deadline")]
1039 pub fn connect_processing_deadline_notify<F: Fn(&Self) + Send + Sync + 'static>(
1040 &self,
1041 f: F,
1042 ) -> glib::SignalHandlerId {
1043 unsafe extern "C" fn notify_processing_deadline_trampoline<
1044 F: Fn(&AppSink) + Send + Sync + 'static,
1045 >(
1046 this: *mut ffi::GstAppSink,
1047 _param_spec: glib::ffi::gpointer,
1048 f: glib::ffi::gpointer,
1049 ) {
1050 unsafe {
1051 let f: &F = &*(f as *const F);
1052 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1053 }
1054 }
1055 unsafe {
1056 let f: Box<F> = Box::new(f);
1057 glib::signal::connect_raw(
1058 self.as_ptr() as *mut _,
1059 b"notify::processing-deadline\0".as_ptr() as *const _,
1060 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1061 notify_processing_deadline_trampoline::<F> as *const (),
1062 )),
1063 Box::into_raw(f),
1064 )
1065 }
1066 }
1067
1068 #[doc(alias = "qos")]
1069 pub fn connect_qos_notify<F: Fn(&Self) + Send + Sync + 'static>(
1070 &self,
1071 f: F,
1072 ) -> glib::SignalHandlerId {
1073 unsafe extern "C" fn notify_qos_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
1074 this: *mut ffi::GstAppSink,
1075 _param_spec: glib::ffi::gpointer,
1076 f: glib::ffi::gpointer,
1077 ) {
1078 unsafe {
1079 let f: &F = &*(f as *const F);
1080 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1081 }
1082 }
1083 unsafe {
1084 let f: Box<F> = Box::new(f);
1085 glib::signal::connect_raw(
1086 self.as_ptr() as *mut _,
1087 b"notify::qos\0".as_ptr() as *const _,
1088 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1089 notify_qos_trampoline::<F> as *const (),
1090 )),
1091 Box::into_raw(f),
1092 )
1093 }
1094 }
1095
1096 #[doc(alias = "render-delay")]
1097 pub fn connect_render_delay_notify<F: Fn(&Self) + Send + Sync + 'static>(
1098 &self,
1099 f: F,
1100 ) -> glib::SignalHandlerId {
1101 unsafe extern "C" fn notify_render_delay_trampoline<
1102 F: Fn(&AppSink) + Send + Sync + 'static,
1103 >(
1104 this: *mut ffi::GstAppSink,
1105 _param_spec: glib::ffi::gpointer,
1106 f: glib::ffi::gpointer,
1107 ) {
1108 unsafe {
1109 let f: &F = &*(f as *const F);
1110 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1111 }
1112 }
1113 unsafe {
1114 let f: Box<F> = Box::new(f);
1115 glib::signal::connect_raw(
1116 self.as_ptr() as *mut _,
1117 b"notify::render-delay\0".as_ptr() as *const _,
1118 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1119 notify_render_delay_trampoline::<F> as *const (),
1120 )),
1121 Box::into_raw(f),
1122 )
1123 }
1124 }
1125
1126 #[cfg(feature = "v1_18")]
1127 #[cfg_attr(docsrs, doc(cfg(feature = "v1_18")))]
1128 #[doc(alias = "stats")]
1129 pub fn connect_stats_notify<F: Fn(&Self) + Send + Sync + 'static>(
1130 &self,
1131 f: F,
1132 ) -> glib::SignalHandlerId {
1133 unsafe extern "C" fn notify_stats_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
1134 this: *mut ffi::GstAppSink,
1135 _param_spec: glib::ffi::gpointer,
1136 f: glib::ffi::gpointer,
1137 ) {
1138 unsafe {
1139 let f: &F = &*(f as *const F);
1140 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1141 }
1142 }
1143 unsafe {
1144 let f: Box<F> = Box::new(f);
1145 glib::signal::connect_raw(
1146 self.as_ptr() as *mut _,
1147 b"notify::stats\0".as_ptr() as *const _,
1148 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1149 notify_stats_trampoline::<F> as *const (),
1150 )),
1151 Box::into_raw(f),
1152 )
1153 }
1154 }
1155
1156 #[doc(alias = "sync")]
1157 pub fn connect_sync_notify<F: Fn(&Self) + Send + Sync + 'static>(
1158 &self,
1159 f: F,
1160 ) -> glib::SignalHandlerId {
1161 unsafe extern "C" fn notify_sync_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
1162 this: *mut ffi::GstAppSink,
1163 _param_spec: glib::ffi::gpointer,
1164 f: glib::ffi::gpointer,
1165 ) {
1166 unsafe {
1167 let f: &F = &*(f as *const F);
1168 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1169 }
1170 }
1171 unsafe {
1172 let f: Box<F> = Box::new(f);
1173 glib::signal::connect_raw(
1174 self.as_ptr() as *mut _,
1175 b"notify::sync\0".as_ptr() as *const _,
1176 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1177 notify_sync_trampoline::<F> as *const (),
1178 )),
1179 Box::into_raw(f),
1180 )
1181 }
1182 }
1183
1184 #[doc(alias = "throttle-time")]
1185 pub fn connect_throttle_time_notify<F: Fn(&Self) + Send + Sync + 'static>(
1186 &self,
1187 f: F,
1188 ) -> glib::SignalHandlerId {
1189 unsafe extern "C" fn notify_throttle_time_trampoline<
1190 F: Fn(&AppSink) + Send + Sync + 'static,
1191 >(
1192 this: *mut ffi::GstAppSink,
1193 _param_spec: glib::ffi::gpointer,
1194 f: glib::ffi::gpointer,
1195 ) {
1196 unsafe {
1197 let f: &F = &*(f as *const F);
1198 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1199 }
1200 }
1201 unsafe {
1202 let f: Box<F> = Box::new(f);
1203 glib::signal::connect_raw(
1204 self.as_ptr() as *mut _,
1205 b"notify::throttle-time\0".as_ptr() as *const _,
1206 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1207 notify_throttle_time_trampoline::<F> as *const (),
1208 )),
1209 Box::into_raw(f),
1210 )
1211 }
1212 }
1213
1214 #[doc(alias = "ts-offset")]
1215 pub fn connect_ts_offset_notify<F: Fn(&Self) + Send + Sync + 'static>(
1216 &self,
1217 f: F,
1218 ) -> glib::SignalHandlerId {
1219 unsafe extern "C" fn notify_ts_offset_trampoline<
1220 F: Fn(&AppSink) + Send + Sync + 'static,
1221 >(
1222 this: *mut ffi::GstAppSink,
1223 _param_spec: glib::ffi::gpointer,
1224 f: glib::ffi::gpointer,
1225 ) {
1226 unsafe {
1227 let f: &F = &*(f as *const F);
1228 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1229 }
1230 }
1231 unsafe {
1232 let f: Box<F> = Box::new(f);
1233 glib::signal::connect_raw(
1234 self.as_ptr() as *mut _,
1235 b"notify::ts-offset\0".as_ptr() as *const _,
1236 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1237 notify_ts_offset_trampoline::<F> as *const (),
1238 )),
1239 Box::into_raw(f),
1240 )
1241 }
1242 }
1243
1244 pub fn stream(&self) -> AppSinkStream {
1245 AppSinkStream::new(self)
1246 }
1247}
1248
1249#[must_use = "The builder must be built to be used"]
1254pub struct AppSinkBuilder<'a> {
1255 builder: gst::gobject::GObjectBuilder<'a, AppSink>,
1256 callbacks: Option<AppSinkCallbacks>,
1257 drop_out_of_segment: Option<bool>,
1258}
1259
1260impl<'a> AppSinkBuilder<'a> {
1261 #[must_use = "Building the object from the builder is usually expensive and is not expected to have side effects"]
1269 pub fn build(self) -> AppSink {
1270 let appsink = self.builder.build().unwrap();
1271
1272 if let Some(callbacks) = self.callbacks {
1273 appsink.set_callbacks(callbacks);
1274 }
1275
1276 if let Some(drop_out_of_segment) = self.drop_out_of_segment {
1277 appsink.set_drop_out_of_segment(drop_out_of_segment);
1278 }
1279
1280 appsink
1281 }
1282
1283 pub fn async_(self, async_: bool) -> Self {
1284 Self {
1285 builder: self.builder.property("async", async_),
1286 ..self
1287 }
1288 }
1289
1290 pub fn buffer_list(self, buffer_list: bool) -> Self {
1291 Self {
1292 builder: self.builder.property("buffer-list", buffer_list),
1293 ..self
1294 }
1295 }
1296
1297 pub fn callbacks(self, callbacks: AppSinkCallbacks) -> Self {
1298 Self {
1299 callbacks: Some(callbacks),
1300 ..self
1301 }
1302 }
1303
1304 pub fn caps(self, caps: &'a gst::Caps) -> Self {
1305 Self {
1306 builder: self.builder.property("caps", caps),
1307 ..self
1308 }
1309 }
1310
1311 #[cfg_attr(feature = "v1_28", deprecated = "Since 1.28")]
1312 #[allow(deprecated)]
1313 pub fn drop(self, drop: bool) -> Self {
1314 Self {
1315 builder: self.builder.property("drop", drop),
1316 ..self
1317 }
1318 }
1319
1320 pub fn drop_out_of_segment(self, drop_out_of_segment: bool) -> Self {
1321 Self {
1322 drop_out_of_segment: Some(drop_out_of_segment),
1323 ..self
1324 }
1325 }
1326
1327 pub fn enable_last_sample(self, enable_last_sample: bool) -> Self {
1328 Self {
1329 builder: self
1330 .builder
1331 .property("enable-last-sample", enable_last_sample),
1332 ..self
1333 }
1334 }
1335
1336 pub fn max_bitrate(self, max_bitrate: u64) -> Self {
1337 Self {
1338 builder: self.builder.property("max-bitrate", max_bitrate),
1339 ..self
1340 }
1341 }
1342
1343 pub fn max_buffers(self, max_buffers: u32) -> Self {
1344 Self {
1345 builder: self.builder.property("max-buffers", max_buffers),
1346 ..self
1347 }
1348 }
1349
1350 pub fn max_lateness(self, max_lateness: i64) -> Self {
1351 Self {
1352 builder: self.builder.property("max-lateness", max_lateness),
1353 ..self
1354 }
1355 }
1356
1357 #[cfg(feature = "v1_16")]
1358 #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
1359 pub fn processing_deadline(self, processing_deadline: gst::ClockTime) -> Self {
1360 Self {
1361 builder: self
1362 .builder
1363 .property("processing-deadline", processing_deadline),
1364 ..self
1365 }
1366 }
1367
1368 pub fn qos(self, qos: bool) -> Self {
1369 Self {
1370 builder: self.builder.property("qos", qos),
1371 ..self
1372 }
1373 }
1374
1375 pub fn render_delay(self, render_delay: Option<gst::ClockTime>) -> Self {
1376 Self {
1377 builder: self.builder.property("render-delay", render_delay),
1378 ..self
1379 }
1380 }
1381
1382 pub fn sync(self, sync: bool) -> Self {
1383 Self {
1384 builder: self.builder.property("sync", sync),
1385 ..self
1386 }
1387 }
1388
1389 pub fn throttle_time(self, throttle_time: u64) -> Self {
1390 Self {
1391 builder: self.builder.property("throttle-time", throttle_time),
1392 ..self
1393 }
1394 }
1395
1396 pub fn ts_offset(self, ts_offset: gst::ClockTimeDiff) -> Self {
1397 Self {
1398 builder: self.builder.property("ts-offset", ts_offset),
1399 ..self
1400 }
1401 }
1402
1403 pub fn wait_on_eos(self, wait_on_eos: bool) -> Self {
1404 Self {
1405 builder: self.builder.property("wait-on-eos", wait_on_eos),
1406 ..self
1407 }
1408 }
1409
1410 #[cfg(feature = "v1_24")]
1411 #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
1412 pub fn max_time(self, max_time: Option<gst::ClockTime>) -> Self {
1413 Self {
1414 builder: self.builder.property("max-time", max_time),
1415 ..self
1416 }
1417 }
1418
1419 #[cfg(feature = "v1_24")]
1420 #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
1421 pub fn max_bytes(self, max_bytes: u64) -> Self {
1422 Self {
1423 builder: self.builder.property("max-bytes", max_bytes),
1424 ..self
1425 }
1426 }
1427
1428 #[cfg(feature = "v1_28")]
1429 #[cfg_attr(docsrs, doc(cfg(feature = "v1_28")))]
1430 pub fn leaky_type(self, leaky_type: crate::AppLeakyType) -> Self {
1431 Self {
1432 builder: self.builder.property("leaky-type", leaky_type),
1433 ..self
1434 }
1435 }
1436
1437 #[cfg(feature = "v1_28")]
1438 #[cfg_attr(docsrs, doc(cfg(feature = "v1_28")))]
1439 pub fn silent(self, silent: bool) -> Self {
1440 Self {
1441 builder: self.builder.property("silent", silent),
1442 ..self
1443 }
1444 }
1445
1446 #[inline]
1451 pub fn property(self, name: &'a str, value: impl Into<glib::Value> + 'a) -> Self {
1452 Self {
1453 builder: self.builder.property(name, value),
1454 ..self
1455 }
1456 }
1457
1458 #[inline]
1461 pub fn property_from_str(self, name: &'a str, value: &'a str) -> Self {
1462 Self {
1463 builder: self.builder.property_from_str(name, value),
1464 ..self
1465 }
1466 }
1467
1468 gst::impl_builder_gvalue_extra_setters!(property_and_name);
1469}
1470
1471#[derive(Debug)]
1472pub struct AppSinkStream {
1473 app_sink: glib::WeakRef<AppSink>,
1474 waker_reference: Arc<Mutex<Option<Waker>>>,
1475}
1476
1477impl AppSinkStream {
1478 fn new(app_sink: &AppSink) -> Self {
1479 skip_assert_initialized!();
1480
1481 let waker_reference = Arc::new(Mutex::new(None as Option<Waker>));
1482
1483 app_sink.set_callbacks(
1484 AppSinkCallbacks::builder()
1485 .new_sample({
1486 let waker_reference = Arc::clone(&waker_reference);
1487
1488 move |_| {
1489 if let Some(waker) = waker_reference.lock().unwrap().take() {
1490 waker.wake();
1491 }
1492
1493 Ok(gst::FlowSuccess::Ok)
1494 }
1495 })
1496 .eos({
1497 let waker_reference = Arc::clone(&waker_reference);
1498
1499 move |_| {
1500 if let Some(waker) = waker_reference.lock().unwrap().take() {
1501 waker.wake();
1502 }
1503 }
1504 })
1505 .build(),
1506 );
1507
1508 Self {
1509 app_sink: app_sink.downgrade(),
1510 waker_reference,
1511 }
1512 }
1513}
1514
1515impl Drop for AppSinkStream {
1516 fn drop(&mut self) {
1517 #[cfg(not(feature = "v1_18"))]
1518 {
1519 if gst::version() >= (1, 16, 3, 0)
1522 && let Some(app_sink) = self.app_sink.upgrade()
1523 {
1524 app_sink.set_callbacks(AppSinkCallbacks::builder().build());
1525 }
1526 }
1527 }
1528}
1529
1530impl Stream for AppSinkStream {
1531 type Item = gst::Sample;
1532
1533 fn poll_next(self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>> {
1534 let mut waker = self.waker_reference.lock().unwrap();
1535
1536 let Some(app_sink) = self.app_sink.upgrade() else {
1537 return Poll::Ready(None);
1538 };
1539
1540 app_sink
1541 .try_pull_sample(gst::ClockTime::ZERO)
1542 .map(|sample| Poll::Ready(Some(sample)))
1543 .unwrap_or_else(|| {
1544 if app_sink.is_eos() {
1545 return Poll::Ready(None);
1546 }
1547
1548 waker.replace(context.waker().to_owned());
1549
1550 Poll::Pending
1551 })
1552 }
1553}
1554
#[cfg(test)]
mod tests {
    use futures_util::StreamExt;
    use gst::prelude::*;

    use super::*;

    /// Runs a `videotestsrc` limited to five buffers into an `appsink` and
    /// checks that the stream yields exactly five samples before it ends.
    #[test]
    fn test_app_sink_stream() {
        gst::init().unwrap();

        let pipeline = gst::Pipeline::new();

        let source = gst::ElementFactory::make("videotestsrc")
            .property("num-buffers", 5)
            .build()
            .unwrap();
        let sink = gst::ElementFactory::make("appsink").build().unwrap();

        pipeline.add(&source).unwrap();
        pipeline.add(&sink).unwrap();
        source.link(&sink).unwrap();

        // The stream is created before the pipeline starts so that its
        // callbacks are in place when the first sample arrives.
        let stream = sink.dynamic_cast::<AppSink>().unwrap().stream();
        let collect_samples = stream.collect::<Vec<gst::Sample>>();

        pipeline.set_state(gst::State::Playing).unwrap();
        let samples: Vec<gst::Sample> = futures_executor::block_on(collect_samples);
        pipeline.set_state(gst::State::Null).unwrap();

        assert_eq!(samples.len(), 5);
    }
}