1use std::{
4 mem, panic,
5 pin::Pin,
6 ptr,
7 sync::{Arc, Mutex},
8 task::{Context, Poll, Waker},
9};
10
11#[cfg(not(panic = "abort"))]
12use std::sync::atomic::{AtomicBool, Ordering};
13
14use futures_sink::Sink;
15use glib::{
16 ffi::{gboolean, gpointer},
17 prelude::*,
18 translate::*,
19};
20
21use crate::{AppSrc, ffi};
22
/// Rust closures to be invoked by an `appsrc` element, bundled with the C
/// callback table that dispatches to them.
///
/// Instances are created through [`AppSrcCallbacks::builder`] and handed to
/// [`AppSrc::set_callbacks`].
#[allow(clippy::type_complexity)]
pub struct AppSrcCallbacks {
    // Called by `trampoline_need_data` with the element and the `u32` length argument.
    need_data: Option<Box<dyn FnMut(&AppSrc, u32) + Send + 'static>>,
    // Called by `trampoline_enough_data` with the element.
    enough_data: Option<Box<dyn Fn(&AppSrc) + Send + Sync + 'static>>,
    // Called by `trampoline_seek_data` with the element and the `u64` offset; its
    // `bool` result is converted to a `gboolean` for the C caller.
    seek_data: Option<Box<dyn Fn(&AppSrc, u64) -> bool + Send + Sync + 'static>>,
    // Set once any closure panics; from then on the trampolines skip the closures
    // and only post a panic error message on the element.
    #[cfg(not(panic = "abort"))]
    panicked: AtomicBool,
    // C function-pointer table passed to `gst_app_src_set_callbacks`; a slot is
    // `Some` only when the matching closure above is present.
    callbacks: ffi::GstAppSrcCallbacks,
}

// NOTE(review): `need_data` is only `Send` (not `Sync`) and is called through a
// `*mut` pointer; these impls presumably rely on GStreamer never invoking the
// callbacks concurrently — confirm against the appsrc threading guarantees.
unsafe impl Send for AppSrcCallbacks {}
unsafe impl Sync for AppSrcCallbacks {}
35
36impl AppSrcCallbacks {
37 pub fn builder() -> AppSrcCallbacksBuilder {
38 skip_assert_initialized!();
39
40 AppSrcCallbacksBuilder {
41 need_data: None,
42 enough_data: None,
43 seek_data: None,
44 }
45 }
46}
47
/// Builder collecting the optional closures that make up an
/// [`AppSrcCallbacks`].
///
/// Each field mirrors the field of the same name on [`AppSrcCallbacks`] and
/// stays `None` until the corresponding setter is called.
#[allow(clippy::type_complexity)]
#[must_use = "The builder must be built to be used"]
pub struct AppSrcCallbacksBuilder {
    // Closure taking the element and a `u32` length; may mutate its captured state.
    need_data: Option<Box<dyn FnMut(&AppSrc, u32) + Send + 'static>>,
    // Closure taking only the element.
    enough_data: Option<Box<dyn Fn(&AppSrc) + Send + Sync + 'static>>,
    // Closure taking the element and a `u64` offset, returning a `bool`.
    seek_data: Option<Box<dyn Fn(&AppSrc, u64) -> bool + Send + Sync + 'static>>,
}
55
56impl AppSrcCallbacksBuilder {
57 pub fn need_data<F: FnMut(&AppSrc, u32) + Send + 'static>(self, need_data: F) -> Self {
58 Self {
59 need_data: Some(Box::new(need_data)),
60 ..self
61 }
62 }
63
64 pub fn need_data_if<F: FnMut(&AppSrc, u32) + Send + 'static>(
65 self,
66 need_data: F,
67 predicate: bool,
68 ) -> Self {
69 if predicate {
70 self.need_data(need_data)
71 } else {
72 self
73 }
74 }
75
76 pub fn need_data_if_some<F: FnMut(&AppSrc, u32) + Send + 'static>(
77 self,
78 need_data: Option<F>,
79 ) -> Self {
80 if let Some(need_data) = need_data {
81 self.need_data(need_data)
82 } else {
83 self
84 }
85 }
86
87 pub fn enough_data<F: Fn(&AppSrc) + Send + Sync + 'static>(self, enough_data: F) -> Self {
88 Self {
89 enough_data: Some(Box::new(enough_data)),
90 ..self
91 }
92 }
93
94 pub fn enough_data_if<F: Fn(&AppSrc) + Send + Sync + 'static>(
95 self,
96 enough_data: F,
97 predicate: bool,
98 ) -> Self {
99 if predicate {
100 self.enough_data(enough_data)
101 } else {
102 self
103 }
104 }
105
106 pub fn enough_data_if_some<F: Fn(&AppSrc) + Send + Sync + 'static>(
107 self,
108 enough_data: Option<F>,
109 ) -> Self {
110 if let Some(enough_data) = enough_data {
111 self.enough_data(enough_data)
112 } else {
113 self
114 }
115 }
116
117 pub fn seek_data<F: Fn(&AppSrc, u64) -> bool + Send + Sync + 'static>(
118 self,
119 seek_data: F,
120 ) -> Self {
121 Self {
122 seek_data: Some(Box::new(seek_data)),
123 ..self
124 }
125 }
126
127 pub fn seek_data_if<F: Fn(&AppSrc, u64) -> bool + Send + Sync + 'static>(
128 self,
129 seek_data: F,
130 predicate: bool,
131 ) -> Self {
132 if predicate {
133 self.seek_data(seek_data)
134 } else {
135 self
136 }
137 }
138
139 pub fn seek_data_if_some<F: Fn(&AppSrc, u64) -> bool + Send + Sync + 'static>(
140 self,
141 seek_data: Option<F>,
142 ) -> Self {
143 if let Some(seek_data) = seek_data {
144 self.seek_data(seek_data)
145 } else {
146 self
147 }
148 }
149
150 #[must_use = "Building the callbacks without using them has no effect"]
151 pub fn build(self) -> AppSrcCallbacks {
152 let have_need_data = self.need_data.is_some();
153 let have_enough_data = self.enough_data.is_some();
154 let have_seek_data = self.seek_data.is_some();
155
156 AppSrcCallbacks {
157 need_data: self.need_data,
158 enough_data: self.enough_data,
159 seek_data: self.seek_data,
160 #[cfg(not(panic = "abort"))]
161 panicked: AtomicBool::new(false),
162 callbacks: ffi::GstAppSrcCallbacks {
163 need_data: if have_need_data {
164 Some(trampoline_need_data)
165 } else {
166 None
167 },
168 enough_data: if have_enough_data {
169 Some(trampoline_enough_data)
170 } else {
171 None
172 },
173 seek_data: if have_seek_data {
174 Some(trampoline_seek_data)
175 } else {
176 None
177 },
178 _gst_reserved: [
179 ptr::null_mut(),
180 ptr::null_mut(),
181 ptr::null_mut(),
182 ptr::null_mut(),
183 ],
184 },
185 }
186 }
187}
188
189unsafe extern "C" fn trampoline_need_data(
190 appsrc: *mut ffi::GstAppSrc,
191 length: u32,
192 callbacks: gpointer,
193) {
194 unsafe {
195 let callbacks = callbacks as *mut AppSrcCallbacks;
196 let element: Borrowed<AppSrc> = from_glib_borrow(appsrc);
197
198 #[cfg(not(panic = "abort"))]
199 if (*callbacks).panicked.load(Ordering::Relaxed) {
200 let element: Borrowed<AppSrc> = from_glib_borrow(appsrc);
201 gst::subclass::post_panic_error_message(
202 element.upcast_ref(),
203 element.upcast_ref(),
204 None,
205 );
206 return;
207 }
208
209 if let Some(ref mut need_data) = (*callbacks).need_data {
210 let result =
211 panic::catch_unwind(panic::AssertUnwindSafe(|| need_data(&element, length)));
212 match result {
213 Ok(result) => result,
214 Err(err) => {
215 #[cfg(panic = "abort")]
216 {
217 unreachable!("{err:?}");
218 }
219 #[cfg(not(panic = "abort"))]
220 {
221 (*callbacks).panicked.store(true, Ordering::Relaxed);
222 gst::subclass::post_panic_error_message(
223 element.upcast_ref(),
224 element.upcast_ref(),
225 Some(err),
226 );
227 }
228 }
229 }
230 }
231 }
232}
233
234unsafe extern "C" fn trampoline_enough_data(appsrc: *mut ffi::GstAppSrc, callbacks: gpointer) {
235 unsafe {
236 let callbacks = callbacks as *const AppSrcCallbacks;
237 let element: Borrowed<AppSrc> = from_glib_borrow(appsrc);
238
239 #[cfg(not(panic = "abort"))]
240 if (*callbacks).panicked.load(Ordering::Relaxed) {
241 let element: Borrowed<AppSrc> = from_glib_borrow(appsrc);
242 gst::subclass::post_panic_error_message(
243 element.upcast_ref(),
244 element.upcast_ref(),
245 None,
246 );
247 return;
248 }
249
250 if let Some(ref enough_data) = (*callbacks).enough_data {
251 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| enough_data(&element)));
252 match result {
253 Ok(result) => result,
254 Err(err) => {
255 #[cfg(panic = "abort")]
256 {
257 unreachable!("{err:?}");
258 }
259 #[cfg(not(panic = "abort"))]
260 {
261 (*callbacks).panicked.store(true, Ordering::Relaxed);
262 gst::subclass::post_panic_error_message(
263 element.upcast_ref(),
264 element.upcast_ref(),
265 Some(err),
266 );
267 }
268 }
269 }
270 }
271 }
272}
273
274unsafe extern "C" fn trampoline_seek_data(
275 appsrc: *mut ffi::GstAppSrc,
276 offset: u64,
277 callbacks: gpointer,
278) -> gboolean {
279 unsafe {
280 let callbacks = callbacks as *const AppSrcCallbacks;
281 let element: Borrowed<AppSrc> = from_glib_borrow(appsrc);
282
283 #[cfg(not(panic = "abort"))]
284 if (*callbacks).panicked.load(Ordering::Relaxed) {
285 let element: Borrowed<AppSrc> = from_glib_borrow(appsrc);
286 gst::subclass::post_panic_error_message(
287 element.upcast_ref(),
288 element.upcast_ref(),
289 None,
290 );
291 return false.into_glib();
292 }
293
294 let ret = if let Some(ref seek_data) = (*callbacks).seek_data {
295 let result =
296 panic::catch_unwind(panic::AssertUnwindSafe(|| seek_data(&element, offset)));
297 match result {
298 Ok(result) => result,
299 Err(err) => {
300 #[cfg(panic = "abort")]
301 {
302 unreachable!("{err:?}");
303 }
304 #[cfg(not(panic = "abort"))]
305 {
306 (*callbacks).panicked.store(true, Ordering::Relaxed);
307 gst::subclass::post_panic_error_message(
308 element.upcast_ref(),
309 element.upcast_ref(),
310 Some(err),
311 );
312
313 false
314 }
315 }
316 }
317 } else {
318 false
319 };
320
321 ret.into_glib()
322 }
323}
324
325unsafe extern "C" fn destroy_callbacks(ptr: gpointer) {
326 unsafe {
327 let _ = Box::<AppSrcCallbacks>::from_raw(ptr as *mut _);
328 }
329}
330
331impl AppSrc {
332 pub fn builder<'a>() -> AppSrcBuilder<'a> {
337 assert_initialized_main_thread!();
338 AppSrcBuilder {
339 builder: gst::Object::builder(),
340 callbacks: None,
341 automatic_eos: None,
342 }
343 }
344
345 #[doc(alias = "gst_app_src_set_callbacks")]
360 pub fn set_callbacks(&self, callbacks: AppSrcCallbacks) {
361 unsafe {
362 let src = self.to_glib_none().0;
363 #[allow(clippy::manual_dangling_ptr)]
364 #[cfg(not(feature = "v1_18"))]
365 {
366 static SET_ONCE_QUARK: std::sync::OnceLock<glib::Quark> =
367 std::sync::OnceLock::new();
368
369 let set_once_quark = SET_ONCE_QUARK
370 .get_or_init(|| glib::Quark::from_str("gstreamer-rs-app-src-callbacks"));
371
372 if gst::version() < (1, 16, 3, 0) {
375 if !glib::gobject_ffi::g_object_get_qdata(
376 src as *mut _,
377 set_once_quark.into_glib(),
378 )
379 .is_null()
380 {
381 panic!("AppSrc callbacks can only be set once");
382 }
383
384 glib::gobject_ffi::g_object_set_qdata(
385 src as *mut _,
386 set_once_quark.into_glib(),
387 1 as *mut _,
388 );
389 }
390 }
391
392 ffi::gst_app_src_set_callbacks(
393 src,
394 mut_override(&callbacks.callbacks),
395 Box::into_raw(Box::new(callbacks)) as *mut _,
396 Some(destroy_callbacks),
397 );
398 }
399 }
400
401 #[doc(alias = "gst_app_src_set_latency")]
408 pub fn set_latency(
409 &self,
410 min: impl Into<Option<gst::ClockTime>>,
411 max: impl Into<Option<gst::ClockTime>>,
412 ) {
413 unsafe {
414 ffi::gst_app_src_set_latency(
415 self.to_glib_none().0,
416 min.into().into_glib(),
417 max.into().into_glib(),
418 );
419 }
420 }
421
422 #[doc(alias = "get_latency")]
433 #[doc(alias = "gst_app_src_get_latency")]
434 pub fn latency(&self) -> (Option<gst::ClockTime>, Option<gst::ClockTime>) {
435 unsafe {
436 let mut min = mem::MaybeUninit::uninit();
437 let mut max = mem::MaybeUninit::uninit();
438 ffi::gst_app_src_get_latency(self.to_glib_none().0, min.as_mut_ptr(), max.as_mut_ptr());
439 (from_glib(min.assume_init()), from_glib(max.assume_init()))
440 }
441 }
442
443 #[doc(alias = "do-timestamp")]
444 #[doc(alias = "gst_base_src_set_do_timestamp")]
445 pub fn set_do_timestamp(&self, timestamp: bool) {
446 unsafe {
447 gst_base::ffi::gst_base_src_set_do_timestamp(
448 self.as_ptr() as *mut gst_base::ffi::GstBaseSrc,
449 timestamp.into_glib(),
450 );
451 }
452 }
453
454 #[doc(alias = "do-timestamp")]
455 #[doc(alias = "gst_base_src_get_do_timestamp")]
456 pub fn do_timestamp(&self) -> bool {
457 unsafe {
458 from_glib(gst_base::ffi::gst_base_src_get_do_timestamp(
459 self.as_ptr() as *mut gst_base::ffi::GstBaseSrc
460 ))
461 }
462 }
463
464 #[doc(alias = "do-timestamp")]
465 pub fn connect_do_timestamp_notify<F: Fn(&Self) + Send + Sync + 'static>(
466 &self,
467 f: F,
468 ) -> glib::SignalHandlerId {
469 unsafe extern "C" fn notify_do_timestamp_trampoline<
470 F: Fn(&AppSrc) + Send + Sync + 'static,
471 >(
472 this: *mut ffi::GstAppSrc,
473 _param_spec: glib::ffi::gpointer,
474 f: glib::ffi::gpointer,
475 ) {
476 unsafe {
477 let f: &F = &*(f as *const F);
478 f(&AppSrc::from_glib_borrow(this))
479 }
480 }
481 unsafe {
482 let f: Box<F> = Box::new(f);
483 glib::signal::connect_raw(
484 self.as_ptr() as *mut _,
485 b"notify::do-timestamp\0".as_ptr() as *const _,
486 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
487 notify_do_timestamp_trampoline::<F> as *const (),
488 )),
489 Box::into_raw(f),
490 )
491 }
492 }
493
494 #[doc(alias = "set-automatic-eos")]
495 #[doc(alias = "gst_base_src_set_automatic_eos")]
496 pub fn set_automatic_eos(&self, automatic_eos: bool) {
497 unsafe {
498 gst_base::ffi::gst_base_src_set_automatic_eos(
499 self.as_ptr() as *mut gst_base::ffi::GstBaseSrc,
500 automatic_eos.into_glib(),
501 );
502 }
503 }
504
505 pub fn sink(&self) -> AppSrcSink {
506 AppSrcSink::new(self)
507 }
508}
509
/// Builder for [`AppSrc`] elements, created by [`AppSrc::builder`].
///
/// Properties are accumulated in the wrapped `GObjectBuilder`; `callbacks` and
/// `automatic_eos` are kept separately and applied to the element after it
/// has been constructed in `build`.
#[must_use = "The builder must be built to be used"]
pub struct AppSrcBuilder<'a> {
    // Collects the GObject properties to construct the element with.
    builder: gst::gobject::GObjectBuilder<'a, AppSrc>,
    // Installed with `AppSrc::set_callbacks` after construction, if set.
    callbacks: Option<AppSrcCallbacks>,
    // Applied with `AppSrc::set_automatic_eos` after construction, if set.
    automatic_eos: Option<bool>,
}
520
521impl<'a> AppSrcBuilder<'a> {
522 #[must_use = "Building the object from the builder is usually expensive and is not expected to have side effects"]
530 pub fn build(self) -> AppSrc {
531 let appsrc = self.builder.build().unwrap();
532
533 if let Some(callbacks) = self.callbacks {
534 appsrc.set_callbacks(callbacks);
535 }
536
537 if let Some(automatic_eos) = self.automatic_eos {
538 appsrc.set_automatic_eos(automatic_eos);
539 }
540
541 appsrc
542 }
543
544 pub fn automatic_eos(self, automatic_eos: bool) -> Self {
545 Self {
546 automatic_eos: Some(automatic_eos),
547 ..self
548 }
549 }
550
551 pub fn block(self, block: bool) -> Self {
552 Self {
553 builder: self.builder.property("block", block),
554 ..self
555 }
556 }
557
558 pub fn callbacks(self, callbacks: AppSrcCallbacks) -> Self {
559 Self {
560 callbacks: Some(callbacks),
561 ..self
562 }
563 }
564
565 pub fn caps(self, caps: &'a gst::Caps) -> Self {
566 Self {
567 builder: self.builder.property("caps", caps),
568 ..self
569 }
570 }
571
572 pub fn do_timestamp(self, do_timestamp: bool) -> Self {
573 Self {
574 builder: self.builder.property("do-timestamp", do_timestamp),
575 ..self
576 }
577 }
578
579 pub fn duration(self, duration: u64) -> Self {
580 Self {
581 builder: self.builder.property("duration", duration),
582 ..self
583 }
584 }
585
586 pub fn format(self, format: gst::Format) -> Self {
587 Self {
588 builder: self.builder.property("format", format),
589 ..self
590 }
591 }
592
593 #[cfg(feature = "v1_18")]
594 #[cfg_attr(docsrs, doc(cfg(feature = "v1_18")))]
595 pub fn handle_segment_change(self, handle_segment_change: bool) -> Self {
596 Self {
597 builder: self
598 .builder
599 .property("handle-segment-change", handle_segment_change),
600 ..self
601 }
602 }
603
604 pub fn is_live(self, is_live: bool) -> Self {
605 Self {
606 builder: self.builder.property("is-live", is_live),
607 ..self
608 }
609 }
610
611 #[cfg(feature = "v1_20")]
612 #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
613 pub fn leaky_type(self, leaky_type: crate::AppLeakyType) -> Self {
614 Self {
615 builder: self.builder.property("leaky-type", leaky_type),
616 ..self
617 }
618 }
619
620 #[cfg(feature = "v1_20")]
621 #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
622 pub fn max_buffers(self, max_buffers: u64) -> Self {
623 Self {
624 builder: self.builder.property("max-buffers", max_buffers),
625 ..self
626 }
627 }
628
629 pub fn max_bytes(self, max_bytes: u64) -> Self {
630 Self {
631 builder: self.builder.property("max-bytes", max_bytes),
632 ..self
633 }
634 }
635
636 pub fn max_latency(self, max_latency: i64) -> Self {
637 Self {
638 builder: self.builder.property("max-latency", max_latency),
639 ..self
640 }
641 }
642
643 #[cfg(feature = "v1_20")]
644 #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
645 pub fn max_time(self, max_time: gst::ClockTime) -> Self {
646 Self {
647 builder: self.builder.property("max-time", max_time),
648 ..self
649 }
650 }
651
652 pub fn min_latency(self, min_latency: i64) -> Self {
653 Self {
654 builder: self.builder.property("min-latency", min_latency),
655 ..self
656 }
657 }
658
659 pub fn min_percent(self, min_percent: u32) -> Self {
660 Self {
661 builder: self.builder.property("min-percent", min_percent),
662 ..self
663 }
664 }
665
666 pub fn size(self, size: i64) -> Self {
667 Self {
668 builder: self.builder.property("size", size),
669 ..self
670 }
671 }
672
673 pub fn stream_type(self, stream_type: crate::AppStreamType) -> Self {
674 Self {
675 builder: self.builder.property("stream-type", stream_type),
676 ..self
677 }
678 }
679
680 #[cfg(feature = "v1_28")]
681 #[cfg_attr(docsrs, doc(cfg(feature = "v1_28")))]
682 pub fn silent(self, silent: bool) -> Self {
683 Self {
684 builder: self.builder.property("silent", silent),
685 ..self
686 }
687 }
688
689 #[inline]
694 pub fn property(self, name: &'a str, value: impl Into<glib::Value> + 'a) -> Self {
695 Self {
696 builder: self.builder.property(name, value),
697 ..self
698 }
699 }
700
701 #[inline]
704 pub fn property_from_str(self, name: &'a str, value: &'a str) -> Self {
705 Self {
706 builder: self.builder.property_from_str(name, value),
707 ..self
708 }
709 }
710
711 gst::impl_builder_gvalue_extra_setters!(property_and_name);
712}
713
/// `futures_sink::Sink` adapter that pushes `gst::Sample`s into an [`AppSrc`].
///
/// Created through [`AppSrc::sink`].
#[derive(Debug)]
pub struct AppSrcSink {
    // Weak reference, so the sink does not keep the element alive.
    app_src: glib::WeakRef<AppSrc>,
    // Waker stored by `poll_ready` when it returns `Pending`; taken and woken
    // by the `need_data` callback installed in `AppSrcSink::new`.
    waker_reference: Arc<Mutex<Option<Waker>>>,
}
719
720impl AppSrcSink {
721 fn new(app_src: &AppSrc) -> Self {
722 skip_assert_initialized!();
723
724 let waker_reference = Arc::new(Mutex::new(None as Option<Waker>));
725
726 app_src.set_callbacks(
727 AppSrcCallbacks::builder()
728 .need_data({
729 let waker_reference = Arc::clone(&waker_reference);
730
731 move |_, _| {
732 if let Some(waker) = waker_reference.lock().unwrap().take() {
733 waker.wake();
734 }
735 }
736 })
737 .build(),
738 );
739
740 Self {
741 app_src: app_src.downgrade(),
742 waker_reference,
743 }
744 }
745}
746
747impl Drop for AppSrcSink {
748 fn drop(&mut self) {
749 #[cfg(not(feature = "v1_18"))]
750 {
751 if gst::version() >= (1, 16, 3, 0)
754 && let Some(app_src) = self.app_src.upgrade()
755 {
756 app_src.set_callbacks(AppSrcCallbacks::builder().build());
757 }
758 }
759 }
760}
761
762impl Sink<gst::Sample> for AppSrcSink {
763 type Error = gst::FlowError;
764
765 fn poll_ready(self: Pin<&mut Self>, context: &mut Context) -> Poll<Result<(), Self::Error>> {
766 let mut waker = self.waker_reference.lock().unwrap();
767
768 let Some(app_src) = self.app_src.upgrade() else {
769 return Poll::Ready(Err(gst::FlowError::Eos));
770 };
771
772 let current_level_bytes = app_src.current_level_bytes();
773 let max_bytes = app_src.max_bytes();
774
775 if current_level_bytes >= max_bytes && max_bytes != 0 {
776 waker.replace(context.waker().to_owned());
777
778 Poll::Pending
779 } else {
780 Poll::Ready(Ok(()))
781 }
782 }
783
784 fn start_send(self: Pin<&mut Self>, sample: gst::Sample) -> Result<(), Self::Error> {
785 let Some(app_src) = self.app_src.upgrade() else {
786 return Err(gst::FlowError::Eos);
787 };
788
789 app_src.push_sample(&sample)?;
790
791 Ok(())
792 }
793
794 fn poll_flush(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
795 Poll::Ready(Ok(()))
796 }
797
798 fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
799 let Some(app_src) = self.app_src.upgrade() else {
800 return Poll::Ready(Ok(()));
801 };
802
803 app_src.end_of_stream()?;
804
805 Poll::Ready(Ok(()))
806 }
807}
808
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};

    use futures_util::{sink::SinkExt, stream::StreamExt};
    use gst::prelude::*;

    use super::*;

    // Sends a fixed number of samples through an `AppSrcSink` into a
    // `fakesink` and checks that every one of them triggers a handoff.
    #[test]
    fn test_app_src_sink() {
        gst::init().unwrap();

        let appsrc = gst::ElementFactory::make("appsrc").build().unwrap();
        let fakesink = gst::ElementFactory::make("fakesink")
            .property("signal-handoffs", true)
            .build()
            .unwrap();

        let pipeline = gst::Pipeline::new();
        pipeline.add(&appsrc).unwrap();
        pipeline.add(&fakesink).unwrap();

        appsrc.link(&fakesink).unwrap();

        let mut bus_stream = pipeline.bus().unwrap().stream();
        let mut app_src_sink = appsrc.dynamic_cast::<AppSrc>().unwrap().sink();

        let sample_quantity = 5;

        // Each sample wraps a fresh, empty buffer.
        let samples = (0..sample_quantity)
            .map(|_| gst::Sample::builder().buffer(&gst::Buffer::new()).build())
            .collect::<Vec<gst::Sample>>();

        let mut sample_stream = futures_util::stream::iter(samples).map(Ok);

        let handoff_count_reference = Arc::new(AtomicUsize::new(0));

        // Count every handoff signal emitted by the fakesink.
        fakesink.connect("handoff", false, {
            let handoff_count_reference = Arc::clone(&handoff_count_reference);

            move |_| {
                handoff_count_reference.fetch_add(1, Ordering::AcqRel);

                None
            }
        });

        pipeline.set_state(gst::State::Playing).unwrap();

        futures_executor::block_on(app_src_sink.send_all(&mut sample_stream)).unwrap();
        futures_executor::block_on(app_src_sink.close()).unwrap();

        // Closing the sink signals end-of-stream; wait for the EOS message on the
        // bus and treat any error message as a test failure.
        while let Some(message) = futures_executor::block_on(bus_stream.next()) {
            match message.view() {
                gst::MessageView::Eos(_) => break,
                gst::MessageView::Error(_) => unreachable!(),
                _ => continue,
            }
        }

        pipeline.set_state(gst::State::Null).unwrap();

        assert_eq!(
            handoff_count_reference.load(Ordering::Acquire),
            sample_quantity
        );
    }

    // Builds an `AppSrc` with borrowed builder arguments, once with the caps
    // declared in the outer scope and once with the string declared in the outer
    // scope, and checks the resulting properties both times.
    #[test]
    fn builder_caps_lt() {
        gst::init().unwrap();

        let caps = &gst::Caps::new_any();
        {
            let stream_type = "random-access".to_owned();
            let appsrc = AppSrc::builder()
                .property_from_str("stream-type", &stream_type)
                .caps(caps)
                .build();
            assert_eq!(
                appsrc.property::<crate::AppStreamType>("stream-type"),
                crate::AppStreamType::RandomAccess
            );
            assert!(appsrc.property::<gst::Caps>("caps").is_any());
        }

        let stream_type = &"random-access".to_owned();
        {
            let caps = &gst::Caps::new_any();
            let appsrc = AppSrc::builder()
                .property_from_str("stream-type", stream_type)
                .caps(caps)
                .build();
            assert_eq!(
                appsrc.property::<crate::AppStreamType>("stream-type"),
                crate::AppStreamType::RandomAccess
            );
            assert!(appsrc.property::<gst::Caps>("caps").is_any());
        }
    }
}