1use std::{fmt, io, marker::PhantomData, mem, ptr};
4
5use crate::{
6 Buffer, BufferRef,
7 buffer::{Readable, Writable},
8 ffi,
9};
10
11pub struct BufferCursor<T> {
12 buffer: Option<Buffer>,
13 size: u64,
14 num_mem: usize,
15 cur_mem_idx: usize,
16 cur_offset: u64,
17 cur_mem_offset: usize,
18 map_info: ffi::GstMapInfo,
19 phantom: PhantomData<T>,
20}
21
22pub struct BufferRefCursor<T> {
23 buffer: T,
24 size: u64,
25 num_mem: usize,
26 cur_mem_idx: usize,
27 cur_offset: u64,
28 cur_mem_offset: usize,
29 map_info: ffi::GstMapInfo,
30}
31
// Generates the `seek` method of an `io::Seek` implementation for a cursor type.
//
// `$get_buffer_ref` must be a non-capturing closure that coerces to
// `fn(&Self) -> &BufferRef` and returns the buffer the cursor operates on.
macro_rules! define_seek_impl(
    ($get_buffer_ref:expr) => {
        // Moves the cursor to `pos` and returns the new absolute offset.
        //
        // Forward seeks are clamped to the buffer size. Seeking before the start of
        // the buffer fails with `io::ErrorKind::InvalidInput` and leaves the cursor
        // state untouched.
        //
        // NOTE: `SeekFrom::End(off)` with a positive `off` moves `off` bytes *back*
        // from the end, while a non-positive `off` seeks to the end. This differs
        // from the usual `io::Seek` convention; it is kept as-is because the tests
        // in this file depend on it.
        fn seek(&mut self, pos: io::SeekFrom) -> Result<u64, io::Error> {
            match pos {
                io::SeekFrom::Start(off) => {
                    self.cur_offset = std::cmp::min(self.size, off);
                }
                io::SeekFrom::End(off) if off <= 0 => {
                    self.cur_offset = self.size;
                }
                io::SeekFrom::End(off) => {
                    self.cur_offset = self.size.checked_sub(off as u64).ok_or_else(|| {
                        io::Error::new(io::ErrorKind::InvalidInput, "Seek before start of buffer")
                    })?;
                }
                // `-off` below would overflow for `i64::MIN`, so reject it up front.
                io::SeekFrom::Current(i64::MIN) => {
                    return Err(io::Error::new(
                        io::ErrorKind::InvalidInput,
                        "Seek before start of buffer",
                    ));
                }
                io::SeekFrom::Current(off) => {
                    if off <= 0 {
                        self.cur_offset =
                            self.cur_offset.checked_sub((-off) as u64).ok_or_else(|| {
                                io::Error::new(
                                    io::ErrorKind::InvalidInput,
                                    "Seek before start of buffer",
                                )
                            })?;
                    } else {
                        self.cur_offset = std::cmp::min(
                            self.size,
                            self.cur_offset.checked_add(off as u64).unwrap_or(self.size),
                        );
                    }
                }
            }

            // Coerce the closure to a plain function pointer so that the lifetime of
            // the returned reference is tied to `self`.
            let buffer_ref: fn(&Self) -> &BufferRef = $get_buffer_ref;

            // No memory contains the target offset when the cursor sits at the very
            // end of the buffer (or the buffer is empty). Park the cursor past the
            // last memory in that case: the read/write loops only run while
            // `cur_mem_idx < num_mem`, so they transfer nothing instead of this
            // function panicking.
            let (mem_idx, mem_offset) =
                match buffer_ref(self).find_memory(self.cur_offset as usize..) {
                    Some((range, skip)) => (range.start, skip),
                    None => (self.num_mem, 0),
                };

            // A mapping is only reusable while the cursor stays inside the same memory.
            if mem_idx != self.cur_mem_idx && !self.map_info.memory.is_null() {
                // SAFETY: a non-null `memory` pointer marks `map_info` as holding a
                // mapping that has not been released yet.
                unsafe {
                    ffi::gst_memory_unmap(self.map_info.memory, &mut self.map_info);
                }
                self.map_info.memory = ptr::null_mut();
            }

            self.cur_mem_idx = mem_idx;
            self.cur_mem_offset = mem_offset;

            Ok(self.cur_offset)
        }

    }
);
100
// Generates the shared body of `read` / `write`: a loop that transfers bytes
// between `$data` and the buffer's memories, one memory at a time.
//
// * `$self`           - the cursor (`self` of the surrounding method).
// * `$data`           - the caller's slice variable; it is re-assigned to the part
//                       that has not been transferred yet.
// * `$data_type`      - the type of `$data` (`&mut [u8]` or `&[u8]`).
// * `$get_buffer_ref` - closure coercible to `fn(&Self) -> &BufferRef`.
// * `$map_flags`      - flags passed to `gst_memory_map`.
// * `$copy`           - closure `(map_info, memory_offset, data, len)` doing the copy.
// * `$split`          - closure `(data, len)` returning `data` without its first
//                       `len` bytes.
//
// Evaluates to `Ok(number_of_bytes_transferred)`, or returns an
// `io::ErrorKind::InvalidData` error from the surrounding function if a memory
// cannot be mapped.
macro_rules! define_read_write_fn_impl(
    ($self:ident, $data:ident, $data_type:ty, $get_buffer_ref:expr, $map_flags:path, $copy:expr, $split:expr) => {
        #[allow(clippy::redundant_closure_call)]
        {
        let mut copied = 0;

        while !$data.is_empty() && $self.cur_mem_idx < $self.num_mem {
            // Map the current memory if nothing is mapped yet. `cur_mem_idx` and
            // `cur_mem_offset` are expected to be consistent at this point.
            if $self.map_info.memory.is_null() {
                unsafe {
                    // Coerce the closure to a plain function pointer so that the
                    // lifetime of the returned reference is tied to `$self`.
                    let buffer_ref: fn(&Self) -> &BufferRef = $get_buffer_ref;
                    let memory = ffi::gst_buffer_peek_memory(
                        buffer_ref($self).as_mut_ptr(),
                        $self.cur_mem_idx as u32,
                    );
                    debug_assert!(!memory.is_null());

                    if ffi::gst_memory_map(memory, &mut $self.map_info, $map_flags)
                        == glib::ffi::GFALSE
                    {
                        return Err(io::Error::new(
                            io::ErrorKind::InvalidData,
                            "Failed to map memory readable",
                        ));
                    }
                }

                // `<=` rather than `<`: a zero-sized memory maps with `size == 0`
                // while the offset is 0. The code below handles that by copying
                // nothing and advancing to the next memory, so it must not trip
                // the assertion in debug builds.
                debug_assert!($self.cur_mem_offset <= $self.map_info.size);
            }

            debug_assert!(!$self.map_info.memory.is_null());

            // Transfer as much as both the caller's slice and the current memory allow.
            let data_left = $self.map_info.size - $self.cur_mem_offset;
            let to_copy = std::cmp::min($data.len(), data_left);
            $copy(&$self.map_info, $self.cur_mem_offset, $data, to_copy);
            copied += to_copy;
            $self.cur_offset += to_copy as u64;
            $self.cur_mem_offset += to_copy;
            // Same function-pointer coercion as above, for the slice-splitting closure.
            let split: fn($data_type, usize) -> $data_type = $split;
            #[allow(clippy::redundant_closure_call)]
            {
                $data = split($data, to_copy);
            }

            // At the end of the current memory: unmap it and move on to the next one.
            if $self.cur_mem_offset == $self.map_info.size {
                unsafe {
                    ffi::gst_memory_unmap($self.map_info.memory, &mut $self.map_info);
                }
                $self.map_info.memory = ptr::null_mut();
                $self.cur_mem_idx += 1;
                $self.cur_mem_offset = 0;
            }
        }

        Ok(copied)
    }}
);
163
// Generates `io::Read::read` for a cursor type on top of
// `define_read_write_fn_impl!`, mapping each memory with `GST_MAP_READ`.
//
// `$get_buffer_ref` is forwarded unchanged; it must be a closure coercible to
// `fn(&Self) -> &BufferRef`.
macro_rules! define_read_impl(
    ($get_buffer_ref:expr) => {
        fn read(&mut self, mut data: &mut [u8]) -> Result<usize, io::Error> {
            define_read_write_fn_impl!(
                self,
                data,
                &mut [u8],
                $get_buffer_ref,
                ffi::GST_MAP_READ,
                // Copy `len` bytes from the mapped memory (starting at `start`) into
                // the front of the caller's slice.
                |mapped: &ffi::GstMapInfo, start, dest: &mut [u8], len| unsafe {
                    let src = (mapped.data as *const u8).add(start);
                    ptr::copy_nonoverlapping(src, dest.as_mut_ptr(), len);
                },
                // Drop the part of the caller's slice that has just been filled.
                |remaining, consumed| &mut remaining[consumed..]
            )
        }
    }
);
185
// Generates `io::Write::write` and `io::Write::flush` for a cursor type on top
// of `define_read_write_fn_impl!`, mapping each memory with `GST_MAP_WRITE`.
//
// `$get_buffer_ref` is forwarded unchanged; it must be a closure coercible to
// `fn(&Self) -> &BufferRef`.
macro_rules! define_write_impl(
    ($get_buffer_ref:expr) => {
        fn write(&mut self, mut data: &[u8]) -> Result<usize, io::Error> {
            define_read_write_fn_impl!(
                self,
                data,
                &[u8],
                $get_buffer_ref,
                ffi::GST_MAP_WRITE,
                // Copy `len` bytes from the front of the caller's slice into the
                // mapped memory, starting at `start`.
                |mapped: &ffi::GstMapInfo, start, src: &[u8], len| unsafe {
                    let dest = (mapped.data as *mut u8).add(start);
                    ptr::copy_nonoverlapping(src.as_ptr(), dest, len);
                },
                // Drop the part of the caller's slice that has just been written.
                |remaining, consumed| &remaining[consumed..]
            )
        }

        // Releases the current memory mapping, if there is one. Never fails.
        fn flush(&mut self) -> Result<(), io::Error> {
            if self.map_info.memory.is_null() {
                return Ok(());
            }

            unsafe {
                ffi::gst_memory_unmap(self.map_info.memory, &mut self.map_info);
            }
            self.map_info.memory = ptr::null_mut();

            Ok(())
        }
    }
);
218
219impl<T> fmt::Debug for BufferCursor<T> {
220 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
221 f.debug_struct("BufferCursor")
222 .field("buffer", &self.buffer)
223 .field("size", &self.size)
224 .field("num_mem", &self.num_mem)
225 .field("cur_mem_idx", &self.cur_mem_idx)
226 .field("cur_offset", &self.cur_offset)
227 .field("cur_mem_offset", &self.cur_mem_offset)
228 .field("map_info", &self.map_info)
229 .finish()
230 }
231}
232
233impl<T> Drop for BufferCursor<T> {
234 fn drop(&mut self) {
235 if !self.map_info.memory.is_null() {
236 unsafe {
237 ffi::gst_memory_unmap(self.map_info.memory, &mut self.map_info);
238 }
239 }
240 }
241}
242
243impl io::Read for BufferCursor<Readable> {
244 define_read_impl!(|s| s.buffer.as_ref().unwrap());
245}
246
247impl io::Write for BufferCursor<Writable> {
248 define_write_impl!(|s| s.buffer.as_ref().unwrap());
249}
250
251impl<T> io::Seek for BufferCursor<T> {
252 define_seek_impl!(|s| s.buffer.as_ref().unwrap());
253}
254
255impl<T> BufferCursor<T> {
256 pub fn stream_len(&mut self) -> Result<u64, io::Error> {
257 Ok(self.size)
258 }
259
260 pub fn stream_position(&mut self) -> Result<u64, io::Error> {
261 Ok(self.cur_offset)
262 }
263
264 #[doc(alias = "get_buffer")]
265 pub fn buffer(&self) -> &BufferRef {
266 self.buffer.as_ref().unwrap().as_ref()
267 }
268
269 pub fn into_buffer(mut self) -> Buffer {
270 self.buffer.take().unwrap()
271 }
272}
273
274impl BufferCursor<Readable> {
275 pub(crate) fn new_readable(buffer: Buffer) -> BufferCursor<Readable> {
276 skip_assert_initialized!();
277 let size = buffer.size() as u64;
278 let num_mem = buffer.n_memory();
279
280 BufferCursor {
281 buffer: Some(buffer),
282 size,
283 num_mem,
284 cur_mem_idx: 0,
285 cur_offset: 0,
286 cur_mem_offset: 0,
287 map_info: unsafe { mem::zeroed() },
288 phantom: PhantomData,
289 }
290 }
291
292 pub fn buffer_owned(&self) -> Buffer {
293 self.buffer.as_ref().unwrap().clone()
294 }
295}
296
297impl BufferCursor<Writable> {
298 pub(crate) fn new_writable(buffer: Buffer) -> Result<BufferCursor<Writable>, glib::BoolError> {
299 skip_assert_initialized!();
300 if !buffer.is_writable() || !buffer.is_all_memory_writable() {
301 return Err(glib::bool_error!("Not all memories are writable"));
302 }
303
304 let size = buffer.size() as u64;
305 let num_mem = buffer.n_memory();
306
307 Ok(BufferCursor {
308 buffer: Some(buffer),
309 size,
310 num_mem,
311 cur_mem_idx: 0,
312 cur_offset: 0,
313 cur_mem_offset: 0,
314 map_info: unsafe { mem::zeroed() },
315 phantom: PhantomData,
316 })
317 }
318}
319
320unsafe impl<T> Send for BufferCursor<T> {}
321unsafe impl<T> Sync for BufferCursor<T> {}
322
323impl<T: fmt::Debug> fmt::Debug for BufferRefCursor<T> {
324 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
325 f.debug_struct("BufferRefCursor")
326 .field("buffer", &self.buffer)
327 .field("size", &self.size)
328 .field("num_mem", &self.num_mem)
329 .field("cur_mem_idx", &self.cur_mem_idx)
330 .field("cur_offset", &self.cur_offset)
331 .field("cur_mem_offset", &self.cur_mem_offset)
332 .field("map_info", &self.map_info)
333 .finish()
334 }
335}
336
337impl<T> Drop for BufferRefCursor<T> {
338 fn drop(&mut self) {
339 if !self.map_info.memory.is_null() {
340 unsafe {
341 ffi::gst_memory_unmap(self.map_info.memory, &mut self.map_info);
342 }
343 }
344 }
345}
346
347impl io::Read for BufferRefCursor<&BufferRef> {
348 define_read_impl!(|s| s.buffer);
349}
350
351impl io::Write for BufferRefCursor<&mut BufferRef> {
352 define_write_impl!(|s| s.buffer);
353}
354
355impl io::Seek for BufferRefCursor<&BufferRef> {
356 define_seek_impl!(|s| s.buffer);
357}
358
359impl io::Seek for BufferRefCursor<&mut BufferRef> {
360 define_seek_impl!(|s| s.buffer);
361}
362
363impl<T> BufferRefCursor<T> {
364 pub fn stream_len(&mut self) -> Result<u64, io::Error> {
365 Ok(self.size)
366 }
367
368 pub fn stream_position(&mut self) -> Result<u64, io::Error> {
369 Ok(self.cur_offset)
370 }
371}
372
373impl<'a> BufferRefCursor<&'a BufferRef> {
374 #[doc(alias = "get_buffer")]
375 pub fn buffer(&self) -> &BufferRef {
376 self.buffer
377 }
378
379 pub(crate) fn new_readable(buffer: &'a BufferRef) -> BufferRefCursor<&'a BufferRef> {
380 skip_assert_initialized!();
381 let size = buffer.size() as u64;
382 let num_mem = buffer.n_memory();
383
384 BufferRefCursor {
385 buffer,
386 size,
387 num_mem,
388 cur_mem_idx: 0,
389 cur_offset: 0,
390 cur_mem_offset: 0,
391 map_info: unsafe { mem::zeroed() },
392 }
393 }
394}
395
396impl<'a> BufferRefCursor<&'a mut BufferRef> {
397 #[doc(alias = "get_buffer")]
398 pub fn buffer(&self) -> &BufferRef {
399 self.buffer
400 }
401
402 pub(crate) fn new_writable(
403 buffer: &'a mut BufferRef,
404 ) -> Result<BufferRefCursor<&'a mut BufferRef>, glib::BoolError> {
405 skip_assert_initialized!();
406 if !buffer.is_all_memory_writable() {
407 return Err(glib::bool_error!("Not all memories are writable"));
408 }
409
410 let size = buffer.size() as u64;
411 let num_mem = buffer.n_memory();
412
413 Ok(BufferRefCursor {
414 buffer,
415 size,
416 num_mem,
417 cur_mem_idx: 0,
418 cur_offset: 0,
419 cur_mem_offset: 0,
420 map_info: unsafe { mem::zeroed() },
421 })
422 }
423}
424
425unsafe impl<T> Send for BufferRefCursor<T> {}
426unsafe impl<T> Sync for BufferRefCursor<T> {}
427
#[cfg(test)]
mod tests {
    use std::io::{self, Read, Seek, Write};

    use super::*;

    // Builds the shared fixture: a 30-byte buffer made of four 5-byte memories
    // followed by one 10-byte memory, all zero-filled and writable.
    fn new_test_buffer() -> Buffer {
        let mut buffer = Buffer::new();
        {
            let buffer = buffer.get_mut().unwrap();
            for &len in &[5, 5, 5, 5, 10] {
                buffer.append_memory(crate::Memory::from_mut_slice(vec![0; len]));
            }
        }

        assert!(buffer.is_all_memory_writable());
        assert_eq!(buffer.n_memory(), 5);
        assert_eq!(buffer.size(), 30);

        buffer
    }

    // Fills the whole buffer through `$cursor` with writes that straddle memory
    // boundaries, then patches single bytes after each kind of seek.
    //
    // This is a macro rather than a generic function so that the cursor's own
    // `stream_position` method is the one being called.
    macro_rules! check_writes {
        ($cursor:ident) => {{
            assert_eq!($cursor.stream_position().unwrap(), 0);
            $cursor.write_all(b"01234567").unwrap();
            assert_eq!($cursor.stream_position().unwrap(), 8);
            $cursor.write_all(b"890123").unwrap();
            assert_eq!($cursor.stream_position().unwrap(), 14);
            $cursor.write_all(b"456").unwrap();
            assert_eq!($cursor.stream_position().unwrap(), 17);
            $cursor.write_all(b"78901234567").unwrap();
            assert_eq!($cursor.stream_position().unwrap(), 28);
            $cursor.write_all(b"89").unwrap();
            assert_eq!($cursor.stream_position().unwrap(), 30);
            // The buffer is full now.
            assert!($cursor.write_all(b"0").is_err());

            assert_eq!($cursor.seek(io::SeekFrom::Start(5)).unwrap(), 5);
            assert_eq!($cursor.stream_position().unwrap(), 5);
            $cursor.write_all(b"A").unwrap();

            assert_eq!($cursor.seek(io::SeekFrom::End(5)).unwrap(), 25);
            assert_eq!($cursor.stream_position().unwrap(), 25);
            $cursor.write_all(b"B").unwrap();

            // Step back over the "B" that was just written and replace it.
            assert_eq!($cursor.seek(io::SeekFrom::Current(-1)).unwrap(), 25);
            assert_eq!($cursor.stream_position().unwrap(), 25);
            $cursor.write_all(b"C").unwrap();

            assert_eq!($cursor.seek(io::SeekFrom::Current(1)).unwrap(), 27);
            assert_eq!($cursor.stream_position().unwrap(), 27);
            $cursor.write_all(b"D").unwrap();
        }};
    }

    // Reads back through `$cursor` what `check_writes!` produced, again with
    // reads that straddle memory boundaries and single-byte reads after seeks.
    macro_rules! check_reads {
        ($cursor:ident) => {{
            let mut data = [0; 30];

            assert_eq!($cursor.stream_position().unwrap(), 0);
            $cursor.read_exact(&mut data[0..7]).unwrap();
            assert_eq!($cursor.stream_position().unwrap(), 7);
            assert_eq!(&data[0..7], b"01234A6");
            $cursor.read_exact(&mut data[0..5]).unwrap();
            assert_eq!($cursor.stream_position().unwrap(), 12);
            assert_eq!(&data[0..5], b"78901");
            $cursor.read_exact(&mut data[0..10]).unwrap();
            assert_eq!($cursor.stream_position().unwrap(), 22);
            assert_eq!(&data[0..10], b"2345678901");
            $cursor.read_exact(&mut data[0..8]).unwrap();
            assert_eq!($cursor.stream_position().unwrap(), 30);
            assert_eq!(&data[0..8], b"234C6D89");
            // Nothing is left to read.
            assert!($cursor.read_exact(&mut data[0..1]).is_err());

            assert_eq!($cursor.seek(io::SeekFrom::Start(5)).unwrap(), 5);
            assert_eq!($cursor.stream_position().unwrap(), 5);
            $cursor.read_exact(&mut data[0..1]).unwrap();
            assert_eq!(&data[0..1], b"A");

            assert_eq!($cursor.seek(io::SeekFrom::End(5)).unwrap(), 25);
            assert_eq!($cursor.stream_position().unwrap(), 25);
            $cursor.read_exact(&mut data[0..1]).unwrap();
            assert_eq!(&data[0..1], b"C");

            assert_eq!($cursor.seek(io::SeekFrom::Current(-1)).unwrap(), 25);
            assert_eq!($cursor.stream_position().unwrap(), 25);
            $cursor.read_exact(&mut data[0..1]).unwrap();
            assert_eq!(&data[0..1], b"C");

            assert_eq!($cursor.seek(io::SeekFrom::Current(1)).unwrap(), 27);
            assert_eq!($cursor.stream_position().unwrap(), 27);
            $cursor.read_exact(&mut data[0..1]).unwrap();
            assert_eq!(&data[0..1], b"D");
        }};
    }

    // Round trip through the cursors that own the buffer.
    #[test]
    #[allow(clippy::cognitive_complexity)]
    fn test_buffer_cursor() {
        crate::init().unwrap();

        let buffer = new_test_buffer();

        let mut cursor = buffer.into_cursor_writable().unwrap();
        check_writes!(cursor);

        let buffer = cursor.into_buffer();

        let mut cursor = buffer.into_cursor_readable();
        check_reads!(cursor);
    }

    // Round trip through the cursors that borrow the buffer.
    #[test]
    #[allow(clippy::cognitive_complexity)]
    fn test_buffer_cursor_ref() {
        crate::init().unwrap();

        let mut buffer = new_test_buffer();

        {
            let buffer = buffer.get_mut().unwrap();

            let mut cursor = buffer.as_cursor_writable().unwrap();
            check_writes!(cursor);
        }

        let mut cursor = buffer.as_cursor_readable();
        check_reads!(cursor);
    }
}