gstreamer/
buffer_cursor.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{fmt, io, marker::PhantomData, mem, ptr};
4
5use crate::{
6    buffer::{Readable, Writable},
7    ffi, Buffer, BufferRef,
8};
9
/// Cursor over an owned `Buffer` that tracks a byte position across the buffer's memories.
///
/// `T` is a marker type: `io::Read` is implemented for `BufferCursor<Readable>`, `io::Write`
/// for `BufferCursor<Writable>` and `io::Seek` for every `T`.
10pub struct BufferCursor<T> {
    // `Some` from construction until `into_buffer()` moves the buffer out.
11    buffer: Option<Buffer>,
    // Value of `buffer.size()` captured when the cursor was created.
12    size: u64,
    // Value of `buffer.n_memory()` captured when the cursor was created.
13    num_mem: usize,
    // Index of the memory the cursor currently points into; reaches `num_mem` once the last
    // memory has been fully consumed.
14    cur_mem_idx: usize,
    // Absolute byte position within the whole buffer.
15    cur_offset: u64,
    // Byte position within the memory at `cur_mem_idx`.
16    cur_mem_offset: usize,
    // Mapping of the memory at `cur_mem_idx`; `map_info.memory` is null while nothing is mapped.
17    map_info: ffi::GstMapInfo,
    // Carries the `Readable` / `Writable` marker without storing a value of that type.
18    phantom: PhantomData<T>,
19}
20
/// Cursor over a borrowed buffer that tracks a byte position across the buffer's memories.
///
/// `T` is the borrow itself: `io::Read` is implemented for `BufferRefCursor<&BufferRef>`,
/// `io::Write` for `BufferRefCursor<&mut BufferRef>` and `io::Seek` for both.
21pub struct BufferRefCursor<T> {
    // The borrowed buffer (`&BufferRef` or `&mut BufferRef` for the cursors built in this file).
22    buffer: T,
    // Value of `buffer.size()` captured when the cursor was created.
23    size: u64,
    // Value of `buffer.n_memory()` captured when the cursor was created.
24    num_mem: usize,
    // Index of the memory the cursor currently points into; reaches `num_mem` once the last
    // memory has been fully consumed.
25    cur_mem_idx: usize,
    // Absolute byte position within the whole buffer.
26    cur_offset: u64,
    // Byte position within the memory at `cur_mem_idx`.
27    cur_mem_offset: usize,
    // Mapping of the memory at `cur_mem_idx`; `map_info.memory` is null while nothing is mapped.
28    map_info: ffi::GstMapInfo,
29}
30
// Generates the `io::Seek::seek()` method for a cursor type.
//
// `$get_buffer_ref` is a non-capturing closure `fn(&Self) -> &BufferRef` that yields the buffer
// the cursor operates on.
macro_rules! define_seek_impl(
    ($get_buffer_ref:expr) => {
        /// Moves the cursor and returns the new absolute byte offset.
        ///
        /// * `Start(off)` is clamped to the buffer size.
        /// * `End(off)` with `off <= 0` positions the cursor at the end of the buffer, while a
        ///   positive `off` positions it `off` bytes *before* the end. This differs from the
        ///   usual `std::io::SeekFrom::End` convention and is kept for compatibility with
        ///   existing callers.
        /// * `Current(off)` moves relative to the current position; forward moves are clamped
        ///   to the buffer size.
        ///
        /// # Errors
        ///
        /// Returns `io::ErrorKind::InvalidInput` if the target would lie before the start of
        /// the buffer. The cursor position is left untouched in that case.
        fn seek(&mut self, pos: io::SeekFrom) -> Result<u64, io::Error> {
            match pos {
                io::SeekFrom::Start(off) => {
                    self.cur_offset = std::cmp::min(self.size, off);
                }
                io::SeekFrom::End(off) if off <= 0 => {
                    self.cur_offset = self.size;
                }
                io::SeekFrom::End(off) => {
                    self.cur_offset = self.size.checked_sub(off as u64).ok_or_else(|| {
                        io::Error::new(io::ErrorKind::InvalidInput, "Seek before start of buffer")
                    })?;
                }
                // Negating `i64::MIN` overflows, so it is rejected before the generic arm.
                io::SeekFrom::Current(std::i64::MIN) => {
                    return Err(io::Error::new(
                        io::ErrorKind::InvalidInput,
                        "Seek before start of buffer",
                    ));
                }
                io::SeekFrom::Current(off) => {
                    if off <= 0 {
                        self.cur_offset =
                            self.cur_offset.checked_sub((-off) as u64).ok_or_else(|| {
                                io::Error::new(
                                    io::ErrorKind::InvalidInput,
                                    "Seek before start of buffer",
                                )
                            })?;
                    } else {
                        self.cur_offset = std::cmp::min(
                            self.size,
                            self.cur_offset.checked_add(off as u64).unwrap_or(self.size),
                        );
                    }
                }
            }

            // The end-of-buffer position (which is also the only position of an empty buffer)
            // is not contained in any memory, so it cannot be looked up below. Park the cursor
            // behind the last memory instead; this is the same state the read/write loop
            // leaves behind after consuming the final byte.
            if self.cur_offset == self.size {
                if !self.map_info.memory.is_null() {
                    // SAFETY: a non-null `map_info.memory` marks a mapping that has not been
                    // released yet; every unmap in this file resets the pointer to null.
                    unsafe {
                        ffi::gst_memory_unmap(self.map_info.memory, &mut self.map_info);
                    }
                    self.map_info.memory = ptr::null_mut();
                }

                self.cur_mem_idx = self.num_mem;
                self.cur_mem_offset = 0;

                return Ok(self.cur_offset);
            }

            // Work around lifetime annotation issues with closures
            let buffer_ref: fn(&Self) -> &BufferRef = $get_buffer_ref;
            let (range, skip) = buffer_ref(self)
                .find_memory(self.cur_offset as usize..)
                .expect("Failed to find memory");

            // A mapping can only be reused while the cursor stays inside the same memory.
            if range.start != self.cur_mem_idx && !self.map_info.memory.is_null() {
                // SAFETY: see the unmap above; the pointer is reset right after the call.
                unsafe {
                    ffi::gst_memory_unmap(self.map_info.memory, &mut self.map_info);
                    self.map_info.memory = ptr::null_mut();
                }
            }

            self.cur_mem_idx = range.start;
            self.cur_mem_offset = skip;

            Ok(self.cur_offset)
        }

        // Once overriding them is possible on the supported toolchain, `stream_len()` and
        // `stream_position()` could be provided here as well, returning `self.size` and
        // `self.cur_offset` directly.
    }
);
99
// Shared body of `io::Read::read()` and `io::Write::write()`.
//
// * `$self` / `$data`: the method receiver and its (mutable binding of the) slice argument.
// * `$data_type`: type of `$data` (`&mut [u8]` or `&[u8]`).
// * `$get_buffer_ref`: non-capturing closure `fn(&Self) -> &BufferRef`.
// * `$map_flags`: flags passed to `gst_memory_map()`.
// * `$copy`: closure `(map_info, memory_offset, data, len)` performing the actual copy.
// * `$split`: closure `(data, len)` returning the part of `data` behind the first `len` bytes.
//
// Evaluates to `Ok(number_of_bytes_copied)`. A mapping failure is only reported as an error
// when nothing has been copied during this call; otherwise the partial count is returned so
// that an `Err` always means "no bytes were transferred", as `io::Read` / `io::Write` require.
// The failure then surfaces on the next call.
macro_rules! define_read_write_fn_impl(
    ($self:ident, $data:ident, $data_type:ty, $get_buffer_ref:expr, $map_flags:path, $copy:expr, $split:expr) => {
        #[allow(clippy::redundant_closure_call)]
        {
        let mut copied = 0;

        while !$data.is_empty() && $self.cur_mem_idx < $self.num_mem {
            // Map memory if needed. cur_mem_idx, cur_mem_offset and cur_offset are required to be
            // set correctly here already (from constructor, seek and the bottom of the loop)
            if $self.map_info.memory.is_null() {
                // SAFETY: the loop condition guarantees `cur_mem_idx < num_mem`, so the index
                // is valid as long as the buffer still has the memories counted at
                // construction time.
                let mapped = unsafe {
                    // Work around lifetime annotation issues with closures
                    let buffer_ref: fn(&Self) -> &BufferRef = $get_buffer_ref;
                    let memory = ffi::gst_buffer_peek_memory(
                        buffer_ref($self).as_mut_ptr(),
                        $self.cur_mem_idx as u32,
                    );
                    debug_assert!(!memory.is_null());

                    ffi::gst_memory_map(memory, &mut $self.map_info, $map_flags)
                        != glib::ffi::GFALSE
                };

                if !mapped {
                    if copied > 0 {
                        // Report what has been transferred so far instead of dropping it.
                        break;
                    }

                    return Err(io::Error::new(
                        io::ErrorKind::InvalidData,
                        "Failed to map memory",
                    ));
                }

                debug_assert!($self.cur_mem_offset < $self.map_info.size);
            }

            debug_assert!(!$self.map_info.memory.is_null());

            // Copy all data we can currently copy
            let data_left = $self.map_info.size - $self.cur_mem_offset;
            let to_copy = std::cmp::min($data.len(), data_left);
            $copy(&$self.map_info, $self.cur_mem_offset, $data, to_copy);
            copied += to_copy;
            $self.cur_offset += to_copy as u64;
            $self.cur_mem_offset += to_copy;
            // Work around lifetime annotation issues with closures
            let split: fn($data_type, usize) -> $data_type = $split;
            #[allow(clippy::redundant_closure_call)]
            {
                $data = split($data, to_copy);
            }

            // If we're at the end of the current memory, unmap and advance to the next memory
            if $self.cur_mem_offset == $self.map_info.size {
                // SAFETY: the memory was mapped above (or by an earlier call) and has not been
                // unmapped since, as `map_info.memory` is non-null here.
                unsafe {
                    ffi::gst_memory_unmap($self.map_info.memory, &mut $self.map_info);
                }
                $self.map_info.memory = ptr::null_mut();
                $self.cur_mem_idx += 1;
                $self.cur_mem_offset = 0;
            }
        }

        Ok(copied)
    }}
);
162
// Generates `io::Read::read()` for a cursor type; `$get_buffer_ref` is a non-capturing closure
// `fn(&Self) -> &BufferRef`.
macro_rules! define_read_impl(
    ($get_buffer_ref:expr) => {
        /// Copies bytes from the current position into `data`, advancing the cursor, and
        /// returns the number of bytes copied (`0` once the end of the buffer is reached).
        fn read(&mut self, mut data: &mut [u8]) -> Result<usize, io::Error> {
            define_read_write_fn_impl!(
                self,
                data,
                &mut [u8],
                $get_buffer_ref,
                ffi::GST_MAP_READ,
                |mapping: &ffi::GstMapInfo, mem_offset, dest: &mut [u8], len| unsafe {
                    let src = (mapping.data as *const u8).add(mem_offset);
                    ptr::copy_nonoverlapping(src, dest.as_mut_ptr(), len);
                },
                |remaining, consumed| &mut remaining[consumed..]
            )
        }
    }
);
184
// Generates `io::Write::write()` and `io::Write::flush()` for a cursor type; `$get_buffer_ref`
// is a non-capturing closure `fn(&Self) -> &BufferRef`.
macro_rules! define_write_impl(
    ($get_buffer_ref:expr) => {
        /// Copies bytes from `data` into the buffer at the current position, advancing the
        /// cursor, and returns the number of bytes copied (`0` once the end of the buffer is
        /// reached; the buffer is never grown).
        fn write(&mut self, mut data: &[u8]) -> Result<usize, io::Error> {
            define_read_write_fn_impl!(
                self,
                data,
                &[u8],
                $get_buffer_ref,
                ffi::GST_MAP_WRITE,
                |mapping: &ffi::GstMapInfo, mem_offset, src: &[u8], len| unsafe {
                    let dest = (mapping.data as *mut u8).add(mem_offset);
                    ptr::copy_nonoverlapping(src.as_ptr(), dest, len);
                },
                |remaining, consumed| &remaining[consumed..]
            )
        }

        /// Releases the memory mapping that is currently held, if any.
        fn flush(&mut self) -> Result<(), io::Error> {
            if self.map_info.memory.is_null() {
                return Ok(());
            }

            unsafe {
                ffi::gst_memory_unmap(self.map_info.memory, &mut self.map_info);
            }
            self.map_info.memory = ptr::null_mut();

            Ok(())
        }
    }
);
217
218impl<T> fmt::Debug for BufferCursor<T> {
219    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
220        f.debug_struct("BufferCursor")
221            .field("buffer", &self.buffer)
222            .field("size", &self.size)
223            .field("num_mem", &self.num_mem)
224            .field("cur_mem_idx", &self.cur_mem_idx)
225            .field("cur_offset", &self.cur_offset)
226            .field("cur_mem_offset", &self.cur_mem_offset)
227            .field("map_info", &self.map_info)
228            .finish()
229    }
230}
231
232impl<T> Drop for BufferCursor<T> {
233    fn drop(&mut self) {
234        if !self.map_info.memory.is_null() {
235            unsafe {
236                ffi::gst_memory_unmap(self.map_info.memory, &mut self.map_info);
237            }
238        }
239    }
240}
241
242impl io::Read for BufferCursor<Readable> {
243    define_read_impl!(|s| s.buffer.as_ref().unwrap());
244}
245
246impl io::Write for BufferCursor<Writable> {
247    define_write_impl!(|s| s.buffer.as_ref().unwrap());
248}
249
250impl<T> io::Seek for BufferCursor<T> {
251    define_seek_impl!(|s| s.buffer.as_ref().unwrap());
252}
253
254impl<T> BufferCursor<T> {
255    pub fn stream_len(&mut self) -> Result<u64, io::Error> {
256        Ok(self.size)
257    }
258
259    pub fn stream_position(&mut self) -> Result<u64, io::Error> {
260        Ok(self.cur_offset)
261    }
262
263    #[doc(alias = "get_buffer")]
264    pub fn buffer(&self) -> &BufferRef {
265        self.buffer.as_ref().unwrap().as_ref()
266    }
267
268    pub fn into_buffer(mut self) -> Buffer {
269        self.buffer.take().unwrap()
270    }
271}
272
273impl BufferCursor<Readable> {
274    pub(crate) fn new_readable(buffer: Buffer) -> BufferCursor<Readable> {
275        skip_assert_initialized!();
276        let size = buffer.size() as u64;
277        let num_mem = buffer.n_memory();
278
279        BufferCursor {
280            buffer: Some(buffer),
281            size,
282            num_mem,
283            cur_mem_idx: 0,
284            cur_offset: 0,
285            cur_mem_offset: 0,
286            map_info: unsafe { mem::zeroed() },
287            phantom: PhantomData,
288        }
289    }
290
291    pub fn buffer_owned(&self) -> Buffer {
292        self.buffer.as_ref().unwrap().clone()
293    }
294}
295
296impl BufferCursor<Writable> {
297    pub(crate) fn new_writable(buffer: Buffer) -> Result<BufferCursor<Writable>, glib::BoolError> {
298        skip_assert_initialized!();
299        if !buffer.is_writable() || !buffer.is_all_memory_writable() {
300            return Err(glib::bool_error!("Not all memories are writable"));
301        }
302
303        let size = buffer.size() as u64;
304        let num_mem = buffer.n_memory();
305
306        Ok(BufferCursor {
307            buffer: Some(buffer),
308            size,
309            num_mem,
310            cur_mem_idx: 0,
311            cur_offset: 0,
312            cur_mem_offset: 0,
313            map_info: unsafe { mem::zeroed() },
314            phantom: PhantomData,
315        })
316    }
317}
318
// The raw pointers inside `ffi::GstMapInfo` keep the compiler from deriving these traits
// automatically, so they are asserted by hand.
// NOTE(review): soundness relies on `Buffer` itself being `Send + Sync` and on the mapping only
// being touched through `&mut self` (read/write/seek/flush/drop) — confirm.
319unsafe impl<T> Send for BufferCursor<T> {}
320unsafe impl<T> Sync for BufferCursor<T> {}
321
322impl<T: fmt::Debug> fmt::Debug for BufferRefCursor<T> {
323    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
324        f.debug_struct("BufferRefCursor")
325            .field("buffer", &self.buffer)
326            .field("size", &self.size)
327            .field("num_mem", &self.num_mem)
328            .field("cur_mem_idx", &self.cur_mem_idx)
329            .field("cur_offset", &self.cur_offset)
330            .field("cur_mem_offset", &self.cur_mem_offset)
331            .field("map_info", &self.map_info)
332            .finish()
333    }
334}
335
336impl<T> Drop for BufferRefCursor<T> {
337    fn drop(&mut self) {
338        if !self.map_info.memory.is_null() {
339            unsafe {
340                ffi::gst_memory_unmap(self.map_info.memory, &mut self.map_info);
341            }
342        }
343    }
344}
345
346impl io::Read for BufferRefCursor<&BufferRef> {
347    define_read_impl!(|s| s.buffer);
348}
349
350impl io::Write for BufferRefCursor<&mut BufferRef> {
351    define_write_impl!(|s| s.buffer);
352}
353
354impl io::Seek for BufferRefCursor<&BufferRef> {
355    define_seek_impl!(|s| s.buffer);
356}
357
358impl io::Seek for BufferRefCursor<&mut BufferRef> {
359    define_seek_impl!(|s| s.buffer);
360}
361
362impl<T> BufferRefCursor<T> {
363    pub fn stream_len(&mut self) -> Result<u64, io::Error> {
364        Ok(self.size)
365    }
366
367    pub fn stream_position(&mut self) -> Result<u64, io::Error> {
368        Ok(self.cur_offset)
369    }
370}
371
372impl<'a> BufferRefCursor<&'a BufferRef> {
373    #[doc(alias = "get_buffer")]
374    pub fn buffer(&self) -> &BufferRef {
375        self.buffer
376    }
377
378    pub(crate) fn new_readable(buffer: &'a BufferRef) -> BufferRefCursor<&'a BufferRef> {
379        skip_assert_initialized!();
380        let size = buffer.size() as u64;
381        let num_mem = buffer.n_memory();
382
383        BufferRefCursor {
384            buffer,
385            size,
386            num_mem,
387            cur_mem_idx: 0,
388            cur_offset: 0,
389            cur_mem_offset: 0,
390            map_info: unsafe { mem::zeroed() },
391        }
392    }
393}
394
395impl<'a> BufferRefCursor<&'a mut BufferRef> {
396    #[doc(alias = "get_buffer")]
397    pub fn buffer(&self) -> &BufferRef {
398        self.buffer
399    }
400
401    pub(crate) fn new_writable(
402        buffer: &'a mut BufferRef,
403    ) -> Result<BufferRefCursor<&'a mut BufferRef>, glib::BoolError> {
404        skip_assert_initialized!();
405        if !buffer.is_all_memory_writable() {
406            return Err(glib::bool_error!("Not all memories are writable"));
407        }
408
409        let size = buffer.size() as u64;
410        let num_mem = buffer.n_memory();
411
412        Ok(BufferRefCursor {
413            buffer,
414            size,
415            num_mem,
416            cur_mem_idx: 0,
417            cur_offset: 0,
418            cur_mem_offset: 0,
419            map_info: unsafe { mem::zeroed() },
420        })
421    }
422}
423
424unsafe impl<T> Send for BufferRefCursor<T> {}
425unsafe impl<T> Sync for BufferRefCursor<T> {}
426
#[cfg(test)]
mod tests {
    use std::io::{self, Read, Seek, Write};

    use super::*;

    // Builds a 30 byte buffer out of five zeroed, writable memories (4 x 5 bytes, 1 x 10 bytes)
    // and checks its basic properties.
    fn new_test_buffer() -> Buffer {
        let mut buffer = Buffer::new();
        {
            let buffer = buffer.get_mut().unwrap();
            for len in [5usize, 5, 5, 5, 10].iter().copied() {
                buffer.append_memory(crate::Memory::from_mut_slice(vec![0; len]));
            }
        }

        assert!(buffer.is_all_memory_writable());
        assert_eq!(buffer.n_memory(), 5);
        assert_eq!(buffer.size(), 30);

        buffer
    }

    // Write script shared by both cursor flavours: fill the buffer sequentially (crossing
    // memory boundaries), verify that writing past the end fails, then patch single bytes
    // after seeking.
    macro_rules! check_write_script {
        ($cursor:ident) => {{
            assert_eq!($cursor.stream_position().unwrap(), 0);

            // (bytes to write, expected position afterwards)
            let sequential: [(&[u8], u64); 5] = [
                (b"01234567", 8),
                (b"890123", 14),
                (b"456", 17),
                (b"78901234567", 28),
                (b"89", 30),
            ];
            for &(chunk, end_pos) in sequential.iter() {
                $cursor.write_all(chunk).unwrap();
                assert_eq!($cursor.stream_position().unwrap(), end_pos);
            }
            assert!($cursor.write_all(b"0").is_err());

            // (seek target, expected position, byte written there)
            let patches: [(io::SeekFrom, u64, &[u8]); 4] = [
                (io::SeekFrom::Start(5), 5, b"A"),
                (io::SeekFrom::End(5), 25, b"B"),
                (io::SeekFrom::Current(-1), 25, b"C"),
                (io::SeekFrom::Current(1), 27, b"D"),
            ];
            for &(target, expected_pos, byte) in patches.iter() {
                assert_eq!($cursor.seek(target).unwrap(), expected_pos);
                assert_eq!($cursor.stream_position().unwrap(), expected_pos);
                $cursor.write_all(byte).unwrap();
            }
        }};
    }

    // Read script shared by both cursor flavours: read back what the write script produced,
    // verify that reading past the end fails, then read single bytes after seeking.
    macro_rules! check_read_script {
        ($cursor:ident) => {{
            let mut data = [0u8; 30];

            assert_eq!($cursor.stream_position().unwrap(), 0);

            // (expected bytes, expected position afterwards)
            let sequential: [(&[u8], u64); 4] = [
                (b"01234A6", 7),
                (b"78901", 12),
                (b"2345678901", 22),
                (b"234C6D89", 30),
            ];
            for &(expected, end_pos) in sequential.iter() {
                $cursor.read_exact(&mut data[..expected.len()]).unwrap();
                assert_eq!($cursor.stream_position().unwrap(), end_pos);
                assert_eq!(&data[..expected.len()], expected);
            }
            assert!($cursor.read_exact(&mut data[0..1]).is_err());

            // (seek target, expected position, byte expected there)
            let probes: [(io::SeekFrom, u64, &[u8]); 4] = [
                (io::SeekFrom::Start(5), 5, b"A"),
                (io::SeekFrom::End(5), 25, b"C"),
                (io::SeekFrom::Current(-1), 25, b"C"),
                (io::SeekFrom::Current(1), 27, b"D"),
            ];
            for &(target, expected_pos, expected) in probes.iter() {
                assert_eq!($cursor.seek(target).unwrap(), expected_pos);
                assert_eq!($cursor.stream_position().unwrap(), expected_pos);
                $cursor.read_exact(&mut data[0..1]).unwrap();
                assert_eq!(&data[0..1], expected);
            }
        }};
    }

    // Owned cursors: the buffer is moved into the cursor and recovered via `into_buffer()`.
    #[test]
    #[allow(clippy::cognitive_complexity)]
    fn test_buffer_cursor() {
        crate::init().unwrap();

        let buffer = new_test_buffer();

        let mut cursor = buffer.into_cursor_writable().unwrap();
        check_write_script!(cursor);

        let buffer = cursor.into_buffer();

        let mut cursor = buffer.into_cursor_readable();
        check_read_script!(cursor);
    }

    // Borrowing cursors: the buffer stays with the caller.
    #[test]
    #[allow(clippy::cognitive_complexity)]
    fn test_buffer_cursor_ref() {
        crate::init().unwrap();

        let mut buffer = new_test_buffer();

        {
            let buffer = buffer.get_mut().unwrap();

            let mut cursor = buffer.as_cursor_writable().unwrap();
            check_write_script!(cursor);
        }

        let mut cursor = buffer.as_cursor_readable();
        check_read_script!(cursor);
    }
}