hubFS: THE place for F#

. . . are you on The Hub?
Welcome to hubFS: THE place for F# Sign in | Join | Help
in Search

MicroThreads in F#

Last post 02-19-2010, 13:19 by eventhelix. 2 replies.
Sort Posts: Previous Next
  •  02-01-2010, 12:58 13004

    MicroThreads in F#

    /// Cooperative micro-threads ("fibers"): each fiber is a lazily evaluated
    /// sequence of steps, and Fiber.RunAll advances all started fibers in turn.
    module MicroThreading =

       /// Untyped fiber. Each element of `ops` is one step: `None` means the
       /// fiber has not produced its result yet, `Some v` stores `v` as the result.
       type Fiber(ops: seq<obj option>) =

          // Mutable so that Restart can swap in a fresh enumerator.
          let mutable enum = ops.GetEnumerator()

          let id = System.Guid.NewGuid().ToString()

          // Result of the fiber; stays None until a `Some` step is consumed.
          let mutable res : obj option = None

          // Fibers that RunAll will advance.
          static let pool = new ResizeArray<Fiber>()

          // Fiber most recently selected by RunAll.
          static let mutable curr : Fiber option = None

          /// Searches the pool for a fiber whose ID equals `id` (ResizeArray.Find).
          // NOTE(review): Find presumably yields null when nothing matches — confirm.
          static member FindByID id = pool.Find(fun x -> x.ID = id)

          /// Adds this fiber to the pool processed by RunAll.
          member self.Start() = pool.Add(self)

          /// Rewinds the fiber to its first step and clears the stored result.
          /// A fresh enumerator is requested from `ops` instead of calling
          /// Reset(), which enumerators produced by sequence expressions and
          /// Seq.map do not support. The fiber is not re-added to the pool.
          member self.Restart() =
             enum.Dispose()
             enum <- ops.GetEnumerator()
             res <- None

          /// Removes the fiber from the pool and disposes its enumerator.
          member self.Kill() =
             pool.Remove self |> ignore
             enum.Dispose()

          /// Removes the fiber from the pool but keeps its enumerator alive,
          /// so a later Start() continues from the same step.
          member self.Suspend() = pool.Remove self |> ignore

          /// Remaining steps of the fiber. The sequence pulls from the fiber's
          /// own enumerator, so consuming it advances the fiber itself.
          member self.ToSeq = seq {
                while enum.MoveNext() do yield enum.Current
             }

          /// Empties the pool without disposing the fibers' enumerators.
          static member KillAll() = pool.Clear()

          /// Fiber most recently selected by RunAll. Uses Option.Value, so it
          /// fails if RunAll has not selected any fiber yet.
          static member Current = curr.Value

          /// Run all the started fibers, one step per fiber per round, until
          /// the pool is empty.
          static member RunAll() =
             while pool.Count <> 0 do
                // Iterate over a snapshot: Next may Kill or Suspend fibers,
                // which mutates the pool and would invalidate live indices.
                for x in pool.ToArray() do
                   // Skip fibers removed earlier in this same round.
                   if pool.Contains x then
                      curr <- Some x
                      x.Next

          /// Identifier generated from a new GUID when the fiber was created.
          member self.ID = id

          /// Advances the fiber by one step. A `Some` step stores the result;
          /// once a result is present, or the steps are exhausted, the fiber
          /// is killed.
          member internal self.Next =
             if res.IsNone && enum.MoveNext() then
                match enum.Current with
                | Some x -> res <- Some x
                | None -> ()
             else self.Kill()

          /// Gets the stored result (None while unfinished); setting a value
          /// stores it wrapped in Some.
          // NOTE(review): getter type (obj option) and setter type (obj) differ;
          // newer F# compilers may reject this — confirm before changing.
          member self.Res with get() = res and set v = res <- Some v

       /// Typed view over Fiber: steps are boxed on the way in and unboxed
       /// again by ToSeq.
       type Fiber<'T>(ops: seq<'T option>) =

          inherit Fiber(Seq.map (Option.map box) ops)

          /// Remaining steps, unboxed back to 'T.
          member self.ToSeq = Seq.map (Option.map unbox<'T>) base.ToSeq

    /// Builds a fiber whose first enumeration suspends the currently running
    /// fiber (removing it from the pool), then yields one pending step (None)
    /// followed by a completed unit step.
    let suspend() =
       let steps = seq {
          Fiber.Current.Suspend()
          yield None
          yield Some ()
       }
       new Fiber<unit>(steps)

    /// Builds a fiber that yields one pending step (None) and then completes
    /// with unit.
    let switch() =
       let steps = seq {
          yield None
          yield Some ()
       }
       new Fiber<unit>(steps)

    /// Parked fibers, each paired with the predicate it is waiting on.
    let waiters = ResizeArray<(unit -> bool) * Fiber>()

    /// Builds a fiber that waits for the predicate `f`.
    /// If `f()` already holds when the fiber runs, it yields one pending step
    /// and then completes with unit. Otherwise it records `f` together with
    /// the current fiber in `waiters` and replays the steps of `suspend()`.
    let waitUntil f = Fiber<unit> (seq {
          // Branches are laid out on separate lines: a one-line
          // `if c then a; b` followed by `else` does not parse as intended.
          if f () then
             yield None
             yield Some ()
          else
             waiters.Add((f, Fiber.Current))
             yield! suspend().ToSeq
       })

    /// Background fiber that restarts parked fibers once their predicate holds.
    /// It is started as soon as this value is evaluated and never finishes
    /// (its loop is `while true`), yielding one pending step per sweep.
    let internal waitermanager =

       // One sweep over `waiters`: every entry whose predicate is true is
       // removed and its fiber started again. The index only advances when
       // nothing was removed, so no entry is skipped and the index never runs
       // past the end of the shrinking list.
       let wakeReady () =
          let mutable i = 0
          while i < waiters.Count do
             let ready, parked = waiters.[i]
             if ready () then
                waiters.RemoveAt i
                parked.Start()
             else
                i <- i + 1

       let waitUntilManager = Fiber<unit> (seq {
             while true do
                wakeReady ()
                yield None
          })

       waitUntilManager.Start()

       waitUntilManager

    /// Computation-expression builder behind the `fiber { ... }` syntax.
    type FiberBuilder() =

       /// An empty computation is the fiber returned by `switch()`.
       member self.Zero() = switch()

       /// Defers `f`: the resulting fiber yields one pending step (None) and
       /// only then evaluates `f` and replays the steps of the fiber it returns.
       member self.Delay (f: unit -> Fiber<_>) =
          new Fiber<_>(seq {
             yield None
             yield! f().ToSeq
          })

       /// Sequences `x` and then `f`.
       /// If `x` has no result yet, every pending step of `x` is passed
       /// through as a pending step, and each `Some v` step is replaced by the
       /// steps of `f v`. If `x` already holds a result, the steps of `f`
       /// applied to that (unboxed) result are used directly.
       member self.Bind(x: Fiber<'a>, f: 'a -> Fiber<'b>) =
          if x.Res.IsNone then
             new Fiber<'b>(
                x.ToSeq
                |> Seq.collect (fun step ->
                      match step with
                      | Some v -> f(v).ToSeq
                      | None -> seq { yield None }))
          else
             // The original omitted `(seq ...)` around this sequence expression.
             new Fiber<'b>(seq { yield! f(unbox x.Res.Value).ToSeq })

       /// A fiber that completes immediately with `x`.
       member self.Return x =
          new Fiber<_>(seq {
             yield Some x
          })

       /// Enables `return! fib` inside `fiber { ... }`; the fiber is returned
       /// unchanged.
       member self.ReturnFrom (x: Fiber<'a>) : Fiber<'a> = x

    /// Builder instance that provides the `fiber { ... }` syntax.
    let fiber = new FiberBuilder()

    /// Message queue whose Receive is a fiber that waits until a message is
    /// available.
    type Channel<'Msg>() =

       // Fully qualified because no `open System.Collections.Generic` is
       // visible in this file.
       let msgs = System.Collections.Generic.Queue<'Msg>()

       let id = System.Guid.NewGuid().ToString()

       // interface IChannel<'Msg> with
       //    member self.Send msg = msgs.Enqueue msg
       //    member self.Receive() = fiber {
       //       do! waitUntil(fun() -> msgs.Count <> 0)
       //       return msgs.Dequeue()
       //    }
       //

       /// Identifier generated from a new GUID when the channel was created.
       member self.ID = id

       /// Enqueues `msg` on this channel.
       member self.Send (msg: 'Msg) = msgs.Enqueue msg

       /// Fiber that waits until the queue is non-empty and then returns the
       /// dequeued message. Declared before SendAndReply, which uses it.
       member self.Receive() = fiber {
          do! waitUntil (fun () -> msgs.Count <> 0)
          return msgs.Dequeue()
       }

       /// Fiber that enqueues `msg` on this channel and then receives the next
       /// message from this same channel.
       /// Written with `let!`/`return` so it only needs the builder's Bind and
       /// Return; the produced steps are the same as with `return!`.
       member self.SendAndReply (msg: 'Msg) = fiber {
          self.Send msg
          let! reply = self.Receive()
          return reply
       }

    /// Implements multiple consumers and a single producer: every message sent
    /// through the link is forwarded to each connected channel.
    type Link<'Msg>() =

       let channels = new ResizeArray<Channel<'Msg>>()

       /// Registers `ch` as a receiver of future messages.
       member self.ConnectChannel ch = channels.Add(ch)

       /// Sends `msg` to every connected channel, in connection order.
       member self.Send msg =
          channels |> Seq.iter (fun ch -> ch.Send msg)

    /// Infix shorthand for sending: `channel <-- msg` calls `channel.Send msg`.
    let (<--) (x: Channel<_>) msg = x.Send(msg)

    Enjoy! :D

  •  02-05-2010, 18:16 13030 in reply to 13004

    Re: MicroThreads in F#

    Sounds interesting, but I didn't manage to compile your code. Seems like you use the deprecated syntax for seq (but I'm not sure).

    Could you have a look? Thanks!
  •  02-19-2010, 13:19 13193 in reply to 13030

    Re: MicroThreads in F#

    @Stinger,

    I fixed it.

    Sincerely,

    Saagar

View as RSS news feed in XML
Powered by Community Server, by Telligent Systems