如何在Java中實現線程間通信?

參考

  • How To Implement Inter-thread Communication In Java

如何在Java中實現線程間通信?

儘管通常每個子線程只需要完成自己的任務,但是有時我們可能希望多個線程協同工作來完成一項任務,這涉及線程間的通信。

本文介紹的方法和類是:thread.join(),object.wait(),object.notify(),CountdownLatch,CyclicBarrier,FutureTask,Callable等。

這 是本文涵蓋的代碼

我將使用幾個示例來說明如何在Java中實現線程間通信。

  • 如何使兩個線程依次執行?
  • 如何使兩個線程以指定的方式有序相交?
  • 有四個線程:A,B,C和D(在A,B和C都完成執行並且A,B和C必須同步執行之前,不會執行D)。
  • 三名運動員準備分開,然後在他們各自準備就緒後同時開始跑步。
  • 子線程完成任務後,它將結果返回給主線程。

如何使兩個線程依次執行?

假設有兩個線程:線程A和線程B。這兩個線程都可以依次打印三個數字(1-3)。讓我們看一下代碼:

<code>private static void demo1() {
    Thread A = new Thread(new Runnable() {
        @Override
        public void run() {
            printNumber("A");
        }
    });
    Thread B = new Thread(new Runnable() {
        @Override
        public void run() {
            printNumber("B");
        }
    });
    A.start();
    B.start();
}/<code>

printNumber(String)的實現如下,它用於按順序打印三個數字1、2和3:

<code>private static void printNumber(String threadName) {
    int i=0;
    while (i++ < 3) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(threadName + "print:" + i);
    }
}/<code>

我們得到的結果是:

<code>B print: 1
A print: 1
B print: 2
A print: 2
B print: 3
A print: 3
/<code>

您可以看到A和B同時打印數字。
那麼,如果我們希望B在A打印完之後開始打印呢?我們可以使用thread.join()方法,代碼如下:

<code>private static void demo2() {
    Thread A = new Thread(new Runnable() {
        @Override
        public void run() {
            printNumber("A");
        }
    });
    Thread B = new Thread(new Runnable() {
        @Override
        public void run() {
            System.out.println("B starts waiting for A");
            try {
                A.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            printNumber("B");
        }
    });
    B.start();
    A.start();
}/<code>

現在獲得的結果是:

<code>B starts waiting for A
A print: 1
A print: 2
A print: 3
 
B print: 1
B print: 2
B print: 3
/<code>

因此,我們可以看到A.join()方法將使B等待直到A完成打印。

如何使兩個線程以指定的方式有序相交?

那麼,如果現在我們希望B在A打印完1之後立即開始打印1,2,3,然後A繼續打印2,3呢?顯然,我們需要更多細粒度的鎖來控制執行順序。

在這裡,我們可以利用object.wait()和object.notify()方法的優勢。代碼如下:

<code>/**
 * A 1, B 1, B 2, B 3, A 2, A 3
 */
private static void demo3() {
    Object lock = new Object();
    Thread A = new Thread(new Runnable() {
        @Override
        public void run() {
            synchronized (lock) {
                System.out.println("A 1");
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("A 2");
                System.out.println("A 3");
            }
        }
    });
    Thread B = new Thread(new Runnable() {
        @Override
        public void run() {
            synchronized (lock) {
                System.out.println("B 1");
                System.out.println("B 2");
                System.out.println("B 3");
                lock.notify();
            }
        }
    });
    A.start();
    B.start();
}/<code>

結果如下:

<code>A 1
A waiting…
 
B 1
B 2
B 3
A 2
A 3
/<code>

那就是我們想要的。

做了什麼?

首先,我們創建一個由A和B共享的對象鎖:lock = new Object();當A獲得鎖時,它首先打印1,然後調用lock.wait()方法,該方法將使其進入等待狀態,然後移交對鎖的控制。直到A調用lock.wait()方法釋放控制並且B獲得鎖,B才會執行。B在獲得鎖後打印1,2,3,然後調用lock.notify()方法喚醒正在等待的A;喚醒後,A將繼續打印其餘的2、3。

我將日誌添加到上述代碼中,以使其更易於理解。

<code>private static void demo3() {
    Object lock = new Object();
    Thread A = new Thread(new Runnable() {
        @Override
        public void run() {
            System.out.println("INFO: A is waiting for the lock");
            synchronized (lock) {
                System.out.println("INFO: A got the lock");
                System.out.println("A 1");
                try {
                    System.out.println("INFO: A is ready to enter the wait state, giving up control of the lock");
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("INFO: B wakes up A, and A regains the lock");
                System.out.println("A 2");
                System.out.println("A 3");
            }
        }
    });
    Thread B = new Thread(new Runnable() {
        @Override
        public void run() {
            System.out.println("INFO: B is waiting for the lock");
            synchronized (lock) {
                System.out.println("INFO: B got the lock");
                System.out.println("B 1");
                System.out.println("B 2");
                System.out.println("B 3");
                System.out.println("INFO: B ends printing, and calling the notify method");
                lock.notify();
            }
        }
    });
    A.start();
    B.start();/<code>

結果如下:

<code>INFO: A is waiting for the lock
INFO: A got the lock
A 1
INFO: A is ready to enter the wait state, giving up control of the lock
INFO: B is waiting for the lock
INFO: B got the lock
B 1
B 2
B 3
INFO: B ends printing, and calling the notify method
INFO: B wakes up A, and A regains the lock
A 2
A 3
/<code>

在A,B和C都完成同步執行之後執行D

前面介紹的方法thread.join()允許一個線程在等待另一個線程完成運行之後繼續執行。但是,如果我們將A,B和C順序連接到D線程中,它將使A,B和C依次執行,而我們希望它們三個同步運行。

我們要實現的目標是:三個線程A,B和C可以同時開始運行,並且每個線程在獨立運行完成後都會通知D;在A,B和C全部完成運行之後,D才開始運行。因此,我們使用CountdownLatch來實現這種類型的通信。其基本用法是:

  1. 創建一個計數器,並設置一個初始值CountdownLatch countDownLatch = new CountDownLatch(3);
  2. 在等待線程中調用countDownLatch.await()方法,並進入等待狀態,直到計數值變為0;否則,計數值為0。
  3. 在其他線程中調用countDownLatch.countDown()方法,該方法會將計數值減少一;
  4. 當其他線程中的countDown()方法將計數值設為0時,等待線程中的countDownLatch.await()方法將立即退出並繼續執行以下代碼。

實現代碼如下:

<code>private static void runDAfterABC() {
    int worker = 3;
    CountDownLatch countDownLatch = new CountDownLatch(worker);
    new Thread(new Runnable() {
        @Override
        public void run() {
            System.out.println("D is waiting for other three threads");
            try {
                countDownLatch.await();
                System.out.println("All done, D starts working");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }).start();
    for (char threadName='A'; threadName <= 'C'; threadName++) {
        final String tN = String.valueOf(threadName);
        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println(tN + "is working");
                try {
                    Thread.sleep(100);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println(tN + "finished");
                countDownLatch.countDown();
            }
        }).start();
    }
}/<code>

結果如下:

<code>D is waiting for other three threads
A is working
B is working
C is working
 
A finished
C finished
B finished
All done, D starts working
/<code>

實際上,CountDownLatch本身就是一個倒數計數器,我們將初始計數值設置為3。當D運行時,它首先調用countDownLatch.await()方法來檢查計數器值是否為0,並且它將保持等待狀態。 如果該值不為0。A,B和C分別運行完後,將分別使用countDownLatch.countDown()方法將倒數計數器減1。 當它們全部三個完成運行時,計數器將減少為0;否則,計數器將減少為0。 然後,將觸發D的await()方法到A,B和C結束,並且D將開始繼續執行。


因此,CountDownLatch適用於一個線程需要等待多個線程的情況。

3名選手準備跑步

三個跑步者準備分開,然後在每個跑步者準備就緒後同時開始跑步。

這次,三個線程A,B和C中的每個線程都需要分別進行準備,然後在三個線程全部準備好之後就開始同時運行。 我們應該如何實現呢?

上面的CountDownLatch可以用來遞減計數,但是當計數完成時,只有一個線程的await()方法將獲得響應,因此不能同時觸發多個線程。

為了達到線程互相等待的效果,我們可以使用CyclicBarrier數據結構,其基本用法是:

  1. 首先創建一個公共對象CyclicBarrier,並同時設置等待的線程數CyclicBarrier cyclicBarrier = new CyclicBarrier(3);。
  2. 這些線程開始同時進行準備。準備好之後,他們需要等待其他人完成準備工作,因此請調用cyclicBarrier.await()方法來等待其他人;
  3. 當所有需要同時等待的指定線程都調用cyclicBarrier.await()方法時,這意味著這些線程已經準備就緒,那麼這些線程將開始繼續同時執行。

實現代碼如下。想象一下,有三個跑步者需要同時開始跑步,因此他們需要等待其他跑步者準備就緒。

<code>private static void runABCWhenAllReady() {
    int runner = 3;
    CyclicBarrier cyclicBarrier = new CyclicBarrier(runner);
    final Random random = new Random();
    for (char runnerName='A'; runnerName <= 'C'; runnerName++) {
        final String rN = String.valueOf(runnerName);
        new Thread(new Runnable() {
            @Override
            public void run() {
                long prepareTime = random.nextInt(10000) + 100;
                System.out.println(rN + "is preparing for time:" + prepareTime);
                try {
                    Thread.sleep(prepareTime);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                try {
                    System.out.println(rN + "is prepared, waiting for others");
                    cyclicBarrier.await(); // The current runner is ready, waiting for others to be ready
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println(rN + "starts running"); // All the runners are ready to start running together
            }
        }).start();
    }
}/<code>

結果如下:

<code>A is preparing for time: 4131
B is preparing for time: 6349
C is preparing for time: 8206
 
A is prepared, waiting for others
 
B is prepared, waiting for others
 
C is prepared, waiting for others
 
C starts running
A starts running
B starts running
/<code>

子線程將結果返回到主線程

在實際開發中,通常我們需要創建子線程來執行一些耗時的任務,然後將執行結果傳遞迴主線程。那麼如何用Java實現呢?
因此,通常,在創建線程時,我們會將Runnable對象傳遞給Thread以便執行。 Runnable的定義如下:

<code>public interface Runnable {
    public abstract void run();
}/<code>

您可以看到run()方法執行後不會返回任何結果。如果要返回結果怎麼辦?在這裡,您可以使用另一個類似的接口類Callable:

<code>@FunctionalInterface
public interface Callable {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}/<code>

可以看出,Callable的最大區別在於它返回了泛型。

因此,下一個問題是,如何將子線程的結果傳遞回去? Java有一個FutureTask類,可以與Callable一起使用,但是請注意,用於獲取結果的get方法將阻塞主線程。

例如,我們希望子線程計算從1到100的總和,然後將結果返回給主線程。

<code>private static void doTaskWithResultInWorker() {
    Callable callable = new Callable() {
        @Override
        public Integer call() throws Exception {
            System.out.println("Task starts");
            Thread.sleep(1000);
            int result = 0;
            for (int i=0; i<=100; i++) {
                result += i;
            }
            System.out.println("Task finished and return result");
            return result;
        }
    };
    FutureTask futureTask = new FutureTask<>(callable);
    new Thread(futureTask).start();
    try {
        System.out.println("Before futureTask.get()");
        System.out.println("Result:" + futureTask.get());
        System.out.println("After futureTask.get()");
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
}/<code>

結果如下:

<code>Before futureTask.get()
 
Task starts
Task finished and return result
 
Result: 5050
After futureTask.get()
/<code>

可以看出,當主線程調用futureTask.get()方法時,它將阻塞主線程。 然後Callable開始在內部執行並返回操作結果; 然後futureTask.get()獲取結果,主線程恢復運行。

在這裡,我們可以瞭解到FutureTask和Callable可以直接在主線程中獲取子線程的結果,但是它們將阻塞主線程。 當然,如果您不想阻塞主線程,則可以考慮使用ExecutorService將FutureTask放入線程池中以管理執行。

總結

多線程是現代語言的常見功能,線程間通信,線程同步和線程安全是非常重要的主題。


分享到:


相關文章: