Skip to main content

gstreamer/
buffer_cursor.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{fmt, io, marker::PhantomData, mem, ptr};
4
5use crate::{
6    Buffer, BufferRef,
7    buffer::{Readable, Writable},
8    ffi,
9};
10
11pub struct BufferCursor<T> {
12    buffer: Option<Buffer>,
13    size: u64,
14    num_mem: usize,
15    cur_mem_idx: usize,
16    cur_offset: u64,
17    cur_mem_offset: usize,
18    map_info: ffi::GstMapInfo,
19    phantom: PhantomData<T>,
20}
21
22pub struct BufferRefCursor<T> {
23    buffer: T,
24    size: u64,
25    num_mem: usize,
26    cur_mem_idx: usize,
27    cur_offset: u64,
28    cur_mem_offset: usize,
29    map_info: ffi::GstMapInfo,
30}
31
// Generates the body of an `io::Seek` implementation for a cursor type.
//
// `$get_buffer_ref` must be an expression that coerces to `fn(&Self) -> &BufferRef` and
// returns the buffer the cursor operates on.
macro_rules! define_seek_impl(
    ($get_buffer_ref:expr) => {
        // Moves the cursor and returns the new absolute position.
        //
        // The resulting position is always within `0..=self.size`:
        //  * `Start(off)` is clamped to the buffer size.
        //  * `End(off)` with `off <= 0` positions at the end of the buffer; with `off > 0` it
        //    positions `off` bytes *before* the end. Note that this differs from the usual
        //    `io::SeekFrom::End` convention, but existing callers rely on it.
        //  * `Current(off)` moves relative to the current position; moving forward is
        //    clamped to the buffer size.
        //
        // Returns `io::ErrorKind::InvalidInput` if the target would be before the start of
        // the buffer. In that case the position is left unchanged.
        fn seek(&mut self, pos: io::SeekFrom) -> Result<u64, io::Error> {
            match pos {
                io::SeekFrom::Start(off) => {
                    self.cur_offset = std::cmp::min(self.size, off);
                }
                io::SeekFrom::End(off) if off <= 0 => {
                    self.cur_offset = self.size;
                }
                io::SeekFrom::End(off) => {
                    self.cur_offset = self.size.checked_sub(off as u64).ok_or_else(|| {
                        io::Error::new(io::ErrorKind::InvalidInput, "Seek before start of buffer")
                    })?;
                }
                // `i64::MIN` has no positive counterpart, handle it separately.
                io::SeekFrom::Current(i64::MIN) => {
                    return Err(io::Error::new(
                        io::ErrorKind::InvalidInput,
                        "Seek before start of buffer",
                    ));
                }
                io::SeekFrom::Current(off) => {
                    if off <= 0 {
                        self.cur_offset =
                            self.cur_offset.checked_sub(off.unsigned_abs()).ok_or_else(|| {
                                io::Error::new(
                                    io::ErrorKind::InvalidInput,
                                    "Seek before start of buffer",
                                )
                            })?;
                    } else {
                        self.cur_offset = std::cmp::min(
                            self.size,
                            self.cur_offset.checked_add(off as u64).unwrap_or(self.size),
                        );
                    }
                }
            }

            // Translate the absolute offset into a memory index and an offset into that memory.
            let (mem_idx, mem_offset) = if self.cur_offset >= self.size {
                // The end of the buffer is a valid position but no memory contains it, so it
                // must not be looked up. Use the same state the read/write loop is in after
                // consuming the last memory: one past the last index, at offset 0.
                (self.num_mem, 0)
            } else {
                // Work around lifetime annotation issues with closures
                let buffer_ref: fn(&Self) -> &BufferRef = $get_buffer_ref;
                let (range, skip) = buffer_ref(self)
                    .find_memory(self.cur_offset as usize..)
                    .expect("Failed to find memory");
                (range.start, skip)
            };

            // An existing mapping can only be kept if we stay inside the same memory.
            if mem_idx != self.cur_mem_idx && !self.map_info.memory.is_null() {
                unsafe {
                    ffi::gst_memory_unmap(self.map_info.memory, &mut self.map_info);
                    self.map_info.memory = ptr::null_mut();
                }
            }

            self.cur_mem_idx = mem_idx;
            self.cur_mem_offset = mem_offset;

            Ok(self.cur_offset)
        }

        // Once stabilized
        //    fn stream_len(&mut self) -> Result<u64, io::Error> {
        //        Ok(self.size)
        //    }
        //
        //    fn stream_position(&mut self) -> Result<u64, io::Error> {
        //        Ok(self.cur_offset)
        //    }
    }
);
100
// Generates the shared body of `io::Read::read` and `io::Write::write`.
//
// Parameters:
//  * `$self`: the cursor (`self` of the surrounding method).
//  * `$data`: the mutable local holding the caller's slice; it is advanced as bytes are copied.
//  * `$data_type`: the type of `$data` (`&mut [u8]` or `&[u8]`).
//  * `$get_buffer_ref`: expression coercible to `fn(&Self) -> &BufferRef`.
//  * `$map_flags`: flags passed to `gst_memory_map()`.
//  * `$copy`: closure `(map_info, offset_in_memory, data, to_copy)` that performs the copy.
//  * `$split`: closure `(data, to_copy)` returning the part of `data` after the copied bytes.
//
// Evaluates to `Ok(number_of_bytes_copied)`, which is smaller than the length of `$data` if
// the end of the buffer was reached, or returns an `io::ErrorKind::InvalidData` error from the
// surrounding function if a memory could not be mapped.
macro_rules! define_read_write_fn_impl(
    ($self:ident, $data:ident, $data_type:ty, $get_buffer_ref:expr, $map_flags:path, $copy:expr, $split:expr) => {
        #[allow(clippy::redundant_closure_call)]
        {
        let mut copied = 0;

        while !$data.is_empty() && $self.cur_mem_idx < $self.num_mem {
            // Map memory if needed. cur_mem_idx, cur_mem_offset and cur_offset are required to be
            // set correctly here already (from constructor, seek and the bottom of the loop)
            if $self.map_info.memory.is_null() {
                unsafe {
                    // Work around lifetime annotation issues with closures
                    let buffer_ref: fn(&Self) -> &BufferRef = $get_buffer_ref;
                    let memory = ffi::gst_buffer_peek_memory(
                        buffer_ref($self).as_mut_ptr(),
                        $self.cur_mem_idx as u32,
                    );
                    debug_assert!(!memory.is_null());

                    // This macro is used for both reading and writing, so the message must not
                    // claim a specific access mode.
                    if ffi::gst_memory_map(memory, &mut $self.map_info, $map_flags)
                        == glib::ffi::GFALSE
                    {
                        return Err(io::Error::new(
                            io::ErrorKind::InvalidData,
                            "Failed to map memory",
                        ));
                    }
                }

                // `<=` instead of `<`: a zero-sized memory is entered at offset 0 and is
                // handled below by copying nothing and advancing to the next memory.
                debug_assert!($self.cur_mem_offset <= $self.map_info.size);
            }

            debug_assert!(!$self.map_info.memory.is_null());

            // Copy all data we can currently copy
            let data_left = $self.map_info.size - $self.cur_mem_offset;
            let to_copy = std::cmp::min($data.len(), data_left);
            $copy(&$self.map_info, $self.cur_mem_offset, $data, to_copy);
            copied += to_copy;
            $self.cur_offset += to_copy as u64;
            $self.cur_mem_offset += to_copy;
            // Work around lifetime annotation issues with closures
            let split: fn($data_type, usize) -> $data_type = $split;
            #[allow(clippy::redundant_closure_call)]
            {
                $data = split($data, to_copy);
            }

            // If we're at the end of the current memory, unmap and advance to the next memory
            if $self.cur_mem_offset == $self.map_info.size {
                unsafe {
                    ffi::gst_memory_unmap($self.map_info.memory, &mut $self.map_info);
                }
                $self.map_info.memory = ptr::null_mut();
                $self.cur_mem_idx += 1;
                $self.cur_mem_offset = 0;
            }
        }

        Ok(copied)
    }}
);
163
// Generates `io::Read::read` for a cursor type.
//
// `$get_buffer_ref` must be an expression coercible to `fn(&Self) -> &BufferRef`.
macro_rules! define_read_impl(
    ($get_buffer_ref:expr) => {
        // Copies bytes from the current position into `data` and advances the cursor.
        // Returns the number of bytes copied.
        fn read(&mut self, mut data: &mut [u8]) -> Result<usize, io::Error> {
            define_read_write_fn_impl!(
                self,
                data,
                &mut [u8],
                $get_buffer_ref,
                ffi::GST_MAP_READ,
                // Copy `to_copy` bytes from the mapped memory, starting at `off`, into `data`.
                |map_info: &ffi::GstMapInfo, off, data: &mut [u8], to_copy| {
                    let dst = data.as_mut_ptr();
                    unsafe {
                        let src = (map_info.data as *const u8).add(off);
                        ptr::copy_nonoverlapping(src, dst, to_copy);
                    }
                },
                // Keep only the part of `data` that has not been filled yet.
                |data, to_copy| data.split_at_mut(to_copy).1
            )
        }
    }
);
185
// Generates `io::Write::write` and `io::Write::flush` for a cursor type.
//
// `$get_buffer_ref` must be an expression coercible to `fn(&Self) -> &BufferRef`.
macro_rules! define_write_impl(
    ($get_buffer_ref:expr) => {
        // Copies bytes from `data` into the buffer at the current position and advances the
        // cursor. Returns the number of bytes copied.
        fn write(&mut self, mut data: &[u8]) -> Result<usize, io::Error> {
            define_read_write_fn_impl!(
                self,
                data,
                &[u8],
                $get_buffer_ref,
                ffi::GST_MAP_WRITE,
                // Copy `to_copy` bytes from `data` into the mapped memory, starting at `off`.
                |map_info: &ffi::GstMapInfo, off, data: &[u8], to_copy| {
                    let src = data.as_ptr();
                    unsafe {
                        let dst = (map_info.data as *mut u8).add(off);
                        ptr::copy_nonoverlapping(src, dst, to_copy);
                    }
                },
                // Keep only the part of `data` that has not been written yet.
                |data, to_copy| data.split_at(to_copy).1
            )
        }

        // Unmaps the currently mapped memory, if any. Never fails.
        fn flush(&mut self) -> Result<(), io::Error> {
            if self.map_info.memory.is_null() {
                return Ok(());
            }

            unsafe {
                ffi::gst_memory_unmap(self.map_info.memory, &mut self.map_info);
                self.map_info.memory = ptr::null_mut();
            }

            Ok(())
        }
    }
);
218
219impl<T> fmt::Debug for BufferCursor<T> {
220    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
221        f.debug_struct("BufferCursor")
222            .field("buffer", &self.buffer)
223            .field("size", &self.size)
224            .field("num_mem", &self.num_mem)
225            .field("cur_mem_idx", &self.cur_mem_idx)
226            .field("cur_offset", &self.cur_offset)
227            .field("cur_mem_offset", &self.cur_mem_offset)
228            .field("map_info", &self.map_info)
229            .finish()
230    }
231}
232
233impl<T> Drop for BufferCursor<T> {
234    fn drop(&mut self) {
235        if !self.map_info.memory.is_null() {
236            unsafe {
237                ffi::gst_memory_unmap(self.map_info.memory, &mut self.map_info);
238            }
239        }
240    }
241}
242
243impl io::Read for BufferCursor<Readable> {
244    define_read_impl!(|s| s.buffer.as_ref().unwrap());
245}
246
247impl io::Write for BufferCursor<Writable> {
248    define_write_impl!(|s| s.buffer.as_ref().unwrap());
249}
250
251impl<T> io::Seek for BufferCursor<T> {
252    define_seek_impl!(|s| s.buffer.as_ref().unwrap());
253}
254
255impl<T> BufferCursor<T> {
256    pub fn stream_len(&mut self) -> Result<u64, io::Error> {
257        Ok(self.size)
258    }
259
260    pub fn stream_position(&mut self) -> Result<u64, io::Error> {
261        Ok(self.cur_offset)
262    }
263
264    #[doc(alias = "get_buffer")]
265    pub fn buffer(&self) -> &BufferRef {
266        self.buffer.as_ref().unwrap().as_ref()
267    }
268
269    pub fn into_buffer(mut self) -> Buffer {
270        self.buffer.take().unwrap()
271    }
272}
273
274impl BufferCursor<Readable> {
275    pub(crate) fn new_readable(buffer: Buffer) -> BufferCursor<Readable> {
276        skip_assert_initialized!();
277        let size = buffer.size() as u64;
278        let num_mem = buffer.n_memory();
279
280        BufferCursor {
281            buffer: Some(buffer),
282            size,
283            num_mem,
284            cur_mem_idx: 0,
285            cur_offset: 0,
286            cur_mem_offset: 0,
287            map_info: unsafe { mem::zeroed() },
288            phantom: PhantomData,
289        }
290    }
291
292    pub fn buffer_owned(&self) -> Buffer {
293        self.buffer.as_ref().unwrap().clone()
294    }
295}
296
297impl BufferCursor<Writable> {
298    pub(crate) fn new_writable(buffer: Buffer) -> Result<BufferCursor<Writable>, glib::BoolError> {
299        skip_assert_initialized!();
300        if !buffer.is_writable() || !buffer.is_all_memory_writable() {
301            return Err(glib::bool_error!("Not all memories are writable"));
302        }
303
304        let size = buffer.size() as u64;
305        let num_mem = buffer.n_memory();
306
307        Ok(BufferCursor {
308            buffer: Some(buffer),
309            size,
310            num_mem,
311            cur_mem_idx: 0,
312            cur_offset: 0,
313            cur_mem_offset: 0,
314            map_info: unsafe { mem::zeroed() },
315            phantom: PhantomData,
316        })
317    }
318}
319
320unsafe impl<T> Send for BufferCursor<T> {}
321unsafe impl<T> Sync for BufferCursor<T> {}
322
323impl<T: fmt::Debug> fmt::Debug for BufferRefCursor<T> {
324    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
325        f.debug_struct("BufferRefCursor")
326            .field("buffer", &self.buffer)
327            .field("size", &self.size)
328            .field("num_mem", &self.num_mem)
329            .field("cur_mem_idx", &self.cur_mem_idx)
330            .field("cur_offset", &self.cur_offset)
331            .field("cur_mem_offset", &self.cur_mem_offset)
332            .field("map_info", &self.map_info)
333            .finish()
334    }
335}
336
337impl<T> Drop for BufferRefCursor<T> {
338    fn drop(&mut self) {
339        if !self.map_info.memory.is_null() {
340            unsafe {
341                ffi::gst_memory_unmap(self.map_info.memory, &mut self.map_info);
342            }
343        }
344    }
345}
346
347impl io::Read for BufferRefCursor<&BufferRef> {
348    define_read_impl!(|s| s.buffer);
349}
350
351impl io::Write for BufferRefCursor<&mut BufferRef> {
352    define_write_impl!(|s| s.buffer);
353}
354
355impl io::Seek for BufferRefCursor<&BufferRef> {
356    define_seek_impl!(|s| s.buffer);
357}
358
359impl io::Seek for BufferRefCursor<&mut BufferRef> {
360    define_seek_impl!(|s| s.buffer);
361}
362
363impl<T> BufferRefCursor<T> {
364    pub fn stream_len(&mut self) -> Result<u64, io::Error> {
365        Ok(self.size)
366    }
367
368    pub fn stream_position(&mut self) -> Result<u64, io::Error> {
369        Ok(self.cur_offset)
370    }
371}
372
373impl<'a> BufferRefCursor<&'a BufferRef> {
374    #[doc(alias = "get_buffer")]
375    pub fn buffer(&self) -> &BufferRef {
376        self.buffer
377    }
378
379    pub(crate) fn new_readable(buffer: &'a BufferRef) -> BufferRefCursor<&'a BufferRef> {
380        skip_assert_initialized!();
381        let size = buffer.size() as u64;
382        let num_mem = buffer.n_memory();
383
384        BufferRefCursor {
385            buffer,
386            size,
387            num_mem,
388            cur_mem_idx: 0,
389            cur_offset: 0,
390            cur_mem_offset: 0,
391            map_info: unsafe { mem::zeroed() },
392        }
393    }
394}
395
396impl<'a> BufferRefCursor<&'a mut BufferRef> {
397    #[doc(alias = "get_buffer")]
398    pub fn buffer(&self) -> &BufferRef {
399        self.buffer
400    }
401
402    pub(crate) fn new_writable(
403        buffer: &'a mut BufferRef,
404    ) -> Result<BufferRefCursor<&'a mut BufferRef>, glib::BoolError> {
405        skip_assert_initialized!();
406        if !buffer.is_all_memory_writable() {
407            return Err(glib::bool_error!("Not all memories are writable"));
408        }
409
410        let size = buffer.size() as u64;
411        let num_mem = buffer.n_memory();
412
413        Ok(BufferRefCursor {
414            buffer,
415            size,
416            num_mem,
417            cur_mem_idx: 0,
418            cur_offset: 0,
419            cur_mem_offset: 0,
420            map_info: unsafe { mem::zeroed() },
421        })
422    }
423}
424
425unsafe impl<T> Send for BufferRefCursor<T> {}
426unsafe impl<T> Sync for BufferRefCursor<T> {}
427
#[cfg(test)]
mod tests {
    use std::io::{self, Read, Seek, Write};

    use super::*;

    // Creates a 30 byte buffer that consists of four memories of 5 bytes each followed by one
    // memory of 10 bytes, all zero-initialized and writable.
    fn new_test_buffer() -> Buffer {
        crate::init().unwrap();

        let mut buffer = Buffer::new();
        {
            let buffer = buffer.get_mut().unwrap();
            for &len in &[5usize, 5, 5, 5, 10] {
                buffer.append_memory(crate::Memory::from_mut_slice(vec![0u8; len]));
            }
        }

        assert!(buffer.is_all_memory_writable());
        assert_eq!(buffer.n_memory(), 5);
        assert_eq!(buffer.size(), 30);

        buffer
    }

    // Fills the whole buffer through a writable cursor, checks that writing past the end fails
    // and then overwrites single bytes after seeking.
    //
    // This is a macro and not a generic function so that the inherent `stream_position()` of
    // the concrete cursor type is used.
    macro_rules! check_writes {
        ($cursor:ident) => {{
            assert_eq!($cursor.stream_position().unwrap(), 0);

            // (bytes to write, expected position afterwards)
            let writes = [
                (&b"01234567"[..], 8u64),
                (&b"890123"[..], 14),
                (&b"456"[..], 17),
                (&b"78901234567"[..], 28),
                (&b"89"[..], 30),
            ];
            for &(chunk, position) in writes.iter() {
                $cursor.write_all(chunk).unwrap();
                assert_eq!($cursor.stream_position().unwrap(), position);
            }
            assert!($cursor.write_all(b"0").is_err());

            // (seek target, expected position afterwards, byte to write there)
            let seeks = [
                (io::SeekFrom::Start(5), 5u64, &b"A"[..]),
                (io::SeekFrom::End(5), 25, &b"B"[..]),
                (io::SeekFrom::Current(-1), 25, &b"C"[..]),
                (io::SeekFrom::Current(1), 27, &b"D"[..]),
            ];
            for &(target, position, byte) in seeks.iter() {
                assert_eq!($cursor.seek(target).unwrap(), position);
                assert_eq!($cursor.stream_position().unwrap(), position);
                $cursor.write_all(byte).unwrap();
            }
        }};
    }

    // Reads back what `check_writes!` has written through a readable cursor, checks that
    // reading past the end fails and then reads single bytes after seeking.
    macro_rules! check_reads {
        ($cursor:ident) => {{
            let mut data = [0; 30];

            assert_eq!($cursor.stream_position().unwrap(), 0);

            // (expected bytes, expected position afterwards)
            let reads = [
                (&b"01234A6"[..], 7u64),
                (&b"78901"[..], 12),
                (&b"2345678901"[..], 22),
                (&b"234C6D89"[..], 30),
            ];
            for &(expected, position) in reads.iter() {
                let len = expected.len();
                $cursor.read_exact(&mut data[0..len]).unwrap();
                assert_eq!($cursor.stream_position().unwrap(), position);
                assert_eq!(&data[0..len], expected);
            }
            assert!($cursor.read_exact(&mut data[0..1]).is_err());

            // (seek target, expected position afterwards, byte expected there)
            let seeks = [
                (io::SeekFrom::Start(5), 5u64, &b"A"[..]),
                (io::SeekFrom::End(5), 25, &b"C"[..]),
                (io::SeekFrom::Current(-1), 25, &b"C"[..]),
                (io::SeekFrom::Current(1), 27, &b"D"[..]),
            ];
            for &(target, position, expected) in seeks.iter() {
                assert_eq!($cursor.seek(target).unwrap(), position);
                assert_eq!($cursor.stream_position().unwrap(), position);
                $cursor.read_exact(&mut data[0..1]).unwrap();
                assert_eq!(&data[0..1], expected);
            }
        }};
    }

    // Cursors that own the buffer.
    #[test]
    fn test_buffer_cursor() {
        let buffer = new_test_buffer();

        let mut cursor = buffer.into_cursor_writable().unwrap();
        check_writes!(cursor);

        let buffer = cursor.into_buffer();

        let mut cursor = buffer.into_cursor_readable();
        check_reads!(cursor);
    }

    // Cursors that borrow the buffer.
    #[test]
    fn test_buffer_cursor_ref() {
        let mut buffer = new_test_buffer();

        {
            let buffer = buffer.get_mut().unwrap();

            let mut cursor = buffer.as_cursor_writable().unwrap();
            check_writes!(cursor);
        }

        let mut cursor = buffer.as_cursor_readable();
        check_reads!(cursor);
    }
}