在 numpy 数组上映射函数的最有效方法
- 2024-11-28 08:37:00
- admin 原创
- 7
问题描述:
将函数映射到 numpy 数组上的最有效方法是什么?我目前正在做:
import numpy as np
x = np.array([1, 2, 3, 4, 5])
# Obtain array of square of each element in x
squarer = lambda t: t ** 2
squares = np.array([squarer(xi) for xi in x])
但是,这可能非常低效,因为我使用列表推导将新数组构造为 Python 列表,然后再将其转换回 numpy 数组。我们可以做得更好吗?
解决方案 1:
我已经测试了所有建议的方法以及np.array(list(map(f, x)))
(perfplot
我的一个小项目)。
消息 #1:如果您可以使用 numpy 的本机函数,请这样做。
如果您尝试矢量化的函数已经矢量化(如x**2
原始帖子中的示例),则使用它比其他任何方法都要快得多(请注意对数刻度):
如果您确实需要矢量化,那么使用哪种变体并不重要。
重现情节的代码:
import numpy as np
import perfplot
import math
def f(x):
# return math.sqrt(x)
return np.sqrt(x)
vf = np.vectorize(f)
def array_for(x):
return np.array([f(xi) for xi in x])
def array_map(x):
return np.array(list(map(f, x)))
def fromiter(x):
return np.fromiter((f(xi) for xi in x), x.dtype)
def vectorize(x):
return np.vectorize(f)(x)
def vectorize_without_init(x):
return vf(x)
b = perfplot.bench(
setup=np.random.rand,
n_range=[2 ** k for k in range(20)],
kernels=[
f,
array_for,
array_map,
fromiter,
vectorize,
vectorize_without_init,
],
xlabel="len(x)",
)
b.save("out1.svg")
b.show()
解决方案 2:
使用numpy.vectorize
:
import numpy as np
x = np.array([1, 2, 3, 4, 5])
squarer = lambda t: t ** 2
vfunc = np.vectorize(squarer)
vfunc(x)
# Output: array([ 1, 4, 9, 16, 25])
解决方案 3:
总结
正如@user2357112所指出的,应用函数的“直接”方法始终是在 Numpy 数组上映射函数的最快和最简单的方法:
import numpy as np
x = np.array([1, 2, 3, 4, 5])
f = lambda x: x ** 2
squares = f(x)
一般情况下应避免使用np.vectorize
,因为它的性能不佳,并且存在(或曾经存在)许多问题。如果您正在处理其他数据类型,则可能需要研究下面显示的其他方法。
方法比较
以下是一些简单的测试,用于比较三种映射函数的方法,此示例使用 Python 3.6 和 NumPy 1.15.4。首先,设置测试函数:
import timeit
import numpy as np
f = lambda x: x ** 2
vf = np.vectorize(f)
def test_array(x, n):
t = timeit.timeit(
'np.array([f(xi) for xi in x])',
'from __main__ import np, x, f', number=n)
print('array: {0:.3f}'.format(t))
def test_fromiter(x, n):
t = timeit.timeit(
'np.fromiter((f(xi) for xi in x), x.dtype, count=len(x))',
'from __main__ import np, x, f', number=n)
print('fromiter: {0:.3f}'.format(t))
def test_direct(x, n):
t = timeit.timeit(
'f(x)',
'from __main__ import x, f', number=n)
print('direct: {0:.3f}'.format(t))
def test_vectorized(x, n):
t = timeit.timeit(
'vf(x)',
'from __main__ import x, vf', number=n)
print('vectorized: {0:.3f}'.format(t))
使用五个元素进行测试(从最快到最慢排序):
x = np.array([1, 2, 3, 4, 5])
n = 100000
test_direct(x, n) # 0.265
test_fromiter(x, n) # 0.479
test_array(x, n) # 0.865
test_vectorized(x, n) # 2.906
拥有数百个元素:
x = np.arange(100)
n = 10000
test_direct(x, n) # 0.030
test_array(x, n) # 0.501
test_vectorized(x, n) # 0.670
test_fromiter(x, n) # 0.883
并且数组元素有 1000 个或更多:
x = np.arange(1000)
n = 1000
test_direct(x, n) # 0.007
test_fromiter(x, n) # 0.479
test_array(x, n) # 0.516
test_vectorized(x, n) # 0.945
不同版本的 Python/NumPy 和编译器优化会有不同的结果,因此请针对您的环境做类似的测试。
解决方案 4:
周围有numexpr,numba和cython,这个答案的目标是考虑到这些可能性。
但首先让我们说明一个显而易见的事实:无论你如何将 Python 函数映射到 numpy 数组,它仍然是一个 Python 函数,这意味着对于每次评估:
numpy 数组元素必须转换为 Python 对象(例如
Float
)。所有计算都是用 Python 对象完成的,这意味着有解释器、动态调度和不可变对象的开销。
因此,由于上述开销,使用哪种机制来实际循环遍历数组并不会起到很大的作用 - 它比使用 numpy 的内置功能慢得多。
我们来看看下面的例子:
# numpy-functionality
def f(x):
return x+2*x*x+4*x*x*x
# python-function as ufunc
import numpy as np
vf=np.vectorize(f)
vf.__name__="vf"
np.vectorize
被选为纯 Python 函数类方法的代表。使用perfplot
(参见本答案附录中的代码)我们得到以下运行时间:
我们可以看到,numpy 方法比纯 Python 版本快 10 到 100 倍。数组大小越大,性能下降的原因可能是数据不再适合缓存。
值得一提的是,这vectorize
也占用了大量的内存,因此内存使用往往是瓶颈(参见相关的SO 问题)。另请注意,numpy 的文档指出np.vectorize
它“主要是为了方便,而不是为了性能”。
当需要提高性能时,应该使用其他工具,除了从头开始编写 C 扩展之外,还有以下可能性:
人们经常听说,numpy 的性能已经达到了极致,因为它的底层是纯 C 语言。然而,还有很大的改进空间!
矢量化的 numpy 版本使用了大量额外的内存和内存访问。Numexp-library 尝试平铺 numpy 数组,从而获得更好的缓存利用率:
# less cache misses than numpy-functionality
import numexpr as ne
def ne_f(x):
return ne.evaluate("x+2*x*x+4*x*x*x")
得出以下比较:
我无法解释上图中的所有内容:我们可以在开始时看到 numexpr-library 的开销更大,但是因为它更好地利用了缓存,所以对于更大的数组来说它的速度大约快 10 倍!
另一种方法是 jit-compile 该函数,从而获得真正的纯 C UFunc。这是 numba 的方法:
# runtime generated C-function as ufunc
import numba as nb
@nb.vectorize(target="cpu")
def nb_vf(x):
return x+2*x*x+4*x*x*x
它比原始的 numpy 方法快 10 倍:
然而,这个任务很难并行化,因此我们也可以使用prange
来并行计算循环:
@nb.njit(parallel=True)
def nb_par_jitf(x):
y=np.empty(x.shape)
for i in nb.prange(len(x)):
y[i]=x[i]+2*x[i]*x[i]+4*x[i]*x[i]*x[i]
return y
正如预期的那样,对于较小的输入,并行函数速度较慢,但对于较大的输入,并行函数速度更快(几乎是原来的 2 倍):
虽然 numba 专注于优化使用 numpy 数组的操作,但 Cython 是一种更通用的工具。要获得与 numba 相同的性能更为复杂 - 通常取决于 llvm(numba)与本地编译器(gcc/MSVC):
%%cython -c=/openmp -a
import numpy as np
import cython
#single core:
@cython.boundscheck(False)
@cython.wraparound(False)
def cy_f(double[::1] x):
y_out=np.empty(len(x))
cdef Py_ssize_t i
cdef double[::1] y=y_out
for i in range(len(x)):
y[i] = x[i]+2*x[i]*x[i]+4*x[i]*x[i]*x[i]
return y_out
#parallel:
from cython.parallel import prange
@cython.boundscheck(False)
@cython.wraparound(False)
def cy_par_f(double[::1] x):
y_out=np.empty(len(x))
cdef double[::1] y=y_out
cdef Py_ssize_t i
cdef Py_ssize_t n = len(x)
for i in prange(n, nogil=True):
y[i] = x[i]+2*x[i]*x[i]+4*x[i]*x[i]*x[i]
return y_out
Cython 导致函数速度稍微慢一些:
结论
显然,只测试一个函数并不能证明什么。另外,还应该记住,对于所选的函数示例,内存带宽是大于 10^5 个元素的瓶颈 - 因此我们在这个区域对 numba、numexpr 和 cython 的性能相同。
最后,最终答案取决于函数类型、硬件、Python 分布和其他因素。例如,Anaconda-distribution 使用 Intel 的 VML 作为 numpy 函数,因此对于诸如、和类似的超越函数,其性能轻松胜过 numba(除非它使用 SVML,请参阅此SO -post),例如参见以下SO-post。exp
`sin`cos
然而,从这次调查和我迄今为止的经验来看,我想说,只要不涉及超越函数,numba 似乎是最简单、性能最好的工具。
使用perfplot包绘制运行时间:
import perfplot
perfplot.show(
setup=lambda n: np.random.rand(n),
n_range=[2**k for k in range(0,24)],
kernels=[
f,
vf,
ne_f,
nb_vf, nb_par_jitf,
cy_f, cy_par_f,
],
logx=True,
logy=True,
xlabel='len(x)'
)
解决方案 5:
squares = squarer(x)
数组上的算术运算会自动按元素应用,并具有高效的 C 级循环,可避免 Python 级循环或理解所产生的所有解释器开销。
您想要逐元素应用于 NumPy 数组的大多数函数都可以正常工作,但有些可能需要更改。例如,if
无法逐元素工作。您需要将它们转换为使用如下结构numpy.where
:
def using_if(x):
if x < 5:
return x
else:
return x**2
变成
def using_where(x):
return numpy.where(x < 5, x, x**2)
解决方案 6:
ufunc
似乎没有人提到过 numpy 包中内置的工厂方法: np.frompyfunc
,我已经对其进行了测试np.vectorize
,并且比它的性能高出约 20~30%。当然,它的性能不会像规定的 C 代码或甚至(我没有测试过)那样好numba
,但它可以成为比np.vectorize
f = lambda x, y: x * y
f_arr = np.frompyfunc(f, 2, 1)
vf = np.vectorize(f)
arr = np.linspace(0, 1, 10000)
%timeit f_arr(arr, arr) # 307ms
%timeit vf(arr, arr) # 450ms
我也测试了更大的样本,改进是成比例的。另请参阅此处的文档
解决方案 7:
编辑: 原始答案具有误导性, np.sqrt
直接应用于数组,仅产生很小的开销。
在多维情况下,如果你想应用对一维数组进行操作的内置函数,numpy.apply_along_axis是一个不错的选择,也适用于来自 numpy 和 scipy 的更复杂的函数组合。
先前的误导性陈述:
添加方法:
def along_axis(x):
return np.apply_along_axis(f, 0, x)
perfplot 代码给出的性能结果接近np.sqrt
。
解决方案 8:
我相信在较新版本(我使用 1.13)的 numpy 中,您可以通过将 numpy 数组传递给为标量类型编写的函数来简单地调用该函数,它会自动将函数调用应用于 numpy 数组上的每个元素并返回另一个 numpy 数组
>>> import numpy as np
>>> squarer = lambda t: t ** 2
>>> x = np.array([1, 2, 3, 4, 5])
>>> squarer(x)
array([ 1, 4, 9, 16, 25])
解决方案 9:
正如这篇文章中提到的,只需使用如下生成器表达式:
numpy.fromiter((<some_func>(x) for x in <something>),<dtype>,<size of something>)
解决方案 10:
以上所有答案都比较好,但如果您需要使用自定义函数进行映射,并且您有numpy.ndarray
,那么您需要保留数组的形状。
我只比较了两个,但它将保留形状ndarray
。我使用了包含 100 万个条目的数组进行比较。在这里我使用 square 函数,它也是 numpy 内置的,并且性能大大提升,因为如果需要,您可以使用您选择的函数。
import numpy, time
def timeit():
y = numpy.arange(1000000)
now = time.time()
numpy.array([x * x for x in y.reshape(-1)]).reshape(y.shape)
print(time.time() - now)
now = time.time()
numpy.fromiter((x * x for x in y.reshape(-1)), y.dtype).reshape(y.shape)
print(time.time() - now)
now = time.time()
numpy.square(y)
print(time.time() - now)
输出
>>> timeit()
1.162431240081787 # list comprehension and then building numpy array
1.0775556564331055 # from numpy.fromiter
0.002948284149169922 # using inbuilt function
在这里您可以清楚地看到,numpy.fromiter
考虑到简单的方法,效果很好,如果有内置函数可用,请使用它。
解决方案 11:
使用numpy.fromfunction(function, shape, **kwargs)
请参阅“ https://docs.scipy.org/doc/numpy/reference/generated/numpy.fromfunction.html ”
解决方案 12:
一种还不是很常见但易于实现且快速的方法是 Zig / Ziglang
pip install ziglang
创建文件 ->zinptest.zig
export fn npprod(inarray: usize, outarray: usize, lenarray: usize) void {
const inarraydata: [*]u64 = @ptrFromInt(inarray);
var outarraydata: [*]u64 = @ptrFromInt(outarray);
for (0..lenarray) |i| {
outarraydata[i] = inarraydata[i] * inarraydata[i];
}
}
export fn npprod2(inarray: usize, outarray: usize, lenarray: usize) void {
const inarraydata: [*]u64 = @ptrFromInt(inarray);
var outarraydata: [*]u64 = @ptrFromInt(outarray);
for (0..lenarray) |i| {
outarraydata[i] = inarraydata[i] + 2 * inarraydata[i] * inarraydata[i] + 4 * inarraydata[i] * inarraydata[i] * inarraydata[i];
}
}
编写包装器代码
import subprocess
import os
import sys
import subprocess
import ctypes
import numpy as np
# pip install ziglang
def compile_dll():
subprocess.run(
[
sys.executable,
"-m",
"ziglang",
"build-lib",
"zinptest.zig",
"-dynamic",
"-O",
"ReleaseFast",
],
shell=True,
env=os.environ,
cwd=this_folder,
)
def zigproduct(a):
out = np.empty_like(a)
inaddress = a.ctypes._arr.__array_interface__["data"][0] # raw memory address, take care! (data needs to be aligned correcty)
outaddress = out.ctypes._arr.__array_interface__["data"][0]
lena = np.prod(a.shape)
zigprod(inaddress, outaddress, lena)
return out
def zigproduct2(a):
out = np.empty_like(a)
inaddress = a.ctypes._arr.__array_interface__["data"][0]
outaddress = out.ctypes._arr.__array_interface__["data"][0]
lena = np.prod(a.shape)
zigprod2(inaddress, outaddress, lena)
return out
def f(x):
return x + 2 * x * x + 4 * x * x * x
this_folder = os.path.dirname(__file__)
zigdll = os.path.normpath(os.path.join(this_folder, "zinptest.dll"))
if not os.path.exists(zigdll):
compile_dll()
# first zig function (**2)
zigdllloaded = ctypes.cdll.LoadLibrary(zigdll)
zigprod = zigdllloaded.npprod
zigprod.argtypes = [ctypes.c_uint64, ctypes.c_uint64, ctypes.c_uint64]
zigprod.restype = None
# second zig function (x + 2 * x * x + 4 * x * x * x)
zigprod2 = zigdllloaded.npprod2
zigprod2.argtypes = [ctypes.c_uint64, ctypes.c_uint64, ctypes.c_uint64]
zigprod2.restype = None
a = np.arange(100000000, dtype=np.uint64)
对结果感到满意
# Np built-in fuctions are highly optimized, not much to improve here
# In [3]: %timeit a**2
# 213 ms ± 1.85 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
# In [4]: %timeit zigproduct(a)
# 215 ms ± 1.01 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
# Now using the function from @ead return x+2*x*x+4*x*x*x
# In [1]: %timeit zigproduct2(a)
# 206 ms ± 606 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
# In [2]: %timeit f(a)
# 1.66 s ± 17.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# zigproduct2 is 8 times faster, even faster than zigproduct (God knows why?!?)
# Results are the same
# In[3]: np.all(zigproduct2(a) == f(a))
# Out[3]: True
结论
毫无疑问,这是一种具有美好未来的语言,如果您执行需要更少内存写入访问的操作(np.where / np.argwhere),Zig 的速度会更快!
更多优势
Zig 编译器还可以编译 C 代码,而无需更改上面看到的命令:只需创建一个名为“zinptest.c”的文件并将其用作替代品(npprod2 为 216 毫秒)在我看来,直接使用 C 代码比解密 Numba 奇怪的错误消息更容易。如果您需要处理尺寸,请使用 strides 或 a.shape 属性和 / 和 % 操作
void npprod(unsigned long long inarray, unsigned long long outarray,
unsigned long long lenarray) {
unsigned long long *inarraydata = (unsigned long long *)inarray;
unsigned long long *outarraydata = (unsigned long long *)outarray;
for (int i = 0; i < lenarray; i++) {
outarraydata[i] = inarraydata[i] * inarraydata[i];
}
}
void npprod2(unsigned long long inarray, unsigned long long outarray,
unsigned long long lenarray) {
unsigned long long *inarraydata = (unsigned long long *)inarray;
unsigned long long *outarraydata = (unsigned long long *)outarray;
for (int i = 0; i < lenarray; i++) {
outarraydata[i] = inarraydata[i] + 2 * inarraydata[i] * inarraydata[i] +
4 * inarraydata[i] * inarraydata[i] * inarraydata[i];
}
}
有趣的是,最明显的解决方案——如果你想要拥有 C 速度,就使用 C——在过去 8 年里一直没有被提及。C 不是巫术,大多数时候,尤其是在使用 NumPy 数组(100% C 数组)时,它是最简单、最快的解决方案。而且使用 Zig 编译 C 从未如此简单。
- 2024年20款好用的项目管理软件推荐,项目管理提效的20个工具和技巧
- 2024年开源项目管理软件有哪些?推荐5款好用的项目管理工具
- 项目管理软件有哪些?推荐7款超好用的项目管理工具
- 项目管理软件哪个最好用?盘点推荐5款好用的项目管理工具
- 项目管理软件有哪些最好用?推荐6款好用的项目管理工具
- 项目管理软件有哪些,盘点推荐国内外超好用的7款项目管理工具
- 2024年常用的项目管理软件有哪些?推荐这10款国内外好用的项目管理工具
- 2024项目管理软件排行榜(10类常用的项目管理工具全推荐)
- 项目管理软件排行榜:2024年项目经理必备5款开源项目管理软件汇总
- 项目管理必备:盘点2024年13款好用的项目管理软件