diff --git a/sqlx-core/src/postgres/listen.rs b/sqlx-core/src/postgres/listen.rs index b502e2e4..6b8c37f5 100644 --- a/sqlx-core/src/postgres/listen.rs +++ b/sqlx-core/src/postgres/listen.rs @@ -98,10 +98,14 @@ where } /// Consume this listener, returning a `Stream` of notifications. - pub fn into_stream(mut self) -> impl Stream>> { + pub fn into_stream(mut self) -> impl Stream> { stream! { loop { - yield self.recv().await + match self.recv().await { + Ok(Some(msg)) => yield Ok(msg), + Ok(None) => break, + Err(err) => yield Err(err), + } } } } @@ -159,7 +163,7 @@ impl PgPoolListener { impl PgPoolListener { /// Receives the next notification available from any of the subscribed channels. - pub async fn recv(&mut self) -> Result> { + pub async fn recv(&mut self) -> Result { loop { // Ensure we have an active connection to work with. let conn = match &mut self.connection { @@ -186,7 +190,7 @@ impl PgPoolListener { match conn.receive().await? { // We've received an async notification, return it. Some(Message::NotificationResponse(notification)) => { - return Ok(Some(notification.into())); + return Ok(notification.into()); } // Protocol error, return the error. Some(msg) => { @@ -207,7 +211,7 @@ impl PgPoolListener { } /// Consume this listener, returning a `Stream` of notifications. - pub fn into_stream(mut self) -> impl Stream>> { + pub fn into_stream(mut self) -> impl Stream> { stream! { loop { yield self.recv().await