1use std::ptr;
4
5use glib::{ffi::gpointer, prelude::*, translate::*};
6
7use crate::{TaskPool, ffi};
8
9unsafe extern "C" fn task_pool_trampoline<P: FnOnce() + Send + 'static>(data: gpointer) {
10 unsafe {
11 let func = Box::from_raw(data as *mut P);
12 func()
13 }
14}
15
16pub trait TaskPoolExtManual: IsA<TaskPool> + 'static {
17 #[doc(alias = "gst_task_pool_push")]
29 fn push<P: FnOnce() + Send + 'static>(
30 &self,
31 func: P,
32 ) -> Result<Option<TaskPoolTaskHandle>, glib::Error> {
33 unsafe {
34 let mut error = ptr::null_mut();
35 let func: Box<P> = Box::new(func);
36 let func = Box::into_raw(func);
37
38 let handle = ffi::gst_task_pool_push(
39 self.as_ref().to_glib_none().0,
40 Some(task_pool_trampoline::<P>),
41 func as gpointer,
42 &mut error,
43 );
44
45 if !error.is_null() {
46 debug_assert!(handle.is_null());
47
48 drop(Box::from_raw(func));
51
52 return Err(from_glib_full(error));
53 }
54
55 let handle = ptr::NonNull::new(handle).map(|handle| TaskPoolTaskHandle {
56 handle,
57 task_pool: Some(self.as_ref().clone()),
58 });
59
60 Ok(handle)
61 }
62 }
63}
64
65impl<O: IsA<TaskPool>> TaskPoolExtManual for O {}
66
67impl TaskPool {
68 unsafe fn join(&self, id: ptr::NonNull<libc::c_void>) {
69 unsafe { ffi::gst_task_pool_join(self.to_glib_none().0, id.as_ptr()) }
70 }
71
72 #[cfg(feature = "v1_20")]
73 #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
74 unsafe fn dispose_handle(&self, id: ptr::NonNull<libc::c_void>) {
75 unsafe { ffi::gst_task_pool_dispose_handle(self.to_glib_none().0, id.as_ptr()) }
76 }
77}
78
/// A handle to a task that can be waited on.
pub trait TaskHandle {
    /// Consumes the handle and joins the task it refers to.
    fn join(self);
}
86
87impl TaskHandle for () {
88 fn join(self) {}
89}
90
91impl TaskHandle for std::convert::Infallible {
92 fn join(self) {}
93}
94
95#[cfg_attr(not(any(feature = "v1_20", docsrs)), must_use)]
103#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
104pub struct TaskPoolTaskHandle {
105 handle: ptr::NonNull<libc::c_void>,
106 task_pool: Option<TaskPool>,
107}
108
109impl TaskHandle for TaskPoolTaskHandle {
110 #[doc(alias = "gst_task_pool_join")]
111 fn join(mut self) {
112 let task_pool = self.task_pool.take().unwrap();
113 unsafe { task_pool.join(self.handle) }
114 }
115}
116
117impl Drop for TaskPoolTaskHandle {
118 #[doc(alias = "gst_task_pool_dispose_handle")]
119 #[inline]
120 fn drop(&mut self) {
121 if let Some(task_pool) = self.task_pool.take() {
122 cfg_if::cfg_if! {
123 if #[cfg(feature = "v1_20")] {
124 unsafe { task_pool.dispose_handle(self.handle) }
125 } else {
126 crate::warning!(crate::CAT_RUST, obj = &task_pool, "Leaked task handle");
127 }
128 }
129 }
130 }
131}
132
#[cfg(test)]
mod tests {
    use std::sync::mpsc::{RecvError, channel};

    use super::*;
    use crate::prelude::*;

    /// Pushes one closure to a fresh pool and checks that it runs and that
    /// its captured state is dropped afterwards.
    #[test]
    fn test_simple() {
        crate::init().unwrap();

        let pool = TaskPool::new();
        pool.prepare().unwrap();

        let (tx, rx) = channel();

        let maybe_handle = pool
            .push(move || {
                tx.send(()).unwrap();
            })
            .unwrap();

        // The closure ran and delivered its single message.
        assert_eq!(rx.recv(), Ok(()));

        if let Some(task) = maybe_handle {
            task.join();
        }

        // Blocking `recv` is used on purpose: if `push` returned no handle
        // there is nothing to join, so the only way to observe the sender
        // being dropped is to wait for the channel to disconnect.
        assert_eq!(rx.recv(), Err(RecvError));

        pool.cleanup();
    }
}