gstreamer/subclass/
buffer_pool.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::ptr;
4
5use glib::{
6    prelude::*,
7    subclass::{prelude::*, InitializingObject},
8    translate::*,
9};
10use libc::c_char;
11
12use super::prelude::*;
13use crate::{ffi, BufferPool, BufferPoolAcquireParams, BufferPoolConfigRef};
14
15pub trait BufferPoolImpl: GstObjectImpl + ObjectSubclass<Type: IsA<BufferPool>> {
16    /// Acquires a buffer from `self`. `buffer` should point to a memory location that
17    /// can hold a pointer to the new buffer. When the pool is empty, this function
18    /// will by default block until a buffer is released into the pool again or when
19    /// the pool is set to flushing or deactivated.
20    ///
21    /// `params` can contain optional parameters to influence the allocation.
22    /// ## `params`
23    /// parameters.
24    ///
25    /// # Returns
26    ///
27    /// a [`FlowReturn`][crate::FlowReturn] such as [`FlowReturn::Flushing`][crate::FlowReturn::Flushing] when the pool is
28    /// inactive.
29    ///
30    /// ## `buffer`
31    /// a location for a [`Buffer`][crate::Buffer]
32    fn acquire_buffer(
33        &self,
34        params: Option<&BufferPoolAcquireParams>,
35    ) -> Result<crate::Buffer, crate::FlowError> {
36        self.parent_acquire_buffer(params)
37    }
38
39    /// Allocate a buffer. the default implementation allocates
40    /// buffers from the configured memory allocator and with the configured
41    /// parameters. All metadata that is present on the allocated buffer will
42    /// be marked as [`MetaFlags::POOLED`][crate::MetaFlags::POOLED] and [`MetaFlags::LOCKED`][crate::MetaFlags::LOCKED] and will
43    /// not be removed from the buffer in `GstBufferPoolClass::reset_buffer`.
44    /// The buffer should have the [`BufferFlags::TAG_MEMORY`][crate::BufferFlags::TAG_MEMORY] cleared.
45    /// ## `params`
46    /// parameters.
47    ///
48    /// # Returns
49    ///
50    /// a [`FlowReturn`][crate::FlowReturn] to indicate whether the allocation was
51    /// successful.
52    ///
53    /// ## `buffer`
54    /// a location for a [`Buffer`][crate::Buffer]
55    fn alloc_buffer(
56        &self,
57        params: Option<&BufferPoolAcquireParams>,
58    ) -> Result<crate::Buffer, crate::FlowError> {
59        self.parent_alloc_buffer(params)
60    }
61
62    /// Enter the flushing state.
63    fn flush_start(&self) {
64        self.parent_flush_start()
65    }
66
67    /// Leave the flushing state.
68    fn flush_stop(&self) {
69        self.parent_flush_stop()
70    }
71
72    /// Free a buffer. The default implementation unrefs the buffer.
73    /// ## `buffer`
74    /// the [`Buffer`][crate::Buffer] to free
75    fn free_buffer(&self, buffer: crate::Buffer) {
76        self.parent_free_buffer(buffer)
77    }
78
79    /// Releases `buffer` to `self`. `buffer` should have previously been allocated from
80    /// `self` with [`BufferPoolExtManual::acquire_buffer()`][crate::prelude::BufferPoolExtManual::acquire_buffer()].
81    ///
82    /// This function is usually called automatically when the last ref on `buffer`
83    /// disappears.
84    /// ## `buffer`
85    /// a [`Buffer`][crate::Buffer]
86    fn release_buffer(&self, buffer: crate::Buffer) {
87        self.parent_release_buffer(buffer)
88    }
89
90    /// Reset the buffer to its state when it was freshly allocated.
91    /// The default implementation will clear the flags, timestamps and
92    /// will remove the metadata without the [`MetaFlags::POOLED`][crate::MetaFlags::POOLED] flag (even
93    /// the metadata with [`MetaFlags::LOCKED`][crate::MetaFlags::LOCKED]). If the
94    /// [`BufferFlags::TAG_MEMORY`][crate::BufferFlags::TAG_MEMORY] was set, this function can also try to
95    /// restore the memory and clear the [`BufferFlags::TAG_MEMORY`][crate::BufferFlags::TAG_MEMORY] again.
96    /// ## `buffer`
97    /// the [`Buffer`][crate::Buffer] to reset
98    fn reset_buffer(&self, buffer: &mut crate::BufferRef) {
99        self.parent_reset_buffer(buffer)
100    }
101
102    /// Start the bufferpool. The default implementation will preallocate
103    /// min-buffers buffers and put them in the queue.
104    ///
105    /// Subclasses do not need to chain up to the parent's default implementation
106    /// if they don't want min-buffers based preallocation.
107    ///
108    /// # Returns
109    ///
110    /// whether the pool could be started.
111    fn start(&self) -> bool {
112        self.parent_start()
113    }
114
115    /// Stop the bufferpool. the default implementation will free the
116    /// preallocated buffers. This function is called when all the buffers are
117    /// returned to the pool.
118    ///
119    /// # Returns
120    ///
121    /// whether the pool could be stopped.
122    fn stop(&self) -> bool {
123        self.parent_stop()
124    }
125
126    /// Gets a [`None`] terminated array of string with supported bufferpool options for
127    /// `self`. An option would typically be enabled with
128    /// `gst_buffer_pool_config_add_option()`.
129    ///
130    /// # Returns
131    ///
132    /// a [`None`] terminated array
133    ///  of strings.
134    fn options() -> &'static [&'static str] {
135        &[]
136    }
137
138    /// Sets the configuration of the pool. If the pool is already configured, and
139    /// the configuration hasn't changed, this function will return [`true`]. If the
140    /// pool is active, this method will return [`false`] and active configuration
141    /// will remain. Buffers allocated from this pool must be returned or else this
142    /// function will do nothing and return [`false`].
143    ///
144    /// `config` is a [`Structure`][crate::Structure] that contains the configuration parameters for
145    /// the pool. A default and mandatory set of parameters can be configured with
146    /// `gst_buffer_pool_config_set_params()`, `gst_buffer_pool_config_set_allocator()`
147    /// and `gst_buffer_pool_config_add_option()`.
148    ///
149    /// If the parameters in `config` can not be set exactly, this function returns
150    /// [`false`] and will try to update as much state as possible. The new state can
151    /// then be retrieved and refined with [`BufferPoolExtManual::config()`][crate::prelude::BufferPoolExtManual::config()].
152    ///
153    /// This function takes ownership of `config`.
154    /// ## `config`
155    /// a [`Structure`][crate::Structure]
156    ///
157    /// # Returns
158    ///
159    /// [`true`] when the configuration could be set.
160    fn set_config(&self, config: &mut BufferPoolConfigRef) -> bool {
161        self.parent_set_config(config)
162    }
163}
164
165pub trait BufferPoolImplExt: BufferPoolImpl {
166    fn parent_acquire_buffer(
167        &self,
168        params: Option<&BufferPoolAcquireParams>,
169    ) -> Result<crate::Buffer, crate::FlowError> {
170        unsafe {
171            let data = Self::type_data();
172            let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
173            if let Some(f) = (*parent_class).acquire_buffer {
174                let params_ptr = mut_override(params.to_glib_none().0);
175                let mut buffer = std::ptr::null_mut();
176
177                let result = f(
178                    self.obj()
179                        .unsafe_cast_ref::<crate::BufferPool>()
180                        .to_glib_none()
181                        .0,
182                    &mut buffer,
183                    params_ptr,
184                );
185
186                crate::FlowSuccess::try_from_glib(result).map(|_| from_glib_full(buffer))
187            } else {
188                Err(crate::FlowError::NotSupported)
189            }
190        }
191    }
192
193    fn parent_alloc_buffer(
194        &self,
195        params: Option<&BufferPoolAcquireParams>,
196    ) -> Result<crate::Buffer, crate::FlowError> {
197        unsafe {
198            let data = Self::type_data();
199            let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
200            if let Some(f) = (*parent_class).alloc_buffer {
201                let params_ptr = mut_override(params.to_glib_none().0);
202                let mut buffer = std::ptr::null_mut();
203
204                let result = f(
205                    self.obj()
206                        .unsafe_cast_ref::<crate::BufferPool>()
207                        .to_glib_none()
208                        .0,
209                    &mut buffer,
210                    params_ptr,
211                );
212
213                crate::FlowSuccess::try_from_glib(result).map(|_| from_glib_full(buffer))
214            } else {
215                Err(crate::FlowError::NotSupported)
216            }
217        }
218    }
219
220    fn parent_free_buffer(&self, buffer: crate::Buffer) {
221        unsafe {
222            let data = Self::type_data();
223            let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
224            if let Some(f) = (*parent_class).free_buffer {
225                f(
226                    self.obj()
227                        .unsafe_cast_ref::<crate::BufferPool>()
228                        .to_glib_none()
229                        .0,
230                    buffer.into_glib_ptr(),
231                )
232            }
233        }
234    }
235
236    fn parent_release_buffer(&self, buffer: crate::Buffer) {
237        unsafe {
238            let data = Self::type_data();
239            let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
240            if let Some(f) = (*parent_class).release_buffer {
241                f(
242                    self.obj()
243                        .unsafe_cast_ref::<crate::BufferPool>()
244                        .to_glib_none()
245                        .0,
246                    buffer.into_glib_ptr(),
247                )
248            }
249        }
250    }
251
252    fn parent_reset_buffer(&self, buffer: &mut crate::BufferRef) {
253        unsafe {
254            let data = Self::type_data();
255            let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
256            if let Some(f) = (*parent_class).reset_buffer {
257                f(
258                    self.obj()
259                        .unsafe_cast_ref::<crate::BufferPool>()
260                        .to_glib_none()
261                        .0,
262                    buffer.as_mut_ptr(),
263                )
264            }
265        }
266    }
267
268    fn parent_start(&self) -> bool {
269        unsafe {
270            let data = Self::type_data();
271            let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
272            if let Some(f) = (*parent_class).start {
273                let result = f(self
274                    .obj()
275                    .unsafe_cast_ref::<crate::BufferPool>()
276                    .to_glib_none()
277                    .0);
278
279                from_glib(result)
280            } else {
281                true
282            }
283        }
284    }
285
286    fn parent_stop(&self) -> bool {
287        unsafe {
288            let data = Self::type_data();
289            let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
290            if let Some(f) = (*parent_class).stop {
291                let result = f(self
292                    .obj()
293                    .unsafe_cast_ref::<crate::BufferPool>()
294                    .to_glib_none()
295                    .0);
296
297                from_glib(result)
298            } else {
299                true
300            }
301        }
302    }
303
304    fn parent_set_config(&self, config: &mut BufferPoolConfigRef) -> bool {
305        unsafe {
306            let data = Self::type_data();
307            let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
308            if let Some(f) = (*parent_class).set_config {
309                let result = f(
310                    self.obj()
311                        .unsafe_cast_ref::<crate::BufferPool>()
312                        .to_glib_none()
313                        .0,
314                    (*config).as_mut_ptr(),
315                );
316
317                from_glib(result)
318            } else {
319                false
320            }
321        }
322    }
323
324    fn parent_flush_start(&self) {
325        unsafe {
326            let data = Self::type_data();
327            let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
328            if let Some(f) = (*parent_class).flush_start {
329                f(self
330                    .obj()
331                    .unsafe_cast_ref::<crate::BufferPool>()
332                    .to_glib_none()
333                    .0)
334            }
335        }
336    }
337
338    fn parent_flush_stop(&self) {
339        unsafe {
340            let data = Self::type_data();
341            let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
342            if let Some(f) = (*parent_class).flush_stop {
343                f(self
344                    .obj()
345                    .unsafe_cast_ref::<crate::BufferPool>()
346                    .to_glib_none()
347                    .0)
348            }
349        }
350    }
351}
352
353impl<T: BufferPoolImpl> BufferPoolImplExt for T {}
354
355unsafe impl<T: BufferPoolImpl> IsSubclassable<T> for BufferPool {
356    fn class_init(klass: &mut glib::Class<Self>) {
357        Self::parent_class_init::<T>(klass);
358        let klass = klass.as_mut();
359        klass.acquire_buffer = Some(buffer_pool_acquire_buffer::<T>);
360        klass.alloc_buffer = Some(buffer_pool_alloc_buffer::<T>);
361        klass.release_buffer = Some(buffer_pool_release_buffer::<T>);
362        klass.reset_buffer = Some(buffer_pool_reset_buffer::<T>);
363        klass.start = Some(buffer_pool_start::<T>);
364        klass.stop = Some(buffer_pool_stop::<T>);
365        klass.get_options = Some(buffer_pool_get_options::<T>);
366        klass.set_config = Some(buffer_pool_set_config::<T>);
367        klass.flush_start = Some(buffer_pool_flush_start::<T>);
368        klass.flush_stop = Some(buffer_pool_flush_stop::<T>);
369        klass.free_buffer = Some(buffer_pool_free_buffer::<T>);
370    }
371
372    fn instance_init(instance: &mut InitializingObject<T>) {
373        Self::parent_instance_init(instance);
374
375        // Store the pool options in the instance data
376        // for later retrieval in buffer_pool_get_options
377        let options = T::options();
378        instance.set_instance_data(T::type_(), glib::StrV::from(options));
379    }
380}
381
382unsafe extern "C" fn buffer_pool_acquire_buffer<T: BufferPoolImpl>(
383    ptr: *mut ffi::GstBufferPool,
384    buffer: *mut *mut ffi::GstBuffer,
385    params: *mut ffi::GstBufferPoolAcquireParams,
386) -> ffi::GstFlowReturn {
387    let instance = &*(ptr as *mut T::Instance);
388    let imp = instance.imp();
389    let params: Option<BufferPoolAcquireParams> = from_glib_none(params);
390
391    match imp.acquire_buffer(params.as_ref()) {
392        Ok(b) => {
393            *buffer = b.into_glib_ptr();
394            ffi::GST_FLOW_OK
395        }
396        Err(err) => err.into_glib(),
397    }
398}
399
400unsafe extern "C" fn buffer_pool_alloc_buffer<T: BufferPoolImpl>(
401    ptr: *mut ffi::GstBufferPool,
402    buffer: *mut *mut ffi::GstBuffer,
403    params: *mut ffi::GstBufferPoolAcquireParams,
404) -> ffi::GstFlowReturn {
405    let instance = &*(ptr as *mut T::Instance);
406    let imp = instance.imp();
407    let params: Option<BufferPoolAcquireParams> = from_glib_none(params);
408
409    match imp.alloc_buffer(params.as_ref()) {
410        Ok(b) => {
411            *buffer = b.into_glib_ptr();
412            ffi::GST_FLOW_OK
413        }
414        Err(err) => err.into_glib(),
415    }
416}
417
418unsafe extern "C" fn buffer_pool_flush_start<T: BufferPoolImpl>(ptr: *mut ffi::GstBufferPool) {
419    // the GstBufferPool implementation calls this
420    // in finalize where the ref_count will already
421    // be zero and we are actually destroyed
422    // see: https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/1645
423    if (*(ptr as *const glib::gobject_ffi::GObject)).ref_count == 0 {
424        // flush_start is a no-op in GstBufferPool
425        return;
426    }
427
428    let instance = &*(ptr as *mut T::Instance);
429    let imp = instance.imp();
430    imp.flush_start();
431}
432
433unsafe extern "C" fn buffer_pool_flush_stop<T: BufferPoolImpl>(ptr: *mut ffi::GstBufferPool) {
434    // the GstBufferPool implementation calls this
435    // in finalize where the ref_count will already
436    // be zero and we are actually destroyed
437    // see: https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/1645
438    if (*(ptr as *const glib::gobject_ffi::GObject)).ref_count == 0 {
439        // flush_stop is a no-op in GstBufferPool
440        return;
441    }
442
443    let instance = &*(ptr as *mut T::Instance);
444    let imp = instance.imp();
445    imp.flush_stop();
446}
447
448unsafe extern "C" fn buffer_pool_free_buffer<T: BufferPoolImpl>(
449    ptr: *mut ffi::GstBufferPool,
450    buffer: *mut ffi::GstBuffer,
451) {
452    // the GstBufferPool implementation calls this
453    // in finalize where the ref_count will already
454    // be zero and we are actually destroyed
455    // see: https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/1645
456    if (*(ptr as *const glib::gobject_ffi::GObject)).ref_count == 0 {
457        // As a workaround we call free_buffer directly on the
458        // GstBufferPool to prevent leaking the buffer
459        // This will NOT call free_buffer on a subclass.
460        let pool_class =
461            glib::Class::<crate::BufferPool>::from_type(crate::BufferPool::static_type()).unwrap();
462        let pool_class = pool_class.as_ref();
463        if let Some(f) = pool_class.free_buffer {
464            f(ptr, buffer)
465        }
466        return;
467    }
468
469    let instance = &*(ptr as *mut T::Instance);
470    let imp = instance.imp();
471    imp.free_buffer(from_glib_full(buffer));
472}
473
474unsafe extern "C" fn buffer_pool_release_buffer<T: BufferPoolImpl>(
475    ptr: *mut ffi::GstBufferPool,
476    buffer: *mut ffi::GstBuffer,
477) {
478    let instance = &*(ptr as *mut T::Instance);
479    let imp = instance.imp();
480    imp.release_buffer(from_glib_full(buffer));
481}
482
483unsafe extern "C" fn buffer_pool_reset_buffer<T: BufferPoolImpl>(
484    ptr: *mut ffi::GstBufferPool,
485    buffer: *mut ffi::GstBuffer,
486) {
487    let instance = &*(ptr as *mut T::Instance);
488    let imp = instance.imp();
489    imp.reset_buffer(crate::BufferRef::from_mut_ptr(buffer));
490}
491
492unsafe extern "C" fn buffer_pool_start<T: BufferPoolImpl>(
493    ptr: *mut ffi::GstBufferPool,
494) -> glib::ffi::gboolean {
495    let instance = &*(ptr as *mut T::Instance);
496    let imp = instance.imp();
497    imp.start().into_glib()
498}
499
500unsafe extern "C" fn buffer_pool_stop<T: BufferPoolImpl>(
501    ptr: *mut ffi::GstBufferPool,
502) -> glib::ffi::gboolean {
503    // the GstBufferPool implementation calls this
504    // in finalize where the ref_count will already
505    // be zero and we are actually destroyed
506    // see: https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/1645
507    if (*(ptr as *const glib::gobject_ffi::GObject)).ref_count == 0 {
508        // As a workaround we call stop directly on the GstBufferPool
509        // This is needed because the default implementation clears
510        // the pool in stop.
511        let pool_class =
512            glib::Class::<crate::BufferPool>::from_type(crate::BufferPool::static_type()).unwrap();
513        let pool_class = pool_class.as_ref();
514        let result = if let Some(f) = pool_class.stop {
515            f(ptr)
516        } else {
517            true.into_glib()
518        };
519
520        return result;
521    }
522
523    let instance = &*(ptr as *mut T::Instance);
524    let imp = instance.imp();
525    imp.stop().into_glib()
526}
527
528unsafe extern "C" fn buffer_pool_get_options<T: BufferPoolImpl>(
529    ptr: *mut ffi::GstBufferPool,
530) -> *mut *const c_char {
531    let instance = &*(ptr as *mut T::Instance);
532    let imp = instance.imp();
533    T::instance_data::<glib::StrV>(imp, T::type_())
534        .map(|p| p.as_ptr() as *mut *const _)
535        .unwrap_or(ptr::null_mut())
536}
537
538unsafe extern "C" fn buffer_pool_set_config<T: BufferPoolImpl>(
539    ptr: *mut ffi::GstBufferPool,
540    config: *mut ffi::GstStructure,
541) -> glib::ffi::gboolean {
542    let instance = &*(ptr as *mut T::Instance);
543    let imp = instance.imp();
544    imp.set_config(BufferPoolConfigRef::from_glib_borrow_mut(config))
545        .into_glib()
546}
547
548#[cfg(test)]
549mod tests {
550    use std::sync::{
551        atomic::{AtomicBool, Ordering},
552        Arc,
553    };
554
555    use super::*;
556    use crate::prelude::*;
557
558    pub mod imp {
559        use super::*;
560
561        #[derive(Default)]
562        pub struct TestBufferPool;
563
564        impl ObjectImpl for TestBufferPool {}
565        impl GstObjectImpl for TestBufferPool {}
566        impl BufferPoolImpl for TestBufferPool {
567            fn options() -> &'static [&'static str] {
568                &["TEST_OPTION"]
569            }
570
571            fn set_config(&self, config: &mut BufferPoolConfigRef) -> bool {
572                let (caps, size, min_buffers, max_buffers) = config.params().unwrap();
573                config.set_params(caps.as_ref(), size * 2, min_buffers, max_buffers);
574                self.parent_set_config(config)
575            }
576        }
577
578        #[glib::object_subclass]
579        impl ObjectSubclass for TestBufferPool {
580            const NAME: &'static str = "TestBufferPool";
581            type Type = super::TestBufferPool;
582            type ParentType = BufferPool;
583        }
584    }
585
586    glib::wrapper! {
587        pub struct TestBufferPool(ObjectSubclass<imp::TestBufferPool>) @extends BufferPool, crate::Object;
588    }
589
590    impl Default for TestBufferPool {
591        fn default() -> Self {
592            glib::Object::new()
593        }
594    }
595
596    #[test]
597    fn test_pool_options() {
598        crate::init().unwrap();
599        let pool = TestBufferPool::default();
600        assert_eq!(pool.options(), vec!["TEST_OPTION"]);
601    }
602
603    #[test]
604    fn test_pool_acquire() {
605        crate::init().unwrap();
606        let pool = TestBufferPool::default();
607        let mut config = pool.config();
608        config.set_params(None, 1024, 1, 1);
609        pool.set_config(config).expect("failed to set pool config");
610        pool.set_active(true).expect("failed to activate pool");
611        let buffer = pool
612            .acquire_buffer(None)
613            .expect("failed to acquire buffer from pool");
614        assert_eq!(buffer.size(), 2048);
615    }
616
617    #[test]
618    fn test_pool_free_on_finalize() {
619        crate::init().unwrap();
620        let pool = TestBufferPool::default();
621        let mut config = pool.config();
622        config.set_params(None, 1024, 1, 1);
623        pool.set_config(config).expect("failed to set pool config");
624        pool.set_active(true).expect("failed to activate pool");
625        let mut buffer = pool
626            .acquire_buffer(None)
627            .expect("failed to acquire buffer from pool");
628        let finalized = Arc::new(AtomicBool::new(false));
629        unsafe {
630            ffi::gst_mini_object_weak_ref(
631                buffer.make_mut().upcast_mut().as_mut_ptr(),
632                Some(buffer_finalized),
633                Arc::into_raw(finalized.clone()) as *mut _,
634            )
635        };
636        // return the buffer to the pool
637        std::mem::drop(buffer);
638        // drop should finalize the buffer pool which frees all allocated buffers
639        std::mem::drop(pool);
640        assert!(finalized.load(Ordering::SeqCst));
641    }
642
643    #[test]
644    fn test_pool_free_on_deactivate() {
645        crate::init().unwrap();
646        let pool = TestBufferPool::default();
647        let mut config = pool.config();
648        config.set_params(None, 1024, 1, 1);
649        pool.set_config(config).expect("failed to set pool config");
650        pool.set_active(true).expect("failed to activate pool");
651        let mut buffer = pool
652            .acquire_buffer(None)
653            .expect("failed to acquire buffer from pool");
654        let finalized = Arc::new(AtomicBool::new(false));
655        unsafe {
656            ffi::gst_mini_object_weak_ref(
657                buffer.make_mut().upcast_mut().as_mut_ptr(),
658                Some(buffer_finalized),
659                Arc::into_raw(finalized.clone()) as *mut _,
660            )
661        };
662        // return the buffer to the pool
663        std::mem::drop(buffer);
664        // de-activating a poll should free all buffers
665        pool.set_active(false).expect("failed to de-activate pool");
666        assert!(finalized.load(Ordering::SeqCst));
667    }
668
669    unsafe extern "C" fn buffer_finalized(
670        data: *mut libc::c_void,
671        _mini_object: *mut ffi::GstMiniObject,
672    ) {
673        let finalized = Arc::from_raw(data as *const AtomicBool);
674        finalized.store(true, Ordering::SeqCst);
675    }
676}