Skip to main content

gstreamer/
task.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{mem, ptr, sync::Arc};
4
5use glib::{prelude::*, translate::*};
6
7use crate::{Task, ffi};
8
9#[allow(clippy::type_complexity)]
10pub struct TaskBuilder<F: FnMut(&Task) + Send + 'static> {
11    func: Box<(F, *mut ffi::GstTask)>,
12    lock: Option<TaskLock>,
13    enter_callback: Option<Box<dyn FnMut(&Task) + Send + 'static>>,
14    leave_callback: Option<Box<dyn FnMut(&Task) + Send + 'static>>,
15}
16
17impl<F: FnMut(&Task) + Send + 'static> TaskBuilder<F> {
18    #[doc(alias = "gst_task_set_enter_callback")]
19    pub fn enter_callback<E: FnMut(&Task) + Send + 'static>(self, enter_callback: E) -> Self {
20        Self {
21            enter_callback: Some(Box::new(enter_callback)),
22            ..self
23        }
24    }
25
26    #[doc(alias = "gst_task_set_enter_callback")]
27    pub fn enter_callback_if<E: FnMut(&Task) + Send + 'static>(
28        self,
29        enter_callback: E,
30        predicate: bool,
31    ) -> Self {
32        if predicate {
33            self.enter_callback(enter_callback)
34        } else {
35            self
36        }
37    }
38
39    #[doc(alias = "gst_task_set_enter_callback")]
40    pub fn enter_callback_if_some<E: FnMut(&Task) + Send + 'static>(
41        self,
42        enter_callback: Option<E>,
43    ) -> Self {
44        if let Some(enter_callback) = enter_callback {
45            self.enter_callback(enter_callback)
46        } else {
47            self
48        }
49    }
50
51    #[doc(alias = "gst_task_set_leave_callback")]
52    pub fn leave_callback<E: FnMut(&Task) + Send + 'static>(self, leave_callback: E) -> Self {
53        Self {
54            leave_callback: Some(Box::new(leave_callback)),
55            ..self
56        }
57    }
58
59    #[doc(alias = "gst_task_set_leave_callback")]
60    pub fn leave_callback_if<E: FnMut(&Task) + Send + 'static>(
61        self,
62        leave_callback: E,
63        predicate: bool,
64    ) -> Self {
65        if predicate {
66            self.leave_callback(leave_callback)
67        } else {
68            self
69        }
70    }
71
72    #[doc(alias = "gst_task_set_leave_callback")]
73    pub fn leave_callback_if_some<E: FnMut(&Task) + Send + 'static>(
74        self,
75        leave_callback: Option<E>,
76    ) -> Self {
77        if let Some(leave_callback) = leave_callback {
78            self.leave_callback(leave_callback)
79        } else {
80            self
81        }
82    }
83
84    #[doc(alias = "gst_task_set_lock")]
85    pub fn lock(self, lock: &TaskLock) -> Self {
86        Self {
87            lock: Some(lock.clone()),
88            ..self
89        }
90    }
91
92    #[doc(alias = "gst_task_set_lock")]
93    pub fn lock_if(self, lock: &TaskLock, predicate: bool) -> Self {
94        if predicate { self.lock(lock) } else { self }
95    }
96
97    #[doc(alias = "gst_task_set_lock")]
98    pub fn lock_if_some(self, lock: Option<&TaskLock>) -> Self {
99        if let Some(lock) = lock {
100            self.lock(lock)
101        } else {
102            self
103        }
104    }
105
106    #[doc(alias = "gst_task_new")]
107    pub fn build(self) -> Task {
108        unsafe extern "C" fn func_trampoline<F: FnMut(&Task) + Send + 'static>(
109            user_data: glib::ffi::gpointer,
110        ) {
111            unsafe {
112                let callback: &mut (F, *mut ffi::GstTask) = &mut *(user_data as *mut _);
113                (callback.0)(&from_glib_borrow(callback.1));
114            }
115        }
116
117        unsafe extern "C" fn destroy_func<F: FnMut(&Task) + Send + 'static>(
118            data: glib::ffi::gpointer,
119        ) {
120            unsafe {
121                let _callback: Box<(F, *mut ffi::GstTask)> = Box::from_raw(data as *mut _);
122            }
123        }
124
125        unsafe extern "C" fn callback_trampoline(
126            task: *mut ffi::GstTask,
127            _thread: *mut glib::ffi::GThread,
128            data: glib::ffi::gpointer,
129        ) {
130            unsafe {
131                let callback: &mut Box<dyn FnMut(&Task) + Send + 'static> = &mut *(data as *mut _);
132                callback(&from_glib_borrow(task));
133            }
134        }
135
136        #[allow(clippy::type_complexity)]
137        unsafe extern "C" fn destroy_callback(data: glib::ffi::gpointer) {
138            unsafe {
139                let _callback: Box<Box<dyn FnMut(&Task) + Send + 'static>> =
140                    Box::from_raw(data as *mut _);
141            }
142        }
143
144        unsafe {
145            let func_ptr = Box::into_raw(self.func);
146
147            let task: Task = from_glib_full(ffi::gst_task_new(
148                Some(func_trampoline::<F> as _),
149                func_ptr as *mut _,
150                Some(destroy_func::<F> as _),
151            ));
152
153            (*func_ptr).1 = task.to_glib_none().0;
154
155            let lock = self.lock.unwrap_or_default();
156            ffi::gst_task_set_lock(task.to_glib_none().0, mut_override(&lock.0.0));
157            task.set_data("gstreamer-rs-task-lock", Arc::clone(&lock.0));
158
159            if let Some(enter_callback) = self.enter_callback {
160                ffi::gst_task_set_enter_callback(
161                    task.to_glib_none().0,
162                    Some(callback_trampoline),
163                    Box::into_raw(Box::new(enter_callback)) as *mut _,
164                    Some(destroy_callback),
165                );
166            }
167
168            if let Some(leave_callback) = self.leave_callback {
169                ffi::gst_task_set_leave_callback(
170                    task.to_glib_none().0,
171                    Some(callback_trampoline),
172                    Box::into_raw(Box::new(leave_callback)) as *mut _,
173                    Some(destroy_callback),
174                );
175            }
176
177            task
178        }
179    }
180}
181
182impl Task {
183    #[doc(alias = "gst_task_new")]
184    pub fn builder<F: FnMut(&Task) + Send + 'static>(func: F) -> TaskBuilder<F> {
185        assert_initialized_main_thread!();
186        TaskBuilder {
187            func: Box::new((func, ptr::null_mut())),
188            lock: None,
189            enter_callback: None,
190            leave_callback: None,
191        }
192    }
193}
194
195#[derive(Debug, Clone)]
196pub struct TaskLock(Arc<RecMutex>);
197
198impl Default for TaskLock {
199    #[inline]
200    fn default() -> Self {
201        Self::new()
202    }
203}
204
205#[derive(Debug)]
206struct RecMutex(glib::ffi::GRecMutex);
207
208unsafe impl Send for RecMutex {}
209unsafe impl Sync for RecMutex {}
210
211#[must_use = "if unused the TaskLock will immediately unlock"]
212pub struct TaskLockGuard<'a>(&'a RecMutex);
213
214impl TaskLock {
215    #[inline]
216    pub fn new() -> Self {
217        unsafe {
218            let lock = TaskLock(Arc::new(RecMutex(mem::zeroed())));
219            glib::ffi::g_rec_mutex_init(mut_override(&lock.0.0));
220            lock
221        }
222    }
223
224    // checker-ignore-item
225    #[inline]
226    pub fn lock(&self) -> TaskLockGuard<'_> {
227        unsafe {
228            let guard = TaskLockGuard(&self.0);
229            glib::ffi::g_rec_mutex_lock(mut_override(&self.0.0));
230            guard
231        }
232    }
233}
234
235impl Drop for RecMutex {
236    #[inline]
237    fn drop(&mut self) {
238        unsafe {
239            glib::ffi::g_rec_mutex_clear(&mut self.0);
240        }
241    }
242}
243
244impl Drop for TaskLockGuard<'_> {
245    #[inline]
246    fn drop(&mut self) {
247        unsafe {
248            glib::ffi::g_rec_mutex_unlock(mut_override(&self.0.0));
249        }
250    }
251}
252
253#[cfg(test)]
254mod tests {
255    use std::sync::mpsc::channel;
256
257    use super::*;
258    use crate::prelude::*;
259
260    #[test]
261    fn test_simple() {
262        crate::init().unwrap();
263
264        #[derive(Debug, PartialEq, Eq)]
265        enum Called {
266            Enter,
267            Func,
268            Leave,
269        }
270
271        let (send, recv) = channel();
272        let lock = TaskLock::new();
273
274        let task = Task::builder({
275            let send = send.clone();
276            let mut count = 0;
277            move |task| {
278                count += 1;
279                if count >= 3 {
280                    task.pause().unwrap();
281                }
282                send.send(Called::Func).unwrap();
283            }
284        })
285        .enter_callback({
286            let send = send.clone();
287            move |_task| {
288                send.send(Called::Enter).unwrap();
289            }
290        })
291        .leave_callback({
292            move |_task| {
293                send.send(Called::Leave).unwrap();
294            }
295        })
296        .lock(&lock)
297        .build();
298
299        task.start().unwrap();
300
301        assert_eq!(recv.recv(), Ok(Called::Enter));
302        assert_eq!(recv.recv(), Ok(Called::Func));
303        assert_eq!(recv.recv(), Ok(Called::Func));
304        assert_eq!(recv.recv(), Ok(Called::Func));
305
306        assert_eq!(task.state(), crate::TaskState::Paused);
307        task.stop().unwrap();
308        assert_eq!(recv.recv(), Ok(Called::Leave));
309        task.join().unwrap();
310    }
311}