Add AsyncSeek trait to Reader to be able to seek inside asset loaders (#12547)

# Objective

For some asset loaders, it can be useful not to read the entire asset
file and just read a specific region of a file. For this, we need a way
to seek at a specific position inside the file

## Solution

I added support for `AsyncSeek` to `Reader`. In my case, I want to only
read a part of a file, and for that I need to seek to a specific point.

## Migration Guide

Every custom reader (which previously only needed the `AsyncRead` trait
implemented) now also needs to implement the `AsyncSeek` trait to add
the seek capability.
This commit is contained in:
BeastLe9enD 2024-03-30 23:26:30 +01:00 committed by GitHub
parent cdecd39e31
commit eb44db4437
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 182 additions and 7 deletions

View file

@ -1,4 +1,4 @@
use futures_io::{AsyncRead, AsyncWrite}; use futures_io::{AsyncRead, AsyncSeek, AsyncWrite};
use futures_lite::Stream; use futures_lite::Stream;
use crate::io::{ use crate::io::{
@ -8,7 +8,7 @@ use crate::io::{
use std::{ use std::{
fs::{read_dir, File}, fs::{read_dir, File},
io::{Read, Write}, io::{Read, Seek, Write},
path::{Path, PathBuf}, path::{Path, PathBuf},
pin::Pin, pin::Pin,
task::Poll, task::Poll,
@ -30,6 +30,18 @@ impl AsyncRead for FileReader {
} }
} }
impl AsyncSeek for FileReader {
fn poll_seek(
self: Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
pos: std::io::SeekFrom,
) -> Poll<std::io::Result<u64>> {
let this = self.get_mut();
let seek = this.0.seek(pos);
Poll::Ready(seek)
}
}
struct FileWriter(File); struct FileWriter(File);
impl AsyncWrite for FileWriter { impl AsyncWrite for FileWriter {

View file

@ -1,8 +1,9 @@
use crate::io::{AssetReader, AssetReaderError, PathStream, Reader}; use crate::io::{AssetReader, AssetReaderError, PathStream, Reader};
use bevy_utils::HashMap; use bevy_utils::HashMap;
use futures_io::AsyncRead; use futures_io::{AsyncRead, AsyncSeek};
use futures_lite::{ready, Stream}; use futures_lite::{ready, Stream};
use parking_lot::RwLock; use parking_lot::RwLock;
use std::io::SeekFrom;
use std::{ use std::{
path::{Path, PathBuf}, path::{Path, PathBuf},
pin::Pin, pin::Pin,
@ -236,6 +237,46 @@ impl AsyncRead for DataReader {
} }
} }
impl AsyncSeek for DataReader {
fn poll_seek(
mut self: Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
pos: SeekFrom,
) -> Poll<std::io::Result<u64>> {
let result = match pos {
SeekFrom::Start(offset) => offset.try_into(),
SeekFrom::End(offset) => self
.data
.value()
.len()
.try_into()
.map(|len: i64| len - offset),
SeekFrom::Current(offset) => self
.bytes_read
.try_into()
.map(|bytes_read: i64| bytes_read + offset),
};
if let Ok(new_pos) = result {
if new_pos < 0 {
Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"seek position is out of range",
)))
} else {
self.bytes_read = new_pos as _;
Poll::Ready(Ok(new_pos as _))
}
} else {
Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"seek position is out of range",
)))
}
}
}
impl AssetReader for MemoryAssetReader { impl AssetReader for MemoryAssetReader {
async fn read<'a>(&'a self, path: &'a Path) -> Result<Box<Reader<'a>>, AssetReaderError> { async fn read<'a>(&'a self, path: &'a Path) -> Result<Box<Reader<'a>>, AssetReaderError> {
self.root self.root

View file

@ -22,8 +22,10 @@ pub use futures_lite::{AsyncReadExt, AsyncWriteExt};
pub use source::*; pub use source::*;
use bevy_utils::{BoxedFuture, ConditionalSendFuture}; use bevy_utils::{BoxedFuture, ConditionalSendFuture};
use futures_io::{AsyncRead, AsyncWrite}; use futures_io::{AsyncRead, AsyncSeek, AsyncWrite};
use futures_lite::{ready, Stream}; use futures_lite::{ready, Stream};
use std::io::SeekFrom;
use std::task::Context;
use std::{ use std::{
path::{Path, PathBuf}, path::{Path, PathBuf},
pin::Pin, pin::Pin,
@ -55,7 +57,11 @@ impl From<std::io::Error> for AssetReaderError {
} }
} }
pub type Reader<'a> = dyn AsyncRead + Unpin + Send + Sync + 'a; pub trait AsyncReadAndSeek: AsyncRead + AsyncSeek {}
impl<T: AsyncRead + AsyncSeek> AsyncReadAndSeek for T {}
pub type Reader<'a> = dyn AsyncReadAndSeek + Unpin + Send + Sync + 'a;
/// Performs read operations on an asset storage. [`AssetReader`] exposes a "virtual filesystem" /// Performs read operations on an asset storage. [`AssetReader`] exposes a "virtual filesystem"
/// API, where asset bytes and asset metadata bytes are both stored and accessible for a given /// API, where asset bytes and asset metadata bytes are both stored and accessible for a given
@ -442,6 +448,108 @@ impl AsyncRead for VecReader {
} }
} }
impl AsyncSeek for VecReader {
fn poll_seek(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
pos: SeekFrom,
) -> Poll<std::io::Result<u64>> {
let result = match pos {
SeekFrom::Start(offset) => offset.try_into(),
SeekFrom::End(offset) => self.bytes.len().try_into().map(|len: i64| len - offset),
SeekFrom::Current(offset) => self
.bytes_read
.try_into()
.map(|bytes_read: i64| bytes_read + offset),
};
if let Ok(new_pos) = result {
if new_pos < 0 {
Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"seek position is out of range",
)))
} else {
self.bytes_read = new_pos as _;
Poll::Ready(Ok(new_pos as _))
}
} else {
Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"seek position is out of range",
)))
}
}
}
/// An [`AsyncRead`] implementation capable of reading a [`&[u8]`].
pub struct SliceReader<'a> {
bytes: &'a [u8],
bytes_read: usize,
}
impl<'a> SliceReader<'a> {
/// Create a new [`SliceReader`] for `bytes`.
pub fn new(bytes: &'a [u8]) -> Self {
Self {
bytes,
bytes_read: 0,
}
}
}
impl<'a> AsyncRead for SliceReader<'a> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<std::io::Result<usize>> {
if self.bytes_read >= self.bytes.len() {
Poll::Ready(Ok(0))
} else {
let n = ready!(Pin::new(&mut &self.bytes[self.bytes_read..]).poll_read(cx, buf))?;
self.bytes_read += n;
Poll::Ready(Ok(n))
}
}
}
impl<'a> AsyncSeek for SliceReader<'a> {
fn poll_seek(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
pos: SeekFrom,
) -> Poll<std::io::Result<u64>> {
let result = match pos {
SeekFrom::Start(offset) => offset.try_into(),
SeekFrom::End(offset) => self.bytes.len().try_into().map(|len: i64| len - offset),
SeekFrom::Current(offset) => self
.bytes_read
.try_into()
.map(|bytes_read: i64| bytes_read + offset),
};
if let Ok(new_pos) = result {
if new_pos < 0 {
Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"seek position is out of range",
)))
} else {
self.bytes_read = new_pos as _;
Poll::Ready(Ok(new_pos as _))
}
} else {
Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"seek position is out of range",
)))
}
}
}
/// Appends `.meta` to the given path. /// Appends `.meta` to the given path.
pub(crate) fn get_meta_path(path: &Path) -> PathBuf { pub(crate) fn get_meta_path(path: &Path) -> PathBuf {
let mut meta_path = path.to_path_buf(); let mut meta_path = path.to_path_buf();

View file

@ -5,7 +5,9 @@ use crate::{
}; };
use async_lock::RwLockReadGuardArc; use async_lock::RwLockReadGuardArc;
use bevy_utils::tracing::trace; use bevy_utils::tracing::trace;
use futures_io::AsyncRead; use futures_io::{AsyncRead, AsyncSeek};
use std::io::SeekFrom;
use std::task::Poll;
use std::{path::Path, pin::Pin, sync::Arc}; use std::{path::Path, pin::Pin, sync::Arc};
use super::ErasedAssetReader; use super::ErasedAssetReader;
@ -139,3 +141,13 @@ impl<'a> AsyncRead for TransactionLockedReader<'a> {
Pin::new(&mut self.reader).poll_read(cx, buf) Pin::new(&mut self.reader).poll_read(cx, buf)
} }
} }
impl<'a> AsyncSeek for TransactionLockedReader<'a> {
fn poll_seek(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
pos: SeekFrom,
) -> Poll<std::io::Result<u64>> {
Pin::new(&mut self.reader).poll_seek(cx, pos)
}
}

View file

@ -1,3 +1,4 @@
use crate::io::SliceReader;
use crate::{ use crate::{
io::{ io::{
AssetReaderError, AssetWriterError, MissingAssetWriterError, AssetReaderError, AssetWriterError, MissingAssetWriterError,
@ -343,12 +344,13 @@ impl<'a> ProcessContext<'a> {
let server = &self.processor.server; let server = &self.processor.server;
let loader_name = std::any::type_name::<L>(); let loader_name = std::any::type_name::<L>();
let loader = server.get_asset_loader_with_type_name(loader_name).await?; let loader = server.get_asset_loader_with_type_name(loader_name).await?;
let mut reader = SliceReader::new(self.asset_bytes);
let loaded_asset = server let loaded_asset = server
.load_with_meta_loader_and_reader( .load_with_meta_loader_and_reader(
self.path, self.path,
Box::new(meta), Box::new(meta),
&*loader, &*loader,
&mut self.asset_bytes, &mut reader,
false, false,
true, true,
) )