如何使用 python + NumPy/SciPy 计算滚动/移动平均值?

2024-12-23 08:43:00
admin
原创
59
摘要:问题描述:numpy/scipy 上似乎没有简单计算移动平均值的函数,从而导致复杂的解决方案。我的问题有两个:使用 numpy 实现移动平均的最简单方法是什么(正确)?由于这看起来并不简单而且容易出错,那么有什么理由不将电池包含在这种情况下呢?解决方案 1:如果您只是想要一个简单的非加权移动平均值,您可以轻松...

问题描述:

numpy/scipy 上似乎没有简单计算移动平均值的函数,从而导致复杂的解决方案。

我的问题有两个:

  • 使用 numpy 实现移动平均的最简单方法是什么(正确)?

  • 由于这看起来并不简单而且容易出错,那么有什么理由不将电池包含在这种情况下呢?


解决方案 1:

如果您只是想要一个简单的非加权移动平均值,您可以轻松地使用它来实现np.cumsum,这可能 比基于 FFT 的方法更快

编辑纠正了 Bean 在代码中发现的一个错误索引。编辑

def moving_average(a, n=3):
    ret = np.cumsum(a, dtype=float)
    ret[n:] = ret[n:] - ret[:-n]
    return ret[n - 1:] / n
    
>>> a = np.arange(20)
>>> moving_average(a)
array([  1.,   2.,   3.,   4.,   5.,   6.,   7.,   8.,   9.,  10.,  11.,
        12.,  13.,  14.,  15.,  16.,  17.,  18.])
>>> moving_average(a, n=4)
array([  1.5,   2.5,   3.5,   4.5,   5.5,   6.5,   7.5,   8.5,   9.5,
        10.5,  11.5,  12.5,  13.5,  14.5,  15.5,  16.5,  17.5])

所以我猜答案是:它确实很容易实现,而且 numpy 也许已经因专门的功能而有点臃肿了。

解决方案 2:

实现此目的的一种简单方法是使用np.convolve。这背后的想法是利用离散卷积的计算方式并使用它来返回滚动平均值np.ones。这可以通过与长度等于我们想要的滑动窗口长度的序列进行卷积来实现。

为了做到这一点,我们可以定义以下函数:

def moving_average(x, w):
    return np.convolve(x, np.ones(w), 'valid') / w

此函数将对序列x和长度为 的 序列进行卷积w。请注意,选择,mode以便valid仅对序列完全重叠的点给出卷积乘积。


一些例子:

x = np.array([5,3,8,10,2,1,5,1,0,2])

对于具有长度窗口的移动平均数,2我们将得到:

moving_average(x, 2)
# array([4. , 5.5, 9. , 6. , 1.5, 3. , 3. , 0.5, 1. ])

对于长度为的窗口4

moving_average(x, 4)
# array([6.5 , 5.75, 5.25, 4.5 , 2.25, 1.75, 2.  ])

怎么convolve運作?

让我们更深入地了解离散卷积的计算方式。以下函数旨在复制np.convolve计算输出值的方式:

def mov_avg(x, w):
    for m in range(len(x)-(w-1)):
        yield sum(np.ones(w) * x[m:m+w]) / w 

对于上面同样的例子,结果如下:

list(mov_avg(x, 2))
# [4.0, 5.5, 9.0, 6.0, 1.5, 3.0, 3.0, 0.5, 1.0]

因此,每一步所做的就是取 1 数组和当前窗口之间的内积。在这种情况下,乘以np.ones(w)是多余的,因为我们直接取sum序列的 。

下面是计算第一个输出的示例,以便更清楚一些。假设我们想要一个窗口w=4

[1,1,1,1]
[5,3,8,10,2,1,5,1,0,2]
= (1*5 + 1*3 + 1*8 + 1*10) / w = 6.5

并且以下输出将被计算为:

  [1,1,1,1]
[5,3,8,10,2,1,5,1,0,2]
= (1*3 + 1*8 + 1*10 + 1*2) / w = 5.75

依此类推,完成所有重叠后,返回序列的移动平均值。

解决方案 3:

NumPy 缺乏特定领域的特定功能,这可能是由于核心团队的纪律性和对 NumPy 首要指令的忠诚:提供 N 维数组类型,以及用于创建和索引这些数组的函数。与许多基础目标一样,这个目标并不小,而 NumPy 做得非常出色。

更大的SciPy包含更大的领域特定库集合(SciPy 开发人员称之为子包) - 例如数值优化(优化)、信号处理(信号)和积分微积分(积分)。

我猜测您想要的函数至少在 SciPy 子包中的一个中(可能是scipy.signal );不过,我会首先在SciPy scikits集合中查找,确定相关的 scikit,然后在那里寻找感兴趣的函数。

Scikits是基于 NumPy/SciPy 独立开发的软件包,针对特定的技术领域(例如scikits-imagescikits-learn等)。其中几个(尤其是用于数值优化的超棒OpenOpt )早在选择归入相对较新的scikits类别之前就已经是备受推崇的成熟项目。Scikits主页上列出了大约 30 个这样的scikits,但其中至少有几个不再处于积极开发中。

遵循这个建议会让你找到scikits-timeseries;但是,该包不再处于积极开发中;实际上,据我所知, Pandas已经成为事实上的基于 NumPy的时间序列库。

Pandas有几个函数可用于计算移动平均值;其中最简单的可能是rolling_mean,其使用方式如下:

>>> # the recommended syntax to import pandas
>>> import pandas as PD
>>> import numpy as NP

>>> # prepare some fake data:
>>> # the date-time indices:
>>> t = PD.date_range('1/1/2010', '12/31/2012', freq='D')

>>> # the data:
>>> x = NP.arange(0, t.shape[0])

>>> # combine the data & index into a Pandas 'Series' object
>>> D = PD.Series(x, t)

现在,只需调用函数rolling_mean并传入 Series 对象和窗口大小,在下面的示例中为10 天

>>> d_mva = PD.rolling_mean(D, 10)

>>> # d_mva is the same size as the original Series
>>> d_mva.shape
    (1096,)

>>> # though obviously the first w values are NaN where w is the window size
>>> d_mva[:3]
    2010-01-01         NaN
    2010-01-02         NaN
    2010-01-03         NaN

验证它是否有效——例如,将原始系列中的值 10 - 15 与使用滚动平均值平滑的新系列进行比较

>>> D[10:15]
     2010-01-11    2.041076
     2010-01-12    2.041076
     2010-01-13    2.720585
     2010-01-14    2.720585
     2010-01-15    3.656987
     Freq: D

>>> d_mva[10:20]
      2010-01-11    3.131125
      2010-01-12    3.035232
      2010-01-13    2.923144
      2010-01-14    2.811055
      2010-01-15    2.785824
      Freq: D

在 Pandas 文档中,rolling_mean 函数和大约十几个其他函数被非正式地归类在移动窗口函数这一标题下;Pandas 中的第二组相关函数被称为指数加权函数(例如,ewma,它计算指数移动加权平均值)。第二组函数未包含在第一组(移动窗口函数)中,可能是因为指数加权变换不依赖于固定长度的窗口

解决方案 4:

这里列出了各种方法,以及一些基准。最好的方法是使用其他库的优化代码的版本。这种bottleneck.move_mean方法可能是最好的。这种scipy.convolve方法也非常快、可扩展,语法和概念上都很简单,但对于非常大的窗口值来说,扩展性不好。numpy.cumsum如果你需要一种纯粹的方法,这种方法就很好numpy

注意:其中一些(例如bottleneck.move_mean)不居中,并且会使您的数据发生偏移。

import numpy as np
import scipy as sci
import scipy.signal as sig
import pandas as pd
import bottleneck as bn
import time as time

def rollavg_direct(a,n): 
    'Direct "for" loop'
    assert n%2==1
    b = a*0.0
    for i in range(len(a)) :
        b[i]=a[max(i-n//2,0):min(i+n//2+1,len(a))].mean()
    return b

def rollavg_comprehension(a,n):
    'List comprehension'
    assert n%2==1
    r,N = int(n/2),len(a)
    return np.array([a[max(i-r,0):min(i+r+1,N)].mean() for i in range(N)]) 

def rollavg_convolve(a,n):
    'scipy.convolve'
    assert n%2==1
    return sci.convolve(a,np.ones(n,dtype='float')/n, 'same')[n//2:-n//2+1]  

def rollavg_convolve_edges(a,n):
    'scipy.convolve, edge handling'
    assert n%2==1
    return sci.convolve(a,np.ones(n,dtype='float'), 'same')/sci.convolve(np.ones(len(a)),np.ones(n), 'same')  

def rollavg_cumsum(a,n):
    'numpy.cumsum'
    assert n%2==1
    cumsum_vec = np.cumsum(np.insert(a, 0, 0)) 
    return (cumsum_vec[n:] - cumsum_vec[:-n]) / n

def rollavg_cumsum_edges(a,n):
    'numpy.cumsum, edge handling'
    assert n%2==1
    N = len(a)
    cumsum_vec = np.cumsum(np.insert(np.pad(a,(n-1,n-1),'constant'), 0, 0)) 
    d = np.hstack((np.arange(n//2+1,n),np.ones(N-n)*n,np.arange(n,n//2,-1)))  
    return (cumsum_vec[n+n//2:-n//2+1] - cumsum_vec[n//2:-n-n//2]) / d

def rollavg_roll(a,n):
    'Numpy array rolling'
    assert n%2==1
    N = len(a)
    rolling_idx = np.mod((N-1)*np.arange(n)[:,None] + np.arange(N), N)
    return a[rolling_idx].mean(axis=0)[n-1:] 

def rollavg_roll_edges(a,n):
    # see https://stackoverflow.com/questions/42101082/fast-numpy-roll
    'Numpy array rolling, edge handling'
    assert n%2==1
    a = np.pad(a,(0,n-1-n//2), 'constant')*np.ones(n)[:,None]
    m = a.shape[1]
    idx = np.mod((m-1)*np.arange(n)[:,None] + np.arange(m), m) # Rolling index
    out = a[np.arange(-n//2,n//2)[:,None], idx]
    d = np.hstack((np.arange(1,n),np.ones(m-2*n+1+n//2)*n,np.arange(n,n//2,-1)))
    return (out.sum(axis=0)/d)[n//2:]

def rollavg_pandas(a,n):
    'Pandas rolling average'
    return pd.DataFrame(a).rolling(n, center=True, min_periods=1).mean().to_numpy()

def rollavg_bottlneck(a,n):
    'bottleneck.move_mean'
    return bn.move_mean(a, window=n, min_count=1)

N = 10**6
a = np.random.rand(N)
functions = [rollavg_direct, rollavg_comprehension, rollavg_convolve, 
        rollavg_convolve_edges, rollavg_cumsum, rollavg_cumsum_edges, 
        rollavg_pandas, rollavg_bottlneck, rollavg_roll, rollavg_roll_edges]

print('Small window (n=3)')
%load_ext memory_profiler
for f in functions : 
    print('
'+f.__doc__+ ' : ')
    %timeit b=f(a,3)

print('
Large window (n=1001)')
for f in functions[0:-2] : 
    print('
'+f.__doc__+ ' : ')
    %timeit b=f(a,1001)

print('
Memory
')
print('Small window (n=3)')
N = 10**7
a = np.random.rand(N)
%load_ext memory_profiler
for f in functions[2:] : 
    print('
'+f.__doc__+ ' : ')
    %memit b=f(a,3)

print('
Large window (n=1001)')
for f in functions[2:-2] : 
    print('
'+f.__doc__+ ' : ')
    %memit b=f(a,1001)

定时,小窗口(n=3)

Direct "for" loop : 

4.14 s ± 23.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

List comprehension : 
3.96 s ± 27.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

scipy.convolve : 
1.07 ms ± 26.7 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

scipy.convolve, edge handling : 
4.68 ms ± 9.69 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

numpy.cumsum : 
5.31 ms ± 5.11 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

numpy.cumsum, edge handling : 
8.52 ms ± 11.1 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

Pandas rolling average : 
9.85 ms ± 9.63 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

bottleneck.move_mean : 
1.3 ms ± 12.2 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

Numpy array rolling : 
31.3 ms ± 91.9 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

Numpy array rolling, edge handling : 
61.1 ms ± 55.9 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

时间,大窗口 (n=1001)

Direct "for" loop : 
4.67 s ± 34 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

List comprehension : 
4.46 s ± 14.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

scipy.convolve : 
103 ms ± 165 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

scipy.convolve, edge handling : 
272 ms ± 1.23 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

numpy.cumsum : 
5.19 ms ± 12.4 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

numpy.cumsum, edge handling : 
8.7 ms ± 11.5 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

Pandas rolling average : 
9.67 ms ± 199 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

bottleneck.move_mean : 
1.31 ms ± 15.7 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

记忆,小窗口 (n=3)

The memory_profiler extension is already loaded. To reload it, use:
  %reload_ext memory_profiler

scipy.convolve : 
peak memory: 362.66 MiB, increment: 73.61 MiB

scipy.convolve, edge handling : 
peak memory: 510.24 MiB, increment: 221.19 MiB

numpy.cumsum : 
peak memory: 441.81 MiB, increment: 152.76 MiB

numpy.cumsum, edge handling : 
peak memory: 518.14 MiB, increment: 228.84 MiB

Pandas rolling average : 
peak memory: 449.34 MiB, increment: 160.02 MiB

bottleneck.move_mean : 
peak memory: 374.17 MiB, increment: 75.54 MiB

Numpy array rolling : 
peak memory: 661.29 MiB, increment: 362.65 MiB

Numpy array rolling, edge handling : 
peak memory: 1111.25 MiB, increment: 812.61 MiB

记忆,大窗口 (n=1001)

scipy.convolve : 
peak memory: 370.62 MiB, increment: 71.83 MiB

scipy.convolve, edge handling : 
peak memory: 521.98 MiB, increment: 223.18 MiB

numpy.cumsum : 
peak memory: 451.32 MiB, increment: 152.52 MiB

numpy.cumsum, edge handling : 
peak memory: 527.51 MiB, increment: 228.71 MiB

Pandas rolling average : 
peak memory: 451.25 MiB, increment: 152.50 MiB

bottleneck.move_mean : 
peak memory: 374.64 MiB, increment: 75.85 MiB

解决方案 5:

从 开始Numpy 1.20sliding_window_view提供了一种在元素窗口中滑动/滚动的方法。然后您可以分别对各个窗口进行平均。

例如,对于4-element 窗口:

from numpy.lib.stride_tricks import sliding_window_view

# values = np.array([5, 3, 8, 10, 2, 1, 5, 1, 0, 2])
np.average(sliding_window_view(values, window_shape = 4), axis=1)
# array([6.5, 5.75, 5.25, 4.5, 2.25, 1.75, 2])

注意的中间结果sliding_window_view

# values = np.array([5, 3, 8, 10, 2, 1, 5, 1, 0, 2])
sliding_window_view(values, window_shape = 4)
# array([[ 5,  3,  8, 10],
#        [ 3,  8, 10,  2],
#        [ 8, 10,  2,  1],
#        [10,  2,  1,  5],
#        [ 2,  1,  5,  1],
#        [ 1,  5,  1,  0],
#        [ 5,  1,  0,  2]])

解决方案 6:

这个使用 Pandas 的答案是从上面改编而来的,因为rolling_mean它不再是 Pandas 的一部分

# the recommended syntax to import pandas
import pandas as pd
import numpy as np

# prepare some fake data:
# the date-time indices:
t = pd.date_range('1/1/2010', '12/31/2012', freq='D')

# the data:
x = np.arange(0, t.shape[0])

# combine the data & index into a Pandas 'Series' object
D = pd.Series(x, t)

现在,只需在具有窗口大小的数据框上调用该函数rolling,在下面的示例中为 10 天。

d_mva10 = D.rolling(10).mean()

# d_mva is the same size as the original Series
# though obviously the first w values are NaN where w is the window size
d_mva10[:11]

2010-01-01    NaN
2010-01-02    NaN
2010-01-03    NaN
2010-01-04    NaN
2010-01-05    NaN
2010-01-06    NaN
2010-01-07    NaN
2010-01-08    NaN
2010-01-09    NaN
2010-01-10    4.5
2010-01-11    5.5
Freq: D, dtype: float64

解决方案 7:

我觉得使用瓶颈可以轻松解决这个问题

请参阅下面的基本示例:

import numpy as np
import bottleneck as bn

a = np.random.randint(4, 1000, size=(5, 7))
mm = bn.move_mean(a, window=2, min_count=1)

这给出了沿每个轴移动的平均值。

  • “mm” 是“a”的移动平均值。

  • “窗口”是考虑移动平均值的最大条目数。

  • “min_count” 是考虑移动平均值的最小条目数(例如,对于第一个元素或数组具有 nan 值)。

好的部分是 Bottleneck 有助于处理 nan 值并且它也非常高效。

解决方案 8:

如果有人需要一个简单的解决方案,这里有一个

def moving_average(a,n):
    N=len(a)
    return np.array([np.mean(a[i:i+n]) for i in np.arange(0,N-n+1)])

您可以通过添加步骤参数来更改窗口之间的重叠np.arange(0,N-n+1,step)

解决方案 9:

您还可以编写自己的Python C 扩展。

这当然不是最简单的方法,但与用作np.cumsum构建块相比,它可以让你运行得更快,并且内存效率更高。

// moving_average.c
#define NPY_NO_DEPRECATED_API NPY_1_7_API_VERSION
#include <Python.h>
#include <numpy/arrayobject.h>

static PyObject *moving_average(PyObject *self, PyObject *args) {
    PyObject *input;
    int64_t window_size;
    PyArg_ParseTuple(args, "Ol", &input, &window_size);
    if (PyErr_Occurred()) return NULL;
    if (!PyArray_Check(input) || !PyArray_ISNUMBER((PyArrayObject *)input)) {
        PyErr_SetString(PyExc_TypeError, "First argument must be a numpy array with numeric dtype");
        return NULL;
    }
    
    int64_t input_size = PyObject_Size(input);
    double *input_data;
    if (PyArray_AsCArray(&input, &input_data, (npy_intp[]){ [0] = input_size }, 1, PyArray_DescrFromType(NPY_DOUBLE)) != 0) {
        PyErr_SetString(PyExc_TypeError, "Failed to simulate C array of type double");
        return NULL;
    }
    
    int64_t output_size = input_size - window_size + 1;
    PyObject *output = PyArray_SimpleNew(1, (npy_intp[]){ [0] = output_size }, NPY_DOUBLE);
    double *output_data = PyArray_DATA((PyArrayObject *)output);
    
    double cumsum_before = 0;
    double cumsum_after = 0;
    for (int i = 0; i < window_size; ++i) {
        cumsum_after += input_data[i];
    }
    for (int i = 0; i < output_size - 1; ++i) {
        output_data[i] = (cumsum_after - cumsum_before) / window_size;
        cumsum_after += input_data[i + window_size];
        cumsum_before += input_data[i];
    }
    output_data[output_size - 1] = (cumsum_after - cumsum_before) / window_size;

    return output;
}

static PyMethodDef methods[] = {
    {
        "moving_average", 
        moving_average, 
        METH_VARARGS, 
        "Rolling mean of numpy array with specified window size"
    },
    {NULL, NULL, 0, NULL}
};

static struct PyModuleDef moduledef = {
    PyModuleDef_HEAD_INIT,
    "moving_average",
    "C extension for finding the rolling mean of a numpy array",
    -1,
    methods
};

PyMODINIT_FUNC PyInit_moving_average(void) {
    PyObject *module = PyModule_Create(&moduledef);
    import_array();
    return module;
}
  • METH_VARARGS指定该方法仅接受位置参数。PyArg_ParseTuple允许您解析这些位置参数。

  • 通过使用PyErr_SetString并从方法中返回 NULL,您可以从 C 扩展向 Python 解释器发出信号,表明发生了异常。

  • PyArray_AsCArray允许您的方法在输入数组 dtype、对齐、数组是否为C 连续(请参阅“numpy 1d 数组可以不连续吗?”)等方面具有多态性,而无需创建数组的副本。如果您改用PyArray_DATA,则需要自己处理这个问题。

  • PyArray_SimpleNew允许您创建一个新的 numpy 数组。这类似于使用np.empty。该数组将不会被初始化,并且可能包含非确定性垃圾,如果您忘记覆盖它,这可能会让您感到惊讶。

构建 C 扩展

# setup.py
from setuptools import setup, Extension
import numpy

setup(
  ext_modules=[
    Extension(
      'moving_average',
      ['moving_average.c'],
      include_dirs=[numpy.get_include()]
    )
  ]
)

# python setup.py build_ext --build-lib=.

基准

import numpy as np

# Our compiled C extension:
from moving_average import moving_average as moving_average_c

# Answer by Jaime using npcumsum
def moving_average_cumsum(a, n) :
    ret = np.cumsum(a, dtype=float)
    ret[n:] = ret[n:] - ret[:-n]
    return ret[n - 1:] / n

# Answer by yatu using np.convolve
def moving_average_convolve(a, n):
    return np.convolve(a, np.ones(n), 'valid') / n

a = np.random.rand(1_000_000)
print('window_size = 3')
%timeit moving_average_c(a, 3)
%timeit moving_average_cumsum(a, 3)
%timeit moving_average_convolve(a, 3)

print('
window_size = 100')
%timeit moving_average_c(a, 100)
%timeit moving_average_cumsum(a, 100)
%timeit moving_average_convolve(a, 100)
window_size = 3
958 µs ± 4.68 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
4.52 ms ± 15.4 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
809 µs ± 463 ns per loop (mean ± std. dev. of 7 runs, 1,000 loops each)

window_size = 100
977 µs ± 937 ns per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
6.16 ms ± 19.1 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
14.2 ms ± 12.4 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

解决方案 10:

如果您想仔细注意边缘条件(仅根据边缘处的可用元素计算平均值),以下函数就可以解决问题。

import numpy as np

def running_mean(x, N):
    out = np.zeros_like(x, dtype=np.float64)
    dim_len = x.shape[0]
    for i in range(dim_len):
        if N%2 == 0:
            a, b = i - (N-1)//2, i + (N-1)//2 + 2
        else:
            a, b = i - (N-1)//2, i + (N-1)//2 + 1

        #cap indices to min and max indices
        a = max(0, a)
        b = min(dim_len, b)
        out[i] = np.mean(x[a:b])
    return out

>>> running_mean(np.array([1,2,3,4]), 2)
array([1.5, 2.5, 3.5, 4. ])

>>> running_mean(np.array([1,2,3,4]), 3)
array([1.5, 2. , 3. , 3.5])

解决方案 11:

所有答案似乎都集中在预先计算列表的情况上。对于实际运行的用例,数字一个接一个地出现,这里有一个简单的类,它提供对最后 N 个值求平均值的服务:

import numpy as np
class RunningAverage():
    def __init__(self, stack_size):
        self.stack = [0 for _ in range(stack_size)]
        self.ptr = 0
        self.full_cycle = False
    def add(self,value):
        self.stack[self.ptr] = value
        self.ptr += 1
        if self.ptr == len(self.stack):
            self.full_cycle = True
            self.ptr = 0
    def get_avg(self):
        if self.full_cycle:
            return np.mean(self.stack)
        else:
            return np.mean(self.stack[:self.ptr])

用法:

N = 50  # size of the averaging window
run_avg = RunningAverage(N)
for i in range(1000):
    value = <my computation>
    run_avg.add(value)
    if i % 20 ==0: # print once in 20 iters:
        print(f'the average value is {run_avg.get_avg()}')

解决方案 12:

另一个非常简单(可能是最简单的)的选项是使用该scipy.signal.savgol_filter方法,因为移动平均值只是一个具有 0 阶多项式的 Savitzky-Golay 滤波器。因此,如果您有一个数组x并想使用长度为 5 的窗口对其进行平滑,则可以使用:

from scipy.signal import savgol_filter

y = savgol_filter(x, window_length=5, polyorder=0)

解决方案 13:

我扩展了argentum2f和Vittorio Carmignani的答案。以下是 numba 实现,它击败了迄今为止最知名的瓶颈方法:

import numba
import numpy as np
import time as time
import bottleneck as bn

@numba.njit(fastmath=True)
def rolling_moving_average(arr: np.ndarray, n: int = 2):
    if n <= 0:
        raise ValueError("n must be greater than 0")
    if n > len(arr):
        raise ValueError("n must be less than or equal to the length of the array")

    result = np.empty(len(arr) - n + 1, dtype=arr.dtype)
    sum_window = np.sum(arr[:n])

    mult=1/n

    result[0] = sum_window*mult

    for i in range(1, len(arr) - n + 1):
        sum_window += arr[i + n - 1] - arr[i - 1]
        result[i] = sum_window *mult

    return result

def rollavg_bottlneck(a,n):
    'bottleneck.move_mean'
    return bn.move_mean(a, window=n, min_count=n)

N = 10**6
a = np.random.rand(N)
functions = [rollavg_bottlneck, rolling_moving_average]

for f in functions:
    test_val=f(a,3) # potential prewarm for jit-compiled funcs

for window_size in (3,1001):
    print(f'Window size={window_size}')
    for f in functions:
        print('
'+str(f.__name__)+ ': ')
        %timeit b=f(a,window_size)


print(f"{rollavg_bottlneck(np.arange(10),3)} vs {rolling_moving_average(np.arange(10),3)}")

窗口大小=3

rollavg_bottlneck:每次循环 6.75 毫秒 ± 27.3 微秒(7 次运行的平均值 ± 标准差,每次 100 次循环)

rolling_moving_average:每次循环 5.22 毫秒 ± 53.8 微秒(7 次运行的平均值 ± 标准差,每次 100 次循环)

窗口大小=1001

rollavg_bottlneck:每次循环 6.43 毫秒 ± 42.5 微秒(7 次运行的平均值 ± 标准差,每次 100 次循环)

rolling_moving_average:每次循环 5.25 毫秒 ± 60.9 微秒(7 次运行的平均值 ± 标准差,每次 100 次循环)

[楠楠1.2.3.4.5.6.7.8.] vs [1 2 3 4 5 6 7 8]

请注意,输出在对齐方面略有不同。

解决方案 14:

talib包含一个简单的移动平均工具,以及其他类似的平均工具(即指数移动平均)。下面将该方法与其他一些解决方案进行比较。


%timeit pd.Series(np.arange(100000)).rolling(3).mean()
2.53 ms ± 40.5 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

%timeit talib.SMA(real = np.arange(100000.), timeperiod = 3)
348 µs ± 3.5 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

%timeit moving_average(np.arange(100000))
638 µs ± 45.1 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

需要注意的是,实数必须包含元素dtype = float。否则会引发以下错误

例外:real 不是 double

解决方案 15:

我要么使用已接受答案的解决方案(略作修改以使输出与输入的长度相同),要么使用pandas另一个答案的评论中提到的版本。我在这里总结了两者,并提供了一个可重现的示例以供将来参考:

import numpy as np
import pandas as pd

def moving_average(a, n):
    ret = np.cumsum(a, dtype=float)
    ret[n:] = ret[n:] - ret[:-n]
    return ret / n

def moving_average_centered(a, n):
    return pd.Series(a).rolling(window=n, center=True).mean().to_numpy()

A = [0, 0, 1, 2, 4, 5, 4]
print(moving_average(A, 3))    
# [0.         0.         0.33333333 1.         2.33333333 3.66666667 4.33333333]
print(moving_average_centered(A, 3))
# [nan        0.33333333 1.         2.33333333 3.66666667 4.33333333 nan       ]

解决方案 16:

通过将下面的解决方案与使用 numpy 的 cumsum 的解决方案进行比较,这个解决方案几乎只花费了一半的时间。这是因为它不需要遍历整个数组来执行 cumsum 然后执行所有减法。此外,如果数组很大并且数字很大(可能溢出), cumsum可能会很“危险” 。当然,这里也存在危险,但至少只将必要的数字相加。

def moving_average(array_numbers, n):
    if n > len(array_numbers):
      return []
    temp_sum = sum(array_numbers[:n])
    averages = [temp_sum / float(n)]
    for first_index, item in enumerate(array_numbers[n:]):
        temp_sum += item - array_numbers[first_index]
        averages.append(temp_sum / float(n))
    return averages

解决方案 17:

移动平均线

迭代器方法

  • 在 i 处反转数组,然后简单取从 i 到 n 的平均值。

  • 使用列表推导动态生成迷你数组。

x = np.random.randint(10, size=20)

def moving_average(arr, n):
    return [ (arr[:i+1][::-1][:n]).mean() for i, ele in enumerate(arr) ]
d = 5

moving_average(x, d)

张量卷积

moving_average = np.convolve(x, np.ones(d)/d, mode='valid')

解决方案 18:

我实际上想要一种与接受的答案略有不同的行为。我正在为管道构建移动平均特征提取器sklearn,因此我要求移动平均的输出具有与输入相同的维度。我想要的是移动平均假设序列保持不变,即窗口为 2 的移动平均[1,2,3,4,5]将给出[1.5,2.5,3.5,4.5,5.0]

对于列向量(我的用例),我们得到

def moving_average_col(X, n):
  z2 = np.cumsum(np.pad(X, ((n,0),(0,0)), 'constant', constant_values=0), axis=0)
  z1 = np.cumsum(np.pad(X, ((0,n),(0,0)), 'constant', constant_values=X[-1]), axis=0)
  return (z1-z2)[(n-1):-1]/n

对于数组

def moving_average_array(X, n):
  z2 = np.cumsum(np.pad(X, (n,0), 'constant', constant_values=0))
  z1 = np.cumsum(np.pad(X, (0,n), 'constant', constant_values=X[-1]))
  return (z1-z2)[(n-1):-1]/n

当然,我们不必假设填充值为常数,但在大多数情况下这样做就足够了。

解决方案 19:

for i in range(len(Data)):
    Data[i, 1] = Data[i-lookback:i, 0].sum() / lookback

尝试一下这段代码。我认为它更简单,而且能完成工作。回顾是移动平均线的窗口。

Data[i-lookback:i, 0].sum()我已经引用的0数据集中的第一列,但如果您有多个列,您可以放置​​任何您喜欢的列。

解决方案 20:

这是使用 numba 的快速实现(注意类型)。请注意,它确实包含移位的 nan。

import numpy as np
import numba as nb

@nb.jit(nb.float64[:](nb.float64[:],nb.int64),
        fastmath=True,nopython=True)
def moving_average( array, window ):    
    ret = np.cumsum(array)
    ret[window:] = ret[window:] - ret[:-window]
    ma = ret[window - 1:] / window
    n = np.empty(window-1); n.fill(np.nan)
    return np.concatenate((n.ravel(), ma.ravel())) 

解决方案 21:

如果您已经有一个已知大小的数组

import numpy as np                                         
M=np.arange(12)
                                                               
avg=[]                                                         
i=0
while i<len(M)-2: #for n point average len(M) - (n-1)
        avg.append((M[i]+M[i+1]+M[i+2])/3) #n is denominator                       
        i+=1     
                                                                                                    
print(avg)
相关推荐
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   984  
  在项目管理领域,CDCP(Certified Data Center Professional)认证评审是一个至关重要的环节,它不仅验证了项目团队的专业能力,还直接关系到项目的成功与否。在这一评审过程中,沟通技巧的运用至关重要。有效的沟通不仅能够确保信息的准确传递,还能增强团队协作,提升评审效率。本文将深入探讨CDCP...
华为IPD流程   0  
  IPD(Integrated Product Development,集成产品开发)是一种以客户需求为核心、跨部门协同的产品开发模式,旨在通过高效的资源整合和流程优化,提升产品开发的成功率和市场竞争力。在IPD培训课程中,掌握关键成功因素是确保团队能够有效实施这一模式的核心。以下将从五个关键成功因素展开讨论,帮助企业和...
IPD项目流程图   0  
  华为IPD(Integrated Product Development,集成产品开发)流程是华为公司在其全球化进程中逐步构建和完善的一套高效产品开发管理体系。这一流程不仅帮助华为在技术创新和产品交付上实现了质的飞跃,还为其在全球市场中赢得了显著的竞争优势。IPD的核心在于通过跨部门协作、阶段性评审和市场需求驱动,确保...
华为IPD   0  
  华为作为全球领先的通信技术解决方案提供商,其成功的背后离不开一套成熟的管理体系——集成产品开发(IPD)。IPD不仅是一种产品开发流程,更是一种系统化的管理思想,它通过跨职能团队的协作、阶段评审机制和市场需求驱动的开发模式,帮助华为在全球市场中脱颖而出。从最初的国内市场到如今的全球化布局,华为的IPD体系在多个领域展现...
IPD管理流程   0  
热门文章
项目管理软件有哪些?
云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用