使用 Pandas 的“大数据”工作流程[关闭]
- 2024-11-28 08:37:00
- admin 原创
- 171
问题描述:
在学习 pandas 的过程中,我花了好几个月的时间试图找到这个问题的答案。我日常工作中使用 SAS,它的内核支持非常棒。但是,出于许多其他原因,SAS 作为一款软件非常糟糕。
我希望有一天能用 Python 和 Pandas 取代 SAS,但目前我缺少处理大型数据集的核外工作流程。我说的不是需要分布式网络的“大数据”,而是内存不够大但硬盘足够小的文件。
我的第一个想法是使用HDFStore
它将大型数据集保存在磁盘上,并仅将我需要的部分拉入数据框中进行分析。其他人提到 MongoDB 是一种更易于使用的替代方案。我的问题是:
实现以下目标的一些最佳实践工作流程是什么:
将平面文件加载到永久的磁盘数据库结构中
查询该数据库以检索数据并将其输入到 pandas 数据结构中
在熊猫中操作片段后更新数据库
非常感谢现实世界的例子,特别是那些在“大数据”上使用熊猫的人。
编辑——我希望它如何工作的一个例子:
迭代导入大型平面文件并将其存储在永久的磁盘数据库结构中。这些文件通常太大而无法放入内存中。
为了使用 Pandas,我想要读取可以放入内存的这些数据的子集(通常一次只读取几列)。
我将通过对选定的列执行各种操作来创建新的列。
然后我必须将这些新列附加到数据库结构中。
我正在尝试找到执行这些步骤的最佳实践方法。阅读有关 pandas 和 pytables 的链接后发现,添加新列似乎是一个问题。
编辑——具体回答 Jeff 的问题:
我正在构建消费者信用风险模型。数据类型包括电话、SSN 和地址特征;财产价值;犯罪记录、破产等负面信息……我每天使用的数据集平均有近 1,000 到 2,000 个字段,这些字段属于混合数据类型:连续、名义和有序变量,既有数字数据,也有字符数据。我很少追加行,但我确实执行了许多创建新列的操作。
典型的操作包括使用条件逻辑将多列组合成一个新的复合列。例如,
if var1 > 2 then newvar = 'A' elif var2 = 4 then newvar = 'B'
。这些操作的结果是我的数据集中每条记录都有一个新列。最后,我想将这些新列附加到磁盘数据结构中。我将重复第 2 步,使用交叉表和描述性统计数据探索数据,试图找到有趣、直观的关系来建模。
典型的项目文件通常约为 1GB。文件的组织方式是一行包含一条消费者数据记录。每行的每个记录都有相同数量的列。情况总是如此。
在创建新列时,我很少会按行进行子集化。但是,在创建报告或生成描述性统计数据时,我经常按行进行子集化。例如,我可能想为特定业务线(例如零售信用卡)创建一个简单的频率。为此,除了要报告的任何列之外,我还会仅选择业务线 = 零售的记录。但是,在创建新列时,我会提取所有数据行,并且只提取操作所需的列。
建模过程要求我分析每一列,寻找与某些结果变量的有趣关系,并创建描述这些关系的新复合列。我探索的列通常以小组形式完成。例如,我将重点关注一组仅处理房产价值的 20 列,并观察它们与贷款违约之间的关系。探索完这些列并创建新列后,我将转到另一组列,例如大学教育,然后重复该过程。我所做的是创建候选变量来解释我的数据与某些结果之间的关系。在这个过程的最后,我应用了一些学习技术,从这些复合列中创建一个方程。
我很少向数据集添加行。我几乎总是会创建新的列(统计/机器学习术语中的变量或特征)。
解决方案 1:
我经常以这种方式使用数十 GB 的数据,例如,我在磁盘上有通过查询读取的表,创建数据并附加回来。
值得一读的文档和本线程后面的几条关于如何存储数据的建议。
这些细节将影响您存储数据的方式,例如:
请提供尽可能多的细节;我可以帮助您建立一个结构。
数据的大小、行数、列数、列的类型;您是添加行,还是仅添加列?
典型操作是什么样的。例如,对列进行查询以选择一组行和特定列,然后执行操作(内存中),创建新列,保存这些列。
(给出一个玩具示例可以让我们提供更具体的建议。)
处理完之后,你该做什么?第 2 步是临时的还是可重复的?
输入平面文件:有多少,粗略总大小为 Gb。这些文件如何组织,例如按记录组织?每个文件是否包含不同的字段,或者每个文件是否包含一些记录,每个文件包含所有字段?
您是否曾根据条件选择行(记录)的子集(例如,选择字段 A > 5 的行)?然后执行某些操作,还是仅选择包含所有记录的字段 A、B、C(然后执行某些操作)?
您是否对所有列(分组)进行处理,或者是否有很大比例的列可能仅用于报告(例如,您想保留数据,但不需要在最终结果时间之前明确提取该列)?
解决方案
确保你至少0.10.1
安装了 pandas。
逐块读取迭代文件并进行多表查询。
由于 pytables 针对逐行操作进行了优化(这是您查询的内容),我们将为每组字段创建一个表。这样就很容易选择一小组字段(这适用于大表,但这样做效率更高……我想我将来可能会修复这个限制……无论如何这更直观):(
以下是伪代码。)
import numpy as np
import pandas as pd
# create a store
store = pd.HDFStore('mystore.h5')
# this is the key to your storage:
# this maps your fields to a specific group, and defines
# what you want to have as data_columns.
# you might want to create a nice class wrapping this
# (as you will want to have this map and its inversion)
group_map = dict(
A = dict(fields = ['field_1','field_2',.....], dc = ['field_1',....,'field_5']),
B = dict(fields = ['field_10',...... ], dc = ['field_10']),
.....
REPORTING_ONLY = dict(fields = ['field_1000','field_1001',...], dc = []),
)
group_map_inverted = dict()
for g, v in group_map.items():
group_map_inverted.update(dict([ (f,g) for f in v['fields'] ]))
读取文件并创建存储(本质上执行以下操作append_to_multiple
):
for f in files:
# read in the file, additional options may be necessary here
# the chunksize is not strictly necessary, you may be able to slurp each
# file into memory in which case just eliminate this part of the loop
# (you can also change chunksize if necessary)
for chunk in pd.read_table(f, chunksize=50000):
# we are going to append to each table by group
# we are not going to create indexes at this time
# but we *ARE* going to create (some) data_columns
# figure out the field groupings
for g, v in group_map.items():
# create the frame for this group
frame = chunk.reindex(columns = v['fields'], copy = False)
# append it
store.append(g, frame, index=False, data_columns = v['dc'])
现在您拥有文件中的所有表(实际上,如果您愿意,您可以将它们存储在单独的文件中,您可能必须将文件名添加到 group_map,但这可能不是必需的)。
这是获取列和创建新列的方法:
frame = store.select(group_that_I_want)
# you can optionally specify:
# columns = a list of the columns IN THAT GROUP (if you wanted to
# select only say 3 out of the 20 columns in this sub-table)
# and a where clause if you want a subset of the rows
# do calculations on this frame
new_frame = cool_function_on_frame(frame)
# to 'add columns', create a new group (you probably want to
# limit the columns in this new_group to be only NEW ones
# (e.g. so you don't overlap from the other tables)
# add this info to the group_map
store.append(new_group, new_frame.reindex(columns = new_columns_created, copy = False), data_columns = new_columns_created)
当您准备好进行后期处理时:
# This may be a bit tricky; and depends what you are actually doing.
# I may need to modify this function to be a bit more general:
report_data = store.select_as_multiple([groups_1,groups_2,.....], where =['field_1>0', 'field_1000=foo'], selector = group_1)
关于 data_columns,您实际上不需要定义任何data_columns;它们允许您根据列选择行。例如:
store.select(group, where = ['field_1000=foo', 'field_1001>0'])
在最终的报告生成阶段,它们可能是您最感兴趣的(本质上,数据列与其他列是分开的,如果您定义很多,可能会在一定程度上影响效率)。
您可能还想:
创建一个函数,该函数接受字段列表,在 groups_map 中查找组,然后选择这些组并连接结果,以便获得结果框架(这实际上是 select_as_multiple 所做的)。这样,结构对您来说就非常透明了。
某些数据列上的索引(使得行子集化速度更快)。
启用压缩。
如果您有疑问请告诉我!
解决方案 2:
我认为上述答案缺少一种我认为非常有用的简单方法。
当我有一个文件太大而无法加载到内存中时,我会将该文件拆分成多个较小的文件(按行或列)
示例:假设有 30 天的交易数据,大小约为 30GB,我将其分成每天一个文件,大小约为 1GB。随后,我分别处理每个文件,并在最后汇总结果
最大的优点之一是它允许并行处理文件(多个线程或进程)
另一个优点是文件操作(如示例中的添加/删除日期)可以通过常规 shell 命令来完成,而这在更高级/复杂的文件格式中是不可能的
这种方法并不涵盖所有场景,但在很多场景中非常有用
解决方案 3:
现在,在这个问题提出两年后,出现了一个“核心外”的 Pandas 等效产品:dask。它非常棒!虽然它不支持所有的 Pandas 功能,但你可以用它走得很远。更新:在过去的两年里,它一直得到持续维护,并且有大量的用户社区在使用 Dask。
现在,问题提出四年后, Vaex中又出现了另一个高性能的“核心外”熊猫等价物。它“使用内存映射、零内存复制策略和惰性计算来获得最佳性能(不浪费内存)。”它可以处理数十亿行的数据集,并且不会将它们存储在内存中(甚至可以在次优硬件上进行分析)。
解决方案 4:
选项 1:如果您的数据集在 1 到 20GB 之间,您应该购买具有 48GB RAM 的工作站。这样 Pandas 就可以将整个数据集保存在 RAM 中。我知道这不是您在这里寻找的答案,但在具有 4GB RAM 的笔记本电脑上进行科学计算是不合理的。在 2024 年(问题来自 2015 年),我现在有一台具有 128GB RAM 的 Macbook M3 Max Pro,并鼓励人们获得日常工作所需的 RAM。
选项 2:保留 Pandas,但由多台计算机组成的分布式系统提供支持。Pandas 使用一个或至少几个核心。如此多的数据需要多个核心或 GPU来处理,否则即使适合 RAM,处理速度也会很慢。
这里有一些其他的解决方案,它们既多核又多机,现在任何 Pandas 用户都可以轻松使用。
Dask -扩展您拥有的 Python 工具- 具有与 Pandas 非常相似的 API ,并且可以在多台机器上运行。我总是必须启动自己的 Dask pid - 这可能会令人困惑 - 所以我建议使用像Coiled这样的平台即服务 (PaaS)进行实验,然后再在您自己的机器上进行设置。
PySpark 上的 Pandas API - 截至 2024 年已经比较成熟,但您的里程可能会有所不同(YMMV)。请参阅:快速入门。
Modin -Ray上的 Pandas 。我没有用过 Ray,但很多人用过并且喜欢它。
Dask CUDA - NVIDIARAPIDS的一部分,它可以在 DataFrame 计算上使用多个 GPU,包括复杂的算法和正则表达式之类的东西,这对于科学计算来说非常强大。
选项 3:如果您有大数据,请考虑使用处理大数据的领先框架:PySpark参见:API 参考(保持在选项卡中打开)。您可以混合搭配Spark SQL和 PySpark 的数据流 API,它非常强大。我写了一本关于如何做到这一点的书,名为Agile Data Science 2.0,代码仍然是最新的。
解决方案 5:
我知道这是一个老话题,但我认为Blaze库值得一试。它是为这些类型的情况而构建的。
来自文档:
Blaze 将 NumPy 和 Pandas 的可用性扩展到分布式和核外计算。Blaze 提供了类似于 NumPy ND-Array 或 Pandas DataFrame 的接口,但将这些熟悉的接口映射到各种其他计算引擎(如 Postgres 或 Spark)上。
编辑:顺便说一下,它得到了 ContinuumIO 和 NumPy 的作者 Travis Oliphant 的支持。
解决方案 6:
pymongo 就是这种情况。我还在 python 中使用 sql server、sqlite、HDF、ORM(SQLAlchemy)制作了原型。首先,pymongo 是一个基于文档的数据库,因此每个人都是一个文档(dict
属性文档)。许多人组成一个集合,您可以拥有许多集合(人员、股票市场、收入)。
pd.dateframe -> pymongo 注意:我使用chunksize
inread_csv
将其保存为 5 到 10k 条记录(如果记录较大,pymongo 会删除套接字)
aCollection.insert((a[1].to_dict() for a in df.iterrows()))
查询:gt = 大于...
pd.DataFrame(list(mongoCollection.find({'anAttribute':{'$gt':2887000, '$lt':2889000}})))
.find()
返回一个迭代器,所以我通常用它ichunked
来切分成更小的迭代器。
因为我通常会将 10 个数据源粘贴在一起,那么如何进行连接呢:
aJoinDF = pandas.DataFrame(list(mongoCollection.find({'anAttribute':{'$in':Att_Keys}})))
然后(就我而言,有时我必须先将aJoinDF
其合并,然后才能将其合并。)
df = pandas.merge(df, aJoinDF, on=aKey, how='left')
然后您可以通过下面的更新方法将新信息写入您的主集合。(逻辑集合与物理数据源)。
collection.update({primarykey:foo},{key:change})
对于较小的查找,只需进行非规范化。例如,文档中有代码,您只需添加字段代码文本并dict
在创建文档时进行查找即可。
现在,您有了一个基于人员的优质数据集,您可以根据每个案例发挥您的逻辑并创建更多属性。最后,您可以将 3 到最大内存关键指标读入 pandas 并进行数据透视/聚合/数据探索。这对我来说适用于 300 万条记录,其中包含数字/大文本/类别/代码/浮点数/...
您还可以使用 MongoDB 内置的两种方法(MapReduce 和聚合框架)。有关聚合框架的更多信息,请参见此处,因为它似乎比 MapReduce 更简单,并且对于快速聚合工作来说很方便。请注意,我不需要定义字段或关系,并且可以将项目添加到文档中。在 numpy、pandas、python 工具集快速变化的当前状态下,MongoDB 可以帮助我开始工作 :)
解决方案 7:
我发现,对于大数据用例来说,一个有用的技巧是通过将浮点精度降低到 32 位来减少数据量。它并不适用于所有情况,但在许多应用程序中,64 位精度是过度的,而 2 倍的内存节省是值得的。为了使一个显而易见的观点更加明显:
>>> df = pd.DataFrame(np.random.randn(int(1e8), 5))
>>> df.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000000 entries, 0 to 99999999
Data columns (total 5 columns):
...
dtypes: float64(5)
memory usage: 3.7 GB
>>> df.astype(np.float32).info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000000 entries, 0 to 99999999
Data columns (total 5 columns):
...
dtypes: float32(5)
memory usage: 1.9 GB
解决方案 8:
我发现这个问题有点晚,但我遇到了类似的问题(抵押贷款预付款模型)。我的解决方案是跳过 pandas HDFStore 层并使用直接 pytables。我将每列保存为最终文件中的单独 HDF5 数组。
我的基本工作流程是首先从数据库中获取一个 CSV 文件。我对其进行 gzip 压缩,这样它就不那么大了。然后我将其转换为面向行的 HDF5 文件,方法是在 python 中对其进行迭代,将每行转换为真实数据类型,然后将其写入 HDF5 文件。这需要几十分钟,但它不占用任何内存,因为它只是逐行操作。然后我将面向行的 HDF5 文件“转置”为面向列的 HDF5 文件。
表格转置如下:
def transpose_table(h_in, table_path, h_out, group_name="data", group_path="/"):
# Get a reference to the input data.
tb = h_in.getNode(table_path)
# Create the output group to hold the columns.
grp = h_out.createGroup(group_path, group_name, filters=tables.Filters(complevel=1))
for col_name in tb.colnames:
logger.debug("Processing %s", col_name)
# Get the data.
col_data = tb.col(col_name)
# Create the output array.
arr = h_out.createCArray(grp,
col_name,
tables.Atom.from_dtype(col_data.dtype),
col_data.shape)
# Store the data.
arr[:] = col_data
h_out.flush()
然后读回来看起来像:
def read_hdf5(hdf5_path, group_path="/data", columns=None):
"""Read a transposed data set from a HDF5 file."""
if isinstance(hdf5_path, tables.file.File):
hf = hdf5_path
else:
hf = tables.openFile(hdf5_path)
grp = hf.getNode(group_path)
if columns is None:
data = [(child.name, child[:]) for child in grp]
else:
data = [(child.name, child[:]) for child in grp if child.name in columns]
# Convert any float32 columns to float64 for processing.
for i in range(len(data)):
name, vec = data[i]
if vec.dtype == np.float32:
data[i] = (name, vec.astype(np.float64))
if not isinstance(hdf5_path, tables.file.File):
hf.close()
return pd.DataFrame.from_items(data)
现在,我通常在具有大量内存的机器上运行此程序,因此我可能对内存使用不够谨慎。例如,默认情况下,加载操作会读取整个数据集。
这对我来说通常有效,但它有点笨重,而且我不能使用花哨的 pytables 魔法。
编辑:与记录数组 pytables 默认方法相比,这种方法的真正优势在于,我可以使用 h5r 将数据加载到 R 中,而 h5r 无法处理表格。或者,至少,我无法让它加载异构表格。
解决方案 9:
正如其他人所指出的,几年后出现了一个“核心外”的 Pandas 等效产品:dask。虽然 dask 不是 Pandas 及其所有功能的替代品,但它因以下几个原因而脱颖而出:
Dask 是一个用于分析计算的灵活并行计算库,它针对“大数据”集合(如并行数组、数据框和列表)的交互式计算工作负载的动态任务调度进行了优化,将 NumPy、Pandas 或 Python 迭代器等常用接口扩展到大于内存或分布式环境,并从笔记本电脑扩展到集群。
Dask 强调以下美德:
熟悉:提供并行化的 NumPy 数组和 Pandas DataFrame 对象
灵活:提供任务调度接口,以实现更多自定义工作负载和与其他项目的集成。
本机:通过访问 PyData 堆栈,实现纯 Python 中的分布式计算。
快速:以低开销、低延迟和快速数值算法所需的最低限度的序列化方式运行
纵向扩展:在具有 1000 个核心的集群上弹性运行 横向扩展:在笔记本电脑上通过单个进程轻松设置和运行
响应性:设计时考虑到了交互式计算,它提供了快速反馈和诊断来帮助人类
并添加一个简单的代码示例:
import dask.dataframe as dd
df = dd.read_csv('2015-*-*.csv')
df.groupby(df.user_id).value.mean().compute()
替换一些这样的熊猫代码:
import pandas as pd
df = pd.read_csv('2015-01-01.csv')
df.groupby(df.user_id).value.mean()
尤其值得注意的是,通过concurrent.futures
界面提供了提交自定义任务的通用基础设施:
from dask.distributed import Client
client = Client('scheduler:port')
futures = []
for fn in filenames:
future = client.submit(load, fn)
futures.append(future)
summary = client.submit(summarize, futures)
summary.result()
解决方案 10:
这里也值得一提的是Ray
,
它是一个分布式计算框架,它以分布式方式为 pandas 提供了自己的实现。
只需替换 pandas 导入,代码就可以按原样工作:
# import pandas as pd
import ray.dataframe as pd
# use pd as usual
可以在这里阅读更多详细信息:
https://rise.cs.berkeley.edu/blog/pandas-on-ray/
更新:处理 pandas 分布的部分已被提取到modin项目中。
现在正确的使用方法是:
# import pandas as pd
import modin.pandas as pd
解决方案 11:
另一个变化
pandas 中的许多操作也可以通过数据库查询(sql、mongo)来完成
使用 RDBMS 或 mongodb 允许您在 DB 查询中执行某些聚合(针对大数据进行了优化,并有效地使用了缓存和索引)
之后,您可以使用 pandas 进行后期处理。
这种方法的优点是,您可以获得处理大数据的数据库优化,同时仍然以高级声明性语法定义逻辑 - 而不必处理决定在内存中做什么以及在核心外做什么的细节。
尽管查询语言和 pandas 不同,但将部分逻辑从一种语言转换到另一种语言通常并不复杂。
解决方案 12:
如果您选择创建分解为多个较小文件的数据管道的简单路径,请考虑Ruffus 。
解决方案 13:
我想指出 Vaex 包。
Vaex 是一个用于惰性 Out-of-Core DataFrames(类似于 Pandas)的 Python 库,用于可视化和探索大型表格数据集。它可以在 N 维网格上计算平均值、总和、计数、标准差等统计数据,每秒最多可计算十亿 (10 9 ) 个对象/行。可视化是使用直方图、密度图和 3D 体积渲染完成的,允许交互式探索大数据。Vaex 使用内存映射、零内存复制策略和惰性计算来获得最佳性能(不浪费内存)。
查看文档:https://vaex.readthedocs.io/en/latest/
该 API 与 pandas 的 API 非常接近。
解决方案 14:
我最近遇到了类似的问题。我发现简单地分块读取数据并将其附加到同一个 csv 中效果很好。我的问题是根据另一个表中的信息添加日期列,使用某些列的值如下。这可能会帮助那些对 dask 和 hdf5 感到困惑但更熟悉 pandas 的人,比如我。
def addDateColumn():
"""Adds time to the daily rainfall data. Reads the csv as chunks of 100k
rows at a time and outputs them, appending as needed, to a single csv.
Uses the column of the raster names to get the date.
"""
df = pd.read_csv(pathlist[1]+"CHIRPS_tanz.csv", iterator=True,
chunksize=100000) #read csv file as 100k chunks
'''Do some stuff'''
count = 1 #for indexing item in time list
for chunk in df: #for each 100k rows
newtime = [] #empty list to append repeating times for different rows
toiterate = chunk[chunk.columns[2]] #ID of raster nums to base time
while count <= toiterate.max():
for i in toiterate:
if i ==count:
newtime.append(newyears[count])
count+=1
print "Finished", str(chunknum), "chunks"
chunk["time"] = newtime #create new column in dataframe based on time
outname = "CHIRPS_tanz_time2.csv"
#append each output to same csv, using no header
chunk.to_csv(pathlist[2]+outname, mode='a', header=None, index=None)
解决方案 15:
parquet 文件格式非常适合您描述的用例。您可以使用以下命令高效地读取特定的列子集pd.read_parquet(path_to_file, columns=["foo", "bar"])
https://pandas.pydata.org/docs/reference/api/pandas.read_parquet.html
解决方案 16:
目前,我正在“像”你一样工作,只是规模较低,这就是为什么我没有对我的建议提出 PoC。
但是,我似乎发现使用 pickle 作为缓存系统并将各种功能的执行外包到文件中 - 从我的命令/主文件执行这些文件是成功的;例如,我使用 prepare_use.py 来转换对象类型,将数据集分成测试、验证和预测数据集。
使用 pickle 进行缓存是如何工作的?我使用字符串来访问动态创建的 pickle 文件,具体取决于传递了哪些参数和数据集(我尝试通过字符串来捕获并确定程序是否已运行,使用 .shape 表示数据集,使用 dict 表示传递的参数)。根据这些措施,我得到一个字符串来尝试查找和读取 .pickle 文件,如果找到,则可以跳过处理时间,以便跳转到我现在正在执行的执行。
使用数据库时,我也遇到了类似的问题,这也是为什么我乐于使用这个解决方案的原因,但是——肯定有很多限制——例如,由于冗余而存储大量的 pickle 集。可以通过适当的索引来更新转换前后的表——验证信息则需要翻开另一本书(我尝试合并爬取的租金数据,基本上在 2 小时后停止使用数据库——因为我希望在每次转换过程后都跳回来)
我希望我的意见能对你有所帮助。
问候。