callmekohei's blog

callmekoheiのひとりごと

(F#)重い処理を並列で処理する(Actor Model)

Summary

アクターモデルで重い処理を並列でしょりする

こんなかんじ

たとえば5秒かかるしょりがあるとします

// 5秒かかる処理
let take5sec =
    System.Threading.Thread.Sleep 5000

この重い処理を10回おこなうとすると普通は50秒かかるかと思いますが、これを並列で処理すると50秒以下で処理できるようになります。

アクターモデルでの定形コード

// 並列処理する定型コード

// MailboxProcessorは長いのでActorと名前を変更
type Actor<'T> = MailboxProcessor<'T>
// 返信用のチャンネル
type Msg = AsyncReplyChannel<int>

let actorBehavior ( actor:Actor<Msg> ) =
    // アクターを使い回すので再帰にしておく
    let rec messageLoop() = async {
        // メッセージハンドラのトリガー
        let! reply = actor.Receive()
        // 処理内容(重い処理)
        take5sec
        // とりあえずスレッドIDを返してみる
        reply.Reply System.Threading.Thread.CurrentThread.ManagedThreadId
        return! messageLoop()
    }
    messageLoop()

let futures actorAmount runnerAmount behavior =

    // アクターをn個用意する
    let roundRobinRouter actorAmount =
        List.init actorAmount (fun _ -> Actor<Msg>.Start( behavior ) )

    // アクターにラウンドロビンで処理を割り当てる
    Seq.initInfinite( fun n -> n % (actorAmount) )
    |> Seq.take runnerAmount
    |> Seq.map ( fun i -> (roundRobinRouter actorAmount).[i].PostAndAsyncReply(id) )

実行してみる

// アクターを5個用意して10個のランナーを作成する
futures 5 10 actorBehavior
|> Async.Parallel
|> Async.RunSynchronously
|> printfn "%A"

結果

[Loading /Users/callmekohei/tmp/abcabc.fsx]
[|17; 15; 18; 21; 15; 17; 17; 17; 17; 15|]
namespace FSI_0010
  val take5sec : unit
  type Actor<'T> = MailboxProcessor<'T>
  type Msg = AsyncReplyChannel<int>
  val actorBehavior : actor:Actor<Msg> -> Async<'a>
  val futures :
    actorAmount:int ->
      runnerAmount:int ->
        behavior:(MailboxProcessor<Msg> -> Async<unit>) -> seq<Async<int>>


*** time : 5.197822 s ***

50秒かかるところが5秒で実施できた!

参考

blog.amedama.jp