Java线程通信
第一部分:为什么要进行线程通信?
线程通信的根本原因是:为了让多个线程能够有效、安全地协同工作,以完成一个共同的任务。
当程序从单线程变成多线程时,会引入两个核心问题:
竞争条件: 多个线程同时读写共享资源(如一个变量、一个数据结构、一个文件等),导致最终结果依赖于线程执行的精确时序,从而产生不可预测、错误的结果。
经典例子: 两个线程同时对一个银行账户余额进行“+1”操作。理想结果是余额+2,但由于读写交叉执行,最终可能只+1。
内存可见性: 由于现代CPU的多级缓存架构和编译器的优化(如指令重排序),一个线程对共享资源的修改可能不会立即被其他线程看到,导致其他线程使用过期的数据。
因此,线程通信的目的就是为了解决这两个问题,具体来说:
同步(Synchronization): 控制多个线程对共享资源的访问顺序,确保在任一时刻只有一个(或特定数量的)线程可以访问临界资源,从而避免竞争条件。这是线程通信的“安全”目的。
协作(Cooperation): 让线程之间能够彼此通知和等待,以便协调他们的工作步调。例如,生产者线程生产了数据后通知消费者线程来消费;一个线程需要等待其他多个线程都完成任务后才能继续。这是线程通信的“协同”目的。
没有有效的线程通信机制,多线程程序就会陷入混乱,结果不可预测,也就是我们常说的“线程不安全”。
第二部分:多线程通信的方式
线程通信的方式多种多样,可以根据不同的抽象层级和场景来划分。以下是主流的几种方式:
1. 同步/锁机制
使用同步、保护多个线程共享访问的核心工具
// 共享计数器,使用synchronized保证线程安全
class SharedCounter {private int count = 0;public synchronized void increment() {count++;System.out.println(Thread.currentThread().getName() + ": " + count);}
}public class SharedMemoryExample {public static void main(String[] args) {SharedCounter counter = new SharedCounter();// 两个线程共享同一个计数器new Thread(() -> {for(int i = 0; i < 5; i++) counter.increment();}).start();new Thread(() -> {for(int i = 0; i < 5; i++) counter.increment();}).start();}
}
2. 等待/通知机制
这是线程间协作的核心机制,Java 中每个对象都内置了 wait()
, notify()
, notifyAll()
方法,但它们必须在 synchronized
同步块内使用。
// 简单的生产者-消费者模型
class Message {private String content;private boolean empty = true;public synchronized String read() {while(empty) {try {wait(); // 等待消息} catch (InterruptedException e) {}}empty = true;notifyAll(); // 通知生产者return content;}public synchronized void write(String message) {while(!empty) {try {wait(); // 等待消费} catch (InterruptedException e) {}}empty = false;this.content = message;notifyAll(); // 通知消费者}
}public class WaitNotifyExample {public static void main(String[] args) {Message message = new Message();// 消费者线程new Thread(() -> {String result;while((result = message.read()) != null) {System.out.println("收到: " + result);}}).start();// 生产者线程new Thread(() -> {String[] messages = {"你好", "世界", "结束"};for(String msg : messages) {message.write(msg);System.out.println("发送: " + msg);}}).start();}
}
3. volatile 关键字
保证了变量的可见性(一个线程修改后,新值立即对其他线程可见)和禁止指令重排序。它不能保证操作的原子性(例如 volatile int i; i++
这个操作不是原子的)。它通常用作一个简单的状态标志位。
// 使用volatile保证可见性
class VolatileFlag {private volatile boolean flag = false;public void setFlag() {flag = true;System.out.println("标志已设置");}public void checkFlag() {while(!flag) {// 空循环,等待flag变为true}System.out.println("检测到标志变化");}
}public class VolatileExample {public static void main(String[] args) throws InterruptedException {VolatileFlag example = new VolatileFlag();// 检测线程new Thread(() -> example.checkFlag()).start();Thread.sleep(1000); // 等待1秒// 设置线程new Thread(() -> example.setFlag()).start();}
}
4. 信号量
它维护一个许可证数量。线程通过 acquire()
获取许可证,如果数量为0则阻塞;通过 release()
释放许可证。它可以控制同时访问某个资源的线程数量。
// 使用信号量控制资源访问
class ResourcePool {private Semaphore semaphore;public ResourcePool(int limit) {semaphore = new Semaphore(limit);}public void useResource() {try {semaphore.acquire(); // 获取许可System.out.println(Thread.currentThread().getName() + " 使用资源");Thread.sleep(1000); // 模拟资源使用System.out.println(Thread.currentThread().getName() + " 释放资源");semaphore.release(); // 释放许可} catch (InterruptedException e) {e.printStackTrace();}}
}public class SemaphoreExample {public static void main(String[] args) {ResourcePool pool = new ResourcePool(2); // 最多2个线程同时访问// 创建5个线程尝试访问资源for(int i = 0; i < 5; i++) {new Thread(() -> pool.useResource(), "线程-" + i).start();}}
}
5. 管道流
用创建一个管道,一端是写入线程,另一端是读取线程。写入线程向管道输出流写数据,读取线程从管道输入流读数据。Java 提供了 PipedInputStream
和 PipedOutputStream
。
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;// 使用管道在线程间传递数据
public class PipeExample {public static void main(String[] args) throws IOException {final PipedOutputStream output = new PipedOutputStream();final PipedInputStream input = new PipedInputStream(output);// 写入线程Thread writer = new Thread(() -> {try {output.write("Hello from pipe!".getBytes());output.close();} catch (IOException e) {e.printStackTrace();}});// 读取线程Thread reader = new Thread(() -> {try {int data;while((data = input.read()) != -1) {System.out.print((char) data);}input.close();} catch (IOException e) {e.printStackTrace();}});writer.start();reader.start();}
}
6. 高级并发工具类
现代编程语言(尤其是Java)提供了大量高级并发工具,简化了线程通信。
CountDownLatch: 一个或多个线程等待其他一组线程完成操作。例如,主线程等待所有工作线程初始化完毕。
CyclicBarrier: 让一组线程互相等待,直到所有线程都到达一个屏障点再继续执行。类似于“等人齐了再开会”。
Exchanger: 用于两个线程之间交换数据。在一个交换点上,两个线程会彼此等待,然后交换各自的数据。
生产者-消费者:用 BlockingQueue 通信,自动“有数据就通知消费,没数据就等待”,多用于替代“管道流”。