gstreamer/
bus.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{
4    future,
5    mem::transmute,
6    pin::Pin,
7    task::{Context, Poll},
8};
9
10use futures_channel::mpsc::{self, UnboundedReceiver};
11use futures_core::Stream;
12use futures_util::{stream::FusedStream, StreamExt};
13use glib::{
14    ffi::{gboolean, gpointer},
15    prelude::*,
16    source::Priority,
17    translate::*,
18    ControlFlow,
19};
20
21use crate::{ffi, Bus, BusSyncReply, Message, MessageType};
22
23unsafe extern "C" fn trampoline_watch<F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static>(
24    bus: *mut ffi::GstBus,
25    msg: *mut ffi::GstMessage,
26    func: gpointer,
27) -> gboolean {
28    let func: &mut F = &mut *(func as *mut F);
29    func(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).into_glib()
30}
31
32unsafe extern "C" fn destroy_closure_watch<
33    F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static,
34>(
35    ptr: gpointer,
36) {
37    let _ = Box::<F>::from_raw(ptr as *mut _);
38}
39
40fn into_raw_watch<F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static>(func: F) -> gpointer {
41    #[allow(clippy::type_complexity)]
42    let func: Box<F> = Box::new(func);
43    Box::into_raw(func) as gpointer
44}
45
46unsafe extern "C" fn trampoline_watch_local<F: FnMut(&Bus, &Message) -> ControlFlow + 'static>(
47    bus: *mut ffi::GstBus,
48    msg: *mut ffi::GstMessage,
49    func: gpointer,
50) -> gboolean {
51    let func: &mut glib::thread_guard::ThreadGuard<F> =
52        &mut *(func as *mut glib::thread_guard::ThreadGuard<F>);
53    (func.get_mut())(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).into_glib()
54}
55
56unsafe extern "C" fn destroy_closure_watch_local<
57    F: FnMut(&Bus, &Message) -> ControlFlow + 'static,
58>(
59    ptr: gpointer,
60) {
61    let _ = Box::<glib::thread_guard::ThreadGuard<F>>::from_raw(ptr as *mut _);
62}
63
64fn into_raw_watch_local<F: FnMut(&Bus, &Message) -> ControlFlow + 'static>(func: F) -> gpointer {
65    #[allow(clippy::type_complexity)]
66    let func: Box<glib::thread_guard::ThreadGuard<F>> =
67        Box::new(glib::thread_guard::ThreadGuard::new(func));
68    Box::into_raw(func) as gpointer
69}
70
71unsafe extern "C" fn trampoline_sync<
72    F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
73>(
74    bus: *mut ffi::GstBus,
75    msg: *mut ffi::GstMessage,
76    func: gpointer,
77) -> ffi::GstBusSyncReply {
78    let f: &F = &*(func as *const F);
79    let res = f(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).into_glib();
80
81    if res == ffi::GST_BUS_DROP {
82        ffi::gst_mini_object_unref(msg as *mut _);
83    }
84
85    res
86}
87
88unsafe extern "C" fn destroy_closure_sync<
89    F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
90>(
91    ptr: gpointer,
92) {
93    let _ = Box::<F>::from_raw(ptr as *mut _);
94}
95
96fn into_raw_sync<F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static>(
97    func: F,
98) -> gpointer {
99    let func: Box<F> = Box::new(func);
100    Box::into_raw(func) as gpointer
101}
102
103impl Bus {
104    /// Adds a bus signal watch to the default main context with the given `priority`
105    /// (e.g. `G_PRIORITY_DEFAULT`). It is also possible to use a non-default main
106    /// context set up using [`glib::MainContext::push_thread_default()`][crate::glib::MainContext::push_thread_default()]
107    /// (before one had to create a bus watch source and attach it to the desired
108    /// main context 'manually').
109    ///
110    /// After calling this statement, the bus will emit the "message" signal for each
111    /// message posted on the bus when the `GMainLoop` is running.
112    ///
113    /// This function may be called multiple times. To clean up, the caller is
114    /// responsible for calling [`remove_signal_watch()`][Self::remove_signal_watch()] as many times as this
115    /// function is called.
116    ///
117    /// There can only be a single bus watch per bus, you must remove any signal
118    /// watch before you can set another type of watch.
119    /// ## `priority`
120    /// The priority of the watch.
121    #[doc(alias = "gst_bus_add_signal_watch")]
122    #[doc(alias = "gst_bus_add_signal_watch_full")]
123    pub fn add_signal_watch_full(&self, priority: Priority) {
124        unsafe {
125            ffi::gst_bus_add_signal_watch_full(self.to_glib_none().0, priority.into_glib());
126        }
127    }
128
129    /// Create watch for this bus. The [`glib::Source`][crate::glib::Source] will be dispatched whenever
130    /// a message is on the bus. After the GSource is dispatched, the
131    /// message is popped off the bus and unreffed.
132    ///
133    /// As with other watches, there can only be one watch on the bus, including
134    /// any signal watch added with `gst_bus_add_signal_watch`.
135    ///
136    /// # Returns
137    ///
138    /// a [`glib::Source`][crate::glib::Source] that can be added to a `GMainLoop`.
139    #[doc(alias = "gst_bus_create_watch")]
140    pub fn create_watch<F>(&self, name: Option<&str>, priority: Priority, func: F) -> glib::Source
141    where
142        F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static,
143    {
144        skip_assert_initialized!();
145        unsafe {
146            let source = ffi::gst_bus_create_watch(self.to_glib_none().0);
147            glib::ffi::g_source_set_callback(
148                source,
149                Some(transmute::<
150                    *mut (),
151                    unsafe extern "C" fn(glib::ffi::gpointer) -> i32,
152                >(trampoline_watch::<F> as *mut ())),
153                into_raw_watch(func),
154                Some(destroy_closure_watch::<F>),
155            );
156            glib::ffi::g_source_set_priority(source, priority.into_glib());
157
158            if let Some(name) = name {
159                glib::ffi::g_source_set_name(source, name.to_glib_none().0);
160            }
161
162            from_glib_full(source)
163        }
164    }
165
166    /// Adds a bus watch to the default main context with the default priority
167    /// ( `G_PRIORITY_DEFAULT` ). It is also possible to use a non-default main
168    /// context set up using [`glib::MainContext::push_thread_default()`][crate::glib::MainContext::push_thread_default()] (before
169    /// one had to create a bus watch source and attach it to the desired main
170    /// context 'manually').
171    ///
172    /// This function is used to receive asynchronous messages in the main loop.
173    /// There can only be a single bus watch per bus, you must remove it before you
174    /// can set a new one.
175    ///
176    /// The bus watch will only work if a `GMainLoop` is being run.
177    ///
178    /// The watch can be removed using [`remove_watch()`][Self::remove_watch()] or by returning [`false`]
179    /// from `func`. If the watch was added to the default main context it is also
180    /// possible to remove the watch using [`glib::Source::remove()`][crate::glib::Source::remove()].
181    ///
182    /// The bus watch will take its own reference to the `self`, so it is safe to unref
183    /// `self` using `gst_object_unref()` after setting the bus watch.
184    /// ## `func`
185    /// A function to call when a message is received.
186    ///
187    /// # Returns
188    ///
189    /// The event source id or 0 if `self` already got an event source.
190    #[doc(alias = "gst_bus_add_watch")]
191    #[doc(alias = "gst_bus_add_watch_full")]
192    pub fn add_watch<F>(&self, func: F) -> Result<BusWatchGuard, glib::BoolError>
193    where
194        F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static,
195    {
196        unsafe {
197            let res = ffi::gst_bus_add_watch_full(
198                self.to_glib_none().0,
199                glib::ffi::G_PRIORITY_DEFAULT,
200                Some(trampoline_watch::<F>),
201                into_raw_watch(func),
202                Some(destroy_closure_watch::<F>),
203            );
204
205            if res == 0 {
206                Err(glib::bool_error!("Bus already has a watch"))
207            } else {
208                Ok(BusWatchGuard { bus: self.clone() })
209            }
210        }
211    }
212
213    #[doc(alias = "gst_bus_add_watch")]
214    #[doc(alias = "gst_bus_add_watch_full")]
215    pub fn add_watch_local<F>(&self, func: F) -> Result<BusWatchGuard, glib::BoolError>
216    where
217        F: FnMut(&Bus, &Message) -> ControlFlow + 'static,
218    {
219        unsafe {
220            let ctx = glib::MainContext::ref_thread_default();
221            let _acquire = ctx
222                .acquire()
223                .expect("thread default main context already acquired by another thread");
224
225            let res = ffi::gst_bus_add_watch_full(
226                self.to_glib_none().0,
227                glib::ffi::G_PRIORITY_DEFAULT,
228                Some(trampoline_watch_local::<F>),
229                into_raw_watch_local(func),
230                Some(destroy_closure_watch_local::<F>),
231            );
232
233            if res == 0 {
234                Err(glib::bool_error!("Bus already has a watch"))
235            } else {
236                Ok(BusWatchGuard { bus: self.clone() })
237            }
238        }
239    }
240
241    /// Sets the synchronous handler on the bus. The function will be called
242    /// every time a new message is posted on the bus. Note that the function
243    /// will be called in the same thread context as the posting object. This
244    /// function is usually only called by the creator of the bus. Applications
245    /// should handle messages asynchronously using the gst_bus watch and poll
246    /// functions.
247    ///
248    /// Before 1.16.3 it was not possible to replace an existing handler and
249    /// clearing an existing handler with [`None`] was not thread-safe.
250    /// ## `func`
251    /// The handler function to install
252    /// ## `notify`
253    /// called when `user_data` becomes unused
254    #[doc(alias = "gst_bus_set_sync_handler")]
255    pub fn set_sync_handler<F>(&self, func: F)
256    where
257        F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
258    {
259        unsafe {
260            let bus = self.to_glib_none().0;
261
262            #[cfg(not(feature = "v1_18"))]
263            {
264                static SET_ONCE_QUARK: std::sync::OnceLock<glib::Quark> =
265                    std::sync::OnceLock::new();
266
267                let set_once_quark = SET_ONCE_QUARK
268                    .get_or_init(|| glib::Quark::from_str("gstreamer-rs-sync-handler"));
269
270                // This is not thread-safe before 1.16.3, see
271                // https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/merge_requests/416
272                if crate::version() < (1, 16, 3, 0) {
273                    if !glib::gobject_ffi::g_object_get_qdata(
274                        bus as *mut _,
275                        set_once_quark.into_glib(),
276                    )
277                    .is_null()
278                    {
279                        panic!("Bus sync handler can only be set once");
280                    }
281
282                    glib::gobject_ffi::g_object_set_qdata(
283                        bus as *mut _,
284                        set_once_quark.into_glib(),
285                        1 as *mut _,
286                    );
287                }
288            }
289
290            ffi::gst_bus_set_sync_handler(
291                bus,
292                Some(trampoline_sync::<F>),
293                into_raw_sync(func),
294                Some(destroy_closure_sync::<F>),
295            )
296        }
297    }
298
299    pub fn unset_sync_handler(&self) {
300        #[cfg(not(feature = "v1_18"))]
301        {
302            // This is not thread-safe before 1.16.3, see
303            // https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/merge_requests/416
304            if crate::version() < (1, 16, 3, 0) {
305                return;
306            }
307        }
308
309        unsafe {
310            use std::ptr;
311
312            ffi::gst_bus_set_sync_handler(self.to_glib_none().0, None, ptr::null_mut(), None)
313        }
314    }
315
316    #[doc(alias = "gst_bus_pop")]
317    pub fn iter(&self) -> Iter {
318        self.iter_timed(Some(crate::ClockTime::ZERO))
319    }
320
321    #[doc(alias = "gst_bus_timed_pop")]
322    pub fn iter_timed(&self, timeout: impl Into<Option<crate::ClockTime>>) -> Iter {
323        Iter {
324            bus: self,
325            timeout: timeout.into(),
326        }
327    }
328
329    #[doc(alias = "gst_bus_pop_filtered")]
330    pub fn iter_filtered<'a>(
331        &'a self,
332        msg_types: &'a [MessageType],
333    ) -> impl Iterator<Item = Message> + 'a {
334        self.iter_timed_filtered(Some(crate::ClockTime::ZERO), msg_types)
335    }
336
337    #[doc(alias = "gst_bus_timed_pop_filtered")]
338    pub fn iter_timed_filtered<'a>(
339        &'a self,
340        timeout: impl Into<Option<crate::ClockTime>>,
341        msg_types: &'a [MessageType],
342    ) -> impl Iterator<Item = Message> + 'a {
343        self.iter_timed(timeout)
344            .filter(move |msg| msg_types.contains(&msg.type_()))
345    }
346
347    /// Gets a message from the bus whose type matches the message type mask `types`,
348    /// waiting up to the specified timeout (and discarding any messages that do not
349    /// match the mask provided).
350    ///
351    /// If `timeout` is 0, this function behaves like [`pop_filtered()`][Self::pop_filtered()]. If
352    /// `timeout` is `GST_CLOCK_TIME_NONE`, this function will block forever until a
353    /// matching message was posted on the bus.
354    /// ## `timeout`
355    /// a timeout in nanoseconds, or `GST_CLOCK_TIME_NONE` to wait forever
356    /// ## `types`
357    /// message types to take into account, `GST_MESSAGE_ANY` for any type
358    ///
359    /// # Returns
360    ///
361    /// a [`Message`][crate::Message] matching the
362    ///  filter in `types`, or [`None`] if no matching message was found on
363    ///  the bus until the timeout expired.
364    #[doc(alias = "gst_bus_timed_pop_filtered")]
365    pub fn timed_pop_filtered(
366        &self,
367        timeout: impl Into<Option<crate::ClockTime>> + Clone,
368        msg_types: &[MessageType],
369    ) -> Option<Message> {
370        loop {
371            let msg = self.timed_pop(timeout.clone())?;
372            if msg_types.contains(&msg.type_()) {
373                return Some(msg);
374            }
375        }
376    }
377
378    /// Gets a message matching `type_` from the bus. Will discard all messages on
379    /// the bus that do not match `type_` and that have been posted before the first
380    /// message that does match `type_`. If there is no message matching `type_` on
381    /// the bus, all messages will be discarded. It is not possible to use message
382    /// enums beyond `GST_MESSAGE_EXTENDED` in the `events` mask.
383    /// ## `types`
384    /// message types to take into account
385    ///
386    /// # Returns
387    ///
388    /// the next [`Message`][crate::Message] matching
389    ///  `type_` that is on the bus, or [`None`] if the bus is empty or there
390    ///  is no message matching `type_`.
391    #[doc(alias = "gst_bus_pop_filtered")]
392    pub fn pop_filtered(&self, msg_types: &[MessageType]) -> Option<Message> {
393        loop {
394            let msg = self.pop()?;
395            if msg_types.contains(&msg.type_()) {
396                return Some(msg);
397            }
398        }
399    }
400
401    pub fn stream(&self) -> BusStream {
402        BusStream::new(self)
403    }
404
405    pub fn stream_filtered<'a>(
406        &self,
407        message_types: &'a [MessageType],
408    ) -> impl FusedStream<Item = Message> + Unpin + Send + 'a {
409        self.stream().filter(move |message| {
410            let message_type = message.type_();
411
412            future::ready(message_types.contains(&message_type))
413        })
414    }
415}
416
417#[must_use = "iterators are lazy and do nothing unless consumed"]
418#[derive(Debug)]
419pub struct Iter<'a> {
420    bus: &'a Bus,
421    timeout: Option<crate::ClockTime>,
422}
423
424impl Iterator for Iter<'_> {
425    type Item = Message;
426
427    fn next(&mut self) -> Option<Message> {
428        self.bus.timed_pop(self.timeout)
429    }
430}
431
432#[derive(Debug)]
433pub struct BusStream {
434    bus: glib::WeakRef<Bus>,
435    receiver: UnboundedReceiver<Message>,
436}
437
438impl BusStream {
439    fn new(bus: &Bus) -> Self {
440        skip_assert_initialized!();
441
442        let (sender, receiver) = mpsc::unbounded();
443
444        bus.set_sync_handler(move |bus, message| {
445            // First pop all messages that might've been previously queued before creating
446            // the bus stream.
447            while let Some(message) = bus.pop() {
448                let _ = sender.unbounded_send(message);
449            }
450
451            let _ = sender.unbounded_send(message.to_owned());
452
453            BusSyncReply::Drop
454        });
455
456        Self {
457            bus: bus.downgrade(),
458            receiver,
459        }
460    }
461}
462
463impl Drop for BusStream {
464    fn drop(&mut self) {
465        if let Some(bus) = self.bus.upgrade() {
466            bus.unset_sync_handler();
467        }
468    }
469}
470
471impl Stream for BusStream {
472    type Item = Message;
473
474    fn poll_next(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>> {
475        self.receiver.poll_next_unpin(context)
476    }
477}
478
479impl FusedStream for BusStream {
480    fn is_terminated(&self) -> bool {
481        self.receiver.is_terminated()
482    }
483}
484
485// rustdoc-stripper-ignore-next
486/// Manages ownership of the bus watch added to a bus with [`Bus::add_watch`] or [`Bus::add_watch_local`]
487///
488/// When dropped the bus watch is removed from the bus.
489#[derive(Debug)]
490#[must_use = "if unused the bus watch will immediately be removed"]
491pub struct BusWatchGuard {
492    bus: Bus,
493}
494
495impl Drop for BusWatchGuard {
496    fn drop(&mut self) {
497        let _ = self.bus.remove_watch();
498    }
499}
500
501#[cfg(test)]
502mod tests {
503    use std::sync::{Arc, Mutex};
504
505    use super::*;
506
507    #[test]
508    fn test_sync_handler() {
509        crate::init().unwrap();
510
511        let bus = Bus::new();
512        let msgs = Arc::new(Mutex::new(Vec::new()));
513        let msgs_clone = msgs.clone();
514        bus.set_sync_handler(move |_, msg| {
515            msgs_clone.lock().unwrap().push(msg.clone());
516            BusSyncReply::Pass
517        });
518
519        bus.post(crate::message::Eos::new()).unwrap();
520
521        let msgs = msgs.lock().unwrap();
522        assert_eq!(msgs.len(), 1);
523        match msgs[0].view() {
524            crate::MessageView::Eos(_) => (),
525            _ => unreachable!(),
526        }
527    }
528
529    #[test]
530    fn test_bus_stream() {
531        crate::init().unwrap();
532
533        let bus = Bus::new();
534        let bus_stream = bus.stream();
535
536        let eos_message = crate::message::Eos::new();
537        bus.post(eos_message).unwrap();
538
539        let bus_future = bus_stream.into_future();
540        let (message, _) = futures_executor::block_on(bus_future);
541
542        match message.unwrap().view() {
543            crate::MessageView::Eos(_) => (),
544            _ => unreachable!(),
545        }
546    }
547}