在 Python 中通过块(n 个)迭代一个迭代器?

2024-12-16 08:35:00
admin
原创
150
摘要:问题描述:您能想到一个好的方法(也许使用 itertools)将迭代器分成给定大小的块吗?因此l=[1,2,3,4,5,6,7]withchunks(l,3)成为迭代器[1,2,3], [4,5,6], [7]我可以想到一个小程序来做到这一点,但使用 itertools 可能不是一个好方法。解决方案 1:gr...

问题描述:

您能想到一个好的方法(也许使用 itertools)将迭代器分成给定大小的块吗?

因此l=[1,2,3,4,5,6,7]withchunks(l,3)成为迭代器[1,2,3], [4,5,6], [7]

我可以想到一个小程序来做到这一点,但使用 itertools 可能不是一个好方法。


解决方案 1:

grouper() 文档中的食谱接近itertools您想要的:

def grouper(iterable, n, *, incomplete='fill', fillvalue=None):
    "Collect data into non-overlapping fixed-length chunks or blocks"
    # grouper('ABCDEFG', 3, fillvalue='x') --> ABC DEF Gxx
    # grouper('ABCDEFG', 3, incomplete='strict') --> ABC DEF ValueError
    # grouper('ABCDEFG', 3, incomplete='ignore') --> ABC DEF
    args = [iter(iterable)] * n
    if incomplete == 'fill':
        return zip_longest(*args, fillvalue=fillvalue)
    if incomplete == 'strict':
        return zip(*args, strict=True)
    if incomplete == 'ignore':
        return zip(*args)
    else:
        raise ValueError('Expected fill, strict, or ignore')

但是,当最后一块不完整时,这将无法正常工作,因为根据incomplete模式,它要么用填充值填充最后一块,要么引发异常,要么静默删除不完整的块。

在较新版本的食谱中,他们添加了完全符合您要求的batched食谱:

def batched(iterable, n):
    "Batch data into tuples of length n. The last batch may be shorter."
    # batched('ABCDEFG', 3) --> ABC DEF G
    if n < 1:
        raise ValueError('n must be at least one')
    it = iter(iterable)
    while (batch := tuple(islice(it, n))):
        yield batch

最后,一个不太通用的解决方案只适用于序列,但确实可以根据需要处理最后一个块并保留原始序列的类型:

(my_list[i:i + chunk_size] for i in range(0, len(my_list), chunk_size))

从 python 3.12 开始,你也可以直接使用itertools.batched。来自文档:

itertools.batched (可迭代,n)

将可迭代对象中的数据批量处理为长度为 n 的元组。最后一批数据可能短于 n。

解决方案 2:

尽管 OP 要求函数以列表或元组的形式返回块,但如果您需要返回迭代器,则可以修改Sven Marnach 的解决方案:

def batched_it(iterable, n):
    "Batch data into iterators of length n. The last batch may be shorter."
    # batched('ABCDEFG', 3) --> ABC DEF G
    if n < 1:
        raise ValueError('n must be at least one')
    it = iter(iterable)
    while True:
        chunk_it = itertools.islice(it, n)
        try:
            first_el = next(chunk_it)
        except StopIteration:
            return
        yield itertools.chain((first_el,), chunk_it)

一些基准:http://pastebin.com/YkKFvm8b

只有当你的函数遍历每个块中的元素时,它才会稍微更有效率。

解决方案 3:

Python 3.12 添加了itertools.batched,它适用于所有可迭代对象(包括列表):

>>> from itertools import batched
>>> for batch in batched('ABCDEFG', 3):
...     print(batch)
('A', 'B', 'C')
('D', 'E', 'F')
('G',)

解决方案 4:

从 python 3.8 开始,有一个使用:=运算符的更简单的解决方案:

def grouper(iterator: Iterator, n: int) -> Iterator[list]:
    while chunk := list(itertools.islice(iterator, n)):
        yield chunk

然后这样称呼它:

>>> list(grouper(iter('ABCDEFG'), 3))
[['A', 'B', 'C'], ['D', 'E', 'F'], ['G']]

注意:您可以在函数iter中放入grouper一个来Iterable代替一个Iterator

解决方案 5:

这将适用于任何可迭代对象。它返回生成器的生成器(以实现完全灵活性)。我现在意识到它基本上与@reclosedevs 解决方案相同,但没有多余的东西。不需要try...except随着StopIteration向上传播,这正是我们想要的。

当可迭代对象为空时,next(iterable)需要进行调用来引发,因为如果您允许的话,它将永远继续生成空的生成器。StopIteration`islice`

这样更好,因为它只有两行,但却很容易理解。

def grouper(iterable, n):
    while True:
        yield itertools.chain((next(iterable),), itertools.islice(iterable, n-1))

请注意,将其next(iterable)放入元组中。否则,如果next(iterable)本身是可迭代的,则将itertools.chain其展平。感谢 Jeremy Brown 指出了这个问题。

解决方案 6:

我今天正在做某件事,想出了一个简单的解决方案。它类似于jsbueno 的group答案,但我相信当 的长度iterable可以被 整除时,他的答案会产生空的s 。我的答案在耗尽n时进行简单检查。iterable

def chunk(iterable, chunk_size):
    """Generates lists of `chunk_size` elements from `iterable`.
    
    
    >>> list(chunk((2, 3, 5, 7), 3))
    [[2, 3, 5], [7]]
    >>> list(chunk((2, 3, 5, 7), 2))
    [[2, 3], [5, 7]]
    """
    iterable = iter(iterable)
    while True:
        chunk = []
        try:
            for _ in range(chunk_size):
                chunk.append(next(iterable))
            yield chunk
        except StopIteration:
            if chunk:
                yield chunk
            break

解决方案 7:

这是一个返回惰性块的;map(list, chunks(...))如果您想要列表,请使用它。

from itertools import islice, chain
from collections import deque

def chunks(items, n):
    items = iter(items)
    for first in items:
        chunk = chain((first,), islice(items, n-1))
        yield chunk
        deque(chunk, 0)

if __name__ == "__main__":
    for chunk in map(list, chunks(range(10), 3)):
        print chunk

    for i, chunk in enumerate(chunks(range(10), 3)):
        if i % 2 == 1:
            print "chunk #%d: %s" % (i, list(chunk))
        else:
            print "skipping #%d" % i

解决方案 8:

简洁的实现是:

chunker = lambda iterable, n: (ifilterfalse(lambda x: x == (), chunk) for chunk in (izip_longest(*[iter(iterable)]*n, fillvalue=())))

这是有效的,因为[iter(iterable)]*n是一个包含相同迭代器 n 次的列表;对其进行压缩会从列表中的每个迭代器(即相同的迭代器)中获取一个项目,因此每个 zip 元素包含一组项目n

izip_longest需要完全使用底层可迭代对象,而不是在达到第一个耗尽的迭代器时停止迭代,这会从中截断任何余数iterable。这导致需要过滤掉填充值。因此,稍微更强大的实现是:

def chunker(iterable, n):
    class Filler(object): pass
    return (ifilterfalse(lambda x: x is Filler, chunk) for chunk in (izip_longest(*[iter(iterable)]*n, fillvalue=Filler)))

这保证了填充值永远不会是底层可迭代对象中的项。使用上面的定义:

iterable = range(1,11)

map(tuple,chunker(iterable, 3))
[(1, 2, 3), (4, 5, 6), (7, 8, 9), (10,)]

map(tuple,chunker(iterable, 2))
[(1, 2), (3, 4), (5, 6), (7, 8), (9, 10)]

map(tuple,chunker(iterable, 4))
[(1, 2, 3, 4), (5, 6, 7, 8), (9, 10)]

这个实现几乎满足了你的要求,但是存在一些问题:

def chunks(it, step):
  start = 0
  while True:
    end = start+step
    yield islice(it, start, end)
    start = end

(不同之处在于,因为islice不会引发 StopIteration 或超出it此范围的调用上的任何其他内容,所以会永远产生;还有一个稍微棘手的问题,即islice必须在迭代此生成器之前使用结果)。

功能上生成移动窗口:

izip(count(0, step), count(step, step))

因此这就变成了:

(it[start:end] for (start,end) in izip(count(0, step), count(step, step)))

但是,这仍然会创建一个无限迭代器。因此,您需要 takewhile(或者其他更好的方法)来限制它:

chunk = lambda it, step: takewhile((lambda x: len(x) > 0), (it[start:end] for (start,end) in izip(count(0, step), count(step, step))))

g = chunk(range(1,11), 3)

tuple(g)
([1, 2, 3], [4, 5, 6], [7, 8, 9], [10])

解决方案 9:

“简单胜于复杂”——只需几行代码就能完成这项工作。只需将其放在某个实用程序模块中即可:

def grouper (iterable, n):
    iterable = iter(iterable)
    count = 0
    group = []
    while True:
        try:
            group.append(next(iterable))
            count += 1
            if count % n == 0:
                yield group
                group = []
        except StopIteration:
            yield group
            break

解决方案 10:

代码高尔夫版:

def grouper(iterable, n):
    for i in range(0, len(iterable), n):
        yield iterable[i:i+n]

用法:

>>> list(grouper('ABCDEFG', 3))
['ABC', 'DEF', 'G']

解决方案 11:

我忘了从哪里找到了这个灵感。我对其进行了一些修改,以便与 Windows 注册表中的 MSI GUID 配合使用:

def nslice(s, n, truncate=False, reverse=False):
    """Splits s into n-sized chunks, optionally reversing the chunks."""
    assert n > 0
    while len(s) >= n:
        if reverse: yield s[:n][::-1]
        else: yield s[:n]
        s = s[n:]
    if len(s) and not truncate:
        yield s

reverse不适用于您的问题,但我在该功能中广泛使用它。

>>> [i for i in nslice([1,2,3,4,5,6,7], 3)]
[[1, 2, 3], [4, 5, 6], [7]]
>>> [i for i in nslice([1,2,3,4,5,6,7], 3, truncate=True)]
[[1, 2, 3], [4, 5, 6]]
>>> [i for i in nslice([1,2,3,4,5,6,7], 3, truncate=True, reverse=True)]
[[3, 2, 1], [6, 5, 4]]

解决方案 12:

干得好。

def chunksiter(l, chunks):
    i,j,n = 0,0,0
    rl = []
    while n < len(l)/chunks:        
        rl.append(l[i:j+chunks])        
        i+=chunks
        j+=j+chunks        
        n+=1
    return iter(rl)


def chunksiter2(l, chunks):
    i,j,n = 0,0,0
    while n < len(l)/chunks:        
        yield l[i:j+chunks]
        i+=chunks
        j+=j+chunks        
        n+=1

例子:

for l in chunksiter([1,2,3,4,5,6,7,8],3):
    print(l)

[1, 2, 3]
[4, 5, 6]
[7, 8]

for l in chunksiter2([1,2,3,4,5,6,7,8],3):
    print(l)

[1, 2, 3]
[4, 5, 6]
[7, 8]


for l in chunksiter2([1,2,3,4,5,6,7,8],5):
    print(l)

[1, 2, 3, 4, 5]
[6, 7, 8]

解决方案 13:

对reclosedev 的答案进行了以下几项改进:

  1. 通过将第一个元素的提取委托给 Python 本身,而不是手动使用/块next中的调用来提高操作效率,减少循环中的样板代码try`except StopIteration:`

  2. 处理用户丢弃任何给定块中的其余元素的情况(例如,break在某些条件下对块 s 进行内循环);在reclosedev 的解决方案中,除了第一个元素(肯定会被消耗)之外,任何其他“跳过”的元素实际上都不会被跳过(它们只是成为下一个块的初始元素,这意味着您不再从n对齐的偏移量中提取数据,并且如果调用者breaksa 循环遍历一个块,则他们必须手动使用剩余元素,即使他们不需要它们)

结合这两个修复可得到:

import collections  # At top of file
from itertools import chain, islice  # At top of file, denamespaced for slight speed boost

# Pre-create a utility "function" that silently consumes and discards all remaining elements in
# an iterator. This is the fastest way to do so on CPython (deque has a specialized mode
# for maxlen=0 that pulls and discards faster than Python level code can, and by precreating
# the deque and prebinding the extend method, you don't even need to create new deques each time)
_consume = collections.deque(maxlen=0).extend

def batched_it(iterable, n):
    "Batch data into sub-iterators of length n. The last batch may be shorter."
    # batched_it('ABCDEFG', 3) --> ABC DEF G
    if n < 1:
        raise ValueError('n must be at least one')
    n -= 1  # First element pulled for us, pre-decrement n so we don't redo it every loop
    it = iter(iterable)
    for first_el in it:
        chunk_it = islice(it, n)
        try:
            yield chain((first_el,), chunk_it)
        finally:
            _consume(chunk_it)  # Efficiently consume any elements caller didn't consume

在线尝试一下!

解决方案 14:

此函数接受不需要的可迭代对象Sized,因此它也会接受迭代器。它支持无限可迭代对象,如果选择的块大小小于 1,则会出现错误(即使给出 size == 1 实际上是无用的)。

类型注释当然是可选的,如果您愿意,可以删除/参数(仅使位置可用)。iterable

T = TypeVar("T")


def chunk(iterable: Iterable[T], /, size: int) -> Generator[list[T], None, None]:
    """Yield chunks of a given size from an iterable."""
    if size < 1:
        raise ValueError("Cannot make chunks smaller than 1 item.")

    def chunker():
        current_chunk = []
        for item in iterable:
            current_chunk.append(item)

            if len(current_chunk) == size:
                yield current_chunk

                current_chunk = []

        if current_chunk:
            yield current_chunk

    # Chunker generator is returned instead of yielding directly so that the size check
    #  can raise immediately instead of waiting for the first next() call.
    return chunker()

解决方案 15:

递归解决方案:

def batched(i: Iterable, split: int) -> Tuple[Iterable, ...]:
    if chunk := i[:split]:
        yield chunk
        yield from batched(i[split:], split)

解决方案 16:

这是一个简单的示例:

n=2
l = list(range(15))
[l[i:i+n] for i in range(len(l)) if i%n==0]
Out[10]: [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9], [10, 11], [12, 13], [14]]

for i in range(len(l)):此部分使用 range() 函数和 len(l) 作为上限来指定对 l 的索引进行迭代。

if i % n == 0:此条件过滤新列表的元素。i % n 检查当前索引 i 是否能被 n 整除且无余数。如果是,则该索引处的元素将包含在新列表中;否则,将跳过该元素。

l[i:i+n]:此部分从 l 中提取子列表。它使用切片符号指定从 i 到 i+n-1 的索引范围。因此,对于满足条件 i % n == 0 的每个索引 i,都会从该索引开始创建一个长度为 n 的子列表。

替代方案(对于更大的东西来说更快):

[l[i:i+n] for i in range(0,len(l),n)]
相关推荐
  政府信创国产化的10大政策解读一、信创国产化的背景与意义信创国产化,即信息技术应用创新国产化,是当前中国信息技术领域的一个重要发展方向。其核心在于通过自主研发和创新,实现信息技术应用的自主可控,减少对外部技术的依赖,并规避潜在的技术制裁和风险。随着全球信息技术竞争的加剧,以及某些国家对中国在科技领域的打压,信创国产化显...
工程项目管理   1565  
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1354  
  信创国产芯片作为信息技术创新的核心领域,对于推动国家自主可控生态建设具有至关重要的意义。在全球科技竞争日益激烈的背景下,实现信息技术的自主可控,摆脱对国外技术的依赖,已成为保障国家信息安全和产业可持续发展的关键。国产芯片作为信创产业的基石,其发展水平直接影响着整个信创生态的构建与完善。通过不断提升国产芯片的技术实力、产...
国产信创系统   21  
  信创生态建设旨在实现信息技术领域的自主创新和安全可控,涵盖了从硬件到软件的全产业链。随着数字化转型的加速,信创生态建设的重要性日益凸显,它不仅关乎国家的信息安全,更是推动产业升级和经济高质量发展的关键力量。然而,在推进信创生态建设的过程中,面临着诸多复杂且严峻的挑战,需要深入剖析并寻找切实可行的解决方案。技术创新难题技...
信创操作系统   27  
  信创产业作为国家信息技术创新发展的重要领域,对于保障国家信息安全、推动产业升级具有关键意义。而国产芯片作为信创产业的核心基石,其研发进展备受关注。在信创国产芯片的研发征程中,面临着诸多复杂且艰巨的难点,这些难点犹如一道道关卡,阻碍着国产芯片的快速发展。然而,科研人员和相关企业并未退缩,积极探索并提出了一系列切实可行的解...
国产化替代产品目录   28  
热门文章
项目管理软件有哪些?
云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用