1use std::{
4 future,
5 mem::transmute,
6 pin::Pin,
7 task::{Context, Poll},
8};
9
10use futures_channel::mpsc::{self, UnboundedReceiver};
11use futures_core::Stream;
12use futures_util::{stream::FusedStream, StreamExt};
13use glib::{
14 ffi::{gboolean, gpointer},
15 prelude::*,
16 source::Priority,
17 translate::*,
18 ControlFlow,
19};
20
21use crate::{ffi, Bus, BusSyncReply, Message, MessageType};
22
23unsafe extern "C" fn trampoline_watch<F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static>(
24 bus: *mut ffi::GstBus,
25 msg: *mut ffi::GstMessage,
26 func: gpointer,
27) -> gboolean {
28 let func: &mut F = &mut *(func as *mut F);
29 func(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).into_glib()
30}
31
32unsafe extern "C" fn destroy_closure_watch<
33 F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static,
34>(
35 ptr: gpointer,
36) {
37 let _ = Box::<F>::from_raw(ptr as *mut _);
38}
39
40fn into_raw_watch<F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static>(func: F) -> gpointer {
41 #[allow(clippy::type_complexity)]
42 let func: Box<F> = Box::new(func);
43 Box::into_raw(func) as gpointer
44}
45
46unsafe extern "C" fn trampoline_watch_local<F: FnMut(&Bus, &Message) -> ControlFlow + 'static>(
47 bus: *mut ffi::GstBus,
48 msg: *mut ffi::GstMessage,
49 func: gpointer,
50) -> gboolean {
51 let func: &mut glib::thread_guard::ThreadGuard<F> =
52 &mut *(func as *mut glib::thread_guard::ThreadGuard<F>);
53 (func.get_mut())(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).into_glib()
54}
55
56unsafe extern "C" fn destroy_closure_watch_local<
57 F: FnMut(&Bus, &Message) -> ControlFlow + 'static,
58>(
59 ptr: gpointer,
60) {
61 let _ = Box::<glib::thread_guard::ThreadGuard<F>>::from_raw(ptr as *mut _);
62}
63
64fn into_raw_watch_local<F: FnMut(&Bus, &Message) -> ControlFlow + 'static>(func: F) -> gpointer {
65 #[allow(clippy::type_complexity)]
66 let func: Box<glib::thread_guard::ThreadGuard<F>> =
67 Box::new(glib::thread_guard::ThreadGuard::new(func));
68 Box::into_raw(func) as gpointer
69}
70
71unsafe extern "C" fn trampoline_sync<
72 F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
73>(
74 bus: *mut ffi::GstBus,
75 msg: *mut ffi::GstMessage,
76 func: gpointer,
77) -> ffi::GstBusSyncReply {
78 let f: &F = &*(func as *const F);
79 let res = f(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).into_glib();
80
81 if res == ffi::GST_BUS_DROP {
82 ffi::gst_mini_object_unref(msg as *mut _);
83 }
84
85 res
86}
87
88unsafe extern "C" fn destroy_closure_sync<
89 F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
90>(
91 ptr: gpointer,
92) {
93 let _ = Box::<F>::from_raw(ptr as *mut _);
94}
95
96fn into_raw_sync<F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static>(
97 func: F,
98) -> gpointer {
99 let func: Box<F> = Box::new(func);
100 Box::into_raw(func) as gpointer
101}
102
103impl Bus {
104 #[doc(alias = "gst_bus_add_signal_watch")]
122 #[doc(alias = "gst_bus_add_signal_watch_full")]
123 pub fn add_signal_watch_full(&self, priority: Priority) {
124 unsafe {
125 ffi::gst_bus_add_signal_watch_full(self.to_glib_none().0, priority.into_glib());
126 }
127 }
128
    /// Creates a `glib::Source` for this bus that invokes `func` for bus
    /// messages.
    ///
    /// The source is configured with `priority` and, if given, `name`. It is
    /// returned without being attached to any main context; attaching it is
    /// left to the caller.
    ///
    /// `func` is boxed and handed to the source together with a destroy notify,
    /// so it is dropped when the source releases its callback.
    #[doc(alias = "gst_bus_create_watch")]
    pub fn create_watch<F>(&self, name: Option<&str>, priority: Priority, func: F) -> glib::Source
    where
        F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static,
    {
        skip_assert_initialized!();
        unsafe {
            // NOTE(review): the returned pointer is used without a null check;
            // confirm `gst_bus_create_watch` cannot return NULL for this bus.
            let source = ffi::gst_bus_create_watch(self.to_glib_none().0);
            glib::ffi::g_source_set_callback(
                source,
                // `g_source_set_callback` only accepts the generic one-argument
                // `GSourceFunc` type, so the three-argument bus trampoline is
                // cast to that signature here. Presumably the bus source's
                // dispatch function calls it back with the bus signature.
                Some(transmute::<
                    *mut (),
                    unsafe extern "C" fn(glib::ffi::gpointer) -> i32,
                >(trampoline_watch::<F> as *mut ())),
                into_raw_watch(func),
                Some(destroy_closure_watch::<F>),
            );
            glib::ffi::g_source_set_priority(source, priority.into_glib());

            if let Some(name) = name {
                glib::ffi::g_source_set_name(source, name.to_glib_none().0);
            }

            // Ownership of the freshly created source moves to the wrapper.
            from_glib_full(source)
        }
    }
165
166 #[doc(alias = "gst_bus_add_watch")]
191 #[doc(alias = "gst_bus_add_watch_full")]
192 pub fn add_watch<F>(&self, func: F) -> Result<BusWatchGuard, glib::BoolError>
193 where
194 F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static,
195 {
196 unsafe {
197 let res = ffi::gst_bus_add_watch_full(
198 self.to_glib_none().0,
199 glib::ffi::G_PRIORITY_DEFAULT,
200 Some(trampoline_watch::<F>),
201 into_raw_watch(func),
202 Some(destroy_closure_watch::<F>),
203 );
204
205 if res == 0 {
206 Err(glib::bool_error!("Bus already has a watch"))
207 } else {
208 Ok(BusWatchGuard { bus: self.clone() })
209 }
210 }
211 }
212
213 #[doc(alias = "gst_bus_add_watch")]
214 #[doc(alias = "gst_bus_add_watch_full")]
215 pub fn add_watch_local<F>(&self, func: F) -> Result<BusWatchGuard, glib::BoolError>
216 where
217 F: FnMut(&Bus, &Message) -> ControlFlow + 'static,
218 {
219 unsafe {
220 let ctx = glib::MainContext::ref_thread_default();
221 let _acquire = ctx
222 .acquire()
223 .expect("thread default main context already acquired by another thread");
224
225 let res = ffi::gst_bus_add_watch_full(
226 self.to_glib_none().0,
227 glib::ffi::G_PRIORITY_DEFAULT,
228 Some(trampoline_watch_local::<F>),
229 into_raw_watch_local(func),
230 Some(destroy_closure_watch_local::<F>),
231 );
232
233 if res == 0 {
234 Err(glib::bool_error!("Bus already has a watch"))
235 } else {
236 Ok(BusWatchGuard { bus: self.clone() })
237 }
238 }
239 }
240
    /// Installs `func` as the synchronous message handler of this bus.
    ///
    /// `func` is boxed and passed to `gst_bus_set_sync_handler` together with a
    /// destroy notify that frees it again.
    ///
    /// # Panics
    ///
    /// When built without the `v1_18` feature and running against a GStreamer
    /// older than 1.16.3, a sync handler may only be set once per bus; a second
    /// call panics.
    #[doc(alias = "gst_bus_set_sync_handler")]
    pub fn set_sync_handler<F>(&self, func: F)
    where
        F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
    {
        unsafe {
            let bus = self.to_glib_none().0;

            #[cfg(not(feature = "v1_18"))]
            {
                // Quark under which the "handler was already set" marker is
                // stored on the bus object; created once per process.
                static SET_ONCE_QUARK: std::sync::OnceLock<glib::Quark> =
                    std::sync::OnceLock::new();

                let set_once_quark = SET_ONCE_QUARK
                    .get_or_init(|| glib::Quark::from_str("gstreamer-rs-sync-handler"));

                // NOTE(review): presumably replacing an installed handler is
                // not safe before 1.16.3 -- confirm against the GStreamer
                // release notes.
                if crate::version() < (1, 16, 3, 0) {
                    if !glib::gobject_ffi::g_object_get_qdata(
                        bus as *mut _,
                        set_once_quark.into_glib(),
                    )
                    .is_null()
                    {
                        panic!("Bus sync handler can only be set once");
                    }

                    // Any non-null value works as the marker; it is only ever
                    // compared against null above.
                    glib::gobject_ffi::g_object_set_qdata(
                        bus as *mut _,
                        set_once_quark.into_glib(),
                        1 as *mut _,
                    );
                }
            }

            ffi::gst_bus_set_sync_handler(
                bus,
                Some(trampoline_sync::<F>),
                into_raw_sync(func),
                Some(destroy_closure_sync::<F>),
            )
        }
    }
298
299 pub fn unset_sync_handler(&self) {
300 #[cfg(not(feature = "v1_18"))]
301 {
302 if crate::version() < (1, 16, 3, 0) {
305 return;
306 }
307 }
308
309 unsafe {
310 use std::ptr;
311
312 ffi::gst_bus_set_sync_handler(self.to_glib_none().0, None, ptr::null_mut(), None)
313 }
314 }
315
316 #[doc(alias = "gst_bus_pop")]
317 pub fn iter(&self) -> Iter {
318 self.iter_timed(Some(crate::ClockTime::ZERO))
319 }
320
321 #[doc(alias = "gst_bus_timed_pop")]
322 pub fn iter_timed(&self, timeout: impl Into<Option<crate::ClockTime>>) -> Iter {
323 Iter {
324 bus: self,
325 timeout: timeout.into(),
326 }
327 }
328
329 #[doc(alias = "gst_bus_pop_filtered")]
330 pub fn iter_filtered<'a>(
331 &'a self,
332 msg_types: &'a [MessageType],
333 ) -> impl Iterator<Item = Message> + 'a {
334 self.iter_timed_filtered(Some(crate::ClockTime::ZERO), msg_types)
335 }
336
337 #[doc(alias = "gst_bus_timed_pop_filtered")]
338 pub fn iter_timed_filtered<'a>(
339 &'a self,
340 timeout: impl Into<Option<crate::ClockTime>>,
341 msg_types: &'a [MessageType],
342 ) -> impl Iterator<Item = Message> + 'a {
343 self.iter_timed(timeout)
344 .filter(move |msg| msg_types.contains(&msg.type_()))
345 }
346
347 #[doc(alias = "gst_bus_timed_pop_filtered")]
365 pub fn timed_pop_filtered(
366 &self,
367 timeout: impl Into<Option<crate::ClockTime>> + Clone,
368 msg_types: &[MessageType],
369 ) -> Option<Message> {
370 loop {
371 let msg = self.timed_pop(timeout.clone())?;
372 if msg_types.contains(&msg.type_()) {
373 return Some(msg);
374 }
375 }
376 }
377
378 #[doc(alias = "gst_bus_pop_filtered")]
392 pub fn pop_filtered(&self, msg_types: &[MessageType]) -> Option<Message> {
393 loop {
394 let msg = self.pop()?;
395 if msg_types.contains(&msg.type_()) {
396 return Some(msg);
397 }
398 }
399 }
400
    /// Returns a [`BusStream`] that yields the messages of this bus.
    ///
    /// Creating the stream installs a sync handler on the bus; it is unset
    /// again when the stream is dropped.
    pub fn stream(&self) -> BusStream {
        BusStream::new(self)
    }
404
405 pub fn stream_filtered<'a>(
406 &self,
407 message_types: &'a [MessageType],
408 ) -> impl FusedStream<Item = Message> + Unpin + Send + 'a {
409 self.stream().filter(move |message| {
410 let message_type = message.type_();
411
412 future::ready(message_types.contains(&message_type))
413 })
414 }
415}
416
/// Iterator over the messages of a [`Bus`].
///
/// Every call to `next` pops one message from the bus using `timeout`.
#[derive(Debug)]
pub struct Iter<'a> {
    // Bus the messages are popped from.
    bus: &'a Bus,
    // Timeout handed to every individual pop.
    timeout: Option<crate::ClockTime>,
}
422
423impl Iterator for Iter<'_> {
424 type Item = Message;
425
426 fn next(&mut self) -> Option<Message> {
427 self.bus.timed_pop(self.timeout)
428 }
429}
430
/// Asynchronous stream of the messages posted on a [`Bus`].
///
/// Messages are forwarded from a sync handler on the bus through an unbounded
/// channel. Dropping the stream unsets that sync handler if the bus is still
/// alive.
#[derive(Debug)]
pub struct BusStream {
    // Weak reference, so the stream does not keep the bus alive.
    bus: glib::WeakRef<Bus>,
    // Receiving end of the channel fed by the sync handler.
    receiver: UnboundedReceiver<Message>,
}
436
437impl BusStream {
438 fn new(bus: &Bus) -> Self {
439 skip_assert_initialized!();
440
441 let (sender, receiver) = mpsc::unbounded();
442
443 bus.set_sync_handler(move |bus, message| {
444 while let Some(message) = bus.pop() {
447 let _ = sender.unbounded_send(message);
448 }
449
450 let _ = sender.unbounded_send(message.to_owned());
451
452 BusSyncReply::Drop
453 });
454
455 Self {
456 bus: bus.downgrade(),
457 receiver,
458 }
459 }
460}
461
462impl Drop for BusStream {
463 fn drop(&mut self) {
464 if let Some(bus) = self.bus.upgrade() {
465 bus.unset_sync_handler();
466 }
467 }
468}
469
470impl Stream for BusStream {
471 type Item = Message;
472
473 fn poll_next(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>> {
474 self.receiver.poll_next_unpin(context)
475 }
476}
477
478impl FusedStream for BusStream {
479 fn is_terminated(&self) -> bool {
480 self.receiver.is_terminated()
481 }
482}
483
/// Guard for a watch installed with [`Bus::add_watch`] or
/// [`Bus::add_watch_local`].
///
/// Dropping the guard removes the watch from the bus.
#[derive(Debug)]
#[must_use = "if unused the bus watch will immediately be removed"]
pub struct BusWatchGuard {
    // Strong reference to the bus whose watch is removed on drop.
    bus: Bus,
}
493
impl Drop for BusWatchGuard {
    /// Removes the watch from the bus.
    fn drop(&mut self) {
        // Best effort: a failure to remove the watch is deliberately ignored,
        // since nothing can be reported from `drop`.
        let _ = self.bus.remove_watch();
    }
}
499
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};

    use super::*;

    // A sync handler must be invoked for a posted message, within the `post`
    // call itself: the collected messages are checked right after posting.
    #[test]
    fn test_sync_handler() {
        crate::init().unwrap();

        let bus = Bus::new();
        let msgs = Arc::new(Mutex::new(Vec::new()));
        let msgs_clone = msgs.clone();
        bus.set_sync_handler(move |_, msg| {
            msgs_clone.lock().unwrap().push(msg.clone());
            BusSyncReply::Pass
        });

        bus.post(crate::message::Eos::new()).unwrap();

        let msgs = msgs.lock().unwrap();
        assert_eq!(msgs.len(), 1);
        match msgs[0].view() {
            crate::MessageView::Eos(_) => (),
            _ => unreachable!(),
        }
    }

    // A message posted after the stream was created must be the first item
    // the stream yields.
    #[test]
    fn test_bus_stream() {
        crate::init().unwrap();

        let bus = Bus::new();
        let bus_stream = bus.stream();

        let eos_message = crate::message::Eos::new();
        bus.post(eos_message).unwrap();

        // `into_future` resolves to the first item plus the rest of the stream.
        let bus_future = bus_stream.into_future();
        let (message, _) = futures_executor::block_on(bus_future);

        match message.unwrap().view() {
            crate::MessageView::Eos(_) => (),
            _ => unreachable!(),
        }
    }
}
546}