BSD-3-Clause licensed by Alexey Kuleshevich
Maintained by [email protected]
This version can be pinned in stack with: scheduler-2.0.0.1@sha256:3e36c5a50536d20f03320defd64e3d3adb6309b471ae048efda7df9d54361308,2556

scheduler

This is a work-stealing scheduler, which is very useful for task parallelization.

Whenever you have many actions you’d like to perform in parallel, but would only like to use a few threads to do the actual computation, this package is for you.

Join the chat at https://gitter.im/haskell-massiv/Lobby

QuickStart

A few examples in order to get up and running quickly.

Schedule simple actions

Work scheduling that performs some side effects and discards the results:

import Control.Scheduler

interleaveFooBar :: IO ()
interleaveFooBar = do
  withScheduler_ (ParN 2) $ \ scheduler -> do
    putStrLn "Scheduling 1st job"
    scheduleWork scheduler (putStr "foo")
    putStrLn "Scheduling 2nd job"
    scheduleWork scheduler (putStr "bar")
    putStrLn "Awaiting for jobs to be executed:"
  putStrLn "\nDone"

In the example above, two workers are created to handle the only two jobs that have been scheduled. Printing with putStr is not thread safe, so the output of the above function is likely to be interleaved:

λ> interleaveFooBar
Scheduling 1st job
Scheduling 2nd job
Awaiting for jobs to be executed:
foboar
Done

It is important to note that the scheduler starts executing scheduled jobs only when the inner action supplied to withScheduler_ exits.

Keeping the results of computation

Another common scenario is to schedule some jobs that produce useful results. In the example below four workers will be spawned. Because of ParOn, each of the workers will be pinned to a particular core.

scheduleSums :: IO [Int]
scheduleSums =
  withScheduler (ParOn [1..4]) $ \ scheduler -> do
    scheduleWork scheduler $ pure (10 + 1)
    scheduleWork scheduler $ pure (20 + 2)
    scheduleWork scheduler $ pure (30 + 3)
    scheduleWork scheduler $ pure (40 + 4)
    scheduleWork scheduler $ pure (50 + 5)

Despite the fact that the sums are computed in parallel, the results of the computation will appear in the same order in which they were scheduled:

λ> scheduleSums
[11,22,33,44,55]
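
The ordering guarantee is easier to see when jobs finish in a different order than they were scheduled. Below is a minimal sketch, assuming the Control.Scheduler module of this package; the name orderedResults and the delays are made up for illustration. The job scheduled first sleeps the longest, yet the results are still expected in scheduling order.

```haskell
import Control.Concurrent (threadDelay)
import Control.Monad (forM_)
import Control.Scheduler

-- | Hypothetical helper: schedule five jobs where the earliest scheduled
-- job sleeps the longest. Results are collected in scheduling order,
-- not in completion order.
orderedResults :: IO [Int]
orderedResults =
  withScheduler (ParN 4) $ \scheduler ->
    forM_ [5, 4, 3, 2, 1] $ \n ->
      scheduleWork scheduler $ do
        threadDelay (n * 1000) -- sleep for n milliseconds
        pure n
```

Calling orderedResults in GHCi should give back the numbers in the order 5, 4, 3, 2, 1, even though the job returning 1 is likely to finish first.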

Identify workers doing the work

Since version scheduler-1.2.0 it is possible to identify which worker is doing the job. This is especially useful for resources that are tied to a particular worker and should not be shared between separate threads.

λ> let scheduleId = (`scheduleWorkId` (\ i -> threadDelay 100000 >> pure i))
λ> withScheduler (ParOn [4,7,5]) $ \s -> scheduleId s >> scheduleId s >> scheduleId s
[WorkerId {getWorkerId = 0},WorkerId {getWorkerId = 1},WorkerId {getWorkerId = 2}]
λ> withScheduler (ParN 3) $ \s -> scheduleId s >> scheduleId s >> scheduleId s
[WorkerId {getWorkerId = 1},WorkerId {getWorkerId = 2},WorkerId {getWorkerId = 0}]
λ> withScheduler (ParN 3) $ \s -> scheduleId s >> scheduleId s >> scheduleId s
[WorkerId {getWorkerId = 0},WorkerId {getWorkerId = 1},WorkerId {getWorkerId = 2}]
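
One way to use worker ids for per-worker resources is sketched below. It assumes that ids are in the range from 0 to the number of workers minus one, as in the output above, and that at least one worker is requested; jobsPerWorker is a hypothetical helper, not part of the library. Each worker only ever touches the counter at its own index, so the counters themselves need no locking.

```haskell
import Control.Monad (forM_, replicateM)
import Control.Scheduler
import Data.IORef (modifyIORef', newIORef, readIORef)

-- | Hypothetical helper: count how many jobs each worker has executed.
-- Assumes nWorkers >= 1 and worker ids in the range [0, nWorkers).
jobsPerWorker :: Int -> Int -> IO [Int]
jobsPerWorker nWorkers nJobs = do
  -- One private counter per worker, indexed by worker id.
  counters <- replicateM nWorkers (newIORef 0)
  withScheduler_ (ParN (fromIntegral nWorkers)) $ \scheduler ->
    forM_ [1 .. nJobs] $ \_ ->
      scheduleWorkId scheduler $ \wid ->
        modifyIORef' (counters !! getWorkerId wid) (+ 1)
  mapM readIORef counters
```

How the jobs are split between the workers will vary from run to run, but the counters should always add up to the number of scheduled jobs.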

Exceptions

Whenever any of the scheduled jobs throws an exception, all of the workers will be killed and the exception will be re-thrown in the scheduling thread:

infiniteJobs :: IO ()
infiniteJobs = do
  withScheduler_ (ParN 5) $ \ scheduler -> do
    scheduleWork scheduler $ putStrLn $ repeat 'a'
    scheduleWork scheduler $ putStrLn $ repeat 'b'
    scheduleWork scheduler $ putStrLn $ repeat 'c'
    scheduleWork scheduler $ pure (4 `div` (0 :: Int))
    scheduleWork scheduler $ putStrLn $ repeat 'd'
  putStrLn "\nDone"

Note that if there were no exception, printing would never stop.

λ> infiniteJobs
aaaaaaaaabcdd*** Exception: divide by zero
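
Since the exception surfaces in the scheduling thread, it can be handled there with the usual tools from Control.Exception. A minimal sketch, assuming the Control.Scheduler module of this package; failingJobs is a hypothetical name, and the exception is thrown explicitly with throwIO so that it is raised inside the job itself.

```haskell
import Control.Exception (ArithException (DivideByZero), SomeException, throwIO, try)
import Control.Scheduler

-- | Hypothetical helper: run a few jobs, one of which throws, and capture
-- the exception in the thread that called the scheduler.
failingJobs :: IO (Either SomeException ())
failingJobs =
  try $
    withScheduler_ (ParN 2) $ \scheduler -> do
      scheduleWork scheduler $ pure ()
      scheduleWork scheduler $ throwIO DivideByZero
      scheduleWork scheduler $ pure ()
```

Here try catches SomeException on purpose, so the sketch makes no assumption about how the library wraps or unwraps exceptions coming from workers.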

Nested jobs

Scheduled actions can themselves schedule further actions indefinitely. That of course means that the order of the results produced is no longer deterministic, which is to be expected.

nestedJobs :: IO ()
nestedJobs = do
  withScheduler_ (ParN 5) $ \ scheduler -> do
    scheduleWork scheduler $ putStr $ replicate 10 'a'
    scheduleWork scheduler $ do
      putStr $ replicate 10 'b'
      scheduleWork scheduler $ do
        putStr $ replicate 10 'c'
        scheduleWork scheduler $ putStr $ replicate 10 'e'
      scheduleWork scheduler $ putStr $ replicate 10 'd'
    scheduleWork scheduler $ putStr $ replicate 10 'f'
  putStrLn "\nDone"

The order in which characters appear is important, since it directly relates to the actual order in which jobs are being scheduled and executed:

  • c, d and e characters will always appear after b
  • e will always appear after c

λ> nestedJobs
abbafbafbafbafbafbafbafbafbaffcdcdcdcdcdcdcdcdcdcdeeeeeeeeee
Done
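
Nested scheduling also works when results are kept. The sketch below assumes the Control.Scheduler module of this package; countNestedJobs is a hypothetical helper. Every job at a positive depth schedules two more jobs on the same scheduler, and each job returns 1, so the sum of all results is the total number of jobs regardless of the order in which they ran.

```haskell
import Control.Monad (replicateM_, when)
import Control.Scheduler

-- | Hypothetical helper: a job at depth d > 0 schedules two jobs at depth
-- d - 1 on the same scheduler. The total number of jobs for a starting
-- depth n is therefore 2^(n + 1) - 1.
countNestedJobs :: Int -> IO Int
countNestedJobs depth = do
  results <-
    withScheduler (ParN 4) $ \scheduler ->
      let spawn :: Int -> IO ()
          spawn d =
            scheduleWork scheduler $ do
              when (d > 0) $ replicateM_ 2 (spawn (d - 1))
              pure (1 :: Int)
       in spawn depth
  -- The order of results is not deterministic here, but their sum is.
  pure (sum results)
```

For a starting depth of 3 that is 1 + 2 + 4 + 8 jobs, so countNestedJobs 3 is expected to return 15.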

Nested parallelism

Nothing really prevents you from having a scheduler within a scheduler. Of course, having multiple schedulers at the same time seems like unnecessary overhead, which it is. But if you do have a use case for it, don’t let me stop you: it is OK to go that route.

nestedSchedulers :: IO ()
nestedSchedulers = do
  withScheduler_ (ParN 2) $ \ outerScheduler -> do
    scheduleWork outerScheduler $ putStr $ replicate 10 'a'
    scheduleWork outerScheduler $ do
      putStr $ replicate 10 'b'
      withScheduler_ (ParN 2) $ \ innerScheduler -> do
        scheduleWork innerScheduler $ do
          putStr $ replicate 10 'c'
          scheduleWork outerScheduler $ putStr $ replicate 10 'e'
        scheduleWork innerScheduler $ putStr $ replicate 10 'd'
    scheduleWork outerScheduler $ putStr $ replicate 10 'f'
  putStrLn "\nDone"

Note that the inner scheduler’s job schedules a job for the outer scheduler, which is a bit crazy, but totally safe.

λ> nestedSchedulers
aabababababababababbffffffffffcccccccdcdcdcdddededededeeeeee
Done

Single worker schedulers

If we only have one worker, then everything becomes sequential and deterministic. Consider the same example from before, but with the Seq computation strategy.

nestedSequentialSchedulers :: IO ()
nestedSequentialSchedulers = do
  withScheduler_ Seq $ \ outerScheduler -> do
    scheduleWork outerScheduler $ putStr $ replicate 10 'a'
    scheduleWork outerScheduler $ do
      putStr $ replicate 10 'b'
      withScheduler_ Seq $ \ innerScheduler -> do
        scheduleWork innerScheduler $ do
          putStr $ replicate 10 'c'
          scheduleWork outerScheduler $ putStr $ replicate 10 'e'
        scheduleWork innerScheduler $ putStr $ replicate 10 'd'
    scheduleWork outerScheduler $ putStr $ replicate 10 'f'
  putStrLn "\nDone"

No more interleaving, everything is done in the same order each time the function is invoked.

λ> nestedSequentialSchedulers
aaaaaaaaaabbbbbbbbbbccccccccccddddddddddffffffffffeeeeeeeeee
Done
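
The determinism can also be checked programmatically by recording the order in which jobs run. This is a minimal sketch, assuming the Control.Scheduler module of this package; executionOrder is a hypothetical helper. It uses atomicModifyIORef' so that the same function stays safe with a parallel strategy, where the recorded order may differ between runs.

```haskell
import Control.Monad (forM_)
import Control.Scheduler
import Data.IORef (atomicModifyIORef', newIORef, readIORef)

-- | Hypothetical helper: record the order in which jobs are actually
-- executed under the given computation strategy.
executionOrder :: Comp -> [Int] -> IO [Int]
executionOrder comp jobs = do
  ref <- newIORef []
  withScheduler_ comp $ \scheduler ->
    forM_ jobs $ \n ->
      -- Each job prepends its own label to the shared log.
      scheduleWork scheduler $
        atomicModifyIORef' ref (\xs -> (n : xs, ()))
  -- Labels were prepended, so reverse to get execution order.
  reverse <$> readIORef ref
```

With Seq the recorded order should match the scheduling order every time, for instance executionOrder Seq [1 .. 10]. With something like ParN 4 it is free to change between runs.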

Premature termination

It is possible to make all of the workers stop whatever they are doing and get either their progress thus far or simply return an element we were looking for.

For example, say we would like to find the 10th letter of the English alphabet in parallel using 8 threads. We schedule 26 tasks, and the first one to find the letter with that index terminates all the workers and returns the result:

λ> let f sch i c = scheduleWork sch $ if i == 10 then putChar '-' >> terminateWith sch c else threadDelay 1000 >> putChar c >> pure c
λ> withScheduler (ParN 8) (\ scheduler -> zipWithM (f scheduler) [1 :: Int ..] ['a'..'z'])
ab-dec"j"
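
The same idea can be written as a small reusable search function. A minimal sketch, assuming the Control.Scheduler module of this package; findFirst is a hypothetical name. As in the session above, the job that finds the element calls terminateWith, and the scheduler then returns a list that contains only that element.

```haskell
import Control.Monad (forM_)
import Control.Scheduler

-- | Hypothetical helper: look for the target among the candidates in
-- parallel. The job that finds it terminates the scheduler early.
findFirst :: Int -> [Int] -> IO [Int]
findFirst target candidates =
  withScheduler (ParN 4) $ \scheduler ->
    forM_ candidates $ \x ->
      scheduleWork scheduler $
        if x == target
          then terminateWith scheduler x -- stop all workers, keep only x
          else pure x
```

If the target is not among the candidates, no job terminates early and all of the candidates come back as ordinary results, so a caller would have to tell the two cases apart.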

Benchmarks

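It is always good to see some benchmarks. Below is a very simple comparison of scheduler with the alternatives labelled in the output below: unliftio, streamly, async, par, monad-par and base.

Similar functions are used for replicateM functionality from the corresponding libraries.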

The benchmarked functions sum (sum of the elements in a list) and fib (Fibonacci number) are pretty straightforward: we simply replicateM them many times or mapM the same functions over a list. Although scheduler is already pretty good at it, it does look like there might be some room for improvement. The pooled* functions from unliftio have the best performance, and I don’t think they can be made any faster for such workloads. However, they do not allow submitting more work during the computation, so nested parallelism that reuses the same workers is impossible; functionally they are inferior.
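
To give an idea of the kind of workload being measured, here is a rough sketch of the scheduler side of such a comparison, using replicateConcurrently and traverseConcurrently from Control.Scheduler. This is not the actual benchmark code: the fib implementation and the names replicateFib and mapFib are assumptions made for illustration.

```haskell
import Control.Scheduler

-- | Iterative Fibonacci for non-negative n, used here only as a small
-- CPU-bound placeholder workload.
fib :: Int -> Integer
fib n = go n 0 1
  where
    go :: Int -> Integer -> Integer -> Integer
    go 0 a _ = a
    go k a b = go (k - 1) b (a + b)

-- | Hypothetical helper: run the same computation many times in parallel.
-- The result is forced inside the job so the work happens on the worker.
replicateFib :: Comp -> Int -> Int -> IO [Integer]
replicateFib comp times n = replicateConcurrently comp times (pure $! fib n)

-- | Hypothetical helper: map the computation over a list in parallel.
mapFib :: Comp -> [Int] -> IO [Integer]
mapFib comp = traverseConcurrently comp (\n -> pure $! fib n)
```

Both helpers take the computation strategy as an argument, so the same code can be run with Seq for a baseline and with ParN or ParOn for the parallel version.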

Benchmarks can be reproduced with stack bench inside this repo.

scheduler-1.1.0: benchmarks
Running 1 benchmarks...
Benchmark scheduler: RUNNING...
benchmarking replicate/Fib(1000/10000)/scheduler/replicateConcurrently
time                 10.16 ms   (9.235 ms .. 11.09 ms)
                     0.954 R²   (0.899 R² .. 0.982 R²)
mean                 10.42 ms   (9.965 ms .. 11.26 ms)
std dev              1.732 ms   (982.5 μs .. 2.587 ms)
variance introduced by outliers: 76% (severely inflated)

benchmarking replicate/Fib(1000/10000)/unliftio/pooledReplicateConcurrently
time                 8.476 ms   (8.034 ms .. 8.867 ms)
                     0.986 R²   (0.972 R² .. 0.994 R²)
mean                 8.671 ms   (8.410 ms .. 9.090 ms)
std dev              909.4 μs   (634.5 μs .. 1.279 ms)
variance introduced by outliers: 58% (severely inflated)

benchmarking replicate/Fib(1000/10000)/streamly/replicateM
time                 11.90 ms   (11.18 ms .. 12.46 ms)
                     0.976 R²   (0.935 R² .. 0.994 R²)
mean                 12.72 ms   (12.24 ms .. 14.27 ms)
std dev              1.897 ms   (625.2 μs .. 3.881 ms)
variance introduced by outliers: 69% (severely inflated)

benchmarking replicate/Fib(1000/10000)/async/replicateConcurrently
time                 21.16 ms   (19.59 ms .. 22.38 ms)
                     0.982 R²   (0.953 R² .. 0.995 R²)
mean                 22.46 ms   (21.27 ms .. 24.73 ms)
std dev              3.644 ms   (1.499 ms .. 5.373 ms)
variance introduced by outliers: 69% (severely inflated)

benchmarking replicate/Fib(1000/10000)/monad-par/replicateM
time                 31.78 ms   (30.64 ms .. 32.87 ms)
                     0.994 R²   (0.986 R² .. 0.998 R²)
mean                 32.59 ms   (31.86 ms .. 33.98 ms)
std dev              2.290 ms   (1.296 ms .. 3.640 ms)
variance introduced by outliers: 28% (moderately inflated)

benchmarking replicate/Fib(1000/10000)/base/replicateM
time                 31.35 ms   (30.91 ms .. 31.71 ms)
                     0.999 R²   (0.998 R² .. 1.000 R²)
mean                 31.56 ms   (31.06 ms .. 32.64 ms)
std dev              1.440 ms   (674.1 μs .. 2.516 ms)
variance introduced by outliers: 16% (moderately inflated)

benchmarking replicate/Sum(1000/1000)/scheduler/replicateConcurrently
time                 17.10 ms   (16.25 ms .. 17.79 ms)
                     0.984 R²   (0.960 R² .. 0.995 R²)
mean                 17.12 ms   (16.57 ms .. 17.97 ms)
std dev              1.690 ms   (858.3 μs .. 2.444 ms)
variance introduced by outliers: 46% (moderately inflated)

benchmarking replicate/Sum(1000/1000)/unliftio/pooledReplicateConcurrently
time                 15.91 ms   (15.74 ms .. 16.06 ms)
                     0.999 R²   (0.999 R² .. 1.000 R²)
mean                 15.97 ms   (15.87 ms .. 16.15 ms)
std dev              301.2 μs   (198.0 μs .. 421.2 μs)

benchmarking replicate/Sum(1000/1000)/streamly/replicateM
time                 17.51 ms   (17.06 ms .. 17.96 ms)
                     0.997 R²   (0.995 R² .. 0.999 R²)
mean                 17.99 ms   (17.74 ms .. 18.64 ms)
std dev              865.5 μs   (599.3 μs .. 1.295 ms)
variance introduced by outliers: 17% (moderately inflated)

benchmarking replicate/Sum(1000/1000)/async/replicateConcurrently
time                 28.82 ms   (26.60 ms .. 31.82 ms)
                     0.972 R²   (0.948 R² .. 0.987 R²)
mean                 33.20 ms   (31.69 ms .. 34.13 ms)
std dev              2.548 ms   (1.706 ms .. 3.197 ms)
variance introduced by outliers: 28% (moderately inflated)

benchmarking replicate/Sum(1000/1000)/monad-par/replicateM
time                 58.84 ms   (55.71 ms .. 61.47 ms)
                     0.995 R²   (0.991 R² .. 0.999 R²)
mean                 64.34 ms   (62.70 ms .. 65.83 ms)
std dev              3.150 ms   (2.697 ms .. 3.721 ms)
variance introduced by outliers: 15% (moderately inflated)

benchmarking replicate/Sum(1000/1000)/base/replicateM
time                 56.26 ms   (55.82 ms .. 56.74 ms)
                     1.000 R²   (0.999 R² .. 1.000 R²)
mean                 56.70 ms   (56.47 ms .. 57.21 ms)
std dev              618.6 μs   (331.6 μs .. 969.4 μs)

benchmarking map/Fib(2000)/scheduler/traverseConcurrently
time                 12.23 ms   (11.51 ms .. 12.90 ms)
                     0.968 R²   (0.933 R² .. 0.989 R²)
mean                 13.65 ms   (12.87 ms .. 14.50 ms)
std dev              1.985 ms   (1.455 ms .. 2.696 ms)
variance introduced by outliers: 68% (severely inflated)

benchmarking map/Fib(2000)/unliftio/pooledTraverseConcurrently
time                 6.843 ms   (6.435 ms .. 7.201 ms)
                     0.973 R²   (0.939 R² .. 0.992 R²)
mean                 7.223 ms   (6.946 ms .. 8.280 ms)
std dev              1.240 ms   (537.0 μs .. 2.535 ms)
variance introduced by outliers: 82% (severely inflated)

benchmarking map/Fib(2000)/streamly/mapM
time                 21.89 ms   (21.01 ms .. 23.13 ms)
                     0.991 R²   (0.985 R² .. 0.996 R²)
mean                 21.44 ms   (20.16 ms .. 24.26 ms)
std dev              4.518 ms   (1.773 ms .. 8.475 ms)
variance introduced by outliers: 80% (severely inflated)

benchmarking map/Fib(2000)/async/mapConcurrently
time                 37.37 ms   (32.21 ms .. 41.57 ms)
                     0.966 R²   (0.936 R² .. 0.993 R²)
mean                 41.32 ms   (38.94 ms .. 47.96 ms)
std dev              7.065 ms   (2.678 ms .. 12.36 ms)
variance introduced by outliers: 65% (severely inflated)

benchmarking map/Fib(2000)/par/mapM
time                 7.381 ms   (6.934 ms .. 7.848 ms)
                     0.975 R²   (0.953 R² .. 0.987 R²)
mean                 7.519 ms   (7.256 ms .. 8.123 ms)
std dev              1.178 ms   (677.4 μs .. 2.015 ms)
variance introduced by outliers: 78% (severely inflated)

benchmarking map/Fib(2000)/monad-par/mapM
time                 24.57 ms   (23.52 ms .. 25.30 ms)
                     0.994 R²   (0.986 R² .. 0.998 R²)
mean                 26.15 ms   (25.48 ms .. 27.60 ms)
std dev              2.016 ms   (1.267 ms .. 3.175 ms)
variance introduced by outliers: 30% (moderately inflated)

benchmarking map/Fib(2000)/base/mapM
time                 23.79 ms   (22.66 ms .. 24.75 ms)
                     0.993 R²   (0.987 R² .. 0.999 R²)
mean                 24.66 ms   (23.96 ms .. 26.84 ms)
std dev              2.532 ms   (891.4 μs .. 4.701 ms)
variance introduced by outliers: 46% (moderately inflated)

benchmarking map/Sum(2000)/scheduler/traverseConcurrently
time                 36.13 ms   (35.31 ms .. 36.92 ms)
                     0.998 R²   (0.996 R² .. 0.999 R²)
mean                 36.93 ms   (36.03 ms .. 38.43 ms)
std dev              2.456 ms   (1.045 ms .. 4.098 ms)
variance introduced by outliers: 24% (moderately inflated)

benchmarking map/Sum(2000)/unliftio/pooledTraverseConcurrently
time                 31.99 ms   (31.60 ms .. 32.39 ms)
                     0.998 R²   (0.993 R² .. 1.000 R²)
mean                 32.87 ms   (32.43 ms .. 33.90 ms)
std dev              1.454 ms   (913.4 μs .. 2.241 ms)
variance introduced by outliers: 12% (moderately inflated)

benchmarking map/Sum(2000)/streamly/mapM
time                 39.56 ms   (36.74 ms .. 43.24 ms)
                     0.988 R²   (0.977 R² .. 0.999 R²)
mean                 43.58 ms   (42.12 ms .. 44.80 ms)
std dev              2.595 ms   (1.621 ms .. 3.785 ms)
variance introduced by outliers: 20% (moderately inflated)

benchmarking map/Sum(2000)/async/mapConcurrently
time                 48.82 ms   (45.75 ms .. 51.32 ms)
                     0.992 R²   (0.983 R² .. 0.998 R²)
mean                 54.91 ms   (52.92 ms .. 57.00 ms)
std dev              3.800 ms   (2.823 ms .. 5.142 ms)
variance introduced by outliers: 22% (moderately inflated)

benchmarking map/Sum(2000)/par/mapM
time                 57.28 ms   (56.62 ms .. 58.04 ms)
                     1.000 R²   (0.999 R² .. 1.000 R²)
mean                 55.69 ms   (54.85 ms .. 56.29 ms)
std dev              1.282 ms   (994.2 μs .. 1.599 ms)

benchmarking map/Sum(2000)/monad-par/mapM
time                 237.4 ms   (198.5 ms .. 264.7 ms)
                     0.988 R²   (0.966 R² .. 1.000 R²)
mean                 257.7 ms   (242.2 ms .. 263.0 ms)
std dev              11.57 ms   (384.2 μs .. 14.53 ms)
variance introduced by outliers: 16% (moderately inflated)

benchmarking map/Sum(2000)/base/mapM
time                 147.1 ms   (141.4 ms .. 150.9 ms)
                     0.999 R²   (0.996 R² .. 1.000 R²)
mean                 152.6 ms   (150.1 ms .. 155.1 ms)
std dev              3.676 ms   (2.725 ms .. 5.066 ms)
variance introduced by outliers: 12% (moderately inflated)

Beware of Demons

Any sort of concurrency primitives such as mutual exclusion, semaphores, etc. can easily lead to deadlocks, starvation and other common problems. Try to avoid them and be careful if you do end up using them.

Changes

2.0.0

  • Switch type parameter of Scheduler from monad m to state token s, which also means that constraints on many functions got tighter (i.e. MonadPrim, MonadPrimBase)
  • Remove m type parameter for SchedulerWS, since it can only work in an IO-like monad anyway.
  • Switch type parameter of Batch from monad m to state token s.
  • Swap order of arguments for replicateWork for consistency
  • Add replicateWork_ that is slightly more efficient than replicateWork

1.5.0

Despite the major part of the version being bumped, this release does not include any breaking changes, only improvements and additions.

  • Addition of a batch concept with BatchId which:

    • makes it possible for a Scheduler to be “resumable” with the help of waitForBatch, waitForBatch_ and waitForBatchR
    • allows cancelling batches prematurely while keeping the results and not terminating the scheduler itself. This can be done with the new functions cancelBatch and cancelBatchWith. To check the batch status, the getCurrentBatchId and hasBatchFinished functions can be used.
  • Addition of GlobalScheduler that can be reused throughout the codebase, thus reducing initialization overhead.

1.4.2

  • Add withTrivialScheduler
  • Add Results data type as well as corresponding functions:
    • withSchedulerR
    • withSchedulerWSR
    • withTrivialSchedulerR

1.4.1

  • Add functions: replicateWork

1.4.0

  • Worker id has been promoted from Int to a newtype wrapper WorkerId.
  • Addition of SchedulerWS and WorkerStates data types, as well as the related MutexException
  • Functions that came along with stateful worker threads:
    • initWorkerStates
    • workerStatesComp
    • scheduleWorkState
    • scheduleWorkState_
    • withSchedulerWS
    • withSchedulerWS_
    • unwrapSchedulerWS
  • Made internal modules accessible, but invisible.

1.3.0

  • Make sure internal Scheduler accessor functions are no longer exported, since they only cause breakage.
  • Make sure the number of capabilities does not change throughout the program execution, as far as scheduler is concerned.

1.2.0

  • Addition of scheduleWorkId and scheduleWorkId_

1.1.0

  • Add functions: replicateConcurrently and replicateConcurrently_
  • Made traverseConcurrently_ lazy, thus making it possible to apply it to infinite lists and other such foldables.
  • Fix Monoid instance for Comp
  • Addition of Par' pattern

1.0.0

Initial release.