gstreamer/
task_pool.rs
1use std::ptr;
4
5use glib::{ffi::gpointer, prelude::*, translate::*};
6
7use crate::{ffi, TaskPool};
8
9unsafe extern "C" fn task_pool_trampoline<P: FnOnce() + Send + 'static>(data: gpointer) {
10 let func = Box::from_raw(data as *mut P);
11 func()
12}
13
14pub trait TaskPoolExtManual: IsA<TaskPool> + 'static {
15 #[doc(alias = "gst_task_pool_push")]
27 fn push<P: FnOnce() + Send + 'static>(
28 &self,
29 func: P,
30 ) -> Result<Option<TaskPoolTaskHandle>, glib::Error> {
31 unsafe {
32 let mut error = ptr::null_mut();
33 let func: Box<P> = Box::new(func);
34 let func = Box::into_raw(func);
35
36 let handle = ffi::gst_task_pool_push(
37 self.as_ref().to_glib_none().0,
38 Some(task_pool_trampoline::<P>),
39 func as gpointer,
40 &mut error,
41 );
42
43 if !error.is_null() {
44 debug_assert!(handle.is_null());
45
46 drop(Box::from_raw(func));
49
50 return Err(from_glib_full(error));
51 }
52
53 let handle = ptr::NonNull::new(handle).map(|handle| TaskPoolTaskHandle {
54 handle,
55 task_pool: Some(self.as_ref().clone()),
56 });
57
58 Ok(handle)
59 }
60 }
61}
62
63impl<O: IsA<TaskPool>> TaskPoolExtManual for O {}
64
65impl TaskPool {
66 unsafe fn join(&self, id: ptr::NonNull<libc::c_void>) {
67 ffi::gst_task_pool_join(self.to_glib_none().0, id.as_ptr())
68 }
69
70 #[cfg(feature = "v1_20")]
71 #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
72 unsafe fn dispose_handle(&self, id: ptr::NonNull<libc::c_void>) {
73 ffi::gst_task_pool_dispose_handle(self.to_glib_none().0, id.as_ptr())
74 }
75}
76
77pub trait TaskHandle {
80 fn join(self);
83}
84
85impl TaskHandle for () {
86 fn join(self) {}
87}
88
89impl TaskHandle for std::convert::Infallible {
90 fn join(self) {}
91}
92
93#[cfg_attr(not(any(feature = "v1_20", docsrs)), must_use)]
101#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
102pub struct TaskPoolTaskHandle {
103 handle: ptr::NonNull<libc::c_void>,
104 task_pool: Option<TaskPool>,
105}
106
107impl TaskHandle for TaskPoolTaskHandle {
108 #[doc(alias = "gst_task_pool_join")]
109 fn join(mut self) {
110 let task_pool = self.task_pool.take().unwrap();
111 unsafe { task_pool.join(self.handle) }
112 }
113}
114
115impl Drop for TaskPoolTaskHandle {
116 #[doc(alias = "gst_task_pool_dispose_handle")]
117 #[inline]
118 fn drop(&mut self) {
119 if let Some(task_pool) = self.task_pool.take() {
120 cfg_if::cfg_if! {
121 if #[cfg(feature = "v1_20")] {
122 unsafe { task_pool.dispose_handle(self.handle) }
123 } else {
124 crate::warning!(crate::CAT_RUST, obj = &task_pool, "Leaked task handle");
125 }
126 }
127 }
128 }
129}
130
131#[cfg(test)]
132mod tests {
133 use std::sync::mpsc::{channel, RecvError};
134
135 use super::*;
136 use crate::prelude::*;
137
138 #[test]
139 fn test_simple() {
140 crate::init().unwrap();
141 let pool = TaskPool::new();
142 pool.prepare().unwrap();
143
144 let (sender, receiver) = channel();
145
146 let handle = pool
147 .push(move || {
148 sender.send(()).unwrap();
149 })
150 .unwrap();
151
152 assert_eq!(receiver.recv(), Ok(()));
153
154 if let Some(handle) = handle {
155 handle.join();
156 }
157
158 assert_eq!(receiver.recv(), Err(RecvError));
161
162 pool.cleanup();
163 }
164}