Skip to main content

gstreamer/subclass/
task_pool.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{
4    hash::{Hash, Hasher},
5    ptr,
6    sync::{Arc, Mutex},
7};
8
9use glib::{ffi::gpointer, prelude::*, subclass::prelude::*, translate::*};
10
11use super::prelude::*;
12use crate::{TaskHandle, TaskPool, ffi};
13
14pub trait TaskPoolImpl: GstObjectImpl + ObjectSubclass<Type: IsA<TaskPool>> {
15    // rustdoc-stripper-ignore-next
16    /// Handle to be returned from the `push` function to allow the caller to wait for the task's
17    /// completion.
18    ///
19    /// If unneeded, you can specify `()` or [`Infallible`](std::convert::Infallible) for a handle
20    /// that does nothing on `join` or drop.
21    type Handle: TaskHandle;
22
23    // rustdoc-stripper-ignore-next
24    /// Prepare the task pool to accept tasks.
25    ///
26    /// This defaults to doing nothing.
27    // rustdoc-stripper-ignore-next-stop
28    /// Prepare the taskpool for accepting [`TaskPoolExtManual::push()`][crate::prelude::TaskPoolExtManual::push()] operations.
29    ///
30    /// MT safe.
31    fn prepare(&self) -> Result<(), glib::Error> {
32        Ok(())
33    }
34
35    // rustdoc-stripper-ignore-next
36    /// Clean up, rejecting further tasks and waiting for all accepted tasks to be stopped.
37    ///
38    /// This is mainly used internally to ensure proper cleanup of internal data structures in test
39    /// suites.
40    // rustdoc-stripper-ignore-next-stop
41    /// Wait for all tasks to be stopped. This is mainly used internally
42    /// to ensure proper cleanup of internal data structures in test suites.
43    ///
44    /// MT safe.
45    fn cleanup(&self) {}
46
47    // rustdoc-stripper-ignore-next
48    /// Deliver a task to the pool.
49    ///
50    /// If returning `Ok`, you need to call the `func` eventually.
51    ///
52    /// If returning `Err`, the `func` must be dropped without calling it.
53    // rustdoc-stripper-ignore-next-stop
54    /// Start the execution of a new thread from `self`.
55    /// ## `func`
56    /// the function to call
57    ///
58    /// # Returns
59    ///
60    /// a pointer that should be used
61    /// for the gst_task_pool_join function. This pointer can be [`None`], you
62    /// must check `error` to detect errors. If the pointer is not [`None`] and
63    /// `gst_task_pool_join()` is not used, call `gst_task_pool_dispose_handle()`
64    /// instead.
65    fn push(&self, func: TaskPoolFunction) -> Result<Option<Self::Handle>, glib::Error>;
66}
67
68unsafe impl<T: TaskPoolImpl> IsSubclassable<T> for TaskPool {
69    fn class_init(klass: &mut glib::Class<Self>) {
70        Self::parent_class_init::<T>(klass);
71        let klass = klass.as_mut();
72        klass.prepare = Some(task_pool_prepare::<T>);
73        klass.cleanup = Some(task_pool_cleanup::<T>);
74        klass.push = Some(task_pool_push::<T>);
75        klass.join = Some(task_pool_join::<T>);
76
77        #[cfg(feature = "v1_20")]
78        {
79            klass.dispose_handle = Some(task_pool_dispose_handle::<T>);
80        }
81    }
82}
83
84unsafe extern "C" fn task_pool_prepare<T: TaskPoolImpl>(
85    ptr: *mut ffi::GstTaskPool,
86    error: *mut *mut glib::ffi::GError,
87) {
88    unsafe {
89        let instance = &*(ptr as *mut T::Instance);
90        let imp = instance.imp();
91
92        match imp.prepare() {
93            Ok(()) => {}
94            Err(err) => {
95                if !error.is_null() {
96                    *error = err.into_glib_ptr();
97                }
98            }
99        }
100    }
101}
102
103unsafe extern "C" fn task_pool_cleanup<T: TaskPoolImpl>(ptr: *mut ffi::GstTaskPool) {
104    unsafe {
105        let instance = &*(ptr as *mut T::Instance);
106        let imp = instance.imp();
107
108        imp.cleanup();
109    }
110}
111
112unsafe extern "C" fn task_pool_push<T: TaskPoolImpl>(
113    ptr: *mut ffi::GstTaskPool,
114    func: ffi::GstTaskPoolFunction,
115    user_data: gpointer,
116    error: *mut *mut glib::ffi::GError,
117) -> gpointer {
118    unsafe {
119        let instance = &*(ptr as *mut T::Instance);
120        let imp = instance.imp();
121
122        let func = TaskPoolFunction::new(func.expect("Tried to push null func"), user_data);
123
124        match imp.push(func.clone()) {
125            Ok(None) => ptr::null_mut(),
126            Ok(Some(handle)) => Box::into_raw(Box::new(handle)) as gpointer,
127            Err(err) => {
128                func.prevent_call();
129                if !error.is_null() {
130                    *error = err.into_glib_ptr();
131                }
132                ptr::null_mut()
133            }
134        }
135    }
136}
137
138unsafe extern "C" fn task_pool_join<T: TaskPoolImpl>(ptr: *mut ffi::GstTaskPool, id: gpointer) {
139    unsafe {
140        if id.is_null() {
141            let wrap: Borrowed<TaskPool> = from_glib_borrow(ptr);
142            crate::warning!(
143                crate::CAT_RUST,
144                obj = wrap.as_ref(),
145                "Tried to join null handle"
146            );
147            return;
148        }
149
150        let handle = Box::from_raw(id as *mut T::Handle);
151        handle.join();
152    }
153}
154
/// C trampoline for the `dispose_handle` virtual method.
///
/// Takes back ownership of the boxed `T::Handle` behind `id` and drops it without joining.
/// A null `id` only logs a warning.
#[cfg(feature = "v1_20")]
#[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
unsafe extern "C" fn task_pool_dispose_handle<T: TaskPoolImpl>(
    ptr: *mut ffi::GstTaskPool,
    id: gpointer,
) {
    unsafe {
        if !id.is_null() {
            // `id` is expected to be a pointer produced by `Box::into_raw` in `task_pool_push`.
            drop(Box::from_raw(id as *mut T::Handle));
            return;
        }

        let wrap: Borrowed<TaskPool> = from_glib_borrow(ptr);
        crate::warning!(
            crate::CAT_RUST,
            obj = wrap.as_ref(),
            "Tried to dispose null handle"
        );
    }
}
176
177// rustdoc-stripper-ignore-next
178/// Function the task pool should execute, provided to [`push`](TaskPoolImpl::push).
179#[derive(Debug)]
180pub struct TaskPoolFunction(Arc<Mutex<Option<TaskPoolFunctionInner>>>);
181
182// `Arc<Mutex<Option<…>>>` is required so that we can enforce that the function
183// has not been called and will never be called after `push` returns `Err`.
184
185#[derive(Debug)]
186struct TaskPoolFunctionInner {
187    func: unsafe extern "C" fn(gpointer),
188    user_data: gpointer,
189    warn_on_drop: bool,
190}
191
192unsafe impl Send for TaskPoolFunctionInner {}
193
194impl TaskPoolFunction {
195    fn new(func: unsafe extern "C" fn(gpointer), user_data: gpointer) -> Self {
196        let inner = TaskPoolFunctionInner {
197            func,
198            user_data,
199            warn_on_drop: true,
200        };
201        Self(Arc::new(Mutex::new(Some(inner))))
202    }
203
204    #[inline]
205    fn clone(&self) -> Self {
206        Self(self.0.clone())
207    }
208
209    // rustdoc-stripper-ignore-next
210    /// Consume and execute the function.
211    pub fn call(self) {
212        let mut inner = self
213            .0
214            .lock()
215            .unwrap()
216            .take()
217            .expect("TaskPoolFunction has already been dropped");
218        inner.warn_on_drop = false;
219        unsafe { (inner.func)(inner.user_data) }
220    }
221
222    fn prevent_call(self) {
223        let mut inner = self
224            .0
225            .lock()
226            .unwrap()
227            .take()
228            .expect("TaskPoolFunction has already been called");
229        inner.warn_on_drop = false;
230        drop(inner);
231    }
232
233    #[inline]
234    fn as_ptr(&self) -> *const Mutex<Option<TaskPoolFunctionInner>> {
235        Arc::as_ptr(&self.0)
236    }
237}
238
239impl Drop for TaskPoolFunctionInner {
240    fn drop(&mut self) {
241        if self.warn_on_drop {
242            crate::warning!(crate::CAT_RUST, "Leaked task function");
243        }
244    }
245}
246
247impl PartialEq for TaskPoolFunction {
248    fn eq(&self, other: &Self) -> bool {
249        self.as_ptr() == other.as_ptr()
250    }
251}
252
253impl Eq for TaskPoolFunction {}
254
255impl PartialOrd for TaskPoolFunction {
256    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
257        Some(self.cmp(other))
258    }
259}
260
261impl Ord for TaskPoolFunction {
262    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
263        self.as_ptr().cmp(&other.as_ptr())
264    }
265}
266
267impl Hash for TaskPoolFunction {
268    fn hash<H: Hasher>(&self, state: &mut H) {
269        self.as_ptr().hash(state)
270    }
271}
272
#[cfg(test)]
mod tests {
    use std::{
        sync::{
            atomic,
            mpsc::{TryRecvError, channel},
        },
        thread,
    };

    use super::*;
    use crate::prelude::*;

    pub mod imp {
        use super::*;

        // Minimal pool implementation: runs each task on its own thread and records which
        // lifecycle methods were invoked.
        #[derive(Default)]
        pub struct TestPool {
            pub(super) prepared: atomic::AtomicBool,
            pub(super) cleaned_up: atomic::AtomicBool,
        }

        #[glib::object_subclass]
        impl ObjectSubclass for TestPool {
            const NAME: &'static str = "TestPool";
            type Type = super::TestPool;
            type ParentType = TaskPool;
        }

        impl ObjectImpl for TestPool {}

        impl GstObjectImpl for TestPool {}

        impl TaskPoolImpl for TestPool {
            type Handle = TestHandle;

            fn prepare(&self) -> Result<(), glib::Error> {
                self.prepared.store(true, atomic::Ordering::SeqCst);
                Ok(())
            }

            fn cleanup(&self) {
                self.cleaned_up.store(true, atomic::Ordering::SeqCst);
            }

            fn push(&self, func: TaskPoolFunction) -> Result<Option<Self::Handle>, glib::Error> {
                let worker = thread::spawn(move || func.call());
                Ok(Some(TestHandle(worker)))
            }
        }

        // Handle wrapping the thread a task was spawned on.
        pub struct TestHandle(thread::JoinHandle<()>);

        impl TaskHandle for TestHandle {
            fn join(self) {
                let Self(worker) = self;
                worker.join().unwrap();
            }
        }
    }

    glib::wrapper! {
        pub struct TestPool(ObjectSubclass<imp::TestPool>) @extends TaskPool, crate::Object;
    }

    unsafe impl Send for TestPool {}
    unsafe impl Sync for TestPool {}

    impl TestPool {
        pub fn new() -> Self {
            Self::default()
        }
    }

    impl Default for TestPool {
        fn default() -> Self {
            glib::Object::new()
        }
    }

    // Pushes one task through the subclass and checks that it ran, that joining works, and
    // that both lifecycle hooks were reached.
    #[test]
    fn test_simple_subclass() {
        crate::init().unwrap();

        let pool = TestPool::new();
        pool.prepare().unwrap();

        let (tx, rx) = channel();

        // Outer unwrap: the push succeeded; inner unwrap: a handle was returned.
        let handle = pool
            .push(move || {
                tx.send(()).unwrap();
            })
            .unwrap()
            .unwrap();

        assert_eq!(rx.recv(), Ok(()));

        handle.join();
        // The task's closure owned the only sender, so the channel must be disconnected now.
        assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));

        pool.cleanup();

        let state = pool.imp();
        assert!(state.prepared.load(atomic::Ordering::SeqCst));
        assert!(state.cleaned_up.load(atomic::Ordering::SeqCst));
    }
}