partial support for streaming requests (doesn't actually work in the browser)

This commit is contained in:
Greg Johnston 2024-01-17 21:26:36 -05:00
parent 320179bc04
commit c5bab09423
10 changed files with 213 additions and 21 deletions

View file

@ -15,7 +15,7 @@ leptos = { path = "../../leptos", features = ["nightly"] }
leptos_axum = { path = "../../integrations/axum", optional = true }
leptos_meta = { path = "../../meta", features = ["nightly"] }
leptos_router = { path = "../../router", features = ["nightly"] }
server_fn = { path = "../../server_fn", features = ["serde-lite", "rkyv", "multipart"] }
server_fn = { path = "../../server_fn", features = ["serde-lite", "rkyv", "multipart" ]}
log = "0.4"
simple_logger = "4.0"
serde = { version = "1", features = ["derive"] }

View file

@ -50,6 +50,7 @@ pub fn HomePage() -> impl IntoView {
<RkyvExample/>
<FileUpload/>
<FileWatcher/>
<StreamingValues/>
}
}
@ -506,3 +507,47 @@ pub fn CustomErrorTypes() -> impl IntoView {
</p>
}
}
#[component]
pub fn StreamingValues() -> impl IntoView {
use futures::StreamExt;
/// You can create server functions that accept streaming values by using the encoding
/// `Streaming` (with type `ByteStream`) or encoding `StreamingText` (with type `TextStream`)
#[server(input = StreamingText, output = StreamingText)]
pub async fn streaming(input: TextStream) -> Result<TextStream, ServerFnError> {
println!("inside streaming() fn");
Ok(TextStream::from(input.into_inner().map(|text| format!("{}!!!", text.unwrap_or_else(|e| e.to_string())))))
}
let mut count = 0;
let (tx, rx) = futures::channel::mpsc::unbounded();
let (result, set_result) = create_signal("Click me...".to_string());
if cfg!(feature = "hydrate") {
spawn_local(async move {
logging::log!("calling streaming server fn");
match streaming(TextStream::new(rx)).await {
Ok(res) => {
logging::log!("after calling streaming()");
let mut stream = res.into_inner();
while let Some(chunk) = stream.next().await {
set_result(chunk.unwrap_or_else(|e| e.to_string()));
}
}, Err(e) => logging::log!("{e}") }
})
}
view! {
<h3>Streaming arguments and responses</h3>
<button
on:click=move |_| {
count += 1;
tx.unbounded_send(Ok(count.to_string())).expect("couldn't send into channel");
}
>
{result}
</button>
}
}

View file

@ -1,13 +1,14 @@
use super::{Encoding, FromRes};
use super::{Encoding, FromReq, FromRes, IntoReq};
use crate::{
error::{NoCustomError, ServerFnError},
request::{ClientReq, Req},
response::{ClientRes, Res},
IntoRes,
};
use bytes::Bytes;
use futures::{Stream, StreamExt};
use http::Method;
use std::pin::Pin;
use std::{fmt::Debug, pin::Pin};
/// An encoding that represents a stream of bytes.
///
@ -19,25 +20,31 @@ impl Encoding for Streaming {
const METHOD: Method = Method::POST;
}
/* impl<CustErr, T, Request> IntoReq<ByteStream, Request, CustErr> for T
impl<CustErr, T, Request> IntoReq<Streaming, Request, CustErr> for T
where
Request: ClientReq<CustErr>,
T: Stream<Item = Bytes> + Send,
T: Stream<Item = Bytes> + Send + Sync + 'static,
{
fn into_req(self, path: &str, accepts: &str) -> Result<Request, ServerFnError<CustErr>> {
Request::try_new_stream(path, ByteStream::CONTENT_TYPE, self)
fn into_req(
self,
path: &str,
accepts: &str,
) -> Result<Request, ServerFnError<CustErr>> {
Request::try_new_streaming(path, accepts, Streaming::CONTENT_TYPE, self)
}
} */
}
/* impl<CustErr, T, Request> FromReq<ByteStream, Request, CustErr> for T
impl<CustErr, T, Request> FromReq<Streaming, Request, CustErr> for T
where
Request: Req<CustErr> + Send + 'static,
T: Stream<Item = Bytes> + Send,
T: From<ByteStream> + 'static,
{
async fn from_req(req: Request) -> Result<Self, ServerFnError<CustErr>> {
req.try_into_stream().await
let data = req.try_into_stream()?;
let s = ByteStream::new(data);
Ok(s.into())
}
} */
}
/// A stream of bytes.
///
@ -55,6 +62,24 @@ impl<CustErr> ByteStream<CustErr> {
}
}
impl<CustErr> Debug for ByteStream<CustErr> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("ByteStream").finish()
}
}
impl ByteStream {
/// Creates a new `ByteStream` from the given stream.
pub fn new<T>(
value: impl Stream<Item = Result<T, ServerFnError>> + Send + 'static,
) -> Self
where
T: Into<Bytes>,
{
Self(Box::pin(value.map(|value| value.map(Into::into))))
}
}
impl<S, T> From<S> for ByteStream
where
S: Stream<Item = T> + Send + 'static,
@ -103,6 +128,21 @@ pub struct TextStream<CustErr = NoCustomError>(
Pin<Box<dyn Stream<Item = Result<String, ServerFnError<CustErr>>> + Send>>,
);
impl<CustErr> Debug for TextStream<CustErr> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("TextStream").finish()
}
}
impl TextStream {
/// Creates a new `ByteStream` from the given stream.
pub fn new(
value: impl Stream<Item = Result<String, ServerFnError>> + Send + 'static,
) -> Self {
Self(Box::pin(value.map(|value| value.map(Into::into))))
}
}
impl<CustErr> TextStream<CustErr> {
/// Consumes the wrapper, returning a stream of text.
pub fn into_inner(
@ -122,6 +162,43 @@ where
}
}
impl<CustErr, T, Request> IntoReq<StreamingText, Request, CustErr> for T
where
Request: ClientReq<CustErr>,
T: Into<TextStream>,
{
fn into_req(
self,
path: &str,
accepts: &str,
) -> Result<Request, ServerFnError<CustErr>> {
let data = self.into();
Request::try_new_streaming(
path,
accepts,
Streaming::CONTENT_TYPE,
data.0.map(|chunk| chunk.unwrap_or_default().into()),
)
}
}
impl<CustErr, T, Request> FromReq<StreamingText, Request, CustErr> for T
where
Request: Req<CustErr> + Send + 'static,
T: From<TextStream> + 'static,
{
async fn from_req(req: Request) -> Result<Self, ServerFnError<CustErr>> {
let data = req.try_into_stream()?;
let s = TextStream::new(data.map(|chunk| {
chunk.and_then(|bytes| {
String::from_utf8(bytes.to_vec())
.map_err(|e| ServerFnError::Deserialization(e.to_string()))
})
}));
Ok(s.into())
}
}
impl<CustErr, Response> IntoRes<StreamingText, Response, CustErr>
for TextStream<CustErr>
where

View file

@ -33,7 +33,10 @@ impl From<(HttpRequest, Payload)> for ActixRequest {
}
}
impl<CustErr> Req<CustErr> for ActixRequest {
impl<CustErr> Req<CustErr> for ActixRequest
where
CustErr: 'static,
{
fn as_query(&self) -> Option<&str> {
self.0 .0.uri().query()
}

View file

@ -8,7 +8,10 @@ use http::{
use http_body_util::BodyExt;
use std::borrow::Cow;
impl<CustErr> Req<CustErr> for Request<Body> {
impl<CustErr> Req<CustErr> for Request<Body>
where
CustErr: 'static,
{
fn as_query(&self) -> Option<&str> {
self.uri().query()
}
@ -49,7 +52,7 @@ impl<CustErr> Req<CustErr> for Request<Body> {
fn try_into_stream(
self,
) -> Result<
impl Stream<Item = Result<Bytes, ServerFnError>> + Send,
impl Stream<Item = Result<Bytes, ServerFnError>> + Send + 'static,
ServerFnError<CustErr>,
> {
Ok(self.into_body().into_data_stream().map(|chunk| {

View file

@ -1,9 +1,12 @@
use super::ClientReq;
use crate::{client::get_server_url, error::ServerFnError};
use bytes::Bytes;
use futures::{Stream, StreamExt};
pub use gloo_net::http::Request;
use js_sys::Uint8Array;
use send_wrapper::SendWrapper;
use wasm_bindgen::JsValue;
use wasm_streams::ReadableStream;
use web_sys::{FormData, UrlSearchParams};
/// A `fetch` request made in the browser.
@ -134,4 +137,24 @@ impl<CustErr> ClientReq<CustErr> for BrowserRequest {
.map_err(|e| ServerFnError::Request(e.to_string()))?,
)))
}
fn try_new_streaming(
path: &str,
accepts: &str,
content_type: &str,
body: impl Stream<Item = Bytes> + 'static,
) -> Result<Self, ServerFnError<CustErr>> {
let stream = ReadableStream::from_stream(body.map(|bytes| {
let data = Uint8Array::from(bytes.as_ref());
let data = JsValue::from(data);
Ok(data) as Result<JsValue, JsValue>
}));
Ok(Self(SendWrapper::new(
Request::post(path)
.header("Content-Type", content_type)
.header("Accept", accepts)
.body(stream.into_raw())
.map_err(|e| ServerFnError::Request(e.to_string()))?,
)))
}
}

View file

@ -62,6 +62,14 @@ where
accepts: &str,
body: Self::FormData,
) -> Result<Self, ServerFnError<CustErr>>;
/// Attempts to construct a new `POST` request with a streaming body.
fn try_new_streaming(
path: &str,
accepts: &str,
content_type: &str,
body: impl Stream<Item = Bytes> + Send + 'static,
) -> Result<Self, ServerFnError<CustErr>>;
}
/// Represents the request as received by the server.
@ -95,7 +103,7 @@ where
fn try_into_stream(
self,
) -> Result<
impl Stream<Item = Result<Bytes, ServerFnError>> + Send,
impl Stream<Item = Result<Bytes, ServerFnError>> + Send + 'static,
ServerFnError<CustErr>,
>;
}
@ -104,7 +112,10 @@ where
/// when compiling for the browser.
pub struct BrowserMockReq;
impl<CustErr> Req<CustErr> for BrowserMockReq {
impl<CustErr> Req<CustErr> for BrowserMockReq
where
CustErr: 'static,
{
fn as_query(&self) -> Option<&str> {
unreachable!()
}

View file

@ -1,8 +1,15 @@
use super::ClientReq;
use crate::{client::get_server_url, error::ServerFnError};
use crate::{
client::get_server_url,
error::{ServerFnError, ServerFnErrorErr},
};
use bytes::Bytes;
use futures::{Stream, StreamExt};
use once_cell::sync::Lazy;
use reqwest::header::{ACCEPT, CONTENT_TYPE};
use reqwest::{
header::{ACCEPT, CONTENT_TYPE},
Body,
};
pub use reqwest::{multipart::Form, Client, Method, Request, Url};
pub(crate) static CLIENT: Lazy<Client> = Lazy::new(Client::new);
@ -88,4 +95,25 @@ impl<CustErr> ClientReq<CustErr> for Request {
.build()
.map_err(|e| ServerFnError::Request(e.to_string()))
}
fn try_new_streaming(
path: &str,
accepts: &str,
content_type: &str,
body: impl Stream<Item = Bytes> + 'static,
) -> Result<Self, ServerFnError<CustErr>> {
todo!("Streaming requests are not yet implemented for reqwest.")
/* let url = format!("{}{}", get_server_url(), path);
let body = Body::wrap_stream(
body.map(|chunk| Ok(chunk) as Result<Bytes, ServerFnErrorErr>),
);
CLIENT
.post(url)
.header(CONTENT_TYPE, content_type)
.header(ACCEPT, accepts)
.body(body)
.build()
.map_err(|e| ServerFnError::Request(e.to_string()))
}*/
}
}

View file

@ -64,7 +64,7 @@ pub trait ClientRes<CustErr> {
fn try_into_stream(
self,
) -> Result<
impl Stream<Item = Result<Bytes, ServerFnError>> + Send + 'static,
impl Stream<Item = Result<Bytes, ServerFnError>> + Send + Sync + 'static,
ServerFnError<CustErr>,
>;

View file

@ -351,7 +351,9 @@ pub fn server_macro_impl(
Clone, #server_fn_path::rkyv::Archive, #server_fn_path::rkyv::Serialize, #server_fn_path::rkyv::Deserialize
},
),
Some("MultipartFormData") => (PathInfo::None, quote! {}),
Some("MultipartFormData")
| Some("Streaming")
| Some("StreamingText") => (PathInfo::None, quote! {}),
Some("SerdeLite") => (
PathInfo::Serde,
quote! {