hubFS: THE place for F#

. . . are you on The Hub?
Welcome to hubFS: THE place for F# Sign in | Join | Help
in Search

MicroThreads in F#

Last post 07-18-2010, 13:03 by eventhelix. 5 replies.
Sort Posts: Previous Next
  •  02-01-2010, 12:58 13004

    MicroThreads in F#

    module MicroThreading =

       /// A cooperatively scheduled micro-thread.
       /// `ops` is the fiber's step sequence: a `None` step means "nothing produced yet,
       /// let the other fibers run"; a `Some value` step records `value` as the result.
       type Fiber(ops: seq<obj option>) =

          // Single shared enumerator: both Next and ToSeq advance this same cursor.
          let enum = ops.GetEnumerator()

          let id = System.Guid.NewGuid().ToString()

          // Result recorded by the first `Some` step; None until then.
          let mutable res = None

          // Fibers currently scheduled; RunAll steps them round-robin.
          static let pool = new ResizeArray<Fiber>()

          // The fiber RunAll is stepping right now (None before the first step).
          static let mutable curr : Fiber option = None

          /// Looks up a scheduled fiber by its ID.
          /// NOTE(review): List<T>.Find returns the type's default value (null here)
          /// when nothing matches -- callers should confirm they handle that.
          static member FindByID id = pool.Find(fun x -> x.ID = id)

          /// Schedules this fiber so that RunAll will step it.
          member self.Start() = pool.Add(self)

          /// Resets the underlying enumerator.
          /// NOTE(review): enumerators produced by sequence expressions usually do not
          /// support Reset -- confirm this works for the sequences passed in.
          member self.Restart() = enum.Reset()

          /// Unschedules this fiber and disposes its enumerator.
          member self.Kill() =
             pool.Remove self |> ignore
             enum.Dispose()

          /// Unschedules this fiber without disposing it; Start() schedules it again.
          member self.Suspend() = pool.Remove self |> ignore

          /// The remaining steps of this fiber, drawn from the shared enumerator.
          member self.ToSeq = seq {
                while enum.MoveNext() do yield enum.Current
             }

          /// Unschedules every fiber (enumerators are not disposed).
          static member KillAll() = pool.Clear()

          /// The fiber currently being stepped by RunAll.
          /// Fails if accessed before RunAll has stepped any fiber.
          static member Current = curr.Value

          /// Run all the started fibers
          static member RunAll() =
             while pool.Count <> 0 do
                // One round: step each scheduled fiber once. A step may remove the
                // fiber from the pool (Kill/Suspend), which shifts later fibers down
                // by one slot, so the index only advances when the fiber that was
                // just stepped still occupies slot i.
                let mutable i = 0
                while i < pool.Count do
                   let x = pool.[i]
                   curr <- Some x
                   x.Next
                   if i < pool.Count && obj.ReferenceEquals(pool.[i], x) then
                      i <- i + 1

          /// Unique identifier of this fiber (a GUID string).
          member self.ID = id

          /// Advances the fiber by one step. Once a result has been recorded, or the
          /// step sequence is exhausted, the fiber kills itself instead.
          member internal self.Next =
             if res.IsNone && enum.MoveNext() then
                match enum.Current with
                |Some x -> res <- Some x
                |None -> ()
             else self.Kill()

          /// The recorded result, if any. Setting it wraps the value in `Some`.
          /// NOTE(review): getter and setter have different types (option vs. raw
          /// value); newer compilers may reject this -- confirm before upgrading.
          member self.Res with get() = res and set v = res <- Some v

       /// Typed view over Fiber: steps are boxed on the way in and unboxed on the way out.
       type Fiber<'T>(ops: seq<'T option>) =

          inherit Fiber(Seq.map(Option.map box) ops)

          /// The remaining steps, unboxed back to 'T.
          member self.ToSeq = Seq.map(Option.map unbox<'T>) base.ToSeq

    /// Builds a fiber that, when first stepped, removes the currently running fiber
    /// from the scheduler, yields control once, and then completes with unit.
    let suspend() =
       let steps = seq {
             // Runs lazily, on the first step of the sequence.
             Fiber.Current.Suspend()
             yield None
             yield Some ()
          }
       Fiber<unit>(steps)

    /// Builds a fiber that yields control once and then completes with unit.
    let switch() =
       let steps = seq {
             yield None
             yield Some ()
          }
       Fiber<unit>(steps)

    /// Pending waits: each entry pairs a condition with the fiber that was
    /// suspended until that condition becomes true.
    let waiters = new ResizeArray<(unit -> bool) * Fiber>()

    /// Builds a fiber that completes once `f()` holds.
    /// If the condition is already true it yields control once and finishes;
    /// otherwise it registers the current fiber in `waiters` and suspends it.
    let waitUntil f = Fiber<unit> (seq {
          // Branches are written on separate lines so that both statements of each
          // branch belong to it; `if c then a; b` on one line does not group a and b.
          if f() then
             yield None
             yield Some()
          else
             waiters.Add(f, Fiber.Current)
             yield! suspend().ToSeq
       })

    /// Background fiber that re-schedules suspended fibers once their wait
    /// condition becomes true. It is started as soon as this binding is evaluated
    /// and never finishes on its own.
    let internal waitermanager =
       let waitUntilManager = Fiber<unit> (seq {
             while true do
                // Walk from the last entry to the first so that RemoveAt never shifts
                // an entry that is still to be visited. (Fibers that become ready in
                // the same pass are therefore restarted newest-first.)
                for i in waiters.Count - 1 .. -1 .. 0 do
                   let f, fi = waiters.[i]
                   // Only a fiber whose condition now holds is removed and restarted.
                   if f() then
                      waiters.RemoveAt i
                      fi.Start()
                // Hand control back to the scheduler after each pass.
                yield None
          })
       waitUntilManager.Start()
       waitUntilManager

    /// Computation-expression builder behind the `fiber { ... }` syntax.
    type FiberBuilder() =

       /// Empty computation: yields control once, then completes with unit.
       member self.Zero() = switch()

       /// Wraps a delayed computation: yields control once, then runs the steps
       /// of the fiber produced by `f`.
       member self.Delay (f: unit -> Fiber<_>) = Fiber<_> (seq {
             yield None
             yield! f().ToSeq
          })

       /// Sequences `x` with the continuation `f`.
       /// While `x` has no recorded result, its `None` steps are passed through and
       /// each `Some value` step is replaced by the steps of `f value`.
       /// If `x` already has a result, the continuation runs on that result directly.
       member self.Bind(x: Fiber<'a>, f: 'a -> Fiber<'b>) =
          if x.Res.IsNone then
             new Fiber<'b>(
                x.ToSeq
                |> Seq.collect (fun step ->
                      match step with
                      | Some value -> f(value).ToSeq
                      | None -> seq { yield None }))
          else
             // The step sequence must be an explicit `seq { ... }` passed to the constructor.
             new Fiber<'b>(seq { yield! f(unbox x.Res.Value).ToSeq })

       /// A fiber that immediately completes with `x`.
       member self.Return x = Fiber<_> (seq {
             yield Some x
          })

    /// The builder instance used as `fiber { ... }`.
    let fiber = FiberBuilder()

    /// A message queue that fibers can send to and receive from.
    type Channel<'Msg>() =

       // Fully qualified: no `open System.Collections.Generic` is visible in this listing.
       let msgs = System.Collections.Generic.Queue<'Msg>()

       let id = System.Guid.NewGuid().ToString()

       // interface IChannel<'Msg> with
       // member self.Send msg = msgs.Enqueue msg
       // member self.Receive() = fiber {
       // do! waitUntil(fun() -> msgs.Count <> 0)
       // return msgs.Dequeue()
       // }
       //

       /// Unique identifier of this channel (a GUID string).
       member self.ID = id

       /// Enqueues `msg` without blocking.
       member self.Send msg = msgs.Enqueue msg

       /// A fiber that waits until the queue is non-empty and then dequeues one message.
       member self.Receive() : Fiber<'Msg> = fiber {
             do! waitUntil(fun() -> msgs.Count <> 0)
             return msgs.Dequeue()
          }

       /// Enqueues `msg`, then receives the next message from this same channel.
       /// NOTE(review): with a single queue the "reply" may be the message just
       /// sent -- confirm this is the intended behaviour.
       member self.SendAndReply msg = fiber {
             self.Send msg
             // The builder defines Bind and Return but no ReturnFrom, so the result
             // is bound and returned explicitly instead of using `return!`.
             let! reply = self.Receive()
             return reply
          }

    /// Fan-out link: one producer, many consumers. Every message sent through
    /// the link is forwarded to each connected channel.
    type Link<'Msg>() =

       // Channels that receive a copy of every message.
       let subscribers = new ResizeArray<Channel<'Msg>>()

       /// Registers `ch` as a consumer of this link.
       member self.ConnectChannel ch = subscribers.Add ch

       /// Forwards `msg` to every connected channel, in connection order.
       member self.Send msg =
          subscribers |> Seq.iter (fun target -> target.Send msg)

    /// Infix shorthand for sending: `chan <-- msg` is `chan.Send msg`.
    let (<--) (x: Channel<_>) msg =
       msg |> x.Send

    Enjoy! :D

  •  02-05-2010, 18:16 13030 in reply to 13004

    Re: MicroThreads in F#

    Sounds interesting, but I didn't manage to compile your code. Seems like you use the deprecated syntax for seq (but I'm not sure).

    Could you have a look? Thanks!
  •  02-19-2010, 13:19 13193 in reply to 13030

    Re: MicroThreads in F#

    @Stinger,

    i fixed it.

    Sincerely,

    Saagar

  •  06-15-2010, 12:56 15878 in reply to 13193

    Re: MicroThreads in F#

    Do you have any example code/test that compare this impl to TPL tasks?
  •  06-16-2010, 2:15 15886 in reply to 13004

    Re: MicroThreads in F#

    Could you post a small usage example? I'll translate it and post it to the Russian community.
  •  07-18-2010, 13:03 16193 in reply to 15886

    Re: MicroThreads in F#

    @LOST
    // Usage example: one fiber sends two messages, another prints whatever arrives.
    let chan = Channel<string>()
    let fiber1 =
       fiber {
          // Send returns unit, so it is a plain statement rather than a `do!` binding.
          chan.Send "Hi"
          // give control to the other fiber
          do! switch()
          chan.Send "Bye"
          // finish
          return ()
       }
    // A function rather than a recursive value: each Fiber owns a single enumerator,
    // so every loop iteration needs a fresh fiber.
    let rec printer () : Fiber<unit> =
       fiber {
          let! msg = chan.Receive()
          // give control back before printing
          do! switch()
          printfn "%s" msg
          do! printer ()
       }
    // Only started fibers are stepped by RunAll.
    fiber1.Start()
    printer().Start()
    // NOTE(review): the printer loops forever, so RunAll is not expected to return here.
    Fiber.RunAll()

    @anotheraccount

    These can't be compared with the TPL: they do not actually run in the background. They are only useful when you need to spawn off thousands of tasks, and only then will they run faster than TPL tasks. Currently I'm working on something to combine this with the TPL; the result of that will be called a smart thread.
View as RSS news feed in XML
Powered by Community Server, by Telligent Systems