Kaip skaityti ir rašyti lentelės duomenis PySpark

Kaip Skaityti Ir Rasyti Lenteles Duomenis Pyspark



Duomenų apdorojimas PySpark yra greitesnis, jei duomenys įkeliami lentelės pavidalu. Naudojant tai naudojant SQL išraiškas, apdorojimas bus greitas. Taigi, prieš siunčiant apdoroti PySpark DataFrame/RDD konvertuoti į lentelę yra geresnis būdas. Šiandien pamatysime, kaip nuskaityti lentelės duomenis į PySpark DataFrame, įrašyti PySpark DataFrame į lentelę ir į esamą lentelę įterpti naują DataFrame naudojant integruotas funkcijas. Eime!

Pyspark.sql.DataFrameWriter.saveAsTable()

Pirmiausia pamatysime, kaip į lentelę įrašyti esamą PySpark DataFrame naudojant funkciją write.saveAsTable(). Norint įrašyti DataFrame į lentelę, reikia lentelės pavadinimo ir kitų pasirenkamų parametrų, pvz., režimų, partionBy ir kt. Jis saugomas kaip parketo dildė.

Sintaksė:







dataframe_obj.write.saveAsTable(kelias/lentelės_pavadinimas,režimas,skirstymas pagal,...)
  1. Lentelės_pavadinimas yra lentelės, sukurtos iš dataframe_obj, pavadinimas.
  2. Lentelės duomenis galime pridėti/perrašyti naudodami mode parametrą.
  3. PartitionBy paima vieną / kelis stulpelius, kad sukurtų skaidinius pagal šių pateiktų stulpelių vertes.

1 pavyzdys:

Sukurkite PySpark DataFrame su 5 eilutėmis ir 4 stulpeliais. Įrašykite šį duomenų rėmelį į lentelę pavadinimu „Agri_Table1“.



importuoti pyspark

iš pyspark.sql importuoti SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Linux Hint' ).getOrCreate()

# ūkininkavimo duomenys su 5 eilutėmis ir 5 stulpeliais

agri =[{ 'Dirvožemio_tipas' : 'juoda' , „Drėkinimo_prieinamumas“ : 'ne' , 'Acres' : 2500 , „Dirvožemio būklė“ : 'sausas' ,
'Šalis' : 'JAV' },

{ 'Dirvožemio_tipas' : 'juoda' , „Drėkinimo_prieinamumas“ : 'taip' , 'Acres' : 3500 , „Dirvožemio būklė“ : 'šlapias' ,
'Šalis' : 'Indija' },

{ 'Dirvožemio_tipas' : 'raudona' , „Drėkinimo_prieinamumas“ : 'taip' , 'Acres' : 210 , „Dirvožemio būklė“ : 'sausas' ,
'Šalis' : „JK“ },

{ 'Dirvožemio_tipas' : 'Kita' , „Drėkinimo_prieinamumas“ : 'ne' , 'Acres' : 1000 , „Dirvožemio būklė“ : 'šlapias' ,
'Šalis' : 'JAV' },

{ 'Dirvožemio_tipas' : 'Smėlis' , „Drėkinimo_prieinamumas“ : 'ne' , 'Acres' : 500 , „Dirvožemio būklė“ : 'sausas' ,
'Šalis' : 'Indija' }]



# sukurkite duomenų rėmelį iš aukščiau pateiktų duomenų

agri_df = linuxhint_spark_app.createDataFrame(agri)

agri_df.show()

# Įrašykite aukščiau pateiktą DataFrame į lentelę.

agri_df.coalesce( 1 ).write.saveAsTable( 'Agri_Table1' )

Išvestis:







Matome, kad vienas parketo failas sukurtas naudojant ankstesnius PySpark duomenis.



2 pavyzdys:

Apsvarstykite ankstesnį „DataFrame“ ir įrašykite „Agri_Table2“ į lentelę, suskirstydami įrašus pagal stulpelio „Šalis“ reikšmes.

# Įrašykite aukščiau pateiktą DataFrame į lentelę su parametru partitionBy

agri_df.write.saveAsTable( 'Agri_Table2' ,partitionBy=[ 'Šalis' ])

Išvestis:

Stulpelyje „Šalis“ yra trys unikalios reikšmės – „India“, „UK“ ir „USA“. Taigi sukuriami trys skirsniai. Kiekvienoje pertvaroje yra parketo failai.

Pyspark.sql.DataFrameReader.table()

Įkelkime lentelę į PySpark DataFrame naudodami spark.read.table() funkciją. Tam reikia tik vieno parametro, kuris yra kelio / lentelės pavadinimas. Tai tiesiogiai įkelia lentelę į PySpark DataFrame, o visos PySpark DataFrame taikomos SQL funkcijos taip pat gali būti taikomos šiame įkeltame duomenų rėmelyje.

Sintaksė:

spark_app.read.table(path/'Lentelės_pavadinimas')

Šiame scenarijuje naudojame ankstesnę lentelę, kuri buvo sukurta iš PySpark DataFrame. Įsitikinkite, kad savo aplinkoje turite įdiegti ankstesnius scenarijaus kodo fragmentus.

Pavyzdys:

Įkelkite lentelę „Agri_Table1“ į „DataFrame“, pavadintą „loaded_data“.

loaded_data = linuxhint_spark_app.read.table( „Agri_Table1“ )

loaded_data.show()

Išvestis:

Matome, kad lentelė įkelta į PySpark DataFrame.

SQL užklausų vykdymas

Dabar mes vykdome kai kurias SQL užklausas įkeltame DataFrame naudodami spark.sql() funkciją.

# Naudokite komandą SELECT, kad būtų rodomi visi aukščiau pateiktos lentelės stulpeliai.

linuxhint_spark_app.sql( „SELECT * from Agri_Table1“ ).Rodyti()

# WHERE sąlyga

linuxhint_spark_app.sql( 'SELECT * from Agri_Table1 WHERE Soil_status='Dry'' ).Rodyti()

linuxhint_spark_app.sql( 'SELECT * from Agri_Table1 WHERE Acres > 2000' ).Rodyti()

Išvestis:

  1. Pirmoji užklausa rodo visus stulpelius ir įrašus iš DataFrame.
  2. Antroji užklausa rodo įrašus pagal stulpelį „Dirvožemio_būsena“. Yra tik trys įrašai su „Dry“ elementu.
  3. Paskutinė užklausa pateikia du įrašus su „Acres“, kurie yra didesni nei 2000.

Pyspark.sql.DataFrameWriter.insertInto()

Naudodami funkciją insertInto(), galime pridėti DataFrame prie esamos lentelės. Šią funkciją galime naudoti kartu su selectExpr(), norėdami apibrėžti stulpelių pavadinimus ir įterpti jį į lentelę. Ši funkcija taip pat naudoja lentelės pavadinimą kaip parametrą.

Sintaksė:

DataFrame_obj.write.insertInto('Lentelės_pavadinimas')

Šiame scenarijuje naudojame ankstesnę lentelę, kuri buvo sukurta iš PySpark DataFrame. Įsitikinkite, kad savo aplinkoje turite įdiegti ankstesnius scenarijaus kodo fragmentus.

Pavyzdys:

Sukurkite naują DataFrame su dviem įrašais ir įterpkite juos į lentelę „Agri_Table1“.

importuoti pyspark

iš pyspark.sql importuoti SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Linux Hint' ).getOrCreate()

# ūkininkavimo duomenys su 2 eilutėmis

agri =[{ 'Dirvožemio_tipas' : 'Smėlis' , „Drėkinimo_prieinamumas“ : 'ne' , 'Acres' : 2500 , „Dirvožemio būklė“ : 'sausas' ,
'Šalis' : 'JAV' },

{ 'Dirvožemio_tipas' : 'Smėlis' , „Drėkinimo_prieinamumas“ : 'ne' , 'Acres' : 1200 , „Dirvožemio būklė“ : 'šlapias' ,
'Šalis' : 'Japonija' }]

# sukurkite duomenų rėmelį iš aukščiau pateiktų duomenų

agri_df2 = linuxhint_spark_app.createDataFrame(agri)

agri_df2.show()

# write.insertInto()

agri_df2.selectExpr( 'Akrai' , 'Šalis' , 'Drėkinimo_prieinamumas' , 'Dirvožemio_tipas' ,
'Dirvožemio būklė' ).write.insertInto( 'Agri_Table1' )

# Rodyti galutinę Agri_Table1

linuxhint_spark_app.sql( „SELECT * from Agri_Table1“ ).Rodyti()

Išvestis:

Dabar bendras DataFrame eilučių skaičius yra 7.

Išvada

Dabar suprantate, kaip įrašyti PySpark DataFrame į lentelę naudojant funkciją write.saveAsTable(). Jis paima lentelės pavadinimą ir kitus pasirenkamus parametrus. Tada įkėlėme šią lentelę į PySpark DataFrame naudodami spark.read.table() funkciją. Tam reikia tik vieno parametro, kuris yra kelio / lentelės pavadinimas. Jei norite pridėti naują DataFrame prie esamos lentelės, naudokite funkciją insertInto().