如何在 Python 中实现高效的无限素数生成器?

2024-12-06 08:40:00
摘要:问题描述:这不是作业,我只是好奇。这里的关键词是“无限”。我希望将其用作for p in primes()。我相信这是Haskell中的内置函数。因此,答案不能像“只需进行筛选”这样幼稚。首先,你不知道要消耗多少个连续素数。好吧,假设你一次可以编造 100 个素数。你会使用相同的筛选方法以及素数频率公式吗?我...




我希望将其用作for p in primes()。我相信这是Haskell中的内置函数。


首先,你不知道要消耗多少个连续素数。好吧,假设你一次可以编造 100 个素数。你会使用相同的筛选方法以及素数频率公式吗?



解决方案 1:




import itertools as it
def erat2a( ):
    D = {  }
    yield 2
    for q in it.islice(it.count(3), 0, None, 2):
        p = D.pop(q, None)
        if p is None:
            D[q*q] = q
            yield q
            # old code here:
            # x = p + q
            # while x in D or not (x&1):
            #     x += p
            # changed into:
            x = q + 2*p
            while x in D:
                x += 2*p
            D[x] = p

检查not (x&1)结果证实了x是奇数。但是,由于 q都是p奇数,因此通过添加2*p一半步骤以及奇数测试,可以避免。


如果不介意一些额外的花哨,erat2可以通过以下更改将速度提高 35-40%(注意:由于该itertools.compress功能,需要 Python 2.7+ 或 Python 3+):

import itertools as it
def erat3( ):
    D = { 9: 3, 25: 5 }
    yield 2
    yield 3
    yield 5
    MASK= 1, 0, 1, 1, 0, 1, 1, 0, 1, 0, 0, 1, 1, 0, 0,
    MODULOS= frozenset( (1, 7, 11, 13, 17, 19, 23, 29) )

    for q in it.compress(
            it.islice(it.count(7), 0, None, 2),
        p = D.pop(q, None)
        if p is None:
            D[q*q] = q
            yield q
            x = q + 2*p
            while x in D or (x%30) not in MODULOS:
                x += 2*p
            D[x] = p

erat3函数利用了这样一个事实:所有素数(2、3、5 除外)对 30 取模后只剩下八个数字:即MODULOS冻结集中包含的数字。因此,在得到最初的三个素数后,我们从 7 开始,处理候选数。

候选数过滤使用函数itertools.compress;“魔法”在于序列MASKMASK有 15 个元素(每 30 个数字中有 15 个奇数,由itertools.islice函数选择),每个可能的候选数都有1,从 7 开始。循环按照itertools.cycle函数指定的方式重复。

候选数过滤的引入需要另一项修改:检查or (x%30) not in MODULOSerat2算法处理所有奇数;现在erat3算法只处理 r30 个候选数,我们需要确保所有奇数都D.keys()只能是这样的 —假— 候选数。



在 Atom 330 Ubuntu 9.10 服务器上,版本 2.6.4 和 3.1.1+:

$ testit
up to 8192
==== python2 erat2 ====
100 loops, best of 3: 18.6 msec per loop
==== python2 erat2a ====
100 loops, best of 3: 14.5 msec per loop
==== python2 erat3 ====
Traceback (most recent call last):
AttributeError: 'module' object has no attribute 'compress'
==== python3 erat2 ====
100 loops, best of 3: 19.2 msec per loop
==== python3 erat2a ====
100 loops, best of 3: 14.1 msec per loop
==== python3 erat3 ====
100 loops, best of 3: 11.7 msec per loop

在 AMD Geode LX Gentoo 家庭服务器上,Python 2.6.5 和 3.1.2:

$ testit
up to 8192
==== python2 erat2 ====
10 loops, best of 3: 104 msec per loop
==== python2 erat2a ====
10 loops, best of 3: 81 msec per loop
==== python2 erat3 ====
Traceback (most recent call last):
AttributeError: 'module' object has no attribute 'compress'
==== python3 erat2 ====
10 loops, best of 3: 116 msec per loop
==== python3 erat2a ====
10 loops, best of 3: 82 msec per loop
==== python3 erat3 ====
10 loops, best of 3: 66 msec per loop



echo up to $max_num
for python_version in python2 python3
    for function in erat2 erat2a erat3
        echo "==== $python_version $function ===="
        $python_version -O -m timeit -c \n        -s  "import itertools as it, functools as ft, operator as op, primegen; cmp= ft.partial(op.ge, $max_num)" \n            "next(it.dropwhile(cmp, primegen.$function()))"

解决方案 2:

由于 OP 要求高效实施,因此David Eppstein/Alex Martelli对2002 年活跃状态代码进行了重大改进(见此处的回答):在候选者中看到素数的平方之前,不要在字典中记录素数的信息。对于生成的 n 个素数(π(sqrt(n log n)) ~ 2 sqrt(n log n) / log(n log n) ~ 2 sqrt(n / log n)),将空间复杂度降至 O ( sqrt ( n )) 以下,不是 O(n )。因此,时间复杂度也得到了改善,即运行速度更快。


from itertools import count
                                         # ideone.com/aVndFM
def postponed_sieve():                   # postponed sieve, by Will Ness      
    yield 2; yield 3; yield 5; yield 7;  # original code David Eppstein, 
    sieve = {}                           #   Alex Martelli, ActiveState Recipe 2002
    ps = postponed_sieve()               # a separate base Primes Supply:
    p = next(ps) and next(ps)            # (3) a Prime to add to dict
    q = p*p                              # (9) its sQuare 
    for c in count(9,2):                 # the Candidate
        if c in sieve:               # c's a multiple of some base prime
            s = sieve.pop(c)         #     i.e. a composite ; or
        elif c < q:  
             yield c                 # a prime
        else:   # (c==q):            # or the next base prime's square:
            s=count(q+2*p,2*p)       #    (9+6, by 6 : 15,21,27,33,...)
            p=next(ps)               #    (5)
            q=p*p                    #    (25)
        for m in s:                  # the next multiple 
            if m not in sieve:       # no duplicates
        sieve[m] = s                 # original test entry: ideone.com/WFv4f

(此处较旧的原始代码经过了编辑,以纳入如下Tim Peters的回答中所见的更改)。另请参阅此内容以了解相关讨论。

类似基于2-3-5-7 轮子的代码运行速度快约 2.15 倍(非常接近的理论改进3/2 * 5/4 * 7/6 = 2.1875)。

2022 年更新:我最近偶然发现了1990 年代的这个古老的“NESL” ,它实际上使用了相同的sqrt递归技巧。所以太阳底下没有新鲜事。:)

代码可以直接扩充,以直接从给定值开始素数枚举。这可以在这个基于 JS 的条目中看到。

解决方案 3:

为了方便后人,这里重写了Will Ness 的漂亮算法,用于 Python 3。需要进行一些更改(迭代器不再具有.next()方法,但有一个新的next()内置函数)。其他更改只是为了好玩(使用 newyield from <iterable>替换了原始中的四个yield语句。更多更改是为了提高可读性(我不喜欢过度使用 ;-) 1 个字母的变量名)。


def psieve():
    import itertools
    yield from (2, 3, 5, 7)
    D = {}
    ps = psieve()
    p = next(ps)
    assert p == 3
    psq = p*p
    for i in itertools.count(9, 2):
        if i in D:      # composite
            step = D.pop(i)
        elif i < psq:   # prime
            yield i
        else:           # composite, = p*p
            assert i == psq
            step = 2*p
            p = next(ps)
            psq = p*p
        i += step
        while i in D:
            i += step
        D[i] = step

解决方案 4:

这不是我的原始代码,但值得发布。原始代码可在此处找到:http: //code.activestate.com/recipes/117119/

def gen_primes():
  D = {}
  q = 2  # first integer to test for primality.

  while True:
    if q not in D:
      # not marked composite, must be prime  
      yield q 

      #first multiple of q not already marked
      D[q * q] = [q] 
      for p in D[q]:
        D.setdefault(p + q, []).append(p)
      # no longer need D[q], free memory
      del D[q]

    q += 1


primes = gen_primes()
for p in primes:
  print p

在我的桌面上,生成 100 万个素数并将其放入一个集合需要 1.62 秒。

解决方案 5:


对于每个段,将某个区间 [n; n + fragment_size) 内的数字表示为一个位集,并用上限平方根以下的所有素数进行筛选。


解决方案 6:


import itertools
def primeseq():
    prime = [2]
    num = 0
    yield 2
    for i in itertools.count(3, 2):
        is_prime = True
        for num in prime:
            if i % num == 0:
                is_prime = False
            elif num ** 2 > i: 
        if is_prime:
            yield i

解决方案 7:


import heapq

def heapprimegen():
    hp= []
    yield 2
    yield 3
    cn= 3
    nn, inc= 3, 6
    while 1:
        while cn < nn:
            yield cn
            heapq.heappush(hp, (3*cn, 2*cn))
            cn+= 2
        cn= nn+2
        nn, inc= heapq.heappushpop(hp, (nn+inc, inc))


解决方案 8:


此实现使用两个堆(tu 和 wv),它们包含相同数量的元素。每个元素都是一个 int 对。为了找到所有不超过q**2(其中q是素数)的素数,每个堆将包含最多 个2*pi(q-1)元素,其中pi(x)是不大于 的正素数的数量x。因此整数的总数最多为4*pi(floor(sqrt(n)))。(我们可以通过将一半的内容推送到堆中获得 2 的内存因子,但这会使算法变慢。)

上述其他基于字典和堆的方法(例如 erat2b、heap_prime_gen_squares 和 heapprimegen)存储大约 `2*pi(n)' 个整数,因为它们每次找到素数时都会扩展其堆或字典。作为比较:为了找到 1_000_000 个素数,此实现存储少于 4141 个整数,其他实现存储超过 1_000_000 个整数。

import heapq

def heap_prime_gen_smallmem():
    yield 2
    yield 3
    f = 5
    fmar3 = 2
    q = 7
    q6 = 7 * 6
    qmar3 = 4
    tu = [(25, 30), (35, 30)]
    vw = [(25, 30), (35, 30)]
    while True:
        qmar3 += 2   
        if qmar3 == 6:  
            qb = q + 4
            q6b = q6 + 24
            qmar3 = 2
            qb = q + 2
            q6b = q6 + 12
        if q < tu[0][0]:
            d = q * q
            while f < d:
                a, b = vw[0]
                if f < a: 
                    yield f   
                    a, b = vw[0]
                    heapq.heapreplace(vw, (a + b, b))
                    a, b = vw[0]
                    while f >= a:
                        heapq.heapreplace(vw, (a + b, b))
                        a, b = vw[0]   
                fmar3 += 2
                if fmar3 == 6:
                    f += 4
                    fmar3 = 2
                    f += 2
            c = q * qb   
            heapq.heappush(tu, (d, q6))
            heapq.heappush(tu, (c, q6))
            heapq.heappush(vw, (d, q6))
            heapq.heappush(vw, (c, q6))
            a, b = tu[0]
            heapq.heapreplace(tu, (a + b, b))
            a, b = tu[0]  
            while q >= a:
                heapq.heapreplace(tu, (a + b, b))
                a, b = tu[0]
        q = qb
        q6 = q6b

解决方案 9:

这是一个非常快的无限生成器,用 Python2 编写,但可以轻松调整到 Python3。要使用它来添加最多 10**9 的素数,请使用以下命令:

from itertools import takewhile
from functools import partial
from operator import gt
print (sum(takewhile(partial(gt, 10**9), prime_gen_inf())))

它是一种分段筛选法,速度更快,但显然不如 Will Ness 的算法优雅。

from operator import mul
from functools import reduce
def prod(x): return reduce(mul, x, 1)

def build_sieve(wheel):
    w = prod(wheel)
    w_phi = prod([p-1 for p in wheel])
    rems = [a for a in range(w) if all(a % p for p in wheel)]
    assert len(rems) == w_phi
    inv = {a:pow(a, w_phi - 1, w) for a in rems}
        known_p = wheel + rems[1 : rems.index(rems[1]*rems[1])]
    except ValueError:
        known_p = wheel + rems[1:]
    return wheel, w, w_phi, rems, inv, known_p

#Adjust the chunk variable based on your computer's architecture.
#Adjust the line with #! if you don't need "true" infinite.  If you don't need
#primes larger than 1<<32, use array('H', []), if 1<<64 use 'L', if 1<<128 (in
#Python3) use 'Q', otherwise use empty list [].
#To save memory, comment out the lines with #*, and uncomment the commented-out
import itertools
from itertools import islice, count, compress, izip
chain_f = itertools.chain.from_iterable
from array import array
def prime_gen_inf(chunk=250000, sieve_info = build_sieve([2,3,5,7])):
    """    Indefinitely yields primes    """
    wheel, w, w_phi, rems, inv, known_p = sieve_info
    for p in known_p: yield p
    new_n = 0;
    while True:
        size = min(chunk, (p * p - new_n) / w)
        sieve = bytearray([1]) * size * w_phi
        n, new_n = new_n, new_n + size * w
        if not n:
            zero = bytearray([0])
            seen = len(known_p) - len(wheel) + 1
            sieve[:seen:1] = zero * seen
            p_gen = islice(prime_gen_inf(), len(wheel), None)
            new_p = next(p_gen)
            ps = []                                         #! array('H', [])
            p_invs = bytearray([])                                         #*
        while new_p * new_p < new_n:
            p_invs.append(inv[new_p % w])                                  #*
            new_p = next(p_gen)
        for p, p_inv, modp in izip(ps, p_invs, [-n % p for p in ps]):      #*
            s = [(modp + p * (p_inv * (r - modp) % w)) / w for r in rems]  #*
        #for p in ps:
        #    s = [(-n%p + p * (inv[p%w] * (r - -n%p) % w)) / w for r in rems]
            for i, start in enumerate(s):
                slice_size = ((size - start - 1) / p + 1)
                sieve[i + start * w_phi :: p * w_phi] = zero * slice_size
        for p in compress(chain_f(izip(*[count(n+r, w) for r in rems])), sieve):
            yield p

解决方案 10:


import heapq

def heap_prime_gen_squares(): 
    yield 2  
    yield 3  
    h = [(9, 6)]
    n = 5
    while True:
        a, b = h[0]
        while n < a:
            yield n
            heapq.heappush(h, (n * n, n << 1))
            n += 2
        heapq.heapreplace(h, (a + b, b))  # Replace h[0], which is still (a, b).

我对前 100 万个素数的用户时间进行速度测量(数字越小越好):

  • postponed_sieve(基于字典):8.553 秒

  • erat2b(基于字典):9.513 秒

  • erat2a(基于字典):10.313 秒

  • heap_prime_gen_smallmem(基于堆):23.935s

  • heap_prime_gen_squares(基于堆):27.302s

  • heapprimegen(基于字典):145.029s


解决方案 11:

这是一个更接近 Haskell 实现方式的生成器:过滤已知素数的复合素数,然后将剩余素数添加到列表中。

def gen_primes():
    primes = []
    i = 2
    while True:
        prime = True
        for p in primes:
            if not (i % p):
                prime = False
        if prime:
            yield i
        i += 1

解决方案 12:


我使用整数来存储筛选的结果。在二进制格式中,整数是0s 和1s 的列表,0在位置处,i如果i不是素数,则为 ,1如果它可能是素数。必需的无穷大是由于 Python 3 整数是无界的。

def primes():
    container, size = 1 << 2, 3 # we start with 0b100 (from right to left: 0 and 1 are not primes, 2 is
    last_prime = 1
    while True:
        prime = next((j for j in range(last_prime+1, size) if container & 1 << j), None) # find the next prime
        while not prime:
            container, size = expand(container, size, 2**16) # add 65536 cells and sieve the container
            prime = next((j for j in range(last_prime+1, size) if container & 1 << j), None)
        yield prime
        last_prime = prime

1如何扩展容器?只需在容器左侧添加一堆s(二进制格式)并对其进行筛选即可。这与标准筛选法相同,但略有不同。在标准筛选法中,如果我们找到素数i,我们开始在 处交叉单元格i*i,步长为i

这里,这可能已经针对容器的第一部分完成了。如果容器新部分的开始位置比 远,我们只需从新部分的开头开始即可i*i

def expand(container, size, n):
    new_size = size + n
    container += (1 << (new_size + 1) - 1) - (1 << size) # add n 1's
    for i in range(2, new_size):
        if container & (1 << i): # i is a prime
            t = sum(1 << j for j in range(max(i, size // i)*i, new_size, i)) # set 1 for all mutiple
            container &= ~t # cross the cells

    return container, new_size


import itertools
assert 78498 == len(list(itertools.takewhile(lambda p: p<1000000, primes())))
