你好,我是kelly。
今天分享一个工作中遇到问题,以及解决方案。
问题介绍:
问题背景:
有一个12g的csv文件,文件每行是一条包含2对经纬度坐标的定位记录,csv文件共11+亿条记录。
目的:
csv文件每条记录和某一个地理空间库匹配,找到每对经纬度坐标所对应的空间POI名字。
匹配过程遇到的问题:
1、输入数据全部加载进内存会溢出,抛出错误out of Memory。
2、输出结果数据无法一直缓存在内存。
问题分析:
csv文件的每条记录包含2对经纬度坐标,即(lng1, lat1), (lng2, lat2)。简单估算,假设单个坐标使用4个字节Bytes的float存储,2对经纬度坐标(4个float)需要 4*4=16B(ytes)。
已知在内存大小换算时,
1GB=2^10MB=2^20KB=2^30B(ytes);
此外,2^10?≈10^3
因此,预估该csv文件全部加载进内存共需:
11*10^9*16B≈11*2^30*2^4=11*2^34B=11*2^4GB=176GB,显然远远大于电脑4GB内存。
改动前的文件读写代码:
def match(line):
# line包含2对经纬度的坐标点,执行匹配操作
# 具体操作这里省略
return "返回值已转为字符串,这里是示例"
with open("file_output.csv", mode="w", encoding="utf-8") as fw:
with open("file_input.csv", encoding="utf-8") as fr:
count = 0
for line in fr.readlines():
result = match(line) # 假定返回值已转为字符串
fw.write(result + "\n")
改动后的文件读写代码:
with open("file_output.csv", mode="w", encoding="utf-8") as fw:
with open("file_input.csv", encoding="utf-8") as fr:
count = 0
for line in fr: # 改动1:使用迭代器遍历文件数据
result = match(line) # 假定返回值已转为字符串
fw.write(result + "\n")
if count % 1000 == 0:
fw.flush() # 改动2:每隔一段时间将内存数据刷新到磁盘
对比前后,改动有2点:
1、以迭代器的循环方式读取文件:不一次性加载到内存,每次仅读取一条记录。
2、匹配结果写入文件时:每隔固定时间将数据写入磁盘,防止大量数据缓存在内存。
解释说明:
改动1:为什么使用迭代器的循环方式
所创建的fr对象是一个迭代器,使用fr.readlines()操作会将csv全部数据一次性读入(缓存)内存,而直接对fr执行for循环,每次仅仅读取csv文件的一条数据。
补充知识:Python迭代器是一种特殊对象,可以逐个访问容器(各种集合类型,比如列表、元组、字典、字符串等)中的元素,不需要将所有元素都缓存在内存。迭代器在运行过程中,仅仅存储当前迭代数据,不将全部数据都缓存在内存,访问大量数据时非常节约内存。
改动2:为什么调用文件对象的flush函数主动刷新数据
使用Python将数据写入文件时,数据不会立即写入文件,通常会先存放在缓冲区(Python内部创建),减少访问磁盘的次数(磁盘访问比较慢,导致程序运行变慢)。
既然这样,那缓冲区数据什么时候才真正写入文件呢?一般是以下几个情形:
1)显式或者隐式调用文件对象的close函数,关闭文件会自动刷新缓冲器
2)缓冲区满了,
open函数有个与缓冲区相关的参数buffering,一般不设置,缺省值为-1。缺省状态下,缓冲区大小一般为4096或8192个字节大小。具体缓冲器大小可以使用以下命令查看:
In [18]: import os
In [19]: print(io.DEFAULT_BUFFER_SIZE)
8192
3)调用flush函数,即主动将缓冲区的数据立刻写入磁盘,同时清空缓冲器,无需被动等待。
为平衡内存使用率和程序运行速度,因此间隔固定时间写入文件,这样即不占用太多内存,又至于程序运行过慢。
本文原始版本发表链接:
kelly会在公众号「kelly学技术」不定期更新文章,感兴趣的朋友可以关注一下,期待与您交流。
--over--