2021-07-20

备战-Java 并发

备战-Java 并发

 

      谁念西风独自凉,萧萧黄叶闭疏窗

 

简介:备战-Java 并发。

一、线程的使用

有三种使用线程的方法:

  • 实现 Runnable 接口;
  • 实现 Callable 接口;
  • 继承 Thread 类。

实现 Runnable 和 Callable 接口的类只能当做一个可以在线程中运行的任务,不是真正意义上的线程,因此最后还需要通过 Thread 来调用。可以理解为任务是通过线程驱动从而执行的。

1、实现 Runnable 接口

需要实现接口中的 run() 方法。

1 public class MyRunnable implements Runnable {2  @Override3  public void run() {4   // do your own business5  }6 }

使用 Runnable 实例再创建一个 Thread 实例,然后调用 Thread 实例的 start() 方法来启动线程。

1 public static void main(String[] args) {2  MyRunnable instance = new MyRunnable();3  Thread thread = new Thread(instance);4  thread.start();5 }

View Code

2、实现 Callable 接口

与 Runnable 相比,Callable 可以有返回值,返回值通过 FutureTask 进行封装。

public class MyCallable implements Callable<Integer> { public Integer call() {  return 666; }}

View Code
1 public static void main(String[] args) throws ExecutionException, InterruptedException {2  MyCallable mc = new MyCallable();3  FutureTask<Integer> ft = new FutureTask<>(mc);4  Thread thread = new Thread(ft);5  thread.start();6  System.out.println(ft.get()); // 6667 }

View Code

3、继承 Thread 类

同样也是需要实现 run() 方法,因为 Thread 类也实现了 Runable 接口。

当调用 start() 方法启动一个线程时,虚拟机会将该线程放入就绪队列中等待被调度,当一个线程被调度时会执行该线程的 run() 方法。

1 public class MyThread extends Thread {2  public void run() {3   // do what you want to do4  }5 }

1 public static void main(String[] args) {2  MyThread mt = new MyThread();3  mt.start();4 }

4、实现Runnable/Callable 接口 VS 继承 Thread

实现接口会更好一些,因为:

  • Java 不支持多重继承,因此继承了 Thread 类就无法继承其它类,但是可以实现多个接口;
  • 类可能只要求可执行就行,继承整个 Thread 类开销过大。

二、基础线程机制

1、Executor

Executor 管理多个异步任务的执行,而无需程序员显式地管理线程的生命周期。这里的异步是指多个任务的执行互不干扰,不需要进行同步操作。

主要有三种 Executor:

  • CachedThreadPool:一个任务创建一个线程;
  • FixedThreadPool:所有任务只能使用固定大小的线程;
  • SingleThreadExecutor:相当于大小为 1 的 FixedThreadPool。
1 public static void main(String[] args) {2  ExecutorService executorService = Executors.newCachedThreadPool();3  for (int i = 0; i < 5; i++) {4   executorService.execute(new MyRunnable());5  }6  executorService.shutdown();7 }

View Code

2、Daemon

守护线程是程序运行时在后台提供服务的线程,不属于程序中不可或缺的部分。

当所有非守护线程结束时,程序也就终止,同时会杀死所有守护线程。

main() 属于非守护线程。

在线程启动之前使用 setDaemon() 方法可以将一个线程设置为守护线程。

1 public static void main(String[] args) {2  Thread thread = new Thread(new MyRunnable());3  thread.setDaemon(true);4 }

守护线程应用场景:

  • QQ、飞讯等等聊天软件,主程序是非守护线程,而所有的聊天窗口是守护线程 ,当在聊天的过程中,直接关闭聊天应用程序时,聊天窗口也会随之关闭,但不是立即关闭,而是需要缓冲,等待接收到关闭命令后才会执行窗口关闭操作。
  • JVM 中gc 线程是守护线程,作用就是当所有用户自定义线以及主线程执行完毕后,gc线程才停止。

3、sleep()

Thread.sleep(millisec) 方法会休眠当前正在执行的线程,millisec 单位为毫秒。

sleep() 可能会抛出 InterruptedException,因为异常不能跨线程传播回 main() 中,因此必须在本地进行处理。线程中抛出的其它异常也同样需要在本地进行处理。

1 public void run() {2  try {3   Thread.sleep(3000);4  } catch (InterruptedException e) {5   e.printStackTrace();6  }7 }

View Code

4、yield()

对静态方法 Thread.yield() 的调用声明了当前线程已经完成了生命周期中最重要的部分,可以切换给其它线程来执行。该方法只是对线程调度器的一个建议,而且也只是建议具有相同优先级的其它线程可以运行。

1 public void run() {2  Thread.yield();3 }

三、中断

一个线程执行完毕之后会自动结束,如果在运行过程中发生异常也会提前结束。

1、InterruptedException

通过调用一个线程的 interrupt() 来中断该线程,如果该线程处于阻塞、限期等待或者无限期等待状态,那么就会抛出 InterruptedException,从而提前结束该线程。但是不能中断 I/O 阻塞和 synchronized 锁阻塞。

对于以下代码,在 main() 中启动一个线程之后再中断它,由于线程中调用了 Thread.sleep() 方法,因此会抛出一个 InterruptedException,从而提前结束线程,不执行之后的语句。

 1 public class InterruptExample { 2  3  private static class MyThread1 extends Thread { 4   @Override 5   public void run() { 6    try { 7     Thread.sleep(2000); 8     System.out.println("Thread run"); 9    } catch (InterruptedException e) {10     e.printStackTrace();11    }12   }13  }14 }

View Code
1 public static void main(String[] args) throws InterruptedException {2  Thread thread1 = new MyThread1();3  thread1.start();4  thread1.interrupt();5  System.out.println("Main run");6 }

View Code
1 Main run2 java.lang.InterruptedException: sleep interrupted3  at java.lang.Thread.sleep(Native Method)4  at InterruptExample.lambda$main$0(InterruptExample.java:5)5  at InterruptExample$$Lambda$1/713338599.run(Unknown Source)6  at java.lang.Thread.run(Thread.java:745)

View Code

2、interrupted()

如果一个线程的 run() 方法执行一个无限循环,并且没有执行 sleep() 等会抛出 InterruptedException 的操作,那么调用线程的 interrupt() 方法就无法使线程提前结束。

但是调用 interrupt() 方法会设置线程的中断标记,此时调用 interrupted() 方法会返回 true。因此可以在循环体中使用 interrupted() 方法来判断线程是否处于中断状态,从而提前结束线程。

 1 public class InterruptExample { 2  3  private static class MyThread2 extends Thread { 4   @Override 5   public void run() { 6    while (!interrupted()) { 7     // .. 8    } 9    System.out.println("Thread end");10   }11  }12 }

View Code
1 public static void main(String[] args) throws InterruptedException {2  Thread thread2 = new MyThread2();3  thread2.start();4  thread2.interrupt();5  // thread end6 }

View Code

3、Executor 的中断操作

调用 Executor 的 shutdown() 方法会等待线程都执行完毕之后再关闭,但是如果调用的是 shutdownNow() 方法,则相当于调用每个线程的 interrupt() 方法。

以下使用 Lambda 创建线程,相当于创建了一个匿名内部线程。

 1 public static void main(String[] args) { 2  ExecutorService executorService = Executors.newCachedThreadPool(); 3  executorService.execute(() -> { 4   try { 5    Thread.sleep(2000); 6    System.out.println("Thread run"); 7   } catch (InterruptedException e) { 8    e.printStackTrace(); 9   }10  });11  executorService.shutdownNow();12  System.out.println("Main run");13 }

View Code
1 Main run2 java.lang.InterruptedException: sleep interrupted3  at java.lang.Thread.sleep(Native Method)4  at ExecutorInterruptExample.lambda$main$0(ExecutorInterruptExample.java:9)5  at ExecutorInterruptExample$$Lambda$1/1160460865.run(Unknown Source)6  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)7  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)8  at java.lang.Thread.run(Thread.java:745)

View Code

如果只想中断 Executor 中的一个线程,可以通过使用 submit() 方法来提交一个线程,它会返回一个 Future<?> 对象,通过调用该对象的 cancel(true) 方法就可以中断线程。

1 Future<?> future = executorService.submit(() -> {2  // do your own business3 });4 future.cancel(true);

四、互斥同步

Java 提供了两种锁机制来控制多个线程对共享资源的互斥访问,第一个是 JVM 实现的 synchronized,而另一个是 JDK 实现的 ReentrantLock。

1、synchronized

同步一个代码块

1 public void func() {2  synchronized (this) {3   // do your own business4  }5 }

它只作用于同一个对象,如果调用两个对象上的同步代码块,就不会进行同步。

对于以下代码,使用 ExecutorService 执行了两个线程,由于调用的是同一个对象的同步代码块,因此这两个线程会进行同步,当一个线程进入同步语句块时,另一个线程就必须等待。

 1 public class SynchronizedExample { 2  3  public void func1() { 4   synchronized (this) { 5    for (int i = 0; i < 10; i++) { 6     System.out.print(i + " "); 7    } 8   } 9  }10 }

View Code
1 public static void main(String[] args) {2  SynchronizedExample e1 = new SynchronizedExample();3  ExecutorService executorService = Executors.newCachedThreadPool();4  executorService.execute(() -> e1.func1());5  executorService.execute(() -> e1.func1());6 }

View Code
打印:0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9

对于以下代码,两个线程调用了不同对象的同步代码块,因此这两个线程就不需要同步。从输出结果可以看出,两个线程交叉执行。

1 // e1 和 e2 两个不同的对象2 public static void main(String[] args) {3  SynchronizedExample e1 = new SynchronizedExample();4  SynchronizedExample e2 = new SynchronizedExample();5  ExecutorService executorService = Executors.newCachedThreadPool();6  executorService.execute(() -> e1.func1());7  executorService.execute(() -> e2.func1());8 }

View Code
打印:0 0 1 1 2 2 3 3 4 4 5 5 6 6 7 7 8 8 9 9

同步一个方法

1 public synchronized void func () {2  // do what you want to do3 }

它和同步代码块一样,作用于同一个对象。

同步一个类

1 public void func() {2  synchronized (SynchronizedExample.class) {3   // do what you want to do4  }5 }

作用于整个类,也就是说两个线程调用同一个类的不同对象上的这种同步语句,也会进行同步。

 1 public class SynchronizedExample { 2  3  public void func2() { 4   synchronized (SynchronizedExample.class) { 5    for (int i = 0; i < 10; i++) { 6     System.out.print(i + " "); 7    } 8   } 9  }10 }

View Code
1 public static void main(String[] args) {2  SynchronizedExample e1 = new SynchronizedExample();3  SynchronizedExample e2 = new SynchronizedExample();4  ExecutorService executorService = Executors.newCachedThreadPool();5  executorService.execute(() -> e1.func2());6  executorService.execute(() -> e2.func2());7 }

View Code
输出:0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9

同步一个静态方法

1 public synchronized static void fun() {2  // synchronized 同步静态方法作用于整个类3 }

2、ReentrantLock

ReentrantLock 是 java.util.concurrent(J.U.C)包中的锁。

 1 public class LockExample { 2  3  private Lock lock = new ReentrantLock(); 4  5  public void func() { 6   lock.lock(); 7   try { 8    for (int i = 0; i < 10; i++) { 9     System.out.print(i + " ");10    }11   } finally {12    lock.unlock(); // 确保释放锁,从而避免发生死锁。13   }14  }15 }

View Code
1 public static void main(String[] args) {2  LockExample lockExample = new LockExample();3  ExecutorService executorService = Executors.newCachedThreadPool();4  executorService.execute(() -> lockExample.func());5  executorService.execute(() -> lockExample.func());6 }

View Code
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9

3、比较

锁的实现

synchronized 是 JVM 实现的,而 ReentrantLock 是 JDK 实现的。

性能

新版本 Java 对 synchronized 进行了很多优化,例如自旋锁等,synchronized 与 ReentrantLock 大致相同。

等待可中断

当持有锁的线程长期不释放锁的时候,正在等待的线程可以选择放弃等待,改为处理其他事情。

ReentrantLock 可中断,而 synchronized 不行。

公平锁

公平锁是指多个线程在等待同一个锁时,必须按照申请锁的时间顺序来依次获得锁。

synchronized 中的锁是非公平的,ReentrantLock 默认情况下也是非公平的,但是也可以是公平的。

锁绑定多个条件

一个 ReentrantLock 可以同时绑定多个 Condition 对象。

4、使用选择

除非需要使用 ReentrantLock 的高级功能,否则优先使用 synchronized。这是因为 synchronized 是 JVM 实现的一种锁机制,JVM 原生地支持它,而 ReentrantLock 不是所有的 JDK 版本都支持。并且使用 synchronized 不用担心没有释放锁而导致死锁问题,因为 JVM 会确保锁的释放。

五、线程之间的协作

当多个线程可以一起工作去解决某个问题时,如果某些部分必须在其它部分之前完成,那么就需要对线程进行协调。

1、join()

在线程中调用另一个线程的 join() 方法,会将当前线程挂起,而不是忙等待,直到目标线程结束。

对于以下代码,虽然 b 线程先启动,但是因为在 b 线程中调用了 a 线程的 join() 方法,b 线程会等待 a 线程结束才继续执行,因此最后能够保证 a 线程的输出先于 b 线程的输出。

 1 public class JoinExample { 2  3  private class A extends Thread { 4   @Override 5   public void run() { 6    System.out.println("A"); 7   } 8  } 9 10  private class B extends Thread {11 12   private A a;13 14   B(A a) {15    this.a = a;16   }17 18   @Override19   public void run() {20    try {21     a.join();22    } catch (InterruptedException e) {23     e.printStackTrace();24    }25    System.out.println("B");26   }27  }28 29  public void test() {30   A a = new A();31   B b = new B(a);32   b.start();33   a.start();34  }35 }

View Code
1 public static void main(String[] args) {2  JoinExample example = new JoinExample();3  example.test();4 }

输出:
A
B

2、wait()、 notify()、 notifyAll()

调用 wait() 使得线程等待某个条件满足,线程在等待时会被挂起,当其他线程的运行使得这个条件满足时,其它线程会调用 notify() 或者 notifyAll() 来唤醒挂起的线程。

它们都属于 Object 的一部分,而不属于 Thread。

只能用在同步方法或者同步控制块中使用,否则会在运行时抛出 IllegalMonitorStateException。

使用 wait() 挂起期间,线程会释放锁。这是因为,如果没有释放锁,那么其它线程就无法进入对象的同步方法或者同步控制块中,那么就无法执行 notify() 或者 notifyAll() 来唤醒挂起的线程,造成死锁。

 1 public class WaitNotifyExample { 2  3  public synchronized void before() { 4   System.out.println("before"); 5   notifyAll(); 6  } 7  8  public synchronized void after() { 9   try {10    wait();11   } catch (InterruptedException e) {12    e.printStackTrace();13   }14   System.out.println("after");15  }16 }

View Code
1 public static void main(String[] args) {2  ExecutorService executorService = Executors.newCachedThreadPool();3  WaitNotifyExample example = new WaitNotifyExample();4  executorService.execute(() -> example.after());5  executorService.execute(() -> example.before());6 }

输出:
before
after

wait() 和 sleep() 的区别

  • wait() 是 Object 的方法,而 sleep() 是 Thread 的静态方法;
  • wait() 会释放锁,sleep() 不会。

3、await() 、signal()、 signalAll()

java.util.concurrent 类库中提供了 Condition 类来实现线程之间的协调,可以在 Condition 上调用 await() 方法使线程等待,其它线程调用 signal() 或 signalAll() 方法唤醒等待的线程。

相比于 wait() 这种等待方式,await() 可以指定等待的条件,因此更加灵活。

使用 Lock 来获取一个 Condition 对象。

 1 public class AwaitSignalExample { 2  3  private Lock lock = new ReentrantLock(); 4  private Condition condition = lock.newCondition(); 5  6  public void before() { 7   lock.lock(); 8   try { 9    System.out.println("before");10    condition.signalAll();11   } finally {12    lock.unlock();13   }14  }15 16  public void after() {17   lock.lock();18   try {19    condition.await();20    System.out.println("after");21   } catch (InterruptedException e) {22    e.printStackTrace();23   } finally {24    lock.unlock();25   }26  }27 }

View Code
1 public static void main(String[] args) {2  ExecutorService executorService = Executors.newCachedThreadPool();3  AwaitSignalExample example = new AwaitSignalExample();4  executorService.execute(() -> example.after());5  executorService.execute(() -> example.before());6 }

1 输出:2 before3 after

六、线程状态

一个线程只能处于一种状态(六大状态:新建、可运行、阻塞、无限期等待、限期等待、死亡),并且这里的线程状态特指 Java 虚拟机的线程状态,不能反映线程在特定操作系统下的状态。

1、新建(NEW)

创建后尚未启动。

2、可运行(RUNABLE)

正在 Java 虚拟机中运行。但是在操作系统层面,它可能处于运行状态,也可能等待资源调度(例如处理器资源),资源调度完成就进入运行状态。所以该状态的可运行是指可以被运行,具体有没有运行要看底层操作系统的资源调度。

3、阻塞(BLOCKED)

请求获取 monitor lock 从而进入 synchronized 函数或者代码块,但是其它线程已经占用了该 monitor lock,所以出于阻塞状态。要结束该状态进入从而 RUNABLE 需要其他线程释放 monitor lock。

4、无限期等待(WAITING)

等待其它线程显式地唤醒。

阻塞和等待的区别在于,阻塞是被动的,它是在等待获取 monitor lock。而等待是主动的,通过调用 Object.wait() 等方法进入。

进入方法退出方法
没有设置 Timeout 参数的 Object.wait() 方法Object.notify() / Object.notifyAll()
没有设置 Timeout 参数的 Thread.join() 方法被调用的线程执行完毕
LockSupport.park() 方法LockSupport.unpark(Thread)

5、限期等待(TIMED_WAITING)

无需等待其它线程显式地唤醒,在一定时间之后会被系统自动唤醒。

进入方法退出方法
Thread.sleep() 方法时间结束
设置了 Timeout 参数的 Object.wait() 方法时间结束 / Object.notify() / Object.notifyAll()
设置了 Timeout 参数的 Thread.join() 方法时间结束 / 被调用的线程执行完毕
LockSupport.parkNanos() 方法LockSupport.unpark(Thread)
LockSupport.parkUntil() 方法LockSupport.unpark(Thread)

调用 Thread.sleep() 方法使线程进入限期等待状态时,常常用"使一个线程睡眠"进行描述。调用 Object.wait() 方法使线程进入限期等待或者无限期等待时,常常用"挂起一个线程"进行描述。睡眠和挂起是用来描述行为,而阻塞和等待用来描述状态。

6、死亡(TERMINATED)

可以是线程结束任务之后自己结束,或者产生了异常而结束。

七、J.U.C - AQS

java.util.concurrent(J.U.C)大大提高了并发性能,AQS 被认为是 J.U.C 的核心。

1、CountDownLatch

用来控制一个或者多个线程等待多个线程。

维护了一个计数器 cnt,每次调用 countDown() 方法会让计数器的值减 1,减到 0 的时候,那些因为调用 await() 方法而在等待的线程就会被唤醒。

 1 public class CountdownLatchExample { 2  3  public static void main(String[] args) throws InterruptedException { 4   final int totalThread = 10; 5   CountDownLatch countDownLatch = new CountDownLatch(totalThread); 6   ExecutorService executorService = Executors.newCachedThreadPool(); 7   for (int i = 0; i < totalThread; i++) { 8    executorService.execute(() -> { 9     System.out.print("run..");10     countDownLatch.countDown();11    });12   }13   countDownLatch.await();14   System.out.println("end");15   executorService.shutdown();16  }17 }

View Code
输出:run..run..run..run..run..run..run..run..run..run..end

2、CyclicBarrier

用来控制多个线程互相等待,只有当多个线程都到达时,这些线程才会继续执行。

和 CountdownLatch 相似,都是通过维护计数器来实现的。线程执行 await() 方法之后计数器会减 1,并进行等待,直到计数器为 0,所有调用 await() 方法而在等待的线程才能继续执行。

CyclicBarrier 和 CountdownLatch 的一个区别是,CyclicBarrier 的计数器通过调用 reset() 方法可以循环使用,所以它才叫做循环屏障。

CyclicBarrier 有两个构造函数,其中 parties 指示计数器的初始值,barrierAction 在所有线程都到达屏障的时候会执行一次。

 1 public CyclicBarrier(int parties, Runnable barrierAction) { 2  if (parties <= 0) throw new IllegalArgumentException(); 3  this.parties = parties; 4  this.count = parties; 5  this.barrierCommand = barrierAction; 6 } 7  8 public CyclicBarrier(int parties) { 9  this(parties, null);10 }

View Code

 1 public class CyclicBarrierExample { 2  3  public static void main(String[] args) { 4   final int totalThread = 10; 5   CyclicBarrier cyclicBarrier = new CyclicBarrier(totalThread); 6   ExecutorService executorService = Executors.newCachedThreadPool(); 7   for (int i = 0; i < totalThread; i++) { 8    executorService.execute(() -> { 9     System.out.print("before..");10     try {11      cyclicBarrier.await();12     } catch (InterruptedException | BrokenBarrierException e) {13      e.printStackTrace();14     }15     System.out.print("after..");16    });17   }18   executorService.shutdown();19  }20 }

View Code
输出:before..before..before..before..before..before..before..before..before..before..after..after..after..after..after..after..after..after..after..after..

3、Semaphore

Semaphore 类似于操作系统中的信号量,可以控制对互斥资源的访问线程数。

以下代码模拟了对某个服务的并发请求,每次只能有 3 个客户端同时访问,请求总数为 10。

 1 public class SemaphoreExample { 2  3  public static void main(String[] args) { 4   final int clientCount = 3; 5   final int totalRequestCount = 10; 6   Semaphore semaphore = new Semaphore(clientCount); 7   ExecutorService executorService = Executors.newCachedThreadPool(); 8   for (int i = 0; i < totalRequestCount; i++) { 9    executorService.execute(()->{10     try {11      semaphore.acquire();12      System.out.print(semaphore.availablePermits() + " ");13     } catch (InterruptedException e) {14      e.printStackTrace();15     } finally {16      semaphore.release();17     }18    });19   }20   executorService.shutdown();21  }22 }

View Code
输出:2 1 2 2 2 2 2 1 2 2

CountDownLatch、CyclicBarrier、Semaphore 简单问连接:https://www.cnblogs.com/taojietaoge/archive/2019/08/01/11188118.html

八、J.U.C - 其它组件

1、FutureTask

在介绍 Callable 时我们知道它可以有返回值,返回值通过 Future<V> 进行封装。FutureTask 实现了 RunnableFuture 接口,该接口继承自 Runnable 和 Future<V> 接口,这使得 FutureTask 既可以当做一个任务执行,也可以有返回值。

public class FutureTask<V> implements RunnableFuture<V>

public interface RunnableFuture<V> extends Runnable, Future<V>

FutureTask 可用于异步获取执行结果或取消执行任务的场景。当一个计算任务需要执行很长时间,那么就可以用 FutureTask 来封装这个任务,主线程在完成自己的任务之后再去获取结果。

 1 public class FutureTaskExample { 2  3  public static void main(String[] args) throws ExecutionException, InterruptedException { 4   FutureTask<Integer> futureTask = new FutureTask<Integer>(new Callable<Integer>() { 5    @Override 6    public Integer call() throws Exception { 7     int result = 0; 8     for (int i = 0; i < 100; i++) { 9      Thread.sleep(10);10      result += i;11     }12     return result;13    }14   });15 16   Thread computeThread = new Thread(futureTask);17   computeThread.start();18 19   Thread otherThread = new Thread(() -> {20    System.out.println("other task is running...");21    try {22     Thread.sleep(1000);23    } catch (InterruptedException e) {24     e.printStackTrace();25    }26   });27   otherThread.start();28   System.out.println(futureTask.get());29  }30 }

View Code
1 输出:2 other task is running...3 4950

2、BlockingQueue

java.util.concurrent.BlockingQueue 接口有以下阻塞队列的实现:

  • FIFO 队列 :LinkedBlockingQueue、ArrayBlockingQueue(固定长度)
  • 优先级队列 :PriorityBlockingQueue

提供了阻塞的 take() 和 put() 方法:如果队列为空 take() 将阻塞,直到队列中有内容;如果队列为满 put() 将阻塞,直到队列有空闲位置。

使用 BlockingQueue 实现生产者消费者问题

 1 public class ProducerConsumer { 2  3  private static BlockingQueue<String> queue = new ArrayBlockingQueue<>(5); 4  5  private static class Producer extends Thread { 6   @Override 7   public void run() { 8    try { 9     queue.put("product"); // 若队列为满 put() 将阻塞10    } catch (InterruptedException e) {11     e.printStackTrace();12    }13    System.out.print("produce..");14   }15  }16 17  private static class Consumer extends Thread {18 19   @Override20   public void run() {21    try {22     String product = queue.take(); // 若队列为空 take() 将阻塞23    } catch (InterruptedException e) {24     e.printStackTrace();25    }26    System.out.print("consume..");27   }28  }29 }

View Code
 1 public static void main(String[] args) { 2  for (int i = 0; i < 2; i++) { 3   Producer producer = new Producer(); 4   producer.start(); 5  } 6  for (int i = 0; i < 5; i++) { 7   Consumer consumer = new Consumer(); 8   consumer.start(); 9  }10  for (int i = 0; i < 3; i++) {11   Producer producer = new Producer();12   producer.start();13  }14 }

View Code
输出:produce..produce..consume..consume..produce..consume..produce..consume..produce..consume..

3、ForkJoin

主要用于并行计算中,和 MapReduce 原理类似,都是把大的计算任务拆分成多个小任务并行计算。

 1 public class ForkJoinExample extends RecursiveTask<Integer> { 2  3  private final int threshold = 5; 4  private int first; 5  private int last; 6  7  public ForkJoinExample(int first, int last) { 8   this.first = first; 9   this.last = last;10  }11 12  @Override13  protected Integer compute() {14   int result = 0;15   if (last - first <= threshold) {16    // 任务足够小则直接计算17    for (int i = first; i <= last; i++) {18     result += i;19    }20   } else {21    // 拆分成小任务22    int middle = first + (last - first) / 2;23    ForkJoinExample leftTask = new ForkJoinExample(first, middle);24    ForkJoinExample rightTask = new ForkJoinExample(middle + 1, last);25    leftTask.fork();26    rightTask.fork();27    result = leftTask.join() + rightTask.join();28   }29   return result;30  }31 }

View Code
1 public static void main(String[] args) throws ExecutionException, InterruptedException {2  ForkJoinExample example = new ForkJoinExample(1, 10000);3  ForkJoinPool forkJoinPool = new ForkJoinPool();4  Future result = forkJoinPool.submit(example);5  System.out.println(result.get());6 }

View Code

ForkJoin 使用 ForkJoinPool 来启动,它是一个特殊的线程池,线程数量取决于 CPU 核数。

public class ForkJoinPool extends AbstractExecutorService

ForkJoinPool 实现了工作窃取算法来提高 CPU 的利用率。每个线程都维护了一个双端队列,用来存储需要执行的任务。工作窃取算法允许空闲的线程从其它线程的双端队列中窃取一个任务来执行。窃取的任务必须是最晚的任务,避免和队列所属线程发生竞争。例如下图中,Thread2 从 Thread1 的队列中拿出最晚的 Task1 任务,Thread1 会拿出 Task2 来执行,这样就避免发生竞争。但是如果队列中只有一个任务时还是会发生竞争。

九、线程不安全示例

如果多个线程对同一个共享数据进行访问而不采取同步操作的话,那么操作的结果是不一致的。

以下代码演示了 1000 个线程同时对 cnt 执行自增操作,操作结束之后它的值有可能小于 1000。

 1 public class ThreadUnsafeExample { 2  3  private int cnt = 0; 4  5  public void add() { 6   cnt++; 7  } 8  9  public int get() {10   return cnt;11  }12 }

View Code
 1 public static void main(String[] args) throws InterruptedException { 2  final int threadSize = 1000; 3  ThreadUnsafeExample example = new ThreadUnsafeExample(); 4  final CountDownLatch countDownLatch = new CountDownLatch(threadSize); 5  ExecutorService executorService = Executors.newCachedThreadPool(); 6  for (int i = 0; i < threadSize; i++) { 7   executorService.execute(() -> { 8    example.add(); 9    countDownLatch.countDown();10   });11  }12  countDownLatch.await();13  executorService.shutdown();14  System.out.println(example.get()); // 99715 }

View Code

十、Java 内存模型

Java 内存模型试图屏蔽各种硬件和操作系统的内存访问差异,以实现让 Java 程序在各种平台下都能达到一致的内存访问效果。

Java 运行时内存参考链接:https://www.cnblogs.com/taojietaoge/p/10264416.html


1、主内存与工作内存

处理器上的寄存器的读写的速度比内存快几个数量级,为了解决这种速度矛盾,在它们之间加入了高速缓存。

加入高速缓存带来了一个新的问题:缓存一致性。如果多个缓存共享同一块主内存区域,那么多个缓存的数据可能会不一致,需要一些协议来解决这个问题。


所有的变量都存储在主内存中,每个线程还有自己的工作内存,工作内存存储在高速缓存或者寄存器中,保存了该线程使用的变量的主内存副本拷贝。

线程只能直接操作工作内存中的变量,不同线程之间的变量值传递需要通过主内存来完成。

2、内存间交互操作

Java 内存模型定义了 8 个操作来完成主内存和工作内存的交互操作。

  • read:把一个变量的值从主内存传输到工作内存中
  • load:在 read 之后执行,把 read 得到的值放入工作内存的变量副本中
  • use:把工作内存中一个变量的值传递给执行引擎
  • assign:把一个从执行引擎接收到的值赋给工作内存的变量
  • store:把工作内存的一个变量的值传送到主内存中
  • write:在 store 之后执行,把 store 得到的值放入主内存的变量中
  • lock:作用于主内存的变量
  • unlock

3、内存模型三大特性

原子性

Java 内存模型保证了 read、load、use、assign、store、write、lock 和 unlock 操作具有原子性,例如对一个 int 类型的变量执行 assign 赋值操作,这个操作就是原子性的。但是 Java 内存模型允许虚拟机将没有被 volatile 修饰的 64 位数据(long,double)的读写操作划分为两次 32 位的操作来进行,即 load、store、read 和 write 操作可以不具备原子性。

有一个错误认识就是,int 等原子性的类型在多线程环境中不会出现线程安全问题。前面的线程不安全示例代码中,cnt 属于 int 类型变量,1000 个线程对它进行自增操作之后,得到的值为 997 而不是 1000。

为了方便讨论,将内存间的交互操作简化为 3 个:load、assign、store。

下图演示了两个线程同时对 cnt 进行操作,load、assign、store 这一系列操作整体上看不具备原子性,那么在 T1 修改 cnt 并且还没有将修改后的值写入主内存,T2 依然可以读入旧值。可以看出,这两个线程虽然执行了两次自增运算,但是主内存中 cnt 的值最后为 1 而不是 2。因此对 int 类型读写操作满足原子性只是说明 load、assign、store 这些单个操作具备原子性。

AtomicInteger 能保证多个线程修改的原子性。

使用 AtomicInteger 重写之前线程不安全的代码之后得到以下线程安全实现:

 1 public class AtomicExample { 2  private AtomicInteger cnt = new AtomicInteger(); 3  4  public void add() { 5   cnt.incrementAndGet(); 6  } 7  8  public int get() { 9   return cnt.get();10  }11 }

View Code
 1 public static void main(String[] args) throws InterruptedException { 2  final int threadSize = 1000; 3  AtomicExample example = new AtomicExample(); // 只修改这条语句 4  final CountDownLatch countDownLatch = new CountDownLatch(threadSize); 5  ExecutorService executorService = Executors.newCachedThreadPool(); 6  for (int i = 0; i < threadSize; i++) { 7   executorService.execute(() -> { 8    example.add(); 9    countDownLatch.countDown();10   });11  }12  countDownLatch.await();13  executorService.shutdown();14  System.out.println(example.get()); // 100015 }

View Code

除了使用原子类之外,也可以使用 synchronized 互斥锁来保证操作的原子性。它对应的内存间交互操作为:lock 和 unlock,在虚拟机实现上对应的字节码指令为 monitorenter 和 monitorexit。

 1 public class AtomicSynchronizedExample { 2  private int cnt = 0; 3  4  public synchronized void add() { 5   cnt++; 6  } 7  8  public synchronized int get() { 9   return cnt;10  }11 }

View Code
 1 public static void main(String[] args) throws InterruptedException { 2  final int threadSize = 1000; 3  AtomicSynchronizedExample example = new AtomicSynchronizedExample(); 4  final CountDownLatch countDownLatch = new CountDownLatch(threadSize); 5  ExecutorService executorService = Executors.newCachedThreadPool(); 6  for (int i = 0; i < threadSize; i++) { 7   executorService.execute(() -> { 8    example.add(); 9    countDownLatch.countDown();10   });11  }12  countDownLatch.await();13  executorService.shutdown();14  System.out.println(example.get()); // 100015 }

View Code

可见性

可见性指当一个线程修改了共享变量的值,其它线程能够立即得知这个修改。Java 内存模型是通过在变量修改后将新值同步回主内存,在变量读取前从主内存刷新变量值来实现可见性的。

主要有三种实现可见性的方式:

  • volatile(Java高阶语法volatile 链接:https://www.cnblogs.com/taojietaoge/p/10260888.html)
  • synchronized,对一个变量执行 unlock 操作之前,必须把变量值同步回主内存。
  • final,被 final 关键字修饰的字段在构造器中一旦初始化完成,并且没有发生 this 逃逸(其它线程通过 this 引用访问到初始化了一半的对象),那么其它线程就能看见 final 字段的值。

对前面的线程不安全示例中的 cnt 变量使用 volatile 修饰,不能解决线程不安全问题,因为 volatile 并不能保证操作的原子性。

有序性

有序性是指:在本线程内观察,所有操作都是有序的。在一个线程观察另一个线程,所有操作都是无序的,无序是因为发生了指令重排序。在 Java 内存模型中,允许编译器和处理器对指令进行重排序,重排序过程不会影响到单线程程序的执行,却会影响到多线程并发执行的正确性。

volatile 关键字通过添加内存屏障的方式来禁止指令重排,即重排序时不能把后面的指令放到内存屏障之前。

也可以通过 synchronized 来保证有序性,它保证每个时刻只有一个线程执行同步代码,相当于是让线程顺序执行同步代码。

4、先行发生原则

上面提到了可以用 volatile 和 synchronized 来保证有序性。除此之外,JVM 还规定了先行发生原则,让一个操作无需控制就能先于另一个操作完成。

单一线程原则

 

Single Thread rule,在一个线程内,在程序前面的操作先行发生于后面的操作。

管程锁定规则

 

Monitor Lock Rule,一个 unlock 操作先行发生于后面对同一个锁的 lock 操作。

volatile 变量规则

 

Volatile Variable Rule,对一个 volatile 变量的写操作先行发生于后面对这个变量的读操作。

原文转载:http://www.shaoqun.com/a/889723.html

跨境电商:https://www.ikjzd.com/

声网:https://www.ikjzd.com/w/2176

夸克:https://www.ikjzd.com/w/1237

黄劲:https://www.ikjzd.com/w/2426


备战-Java并发      谁念西风独自凉,萧萧黄叶闭疏窗简介:备战-Java并发。一、线程的使用有三种使用线程的方法:实现Runnable接口;实现Callable接口;继承Thread类。实现Runnable和Callable接口的类只能当做一个可以在线程中运行的任务,不是真正意义上的线程,因此最后还需要通过Thread来调用。可以理解为任务是通过线程驱动从而执行的。1、实现Runnable
墩煌网:https://www.ikjzd.com/w/189
vava:https://www.ikjzd.com/w/2780
ensogo:https://www.ikjzd.com/w/1485
四姑娘论剑_当美人儿遭遇富二代 :http://www.30bags.com/a/410266.html
四姑娘山,稻城亚丁,丹巴环线七天自助游 - :http://www.30bags.com/a/405870.html
四姑娘山_四姑娘山景点介绍:http://www.30bags.com/a/430472.html
四姑娘山-宝石矿产 - :http://www.30bags.com/a/408162.html
头埋在双腿间吸食花蜜 深一点,我下面好爽:http://lady.shaoqun.com/m/a/247055.html
中学生手淫的危害有多大?掌握了这个就能克服!:http://lady.shaoqun.com/a/426305.html
为什么你每天忙忙碌碌,却还是经常0单?:https://www.ikjzd.com/articles/146733
夫妻过着冰冷的生活?使用这些"小技巧"可以刺激欲望:http://lady.shaoqun.com/a/426306.html
男生很喜欢女生的表现,是立马走火入魔还是舍不得碰:http://lady.shaoqun.com/a/426307.html

No comments:

Post a Comment