gstreamer/
task.rs
1use std::{mem, ptr, sync::Arc};
4
5use glib::{prelude::*, translate::*};
6
7use crate::{ffi, Task};
8
9#[allow(clippy::type_complexity)]
10pub struct TaskBuilder<F: FnMut(&Task) + Send + 'static> {
11 func: Box<(F, *mut ffi::GstTask)>,
12 lock: Option<TaskLock>,
13 enter_callback: Option<Box<dyn FnMut(&Task) + Send + 'static>>,
14 leave_callback: Option<Box<dyn FnMut(&Task) + Send + 'static>>,
15}
16
17impl<F: FnMut(&Task) + Send + 'static> TaskBuilder<F> {
18 #[doc(alias = "gst_task_set_enter_callback")]
19 pub fn enter_callback<E: FnMut(&Task) + Send + 'static>(self, enter_callback: E) -> Self {
20 Self {
21 enter_callback: Some(Box::new(enter_callback)),
22 ..self
23 }
24 }
25
26 #[doc(alias = "gst_task_set_enter_callback")]
27 pub fn enter_callback_if<E: FnMut(&Task) + Send + 'static>(
28 self,
29 enter_callback: E,
30 predicate: bool,
31 ) -> Self {
32 if predicate {
33 self.enter_callback(enter_callback)
34 } else {
35 self
36 }
37 }
38
39 #[doc(alias = "gst_task_set_enter_callback")]
40 pub fn enter_callback_if_some<E: FnMut(&Task) + Send + 'static>(
41 self,
42 enter_callback: Option<E>,
43 ) -> Self {
44 if let Some(enter_callback) = enter_callback {
45 self.enter_callback(enter_callback)
46 } else {
47 self
48 }
49 }
50
51 #[doc(alias = "gst_task_set_leave_callback")]
52 pub fn leave_callback<E: FnMut(&Task) + Send + 'static>(self, leave_callback: E) -> Self {
53 Self {
54 leave_callback: Some(Box::new(leave_callback)),
55 ..self
56 }
57 }
58
59 #[doc(alias = "gst_task_set_leave_callback")]
60 pub fn leave_callback_if<E: FnMut(&Task) + Send + 'static>(
61 self,
62 leave_callback: E,
63 predicate: bool,
64 ) -> Self {
65 if predicate {
66 self.leave_callback(leave_callback)
67 } else {
68 self
69 }
70 }
71
72 #[doc(alias = "gst_task_set_leave_callback")]
73 pub fn leave_callback_if_some<E: FnMut(&Task) + Send + 'static>(
74 self,
75 leave_callback: Option<E>,
76 ) -> Self {
77 if let Some(leave_callback) = leave_callback {
78 self.leave_callback(leave_callback)
79 } else {
80 self
81 }
82 }
83
84 #[doc(alias = "gst_task_set_lock")]
85 pub fn lock(self, lock: &TaskLock) -> Self {
86 Self {
87 lock: Some(lock.clone()),
88 ..self
89 }
90 }
91
92 #[doc(alias = "gst_task_set_lock")]
93 pub fn lock_if(self, lock: &TaskLock, predicate: bool) -> Self {
94 if predicate {
95 self.lock(lock)
96 } else {
97 self
98 }
99 }
100
101 #[doc(alias = "gst_task_set_lock")]
102 pub fn lock_if_some(self, lock: Option<&TaskLock>) -> Self {
103 if let Some(lock) = lock {
104 self.lock(lock)
105 } else {
106 self
107 }
108 }
109
110 #[doc(alias = "gst_task_new")]
111 pub fn build(self) -> Task {
112 unsafe extern "C" fn func_trampoline<F: FnMut(&Task) + Send + 'static>(
113 user_data: glib::ffi::gpointer,
114 ) {
115 let callback: &mut (F, *mut ffi::GstTask) = &mut *(user_data as *mut _);
116 (callback.0)(&from_glib_borrow(callback.1));
117 }
118
119 unsafe extern "C" fn destroy_func<F: FnMut(&Task) + Send + 'static>(
120 data: glib::ffi::gpointer,
121 ) {
122 let _callback: Box<(F, *mut ffi::GstTask)> = Box::from_raw(data as *mut _);
123 }
124
125 unsafe extern "C" fn callback_trampoline(
126 task: *mut ffi::GstTask,
127 _thread: *mut glib::ffi::GThread,
128 data: glib::ffi::gpointer,
129 ) {
130 let callback: &mut Box<dyn FnMut(&Task) + Send + 'static> = &mut *(data as *mut _);
131 callback(&from_glib_borrow(task));
132 }
133
134 #[allow(clippy::type_complexity)]
135 unsafe extern "C" fn destroy_callback(data: glib::ffi::gpointer) {
136 let _callback: Box<Box<dyn FnMut(&Task) + Send + 'static>> =
137 Box::from_raw(data as *mut _);
138 }
139
140 unsafe {
141 let func_ptr = Box::into_raw(self.func);
142
143 let task: Task = from_glib_full(ffi::gst_task_new(
144 Some(func_trampoline::<F> as _),
145 func_ptr as *mut _,
146 Some(destroy_func::<F> as _),
147 ));
148
149 (*func_ptr).1 = task.to_glib_none().0;
150
151 let lock = self.lock.unwrap_or_default();
152 ffi::gst_task_set_lock(task.to_glib_none().0, mut_override(&lock.0 .0));
153 task.set_data("gstreamer-rs-task-lock", Arc::clone(&lock.0));
154
155 if let Some(enter_callback) = self.enter_callback {
156 ffi::gst_task_set_enter_callback(
157 task.to_glib_none().0,
158 Some(callback_trampoline),
159 Box::into_raw(Box::new(enter_callback)) as *mut _,
160 Some(destroy_callback),
161 );
162 }
163
164 if let Some(leave_callback) = self.leave_callback {
165 ffi::gst_task_set_leave_callback(
166 task.to_glib_none().0,
167 Some(callback_trampoline),
168 Box::into_raw(Box::new(leave_callback)) as *mut _,
169 Some(destroy_callback),
170 );
171 }
172
173 task
174 }
175 }
176}
177
178impl Task {
179 #[doc(alias = "gst_task_new")]
180 pub fn builder<F: FnMut(&Task) + Send + 'static>(func: F) -> TaskBuilder<F> {
181 assert_initialized_main_thread!();
182 TaskBuilder {
183 func: Box::new((func, ptr::null_mut())),
184 lock: None,
185 enter_callback: None,
186 leave_callback: None,
187 }
188 }
189}
190
/// Handle to a reference-counted `GRecMutex` that can be set as a task's lock.
///
/// Cloning only clones the inner `Arc`, so all clones refer to the same mutex.
#[derive(Debug, Clone)]
pub struct TaskLock(Arc<RecMutex>);
193
194impl Default for TaskLock {
195 #[inline]
196 fn default() -> Self {
197 Self::new()
198 }
199}
200
/// Private owner of the raw `GRecMutex` storage.
///
/// The mutex is initialized in `TaskLock::new` and cleared again in this
/// type's `Drop` implementation.
#[derive(Debug)]
struct RecMutex(glib::ffi::GRecMutex);

// NOTE(review): these impls assert that the wrapped `GRecMutex` may be sent to
// and shared between threads. Presumably this relies on GLib's recursive mutex
// being designed for cross-thread use — confirm against the GLib documentation.
unsafe impl Send for RecMutex {}
unsafe impl Sync for RecMutex {}
206
/// Guard returned by `TaskLock::lock`.
///
/// Dropping the guard calls `g_rec_mutex_unlock` on the borrowed mutex.
#[must_use = "if unused the TaskLock will immediately unlock"]
pub struct TaskLockGuard<'a>(&'a RecMutex);
209
210impl TaskLock {
211 #[inline]
212 pub fn new() -> Self {
213 unsafe {
214 let lock = TaskLock(Arc::new(RecMutex(mem::zeroed())));
215 glib::ffi::g_rec_mutex_init(mut_override(&lock.0 .0));
216 lock
217 }
218 }
219
220 #[inline]
222 pub fn lock(&self) -> TaskLockGuard {
223 unsafe {
224 let guard = TaskLockGuard(&self.0);
225 glib::ffi::g_rec_mutex_lock(mut_override(&self.0 .0));
226 guard
227 }
228 }
229}
230
231impl Drop for RecMutex {
232 #[inline]
233 fn drop(&mut self) {
234 unsafe {
235 glib::ffi::g_rec_mutex_clear(&mut self.0);
236 }
237 }
238}
239
240impl Drop for TaskLockGuard<'_> {
241 #[inline]
242 fn drop(&mut self) {
243 unsafe {
244 glib::ffi::g_rec_mutex_unlock(mut_override(&self.0 .0));
245 }
246 }
247}
248
#[cfg(test)]
mod tests {
    use std::sync::mpsc::channel;

    use super::*;
    use crate::prelude::*;

    /// Runs a task with all hooks installed and checks the order in which
    /// the enter callback, the task function and the leave callback report.
    #[test]
    fn test_simple() {
        crate::init().unwrap();

        #[derive(Debug, PartialEq, Eq)]
        enum Called {
            Enter,
            Func,
            Leave,
        }

        let (tx, rx) = channel();
        let lock = TaskLock::new();

        // One sender per hook; the original sender ends up in the leave hook.
        let func_tx = tx.clone();
        let enter_tx = tx.clone();
        let leave_tx = tx;

        let mut iterations = 0;
        let task = Task::builder(move |task| {
            iterations += 1;
            // Pause from inside the task function on the third iteration.
            if iterations >= 3 {
                task.pause().unwrap();
            }
            func_tx.send(Called::Func).unwrap();
        })
        .enter_callback(move |_task| enter_tx.send(Called::Enter).unwrap())
        .leave_callback(move |_task| leave_tx.send(Called::Leave).unwrap())
        .lock(&lock)
        .build();

        task.start().unwrap();

        // Enter is reported once, followed by exactly three function calls.
        for expected in [Called::Enter, Called::Func, Called::Func, Called::Func] {
            assert_eq!(rx.recv(), Ok(expected));
        }

        assert_eq!(task.state(), crate::TaskState::Paused);
        task.stop().unwrap();
        assert_eq!(rx.recv(), Ok(Called::Leave));
        task.join().unwrap();
    }
}