協同過濾算法分佈式實現

協同過濾算法分佈式實現

導讀: 本文主要介紹協同過濾基礎知識,以及分佈式實現設計,並最終在 Spark 平臺上對同現相似度、Cosine 相似度、歐幾里得距離相似度、關聯規則進行代碼實現。

▌ 協同過濾推薦介紹

協同過濾推薦是最經典、最常用的推薦算法,該算法通過分析用戶的興趣,在用戶群中找到指定用戶的相似用戶,綜合這些相似用戶對某一信息的評價,形成系統對該指定用戶針對此信息的喜好程度的預測。常見的協同過濾有以下2種。

基於用戶的協同過濾(UserCF),就是基於用戶對物品的評分得到用戶向量(以物品為維度,得到用戶向量),計算相似用戶,然後再進行推薦,如圖1所示。

協同過濾算法分佈式實現

圖1 基於用戶的協同過濾示例

基於物品的協同過濾(ItemCF),就是基於物品所對應的用戶評分得到物品向量(以用戶為維度,得到物品向量),計算相似物品,然後再進行推薦,如圖2所示。

協同過濾算法分佈式實現

圖2 基於物品的協同過濾示例

▌ 相似度計算公式

1. 同現相似度

物品 A 和物品 B 的同現相似度公式定義如下:

協同過濾算法分佈式實現


其中分母 N(A) 是喜歡物品 A 的用戶數、N(B) 是喜歡物品 B 的用戶數,而分子

協同過濾算法分佈式實現

則是同時喜歡物品 A 和物品 B 的用戶數。

2. 歐 幾里得 距離

該距離最初用於計算歐幾里得空間中兩個點的距離,假設 x、y 是 n 維空間中的兩個點,它們之間的歐幾里得距離如下:

協同過濾算法分佈式實現


可以看出,當 n=2 時,歐幾里得距離就是平面上兩個點的距離。

當用歐幾里得距離表示相似度時,一般採用以下公式進行轉換:距離越小,相似度值越大。

協同過濾算法分佈式實現


3. 皮爾遜相關係數

皮爾遜相關係數一般用於計算兩個定距變量間聯繫的緊密程度,它的取值範圍為 [ - 1,+1]。

協同過濾算法分佈式實現

協同過濾算法分佈式實現

是 x 和 y 的樣品標準偏差。

4. Cosine 相似度

Cosine 相似度被廣泛應用於計算文檔數據的相似度:

協同過濾算法分佈式實現


▌ 相似度分佈式實現

1. 同現相似度 分佈式設計

對於同現相似度矩陣計算,首先以用戶 id 為 key 進行 group by 操作,得到每個用戶的所有物品集合,然後對每個用戶的物品集合進行 flatMap 操作:對物品集合生成兩兩物品對(物品,物品),其中只生成上三角部分;之後對(物品,物品)對進行 group by 操作,得到物品與物品的總出現次數,隨後再根據同現相似度公式:

w(i,j) = N(i)∩N(j)/sqrt(N(i)×N(j))

其中分子是 i 與 j 的同現頻次,分母的 N(i) 是 i 頻次、N(j) 是 j 頻次。計算物品與物品的相似度,最終得到所有上三角部分的相似度。過程如圖3所示。

協同過濾算法分佈式實現

圖3 分佈式同現相似度矩陣計算過程

2. 歐幾里得距離相似度分佈式設計

對於歐幾里得距離相似度的計算,採用離散計算公式:

d(x, y) = sqrt(∑((x(i) - y(i))×(x(i) - y(i))))

其中 i 只取 x、y 同現的點,未同現的點不參與相似度計算;

sim(x, y) = m / (1+d(x, y))

m 為 x、y 重疊數(同現次數)。

對於 Cosine 相似度的計算,採用離散計算公式,其中 i 只取 x、y 同現的點,未同現的點不參與相似度計算。

之後的歐幾里得距離相似度計算如下:對(物品,物品)對進行 group by 操作,得到物品與物品的總出現次數,以及(評分 i - 評分 j)平方累加值,並且對累加值開方,最後根據公式 m / (1 + d(x, y)) 計算相似度。過程如圖4所示。

協同過濾算法分佈式實現

圖4 分佈式歐幾里得距離相似度和 Cosine 相似度計算過程

3. Cosine 相似度計算分佈式設計

Cosine 相似度計算如下:對(物品,物品)對進行 group by 操作,得到 x×y=sum(評分i× 評分j),|x|^2=sum(評分i^2),|y|^2=sum(評分j^2),最後根據公式計算相似度。過程如圖4所示。

▌ 相似度計算代碼實現

對同現相似度、Cosine 相似度、歐幾里得距離相似度、關聯規則進行代碼實現,實現語言為 Scala,實現平臺為 Spark,其實現代碼如下:

1. 同現相似度計算

/**
* 同現相似度計算
* w(i,j) = N(i)∩N(j)/sqrt(N(i)*N(j))
* @param user_rdd 用戶評分
* @param RDD[ItemSimi] 返回物品相似度
*
*/
def CooccurrenceSimilarity(user_ds: Dataset[ItemPref]): Dataset[ItemSimi] = {
import user_ds.sparkSession.implicits._

// 1 (用戶:物品) => (用戶:(物品集合))
val user_ds1 = user_ds.groupBy("userid").agg(collect_set("itemid")). withColumnRenamed ("collect_set(itemid)", "itemid_set")


// 2 物品:物品,上三角數據
val user_ds2 = user_ds1.flatMap { row =>
val itemlist = row.getAs[scala.collection.mutable.WrappedArray[String]](1).toArray. sorted
val result = new ArrayBuffer[(String, String, Double)]()
for (i for (j result += ((itemlist(i), itemlist(j), 1.0))
}
}
result
}.withColumnRenamed("_1", "itemidI").withColumnRenamed("_2", "itemidJ"). withColumnRenamed("_3", "score")

// 3 計算物品與物品,上三角,同現頻次
val user_ds3 = user_ds2.groupBy("itemidI", "itemidJ").agg(sum("score").as("sumIJ"))

// 4 計算物品總共出現的頻次
val user_ds0 = user_ds.withColumn("score", lit(1)).groupBy("itemid").agg(sum ("score").as("score"))

// 5 計算同現相似度
val user_ds4 = user_ds3.join(user_ds0.withColumnRenamed("itemid", "itemidJ"). withColumnRenamed("score", "sumJ").select("itemidJ", "sumJ"), "itemidJ")

val user_ds5 = user_ds4.join(user_ds0.withColumnRenamed("itemid", "itemidI").withColumnRenamed("score", "sumI").select("itemidI", "sumI"), "itemidI")

// 根據公式N(i)∩N(j)/sqrt(N(i)*N(j)) 計算
val user_ds6 = user_ds5.withColumn("result", col("sumIJ") / sqrt(col("sumI") * col("sumJ")))

// 6 上、下三角合併
println(s"user_ds6.count(): ${user_ds6.count()}")val user_ds8 = user_ds6. select("itemidI", "itemidJ", "result").union(user_ds6.select($"itemidJ".as("itemidI"), $"itemidI".as("itemidJ"), $"result"))
println(s"user_ds8.count(): ${user_ds8.count()}")

// 7 結果返回
val out = user_ds8.select("itemidI", "itemidJ", "result").map { row =>
val itemidI = row.getString(0)
val itemidJ = row.getString(1)
val similar = row.getDouble(2)
ItemSimi(itemidI, itemidJ, similar)
}
out
}

2. Cosine 相似度計算

/**
* Cosine相似度計算
* T(x,y) = ∑x(i)y(i) / sqrt(∑(x(i)*x(i))) * sqrt(∑(y(i)*y(i)))
* @param user_rdd 用戶評分
* @param RDD[ItemSimi] 返回物品相似度
*
*/
def CosineSimilarity(user_ds: Dataset[ItemPref]): Dataset[ItemSimi] = {
import user_ds.sparkSession.implicits._

// 1 數據準備
val user_ds1 = user_ds.
withColumn("iv", concat_ws(":", $"itemid", $"pref")).
groupBy("userid").agg(collect_set("iv")).
withColumnRenamed("collect_set(iv)", "itemid_set").
select("userid", "itemid_set")

// 2 物品:物品,上三角數據
val user_ds2 = user_ds1.flatMap { row =>
val itemlist = row.getAs[scala.collection.mutable.WrappedArray [String]](1).toArray. sorted
val result = new ArrayBuffer[(String, String, Double, Double)]()
for (i for (j result += ((itemlist(i).split(":")(0), itemlist(j).split(":")(0), itemlist(i). split(":")(1).toDouble, itemlist(j).split(":")(1).toDouble))
}
}
result
}.withColumnRenamed("_1", "itemidI").withColumnRenamed("_2", "itemidJ").withColumnRenamed("_3", "scoreI").withColumnRenamed("_4", "scoreJ")

// 3 按照公式計算相似度
// x*y = ∑x(i)y(i)
// |x|^2 = ∑(x(i)*x(i))
// |y|^2 = ∑(y(i)*y(i))
// result = x*y / sqrt(|x|^2) * sqrt(|y|^2)
val user_ds3 = user_ds2.
withColumn("cnt", lit(1)).
groupBy("itemidI", "itemidJ").
agg(sum(($"scoreI" * $"scoreJ")).as("sum_xy"),
sum(($"scoreI" * $"scoreI")).as("sum_x"),
sum(($"scoreJ" * $"scoreJ")).as("sum_y")).
withColumn("result", $"sum_xy" / (sqrt($"sum_x") * sqrt($"sum_y")))

// 4 上、下三角合併
val user_ds8 = user_ds3.select("itemidI", "itemidJ", "result").
union(user_ds3.select($"itemidJ".as("itemidI"), $"itemidI".as("itemidJ"), $"result"))

// 5 結果返回
val out = user_ds8.select("itemidI", "itemidJ", "result").map { row =>
val itemidI = row.getString(0)
val itemidJ = row.getString(1)
val similar = row.getDouble(2)
ItemSimi(itemidI, itemidJ, similar)
}
out
}

3. 歐 幾里得距離相似度計算

/**
* 歐幾里得距離相似度計算
* d(x, y) = sqrt(∑((x(i)-y(i)) * (x(i)-y(i))))
* sim(x, y) = n / (1 + d(x, y))
* @param user_rdd 用戶評分
* @param RDD[ItemSimi] 返回物品相似度
*
*/
def EuclideanDistanceSimilarity(user_ds: Dataset[ItemPref]): Dataset[ItemSimi] = {
import user_ds.sparkSession.implicits._

// 1 數據準備
val user_ds1 = user_ds.
withColumn("iv", concat_ws(":", $"itemid", $"pref")).
groupBy("userid").agg(collect_set("iv")).
withColumnRenamed("collect_set(iv)", "itemid_set").
select("userid", "itemid_set")

// 2 物品:物品,上三角數據
val user_ds2 = user_ds1.flatMap { row =>
val itemlist = row.getAs[scala.collection.mutable.WrappedArray[String]] (1).toArray.sorted
val result = new ArrayBuffer[(String, String, Double, Double)]()
for (i for (j result += ((itemlist(i).split(":")(0), itemlist(j).split(":")(0), itemlist (i).split(":")(1).toDouble, itemlist(j).split(":")(1).toDouble))
}
}
result
}.withColumnRenamed("_1", "itemidI").withColumnRenamed("_2", "itemidJ").withColumnRenamed("_3", "scoreI").withColumnRenamed("_4", "scoreJ")

// 3 按照公式計算相似度

// dist = sqrt(∑((x(i)-y(i)) * (x(i)-y(i))))
// cntSum = sum(1)
// result = cntSum / (1 + dist)
val user_ds3 = user_ds2.
withColumn("cnt", lit(1)).
groupBy("itemidI", "itemidJ").
agg(sqrt(sum(($"scoreI" - $"scoreJ") * ($"scoreI" - $"scoreJ"))).as("dist"), sum($"cnt").as("cntSum")).
withColumn("result", $"cntSum" / (lit(1.0) + $"dist"))

// 4 上、下三角合併
val user_ds8 = user_ds3.select("itemidI", "itemidJ", "result").union(user_ds3.select ($"itemidJ".as("itemidI"), $"itemidI".as("itemidJ"), $"result"))

// 5 結果返回
val out = user_ds8.select("itemidI", "itemidJ", "result").map { row =>
val itemidI = row.getString(0)
val itemidJ = row.getString(1)
val similar = row.getDouble(2)
ItemSimi(itemidI, itemidJ, similar)
}
out
}

4. 關聯規則計算

/**
* 關聯規則計算
* 支持度(Support):在所有項集中{X, Y}出現的可能性,即項集中同時含有X和Y的概率P(X U Y)/P(I),I表
* 示全部事務
* 置信度(Confidence):在先決條件X發生的條件下,關聯結果Y發生的概率,即P(X U Y)/P(X)
* 提升度(lift):在含有X的條件下同時含有Y的可能性與在沒有X的條件下項集中含有Y的可能性之比,即
* confidence(X => Y)/P(Y)
* @param user_rdd 用戶評分
* @param RDD[ItemAssociation] 返回物品相似度

*
*/
def AssociationRules(user_ds: Dataset[ItemPref]): Dataset[ItemAssociation] = {
import user_ds.sparkSession.implicits._
// 1 (用戶:物品) => (用戶:(物品集合))
val user_ds1 = user_ds.groupBy("userid").agg(collect_set("itemid")).withColumnRenamed ("collect_set(itemid)", "itemid_set")

// 2 物品:物品,上三角數據
val user_ds2 = user_ds1.flatMap { row =>
val itemlist = row.getAs[WrappedArray[String]](1).toArray.sorted
val result = new ArrayBuffer[(String, String, Double)]()
for (i for (j result += ((itemlist(i), itemlist(j), 1.0))
}
}
result
}.withColumnRenamed("_1", "itemidI").withColumnRenamed("_2", "itemidJ"). withColumnRenamed("_3","score")

// 3 計算物品與物品,上三角,同現頻次
val user_ds3 = user_ds2.groupBy("itemidI", "itemidJ").agg(sum("score").as("sumIJ"))

// 4 計算物品總共出現的頻次
val user_ds0 = user_ds.withColumn("score", lit(1)).groupBy("itemid").agg(sum ("score").as("score"))
val user_all = user_ds1.count

// 5 計算支持度(Support)
val user_ds4 = user_ds3.select("itemidI", "itemidJ", "sumIJ").
union(user_ds3.select($"itemidJ".as("itemidI"), $"itemidI".as("itemidJ"), $"sumIJ")).
withColumn("support", $"sumIJ" / user_all.toDouble)

// user_ds4.orderBy($"support".desc).show

// 6 置信度(Confidence)
val user_ds5 = user_ds4.
join(user_ds0.withColumnRenamed("itemid", "itemidI").withColumnRenamed("score", "sumI"), "itemidI").
withColumn("confidence", $"sumIJ" / $"sumI")

// user_ds5.orderBy($"confidence".desc).show

// 7 提升度(lift)
val user_ds6 = user_ds5.
join(user_ds0.withColumnRenamed("itemid", "itemidJ").withColumnRenamed("score", "sumJ"), "itemidJ").
withColumn("lift", $"confidence" / ($"sumJ" / user_all.toDouble))

// user_ds6.orderBy($"lift".desc).show


// 計算同現相似度
val user_ds8 = user_ds6.withColumn("similar", col("sumIJ") / sqrt(col("sumI") * col("sumJ")))
// user_ds8.orderBy($"similar".desc).show

// 8 結果返回
val out = user_ds8.select("itemidI", "itemidJ", "support", "confidence", "lift", "similar").map { row =>
val itemidI = row.getString(0)
val itemidJ = row.getString(1)
val support = row.getDouble(2)
val confidence = row.getDouble(3)
val lift = row.getDouble(4)
val similar = row.getDouble(5)
ItemAssociation(itemidI, itemidJ, support, confidence, lift, similar)
}
out
}

總結

在大規模分佈式工程實踐中,當樣本量級比較大的時候,會導致物品向量或者用戶向量維度很高(比如1億用戶,那物品的向量維度會有1億維),會導致計算性能問題。這裡在工程實踐中,如何解決這個問題呢,最簡單粗暴的方案就是考慮採樣方法,進行降維,其中採樣包括:對用戶進行採樣策略和對物品採樣策略,最終目的使得計算性能滿足所需要的性能要求。當然還有一些高級方案,比如可以借鑑 Facebook 的 Faiss 原理,這裡就不具體展開講解。


分享到:


相關文章: