1use std::{fmt, io, marker::PhantomData, mem, ptr};
4
5use crate::{
6 buffer::{Readable, Writable},
7 ffi, Buffer, BufferRef,
8};
9
10pub struct BufferCursor<T> {
11 buffer: Option<Buffer>,
12 size: u64,
13 num_mem: usize,
14 cur_mem_idx: usize,
15 cur_offset: u64,
16 cur_mem_offset: usize,
17 map_info: ffi::GstMapInfo,
18 phantom: PhantomData<T>,
19}
20
21pub struct BufferRefCursor<T> {
22 buffer: T,
23 size: u64,
24 num_mem: usize,
25 cur_mem_idx: usize,
26 cur_offset: u64,
27 cur_mem_offset: usize,
28 map_info: ffi::GstMapInfo,
29}
30
// Expands to the `seek` method of an `io::Seek` impl.
//
// `$get_buffer_ref` is a non-capturing closure turning `&Self` into the
// `&BufferRef` the cursor operates on.
macro_rules! define_seek_impl(
    ($get_buffer_ref:expr) => {
        /// Moves the cursor to `pos` and returns the new absolute offset.
        ///
        /// Targets past the end of the buffer are clamped to the buffer size.
        /// Seeking before the start fails with `io::ErrorKind::InvalidInput`
        /// and leaves the cursor where it was.
        ///
        /// NOTE(review): `SeekFrom::End(off)` is interpreted as "`off` bytes
        /// before the end" for positive `off` and as the end itself for
        /// `off <= 0`. That is the opposite sign convention of the one
        /// documented for `std::io::SeekFrom::End`; it is kept unchanged
        /// because the tests in this file assert it.
        fn seek(&mut self, pos: io::SeekFrom) -> Result<u64, io::Error> {
            match pos {
                io::SeekFrom::Start(off) => {
                    self.cur_offset = std::cmp::min(self.size, off);
                }
                io::SeekFrom::End(off) if off <= 0 => {
                    self.cur_offset = self.size;
                }
                io::SeekFrom::End(off) => {
                    self.cur_offset = self.size.checked_sub(off as u64).ok_or_else(|| {
                        io::Error::new(io::ErrorKind::InvalidInput, "Seek before start of buffer")
                    })?;
                }
                // `-i64::MIN` does not fit into an `i64`, so this value is
                // rejected before the negation in the next arm.
                io::SeekFrom::Current(std::i64::MIN) => {
                    return Err(io::Error::new(
                        io::ErrorKind::InvalidInput,
                        "Seek before start of buffer",
                    ));
                }
                io::SeekFrom::Current(off) => {
                    if off <= 0 {
                        self.cur_offset =
                            self.cur_offset.checked_sub((-off) as u64).ok_or_else(|| {
                                io::Error::new(
                                    io::ErrorKind::InvalidInput,
                                    "Seek before start of buffer",
                                )
                            })?;
                    } else {
                        self.cur_offset = std::cmp::min(
                            self.size,
                            self.cur_offset.checked_add(off as u64).unwrap_or(self.size),
                        );
                    }
                }
            }

            if self.cur_offset >= self.size {
                // The end of the buffer (or any position in an empty buffer)
                // does not lie inside a memory, so the memory lookup below
                // must not be used for it. Park the cursor behind the last
                // memory instead: this is the state that reading or writing
                // up to the end leaves behind, and further reads/writes then
                // report zero bytes.
                if !self.map_info.memory.is_null() {
                    unsafe {
                        ffi::gst_memory_unmap(self.map_info.memory, &mut self.map_info);
                    }
                    self.map_info.memory = ptr::null_mut();
                }

                self.cur_mem_idx = self.num_mem;
                self.cur_mem_offset = 0;

                return Ok(self.cur_offset);
            }

            // Work around lifetime annotation issues with closures
            let buffer_ref: fn(&Self) -> &BufferRef = $get_buffer_ref;
            // The offset is strictly inside the buffer here, so a memory
            // containing it is expected to exist.
            let (range, skip) = buffer_ref(self)
                .find_memory(self.cur_offset as usize..)
                .expect("Failed to find memory");

            // A mapping of a different memory is of no use any more; the
            // target memory gets mapped lazily by the next read or write.
            if range.start != self.cur_mem_idx && !self.map_info.memory.is_null() {
                unsafe {
                    ffi::gst_memory_unmap(self.map_info.memory, &mut self.map_info);
                    self.map_info.memory = ptr::null_mut();
                }
            }

            self.cur_mem_idx = range.start;
            self.cur_mem_offset = skip;

            Ok(self.cur_offset)
        }
    }
);
99
// Shared body of the `read` and `write` methods.
//
// Copies between `$data` and the buffer's memories, starting at the cursor
// position, until `$data` is exhausted or the last memory has been consumed.
// Evaluates to `Ok(number_of_bytes_copied)`, or returns an `io::Error` of
// kind `InvalidData` from the surrounding function if a memory cannot be
// mapped.
//
// Arguments:
// * `$self` / `$data`: the cursor and the caller's slice (rebound as it shrinks)
// * `$data_type`: type of `$data`
// * `$get_buffer_ref`: non-capturing closure from `&Self` to `&BufferRef`
// * `$map_flags`: flags passed to `gst_memory_map`
// * `$copy`: closure `(map_info, offset_in_memory, data, len)` doing the copy
// * `$split`: closure `(data, len)` returning `data` without its first `len` bytes
macro_rules! define_read_write_fn_impl(
    ($self:ident, $data:ident, $data_type:ty, $get_buffer_ref:expr, $map_flags:path, $copy:expr, $split:expr) => {
        #[allow(clippy::redundant_closure_call)]
        {
            let mut copied = 0;

            while !$data.is_empty() && $self.cur_mem_idx < $self.num_mem {
                // Map the current memory if that has not happened yet.
                if $self.map_info.memory.is_null() {
                    unsafe {
                        // Work around lifetime annotation issues with closures
                        let buffer_ref: fn(&Self) -> &BufferRef = $get_buffer_ref;
                        let memory = ffi::gst_buffer_peek_memory(
                            buffer_ref($self).as_mut_ptr(),
                            $self.cur_mem_idx as u32,
                        );
                        debug_assert!(!memory.is_null());

                        if ffi::gst_memory_map(memory, &mut $self.map_info, $map_flags)
                            == glib::ffi::GFALSE
                        {
                            // Name the access mode that was actually requested.
                            let message = if ($map_flags & ffi::GST_MAP_WRITE) != 0 {
                                "Failed to map memory writable"
                            } else {
                                "Failed to map memory readable"
                            };

                            return Err(io::Error::new(io::ErrorKind::InvalidData, message));
                        }
                    }

                    // `<=` rather than `<`: advancing sequentially can land
                    // on a zero-sized memory, for which offset and size are
                    // both 0. Such a memory is simply skipped below.
                    debug_assert!($self.cur_mem_offset <= $self.map_info.size);
                }

                debug_assert!(!$self.map_info.memory.is_null());

                // Copy as much as both the memory and the caller's slice allow.
                let data_left = $self.map_info.size - $self.cur_mem_offset;
                let to_copy = std::cmp::min($data.len(), data_left);
                if to_copy > 0 {
                    // Nothing to copy for an exhausted or empty memory, so
                    // its data pointer is never touched in that case.
                    $copy(&$self.map_info, $self.cur_mem_offset, $data, to_copy);
                }
                copied += to_copy;
                $self.cur_offset += to_copy as u64;
                $self.cur_mem_offset += to_copy;
                // Work around lifetime annotation issues with closures
                let split: fn($data_type, usize) -> $data_type = $split;
                #[allow(clippy::redundant_closure_call)]
                {
                    $data = split($data, to_copy);
                }

                // Memory fully consumed: unmap it and continue with the next one.
                if $self.cur_mem_offset == $self.map_info.size {
                    unsafe {
                        ffi::gst_memory_unmap($self.map_info.memory, &mut $self.map_info);
                    }
                    $self.map_info.memory = ptr::null_mut();
                    $self.cur_mem_idx += 1;
                    $self.cur_mem_offset = 0;
                }
            }

            Ok(copied)
        }}
);
162
// Expands to the `read` method of an `io::Read` impl.
//
// `$get_buffer_ref` is a non-capturing closure turning `&Self` into the
// `&BufferRef` to read from.
macro_rules! define_read_impl(
    ($get_buffer_ref:expr) => {
        /// Copies bytes from the cursor position into `data` and advances the
        /// cursor by the number of bytes copied, which is returned.
        ///
        /// Memories are mapped with `GST_MAP_READ`; a mapping failure is
        /// reported as an `io::Error`.
        fn read(&mut self, mut data: &mut [u8]) -> Result<usize, io::Error> {
            define_read_write_fn_impl!(
                self,
                data,
                &mut [u8],
                $get_buffer_ref,
                ffi::GST_MAP_READ,
                // Copy `len` bytes from the mapped memory into the caller's slice.
                |mapped: &ffi::GstMapInfo, offset, dst: &mut [u8], len| unsafe {
                    let src = (mapped.data as *const u8).add(offset);
                    ptr::copy_nonoverlapping(src, dst.as_mut_ptr(), len);
                },
                // Drop the part of the caller's slice that has been filled.
                |rest, filled| &mut rest[filled..]
            )
        }
    }
);
184
// Expands to the `write` and `flush` methods of an `io::Write` impl.
//
// `$get_buffer_ref` is a non-capturing closure turning `&Self` into the
// `&BufferRef` to write to.
macro_rules! define_write_impl(
    ($get_buffer_ref:expr) => {
        /// Copies bytes from `data` into the buffer at the cursor position and
        /// advances the cursor by the number of bytes copied, which is
        /// returned.
        ///
        /// Memories are mapped with `GST_MAP_WRITE`; a mapping failure is
        /// reported as an `io::Error`.
        fn write(&mut self, mut data: &[u8]) -> Result<usize, io::Error> {
            define_read_write_fn_impl!(
                self,
                data,
                &[u8],
                $get_buffer_ref,
                ffi::GST_MAP_WRITE,
                // Copy `len` bytes from the caller's slice into the mapped memory.
                |mapped: &ffi::GstMapInfo, offset, src: &[u8], len| unsafe {
                    let dst = (mapped.data as *mut u8).add(offset);
                    ptr::copy_nonoverlapping(src.as_ptr(), dst, len);
                },
                // Drop the part of the caller's slice that has been written.
                |rest, written| &rest[written..]
            )
        }

        /// Unmaps the currently mapped memory, if there is one. Always
        /// returns `Ok(())`.
        fn flush(&mut self) -> Result<(), io::Error> {
            if self.map_info.memory.is_null() {
                return Ok(());
            }

            unsafe {
                ffi::gst_memory_unmap(self.map_info.memory, &mut self.map_info);
            }
            self.map_info.memory = ptr::null_mut();

            Ok(())
        }
    }
);
217
218impl<T> fmt::Debug for BufferCursor<T> {
219 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
220 f.debug_struct("BufferCursor")
221 .field("buffer", &self.buffer)
222 .field("size", &self.size)
223 .field("num_mem", &self.num_mem)
224 .field("cur_mem_idx", &self.cur_mem_idx)
225 .field("cur_offset", &self.cur_offset)
226 .field("cur_mem_offset", &self.cur_mem_offset)
227 .field("map_info", &self.map_info)
228 .finish()
229 }
230}
231
232impl<T> Drop for BufferCursor<T> {
233 fn drop(&mut self) {
234 if !self.map_info.memory.is_null() {
235 unsafe {
236 ffi::gst_memory_unmap(self.map_info.memory, &mut self.map_info);
237 }
238 }
239 }
240}
241
242impl io::Read for BufferCursor<Readable> {
243 define_read_impl!(|s| s.buffer.as_ref().unwrap());
244}
245
246impl io::Write for BufferCursor<Writable> {
247 define_write_impl!(|s| s.buffer.as_ref().unwrap());
248}
249
250impl<T> io::Seek for BufferCursor<T> {
251 define_seek_impl!(|s| s.buffer.as_ref().unwrap());
252}
253
254impl<T> BufferCursor<T> {
255 pub fn stream_len(&mut self) -> Result<u64, io::Error> {
256 Ok(self.size)
257 }
258
259 pub fn stream_position(&mut self) -> Result<u64, io::Error> {
260 Ok(self.cur_offset)
261 }
262
263 #[doc(alias = "get_buffer")]
264 pub fn buffer(&self) -> &BufferRef {
265 self.buffer.as_ref().unwrap().as_ref()
266 }
267
268 pub fn into_buffer(mut self) -> Buffer {
269 self.buffer.take().unwrap()
270 }
271}
272
273impl BufferCursor<Readable> {
274 pub(crate) fn new_readable(buffer: Buffer) -> BufferCursor<Readable> {
275 skip_assert_initialized!();
276 let size = buffer.size() as u64;
277 let num_mem = buffer.n_memory();
278
279 BufferCursor {
280 buffer: Some(buffer),
281 size,
282 num_mem,
283 cur_mem_idx: 0,
284 cur_offset: 0,
285 cur_mem_offset: 0,
286 map_info: unsafe { mem::zeroed() },
287 phantom: PhantomData,
288 }
289 }
290
291 pub fn buffer_owned(&self) -> Buffer {
292 self.buffer.as_ref().unwrap().clone()
293 }
294}
295
296impl BufferCursor<Writable> {
297 pub(crate) fn new_writable(buffer: Buffer) -> Result<BufferCursor<Writable>, glib::BoolError> {
298 skip_assert_initialized!();
299 if !buffer.is_writable() || !buffer.is_all_memory_writable() {
300 return Err(glib::bool_error!("Not all memories are writable"));
301 }
302
303 let size = buffer.size() as u64;
304 let num_mem = buffer.n_memory();
305
306 Ok(BufferCursor {
307 buffer: Some(buffer),
308 size,
309 num_mem,
310 cur_mem_idx: 0,
311 cur_offset: 0,
312 cur_mem_offset: 0,
313 map_info: unsafe { mem::zeroed() },
314 phantom: PhantomData,
315 })
316 }
317}
318
319unsafe impl<T> Send for BufferCursor<T> {}
320unsafe impl<T> Sync for BufferCursor<T> {}
321
322impl<T: fmt::Debug> fmt::Debug for BufferRefCursor<T> {
323 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
324 f.debug_struct("BufferRefCursor")
325 .field("buffer", &self.buffer)
326 .field("size", &self.size)
327 .field("num_mem", &self.num_mem)
328 .field("cur_mem_idx", &self.cur_mem_idx)
329 .field("cur_offset", &self.cur_offset)
330 .field("cur_mem_offset", &self.cur_mem_offset)
331 .field("map_info", &self.map_info)
332 .finish()
333 }
334}
335
336impl<T> Drop for BufferRefCursor<T> {
337 fn drop(&mut self) {
338 if !self.map_info.memory.is_null() {
339 unsafe {
340 ffi::gst_memory_unmap(self.map_info.memory, &mut self.map_info);
341 }
342 }
343 }
344}
345
346impl io::Read for BufferRefCursor<&BufferRef> {
347 define_read_impl!(|s| s.buffer);
348}
349
350impl io::Write for BufferRefCursor<&mut BufferRef> {
351 define_write_impl!(|s| s.buffer);
352}
353
354impl io::Seek for BufferRefCursor<&BufferRef> {
355 define_seek_impl!(|s| s.buffer);
356}
357
358impl io::Seek for BufferRefCursor<&mut BufferRef> {
359 define_seek_impl!(|s| s.buffer);
360}
361
362impl<T> BufferRefCursor<T> {
363 pub fn stream_len(&mut self) -> Result<u64, io::Error> {
364 Ok(self.size)
365 }
366
367 pub fn stream_position(&mut self) -> Result<u64, io::Error> {
368 Ok(self.cur_offset)
369 }
370}
371
372impl<'a> BufferRefCursor<&'a BufferRef> {
373 #[doc(alias = "get_buffer")]
374 pub fn buffer(&self) -> &BufferRef {
375 self.buffer
376 }
377
378 pub(crate) fn new_readable(buffer: &'a BufferRef) -> BufferRefCursor<&'a BufferRef> {
379 skip_assert_initialized!();
380 let size = buffer.size() as u64;
381 let num_mem = buffer.n_memory();
382
383 BufferRefCursor {
384 buffer,
385 size,
386 num_mem,
387 cur_mem_idx: 0,
388 cur_offset: 0,
389 cur_mem_offset: 0,
390 map_info: unsafe { mem::zeroed() },
391 }
392 }
393}
394
395impl<'a> BufferRefCursor<&'a mut BufferRef> {
396 #[doc(alias = "get_buffer")]
397 pub fn buffer(&self) -> &BufferRef {
398 self.buffer
399 }
400
401 pub(crate) fn new_writable(
402 buffer: &'a mut BufferRef,
403 ) -> Result<BufferRefCursor<&'a mut BufferRef>, glib::BoolError> {
404 skip_assert_initialized!();
405 if !buffer.is_all_memory_writable() {
406 return Err(glib::bool_error!("Not all memories are writable"));
407 }
408
409 let size = buffer.size() as u64;
410 let num_mem = buffer.n_memory();
411
412 Ok(BufferRefCursor {
413 buffer,
414 size,
415 num_mem,
416 cur_mem_idx: 0,
417 cur_offset: 0,
418 cur_mem_offset: 0,
419 map_info: unsafe { mem::zeroed() },
420 })
421 }
422}
423
424unsafe impl<T> Send for BufferRefCursor<T> {}
425unsafe impl<T> Sync for BufferRefCursor<T> {}
426
#[cfg(test)]
mod tests {
    use super::*;

    // Sizes of the memories the 30 byte test buffer is assembled from.
    const MEMORY_SIZES: [usize; 5] = [5, 5, 5, 5, 10];

    // Chunks written back to back from offset 0, each with the position
    // expected after the write.
    const SEQUENTIAL_WRITES: [(&[u8], u64); 5] = [
        (b"01234567", 8),
        (b"890123", 14),
        (b"456", 17),
        (b"78901234567", 28),
        (b"89", 30),
    ];

    // Seek target, position expected from the seek, byte written there, and
    // byte expected when the same seek is replayed on the read cursor.
    const SEEK_STEPS: [(std::io::SeekFrom, u64, &[u8], &[u8]); 4] = [
        (std::io::SeekFrom::Start(5), 5, b"A", b"A"),
        (std::io::SeekFrom::End(5), 25, b"B", b"C"),
        (std::io::SeekFrom::Current(-1), 25, b"C", b"C"),
        (std::io::SeekFrom::Current(1), 27, b"D", b"D"),
    ];

    // Chunks expected when reading back to back from offset 0, each with the
    // position expected after the read.
    const SEQUENTIAL_READS: [(&[u8], u64); 4] = [
        (b"01234A6", 7),
        (b"78901", 12),
        (b"2345678901", 22),
        (b"234C6D89", 30),
    ];

    /// Builds the zero-filled test buffer and checks its basic properties.
    fn new_test_buffer() -> Buffer {
        let mut buffer = Buffer::new();
        {
            let buffer = buffer.get_mut().unwrap();
            for &len in MEMORY_SIZES.iter() {
                buffer.append_memory(crate::Memory::from_mut_slice(vec![0u8; len]));
            }
        }

        assert!(buffer.is_all_memory_writable());
        assert_eq!(buffer.n_memory(), 5);
        assert_eq!(buffer.size(), 30);

        buffer
    }

    #[test]
    #[allow(clippy::cognitive_complexity)]
    fn test_buffer_cursor() {
        use std::io::{Read, Seek, Write};

        crate::init().unwrap();

        let buffer = new_test_buffer();

        // Fill the buffer through the owning write cursor.
        let mut cursor = buffer.into_cursor_writable().unwrap();
        assert_eq!(cursor.stream_position().unwrap(), 0);
        for &(chunk, end_pos) in SEQUENTIAL_WRITES.iter() {
            cursor.write_all(chunk).unwrap();
            assert_eq!(cursor.stream_position().unwrap(), end_pos);
        }
        assert!(cursor.write_all(b"0").is_err());

        // Patch single bytes after seeking around.
        for &(target, pos, to_write, _) in SEEK_STEPS.iter() {
            assert_eq!(cursor.seek(target).unwrap(), pos);
            assert_eq!(cursor.stream_position().unwrap(), pos);
            cursor.write_all(to_write).unwrap();
        }

        let buffer = cursor.into_buffer();

        // Read everything back through the owning read cursor.
        let mut cursor = buffer.into_cursor_readable();
        let mut data = [0u8; 30];

        assert_eq!(cursor.stream_position().unwrap(), 0);
        for &(expected, end_pos) in SEQUENTIAL_READS.iter() {
            cursor.read_exact(&mut data[0..expected.len()]).unwrap();
            assert_eq!(cursor.stream_position().unwrap(), end_pos);
            assert_eq!(&data[0..expected.len()], expected);
        }
        assert!(cursor.read_exact(&mut data[0..1]).is_err());

        // Replay the seeks and check the patched bytes.
        for &(target, pos, _, expected) in SEEK_STEPS.iter() {
            assert_eq!(cursor.seek(target).unwrap(), pos);
            assert_eq!(cursor.stream_position().unwrap(), pos);
            cursor.read_exact(&mut data[0..1]).unwrap();
            assert_eq!(&data[0..1], expected);
        }
    }

    #[test]
    #[allow(clippy::cognitive_complexity)]
    fn test_buffer_cursor_ref() {
        use std::io::{Read, Seek, Write};

        crate::init().unwrap();

        let mut buffer = new_test_buffer();

        {
            let buffer = buffer.get_mut().unwrap();

            // Fill the buffer through the borrowing write cursor.
            let mut cursor = buffer.as_cursor_writable().unwrap();
            assert_eq!(cursor.stream_position().unwrap(), 0);
            for &(chunk, end_pos) in SEQUENTIAL_WRITES.iter() {
                cursor.write_all(chunk).unwrap();
                assert_eq!(cursor.stream_position().unwrap(), end_pos);
            }
            assert!(cursor.write_all(b"0").is_err());

            // Patch single bytes after seeking around.
            for &(target, pos, to_write, _) in SEEK_STEPS.iter() {
                assert_eq!(cursor.seek(target).unwrap(), pos);
                assert_eq!(cursor.stream_position().unwrap(), pos);
                cursor.write_all(to_write).unwrap();
            }
        }

        // Read everything back through the borrowing read cursor.
        let mut cursor = buffer.as_cursor_readable();
        let mut data = [0u8; 30];

        assert_eq!(cursor.stream_position().unwrap(), 0);
        for &(expected, end_pos) in SEQUENTIAL_READS.iter() {
            cursor.read_exact(&mut data[0..expected.len()]).unwrap();
            assert_eq!(cursor.stream_position().unwrap(), end_pos);
            assert_eq!(&data[0..expected.len()], expected);
        }
        assert!(cursor.read_exact(&mut data[0..1]).is_err());

        // Replay the seeks and check the patched bytes.
        for &(target, pos, _, expected) in SEEK_STEPS.iter() {
            assert_eq!(cursor.seek(target).unwrap(), pos);
            assert_eq!(cursor.stream_position().unwrap(), pos);
            cursor.read_exact(&mut data[0..1]).unwrap();
            assert_eq!(&data[0..1], expected);
        }
    }
}