如何使用 pandas 读取大型 csv 文件?
- 2024-11-25 08:49:00
- admin 原创
- 173
问题描述:
我正在尝试在 Pandas 中读取一个大型 csv 文件(大约 6 GB),但出现了内存错误:
MemoryError Traceback (most recent call last)
<ipython-input-58-67a72687871b> in <module>()
----> 1 data=pd.read_csv('aphro.csv',sep=';')
...
MemoryError:
有什么帮助吗?
解决方案 1:
该错误表明机器没有足够的内存一次性将整个 CSV 读入 DataFrame。假设您不需要一次性将整个数据集放入内存中,避免此问题的一种方法是分块处理 CSV(通过指定chunksize
参数):
chunksize = 10 ** 6
for chunk in pd.read_csv(filename, chunksize=chunksize):
# chunk is a DataFrame. To "process" the rows in the chunk:
for index, row in chunk.iterrows():
print(row)
该chunksize
参数指定每个块的行数。(chunksize
当然,最后一个块可能包含少于行数。)
pandas >= 1.2
read_csv
withchunksize
返回一个上下文管理器,使用方式如下:
chunksize = 10 ** 6
with pd.read_csv(filename, chunksize=chunksize) as reader:
for chunk in reader:
process(chunk)
查看GH38225
解决方案 2:
分块不应该总是解决这个问题的首选方法。
文件是否由于重复的非数字数据或不需要的列而很大?
如果是这样,您有时可以通过将列作为类别读取并通过pd.read_csv 参数选择所需的列来节省大量内存usecols
。
您的工作流程是否需要切片、操作、导出?
如果是这样,您可以使用dask.dataframe进行切片、执行计算并迭代导出。分块由 dask 静默执行,它还支持 pandas API 的子集。
如果其他方法都失败了,请通过块逐行读取。
作为最后的手段,通过 pandas或csv 库进行分块。
解决方案 3:
对于大数据,我建议您使用库“dask”,
例如:
# Dataframes implement the Pandas API
import dask.dataframe as dd
df = dd.read_csv('s3://.../2018-*-*.csv')
您可以在此处阅读更多文档。
另一个很好的选择是使用modin,因为所有功能与 pandas 相同,但它利用了 dask 等分布式数据框库。
从我的项目来看,另一个更优秀的库是datatables。
# Datatable python library
import datatable as dt
df = dt.fread("s3://.../2018-*-*.csv")
解决方案 4:
我这样做:
chunks=pd.read_table('aphro.csv',chunksize=1000000,sep=';',\n names=['lat','long','rf','date','slno'],index_col='slno',\n header=None,parse_dates=['date'])
df=pd.DataFrame()
%time df=pd.concat(chunk.groupby(['lat','long',chunk['date'].map(lambda x: x.year)])['rf'].agg(['sum']) for chunk in chunks)
解决方案 5:
您可以将数据以块的形式读入,并将每个块保存为 pickle。
import pandas as pd
import pickle
in_path = "" #Path where the large file is
out_path = "" #Path to save the pickle files to
chunk_size = 400000 #size of chunks relies on your available memory
separator = "~"
reader = pd.read_csv(in_path,sep=separator,chunksize=chunk_size,
low_memory=False)
for i, chunk in enumerate(reader):
out_file = out_path + "/data_{}.pkl".format(i+1)
with open(out_file, "wb") as f:
pickle.dump(chunk,f,pickle.HIGHEST_PROTOCOL)
在下一步中,您将读取泡菜并将每个泡菜附加到所需的数据框中。
import glob
pickle_path = "" #Same Path as out_path i.e. where the pickle files are
data_p_files=[]
for name in glob.glob(pickle_path + "/data_*.pkl"):
data_p_files.append(name)
df = pd.DataFrame([])
for i in range(len(data_p_files)):
df = df.append(pd.read_pickle(data_p_files[i]),ignore_index=True)
解决方案 6:
我想基于已经提供的大多数潜在解决方案做出更全面的回答。我还想指出可能有助于阅读过程的另一个潜在帮助。
选项 1:dtypes
“dtypes” 是一个非常强大的参数,可用于减少read
方法的内存压力。请参阅此和此答案。默认情况下,Pandas 会尝试推断数据的 dtypes。
参照数据结构,每个存储的数据都会进行内存分配。基本层面上参考以下值(下表说明了 C 编程语言的值):
The maximum value of UNSIGNED CHAR = 255
The minimum value of SHORT INT = -32768
The maximum value of SHORT INT = 32767
The minimum value of INT = -2147483648
The maximum value of INT = 2147483647
The minimum value of CHAR = -128
The maximum value of CHAR = 127
The minimum value of LONG = -9223372036854775808
The maximum value of LONG = 9223372036854775807
请参阅此页面查看 NumPy 和 C 类型之间的匹配。
假设您有一个数字的整数数组。您可以在理论上和实践上分配 16 位整数类型的数组,但这样您分配的内存将超过存储该数组的实际需要。为了防止这种情况,您可以设置dtype
选项read_csv
。您不想将数组项存储为长整数,实际上您可以使用 8 位整数(np.int8
或np.uint8
)来容纳它们。
观察以下 dtype 图。
来源:https://pbpython.com/pandas_dtypes.html
您可以将dtype
参数作为 pandas 方法的参数作为字典传递,read
例如 {column: type}。
import numpy as np
import pandas as pd
df_dtype = {
"column_1": int,
"column_2": str,
"column_3": np.int16,
"column_4": np.uint8,
...
"column_n": np.float32
}
df = pd.read_csv('path/to/file', dtype=df_dtype)
选项 2:分块阅读
通过分块读取数据,您可以访问内存中的部分数据,并且可以对数据进行预处理并保留处理后的数据而不是原始数据。如果将此选项与第一个选项dtypes结合起来,效果会更好。
我想指出 pandas cookbook 中有关该过程的部分,您可以在这里找到它。请注意那里的两个部分;
逐块读取 csv
逐块读取 csv 中的特定行
选项 3:Dask
Dask 是一个框架,在Dask 的网站上定义如下:
Dask 为分析提供了高级并行性,为您喜爱的工具提供大规模性能
它诞生是为了覆盖 pandas 无法触及的必要部分。Dask 是一个强大的框架,它通过分布式方式处理数据,让你可以访问更多的数据。
您可以使用 dask 来整体预处理数据,Dask 负责分块部分,因此与 pandas 不同,您只需定义处理步骤并让 Dask 完成工作即可。Dask 在明确通过compute
and/or推动之前不会应用计算(请参阅此处的persist
答案以了解区别)。
其他援助(想法)
为数据设计的 ETL 流程。仅保留原始数据中需要的部分。
+ 首先,使用 Dask 或 PySpark 等框架对整个数据进行 ETL,并导出处理后的数据。
+ 然后看看处理后的数据是否能整体装入内存中。
考虑增加你的 RAM。
考虑在云平台上处理该数据。
解决方案 7:
在使用 chunksize 选项之前,如果您想确定要在分块 for 循环中编写的流程函数,如@unutbu 所述,您可以简单地使用 nrows 选项。
small_df = pd.read_csv(filename, nrows=100)
一旦确定流程块已准备就绪,就可以将其放入整个数据框的分块 for 循环中。
解决方案 8:
read_csv 函数和 read_table 函数基本相同,但在程序中使用 read_table 函数时必须指定分隔符“,”。
def get_from_action_data(fname, chunk_size=100000):
reader = pd.read_csv(fname, header=0, iterator=True)
chunks = []
loop = True
while loop:
try:
chunk = reader.get_chunk(chunk_size)[["user_id", "type"]]
chunks.append(chunk)
except StopIteration:
loop = False
print("Iteration is stopped")
df_ac = pd.concat(chunks, ignore_index=True)
解决方案 9:
解决方案 1:
使用 Pandas 处理大数据
解决方案 2:
TextFileReader = pd.read_csv(path, chunksize=1000) # the number of rows per chunk
dfList = []
for df in TextFileReader:
dfList.append(df)
df = pd.concat(dfList,sort=False)
解决方案 10:
下面是一个例子:
chunkTemp = []
queryTemp = []
query = pd.DataFrame()
for chunk in pd.read_csv(file, header=0, chunksize=<your_chunksize>, iterator=True, low_memory=False):
#REPLACING BLANK SPACES AT COLUMNS' NAMES FOR SQL OPTIMIZATION
chunk = chunk.rename(columns = {c: c.replace(' ', '') for c in chunk.columns})
#YOU CAN EITHER:
#1)BUFFER THE CHUNKS IN ORDER TO LOAD YOUR WHOLE DATASET
chunkTemp.append(chunk)
#2)DO YOUR PROCESSING OVER A CHUNK AND STORE THE RESULT OF IT
query = chunk[chunk[<column_name>].str.startswith(<some_pattern>)]
#BUFFERING PROCESSED DATA
queryTemp.append(query)
#! NEVER DO pd.concat OR pd.DataFrame() INSIDE A LOOP
print("Database: CONCATENATING CHUNKS INTO A SINGLE DATAFRAME")
chunk = pd.concat(chunkTemp)
print("Database: LOADED")
#CONCATENATING PROCESSED DATA
query = pd.concat(queryTemp)
print(query)
解决方案 11:
您可以尝试 sframe,它具有与 pandas 相同的语法,但允许您操作大于 RAM 的文件。
解决方案 12:
如果你使用 pandas 将大文件读入块,然后逐行输出,这就是我所做的
import pandas as pd
def chunck_generator(filename, header=False,chunk_size = 10 ** 5):
for chunk in pd.read_csv(filename,delimiter=',', iterator=True, chunksize=chunk_size, parse_dates=[1] ):
yield (chunk)
def _generator( filename, header=False,chunk_size = 10 ** 5):
chunk = chunck_generator(filename, header=False,chunk_size = 10 ** 5)
for row in chunk:
yield row
if __name__ == "__main__":
filename = r'file.csv'
generator = generator(filename=filename)
while True:
print(next(generator))
解决方案 13:
如果有人仍在寻找类似的东西,我发现这个名为modin的新库可以提供帮助。它使用分布式计算来帮助读取。这里有一篇很好的文章,将其功能与 pandas 进行了比较。它本质上使用与 pandas 相同的功能。
import modin.pandas as pd
pd.read_csv(CSV_FILE_NAME)
解决方案 14:
除了上述答案之外,对于那些想要处理 CSV 然后导出到 csv、parquet 或 SQL 的人来说,d6tstack是另一个不错的选择。您可以加载多个文件,它可以处理数据架构更改(添加/删除列)。核心支持的 Chunked 已经内置。
def apply(dfg):
# do stuff
return dfg
c = d6tstack.combine_csv.CombinerCSV([bigfile.csv], apply_after_read=apply, sep=',', chunksize=1e6)
# or
c = d6tstack.combine_csv.CombinerCSV(glob.glob('*.csv'), apply_after_read=apply, chunksize=1e6)
# output to various formats, automatically chunked to reduce memory consumption
c.to_csv_combine(filename='out.csv')
c.to_parquet_combine(filename='out.pq')
c.to_psql_combine('postgresql+psycopg2://usr:pwd@localhost/db', 'tablename') # fast for postgres
c.to_mysql_combine('mysql+mysqlconnector://usr:pwd@localhost/db', 'tablename') # fast for mysql
c.to_sql_combine('postgresql+psycopg2://usr:pwd@localhost/db', 'tablename') # slow but flexible
解决方案 15:
如果您有csv
包含数据输入的文件millions
,并且想要加载完整数据集,则应该使用dask_cudf
,
import dask_cudf as dc
df = dc.read_csv("large_data.csv")
解决方案 16:
def read_csv_with_progress(file_path, sep):
import pandas as pd
from tqdm import tqdm
chunk_size = 50000 # Number of lines to read in each iteration
# Get the total number of lines in the CSV file
print("Calculating average line length + getting file size")
counter = 0
total_length = 0
num_to_sample = 10
for line in open(file_path, 'r'):
counter += 1
if counter > 1:
total_length += len(line)
if counter == num_to_sample + 1:
break
file_size = os.path.getsize(file_path)
avg_line_length = total_length / num_to_sample
avg_number_of_lines = int(file_size / avg_line_length)
chunks = []
with tqdm(total=avg_number_of_lines, desc='Reading CSV') as pbar:
for chunk in pd.read_csv(file_path, chunksize=chunk_size, low_memory=False, sep=sep):
chunks.append(chunk)
pbar.update(chunk.shape[0])
print("Concating...")
df = pd.concat(chunks, ignore_index=True)
return df