Kafka

Kafka allows you to distribute the load between nodes receiving data and encrypts communication.

Architecture example:

../_images/image124.PNG

The Kafka installation

To install the Kafka, follow the steps below:

  1. Java installation

    yum install java-11-openjdk-headless.x86_64
    
  2. Create users for Kafka and Zookeeper

    useradd -r -u 30000 -M kafka
    useradd -r -u 30001 -M -G kafka zookeeper
    
  3. Download the installation package::

    https://kafka.apache.org/downloads

    and

    https://zookeeper.apache.org/releases.html

    wget https://ftp.ps.pl/pub/apache/kafka/2.7.0/kafka_2.13-2.7.0.tgz
    wget https://ftp.ps.pl/pub/apache/zookeeper/zookeeper-3.6.2/apache-zookeeper-3.6.2-bin.tar.gz
    tar -xzvf apache-zookeeper-3.6.2-bin.tar.gz -C /opt/
    tar -xzvf ./kafka_2.13-2.7.0.tgz -C /opt/
    ln -s /opt/apache-zookeeper-3.6.2-bin /opt/zookeeper
    ln -s /opt/kafka_2.13-2.7.0 /opt/kafka
    mkdir /var/lib/{zookeeper,kafka}
    mkdir /var/log/kafka
    
  4. Set the necessary permissions

    chown -R zookeeper: /opt/apache-zookeeper-3.6.2-bin /var/lib/zookeeper
    chown -R kafka: /opt/kafka_2.13-2.7.0 /var/lib/kafka
    chown -h zookeeper: /opt/zookeeper
    chown -h kafka: /opt/kafka
    
  5. Edit configs and set the data and log directory:

    cp /opt/zookeeper/conf/zoo_sample.cfg /opt/zookeeper/conf/zoo.cfg
    vim /opt/zookeeper/conf/zoo.cfg
    and change to dataDir=/var/lib/zookeeper
    vim  /opt/kafka/config/server.properties
    and chnage to log.dirs=/var/lib/kafka
    
  6. Set the necessary firewall rules:

    firewall-cmd --permanent --add-port=2181/tcp
    firewall-cmd --permanent --add-port=2888/tcp
    firewall-cmd --permanent --add-port=3888/tcp
    firewall-cmd --permanent --add-port=9092/tcp
    firewall-cmd --reload
    
  7. Create service files:

    vim /usr/lib/systemd/system/zookeeper.service

    [Unit]
    Description=Apache zookeeper service
    Documentation=http://zookeeper.apache.org
    Requires=network.target remote-fs.target
    After=network.target remote-fs.target
    
    [Service]
    Type=forking
    User=zookeeper
    Group=zookeeper
    ExecStart=/opt/zookeeper/bin/zkServer.sh start
    ExecStop=/opt/zookeeper/bin/zkServer.sh stop
    ExecReload=/opt/zookeeper/bin/zkServer.sh restart
    WorkingDirectory=/var/lib/zookeeper
    
    [Install]
    WantedBy=multi-user.target
    

    vim create /usr/lib/systemd/system/kafka.service

    [Unit]
    Description=Apache Kafka Server
    Documentation=http://kafka.apache.org/documentation.html
    Requires=network.target remote-fs.target zookeeper.service
    After=network.target remote-fs.target zookeeper.service
    
    [Service]
    Type=simple
    User=kafka
    Group=kafka
    Environment="LOG_DIR=/var/log/kafka"
    ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
    ExecStop=/opt/kafka/bin/kafka-server-stop.sh
    
    [Install]
    WantedBy=multi-user.target
    
  8. Reload systemctl daemon and the Kafka services:

    systemctl daemon-reload
    systemctl enable zookeeper kafka
    systemctl start zookeeper kafka
    
  9. To test add the Kafka topic:

    /opt/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --   replication-factor 1 --partitions 1 --topic test
    
  10. List existing topics:

     /opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --list
    
  11. Generate test messages

    /opt/kafka/bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9092
       something
       somtehing more
    
  12. Read test messages

    /opt/kafka/bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092
    

Enabling encryption in Kafka

Generate SSL key and certificate for each Kafka broker

keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey -keyalg RSA

Configuring Host Name In Certificates

keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey -keyalg RSA -ext SAN=DNS:{FQDN}

Verify content of the generated certificate:

keytool -list -v -keystore server.keystore.jks

Creating your own CA

openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert

Signing the certificate

keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days {validity} -CAcreateserial -passin pass:{ca-password}

Import both the certificate of the CA and the signed certificate into the keystore

keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed

If you have trusted certificates, you must import them into the JKS keystore as follows:

Create a keystore:

keytool -keystore client.keystore.jks -alias localhost -validity 365 -keyalg RSA -genkey

Combine the certificate and key file into a certificate in p12 format:

openssl pkcs12 -export -in cert_name.crt -inkey key_name.key -out cert_name.p12 -name localhost -CAfile ca.crt -caname root

Import the CA certificate into a truststore:

keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert

Import the CA certificate into a keystore:

keytool -keystore client.keystore.jks -alias CARoot -import -file ca-cert

Import the p12 certificate into a keystore:

keytool -importkeystore -deststorepass MY-KEYSTORE-PASS -destkeystore client.keystore.jks -srckeystore cert_name.p12 -srcstoretype PKCS12

Configuring Kafka Brokers

In /etc/kafka/server.properties file set the following options:

listeners=PLAINTEXT://host.name:port,SSL://host.name:port

ssl.keystore.location=/var/private/ssl/server.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
ssl.truststore.location=/var/private/ssl/server.truststore.jks
ssl.truststore.password=test1234

and restart the Kafka service

systemctl restart kafka

Configuring Kafka Clients

Logstash

Configure the output section in Logstash based on the following example:

output {
  kafka {
    bootstrap_servers => "host.name:port"
    security_protocol => "SSL"
    ssl_truststore_type => "JKS"
    ssl_truststore_location => "/var/private/ssl/client.truststore.jks"
    ssl_truststore_password => "test1234"
    client_id => "host.name"
    topic_id => "Topic-1"
    codec => json
  }
}

Configure the input section in Logstash based on the following example:

input {
  kafka {
    bootstrap_servers => "host.name:port"
    security_protocol => "SSL"
    ssl_truststore_type => "JKS"
    ssl_truststore_location => "/var/private/ssl/client.truststore.jks"
    ssl_truststore_password => "test1234"
    consumer_threads => 4
    topics => [ "Topic-1" ]
    codec => json
    tags => ["kafka"]
   }
}

Log retention for Kafka topic

The Kafka durably persists all published records—whether or not they have been consumed—using a configurable retention period. For example, if the retention policy is set to two days, then for the two days after a record is published, it is available for consumption, after which it will be discarded to free up space. Kafka’s performance is effectively constant with respect to data size so storing data for a long time is not a problem.