使用 Spark 将列转置为行

2025-02-11 09:51:00
admin
原创
63
摘要:问题描述:我正在尝试将表格中的某些列转置为行。我使用的是 Python 和 Spark 1.5.0。这是我的初始表格:+-----+-----+-----+-------+ | A |col_1|col_2|col_...| +-----+-------------------+ | 1 | 0.0...

问题描述:

我正在尝试将表格中的某些列转置为行。我使用的是 Python 和 Spark 1.5.0。这是我的初始表格:

+-----+-----+-----+-------+
|  A  |col_1|col_2|col_...|
+-----+-------------------+
|  1  |  0.0|  0.6|  ...  |
|  2  |  0.6|  0.7|  ...  |
|  3  |  0.5|  0.9|  ...  |
|  ...|  ...|  ...|  ...  |

我想要这样的东西:

+-----+--------+-----------+
|  A  | col_id | col_value |
+-----+--------+-----------+
|  1  |   col_1|        0.0|
|  1  |   col_2|        0.6|   
|  ...|     ...|        ...|    
|  2  |   col_1|        0.6|
|  2  |   col_2|        0.7| 
|  ...|     ...|        ...|  
|  3  |   col_1|        0.5|
|  3  |   col_2|        0.9|
|  ...|     ...|        ...|

有人知道我该怎么做吗?谢谢你的帮助。


解决方案 1:

Spark >= 3.4

您可以使用内置melt方法。使用 Python:

df.melt(
    ids=["A"], values=["col_1", "col_2"],
    variableColumnName="key", valueColumnName="val"
)

使用 Scala

df.melt(Array($"A"), Array($"col_1", $"col_2"), "key", "val")

火花 < 3.4

使用基本的 Spark SQL 函数来做这件事相对简单。

Python

from pyspark.sql.functions import array, col, explode, struct, lit

df = sc.parallelize([(1, 0.0, 0.6), (1, 0.6, 0.7)]).toDF(["A", "col_1", "col_2"])

def to_long(df, by):

    # Filter dtypes and split into column names and type description
    cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by))
    # Spark SQL supports only homogeneous columns
    assert len(set(dtypes)) == 1, "All columns have to be of the same type"

    # Create and explode an array of (column_name, column_value) structs
    kvs = explode(array([
      struct(lit(c).alias("key"), col(c).alias("val")) for c in cols
    ])).alias("kvs")

    return df.select(by + [kvs]).select(by + ["kvs.key", "kvs.val"])

to_long(df, ["A"])
   

Scala

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{array, col, explode, lit, struct}

val df = Seq((1, 0.0, 0.6), (1, 0.6, 0.7)).toDF("A", "col_1", "col_2")

def toLong(df: DataFrame, by: Seq[String]): DataFrame = {
  val (cols, types) = df.dtypes.filter{ case (c, _) => !by.contains(c)}.unzip
  require(types.distinct.size == 1, s"${types.distinct.toString}.length != 1")      

  val kvs = explode(array(
    cols.map(c => struct(lit(c).alias("key"), col(c).alias("val"))): _*
  ))
  
  val byExprs = by.map(col(_))

  df
    .select(byExprs :+ kvs.alias("_kvs"): _*)
    .select(byExprs ++ Seq($"_kvs.key", $"_kvs.val"): _*)
}

toLong(df, Seq("A"))

解决方案 2:

解决的一种方法是pyspark sql用函数create_mapexplode

from pyspark.sql import functions as func
#Use `create_map` to create the map of columns with constant 
df = df.withColumn('mapCol', \n                    func.create_map(func.lit('col_1'),df.col_1,
                                    func.lit('col_2'),df.col_2,
                                    func.lit('col_3'),df.col_3
                                   ) 
                  )
#Use explode function to explode the map 
res = df.select('*',func.explode(df.mapCol).alias('col_id','col_value'))
res.show()

解决方案 3:

您可以使用堆栈功能:

例如:

df.selectExpr("stack(2, 'col_1', col_1, 'col_2', col_2) as (key, value)")

在哪里:

  • 2 是要堆叠的列数(col_1 和 col_2)

  • 'col_1' 是键的字符串

  • col_1 是从中获取值的列

如果您有几列,您可以构建整个堆栈字符串,迭代列名并将其传递给selectExpr

解决方案 4:

Spark 本地线性代数库目前非常薄弱:它们不包含上述的基本操作。

有一个 JIRA 可以针对 Spark 2.1 修复此问题 - 但今天它无法帮助您。

需要考虑的一点是:执行转置可能需要完全打乱数据。

现在您需要直接编写 RDD 代码。我transpose用 scala 编写过 - 但没有用 python 编写过。以下是scala版本:

 def transpose(mat: DMatrix) = {
    val nCols = mat(0).length
    val matT = mat
      .flatten
      .zipWithIndex
      .groupBy {
      _._2 % nCols
    }
      .toSeq.sortBy {
      _._1
    }
      .map(_._2)
      .map(_.map(_._1))
      .toArray
    matT
  }

因此您可以将其转换为 Python 以供使用。我目前没有足够的带宽来编写/测试它:如果您无法进行转换,请告诉我。

至少——以下内容很容易转换为python

  • zipWithIndex--> enumerate()(python 等效 - 归功于@zero323)

  • map-->[someOperation(x) for x in ..]

  • groupBy-->itertools.groupBy()

flatten下面是没有 Python 等效项 的实现:

  def flatten(L):
        for item in L:
            try:
                for i in flatten(item):
                    yield i
            except TypeError:
                yield item

因此您应该能够将它们放在一起以找到解决方案。

解决方案 5:

使用 flatmap。类似下面的方法应该有效

from pyspark.sql import Row

def rowExpander(row):
    rowDict = row.asDict()
    valA = rowDict.pop('A')
    for k in rowDict:
        yield Row(**{'A': valA , 'colID': k, 'colValue': row[k]})

newDf = sqlContext.createDataFrame(df.rdd.flatMap(rowExpander))

解决方案 6:

我采用了@javadba编写的Scala答案,并创建了一个Python版本,用于转置a中的所有列DataFrame。 这可能与OP要求的有点不同...

from itertools import chain
from pyspark.sql import DataFrame


def _sort_transpose_tuple(tup):
    x, y = tup
    return x, tuple(zip(*sorted(y, key=lambda v_k: v_k[1], reverse=False)))[0]


def transpose(X):
    """Transpose a PySpark DataFrame.

    Parameters
    ----------
    X : PySpark ``DataFrame``
        The ``DataFrame`` that should be tranposed.
    """
    # validate
    if not isinstance(X, DataFrame):
        raise TypeError('X should be a DataFrame, not a %s' 
                        % type(X))

    cols = X.columns
    n_features = len(cols)

    # Sorry for this unreadability...
    return X.rdd.flatMap( # make into an RDD
        lambda xs: chain(xs)).zipWithIndex().groupBy( # zip index
        lambda val_idx: val_idx[1] % n_features).sortBy( # group by index % n_features as key
        lambda grp_res: grp_res[0]).map( # sort by index % n_features key
        lambda grp_res: _sort_transpose_tuple(grp_res)).map( # maintain order
        lambda key_col: key_col[1]).toDF() # return to DF

例如:

>>> X = sc.parallelize([(1,2,3), (4,5,6), (7,8,9)]).toDF()
>>> X.show()
+---+---+---+
| _1| _2| _3|
+---+---+---+
|  1|  2|  3|
|  4|  5|  6|
|  7|  8|  9|
+---+---+---+

>>> transpose(X).show()
+---+---+---+
| _1| _2| _3|
+---+---+---+
|  1|  4|  7|
|  2|  5|  8|
|  3|  6|  9|
+---+---+---+

解决方案 7:

一个非常方便的实现方法:

from pyspark.sql import Row

def rowExpander(row):
    rowDict = row.asDict()
    valA = rowDict.pop('A')
    for k in rowDict:
        yield Row(**{'A': valA , 'colID' : k, 'colValue' : row[k]})

    newDf = sqlContext.createDataFrame(df.rdd.flatMap(rowExpander)

解决方案 8:

为了转置 Dataframe pySpark,我使用pivot了临时创建的列,并在操作结束时删除了该列。

假设我们有这样的一个表。我们想要做的是找到每个listed_days_bin值对应的所有用户。

+------------------+-------------+
|  listed_days_bin | users_count | 
+------------------+-------------+
|1                 |            5| 
|0                 |            2|
|0                 |            1| 
|1                 |            3|  
|1                 |            4| 
|2                 |            5| 
|2                 |            7|  
|2                 |            2|  
|1                 |            1|
+------------------+-------------+

创建新的临时列 -'pvt_value'对其进行聚合并透视结果

import pyspark.sql.functions as F


agg_df = df.withColumn('pvt_value', lit(1))\n        .groupby('pvt_value')\n        .pivot('listed_days_bin')\n        .agg(F.sum('users_count')).drop('pvt_value')

新的 Dataframe 应如下所示:

+----+---+---+
|  0 | 1 | 2 | # Columns 
+----+---+---+
|   3| 13| 14| # Users over the bin
+----+---+---+

解决方案 9:

我发现 PySpark 转置太复杂了,所以我只需将我的数据框转换为 Pandas 并使用 transpose() 方法,然后在需要时将数据框转换回 PySpark。

dfOutput = spark.createDataFrame(dfPySpark.toPandas().transpose())
dfOutput.display()
相关推荐
  政府信创国产化的10大政策解读一、信创国产化的背景与意义信创国产化,即信息技术应用创新国产化,是当前中国信息技术领域的一个重要发展方向。其核心在于通过自主研发和创新,实现信息技术应用的自主可控,减少对外部技术的依赖,并规避潜在的技术制裁和风险。随着全球信息技术竞争的加剧,以及某些国家对中国在科技领域的打压,信创国产化显...
工程项目管理   1565  
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1354  
  信创国产芯片作为信息技术创新的核心领域,对于推动国家自主可控生态建设具有至关重要的意义。在全球科技竞争日益激烈的背景下,实现信息技术的自主可控,摆脱对国外技术的依赖,已成为保障国家信息安全和产业可持续发展的关键。国产芯片作为信创产业的基石,其发展水平直接影响着整个信创生态的构建与完善。通过不断提升国产芯片的技术实力、产...
国产信创系统   21  
  信创生态建设旨在实现信息技术领域的自主创新和安全可控,涵盖了从硬件到软件的全产业链。随着数字化转型的加速,信创生态建设的重要性日益凸显,它不仅关乎国家的信息安全,更是推动产业升级和经济高质量发展的关键力量。然而,在推进信创生态建设的过程中,面临着诸多复杂且严峻的挑战,需要深入剖析并寻找切实可行的解决方案。技术创新难题技...
信创操作系统   27  
  信创产业作为国家信息技术创新发展的重要领域,对于保障国家信息安全、推动产业升级具有关键意义。而国产芯片作为信创产业的核心基石,其研发进展备受关注。在信创国产芯片的研发征程中,面临着诸多复杂且艰巨的难点,这些难点犹如一道道关卡,阻碍着国产芯片的快速发展。然而,科研人员和相关企业并未退缩,积极探索并提出了一系列切实可行的解...
国产化替代产品目录   28  
热门文章
项目管理软件有哪些?
云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用