如何逐行读取大型文本文件,而不将它们加载到内存中?[重复]

2024-12-17 08:30:00
admin
原创
152
摘要:问题描述:我想逐行读取一个大文件(>5GB),而不将其所有内容加载到内存中。我无法使用,readlines()因为它会在内存中创建一个非常大的列表。解决方案 1:使用for文件对象上的循环逐行读取。使用with open(...)上下文管理器确保读取后关闭文件:with open("log.t...

问题描述:

我想逐行读取一个大文件(>5GB),而不将其所有内容加载到内存中。我无法使用,readlines()因为它会在内存中创建一个非常大的列表。


解决方案 1:

使用for文件对象上的循环逐行读取。使用with open(...)上下文管理器确保读取后关闭文件:

with open("log.txt") as infile:
    for line in infile:
        print(line)

解决方案 2:

您需要做的就是使用文件对象作为迭代器。

for line in open("log.txt"):
    do_something_with(line)

更好的是在最近的 Python 版本中使用上下文管理器。

with open("log.txt") as fileobject:
    for line in fileobject:
        do_something_with(line)

这也会自动关闭该文件。

解决方案 3:

最好使用迭代器。

相关: fileinput——迭代来自多个输入流的行。

来自文档:

import fileinput
for line in fileinput.input("filename", encoding="utf-8"):
    process(line)

这将避免一次将整个文件复制到内存中。

解决方案 4:

一种老派的方法:

fh = open(file_name, 'rt')
line = fh.readline()
while line:
    # do stuff with line
    line = fh.readline()
fh.close()

解决方案 5:

请尝试以下操作:

with open('filename','r',buffering=100000) as f:
    for line in f:
        print line

解决方案 6:

如果文件中没有换行符,请执行以下操作:

with open('large_text.txt') as f:
  while True:
    c = f.read(1024)
    if not c:
      break
    print(c,end='')

解决方案 7:

我简直不敢相信它竟然像 @john-la-rooy 的回答那样简单。因此,我cp使用逐行读写重新创建了该命令。速度快得令人难以置信。

#!/usr/bin/env python3.6

import sys

with open(sys.argv[2], 'w') as outfile:
    with open(sys.argv[1]) as infile:
        for line in infile:
            outfile.write(line)

解决方案 8:

blaze项目在过去 6 年中取得了长足进步。它有一个简单的 API,涵盖了 Pandas 功能的一个有用子集。

dask.dataframe负责内部分块,支持许多可并行操作,并允许您轻松地将切片导出回 pandas 以进行内存操作。

import dask.dataframe as dd

df = dd.read_csv('filename.csv')
df.head(10)  # return first 10 rows
df.tail(10)  # return last 10 rows

# iterate rows
for idx, row in df.iterrows():
    ...

# group by my_field and return mean
df.groupby(df.my_field).value.mean().compute()

# slice by column
df[df.my_field=='XYZ'].compute()

解决方案 9:

这是用于加载任意大小的文本文件而不会引起内存问题的代码。
它支持 GB 大小的文件

https://gist.github.com/iyvinjose/e6c1cb2821abd5f01fd1b9065cbc759d

下载文件data_loading_utils.py并将其导入到你的代码中

用法

import data_loading_utils.py.py
file_name = 'file_name.ext'
CHUNK_SIZE = 1000000


def process_lines(data, eof, file_name):

    # check if end of file reached
    if not eof:
         # process data, data is one single line of the file

    else:
         # end of file reached

data_loading_utils.read_lines_from_file_as_data_chunks(file_name, chunk_size=CHUNK_SIZE, callback=self.process_lines)

process_lines方法是回调函数。它将针对所有行调用,参数数据每次代表文件的一行。

您可以根据机器硬件配置来配置变量CHUNK_SIZE 。

解决方案 10:

我意识到这个问题很久以前就有人回答过了,但这里有一种并行执行的方法,不会消耗内存(如果你试图将每行都放入池中,就会出现这种情况)。显然,将 readJSON_line2 函数换成更合理的函数 - 只是为了说明这一点!

加速将取决于文件大小以及您对每一行执行的操作 - 但最坏的情况是对于小文件并且仅使用 JSON 读取器读取它,我看到使用以下设置的 ST 具有类似的性能。

希望对某些人有用:

def readJSON_line2(linesIn):
  #Function for reading a chunk of json lines
   '''
   Note, this function is nonsensical. A user would never use the approach suggested 
   for reading in a JSON file, 
   its role is to evaluate the MT approach for full line by line processing to both 
   increase speed and reduce memory overhead
   '''
   import json

   linesRtn = []
   for lineIn in linesIn:

       if lineIn.strip() != 0:
           lineRtn = json.loads(lineIn)
       else:
           lineRtn = ""
        
       linesRtn.append(lineRtn)

   return linesRtn




# -------------------------------------------------------------------
if __name__ == "__main__":
   import multiprocessing as mp

   path1 = "C:\/user\\Documents\\\"
   file1 = "someBigJson.json"

   nBuffer = 20*nCPUs  # How many chunks are queued up (so cpus aren't waiting on processes spawning)
   nChunk = 1000 # How many lines are in each chunk
   #Both of the above will require balancing speed against memory overhead

   iJob = 0  #Tracker for SMP jobs submitted into pool
   iiJob = 0  #Tracker for SMP jobs extracted back out of pool

   jobs = []  #SMP job holder
   MTres3 = []  #Final result holder
   chunk = []  
   iBuffer = 0 # Buffer line count
   with open(path1+file1) as f:
      for line in f:
            
          #Send to the chunk
          if len(chunk) < nChunk:
              chunk.append(line)
          else:
              #Chunk full
              #Don't forget to add the current line to chunk
              chunk.append(line)
                
              #Then add the chunk to the buffer (submit to SMP pool)                  
              jobs.append(pool.apply_async(readJSON_line2, args=(chunk,)))
              iJob +=1
              iBuffer +=1
              #Clear the chunk for the next batch of entries
              chunk = []
                            
          #Buffer is full, any more chunks submitted would cause undue memory overhead
          #(Partially) empty the buffer
          if iBuffer >= nBuffer:
              temp1 = jobs[iiJob].get()
              for rtnLine1 in temp1:
                  MTres3.append(rtnLine1)
              iBuffer -=1
              iiJob+=1
            
      #Submit the last chunk if it exists (as it would not have been submitted to SMP buffer)
      if chunk:
          jobs.append(pool.apply_async(readJSON_line2, args=(chunk,)))
          iJob +=1
          iBuffer +=1

      #And gather up the last of the buffer, including the final chunk
      while iiJob < iJob:
          temp1 = jobs[iiJob].get()
          for rtnLine1 in temp1:
              MTres3.append(rtnLine1)
          iiJob+=1

   #Cleanup
   del chunk, jobs, temp1
   pool.close()

解决方案 11:

怎么样?将文件分成几块,然后逐行读取,因为读取文件时,操作系统会缓存下一行。如果逐行读取文件,则无法有效利用缓存的信息。

相反,将文件分成块并将整个块加载到内存中,然后进行处理。

def chunks(file,size=1024):
    while 1:

        startat=fh.tell()
        print startat #file's object current position from the start
        fh.seek(size,1) #offset from current postion -->1
        data=fh.readline()
        yield startat,fh.tell()-startat #doesnt store whole list in memory
        if not data:
            break
if os.path.isfile(fname):
    try:
        fh=open(fname,'rb') 
    except IOError as e: #file --> permission denied
        print "I/O error({0}): {1}".format(e.errno, e.strerror)
    except Exception as e1: #handle other exceptions such as attribute errors
        print "Unexpected error: {0}".format(e1)
    for ele in chunks(fh):
        fh.seek(ele[0])#startat
        data=fh.read(ele[1])#endat
        print data

解决方案 12:

谢谢!我最近转换到了 Python 3,但使用 readlines(0) 读取大文件时却很沮丧。这解决了问题。但为了获取每一行,我不得不多做几个步骤。每行前面都有一个“b”,我猜它是二进制格式。使用“decode(utf-8)”将其更改为 ascii。

然后我必须删除每行中间的“=\n”。

然后我在新线上分割线条。

b_data=(fh.read(ele[1]))#endat This is one chunk of ascii data in binary format
        a_data=((binascii.b2a_qp(b_data)).decode('utf-8')) #Data chunk in 'split' ascii format
        data_chunk = (a_data.replace('=
','').strip()) #Splitting characters removed
        data_list = data_chunk.split('
')  #List containing lines in chunk
        #print(data_list,'
')
        #time.sleep(1)
        for j in range(len(data_list)): #iterate through data_list to get each item 
            i += 1
            line_of_data = data_list[j]
            print(line_of_data)

这是 Arohi 代码中从“打印数据”正上方开始的代码。

解决方案 13:

这是我找到的关于此问题的最佳解决方案,并且我在 330 MB 的文件上尝试过。

lineno = 500
line_length = 8
with open('catfour.txt', 'r') as file:
    file.seek(lineno * (line_length + 2))
    print(file.readline(), end='')

其中 line_length 是一行中的字符数。例如,“abcd”的行长为 4。

我在行长度中添加了 2 来跳过 '\n' 字符并移动到下一个字符。

解决方案 14:

当您想要并行工作并且只读取数据块但使用新行保持其干净时,这可能会很有用。

def readInChunks(fileObj, chunkSize=1024):
    while True:
        data = fileObj.read(chunkSize)
        if not data:
            break
        while data[-1:] != '
':
            data+=fileObj.read(1)
        yield data
相关推荐
  政府信创国产化的10大政策解读一、信创国产化的背景与意义信创国产化,即信息技术应用创新国产化,是当前中国信息技术领域的一个重要发展方向。其核心在于通过自主研发和创新,实现信息技术应用的自主可控,减少对外部技术的依赖,并规避潜在的技术制裁和风险。随着全球信息技术竞争的加剧,以及某些国家对中国在科技领域的打压,信创国产化显...
工程项目管理   1565  
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1354  
  信创国产芯片作为信息技术创新的核心领域,对于推动国家自主可控生态建设具有至关重要的意义。在全球科技竞争日益激烈的背景下,实现信息技术的自主可控,摆脱对国外技术的依赖,已成为保障国家信息安全和产业可持续发展的关键。国产芯片作为信创产业的基石,其发展水平直接影响着整个信创生态的构建与完善。通过不断提升国产芯片的技术实力、产...
国产信创系统   21  
  信创生态建设旨在实现信息技术领域的自主创新和安全可控,涵盖了从硬件到软件的全产业链。随着数字化转型的加速,信创生态建设的重要性日益凸显,它不仅关乎国家的信息安全,更是推动产业升级和经济高质量发展的关键力量。然而,在推进信创生态建设的过程中,面临着诸多复杂且严峻的挑战,需要深入剖析并寻找切实可行的解决方案。技术创新难题技...
信创操作系统   27  
  信创产业作为国家信息技术创新发展的重要领域,对于保障国家信息安全、推动产业升级具有关键意义。而国产芯片作为信创产业的核心基石,其研发进展备受关注。在信创国产芯片的研发征程中,面临着诸多复杂且艰巨的难点,这些难点犹如一道道关卡,阻碍着国产芯片的快速发展。然而,科研人员和相关企业并未退缩,积极探索并提出了一系列切实可行的解...
国产化替代产品目录   28  
热门文章
项目管理软件有哪些?
云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用