Skip to main content

gstreamer/
clock.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{
4    cmp,
5    marker::Unpin,
6    pin::Pin,
7    ptr,
8    sync::{atomic, atomic::AtomicI32},
9};
10
11use futures_core::{Future, Stream};
12use glib::{
13    ffi::{gboolean, gpointer},
14    prelude::*,
15    translate::*,
16};
17use libc::c_void;
18
19use crate::{
20    Clock, ClockEntryType, ClockError, ClockFlags, ClockReturn, ClockSuccess, ClockTime,
21    ClockTimeDiff, ffi, prelude::*,
22};
23
24glib::wrapper! {
25    #[derive(Debug, PartialOrd, Ord, PartialEq, Eq, Hash)]
26    pub struct ClockId(Shared<c_void>);
27
28    match fn {
29        ref => |ptr| ffi::gst_clock_id_ref(ptr),
30        unref => |ptr| ffi::gst_clock_id_unref(ptr),
31    }
32}
33
34impl ClockId {
35    #[doc(alias = "get_time")]
36    #[doc(alias = "gst_clock_id_get_time")]
37    #[doc(alias = "GST_CLOCK_ENTRY_TIME")]
38    pub fn time(&self) -> ClockTime {
39        unsafe {
40            try_from_glib(ffi::gst_clock_id_get_time(self.to_glib_none().0))
41                .expect("undefined time")
42        }
43    }
44
45    #[doc(alias = "gst_clock_id_unschedule")]
46    pub fn unschedule(&self) {
47        unsafe { ffi::gst_clock_id_unschedule(self.to_glib_none().0) }
48    }
49
50    #[doc(alias = "gst_clock_id_wait")]
51    pub fn wait(&self) -> (Result<ClockSuccess, ClockError>, ClockTimeDiff) {
52        unsafe {
53            let mut jitter = 0;
54            let res = try_from_glib(ffi::gst_clock_id_wait(self.to_glib_none().0, &mut jitter));
55            (res, jitter)
56        }
57    }
58
59    #[doc(alias = "gst_clock_id_compare_func")]
60    pub fn compare_by_time(&self, other: &Self) -> cmp::Ordering {
61        unsafe {
62            let res = ffi::gst_clock_id_compare_func(self.to_glib_none().0, other.to_glib_none().0);
63            res.cmp(&0)
64        }
65    }
66
67    #[cfg(feature = "v1_16")]
68    #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
69    #[doc(alias = "get_clock")]
70    #[doc(alias = "gst_clock_id_get_clock")]
71    pub fn clock(&self) -> Option<Clock> {
72        unsafe { from_glib_full(ffi::gst_clock_id_get_clock(self.to_glib_none().0)) }
73    }
74
75    #[cfg(feature = "v1_16")]
76    #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
77    #[doc(alias = "gst_clock_id_uses_clock")]
78    pub fn uses_clock<P: IsA<Clock>>(&self, clock: &P) -> bool {
79        unsafe {
80            from_glib(ffi::gst_clock_id_uses_clock(
81                self.to_glib_none().0,
82                clock.as_ref().as_ptr(),
83            ))
84        }
85    }
86
87    #[doc(alias = "get_type")]
88    #[doc(alias = "GST_CLOCK_ENTRY_TYPE")]
89    pub fn type_(&self) -> ClockEntryType {
90        unsafe {
91            let ptr = self.as_ptr() as *mut ffi::GstClockEntry;
92            from_glib((*ptr).type_)
93        }
94    }
95
96    #[doc(alias = "get_status")]
97    #[doc(alias = "GST_CLOCK_ENTRY_STATUS")]
98    pub fn status(&self) -> &AtomicClockReturn {
99        unsafe {
100            let ptr = self.as_ptr() as *mut ffi::GstClockEntry;
101            &*((&(*ptr).status) as *const i32 as *const AtomicClockReturn)
102        }
103    }
104}
105
106#[derive(Clone, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)]
107pub struct SingleShotClockId(ClockId);
108
109impl std::ops::Deref for SingleShotClockId {
110    type Target = ClockId;
111
112    #[inline]
113    fn deref(&self) -> &Self::Target {
114        &self.0
115    }
116}
117
118impl From<SingleShotClockId> for ClockId {
119    #[inline]
120    fn from(id: SingleShotClockId) -> ClockId {
121        skip_assert_initialized!();
122        id.0
123    }
124}
125
126impl TryFrom<ClockId> for SingleShotClockId {
127    type Error = glib::BoolError;
128
129    #[inline]
130    fn try_from(id: ClockId) -> Result<SingleShotClockId, glib::BoolError> {
131        skip_assert_initialized!();
132        match id.type_() {
133            ClockEntryType::Single => Ok(SingleShotClockId(id)),
134            _ => Err(glib::bool_error!("Not a single-shot clock id")),
135        }
136    }
137}
138
139impl SingleShotClockId {
140    #[doc(alias = "gst_clock_id_compare_func")]
141    #[inline]
142    pub fn compare_by_time(&self, other: &Self) -> cmp::Ordering {
143        self.0.compare_by_time(&other.0)
144    }
145
146    #[doc(alias = "gst_clock_id_wait_async")]
147    pub fn wait_async<F>(&self, func: F) -> Result<ClockSuccess, ClockError>
148    where
149        F: FnOnce(&Clock, Option<ClockTime>, &ClockId) + Send + 'static,
150    {
151        unsafe extern "C" fn trampoline<
152            F: FnOnce(&Clock, Option<ClockTime>, &ClockId) + Send + 'static,
153        >(
154            clock: *mut ffi::GstClock,
155            time: ffi::GstClockTime,
156            id: gpointer,
157            func: gpointer,
158        ) -> gboolean {
159            unsafe {
160                let f: &mut Option<F> = &mut *(func as *mut Option<F>);
161                let f = f.take().unwrap();
162
163                f(
164                    &from_glib_borrow(clock),
165                    from_glib(time),
166                    &from_glib_borrow(id),
167                );
168
169                glib::ffi::GTRUE
170            }
171        }
172
173        unsafe extern "C" fn destroy_notify<
174            F: FnOnce(&Clock, Option<ClockTime>, &ClockId) + Send + 'static,
175        >(
176            ptr: gpointer,
177        ) {
178            unsafe {
179                let _ = Box::<Option<F>>::from_raw(ptr as *mut _);
180            }
181        }
182
183        let func: Box<Option<F>> = Box::new(Some(func));
184
185        unsafe {
186            try_from_glib(ffi::gst_clock_id_wait_async(
187                self.to_glib_none().0,
188                Some(trampoline::<F>),
189                Box::into_raw(func) as gpointer,
190                Some(destroy_notify::<F>),
191            ))
192        }
193    }
194
195    #[allow(clippy::type_complexity)]
196    pub fn wait_async_future(
197        &self,
198    ) -> Result<
199        Pin<
200            Box<
201                dyn Future<Output = Result<(Option<ClockTime>, ClockId), ClockError>>
202                    + Send
203                    + 'static,
204            >,
205        >,
206        ClockError,
207    > {
208        use futures_channel::oneshot;
209
210        let (sender, receiver) = oneshot::channel();
211
212        self.wait_async(move |_clock, jitter, id| {
213            if sender.send((jitter, id.clone())).is_err() {
214                // Unschedule any future calls if the receiver end is disconnected
215                id.unschedule();
216            }
217        })?;
218
219        Ok(Box::pin(async move {
220            receiver.await.map_err(|_| ClockError::Unscheduled)
221        }))
222    }
223}
224
225#[derive(Debug, PartialOrd, Ord, PartialEq, Eq, Hash)]
226pub struct PeriodicClockId(ClockId);
227
228impl std::ops::Deref for PeriodicClockId {
229    type Target = ClockId;
230
231    #[inline]
232    fn deref(&self) -> &Self::Target {
233        &self.0
234    }
235}
236
237impl From<PeriodicClockId> for ClockId {
238    #[inline]
239    fn from(id: PeriodicClockId) -> ClockId {
240        skip_assert_initialized!();
241        id.0
242    }
243}
244
245impl TryFrom<ClockId> for PeriodicClockId {
246    type Error = glib::BoolError;
247
248    #[inline]
249    fn try_from(id: ClockId) -> Result<PeriodicClockId, glib::BoolError> {
250        skip_assert_initialized!();
251        match id.type_() {
252            ClockEntryType::Periodic => Ok(PeriodicClockId(id)),
253            _ => Err(glib::bool_error!("Not a periodic clock id")),
254        }
255    }
256}
257
258impl PeriodicClockId {
259    #[doc(alias = "get_interval")]
260    #[doc(alias = "GST_CLOCK_ENTRY_INTERVAL")]
261    #[inline]
262    pub fn interval(&self) -> ClockTime {
263        unsafe {
264            let ptr = self.as_ptr() as *mut ffi::GstClockEntry;
265            try_from_glib((*ptr).interval).expect("undefined interval")
266        }
267    }
268
269    #[doc(alias = "gst_clock_id_compare_func")]
270    #[inline]
271    pub fn compare_by_time(&self, other: &Self) -> cmp::Ordering {
272        self.0.compare_by_time(&other.0)
273    }
274
275    #[doc(alias = "gst_clock_id_wait_async")]
276    pub fn wait_async<F>(&self, func: F) -> Result<ClockSuccess, ClockError>
277    where
278        F: Fn(&Clock, Option<ClockTime>, &ClockId) + Send + 'static,
279    {
280        unsafe extern "C" fn trampoline<
281            F: Fn(&Clock, Option<ClockTime>, &ClockId) + Send + 'static,
282        >(
283            clock: *mut ffi::GstClock,
284            time: ffi::GstClockTime,
285            id: gpointer,
286            func: gpointer,
287        ) -> gboolean {
288            unsafe {
289                let f: &F = &*(func as *const F);
290                f(
291                    &from_glib_borrow(clock),
292                    from_glib(time),
293                    &from_glib_borrow(id),
294                );
295                glib::ffi::GTRUE
296            }
297        }
298
299        unsafe extern "C" fn destroy_notify<
300            F: Fn(&Clock, Option<ClockTime>, &ClockId) + Send + 'static,
301        >(
302            ptr: gpointer,
303        ) {
304            unsafe {
305                let _ = Box::<F>::from_raw(ptr as *mut _);
306            }
307        }
308
309        let func: Box<F> = Box::new(func);
310        unsafe {
311            try_from_glib(ffi::gst_clock_id_wait_async(
312                self.to_glib_none().0,
313                Some(trampoline::<F>),
314                Box::into_raw(func) as gpointer,
315                Some(destroy_notify::<F>),
316            ))
317        }
318    }
319
320    #[allow(clippy::type_complexity)]
321    pub fn wait_async_stream(
322        &self,
323    ) -> Result<
324        Pin<Box<dyn Stream<Item = (Option<ClockTime>, ClockId)> + Unpin + Send + 'static>>,
325        ClockError,
326    > {
327        use futures_channel::mpsc;
328
329        let (sender, receiver) = mpsc::unbounded();
330
331        self.wait_async(move |_clock, jitter, id| {
332            if sender.unbounded_send((jitter, id.clone())).is_err() {
333                // Unschedule any future calls if the receiver end is disconnected
334                id.unschedule();
335            }
336        })?;
337
338        Ok(Box::pin(receiver))
339    }
340}
341
/// Atomic cell holding a [`ClockReturn`] value.
///
/// Declared `repr(transparent)` over [`AtomicI32`].
#[derive(Debug)]
#[repr(transparent)]
pub struct AtomicClockReturn(AtomicI32);
345
346impl AtomicClockReturn {
347    #[inline]
348    pub fn load(&self) -> ClockReturn {
349        unsafe { from_glib(self.0.load(atomic::Ordering::SeqCst)) }
350    }
351
352    #[inline]
353    pub fn store(&self, val: ClockReturn) {
354        self.0.store(val.into_glib(), atomic::Ordering::SeqCst)
355    }
356
357    #[inline]
358    pub fn swap(&self, val: ClockReturn) -> ClockReturn {
359        unsafe { from_glib(self.0.swap(val.into_glib(), atomic::Ordering::SeqCst)) }
360    }
361
362    #[inline]
363    pub fn compare_exchange(
364        &self,
365        current: ClockReturn,
366        new: ClockReturn,
367    ) -> Result<ClockReturn, ClockReturn> {
368        unsafe {
369            self.0
370                .compare_exchange(
371                    current.into_glib(),
372                    new.into_glib(),
373                    atomic::Ordering::SeqCst,
374                    atomic::Ordering::SeqCst,
375                )
376                .map(|v| from_glib(v))
377                .map_err(|v| from_glib(v))
378        }
379    }
380}
381
382unsafe impl Send for ClockId {}
383unsafe impl Sync for ClockId {}
384
385impl Clock {
386    #[doc(alias = "gst_clock_adjust_with_calibration")]
387    pub fn adjust_with_calibration(
388        internal_target: ClockTime,
389        cinternal: ClockTime,
390        cexternal: ClockTime,
391        cnum: u64,
392        cdenom: u64,
393    ) -> ClockTime {
394        skip_assert_initialized!();
395        unsafe {
396            try_from_glib(ffi::gst_clock_adjust_with_calibration(
397                ptr::null_mut(),
398                internal_target.into_glib(),
399                cinternal.into_glib(),
400                cexternal.into_glib(),
401                cnum,
402                cdenom,
403            ))
404            .expect("undefined ClockTime")
405        }
406    }
407
408    #[doc(alias = "gst_clock_unadjust_with_calibration")]
409    pub fn unadjust_with_calibration(
410        external_target: ClockTime,
411        cinternal: ClockTime,
412        cexternal: ClockTime,
413        cnum: u64,
414        cdenom: u64,
415    ) -> ClockTime {
416        skip_assert_initialized!();
417        unsafe {
418            try_from_glib(ffi::gst_clock_unadjust_with_calibration(
419                ptr::null_mut(),
420                external_target.into_glib(),
421                cinternal.into_glib(),
422                cexternal.into_glib(),
423                cnum,
424                cdenom,
425            ))
426            .expect("undefined ClockTime")
427        }
428    }
429}
430
431pub trait ClockExtManual: IsA<Clock> + 'static {
432    /// Gets an ID from `self` to trigger a periodic notification.
433    /// The periodic notifications will start at time `start_time` and
434    /// will then be fired with the given `interval`.
435    /// ## `start_time`
436    /// the requested start time
437    /// ## `interval`
438    /// the requested interval
439    ///
440    /// # Returns
441    ///
442    /// a `GstClockID` that can be used to request the
443    ///  time notification.
444    #[doc(alias = "gst_clock_new_periodic_id")]
445    fn new_periodic_id(&self, start_time: ClockTime, interval: ClockTime) -> PeriodicClockId {
446        assert_ne!(interval, ClockTime::ZERO);
447
448        unsafe {
449            PeriodicClockId(from_glib_full(ffi::gst_clock_new_periodic_id(
450                self.as_ref().to_glib_none().0,
451                start_time.into_glib(),
452                interval.into_glib(),
453            )))
454        }
455    }
456
457    /// Reinitializes the provided periodic `id` to the provided start time and
458    /// interval. Does not modify the reference count.
459    /// ## `id`
460    /// a `GstClockID`
461    /// ## `start_time`
462    /// the requested start time
463    /// ## `interval`
464    /// the requested interval
465    ///
466    /// # Returns
467    ///
468    /// [`true`] if the GstClockID could be reinitialized to the provided
469    /// `time`, else [`false`].
470    #[doc(alias = "gst_clock_periodic_id_reinit")]
471    fn periodic_id_reinit(
472        &self,
473        id: &PeriodicClockId,
474        start_time: ClockTime,
475        interval: ClockTime,
476    ) -> Result<(), glib::BoolError> {
477        unsafe {
478            let res: bool = from_glib(ffi::gst_clock_periodic_id_reinit(
479                self.as_ref().to_glib_none().0,
480                id.to_glib_none().0,
481                start_time.into_glib(),
482                interval.into_glib(),
483            ));
484            if res {
485                Ok(())
486            } else {
487                Err(glib::bool_error!("Failed to reinit periodic clock id"))
488            }
489        }
490    }
491
492    /// Gets a `GstClockID` from `self` to trigger a single shot
493    /// notification at the requested time.
494    /// ## `time`
495    /// the requested time
496    ///
497    /// # Returns
498    ///
499    /// a `GstClockID` that can be used to request the
500    ///  time notification.
501    #[doc(alias = "gst_clock_new_single_shot_id")]
502    fn new_single_shot_id(&self, time: ClockTime) -> SingleShotClockId {
503        unsafe {
504            SingleShotClockId(from_glib_full(ffi::gst_clock_new_single_shot_id(
505                self.as_ref().to_glib_none().0,
506                time.into_glib(),
507            )))
508        }
509    }
510
511    /// Reinitializes the provided single shot `id` to the provided time. Does not
512    /// modify the reference count.
513    /// ## `id`
514    /// a `GstClockID`
515    /// ## `time`
516    /// The requested time.
517    ///
518    /// # Returns
519    ///
520    /// [`true`] if the GstClockID could be reinitialized to the provided
521    /// `time`, else [`false`].
522    #[doc(alias = "gst_clock_single_shot_id_reinit")]
523    fn single_shot_id_reinit(
524        &self,
525        id: &SingleShotClockId,
526        time: ClockTime,
527    ) -> Result<(), glib::BoolError> {
528        unsafe {
529            let res: bool = from_glib(ffi::gst_clock_single_shot_id_reinit(
530                self.as_ref().to_glib_none().0,
531                id.to_glib_none().0,
532                time.into_glib(),
533            ));
534            if res {
535                Ok(())
536            } else {
537                Err(glib::bool_error!("Failed to reinit single shot clock id"))
538            }
539        }
540    }
541
542    fn set_clock_flags(&self, flags: ClockFlags) {
543        unsafe {
544            let ptr: *mut ffi::GstObject = self.as_ptr() as *mut _;
545            let _guard = self.as_ref().object_lock();
546            (*ptr).flags |= flags.into_glib();
547        }
548    }
549
550    fn unset_clock_flags(&self, flags: ClockFlags) {
551        unsafe {
552            let ptr: *mut ffi::GstObject = self.as_ptr() as *mut _;
553            let _guard = self.as_ref().object_lock();
554            (*ptr).flags &= !flags.into_glib();
555        }
556    }
557
558    #[doc(alias = "get_clock_flags")]
559    fn clock_flags(&self) -> ClockFlags {
560        unsafe {
561            let ptr: *mut ffi::GstObject = self.as_ptr() as *mut _;
562            let _guard = self.as_ref().object_lock();
563            from_glib((*ptr).flags)
564        }
565    }
566
567    /// Gets the internal rate and reference time of `self`. See
568    /// [`set_calibration()`][Self::set_calibration()] for more information.
569    ///
570    /// `internal`, `external`, `rate_num`, and `rate_denom` can be left [`None`] if the
571    /// caller is not interested in the values.
572    ///
573    /// # Returns
574    ///
575    ///
576    /// ## `internal`
577    /// a location to store the internal time
578    ///
579    /// ## `external`
580    /// a location to store the external time
581    ///
582    /// ## `rate_num`
583    /// a location to store the rate numerator
584    ///
585    /// ## `rate_denom`
586    /// a location to store the rate denominator
587    #[doc(alias = "gst_clock_get_calibration")]
588    #[doc(alias = "get_calibration")]
589    fn calibration(&self) -> (ClockTime, ClockTime, u64, u64) {
590        unsafe {
591            let mut internal = std::mem::MaybeUninit::uninit();
592            let mut external = std::mem::MaybeUninit::uninit();
593            let mut rate_num = std::mem::MaybeUninit::uninit();
594            let mut rate_denom = std::mem::MaybeUninit::uninit();
595            ffi::gst_clock_get_calibration(
596                self.as_ref().to_glib_none().0,
597                internal.as_mut_ptr(),
598                external.as_mut_ptr(),
599                rate_num.as_mut_ptr(),
600                rate_denom.as_mut_ptr(),
601            );
602            (
603                try_from_glib(internal.assume_init()).expect("mandatory glib value is None"),
604                try_from_glib(external.assume_init()).expect("mandatory glib value is None"),
605                rate_num.assume_init(),
606                rate_denom.assume_init(),
607            )
608        }
609    }
610
611    /// Adjusts the rate and time of `self`. A rate of 1/1 is the normal speed of
612    /// the clock. Values bigger than 1/1 make the clock go faster.
613    ///
614    /// `internal` and `external` are calibration parameters that arrange that
615    /// [`ClockExt::time()`][crate::prelude::ClockExt::time()] should have been `external` at internal time `internal`.
616    /// This internal time should not be in the future; that is, it should be less
617    /// than the value of [`ClockExt::internal_time()`][crate::prelude::ClockExt::internal_time()] when this function is called.
618    ///
619    /// Subsequent calls to [`ClockExt::time()`][crate::prelude::ClockExt::time()] will return clock times computed as
620    /// follows:
621    ///
622    /// **⚠️ The following code is in  C ⚠️**
623    ///
624    /// ``` C
625    ///   time = (internal_time - internal) * rate_num / rate_denom + external
626    /// ```
627    ///
628    /// This formula is implemented in [`ClockExt::adjust_unlocked()`][crate::prelude::ClockExt::adjust_unlocked()]. Of course, it
629    /// tries to do the integer arithmetic as precisely as possible.
630    ///
631    /// Note that [`ClockExt::time()`][crate::prelude::ClockExt::time()] always returns increasing values so when you
632    /// move the clock backwards, [`ClockExt::time()`][crate::prelude::ClockExt::time()] will report the previous value
633    /// until the clock catches up.
634    /// ## `internal`
635    /// a reference internal time
636    /// ## `external`
637    /// a reference external time
638    /// ## `rate_num`
639    /// the numerator of the rate of the clock relative to its
640    ///  internal time
641    /// ## `rate_denom`
642    /// the denominator of the rate of the clock
643    #[doc(alias = "gst_clock_set_calibration")]
644    fn set_calibration(
645        &self,
646        internal: ClockTime,
647        external: ClockTime,
648        rate_num: u64,
649        rate_denom: u64,
650    ) {
651        unsafe {
652            ffi::gst_clock_set_calibration(
653                self.as_ref().to_glib_none().0,
654                internal.into_glib(),
655                external.into_glib(),
656                rate_num,
657                rate_denom,
658            );
659        }
660    }
661}
662
663impl<O: IsA<Clock>> ClockExtManual for O {}
664
#[cfg(test)]
mod tests {
    use std::sync::mpsc::channel;

    use super::*;
    use crate::SystemClock;

    // A synchronous wait on a single-shot id either succeeds or reports `Early`.
    #[test]
    fn test_wait() {
        crate::init().unwrap();

        let clock = SystemClock::obtain();
        let deadline = clock.time() + 20 * ClockTime::MSECOND;
        let id = clock.new_single_shot_id(deadline);
        let (outcome, _jitter) = id.wait();

        assert!(matches!(
            outcome,
            Ok(ClockSuccess::Ok) | Err(ClockError::Early)
        ));
    }

    // The callback of an asynchronous single-shot wait is invoked.
    #[test]
    fn test_wait_async() {
        crate::init().unwrap();

        let (sender, receiver) = channel();

        let clock = SystemClock::obtain();
        let deadline = clock.time() + 20 * ClockTime::MSECOND;
        let id = clock.new_single_shot_id(deadline);
        let registered = id.wait_async(move |_, _, _| {
            sender.send(()).unwrap();
        });

        assert!(matches!(registered, Ok(ClockSuccess::Ok)));

        assert_eq!(receiver.recv(), Ok(()));
    }

    // A periodic id can be waited on synchronously more than once.
    #[test]
    fn test_wait_periodic() {
        crate::init().unwrap();

        let clock = SystemClock::obtain();
        let period = 20 * ClockTime::MSECOND;
        let id = clock.new_periodic_id(clock.time() + period, period);

        let (first, _jitter) = id.wait();
        assert!(matches!(
            first,
            Ok(ClockSuccess::Ok) | Err(ClockError::Early)
        ));

        let (second, _jitter) = id.wait();
        assert!(matches!(
            second,
            Ok(ClockSuccess::Ok) | Err(ClockError::Early)
        ));
    }

    // The callback of an asynchronous periodic wait is invoked repeatedly.
    #[test]
    fn test_wait_async_periodic() {
        crate::init().unwrap();

        let (sender, receiver) = channel();

        let clock = SystemClock::obtain();
        let period = 20 * ClockTime::MSECOND;
        let id = clock.new_periodic_id(clock.time() + period, period);
        let registered = id.wait_async(move |_, _, _| {
            // The receiver may already be gone when later ticks arrive
            let _ = sender.send(());
        });

        assert!(matches!(registered, Ok(ClockSuccess::Ok)));

        assert_eq!(receiver.recv(), Ok(()));
        assert_eq!(receiver.recv(), Ok(()));
    }
}