gstreamer/subclass/
buffer_pool.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::ptr;
4
5use glib::{
6    prelude::*,
7    subclass::{prelude::*, InitializingObject},
8    translate::*,
9};
10use libc::c_char;
11
12use super::prelude::*;
13use crate::{ffi, BufferPool, BufferPoolAcquireParams, BufferPoolConfigRef};
14
15pub trait BufferPoolImpl: BufferPoolImplExt + GstObjectImpl + Send + Sync {
16    /// Acquires a buffer from `self`. `buffer` should point to a memory location that
17    /// can hold a pointer to the new buffer. When the pool is empty, this function
18    /// will by default block until a buffer is released into the pool again or when
19    /// the pool is set to flushing or deactivated.
20    ///
21    /// `params` can contain optional parameters to influence the allocation.
22    /// ## `params`
23    /// parameters.
24    ///
25    /// # Returns
26    ///
27    /// a [`FlowReturn`][crate::FlowReturn] such as [`FlowReturn::Flushing`][crate::FlowReturn::Flushing] when the pool is
28    /// inactive.
29    ///
30    /// ## `buffer`
31    /// a location for a [`Buffer`][crate::Buffer]
32    fn acquire_buffer(
33        &self,
34        params: Option<&BufferPoolAcquireParams>,
35    ) -> Result<crate::Buffer, crate::FlowError> {
36        self.parent_acquire_buffer(params)
37    }
38
39    /// Allocate a buffer. the default implementation allocates
40    /// buffers from the configured memory allocator and with the configured
41    /// parameters. All metadata that is present on the allocated buffer will
42    /// be marked as [`MetaFlags::POOLED`][crate::MetaFlags::POOLED] and [`MetaFlags::LOCKED`][crate::MetaFlags::LOCKED] and will
43    /// not be removed from the buffer in `GstBufferPoolClass::reset_buffer`.
44    /// The buffer should have the [`BufferFlags::TAG_MEMORY`][crate::BufferFlags::TAG_MEMORY] cleared.
45    /// ## `params`
46    /// parameters.
47    ///
48    /// # Returns
49    ///
50    /// a [`FlowReturn`][crate::FlowReturn] to indicate whether the allocation was
51    /// successful.
52    ///
53    /// ## `buffer`
54    /// a location for a [`Buffer`][crate::Buffer]
55    fn alloc_buffer(
56        &self,
57        params: Option<&BufferPoolAcquireParams>,
58    ) -> Result<crate::Buffer, crate::FlowError> {
59        self.parent_alloc_buffer(params)
60    }
61
62    /// Enter the flushing state.
63    fn flush_start(&self) {
64        self.parent_flush_start()
65    }
66
67    /// Leave the flushing state.
68    fn flush_stop(&self) {
69        self.parent_flush_stop()
70    }
71
72    /// Free a buffer. The default implementation unrefs the buffer.
73    /// ## `buffer`
74    /// the [`Buffer`][crate::Buffer] to free
75    fn free_buffer(&self, buffer: crate::Buffer) {
76        self.parent_free_buffer(buffer)
77    }
78
79    /// Releases `buffer` to `self`. `buffer` should have previously been allocated from
80    /// `self` with [`BufferPoolExtManual::acquire_buffer()`][crate::prelude::BufferPoolExtManual::acquire_buffer()].
81    ///
82    /// This function is usually called automatically when the last ref on `buffer`
83    /// disappears.
84    /// ## `buffer`
85    /// a [`Buffer`][crate::Buffer]
86    fn release_buffer(&self, buffer: crate::Buffer) {
87        self.parent_release_buffer(buffer)
88    }
89
90    /// Reset the buffer to its state when it was freshly allocated.
91    /// The default implementation will clear the flags, timestamps and
92    /// will remove the metadata without the [`MetaFlags::POOLED`][crate::MetaFlags::POOLED] flag (even
93    /// the metadata with [`MetaFlags::LOCKED`][crate::MetaFlags::LOCKED]). If the
94    /// [`BufferFlags::TAG_MEMORY`][crate::BufferFlags::TAG_MEMORY] was set, this function can also try to
95    /// restore the memory and clear the [`BufferFlags::TAG_MEMORY`][crate::BufferFlags::TAG_MEMORY] again.
96    /// ## `buffer`
97    /// the [`Buffer`][crate::Buffer] to reset
98    fn reset_buffer(&self, buffer: &mut crate::BufferRef) {
99        self.parent_reset_buffer(buffer)
100    }
101
102    /// Start the bufferpool. The default implementation will preallocate
103    /// min-buffers buffers and put them in the queue.
104    ///
105    /// Subclasses do not need to chain up to the parent's default implementation
106    /// if they don't want min-buffers based preallocation.
107    ///
108    /// # Returns
109    ///
110    /// whether the pool could be started.
111    fn start(&self) -> bool {
112        self.parent_start()
113    }
114
115    /// Stop the bufferpool. the default implementation will free the
116    /// preallocated buffers. This function is called when all the buffers are
117    /// returned to the pool.
118    ///
119    /// # Returns
120    ///
121    /// whether the pool could be stopped.
122    fn stop(&self) -> bool {
123        self.parent_stop()
124    }
125
126    /// Gets a [`None`] terminated array of string with supported bufferpool options for
127    /// `self`. An option would typically be enabled with
128    /// `gst_buffer_pool_config_add_option()`.
129    ///
130    /// # Returns
131    ///
132    /// a [`None`] terminated array
133    ///  of strings.
134    fn options() -> &'static [&'static str] {
135        &[]
136    }
137
138    /// Sets the configuration of the pool. If the pool is already configured, and
139    /// the configuration hasn't changed, this function will return [`true`]. If the
140    /// pool is active, this method will return [`false`] and active configuration
141    /// will remain. Buffers allocated from this pool must be returned or else this
142    /// function will do nothing and return [`false`].
143    ///
144    /// `config` is a [`Structure`][crate::Structure] that contains the configuration parameters for
145    /// the pool. A default and mandatory set of parameters can be configured with
146    /// `gst_buffer_pool_config_set_params()`, `gst_buffer_pool_config_set_allocator()`
147    /// and `gst_buffer_pool_config_add_option()`.
148    ///
149    /// If the parameters in `config` can not be set exactly, this function returns
150    /// [`false`] and will try to update as much state as possible. The new state can
151    /// then be retrieved and refined with [`BufferPoolExtManual::config()`][crate::prelude::BufferPoolExtManual::config()].
152    ///
153    /// This function takes ownership of `config`.
154    /// ## `config`
155    /// a [`Structure`][crate::Structure]
156    ///
157    /// # Returns
158    ///
159    /// [`true`] when the configuration could be set.
160    fn set_config(&self, config: &mut BufferPoolConfigRef) -> bool {
161        self.parent_set_config(config)
162    }
163}
164
165mod sealed {
166    pub trait Sealed {}
167    impl<T: super::BufferPoolImplExt> Sealed for T {}
168}
169
170pub trait BufferPoolImplExt: sealed::Sealed + ObjectSubclass {
171    fn parent_acquire_buffer(
172        &self,
173        params: Option<&BufferPoolAcquireParams>,
174    ) -> Result<crate::Buffer, crate::FlowError> {
175        unsafe {
176            let data = Self::type_data();
177            let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
178            if let Some(f) = (*parent_class).acquire_buffer {
179                let params_ptr = mut_override(params.to_glib_none().0);
180                let mut buffer = std::ptr::null_mut();
181
182                let result = f(
183                    self.obj()
184                        .unsafe_cast_ref::<crate::BufferPool>()
185                        .to_glib_none()
186                        .0,
187                    &mut buffer,
188                    params_ptr,
189                );
190
191                crate::FlowSuccess::try_from_glib(result).map(|_| from_glib_full(buffer))
192            } else {
193                Err(crate::FlowError::NotSupported)
194            }
195        }
196    }
197
198    fn parent_alloc_buffer(
199        &self,
200        params: Option<&BufferPoolAcquireParams>,
201    ) -> Result<crate::Buffer, crate::FlowError> {
202        unsafe {
203            let data = Self::type_data();
204            let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
205            if let Some(f) = (*parent_class).alloc_buffer {
206                let params_ptr = mut_override(params.to_glib_none().0);
207                let mut buffer = std::ptr::null_mut();
208
209                let result = f(
210                    self.obj()
211                        .unsafe_cast_ref::<crate::BufferPool>()
212                        .to_glib_none()
213                        .0,
214                    &mut buffer,
215                    params_ptr,
216                );
217
218                crate::FlowSuccess::try_from_glib(result).map(|_| from_glib_full(buffer))
219            } else {
220                Err(crate::FlowError::NotSupported)
221            }
222        }
223    }
224
225    fn parent_free_buffer(&self, buffer: crate::Buffer) {
226        unsafe {
227            let data = Self::type_data();
228            let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
229            if let Some(f) = (*parent_class).free_buffer {
230                f(
231                    self.obj()
232                        .unsafe_cast_ref::<crate::BufferPool>()
233                        .to_glib_none()
234                        .0,
235                    buffer.into_glib_ptr(),
236                )
237            }
238        }
239    }
240
241    fn parent_release_buffer(&self, buffer: crate::Buffer) {
242        unsafe {
243            let data = Self::type_data();
244            let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
245            if let Some(f) = (*parent_class).release_buffer {
246                f(
247                    self.obj()
248                        .unsafe_cast_ref::<crate::BufferPool>()
249                        .to_glib_none()
250                        .0,
251                    buffer.into_glib_ptr(),
252                )
253            }
254        }
255    }
256
257    fn parent_reset_buffer(&self, buffer: &mut crate::BufferRef) {
258        unsafe {
259            let data = Self::type_data();
260            let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
261            if let Some(f) = (*parent_class).reset_buffer {
262                f(
263                    self.obj()
264                        .unsafe_cast_ref::<crate::BufferPool>()
265                        .to_glib_none()
266                        .0,
267                    buffer.as_mut_ptr(),
268                )
269            }
270        }
271    }
272
273    fn parent_start(&self) -> bool {
274        unsafe {
275            let data = Self::type_data();
276            let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
277            if let Some(f) = (*parent_class).start {
278                let result = f(self
279                    .obj()
280                    .unsafe_cast_ref::<crate::BufferPool>()
281                    .to_glib_none()
282                    .0);
283
284                from_glib(result)
285            } else {
286                true
287            }
288        }
289    }
290
291    fn parent_stop(&self) -> bool {
292        unsafe {
293            let data = Self::type_data();
294            let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
295            if let Some(f) = (*parent_class).stop {
296                let result = f(self
297                    .obj()
298                    .unsafe_cast_ref::<crate::BufferPool>()
299                    .to_glib_none()
300                    .0);
301
302                from_glib(result)
303            } else {
304                true
305            }
306        }
307    }
308
309    fn parent_set_config(&self, config: &mut BufferPoolConfigRef) -> bool {
310        unsafe {
311            let data = Self::type_data();
312            let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
313            if let Some(f) = (*parent_class).set_config {
314                let result = f(
315                    self.obj()
316                        .unsafe_cast_ref::<crate::BufferPool>()
317                        .to_glib_none()
318                        .0,
319                    (*config).as_mut_ptr(),
320                );
321
322                from_glib(result)
323            } else {
324                false
325            }
326        }
327    }
328
329    fn parent_flush_start(&self) {
330        unsafe {
331            let data = Self::type_data();
332            let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
333            if let Some(f) = (*parent_class).flush_start {
334                f(self
335                    .obj()
336                    .unsafe_cast_ref::<crate::BufferPool>()
337                    .to_glib_none()
338                    .0)
339            }
340        }
341    }
342
343    fn parent_flush_stop(&self) {
344        unsafe {
345            let data = Self::type_data();
346            let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
347            if let Some(f) = (*parent_class).flush_stop {
348                f(self
349                    .obj()
350                    .unsafe_cast_ref::<crate::BufferPool>()
351                    .to_glib_none()
352                    .0)
353            }
354        }
355    }
356}
357
358impl<T: BufferPoolImpl> BufferPoolImplExt for T {}
359
360unsafe impl<T: BufferPoolImpl> IsSubclassable<T> for BufferPool {
361    fn class_init(klass: &mut glib::Class<Self>) {
362        Self::parent_class_init::<T>(klass);
363        let klass = klass.as_mut();
364        klass.acquire_buffer = Some(buffer_pool_acquire_buffer::<T>);
365        klass.alloc_buffer = Some(buffer_pool_alloc_buffer::<T>);
366        klass.release_buffer = Some(buffer_pool_release_buffer::<T>);
367        klass.reset_buffer = Some(buffer_pool_reset_buffer::<T>);
368        klass.start = Some(buffer_pool_start::<T>);
369        klass.stop = Some(buffer_pool_stop::<T>);
370        klass.get_options = Some(buffer_pool_get_options::<T>);
371        klass.set_config = Some(buffer_pool_set_config::<T>);
372        klass.flush_start = Some(buffer_pool_flush_start::<T>);
373        klass.flush_stop = Some(buffer_pool_flush_stop::<T>);
374        klass.free_buffer = Some(buffer_pool_free_buffer::<T>);
375    }
376
377    fn instance_init(instance: &mut InitializingObject<T>) {
378        Self::parent_instance_init(instance);
379
380        // Store the pool options in the instance data
381        // for later retrieval in buffer_pool_get_options
382        let options = T::options();
383        instance.set_instance_data(T::type_(), glib::StrV::from(options));
384    }
385}
386
387unsafe extern "C" fn buffer_pool_acquire_buffer<T: BufferPoolImpl>(
388    ptr: *mut ffi::GstBufferPool,
389    buffer: *mut *mut ffi::GstBuffer,
390    params: *mut ffi::GstBufferPoolAcquireParams,
391) -> ffi::GstFlowReturn {
392    let instance = &*(ptr as *mut T::Instance);
393    let imp = instance.imp();
394    let params: Option<BufferPoolAcquireParams> = from_glib_none(params);
395
396    match imp.acquire_buffer(params.as_ref()) {
397        Ok(b) => {
398            *buffer = b.into_glib_ptr();
399            ffi::GST_FLOW_OK
400        }
401        Err(err) => err.into_glib(),
402    }
403}
404
405unsafe extern "C" fn buffer_pool_alloc_buffer<T: BufferPoolImpl>(
406    ptr: *mut ffi::GstBufferPool,
407    buffer: *mut *mut ffi::GstBuffer,
408    params: *mut ffi::GstBufferPoolAcquireParams,
409) -> ffi::GstFlowReturn {
410    let instance = &*(ptr as *mut T::Instance);
411    let imp = instance.imp();
412    let params: Option<BufferPoolAcquireParams> = from_glib_none(params);
413
414    match imp.alloc_buffer(params.as_ref()) {
415        Ok(b) => {
416            *buffer = b.into_glib_ptr();
417            ffi::GST_FLOW_OK
418        }
419        Err(err) => err.into_glib(),
420    }
421}
422
423unsafe extern "C" fn buffer_pool_flush_start<T: BufferPoolImpl>(ptr: *mut ffi::GstBufferPool) {
424    // the GstBufferPool implementation calls this
425    // in finalize where the ref_count will already
426    // be zero and we are actually destroyed
427    // see: https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/1645
428    if (*(ptr as *const glib::gobject_ffi::GObject)).ref_count == 0 {
429        // flush_start is a no-op in GstBufferPool
430        return;
431    }
432
433    let instance = &*(ptr as *mut T::Instance);
434    let imp = instance.imp();
435    imp.flush_start();
436}
437
438unsafe extern "C" fn buffer_pool_flush_stop<T: BufferPoolImpl>(ptr: *mut ffi::GstBufferPool) {
439    // the GstBufferPool implementation calls this
440    // in finalize where the ref_count will already
441    // be zero and we are actually destroyed
442    // see: https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/1645
443    if (*(ptr as *const glib::gobject_ffi::GObject)).ref_count == 0 {
444        // flush_stop is a no-op in GstBufferPool
445        return;
446    }
447
448    let instance = &*(ptr as *mut T::Instance);
449    let imp = instance.imp();
450    imp.flush_stop();
451}
452
453unsafe extern "C" fn buffer_pool_free_buffer<T: BufferPoolImpl>(
454    ptr: *mut ffi::GstBufferPool,
455    buffer: *mut ffi::GstBuffer,
456) {
457    // the GstBufferPool implementation calls this
458    // in finalize where the ref_count will already
459    // be zero and we are actually destroyed
460    // see: https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/1645
461    if (*(ptr as *const glib::gobject_ffi::GObject)).ref_count == 0 {
462        // As a workaround we call free_buffer directly on the
463        // GstBufferPool to prevent leaking the buffer
464        // This will NOT call free_buffer on a subclass.
465        let pool_class =
466            glib::Class::<crate::BufferPool>::from_type(crate::BufferPool::static_type()).unwrap();
467        let pool_class = pool_class.as_ref();
468        if let Some(f) = pool_class.free_buffer {
469            f(ptr, buffer)
470        }
471        return;
472    }
473
474    let instance = &*(ptr as *mut T::Instance);
475    let imp = instance.imp();
476    imp.free_buffer(from_glib_full(buffer));
477}
478
479unsafe extern "C" fn buffer_pool_release_buffer<T: BufferPoolImpl>(
480    ptr: *mut ffi::GstBufferPool,
481    buffer: *mut ffi::GstBuffer,
482) {
483    let instance = &*(ptr as *mut T::Instance);
484    let imp = instance.imp();
485    imp.release_buffer(from_glib_full(buffer));
486}
487
488unsafe extern "C" fn buffer_pool_reset_buffer<T: BufferPoolImpl>(
489    ptr: *mut ffi::GstBufferPool,
490    buffer: *mut ffi::GstBuffer,
491) {
492    let instance = &*(ptr as *mut T::Instance);
493    let imp = instance.imp();
494    imp.reset_buffer(crate::BufferRef::from_mut_ptr(buffer));
495}
496
497unsafe extern "C" fn buffer_pool_start<T: BufferPoolImpl>(
498    ptr: *mut ffi::GstBufferPool,
499) -> glib::ffi::gboolean {
500    let instance = &*(ptr as *mut T::Instance);
501    let imp = instance.imp();
502    imp.start().into_glib()
503}
504
505unsafe extern "C" fn buffer_pool_stop<T: BufferPoolImpl>(
506    ptr: *mut ffi::GstBufferPool,
507) -> glib::ffi::gboolean {
508    // the GstBufferPool implementation calls this
509    // in finalize where the ref_count will already
510    // be zero and we are actually destroyed
511    // see: https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/1645
512    if (*(ptr as *const glib::gobject_ffi::GObject)).ref_count == 0 {
513        // As a workaround we call stop directly on the GstBufferPool
514        // This is needed because the default implementation clears
515        // the pool in stop.
516        let pool_class =
517            glib::Class::<crate::BufferPool>::from_type(crate::BufferPool::static_type()).unwrap();
518        let pool_class = pool_class.as_ref();
519        let result = if let Some(f) = pool_class.stop {
520            f(ptr)
521        } else {
522            true.into_glib()
523        };
524
525        return result;
526    }
527
528    let instance = &*(ptr as *mut T::Instance);
529    let imp = instance.imp();
530    imp.stop().into_glib()
531}
532
533unsafe extern "C" fn buffer_pool_get_options<T: BufferPoolImpl>(
534    ptr: *mut ffi::GstBufferPool,
535) -> *mut *const c_char {
536    let instance = &*(ptr as *mut T::Instance);
537    let imp = instance.imp();
538    T::instance_data::<glib::StrV>(imp, T::type_())
539        .map(|p| p.as_ptr() as *mut *const _)
540        .unwrap_or(ptr::null_mut())
541}
542
543unsafe extern "C" fn buffer_pool_set_config<T: BufferPoolImpl>(
544    ptr: *mut ffi::GstBufferPool,
545    config: *mut ffi::GstStructure,
546) -> glib::ffi::gboolean {
547    let instance = &*(ptr as *mut T::Instance);
548    let imp = instance.imp();
549    imp.set_config(BufferPoolConfigRef::from_glib_borrow_mut(config))
550        .into_glib()
551}
552
#[cfg(test)]
mod tests {
    use std::sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    };

    use super::*;
    use crate::prelude::*;

    pub mod imp {
        use super::*;

        /// Minimal pool subclass: advertises one option and doubles the
        /// configured buffer size before chaining up in `set_config`.
        #[derive(Default)]
        pub struct TestBufferPool;

        impl ObjectImpl for TestBufferPool {}
        impl GstObjectImpl for TestBufferPool {}
        impl BufferPoolImpl for TestBufferPool {
            fn options() -> &'static [&'static str] {
                &["TEST_OPTION"]
            }

            fn set_config(&self, config: &mut BufferPoolConfigRef) -> bool {
                let (caps, requested_size, min_buffers, max_buffers) = config.params().unwrap();
                let doubled_size = requested_size * 2;
                config.set_params(caps.as_ref(), doubled_size, min_buffers, max_buffers);
                self.parent_set_config(config)
            }
        }

        #[glib::object_subclass]
        impl ObjectSubclass for TestBufferPool {
            const NAME: &'static str = "TestBufferPool";
            type Type = super::TestBufferPool;
            type ParentType = BufferPool;
        }
    }

    glib::wrapper! {
        pub struct TestBufferPool(ObjectSubclass<imp::TestBufferPool>) @extends BufferPool, crate::Object;
    }

    impl Default for TestBufferPool {
        fn default() -> Self {
            glib::Object::new()
        }
    }

    /// Initializes GStreamer and returns an activated test pool configured
    /// for exactly one buffer with a requested size of 1024.
    fn activated_pool() -> TestBufferPool {
        crate::init().unwrap();
        let pool = TestBufferPool::default();
        let mut config = pool.config();
        config.set_params(None, 1024, 1, 1);
        pool.set_config(config).expect("failed to set pool config");
        pool.set_active(true).expect("failed to activate pool");
        pool
    }

    /// Registers a weak ref on `buffer` whose notify callback
    /// (`buffer_finalized`) sets the returned flag.
    fn track_finalization(buffer: &mut crate::Buffer) -> Arc<AtomicBool> {
        let finalized = Arc::new(AtomicBool::new(false));
        unsafe {
            ffi::gst_mini_object_weak_ref(
                buffer.make_mut().upcast_mut().as_mut_ptr(),
                Some(buffer_finalized),
                Arc::into_raw(finalized.clone()) as *mut _,
            )
        };
        finalized
    }

    #[test]
    fn test_pool_options() {
        crate::init().unwrap();
        let pool = TestBufferPool::default();
        assert_eq!(pool.options(), vec!["TEST_OPTION"]);
    }

    #[test]
    fn test_pool_acquire() {
        let pool = activated_pool();
        let buffer = pool
            .acquire_buffer(None)
            .expect("failed to acquire buffer from pool");
        // The subclass' set_config doubled the requested size of 1024.
        assert_eq!(buffer.size(), 2048);
    }

    #[test]
    fn test_pool_free_on_finalize() {
        let pool = activated_pool();
        let mut buffer = pool
            .acquire_buffer(None)
            .expect("failed to acquire buffer from pool");
        let finalized = track_finalization(&mut buffer);

        // Hand the buffer back to the pool.
        drop(buffer);
        // Dropping the pool should finalize it, which frees all allocated buffers.
        drop(pool);

        assert!(finalized.load(Ordering::SeqCst));
    }

    #[test]
    fn test_pool_free_on_deactivate() {
        let pool = activated_pool();
        let mut buffer = pool
            .acquire_buffer(None)
            .expect("failed to acquire buffer from pool");
        let finalized = track_finalization(&mut buffer);

        // Hand the buffer back to the pool.
        drop(buffer);
        // De-activating a pool should free all of its buffers.
        pool.set_active(false).expect("failed to de-activate pool");

        assert!(finalized.load(Ordering::SeqCst));
    }

    /// Weak-ref notify callback: reclaims the `Arc` leaked through
    /// `Arc::into_raw` and raises the flag.
    unsafe extern "C" fn buffer_finalized(
        data: *mut libc::c_void,
        _mini_object: *mut ffi::GstMiniObject,
    ) {
        let flag = Arc::from_raw(data as *const AtomicBool);
        flag.store(true, Ordering::SeqCst);
    }
}