Skip to main content

gstreamer/
task_pool.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::ptr;
4
5use glib::{ffi::gpointer, prelude::*, translate::*};
6
7use crate::{TaskPool, ffi};
8
9unsafe extern "C" fn task_pool_trampoline<P: FnOnce() + Send + 'static>(data: gpointer) {
10    unsafe {
11        let func = Box::from_raw(data as *mut P);
12        func()
13    }
14}
15
16pub trait TaskPoolExtManual: IsA<TaskPool> + 'static {
17    /// Start the execution of a new thread from `self`.
18    /// ## `func`
19    /// the function to call
20    ///
21    /// # Returns
22    ///
23    /// a pointer that should be used
24    /// for the gst_task_pool_join function. This pointer can be [`None`], you
25    /// must check `error` to detect errors. If the pointer is not [`None`] and
26    /// `gst_task_pool_join()` is not used, call `gst_task_pool_dispose_handle()`
27    /// instead.
28    #[doc(alias = "gst_task_pool_push")]
29    fn push<P: FnOnce() + Send + 'static>(
30        &self,
31        func: P,
32    ) -> Result<Option<TaskPoolTaskHandle>, glib::Error> {
33        unsafe {
34            let mut error = ptr::null_mut();
35            let func: Box<P> = Box::new(func);
36            let func = Box::into_raw(func);
37
38            let handle = ffi::gst_task_pool_push(
39                self.as_ref().to_glib_none().0,
40                Some(task_pool_trampoline::<P>),
41                func as gpointer,
42                &mut error,
43            );
44
45            if !error.is_null() {
46                debug_assert!(handle.is_null());
47
48                // Assume that task_pool_trampoline was
49                // not called and will not be called
50                drop(Box::from_raw(func));
51
52                return Err(from_glib_full(error));
53            }
54
55            let handle = ptr::NonNull::new(handle).map(|handle| TaskPoolTaskHandle {
56                handle,
57                task_pool: Some(self.as_ref().clone()),
58            });
59
60            Ok(handle)
61        }
62    }
63}
64
65impl<O: IsA<TaskPool>> TaskPoolExtManual for O {}
66
67impl TaskPool {
68    unsafe fn join(&self, id: ptr::NonNull<libc::c_void>) {
69        unsafe { ffi::gst_task_pool_join(self.to_glib_none().0, id.as_ptr()) }
70    }
71
72    #[cfg(feature = "v1_20")]
73    #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
74    unsafe fn dispose_handle(&self, id: ptr::NonNull<libc::c_void>) {
75        unsafe { ffi::gst_task_pool_dispose_handle(self.to_glib_none().0, id.as_ptr()) }
76    }
77}
78
79// rustdoc-stripper-ignore-next
80/// A handle for a task which was pushed to a task pool.
81pub trait TaskHandle {
82    // rustdoc-stripper-ignore-next
83    /// Wait for the task to complete.
84    fn join(self);
85}
86
87impl TaskHandle for () {
88    fn join(self) {}
89}
90
91impl TaskHandle for std::convert::Infallible {
92    fn join(self) {}
93}
94
95// rustdoc-stripper-ignore-next
96/// An opaque handle for a task associated with a particular task pool.
97///
98/// Keeps a reference to the pool alive.
99///
100/// If the `v1_20` feature is enabled, requests the task pool to dispose of the handle when it is
101/// dropped. Otherwise, needs to be `join`ed to avoid a leak.
102#[cfg_attr(not(any(feature = "v1_20", docsrs)), must_use)]
103#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
104pub struct TaskPoolTaskHandle {
105    handle: ptr::NonNull<libc::c_void>,
106    task_pool: Option<TaskPool>,
107}
108
109impl TaskHandle for TaskPoolTaskHandle {
110    #[doc(alias = "gst_task_pool_join")]
111    fn join(mut self) {
112        let task_pool = self.task_pool.take().unwrap();
113        unsafe { task_pool.join(self.handle) }
114    }
115}
116
117impl Drop for TaskPoolTaskHandle {
118    #[doc(alias = "gst_task_pool_dispose_handle")]
119    #[inline]
120    fn drop(&mut self) {
121        if let Some(task_pool) = self.task_pool.take() {
122            cfg_if::cfg_if! {
123                if #[cfg(feature = "v1_20")] {
124                    unsafe { task_pool.dispose_handle(self.handle) }
125                } else {
126                    crate::warning!(crate::CAT_RUST, obj = &task_pool, "Leaked task handle");
127                }
128            }
129        }
130    }
131}
132
133#[cfg(test)]
134mod tests {
135    use std::sync::mpsc::{RecvError, channel};
136
137    use super::*;
138    use crate::prelude::*;
139
140    #[test]
141    fn test_simple() {
142        crate::init().unwrap();
143        let pool = TaskPool::new();
144        pool.prepare().unwrap();
145
146        let (sender, receiver) = channel();
147
148        let handle = pool
149            .push(move || {
150                sender.send(()).unwrap();
151            })
152            .unwrap();
153
154        assert_eq!(receiver.recv(), Ok(()));
155
156        if let Some(handle) = handle {
157            handle.join();
158        }
159
160        // Can't test try_recv here as the default task pool produces no
161        // handles and thus no way to wait for channel destruction
162        assert_eq!(receiver.recv(), Err(RecvError));
163
164        pool.cleanup();
165    }
166}