bat/tests/syntax-tests/source/Scala/ConcurrentEffectLaws.scala

99 lines
3.2 KiB
Scala

/*
* Copyright (c) 2017-2019 The Typelevel Cats-effect Project Developers
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cats
package effect
package laws
import cats.effect.concurrent.Deferred
import cats.syntax.all._
import cats.laws._
import scala.concurrent.Promise
trait ConcurrentEffectLaws[F[_]] extends ConcurrentLaws[F] with EffectLaws[F] {
implicit def F: ConcurrentEffect[F]
def runAsyncRunCancelableCoherence[A](fa: F[A]) = {
val fa1 = IO.async[A] { cb =>
F.runAsync(fa)(r => IO(cb(r))).unsafeRunSync()
}
val fa2 = IO.cancelable[A] { cb =>
F.toIO(F.runCancelable(fa)(r => IO(cb(r))).unsafeRunSync())
}
fa1 <-> fa2
}
def runCancelableIsSynchronous[A] = {
val lh = Deferred.uncancelable[F, Unit].flatMap { latch =>
val spawned = Promise[Unit]()
// Never ending task
val ff = F.cancelable[A] { _ =>
spawned.success(()); latch.complete(())
}
// Execute, then cancel
val token = F.delay(F.runCancelable(ff)(_ => IO.unit).unsafeRunSync()).flatMap { cancel =>
// Waiting for the task to start before cancelling it
Async.fromFuture(F.pure(spawned.future)) >> cancel
}
F.liftIO(F.runAsync(token)(_ => IO.unit).toIO) *> latch.get
}
lh <-> F.unit
}
def runCancelableStartCancelCoherence[A](a: A) = {
// Cancellation via runCancelable
val f1: F[A] = for {
effect1 <- Deferred.uncancelable[F, A]
latch <- F.delay(Promise[Unit]())
never = F.cancelable[A] { _ =>
latch.success(()); effect1.complete(a)
}
cancel <- F.liftIO(F.runCancelable(never)(_ => IO.unit).toIO)
// Waiting for the task to start before cancelling it
_ <- Async.fromFuture(F.pure(latch.future)) // TODO get rid of this, IO, and Future here
_ <- cancel
result <- effect1.get
} yield result
// Cancellation via start.flatMap(_.cancel)
val f2: F[A] = for {
effect2 <- Deferred.uncancelable[F, A]
// Using a latch to ensure that the task started
latch <- Deferred.uncancelable[F, Unit]
never = F.bracket(latch.complete(()))(_ => F.never[Unit])(_ => effect2.complete(a))
fiber <- F.start(never)
// Waiting for the task to start before cancelling it
_ <- latch.get
_ <- F.start(fiber.cancel)
result <- effect2.get
} yield result
f1 <-> f2
}
def toIORunCancelableConsistency[A](fa: F[A]) =
ConcurrentEffect.toIOFromRunCancelable(fa) <-> F.toIO(fa)
}
object ConcurrentEffectLaws {
def apply[F[_]](implicit F0: ConcurrentEffect[F], contextShift0: ContextShift[F]): ConcurrentEffectLaws[F] =
new ConcurrentEffectLaws[F] {
val F = F0
val contextShift = contextShift0
}
}