diff --git a/examples/server_fns_axum/src/app.rs b/examples/server_fns_axum/src/app.rs index acfdcdfb1..e6a5b5e5e 100644 --- a/examples/server_fns_axum/src/app.rs +++ b/examples/server_fns_axum/src/app.rs @@ -50,7 +50,6 @@ pub fn HomePage() -> impl IntoView { - } } @@ -507,47 +506,3 @@ pub fn CustomErrorTypes() -> impl IntoView {

} } - -#[component] -pub fn StreamingValues() -> impl IntoView { - use futures::StreamExt; - - /// You can create server functions that accept streaming values by using the encoding - /// `Streaming` (with type `ByteStream`) or encoding `StreamingText` (with type `TextStream`) - #[server(input = StreamingText, output = StreamingText)] - pub async fn streaming(input: TextStream) -> Result { - println!("inside streaming() fn"); - Ok(TextStream::from(input.into_inner().map(|text| format!("{}!!!", text.unwrap_or_else(|e| e.to_string()))))) - } - - let mut count = 0; - let (tx, rx) = futures::channel::mpsc::unbounded(); - let (result, set_result) = create_signal("Click me...".to_string()); - - - if cfg!(feature = "hydrate") { - spawn_local(async move { - logging::log!("calling streaming server fn"); - match streaming(TextStream::new(rx)).await { - Ok(res) => { - logging::log!("after calling streaming()"); - let mut stream = res.into_inner(); - while let Some(chunk) = stream.next().await { - set_result(chunk.unwrap_or_else(|e| e.to_string())); - } - }, Err(e) => logging::log!("{e}") } - }) - } - - view! { -

Streaming arguments and responses

- - } -} diff --git a/server_fn/src/codec/stream.rs b/server_fn/src/codec/stream.rs index 38173f607..15e276536 100644 --- a/server_fn/src/codec/stream.rs +++ b/server_fn/src/codec/stream.rs @@ -13,6 +13,15 @@ use std::{fmt::Debug, pin::Pin}; /// An encoding that represents a stream of bytes. /// /// A server function that uses this as its output encoding should return [`ByteStream`]. +/// +/// ## Browser Support for Streaming Input +/// +/// Browser fetch requests do not currently support full request duplexing, which +/// means that that they do begin handling responses until the full request has been sent. +/// This means that if you use a streaming input encoding, the input stream needs to +/// end before the output will begin. +/// +/// Streaming requests are only allowed over HTTP2 or HTTP3. pub struct Streaming; impl Encoding for Streaming { @@ -49,6 +58,15 @@ where /// A stream of bytes. /// /// A server function can return this type if its output encoding is [`Streaming`]. +/// +/// ## Browser Support for Streaming Input +/// +/// Browser fetch requests do not currently support full request duplexing, which +/// means that that they do begin handling responses until the full request has been sent. +/// This means that if you use a streaming input encoding, the input stream needs to +/// end before the output will begin. +/// +/// Streaming requests are only allowed over HTTP2 or HTTP3. pub struct ByteStream( Pin>> + Send>>, ); @@ -115,10 +133,14 @@ where /// /// A server function that uses this as its output encoding should return [`TextStream`]. /// -/// **Note**: Browser fetch requests do not currently support full request duplexing, which +/// ## Browser Support for Streaming Input +/// +/// Browser fetch requests do not currently support full request duplexing, which /// means that that they do begin handling responses until the full request has been sent. -/// This means that if you use streaming text as an input encoding, the input stream needs to +/// This means that if you use a streaming input encoding, the input stream needs to /// end before the output will begin. +/// +/// Streaming requests are only allowed over HTTP2 or HTTP3. pub struct StreamingText; impl Encoding for StreamingText { @@ -130,10 +152,14 @@ impl Encoding for StreamingText { /// /// A server function can return this type if its output encoding is [`StreamingText`]. /// -/// **Note**: Browser fetch requests do not currently support full request duplexing, which +/// ## Browser Support for Streaming Input +/// +/// Browser fetch requests do not currently support full request duplexing, which /// means that that they do begin handling responses until the full request has been sent. -/// This means that if you use streaming text as an input encoding, the input stream needs to +/// This means that if you use a streaming input encoding, the input stream needs to /// end before the output will begin. +/// +/// Streaming requests are only allowed over HTTP2 or HTTP3. pub struct TextStream( Pin>> + Send>>, ); diff --git a/server_fn/src/request/browser.rs b/server_fn/src/request/browser.rs index 4e5db475a..5708ed3d2 100644 --- a/server_fn/src/request/browser.rs +++ b/server_fn/src/request/browser.rs @@ -3,11 +3,11 @@ use crate::{client::get_server_url, error::ServerFnError}; use bytes::Bytes; use futures::{Stream, StreamExt}; pub use gloo_net::http::Request; -use js_sys::Uint8Array; +use js_sys::{Reflect, Uint8Array}; use send_wrapper::SendWrapper; use wasm_bindgen::JsValue; use wasm_streams::ReadableStream; -use web_sys::{FormData, UrlSearchParams}; +use web_sys::{FormData, Headers, RequestInit, UrlSearchParams}; /// A `fetch` request made in the browser. #[derive(Debug)] @@ -143,17 +143,36 @@ impl ClientReq for BrowserRequest { content_type: &str, body: impl Stream + 'static, ) -> Result> { - let stream = ReadableStream::from_stream(body.map(|bytes| { - let data = Uint8Array::from(bytes.as_ref()); - let data = JsValue::from(data); - Ok(data) as Result - })); - Ok(Self(SendWrapper::new( - Request::post(path) - .header("Content-Type", content_type) - .header("Accept", accepts) - .body(stream.into_raw()) - .map_err(|e| ServerFnError::Request(e.to_string()))?, - ))) + let req = streaming_request(path, accepts, content_type, body) + .map_err(|e| ServerFnError::Request(format!("{e:?}")))?; + Ok(Self(SendWrapper::new(req))) } } + +fn streaming_request( + path: &str, + accepts: &str, + content_type: &str, + body: impl Stream + 'static, +) -> Result { + let stream = ReadableStream::from_stream(body.map(|bytes| { + let data = Uint8Array::from(bytes.as_ref()); + let data = JsValue::from(data); + Ok(data) as Result + })) + .into_raw(); + let headers = Headers::new()?; + headers.append("Content-Type", content_type)?; + headers.append("Accept", accepts)?; + let mut init = RequestInit::new(); + init.headers(&headers).method("POST").body(Some(&stream)); + + // Chrome requires setting `duplex: "half"` on streaming requests + Reflect::set( + &init, + &JsValue::from_str("duplex"), + &JsValue::from_str("half"), + )?; + let req = web_sys::Request::new_with_str_and_init(path, &init)?; + Ok(Request::from(req)) +}