http://www.c-sharpcorner.com/UploadFile/logisimo/107252009003250AM/1.aspx
The C# Asynchronous Programming Model
By Dave Richter July 27, 2009
The Asynchronous Programming Model is a single pattern that can be used to asynchronously execute both compute-bound and I/O operations. When used with the CLR's thread pool, asynchronous operations can take full advantage of all of the CPUs on a machine.
Preface
Input/Output operations are normally slower that other processing operations. Synchronous programming is when threads need to wait until they can execute until an I/O routine has completed. When threads can continue to execute without waiting for an operation to complete, we say that threads can perform asynchronous I/O. Stated loosely, asynchronous programming is allowing some portions of code to be executed on separate threads. This is referred to as the Asynchronous Programming Model (APM). Throughout the .NET Framework, many classes support using the APM by providing BeginXXX and EndXXX versions of methods. For example, the FileStream
class that is defined in the System.IO
namespace has a Read
method that reads data from a stream. To support the APM model, it also supports BeginRead
and EndRead
methods. This pattern of using BeginXXX and EndReadXXX methods allows you to execute methods asynchronously. The real-world programming strength of the APM is that is a single pattern can be used to asynchronously execute both compute-bound and I/O bound operations. This article purposes to describe the three "rendez-vous" techniques that comprise the APM. This article will start by examining how to use the APM to perform an asynchronous compute-bound operation. I will then describe the three styles of programming with the APM to deal with handling the end of the call in an asynchronous call: wait-until done, polling, and callback. The natural following is then the Thread Pool. Obviously anyone who has ever programmed in .NET is familiar with the ThreadPool
class. This class, when used in combination with the APM, is the way to create scalable and high-performance applications that do take advantage of the dual-core processor technology, the hyper-threading technology, and multiple processors. The reader will find that one of the focal points behind this entire article is the use of the IAsyncResult
interface to perform asynchronous calls.
The code given below is an example of the pattern of using BeginXXX and EndXXX methods to allow you to execute code asynchronously:
using System;
using System.IO;
using System.Threading;
public sealed class Program
{
public static void Main()
{
byte[] buffer = new byte[100];
string filename = String.Concat(Environment.SystemDirectory, "\\ntdll.dll");
FileStream fs = new FileStream(filename, FileMode.Open, FileAccess.Read,
FileShare.Read, 1024, FileOptions.Asynchronous);
IAsyncResult result = fs.BeginRead(buffer, 0, buffer.Length, null, null);
int numBytes = fs.EndRead(result);
fs.Close();
Console.WriteLine("Read {0} Bytes:", numBytes);
Console.WriteLine(BitConverter.ToString(buffer));
}
}
Here is the output:
Read 100 Bytes:
4D-5A-90-00-03-00-00-00-04-00-00-00-FF-FF-00-00-B8-00-00-00-00-00-00-00-40-00-00
-00-00-00-00-00-00-00-00-00-00-00-00-00-00-00-00-00-00-00-00-00-00-00-00-00-00-0
0-00-00-00-00-00-00-D0-00-00-00-0E-1F-BA-0E-00-B4-09-CD-21-B8-01-4C-CD-21-54-68-
69-73-20-70-72-6F-67-72-61-6D-20-63-61-6E-6E-6F-74-20-62-65
To explain, the purpose of the example code above was to read some bytes (100) from a file stream asynchronously using the APM. First you have to construct a FileStream
object by calling one its constructors that accepts the System.IO.FileOptions
argument. For this argument, you have to pass in the FileOptions.Asynchronous
flag: this tells the FileStream
object that you intend to perform asynchronous read and write operations against the file. Note that a flag is simply just an algorithm that indicates the status of an operation. In this case, the FileOptions.Asynchronous
flag is set to a binary one. To synchronously read bytes from a FileStream
, you'd call its Read
method, which is prototyped as follows:
public Int32 Read(Byte[] array, Int32 offset, Int32 count)
The Read
method accepts a reference to a Byte[]
that will have its bytes filled with bytes from the file. The count
argument indicates the number of bytes that you need to read. The bytes will be placed in an array (buffer) between offset and (offset + count - 1). Note because we are dealing with a buffer, it will have a length. The Read
method returns the number of bytes actually read from the file. When you call the method , the read occurs synchronously. That is, the method does not return until the requested bytes have been read into the array. This is not a sound practice, as the timing of all I/O operations is unpredictable, and while waiting for the I/O to complete, the calling thread is suspended. A suspended thread is not capable of doing any work and is wasting resources. To asynchronously read bytes from a file, you'd call FileStream
's BeginRead
method:
IAsyncResult BeginRead(Byte[] array, Int32 offset, Int32 numBytes,AsyncCallback userCallback, object stateObject)
The Read
method looks similar to the BeginRead
method. The differences include that the BeginRead
method returns an IAsyncResult
instead of the number of bytes read and that the two parameters are added to the method signature to support the APM. These two parameters will be explained later when we look at the Callback style of handling APM.
The Wait-Until-Done Model
The Wait-Until-Done model allows you to start the asynchronous call and perform other work. Once the other work is done, you can attempt to end the call and it will block until the asynchronous call is complete. The code used above provides an example that nears this model. Let's look at this code closely:
using System;
using System.IO;
using System.Threading;
public sealed class Program
{
public static void Main()
{
byte[] buffer = new byte[100];
string filename = String.Concat(Environment.SystemDirectory, "\\ntdll.dll");
FileStream fs = new FileStream(filename, FileMode.Open, FileAccess.Read,
FileShare.Read, 1024, FileOptions.Asynchronous);
// make the asynchronous call
fs.Read(buffer, 0, buffer.Length);
IAsyncResult result = fs.BeginRead(buffer, 0, buffer.Length, null, null);
// do some work here while you wait
//Calling EndRead will block until the Async work is complete
int numBytes = fs.EndRead(result);
// don't forget to close the stream
fs.Close();
Console.WriteLine("Read {0} Bytes:", numBytes);
Console.WriteLine(BitConverter.ToString(buffer));
}
}
The output is:
Read 100 Bytes:
20-72-75-6E-20-69-6E-20-44-4F-53-20-6D-6F-64-65-2E-0D-0D-0A-24-00-00-00-00-00-00
-00-34-CB-99-AF-70-AA-F7-FC-70-AA-F7-FC-70-AA-F7-FC-57-6C-8D-FC-71-AA-F7-FC-57-6
C-8A-FC-31-AA-F7-FC-57-6C-99-FC-5F-AA-F7-FC-57-6C-9A-FC-75-AB-F7-FC-57-6C-8B-FC-
71-AA-F7-FC-57-6C-8F-FC-71-AA-F7-FC-52-69-63-68-70-AA-F7-FC
It is nonsense to call a BeginXxx method and then immediately call an EndXxx method, because the calling thread just goes to sleep waiting for the operations to complete. But if you put some between the calls to BeginRead
and EndRead
, we see some of the APM's value because this other code would execute as the bytes are being read from the file:
using System;
using System.IO;
using System.Threading;
public static class Program
{
public static void Main()
{
//ReadMultipleFiles(@"C:\Windows\system32\autoexec.NT", @"c:\Point.cs");
// Open the file indicating asynchronous I/O
FileStream fs = new FileStream(@"C:\windows\system32\autoexec.NT", FileMode.Open,
FileAccess.Read, FileShare.Read, 1024,
FileOptions.Asynchronous);
Byte[] data = new Byte[100];
// Initiate an asynchronous read operation against the FileStream
IAsyncResult ar = fs.BeginRead(data, 0, data.Length, null, null);
// Executing some other code here would be useful...
// Suspend this thread until the asynchronous
// operation completes and get the result
Int32 bytesRead = fs.EndRead(ar);
// No other operations to do, close the file
fs.Close();
// Now, it is OK to access the byte array and show the result.
Console.WriteLine("Number of bytes read={0}", bytesRead);
Console.WriteLine(BitConverter.ToString(data, 0, bytesRead));
}
private static void ReadMultipleFiles(params String[] pathnames)
{
AsyncStreamRead[] asrs = new AsyncStreamRead[pathnames.Length];
for (Int32 n = 0; n < pathnames.Length; n++)
{
// Open the file indicating asynchronous I/O
Stream stream = new FileStream(pathnames[n], FileMode.Open,
FileAccess.Read, FileShare.Read, 1024,
FileOptions.Asynchronous);
// Initiate an asynchronous read operation against the Stream
asrs[n] = new AsyncStreamRead(stream, 100);
}
// All streams have been opened and all read requests have been
// queued; they are all executing concurrently!
// Now, let's get and display the results
for (Int32 n = 0; n < asrs.Length; n++)
{
Byte[] bytesRead = asrs[n].EndRead();
// Now, it is OK to access the byte array and show the result.
Console.WriteLine("Number of bytes read={0}", bytesRead.Length);
Console.WriteLine(BitConverter.ToString(bytesRead));
}
}
private sealed class AsyncStreamRead
{
private Stream m_stream;
private IAsyncResult m_ar;
private Byte[] m_data;
public AsyncStreamRead(Stream stream, Int32 numBytes)
{
m_stream = stream;
m_data = new Byte[numBytes];
// Initiate an asynchronous read operation against the Stream
m_ar = stream.BeginRead(m_data, 0, numBytes, null, null);
}
public Byte[] EndRead()
{
// Suspend this thread until the asynchronous
// operation completes and get the result
Int32 numBytesRead = m_stream.EndRead(m_ar);
// No other operations to do, close the stream
m_stream.Close();
// Resize the array to save space
Array.Resize(ref m_data, numBytesRead);
// Return the bytes
return m_data;
}
}
}
Here is the output:
Number of bytes read=100
40-65-63-68-6F-20-6F-66-66-0D-0A-0D-0A-52-45-4D-20-41-55-54-4F-45-58-45-43-2E-42
-41-54-20-69-73-20-6E-6F-74-20-75-73-65-64-20-74-6F-20-69-6E-69-74-69-61-6C-69-7
A-65-20-74-68-65-20-4D-53-2D-44-4F-53-20-65-6E-76-69-72-6F-6E-6D-65-6E-74-2E-0D-
0A-52-45-4D-20-41-55-54-4F-45-58-45-43-2E-4E-54-20-69-73-20
The Polling Model
The polling method is similar, with the exception that code will poll the IAsyncResult
to see whether it has completed. Below are two examples: one basic sample and a more sophisticated sample. The thing to note and keep in mind is that by calling the IsCompleted
property on the IAsyncResult
object returned by the BeginRead
, we can continue to do work as necessary until the operation is complete:
using System;
using System.IO;
using System.Threading;
public sealed class Program
{
public static void Main()
{
byte[] buffer = new byte[100];
string filename = String.Concat(Environment.SystemDirectory, "\\ntdll.dll");
FileStream fs = new FileStream(filename, FileMode.Open, FileAccess.Read,
FileShare.Read, 1024, FileOptions.Asynchronous);
IAsyncResult result = fs.BeginRead(buffer, 0, buffer.Length, null, null);
while (!result.IsCompleted)
{
// do some work here if the call isn't completed
// you know, execute a code block or something
Thread.Sleep(100);
}
int numBytes = fs.EndRead(result);
fs.Close();
Console.WriteLine("Read {0} Bytes:", numBytes);
Console.WriteLine(BitConverter.ToString(buffer));
}
}
The APM's Method Callback Rendezvous Technique
The Callback model requires that we specify a method to callback on and include any state that we need in the callback method to complete the call. Primarily you queue up an asynchronous I/O request, and then your thread continues doing whatever it wants to do. When the I/O request completes, Windows queues up a work item in the CLR's thread pool. Eventually, a thread pool thread will dequeue the work item and call some method you have written; this is how you know that asynchronous I/O operation has completed. Now, inside that callback method, you first call the EndXxx method to obtain the result of the asynchronous operation, and then the method is free to continue processing the result. When the method returns, the thread pool thread goes back into the pool ready to service another queued work item (or it waits until one shows up). Having said that, let's review the prototype for FileStream
's BeginRead
method:
IAsyncResult BeginRead(Byte[] array, Int32 offset, Int32 numBytes, AsyncCallback userCallback, Object stateObject)
Like BeginRead
, every BeginXxx method's last two parameters are the same: a System.AsyncCallback
and an Object
. AsyncCallback
is a delegate type defined as follows:
delegate void AsyncCallback(IAsyncResult ar);
This delegate indicates the signature required by the callback method that you must implement. Here is example code that was referenced from Jeffrey Richter's book "The CLR via C#":
// (code has been slightly revised)
using System;
using System.IO;
using System.Threading;
public static class Program
{
// The array is static so it can be accessed by Main and ReadIsDone
private static Byte[] s_data = new Byte[100];
public static void Main()
{
ReadMultipleFiles(@"C:\Windows\System32\config.NT", @"C:\point.cs");
APMCallbackUsingAnonymousMethod();
// Show the ID of the thread executing Main
Console.WriteLine("Main thread ID={0}",
Thread.CurrentThread.ManagedThreadId);
//ReadMultipleFiles(@"C:\Windows\System32\Config.NT", @"c:\Point.cs");
// Open the file indicating asynchronous I/O
FileStream fs = new FileStream(@"C:\Windows\System32\config.NT", FileMode.Open,
FileAccess.Read, FileShare.Read, 1024,
FileOptions.Asynchronous);
// Initiate an asynchronous read operation against the FileStream
// Pass the FileStream (fs) to the callback method (ReadIsDone)
fs.BeginRead(s_data, 0, s_data.Length, ReadIsDone, fs);
// Executing some other code here would be useful...
// For this demo, I'll just suspend the primary thread
Console.ReadLine();
}
private static void ReadIsDone(IAsyncResult ar)
{
// Show the ID of the thread executing ReadIsDone
Console.WriteLine("ReadIsDone thread ID={0}",
Thread.CurrentThread.ManagedThreadId);
// Extract the FileStream (state) out of the IAsyncResult object
FileStream fs = (FileStream)ar.AsyncState;
// Get the result
Int32 bytesRead = fs.EndRead(ar);
// No other operations to do, close the file
fs.Close();
// Now, it is OK to access the byte array and show the result.
Console.WriteLine("Number of bytes read={0}", bytesRead);
Console.WriteLine(BitConverter.ToString(s_data, 0, bytesRead));
}
private static void APMCallbackUsingAnonymousMethod()
{
// Show the ID of the thread executing Main
Console.WriteLine("Main thread ID={0}",
Thread.CurrentThread.ManagedThreadId);
// Open the file indicating asynchronous I/O
FileStream fs = new FileStream(@"C:\Windows\System32\config.NT", FileMode.Open,
FileAccess.Read, FileShare.Read, 1024,
FileOptions.Asynchronous);
Byte[] data = new Byte[100];
// Initiate an asynchronous read operation against the FileStream
// Pass the FileStream (fs) to the callback method (ReadIsDone)
fs.BeginRead(data, 0, data.Length,
delegate(IAsyncResult ar)
{
// Show the ID of the thread executing ReadIsDone
Console.WriteLine("ReadIsDone thread ID={0}",
Thread.CurrentThread.ManagedThreadId);
// Get the result
Int32 bytesRead = fs.EndRead(ar);
// No other operations to do, close the file
fs.Close();
// Now, it is OK to access the byte array and show the result.
Console.WriteLine("Number of bytes read={0}", bytesRead);
Console.WriteLine(BitConverter.ToString(data, 0, bytesRead));
}, null);
// Executing some other code here would be useful...
// For this demo, I'll just suspend the primary thread
Console.ReadLine();
}
private static void ReadMultipleFiles(params String[] pathnames)
{
for (Int32 n = 0; n < pathnames.Length; n++)
{
// Open the file indicating asynchronous I/O
Stream stream = new FileStream(pathnames[n], FileMode.Open,
FileAccess.Read, FileShare.Read, 1024,
FileOptions.Asynchronous);
// Initiate an asynchronous read operation against the Stream
new AsyncStreamRead(stream, 100,
delegate(Byte[] data)
{
// Process the data.
Console.WriteLine("Number of bytes read={0}", data.Length);
Console.WriteLine(BitConverter.ToString(data));
});
}
// All streams have been opened and all read requests have been
// queued; they are all executing concurrently and they will be
// processed as they complete!
// The primary thread could do other stuff here if it wants to...
// For this demo, I'll just suspend the primary thread
Console.ReadLine();
}
private delegate void StreamBytesRead(Byte[] streamData);
private sealed class AsyncStreamRead
{
private Stream m_stream;
private Byte[] m_data;
StreamBytesRead m_callback;
public AsyncStreamRead(Stream stream, Int32 numBytes,
StreamBytesRead callback)
{
m_stream = stream;
m_data = new Byte[numBytes];
m_callback = callback;
// Initiate an asynchronous read operation against the Stream
stream.BeginRead(m_data, 0, numBytes, ReadIsDone, null);
}
// Called when IO operation completes
private void ReadIsDone(IAsyncResult ar)
{
Int32 numBytesRead = m_stream.EndRead(ar);
// No other operations to do, close the stream
m_stream.Close();
// Resize the array to save space
Array.Resize(ref m_data, numBytesRead);
// Call the application's callback method
m_callback(m_data);
}
}
}
The output is:
Number of bytes read=100
52-45-4D-20-57-69-6E-64-6F-77-73-20-4D-53-2D-44-4F-53-20-53-74-61-72-74-75-70-20
-46-69-6C-65-0D-0A-52-45-4D-0D-0A-52-45-4D-20-43-4F-4E-46-49-47-2E-53-59-53-20-7
6-73-20-43-4F-4E-46-49-47-2E-4E-54-0D-0A-52-45-4D-20-43-4F-4E-46-49-47-2E-53-59-
53-20-69-73-20-6E-6F-74-20-75-73-65-64-20-74-6F-20-69-6E-69
Number of bytes read=100
75-73-69-6E-67-20-53-79-73-74-65-6D-3B-0D-0A-0D-0A-70-75-62-6C-69-63-20-73-74-61
-74-69-63-20-63-6C-61-73-73-20-50-72-6F-67-72-61-6D-20-7B-0D-0A-20-20-20-70-75-6
2-6C-69-63-20-73-74-61-74-69-63-20-76-6F-69-64-20-4D-61-69-6E-28-73-74-72-69-6E-
67-5B-5D-20-61-72-67-73-29-20-7B-0D-0A-20-20-20-20-20-20-56
Main thread ID=1
ReadIsDone thread ID=4
Number of bytes read=100
52-45-4D-20-57-69-6E-64-6F-77-73-20-4D-53-2D-44-4F-53-20-53-74-61-72-74-75-70-20
-46-69-6C-65-0D-0A-52-45-4D-0D-0A-52-45-4D-20-43-4F-4E-46-49-47-2E-53-59-53-20-7
6-73-20-43-4F-4E-46-49-47-2E-4E-54-0D-0A-52-45-4D-20-43-4F-4E-46-49-47-2E-53-59-
53-20-69-73-20-6E-6F-74-20-75-73-65-64-20-74-6F-20-69-6E-69
Using the Thread Pool
This section is going to assume no prior knowledge of creating threads for the sake of leading a pathway to understanding the use of the thread pool. In its simplest form, creating a thread in .NET begins with these four steps:
- You create a method that takes no arguments, nor returns any data.
- You then create a new
ThreadStart
delegate, specifying the method in step 1. - You then create a new
Thread
object, specifying theThreadStart
delegate object in step 2. - Call
Thread.Start
to begin execution of the new thread.
Here is an example:
using System;
using System.Threading;
public class App
{
public static void Main()
{
ThreadStart task = new ThreadStart(BasicWork);
Thread myThread = new Thread(task);
myThread.Start();
}
static void BasicWork()
{
Console.WriteLine("Thread: {0}", Thread.CurrentThread.ManagedThreadId);
}
}
In real world threading, we need to pass information to individual threads. Notice the above example uses the ThreadStart
delegate, which takes no parameters. OK. So we need to pass data to threads. How? By using a new delegate called ParameterizedThreadStart
. This delegate specifies a method signature with a single parameter of type Object
and returns nothing. Here is a code example of passing data to a thread:
using System;
using System.Threading;
public class App
{
public static void Main() {
ParameterizedThreadStart task = new ParameterizedThreadStart(WorkWithParameter;
Thread myThread = new Thread(task);
myThread.Start("Whatcha doin?");
Thread newThread = new Thread(task);
newThread.Start("Nuthin much");
}
static void WorkWithParameter(object o)
{
string info = (string)o;
for (int x = 0; x < 10; ++x)
{
Console.WriteLine("{0}: {1}", info, Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(10);
}
}
}
Here is the output:
Whatcha doin?: 3
Nuthin much: 4
Whatcha doin?: 3
Nuthin much: 4
Whatcha doin?: 3
Nuthin much: 4
Whatcha doin?: 3
Nuthin much: 4
Whatcha doin?: 3
Nuthin much: 4
Whatcha doin?: 3
Nuthin much: 4
Whatcha doin?: 3
Nuthin much: 4
. . . . . . .
In a lot of cases, it is not necessary to create your own threads. Professional documentation does not even suggest it. The threading in .NET supports a built-in thread pool that can be used in many cases where you might think that you have to create your own threads. Recall the static
method used in the previous example to pass data to a thread:
static void WorkWithParameter(object o)
{
string info = (string)o;
for (int x = 0; x < 10; ++x)
{
Console.WriteLine("{0}: {1}", info, Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(10);
}
}
Instead of creating a new thread and controlling it, we use the ThreadPool
to do this work by using its QueueUserWorkItem
method:
WaitCallback workItem = new WaitCallback(WorkWithParameter);
If (!ThreadPool.QueueUserWorkItem(workItem, "ThreadPooled"))
{
Console.WriteLine("Could not queue item");
}
.NET maintains a set of threads of threads that can reused in your application. In real world programming, apparently cost is a factor, and creating threads can cost. This pool of threads is faster because the threads in it can be reused as necessary, saving expensive setup costs. In addition, it helps throttle the number of threads running at any one time in a process by queuing up the work to be performed. As threads are available, the thread pool posts the new work to the thread. In fact, the CLR's thread pool allows developers to set a maximum number of worker threads and I/O threads. Now a compute-bound operation is an operation that requires computation. Examples include recalculating cells in a spreadsheet application and spell-checking words or grammar-checking sentences in a word processing application. To make a reference to CLR expert Jeffrey Richter, a compute-bound operation will not perform any synchronous I/O operation because all synchronous I/O operations suspend the calling thread while the underlying hardware (disk drive, network card, etc.) performs the work. This should establish a clear guideline, as I/O operations are inherently slow compared with other processing due to delays caused by track and sector seek time on random access devices (such as discs , DVDs, etc.), delays caused by the relatively slow data transfer rate between a physical device and system memory, and delays in network data transfer using file servers, storage area networks, and so on. If a concentrically-shaped hard disk spins underneath a read/write head at a rate of rounds per second while a microprocessor ticks at several billions of clock cycles per second, then yes, there will be delays if the operations are synchronous. If a thread is suspended then it is not running, but remains consuming resources. To queue an asynchronous compute-bound operation to the thread pool class, you typically call one of the following methods:
static Boolean QueueUserWorkItem(WaitCallback callback);
static Boolean QueueUserWorkItem(WaitCallback callback, object state);
static Boolean UnsafeQueueUserWorkItem(WaitCallback callback, object state);
These methods queue a "work item" to the thread pool's queue, and then all of these methods return immediately. A work item is just a method identified by the CallBack
parameter that will be called via a thread pool thread. The following code demonstrates how to have a thread pool call a method asynchronously:
using System;
using System.Threading;
public static class Program
{
public static void Main()
{
Console.WriteLine("Main thread: queuing an asynchronous operation");
ThreadPool.QueueUserWorkItem(ComputeBoundOp, 5);
Console.WriteLine("Main thread: Doing other work here...");
Thread.Sleep(10000); // Simulating other work (10 seconds)
Console.ReadLine();
}
// This method's signature must match the WaitCallback delegate
private static void ComputeBoundOp(Object state)
{
// This method is executed by a thread pool thread
Console.WriteLine("In ComputeBoundOp: state={0}", state);
Thread.Sleep(1000); // Simulates other work (1 second)
// When this method returns, the thread goes back
// to the pool and waits for another task
}
}
And here is the output (in this case):
Main thread: queuing an asynchronous operation
Main thread: Doing other work here...
In ComputeBoundOp: state=5
The difference in the order of the lines of output is attributed to the fact that the two methods are running asynchronously with respect to each other. Understanding the APM and how to use the built-in thread pool not only leads to applications that run with a higher performance, but also paves the way to the ever-important topic of I/O completion ports. I/O completion ports are part of the foundation of building scalable server applications.