「从 0 到 1」的 Python-requests 爬虫完整教程
「从 0 到 1」的 Python-requests 爬虫完整教程
所有代码均可直接复制成独立文件运行,逐行中文注释、先本地起服务再爬虫的闭环思路不变。
你可以按需拆分成 12 个文件,也可整包丢进一个仓库当教程。
──────────────────
0️⃣ 第 0 章:准备工作(一次性)
──────────────────
# 仅首次执行
pip install flask requests beautifulsoup4 lxml pandas openpyxl mitmproxy grequests aiohttp
本案例使用 uv 管理项目故而本人使用的是以下方式安装依赖
uv add flask requests beautifulsoup4 lxml pandas openpyxl mitmproxy grequests aiohttp
──────────────────
1️⃣ 第 1 章:爬取纯静态 HTML
──────────────────
📁 文件 1:01_fake_site.py
"""
01_fake_site.py
启动后浏览器访问 http://127.0.0.1:5000/static 即可看到页面
----------------------------------------------------------
思路与过程:
1. 使用 Flask 快速启动本地 Web 服务,端口 5000。
2. 将整段 HTML 以长字符串形式写在代码里,省去模板文件。
3. 路由 /static 直接返回该字符串,浏览器即可渲染。
4. 运行脚本后保持终端开启,浏览器访问验证效果。
"""from flask import Flask, render_template_string# 创建 Flask 实例
app = Flask(__name__)# 将示例页面写成多行字符串,方便阅读与修改
STATIC_HTML = """
<!doctype html>
<html><head><meta charset="utf-8"><title>静态示例</title></head><body><h1>今日热榜</h1><ul><li><a href="https://news.com/1">新闻1</a></li><li><a href="https://news.com/2">新闻2</a></li><li><a href="https://news.com/3">新闻3</a></li></ul></body>
</html>
"""@app.route("/static")
def static_page():"""处理 /static 路由:直接返回静态 HTML 字符串,Flask 会自动设置 Content-Type: text/html。"""return STATIC_HTMLif __name__ == "__main__":# debug=True 开启调试模式,代码改动后自动重载# host='127.0.0.1' port=5000 为默认值,可省略app.run(debug=True) # 运行后请保持终端开启
📁 文件 2:01_crawl_static.py
"""
01_crawl_static.py
爬取本地 5000 端口的 /static 路由
思路:
1. 目标地址为 http://127.0.0.1:5000/static;
2. 伪造浏览器 UA 与 Accept 头,绕过最简单的反爬;
3. 使用 requests 发送 GET 请求并设置超时;
4. 检查响应状态码,非 200 直接抛异常;
5. 自动推断网页编码防止中文乱码;
6. 用 BeautifulSoup 解析 HTML;
7. 通过 CSS 选择器提取所有文章标题与链接;
8. 打印验证结果。
"""import requests # 发送 HTTP 请求
from bs4 import BeautifulSoup # 解析 HTML# 1. 目标页面(本地 Flask 服务 /static 路由)
url = "http://127.0.0.1:5000/static"# 2. 伪装成常见浏览器,降低被服务器拒绝的概率
headers = {# "User-Agent": (# "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "# "AppleWebKit/537.36 (KHTML, like Gecko) "# "Chrome/139.0.0.0 Safari/537.36"# ),"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36","Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"
}# 3. 发送 GET 请求,并设置 30 秒超时
resp = requests.get(url, headers=headers, timeout=30)# 4. 若状态码不是 200,直接抛出 HTTPError 异常
resp.raise_for_status()# 5. 自动推断编码(比手动指定 utf-8 更健壮)
resp.encoding = resp.apparent_encoding# 6. 使用 lxml 解析器(速度快,需提前 pip install lxml)
# 如果未安装 lxml,可改用内置 html.parser
soup = BeautifulSoup(resp.text, "lxml")
# soup = BeautifulSoup(resp.text, "html.parser") # 无需额外依赖# 7. 提取所有文章标题与链接
news_list = [] # 用于存放 (标题, 链接) 的元组
for a in soup.select("ul li a"): # 选中 ul 下所有 li 内的 a 标签title = a.text.strip() # 获取文本并去除首尾空白link = a["href"] # 获取 href 属性值news_list.append((title, link))# 8. 打印验证抓取结果
for title, link in news_list:print(title, "->", link)
运行顺序:
终端1: python 01_fake_site.py
终端2: python 01_crawl_static.py
──────────────────
2️⃣ 第 2 章:分页抓取
──────────────────
📁 文件 3:02_fake_site.py
"""
02_fake_site.py
访问 /page/<页码> 返回对应分页
--------------------------------------------------
思路与过程:
1. 使用 Flask 快速启动本地 Web 服务,端口 5000。
2. 采用 Jinja2 的 render_template_string,把整段 HTML 模板写进代码,既省掉外部模板文件,又能利用模板语法灵活渲染。
3. 路由 /page/<int:page> 接收整型页码参数;模板内根据 page 的值循环生成 5 篇文章链接,并在 page<3 时显示“下一页”按钮,实现简单分页。
4. 运行脚本后保持终端开启;浏览器访问 /page/1、/page/2、/page/3 验证效果。
"""from flask import Flask, render_template_string# 创建 Flask 实例
app = Flask(__name__)# 将分页模板写成多行字符串,嵌入 Jinja2 语法
PAGE_TEMPLATE = """
<!doctype html>
<html>
<head><meta charset="utf-8"><title>第{{ page }}页</title>
</head>
<body><!-- 页面标题,显示当前页码 --><h1>文章列表 - 第 {{ page }} 页</h1><!-- 用 Jinja2 的 for-loop 动态生成 5 条文章链接 --><ul>{% for i in range(1, 6) %}<li><a href="/detail/{{ page }}_{{ i }}">文章 {{ page }}-{{ i }}</a></li>{% endfor %}</ul><!-- 如果当前页小于 3,则显示“下一页”链接 -->{% if page < 3 %}<a href="/page/{{ page + 1 }}">下一页</a>{% endif %}
</body>
</html>
"""@app.route("/page/<int:page>")
def page_view(page):"""动态路由:/page/<int:page>接收整数 page,渲染模板并传入当前页码。"""# 使用 render_template_string 直接渲染字符串模板return render_template_string(PAGE_TEMPLATE, page=page)if __name__ == "__main__":# debug=True 开启调试模式,代码改动后自动重载app.run(debug=True)
📁 文件 4:02_crawl_paging.py
"""
02_crawl_paging.py
自动翻页直到没有「下一页」
思路:
1. 从第 1 页开始抓取文章标题列表;
2. 抓取当前页所有文章标题(ul li a);
3. 查找是否存在“下一页”按钮;
4. 若存在,则拼接绝对 URL 继续爬取下一页;
5. 若不存在,则停止循环。
"""import requests # 用于发送 HTTP 请求
from bs4 import BeautifulSoup # 用于解析 HTML
from urllib.parse import urljoin # 将相对路径拼接成绝对 URL# 基础域名,后续所有相对路径都会基于它进行拼接
BASE = "http://127.0.0.1:5000"def crawl_page(page_url):"""抓取单个分页的内容:param page_url: 当前页的完整 URL:return: (articles, next_url)articles: 本页文章标题列表next_url: 如果有“下一页”,返回其绝对 URL;否则返回 None"""# 发起 GET 请求,设置超时 30 秒,防止长时间无响应resp = requests.get(page_url, timeout=30)# 手动指定编码为 utf-8,防止中文乱码resp.encoding = "utf-8"# 使用 lxml 解析器构建 BeautifulSoup 对象soup = BeautifulSoup(resp.text, "lxml")# 通过 CSS 选择器定位所有文章标题# 选择器 "ul li a" 表示:ul 标签下的 li 标签内的 a 标签articles = [a.text.strip() for a in soup.select("ul li a")]# 查找文本内容为“下一页”的 a 标签next_tag = soup.find("a", string="下一页")# 如果找到,则将其相对 href 拼接成绝对 URL;否则返回 Nonenext_url = urljoin(BASE, next_tag["href"]) if next_tag else Nonereturn articles, next_urlif __name__ == "__main__":# 从第一页开始start = BASE + "/page/1"page = 1 # 页码计数器,仅用于打印提示# 当 start 为 None 时停止循环while start:print(f"===== 第 {page} 页 =====")# 抓取当前页内容,并返回下一页 URLarts, start = crawl_page(start)# 打印本页所有文章标题for art in arts:print(art)# 页码自增page += 1
──────────────────
3️⃣ 第 3 章:POST 表单登录
──────────────────
📁 文件 5:03_fake_site.py
"""
03_fake_site_no_session.py
无 session 版登录→传值→受保护页面
--------------------------------------------------
思路与过程:
1. 完全去掉 Flask 的 session 机制,改用 URL 查询参数 + 签名验证。
2. /login 路由:• GET:显示表单• POST:校验用户名密码• 成功后生成一次性 token(此处为固定值 demo_token)• 将 token 通过查询参数 /profile?token=demo_token 重定向
3. /profile 路由:• 从 request.args 取 token• 校验 token 有效性• 有效则显示私密数据;无效返回 401
4. 由于 token 随 URL 携带,刷新页面不会丢失,但比 session 安全性低。生产环境需用带时效、签名的 JWT 或一次性随机 token。
"""from flask import Flask, request, redirect, url_for# 创建 Flask 应用
app = Flask(__name__)# 为了演示,使用固定 token;真实环境应使用带签名的随机 token
VALID_TOKEN = "demo_token"@app.route("/login", methods=["GET", "POST"])
def login():"""登录逻辑:GET -> 显示表单POST -> 验证账号密码并重定向到 /profile?token=xxx"""if request.method == "POST":username = request.form.get("username")password = request.form.get("password")# 固定校验if username == "admin" and password == "123456":# 登录成功 → 把 token 放在查询参数中跳转return redirect(url_for("profile", token=VALID_TOKEN))else:return "登录失败"# GET 请求返回表单return """<form method="post">用户名:<input name="username"><br>密 码:<input type="password" name="password"><br><button>登录</button></form>"""@app.route("/profile")
def profile():"""受保护页面:通过 ?token=xxx 获取令牌,验证后返回私密数据"""token = request.args.get("token")# 简单校验 tokenif token != VALID_TOKEN:return "未授权", 401# 合法令牌 → 显示数据return "欢迎 admin!这是你的私密数据:余额 999 元"if __name__ == "__main__":# 启动服务app.run(debug=True)
📁 文件 6:03_crawl_login.py
"""
03_crawl_login_no_session.py
用 requests 完成无 session 的登录 → 抓取 /profile 数据
--------------------------------------------------
思路与过程:
1. 目标站点:http://127.0.0.1:5000
2. 登录页:/login (POST 表单)
3. 受保护页:/profile(带 token 查询参数)
4. 步骤:① 先 POST /login 提交用户名密码② 登录成功后,Flask 会 302 重定向到/profile?token=demo_token③ 从重定向后的 URL 里提取 token(或直接 GET 带 token 的 URL)④ 最终得到私密数据
"""import requests
from urllib.parse import parse_qs, urlparse# 全局常量
BASE_URL = "http://127.0.0.1:5000"
LOGIN_URL = f"{BASE_URL}/login"
PROFILE_URL = f"{BASE_URL}/profile"# 1. 创建 Session(方便管理 Cookie 与重定向)
sess = requests.Session()# 2. 发送 POST 登录请求
payload = {"username": "admin", "password": "123456"}
login_resp = sess.post(LOGIN_URL, data=payload, allow_redirects=True)# 3. 获取重定向后的最终 URL,并解析出 token
final_url = login_resp.url
parsed = urlparse(final_url)
token = parse_qs(parsed.query).get("token", [None])[0]if not token:print("未检测到 token,登录失败")exit()# 4. 显式访问带 token 的 /profile 页面(也可直接用 final_url)
profile_resp = sess.get(PROFILE_URL, params={"token": token})# 5. 打印结果
print("状态码:", profile_resp.status_code)
print("响应内容:", profile_resp.text)
──────────────────
4️⃣ 第 4 章:抓取 JSON API → Excel
──────────────────
📁 文件 7:04_fake_api.py
"""
04_fake_api.py
优化要点:
1. 成绩使用 random.randint(60, 100),保证区间闭 [60,100]。
2. 常量化:PER_PAGE、TOTAL 等,方便后续调整。
3. 增加错误处理:若 page 参数非法,统一返回 400。
4. 逐行中文注释,代码块清晰易读。
"""from flask import Flask, jsonify, request
import random# ---------------- 初始化 Flask ----------------
app = Flask(__name__)# ---------------- 数据准备 ----------------
TOTAL = 100 # 总学生数
PER_PAGE = 10 # 每页条数# 使用列表推导式一次性生成学生数据
# id: 1~TOTAL
# name: 字符串拼接
# score: 随机整数 60~100(闭区间)
STUDENTS = [{"id": i, "name": f"学生{i}", "score": random.randint(60, 100)}for i in range(1, TOTAL + 1)
]# ---------------- 路由 ----------------
@app.route("/api/students")
def api_students():"""GET /api/students?page=1返回指定页的学生 JSON。"""# 1. 获取 page 参数,缺省为 1,并转为 inttry:page = int(request.args.get("page", 1))except ValueError:# 非整数参数一律返回 400return jsonify({"error": "page must be an integer"}), 400# 2. 计算分页起始索引start = (page - 1) * PER_PAGEend = start + PER_PAGE# 3. 如果起始索引超出范围,返回空列表即可data = STUDENTS[start:end]# 4. 返回 JSON,Flask 会自动设置 Content-Type: application/jsonreturn jsonify(data)# ---------------- 入口 ----------------
if __name__ == "__main__":# debug=True 便于开发时热重载与调试app.run(debug=True)
📁 文件 8:04_crawl_api.py
"""
04_crawl_api.py
爬完所有分页并保存 Excel
思路:
1. 目标 API:http://127.0.0.1:5000/api/students;
2. 使用 GET 参数 page 进行分页遍历;
3. 每次拿到 JSON 列表,若无数据则停止;
4. 将所有分页数据合并到 all_data;
5. 用 pandas 生成 DataFrame 并导出到 students.xlsx;
6. 打印总记录数,提示用户保存成功。
"""import requests # 发送 HTTP 请求
import pandas as pd # 处理表格并保存 Excel# API 根地址
API = "http://127.0.0.1:5000/api/students"# 用于存放所有分页数据的列表
all_data = []# 页码从 1 开始
page = 1# 无限循环,直到某页返回空列表
while True:# 1. 发送 GET 请求,携带 page 参数,超时 30 秒resp = requests.get(API, params={"page": page}, timeout=30)# 2. 将响应内容解析为 JSON(列表)data = resp.json()# 3. 如果返回空列表,说明已到达最后一页,跳出循环if not data:break# 4. 把当前页数据追加到总列表all_data.extend(data)# 5. 页码 +1,继续下一页page += 1# 6. 用 pandas 把列表转成 DataFrame
df = pd.DataFrame(all_data)# 7. 保存为 Excel 文件,不带行索引
df.to_excel("students.xlsx", index=False)# 8. 提示用户保存成功并打印总记录数
print("已保存 students.xlsx,共", len(all_data), "条记录")
──────────────────
5️⃣ 第 5 章:POST 表单登录(Session 版)
──────────────────
📁 文件 9:05_fake_site.py
"""
05_fake_site.py
POST /login 校验 alice/wonder
--------------------------------------------------
思路与过程:
1. 使用 Flask 启动本地服务,监听 127.0.0.1:5000。
2. 仅当用户名为 alice 且密码为 wonder 时判定登录成功。
3. 登录成功后把用户名写入 session,并重定向到 /dashboard。
4. /dashboard 路由需先检查 session,未登录则跳回 /login。
5. 非法凭证返回 403,提示“账号或密码错误”。
6. 运行脚本后保持终端开启,浏览器访问 /login 验证流程。
"""from flask import Flask, request, session, redirect, url_for# 创建 Flask 应用
app = Flask(__name__)# 设置 session 加密密钥(生产环境请改用随机复杂字符串)
app.secret_key = "session-secret"@app.route("/login", methods=["GET", "POST"])
def login():"""GET -> 渲染登录表单POST -> 校验 alice/wonder,成功后写入 session 并重定向"""# ---------- GET 请求 ----------if request.method == "GET":# 返回简易 HTML 表单,表单 action 指向自己 /loginreturn '''<form action="/login" method="post">用户名:<input name="username"><br>密 码:<input type="password" name="password"><br><button>登录</button></form>'''# ---------- POST 请求 ----------user = request.form.get("username") # 取出用户名pwd = request.form.get("password") # 取出密码# 固定账号/密码校验if user == "admin" and pwd == "123456":session["user"] = user # 登录成功,写入 sessionreturn redirect(url_for("dashboard")) # 重定向到仪表盘# 登录失败,返回 403 并附带错误提示return "账号或密码错误", 403@app.route("/dashboard")
def dashboard():"""受保护页面:必须登录才能查看"""# 检查 session 是否存在 userif "user" not in session:return redirect(url_for("login")) # 未登录,跳转到登录页# 已登录,显示欢迎信息return f"欢迎 {session['user']}!这是仪表盘页面。"if __name__ == "__main__":# debug=True 便于开发时热重载app.run(debug=True)
📁 文件 10:05_crawl_post_login.py
"""
05_crawl_post_login.py
用 Session 自动携带 Cookie
--------------------------------------------------
思路与过程:
1. 使用 requests.Session() 创建会话,自动维护 Cookie。
2. 向 /login 发送 POST,携带固定用户名 alice / 密码 wonder。
3. 通过重定向后的 URL 判断是否登录成功(需本地 Flask 返回 302 到 /dashboard)。
4. 登录成功后复用同一 Session 访问 /dashboard,自动携带 Cookie。
5. 打印仪表盘页面源码,验证登录态保持成功。
"""import requests# 登录接口与受保护页面地址
LOGIN_URL = "http://127.0.0.1:5000/login"
DASH_URL = "http://127.0.0.1:5000/dashboard"# 1. 创建会话,后续请求自动携带 Cookie
sess = requests.Session()# 2. 发送 POST 登录请求,超时 10 秒
login_resp = sess.post(LOGIN_URL,data={"username": "admin", "password": "123456"},timeout=10,allow_redirects=True # 默认开启重定向
)# 3. 简单判断:若最终 URL 以 /dashboard 结尾,则视为登录成功
if login_resp.url.endswith("/dashboard"):print("登录成功")
else:print("登录失败")exit()# 4. 复用同一 Session,自动携带登录 Cookie 访问仪表盘
dash_resp = sess.get(DASH_URL, timeout=10)# 5. 打印仪表盘页面源码,验证内容
print(dash_resp.text)
──────────────────
6️⃣ 第 6 章:手动管理 Cookie
──────────────────
📁 文件 11:06_fake_site.py
"""
06_fake_site.py
演示「种下 Cookie → 校验 Cookie」的最小 Flask 示例
--------------------------------------------------
思路与过程:
1. /set_token 路由:把 token=abc123 种进浏览器 Cookie,有效期 1 小时。
2. /private 路由:读取浏览器发来的 Cookie,若 token 等于 abc123 则返回私密数据,否则返回 401 未授权。
3. 运行脚本后保持终端开启,浏览器先访问 /set_token,再访问 /private 验证流程。
"""from flask import Flask, request, make_response# 创建 Flask 应用
app = Flask(__name__)@app.route("/set_token")
def set_token():"""GET /set_token向客户端写入 Cookie:token=abc123,有效期 3600 秒(1 小时)"""# 1. 创建响应对象resp = make_response("已种下 token Cookie")# 2. 在响应头中 Set-Cookie# key="token", value="abc123", max_age=3600 秒resp.set_cookie("token", "abc123", max_age=3600)# 3. 返回响应给浏览器return resp@app.route("/private")
def private():"""GET /private读取请求中的 Cookie,校验 token 是否正确"""# 1. 从请求对象中获取 Cookie 值token = request.cookies.get("token")# 2. 校验 tokenif token == "abc123":# 校验成功,返回私密数据return "私密数据:今天天气真不错"# 3. 校验失败,返回 401 未授权return "未授权", 401if __name__ == "__main__":# 启动服务,debug=True 便于开发调试app.run(debug=True)
📁 文件 12:06_crawl_cookie_manual.py
"""
06_crawl_cookie_manual.py
两种手动带 Cookie 方式
--------------------------------------------------
思路与过程:
1. 使用 requests 发送 GET 请求到 /private;
2. 方式1:在 headers 中直接写 Cookie 字符串;
3. 方式2:用 cookies 参数传入字典,requests 会自动生成 Cookie 头;
4. 两种写法效果等价,打印响应文本验证授权成功。
"""import requests# 受保护页面地址
PRIVATE_URL = "http://127.0.0.1:5000/private"# ---------- 方法1:headers ----------
resp1 = requests.get(PRIVATE_URL,headers={"Cookie": "token=abc123"}, # 直接写原始 Cookie 头timeout=10
)
print("方法1:", resp1.text)# ---------- 方法2:cookies 参数 ----------
resp2 = requests.get(PRIVATE_URL,cookies={"token": "abc123"}, # requests 自动转 Cookie 头timeout=10
)
print("方法2:", resp2.text)
──────────────────
7️⃣ 第 7 章:POST JSON
──────────────────
📁 文件 13:07_fake_api.py
"""
07_fake_api.py
POST /api/search 接收 JSON
--------------------------------------------------
思路与过程:
1. 启动 Flask 本地服务,监听 5000 端口。
2. 仅接受 POST 方法到 /api/search。
3. 先校验 Content-Type 是否为 JSON,否则返回 400。
4. 获取 JSON 体中的 keyword 与 size(默认 5,上限 20)。
5. 根据 keyword 与 size 生成模拟搜索结果列表并返回。
6. 运行脚本后,可用 curl / Postman / Python requests 测试。
"""from flask import Flask, request, jsonify# 创建 Flask 应用
app = Flask(__name__)@app.route("/api/search", methods=["POST"])
def search():"""处理 POST /api/search请求体必须是 JSON,格式:{"keyword": "搜索词","size": 10 # 可选,默认 5,最大 20}返回:{"results": ["搜索词-1", "搜索词-2", ...]}"""# 1. 检查请求体的 Content-Type 是否为 application/jsonif not request.is_json:# 不是 JSON 直接返回 400return jsonify({"error": "需要 JSON"}), 400# 2. 解析 JSON 数据data = request.get_json()# 3. 取出 keyword,缺省为空字符串keyword = data.get("keyword", "")# 4. 取出 size,默认为 5,且限制最大 20size = min(data.get("size", 5), 20)# 5. 根据 keyword 和 size 生成模拟结果列表results = [f"{keyword}-{i}" for i in range(1, size + 1)]# 6. 返回 JSON 响应return jsonify({"results": results})if __name__ == "__main__":# debug=True 开启调试模式,代码改动后自动重载app.run(debug=True)
📁 文件 14:07_crawl_post_json.py
"""
07_crawl_post_json.py
requests.post(..., json=dict) 自动携带 Content-Type: application/json
--------------------------------------------------
思路与过程:
1. 使用 requests.Session() 或全局 requests.post 均可;
2. 通过 json= 参数传入 Python dict,requests 会自动:• 把 dict 序列化为 JSON 字符串• 添加请求头 Content-Type: application/json
3. 设置 timeout 防止长时间阻塞;
4. 打印服务器返回的 JSON 结果验证成功。
"""import requests# 目标接口地址
SEARCH_URL = "http://127.0.0.1:5000/api/search"# 要发送的 JSON 数据
payload = {"keyword": "python", "size": 3}# 发送 POST 请求:json= 自动处理序列化与 Content-Type
resp = requests.post(SEARCH_URL, json=payload, timeout=10)# 解析并打印服务器返回的 JSON
print("服务器返回:", resp.json())
📁 文件 15:07_crawl_post_json_with_header.py
"""
07_crawl_post_json_with_header.py
手动携带 Content-Type: application/json 的 POST 请求
--------------------------------------------------
思路与过程:
1. 与上一版功能完全一致,只是显式写出 headers 字典,把Content-Type: application/json 放进去,便于理解底层细节。
2. 其余步骤与自动 json= 方式相同:• 把 payload 用 json.dumps 转成字符串• 设置 timeout• 打印服务器返回的 JSON
"""import requests
import json # 需要手动序列化# 1. 目标接口
SEARCH_URL = "http://127.0.0.1:5000/api/search"# 2. 待发送的数据
payload = {"keyword": "python", "size": 3}# 3. 显式构造 headers,包含 Content-Type
headers = {"Content-Type": "application/json"
}# 4. 发送 POST 请求:
# data 参数需是 JSON 字符串,因此先用 json.dumps 序列化
resp = requests.post(SEARCH_URL,data=json.dumps(payload), # 手动序列化headers=headers, # 手动指定 Content-Typetimeout=10
)# 5. 解析并打印结果
print("服务器返回:", resp.json())
──────────────────
8️⃣ 第 8 章:反爬绕过
──────────────────
📁 文件 16:08_fake_site.py
"""
08_crawl_antiban.py
反爬策略:UA 轮换、代理池、随机延时、自动重试
--------------------------------------------------
思路与过程:
1. 构建 UA 池、代理池,随机选用,降低被封概率。
2. 使用 requests.Session + urllib3.util.retry.Retry,对 429/503 状态码自动重试(最多 5 次,指数退避)。
3. 每次请求前随机 0.5–1.5 秒延时,模拟人类行为。
4. 主函数循环 10 次演示,真实场景可改为 while True 或调度任务。
"""import random
import time
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry# 1. 代理池(示例为本地假地址,需换成真实可用代理)
PROXY_POOL = ["http://127.0.0.1:8081","http://127.0.0.1:8082",
]# 2. UA 池(主流桌面浏览器)
UA_POOL = ["Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:129.0) Gecko/20100101 Firefox/129.0","Mozilla/5.0 (Macintosh; Intel Mac OS X 13_4) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.5 Safari/605.1.15","Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139 Safari/537.36","Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36 Edg/139.0.0.0",
]def create_session():"""创建支持自动重试的 Session:return: 已配置好重试策略的 requests.Session 对象"""sess = requests.Session()# 重试策略:最多 5 次;遇到 429/503 时退避重试retry = Retry(total=5, # 总重试次数status_forcelist=[429, 503], # 需要重试的 HTTP 状态码backoff_factor=1 # 退避因子:1 → 1s, 2s, 4s, ...)# 创建带重试的 HTTPAdapter,并挂载到 Sessionadapter = HTTPAdapter(max_retries=retry)sess.mount("http://", adapter)sess.mount("https://", adapter)return sessdef fetch_news():"""单次抓取 /news 接口的封装:- 随机 UA- 随机代理- 随机延时"""# 1. 创建带有重试机制的 Sessionsess = create_session()# 2. 随机选择 UA 与代理headers = {"User-Agent": random.choice(UA_POOL)}proxies = {"http": random.choice(PROXY_POOL)}# 3. 随机延时 0.5–1.5 秒,降低瞬时并发time.sleep(random.uniform(0.5, 1.5))# 4. 发送 GET 请求,超时 10 秒try:resp = sess.get("http://127.0.0.1:5000/news",headers=headers,proxies=proxies,timeout=10)# 打印状态码与 JSON 响应print(resp.status_code, resp.json())except requests.RequestException as e:# 网络或代理异常时打印错误print("请求异常:", e)if __name__ == "__main__":# 演示:连续抓取 10 次for _ in range(10):fetch_news()
📁 文件 17:08_crawl_antiban.py
"""
08_crawl_antiban.py
反爬策略:UA 轮换、代理池、随机延时、自动重试
--------------------------------------------------
思路与过程:
1. 构建 UA 池、代理池,随机选用,降低被封概率。
2. 使用 requests.Session + urllib3.util.retry.Retry,对 429/503 状态码自动重试(最多 5 次,指数退避)。
3. 每次请求前随机 0.5–1.5 秒延时,模拟人类行为。
4. 主函数循环 10 次演示,真实场景可改为 while True 或调度任务。
"""import random
import time
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry# 1. 代理池(示例为本地假地址,需换成真实可用代理)
PROXY_POOL = ["http://127.0.0.1:8080","http://127.0.0.1:8081",
]# 2. UA 池(主流桌面浏览器)
UA_POOL = ["Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:129.0) Gecko/20100101 Firefox/129.0","Mozilla/5.0 (Macintosh; Intel Mac OS X 13_4) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.5 Safari/605.1.15","Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139 Safari/537.36","Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36 Edg/139.0.0.0",
]def create_session():"""创建支持自动重试的 Session:return: 已配置好重试策略的 requests.Session 对象"""sess = requests.Session()# 重试策略:最多 5 次;遇到 429/503 时退避重试retry = Retry(total=5, # 总重试次数status_forcelist=[429, 503], # 需要重试的 HTTP 状态码backoff_factor=1 # 退避因子:1 → 1s, 2s, 4s, ...)# 创建带重试的 HTTPAdapter,并挂载到 Sessionadapter = HTTPAdapter(max_retries=retry)sess.mount("http://", adapter)sess.mount("https://", adapter)return sessdef fetch_news():"""单次抓取 /news 接口的封装:- 随机 UA- 随机代理- 随机延时"""# 1. 创建带有重试机制的 Sessionsess = create_session()# 2. 随机选择 UA 与代理headers = {"User-Agent": random.choice(UA_POOL)}proxies = {"http": random.choice(PROXY_POOL)}# 3. 随机延时 0.5–1.5 秒,降低瞬时并发time.sleep(random.uniform(0.5, 1.5))# 4. 发送 GET 请求,超时 10 秒try:resp = sess.get("http://127.0.0.1:5000/news",headers=headers,proxies=proxies,timeout=10)# 打印状态码与 JSON 响应print(resp.status_code, resp.json())except requests.RequestException as e:# 网络或代理异常时打印错误print("请求异常:", e)if __name__ == "__main__":# 演示:连续抓取 10 次for _ in range(1000):fetch_news()
用 mitmproxy / Charles / Fiddler 启动真实本地代理 mitmproxy -p 8081 和 mitmproxy -p 8082
──────────────────
9️⃣ 第 9 章:高并发
──────────────────
📁 文件 18:09_fake_slow.py
"""
09_fake_slow.py
访问 /item/<uid> 时强制延迟 0.5 秒,模拟“慢接口”
--------------------------------------------------
思路与过程:
1. 启动 Flask 本地服务,监听 5000 端口。
2. 路由 /item/<int:uid> 接收任何整数 uid。
3. 在视图函数内使用 time.sleep(0.5) 人为阻塞 0.5 秒,模拟数据库查询或外部依赖耗时。
4. 返回 JSON:{"id": uid, "name": "商品{uid}"}。
5. 运行脚本后,浏览器或 curl 访问 /item/123 即可体验延迟。
"""from flask import Flask, jsonify
import time# 创建 Flask 应用
app = Flask(__name__)@app.route("/item/<int:uid>")
def item(uid):"""GET /item/<uid>:param uid: 商品 ID(正整数):return: 延迟 0.5 秒后的 JSON 响应"""# 人为阻塞 0.5 秒,模拟慢接口time.sleep(0.5)# 构造并返回 JSON 数据return jsonify({"id": uid,"name": f"商品{uid}"})if __name__ == "__main__":# debug=True 方便调试;生产环境可关闭app.run(debug=True)
📁 文件 19:09_crawl_grequests.py
"""
09_crawl_grequests.py
使用 grequests 对慢接口进行并发抓取
--------------------------------------------------
思路与过程:
1. 构建 20 条 URL:http://127.0.0.1:5000/item/1 .. /item/20
2. 用生成器表达式把每条 URL 封装成 grequests.AsyncRequest 对象
3. grequests.map 并发执行,最大并发数 size=5(相当于 5 个协程)
4. 遍历返回的 Response 列表,只打印状态码 200 的 JSON 数据
"""import grequests# 1. 生成 20 条 URL
urls = [f"http://127.0.0.1:5000/item/{i}" for i in range(1, 21)]# 2. 创建异步请求生成器(不会立即发送)
rs = (grequests.get(u) for u in urls)# 3. 并发执行,size=5 表示最多同时 5 个协程
for r in grequests.map(rs, size=5):if r and r.ok: # r 可能为 None(异常时)print(r.json()) # 打印返回的 JSON
📁 文件 20:09_crawl_aiohttp.py
"""
09_crawl_aiohttp.py
使用 asyncio + aiohttp 进行高并发抓取
--------------------------------------------------
思路与过程:
1. 构建 20 条 URL:http://127.0.0.1:5000/item/1 .. /item/20
2. 创建全局 TCPConnector,限制总并发 20,单域名并发 10
3. 创建 aiohttp.ClientSession,复用连接
4. 为每条 URL 创建协程任务 fetch
5. asyncio.gather 并发执行并收集结果
6. 打印每条返回的 JSON
"""import asyncio
import aiohttp# 1. 生成 20 条 URL
URLS = [f"http://127.0.0.1:5000/item/{i}" for i in range(1, 21)]async def fetch(session, url):"""单个协程任务:发送 GET 请求并解析 JSON:param session: aiohttp.ClientSession 实例:param url: 待抓取链接:return: JSON 数据"""async with session.get(url) as resp:return await resp.json()async def main():"""主协程:创建会话 → 创建任务 → 并发执行 → 打印结果"""# 2. 创建连接池:总并发 20,单域名并发 10conn = aiohttp.TCPConnector(limit=20, limit_per_host=10)# 3. 创建会话async with aiohttp.ClientSession(connector=conn) as session:# 4. 为每条 URL 创建任务tasks = [asyncio.create_task(fetch(session, u)) for u in URLS]# 5. 并发执行并收集结果for r in await asyncio.gather(*tasks):print(r)if __name__ == "__main__":# 6. 运行事件循环asyncio.run(main())
──────────────────
🔟 第 10 章:合法合规与限流
──────────────────
📁 文件 21:10_fake_limit.py
"""
10_fake_limit.py
演示 robots.txt 声明 + IP 级别速率限制
--------------------------------------------------
思路与过程:
1. 提供 /robots.txt,声明禁止抓取 /slow 路径,符合爬虫礼仪。
2. /slow/<n> 接口:• 每个 IP 在 1 秒内最多允许 2 次请求,超量返回 429。• 使用全局字典 + 列表记录每个 IP 最近的请求时间戳。• 每次请求前清理 1 秒前的旧记录,再判断当前次数。
3. 若未触发 429,则 sleep 1 秒模拟耗时计算,返回 n 的平方。
4. 运行脚本后:• 浏览器/爬虫访问 /robots.txt 可见禁止规则;• 快速刷新 /slow/5 两次以上即可看到 429。
"""from flask import Flask, Response, request
import time# 创建 Flask 应用
app = Flask(__name__)@app.route("/robots.txt")
def robots():"""返回纯文本 robots.txt:User-agent: *Disallow: /slow"""return Response("User-agent: *\nDisallow: /slow\n", mimetype="text/plain")@app.route("/slow/<int:n>")
def slow(n):"""GET /slow/<n>:param n: 整数参数:return: {"id": n, "square": n*n}限速逻辑:同一 IP 1 秒内最多 2 次请求"""# 1. 生成唯一 key,用于存储该 IP 的时间戳列表key = f"rate-{request.remote_addr}"# 2. 获取当前时间戳now = time.time()# 3. 取出或创建该 IP 的时间戳列表call_list = globals().setdefault(key, [])# 4. 清理 1 秒前的旧记录call_list[:] = [t for t in call_list if now - t < 1]# 5. 判断 1 秒内是否已达 2 次if len(call_list) >= 2:return "Too Many Requests", 429# 6. 记录本次请求时间call_list.append(now)# 7. 模拟耗时 1 秒time.sleep(1)# 8. 返回计算结果return {"id": n, "square": n * n}if __name__ == "__main__":# debug=True 便于调试app.run(debug=True)
📁 文件 22:10_crawl_robot_friendly.py
"""
10_crawl_robot_friendly.py
合规并发:先读 robots,再用 Semaphore/TCPConnector 限流
--------------------------------------------------
思路与过程:
1. 先用 urllib.robotparser 读取 /robots.txt,确认目标路径是否被禁止。
2. 若允许抓取,则使用 asyncio + aiohttp:• 全局并发信号量 Semaphore 限制同一时刻最多 CONCUR 个协程;• TCPConnector 的 limit_per_host 再做一层连接池限制;• 每次请求后固定延时 DELAY 秒,避免触发 429。
3. 遇到 429 或异常时记录日志并跳过。
4. 最终并发抓取 TOTAL 条数据。
"""import asyncio
import aiohttp
import logging
from urllib.robotparser import RobotFileParser
from urllib.parse import urljoin# 基础地址与抓取配置
BASE = "http://127.0.0.1:5000"
PATH = "/slow"
CONCUR = 2 # 并发协程上限
TOTAL = 5 # 总抓取数量
DELAY = 1.2 # 每次抓取后的固定延时(秒)# 日志格式
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")async def allowed(path):"""读取 robots.txt 并判断给定路径是否允许抓取。:param path: 相对路径,如 "/slow":return: True 允许,False 禁止"""rp = RobotFileParser()rp.set_url(urljoin(BASE, "/robots.txt")) # 拼接完整 URLtry:rp.read() # 下载并解析 robots.txtreturn rp.can_fetch("*", urljoin(BASE, path))except Exception as e:logging.warning("robots.txt 读取失败:%s", e)# 失败时默认允许抓取return Trueasync def fetch_one(session, sem, n):"""单个抓取协程:• 受信号量 sem 控制并发• 固定延时 DELAY 秒• 处理 429 限速:param session: aiohttp.ClientSession:param sem: asyncio.Semaphore:param n: 商品编号:return: dict 或 None"""async with sem: # 并发闸门await asyncio.sleep(DELAY) # 固定延时,降低瞬时压力url = urljoin(BASE, f"{PATH}/{n}")async with session.get(url) as resp:if resp.status == 429:logging.warning("限速 %s", url)return Nonedata = await resp.json()logging.info("成功 %s -> %s", url, data)return dataasync def main():"""主协程:检查 robots → 创建会话 → 并发抓取 → 等待完成"""# 1. 检查 robots.txtif not await allowed(PATH):logging.error("robots 禁止抓取 %s", PATH)return# 2. 创建并发控制工具sem = asyncio.Semaphore(CONCUR) # 协程级并发限制conn = aiohttp.TCPConnector(limit_per_host=CONCUR) # 连接池级限制# 3. 创建 aiohttp 会话async with aiohttp.ClientSession(connector=conn) as session:# 4. 创建任务列表tasks = [asyncio.create_task(fetch_one(session, sem, i))for i in range(1, TOTAL + 1)]# 5. 并发执行并等待全部完成await asyncio.gather(*tasks)if __name__ == "__main__":# 运行事件循环asyncio.run(main())
──────────────────
1️⃣1️⃣ 第 11 章:生产级通用组件
──────────────────
📁 文件 23:11_crawler_base.py
"""
crawler_base.py
通用异步爬虫基类,集成 UA / 代理 / 重试 / robots / 限流
--------------------------------------------------
思路与过程:
1. 对外暴露一个 **BaseAsyncCrawler** 基类,使用者只需继承并重写 parse()。
2. 构造阶段一次性接受:• base_url 根域名• UA 池 / 代理池 随机轮换• sem 信号量 控制并发量• retries 最大重试次数• per_host TCPConnector 的单域名连接上限• timeout 统一超时时间
3. 实现 __aenter__ / __aexit__,保证 async with 语法下自动创建 / 关闭 aiohttp.ClientSession。
4. robots_ok() 方法先读取 /robots.txt,再判断目标 path 是否被禁止。
5. rand_proxy() 随机返回一个代理字典;若代理池为空则返回空字典,退化为直连。
6. fetch() 内部带重试循环:指数退避 + 随机抖动;失败抛 RuntimeError。
7. crawl_one() 对外统一入口:先做 robots 检查,再抓取,再解析。
8. 子类仅需重写 async def parse(self, resp) -> Any,即可完成业务定制。
"""import asyncio
import aiohttp
import logging
import random
from typing import Any, List, Dict
from urllib.robotparser import RobotFileParser
from aiohttp import ClientTimeout# 统一日志格式
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")class BaseAsyncCrawler:"""通用异步爬虫基类"""def __init__(self,base_url: str,*,ua_pool: List[str] = None,proxy_pool: List[str] = None,sem: int = 5,retries: int = 3,per_host: int = 10,timeout: int = 15,):""":param base_url: 站点根地址,如 http://127.0.0.1:5000:param ua_pool: UA 列表,None 时默认简单 UA:param proxy_pool: 代理列表,None 或空列表时直连:param sem: asyncio.Semaphore 并发量:param retries: 最大重试次数:param per_host: aiohttp.TCPConnector 单域名连接上限:param timeout: aiohttp.ClientTimeout 总超时秒数"""self.base = base_url.rstrip("/") # 去掉末尾斜杠,方便拼接self.ua_pool = ua_pool or ["Mozilla/5.0"]self.proxy_pool = proxy_pool or []self.sem = asyncio.Semaphore(sem) # 并发闸门self.retries = retriesself.timeout = ClientTimeout(total=timeout)self.connector = aiohttp.TCPConnector(limit_per_host=per_host)self.session = None # 会在 __aenter__ 中赋值# ---------- 上下文管理器 ----------async def __aenter__(self):"""async with 进入时自动创建 aiohttp ClientSession"""self.session = aiohttp.ClientSession(connector=self.connector,timeout=self.timeout,headers={"User-Agent": random.choice(self.ua_pool)},)return selfasync def __aexit__(self, *args):"""async with 退出时关闭会话"""if self.session:await self.session.close()# ---------- robots.txt 检查 ----------async def robots_ok(self, path: str) -> bool:"""读取并解析 /robots.txt,判断给定路径是否允许抓取:param path: 相对路径,如 /item/1:return: True 允许,False 禁止"""rp = RobotFileParser()rp.set_url(f"{self.base}/robots.txt")try:rp.read() # 下载并解析return rp.can_fetch("*", f"{self.base}{path}")except Exception as e:logging.warning("robots.txt 读取失败:%s", e)return True # 失败时默认允许# ---------- 代理选择 ----------def rand_proxy(self) -> Dict[str, str]:"""随机返回一个代理字典,无代理时返回空"""return {"http": random.choice(self.proxy_pool)} if self.proxy_pool else {}# ---------- 带重试的抓取 ----------async def fetch(self, url: str, **kw) -> aiohttp.ClientResponse:"""带重试的 GET 请求:param url: 完整 URL:param kw: 额外参数,如 params、headers:return: ClientResponse"""for attempt in range(1, self.retries + 1):try:async with self.sem: # 并发闸门async with self.session.get(url,proxy=self.rand_proxy().get("http"),**kw,) as resp:return resp # 成功直接返回except Exception as e:# 指数退避 + 随机抖动await asyncio.sleep(2 ** attempt + random.random())# 重试耗尽raise RuntimeError("max retries reached")# ---------- 解析钩子 ----------async def parse(self, resp: aiohttp.ClientResponse) -> Any:"""子类必须重写,用于把响应转换为业务数据默认抛 NotImplementedError"""raise NotImplementedError# ---------- 对外统一抓取入口 ----------async def crawl_one(self, path: str) -> Any:"""校验 robots → 抓取 → 解析:param path: 相对路径,如 /item/1:return: 经 parse() 处理后的业务数据"""url = f"{self.base}{path}"if not await self.robots_ok(path):logging.info("robots 禁止 %s", url)return Noneresp = await self.fetch(url)return await self.parse(resp)
📁 文件 24:11_demo_use_base.py
"""
11_demo_use_base.py
继承 BaseAsyncCrawler 抓取 /slow 的平方结果
--------------------------------------------------
思路与过程:
1. 从 crawler_base.py 引入 BaseAsyncCrawler。
2. 新建 SquareCrawler 子类,仅重写 parse() 方法:• 从响应 JSON 中提取 "square" 字段并返回。
3. main() 中:• 使用 async with 初始化 SquareCrawler,并发上限 sem=3;• 为 1–10 的 /slow/<n> 创建并发任务;• 过滤 None 结果后打印所有平方值。
4. 运行脚本前请确保 10_fake_limit.py 已启动。
"""import asyncio
from crawler_base import BaseAsyncCrawlerclass SquareCrawler(BaseAsyncCrawler):"""继承基类,只关心如何解析数据"""async def parse(self, resp):"""把响应 JSON 中的 square 字段提取出来:param resp: aiohttp.ClientResponse:return: int 平方值"""data = await resp.json()return data["square"]async def main():"""并发抓取 1–10 的平方"""# 使用 async with 自动管理 ClientSessionasync with SquareCrawler(base_url="http://127.0.0.1:5000",sem=3 # 并发上限 3) as crawler:# 创建 10 个任务tasks = [asyncio.create_task(crawler.crawl_one(f"/slow/{i}"))for i in range(1, 11)]# 并发执行并过滤 Noneresults = [r for r in await asyncio.gather(*tasks) if r is not None]print("抓取结果:", results)if __name__ == "__main__":# 启动事件循环asyncio.run(main())
──────────────────
完结撒花
──────────────────
至此,一份「从 0 到 1」的 Python-requests 爬虫完整教程 全部奉上:
• 本地模拟站点 → 逐行中文注释 → 可直接 python xxx.py
运行
• 覆盖静态、分页、登录、Cookie、JSON、反爬、高并发、合规限流、生产封装等 11 大主题
• 按章节拆文件即可作为课程或脚手架使用
祝爬得开心,合法合规!