Domanda Aggiornamento di una colonna di dataframe in spark


Guardando la nuova scintilla dataframe api, non è chiaro se sia possibile modificare le colonne del dataframe.

Come farei per cambiare un valore in fila x colonna y di un dataframe?

In pandas questo sarebbe df.ix[x,y] = new_value

Modifica: consolidando ciò che è stato detto di seguito, non è possibile modificare il dataframe esistente in quanto è immutabile, ma è possibile restituire un nuovo dataframe con le modifiche desiderate.

Se vuoi semplicemente sostituire un valore in una colonna in base a una condizione, ad esempio np.where:

from pyspark.sql import functions as F

update_func = (F.when(F.col('update_col') == replace_val, new_value)
                .otherwise(F.col('update_col')))
df = df.withColumn('new_column_name', update_func)

Se vuoi eseguire qualche operazione su una colonna e creare una nuova colonna che viene aggiunta al dataframe:

import pyspark.sql.functions as F
import pyspark.sql.types as T

def my_func(col):
    do stuff to column here
    return transformed_value

# if we assume that my_func returns a string
my_udf = F.UserDefinedFunction(my_func, T.StringType())

df = df.withColumn('new_column_name', my_udf('update_col'))

Se vuoi che la nuova colonna abbia lo stesso nome della vecchia colonna, puoi aggiungere il passaggio aggiuntivo:

df = df.drop('update_col').withColumnRenamed('new_column_name', 'update_col')

44
2018-03-17 21:19


origine


risposte:


Sebbene non sia possibile modificare una colonna in quanto tale, è possibile operare su una colonna e restituire un nuovo DataFrame che rifletta tale modifica. Per quello dovresti creare prima un UserDefinedFunction implementare l'operazione da applicare e quindi applicare selettivamente quella funzione solo alla colonna di destinazione. In Python:

from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType

name = 'target_column'
udf = UserDefinedFunction(lambda x: 'new_value', StringType())
new_df = old_df.select(*[udf(column).alias(name) if column == name else column for column in old_df.columns])

new_df ora ha lo stesso schema di old_df (supponendo che old_df.target_column era di tipo StringType pure) ma tutti i valori nella colonna target_column sarà new_value.


51
2018-03-25 13:35



Di solito quando si aggiorna una colonna, vogliamo mappare un vecchio valore ad un nuovo valore. Ecco un modo per farlo in Pyspark senza UDF:

# update df[update_col], mapping old_value --> new_value
from pyspark.sql import functions as F
df = df.withColumn(update_col,
    F.when(df[update_col]==old_value,new_value).
    otherwise(df[update_col])).

29
2017-12-21 22:23



DataFrames sono basati su RDD. Gli RDD sono strutture immutabili e non consentono di aggiornare gli elementi sul posto. Per modificare i valori, è necessario creare un nuovo DataFrame trasformando quello originale utilizzando le operazioni DSL o RDD simili a SQL come map.

Uno scivolo altamente raccomandato: Presentazione di DataFrames in Spark per la scienza dei dati su larga scala.


12
2018-03-17 21:51



Proprio come maasg afferma che è possibile creare un nuovo DataFrame dal risultato di una mappa applicata al vecchio DataFrame. Un esempio per un dato DataFrame df con due file:

val newDf = sqlContext.createDataFrame(df.map(row => 
  Row(row.getInt(0) + SOMETHING, applySomeDef(row.getAs[Double]("y")), df.schema)

Notare che se i tipi delle colonne cambiano, è necessario dargli uno schema corretto invece di df.schema. Scopri l'API di org.apache.spark.sql.Row per i metodi disponibili: https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Row.html

[Aggiorna] O usando UDF in Scala:

import org.apache.spark.sql.functions._

val toLong = udf[Long, String] (_.toLong)

val modifiedDf = df.withColumn("modifiedColumnName", toLong(df("columnName"))).drop("columnName")

e se il nome della colonna deve rimanere lo stesso puoi rinominarlo di nuovo:

modifiedDf.withColumnRenamed("modifiedColumnName", "columnName")

11
2017-11-08 21:19