PySpark Read.Parquet()

Pyspark Read Parquet



PySpark funkcija write.parquet() įrašo DataFrame į parketo failą, o read.parquet() nuskaito parketo failą į PySpark DataFrame arba bet kurį kitą duomenų šaltinį. Norėdami greitai ir efektyviai apdoroti „Apache Spark“ stulpelius, turime suspausti duomenis. Duomenų suspaudimas taupo mūsų atmintį ir visi stulpeliai paverčiami plokščiu lygiu. Tai reiškia, kad yra plokščio stulpelio lygio saugykla. Failas, kuriame jie saugomi, yra žinomas kaip PARQUET failas.

Šiame vadove daugiausia dėmesio skirsime parketo failo skaitymui / įkėlimui į PySpark DataFrame/SQL, naudodami funkciją read.parquet(), kuri pasiekiama pyspark.sql.DataFrameReader klasėje.

Turinio tema:







Gaukite parketo failą



Perskaitykite parketo failą į PySpark DataFrame



Perskaitykite parketo failą į PySpark SQL





Pyspark.sql.DataFrameReader.parquet()

Ši funkcija naudojama nuskaityti parketo failą ir įkelti jį į PySpark DataFrame. Jis paima parketo failo kelią / failo pavadinimą. Galime tiesiog naudoti funkciją read.parquet(), nes tai yra bendroji funkcija.

Sintaksė:



Pažiūrėkime read.parquet() sintaksę:

spark_app.read.parquet(failo_pavadinimas.parquet/path)

Pirmiausia įdiekite PySpark modulį naudodami pip komandą:

pip įdiegti pyspark

Gaukite parketo failą

Norint nuskaityti parketo bylą, reikia duomenų, kuriuose iš tų duomenų sugeneruota parketo byla. Šioje dalyje pamatysime, kaip sugeneruoti parketo failą iš PySpark DataFrame.

Sukurkime PySpark DataFrame su 5 įrašais ir įrašykime tai į parketo failą 'industry_parquet'.

importuoti pyspark

iš pyspark.sql importuoti SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName( 'Linux Hint' ).getOrCreate()

# sukurti duomenų rėmelį, kuriame saugoma pramonės šakos informacija

Industry_df = linuxhint_spark_app.createDataFrame([Row(Type= 'Žemdirbystė' ,Plotas= 'JAV' ,
Įvertinimas = 'karšta' ,Iš viso_darbuotojų= 100 ),

Eilutė(tipas= 'Žemdirbystė' ,Plotas= 'Indija' ,Įvertinimas= 'karšta' ,Iš viso_darbuotojų= 200 ),

Eilutė(tipas= 'plėtra' ,Plotas= 'JAV' ,Įvertinimas= 'šiltas' ,Iš viso_darbuotojų= 100 ),

Eilutė(tipas= 'Išsilavinimas' ,Plotas= 'JAV' ,Įvertinimas= 'Saunus' ,Iš viso_darbuotojų= 400 ),

Eilutė(tipas= 'Išsilavinimas' ,Plotas= 'JAV' ,Įvertinimas= 'šiltas' ,Iš viso_darbuotojų= dvidešimt )

])

# Faktinis duomenų rėmelis

Industry_df.show()

# Įrašykite industrial_df į parketo failą

Industry_df.coalesce( 1 ).write.parquet( 'pramonės_parketas' )

Išvestis:

Tai DataFrame, kuriame yra 5 įrašai.

Sukuriamas parketo failas ankstesniam DataFrame. Čia mūsų failo pavadinimas su plėtiniu yra „part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet“. Mes naudojame šį failą visoje pamokoje.

Perskaitykite parketo failą į PySpark DataFrame

Turime parketo dildę. Perskaitykime šį failą naudodami read.parquet() funkciją ir įkelkime jį į PySpark DataFrame.

importuoti pyspark

iš pyspark.sql importuoti SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName( 'Linux Hint' ).getOrCreate()

# Nuskaitykite parketo failą į objektą dataframe_from_parquet.

dataframe_from_parquet=linuxhint_spark_app.read.parquet( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )

# Rodyti dataframe_from_parquet-DataFrame

dataframe_from_parquet.show()

Išvestis:

Rodome DataFrame naudodami show() metodą, kuris buvo sukurtas iš parketo failo.

SQL užklausos su parketo failu

Įkėlus į DataFrame, galima sukurti SQL lenteles ir rodyti duomenis, esančius DataFrame. Turime sukurti LAIKINĄ VAIZDĄ ir naudoti SQL komandas, kad grąžintume įrašus iš „DataFrame“, sukurto iš parketo failo.

1 pavyzdys:

Sukurkite laikiną rodinį pavadinimu „Sektoriai“ ir naudokite komandą SELECT, kad būtų rodomi „DataFrame“ įrašai. Galite kreiptis į tai pamoka paaiškina, kaip Spark – SQL sukurti VIEW.

importuoti pyspark

iš pyspark.sql importuoti SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName( 'Linux Hint' ).getOrCreate()

# Nuskaitykite parketo failą į objektą dataframe_from_parquet.

dataframe_from_parquet=linuxhint_spark_app.read.parquet( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )

# Sukurkite vaizdą iš aukščiau esančio parketo failo pavadinimu - 'Sektoriai'

dataframe_from_parquet.createOrReplaceTempView( 'Sektoriai' )

# Užklausa, kad būtų rodomi visi sektorių įrašai

linuxhint_spark_app.sql( 'pasirinkite * iš sektorių' ).Rodyti()

Išvestis:

2 pavyzdys:

Naudodami ankstesnį VIEW, parašykite SQL užklausą:

  1. Norėdami parodyti visus įrašus iš sektorių, priklausančių „Indijai“.
  2. Rodyti visus įrašus iš sektorių, kurių darbuotojas yra didesnis nei 100.
# Užklausa, kad būtų rodomi visi „Indijai“ priklausančių sektorių įrašai.

linuxhint_spark_app.sql( 'select * iš sektorių, kur sritis = 'Indija' ).Rodyti()

# Užklausa, kad būtų rodomi visi įrašai iš sektorių, kurių darbuotojų skaičius didesnis nei 100

linuxhint_spark_app.sql( 'pasirinkite * iš sektorių, kuriuose iš viso_darbuotojų > 100' ).Rodyti()

Išvestis:

Yra tik vienas įrašas, kurio sritis yra „Indija“, ir du įrašai su darbuotojais, kurie yra didesni nei 100.

Perskaitykite parketo failą į PySpark SQL

Pirmiausia turime sukurti VIEW naudodami komandą CREATE. Naudodami raktinį žodį „kelio“ SQL užklausoje, galime nuskaityti parketo failą į „Spark SQL“. Po kelio turime nurodyti failo pavadinimą / vietą.

Sintaksė:

spark_app.sql( 'SUKURTI LAIKINĄJĮ RODINĮ rodinio_pavadinimas NAUDOJANT parketo PARINKČIAS (kelias ' failo_vardas.parketas ')' )

1 pavyzdys:

Sukurkite laikiną rodinį pavadinimu „Sector2“ ir perskaitykite jame parketo failą. Naudodami funkciją sql() parašykite pasirinkimo užklausą, kad būtų rodomi visi rodinyje esantys įrašai.

importuoti pyspark

iš pyspark.sql importuoti SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName( „Linux patarimas“ ).getOrCreate()

# Perskaitykite parketo failą į Spark-SQL

linuxhint_spark_app.sql( 'SUKURTI LAIKINĄ VAIZDĮ Sector2 NAUDOJANT parketo OPTIONS (kelias' part-00000-ff70f69d-f1fb- 4450 -b4b4-dfd5a8d6c7ad-c000.snappy.parquet ')' )

# Užklausa, kad būtų rodomi visi 2 sektoriaus įrašai

linuxhint_spark_app.sql( 'select * from Sector2' ).Rodyti()

Išvestis:

2 pavyzdys:

Naudokite ankstesnį VIEW ir parašykite užklausą, kad būtų rodomi visi įrašai, kurių įvertinimas yra „Karštas“ arba „Šaunus“.

# Užklausa, kad būtų rodomi visi 2 sektoriaus įrašai su įvertinimu – karšta arba šalta.

linuxhint_spark_app.sql( 'pasirinkite * iš 2 sektoriaus, kur Įvertinimas = 'Karšta' ARBA Įvertinimas = 'Šaunus'' ).Rodyti()

Išvestis:

Yra trys įrašai su įvertinimu „Karšta“ arba „Šaunu“.

Išvada

PySpark, funkcija write.parquet() įrašo DataFrame į parketo failą. Funkcija read.parquet() nuskaito parketo failą į PySpark DataFrame arba bet kurį kitą duomenų šaltinį. Sužinojome, kaip skaityti parketo failą į PySpark DataFrame ir į PySpark lentelę. Vykdydami šią mokymo programą taip pat aptarėme, kaip sukurti lenteles iš PySpark DataFrame ir filtruoti duomenis naudojant WHERE sąlygą.