module
MicroThreading =
type Fiber(ops: seq<obj option>) =
let enum = ops.GetEnumerator()
let id = System.Guid.NewGuid().ToString()
let mutable res = None
static let pool = new ResizeArray<Fiber>()
static let mutable curr = None
static member FindByID id = pool.Find(fun x -> x.ID = id)
member self.Start() = pool.Add(self)
member self.Restart() = enum.Reset()
member self.Kill() =
pool.Remove self |> ignore
enum.Dispose()
member self.Suspend() = pool.Remove self |> ignore
member self.ToSeq = seq {
while enum.MoveNext() do yield enum.Current
}
static member KillAll() = pool.Clear()
static member Current = curr.Value
/// Run all the started fibers
static member RunAll() =
while pool.Count <> 0 do
for i = 0 to pool.Count - 1 do
let x = pool.
curr <- Some x
x.Next
member self.ID = id
member internal self.Next =
if res.IsNone && enum.MoveNext() then
match enum.Current with
|Some x
-> res <- Some x
|None
-> ()
else self.Kill()
member self.Res with get() = res and set v = res <- Some v
type Fiber<'T>(ops: seq<'T option>) =
inherit Fiber(Seq.map(Option.map box) ops)
member self.ToSeq = Seq.map(Option.map unbox<'T>) base.ToSeq
let suspend() = Fiber<unit> (seq{
Fiber.Current.Suspend()
yield None
yield Some()
})
let switch() = Fiber<unit> (seq{
yield None
yield Some()
})
let waiters = new ResizeArray<(unit -> bool) * Fiber>()
let waitUntil f = Fiber<unit> (seq{
if f() then yield None; yield Some()
else waiters.Add(f, Fiber.Current); yield! suspend().ToSeq
})
let internal waitermanager =
let waitUntilManager = Fiber<unit> (seq{
while true do
for i in 0..waiters.Count-1 do
let f, fi = waiters.
if f() then waiters.RemoveAt i; fi.Start()
yield None
})
waitUntilManager.Start()
waitUntilManager
type FiberBuilder() =
member self.Zero() = switch()
member self.Delay (f: unit -> Fiber<_>) = Fiber<_> (seq{
yield None
yield! f().ToSeq
})
member self.Bind(x: Fiber<'a>, f: 'a -> Fiber<'b>) =
if x.Res.IsNone then new Fiber<'b>(x.ToSeq
|> Seq.collect(
fun x ->
match Option.map(fun x -> f(x).ToSeq) x with
|Some x
-> x
|None
-> seq { yield None }))
else new Fiber<_> { yield! f(unbox x.Res.Value).ToSeq }
member self.Return x = Fiber<_> (seq{
yield Some x
})
let fiber = FiberBuilder()
type Channel<'Msg>() =
let msgs = Queue<'Msg>()
let id = System.Guid.NewGuid().ToString()
// interface IChannel<'Msg> with
// member self.Send msg = msgs.Enqueue msg
// member self.Receive() = fiber {
// do! waitUntil(fun() -> msgs.Count <> 0)
// return msgs.Dequeue()
// }
//
member self.ID = id
member self.Send msg = msgs.Enqueue msg
member self.SendAndReply msg = fiber {
self.Send msg
return! self.Receive()
}
member self.Receive() = fiber {
do! waitUntil(fun() -> msgs.Count <> 0)
return msgs.Dequeue()
}
/// implements multiple consumers and single producer
type Link<'Msg>() =
let channels = new ResizeArray<Channel<'Msg>>()
member self.ConnectChannel ch = channels.Add ch
member self.Send msg =
for x in channels do x.Send msg
let (<--) (x: Channel<_>) msg = x.Send msg
enjoy
.