09对战平台游戏数据分析
09对战平台游戏数据分析
本文是针对09对战平台的主播弑神90357参与的DOTA -OMG模式的十人对黑游戏的数据进行抓取,增量爬取,和数据分析;
想要看明白,需要有python 爬虫,增量,数据处理模块等技术基础,最好了解dota,LOL,王者荣耀等MOBA游戏基础;
一、网页前端内容
1.1基本情况
还是要带着了解一下网页结构,本次爬取的是 自由作战 ,即 十人对黑 ,一般是互相之间认识的人,互相邀请游戏进入同一房间,凑够10人开始玩;
需要注意的人, 由于对黑并不计算在赛季(天梯模式)的分数,所以对黑的一些数据,会少一些;
这个玩家,以下简称 狗弑 (因这名玩家曾有较强的个人能力,如同疯狗般撕咬对方,故荣获此称号);
默认的页面,一页显示13个数据,只会显示狗弑的每场比赛基本数据;
点开每一个战斗数据,会出现一个iframe,显示该场比赛,十个人的详细数据,但是注意详细数据,在稍微久远(例如一年之前)会有缺失,我爬下来分析的时候,发现总共236页,有近100页的内容存在缺失,每页13个,即有快1300条战斗详情数据,有缺失;
缺失示例:
1.2抓取数据思路
经过仔细分析前端数据,想要抓全数据,准备采用三步;
①第一步:
通过战绩基本信息页,每页13个数据,抓g_id(game_id平台为每局游戏设置一个唯一标识),顺带获取游戏开始时间create_time;同时基本信息页,返回了几个关键数据,pagetotal(该玩家总共有多少条数据),pagesize(每页13个), 这是增量爬取的依据 ;以下和代码简称gid表
通过抓包工具,可以看到,基本信息页,只有狗弑一个人的,而为了全面分析数据,要去详情页抓该局游戏十个人的数据,详情页和基本信息页面的很多内容有重复,所以基本信息页我们只要一部分,看个人选择在哪里抓;
②第二步
点进详情页,会弹出一个iframe,其中有两个xhr有重要数据,首先搞定GameBureauMessage表,以下和代码简称Bureau表;
Bureau表记录该局游戏基本信息,我们要g_id(后续用于表合的字段),time_length(游戏玩了多少秒),win_id(0则近卫胜,1则天灾胜利),create_time之前抓过了,所以不需要;其他字段,记录你是对黑还是赛季模式,记录用的哪个地图等,我们不要;
③第三步
iframe详情页的第二个xhr,简称correlation表,记录了该局游戏,十个人的详细数据;
而在请求头中,也包含g_id,所以我们通过g_id,将基本信息,Bureau表,correlation表连起来;
因为基本信息表中,返回了总共多少个数据和现在是第多少页,所以没办法必须带上;
1.3字段说明:
g_source:0 自由作战,对黑 g_source:4,自由匹配 user_id:平台用户唯一标识
assist_count 助攻数 create_time 游戏开始时间 creep_kill 正补数
hero_level 游戏结束时英雄等级 hero_name 英雄中文名
creep_denies:反补数 dust_count :买(用)粉数
gem_count :买(用)宝石数 eye_count :买(用)眼数
smoke_count :买(用)战争迷雾数 team_id 0为近卫,1为天灾
hurt_value: 总计伤害数值(对敌人英雄) total_money:金钱数 neutral_kill:杀野怪数
kill_count 杀敌方英雄数 killed_count 被杀数(被野怪杀的也计算入内)
title: 游戏称号,例如MVP,助,杀,僵,躺等,这里用了一个巧妙的计算方法,网页前端,返回数据1 ->杀;2->MVP;4 ->助 ;8 ->躺 ;16 ->灵 ;32 ->僵;
如果title为3,则是1+2,即->杀+MVP ;如果为6,则是2+4 ->MVP+助 ;
GameTypeID 1是正版DOTA,2是IM模式,21为OMG模式
CurrentSeason=0 自由匹配 不算做赛季,故为0
creep_killed 不确定是被野怪还是小兵还是敌方塔所杀数,现在对黑模式并未详细记录这个数,都是0(以前平台会把被野怪所杀单独拧出来不算死亡数,现在计算死亡就是死了多少次,不少算)
push_tower_count:拆塔数,由于拆塔数,只在查询某个玩家的时候,会返回,correlation表中没有,按照我们抓十个人的游戏数据来看,决定不去追求这个数据,不然要把很多个不同玩家的数据全抓再合并,那将指数级麻烦;
push_rax_count:猜测为拆非塔的其他建筑数,同拆塔数,舍弃;
items:一串数字,游戏计算结束时,英雄身上的装备,决定舍弃,一是要理解这串数字含义,需逆向或者大量对比,二是游戏结束时,基本都是那些狠得装备,没太大意义;
skills:技能,会有八个,但最后两个为0,故只需要前6个,玩过dota类游戏都知道,英雄本身有四个技能,而OMG模式多的两个技能,如果要对比分析,大几百乘以小几百,会形成高纬稀疏矩阵,故舍弃;
参战率war_rate
,是前端计算而非后端返回的,即某个英雄的 (K+A)/己方K之和 的百分比,四舍五入,这是看前端发现的,并非我猜的…
前面说过,较早的数据有缺失,故后面的数据会出现参战率超过100%的bug,所以在数据全部爬下来后,对很多不足十个玩家的对局,要决定是剔除数据,还是填充什么的?经过考虑,比如要分析,狗弑跟哪个队友在一起胜率高,这个问题,那么缺失数据会造成误判,虽然缺失数据占比有四成,但还是决定删掉,当然这是在分析之后才发现的问题;
同时,有少数两个对局,有一方被完全压制,全员一个人头都没有,会有除0错误,需要后续在判断中加入条件;
二、数据爬取(带增量功能)
2.1概括说明
突然,就写不动了,因为代码中有注释,可以自行看看;由于后端返回的都是轻量的JSON数据,所以小开几个线程,应该不会对平台造成什么影响;
思路大致如下:实现基本同本文上面的三步走方法,但是添加容错,增量,防意外等功能;
第一步 ,抓基本信息页g_id和create_time,多线程抓gid表信息,在本地形成data09_less.txt;
第二步 ,通过data09_less.txt的每个g_id,多线程抓Bureau表的对局基本信息,形成data_09_bureau.txt,说白了就是要Bureau表的win_id信息,不然不知道是近卫赢了还是天灾赢了;经过试验,经常有几个条数据会抓失败,其实应该在requests的时候,就使用重试功能,但是懒得大改了,故添加了对比两个文本长度的功能,会自动将Bureau表漏的补全;
第三步 :还是通过data09_less.txt的每个g_id,多线程抓correlation表信息,也会偶尔有失败的,故通过日志,根据抓取操作时间,判断失败的,再给补上;
由于每个表都有一个唯一的g_id,故三个表可以实现类似SQL的join,pandas的merge,最终形成一个表;
第四步 :增量功能,从上面三步可以看出,最主要的就是g_id,每局游戏唯一标识,Bureau表和correlation表,对于同一个玩家,同一个模式的对局信息,请求头就是g_id不同;通过设置一个update_flag,判断本地的g_id和网页返回的pagetotal数量是否一致,判断是否要更新;利用update_data完成对g_id的更新和判断是否要更新;
2.2代码部分
import requests
from lxml import etree
from urllib import parse
import json
import time
import math
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
import logging
import sys
import os
import warnings
class Zhanji09():
# 用于获取有多少页,多少条数据
first_url = "https://score.09game.com/MOBA/BasicDataList?UserID=1477944&GameTypeID=21&CurrentSeason=0&GameSource=0&Time=-1&PageIndex=0&PageSize=13"
# 用于获取页面中每一个数据的g_id和create_time,并未点进iframe数据
page_data_url = "https://score.09game.com/MOBA/BasicDataList?UserID=1477944&GameTypeID=21&CurrentSeason=0&GameSource=0&Time=-1&PageIndex=0&PageSize=13"
# 根据g_id获取对应url的g_id和win_id(iframe数据)
bureau_url_none = "https://score.09game.com/MOBA/GameBureauMessage?GameTypeID=21&GameID="
# 根据g_id获取对应url的详情数据,需要将g_id与详情数据对齐
correlation_url_none = "https://score.09game.com/MOBA/CorrelationPlayerMilitaryExploit?GameTypeID=21&GameID=&GameSource=0&CurrentSeason=0"
def __init__(self,store_path='d:/goushi'):
self.store_path = store_path
if not os.path.exists(self.store_path):
os.makedirs(self.store_path)
if not os.path.exists(self.store_path+'/data09_less.txt'):
self.update_flag = False
print('本次为全量数据')
elif os.path.exists(self.store_path+'/data09_less.txt'):
self.update_flag = True
print('本次为增量追加数据')
def get_total_page(self):
"""获取总共有多少页,向上取证"""
resp = requests.get(self.first_url)
data = json.loads(resp.text)
pagetotal = data['data']['pageTotal']
pagesize = data['data']['pageSize']
page_num = math.ceil(pagetotal/pagesize)
print('有%d页--共计%d条数据'%(page_num,pagetotal))
return page_num,pagesize,pagetotal
def update_data(self):
if not self.update_flag:
print('第一次请用全量下载功能,已自动跳转调用')
self.__multi_thread_get_gid()
return
# 一个文本专门存更新数据,用完就删掉
if os.path.exists(self.store_path + "/data09_less_update.txt"):
os.remove(self.store_path + "/data09_less_update.txt")
print('旧的更新文件已删除')
"""检查数据增量情况,只追加新增数据"""
df_check = pd.read_csv(self.store_path + "/data09_less.txt", names=['g_id', 'create_time'], sep='\t')
has_pagetotal = len(df_check)
has_gids = len(df_check.g_id.unique())
if has_pagetotal != has_gids:
warnings.warn('现存数据有问题,distinct的g_id和总g_id数量不一致')
print('总g_id数%d个--去重g_id数%d个' % (has_pagetotal, has_gids))
new_page_num, pagesize, new_pagetotal = self.get_total_page()
if has_pagetotal == new_pagetotal:
print("数据没有更新")
return
if has_pagetotal < new_pagetotal:
xin_num = new_pagetotal - has_pagetotal
print("数据更新了%d个" % xin_num)
# 向上取证最大页数
yemian_num = math.ceil(xin_num / pagesize)
# 更新页面数较少,则单线程下载,较多则多线程
if yemian_num < 30:
for i in range(yemian_num - 1, -1, -1):
print(f"第{i}页")
url = self.page_data_url.replace("PageIndex=0", "PageIndex={}".format(i))
self.get_gid_base(url, i)
else:
self.__multi_thread_get_gid(yemian_num)
# 去掉重复的g_id
file_path_old = self.store_path + "/data09_less.txt"
file_path_new = self.store_path + "/data09_less_update.txt"
df_1 = pd.read_csv(file_path_old, names=['g_id', 'create_time'], sep='\t')
df_2 = pd.read_csv(file_path_new, names=['g_id', 'create_time'], sep='\t')
print('更新的数据,去重前有%d个'%len(df_2))
# 找到新数据,这里用到了merge的indicator,会添加_merge列,表示数据源是哪个表的
df_3 = pd.merge(df_1, df_2, how='outer', on='g_id', indicator=True) # 会有四个列
df_diff = df_3[df_3['_merge'] == 'right_only'] # 选择新增的部分
df_diff = df_diff[['g_id','create_time_y']] # 只选要的两列
# 去重后覆盖更新文件
with open(file_path_new, mode='w', encoding='utf-8') as f:
df_diff.to_csv(file_path_new,sep='\t',index=False,header=False)
print('更新文件已经覆盖,数据为%d个'%len(df_diff))
# 添加到旧的表后面
df_diff.to_csv(file_path_old, sep='\t', index=False, header=False,mode='a')
def create_logger(self,logger_name='zhanji09_logger'):
"""
日志功能,记录相关信息
"""
self.logger = logging.getLogger(name=logger_name)
self.logger.setLevel(logging.INFO)
self.logger.propagate=False # 不向上传递
# 存储用
handler_file = logging.FileHandler(self.store_path+'/'+'zhanji.log',mode='a',encoding='utf-8')
format_file = logging.Formatter('%(asctime)s|%(levelname)s|%(message)s'
,datefmt='%Y-%m-%d %H:%M:%S')
handler_file.setLevel(logging.ERROR)
handler_file.setFormatter(format_file)
# 输出用
handler_console = logging.StreamHandler(sys.stdout)
handler_console.setLevel(logging.INFO)
format_console = logging.Formatter('%(message)s|%(asctime)s')
handler_console.setFormatter(format_console)
# 要避免重复添加
if not self.logger.handlers:
self.logger.addHandler(handler_file)
self.logger.addHandler(handler_console)
def get_gid_base(self,url, page_num):
"""
多线程的基本功能,获取g_id和create_time
每一页是先存最早的
一页如果满的话,有13个数据
存g_id和时间两列数据
形参page_num单纯为了打印而已
"""
resp = requests.get(url)
cur_data = json.loads(resp.text)
g_id = [x['g_id'] for x in cur_data['data']['listEntity']]
create_time = [x['create_time'] for x in cur_data['data']['listEntity']]
if not self.update_flag:
file_path = self.store_path+"/data09_less.txt"
if self.update_flag:
file_path = self.store_path + "/data09_less_update.txt"
with open(file_path, mode='a', encoding='utf-8') as f:
# 按照最早的到最新的顺序存储
for a, b in zip(g_id[::-1], create_time[::-1]):
f.write(f"{a}\t{b}\n")
# print(f"第{page_num}页数据已存")
self.logger.info(f"第{page_num}页数据已存")
def __multi_thread_get_gid(self):
"""
由于使用多线程,故每一页的顺序不会完全一致
该功能用于首次全量获取数据
"""
if self.update_flag:
print('跳转至数据更新功能')
self.update_data()
if not self.update_flag:
page_num = self.get_total_page()[0]
with ThreadPoolExecutor(max_workers=4) as pool:
for i in range(page_num - 1, -1, -1):
url = self.page_data_url.replace("PageIndex=0", "PageIndex={}".format(i))
pool.submit(self.get_gid_base, url, i)
# 1.4 获取每局bureau表信息
def get_bureau_base(self,url,g_id):
resp = requests.get(url,timeout=5)
try:
if resp.status_code == 200:
cur_data = json.loads(resp.text)
g_id = cur_data['data'][0]['g_id']
win_id = cur_data['data'][0]['win_id']
time_length = cur_data['data'][0]['time_length']
with open(self.store_path+'/data_09_bureau.txt',mode='a') as f:
f.write(f"{g_id}\t{win_id}\t{time_length}\n")
print(f"{g_id}的bureau数据已存")
except Exception as e:
print(e)
self.logger.error(f"{g_id}的bureau数据获取失败")
# finally:
# time.sleep(0.5)
def multi_thread_get_bureau(self):
"""根据存的g_id,去找bureau表信息"""
if not self.update_flag:
file_path = self.store_path+"/data09_less.txt"
if self.update_flag:
file_path = self.store_path + "/data09_less_update.txt"
# 如果文件不存在,则证明无更新数据
if not os.path.exists(file_path):
print("bureau无更新数据")
return
with open(file_path,mode='r') as f:
datas = f.readlines()
with ThreadPoolExecutor(max_workers=8) as pool:
for i in datas:
g_id = i.split("\t")[0]
url = self.bureau_url_none.replace("GameID=","GameID={}".format(g_id))
pool.submit(self.get_bureau_base, url,g_id)
# 1.5 验证两份数据是否等长
def check_data_length(self):
"""
有时候存在个别下载失败的情况,默认g_id数据最全
bureau表信息可能会漏,故补全
改进:在requests方法中,使用出错重试功能
"""
df_1 = pd.read_csv(self.store_path+"/data09_less.txt", header=None, sep='\t')
df_2 = pd.read_csv(self.store_path+'/data_09_bureau.txt', header=None, sep='\t')
if len(df_1)>len(df_2):
print("bureau数据存在缺失")
merge_df = pd.merge(df_1, df_2,how='left',left_on=0,right_on=0)
missed_ids = merge_df[merge_df['1_y'].isnull()][0].values
for i in missed_ids:
url = self.bureau_url_none.replace("GameID=","GameID={}".format(i))
self.get_bureau_base(url, i)
else:
print("bureau数据完整",f"总共{len(df_1)}条数据")
# 1.6获取每局详细数据
def get_correlation_base(self,url, g_id):
"""需要稍微研究一下网页返回的数据结构和信息,部分内容建议存为字符串"""
try:
resp = requests.get(url, timeout=5)
if resp.status_code == 200:
cur_data = json.loads(resp.text)
with open(self.store_path+'/data_09_correlation.txt', mode='a') as f:
for info in cur_data['data']:
user_id = str(info['user_id'])
user_name = info['user_name']
hero_id = info['hero_id']
hero_name = info['hero_name']
hero_level = int(info['hero_level'])
kill_count = int(info['kill_count'])
killed_count = int(info['killed_count'])
assist_count = int(info['assist_count'])
title = str(info['title'])
dust_count = int(info['dust_count'])
eye_count = int(info['eye_count'])
gem_count = int(info['gem_count'])
smoke_count = int(info['smoke_count'])
creep_kill = int(info['creep_kill'])
creep_denies = int(info['creep_denies'])
total_money = int(info['total_money'])
hurt_value = int(info['hurt_value'])
team_id = str(info['team_id'])
neutral_kill = int(info['neutral_kill'])
# items = info['items'] # 装备信息,暂不存储
# skills = info['skills'][:-3] # 去掉最后两个0,暂不存储
# 此时一定要将g_id写入,用于两表合并
f.write(
f"{g_id}\t{user_id}\t{user_name}\t{hero_id}\t{hero_name}\t{hero_level}\t{kill_count}\t{killed_count}\t{assist_count}\t{title}\t{dust_count}\t{eye_count}\t{gem_count}\t{smoke_count}\t{creep_kill}\t{creep_denies}\t{total_money}\t{hurt_value}\t{team_id}\t{neutral_kill}\n")
# print(f"{g_id}的correlation数据已存")
self.logger.info(f"{g_id}的correlation数据已存")
except Exception as e:
print(e)
self.logger.error(f"{g_id}的correlation数据获取失败")
def multi_thread_get_correlation(self):
"""
本地bureau表数据中,也有g_id,之前也对比过length,故以此为标准下载详情页数据
"""
if not self.update_flag:
file_path = self.store_path+"/data09_less.txt"
if self.update_flag:
file_path = self.store_path + "/data09_less_update.txt"
if not os.path.exists(file_path):
print("correlation无更新数据")
return
with open(file_path, mode='r') as f:
datas = f.readlines()
with ThreadPoolExecutor(max_workers=6) as pool:
for i in datas:
g_id = i.split("\t")[0]
url = self.correlation_url_none.replace("GameID=", "GameID={}".format(g_id))
pool.submit(self.get_correlation_base, url, g_id)
# 1.7 补上获取失败的,实测发现偶尔会有数据获取失败的
def check_data_failure(self,time_str ='2025-03-10 18:40:00'):
"""
在下载详情页数据时,也要验证一下是否有失败的
因为日志中有存储,这是根据日志自动补全的功能
需要手动指定间隔的时间线
"""
missed_datas = pd.read_csv(self.store_path+"/zhanji.log"
, sep='|'
, names=['time', 'type', 'message', 'other'])
# 用于指定时间范围,需要先将时间列设为索引
missed_datas['time'] = pd.to_datetime(missed_datas['time'])
missed_datas.set_index(missed_datas['time'])
# 输入比较时间范围
need = missed_datas[missed_datas['time'] > time_str]
if len(need) == 0:
print("详细战斗数据没有下载失败的")
return
# missed_ids = missed_datas.loc[:,'message'].map(lambda x:x.split('的')[0])
missed_ids = need.loc[:, 'message'].map(lambda x: x.split('的')[0])
if len(missed_ids) > 0:
print('存在下载失败的数据%d个' % len(missed_ids))
for g_id in missed_ids:
url = self.correlation_url_none.replace("GameID=", "GameID={}".format(g_id))
self.get_correlation_base(url, g_id)
2.3实例
instance = Zhanji09()
instance.get_total_page()
instance.create_logger()
instance.update_data()
instance.multi_thread_get_bureau()
instance.check_data_length()
instance.multi_thread_get_correlation()
instance.check_data_failure("2025-03-12 13:21:55")
此时大腿拍断,为什么要手搓爬虫,如果用scrapy,工作量直接少很多,有空改一下代码用scrapy搞,此时的数据,缺少两个信息:
一个判断本局获胜还是输,通过win_id是否和team_id相同,0近卫,1是天灾,网页前端返回数据是反着来的,0是输,1是胜利;
二是参战率,需要先将本局本队伍总人头数算出,再用每个人的K+A一除,注意避免除0;
2.4初步清洗
这个就在本地慢慢搞,建议用jupyter notebok这种,慢慢来
# 对数据进行清洗,生成两个关键字段
store_path='d:/goushi'
df_1 = pd.read_csv(store_path+"/data09_less.txt", header=None, sep='\t', names=['g_id', 'create_time'])
df_2 = pd.read_csv(store_path+"/data_09_bureau.txt", header=None, sep='\t', names=['g_id', 'win_id','time_length'])
df_3 = pd.read_csv(store_path+"/data_09_correlation.txt", header=None, sep='\t'
, names=['g_id', 'user_id', 'user_name', 'hero_id', 'hero_name', 'hero_level', 'kill_count', 'killed_count', 'assist_count', 'title', 'dust_count', 'eye_count', 'gem_count', 'smoke_count', 'creep_kill', 'creep_denies', 'total_money', 'hurt_value', 'team_id','neutral_kill']
,encoding='ANSI')
# 会有个别数据有问题
df_3.dropna(inplace=True,subset=['hero_level','total_money'])
# 先融合前两个表
df_merge_1 = pd.merge(df_1,df_2,how='inner',on='g_id')
# 找到不足十人的局,删掉
lack_ids = df_3.groupby('g_id')['user_name'].count()<10
df_merge_1_dropna = pd.merge(df_merge_1,lack_ids,left_on='g_id',right_on =lack_ids.index,how='right')
df_merge_1_dropna_2 = df_merge_1_dropna[df_merge_1_dropna['user_name']==False]
df_merge_1_dropna_2.reset_index(drop=True,inplace=True)
df_merge_1_dropna_2 = df_merge_1_dropna_2.iloc[:,:4]
df_merge_2 = pd.merge(df_merge_1_dropna_2,df_3,how='inner',on='g_id')
获取胜负,0胜1负
def get_win_lost(df):
return 0 if df['win_id']==df['team_id'] else 1
df_merge_2['win_or_lost'] = df_merge_2.apply(get_win_lost,axis=1)
获取参战率
df_merge_3 = pd.merge(df_merge_2,temp_df,how='inner'
,left_on=['g_id','team_id'],right_on = ['g_id','team_id'])
def get_warrate(df):
try: # 发现有少数几盘,队友全崩,一局一个人都没杀,会除0错误
war_rate =round(((df['kill_count'] + df['assist_count'])/df['team_kill_sum'])*100,0)
except ZeroDivisionError:
war_rate = 0
return war_rate
df_merge_3['war_rate'] = df_merge_3.apply(get_warrate,axis=1)
df_merge_3['g_id']=df_merge_3['g_id'].astype(str)
df_merge_3.to_excel('d:/dogshi_data.xlsx',index=False)
import gc
gc.collect()
数据分析这块,有空再更新一下。
已生成EXCEL数据,度盘链接如下:
通过网盘分享的文件:goushi
链接: 提取码: 9527