example: file upload with streaming progress bar (#2242)

This commit is contained in:
Greg Johnston 2024-01-29 15:20:19 -05:00 committed by GitHub
parent 0ff1e279a2
commit 6ef1531059
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 172 additions and 3 deletions

View file

@ -31,6 +31,9 @@ web-sys = { version = "0.3.67", features = ["FileList", "File"] }
strum = { version = "0.25.0", features = ["strum_macros", "derive"] }
notify = { version = "6.1.1", optional = true }
pin-project-lite = "0.2.13"
dashmap = { version = "5.5.3", optional = true }
once_cell = { version = "1.19.0", optional = true }
async-broadcast = { version = "0.6.0", optional = true }
[features]
hydrate = ["leptos/hydrate", "leptos_meta/hydrate", "leptos_router/hydrate"]
@ -43,7 +46,10 @@ ssr = [
"leptos_meta/ssr",
"leptos_router/ssr",
"dep:leptos_axum",
"dep:notify"
"dep:notify",
"dep:dashmap",
"dep:once_cell",
"dep:async-broadcast"
]
[package.metadata.cargo-all-features]

View file

@ -55,6 +55,7 @@ pub fn HomePage() -> impl IntoView {
<ServerFnArgumentExample/>
<RkyvExample/>
<FileUpload/>
<FileUploadWithProgress/>
<FileWatcher/>
<CustomEncoding/>
}
@ -331,8 +332,8 @@ pub fn FileUpload() -> impl IntoView {
///
/// On the server, this uses the `multer` crate, which provides a streaming API.
#[server(
input = MultipartFormData,
)]
input = MultipartFormData,
)]
pub async fn file_length(
data: MultipartData,
) -> Result<usize, ServerFnError> {
@ -390,6 +391,168 @@ pub fn FileUpload() -> impl IntoView {
}
}
/// This component uses server functions to upload a file, while streaming updates on the upload
/// progress.
#[component]
pub fn FileUploadWithProgress() -> impl IntoView {
/// In theory, you could create a single server function which
/// 1) received multipart form data
/// 2) returned a stream that contained updates on the progress
///
/// In reality, browsers do not actually support duplexing requests in this way. In other
/// words, every existing browser actually requires that the request stream be complete before
/// it begins processing the response stream.
///
/// Instead, we can create two separate server functions:
/// 1) one that receives multipart form data and begins processing the upload
/// 2) a second that returns a stream of updates on the progress
///
/// This requires us to store some global state of all the uploads. In a real app, you probably
/// shouldn't do exactly what I'm doing here in the demo. For example, this map just
/// distinguishes between files by filename, not by user.
#[cfg(feature = "ssr")]
mod progress {
use async_broadcast::{broadcast, Receiver, Sender};
use dashmap::DashMap;
use futures::Stream;
use once_cell::sync::Lazy;
struct File {
total: usize,
tx: Sender<usize>,
rx: Receiver<usize>,
}
static FILES: Lazy<DashMap<String, File>> = Lazy::new(DashMap::new);
pub async fn add_chunk(filename: &str, len: usize) {
println!("[{filename}]\tadding {len}");
let mut entry =
FILES.entry(filename.to_string()).or_insert_with(|| {
println!("[{filename}]\tinserting channel");
let (tx, rx) = broadcast(128);
File { total: 0, tx, rx }
});
entry.total += len;
let new_total = entry.total;
// we're about to do an async broadcast, so we don't want to hold a lock across it
let tx = entry.tx.clone();
drop(entry);
// now we send the message and don't have to worry about it
tx.broadcast(new_total)
.await
.expect("couldn't send a message over channel");
}
pub fn for_file(filename: &str) -> impl Stream<Item = usize> {
let entry =
FILES.entry(filename.to_string()).or_insert_with(|| {
println!("[{filename}]\tinserting channel");
let (tx, rx) = broadcast(128);
File { total: 0, tx, rx }
});
entry.rx.clone()
}
}
#[server(
input = MultipartFormData,
)]
pub async fn upload_file(data: MultipartData) -> Result<(), ServerFnError> {
let mut data = data.into_inner().unwrap();
while let Ok(Some(mut field)) = data.next_field().await {
let name =
field.file_name().expect("no filename on field").to_string();
while let Ok(Some(chunk)) = field.chunk().await {
let len = chunk.len();
println!("[{name}]\t{len}");
progress::add_chunk(&name, len).await;
// in a real server function, you'd do something like saving the file here
}
}
Ok(())
}
#[server(output = StreamingText)]
pub async fn file_progress(
filename: String,
) -> Result<TextStream, ServerFnError> {
println!("getting progress on {filename}");
// get the stream of current length for the file
let progress = progress::for_file(&filename);
// separate each number with a newline
// the HTTP response might pack multiple lines of this into a single chunk
// we need some way of dividing them up
let progress = progress.map(|bytes| Ok(format!("{bytes}\n")));
Ok(TextStream::new(progress))
}
let (filename, set_filename) = create_signal(None);
let (max, set_max) = create_signal(None);
let (current, set_current) = create_signal(None);
let on_submit = move |ev: SubmitEvent| {
ev.prevent_default();
let target = ev.target().unwrap().unchecked_into::<HtmlFormElement>();
let form_data = FormData::new_with_form(&target).unwrap();
let file = form_data
.get("file_to_upload")
.unchecked_into::<web_sys::File>();
let filename = file.name();
let size = file.size() as usize;
set_filename(Some(filename.clone()));
set_max(Some(size));
set_current(None::<usize>);
spawn_local(async move {
let mut progress = file_progress(filename)
.await
.expect("couldn't initialize stream")
.into_inner();
while let Some(Ok(len)) = progress.next().await {
// the TextStream from the server function will be a series of `usize` values
// however, the response itself may pack those chunks into a smaller number of
// chunks, each with more text in it
// so we've padded them with newspace, and will split them out here
// each value is the latest total, so we'll just take the last one
let len = len
.split('\n')
.filter(|n| !n.is_empty())
.last()
.expect(
"expected at least one non-empty value from \
newline-delimited rows",
)
.parse::<usize>()
.expect("invalid length");
set_current(Some(len));
}
});
spawn_local(async move {
upload_file(form_data.into())
.await
.expect("couldn't upload file");
});
};
view! {
<h3>File Upload with Progress</h3>
<p>A file upload with progress can be handled with two separate server functions.</p>
<aside>See the doc comment on the component for an explanation.</aside>
<form on:submit=on_submit>
<input type="file" name="file_to_upload"/>
<input type="submit"/>
</form>
{move || filename().map(|filename| view! { <p>Uploading {filename}</p> })}
{move || max().map(|max| view! {
<progress max=max value=move || current().unwrap_or_default()/>
})}
}
}
#[component]
pub fn FileWatcher() -> impl IntoView {
#[server(input = GetUrl, output = StreamingText)]