当前位置: 首页 > ops >正文

python进程间通信

目录

(1)使用共享内存(Value 和 Array)

1.使用示例

2.支持哪些数据类型

3.数据保护

(2)使用队列(Queue)

1.使用示例

2.不同程序间用Queue通信

3.共享队列如何唯一标识

4.队列缓冲区溢满相关问题

5.为什么要用with管理Manager()

6.全局的Manager 对象

(3)使用共享内存映射(mmap)

1.mmap基础函数

2.简单数据通信(比如字符串)

3.复杂数据通信(单个结构体)

4.复杂数据通信(多个结构体)

5.关于mmapped_file对象的创建和资源释放


Python进程间通信(IPC)通过multiprocessing模块实现,常用方式包括共享内存(Value/Array)、队列(Queue)、管道(Pipe)等,适合多进程数据交换与协同。

(1)使用共享内存(Value 和 Array)

1.使用示例

如果你有两个独立的Python程序,分别运行在两个不同的进程中,并且你希望它们之间进行共享内存映射通信,可以使用 multiprocessing 模块中的 Value 和 Array,以及 multiprocessing.Manager 模块提供的 Value 和 Array。

比如使用multiprocessing.Manager().Value(类型标识符, 初始值)来创建共享的变量,下面是一个例子,其中Value('i', 0)中'i'表示整数,0表示初始值为0。

进程1:program1.py

import multiprocessing
import timedef process_1(shared_value, lock):for _ in range(5):print(f"进程 1:增加共享值")with lock:shared_value.value += 1time.sleep(1)if __name__ == "__main__":# 使用Manager创建共享值with multiprocessing.Manager() as manager:shared_value = manager.Value('i', 0)lock = manager.Lock()  # 创建用于同步的锁# 创建一个进程p1 = multiprocessing.Process(target=process_1, args=(shared_value, lock))# 启动进程p1.start()# 等待进程结束p1.join()# 显示最终的共享值print("程序 1 - 最终的共享值:", shared_value.value)

进程2:program2.py

import multiprocessing
import timedef process_2(shared_value, lock):for _ in range(5):print(f"进程 2:将共享值加倍")with lock:shared_value.value *= 2time.sleep(1)if __name__ == "__main__":# 使用Manager创建共享值with multiprocessing.Manager() as manager:shared_value = manager.Value('i', 0)lock = manager.Lock()  # 创建用于同步的锁# 创建一个进程p2 = multiprocessing.Process(target=process_2, args=(shared_value, lock))# 启动进程p2.start()# 等待进程结束p2.join()# 显示最终的共享值print("程序 2 - 最终的共享值:", shared_value.value)

在这个例子中,两个程序分别通过 multiprocessing.Manager() 创建一个共享的整数 shared_value,并通过两个不同的进程分别对其进行操作。请注意,multiprocessing.Manager() 提供了一种在不同进程之间共享 Python 对象的方式,它会创建一个服务器进程来管理这些对象。这是一种在独立的Python程序之间进行通信的方法。

2.支持哪些数据类型

multiprocessing.Manager 提供了几种用于创建共享变量的方法,其中包括 Value 和 Array。这些方法支持的数据类型包括:

Value 类型:

  • 'i': 整数 (int)
  • 'd': 双精度浮点数 (double)
  • 'b': 布尔值 (bool)
  • 'c': 字符 (char)

Array 类型:

  • 'i': 整数 (int)
  • 'd': 双精度浮点数 (double)
  • 'b': 布尔值 (bool)
  • 'c': 字符 (char)
  • 'I': 无符号整数 (unsigned int)
  • 'f': 单精度浮点数 (float)

这些类型标识符用于指定共享变量的数据类型。在创建共享变量时,你可以根据实际需要选择合适的类型。

下面是一个使用 multiprocessing.Manager 创建不同类型共享变量的示例:

import multiprocessingif __name__ == "__main__":with multiprocessing.Manager() as manager:# 创建不同类型的共享变量int_value = manager.Value('i', 42)float_value = manager.Value('d', 3.14)bool_value = manager.Value('b', True)char_value = manager.Value('c', 'A')# 创建共享整数数组int_array = manager.Array('i', [1, 2, 3, 4, 5])# 创建共享浮点数数组float_array = manager.Array('d', [1.1, 2.2, 3.3, 4.4, 5.5])# 在这里可以使用创建的共享变量print("int_value:", int_value.value)print("float_value:", float_value.value)print("bool_value:", bool_value.value)print("char_value:", char_value.value)print("int_array:", list(int_array))print("float_array:", list(float_array))

3.数据保护

以下是一个简单的示例,演示了如何使用锁来保护共享变量:

import multiprocessing
import timedef process_1(shared_value, lock):for _ in range(5):with lock:print(f"Process 1: Incrementing shared value")shared_value.value += 1time.sleep(1)def process_2(shared_value, lock):for _ in range(5):with lock:print(f"Process 2: Doubling shared value")shared_value.value *= 2time.sleep(1)if __name__ == "__main__":with multiprocessing.Manager() as manager:shared_value = manager.Value('i', 1)lock = manager.Lock()p1 = multiprocessing.Process(target=process_1, args=(shared_value, lock))p2 = multiprocessing.Process(target=process_2, args=(shared_value, lock))p1.start()p2.start()p1.join()p2.join()print("Final shared value:", shared_value.value)

(2)使用队列(Queue)

1.使用示例

multiprocessing.Manager 通过 Value 和 Array 方法来创建共享变量,这些变量的数据类型一般是固定的。在一次创建之后,其数据类型不能直接更改。

如果你需要处理混合类型的数据,可以考虑使用 multiprocessing.Queue 进行进程间通信。Queue 可以在进程之间传递各种类型的数据,并且它会自动处理进程之间的同步和通信。

下面是一个简单的示例,演示如何使用 multiprocessing.Queue 来处理混合类型的数据:

import multiprocessingdef process_1(queue):# 向队列中放入不同类型的数据queue.put(42)queue.put("Hello")queue.put([1, 2, 3])def process_2(queue):# 从队列中获取数据data1 = queue.get()data2 = queue.get()data3 = queue.get()print("Data received in Process 2:")print("Data 1:", data1)print("Data 2:", data2)print("Data 3:", data3)if __name__ == "__main__":with multiprocessing.Manager() as manager:# 创建一个 Queueshared_queue = manager.Queue()# 创建两个进程,分别处理数据的放入和取出p1 = multiprocessing.Process(target=process_1, args=(shared_queue,))p2 = multiprocessing.Process(target=process_2, args=(shared_queue,))p1.start()p2.start()p1.join()p2.join()

在这个例子中,我们使用 multiprocessing.Queue 在两个进程之间传递了整数、字符串和列表等不同类型的数据。Queue 会处理不同数据类型之间的同步和通信,使得它成为处理混合类型数据的一种有效方式。

请注意,Queue 是基于消息传递的概念,适用于不同进程之间的通信。

2.不同程序间用Queue通信

用Queue的话也可以在两个程序产生的进程之间通信吗?可以的,multiprocessing.Queue 可以用于在两个不同程序产生的进程之间进行通信。Queue 提供了一种在不同进程之间安全传递数据的机制,因此它可以被用于连接不同的程序。

下面是一个简单的例子,演示了如何在两个不同的Python程序中使用multiprocessing.Queue 进行通信:

程序1: program1.py

import multiprocessing
import timedef process_1(queue):for i in range(5):data = f"Message from Process 1, iteration {i}"queue.put(data)time.sleep(1)if __name__ == "__main__":with multiprocessing.Manager() as manager:shared_queue = manager.Queue()p1 = multiprocessing.Process(target=process_1, args=(shared_queue,))p1.start()p1.join()

程序2: program2.py

import multiprocessing
import timedef process_2(queue):for i in range(5):data = queue.get()print(f"Process 2 received: {data}")time.sleep(1)if __name__ == "__main__":with multiprocessing.Manager() as manager:shared_queue = manager.Queue()p2 = multiprocessing.Process(target=process_2, args=(shared_queue,))p2.start()p2.join()

在这个例子中,program1.py 中的进程1向 shared_queue 中放入消息,而 program2.py 中的进程2从 shared_queue 中取出消息并打印。这两个程序可以在不同的Python进程中运行,并通过 multiprocessing.Queue 进行通信。

确保在实际应用中使用适当的同步机制,如锁或信号量,以确保正确的进程间通信。

3.共享队列如何唯一标识

在进程间用Queue通信时,可能会有多个队列的情况,比如创建了三个A、B、C,本来某两个进程是打算用A通信的,进程1在A一端put,进程2在A另一端get,这样是正常的情况,但假设进程2却在B另一端get,这样就不正常了。这就涉及到队列的唯一标识问题,就像人的身份证一样,确保使用的是正确的那个队列。

但multiprocessing.Queue 并没有直接提供唯一标识的机制,Queue 是基于管道(pipe)和信号量(semaphore)来实现的。

不过对于唯一标识的需求,你可以通过在队列的名字中使用唯一的标识符来实现。下面是一个简单的例子,演示如何在 Queue 名字中使用唯一标识符。

程序1: program1.py

import multiprocessing
import timedef process_1(queue_name):with multiprocessing.Manager() as manager:shared_queue = manager.Queue(name=queue_name)for i in range(5):data = f"Message from Process 1, iteration {i}"shared_queue.put(data)time.sleep(1)if __name__ == "__main__":queue_name = "unique_queue_identifier"p1 = multiprocessing.Process(target=process_1, args=(queue_name,))p1.start()p1.join()

程序2: program2.py

import multiprocessing
import timedef process_2(queue_name):with multiprocessing.Manager() as manager:shared_queue = manager.Queue(name=queue_name)for i in range(5):data = shared_queue.get()print(f"Process 2 received: {data}")time.sleep(1)if __name__ == "__main__":queue_name = "unique_queue_identifier"p2 = multiprocessing.Process(target=process_2, args=(queue_name,))p2.start()p2.join()

在这个例子中,queue_name 就是一个唯一的标识符,用于在两个程序中共享队列。请确保在实际应用中使用唯一而有意义的标识符,以防止混淆。如果两个队列使用相同的 name,它们将被认为是同一个队列。

在 multiprocessing.Manager().Queue(name=queue_name) 中,name 参数用于指定队列的唯一标识符。如果两个进程创建 Manager().Queue 时的 name 参数不同,它们将无法访问同一个队列,从而无法进行通信。

在两个程序中,如果你使用不同的 name 参数来创建队列,它们就会创建两个独立的队列,而不是共享同一个队列。这样,这两个队列将是相互独立的,进程1放入的数据不会被进程2接收到。

确保在需要通信的不同进程中使用相同的 name 参数,以便它们能够共享同一个队列。例如,在上面的例子中,queue_name 应该在两个程序中保持一致,以便它们能够访问相同的队列。

4.队列缓冲区溢满相关问题

问题:队列有缓冲区大小限制,如果达到了缓冲区大小的限制,但队列的消费方却迟迟没有消费数据会怎么样?

multiprocessing.Queue 是有缓冲区大小限制的,这个限制可以通过在创建队列时指定 maxsize 参数来设置。如果队列满了(达到了缓冲区大小的限制),在往队列中放入数据时会阻塞,直到队列有足够的空间。

如果队列的消费方迟迟没有读取数据,而队列中的缓冲区已满,那么往队列中放入数据的进程将会被阻塞,等待直到队列有足够的空间。这种情况下,进程将会在放入数据的操作上阻塞,直到有其他进程从队列中取走一些数据为止。

如果有其他进程及时取走了一些数据,使得队列有足够的空间,那么被阻塞的放入数据的进程就会继续执行,并成功往队列中放入数据。

这个阻塞机制使得在并发编程中可以有效地进行同步,保证了进程间数据的安全传递。如果需要非阻塞的操作,可以使用 put_nowait 和 get_nowait 方法,它们在队列满或空时不会阻塞,而是抛出异常。

5.为什么要用with管理Manager()

前面的例子代码中,我们看到with multiprocessing.Manager() as manager这样,with在这里有什么作用?

在 Python 中,with 语句用于简化资源管理,确保在进入和退出代码块时资源得到正确的分配和释放。对于 multiprocessing.Manager() 来说,它是一个上下文管理器(Context Manager),通过 with 语句使用可以确保 Manager 的资源被正确地管理和关闭。

使用 with 语句的好处是,在离开 with 代码块时,不论代码块中发生了什么,资源都会被正确释放,即使在发生异常的情况下也能够保证资源的正确释放。这是通过 __enter__ 和 __exit__ 方法的机制实现的。

如果你不使用 with 语句,你需要手动调用 multiprocessing.Manager() 返回的 Manager 对象的 __enter__ 和 __exit__ 方法。具体而言,你需要手动调用 Manager 对象的 __enter__ 方法以获取 Manager 对象,并在完成使用后手动调用 __exit__ 方法以释放资源。

以下是手动管理 Manager 资源的示例:

import multiprocessing# 手动调用 __enter__ 和 __exit__ 方法
manager = multiprocessing.Manager()
manager.__enter__()# 在这里进行需要的操作,比如创建共享对象、进程间通信等
shared_value = manager.Value('i', 42)
shared_queue = manager.Queue()# ...# 手动调用 __exit__ 方法释放资源
manager.__exit__(None, None, None)

6.全局的Manager 对象

需求:创建好共享变量后在多个地方使用,而不希望在每个使用地方都重复创建和释放资源

你可以考虑使用一个全局的 Manager 对象,并在需要的地方共享这个对象。这样,你就可以在整个应用程序中共享相同的资源池。

以下是一个示例,演示如何在全局范围内创建和共享 Manager 对象:

import multiprocessing# 全局的 Manager 对象
global_manager = multiprocessing.Manager()# 在需要的地方共享 Manager 对象
def function_1():shared_value = global_manager.Value('i', 42)print("Function 1 - Initial shared value:", shared_value.value)def function_2():shared_queue = global_manager.Queue()shared_queue.put("Hello from Function 2")print("Function 2 - Message received:", shared_queue.get())if __name__ == "__main__":# 在主程序中调用 function_1 和 function_2,它们共享相同的 Manager 对象function_1()function_2()

在这个例子中,global_manager 是一个全局的 Manager 对象,被两个函数 function_1 和 function_2 共享。这两个函数分别创建了共享变量(Value)和共享队列(Queue),它们都使用了相同的 global_manager 对象。这样,你就可以在不同的地方共享相同的资源,而不需要在每个地方都创建新的 Manager 对象。

请注意,全局的资源共享需要小心处理并发问题,确保在不同的地方使用时能够正确同步。

(3)使用共享内存映射(mmap)

Python 中有 mmap 模块,它允许在内存中映射文件。mmap 模块提供了一种将文件内容直接映射到内存中的方法,这样可以通过内存操作来访问文件的内容,而不必每次都通过文件 I/O 操作读写文件。

mmap 可以用于实现两个 Python 程序之间的共享内存,从而实现简单的进程间通信。

1.mmap基础函数

  • m.close() 关闭 m 对应的文件;
  • m.find(str, start=0) 从 start 下标开始,在 m 中从左往右寻找子串 str 最早出现的下标;
  • m.flush([offset, n]) 把 m 中从offset开始的n个字节刷到对应的文件中,参数 offset 要么同时指定,要么同时不指定;
  • m.move(dstoff, srcoff, n) 等于 m[dstoff:dstoff+n] = m[srcoff:srcoff+n],把从 srcoff 开始的 n 个字节复制到从 dstoff 开始的n个字节,可能会覆盖重叠的部分。
  • m.read(n) 返回一个字符串,从 m 对应的文件中最多读取 n 个字节,将会把 m 对应文件的位置指针向后移动;
  • m.read_byte() 返回一个1字节长的字符串,从 m 对应的文件中读1个字节,要是已经到了EOF还调用 read_byte(),则抛出异常 ValueError;
  • m.readline() 返回一个字符串,从 m 对应文件的当前位置到下一个’\n’,当调用 readline() 时文件位于 EOF,则返回空字符串;
  • m.resize(n) 把 m 的长度改为 n,m 的长度和 m 对应文件的长度是独立的;
  • m.seek(pos, how=0) 同 file 对象的 seek 操作,改变 m 对应的文件的当前位置;
  • m.size() 返回 m 对应文件的长度(不是 m 对象的长度len(m));
  • m.tell() 返回 m 对应文件的当前位置;
  • m.write(str) 把 str 写到 m 对应文件的当前位置,如果从 m 对应文件的当前位置到 m 结尾剩余的空间不足len(str),则抛出 ValueError;
  • m.write_byte(byte) 把1个字节(对应一个字符)写到 m 对应文件的当前位置,实际上 m.write_byte(ch) 等于 m.write(ch)。如果 m 对应文件的当前位置在 m 的结尾,也就是 m 对应文件的当前位置到 m 结尾剩余的空间不足1个字节,write() 抛出异常ValueError,而 write_byte() 什么都不做。

2.简单数据通信(比如字符串)

下面是一个简单的示例,演示如何使用 mmap 进行进程间通信:

程序1:写入共享内存

import mmap
import os
import time# 创建或打开一个共享内存文件
shm_filename = 'shared_memory_file'
size = 100  # 共享内存大小,单位字节with open(shm_filename, 'wb') as file:file.write(b'\0' * size)# 打开共享内存文件
with open(shm_filename, 'r+b') as file:# 将文件内容映射到内存mmapped_file = mmap.mmap(file.fileno(), 0, access=mmap.ACCESS_WRITE)# 写入数据到共享内存data_to_write = b'Hello, Program 2!'mmapped_file[:len(data_to_write)] = data_to_write# 关闭内存映射mmapped_file.close()# 等待一段时间,模拟程序1的运行
time.sleep(5)

程序2:读取共享内存

import mmap
import time# 打开共享内存文件
shm_filename = 'shared_memory_file'
with open(shm_filename, 'r') as file:# 将文件内容映射到内存mmapped_file = mmap.mmap(file.fileno(), 0, access=mmap.ACCESS_READ)# 读取共享内存中的数据data = mmapped_file[:].rstrip(b'\0')print(f"Received data from Program 1: {data.decode('utf-8')}")# 关闭内存映射mmapped_file.close()

在上述示例中,两个程序通过共享内存文件 shared_memory_file 进行通信。程序1负责写入数据到共享内存,而程序2负责读取共享内存中的数据。程序2会等待一段时间,以模拟程序1的运行。

请注意,这只是一个简单的示例,实际情况中可能需要考虑进程同步、数据格式等问题。对于更复杂的通信需求,你可能需要使用更高级的 IPC(进程间通信)机制,比如使用 multiprocessing 模块中的 Queue 或 Pipe。

3.复杂数据通信(单个结构体)

如何实现共享多个变量:将数据打包为结构体

假设我在向内存中写数的时候要写多个变量的数据,我在取数据的时候怎么能区分开来,正确读到每个变量的值

在内存中写入多个变量的数据时,一种常见的做法是使用结构体或类来组织这些变量,然后将整个结构体或类实例写入内存。这样,你就可以确保在读取数据时正确地访问每个变量的值。

下面是一个简单的示例,演示了如何使用 struct 模块来定义结构体,将结构体实例写入内存,以及在另一个程序中读取结构体实例的数据:

程序1:写入包含整数和浮点数的数据结构

import mmap
import structclass MyDataStruct:def __init__(self, var1, var2, var3):self.var1 = var1self.var2 = var2self.var3 = var3# 创建或打开一个共享内存文件
shm_filename = 'shared_memory_file'  # 文件路径
size = struct.calcsize('ifi')  # int, float, intwith open(shm_filename, 'wb') as file:file.write(b'\0' * size)# 打开共享内存文件
with open(shm_filename, 'r+b') as file:# 将文件内容映射到内存mmapped_file = mmap.mmap(file.fileno(), 0, access=mmap.ACCESS_WRITE)# 创建并写入结构体实例data_to_write = MyDataStruct(var1=42, var2=3.14, var3=-7)packed_data = struct.pack('ifi', data_to_write.var1, data_to_write.var2, data_to_write.var3)mmapped_file[:size] = packed_data# 关闭内存映射mmapped_file.close()

程序2:读取包含整数和浮点数的数据结构

import mmap
import structclass MyDataStruct:def __init__(self, var1, var2, var3):self.var1 = var1self.var2 = var2self.var3 = var3# 打开共享内存文件
shm_filename = 'shared_memory_file'
with open(shm_filename, 'r') as file:# 将文件内容映射到内存mmapped_file = mmap.mmap(file.fileno(), 0, access=mmap.ACCESS_READ)# 读取结构体实例的数据packed_data = mmapped_file[:struct.calcsize('ifi')]unpacked_data = struct.unpack('ifi', packed_data)# 创建结构体实例received_data = MyDataStruct(*unpacked_data)print(f"Received data from Program 1: var1={received_data.var1}, var2={received_data.var2}, var3={received_data.var3}")# 关闭内存映射mmapped_file.close()

在这个示例中,MyDataStruct 类包含三个变量(var1、var2、var3),程序1创建了一个 MyDataStruct 实例,并使用 struct.pack 将其打包为字节序列,然后将该字节序列写入共享内存。程序2读取共享内存中的字节序列,使用 struct.unpack 解包得到每个变量的值,最后创建了一个新的 MyDataStruct 实例。

请注意,在使用 struct.pack 和 struct.unpack 时,需要确保格式字符串(比如 'iii')与你的数据结构一致,以免数据解析错误。

4.复杂数据通信(多个结构体)

前面那个例子中,只是简单将一条数据(包含多项信息的一个结构体)放在了共享内存中,然后两个进程互相通信。现在情况变得更加复杂一点,假设我们有1000条数据需要进行通信,那该怎么操作?

如果一次性把所有数据写过去,然后另一端一次性读完,不太好。

这里的一个思路是获取足够大的共享内存空间,比如能容纳2千条数据,这样这块共享内存就可以划分为2000个位置,生产端源源不断将第1条数据写在第1个位置,将第2条数据写在第2个位置,或者按其它规则写在不同位置。

然后消费端这边根据规则读取某一个位置的数据即可。比如我这2000个位置分别是2000只股票的坑,要读A股票的数据就是对应的固定的那个坑去读取。

思路:
事先规划好内存映射区的指定大小,
每段固定内存区域放置固定股票的数据,
用哈希字典记录哪只股票在哪个位置,
写数据时,用哈希字典快速找到该股票在内存区域在哪个位置,然后写入,
读数据时,根据哈希字典可快速找到位置,读取对应二进制数据,
然后解包转换为指定结构体数据。

代码文件1:MmapMarketDataTool.py,工具模块,简单实现了打包和解包数据接口

"""
file name: MmapMarketDataTool.py
python version: 3.9
"""
import structclass MarketDataStruct:def __init__(self, stock_code="123456.SH", today="20001100", data_time="998877",pre_close_price=0.0, top_limit_price=0.0, down_limit_price=0.0,last_price=0.0, open_price=0.0, max_price=0.0, min_price=0.0,buy_info=None, sell_info=None):if buy_info is None:buy_info = [(0.0, 0) for _ in range(10)]if sell_info is None:sell_info = [(0.0, 0) for _ in range(10)]self.stock_code = stock_codeself.today = todayself.data_time = data_timeself.pre_close_price = pre_close_priceself.top_limit_price = top_limit_priceself.down_limit_price = down_limit_priceself.last_price = last_priceself.open_price = open_priceself.max_price = max_priceself.min_price = min_priceself.buy_info = buy_infoself.sell_info = sell_info# 单个结构体占用内存大小
# 9s表示一个长度为9字节的字符串, 'f' * 7 表示七个4字节的浮点数, 'i'表示一个4字节的整数
size_str = '9s8s6s' + 'f' * 7 + 'fifi' * 10
size = struct.calcsize(size_str)# 股票个数
stock_num = 5000# 结束时间
end_time = "23:00:00"# 共享内存对应文件的路径
shm_filename = 'shared_market_data_file'def get_packed_data(struct_data: MarketDataStruct):# print(f"stock_code={struct_data.stock_code}, last_price={struct_data.last_price}")packed_data = struct.pack(size_str,struct_data.stock_code.encode('utf-8'),struct_data.today.encode('utf-8'),struct_data.data_time.encode('utf-8'),struct_data.pre_close_price,struct_data.top_limit_price,struct_data.down_limit_price,struct_data.last_price,struct_data.open_price,struct_data.max_price,struct_data.min_price,*sum(struct_data.buy_info, ()),  # 展开买卖十档信息列表*sum(struct_data.sell_info, ()),  # 展开买卖十档信息列表)# unpacked_data = struct.unpack(size_str, packed_data)# print(unpacked_data)# print(len(unpacked_data))return packed_datadef get_unpacked_data(packed_data):# 当使用 struct.unpack 解包数据时,它返回一个包含所有解包值的元组unpacked_data = struct.unpack(size_str, packed_data)# print(unpacked_data)# print(len(unpacked_data))# 字符串类的数据必须正确地编码和解码stock_code = unpacked_data[0].decode('utf-8')today = unpacked_data[1].decode('utf-8')data_time = unpacked_data[2].decode('utf-8')pre_close_price = unpacked_data[3]top_limit_price = unpacked_data[4]down_limit_price = unpacked_data[5]last_price = unpacked_data[6]open_price = unpacked_data[7]max_price = unpacked_data[8]min_price = unpacked_data[9]buy_info = unpacked_data[10:30]sell_info = unpacked_data[30:50]# 因此结构体数据类型应当是接受可变数量的参数received_data = MarketDataStruct(stock_code, today, data_time,pre_close_price, top_limit_price, down_limit_price,last_price, open_price, max_price, min_price,buy_info, sell_info)return received_data

代码文件2:模拟获取数据以及将数据写入共享内存中

"""
python version: 3.9
"""
import mmap
import struct
import threading
import datetime as dt
import time
from MmapMarketDataTool import *# 行情数据字典
market_data_dict = dict()# 获取行情数据
def update_market_data_dict():# 这里用函数模拟,随机获取5000个股票的数据while True:now_time = dt.datetime.now().strftime("%H:%M:%S")# print(now_time)if now_time > end_time:print("-------------update market_data_dict end-------------")breakfor i in range(stock_num):market_data = MarketDataStruct()key = "stock" + format(i+1, "04d")market_data.stock_code = keyprice_str = dt.datetime.now().strftime("%H%M%S")market_data.last_price = float(price_str)market_data_dict[key] = market_data# print(f"stock_code={market_data.stock_code}, last_price={market_data.last_price}")# time.sleep(1)# 将数据写入共享内存中
def mmap_market_data(mmap_file_path):# 文件初始化数据with open(mmap_file_path, 'wb') as file:file.write(b'\0' * size * stock_num)# 打开共享内存文件with open(mmap_file_path, 'r+b') as file:while True:now_time = dt.datetime.now().strftime("%H:%M:%S")# print(now_time)if now_time > end_time:print("-------------mmap_market_data end-------------")break# 将文件内容映射到内存mmapped_file = mmap.mmap(file.fileno(), 0, access=mmap.ACCESS_WRITE)key_list = list(market_data_dict.keys())for i in range(len(key_list)):stock_name = key_list[i]data_to_write = market_data_dict[stock_name]packed_data = get_packed_data(data_to_write)mmapped_file[i*size: (i+1)*size] = packed_data# 关闭内存映射mmapped_file.close()# 1.模拟行情源的处理:开启线程不断获取行情数据,更新market_data_dict
print("-------------update market_data_dict start-------------")
get_data_thread = threading.Thread(target=update_market_data_dict, args=())
get_data_thread.start()# 2.将market_data_dict中的数据写入共享内存中
print("-------------mmap_market_data start-------------")
mmap_market_data(shm_filename)

代码文件3:读数据的程序,模拟从共享内存中读取数据,以及转变为结构体数据

"""
python version: 3.9
"""
import mmap
import struct
import threading
import datetime as dt
import time
from MmapMarketDataTool import *read_data_dict = dict()def update_read_data_dict():with open(shm_filename, 'r') as file:while True:now_time = dt.datetime.now().strftime("%H:%M:%S")if now_time > end_time:print("-------------update market_data_dict end-------------")break# 将文件内容映射到内存mmapped_file = mmap.mmap(file.fileno(), 0, access=mmap.ACCESS_READ)for i in range(stock_num):# 读取结构体实例的数据packed_data = mmapped_file[i*size: (i+1)*size]# 获取解包数据转化为结构体received_data = get_unpacked_data(packed_data)# 更新read_data_dictstock_code = received_data.stock_coderead_data_dict[stock_code] = received_data# print(f"stock_code={received_data.stock_code}, last_price={received_data.last_price}")# 休眠一段时间,避免频繁读取time.sleep(1)# 关闭内存映射mmapped_file.close()def print_read_data_dict():while True:now_time = dt.datetime.now().strftime("%H:%M:%S")if now_time > end_time:print("-------------print_read_data_dict end-------------")breakif len(read_data_dict) < stock_num:continuefor stock_code in read_data_dict:# print("----------------------------------------")# print(f"stock_code={read_data_dict[stock_code].stock_code}")print(f"stock_code={stock_code}, last_price={read_data_dict[stock_code].last_price}")# 1.开启线程不断获取行情数据,更新read_data_dict
print("-------------update market_data_dict start-------------")
read_data_thread = threading.Thread(target=update_read_data_dict, args=())
read_data_thread.start()# # 2.业务程序通过读取read_data_dict来获取行情数据
print("-------------print_read_data_dict start-------------")
print_data_thread = threading.Thread(target=print_read_data_dict, args=())
print_data_thread.start()

运行看下模拟的效果。先运行写数据的程序(作用是不断更新数据字典,然后将数据字典中的全部数据有序映射到共享内存中)

Python 3.6.6 (v3.6.6:4cf1f54eb7, Jun 27 2018, 03:37:03) [MSC v.1900 64 bit (AMD64)] on win32
Type "copyright", "credits" or "license()" for more information.
>>> 
================= RESTART: D:\hhttt\projects\pytest\test1.py =================
-------------update market_data_dict start-------------
-------------mmap_market_data start-------------

然后运行读数据的程序,就读到数据了

stock_code=stock3982, last_price=171155.0
stock_code=stock3983, last_price=171155.0
stock_code=stock3984, last_price=171155.0
stock_code=stock3985, last_price=171155.0
stock_code=stock3986, last_price=171155.0
stock_code=stock3987, last_price=171155.0
stock_code=stock3988, last_price=171155.0
stock_code=stock3989, last_price=171155.0
stock_code=stock3990, last_price=171155.0
stock_code=stock3991, last_price=171155.0
stock_code=stock3992, last_price=171155.0
stock_code=stock3993, last_price=171155.0
stock_code=stock3994, last_price=171155.0
stock_code=stock3995, last_price=171155.0
stock_code=stock3996, last_price=171155.0
stock_code=stock3997, last_price=171155.0
stock_code=stock3998, last_price=171155.0
stock_code=stock3999, last_price=171155.0
stock_code=stock4000, last_price=171155.0
stock_code=stock4001, last_price=171155.0

5.关于mmapped_file对象的创建和资源释放

mmapped_file对象的创建和资源释放只需要操作一次即可,将mmapped_file作为全局变量这样更合适是不是?

是的,通常情况下,mmap 对象的创建和资源释放操作应该尽量减少,因为频繁地创建和释放可能会引起性能问题。

将 mmap 对象作为全局变量可以确保在程序的生命周期内只创建一次,避免了重复的资源操作。这样可以提高程序的效率,特别是在需要频繁读写共享内存时。

在这种情况下,你应该确保在程序退出时正确关闭 mmap 对象,以释放资源。你可以使用类似于 atexit 模块的方法,或者在程序结束时捕获 KeyboardInterrupt 等信号来执行清理操作。以下是一个简单的示例:

import atexit
import mmap# 全局变量,保存 mmap 对象
mmapped_file = Nonedef initialize_mmap():global mmapped_filewith open("example.txt", 'r+b') as file:mmapped_file = mmap.mmap(file.fileno(), 0, access=mmap.ACCESS_WRITE)# 在程序退出时执行清理操作
@atexit.register
def cleanup():global mmapped_fileif mmapped_file is not None:mmapped_file.close()# 初始化 mmap 对象
initialize_mmap()# 在程序的其他部分使用 mmapped_file
# ...# 不需要手动关闭 mmapped_file,它将在程序退出时自动关闭

这样的设计确保了在程序的整个生命周期内只有一个 mmap 对象,并在程序退出时正确关闭它。


end

http://www.xdnf.cn/news/9314.html

相关文章:

  • C++数据结构 : map和set的使用
  • 高精度微型导轨在3D打印机中有多重要?
  • 2024 CKA模拟系统制作 | Step-By-Step | 9、题目搭建-扩容deployment副本数量
  • 打破云平台壁垒支持多层级JSON生成的MQTT网关技术解析
  • 《数据结构笔记四》双链表:创建,插入(头插、尾插、中间任意位置插入),删除,遍历,释放内存等核心操作。
  • 释放生产力潜能 AI-Hub智能数据中枢引领企业数字化转型
  • 粒子群优化(Particle Swarm Optimization, PSO)
  • 大模型(7)——向量模型(向量化存储)
  • Science综述:光电超构器件
  • Spring IoC(2)
  • 18、Python字符串全解析:Unicode支持、三种创建方式与长度计算实战
  • 【DeepSeek论文精读】12. DeepSeek-Prover-V2: 通过强化学习实现子目标分解的形式化数学推理
  • 【PhysUnits】14 二进制数的标准化表示(standardization.rs)
  • 【第1章 基础知识】1.6 事件处理
  • 嵌入式自学第二十九天(5.27)
  • 北京大学 | DeepSeek内部研讨资料:AI工具深度测评与选型指南,319页
  • 系统编程day05
  • 基于 STM32 的智慧农业温室控制系统设计与实现
  • 学习python day9
  • DeviceNET转EtherCAT协议转换网关解读
  • Qwen3内置提示词模板解读
  • 数据库大学实验一
  • 投影机三色光源和单色光源实拍对比:一场视觉体验的终极较量
  • 知识图谱系列(4):查询与推理技术
  • 第四十七篇-Tesla P40+Qwen3-30B-A3B部署与测试
  • 什么是PLM软件?离散制造业和流程制造业的主流PLM介绍、国产PLM应用案例
  • 5月27日星期二今日早报简报微语报早读
  • RuoYi前后端分离框架集成Jasypt实现配置信息加密
  • Kubernetes简介及常用命令
  • 高效大电流缓启动电路设计