Update stream impls.

The basic PgListener stream impl now yields `Result<PgNotification>`
elements without an `Option` in the result. The option condition
originally represented the closure of the underlying connection. Now
such conditions will terminate the stream, as one would expect. The
`PgListener.recv()` method signature has not been changed.

PgPoolListener has also been updated. The interfaces on this struct will
never yield an inner `Option` as it will instead acquire a new
connection and continue its work. Both the stream impl & the `recv`
method have received an update to their signatures.
This commit is contained in:
Anthony Dodd 2020-02-20 10:05:01 -06:00 committed by Ryan Leckey
parent 608556f91a
commit 82923a1aaa

View file

@ -98,10 +98,14 @@ where
}
/// Consume this listener, returning a `Stream` of notifications.
pub fn into_stream(mut self) -> impl Stream<Item = Result<Option<PgNotification>>> {
pub fn into_stream(mut self) -> impl Stream<Item = Result<PgNotification>> {
stream! {
loop {
yield self.recv().await
match self.recv().await {
Ok(Some(msg)) => yield Ok(msg),
Ok(None) => break,
Err(err) => yield Err(err),
}
}
}
}
@ -159,7 +163,7 @@ impl PgPoolListener {
impl PgPoolListener {
/// Receives the next notification available from any of the subscribed channels.
pub async fn recv(&mut self) -> Result<Option<PgNotification>> {
pub async fn recv(&mut self) -> Result<PgNotification> {
loop {
// Ensure we have an active connection to work with.
let conn = match &mut self.connection {
@ -186,7 +190,7 @@ impl PgPoolListener {
match conn.receive().await? {
// We've received an async notification, return it.
Some(Message::NotificationResponse(notification)) => {
return Ok(Some(notification.into()));
return Ok(notification.into());
}
// Protocol error, return the error.
Some(msg) => {
@ -207,7 +211,7 @@ impl PgPoolListener {
}
/// Consume this listener, returning a `Stream` of notifications.
pub fn into_stream(mut self) -> impl Stream<Item = Result<Option<PgNotification>>> {
pub fn into_stream(mut self) -> impl Stream<Item = Result<PgNotification>> {
stream! {
loop {
yield self.recv().await