gstreamer/
task_pool.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::ptr;
4
5use glib::{ffi::gpointer, prelude::*, translate::*};
6
7use crate::{ffi, TaskPool};
8
9unsafe extern "C" fn task_pool_trampoline<P: FnOnce() + Send + 'static>(data: gpointer) {
10    let func = Box::from_raw(data as *mut P);
11    func()
12}
13mod sealed {
14    pub trait Sealed {}
15    impl<T: super::IsA<super::TaskPool>> Sealed for T {}
16}
17
18pub trait TaskPoolExtManual: sealed::Sealed + IsA<TaskPool> + 'static {
19    /// Start the execution of a new thread from `self`.
20    /// ## `func`
21    /// the function to call
22    ///
23    /// # Returns
24    ///
25    /// a pointer that should be used
26    /// for the gst_task_pool_join function. This pointer can be [`None`], you
27    /// must check `error` to detect errors. If the pointer is not [`None`] and
28    /// `gst_task_pool_join()` is not used, call `gst_task_pool_dispose_handle()`
29    /// instead.
30    #[doc(alias = "gst_task_pool_push")]
31    fn push<P: FnOnce() + Send + 'static>(
32        &self,
33        func: P,
34    ) -> Result<Option<TaskPoolTaskHandle>, glib::Error> {
35        unsafe {
36            let mut error = ptr::null_mut();
37            let func: Box<P> = Box::new(func);
38            let func = Box::into_raw(func);
39
40            let handle = ffi::gst_task_pool_push(
41                self.as_ref().to_glib_none().0,
42                Some(task_pool_trampoline::<P>),
43                func as gpointer,
44                &mut error,
45            );
46
47            if !error.is_null() {
48                debug_assert!(handle.is_null());
49
50                // Assume that task_pool_trampoline was
51                // not called and will not be called
52                drop(Box::from_raw(func));
53
54                return Err(from_glib_full(error));
55            }
56
57            let handle = ptr::NonNull::new(handle).map(|handle| TaskPoolTaskHandle {
58                handle,
59                task_pool: Some(self.as_ref().clone()),
60            });
61
62            Ok(handle)
63        }
64    }
65}
66
67impl<O: IsA<TaskPool>> TaskPoolExtManual for O {}
68
69impl TaskPool {
70    unsafe fn join(&self, id: ptr::NonNull<libc::c_void>) {
71        ffi::gst_task_pool_join(self.to_glib_none().0, id.as_ptr())
72    }
73
74    #[cfg(feature = "v1_20")]
75    #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
76    unsafe fn dispose_handle(&self, id: ptr::NonNull<libc::c_void>) {
77        ffi::gst_task_pool_dispose_handle(self.to_glib_none().0, id.as_ptr())
78    }
79}
80
// rustdoc-stripper-ignore-next
/// Handle to a task that has been pushed to a task pool.
pub trait TaskHandle {
    // rustdoc-stripper-ignore-next
    /// Consumes the handle and waits until the task has completed.
    fn join(self);
}
88
89impl TaskHandle for () {
90    fn join(self) {}
91}
92
93impl TaskHandle for std::convert::Infallible {
94    fn join(self) {}
95}
96
97// rustdoc-stripper-ignore-next
98/// An opaque handle for a task associated with a particular task pool.
99///
100/// Keeps a reference to the pool alive.
101///
102/// If the `v1_20` feature is enabled, requests the task pool to dispose of the handle when it is
103/// dropped. Otherwise, needs to be `join`ed to avoid a leak.
104#[cfg_attr(not(any(feature = "v1_20", docsrs)), must_use)]
105#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
106pub struct TaskPoolTaskHandle {
107    handle: ptr::NonNull<libc::c_void>,
108    task_pool: Option<TaskPool>,
109}
110
111impl TaskHandle for TaskPoolTaskHandle {
112    #[doc(alias = "gst_task_pool_join")]
113    fn join(mut self) {
114        let task_pool = self.task_pool.take().unwrap();
115        unsafe { task_pool.join(self.handle) }
116    }
117}
118
119impl Drop for TaskPoolTaskHandle {
120    #[doc(alias = "gst_task_pool_dispose_handle")]
121    #[inline]
122    fn drop(&mut self) {
123        if let Some(task_pool) = self.task_pool.take() {
124            cfg_if::cfg_if! {
125                if #[cfg(feature = "v1_20")] {
126                    unsafe { task_pool.dispose_handle(self.handle) }
127                } else {
128                    crate::warning!(crate::CAT_RUST, obj = &task_pool, "Leaked task handle");
129                }
130            }
131        }
132    }
133}
134
#[cfg(test)]
mod tests {
    use std::sync::mpsc::{channel, RecvError};

    use super::*;
    use crate::prelude::*;

    // Pushes one task that sends on a channel, then checks that the message
    // arrives and that the sender is dropped afterwards.
    #[test]
    fn test_simple() {
        crate::init().unwrap();

        let pool = TaskPool::new();
        pool.prepare().unwrap();

        let (tx, rx) = channel();
        let task = move || {
            tx.send(()).unwrap();
        };

        let maybe_handle = pool.push(task).unwrap();

        // The task ran and delivered its message.
        assert_eq!(rx.recv(), Ok(()));

        if let Some(task_handle) = maybe_handle {
            task_handle.join();
        }

        // Can't test try_recv here as the default task pool produces no
        // handles and thus no way to wait for channel destruction
        assert_eq!(rx.recv(), Err(RecvError));

        pool.cleanup();
    }
}