当前位置: 首页 > java >正文

Python常见的面试题

生成器与列表的区别

创建方式

  1. 生成器通过yield关键字和(x for x in nums)创建
  2. 列表通过[]或者list()创建

复用

  1. 生成器只能遍历一次,如果需要多次遍历,需要重新创建;
  2. 列表可以多次遍历,元素永久存在

内存占用

  1. 列表一次性占用
  2. 生成器按需生成元素,占用空间小

元素访问

  1. 列表可以访问任意元素
  2. 生成器只能通过next访问下一个元素

装饰器

装饰器的实现

  1. class Wrapper(object):
    def __init__(self, *args):self.args = argsdef __call__(self, func):print(self.args)@functools.wraps(func)def inner(*args, **kwargs):print("--1--")ret = func(*args, **kwargs)print("--2--")return retreturn inner
    
  2. 闭包
    def wrapper(func):
    @functools.wraps(func)
    def inner(*args, **kwargs):print("---1----")ret = func(*args, **kwargs)print("---2----")return ret
    return inner
    

Python类型的底层实现

Python 的垃圾回收

  1. 引用计数器实现python的内存管理
  2. 标记-清除,解决循环引用的问题
  3. 分代回收:新创建的对象放到第0代,经过一定次数的垃圾回收仍然存在,移到第1代;经过一定次数的垃圾回收仍然存在,移到第2代。不同代的双向链表回收频率不同

Python的深拷贝和浅拷贝

元组

import copya = (1, 2)
b = copy.copy(a)print(id(a)), id(b))d = copy.deepcopy(a)
print(id(a), id(d))# 打印的结果a=b,这是因为元组是不可变类型,在Python中元组对象一旦创建后,元组的元素和地址空间不会再变化,Python为了优化内存的使用,在复制元组时,不会创建新对象,而是引用到同一个对象。同理deepcopy也是如此

列表

import copya = [11, 22]
b = [33, 44]
c = [a, b]
d = copy.copy(c)print(id(c), id(d))
# 打印结果不相等,因为列表是可变类型,在复制列表时,会创建新对象,所以id(c) != id(d)print(id(c[0]), id(d[0])
# 打印结果相等,因为c[0] 和 d[0]都指向了ae = a[:]
# 数组的切片也是浅拷贝
print(id(a), id(e))
# 打印结果不同,e和a是不同的对象a.append(00)
print(a, e)
# 打印结果不同,因为a添加了元素,e没有添加,他们是两个不同的对象

delete、drop、truncate区别

数据恢复

  1. delete在事务提交以前是可以回滚回复的
  2. drop一旦执行无法回滚
  3. truncate一旦执行,无法回滚

性能

  1. delete删大量数据,会产生很多日志,性能较低;
  2. drop会删整个表,不逐行处理,性能高;
  3. truncate会删所有数据,保留表结构,比delete性能好

自增计数器

  1. delete删除的时候,不会重置自增计数器;
  2. drop会把整个表删掉,包括自增计数器,在创建表的时候,会对自增计数器做初始化;
  3. truncate会重置自增计数器,在插入新数据时,会从初始值自增

Python的单例模式

class MyClass(object):_instance = Nonedef __init__(self, value):self.value = valuedef __new__(cls, *args, **kwargs):if not cls._instance:cls._instance = super().__new__(cls)return cls._instancea = MyClass(1)
b = MyClass(2)
print(id(a), id(b))

使用装饰器实现单例模式

def singleton(cls):instances = {}def wrapper(*args, **kwargs):if cls not in instances:instances[cls] = cls(*args, **kwargs)return instances[cls]return wrapper@singleton
class MyClass:def __init__(self):pass# 创建两个实例
obj1 = MyClass()
obj2 = MyClass()print(obj1 is obj2)  # 输出 True,说明两个实例是同一个对象

Python的协程

协程

可以理解成一个线程在不同任务之间游走执行,更加高效的运行程序

实现协程的方法

  1. greenlet
  2. gevent
  3. yield关键字
  4. asycio 模块中的装饰器(python3.4引入)
  5. async,await(python3.5引入)

yield实现

def func1():yield 1yield from func2()yield 2def fun2():yield 3yield 4f1 = func1()
for item in f1:print(item)

asyncio 实现

import asyncio
@asyncio.coroutine
def func1():print(1)yield from asyncio.sleep(2)print(2)@asyncio.coroutine
def func2():print(3)yield from asyncio.sleep(2)print(4)tasks = [asyncio.ensure_future(func1()),asyncio.ensure_future(func2()),
]loop = asyncio.get_event_loop()
loop.run_until_complete(tasks)

遇到IO阻塞会自动切换

async & await

import asyncioasync def func1():print(1)await asyncio.sleep(2)print(2)async def func2():print(3)await asyncio.sleep(2)print(4)tasks = [asyncio.ensure_future(func1()),asyncio.ensure_future(func2()),
]loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

协程的意义

在一个线程中如果遇到IO等待时间,不会让线程浪费这段时间,会利用空闲的时间去完成其他的任务;

案例:去下载三张图片(网络IO)

  • 普通方式
    import requestsdef download_image(url):print("开始下载:", url)response =requests.get(url)print("下载完成")file_name = url.rsplit("_")[-1]with open(file_name, "wb") as f:f.write(response.content)if __name__ == '__main__':url_list = []for item in url_list:download_image(item)
    
  • 协程方式
    import asyncio
    import aiohttp  # 需要安装:pip install aiohttpasync def fetch(session, url):"""异步获取单个URL内容"""async with session.get(url) as response:return await response.text()async def fetch_all(urls):"""并发获取多个URL内容"""async with aiohttp.ClientSession() as session:tasks = []for url in urls:tasks.append(fetch(session, url))# 并发执行所有任务results = await asyncio.gather(*tasks)return resultsasync def main():"""主函数"""urls = ["https://api.github.com/users/1","https://api.github.com/users/2","https://api.github.com/users/3"]# 记录开始时间import timestart_time = time.time()# 执行异步请求results = await fetch_all(urls)# 打印结果和耗时for url, result in zip(urls, results):print(f"URL: {url}, Length: {len(result)}")print(f"Total time: {time.time() - start_time:.2f} seconds")# 运行主函数
    asyncio.run(main())
    

同步与异步

  1. 同步是一个一个执行;
  2. 异步是一个任务执行起来后不等执行结果,直接开启下一个任务;

异步编程

事件循环

理解成一个死循环,去检测某些代码

任务列表 = 【任务1, 任务2, 任务3, 。。。】while True:可执行任务列表,已完成任务列表  =  去任务列表中检查所有任务,并分类返回for 就绪任务 in 已经准备就绪的任务执行已就绪的任务for 已完成的任务 in 已完成的任务列表在任务列表中移除  已完成的任务如果 任务列表 中的任务都完成了,则终止循环
import asyncioloop = asyncio.get_event_loop()  # 去生成或获取一个事件循环
# 把任务放到任务列表中
loop.run_until_complete()

快速上手

协程函数,定义函数时用async def 函数名声明。
协程对象,协程函数() 执行得到的对象

async def func1():  # 协程函数passresult = func1()  # 协程对象

注意:执行协程函数创建协程对象,函数内部代码不会执行

如果想要执行函数内部代码,需要事件循环+协程对象

import asyncio 
async def func1():print("hello world")result = func1()loop = asyncio.get_event_loop()
loop.run_until_complete(result)  # 事件循环+协程对象,会执行任务# asyncio.run( result )  # python3.7之后的写法

await 关键字

await + 可等待的对象(协程对象,Future对象,task对象 --》io等待)

await,只有等待对象的值得到结果后,才会继续往下执行

import asyncioasync def func():print("hello world")await asyncio.sleep(2)print("end")return "返回值"async def func2():print("hello python")response = await func()  # await + 协程对象print(response)asyncio.run(func2())

task对象

在事件循环中添加多个任务
通过asyncio.create_task(协程对象)创建task对象,这样可以让协程加入事件循环中等待被调度执行。python3.7之前使用ensure_future()函数

import asyncioasync def func():print(1)await asyncio.sleep(2)print(2)return "返回值"async def main():print("main start")# 创建task对象,并将task1添加到事件循环中task1 = asyncio.create_task(func())task2 = asyncio.create_task(func())print("main stop")# await task1 等待task1执行ret1 = await task1ret2 = await task2print(ret1, ret2)if __name__ == '__main__':asyncio.run(main())

示例2:

import asyncioasync def func():print(1)await asyncio.sleep(2)print(2)return "返回值"async def main():print("main start")task_list = [asyncio.create_task(func(), name='n1'),asyncio.create_task(func(), name='n2')]done, pending = await asyncio.wait(task_list, timeout=None)print(done, pending)  # done 和 pending 都是setif __name__ == '__main__':asyncio.run(main())

asyncio.future对象

task类的基类,task继承future,task对象内部await结果是基于future对象的

示例:

import asyncioasync def main():print("main start")# 获取当前的事件循环loop = asyncio.get_running_loop()# 创建一个任务(future对象),这个任务什么也干future1 = loop.create_future()# 等待任务最终结果(future对象),没有结果就一直等待下去await future1  if __name__ == '__main__':asyncio.run(main())

示例2:

import asyncioasync def set_after(fut):await asyncio.sleep(2)fut.set_result("666")async def main():print("main start")# 获取当前的事件循环loop = asyncio.get_running_loop()# 创建一个任务(future对象),这个任务什么也干future1 = loop.create_future()# 手动设置future任务的最终执行结果,future1就可以结束了await loop.create_task(set_after(future1))# 等待任务最终结果(future对象)data = await future1print(data)if __name__ == '__main__':asyncio.run(main())

task 与 future的区别:future需要手动的设置执行结果,task可以通过绑定协程对象的方式,自动获取执行结果

concurrent.futures.Future对象

使用线程池、进程池实现异步操作时用到的对象

import time
from concurrent.futures import Futurefrom concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutordef func(value):time.sleep(1)print(value)# 创建线程池
pool = ThreadPoolExecutor(max_workers=5)# 创建进程池
# pool = ProcessPoolExecutor(max_workers=5)for i in range(10):fut = pool.submit(func, i)  # pool.submit 拿一个线程去执行func函数print(fut)

交叉使用的场景:80%是基于协程异步编程的,第三方模块mysql(不支持asyncio),这种使用【进程、线程做异步编程】

案例:asyncio + 不支持异步的模块

import asyncio
import requestsasync def download_image(url):print("start downloading", url)loop = asyncio.get_event_loop()future = loop.run_in_executor(None, requests.get, url)response = await futureprint("end downloading")file_name = url.rsplit("_")[-1]with open(file_name, "wb") as f:f.write(response.context)if __name__ == '__main__':url_list = ["https://img1.pconline.com.cn/piclib/200906/24/batch/1/35890/12458079117550d4bsowymr.jpg","https://img1.pconline.com.cn/piclib/200906/24/batch/1/35890/1245807911755o2wdytzdjh.jpg"]tasks = [download_image(url) for url in url_list]loop = asyncio.get_event_loop()loop.run_until_complete(asyncio.wait(tasks))

异步迭代器

异步迭代器: 实现了__aiter____anext__方法的对象,__anext__必须返回一个awaitable对象,async for 会处理异步迭代器的 anext 方法返回的可等待对象,知道触发一个stopAsyncIteration异常

异步迭代对象:必须通过__iter__ 方法返回一个asynchronous iterator

示例:

import asyncioclass Reader(object):def __init__(self):self.count = 0async def readline(self):await asyncio.sleep(1)self.count += 1if self.count == 100:return Nonereturn self.countdef __aiter__(self):return selfasync def __anext__(self):val = await self.readline()if val == None:raise StopAsyncIterationreturn valasync def func():  # 需要放到函数里执行async forreader = Reader()async for item in reader:print(item)asyncio.run(func())

异步上下文管理器

此对象通过__aexit____aenter__方法对async with语句中的环境进行控制

示例:

import asyncioclass AsyncContextManager(object):def __init__(self):self.conn = Noneasync def do_something(self):# 异步操作数据库return 666async def __aenter__(self):# 异步连接数据库# self.conn = await asyncio.sleep(2)return selfasync def __aexit__(self, exc_type, exc_val, exc_tb):# 异步关闭数据库# self.conn.close()await asyncio.sleep(1)async def main():async with AsyncContextManager() as f:result = f.do_something()print(result)asyncio.run(main())

uvloop

事件循环的替代方案,效率可以提高两倍,性能可以与go接近

安装 pip3 install uvloop

示例:

import asyncio
import uvloopasyncio.set_event_loop(uvloop.EventLoopPolicy())
## ...
asyncio.run()

实战场景

异步操作redis

在使用python代码操作redis时,连接/断开/操作都是网络IO
安装redis异步操作模块pip3 install aioredis

示例:

import asyncio
import aioredisasync def execute(address, password):print("start operation", address)# 网络IO操作:创建redis连接redis = await aioredis.create_redis(address, password=password)# 网咯IO操作,在redis中设置hash值car,内部设置三个键值对,{"car": {"key1": 1, "key2": 2, "key3": 3}}await redis.hmset_dict('car', key1=1, key2=2, key3=3)# 网络IO操作:去redis中取值result = await redis.hmgetall('car', encoding="utf-8")print(result)redis.close()# 网络IO操作:关闭redis连接await redis.wait_closed()print("end", address)asyncio.run(execute("http://localhost:6379", "root!root"))

异步操作mysql

pip3 install aiomysql

示例:

FastApi 异步框架

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Optional
import uuid
import uvicorn# 创建 FastAPI 应用实例
app = FastAPI(title="用户管理API", version="1.0.0")# 定义数据模型
class User(BaseModel):id: Optional[str] = None  # 可选,创建时自动生成name: strage: intemail: str# 模拟数据库
users_db = []# 根路径 - 返回欢迎信息
@app.get("/")
async def root():return {"message": "欢迎使用用户管理API"}# 获取所有用户
@app.get("/users/", response_model=List[User])
async def get_users():return users_db# 根据ID获取用户
@app.get("/users/{user_id}", response_model=User)
async def get_user(user_id: str):user = next((u for u in users_db if u.id == user_id), None)if not user:raise HTTPException(status_code=404, detail="用户不存在")return user# 创建新用户
@app.post("/users/", response_model=User, status_code=201)
async def create_user(user: User):# 生成唯一IDuser.id = str(uuid.uuid4())users_db.append(user)return user# 更新用户信息
@app.put("/users/{user_id}", response_model=User)
async def update_user(user_id: str, updated_user: User):index = next((i for i, u in enumerate(users_db) if u.id == user_id), None)if index is None:raise HTTPException(status_code=404, detail="用户不存在")# 更新用户信息(保持ID不变)updated_data = updated_user.dict(exclude_unset=True)users_db[index] = User(**{**users_db[index].dict(), **updated_data})return users_db[index]# 删除用户
@app.delete("/users/{user_id}", status_code=204)
async def delete_user(user_id: str):global users_dbusers_db = [u for u in users_db if u.id != user_id]return None# 启动应用(仅用于开发环境)
if __name__ == "__main__":uvicorn.run(app, host="0.0.0.0", port=8000)

redis的持久化

RDB(redis database)

定时将内存中的数据生成二进制快照文件(.rdb),全量保存当前数据状态
优点:

  1. 文件体积小(使用二进制压缩),适合备份和远程传输;
  2. redis做恢复的时候,只需要把整个二进制文件加载到内存中就可以重新构建数据库,恢复快

缺点:

  1. 主进程fork一个子进程做备份操作,有性能开销
  2. 会丢失最后一次快照的内容,比如时间间隔设置为5min,在第四分钟的时候,redis发生故障宕机,这个时候会丢失数据

AOF(appendonly file)

对每条写命令作为日志,以append-only追加的方式写入一个日志文件中,在redis重启时,可以通过重放AOF日志里的写入指令来重新构建整个数据集
优点:适用于对数据安全要求较高的场景
缺点:

  1. 文件体积大,可能包含冗余记录(多次写同一个key),需要AOF日志重写,清除冗余记录;
  2. redis恢复慢,需要把整个文件中的写命令全部重放才能恢复

生产环境做法

  1. RDB+AOF的混合模式,RDB作为AOF的前缀,在 AOF 重写时,先写入 RDB 快照内容,再追加后续的写命令日志
  2. 优点:
    • 兼具 RDB 的快速恢复(快照部分)和 AOF 的少量增量日志(仅记录快照后的操作)。
    • 减少 AOF 文件体积,提升恢复速度。

缓存击穿、缓存穿透、缓存雪崩

缓存击穿

  1. 某个key过期时,大量的并发请求绕过缓存服务器,直接访问到了数据库。增加了数据库负载。
  2. 异步定时更新可以解决这类问题

缓存穿透

  1. 大量请求访问key时在缓存中没找到,在数据库中也没找到
  2. 缓存空值,需要设置超时时间;
  3. 布隆过滤器,把某个元素加入布隆过滤器中,然后再去查询某个元素是否一定不在布隆过滤器中,

缓存雪崩

  1. 大量的key同时过期,或者某个redis节点宕机,导致大量的请求直接访问到了数据库
  2. 设置不同的过期时间;
  3. redis做集群部署
http://www.xdnf.cn/news/10144.html

相关文章:

  • vueflow
  • 【仿生机器人】需求案例
  • EWM108-GN06B系列BDS单北斗卫星定位模块产品简介
  • [IMX] 10.串行外围设备接口 - SPI
  • win32相关(创建线程)
  • 多线程(3)
  • MySQL中怎么看是否走了索引
  • 数据库中求最小函数依赖集-最后附解题过程
  • EMQX服务
  • DALI DT6与DALI DT8介绍
  • PlankAssembly 笔记 DeepWiki 正交视图三维重建
  • redis缓存与数据库协调读写机制设计
  • JAVA 集合进阶 泛型类、泛型方法、泛型接口
  • 【算法训练营Day03】链表part1
  • 随笔笔记记录5.28
  • 说一说SAP系统从Non-Unicode到Unicode的演化
  • 674SJBH校园外卖订餐系统V3
  • OpenLayers 图形绘制
  • 卫星地图 App 的实测体验深度解析
  • DeepSeek 赋能工业互联网:设备预测性维护的智能革新之路
  • 突破铁芯CT局限:罗氏线圈的“无磁饱和”技术深度解读
  • 身份证信息OCR识别提取
  • NIO知识点
  • ORM 框架的优缺点分析
  • QSS 的选择器
  • 端午时节,粽香四溢
  • 国密算法简述
  • 【DAY34】GPU训练及类的call方法
  • 从门店到移动端:按摩服务预约系统的架构演进与实践
  • 32、请求处理-【源码分析】-各种类型参数解析原理