用300 行代碼帶你看懂 Java 多線程


用300 行代碼帶你看懂 Java 多線程


  線程

  線程(英語:Thread)是操作系統能夠進行運算調度的最小單位。它被包含在進程之中,是進程中的實際運作單位。一條線程指的是進程中一個單一順序的控制流,一個進程中可以併發多個線程,每條線程並行執行不同的任務。在Unix System V及SunOS中也被稱為輕量進程(Lightweight Processes),但輕量進程更多指內核線程(Kernel Thread),而把用戶線程(User Thread)稱為線程。


  1.1 線程與進程的區別


用300 行代碼帶你看懂 Java 多線程


  進程:指在系統中正在運行的一個應用程序;程序一旦運行就是進程;進程——資源分配的最小單位。


  線程:系統分配處理器時間資源的基本單元,或者說進程之內獨立執行的一個單元執行流。線程——程序執行的最小單位。


  也就是,進程可以包含多個線程,而線程是程序執行的最小單位。


  1.2 線程的狀態


用300 行代碼帶你看懂 Java 多線程


  NEW:線程剛創建


  RUNNABLE: 在JVM中正在運行的線程,其中運行狀態可以有運行中RUNNING和READY兩種狀態,由系統調度進行狀態改變。


  BLOCKED:線程處於阻塞狀態,等待監視鎖,可以重新進行同步代碼塊中執行


  WAITING : 等待狀態


  TIMED_WAITING: 調用sleep() join() wait()方法可能導致線程處於等待狀態


  TERMINATED: 線程執行完畢,已經退出


  1.3 Notify和Wait :


  Notify和Wait 的作用


  首先看源碼給出的解釋,這裡翻譯了一下:


  Notify:喚醒一個正在等待這個對象的線程監控。如果有任何線程正在等待這個對象,那麼它們中的一個被選擇被喚醒。選擇是任意的,發生在執行的酌情權。一個線程等待一個對象通過調用一個{@code wait}方法進行監視。


  Notify()需要在同步方法或同步塊中調用,即在調用前,線程也必須獲得該對象的對象級別鎖


  Wait:導致當前線程等待,直到另一個線程調用{@link java.lang.Object#notify()}方法或{@link java.lang.Object#notifyAll()}方法。


  換句話說,這個方法的行為就像它簡單一樣執行調用{@code wait(0)}。當前線程必須擁有該對象的監視器。


  線程釋放此監視器的所有權,並等待另一個線程通知等待該對象的監視器的線程,喚醒通過調用{@code notify}方法或{@code notifyAll}方法。然後線程等待,直到它可以重新取得監視器的所有權,然後繼續執行。


  Wait()的作用是使當前執行代碼的線程進行等待,它是Object類的方法,該方法用來將當前線程置入預執行隊列中,並且在Wait所在的代碼行處停止執行,直到接到通知或被中斷為止。


  在調用Wait方法之前,線程必須獲得該對象的對象級別鎖,即只能在同步方法或同步塊中調用Wait方法。


  Wait和Sleep的區別:


  它們最大本質的區別是,Sleep()不釋放同步鎖,Wait()釋放同步鎖。


  還有用法的上的不同是:Sleep(milliseconds)可以用時間指定來使他自動醒過來,如果時間不到你只能調用Interreput()來強行打斷;Wait()可以用Notify()直接喚起。


  這兩個方法來自不同的類分別是Thread和Object


  最主要是Sleep方法沒有釋放鎖,而Wait方法釋放了鎖,使得其他線程可以使用同步控制塊或者方法。


  1.4 Thread.sleep() 和Thread.yield()的異同


  相同 :Sleep()和yield()都會釋放CPU。


  不同:Sleep()使當前線程進入停滯狀態,所以執行Sleep()的線程在指定的時間內肯定不會執行;yield()只是使當前線程重新回到可執行狀態,所以執行yield()的線程有可能在進入到可執行狀態後馬上又被執行。Sleep()可使優先級低的線程得到執行的機會,當然也可以讓同優先級和高優先級的線程有執行的機會;yield()只能使同優先級的線程有執行的機會。


  1.5 補充:死鎖的概念


  死鎖:指兩個或兩個以上的進程(或線程)在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,它們都將無法推進下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱為死鎖進程。


  死鎖產生的四個必要條件(缺一不可):


  互斥條件:顧名思義,線程對資源的訪問是排他性,當該線程釋放資源後下一線程才可進行佔用。


  請求和保持:簡單來說就是自己拿的不放手又等待新的資源到手。線程T1至少已經保持了一個資源R1佔用,但又提出對另一個資源R2請求,而此時,資源R2被其他線程T2佔用,於是該線程T1也必須等待,但又對自己保持的資源R1不釋放。


  不可剝奪:在沒有使用完資源時,其他線性不能進行剝奪。


  循環等待:一直等待對方線程釋放資源。


  我們可以根據死鎖的四個必要條件破壞死鎖的形成。


  1.6 補充:併發和並行的區別


  併發:是指在某個時間段內,多任務交替的執行任務。當有多個線程在操作時,把CPU運行時間劃分成若干個時間段,再將時間段分配給各個線程執行。在一個時間段的線程代碼運行時,其它線程處於掛起狀。


  並行:是指同一時刻同時處理多任務的能力。當有多個線程在操作時,CPU同時處理這些線程請求的能力。


  區別就在於CPU是否能同時處理所有任務,併發不能,並行能。


  1.7 補充:線程安全三要素


  原子性:Atomic包、CAS算法、Synchronized、Lock。


  可見性:Synchronized、Volatile(不能保證原子性)。


  有序性:Happens-before規則。


  1.8 補充:如何實現線程安全


  互斥同步:Synchronized、Lock。


  非阻塞同步:CAS。


  無需同步的方案:如果一個方法本來就不涉及共享數據,那它自然就無需任何同步操作去保證正確性。


  1.9 補充:保證線程安全的機制:


  Synchronized關鍵字


  Lock


  CAS、原子變量


  ThreadLocl:簡單來說就是讓每個線程,對同一個變量,都有自己的獨有副本,每個線程實際訪問的對象都是自己的,自然也就不存在線程安全問題了。


  Volatile


  CopyOnWrite寫時複製


  隨著CPU核心的增多以及互聯網迅速發展,單線程的程序處理速度越來越跟不上發展速度和大數據量的增長速度,多線程應運而生,充分利用CPU資源的同時,極大提高了程序處理速度。


  創建線程的方法


  繼承Thread類:


  實現Runable接口:


  通過Callable和Future創建線程:


  publicclassCallableCreateTest{


  publicstaticvoidmain(String[] args)throwsException{


  // 將Callable包裝成FutureTask,FutureTask也是一種Runnable


  MyCallable callable =newMyCallable();


  FutureTask futureTask =newFutureTask(callable);


  newThread(futureTask).start();


  // get方法會阻塞調用的線程


  Integer sum = futureTask.get();


  System.out.println(Thread.currentThread().getName() + Thread.currentThread().getId() +"="+ sum);


  }


  }


  classMyCallableimplementsCallable{


  @Override


  publicIntegercall()throwsException{


  System.out.println(Thread.currentThread().getName() +"\\t"+ Thread.currentThread().getId() +"\\t"+newDate() +" \\tstarting...");


  intsum =;


  for(inti =; i


  sum += i;


  }


  Thread.sleep(5000);


  System.out.println(Thread.currentThread().getName() +"\\t"+ Thread.currentThread().getId() +"\\t"+newDate() +" \\tover...");


  returnsum;


  }


  }


  線程池方式創建:


  實現Runnable接口這種方式更受歡迎,因為這不需要繼承Thread類。在應用設計中已經繼承了別的對象的情況下,這需要多繼承(而Java不支持多繼承,但可以多實現啊),只能實現接口。同時,線程池也是非常高效的,很容易實現和使用。


  實際開發中,阿里巴巴開發插件一直提倡使用線程池創建線程,原因在下方會解釋,所以上面的代碼我就只簡寫了一些Demo。


  2.1 線程池創建線程


  線程池,顧名思義,線程存放的地方。和數據庫連接池一樣,存在的目的就是為了較少系統開銷,主要由以下幾個特點:


  降低資源消耗。通過重複利用已創建的線程降低線程創建和銷燬造成的消耗(主要)。


  提高響應速度。當任務到達時,任務可以不需要等到線程創建就能立即執行。


  提高線程的可管理性。線程是稀缺資源,如果無限制地創建,不僅會消耗系統資源,還會降低系統的穩定性。


  Java提供四種線程池創建方式:


  newCachedThreadPool創建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閒線程,若無可回收,則新建線程。


  newFixedThreadPool創建一個定長線程池,可控制線程最大併發數,超出的線程會在隊列中等待。


  newScheduledThreadPool創建一個定長線程池,支持定時及週期性任務執行。


  newSingleThreadExecutor創建一個單線程化的線程池,它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行。


  通過源碼我們得知ThreadPoolExecutor繼承自AbstractExecutorService,而AbstractExecutorService實現了ExecutorService。


  publicclassThreadPoolExecutorextendsAbstractExecutorService


  publicabstractclassAbstractExecutorServiceimplementsExecutorService


用300 行代碼帶你看懂 Java 多線程


  2.2 ThreadPoolExecutor介紹


  實際項目中,用的最多的就是ThreadPoolExecutor這個類,而《阿里巴巴Java開發手冊》中強制線程池不允許使用Executors去創建,而是通過New ThreadPoolExecutor實例的方式,這樣的處理方式讓寫的同學更加明確線程池的運行規則,規避資源耗盡的風險。


  我們從ThreadPoolExecutor入手多線程創建方式,先看一下線程池創建的最全參數。


  publicThreadPoolExecutor(intcorePoolSize,


  intmaximumPoolSize,


  longkeepAliveTime,


  TimeUnit unit,


  BlockingQueue workQueue,


  ThreadFactory threadFactory,


  RejectedExecutionHandler handler){


  if(corePoolSize


  maximumPoolSize


  maximumPoolSize


  keepAliveTime


  thrownewIllegalArgumentException();


  if(workQueue ==null|| threadFactory ==null|| handler ==null)


  thrownewNullPointerException();


  this.corePoolSize = corePoolSize;


  this.maximumPoolSize = maximumPoolSize;


  this.workQueue = workQueue;


  this.keepAliveTime = unit.toNanos(keepAliveTime);


  this.threadFactory = threadFactory;


  this.handler = handler;


  }


  參數說明如下:


  corePoolSize:線程池的核心線程數,即便線程池裡沒有任何任務,也會有corePoolSize個線程在候著等任務。


  maximumPoolSize:最大線程數,不管提交多少任務,線程池裡最多工作線程數就是maximumPoolSize。


  keepAliveTime:線程的存活時間。當線程池裡的線程數大於corePoolSize時,如果等了keepAliveTime時長還沒有任務可執行,則線程退出。


  Unit:這個用來指定keepAliveTime的單位,比如秒:TimeUnit.SECONDS。


  BlockingQueue:一個阻塞隊列,提交的任務將會被放到這個隊列裡。


  threadFactory:線程工廠,用來創建線程,主要是為了給線程起名字,默認工廠的線程名字:pool-1-thread-3。


  handler:拒絕策略,當線程池裡線程被耗盡,且隊列也滿了的時候會調用。


  2.2.1BlockingQueue


  對於BlockingQueue個人感覺還需要單獨拿出來說一下。


  BlockingQueue:阻塞隊列,有先進先出(注重公平性)和先進後出(注重時效性)兩種,常見的有兩種阻塞隊列:ArrayBlockingQueue和LinkedBlockingQueue


  隊列的數據結構大致如圖:


用300 行代碼帶你看懂 Java 多線程


  隊列一端進入,一端輸出。而當隊列滿時,阻塞。BlockingQueue核心方法:1. 放入數據put2. 獲取數據take。常見的兩種Queue:


  2.2.2 ArrayBlockingQueue


  基於數組實現,在ArrayBlockingQueue內部,維護了一個定長數組,以便緩存隊列中的數據對象,這是一個常用的阻塞隊列,除了一個定長數組外,ArrayBlockingQueue內部還保存著兩個整形變量,分別標識著隊列的頭部和尾部在數組中的位置。


  一段代碼來驗證一下:


  packagemap;


  importjava.util.concurrent.*;


  publicclassMyTestMap{


  // 定義阻塞隊列大小


  privatestaticfinalintmaxSize =5;


  publicstaticvoidmain(String[] args){


  ArrayBlockingQueuequeue=newArrayBlockingQueue(maxSize);


  newThread(newProductor(queue)).start();


  newThread(newCustomer(queue)).start();


  }


  }


  classCustomerimplementsRunnable{


  privateBlockingQueuequeue;


  Customer(BlockingQueuequeue) {


  this.queue=queue;


  }


  @Override


  publicvoidrun(){


  this.cusume();


  }


  privatevoidcusume(){


  while(true) {


  try{


  intcount = (int)queue.take();


  System.out.println("customer正在消費第"+ count +"個商品===");


  // 只是為了方便觀察輸出結果


  Thread.sleep(10);


  }catch(InterruptedException e) {


  e.printStackTrace();


  }


  }


  }


  }


  classProductorimplementsRunnable{


  privateBlockingQueuequeue;


  privateintcount =1;


  Productor(BlockingQueuequeue) {


  this.queue=queue;


  }


  @Override


  publicvoidrun(){


  this.product();


  }


  privatevoidproduct(){


  while(true) {


  try{


  queue.put(count);


  System.out.println("生產者正在生產第"+ count +"個商品");


  count++;


  }catch(InterruptedException e) {


  e.printStackTrace();


  }


  }


  }


  }


  //輸出如下


  /**


  生產者正在生產第1個商品


  生產者正在生產第2個商品


  生產者正在生產第3個商品


  生產者正在生產第4個商品


  生產者正在生產第5個商品


  customer正在消費第1個商品===


  */


  2.2.3 LinkedBlockingQueue


  基於鏈表的阻塞隊列,內部也維護了一個數據緩衝隊列。需要我們注意的是如果構造一個LinkedBlockingQueue對象,而沒有指定其容量大小。


  LinkedBlockingQueue會默認一個類似無限大小的容量(Integer.MAX_VALUE),這樣的話,如果生產者的速度一旦大於消費者的速度,也許還沒有等到隊列滿阻塞產生,系統內存就有可能已被消耗殆盡了。


  2.2.4 LinkedBlockingQueue和ArrayBlockingQueue的主要區別


  ArrayBlockingQueue的初始化必須傳入隊列大小,LinkedBlockingQueue則可以不傳入。


  ArrayBlockingQueue用一把鎖控制併發,LinkedBlockingQueue倆把鎖控制併發,鎖的細粒度更細。即前者生產者消費者進出都是一把鎖,後者生產者生產進入是一把鎖,消費者消費是另一把鎖。


  ArrayBlockingQueue採用數組的方式存取,LinkedBlockingQueue用Node鏈表方式存取。


  2.2.5handler拒絕策略


  Java提供了4種丟棄處理的方法,當然你也可以自己實現,主要是要實現接口:RejectedExecutionHandler中的方法。


  AbortPolicy:不處理,直接拋出異常。


  CallerRunsPolicy:只用調用者所在線程來運行任務,即提交任務的線程。


  DiscardOldestPolicy:LRU策略,丟棄隊列裡最近最久不使用的一個任務,並執行當前任務。


  DiscardPolicy:不處理,丟棄掉,不拋出異常。


  2.2.6線程池五種狀態


  privatestaticfinalintRUNNING = -1


  privatestaticfinalintSHUTDOWN =


  privatestaticfinalintSTOP =1


  privatestaticfinalintTIDYING =2


  privatestaticfinalintTERMINATED =3


  RUNNING:在這個狀態的線程池能判斷接受新提交的任務,並且也能處理阻塞隊列中的任務。


  SHUTDOWN:處於關閉的狀態,該線程池不能接受新提交的任務,但是可以處理阻塞隊列中已經保存的任務,在線程處於RUNNING狀態,調用shutdown()方法能切換為該狀態。


  STOP:線程池處於該狀態時既不能接受新的任務也不能處理阻塞隊列中的任務,並且能中斷現在線程中的任務。當線程處於RUNNING和SHUTDOWN狀態,調用shutdownNow()方法就可以使線程變為該狀態。


  TIDYING:在SHUTDOWN狀態下阻塞隊列為空,且線程中的工作線程數量為0就會進入該狀態,當在STOP狀態下時,只要線程中的工作線程數量為0就會進入該狀態。


  TERMINATED:在TIDYING狀態下調用terminated()方法就會進入該狀態。可以認為該狀態是最終的終止狀態。


  回到線程池創建ThreadPoolExecutor,我們瞭解了這些參數,再來看看ThreadPoolExecutor的內部工作原理:


用300 行代碼帶你看懂 Java 多線程


  判斷核心線程是否已滿,是進入隊列,否:創建線程


  判斷等待隊列是否已滿,是:查看線程池是否已滿,否:進入等待隊列


  查看線程池是否已滿,是:拒絕,否創建線程


  2.3深入理解ThreadPoolExecutor


  進入Execute方法可以看到:


  publicvoidexecute(Runnable command){


  if(command ==null)


  thrownewNullPointerException();


  intc = ctl.get();


  //判斷當前活躍線程數是否小於corePoolSize,如果小於,則調用addWorker創建線程執行任務


  if(workerCountOf(c)


  if(addWorker(command,true))


  return;


  c = ctl.get();


  }


  //如果不小於corePoolSize,則將任務添加到workQueue隊列。


  if(isRunning(c) && workQueue.offer(command)) {


  intrecheck = ctl.get();


  if(! isRunning(recheck) &&remove(command))


  reject(command);


  elseif(workerCountOf(recheck) ==)


  addWorker(null,false);


  }


  //如果放入workQueue失敗,則創建線程執行任務,如果這時創建線程失敗(當前線程數不小於maximumPoolSize時),就會調用reject(內部調用handler)拒絕接受任務。


  elseif(!addWorker(command,false))


  reject(command);


  }


  AddWorker方法:


  創建Worker對象,同時也會實例化一個Thread對象。在創建Worker時會調用threadFactory來創建一個線程。


  然後啟動這個線程。


  2.3.1線程池中CTL屬性的作用是什麼?


  看源碼第一反應就是這個CTL到底是個什麼東東?有啥用?一番研究得出如下結論:


  CTL屬性包含兩個概念:


  privatefinalAtomicInteger ctl =newAtomicInteger(ctlOf(RUNNING,));


  privatestaticintctlOf(intrs,intwc){returnrs | wc; }


  runState:即rs 表明當前線程池的狀態,是否處於Running,Shutdown,Stop,Tidying。


  workerCount:即wc表明當前有效的線程數。


  我們點擊workerCount即工作狀態記錄值,以RUNNING為例,RUNNING = -1


  privatestaticfinalintCOUNT_BITS = Integer.SIZE -3;


  既然是29位那麼就是Running的值為:


  11100000000000000000000000000000


  |||


  31~29位


  那低28位呢,就是記錄當前線程的總線數啦:


  // Packing and unpacking ctl


  privatestaticintrunStateOf(intc){returnc & ~CAPACITY; }


  privatestaticintworkerCountOf(intc){returnc & CAPACITY; }


  privatestaticintctlOf(intrs,intwc){returnrs | wc; }


  從上述代碼可以看到workerCountOf這個函數傳入ctl之後,是通過CTL&CAPACITY操作來獲取當前運行線程總數的。


  也就是RunningState|WorkCount&CAPACITY,算出來的就是低28位的值。因為CAPACITY得到的就是高3位(29-31位)位0,低28位(0-28位)都是1,所以得到的就是ctl中低28位的值。


  而runStateOf這個方法的話,算的就是RunningState|WorkCount&CAPACITY,高3位的值,因為CAPACITY是CAPACITY的取反,所以得到的就是高3位(29-31位)為1,低28位(0-28位)為0,所以通過&運算後,所得到的值就是高3為的值。


  簡單來說就是ctl中是高3位作為狀態值,低28位作為線程總數值來進行存儲。


  2.3.2 shutdownNow和shutdown的區別


  看源碼發現有兩種近乎一樣的方法,shutdownNow和shutdown,設計者這麼設計自然是有它的道理,那麼這兩個方法的區別在哪呢?


  shutdown會把線程池的狀態改為SHUTDOWN,而shutdownNow把當前線程池狀態改為STOP。


  shutdown只會中斷所有空閒的線程,而shutdownNow會中斷所有的線程。


  shutdown返回方法為空,會將當前任務隊列中的所有任務執行完畢;而shutdownNow把任務隊列中的所有任務都取出來返回。


  2.3.3 線程複用原理


  finalvoidrunWorker(Worker w){


  Thread wt = Thread.currentThread();


  Runnable task = w.firstTask;


  w.firstTask =null;


  w.unlock();// allow interrupts


  boolean completedAbruptly =true;


  try{


  while(task !=null|| (task = getTask()) !=null) {


  w.lock();


  // If pool is stopping, ensure thread is interrupted;


  // if not, ensure thread is not interrupted. This


  // requires a recheck in second case to deal with


  // shutdownNow race while clearing interrupt


  if((runStateAtLeast(ctl.get(), STOP) ||


  (Thread.interrupted() &&


  runStateAtLeast(ctl.get(), STOP))) &&


  !wt.isInterrupted())


  wt.interrupt();


  try{


  beforeExecute(wt, task);


  Throwable thrown =null;


  try{


  task.run();


  }catch(RuntimeException x) {


  thrown = x;throwx;


  }catch(Error x) {


  thrown = x;throwx;


  }catch(Throwable x) {


  thrown = x;thrownewError(x);


  }finally{


  afterExecute(task, thrown);


  }


  }finally{


  task =null;


  w.completedTasks++;


  w.unlock();


  }


  }


  completedAbruptly =false;


  }finally{


  processWorkerExit(w, completedAbruptly);


  }


  }


  就是任務在並不只執行創建時指定的firstTask第一任務,還會從任務隊列的中自己主動取任務執行,而且是有或者無時間限定的阻塞等待,以保證線程的存活。


  默認的是不允許。


  2.4 CountDownLatch和CyclicBarrier區別


  countDownLatch是一個計數器,線程完成一個記錄一個,計數器遞減,只能只用一次。


  CyclicBarrier的計數器更像一個閥門,需要所有線程都到達,然後繼續執行,計數器遞增,提供Reset功能,可以多次使用。


  3. 多線程間通信的幾種方式


  提及多線程又不得不提及多線程通信的機制。首先,要短信線程間通信的模型有兩種:共享內存和消息傳遞,以下方式都是基本這兩種模型來實現的。我們來基本一道面試常見的題目來分析:


  題目:有兩個線程A、B,A線程向一個集合裡面依次添加元素"abc"字符串,一共添加十次,當添加到第五次的時候,希望B線程能夠收到A線程的通知,然後B線程執行相關的業務操作。


  3.1使用volatile關鍵字


  packagethread;


  /**


  *


  *@authorhxz


  *@description多線程測試類


  *@version1.0


  *@data2020年2月15日 上午9:10:09


  */


  publicclassMyThreadTest{


  publicstaticvoidmain(String[] args)throwsException{


  notifyThreadWithVolatile();


  }


  /**


  * 定義一個測試


  */


  privatestaticvolatilebooleanflag =false;


  /**


  * 計算I++,當I==5時,通知線程B


  *@throwsException


  */


  privatestaticvoidnotifyThreadWithVolatile()throwsException{


  Thread thc =newThread("線程A"){


  @Override


  publicvoidrun(){


  for(inti =; i


  if(i ==5) {


  flag =true;


  try{


  Thread.sleep(500L);


  }catch(InterruptedException e) {


  // TODO Auto-generated catch block


  e.printStackTrace();


  }


  break;


  }


  System.out.println(Thread.currentThread().getName() +"===="+ i);


  }


  }


  };


  Thread thd =newThread("線程B") {


  @Override


  publicvoidrun(){


  while(true) {


  // 防止偽喚醒 所以使用了while


  while(flag) {


  System.out.println(Thread.currentThread().getName() +"收到通知");


  System.out.println("do something");


  try{


  Thread.sleep(500L);


  }catch(InterruptedException e) {


  // TODO Auto-generated catch block


  e.printStackTrace();


  }


  return;


  }


  }


  }


  };


  thd.start();


  Thread.sleep(1000L);


  thc.start();


  }


  }


  小編個人認為這是基本上最好的通信方式,因為A發出通知B能夠立馬接受並Do Something。


分享到:


相關文章: