William Liu

Kafka

Notes from: https://kafka.apache.org/documentation/

What is Kafka?

Apache Kafka is a distributed streaming platform.

A streaming platform can:

Publish and subscribe to streams of records, similar to a message queue
Store streams of records in a fault-tolerant, durable way
Process streams of records as they occur

Why use Kafka?

Kafka is generally used for two broad classes of applications:

Building real-time streaming data pipelines that reliably move data between systems or applications
Building real-time streaming applications that transform or react to streams of data

Kafka Concepts

Kafka Tools

Kafka MirrorMaker - replicates data between clusters (e.g. across datacenters)
Kafkacat - command line tool to test and debug; can produce, consume, and list topic and partition information
Kafka Connect - allows import/export of data (e.g. write Kafka messages as JSON to S3)

Kafka Core APIs

Kafka has four core APIs:

Producer API - publish a stream of records to one or more topics
Consumer API - subscribe to one or more topics and process the stream of records
Streams API - consume an input stream from one or more topics and produce an output stream to one or more topics, transforming the data along the way
Connector API - build and run reusable producers or consumers that connect topics to existing applications or data systems

Monitoring

For Kafka:

Configs

You can alter configs with the ./bin/kafka-configs.sh script.

To alter the details of a topic (e.g. when to roll log segments), you can run:

./bin/kafka-configs.sh --zookeeper zookeeper1:2181/kafka --alter --entity-type topics --entity-name events-1 --add-config segment.ms=60000

To delete a config override, pass in the --delete-config flag.
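For example, to remove the override set above (a sketch; the entity name and Zookeeper host match the earlier example):

./bin/kafka-configs.sh --zookeeper zookeeper1:2181/kafka --alter --entity-type topics --entity-name events-1 --delete-config segment.ms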

Topics

The core abstraction that Kafka provides for a stream of records is the topic.

Example Topic:

Partition 0   0 1 2 3 4 5 6 7 8 9 10 11 12
Partition 1   0 1 2 3 4 5 6 7 8 9
Partition 2   0 1 2 3 4 5 6 7 8 9 10 11 12

Each partition is an ordered, immutable sequence of records that is continually appended to. Each record in a partition is assigned a sequential id number called the offset that uniquely identifies it within the partition. Offsets start at 0, only ever increase, and are immutable. The Kafka cluster durably persists all published records, whether or not they have been consumed, using a configurable retention period (default: 7 days). Unless you specify a key, records are spread across partitions with no guarantee about which partition a given record lands in; with a key, all records with the same key go to the same partition.

Advanced Topics

Use the kafka-topics.sh and kafka-configs.sh tools for topic administration.

kafka-topics.sh

kafka-configs.sh

Producers

Producers are clients, usually applications, that create a stream of messages. Data is written to disk and replicated. Producers can wait for acks, meaning the producer waits for a configured number of acknowledgements before considering a message successfully delivered.

Producers can specify keys to indicate that a message should go to the same partition every time. Producers send messages to Kafka brokers, which are able to intelligently replicate the data and elect a leader.
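For example (a sketch; the hostname and topic name are placeholders), you can send keyed messages from the console producer by enabling key parsing:

./bin/kafka-console-producer.sh --broker-list kafka1:9092 --topic mytopic --property parse.key=true --property key.separator=:

Each input line is then treated as key:value, and all messages with the same key land in the same partition.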

Producer Tools

Create a dummy file:

base64 /dev/random | head -c 10000 | egrep -ao "\w" | tr -d '\n' > myrandomfile.txt

For performance testing, you can run:

./bin/kafka-producer-perf-test.sh --topic mytopic --num-records 1000 --throughput 10 --payload-file myrandomfile.txt --producer-props acks=1 bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092 --payload-delimiter A

Important Producer Configs

Some important producer configs are listed here: https://docs.confluent.io/current/installation/configuration/producer-configs.html

Idempotent Producers

There is a setting enable.idempotence for ensuring message safety (messages reach their destination) and for making sure there are no duplicates. With idempotence enabled, each produce request carries a producer id and sequence number, so the broker can detect and discard duplicate writes.

This setting goes together with acks=all and a high retries value. Note that some errors will not be retried regardless (e.g. message too large).
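A minimal producer config sketch (the property names are standard producer configs; the values here are only illustrative):

enable.idempotence=true
acks=all
retries=2147483647
max.in.flight.requests.per.connection=5

With idempotence enabled, retries no longer introduce duplicates on the broker side.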

Batch Compression

By default, Kafka will send messages as soon as it can. To improve efficiency and throughput to the broker, batching with compression can be the quickest way to produce thousands of messages. When multiple records are sent to the same partition, the producer can batch them together.

We can set the compression.type config to one of: none, gzip, snappy, lz4, or zstd.
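A sketch of a producer config enabling batching with compression (property names are standard producer configs; values are only illustrative):

compression.type=snappy
batch.size=32768
linger.ms=20

linger.ms tells the producer to wait up to that many milliseconds to fill a batch, batch.size caps the batch in bytes, and the whole batch is then compressed and sent in one request.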

Timestamps

When producers create a message, a timestamp is associated with each message. If consumers need a timestamp from those messages, you'll need to either 1) set your own timestamp when producing or 2) use the built-in timestamp.

If you have to constantly retry messages, this can mess up ordering.

Serializers

We can create custom serializers or use a generic serializer like Avro Serializer, Thrift, Protobuf, or just String.

Consumers

Consumers read messages and keep track of the offset. When many messages are being read, a consumer group can be formed so that each consumer reads a different subset of the messages (i.e. no overlap between consumers).

Integrity of Consumers

The consumer goes out to the broker using a polling call. The consumer gets messages, then keeps track of the offset to determine which message it read last. If you share one consumer across many threads, you'll get a concurrent modification exception because the Kafka consumer is not thread-safe. To solve this, make sure there is only one consumer per thread.

When writing your consumer code, make sure to close the consumer when you shut it down (otherwise Kafka will eventually notice when the consumer stops sending heartbeats, but best practice is to close explicitly when done).

Committed Offset vs Committed Message

A committed offset is sent to Kafka by the consumer to acknowledge that it received AND processed all messages in the partition up to that offset.

A committed message is a message that has been written to all in-sync replicas; only then does the leader acknowledge it to the producer (based on the acks setting) and hand it out to consumers.

Offset Read vs Offset Processed

A consumer can read a message but fail before finishing processing it; if the offset was already committed, that message is effectively lost. To completely process a message, the consumer must process it and then commit the offset. Consumers may need to retry records multiple times.

Important Consumer Configs

Some important consumer configs are:

Verify consumer offsets

Consumer offsets are stored in a topic called __consumer_offsets. You can verify consumer offsets are being committed by reading that topic like:

bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic __consumer_offsets --formatter 'kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter'

To track a message, we need to know the topic, its leader broker, partition, and offset.

Handling Duplicate Messages on a Consumer

Since Kafka consumers poll for data, it allows control over how often we read messages and if we want to replay messages. Some configs to keep in mind:

So the solution is to create a unique id for each message (say record.topic() + record.partition() + record.offset()) and use that id when writing from your consumer (e.g. as the document id in an index request). Since the id is unique per message, reprocessing the same message overwrites the earlier write and eliminates the duplicate.

Clusters and Brokers

A Kafka cluster is made up of many Kafka brokers. A Kafka broker is a server that receives messages from the producer, assigns offsets and stores the messages on disk. Brokers replicate data across brokers in order to create fault tolerance.

One broker is automatically selected as the controller; this broker assigns partitions to brokers and monitors the other brokers for failures (so it can fail partition leadership over to another broker).

You can set replication across brokers, meaning data is replicated across many brokers. With replication, each topic partition has one leader and follower replicas spread across other brokers (up to your replication factor setting). For example:

Broker 1
Topic A - Partition 0 (Leader)
Topic A - Partition 2 (Replica)

Broker 2
Topic A - Partition 2 (Leader)
Topic A - Partition 1 (Replica)

Broker 3
Topic A - Partition 1 (Leader)
Topic A - Partition 0 (Replica)

Broker Details

Each broker has a unique ID (broker.id), and this is very important: Zookeeper uses the broker id to help recover that broker's information (e.g. which partitions it owned) when it comes back.

Zookeeper

Zookeeper helps keep consensus within a cluster, meaning all brokers know which broker is the controller, the brokers are aware of each other, and everyone agrees on which replica is the leader for each partition. Kafka requires Zookeeper in order to run. Since Zookeeper handles leader election, we should run an odd number of Zookeeper servers to keep quorum.

In a production setting, you want multiple zookeepers to create an ensemble.

Zookeeper Shell

We can use kafka/bin/zookeeper-shell.sh to run Zookeeper-specific commands. For example, to connect and inspect ephemeral nodes such as the controller:

kafka/bin/zookeeper-shell.sh zookeeper1:2181/kafka
ls /
get /controller

The zookeeper.out log has your Zookeeper errors (e.g. it will tell you if Zookeeper is not running).

Install Kafka and Zookeeper

You can install Kafka and Zookeeper either from the binaries or with containers. Installing the binaries takes more time, but allows you to run the programs as services (e.g. sudo service zookeeper start, sudo service kafka start) by adding init scripts to /etc/init.d/kafka and /etc/init.d/zookeeper. The steps below use containers:

Add Docker to Your Package Repository

curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo apt-key add -

sudo add-apt-repository    "deb [arch=amd64] https://download.docker.com/linux/ubuntu \
   $(lsb_release -cs) \
   stable"

Update Packages and Install Docker

sudo apt update

sudo apt install -y docker-ce=18.06.1~ce~3-0~ubuntu

Add Your User to the Docker Group

sudo usermod -a -G docker cloud_user

Install Docker Compose

sudo -i
curl -L https://github.com/docker/compose/releases/download/1.24.0/docker-compose-`uname -s`-`uname -m` -o /usr/local/bin/docker-compose
chmod +x /usr/local/bin/docker-compose

Install Java

sudo apt install -y default-jdk

Get the Kafka Binaries

wget http://mirror.cogentco.com/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz
tar -xvf kafka_2.12-2.2.0.tgz

Create Your First Topic

./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --partitions 3 --replication-factor 1

Describe the Topic

./bin/kafka-topics.sh --zookeeper localhost:2181 --topic test --describe

Important Locations

./bin is where the executables are (e.g. ./bin/kafka-topics.sh)
./config is where the configurations are

Kafka Bin Commands

There are a few Kafka commands, and you can see the available options by running them without any params or arguments. For example, ./bin/kafka-topics.sh gives you:

./kafka-topics.sh
Create, delete, describe, or change a topic.
Option                                   Description
------                                   -----------
--alter                                  Alter the number of partitions,
                                           replica assignment, and/or
                                           configuration for the topic.
--config <String: name=value>            A topic configuration override for the
                                           topic being created or altered.The
                                           following is a list of valid
                                           configurations:
                                             cleanup.policy
                                             compression.type
                                             delete.retention.ms
                                             file.delete.delay.ms
                                             flush.messages
                                             flush.ms
                                             follower.replication.throttled.
                                           replicas
                                             index.interval.bytes
                                             leader.replication.throttled.replicas
                                             max.message.bytes
                                             message.format.version
                                             message.timestamp.difference.max.ms
                                             message.timestamp.type
                                             min.cleanable.dirty.ratio
                                             min.compaction.lag.ms
                                             min.insync.replicas
                                             preallocate
                                             retention.bytes
                                             retention.ms
                                             segment.bytes
                                             segment.index.bytes
                                             segment.jitter.ms
                                             segment.ms
                                             unclean.leader.election.enable
                                         See the Kafka documentation for full
                                           details on the topic configs.
--create                                 Create a new topic.
--delete                                 Delete a topic
--delete-config <String: name>           A topic configuration override to be
                                           removed for an existing topic (see
                                           the list of configurations under the
                                           --config option).
--describe                               List details for the given topics.
--disable-rack-aware                     Disable rack aware replica assignment
--force                                  Suppress console prompts
--help                                   Print usage information.
--if-exists                              if set when altering or deleting
                                           topics, the action will only execute
                                           if the topic exists
--if-not-exists                          if set when creating topics, the
                                           action will only execute if the
                                           topic does not already exist
--list                                   List all available topics.
--partitions <Integer: # of partitions>  The number of partitions for the topic
                                           being created or altered (WARNING:
                                           If partitions are increased for a
                                           topic that has a key, the partition
                                           logic or ordering of the messages
                                           will be affected
--replica-assignment <String:            A list of manual partition-to-broker
  broker_id_for_part1_replica1 :           assignments for the topic being
  broker_id_for_part1_replica2 ,           created or altered.
  broker_id_for_part2_replica1 :
  broker_id_for_part2_replica2 , ...>
--replication-factor <Integer:           The replication factor for each
  replication factor>                      partition in the topic being created.
--topic <String: topic>                  The topic to be create, alter or
                                           describe. Can also accept a regular
                                           expression except for --create option
--topics-with-overrides                  if set when describing topics, only
                                           show topics that have overridden
                                           configs
--unavailable-partitions                 if set when describing topics, only
                                           show partitions whose leader is not
                                           available
--under-replicated-partitions            if set when describing topics, only
                                           show under replicated partitions
--zookeeper <String: hosts>              REQUIRED: The connection string for
                                           the zookeeper connection in the form
                                           host:port. Multiple hosts can be
                                           given to allow fail-over.

Things to note:

Consumer Groups

Consumer groups allow multiple consumers to process messages from a topic in parallel without overlap. For example, say you're producing messages to mytopic. You can then run two consumers (consumer1 and consumer2) that read from mytopic and specify the same consumer group myconsumergroup. As messages are produced, they are split across the consumers (i.e. some messages go to consumer1 and some go to consumer2) without any overlap. We can add and remove consumers as needed.

You can also create a new consumer group to read messages from a topic (e.g. bin/kafka-console-consumer.sh --bootstrap-server kafka3:9092 --topic test --group application1 --from-beginning)

When you describe a consumer group, you'll see columns like TOPIC, PARTITION, CURRENT-OFFSET, LOG-END-OFFSET, LAG, CONSUMER-ID, HOST, and CLIENT-ID.

kafka-consumer-groups

Use the kafka-consumer-groups.sh command to inspect and manage consumer groups (list them, describe their offsets and lag, delete them, and reset their offsets) while the cluster is up and running.

Documentation is here: https://kafka.apache.org/documentation/#basic_ops_consumer_group

Commands include:

--list : List all consumer groups.
--describe : Show members, partition assignments, current offsets, and lag for a group.
--delete : Delete a consumer group.
--reset-offsets : Reset a group's offsets (see the scenarios below).
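For example (a sketch; the broker and group names match earlier examples):

./bin/kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --list
./bin/kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --describe --group application1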

Offset reset options

--reset-offsets has the following scenarios to choose from (at least one scenario must be selected):

--to-datetime <String: datetime> : Reset offsets to offsets from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'
--to-earliest : Reset offsets to earliest offset.
--to-latest : Reset offsets to latest offset.
--shift-by <Long: number-of-offsets> : Reset offsets shifting current offset by 'n', where 'n' can be positive or negative.
--from-file : Reset offsets to values defined in CSV file.
--to-current : Resets offsets to current offset.
--by-duration <String: duration> : Reset offsets to offset by duration from current timestamp. Format: 'PnDTnHnMnS'
--to-offset : Reset offsets to a specific offset.
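For example, to reset a group's offsets for a topic back to the earliest available offset (a sketch; without --execute the command only prints the plan instead of applying it):

./bin/kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --group application1 --topic test --reset-offsets --to-earliest --execute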

Replicas

Replicas are important in Kafka so that we can achieve fault tolerance; if one broker goes down, we still want zero data loss. Replicas exist per topic and partition. There are two different types of replicas: leader replicas and follower replicas.

The replicas also have offsets.

So what do these replicas do?

Replicas - In Sync vs Out of Sync

If a replica is not able to keep up with the leader (the check is every 10 seconds), it is labeled 'out of sync' instead of 'in sync'. Only in-sync replicas can take over as leader if the current leader goes down. A replica can also become 'out of sync' if it cannot communicate with Zookeeper within 6 seconds.

Requests

There are a few different types of requests, which can come from either a producer or a consumer: produce requests (from producers), fetch requests (from consumers and follower replicas), and metadata requests (from any client asking which broker leads which partition).

Request Process

All of the above requests go through this type of process:

Example Request

So how does this all work?

Say we have a produce request that gets written to the leader broker, but that data hasn't been replicated to our replicas yet. If we then make a fetch request to the leader, the unreplicated data is not yet returned. The reasoning is that if the leader were to fail before replication, consumers would have read data that is then lost; by only handing out replicated data, there is no such inconsistency.

Requests and Acks

If acks is set to all, produce requests are held in a buffer called purgatory until the data has been replicated to all in-sync replicas; only then are they acknowledged and the messages made available to consumer clients.

Kafka is different from other databases in that it uses a zero-copy method to send messages to clients (making it extra efficient): data goes from the file (page cache) directly to the network channel, skipping intermediate buffers (like saving to a local cache first). When making a fetch request from a client, you can specify a limit on the amount of data returned (if you request too much, the client has to buffer it all in memory and might crash).

Partitions

Partitions are part of a topic. Partitions are split out amongst brokers to create resiliency and to make it easier to consume/subscribe to multiple partitions in parallel. A partition can only be appended to (records are added, never modified in place).

Partitions are stored in the directory configured in server.properties under the log.dirs setting (e.g. /data/kafka). In each data directory, you will see something like:

To view a segment log, you can run:

./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --print-data-log --files /data/kafka/events-1/00000000003064504069.log

You will see the actual payload (i.e. what the message said) and its baseOffset.

The higher the number of partitions, the higher the end-to-end latency.

Reassign Partitions

We can reassign partitions or change the replication factor for a partition and topics.

Generate a reassignment plan with:

./bin/kafka-reassign-partitions.sh --zookeeper zookeeper1:2181/kafka --generate --topics-to-move-json-file topics.json --broker-list 3,2,1

With a JSON file:

topics.json file
{"topics":
 [{"topic": "test"}],
 "version":1
}
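The --generate step only prints a proposed reassignment; to apply it, save the proposed JSON and run the same tool with --execute, then check progress with --verify (a sketch; the reassignment file name is a placeholder):

./bin/kafka-reassign-partitions.sh --zookeeper zookeeper1:2181/kafka --execute --reassignment-json-file reassignment.json
./bin/kafka-reassign-partitions.sh --zookeeper zookeeper1:2181/kafka --verify --reassignment-json-file reassignment.json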

Partition Reassignment Failure fix w/ Zookeeper

If a broker fails during partition reassignment, you can try deleting the /admin/reassign_partitions node OR deleting the /controller node (which forces a new controller election):

bin/zookeeper-shell.sh zookeeper1:2181/kafka

ls /  # to view nodes on Zookeeper
[cluster, controller_epoch, controller, brokers, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]

ls /admin
[delete_topics]

delete /admin/reassign_partitions

delete /controller

Partition Rebalancing

Consumers in a Consumer Group share ownership of the topics and partitions that they subscribe to. When a new consumer is added to the consumer group, it starts consuming messages previously consumed by other consumers. Same thing happens when a consumer dies; other consumers pick up the work.

Moving partition ownership from one consumer to another is called a partition rebalance. During a rebalance, the entire consumer group stops consuming messages. The consumer sends a heartbeat when it polls for data and when it commits offsets. If there isn't a heartbeat within a certain timeframe (as specified by your configs), the session times out and the group coordinator triggers a rebalance.

Partition Assignment

A consumer gets partitions in one of two ways: it can be manually assigned specific partitions (assign) or it can subscribe to topics and let the group manage assignment (subscribe). The first consumer to join a consumer group becomes the consumer group leader. The group leader computes partition assignments and sends them to the consumer group coordinator, which sends the assignments out to all consumers. Each consumer only sees its own list of partition assignments, while the group leader has the full assignment. This process repeats every time a partition rebalance happens.

partition.assignment.strategy is how you want partitions assigned to consumers. Options include the Range (default), RoundRobin, and Sticky assignors.

To make sure there aren't too many partition rebalances, consider writing a ConsumerRebalanceListener to monitor the number of partition rebalances.

Consumer Group Coordinator

Segments

When Kafka writes to a partition, it writes to a segment, specifically the active segment. Once the segment’s size limit has been reached, a new segment is opened and that becomes the new active segment. Segments are named by their base offset. The base offset of a segment is an offset greater than offsets in previous segments and less than or equal to offsets in that segment.

So what is a segment? On disk, a partition is a directory and each segment is made up of two files: an index file and a log file. A partition events-1 with segments 00000000003064504069 and 00000000003065011416 looks like:

tree kafka | head -n 6
kafka
├── events-1
│ ├── 00000000003064504069.index
│ ├── 00000000003064504069.log
│ ├── 00000000003065011416.index
│ ├── 00000000003065011416.log

Segment logs are where messages are stored.

Segment index files map offsets to positions in the log file. Each index entry has two fields: 1. 4 bytes: the offset relative to the base offset, and 2. 4 bytes: the physical position of the related log message (base offset + relative offset gives O(1) lookup time). The index.interval.bytes configuration (4096 bytes by default) sets the index interval, i.e. how frequently (after how many bytes) an index entry is added.

Data Delivery

Kafka is really concerned about reliability. Depending on the application, you can accidentally lose messages or duplicate messages, causing issues.

As a Kafka administrator, you can use the following acks tradeoffs between speed and reliability:

acks=0 : the producer does not wait for any acknowledgement (fastest, messages can be lost)
acks=1 : the producer waits for the leader to acknowledge the write (messages can still be lost if the leader fails before replication)
acks=all : the producer waits until all in-sync replicas have the message (slowest, most reliable)

This can be set in a producer like:

./bin/kafka-console-producer.sh --broker-list kafka1:9092 --topic test --producer-property acks=0

Security

We want secure connections between the producers, consumers, brokers, and zookeepers.

This lets you restrict which users can read or change certain topics or brokers. All of this is disabled by default. There is a minor performance degradation if SSL is enabled.

Schemas

You can use any data type in your Kafka messages, but you may not control which consumers subscribe to them, so it's common to use a tool like Avro to define schemas and make sure consumers don't miss or misinterpret data within the messages themselves.

We can setup Avro to manage schemas over the life of the Kafka cluster. The producers serialize the data into Avro and the consumers deserialize the data.

Schema Registry with Kafka Connect

Schema Registry is a component of the Confluent Platform.

The idea is to enable client applications to read and write Avro data and to check compatibility as schemas evolve. Basically, Schema Registry provides a serving layer for your metadata. It provides a RESTful interface for storing and retrieving Avro schemas. It stores a versioned history of all schemas, provides multiple compatibility settings, and allows evolution of schemas according to the configured compatibility setting. It provides serializers that plug into Kafka clients and handle schema storage and retrieval for Kafka messages that are sent in the Avro format.

https://github.com/confluentinc/schema-registry

Kafka Connect

Kafka Connect is an API that comes with Kafka. It's a tool with pre-built connectors for many different data sources, letting you get data into and out of a cluster quickly. Kafka Connect exposes a REST API, which makes it easy to integrate with other systems. Kafka Connect can run in standalone mode or in distributed mode.

For example, if you have a MySQL table, you can stream that data into Kafka using Kafka Connect: install the appropriate Kafka Connect plugin, then use the REST API to configure the connector against the cluster.

Kafka Connect Configs

You have the following important files:

Important configs in connect-standalone or connect-distributed modes:

Important configs in file-sink or file-source properties:

Run with:

./bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
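A sketch of what the file source and sink properties might contain, based on the FileStream connectors that ship with Kafka (file paths and the topic name are placeholders):

connect-file-source.properties
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/tmp/input.txt
topic=connect-test

connect-file-sink.properties
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=/tmp/output.txt
topics=connect-test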

Storage

Depending on the configuration, your message logs are stored in a directory (for ours, we chose /data/kafka). Finding and maintaining these logs is handled by Kafka. As we make more messages, you can see the dir filling up (du -h)

Log Compaction

By default, messages in a topic are kept for a week. A topic can instead be created with log compaction, meaning the log is compacted so that only the most recent value for each key is kept (so messages must be key-value pairs with keys).

The config is --config cleanup.policy=compact
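For example, creating a compacted topic (a sketch; the topic name and Zookeeper host are placeholders):

./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic compacted-topic --partitions 1 --replication-factor 1 --config cleanup.policy=compact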

Storage Structures

Stream Processing

Streams are a sequence of events. There are no delete or update abilities because streams are immutable. Streams are much different from database tables. Events are ordered, immutable, and replayable. Events can be structured or unstructured. Stream processing is for when response times should be in milliseconds (e.g. deciding whether a credit card charge is valid) - this is real-time data processing.

Operations are performed in a time window. There are three types of time: event time (when the event happened), log append time (when the event arrived at the broker), and processing time (when our application processes the event).

You want events to be on a single time zone (especially for Event Time).

State

Sometimes you’ll want to capture more than just a single event. This might be because:

The solution is to store the state.

State is the information stored between events. There are two types of state: local (internal) state, kept in memory or on disk by the stream processing application, and external state, kept in an external datastore.

Design Patterns

We have the following design patterns that depend on what you’re trying to do with the data and the sequence of events.

Single Event Processing

Local State Processing

Multiphase Processing

External Processing

Windowed Join

Out of Sequence Events

Multi-Cluster Architectures

We have the following multi-cluster architectures:

MirrorMaker

MirrorMaker is used to replicate data between two datacenters. MirrorMaker is a collection of consumers in a consumer group. MirrorMaker runs a thread for each consumer. MirrorMaker creates a consumer for each topic. Run MirrorMaker on the destination cluster (not the source).

Run with bin/kafka-mirror-maker.sh. MirrorMaker uses a shared producer to send events to the new cluster.
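A sketch of how MirrorMaker is typically invoked (the consumer config points at the source cluster, the producer config at the destination, and the whitelist selects which topics to mirror; file names and the topic pattern are placeholders):

./bin/kafka-mirror-maker.sh --consumer.config source-consumer.properties --producer.config destination-producer.properties --whitelist "mytopic"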

Security

Since Kafka is used to store data, we need to think about security features. We’ll break things down into Encryption, Client Authentication, and Authorization using ACLs.

A few of the things we want include:

So what does this look like?

Zookeeper

You can initialize server certificates and trusted client certificates, then specify them in the config here:

zookeeper.ssl.keyStore.location="/path/to/your/keystore"
zookeeper.ssl.keyStore.password="keystore_password"
zookeeper.ssl.trustStore.location="/path/to/your/truststore"
zookeeper.ssl.trustStore.password="truststore_password"

You would also need to setup a secure Client Port

secureClientPort=2281

Kafka

With Kafka, we want server and client side keystores and truststores. Here’s an example of testing out SSL with a client side configuration that can be used in say a console producer or consumer.

We want to set our Kafka brokers with security.protocol=SSL and specify a server truststore. After our Kafka brokers are setup, we then want to switch our client applications to use a client truststore (e.g. in producers, consumers). In general (with the below example as a client truststore), it might look like:

ssl.truststore.location=/tmp/kafka.client.truststore.jks

To create this truststore, I'm using a docker Kafka image (wurstmeister/kafka:2.11-1.1.1), which has the file /usr/lib/jvm/java-1.8-openjdk/jre/lib/security/cacerts (actually just a symlink to /etc/ssl/certs/java/cacerts). Create your truststore by copying that file:

cp /usr/lib/jvm/java-1.8-openjdk/jre/lib/security/cacerts /tmp/kafka.client.truststore.jks

Now set that into your kafka client config:

security.protocol=SSL
ssl.truststore.location=/tmp/kafka.client.truststore.jks
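You can then point a console client at the broker's SSL listener using that config file (a sketch; the file name client-ssl.properties and port 9093 are placeholders, and this assumes the broker exposes an SSL listener):

./bin/kafka-console-consumer.sh --bootstrap-server kafka1:9093 --topic test --consumer.config /tmp/client-ssl.properties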

Keystore and Truststore (and cacerts)

We use a keystore and a truststore when an application needs to communicate over SSL/TLS. Usually these are password-protected files that sit on the same file system as our running application. For Java, the default keystore format was JKS through Java 8; since Java 9 the default is PKCS12.

A keystore has private keys and the certificates with their corresponding public keys. Keystores hold keys that our application owns that we can use to prove the integrity of a message and the authenticity of the sender. We use a keystore when we are a server and want to use HTTPS. Every Java Runtime Environment (JRE) has its own keystore.

During an SSL handshake, the server looks up the private key from the keystore and presents its corresponding public key and certificate to the client. Also, if the client needs to authenticate itself (aka mutual authentication) then the client also has a keystore and also presents its public key and certificate.

A truststore is like the opposite of a keystore. A keystore holds onto certificates that identify us while a truststore holds onto certificates that identify others. We use a truststore to trust the third party we’re about to communicate with.

So basically, a truststore is used to authenticate peers. If you're the client, the server is the peer. If you're the server, the client is the peer. If you're the server, or if you're the client and the server requests client authentication, then you have to authenticate yourself to the peer, which is why you need your own certificate and private key, and those live in the keystore.

cacerts is a truststore where Java stores public certificates of root CAs. It sits in the $JAVA_HOME/jre/lib/security dir.

TLDR; clients and servers both need their own keystore and truststore for certain authentication methods like mutual authentication.

The summary is we’ll have a CA that creates a keystore (e.g. kafka.server.keystore.jks), that then creates a key pair and cert, which is used to sign other certificates. We add the generated CA to clients’ truststore (e.g. kafka.client.truststore.jks) and to brokers’ truststore (e.g. kafka.server.truststore.jks), basically telling the clients and brokers to trust this CA. We then sign all certificates in the keystore with the CA that we generated.

SSL Keys and Certificates on Kafka

https://docs.confluent.io/current/security/security_tutorial.html

Each machine in the cluster has a public-private key pair, and a certificate to identify the machine. We want to sign the certificate for each machine in the cluster using a certificate authority (CA). Think of the CA like a government that issues out passports (signed certificates) that are difficult to forge.

The keystore stores each machine’s own identity. The truststore stores all the certificates that the machine should trust.

For Kafka, this means that you can sign all certificates in the cluster with a single CA, and then have all machines share the same truststore that trusts the CA. This way all machines can authenticate all other machines.

Definitions

If host name verification is enabled, clients verify the server's fully qualified domain name (FQDN) against one of the following two fields: the Common Name (CN) or the Subject Alternative Name (SAN).

Steps for deploying SSL

To deploy SSL, we want to:

  1. Generate the keys and certificates
  2. Create your own Certificate Authority (CA)
  3. Sign the certificate

Step 0 - Info about Java’s Keytool utility

We'll need keytool, a key and certificate management utility. This tool allows users to administer their own public/private key pairs and associated certificates for use in self-authentication (where the user authenticates themselves to other users/services) or in data integrity and authentication services using digital signatures.

Keytool stores the keys and certificates in a keystore (with the default implementation being a file). Example to create a keystore:

keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey -keyalg RSA -ext SAN=DNS:{FQDN}

Step 1 - Generate Keys and Certificates

We use Java’s keytool utility to generate the key and the certificate for each Kafka broker in the cluster. We generate the key into a keystore called kafka.server.keystore so that we can then export and sign it later with a CA. The keystore file has the private key of the certificate so it needs to be kept safely.

# With user prompts
keytool -keystore kafka.server.keystore.jks -alias localhost -genkey

Enter keystore password:
Re-enter new password:
What is your first and last name?
...

We then have a kafka.server.keystore.jks file. Remember, this is a keystore file containing the private key of the certificate, so keep it safe (unlike kafka.client.truststore.jks and kafka.server.truststore.jks, which only hold certificates we trust).

Step 2 - Generate our own Certificate Authority (CA)

First let’s generate a CA that is a public-private key pair and certificate. It is intended to sign other certificates.

openssl req -new -x509 -keyout ca-key -out ca-cert -days {validity}

# -keyout file : means the file to write the newly created private key to
# -out file : the DER-encoded output file
# -days arg : the number of days to certify the certificate for

We’ll be prompted to Enter PEM pass phrase along with:

openssl req -new -x509 -keyout ca-key -out ca-cert -days 33
Generating a 2048 bit RSA private key
...............................+++
.................................................................+++
writing new private key to 'ca-key'
Enter PEM pass phrase:
Verifying - Enter PEM pass phrase:
-----
You are about to be asked to enter information that will be incorporated
into your certificate request.
What you are about to enter is what is called a Distinguished Name or a DN.
There are quite a few fields but you can leave some blank
For some fields there will be a default value,
If you enter '.', the field will be left blank.
-----
Country Name (2 letter code) []:US
...

Out of the above command, we’ll have a key (ca-key) and a cert (ca-cert).

Step 3 - Sign the Certificate

Now we want to sign all the certificates in the keystore with the CA that we generated.
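Signing typically means exporting the certificate from the keystore, signing it with the CA, and importing both the CA cert and the signed cert back into the keystore. A sketch following the Confluent security tutorial linked above (ca-cert and ca-key come from Step 2; the validity is a placeholder, and you will be prompted for the relevant passwords):

keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial
keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.server.keystore.jks -alias localhost -import -file cert-signed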

Note: if you need client keystores signed as well, then repeat this for clients as well. Example:

keytool -keystore kafka.client.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.client.keystore.jks -alias localhost -import -file cert-signed

Debugging:

To check if kafka.server.keystore.jks is setup correctly, run:

openssl s_client -debug -connect localhost:9093 -tls1