gstreamer/
task_pool.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::ptr;
4
5use glib::{ffi::gpointer, prelude::*, translate::*};
6
7use crate::{ffi, TaskPool};
8
9unsafe extern "C" fn task_pool_trampoline<P: FnOnce() + Send + 'static>(data: gpointer) {
10    let func = Box::from_raw(data as *mut P);
11    func()
12}
13
14pub trait TaskPoolExtManual: IsA<TaskPool> + 'static {
15    /// Start the execution of a new thread from `self`.
16    /// ## `func`
17    /// the function to call
18    ///
19    /// # Returns
20    ///
21    /// a pointer that should be used
22    /// for the gst_task_pool_join function. This pointer can be [`None`], you
23    /// must check `error` to detect errors. If the pointer is not [`None`] and
24    /// `gst_task_pool_join()` is not used, call `gst_task_pool_dispose_handle()`
25    /// instead.
26    #[doc(alias = "gst_task_pool_push")]
27    fn push<P: FnOnce() + Send + 'static>(
28        &self,
29        func: P,
30    ) -> Result<Option<TaskPoolTaskHandle>, glib::Error> {
31        unsafe {
32            let mut error = ptr::null_mut();
33            let func: Box<P> = Box::new(func);
34            let func = Box::into_raw(func);
35
36            let handle = ffi::gst_task_pool_push(
37                self.as_ref().to_glib_none().0,
38                Some(task_pool_trampoline::<P>),
39                func as gpointer,
40                &mut error,
41            );
42
43            if !error.is_null() {
44                debug_assert!(handle.is_null());
45
46                // Assume that task_pool_trampoline was
47                // not called and will not be called
48                drop(Box::from_raw(func));
49
50                return Err(from_glib_full(error));
51            }
52
53            let handle = ptr::NonNull::new(handle).map(|handle| TaskPoolTaskHandle {
54                handle,
55                task_pool: Some(self.as_ref().clone()),
56            });
57
58            Ok(handle)
59        }
60    }
61}
62
63impl<O: IsA<TaskPool>> TaskPoolExtManual for O {}
64
65impl TaskPool {
66    unsafe fn join(&self, id: ptr::NonNull<libc::c_void>) {
67        ffi::gst_task_pool_join(self.to_glib_none().0, id.as_ptr())
68    }
69
70    #[cfg(feature = "v1_20")]
71    #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
72    unsafe fn dispose_handle(&self, id: ptr::NonNull<libc::c_void>) {
73        ffi::gst_task_pool_dispose_handle(self.to_glib_none().0, id.as_ptr())
74    }
75}
76
// rustdoc-stripper-ignore-next
/// Handle to a task that has been submitted to a task pool.
///
/// Joining takes the handle by value, so each handle can be joined at most
/// once.
pub trait TaskHandle {
    // rustdoc-stripper-ignore-next
    /// Blocks until the task behind this handle has finished, consuming the
    /// handle.
    fn join(self);
}
84
85impl TaskHandle for () {
86    fn join(self) {}
87}
88
89impl TaskHandle for std::convert::Infallible {
90    fn join(self) {}
91}
92
93// rustdoc-stripper-ignore-next
94/// An opaque handle for a task associated with a particular task pool.
95///
96/// Keeps a reference to the pool alive.
97///
98/// If the `v1_20` feature is enabled, requests the task pool to dispose of the handle when it is
99/// dropped. Otherwise, needs to be `join`ed to avoid a leak.
100#[cfg_attr(not(any(feature = "v1_20", docsrs)), must_use)]
101#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
102pub struct TaskPoolTaskHandle {
103    handle: ptr::NonNull<libc::c_void>,
104    task_pool: Option<TaskPool>,
105}
106
107impl TaskHandle for TaskPoolTaskHandle {
108    #[doc(alias = "gst_task_pool_join")]
109    fn join(mut self) {
110        let task_pool = self.task_pool.take().unwrap();
111        unsafe { task_pool.join(self.handle) }
112    }
113}
114
115impl Drop for TaskPoolTaskHandle {
116    #[doc(alias = "gst_task_pool_dispose_handle")]
117    #[inline]
118    fn drop(&mut self) {
119        if let Some(task_pool) = self.task_pool.take() {
120            cfg_if::cfg_if! {
121                if #[cfg(feature = "v1_20")] {
122                    unsafe { task_pool.dispose_handle(self.handle) }
123                } else {
124                    crate::warning!(crate::CAT_RUST, obj = &task_pool, "Leaked task handle");
125                }
126            }
127        }
128    }
129}
130
#[cfg(test)]
mod tests {
    use std::sync::mpsc::{channel, RecvError};

    use super::*;
    use crate::prelude::*;

    /// Pushes one closure, checks that it ran, and checks that it was dropped
    /// afterwards.
    #[test]
    fn test_simple() {
        crate::init().unwrap();
        let pool = TaskPool::new();
        pool.prepare().unwrap();

        let (tx, rx) = channel();

        // The task owns the only sender; it signals once and then lets go of it.
        let task = move || tx.send(()).unwrap();
        let maybe_handle = pool.push(task).unwrap();

        // The task ran and delivered its message.
        assert_eq!(rx.recv(), Ok(()));

        if let Some(task_handle) = maybe_handle {
            task_handle.join();
        }

        // Can't test try_recv here as the default task pool produces no
        // handles and thus no way to wait for channel destruction
        assert_eq!(rx.recv(), Err(RecvError));

        pool.cleanup();
    }
}