Lgd. Viktor Klang

Systems all the way down



This is the sixth of several posts describing protips for scala.concurrent.Future. For the previous post, click here.

Since many have asked how to create cancellable/interruptible Future instances, I figured I’d write a blog post showcasing how such an integration could be achieved.

Why Future is not Cancellable

If you’ve used Future before then you might know that Future is not cancellable, so let’s first explain why that is the case:

The primary reason why Scala’s Futures are not cancellable is that cancellation would make them not freely sharable.

What do I mean when I say “not freely sharable”?

Alice: Creates Promise `p` and hands a reference to its Future `f` to Bob and Mallory.
       Alice is producing a value for it.
Bob: Wants to use the value for some important thing.
Mallory: Cancels `f`
Bob: (╯°□°)╯︵ ┻━┻

Conceptually, Promise is cancellable. Let me explain:

Alice: Creates Promise `p` with intent to produce a value.
Bob: Obtains a reference to `p` from Alice, Bob is now able to complete it.
Alice: Can during the production of the value of `p` interrogate whether `p`
       has already been completed, and cancel the production of the value.

You can think of it like this: Promise is a permission to write a value and Future is a permission to read a value.

So in the simple case, if you only want to hand out the permission to cancel, hand out: () => p.tryFailure(new CancellationException()), alternatively wrapped in your effects library of choice.
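As a minimal sketch of that idea, assuming a producer that works in small steps: the names `CancelPermissionSketch`, `cancelFor` and `produce` are made up for illustration and are not part of the standard library. The producer keeps the Promise, consumers get the Future, and only whoever holds the returned function can cancel.

```scala
import java.util.concurrent.CancellationException
import scala.concurrent.Promise

object CancelPermissionSketch {
  // Hypothetical helper: derives a "permission to cancel" from a Promise
  // without exposing the Promise itself.
  def cancelFor[T](p: Promise[T]): () => Boolean =
    () => p.tryFailure(new CancellationException())

  // Hypothetical producer: between steps it asks the Promise whether it has
  // already been completed (for example cancelled) and stops early if so.
  // Returns true only if this call was the one that completed the Promise.
  def produce(p: Promise[Int], steps: Int): Boolean = {
    var i = 0
    var acc = 0
    while (i < steps && !p.isCompleted) {
      acc += i
      i += 1
    }
    p.trySuccess(acc) // false if somebody else completed `p` first
  }
}
```

Here `p.future` can be shared freely, since holding it grants no way to complete or cancel the computation.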

Now, you could imagine a solution where a cancellable Future can be turned into an uncancellable Future, but then you need to make defensive copies, and you need glue code to track what should be cancellable from where. Not optimal, to say the least!

Also, you’d want to track cancellability in the type system so that code can express a need for permission to cancel. That calls for a CancellableFuture structure, but then you have coupled the permission to read with the permission to cancel.

This leads us either to some sort of Cancelable[future.type] or future.Cancelable (a dependent type) to track permission to cancel a specific Future, or to (_: f.type) => p.tryFailure(new CancellationException), where f is the Future of Promise p.
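The function-based variant can be sketched as follows. This is only an illustration under the assumption that `f` is a stable `val`; the object name `TypedCancelSketch` and the extra Future `other` are hypothetical. The singleton type `f.type` means the function only accepts that one Future instance as its argument.

```scala
import java.util.concurrent.CancellationException
import scala.concurrent.{Future, Promise}

object TypedCancelSketch {
  val p: Promise[Int] = Promise[Int]()
  val f: Future[Int] = p.future

  // The parameter type is the singleton type of `f`, so the caller has to
  // present exactly this Future in order to cancel it.
  val cancel: f.type => Boolean =
    (_: f.type) => p.tryFailure(new CancellationException)

  // An unrelated Future, only here to show what is rejected.
  val other: Future[Int] = Future.successful(1)
  // cancel(other) // does not compile: `other` is not of type `f.type`
}
```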

So in either case cancellation is a concern that needs to be modeled/represented separately. Let’s see how we could model this using the tools we have at our disposal.

Adding interruptibility/cancellability

Let’s start by implementing a class which will integrate with the Java Thread Interrupt machinery. We’ll need to handle the following cases:

* The cancellation happens *before* logic execution
* The cancellation happens *during* logic execution
* The cancellation happens *after* logic execution
* The cancellation never happens
* The cancellation happens more than once

import scala.concurrent._
import java.util.concurrent.CancellationException

final class Interrupt extends (() => Boolean) {
  // We need a state-machine to track the progress.
  // It can have the following states:
  // a null reference means execution has not started.
  // a Thread reference means that the execution has started but is not done.
  // a this reference means that it is already cancelled or is already too late.
  private[this] final var state: AnyRef = null

  /**
   * This is the signal to cancel the execution of the logic.
   * Returns whether the cancellation signal was successfully issued or not.
   **/
  override final def apply(): Boolean = this.synchronized {
    state match {
      case null        =>
        state = this
        true
      case _: this.type => false
      case t: Thread   =>
        state = this
        t.interrupt()
        true
    }
  }

  // Initializes right before execution of logic and
  // allows skipping the logic entirely if already cancelled.
  private[this] final def enter(): Boolean =
    this.synchronized {
      state match {
        case _: this.type => false
        case null =>
          state = Thread.currentThread
          true
      }
    }

  // Cleans up after the logic has executed.
  // Prevents cancellation from occurring "too late".
  private[this] final def exit(): Boolean =
    this.synchronized {
      state match {
        case _: this.type => false
        case t: Thread =>
          state = this
          true
      }
    }

  /**
   * Executes the supplied block of logic and returns the result.
   * Throws CancellationException if the block was interrupted.
   **/
  def interruptibly[T](block: =>T): T =
    if (enter()) {
      try block catch {
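        case _: InterruptedException => throw new CancellationException()
      } finally {
        // If cancellation was signalled while the logic was running, clear this
        // thread's interrupt flag (if still set) so it does not leak into
        // whatever runs on this thread next.
        if (!exit() && Thread.interrupted())
          ()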
      }
    } else throw new CancellationException()
}

Then we want to provide an API for Future, so let’s create an implicit class, and then return a Future and a function which can cancel it.

implicit class FutureInterrupt(val future: Future.type) extends AnyVal {
  def interruptibly[T](block: => T)(implicit ec: ExecutionContext): (Future[T], () => Boolean) = {
    val interrupt = new Interrupt()
    (Future(interrupt.interruptibly(block))(ec), interrupt)
  }
}

And here’s how we could use it:

import scala.concurrent.blocking
import scala.concurrent.ExecutionContext.Implicits._

scala> val (f, cancel) = Future.interruptibly {
    blocking { Thread.sleep(10000) }
    println("foo")
}

f: scala.concurrent.Future[Unit] = Future(<not completed>)
cancel: () => Boolean = <function0>

scala> cancel()
res0: Boolean = true

scala> f
res1: scala.concurrent.Future[Unit] = Future(Failure(java.util.concurrent.CancellationException))

Mission accomplished!

You could also add checks for Java Thread Interrupts in your own code to make it respect the interrupt signal by checking Thread.interrupted() and then throwing a new InterruptedException().
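A minimal sketch of such a check, assuming a long-running loop that never blocks; `InterruptCheckSketch`, `checkInterrupted` and `sumUntilInterrupted` are hypothetical names used only for illustration:

```scala
object InterruptCheckSketch {
  // Hypothetical helper: Thread.interrupted() reads AND clears the current
  // thread's interrupt flag, so the signal is turned into an exception once.
  def checkInterrupted(): Unit =
    if (Thread.interrupted()) throw new InterruptedException()

  // CPU-bound work that never blocks would otherwise not notice an interrupt,
  // so it polls the flag on every iteration.
  def sumUntilInterrupted(n: Int): Long = {
    var i = 0
    var acc = 0L
    while (i < n) {
      checkInterrupted()
      acc += i
      i += 1
    }
    acc
  }
}
```

Run inside the interruptibly block from above, the InterruptedException thrown by such a check would be translated into a CancellationException just like one thrown by Thread.sleep.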

Cheers,