gstreamer/
clock.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{
4    cmp,
5    marker::Unpin,
6    pin::Pin,
7    ptr,
8    sync::{atomic, atomic::AtomicI32},
9};
10
11use futures_core::{Future, Stream};
12use glib::{
13    ffi::{gboolean, gpointer},
14    prelude::*,
15    translate::*,
16};
17use libc::c_void;
18
19use crate::{
20    ffi, prelude::*, Clock, ClockEntryType, ClockError, ClockFlags, ClockReturn, ClockSuccess,
21    ClockTime, ClockTimeDiff,
22};
23
24glib::wrapper! {
25    #[derive(Debug, PartialOrd, Ord, PartialEq, Eq, Hash)]
26    pub struct ClockId(Shared<c_void>);
27
28    match fn {
29        ref => |ptr| ffi::gst_clock_id_ref(ptr),
30        unref => |ptr| ffi::gst_clock_id_unref(ptr),
31    }
32}
33
34impl ClockId {
35    #[doc(alias = "get_time")]
36    #[doc(alias = "gst_clock_id_get_time")]
37    #[doc(alias = "GST_CLOCK_ENTRY_TIME")]
38    pub fn time(&self) -> ClockTime {
39        unsafe {
40            try_from_glib(ffi::gst_clock_id_get_time(self.to_glib_none().0))
41                .expect("undefined time")
42        }
43    }
44
45    #[doc(alias = "gst_clock_id_unschedule")]
46    pub fn unschedule(&self) {
47        unsafe { ffi::gst_clock_id_unschedule(self.to_glib_none().0) }
48    }
49
50    #[doc(alias = "gst_clock_id_wait")]
51    pub fn wait(&self) -> (Result<ClockSuccess, ClockError>, ClockTimeDiff) {
52        unsafe {
53            let mut jitter = 0;
54            let res = try_from_glib(ffi::gst_clock_id_wait(self.to_glib_none().0, &mut jitter));
55            (res, jitter)
56        }
57    }
58
59    #[doc(alias = "gst_clock_id_compare_func")]
60    pub fn compare_by_time(&self, other: &Self) -> cmp::Ordering {
61        unsafe {
62            let res = ffi::gst_clock_id_compare_func(self.to_glib_none().0, other.to_glib_none().0);
63            res.cmp(&0)
64        }
65    }
66
67    #[cfg(feature = "v1_16")]
68    #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
69    #[doc(alias = "get_clock")]
70    #[doc(alias = "gst_clock_id_get_clock")]
71    pub fn clock(&self) -> Option<Clock> {
72        unsafe { from_glib_full(ffi::gst_clock_id_get_clock(self.to_glib_none().0)) }
73    }
74
75    #[cfg(feature = "v1_16")]
76    #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
77    #[doc(alias = "gst_clock_id_uses_clock")]
78    pub fn uses_clock<P: IsA<Clock>>(&self, clock: &P) -> bool {
79        unsafe {
80            from_glib(ffi::gst_clock_id_uses_clock(
81                self.to_glib_none().0,
82                clock.as_ref().as_ptr(),
83            ))
84        }
85    }
86
87    #[doc(alias = "get_type")]
88    #[doc(alias = "GST_CLOCK_ENTRY_TYPE")]
89    pub fn type_(&self) -> ClockEntryType {
90        unsafe {
91            let ptr = self.as_ptr() as *mut ffi::GstClockEntry;
92            from_glib((*ptr).type_)
93        }
94    }
95
96    #[doc(alias = "get_status")]
97    #[doc(alias = "GST_CLOCK_ENTRY_STATUS")]
98    pub fn status(&self) -> &AtomicClockReturn {
99        unsafe {
100            let ptr = self.as_ptr() as *mut ffi::GstClockEntry;
101            &*((&(*ptr).status) as *const i32 as *const AtomicClockReturn)
102        }
103    }
104}
105
106#[derive(Clone, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)]
107pub struct SingleShotClockId(ClockId);
108
109impl std::ops::Deref for SingleShotClockId {
110    type Target = ClockId;
111
112    #[inline]
113    fn deref(&self) -> &Self::Target {
114        &self.0
115    }
116}
117
118impl From<SingleShotClockId> for ClockId {
119    #[inline]
120    fn from(id: SingleShotClockId) -> ClockId {
121        skip_assert_initialized!();
122        id.0
123    }
124}
125
126impl TryFrom<ClockId> for SingleShotClockId {
127    type Error = glib::BoolError;
128
129    #[inline]
130    fn try_from(id: ClockId) -> Result<SingleShotClockId, glib::BoolError> {
131        skip_assert_initialized!();
132        match id.type_() {
133            ClockEntryType::Single => Ok(SingleShotClockId(id)),
134            _ => Err(glib::bool_error!("Not a single-shot clock id")),
135        }
136    }
137}
138
139impl SingleShotClockId {
140    #[doc(alias = "gst_clock_id_compare_func")]
141    #[inline]
142    pub fn compare_by_time(&self, other: &Self) -> cmp::Ordering {
143        self.0.compare_by_time(&other.0)
144    }
145
146    #[doc(alias = "gst_clock_id_wait_async")]
147    pub fn wait_async<F>(&self, func: F) -> Result<ClockSuccess, ClockError>
148    where
149        F: FnOnce(&Clock, Option<ClockTime>, &ClockId) + Send + 'static,
150    {
151        unsafe extern "C" fn trampoline<
152            F: FnOnce(&Clock, Option<ClockTime>, &ClockId) + Send + 'static,
153        >(
154            clock: *mut ffi::GstClock,
155            time: ffi::GstClockTime,
156            id: gpointer,
157            func: gpointer,
158        ) -> gboolean {
159            let f: &mut Option<F> = &mut *(func as *mut Option<F>);
160            let f = f.take().unwrap();
161
162            f(
163                &from_glib_borrow(clock),
164                from_glib(time),
165                &from_glib_borrow(id),
166            );
167
168            glib::ffi::GTRUE
169        }
170
171        unsafe extern "C" fn destroy_notify<
172            F: FnOnce(&Clock, Option<ClockTime>, &ClockId) + Send + 'static,
173        >(
174            ptr: gpointer,
175        ) {
176            let _ = Box::<Option<F>>::from_raw(ptr as *mut _);
177        }
178
179        let func: Box<Option<F>> = Box::new(Some(func));
180
181        unsafe {
182            try_from_glib(ffi::gst_clock_id_wait_async(
183                self.to_glib_none().0,
184                Some(trampoline::<F>),
185                Box::into_raw(func) as gpointer,
186                Some(destroy_notify::<F>),
187            ))
188        }
189    }
190
191    #[allow(clippy::type_complexity)]
192    pub fn wait_async_future(
193        &self,
194    ) -> Result<
195        Pin<
196            Box<
197                dyn Future<Output = Result<(Option<ClockTime>, ClockId), ClockError>>
198                    + Send
199                    + 'static,
200            >,
201        >,
202        ClockError,
203    > {
204        use futures_channel::oneshot;
205
206        let (sender, receiver) = oneshot::channel();
207
208        self.wait_async(move |_clock, jitter, id| {
209            if sender.send((jitter, id.clone())).is_err() {
210                // Unschedule any future calls if the receiver end is disconnected
211                id.unschedule();
212            }
213        })?;
214
215        Ok(Box::pin(async move {
216            receiver.await.map_err(|_| ClockError::Unscheduled)
217        }))
218    }
219}
220
221#[derive(Debug, PartialOrd, Ord, PartialEq, Eq, Hash)]
222pub struct PeriodicClockId(ClockId);
223
224impl std::ops::Deref for PeriodicClockId {
225    type Target = ClockId;
226
227    #[inline]
228    fn deref(&self) -> &Self::Target {
229        &self.0
230    }
231}
232
233impl From<PeriodicClockId> for ClockId {
234    #[inline]
235    fn from(id: PeriodicClockId) -> ClockId {
236        skip_assert_initialized!();
237        id.0
238    }
239}
240
241impl TryFrom<ClockId> for PeriodicClockId {
242    type Error = glib::BoolError;
243
244    #[inline]
245    fn try_from(id: ClockId) -> Result<PeriodicClockId, glib::BoolError> {
246        skip_assert_initialized!();
247        match id.type_() {
248            ClockEntryType::Periodic => Ok(PeriodicClockId(id)),
249            _ => Err(glib::bool_error!("Not a periodic clock id")),
250        }
251    }
252}
253
254impl PeriodicClockId {
255    #[doc(alias = "get_interval")]
256    #[doc(alias = "GST_CLOCK_ENTRY_INTERVAL")]
257    #[inline]
258    pub fn interval(&self) -> ClockTime {
259        unsafe {
260            let ptr = self.as_ptr() as *mut ffi::GstClockEntry;
261            try_from_glib((*ptr).interval).expect("undefined interval")
262        }
263    }
264
265    #[doc(alias = "gst_clock_id_compare_func")]
266    #[inline]
267    pub fn compare_by_time(&self, other: &Self) -> cmp::Ordering {
268        self.0.compare_by_time(&other.0)
269    }
270
271    #[doc(alias = "gst_clock_id_wait_async")]
272    pub fn wait_async<F>(&self, func: F) -> Result<ClockSuccess, ClockError>
273    where
274        F: Fn(&Clock, Option<ClockTime>, &ClockId) + Send + 'static,
275    {
276        unsafe extern "C" fn trampoline<
277            F: Fn(&Clock, Option<ClockTime>, &ClockId) + Send + 'static,
278        >(
279            clock: *mut ffi::GstClock,
280            time: ffi::GstClockTime,
281            id: gpointer,
282            func: gpointer,
283        ) -> gboolean {
284            let f: &F = &*(func as *const F);
285            f(
286                &from_glib_borrow(clock),
287                from_glib(time),
288                &from_glib_borrow(id),
289            );
290            glib::ffi::GTRUE
291        }
292
293        unsafe extern "C" fn destroy_notify<
294            F: Fn(&Clock, Option<ClockTime>, &ClockId) + Send + 'static,
295        >(
296            ptr: gpointer,
297        ) {
298            let _ = Box::<F>::from_raw(ptr as *mut _);
299        }
300
301        let func: Box<F> = Box::new(func);
302        unsafe {
303            try_from_glib(ffi::gst_clock_id_wait_async(
304                self.to_glib_none().0,
305                Some(trampoline::<F>),
306                Box::into_raw(func) as gpointer,
307                Some(destroy_notify::<F>),
308            ))
309        }
310    }
311
312    #[allow(clippy::type_complexity)]
313    pub fn wait_async_stream(
314        &self,
315    ) -> Result<
316        Pin<Box<dyn Stream<Item = (Option<ClockTime>, ClockId)> + Unpin + Send + 'static>>,
317        ClockError,
318    > {
319        use futures_channel::mpsc;
320
321        let (sender, receiver) = mpsc::unbounded();
322
323        self.wait_async(move |_clock, jitter, id| {
324            if sender.unbounded_send((jitter, id.clone())).is_err() {
325                // Unschedule any future calls if the receiver end is disconnected
326                id.unschedule();
327            }
328        })?;
329
330        Ok(Box::pin(receiver))
331    }
332}
333
334#[repr(transparent)]
335#[derive(Debug)]
336pub struct AtomicClockReturn(AtomicI32);
337
338impl AtomicClockReturn {
339    #[inline]
340    pub fn load(&self) -> ClockReturn {
341        unsafe { from_glib(self.0.load(atomic::Ordering::SeqCst)) }
342    }
343
344    #[inline]
345    pub fn store(&self, val: ClockReturn) {
346        self.0.store(val.into_glib(), atomic::Ordering::SeqCst)
347    }
348
349    #[inline]
350    pub fn swap(&self, val: ClockReturn) -> ClockReturn {
351        unsafe { from_glib(self.0.swap(val.into_glib(), atomic::Ordering::SeqCst)) }
352    }
353
354    #[inline]
355    pub fn compare_exchange(
356        &self,
357        current: ClockReturn,
358        new: ClockReturn,
359    ) -> Result<ClockReturn, ClockReturn> {
360        unsafe {
361            self.0
362                .compare_exchange(
363                    current.into_glib(),
364                    new.into_glib(),
365                    atomic::Ordering::SeqCst,
366                    atomic::Ordering::SeqCst,
367                )
368                .map(|v| from_glib(v))
369                .map_err(|v| from_glib(v))
370        }
371    }
372}
373
374unsafe impl Send for ClockId {}
375unsafe impl Sync for ClockId {}
376
377impl Clock {
378    #[doc(alias = "gst_clock_adjust_with_calibration")]
379    pub fn adjust_with_calibration(
380        internal_target: ClockTime,
381        cinternal: ClockTime,
382        cexternal: ClockTime,
383        cnum: u64,
384        cdenom: u64,
385    ) -> ClockTime {
386        skip_assert_initialized!();
387        unsafe {
388            try_from_glib(ffi::gst_clock_adjust_with_calibration(
389                ptr::null_mut(),
390                internal_target.into_glib(),
391                cinternal.into_glib(),
392                cexternal.into_glib(),
393                cnum,
394                cdenom,
395            ))
396            .expect("undefined ClockTime")
397        }
398    }
399
400    #[doc(alias = "gst_clock_unadjust_with_calibration")]
401    pub fn unadjust_with_calibration(
402        external_target: ClockTime,
403        cinternal: ClockTime,
404        cexternal: ClockTime,
405        cnum: u64,
406        cdenom: u64,
407    ) -> ClockTime {
408        skip_assert_initialized!();
409        unsafe {
410            try_from_glib(ffi::gst_clock_unadjust_with_calibration(
411                ptr::null_mut(),
412                external_target.into_glib(),
413                cinternal.into_glib(),
414                cexternal.into_glib(),
415                cnum,
416                cdenom,
417            ))
418            .expect("undefined ClockTime")
419        }
420    }
421}
422
423mod sealed {
424    pub trait Sealed {}
425    impl<T: super::IsA<super::Clock>> Sealed for T {}
426}
427
428pub trait ClockExtManual: sealed::Sealed + IsA<Clock> + 'static {
429    /// Gets an ID from `self` to trigger a periodic notification.
430    /// The periodic notifications will start at time `start_time` and
431    /// will then be fired with the given `interval`.
432    /// ## `start_time`
433    /// the requested start time
434    /// ## `interval`
435    /// the requested interval
436    ///
437    /// # Returns
438    ///
439    /// a `GstClockID` that can be used to request the
440    ///  time notification.
441    #[doc(alias = "gst_clock_new_periodic_id")]
442    fn new_periodic_id(&self, start_time: ClockTime, interval: ClockTime) -> PeriodicClockId {
443        assert_ne!(interval, ClockTime::ZERO);
444
445        unsafe {
446            PeriodicClockId(from_glib_full(ffi::gst_clock_new_periodic_id(
447                self.as_ref().to_glib_none().0,
448                start_time.into_glib(),
449                interval.into_glib(),
450            )))
451        }
452    }
453
454    /// Reinitializes the provided periodic `id` to the provided start time and
455    /// interval. Does not modify the reference count.
456    /// ## `id`
457    /// a `GstClockID`
458    /// ## `start_time`
459    /// the requested start time
460    /// ## `interval`
461    /// the requested interval
462    ///
463    /// # Returns
464    ///
465    /// [`true`] if the GstClockID could be reinitialized to the provided
466    /// `time`, else [`false`].
467    #[doc(alias = "gst_clock_periodic_id_reinit")]
468    fn periodic_id_reinit(
469        &self,
470        id: &PeriodicClockId,
471        start_time: ClockTime,
472        interval: ClockTime,
473    ) -> Result<(), glib::BoolError> {
474        unsafe {
475            let res: bool = from_glib(ffi::gst_clock_periodic_id_reinit(
476                self.as_ref().to_glib_none().0,
477                id.to_glib_none().0,
478                start_time.into_glib(),
479                interval.into_glib(),
480            ));
481            if res {
482                Ok(())
483            } else {
484                Err(glib::bool_error!("Failed to reinit periodic clock id"))
485            }
486        }
487    }
488
489    /// Gets a `GstClockID` from `self` to trigger a single shot
490    /// notification at the requested time.
491    /// ## `time`
492    /// the requested time
493    ///
494    /// # Returns
495    ///
496    /// a `GstClockID` that can be used to request the
497    ///  time notification.
498    #[doc(alias = "gst_clock_new_single_shot_id")]
499    fn new_single_shot_id(&self, time: ClockTime) -> SingleShotClockId {
500        unsafe {
501            SingleShotClockId(from_glib_full(ffi::gst_clock_new_single_shot_id(
502                self.as_ref().to_glib_none().0,
503                time.into_glib(),
504            )))
505        }
506    }
507
508    /// Reinitializes the provided single shot `id` to the provided time. Does not
509    /// modify the reference count.
510    /// ## `id`
511    /// a `GstClockID`
512    /// ## `time`
513    /// The requested time.
514    ///
515    /// # Returns
516    ///
517    /// [`true`] if the GstClockID could be reinitialized to the provided
518    /// `time`, else [`false`].
519    #[doc(alias = "gst_clock_single_shot_id_reinit")]
520    fn single_shot_id_reinit(
521        &self,
522        id: &SingleShotClockId,
523        time: ClockTime,
524    ) -> Result<(), glib::BoolError> {
525        unsafe {
526            let res: bool = from_glib(ffi::gst_clock_single_shot_id_reinit(
527                self.as_ref().to_glib_none().0,
528                id.to_glib_none().0,
529                time.into_glib(),
530            ));
531            if res {
532                Ok(())
533            } else {
534                Err(glib::bool_error!("Failed to reinit single shot clock id"))
535            }
536        }
537    }
538
539    fn set_clock_flags(&self, flags: ClockFlags) {
540        unsafe {
541            let ptr: *mut ffi::GstObject = self.as_ptr() as *mut _;
542            let _guard = self.as_ref().object_lock();
543            (*ptr).flags |= flags.into_glib();
544        }
545    }
546
547    fn unset_clock_flags(&self, flags: ClockFlags) {
548        unsafe {
549            let ptr: *mut ffi::GstObject = self.as_ptr() as *mut _;
550            let _guard = self.as_ref().object_lock();
551            (*ptr).flags &= !flags.into_glib();
552        }
553    }
554
555    #[doc(alias = "get_clock_flags")]
556    fn clock_flags(&self) -> ClockFlags {
557        unsafe {
558            let ptr: *mut ffi::GstObject = self.as_ptr() as *mut _;
559            let _guard = self.as_ref().object_lock();
560            from_glib((*ptr).flags)
561        }
562    }
563
564    /// Gets the internal rate and reference time of `self`. See
565    /// [`set_calibration()`][Self::set_calibration()] for more information.
566    ///
567    /// `internal`, `external`, `rate_num`, and `rate_denom` can be left [`None`] if the
568    /// caller is not interested in the values.
569    ///
570    /// # Returns
571    ///
572    ///
573    /// ## `internal`
574    /// a location to store the internal time
575    ///
576    /// ## `external`
577    /// a location to store the external time
578    ///
579    /// ## `rate_num`
580    /// a location to store the rate numerator
581    ///
582    /// ## `rate_denom`
583    /// a location to store the rate denominator
584    #[doc(alias = "gst_clock_get_calibration")]
585    #[doc(alias = "get_calibration")]
586    fn calibration(&self) -> (ClockTime, ClockTime, u64, u64) {
587        unsafe {
588            let mut internal = std::mem::MaybeUninit::uninit();
589            let mut external = std::mem::MaybeUninit::uninit();
590            let mut rate_num = std::mem::MaybeUninit::uninit();
591            let mut rate_denom = std::mem::MaybeUninit::uninit();
592            ffi::gst_clock_get_calibration(
593                self.as_ref().to_glib_none().0,
594                internal.as_mut_ptr(),
595                external.as_mut_ptr(),
596                rate_num.as_mut_ptr(),
597                rate_denom.as_mut_ptr(),
598            );
599            (
600                try_from_glib(internal.assume_init()).expect("mandatory glib value is None"),
601                try_from_glib(external.assume_init()).expect("mandatory glib value is None"),
602                rate_num.assume_init(),
603                rate_denom.assume_init(),
604            )
605        }
606    }
607
608    /// Adjusts the rate and time of `self`. A rate of 1/1 is the normal speed of
609    /// the clock. Values bigger than 1/1 make the clock go faster.
610    ///
611    /// `internal` and `external` are calibration parameters that arrange that
612    /// [`ClockExt::time()`][crate::prelude::ClockExt::time()] should have been `external` at internal time `internal`.
613    /// This internal time should not be in the future; that is, it should be less
614    /// than the value of [`ClockExt::internal_time()`][crate::prelude::ClockExt::internal_time()] when this function is called.
615    ///
616    /// Subsequent calls to [`ClockExt::time()`][crate::prelude::ClockExt::time()] will return clock times computed as
617    /// follows:
618    ///
619    /// **⚠️ The following code is in  C ⚠️**
620    ///
621    /// ``` C
622    ///   time = (internal_time - internal) * rate_num / rate_denom + external
623    /// ```
624    ///
625    /// This formula is implemented in [`ClockExt::adjust_unlocked()`][crate::prelude::ClockExt::adjust_unlocked()]. Of course, it
626    /// tries to do the integer arithmetic as precisely as possible.
627    ///
628    /// Note that [`ClockExt::time()`][crate::prelude::ClockExt::time()] always returns increasing values so when you
629    /// move the clock backwards, [`ClockExt::time()`][crate::prelude::ClockExt::time()] will report the previous value
630    /// until the clock catches up.
631    /// ## `internal`
632    /// a reference internal time
633    /// ## `external`
634    /// a reference external time
635    /// ## `rate_num`
636    /// the numerator of the rate of the clock relative to its
637    ///  internal time
638    /// ## `rate_denom`
639    /// the denominator of the rate of the clock
640    #[doc(alias = "gst_clock_set_calibration")]
641    fn set_calibration(
642        &self,
643        internal: ClockTime,
644        external: ClockTime,
645        rate_num: u64,
646        rate_denom: u64,
647    ) {
648        unsafe {
649            ffi::gst_clock_set_calibration(
650                self.as_ref().to_glib_none().0,
651                internal.into_glib(),
652                external.into_glib(),
653                rate_num,
654                rate_denom,
655            );
656        }
657    }
658}
659
660impl<O: IsA<Clock>> ClockExtManual for O {}
661
662#[cfg(test)]
663mod tests {
664    use std::sync::mpsc::channel;
665
666    use super::*;
667    use crate::SystemClock;
668
669    #[test]
670    fn test_wait() {
671        crate::init().unwrap();
672
673        let clock = SystemClock::obtain();
674        let now = clock.time().unwrap();
675        let id = clock.new_single_shot_id(now + 20 * ClockTime::MSECOND);
676        let (res, _) = id.wait();
677
678        assert!(res == Ok(ClockSuccess::Ok) || res == Err(ClockError::Early));
679    }
680
681    #[test]
682    fn test_wait_async() {
683        crate::init().unwrap();
684
685        let (sender, receiver) = channel();
686
687        let clock = SystemClock::obtain();
688        let now = clock.time().unwrap();
689        let id = clock.new_single_shot_id(now + 20 * ClockTime::MSECOND);
690        let res = id.wait_async(move |_, _, _| {
691            sender.send(()).unwrap();
692        });
693
694        assert!(res == Ok(ClockSuccess::Ok));
695
696        assert_eq!(receiver.recv(), Ok(()));
697    }
698
699    #[test]
700    fn test_wait_periodic() {
701        crate::init().unwrap();
702
703        let clock = SystemClock::obtain();
704        let now = clock.time().unwrap();
705        let id = clock.new_periodic_id(now + 20 * ClockTime::MSECOND, 20 * ClockTime::MSECOND);
706
707        let (res, _) = id.wait();
708        assert!(res == Ok(ClockSuccess::Ok) || res == Err(ClockError::Early));
709
710        let (res, _) = id.wait();
711        assert!(res == Ok(ClockSuccess::Ok) || res == Err(ClockError::Early));
712    }
713
714    #[test]
715    fn test_wait_async_periodic() {
716        crate::init().unwrap();
717
718        let (sender, receiver) = channel();
719
720        let clock = SystemClock::obtain();
721        let now = clock.time().unwrap();
722        let id = clock.new_periodic_id(now + 20 * ClockTime::MSECOND, 20 * ClockTime::MSECOND);
723        let res = id.wait_async(move |_, _, _| {
724            let _ = sender.send(());
725        });
726
727        assert!(res == Ok(ClockSuccess::Ok));
728
729        assert_eq!(receiver.recv(), Ok(()));
730        assert_eq!(receiver.recv(), Ok(()));
731    }
732}