hubFS: THE place for F#


Hell Is Other Languages

Concurrency on a single thread

Introduction

F# has the async computation expression for writing parallel programs. Async achieves concurrency by using the CLR ThreadPool to queue work items for each logical thread created in an async expression (e.g., through Async.spawn or Async.parallel). Using the thread pool is good because it reuses existing threads instead of creating and destroying new ones. However, the thread pool has limitations when a large number of threads are created: it may deadlock (if queued work items can't execute because the running threads are blocked), use more memory, and perform worse because of excessive context switching.

Suppose we want to write a parallel application that creates a large number of concurrently executing threads. Threads are good because they present a sequential control flow model that is easy to understand and reason about, and having a large number of threads facilitates better resource utilization. To do this we would like to implement our own scalable concurrent threading system for our application that uses virtual threads not backed by the CLR thread pool or OS threads. To enable scalability, creating a virtual thread in the system will be cheap (about the cost of a newobj instruction) and will not consume OS-level resources; the same goes for synchronization primitives.

The implementation of this system will be based on the standard F# async: it will be a computation expression based on the continuation monad and the users' code will look similar and be functionally the same. However operationally it will be different from the standard async because it will use a fixed number of threads instead of the thread pool. (For version 1 in this post it will use only 1 thread, but a version 2 might extend that to 1 thread per CPU.)

You can download the F# code for this post here.

Scheduler

At the core of this new async monad is the thread scheduler. The scheduler takes multiple threads of execution and multiplexes them together to give the illusion of concurrency. The scheduler is entirely managed code and uses continuations to manipulate control flow. The scheduler's state includes the following fields:

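type Scheduler () =
  // continuations of the threads that are ready to run
  let ready = LinkedList<unit -> unit> ()
  // number of logical threads waiting on asynchronous I/O
  let pending = ref 0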

The ready queue is a list of continuations that represent threads ready to run. It is a LinkedList from the BCL, which allows fast insertion and removal at both the beginning and the end of the list. The basic operation of the one real OS thread in the system is the loop below.

let rec loop () =
  if ready.Count > 0 then
    // take the first continuation off the ready queue and run it
    let k = ready.First.Value
    ready.RemoveFirst ()
    k ()
    loop ()

It checks to see if the ready queue is non-empty. If so, it removes the first continuation, executes it (which may modify the ready queue) and repeats. However asynchronous I/O operations complicate this simple algorithm.
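
To see the loop multiplex work, here is a minimal self-contained sketch. The names trace, enqueue, runLoop and worker are made up for this illustration and are not part of the download; each "thread" performs one step and then re-queues the rest of its work.

```fsharp
open System.Collections.Generic

// Hypothetical names for this sketch only.
let ready = LinkedList<unit -> unit>()
let trace = List<string>()

let enqueue (k : unit -> unit) = ready.AddLast(k) |> ignore

// The loop from the text: run continuations until the ready queue is empty.
let rec runLoop () =
    if ready.Count > 0 then
        let k = ready.First.Value
        ready.RemoveFirst()
        k ()
        runLoop ()

// One step of work, then the remainder goes to the back of the queue.
let rec worker (name : string) (n : int) () : unit =
    if n > 0 then
        trace.Add(sprintf "%s%d" name n)
        enqueue (worker name (n - 1))

enqueue (worker "a" 2)
enqueue (worker "b" 2)
runLoop ()
// trace now holds a2, b2, a1, b1: the two workers take turns.
```

Because a continuation only gives up the real thread when it returns, the interleaving is entirely cooperative.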

Asynchronous I/O

An I/O operation (like reading or writing to a disk or network) can take a long time, and during this time the CPU is idle. If we performed a blocking I/O operation on our one real thread then all virtual threads would be suspended, which is not good. Therefore it is imperative that the programmer using this async monad never uses blocking operations and always uses non-blocking I/O. Win32 supports non-blocking I/O, which it calls 'overlapped' I/O. The basic principle is that a thread makes a system call to start an I/O operation. The system call returns immediately, before the operation has completed. The operation is performed by the OS and not on the process's thread. When the operation is complete, a callback is executed on one of the threads in the process. The key point here is that the non-blocking I/O operation didn't create or consume an OS thread, so by using it we can stick to our one-thread budget. Non-blocking I/O is exposed in the BCL through Begin/End methods on various I/O objects (FileStream, etc.), and on Windows these methods are probably implemented using Win32 overlapped I/O.

The pending field in the state of the scheduler contains the number of logical threads currently in an asynchronous I/O operation. So when ready.Count = 0 and !pending = 0 the computation has terminated.

The Monad

So let's get started looking at the code. This will also serve as a guide to writing (not-necessarily-concurrency-related) computation expression builders in F#. The monad used in this code is based on the papers by Claessen and by Li and Zdancewic.

The type of the monad used here is

type Async<'a> = Async of (Scheduler * ('a -> unit) * (exn -> unit) -> unit)

In general all F# monad types need to be function types to achieve the delayed evaluation implicit in a computation expression block. Furthermore you'd typically want to wrap this type in a discriminated union tag to aid type inference. Hence the general form of a monad type is

type M<'a> = M of (… -> …)

In the Async type the function takes a triple of arguments: the state of the scheduler of type Scheduler, a success continuation of type ('a -> unit) and an exception continuation of type (exn -> unit).

The Scheduler type was mentioned above. The Scheduler state supports the following operations.

member QueueFirst : (unit -> unit) -> Lock<unit>
member QueueLast : (unit -> unit) -> Lock<unit>
member Dequeue : unit -> Lock<(unit -> unit) option>
member IncrPending : unit -> Lock<unit>
member DecrPending : unit -> Lock<unit>
member Run : Lock<'a> -> 'a

The Scheduler state will be accessed by two threads, the single application thread and the CLR IO completion thread, so access to the data structure needs to be synchronized. This is done using the lock monad from my previous blog post.

Return & Bind

We now implement the standard return and bind operations for a continuation monad.

let mreturn x = Async (fun (sched, sk, ek) -> sk x)

let apply (Async m) (sched, sk, ek) = m (sched, sk, ek)

let bind m f =
  Async (fun (sched, sk, ek) ->
    apply m (sched, (fun x -> apply (f x) (sched, sk, ek)), ek))

The implementation of return is fine; however, there is a problem with the implementation of bind because it does not take into account the possibility of exceptions. If the function f throws an exception then this must be caught somewhere and execution diverted to the exception continuation.

type Result<'a> = Success of 'a | Error of exn

let tryApply f x = try Success (f x) with e -> Error e

let apply2 f x (sched, sk, ek) =
  match tryApply f x with
    | Success m -> apply m (sched, sk, ek)
    | Error e -> ek e

let bind m f =
  Async (fun (sched, sk, ek) ->
    apply m (sched, (fun x -> apply2 f x (sched, sk, ek)), ek))

The tryApply function will always be used whenever applying an "outside" function (a function defined outside the async library) so that any exceptions thrown by this function can be reified as values and control flow handled appropriately.
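
The effect of the exception-aware bind can be checked in isolation. The sketch below is a simplification, not the code from the download: it drops the Scheduler argument so that it stands alone, uses the built-in Result type in place of the Result type defined above, and the names Cont, report, okCase and errCase are made up for the illustration.

```fsharp
// Simplified stand-in for the post's Async type: no Scheduler argument.
// Cont is a hypothetical name used only in this sketch.
type Cont<'a> = Cont of (('a -> unit) * (exn -> unit) -> unit)

let apply (Cont m) (sk, ek) = m (sk, ek)
let mreturn x = Cont (fun (sk, _) -> sk x)

let bind m f =
    Cont (fun (sk, ek) ->
        let next x =
            // Reify an exception thrown by the outside function f as a value
            // before choosing which continuation to follow.
            let r = try Ok (f x) with e -> Error e
            match r with
            | Ok m2 -> apply m2 (sk, ek)
            | Error e -> ek e
        apply m (next, ek))

// Record which continuation was taken.
let outcome = ref "none"
let report m =
    apply m ((fun (v : int) -> outcome.Value <- sprintf "ok %d" v),
             (fun (e : exn) -> outcome.Value <- "error: " + e.Message))

report (bind (mreturn 1) (fun x -> mreturn (x + 1)))
let okCase = outcome.Value      // "ok 2"

report (bind (mreturn 10) (fun x -> if x > 5 then failwith "too big" else mreturn x))
let errCase = outcome.Value     // "error: too big"
```

Note that only the call to f sits inside the try: an exception raised later by a continuation is not caught here and then fed to ek a second time.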

Mzero & Mplus

Two primitive operations for the async monad are stop and fork. The stop operation terminates the current thread. The fork operation duplicates the current thread and returns different values on each. For example, if there is a thread executing the expression "bind (fork m1 m2) f" then after the evaluation of fork there will be two threads executing: one executing the expression "bind m1 f" and the other executing "bind m2 f". Fork is the primitive method for creating new threads in the async monad. The implementations of stop and fork are as follows.

let stop =
  Async (fun (sched, sk, ek) -> ())

let fork m1 m2 =
  Async (fun (sched, sk, ek) ->
    sched.QueueLast (fun () -> apply m2 (sched, sk, ek)) |> sched.Run
    apply m1 (sched, sk, ek))

Unfortunately calling stop stops the current thread without running any finally handlers that are in scope. Ideally there should be a third continuation (called the finally continuation) that is executed by stop, but we'll ignore this issue for now. The fork operation works by queuing the second branch of the fork at the end of the scheduler's ready queue and executing the first branch immediately. Stop and fork form the mzero and mplus operations of a MonadPlus type class instance for the async monad.
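
The "bind (fork m1 m2) f" example can be run with a cut-down model. In this sketch the scheduler is reduced to a bare ready queue with no locking and no pending count, and Sched, Conc, A, both and one are names invented for the illustration. The run function here keeps results in completion order, unlike the run defined later in the post.

```fsharp
open System.Collections.Generic

// Sketch only: a bare ready queue stands in for the Scheduler.
type Sched = LinkedList<unit -> unit>
type Conc<'a> = A of (Sched * ('a -> unit) * (exn -> unit) -> unit)

let apply (A m) args = m args
let mreturn x = A (fun (_, sk, _) -> sk x)
let bind m f =
    A (fun (sched, sk, ek) ->
        apply m (sched, (fun x -> apply (f x) (sched, sk, ek)), ek))

// stop never calls either continuation, so the thread simply ends.
let stop<'a> : Conc<'a> = A (fun _ -> ())

// fork queues the second branch and runs the first one immediately.
let fork m1 m2 =
    A (fun (sched : Sched, sk, ek) ->
        let second : unit -> unit = fun () -> apply m2 (sched, sk, ek)
        sched.AddLast(second) |> ignore
        apply m1 (sched, sk, ek))

let run m =
    let sched = LinkedList<unit -> unit>()
    let results = List<Result<_, exn>>()
    let init : unit -> unit =
        fun () -> apply m (sched, (fun x -> results.Add(Ok x)), (fun e -> results.Add(Error e)))
    sched.AddLast(init) |> ignore
    while sched.Count > 0 do
        let k = sched.First.Value
        sched.RemoveFirst()
        k ()
    List.ofSeq results

// Two threads continue into the same function after the fork.
let both = run (bind (fork (mreturn 1) (mreturn 2)) (fun x -> mreturn (x * 10)))
// both = [Ok 10; Ok 20]

// The first branch stops, so only the second produces a result.
let one = run (bind (fork stop (mreturn 2)) (fun x -> mreturn (x * 10)))
// one = [Ok 20]
```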

Yield

The yield operation makes the current thread give up execution and re-queue itself at the end of the ready queue.

let myield =
  Async (fun (sched, sk, ek) ->
     sched.QueueLast sk |> sched.Run)
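
A small experiment shows what yield buys us. As before this is a sketch with invented names (Sched, Conc, A, trace, say, worker) and a lock-free ready queue in place of the real Scheduler. Two workers each log, yield, and log again.

```fsharp
open System.Collections.Generic

// Sketch only: a bare ready queue stands in for the Scheduler.
type Sched = LinkedList<unit -> unit>
type Conc<'a> = A of (Sched * ('a -> unit) * (exn -> unit) -> unit)

let apply (A m) args = m args
let bind m f =
    A (fun (sched, sk, ek) ->
        apply m (sched, (fun x -> apply (f x) (sched, sk, ek)), ek))
let fork m1 m2 =
    A (fun (sched : Sched, sk, ek) ->
        let second : unit -> unit = fun () -> apply m2 (sched, sk, ek)
        sched.AddLast(second) |> ignore
        apply m1 (sched, sk, ek))

// Re-queue the rest of this thread at the back of the ready queue.
let myield : Conc<unit> =
    A (fun (sched : Sched, sk : unit -> unit, _) -> sched.AddLast(sk) |> ignore)

let trace = List<string>()
let say (s : string) : Conc<unit> =
    A (fun (_, sk, _) ->
        trace.Add(s)
        sk ())

let run (m : Conc<unit>) =
    let sched = LinkedList<unit -> unit>()
    let init : unit -> unit = fun () -> apply m (sched, ignore, (fun e -> raise e))
    sched.AddLast(init) |> ignore
    while sched.Count > 0 do
        let k = sched.First.Value
        sched.RemoveFirst()
        k ()

let worker (name : string) =
    bind (say (name + "1")) (fun () ->
        bind myield (fun () -> say (name + "2")))

run (fork (worker "a") (worker "b"))
// trace now holds a1, b1, a2, b2
```

Without the yield, worker "a" would run to completion before "b" started, because nothing else hands control back to the scheduler.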

Callback

The callback operation is used to invoke asynchronous IO operations. In the CLR base class library, asynchronous IO operations are defined by two methods: a Begin method and an End method (e.g. BeginRead and EndRead for reading from a stream). These two methods are passed to the callback function below. This function first increments the scheduler's pending count to mark the start of a new asynchronous IO operation. It then calls the Begin method, passing the function cb as the callback. If the Begin method was successful then this thread yields control to the scheduler; otherwise the pending count is decremented and the exception continuation invoked. When the system has performed the IO operation it calls cb on some special thread (not the scheduler's thread). Therefore cb cannot continue execution of the continuation monad; instead it must marshal a continuation back onto the scheduler's ready queue. So it first calls the End method to get the result of the IO operation. If the IO operation failed for some reason then an exception will be thrown in this call. Depending on the success of End, either the success or exception continuation is chosen and queued at the front of the ready queue. The pending count is decremented and the lock is pulsed to wake up the scheduler thread, which may be blocked waiting for new continuations to execute.

// callback : (AsyncCallback -> IAsyncResult) -> (IAsyncResult -> 'a) -> Async<'a>
let callback (beginFunc : _ -> IAsyncResult) endFunc =
  Async (fun (sched, sk, ek) ->
    let cb (ar : IAsyncResult) =
      let k =
        match tryApply endFunc ar with
        | Success x -> fun () -> sk x
        | Error e -> fun () -> ek e
      lock { do! sched.QueueFirst k
             do! sched.DecrPending ()
             return! Lock.pulseAll } |> sched.Run
    sched.IncrPending () |> sched.Run
    match tryApply beginFunc (AsyncCallback(cb)) with
      | Success ar -> ()         
      | Error e -> sched.DecrPending () |> sched.Run
                   ek e)
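
The hand-off between the IO completion thread and the scheduler thread is the subtle part, so here is a self-contained sketch of it. It is written under stated assumptions rather than taken from the download: a plain Monitor lock replaces the Lock monad, the built-in Result type replaces tryApply, CompleteIo is a hypothetical operation that combines QueueFirst, DecrPending and the pulse, and Sched, Conc and A are invented names. The IO operation is a BeginRead/EndRead pair on a MemoryStream.

```fsharp
open System
open System.Collections.Generic
open System.IO
open System.Threading

// Sketch only: a Monitor-protected queue stands in for the Scheduler.
type Sched() =
    let ready = LinkedList<unit -> unit>()
    let mutable pending = 0
    member this.QueueLast(k : unit -> unit) =
        lock ready (fun () -> ready.AddLast(k) |> ignore)
    member this.IncrPending() = lock ready (fun () -> pending <- pending + 1)
    member this.DecrPending() = lock ready (fun () -> pending <- pending - 1)
    // Called on the completion thread: queue k, mark the IO done, wake the scheduler.
    member this.CompleteIo(k : unit -> unit) =
        lock ready (fun () ->
            ready.AddFirst(k) |> ignore
            pending <- pending - 1
            Monitor.PulseAll(ready))
    // Blocks while nothing is ready but IO is outstanding; None means finished.
    member this.Dequeue() =
        lock ready (fun () ->
            while ready.Count = 0 && pending > 0 do
                Monitor.Wait(ready) |> ignore
            if ready.Count = 0 then None
            else
                let k = ready.First.Value
                ready.RemoveFirst()
                Some k)

type Conc<'a> = A of (Sched * ('a -> unit) * (exn -> unit) -> unit)
let apply (A m) args = m args

let callback (beginFunc : AsyncCallback -> IAsyncResult) (endFunc : IAsyncResult -> 'a) : Conc<'a> =
    A (fun (sched : Sched, sk : 'a -> unit, ek : exn -> unit) ->
        let cb (ar : IAsyncResult) =
            let r = try Ok (endFunc ar) with e -> Error e
            match r with
            | Ok x -> sched.CompleteIo(fun () -> sk x)
            | Error e -> sched.CompleteIo(fun () -> ek e)
        sched.IncrPending()
        let started = try Ok (beginFunc (AsyncCallback(fun ar -> cb ar))) with e -> Error e
        match started with
        | Ok _ -> ()
        | Error e ->
            sched.DecrPending()
            ek e)

let run (m : Conc<'a>) =
    let sched = Sched()
    let results = List<Result<'a, exn>>()
    sched.QueueLast(fun () ->
        apply m (sched, (fun x -> results.Add(Ok x)), (fun e -> results.Add(Error e))))
    let mutable go = true
    while go do
        match sched.Dequeue() with
        | Some k -> k ()
        | None -> go <- false
    List.ofSeq results

// Read from an in-memory stream through the Begin/End pair.
let stream = new MemoryStream([| 1uy; 2uy; 3uy; 4uy; 5uy |])
let buffer : byte[] = Array.zeroCreate 16
let readOp =
    callback (fun cb -> stream.BeginRead(buffer, 0, buffer.Length, cb, null))
             (fun ar -> stream.EndRead(ar))
let result = run readOp   // five bytes are available, so EndRead reports 5
```

The continuation sk only ever runs on the thread that called run; the completion thread does nothing except call End and queue the outcome.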

Run

Finally, we need a run function to execute the async monad. The run function creates a new scheduler and a place to store results. When executing, an async computation may create multiple threads that terminate successfully (viz. without calling stop) and thereby produce multiple results (and exceptions). Therefore the result of executing an async computation is a list of Result values. An initial continuation is added to the ready queue that runs the target monad with success and exception continuations that each add to the result list. Continuations are then dequeued and run iteratively until there is no more work to do.

let run m =
  let sched = new Scheduler()
  let result = ref []
  let init () = apply m (sched, (fun x -> result := Success x :: !result), (fun e -> result := Error e :: !result))
  sched.QueueFirst init |> sched.Run
  let f () = sched.Dequeue () |> sched.Run
  for k in Seq.unfold0 f do k ()
  !result

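Many applications, however, will expect a single result from an async computation, so the runOne function is also provided. It returns only the result at the head of the list; since run prepends each result as it arrives, this is the most recently produced one.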

let runOne m =
  match run m with
    | [] -> failwith "no value"
    | Success x::_ -> x
    | Error e::_ -> raise e

Synchronization

There needs to be some way for threads within an async computation to synchronize with each other. We can't use the CLR's synchronization primitives because they will block the scheduler thread; therefore we need to create our own. So we will create a manual reset event (and leave an auto reset event as an exercise for the reader).

A manual reset event will be represented as an optional list of continuations. A value of None means that the event is in the signalled state. A value of Some ws means that the event is in the non-signalled state and the list ws contains the continuations waiting for the event to become signalled. Hence a value of Some [] means that the event is not signalled but no one is waiting on it. The type of a manual reset event is

type MRE = MRE of (unit -> unit) list option ref

To create a new MRE in the specified state

let newMRE state = if state then MRE (ref None) else MRE (ref (Some []))

Waiting on an MRE causes its state to be analysed. Access to the MRE's internal mutable state does not need to be synchronized because there is only one real thread in the system. If it is signalled then the success continuation can be invoked immediately. If it is not signalled then the success continuation is added to the wait list and the thread yields.

let waitMRE (MRE mre) =
  Async (fun (sched, sk, ek) ->
    match !mre with
      | None -> sk ()
      | Some wait -> mre := Some (sk :: wait))

When setting an MRE, its state is again analysed. If it is already signalled then do nothing. Otherwise set the state to signalled and add each of the continuations in the wait list to the ready queue. Finally, regardless of the initial state, the success continuation is executed, since setting an MRE is a non-blocking operation.

let setMRE (MRE mre) =
  Async (fun (sched, sk, ek) ->
    match !mre with
      | None -> ()
      | Some wait -> mre := None
                     Lock.iter (sched.QueueFirst) wait |> sched.Run
    sk ())
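
Here is a sketch of a waiter and a setter meeting at an event. It reuses the MRE representation from the text but, like the other sketches, swaps the Scheduler for a bare ready queue; Sched, Conc, A, trace, say, gate, waiter and setter are names made up for the illustration.

```fsharp
open System.Collections.Generic

// Sketch only: a bare ready queue stands in for the Scheduler.
type Sched = LinkedList<unit -> unit>
type Conc<'a> = A of (Sched * ('a -> unit) * (exn -> unit) -> unit)

let apply (A m) args = m args
let bind m f =
    A (fun (sched, sk, ek) ->
        apply m (sched, (fun x -> apply (f x) (sched, sk, ek)), ek))
let fork m1 m2 =
    A (fun (sched : Sched, sk, ek) ->
        let second : unit -> unit = fun () -> apply m2 (sched, sk, ek)
        sched.AddLast(second) |> ignore
        apply m1 (sched, sk, ek))

// None = signalled; Some ws = not signalled, ws are the parked continuations.
type MRE = MRE of (unit -> unit) list option ref
let newMRE signalled = if signalled then MRE (ref None) else MRE (ref (Some []))

let waitMRE (MRE cell) : Conc<unit> =
    A (fun (_, sk : unit -> unit, _) ->
        match cell.Value with
        | None -> sk ()
        | Some waiters -> cell.Value <- Some (sk :: waiters))

let setMRE (MRE cell) : Conc<unit> =
    A (fun (sched : Sched, sk : unit -> unit, _) ->
        match cell.Value with
        | None -> ()
        | Some waiters ->
            cell.Value <- None
            for k in waiters do
                sched.AddFirst(k) |> ignore
        sk ())

let trace = List<string>()
let say (s : string) : Conc<unit> =
    A (fun (_, sk, _) ->
        trace.Add(s)
        sk ())

let run (m : Conc<unit>) =
    let sched = LinkedList<unit -> unit>()
    let init : unit -> unit = fun () -> apply m (sched, ignore, (fun e -> raise e))
    sched.AddLast(init) |> ignore
    while sched.Count > 0 do
        let k = sched.First.Value
        sched.RemoveFirst()
        k ()

let gate = newMRE false
let waiter = bind (waitMRE gate) (fun () -> say "woke")
let setter = bind (say "setting") (fun () -> setMRE gate)
run (fork waiter setter)
// trace now holds setting, woke
```

The waiter runs first but parks on the event without blocking the real thread, so the setter still gets its turn.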

As well as auto reset events, other synchronization primitives can also be defined, such as MVars.

Creating threads

In many applications fork is not a particularly intuitive operation. A spawn operation, which takes an async computation, executes it on a new thread and then ends that thread, is more intuitive in many scenarios. We can write a spawn operation in terms of the primitives we have already defined.

let spawn m =
  async { let join = newMRE false
          return! fork (mreturn join)
                       (async { do! tryWith m (fun _ -> mreturn ())
                                do! setMRE join
                                return! stop }) }

spawn starts by creating a new MRE that will be used to signal the end of the spawned thread. It then forks. To one thread it returns the MRE so the caller can synchronize on the completion of the spawned computation. The other thread executes the target computation expression, ignoring any exceptions raised, signals the MRE and terminates.
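
The async { ... } blocks in this post rely on a builder object that maps the syntax onto bind and return. The builder itself lives in the download; what follows is only a minimal sketch of what such a builder could look like over the cut-down model used for illustration (a bare ready queue in place of the Scheduler). The names Sched, Conc, A, ConcBuilder and conc are invented, and conc is used so that the sketch does not shadow the standard async.

```fsharp
open System.Collections.Generic

// Sketch only: a bare ready queue stands in for the Scheduler.
type Sched = LinkedList<unit -> unit>
type Conc<'a> = A of (Sched * ('a -> unit) * (exn -> unit) -> unit)

let apply (A m) args = m args
let mreturn x = A (fun (_, sk, _) -> sk x)
let bind m f =
    A (fun (sched, sk, ek) ->
        apply m (sched, (fun x -> apply (f x) (sched, sk, ek)), ek))
let fork m1 m2 =
    A (fun (sched : Sched, sk, ek) ->
        let second : unit -> unit = fun () -> apply m2 (sched, sk, ek)
        sched.AddLast(second) |> ignore
        apply m1 (sched, sk, ek))

// A minimal builder: let! maps to Bind, return to Return, return! to ReturnFrom.
type ConcBuilder() =
    member b.Return(x) = mreturn x
    member b.ReturnFrom(m : Conc<'a>) = m
    member b.Bind(m, f) = bind m f
    // Delay postpones evaluation of the block until the computation is run.
    member b.Delay(f : unit -> Conc<'a>) = A (fun args -> apply (f ()) args)

let conc = ConcBuilder()

let run m =
    let sched = LinkedList<unit -> unit>()
    let results = List<Result<_, exn>>()
    let init : unit -> unit =
        fun () -> apply m (sched, (fun x -> results.Add(Ok x)), (fun e -> results.Add(Error e)))
    sched.AddLast(init) |> ignore
    while sched.Count > 0 do
        let k = sched.First.Value
        sched.RemoveFirst()
        k ()
    List.ofSeq results

// Everything after the fork runs once per thread.
let prog =
    conc {
        let! x = fork (mreturn 1) (mreturn 2)
        let! y = mreturn 10
        return x * y
    }

let results = run prog   // [Ok 10; Ok 20]
```

A fuller builder would also provide members such as TryWith and Zero, which are left out here.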

One of the most useful async operations is parallel which takes a list of computations, executes them all in parallel and returns a list of their results.

let parallel ms =
  async { let results = Array.zero_create (List.length ms)
          let task = fun i m ->
            async { let! x = m
                    return results.[ i ] <- x } |> spawn
          let! joins = mapi task ms
          do! iter waitMRE joins
          return results |> Array.to_list }

parallel first finds the length of the list and allocates an array to hold all the results. It then creates a task for each element in the list that executes the async computation and stores its result in the corresponding position in the results array. Spawning all these tasks produces a list of MREs which are used to wait for all the tasks to complete. When all the MREs have been signalled the function returns the list of results.

Another useful control flow operation is first, which evaluates a target computation (which may fork) but continues only with the first thread that completes. It creates a new auto reset event (ARE) in the signalled state. The first thread that completes the computation will reset the ARE and return the result. Other threads will see the non-signalled ARE and terminate.

let first m =
  async { let once = newARE true
          let! x = m
          let! b = tryWaitARE once
          if b then return x
          else return! stop }

Conclusion

This blog post is already long enough so I'll stop it here before covering applications. In the code download you can find examples of a web crawler, image resizer and the concurrency example from this paper by Haller and Odersky. Two natural extensions to the system presented here are making the scheduler (1) multi-threaded and (2) distributed. Using .NET binary serialization it should be possible to serialize the continuations as delegates and send them to a different process/host. Additionally the STM monad could be built-in under the continuation monad as the primitive means for blocking, synchronization and inter-process communication.

Published Sunday, August 03, 2008 8:40 PM by gneverov
Attachment(s): async.zip

