// gstreamer/subclass/task_pool.rs
use std::{
4 hash::{Hash, Hasher},
5 ptr,
6 sync::{Arc, Mutex},
7};
8
9use glib::{ffi::gpointer, prelude::*, subclass::prelude::*, translate::*};
10
11use super::prelude::*;
12use crate::{TaskHandle, TaskPool, ffi};
13
/// Implementation trait for [`TaskPool`] subclasses.
pub trait TaskPoolImpl: GstObjectImpl + ObjectSubclass<Type: IsA<TaskPool>> {
    /// Handle type returned from `push` and later consumed by joining or
    /// disposing it through the task pool.
    type Handle: TaskHandle;

    /// Prepares the pool before any functions are pushed.
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    fn prepare(&self) -> Result<(), glib::Error> {
        Ok(())
    }

    /// Cleans up the pool. The default implementation does nothing.
    fn cleanup(&self) {}

    /// Schedules `func` for execution.
    ///
    /// Returns `Ok(Some(handle))` when the pool hands out a handle for the
    /// scheduled function, `Ok(None)` when it does not, or an error when
    /// scheduling failed.
    fn push(&self, func: TaskPoolFunction) -> Result<Option<Self::Handle>, glib::Error>;
}
67
68unsafe impl<T: TaskPoolImpl> IsSubclassable<T> for TaskPool {
69 fn class_init(klass: &mut glib::Class<Self>) {
70 Self::parent_class_init::<T>(klass);
71 let klass = klass.as_mut();
72 klass.prepare = Some(task_pool_prepare::<T>);
73 klass.cleanup = Some(task_pool_cleanup::<T>);
74 klass.push = Some(task_pool_push::<T>);
75 klass.join = Some(task_pool_join::<T>);
76
77 #[cfg(feature = "v1_20")]
78 {
79 klass.dispose_handle = Some(task_pool_dispose_handle::<T>);
80 }
81 }
82}
83
84unsafe extern "C" fn task_pool_prepare<T: TaskPoolImpl>(
85 ptr: *mut ffi::GstTaskPool,
86 error: *mut *mut glib::ffi::GError,
87) {
88 unsafe {
89 let instance = &*(ptr as *mut T::Instance);
90 let imp = instance.imp();
91
92 match imp.prepare() {
93 Ok(()) => {}
94 Err(err) => {
95 if !error.is_null() {
96 *error = err.into_glib_ptr();
97 }
98 }
99 }
100 }
101}
102
103unsafe extern "C" fn task_pool_cleanup<T: TaskPoolImpl>(ptr: *mut ffi::GstTaskPool) {
104 unsafe {
105 let instance = &*(ptr as *mut T::Instance);
106 let imp = instance.imp();
107
108 imp.cleanup();
109 }
110}
111
112unsafe extern "C" fn task_pool_push<T: TaskPoolImpl>(
113 ptr: *mut ffi::GstTaskPool,
114 func: ffi::GstTaskPoolFunction,
115 user_data: gpointer,
116 error: *mut *mut glib::ffi::GError,
117) -> gpointer {
118 unsafe {
119 let instance = &*(ptr as *mut T::Instance);
120 let imp = instance.imp();
121
122 let func = TaskPoolFunction::new(func.expect("Tried to push null func"), user_data);
123
124 match imp.push(func.clone()) {
125 Ok(None) => ptr::null_mut(),
126 Ok(Some(handle)) => Box::into_raw(Box::new(handle)) as gpointer,
127 Err(err) => {
128 func.prevent_call();
129 if !error.is_null() {
130 *error = err.into_glib_ptr();
131 }
132 ptr::null_mut()
133 }
134 }
135 }
136}
137
138unsafe extern "C" fn task_pool_join<T: TaskPoolImpl>(ptr: *mut ffi::GstTaskPool, id: gpointer) {
139 unsafe {
140 if id.is_null() {
141 let wrap: Borrowed<TaskPool> = from_glib_borrow(ptr);
142 crate::warning!(
143 crate::CAT_RUST,
144 obj = wrap.as_ref(),
145 "Tried to join null handle"
146 );
147 return;
148 }
149
150 let handle = Box::from_raw(id as *mut T::Handle);
151 handle.join();
152 }
153}
154
// C trampoline for GstTaskPoolClass::dispose_handle (GStreamer >= 1.20):
// reclaims the boxed handle from `push` and drops it without joining.
#[cfg(feature = "v1_20")]
#[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
unsafe extern "C" fn task_pool_dispose_handle<T: TaskPoolImpl>(
    ptr: *mut ffi::GstTaskPool,
    id: gpointer,
) {
    unsafe {
        if id.is_null() {
            let pool: Borrowed<TaskPool> = from_glib_borrow(ptr);
            crate::warning!(
                crate::CAT_RUST,
                obj = pool.as_ref(),
                "Tried to dispose null handle"
            );
            return;
        }

        drop(Box::from_raw(id as *mut T::Handle));
    }
}
176
/// Wrapper around the C function passed to `TaskPoolImpl::push`.
///
/// The function is consumed exactly once: either `call` runs it or
/// `prevent_call` discards it. Dropping an unused wrapper logs a warning
/// (see the `Drop` impl on the inner type).
#[derive(Debug)]
pub struct TaskPoolFunction(Arc<Mutex<Option<TaskPoolFunctionInner>>>);
181
// Raw C callback plus its user data. `warn_on_drop` starts `true` and is
// cleared once the function has been called or explicitly discarded, so the
// Drop impl only warns about functions that were leaked without running.
#[derive(Debug)]
struct TaskPoolFunctionInner {
    func: unsafe extern "C" fn(gpointer),
    user_data: gpointer,
    warn_on_drop: bool,
}
191
192unsafe impl Send for TaskPoolFunctionInner {}
193
194impl TaskPoolFunction {
195 fn new(func: unsafe extern "C" fn(gpointer), user_data: gpointer) -> Self {
196 let inner = TaskPoolFunctionInner {
197 func,
198 user_data,
199 warn_on_drop: true,
200 };
201 Self(Arc::new(Mutex::new(Some(inner))))
202 }
203
204 #[inline]
205 fn clone(&self) -> Self {
206 Self(self.0.clone())
207 }
208
209 pub fn call(self) {
212 let mut inner = self
213 .0
214 .lock()
215 .unwrap()
216 .take()
217 .expect("TaskPoolFunction has already been dropped");
218 inner.warn_on_drop = false;
219 unsafe { (inner.func)(inner.user_data) }
220 }
221
222 fn prevent_call(self) {
223 let mut inner = self
224 .0
225 .lock()
226 .unwrap()
227 .take()
228 .expect("TaskPoolFunction has already been called");
229 inner.warn_on_drop = false;
230 drop(inner);
231 }
232
233 #[inline]
234 fn as_ptr(&self) -> *const Mutex<Option<TaskPoolFunctionInner>> {
235 Arc::as_ptr(&self.0)
236 }
237}
238
239impl Drop for TaskPoolFunctionInner {
240 fn drop(&mut self) {
241 if self.warn_on_drop {
242 crate::warning!(crate::CAT_RUST, "Leaked task function");
243 }
244 }
245}
246
247impl PartialEq for TaskPoolFunction {
248 fn eq(&self, other: &Self) -> bool {
249 self.as_ptr() == other.as_ptr()
250 }
251}
252
253impl Eq for TaskPoolFunction {}
254
impl PartialOrd for TaskPoolFunction {
    // Delegates to the total order defined by `Ord`, as clippy recommends.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
260
261impl Ord for TaskPoolFunction {
262 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
263 self.as_ptr().cmp(&other.as_ptr())
264 }
265}
266
impl Hash for TaskPoolFunction {
    // Hashes the inner allocation's address, consistent with `PartialEq`.
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.as_ptr().hash(state)
    }
}
272
#[cfg(test)]
mod tests {
    use std::{
        sync::{
            atomic,
            mpsc::{TryRecvError, channel},
        },
        thread,
    };

    use super::*;
    use crate::prelude::*;

    // Minimal TaskPool subclass used to exercise the subclassing machinery:
    // it spawns one OS thread per pushed function and records whether the
    // prepare/cleanup lifecycle hooks ran.
    pub mod imp {
        use super::*;

        #[derive(Default)]
        pub struct TestPool {
            // Flipped to true by `prepare` / `cleanup` respectively.
            pub(super) prepared: atomic::AtomicBool,
            pub(super) cleaned_up: atomic::AtomicBool,
        }

        #[glib::object_subclass]
        impl ObjectSubclass for TestPool {
            const NAME: &'static str = "TestPool";
            type Type = super::TestPool;
            type ParentType = TaskPool;
        }

        impl ObjectImpl for TestPool {}

        impl GstObjectImpl for TestPool {}

        impl TaskPoolImpl for TestPool {
            type Handle = TestHandle;

            fn prepare(&self) -> Result<(), glib::Error> {
                self.prepared.store(true, atomic::Ordering::SeqCst);
                Ok(())
            }

            fn cleanup(&self) {
                self.cleaned_up.store(true, atomic::Ordering::SeqCst);
            }

            fn push(&self, func: TaskPoolFunction) -> Result<Option<Self::Handle>, glib::Error> {
                // One thread per task; the join handle doubles as the pool's
                // task handle.
                let handle = thread::spawn(move || func.call());
                Ok(Some(TestHandle(handle)))
            }
        }

        // Wraps the spawned thread so `TaskHandle::join` blocks until the
        // task function has finished.
        pub struct TestHandle(thread::JoinHandle<()>);

        impl TaskHandle for TestHandle {
            fn join(self) {
                self.0.join().unwrap();
            }
        }
    }

    glib::wrapper! {
        pub struct TestPool(ObjectSubclass<imp::TestPool>) @extends TaskPool, crate::Object;
    }

    unsafe impl Send for TestPool {}
    unsafe impl Sync for TestPool {}

    impl TestPool {
        pub fn new() -> Self {
            Self::default()
        }
    }

    impl Default for TestPool {
        fn default() -> Self {
            glib::Object::new()
        }
    }

    #[test]
    fn test_simple_subclass() {
        crate::init().unwrap();

        let pool = TestPool::new();
        pool.prepare().unwrap();

        let (sender, receiver) = channel();

        // The sender is moved into the task, so the channel disconnects once
        // the task thread exits — checked below via try_recv.
        let handle = pool
            .push(move || {
                sender.send(()).unwrap();
            })
            .unwrap();
        let handle = handle.unwrap();

        assert_eq!(receiver.recv(), Ok(()));

        handle.join();
        assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected));

        pool.cleanup();

        let imp = pool.imp();
        assert!(imp.prepared.load(atomic::Ordering::SeqCst));
        assert!(imp.cleaned_up.load(atomic::Ordering::SeqCst));
    }
}