jlearning.cn

《Java并发编程实战》读书笔记——线程池的使用

任务与执行策略之间的隐形耦合

Executor可以将生产者和消费者解耦,但是不是所有的任务都适用所有的执行策略,有些类型的任务需要明确地指定执行策略

  1. 依赖性任务: 如果一个任务是独立的,改变线程池的大小和配置只会影响性能。如果需要以来其他的任务,那么必须小心的维持这些执行策略以避免产生活跃性问题。
  2. 使用线程封闭机制的任务:单线程的Executor能对并发性做出更强的承诺。如果将Executor从单线程环境改为线程池环境,那么将会失去线程安全性。
  3. 对相应时间敏感的task:比如说GUI处理用户操作的task。
  4. 使用了ThreadLocal类的task:只有当线程本地值的生命周期受限于任务的生命周期时,在线程池的线程中使用ThreadLocal才有意义。ThreadLocal让每个线程都拥有某个变量的私有版本与线程池重用线程相矛盾。

只有当任务都是同类型,并且相互独立时,线程池的性能才能达到最佳。

  • 如果运行时间较长和较短的任务混合在一起,可能造成“拥塞”
  • 如果任务依赖于其他任务,可能造成“死锁”

线程饥饿死锁

所有正在执行的任务都由于等待其他人处于工作队列中的任务而阻塞,这种现象被称为线程饥饿死锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class ThreadDeadlock{
//这里的newSingleThreadExecutor()方法返回的线程池只包含单个线程
ExecutorService exec = Executors.newSingleThreadExecutor();
public class RenderPageTask implement Callable<String>{
public String call() throws Execptioin{
Future<String> header, footer;
headr = exec.submit(new loadFileTask("header.html"));
footer = exec.submit(new loadFileTask("footer.html"));
String page = renderBody();
//这里的get将会死锁。
return header.get()+page+footer.get();
}
}
}

除了显式的限制,还有一些情况也会发生死锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class StarvationDeadLock {
public static void main(String[] args) {
final ExecutorService executor = Executors.newFixedThreadPool(3);
// 设定await在Barrier对象上的线程数达到4个时, 其await方法才释放
final CyclicBarrier barrier = new CyclicBarrier(4);
// 重复提交4个task, 每个task都await在barrier对象上
// barrier的await方法将一直阻塞, 直到4个线程都到达await点.
// 但是线程池中只有3个线程, 不可能出现4个线程都达到await点的情形, 所以依然会发生死锁
for (int i = 0; i < 4; i++) {
executor.submit(new Runnable() {
@Override
public void run() {
try {
System.out.println("waiting for other tasks arriving at common point");
barrier.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
});
}
}
}

运行时长较长的任务

线程池中线程的数量应该多于稳定状态下执行较长时长任务的数量。

限定任务等待资源的时间:平台类库中的大多数可阻塞方法中,都同时定义了限时版本和无限时版本,例如Thread.join,BlockingQueue.put.CountDownLatch.await,Selector.select等。如果等待超时,可以把任务标识为失败,终止或者重新放回队列。

设置线程池的大小

  • 计算环境:cpu
  • 资源预算:内存
  • 任务的特性:计算密集型还是IO密集型,是否需要JDBC连接这样的稀缺资源。

如果需要执行不同类别并且行为相差较大的任务,可以考虑使用多个线程池。

对于计算密集型task, 合适的size大约为CPU数量+1.

IO密集型task,size = CPU数量 CPU利用率 (1 + 等待时间和计算时间的比例)。

内存等:每个任务对该资源的需求量除资源总量,就是线程池大小的上限。

配置ThradPoolExecutor

ThreadPoolExecutor通用构造函数:

1
2
3
4
5
6
7
8
9
10
public ThreadPoolExecutor(
//线程的基本大小,没有任务执行时的大小。
int corePoolSize,
int maximumPoolSize,//最大大小
//存活时间,超过标记为可回收。
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler){……}

线程的创建和销毁

  • newCachedThreadPool 最大大小设定为Integer.Max_Value,基本大小为0,超时为1分钟
  • newFixedThreadPool 基本大小和最大大小设定为指定的值,而且创建的线程池不会超时
  • newScheduledThreadExecutor 核心线程数由调用方指定, 最大线程数为Integer.MAX_VALUE, 超时时间为0

管理队列任务

  • 无界队列:newFixedThreadPoll和newSingleThreadExecutor在默认情况下将使用一个无界的LinkedBlockingQueue
  • 有界队列ArrayBlockingQueue,有界的LinkedBlockingQueue PriorityBlockingQueue。在队列填满后,使用饱和策略解决。队列的大小必须和线程池的大小一起调节,如果线程池小队列大,会限制吞吐量。
  • 同步移交(Synchronous Handoff): 对于非常大或者无界的线程池,使用SynchronousQueue来避免任务排队,直接将任务从生产者移交给消费者线程。线程池无界或者可以拒绝任务时,SynchronousQWueue才有实际价值,在newCachedThreadPool工厂方法就使用了SynchronousQueue

饱和策略

在有界队列填满之后,饱和策略开始发挥作用。ThreadPoolExecutor的饱和策略通过setRejectedExecutionHandler来修改。

  • Abort Policy:抛出未检查的RejectedExecutioniException,调用者可以捕获这个异常,然后根据需求编写处理代码。
  • Caller-Runs Policy:将某些任务回退到调用者,从而降低新任务的流量。在提交task的线程中执行task,从而提交task的线程就不能提交task,当这一层的请求队列被填满后,再向上蔓延,一直达到客户端,实现一种平缓的性能降低。
  • Discard policy:抛弃该任务
  • Discard Oldest Policy:抛弃下一个将被执行的任务,然后尝试重新提交新的任务。(如果使用优先队列,那么将抛弃优先级最高的)

线程工厂

1
2
3
4
5
6
7
8
9
10
11
public class MyThreadFactory implements ThreadFactory{
private final String poolName;
public MyThreadFactory(String poolName){
this.poolName = poolName;
}
public Thread newThread(Runnable runnable){
return new MyAppThread(runnable, pollName);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public class MyThreadFactory implements ThreadFactory {
private final String poolName;
public MyThreadFactory(String poolName) {
this.poolName = poolName;
}
public Thread newThread(Runnable runnable) {
return new MyAppThread(runnable, poolName);
}
}
public class MyAppThread extends Thread {
public static final String DEFAULT_NAME = "MyAppThread";
private static volatile boolean debugLifecycle = false;
private static final AtomicInteger created = new AtomicInteger();
private static final AtomicInteger alive = new AtomicInteger();
private static final Logger log = Logger.getAnonymousLogger();
public MyAppThread(Runnable r) {
this(r, DEFAULT_NAME);
}
public MyAppThread(Runnable runnable, String name) {
// 为自定义的Thread类指定线程名称
super(runnable, name + "-" + created.incrementAndGet());
// 设置UncaughtExceptionHandler. UncaughtExceptionHandler的uncaughtException方法将在线程运行中抛出未捕获异常时由系统调用
setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
public void uncaughtException(Thread t, Throwable e) {
log.log(Level.SEVERE, "UNCAUGHT in thread " + t.getName(), e);
}
});
}
public void run() {
// 复制debug标志以确保一致的值。。。?
boolean debug = debugLifecycle;
if (debug)
log.log(Level.FINE, "Created " + getName());
try {
alive.incrementAndGet();
super.run();
} finally {
alive.decrementAndGet();
if (debug)
log.log(Level.FINE, "Exiting " + getName());
}
}
public static int getThreadsCreated() {
return created.get();
}
public static int getThreadsAlive() {
return alive.get();
}
public static boolean getDebug() {
return debugLifecycle;
}
public static void setDebug(boolean b) {
debugLifecycle = b;
}
}

在调用构造函数后再设置ThreadPoolExecutor

1
2
3
4
5
ExecutorService exec = Executors.newCachedThreadPool();
if(exec instanceof ThreadPoolExecutor)
((ThreadPoolExecutor)exec).setCorePoolSize(10);
else
throw new AssertionError("Error");

扩展ThreadPoolExecutor

ThreadPoolExecutor类提供了多个”钩子”方法, 以供其子类实现, 比如beforeExecute, afterExecute, terminated等. 所谓”钩子”是指基类预留的, 但是没有提供具体实现的方法, 其方法体为空. 子类可以根据需要为”钩子”提供具体实现。

beforeExecuteafterExecute方法分别在执行task前后调用beforeExecuteafterExecute方法可以用于记录日志, 统计数据等操作。

terminated方法在线程池被关闭后调用。 terminated方法可以用于释放线程池申请的资源。

递归算法的并行化

如果循环中的迭代操作都是独立的,并且不需要等待所有的迭代操作都完成再继续执行,那么可以使用Executor将穿行循环转化为并行循环。

如果需要提交一个任务集,并等待他们完成,那么可以使用ExecutorService.invokeAll,并且在所有任务都执行完成后调用CompletionService来获取结果。

在递归中并行:将在节点上的计算与递归访问分开,将计算并行化。

1
2
3
4
5
6
7
8
9
10
11
12
public<T> void parallelRecursive(final Executor exec,
List<Node<T>> nodes,
final Collection<t> results){
for(final Node<T> n : nodes){
exec.execute(new Runnable(){
public void run(){
results.add(n.compute());
}
});
parallelRecursive(exec, n.getChildren(),results);
}
}

使用sutdown和awaitTermination等方法等待所有的结果:

1
2
3
4
5
6
7
8
9
public<T> Collection<T> getParallelResults(List<Node<T>> nodes)
throws InterruptedExeception{
ExectorService exec = Executors.newCachedThreadPool();
Queue<T> resultQueue = new ConcurrentLinkedQueue<T>;
parallelRecursive(exec, nodes, resultQueue);
exec.shutdown();
exec.awaitTermination(Long.MAX_VALUE,TimeUnit.SECONDS);
return resultQueue;
}
  • shutdown方法:平滑的关闭ExecutorService,当此方法被调用时,ExecutorService停止接收新的任务并且等待已经提交的任务(包含提交正在执行和提交未执行)执行完成。当所有提交任务执行完毕,线程池即被关闭。
  • awaitTermination方法:接收人timeout和 TimeUnit两个参数,用于设定超时时间及单位。当等待超过设定时间时,会监测ExecutorService是否已经关闭,若关闭则返回true,否则返回false。一般情况下会和shutdown方法组合使用。

P151的示例待看