EDIT: Whoops! I wrote this before seeing Jacob's reply to his original message.

Hi Jacob,

It's nice to hear from new community members who are so enthusiastic about the language. It sounds like an interesting project; is it for work or just a hobby?

I've fixed the code below so that it compiles. The basic problem was that the asynchronous workflow needs to be executed. To do this, we assign it to the "workflow" identifier and then use "Async.Run".

#light
open System.IO
open System.Net.Sockets
open Microsoft.FSharp.Control.CommonExtensions
   
type AsyncStream =
    inherit NetworkStream
    member stream.ReadAsyncBytesWithYield(count : int) =
            let buffer = Array.create count 0uy                    
            let remaining = ref count
            let data = seq { while(!remaining > 0) do
                              let workflow =
                                    async { let! read = stream.ReadAsync(buffer, 0, !remaining)
                                            do remaining := !remaining - read
                                            return buffer.[0..read - 1] }
                              yield! Async.Run workflow }
            data |> Seq.to_list

I guess there could be some performance issues with this code, but I'm unwilling to guess at what they might be :) It's almost always best to take the suck-it-and-see approach, meaning testing and profiling.

Cheers,
Rob

By on 1/26/2008 12:49 AM ()

Hi guys,

The second version of the code looks pretty strange to me: why create an Async with a single asynchronous action and then immediately run it? That's just doing a blocking read.
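To illustrate the point about a single-action workflow, here is a minimal sketch. It assumes a current FSharp.Core, where the thread's "Async.Run" and "stream.ReadAsync" are spelled "Async.RunSynchronously" and "stream.AsyncRead", and it uses a MemoryStream as a stand-in for a network stream. Both variants hold the calling thread until the bytes arrive and produce the same result.

```fsharp
open System.IO

// Sample data; a MemoryStream stands in for a real network stream (assumption).
let bytes = [| 10uy; 20uy; 30uy; 40uy |]

// Variant 1: wrap one asynchronous read in a workflow and run it at once.
let viaAsync =
    use stream = new MemoryStream(bytes)
    let buffer = Array.zeroCreate<byte> 4
    let read =
        async { return! stream.AsyncRead(buffer, 0, 4) }
        |> Async.RunSynchronously
    buffer.[0 .. read - 1]

// Variant 2: a plain blocking read.
let viaBlocking =
    use stream = new MemoryStream(bytes)
    let buffer = Array.zeroCreate<byte> 4
    let read = stream.Read(buffer, 0, 4)
    buffer.[0 .. read - 1]
```

The async version only pays off when the workflow is composed with other work and started without waiting on it.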

I think you are just after a version that uses a bit less mutable state and no "while" loop. Here's one way to do this:

type System.IO.Stream with
  /// Will return when count number of bytes have been read
  member stream.ReadAsyncBytes(count : int) =
    let buffer = Array.create count 0uy                    
    let rec readBytes(index, remaining) = 
        async { let! read = stream.ReadAsync(buffer, index, remaining)
                do! readBytes(index + read, remaining - read) }
    readBytes(0,count)

Here's a version that doesn't mutate arrays at all, though to be honest the above is probably better.

type System.IO.Stream with
  member stream.ReadAsyncByteList(max:int) =
    async { let buffer = Array.create max 0uy                    
            let! read = stream.ReadAsync(buffer, 0, max)
            return buffer.[0..read-1]  }
            
  member stream.ReadAsyncBytes(count : int) =
    let rec readBytes(remaining) = 
        async { let! read = stream.ReadAsyncByteList(remaining)
                let! rest = readBytes(remaining - read.Length) 
                return Array.append read rest }
    readBytes(count)

I also think you're trying to mix synchronous (= blocking) sequences with async programming (= non-blocking). This won't work. Fundamentally, synchronous sequences work by making blocking calls to methods and properties "Current" and "MoveNext". I believe you would need to define an "asynchronous sequence" to get a non-blocking sequence (i.e. where each "request" for data doesn't block the requesting thread but instead returns an Async computation).

I suspect the underlying types would be:

type IAsyncEnumerator<'a> = 
    abstract Current : 'a
    abstract MoveNext : unit -> Async<bool>

type IAsyncEnumerable<'a> = 
    abstract GetEnumerator : unit -> IAsyncEnumerator<'a>

type AsyncSeq<'a> = IAsyncEnumerable<'a>

The types of the combinators would look something like this:

AsyncSeq.map : ('a -> Async<'b>) -> AsyncSeq<'a> -> AsyncSeq<'b> 
AsyncSeq.iter : ('a -> Async<unit>) -> AsyncSeq<'a> -> Async<unit> 

etc.

You would be able to build a computation expression builder for this type. It's very similar to monadic programming in Haskell, of course.

Note the existence of both blocking and non-blocking sequences is normal for non-blocking programming: the non-blocking nature of the code inevitably gives rise to this.
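As a rough sketch of how these types might be used, here is one possible "iter" written against the two interfaces above, assuming a current FSharp.Core ("stream.AsyncRead", "Async.RunSynchronously"). The "ofStream" helper and the bodies of both functions are hypothetical illustrations, not an existing library API.

```fsharp
open System.IO

// The interfaces sketched above, repeated so the block is self-contained.
type IAsyncEnumerator<'a> =
    abstract Current : 'a
    abstract MoveNext : unit -> Async<bool>

type IAsyncEnumerable<'a> =
    abstract GetEnumerator : unit -> IAsyncEnumerator<'a>

module AsyncSeq =
    /// Hypothetical helper: expose a stream as an async sequence of chunks.
    let ofStream (chunkSize : int) (stream : Stream) : IAsyncEnumerable<byte[]> =
        { new IAsyncEnumerable<byte[]> with
            member x.GetEnumerator() =
                let buffer = Array.zeroCreate<byte> chunkSize
                let current : byte[] ref = ref [||]
                { new IAsyncEnumerator<byte[]> with
                    member y.Current = current.Value
                    member y.MoveNext() =
                        async {
                            // Non-blocking read; zero bytes means end of stream.
                            let! read = stream.AsyncRead(buffer, 0, chunkSize)
                            current.Value <- Array.sub buffer 0 read
                            return read > 0 } } }

    /// Runs an async action for each element; never blocks between elements.
    let iter (action : 'a -> Async<unit>) (source : IAsyncEnumerable<'a>) : Async<unit> =
        async {
            let e = source.GetEnumerator()
            let rec loop () =
                async {
                    let! hasNext = e.MoveNext()
                    if hasNext then
                        do! action e.Current
                        return! loop () }
            return! loop () }

// Usage: collect the chunks of a ten-byte in-memory stream, four bytes at a time.
let chunks = ResizeArray<byte[]>()

new MemoryStream([| 1uy .. 10uy |])
|> AsyncSeq.ofStream 4
|> AsyncSeq.iter (fun chunk -> async { chunks.Add chunk })
|> Async.RunSynchronously
```

The only blocking call is the final "Async.RunSynchronously", which is there just to drive the example.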

Kind regards

don

By on 1/26/2008 3:44 AM ()

Hi Robert and Don,

Thanks for the replies.

I am running a small indie game company – 3 Lives Left – with a couple of friends, so we do need to get some code running to get bread and butter on the table – and F# is such a fun way to do it.
We have been following the language on the side for a year or so. We tried getting our feet wet with F# earlier, but starting a business while also learning a new language was too much. So we have been waiting for your books to become available. They make it a lot easier to get started. Thanks! :)

Oh yes, I see now that I don't gain much from having only one async operation in the expression combined with Async.Run. Regarding the Async.Run call, my thought was to replace it with something like this:

      member stream.Read(count : int) =
            let buffer = Array.create count 0uy                    
            let remaining = ref count
            let data = seq {
                            let rec yieldResult result =
                                     remaining := !remaining - result.Length
                                     yield! result
                                     if(!remaining > 0)
                                         Async.Spawn(asyncComputation) 
                                and asyncComputation =
                                    async {
                                            let! read = stream.ReadAsync(buffer, 0, !remaining)
                                            do yieldResult buffer.[0..read]
                                          }
                                Async.Spawn(asyncComputation)                                                                                                               
                               }
            data 

But then I ran into a problem with yield in the yieldResult function* and recursive bindings not allowed in comprehensions. At that time it was late, and my head was too full to cope with more F#.
Looking at the idea now, I am not even sure what I expect the code to do, since spawning the async computation would just yield nothing (?). I think I got it messed up in trying to apply as much new stuff as I could.

To make sure my understanding is correct:
If the solution with only one async call had two or more async calls instead, then the Async.Run call would make sense, right? I would then get the benefit of the async computation each time I iterated over the sequence. So does mixing sequence and async expressions make sense when done as in this example?

#light

open System
open System.IO
open Microsoft.FSharp.Control
open Microsoft.FSharp.Control.CommonExtensions

let data = seq {
                    use fiskFile = File.OpenRead(@"C:\fisk.txt") 
                    use hestFile = File.OpenRead(@"C:\hest.txt") 
                    let buffer = Array.create 2 0uy
                    for i in 0..5 do 
                        let asyncop =
                            async {
                                    let! _ = fiskFile.ReadAsync(buffer, 0, 1)
                                    let! _ = hestFile.ReadAsync(buffer, 1, 1) 
                                    return (buffer.[0],  buffer.[1])                                                                            
                            }
                        yield Async.Run(asyncop)     
               }

let writeData() =
    use fiskFile = File.OpenWrite(@"C:\fisk.txt") 
    use hestFile = File.OpenWrite(@"C:\hest.txt") 
    let buffer = Array.create 2 0uy
    buffer.[0] <- 1uy
    buffer.[1] <- 2uy
    for i in 0..5 do
        fiskFile.Write(buffer, 0, 1)
        hestFile.Write(buffer, 1, 1)

writeData()
Seq.iter(fun x -> printfn "%A" x) data

Console.ReadKey()


val data : seq<byte * byte>
val writeData : unit -> unit

(1uy, 2uy)
(1uy, 2uy)
(1uy, 2uy)
(1uy, 2uy)
(1uy, 2uy)
(1uy, 2uy)

Thanks for the code suggestions – I changed it to suit my needs to this:

type System.IO.Stream with
  /// Will return when count number of bytes have been read
  member stream.ReadAsyncBytes(count : int) =
    let buffer = Array.create count 0uy                   
    let rec readBytes(index, remaining) =
        async { let! read = stream.ReadAsync(buffer, index, remaining)
                match remaining - read with
                | 0 -> return buffer
                | _ -> let! res = readBytes(index + read, remaining - read)
                       return res }
    readBytes(0,count)

since I want it to have the signature int -> Async<byte array>. By the way, I think the termination case is missing in the original recursive readBytes function?
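One related case is worth a sketch: if the stream ends before count bytes have arrived, a read returns 0 and the recursion above would never reach its termination case. The version below assumes a current FSharp.Core ("stream.AsyncRead" in place of the thread's "stream.ReadAsync"); the member name "ReadAsyncBytesOrLess" is made up for this example.

```fsharp
open System.IO

type System.IO.Stream with
    /// Reads `count` bytes, or fewer if the stream ends first.
    /// (Hypothetical member name, used only for this sketch.)
    member stream.ReadAsyncBytesOrLess(count : int) : Async<byte[]> =
        let buffer = Array.zeroCreate<byte> count
        let rec readBytes (index, remaining) =
            async {
                if remaining = 0 then
                    // Termination case: everything requested has been read.
                    return buffer
                else
                    let! read = stream.AsyncRead(buffer, index, remaining)
                    if read = 0 then
                        // End of stream: return only the bytes actually read.
                        return Array.sub buffer 0 index
                    else
                        return! readBytes (index + read, remaining - read) }
        readBytes (0, count)

// Usage with in-memory streams standing in for a socket.
let full =
    (new MemoryStream([| 1uy .. 5uy |])).ReadAsyncBytesOrLess(3)
    |> Async.RunSynchronously

let short =
    (new MemoryStream([| 1uy .. 5uy |])).ReadAsyncBytesOrLess(8)
    |> Async.RunSynchronously
```

Whether a short read should return a partial array or raise an error depends on the protocol; returning the partial array is just one choice.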

Interesting suggestion about the builder – I think I might need to get more into the language before I try, plus I need to find the time – got a game to make :)

Best regards,
Jacob

* Is that because yield only makes sense inside the seq expression, and putting it into a function makes it possible to return the function to a context (the sequence) where it no longer makes sense?
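On the footnote: yield is only accepted inside a sequence (or other computation) expression, so a plain helper function cannot contain it. A helper can, however, return a sequence expression of its own, and the caller can splice it in with yield!. A minimal sketch, with a made-up "chunksOf" function and no async involved:

```fsharp
// A recursive helper whose body is itself a sequence expression.
// `chunksOf` is a hypothetical name used only for illustration.
let rec chunksOf (size : int) (data : byte[]) : seq<byte[]> =
    seq {
        if data.Length > 0 then
            let n = min size data.Length
            // yield is legal here because we are inside seq { ... }.
            yield Array.sub data 0 n
            // yield! splices in the sequence produced by the recursive call.
            yield! chunksOf size (Array.sub data n (data.Length - n)) }

let result = chunksOf 2 [| 1uy .. 5uy |] |> List.ofSeq
```

This sidesteps the restriction on recursive bindings inside the comprehension by moving the recursion to an ordinary "let rec" outside it.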

By on 1/26/2008 7:19 AM ()

Hi

Here is an easier-to-read version:

type System.IO.Stream with
  /// Will return when count number of bytes have been read
  member stream.ReadAsyncBytes(count : int) =
    let buffer = Array.create count 0uy                  
    let rec readBytes(index, remaining) =
        async { match remaining with
                | 0 -> return buffer
                | _ -> let! read = stream.ReadAsync(buffer, index, remaining)
                       return! readBytes(index + read, remaining - read)}
    readBytes(0,count)

While trying to understand monads, I found this:

[link:www.haskell.org]

Thought it might help others that, like me, don't have experience with them.

Best regards,

Jacob

By on 1/27/2008 12:48 AM ()

I've been writing my chat server example to use workflows so that it doesn't block any threads. At first I tried, unsuccessfully, to combine async and seq workflows, which is when I found this post again. I now fully understand Don's comments about sequences being blocking. Here's my attempt at the server; any feedback is welcome:

[link:fofs.codeplex.com]

By on 4/20/2009 2:24 AM ()

Hi,

I've been working more on my problem. My current solution now looks like this:

member stream.ReadAsyncBytes(count : int) =
    async {
        let buffer = Array.create count 0uy
        let remaining = ref count
        let data =
            seq {
                while(!remaining > 0) do
                    let result =
                        async {
                            let! read = stream.ReadAsync(buffer, 0, !remaining)
                            do remaining := !remaining - read
                            return buffer.[0..read]
                        }
                    yield! Async.Run(result)
            }
        return data |> Seq.to_array
    }

which actually compiles! Unfortunately it fails my unit tests, as it doesn't return data from the stream...darn (and no more time this evening)

The changes are that I have wrapped the function in an async expression and used yield! (yield bang), which I found in Robert Pickering's excellent article at [link:www.infoq.com]

I am beginning to think that solution 1 might be prettier (in the sense that it's easier to understand, at least for old-time C#'ers, and a bit shorter).

I have noticed that other posts have nice coloring on their code snippets - where can I find the functionality?

Best regards,
Jacob

Edit:
Come to think of it, perhaps I don't need the int -> Async<byte array> type anymore, but rather something in this direction:

member stream.Read(count : int) =
    let buffer = Array.create count 0uy
    let remaining = ref count
    let data = seq {
        while(!remaining > 0) do
            let result = async {
                let! read = stream.ReadAsync(buffer, 0, !remaining)
                do remaining := !remaining - read
                return buffer.[0..read]
            }
            yield! Async.Run(result)
    }
    data

hmm....

By on 1/25/2008 2:13 PM ()