当前位置: 首页 > web >正文

【Python Cookbook】迭代器与生成器(四)

迭代器与生成器(四)

  • 13.创建数据处理管道
  • 14.展开嵌套的序列
  • 15.顺序迭代合并后的排序迭代对象
  • 16.迭代器代替 while 无限循环

13.创建数据处理管道

你想以数据管道(类似 Unix 管道)的方式迭代处理数据。

比如,你有个大量的数据需要处理,但是不能将它们一次性放入内存中。

生成器函数是一个实现管道机制的好办法。为了演示,假定你要处理一个非常大的日志文件目录:

foo/access-log-012007.gzaccess-log-022007.gzaccess-log-032007.gz...access-log-012008
bar/access-log-092007.bz2...access-log-022008

假设每个日志文件包含这样的数据:

124.115.6.12 - - [10/Jul/2012:00:18:50 -0500] "GET /robots.txt ..." 200 71
210.212.209.67 - - [10/Jul/2012:00:18:51 -0500] "GET /ply/ ..." 200 11875
210.212.209.67 - - [10/Jul/2012:00:18:51 -0500] "GET /favicon.ico ..." 404 369
61.135.216.105 - - [10/Jul/2012:00:20:04 -0500] "GET /blog/atom.xml ..." 304 -
...

为了处理这些文件,你可以定义一个由多个执行特定任务独立任务的简单生成器函数组成的容器。就像这样:

import os
import fnmatch
import gzip
import bz2
import redef gen_find(filepat, top):'''在目录树中查找所有匹配指定通配符模式的文件名。'''for path, dirlist, filelist in os.walk(top):           # 使用 os.walk(top) 递归遍历 top 目录及其子目录。for name in fnmatch.filter(filelist, filepat):     # 对于每个目录,筛选出匹配 filepat 的文件名。yield os.path.join(path, name)                  # 使用 yield 生成每个匹配文件的完整路径(拼接目录路径和文件名)。def gen_opener(filenames):'''按顺序打开一系列文件,每次生成一个文件对象。文件会在下一次迭代前关闭。'''for filename in filenames:if filename.endswith('.gz'):f = gzip.open(filename, 'rt')elif filename.endswith('.bz2'):f = bz2.open(filename, 'rt')else:f = open(filename, 'rt')yield ff.close()def gen_concatenate(iterators):'''将多个迭代器连接成一个单一的序列。'''for it in iterators:yield from itdef gen_grep(pattern, lines):'''在行序列中搜索匹配正则表达式的行。'''pat = re.compile(pattern)for line in lines:if pat.search(line):yield line

现在你可以很容易的将这些函数连起来创建一个处理管道。比如,为了查找包含单词 python 的所有日志行,你可以这样做:

lognames = gen_find('access-log*', 'www')
files = gen_opener(lognames)
lines = gen_concatenate(files)
pylines = gen_grep('(?i)python', lines)      # (?i)是正则表达式的忽略大小写标志,因此会匹配 python、Python、PYTHON 等。
for line in pylines:print(line)

如果将来的时候你想扩展管道,你甚至可以在生成器表达式中包装数据。比如,下面这个版本计算出传输的字节数并计算其总和。

lognames = gen_find('access-log*', 'www')
files = gen_opener(lognames)
lines = gen_concatenate(files)
pylines = gen_grep('(?i)python', lines)bytecolumn = (line.rsplit(None,1)[1] for line in pylines)
bytes = (int(x) for x in bytecolumn if x != '-')
print('Total', sum(bytes))
  • line.rsplit(None, 1)
    • None 表示按任意空白字符(空格、制表符等)分割。
    • 1 表示最多分割一次,从右侧开始分割。
    • 例如:"1234 python 512".rsplit(None, 1)['1234 python', '512']
  • [1] 取分割后的最后一列(如'512')。
  • bytecolumn 是一个生成器表达式,生成所有行的最后一列值。

示例输出

# 输入pylines:
# ["1234 python 512", "42 PYTHON 1024", "python - 2048"]
bytecolumn = (line.rsplit(None,1)[1] for line in pylines)
# 生成的bytecolumn内容:
# ['512', '1024', '2048']
# 输入bytecolumn:
# ['512', '1024', '2048']
bytes = (int(x) for x in bytecolumn if x != '-')
# 生成的bytes内容:
# 512, 1024, 2048

以管道方式处理数据可以用来解决各类其他问题,包括解析,读取实时数据,定时轮询等。

为了理解上述代码,重点是要明白 yield 语句作为数据的生产者,而 for 循环语句作为数据的消费者。当这些生成器被连在一起后,每个 yield 会将一个单独的数据元素传递给迭代处理管道的下一阶段。在例子最后部分,sum() 函数是最终的程序驱动者,每次从生成器管道中提取出一个元素。

这种方式一个非常好的特点是每个生成器函数很小并且都是独立的。这样的话就很容易编写和维护它们了。很多时候,这些函数如果比较通用的话可以在其他场景重复使用。并且最终将这些组件组合起来的代码看上去非常简单,也很容易理解。

使用这种方式的内存效率也不得不提。上述代码即便是在一个超大型文件目录中也能工作的很好。事实上,由于使用了迭代方式处理,代码运行过程中只需要很小很小的内存。

在调用 gen_concatenate() 函数的时候你可能会有些不太明白。这个函数的目的是将输入序列拼接成一个很长的行序列。itertools.chain() 函数同样有类似的功能,但是它需要将所有可迭代对象作为参数传入。

在上面这个例子中,你可能会写类似这样的语句 lines = itertools.chain(*files),这将导致 gen_opener() 生成器被提前全部消费掉。但由于 gen_opener() 生成器每次生成一个打开过的文件,等到下一个迭代步骤时文件就关闭了,因此 chain() 在这里不能这样使用。上面的方案可以避免这种情况。

files = gen_opener(['a.txt', 'b.gz'])  # 生成器,每次 yield 一个文件对象
lines = itertools.chain(*files)        # 错误!files 会被全部消费,文件可能已关闭
for line in lines:                     # 实际迭代时文件已关闭,可能报错print(line)
files = gen_opener(['a.txt', 'b.gz'])  # 生成器,每次 yield 一个文件对象
lines = gen_concatenate(files)         # 惰性拼接,按需打开文件
for line in lines:                     # 安全迭代print(line)

gen_concatenate() 函数中出现过 yield from 语句,它将 yield 操作代理到父生成器上去。语句 yield from it 简单的返回生成器 it 所产生的所有值。

最后还有一点需要注意的是,管道方式并不是万能的。有时候你想立即处理所有数据。然而,即便是这种情况,使用生成器管道也可以将这类问题从逻辑上变为工作流的处理方式。

14.展开嵌套的序列

你想将一个多层嵌套的序列展开成一个单层列表。

可以写一个包含 yield from 语句的递归生成器来轻松解决这个问题。比如:

from collections import Iterabledef flatten(items, ignore_types=(str, bytes)):for x in items:if isinstance(x, Iterable) and not isinstance(x, ignore_types):yield from flatten(x)else:yield xitems = [1, 2, [3, 4, [5, 6], 7], 8]
# Produces 1 2 3 4 5 6 7 8
for x in flatten(items):print(x)

在上面代码中, isinstance(x, Iterable) 检查某个元素是否是可迭代的。如果是的话, yield from 就会返回所有子例程的值。最终返回结果就是一个没有嵌套的简单序列了。

额外的参数 ignore_types 和检测语句 isinstance(x, ignore_types) 用来将字符串和字节排除在可迭代对象外,防止将它们再展开成单个的字符。这样的话字符串数组就能最终返回我们所期望的结果了。比如:

>>> items = ['Dave', 'Paula', ['Thomas', 'Lewis']]
>>> for x in flatten(items):
...     print(x)
...
Dave
Paula
Thomas
Lewis
>>>

语句 yield from 在你想在生成器中调用其他生成器作为子例程的时候非常有用。如果你不使用它的话,那么就必须写额外的 for 循环了。比如:

def flatten(items, ignore_types=(str, bytes)):for x in items:if isinstance(x, Iterable) and not isinstance(x, ignore_types):for i in flatten(x):yield ielse:yield x

尽管只改了一点点,但是 yield from 语句看上去感觉更好,并且也使得代码更简洁清爽。

之前提到的对于字符串和字节的额外检查是为了防止将它们再展开成单个字符。如果还有其他你不想展开的类型,修改参数 ignore_types 即可。

最后要注意的一点是, yield from 在涉及到基于协程和生成器的并发编程中扮演着更加重要的角色。

15.顺序迭代合并后的排序迭代对象

你有一系列排序序列,想将它们合并后得到一个排序序列并在上面迭代遍历。

heapq.merge() 函数可以帮你解决这个问题。比如:

>>> import heapq
>>> a = [1, 4, 7, 10]
>>> b = [2, 5, 6, 11]
>>> for c in heapq.merge(a, b):
...     print(c)
...
1
2
4
5
6
7
10
11

heapq.merge 可迭代特性意味着它不会立马读取所有序列。这就意味着你可以在非常长的序列中使用它,而不会有太大的开销。比如,下面是一个例子来演示如何合并两个排序文件:

with open('sorted_file_1', 'rt') as file1, \open('sorted_file_2', 'rt') as file2, \open('merged_file', 'wt') as outf:for line in heapq.merge(file1, file2):outf.write(line)

有一点要强调的是 heapq.merge() 需要 所有输入序列必须是排过序的。特别的,它并不会预先读取所有数据到堆栈中或者预先排序,也不会对输入做任何的排序检测。它仅仅是检查所有序列的开始部分并返回最小的那个,这个过程一直会持续直到所有输入序列中的元素都被遍历完。

16.迭代器代替 while 无限循环

你在代码中使用 while 循环来迭代处理数据,因为它需要调用某个函数或者和一般迭代模式不同的测试条件。能不能用迭代器来重写这个循环呢?

一个常见的 IO 操作程序可能会像下面这样:

CHUNKSIZE = 8192def reader(s):while True:data = s.recv(CHUNKSIZE)if data == b'':breakprocess_data(data)

这种代码通常可以使用 iter() 来代替,如下所示:

def reader2(s):for chunk in iter(lambda: s.recv(CHUNKSIZE), b''):pass# process_data(data)

如果你怀疑它到底能不能正常工作,可以试验下一个简单的例子。比如:

>>> import sys
>>> f = open('/etc/passwd')
>>> for chunk in iter(lambda: f.read(10), ''):
...     n = sys.stdout.write(chunk)
...
nobody:*:-2:-2:Unprivileged User:/var/empty:/usr/bin/false
root:*:0:0:System Administrator:/var/root:/bin/sh
daemon:*:1:1:System Services:/var/root:/usr/bin/false
_uucp:*:4:4:Unix to Unix Copy Protocol:/var/spool/uucp:/usr/sbin/uucico
...
>>>

iter 函数一个鲜为人知的特性是它接受一个可选的 callable 对象和一个标记(结尾)值作为输入参数。当以这种方式使用的时候,它会创建一个迭代器, 这个迭代器会不断调用 callable 对象直到返回值和标记值相等为止。

iter(callable, sentinel) 的用法

  • 功能
    创建一个迭代器,重复调用 callable(可调用对象)直到它返回 sentinel(哨兵值)。
  • 在此代码中
    • callablelambda: f.read(10):每次调用时从文件 f 读取 10 字节。
    • sentinel'':当 f.read(10) 返回空字符串时(表示文件结束),迭代停止。
  • 效果
    将文件分块读取,每次 10 字节,避免一次性加载大文件到内存。

这种特殊的方法对于一些特定的会被重复调用的函数很有效果,比如涉及到 I/O 调用的函数。举例来讲,如果你想从套接字或文件中以数据块的方式读取数据,通常你得要不断重复的执行 read()recv() ,并在后面紧跟一个文件结尾测试来决定是否终止。这节中的方案使用一个简单的 iter() 调用就可以将两者结合起来了。其中 lambda 函数参数是为了创建一个无参的 callable 对象,并为 recvread() 方法提供了 size 参数。

http://www.xdnf.cn/news/9807.html

相关文章:

  • 【Java Web】速通HTML
  • 电机控制选 STM32 还是 DSP?技术选型背后的现实博弈
  • day13 leetcode-hot100-24(链表3)
  • 如何利用categraf的exec插件实现对Linux主机系统用户及密码有效期进行监控及告警?
  • 序列化与反序列化
  • 【电路笔记 TMS320F28335DSP】McBSP 从源时钟得到 生成时钟 CLKG 帧同步信号 FSG
  • 【ARM】【FPGA】【硬件开发】Chapter.1 AXI4总线协议
  • 智能穿戴新标杆:SD NAND (贴片式SD卡)与 SOC 如何定义 AI 眼镜未来技术路径
  • pikachu靶场通关笔记08 XSS关卡04-DOM型XSS
  • uniapp 开发企业微信小程序时,如何在当前页面真正销毁前或者关闭小程序前调用一个api接口
  • 华为OD机试真题——Boss的收入(分销网络提成计算)(2025A卷:100分)Java/python/JavaScript/C/C++/GO最佳实现
  • Hive自定义函数案例(UDF、UDAF、UDTF)
  • kafka学习笔记(三、消费者Consumer使用教程——从指定位置消费)
  • PostgreSQL数据库配置SSL操作说明书
  • Java互联网大厂面试:从Spring Boot到Kafka的技术深度探索
  • ai工具集:AI材料星ppt生成,让你的演示更出彩
  • Codeforces Round 1025 (Div. 2)
  • springcloud openfeign 请求报错 java.net.UnknownHostException:
  • 小型语言模型:为何“小”才是“大”?
  • 【Python】3.函数与列表
  • RFID测温芯片助力新能源产业安全与能效提升
  • C++容器进阶:深入解析unordered_map与unordered_set的前世今生
  • python打卡day39
  • 【机器学习基础】机器学习入门核心算法:K均值(K-Means)
  • Spring Boot测试框架全面解析
  • 甘特图 dhtmlxGantt.js UA实例
  • SpringMVC核心原理与前后端数据交互机制详解
  • MultipartEntityBuilder上传文件解决中文名乱码
  • openEuler安装MySql8(tar包模式)
  • 阿里通义实验室突破空间音频新纪元!OmniAudio让360°全景视频“声”临其境