Steam爬取相关游戏评测
## 因为是第一次爬取Steam。所以作为一次记录发出;有所错误欢迎指出。
无时间指定爬取
import requests
import time
import csv
import osappid = "553850"
# 这里你也可以改成
#appid = int(input()) max_reviews = 10000 # 想爬多少条
# max_reviews = int(input("请输入你想读取的个数(要为10的倍数)"))
batch_size = 100 # 每页请求条数cursor_file = "cursor.txt" # 游标保存文件
data_file = "helldivers_reviews.csv" # 数据保存文件
// 数据保存文件可以改成你要想要的名字,我这个是csv# 读取上次保存的游标,没有则用起始游标 *
def load_cursor():if os.path.exists(cursor_file):with open(cursor_file, "r", encoding="utf-8") as f:cursor = f.read().strip()if cursor:return cursorreturn "*"# 保存游标到文件
def save_cursor(cursor):with open(cursor_file, "w", encoding="utf-8") as f:f.write(cursor)# 保存数据到CSV,追加模式
def save_reviews(reviews):file_exists = os.path.exists(data_file)with open(data_file, "a", encoding="utf-8-sig", newline="") as f:writer = csv.writer(f)if not file_exists:writer.writerow(["username", "recommend", "hours", "comment", "votes_up", "votes_funny"])for r in reviews:writer.writerow([r.get("author", {}).get("steamid", ""),r.get("voted_up"),r.get("author", {}).get("playtime_forever", 0)/60, # 游戏时长(小时)r.get("review"),r.get("votes_up"),r.get("votes_funny")])# 请求一页评论
def fetch_reviews(appid, cursor):url = f"https://store.steampowered.com/appreviews/{appid}"params = {"json": "1","filter": "recent","language": "all","day_range": "30","review_type": "all","purchase_type": "all","cursor": cursor,"num_per_page": batch_size,}headers = {"User-Agent": "Mozilla/5.0"}proxies = {"http": "http://127.0.0.1:7890","https": "http://127.0.0.1:7890"}try:resp = requests.get(url, params=params, headers=headers,proxies=proxies, timeout=10)resp.raise_for_status()data = resp.json()return dataexcept Exception as e:print("请求失败:", e)return Nonedef main():cursor = load_cursor()total_reviews = 0print(f"开始爬取,起始游标: {cursor}")while total_reviews < max_reviews:data = fetch_reviews(appid, cursor)if not data:print("请求失败,等待后重试...")time.sleep(5)continuereviews = data.get("reviews", [])if not reviews:print("没有更多评论了,爬取结束。")breaksave_reviews(reviews)total_reviews += len(reviews)print(f"已爬取 {total_reviews} 条评论")cursor = data.get("cursor")if not cursor:print("无游标,爬取结束。")breaksave_cursor(cursor) # 保存游标,断点续爬关键time.sleep(1) # 防止请求过快print(f"爬取完成,总共爬取 {total_reviews} 条评论")if __name__ == "__main__":main()
这里有一些提示。
第一点: 对于appid 这个
你要爬的游戏ID 在SteamDB查看 或者是https://store.steampowered.com/app/553850/HELLDIVERS_2/ 那个数字就是appid
第二点,我这个是使用的记录上一次的保存游标不断联读取。 什么意思呢,对于Steam爬取,你可能爬取到一半,对方服务器就给你断连了;那么你就需要重新连接读取,而每次的读取都其实是从上次断开连接的地方开始,而不是从头再来。
第三点,因为是爬取Steam,这种国外平台,你没有外部代理的话是访问不了的,所以得要点魔法。地址呢就是填入代理的地址
proxies = {"http": "http://127.0.0.1:7890","https": "http://127.0.0.1:7890"}
例如我这样,7890是我的代理的端口号,实测了一下,Steam++也是可以使用它这个空闲端口的,本来Steam++是448默认端口,但是这个也可以照样使用。
记得是加入request.(xxxx,xxxx,proxies=proxies),不然你是连接不上的。
最后,这个代码是从今天往以前的代码进行爬取,也就是说你如果没有时间要求的话,就不用管太多,如果有的话,就参考我下面的代码。
指定时间段爬取
import requests
import time
import csv
import os
from datetime import datetimeappid = "553850" #游戏appid
batch_size = 100 #每页请求条数
max_reviews_per_lang = 20000 #最大爬取个数cursor_files = {"english": "cursor_english.txt","schinese": "cursor_schinese.txt",
}data_files = {"english": "reviews_english.csv","schinese": "reviews_schinese.csv",
}// 这里我增加了分语言爬取# 时间范围(转为时间戳)
start_date = datetime.strptime("2025-05-20", "%Y-%m-%d")
end_date = datetime.strptime("2025-06-05", "%Y-%m-%d")
start_ts = int(start_date.timestamp())
end_ts = int(end_date.timestamp())def load_cursor(lang):f = cursor_files[lang]if os.path.exists(f):with open(f, "r", encoding="utf-8") as file:cur = file.read().strip()if cur:return curreturn "*"def save_cursor(lang, cursor):f = cursor_files[lang]with open(f, "w", encoding="utf-8") as file:file.write(cursor)def save_reviews(lang, reviews):f = data_files[lang]file_exists = os.path.exists(f)with open(f, "a", encoding="utf-8-sig", newline="") as file:writer = csv.writer(file)if not file_exists:writer.writerow(["username", "recommend", "hours", "comment", "votes_up", "votes_funny", "timestamp"])for r in reviews:ts = r.get("timestamp_created", 0)# 时间过滤,只保存目标时间范围内的评论if ts < start_ts or ts > end_ts:continuewriter.writerow([r.get("author", {}).get("steamid", ""),r.get("voted_up"),r.get("author", {}).get("playtime_forever", 0)/60,r.get("review"),r.get("votes_up"),r.get("votes_funny"),datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M:%S")])def fetch_reviews(appid, cursor, lang):url = f"https://store.steampowered.com/appreviews/{appid}"params = {"json": "1","filter": "recent","language": lang,"day_range": "16", # 取最近30天评论,保证覆盖目标时间段"review_type": "all","purchase_type": "all","cursor": cursor,"num_per_page": batch_size,}headers = {"User-Agent": "Mozilla/5.0"}proxies = {"http": "http://127.0.0.1:7890","https": "http://127.0.0.1:7890"}try:resp = requests.get(url, params=params, headers=headers,proxies=proxies, timeout=10)resp.raise_for_status()return resp.json()except Exception as e:print(f"请求失败 ({lang}):", e)return Nonedef crawl_language(lang):print(f"开始爬取语言:{lang}")cursor = load_cursor(lang)total = 0while total < max_reviews_per_lang:data = fetch_reviews(appid, cursor, lang)if not data:print("请求失败,稍后重试...")time.sleep(5)continuereviews = data.get("reviews", [])if not reviews:print("无更多评论,结束爬取", lang)break# 过滤时间后实际保存的评论数count_before = totalsave_reviews(lang, reviews)# 统计有效评论数量(时间范围内)filtered = [r for r in reviews if start_ts <= r.get("timestamp_created", 0) <= end_ts]total += len(filtered)print(f"{lang}: 已保存 {total} 条评论")cursor = data.get("cursor")if not cursor:print(f"{lang}无游标,结束爬取")breaksave_cursor(lang, cursor)time.sleep(1)print(f"{lang}爬取完成,总共保存 {total} 条评论")def main():for lang in ["schinese"]:crawl_language(lang)if __name__ == "__main__":main()
对于这个代码,我增加了几点功能。
1.分语言爬取,第一个代码是没有对玩家评测语言进行分类的,那么就是默认为all,所有的可能语言评论都会被爬取,这个语言---你可以去参考Steam API。应该是开源出来了。
2.分时间段爬取,有一个缺点就是,也是从你指定的时间段开始,但是你的爬取条数会影响到你最终结束的那个评论的时间。比如说,你爬取1000条,从5月30-6月5日,这个时间段的评论数远大于1000的时候,那么你最终的那个结束点,它不会是5月30号的。这个要注意,你可以给很大的值,我实测过,只要它的爬取个数一直没有发生改变的时候,那么你直接终止程序就ok了,不会有什么影响。
获取Steam登录Cookie的脚本
在多分享一个,就是如果你想使用的是Selenium+BeautifulSoup组合,那么你应该是要模拟登录网站的,但是频繁登录就很容易被对方网站给Ban掉,通过加入一个初次登录时的Cookie,不但方便登录而且不那么容易被网站察觉你在爬。
import json
import time
import undetected_chromedriver as uc# 创建 Chrome 浏览器实例
options = uc.ChromeOptions()
options.add_argument("--no-sandbox")
options.add_argument("--disable-gpu")
# 不使用 headless,确保你能手动完成登录
# options.add_argument("--headless") # 不要加这行,手动登录用不到driver = uc.Chrome(options=options)try:# 打开 Steam 登录页面driver.get("https://store.steampowered.com/login/")print("🧭 请手动登录 Steam... 登录完成后不要关闭窗口。")# 等待你手动登录(比如扫码、输入验证码等)input("✅ 登录完成后,按下 Enter 继续...")# 获取 cookiescookies = driver.get_cookies()# 保存 cookies 到本地文件with open("steam_cookies.json", "w", encoding="utf-8") as f:json.dump(cookies, f, indent=2)print("✅ Cookie 已成功保存为 steam_cookies.json")finally:driver.quit()
这个就不做太多解释,应该很容易看懂。
以上的代码都是经过我实际操作过,没有特殊的环境要求配置,你只要把这些库给pip install下来基本上就ok了。
由于是第一次爬虫,所以还有很多地方不太清楚,如果你是想获取动态JavaScript数据的话,那就得去学习逆向了。