gstreamer/
task_pool.rs
1use std::ptr;
4
5use glib::{ffi::gpointer, prelude::*, translate::*};
6
7use crate::{ffi, TaskPool};
8
9unsafe extern "C" fn task_pool_trampoline<P: FnOnce() + Send + 'static>(data: gpointer) {
10 let func = Box::from_raw(data as *mut P);
11 func()
12}
13mod sealed {
14 pub trait Sealed {}
15 impl<T: super::IsA<super::TaskPool>> Sealed for T {}
16}
17
18pub trait TaskPoolExtManual: sealed::Sealed + IsA<TaskPool> + 'static {
19 #[doc(alias = "gst_task_pool_push")]
31 fn push<P: FnOnce() + Send + 'static>(
32 &self,
33 func: P,
34 ) -> Result<Option<TaskPoolTaskHandle>, glib::Error> {
35 unsafe {
36 let mut error = ptr::null_mut();
37 let func: Box<P> = Box::new(func);
38 let func = Box::into_raw(func);
39
40 let handle = ffi::gst_task_pool_push(
41 self.as_ref().to_glib_none().0,
42 Some(task_pool_trampoline::<P>),
43 func as gpointer,
44 &mut error,
45 );
46
47 if !error.is_null() {
48 debug_assert!(handle.is_null());
49
50 drop(Box::from_raw(func));
53
54 return Err(from_glib_full(error));
55 }
56
57 let handle = ptr::NonNull::new(handle).map(|handle| TaskPoolTaskHandle {
58 handle,
59 task_pool: Some(self.as_ref().clone()),
60 });
61
62 Ok(handle)
63 }
64 }
65}
66
67impl<O: IsA<TaskPool>> TaskPoolExtManual for O {}
68
69impl TaskPool {
70 unsafe fn join(&self, id: ptr::NonNull<libc::c_void>) {
71 ffi::gst_task_pool_join(self.to_glib_none().0, id.as_ptr())
72 }
73
74 #[cfg(feature = "v1_20")]
75 #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
76 unsafe fn dispose_handle(&self, id: ptr::NonNull<libc::c_void>) {
77 ffi::gst_task_pool_dispose_handle(self.to_glib_none().0, id.as_ptr())
78 }
79}
80
81pub trait TaskHandle {
84 fn join(self);
87}
88
89impl TaskHandle for () {
90 fn join(self) {}
91}
92
93impl TaskHandle for std::convert::Infallible {
94 fn join(self) {}
95}
96
97#[cfg_attr(not(any(feature = "v1_20", docsrs)), must_use)]
105#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
106pub struct TaskPoolTaskHandle {
107 handle: ptr::NonNull<libc::c_void>,
108 task_pool: Option<TaskPool>,
109}
110
111impl TaskHandle for TaskPoolTaskHandle {
112 #[doc(alias = "gst_task_pool_join")]
113 fn join(mut self) {
114 let task_pool = self.task_pool.take().unwrap();
115 unsafe { task_pool.join(self.handle) }
116 }
117}
118
119impl Drop for TaskPoolTaskHandle {
120 #[doc(alias = "gst_task_pool_dispose_handle")]
121 #[inline]
122 fn drop(&mut self) {
123 if let Some(task_pool) = self.task_pool.take() {
124 cfg_if::cfg_if! {
125 if #[cfg(feature = "v1_20")] {
126 unsafe { task_pool.dispose_handle(self.handle) }
127 } else {
128 crate::warning!(crate::CAT_RUST, obj = &task_pool, "Leaked task handle");
129 }
130 }
131 }
132 }
133}
134
135#[cfg(test)]
136mod tests {
137 use std::sync::mpsc::{channel, RecvError};
138
139 use super::*;
140 use crate::prelude::*;
141
142 #[test]
143 fn test_simple() {
144 crate::init().unwrap();
145 let pool = TaskPool::new();
146 pool.prepare().unwrap();
147
148 let (sender, receiver) = channel();
149
150 let handle = pool
151 .push(move || {
152 sender.send(()).unwrap();
153 })
154 .unwrap();
155
156 assert_eq!(receiver.recv(), Ok(()));
157
158 if let Some(handle) = handle {
159 handle.join();
160 }
161
162 assert_eq!(receiver.recv(), Err(RecvError));
165
166 pool.cleanup();
167 }
168}