Apache Kafka

Kako brati podatke iz Kafke s Pythonom

Kako brati podatke iz Kafke s Pythonom
Kafka je odprtokodni sistem za distribucijo sporočil za pošiljanje sporočila v razdeljenih in različnih temah. Pretakanje podatkov v realnem času je mogoče izvesti z uporabo Kafke za sprejemanje podatkov med aplikacijami. Ima tri glavne dele. To so proizvajalci, potrošniki in teme. Proizvajalec se uporablja za pošiljanje sporočila na določeno temo, vsako sporočilo pa je priloženo s ključem. Potrošnik se uporablja za branje sporočila o določeni temi iz nabora particij. Podatki, prejeti od proizvajalca in shranjeni na particijah glede na določeno temo. V knjižnici python obstaja veliko knjižnic za ustvarjanje proizvajalcev in potrošnikov za izdelavo sistema sporočanja s pomočjo Kafke. Kako je mogoče podatke iz Kafke brati s pomočjo pythona, je prikazano v tej vadnici.

Predpogoj

Za branje podatkov iz Kafke morate namestiti potrebno knjižnico python. Python3 se v tej vadnici uporablja za pisanje skripta potrošnika in proizvajalca. Če paket pip ni nameščen prej v vašem operacijskem sistemu Linux, morate namestiti pip, preden namestite knjižnico Kafka za python. python3-kafka se v tej vadnici uporablja za branje podatkov iz Kafke. Za namestitev knjižnice zaženite naslednji ukaz.

$ pip namestite python3-kafka

Branje preprostih besedilnih podatkov iz Kafke

Proizvajalec lahko pošlje različne vrste podatkov o določeni temi, ki jo lahko prebere potrošnik. V tem delu te vadnice je prikazano, kako lahko od Kafke pošljemo in prejmemo preproste besedilne podatke s pomočjo proizvajalca in potrošnika.

Ustvarite datoteko z imenom proizvajalec1.py z naslednjim skriptom python. KafkaProducer modul je uvožen iz knjižnice Kafka. Seznam posrednikov mora določiti v času inicializacije objekta proizvajalca, da se poveže s strežnikom Kafka. Privzeta vrata Kafke so '9092". Argument bootstrap_servers se uporablja za določitev imena gostitelja z vrati. "First_Topic'je nastavljeno kot ime teme, s katero bo proizvajalcu poslano besedilno sporočilo. Nato preprosto besedilno sporočilo, 'Pozdravljeni iz Kafke'je poslano z uporabo pošlji () metoda KafkaProducer na temo, 'First_Topic".

proizvajalec1.py:

# Uvozi KafkaProducer iz knjižnice Kafka
iz kafka import KafkaProducer
# Določite strežnik z vrati
bootstrap_servers = ['localhost: 9092']
# Določite ime teme, kjer bo sporočilo objavljeno
topicName = 'First_Topic'
# Inicializirajte spremenljivko proizvajalca
proizvajalec = KafkaProducer (bootstrap_servers = bootstrap_servers)
# Objavi besedilo v določeni temi
proizvajalec.pošlji (topicName, b'Halo od kafka ... ')
# Natisni sporočilo
print ("Sporočilo poslano")

Ustvarite datoteko z imenom potrošnik1.py z naslednjim skriptom python. KafkaConsumer modul je uvožen iz knjižnice Kafka za branje podatkov iz Kafke. sys tukaj se uporablja modul za zaključek skripta. Za branje podatkov iz Kafke se v skriptu potrošnika uporabljata isto ime gostitelja in številka vrat proizvajalca. Ime teme potrošnika in proizvajalca mora biti enako, kar je „Prva_tema".  Nato se potrošniški objekt inicializira s tremi argumenti. Ime teme, ID skupine in podatki o strežniku. za zanka se tukaj uporablja za branje besedila, poslanega od proizvajalca Kafka.

potrošnik1.py:

# Uvozi KafkaConsumer iz knjižnice Kafka
iz kafka import KafkaConsumer
# Uvozi modul sys
uvoz sys
# Določite strežnik z vrati
bootstrap_servers = ['localhost: 9092']
# Določite ime teme, od kod bo prejeto sporočilo
topicName = 'First_Topic'
# Inicializirajte potrošniško spremenljivko
potrošnik = KafkaConsumer (imeName, group_id = 'group1', bootstrap_servers =
bootstrap_servers)
# Preberite in natisnite sporočilo potrošnika
za sporočilo pri porabniku:
print ("Ime teme =% s, Sporočilo =% s"% (sporoč.tema, sporoč.vrednost))
# Prekinite skript
sys.izhod ()

Izhod:

Zaženite naslednji ukaz iz enega terminala, da zaženete skript proizvajalca.

$ python3 proizvajalec1.py

Po pošiljanju sporočila se prikaže naslednji izhod.

Zaženite naslednji ukaz iz drugega terminala, da zaženete potrošniški skript.

$ python3 potrošnik1.py

Rezultat prikazuje ime teme in besedilno sporočilo, poslano od proizvajalca.

Branje podatkov v formatu JSON iz Kafke

Podatke v obliki JSON lahko pošlje proizvajalec Kafka in jih prebere potrošnik Kafka json modul pythona. Kako je mogoče podatke JSON serializirati in odstraniti iz serije, preden jih pošljete in sprejmete z uporabo modula python-kafka, je prikazano v tem delu te vadnice.

Ustvarite python skript z imenom proizvajalec2.py z naslednjim skriptom. Drugi modul z imenom JSON se uvozi z KafkaProducer modul tukaj. vrednost_serializator argument se uporablja z bootstrap_servers argument tukaj za inicializacijo predmeta proizvajalca Kafka. Ta argument pomeni, da bodo podatki JSON kodirani z uporabo 'utf-8'nabor znakov v času pošiljanja. Nato se podatki v obliki JSON pošljejo v imenovano temo JSONtopic.

proizvajalec2.py:

# Uvozi KafkaProducer iz knjižnice Kafka
iz kafka import KafkaProducer
# Uvozite modul JSON za serializacijo podatkov
uvoz json
# Inicializirajte proizvajalčevo spremenljivko in nastavite parameter za kodiranje JSON
proizvajalec = KafkaProducer (bootstrap_servers =
['localhost: 9092'], value_serializer = lambda v: json.odlagališča (v).encode ('utf-8'))
# Pošljite podatke v obliki JSON
proizvajalec.send ('JSONtopic', 'name': 'fahmida', 'email': '[email protected]')
 
# Natisni sporočilo
print ("Sporočilo poslano v JSONtopic")

Ustvarite python skript z imenom potrošnik2.py z naslednjim skriptom. KafkaConsumer, sys in JSON moduli so uvoženi v tem skriptu. KafkaConsumer modul se uporablja za branje podatkov v formatu JSON iz Kafke. Modul JSON se uporablja za dekodiranje kodiranih podatkov JSON, poslanih od proizvajalca Kafke. Sys modul se uporablja za zaključek skripta. vrednost_deserializator argument se uporablja z bootstrap_servers da določite, kako se bodo dešifrirali podatki JSON. Naslednji, za zanka se uporablja za tiskanje vseh potrošniških zapisov in podatkov JSON, pridobljenih iz Kafke.

potrošnik2.py:

# Uvozi KafkaConsumer iz knjižnice Kafka
iz kafka import KafkaConsumer
# Uvozi modul sys
uvoz sys
# Uvozite modul json za serializacijo podatkov
uvoz json
# Inicializirajte potrošniško spremenljivko in nastavite lastnost za dekodiranje JSON
potrošnik = KafkaConsumer ('JSONtopic', bootstrap_servers = ['localhost: 9092'],
vrednost_deserializator = lambda m: json.obremenitve (m.decode ('utf-8')))
# Preberite podatke iz kafke
za sporočilo v potrošniku:
print ("Potrošniški zapisi: \ n")
natisni (sporočilo)
print ("\ nBranje iz podatkov JSON \ n")
print ("Ime:", sporočilo [6] ['ime'])
print ("E-pošta:", sporočilo [6] ['email'])
# Prekinite skript
sys.izhod ()

Izhod:

Zaženite naslednji ukaz iz enega terminala, da zaženete skript proizvajalca.

$ python3 proizvajalec2.py

Skript bo po pošiljanju podatkov JSON natisnil naslednje sporočilo.

Zaženite naslednji ukaz iz drugega terminala, da zaženete potrošniški skript.

$ python3 potrošnik2.py

Po zagonu skripta se prikaže naslednji izhod.

Zaključek:

Podatke lahko s pomočjo pythona pošilja in prejema Kafka v različnih oblikah. Podatke lahko shranite v bazo podatkov in jih pridobite iz baze podatkov s pomočjo Kafke in pythona. Doma sem, ta vadnica bo uporabniku pythona pomagala, da začne sodelovati s Kafko.

Najboljši emulatorji igralne konzole za Linux
V tem članku bo navedena priljubljena programska oprema za emulacijo igralne konzole, ki je na voljo za Linux. Emulacija je sloj združljivosti program...
Najboljši Linux Distros za igre na srečo v letu 2021
Operacijski sistem Linux je daleč od prvotnega, preprostega, strežniškega videza. Ta OS se je v zadnjih letih izjemno izboljšal in se je zdaj razvil v...
Kako zajeti in pretakati svojo igralno sejo v Linuxu
V preteklosti je bilo igranje iger le hobi, sčasoma pa je igralniška industrija zabeležila veliko rast glede tehnologije in števila igralcev. Občinstv...