多线程(1)

多线程(1)

一、多线程的创建

继承Thread类,重写run方法

这种方式创建的多线程,Hello ThreadHello World会同时循环打印,每个线程都是独立的线程流,都能执行独立的逻辑。

public class Demo {
    public static void main(String[] args) {
        MyThread myThread = new MyThread();
        // start()会调用操作系统的api创建线程,以run()作为入口方法执行。
        myThread.start();
        while (true) {
            System.out.println("Hello World");
        }
    }
}

class MyThread extends Thread {
    // 该方法会在合适的时机调用
    @Override
    public void run() {
        while (true) {
            System.out.println("Hello Thread");
        }
    }
}

// Hello Thread
// ...
// Hello World
// ...

如果将start()改成run(),实际上只有一个main线程。此时由于run()先被调用,因此只会循环打印Hello Thread而不会打印Hello World

public class Demo {
    public static void main(String[] args) {
        MyThread myThread = new MyThread();
        // 将start()改成了run()
        myThread.run();
        while (true) {
            System.out.println("Hello World");
        }
    }
}

class MyThread extends Thread {
    @Override
    public void run() {
        while (true) {
            System.out.println("Hello Thread");
        }
    }
}

// Hello Thread
// ...

实现Runnable接口,重写run方法

public class Demo {
    public static void main(String[] args) throws InterruptedException {
        // Runnable没用start()方法,只能与Thread配合使用
        MyRunnable myRunnable = new MyRunnable();
        Thread t = new Thread(myRunnable);
        t.start();
        while (true) {
            System.out.println("Hello Thread");
            Thread.sleep(1000);
        }
    }
}

class MyRunnable implements Runnable {
    @Override
    public void run() {
        while (true) {
            System.out.println("Hello Thread");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }
}
  • 第一种写法(继承Thread),把线程和线程要执行的任务耦合在一起了
  • 第二种写法(实现Runnable),Runnable表示要执行的任务,Thread表示线程,将两者解耦合了

使用匿名内部类,重写run方法

public class Demo {
    public static void main(String[] args) throws InterruptedException {
        // 此处的 t 是 Thread 子类的实例
        Thread t = new Thread() {
            // 此处定义新的属性和方法
            @Override
            public void run() {
                while (true) {
                    System.out.println("Hello Thread");
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            }
        };
        
        t.start();
        while (true) {
            System.out.println("Hello main");
            Thread.sleep(1000);
        }
    }
}

适用场景:

  1. 任务只需要使用一次
  2. 代码逻辑简单

使用匿名内部类,基于Runnable

public class Demo {
    public static void main(String[] args) throws InterruptedException {
        
        Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    System.out.println("Hello Thread");
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            }
        });
        
        t.start();
        while (true) {
            System.out.println("Hello main");
            Thread.sleep(1000);
        }
    }
}

基于Lambda表达式(匿名内部类)

public class Demo {
    public static void main(String[] args) throws InterruptedException {
    
        Thread t = new Thread(() -> {
            while (true) {
                System.out.println("Hello Thread");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });

        t.start();
        while (true) {
            System.out.println("Hello main");
            Thread.sleep(1000);
        }
    }
}

二、Thread的构造方法和常见属性

构造方法

方法说明
Thread()创建线程对象
Thread(Runnable target)使用Runnable对象创建线程对象
Thread(String name)创建线程对象,并命名
Thread(Runnable target,String name)使用Runnable对象创建线程对象,并命名

常见属性

属性获取方法
IDgetId()
名称getName()
状态getState()
优先级getPriority()
是否后台进程isDaemon()
是否存活isAlive()
是否被中断isInterrupted()

注:此处的ID是JVM分配的,和系统分配的ID是不同内容。Java中无法看到操作系统分配的线程ID。

isDaemon() 演示

public class Demo {
    public static void main(String[] args) throws InterruptedException {
        Thread t = new Thread(() -> {
            while (true) {
                System.out.println("Hello Thread");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }, "这是一个线程");
        // true -> 成为后台线程
        // 由于此时只有一个前台线程,main线程结束了,进程就结束了
        t.setDaemon(true); 
        t.start();
    }
}

// Hello Thread
//
// Process finished with exit code 0

isAlive() 演示

public class Demo {
    public static void main(String[] args) throws InterruptedException {
        Thread t = new Thread(() -> {
           for (int i = 0; i < 3; i++) {
               System.out.println("Hello Thread");
               try {
                   Thread.sleep(1000);
               } catch (InterruptedException e) {
                   throw new RuntimeException(e);
               }
           }
        });

        System.out.println(t.isAlive()); // false

        t.start();

        Thread.sleep(1000);

        System.out.println(t.isAlive()); // true

        Thread.sleep(3000);

        System.out.println(t.isAlive()); // false
    }
}

三、线程的启动、终止、等待、状态

线程的启动

针对一个Thread对象,只能调用一次start()方法,多次调用会抛出异常。

public class Demo {
    public static void main(String[] args) throws InterruptedException {
        Thread t = new Thread(() -> {
            System.out.println("Hello Thread");
        });

        t.start();

        System.out.println("线程第一次启动");

        t.start();

        System.out.println("线程第二次启动");
    }
}

// 线程第一次启动
// 线程第二次启动
// Hello Thread
// Exception in thread "main" java.lang.IllegalThreadStateException
//	at java.base/java.lang.Thread.start(Thread.java:802)
//	at Test.main(Test.java:13)
//
// Process finished with exit code 1

线程的终止

Java中没有“强制终止”操作,让一个线程结束的核心方式,是让线程的入口方法能够执行完。

使用标志位

import java.util.Scanner;

public class Demo {

    private static boolean running = true;

    public static void main(String[] args) throws InterruptedException {
        Thread t = new Thread(() -> {
           while (running) {
               System.out.println("Hello Thread");
               try {
                   Thread.sleep(1000);
               } catch (InterruptedException e) {
                   throw new RuntimeException(e);
               }
           }
            System.out.println("end");
        });
        t.start();
        // 当用户输入后,线程终止
        Scanner scanner = new Scanner(System.in);
        System.out.println("输入任意内容,让t线程终止");
        scanner.next();
        running = false;
    }
}

// 输入任意内容,让t线程终止
// Hello Thread
// Hello Thread
// Hello Thread
// 123
// end
// 
// Process finished with exit code 0

说明:

  1. 将变量写为成员变量,本质是Lambda作为匿名内部类访问外部类。如果将成员变量改为局部变量,要保证变量running不被修改(事实final)。
  2. 如果设置Thread.sleep(10000);,此时结束操作不能及时处理,存在延迟。

使用interrupt方法

import java.util.Scanner;

public class Demo {
   public static void main(String[] args) {
       Thread t = new Thread(() -> {
           while (!Thread.currentThread().isInterrupted()) { // 注意:此处是 !isInterrupted()
               System.out.println("Hello Thread");
               try {
                   Thread.sleep(1000);
               } catch (InterruptedException e) {
                   throw new RuntimeException(e);
               }
           }
           System.out.println("线程 t 终止");
       });
       t.start();
       Scanner sc = new Scanner(System.in);
       sc.next();
       t.interrupt();
   }
}

// Hello Thread
// Hello Thread
// ...
// 1
// Exception in thread "Thread-0" java.lang.RuntimeException:
// java.lang.InterruptedException: sleep interrupted
//	 at Test.lambda$main$0(Test.java:11)
//	 at java.base/java.lang.Thread.run(Thread.java:842)
// Caused by: java.lang.InterruptedException: sleep interrupted
//	 at java.base/java.lang.Thread.sleep(Native Method)
//	 at Test.lambda$main$0(Test.java:9)
//	 ... 1 more
// 
// Process finished with exit code 0

说明:

  1. t.interrupt()代表触发线程终止操作,内置了标志位,无需手动定义。
  2. Thread.currentThread().isInterrupted()中,isInterrupted()方法可以获取当前线程是否被标记为中断,Thread.currentThread()指代当前执行该代码的线程引用。
  3. try-catch中的throw new RuntimeException(e);:当t线程处于阻塞状态(如sleep())时,调用interrupt()方法会唤醒线程,并触发InterruptedException异常。
  4. 由于Thread.sleep()占据大部分执行时间,因此几乎不会输出"线程 t 终止"这个结果。
  5. 实际开发中,catch代码块可以进行以下操作:重试任务、记录错误日志、触发监控报警。

线程的等待

当需要一个线程的执行结果供另一个线程使用时,需要让后者等待前者执行完毕,join()方法可以实现该需求。

方法说明
public void join()等待线程结束(无超时)
public void join(long millis)等待线程结束,最多millis毫秒
public void join(long millis, int nanos)同上,提高时间精度

基础演示

public class Demo {
    private static int result = 0;

    public static void main(String[] args) throws InterruptedException {
        Thread t = new Thread(() -> {
            int sum = 0;
            for (int i = 0; i < 1000; i++) {
                sum += i;
            }
            result = sum;
        });
        t.start();
        t.join(); // t 线程是被等待的对象,主线程阻塞等待t线程执行完毕
        System.out.println(result);
    }
}

// 499500
//
// Process finished with exit code 0

反向等待演示

任何一个线程都可以被其他线程等待,以下示例是子线程等待主线程执行完毕:

public class Demo {
    private static int result = 0;

    public static void main(String[] args) {
        Thread mainThread = Thread.currentThread();

        Thread t = new Thread(() -> {
            try {
                mainThread.join();
                System.out.println(result);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
        t.start();

        int sum = 0;
        for (int i = 0; i < 1000; i++) {
            sum += i;
        }
        result = sum;
    }
}

// 499500
//
// Process finished with exit code 0

线程的状态

通过t.getState()方法可以获取线程的当前状态,Java线程共有6种状态:

  1. NEW:Thread对象已创建,但还未调用start()方法启动线程。
  2. RUNNABLE:就绪状态,线程正在被CPU执行,或随时可以被CPU调度执行。
  3. TERMINATED:线程的入口方法执行完毕,系统中的对应线程已销毁(Thread对象仍存在)。
  4. WAITING:无固定期限的阻塞,通常由join()(无参)、wait()(无参)引起。
  5. TIMED_WAITING:有固定期限的阻塞,通常由sleep()join(long millis)wait(long millis)引起。
  6. BLOCKED:由于锁竞争(如synchronized)引起的阻塞。

四、synchronized 关键字

synchronized的核心作用是保证同一时间只有一个线程能执行被它修饰的代码块/方法,从而避免多线程并发修改共享资源导致的数据不一致问题,它也被称为“内置锁”或“对象锁”。

用法一:修饰代码块

线程安全问题演示

以下代码中,两个线程同时对count进行自增操作,最终结果不符合预期且每次运行结果不同:

public class Demo {
    public static int count = 0;

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 100000; i++) {
                count++;
            }
        });

        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 100000; i++) {
                count++;
            }
        });

        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(count);
    }
}

// 154448 (不符合预期,且每次结果都不同)
//
// Process finished with exit code 0

问题原因

  1. 线程调度是随机的,一个线程执行任何一个指令都可能被CPU调度切换。
  2. 多个线程同时操作同一个共享变量。
  3. 针对变量的修改操作(count++)不是“原子操作”(分为读取、加1、写入三步)。
  4. 内存可见性引起的线程安全问题。
  5. 指令重排序引起的线程安全问题。

解决方法:使用synchronized代码块

synchronized (锁对象) { // 加锁
    // 需要保证线程安全的代码(操作共享变量)
} // 解锁

说明:

  1. 锁对象可以是任意对象(ObjectStringArrayList等)。
  2. 常用初始化锁对象的方式:Object locker = new Object();
  3. 当一个线程对一个锁对象加锁时,另一个线程尝试对同一个锁对象加锁会发生锁竞争,此时后者会进入BLOCKED状态阻塞等待,直到锁对象被解锁。

修正后的代码

public class Demo {
    public static int count = 0;

    public static void main(String[] args) throws InterruptedException {
        Object locker = new Object();
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 10000; i++) {
                synchronized (locker) {
                    count++;
                }
            }
        });

        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 10000; i++) {
                synchronized (locker) {
                    count++;
                }
            }
        });
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(count);
    }
}

// 20000
//
// Process finished with exit code 0

用法二:修饰方法

当某个方法的执行需要保证线程安全时,可以直接使用synchronized修饰该方法。

public class Demo {
    public static void main(String[] args) throws InterruptedException {
        Counter counter = new Counter();
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 50000; i++) {
                counter.add3();
            }
        });

        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 50000; i++) {
                counter.add3();
            }
        });
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(counter.getCount());
    }
}

class Counter {
    private int count = 0;

    // 手动加锁(this作为锁对象)
    public void add() {
        synchronized (this) {
            count++;
        }
    }

    // 手动加锁(等价于add())
    public void add2() {
        synchronized (this) {
            count++;
        }
    }

    // synchronized 修饰实例方法,等价于add2(),锁对象为this
    synchronized public void add3() {
        count++;
    }

    public int getCount() {
        return count;
    }
}

// 100000
// 
// Process finished with exit code 0

相关特性

  1. 互斥性synchronized使用的锁存储在对象头中,同一时间只有一个线程能获取该锁,保证代码的互斥执行。
  2. 可重入性synchronized对同一个线程是可重入的,不会出现死锁问题(即同一线程对同一把锁连续加锁两次不会阻塞自己)。

五、volatile 关键字

首先说明:死锁是指两个或多个线程在执行过程中,因互相持有对方需要的锁,且都不肯释放自己持有的资源,导致所有线程都陷入永久阻塞的状态,程序无法继续推进。

相关核心场景

  1. 一个线程被多次加锁(被synchronized的可重入性解决)。
  2. 两个线程两把锁:线程1持有锁A并请求锁B,线程2持有锁B并请求锁A。
  3. M个线程N把锁:经典的“哲学家就餐问题”。
  4. 内存可见性引起的线程安全问题:编译器会对代码进行优化,当CPU发现某个内存读取操作频繁时,会将数据缓存到寄存器/高速缓存中,后续读取直接从缓存获取,不再访问主存。此时其他线程修改主存中的数据,当前线程无法感知,从而产生BUG。

解决内存可见性问题:volatile

volatile关键字可以保证变量的内存可见性,禁止编译器对该变量的读写操作进行优化(强制每次读取都从主存获取,每次写入都刷新到主存),同时也能禁止指令重排序。

import java.util.Scanner;

public class Demo {
    // 解决方法:增加volatile关键字,保证内存可见性
    private static volatile boolean flag = true;

    public static void main(String[] args) {
        Thread t1 = new Thread(() -> {
            while (flag) {
                // 无逻辑,仅循环判断flag
            }
            System.out.println("t1结束");
        });

        Thread t2 = new Thread( () -> {
            System.out.println("请输入任意内容,终止线程t1");
            Scanner scanner = new Scanner(System.in);
            scanner.next();
            flag = false;
            System.out.println("flag = " + flag);
        });
        t1.start();
        t2.start();
    }
}

// 请输入任意内容,终止线程t1
// 123
// t1结束
// flag = false
//
// Process finished with exit code 0

死锁发生的四个必要条件

  1. 互斥性:锁的资源只能被一个线程持有(synchronized具备该特性)。
  2. 不可抢占性:线程持有的锁不能被其他线程强制抢夺,只能由持有者主动释放(synchronized具备该特性)。
  3. 请求与保持性:一个线程持有某个锁后,还能继续请求其他锁。
  4. 循环等待性:多个线程形成环形的锁请求链,每个线程都在等待下一个线程持有的锁。

注:只要打破其中任意一个条件,就能避免死锁,最容易实现的是打破“循环等待性”(统一锁的请求顺序)。

六、wait 和 notify 方法

wait()notify()都属于Object类的方法,意味着所有Java对象都自带这两个方法。它们必须在synchronized同步代码块/方法中使用,否则会抛出IllegalMonitorStateException异常。

核心方法说明

方法作用
wait()让当前线程释放持有的对象锁,进入该对象的等待队列并阻塞,直到被唤醒或中断。
wait(long timeout)限定超时时间,超时后自动唤醒。
notify()唤醒该对象等待队列中的任意一个等待线程。
notifyAll()唤醒该对象等待队列中的所有线程。

wait() 执行流程

线程执行wait()方法时,会完成三件事(原子操作):

  1. 释放当前持有的该对象的锁。
  2. 进入该对象的等待队列,阻塞等待。
  3. notify()/notifyAll()唤醒(或超时、被中断)后,重新竞争获取该对象的锁,获取成功后继续执行后续代码。

演示代码

import java.util.Scanner;

public class Demo {
    public static void main(String[] args){
        Object locker = new Object();
        Thread t1 = new Thread(() -> {
            System.out.println("wait 之前");
            synchronized (locker) {
                try {
                    // 锁对象和调用wait()的对象必须是同一个
                    locker.wait();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            System.out.println("wait 之后");
        });

        Thread t2 = new Thread( () -> {
            System.out.println("输入任意内容");
            Scanner scanner = new Scanner(System.in);
            scanner.next();
            synchronized (locker) {
                locker.notify();
            }
        });

        t1.start();
        t2.start();
    }
}

// wait 之前
// 输入任意内容
// 123
// wait 之后
// 
// Process finished with exit code 0

wait()sleep() 的区别

  1. 所属类不同:wait()属于Object类,sleep()属于Thread类。
  2. 唤醒方式不同:wait()可以通过notify()/notifyAll()主动唤醒,也可以通过interrupt()中断;sleep()只能通过interrupt()中断,或等待超时自动唤醒。
  3. 锁相关不同:wait()必须搭配锁使用,且会释放锁;sleep()不需要搭配锁使用,也不会释放任何锁。
  4. 用途不同:wait()用于线程间的通信协作,sleep()用于线程的延时执行。

七、多线程案例

单例模式

单例模式是一种创建型设计模式,核心目标是:保证一个类在整个程序运行期间,只能被实例化一次,并且提供一个全局唯一的访问点。有些场景下(如配置管理、连接池)不能出现多个实例,主要通过static修饰变量和方法实现(类方法/类属性在一个Java进程中是单例的)。

饿汉单例模式

Java进程启动时,就创建唯一实例,天然具备线程安全(类加载是线程安全的)。

public class Demo {
    public static void main(String[] args) {
        Singleton s1 = Singleton.getInstance();
        Singleton s2 = Singleton.getInstance();
        System.out.println(s1 == s2);
    }
}

// 这个类只能创建一个实例
class Singleton {

    // 由于被static修饰,这个实例只有一个(类加载时初始化)
    private static Singleton instance = new Singleton();

    // 通过private修饰构造方法,防止外部通过new创建多个实例
    private Singleton() {
    }

    // 想要获取instance对象,通过该全局访问方法获取
    public static Singleton getInstance() {
        return instance;
    }
}

// true
// 
// Process finished with exit code 0

懒汉单例模式

仅当调用getInstance()方法时,才创建唯一实例,懒加载节省资源,但多线程环境下存在线程安全问题,需要双重检查锁定(DCL)优化。

public class Demo {
    public static void main(String[] args) {
        Singleton s1 = Singleton.getInstance();
        Singleton s2 = Singleton.getInstance();
        System.out.println(s1 == s2);
    }
}

class Singleton {
    // 使用volatile解决指令重排序问题,保证实例初始化完成后再被引用
    private static volatile Singleton instance = null; 

    // 私有构造方法,禁止外部实例化
    private Singleton() {
    }

    // 双重检查锁定(DCL),保证线程安全且高效
    public static Singleton getInstance() {
        // 第一层检查:避免非必要的锁竞争,提高性能
        if (instance == null) {
            // 加锁:保证多线程下只有一个线程能进入创建实例
            synchronized (Singleton.class) {
                // 第二层检查:防止多个线程等待锁后重复创建实例
                if (instance == null) {
                    // 此处可能发生指令重排序,需要volatile禁止
                    instance = new Singleton();
                }
            }
        }
        return instance;
    }
}

// true

说明:instance = new Singleton()分为三步(分配内存、初始化实例、引用指向内存),指令重排序可能导致第二步和第三步颠倒,volatile可以禁止该重排序,避免其他线程获取到未初始化完成的实例。

阻塞队列

阻塞队列是线程安全的先进先出(FIFO)数据结构,专为多线程场景设计,核心用于实现“生产者-消费者模型”,能减少锁竞争、解耦生产者和消费者、缓冲突发请求,代价是一定的性能损耗和运维成本。Java标准库中提供了BlockingQueue接口及相关实现类。

核心特性

  1. 队列为满时,生产者继续入队会阻塞,直到队列不满。
  2. 队列为空时,消费者继续出队会阻塞,直到队列不空。

BlockingQueue 的创建

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;

public class Demo {
    public static void main(String[] args) {
        BlockingQueue<String> q1 = new ArrayBlockingQueue<>(10); // 基于数组,固定容量,不可扩容
        BlockingQueue<String> q2 = new LinkedBlockingQueue<>(10); // 基于链表,可指定容量(默认无界)
        BlockingQueue<String> q3 = new PriorityBlockingQueue<>(10); // 基于优先级队列,支持优先级排序
    }
}

BlockingQueue 的使用

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class Demo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> q = new ArrayBlockingQueue<>(1);
        q.put("1"); // 入队列,队列满则阻塞
        String ret = q.take(); // 出队列,队列空则阻塞
        System.out.println(ret);
    }
}

// 1

生产者-消费者模型的实现

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class Demo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

        // 生产者线程:生产元素
        Thread producer = new Thread(() -> {
            try {
                int count = 0;
                while (true) {
                    queue.put(count + "");
                    System.out.println("生产元素:" + count);
                    count++;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        // 消费者线程:消费元素
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    System.out.println("消费元素:" + queue.take());
                    Thread.sleep(1000); // 模拟消费耗时
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        producer.start();
        consumer.start();
    }
}

阻塞队列的模拟实现

class MyBlockingQueue {
    private String[] data = null;
    private int head = 0; // 队头指针(出队)
    private int tail = 0; // 队尾指针(入队)
    private int size = 0; // 当前队列元素个数
    private Object locker = new Object(); // 锁对象

    // 构造方法:初始化队列容量
    public MyBlockingQueue(int capacity) {
        data = new String[capacity];
    }

    // 入队方法:队列满则阻塞
    public void put(String elem) throws InterruptedException {
        synchronized (locker) {
            // 循环判断:避免虚假唤醒
            while (size == data.length) {
                locker.wait();
            }
            // 入队操作
            data[tail] = elem;
            tail = (tail + 1) % data.length; // 循环队列,避免数组越界
            size++;
            // 唤醒等待的消费者线程
            locker.notify();
        }
    }

    // 出队方法:队列空则阻塞
    public String take() throws InterruptedException {
        synchronized (locker) {
            // 循环判断:避免虚假唤醒
            while (size == 0) {
                locker.wait();
            }
            // 出队操作
            String ret = data[head];
            head = (head + 1) % data.length; // 循环队列,避免数组越界
            size--;
            // 唤醒等待的生产者线程
            locker.notify();
            return ret;
        }
    }
}

线程池

线程池是管理一组预先创建好的线程的“池子”,核心作用是降低频繁创建/销毁线程的资源消耗、提高任务响应速度、便于统一管理线程资源,避免无限制创建线程导致系统崩溃。

ThreadPoolExecutor 的构造方法

Java标准库中提供了java.util.concurrent.ThreadPoolExecutor,是线程池的核心实现类,其构造方法参数如下:

ThreadPoolExecutor(int corePoolSize, 
                   int maximumPoolSize, 
                   long keepAliveTime, 
                   TimeUnit unit, 
                   BlockingQueue<Runnable> workQueue, 
                   ThreadFactory threadFactory, 
                   RejectedExecutionHandler handler)

参数说明:

  1. corePoolSize:核心线程数(长期存活,不会随空闲时间释放)。
  2. maximumPoolSize:最大线程数(核心线程 + 非核心线程,非核心线程空闲超时后会被释放)。
  3. keepAliveTime + unit:非核心线程的保活时间及时间单位。
  4. workQueue:线程池的任务阻塞队列,用于存放等待执行的任务。
  5. threadFactory:线程工厂,用于创建线程(可自定义线程名称、优先级等)。
  6. handler:拒绝策略(队列满且达到最大线程数时,处理新任务的策略):
    • ThreadPoolExecutor.AbortPolicy:直接抛出异常(默认)。
    • ThreadPoolExecutor.CallerRunsPolicy:由调用者线程执行任务。
    • ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列中最老的任务。
    • ThreadPoolExecutor.DiscardPolicy:丢弃最新的任务(静默丢弃,无异常)。

简化创建:Executors 工具类

Java提供了Executors工具类,封装了ThreadPoolExecutor,简化线程池的创建:

import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;

class Demo {
    public static void main(String[] args) {
        // 创建固定数量核心线程的线程池(无非核心线程)
        ExecutorService fixedPool = Executors.newFixedThreadPool(10);

        // 创建可缓存的线程池(核心线程数为0,非核心线程无上限,空闲1分钟释放)
        ExecutorService cachedPool = Executors.newCachedThreadPool();

        // 创建只有一个核心线程的线程池(单线程执行所有任务,保证任务顺序执行)
        ExecutorService singlePool = Executors.newSingleThreadExecutor();

        // 创建带有定时器功能的线程池(支持延迟执行、周期性执行任务)
        ExecutorService scheduledPool = Executors.newScheduledThreadPool(10);
    }
}

注:实际生产环境中,不推荐使用Executors创建线程池(容易出现OOM),建议直接使用ThreadPoolExecutor自定义参数。

ThreadPoolExecutor 的使用

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Demo {
    public static void main(String[] args) {
        // 创建固定10个线程的线程池
        ExecutorService service = Executors.newFixedThreadPool(10);
        // 提交100个任务给线程池
        for (int i = 0; i < 100; i++) {
            // 匿名内部类/Lambda捕获的变量必须是事实final
            int id = i;
            service.submit(() -> {
                System.out.println("第" + id + "个任务:" + Thread.currentThread().getName());
            });
        }
        // 关闭线程池(不再接受新任务,等待已提交任务执行完毕)
        service.shutdown();
    }
}

线程池的模拟实现

核心流程:线程池初始化时创建指定数量的工作线程,工作线程循环从阻塞队列中获取任务并执行;用户通过submit()方法提交任务,实质是将任务放入阻塞队列。

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class MyThreadPoolExecutor {
    // 任务阻塞队列
    private BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

    // 构造方法:创建并启动指定数量的工作线程
    public MyThreadPoolExecutor(int nThread) {
        for (int i = 0; i < nThread; i++) {
            Thread t = new Thread(() -> {
                try {
                    // 循环获取任务,永不退出(简化版)
                    while (true) {
                        Runnable runnable = queue.take();
                        runnable.run(); // 执行任务
                    }
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            });
            t.start();
        }
    }

    // 提交任务:将任务放入阻塞队列
    public void submit(Runnable runnable) throws InterruptedException {
        queue.put(runnable);
    }
}

public class Demo {
    public static void main(String[] args) {
        // 创建包含10个工作线程的线程池
        MyThreadPoolExecutor service = new MyThreadPoolExecutor(10);
        try {
            // 提交100个任务
            for (int i = 0; i < 100; i++) {
                int id = i;
                service.submit(() -> {
                    System.out.println("第" + id + "个任务:" + Thread.currentThread().getName());
                });
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

定时器

定时器用于在指定延迟后执行任务,或周期性执行任务,Java标准库中提供了Timer类(简单场景)和ScheduledThreadPoolExecutor(多线程场景,推荐)。

Timer 的使用

import java.util.Timer;
import java.util.TimerTask;

public class Demo {
    public static void main(String[] args) {
        Timer timer = new Timer();
        // 延迟3000毫秒执行任务
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                System.out.println("Hello 1");
            }
        }, 3000);
        // 延迟2000毫秒执行任务
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                System.out.println("Hello 2");
            }
        }, 2000);
        // 延迟1000毫秒执行任务
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                System.out.println("Hello 3");
            }
        }, 1000);
    }
}

// Hello 3
// Hello 2
// Hello 1
// 说明:Timer会创建一个前台线程,即使主线程执行完毕,进程也会等待Timer任务执行完成后退出。

Timer 的模拟实现

核心思路:使用优先级队列存放定时任务(按执行时间排序),一个工作线程循环获取最早需要执行的任务,若未到执行时间则阻塞等待,到达时间则执行任务。

import java.util.PriorityQueue;

public class MyTimer {
    // 优先级队列:存放定时任务,按执行时间排序(小顶堆)
    private PriorityQueue<MyTimerTask> queue = new PriorityQueue<>();
    // 锁对象:用于线程间通信和同步
    private Object locker = new Object();

    // 构造方法:启动工作线程
    public MyTimer() {
        Thread t = new Thread(() -> {
            try {
                while (true) {
                    synchronized (locker) {
                        // 队列为空时,阻塞等待
                        while (queue.isEmpty()) {
                            locker.wait();
                        }
                        // 获取最早需要执行的任务
                        MyTimerTask task = queue.peek();
                        long currentTime = System.currentTimeMillis();
                        // 判断是否到达执行时间
                        if (currentTime >= task.getTime()) {
                            // 执行任务并从队列中移除
                            task.run();
                            queue.poll();
                        } else {
                            // 未到执行时间,阻塞等待时间差(避免忙等)
                            locker.wait(task.getTime() - currentTime);
                        }
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        t.start();
    }

    // 提交定时任务:延迟delay毫秒后执行
    public void schedule(Runnable task, long delay) {
        synchronized (locker) {
            MyTimerTask myTimerTask = new MyTimerTask(task, delay);
            queue.offer(myTimerTask);
            // 唤醒工作线程(可能有新的更早执行的任务)
            locker.notify();
        }
    }
}

// 定时任务类:实现Comparable接口,支持优先级排序
class MyTimerTask implements Comparable<MyTimerTask> {
    private Runnable runnable; // 待执行的任务
    private long time; // 任务的执行时间(绝对时间,毫秒)

    // 构造方法:初始化任务和执行时间
    public MyTimerTask(Runnable runnable, long delay) {
        this.runnable = runnable;
        this.time = System.currentTimeMillis() + delay;
    }

    // 执行任务
    public void run() {
        runnable.run();
    }

    // 获取任务执行时间
    public long getTime() {
        return time;
    }

    // 按执行时间排序(小顶堆)
    @Override
    public int compareTo(MyTimerTask o) {
        return (int) (this.time - o.time);
    }
}
数据结构算法 2025-10-03
多线程(2) 2026-01-20

评论区