欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 文旅 > 明星 > 完整地实现了推荐系统的构建、实验和评估过程,为不同推荐算法在同一数据集上的性能比较提供了可重复实验的框架

完整地实现了推荐系统的构建、实验和评估过程,为不同推荐算法在同一数据集上的性能比较提供了可重复实验的框架

2025/2/5 14:41:45 来源:https://blog.csdn.net/huanghm88/article/details/145167207  浏览:    关键词:完整地实现了推荐系统的构建、实验和评估过程,为不同推荐算法在同一数据集上的性能比较提供了可重复实验的框架
{"cells": [{"cell_type": "markdown","metadata": {},"source": ["# 基于用户的协同过滤算法"]},{"cell_type": "code","execution_count": 1,"metadata": {},"outputs": [],"source": ["# 导入包\n","import random\n","import math\n","import time\n","from tqdm import tqdm"]},{"cell_type": "markdown","metadata": {},"source": ["## 一. 通用函数定义"]},{"cell_type": "code","execution_count": 2,"metadata": {},"outputs": [],"source": ["# 定义装饰器,监控运行时间\n","def timmer(func):\n","    def wrapper(*args, **kwargs):\n","        start_time = time.time()\n","        res = func(*args, **kwargs)\n","        stop_time = time.time()\n","        print('Func %s, run time: %s' % (func.__name__, stop_time - start_time))\n","        return res\n","    return wrapper"]},{"cell_type": "markdown","metadata": {},"source": ["### 1. 数据处理相关\n","1. load data\n","2. split data"]},{"cell_type": "code","execution_count": 3,"metadata": {},"outputs": [],"source": ["class Dataset():\n","    \n","    def __init__(self, fp):\n","        # fp: data file path\n","        self.data = self.loadData(fp)\n","    \n","    @timmer\n","    def loadData(self, fp):\n","        data = []\n","        for l in open(fp):\n","            data.append(tuple(map(int, l.strip().split('::')[:2])))\n","        return data\n","    \n","    @timmer\n","    def splitData(self, M, k, seed=1):\n","        '''\n","        :params: data, 加载的所有(user, item)数据条目\n","        :params: M, 划分的数目,最后需要取M折的平均\n","        :params: k, 本次是第几次划分,k~[0, M)\n","        :params: seed, random的种子数,对于不同的k应设置成一样的\n","        :return: train, test\n","        '''\n","        train, test = [], []\n","        random.seed(seed)\n","        for user, item in self.data:\n","            # 这里与书中的不一致,本人认为取M-1较为合理,因randint是左右都覆盖的\n","            if random.randint(0, M-1) == k:  \n","                test.append((user, item))\n","            else:\n","                train.append((user, item))\n","\n","        # 处理成字典的形式,user->set(items)\n","        def convert_dict(data):\n","            data_dict = {}\n","            for user, item in data:\n","                if user not in data_dict:\n","                    data_dict[user] = set()\n","                data_dict[user].add(item)\n","            data_dict = {k: list(data_dict[k]) for k in data_dict}\n","            return data_dict\n","\n","        return convert_dict(train), convert_dict(test)"]},{"cell_type": "markdown","metadata": {},"source": ["### 2. 评价指标\n","1. Precision\n","2. Recall\n","3. Coverage\n","4. Popularity(Novelty)"]},{"cell_type": "code","execution_count": 4,"metadata": {},"outputs": [],"source": ["class Metric():\n","    \n","    def __init__(self, train, test, GetRecommendation):\n","        '''\n","        :params: train, 训练数据\n","        :params: test, 测试数据\n","        :params: GetRecommendation, 为某个用户获取推荐物品的接口函数\n","        '''\n","        self.train = train\n","        self.test = test\n","        self.GetRecommendation = GetRecommendation\n","        self.recs = self.getRec()\n","        \n","    # 为test中的每个用户进行推荐\n","    def getRec(self):\n","        recs = {}\n","        for user in self.test:\n","            rank = self.GetRecommendation(user)\n","            recs[user] = rank\n","        return recs\n","        \n","    # 定义精确率指标计算方式\n","    def precision(self):\n","        all, hit = 0, 0\n","        for user in self.test:\n","            test_items = set(self.test[user])\n","            rank = self.recs[user]\n","            for item, score in rank:\n","                if item in test_items:\n","                    hit += 1\n","            all += len(rank)\n","        return round(hit / all * 100, 2)\n","    \n","    # 定义召回率指标计算方式\n","    def recall(self):\n","        all, hit = 0, 0\n","        for user in self.test:\n","            test_items = set(self.test[user])\n","            rank = self.recs[user]\n","            for item, score in rank:\n","                if item in test_items:\n","                    hit += 1\n","            all += len(test_items)\n","        return round(hit / all * 100, 2)\n","    \n","    # 定义覆盖率指标计算方式\n","    def coverage(self):\n","        all_item, recom_item = set(), set()\n","        for user in self.test:\n","            for item in self.train[user]:\n","                all_item.add(item)\n","            rank = self.recs[user]\n","            for item, score in rank:\n","                recom_item.add(item)\n","        return round(len(recom_item) / len(all_item) * 100, 2)\n","    \n","    # 定义新颖度指标计算方式\n","    def popularity(self):\n","        # 计算物品的流行度\n","        item_pop = {}\n","        for user in self.train:\n","            for item in self.train[user]:\n","                if item not in item_pop:\n","                    item_pop[item] = 0\n","                item_pop[item] += 1\n","\n","        num, pop = 0, 0\n","        for user in self.test:\n","            rank = self.recs[user]\n","            for item, score in rank:\n","                # 取对数,防止因长尾问题带来的被流行物品所主导\n","                pop += math.log(1 + item_pop[item])\n","                num += 1\n","        return round(pop / num, 6)\n","    \n","    def eval(self):\n","        metric = {'Precision': self.precision(),\n","                  'Recall': self.recall(),\n","                  'Coverage': self.coverage(),\n","                  'Popularity': self.popularity()}\n","        print('Metric:', metric)\n","        return metric"]},{"cell_type": "markdown","metadata": {},"source": ["## 二. 算法实现\n","1. Random\n","2. MostPopular\n","3. UserCF\n","4. UserIIF"]},{"cell_type": "code","execution_count": 5,"metadata": {},"outputs": [],"source": ["# 1. 随机推荐\n","def Random(train, K, N):\n","    '''\n","    :params: train, 训练数据集\n","    :params: K, 可忽略\n","    :params: N, 超参数,设置取TopN推荐物品数目\n","    :return: GetRecommendation,推荐接口函数\n","    '''\n","    items = {}\n","    for user in train:\n","        for item in train[user]:\n","            items[item] = 1\n","    \n","    def GetRecommendation(user):\n","        # 随机推荐N个未见过的\n","        user_items = set(train[user])\n","        rec_items = {k: items[k] for k in items if k not in user_items}\n","        rec_items = list(rec_items.items())\n","        random.shuffle(rec_items)\n","        return rec_items[:N]\n","    \n","    return GetRecommendation"]},{"cell_type": "code","execution_count": 6,"metadata": {},"outputs": [],"source": ["# 2. 热门推荐\n","def MostPopular(train, K, N):\n","    '''\n","    :params: train, 训练数据集\n","    :params: K, 可忽略\n","    :params: N, 超参数,设置取TopN推荐物品数目\n","    :return: GetRecommendation, 推荐接口函数\n","    '''\n","    items = {}\n","    for user in train:\n","        for item in train[user]:\n","            if item not in items:\n","                items[item] = 0\n","            items[item] += 1\n","        \n","    def GetRecommendation(user):\n","        # 随机推荐N个没见过的最热门的\n","        user_items = set(train[user])\n","        rec_items = {k: items[k] for k in items if k not in user_items}\n","        rec_items = list(sorted(rec_items.items(), key=lambda x: x[1], reverse=True))\n","        return rec_items[:N]\n","    \n","    return GetRecommendation"]},{"cell_type": "code","execution_count": 7,"metadata": {},"outputs": [],"source": ["# 3. 基于用户余弦相似度的推荐\n","def UserCF(train, K, N):\n","    '''\n","    :params: train, 训练数据集\n","    :params: K, 超参数,设置取TopK相似用户数目\n","    :params: N, 超参数,设置取TopN推荐物品数目\n","    :return: GetRecommendation, 推荐接口函数\n","    '''\n","    # 计算item->user的倒排索引\n","    item_users = {}\n","    for user in train:\n","        for item in train[user]:\n","            if item not in item_users:\n","                item_users[item] = []\n","            item_users[item].append(user)\n","    \n","    # 计算用户相似度矩阵\n","    sim = {}\n","    num = {}\n","    for item in item_users:\n","        users = item_users[item]\n","        for i in range(len(users)):\n","            u = users[i]\n","            if u not in num:\n","                num[u] = 0\n","            num[u] += 1\n","            if u not in sim:\n","                sim[u] = {}\n","            for j in range(len(users)):\n","                if j == i: continue\n","                v = users[j]\n","                if v not in sim[u]:\n","                    sim[u][v] = 0\n","                sim[u][v] += 1\n","    for u in sim:\n","        for v in sim[u]:\n","            sim[u][v] /= math.sqrt(num[u] * num[v])\n","    \n","    # 按照相似度排序\n","    sorted_user_sim = {k: list(sorted(v.items(), \\\n","                               key=lambda x: x[1], reverse=True)) \\\n","                       for k, v in sim.items()}\n","    \n","    # 获取接口函数\n","    def GetRecommendation(user):\n","        items = {}\n","        seen_items = set(train[user])\n","        for u, _ in sorted_user_sim[user][:K]:\n","            for item in train[u]:\n","                # 要去掉用户见过的\n","                if item not in seen_items:\n","                    if item not in items:\n","                        items[item] = 0\n","                    items[item] += sim[user][u]\n","        recs = list(sorted(items.items(), key=lambda x: x[1], reverse=True))[:N]\n","        return recs\n","    \n","    return GetRecommendation"]},{"cell_type": "code","execution_count": 8,"metadata": {},"outputs": [],"source": ["# 4. 基于改进的用户余弦相似度的推荐\n","def UserIIF(train, K, N):\n","    '''\n","    :params: train, 训练数据集\n","    :params: K, 超参数,设置取TopK相似用户数目\n","    :params: N, 超参数,设置取TopN推荐物品数目\n","    :return: GetRecommendation, 推荐接口函数\n","    '''\n","    # 计算item->user的倒排索引\n","    item_users = {}\n","    for user in train:\n","        for item in train[user]:\n","            if item not in item_users:\n","                item_users[item] = []\n","            item_users[item].append(user)\n","    \n","    # 计算用户相似度矩阵\n","    sim = {}\n","    num = {}\n","    for item in item_users:\n","        users = item_users[item]\n","        for i in range(len(users)):\n","            u = users[i]\n","            if u not in num:\n","                num[u] = 0\n","            num[u] += 1\n","            if u not in sim:\n","                sim[u] = {}\n","            for j in range(len(users)):\n","                if j == i: continue\n","                v = users[j]\n","                if v not in sim[u]:\n","                    sim[u][v] = 0\n","                # 相比UserCF,主要是改进了这里\n","                sim[u][v] += 1 / math.log(1 + len(users))\n","    for u in sim:\n","        for v in sim[u]:\n","            sim[u][v] /&#

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com