Kaip įdiegti duomenų srautinį perdavimą realiuoju laiku „Python“.

Kaip Idiegti Duomenu Srautini Perdavima Realiuoju Laiku Python



Realaus laiko duomenų srautinio perdavimo Python įdiegimas yra esminis įgūdis šiandieniniame duomenų sraute. Šiame vadove nagrinėjami pagrindiniai žingsniai ir pagrindiniai įrankiai, kaip naudojant Python duomenų perdavimą realiuoju laiku naudojant autentiškumą. Nuo tinkamos sistemos, pvz., „Apache Kafka“ ar „Apache Pulsar“ parinkimo iki „Python“ kodo rašymo, kad būtų lengvas duomenų suvartojimas, apdorojimas ir efektyvi vizualizacija, įgysime įgūdžių, reikalingų kurti judrius ir efektyvius realaus laiko duomenų kanalus.

1 pavyzdys: duomenų srautinio perdavimo realiuoju laiku įgyvendinimas Python

Duomenų srautinio perdavimo realiuoju laiku įgyvendinimas naudojant Python yra labai svarbus šiandienos duomenimis pagrįstame amžiuje ir pasaulyje. Šiame išsamiame pavyzdyje apžvelgsime realaus laiko duomenų srautinio perdavimo sistemos kūrimo procesą naudojant „Apache Kafka“ ir „Python“ sistemoje „Google Colab“.







Norint inicijuoti pavyzdį prieš pradedant kodavimą, būtina sukurti konkrečią aplinką „Google Colab“. Pirmas dalykas, kurį turime padaryti, yra įdiegti reikiamas bibliotekas. Kafka integravimui naudojame „kafka-python“ biblioteką.



! pip diegti kafka-python


Ši komanda įdiegia „kafka-python“ biblioteką, kuri teikia „Python“ funkcijas ir „Apache Kafka“ susiejimą. Tada importuojame reikiamas bibliotekas mūsų projektui. Reikalingų bibliotekų, įskaitant „KafkaProducer“ ir „KafkaConsumer“, importavimas yra klasės iš „kafka-python“ bibliotekos, leidžiančios mums bendrauti su Kafka brokeriais. JSON yra „Python“ biblioteka, skirta dirbti su JSON duomenimis, kuriuos naudojame pranešimų serijai ir deserializavimui.



iš kafka importo KafkaProducer, KafkaConsumer
importuoti json


„Kafka“ prodiuserio sukūrimas





Tai svarbu, nes Kafkos gamintojas siunčia duomenis į Kafkos temą. Mūsų pavyzdyje sukuriame gamintoją, kuris siųstų imituotus duomenis realiuoju laiku į temą, pavadintą „realiojo laiko tema“.

Sukuriame „KafkaProducer“ egzempliorių, kuriame „Kafka“ brokerio adresas nurodomas kaip „localhost:9092“. Tada naudojame „value_serializer“ – funkciją, kuri suskirsto duomenis prieš siunčiant juos Kafkai. Mūsų atveju lambda funkcija koduoja duomenis kaip UTF-8 koduotą JSON. Dabar modeliuokime kai kuriuos duomenis realiuoju laiku ir nusiųskite juos į Kafkos temą.



gamintojas = KafkaProducer ( bootstrap_servers = 'localhost:9092' ,
value_serializer =lambda v: json.dumps ( in ) .koduoti ( 'utf-8' ) )
# Imituojami realaus laiko duomenys
duomenys = { 'sensor_id' : 1 , 'temperatūra' : 25.5 , 'drėgmė' : 60.2 }
# Siunčiami duomenys į temą
gamintojas.siųsti ( 'realaus laiko tema' , duomenys )


Šiose eilutėse apibrėžiame „duomenų“ žodyną, kuris vaizduoja imituotus jutiklio duomenis. Tada naudojame „siųsti“ metodą, kad paskelbtume šiuos duomenis „realiojo laiko temoje“.

Tada norime sukurti Kafka vartotoją, o Kafka vartotojas nuskaito Kafkos temos duomenis. Sukuriame vartotoją, kuris vartoja ir apdoroja pranešimus „realiojo laiko temoje“. Sukuriame „KafkaConsumer“ egzempliorių, nurodydami temą, kurią norime vartoti, pvz., (tema realiuoju laiku) ir Kafka brokerio adresą. Tada „value_deserializer“ yra funkcija, kuri deserializuoja iš Kafkos gautus duomenis. Mūsų atveju lambda funkcija iššifruoja duomenis kaip UTF-8 koduotą JSON.

vartotojas = KafkaConsumer ( 'realaus laiko tema' ,
bootstrap_servers = 'localhost: 9092' ,
vertės_deserializatorius =lambda x: json.loads ( x.dekoduoti ( 'utf-8' ) ) )


Naudojame kartotinę kilpą, kad galėtume nuolat vartoti ir apdoroti temos pranešimus.

# Skaityti ir apdoroti duomenis realiuoju laiku
dėl žinutę in vartotojas:
duomenys = pranešimas.vertė
spausdinti ( f „Gauti duomenys: {data}“ )


Mes gauname kiekvieno pranešimo vertę ir savo modeliuojamus jutiklio duomenis kilpoje ir išspausdiname juos į konsolę. „Kafka“ gamintojo ir vartotojo paleidimas apima šio kodo paleidimą „Google Colab“ ir kodo langelių vykdymą atskirai. Gamintojas siunčia modeliuotus duomenis į Kafkos temą, o vartotojas skaito ir atspausdina gautus duomenis.


Išvesties analizė veikiant kodui

Stebėsime realaus laiko duomenis, kurie yra gaminami ir vartojami. Duomenų formatas gali skirtis priklausomai nuo mūsų modeliavimo arba faktinio duomenų šaltinio. Šiame išsamiame pavyzdyje aprašome visą duomenų srautinio perdavimo realiuoju laiku sistemos nustatymo procesą naudojant „Apache Kafka“ ir „Python“ sistemoje „Google Colab“. Paaiškinsime kiekvieną kodo eilutę ir jos reikšmę kuriant šią sistemą. Duomenų srautinis perdavimas realiuoju laiku yra galinga galimybė, o šis pavyzdys yra sudėtingesnių realaus pasaulio programų pagrindas.

2 pavyzdys: Duomenų srautinio perdavimo realiuoju laiku įgyvendinimas Python naudojant vertybinių popierių rinkos duomenis

Padarykime dar vieną unikalų pavyzdį, kaip įgyvendinti Python duomenų srautinį perdavimą realiuoju laiku naudojant kitą scenarijų; šį kartą daugiausia dėmesio skirsime akcijų rinkos duomenims. Sukuriame duomenų srautinio perdavimo realiuoju laiku sistemą, kuri fiksuoja akcijų kainų pokyčius ir apdoroja juos naudodami „Apache Kafka“ ir „Python“ sistemoje „Google Colab“. Kaip parodyta ankstesniame pavyzdyje, pradedame konfigūruodami aplinką sistemoje „Google Colab“. Pirmiausia įdiegiame reikiamas bibliotekas:

! pip diegti kafka-python yfinance


Čia pridedame „yfinance“ biblioteką, kuri leidžia gauti realaus laiko akcijų rinkos duomenis. Toliau importuojame reikiamas bibliotekas. Mes ir toliau naudojame klases „KafkaProducer“ ir „KafkaConsumer“ iš „kafka-python“ bibliotekos Kafka sąveikai. Importuojame JSON, kad galėtume dirbti su JSON duomenimis. Taip pat naudojame „yfinance“, kad gautume realaus laiko akcijų rinkos duomenis. Taip pat importuojame „laiko“ biblioteką, kad pridėtume laiko delsą, kad būtų galima imituoti atnaujinimus realiuoju laiku.

iš kafka importo KafkaProducer, KafkaConsumer
importuoti json
importuoti finansus kaip yf
importuoti laikas


Dabar sukuriame „Kafka“ gamintoją atsargų duomenims. Mūsų „Kafka“ gamintojas gauna realiojo laiko atsargų duomenis ir siunčia juos į „Kafka“ temą, pavadintą „akcijų kaina“.

gamintojas = KafkaProducer ( bootstrap_servers = 'localhost:9092' ,
value_serializer =lambda v: json.dumps ( in ) .koduoti ( 'utf-8' ) )

kol Tiesa:
atsargos = yf.Ticker ( 'AAPL' ) # Pavyzdys: Apple Inc. akcijos
stock_data = stock.history ( laikotarpį = '1d' )
paskutinė_kaina = atsargų_duomenys [ 'Uždaryti' ] .iloc [ - 1 ]
duomenys = { 'simbolis' : 'AAPL' , 'kaina' : Paskutinė kaina }
gamintojas.siųsti ( 'akcijos kaina' , duomenys )
laikas.miegas ( 10 ) # Imituokite atnaujinimus realiuoju laiku kas 10 sekundžių


Sukuriame „KafkaProducer“ egzempliorių su Kafka brokerio adresu šiame kode. Ciklo viduje naudojame „yfinance“, kad gautume naujausią „Apple Inc.“ („AAPL“) akcijų kainą. Tada išimame paskutinę uždarymo kainą ir išsiunčiame į temą „akcijų kaina“. Galiausiai įdiegiame laiko delsą, kad imituotume atnaujinimus realiuoju laiku kas 10 sekundžių.

Sukurkime Kafkos vartotoją, kuris skaitys ir apdoros akcijų kainų duomenis iš temos „akcijų kaina“.

vartotojas = KafkaConsumer ( 'akcijos kaina' ,
bootstrap_servers = 'localhost:9092' ,
vertės_deserializatorius =lambda x: json.loads ( x.dekoduoti ( 'utf-8' ) ) )

dėl žinutę in vartotojas:
stock_data = pranešimas.vertė
spausdinti ( f 'Gauti atsargų duomenys: {stock_data['ymbol']} – kaina: {stock_data['price']}' )


Šis kodas panašus į ankstesnio pavyzdžio vartotojo sąranką. Jis nuolat skaito ir apdoroja pranešimus iš temos „akcijų kaina“ ir spausdina akcijų simbolį bei kainą į konsolę. Kodo langelius vykdome nuosekliai, pvz., po vieną „Google Colab“, kad paleistume gamintoją ir vartotoją. Gamintojas gauna ir siunčia realiu laiku akcijų kainų atnaujinimus, o vartotojas skaito ir rodo šiuos duomenis.

! pip diegti kafka-python yfinance
iš kafka importo KafkaProducer, KafkaConsumer
importuoti json
importuoti finansus kaip yf
importuoti laikas
gamintojas = KafkaProducer ( bootstrap_servers = 'localhost:9092' ,
value_serializer =lambda v: json.dumps ( in ) .koduoti ( 'utf-8' ) )

kol Tiesa:
atsargos = yf.Ticker ( 'AAPL' ) # Apple Inc. akcijų
stock_data = stock.history ( laikotarpį = '1d' )
paskutinė_kaina = atsargų_duomenys [ 'Uždaryti' ] .iloc [ - 1 ]

duomenys = { 'simbolis' : 'AAPL' , 'kaina' : Paskutinė kaina }

gamintojas.siųsti ( 'akcijos kaina' , duomenys )

laikas.miegas ( 10 ) # Imituokite atnaujinimus realiuoju laiku kas 10 sekundžių
vartotojas = KafkaConsumer ( 'akcijos kaina' ,
bootstrap_servers = 'localhost:9092' ,
vertės_deserializatorius =lambda x: json.loads ( x.dekoduoti ( 'utf-8' ) ) )

dėl žinutę in vartotojas:
stock_data = pranešimas.vertė
spausdinti ( f 'Gauti atsargų duomenys: {stock_data['simbolis']} – kaina: {stock_data['price']}' )


Analizuodami išvestį po kodo paleidimo, stebėsime, kaip gaminami ir vartojami Apple Inc. akcijų kainų atnaujinimai realiuoju laiku.

Išvada

Šiame unikaliame pavyzdyje pademonstravome duomenų srautinio perdavimo realiuoju laiku įgyvendinimą „Python“, naudodami „Apache Kafka“ ir „yfinance“ biblioteką, kad gautume ir apdorotų akcijų rinkos duomenis. Mes išsamiai paaiškinome kiekvieną kodo eilutę. Duomenų srautinis perdavimas realiuoju laiku gali būti taikomas įvairiose srityse, kad būtų galima kurti realaus pasaulio programas finansų, daiktų interneto ir kt.