土法炮製:Embedding 層是如何實現的?

本文將介紹另一個典型層的實現:Embedding。

使用 Keras 的做法

同樣,按照老規矩,先看看 Keras 的 Embedding 使用,然後再用 TF 實現。這裡的示例也同樣來自於一個常見的例子,imdb 評論分類。

<code>import tensorflow as tffrom tensorflow.keras.datasets import imdbfrom tensorflow.keras import preprocessingmax_features = 10000maxlen = 20# 加載數據(x_train, y_train), (x_test, y_test) = imdb.load_data(num_words=max_features)# 預處理數據,將數據分割成相等長度x_train = preprocessing.sequence.pad_sequences(x_train, maxlen=maxlen)x_test = preprocessing.sequence.pad_sequences(x_test, maxlen=maxlen)# 模型定義model = tf.keras.models.Sequential([  tf.keras.layers.Embedding(max_features, 8, input_length=maxlen),  tf.keras.layers.Flatten(),  tf.keras.layers.Dense(1, activation='sigmoid')])model.compile(optimizer='rmsprop',              loss='binary_crossentropy',              metrics=['accuracy'])# 訓練model.fit(x_train, y_train, epochs=10, batch_size=32, validation_split=0.2)# 預測print(y_train[:3])print(model.predict_classes(x_test[:3]).flatten())/<code>

使用 TF 實現自定義結構

Embedding 本質上是一個全連接層,形式上,可以將其視為一個索引表,每一行對應一個單詞相應的稠密向量,如下圖(摘自 What the heck is Word Embedding ):

土法炮製:Embedding 層是如何實現的?

通常,會採用單詞在文件中的索引來編碼,因此從效果上來講,one-hot 後跟一個全連接層等價於一個 embedding 層,因為相乘之後,得到值就是單詞對應的向量,如下圖(摘自 What the heck is Word Embedding ):

土法炮製:Embedding 層是如何實現的?

介紹完基本情況,那麼接下來就看看如何實現吧。

MyDense

跟前幾篇的代碼差不多,換了一個激活函數。

<code>class MyDense(Layer):    def __init__(self, units=32):        super(MyDense, self).__init__()        self.units = units    def build(self, input_shape):        self.w = self.add_weight(shape=(input_shape[-1], self.units),            initializer='random_normal', trainable=True)        self.b = self.add_weight(shape=(self.units,),            initializer='random_normal', trainable=True)    def call(self, inputs):        return tf.nn.sigmoid(tf.matmul(inputs, self.w) + self.b)/<code>

MyEmbedding

自定義 Embedding 層,注意:

  • 繼承 Layer 的規範,實現 build 和 call 。
  • build 負責初始化權重和偏置,來自 Layer 。注意它們的 shape:權重:單詞個數 x Embedding 維數偏置:無,因為不需要
  • call 負責計算(返回對應 word index 的權重),來自 Layer 。
<code>class MyEmbedding(Layer):        def __init__(self, input_unit, output_unit):        super(MyEmbedding, self).__init__()        self.input_unit = input_unit        self.output_unit = output_unit    def build(self, input_shape):        self.embedding = self.add_weight(shape=(self.input_unit, self.output_unit),            initializer='random_normal', trainable=True)    def call(self, inputs):        return tf.nn.embedding_lookup(self.embedding, inputs)/<code>

MyFlatten

跟上一篇差不多,shape 變了而已。

<code>class MyFlatten(Layer):        def __init__(self):        super(MyFlatten, self).__init__()        def call(self, inputs):        shape = inputs.get_shape().as_list()        return tf.reshape(inputs, [shape[0], shape[1] * shape[2]])/<code>

MyModel

與前文代碼差別不大,在更新準確率之前需要對預測值做變換。

<code>class MyModel(Layer):    def __init__(self, layers):        super(MyModel, self).__init__()        self.layers = layers    def call(self, inputs):        x = self.layers[0](inputs)        for layer in self.layers[1:-1]:            x = layer(x)        return self.layers[-1](x)        def train(self, x_train, y_train, epochs = 5):        loss = tf.keras.losses.BinaryCrossentropy()        optimizer = tf.keras.optimizers.RMSprop()        accuracy = tf.keras.metrics.Accuracy()        dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))        dataset = dataset.shuffle(buffer_size=1024).batch(64)        for epoch in range(epochs):            for step, (x, y) in enumerate(dataset):                with tf.GradientTape() as tape:                    # Forward pass.                    y_pred = model(x)                                        # Loss value for this batch.                    loss_value = loss(y, y_pred)                    # Get gradients of loss wrt the weights.                    gradients = tape.gradient(loss_value, model.trainable_weights)                    # Update the weights of our linear layer.                    optimizer.apply_gradients(zip(gradients, model.trainable_weights))                                        # Update the running accuracy.                    accuracy.update_state(y, tf.cast(y_pred >= 0.5, dtype=tf.int64))                print('Epoch:', epoch, ', Loss from last epoch: %.3f' % loss_value, ', Total running accuracy so far: %.3f' % accuracy.result(), end='\\r')            print('\\n')/<code> 

搞定,接下來就看看效果了:

<code># 定義model = MyModel([    MyEmbedding(max_features, 8),    MyFlatten(),    MyDense(1)])# 訓練model.train(x_train, y_train, 10)# 預測print(y_train[:20])print(tf.cast(model(x_test[:20]) >= 0.5, dtype=tf.int64).numpy().flatten())/<code>

就結果而言,稍微有點過擬合,但基本可用了。


分享到:


相關文章: