django扩展练习记录
一、Django 中使用 django-apscheduler 实现定时任务
可以方便地管理周期性任务(如每天清理缓存、定时发送邮件等)
1. 安装
pip install django-apscheduler -i https://pypi.tuna.tsinghua.edu.cn/simple #0.7.0
2.添加到应用,python manage.py migrate
# settings.py
INSTALLED_APPS = [# ...'django_apscheduler',
]
python manage.py migrate
执行后,mysql中,多了两个django_apscheduler开头的表。
3. 一个django应用中创建定时任务
可以是一个utils包下,创建一个叫task.py的,我是直接在相关应用下建了一个task.py
也有人也在view.py中, 还有人习惯专门改个app来实现定时任务。
# tasks.py
from apscheduler.schedulers.background import BackgroundScheduler
from django_apscheduler.jobstores import DjangoJobStore
import uuid
# 初始化调度器
scheduler = BackgroundScheduler()
scheduler.add_jobstore(DjangoJobStore(), "default")# --- 定义定时任务函数 ---
def cleanup_temp_data():print("清理临时数据...")def send_daily_report():print("发送日报...")def generate_job_id( jobname):return f"{jobname}_{uuid.uuid4().hex[:6]}"# --- 将任务添加到调度器 ---
scheduler.add_job(cleanup_temp_data,"interval", # 间隔性任务seconds=10, # 每隔 60 秒执行一次# days=1, # 每天执行一次id=generate_job_id("cleanup_job"),# id="cleanup_job",# replace_existing=True, # 允许覆盖同名任务 (即数据库已有id为 cleanup_job,不添加这个就会报错)
)scheduler.add_job(send_daily_report,"cron", # 定时任务(类似 crontab)hour=9, # 每天 9 点执行minute=10, # 加了这行就表示,每天9点 10 分钟id="report_job",replace_existing=True, # 允许覆盖同名任务
)# 注意:这里暂时不启动调度器!在 apps.py 中启动。
# 启动调度器 (这里也启动不了呢)
# scheduler.start()
方式了,在方法上写装饰器@register_job(scheduler,‘interval’,seconds=20,id=‘clenauo_job2’),就不用下方的 scheduler.add_job()部分了
启动项目时的定时任务
4.配置启动
一般定义任务启动是配置在应用中app.py中的,表示启动项目时,来启动定义是任务。
from django.apps import AppConfig
import filelockclass HrunnerConfig(AppConfig):default_auto_field = "django.db.models.BigAutoField"name = "hrunner"# 在 Django 开发环境中,AppConfig.ready() 方法被执行两次是.正常现象,但会导致定时任务重复注册# 生产环境下是没问题的。def ready(self):# 仅在应用加载完成后初始化调度器if not hasattr(self, 'scheduler_started'):from .task import scheduler # 导入当前应用的调度器# 启动调度器scheduler.start()self.scheduler_started = True
5.启动观察第三步代码
python managy runserver 因为开发环境,会导致执行了两次ready。有点点问题。
开发环境下,解决办法。
先清空两个表,然后这里就这样写,但是启动时会报。(原因就是ready执行了两次,第二次报错,故意让第二次没有加成功。。就只有一个任务)
生成环境下。还没尝试
好像,必须要 replace_existing,不然测试环境时,项目启动不起来。
定时任务方式,有两种
后续定时任务
接口添加任务。
定时任务应用(没行)
感觉就把定时任务写死,启动时执行就好
django+ celery异步任务配置
1. 安装celery 和redis
pip install celery redis -i https://pypi.tuna.tsinghua.edu.cn/simple
2. 需要有一个redis服务
3. django项目结构
假设项目名为 myproject,应用名为 myapp:
myproject/
├── myproject/
│ ├── init.py
│ ├── celery.py # 新增 Celery 配置文件
│ ├── settings.py
│ └── urls.py
└── myapp/
├── tasks.py # 存放异步任务
└── …
4. 配置celery
创建 myproject/celery.py
# myproject/celery.py
import os
from celery import Celery# 设置 Django 环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')# 创建 Celery 实例
app = Celery('myproject')# 从 Django 配置中读取 Celery 设置(以 CELERY_ 开头的配置)
app.config_from_object('django.conf:settings', namespace='CELERY')# 自动发现所有 Django 应用中的 tasks.py 文件
app.autodiscover_tasks()
修改 myproject/init.py
# myproject/__init__.py
from .celery import app as celery_app__all__ = ['celery_app'] # 确保 Celery 应用被正确加载
在settings.py中添加配置
# myproject/settings.py
CELERY_BROKER_URL = 'redis://localhost:6379/0' # 消息代理用 Redis
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' # 任务结果存储
CELERY_TIMEZONE = 'Asia/Shanghai' # 时区
# 注: TIME_ZONE = 'Asia/Shanghai' # 要和这个时区一致
# 启动celery worker
# celery -A myproject worker --loglevel=info# windows电脑还要添加这个,不然会有点问题
CELERY_WORKER_POOL = 'solo'
# 启动celery worker
# celery -A your_project worker --loglevel=info -P solo
5. 编写异步任务
在应用 myapp 中创建 tasks.py
# myapp/tasks.py
from celery import shared_task@shared_task
def send_welcome_email(email):print(f"模拟发送邮件到 {email}...")# 这里可以写实际发邮件的代码(如调用 Django 的 send_mail)return f"邮件已发送至 {email}"
6. 配置异步任务调用
在视图或任何地方调用
# myapp/views.py
from django.http import JsonResponse
from .tasks import send_welcome_emaildef register(request):email = "user@example.com"# 异步调用任务(.delay() 是快捷方法)send_welcome_email.delay(email)return JsonResponse({"status": "邮件发送中..."})
7.启动服务
a.启动celery worker (注意换成你的项目名)
celery -A myproject worker --loglevel=info
# windows电脑用下面的,并且settings中还要加那个solo
celery -A your_project worker --loglevel=info -P solo
b.启动django服务器
python manage.py runserver
8. 测试
访问注册接口后,观察 Celery Worker 的日志输出:
9. 原理理解
异步调用playwright操控浏览器。
任务方法
from celery import shared_task
from playwright.sync_api import sync_playwright
from django.core.cache import cache@shared_task
def execute_browser_operation( debugger_url, actions):""":param debugger_url: 远程浏览器调试地址(如 http://172.16.1.4:9222):param actions: 操作步骤列表(JSON序列化)"""try:with sync_playwright() as p:# 连接远程浏览器browser = p.chromium.connect_over_cdp(debugger_url)page = browser.contexts[0].pages[0]# 操作浏览器(示例:打开百度)# page.goto("https://www.baidu.com")# page.fill("#kw", "自动化测试")"""action 的结构。[{"type": "navigate", "url": "https://www.baidu.com"},{"type": "fill", "selector": "#kw", "text": "刘亦菲"},{"type": "click", "selector": "#su"},{"type": "select_dropdown", "selector": "#country-select", "value": "china"}]"""# 执行操作步骤result = {'steps': []}for action in actions:if action['type'] == 'navigate':page.goto(action['url'])result['steps'].append(f"导航至 {action['url']}")elif action['type'] == 'fill':page.fill(action['selector'], action['text'])result['steps'].append(f"填写 {action['selector']}")elif action['type'] == 'click':page.click(action['selector'])elif action['type'] == 'select_dropdown':page.select_option(action['selector'], action['value'])# 添加更多操作类型...# 保存截图到媒体目录# screenshot_path = f"media/screenshots/{self.request.id}.png"# page.screenshot(path=screenshot_path)return {'status': 'success',# 'screenshot': screenshot_path,'details': result}except Exception as e:return {'status': 'error', 'message': str(e)}finally:browser.close()
views
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import permissions
from .celery_tasks import execute_browser_operationclass BrowserControlView(APIView):def post(self, request):# 验证参数required_fields = ['debugger_url', 'actions']if not all(field in request.data for field in required_fields):return Response({'error': '缺少必要参数'}, status=400)# 提交异步任务task = execute_browser_operation.delay(debugger_url=request.data['debugger_url'],actions=request.data['actions'])return Response({'task_id': task.id,'status_endpoint': f'/api/task-status/{task.id}/'})# 任务状态查询接口
# myapp/views.py
from celery.result import AsyncResultclass TaskStatusView(APIView):def get(self, request, task_id):task = AsyncResult(task_id)return Response({'task_id': task_id,'status': task.status,'result': task.result if task.ready() else None})
url.py
path('control/', views.BrowserControlView.as_view(), name='browser-control'),path('task-status/<str:task_id>/', views.TaskStatusView.as_view(), name='task-status'),
效果
调接口,可以执行本地浏览器
执行端开放端口方式:
可能需要
# 将本地9222端口暴露到所有网络接口(需管理员权限)
netsh interface portproxy add v4tov4 listenaddress=0.0.0.0 listenport=9222 connectaddress=127.0.0.1 connectport=9222
#验证
netsh interface portproxy show all
#删除
netsh interface portproxy delete v4tov4 listenaddress=0.0.0.0 listenport=9222# 启动浏览器
cd "C:\Program Files\Google\Chrome\Application"
chrome.exe --remote-debugging-port=9222 --remote-debugging-address=0.0.0.0 --user-data-dir="C:\playwright_debug"
# 验证 需要看到 (tcp 0.0.0.0:9222别人才能通过ip访问到你)
netstat -ano | findstr :9222开端口
# 查看有没有
netsh advfirewall firewall show rule name=all | findstr "9222"
# 给一个
netsh advfirewall firewall add rule name="Playwright_9222" dir=in action=allow protocol=TCP localport=9222
# 删除
netsh advfirewall firewall delete rule name="Playwright_9222"
bat文件(参考-未成熟1)
@echo off
setlocal enabledelayedexpansion:: 检查管理员权限
net session >nul 2>&1
if %errorlevel% neq 0 (echo 请右键点击此脚本,选择"以管理员身份运行"pauseexit /b 1
):: 1. 设置防火墙规则
echo 正在配置防火墙规则...
netsh advfirewall firewall show rule name="Playwright_9222" >nul 2>&1
if !errorlevel! equ 0 (echo [跳过] 防火墙规则 Playwright_9222 已存在
) else (netsh advfirewall firewall add rule name="Playwright_9222" dir=in action=allow protocol=TCP localport=9222if !errorlevel! neq 0 (echo [错误] 无法创建防火墙规则exit /b 1)echo [成功] 防火墙规则已添加
):: 2. 配置端口转发
echo 正在设置端口代理...
netsh interface portproxy show v4tov4 | findstr "0.0.0.0:9222" >nul
if !errorlevel! equ 0 (echo [跳过] 端口代理规则已存在
) else (netsh interface portproxy add v4tov4 listenaddress=0.0.0.0 listenport=9222 connectaddress=127.0.0.1 connectport=9222if !errorlevel! neq 0 (echo [错误] 无法创建端口代理规则exit /b 1)echo [成功] 端口代理已配置
):: 3. 启动 Chrome
echo 正在启动 Chrome 浏览器...
cd /d "C:\Program Files\Google\Chrome\Application"
if not exist "chrome.exe" (echo [错误] 未找到 Chrome 安装路径echo 请检查路径: "C:\Program Files\Google\Chrome\Application"exit /b 1
)if not exist "C:\playwright_debug" (mkdir "C:\playwright_debug"
)
start "" chrome.exe --remote-debugging-port=9222 --remote-debugging-address=0.0.0.0 --user-data-dir="C:\playwright_debug"echo 所有操作已完成
pause
bat2,供参考
@echo off
setlocal enabledelayedexpansion:: 强制管理员权限
if not "%1"=="admin" (powershell start -verb runas '%0' admin & exit):: 清理旧进程和规则
taskkill /im chrome.exe /f >nul 2>&1
netsh interface portproxy delete v4tov4 listenport=9222 >nul 2>&1:: 启动 Chrome(核心修改)
echo 启动 Chrome 调试实例...
cd /d "C:\Program Files\Google\Chrome\Application"
start "" chrome.exe ^
--remote-debugging-port=9222 ^
--remote-debugging-address=0.0.0.0 ^
--user-data-dir="C:\playwright_debug" ^
--no-first-run --disable-extensions:: 延迟等待服务启动
timeout /t 5 /nobreak >nul:: 验证监听状态
echo 当前端口监听状态:
netstat -ano | findstr :9222pause
palywright操作别人电脑调试
from playwright.sync_api import sync_playwrightwith sync_playwright() as p:# 替换为同事电脑的IP地址(如192.168.1.100)# browser = p.chromium.connect_over_cdp("http://127.0.0.1:9222")browser = p.chromium.connect_over_cdp("http://172.16.1.4:9222")default_context = browser.contexts[0] # 获取默认上下文page = default_context.pages[0] # 获取已打开的页面# 操作浏览器(示例:打开百度)page.goto("https://www.baidu.com")page.fill("#kw", "自动化测试")# from playwright.sync_api import sync_playwright
#
# with sync_playwright() as p:
# # 替换为同事电脑的实际IP(如192.168.1.100)
# browser = p.chromium.connect_over_cdp("http://192.168.1.100:9222")
# default_context = browser.contexts[0] # 获取默认上下文
# page = default_context.pages[0] # 获取已打开的页面
#
# # 操作浏览器示例:访问百度并搜索
# page.goto("https://www.baidu.com")
# page.fill("#kw", "自动化测试")
# page.click("#su")
# page.wait_for_timeout(3000) # 等待3秒观察结果