編寫高效的PyTorch代碼技巧(下)

原文:https://github.com/vahidk/EffectivePyTorch

前言

這是一份 PyTorch 教程和最佳實踐筆記,目錄如下所示:

  1. PyTorch 基礎
  2. 將模型封裝為模塊
  3. 廣播機制的優缺點
  4. 使用好重載的運算符
  5. 採用 TorchScript 優化運行時間
  6. 構建高效的自定義數據加載類
  7. PyTorch 的數值穩定性

上篇文章的鏈接如下:

https://www.toutiao.com/i6822179592162247171/

這次介紹後面3點,寫出高效的代碼以及保證做數值計算時候的穩定性。

5. 採用 TorchScript 優化運行時間

PyTorch 優化了維度很大的張量的運算操作。在 PyTorch 中對小張量進行太多的運算操作是非常低效的。所以有可能的話,將計算操作都重寫為批次(batch)的形式,可以減少消耗和提高性能。而如果沒辦法自己手動實現批次的運算操作,那麼可以採用 TorchScript 來提升代碼的性能。

TorchScript 是一個 Python 函數的子集,但經過了 PyTorch 的驗證,PyTorch 可以通過其 just in time(jtt) 編譯器來自動優化 TorchScript 代碼,提高性能。

下面給出一個具體的例子。在機器學習應用中非常常見的操作就是 batch gather ,也就是 output[i] = input[i, index[i]]。其代碼實現如下所示:

<code>import torch
def batch_gather(tensor, indices):
    output = []
    for i in range(tensor.size(0)):
        output += [tensor[i][indices[i]]]
    return torch.stack(output)/<code>

通過 torch.jit.script 裝飾器來使用 TorchScript 的代碼:

<code>@torch.jit.script
def batch_gather_jit(tensor, indices):
    output = []
    for i in range(tensor.size(0)):
        output += [tensor[i][indices[i]]]
    return torch.stack(output)/<code>

這個做法可以提高 10% 的運算速度。

但更好的做法還是手動實現批次的運算操作,下面是一個向量化實現的代碼例子,提高了 100 倍的速度:

<code>def batch_gather_vec(tensor, indices):
    shape = list(tensor.shape)
    flat_first = torch.reshape(
        tensor, [shape[0] * shape[1]] + shape[2:])
    offset = torch.reshape(
        torch.arange(shape[0]).cuda() * shape[1],
        [shape[0]] + [1] * (len(indices.shape) - 1))
    output = flat_first[indices + offset]
    return output/<code>

6. 構建高效的自定義數據加載類

上一節介紹瞭如何寫出更加高效的 PyTorch 的代碼,但為了讓你的代碼運行更快,將數據更加高效加載到內存中也是非常重要的。幸運的是 PyTorch 提供了一個很容易加載數據的工具,即 DataLoader 。一個 DataLoader 會採用多個 workers 來同時將數據從 Dataset 類中加載,並且可以選擇使用 Sampler 類來對採樣數據和組成 batch 形式的數據。

如果你可以隨時訪問你的數據,那麼使用 DataLoader 會非常簡單:只需要繼承 Dataset 類別並實現 __getitem__ (讀取每個數據)和 __len__(返回數據集的樣本數量)這兩個方法。下面給出一個代碼例子,如何從給定的文件夾中加載圖片數據:

<code>import glob
import os
import random
import cv2
import torch

class ImageDirectoryDataset(torch.utils.data.Dataset):
    def __init__(path, pattern):
        self.paths = list(glob.glob(os.path.join(path, pattern)))

    def __len__(self):
        return len(self.paths)

    def __item__(self):
        path = random.choice(paths)
        return cv2.imread(path, 1)/<code>

比如想將文件夾內所有的 jpeg 圖片都加載,代碼實現如下所示:

<code>dataloader = torch.utils.data.DataLoader(ImageDirectoryDataset("/data/imagenet/*.jpg"), num_workers=8)
for data in dataloader:
    # do something with data/<code>

這裡採用了 8 個 workers 來並行的從硬盤中讀取數據。這個數量可以根據實際使用機器來進行調試,得到一個最佳的數量。

當你的數據都很大或者你的硬盤讀寫速度很快,採用DataLoader進行隨機讀取數據是可行的。但也可能存在一種情況,就是使用的是一個很慢的連接速度的網絡文件系統,請求單個文件的速度都非常的慢,而這可能就是整個訓練過程中的瓶頸。

一個更好的做法就是將數據保存為一個可以連續讀取的連續文件格式。例如,當你有非常大量的圖片數據,可以採用 tar 命令將其壓縮為一個文件,然後用 python 來從這個壓縮文件中連續的讀取圖片。要實現這個操作,需要用到 PyTorch 的 IterableDataset。創建一個 IterableDataset 類,只需要實現 __iter__ 方法即可。

下面給出代碼實現的例子:

<code>import tarfile
import torch

def tar_image_iterator(path):
    tar = tarfile.open(self.path, "r")
    for tar_info in tar:
        file = tar.extractfile(tar_info)
        content = file.read()
        yield cv2.imdecode(content, 1)
        file.close()
        tar.members = []
    tar.close()

class TarImageDataset(torch.utils.data.IterableDataset):
    def __init__(self, path):
        super().__init__()
        self.path = path

    def __iter__(self):
        yield from tar_image_iterator(self.path)/<code>

不過這個方法有一個問題,

當使用 DataLoader 以及多個 workers 讀取這個數據集的時候,會得到很多重複的數據:

<code>dataloader = torch.utils.data.DataLoader(TarImageDataset("/data/imagenet.tar"), num_workers=8)
for data in dataloader:
    # data contains duplicated items/<code>

這個問題主要是因為每個 worker 都會創建一個單獨的數據集的實例,並且都是從數據集的起始位置開始讀取數據。一種避免這個問題的辦法就是不是壓縮為一個tar 文件,而是將數據劃分成 num_workers 個單獨的 tar 文件,然後每個 worker 分別加載一個,代碼實現如下所示:

<code>class TarImageDataset(torch.utils.data.IterableDataset):
    def __init__(self, paths):
        super().__init__()
        self.paths = paths

    def __iter__(self):
        worker_info = torch.utils.data.get_worker_info()
        # For simplicity we assume num_workers is equal to number of tar files
        if worker_info is None or worker_info.num_workers != len(self.paths):
            raise ValueError("Number of workers doesn't match number of files.")
        yield from tar_image_iterator(self.paths[worker_info.worker_id])/<code>

所以使用例子如下所示:

<code>dataloader = torch.utils.data.DataLoader(
    TarImageDataset(["/data/imagenet_part1.tar", "/data/imagenet_part2.tar"]), num_workers=2)
for data in dataloader:
    # do something with data/<code>

這是一種簡單的避免重複數據的問題。而 tfrecord 則用了比較複雜的辦法來共享數據,具體可以查看:

https://github.com/vahidk/tfrecord

7. PyTorch 的數值穩定性

當使用任意一個數值計算庫,比如 NumPy 或者 PyTorch ,都需要知道一點,編寫數學上正確的代碼不一定會得到正確的結果,你需要確保這個計算是穩定的。

首先以一個簡單的例子開始。從數學上來說,對任意的非零 x ,都可以知道式子 是成立的。 但看看具體實現的時候,是不是總是正確的:

<code>import numpy as np

x = np.float32(1)

y = np.float32(1e-50)  # y would be stored as zero
z = x * y / y

print(z)  # prints nan/<code>

代碼的運行結果是打印 nan ,原因是 y 的數值對於 float32 類型來說非常的小,這導致它的實際數值是 0 而不是 1e-50。

另一種極端情況就是 y 非常的大:

<code>y = np.float32(1e39)  # y would be stored as inf
z = x * y / y

print(z)  # prints nan/<code>

輸出結果依然是 nan ,因為 y 太大而被存儲為 inf 的情況,對於 float32 類型來說,其範圍是 1.4013e-45 ~ 3.40282e+38,當超過這個範圍,就會被置為 0 或者 inf。

下面是如何查看一種數據類型的數值範圍:

<code>print(np.nextafter(np.float32(0), np.float32(1)))  # prints 1.4013e-45
print(np.finfo(np.float32).max)  # print 3.40282e+38/<code>

為了讓計算變得穩定,需要避免過大或者過小的數值。這看起來很容易,但這類問題是很難進行調試,特別是在 PyTorch 中進行梯度下降的時候。這不僅因為需要確保在前向傳播過程中的所有數值都在使用的數據類型的取值範圍內,還要保證在反向傳播中也做到這一點。

下面給出一個代碼例子,計算一個輸出向量的 softmax,一種不好的代碼實現如下所示:

<code>import torch

def unstable_softmax(logits):
    exp = torch.exp(logits)
    return exp / torch.sum(exp)

print(unstable_softmax(torch.tensor([1000., 0.])).numpy())  # prints [ nan, 0.]/<code>

這裡計算 logits 的指數數值可能會得到超出 float32 類型的取值範圍,即過大或過小的數值,這裡最大的 logits 數值是 ln(3.40282e+38) = 88.7,超過這個數值都會導致 nan 。

那麼應該如何避免這種情況,做法很簡單。因為有 ,也就是我們可以對 logits 減去一個常量,但結果保持不變,所以我們選擇logits 的最大值作為這個常數,這種做法,指數函數的取值範圍就會限制為 [-inf, 0] ,然後最終的結果就是 [0.0, 1.0] 的範圍,代碼實現如下所示:

<code>import torch

def softmax(logits):
    exp = torch.exp(logits - torch.reduce_max(logits))
    return exp / torch.sum(exp)

print(softmax(torch.tensor([1000., 0.])).numpy())  # prints [ 1., 0.]/<code>

接下來是一個更復雜點的例子。

假設現在有一個分類問題。我們採用 softmax 函數對輸出值 logits 計算概率。接著定義採用預測值和標籤的交叉熵作為損失函數。對於一個類別分佈的交叉熵可以簡單定義為 :

所以有一個不好的實現交叉熵的代碼實現為:

<code>def unstable_softmax_cross_entropy(labels, logits):
    logits = torch.log(softmax(logits))
    return -torch.sum(labels * logits)

labels = torch.tensor([0.5, 0.5])
logits = torch.tensor([1000., 0.])

xe = unstable_softmax_cross_entropy(labels, logits)

print(xe.numpy())  # prints inf/<code>

在上述代碼實現中,當 softmax 結果趨向於 0,其 log 輸出會趨向於無窮,這就導致計算結果的不穩定性。所以可以對其進行重寫,將 softmax 維度拓展並做一些歸一化的操作:

<code>def softmax_cross_entropy(labels, logits, dim=-1):
    scaled_logits = logits - torch.max(logits)
    normalized_logits = scaled_logits - torch.logsumexp(scaled_logits, dim)
    return -torch.sum(labels * normalized_logits)

labels = torch.tensor([0.5, 0.5])
logits = torch.tensor([1000., 0.])

xe = softmax_cross_entropy(labels, logits)

print(xe.numpy())  # prints 500.0/<code> 

可以驗證計算的梯度也是正確的:

<code>logits.requires_grad_(True)
xe = softmax_cross_entropy(labels, logits)
g = torch.autograd.grad(xe, logits)[0]
print(g.numpy())  # prints [0.5, -0.5]/<code>

這裡需要再次提醒,進行梯度下降操作的時候需要額外的小心謹慎,需要確保每個網絡層的函數和梯度的範圍都在合法的範圍內,指數函數和對數函數在不正確使用的時候都可能導致很大的問題,它們都能將非常小的數值轉換為非常大的數值,或者從很大變為很小的數值。


分享到:


相關文章: