1use std::{
4 future,
5 mem::transmute,
6 pin::Pin,
7 sync::{Arc, Mutex},
8 task::{Context, Poll},
9};
10
11use futures_channel::mpsc::{self, UnboundedReceiver};
12use futures_core::Stream;
13use futures_util::{StreamExt, stream::FusedStream};
14use glib::{
15 ControlFlow,
16 ffi::{gboolean, gpointer},
17 prelude::*,
18 source::Priority,
19 translate::*,
20};
21
22use crate::{Bus, BusSyncReply, Message, MessageType, ffi};
23
24unsafe extern "C" fn trampoline_watch<F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static>(
25 bus: *mut ffi::GstBus,
26 msg: *mut ffi::GstMessage,
27 func: gpointer,
28) -> gboolean {
29 unsafe {
30 let func: &mut F = &mut *(func as *mut F);
31 func(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).into_glib()
32 }
33}
34
35unsafe extern "C" fn destroy_closure_watch<
36 F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static,
37>(
38 ptr: gpointer,
39) {
40 unsafe {
41 let _ = Box::<F>::from_raw(ptr as *mut _);
42 }
43}
44
45fn into_raw_watch<F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static>(func: F) -> gpointer {
46 #[allow(clippy::type_complexity)]
47 let func: Box<F> = Box::new(func);
48 Box::into_raw(func) as gpointer
49}
50
51unsafe extern "C" fn trampoline_watch_local<F: FnMut(&Bus, &Message) -> ControlFlow + 'static>(
52 bus: *mut ffi::GstBus,
53 msg: *mut ffi::GstMessage,
54 func: gpointer,
55) -> gboolean {
56 unsafe {
57 let func: &mut glib::thread_guard::ThreadGuard<F> =
58 &mut *(func as *mut glib::thread_guard::ThreadGuard<F>);
59 (func.get_mut())(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).into_glib()
60 }
61}
62
63unsafe extern "C" fn destroy_closure_watch_local<
64 F: FnMut(&Bus, &Message) -> ControlFlow + 'static,
65>(
66 ptr: gpointer,
67) {
68 unsafe {
69 let _ = Box::<glib::thread_guard::ThreadGuard<F>>::from_raw(ptr as *mut _);
70 }
71}
72
73fn into_raw_watch_local<F: FnMut(&Bus, &Message) -> ControlFlow + 'static>(func: F) -> gpointer {
74 #[allow(clippy::type_complexity)]
75 let func: Box<glib::thread_guard::ThreadGuard<F>> =
76 Box::new(glib::thread_guard::ThreadGuard::new(func));
77 Box::into_raw(func) as gpointer
78}
79
80unsafe extern "C" fn trampoline_sync<
81 F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
82>(
83 bus: *mut ffi::GstBus,
84 msg: *mut ffi::GstMessage,
85 func: gpointer,
86) -> ffi::GstBusSyncReply {
87 unsafe {
88 let f: &F = &*(func as *const F);
89 let res = f(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).into_glib();
90
91 if res == ffi::GST_BUS_DROP {
92 ffi::gst_mini_object_unref(msg as *mut _);
93 }
94
95 res
96 }
97}
98
99unsafe extern "C" fn destroy_closure_sync<
100 F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
101>(
102 ptr: gpointer,
103) {
104 unsafe {
105 let _ = Box::<F>::from_raw(ptr as *mut _);
106 }
107}
108
109fn into_raw_sync<F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static>(
110 func: F,
111) -> gpointer {
112 let func: Box<F> = Box::new(func);
113 Box::into_raw(func) as gpointer
114}
115
116impl Bus {
117 #[doc(alias = "gst_bus_add_signal_watch")]
135 #[doc(alias = "gst_bus_add_signal_watch_full")]
136 pub fn add_signal_watch_full(&self, priority: Priority) {
137 unsafe {
138 ffi::gst_bus_add_signal_watch_full(self.to_glib_none().0, priority.into_glib());
139 }
140 }
141
142 #[doc(alias = "gst_bus_create_watch")]
153 pub fn create_watch<F>(&self, name: Option<&str>, priority: Priority, func: F) -> glib::Source
154 where
155 F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static,
156 {
157 skip_assert_initialized!();
158 unsafe {
159 let source = ffi::gst_bus_create_watch(self.to_glib_none().0);
160 glib::ffi::g_source_set_callback(
161 source,
162 Some(transmute::<
163 *mut (),
164 unsafe extern "C" fn(glib::ffi::gpointer) -> i32,
165 >(trampoline_watch::<F> as *mut ())),
166 into_raw_watch(func),
167 Some(destroy_closure_watch::<F>),
168 );
169 glib::ffi::g_source_set_priority(source, priority.into_glib());
170
171 if let Some(name) = name {
172 glib::ffi::g_source_set_name(source, name.to_glib_none().0);
173 }
174
175 from_glib_full(source)
176 }
177 }
178
179 #[doc(alias = "gst_bus_add_watch")]
204 #[doc(alias = "gst_bus_add_watch_full")]
205 pub fn add_watch<F>(&self, func: F) -> Result<BusWatchGuard, glib::BoolError>
206 where
207 F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static,
208 {
209 unsafe {
210 let res = ffi::gst_bus_add_watch_full(
211 self.to_glib_none().0,
212 glib::ffi::G_PRIORITY_DEFAULT,
213 Some(trampoline_watch::<F>),
214 into_raw_watch(func),
215 Some(destroy_closure_watch::<F>),
216 );
217
218 if res == 0 {
219 Err(glib::bool_error!("Bus already has a watch"))
220 } else {
221 Ok(BusWatchGuard { bus: self.clone() })
222 }
223 }
224 }
225
226 #[doc(alias = "gst_bus_add_watch")]
227 #[doc(alias = "gst_bus_add_watch_full")]
228 pub fn add_watch_local<F>(&self, func: F) -> Result<BusWatchGuard, glib::BoolError>
229 where
230 F: FnMut(&Bus, &Message) -> ControlFlow + 'static,
231 {
232 unsafe {
233 let ctx = glib::MainContext::ref_thread_default();
234 let _acquire = ctx
235 .acquire()
236 .expect("thread default main context already acquired by another thread");
237
238 let res = ffi::gst_bus_add_watch_full(
239 self.to_glib_none().0,
240 glib::ffi::G_PRIORITY_DEFAULT,
241 Some(trampoline_watch_local::<F>),
242 into_raw_watch_local(func),
243 Some(destroy_closure_watch_local::<F>),
244 );
245
246 if res == 0 {
247 Err(glib::bool_error!("Bus already has a watch"))
248 } else {
249 Ok(BusWatchGuard { bus: self.clone() })
250 }
251 }
252 }
253
254 #[doc(alias = "gst_bus_set_sync_handler")]
268 pub fn set_sync_handler<F>(&self, func: F)
269 where
270 F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
271 {
272 unsafe {
273 let bus = self.to_glib_none().0;
274
275 #[allow(clippy::manual_dangling_ptr)]
276 #[cfg(not(feature = "v1_18"))]
277 {
278 static SET_ONCE_QUARK: std::sync::OnceLock<glib::Quark> =
279 std::sync::OnceLock::new();
280
281 let set_once_quark = SET_ONCE_QUARK
282 .get_or_init(|| glib::Quark::from_str("gstreamer-rs-sync-handler"));
283
284 if crate::version() < (1, 16, 3, 0) {
287 if !glib::gobject_ffi::g_object_get_qdata(
288 bus as *mut _,
289 set_once_quark.into_glib(),
290 )
291 .is_null()
292 {
293 panic!("Bus sync handler can only be set once");
294 }
295
296 glib::gobject_ffi::g_object_set_qdata(
297 bus as *mut _,
298 set_once_quark.into_glib(),
299 1 as *mut _,
300 );
301 }
302 }
303
304 ffi::gst_bus_set_sync_handler(
305 bus,
306 Some(trampoline_sync::<F>),
307 into_raw_sync(func),
308 Some(destroy_closure_sync::<F>),
309 )
310 }
311 }
312
313 pub fn unset_sync_handler(&self) {
314 #[cfg(not(feature = "v1_18"))]
315 {
316 if crate::version() < (1, 16, 3, 0) {
319 return;
320 }
321 }
322
323 unsafe {
324 use std::ptr;
325
326 ffi::gst_bus_set_sync_handler(self.to_glib_none().0, None, ptr::null_mut(), None)
327 }
328 }
329
330 #[doc(alias = "gst_bus_pop")]
331 pub fn iter(&self) -> Iter<'_> {
332 self.iter_timed(Some(crate::ClockTime::ZERO))
333 }
334
335 #[doc(alias = "gst_bus_timed_pop")]
336 pub fn iter_timed(&self, timeout: impl Into<Option<crate::ClockTime>>) -> Iter<'_> {
337 Iter {
338 bus: self,
339 timeout: timeout.into(),
340 }
341 }
342
343 #[doc(alias = "gst_bus_pop_filtered")]
344 pub fn iter_filtered<'a>(
345 &'a self,
346 msg_types: &'a [MessageType],
347 ) -> impl Iterator<Item = Message> + 'a {
348 self.iter_timed_filtered(Some(crate::ClockTime::ZERO), msg_types)
349 }
350
351 #[doc(alias = "gst_bus_timed_pop_filtered")]
352 pub fn iter_timed_filtered<'a>(
353 &'a self,
354 timeout: impl Into<Option<crate::ClockTime>>,
355 msg_types: &'a [MessageType],
356 ) -> impl Iterator<Item = Message> + 'a {
357 self.iter_timed(timeout)
358 .filter(move |msg| msg_types.contains(&msg.type_()))
359 }
360
361 #[doc(alias = "gst_bus_timed_pop_filtered")]
379 pub fn timed_pop_filtered(
380 &self,
381 timeout: impl Into<Option<crate::ClockTime>>,
382 msg_types: &[MessageType],
383 ) -> Option<Message> {
384 let Some(timeout) = timeout.into() else {
386 loop {
387 let msg = self.timed_pop(None)?;
388 if msg_types.contains(&msg.type_()) {
389 return Some(msg);
390 }
391 }
392 };
393
394 let total = timeout;
396 let start = std::time::Instant::now();
397
398 loop {
399 let elapsed = crate::ClockTime::from_nseconds(start.elapsed().as_nanos() as u64);
400
401 let remaining = total.checked_sub(elapsed)?;
403
404 let msg = self.timed_pop(Some(remaining))?;
405
406 if msg_types.contains(&msg.type_()) {
407 return Some(msg);
408 }
409
410 }
412 }
413
414 #[doc(alias = "gst_bus_pop_filtered")]
428 pub fn pop_filtered(&self, msg_types: &[MessageType]) -> Option<Message> {
429 loop {
430 let msg = self.pop()?;
431 if msg_types.contains(&msg.type_()) {
432 return Some(msg);
433 }
434 }
435 }
436
437 pub fn stream(&self) -> BusStream {
438 BusStream::new(self)
439 }
440
441 pub fn stream_filtered<'a>(
442 &self,
443 message_types: &'a [MessageType],
444 ) -> impl FusedStream<Item = Message> + Unpin + Send + 'a + use<'a> {
445 self.stream().filter(move |message| {
446 let message_type = message.type_();
447
448 future::ready(message_types.contains(&message_type))
449 })
450 }
451}
452
453#[must_use = "iterators are lazy and do nothing unless consumed"]
454#[derive(Debug)]
455pub struct Iter<'a> {
456 bus: &'a Bus,
457 timeout: Option<crate::ClockTime>,
458}
459
460impl Iterator for Iter<'_> {
461 type Item = Message;
462
463 fn next(&mut self) -> Option<Message> {
464 self.bus.timed_pop(self.timeout)
465 }
466}
467
468#[derive(Debug)]
469pub struct BusStream {
470 bus: glib::WeakRef<Bus>,
471 receiver: UnboundedReceiver<Message>,
472}
473
474impl BusStream {
475 fn new(bus: &Bus) -> Self {
476 skip_assert_initialized!();
477
478 let mutex = Arc::new(Mutex::new(()));
479 let (sender, receiver) = mpsc::unbounded();
480
481 let _mutex_guard = mutex.lock().unwrap();
487 bus.set_sync_handler({
488 let sender = sender.clone();
489 let mutex = mutex.clone();
490
491 move |_bus, message| {
492 let _mutex_guard = mutex.lock().unwrap();
493
494 let _ = sender.unbounded_send(message.to_owned());
495
496 BusSyncReply::Drop
497 }
498 });
499
500 while let Some(message) = bus.pop() {
502 let _ = sender.unbounded_send(message);
503 }
504
505 Self {
506 bus: bus.downgrade(),
507 receiver,
508 }
509 }
510}
511
512impl Drop for BusStream {
513 fn drop(&mut self) {
514 if let Some(bus) = self.bus.upgrade() {
515 bus.unset_sync_handler();
516 }
517 }
518}
519
520impl Stream for BusStream {
521 type Item = Message;
522
523 fn poll_next(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>> {
524 self.receiver.poll_next_unpin(context)
525 }
526}
527
528impl FusedStream for BusStream {
529 fn is_terminated(&self) -> bool {
530 self.receiver.is_terminated()
531 }
532}
533
534#[derive(Debug)]
539#[must_use = "if unused the bus watch will immediately be removed"]
540pub struct BusWatchGuard {
541 bus: Bus,
542}
543
544impl Drop for BusWatchGuard {
545 fn drop(&mut self) {
546 let _ = self.bus.remove_watch();
547 }
548}
549
550#[cfg(test)]
551mod tests {
552 use std::{
553 sync::{Arc, Barrier, Mutex},
554 thread,
555 time::{Duration, Instant},
556 };
557
558 use super::*;
559
560 #[test]
561 fn test_sync_handler() {
562 crate::init().unwrap();
563
564 let bus = Bus::new();
565 let msgs = Arc::new(Mutex::new(Vec::new()));
566 let msgs_clone = msgs.clone();
567 bus.set_sync_handler(move |_, msg| {
568 msgs_clone.lock().unwrap().push(msg.clone());
569 BusSyncReply::Pass
570 });
571
572 bus.post(crate::message::Eos::new()).unwrap();
573
574 let msgs = msgs.lock().unwrap();
575 assert_eq!(msgs.len(), 1);
576 match msgs[0].view() {
577 crate::MessageView::Eos(_) => (),
578 _ => unreachable!(),
579 }
580 }
581
582 #[test]
583 fn test_bus_stream() {
584 crate::init().unwrap();
585
586 let bus = Bus::new();
587 let bus_stream = bus.stream();
588
589 let eos_message = crate::message::Eos::new();
590 bus.post(eos_message).unwrap();
591
592 let bus_future = StreamExt::into_future(bus_stream);
593 let (message, _) = futures_executor::block_on(bus_future);
594
595 match message.unwrap().view() {
596 crate::MessageView::Eos(_) => (),
597 _ => unreachable!(),
598 }
599 }
600
601 #[test]
602 fn test_bus_timed_pop_filtered_does_not_restart_timeout_for_filtered_messages() {
603 crate::init().unwrap();
604
605 let bus = Bus::new();
606 let bus_clone = bus.clone();
607
608 let timeout = Duration::from_millis(100);
609 let timeout_allowance = Duration::from_millis(50);
610
611 let barrier = Arc::new(Barrier::new(2));
612 let barrier_clone = barrier.clone();
613
614 let message_poster = thread::spawn(move || {
616 barrier_clone.wait();
618
619 let start = Instant::now();
620 while start.elapsed() < timeout * 3 {
621 bus_clone
622 .post(crate::message::DurationChanged::new())
623 .unwrap();
624 thread::sleep(Duration::from_millis(10));
625 }
626 });
627
628 barrier.wait();
630
631 let start = Instant::now();
632
633 let msg = bus.timed_pop_filtered(
634 crate::ClockTime::from_mseconds(timeout.as_millis() as u64),
635 &[MessageType::Eos],
636 );
637
638 let elapsed = start.elapsed();
639
640 assert!(msg.is_none());
641
642 assert!(
646 elapsed < timeout + timeout_allowance,
647 "timed_pop_filtered exceeded the requested timeout while filtering"
648 );
649
650 message_poster
651 .join()
652 .expect("message_poster thread panicked");
653 }
654}