gstreamer/subclass/
task_pool.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{
4    hash::{Hash, Hasher},
5    ptr,
6    sync::{Arc, Mutex},
7};
8
9use glib::{ffi::gpointer, prelude::*, subclass::prelude::*, translate::*};
10
11use super::prelude::*;
12use crate::{ffi, TaskHandle, TaskPool};
13
// rustdoc-stripper-ignore-next
/// Trait containing the virtual methods of [`TaskPool`] that a subclass can provide.
///
/// `push` has to be implemented; `prepare` and `cleanup` come with default implementations.
pub trait TaskPoolImpl: GstObjectImpl + ObjectSubclass<Type: IsA<TaskPool>> {
    // rustdoc-stripper-ignore-next
    /// Handle to be returned from the `push` function to allow the caller to wait for the task's
    /// completion.
    ///
    /// If unneeded, you can specify `()` or [`Infallible`](std::convert::Infallible) for a handle
    /// that does nothing on `join` or drop.
    type Handle: TaskHandle;

    // rustdoc-stripper-ignore-next
    /// Prepare the task pool to accept tasks.
    ///
    /// This defaults to doing nothing.
    ///
    /// # Errors
    ///
    /// A returned error is handed to the C caller through its `GError` out-parameter, if the
    /// caller supplied one; otherwise it is dropped.
    // rustdoc-stripper-ignore-next-stop
    /// Prepare the taskpool for accepting [`TaskPoolExtManual::push()`][crate::prelude::TaskPoolExtManual::push()] operations.
    ///
    /// MT safe.
    fn prepare(&self) -> Result<(), glib::Error> {
        Ok(())
    }

    // rustdoc-stripper-ignore-next
    /// Clean up, rejecting further tasks and waiting for all accepted tasks to be stopped.
    ///
    /// This is mainly used internally to ensure proper cleanup of internal data structures in test
    /// suites.
    ///
    /// This defaults to doing nothing.
    // rustdoc-stripper-ignore-next-stop
    /// Wait for all tasks to be stopped. This is mainly used internally
    /// to ensure proper cleanup of internal data structures in test suites.
    ///
    /// MT safe.
    fn cleanup(&self) {}

    // rustdoc-stripper-ignore-next
    /// Deliver a task to the pool.
    ///
    /// If returning `Ok`, you need to call the `func` eventually. Dropping it without calling it
    /// logs a "Leaked task function" warning.
    ///
    /// If returning `Err`, the `func` must be dropped without calling it.
    ///
    /// # Panics
    ///
    /// The glue code invalidates `func` when `Err` is returned. It panics if `func` was already
    /// called at that point, and a later [`TaskPoolFunction::call`] on it panics as well.
    // rustdoc-stripper-ignore-next-stop
    /// Start the execution of a new thread from `self`.
    /// ## `func`
    /// the function to call
    ///
    /// # Returns
    ///
    /// a pointer that should be used
    /// for the gst_task_pool_join function. This pointer can be [`None`], you
    /// must check `error` to detect errors. If the pointer is not [`None`] and
    /// `gst_task_pool_join()` is not used, call `gst_task_pool_dispose_handle()`
    /// instead.
    fn push(&self, func: TaskPoolFunction) -> Result<Option<Self::Handle>, glib::Error>;
}
67
68unsafe impl<T: TaskPoolImpl> IsSubclassable<T> for TaskPool {
69    fn class_init(klass: &mut glib::Class<Self>) {
70        Self::parent_class_init::<T>(klass);
71        let klass = klass.as_mut();
72        klass.prepare = Some(task_pool_prepare::<T>);
73        klass.cleanup = Some(task_pool_cleanup::<T>);
74        klass.push = Some(task_pool_push::<T>);
75        klass.join = Some(task_pool_join::<T>);
76
77        #[cfg(feature = "v1_20")]
78        {
79            klass.dispose_handle = Some(task_pool_dispose_handle::<T>);
80        }
81    }
82}
83
84unsafe extern "C" fn task_pool_prepare<T: TaskPoolImpl>(
85    ptr: *mut ffi::GstTaskPool,
86    error: *mut *mut glib::ffi::GError,
87) {
88    let instance = &*(ptr as *mut T::Instance);
89    let imp = instance.imp();
90
91    match imp.prepare() {
92        Ok(()) => {}
93        Err(err) => {
94            if !error.is_null() {
95                *error = err.into_glib_ptr();
96            }
97        }
98    }
99}
100
101unsafe extern "C" fn task_pool_cleanup<T: TaskPoolImpl>(ptr: *mut ffi::GstTaskPool) {
102    let instance = &*(ptr as *mut T::Instance);
103    let imp = instance.imp();
104
105    imp.cleanup();
106}
107
108unsafe extern "C" fn task_pool_push<T: TaskPoolImpl>(
109    ptr: *mut ffi::GstTaskPool,
110    func: ffi::GstTaskPoolFunction,
111    user_data: gpointer,
112    error: *mut *mut glib::ffi::GError,
113) -> gpointer {
114    let instance = &*(ptr as *mut T::Instance);
115    let imp = instance.imp();
116
117    let func = TaskPoolFunction::new(func.expect("Tried to push null func"), user_data);
118
119    match imp.push(func.clone()) {
120        Ok(None) => ptr::null_mut(),
121        Ok(Some(handle)) => Box::into_raw(Box::new(handle)) as gpointer,
122        Err(err) => {
123            func.prevent_call();
124            if !error.is_null() {
125                *error = err.into_glib_ptr();
126            }
127            ptr::null_mut()
128        }
129    }
130}
131
132unsafe extern "C" fn task_pool_join<T: TaskPoolImpl>(ptr: *mut ffi::GstTaskPool, id: gpointer) {
133    if id.is_null() {
134        let wrap: Borrowed<TaskPool> = from_glib_borrow(ptr);
135        crate::warning!(
136            crate::CAT_RUST,
137            obj = wrap.as_ref(),
138            "Tried to join null handle"
139        );
140        return;
141    }
142
143    let handle = Box::from_raw(id as *mut T::Handle);
144    handle.join();
145}
146
// C trampoline for the `dispose_handle` virtual method.
//
// A non-null `id` is turned back into the boxed `T::Handle` and dropped without
// joining it. A null `id` only logs a warning.
#[cfg(feature = "v1_20")]
#[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
unsafe extern "C" fn task_pool_dispose_handle<T: TaskPoolImpl>(
    ptr: *mut ffi::GstTaskPool,
    id: gpointer,
) {
    if !id.is_null() {
        drop(Box::from_raw(id as *mut T::Handle));
        return;
    }

    let pool: Borrowed<TaskPool> = from_glib_borrow(ptr);
    crate::warning!(
        crate::CAT_RUST,
        obj = pool.as_ref(),
        "Tried to dispose null handle"
    );
}
166
// rustdoc-stripper-ignore-next
/// Function the task pool should execute, provided to [`push`](TaskPoolImpl::push).
///
/// Run it by consuming the value with [`call`](Self::call). Dropping it without calling it
/// logs a "Leaked task function" warning.
///
/// Equality, ordering and hashing are based on the address of the shared allocation, not on
/// the wrapped function pointer or its user data.
#[derive(Debug)]
pub struct TaskPoolFunction(Arc<Mutex<Option<TaskPoolFunctionInner>>>);

// `Arc<Mutex<Option<…>>>` is required so that we can enforce that the function
// has not been called and will never be called after `push` returns `Err`:
// the glue code keeps its own reference and takes the inner value out of the
// `Option` in that case.
174
// The C callback together with its argument, plus the flag that controls the
// warning emitted when this value is dropped.
#[derive(Debug)]
struct TaskPoolFunctionInner {
    // C callback to invoke.
    func: unsafe extern "C" fn(gpointer),
    // Opaque argument passed to `func` when it is invoked.
    user_data: gpointer,
    // Starts out `true` and is cleared when the function is called or cancelled;
    // if still set on drop, a "Leaked task function" warning is logged.
    warn_on_drop: bool,
}
181
// SAFETY: NOTE(review): `user_data` is a raw pointer, so `Send` is not derived
// automatically. This presumably relies on the GStreamer task pool contract that
// the callback and its data may be used from the thread the pool runs it on —
// confirm against the C API documentation.
unsafe impl Send for TaskPoolFunctionInner {}
183
184impl TaskPoolFunction {
185    fn new(func: unsafe extern "C" fn(gpointer), user_data: gpointer) -> Self {
186        let inner = TaskPoolFunctionInner {
187            func,
188            user_data,
189            warn_on_drop: true,
190        };
191        Self(Arc::new(Mutex::new(Some(inner))))
192    }
193
194    #[inline]
195    fn clone(&self) -> Self {
196        Self(self.0.clone())
197    }
198
199    // rustdoc-stripper-ignore-next
200    /// Consume and execute the function.
201    pub fn call(self) {
202        let mut inner = self
203            .0
204            .lock()
205            .unwrap()
206            .take()
207            .expect("TaskPoolFunction has already been dropped");
208        inner.warn_on_drop = false;
209        unsafe { (inner.func)(inner.user_data) }
210    }
211
212    fn prevent_call(self) {
213        let mut inner = self
214            .0
215            .lock()
216            .unwrap()
217            .take()
218            .expect("TaskPoolFunction has already been called");
219        inner.warn_on_drop = false;
220        drop(inner);
221    }
222
223    #[inline]
224    fn as_ptr(&self) -> *const Mutex<Option<TaskPoolFunctionInner>> {
225        Arc::as_ptr(&self.0)
226    }
227}
228
229impl Drop for TaskPoolFunctionInner {
230    fn drop(&mut self) {
231        if self.warn_on_drop {
232            crate::warning!(crate::CAT_RUST, "Leaked task function");
233        }
234    }
235}
236
237impl PartialEq for TaskPoolFunction {
238    fn eq(&self, other: &Self) -> bool {
239        self.as_ptr() == other.as_ptr()
240    }
241}
242
243impl Eq for TaskPoolFunction {}
244
245impl PartialOrd for TaskPoolFunction {
246    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
247        Some(self.cmp(other))
248    }
249}
250
251impl Ord for TaskPoolFunction {
252    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
253        self.as_ptr().cmp(&other.as_ptr())
254    }
255}
256
257impl Hash for TaskPoolFunction {
258    fn hash<H: Hasher>(&self, state: &mut H) {
259        self.as_ptr().hash(state)
260    }
261}
262
#[cfg(test)]
mod tests {
    use std::{
        sync::{
            atomic,
            mpsc::{channel, TryRecvError},
        },
        thread,
    };

    use super::*;
    use crate::prelude::*;

    pub mod imp {
        use super::*;

        // Test subclass that records whether `prepare` and `cleanup` were invoked.
        #[derive(Default)]
        pub struct TestPool {
            pub(super) prepared: atomic::AtomicBool,
            pub(super) cleaned_up: atomic::AtomicBool,
        }

        #[glib::object_subclass]
        impl ObjectSubclass for TestPool {
            const NAME: &'static str = "TestPool";
            type Type = super::TestPool;
            type ParentType = TaskPool;
        }

        impl ObjectImpl for TestPool {}

        impl GstObjectImpl for TestPool {}

        impl TaskPoolImpl for TestPool {
            type Handle = TestHandle;

            fn prepare(&self) -> Result<(), glib::Error> {
                self.prepared.store(true, atomic::Ordering::SeqCst);
                Ok(())
            }

            fn cleanup(&self) {
                self.cleaned_up.store(true, atomic::Ordering::SeqCst);
            }

            // Runs every pushed task on a freshly spawned thread.
            fn push(&self, func: TaskPoolFunction) -> Result<Option<Self::Handle>, glib::Error> {
                Ok(Some(TestHandle(thread::spawn(move || func.call()))))
            }
        }

        // Handle wrapping the join handle of the spawned thread.
        pub struct TestHandle(thread::JoinHandle<()>);

        impl TaskHandle for TestHandle {
            fn join(self) {
                let TestHandle(worker) = self;
                worker.join().unwrap();
            }
        }
    }

    glib::wrapper! {
        pub struct TestPool(ObjectSubclass<imp::TestPool>) @extends TaskPool, crate::Object;
    }

    unsafe impl Send for TestPool {}
    unsafe impl Sync for TestPool {}

    impl TestPool {
        pub fn new() -> Self {
            glib::Object::new()
        }
    }

    impl Default for TestPool {
        fn default() -> Self {
            Self::new()
        }
    }

    // Pushes one task through the subclass and checks that it ran, that joining
    // its handle works and that `prepare`/`cleanup` reached the implementation.
    #[test]
    fn test_simple_subclass() {
        crate::init().unwrap();

        let pool = TestPool::new();
        pool.prepare().unwrap();

        let (tx, rx) = channel();

        let task_handle = pool
            .push(move || {
                tx.send(()).unwrap();
            })
            .unwrap()
            .unwrap();

        assert_eq!(rx.recv(), Ok(()));

        // Once the task has finished its sender is gone, so the channel is disconnected.
        task_handle.join();
        assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));

        pool.cleanup();

        let state = pool.imp();
        assert!(state.prepared.load(atomic::Ordering::SeqCst));
        assert!(state.cleaned_up.load(atomic::Ordering::SeqCst));
    }
}