11
comments
on 2/6/2014 8:04 AM

Many applications want to use multiple cores to execute faster, while retaining the same observable behavior as the sequential version. Async in F# goes a long way to support this, but its parallel sections do not order effects at all. Sometimes you want a combination of ordered and unordered effects. For example, you might want a logging "effect" that displays log messages strictly in the order the program is written, while other effects are free to be scheduled as the machine sees fit. And you want the logs to be processed as soon as available: delaying them until all processing is done is not user-friendly.

Let us do exactly that. The F# presented is definitely not the only, and perhaps not the best way to solve this problem. But I hope the solution is simple enough to convince you that the problem is not a hard one.

First, what are we after? As a model for parallelism, let us take something like the Haskell Par monad with IVar for synchronization, which is an old idea that I find is particularly well presented in this paper. It is simply fork/join with joins accomplished by reading "write-once" variables, which amounts to waiting on the thread to write to the variable. Quite similar to BCL Task model or F# Async SpawnChild in fact. To that we add a logging primitive. We might use it like this, to implement parallel Fibonacci:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
    let rec ParFib (n: int) : Par<int> =
        par {
            do! Par.Log (Message (sprintf "%i" n))
            do! Par.DoAsync (Async.Sleep 250)
            match n with
            | 0 | 1 ->
                return 1
            | n ->
                let! a = Par.Spawn (ParFib (n - 1))
                let! b = ParFib (n - 2)
                let! a = Par.Await a
                return a + b
        }
 

Note Par.Spawn which forks its argument, and returns a Future<'T> (this is our IVar), which we then Par.Await to join to the main thread.

When we run this, we expect the messages to appear in the same order as the sequential version, but the program to run faster. We also expect to see logs appearing right away, before the execution finishes. Here is the sequential version:

1
2
3
4
5
6
7
8
9
10
11
12
13
    let rec SFib (n: int) : Par<int> =
        par {
            do! Par.Log (Message (sprintf "%i" n))
            do! Par.DoAsync (Async.Sleep 250)
            match n with
            | 0 | 1 ->
                return 1
            | n ->
                let! a = SFib (n - 1)
                let! b = SFib (n - 2)
                return a + b
        }
 

How to implement this? The only interesting part is combining two Par computations in a way that preserves the log order. If we could have some notion of Stream this would be easy - to combine A and B we would append the streams produced by both. The tricky part is that we should return the combined stream before the computation is done.

It is, in fact, possible to do so. We can implement Stream as you would a lazy list, but replacing Lazy<'T> suspensions with Future<'T> suspensions. The only really tricky function is monadic bind for Par. Here is how it might read:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
type Par<'T> =
    | Par of (unit -> Stream<Message> * Future<'T>)

module Par =
    let Bind (Par x) (f: 'T1 -> Par<'T2>) : Par<'T2> =
        Par <| fun () ->
            let (streamHead, x) = x ()
            let result = Future.Create()
            let streamVar = Future.Create()
            async {
                try
                    let! x = Future.Await x
                    let (Par yF) = f x
                    let (yS, yV) = yF ()
                    do Future.Set streamVar yS
                    let! y = Future.Await yV
                    return Future.Set result y
                with e ->
                    return Future.Fail result e
            }
            |> Async.Start
            let stream = Stream.Append streamHead (Stream.FromFuture streamVar)
            (stream, result)

That is, we construct a stream by appending a ready part with a future part, and we "set" the future part from a new thread when it is available.

Here is the complete code you can load in FSI to play with: [link:gist.github.com] - it comes with an optimistic lock-free Future, a very simple Stream. It works great for the SFib/ParFib examples. Before copying this to use in production, please note that there might be some bugs lurking in exception semantics for Future - I have not taken the time to verify it yet; also, the optimistic implementation is fun but locking might perform better - your mileage will vary, and I did not do benchmarks.

In conclusion, I hope you are now convinced that:

  • Deterministic logging in presence of parallelism is not hard
  • Write-once variables (here: Futures) are a simple and useful synchronization primitive
.

...

By on 2/6/2014 2:04 PM ()

...

By on 2/6/2014 2:05 PM ()

Anton, I'm not sure what is the *exact* problem you has solved.

"Deterministic logging in presence of parallelism is not hard"

Your approach, for N parallel process, use (N - 1) * sizeEachLog amount of memory (if not first sizeEachLog is consumed on start then N * sizeEachLog).

The sequential version use no memory.

I think is not the same thing.

Moreover, suppose a ImageConverter server, you send one image to the server, and it reply with a complex *perpixel* image filter.

If the server process inputs (images) sequentially then, no memory is needed, each processed pixel is sent immediately, but, of course, only one pixel is computed at a time.

If the server process inputs (images) asynchronously (nondeterministic), no memory is needed and many pixels are computed in parallel (one channel per input is needed).

Using your aproach, the server can process many inputs at a time (FIFO) but, if determinism is needed, you can only sent to client the older pixel, if this pixel is very complex, your stream grow and grow awaiting (you can set restrictions to the FIFO e.g. max length, but it's another history).

Your code *could be useful* yes, but not solve the deterministic vs. nondeterministic parallelization problem (I think, of course).

In conclusion, I hope you are now convinced that:

* Deterministic logging in presence of parallelism is not hard

is false, in general.

Great job, anyway.

Regards!

By on 2/6/2014 2:09 PM ()

For the kind of applications I have in mind, (1) memory use is not part of the semantics and (2) logs are observed at a rate far exceeding the rate at which they are produced.

Consider "sequential erasing": replacing Spawn with identity and Await with Return. Consider semantics of a program to be:

1
val Denote : Program<'T> -> list<Message> * 'T

Approximately; this is simplifying by ignoring DoAsync effects.

Then the "determinism" property is simply that Denote p = Denote (SeqErase p).

This is a reasonable assumption for some applications and not for others.

An application you are describing seems to require some sort of flow control, as you point out yourself "restrictions to the FIFO e.g. max length".

By on 2/7/2014 6:15 AM ()

"For the kind of applications I have in mind, (1) memory use is not part of the semantics and (2) logs are observed at a rate far exceeding the rate at which they are produced."

"This is a reasonable assumption..."

Yes, restricting some problem it turn easy to solve...

My example fit into "Deterministic logging in presence of parallelism", is real, is practical, but is not easy...

Argue that it is invalid, is unreasonable (to me) insofar as you are generalizing your assertion.

By on 2/7/2014 8:36 AM ()

Test comment..

By on 2/6/2014 2:27 PM ()

On the other hand, using lazy you could write (in Haskell)

1
2
3
4
5
6
7
8
9
lpfib :: Int -> (Int, [Int])
lpfib 0 = (1, [1])
lpfib 1 = (1, [1, 1])
lpfib n = aq `par` bq `pseq` (a + b, as ++ bs ++ [a + b])
          where aq@(a, as) = lpfib (n - 1)
                bq@(b, bs) = lpfib (n - 2)
 
-- usage example
main = getArgs >>= mapM_ print . snd . lpfib . read . head
By on 2/7/2014 12:03 AM ()

I love it when someone takes the time to reduce 1000 words of my nonsense to 5 lines of Haskell :)

However I am not convinced your program meets my spec. Seems like `aq` and `bq` are evaluated in parallel, and then, only then, `as ++ bs ++ ..` is constructed. In other words it is impossible to observe the log stream during the evaluation of `aq` and `bq`. See "And you want the logs to be processed as soon as available: delaying them until all processing is done is not user-friendly."

The kind of application I have in mind here, is parallel test runner or a tool like make. Jobs are expensive, logging is cheap. It is not an option to delay logs until all jobs are done, the user needs the feedback sooner.

If you have the time, could you prove me wrong, or show a better Haskell program? It is bound to be shorter than F#..

By on 2/7/2014 6:06 AM ()

"However I am not convinced your program meets my spec....
...it is impossible to observe the log stream during the evaluation"

yes, you are right, but in your case occur the same* behavior!

(*) if you parallelize N process into N processors; my stream is not useful until all process end; your {2..N} streams are not useful until first process end. Here, your argument "it is impossible to observe the log stream during the evaluation" loses meaning...

On the other hand, *I accept* my cheat (is not the same think) *in general* but is closed to the crux of the matter.

My only criticism is about your generalization of your solution.

By on 2/7/2014 8:48 AM ()

Well... You say yourself - (*) is a huge difference. Yes, streams {2..N} wait for stream 1. But stream 1 is observed as it is being evaluated. This was *exactly* the problem I was solving. Sorry, it is not a silver bullet.

By on 2/7/2014 8:54 AM ()

"This was *exactly* the problem I was solving"

Alright. Thanks your clarification.

By on 2/7/2014 8:59 AM ()
IntelliFactory Offices Copyright (c) 2011-2012 IntelliFactory. All rights reserved.
Home | Products | Consulting | Trainings | Blogs | Jobs | Contact Us
Built with WebSharper