Java 并发编程核心知识体系

一、JUC 包(java.util.concurrent)

JUC 包是 Java 5 引入的并发工具包,提供了比synchronized更高级的并发功能:

1. 原子类(Atomic)

基于 CAS(Compare-and-Swap)实现无锁原子操作:

java

import java.util.concurrent.atomic.AtomicInteger;

public class AtomicDemo {
    private static AtomicInteger counter = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                counter.incrementAndGet(); // 原子自增
            }
        });

        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                counter.decrementAndGet(); // 原子自减
            }
        });

        t1.start();
        t2.start();
        t1.join();
        t2.join();

        System.out.println("Counter: " + counter.get()); // 输出0
    }
}
2. 锁机制(Lock)

java

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class LockDemo {
    private final Lock lock = new ReentrantLock();
    private int count = 0;

    public void increment() {
        lock.lock(); // 加锁
        try {
            count++;
        } finally {
            lock.unlock(); // 释放锁
        }
    }
}
3. 并发容器
  • ConcurrentHashMap:线程安全的哈希表
  • CopyOnWriteArrayList:写时复制的列表
  • BlockingQueue:阻塞队列(如 LinkedBlockingQueue、ArrayBlockingQueue)

java

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingQueueDemo {
    private static BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);

    public static void main(String[] args) {
        // 生产者
        new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    queue.put(i);
                    System.out.println("Produced: " + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();

        // 消费者
        new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    Integer item = queue.take();
                    System.out.println("Consumed: " + item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();
    }
}

二、锁机制详解

1. 锁的分类
  • 悲观锁:假设会发生冲突,如synchronizedReentrantLock
  • 乐观锁:假设不会发生冲突,如Atomic
  • 可重入锁:同一线程可多次获取,如ReentrantLock
  • 公平锁:按请求顺序获取锁,ReentrantLock(true)
  • 读写锁ReentrantReadWriteLock,读锁共享,写锁互斥
2. synchronized vs ReentrantLock
特性synchronizedReentrantLock
锁获取方式隐式(JVM 层面)显式调用 lock/unlock
可中断性不可中断可中断(lockInterruptibly)
公平性非公平可设置公平 / 非公平
条件变量单一 wait/notify 机制多个 Condition 对象
3. 读写锁示例

java

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class Cache {
    private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    private final ReentrantReadWriteLock.ReadLock readLock = rwl.readLock();
    private final ReentrantReadWriteLock.WriteLock writeLock = rwl.writeLock();
    private Object data;

    public Object read() {
        readLock.lock();
        try {
            return data;
        } finally {
            readLock.unlock();
        }
    }

    public void write(Object newData) {
        writeLock.lock();
        try {
            data = newData;
        } finally {
            writeLock.unlock();
        }
    }
}

三、IO 流与 NIO

1. 传统 IO(BIO)
  • 阻塞式 IO,一个连接对应一个线程
  • 适用于连接数少且稳定的场景

java

import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;

public class BioServer {
    public static void main(String[] args) throws IOException {
        ServerSocket server = new ServerSocket(8080);
        System.out.println("Server started on port 8080");

        while (true) {
            Socket socket = server.accept(); // 阻塞等待连接
            System.out.println("New client connected");

            // 为每个连接创建新线程
            new Thread(() -> {
                try (BufferedReader in = new BufferedReader(
                        new InputStreamReader(socket.getInputStream()));
                     PrintWriter out = new PrintWriter(
                        socket.getOutputStream(), true)) {

                    String inputLine;
                    while ((inputLine = in.readLine()) != null) {
                        out.println("Server received: " + inputLine);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                } finally {
                    try {
                        socket.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }
}
2. NIO(New IO)
  • 非阻塞 IO,基于 Selector、Channel 和 Buffer
  • 单线程可处理多个连接,适用于高并发场景

java

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class NioServer {
    private static final int PORT = 8080;

    public static void main(String[] args) throws IOException {
        // 创建Selector和ServerSocketChannel
        Selector selector = Selector.open();
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.socket().bind(new InetSocketAddress(PORT));
        serverChannel.configureBlocking(false);
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);

        System.out.println("Server started on port " + PORT);

        while (true) {
            // 阻塞等待就绪的通道
            selector.select();
            Set<SelectionKey> selectedKeys = selector.selectedKeys();
            Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

            while (keyIterator.hasNext()) {
                SelectionKey key = keyIterator.next();

                if (key.isAcceptable()) {
                    // 处理新连接
                    SocketChannel clientChannel = serverChannel.accept();
                    clientChannel.configureBlocking(false);
                    clientChannel.register(selector, SelectionKey.OP_READ);
                    System.out.println("New client connected: " + clientChannel);
                } else if (key.isReadable()) {
                    // 处理读事件
                    SocketChannel clientChannel = (SocketChannel) key.channel();
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    int bytesRead = clientChannel.read(buffer);

                    if (bytesRead > 0) {
                        buffer.flip();
                        byte[] data = new byte[buffer.remaining()];
                        buffer.get(data);
                        String message = new String(data);
                        System.out.println("Received from client: " + message);

                        // 回写数据
                        ByteBuffer response = ByteBuffer.wrap(("Server response: " + message).getBytes());
                        clientChannel.write(response);
                    } else if (bytesRead == -1) {
                        // 客户端关闭连接
                        clientChannel.close();
                        System.out.println("Client disconnected");
                    }
                }

                keyIterator.remove();
            }
        }
    }
}

四、线程池技术

1. 线程池的优势
  • 降低线程创建和销毁的开销
  • 控制线程数量,避免资源耗尽
  • 提供任务管理机制
2. 线程池的创建方式

java

import java.util.concurrent.*;

public class ThreadPoolDemo {
    public static void main(String[] args) {
        // 1. 使用Executors工厂方法(不推荐)
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();

        // 2. 手动创建(推荐)
        ThreadPoolExecutor customThreadPool = new ThreadPoolExecutor(
                2,                  // 核心线程数
                5,                  // 最大线程数
                60,                 // 空闲线程存活时间
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(10),  // 任务队列
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()  // 拒绝策略
        );

        // 提交任务
        for (int i = 0; i < 10; i++) {
            final int taskId = i;
            customThreadPool.submit(() -> {
                System.out.println("Task " + taskId + " executed by " + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        // 关闭线程池
        customThreadPool.shutdown();
    }
}
3. 线程池参数说明
  • corePoolSize:核心线程数,线程池保持的最小线程数
  • maximumPoolSize:最大线程数,线程池允许的最大线程数
  • keepAliveTime:空闲线程存活时间
  • workQueue:任务队列,存储待执行的任务
  • threadFactory:线程工厂,用于创建线程
  • rejectedExecutionHandler:拒绝策略,任务队列满时的处理方式
4. 拒绝策略
  • AbortPolicy:默认策略,抛出 RejectedExecutionException
  • CallerRunsPolicy:由调用线程执行任务
  • DiscardPolicy:直接丢弃任务
  • DiscardOldestPolicy:丢弃队列中最老的任务

五、并发工具类

1. CountDownLatch

java

import java.util.concurrent.CountDownLatch;

public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        int workerCount = 3;
        CountDownLatch latch = new CountDownLatch(workerCount);

        for (int i = 0; i < workerCount; i++) {
            final int workerId = i;
            new Thread(() -> {
                System.out.println("Worker " + workerId + " started");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("Worker " + workerId + " finished");
                latch.countDown(); // 计数减1
            }).start();
        }

        latch.await(); // 等待所有工作线程完成
        System.out.println("All workers have finished");
    }
}
2. CyclicBarrier

java

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {
    public static void main(String[] args) {
        int parties = 3;
        CyclicBarrier barrier = new CyclicBarrier(parties, () -> {
            System.out.println("All threads have reached the barrier");
        });

        for (int i = 0; i < parties; i++) {
            final int threadId = i;
            new Thread(() -> {
                try {
                    System.out.println("Thread " + threadId + " is working");
                    Thread.sleep(1000);
                    System.out.println("Thread " + threadId + " reached the barrier");
                    barrier.await(); // 等待其他线程
                    System.out.println("Thread " + threadId + " continues execution");
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}
3. Semaphore

java

import java.util.concurrent.Semaphore;

public class SemaphoreDemo {
    private static final int MAX_CONNECTIONS = 3;
    private static final Semaphore semaphore = new Semaphore(MAX_CONNECTIONS);

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            final int clientId = i;
            new Thread(() -> {
                try {
                    semaphore.acquire(); // 获取许可
                    System.out.println("Client " + clientId + " acquired connection");
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    semaphore.release(); // 释放许可
                    System.out.println("Client " + clientId + " released connection");
                }
            }).start();
        }
    }
}

通过掌握以上内容,你可以熟练使用 Java 并发编程的核心技术,包括 JUC 包的各种工具、锁机制、IO 流和线程池,从而编写出高效、安全的并发程序。

博客内容均系原创,未经允许严禁转载!
暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇