问题背景
我经常使用爬虫来做数据抓取,多线程爬虫方案是必不可少的,正如我在使用 Python 进行科学计算时,需要处理大量存储在 CSV 文件中的数据。由于每个处理过程需要很长时间才能完成,而您拥有多核处理器,所以您尝试使用多进程库中的 Pool 方法来提高计算效率。
您按照如下方式构建了多进程调用:
pool = Pool()
vector_components = []
for sample0 in range(samples):
vector_field_x_i = vector_field_samples_x[sample]
vector_field_y_i = vector_field_samples_y[sample]
vector_component = pool.apply_async(vector_field_decomposer, args=(x_dim, y_dim, x_steps, y_steps,
vector_field_x_i, vector_field_y_i))
vector_components.append(vector_component)
pool.close()
pool.join()
vector_components = map(lambda k: k.get(), vector_components)
for vector_component in vector_components:
CsvH.write_vector_field(vector_component, '../CSV/RotationalFree/rotational_free_x_'+str(sample)+'.csv')
使用此代码,当您处理 500 个元素,每个元素大小为 100 x 100 的数据时,一切正常。但是,当您尝试处理 500 个元素,每个元素大小为 400 x 400 时,在调用 get() 时会收到内存错误。
解决方案
出现内存错误的原因是您的代码在内存中保留了多个列表,包括 vector_field_x、vector_field_y、vector_components,以及在 map() 调用期间创建的 vector_components 的副本。当您尝试处理较大的数据时,这些列表可能变得非常大,从而导致内存不足。
为了解决此问题,您需要避免在内存中保存完整的列表。您可以使用多进程库中的 imap() 方法来实现这一点。imap() 方法返回一个迭代器而不是完整的列表,因此您不必将所有结果都保存在内存中。
以下是使用 imap() 方法重写代码的示例:
import multiprocessing
from functools import partial
def vector_field_decomposer(x_dim, y_dim, x_steps, y_steps, vector_fields):
vector_field_x_i = vector_fields[0]
vector_field_y_i = vector_fields[1]
# Do whatever is normally done here.
if __name__ == "__main__":
num_workers = multiprocessing.cpu_count()
pool = multiprocessing.Pool(num_workers)
# Calculate a good chunksize (based on implementation of pool.map)
chunksize, extra = divmod(samples // 4 * num_workers)
if extra:
chunksize += 1
# Use partial so many arguments can be passed to vector_field_decomposer
func = partial(vector_field_decomposer, x_dim, y_dim, x_steps, y_steps)
# We use a generator expression as an iterable, so we don't create a full list.
results = pool.imap(func,
((vector_field_samples_x[s], vector_field_samples_y[s]) for s in xrange(samples)),
chunksize=chunksize)
for vector in results:
CsvH.write_vector_field(vector_component,
'../CSV/RotationalFree/rotational_free_x_'+str(sample)+'.csv')
pool.close()
pool.join()
通过使用这种方法,您可以避免出现内存错误,并能够处理较大的数据。
请确保你的计算任务是可以并行化的,并且注意到在Windows系统上,mclapply
可能不如在Unix-like系统(如Linux或Mac OS X)上有效。在Windows系统上,你可能需要使用parLapply
函数来代替。如果有更多专业知识不懂得可以评论区一起讨论。