1use std::{mem, ptr, sync::Arc};
4
5use glib::{prelude::*, translate::*};
6
7use crate::{Task, ffi};
8
9#[allow(clippy::type_complexity)]
10pub struct TaskBuilder<F: FnMut(&Task) + Send + 'static> {
11 func: Box<(F, *mut ffi::GstTask)>,
12 lock: Option<TaskLock>,
13 enter_callback: Option<Box<dyn FnMut(&Task) + Send + 'static>>,
14 leave_callback: Option<Box<dyn FnMut(&Task) + Send + 'static>>,
15}
16
17impl<F: FnMut(&Task) + Send + 'static> TaskBuilder<F> {
18 #[doc(alias = "gst_task_set_enter_callback")]
19 pub fn enter_callback<E: FnMut(&Task) + Send + 'static>(self, enter_callback: E) -> Self {
20 Self {
21 enter_callback: Some(Box::new(enter_callback)),
22 ..self
23 }
24 }
25
26 #[doc(alias = "gst_task_set_enter_callback")]
27 pub fn enter_callback_if<E: FnMut(&Task) + Send + 'static>(
28 self,
29 enter_callback: E,
30 predicate: bool,
31 ) -> Self {
32 if predicate {
33 self.enter_callback(enter_callback)
34 } else {
35 self
36 }
37 }
38
39 #[doc(alias = "gst_task_set_enter_callback")]
40 pub fn enter_callback_if_some<E: FnMut(&Task) + Send + 'static>(
41 self,
42 enter_callback: Option<E>,
43 ) -> Self {
44 if let Some(enter_callback) = enter_callback {
45 self.enter_callback(enter_callback)
46 } else {
47 self
48 }
49 }
50
51 #[doc(alias = "gst_task_set_leave_callback")]
52 pub fn leave_callback<E: FnMut(&Task) + Send + 'static>(self, leave_callback: E) -> Self {
53 Self {
54 leave_callback: Some(Box::new(leave_callback)),
55 ..self
56 }
57 }
58
59 #[doc(alias = "gst_task_set_leave_callback")]
60 pub fn leave_callback_if<E: FnMut(&Task) + Send + 'static>(
61 self,
62 leave_callback: E,
63 predicate: bool,
64 ) -> Self {
65 if predicate {
66 self.leave_callback(leave_callback)
67 } else {
68 self
69 }
70 }
71
72 #[doc(alias = "gst_task_set_leave_callback")]
73 pub fn leave_callback_if_some<E: FnMut(&Task) + Send + 'static>(
74 self,
75 leave_callback: Option<E>,
76 ) -> Self {
77 if let Some(leave_callback) = leave_callback {
78 self.leave_callback(leave_callback)
79 } else {
80 self
81 }
82 }
83
84 #[doc(alias = "gst_task_set_lock")]
85 pub fn lock(self, lock: &TaskLock) -> Self {
86 Self {
87 lock: Some(lock.clone()),
88 ..self
89 }
90 }
91
92 #[doc(alias = "gst_task_set_lock")]
93 pub fn lock_if(self, lock: &TaskLock, predicate: bool) -> Self {
94 if predicate { self.lock(lock) } else { self }
95 }
96
97 #[doc(alias = "gst_task_set_lock")]
98 pub fn lock_if_some(self, lock: Option<&TaskLock>) -> Self {
99 if let Some(lock) = lock {
100 self.lock(lock)
101 } else {
102 self
103 }
104 }
105
106 #[doc(alias = "gst_task_new")]
107 pub fn build(self) -> Task {
108 unsafe extern "C" fn func_trampoline<F: FnMut(&Task) + Send + 'static>(
109 user_data: glib::ffi::gpointer,
110 ) {
111 unsafe {
112 let callback: &mut (F, *mut ffi::GstTask) = &mut *(user_data as *mut _);
113 (callback.0)(&from_glib_borrow(callback.1));
114 }
115 }
116
117 unsafe extern "C" fn destroy_func<F: FnMut(&Task) + Send + 'static>(
118 data: glib::ffi::gpointer,
119 ) {
120 unsafe {
121 let _callback: Box<(F, *mut ffi::GstTask)> = Box::from_raw(data as *mut _);
122 }
123 }
124
125 unsafe extern "C" fn callback_trampoline(
126 task: *mut ffi::GstTask,
127 _thread: *mut glib::ffi::GThread,
128 data: glib::ffi::gpointer,
129 ) {
130 unsafe {
131 let callback: &mut Box<dyn FnMut(&Task) + Send + 'static> = &mut *(data as *mut _);
132 callback(&from_glib_borrow(task));
133 }
134 }
135
136 #[allow(clippy::type_complexity)]
137 unsafe extern "C" fn destroy_callback(data: glib::ffi::gpointer) {
138 unsafe {
139 let _callback: Box<Box<dyn FnMut(&Task) + Send + 'static>> =
140 Box::from_raw(data as *mut _);
141 }
142 }
143
144 unsafe {
145 let func_ptr = Box::into_raw(self.func);
146
147 let task: Task = from_glib_full(ffi::gst_task_new(
148 Some(func_trampoline::<F> as _),
149 func_ptr as *mut _,
150 Some(destroy_func::<F> as _),
151 ));
152
153 (*func_ptr).1 = task.to_glib_none().0;
154
155 let lock = self.lock.unwrap_or_default();
156 ffi::gst_task_set_lock(task.to_glib_none().0, mut_override(&lock.0.0));
157 task.set_data("gstreamer-rs-task-lock", Arc::clone(&lock.0));
158
159 if let Some(enter_callback) = self.enter_callback {
160 ffi::gst_task_set_enter_callback(
161 task.to_glib_none().0,
162 Some(callback_trampoline),
163 Box::into_raw(Box::new(enter_callback)) as *mut _,
164 Some(destroy_callback),
165 );
166 }
167
168 if let Some(leave_callback) = self.leave_callback {
169 ffi::gst_task_set_leave_callback(
170 task.to_glib_none().0,
171 Some(callback_trampoline),
172 Box::into_raw(Box::new(leave_callback)) as *mut _,
173 Some(destroy_callback),
174 );
175 }
176
177 task
178 }
179 }
180}
181
182impl Task {
183 #[doc(alias = "gst_task_new")]
184 pub fn builder<F: FnMut(&Task) + Send + 'static>(func: F) -> TaskBuilder<F> {
185 assert_initialized_main_thread!();
186 TaskBuilder {
187 func: Box::new((func, ptr::null_mut())),
188 lock: None,
189 enter_callback: None,
190 leave_callback: None,
191 }
192 }
193}
194
195#[derive(Debug, Clone)]
196pub struct TaskLock(Arc<RecMutex>);
197
198impl Default for TaskLock {
199 #[inline]
200 fn default() -> Self {
201 Self::new()
202 }
203}
204
205#[derive(Debug)]
206struct RecMutex(glib::ffi::GRecMutex);
207
208unsafe impl Send for RecMutex {}
209unsafe impl Sync for RecMutex {}
210
211#[must_use = "if unused the TaskLock will immediately unlock"]
212pub struct TaskLockGuard<'a>(&'a RecMutex);
213
214impl TaskLock {
215 #[inline]
216 pub fn new() -> Self {
217 unsafe {
218 let lock = TaskLock(Arc::new(RecMutex(mem::zeroed())));
219 glib::ffi::g_rec_mutex_init(mut_override(&lock.0.0));
220 lock
221 }
222 }
223
224 #[inline]
226 pub fn lock(&self) -> TaskLockGuard<'_> {
227 unsafe {
228 let guard = TaskLockGuard(&self.0);
229 glib::ffi::g_rec_mutex_lock(mut_override(&self.0.0));
230 guard
231 }
232 }
233}
234
235impl Drop for RecMutex {
236 #[inline]
237 fn drop(&mut self) {
238 unsafe {
239 glib::ffi::g_rec_mutex_clear(&mut self.0);
240 }
241 }
242}
243
244impl Drop for TaskLockGuard<'_> {
245 #[inline]
246 fn drop(&mut self) {
247 unsafe {
248 glib::ffi::g_rec_mutex_unlock(mut_override(&self.0.0));
249 }
250 }
251}
252
#[cfg(test)]
mod tests {
    use std::sync::mpsc::channel;

    use super::*;
    use crate::prelude::*;

    /// Runs a task whose function pauses the task on its third invocation and
    /// checks the order in which the enter, function and leave callbacks
    /// report over a channel.
    #[test]
    fn test_simple() {
        crate::init().unwrap();

        #[derive(Debug, PartialEq, Eq)]
        enum Called {
            Enter,
            Func,
            Leave,
        }

        let (send, recv) = channel();
        let lock = TaskLock::new();

        // One sender per callback; the original sender moves into the leave
        // callback.
        let func_send = send.clone();
        let enter_send = send.clone();
        let leave_send = send;
        let mut iterations = 0;

        let task = Task::builder(move |task| {
            iterations += 1;
            if iterations >= 3 {
                task.pause().unwrap();
            }
            func_send.send(Called::Func).unwrap();
        })
        .enter_callback(move |_task| {
            enter_send.send(Called::Enter).unwrap();
        })
        .leave_callback(move |_task| {
            leave_send.send(Called::Leave).unwrap();
        })
        .lock(&lock)
        .build();

        task.start().unwrap();

        // Enter is reported first, followed by exactly three function calls.
        assert_eq!(recv.recv(), Ok(Called::Enter));
        for _ in 0..3 {
            assert_eq!(recv.recv(), Ok(Called::Func));
        }

        // The third function call paused the task before reporting.
        assert_eq!(task.state(), crate::TaskState::Paused);
        task.stop().unwrap();
        assert_eq!(recv.recv(), Ok(Called::Leave));
        task.join().unwrap();
    }
}