gstreamer/subclass/
task_pool.rs
1use std::{
4 hash::{Hash, Hasher},
5 ptr,
6 sync::{Arc, Mutex},
7};
8
9use glib::{ffi::gpointer, prelude::*, subclass::prelude::*, translate::*};
10
11use super::prelude::*;
12use crate::{ffi, TaskHandle, TaskPool};
13
14pub trait TaskPoolImpl: GstObjectImpl + ObjectSubclass<Type: IsA<TaskPool>> {
15 type Handle: TaskHandle;
22
23 fn prepare(&self) -> Result<(), glib::Error> {
32 Ok(())
33 }
34
35 fn cleanup(&self) {}
46
47 fn push(&self, func: TaskPoolFunction) -> Result<Option<Self::Handle>, glib::Error>;
66}
67
68unsafe impl<T: TaskPoolImpl> IsSubclassable<T> for TaskPool {
69 fn class_init(klass: &mut glib::Class<Self>) {
70 Self::parent_class_init::<T>(klass);
71 let klass = klass.as_mut();
72 klass.prepare = Some(task_pool_prepare::<T>);
73 klass.cleanup = Some(task_pool_cleanup::<T>);
74 klass.push = Some(task_pool_push::<T>);
75 klass.join = Some(task_pool_join::<T>);
76
77 #[cfg(feature = "v1_20")]
78 {
79 klass.dispose_handle = Some(task_pool_dispose_handle::<T>);
80 }
81 }
82}
83
84unsafe extern "C" fn task_pool_prepare<T: TaskPoolImpl>(
85 ptr: *mut ffi::GstTaskPool,
86 error: *mut *mut glib::ffi::GError,
87) {
88 let instance = &*(ptr as *mut T::Instance);
89 let imp = instance.imp();
90
91 match imp.prepare() {
92 Ok(()) => {}
93 Err(err) => {
94 if !error.is_null() {
95 *error = err.into_glib_ptr();
96 }
97 }
98 }
99}
100
101unsafe extern "C" fn task_pool_cleanup<T: TaskPoolImpl>(ptr: *mut ffi::GstTaskPool) {
102 let instance = &*(ptr as *mut T::Instance);
103 let imp = instance.imp();
104
105 imp.cleanup();
106}
107
108unsafe extern "C" fn task_pool_push<T: TaskPoolImpl>(
109 ptr: *mut ffi::GstTaskPool,
110 func: ffi::GstTaskPoolFunction,
111 user_data: gpointer,
112 error: *mut *mut glib::ffi::GError,
113) -> gpointer {
114 let instance = &*(ptr as *mut T::Instance);
115 let imp = instance.imp();
116
117 let func = TaskPoolFunction::new(func.expect("Tried to push null func"), user_data);
118
119 match imp.push(func.clone()) {
120 Ok(None) => ptr::null_mut(),
121 Ok(Some(handle)) => Box::into_raw(Box::new(handle)) as gpointer,
122 Err(err) => {
123 func.prevent_call();
124 if !error.is_null() {
125 *error = err.into_glib_ptr();
126 }
127 ptr::null_mut()
128 }
129 }
130}
131
132unsafe extern "C" fn task_pool_join<T: TaskPoolImpl>(ptr: *mut ffi::GstTaskPool, id: gpointer) {
133 if id.is_null() {
134 let wrap: Borrowed<TaskPool> = from_glib_borrow(ptr);
135 crate::warning!(
136 crate::CAT_RUST,
137 obj = wrap.as_ref(),
138 "Tried to join null handle"
139 );
140 return;
141 }
142
143 let handle = Box::from_raw(id as *mut T::Handle);
144 handle.join();
145}
146
147#[cfg(feature = "v1_20")]
148#[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
149unsafe extern "C" fn task_pool_dispose_handle<T: TaskPoolImpl>(
150 ptr: *mut ffi::GstTaskPool,
151 id: gpointer,
152) {
153 if id.is_null() {
154 let wrap: Borrowed<TaskPool> = from_glib_borrow(ptr);
155 crate::warning!(
156 crate::CAT_RUST,
157 obj = wrap.as_ref(),
158 "Tried to dispose null handle"
159 );
160 return;
161 }
162
163 let handle = Box::from_raw(id as *mut T::Handle);
164 drop(handle);
165}
166
167#[derive(Debug)]
170pub struct TaskPoolFunction(Arc<Mutex<Option<TaskPoolFunctionInner>>>);
171
172#[derive(Debug)]
176struct TaskPoolFunctionInner {
177 func: unsafe extern "C" fn(gpointer),
178 user_data: gpointer,
179 warn_on_drop: bool,
180}
181
182unsafe impl Send for TaskPoolFunctionInner {}
183
184impl TaskPoolFunction {
185 fn new(func: unsafe extern "C" fn(gpointer), user_data: gpointer) -> Self {
186 let inner = TaskPoolFunctionInner {
187 func,
188 user_data,
189 warn_on_drop: true,
190 };
191 Self(Arc::new(Mutex::new(Some(inner))))
192 }
193
194 #[inline]
195 fn clone(&self) -> Self {
196 Self(self.0.clone())
197 }
198
199 pub fn call(self) {
202 let mut inner = self
203 .0
204 .lock()
205 .unwrap()
206 .take()
207 .expect("TaskPoolFunction has already been dropped");
208 inner.warn_on_drop = false;
209 unsafe { (inner.func)(inner.user_data) }
210 }
211
212 fn prevent_call(self) {
213 let mut inner = self
214 .0
215 .lock()
216 .unwrap()
217 .take()
218 .expect("TaskPoolFunction has already been called");
219 inner.warn_on_drop = false;
220 drop(inner);
221 }
222
223 #[inline]
224 fn as_ptr(&self) -> *const Mutex<Option<TaskPoolFunctionInner>> {
225 Arc::as_ptr(&self.0)
226 }
227}
228
229impl Drop for TaskPoolFunctionInner {
230 fn drop(&mut self) {
231 if self.warn_on_drop {
232 crate::warning!(crate::CAT_RUST, "Leaked task function");
233 }
234 }
235}
236
237impl PartialEq for TaskPoolFunction {
238 fn eq(&self, other: &Self) -> bool {
239 self.as_ptr() == other.as_ptr()
240 }
241}
242
243impl Eq for TaskPoolFunction {}
244
245impl PartialOrd for TaskPoolFunction {
246 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
247 Some(self.cmp(other))
248 }
249}
250
251impl Ord for TaskPoolFunction {
252 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
253 self.as_ptr().cmp(&other.as_ptr())
254 }
255}
256
257impl Hash for TaskPoolFunction {
258 fn hash<H: Hasher>(&self, state: &mut H) {
259 self.as_ptr().hash(state)
260 }
261}
262
263#[cfg(test)]
264mod tests {
265 use std::{
266 sync::{
267 atomic,
268 mpsc::{channel, TryRecvError},
269 },
270 thread,
271 };
272
273 use super::*;
274 use crate::prelude::*;
275
276 pub mod imp {
277 use super::*;
278
279 #[derive(Default)]
280 pub struct TestPool {
281 pub(super) prepared: atomic::AtomicBool,
282 pub(super) cleaned_up: atomic::AtomicBool,
283 }
284
285 #[glib::object_subclass]
286 impl ObjectSubclass for TestPool {
287 const NAME: &'static str = "TestPool";
288 type Type = super::TestPool;
289 type ParentType = TaskPool;
290 }
291
292 impl ObjectImpl for TestPool {}
293
294 impl GstObjectImpl for TestPool {}
295
296 impl TaskPoolImpl for TestPool {
297 type Handle = TestHandle;
298
299 fn prepare(&self) -> Result<(), glib::Error> {
300 self.prepared.store(true, atomic::Ordering::SeqCst);
301 Ok(())
302 }
303
304 fn cleanup(&self) {
305 self.cleaned_up.store(true, atomic::Ordering::SeqCst);
306 }
307
308 fn push(&self, func: TaskPoolFunction) -> Result<Option<Self::Handle>, glib::Error> {
309 let handle = thread::spawn(move || func.call());
310 Ok(Some(TestHandle(handle)))
311 }
312 }
313
314 pub struct TestHandle(thread::JoinHandle<()>);
315
316 impl TaskHandle for TestHandle {
317 fn join(self) {
318 self.0.join().unwrap();
319 }
320 }
321 }
322
323 glib::wrapper! {
324 pub struct TestPool(ObjectSubclass<imp::TestPool>) @extends TaskPool, crate::Object;
325 }
326
327 unsafe impl Send for TestPool {}
328 unsafe impl Sync for TestPool {}
329
330 impl TestPool {
331 pub fn new() -> Self {
332 Self::default()
333 }
334 }
335
336 impl Default for TestPool {
337 fn default() -> Self {
338 glib::Object::new()
339 }
340 }
341
342 #[test]
343 fn test_simple_subclass() {
344 crate::init().unwrap();
345
346 let pool = TestPool::new();
347 pool.prepare().unwrap();
348
349 let (sender, receiver) = channel();
350
351 let handle = pool
352 .push(move || {
353 sender.send(()).unwrap();
354 })
355 .unwrap();
356 let handle = handle.unwrap();
357
358 assert_eq!(receiver.recv(), Ok(()));
359
360 handle.join();
361 assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected));
362
363 pool.cleanup();
364
365 let imp = pool.imp();
366 assert!(imp.prepared.load(atomic::Ordering::SeqCst));
367 assert!(imp.cleaned_up.load(atomic::Ordering::SeqCst));
368 }
369}