Skip to main content

gstreamer/
bus.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{
4    future,
5    mem::transmute,
6    pin::Pin,
7    sync::{Arc, Mutex},
8    task::{Context, Poll},
9};
10
11use futures_channel::mpsc::{self, UnboundedReceiver};
12use futures_core::Stream;
13use futures_util::{StreamExt, stream::FusedStream};
14use glib::{
15    ControlFlow,
16    ffi::{gboolean, gpointer},
17    prelude::*,
18    source::Priority,
19    translate::*,
20};
21
22use crate::{Bus, BusSyncReply, Message, MessageType, ffi};
23
24unsafe extern "C" fn trampoline_watch<F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static>(
25    bus: *mut ffi::GstBus,
26    msg: *mut ffi::GstMessage,
27    func: gpointer,
28) -> gboolean {
29    unsafe {
30        let func: &mut F = &mut *(func as *mut F);
31        func(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).into_glib()
32    }
33}
34
35unsafe extern "C" fn destroy_closure_watch<
36    F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static,
37>(
38    ptr: gpointer,
39) {
40    unsafe {
41        let _ = Box::<F>::from_raw(ptr as *mut _);
42    }
43}
44
45fn into_raw_watch<F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static>(func: F) -> gpointer {
46    #[allow(clippy::type_complexity)]
47    let func: Box<F> = Box::new(func);
48    Box::into_raw(func) as gpointer
49}
50
51unsafe extern "C" fn trampoline_watch_local<F: FnMut(&Bus, &Message) -> ControlFlow + 'static>(
52    bus: *mut ffi::GstBus,
53    msg: *mut ffi::GstMessage,
54    func: gpointer,
55) -> gboolean {
56    unsafe {
57        let func: &mut glib::thread_guard::ThreadGuard<F> =
58            &mut *(func as *mut glib::thread_guard::ThreadGuard<F>);
59        (func.get_mut())(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).into_glib()
60    }
61}
62
63unsafe extern "C" fn destroy_closure_watch_local<
64    F: FnMut(&Bus, &Message) -> ControlFlow + 'static,
65>(
66    ptr: gpointer,
67) {
68    unsafe {
69        let _ = Box::<glib::thread_guard::ThreadGuard<F>>::from_raw(ptr as *mut _);
70    }
71}
72
73fn into_raw_watch_local<F: FnMut(&Bus, &Message) -> ControlFlow + 'static>(func: F) -> gpointer {
74    #[allow(clippy::type_complexity)]
75    let func: Box<glib::thread_guard::ThreadGuard<F>> =
76        Box::new(glib::thread_guard::ThreadGuard::new(func));
77    Box::into_raw(func) as gpointer
78}
79
80unsafe extern "C" fn trampoline_sync<
81    F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
82>(
83    bus: *mut ffi::GstBus,
84    msg: *mut ffi::GstMessage,
85    func: gpointer,
86) -> ffi::GstBusSyncReply {
87    unsafe {
88        let f: &F = &*(func as *const F);
89        let res = f(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).into_glib();
90
91        if res == ffi::GST_BUS_DROP {
92            ffi::gst_mini_object_unref(msg as *mut _);
93        }
94
95        res
96    }
97}
98
99unsafe extern "C" fn destroy_closure_sync<
100    F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
101>(
102    ptr: gpointer,
103) {
104    unsafe {
105        let _ = Box::<F>::from_raw(ptr as *mut _);
106    }
107}
108
109fn into_raw_sync<F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static>(
110    func: F,
111) -> gpointer {
112    let func: Box<F> = Box::new(func);
113    Box::into_raw(func) as gpointer
114}
115
116impl Bus {
117    /// Adds a bus signal watch to the default main context with the given `priority`
118    /// (e.g. `G_PRIORITY_DEFAULT`). It is also possible to use a non-default main
119    /// context set up using [`glib::MainContext::push_thread_default()`][crate::glib::MainContext::push_thread_default()]
120    /// (before one had to create a bus watch source and attach it to the desired
121    /// main context 'manually').
122    ///
123    /// After calling this statement, the bus will emit the "message" signal for each
124    /// message posted on the bus when the `GMainLoop` is running.
125    ///
126    /// This function may be called multiple times. To clean up, the caller is
127    /// responsible for calling [`remove_signal_watch()`][Self::remove_signal_watch()] as many times as this
128    /// function is called.
129    ///
130    /// There can only be a single bus watch per bus, you must remove any signal
131    /// watch before you can set another type of watch.
132    /// ## `priority`
133    /// The priority of the watch.
134    #[doc(alias = "gst_bus_add_signal_watch")]
135    #[doc(alias = "gst_bus_add_signal_watch_full")]
136    pub fn add_signal_watch_full(&self, priority: Priority) {
137        unsafe {
138            ffi::gst_bus_add_signal_watch_full(self.to_glib_none().0, priority.into_glib());
139        }
140    }
141
142    /// Create watch for this bus. The [`glib::Source`][crate::glib::Source] will be dispatched whenever
143    /// a message is on the bus. After the GSource is dispatched, the
144    /// message is popped off the bus and unreffed.
145    ///
146    /// As with other watches, there can only be one watch on the bus, including
147    /// any signal watch added with `gst_bus_add_signal_watch`.
148    ///
149    /// # Returns
150    ///
151    /// a [`glib::Source`][crate::glib::Source] that can be added to a `GMainLoop`.
152    #[doc(alias = "gst_bus_create_watch")]
153    pub fn create_watch<F>(&self, name: Option<&str>, priority: Priority, func: F) -> glib::Source
154    where
155        F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static,
156    {
157        skip_assert_initialized!();
158        unsafe {
159            let source = ffi::gst_bus_create_watch(self.to_glib_none().0);
160            glib::ffi::g_source_set_callback(
161                source,
162                Some(transmute::<
163                    *mut (),
164                    unsafe extern "C" fn(glib::ffi::gpointer) -> i32,
165                >(trampoline_watch::<F> as *mut ())),
166                into_raw_watch(func),
167                Some(destroy_closure_watch::<F>),
168            );
169            glib::ffi::g_source_set_priority(source, priority.into_glib());
170
171            if let Some(name) = name {
172                glib::ffi::g_source_set_name(source, name.to_glib_none().0);
173            }
174
175            from_glib_full(source)
176        }
177    }
178
179    /// Adds a bus watch to the default main context with the default priority
180    /// ( `G_PRIORITY_DEFAULT` ). It is also possible to use a non-default main
181    /// context set up using [`glib::MainContext::push_thread_default()`][crate::glib::MainContext::push_thread_default()] (before
182    /// one had to create a bus watch source and attach it to the desired main
183    /// context 'manually').
184    ///
185    /// This function is used to receive asynchronous messages in the main loop.
186    /// There can only be a single bus watch per bus, you must remove it before you
187    /// can set a new one.
188    ///
189    /// The bus watch will only work if a `GMainLoop` is being run.
190    ///
191    /// The watch can be removed using [`remove_watch()`][Self::remove_watch()] or by returning [`false`]
192    /// from `func`. If the watch was added to the default main context it is also
193    /// possible to remove the watch using [`glib::Source::remove()`][crate::glib::Source::remove()].
194    ///
195    /// The bus watch will take its own reference to the `self`, so it is safe to unref
196    /// `self` using `gst_object_unref()` after setting the bus watch.
197    /// ## `func`
198    /// A function to call when a message is received.
199    ///
200    /// # Returns
201    ///
202    /// The event source id or 0 if `self` already got an event source.
203    #[doc(alias = "gst_bus_add_watch")]
204    #[doc(alias = "gst_bus_add_watch_full")]
205    pub fn add_watch<F>(&self, func: F) -> Result<BusWatchGuard, glib::BoolError>
206    where
207        F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static,
208    {
209        unsafe {
210            let res = ffi::gst_bus_add_watch_full(
211                self.to_glib_none().0,
212                glib::ffi::G_PRIORITY_DEFAULT,
213                Some(trampoline_watch::<F>),
214                into_raw_watch(func),
215                Some(destroy_closure_watch::<F>),
216            );
217
218            if res == 0 {
219                Err(glib::bool_error!("Bus already has a watch"))
220            } else {
221                Ok(BusWatchGuard { bus: self.clone() })
222            }
223        }
224    }
225
226    #[doc(alias = "gst_bus_add_watch")]
227    #[doc(alias = "gst_bus_add_watch_full")]
228    pub fn add_watch_local<F>(&self, func: F) -> Result<BusWatchGuard, glib::BoolError>
229    where
230        F: FnMut(&Bus, &Message) -> ControlFlow + 'static,
231    {
232        unsafe {
233            let ctx = glib::MainContext::ref_thread_default();
234            let _acquire = ctx
235                .acquire()
236                .expect("thread default main context already acquired by another thread");
237
238            let res = ffi::gst_bus_add_watch_full(
239                self.to_glib_none().0,
240                glib::ffi::G_PRIORITY_DEFAULT,
241                Some(trampoline_watch_local::<F>),
242                into_raw_watch_local(func),
243                Some(destroy_closure_watch_local::<F>),
244            );
245
246            if res == 0 {
247                Err(glib::bool_error!("Bus already has a watch"))
248            } else {
249                Ok(BusWatchGuard { bus: self.clone() })
250            }
251        }
252    }
253
254    /// Sets the synchronous handler on the bus. The function will be called
255    /// every time a new message is posted on the bus. Note that the function
256    /// will be called in the same thread context as the posting object. This
257    /// function is usually only called by the creator of the bus. Applications
258    /// should handle messages asynchronously using the gst_bus watch and poll
259    /// functions.
260    ///
261    /// Before 1.16.3 it was not possible to replace an existing handler and
262    /// clearing an existing handler with [`None`] was not thread-safe.
263    /// ## `func`
264    /// The handler function to install
265    /// ## `notify`
266    /// called when `user_data` becomes unused
267    #[doc(alias = "gst_bus_set_sync_handler")]
268    pub fn set_sync_handler<F>(&self, func: F)
269    where
270        F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
271    {
272        unsafe {
273            let bus = self.to_glib_none().0;
274
275            #[allow(clippy::manual_dangling_ptr)]
276            #[cfg(not(feature = "v1_18"))]
277            {
278                static SET_ONCE_QUARK: std::sync::OnceLock<glib::Quark> =
279                    std::sync::OnceLock::new();
280
281                let set_once_quark = SET_ONCE_QUARK
282                    .get_or_init(|| glib::Quark::from_str("gstreamer-rs-sync-handler"));
283
284                // This is not thread-safe before 1.16.3, see
285                // https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/merge_requests/416
286                if crate::version() < (1, 16, 3, 0) {
287                    if !glib::gobject_ffi::g_object_get_qdata(
288                        bus as *mut _,
289                        set_once_quark.into_glib(),
290                    )
291                    .is_null()
292                    {
293                        panic!("Bus sync handler can only be set once");
294                    }
295
296                    glib::gobject_ffi::g_object_set_qdata(
297                        bus as *mut _,
298                        set_once_quark.into_glib(),
299                        1 as *mut _,
300                    );
301                }
302            }
303
304            ffi::gst_bus_set_sync_handler(
305                bus,
306                Some(trampoline_sync::<F>),
307                into_raw_sync(func),
308                Some(destroy_closure_sync::<F>),
309            )
310        }
311    }
312
313    pub fn unset_sync_handler(&self) {
314        #[cfg(not(feature = "v1_18"))]
315        {
316            // This is not thread-safe before 1.16.3, see
317            // https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/merge_requests/416
318            if crate::version() < (1, 16, 3, 0) {
319                return;
320            }
321        }
322
323        unsafe {
324            use std::ptr;
325
326            ffi::gst_bus_set_sync_handler(self.to_glib_none().0, None, ptr::null_mut(), None)
327        }
328    }
329
330    #[doc(alias = "gst_bus_pop")]
331    pub fn iter(&self) -> Iter<'_> {
332        self.iter_timed(Some(crate::ClockTime::ZERO))
333    }
334
335    #[doc(alias = "gst_bus_timed_pop")]
336    pub fn iter_timed(&self, timeout: impl Into<Option<crate::ClockTime>>) -> Iter<'_> {
337        Iter {
338            bus: self,
339            timeout: timeout.into(),
340        }
341    }
342
343    #[doc(alias = "gst_bus_pop_filtered")]
344    pub fn iter_filtered<'a>(
345        &'a self,
346        msg_types: &'a [MessageType],
347    ) -> impl Iterator<Item = Message> + 'a {
348        self.iter_timed_filtered(Some(crate::ClockTime::ZERO), msg_types)
349    }
350
351    #[doc(alias = "gst_bus_timed_pop_filtered")]
352    pub fn iter_timed_filtered<'a>(
353        &'a self,
354        timeout: impl Into<Option<crate::ClockTime>>,
355        msg_types: &'a [MessageType],
356    ) -> impl Iterator<Item = Message> + 'a {
357        self.iter_timed(timeout)
358            .filter(move |msg| msg_types.contains(&msg.type_()))
359    }
360
361    /// Gets a message from the bus whose type matches the message type mask `types`,
362    /// waiting up to the specified timeout (and discarding any messages that do not
363    /// match the mask provided).
364    ///
365    /// If `timeout` is 0, this function behaves like [`pop_filtered()`][Self::pop_filtered()]. If
366    /// `timeout` is `GST_CLOCK_TIME_NONE`, this function will block forever until a
367    /// matching message was posted on the bus.
368    /// ## `timeout`
369    /// a timeout in nanoseconds, or `GST_CLOCK_TIME_NONE` to wait forever
370    /// ## `types`
371    /// message types to take into account, `GST_MESSAGE_ANY` for any type
372    ///
373    /// # Returns
374    ///
375    /// a [`Message`][crate::Message] matching the
376    ///  filter in `types`, or [`None`] if no matching message was found on
377    ///  the bus until the timeout expired.
378    #[doc(alias = "gst_bus_timed_pop_filtered")]
379    pub fn timed_pop_filtered(
380        &self,
381        timeout: impl Into<Option<crate::ClockTime>>,
382        msg_types: &[MessageType],
383    ) -> Option<Message> {
384        // Infinite wait: just loop forever
385        let Some(timeout) = timeout.into() else {
386            loop {
387                let msg = self.timed_pop(None)?;
388                if msg_types.contains(&msg.type_()) {
389                    return Some(msg);
390                }
391            }
392        };
393
394        // Finite timeout
395        let total = timeout;
396        let start = std::time::Instant::now();
397
398        loop {
399            let elapsed = crate::ClockTime::from_nseconds(start.elapsed().as_nanos() as u64);
400
401            // If timeout budget is exhausted, return None
402            let remaining = total.checked_sub(elapsed)?;
403
404            let msg = self.timed_pop(Some(remaining))?;
405
406            if msg_types.contains(&msg.type_()) {
407                return Some(msg);
408            }
409
410            // Discard non-matching messages without restarting the timeout
411        }
412    }
413
414    /// Gets a message matching `type_` from the bus. Will discard all messages on
415    /// the bus that do not match `type_` and that have been posted before the first
416    /// message that does match `type_`. If there is no message matching `type_` on
417    /// the bus, all messages will be discarded. It is not possible to use message
418    /// enums beyond `GST_MESSAGE_EXTENDED` in the `events` mask.
419    /// ## `types`
420    /// message types to take into account
421    ///
422    /// # Returns
423    ///
424    /// the next [`Message`][crate::Message] matching
425    ///  `type_` that is on the bus, or [`None`] if the bus is empty or there
426    ///  is no message matching `type_`.
427    #[doc(alias = "gst_bus_pop_filtered")]
428    pub fn pop_filtered(&self, msg_types: &[MessageType]) -> Option<Message> {
429        loop {
430            let msg = self.pop()?;
431            if msg_types.contains(&msg.type_()) {
432                return Some(msg);
433            }
434        }
435    }
436
437    pub fn stream(&self) -> BusStream {
438        BusStream::new(self)
439    }
440
441    pub fn stream_filtered<'a>(
442        &self,
443        message_types: &'a [MessageType],
444    ) -> impl FusedStream<Item = Message> + Unpin + Send + 'a + use<'a> {
445        self.stream().filter(move |message| {
446            let message_type = message.type_();
447
448            future::ready(message_types.contains(&message_type))
449        })
450    }
451}
452
453#[must_use = "iterators are lazy and do nothing unless consumed"]
454#[derive(Debug)]
455pub struct Iter<'a> {
456    bus: &'a Bus,
457    timeout: Option<crate::ClockTime>,
458}
459
460impl Iterator for Iter<'_> {
461    type Item = Message;
462
463    fn next(&mut self) -> Option<Message> {
464        self.bus.timed_pop(self.timeout)
465    }
466}
467
468#[derive(Debug)]
469pub struct BusStream {
470    bus: glib::WeakRef<Bus>,
471    receiver: UnboundedReceiver<Message>,
472}
473
474impl BusStream {
475    fn new(bus: &Bus) -> Self {
476        skip_assert_initialized!();
477
478        let mutex = Arc::new(Mutex::new(()));
479        let (sender, receiver) = mpsc::unbounded();
480
481        // Use a mutex to ensure that the sync handler is not putting any messages into the sender
482        // until we have removed all previously queued messages from the bus.
483        // This makes sure that the messages are staying in order.
484        //
485        // We could use the bus' object lock here but a separate mutex seems safer.
486        let _mutex_guard = mutex.lock().unwrap();
487        bus.set_sync_handler({
488            let sender = sender.clone();
489            let mutex = mutex.clone();
490
491            move |_bus, message| {
492                let _mutex_guard = mutex.lock().unwrap();
493
494                let _ = sender.unbounded_send(message.to_owned());
495
496                BusSyncReply::Drop
497            }
498        });
499
500        // First pop all messages that might've been previously queued before creating the bus stream.
501        while let Some(message) = bus.pop() {
502            let _ = sender.unbounded_send(message);
503        }
504
505        Self {
506            bus: bus.downgrade(),
507            receiver,
508        }
509    }
510}
511
512impl Drop for BusStream {
513    fn drop(&mut self) {
514        if let Some(bus) = self.bus.upgrade() {
515            bus.unset_sync_handler();
516        }
517    }
518}
519
520impl Stream for BusStream {
521    type Item = Message;
522
523    fn poll_next(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>> {
524        self.receiver.poll_next_unpin(context)
525    }
526}
527
528impl FusedStream for BusStream {
529    fn is_terminated(&self) -> bool {
530        self.receiver.is_terminated()
531    }
532}
533
534// rustdoc-stripper-ignore-next
535/// Manages ownership of the bus watch added to a bus with [`Bus::add_watch`] or [`Bus::add_watch_local`]
536///
537/// When dropped the bus watch is removed from the bus.
538#[derive(Debug)]
539#[must_use = "if unused the bus watch will immediately be removed"]
540pub struct BusWatchGuard {
541    bus: Bus,
542}
543
544impl Drop for BusWatchGuard {
545    fn drop(&mut self) {
546        let _ = self.bus.remove_watch();
547    }
548}
549
550#[cfg(test)]
551mod tests {
552    use std::{
553        sync::{Arc, Barrier, Mutex},
554        thread,
555        time::{Duration, Instant},
556    };
557
558    use super::*;
559
560    #[test]
561    fn test_sync_handler() {
562        crate::init().unwrap();
563
564        let bus = Bus::new();
565        let msgs = Arc::new(Mutex::new(Vec::new()));
566        let msgs_clone = msgs.clone();
567        bus.set_sync_handler(move |_, msg| {
568            msgs_clone.lock().unwrap().push(msg.clone());
569            BusSyncReply::Pass
570        });
571
572        bus.post(crate::message::Eos::new()).unwrap();
573
574        let msgs = msgs.lock().unwrap();
575        assert_eq!(msgs.len(), 1);
576        match msgs[0].view() {
577            crate::MessageView::Eos(_) => (),
578            _ => unreachable!(),
579        }
580    }
581
582    #[test]
583    fn test_bus_stream() {
584        crate::init().unwrap();
585
586        let bus = Bus::new();
587        let bus_stream = bus.stream();
588
589        let eos_message = crate::message::Eos::new();
590        bus.post(eos_message).unwrap();
591
592        let bus_future = StreamExt::into_future(bus_stream);
593        let (message, _) = futures_executor::block_on(bus_future);
594
595        match message.unwrap().view() {
596            crate::MessageView::Eos(_) => (),
597            _ => unreachable!(),
598        }
599    }
600
601    #[test]
602    fn test_bus_timed_pop_filtered_does_not_restart_timeout_for_filtered_messages() {
603        crate::init().unwrap();
604
605        let bus = Bus::new();
606        let bus_clone = bus.clone();
607
608        let timeout = Duration::from_millis(100);
609        let timeout_allowance = Duration::from_millis(50);
610
611        let barrier = Arc::new(Barrier::new(2));
612        let barrier_clone = barrier.clone();
613
614        // Post non-matching messages for longer than the timeout
615        let message_poster = thread::spawn(move || {
616            // Signal that posting is about to start
617            barrier_clone.wait();
618
619            let start = Instant::now();
620            while start.elapsed() < timeout * 3 {
621                bus_clone
622                    .post(crate::message::DurationChanged::new())
623                    .unwrap();
624                thread::sleep(Duration::from_millis(10));
625            }
626        });
627
628        // Wait until the poster thread is ready
629        barrier.wait();
630
631        let start = Instant::now();
632
633        let msg = bus.timed_pop_filtered(
634            crate::ClockTime::from_mseconds(timeout.as_millis() as u64),
635            &[MessageType::Eos],
636        );
637
638        let elapsed = start.elapsed();
639
640        assert!(msg.is_none());
641
642        // While filtering messages, `timed_pop_filtered` must not restart the
643        // timeout for each discarded message. The timeout applies to the full
644        // wait for a matching message.
645        assert!(
646            elapsed < timeout + timeout_allowance,
647            "timed_pop_filtered exceeded the requested timeout while filtering"
648        );
649
650        message_poster
651            .join()
652            .expect("message_poster thread panicked");
653    }
654}