ki

A lightweight structured concurrency library

https://github.com/awkward-squad/ki

Version on this page: 1.0.1.0
LTS Haskell 22.39: 1.0.1.2
Stackage Nightly 2024-10-31: 1.0.1.2
Latest on Hackage: 1.0.1.2


BSD-3-Clause licensed by Mitchell Rosen
This version can be pinned in stack with: ki-1.0.1.0@sha256:9c8b49218ed32f70ceefd5b8afb1a6f7a7ff9551d63eef4fa709b7d932b8199d,2583


Depends on 2 packages (full list with versions):
ki ki-unlifted

Overview

ki is a lightweight structured-concurrency library inspired by many other projects and blog posts.

A previous version of ki also included a mechanism for soft-cancellation/graceful shutdown. However, this feature was removed (perhaps temporarily) because the design of the API was unsatisfactory.

Documentation

Hackage documentation

Example: Happy Eyeballs

The Happy Eyeballs algorithm is a particularly common example used to demonstrate the advantages of structured concurrency, because it is simple to describe, but can be surprisingly difficult to implement.

The problem can be abstractly described as follows: we have a small set of actions to run, each of which can take arbitrarily long, or fail. Each action is a different way of computing the same value, so we only need to wait for one action to return successfully. We don’t want to run the actions one at a time (because that is likely to take too long), nor all at once (because that is an improper use of resources). Rather, we will begin executing the first action, then wait 250 milliseconds, then begin executing the second, and so on, until one returns successfully.

There are of course a number of ways to implement this algorithm. We’ll do something non-optimal, but simple. Let’s get the imports out of the way first.

{-# LANGUAGE BlockArguments, ImportQualifiedPost, NumericUnderscores #-}

import Control.Concurrent
import Control.Monad (when)
import Control.Monad.STM (atomically)
import Data.Function ((&))
import Data.Functor (void)
import Data.List qualified as List
import Data.Maybe (isJust)
import Ki qualified

Next, let’s define a staggeredSpawner helper that implements the majority of the core algorithm: given a list of actions, spawn them all at 250 millisecond intervals. After all actions are spawned, we block until all of them have returned.

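staggeredSpawner :: [IO ()] -> IO ()
staggeredSpawner actions = do
  Ki.scoped \scope -> do
    actions
      -- Turn each action into "fork it in this scope and discard the thread handle".
      & map (\action -> void (Ki.fork scope action))
      -- Put a 250 ms delay between consecutive forks (but not after the last one).
      & List.intersperse (threadDelay 250_000)
      & sequence_
    -- Block until every forked thread has finished.
    atomically (Ki.awaitAll scope)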
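
To see why the pipeline above places a delay only between consecutive forks, here is a minimal sketch that swaps Ki.fork and threadDelay for logging stand-ins. The names staggeredTrace and record are hypothetical and not part of ki; the sketch assumes only base.

```haskell
import Data.Function ((&))
import Data.IORef (modifyIORef', newIORef, readIORef)
import qualified Data.List as List

-- Hypothetical helper: same map/intersperse/sequence_ shape as
-- staggeredSpawner, but each step only records what it would have done.
staggeredTrace :: [String] -> IO [String]
staggeredTrace names = do
  logRef <- newIORef []
  -- Prepend each event; the log is reversed once at the end.
  let record event = modifyIORef' logRef (event :)
  names
    & map (\name -> record ("spawn " ++ name)) -- stand-in for Ki.fork
    & List.intersperse (record "delay")        -- stand-in for threadDelay
    & sequence_
  reverse <$> readIORef logRef

main :: IO ()
main = staggeredTrace ["a", "b", "c"] >>= mapM_ putStrLn
-- prints: spawn a, delay, spawn b, delay, spawn c (one per line)
```

Because intersperse only inserts its element between existing elements, the last action is never followed by a pointless delay, and an empty or single-element list incurs no delay at all.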

And finally, we wrap this helper with happyEyeballs, which accepts a list of actions, and returns the result of the first action to succeed, or Nothing if all actions fail. As soon as a result is available, the enclosing Ki.scoped block exits, which terminates any actions that are still running. Note that in a real implementation, we may want to consider what to do if an action throws an exception. Here, we trust each action to signal failure by returning Nothing.

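happyEyeballs :: [IO (Maybe a)] -> IO (Maybe a)
happyEyeballs actions = do
  -- Holds the final answer: the first successful result, or Nothing.
  resultVar <- newEmptyMVar

  -- Run one action and publish its result only if it succeeded
  -- and no other worker has published a result already.
  let worker action = do
        result <- action
        when (isJust result) do
          _ <- tryPutMVar resultVar result
          pure ()

  Ki.scoped \scope -> do
    _ <-
      Ki.fork scope do
        staggeredSpawner (map worker actions)
        -- Every worker has returned; if none succeeded, report failure.
        tryPutMVar resultVar Nothing
    -- Wait for the first result; leaving the scope kills threads still running.
    takeMVar resultVar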
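
The note above about exceptions could be handled in several ways. As one possible sketch, each action can be wrapped so that a synchronous IOException is reported as Nothing before it is passed to happyEyeballs. The helper name ioFailureToNothing is hypothetical and not part of ki, and catching only IOException is an assumption about which failures should count as "this attempt failed". It deliberately does not catch SomeException, so the asynchronous exception used to terminate a thread when its scope closes is not swallowed.

```haskell
import Control.Exception (IOException, try)

-- Hypothetical helper: treat an IOException thrown by the action as an
-- ordinary failure (Nothing). Any other exception still propagates.
ioFailureToNothing :: IO (Maybe a) -> IO (Maybe a)
ioFailureToNothing action = do
  outcome <- try action
  pure $ case outcome of
    Left ioErr -> const Nothing (ioErr :: IOException)
    Right result -> result

main :: IO ()
main = do
  failed <- ioFailureToNothing (ioError (userError "connection refused"))
  print (failed :: Maybe Int) -- prints: Nothing
  ok <- ioFailureToNothing (pure (Just (42 :: Int)))
  print ok -- prints: Just 42
```

With such a wrapper, a call might look like happyEyeballs (map ioFailureToNothing actions), leaving the rest of the implementation unchanged.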

Changes

[1.0.1.0] - 2023-04-03

  • Change #25: Attempting to fork a thread in a closing scope now acts as if it were a child being terminated due to the scope closing. Previously, attempting to fork a thread in a closing scope would throw a runtime exception like error "ki: scope closed".
  • Change #27: Calling awaitAll on a closed scope now returns () instead of blocking forever.

[1.0.0.2] - 2023-01-25

  • Bugfix #20: Previously, a child thread could deadlock when attempting to propagate an exception to its parent.

[1.0.0.1] - 2022-08-14

  • Compat: support GHC 9.4.1

[1.0.0] - 2022-06-30

  • Breaking: Remove Context type, Ki.Implicit module, and the ability to soft-cancel a Scope.

  • Breaking: Remove Duration type and its associated API, including waitFor and awaitFor.

  • Breaking: Remove Ki.Internal module.

  • Breaking: Generalize async to forkTry.

  • Breaking: Generalize forkWithUnmask to forkWith.

  • Breaking: Make fork_ take an IO Void rather than an IO ().

  • Breaking: Make fork create an unmasked thread, rather than inherit the parent’s masking state.

  • Breaking: Rename waitSTM to awaitAll (replacing the old wait in IO).

  • Change: Make scoped kill threads in the order they were created.

  • Bugfix: Fix small memory leak related to closing a scope.

  • Bugfix: Fix subtle bug related to GHC’s treatment of deadlocked threads.

  • Bugfix: Make async (now forkTry) propagate async exceptions.

  • Bugfix: Make scoped safe to run with asynchronous exceptions masked.

  • Bugfix: Propagate exceptions to creator of scope, not creator of thread.

  • Performance: Use atomic fetch-and-add rather than a TVar to track internal child thread ids.

[0.2.0] - 2020-12-17

  • Breaking: Remove ThreadFailed exception wrapper.
  • Breaking: Rename cancelScope to cancel.

[0.1.0.1] - 2020-11-30

  • Misc: Replace AtomicCounter with Int to drop the atomic-primops dependency.

  • Bounds: Lower cabal-version from 3.0 to 2.2 because stack cannot parse 3.0.

[0.1.0] - 2020-11-11

  • Initial release.