Thread
setDaemon: 守护线程, 与主进程共进退
实现了 Runnable
Thread 中的父子进程并没有真正意义上的父子关系, 父进程的终止不会影响子进程的运行
守护进程才有逻辑上的父子关系, 守护进程只对父线程守护(很好理解, 只有一级的关系)
ThreadLocal
本质是一个包装类,没有任何储存信息的功能, ThreadLocal 的实例是一个单纯的 key
ThreadLocal 提供 ThreadLocal.withInitialValue(Supplier supplier) 方法返回一个实现 Supplier 接口的 Threadlocal 的子类 SuppliedThreadLocal, Supplier<T>
提供了一个 T get()
方法要求返回一个值, 同时 SuppliedThreadLocal
覆写了 initialValue() 的方法, 从 return null
--> return supplier.get()
, 将 ThreadLocal 的初始化值直接编译在函数方法里
ThreadLocal 将自身实例作为 key , 通过方法去操作线程的 LocalMap , 将数据储存在线程的 ThreadLocal.ThreadLocalMap ThreadLocals 里面, ThreadLocalMap 是一个 ThreadLocal 的内部类, 自己内部实现了 hash 表, 表里面的元素是 Entry (extends Weakference<ThreadLocal<?>>) 是一个指向 ThreadLocal 的弱引用, 只要没有其他引用指向 ThreadLocal, ThreadLocal 就会被自动回收;
同时 ThreadLocal 在调用 set、remove、rehash 方法时, 会进行一个判断:
// e 是 tab[i] 指向的 Entry , Entry 是一个弱引用类, refersTo 方法判断弱引用的实例是否相等, 当发现有 Entry 弱引用了一个被回收的类的时候(也就是指向 null), 会使用 replaceStaleEntry(key, value, i) 方法(翻译:替换不新鲜的Entry)
if (e.refersTo(null)) {
replaceStaleEntry(key, value, i);
return;
}
// 这个机制可以回收不用的 Value, 避免 OOM
ThreadLocal 信息是储存在 Thread 里面的, 所以 ThreadPool 里面相同的线程执行不同的任务时仍会拿到相同的 Value
ThreadPool
corePoolSize 最大核心线程数
maximumPoolSize 最大线程数, 包括了应急线程
keepAliveTime 应急线程最大存活时间
unit 存活时间的时间单位
workQueue 任务列表, 一般用 ArraysBlockedQueue
threadFactory 线程创造工厂, 经典的工厂模式
RejectedExecutionHandler handler 拒绝策略, 判断是否拒绝接受新的任务
public ThreadPoolExecutor( int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
核心线程不一定一直是核心线程, 线程执行任务的流程如下:
首先任务直接交给先创建的核心线程, 核心线程不够的时候进入阻塞队列, 阻塞队列不够的时候启用应急线程(以上为默认拒绝策略)
应急线程还在执行(也就是当前线程数大于核心线程数), 任何线程(包括曾经的核心线程)都会在结束任务后进行 keepAliveTime 倒计时, 倒计时结束就会被回收
这也就导致 ThreadLocal 在 ThreadPool 中只要不 remove, 就无法控制下一个任务拿到的 value 究竟遗留自哪个线程, 所以在 ThreadPool 中使用 ThreadLocal 一定要手动 remove 回收.
// 测试源码
package com.dawn;
import java.util.concurrent.*;
public class SourceCode {
public static void main(String[] args) throws InterruptedException {
int taskNum = 10;
CountDownLatch latch = new CountDownLatch(taskNum);
ThreadLocal<String> val = new ThreadLocal<String>();
ExecutorService pool = new ThreadPoolExecutor(
3,5,
1,TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(5));
for(int idx = 1; idx<=taskNum; idx ++){
pool.submit(new MyRun(idx,val,latch));
}
latch.await();
System.out.println("前面任务已经完成");
TimeUnit.SECONDS.sleep(5);
latch = new CountDownLatch(taskNum);
for(int idx = taskNum+1; idx<=taskNum+taskNum; idx ++){
pool.execute(new MyRun(idx,val,latch));
}
latch.await();
pool.shutdown();
}
}
class MyRun implements Runnable {
int idx;
ThreadLocal val;
CountDownLatch count;
public MyRun(int idx, ThreadLocal val,CountDownLatch count) {
this.idx = idx;
this.val = val;
this.count = count;
}
@Override
public void run() {
try {
System.out.println("task No."+idx+" running on thread"+ Thread.currentThread().getName().substring(13)+", ThreadLocal from "+val.get());
val.set("task"+idx);
TimeUnit.SECONDS.sleep(2);
if(count!=null) count.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
ReentranLock
默认时非公平的:
public ReentrantLock() {
sync = new NonfairSync();
}
ReentranLock 的公平锁和非公平锁的实现区别( lock 方法的流程 )
lock 方法的内部是这样的:
public void lock() {
sync.lock();
}
而 sync 是 ReentranLock 的 ReentranLock 的一个内部 abstract static class, 继承了 AbstractQueuedSynchronizer (AQS) 这个抽象类, 这个抽象类本身是一个等待队列, 内部提供了一大把 protected 方法如下为例, 这些方法都只在内部 throw UnsupportedOperationException (没有提供支持的选项), 需要继承的类来重写实现
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
回到 sync.lock() 方法, 具体内容是这样的:
final void lock() {
if (!initialTryLock())
acquire(1);
}
acquire 是 AbstractQueuedSynchronizer 的方法, 具体是这样的:
public final void acquire(int arg) {
if (!tryAcquire(arg))
acquire(null, arg, false, false, false, 0L);
}
而 tryAcquire 方法在上面说了, 假如不重写的话会直接抛出异常, 所以 Sync 的两个实现类 FairSync 和 NonfairSync 重写了这个方法.
而 initialTryLock 是 Sync 内部的一个抽象方法, 要求子类重写的:
/**
* Checks for reentrancy and acquires if lock immediately
* available under fair vs nonfair rules. Locking methods
* perform initialTryLock check before relaying to
* corresponding AQS acquire methods.
*/
abstract boolean initialTryLock();
接下来的重点就是两个不同的实现方式:
/**
* Sync object for unfair locks
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
/**
* Acquires only if reentrant or queue is empty.
*/
final boolean initialTryLock() {
Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedThreads() && compareAndSetState(0, 1)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (getExclusiveOwnerThread() == current) {
if (++c < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(c);
return true;
}
return false;
}
/**
* Acquires only if thread is first waiter or empty
*/
protected final boolean tryAcquire(int acquires) {
if (getState() == 0 && !hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
}
/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
final boolean initialTryLock() {
Thread current = Thread.currentThread();
if (compareAndSetState(0, 1)) { // first attempt is unguarded
setExclusiveOwnerThread(current);
return true;
} else if (getExclusiveOwnerThread() == current) {
int c = getState() + 1;
if (c < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(c);
return true;
} else
return false;
}
/**
* Acquire for non-reentrant cases after initialTryLock prescreen
*/
protected final boolean tryAcquire(int acquires) {
if (getState() == 0 && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
}
其实主要区别就是在 if 往下走的判断上, 简化后两个方法的流程如下:
// Nonfair initialTryLock 直接 CAS
if (CAS){
setExclusiveOwnerThread(Thread.currentThread()); // lock
return true; // 截断
}else if ...
// Acquire
if (getState() == 0 && CAS){
setExclusiveOwnerThread(Thread.currentThread()); // lock
return true; // 截断
// Fair initialTryLock 前置满足 state = 0 && !hasQueuedPredecessors() 后才能尝试 CAS
if (c==0){
if (!hasQueuedThreads() && CAS){
setExclusiveOwnerThread(Thread.currentThread()); // lock
return true; // 截断
}else if ...
// Acquire
if (getState() == 0 && !hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(Thread.currentThread()); // lock
return true; // 截断
ReentrantReadWriteLock
内部设置两个锁类 ReadLock 和 WriteLock
两个之间的最主要的区别是 lock 方法的实现, ReadLock 和 WriteLock 共享同一个 Sync 实例, lock 方法的实现分别如下:
/**
* Acquires the read lock.
*/
public void lock() {
sync.acquireShared(1);
}
/**
* Acquires the write lock.
*/
public void lock() {
sync.acquire(1);
}
关于 ReentrantReadWriteLock 的公平和非公平的区别, 在如下两个方法
/**
* Fair version of Sync
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
final boolean writerShouldBlock() {
return false; // writers can always barge
}
final boolean readerShouldBlock() {
/* As a heuristic to avoid indefinite writer starvation,
* block if the thread that momentarily appears to be head
* of queue, if one exists, is a waiting writer. This is
* only a probabilistic effect since a new reader will not
* block if there is a waiting writer behind other enabled
* readers that have not yet drained from the queue.
*/
return apparentlyFirstQueuedIsExclusive();
}
}
FutureTask
是一个内置 Callable 并实现l Runnable 和 Future 接口的包装类, 通过 get 方法拿到 Callable 的结果和异常
public class FutureTask<V> implements RunnableFuture<V>
public interface RunnableFuture<V> extends Runnable, Future<V>
Sleep-Yeild / Wait-Notify
Sleep 睡醒了自动加入就绪队列
Yeild 直接加入就绪队列
Wait 进入等待阻塞队列
Notify 随机唤醒阻塞队列里面等待的线程并进入就绪队列
NotifyAll 字面意思
LockSupport
park - unpark 类似令牌指令, park 等待令牌, unpark 给予令牌
本身是一个不能被实例化的工具类, 内部通过一个 Unsafe 的静态类来实现各种方法
syn-ed
这个没有源码哈, 只有两个重点记一下
锁升级的: 无锁 --> 偏向锁(只有一个线程的时候, 利于锁重入) --> 轻量级锁(多个进程但是没有竞争) --> 重量级锁(需要 monitor 来管理)
Object 的 MarkWord 在这几种状态下的构成如图
所以看到 Object 作为锁不要没事就去 getHashCode 会破坏锁
Syn-ed 的 CAS 具体是干什么:
在轻量级锁上, CAS 是将自己线程存放的 LockRecord 的地址 CAS 进锁对象的 MarkWord 并且在 LockRecord 内保存锁对象自己的 Normal MarkWord (类似于交换), 释放锁的时候就是将原本锁对象的 Normal MarkWord CAS 回去
在重量级锁上, CAS 都是失效的, 返回 false, 此时锁对象的 MarkWord 的内容是 Monitor 对象的地址, 请求 Monitor 来帮忙管理, 线程的每次 CAS 都会触发 Monitor 来管理前来 CAS 的线程, 判断是来竞争还是重入还是释放的
Monitor 对象里面有三个成员: Owner, EntryList, WaitSet 分别对应处于占用, 直接访问 Sync-ed 导致等待阻塞, 使用 wait 方法导致等待的三类线程
Semaphore
内部一个 Sync 继承 AQS , 先天不公平
实际作用: 最多允许持有 permits 数量的线程并发
CountDownLatch
本质也是内部有一个 Sync 继承 AQS, 所有 await 的都进入请求等待同步器(AQS), 请求类为共享阻塞请求
countdown 方法在 state == 0 时会尝试 signalNext(Head) 唤醒头节点的后续节点
await 方法中将线程直接阻塞在了 doAcquireSharedInterruptibly() 方法中, 继续执行会更新头节点并且继续唤醒下一个线程(完成线程的传递唤醒)
为什么唤醒一个后续节点就可以唤醒所有等待节点呢?
调用 await() 方法的线程都被阻塞在 doAcquireSharedInterruptibly() 方法中。如果线程没有被中断过,会判断 state 的值,这里线程是被调用 countDown 方法唤醒的,所以 state 一定是 0,所以会调用 setHeadAndPropagate() 方法更新头结点并继续唤醒之后的线程。这样就会把依次将所有阻塞的阻塞线程都唤醒。(因为 countDownLatch 的计数器为 0 之后需要将所有调用 await() 阻塞的线程唤醒)
CyclicBarrier
两种构造方法:
public CyclicBarrier(int parties, Runnable barrierAction){}
public CyclicBarrier(int parties){}
内部依然是 AQS 不多赘述
当有线程使用 barrier.await() 方法的时候, 就进入阻塞, 当 parties 个线程使用了 barrier.await() 方法之后, 会先启动 barrierAction 线程(如果没有就跳过这一步), barrierAction 线程结束时再重新开始下一代循环
ConHashMap
细粒度划分的同步 hashmap 把每一个节点都上了一个锁