您好,登錄后才能下訂單哦!
今天就跟大家聊聊有關c# 利用Task如何實現一個非阻塞式I/O,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。
1、異步讀取文件數據
public static void TaskFromIOStreamAsync(string fileName) { int chunkSize = 4096; byte[] buffer = new byte[chunkSize]; FileStream fileStream = new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.Read, chunkSize, true); Task<int> task = fileStream.ReadAsync(buffer, 0, buffer.Length); task.ContinueWith((readTask) => { int amountRead = readTask.Result; //必須在ContinueWith中釋放文件流 fileStream.Dispose(); Console.WriteLine($"Async(Simple) Read {amountRead} bytes"); }); }
上述代碼中,異步讀取數據只讀取了一次,完成讀取后就將執行權交還主線程了。但在真實場景中,需要從流中讀取多次才能獲得全部的數據(如文件數據大于給定緩沖區大小,或處理來自網絡流的數據(數據還沒全部到達機器))。因此,為了完成異步讀取操作,需要連續從流中讀取數據,直到獲取所需全部數據。
上述問題導致需要兩級Task來處理。外層的Task用于全部的讀取工作,供調用程序使用。內層的Task用于每次的讀取操作。
第一次異步讀取會返回一個Task。如果直接返回調用Wait或者ContinueWith的地方,會在第一次讀取結束后繼續向下執行。實際上是希望調用者在完成全部讀取操作后才執行。因此,不能把第一個Task發布會給調用者,需要一個“偽Task”在完成全部讀取操作后再返回。
上述問題需要使用到TaskCompletionSource<T>類解決,該類可以生成一個用于返回的“偽Task”。當異步讀取操作全部完成后,調用其對象的TrySetResult,讓Wait或ContinueWith的調用者繼續執行。
public static Task<long> AsynchronousRead(string fileName) { int chunkSize = 4096; byte[] buffer = new byte[chunkSize]; //創建一個返回的偽Task對象 TaskCompletionSource<long> tcs = new TaskCompletionSource<long>(); MemoryStream fileContents = new MemoryStream();//用于保存讀取的內容 FileStream fileStream = new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.Read, chunkSize, true); fileContents.Capacity += chunkSize;//指定緩沖區大小。好像Capacity會自動增長,設置與否沒關系,后續寫入多少數據,就增長多少 Task<int> task = fileStream.ReadAsync(buffer, 0, buffer.Length); task.ContinueWith(readTask => ContinueRead(readTask, fileStream, fileContents, buffer, tcs)); //在ContinueWith中循環讀取,讀取完成后,再返回tcs的Task return tcs.Task; } /// <summary> /// 繼續讀取數據 /// </summary> /// <param name="task">讀取數據的線程</param> /// <param name="fileStream">文件流</param> /// <param name="fileContents">文件存放位置</param> /// <param name="buffer">讀取數據緩存</param> /// <param name="tcs">偽Task對象</param> private static void ContinueRead(Task<int> task, FileStream fileStream, MemoryStream fileContents, byte[] buffer, TaskCompletionSource<long> tcs) { if (task.IsCompleted) { int bytesRead = task.Result; fileContents.Write(buffer, 0, bytesRead);//寫入內存區域。似乎Capacity會自動增長 if (bytesRead > 0) { //雖然看似是一個新的任務,但是使用了ContinueWith,所以使用的是同一個線程。 //沒有讀取完,開啟另一個異步繼續讀取 Task<int> newTask = fileStream.ReadAsync(buffer, 0, buffer.Length); //此處做了一個循環 newTask.ContinueWith(readTask => ContinueRead(readTask, fileStream, fileContents, buffer, tcs)); } else { //已經全部讀取完,所以需要返回數據 tcs.TrySetResult(fileContents.Length); fileStream.Dispose(); fileContents.Dispose();//應該是在使用了數據之后才釋放數據緩沖區的數據 } } }
2、適應Task的異步編程模式
.NET Framework中的舊版異步方法都帶有“Begin-”和“End-”前綴。這些方法仍然有效,為了接口的一致性,它們可以被封裝到Task中。
FromAsyn方法把流的BeginRead和EndRead方法作為參數,再加上存放數據的緩沖區。BeginRead和EndRead方法會執行,并在EndRead完成后調用Continuation Task,把控制權交回主代碼。上述例子會關閉流并返回轉換的數據
const int ReadSize = 256; /// <summary> /// 從文件中獲取字符串 /// </summary> /// <param name="path">文件路徑</param> /// <returns>字符串</returns> public static Task<string> GetStringFromFile(string path) { FileInfo file = new FileInfo(path); byte[] buffer = new byte[1024];//存放數據的緩沖區 FileStream fileStream = new FileStream( path, FileMode.Open, FileAccess.Read, FileShare.None, buffer.Length, FileOptions.DeleteOnClose | FileOptions.Asynchronous); Task<int> task = Task<int>.Factory.FromAsync(fileStream.BeginRead, fileStream.EndRead, buffer, 0, ReadSize, null);//此參數為BeginRead需要的參數 TaskCompletionSource<string> tcs = new TaskCompletionSource<string>(); task.ContinueWith(taskRead => OnReadBuffer(taskRead, fileStream, buffer, 0, tcs)); return tcs.Task; } /// <summary> /// 讀取數據 /// </summary> /// <param name="taskRead">讀取任務</param> /// <param name="fileStream">文件流</param> /// <param name="buffer">讀取數據存放位置</param> /// <param name="offset">讀取偏移量</param> /// <param name="tcs">偽Task</param> private static void OnReadBuffer(Task<int> taskRead, FileStream fileStream, byte[] buffer, int offset, TaskCompletionSource<string> tcs) { int readLength = taskRead.Result; if (readLength > 0) { int newOffset = offset + readLength; Task<int> task = Task<int>.Factory.FromAsync(fileStream.BeginRead, fileStream.EndRead, buffer, newOffset, Math.Min(buffer.Length - newOffset, ReadSize), null); task.ContinueWith(callBackTask => OnReadBuffer(callBackTask, fileStream, buffer, newOffset, tcs)); } else { tcs.TrySetResult(System.Text.Encoding.UTF8.GetString(buffer, 0, buffer.Length)); fileStream.Dispose(); } }
3、使用async 和 await方式讀取數據
下面的示例中,使用了async和await關鍵字實現異步讀取一個文件的同時進行壓縮并寫入另一個文件。所有位于await關鍵字之前的操作都運行于調用者線程,從await開始的操作都是在Continuation Task中運行。但有無法使用這兩個關鍵字的場合:①Task的結束時機不明確時;②必須用到多級Task和TaskCompletionSource時
/// <summary> /// 同步方法的壓縮 /// </summary> /// <param name="lstFiles">文件清單</param> public static void SyncCompress(IEnumerable<string> lstFiles) { byte[] buffer = new byte[16384]; foreach(string file in lstFiles) { using (FileStream inputStream = File.OpenRead(file)) { using (FileStream outputStream = File.OpenWrite(file + ".compressed")) { using (System.IO.Compression.GZipStream compressStream = new System.IO.Compression.GZipStream(outputStream, System.IO.Compression.CompressionMode.Compress)) { int read = 0; while((read=inputStream.Read(buffer,0,buffer.Length))>0) { compressStream.Write(buffer, 0,read); } } } } } } /// <summary> /// 異步方法的文件壓縮 /// </summary> /// <param name="lstFiles">需要壓縮的文件</param> /// <returns></returns> public static async Task AsyncCompress(IEnumerable<string> lstFiles) { byte[] buffer = new byte[16384]; foreach(string file in lstFiles) { using (FileStream inputStream = File.OpenRead(file)) { using (FileStream outputStream = File.OpenWrite(file + ".compressed")) { using (System.IO.Compression.GZipStream compressStream = new System.IO.Compression.GZipStream(outputStream, System.IO.Compression.CompressionMode.Compress)) { int read = 0; while ((read = await inputStream.ReadAsync(buffer, 0, buffer.Length)) > 0) { await compressStream.WriteAsync(buffer, 0, read); } } } } } }
看完上述內容,你們對c# 利用Task如何實現一個非阻塞式I/O有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注億速云行業資訊頻道,感謝大家的支持。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。