当前位置: 首页 > news >正文

Python备忘

1. 自定义多线程程序:

import concurrent.futures
import threadingclass CustomThreadPool:def __init__(self, max_workers):self.max_workers = max_workersself.pool = concurrent.futures.ThreadPoolExecutor(max_workers)self.running_num = 0self.semaphore = threading.Semaphore(max_workers)def submit(self, fn, *args, **kwargs):self.semaphore.acquire()  # 获取信号量(如果已满则阻塞)self.running_num += 1future = self.pool.submit(fn, *args, **kwargs)future.add_done_callback(self._callback)return futuredef _callback(self, future):self.running_num -= 1self.semaphore.release()  # 释放信号量,允许新任务提交def shutdown(self):self.pool.shutdown()

以上程序实现:使用这个自定义线程池时,当并发任务数达到上限后,新提交的任务会被阻塞,直到有任务完成。

使用程序:

import time
import random
from custom_threadpool import CustomThreadPool  # 假设上面的类保存在 custom_threadpool.py 中# 模拟下载图片的函数(实际应用中可替换为 requests.get 等真实下载逻辑)
def download_image(url):# 模拟网络请求延迟delay = random.uniform(0.5, 2.0)time.sleep(delay)# 模拟下载结果size = random.randint(100, 1000)print(f"下载完成: {url} (大小: {size}KB, 耗时: {delay:.2f}s)")return {"url": url, "size": size, "status": "success"}# 主程序
def main():# 创建一个最大并发数为3的线程池with CustomThreadPool(max_workers=3) as pool:# 模拟10个图片下载任务image_urls = [f"https://example.com/image_{i}.jpg" for i in range(1, 11)]# 提交所有下载任务futures = []for url in image_urls:future = pool.submit(download_image, url)futures.append(future)# 获取所有任务的结果(按完成顺序输出)for future in concurrent.futures.as_completed(futures):try:result = future.result()print(f"处理结果: {result['url']} ({result['size']}KB)")except Exception as e:print(f"下载失败: {e}")if __name__ == "__main__":start_time = time.time()main()print(f"总耗时: {time.time() - start_time:.2f}秒")

2.* ** 参数:

def example(a, b, *args, **kwargs):print(f"固定参数:a={a}, b={b}")print(f"位置参数:{args}")print(f"关键字参数:{kwargs}")example(1, 2, 3, 4, x=5, y=6)
# 输出:
# 固定参数:a=1, b=2
# 位置参数:(3, 4)
# 关键字参数:{'x': 5, 'y': 6}

 

http://www.xdnf.cn/news/907939.html

相关文章:

  • CST人工电源网络阻抗计量校准
  • Python打卡训练营学习记录Day46
  • Arch-hyprland常用配置
  • 【Algo】常见组合类数列
  • 在centos7.9重置qcow2 root密码-qcow2忘记密码
  • 《0/1背包》题集
  • 【大厂机试题解法笔记】最差产品奖
  • 大模型编程助手-windsurf
  • 云服务器厂商机房是什么
  • CMOS图像传感器系列--(二)HDR之DAG技术
  • 跟我学c++中级篇——理解类型推导和C++不同版本的支持
  • 旅行商问题(TSP)的 C++ 动态规划解法教学攻略
  • python --导出数据库表结构(pymysql)
  • React从基础入门到高级实战:React 实战项目 - 项目四:企业级仪表盘
  • Profinet 协议 IO-Link 主站网关(三格电子)
  • DDD架构实战 领域层 事件驱动
  • Hive窗口函数RANGE BETWEEN详解:用法、场景与案例(附真实业务案例)
  • spring重试机制
  • 三菱PLC与西门子PLC如何实现485通讯?
  • 关于锁策略的简单介绍
  • echarts柱状图实现动态展示时报错
  • 电子电气架构 --- 什么是功能架构?
  • QT自定义资源管理器
  • 并查集专题
  • 在 Windows 系统上运行 Docker 容器中的 Ubuntu 镜像并显示 GUI
  • 解析奥地利 XARION激光超声检测系统:无膜光学麦克风 + 无耦合剂的技术协同优势及多元应用
  • Flutter:下拉框选择
  • Langchain4j 整合向量数据库(10)
  • 黑龙江云前沿服务器租用:便捷高效的灵活之选​
  • 【原神 × 二叉树】角色天赋树、任务分支和圣遗物强化路径的算法秘密!