Sparse Machine Learning with TensorFlow

Background: the State of Sparse Support in TensorFlow

On the machine-learning side, the Estimator abstraction covers most dense scenarios well, but for sparse scenarios there is little guidance, whether from the official demos or from industry heavyweights. Tools such as libfm, libffm, and XGBoost can read data directly in libsvm or field-aware libsvm format, but TensorFlow ships no ready-made implementation you can just call; you have to build it yourself on top of the framework. Fortunately, Estimator supports both a custom input_fn and a custom model_fn. In my spare time over the past while I worked through this and implemented a fairly efficient libsvm-based pipeline for Sparse Logistic Regression and Sparse Factorization Machine, covering everything from data ingestion and model training to deployment on TensorFlow Serving.

The sparse_tensor Implementation in TensorFlow

Reading the source in sparse_tensor.py, it is easy to see that SparseTensor is a high-level wrapper in TensorFlow made of three parts: indices, values, and shape. This detail turns out to matter: later in practice I hit a big pitfall that is solved precisely through this decomposition, but I'll keep you in suspense for now.
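To make the three-part structure concrete, here is a minimal sketch using the TF 1.x API; the indices and values are made up for illustration:

<code>import tensorflow as tf

# A 2x10 matrix with only two non-zero entries: (0, 1) -> 3.0 and (1, 4) -> 5.0.
sp = tf.SparseTensor(indices=[[0, 1], [1, 4]],
                     values=[3.0, 5.0],
                     dense_shape=[2, 10])
dense = tf.sparse_tensor_to_dense(sp)

with tf.Session() as sess:
    print(sess.run(dense))  # two rows, mostly zeros</code>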

The Benefits of Sparse Representation

Common sparse-matrix representations include CSC and CSR, used in many matrix-computation libraries: scipy, which most Python users know, and Eigen, the computation module underneath TensorFlow, both represent sparse matrices this way. As an example, suppose a merchant has 5,000,000 items; the items a given user actually interacts with are necessarily far fewer. Stored densely, a single user's item-behavior data needs 5,000,000 values, whereas a sparse representation only needs space proportional to the number of items the user touched. As the figure below shows, the behavior data of 100 users over 5,000,000 items takes roughly 3 GB when stored densely:

[Screenshot: memory footprint of the dense 100 x 5,000,000 representation]

That is 100 * 5,000,000 ints to store. Using a csc_matrix instead,

<code>import numpy as np
from scipy.sparse import csc_matrix

# 100 users, each with one non-zero entry in a 100 x 5,000,000 matrix.
row = np.array(range(100))
col = np.zeros(100)
data = np.ones(100)
sparse = csc_matrix((data, (row, col)), shape=(100, 5000000))</code>

we only need to store on the order of 3 * NNZ values (here NNZ is 100) plus the shape information, a huge reduction in space. In memory we typically use CSC to represent the sparse matrix, while for saved samples the libsvm format is the usual choice.
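As a quick sanity check on the saving, scipy exposes its internal buffers directly; a small sketch (note that csc's indptr array scales with the column count, so for a matrix this wide csr, whose indptr scales with the 100 rows, is tighter still):

<code>import numpy as np
from scipy.sparse import csc_matrix

m = csc_matrix((np.ones(100), (np.array(range(100)), np.zeros(100))),
               shape=(100, 5000000))
sparse_bytes = m.data.nbytes + m.indices.nbytes + m.indptr.nbytes
dense_bytes = 100 * 5000000 * 8  # assuming 8 bytes per stored value
print("sparse: %d bytes vs dense: %.1f GB" % (sparse_bytes, dense_bytes / 1024.0 ** 3))</code>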

[Screenshot: sample lines in libsvm format]

The separator is a space; the label comes first (1 here), followed by the features, each written as feature_id:feature_val. In TensorFlow we could parse this text with a custom input_fn over TextLineDataset, and many other technical articles cover that; but as a programmer I'd rather not walk a road already walked, and TF officially advertises tfrecord's read/write efficiency, so with performance in mind I use TFRecordDataset for data ingestion here.
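For reference, one libsvm line looks like this (the ids and values here are invented):

<code>1 5:1.0 37:0.5 20216830:2.0</code>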

LibSVM To TFRecord

Parsing the libsvm feature_ids and feature_vals is straightforward, nothing much to say, so I'll just paste the code. If you want to go deeper, read TF's example.proto and feature.proto; they make the logic of Example and Feature clear, so you aren't just blindly copying what others wrote.

<code>import codecs
import tensorflow as tf
import logging

logger = logging.getLogger("TFRecSYS")
sh = logging.StreamHandler(stream=None)
logger.setLevel(logging.DEBUG)
fmt = "%(asctime)-15s %(levelname)s %(filename)s %(lineno)d %(process)d %(message)s"
datefmt = "%a %d %b %Y %H:%M:%S"
formatter = logging.Formatter(fmt, datefmt)
sh.setFormatter(formatter)
logger.addHandler(sh)


class LibSVM2TFRecord(object):
    def __init__(self, libsvm_filenames, tfrecord_filename,
                 info_interval=10000, tfrecord_large_line_num=10000000):
        self.libsvm_filenames = libsvm_filenames
        self.tfrecord_filename = tfrecord_filename
        self.info_interval = info_interval
        self.tfrecord_large_line_num = tfrecord_large_line_num

    def set_transform_files(self, libsvm_filenames, tfrecord_filename):
        self.libsvm_filenames = libsvm_filenames
        self.tfrecord_filename = tfrecord_filename

    def fit(self):
        logger.info(self.libsvm_filenames)
        writer = tf.python_io.TFRecordWriter(self.tfrecord_filename + ".tfrecord")
        tfrecord_num = 1
        for libsvm_filename in self.libsvm_filenames:
            logger.info("Begin to process {0}".format(libsvm_filename))
            with codecs.open(libsvm_filename, mode='r', encoding='utf-8') as fread:
                line_num = 0
                for line in fread:
                    line_num += 1
                    if line_num % self.info_interval == 0:
                        logger.info("Processing the {0} line sample".format(line_num))
                    # Roll over to a new tfrecord file every tfrecord_large_line_num lines.
                    if line_num % self.tfrecord_large_line_num == 0:
                        writer.close()
                        self.tfrecord_filename = self.tfrecord_filename.split("_")[0] + "_%05d.tfrecord" % tfrecord_num
                        writer = tf.python_io.TFRecordWriter(self.tfrecord_filename)
                        tfrecord_num += 1
                        logger.info("Change the tfrecord file to {0}".format(self.tfrecord_filename))
                    feature_ids = []
                    vals = []
                    line_components = line.strip().split(" ")
                    try:
                        label = float(line_components[0])
                        features = line_components[1:]
                    except (IndexError, ValueError):
                        logger.info("Parse error, line: {0}".format(line))
                        continue
                    for feature in features:
                        feature_components = feature.split(":")
                        try:
                            feature_id = int(feature_components[0])
                            val = float(feature_components[1])
                        except IndexError:
                            logger.info("Index Error, feature: {0}".format(feature))
                            continue
                        except ValueError:
                            logger.info("Value Error, feature_components[0]: {0}".format(feature_components[0]))
                            continue
                        feature_ids.append(feature_id)
                        vals.append(val)
                    tfrecord_feature = {
                        "label": tf.train.Feature(float_list=tf.train.FloatList(value=[label])),
                        "feature_ids": tf.train.Feature(int64_list=tf.train.Int64List(value=feature_ids)),
                        "feature_vals": tf.train.Feature(float_list=tf.train.FloatList(value=vals))
                    }
                    example = tf.train.Example(features=tf.train.Features(feature=tfrecord_feature))
                    writer.write(example.SerializeToString())
            logger.info("libsvm: {0} transform to tfrecord: {1} successfully".format(libsvm_filename, self.tfrecord_filename))
        writer.close()


if __name__ == "__main__":
    libsvm_to_tfrecord = LibSVM2TFRecord(["../../data/kdd2010/kdda.libsvm"], "../../data/kdd2010/kdda")
    libsvm_to_tfrecord.fit()</code>

After conversion, the tfrecord file is usually somewhat larger than the original. For details of the format, see https://cloud.tencent.com/developer/article/1088751, which walks through converting to and parsing tfrecord quite thoroughly. As for the shuffle buffer size question, I don't think it's a big deal: in recommendation scenarios there are many samples but the memory cost is actually small, the only effect being a fairly long load-and-parse time before the run starts. Another question everyone will ask: why is tfrecord faster than writing your own input_fn over text files? Here I can only speculate at a shallow level; I haven't pulled that code apart to read it, so I won't offer an answer. Readers who have studied the source and understand it more deeply are welcome to explain.

Parsing TFRecord


I've read some of the source behind the several tfrecord parsing formats, and it's still a bit of a muddle; roughly, the code currently seems to support VarLenFeature, SparseFeature, FixedLenFeature, and FixedLenSequenceFeature, but the API docs are rather ambiguous about SparseFeature, so I chose the VarLenFeature approach. I don't yet know how SparseFeature is meant to be used; that deserves a closer look when I have time.

Then, a quick demo of reading the records back:

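This is a minimal sketch, assuming the feature names written by LibSVM2TFRecord above; the serialized records are batched first so that tf.parse_example returns batched SparseTensors:

<code>import tensorflow as tf

def input_fn(tfrecord_filenames, batch_size=256, num_epochs=1):
    spec = {
        "label": tf.FixedLenFeature([], tf.float32),
        "feature_ids": tf.VarLenFeature(tf.int64),
        "feature_vals": tf.VarLenFeature(tf.float32),
    }

    def parse(batch_serialized):
        parsed = tf.parse_example(batch_serialized, spec)
        # feature_ids / feature_vals come back as SparseTensors.
        return {"feature_ids": parsed["feature_ids"],
                "feature_vals": parsed["feature_vals"]}, parsed["label"]

    dataset = (tf.data.TFRecordDataset(tfrecord_filenames)
               .shuffle(buffer_size=10000)
               .repeat(num_epochs)
               .batch(batch_size)
               .map(parse))
    return dataset.make_one_shot_iterator().get_next()

features, labels = input_fn(["../../data/kdd2010/kdda.tfrecord"])
with tf.Session() as sess:
    print(sess.run(features))</code>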

Run it yourself; if you look closely you'll notice some interesting things, for instance that what comes out of VarLenFeature is a SparseTensor:

[Screenshot: the parsed feature_ids and feature_vals print as SparseTensors]

At first I planned to sess.run each batch, convert to numpy arrays, and feed them back to the model via feed_dict, but that felt cumbersome and the conversion would become the bottleneck. It is much better to use these SparseTensors directly in the model's computation, straight from the tfrecord parse; that runs into another problem, which I'll detail later. In short: I take the two SparseTensors as-is into the model, so the model's design differs from the conventional approach.

An Efficient Sparse Model Implementation

<code>import tensorflow as tf

class SparseFactorizationMachine(object):
    def __init__(self, model_name="sparse_fm"):
        self.model_name = model_name

    def build(self, features, labels, mode, params):
        print("export features {0}".format(features))
        print(mode)
        if mode == tf.estimator.ModeKeys.PREDICT:
            # At serving time the SparseTensors are re-assembled from their
            # three component tensors (see the deployment section below).
            sp_indexes = tf.SparseTensor(indices=features['DeserializeSparse:0'],
                                         values=features['DeserializeSparse:1'],
                                         dense_shape=features['DeserializeSparse:2'])
            sp_vals = tf.SparseTensor(indices=features['DeserializeSparse_1:0'],
                                      values=features['DeserializeSparse_1:1'],
                                      dense_shape=features['DeserializeSparse_1:2'])
        if mode == tf.estimator.ModeKeys.TRAIN or mode == tf.estimator.ModeKeys.EVAL:
            sp_indexes = features['feature_ids']
            sp_vals = features['feature_vals']
        print("sp: {0}, {1}".format(sp_indexes, sp_vals))
        batch_size = params["batch_size"]
        feature_max_num = params["feature_max_num"]
        optimizer_type = params["optimizer_type"]
        factor_vec_size = params["factor_size"]

        bias = tf.get_variable(name="b", shape=[1], initializer=tf.glorot_normal_initializer())
        w_first_order = tf.get_variable(name='w_first_order', shape=[feature_max_num, 1],
                                        initializer=tf.glorot_normal_initializer())
        linear_part = tf.nn.embedding_lookup_sparse(w_first_order, sp_indexes, sp_vals, combiner="sum") + bias
        w_second_order = tf.get_variable(name='w_second_order', shape=[feature_max_num, factor_vec_size],
                                         initializer=tf.glorot_normal_initializer())
        embedding = tf.nn.embedding_lookup_sparse(w_second_order, sp_indexes, sp_vals, combiner="sum")
        embedding_square = tf.nn.embedding_lookup_sparse(tf.square(w_second_order), sp_indexes,
                                                         tf.square(sp_vals), combiner="sum")
        sum_square = tf.square(embedding)
        second_part = 0.5 * tf.reduce_sum(tf.subtract(sum_square, embedding_square), 1)
        y_hat = linear_part + tf.expand_dims(second_part, -1)
        predictions = tf.sigmoid(y_hat)
        print("y_hat: {0}, second_part: {1}, linear_part: {2}".format(y_hat, second_part, linear_part))
        pred = {"prob": predictions}
        export_outputs = {
            tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY:
                tf.estimator.export.PredictOutput(predictions)
        }
        if mode == tf.estimator.ModeKeys.PREDICT:
            return tf.estimator.EstimatorSpec(
                mode=mode,
                predictions=predictions,
                export_outputs=export_outputs)
        loss = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(labels=labels, logits=tf.squeeze(y_hat)))
        if optimizer_type == "sgd":
            opt = tf.train.GradientDescentOptimizer(learning_rate=params['learning_rate'])
        elif optimizer_type == "ftrl":
            opt = tf.train.FtrlOptimizer(learning_rate=params['learning_rate'])
        elif optimizer_type == "adam":
            opt = tf.train.AdamOptimizer(learning_rate=params['learning_rate'])
        elif optimizer_type == "momentum":
            opt = tf.train.MomentumOptimizer(learning_rate=params['learning_rate'], momentum=params['momentum'])
        train_step = opt.minimize(loss, global_step=tf.train.get_global_step())
        eval_metric_ops = {
            "auc": tf.metrics.auc(labels, predictions)
        }
        if mode == tf.estimator.ModeKeys.TRAIN:
            return tf.estimator.EstimatorSpec(mode=mode, predictions=predictions, loss=loss, train_op=train_step)
        if mode == tf.estimator.ModeKeys.EVAL:
            return tf.estimator.EstimatorSpec(mode=mode, predictions=predictions, loss=loss, eval_metric_ops=eval_metric_ops)</code>

A few words on the Factorization Machine implementation, which is slightly more involved than Sparse Logistic Regression. The algorithm itself is simple, and a quick search will turn up the theory: FM has two parts, a Logistic Regression part consisting of the bias and the first-order features, and a second part that represents each feature dimension as a vector of a chosen size and learns, from the samples, the cross-feature information useful for training.
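In formula form, the standard FM prediction function (with v_i the factor vector for feature i) is:

$$\hat{y}(x) = w_0 + \sum_{i=1}^{n} w_i x_i + \sum_{i=1}^{n}\sum_{j=i+1}^{n} \langle v_i, v_j \rangle\, x_i x_j$$

The snippet below, repeated from the model_fn above, implements exactly this: bias plus first-order term as linear_part, and the factorized second-order term as second_part: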

<code>bias = tf.get_variable(name="b", shape=[1], initializer=tf.glorot_normal_initializer())
w_first_order = tf.get_variable(name='w_first_order', shape=[feature_max_num, 1], initializer=tf.glorot_normal_initializer())

linear_part = tf.nn.embedding_lookup_sparse(w_first_order, sp_indexes, sp_vals, combiner="sum") + bias
w_second_order = tf.get_variable(name='w_second_order', shape=[feature_max_num, factor_vec_size], initializer=tf.glorot_normal_initializer())
embedding = tf.nn.embedding_lookup_sparse(w_second_order, sp_indexes, sp_vals, combiner="sum")
embedding_square = tf.nn.embedding_lookup_sparse(tf.square(w_second_order), sp_indexes, tf.square(sp_vals), combiner="sum")
sum_square = tf.square(embedding)
second_part = 0.5*tf.reduce_sum(tf.subtract(sum_square, embedding_square), 1)
y_hat = linear_part + tf.expand_dims(second_part, -1)
predictions = tf.sigmoid(y_hat)</code>

The only difference from a plain FM here is that I use tf.nn.embedding_lookup_sparse to compute WX. With a huge feature dimensionality, multiplying out the full WX is time-consuming and unnecessary; we only need to touch the entries that are actually non-zero. Take kdd2010 with its 20,216,831 features: computing the full WX would stress the system, but replacing it with a simple tf.nn.embedding_lookup_sparse(w_first_order, sp_indexes, sp_vals, combiner="sum") first looks up the embedding rows for the present feature_ids, multiplies by the corresponding weights, and finally applies a sum combiner per sample, which is exactly equivalent to WX. On the systems side, since the computation depends only on NNZ (the non-zero count), performance is no problem at all. The second-order part can likewise be computed with lower time complexity; as anyone familiar with FM knows, this is the "square of the sum minus sum of the squares" identity:
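Written out per factor dimension f, that identity is:

$$\sum_{i=1}^{n}\sum_{j=i+1}^{n} v_{i,f}\, v_{j,f}\, x_i x_j = \frac{1}{2}\left[\left(\sum_{i=1}^{n} v_{i,f}\, x_i\right)^2 - \sum_{i=1}^{n} v_{i,f}^2\, x_i^2\right]$$

which maps term-for-term onto the three lines below (sum_square, embedding_square, and the 0.5 * reduce_sum):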

<code>embedding_square = tf.nn.embedding_lookup_sparse(tf.square(w_second_order), sp_indexes, tf.square(sp_vals), combiner="sum")
sum_square = tf.square(embedding)
second_part = 0.5*tf.reduce_sum(tf.subtract(sum_square, embedding_square), 1)</code>

Given the implementation above, all we need is to pass in the features' sp_indexes and sp_vals. Both are SparseTensors, though, and my first idea was not the implementation above but tf.sparse.placeholder fed with a feed_dict of SparseTensorValues. That does work: training is fine, and exporting the model also works (well, almost: I rewrote Estimator's build_raw_serving_input_receiver_fn to make it support SparseTensor). But after deploying TensorFlow Serving, I found that on the client side a SparseTensorValue apparently cannot be packed into a TensorProto. tf.make_tensor_proto is what puts the request values into a TensorProto, and TensorProto (https://github.com/tensorflow/tensorflow/blob/master/tensorflow/core/framework/tensor.proto) does not seem to support holding a SparseTensorValue directly, so the deployed TensorFlow Serving could not be queried. (Deployment is described in detail later; I did consider patching their code, but that touches things too low-level for me to hold.) There is still a way out: as mentioned earlier, SparseTensor is a high-level API in TensorFlow that is really just three Tensors. Why not expose those three component Tensors and have the client assemble them in the request? Then all that's left is to find the sp_indexes and sp_vals that come out of the TFRecord parse.

[Screenshot: tensor names in the graph; the parsed SparseTensor components appear as DeserializeSparse:0/1/2 and DeserializeSparse_1:0/1/2]

From there it's easy to read off the tensor names of sp_indexes and sp_vals, substitute placeholders for them, and use those to assemble sp_indexes and sp_vals.
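If you'd rather not read the names off a screenshot, a quick way to hunt for them in the graph (a sketch, assuming the input pipeline has already been built into the default graph):

<code>import tensorflow as tf

# List the ops whose names contain "DeserializeSparse" and print the names
# of their output tensors.
for op in tf.get_default_graph().get_operations():
    if "DeserializeSparse" in op.name:
        print(op.name, [t.name for t in op.outputs])</code>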


For the record: I used the kdd2010 data, with 20,216,831 feature dimensions and 8,407,752 samples, trained on my 2015 MacBook Pro with sgd, and convergence was clearly visible. Try it yourself if you're interested; in my experience, other optimizers such as adam or ftrl give a decent lift at this feature scale, but here I just wanted to get the whole pipeline working end to end, and I didn't have the heart to torture the machine further. At this point we have trained a usable Sparse FM model. Next it has to be exported, and the export here means a model with the placeholders exposed, one that can be loaded into and queried through TensorFlow Serving, not just a bare ckpt.
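For completeness, a minimal sketch of how the pieces might be wired together; the hyperparameter values and model_dir are made up, and input_fn is the parsing sketch from earlier:

<code>import tensorflow as tf

model = SparseFactorizationMachine("sparse_fm")
sparse_fm_model = tf.estimator.Estimator(
    model_fn=model.build,
    model_dir="./ckpt/sparse_fm",     # hypothetical checkpoint directory
    params={
        "feature_max_num": 20216831,  # kdd2010 feature dimensionality
        "factor_size": 16,
        "batch_size": 256,
        "optimizer_type": "sgd",
        "learning_rate": 0.01,
    })
sparse_fm_model.train(
    input_fn=lambda: input_fn(["../../data/kdd2010/kdda.tfrecord"]))</code>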

Model Deployment

<code>feature_spec = {
    'DeserializeSparse:0': tf.placeholder(dtype=tf.int64, name='feature_ids/indices'),
    'DeserializeSparse:1': tf.placeholder(dtype=tf.int64, name='feature_ids/values'),
    'DeserializeSparse:2': tf.placeholder(dtype=tf.int64, name='feature_ids/shape'),
    'DeserializeSparse_1:0': tf.placeholder(dtype=tf.int64, name='feature_vals/indices'),
    'DeserializeSparse_1:1': tf.placeholder(dtype=tf.float32, name='feature_vals/values'),
    'DeserializeSparse_1:2': tf.placeholder(dtype=tf.int64, name='feature_vals/shape')
}
# is_sparse comes from my patched build_raw_serving_input_receiver_fn mentioned
# above; the stock TF function takes only the feature dict (plus default_batch_size).
serving_input_receiver_fn = tf.estimator.export.build_raw_serving_input_receiver_fn(feature_spec, is_sparse=False)
sparse_fm_model.export_savedmodel(servable_model_dir, serving_input_receiver_fn, as_text=True)</code>

This mirrors the model construction earlier: we only need to expose the DeserializeSparse components.
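To sanity-check what got exported, the SavedModel can be inspected with the saved_model_cli tool that ships with TensorFlow (the path here is hypothetical):

<code>saved_model_cli show --dir ./save_model/1543117151 --all</code>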

[Screenshot: the exported SavedModel directory, created under a timestamp]

The model is exported under a timestamped directory; once the save succeeds, temp-1543117151 is renamed to 1543117151. Next, start TensorFlow Serving to load the model: docker run -p 8500:8500 --mount type=bind,source=/Users/burness/work/tencent/TFRecSYS/TFRecSYS/runner/save_model,target=/models/ -e MODEL_NAME=sparse_fm -t tensorflow/serving. Deploying with the official docker image is very convenient.

[Screenshot: TensorFlow Serving startup logs]

Serving loads the new model first and then unloads the old one; the command-line logs show the gRPC endpoint on port 8500. All that remains is a client to send requests:

<code>import grpc
import sys
sys.path.insert(0, "./")
from tensorflow_serving.apis import predict_pb2
from tensorflow_serving.apis import prediction_service_pb2_grpc
import tensorflow as tf
import time

import numpy as np
from sklearn import metrics


def get_sp_component(file_name):
    # Yield (label, feature_ids, feature_vals) per libsvm line.
    with open(file_name, "r") as fread:
        for line in fread:
            fea_ids = []
            fea_vals = []
            line_components = line.strip().split(" ")
            label = float(line_components[0])
            for part in line_components[1:]:
                part_components = part.split(":")
                fea_ids.append(int(part_components[0]))
                fea_vals.append(float(part_components[1]))
            yield (label, fea_ids, fea_vals)


def batch2sparse_component(fea_ids, fea_vals):
    # Flatten a batch of per-sample id/value lists into the indices/values
    # arrays that make up the two SparseTensors.
    feature_id_indices = []
    feature_id_values = []
    feature_vals_indices = []
    feature_vals_values = []
    for index, ids in enumerate(fea_ids):
        feature_id_values += ids
        for i in range(len(ids)):
            feature_id_indices.append([index, i])
    for index, vals in enumerate(fea_vals):
        feature_vals_values += vals
        for i in range(len(vals)):
            feature_vals_indices.append([index, i])
    return np.array(feature_id_indices, dtype=np.int64), \
           np.array(feature_id_values, dtype=np.int64), \
           np.array(feature_vals_indices, dtype=np.int64), \
           np.array(feature_vals_values, dtype=np.float32)


if __name__ == '__main__':
    start_time = time.time()
    channel = grpc.insecure_channel("127.0.0.1:8500")
    stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)
    request = predict_pb2.PredictRequest()
    request.model_spec.name = "sparse_fm"
    record_generator = get_sp_component("../../data/kdd2010/kdda_t.libsvm")
    batch_size = 1000
    predictions = np.array([])
    labels = []
    while True:
        try:
            batch_label = []
            batch_fea_ids = []
            batch_fea_vals = []
            max_fea_size = 0
            for i in range(batch_size):
                label, fea_ids, fea_vals = next(record_generator)
                batch_label.append(label)
                batch_fea_ids.append(fea_ids)
                batch_fea_vals.append(fea_vals)
                # Track the widest sample for the dense_shape of the batch.
                if len(fea_ids) > max_fea_size:
                    max_fea_size = len(fea_ids)
            shape = np.array([batch_size, max_fea_size], dtype=np.int64)
            batch_feature_id_indices, batch_feature_id_values, batch_feature_val_indices, batch_feature_val_values = batch2sparse_component(batch_fea_ids, batch_fea_vals)
            request.inputs["DeserializeSparse:0"].CopyFrom(tf.contrib.util.make_tensor_proto(batch_feature_id_indices))
            request.inputs["DeserializeSparse:1"].CopyFrom(tf.contrib.util.make_tensor_proto(batch_feature_id_values))
            request.inputs["DeserializeSparse:2"].CopyFrom(tf.contrib.util.make_tensor_proto(shape))
            request.inputs["DeserializeSparse_1:0"].CopyFrom(tf.contrib.util.make_tensor_proto(batch_feature_val_indices))
            request.inputs["DeserializeSparse_1:1"].CopyFrom(tf.contrib.util.make_tensor_proto(batch_feature_val_values))
            request.inputs["DeserializeSparse_1:2"].CopyFrom(tf.contrib.util.make_tensor_proto(shape))
            response = stub.Predict(request, 10.0)
            results = {}
            for key in response.outputs:
                tensor_proto = response.outputs[key]
                nd_array = tf.contrib.util.make_ndarray(tensor_proto)
                results[key] = nd_array
            print("cost %ss to predict: " % (time.time() - start_time))
            predictions = np.append(predictions, results['output'])
            labels += batch_label
            print(len(labels), len(predictions))
        except StopIteration:
            break
    fpr, tpr, thresholds = metrics.roc_curve(labels, predictions)
    print("auc: {0}".format(metrics.auc(fpr, tpr)))</code>

I started by testing with a single sample and printing the pred value; once that worked, I batched all the test samples into requests and computed the AUC, which came out close to the eval-time AUC, so the whole pipeline seems sound. Each batch of 1000 samples takes roughly 270 ms, which feels acceptable overall.

[Screenshot: client prediction output and the resulting AUC]

Next Steps

That's about it. The framework currently supports single-field Logistic Regression and Factorization Machine, and it's quite extensible: only the algorithm class needs to be rewritten, most of the rest can be reused. Next on the list is supporting multi-field data input, with more efficient Sparse DeepFM, FNN, DIN, and DIEN implementations (these are mostly done already); right now I'm working on usability, hoping the whole pipeline can be driven by a config file. Distributed training will also be supported; that part is easy since Estimator handles it natively, I just have limited resources and will try it when I get machines. Although I find TensorFlow's own take on distribution hard to love, being able to reuse it so simply is still impressive. Support for even higher dimensionality can be explored after that.

