亚洲综合社区欧美综合色-欧美逼逼一区二区三区-国产老熟女高潮精品网站-国产日韩最新视频在线看

始創(chuàng)于2000年 股票代碼:831685
咨詢熱線:0371-60135900 注冊有禮 登錄
  • 掛牌上市企業(yè)
  • 60秒人工響應(yīng)
  • 99.99%連通率
  • 7*24h人工
  • 故障100倍補(bǔ)償
全部產(chǎn)品
您的位置: 網(wǎng)站首頁 > 幫助中心>文章內(nèi)容

細(xì)說多線程(七) —— 并行編程與PLINQ

發(fā)布時(shí)間:  2012/9/16 6:38:20

目錄

一、線程的定義

二、線程的基礎(chǔ)知識

三、以ThreadStart方式實(shí)現(xiàn)多線程

四、CLR線程池的工作者線程

五、CLR線程池的I/O線程

六、異步 SqlCommand

七、并行編程與PLINQ

八、計(jì)時(shí)器與鎖

 

七、并行編程與PLINQ

要使用多線程開發(fā),必須非常熟悉Thread的使用,而且在開發(fā)過程中可能會面對很多未知的問題。為了簡化開發(fā),.NET 4.0 特別提供一個(gè)并行編程庫System.Threading.Tasks,它可以簡化并行開發(fā),你無需直接跟線程或線程池打交道,就可以簡單建立多線程應(yīng)用 程序。此外,.NET還提供了新的一組擴(kuò)展方法PLINQ,它具有自動分析查詢功能,如果并行查詢能提高系統(tǒng)效率,則同時(shí)運(yùn)行,如果查詢未能從并行查詢中 受益,則按原順序查詢。下面將詳細(xì)介紹并行操作的方式。

 

7.1 泛型委托

使用并行編程可以同時(shí)操作多個(gè)委托,在介紹并行編程前先簡單介紹一下兩個(gè)泛型委托System.Func<>與System.Action<>。

Func<>是一個(gè)能接受多個(gè)參數(shù)和一個(gè)返回值的泛型委托,它能接受0個(gè)到4個(gè)輸入?yún)?shù), 其中 T1,T2,T3,T4 代表自定的輸入類型,TResult為自定義的返回值。
public delegate TResult Func<TResult>()
public delegate TResult Func<T1,TResult>(T1 arg1)
public delegate TResult Func<T1,T2, TResult>(T1 arg1,T2 arg2)
public delegate TResult Func<T1,T2, T3, TResult>(T1 arg1,T2 arg2,T3 arg3)
public delegate TResult Func<T1,T2, T3, ,T4, TResult>(T1 arg1,T2 arg2,T3 arg3,T4 arg4)

Action<>與Func<>十分相似,不同在于Action<>的返回值為void,Action能接受1~16個(gè)參數(shù)
public delegate void Action<T1>()
public delegate void Action<T1,T2>(T1 arg1,T2 arg2)
public delegate void Action<T1,T2, T3>(T1 arg1,T2 arg2, T3 arg3)
.............
public delegate void Action<T1,T2, T3, ,T4, ...... ,T16>(T1 arg1,T2 arg2,T3 arg3,T4 arg4,...... ,T16 arg16)

 

7.2 任務(wù)并行庫(TPL)

System.Threading.Tasks中的類被統(tǒng)稱為任務(wù)并行庫(Task Parallel Library,TPL),TPL使用CLR線程池把工作分配到CPU,并能自動處理工作分區(qū)、線程調(diào)度、取消支持、狀態(tài)管理以及其他低級別的細(xì)節(jié)操作,極大地簡化了多線程的開發(fā)。

注意:TPL比Thread更具智能性,當(dāng)它判斷任務(wù)集并沒有從并行運(yùn)行中受益,就會選擇按順序運(yùn)行。但并非所有的項(xiàng)目都適合使用并行開發(fā),創(chuàng)建過多并行任務(wù)可能會損害程序的性能,降低運(yùn)行效率。

TPL包括常用的數(shù)據(jù)并行與任務(wù)并行兩種執(zhí)行方式:

7.2.1 數(shù)據(jù)并行

數(shù)據(jù)并行的核心類就是System.Threading.Tasks.Parallel,它包含兩個(gè)靜態(tài)方法 Parallel.For 與 Parallel.ForEach, 使用方式與for、foreach相仿。通過這兩個(gè)方法可以并行處理System.Func<>、 System.Action<>委托。

以下一個(gè)例子就是利用 public static ParallelLoopResult For( int from, int max, Action<int>) 方法對List<Person>進(jìn)行并行查詢。
假設(shè)使用單線程方式查詢3個(gè)Person對象,需要用時(shí)大約6秒,在使用并行方式,只需使用2秒就能完成查詢,而且能夠避開Thread的繁瑣處理。

 1     class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             //設(shè)置最大線程數(shù)
6 ThreadPool.SetMaxThreads(1000, 1000); 7 //并行查詢
8 Parallel.For(0, 3,n => 9 { 10 Thread.Sleep(2000); //模擬查詢 11 ThreadPoolMessage(GetPersonList()[n]); 12 }); 13 Console.ReadKey(); 14 } 15 16 //模擬源數(shù)據(jù)
17 static IList<Person> GetPersonList() 18 { 19 var personList = new List<Person>(); 20 21 var person1 = new Person(); 22 person1.ID = 1; 23 person1.Name = "Leslie"; 24 person1.Age = 30; 25 personList.Add(person1); 26 ........... 27 return personList; 28 } 29 30 //顯示線程池現(xiàn)狀
31 static void ThreadPoolMessage(Person person) 32 { 33 int a, b; 34 ThreadPool.GetAvailableThreads(out a, out b); 35 string message = string.Format("Person ID:{0} Name:{1} Age:{2}\n" + 36 " CurrentThreadId is {3}\n WorkerThreads is:{4}" + 37 " CompletionPortThreads is :{5}\n", 38 person.ID, person.Name, person.Age, 39 Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString()); 40 41 Console.WriteLine(message); 42 } 43 }

觀察運(yùn)行結(jié)果,對象并非按照原排列順序進(jìn)行查詢,而是使用并行方式查詢。

 

若想停止操作,可以利用ParallelLoopState參數(shù),下面以ForEach作為例子。
public static ParallelLoopResult ForEach<TSource>( IEnumerable<TSource> source, Action<TSource, ParallelLoopState> action)
其中source為數(shù)據(jù) 集,在Action<TSource,ParallelLoopState>委托的ParallelLoopState參數(shù)當(dāng)中包含有 Break()和 Stop()兩個(gè)方法都可以使迭代停止。Break的使用跟傳統(tǒng)for里面的使用方式相似,但因?yàn)樘幱诓⑿刑幚懋?dāng)中,使用Break并不能保證所有運(yùn)行能 立即停止,在當(dāng)前迭代之前的迭代會繼續(xù)執(zhí)行。若想立即停止操作,可以使用Stop方法,它能保證立即終止所有的操作,無論它們是處于當(dāng)前迭代的之前還是之 后。

     class Program
     {
          static void Main(string[] args)
          {
              //設(shè)置最大線程數(shù)
ThreadPool.SetMaxThreads(1000, 1000); //并行查詢
Parallel.ForEach(GetPersonList(), (person, state) => { if (person.ID == 2) state.Stop(); ThreadPoolMessage(person); }); Console.ReadKey(); } //模擬源數(shù)據(jù)
static IList<Person> GetPersonList() { var personList = new List<Person>(); var person1 = new Person(); person1.ID = 1; person1.Name = "Leslie"; person1.Age = 30; personList.Add(person1); .......... return personList; } //顯示線程池現(xiàn)狀
static void ThreadPoolMessage(Person person) { int a, b; ThreadPool.GetAvailableThreads(out a, out b); string message = string.Format("Person ID:{0} Name:{1} Age:{2}\n" + " CurrentThreadId is {3}\n WorkerThreads is:{4}" + " CompletionPortThreads is :{5}\n", person.ID, person.Name, person.Age, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString()); Console.WriteLine(message); } }

觀察運(yùn)行結(jié)果,當(dāng)Person的ID等于2時(shí),運(yùn)行將會停止。

 

當(dāng)要在多個(gè)線程中調(diào)用本地變量,可以使用以下方法:
public static ParallelLoopResult ForEach<TSource, TLocal>(IEnumerable<Of TSource>, Func<Of TLocal>, Func<Of TSource,ParallelLoopState,TLocal,TLocal>, Action<Of TLocal>)
其中第一個(gè)參數(shù)為數(shù)據(jù)集;
第二個(gè)參數(shù)是一個(gè)Func委托,用于在每個(gè)線程執(zhí)行前進(jìn)行初始化;
第 三個(gè)參數(shù)是委托Func<Of T1,T2,T3,TResult>,它能對數(shù)據(jù)集的每個(gè)成員進(jìn)行迭代,當(dāng)中T1是數(shù)據(jù)集的成員,T2是一個(gè)ParallelLoopState對 象,它可以控制迭代的狀態(tài),T3是線程中的本地變量;
第四個(gè)參數(shù)是一個(gè)Action委托,用于對每個(gè)線程的最終狀態(tài)進(jìn)行最終操作。

在以下例子中,使用ForEach計(jì)算多個(gè)Order的總體價(jià)格。在ForEach方法中,首先把參數(shù)初始化為0f,然后用把同一個(gè)Order的多 個(gè)OrderItem價(jià)格進(jìn)行累加,計(jì)算出Order的價(jià)格,最后把多個(gè)Order的價(jià)格進(jìn)行累加,計(jì)算出多個(gè)Order的總體價(jià)格。

 1     public class Order
 2     {
 3         public int ID;
 4         public float Price;
 5     }
 6 
 7     public class OrderItem
 8     {
 9         public int ID;
10         public string Goods;
11         public int OrderID;
12         public float Price;
13         public int Count;
14     }
15 
16     class Program
17     {
18         static void Main(string[] args)
19         {
20             //設(shè)置最大線程數(shù)
21 ThreadPool.SetMaxThreads(1000, 1000); 22 float totalPrice = 0f; 23 //并行查詢
24 var parallelResult = Parallel.ForEach(GetOrderList(), 25 () => 0f, //把參數(shù)初始值設(shè)為0
26 (order, state, orderPrice) => 27 { 28 //計(jì)算單個(gè)Order的價(jià)格
29 orderPrice = GetOrderItem().Where(item => item.OrderID == order.ID) 30 .Sum(item => item.Price * item.Count); 31 order.Price = orderPrice; 32 ThreadPoolMessage(order); 33 34 return orderPrice; 35 }, 36 (finallyPrice) => 37 { 38 totalPrice += finallyPrice;//計(jì)算多個(gè)Order的總體價(jià)格
39 } 40 ); 41 42 while (!parallelResult.IsCompleted) 43 Console.WriteLine("Doing Work!"); 44 45 Console.WriteLine("Total Price is:" + totalPrice); 46 Console.ReadKey(); 47 } 48 //虛擬數(shù)據(jù) 49 static IList<Order> GetOrderList() 50 { 51 IList<Order> orderList = new List<Order>(); 52 Order order1 = new Order(); 53 order1.ID = 1; 54 orderList.Add(order1); 55 ............ 56 return orderList; 57 } 58 //虛擬數(shù)據(jù) 59 static IList<OrderItem> GetOrderItem() 60 { 61 IList<OrderItem> itemList = new List<OrderItem>(); 62 63 OrderItem orderItem1 = new OrderItem(); 64 orderItem1.ID = 1; 65 orderItem1.Goods = "iPhone 4S"; 66 orderItem1.Price = 6700; 67 orderItem1.Count = 2; 68 orderItem1.OrderID = 1; 69 itemList.Add(orderItem1); 70 ........... 71 return itemList; 72 } 73 74 //顯示線程池現(xiàn)狀
75 static void ThreadPoolMessage(Order order) 76 { 77 int a, b; 78 ThreadPool.GetAvailableThreads(out a, out b); 79 string message = string.Format("OrderID:{0} OrderPrice:{1}\n" + 80 " CurrentThreadId is {2}\n WorkerThreads is:{3}" + 81 " CompletionPortThreads is:{4}\n", 82 order.ID, order.Price, 83 Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString()); 84 85 Console.WriteLine(message); 86 } 87 }

運(yùn)行結(jié)果

 

 7.2.2 任務(wù)并行

在TPL當(dāng)中還可以使用Parallel.Invoke方法觸發(fā)多個(gè)異步任務(wù),其中 actions 中可以包含多個(gè)方法或者委托,parallelOptions用于配置Parallel類的操作。
public static void Invoke(Action[] actions )
public static void Invoke(ParallelOptions parallelOptions, Action[] actions )
下面例子中利用了Parallet.Invoke并行查詢多個(gè)Person,actions當(dāng)中可以綁定方法、lambda表達(dá)式或者委托,注意綁定方法時(shí)必須是返回值為void的無參數(shù)方法。

     class Program
     {
         static void Main(string[] args)
         {
             //設(shè)置最大線程數(shù)
ThreadPool.SetMaxThreads(1000, 1000); //任務(wù)并行
Parallel.Invoke(option, PersonMessage, ()=>ThreadPoolMessage(GetPersonList()[1]), delegate(){ ThreadPoolMessage(GetPersonList()[2]); }); Console.ReadKey(); } static void PersonMessage() { ThreadPoolMessage(GetPersonList()[0]); } //顯示線程池現(xiàn)狀
static void ThreadPoolMessage(Person person) { int a, b; ThreadPool.GetAvailableThreads(out a, out b); string message = string.Format("Person ID:{0} Name:{1} Age:{2}\n" + " CurrentThreadId is {3}\n WorkerThreads is:{4}" + " CompletionPortThreads is :{5}\n", person.ID, person.Name, person.Age, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString()); Console.WriteLine(message); } //模擬源數(shù)據(jù)
static IList<Person> GetPersonList() { var personList = new List<Person>(); var person1 = new Person(); person1.ID = 1; person1.Name = "Leslie"; person1.Age = 30; personList.Add(person1); .......... return personList; } }

運(yùn)行結(jié)果

 


7.3 Task簡介

以Thread創(chuàng)建的線程被默認(rèn)為前臺線程,當(dāng)然你可以把線程IsBackground屬性設(shè)置為true,但TPL為此提供了一個(gè)更簡單的類Task。
Task存在于System.Threading.Tasks命名空間當(dāng)中,它可以作為異步委托的簡單替代品。
通過Task的Factory屬性將返回TaskFactory類,以TaskFactory.StartNew(Action)方法可以創(chuàng)建一個(gè)新線程,所創(chuàng)建的線程默認(rèn)為后臺線程。

 1     class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             ThreadPool.SetMaxThreads(1000, 1000);
 6             Task.Factory.StartNew(() => ThreadPoolMessage());
 7             Console.ReadKey();
 8         }
 9 
10         //顯示線程池現(xiàn)狀
11 static void ThreadPoolMessage() 12 { 13 int a, b; 14 ThreadPool.GetAvailableThreads(out a, out b); 15 string message = string.Format("CurrentThreadId is:{0}\n" + 16 "CurrentThread IsBackground:{1}\n" + 17 "WorkerThreads is:{2}\nCompletionPortThreads is:{3}\n", 18 Thread.CurrentThread.ManagedThreadId, 19 Thread.CurrentThread.IsBackground.ToString(), 20 a.ToString(), b.ToString()); 21 Console.WriteLine(message); 22 } 23 }

運(yùn)行結(jié)果

 

 

若要取消處理,可以利用CancellationTakenSource對象,在TaskFactory中包含有方法
public Task StartNew( Action action, CancellationToken cancellationToken )
在 方法中加入CancellationTakenSource對象的CancellationToken屬性,可以控制任務(wù)的運(yùn)行,調(diào)用 CancellationTakenSource.Cancel時(shí)任務(wù)就會自動停止。下面以圖片下載為例子介紹一下TaskFactory的使用。

服務(wù)器端頁面

 <html xmlns="http://www.w3.org/1999/xhtml">
 <head runat="server">
     <title></title>
     <script type="text/C#" runat="server">
private static List<string> url=new List<string>();

protected void Page_Load(object sender, EventArgs e)
{
if (!Page.IsPostBack)
{
url.Clear();
Application["Url"] = null;
}
}

protected void CheckBox_CheckedChanged(object sender, EventArgs e)
{
CheckBox checkBox = (CheckBox)sender;
if (checkBox.Checked)
url.Add(checkBox.Text);
else
url.Remove(checkBox.Text);
Application["Url"]= url;
}
</script> </head> <body> <form id="form1" runat="server" > <div align="left"> <div align="center" style="float: left;"> <asp:Image ID="Image1" runat="server" ImageUrl="~/Images/A.jpg" /><br /> <asp:CheckBox ID="CheckBox1" runat="server" AutoPostBack="True"
oncheckedchanged="CheckBox_CheckedChanged" Text="A.jpg" /> </div> <div align="center" style="float: left"> <asp:Image ID="Image2" runat="server" ImageUrl="~/Images/B.jpg" /><br /> <asp:CheckBox ID="CheckBox2" runat="server" AutoPostBack="True"
oncheckedchanged="CheckBox_CheckedChanged" Text="B.jpg" /> </div> <div align="center" style="float: left"> <asp:Image ID="Image3" runat="server" ImageUrl="~/Images/C.jpg" /><br /> <asp:CheckBox ID="CheckBox3" runat="server" AutoPostBack="True"
oncheckedchanged="CheckBox_CheckedChanged" Text="C.jpg" /> </div> <div align="center" style="float: left"> <asp:Image ID="Image4" runat="server" ImageUrl="~/Images/D.jpg" /><br /> <asp:CheckBox ID="CheckBox4" runat="server" AutoPostBack="True"
oncheckedchanged="CheckBox_CheckedChanged" Text="D.jpg" /> </div> <div align="center" style="float: left"> <asp:Image ID="Image5" runat="server" ImageUrl="~/Images/E.jpg" /><br /> <asp:CheckBox ID="CheckBox5" runat="server" AutoPostBack="True"
oncheckedchanged="CheckBox_CheckedChanged" Text="E.jpg" /> </div> </div> </form> </body> </html>

首先在服務(wù)器頁面中顯示多個(gè)*.jpg圖片,每個(gè)圖片都有對應(yīng)的CheckBox檢測其選擇情況。
所選擇圖片的路徑會記錄在Application["Url"]當(dāng)中傳遞到Handler.ashx當(dāng)中。

注意:Application是一個(gè)全局變量,此處只是為了顯示Task的使用方式,在ASP.NET開發(fā)應(yīng)該慎用Application。

Handler.ashx 處理圖片的下載,它從 Application["Url"] 當(dāng)中獲取所選擇圖片的路徑,并把圖片轉(zhuǎn)化成byte[]二進(jìn)制數(shù)據(jù)。
再把圖片的數(shù)量,每副圖片的二進(jìn)制數(shù)據(jù)的長度記錄在OutputStream的頭部。
最后把圖片的二進(jìn)制數(shù)據(jù)記入 OutputStream 一并輸出。

 1 public class Handler : IHttpHandler 
 2 {
 3     public void ProcessRequest(HttpContext context)
 4     {
 5         //獲取圖片名,把圖片數(shù)量寫OutputStream
6 List<String> urlList = (List<string>)context.Application["Url"]; 7 context.Response.OutputStream.Write(BitConverter.GetBytes(urlList.Count), 0, 4); 8 9 //把圖片轉(zhuǎn)換成二進(jìn)制數(shù)據(jù)
10 List<string> imageList = GetImages(urlList); 11 12 //把每副圖片長度寫入OutputStream
13 foreach (string image in imageList) 14 { 15 byte[] imageByte=Convert.FromBase64String(image); 16 context.Response.OutputStream.Write(BitConverter.GetBytes(imageByte.Length),0,4); 17 } 18 19 //把圖片寫入OutputStream
20 foreach (string image in imageList) 21 { 22 byte[] imageByte = Convert.FromBase64String(image); 23 context.Response.OutputStream.Write(imageByte,0,imageByte.Length); 24 } 25 } 26 27 //獲取多個(gè)圖片的二進(jìn)制數(shù)據(jù)
28 private List<string> GetImages(List<string> urlList) 29 { 30 List<string> imageList = new List<string>(); 31 foreach (string url in urlList) 32 imageList.Add(GetImage(url)); 33 return imageList; 34 } 35 36 //獲取單副圖片的二進(jìn)制數(shù)據(jù)
37 private string GetImage(string url) 38 { 39 string path = "E:/My Projects/Example/WebSite/Images/"+url; 40 FileStream stream = new FileStream(path, FileMode.Open, FileAccess.Read); 41 byte[] imgBytes = new byte[10240]; 42 int imgLength = stream.Read(imgBytes, 0, 10240); 43 return Convert.ToBase64String(imgBytes,0,imgLength); 44 } 45 46 public bool IsReusable 47 { 48 get{ return false;} 49 } 50 }

 

客戶端

建立一個(gè)WinForm窗口,里面加入一個(gè)WebBrowser連接到服務(wù)器端的Default.aspx頁面。
當(dāng)按下Download按鍵時(shí),系統(tǒng)就會利用TaskFactory.StartNew的方法建立異步線程,使用WebRequest方法向Handler.ashx發(fā)送請求。
接收到回傳流時(shí),就會根據(jù)頭文件的內(nèi)容判斷圖片的數(shù)量與每副圖片的長度,把二進(jìn)制數(shù)據(jù)轉(zhuǎn)化為*.jpg文件保存。

系統(tǒng)利用TaskFactory.StartNew(action,cancellationToken) 方式異步調(diào)用GetImages方法進(jìn)行圖片下載。
當(dāng) 用戶按下Cancel按鈕時(shí),異步任務(wù)就會停止。值得注意的是,在圖片下載時(shí)調(diào)用了 CancellationToken.ThrowIfCancellationRequested方法,目的在檢查并行任務(wù)的運(yùn)行情況,在并行任務(wù)被停止 時(shí)釋放出OperationCanceledException異常,確保用戶按下Cancel按鈕時(shí),停止所有并行任務(wù)。

     public partial class Form1 : Form
     {
         private CancellationTokenSource tokenSource = new CancellationTokenSource();
         
         public Form1()
         {
             InitializeComponent();
             ThreadPool.SetMaxThreads(1000, 1000);
         }
 
         private void downloadToolStripMenuItem_Click(object sender, EventArgs e)
         {
              Task.Factory.StartNew(GetImages,tokenSource.Token);
         }
 
         private void cancelToolStripMenuItem_Click(object sender, EventArgs e)
         {
             tokenSource.Cancel();
         }
 
         private void GetImages()
         {
             //發(fā)送請求,獲取輸出流
WebRequest webRequest = HttpWebRequest.Create("Http://localhost:5800/Handler.ashx"); Stream responseStream=webRequest.GetResponse().GetResponseStream(); byte[] responseByte = new byte[81960]; IAsyncResult result=responseStream.BeginRead(responseByte,0,81960,null,null); int responseLength = responseStream.EndRead(result); //獲取圖片數(shù)量
int imageCount = BitConverter.ToInt32(responseByte, 0); //獲取每副圖片的長度
int[] lengths = new int[imageCount]; for (int n = 0; n < imageCount; n++) { int length = BitConverter.ToInt32(responseByte, (n + 1) * 4); lengths[n] = length; } try { //保存圖片
for (int n = 0; n < imageCount; n++) { string path = string.Format("E:/My Projects/Example/Test/Images/pic{0}.jpg", n); FileStream file = new FileStream(path, FileMode.Create, FileAccess.ReadWrite); //計(jì)算字節(jié)偏移量
int offset = (imageCount + 1) * 4; for (int a = 0; a < n; a++) offset += lengths[a]; file.Write(responseByte, offset, lengths[n]); file.Flush(); //模擬操作
Thread.Sleep(1000); //檢測CancellationToken變化
tokenSource.Token.ThrowIfCancellationRequested(); } } catch (OperationCanceledException ex) { MessageBox.Show("Download cancel!"); } } }


 

7.4 并行查詢(PLINQ)

并行 LINQ (PLINQ) 是 LINQ 模式的并行實(shí)現(xiàn),主要區(qū)別在于 PLINQ 嘗試充分利用系統(tǒng)中的所有處理器。 它利用所有處理器的方法,把數(shù)據(jù)源分成片段,然后在多個(gè)處理器上對單獨(dú)工作線程上的每個(gè)片段并行執(zhí)行查詢, 在許多情況下,并行執(zhí)行意味著查詢運(yùn)行速度顯著提高。但這并不說明所有PLINQ都會使用并行方式,當(dāng)系統(tǒng)測試要并行查詢會對系統(tǒng)性能造成損害時(shí),那將自動化地使用同步執(zhí)行。
在System.Linq.ParallelEnumerable類中,包含了并行查詢的大部分方法。

 

方法成員 

說明

AsParallel

PLINQ 的入口點(diǎn)。 指定如果可能,應(yīng)并行化查詢的其余部分。

AsSequential(Of TSource)

指定查詢的其余部分應(yīng)像非并行 LINQ 查詢一樣按順序運(yùn)行。

AsOrdered

指定 PLINQ 應(yīng)保留查詢的其余部分的源序列排序,直到例如通過使用 orderby(在 Visual Basic 中為 Order By)子句更改排序?yàn)橹埂?/span>

AsUnordered(Of TSource)

指定查詢的其余部分的 PLINQ 不需要保留源序列的排序。

WithCancellation(Of TSource)

指定 PLINQ 應(yīng)定期監(jiān)視請求取消時(shí)提供的取消標(biāo)記和取消執(zhí)行的狀態(tài)。

WithDegreeOfParallelism(Of TSource)

指定 PLINQ 應(yīng)當(dāng)用來并行化查詢的處理器的最大數(shù)目。

WithMergeOptions(Of TSource)

提供有關(guān) PLINQ 應(yīng)當(dāng)如何(如果可能)將并行結(jié)果合并回到使用線程上的一個(gè)序列的提示。

WithExecutionMode(Of TSource)

指定 PLINQ 應(yīng)當(dāng)如何并行化查詢(即使默認(rèn)行為是按順序運(yùn)行查詢)。

ForAll(Of TSource)

多線程枚舉方法,與循環(huán)訪問查詢結(jié)果不同,它允許在不首先合并回到使用者線程的情況下并行處理結(jié)果。

Aggregate 重載

對于 PLINQ 唯一的重載,它啟用對線程本地分區(qū)的中間聚合以及一個(gè)用于合并所有分區(qū)結(jié)果的最終聚合函數(shù)。


7.4.1 AsParallel

通常想要實(shí)現(xiàn)并行查詢,只需向數(shù)據(jù)源添加 AsParallel 查詢操作即可。

 1     class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             var personList=GetPersonList().AsParallel() 
 6                    .Where(x=>x.Age>30);
 7             Console.ReadKey();
 8         }
 9 
10         //模擬源數(shù)據(jù)
11 static IList<Person> GetPersonList() 12 { 13 var personList = new List<Person>(); 14 15 var person1 = new Person(); 16 person1.ID = 1; 17 person1.Name = "Leslie"; 18 person1.Age = 30; 19 personList.Add(person1); 20 ........... 21 return personList; 22 } 23 }

 

7.4.2 AsOrdered

若要使查詢結(jié)果必須保留源序列排序方式,可以使用AsOrdered方法。
AsOrdered依然使用并行方式,只是在查詢過程加入額外信息,在并行結(jié)束后把查詢結(jié)果再次進(jìn)行排列。

     class Program
     {
         static void Main(string[] args)
         {
             var personList=GetPersonList().AsParallel().AsOrdered()
                 .Where(x=>x.Age<30);
             Console.ReadKey();
         }
 
         static IList<Person> GetPersonList()
         {......}
     }


7.4.3 WithDegreeOfParallelism

默認(rèn)情況下,PLINQ 使用主機(jī)上的所有處理器,這些處理器的數(shù)量最多可達(dá) 64 個(gè)。
通過使用 WithDegreeOfParallelism(Of TSource) 方法,可以指示 PLINQ 使用不多于指定數(shù)量的處理器。

 1     class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             var personList=GetPersonList().AsParallel().WithDegreeOfParallelism(2)
 6                 .Where(x=>x.Age<30);
 7             Console.ReadKey();
 8         }
 9 
10         static IList<Person> GetPersonList()
11         {.........}
12     }

 

7.4.4 ForAll

如果要對并行查詢結(jié)果進(jìn)行操作,一般會在for或foreach中執(zhí)行,執(zhí)行枚舉操作時(shí)會使用同步方式。
有見及此,PLINQ中包含了ForAll方法,它可以使用并行方式對數(shù)據(jù)集進(jìn)行操作。

     class Program
     {
         static void Main(string[] args)
         {
             ThreadPool.SetMaxThreads(1000, 1000);
             GetPersonList().AsParallel().ForAll(person =>{
                 ThreadPoolMessage(person);
             });
             Console.ReadKey();
         }
 
         static IList<Person> GetPersonList()
         {.......}
 
          //顯示線程池現(xiàn)狀
static void ThreadPoolMessage(Person person) { int a, b; ThreadPool.GetAvailableThreads(out a, out b); string message = string.Format("Person ID:{0} Name:{1} Age:{2}\n" + " CurrentThreadId is {3}\n WorkerThreads is:{4}" + " CompletionPortThreads is :{5}\n", person.ID, person.Name, person.Age, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString()); Console.WriteLine(message); } }

運(yùn)行結(jié)果

 

7.4.5 WithCancellation

如果需要停止查詢,可以使用 WithCancellation(Of TSource) 運(yùn)算符并提供 CancellationToken 實(shí)例作為參數(shù)。
與 第三節(jié)Task的例子相似,如果標(biāo)記上的 IsCancellationRequested 屬性設(shè)置為 true,則 PLINQ 將會注意到它,并停止所有線程上的處理,然后引發(fā) OperationCanceledException。這可以保證并行查詢能夠立即停止。

 1     class Program
 2     {
 3         static CancellationTokenSource tokenSource = new CancellationTokenSource();
 4 
 5         static void Main(string[] args)
 6         {
 7             Task.Factory.StartNew(Cancel);
 8             try
 9             {
10                 GetPersonList().AsParallel().WithCancellation(tokenSource.Token)
11                     .ForAll(person =>
12                     {
13                         ThreadPoolMessage(person);
14                     });
15             }
16             catch (OperationCanceledException ex)
17             { }
18             Console.ReadKey();
19         }
20 
21         //在10~50毫秒內(nèi)發(fā)出停止信號
22 static void Cancel() 23 { 24 Random random = new Random(); 25 Thread.Sleep(random.Next(10,50)); 26 tokenSource.Cancel(); 27 } 28 29 static IList<Person> GetPersonList() 30 {......} 31 32 //顯示線程池現(xiàn)狀
33 static void ThreadPoolMessage(Person person) 34 { 35 int a, b; 36 ThreadPool.GetAvailableThreads(out a, out b); 37 string message = string.Format("Person ID:{0} Name:{1} Age:{2}\n" + 38 " CurrentThreadId is {3}\n WorkerThreads is:{4}" + 39 " CompletionPortThreads is :{5}\n", 40 person.ID, person.Name, person.Age, 41 Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString()); 42 Console.WriteLine(message); 43 } 44 } 億恩-天使(QQ:530997) 電話 037160135991 服務(wù)器租用,托管歡迎咨詢。

本文出自:億恩科技【m.1tcdy.com】

服務(wù)器租用/服務(wù)器托管中國五強(qiáng)!虛擬主機(jī)域名注冊頂級提供商!15年品質(zhì)保障!--億恩科技[ENKJ.COM]

  • 您可能在找
  • 億恩北京公司:
  • 經(jīng)營性ICP/ISP證:京B2-20150015
  • 億恩鄭州公司:
  • 經(jīng)營性ICP/ISP/IDC證:豫B1.B2-20060070
  • 億恩南昌公司:
  • 經(jīng)營性ICP/ISP證:贛B2-20080012
  • 服務(wù)器/云主機(jī) 24小時(shí)售后服務(wù)電話:0371-60135900
  • 虛擬主機(jī)/智能建站 24小時(shí)售后服務(wù)電話:0371-60135900
  • 專注服務(wù)器托管17年
    掃掃關(guān)注-微信公眾號
    0371-60135900
    Copyright© 1999-2019 ENKJ All Rights Reserved 億恩科技 版權(quán)所有  地址:鄭州市高新區(qū)翠竹街1號總部企業(yè)基地億恩大廈  法律顧問:河南亞太人律師事務(wù)所郝建鋒、杜慧月律師   京公網(wǎng)安備41019702002023號
      0
     
     
     
     

    0371-60135900
    7*24小時(shí)客服服務(wù)熱線