hubFS: THE place for F#


Hell Is Other Languages

  • Concurrency on a single thread

    Introduction

    F# has the async computation expression for writing parallel programs. Async achieves concurrency by using the CLR ThreadPool to queue work items for each logical thread created in an async expression (e.g., through Async.spawn or Async.parallel). Using the thread pool is good because it reuses existing threads instead of creating and destroying new ones. However, the thread pool has limitations when a large number of threads are created: it may deadlock (if queued work items cannot execute because the pool's threads are blocked), use more memory, and perform worse because of excessive context switching.

    Suppose we want to write a parallel application that creates a large number of concurrently executing threads. Threads are good because they present a sequential control flow model that is easy to understand and reason about, and having a large number of threads facilitates better resource utilization. To do this we would like to implement our own scalable concurrent threading system for our application that uses virtual threads not backed by the CLR thread pool or OS threads. To enable scalability, creating a virtual thread will be cheap (about the cost of a newobj instruction) and will not consume OS-level resources; the same goes for synchronization primitives.

    The implementation of this system will be based on the standard F# async: it will be a computation expression based on the continuation monad, and user code will look similar and behave the same. Operationally, however, it will differ from the standard async because it will use a fixed number of threads instead of the thread pool. (Version 1 in this post uses only one thread; a version 2 might extend that to one thread per CPU.)

    You can download the F# code for this post here.

    Scheduler

    At the core of this new async monad is the thread scheduler. The scheduler manages multiple threads of execution and multiplexes them together to give the illusion of concurrency. The scheduler is entirely managed code and uses continuations to manipulate control flow. The state of the scheduler includes the following fields:

    type Scheduler () =
      val ready : LinkedList<unit -> unit>;
      val pending : int ref

    The ready queue is a list of continuations that represent threads ready to run. It is a LinkedList from the BCL, which allows fast insertion and removal at both the beginning and the end of the list. The one real OS thread in the system basically runs the loop below.

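    let rec loop () =
      if ready.Count > 0 then
        let k = ready.First.Value   // LinkedList has no Dequeue: take the first node...
        ready.RemoveFirst ()        // ...and remove it before running it
        k ()
        loop ()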

    It checks to see if the ready queue is non-empty. If so, it removes the first continuation, executes it (which may modify the ready queue) and repeats. However asynchronous I/O operations complicate this simple algorithm.
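    To see this loop in isolation, here is a small self-contained sketch that drives a BCL LinkedList<unit -> unit> directly. It assumes a single thread and no pending I/O, and it takes the first continuation with First/RemoveFirst because LinkedList has no Dequeue method. The continuations a, b and c and the trace list are hypothetical, only there to record the order in which work runs.

```fsharp
open System.Collections.Generic

// Ready queue of continuations; one thread only, so no locking here.
let ready = LinkedList<unit -> unit> ()

let rec loop () =
  if ready.Count > 0 then
    let k = ready.First.Value   // take the first continuation...
    ready.RemoveFirst ()        // ...and remove it before running it
    k ()                        // running k may queue more continuations
    loop ()

// Hypothetical continuations that record the order in which they run.
let trace = ResizeArray<string> ()
let c () = trace.Add "c"
let a () =
  trace.Add "a"
  ready.AddLast(c) |> ignore    // a queues c at the back, behind b
let b () = trace.Add "b"

ready.AddLast(a) |> ignore
ready.AddLast(b) |> ignore
loop ()
printfn "%A" (List.ofSeq trace)   // ["a"; "b"; "c"]
```

    Because a appends c at the back of the queue, c runs after b: the loop gives first-in, first-out order to whatever is queued at the end.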

    Asynchronous I/O

    An I/O operation (like reading or writing to a disk or network) can take a long time, and during this time the CPU is idle. If we performed a blocking I/O operation on our one real thread, all virtual threads would be suspended, which is not good. Therefore the programmer using this async monad must never use blocking operations and must always use non-blocking I/O.

    Win32 supports non-blocking I/O, called 'overlapped' I/O. The basic principle is that a thread makes a system call to start an I/O operation, and the call returns immediately, before the operation has completed. The operation is performed by the OS, not on one of the process's threads. When the operation is complete, a callback is executed on one of the threads in the process. The key point is that the non-blocking I/O operation doesn't create or consume an OS thread, so by using it we can stay within our one-thread budget. Non-blocking I/O is exposed in the BCL through Begin/End methods on various I/O objects (FileStream, etc.), and on Windows these methods are probably implemented using Win32 overlapped I/O.

    The pending field in the state of the scheduler contains the number of logical threads currently in an asynchronous I/O operation. So when ready.Count = 0 and !pending = 0 the computation has terminated.

    The Monad

    So let's get started looking at the code. This will also serve as a guide to writing computation expression builders in F# (not necessarily concurrency-related ones). The monad used in this code is based on the papers by Claessen and by Li and Zdancewic.

    The type of the monad used here is

    type Async<'a> = Async of (Scheduler * ('a -> unit) * (exn -> unit) -> unit)

    In general, all F# monad types need to be function types to achieve the delayed evaluation implicit in a computation expression block. Furthermore, you'd typically want to wrap this type in a discriminated union tag to aid type inference. This makes the general form of a monad type

    type M<'a> = M of (… -> …)

    In the Async type the left of the function arrow takes three arguments: the state of the scheduler of type Scheduler, a success continuation of type ('a -> unit) and an exception continuation of type (exn -> unit).

    The Scheduler type was mentioned above. The Scheduler state supports the following operations.

    member QueueFirst : (unit -> unit) -> Lock<unit>
    member QueueLast : (unit -> unit) -> Lock<unit>
    member Dequeue : unit -> Lock<(unit -> unit) option>
    member IncrPending : unit -> Lock<unit>
    member DecrPending : unit -> Lock<unit>
    member Run : Lock<'a> -> 'a

    Since the Scheduler state will be accessed by two threads, the single application thread and the CLR I/O completion thread, access to the data structure needs to be synchronized. This is done using the lock monad from my previous blog post.

    Return & Bind

    We now implement the standard return and bind operations for a continuation monad.

    let mreturn x = Async (fun (sched, sk, ek) -> sk x)

    let apply (Async m) (sched, sk, ek) = m (sched, sk, ek)

    let bind m f =
      Async (fun (sched, sk, ek) ->
        apply m (sched, (fun x -> apply (f x) (sched, sk, ek)), ek))

    The implementation of return is fine; however, there is a problem with bind because it does not take into account the possibility of exceptions. If the function f throws an exception, then it must be caught somewhere and execution diverted to the exception continuation.

    type Result<'a> = Success of 'a | Error of exn

    let tryApply f x = try Success (f x) with e -> Error e

    let apply2 f x (sched, sk, ek) =
      match tryApply f x with
        | Success m -> apply m (sched, sk, ek)
        | Error e -> ek e

    let bind m f =
      Async (fun (sched, sk, ek) ->
        apply m (sched, (fun x -> apply2 f x (sched, sk, ek)), ek))

    The tryApply function will always be used whenever applying an "outside" function (a function defined outside the async library) so that any exceptions thrown by this function can be reified as values and control flow handled appropriately.
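    A minimal, self-contained sketch of this exception-aware bind follows. To keep it short it drops the scheduler argument entirely, and the Cont type and runCont helper are hypothetical names, not part of the post's code; otherwise apply, tryApply, apply2 and bind mirror the definitions above. In the second computation f divides by zero, and tryApply turns that exception into a call to the exception continuation instead of letting it escape.

```fsharp
// Reified outcome of applying an "outside" function, as in the post.
type Result<'a> = Success of 'a | Error of exn

// Simplified continuation monad: no scheduler argument (an assumption for brevity).
type Cont<'a> = Cont of (('a -> unit) * (exn -> unit) -> unit)

let apply (Cont m) (sk, ek) = m (sk, ek)
let mreturn x = Cont (fun (sk, _) -> sk x)

let tryApply f x = try Success (f x) with e -> Error e

// Apply an outside function f; if it throws, divert to the exception continuation.
let apply2 f x (sk, ek) =
  match tryApply f x with
  | Success m -> apply m (sk, ek)
  | Error e -> ek e

let bind m f = Cont (fun (sk, ek) -> apply m ((fun x -> apply2 f x (sk, ek)), ek))

// Hypothetical helper: run a computation and report which continuation fired.
let runCont m =
  let outcome = ref None
  apply m ((fun x -> outcome.Value <- Some (Success x)),
           (fun e -> outcome.Value <- Some (Error e)))
  outcome.Value

let ok = bind (mreturn 20) (fun x -> mreturn (x + 1))
let bad = bind (mreturn 0) (fun x -> mreturn (10 / x))   // f throws DivideByZeroException
printfn "%A" (runCont ok)   // Some (Success 21)
```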

    Mzero & Mplus

    Two primitive operations for the async monad are stop and fork. The stop operation terminates the current thread. The fork operation duplicates the current thread and returns a different value on each copy. For example, if a thread is executing the expression "bind (fork m1 m2) f", then after fork is evaluated there will be two threads: one executing "bind m1 f" and the other executing "bind m2 f". Fork is the primitive method for creating new threads in the async monad. The implementations of stop and fork are as follows.

    let stop =
      Async (fun (sched, sk, ek) -> ())

    let fork m1 m2 =
      Async (fun (sched, sk, ek) ->
        sched.QueueLast (fun () -> apply m2 (sched, sk, ek)) |> sched.Run
        apply m1 (sched, sk, ek))

    Unfortunately, calling stop stops the current thread without running any finally handlers that are in scope. Ideally there should be a third continuation (called the finally continuation) that stop executes, but we'll ignore this issue for now. The fork operation works by queuing the second branch of the fork at the end of the scheduler's ready queue and executing the first branch immediately. Stop and fork form the mzero and mplus operations of a MonadPlus type class instance for the async monad.

    Yield

    The yield operation forces the current thread to give up execution and re-queues it at the end of the ready queue.

    let myield =
      Async (fun (sched, sk, ek) ->
         sched.QueueLast sk |> sched.Run)

    Callback

    The callback operation is used to invoke asynchronous I/O operations. In the CLR base class library, an asynchronous I/O operation is defined by two methods: a Begin method and an End method (e.g. BeginRead and EndRead for reading from a stream). These two methods are passed to the callback function below.

    The function first increments the scheduler's pending count to mark the start of a new asynchronous I/O operation. It then calls the Begin method, passing the function cb as the callback. If the Begin method succeeds, this thread simply returns to the scheduler loop without calling a continuation, so it is suspended until cb queues its continuation; otherwise the pending count is decremented and the exception continuation is invoked.

    When the system has performed the I/O operation, it calls cb on some special thread (not the scheduler's thread). Therefore cb cannot continue executing the continuation monad itself; instead it must marshal a continuation back onto the scheduler's ready queue. So it first calls the End method to get the result of the I/O operation; if the operation failed for some reason, an exception will be thrown by this call. Depending on whether End succeeded, either the success or the exception continuation is chosen and queued at the front of the ready queue. Finally, the pending count is decremented and the lock is pulsed to wake up the scheduler thread, which may be blocked waiting for new continuations to execute.

    (* val callback : (AsyncCallback -> IAsyncResult) -> (IAsyncResult -> 'a) -> Async<'a> *)
    let callback (beginFunc : _ -> IAsyncResult) endFunc =
      Async (fun (sched, sk, ek) ->
        let cb (ar : IAsyncResult) =
          let k =
            match tryApply endFunc ar with
            | Success x -> fun () -> sk x
            | Error e -> fun () -> ek e
          lock { do! sched.QueueFirst k
                 do! sched.DecrPending ()
                 return! Lock.pulseAll } |> sched.Run
        sched.IncrPending () |> sched.Run
        match tryApply beginFunc (AsyncCallback(cb)) with
        | Success _ -> ()   // started: this thread resumes when cb queues k
        | Error e ->
            sched.DecrPending () |> sched.Run
            ek e)

    Run

    Finally, we need a run function to execute the async monad. The run function creates a new scheduler and a place to store results. When executing, an async computation may create multiple threads that terminate successfully (viz. without calling stop), thereby producing multiple results (and exceptions). Therefore the result of executing an async computation is a list of Result values. An initial continuation is added to the ready queue that runs the target monad with success and exception continuations that each add an entry to the result list. Continuations are then dequeued and run iteratively until there is no more work to do.

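    let run m =
      let sched = new Scheduler()
      let result = ref []
      let init () = apply m (sched, (fun x -> result := Success x :: !result), (fun e -> result := Error e :: !result))
      sched.QueueFirst init |> sched.Run
      // None signals that no work remains (empty ready queue, no pending I/O)
      let rec loop () =
        match sched.Dequeue () |> sched.Run with
        | Some k -> k (); loop ()
        | None -> ()
      loop ()
      List.rev !result   // results were prepended, so restore completion order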

    Many applications, however, will expect a single result from an async computation, so there is also a runOne function that returns only the first result.

    let runOne m =
      match run m with
        | [] -> failwith "no value"
        | Success x::_ -> x
        | Error e::_ -> raise e
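    The primitives so far can be tried together in a small single-threaded sketch. It makes several simplifying assumptions that are not in the post: there is no asynchronous I/O, so the scheduler is just the ready queue with no lock, pending count or Run; the type is renamed VAsync (and its builder vasync) to avoid clashing with FSharp.Core's Async; Choice<'a, exn> stands in for the Result type; and the builder is a hypothetical minimal one, since the post's own builder is in the download. The example forks a thread, yields on both copies, and checks that a stopped branch produces no result.

```fsharp
open System.Collections.Generic

// Assumes one OS thread and no async I/O: the scheduler is just the ready queue.
type Scheduler = LinkedList<unit -> unit>
type VAsync<'a> = VAsync of (Scheduler * ('a -> unit) * (exn -> unit) -> unit)

let apply (VAsync m) (sched, sk, ek) = m (sched, sk, ek)
let mreturn x = VAsync (fun (_, sk, _) -> sk x)

// Bind: an exception thrown by f is diverted to the exception continuation.
let bind m f =
  VAsync (fun (sched, sk, ek) ->
    let onValue x =
      match (try Choice1Of2 (f x) with e -> Choice2Of2 e) with
      | Choice1Of2 next -> apply next (sched, sk, ek)
      | Choice2Of2 e -> ek e
    apply m (sched, onValue, ek))

// stop drops both continuations, so the current thread simply ends.
let stop<'a> : VAsync<'a> = VAsync (fun _ -> ())

// fork queues the second branch at the back and runs the first branch now.
let fork m1 m2 =
  VAsync (fun (sched : Scheduler, sk, ek) ->
    let second () = apply m2 (sched, sk, ek)
    sched.AddLast(second) |> ignore
    apply m1 (sched, sk, ek))

// myield re-queues the current thread's continuation at the back.
let myield =
  VAsync (fun (sched : Scheduler, (sk : unit -> unit), _) -> sched.AddLast(sk) |> ignore)

// Hypothetical minimal builder.
type VAsyncBuilder () =
  member this.Return (x : 'a) = mreturn x
  member this.ReturnFrom (m : VAsync<'a>) = m
  member this.Bind (m : VAsync<'a>, f : 'a -> VAsync<'b>) = bind m f
  member this.Zero () = mreturn ()
  member this.Delay (f : unit -> VAsync<'a>) = bind (mreturn ()) f
let vasync = VAsyncBuilder ()

// Run until the ready queue is empty; results are listed in the order threads finished.
let run (m : VAsync<'a>) : Choice<'a, exn> list =
  let sched = Scheduler ()
  let results = ResizeArray<Choice<'a, exn>> ()
  let init () =
    apply m (sched, (fun x -> results.Add (Choice1Of2 x)), (fun e -> results.Add (Choice2Of2 e)))
  sched.AddFirst(init) |> ignore
  while sched.Count > 0 do
    let k = sched.First.Value
    sched.RemoveFirst ()
    k ()
  List.ofSeq results

let runOne m =
  match run m with
  | [] -> failwith "no value"
  | Choice1Of2 x :: _ -> x
  | Choice2Of2 e :: _ -> raise e

// Usage: fork, then each copy records a step, yields, and records another step.
let trace = ResizeArray<string> ()
let note msg = vasync { trace.Add msg }

let program =
  vasync {
    let! name = fork (mreturn "a") (mreturn "b")
    do! note (name + "1")
    do! myield
    do! note (name + "2")
    return name }

let results = run program
printfn "%A" (List.ofSeq trace)   // ["a1"; "b1"; "a2"; "b2"]

let withStop =
  vasync {
    let! n = fork (mreturn 1) stop   // the second copy stops immediately
    return n * 10 }
printfn "%d" (runOne withStop)      // 10
```

    The trace shows the two copies alternating at each myield, and run returns one result per thread that finished without calling stop.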

    Synchronization

    There needs to be some way for threads within an async computation to synchronize with each other. We can't use the CLR's synchronization primitives because they will block the scheduler thread; therefore we need to create our own. So we will create a manual reset event (and leave an auto reset event as an exercise for the reader).

    A manual reset event will be represented as an optional list of continuations. A value of None means that the event is in the signalled state. A value of Some ws means that the event is in the non-signalled state and the list ws contains the continuations waiting for the event to become signalled. Hence a value of Some [] means that the event is not signalled but no one is waiting on it. The type of a manual reset event is

    type MRE = MRE of (unit -> unit) list option ref

    To create a new MRE in the specified state

    let newMRE state = if state then MRE (ref None) else MRE (ref (Some []))

    Waiting on an MRE causes its state to be analysed. Access to the MRE's internal mutable state does not need to be synchronized because there is only one real thread in the system. If it is signalled, the success continuation can be invoked immediately. If it is not signalled, the success continuation is added to the wait list and the thread yields.

    let waitMRE (MRE mre) =
      Async (fun (sched, sk, ek) ->
        match !mre with
          | None -> sk ()
          | Some wait -> mre := Some (sk :: wait))

    When setting an MRE, its state is again analysed. If it is already signalled, do nothing. Otherwise set the state to signalled and add each of the continuations in the wait list to the ready queue. Finally, regardless of the initial state, the success continuation is executed, since setting an MRE is a non-blocking operation.

    let setMRE (MRE mre) =
      Async (fun (sched, sk, ek) ->
        match !mre with
          | None -> ()
          | Some wait -> mre := None
                         Lock.iter (sched.QueueFirst) wait |> sched.Run
        sk ())

    As well as auto reset events, other synchronization primitives can also be defined, such as MVars.

    Creating threads

    In many applications fork is not a particularly intuitive operation. A spawn operation, which takes an async computation, executes it on a new thread and then ends that thread, is more intuitive in many scenarios. We can write spawn in terms of the primitives we have already defined.

    let spawn m =
      async { let join = newMRE false
              return! fork (mreturn join)
                           (async { do! tryWith m (fun _ -> mreturn ())
                                    do! setMRE join
                                    return! stop }) }

    spawn starts by creating a new MRE that will be used to signal the end of the spawned thread. It then forks. To one thread it returns the MRE, so the caller can synchronize on the completion of the spawned computation. The other thread executes the target computation, ignoring any exceptions raised, signals the MRE and terminates.

    One of the most useful async operations is parallel, which takes a list of computations, executes them all in parallel and returns a list of their results.

    let parallel ms =
      async { let results = Array.zeroCreate (List.length ms)
              let task = fun i m ->
                async { let! x = m
                        return results.[ i ] <- x } |> spawn
              let! joins = mapi task ms
              do! iter waitMRE joins
              return results |> Array.toList }

    parallel first finds the length of the list and allocates an array to hold all the results. It then creates a task for each element in the list that executes the async computation and stores its result in the corresponding position in the results array. Spawning all these tasks produces a list of MREs which are used to wait for all the tasks to complete. When all the MREs have been signalled the function returns the list of results.
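    The synchronization and thread-creation pieces can be sketched the same way. The block below repeats a cut-down single-threaded core (a plain ready queue, no I/O, the type renamed VAsync), then adds the MRE operations, spawn and parallel as described above. tryWith, mapiM and iterM are hypothetical stand-ins for helpers the post uses but does not show, and parallel is renamed parallelAll because parallel is reserved for future use in F#.

```fsharp
open System.Collections.Generic

// Cut-down core, assuming one OS thread and no async I/O: the scheduler is just the ready queue.
type Scheduler = LinkedList<unit -> unit>
type VAsync<'a> = VAsync of (Scheduler * ('a -> unit) * (exn -> unit) -> unit)

let apply (VAsync m) (sched, sk, ek) = m (sched, sk, ek)
let mreturn x = VAsync (fun (_, sk, _) -> sk x)
let bind m f =
  VAsync (fun (sched, sk, ek) ->
    let onValue x =
      match (try Choice1Of2 (f x) with e -> Choice2Of2 e) with
      | Choice1Of2 next -> apply next (sched, sk, ek)
      | Choice2Of2 e -> ek e
    apply m (sched, onValue, ek))
let stop<'a> : VAsync<'a> = VAsync (fun _ -> ())
let fork m1 m2 =
  VAsync (fun (sched : Scheduler, sk, ek) ->
    let second () = apply m2 (sched, sk, ek)
    sched.AddLast(second) |> ignore
    apply m1 (sched, sk, ek))
// Hypothetical helper: hand exceptions from the exception continuation to a handler.
let tryWith m handler =
  VAsync (fun (sched, sk, ek) -> apply m (sched, sk, (fun e -> apply (handler e) (sched, sk, ek))))

type VAsyncBuilder () =
  member this.Return (x : 'a) = mreturn x
  member this.ReturnFrom (m : VAsync<'a>) = m
  member this.Bind (m : VAsync<'a>, f : 'a -> VAsync<'b>) = bind m f
  member this.Zero () = mreturn ()
  member this.Delay (f : unit -> VAsync<'a>) = bind (mreturn ()) f
let vasync = VAsyncBuilder ()

// Run until the ready queue is empty; exceptions that reach the top are rethrown.
let run (m : VAsync<'a>) =
  let sched = Scheduler ()
  let results = ResizeArray<'a> ()
  let init () = apply m (sched, (fun x -> results.Add x), (fun e -> raise e))
  sched.AddFirst(init) |> ignore
  while sched.Count > 0 do
    let k = sched.First.Value
    sched.RemoveFirst ()
    k ()
  List.ofSeq results

// Manual reset event: None = signalled; Some ws = not signalled, with waiters ws.
type MRE = MRE of (unit -> unit) list option ref
let newMRE state = if state then MRE (ref None) else MRE (ref (Some []))
let waitMRE (MRE mre) =
  VAsync (fun (_, sk, _) ->
    match mre.Value with
    | None -> sk ()
    | Some waiters -> mre.Value <- Some (sk :: waiters))   // park sk; the thread stops here
let setMRE (MRE mre) =
  VAsync (fun (sched : Scheduler, sk, _) ->
    match mre.Value with
    | None -> ()
    | Some waiters ->
        mre.Value <- None
        for k in waiters do sched.AddFirst(k) |> ignore   // make every waiter ready
    sk ())

let spawn m =
  vasync {
    let join = newMRE false
    let child =
      vasync {
        do! tryWith m (fun _ -> mreturn ())   // ignore the child's exceptions
        do! setMRE join                       // signal completion
        return! stop }                        // end the child thread
    return! fork (mreturn join) child }

// Hypothetical monadic list helpers standing in for the post's mapi and iter.
let rec mapiM i f xs =
  match xs with
  | [] -> mreturn []
  | x :: rest -> bind (f i x) (fun y -> bind (mapiM (i + 1) f rest) (fun ys -> mreturn (y :: ys)))
let rec iterM f xs =
  match xs with
  | [] -> mreturn ()
  | x :: rest -> bind (f x) (fun () -> iterM f rest)

let parallelAll (ms : VAsync<'a> list) =
  vasync {
    let results : 'a[] = Array.zeroCreate (List.length ms)
    let task i m =
      vasync {
        let! x = m
        results.[i] <- x }
      |> spawn
    let! joins = mapiM 0 task ms
    do! iterM waitMRE joins
    return List.ofArray results }

let squares = parallelAll [ for n in 1 .. 4 -> vasync { return n * n } ]
printfn "%A" (run squares)   // [[1; 4; 9; 16]]
```

    Each spawned task signals its MRE when it finishes, and setMRE moves the parent thread, parked in waitMRE, back to the front of the ready queue, so the results come back in list order regardless of when each task ran.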

    Another useful control flow operation is first, which evaluates a target computation (which may fork) but continues only with the first thread that completes. It creates a new auto reset event (ARE) in the signalled state. The first thread that completes the computation will reset the ARE and return the result. Other threads will see the non-signalled ARE and terminate.

    let first m =
      async { let once = newARE true
              let! x = m
              let! b = tryWaitARE once
              if b then return x
              else return! stop }

    Conclusion

    This blog post is already long enough, so I'll stop here before covering applications. In the code download you can find examples of a web crawler, an image resizer and the concurrency example from this paper by Haller and Odersky. Two natural extensions to the system presented here are making the scheduler (1) multi-threaded and (2) distributed. Using .NET binary serialization, it should be possible to serialize the continuations as delegates and send them to a different process or host. Additionally, the STM monad could be built in underneath the continuation monad as the primitive means of blocking, synchronization and inter-process communication.

  • Using computation expressions to control monitor locks


    C# has the lock statement to support the use of .NET monitor synchronization.

    lock(this) { x--; }

    This curly-brace-delimited block of atomic statements is easy to identify in code. However, having to call Wait or Pulse inside the block is less appealing, because the previously hidden Monitor class now comes to your attention and you have to make sure you wait/pulse on the same object that was locked.

    lock(this) { x--; if(x==0) Monitor.Pulse(this); }

    F# has the lock function, which is basically a higher-order-function version of C#'s lock statement. It has the same awkwardness with wait/pulse, plus you have to wrap the body of the block in a function, and to me braces {…} look better than the verbose (fun () -> …).

    lock this (fun () -> decr x; if !x = 0 then Monitor.Pulse(this))

    We can solve all these issues and more with computation expressions in F#. We start by defining the Lock monad, which is actually just an instance of the reader monad. A value of type Lock<'a> is a computation that will execute under the context of a lock and return a result of type 'a. The type of the Lock monad is a function that accepts the locked object and returns the result of the computation. The following return and bind functions are the standard implementation of the reader monad.

    type Lock<'a> = Lock of (obj -> 'a)

    let apply (Lock f) lock = f lock
    let mreturn x = Lock (fun lock -> x)
    let bind m f = Lock (fun lock -> apply (f (apply m lock)) lock)

    Next we need a function to run a Lock computation. This function takes as arguments an object to lock and the computation to run. It acquires a lock on the object, runs the computation and then releases the lock.

    (* val run : 'a -> Lock<'b> -> 'b *)
    let run lock m =
      let lock = box lock
      Monitor.Enter(lock)
      try apply m lock
      finally Monitor.Exit(lock)

    Next we'll define some monadic actions for wait and pulse.

    (* val wait : Lock<unit> *)
    let wait = Lock (fun lock -> Monitor.Wait(lock) |> ignore)

    (* val tryWait : int -> Lock<bool> *)
    let tryWait (timeout : int) = Lock (fun lock -> Monitor.Wait(lock, timeout))

    (* val pulse : Lock<unit> *)
    let pulse = Lock (fun lock -> Monitor.Pulse(lock))

    (* val pulseAll : Lock<unit> *)
    let pulseAll = Lock (fun lock -> Monitor.PulseAll(lock))

    Finally we need a builder class to enable computation expression syntax on the Lock type. This code is omitted here but can be downloaded in the full example.
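    A minimal sketch of such a builder is below. It is not the downloadable code: it assumes the builder only needs the members that the examples in this post use (Return, ReturnFrom, Bind, Zero, Delay, Combine and While), and it repeats the Lock type, run, wait and pulseAll so that it is self-contained. Binding the builder to the name lock shadows FSharp.Core's lock function, which is harmless here because run calls Monitor directly. The usage at the end exercises the enqueue, dequeue and move functions from later in this post, with a second thread that may block inside dequeue.

```fsharp
open System.Collections.Generic
open System.Threading

type Lock<'a> = Lock of (obj -> 'a)

let apply (Lock f) (lockObj : obj) = f lockObj
let mreturn x = Lock (fun _ -> x)
let bind m f = Lock (fun lockObj -> apply (f (apply m lockObj)) lockObj)

// Acquire the monitor, run the computation under it, and always release it.
let run (lockObj : 'a) (m : Lock<'b>) : 'b =
  let o = box lockObj
  Monitor.Enter(o)
  try apply m o
  finally Monitor.Exit(o)

let wait = Lock (fun o -> Monitor.Wait(o) |> ignore)
let pulseAll = Lock (fun o -> Monitor.PulseAll(o))

// Hypothetical minimal builder: only the members the examples in the post need.
type LockBuilder () =
  member this.Return (x : 'a) = mreturn x
  member this.ReturnFrom (m : Lock<'a>) = m
  member this.Bind (m : Lock<'a>, f : 'a -> Lock<'b>) = bind m f
  member this.Zero () = mreturn ()
  member this.Delay (f : unit -> Lock<'a>) = Lock (fun o -> apply (f ()) o)
  // Sequencing: run the first part, then the rest, with the same lock object.
  member this.Combine (first : Lock<unit>, rest : Lock<'a>) = bind first (fun () -> rest)
  // The guard is re-evaluated each time round; the body sees the same lock object.
  member this.While (guard : unit -> bool, body : Lock<unit>) =
    Lock (fun o -> while guard () do apply body o)

let lock = LockBuilder ()

let enqueue (q : Queue<'a>) (x : 'a) =
  lock {
    if q.Count = 0 then return! pulseAll   // wake consumers waiting on an empty queue
    q.Enqueue(x) }

let dequeue (q : Queue<'a>) =
  lock {
    while q.Count = 0 do return! wait      // Monitor.Wait releases the lock while blocked
    return q.Dequeue() }

let myLock = obj ()
let move q1 q2 =
  lock {
    let! x = dequeue q1
    do! enqueue q2 x
    return x } |> run myLock

// Usage: a consumer thread moves an item that the main thread enqueues.
let inbox = Queue<string> ()
let outbox = Queue<string> ()
let moved = ref ""
let consumer = Thread(ThreadStart(fun () -> moved.Value <- move inbox outbox))
consumer.Start()
enqueue inbox "hello" |> run myLock
consumer.Join()
printfn "%s moved; outbox has %d item(s)" moved.Value outbox.Count   // hello moved; outbox has 1 item(s)
```

    Combine is what lets a conditional pulse be followed by more statements inside lock { ... }, and While is what lets dequeue loop on Monitor.Wait without ever naming the locked object.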

    Now at last we can write lock code in F# with curly braces, without seeing the Monitor class, and without reproducing the locked object to call wait and pulse.

    lock { do decr x
           if !x = 0 then return! pulse } |> run this

    Or a more complex example:
    lock { while ready.Count = 0 && !pending > 0 do return! wait
           if ready.Count > 0
           then return Some (ready.Dequeue())
           else return None } |> run myLock

    You can even have an alternate run function that takes a timeout on acquiring the lock.

    (* val tryRun : 'a -> int -> Lock<'b> -> 'b option *)
    let tryRun lock (timeout : int) m =
      let lock = box lock
      if Monitor.TryEnter(lock, timeout)
      then try Some (apply m lock)
           finally Monitor.Exit(lock)
      else None

    Additionally, the Lock monad can be used to make synchronized code more composable. Say you have a method that always needs to execute under a lock, but either you don't know which lock or the lock needs to be entered at an outer scope. You can write this method so that it returns a Lock computation, which can be composed with other Lock computations. Consider functions to synchronously access items in a System.Collections.Generic.Queue.

    (* val enqueue : Queue<'a> -> 'a -> Lock<unit> *)
    let enqueue (q : Queue<_>) x =
      lock { if q.Count = 0 then return! pulseAll
             do q.Enqueue(x) }

    (* val dequeue : Queue<'a> -> Lock<'a> *)
    let dequeue (q : Queue<_>) =
      lock { while q.Count = 0 do return! wait
             return q.Dequeue() }

    Because these functions return a Lock<_> value, the caller can never forget to acquire the lock, and this is enforced by the type system (of course the programmer can still acquire the wrong lock). Secondly, these functions can naturally call pulse and wait without knowing which object the lock is acquired on.

    With these functions you can easily write code that atomically moves an item between queues and returns the item.

    let move q1 q2 =
      lock { let! x = dequeue q1
             do! enqueue q2 x
             return x } |> run myLock

    So if you have to use locks because you don't like STM or its performance isn't good enough or for legacy reasons, the lock monad could improve readability and abstraction.

  • Software Transactional Memory for F#

    I have written a library for using software transactional memory in F#. The library exposes memory transactions as a monad using F#’s new computation expression feature. It can be downloaded here.

    Software transactional memory (STM) is an approach to concurrent programming that avoids the use of traditional, error-prone locks to control access to shared memory. Instead of using locks a programmer specifies sections of code that will make atomic changes to shared memory.

    The design and implementation of this F# library is blatantly copied from the paper Composable Memory Transactions. I am enormously grateful to the authors of this paper whose clear and precise explanation made the implementation of this complicated piece of software almost trivial.

    The low-level STM machinery is implemented in the file stm.cs. It is written in C# mainly because I was working off existing code I'd written some time ago, but there's no reason why it could not be written in F#. It is all managed code and internally uses monitors for synchronization. It currently uses a very coarse-grained lock, meaning that only one transaction can commit at a time and every waiting thread is resumed when a transaction completes, but this can be improved with more work. The implementation follows the description in the paper.

    The F# monadic interface is in the file stm.fs. It imports definitions from the C# assembly and defines the monad type Stm<'a> with a corresponding builder class to enable computation expression syntax. An issue in implementing STM systems is how to access the transaction log when making reads and writes during a transaction. Two common approaches are to use thread-local storage, which is slow, or to implicitly pass the transaction log as an extra parameter to every function, which requires code generation. The advantage of using computation expressions (or monadic do notation) is that the transaction log is threaded through the programmer's code by the monad's bind operation, which avoids compiler modification while also sparing the programmer from passing the log parameter around by hand.

    With these two layers in place one can write STM code in F#. Transacted memory locations have the type TVar<'a>. This is a reference cell like Ref<'a> in F# but its value can only be read and written from inside a transaction. A TVar is created using the newTVar function.

    let x = newTVar 0

    val x : TVar<int>

    This code is similar to let x = ref 0, but being a TVar instead of a Ref, the location x can be safely shared between multiple threads.

    A transaction is defined using a computation expression. A transaction is a value of type Stm<'a> which represents a computation that when executed will make an atomic change to the values of zero or more TVars and then return a value of type 'a. The type system ensures at compile-time that all reads and writes to a TVar only take place inside a transaction. The functions readTVar and writeTVar read and write a TVar respectively. They are analogous to let v = !x and x := v for regular reference cells.

    val readTVar : TVar<'a> -> Stm<'a>
    val writeTVar : TVar<'a> -> 'a -> Stm<unit>

    Let’s define a function that increments the value stored in a TVar.

    let incr x =
      stm {
        let! v = readTVar x
        do! writeTVar x (v+1)
        return v }

    val incr : TVar<int> -> Stm<int>

    The syntax stm { … } is an F# computation expression using the 'stm' monad, which means that this is a memory transaction (as opposed to an async work unit or something). The value of the location is read and stored in v. The incremented value is then written back to the location and v is returned as the result of the computation. When executed the read and write happen as an atomic block which means logically that an interleaving thread won’t access the location between the read and write.

    A transaction is executed by the atomically function.

    val atomically : Stm<'a> -> 'a

    incr x |> atomically

    val it : int = 0

    incr x |> atomically

    val it : int = 1

    The composability of STM can be seen by the ability to compose incr into a larger transaction that increments a location twice.

    let incr2 x =
      stm {
        let! _ = incr x
        let! v = incr x
        return v }

    val incr2 : TVar<int> -> Stm<int>

    incr2 x |> atomically

    val it : int = 3
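    The post's STM is backed by stm.cs, which is not shown here. As a hypothetical stand-in, the sketch below provides the same surface (newTVar, readTVar, writeTVar, atomically and an stm builder) but runs every transaction under one global monitor and records an undo list, so transactions never overlap. That trivially gives atomicity while giving up all concurrency between transactions, so it only models the semantics, not the paper's log-based implementation. It reproduces the incr and incr2 results above and then lets four threads increment one TVar.

```fsharp
open System.Threading

// Toy STM model (hypothetical, not the post's stm.cs): every transaction runs under
// one global monitor, and writes are recorded in an undo list for rollback.
type TVar<'a> (init : 'a) =
  member val Value = init with get, set

type Log = { mutable Undo : (unit -> unit) list }
type Stm<'a> = Stm of (Log -> 'a)

let globalLock = obj ()
let runStm (Stm f) log = f log

let newTVar (v : 'a) = TVar(v)
let readTVar (tv : TVar<'a>) = Stm (fun _ -> tv.Value)
let writeTVar (tv : TVar<'a>) (v : 'a) =
  Stm (fun log ->
    let old = tv.Value
    log.Undo <- (fun () -> tv.Value <- old) :: log.Undo
    tv.Value <- v)

type StmBuilder () =
  member this.Return (x : 'a) = Stm (fun _ -> x)
  member this.ReturnFrom (m : Stm<'a>) = m
  member this.Bind (m : Stm<'a>, f : 'a -> Stm<'b>) = Stm (fun log -> runStm (f (runStm m log)) log)
  member this.Delay (f : unit -> Stm<'a>) = Stm (fun log -> runStm (f ()) log)
let stm = StmBuilder ()

// Run a transaction atomically; if it throws, undo its writes (newest first) and rethrow.
let atomically (Stm f) =
  lock globalLock (fun () ->
    let log = { Undo = [] }
    try f log
    with _ ->
      for undo in log.Undo do undo ()
      reraise ())

// incr and incr2 as in the post.
let incr x =
  stm {
    let! v = readTVar x
    do! writeTVar x (v + 1)
    return v }

let incr2 x =
  stm {
    let! _ = incr x
    let! v = incr x
    return v }

let x = newTVar 0
let a = incr x |> atomically    // 0
let b = incr x |> atomically    // 1
let c = incr2 x |> atomically   // 3

// Four threads each increment a shared TVar 1000 times.
let counter = newTVar 0
let workers =
  [ for _ in 1 .. 4 ->
      Thread(ThreadStart(fun () ->
        for _ in 1 .. 1000 do incr counter |> atomically |> ignore)) ]
for t in workers do t.Start()
for t in workers do t.Join()
printfn "%d" (atomically (readTVar counter))   // 4000
```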

    Imagine that incr was part of the public interface of a module. If incr used a lock to perform the read and write atomically, it would be impossible for users of the library to write incr2 unless the module also exposed its locks through its public interface, which would complicate programming for the user.

    If you see any Haskell STM code, it is straightforward to translate this into F#. For example the paper gives the implementation of an MVar as a code example. The paper describes an MVar as

    An MVar is a mutable location like a TVar, except that it may be either empty, or full with a value. The takeMVar function leaves a full MVar empty, and blocks on an empty MVar. A putMVar on an empty MVar leaves it full, and blocks on a full MVar. So MVars are, in effect, a one-place channel.

    MVars can be implemented in F# STM like so.

    type MVar<'a> = TVar<option<'a> >

    let newEmptyMVar () = newTVar None
    let newFullMVar v = newTVar (Some v)

    let takeMVar mv =
      stm { let! v = readTVar mv
            return! match v with
                    | Some a ->
                        stm { do! writeTVar mv None
                              return a }
                    | None -> retry () }

    let putMVar mv a =
      stm { let! v = readTVar mv
            return! match v with
                    | None -> writeTVar mv (Some a)
                    | Some _ -> retry () }

    val newEmptyMVar : unit -> MVar<'a>
    val newFullMVar : 'a -> MVar<'a>
    val takeMVar : MVar<'a> -> Stm<'a>
    val putMVar : MVar<'a> -> 'a -> Stm<unit>

    The retry operation is used to re-execute a transaction from the beginning. There is no point re-executing the transaction immediately because no memory locations will have changed. So the retry operation blocks until some memory changes that might allow the transaction to make further progress. This is how blocking is implemented with STM.

    Another feature of STM that makes it more composable than locks is how it handles blocking. The take/put MVar operations above are blocking operations. If an MVar library were written using locks and non-blocking versions of these operations were required, they would have to be implemented by the library developer and exposed as part of the library's public interface. Not so with STM: if the MVar interface only exposes blocking take/put operations, the user of the library can write the non-blocking versions, and vice versa.

    let tryTakeMVar mv =
      orElse
        (stm { let! v = takeMVar mv
               return Some v })
        (stm { return None })

    val tryTakeMVar : MVar<'a> -> Stm<'a option>

    The function tryTakeMVar is the non-blocking version of takeMVar; it returns an option value depending on whether a value was available. The key here is the orElse operation. This function takes two transactions and tries to execute the first. If the first fails with a retry/block then it executes the second. So in this example if takeMVar retries/blocks then it will return None.
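    Blocking and orElse can be added to the same kind of toy model. In the hypothetical sketch below, retry raises an exception; atomically rolls the transaction back and waits on the global monitor until some other transaction commits, and orElse runs its first argument with its own undo list so that a retry there can be rolled back before the second argument runs. The MVar functions follow the ones above, written with match directly inside stm { ... }. None of this is the post's stm.cs; it only models the behaviour described in the paper.

```fsharp
open System.Threading

// Toy STM model (hypothetical, not the post's stm.cs): one global monitor plus undo lists.
exception Retry

type TVar<'a> (init : 'a) =
  member val Value = init with get, set
type Log = { mutable Undo : (unit -> unit) list }
type Stm<'a> = Stm of (Log -> 'a)

let globalLock = obj ()
let runStm (Stm f) log = f log
let undoAll (log : Log) = for undo in log.Undo do undo ()   // newest write undone first

let newTVar (v : 'a) = TVar(v)
let readTVar (tv : TVar<'a>) = Stm (fun _ -> tv.Value)
let writeTVar (tv : TVar<'a>) (v : 'a) =
  Stm (fun log ->
    let old = tv.Value
    log.Undo <- (fun () -> tv.Value <- old) :: log.Undo
    tv.Value <- v)
let retry () : Stm<'a> = Stm (fun _ -> raise Retry)

type StmBuilder () =
  member this.Return (x : 'a) = Stm (fun _ -> x)
  member this.ReturnFrom (m : Stm<'a>) = m
  member this.Bind (m : Stm<'a>, f : 'a -> Stm<'b>) = Stm (fun log -> runStm (f (runStm m log)) log)
  member this.Zero () = Stm (fun _ -> ())
  member this.Delay (f : unit -> Stm<'a>) = Stm (fun log -> runStm (f ()) log)
let stm = StmBuilder ()

// orElse: run m1 with its own undo list; if m1 retries, roll it back and run m2 instead.
let orElse (m1 : Stm<'a>) (m2 : Stm<'a>) =
  Stm (fun log ->
    let nested = { Undo = [] }
    try
      let x = runStm m1 nested
      log.Undo <- nested.Undo @ log.Undo   // keep m1's writes undoable by the outer transaction
      x
    with e ->
      undoAll nested
      match e with
      | Retry -> runStm m2 log
      | _ -> raise e)

// atomically: on retry, roll back and block until another transaction commits.
let atomically (Stm f) =
  lock globalLock (fun () ->
    let rec attempt () =
      let log = { Undo = [] }
      match (try Choice1Of2 (f log) with e -> Choice2Of2 e) with
      | Choice1Of2 result ->
          Monitor.PulseAll(globalLock)         // committed: wake blocked transactions
          result
      | Choice2Of2 Retry ->
          undoAll log
          Monitor.Wait(globalLock) |> ignore   // releases the monitor while waiting
          attempt ()
      | Choice2Of2 e ->
          undoAll log
          raise e
    attempt ())

type MVar<'a> = TVar<'a option>
let newEmptyMVar () : MVar<'a> = newTVar None

let takeMVar (mv : MVar<'a>) =
  stm {
    let! v = readTVar mv
    match v with
    | Some a ->
        do! writeTVar mv None
        return a
    | None -> return! retry () }

let putMVar (mv : MVar<'a>) (a : 'a) =
  stm {
    let! v = readTVar mv
    match v with
    | None -> do! writeTVar mv (Some a)
    | Some _ -> return! retry () }

let tryTakeMVar (mv : MVar<'a>) =
  let taken =
    stm {
      let! v = takeMVar mv
      return Some v }
  orElse taken (stm { return None })

// Usage: a consumer blocks in takeMVar until the main thread puts a value.
let mv : MVar<int> = newEmptyMVar ()
let before = atomically (tryTakeMVar mv)   // the MVar is empty, so orElse yields None
let got = ref 0
let consumer = Thread(ThreadStart(fun () -> got.Value <- atomically (takeMVar mv)))
consumer.Start()
atomically (putMVar mv 42)   // commits, waking the consumer if it is blocked in retry
consumer.Join()
printfn "took %d" got.Value   // took 42
```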

    Also included in the code download is the Santa program, a singly-linked list queue, and a fixed-length array queue.

    Of course all these great benefits of STM come at a price, and that price is performance. An STM program is probably always going to be slower than a lock-based program. This STM implementation was engineered for correctness and design of the monadic interface and so probably has really bad performance.


Post Calendar: September 2008


Powered by Community Server, by Telligent Systems