Here is a version using Async.FromContinuations.
However, this is NOT an answer to my problem, because it does not scale. The code may be useful to someone, so posting it here.
The reason this is not an answer is because System.Net.NetworkInformation.Ping does not scale: it appears to use one thread per Ping and quite a bit of memory (likely due to thread stack space). Attempting to ping an entire class-B network will run out of memory and use 100's of threads, whereas the (broken) code above using raw sockets (appears) to use only a few threads and a couple 100Mb -- though this may be incorrect, since that code does not completely work.
However, this is NOT an answer to my problem, because it does not scale. The code may be useful to someone, so posting it here.
The reason this is not an answer is because System.Net.NetworkInformation.Ping does not scale: it appears to use one thread per Ping and quite a bit of memory (likely due to thread stack space). Attempting to ping an entire class-B network will run out of memory and use 100's of threads, whereas the (broken) code above using raw sockets (appears) to use only a few threads and a couple 100Mb -- though this may be incorrect, since that code does not completely work.
type System.Net.NetworkInformation.Ping with
member this.AsyncPing (address:IPAddress) : Async =
let pingAsync =
Async.FromContinuations (fun (cont, econt, ccont) ->
let userToken = new obj()
let rec handler =
PingCompletedEventHandler (fun _ args ->
if userToken = args.UserState then
this.PingCompleted.RemoveHandler(handler)
if args.Cancelled then
ccont (new OperationCanceledException())
elif args.Error <> null then
econt args.Error
else
cont args.Reply)
this.PingCompleted.AddHandler(handler)
this.SendAsync(address, 1000, userToken)
)
async {
use! _holder = Async.OnCancel(fun _ -> this.SendAsyncCancel())
return! pingAsync
}
let AsyncPingTest() =
let pings =
seq {
for net in 0..255 do
for node in 0..255 do
let ip = IPAddress.Parse( sprintf "192.168.%d.%d" net node )
let ping = new Ping()
yield ping.AsyncPing( ip )
}
pings
|> Async.Parallel
|> Async.RunSynchronously
|> Seq.iter ( fun result ->
printfn "%A" result )Your AsyncPing will wait until it actually receives the data. Since it doesn't receive anything, it will expectedly wait forever. You can use :
This will raise a timeout exception if the data is not received soon enough.
Hope this helps
let result = let timeout = 3000 let comp = socket.AsyncReceiveFrom( inbuffer, 0, inbuffer.Length, SocketFlags.None, epr ) Async.RunSynchronously(comp, timeout)
This will raise a timeout exception if the data is not received soon enough.
Hope this helps
Your AsyncPing will wait until it actually receives the data. Since it doesn't receive anything, it will expectedly wait forever.
Good suggestion on using Async.RunSynchronously with a timeout, but I think that applies to the entire parallel operation: when running 1000's of pings at once, they would all need to complete within that time otherwise an exception would be thrown.
I'm actually looking for the reason my AsyncReceiveFrom (or the underlying BCL BeginReceiveFrom) does not honor the timeout, whereas a call to the synchronous BCL ReceiveFrom DOES honor the timeout. On the surface it appears to be a BCL bug, but more likely it's something in my code.
Still looking for help on this one...
I'm actually looking for the reason my AsyncReceiveFrom (or the underlying BCL BeginReceiveFrom) does not honor the timeout, whereas a call to the synchronous BCL ReceiveFrom DOES honor the timeout. On the surface it appears to be a BCL bug, but more likely it's something in my code.
The docs clearly state that the timeout only applies to the sync versions:
[link:msdn.microsoft.com]
The docs clearly state that the timeout only applies to the sync versions
That's a shame... complicates the code quite a bit. Will post if I find a graceful solution.
After some thought, came up with the following. This code adds an AsyncReceiveEx member to Socket, which includes a timeout value. It hides the details of the watchdog timer inside the receive method... very tidy and self contained. Now THIS is what I was looking for!
See the complete async ping example, further below.
Not sure if the locks are necessary, but better safe than sorry...
Here is a complete Ping example. To avoid running out of source ports and to prevent getting too many replies at once, it scans one class-c subnet at a time.
See the complete async ping example, further below.
Not sure if the locks are necessary, but better safe than sorry...
type System.Net.Sockets.Socket with
member this.AsyncSend( buffer, offset, size, socketFlags, err ) =
Async.FromBeginEnd( buffer, offset, size, socketFlags, err,
this.BeginSend,
this.EndSend,
this.Close )
member this.AsyncReceive( buffer, offset, size, socketFlags, err ) =
Async.FromBeginEnd( buffer, offset, size, socketFlags, err,
this.BeginReceive,
this.EndReceive,
this.Close )
member this.AsyncReceiveEx( buffer, offset, size, socketFlags, err, (timeoutMS:int) ) =
async {
let timedOut = ref false
let completed = ref false
let timer = new System.Timers.Timer( double(timeoutMS), AutoReset=false )
timer.Elapsed.Add( fun _ ->
lock timedOut (fun () ->
timedOut := true
if not !completed
then this.Close()
)
)
let complete() =
lock timedOut (fun () ->
timer.Stop()
timer.Dispose()
completed := true
)
return! Async.FromBeginEnd( buffer, offset, size, socketFlags, err,
(fun (b,o,s,sf,e,st,uo) ->
let result = this.BeginReceive(b,o,s,sf,e,st,uo)
timer.Start()
result
),
(fun result ->
complete()
if !timedOut
then err := SocketError.TimedOut; 0
else this.EndReceive( result, err )
),
(fun () ->
complete()
this.Close()
)
)
}
Here is a complete Ping example. To avoid running out of source ports and to prevent getting too many replies at once, it scans one class-c subnet at a time.
module Ping
open System
open System.Net
open System.Net.Sockets
open System.Threading
//---- ICMP Packet Classes
type IcmpMessage (t : byte) =
let mutable m_type = t
let mutable m_code = 0uy
let mutable m_checksum = 0us
member this.Type
with get() = m_type
member this.Code
with get() = m_code
member this.Checksum = m_checksum
abstract Bytes : byte array
default this.Bytes
with get() =
[|
m_type
m_code
byte(m_checksum)
byte(m_checksum >>> 8)
|]
member this.GetChecksum() =
let mutable sum = 0ul
let bytes = this.Bytes
let mutable i = 0
// Sum up uint16s
while i < bytes.Length - 1 do
sum <- sum + uint32(BitConverter.ToUInt16( bytes, i ))
i <- i + 2
// Add in last byte, if an odd size buffer
if i <> bytes.Length then
sum <- sum + uint32(bytes.[i])
// Shuffle the bits
sum <- (sum >>> 16) + (sum &&& 0xFFFFul)
sum <- sum + (sum >>> 16)
sum <- ~~~sum
uint16(sum)
member this.UpdateChecksum() =
m_checksum <- this.GetChecksum()
type InformationMessage (t : byte) =
inherit IcmpMessage(t)
let mutable m_identifier = 0us
let mutable m_sequenceNumber = 0us
member this.Identifier = m_identifier
member this.SequenceNumber = m_sequenceNumber
override this.Bytes
with get() =
Array.append (base.Bytes)
[|
byte(m_identifier)
byte(m_identifier >>> 8)
byte(m_sequenceNumber)
byte(m_sequenceNumber >>> 8)
|]
type EchoMessage() =
inherit InformationMessage( 8uy )
let mutable m_data = Array.create 32 32uy
do base.UpdateChecksum()
member this.Data
with get() = m_data
and set(d) = m_data <- d
this.UpdateChecksum()
override this.Bytes
with get() =
Array.append (base.Bytes)
(this.Data)
//---- Synchronous Ping
let Ping (host : IPAddress, timeout : int ) =
let mutable ep = new IPEndPoint( host, 0 )
let socket = new Socket( AddressFamily.InterNetwork, SocketType.Raw, ProtocolType.Icmp )
socket.SetSocketOption( SocketOptionLevel.Socket, SocketOptionName.SendTimeout, timeout )
socket.SetSocketOption( SocketOptionLevel.Socket, SocketOptionName.ReceiveTimeout, timeout )
let packet = EchoMessage()
let mutable buffer = packet.Bytes
try
if socket.SendTo( buffer, ep ) <= 0 then
raise (SocketException())
buffer <- Array.create (buffer.Length + 20) 0uy
let mutable epr = ep :> EndPoint
if socket.ReceiveFrom( buffer, &epr ) <= 0 then
raise (SocketException())
finally
socket.Close()
buffer
//---- Entensions to the F# Async class to allow up to 5 paramters (not just 3)
type Async with
static member FromBeginEnd(arg1,arg2,arg3,arg4,beginAction,endAction,?cancelAction): Async<'T> =
Async.FromBeginEnd((fun (iar,state) -> beginAction(arg1,arg2,arg3,arg4,iar,state)), endAction, ?cancelAction=cancelAction)
static member FromBeginEnd(arg1,arg2,arg3,arg4,arg5,beginAction,endAction,?cancelAction): Async<'T> =
Async.FromBeginEnd((fun (iar,state) -> beginAction(arg1,arg2,arg3,arg4,arg5,iar,state)), endAction, ?cancelAction=cancelAction)
//---- Extensions to the Socket class to provide async SendTo and ReceiveFrom
type System.Net.Sockets.Socket with
member this.AsyncSend( buffer, offset, size, socketFlags, err ) =
Async.FromBeginEnd( buffer, offset, size, socketFlags, err,
this.BeginSend,
this.EndSend,
this.Close )
member this.AsyncReceive( buffer, offset, size, socketFlags, err ) =
Async.FromBeginEnd( buffer, offset, size, socketFlags, err,
this.BeginReceive,
this.EndReceive,
this.Close )
member this.AsyncReceiveEx( buffer, offset, size, socketFlags, err, (timeoutMS:int) ) =
async {
let timedOut = ref false
let completed = ref false
let timer = new System.Timers.Timer( double(timeoutMS), AutoReset=false )
timer.Elapsed.Add( fun _ ->
lock timedOut (fun () ->
timedOut := true
if not !completed
then this.Close()
)
)
let complete() =
lock timedOut (fun () ->
timer.Stop()
timer.Dispose()
completed := true
)
return! Async.FromBeginEnd( buffer, offset, size, socketFlags, err,
(fun (b,o,s,sf,e,st,uo) ->
let result = this.BeginReceive(b,o,s,sf,e,st,uo)
timer.Start()
result
),
(fun result ->
complete()
if !timedOut
then err := SocketError.TimedOut; 0
else this.EndReceive( result, err )
),
(fun () ->
complete()
this.Close()
)
)
}
//---- Asynchronous Ping
let AsyncPing (ip : IPAddress, timeout : int ) =
async {
use socket = new Socket( AddressFamily.InterNetwork, SocketType.Raw, ProtocolType.Icmp )
socket.Connect( IPEndPoint( ip, 0 ) )
let pingTime = System.Diagnostics.Stopwatch()
let packet = EchoMessage()
let outbuffer = packet.Bytes
let err = ref (SocketError())
let isAlive = ref false
try
pingTime.Start()
let! result = socket.AsyncSend( outbuffer, 0, outbuffer.Length, SocketFlags.None, err )
pingTime.Stop()
if result <= 0 then
raise (SocketException(int(!err)))
let inbuffer = Array.create (outbuffer.Length + 256) 0uy
pingTime.Start()
let! reply = socket.AsyncReceiveEx( inbuffer, 0, inbuffer.Length, SocketFlags.None, err, timeout )
pingTime.Stop()
if result <= 0 && not (!err = SocketError.TimedOut) then
raise (SocketException(int(!err)))
isAlive := not (!err = SocketError.TimedOut)
&& inbuffer.[25] = 0uy // Type 0 = echo reply (redundent? necessary?)
&& inbuffer.[26] = 0uy // Code 0 = echo reply (redundent? necessary?)
finally
socket.Close()
return (ip, pingTime.Elapsed, !isAlive )
}
let main() =
let pings net =
seq {
for node in 0..255 do
let ip = IPAddress.Parse( sprintf "192.168.%d.%d" net node )
yield Ping.AsyncPing( ip, 1000 )
}
for net in 0..255 do
pings net
|> Async.Parallel
|> Async.RunSynchronously
|> Seq.filter ( fun (_,_,alive) -> alive )
|> Seq.iter ( fun (ip, time, alive) ->
printfn "%A %dms" ip time.Milliseconds)
main()
System.Console.ReadKey() |> ignore
Nice solution – but I’m wondering why you are switching from connectionless UDP to connection full TCP only in your latest example?
Topic tags
- f# × 3660
- compiler × 263
- functional × 199
- c# × 119
- websharper × 114
- classes × 96
- web × 94
- book × 84
- .net × 82
- async × 72
- parallel × 43
- server × 43
- parsing × 41
- testing × 41
- asynchronous × 30
- monad × 28
- ocaml × 26
- tutorial × 26
- haskell × 25
- workflows × 22
- html × 21
- linq × 21
- introduction × 19
- silverlight × 19
- wpf × 19
- fpish × 18
- collections × 14
- pipeline × 14
- templates × 12
- monads × 11
- opinion × 10
- reactive × 10
- plugin × 9
- scheme × 9
- sitelets × 9
- solid × 9
- basics × 8
- concurrent × 8
- deployment × 8
- how-to × 8
- python × 8
- complexity × 7
- javascript × 6
- jquery × 6
- lisp × 6
- real-world × 6
- workshop × 6
- xaml × 6
- conference × 5
- dsl × 5
- java × 5
- metaprogramming × 5
- ml × 5
- scala × 5
- visual studio × 5
- formlets × 4
- fsi × 4
- lift × 4
- sql × 4
- teaching × 4
- alt.net × 3
- aml × 3
- enhancement × 3
- list × 3
- reflection × 3
- blog × 2
- compilation × 2
- computation expressions × 2
- corporate × 2
- courses × 2
- cufp × 2
- enterprise × 2
- entity framework × 2
- erlang × 2
- events × 2
- f# interactive × 2
- fsc × 2
- google maps × 2
- html5 × 2
- http × 2
- interactive × 2
- interface × 2
- iphone × 2
- iteratee × 2
- jobs × 2
- keynote × 2
- mvc × 2
- numeric × 2
- obfuscation × 2
- oop × 2
- packaging × 2
- pattern matching × 2
- pipelines × 2
- rx × 2
- script × 2
- seq × 2
- sockets × 2
- stm × 2
- tcp × 2
- trie × 2
- type × 2
- type provider × 2
- xna × 2
- zh × 2
- .net interop × 1
- 2012 × 1
- abstract class × 1
- accumulator × 1
- active pattern × 1
- addin × 1
- agents × 1
- agile × 1
- android × 1
- anonymous object × 1
- appcelerator × 1
- architecture × 1
- array × 1
- arrays × 1
- asp.net 4.5 × 1
- asp.net mvc × 1
- asp.net mvc 4 × 1
- asp.net web api × 1
- aspnet × 1
- ast × 1
- b-tree × 1
- bistro × 1
- bug × 1
- camtasia studio × 1
- canvas × 1
- class × 1
- client × 1
- clojure × 1
- closures × 1
- cloud × 1
- cms × 1
- coding diacritics × 1
- color highlighting × 1
- combinator × 1
- confirm × 1
- constructor × 1
- continuation-passing style × 1
- coords × 1
- coursera × 1
- csla × 1
- css × 1
- data × 1
- database × 1
- declarative × 1
- delete × 1
- dhtmlx × 1
- discriminated union × 1
- distance × 1
- docs × 1
- documentation × 1
- dol × 1
- domain × 1
- du × 1
- duf-101 × 1
- eclipse × 1
- edsl × 1
- em algorithm × 1
- emacs × 1
- emotion × 1
- error × 1
- etw × 1
- euclidean × 1
- event × 1
- example × 1
- ext js × 1
- extension methods × 1
- extra × 1
- facet pattern × 1
- fantomas × 1
- fear × 1
- float × 1
- fp × 1
- frank × 1
- fsdoc × 1
- fsharp.core × 1
- fsharp.powerpack × 1
- fsharpx × 1
- function × 1
- functional style × 1
- gc × 1
- generic × 1
- geometry × 1
- getlastwin32error × 1
- google × 1
- group × 1
- hash × 1
- history × 1
- hosting × 1
- httpcontext × 1
- https × 1
- hubfs × 1
- ie 8 × 1
- if-doc × 1
- inheritance × 1
- installer × 1
- interpreter × 1
- io × 1
- ios × 1
- ipad × 1
- kendo × 1
- learning × 1
- licensing × 1
- macro × 1
- macros × 1
- maps × 1
- markup × 1
- marshal × 1
- math × 1
- metro style × 1
- micro orm × 1
- minimum-requirements × 1
- multidimensional × 1
- multithreading × 1
- mysql × 1
- mysqlclient × 1
- nancy × 1
- nested × 1
- nested loops × 1
- node × 1
- object relation mapper × 1
- object-oriented × 1
- offline × 1
- option × 1
- orm × 1
- osx × 1
- owin × 1
- paper × 1
- parameter × 1
- performance × 1
- persistent data structure × 1
- phonegap × 1
- pola × 1
- powerpack × 1
- prefix tree × 1
- principle of least authority × 1
- programming × 1
- projekt_feladat × 1
- protected × 1
- provider × 1
- ptvs × 1
- quant × 1
- quotations × 1
- range × 1
- raphael × 1
- razor × 1
- rc × 1
- real-time × 1
- reference × 1
- restful × 1
- round table × 1
- runtime × 1
- scriptcs × 1
- scripting × 1
- service × 1
- session-state × 1
- sitelet × 1
- stickynotes × 1
- stress × 1
- strong name × 1
- structures × 1
- tdd × 1
- template × 1
- tracing × 1
- tsunamiide × 1
- type inference × 1
- type providers × 1
- upload × 1
- vb × 1
- vb.net × 1
- vector × 1
- visual f# × 1
- visual studio 11 × 1
- visual studio shell × 1
- visualstudio × 1
- web api × 1
- webapi × 1
- windows 8 × 1
- windows-phone × 1
- winrt × 1
- xml × 1
|
Copyright (c) 2011-2012 IntelliFactory. All rights reserved. Home | Products | Consulting | Trainings | Blogs | Jobs | Contact Us |
Built with WebSharper |
Writing an asynchronous Ping using Raw Sockets in F#, to enable parallel requests using as few threads as possible. Not using "System.Net.NetworkInformation.Ping", because it appears to allocate one thread per request: ping-sweeping a class-B network creates 100's of threads and eventually results in an out-of-memory error. Am also interested in using F# async workflows.
The synchronous version below correctly times out when the target host does not exist/respond, but the asynchronous version hangs. Both work when the host does respond. Not sure if this is a .NET issue, or an F# one...
Any ideas?
(note: the process must run as Admin to allow Raw Socket access)
This throws a timeout:
However, this hangs:
let result = Ping.AsyncPing ( IPAddress.Parse( "192.168.33.22" ), 1000 ) |> Async.RunSynchronouslyHere's the code...
module Ping open System open System.Net open System.Net.Sockets open System.Threading //---- ICMP Packet Classes type IcmpMessage (t : byte) = let mutable m_type = t let mutable m_code = 0uy let mutable m_checksum = 0us member this.Type with get() = m_type member this.Code with get() = m_code member this.Checksum = m_checksum abstract Bytes : byte array default this.Bytes with get() = [| m_type m_code byte(m_checksum) byte(m_checksum >>> 8) |] member this.GetChecksum() = let mutable sum = 0ul let bytes = this.Bytes let mutable i = 0 // Sum up uint16s while i < bytes.Length - 1 do sum <- sum + uint32(BitConverter.ToUInt16( bytes, i )) i <- i + 2 // Add in last byte, if an odd size buffer if i <> bytes.Length then sum <- sum + uint32(bytes.[ i ]) // Shuffle the bits sum <- (sum >>> 16) + (sum &&& 0xFFFFul) sum <- sum + (sum >>> 16) sum <- ~~~sum uint16(sum) member this.UpdateChecksum() = m_checksum <- this.GetChecksum() type InformationMessage (t : byte) = inherit IcmpMessage(t) let mutable m_identifier = 0us let mutable m_sequenceNumber = 0us member this.Identifier = m_identifier member this.SequenceNumber = m_sequenceNumber override this.Bytes with get() = Array.append (base.Bytes) [| byte(m_identifier) byte(m_identifier >>> 8) byte(m_sequenceNumber) byte(m_sequenceNumber >>> 8) |] type EchoMessage() = inherit InformationMessage( 8uy ) let mutable m_data = Array.create 32 32uy do base.UpdateChecksum() member this.Data with get() = m_data and set(d) = m_data <- d this.UpdateChecksum() override this.Bytes with get() = Array.append (base.Bytes) (this.Data) //---- Synchronous Ping let Ping (host : IPAddress, timeout : int ) = let mutable ep = new IPEndPoint( host, 0 ) let socket = new Socket( AddressFamily.InterNetwork, SocketType.Raw, ProtocolType.Icmp ) socket.SetSocketOption( SocketOptionLevel.Socket, SocketOptionName.SendTimeout, timeout ) socket.SetSocketOption( SocketOptionLevel.Socket, SocketOptionName.ReceiveTimeout, timeout ) let packet = EchoMessage() let mutable buffer = packet.Bytes try if socket.SendTo( buffer, ep ) <= 0 then raise (SocketException()) buffer <- Array.create (buffer.Length + 20) 0uy let mutable epr = ep :> EndPoint if socket.ReceiveFrom( buffer, &epr ) <= 0 then raise (SocketException()) finally socket.Close() buffer //---- Entensions to the F# Async class to allow up to 5 paramters (not just 3) type Async with static member FromBeginEnd(arg1,arg2,arg3,arg4,beginAction,endAction,?cancelAction): Async<'T> = Async.FromBeginEnd((fun (iar,state) -> beginAction(arg1,arg2,arg3,arg4,iar,state)), endAction, ?cancelAction=cancelAction) static member FromBeginEnd(arg1,arg2,arg3,arg4,arg5,beginAction,endAction,?cancelAction): Async<'T> = Async.FromBeginEnd((fun (iar,state) -> beginAction(arg1,arg2,arg3,arg4,arg5,iar,state)), endAction, ?cancelAction=cancelAction) //---- Extensions to the Socket class to provide async SendTo and ReceiveFrom type System.Net.Sockets.Socket with member this.AsyncSendTo( buffer, offset, size, socketFlags, remoteEP ) = Async.FromBeginEnd( buffer, offset, size, socketFlags, remoteEP, this.BeginSendTo, this.EndSendTo ) member this.AsyncReceiveFrom( buffer, offset, size, socketFlags, remoteEP ) = Async.FromBeginEnd( buffer, offset, size, socketFlags, remoteEP, this.BeginReceiveFrom, (fun asyncResult -> this.EndReceiveFrom(asyncResult, remoteEP) ) ) //---- Asynchronous Ping let AsyncPing (host : IPAddress, timeout : int ) = async { let ep = IPEndPoint( host, 0 ) use socket = new Socket( AddressFamily.InterNetwork, SocketType.Raw, ProtocolType.Icmp ) socket.SetSocketOption( SocketOptionLevel.Socket, SocketOptionName.SendTimeout, timeout ) socket.SetSocketOption( SocketOptionLevel.Socket, SocketOptionName.ReceiveTimeout, timeout ) let packet = EchoMessage() let outbuffer = packet.Bytes try let! result = socket.AsyncSendTo( outbuffer, 0, outbuffer.Length, SocketFlags.None, ep ) if result <= 0 then raise (SocketException()) let epr = ref (ep :> EndPoint) let inbuffer = Array.create (outbuffer.Length + 256) 0uy let! result = socket.AsyncReceiveFrom( inbuffer, 0, inbuffer.Length, SocketFlags.None, epr ) if result <= 0 then raise (SocketException()) return inbuffer finally socket.Close() }