Project author: Ketul654

Project description:
Word Count Kafka Stream Application
Language: Java
Repository: git://github.com/Ketul654/word-count-stream-application.git
Created: 2020-12-29T20:36:59Z
Project page: https://github.com/Ketul654/word-count-stream-application



Kafka Streams

About

This is a Kafka Streams application that counts the occurrences of each word in a stream of sentences. Words are case-insensitive, i.e. kafka, Kafka, KAFKA and kafKa are all counted as the same word.

It is developed using Java 8 and Kafka Streams 2.7.0.
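The counting rule described above can be illustrated without Kafka at all. The following is a minimal plain-Java sketch, not the project's actual topology: it assumes words are separated by whitespace (no punctuation stripping), lower-cases each sentence so that kafka, Kafka and KAFKA share one key, and returns one (word, updatedCount) pair per occurrence. That is roughly what a Kafka Streams count() forwards downstream when record caching is disabled. The class and method names are hypothetical.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical illustration of the word-count rule; not the project's code.
public class WordCountSketch {

    // Running count per lower-cased word (stands in for a Kafka Streams state store).
    private final Map<String, Long> counts = new HashMap<>();

    // Processes one sentence (one record value from word-count-input) and
    // returns the (word, updatedCount) pairs in the order they are produced.
    public List<Map.Entry<String, Long>> process(String sentence) {
        List<Map.Entry<String, Long>> updates = new ArrayList<>();
        // Lower-case first so that case variants map to the same key.
        String normalized = sentence.toLowerCase(Locale.ROOT).trim();
        if (normalized.isEmpty()) {
            return updates; // a blank line yields no words
        }
        // Assumption: words are separated by one or more whitespace characters.
        for (String word : normalized.split("\\s+")) {
            long updated = counts.merge(word, 1L, Long::sum);
            updates.add(new AbstractMap.SimpleImmutableEntry<>(word, updated));
        }
        return updates;
    }

    // Current count for a word, looked up case-insensitively.
    public long countOf(String word) {
        return counts.getOrDefault(word.toLowerCase(Locale.ROOT), 0L);
    }

    public static void main(String[] args) {
        WordCountSketch wc = new WordCountSketch();
        wc.process("Kafka streams");
        // Prints three tab-separated lines: kafka 2, and 1, kafka 3
        for (Map.Entry<String, Long> update : wc.process("KAFKA and kafKa")) {
            System.out.println(update.getKey() + "\t" + update.getValue());
        }
    }
}
```

Emitting an update per occurrence (rather than a final total) reflects that the input is an unbounded stream: there is never a point at which the count is "finished", so every new word produces a new current value for its key.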

How to run

  1. Set up a Kafka cluster on your local machine. Refer to the Kafka Cluster Setup section for help.

  2. Create the input and output topics

    bin/kafka-topics.sh --create \
    --zookeeper localhost:2181 \
    --replication-factor 1 \
    --partitions 3 \
    --topic word-count-input

    bin/kafka-topics.sh --create \
    --zookeeper localhost:2181 \
    --replication-factor 1 \
    --partitions 3 \
    --topic word-count-output

  3. Produce messages to the input topic using the Kafka console producer

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic word-count-input

  4. Start the Word Count Stream application

  5. Start the console consumer to consume messages from the output topic

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic word-count-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
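The consumer command in step 5 uses StringDeserializer for keys and LongDeserializer for values, which implies the application writes each word as a UTF-8 string key and its count as a Kafka long. Kafka's long serde encodes a value as exactly 8 bytes in big-endian order, so without the value deserializer the console would show raw bytes instead of a readable number. The sketch below mirrors that byte layout using only the JDK; the class and method names are hypothetical and are not part of the Kafka client API.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper mirroring the byte layout of Kafka's String and Long serdes.
public class OutputRecordDecoder {

    // Same layout as Kafka's LongSerializer output: 8 bytes, big-endian.
    static byte[] encodeCount(long count) {
        return ByteBuffer.allocate(Long.BYTES).putLong(count).array();
    }

    // Like LongDeserializer: null stays null, anything other than 8 bytes is rejected.
    static Long decodeCount(byte[] value) {
        if (value == null) {
            return null;
        }
        if (value.length != Long.BYTES) {
            throw new IllegalArgumentException("expected 8 bytes but got " + value.length);
        }
        return ByteBuffer.wrap(value).getLong(); // ByteBuffer is big-endian by default
    }

    // Like StringDeserializer with its default UTF-8 encoding.
    static String decodeWord(byte[] key) {
        return key == null ? null : new String(key, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] key = "kafka".getBytes(StandardCharsets.UTF_8);
        byte[] value = encodeCount(3L); // bytes 0 0 0 0 0 0 0 3: unreadable as text
        System.out.println(decodeWord(key) + "\t" + decodeCount(value)); // prints "kafka", a tab, then "3"
    }
}
```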

Kafka Cluster Setup

Follow the steps below to set up a 3-node cluster on a single Mac machine:

  • Download Kafka from https://kafka.apache.org/downloads
  • Extract it somewhere by running the tar command in Terminal

    e.g. tar -xvf kafka_2.13-2.6.0.tgz

  • Go to the extracted Kafka folder

    e.g. cd kafka_2.13-2.6.0/

  • Start ZooKeeper

    bin/zookeeper-server-start.sh config/zookeeper.properties

    This brings up ZooKeeper on the default port 2181, as configured in the config/zookeeper.properties file.

  • Start the first broker/node

    bin/kafka-server-start.sh config/server.properties

    This starts a broker with the following default broker id, log directory and port, as configured in config/server.properties:

    broker.id=0
    log.dirs=/tmp/kafka-logs
    port=9092
  • Create a copy of the config/server.properties file for the second broker/node

    e.g. cp config/server.properties config/server1.properties

  • Change the broker id, log directory and port in the config/server1.properties file:

    broker.id=1
    log.dirs=/tmp/kafka-logs-1
    port=9093
  • Start the second broker/node

    bin/kafka-server-start.sh config/server1.properties

  • Create one more copy of the config/server.properties file for the third broker/node

    e.g. cp config/server.properties config/server2.properties

  • Change the broker id, log directory and port in the config/server2.properties file:

    broker.id=2
    log.dirs=/tmp/kafka-logs-2
    port=9094
  • Start the third broker/node

    bin/kafka-server-start.sh config/server2.properties

  • Check which brokers are up and running

    bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids

    This gives output similar to the following:

    Connecting to localhost:2181
    WATCHER::
    WatchedEvent state:SyncConnected type:None path:null
    [0, 1, 2]
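The three broker configurations above differ only in broker.id, log.dirs and port. As a convenience, the copy-and-edit steps for the second and third brokers could be scripted; the following is a minimal sketch of a hypothetical helper (not part of Kafka or this project), assuming it is run from the extracted Kafka folder. It overwrites config/server1.properties and config/server2.properties, and because java.util.Properties.store rewrites the file, the comments and ordering of the original server.properties are not preserved.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;

// Hypothetical helper that derives per-broker configs from config/server.properties.
public class BrokerConfigGenerator {

    static final int BASE_PORT = 9092;

    // Copy of the base config with the three per-broker settings used in this setup.
    static Properties brokerConfig(Properties base, int brokerId) {
        Properties props = new Properties();
        props.putAll(base); // keep every other setting, e.g. zookeeper.connect
        props.setProperty("broker.id", Integer.toString(brokerId));
        // Broker 0 keeps the default /tmp/kafka-logs; the others get a numbered suffix.
        props.setProperty("log.dirs", brokerId == 0 ? "/tmp/kafka-logs" : "/tmp/kafka-logs-" + brokerId);
        props.setProperty("port", Integer.toString(BASE_PORT + brokerId));
        return props;
    }

    public static void main(String[] args) throws IOException {
        Properties base = new Properties();
        try (InputStream in = Files.newInputStream(Paths.get("config/server.properties"))) {
            base.load(in);
        }
        // Brokers 1 and 2 as in the steps above; broker 0 uses server.properties unchanged.
        for (int id = 1; id <= 2; id++) {
            Path target = Paths.get("config/server" + id + ".properties");
            try (OutputStream out = Files.newOutputStream(target)) {
                brokerConfig(base, id).store(out, "Generated for broker " + id);
            }
            System.out.println("Wrote " + target);
        }
    }
}
```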