移动平均数

2024-12-19 09:24:00
admin
原创
133
摘要:问题描述:是否有一个 SciPy 函数或 NumPy 函数或 Python 模块可以根据特定窗口计算一维数组的运行平均值?解决方案 1:注意:更有效的解决方案可能包括scipy.ndimage.uniform_filter1d(参见这个答案),或者使用包括talib在内的较新的库talib.MA。使用np.c...

问题描述:

是否有一个 SciPy 函数或 NumPy 函数或 Python 模块可以根据特定窗口计算一维数组的运行平均值?


解决方案 1:

注意:更有效的解决方案可能包括scipy.ndimage.uniform_filter1d(参见这个答案),或者使用包括talib在内的较新的库talib.MA


使用np.convolve

np.convolve(x, np.ones(N)/N, mode='valid')

解释

移动平均值是卷积数学运算的一个示例。对于移动平均值,您可以沿输入滑动一个窗口并计算窗口内容的平均值。对于离散的一维信号,卷积是相同的,只不过您计算的不是平均值,而是任意的线性组合,即,将每个元素乘以相应的系数,然后将结果相加。这些系数(窗口中的每个位置一个)有时称为卷积。N 个值的算术平均值为(x_1 + x_2 + ... + x_N) / N,因此相应的内核为(1/N, 1/N, ..., 1/N),这正是我们使用得到的np.ones(N)/N

参数mode指定np.convolve如何处理边缘。我valid在这里选择了模式,因为我认为这是大多数人期望运行平均值的工作方式,但您可能有其他优先事项。以下图表说明了模式之间的差异:

import numpy as np
import matplotlib.pyplot as plt
modes = ['full', 'same', 'valid']
for m in modes:
    plt.plot(np.convolve(np.ones(200), np.ones(50)/50, mode=m));
plt.axis([-10, 251, -.1, 1.1]);
plt.legend(modes, loc='lower center');
plt.show()

运行平均卷积模式

解决方案 2:

高效的解决方案

卷积比直接方法好得多,但(我猜)它使用 FFT,因此速度很慢。然而,特别是对于计算运行平均值,以下方法效果很好

def running_mean(x, N):
    cumsum = numpy.cumsum(numpy.insert(x, 0, 0)) 
    return (cumsum[N:] - cumsum[:-N]) / float(N)

要检查的代码

In[3]: x = numpy.random.random(100000)
In[4]: N = 1000
In[5]: %timeit result1 = numpy.convolve(x, numpy.ones((N,))/N, mode='valid')
10 loops, best of 3: 41.4 ms per loop
In[6]: %timeit result2 = running_mean(x, N)
1000 loops, best of 3: 1.04 ms per loop

注意,两种方法是等效numpy.allclose(result1, result2)True。N越大,时间差异越大。

警告:尽管 cumsum 速度更快,但浮点错误会增加,这可能会导致结果无效/不正确/不可接受

评论指出了这里的浮点错误问题,但我在答案中使其更加明显。

# demonstrate loss of precision with only 100,000 points
np.random.seed(42)
x = np.random.randn(100000)+1e6
y1 = running_mean_convolve(x, 10)
y2 = running_mean_cumsum(x, 10)
assert np.allclose(y1, y2, rtol=1e-12, atol=0)
  • 累积的点越多,浮点误差就越大(因此 1e5 点是显而易见的,1e6 点更重要,超过 1e6 点,你可能需要重置累加器)

  • 你可以通过使用作弊np.longdouble,但你的浮点错误仍然会变得显著,对于相对较大的点数(大约> 1e5,但取决于你的数据)

  • 你可以绘制误差图并看到它增长得相对较快

  • 卷积解决方案速度较慢,但​​没有浮点精度损失

  • uniform_filter1d 解决方案比此 cumsum 解决方案更快,并且不存在浮点精度损失

解决方案 3:

您可以使用scipy.ndimage.uniform_filter1d:

import numpy as np
from scipy.ndimage import uniform_filter1d
N = 1000
x = np.random.random(100000)
y = uniform_filter1d(x, size=N)

uniform_filter1d

  • 给出具有相同 numpy 形状(即点数)的输出

  • 允许多种方式处理边框,这'reflect'是默认的,但就我而言,我宁愿'nearest'

它也相当快(比上面给出的 cumsum 方法np.convolve快近 50 倍,快 2-5 倍):

%timeit y1 = np.convolve(x, np.ones((N,))/N, mode='same')
100 loops, best of 3: 9.28 ms per loop

%timeit y2 = uniform_filter1d(x, size=N)
10000 loops, best of 3: 191 µs per loop

这里有 3 个函数可以让您比较不同实现的错误/速度:

from __future__ import division
import numpy as np
import scipy.ndimage as ndi
def running_mean_convolve(x, N):
    return np.convolve(x, np.ones(N) / float(N), 'valid')
def running_mean_cumsum(x, N):
    cumsum = np.cumsum(np.insert(x, 0, 0))
    return (cumsum[N:] - cumsum[:-N]) / float(N)
def running_mean_uniform_filter1d(x, N):
    return ndi.uniform_filter1d(x, N, mode='constant', origin=-(N//2))[:-(N-1)]

解决方案 4:

更新:下面的示例显示了pandas.rolling_mean在最新版本的 pandas 中已被删除的旧函数。该函数调用的现代等效函数将使用 pandas.Series.rolling:

In [8]: pd.Series(x).rolling(window=N).mean().iloc[N-1:].values
Out[8]: 
array([ 0.49815397,  0.49844183,  0.49840518, ...,  0.49488191,
        0.49456679,  0.49427121])

pandas比 NumPy 或 SciPy 更适合这个。它的函数rolling_mean可以方便地完成这项工作。当输入是数组时,它还会返回一个 NumPy 数组。

任何自定义的纯 Python 实现在性能上都很难被超越rolling_mean。以下是针对两个建议解决方案的示例性能:

In [1]: import numpy as np

In [2]: import pandas as pd

In [3]: def running_mean(x, N):
   ...:     cumsum = np.cumsum(np.insert(x, 0, 0)) 
   ...:     return (cumsum[N:] - cumsum[:-N]) / N
   ...:

In [4]: x = np.random.random(100000)

In [5]: N = 1000

In [6]: %timeit np.convolve(x, np.ones((N,))/N, mode='valid')
10 loops, best of 3: 172 ms per loop

In [7]: %timeit running_mean(x, N)
100 loops, best of 3: 6.72 ms per loop

In [8]: %timeit pd.rolling_mean(x, N)[N-1:]
100 loops, best of 3: 4.74 ms per loop

In [9]: np.allclose(pd.rolling_mean(x, N)[N-1:], running_mean(x, N))
Out[9]: True

关于如何处理边缘值也有一些不错的选择。

解决方案 5:

您可以使用以下方法计算运行平均值:

import numpy as np

def runningMean(x, N):
    y = np.zeros((len(x),))
    for ctr in range(len(x)):
         y[ctr] = np.sum(x[ctr:(ctr+N)])
    return y/N

但是它很慢。

幸运的是,numpy 包含一个卷积函数,我们可以使用它来加快速度。运行平均值相当于x与一个长向量进行卷积N,所有成员都等于1/N。卷积的 numpy 实现包括起始瞬态,因此您必须删除前 N-1 个点:

def runningMeanFast(x, N):
    return np.convolve(x, np.ones((N,))/N)[(N-1):]

在我的计算机上,快速版本的速度快 20-30 倍,具体取决于输入向量的长度和平均窗口的大小。

请注意,卷积确实包含一种'same'模式,它似乎应该解决启动瞬态问题,但它将其分割在开始和结束之间。

解决方案 6:

对于一个简短、快速的解决方案,可以在一个循环中完成整个操作,没有任何依赖关系,下面的代码效果很好。

mylist = [1, 2, 3, 4, 5, 6, 7]
N = 3
cumsum, moving_aves = [0], []

for i, x in enumerate(mylist, 1):
    cumsum.append(cumsum[i-1] + x)
    if i>=N:
        moving_ave = (cumsum[i] - cumsum[i-N])/N
        #can do stuff with moving_ave here
        moving_aves.append(moving_ave)

解决方案 7:

或用于计算的 Python 模块

在 Tradewave.net 的测试中,TA-lib 总是获胜:

import talib as ta
import numpy as np
import pandas as pd
import scipy
from scipy import signal
import time as t

PAIR = info.primary_pair
PERIOD = 30

def initialize():
    storage.reset()
    storage.elapsed = storage.get('elapsed', [0,0,0,0,0,0])

def cumsum_sma(array, period):
    ret = np.cumsum(array, dtype=float)
    ret[period:] = ret[period:] - ret[:-period]
    return ret[period - 1:] / period

def pandas_sma(array, period):
    return pd.rolling_mean(array, period)

def api_sma(array, period):
    # this method is native to Tradewave and does NOT return an array
    return (data[PAIR].ma(PERIOD))

def talib_sma(array, period):
    return ta.MA(array, period)

def convolve_sma(array, period):
    return np.convolve(array, np.ones((period,))/period, mode='valid')

def fftconvolve_sma(array, period):    
    return scipy.signal.fftconvolve(
        array, np.ones((period,))/period, mode='valid')    

def tick():

    close = data[PAIR].warmup_period('close')

    t1 = t.time()
    sma_api = api_sma(close, PERIOD)
    t2 = t.time()
    sma_cumsum = cumsum_sma(close, PERIOD)
    t3 = t.time()
    sma_pandas = pandas_sma(close, PERIOD)
    t4 = t.time()
    sma_talib = talib_sma(close, PERIOD)
    t5 = t.time()
    sma_convolve = convolve_sma(close, PERIOD)
    t6 = t.time()
    sma_fftconvolve = fftconvolve_sma(close, PERIOD)
    t7 = t.time()

    storage.elapsed[-1] = storage.elapsed[-1] + t2-t1
    storage.elapsed[-2] = storage.elapsed[-2] + t3-t2
    storage.elapsed[-3] = storage.elapsed[-3] + t4-t3
    storage.elapsed[-4] = storage.elapsed[-4] + t5-t4
    storage.elapsed[-5] = storage.elapsed[-5] + t6-t5    
    storage.elapsed[-6] = storage.elapsed[-6] + t7-t6        

    plot('sma_api', sma_api)  
    plot('sma_cumsum', sma_cumsum[-5])
    plot('sma_pandas', sma_pandas[-10])
    plot('sma_talib', sma_talib[-15])
    plot('sma_convolve', sma_convolve[-20])    
    plot('sma_fftconvolve', sma_fftconvolve[-25])

def stop():

    log('ticks....: %s' % info.max_ticks)

    log('api......: %.5f' % storage.elapsed[-1])
    log('cumsum...: %.5f' % storage.elapsed[-2])
    log('pandas...: %.5f' % storage.elapsed[-3])
    log('talib....: %.5f' % storage.elapsed[-4])
    log('convolve.: %.5f' % storage.elapsed[-5])    
    log('fft......: %.5f' % storage.elapsed[-6])

结果:

[2015-01-31 23:00:00] ticks....: 744
[2015-01-31 23:00:00] api......: 0.16445
[2015-01-31 23:00:00] cumsum...: 0.03189
[2015-01-31 23:00:00] pandas...: 0.03677
[2015-01-31 23:00:00] talib....: 0.00700  # <<< Winner!
[2015-01-31 23:00:00] convolve.: 0.04871
[2015-01-31 23:00:00] fft......: 0.22306

在此处输入图片描述

解决方案 8:

有关现成的解决方案,请参阅https://scipy-cookbook.readthedocs.io/items/SignalSmooth.html。它提供具有窗口类型的运行平均值flat。请注意,这比简单的自制卷积方法更复杂一些,因为它尝试通过反射来处理数据开头和结尾的问题(这在您的情况下可能有效也可能无效……)。

首先,您可以尝试:

a = np.random.random(100)
plt.plot(a)
b = smooth(a, window='flat')
plt.plot(b)

解决方案 9:

我知道这是一个老问题,但这里有一个不使用任何额外数据结构或库的解决方案。它与输入列表的元素数量成线性关系,我想不出任何其他方法可以使其更高效(实际上,如果有人知道更好的分配结果的方法,请告诉我)。

注意:使用 numpy 数组而不是列表会更快,但我想消除所有依赖项。也可以通过多线程执行来提高性能

该函数假定输入列表是一维的,所以要小心。

### Running mean/Moving average
def running_mean(l, N):
    sum = 0
    result = list( 0 for x in l)

    for i in range( 0, N ):
        sum = sum + l[i]
        result[i] = sum / (i+1)

    for i in range( N, len(l) ):
        sum = sum - l[i-N] + l[i]
        result[i] = sum / N

    return result

例子

假设我们有一个列表data = [ 1, 2, 3, 4, 5, 6 ],我们想要计算周期为 3 的滚动平均值,并且您还想要一个与输入列表大小相同的输出列表(这是最常见的情况)。

第一个元素的索引为 0,因此应根据索引为 -2、-1 和 0 的元素计算滚动平均值。显然,我们没有数据 [-2] 和数据 [-1](除非您想使用特殊边界条件),因此我们假设这些元素为 0。这相当于对列表进行零填充,只不过我们实际上并不填充它,而只是跟踪需要填充的索引(从 0 到 N-1)。

因此,对于前 N 个元素,我们只需在累加器中不断将这些元素相加。

result[0] = (0 + 0 + 1) / 3  = 0.333    ==   (sum + 1) / 3
result[1] = (0 + 1 + 2) / 3  = 1        ==   (sum + 2) / 3
result[2] = (1 + 2 + 3) / 3  = 2        ==   (sum + 3) / 3

从元素 N+1 开始,简单的累积不起作用。result[3] = (2 + 3 + 4)/3 = 3但这与我们预期的不同(sum + 4)/3 = 3.333

计算正确值的方法是从中减去data[0] = 1sum+4从而得到 sum + 4 - 1 = 9

发生这种情况是因为目前sum = data[0] + data[1] + data[2],但对于每个来说也是如此i >= N,因为在减法之前sumdata[i-N] + ... + data[i-2] + data[i-1]

解决方案 10:

我觉得可以使用瓶颈来优雅地解决这个问题

请参阅下面的基本示例:

import numpy as np
import bottleneck as bn

a = np.random.randint(4, 1000, size=100)
mm = bn.move_mean(a, window=5, min_count=1)
  • “mm” 是“a”的移动平均值。

  • “窗口”是考虑移动平均值的最大条目数。

  • “min_count” 是考虑移动平均值的最小条目数(例如,对于前几个元素或数组具有 nan 值)。

好的部分是 Bottleneck 有助于处理 nan 值并且它也非常高效。

解决方案 11:

我还没有检查过它的速度有多快,但你可以尝试:

from collections import deque

cache = deque() # keep track of seen values
n = 10          # window size
A = xrange(100) # some dummy iterable
cum_sum = 0     # initialize cumulative sum

for t, val in enumerate(A, 1):
    cache.append(val)
    cum_sum += val
    if t < n:
        avg = cum_sum / float(t)
    else:                           # if window is saturated,
        cum_sum -= cache.popleft()  # subtract oldest value
        avg = cum_sum / float(n)

解决方案 12:

与 numpy 或 scipy 相比,我建议使用 pandas 来更快地完成此操作:

df['data'].rolling(3).mean()

这将取“数据”列 3 个周期的移动平均值 (MA)。您还可以计算移位版本,例如,可以轻松计算排除当前单元格(向后移位一个)的版本:

df['data'].shift(periods=1).rolling(3).mean()

解决方案 13:

Python标准库解决方案

此生成器函数接受一个可迭代对象和一个窗口大小N ,并得出窗口内当前值的平均值。它使用deque,这是一种类似于列表的数据结构,但针对两端的pop快速修改( ,append)进行了优化。

from collections import deque
from itertools import islice

def sliding_avg(iterable, N):        
    it = iter(iterable)
    window = deque(islice(it, N))        
    num_vals = len(window)

    if num_vals < N:
        msg = 'window size {} exceeds total number of values {}'
        raise ValueError(msg.format(N, num_vals))

    N = float(N) # force floating point division if using Python 2
    s = sum(window)
    
    while True:
        yield s/N
        try:
            nxt = next(it)
        except StopIteration:
            break
        s = s - window.popleft() + nxt
        window.append(nxt)
        

该函数的实际运行情况如下:

>>> values = range(100)
>>> N = 5
>>> window_avg = sliding_avg(values, N)
>>> 
>>> next(window_avg) # (0 + 1 + 2 + 3 + 4)/5
>>> 2.0
>>> next(window_avg) # (1 + 2 + 3 + 4 + 5)/5
>>> 3.0
>>> next(window_avg) # (2 + 3 + 4 + 5 + 6)/5
>>> 4.0

解决方案 14:

虽然有点晚了,但我已经创建了自己的函数,它不会用零来包围端点或填充,然后也会用来求平均值。更棒的是,它还会在线性间隔的点处重新采样信号。随意自定义代码以获得其他功能。

该方法是与归一化高斯核的简单矩阵乘法。

def running_mean(y_in, x_in, N_out=101, sigma=1):
    '''
    Returns running mean as a Bell-curve weighted average at evenly spaced
    points. Does NOT wrap signal around, or pad with zeros.
    
    Arguments:
    y_in -- y values, the values to be smoothed and re-sampled
    x_in -- x values for array
    
    Keyword arguments:
    N_out -- NoOf elements in resampled array.
    sigma -- 'Width' of Bell-curve in units of param x .
    '''
    import numpy as np
    N_in = len(y_in)

    # Gaussian kernel
    x_out = np.linspace(np.min(x_in), np.max(x_in), N_out)
    x_in_mesh, x_out_mesh = np.meshgrid(x_in, x_out)
    gauss_kernel = np.exp(-np.square(x_in_mesh - x_out_mesh) / (2 * sigma**2))
    # Normalize kernel, such that the sum is one along axis 1
    normalization = np.tile(np.reshape(np.sum(gauss_kernel, axis=1), (N_out, 1)), (1, N_in))
    gauss_kernel_normalized = gauss_kernel / normalization
    # Perform running average as a linear operation
    y_out = gauss_kernel_normalized @ y_in

    return y_out, x_out

对添加了正态分布噪声的正弦信号进行简单用法:
在此处输入图片描述

解决方案 15:

另一种使用numpy或寻找移动平均线的方法pandas

import itertools
sample = [2, 6, 10, 8, 11, 10]
list(itertools.starmap(
    lambda a,b: b/a, 
    enumerate(itertools.accumulate(sample), 1))
)

将打印[2.0, 4.0, 6.0, 6.5, 7.4, 7.833333333333333]

  • 2.0 = (2)/1

  • 4.0 = (2 + 6) / 2

  • 6.0 = (2 + 6 + 10) / 3

  • ...

解决方案 16:

上面有很多关于计算移动平均值的答案。我的答案增加了两个额外的功能:

  1. 忽略 nan 值

  2. 计算 N 个相邻值的平均值(不包括感兴趣的值本身)

第二个特征对于确定哪些值与总体趋势有一定差异特别有用。

我使用 numpy.cumsum,因为它是最省时的方法(参见上面 Alleo 的回答)。

N=10 # number of points to test on each side of point of interest, best if even
padded_x = np.insert(np.insert( np.insert(x, len(x), np.empty(int(N/2))*np.nan), 0, np.empty(int(N/2))*np.nan ),0,0)
n_nan = np.cumsum(np.isnan(padded_x))
cumsum = np.nancumsum(padded_x) 
window_sum = cumsum[N+1:] - cumsum[:-(N+1)] - x # subtract value of interest from sum of all values within window
window_n_nan = n_nan[N+1:] - n_nan[:-(N+1)] - np.isnan(x)
window_n_values = (N - window_n_nan)
movavg = (window_sum) / (window_n_values)

此代码仅适用于偶数 N。可以通过更改 padded_x 和 n_nan 的 np.insert 来调整奇数。

示例输出(原始为黑色,movavg 为蓝色):
原始数据(黑色)和每个值周围 10 个点的移动平均值(蓝色),不包括该值。nan 值将被忽略。

该代码可以轻松调整,以删除根据少于截止值 = 3 个非 nan 值计算出的所有移动平均值。

window_n_values = (N - window_n_nan).astype(float) # dtype must be float to set some values to nan
cutoff = 3
window_n_values[window_n_values<cutoff] = np.nan
movavg = (window_sum) / (window_n_values)

原始数据(黑色)和移动平均值(蓝色),同时忽略任何少于 3 个非 nan 值的窗口

解决方案 17:

上面的一个答案中埋藏着mab的评论,其中有这种方法。 这是一个简单的移动平均线:bottleneck`move_mean`

import numpy as np
import bottleneck as bn

a = np.arange(10) + np.random.random(10)

mva = bn.move_mean(a, window=2, min_count=1)

min_count是一个方便的参数,它基本上会取数组中该点之前的移动平均值。如果您不设置min_count,它将等于window,并且直到window点的所有内容都将为nan

解决方案 18:

使用@Aikude 的变量,我写了一行代码。

import numpy as np

mylist = [1, 2, 3, 4, 5, 6, 7]
N = 3

mean = [np.mean(mylist[x:x+N]) for x in range(len(mylist)-N+1)]
print(mean)

>>> [2.0, 3.0, 4.0, 5.0, 6.0]

解决方案 19:

所有上述解决方案都很糟糕,因为它们缺乏

  • 由于使用原生 Python 而非 Numpy 矢量化实现,因此速度更快,

  • 由于使用不当而导致的数值稳定性numpy.cumsum,或

  • 由于O(len(x) * w)卷积的实现而提高了速度。

鉴于

import numpy
m = 10000
x = numpy.random.rand(m)
w = 1000

注意x_[:w].sum()等于x[:w-1].sum()。因此,对于第一个平均值,numpy.cumsum(...)添加x[w] / w(通过x_[w+1] / w),并减去0(从x_[0] / w)。这导致x[0:w].mean()

x[w+1] / w通过 cumsum,您将通过另外添加和减去来更新第二个平均值x[0] / w,从而得到x[1:w+1].mean()

如此持续,直至x[-w:].mean()达到。

x_ = numpy.insert(x, 0, 0)
sliding_average = x_[:w].sum() / w + numpy.cumsum(x_[w:] - x_[:-w]) / w

该解决方案是矢量化的O(m)、可读的和数值稳定的。

解决方案 20:

这个问题现在比 NeXuS 上个月写的时候还要老valid,但我喜欢他的代码处理边缘情况的方式。然而,因为它是一个“简单移动平均数”,所以它的结果落后于它们所应用的数据。我认为,以比 NumPy 的模式、same和更令人满意的方式处理边缘情况full可以通过将类似的方法应用于基于方法来实现convolution()

我的贡献使用中心移动平均值来使其结果与数据对齐。当可用的点数太少而无法使用全尺寸窗口时,移动平均值是从数组边缘的连续较小的窗口计算出来的。[实际上,是从连续较大的窗口计算出来的,但这是一个实现细节。]

import numpy as np

def running_mean(l, N):
    # Also works for the(strictly invalid) cases when N is even.
    if (N//2)*2 == N:
        N = N - 1
    front = np.zeros(N//2)
    back = np.zeros(N//2)

    for i in range(1, (N//2)*2, 2):
        front[i//2] = np.convolve(l[:i], np.ones((i,))/i, mode = 'valid')
    for i in range(1, (N//2)*2, 2):
        back[i//2] = np.convolve(l[-i:], np.ones((i,))/i, mode = 'valid')
    return np.concatenate([front, np.convolve(l, np.ones((N,))/N, mode = 'valid'), back[::-1]])

它相对较慢,因为它使用了convolve(),并且可能会被真正的 Pythonista 进行很多修饰,但是,我相信这个想法是正确的。

解决方案 21:

新的convolve配方已合并到 Python 3.10 中。

鉴于


import collections, operator

from itertools import chain, repeat


size = 3 + 1
kernel = [1/size] * size                                              

代码

def convolve(signal, kernel):
    # See:  https://betterexplained.com/articles/intuitive-convolution/
    # convolve(data, [0.25, 0.25, 0.25, 0.25]) --> Moving average (blur)
    # convolve(data, [1, -1]) --> 1st finite difference (1st derivative)
    # convolve(data, [1, -2, 1]) --> 2nd finite difference (2nd derivative)
    kernel = list(reversed(kernel))
    n = len(kernel)
    window = collections.deque([0] * n, maxlen=n)
    for x in chain(signal, repeat(0, n-1)):
        window.append(x)
        yield sum(map(operator.mul, kernel, window))

演示

list(convolve(range(1, 6), kernel))
# [0.25, 0.75, 1.5, 2.5, 3.5, 3.0, 2.25, 1.25]

细节

卷积是一种通用数学运算,可应用于移动平均数。其思想是,给定一些数据,将数据子集(窗口)作为“掩码”或“内核”在数据上滑动,对每个窗口执行特定的数学运算。对于移动平均数,内核是平均值:

在此处输入图片描述

您现在可以通过 使用此实现more_itertools.convolve
more_itertools是一个流行的第三方包;通过 安装> pip install more_itertools

解决方案 22:

通过阅读其他答案,我不认为这是问题所要求的,但我来这里是为了需要保持一个不断增长的值列表的移动平均值。

因此,如果您想要保留从某处(站点、测量设备等)获取的值列表以及最后n更新的值的平均值,则可以使用下面的代码,以最大限度地减少添加新元素的工作量:

class Running_Average(object):
    def __init__(self, buffer_size=10):
        """
        Create a new Running_Average object.

        This object allows the efficient calculation of the average of the last
        `buffer_size` numbers added to it.

        Examples
        --------
        >>> a = Running_Average(2)
        >>> a.add(1)
        >>> a.get()
        1.0
        >>> a.add(1)  # there are two 1 in buffer
        >>> a.get()
        1.0
        >>> a.add(2)  # there's a 1 and a 2 in the buffer
        >>> a.get()
        1.5
        >>> a.add(2)
        >>> a.get()  # now there's only two 2 in the buffer
        2.0
        """
        self._buffer_size = int(buffer_size)  # make sure it's an int
        self.reset()

    def add(self, new):
        """
        Add a new number to the buffer, or replaces the oldest one there.
        """
        new = float(new)  # make sure it's a float
        n = len(self._buffer)
        if n < self.buffer_size:  # still have to had numbers to the buffer.
            self._buffer.append(new)
            if self._average != self._average:  # ~ if isNaN().
                self._average = new  # no previous numbers, so it's new.
            else:
                self._average *= n  # so it's only the sum of numbers.
                self._average += new  # add new number.
                self._average /= (n+1)  # divide by new number of numbers.
        else:  # buffer full, replace oldest value.
            old = self._buffer[self._index]  # the previous oldest number.
            self._buffer[self._index] = new  # replace with new one.
            self._index += 1  # update the index and make sure it's...
            self._index %= self.buffer_size  # ... smaller than buffer_size.
            self._average -= old/self.buffer_size  # remove old one...
            self._average += new/self.buffer_size  # ...and add new one...
            # ... weighted by the number of elements.

    def __call__(self):
        """
        Return the moving average value, for the lazy ones who don't want
        to write .get .
        """
        return self._average

    def get(self):
        """
        Return the moving average value.
        """
        return self()

    def reset(self):
        """
        Reset the moving average.

        If for some reason you don't want to just create a new one.
        """
        self._buffer = []  # could use np.empty(self.buffer_size)...
        self._index = 0  # and use this to keep track of how many numbers.
        self._average = float('nan')  # could use np.NaN .

    def get_buffer_size(self):
        """
        Return current buffer_size.
        """
        return self._buffer_size

    def set_buffer_size(self, buffer_size):
        """
        >>> a = Running_Average(10)
        >>> for i in range(15):
        ...     a.add(i)
        ...
        >>> a()
        9.5
        >>> a._buffer  # should not access this!!
        [10.0, 11.0, 12.0, 13.0, 14.0, 5.0, 6.0, 7.0, 8.0, 9.0]

        Decreasing buffer size:
        >>> a.buffer_size = 6
        >>> a._buffer  # should not access this!!
        [9.0, 10.0, 11.0, 12.0, 13.0, 14.0]
        >>> a.buffer_size = 2
        >>> a._buffer
        [13.0, 14.0]

        Increasing buffer size:
        >>> a.buffer_size = 5
        Warning: no older data available!
        >>> a._buffer
        [13.0, 14.0]

        Keeping buffer size:
        >>> a = Running_Average(10)
        >>> for i in range(15):
        ...     a.add(i)
        ...
        >>> a()
        9.5
        >>> a._buffer  # should not access this!!
        [10.0, 11.0, 12.0, 13.0, 14.0, 5.0, 6.0, 7.0, 8.0, 9.0]
        >>> a.buffer_size = 10  # reorders buffer!
        >>> a._buffer
        [5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0]
        """
        buffer_size = int(buffer_size)
        # order the buffer so index is zero again:
        new_buffer = self._buffer[self._index:]
        new_buffer.extend(self._buffer[:self._index])
        self._index = 0
        if self._buffer_size < buffer_size:
            print('Warning: no older data available!')  # should use Warnings!
        else:
            diff = self._buffer_size - buffer_size
            print(diff)
            new_buffer = new_buffer[diff:]
        self._buffer_size = buffer_size
        self._buffer = new_buffer

    buffer_size = property(get_buffer_size, set_buffer_size)

您可以使用以下方法进行测试:

def graph_test(N=200):
    import matplotlib.pyplot as plt
    values = list(range(N))
    values_average_calculator = Running_Average(N/2)
    values_averages = []
    for value in values:
        values_average_calculator.add(value)
        values_averages.append(values_average_calculator())
    fig, ax = plt.subplots(1, 1)
    ax.plot(values, label='values')
    ax.plot(values_averages, label='averages')
    ax.grid()
    ax.set_xlim(0, N)
    ax.set_ylim(0, N)
    fig.show()

得出:

值及其平均值作为值的函数 #

解决方案 23:

为了教育目的,让我添加另外两个 Numpy 解决方案(比 cumsum 解决方案慢):

import numpy as np
from numpy.lib.stride_tricks import as_strided

def ra_strides(arr, window):
    ''' Running average using as_strided'''
    n = arr.shape[0] - window + 1
    arr_strided = as_strided(arr, shape=[n, window], strides=2*arr.strides)
    return arr_strided.mean(axis=1)

def ra_add(arr, window):
    ''' Running average using add.reduceat'''
    n = arr.shape[0] - window + 1
    indices = np.array([0, window]*n) + np.repeat(np.arange(n), 2)
    arr = np.append(arr, 0)
    return np.add.reduceat(arr, indices )[::2]/window

使用的功能:as_strided、add.reduceat

解决方案 24:

仅使用 Python 标准库(内存高效)

只需给出使用标准库的另一个版本deque。令我惊讶的是,大多数答案都在使用pandasnumpy

def moving_average(iterable, n=3):
    d = deque(maxlen=n)
    for i in iterable:
        d.append(i)
        if len(d) == n:
            yield sum(d)/n

r = moving_average([40, 30, 50, 46, 39, 44])
assert list(r) == [40.0, 42.0, 45.0, 43.0]

实际上我在 python 文档中找到了另一种实现

def moving_average(iterable, n=3):
    # moving_average([40, 30, 50, 46, 39, 44]) --> 40.0 42.0 45.0 43.0
    # http://en.wikipedia.org/wiki/Moving_average
    it = iter(iterable)
    d = deque(itertools.islice(it, n-1))
    d.appendleft(0)
    s = sum(d)
    for elem in it:
        s += elem - d.popleft()
        d.append(elem)
        yield s / n

然而,在我看来,实现比它应该的要复杂一些。但它一定是在标准 Python 文档中,这是有原因的,有人能评论一下我的实现和标准文档吗?

解决方案 25:

那么移动平均滤波器怎么样?它也是一行代码,并且具有以下优点:如果您需要除矩形以外的其他东西,则可以轻松操纵窗口类型,即数组 a 的 N 长简单移动平均线:

lfilter(np.ones(N)/N, [1], a)[N:]

使用三角窗后:

lfilter(np.ones(N)*scipy.signal.triang(N)/N, [1], a)[N:]

注意:我通常会将前 N 个样本视为假样本[N:],但这不是必要的,只是个人选择的问题。

解决方案 26:

我的解决方案基于维基百科的“简单移动平均线”。

from numba import jit
@jit
def sma(x, N):
    s = np.zeros_like(x)
    k = 1 / N
    s[0] = x[0] * k
    for i in range(1, N + 1):
        s[i] = s[i - 1] + x[i] * k
    for i in range(N, x.shape[0]):
        s[i] = s[i - 1] + (x[i] - x[i - N]) * k
    s = s[N - 1:]
    return s

与之前建议的解决方案相比,它比 scipy 最快的解决方案“uniform_filter1d”快两倍,并且具有相同的错误顺序。速度测试:

import numpy as np    
x = np.random.random(10000000)
N = 1000

from scipy.ndimage.filters import uniform_filter1d
%timeit uniform_filter1d(x, size=N)
95.7 ms ± 9.34 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
%timeit sma(x, N)
47.3 ms ± 3.42 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

误差对比:

np.max(np.abs(np.convolve(x, np.ones((N,))/N, mode='valid') - uniform_filter1d(x, size=N, mode='constant', origin=-(N//2))[:-(N-1)]))
8.604228440844963e-14
np.max(np.abs(np.convolve(x, np.ones((N,))/N, mode='valid') - sma(x, N)))
1.41886502547095e-13

解决方案 27:

虽然这里有这个问题的解决方案,但请看一下我的解决方案。它非常简单而且效果很好。

import numpy as np
dataset = np.asarray([1, 2, 3, 4, 5, 6, 7])
ma = list()
window = 3
for t in range(0, len(dataset)):
    if t+window <= len(dataset):
        indices = range(t, t+window)
        ma.append(np.average(np.take(dataset, indices)))
else:
    ma = np.asarray(ma)

解决方案 28:

另一种解决方案是使用标准库和双端队列:

from collections import deque
import itertools

def moving_average(iterable, n=3):
    # http://en.wikipedia.org/wiki/Moving_average
    it = iter(iterable) 
    # create an iterable object from input argument
    d = deque(itertools.islice(it, n-1))  
    # create deque object by slicing iterable
    d.appendleft(0)
    s = sum(d)
    for elem in it:
        s += elem - d.popleft()
        d.append(elem)
        yield s / n

# example on how to use it
for i in  moving_average([40, 30, 50, 46, 39, 44]):
    print(i)

# 40.0
# 42.0
# 45.0
# 43.0

解决方案 29:

如果您必须对非常小的数组(少于 200 个元素)重复执行此操作,我发现仅使用线性代数即可获得最快的结果。最慢的部分是设置乘法矩阵 y,您只需执行一次,但之后可能会更快。

import numpy as np
import random 

N = 100      # window size
size =200     # array length

x = np.random.random(size)
y = np.eye(size, dtype=float)

# prepare matrix
for i in range(size):
  y[i,i:i+N] = 1./N
  
# calculate running mean
z = np.inner(x,y.T)[N-1:]

解决方案 30:

还有另一种方法,使用内置的 numpy 函数(np.lib.stride_tricks.sliding_window_view),但最终速度与(或略快)大致相同np.convolve

按照流行答案的结构,操作方式如下:

M = 10**6
x = np.random.random(M)
N = 10**3
result3 = np.lib.stride_tricks.sliding_window_view(x, N).mean(axis=-1)

这里,numpy 函数调用创建一个M矩阵N,然后mean(axis=-1)计算平均值。

作为一行代码它很不错(速度与np.convolve 此处相似),但比其他一些方法(例如np.cumsum 此处)要慢:

>>> t0 = time.time()
>>> result3 = np.lib.stride_tricks.sliding_window_view(x, N).mean(axis=-1)
>>> t1 = time.time()
>>> print(f'{t1-t0}')
0.4908254146575928

# the convolve solution
>>> t0 = time.time(); result1 = np.convolve(x, np.ones((N,))/N, mode='valid'); t1 = time.time(); print(f'{t1-t0}')
0.6075541973114014

# the cumsum solution
>>> t0 = time.time(); result2 = running_mean(x, N); t1 = time.time(); print(f'{t1-t0}')
0.04239177703857422

有关更多信息,请参阅numpy 文档numpy.lib.stride_tricks.sliding_window_view

相关推荐
  政府信创国产化的10大政策解读一、信创国产化的背景与意义信创国产化,即信息技术应用创新国产化,是当前中国信息技术领域的一个重要发展方向。其核心在于通过自主研发和创新,实现信息技术应用的自主可控,减少对外部技术的依赖,并规避潜在的技术制裁和风险。随着全球信息技术竞争的加剧,以及某些国家对中国在科技领域的打压,信创国产化显...
工程项目管理   1572  
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1355  
  信创产品在政府采购中的占比分析随着信息技术的飞速发展以及国家对信息安全重视程度的不断提高,信创产业应运而生并迅速崛起。信创,即信息技术应用创新,旨在实现信息技术领域的自主可控,减少对国外技术的依赖,保障国家信息安全。政府采购作为推动信创产业发展的重要力量,其对信创产品的采购占比情况备受关注。这不仅关系到信创产业的发展前...
信创和国产化的区别   0  
  信创,即信息技术应用创新产业,旨在实现信息技术领域的自主可控,摆脱对国外技术的依赖。近年来,国货国用信创发展势头迅猛,在诸多领域取得了显著成果。这一发展趋势对科技创新产生了深远的推动作用,不仅提升了我国在信息技术领域的自主创新能力,还为经济社会的数字化转型提供了坚实支撑。信创推动核心技术突破信创产业的发展促使企业和科研...
信创工作   0  
  信创技术,即信息技术应用创新产业,旨在实现信息技术领域的自主可控与安全可靠。近年来,信创技术发展迅猛,对中小企业产生了深远的影响,带来了诸多不可忽视的价值。在数字化转型的浪潮中,中小企业面临着激烈的市场竞争和复杂多变的环境,信创技术的出现为它们提供了新的发展机遇和支撑。信创技术对中小企业的影响技术架构变革信创技术促使中...
信创国产化   0  
热门文章
项目管理软件有哪些?
云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用