如何在 Python 中实现高效的无限素数生成器?

2024-12-06 08:40:00
admin
原创
81
摘要:问题描述:这不是作业,我只是好奇。这里的关键词是“无限”。我希望将其用作for p in primes()。我相信这是Haskell中的内置函数。因此,答案不能像“只需进行筛选”这样幼稚。首先,你不知道要消耗多少个连续素数。好吧,假设你一次可以编造 100 个素数。你会使用相同的筛选方法以及素数频率公式吗?我...

问题描述:

这不是作业,我只是好奇。

这里的关键词是“无限”。

我希望将其用作for p in primes()。我相信这是Haskell中的内置函数。

因此,答案不能像“只需进行筛选”这样幼稚。

首先,你不知道要消耗多少个连续素数。好吧,假设你一次可以编造 100 个素数。你会使用相同的筛选方法以及素数频率公式吗?

我更喜欢非并发方法。

感谢您的阅读(和写作;))!


解决方案 1:

“如果我看得更远……”

食谱中的功能erat2可以进一步加速(约20-25%):

erat2a

import itertools as it
def erat2a( ):
    D = {  }
    yield 2
    for q in it.islice(it.count(3), 0, None, 2):
        p = D.pop(q, None)
        if p is None:
            D[q*q] = q
            yield q
        else:
            # old code here:
            # x = p + q
            # while x in D or not (x&1):
            #     x += p
            # changed into:
            x = q + 2*p
            while x in D:
                x += 2*p
            D[x] = p

检查not (x&1)结果证实了x是奇数。但是,由于 q都是p奇数,因此通过添加2*p一半步骤以及奇数测试,可以避免。

时代3

如果不介意一些额外的花哨,erat2可以通过以下更改将速度提高 35-40%(注意:由于该itertools.compress功能,需要 Python 2.7+ 或 Python 3+):

import itertools as it
def erat3( ):
    D = { 9: 3, 25: 5 }
    yield 2
    yield 3
    yield 5
    MASK= 1, 0, 1, 1, 0, 1, 1, 0, 1, 0, 0, 1, 1, 0, 0,
    MODULOS= frozenset( (1, 7, 11, 13, 17, 19, 23, 29) )

    for q in it.compress(
            it.islice(it.count(7), 0, None, 2),
            it.cycle(MASK)):
        p = D.pop(q, None)
        if p is None:
            D[q*q] = q
            yield q
        else:
            x = q + 2*p
            while x in D or (x%30) not in MODULOS:
                x += 2*p
            D[x] = p

erat3函数利用了这样一个事实:所有素数(2、3、5 除外)对 30 取模后只剩下八个数字:即MODULOS冻结集中包含的数字。因此,在得到最初的三个素数后,我们从 7 开始,处理候选数。

候选数过滤使用函数itertools.compress;“魔法”在于序列MASKMASK有 15 个元素(每 30 个数字中有 15 个奇数,由itertools.islice函数选择),每个可能的候选数都有1,从 7 开始。循环按照itertools.cycle函数指定的方式重复。

候选数过滤的引入需要另一项修改:检查or (x%30) not in MODULOSerat2算法处理所有奇数;现在erat3算法只处理 r30 个候选数,我们需要确保所有奇数都D.keys()只能是这样的 —假— 候选数。

基准

结果

在 Atom 330 Ubuntu 9.10 服务器上,版本 2.6.4 和 3.1.1+:

$ testit
up to 8192
==== python2 erat2 ====
100 loops, best of 3: 18.6 msec per loop
==== python2 erat2a ====
100 loops, best of 3: 14.5 msec per loop
==== python2 erat3 ====
Traceback (most recent call last):
…
AttributeError: 'module' object has no attribute 'compress'
==== python3 erat2 ====
100 loops, best of 3: 19.2 msec per loop
==== python3 erat2a ====
100 loops, best of 3: 14.1 msec per loop
==== python3 erat3 ====
100 loops, best of 3: 11.7 msec per loop

在 AMD Geode LX Gentoo 家庭服务器上,Python 2.6.5 和 3.1.2:

$ testit
up to 8192
==== python2 erat2 ====
10 loops, best of 3: 104 msec per loop
==== python2 erat2a ====
10 loops, best of 3: 81 msec per loop
==== python2 erat3 ====
Traceback (most recent call last):
…
AttributeError: 'module' object has no attribute 'compress'
==== python3 erat2 ====
10 loops, best of 3: 116 msec per loop
==== python3 erat2a ====
10 loops, best of 3: 82 msec per loop
==== python3 erat3 ====
10 loops, best of 3: 66 msec per loop

基准代码

模块primegen.py包含erat2erat2aerat3函数。测试脚本如下:

#!/bin/sh
max_num=${1:-8192}
echo up to $max_num
for python_version in python2 python3
do
    for function in erat2 erat2a erat3
    do
        echo "==== $python_version $function ===="
        $python_version -O -m timeit -c \n        -s  "import itertools as it, functools as ft, operator as op, primegen; cmp= ft.partial(op.ge, $max_num)" \n            "next(it.dropwhile(cmp, primegen.$function()))"
    done
done

解决方案 2:

由于 OP 要求高效实施,因此David Eppstein/Alex Martelli对2002 年活跃状态代码进行了重大改进(见此处的回答):在候选者中看到素数的平方之前,不要在字典中记录素数的信息。对于生成的 n 个素数(π(sqrt(n log n)) ~ 2 sqrt(n log n) / log(n log n) ~ 2 sqrt(n / log n)),将空间复杂度降至 O ( sqrt ( n )) 以下,不是 O(n )。因此,时间复杂度也得到了改善,即运行速度更快。

创建一个“滑动筛”作为每个基素数的当前倍数(即,低于当前生产点的平方根)的字典,以及它们的步长值:

from itertools import count
                                         # ideone.com/aVndFM
def postponed_sieve():                   # postponed sieve, by Will Ness      
    yield 2; yield 3; yield 5; yield 7;  # original code David Eppstein, 
    sieve = {}                           #   Alex Martelli, ActiveState Recipe 2002
    ps = postponed_sieve()               # a separate base Primes Supply:
    p = next(ps) and next(ps)            # (3) a Prime to add to dict
    q = p*p                              # (9) its sQuare 
    for c in count(9,2):                 # the Candidate
        if c in sieve:               # c's a multiple of some base prime
            s = sieve.pop(c)         #     i.e. a composite ; or
        elif c < q:  
             yield c                 # a prime
             continue              
        else:   # (c==q):            # or the next base prime's square:
            s=count(q+2*p,2*p)       #    (9+6, by 6 : 15,21,27,33,...)
            p=next(ps)               #    (5)
            q=p*p                    #    (25)
        for m in s:                  # the next multiple 
            if m not in sieve:       # no duplicates
                break
        sieve[m] = s                 # original test entry: ideone.com/WFv4f

(此处较旧的原始代码经过了编辑,以纳入如下Tim Peters的回答中所见的更改)。另请参阅此内容以了解相关讨论。

类似基于2-3-5-7 轮子的代码运行速度快约 2.15 倍(非常接近的理论改进3/2 * 5/4 * 7/6 = 2.1875)。

2022 年更新:我最近偶然发现了1990 年代的这个古老的“NESL” ,它实际上使用了相同的sqrt递归技巧。所以太阳底下没有新鲜事。:)

代码可以直接扩充,以直接从给定值开始素数枚举。这可以在这个基于 JS 的条目中看到。

解决方案 3:

为了方便后人,这里重写了Will Ness 的漂亮算法,用于 Python 3。需要进行一些更改(迭代器不再具有.next()方法,但有一个新的next()内置函数)。其他更改只是为了好玩(使用 newyield from <iterable>替换了原始中的四个yield语句。更多更改是为了提高可读性(我不喜欢过度使用 ;-) 1 个字母的变量名)。

它比原来的速度快得多,但不是因为算法的原因。加速主要是因为删除了原来的add()函数,而是以内联方式执行。

def psieve():
    import itertools
    yield from (2, 3, 5, 7)
    D = {}
    ps = psieve()
    next(ps)
    p = next(ps)
    assert p == 3
    psq = p*p
    for i in itertools.count(9, 2):
        if i in D:      # composite
            step = D.pop(i)
        elif i < psq:   # prime
            yield i
            continue
        else:           # composite, = p*p
            assert i == psq
            step = 2*p
            p = next(ps)
            psq = p*p
        i += step
        while i in D:
            i += step
        D[i] = step

解决方案 4:

这不是我的原始代码,但值得发布。原始代码可在此处找到:http: //code.activestate.com/recipes/117119/

def gen_primes():
  D = {}
  q = 2  # first integer to test for primality.

  while True:
    if q not in D:
      # not marked composite, must be prime  
      yield q 

      #first multiple of q not already marked
      D[q * q] = [q] 
    else:
      for p in D[q]:
        D.setdefault(p + q, []).append(p)
      # no longer need D[q], free memory
      del D[q]

    q += 1

它是一个发电机,因此可以像其他发电机一样使用它。

primes = gen_primes()
for p in primes:
  print p

在我的桌面上,生成 100 万个素数并将其放入一个集合需要 1.62 秒。

解决方案 5:

进行分段筛选,其中段的大小由可用内存或位集的最大大小决定。

对于每个段,将某个区间 [n; n + fragment_size) 内的数字表示为一个位集,并用上限平方根以下的所有素数进行筛选。

使用位集比哈希表或树数据结构占用更少的内存,因为您处理的是密集的数字集。

解决方案 6:

另一种方法:

import itertools
def primeseq():
    prime = [2]
    num = 0
    yield 2
    for i in itertools.count(3, 2):
        is_prime = True
        for num in prime:
            if i % num == 0:
                is_prime = False
                break
            elif num ** 2 > i: 
                break
        if is_prime:
            prime.append(i)
            yield i

解决方案 7:

还有另一个答案,比我在这里的答案更节省内存erat3

import heapq

def heapprimegen():
    hp= []
    yield 2
    yield 3
    cn= 3
    nn, inc= 3, 6
    while 1:
        while cn < nn:
            yield cn
            heapq.heappush(hp, (3*cn, 2*cn))
            cn+= 2
        cn= nn+2
        nn, inc= heapq.heappushpop(hp, (nn+inc, inc))

它维护的是素数倍数的堆(列表),而不是字典。显然,这会损失一些速度。

解决方案 8:

这是一个复杂的基于堆的实现,它并不比其他基于堆的实现快多少(参见我的另一个答案中的速度比较),但它使用的内存要少得多。

此实现使用两个堆(tu 和 wv),它们包含相同数量的元素。每个元素都是一个 int 对。为了找到所有不超过q**2(其中q是素数)的素数,每个堆将包含最多 个2*pi(q-1)元素,其中pi(x)是不大于 的正素数的数量x。因此整数的总数最多为4*pi(floor(sqrt(n)))。(我们可以通过将一半的内容推送到堆中获得 2 的内存因子,但这会使算法变慢。)

上述其他基于字典和堆的方法(例如 erat2b、heap_prime_gen_squares 和 heapprimegen)存储大约 `2*pi(n)' 个整数,因为它们每次找到素数时都会扩展其堆或字典。作为比较:为了找到 1_000_000 个素数,此实现存储少于 4141 个整数,其他实现存储超过 1_000_000 个整数。

import heapq

def heap_prime_gen_smallmem():
    yield 2
    yield 3
    f = 5
    fmar3 = 2
    q = 7
    q6 = 7 * 6
    qmar3 = 4
    tu = [(25, 30), (35, 30)]
    vw = [(25, 30), (35, 30)]
    while True:
        qmar3 += 2   
        if qmar3 == 6:  
            qb = q + 4
            q6b = q6 + 24
            qmar3 = 2
        else:
            qb = q + 2
            q6b = q6 + 12
        if q < tu[0][0]:
            d = q * q
            while f < d:
                a, b = vw[0]
                if f < a: 
                    yield f   
                else:
                    a, b = vw[0]
                    heapq.heapreplace(vw, (a + b, b))
                    a, b = vw[0]
                    while f >= a:
                        heapq.heapreplace(vw, (a + b, b))
                        a, b = vw[0]   
                fmar3 += 2
                if fmar3 == 6:
                    f += 4
                    fmar3 = 2
                else:
                    f += 2
            c = q * qb   
            heapq.heappush(tu, (d, q6))
            heapq.heappush(tu, (c, q6))
            heapq.heappush(vw, (d, q6))
            heapq.heappush(vw, (c, q6))
        else:
            a, b = tu[0]
            heapq.heapreplace(tu, (a + b, b))
            a, b = tu[0]  
            while q >= a:
                heapq.heapreplace(tu, (a + b, b))
                a, b = tu[0]
        q = qb
        q6 = q6b

解决方案 9:

这是一个非常快的无限生成器,用 Python2 编写,但可以轻松调整到 Python3。要使用它来添加最多 10**9 的素数,请使用以下命令:

from itertools import takewhile
from functools import partial
from operator import gt
print (sum(takewhile(partial(gt, 10**9), prime_gen_inf())))

它是一种分段筛选法,速度更快,但显然不如 Will Ness 的算法优雅。

from operator import mul
from functools import reduce
def prod(x): return reduce(mul, x, 1)


def build_sieve(wheel):
    w = prod(wheel)
    w_phi = prod([p-1 for p in wheel])
    rems = [a for a in range(w) if all(a % p for p in wheel)]
    assert len(rems) == w_phi
    inv = {a:pow(a, w_phi - 1, w) for a in rems}
    try:
        known_p = wheel + rems[1 : rems.index(rems[1]*rems[1])]
    except ValueError:
        known_p = wheel + rems[1:]
    return wheel, w, w_phi, rems, inv, known_p

#Adjust the chunk variable based on your computer's architecture.
#
#Adjust the line with #! if you don't need "true" infinite.  If you don't need
#primes larger than 1<<32, use array('H', []), if 1<<64 use 'L', if 1<<128 (in
#Python3) use 'Q', otherwise use empty list [].
#To save memory, comment out the lines with #*, and uncomment the commented-out
#lines 
import itertools
from itertools import islice, count, compress, izip
chain_f = itertools.chain.from_iterable
from array import array
def prime_gen_inf(chunk=250000, sieve_info = build_sieve([2,3,5,7])):
    """    Indefinitely yields primes    """
    wheel, w, w_phi, rems, inv, known_p = sieve_info
    for p in known_p: yield p
    new_n = 0;
    while True:
        size = min(chunk, (p * p - new_n) / w)
        sieve = bytearray([1]) * size * w_phi
        n, new_n = new_n, new_n + size * w
        if not n:
            zero = bytearray([0])
            seen = len(known_p) - len(wheel) + 1
            sieve[:seen:1] = zero * seen
            p_gen = islice(prime_gen_inf(), len(wheel), None)
            new_p = next(p_gen)
            ps = []                                         #! array('H', [])
            p_invs = bytearray([])                                         #*
        while new_p * new_p < new_n:
            ps.append(new_p)
            p_invs.append(inv[new_p % w])                                  #*
            new_p = next(p_gen)
        for p, p_inv, modp in izip(ps, p_invs, [-n % p for p in ps]):      #*
            s = [(modp + p * (p_inv * (r - modp) % w)) / w for r in rems]  #*
        #for p in ps:
        #    s = [(-n%p + p * (inv[p%w] * (r - -n%p) % w)) / w for r in rems]
            for i, start in enumerate(s):
                slice_size = ((size - start - 1) / p + 1)
                sieve[i + start * w_phi :: p * w_phi] = zero * slice_size
        for p in compress(chain_f(izip(*[count(n+r, w) for r in rems])), sieve):
            yield p

解决方案 10:

这是一个简单但不太慢的方法,使用堆而不是字典:

import heapq

def heap_prime_gen_squares(): 
    yield 2  
    yield 3  
    h = [(9, 6)]
    n = 5
    while True:
        a, b = h[0]
        while n < a:
            yield n
            heapq.heappush(h, (n * n, n << 1))
            n += 2
        heapq.heapreplace(h, (a + b, b))  # Replace h[0], which is still (a, b).

我对前 100 万个素数的用户时间进行速度测量(数字越小越好):

  • postponed_sieve(基于字典):8.553 秒

  • erat2b(基于字典):9.513 秒

  • erat2a(基于字典):10.313 秒

  • heap_prime_gen_smallmem(基于堆):23.935s

  • heap_prime_gen_squares(基于堆):27.302s

  • heapprimegen(基于字典):145.029s

因此基于字典的方法似乎是最快的。

解决方案 11:

这是一个更接近 Haskell 实现方式的生成器:过滤已知素数的复合素数,然后将剩余素数添加到列表中。

def gen_primes():
    primes = []
    i = 2
    while True:
        prime = True
        for p in primes:
            if not (i % p):
                prime = False
                break
        if prime:
            yield i
            primes.append(i)
        i += 1

解决方案 12:

我知道这篇文章已经过时了,但我遇到了这个问题……下面的代码基于一个非常简单的想法:埃拉托斯特尼增长筛法。虽然这个解决方案比这里最好的解决方案慢,但它很容易掌握,而且设计得易于阅读……

我使用整数来存储筛选的结果。在二进制格式中,整数是0s 和1s 的列表,0在位置处,i如果i不是素数,则为 ,1如果它可能是素数。必需的无穷大是由于 Python 3 整数是无界的。

def primes():
    container, size = 1 << 2, 3 # we start with 0b100 (from right to left: 0 and 1 are not primes, 2 is
    last_prime = 1
    while True:
        prime = next((j for j in range(last_prime+1, size) if container & 1 << j), None) # find the next prime
        while not prime:
            container, size = expand(container, size, 2**16) # add 65536 cells and sieve the container
            prime = next((j for j in range(last_prime+1, size) if container & 1 << j), None)
        yield prime
        last_prime = prime

1如何扩展容器?只需在容器左侧添加一堆s(二进制格式)并对其进行筛选即可。这与标准筛选法相同,但略有不同。在标准筛选法中,如果我们找到素数i,我们开始在 处交叉单元格i*i,步长为i

这里,这可能已经针对容器的第一部分完成了。如果容器新部分的开始位置比 远,我们只需从新部分的开头开始即可i*i

def expand(container, size, n):
    new_size = size + n
    container += (1 << (new_size + 1) - 1) - (1 << size) # add n 1's
    for i in range(2, new_size):
        if container & (1 << i): # i is a prime
            t = sum(1 << j for j in range(max(i, size // i)*i, new_size, i)) # set 1 for all mutiple
            container &= ~t # cross the cells

    return container, new_size

测试一百万个素数:

import itertools
assert 78498 == len(list(itertools.takewhile(lambda p: p<1000000, primes())))
相关推荐
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   990  
  在项目管理领域,CDCP(Certified Data Center Professional)认证评审是一个至关重要的环节,它不仅验证了项目团队的专业能力,还直接关系到项目的成功与否。在这一评审过程中,沟通技巧的运用至关重要。有效的沟通不仅能够确保信息的准确传递,还能增强团队协作,提升评审效率。本文将深入探讨CDCP...
华为IPD流程   26  
  IPD(Integrated Product Development,集成产品开发)是一种以客户需求为核心、跨部门协同的产品开发模式,旨在通过高效的资源整合和流程优化,提升产品开发的成功率和市场竞争力。在IPD培训课程中,掌握关键成功因素是确保团队能够有效实施这一模式的核心。以下将从五个关键成功因素展开讨论,帮助企业和...
IPD项目流程图   27  
  华为IPD(Integrated Product Development,集成产品开发)流程是华为公司在其全球化进程中逐步构建和完善的一套高效产品开发管理体系。这一流程不仅帮助华为在技术创新和产品交付上实现了质的飞跃,还为其在全球市场中赢得了显著的竞争优势。IPD的核心在于通过跨部门协作、阶段性评审和市场需求驱动,确保...
华为IPD   26  
  华为作为全球领先的通信技术解决方案提供商,其成功的背后离不开一套成熟的管理体系——集成产品开发(IPD)。IPD不仅是一种产品开发流程,更是一种系统化的管理思想,它通过跨职能团队的协作、阶段评审机制和市场需求驱动的开发模式,帮助华为在全球市场中脱颖而出。从最初的国内市场到如今的全球化布局,华为的IPD体系在多个领域展现...
IPD管理流程   53  
热门文章
项目管理软件有哪些?
云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用