gstreamer/
task.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{mem, ptr, sync::Arc};
4
5use glib::{prelude::*, translate::*};
6
7use crate::{ffi, Task};
8
9#[allow(clippy::type_complexity)]
10pub struct TaskBuilder<F: FnMut(&Task) + Send + 'static> {
11    func: Box<(F, *mut ffi::GstTask)>,
12    lock: Option<TaskLock>,
13    enter_callback: Option<Box<dyn FnMut(&Task) + Send + 'static>>,
14    leave_callback: Option<Box<dyn FnMut(&Task) + Send + 'static>>,
15}
16
17impl<F: FnMut(&Task) + Send + 'static> TaskBuilder<F> {
18    #[doc(alias = "gst_task_set_enter_callback")]
19    pub fn enter_callback<E: FnMut(&Task) + Send + 'static>(self, enter_callback: E) -> Self {
20        Self {
21            enter_callback: Some(Box::new(enter_callback)),
22            ..self
23        }
24    }
25
26    #[doc(alias = "gst_task_set_enter_callback")]
27    pub fn enter_callback_if<E: FnMut(&Task) + Send + 'static>(
28        self,
29        enter_callback: E,
30        predicate: bool,
31    ) -> Self {
32        if predicate {
33            self.enter_callback(enter_callback)
34        } else {
35            self
36        }
37    }
38
39    #[doc(alias = "gst_task_set_enter_callback")]
40    pub fn enter_callback_if_some<E: FnMut(&Task) + Send + 'static>(
41        self,
42        enter_callback: Option<E>,
43    ) -> Self {
44        if let Some(enter_callback) = enter_callback {
45            self.enter_callback(enter_callback)
46        } else {
47            self
48        }
49    }
50
51    #[doc(alias = "gst_task_set_leave_callback")]
52    pub fn leave_callback<E: FnMut(&Task) + Send + 'static>(self, leave_callback: E) -> Self {
53        Self {
54            leave_callback: Some(Box::new(leave_callback)),
55            ..self
56        }
57    }
58
59    #[doc(alias = "gst_task_set_leave_callback")]
60    pub fn leave_callback_if<E: FnMut(&Task) + Send + 'static>(
61        self,
62        leave_callback: E,
63        predicate: bool,
64    ) -> Self {
65        if predicate {
66            self.leave_callback(leave_callback)
67        } else {
68            self
69        }
70    }
71
72    #[doc(alias = "gst_task_set_leave_callback")]
73    pub fn leave_callback_if_some<E: FnMut(&Task) + Send + 'static>(
74        self,
75        leave_callback: Option<E>,
76    ) -> Self {
77        if let Some(leave_callback) = leave_callback {
78            self.leave_callback(leave_callback)
79        } else {
80            self
81        }
82    }
83
84    #[doc(alias = "gst_task_set_lock")]
85    pub fn lock(self, lock: &TaskLock) -> Self {
86        Self {
87            lock: Some(lock.clone()),
88            ..self
89        }
90    }
91
92    #[doc(alias = "gst_task_set_lock")]
93    pub fn lock_if(self, lock: &TaskLock, predicate: bool) -> Self {
94        if predicate {
95            self.lock(lock)
96        } else {
97            self
98        }
99    }
100
101    #[doc(alias = "gst_task_set_lock")]
102    pub fn lock_if_some(self, lock: Option<&TaskLock>) -> Self {
103        if let Some(lock) = lock {
104            self.lock(lock)
105        } else {
106            self
107        }
108    }
109
110    #[doc(alias = "gst_task_new")]
111    pub fn build(self) -> Task {
112        unsafe extern "C" fn func_trampoline<F: FnMut(&Task) + Send + 'static>(
113            user_data: glib::ffi::gpointer,
114        ) {
115            let callback: &mut (F, *mut ffi::GstTask) = &mut *(user_data as *mut _);
116            (callback.0)(&from_glib_borrow(callback.1));
117        }
118
119        unsafe extern "C" fn destroy_func<F: FnMut(&Task) + Send + 'static>(
120            data: glib::ffi::gpointer,
121        ) {
122            let _callback: Box<(F, *mut ffi::GstTask)> = Box::from_raw(data as *mut _);
123        }
124
125        unsafe extern "C" fn callback_trampoline(
126            task: *mut ffi::GstTask,
127            _thread: *mut glib::ffi::GThread,
128            data: glib::ffi::gpointer,
129        ) {
130            let callback: &mut Box<dyn FnMut(&Task) + Send + 'static> = &mut *(data as *mut _);
131            callback(&from_glib_borrow(task));
132        }
133
134        #[allow(clippy::type_complexity)]
135        unsafe extern "C" fn destroy_callback(data: glib::ffi::gpointer) {
136            let _callback: Box<Box<dyn FnMut(&Task) + Send + 'static>> =
137                Box::from_raw(data as *mut _);
138        }
139
140        unsafe {
141            let func_ptr = Box::into_raw(self.func);
142
143            let task: Task = from_glib_full(ffi::gst_task_new(
144                Some(func_trampoline::<F> as _),
145                func_ptr as *mut _,
146                Some(destroy_func::<F> as _),
147            ));
148
149            (*func_ptr).1 = task.to_glib_none().0;
150
151            let lock = self.lock.unwrap_or_default();
152            ffi::gst_task_set_lock(task.to_glib_none().0, mut_override(&lock.0 .0));
153            task.set_data("gstreamer-rs-task-lock", Arc::clone(&lock.0));
154
155            if let Some(enter_callback) = self.enter_callback {
156                ffi::gst_task_set_enter_callback(
157                    task.to_glib_none().0,
158                    Some(callback_trampoline),
159                    Box::into_raw(Box::new(enter_callback)) as *mut _,
160                    Some(destroy_callback),
161                );
162            }
163
164            if let Some(leave_callback) = self.leave_callback {
165                ffi::gst_task_set_leave_callback(
166                    task.to_glib_none().0,
167                    Some(callback_trampoline),
168                    Box::into_raw(Box::new(leave_callback)) as *mut _,
169                    Some(destroy_callback),
170                );
171            }
172
173            task
174        }
175    }
176}
177
178impl Task {
179    #[doc(alias = "gst_task_new")]
180    pub fn builder<F: FnMut(&Task) + Send + 'static>(func: F) -> TaskBuilder<F> {
181        assert_initialized_main_thread!();
182        TaskBuilder {
183            func: Box::new((func, ptr::null_mut())),
184            lock: None,
185            enter_callback: None,
186            leave_callback: None,
187        }
188    }
189}
190
191#[derive(Debug, Clone)]
192pub struct TaskLock(Arc<RecMutex>);
193
194impl Default for TaskLock {
195    #[inline]
196    fn default() -> Self {
197        Self::new()
198    }
199}
200
201#[derive(Debug)]
202struct RecMutex(glib::ffi::GRecMutex);
203
204unsafe impl Send for RecMutex {}
205unsafe impl Sync for RecMutex {}
206
207#[must_use = "if unused the TaskLock will immediately unlock"]
208pub struct TaskLockGuard<'a>(&'a RecMutex);
209
210impl TaskLock {
211    #[inline]
212    pub fn new() -> Self {
213        unsafe {
214            let lock = TaskLock(Arc::new(RecMutex(mem::zeroed())));
215            glib::ffi::g_rec_mutex_init(mut_override(&lock.0 .0));
216            lock
217        }
218    }
219
220    // checker-ignore-item
221    #[inline]
222    pub fn lock(&self) -> TaskLockGuard {
223        unsafe {
224            let guard = TaskLockGuard(&self.0);
225            glib::ffi::g_rec_mutex_lock(mut_override(&self.0 .0));
226            guard
227        }
228    }
229}
230
231impl Drop for RecMutex {
232    #[inline]
233    fn drop(&mut self) {
234        unsafe {
235            glib::ffi::g_rec_mutex_clear(&mut self.0);
236        }
237    }
238}
239
240impl Drop for TaskLockGuard<'_> {
241    #[inline]
242    fn drop(&mut self) {
243        unsafe {
244            glib::ffi::g_rec_mutex_unlock(mut_override(&self.0 .0));
245        }
246    }
247}
248
#[cfg(test)]
mod tests {
    use std::sync::mpsc::channel;

    use super::*;
    use crate::prelude::*;

    /// Builds a task with a lock plus enter/leave callbacks and checks, via a
    /// channel, the order in which the callbacks and the task function run.
    #[test]
    fn test_simple() {
        #[derive(Debug, PartialEq, Eq)]
        enum Called {
            Enter,
            Func,
            Leave,
        }

        crate::init().unwrap();

        let (tx, rx) = channel();
        let lock = TaskLock::new();

        // One sender per closure; each closure takes ownership of its own.
        let func_tx = tx.clone();
        let enter_tx = tx.clone();
        let leave_tx = tx;

        // The task function pauses the task on its third invocation, before
        // reporting that invocation on the channel.
        let mut invocations = 0;
        let task = Task::builder(move |task| {
            invocations += 1;
            if invocations >= 3 {
                task.pause().unwrap();
            }
            func_tx.send(Called::Func).unwrap();
        })
        .enter_callback(move |_task| {
            enter_tx.send(Called::Enter).unwrap();
        })
        .leave_callback(move |_task| {
            leave_tx.send(Called::Leave).unwrap();
        })
        .lock(&lock)
        .build();

        task.start().unwrap();

        assert_eq!(rx.recv(), Ok(Called::Enter));
        for _ in 0..3 {
            assert_eq!(rx.recv(), Ok(Called::Func));
        }

        assert_eq!(task.state(), crate::TaskState::Paused);
        task.stop().unwrap();
        assert_eq!(rx.recv(), Ok(Called::Leave));
        task.join().unwrap();
    }
}