Java线程间的通信机制

本文最后更新于 2024年11月5日

当需要多个线程共同完成一件任务,而且需要有规律的执行,那么多个线程之间需要一定的通信机制,可以协调他们的工作,以此实现多线程共同操作一份数据。

1.等待唤醒机制

这是一种线程间的协作机制,与争夺锁的竞争机制相对应,当一个线程满足某个条件时,就进入等待状态( wait/wait(m) ),等到其他线程执行完指定的代码后,再将其唤醒,或者可以指定时间,到时间了自动唤醒,有多个线程等待时,如果有需要,可以notifyAll()唤醒所有等待的线程,wait/notify就是一种线程间的协助机制。

wait()notify()notifyAll()都是java.lang.Object中的方法,这说明在Java语言的设计中,任何对象都能充当同步监视器(锁)

1.1 wait

作用:使当前线程等待,直到其他线程调用相同对象的notify()notifyAll()方法,或者线程被中断。
使用场景:当一个线程需要等待某个条件发生时,比如资源可用、状态改变等。
条件:调用wait()方法时,当前线程必须持有该对象的监视器锁(synchronized),否则会抛出 IllegalMonitorStateException 异常。

synchronized (lock) {
    while (condition) {
        lock.wait(); // 释放锁并等待
    }
    // 条件满足后的操作
}

1.2 notify

作用:唤醒在该对象上等待的一个线程。如果有多个线程在等待,则随机选择一个线程唤醒。
使用场景:在某个条件被满足时,通知一个等待的线程继续执行。

synchronized (lock) {
    // 修改条件
    lock.notify(); // 唤醒一个等待线程
}

1.3 notifyAll

作用:唤醒在该对象上等待的所有线程。
使用场景:当条件变化可能影响所有等待线程时,使用 notifyAll() 确保所有线程都有机会重新检查条件。

synchronized (lock) {
    // 修改条件
    lock.notifyAll(); // 唤醒所有等待线程
}

1.4 例:两个线程交替累加

第1次执行:假设线程1先抢到锁,进去后会先唤醒了线程2,但是锁还是线程1持有的,线程2只能等待,线程1将变量i加1变成1,wait释放锁进入等待唤醒状态。

第2次执行:线程2拿到锁后同样先唤醒了线程1,但是现在锁是线程2持有,线程1无法执行,线程2将变量i加1变成2后,wait释放锁进入等待唤醒状态。

第3次执行:线程1拿到锁还是先唤醒线程2然后将变量i加1变成3,然后wait释放锁。

……

第99次执行:线程1拿到锁还是先唤醒线程2然后执行加1将变量i变成99,然后wait释放锁。

第100次执行:线程2进来先唤醒了线程1,然后将i加到100后wait释放锁,此时循环加到100就已经完成,但是线程的执行还没有结束,被唤醒的线程1继续执行,先唤醒了线程2然后进入if判断,但是这次已经不能再加了,所以线程1没有wait就退出了while循环,离开synchronized块后自动失去锁,线程2随后自然得到锁进入synchronized块,同样判断已经不能再加了后随即也跳出while循环,线程1和线程2就都在就绪状态自然结束了,随后JVM进程退出。

public class TestAdd {

    static int i = 0;

    public static void main(String[] args) {

        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                while (true) {
                    synchronized (this){
                        this.notify();
                        if (i < 100) {
                            System.out.println(Thread.currentThread().getName() + "---" + ++i);
                        }
                        else {
                            break;
                        }

                        try {
                            this.wait();
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }
                    }

                }

            }
        };

        new Thread(runnable).start();
        new Thread(runnable).start();

    }
    
}

1.5 wait退出时检查

lock.wait()阻塞了的线程,一旦被唤醒,或超时时,会从wait()方法下面的代码继续向下执行,即直接跳出if执行下面的代码,需要注意的是此时已经醒来的线程并不会再次判断if中的条件是否满足而是跳出if直接向下执行,这样就会遇到一个问题,线程阻塞期间,其他线程进行的一些操作可能造成条件改变,不能满足if中的条件了,如果不加以二次判断就继续执行,就可能导致程序出错。

synchronized (lock) {

    if (条件不满足) {
        lock.wait();
    }

    //继续执行后面操作
}

Java给出的解决办法是:让wait总是出现在循环中,使用while去判断wait的条件而不是if,当使用while时,线程被唤醒后,不会继续跳出while块向下执行,而是会再判断一次while中的逻辑,如果条件不满足会继续wait,直到条件满足跳出while。

synchronized (lock) {

    while (条件不满足) {
        lock.wait();
    }

    //继续执行后面操作
}

例1:如果使用if,wait中的线程被唤醒时,不会再次判断if中的条件

public class SimpleWakeupDemo {

    private volatile static boolean flag = true; 

    private static final Object lock = new Object();

    public static boolean condition() {
        System.out.println(Thread.currentThread().getName() + "判断flag = " + flag);
        return flag;
    }

    public static void main(String[] args) {

        Thread t1 = new Thread() {
            @Override
            public void run() {
                synchronized (lock) {

                    if (condition()) {
                        try {
                            System.out.println(Thread.currentThread().getName() +" before wait");
                            lock.wait();
                            System.out.println(Thread.currentThread().getName() +" after wait");
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }

                    }

                    System.out.println(Thread.currentThread().getName() + "打印flag = " + flag);


                }
            }
        };

        Thread t2 = new Thread() {
            @Override
            public void run() {
                synchronized (lock) {
                    try {
                        Thread.sleep(900);

                        lock.notifyAll();

                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }

                }
            }
        };

        t1.start();
        t2.start();

    }

}
Thread-0判断flag = true
Thread-0 before wait
Thread-0 after wait
Thread-0打印flag = true

如果改成while,可以看到wait结束后,会再次判断条件是否成立

while (condition()) {
    try {
        System.out.println(Thread.currentThread().getName() +" before wait");
        lock.wait();
        System.out.println(Thread.currentThread().getName() +" after wait");
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }

}
Thread-0判断flag = true
Thread-0 before wait
Thread-0 after wait
Thread-0判断flag = true
Thread-0 before wait

例2:生产消费模型:两个线程操作同一变量,一个判断变量为0就加1变成1,另一个判断变量为1就减1变成0

public class SimpleWakeupDemo {

    private volatile static int product = 0; 

    private static final Object lock = new Object();

    public static void main(String[] args) {

        Runnable consumer = new Runnable() {
            @Override
            public void run() {
                synchronized (lock) {

                    while (product < 1) {
                        try {
                            System.out.println(Thread.currentThread().getName() + "不足");
                            System.out.println("before wait");
                            lock.wait();
                            System.out.println("after wait");
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }

                    }

                    product --;
                    System.out.println(Thread.currentThread().getName() + " = " + product);

                    lock.notifyAll();

                }
            }
        };

        Runnable productor = new Runnable() {
            @Override
            public void run() {
                synchronized (lock) {
                    try {
                        Thread.sleep(900);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }

                    while (!(product < 1)) {
                        try {
                            System.out.println(Thread.currentThread().getName() + "已满");
                            System.out.println("before wait");
                            lock.wait();
                            System.out.println("after wait");
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }

                    }

                    product ++;
                    System.out.println(Thread.currentThread().getName() + " = " + product);

                    lock.notifyAll();
                }
            }
        };

        for (int i = 0; i < 10; i++) {
            new Thread(productor, "productor-"+i).start();
            new Thread(consumer, "consumer-"+i).start();
        }

    }


}

当使用while时,操作后的数总是0或1

productor-0 = 1
productor-3已满
before wait
consumer-8 = 0
productor-9 = 1
productor-8已满
before wait
consumer-1 = 0
productor-7 = 1
productor-6已满
before wait
consumer-6 = 0
consumer-5不足
before wait
productor-1 = 1
consumer-4 = 0
productor-4 = 1
consumer-3 = 0
productor-2 = 1
consumer-9 = 0
consumer-2不足
before wait
consumer-7不足
before wait
productor-5 = 1
consumer-0 = 0
after wait
consumer-7不足
before wait
after wait
consumer-2不足
before wait
after wait
consumer-5不足
before wait
after wait
productor-6 = 1
after wait
productor-8已满
before wait
after wait
productor-3已满
before wait
after wait
consumer-5 = 0
after wait
consumer-2不足
before wait
after wait
consumer-7不足
before wait
after wait
productor-3 = 1
after wait
productor-8已满
before wait
after wait
consumer-7 = 0
after wait
consumer-2不足
before wait
after wait
productor-8 = 1
after wait
consumer-2 = 0

如果换成if,就会错误的出现其他的数字

productor-0 = 1
productor-2已满
before wait
productor-9已满
before wait
consumer-8 = 0
productor-7 = 1
productor-8已满
before wait
consumer-7 = 0
consumer-6不足
before wait
consumer-4不足
before wait
productor-6 = 1
consumer-5 = 0
productor-5 = 1
productor-4已满
before wait
consumer-3 = 0
productor-1 = 1
productor-3已满
before wait
consumer-2 = 0
consumer-9不足
before wait
consumer-1不足
before wait
consumer-0不足
before wait
after wait
productor-3 = 1
after wait
productor-4 = 2
after wait
consumer-4 = 1
after wait
consumer-6 = 0
after wait
productor-8 = 1
after wait
productor-9 = 2
after wait
productor-2 = 3
after wait
consumer-0 = 2
after wait
consumer-1 = 1
after wait
consumer-9 = 0

1.6 小总结

线程互相交替执行的过程可以简记为:1判断,2干活,3通知

synchronized (Object) {

    while (....) { //判断
        try {
            Object.wait();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    //干活
    //.........

    Object.notifyAll(); //通知

}   

2.线程连接:join方法

主执行中插入其他线程m1,主线程立刻被阻塞,直到插入的线程m1执行完成。

public static void main(String s[]) {

    Thread m1 = new Thread() {
        @Override
        public void run() {
            for (int i = 0; i < 100; i++) {

                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                System.out.println(this.getName() +" = " + i);
            }
        }
    };

    m1.setName("m1");
    m1.start();

    for (int i = 0; i < 100; i++) {

        /**i加到20的时候,插入子线程,子线程执行完了(消亡),主线程再继续*/

        if (i == 20) {
            try {
                m1.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        System.out.println(Thread.currentThread().getName() +" = " + i);
    }


}

3.volatile

volatile是一种轻量级的线程通信机制,具体见:volatile作用分析


"如果文章对您有帮助,可以请作者喝杯咖啡吗?"

微信二维码

微信支付

支付宝二维码

支付宝


Java线程间的通信机制
https://blog.liuzijian.com/post/10e497bb-feb5-f9fb-a256-428f0041960e.html
作者
Liu Zijian
发布于
2022年5月1日
更新于
2024年11月5日
许可协议