Šiame vadove daugiausia dėmesio skirsime parketo failo skaitymui / įkėlimui į PySpark DataFrame/SQL, naudodami funkciją read.parquet(), kuri pasiekiama pyspark.sql.DataFrameReader klasėje.
Turinio tema:
Perskaitykite parketo failą į PySpark DataFrame
Perskaitykite parketo failą į PySpark SQL
Pyspark.sql.DataFrameReader.parquet()
Ši funkcija naudojama nuskaityti parketo failą ir įkelti jį į PySpark DataFrame. Jis paima parketo failo kelią / failo pavadinimą. Galime tiesiog naudoti funkciją read.parquet(), nes tai yra bendroji funkcija.
Sintaksė:
Pažiūrėkime read.parquet() sintaksę:
spark_app.read.parquet(failo_pavadinimas.parquet/path)Pirmiausia įdiekite PySpark modulį naudodami pip komandą:
pip įdiegti pyspark
Gaukite parketo failą
Norint nuskaityti parketo bylą, reikia duomenų, kuriuose iš tų duomenų sugeneruota parketo byla. Šioje dalyje pamatysime, kaip sugeneruoti parketo failą iš PySpark DataFrame.
Sukurkime PySpark DataFrame su 5 įrašais ir įrašykime tai į parketo failą 'industry_parquet'.
importuoti pysparkiš pyspark.sql importuoti SparkSession,Row
linuxhint_spark_app = SparkSession.builder.appName( 'Linux Hint' ).getOrCreate()
# sukurti duomenų rėmelį, kuriame saugoma pramonės šakos informacija
Industry_df = linuxhint_spark_app.createDataFrame([Row(Type= 'Žemdirbystė' ,Plotas= 'JAV' ,
Įvertinimas = 'karšta' ,Iš viso_darbuotojų= 100 ),
Eilutė(tipas= 'Žemdirbystė' ,Plotas= 'Indija' ,Įvertinimas= 'karšta' ,Iš viso_darbuotojų= 200 ),
Eilutė(tipas= 'plėtra' ,Plotas= 'JAV' ,Įvertinimas= 'šiltas' ,Iš viso_darbuotojų= 100 ),
Eilutė(tipas= 'Išsilavinimas' ,Plotas= 'JAV' ,Įvertinimas= 'Saunus' ,Iš viso_darbuotojų= 400 ),
Eilutė(tipas= 'Išsilavinimas' ,Plotas= 'JAV' ,Įvertinimas= 'šiltas' ,Iš viso_darbuotojų= dvidešimt )
])
# Faktinis duomenų rėmelis
Industry_df.show()
# Įrašykite industrial_df į parketo failą
Industry_df.coalesce( 1 ).write.parquet( 'pramonės_parketas' )
Išvestis:
Tai DataFrame, kuriame yra 5 įrašai.
Sukuriamas parketo failas ankstesniam DataFrame. Čia mūsų failo pavadinimas su plėtiniu yra „part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet“. Mes naudojame šį failą visoje pamokoje.
Perskaitykite parketo failą į PySpark DataFrame
Turime parketo dildę. Perskaitykime šį failą naudodami read.parquet() funkciją ir įkelkime jį į PySpark DataFrame.
importuoti pysparkiš pyspark.sql importuoti SparkSession,Row
linuxhint_spark_app = SparkSession.builder.appName( 'Linux Hint' ).getOrCreate()
# Nuskaitykite parketo failą į objektą dataframe_from_parquet.
dataframe_from_parquet=linuxhint_spark_app.read.parquet( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )
# Rodyti dataframe_from_parquet-DataFrame
dataframe_from_parquet.show()
Išvestis:
Rodome DataFrame naudodami show() metodą, kuris buvo sukurtas iš parketo failo.
SQL užklausos su parketo failu
Įkėlus į DataFrame, galima sukurti SQL lenteles ir rodyti duomenis, esančius DataFrame. Turime sukurti LAIKINĄ VAIZDĄ ir naudoti SQL komandas, kad grąžintume įrašus iš „DataFrame“, sukurto iš parketo failo.
1 pavyzdys:
Sukurkite laikiną rodinį pavadinimu „Sektoriai“ ir naudokite komandą SELECT, kad būtų rodomi „DataFrame“ įrašai. Galite kreiptis į tai pamoka paaiškina, kaip Spark – SQL sukurti VIEW.
importuoti pysparkiš pyspark.sql importuoti SparkSession,Row
linuxhint_spark_app = SparkSession.builder.appName( 'Linux Hint' ).getOrCreate()
# Nuskaitykite parketo failą į objektą dataframe_from_parquet.
dataframe_from_parquet=linuxhint_spark_app.read.parquet( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )
# Sukurkite vaizdą iš aukščiau esančio parketo failo pavadinimu - 'Sektoriai'
dataframe_from_parquet.createOrReplaceTempView( 'Sektoriai' )
# Užklausa, kad būtų rodomi visi sektorių įrašai
linuxhint_spark_app.sql( 'pasirinkite * iš sektorių' ).Rodyti()
Išvestis:
2 pavyzdys:
Naudodami ankstesnį VIEW, parašykite SQL užklausą:
- Norėdami parodyti visus įrašus iš sektorių, priklausančių „Indijai“.
- Rodyti visus įrašus iš sektorių, kurių darbuotojas yra didesnis nei 100.
linuxhint_spark_app.sql( 'select * iš sektorių, kur sritis = 'Indija' ).Rodyti()
# Užklausa, kad būtų rodomi visi įrašai iš sektorių, kurių darbuotojų skaičius didesnis nei 100
linuxhint_spark_app.sql( 'pasirinkite * iš sektorių, kuriuose iš viso_darbuotojų > 100' ).Rodyti()
Išvestis:
Yra tik vienas įrašas, kurio sritis yra „Indija“, ir du įrašai su darbuotojais, kurie yra didesni nei 100.
Perskaitykite parketo failą į PySpark SQL
Pirmiausia turime sukurti VIEW naudodami komandą CREATE. Naudodami raktinį žodį „kelio“ SQL užklausoje, galime nuskaityti parketo failą į „Spark SQL“. Po kelio turime nurodyti failo pavadinimą / vietą.
Sintaksė:
spark_app.sql( 'SUKURTI LAIKINĄJĮ RODINĮ rodinio_pavadinimas NAUDOJANT parketo PARINKČIAS (kelias ' failo_vardas.parketas ')' )1 pavyzdys:
Sukurkite laikiną rodinį pavadinimu „Sector2“ ir perskaitykite jame parketo failą. Naudodami funkciją sql() parašykite pasirinkimo užklausą, kad būtų rodomi visi rodinyje esantys įrašai.
importuoti pysparkiš pyspark.sql importuoti SparkSession,Row
linuxhint_spark_app = SparkSession.builder.appName( „Linux patarimas“ ).getOrCreate()
# Perskaitykite parketo failą į Spark-SQL
linuxhint_spark_app.sql( 'SUKURTI LAIKINĄ VAIZDĮ Sector2 NAUDOJANT parketo OPTIONS (kelias' part-00000-ff70f69d-f1fb- 4450 -b4b4-dfd5a8d6c7ad-c000.snappy.parquet ')' )
# Užklausa, kad būtų rodomi visi 2 sektoriaus įrašai
linuxhint_spark_app.sql( 'select * from Sector2' ).Rodyti()
Išvestis:
2 pavyzdys:
Naudokite ankstesnį VIEW ir parašykite užklausą, kad būtų rodomi visi įrašai, kurių įvertinimas yra „Karštas“ arba „Šaunus“.
# Užklausa, kad būtų rodomi visi 2 sektoriaus įrašai su įvertinimu – karšta arba šalta.linuxhint_spark_app.sql( 'pasirinkite * iš 2 sektoriaus, kur Įvertinimas = 'Karšta' ARBA Įvertinimas = 'Šaunus'' ).Rodyti()
Išvestis:
Yra trys įrašai su įvertinimu „Karšta“ arba „Šaunu“.
Išvada
PySpark, funkcija write.parquet() įrašo DataFrame į parketo failą. Funkcija read.parquet() nuskaito parketo failą į PySpark DataFrame arba bet kurį kitą duomenų šaltinį. Sužinojome, kaip skaityti parketo failą į PySpark DataFrame ir į PySpark lentelę. Vykdydami šią mokymo programą taip pat aptarėme, kaip sukurti lenteles iš PySpark DataFrame ir filtruoti duomenis naudojant WHERE sąlygą.