在 Python 中通过块(n 个)迭代一个迭代器?
- 2024-12-16 08:35:00
- admin 原创
- 150
问题描述:
您能想到一个好的方法(也许使用 itertools)将迭代器分成给定大小的块吗?
因此l=[1,2,3,4,5,6,7]
withchunks(l,3)
成为迭代器[1,2,3], [4,5,6], [7]
我可以想到一个小程序来做到这一点,但使用 itertools 可能不是一个好方法。
解决方案 1:
grouper()
文档中的食谱接近itertools
您想要的:
def grouper(iterable, n, *, incomplete='fill', fillvalue=None):
"Collect data into non-overlapping fixed-length chunks or blocks"
# grouper('ABCDEFG', 3, fillvalue='x') --> ABC DEF Gxx
# grouper('ABCDEFG', 3, incomplete='strict') --> ABC DEF ValueError
# grouper('ABCDEFG', 3, incomplete='ignore') --> ABC DEF
args = [iter(iterable)] * n
if incomplete == 'fill':
return zip_longest(*args, fillvalue=fillvalue)
if incomplete == 'strict':
return zip(*args, strict=True)
if incomplete == 'ignore':
return zip(*args)
else:
raise ValueError('Expected fill, strict, or ignore')
但是,当最后一块不完整时,这将无法正常工作,因为根据incomplete
模式,它要么用填充值填充最后一块,要么引发异常,要么静默删除不完整的块。
在较新版本的食谱中,他们添加了完全符合您要求的batched
食谱:
def batched(iterable, n):
"Batch data into tuples of length n. The last batch may be shorter."
# batched('ABCDEFG', 3) --> ABC DEF G
if n < 1:
raise ValueError('n must be at least one')
it = iter(iterable)
while (batch := tuple(islice(it, n))):
yield batch
最后,一个不太通用的解决方案只适用于序列,但确实可以根据需要处理最后一个块并保留原始序列的类型:
(my_list[i:i + chunk_size] for i in range(0, len(my_list), chunk_size))
从 python 3.12 开始,你也可以直接使用itertools.batched
。来自文档:
itertools.batched (可迭代,n)
将可迭代对象中的数据批量处理为长度为 n 的元组。最后一批数据可能短于 n。
解决方案 2:
尽管 OP 要求函数以列表或元组的形式返回块,但如果您需要返回迭代器,则可以修改Sven Marnach 的解决方案:
def batched_it(iterable, n):
"Batch data into iterators of length n. The last batch may be shorter."
# batched('ABCDEFG', 3) --> ABC DEF G
if n < 1:
raise ValueError('n must be at least one')
it = iter(iterable)
while True:
chunk_it = itertools.islice(it, n)
try:
first_el = next(chunk_it)
except StopIteration:
return
yield itertools.chain((first_el,), chunk_it)
一些基准:http://pastebin.com/YkKFvm8b
只有当你的函数遍历每个块中的元素时,它才会稍微更有效率。
解决方案 3:
Python 3.12 添加了itertools.batched,它适用于所有可迭代对象(包括列表):
>>> from itertools import batched
>>> for batch in batched('ABCDEFG', 3):
... print(batch)
('A', 'B', 'C')
('D', 'E', 'F')
('G',)
解决方案 4:
从 python 3.8 开始,有一个使用:=
运算符的更简单的解决方案:
def grouper(iterator: Iterator, n: int) -> Iterator[list]:
while chunk := list(itertools.islice(iterator, n)):
yield chunk
然后这样称呼它:
>>> list(grouper(iter('ABCDEFG'), 3))
[['A', 'B', 'C'], ['D', 'E', 'F'], ['G']]
注意:您可以在函数iter
中放入grouper
一个来Iterable
代替一个Iterator
。
解决方案 5:
这将适用于任何可迭代对象。它返回生成器的生成器(以实现完全灵活性)。我现在意识到它基本上与@reclosedevs 解决方案相同,但没有多余的东西。不需要try...except
随着StopIteration
向上传播,这正是我们想要的。
当可迭代对象为空时,next(iterable)
需要进行调用来引发,因为如果您允许的话,它将永远继续生成空的生成器。StopIteration
`islice`
这样更好,因为它只有两行,但却很容易理解。
def grouper(iterable, n):
while True:
yield itertools.chain((next(iterable),), itertools.islice(iterable, n-1))
请注意,将其next(iterable)
放入元组中。否则,如果next(iterable)
本身是可迭代的,则将itertools.chain
其展平。感谢 Jeremy Brown 指出了这个问题。
解决方案 6:
我今天正在做某件事,想出了一个简单的解决方案。它类似于jsbueno 的group
答案,但我相信当 的长度iterable
可以被 整除时,他的答案会产生空的s 。我的答案在耗尽n
时进行简单检查。iterable
def chunk(iterable, chunk_size):
"""Generates lists of `chunk_size` elements from `iterable`.
>>> list(chunk((2, 3, 5, 7), 3))
[[2, 3, 5], [7]]
>>> list(chunk((2, 3, 5, 7), 2))
[[2, 3], [5, 7]]
"""
iterable = iter(iterable)
while True:
chunk = []
try:
for _ in range(chunk_size):
chunk.append(next(iterable))
yield chunk
except StopIteration:
if chunk:
yield chunk
break
解决方案 7:
这是一个返回惰性块的;map(list, chunks(...))
如果您想要列表,请使用它。
from itertools import islice, chain
from collections import deque
def chunks(items, n):
items = iter(items)
for first in items:
chunk = chain((first,), islice(items, n-1))
yield chunk
deque(chunk, 0)
if __name__ == "__main__":
for chunk in map(list, chunks(range(10), 3)):
print chunk
for i, chunk in enumerate(chunks(range(10), 3)):
if i % 2 == 1:
print "chunk #%d: %s" % (i, list(chunk))
else:
print "skipping #%d" % i
解决方案 8:
简洁的实现是:
chunker = lambda iterable, n: (ifilterfalse(lambda x: x == (), chunk) for chunk in (izip_longest(*[iter(iterable)]*n, fillvalue=())))
这是有效的,因为[iter(iterable)]*n
是一个包含相同迭代器 n 次的列表;对其进行压缩会从列表中的每个迭代器(即相同的迭代器)中获取一个项目,因此每个 zip 元素包含一组项目n
。
izip_longest
需要完全使用底层可迭代对象,而不是在达到第一个耗尽的迭代器时停止迭代,这会从中截断任何余数iterable
。这导致需要过滤掉填充值。因此,稍微更强大的实现是:
def chunker(iterable, n):
class Filler(object): pass
return (ifilterfalse(lambda x: x is Filler, chunk) for chunk in (izip_longest(*[iter(iterable)]*n, fillvalue=Filler)))
这保证了填充值永远不会是底层可迭代对象中的项。使用上面的定义:
iterable = range(1,11)
map(tuple,chunker(iterable, 3))
[(1, 2, 3), (4, 5, 6), (7, 8, 9), (10,)]
map(tuple,chunker(iterable, 2))
[(1, 2), (3, 4), (5, 6), (7, 8), (9, 10)]
map(tuple,chunker(iterable, 4))
[(1, 2, 3, 4), (5, 6, 7, 8), (9, 10)]
这个实现几乎满足了你的要求,但是存在一些问题:
def chunks(it, step):
start = 0
while True:
end = start+step
yield islice(it, start, end)
start = end
(不同之处在于,因为islice
不会引发 StopIteration 或超出it
此范围的调用上的任何其他内容,所以会永远产生;还有一个稍微棘手的问题,即islice
必须在迭代此生成器之前使用结果)。
功能上生成移动窗口:
izip(count(0, step), count(step, step))
因此这就变成了:
(it[start:end] for (start,end) in izip(count(0, step), count(step, step)))
但是,这仍然会创建一个无限迭代器。因此,您需要 takewhile(或者其他更好的方法)来限制它:
chunk = lambda it, step: takewhile((lambda x: len(x) > 0), (it[start:end] for (start,end) in izip(count(0, step), count(step, step))))
g = chunk(range(1,11), 3)
tuple(g)
([1, 2, 3], [4, 5, 6], [7, 8, 9], [10])
解决方案 9:
“简单胜于复杂”——只需几行代码就能完成这项工作。只需将其放在某个实用程序模块中即可:
def grouper (iterable, n):
iterable = iter(iterable)
count = 0
group = []
while True:
try:
group.append(next(iterable))
count += 1
if count % n == 0:
yield group
group = []
except StopIteration:
yield group
break
解决方案 10:
代码高尔夫版:
def grouper(iterable, n):
for i in range(0, len(iterable), n):
yield iterable[i:i+n]
用法:
>>> list(grouper('ABCDEFG', 3))
['ABC', 'DEF', 'G']
解决方案 11:
我忘了从哪里找到了这个灵感。我对其进行了一些修改,以便与 Windows 注册表中的 MSI GUID 配合使用:
def nslice(s, n, truncate=False, reverse=False):
"""Splits s into n-sized chunks, optionally reversing the chunks."""
assert n > 0
while len(s) >= n:
if reverse: yield s[:n][::-1]
else: yield s[:n]
s = s[n:]
if len(s) and not truncate:
yield s
reverse
不适用于您的问题,但我在该功能中广泛使用它。
>>> [i for i in nslice([1,2,3,4,5,6,7], 3)]
[[1, 2, 3], [4, 5, 6], [7]]
>>> [i for i in nslice([1,2,3,4,5,6,7], 3, truncate=True)]
[[1, 2, 3], [4, 5, 6]]
>>> [i for i in nslice([1,2,3,4,5,6,7], 3, truncate=True, reverse=True)]
[[3, 2, 1], [6, 5, 4]]
解决方案 12:
干得好。
def chunksiter(l, chunks):
i,j,n = 0,0,0
rl = []
while n < len(l)/chunks:
rl.append(l[i:j+chunks])
i+=chunks
j+=j+chunks
n+=1
return iter(rl)
def chunksiter2(l, chunks):
i,j,n = 0,0,0
while n < len(l)/chunks:
yield l[i:j+chunks]
i+=chunks
j+=j+chunks
n+=1
例子:
for l in chunksiter([1,2,3,4,5,6,7,8],3):
print(l)
[1, 2, 3]
[4, 5, 6]
[7, 8]
for l in chunksiter2([1,2,3,4,5,6,7,8],3):
print(l)
[1, 2, 3]
[4, 5, 6]
[7, 8]
for l in chunksiter2([1,2,3,4,5,6,7,8],5):
print(l)
[1, 2, 3, 4, 5]
[6, 7, 8]
解决方案 13:
对reclosedev 的答案进行了以下几项改进:
通过将第一个元素的提取委托给 Python 本身,而不是手动使用/块
next
中的调用来提高操作效率,减少循环中的样板代码try
`except StopIteration:`处理用户丢弃任何给定块中的其余元素的情况(例如,
break
在某些条件下对块 s 进行内循环);在reclosedev 的解决方案中,除了第一个元素(肯定会被消耗)之外,任何其他“跳过”的元素实际上都不会被跳过(它们只是成为下一个块的初始元素,这意味着您不再从n
对齐的偏移量中提取数据,并且如果调用者break
sa 循环遍历一个块,则他们必须手动使用剩余元素,即使他们不需要它们)
结合这两个修复可得到:
import collections # At top of file
from itertools import chain, islice # At top of file, denamespaced for slight speed boost
# Pre-create a utility "function" that silently consumes and discards all remaining elements in
# an iterator. This is the fastest way to do so on CPython (deque has a specialized mode
# for maxlen=0 that pulls and discards faster than Python level code can, and by precreating
# the deque and prebinding the extend method, you don't even need to create new deques each time)
_consume = collections.deque(maxlen=0).extend
def batched_it(iterable, n):
"Batch data into sub-iterators of length n. The last batch may be shorter."
# batched_it('ABCDEFG', 3) --> ABC DEF G
if n < 1:
raise ValueError('n must be at least one')
n -= 1 # First element pulled for us, pre-decrement n so we don't redo it every loop
it = iter(iterable)
for first_el in it:
chunk_it = islice(it, n)
try:
yield chain((first_el,), chunk_it)
finally:
_consume(chunk_it) # Efficiently consume any elements caller didn't consume
在线尝试一下!
解决方案 14:
此函数接受不需要的可迭代对象Sized
,因此它也会接受迭代器。它支持无限可迭代对象,如果选择的块大小小于 1,则会出现错误(即使给出 size == 1 实际上是无用的)。
类型注释当然是可选的,如果您愿意,可以删除/
参数(仅使位置可用)。iterable
T = TypeVar("T")
def chunk(iterable: Iterable[T], /, size: int) -> Generator[list[T], None, None]:
"""Yield chunks of a given size from an iterable."""
if size < 1:
raise ValueError("Cannot make chunks smaller than 1 item.")
def chunker():
current_chunk = []
for item in iterable:
current_chunk.append(item)
if len(current_chunk) == size:
yield current_chunk
current_chunk = []
if current_chunk:
yield current_chunk
# Chunker generator is returned instead of yielding directly so that the size check
# can raise immediately instead of waiting for the first next() call.
return chunker()
解决方案 15:
递归解决方案:
def batched(i: Iterable, split: int) -> Tuple[Iterable, ...]:
if chunk := i[:split]:
yield chunk
yield from batched(i[split:], split)
解决方案 16:
这是一个简单的示例:
n=2
l = list(range(15))
[l[i:i+n] for i in range(len(l)) if i%n==0]
Out[10]: [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9], [10, 11], [12, 13], [14]]
for i in range(len(l)):此部分使用 range() 函数和 len(l) 作为上限来指定对 l 的索引进行迭代。
if i % n == 0:此条件过滤新列表的元素。i % n 检查当前索引 i 是否能被 n 整除且无余数。如果是,则该索引处的元素将包含在新列表中;否则,将跳过该元素。
l[i:i+n]:此部分从 l 中提取子列表。它使用切片符号指定从 i 到 i+n-1 的索引范围。因此,对于满足条件 i % n == 0 的每个索引 i,都会从该索引开始创建一个长度为 n 的子列表。
替代方案(对于更大的东西来说更快):
[l[i:i+n] for i in range(0,len(l),n)]