如何區分IO密集型、CPU密集型任務?

前言

日常開發中,我們時常會聽到什麼IO密集型、CPU密集型任務...

那麼這裡提一個問題:大家知道什麼樣的任務或者代碼會被認定為IO/CPU密集?又是用什麼樣的標準來認定IO/CPU密集?

如果你沒有明確的答案,那麼就隨著這篇文章一起來聊一聊吧。

正文

最近團隊裡有基礎技術的同學對項目中的線程池進行了重新設計,調整了IO線程池等線程池的優化。因此藉助這個機會也就瞭解了一波開篇的那些問題。

一、宏觀概念區分

這一部分經驗豐富的同學都很熟悉。比如:

1.1、IO密集型任務

一般來說:文件讀寫、DB讀寫、網絡請求等

1.2、CPU密集型任務

一般來說:計算型代碼、Bitmap轉換、Gson轉換等

二、用代碼區分

上一part都是咱們憑藉經驗劃分的,這一part咱們就來用正經的指標來劃分任務。

先看有哪些數據指標可以用來進行評估(以下方法以系統日誌為準,加之開發經驗為輔):

1. wallTime

任務的整體運行時長(包括了running + runnable + sleep等所有時長)。獲取方案:

<code>run() {    
  long start = System.currentTimeMillis();    
  // 業務代碼    
  long wallTime = System.currentTimeMillis() - start;
}/<code>

2. cpuTime

cputime是任務真正在cpu上跑的時長,即為running時長

獲取方案1:

<code>run() {    
  long start = SystemClock.currentThreadTimeMillis();    
  // 業務代碼    
  long cpuTime = SystemClock.currentThreadTimeMillis() - start;
}/<code>

獲取方案2:

<code>/proc/pid/task/tid/schedse.sum_exec_runtime CPU上的運行時長/<code>
如何區分IO密集型、CPU密集型任務?

3. iowait time/count

指線程的iowait耗時。獲取方案:

<code>/proc/pid/task/tid/sched

se.statistics.iowait_sum IO等待累計時間
se.statistics.iowait_count IO等待累計次數/<code>

具體日誌位置同上

4. runnable time

線程runnabel被調度的時長。獲取方案:

<code>/proc/pid/task/tid/sched

se.statistics.wait_sum 就緒隊列等待累計時間/<code>

具體日誌位置同上

5. sleep time

線程阻塞時長(包括Interruptible-sleep和Uninterruptible-sleep和iowait的時長)。獲取方案:

<code>/proc/pid/task/tid/sched

se.statistics.sum_sleep_runtime 阻塞累計時間/<code>

具體日誌位置同上

6. utime/stime

utime是線程在用戶態運行時長,stime是線程在內核態運行時長。獲取方案:

<code>/proc/pid/task/tid/stat

第14個字段是utime,第15個字段是stime/<code>


如何區分IO密集型、CPU密集型任務?

7. rchar/wchar

wchar是write和pwrite函數寫入的byte數。獲取方案:

<code>/proc/pid/task/tid/io

rchar: ...wchar: .../<code>

(沒找到合適的日誌,暫不討論此情況)基於讀寫char數,我們可以將IO細分成讀IO密集型寫IO密集型

8. page_fault

缺頁中斷次數,分為major/minor fault。獲取方案:

<code>/proc/pid/task/tid/stat

第10個字段是minor_fault,第12個字段是major_fault/<code>


如何區分IO密集型、CPU密集型任務?


9. ctx_switches

線程在用戶/內核態的切換次數,分為voluntary和involuntary兩種切換。獲取方案:

<code>/proc/pid/task/tid/sched

nr_switches 總共切換次數
nr_voluntary_switches 自願切換次數
nr_involuntary_switches 非自願切換次數/<code>

日誌位置同上

10. percpuload

平均每個cpu的執行時長。獲取方案:

<code>/proc/pid/task/tid/sched

avg_per_cpu/<code>

日誌位置同上

有了上述這些指標,我們就可以開始我們的任務確定了。

以下內容,大家可以自行測試加深印象。

2.1、IO密集型任務

比如這段代碼:

<code>val br = BufferedReader(FileReader("xxxx"), 1024)
try {    
  while (br.readLine() != null) {
  }
} finally {    
  if (br != null) {        
    br.close()    
  }
}/<code>

基於上述部分3. iowait time/count,我們可以在對應的日誌文件中看出這段代碼有明顯的iowait

2.2、CPU密集型任務

比如這段代碼:

<code>var n = 0.0
for (i in 0..9999999) {    
  n = Math.cos(i.toDouble()
              )}/<code>

基於上述部分6. utime/stime的內容,看一看出這段代碼utime會佔比非常高,且幾乎沒有stime,此外沒有io相關的耗時。

三、這玩意有啥用?

說白了,我們一切的優化手段都是為了服務於業務。對於業務開發來說:

為了不佔用主線程 -> 所以啟一個新線程 -> 頻繁的new線程又會帶來大量的開銷 -> 所以使用線程池進行復用 -> 而不合理的線程池設計又會帶來線程使用低效,甚至新加入的任務只能等待 -> 優化線程池

舉個最簡單的例子:線程池中放了最大允許倆個線程並行,那麼假設運行中的倆個都是長IO的任務。那麼新來的任務就只能等,哪怕它並不是特別耗時...

因此這玩意有啥用,還不是為更好的線程池設計做指導思想,更好的提升線程運行效率,降低業務上不必要的等待。

這裡提供一些可供參考的工具方法和線程池設計:

3.1、判斷任務類型

這裡貼一些核心的思路,畢竟全部方案數據公司的代碼,我也不方便全部貼出來:

<code>class TaskInfo {    
  var cpuTimeStamp = 0.0    
 var timeStamp = 0.0    var iowaitTime = 0.0    var sleepTime = 0.0    var runnableTime = 0.0    var totalSwitches = 0.0    var voluntarySwitches = 0.0}/<code>
<code>object TaskInfoUtils {    
  private const val SUM_SLEEP_RUNTIME = "se.statistics.sum_sleep_runtime"    
  private const val WAIT_SUM = "se.statistics.wait_sum"    
  private const val IOWAIT_SUM = "se.statistics.iowait_sum"    
  private const val NR_SWITCHES = "nr_switches "    
  private const val NR_VOLUNTARY_SWITCHES = "nr_voluntary_switches"    
  private var schedPath = ThreadLocal()    
  
  fun buildCurTaskInfo(): TaskInfo {        
    val threadInfo = TaskInfo()        
    threadInfo.timeStamp = System.currentTimeMillis().toDouble()        
    threadInfo.cpuTimeStamp = SystemClock.currentThreadTimeMillis().toDouble()        
    if (schedPath.get() == null) {            
      schedPath.set("/proc/${android.os.Process.myPid()}/task/${getTid()}/sched")        
    }        
    BufferedReader(FileReader(schedPath.get()), 
                   READ_BUFFER_SIZE).use { br ->            br.readLines().forEach { line ->                when {                    line.startsWith(SUM_SLEEP_RUNTIME) -> threadInfo.sleepTime = line.split(":")[1].toDouble()                    line.startsWith(WAIT_SUM) -> threadInfo.runnableTime = line.split(":")[1].toDouble()                    line.startsWith(IOWAIT_SUM) -> threadInfo.iowaitTime = line.split(":")[1].toDouble()                    line.startsWith(NR_SWITCHES) -> threadInfo.totalSwitches = line.split(":")[1].toDouble()                    line.startsWith(NR_VOLUNTARY_SWITCHES) -> threadInfo.voluntarySwitches = line.split(":")[1].toDouble()                }            }        }        return threadInfo    }}/<code>
<code>object TaskBoundJudge {
    private const val CPU_CPUTIME_INTERVAL = 0.8
    private const val CPU_SWITCHES_INTERVAL = 0.1
    private const val CPU_IOWAIT_INTERVAL = 0.01
    private const val CPU_SLEEP_INTERVAL = 0.02
    private const val CPU_CPUTIME_WEIGHTS = 0.1
    private const val CPU_SWITCHES_WEIGHTS = 0.35
    private const val CPU_IOWAIT_WEIGHTS = 0.15
    private const val CPU_SLEEP_WEIGHTS = 0.40

    private const val IO_CPUTIME_INTERVAL = 0.5
    private const val IO_SWITCHES_INTERVAL = 0.4
    private const val IO_IOWAIT_INTERVAL = 0.1
    private const val IO_SLEEP_INTERVAL = 0.15
    private const val IO_CPUTIME_WEIGHTS = 0.1
    private const val IO_SWITCHES_WEIGHTS = 0.35
    private const val IO_IOWAIT_WEIGHTS = 0.35
    private const val IO_SLEEP_WEIGHTS = 0.2

    fun isCpuTask(start: TaskInfo?, end: TaskInfo?): Boolean {
        if (start == null || end == null) {
            return false
        }
        val wallTime  = end.timeStamp - start.timeStamp
        val cpuTime = end.cpuTimeStamp - start.cpuTimeStamp
        val runnableTime = end.runnableTime - start.runnableTime
        val totalSwitches = end.totalSwitches - start.totalSwitches
        val voluntarySwitches = end.voluntarySwitches - start.voluntarySwitches
        val iowaitTime = end.iowaitTime - start.iowaitTime
        val sleepTime = end.sleepTime - start.sleepTime
        var result = 0.0
        if (cpuTime / (wallTime - runnableTime) > CPU_CPUTIME_INTERVAL) {
            result += CPU_CPUTIME_WEIGHTS
        }
        if (voluntarySwitches / totalSwitches < CPU_SWITCHES_INTERVAL) {
            result += CPU_SWITCHES_WEIGHTS
        }
        if (iowaitTime / sleepTime < CPU_IOWAIT_INTERVAL) {
            result += CPU_IOWAIT_WEIGHTS
        }
        if (sleepTime / (wallTime - runnableTime) < CPU_SLEEP_INTERVAL) {
            result += CPU_SLEEP_WEIGHTS
        }
        return result > 0.5
    }

    fun isIOTask(start: TaskInfo?, end: TaskInfo?): Boolean {
        if (start == null || end == null) {
            return false
        }
        val wallTime = end.timeStamp - start.timeStamp
        val cpuTime = end.cpuTimeStamp - start.cpuTimeStamp
        val runnableTime = end.runnableTime - start.runnableTime
        val totalSwitches = end.totalSwitches - start.totalSwitches
        val voluntarySwitches = end.voluntarySwitches - start.voluntarySwitches
        val iowaitTime = end.iowaitTime - start.iowaitTime
        val sleepTime = end.sleepTime - start.sleepTime

        var result = 0.0
        if (cpuTime / (wallTime - runnableTime) < IO_CPUTIME_INTERVAL) {
            result += IO_CPUTIME_WEIGHTS
        }
        if (voluntarySwitches / totalSwitches > IO_SWITCHES_INTERVAL) {
            result += IO_SWITCHES_WEIGHTS
        }
        if (iowaitTime / sleepTime > IO_IOWAIT_INTERVAL) {
            result += IO_IOWAIT_WEIGHTS
        }
        if (sleepTime / (wallTime - runnableTime) > IO_SLEEP_INTERVAL) {
            result += IO_SLEEP_WEIGHTS
        }
        return result > 0.5
    }
}/<code>

當我們想對某個方法進行計算是CPU還是IO。可以在這個方法的開始、結束調用 TaskInfoUtils.buildCurTaskInfo();然後調用 TaskBoundJudge.isCpuTask(start,end), TaskBoundJudge.isIOTask(start,end)即可。

3.2、線程池

IO密集型參考線程池:

<code>public static final ExecutorService IO_EXECUTOR = new ThreadPoolExecutor(
    2, 
    128, 
    15, 
    TimeUnit.SECONDS, 
    new SynchronousQueue<>(), 
    new CustomThreadFactory("MDove-IO", CustomThreadPriority.NORMAL),
    AbortPolicy() // 根據業務情況,自行定義拒絕實現。比如上報監控平臺
);/<code>

CPU密集型參考線程池:

<code>public static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
public static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
private static final int CPU_CORE_POOL_SIZE = Math.max(Math.min(MAXIMUM_POOL_SIZE, 4), Math.min(CPU_COUNT + 1, 9));

public static final ExecutorService CPU_EXECUTOR = new ThreadPoolExecutor(
    CPU_CORE_POOL_SIZE,
    CPU_COUNT * 2 + 1,
    30, 
    TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(256),
    new SSThreadFactory("MDove-CPU", CustomThreadPriority.NORMAL),
    AbortPolicy() // 根據業務情況,自行定義拒絕實現。比如上報監控平臺
);/<code>

上述線程池中設計的額外代碼:

<code>class CustomThreadFactory : ThreadFactory {
    var name: String
        private set
    private var priority = CustomThreadPriority.NORMAL

    constructor(name: String, priority: CustomThreadPriority) {
        this.name = name
        this.priority = priority
    }

    override fun newThread(r: Runnable): Thread {
        val name = name + "-" + sCount.incrementAndGet()
        return object : Thread(r, name) {
            override fun run() {
                if (priority == CustomThreadPriority.LOW) {
                    Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND)
                } else if (priority == CustomThreadPriority.HIGH) {
                    Process.setThreadPriority(Process.THREAD_PRIORITY_DISPLAY)
                }
                super.run()
            }
        }
    }

    companion object {
        private val sCount = AtomicInteger(0)
    }
}

enum class CustomThreadPriority {
    LOW, NORMAL, HIGH, IMMEDIATE
}/<code>

尾聲

OK,這篇文章到這裡就結束了。希望這篇文章能給大家在線程的使用線程池的設計上帶來幫助。

最後,讓我們一起加油吧,“打工人”!


分享到:


相關文章: