使用步幅实现高效的移动平均滤波器
- 2025-02-08 08:52:00
- admin 原创
- 44
问题描述:
我最近在本篇文章的答案中了解了步幅,并且想知道如何使用它们来比我在这篇文章中提出的(使用卷积滤波器)更有效地计算移动平均滤波器。
这是我目前所拥有的。它获取原始数组的视图,然后将其滚动必要的量,并对内核值求和以计算平均值。我知道边缘处理不正确,但我可以事后处理这个问题……有没有更好更快的方法?目标是过滤高达 5000x5000 x 16 层的大型浮点数组,这是一项scipy.ndimage.filters.convolve
相当缓慢的任务。
请注意,我正在寻找 8 邻域连接,即 3x3 过滤器取 9 个像素(焦点像素周围 8 个)的平均值,并将该值分配给新图像中的像素。
import numpy, scipy
filtsize = 3
a = numpy.arange(100).reshape((10,10))
b = numpy.lib.stride_tricks.as_strided(a, shape=(a.size,filtsize), strides=(a.itemsize, a.itemsize))
for i in range(0, filtsize-1):
if i > 0:
b += numpy.roll(b, -(pow(filtsize,2)+1)*i, 0)
filtered = (numpy.sum(b, 1) / pow(filtsize,2)).reshape((a.shape[0],a.shape[1]))
scipy.misc.imsave("average.jpg", filtered)
编辑澄清我如何看待这个工作:
当前代码:
使用 stride_tricks 生成一个类似 [[0,1,2],[1,2,3],[2,3,4]...] 的数组,它对应于滤波器内核的顶行。
沿垂直轴滚动以获取内核的中间行 [[10,11,12],[11,12,13],[13,14,15]...] 并将其添加到我在 1) 中获得的数组中
重复此操作以获得内核的底行 [[20,21,22],[21,22,23],[22,23,24]...]。此时,我取每行的总和并将其除以过滤器中的元素数量,得到每个像素的平均值(移动 1 行和 1 列,并且边缘有些奇怪,但我可以稍后处理)。
我希望更好地利用 stride_tricks 来直接获取整个数组的 9 个值或内核元素的总和,或者有人可以说服我另一种更有效的方法......
解决方案 1:
不管怎样,以下是使用“花哨”跨步技巧的方法。我本来打算昨天发布这个,但被实际工作分散了注意力!:)
@Paul 和 @eat 都使用各种其他方法实现了很好的效果。为了继续之前的问题,我想我会发布 N 维等效方法。
但是,你无法显著地击败scipy.ndimage
>1D数组的函数。(但scipy.ndimage.uniform_filter
应该可以击败)scipy.ndimage.convolve
此外,如果您尝试获取多维移动窗口,则每当您无意中复制数组时,内存使用量都可能会激增。虽然初始“滚动”数组只是原始数组内存的一个视图,但复制数组的任何中间步骤都会生成比原始数组大几个数量级的副本(例如,假设您正在使用 100x100 的原始数组... 它的视图(对于 (3,3) 的过滤器大小)将是 98x98x3x3,但使用与原始数组相同的内存。但是,任何副本都将使用完整的98x98x3x3 数组所需的内存量!!)
基本上,当您想要在 ndarray 的单个轴上矢量化移动窗口操作时,使用疯狂步进技巧非常有用。它使得计算诸如移动标准偏差等内容变得非常容易,而且开销很小。当您想开始沿多个轴执行此操作时,这是可能的,但您通常最好使用更专业的函数。(例如scipy.ndimage
,等)
无论如何,你可以这样做:
import numpy as np
def rolling_window_lastaxis(a, window):
"""Directly taken from Erik Rigtorp's post to numpy-discussion.
<http://www.mail-archive.com/numpy-discussion@scipy.org/msg29450.html>"""
if window < 1:
raise ValueError, "`window` must be at least 1."
if window > a.shape[-1]:
raise ValueError, "`window` is too long."
shape = a.shape[:-1] + (a.shape[-1] - window + 1, window)
strides = a.strides + (a.strides[-1],)
return np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides)
def rolling_window(a, window):
if not hasattr(window, '__iter__'):
return rolling_window_lastaxis(a, window)
for i, win in enumerate(window):
if win > 1:
a = a.swapaxes(i, -1)
a = rolling_window_lastaxis(a, win)
a = a.swapaxes(-2, i)
return a
filtsize = (3, 3)
a = np.zeros((10,10), dtype=np.float)
a[5:7,5] = 1
b = rolling_window(a, filtsize)
blurred = b.mean(axis=-1).mean(axis=-1)
因此,我们得到的b = rolling_window(a, filtsize)
是一个 8x8x3x3 数组,这实际上是与原始 10x10 数组相同的内存视图。我们可以轻松地沿不同的轴使用不同的过滤器大小,或者仅沿 N 维数组的选定轴进行操作(即,filtsize = (0,3,0,3)
在 4 维数组上会给我们一个 6 维视图)。
然后,我们可以反复将任意函数应用于最后一个轴,以有效地计算移动窗口中的事物。
mean
但是,由于我们在每个步骤(或或其他步骤)中存储的临时数组比原始数组大得多std
,因此这根本不是内存高效的!它也不会非常快。
相当于ndimage
:
blurred = scipy.ndimage.uniform_filter(a, filtsize, output=a)
这将处理各种边界条件,无需临时复制数组即可进行“模糊”,而且速度非常快。跨步技巧是将函数沿一个轴应用于移动窗口的好方法,但通常不是沿多个轴执行的好方法……
无论如何,这只是我的 0.02 美元……
解决方案 2:
我对 Python 不够熟悉,无法为此写出代码,但加速卷积的两种最佳方法是分离过滤器或使用傅里叶变换。
分离式过滤器:卷积为 O(M*N),其中 M 和 N 分别是图像和过滤器中的像素数。由于使用 3×3 内核进行平均过滤相当于先使用 3×1 内核进行过滤,然后使用 1×3 内核进行过滤,因此,(3+3)/(3*3)
通过使用两个一维内核进行连续卷积,您可以获得约 30% 的速度提升(随着内核变大,效果显然会更好)。当然,您仍然可以在这里使用步幅技巧。
傅里叶变换:conv(A,B)
等同于ifft(fft(A)*fft(B))
,即直接空间中的卷积变为傅里叶空间中的乘法,其中A
是您的图像,B
是您的过滤器。由于傅里叶变换的(元素)乘法要求 A 和 B 大小相同,因此 B 是一个数组,其中size(A)
您的内核位于图像的正中心,其他地方为零。要将 3×3 内核放置在数组的中心,您可能需要填充A
为奇数大小。根据您对傅里叶变换的实现,这可能比卷积快得多(并且如果您多次应用相同的过滤器,您可以预先计算fft(B)
,从而节省另外 30% 的计算时间)。
解决方案 3:
让我们来看看:
您的问题不是很清楚,但我现在假设您希望显著改善这种平均方法。
import numpy as np
from numpy.lib import stride_tricks as st
def mf(A, k_shape= (3, 3)):
m= A.shape[0]- 2
n= A.shape[1]- 2
strides= A.strides+ A.strides
new_shape= (m, n, k_shape[0], k_shape[1])
A= st.as_strided(A, shape= new_shape, strides= strides)
return np.sum(np.sum(A, -1), -1)/ np.prod(k_shape)
if __name__ == '__main__':
A= np.arange(100).reshape((10, 10))
print mf(A)
现在,您实际上期望什么样的性能改进?
更新:
首先,警告:当前状态下的代码无法正确适应“内核”形状。不过,这不是我现在主要关心的问题(无论如何,如何正确适应的想法已经存在)。
我只是直观地选择了 4D A 的新形状,对于我来说,考虑将 2D“内核”中心置于原始 2D A 的每个网格位置的中心确实很有意义。
但 4D 整形实际上可能不是“最佳”的。我认为这里真正的问题是求和的性能。为了充分利用机器的缓存架构,应该能够找到“最佳顺序”(4D A)。但是,对于“小型”阵列(它们与机器缓存“协作”)和那些不与机器缓存“协作”(至少不是那么直接)的大型阵列,该顺序可能不一样。
更新 2:
这是 的一个略微修改的版本mf
。显然,最好先将其重塑为 3D 数组,然后只进行点积而不是求和(这样做有好处,因为内核可以是任意的)。但是,它仍然比 Pauls 更新的函数慢 3 倍(在我的计算机上)。
def mf(A):
k_shape= (3, 3)
k= np.prod(k_shape)
m= A.shape[0]- 2
n= A.shape[1]- 2
strides= A.strides* 2
new_shape= (m, n)+ k_shape
A= st.as_strided(A, shape= new_shape, strides= strides)
w= np.ones(k)/ k
return np.dot(A.reshape((m, n, -1)), w)
解决方案 4:
我确信需要修复的一件事是您的视图数组b
。
它有一些未分配内存的项目,因此会崩溃。
鉴于您对算法的新描述,首先需要修复的是您正在跨越分配范围之外a
:
bshape = (a.size-filtsize+1, filtsize)
bstrides = (a.itemsize, a.itemsize)
b = numpy.lib.stride_tricks.as_strided(a, shape=bshape, strides=bstrides)
更新
因为我仍然不太了解该方法,而且似乎有更简单的方法来解决这个问题,所以我只把它放在这里:
A = numpy.arange(100).reshape((10,10))
shifts = [(-1,-1),(-1,0),(-1,1),(0,-1),(0,1),(1,-1),(1,0),(1,1)]
B = A[1:-1, 1:-1].copy()
for dx,dy in shifts:
xstop = -1+dx or None
ystop = -1+dy or None
B += A[1+dx:xstop, 1+dy:ystop]
B /= 9
...这似乎是一种简单的方法。唯一多余的操作是它只分配和填充B
一次。无论如何,所有加法、除法和索引都必须完成。如果您要处理 16 个波段,B
如果您的意图是保存图像,您仍然只需要分配一次。即使这没有帮助,它也可能澄清为什么我不理解这个问题,或者至少可以作为计时其他方法加速的基准。这在我的笔记本电脑上在一个 5k x 5k 的 float64 数组上运行了 2.6 秒,其中 0.5 是创建B