1use std::{
4 future,
5 mem::transmute,
6 pin::Pin,
7 task::{Context, Poll},
8};
9
10use futures_channel::mpsc::{self, UnboundedReceiver};
11use futures_core::Stream;
12use futures_util::{stream::FusedStream, StreamExt};
13use glib::{
14 ffi::{gboolean, gpointer},
15 prelude::*,
16 source::Priority,
17 translate::*,
18 ControlFlow,
19};
20
21use crate::{ffi, Bus, BusSyncReply, Message, MessageType};
22
23unsafe extern "C" fn trampoline_watch<F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static>(
24 bus: *mut ffi::GstBus,
25 msg: *mut ffi::GstMessage,
26 func: gpointer,
27) -> gboolean {
28 let func: &mut F = &mut *(func as *mut F);
29 func(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).into_glib()
30}
31
32unsafe extern "C" fn destroy_closure_watch<
33 F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static,
34>(
35 ptr: gpointer,
36) {
37 let _ = Box::<F>::from_raw(ptr as *mut _);
38}
39
40fn into_raw_watch<F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static>(func: F) -> gpointer {
41 #[allow(clippy::type_complexity)]
42 let func: Box<F> = Box::new(func);
43 Box::into_raw(func) as gpointer
44}
45
46unsafe extern "C" fn trampoline_watch_local<F: FnMut(&Bus, &Message) -> ControlFlow + 'static>(
47 bus: *mut ffi::GstBus,
48 msg: *mut ffi::GstMessage,
49 func: gpointer,
50) -> gboolean {
51 let func: &mut glib::thread_guard::ThreadGuard<F> =
52 &mut *(func as *mut glib::thread_guard::ThreadGuard<F>);
53 (func.get_mut())(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).into_glib()
54}
55
56unsafe extern "C" fn destroy_closure_watch_local<
57 F: FnMut(&Bus, &Message) -> ControlFlow + 'static,
58>(
59 ptr: gpointer,
60) {
61 let _ = Box::<glib::thread_guard::ThreadGuard<F>>::from_raw(ptr as *mut _);
62}
63
64fn into_raw_watch_local<F: FnMut(&Bus, &Message) -> ControlFlow + 'static>(func: F) -> gpointer {
65 #[allow(clippy::type_complexity)]
66 let func: Box<glib::thread_guard::ThreadGuard<F>> =
67 Box::new(glib::thread_guard::ThreadGuard::new(func));
68 Box::into_raw(func) as gpointer
69}
70
71unsafe extern "C" fn trampoline_sync<
72 F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
73>(
74 bus: *mut ffi::GstBus,
75 msg: *mut ffi::GstMessage,
76 func: gpointer,
77) -> ffi::GstBusSyncReply {
78 let f: &F = &*(func as *const F);
79 let res = f(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).into_glib();
80
81 if res == ffi::GST_BUS_DROP {
82 ffi::gst_mini_object_unref(msg as *mut _);
83 }
84
85 res
86}
87
88unsafe extern "C" fn destroy_closure_sync<
89 F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
90>(
91 ptr: gpointer,
92) {
93 let _ = Box::<F>::from_raw(ptr as *mut _);
94}
95
96fn into_raw_sync<F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static>(
97 func: F,
98) -> gpointer {
99 let func: Box<F> = Box::new(func);
100 Box::into_raw(func) as gpointer
101}
102
103impl Bus {
104 #[doc(alias = "gst_bus_add_signal_watch")]
122 #[doc(alias = "gst_bus_add_signal_watch_full")]
123 pub fn add_signal_watch_full(&self, priority: Priority) {
124 unsafe {
125 ffi::gst_bus_add_signal_watch_full(self.to_glib_none().0, priority.into_glib());
126 }
127 }
128
129 #[doc(alias = "gst_bus_create_watch")]
140 pub fn create_watch<F>(&self, name: Option<&str>, priority: Priority, func: F) -> glib::Source
141 where
142 F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static,
143 {
144 skip_assert_initialized!();
145 unsafe {
146 let source = ffi::gst_bus_create_watch(self.to_glib_none().0);
147 glib::ffi::g_source_set_callback(
148 source,
149 Some(transmute::<
150 *mut (),
151 unsafe extern "C" fn(glib::ffi::gpointer) -> i32,
152 >(trampoline_watch::<F> as *mut ())),
153 into_raw_watch(func),
154 Some(destroy_closure_watch::<F>),
155 );
156 glib::ffi::g_source_set_priority(source, priority.into_glib());
157
158 if let Some(name) = name {
159 glib::ffi::g_source_set_name(source, name.to_glib_none().0);
160 }
161
162 from_glib_full(source)
163 }
164 }
165
166 #[doc(alias = "gst_bus_add_watch")]
191 #[doc(alias = "gst_bus_add_watch_full")]
192 pub fn add_watch<F>(&self, func: F) -> Result<BusWatchGuard, glib::BoolError>
193 where
194 F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static,
195 {
196 unsafe {
197 let res = ffi::gst_bus_add_watch_full(
198 self.to_glib_none().0,
199 glib::ffi::G_PRIORITY_DEFAULT,
200 Some(trampoline_watch::<F>),
201 into_raw_watch(func),
202 Some(destroy_closure_watch::<F>),
203 );
204
205 if res == 0 {
206 Err(glib::bool_error!("Bus already has a watch"))
207 } else {
208 Ok(BusWatchGuard { bus: self.clone() })
209 }
210 }
211 }
212
213 #[doc(alias = "gst_bus_add_watch")]
214 #[doc(alias = "gst_bus_add_watch_full")]
215 pub fn add_watch_local<F>(&self, func: F) -> Result<BusWatchGuard, glib::BoolError>
216 where
217 F: FnMut(&Bus, &Message) -> ControlFlow + 'static,
218 {
219 unsafe {
220 let ctx = glib::MainContext::ref_thread_default();
221 let _acquire = ctx
222 .acquire()
223 .expect("thread default main context already acquired by another thread");
224
225 let res = ffi::gst_bus_add_watch_full(
226 self.to_glib_none().0,
227 glib::ffi::G_PRIORITY_DEFAULT,
228 Some(trampoline_watch_local::<F>),
229 into_raw_watch_local(func),
230 Some(destroy_closure_watch_local::<F>),
231 );
232
233 if res == 0 {
234 Err(glib::bool_error!("Bus already has a watch"))
235 } else {
236 Ok(BusWatchGuard { bus: self.clone() })
237 }
238 }
239 }
240
241 #[doc(alias = "gst_bus_set_sync_handler")]
255 pub fn set_sync_handler<F>(&self, func: F)
256 where
257 F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
258 {
259 unsafe {
260 let bus = self.to_glib_none().0;
261
262 #[cfg(not(feature = "v1_18"))]
263 {
264 static SET_ONCE_QUARK: std::sync::OnceLock<glib::Quark> =
265 std::sync::OnceLock::new();
266
267 let set_once_quark = SET_ONCE_QUARK
268 .get_or_init(|| glib::Quark::from_str("gstreamer-rs-sync-handler"));
269
270 if crate::version() < (1, 16, 3, 0) {
273 if !glib::gobject_ffi::g_object_get_qdata(
274 bus as *mut _,
275 set_once_quark.into_glib(),
276 )
277 .is_null()
278 {
279 panic!("Bus sync handler can only be set once");
280 }
281
282 glib::gobject_ffi::g_object_set_qdata(
283 bus as *mut _,
284 set_once_quark.into_glib(),
285 1 as *mut _,
286 );
287 }
288 }
289
290 ffi::gst_bus_set_sync_handler(
291 bus,
292 Some(trampoline_sync::<F>),
293 into_raw_sync(func),
294 Some(destroy_closure_sync::<F>),
295 )
296 }
297 }
298
299 pub fn unset_sync_handler(&self) {
300 #[cfg(not(feature = "v1_18"))]
301 {
302 if crate::version() < (1, 16, 3, 0) {
305 return;
306 }
307 }
308
309 unsafe {
310 use std::ptr;
311
312 ffi::gst_bus_set_sync_handler(self.to_glib_none().0, None, ptr::null_mut(), None)
313 }
314 }
315
316 #[doc(alias = "gst_bus_pop")]
317 pub fn iter(&self) -> Iter {
318 self.iter_timed(Some(crate::ClockTime::ZERO))
319 }
320
321 #[doc(alias = "gst_bus_timed_pop")]
322 pub fn iter_timed(&self, timeout: impl Into<Option<crate::ClockTime>>) -> Iter {
323 Iter {
324 bus: self,
325 timeout: timeout.into(),
326 }
327 }
328
329 #[doc(alias = "gst_bus_pop_filtered")]
330 pub fn iter_filtered<'a>(
331 &'a self,
332 msg_types: &'a [MessageType],
333 ) -> impl Iterator<Item = Message> + 'a {
334 self.iter_timed_filtered(Some(crate::ClockTime::ZERO), msg_types)
335 }
336
337 #[doc(alias = "gst_bus_timed_pop_filtered")]
338 pub fn iter_timed_filtered<'a>(
339 &'a self,
340 timeout: impl Into<Option<crate::ClockTime>>,
341 msg_types: &'a [MessageType],
342 ) -> impl Iterator<Item = Message> + 'a {
343 self.iter_timed(timeout)
344 .filter(move |msg| msg_types.contains(&msg.type_()))
345 }
346
347 #[doc(alias = "gst_bus_timed_pop_filtered")]
365 pub fn timed_pop_filtered(
366 &self,
367 timeout: impl Into<Option<crate::ClockTime>> + Clone,
368 msg_types: &[MessageType],
369 ) -> Option<Message> {
370 loop {
371 let msg = self.timed_pop(timeout.clone())?;
372 if msg_types.contains(&msg.type_()) {
373 return Some(msg);
374 }
375 }
376 }
377
378 #[doc(alias = "gst_bus_pop_filtered")]
392 pub fn pop_filtered(&self, msg_types: &[MessageType]) -> Option<Message> {
393 loop {
394 let msg = self.pop()?;
395 if msg_types.contains(&msg.type_()) {
396 return Some(msg);
397 }
398 }
399 }
400
    /// Returns a [`BusStream`] that yields the messages posted on this bus.
    ///
    /// Creating the stream installs a sync handler on the bus (see `BusStream::new`).
    pub fn stream(&self) -> BusStream {
        BusStream::new(self)
    }
404
405 pub fn stream_filtered<'a>(
406 &self,
407 message_types: &'a [MessageType],
408 ) -> impl FusedStream<Item = Message> + Unpin + Send + 'a {
409 self.stream().filter(move |message| {
410 let message_type = message.type_();
411
412 future::ready(message_types.contains(&message_type))
413 })
414 }
415}
416
/// Iterator over the messages of a [`Bus`], popping one message per step.
#[must_use = "iterators are lazy and do nothing unless consumed"]
#[derive(Debug)]
pub struct Iter<'a> {
    // Bus the messages are popped from.
    bus: &'a Bus,
    // Timeout handed to every individual `timed_pop` call.
    timeout: Option<crate::ClockTime>,
}
423
424impl Iterator for Iter<'_> {
425 type Item = Message;
426
427 fn next(&mut self) -> Option<Message> {
428 self.bus.timed_pop(self.timeout)
429 }
430}
431
/// Stream of the messages posted on a [`Bus`].
///
/// Messages are forwarded from a sync handler installed on the bus through an unbounded
/// channel; the handler is unset again when the stream is dropped and the bus is still
/// alive.
#[derive(Debug)]
pub struct BusStream {
    // Weak reference, so the stream does not keep the bus alive.
    bus: glib::WeakRef<Bus>,
    // Receiving end of the channel fed by the sync handler.
    receiver: UnboundedReceiver<Message>,
}
437
438impl BusStream {
439 fn new(bus: &Bus) -> Self {
440 skip_assert_initialized!();
441
442 let (sender, receiver) = mpsc::unbounded();
443
444 bus.set_sync_handler(move |bus, message| {
445 while let Some(message) = bus.pop() {
448 let _ = sender.unbounded_send(message);
449 }
450
451 let _ = sender.unbounded_send(message.to_owned());
452
453 BusSyncReply::Drop
454 });
455
456 Self {
457 bus: bus.downgrade(),
458 receiver,
459 }
460 }
461}
462
463impl Drop for BusStream {
464 fn drop(&mut self) {
465 if let Some(bus) = self.bus.upgrade() {
466 bus.unset_sync_handler();
467 }
468 }
469}
470
471impl Stream for BusStream {
472 type Item = Message;
473
474 fn poll_next(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>> {
475 self.receiver.poll_next_unpin(context)
476 }
477}
478
479impl FusedStream for BusStream {
480 fn is_terminated(&self) -> bool {
481 self.receiver.is_terminated()
482 }
483}
484
/// Guard for a watch added to a [`Bus`].
///
/// Dropping the guard calls `remove_watch` on the bus it was created for.
#[derive(Debug)]
#[must_use = "if unused the bus watch will immediately be removed"]
pub struct BusWatchGuard {
    // Strong reference to the bus whose watch is removed on drop.
    bus: Bus,
}
494
impl Drop for BusWatchGuard {
    /// Removes the watch from the bus.
    fn drop(&mut self) {
        // Best effort: a failure to remove the watch is deliberately ignored.
        let _ = self.bus.remove_watch();
    }
}
500
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};

    use super::*;

    /// A sync handler sees a posted EOS message exactly once.
    #[test]
    fn test_sync_handler() {
        crate::init().unwrap();

        let bus = Bus::new();
        let received = Arc::new(Mutex::new(Vec::new()));
        let handler_received = Arc::clone(&received);
        bus.set_sync_handler(move |_, msg| {
            handler_received.lock().unwrap().push(msg.clone());
            BusSyncReply::Pass
        });

        bus.post(crate::message::Eos::new()).unwrap();

        let received = received.lock().unwrap();
        assert_eq!(received.len(), 1);
        match received[0].view() {
            crate::MessageView::Eos(_) => (),
            _ => unreachable!(),
        }
    }

    /// A posted EOS message is the first item produced by the bus stream.
    #[test]
    fn test_bus_stream() {
        crate::init().unwrap();

        let bus = Bus::new();
        let stream = bus.stream();

        bus.post(crate::message::Eos::new()).unwrap();

        let (first, _rest) = futures_executor::block_on(stream.into_future());

        match first.unwrap().view() {
            crate::MessageView::Eos(_) => (),
            _ => unreachable!(),
        }
    }
}