bat/tests/syntax-tests/highlighted/Scala/ConcurrentEffectLaws.scala

99 lines
21 KiB
Scala
Raw Normal View History

2020-10-04 08:29:14 +00:00
/*
 * Copyright (c) 2017-2019 The Typelevel Cats-effect Project Developers
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package cats
package effect
package laws
import cats.effect.concurrent.Deferred
import cats.syntax.all._
import cats.laws._
import scala.concurrent.Promise
/**
 * Law definitions for `ConcurrentEffect[F]`, layered on top of
 * `ConcurrentLaws[F]` and `EffectLaws[F]`.
 *
 * Every law builds two values and relates them with `<->`.
 */
trait ConcurrentEffectLaws[F[_]] extends ConcurrentLaws[F] with EffectLaws[F] {
  implicit def F: ConcurrentEffect[F]

  /**
   * Relates an `IO` built with `IO.async` on top of `F.runAsync` to an `IO`
   * built with `IO.cancelable` on top of `F.runCancelable`, both for the
   * same `fa`.
   *
   * @param fa the effect handed to both `runAsync` and `runCancelable`
   */
  def runAsyncRunCancelableCoherence[A](fa: F[A]) = {
    val viaRunAsync = IO.async[A] { callback =>
      val registration = F.runAsync(fa)(outcome => IO(callback(outcome)))
      registration.unsafeRunSync()
    }
    val viaRunCancelable = IO.cancelable[A] { callback =>
      val registration = F.runCancelable(fa)(outcome => IO(callback(outcome)))
      // The token produced in F is converted to IO so it can serve as the IO's token
      F.toIO(registration.unsafeRunSync())
    }
    viaRunAsync <-> viaRunCancelable
  }

  /**
   * Runs a task that never invokes its callback through `F.runCancelable`,
   * cancels it once it has been registered, and relates the whole program
   * (which finishes by waiting on a latch completed by the cancellation
   * action) to `F.unit`.
   */
  def runCancelableIsSynchronous[A] = {
    val program = Deferred.uncancelable[F, Unit].flatMap { cancelled =>
      val registered = Promise[Unit]()
      // The callback is ignored, so this task never completes on its own;
      // its cancellation action completes the `cancelled` latch
      val neverEnding = F.cancelable[A] { _ =>
        registered.success(())
        cancelled.complete(())
      }
      // Run the task, then trigger the token it handed back
      val runThenCancel = F
        .delay(F.runCancelable(neverEnding)(_ => IO.unit).unsafeRunSync())
        .flatMap { cancelToken =>
          // Hold off on cancellation until the task has actually been registered
          Async.fromFuture(F.pure(registered.future)) >> cancelToken
        }
      val launched = F.liftIO(F.runAsync(runThenCancel)(_ => IO.unit).toIO)
      launched *> cancelled.get
    }
    program <-> F.unit
  }

  /**
   * Relates two ways of cancelling a task that never completes by itself:
   * triggering the token obtained from `F.runCancelable`, and cancelling the
   * fiber obtained from `F.start`. In both programs the cancellation path
   * completes a `Deferred` with `a`, and the program yields that value.
   *
   * @param a the value published by the cancellation path of each program
   */
  def runCancelableStartCancelCoherence[A](a: A) = {
    // Variant 1: cancel through the token returned by runCancelable
    val viaRunCancelable: F[A] = for {
      published <- Deferred.uncancelable[F, A]
      registered <- F.delay(Promise[Unit]())
      neverEnding = F.cancelable[A] { _ =>
        registered.success(())
        published.complete(a)
      }
      cancelToken <- F.liftIO(F.runCancelable(neverEnding)(_ => IO.unit).toIO)
      // Hold off on cancellation until the task has actually been registered
      _ <- Async.fromFuture(F.pure(registered.future)) // TODO get rid of this, IO, and Future here
      _ <- cancelToken
      observed <- published.get
    } yield observed

    // Variant 2: cancel through start.flatMap(_.cancel)
    val viaFiberCancel: F[A] = for {
      published <- Deferred.uncancelable[F, A]
      // This latch signals that the task has begun running
      started <- Deferred.uncancelable[F, Unit]
      neverEnding = F.bracket(started.complete(()))(_ => F.never[Unit])(_ => published.complete(a))
      fiber <- F.start(neverEnding)
      // Hold off on cancellation until the task has actually started
      _ <- started.get
      _ <- F.start(fiber.cancel)
      observed <- published.get
    } yield observed

    viaRunCancelable <-> viaFiberCancel
  }

  /**
   * Relates `ConcurrentEffect.toIOFromRunCancelable(fa)` to `F.toIO(fa)`.
   *
   * @param fa the effect converted to `IO` by both routes
   */
  def toIORunCancelableConsistency[A](fa: F[A]) = {
    val derived = ConcurrentEffect.toIOFromRunCancelable(fa)
    val direct = F.toIO(fa)
    derived <-> direct
  }
}
object ConcurrentEffectLaws {

  /**
   * Builds a `ConcurrentEffectLaws[F]` whose `F` and `contextShift` members
   * are the implicit instances supplied at the call site.
   *
   * @param F0            the `ConcurrentEffect[F]` instance exposed as `F`
   * @param contextShift0 the `ContextShift[F]` instance exposed as `contextShift`
   */
  def apply[F[_]](
    implicit F0: ConcurrentEffect[F],
    contextShift0: ContextShift[F]
  ): ConcurrentEffectLaws[F] = {
    val instance = new ConcurrentEffectLaws[F] {
      val contextShift = contextShift0
      val F = F0
    }
    instance
  }
}