Kafka
Kafka lets you distribute load across the nodes that receive data and encrypts communication between them.
Architecture example:
Kafka installation
To install Kafka, follow the steps below:
Install Java:
yum install java-11-openjdk-headless.x86_64
Create users for Kafka and ZooKeeper:
useradd -r -u 30000 -M kafka
useradd -r -u 30001 -M -G kafka zookeeper
Download the installation packages from:
https://kafka.apache.org/downloads
and
https://zookeeper.apache.org/releases.html
wget https://ftp.ps.pl/pub/apache/kafka/2.7.0/kafka_2.13-2.7.0.tgz
wget https://ftp.ps.pl/pub/apache/zookeeper/zookeeper-3.6.2/apache-zookeeper-3.6.2-bin.tar.gz
tar -xzvf apache-zookeeper-3.6.2-bin.tar.gz -C /opt/
tar -xzvf ./kafka_2.13-2.7.0.tgz -C /opt/
ln -s /opt/apache-zookeeper-3.6.2-bin /opt/zookeeper
ln -s /opt/kafka_2.13-2.7.0 /opt/kafka
mkdir /var/lib/{zookeeper,kafka}
mkdir /var/log/kafka
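The Apache mirrors publish SHA-512 checksums next to each archive, and it is good practice to verify a download before extracting it. A minimal sketch (the `verify_sha512` helper and the demo file are illustrative, not part of the official tooling; for the real archives, compare against the digest from the matching .sha512 file on the download page):

```shell
# verify_sha512 FILE EXPECTED_DIGEST -> exit status 0 if the digests match
verify_sha512() {
    actual=$(sha512sum "$1" | awk '{print $1}')
    [ "$actual" = "$2" ]
}

# Demo with a throwaway file instead of the real tarball
printf 'demo' > /tmp/archive-demo.bin
expected=$(sha512sum /tmp/archive-demo.bin | awk '{print $1}')
verify_sha512 /tmp/archive-demo.bin "$expected" && echo "checksum OK"
```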
Set the necessary permissions:
chown -R zookeeper: /opt/apache-zookeeper-3.6.2-bin /var/lib/zookeeper
chown -R kafka: /opt/kafka_2.13-2.7.0 /var/lib/kafka
chown -h zookeeper: /opt/zookeeper
chown -h kafka: /opt/kafka
Edit the configuration files and set the data and log directories:
cp /opt/zookeeper/conf/zoo_sample.cfg /opt/zookeeper/conf/zoo.cfg
In /opt/zookeeper/conf/zoo.cfg set:
dataDir=/var/lib/zookeeper
In /opt/kafka/config/server.properties set:
log.dirs=/var/lib/kafka
Set the necessary firewall rules (2181 = ZooKeeper client port, 2888/3888 = ZooKeeper quorum and leader election, 9092 = Kafka broker):
firewall-cmd --permanent --add-port=2181/tcp
firewall-cmd --permanent --add-port=2888/tcp
firewall-cmd --permanent --add-port=3888/tcp
firewall-cmd --permanent --add-port=9092/tcp
firewall-cmd --reload
Create service files:
vim /usr/lib/systemd/system/zookeeper.service
[Unit]
Description=Apache zookeeper service
Documentation=http://zookeeper.apache.org
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=forking
User=zookeeper
Group=zookeeper
ExecStart=/opt/zookeeper/bin/zkServer.sh start
ExecStop=/opt/zookeeper/bin/zkServer.sh stop
ExecReload=/opt/zookeeper/bin/zkServer.sh restart
WorkingDirectory=/var/lib/zookeeper

[Install]
WantedBy=multi-user.target
vim /usr/lib/systemd/system/kafka.service
[Unit]
Description=Apache Kafka Server
Documentation=http://kafka.apache.org/documentation.html
Requires=network.target remote-fs.target zookeeper.service
After=network.target remote-fs.target zookeeper.service

[Service]
Type=simple
User=kafka
Group=kafka
Environment="LOG_DIR=/var/log/kafka"
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh

[Install]
WantedBy=multi-user.target
Reload the systemd daemon, then enable and start the services:
systemctl daemon-reload
systemctl enable zookeeper kafka
systemctl start zookeeper kafka
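Once both services are up, a quick way to confirm they are accepting connections is to probe the ZooKeeper and Kafka ports. A small sketch, assuming the services run locally (`check_port` is an illustrative helper, not a standard tool):

```shell
# Report whether a local TCP port accepts connections.
check_port() {
    if (exec 3<>"/dev/tcp/localhost/$1") 2>/dev/null; then
        echo "port $1 open"
    else
        echo "port $1 closed"
    fi
}

check_port 2181   # ZooKeeper client port
check_port 9092   # Kafka broker port
```

`/dev/tcp` redirection is a bash feature; `ss -tlnp` or `nc -z localhost 9092` work just as well.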
To test the installation, create a Kafka topic:
/opt/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
List existing topics:
/opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --list
Generate test messages (type a few lines at the producer prompt):
/opt/kafka/bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9092
something
something more
Read the test messages:
/opt/kafka/bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092
Enabling encryption in Kafka
Generate an SSL key and certificate for each Kafka broker:
keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey -keyalg RSA
Configure the host name in the certificate:
keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey -keyalg RSA -ext SAN=DNS:{FQDN}
Verify the content of the generated certificate:
keytool -list -v -keystore server.keystore.jks
Create your own CA:
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
Sign the certificate:
keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days {validity} -CAcreateserial -passin pass:{ca-password}
Import both the CA certificate and the signed certificate into the keystore:
keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed
If you have trusted certificates, you must import them into the JKS keystore as follows:
Create a keystore:
keytool -keystore client.keystore.jks -alias localhost -validity 365 -keyalg RSA -genkey
Combine the certificate and key file into a certificate in p12 format:
openssl pkcs12 -export -in cert_name.crt -inkey key_name.key -out cert_name.p12 -name localhost -CAfile ca.crt -caname root
Import the CA certificate into a truststore:
keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
Import the CA certificate into a keystore:
keytool -keystore client.keystore.jks -alias CARoot -import -file ca-cert
Import the p12 certificate into a keystore:
keytool -importkeystore -deststorepass MY-KEYSTORE-PASS -destkeystore client.keystore.jks -srckeystore cert_name.p12 -srcstoretype PKCS12
Configuring Kafka Brokers
In the /etc/kafka/server.properties file, set the following options:
listeners=PLAINTEXT://host.name:port,SSL://host.name:port
ssl.keystore.location=/var/private/ssl/server.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
ssl.truststore.location=/var/private/ssl/server.truststore.jks
ssl.truststore.password=test1234
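If the brokers should also encrypt traffic between themselves, inter-broker communication can be switched to the SSL listener as well. This is optional and not required for the minimal setup above; a sketch of the additional broker options:

```properties
# Use the SSL listener for broker-to-broker traffic
security.inter.broker.protocol=SSL
# Optional: also require clients to present a certificate
ssl.client.auth=required
```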
Then restart the Kafka service:
systemctl restart kafka
Configuring Kafka Clients
Logstash
Configure the output section in Logstash based on the following example:
output {
kafka {
bootstrap_servers => "host.name:port"
security_protocol => "SSL"
ssl_truststore_type => "JKS"
ssl_truststore_location => "/var/private/ssl/client.truststore.jks"
ssl_truststore_password => "test1234"
client_id => "host.name"
topic_id => "Topic-1"
codec => json
}
}
Configure the input section in Logstash based on the following example:
input {
kafka {
bootstrap_servers => "host.name:port"
security_protocol => "SSL"
ssl_truststore_type => "JKS"
ssl_truststore_location => "/var/private/ssl/client.truststore.jks"
ssl_truststore_password => "test1234"
consumer_threads => 4
topics => [ "Topic-1" ]
codec => json
tags => ["kafka"]
}
}
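If the brokers are configured with `ssl.client.auth=required`, the Logstash Kafka input and output plugins must also present a keystore. The plugins accept the following additional options (paths and passwords below are placeholders matching the examples above):

```
ssl_keystore_type => "JKS"
ssl_keystore_location => "/var/private/ssl/client.keystore.jks"
ssl_keystore_password => "test1234"
ssl_key_password => "test1234"
```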
Log retention for Kafka topic
Kafka durably persists all published records, whether or not they have been consumed, using a configurable retention period. For example, if the retention policy is set to two days, a record remains available for consumption for two days after it is published, after which it is discarded to free up space. Kafka's performance is effectively constant with respect to data size, so storing data for a long time is not a problem.
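The two-day retention policy from the example corresponds to broker settings such as the following (the values shown are illustrative, not defaults):

```properties
# Broker-wide default: keep records for 48 hours (two days)
log.retention.hours=48
# Optional per-partition size cap; -1 disables size-based retention
log.retention.bytes=-1
# How often the log cleaner checks for segments to delete
log.retention.check.interval.ms=300000
```

Retention can also be overridden per topic, e.g. by setting `retention.ms` with `kafka-configs.sh --alter --entity-type topics`.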