12.25 協同過濾(ALS)的原理及Python實現

簡介: 提到ALS相信大家應該都不會覺得陌生(不陌生你點進來幹嘛[捂臉]),它是協同過濾的一種,並被集成到Spark的Mllib庫中。本文就ALS的基本原理進行講解,並手把手、肩並肩地帶您實現這一算法。 原理篇 我們用人話而不是大段的數學公式來講講ALS是怎麼一回事。

提到ALS相信大家應該都不會覺得陌生(不陌生你點進來幹嘛[捂臉]),它是協同過濾的一種,並被集成到Spark的Mllib庫中。本文就ALS的基本原理進行講解,並手把手、肩並肩地帶您實現這一算法。

  1. 原理篇

我們用人話而不是大段的數學公式來講講ALS是怎麼一回事。

1.1 你聽說過推薦算法麼

假如我是豆瓣的CEO,很多豆瓣的用戶在豆瓣電影上都會對電影進行評分。那麼根據這個評分數據,我們有可能知道這些用戶除了自己評過分的電影之外還喜歡或討厭哪些電影嗎?這就是一個典型的推薦問題,解決這一類問題的算法被稱為推薦算法。

1.2 什麼是協同過濾

協同過濾的英文全稱是Collaborative Filtering,簡稱CF。注意,這不是一款遊戲!從字面上分析,協同就是尋找共同點,過濾就是篩選出優質的內容。

1.3 協同過濾的分類

一般來說,協同過濾推薦分為三種類型:

  1. 基於用戶(user-based)的協同過濾,通過計算用戶和用戶的相似度找到跟用戶A相似的用戶B, C, D...再把這些用戶喜歡的內容推薦給A;
  2. 基於物品(item-based)的協同過濾,通過計算物品和物品的相似度找到跟物品1相似的物品2, 3, 4...再把這些物品推薦給看過物品1的用戶們;
  3. 基於模型(model based)的協同過濾。主流的方法可以分為:矩陣分解,關聯算法,聚類算法,分類算法,迴歸算法,神經網絡。

1.4 矩陣分解

矩陣分解 (decomposition, factorization)是將矩陣拆解為數個矩陣的乘積。比如豆瓣電影有m個用戶,n個電影。那麼用戶對電影的評分可以形成一個m行n列的矩陣R,我們可以找到一個m行k列的矩陣U,和一個k行n列的矩陣I,通過U * I來得到矩陣R。

1.5 ALS

如果想通過矩陣分解的方法實現基於模型的協同過濾,ALS是一個不錯的選擇,其英文全稱是Alternating Least Square,翻譯過來是交替最小二乘法。假設用戶為a,物品為b,評分矩陣為R(m, n),可分解為用戶矩陣U(k, m)和物品矩陣I(k, n),其中m, n, k代表矩陣的維度。前方小段數學公式低能預警:

根據矩陣分解的定義,有

協同過濾(ALS)的原理及Python實現


用MSE作為損失函數,為了方便化簡,加法符號左側的常數改為-1/2

協同過濾(ALS)的原理及Python實現

  1. 對損失函數求U_a的一階偏導數,那
  2. 令一階偏導數等於0
  3. 同理,可證

1.6 求解用戶矩陣U和物品矩陣I

矩陣R是已知的,我們隨機生成用戶矩陣U,

  1. 利用1.5中的式5、R和U求出I
  2. 利用1.5中的式6、R和I求出U

如此交替地執行步驟1和步驟2,直到算法收斂或者迭代次數超過了最大限制,最終我們用RMSE來評價模型的好壞。

  1. 實現篇

本人用全宇宙最簡單的編程語言——Python實現了ALS算法,沒有依賴任何第三方庫,便於學習和使用。簡單說明一下實現過程,更詳細的註釋請參考本人github上的代碼。

注:代碼中用到的Matrix類是我寫的一個矩陣類,可以取出矩陣的行或列,計算矩陣的乘法、轉置和逆。

2.1 創建ALS類

初始化,存儲用戶ID、物品ID、用戶ID與用戶矩陣列號的對應關係、物品ID與物品矩陣列號的對應關係、用戶已經看過哪些物品、評分矩陣的Shape以及RMSE。

<code>class ALS(object):
def __init__(self):
self.user_ids = None
self.item_ids = None
self.user_ids_dict = None
self.item_ids_dict = None
self.user_matrix = None
self.item_matrix = None
self.user_items = None
self.shape = None
self.rmse = None/<code>

2.2 數據預處理

對訓練數據進行處理,得到用戶ID、物品ID、用戶ID與用戶矩陣列號的對應關係、物品ID與物品矩陣列號的對應關係、評分矩陣的Shape、評分矩陣及評分矩陣的轉置。

<code>def _process_data(self, X):
self.user_ids = tuple((set(map(lambda x: x[0], X))))
self.user_ids_dict = dict(map(lambda x: x[::-1],
enumerate(self.user_ids)))

self.item_ids = tuple((set(map(lambda x: x[1], X))))
self.item_ids_dict = dict(map(lambda x: x[::-1],
enumerate(self.item_ids)))

self.shape = (len(self.user_ids), len(self.item_ids))

ratings = defaultdict(lambda: defaultdict(int))
ratings_T = defaultdict(lambda: defaultdict(int))
for row in X:
user_id, item_id, rating = row
ratings[user_id][item_id] = rating
ratings_T[item_id][user_id] = rating

err_msg = "Length of user_ids %d and ratings %d not match!" % (
len(self.user_ids), len(ratings))
assert len(self.user_ids) == len(ratings), err_msg

err_msg = "Length of item_ids %d and ratings_T %d not match!" % (
len(self.item_ids), len(ratings_T))
assert len(self.item_ids) == len(ratings_T), err_msg
return ratings, ratings_T/<code>

2.3 用戶矩陣乘以評分矩陣

實現稠密矩陣與稀疏矩陣的矩陣乘法,得到用戶矩陣與評分矩陣的乘積。

<code>def _users_mul_ratings(self, users, ratings_T):

def f(users_row, item_id):
user_ids = iter(ratings_T[item_id].keys())
scores = iter(ratings_T[item_id].values())
col_nos = map(lambda x: self.user_ids_dict[x], user_ids)
_users_row = map(lambda x: users_row[x], col_nos)
return sum(a * b for a, b in zip(_users_row, scores))

ret = [[f(users_row, item_id) for item_id in self.item_ids]
for users_row in users.data]
return Matrix(ret)/<code>

2.4 物品矩陣乘以評分矩陣

實現稠密矩陣與稀疏矩陣的矩陣乘法,得到物品矩陣與評分矩陣的乘積。

<code>def _items_mul_ratings(self, items, ratings):

def f(items_row, user_id):
item_ids = iter(ratings[user_id].keys())
scores = iter(ratings[user_id].values())

col_nos = map(lambda x: self.item_ids_dict[x], item_ids)
_items_row = map(lambda x: items_row[x], col_nos)
return sum(a * b for a, b in zip(_items_row, scores))

ret = [[f(items_row, user_id) for user_id in self.user_ids]
for items_row in items.data]
return Matrix(ret)/<code>

2.5 生成隨機矩陣

<code>def _gen_random_matrix(self, n_rows, n_colums):
data = [[random() for _ in range(n_colums)] for _ in range(n_rows)]
return Matrix(data)/<code>

2.6 計算RMSE

<code>def _get_rmse(self, ratings):
m, n = self.shape
mse = 0.0
n_elements = sum(map(len, ratings.values()))
for i in range(m):
for j in range(n):
user_id = self.user_ids[i]
item_id = self.item_ids[j]
rating = ratings[user_id][item_id]
if rating > 0:
user_row = self.user_matrix.col(i).transpose
item_col = self.item_matrix.col(j)
rating_hat = user_row.mat_mul(item_col).data[0][0]
square_error = (rating - rating_hat) ** 2
mse += square_error / n_elements
return mse ** 0.5/<code>

2.7 訓練模型

數據預處理

變量k合法性檢查

生成隨機矩陣U

交替計算矩陣U和矩陣I,並打印RMSE信息,直到迭代次數達到max_iter

保存最終的RMSE

<code>def fit(self, X, k, max_iter=10):
ratings, ratings_T = self._process_data(X)
self.user_items = {k: set(v.keys()) for k, v in ratings.items()}
m, n = self.shape

error_msg = "Parameter k must be less than the rank of original matrix"
assert k < min(m, n), error_msg

self.user_matrix = self._gen_random_matrix(k, m)

for i in range(max_iter):
if i % 2:
items = self.item_matrix
self.user_matrix = self._items_mul_ratings(
items.mat_mul(items.transpose).inverse.mat_mul(items),
ratings
)
else:
users = self.user_matrix
self.item_matrix = self._users_mul_ratings(
users.mat_mul(users.transpose).inverse.mat_mul(users),
ratings_T
)
rmse = self._get_rmse(ratings)
print("Iterations: %d, RMSE: %.6f" % (i + 1, rmse))

self.rmse = rmse/<code>

2.8 預測一個用戶

預測一個用戶感興趣的內容,剔除用戶已看過的內容。然後按感興趣分值排序,取出前n_items個內容。

<code>def _predict(self, user_id, n_items):
users_col = self.user_matrix.col(self.user_ids_dict[user_id])
users_col = users_col.transpose

items_col = enumerate(users_col.mat_mul(self.item_matrix).data[0])
items_scores = map(lambda x: (self.item_ids[x[0]], x[1]), items_col)
viewed_items = self.user_items[user_id]
items_scores = filter(lambda x: x[0] not in viewed_items, items_scores)

return sorted(items_scores, key=lambda x: x[1], reverse=True)[:n_items]/<code>

2.9 預測多個用戶

循環調用2.8,預測多個用戶感興趣的內容。

<code>def predict(self, user_ids, n_items=10):
return [self._predict(user_id, n_items) for user_id in user_ids]/<code>

3 效果評估

3.1 main函數

使用電影評分數據集,訓練模型並統計RMSE。

<code>@run_time
def main():
print("Tesing the accuracy of ALS...")

X = load_movie_ratings()

model = ALS()
model.fit(X, k=3, max_iter=5)
print()

print("Showing the predictions of users...")

user_ids = range(1, 5)
predictions = model.predict(user_ids, n_items=2)
for user_id, prediction in zip(user_ids, predictions):
_prediction = [format_prediction(item_id, score)
for item_id, score in prediction]
print("User id:%d recommedation: %s" % (user_id, _prediction))/<code>

3.2 效果展示

設置k=3,迭代5次,並展示了前4個用戶的推薦內容,最終RMSE為0.370,運行時間46.5秒,效果還算不錯~

協同過濾(ALS)的原理及Python實現

點擊“瞭解更多”獲取文章使用的工具函數

關鍵字:算法、Python、搜索推薦


分享到:


相關文章: