一、JUC 包(java.util.concurrent)
JUC 包是 Java 5 引入的并发工具包,提供了比synchronized更高级的并发功能:
1. 原子类(Atomic)
基于 CAS(Compare-and-Swap)实现无锁原子操作:
java
import java.util.concurrent.atomic.AtomicInteger;
public class AtomicDemo {
private static AtomicInteger counter = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
counter.incrementAndGet(); // 原子自增
}
});
Thread t2 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
counter.decrementAndGet(); // 原子自减
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("Counter: " + counter.get()); // 输出0
}
}
2. 锁机制(Lock)
java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class LockDemo {
private final Lock lock = new ReentrantLock();
private int count = 0;
public void increment() {
lock.lock(); // 加锁
try {
count++;
} finally {
lock.unlock(); // 释放锁
}
}
}
3. 并发容器
- ConcurrentHashMap:线程安全的哈希表
- CopyOnWriteArrayList:写时复制的列表
- BlockingQueue:阻塞队列(如 LinkedBlockingQueue、ArrayBlockingQueue)
java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class BlockingQueueDemo {
private static BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
public static void main(String[] args) {
// 生产者
new Thread(() -> {
try {
for (int i = 0; i < 5; i++) {
queue.put(i);
System.out.println("Produced: " + i);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
// 消费者
new Thread(() -> {
try {
for (int i = 0; i < 5; i++) {
Integer item = queue.take();
System.out.println("Consumed: " + item);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
}
二、锁机制详解
1. 锁的分类
- 悲观锁:假设会发生冲突,如
synchronized、ReentrantLock - 乐观锁:假设不会发生冲突,如
Atomic类 - 可重入锁:同一线程可多次获取,如
ReentrantLock - 公平锁:按请求顺序获取锁,
ReentrantLock(true) - 读写锁:
ReentrantReadWriteLock,读锁共享,写锁互斥
2. synchronized vs ReentrantLock
| 特性 | synchronized | ReentrantLock |
|---|---|---|
| 锁获取方式 | 隐式(JVM 层面) | 显式调用 lock/unlock |
| 可中断性 | 不可中断 | 可中断(lockInterruptibly) |
| 公平性 | 非公平 | 可设置公平 / 非公平 |
| 条件变量 | 单一 wait/notify 机制 | 多个 Condition 对象 |
3. 读写锁示例
java
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class Cache {
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private final ReentrantReadWriteLock.ReadLock readLock = rwl.readLock();
private final ReentrantReadWriteLock.WriteLock writeLock = rwl.writeLock();
private Object data;
public Object read() {
readLock.lock();
try {
return data;
} finally {
readLock.unlock();
}
}
public void write(Object newData) {
writeLock.lock();
try {
data = newData;
} finally {
writeLock.unlock();
}
}
}
三、IO 流与 NIO
1. 传统 IO(BIO)
- 阻塞式 IO,一个连接对应一个线程
- 适用于连接数少且稳定的场景
java
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
public class BioServer {
public static void main(String[] args) throws IOException {
ServerSocket server = new ServerSocket(8080);
System.out.println("Server started on port 8080");
while (true) {
Socket socket = server.accept(); // 阻塞等待连接
System.out.println("New client connected");
// 为每个连接创建新线程
new Thread(() -> {
try (BufferedReader in = new BufferedReader(
new InputStreamReader(socket.getInputStream()));
PrintWriter out = new PrintWriter(
socket.getOutputStream(), true)) {
String inputLine;
while ((inputLine = in.readLine()) != null) {
out.println("Server received: " + inputLine);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
}
}
}
2. NIO(New IO)
- 非阻塞 IO,基于 Selector、Channel 和 Buffer
- 单线程可处理多个连接,适用于高并发场景
java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class NioServer {
private static final int PORT = 8080;
public static void main(String[] args) throws IOException {
// 创建Selector和ServerSocketChannel
Selector selector = Selector.open();
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.socket().bind(new InetSocketAddress(PORT));
serverChannel.configureBlocking(false);
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("Server started on port " + PORT);
while (true) {
// 阻塞等待就绪的通道
selector.select();
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if (key.isAcceptable()) {
// 处理新连接
SocketChannel clientChannel = serverChannel.accept();
clientChannel.configureBlocking(false);
clientChannel.register(selector, SelectionKey.OP_READ);
System.out.println("New client connected: " + clientChannel);
} else if (key.isReadable()) {
// 处理读事件
SocketChannel clientChannel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int bytesRead = clientChannel.read(buffer);
if (bytesRead > 0) {
buffer.flip();
byte[] data = new byte[buffer.remaining()];
buffer.get(data);
String message = new String(data);
System.out.println("Received from client: " + message);
// 回写数据
ByteBuffer response = ByteBuffer.wrap(("Server response: " + message).getBytes());
clientChannel.write(response);
} else if (bytesRead == -1) {
// 客户端关闭连接
clientChannel.close();
System.out.println("Client disconnected");
}
}
keyIterator.remove();
}
}
}
}
四、线程池技术
1. 线程池的优势
- 降低线程创建和销毁的开销
- 控制线程数量,避免资源耗尽
- 提供任务管理机制
2. 线程池的创建方式
java
import java.util.concurrent.*;
public class ThreadPoolDemo {
public static void main(String[] args) {
// 1. 使用Executors工厂方法(不推荐)
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
// 2. 手动创建(推荐)
ThreadPoolExecutor customThreadPool = new ThreadPoolExecutor(
2, // 核心线程数
5, // 最大线程数
60, // 空闲线程存活时间
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10), // 任务队列
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy() // 拒绝策略
);
// 提交任务
for (int i = 0; i < 10; i++) {
final int taskId = i;
customThreadPool.submit(() -> {
System.out.println("Task " + taskId + " executed by " + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// 关闭线程池
customThreadPool.shutdown();
}
}
3. 线程池参数说明
- corePoolSize:核心线程数,线程池保持的最小线程数
- maximumPoolSize:最大线程数,线程池允许的最大线程数
- keepAliveTime:空闲线程存活时间
- workQueue:任务队列,存储待执行的任务
- threadFactory:线程工厂,用于创建线程
- rejectedExecutionHandler:拒绝策略,任务队列满时的处理方式
4. 拒绝策略
- AbortPolicy:默认策略,抛出 RejectedExecutionException
- CallerRunsPolicy:由调用线程执行任务
- DiscardPolicy:直接丢弃任务
- DiscardOldestPolicy:丢弃队列中最老的任务
五、并发工具类
1. CountDownLatch
java
import java.util.concurrent.CountDownLatch;
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
int workerCount = 3;
CountDownLatch latch = new CountDownLatch(workerCount);
for (int i = 0; i < workerCount; i++) {
final int workerId = i;
new Thread(() -> {
System.out.println("Worker " + workerId + " started");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("Worker " + workerId + " finished");
latch.countDown(); // 计数减1
}).start();
}
latch.await(); // 等待所有工作线程完成
System.out.println("All workers have finished");
}
}
2. CyclicBarrier
java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
public static void main(String[] args) {
int parties = 3;
CyclicBarrier barrier = new CyclicBarrier(parties, () -> {
System.out.println("All threads have reached the barrier");
});
for (int i = 0; i < parties; i++) {
final int threadId = i;
new Thread(() -> {
try {
System.out.println("Thread " + threadId + " is working");
Thread.sleep(1000);
System.out.println("Thread " + threadId + " reached the barrier");
barrier.await(); // 等待其他线程
System.out.println("Thread " + threadId + " continues execution");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
}
3. Semaphore
java
import java.util.concurrent.Semaphore;
public class SemaphoreDemo {
private static final int MAX_CONNECTIONS = 3;
private static final Semaphore semaphore = new Semaphore(MAX_CONNECTIONS);
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
final int clientId = i;
new Thread(() -> {
try {
semaphore.acquire(); // 获取许可
System.out.println("Client " + clientId + " acquired connection");
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
semaphore.release(); // 释放许可
System.out.println("Client " + clientId + " released connection");
}
}).start();
}
}
}
通过掌握以上内容,你可以熟练使用 Java 并发编程的核心技术,包括 JUC 包的各种工具、锁机制、IO 流和线程池,从而编写出高效、安全的并发程序。