PySpark Pandas_Udf()

Pyspark Pandas Udf



Transformuoti PySpark DataFrame galima naudojant pandas_udf() funkciją. Tai vartotojo apibrėžta funkcija, kuri taikoma PySpark DataFrame su rodykle. Vektorines operacijas galime atlikti naudodami pandas_udf(). Jis gali būti įgyvendintas perduodant šią dekoratoriaus funkciją. Pasinerkime į šį vadovą, kad sužinotume sintaksę, parametrus ir įvairius pavyzdžius.

Turinio tema:

Jei norite sužinoti apie PySpark DataFrame ir modulio diegimą, atlikite tai straipsnis .







Pyspark.sql.functions.pandas_udf()

Pandas_udf () yra PySpark modulyje sql.functions, kurį galima importuoti naudojant raktinį žodį „iš“. Jis naudojamas vektorizuotoms operacijoms atlikti mūsų PySpark DataFrame. Ši funkcija įgyvendinama kaip dekoratorius, perduodant tris parametrus. Po to mes galime sukurti vartotojo apibrėžtą funkciją, kuri grąžina duomenis vektoriniu formatu (kaip mes naudojame seriją / NumPy), naudodami rodyklę. Naudodami šią funkciją galime grąžinti rezultatą.



Struktūra ir sintaksė:



Pirmiausia pažvelkime į šios funkcijos struktūrą ir sintaksę:

@pandas_udf(duomenų tipas)
def funkcijos_pavadinimas(operacija) -> convert_format:
grąžinimo pareiškimas

Čia funkcijos_pavadinimas yra mūsų apibrėžtos funkcijos pavadinimas. Duomenų tipas nurodo duomenų tipą, kurį grąžina ši funkcija. Rezultatą galime grąžinti naudodami raktinį žodį „grįžti“. Visos operacijos atliekamos funkcijos viduje su rodyklės priskyrimu.





Pandas_udf (funkcija ir grąžinimo tipas)

  1. Pirmasis parametras yra vartotojo nustatyta funkcija, kuri jam perduodama.
  2. Antrasis parametras naudojamas nurodyti grąžinimo duomenų tipą iš funkcijos.

Duomenys:

Šiame visame vadove demonstravimui naudojame tik vieną PySpark DataFrame. Visos vartotojo nustatytos funkcijos, kurias mes apibrėžiame, taikomos šiame PySpark DataFrame. Įsitikinkite, kad šį DataFrame sukūrėte savo aplinkoje pirmiausia įdiegę PySpark.



importuoti pyspark

iš pyspark.sql importuoti SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Linux Hint' ).getOrCreate()

iš pyspark.sql.functions importo pandas_udf

iš pyspark.sql.types importo *

importuoti pandas kaip pandas

# daržovių detalės

daržovė =[{ 'tipas' : 'daržovių' , 'vardas' : 'pomidoras' , 'locate_country' : 'JAV' , 'kiekis' : 800 },

{ 'tipas' : 'vaisiai' , 'vardas' : 'bananas' , 'locate_country' : 'KINIJA' , 'kiekis' : dvidešimt },

{ 'tipas' : 'daržovių' , 'vardas' : 'pomidoras' , 'locate_country' : 'JAV' , 'kiekis' : 800 },

{ 'tipas' : 'daržovių' , 'vardas' : 'Mango' , 'locate_country' : 'JAPANIJA' , 'kiekis' : 0 },

{ 'tipas' : 'vaisiai' , 'vardas' : 'citrina' , 'locate_country' : 'INDIJA' , 'kiekis' : 1700 m },

{ 'tipas' : 'daržovių' , 'vardas' : 'pomidoras' , 'locate_country' : 'JAV' , 'kiekis' : 1200 },

{ 'tipas' : 'daržovių' , 'vardas' : 'Mango' , 'locate_country' : 'JAPANIJA' , 'kiekis' : 0 },

{ 'tipas' : 'vaisiai' , 'vardas' : 'citrina' , 'locate_country' : 'INDIJA' , 'kiekis' : 0 }

]

# sukurkite rinkos duomenų rėmelį iš aukščiau pateiktų duomenų

market_df = linuxhint_spark_app.createDataFrame(vegetable)

market_df.show()

Išvestis:

Čia mes sukuriame šį duomenų rėmelį su 4 stulpeliais ir 8 eilutėmis. Dabar mes naudojame pandas_udf(), kad sukurtume vartotojo apibrėžtas funkcijas ir pritaikytume jas šiems stulpeliams.

Pandas_udf() su skirtingais duomenų tipais

Šiame scenarijuje mes sukuriame kai kurias vartotojo nustatytas funkcijas naudodami pandas_udf() ir pritaikome jas stulpeliams ir rodome rezultatus naudodami select() metodą. Kiekvienu atveju mes naudojame pandas.Series, kai atliekame vektorines operacijas. Stulpelių reikšmės laikomos vienmačiu masyvu, o operacija taikoma stulpeliui. Pačiame dekoratoriuje nurodome funkcijos grąžinimo tipą.

1 pavyzdys: Pandas_udf() su eilutės tipu

Čia sukuriame dvi vartotojo apibrėžtas funkcijas su eilutės grąžinimo tipu, kad konvertuotume eilutės tipo stulpelio reikšmes į didžiąsias ir mažąsias raides. Galiausiai šias funkcijas taikome stulpeliams „type“ ir „locate_country“.

# Konvertuokite tipo stulpelį į didžiąsias raides naudodami pandas_udf

@pandas_udf(StringType())

def type_upper_case(i: panda.Series) -> panda.Series:

grąžinti i.str.upper()

# Konvertuokite stulpelį locate_country į mažąsias raides naudodami pandas_udf

@pandas_udf(StringType())

def country_lower_case(i: panda.Series) -> panda.Series:

grąžinti i.str.lower()

# Rodyti stulpelius naudodami select()

market_df.select( 'tipas' ,type_upper_case( 'tipas' ), 'locate_country' ,
country_mažosios raidės( 'locate_country' )).Rodyti()

Išvestis:

Paaiškinimas:

Funkcija StringType() pasiekiama pyspark.sql.types modulyje. Mes jau importavome šį modulį kurdami PySpark DataFrame.

  1. Pirma, UDF (vartotojo apibrėžta funkcija) grąžina eilutes didžiosiomis raidėmis, naudodama str.upper() funkciją. Str.upper() yra serijos duomenų struktūroje (kadangi mes konvertuojame į serijas su rodykle funkcijos viduje), kuri paverčia nurodytą eilutę didžiosiomis raidėmis. Galiausiai ši funkcija taikoma stulpeliui „tipas“, kuris nurodytas pasirinkimo () metodo viduje. Anksčiau visos eilutės tipo stulpelyje yra mažosiomis raidėmis. Dabar jie pakeisti į didžiąsias raides.
  2. Antra, UDF grąžina eilutes didžiosiomis raidėmis, naudodamas str.lower() funkciją. Str.lower() yra serijos duomenų struktūroje, kuri paverčia nurodytą eilutę į mažąsias raides. Galiausiai ši funkcija taikoma stulpeliui „tipas“, kuris nurodytas pasirinkimo () metodo viduje. Anksčiau visos eilutės tipo stulpelyje yra didžiosiomis raidėmis. Dabar jos pakeistos į mažąsias raides.

2 pavyzdys: Pandas_udf() su sveikojo skaičiaus tipu

Sukurkime UDF, kuris konvertuoja PySpark DataFrame sveikojo skaičiaus stulpelį į Pandas seriją ir pridėkite 100 prie kiekvienos reikšmės. Perduokite stulpelį „kiekis“ šiai funkcijai pasirinkimo () metodo viduje.

# Pridėkite 100

@pandas_udf(IntegerType())

def add_100(i: panda.Series) -> panda.Series:

grąžinti i+ 100

# Perduokite kiekio stulpelį į aukščiau pateiktą funkciją ir parodykite.

market_df.select( 'kiekis' ,add_100( 'kiekis' )).Rodyti()

Išvestis:

Paaiškinimas:

UDF viduje pakartojame visas reikšmes ir konvertuojame jas į serijas. Po to prie kiekvienos serijos vertės pridedame 100. Galiausiai šiai funkcijai perduodame stulpelį „kiekis“ ir matome, kad prie visų reikšmių pridedama 100.

Pandas_udf() su skirtingais duomenų tipais naudojant Groupby() ir Agg()

Pažvelkime į pavyzdžius, kaip perduoti UDF į sukauptus stulpelius. Čia stulpelių reikšmės pirmiausia sugrupuojamos naudojant groupby() funkciją, o agregavimas atliekamas naudojant agg() funkciją. Mes perduodame savo UDF šioje agregacinėje funkcijoje.

Sintaksė:

pyspark_dataframe_object.groupby( 'grupavimo_stulpelis' ).agg(UDF
(pyspark_dataframe_object[ 'stulpelis' ]))

Čia pirmiausia sugrupuojamos reikšmės grupavimo stulpelyje. Tada apibendrinami kiekvieni sugrupuoti duomenys, atsižvelgiant į mūsų UDF.

1 pavyzdys: Pandas_udf() su bendru vidurkiu()

Čia sukuriame vartotojo apibrėžtą funkciją su grąžinimo tipo float. Funkcijos viduje apskaičiuojame vidurkį naudodami funkciją mean(). Šis UDF perduodamas į stulpelį „kiekis“, kad būtų gautas vidutinis kiekvieno tipo kiekis.

# grąžina vidurkį / vidurkį

@pandas_udf( 'plūdė' )

def medium_function(i: panda.Series) -> float:

grąžinti i.mean ()

# Perduokite kiekio stulpelį funkcijai sugrupuodami tipo stulpelį.

market_df.groupby( 'tipas' ).agg(average_function(market_df[ 'kiekis' ])).Rodyti()

Išvestis:

Grupuojame pagal elementus stulpelyje „tipas“. Susidaro dvi grupės – „vaisių“ ir „daržovių“. Kiekvienos grupės vidurkis apskaičiuojamas ir grąžinamas.

2 pavyzdys: Pandas_udf() su agregate Max() ir Min()

Čia sukuriame dvi vartotojo apibrėžtas funkcijas su sveikojo skaičiaus (int) grąžinimo tipu. Pirmasis UDF grąžina mažiausią reikšmę, o antrasis UDF – didžiausią.

# pandas_udf, kurie grąžina mažiausią reikšmę

@pandas_udf( 'int' )

def min_(i: panda.Series) -> int:

grąžinti i.min()

# pandas_udf, kurie grąžina didžiausią reikšmę

@pandas_udf( 'int' )

def max_(i: panda.Series) -> int:

grąžinti i.max()

# Perduokite kiekio stulpelį į min_ pandas_udf sugrupuodami locate_country.

market_df.groupby( 'locate_country' ).agg(min_(market_df[ 'kiekis' ])).Rodyti()

# Perduokite kiekio stulpelį į max_ pandas_udf sugrupuodami locate_country.

market_df.groupby( 'locate_country' ).agg(max_(market_df[ 'kiekis' ])).Rodyti()

Išvestis:

Norėdami grąžinti minimalias ir didžiausias reikšmes, UDF grąžinimo tipe naudojame min() ir max() funkcijas. Dabar sugrupuojame duomenis stulpelyje „locate_country“. Sukuriamos keturios grupės („KINIJA“, „INDIJA“, „JAPANIJA“, „JAV“). Kiekvienai grupei grąžiname maksimalų kiekį. Panašiai grąžiname minimalų kiekį.

Išvada

Iš esmės pandas_udf () naudojamas vektorizuotoms operacijoms atlikti mūsų PySpark DataFrame. Mes matėme, kaip sukurti pandas_udf() ir pritaikyti jį PySpark DataFrame. Norėdami geriau suprasti, aptarėme skirtingus pavyzdžius, atsižvelgdami į visus duomenų tipus (eilutė, slankioji ir sveikasis skaičius). Galima naudoti pandas_udf() su groupby() per funkciją agg().