mirror of
https://github.com/sharkdp/bat
synced 2024-12-21 01:33:05 +00:00
99 lines
3.2 KiB
Scala
99 lines
3.2 KiB
Scala
|
/*
 * Copyright (c) 2017-2019 The Typelevel Cats-effect Project Developers
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
|
||
|
|
||
|
package cats
|
||
|
package effect
|
||
|
package laws
|
||
|
|
||
|
import cats.effect.concurrent.Deferred
|
||
|
import cats.syntax.all._
|
||
|
import cats.laws._
|
||
|
|
||
|
import scala.concurrent.Promise
|
||
|
|
||
|
/**
 * Laws for `ConcurrentEffect[F]`, layered on top of the laws inherited from
 * `ConcurrentLaws[F]` and `EffectLaws[F]`.
 *
 * Every law builds two expressions and pairs them with `<->`, stating that
 * the left-hand side and the right-hand side are expected to be equivalent.
 */
trait ConcurrentEffectLaws[F[_]] extends ConcurrentLaws[F] with EffectLaws[F] {
  /** The `ConcurrentEffect` instance whose behavior is being specified. */
  implicit def F: ConcurrentEffect[F]

  /**
   * Evaluating `fa` through `F.runAsync` and through `F.runCancelable` must
   * feed the same result to the supplied callback.
   *
   * @param fa the effect evaluated through both run methods
   */
  def runAsyncRunCancelableCoherence[A](fa: F[A]) = {
    // Left-hand side: an `IO.async` whose registration runs `fa` via `runAsync`.
    val fa1 = IO.async[A] { cb =>
      F.runAsync(fa)(r => IO(cb(r))).unsafeRunSync()
    }
    // Right-hand side: an `IO.cancelable` whose registration runs `fa` via
    // `runCancelable`; the value it yields is converted with `F.toIO` and
    // returned from the registration block.
    val fa2 = IO.cancelable[A] { cb =>
      F.toIO(F.runCancelable(fa)(r => IO(cb(r))).unsafeRunSync())
    }
    fa1 <-> fa2
  }

  /**
   * Starting a never-completing task with `F.runCancelable`, waiting until
   * its registration block has run, and then invoking the value returned by
   * `runCancelable` must complete `latch`, making the whole program
   * equivalent to `F.unit`.
   */
  def runCancelableIsSynchronous[A] = {
    val lh = Deferred.uncancelable[F, Unit].flatMap { latch =>
      // Completed from inside the registration block to signal that the task started.
      val spawned = Promise[Unit]()
      // Never ending task: the callback is ignored, so it never produces a value.
      // The block's final expression, `latch.complete(())`, is what is handed
      // back to `F.cancelable`.
      val ff = F.cancelable[A] { _ =>
        spawned.success(()); latch.complete(())
      }
      // Execute, then cancel
      val token = F.delay(F.runCancelable(ff)(_ => IO.unit).unsafeRunSync()).flatMap { cancel =>
        // Waiting for the task to start before cancelling it
        Async.fromFuture(F.pure(spawned.future)) >> cancel
      }
      // Run `token`, then wait for the latch to be completed.
      F.liftIO(F.runAsync(token)(_ => IO.unit).toIO) *> latch.get
    }
    lh <-> F.unit
  }

  /**
   * Cancelling a running task through the value returned by `F.runCancelable`
   * must be observable in the same way as cancelling it through
   * `F.start(...)` followed by `fiber.cancel`: in both programs the value `a`
   * ends up in a `Deferred` that is then read back.
   *
   * @param a the value published when the task is cancelled
   */
  def runCancelableStartCancelCoherence[A](a: A) = {
    // Cancellation via runCancelable
    val f1: F[A] = for {
      effect1 <- Deferred.uncancelable[F, A]
      latch   <- F.delay(Promise[Unit]())
      // The registration block signals the start through `latch` and yields
      // `effect1.complete(a)` as its result.
      never = F.cancelable[A] { _ =>
        latch.success(()); effect1.complete(a)
      }
      cancel <- F.liftIO(F.runCancelable(never)(_ => IO.unit).toIO)
      // Waiting for the task to start before cancelling it
      _ <- Async.fromFuture(F.pure(latch.future)) // TODO get rid of this, IO, and Future here
      _ <- cancel
      result <- effect1.get
    } yield result

    // Cancellation via start.flatMap(_.cancel)
    val f2: F[A] = for {
      effect2 <- Deferred.uncancelable[F, A]
      // Using a latch to ensure that the task started
      latch <- Deferred.uncancelable[F, Unit]
      // Acquire completes the latch, use never finishes, release publishes `a`.
      never = F.bracket(latch.complete(()))(_ => F.never[Unit])(_ => effect2.complete(a))
      fiber <- F.start(never)
      // Waiting for the task to start before cancelling it
      _ <- latch.get
      _ <- F.start(fiber.cancel)
      result <- effect2.get
    } yield result

    f1 <-> f2
  }

  /**
   * `F.toIO(fa)` must agree with the conversion produced by
   * `ConcurrentEffect.toIOFromRunCancelable(fa)`.
   *
   * @param fa the effect converted through both paths
   */
  def toIORunCancelableConsistency[A](fa: F[A]) =
    ConcurrentEffect.toIOFromRunCancelable(fa) <-> F.toIO(fa)
}
|
||
|
|
||
|
/** Factory for [[ConcurrentEffectLaws]] instances. */
object ConcurrentEffectLaws {
  /**
   * Builds a `ConcurrentEffectLaws[F]` backed by the implicitly available instances.
   *
   * @param F0 the `ConcurrentEffect[F]` instance exposed as `F`
   * @param contextShift0 the `ContextShift[F]` instance exposed as `contextShift`
   * @return a laws instance whose `F` and `contextShift` members are the given values
   */
  def apply[F[_]](
    implicit F0: ConcurrentEffect[F],
    contextShift0: ContextShift[F]
  ): ConcurrentEffectLaws[F] =
    new ConcurrentEffectLaws[F] {
      val F = F0
      val contextShift = contextShift0
    }
}
|