面試卡在多線程?那就分享幾道Java多線程高頻面試題,面試不用愁


面試卡在多線程?那就分享幾道Java多線程高頻面試題,面試不用愁

1. 多線程中的忙循環是什麼?

忙循環就是程序員用循環讓一個線程等待,不像傳統方法wait()、 sleep() 或 yield(),它們都放棄了CPU控制,而忙循環不會放棄CPU,它就是在運行一個空循環。

這麼做的目的是為了保留CPU緩存,在多核系統中,一個等待線程醒來的時候可能會在另一個內核運行,這樣會重建緩存。為了避免重建緩存和減少等待重建的時間就可以使用它了。

2. 什麼是自旋鎖?

沒有獲得鎖的線程一直循環在那裡看是否該鎖的保持者已經釋放了鎖,這就是自旋鎖。

3. 什麼是互斥鎖?

互斥鎖:從等待到解鎖過程,線程會從sleep狀態變為running狀態,過程中有線程上下文的切換,搶佔CPU等開銷。

4. 自旋鎖的優缺點?

自旋鎖不會引起調用者休眠,如果自旋鎖已經被別的線程保持,調用者就一直循環在那裡看是否該自旋鎖的保持者釋放了鎖。由於自旋鎖不會引起調用者休眠,所以自旋鎖的效率遠高於互斥鎖。

雖然自旋鎖效率比互斥鎖高,但它會存在下面兩個問題: 1、自旋鎖一直佔用CPU,在未獲得鎖的情況下,一直運行,如果不能在很短的時間內獲得鎖,會導致CPU效率降低。 2、試圖遞歸地獲得自旋鎖會引起死鎖。遞歸程序決不能在持有自旋鎖時調用它自己,也決不能在遞歸調用時試圖獲得相同的自旋鎖。

由此可見,我們要慎重的使用自旋鎖,自旋鎖適合於鎖使用者保持鎖時間比較短並且鎖競爭不激烈的情況。正是由於自旋鎖使用者一般保持鎖時間非常短,因此選擇自旋而不是睡眠是非常必要的,自旋鎖的效率遠高於互斥鎖。

5. 如何在兩個線程間共享數據?

同一個Runnable,使用全局變量。

第一種:將共享數據封裝到一個對象中,把這個共享數據所在的對象傳遞給不同的Runnable

第二種:將這些Runnable對象作為某一個類的內部類,共享的數據作為外部類的成員變量,對共享數據的操作分配給外部類的方法來完成,以此實現對操作共享數據的互斥和通信,作為內部類的Runnable來操作外部類的方法,實現對數據的操作

<code>class ShareData {
private int x = 0;

public synchronized void addx(){
x++;
System.out.println("x++ : "+x);
}
public synchronized void subx(){
x--;
System.out.println("x-- : "+x);
}
}

public class ThreadsVisitData {

public static ShareData share = new ShareData();

public static void main(String[] args) {
//final ShareData share = new ShareData();
new Thread(new Runnable() {
public void run() {
for(int i = 0;i<100;i++){
share.addx();
}
}
}).start();
new Thread(new Runnable() {
public void run() {
for(int i = 0;i<100;i++){
share.subx();
}
}
}).start();
}
}
/<code>
面試卡在多線程?那就分享幾道Java多線程高頻面試題,面試不用愁

6. Java中Runnable和Callable有什麼不同?

Runnable和Callable都是接口, 不同之處: 1.Callable可以返回一個類型V,而Runnable不可以 2.Callable能夠拋出checked exception,而Runnable不可以。 3.Runnable是自從java1.1就有了,而Callable是1.5之後才加上去的 4.Callable和Runnable都可以應用於executors。而Thread類只支持Runnable.

<code>import java.util.concurrent.Callable;  
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ThreadTestB {
public static void main(String[] args) {
ExecutorService e=Executors.newFixedThreadPool(10);
Future f1=e.submit(new MyCallableA());
Future f2=e.submit(new MyCallableA());
Future f3=e.submit(new MyCallableA());
System.out.println("--Future.get()....");
try {
System.out.println(f1.get());
System.out.println(f2.get());
System.out.println(f3.get());
} catch (InterruptedException e1) {
e1.printStackTrace();
} catch (ExecutionException e1) {
e1.printStackTrace();
}
e.shutdown();
}
}

class MyCallableA implements Callable<string>{
public String call() throws Exception {
System.out.println("開始執行Callable");
String[] ss={"zhangsan","lisi"};
long[] num=new long[2];
for(int i=0;i<1000000;i++){
num[(int)(Math.random()*2)]++;
}

if(num[0]>num[1]){
return ss[0];

}else if(num[0] throw new Exception("棄權!");
}else{
return ss[1];
}
}
}
/<string>/<code>

7. Java中CyclicBarrier 和 CountDownLatch有什麼不同?

CountDownLatch和CyclicBarrier都能夠實現線程之間的等待,只不過它們側重點不同:

CountDownLatch一般用於某個線程A等待若干個其他線程執行完任務之後,它才執行;
CyclicBarrier一般用於一組線程互相等待至某個狀態,然後這一組線程再同時執行;
另外,CountDownLatch是不能夠重用的,而CyclicBarrier是可以重用的。
CountDownLatch的用法:

<code>public class Test {
public static void main(String[] args) {
final CountDownLatch latch = new CountDownLatch(2);

new Thread(){
public void run() {
try {
System.out.println("子線程"+Thread.currentThread().getName()+"正在執行");
Thread.sleep(3000);
System.out.println("子線程"+Thread.currentThread().getName()+"執行完畢");
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
};
}.start();

new Thread(){

public void run() {
try {
System.out.println("子線程"+Thread.currentThread().getName()+"正在執行");
Thread.sleep(3000);
System.out.println("子線程"+Thread.currentThread().getName()+"執行完畢");
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
};
}.start();

try {
System.out.println("等待2個子線程執行完畢...");
latch.await();
System.out.println("2個子線程已經執行完畢");
System.out.println("繼續執行主線程");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/<code>

CyclicBarrier用法:

<code>public class Test {
public static void main(String[] args) {
int N = 4;
CyclicBarrier barrier = new CyclicBarrier(N,new Runnable() {
@Override
public void run() {
System.out.println("當前線程"+Thread.currentThread().getName());
}
});

for(int i=0;i new Writer(barrier).start();
}
static class Writer extends Thread{
private CyclicBarrier cyclicBarrier;
public Writer(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}

@Override

public void run() {
System.out.println("線程"+Thread.currentThread().getName()+"正在寫入數據...");
try {
Thread.sleep(5000); //以睡眠來模擬寫入數據操作
System.out.println("線程"+Thread.currentThread().getName()+"寫入數據完畢,等待其他線程寫入完畢");
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
}catch(BrokenBarrierException e){
e.printStackTrace();
}
System.out.println("所有線程寫入完畢,繼續處理其他任務...");
}
}
}
/<code>

8. Java中interrupted和isInterruptedd方法的區別?

interrupt方法用於中斷線程。調用該方法的線程的狀態為將被置為"中斷"狀態。

注意:線程中斷僅僅是置線程的中斷狀態位,不會停止線程。需要用戶自己去監視線程的狀態為並做處理。支持線程中斷的方法(也就是線程中斷後會拋出interruptedException的方法)就是在監視線程的中斷狀態,一旦線程的中斷狀態被置為“中斷狀態”,就會拋出中斷異常。

isInterrupted 只是簡單的查詢中斷狀態,不會對狀態進行修改。

9. concurrentHashMap的源碼理解以及內部實現原理,為什麼他是同步的且效率高

ConcurrentHashMap 分析

ConcurrentHashMap的結構是比較複雜的,都深究去本質,其實也就是數組和鏈表而已。我們由淺入深慢慢的分析其結構。

先簡單分析一下,ConcurrentHashMap 的成員變量中,包含了一個 Segment 的數組(final Segment[] segments;),而 Segment 是 ConcurrentHashMap 的內部類,然後在 Segment 這個類中,包含了一個 HashEntry 的數組(transient volatile HashEntry[] table;)。而 HashEntry 也是ConcurrentHashMap 的內部類。HashEntry 中,包含了 key 和 value 以及 next 指針(類似於 HashMap 中 Entry),所以 HashEntry 可以構成一個鏈表。

所以通俗的講,ConcurrentHashMap 數據結構為一個 Segment 數組,Segment 的數據結構為 HashEntry 的數組,而 HashEntry 存的是我們的鍵值對,可以構成鏈表。

首先,我們看一下 HashEntry 類。

HashEntry

HashEntry 用來封裝散列映射表中的鍵值對。在 HashEntry 類中,key,hash 和 next 域都被聲明為 final 型,value 域被聲明為 volatile 型。其類的定義為:

<code>static final class HashEntry {
final int hash;
final K key;

volatile V value;
volatile HashEntry next;

HashEntry(int hash, K key, V value, HashEntry next) {
this.hash = hash;
this.key = key;
this.value = value;
this.next = next;
}
...
...
}
/<code>

HashEntry 的學習可以類比著 HashMap 中的 Entry。我們的存儲鍵值對的過程中,散列的時候如果發生“碰撞”,將採用“分離鏈表法”來處理碰撞:把碰撞的 HashEntry 對象鏈接成一個鏈表。

Segment

Segment 的類定義為static final class Segment extends ReentrantLock implements Serializable。其繼承於 ReentrantLock 類,從而使得 Segment 對象可以充當鎖的角色。Segment 中包含HashEntry 的數組,其可以守護其包含的若干個桶(HashEntry的數組)。Segment 在某些意義上有點類似於 HashMap了,都是包含了一個數組,而數組中的元素可以是一個鏈表。

table:table 是由 HashEntry 對象組成的數組如果散列時發生碰撞,碰撞的 HashEntry 對象就以鏈表的形式鏈接成一個鏈表table數組的數組成員代表散列映射表的一個桶每個 table 守護整個 ConcurrentHashMap 包含桶總數的一部分如果併發級別為 16,table 則守護 ConcurrentHashMap 包含的桶總數的 1/16。

count 變量是計算器,表示每個 Segment 對象管理的 table 數組(若干個 HashEntry 的鏈表)包含的HashEntry 對象的個數。之所以在每個Segment對象中包含一個 count 計數器,而不在 ConcurrentHashMap 中使用全局的計數器,是為了避免出現“熱點域”而影響併發性。

<code>/**
* Segments are specialized versions of hash tables. This
* subclasses from ReentrantLock opportunistically, just to
* simplify some locking and avoid separate construction.
*/
static final class Segment extends ReentrantLock implements Serializable {
/**
* The per-segment table. Elements are accessed via
* entryAt/setEntryAt providing volatile semantics.
*/
transient volatile HashEntry[] table;

/**
* The number of elements. Accessed only either within locks
* or among other volatile reads that maintain visibility.
*/
transient int count;
transient int modCount;
/**
* 裝載因子
*/
final float loadFactor;
}
/<code>

ConcurrentHashMap

ConcurrentHashMap 的結構中包含的 Segment 的數組,在默認的併發級別會創建包含 16 個 Segment 對象的數組。通過我們上面的知識,我們知道每個 Segment 又包含若干個散列表的桶,每個桶是由 HashEntry 鏈接起來的一個鏈表。如果 key 能夠均勻散列,每個 Segment 大約守護整個散列表桶總數的 1/16。

併發寫操作

在 ConcurrentHashMap 中,當執行 put 方法的時候,會需要加鎖來完成。我們通過代碼來解釋一下具體過程: 當我們 new 一個 ConcurrentHashMap 對象,並且執行put操作的時候,首先會執行 ConcurrentHashMap 類中的 put 方法,該方法源碼為:

<code>/**
* Maps the specified key to the specified value in this table.
* Neither the key nor the value can be null.
*
*

The value can be retrieved by calling the get method
* with a key that is equal to the original key.
*
* @param key key with which the specified value is to be associated
* @param value value to be associated with the specified key
* @return the previous value associated with key, or
* null if there was no mapping for key
* @throws NullPointerException if the specified key or value is null
*/
@SuppressWarnings("unchecked")
public V put(K key, V value) {
Segment s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false);
}


/<code>

我們通過註釋可以瞭解到,ConcurrentHashMap 不允許空值。該方法首先有一個 Segment 的引用 s,然後會通過 hash() 方法對 key 進行計算,得到哈希值;繼而通過調用 Segment 的 put(K key, int hash, V value, boolean onlyIfAbsent)方法進行存儲操作。該方法源碼為:

<code>final V put(K key, int hash, V value, boolean onlyIfAbsent) {
//加鎖,這裡是鎖定的Segment而不是整個ConcurrentHashMap
HashEntry node = tryLock() ? null :scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry[] tab = table;
//得到hash對應的table中的索引index
int index = (tab.length - 1) & hash;
//找到hash對應的是具體的哪個桶,也就是哪個HashEntry鏈表
HashEntry first = entryAt(tab, index);
for (HashEntry e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else

node = new HashEntry(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
//解鎖
unlock();
}
return oldValue;
}
/<code>

關於該方法的某些關鍵步驟,在源碼上加上了註釋。

需要注意的是:加鎖操作是針對的 hash 值對應的某個 Segment,而不是整個 ConcurrentHashMap。因為 put 操作只是在這個 Segment 中完成,所以並不需要對整個 ConcurrentHashMap 加鎖。所以,此時,其他的線程也可以對另外的 Segment 進行 put 操作,因為雖然該 Segment 被鎖住了,但其他的 Segment 並沒有加鎖。同時,讀線程並不會因為本線程的加鎖而阻塞。

正是因為其內部的結構以及機制,所以 ConcurrentHashMap 在併發訪問的性能上要比Hashtable和同步包裝之後的HashMap的性能提高很多。在理想狀態下,ConcurrentHashMap 可以支持 16 個線程執行併發寫操作(如果併發級別設置為 16),及任意數量線程的讀操作。

總結

在實際的應用中,散列表一般的應用場景是:除了少數插入操作和刪除操作外,絕大多數都是讀取操作,而且讀操作在大多數時候都是成功的。正是基於這個前提,ConcurrentHashMap 針對讀操作做了大量的優化。通過 HashEntry 對象的不變性和用 volatile 型變量協調線程間的內存可見性,使得 大多數時候,讀操作不需要加鎖就可以正確獲得值。這個特性使得 ConcurrentHashMap 的併發性能在分離鎖的基礎上又有了近一步的提高。

ConcurrentHashMap 是一個併發散列映射表的實現,它允許完全併發的讀取,並且支持給定數量的併發更新。相比於 HashTable 和用同步包裝器包裝的 HashMap(Collections.synchronizedMap(new HashMap())),ConcurrentHashMap 擁有更高的併發性。在 HashTable 和由同步包裝器包裝的 HashMap 中,使用一個全局的鎖來同步不同線程間的併發訪問。同一時間點,只能有一個線程持有鎖,也就是說在同一時間點,只能有一個線程能訪問容器。這雖然保證多線程間的安全併發訪問,但同時也導致對容器的訪問變成串行化的了。

ConcurrentHashMap 的高併發性主要來自於三個方面:

  • 用分離鎖實現多個線程間的更深層次的共享訪問。
  • 用 HashEntery 對象的不變性來降低執行讀操作的線程在遍歷鏈表期間對加鎖的需求。
  • 通過對同一個 Volatile 變量的寫 / 讀訪問,協調不同線程間讀 / 寫操作的內存可見性。

使用分離鎖,減小了請求 同一個鎖的頻率。

通過 HashEntery 對象的不變性及對同一個 Volatile 變量的讀 / 寫來協調內存可見性,使得 讀操作大多數時候不需要加鎖就能成功獲取到需要的值。由於散列映射表在實際應用中大多數操作都是成功的 讀操作,所以 2 和 3 既可以減少請求同一個鎖的頻率,也可以有效減少持有鎖的時間。通過減小請求同一個鎖的頻率和儘量減少持有鎖的時間 ,使得 ConcurrentHashMap 的併發性相對於 HashTable 和用同步包裝器包裝的 HashMap有了質的提高。

面試卡在多線程?那就分享幾道Java多線程高頻面試題,面試不用愁

10. BlockingQueue的使用?

BlockingQueue的原理

阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。這兩個附加的操作是:在隊列為空時,獲取元素的線程會等待隊列變為非空。當隊列滿時,存儲元素的線程會等待隊列可用。阻塞隊列常用於生產者和消費者的場景,生產者是往隊列裡添加元素的線程,消費者是從隊列裡拿元素的線程。阻塞隊列就是生產者存放元素的容器,而消費者也只從容器裡拿元素。

BlockingQueue的核心方法:

  1. add(E e): 添加元素,如果BlockingQueue可以容納,則返回true,否則報異常
  2. offer(E e): 添加元素,如果BlockingQueue可以容納,則返回true,否則返回false.
  3. put(E e): 添加元素,如果BlockQueue沒有空間,則調用此方法的線程被阻斷直到BlockingQueue裡面有空間再繼續.
  4. poll(long timeout, TimeUnit timeUnit): 取走BlockingQueue裡排在首位的對象,若不能立即取出,則可以等timeout參數規定的時間,取不到時返回null
  5. take(): 取走BlockingQueue裡排在首位的對象,若BlockingQueue為空,阻斷進入等待狀態直到Blocking有新的對象被加入為止

BlockingQueue常用實現類

  1. ArrayBlockingQueue: 有界的先入先出順序隊列,構造方法確定隊列的大小.
  2. LinkedBlockingQueue: 無界的先入先出順序隊列,構造方法提供兩種,一種初始化隊列大小,隊列即有界;第二種默認構造方法,隊列無界(有界即Integer.MAX_VALUE)
  3. SynchronousQueue: 特殊的BlockingQueue,沒有空間的隊列,即必須有取的方法阻塞在這裡的時候才能放入元素。
  4. PriorityBlockingQueue: 支持優先級的阻塞隊列 ,存入對象必須實現Comparator接口 (需要注意的是 隊列不是在加入元素的時候進行排序,而是取出的時候,根據Comparator來決定優先級最高的)。

BlockingQueue<> 隊列的作用

BlockingQueue 實現主要用於生產者-使用者隊列,BlockingQueue 實現是線程安全的。所有排隊方法都可以使用內部鎖或其他形式的併發控制來自動達到它們的目的

這是一個生產者-使用者場景的一個用例。注意,BlockingQueue 可以安全地與多個生產者和多個使用者一起使用 此用例來自jdk文檔

<code>//這是一個生產者類
class Producer implements Runnable {
private final BlockingQueue queue;
Producer(BlockingQueue q) {
queue = q;
}
public void run() {
try {
while(true) {
queue.put(produce());
}
} catch (InterruptedException ex) {
... handle ...
}
}
Object produce() {
...
}
}

//這是一個消費者類
class Consumer implements Runnable {
private final BlockingQueue queue;
Consumer(BlockingQueue q) { queue = q; }
public void run() {
try {
while(true) {
consume(queue.take());
}
} catch (InterruptedException ex) {
... handle ...
}
}
void consume(Object x) {
...
}
}

//這是實現類
class Setup {

void main() {
//實例一個非阻塞隊列
BlockingQueue q = new SomeQueueImplementation();
//將隊列傳入兩個消費者和一個生產者中
Producer p = new Producer(q);
Consumer c1 = new Consumer(q);
Consumer c2 = new Consumer(q);
new Thread(p).start();
new Thread(c1).start();
new Thread(c2).start();
}
}
/<code>

11. ThreadPool的深入考察?

引言

合理利用線程池能夠帶來三個好處。

  1. 降低資源消耗。通過重複利用已創建的線程降低線程創建和銷燬造成的消耗。
  2. 提高響應速度。當任務到達時,任務可以不需要等到線程創建就能立即執行。
  3. 提高線程的可管理性。線程是稀缺資源,如果無限制的創建,不僅會消耗系統資源,還會降低系統的穩定性,使用線程池可以進行統一的分配,調優和監控。

但是要做到合理的利用線程池,必須對其原理了如指掌。

線程池的使用

我們可以通過ThreadPoolExecutor來創建一個線程池。

<code>new  ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, milliseconds,runnableTaskQueue, handler);
/<code>

創建一個線程池需要輸入幾個參數:

  • corePoolSize(線程池的基本大小):當提交一個任務到線程池時,線程池會創建一個線程來執行任務,即使其他空閒的基本線程能夠執行新任務也會創建線程,等到需要執行的任務數大於線程池基本大小時就不再創建。如果調用了線程池的prestartAllCoreThreads方法,線程池會提前創建並啟動所有基本線程。
  • runnableTaskQueue(任務隊列):用於保存等待執行的任務的阻塞隊列。 可以選擇以下幾個阻塞隊列。ArrayBlockingQueue:是一個基於數組結構的有界阻塞隊列,此隊列按 FIFO(先進先出)原則對元素進行排序。LinkedBlockingQueue:一個基於鏈表結構的阻塞隊列,此隊列按FIFO (先進先出) 排序元素,吞吐量通常要高於ArrayBlockingQueue。靜態工廠方法Executors.newFixedThreadPool()使用了這個隊列。SynchronousQueue:一個不存儲元素的阻塞隊列。每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於LinkedBlockingQueue,靜態工廠方法Executors.newCachedThreadPool使用了這個隊列。PriorityBlockingQueue:一個具有優先級的無限阻塞隊列。
  • maximumPoolSize(線程池最大大小):線程池允許創建的最大線程數。如果隊列滿了,並且已創建的線程數小於最大線程數,則線程池會再創建新的線程執行任務。值得注意的是如果使用了無界的任務隊列這個參數就沒什麼效果。
  • ThreadFactory:用於設置創建線程的工廠,可以通過線程工廠給每個創建出來的線程設置更有意義的名字。
  • RejectedExecutionHandler(飽和策略):當隊列和線程池都滿了,說明線程池處於飽和狀態,那麼必須採取一種策略處理提交的新任務。這個策略默認情況下是AbortPolicy,表示無法處理新任務時拋出異常。以下是JDK1.5提供的四種策略。AbortPolicy:直接拋出異常。CallerRunsPolicy:只用調用者所在線程來運行任務。DiscardOldestPolicy:丟棄隊列裡最近的一個任務,並執行當前任務。DiscardPolicy:不處理,丟棄掉。 當然也可以根據應用場景需要來實現RejectedExecutionHandler接口自定義策略。如記錄日誌或持久化不能處理的任務。
  • keepAliveTime(線程活動保持時間):線程池的工作線程空閒後,保持存活的時間。所以如果任務很多,並且每個任務執行的時間比較短,可以調大這個時間,提高線程的利用率。
  • TimeUnit(線程活動保持時間的單位):可選的單位有天(DAYS),小時(HOURS),分鐘(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。

向線程池提交任務

我們可以使用execute提交的任務,但是execute方法沒有返回值,所以無法判斷任務是否被線程池執行成功。通過以下代碼可知execute方法輸入的任務是一個Runnable類的實例。

<code>threadsPool.execute(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
}
});
/<code>

我們也可以使用submit 方法來提交任務,它會返回一個future,那麼我們可以通過這個future來判斷任務是否執行成功,通過future的get方法來獲取返回值,get方法會阻塞住直到任務完成,而使用get(long timeout, TimeUnit unit)方法則會阻塞一段時間後立即返回,這時有可能任務沒有執行完。

<code>Future<object> future = executor.submit(harReturnValuetask);
try {
Object s = future.get();
} catch (InterruptedException e) {
// 處理中斷異常
} catch (ExecutionException e) {
// 處理無法執行任務異常
} finally {
// 關閉線程池
executor.shutdown();
}
/<object>/<code>

線程池的關閉

我們可以通過調用線程池的shutdown或shutdownNow方法來關閉線程池,它們的原理是遍歷線程池中的工作線程,然後逐個調用線程的interrupt方法來中斷線程,所以無法響應中斷的任務可能永遠無法終止。但是它們存在一定的區別,shutdownNow首先將線程池的狀態設置成STOP,然後嘗試停止所有的正在執行或暫停任務的線程,並返回等待執行任務的列表,而shutdown只是將線程池的狀態設置成SHUTDOWN狀態,然後中斷所有沒有正在執行任務的線程。

只要調用了這兩個關閉方法的其中一個,isShutdown方法就會返回true。當所有的任務都已關閉後,才表示線程池關閉成功,這時調用isTerminaed方法會返回true。至於我們應該調用哪一種方法來關閉線程池,應該由提交到線程池的任務特性決定,通常調用shutdown來關閉線程池,如果任務不一定要執行完,則可以調用shutdownNow。

源碼分析

上面的流程分析讓我們很直觀的瞭解了線程池的工作原理,讓我們再通過源代碼來看看是如何實現的。線程池執行任務的方法如下:

<code>public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//如果線程數小於基本線程數,則創建線程並執行當前任務
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
//如線程數大於等於基本線程數或線程創建失敗,則將當前任務放到工作隊列中。
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
//如果線程池不處於運行中或任務無法放入隊列,並且當前線程數量小於最大允許的線程數量,
則創建一個線程執行任務。
else if (!addIfUnderMaximumPoolSize(command))
//拋出RejectedExecutionException異常
reject(command); // is shutdown or saturated
}
}
/<code>

工作線程。線程池創建線程時,會將線程封裝成工作線程Worker,Worker在執行完任務後,還會無限循環獲取工作隊列裡的任務來執行。我們可以從Worker的run方法裡看到這點:

<code>public void run() {
try {
Runnable task = firstTask;
firstTask = null;
while (task != null || (task = getTask()) != null) {
runTask(task);
task = null;
}
} finally {
workerDone(this);
}

}
/<code>

合理的配置線程池

要想合理的配置線程池,就必須首先分析任務特性,可以從以下幾個角度來進行分析:

  1. 任務的性質:CPU密集型任務,IO密集型任務和混合型任務。
  2. 任務的優先級:高,中和低。
  3. 任務的執行時間:長,中和短。
  4. 任務的依賴性:是否依賴其他系統資源,如數據庫連接。

任務性質不同的任務可以用不同規模的線程池分開處理。CPU密集型任務配置儘可能小的線程,如配置Ncpu+1個線程的線程池。IO密集型任務則由於線程並不是一直在執行任務,則配置儘可能多的線程,如2*Ncpu。混合型的任務,如果可以拆分,則將其拆分成一個CPU密集型任務和一個IO密集型任務,只要這兩個任務執行的時間相差不是太大,那麼分解後執行的吞吐率要高於串行執行的吞吐率,如果這兩個任務執行時間相差太大,則沒必要進行分解。我們可以通過Runtime.getRuntime().availableProcessors()方法獲得當前設備的CPU個數。

優先級不同的任務可以使用優先級隊列PriorityBlockingQueue來處理。它可以讓優先級高的任務先得到執行,需要注意的是如果一直有優先級高的任務提交到隊列裡,那麼優先級低的任務可能永遠不能執行。

執行時間不同的任務可以交給不同規模的線程池來處理,或者也可以使用優先級隊列,讓執行時間短的任務先執行。

依賴數據庫連接池的任務,因為線程提交SQL後需要等待數據庫返回結果,如果等待的時間越長CPU空閒時間就越長,那麼線程數應該設置越大,這樣才能更好的利用CPU。

建議使用有界隊列,有界隊列能增加系統的穩定性和預警能力,可以根據需要設大一點,比如幾千。有一次我們組使用的後臺任務線程池的隊列和線程池全滿了,不斷的拋出拋棄任務的異常,通過排查發現是數據庫出現了問題,導致執行SQL變得非常緩慢,因為後臺任務線程池裡的任務全是需要向數據庫查詢和插入數據的,所以導致線程池裡的工作線程全部阻塞住,任務積壓在線程池裡。如果當時我們設置成無界隊列,線程池的隊列就會越來越多,有可能會撐滿內存,導致整個系統不可用,而不只是後臺任務出現問題。當然我們的系統所有的任務是用的單獨的服務器部署的,而我們使用不同規模的線程池跑不同類型的任務,但是出現這樣問題時也會影響到其他任務。

線程池的監控

通過線程池提供的參數進行監控。線程池裡有一些屬性在監控線程池的時候可以使用

  • taskCount:線程池需要執行的任務數量。
  • completedTaskCount:線程池在運行過程中已完成的任務數量。小於或等於taskCount。
  • largestPoolSize:線程池曾經創建過的最大線程數量。通過這個數據可以知道線程池是否滿過。如等於線程池的最大大小,則表示線程池曾經滿了。
  • getPoolSize:線程池的線程數量。如果線程池不銷燬的話,池裡的線程不會自動銷燬,所以這個大小隻增不+getActiveCount:獲取活動的線程數。

通過擴展線程池進行監控。通過繼承線程池並重寫線程池的beforeExecute,afterExecute和terminated方法,我們可以在任務執行前,執行後和線程池關閉前幹一些事情。如監控任務的平均執行時間,最大執行時間和最小執行時間等。這幾個方法在線程池裡是空方法。如:

<code>protected void beforeExecute(Thread t, Runnable r) { }
/<code>

12. Java中Semaphore是什麼?

Java中的Semaphore是一種新的同步類,它是一個計數信號。

從概念上講,信號量維護了一個許可集合。如有必要,在許可可用前會阻塞每一個 acquire(),然後再獲取該許可。每個 release()添加一個許可,從而可能釋放一個正在阻塞的獲取者。

但是,不使用實際的許可對象,Semaphore只對可用許可的號碼進行計數,並採取相應的行動。

信號量常常用於多線程的代碼中,比如數據庫連接池。

13. 同步方法和同步代碼塊的區別是什麼?

同步方法默認用this或者當前類class對象作為鎖; 同步代碼塊可以選擇以什麼來加鎖,比同步方法要更細顆粒度,我們可以選擇只同步會發生同步問題的部分代碼而不是整個方法; 同步方法使用關鍵字 synchronized修飾方法,而同步代碼塊主要是修飾需要進行同步的代碼,用 synchronized(object){代碼內容}進行修飾;

14. 同步方法和同步代碼塊的區別是什麼?

同步方法默認用this或者當前類class對象作為鎖; 同步代碼塊可以選擇以什麼來加鎖,比同步方法要更細顆粒度,我們可以選擇只同步會發生同步問題的部分代碼而不是整個方法; 同步方法使用關鍵字 synchronized修飾方法,而同步代碼塊主要是修飾需要進行同步的代碼,用 synchronized(object){代碼內容}進行修飾;

15. 如何確保N個線程可以訪問N個資源同時又不導致死鎖?

使用多線程的時候,一種非常簡單的避免死鎖的方式就是:指定獲取鎖的順序,並強制線程按照指定的順序獲取鎖。因此,如果所有的線程都是以同樣的順序加鎖和釋放鎖,就不會出現死鎖了。

Java讀者福利:筆者把近一年經歷過的Java崗位面試,和一些刷過的面試題都做成了PDF,PDF都是免費分享,關注私信我:【888】,即可免費領取!

面試卡在多線程?那就分享幾道Java多線程高頻面試題,面試不用愁


分享到:


相關文章: