1. JUC 简介
- 在 Java 5.0 提供了
java.util.concurrent
(简称JUC)包,在此包中增加了在并发编程中很常用的工具类,
用于定义类似于线程的自定义子系统,包括线程池,异步 IO 和轻量级任务框架;还提供了设计用于多线程上下文中
的 Collection 实现等;
2. volatile 关键字
- volatile 关键字: 当多个线程进行操作共享数据时,可以保证内存中的数据是可见的;相较于 synchronized 是一种
较为轻量级的同步策略;
- volatile 不具备"互斥性";
- volatile 不能保证变量的"原子性";
public class TestVolatile{
public static void main(String[] args){
ThreadDemo td = new ThreadDemo();
new Thread(td).start();
while(true){
if(td.isFlag()){
System.out.println("########");
break;
}
}
}
}
class ThreadDemo implements Runnable{
private boolean flag = false;
public void run(){
try{
Thread.sleep(200);
}catch(InterruptedException e){
e.printStackTrace();
}
flag = true;
Sytem.out.println("flag="+isFlag());
}
public boolean isFlag(){
return flag;
}
public void setFlag(boolean flag){
this.flag = flag;
}
}
public class TestVolatile{
public static void main(String[] args){
ThreadDemo td = new ThreadDemo();
new Thread(td).start();
while(true){
synchronized(td){
if(td.isFlag()){
System.out.println("########");
break;
}
}
}
}
}
public class TestVolatile{
public static void main(String[] args){
ThreadDemo td = new ThreadDemo();
new Thread(td).start();
while(true){
if(td.isFlag()){
System.out.println("########");
break;
}
}
}
}
class ThreadDemo implements Runnable{
private volatile boolean flag = false;
同上(略)
}
3. i++
的原子性问题
i++
的操作实际上分为三个步骤: "读-改-写";
- 原子性: 就是"i++"的"读-改-写"是不可分割的三个步骤;
- 原子变量: JDK1.5 以后,
java.util.concurrent.atomic
包下,提供了常用的原子变量;
- 原子变量中的值,使用
volatile
修饰,保证了内存可见性;
- CAS(Compare-And-Swap) 算法保证数据的原子性;
int i = 10;
i = i++;
执行步骤:
int temp = i;
i = i + 1;
i = temp;
public class TestAtomicDemo{
public static void main(String[] args){
AtomicDemo ad = new AtomicDemo();
for(int i=0; i < 10; i++){
new Thread(ad).start();
}
}
}
class AtomicDemo implements Runnable{
private int serialNumber = 0;
public void run(){
try{
Thread.sleep(200);
}catch(InterruptedException e){
}
System.out.println(Thread.currentThread().getName() + ":" + getSerialNumber());
}
public int getSerialNumber(){
return serialNumber++;
}
}
class AtomicDemo implements Runnable{
private AtomicInteger serialNumber = new AtomicInteger();
public void run(){
try{
Thread.sleep(200);
}catch(InterruptedException e){
}
System.out.println(Thread.currentThread().getName()+":"+getSerialNumber());
}
public int getSerialNumber(){
return serialNumber.getAndIncrement();
}
}
3.1 CAS 算法
- CAS(Compare-And-Swap) 算法是硬件对于并发的支持,针对多处理器操作而设计的处理器中的一种特殊指令,用于
管理对共享数据的并发访问;
- CAS 是一种无锁的非阻塞算法的实现;
- CAS 包含了三个操作数:
- 需要读写的内存值: V
- 进行比较的预估值: A
- 拟写入的更新值: B
- 当且仅当 V == A 时, V = B, 否则,将不做任何操作;
class CompareAndSwap{
private int value;
public synchronized int get(){
return value;
}
public synchronized int compareAndSwap(int expectedValue, int newValue){
int oldValue = value;
if(oldValue == expectedValue){
this.value = newValue;
}
return oldValue;
}
public synchronized boolean compareAndSet(int expectedValue, int newValue){
return expectedValue == compareAndSwap(expectedValue, newValue);
}
}
public class TestCompareAndSwap{
public static void main(String[] args){
final CopareAndSwap cas = new CompareAndSwap();
for(int i=0; i<10; i++){
new Thead(new Runnable(){
public void run(){
int expectedValue = cas.get();
boolean b = cas.compareAndSet(expectedValue, (int)(Math.random()*100));
System.out.println(b);
}
}).start();
}
}
}
4. 并发容器类
- Java 5.0 在
java.util.concurrent
包中提供了多种并发容器类来改进同步容器的性能;
4.1 ConcurrentHashMap
- ConcurrentHashMap 同步容器类是 Java5 增加的一个线程安全的哈希表;介于 HashMap 与 Hashtable 之间;
内部采用"锁分段"机制替代Hashtable的独占锁,进而提高性能;
- 此包还提供了设计用于多线程上下文中的
Collection
实现: ConcurrentHashMap
,ConcurrentSkipListMap
ConcurrentSkipListSet
, CopyOnWriteArrayList
和 CopyOnWriteArraySet
;
- 当期望许多线程访问一个给定collection时,
ConcurrentHashMap
通常优于同步的HashMap
;
ConcurrentSkipListMap
通常优于同步的TreeMap
;
- 当期望的读数和遍历远远大于列表的更新数时,
CopyOnWriteArrayList
优于同步的ArrayList
;
4.2 CountDownLatch(闭锁)
CountDownLatch
是一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待;
public class TestCountDownLatch{
public static void main(String[] args){
final CountDownLatch latch = new CountDownLatch(10);
LatchDemo ld = new LatchDemo(latch);
long start = System.currentTimeMillis();
for(int i=0; i<10; i++){
new Thread(ld).start();
}
try{
latch.await();
}catch(InterruptedException e){
}
long end = System.currentTimeMillis();
System.out.println("耗费时间为:"+(end - start));
}
}
class LatchDemo implements Runnable{
private CountDownLatch latch;
public LatchDemo(CountDownLatch latch){
this.latch = latch;
}
public void run(){
synchronized(this){
try{
for(int i=0; i<50000; i++){
if(i % 2 == 0){
System.out.println(i);
}
}
}finally{
latch.countDown();
}
}
}
}
5. 创建执行线程的方式三
- 相较于实现 Runnable 接口的方式,实现 Callable 接口类中的方法可以有返回值,并且可以抛出异常;
public class TestCallable{
public static void main(String[] args){
ThreadDemo td = new ThreadDemo();
FutureTask<Integer> result = new FutureTask<>(td);
new Thread(result).start();
try{
Integer sum = result.get();
System.out.println(sum);
}catch(InterruptedException | ExecutionException e){
e.printStackTrace();
}
}
}
class ThreadDemo implements Callable<Integer>{
public Integer call() throws Exception{
int sum = 0;
for(int i=0; i<=100; i++){
sum += i;
}
return sum;
}
}
6. 同步锁(Lock)
public class TestLock{
public static void main(String[] args){
Ticket ticket = new Ticket();
new Thread(ticket,"1号窗口").start();
new Thread(ticket,"2号窗口").start();
new Thread(ticket,"3号窗口").start();
}
}
class Ticket implements Runnable{
private int tick = 100;
public void run(){
while(true){
if(tick > 0){
try{
Thread.sleep(200);
}catch(InterruptedException e){
}
System.out.println(Thread.currentThread().getName()+"完成售票,余票为: "+ --tick);
}
}
}
}
class Ticket implements Runnable{
private int tick = 100;
private Lock lock = new ReentrantLock();
public void run(){
while(true){
lock.lock();
try{
if(tick > 0){
try{
Thread.sleep(200);
}catch(InterruptedException e){
}
System.out.println(Thread.currentThread().getName()+"完成售票,余票为: "+ --tick);
}
}finally{
lock.unlock();
}
}
}
}
public class TestABCAlternate{
public static void main(String[] args){
AlternateDemo ad = new AlternateDemo();
new Thread(new Runnable(){
public void run(){
for(int i=1; i<20; i++){
ad.loopA(i);
}
}
},"A").start();
new Thread(new Runnable(){
public void run(){
for(int i=1; i<20; i++){
ad.loopB(i);
}
}
},"B").start();
new Thread(new Runnable(){
public void run(){
for(int i=1; i<20; i++){
ad.loopC(i);
System.out.println("--------------------");
}
}
},"C").start();
}
}
class AlternateDemo{
private int number = 1;
private Lock lock = new ReentrantLock();
private Condition condition1 = lock.newCondition();
private Condition condition2 = lock.newCondition();
private Condition condition3 = lock.newCondition();
public void loopA(int totalLoop){
lock.lock();
try{
if(number != 1){
condition1.await();
}
for(int i=1; i <= 5; i++){
System.out.println(Thread.currentThread().getName()+"t"+i+"t"+totalLoop);
}
number = 2;
condition2.signal();
}catch(Exception e){
e.printStackTrace();
}finally{
lock.unlock();
}
}
public void loopB(int totalLoop){
lock.lock();
try{
if(number != 2){
condition2.await();
}
for(int i=1; i <= 15; i++){
System.out.println(Thread.currentThread().getName()+"t"+i+"t"+totalLoop);
}
number = 3;
condition3.signal();
}catch(Exception e){
e.printStackTrace();
}finally{
lock.unlock();
}
}
public void loopC(int totalLoop){
lock.lock();
try{
if(number != 3){
condition3.await();
}
for(int i=1; i <= 20; i++){
System.out.println(Thread.currentThread().getName()+"t"+i+"t"+totalLoop);
}
number = 1;
condition1.signal();
}catch(Exception e){
e.printStackTrace();
}finally{
lock.unlock();
}
}
}
7. ReadWriteLock(读写锁)
public class TestReadWriteLock{
public static void main(String[] args){
ReadWriteLockDemo rw = new ReadWriteLockDemo();
new Thread(new Runnable(){
public void run(){
rw.set((int)(Math.random()*100));
}
},"Write:").start();
for(int i=0; i<100; i++){
new Thread(new Runnable(){
public void run(){
rw.get();
}
},"Read:").start();
}
}
}
class ReadWriteLockDemo{
private int number = 0;
private ReadWriteLock lock = new ReentrantReadWriteLock();
public void get(){
lock.readLock().lock();
try{
System.out.println(Thread.currentThread().getName()+":"+number);
}finally{
lock.readLock().unlock();
}
}
public void set(int number){
lock.writeLock().lock();
try{
System.out.println(Thread.currentThread().getName());
this.number = number;
}finally{
lock.writeLock().unlock();
}
}
}
8. 线程八锁
public class Test{
public static void main(String[] args){
Demo demo = new Demo();
Demo demo2 = new Demo();
new Thread(new Runnable(){
public void run(){
demo.getOne();
}
}).start();
new Thread(new Runnable(){
public void run(){
demo.getTwo();
}
}).start();
}
}
class Demo{
public synchronized void getOne(){
try{
Thread.sleep(3000);
}catch(InterruptedException e){
}
System.out.println("one");
}
public synchronized void getTwo(){
System.out.println("two");
}
}
9. 线程池
- 线程池提供了一个线程队列,队列中保存着所有等待状态的线程;
- 避免了创建与销毁线程的额外开销,提高了响应速度;
- 线程池的体系结构
java.util.concurrent.Executor
: 负责线程的使用和调度的根接口;
ExecutorService
: 子接口,线程池的主要接口;
ThreadPoolExecutor
: 线程池的实现类;
ScheduledExecutorService
: 子接口,负责线程的调度;
ScheduledThreadPoolExecutor
: 继承了线程池的实现类,实现了负责线程调度的子接口;
- 工具类:
Executors
ExecutorService newFixedThreadPool()
: 创建固定大小的线程池;
ExecutorService newCachedThreadPool()
: 缓存线程池,线程池中线程的数量不固定,可以根据需求自动更改数量;
ExecutorService newSingleThreadExecutor()
: 创建单个线程池, 线程池中只有一个线程;
ScheduledExecutorService newScheduledThreadPool()
: 创建固定大小的线程,可以延时或定时的执行任务;
public class TestThreadPool{
public static void main(String[] args){
ExecutorService pool = Executors.newFixedThreadPool(5);
ThreadPoolDemo tpd = new ThreadPoolDemo();
for(int i=0; i<10; i++){
pool.submit(tpd);
}
pool.shutdown();
}
}
class ThreadPoolDemo implements Runnable{
private int i=0;
public void run(){
while(i <= 100){
System.out.println(Thread.currentThread().getName()+" : "+ i++)
}
}
}
9.1 线程调度
public class TestScheduledThreadPool{
public static void main(String[] args) throws Exception{
ScheduledExecutorService pool = Executors.newScheduledThreadPool(5);
for(int i=0; i < 10; i++){
Future<Integer> result = pool.schedule(new Callable<Integer>(){
public Integer call() throws Exception{
int num = new Random().nextInt(100);
System.out.println(Thread.currentThread().getName()+ ":" + num);
return num;
}
}, 3, TimeUnit.SECONDS);
System.out.println(result.get());
}
pool.shutdown();
}
}
10 Fork/Join 框架
public class TestForkJoinPool{
public static void main(String[] args){
ForkJoinPool pool = new ForkJoinPool();
ForkJoinTask<Long> task = new ForkJoinSumCalculate(0L, 100000000L);
Long sum = pool.invoke(task);
System.out.println(sum);
}
}
class ForkJoinSumCalculate extends RecursiveTask<Long>{
private static final long serialVersionUID = 24340990L;
private long start;
private long end;
private static final long THURSHOLD = 10000L;
public ForkJoinSumCalculate(long start, long end){
this.start = start;
this.end = end;
}
public Long compute(){
long length = end - start;
if(length <= THURSHOLD){
long sum = 0L;
for(long i = start; i<=end; i++){
sum += i;
}
return sum;
}else{
long middle = (start + end ) / 2;
ForkJoinSumCalculate left = new ForkJoinSumCalculate(start, middle);
left.fork();
ForkJoinSumCalculate right = new ForkJoinSumCalculate(middle + 1, end);
right.fork();
return left.join() + right.join();
}
}
}