C# 创建线程的多种方式之线程池和任务

1. 线程池

创建,释放线程都需要消耗很多时间,所以如果有许多的用时较短的小任务需要同时完成且不需要过多的控制,则可以选择线程池来实现,即ThreadPool类.

对于线程所执行的任务来说,可以把线程分为两种类型:工作者线程和I/O线程。工作者线程用来完成一些计算的任务,在任务执行的过程中,需要CPU不间断地处理,所以,在工作者线程的执行过程中,CPU和线程的资源是充分利用的。对于I/O线程,在.Net中通过以Begin开头的方法来完成启动,以End开头的方法来处理结果。.

SetMaxThreads() :设置最大工作线程,双核系统默认设置工作线程数1023,IO线程数1000。若线程池中的数目达到最大,那么最新工作将排队等待;

SetMinThreads():  设置最小工作线程,即线程池中至少会准备多少个线程备用,一旦超过此数目,线程池便会扩充(花时间);

在实际应用中需要根据线程的并发数量合理设置最大,最小线程数,这样才能合理优化资源。如果最小线程数过底,则线程池便会不断创建,销毁线程,浪费资源,如果设的过高,就会造成很多线程空闲等待。

GetAvailableThreads():获取当前可用线程数量,即最大线程数-正在使用的线程数;

QueueUserWorkItem():调用方法,无法取消;

            Console.WriteLine("Main Start....");
            ThreadPool.SetMaxThreads(100,100);       //设置最大线程数
            ThreadPool.SetMinThreads(5, 10);       //设置最小线程数
            int workThread, IOThread;
            ThreadPool.GetMaxThreads(out workThread, out IOThread);       //获取最大线程数
            Console.WriteLine("total is {0}, IOThread is {1}", workThread, IOThread);
            ThreadPool.GetMinThreads(out workThread, out IOThread);       //获取最小线程数
            Console.WriteLine("total is {0}, IOThread is {1}", workThread, IOThread);
            ThreadPool.QueueUserWorkItem(Calculate, 30);          //调用方法
            ThreadPool.GetAvailableThreads(out workThread, out IOThread);     //读取当前可用线程
            Console.WriteLine("total is {0}, IOThread is {1}", workThread, IOThread);
            ThreadPool.QueueUserWorkItem(Calculate, 30);          //调用方法
            ThreadPool.GetAvailableThreads(out workThread, out IOThread);     //读取当前可用线程
            Console.WriteLine("total is {0}, IOThread is {1}", workThread, IOThread);
            ThreadPool.QueueUserWorkItem(Calculate, 30);          //调用方法
            ThreadPool.GetAvailableThreads(out workThread, out IOThread);     //读取当前可用线程
            Console.WriteLine("total is {0}, IOThread is {1}", workThread, IOThread);
            Console.ReadLine();

运行结果:

Main Start....
total is 100, IOThread is 100
total is 5, IOThread is 10
total is 98, IOThread is 100
total is 97, IOThread is 100
total is 95, IOThread is 100
Sum is 435
Sum is 435
Sum is 435

2. 任务

任务表示应完成的某个单元的工作,这个单元的工作可以在单独的线程中完成,也可同步完成(调用RunSynchronously())。 创建启动任务有2种方式:

Task类,TaskFactory类(可以Task.Factory获取实例),两种方式均可接受Action, Action<Object>,Func<Object,Tresult>类型的函数。
       static void Main(string[] args)
        {
            
            Console.WriteLine("Main Start....");
            Task<int> t1 = new Task<int>(Calculate, 20);
            t1.Start();
            while(!t1.IsCompleted){
                Console.WriteLine("Waiting Result....");
                Thread.Sleep(500);
            }
            Console.WriteLine("Result is " + t1.Result);
            Console.ReadLine();
        }

        private static int Calculate(object obj)
        {
            int sum = 0;
            int total = (int)obj;
            for (int i = 0; i < total;i++ )
            {
                sum += i;
                Thread.Sleep(100);
            }
            return sum;
        }

IsComplete属性表示任务是否完成,Result属性返回结果,与委托的异步调用类似。接着使用TaskFactory.

       static void Main(string[] args)
        {
            
            Console.WriteLine("Main Start....");
            Task<int> t1 = Task<int>.Factory.StartNew(obj =>
            {
                int sum = 0;
                int total = (int)obj;
                for (int i = 0; i < total; i++)
                {
                    sum += i;
                    Thread.Sleep(100);
                }
                return sum;
            }, 20);
            while(!t1.IsCompleted){
                Console.WriteLine("Waiting result....");
                Thread.Sleep(500);
            }
            Console.WriteLine("Result is " + t1.Result);
            Console.ReadLine();
        }

在Task类的重载构造函数中,还有可以传递CancellationToken结构参数,TaskCreationOptions枚举参数。

CancellationToken 用来传播有关应取消操作的通知,通过实例化创建取消标记CancellationTokenSource对象,该管理取消令牌对象中检索其CancellationTokenSource.Token属性获取,下面的例子参考了MSDN:

        static void Main(string[] args)
        {
            CancellationTokenSource cs = new CancellationTokenSource();     //创建CancellationTokenSource实例
            Console.WriteLine("Main Start....");
            CancellationToken token = cs.Token;                            //获取CancellationTokenSource.Token属性
            List<Task<int>> tasks = new List<Task<int>>();
            int num = 5;                                           //任务数量
            for(int y=0;y<num;y++){
                tasks.Add(Task<int>.Factory.StartNew((obj) =>      //创建并启动任务
                {
                    int sum = 0;
                    int total = (int)obj;
                    for (int i = 0; i < total; i++)
                    {
                        sum += i;
                        Thread.Sleep(100);
                        if (sum > 200)
                        {
                            if (!cs.IsCancellationRequested)    // 判断CancellationTokenSource是否已经调用Cancel方法,调用一次Cancel后就无需调用
                            {
                                cs.Cancel(false);             //当sum>200, 调用 CancellationTokenSource.Cancel 方法以提供取消通知
                                Console.WriteLine("Task" + total / 5 + " has been cancelled when sum is " + sum);
                                break;
                            }
                        }
                    }                   
                    return sum;
                }, y*5, token));                     //将 CancellationTokenSource.Token 属性返回的标记传递给每个侦听取消的任务或线程                   
            }
            try
            {
                Task<double> task2 = Task.Factory.ContinueWhenAll<int, double>(tasks.ToArray(), (arrTask) =>         //创建一个延续任务,该任务在指定的任务完成后开始
                {
                    double total = 0;
                    foreach(Task<int> item in arrTask){
                        total += item.Result;
                    }
                    return total / num;
                }, token);                            //将 CancellationTokenSource.Token 属性返回的标记传递给每个侦听取消的任务或线程 
                Console.WriteLine("the Average value is "+task2.Result);
            }
            catch (AggregateException ae)
            {
                foreach (Exception e in ae.InnerExceptions)
                {
                    if (e is TaskCanceledException)
                        Console.WriteLine("Cancel err:"+((TaskCanceledException)e).Message);
                    else
                        Console.WriteLine("Exception: " + e.GetType().Name);
                }
            }
            finally
            {
                 cs.Dispose();                         //调用Dispose方法在使用完CancellationTokenSource对象
                 Console.WriteLine("CancellationTokenSource has been disposed...");
            }
            Console.ReadLine();
        }

当num=5时,运行结果如下:

Main Start.... the Average value is 70 CancellationTokenSource has been disposed...

此时没有调用Cancel,若num=8,运行结果如下:

Main Start.... Task5 has been cancelled when sum is 210 Cancel err:A task was canceled. CancellationTokenSource has been disposed...

在运行第五个任务的时候,sum>200,调用Cancel(),ContinueWhenAll() 出现 AggregateException,并在其InnerExceptions中捕捉到TaskCanceledException。其实运行第6,第7和第8个任务,sum都会大于200,所以采用IsCancellationRequested属性判断是否已经取消。

TaskCreationOptions:指定可控制任务的创建和执行的可选行为的标志。

连续的任务:可以指定在某个任务完成后,开始运行另外一个特定任务。ContinueWith()创建连续特定任务,可以传递需等待完成的任务和TaskContinuationOptions枚举参数。改枚举与TaskCreationOptions类似,除了上述字段还增加了一些字段. 写个下例子:

        static void Main(string[] args)
        {
            Console.WriteLine("Main Start....");
            Task<int> t1 = new Task<int>(Calculate, 10);
            Task<int> t2 = t1.ContinueWith<int>(t =>
            {
                Console.WriteLine("t1 finish, t2 start....");
                return t.Result;
            }, TaskContinuationOptions.NotOnFaulted);     //在没有抛出异常的情况下
            Task t3 = t1.ContinueWith(t =>
            {
                Console.WriteLine("t1 err, t3 start....");
            }, TaskContinuationOptions.OnlyOnFaulted);   //在抛出异常的情况下
            t1.Start();
            Console.WriteLine("t2's result is "+t2.Result);
            Console.ReadLine();
        }

        private static int Calculate(object obj)
        {
            int sum = 0;
            int total = (int)obj;
            for (int i = 0; i < total; i++)
            {
                sum += i;
                Thread.Sleep(100);
            }
            if (sum > 50)
            {
                throw new Exception();
            }
            return sum;
        }

运行结果:

Main Start....
t1 finish, t2 start....
t2's result is 45
 t1 = new Task<int>(Calculate, 15)

但如果 让t1运行抛出异常,同时要注释Console.WriteLine("t2's result is "+t2.Result); 运行后,Studio会弹出异常框,点击Continue, 结果如下:

  Main Start....
  t1 err, t3 start....

t1 运行后,运行t3 ,而不t2。

任务层次结构:当从一个任务里启动另一个任务时,就构成了父/子层次结构。

        static void Main(string[] args)
        {
            Console.WriteLine("Main Start....");
            Task parentTask = new Task(() =>
            {
                Console.WriteLine("parent task start....");
                Task childTask = new Task(() =>
                {
                    Console.WriteLine("child task start....");
                    Thread.Sleep(5000);
                    Console.WriteLine("child task end....");
                },TaskCreationOptions.AttachedToParent);     //父任务和子任务同步
                childTask.Start();
                Console.WriteLine("parent task end....");
            },TaskCreationOptions.None);
            parentTask.Start();
            Thread.Sleep(1000);
            Console.WriteLine(parentTask.Status);      //输出父任务的状态
            Console.ReadLine();
        }

运行结果:

Main Start....
parent task start....
parent task end....
child task start....
WaitingForChildrenToComplete
child task end....

从运行结果可以看出,父任务虽运行到最后,但其状态仍然是WaitingForChildrenToComplete,这是因为父任务中创建子任务时,传递了TaskCreationOptions.AttachedToParent参数,默认父任务与子任务是相互独立的。如果创建父任务时传递了TaskCreationOptions.DenyChildAttach参数,则父任务中创建的子任务均为独立的,TaskCreationOptions.AttachedToParent参数无效。修改上面例子,父任务创建时传递TaskCreationOptions.DenyChildAttach,运行结果:

Main Start....
parent task start....
parent task end....
child task start....
RanToCompletion
child task end....