Python 3.8引入了multiprocessing
模块中的共享内存类SharedMemory
。使用SharedMemory
可以更方便地操作共享内存.
# -*- coding: utf-8 -*-
import csv
import inspect
import json
import os
import traceback
from multiprocessing import shared_memory
class BytesJSONEncoder(json.JSONEncoder):
"""用于解决json.dumps函数发现字典里面有bytes类型的数据无法编码问题"""
def default(self, obj):
if not isinstance(obj, bytes):
return json.JSONEncoder.default(self, obj)
return str(obj, encoding='utf-8')
class SharedMemoryObj:
def __init__(self, name='shm_0', size=1024):
try:
self.shm = shared_memory.SharedMemory(name=name, create=True, size=size) # 尝试创建共享内存,若失败则映射同名内存空间
except:
self.shm = shared_memory.SharedMemory(name=name, create=False)
self.shm_name = self.shm.name
self.length = 0
self.contentbegin = 0
def write(self, obj: dict) -> bool:
"""
将待写内容通过Json序列化,写入共享内存。
写入格式:内容长度 + ~ + 内容
Args:
obj: 待写内容
Returns:
"""
try:
if obj is not None and obj != {}:
obj_str = json.dumps(obj, cls=BytesJSONEncoder)
obj_len = len(obj_str)
content = str(obj_len) + "~" + obj_str
b = content.encode()
self.shm.buf[0:len(b)] = b
return True
else:
return False
except Exception as e:
traceback.print_exc()
return False
def read(self) -> dict:
"""
读取共享内存,返回字典格式内容
Returns:dict
"""
try:
s = bytes(self.shm.buf[:20])
index = str(s, 'utf-8').find("~")
if index != -1:
head = s[0:index]
contentlength = int(head)
content = bytes(self.shm.buf[index + 1:index + 1 + contentlength]).decode('utf-8')
obj = json.loads(content, object_hook=lambda d: {int(k) if k.lstrip('-').isdigit() else k: v for k, v in
d.items()})
# obj = json.loads(content)
return obj
else:
return {}
except Exception as e:
traceback.print_exc()
return {}
def update(self, obj):
org_obj = self.read()
org_obj.update(obj)
self.write(org_obj)
def get_protol_config(path):
return FileOpn.read_json(path)
FolderPath = os.path.dirname(
os.path.abspath(inspect.getfile(inspect.currentframe())))
class FileOpn(object):
@staticmethod
def read_csv(file_path, mark):
with open(FolderPath + file_path, newline='', encoding='utf-8-sig') as f:
f_csv = csv.DictReader(f)
new_dict = {}
for row in f_csv:
for i in row.keys():
if i == 'storage_id':
storage_id = int(row[i])
new_dict[storage_id] = {}
new_dict[storage_id][i] = storage_id
new_dict[storage_id][mark] = {}
continue
new_dict[storage_id][mark][i] = int(row[i])
return new_dict
@staticmethod
def read_json(file_path):
try:
with open(FolderPath + file_path, encoding='utf-8') as json_data:
reader = json.load(json_data)
locals().update(reader)
except Exception as e:
reader = None
return reader
if __name__ == '__main__':
machine_config_shared_memory = SharedMemoryObj('sha_machine_config', 8 * 1024 * 1024)
config_36 = "/protol_config_36c.json"
config_test = "/test.json"
get_config_36 = get_protol_config(config_36)
get_config_test = get_protol_config(config_test)
machine_config_shared_memory.update(get_config_36)
machine_config_shared_memory.update(get_config_test)
# machine_config_shared_memory.write(get_config_36)
# machine_config_shared_memory.write(get_config_test)
res = machine_config_shared_memory.read()
print(res)
定义了一个SharedMemoryObj
类,用于创建共享内存,提供存储和读取数据的方法。
write()
方法中,先将待写内容通过Json
序列化为字符串,然后写入共享内存;read()
方法中,先从共享内存中读取字节数据,按照写入格式进行划分,获取内容长度和内容,并将内容反序列化为字典格式返回;update()
方法中,先从共享内存中读取原有字典数据,对其进行更新,然后重新写入共享内存。以上就是关于Python - 进程操作共享内存【SharedMemory】的基本介绍,希望对你有所帮助!