Kafka

Kafka allows you to distribute the load between the nodes that receive data and to encrypt communication.

Architecture example:

[Architecture diagram: ../_images/image124.PNG]

Kafka installation

To install Kafka, follow the steps below:

  1. Install Java:

    yum install java-11-openjdk-headless.x86_64
    
  2. Create a user for Kafka:

    useradd kafka -m -d /opt/kafka -s /sbin/nologin
    
  3. Download the installation package:

    https://dlcdn.apache.org/kafka/3.0.0/kafka_2.13-3.0.0.tgz
    
  4. Unpack the installation files to the /opt/kafka directory:

    tar -xzvf kafka_2.13-3.0.0.tgz -C /opt/
    mv /opt/kafka_2.13-3.0.0 /opt/kafka
    
  5. Set the necessary permissions:

    chown -R kafka:kafka /opt/kafka
    
  6. Edit the configuration and set the data directory (log.dirs):

    vim /opt/kafka/config/server.properties
    
    log.dirs=/tmp/kafka-logs
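
    A minimal sketch of the settings commonly adjusted in server.properties; /opt/kafka/data is an assumed path, pick one on persistent storage since /tmp may be cleared on reboot:

    # Unique ID of this broker within the cluster
    broker.id=0
    # Store topic data on a persistent path instead of /tmp
    log.dirs=/opt/kafka/data
    # ZooKeeper connection string (local ZooKeeper in this guide)
    zookeeper.connect=localhost:2181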
    
  7. Set the necessary firewall rules:

    firewall-cmd --permanent --add-port=2181/tcp
    firewall-cmd --permanent --add-port=2888/tcp
    firewall-cmd --permanent --add-port=3888/tcp
    firewall-cmd --permanent --add-port=9092/tcp
    firewall-cmd --reload
    
  8. Create service files:

    vim /usr/lib/systemd/system/zookeeper.service

    [Unit]
    Requires=network.target remote-fs.target
    After=network.target remote-fs.target
    
    [Service]
    Type=simple
    User=kafka
    ExecStart=/opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties
    ExecStop=/opt/kafka/bin/zookeeper-server-stop.sh
    Restart=on-abnormal
    
    [Install]
    WantedBy=multi-user.target
    

    vim /usr/lib/systemd/system/kafka.service

    [Unit]
    Requires=zookeeper.service
    After=zookeeper.service
    
    [Service]
    Type=simple
    User=kafka
    ExecStart=/bin/sh -c '/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties > /opt/kafka/kafka.log 2>&1'
    ExecStop=/opt/kafka/bin/kafka-server-stop.sh
    Restart=on-abnormal
    
    [Install]
    WantedBy=multi-user.target
    
  9. Reload the systemd daemon, then enable and start the services:

    systemctl daemon-reload
    systemctl enable zookeeper kafka
    systemctl start zookeeper kafka
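
    To verify that both services came up, check their status and the broker port (ss is available on most RHEL-family systems):

    systemctl status zookeeper kafka
    # The broker should be listening on 9092 once startup completes
    ss -tlnp | grep 9092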
    
  10. To test the installation, create a Kafka topic:

    /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic test
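
    To confirm the topic was created with the expected settings, it can be described:

    /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic test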
    
  11. List existing topics:

    /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
    
  12. Generate test messages:

    /opt/kafka/bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9092
       message 1
       message 2
       ...
    
  13. Read test messages:

    /opt/kafka/bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092
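
    For a non-interactive check, the consumer can be told to exit after reading a fixed number of messages:

    /opt/kafka/bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092 --max-messages 2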
    

Enabling encryption in Kafka

  1. Generate a server keystore with a certificate pair.

    Complete:

    • Certificate validity period;
    • The name of the alias;
    • The FQDN of the server;
    • Server IP;
    keytool -keystore server.keystore.jks -alias {alias_name} -validity {validity} -genkey -keyalg RSA -ext SAN=DNS:{FQDN},IP:{server_IP}
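
    To inspect the generated entry (alias, validity, and the SAN extension), you can list the keystore:

    keytool -list -v -keystore server.keystore.jks -alias {alias_name}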
    
  2. Create your own CA:

    openssl req -new -x509 -keyout rootCA.key -out rootCA.crt -days 365
    
  3. Import CA to server keystore and client keystore:

    keytool -keystore server.truststore.jks -alias CARoot -import -file rootCA.crt
    keytool -keystore client.truststore.jks -alias CARoot -import -file rootCA.crt
    
  4. Create a certificate signing request:

    Complete:

    • The name of the alias;
    • The FQDN of the server;
    • Server IP;
    keytool -keystore server.keystore.jks -alias {alias_name} -certreq -file cert-file -ext SAN=DNS:{FQDN},IP:{server_IP}
    
  5. Sign the certificate:

    Complete:

    • The name of the alias;
    • The FQDN of the server;
    • Server IP;
    • Password
    openssl x509 -req -extfile <(printf "subjectAltName = DNS:{FQDN},IP:{server_IP}") -CA rootCA.crt -CAkey rootCA.key -in cert-file -out cert-signed -days 3650 -CAcreateserial -passin pass:{password}
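
    Before importing, the signed certificate can be inspected to confirm the subject and subjectAltName:

    openssl x509 -in cert-signed -noout -text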
    
  6. Import the root CA and the signed certificate into the server keystore:

    keytool -keystore server.keystore.jks -alias CARoot -import -file rootCA.crt
    keytool -keystore server.keystore.jks -alias {alias_name} -import -file cert-signed
    
  7. If you have trusted certificates, you must import them into the JKS keystore as follows:

    Create a keystore:

    Complete:

    • Certificate validity period;
    • The name of the alias.
    keytool -keystore client.keystore.jks -alias {alias_name} -validity {validity} -keyalg RSA -genkey
    
  8. Combine the certificate and key file into a certificate in p12 format:

    Complete:

    • your cert name;
    • your key name;
    • friendly name;
    • CA cert file;
    openssl pkcs12 -export -in {your_cert_name} -inkey {your_key_name} -out {your_pair_name}.p12 -name {friendly_name} -CAfile ca.crt -caname root
    
  9. Import the CA certificate into a truststore:

    Complete:

    • CA cert file;
    keytool -keystore client.truststore.jks -alias CARoot -import -file {CAfile}
    
  10. Import the CA certificate into a keystore:

    Complete:

    • CA cert file.
    keytool -keystore client.keystore.jks -alias CARoot -import -file {CAfile}
    
  11. Import the p12 certificate into a keystore:

    Complete:

    • Your p12 pair;
    • Keystore password;
    keytool -importkeystore -deststorepass {keystore_password} -destkeystore client.keystore.jks -srckeystore {your_pair_name}.p12 -srcstoretype PKCS12
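
    After these imports the client keystore should contain both the CARoot entry and your key pair; this can be verified with:

    keytool -list -keystore client.keystore.jks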
    

Configuring Kafka Brokers

  1. In the /opt/kafka/config/server.properties file, set the following options:

    Complete:

    • Path to server keystore;
    • Keystore password;
    • Password for certificate key;
    • Path to server truststore;
    • Truststore password.
    listeners=PLAINTEXT://localhost:9092,SSL://{FQDN}:9093
    ssl.keystore.location={path_to_server_keystore}/server.keystore.jks
    ssl.keystore.password={keystore_password}
    ssl.key.password={key_password}
    ssl.truststore.location={path_to_server_truststore}/server.truststore.jks
    ssl.truststore.password={truststore_password}
    ssl.enabled.protocols=TLSv1.2
    ssl.client.auth=required
    security.inter.broker.protocol=SSL
    
  2. Restart the Kafka service:

    systemctl restart kafka
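
    A quick way to verify the TLS listener is an openssl handshake against port 9093; without a client certificate the broker (with ssl.client.auth=required) will ultimately reject the session, but s_client still prints the server certificate chain:

    openssl s_client -connect {FQDN}:9093 -CAfile rootCA.crt

    Command-line clients need both the truststore and the keystore created earlier. A minimal client-ssl.properties sketch for the console tools (the file name is an assumption; adjust paths and passwords to your environment):

    security.protocol=SSL
    ssl.truststore.location={path_to_client_truststore}/client.truststore.jks
    ssl.truststore.password={truststore_password}
    ssl.keystore.location={path_to_client_keystore}/client.keystore.jks
    ssl.keystore.password={keystore_password}
    ssl.key.password={key_password}

    /opt/kafka/bin/kafka-console-producer.sh --topic test --bootstrap-server {FQDN}:9093 --producer.config client-ssl.properties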

Configuring Kafka Clients

  1. Configure the output section in Logstash based on the following example:

    Complete:

    • Server FQDN;
    • Path to client truststore;
    • Truststore password.
    output {
      kafka {
        bootstrap_servers => "{FQDN}:9093"
        security_protocol => "SSL"
        ssl_truststore_type => "JKS"
        ssl_truststore_location => "{path_to_client_truststore}/client.truststore.jks"
        ssl_truststore_password => "{password_to_client_truststore}"
        client_id => "host.name"
        topic_id => "Topic-1"
        codec => json
      }
    }
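
    Because the broker is configured with ssl.client.auth=required, the Logstash kafka output and input plugins also need keystore settings for mutual TLS. A sketch of the additional options, with placeholders for your paths and passwords:

    ssl_keystore_type => "JKS"
    ssl_keystore_location => "{path_to_client_keystore}/client.keystore.jks"
    ssl_keystore_password => "{password_to_client_keystore}"
    ssl_key_password => "{key_password}"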
    
  2. Configure the input section in Logstash based on the following example:

    Complete:

    • Server FQDN;
    • Path to client truststore;
    • Truststore password.
    input {
      kafka {
        bootstrap_servers => "{FQDN}:9093"
        security_protocol => "SSL"
        ssl_truststore_type => "JKS"
        ssl_truststore_location => "{path_to_client_truststore}/client.truststore.jks"
        ssl_truststore_password => "{password_to_client_truststore}"
        consumer_threads => 4
        topics => [ "Topic-1" ]
        codec => json
        tags => ["kafka"]
      }
    }
    

Log retention for Kafka topics

Kafka durably persists all published records, whether or not they have been consumed, using a configurable retention period. For example, if the retention policy is set to two days, a record remains available for consumption for two days after it is published, after which it is discarded to free up space. Kafka's performance is effectively constant with respect to data size, so storing data for a long time is not a problem.
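
Retention can be set globally in server.properties (for example, log.retention.hours=48) or overridden per topic. A sketch of a per-topic override with kafka-configs.sh, setting two days (172800000 ms) on the test topic created above:

    /opt/kafka/bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name test --add-config retention.ms=172800000

    # Verify the override
    /opt/kafka/bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type topics --entity-name test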