将 x 和 y 数组点的笛卡尔积转换为单个二维点数组

2024-11-26 08:37:00
admin
原创
182
摘要:问题描述:我有两个 numpy 数组,用于定义网格的 x 轴和 y 轴。例如:x = numpy.array([1,2,3]) y = numpy.array([4,5]) 我想生成这些数组的笛卡尔积来生成:array([[1,4],[2,4],[3,4],[1,5],[2,5],[3,5]]) 在某种程度上...

问题描述:

我有两个 numpy 数组,用于定义网格的 x 轴和 y 轴。例如:

x = numpy.array([1,2,3])
y = numpy.array([4,5])

我想生成这些数组的笛卡尔积来生成:

array([[1,4],[2,4],[3,4],[1,5],[2,5],[3,5]])

在某种程度上,这并不是非常低效,因为我需要在循环中多次执行此操作。我假设将它们转换为 Python 列表并使用itertools.product并返回到 numpy 数组不是最有效的形式。


解决方案 1:

cartesian_product几乎是规范的

有许多方法可以解决这个问题,它们具有不同的特性。有些方法比其他方法更快,有些方法更通用。经过大量测试和调整后,我发现以下函数(用于计算 n 维)对于许多输入都比大多数其他函数更快。对于一对稍微复杂一些但在许多情况下甚至更快的方法,请参阅Paul Panzercartesian_product的答案。

鉴于这个答案,这不再是我所知道的笛卡尔积最快的numpy实现。然而,我认为它的简单性将继续使其成为未来改进的有用基准:

def cartesian_product(*arrays):
    la = len(arrays)
    dtype = numpy.result_type(*arrays)
    arr = numpy.empty([len(a) for a in arrays] + [la], dtype=dtype)
    for i, a in enumerate(numpy.ix_(*arrays)):
        arr[...,i] = a
    return arr.reshape(-1, la)

值得一提的是,此函数的使用ix_方式不同寻常;而记录的用法ix_是生成数组中的索引 ,恰好具有相同形状的数组可用于广播分配。非常感谢mgilson,他启发我尝试使用这种方式,以及unutbu,他对这个答案提供了一些非常有用的反馈,包括使用 的建议。ix_`numpy.result_type`

值得注意的替代方案

有时按 Fortran 顺序写入连续的内存块会更快。这就是此替代方案的基础,cartesian_product_transpose在某些硬件上已被证明比 更快cartesian_product(见下文)。但是,使用相同原理的 Paul Panzer 的答案甚至更快。不过,我还是在这里为感兴趣的读者提供了这一点:

def cartesian_product_transpose(*arrays):
    broadcastable = numpy.ix_(*arrays)
    broadcasted = numpy.broadcast_arrays(*broadcastable)
    rows, cols = numpy.prod(broadcasted[0].shape), len(broadcasted)
    dtype = numpy.result_type(*arrays)

    out = numpy.empty(rows * cols, dtype=dtype)
    start, end = 0, rows
    for a in broadcasted:
        out[start:end] = a.reshape(-1)
        start, end = end, end + rows
    return out.reshape(cols, rows).T

在理解了 Panzer 的方法之后,我编写了一个新版本,它几乎和他的一样快,而且几乎一样简单cartesian_product

def cartesian_product_simple_transpose(arrays):
    la = len(arrays)
    dtype = numpy.result_type(*arrays)
    arr = numpy.empty([la] + [len(a) for a in arrays], dtype=dtype)
    for i, a in enumerate(numpy.ix_(*arrays)):
        arr[i, ...] = a
    return arr.reshape(la, -1).T

这似乎有一些恒定时间开销,使得它在小输入时运行速度比 Panzer 慢。但对于较大的输入,在我运行的所有测试中,它的表现与他最快的实现一样好(cartesian_product_transpose_pp)。

在以下部分中,我将介绍一些其他替代方案的测试。这些测试现在有些过时了,但是为了避免重复工作,我决定将它们保留在这里,以保留历史意义。有关最新测试,请参阅 Panzer 的回答以及Nico Schlömer的回答。

针对替代方案进行测试

以下是一系列测试,显示了其中一些函数相对于其他许多替代方案的性能提升。此处显示的所有测试均在四核机器上执行,运行 Mac OS 10.12.5、Python 3.6.1 和numpy1.12.1。众所周知,硬件和软件的变化会产生不同的结果,因此 YMMV。亲自运行这些测试以确保万无一失!

定义:

import numpy
import itertools
from functools import reduce

### Two-dimensional products ###

def repeat_product(x, y):
    return numpy.transpose([numpy.tile(x, len(y)), 
                            numpy.repeat(y, len(x))])

def dstack_product(x, y):
    return numpy.dstack(numpy.meshgrid(x, y)).reshape(-1, 2)

### Generalized N-dimensional products ###

def cartesian_product(*arrays):
    la = len(arrays)
    dtype = numpy.result_type(*arrays)
    arr = numpy.empty([len(a) for a in arrays] + [la], dtype=dtype)
    for i, a in enumerate(numpy.ix_(*arrays)):
        arr[...,i] = a
    return arr.reshape(-1, la)

def cartesian_product_transpose(*arrays):
    broadcastable = numpy.ix_(*arrays)
    broadcasted = numpy.broadcast_arrays(*broadcastable)
    rows, cols = numpy.prod(broadcasted[0].shape), len(broadcasted)
    dtype = numpy.result_type(*arrays)

    out = numpy.empty(rows * cols, dtype=dtype)
    start, end = 0, rows
    for a in broadcasted:
        out[start:end] = a.reshape(-1)
        start, end = end, end + rows
    return out.reshape(cols, rows).T

# from https://stackoverflow.com/a/1235363/577088

def cartesian_product_recursive(*arrays, out=None):
    arrays = [numpy.asarray(x) for x in arrays]
    dtype = arrays[0].dtype

    n = numpy.prod([x.size for x in arrays])
    if out is None:
        out = numpy.zeros([n, len(arrays)], dtype=dtype)

    m = n // arrays[0].size
    out[:,0] = numpy.repeat(arrays[0], m)
    if arrays[1:]:
        cartesian_product_recursive(arrays[1:], out=out[0:m,1:])
        for j in range(1, arrays[0].size):
            out[j*m:(j+1)*m,1:] = out[0:m,1:]
    return out

def cartesian_product_itertools(*arrays):
    return numpy.array(list(itertools.product(*arrays)))

### Test code ###

name_func = [('repeat_product',                                                 
              repeat_product),                                                  
             ('dstack_product',                                                 
              dstack_product),                                                  
             ('cartesian_product',                                              
              cartesian_product),                                               
             ('cartesian_product_transpose',                                    
              cartesian_product_transpose),                                     
             ('cartesian_product_recursive',                           
              cartesian_product_recursive),                            
             ('cartesian_product_itertools',                                    
              cartesian_product_itertools)]

def test(in_arrays, test_funcs):
    global func
    global arrays
    arrays = in_arrays
    for name, func in test_funcs:
        print('{}:'.format(name))
        %timeit func(*arrays)

def test_all(*in_arrays):
    test(in_arrays, name_func)

# `cartesian_product_recursive` throws an 
# unexpected error when used on more than
# two input arrays, so for now I've removed
# it from these tests.

def test_cartesian(*in_arrays):
    test(in_arrays, name_func[2:4] + name_func[-1:])

x10 = [numpy.arange(10)]
x50 = [numpy.arange(50)]
x100 = [numpy.arange(100)]
x500 = [numpy.arange(500)]
x1000 = [numpy.arange(1000)]

测试结果:

In [2]: test_all(*(x100 * 2))
repeat_product:
67.5 µs ± 633 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
dstack_product:
67.7 µs ± 1.09 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)
cartesian_product:
33.4 µs ± 558 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
cartesian_product_transpose:
67.7 µs ± 932 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
cartesian_product_recursive:
215 µs ± 6.01 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
cartesian_product_itertools:
3.65 ms ± 38.7 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

In [3]: test_all(*(x500 * 2))
repeat_product:
1.31 ms ± 9.28 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
dstack_product:
1.27 ms ± 7.5 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
cartesian_product:
375 µs ± 4.5 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
cartesian_product_transpose:
488 µs ± 8.88 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
cartesian_product_recursive:
2.21 ms ± 38.4 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
cartesian_product_itertools:
105 ms ± 1.17 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

In [4]: test_all(*(x1000 * 2))
repeat_product:
10.2 ms ± 132 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
dstack_product:
12 ms ± 120 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
cartesian_product:
4.75 ms ± 57.1 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
cartesian_product_transpose:
7.76 ms ± 52.7 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
cartesian_product_recursive:
13 ms ± 209 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
cartesian_product_itertools:
422 ms ± 7.77 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

在所有情况下,cartesian_product如本答案开头所定义的那样,都是最快的。

对于那些接受任意数量输入数组的函数,也值得检查一下性能len(arrays) > 2。(在我能够确定为什么cartesian_product_recursive在这种情况下会抛出错误之前,我已将其从这些测试中删除。)

In [5]: test_cartesian(*(x100 * 3))
cartesian_product:
8.8 ms ± 138 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
cartesian_product_transpose:
7.87 ms ± 91.5 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
cartesian_product_itertools:
518 ms ± 5.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [6]: test_cartesian(*(x50 * 4))
cartesian_product:
169 ms ± 5.1 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
cartesian_product_transpose:
184 ms ± 4.32 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
cartesian_product_itertools:
3.69 s ± 73.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [7]: test_cartesian(*(x10 * 6))
cartesian_product:
26.5 ms ± 449 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
cartesian_product_transpose:
16 ms ± 133 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
cartesian_product_itertools:
728 ms ± 16 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [8]: test_cartesian(*(x10 * 7))
cartesian_product:
650 ms ± 8.14 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
cartesian_product_transpose:
518 ms ± 7.09 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
cartesian_product_itertools:
8.13 s ± 122 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

正如这些测试所示,cartesian_product在输入数组数量超过(大约)四个之前,它一直保持竞争力。此后,cartesian_product_transpose它确实略有优势。

值得重申的是,使用其他硬件和操作系统的用户可能会看到不同的结果。例如,unutbu 报告称,使用 Ubuntu 14.04、Python 3.4.3 和numpy1.14.0.dev0+b7050a9 进行这些测试时看到以下结果:

>>> %timeit cartesian_product_transpose(x500, y500) 
1000 loops, best of 3: 682 µs per loop
>>> %timeit cartesian_product(x500, y500)
1000 loops, best of 3: 1.55 ms per loop

下面,我将详细介绍我之前按照这些思路进行的测试。这些方法的相对性能随着时间的推移而发生变化,适用于不同的硬件和不同版本的 Python 和numpy。虽然它对于使用最新版本的人来说并不是立即有用的numpy,但它说明了自此答案的第一个版本以来情况发生了怎样的变化。

一个简单的替代方案:meshgrid+dstack

目前接受的答案使用tilerepeat将两个数组一起广播。但该函数实际上做了同样的事情。这是传递给转置之前和的meshgrid输出:tile`repeat`

In [1]: import numpy
In [2]: x = numpy.array([1,2,3])
   ...: y = numpy.array([4,5])
   ...: 

In [3]: [numpy.tile(x, len(y)), numpy.repeat(y, len(x))]
Out[3]: [array([1, 2, 3, 1, 2, 3]), array([4, 4, 4, 5, 5, 5])]

下面是输出meshgrid

In [4]: numpy.meshgrid(x, y)
Out[4]: 
[array([[1, 2, 3],
        [1, 2, 3]]), array([[4, 4, 4],
        [5, 5, 5]])]

如您所见,它几乎完全相同。我们只需要重塑结果即可获得完全相同的结果。

In [5]: xt, xr = numpy.meshgrid(x, y)
   ...: [xt.ravel(), xr.ravel()]
Out[5]: [array([1, 2, 3, 1, 2, 3]), array([4, 4, 4, 5, 5, 5])]

不过,我们可以先将输出传递给meshgridto dstack,然后再进行重塑,而不是在此时进行重塑,这样可以节省一些工作:

In [6]: numpy.dstack(numpy.meshgrid(x, y)).reshape(-1, 2)
Out[6]: 
array([[1, 4],
       [2, 4],
       [3, 4],
       [1, 5],
       [2, 5],
       [3, 5]])

与此评论中的说法相反,我没有看到任何证据表明不同的输入会产生不同形状的输出,并且如上所述,它们的作用非常相似,所以如果它们确实如此,那就很奇怪了。如果您找到反例,请告诉我。

测试meshgrid+dstackrepeat+transpose

meshgrid这两种方法的相对性能随着时间的推移而发生了变化。在早期版本的 Python (2.7) 中,对于较小的输入,使用+ 的结果dstack明显更快。(请注意,这些测试来自此答案的旧版本。)定义:

>>> def repeat_product(x, y):
...     return numpy.transpose([numpy.tile(x, len(y)), 
                                numpy.repeat(y, len(x))])
...
>>> def dstack_product(x, y):
...     return numpy.dstack(numpy.meshgrid(x, y)).reshape(-1, 2)
...     

对于中等大小的输入,我看到了显著的速度提升。但我numpy在较新的机器上使用较新版本的 Python (3.6.1) 和 (1.12.1) 重新尝试了这些测试。这两种方法现在几乎完全相同。

旧测试

>>> x, y = numpy.arange(500), numpy.arange(500)
>>> %timeit repeat_product(x, y)
10 loops, best of 3: 62 ms per loop
>>> %timeit dstack_product(x, y)
100 loops, best of 3: 12.2 ms per loop

新测试

In [7]: x, y = numpy.arange(500), numpy.arange(500)
In [8]: %timeit repeat_product(x, y)
1.32 ms ± 24.7 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
In [9]: %timeit dstack_product(x, y)
1.26 ms ± 8.47 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

与往常一样,YMMV,但这表明在最新版本的 Python 和 numpy 中,它们是可以互换的。

广义产品功能

一般来说,我们可能认为使用内置函数对于小输入会更快,而对于大输入,专用函数可能会更快。此外,对于广义的 n 维乘积,tilerepeat不会有帮助,因为它们没有明确的高维类似物。因此,也值得研究专用函数的行为。

大多数相关测试都出现在这个答案的开头,但这里是在早期版本的 Python 上执行的一些测试,numpy以供比较。

另一个答案cartesian中定义的函数对于较大的输入表现相当好。(它与上面调用的函数相同。)为了与进行比较,我们只使用两个维度。cartesian_product_recursive`cartesian`dstack_prodct

旧测试再次显示出显著的差异,而新测试几乎没有显示出任何差异。

旧测试

>>> x, y = numpy.arange(1000), numpy.arange(1000)
>>> %timeit cartesian([x, y])
10 loops, best of 3: 25.4 ms per loop
>>> %timeit dstack_product(x, y)
10 loops, best of 3: 66.6 ms per loop

新测试

In [10]: x, y = numpy.arange(1000), numpy.arange(1000)
In [11]: %timeit cartesian([x, y])
12.1 ms ± 199 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
In [12]: %timeit dstack_product(x, y)
12.7 ms ± 334 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

与以前一样,在较小规模上dstack_product仍然有效。cartesian

新测试未显示多余的旧测试

In [13]: x, y = numpy.arange(100), numpy.arange(100)
In [14]: %timeit cartesian([x, y])
215 µs ± 4.75 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
In [15]: %timeit dstack_product(x, y)
65.7 µs ± 1.15 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)

我认为这些区别很有趣,值得记录下来;但最终它们只是纸上谈兵。正如本答案开头的测试所示,所有这些版本几乎总是比cartesian_product本答案开头定义的 慢——而这本身比这个问题答案中最快的实现要慢一点。

解决方案 2:

>>> numpy.transpose([numpy.tile(x, len(y)), numpy.repeat(y, len(x))])
array([[1, 4],
       [2, 4],
       [3, 4],
       [1, 5],
       [2, 5],
       [3, 5]])

请参阅使用 numpy 构建两个数组的所有组合的数组,了解计算 N 个数组的笛卡尔积的一般解决方案。

解决方案 3:

你可以在 python 中进行正常的列表理解

x = numpy.array([1,2,3])
y = numpy.array([4,5])
[[x0, y0] for x0 in x for y0 in y]

这应该给你

[[1, 4], [1, 5], [2, 4], [2, 5], [3, 4], [3, 5]]

解决方案 4:

我也对此很感兴趣,并做了一些性能比较,可能比@senderle 的回答更清楚一些。

对于两个数组(经典情况):

在此处输入图片描述

对于四个阵列:

在此处输入图片描述

(请注意,这里数组的长度只有几十个条目。)


重现情节的代码:

from functools import reduce
import itertools
import numpy
import perfplot


def dstack_product(arrays):
    return numpy.dstack(numpy.meshgrid(*arrays, indexing="ij")).reshape(-1, len(arrays))


# Generalized N-dimensional products
def cartesian_product(arrays):
    la = len(arrays)
    dtype = numpy.find_common_type([a.dtype for a in arrays], [])
    arr = numpy.empty([len(a) for a in arrays] + [la], dtype=dtype)
    for i, a in enumerate(numpy.ix_(*arrays)):
        arr[..., i] = a
    return arr.reshape(-1, la)


def cartesian_product_transpose(arrays):
    broadcastable = numpy.ix_(*arrays)
    broadcasted = numpy.broadcast_arrays(*broadcastable)
    rows, cols = reduce(numpy.multiply, broadcasted[0].shape), len(broadcasted)
    dtype = numpy.find_common_type([a.dtype for a in arrays], [])

    out = numpy.empty(rows * cols, dtype=dtype)
    start, end = 0, rows
    for a in broadcasted:
        out[start:end] = a.reshape(-1)
        start, end = end, end + rows
    return out.reshape(cols, rows).T


# from https://stackoverflow.com/a/1235363/577088
def cartesian_product_recursive(arrays, out=None):
    arrays = [numpy.asarray(x) for x in arrays]
    dtype = arrays[0].dtype

    n = numpy.prod([x.size for x in arrays])
    if out is None:
        out = numpy.zeros([n, len(arrays)], dtype=dtype)

    m = n // arrays[0].size
    out[:, 0] = numpy.repeat(arrays[0], m)
    if arrays[1:]:
        cartesian_product_recursive(arrays[1:], out=out[0:m, 1:])
        for j in range(1, arrays[0].size):
            out[j * m : (j + 1) * m, 1:] = out[0:m, 1:]
    return out


def cartesian_product_itertools(arrays):
    return numpy.array(list(itertools.product(*arrays)))


perfplot.show(
    setup=lambda n: 2 * (numpy.arange(n, dtype=float),),
    n_range=[2 ** k for k in range(13)],
    # setup=lambda n: 4 * (numpy.arange(n, dtype=float),),
    # n_range=[2 ** k for k in range(6)],
    kernels=[
        dstack_product,
        cartesian_product,
        cartesian_product_transpose,
        cartesian_product_recursive,
        cartesian_product_itertools,
    ],
    logx=True,
    logy=True,
    xlabel="len(a), len(b)",
    equality_check=None,
)

解决方案 5:

在@senderle 的示范基础工作的基础上,我提出了两个版本 - 一个用于 C,一个用于 Fortran 布局 - 通常速度更快一些。

  • cartesian_product_transpose_pp`cartesian_product_transpose与使用完全不同策略的@senderle 不同-cartesion_product`该版本的一个版本使用更有利的转置内存布局+一些非常小的优化。

  • cartesian_product_pp坚持使用原始内存布局。它之所以速度快是因为它使用连续复制。事实证明,连续复制要快得多,以至于复制整个内存块(即使其中只有一部分包含有效数据)比仅复制有效位更可取。

一些性能图。我为 C 和 Fortran 布局制作了单独的性能图,因为在我看来,它们是不同的任务。

以“pp”结尾的名字是我的方法。

1)许多微小的因素(每个因素 2 个元素)

在此处输入图片描述在此处输入图片描述

2)许多小因素(每个因素 4 个)

在此处输入图片描述在此处输入图片描述

3)三个等长因子

在此处输入图片描述在此处输入图片描述

4) 两个相等长度的因子

在此处输入图片描述在此处输入图片描述

代码(需要对每个图进行单独运行,因为我不知道如何重置;还需要适当地编辑/注释/删除):

import numpy
import numpy as np
from functools import reduce
import itertools
import timeit
import perfplot

def dstack_product(arrays):
    return numpy.dstack(
        numpy.meshgrid(*arrays, indexing='ij')
        ).reshape(-1, len(arrays))

def cartesian_product_transpose_pp(arrays):
    la = len(arrays)
    dtype = numpy.result_type(*arrays)
    arr = numpy.empty((la, *map(len, arrays)), dtype=dtype)
    idx = slice(None), *itertools.repeat(None, la)
    for i, a in enumerate(arrays):
        arr[i, ...] = a[idx[:la-i]]
    return arr.reshape(la, -1).T

def cartesian_product(arrays):
    la = len(arrays)
    dtype = numpy.result_type(*arrays)
    arr = numpy.empty([len(a) for a in arrays] + [la], dtype=dtype)
    for i, a in enumerate(numpy.ix_(*arrays)):
        arr[...,i] = a
    return arr.reshape(-1, la)

def cartesian_product_transpose(arrays):
    broadcastable = numpy.ix_(*arrays)
    broadcasted = numpy.broadcast_arrays(*broadcastable)
    rows, cols = numpy.prod(broadcasted[0].shape), len(broadcasted)
    dtype = numpy.result_type(*arrays)

    out = numpy.empty(rows * cols, dtype=dtype)
    start, end = 0, rows
    for a in broadcasted:
        out[start:end] = a.reshape(-1)
        start, end = end, end + rows
    return out.reshape(cols, rows).T

from itertools import accumulate, repeat, chain

def cartesian_product_pp(arrays, out=None):
    la = len(arrays)
    L = *map(len, arrays), la
    dtype = numpy.result_type(*arrays)
    arr = numpy.empty(L, dtype=dtype)
    arrs = *accumulate(chain((arr,), repeat(0, la-1)), np.ndarray.__getitem__),
    idx = slice(None), *itertools.repeat(None, la-1)
    for i in range(la-1, 0, -1):
        arrs[i][..., i] = arrays[i][idx[:la-i]]
        arrs[i-1][1:] = arrs[i]
    arr[..., 0] = arrays[0][idx]
    return arr.reshape(-1, la)

def cartesian_product_itertools(arrays):
    return numpy.array(list(itertools.product(*arrays)))


# from https://stackoverflow.com/a/1235363/577088
def cartesian_product_recursive(arrays, out=None):
    arrays = [numpy.asarray(x) for x in arrays]
    dtype = arrays[0].dtype

    n = numpy.prod([x.size for x in arrays])
    if out is None:
        out = numpy.zeros([n, len(arrays)], dtype=dtype)

    m = n // arrays[0].size
    out[:, 0] = numpy.repeat(arrays[0], m)
    if arrays[1:]:
        cartesian_product_recursive(arrays[1:], out=out[0:m, 1:])
        for j in range(1, arrays[0].size):
            out[j*m:(j+1)*m, 1:] = out[0:m, 1:]
    return out

### Test code ###
if False:
  perfplot.save('cp_4el_high.png',
    setup=lambda n: n*(numpy.arange(4, dtype=float),),
                n_range=list(range(6, 11)),
    kernels=[
        dstack_product,
        cartesian_product_recursive,
        cartesian_product,
#        cartesian_product_transpose,
        cartesian_product_pp,
#        cartesian_product_transpose_pp,
        ],
    logx=False,
    logy=True,
    xlabel='#factors',
    equality_check=None
    )
else:
  perfplot.save('cp_2f_T.png',
    setup=lambda n: 2*(numpy.arange(n, dtype=float),),
    n_range=[2**k for k in range(5, 11)],
    kernels=[
#        dstack_product,
#        cartesian_product_recursive,
#        cartesian_product,
        cartesian_product_transpose,
#        cartesian_product_pp,
        cartesian_product_transpose_pp,
        ],
    logx=True,
    logy=True,
    xlabel='length of each factor',
    equality_check=None
    )

解决方案 6:

自 2017 年 10 月起,numpy 现在具有一个np.stack采用 axis 参数的通用函数。使用它,我们可以使用“dstack 和 meshgrid”技术获得“广义笛卡尔积”:

import numpy as np

def cartesian_product(*arrays):
    ndim = len(arrays)
    return (np.stack(np.meshgrid(*arrays), axis=-1)
              .reshape(-1, ndim))

a = np.array([1,2])
b = np.array([10,20])
cartesian_product(a,b)

# output:
# array([[ 1, 10],
#        [ 2, 10],
#        [ 1, 20],
#        [ 2, 20]])  

注意参数axis=-1。这是结果中的最后一个(最内层)轴。它相当于使用axis=ndim

另一条评论,由于笛卡尔积很快就会膨胀,除非我们因为某种原因需要itertools在内存中实现数组,否则如果乘积非常大,我们可能希望即时利用和使用这些值。

解决方案 7:

Scikit-learn包可以快速实现这一点:

from sklearn.utils.extmath import cartesian
product = cartesian((x,y))

请注意,如果您关心输出的顺序,此实现的约定与您想要的不同。对于您的确切顺序,您可以这样做

product = cartesian((y,x))[:, ::-1]

解决方案 8:

我使用了@kennytm 的答案一段时间,但当我尝试在 TensorFlow 中执行相同操作时,却发现 TensorFlow 没有等效的numpy.repeat()。经过一些实验,我想我找到了一个更通用的任意点向量解决方案。

对于numpy:

import numpy as np

def cartesian_product(*args: np.ndarray) -> np.ndarray:
    """
    Produce the cartesian product of arbitrary length vectors.

    Parameters
    ----------
    np.ndarray args
        vector of points of interest in each dimension

    Returns
    -------
    np.ndarray
        the cartesian product of size [m x n] wherein:
            m = prod([len(a) for a in args])
            n = len(args)
    """
    for i, a in enumerate(args):
        assert a.ndim == 1, "arg {:d} is not rank 1".format(i)
    return np.concatenate([np.reshape(xi, [-1, 1]) for xi in np.meshgrid(*args)], axis=1)

对于 TensorFlow:

import tensorflow as tf

def cartesian_product(*args: tf.Tensor) -> tf.Tensor:
    """
    Produce the cartesian product of arbitrary length vectors.

    Parameters
    ----------
    tf.Tensor args
        vector of points of interest in each dimension

    Returns
    -------
    tf.Tensor
        the cartesian product of size [m x n] wherein:
            m = prod([len(a) for a in args])
            n = len(args)
    """
    for i, a in enumerate(args):
        tf.assert_rank(a, 1, message="arg {:d} is not rank 1".format(i))
    return tf.concat([tf.reshape(xi, [-1, 1]) for xi in tf.meshgrid(*args)], axis=1)

解决方案 9:

更一般地,如果你有两个二维 numpy 数组 a 和 b,并且你想要将 a 的每一行与 b 的每一行连接起来(行的笛卡尔积,有点像数据库中的连接),你可以使用这种方法:

import numpy
def join_2d(a, b):
    assert a.dtype == b.dtype
    a_part = numpy.tile(a, (len(b), 1))
    b_part = numpy.repeat(b, len(a), axis=0)
    return numpy.hstack((a_part, b_part))

解决方案 10:

最快的方法是将生成器表达式与 map 函数结合起来:

import numpy
import datetime
a = np.arange(1000)
b = np.arange(200)

start = datetime.datetime.now()

foo = (item for sublist in [list(map(lambda x: (x,i),a)) for i in b] for item in sublist)

print (list(foo))

print ('execution time: {} s'.format((datetime.datetime.now() - start).total_seconds()))

输出(实际上打印的是整个结果列表):

[(0, 0), (1, 0), ...,(998, 199), (999, 199)]
execution time: 1.253567 s

或者使用双生成器表达式:

a = np.arange(1000)
b = np.arange(200)

start = datetime.datetime.now()

foo = ((x,y) for x in a for y in b)

print (list(foo))

print ('execution time: {} s'.format((datetime.datetime.now() - start).total_seconds()))

输出(打印整个列表):

[(0, 0), (1, 0), ...,(998, 199), (999, 199)]
execution time: 1.187415 s

考虑到大部分计算时间都用于打印命令。否则,生成器计算效率相当高。不打印时,计算时间为:

execution time: 0.079208 s

对于生成器表达式+map函数和:

execution time: 0.007093 s

用于双重生成器表达式。

如果你真正想要的是计算每个坐标对的实际乘积,最快的方法是将其作为 numpy 矩阵乘积来解决:

a = np.arange(1000)
b = np.arange(200)

start = datetime.datetime.now()

foo = np.dot(np.asmatrix([[i,0] for i in a]), np.asmatrix([[i,0] for i in b]).T)

print (foo)

print ('execution time: {} s'.format((datetime.datetime.now() - start).total_seconds()))

输出:

 [[     0      0      0 ...,      0      0      0]
 [     0      1      2 ...,    197    198    199]
 [     0      2      4 ...,    394    396    398]
 ..., 
 [     0    997   1994 ..., 196409 197406 198403]
 [     0    998   1996 ..., 196606 197604 198602]
 [     0    999   1998 ..., 196803 197802 198801]]
execution time: 0.003869 s

并且不打印(在这种情况下它不会节省太多,因为实际上只打印出了矩阵的一小部分):

execution time: 0.003083 s

解决方案 11:

这也可以通过使用 itertools.product 方法轻松完成

from itertools import product
import numpy as np

x = np.array([1, 2, 3])
y = np.array([4, 5])
cart_prod = np.array(list(product(*[x, y])),dtype='int32')

结果:array([
[1, 4],

[1, 5],

[2, 4], [

2, 5], [

3, 4],

[3, 5]], dtype=int32)

执行时间:0.000155 秒

解决方案 12:

在特定情况下,您需要对每对执行简单的操作(例如加法),您可以引入一个额外的维度并让广播完成这项工作:

>>> a, b = np.array([1,2,3]), np.array([10,20,30])
>>> a[None,:] + b[:,None]
array([[11, 12, 13],
       [21, 22, 23],
       [31, 32, 33]])

我不确定是否有任何类似的方法可以真正获得这些对。

解决方案 13:

我来晚了,但我遇到了这个问题的一个棘手的变体。假设我想要几个数组的笛卡尔积,但这个笛卡尔积最终比计算机的内存大得多(但是,使用该乘积进行的计算很快,或者至少可以并行化)。

显而易见的解决方案是将这个笛卡尔积分成块,然后一个接一个地处理这些块(以某种“流式”方式)。您可以使用 轻松完成此操作itertools.product,但速度非常慢。此外,这里提出的解决方案(尽管它们很快)都没有给我们这种可能性。我提出的解决方案使用 Numba,比这里提到的“规范”稍快 cartesian_product 。它相当长,因为我尝试在任何地方对其进行优化。

import numba as nb
import numpy as np
from typing import List


@nb.njit(nb.types.Tuple((nb.int32[:, :],
                         nb.int32[:]))(nb.int32[:],
                                       nb.int32[:],
                                       nb.int64, nb.int64))
def cproduct(sizes: np.ndarray, current_tuple: np.ndarray, start_idx: int, end_idx: int):
    """Generates ids tuples from start_id to end_id"""
    assert len(sizes) >= 2
    assert start_idx < end_idx

    tuples = np.zeros((end_idx - start_idx, len(sizes)), dtype=np.int32)
    tuple_idx = 0
    # stores the current combination
    current_tuple = current_tuple.copy()
    while tuple_idx < end_idx - start_idx:
        tuples[tuple_idx] = current_tuple
        current_tuple[0] += 1
        # using a condition here instead of including this in the inner loop
        # to gain a bit of speed: this is going to be tested each iteration,
        # and starting a loop to have it end right away is a bit silly
        if current_tuple[0] == sizes[0]:
            # the reset to 0 and subsequent increment amount to carrying
            # the number to the higher "power"
            current_tuple[0] = 0
            current_tuple[1] += 1
            for i in range(1, len(sizes) - 1):
                if current_tuple[i] == sizes[i]:
                    # same as before, but in a loop, since this is going
                    # to get called less often
                    current_tuple[i + 1] += 1
                    current_tuple[i] = 0
                else:
                    break
        tuple_idx += 1
    return tuples, current_tuple


def chunked_cartesian_product_ids(sizes: List[int], chunk_size: int):
    """Just generates chunks of the cartesian product of the ids of each
    input arrays (thus, we just need their sizes here, not the actual arrays)"""
    prod = np.prod(sizes)

    # putting the largest number at the front to more efficiently make use
    # of the cproduct numba function
    sizes = np.array(sizes, dtype=np.int32)
    sorted_idx = np.argsort(sizes)[::-1]
    sizes = sizes[sorted_idx]
    if chunk_size > prod:
        chunk_bounds = (np.array([0, prod])).astype(np.int64)
    else:
        num_chunks = np.maximum(np.ceil(prod / chunk_size), 2).astype(np.int32)
        chunk_bounds = (np.arange(num_chunks + 1) * chunk_size).astype(np.int64)
        chunk_bounds[-1] = prod
    current_tuple = np.zeros(len(sizes), dtype=np.int32)
    for start_idx, end_idx in zip(chunk_bounds[:-1], chunk_bounds[1:]):
        tuples, current_tuple = cproduct(sizes, current_tuple, start_idx, end_idx)
        # re-arrange columns to match the original order of the sizes list
        # before yielding
        yield tuples[:, np.argsort(sorted_idx)]


def chunked_cartesian_product(*arrays, chunk_size=2 ** 25):
    """Returns chunks of the full cartesian product, with arrays of shape
    (chunk_size, n_arrays). The last chunk will obviously have the size of the
    remainder"""
    array_lengths = [len(array) for array in arrays]
    for array_ids_chunk in chunked_cartesian_product_ids(array_lengths, chunk_size):
        slices_lists = [arrays[i][array_ids_chunk[:, i]] for i in range(len(arrays))]
        yield np.vstack(slices_lists).swapaxes(0,1)


def cartesian_product(*arrays):
    """Actual cartesian product, not chunked, still fast"""
    total_prod = np.prod([len(array) for array in arrays])
    return next(chunked_cartesian_product(*arrays, total_prod))


a = np.arange(0, 3)
b = np.arange(8, 10)
c = np.arange(13, 16)
for cartesian_tuples in chunked_cartesian_product(*[a, b, c], chunk_size=5):
    print(cartesian_tuples)

这将以 5 个 3 元组为块输出笛卡尔积:

[[ 0  8 13]
 [ 0  8 14]
 [ 0  8 15]
 [ 1  8 13]
 [ 1  8 14]]
[[ 1  8 15]
 [ 2  8 13]
 [ 2  8 14]
 [ 2  8 15]
 [ 0  9 13]]
[[ 0  9 14]
 [ 0  9 15]
 [ 1  9 13]
 [ 1  9 14]
 [ 1  9 15]]
[[ 2  9 13]
 [ 2  9 14]
 [ 2  9 15]]

如果您愿意理解这里所做的事情,那么该njitted函数背后的直觉就是在一个奇怪的数字基数中枚举每个“数字”,其元素由输入数组的大小组成(而不是常规二进制、十进制或十六进制基数中的相同数字)。

显然,这种解决方案对于大型产品来说很有吸引力。对于小型产品来说,开销可能会有点大。

注意:由于 numba 仍处于密集开发阶段,我使用 numba 0.50 和 python 3.6 来运行它。

解决方案 14:

请注意,如果您需要整个范围的笛卡尔积,则可以使用以下函数:

numpy.indices((100, 200)).reshape(2, -1).T

或者

numpy.mgrid[0:100, 0:200].reshape(2, -1).T

都将回归

array([[  0,   0],
       [  0,   1],
       [  0,   2],
       ...,
       [ 99, 197],
       [ 99, 198],
       [ 99, 199]])

numpy.indices范围仅从 0 到每个指定的维度,但速度更快。这两个函数还可以在两个以上的维度上进行操作,例如:

numpy.indices((100, 200, 300)).reshape(3, -1).T

numpy.mgrid[0:100, 0:200, 0:300].reshape(3, -1).T

解决方案 15:

还有一个:

>>>x1, y1 = np.meshgrid(x, y)
>>>np.c_[x1.ravel(), y1.ravel()]
array([[1, 4],
       [2, 4],
       [3, 4],
       [1, 5],
       [2, 5],
       [3, 5]])

解决方案 16:

受到Ashkan 的回答的启发,您还可以尝试以下操作。

>>> x, y = np.meshgrid(x, y)
>>> np.concatenate([x.flatten().reshape(-1,1), y.flatten().reshape(-1,1)], axis=1)

这将为您提供所需的笛卡尔积!

解决方案 17:

numpy.tile这是已接受答案的通用版本(使用和函数的多个数组的笛卡尔积numpy.repeat)。

from functors import reduce
from operator import mul

def cartesian_product(arrays):
    return np.vstack(
        np.tile(
            np.repeat(arrays[j], reduce(mul, map(len, arrays[j+1:]), 1)),
            reduce(mul, map(len, arrays[:j]), 1),
        )
        for j in range(len(arrays))
    ).T

解决方案 18:

如果你愿意使用 PyTorch,我认为它效率很高:

>>> import torch

>>> torch.cartesian_prod(torch.as_tensor(x), torch.as_tensor(y))
tensor([[1, 4],
        [1, 5],
        [2, 4],
        [2, 5],
        [3, 4],
        [3, 5]])

你可以很容易地得到一个numpy数组:

>>> torch.cartesian_prod(torch.as_tensor(x), torch.as_tensor(y)).numpy()
array([[1, 4],
       [1, 5],
       [2, 4],
       [2, 5],
       [3, 4],
       [3, 5]])

相关推荐
  政府信创国产化的10大政策解读一、信创国产化的背景与意义信创国产化,即信息技术应用创新国产化,是当前中国信息技术领域的一个重要发展方向。其核心在于通过自主研发和创新,实现信息技术应用的自主可控,减少对外部技术的依赖,并规避潜在的技术制裁和风险。随着全球信息技术竞争的加剧,以及某些国家对中国在科技领域的打压,信创国产化显...
工程项目管理   1565  
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1354  
  信创国产芯片作为信息技术创新的核心领域,对于推动国家自主可控生态建设具有至关重要的意义。在全球科技竞争日益激烈的背景下,实现信息技术的自主可控,摆脱对国外技术的依赖,已成为保障国家信息安全和产业可持续发展的关键。国产芯片作为信创产业的基石,其发展水平直接影响着整个信创生态的构建与完善。通过不断提升国产芯片的技术实力、产...
国产信创系统   21  
  信创生态建设旨在实现信息技术领域的自主创新和安全可控,涵盖了从硬件到软件的全产业链。随着数字化转型的加速,信创生态建设的重要性日益凸显,它不仅关乎国家的信息安全,更是推动产业升级和经济高质量发展的关键力量。然而,在推进信创生态建设的过程中,面临着诸多复杂且严峻的挑战,需要深入剖析并寻找切实可行的解决方案。技术创新难题技...
信创操作系统   27  
  信创产业作为国家信息技术创新发展的重要领域,对于保障国家信息安全、推动产业升级具有关键意义。而国产芯片作为信创产业的核心基石,其研发进展备受关注。在信创国产芯片的研发征程中,面临着诸多复杂且艰巨的难点,这些难点犹如一道道关卡,阻碍着国产芯片的快速发展。然而,科研人员和相关企业并未退缩,积极探索并提出了一系列切实可行的解...
国产化替代产品目录   28  
热门文章
项目管理软件有哪些?
云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用