Če želite dokončati to lekcijo, morate na svojem računalniku imeti aktivno namestitev programa Kafka. Preberite Namesti Apache Kafka v Ubuntu, če želite vedeti, kako to storiti.
Namestitev odjemalca Python za Apache Kafka
Preden začnemo delati z Apache Kafka v programu Python, moramo namestiti odjemalca Python za Apache Kafka. To lahko storite z uporabo pip (Kazalo paketa Python). Tu je ukaz, kako to doseči:
pip3 namestite kafka-pythonTo bo hitra namestitev na terminalu:
Namestitev odjemalca Python Kafka s pomočjo PIP
Zdaj, ko imamo aktivno namestitev za Apache Kafka in smo namestili tudi odjemalca Python Kafka, smo pripravljeni začeti s kodiranjem.
Izdelava producenta
Prvo, kar morate objaviti na Kafki, je aplikacija proizvajalca, ki lahko pošilja sporočila temam v Kafki.
Upoštevajte, da so proizvajalci Kafke asinhroni proizvajalci sporočil. To pomeni, da operacije, opravljene med objavo sporočila na particiji Kafka Topic, ne blokirajo. Da bodo stvari preproste, bomo za to lekcijo napisali preprost založnik JSON.
Za začetek naredite primerek za Kafka Producer:
iz kafka import KafkaProduceruvoz json
uvozni odtis
proizvajalec = KafkaProducer (
bootstrap_servers = 'localhost: 9092',
value_serializer = lambda v: json.odlagališča (v).encode ('utf-8'))
Atribut bootstrap_servers obvešča o gostitelju in vratih za strežnik Kafka. Atribut value_serializer je samo za namen JSON-ove serializacije vrednosti JSON.
Za igranje s producentom Kafka poskusimo natisniti meritve, povezane s skupino Producer in Kafka:
metrike = proizvajalec.meritve ()odtis.pprint (metrike)
Zdaj bomo videli naslednje:
Kafka Mterics
Zdaj pa poskusimo končno poslati nekaj sporočil v čakalno vrsto Kafka. Dober primer bo preprost objekt JSON:
proizvajalec.pošlji ('linuxhint', 'topic': 'kafka')The linuxhint je particija teme, na kateri bo poslan objekt JSON. Ko zaženete skript, ne boste dobili nobenega izhoda, saj je sporočilo poslano na particijo teme. Čas je, da napišemo potrošnika, da bomo lahko preizkusili svojo aplikacijo.
Potrošnik
Zdaj smo pripravljeni vzpostaviti novo povezavo kot potrošniška aplikacija in prejemati sporočila iz teme Kafka. Začnite z izdelavo novega primerka za potrošnika:
iz kafka import KafkaConsumeriz kafka import TopicPartition
print ('Vzpostavljanje povezave.')
potrošnik = KafkaConsumer (bootstrap_servers = 'localhost: 9092')
Zdaj tej povezavi dodelite temo in tudi možno vrednost odmika.
print ('Dodelitev teme.')potrošnik.dodeliti ([TopicPartition ('linuxhint', 2)])
Končno smo pripravljeni na tiskanje sporočila:
print ('Pridobivanje sporočila.')za sporočilo v potrošniku:
print ("OFFSET:" + str (sporočilo [0]) + "\ t MSG:" + str (sporočilo))
S tem bomo dobili seznam vseh objavljenih sporočil na Kafka Consumer Topic Partition. Rezultat tega programa bo:
Kafka Consumer
Za kratek povzetek je tukaj celoten produkcijski skript:
iz kafka import KafkaProduceruvoz json
uvozni odtis
proizvajalec = KafkaProducer (
bootstrap_servers = 'localhost: 9092',
value_serializer = lambda v: json.odlagališča (v).encode ('utf-8'))
proizvajalec.pošlji ('linuxhint', 'topic': 'kafka')
# metrics = proizvajalec.meritve ()
# pprint.pprint (metrike)
Tukaj je celoten potrošniški program, ki smo ga uporabili:
iz kafka import KafkaConsumeriz kafka import TopicPartition
print ('Vzpostavljanje povezave.')
potrošnik = KafkaConsumer (bootstrap_servers = 'localhost: 9092')
print ('Dodelitev teme.')
potrošnik.dodeliti ([TopicPartition ('linuxhint', 2)])
print ('Pridobivanje sporočila.')
za sporočilo v potrošniku:
print ("OFFSET:" + str (sporočilo [0]) + "\ t MSG:" + str (sporočilo))
Zaključek
V tej lekciji smo preučili, kako lahko namestimo in začnemo uporabljati Apache Kafka v naših programih Python. S prikazanim Kafka Client for Python smo pokazali, kako enostavno je izvajati preprosta opravila, povezana s Kafko v Pythonu.