How do I read a large text file line by line without loading it into memory? [duplicate]
- 2024-12-17 08:30:00
Problem description:
I want to read a large file (>5 GB) line by line without loading all of its contents into memory. I can't use readlines(), because it builds a very large list in memory.
Solution 1:
Use a for loop over the file object to read it line by line. Use the with open(...) context manager to make sure the file is closed after reading:
with open("log.txt") as infile:
    for line in infile:
        print(line)
Solution 2:
All you need to do is use the file object as an iterator.
for line in open("log.txt"):
    do_something_with(line)
Even better, in recent Python versions, use a context manager:
with open("log.txt") as fileobject:
    for line in fileobject:
        do_something_with(line)
This also closes the file automatically.
Solution 3:
It is best to use an iterator.
Related: fileinput, which iterates over lines from multiple input streams.
From the docs:
import fileinput

for line in fileinput.input("filename", encoding="utf-8"):
    process(line)
This avoids copying the entire file into memory at once.
Solution 4:
An old-school approach:
fh = open(file_name, 'rt')
line = fh.readline()
while line:
    # do stuff with line
    line = fh.readline()
fh.close()
Solution 5:
Try the following:
with open('filename', 'r', buffering=100000) as f:
    for line in f:
        print(line)
Solution 6:
If the file has no newline characters, do the following:
with open('large_text.txt') as f:
    while True:
        c = f.read(1024)
        if not c:
            break
        print(c, end='')
Solution 7:
I couldn't believe it could be as simple as @john-la-rooy's answer made it look. So I recreated the cp command using line-by-line reads and writes. It is incredibly fast.
#!/usr/bin/env python3.6
import sys

with open(sys.argv[2], 'w') as outfile:
    with open(sys.argv[1]) as infile:
        for line in infile:
            outfile.write(line)
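To use it, pass the source file as the first argument and the destination as the second, for example python3 copy.py big_input.log copy_of_input.log (the script name and file names here are just placeholders).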
Solution 8:
The blaze project has come a long way over the past 6 years. It has a simple API covering a useful subset of Pandas features.
dask.dataframe handles the chunking internally, supports many parallelizable operations, and lets you export slices back to pandas for in-memory operations.
import dask.dataframe as dd

df = dd.read_csv('filename.csv')
df.head(10)  # return first 10 rows
df.tail(10)  # return last 10 rows

# iterate rows
for idx, row in df.iterrows():
    ...

# group by my_field and return mean
df.groupby(df.my_field).value.mean().compute()

# slice by column
df[df.my_field == 'XYZ'].compute()
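Note that most dask.dataframe operations are lazy: they build a task graph and only read the data when you call .compute() (head() and tail() trigger a computation implicitly), so the full CSV is never held in memory at once.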
Solution 9:
Here is code for loading text files of any size without running into memory problems. It supports gigabyte-sized files.
https://gist.github.com/iyvinjose/e6c1cb2821abd5f01fd1b9065cbc759d
Download the file data_loading_utils.py and import it into your code.
Usage:
import data_loading_utils

file_name = 'file_name.ext'
CHUNK_SIZE = 1000000

def process_lines(data, eof, file_name):
    # check if end of file reached
    if not eof:
        # process data; data is one single line of the file
        pass
    else:
        # end of file reached
        pass

data_loading_utils.read_lines_from_file_as_data_chunks(file_name, chunk_size=CHUNK_SIZE, callback=process_lines)
process_lines is the callback function. It is called for every line, and the data parameter holds one line of the file on each call.
You can tune the CHUNK_SIZE variable to match your machine's hardware configuration.
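The helper itself lives in the linked gist and is not reproduced in the answer. Purely as a rough idea, here is a minimal sketch of what a function with the read_lines_from_file_as_data_chunks signature shown above might look like; everything beyond that signature is an assumption, not the gist's actual implementation:

def read_lines_from_file_as_data_chunks(file_name, chunk_size, callback):
    # Hypothetical sketch, not the actual gist: read the file in chunks of
    # roughly chunk_size characters, split them into complete lines, and call
    # callback(data, eof, file_name) once per line, then once more with eof=True.
    with open(file_name) as f:
        leftover = ''
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                if leftover:
                    callback(leftover, False, file_name)  # last line without a trailing newline
                callback(None, True, file_name)  # signal end of file
                return
            lines = (leftover + chunk).split('\n')
            leftover = lines.pop()  # possibly incomplete last line, completed by the next chunk
            for line in lines:
                callback(line, False, file_name)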
Solution 10:
I realize this question was answered a long time ago, but here is a way to do it in parallel without the memory overhead you would get if you tried to put every line into the pool. Obviously, swap the readJSON_line2 function for something more sensible; it is only here to illustrate the point!
The speedup will depend on the file size and on what you do with each line, but in the worst case (a small file read with nothing more than the JSON reader) I see performance similar to single-threaded (ST) with the settings below.
Hopefully this is useful to someone:
def readJSON_line2(linesIn):
    # Function for reading a chunk of json lines
    '''
    Note, this function is nonsensical. A user would never use the approach suggested
    for reading in a JSON file,
    its role is to evaluate the MT approach for full line by line processing to both
    increase speed and reduce memory overhead
    '''
    import json
    linesRtn = []
    for lineIn in linesIn:
        if lineIn.strip():  # skip blank lines instead of feeding them to the JSON parser
            lineRtn = json.loads(lineIn)
        else:
            lineRtn = ""
        linesRtn.append(lineRtn)
    return linesRtn
# -------------------------------------------------------------------
if __name__ == "__main__":
    import multiprocessing as mp

    path1 = "C:\\user\\Documents\\"
    file1 = "someBigJson.json"

    nCPUs = mp.cpu_count()  # number of worker processes (assumed: one per CPU core)
    pool = mp.Pool(nCPUs)   # SMP pool that the chunks are submitted to
    nBuffer = 20 * nCPUs    # How many chunks are queued up (so cpus aren't waiting on processes spawning)
    nChunk = 1000           # How many lines are in each chunk
    # Both of the above will require balancing speed against memory overhead

    iJob = 0     # Tracker for SMP jobs submitted into pool
    iiJob = 0    # Tracker for SMP jobs extracted back out of pool
    jobs = []    # SMP job holder
    MTres3 = []  # Final result holder
    chunk = []
    iBuffer = 0  # Buffer line count

    with open(path1 + file1) as f:
        for line in f:
            # Send to the chunk
            if len(chunk) < nChunk:
                chunk.append(line)
            else:
                # Chunk full
                # Don't forget to add the current line to chunk
                chunk.append(line)
                # Then add the chunk to the buffer (submit to SMP pool)
                jobs.append(pool.apply_async(readJSON_line2, args=(chunk,)))
                iJob += 1
                iBuffer += 1
                # Clear the chunk for the next batch of entries
                chunk = []
            # Buffer is full, any more chunks submitted would cause undue memory overhead
            # (Partially) empty the buffer
            if iBuffer >= nBuffer:
                temp1 = jobs[iiJob].get()
                for rtnLine1 in temp1:
                    MTres3.append(rtnLine1)
                iBuffer -= 1
                iiJob += 1

    # Submit the last chunk if it exists (as it would not have been submitted to SMP buffer)
    if chunk:
        jobs.append(pool.apply_async(readJSON_line2, args=(chunk,)))
        iJob += 1
        iBuffer += 1

    # And gather up the last of the buffer, including the final chunk
    while iiJob < iJob:
        temp1 = jobs[iiJob].get()
        for rtnLine1 in temp1:
            MTres3.append(rtnLine1)
        iiJob += 1

    # Cleanup
    del chunk, jobs, temp1
    pool.close()
Solution 11:
How about this: split the file into chunks and then read it line by line. When you read a file, the operating system caches the following lines, so reading strictly line by line does not make efficient use of that cached information.
Instead, split the file into chunks, load a whole chunk into memory, and then process it.
import os

def chunks(fh, size=1024):
    while 1:
        startat = fh.tell()
        print(startat)  # file object's current position from the start
        fh.seek(size, 1)  # offset from current position --> 1
        data = fh.readline()
        yield startat, fh.tell() - startat  # doesn't store the whole list in memory
        if not data:
            break

if os.path.isfile(fname):
    try:
        fh = open(fname, 'rb')
    except IOError as e:  # file --> permission denied
        print("I/O error({0}): {1}".format(e.errno, e.strerror))
    except Exception as e1:  # handle other exceptions such as attribute errors
        print("Unexpected error: {0}".format(e1))
    for ele in chunks(fh):
        fh.seek(ele[0])  # startat
        data = fh.read(ele[1])  # endat
        print(data)
解决方案 12:
谢谢!我最近转换到了 Python 3,但使用 readlines(0) 读取大文件时却很沮丧。这解决了问题。但为了获取每一行,我不得不多做几个步骤。每行前面都有一个“b”,我猜它是二进制格式。使用“decode(utf-8)”将其更改为 ascii。
然后我必须删除每行中间的“=\n”。
然后我在新线上分割线条。
import binascii

b_data = fh.read(ele[1])  # endat; this is one chunk of ascii data in binary format
a_data = (binascii.b2a_qp(b_data)).decode('utf-8')  # data chunk in 'split' ascii format
data_chunk = a_data.replace('=\n', '').strip()  # splitting characters removed
data_list = data_chunk.split('\n')  # list containing the lines in the chunk
#print(data_list, '\n')
#time.sleep(1)
for j in range(len(data_list)):  # iterate through data_list to get each item
    i += 1
    line_of_data = data_list[j]
    print(line_of_data)
This code slots in right above the print(data) line in Arohi's code (Solution 11 above).
Solution 13:
This is the best solution I found for this problem, and I tried it on a 330 MB file.
lineno = 500
line_length = 8
with open('catfour.txt', 'r') as file:
    file.seek(lineno * (line_length + 2))
    print(file.readline(), end='')
where line_length is the number of characters in a single line. For example, "abcd" has a line length of 4.
I added 2 to the line length to skip the newline character(s) and move on to the next line.
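Note that this seek arithmetic only works when every line in the file has exactly the same length. A minimal illustration under that assumption (the fixed_width.txt file and its contents are made up for the example; binary mode is used so the byte offset is exact):

# Build a small file where every line is exactly 8 characters plus a single '\n'.
with open('fixed_width.txt', 'w', newline='\n') as f:
    for i in range(1000):
        f.write(f'{i:08d}\n')

line_length = 8
lineno = 500
with open('fixed_width.txt', 'rb') as f:
    f.seek(lineno * (line_length + 1))    # +1 for '\n'; the answer's +2 assumes '\r\n' endings
    print(f.readline().decode(), end='')  # prints 00000500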
Solution 14:
This can be useful when you want to work in parallel and read only chunks of data, while keeping the chunks clean at newline boundaries.
def readInChunks(fileObj, chunkSize=1024):
    while True:
        data = fileObj.read(chunkSize)
        if not data:
            break
        while data[-1:] != '\n':
            more = fileObj.read(1)
            if not more:  # end of file reached without a trailing newline
                break
            data += more
        yield data
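A possible way to consume the generator (large_text.txt and do_something_with are placeholders, as in the earlier answers):

with open('large_text.txt') as f:
    for chunk in readInChunks(f, chunkSize=4096):
        # each chunk ends on a line boundary, so splitlines() yields whole lines
        for line in chunk.splitlines():
            do_something_with(line)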