Notes from: https://kafka.apache.org/documentation/
Apache Kafka is a distributed streaming platform.
A streaming platform can publish and subscribe to streams of records, store streams of records in a fault-tolerant, durable way, and process streams of records as they occur.
Kafka is generally used for two classes of applications: building real-time streaming data pipelines that reliably move data between systems, and building real-time streaming applications that transform or react to streams of data.
Kafka MirrorMaker - replicates data between Kafka clusters (e.g. across datacenters)
Kafkacat - command line tool to test and debug; can produce, consume, and list topic and partition information
Kafka Connect - allows import/export of data (e.g. write Kafka messages to S3 as JSON)
Kafka has four core APIs: the Producer API, Consumer API, Streams API, and Connector API.
For Kafka:
You can alter configs with the ./bin/kafka-configs.sh script.
To alter the details of a topic (e.g. when to roll log segments), you can run:
./bin/kafka-configs.sh --zookeeper zookeeper1:2181/kafka --alter --entity-type topics --entity-name events-1 --add-config segment.ms=60000
To delete a config override, pass in the --delete-config flag.
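For example, to remove the override set above (same hypothetical topic as before):
./bin/kafka-configs.sh --zookeeper zookeeper1:2181/kafka --alter --entity-type topics --entity-name events-1 --delete-config segment.ms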
The core abstraction that Kafka provides for a stream of records is the topic.
Example Topic:
Partition 0: 0 1 2 3 4 5 6 7 8 9 10 11 12
Partition 1: 0 1 2 3 4 5 6 7 8 9
Partition 2: 0 1 2 3 4 5 6 7 8 9 10 11 12
Each partition is an ordered, immutable sequence of records that is continually appended to. The records in the partitions are each assigned a sequential id number called the offset that uniquely identifies each record within the partition. Offsets start at 0, only increase, and are immutable. The Kafka cluster durably persists all published records, whether or not they have been consumed, using a configurable retention period (default: 7 days). Unless you specify a key, records are distributed across partitions more or less at random; records with the same key always go to the same partition.
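As a rough sketch of keyed messages (host and topic names are illustrative), the console producer can parse a key out of each line so that records with the same key land on the same partition:
./bin/kafka-console-producer.sh --broker-list kafka1:9092 --topic mytopic --property parse.key=true --property key.separator=:
# then type lines like user42:clicked_button - every message with key user42 goes to the same partition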
Use kafka-topics.sh and kafka-configs.sh for Kafka tools.
kafka-topics.sh (example usage of the describe flags is shown after this list):
__consumer_offsets is an automatically generated topic holding all of our consumer offsets
--if-not-exists
--topics-with-overrides
--under-replicated-partitions
--unavailable-partitions
kafka-configs.sh:
--add-config retention.ms=3600000
--entity-type brokers --entity-name 0 --describe
--add-config
--alter --delete-config
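For example (hypothetical zookeeper address), using the describe flags above:
./bin/kafka-topics.sh --zookeeper zookeeper1:2181/kafka --describe --under-replicated-partitions
./bin/kafka-topics.sh --zookeeper zookeeper1:2181/kafka --describe --topics-with-overrides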
Producers are clients, usually applications, that create a stream of messages. Data is written to disk and replicated. Producers can wait for acks, meaning they wait for a number of acknowledgements before considering a message successfully delivered.
Producers can specify keys to indicate that a message should go to the same partition every time. Producers send messages to Kafka brokers, which replicate the data and elect a leader.
Create a dummy file:
base64 /dev/random | head -c 10000 | egrep -ao "\w" | tr -d '\n' > myrandomfile.txt
For performance testing, you can run:
./bin/kafka-producer-perf-test.sh --topic mytopic --num-records 1000 --throughput 10 --payload-file myrandomfile.txt --producer-props acks=1 bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092 --payload-delimiter A
Some important producer configs are listed here: https://docs.confluent.io/current/installation/configuration/producer-configs.html (an example of passing these on the command line follows this list)
acks - the number of acknowledgements the producer requires the leader to have received before considering a request complete
acks=0 - the producer does not wait for any acknowledgement and will not retry
acks=1 - the leader writes the record to its local log and responds without awaiting full acknowledgement from all followers; if the leader fails immediately after acknowledging the record, but before the followers have replicated it, the record is lost
acks=all - the leader waits for the full set of in-sync replicas to acknowledge the record; this guarantees the record will not be lost as long as at least one in-sync replica remains alive (strongest available guarantee)
acks=-1 - same as acks=all
max.in.flight.requests.per.connection controls how many requests can be sent to a server without getting a response. A higher number can improve throughput, but setting it too high can decrease throughput; 5 is a good starting point.
max.block.ms - when your producer is sending messages faster than the broker can keep up with, the producer buffer memory may fill up; this setting controls how long send() blocks before failing.
There is a setting enable.idempotence for ensuring message safety (the message will reach its destination) and for making sure there are no duplicates. The idempotent producer attaches a producer id and sequence number to each produce request so the broker can detect duplicates. This goes along with acks=all and retries set to the maximum. Some messages will still not be retried (e.g. message too large).
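A quick, illustrative way to try these settings (hosts and topic are hypothetical) is to pass them to the console producer:
./bin/kafka-console-producer.sh --broker-list kafka1:9092 --topic test --producer-property acks=all --producer-property enable.idempotence=true --producer-property max.in.flight.requests.per.connection=5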
By default, Kafka will send messages as soon as it can. To improve efficiency and throughput to the broker, batching with compression can be the quickest way to produce thousands of messages. When multiple records are sent to the same partition, the producer can batch them together.
We can set the compression.type config to none, gzip, snappy, lz4, or zstd.
When producers create a message, there is a timestamp associated with each message. If you want to use timestamps when consuming those messages, you'll need to either 1.) create your own timestamp or 2.) use the built-in timestamp.
If you have to constantly retry messages, this can mess up ordering.
We can create custom serializers or use a generic serializer like Avro Serializer, Thrift, Protobuf, or just String.
Consumers read messages and keep track of the offset. When many messages are being read, a consumer group can be formed so that each consumer reads different messages (i.e. no two consumers in the group process the same message).
The consumer goes out to the broker using a polling loop. The consumer gets messages, then keeps track of the offset to determine which message it read last. The consumer is not thread-safe, so if you share one across many threads you'll get a ConcurrentModificationException. To solve this, make sure there is only one consumer per thread.
When writing your code for your consumer, make sure to close the connection after you shut down the consumer (otherwise Kafka will pick it up eventually when your consumer stops sending a heartbeat, but best practice is to close when done).
A committed offset is sent to Kafka by the consumer to acknowledge that it received AND processed all messages in the partition up to that offset.
A committed message is a message that has been written to all in-sync replicas (subject to the acknowledgement setting); only committed messages are available to consumers.
A consumer can start reading at an offset but never finish processing; that is how a message gets lost. To completely process a message, the consumer must commit its offset only after processing it. Consumers may need to retry records multiple times.
Some important consumer configs are:
enable.auto.commit - whether offsets are committed automatically
auto.commit.interval.ms - if enable.auto.commit is set to true, how often offsets should be committed (in ms)
Consumer offsets are stored in a topic called __consumer_offsets. You can verify consumer offsets are being committed by reading that topic like:
bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic __consumer_offsets --formatter 'kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter'
To track a message, we need to know the topic, its leader broker, partition, and offset.
Since Kafka consumers poll for data, we control how often we read messages and whether we want to replay messages. Some configs to keep in mind (example usage follows this list):
fetch.min.bytes - the minimum batch size of the request from the consumer (default 1 byte)
fetch.max.bytes - the maximum batch size of the request from the consumer (default 50 MB)
max.poll.records - the maximum number of messages returned to the consumer in a single poll
max.partition.fetch.bytes - the maximum amount of data the broker will return per partition (default 1 MB)
enable.auto.commit=true - if you want automatic offset commits
enable.auto.commit=false - if you want to manually commit offsets in code
To avoid processing duplicates, the solution is to create a unique string for your messages (say record.topic() + record.partition() + record.offset()) and add this id to the index request in your consumer. Since we have this unique id, the message will only be processed once, eliminating the duplicate.
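For quick experiments (hypothetical host and topic), these configs can be passed straight to the console consumer:
./bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic test --consumer-property enable.auto.commit=false --consumer-property fetch.min.bytes=1024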
A Kafka cluster is made up of many Kafka brokers. A Kafka broker is a server that receives messages from the producer, assigns offsets and stores the messages on disk. Brokers replicate data across brokers in order to create fault tolerance.
One broker is automatically selected as the controller; this broker assigns partitions to brokers and monitors other brokers for failures (so it can handle failover).
You can set replication across brokers, meaning data is replicated across many brokers. With replication, each topic partition has a Leader plus replicas on other brokers, up to N total copies (N being your replication factor setting).
For example:
Broker 1
Topic A - Partition 0 (Leader)
Topic A - Partition 2 (Replica)
Broker 2
Topic A - Partition 2 (Leader)
Topic A - Partition 1 (Replica)
Broker 3
Topic A - Partition 1 (Leader)
Topic A - Partition 0 (Replica)
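A layout like the one above could come from a topic with 3 partitions and a replication factor of 2, for example (names are illustrative):
./bin/kafka-topics.sh --zookeeper zookeeper1:2181/kafka --create --topic topicA --partitions 3 --replication-factor 2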
Each broker has an ID and this is very important. Zookeeper uses the broker id to help recover information.
kafka/config/server.properties - the broker configuration (including broker.id)
kafka/logs/server.log - errors with the server itself (e.g. Kafka does not come up, broker does not respond)
kafka/data/kafka - where the message data is stored (set by log.dirs)
kafka/logs/controller.log - tells us which broker is the controller (e.g. Broker 2 is the Controller)
state-change.log - shows us the changes in a controller (e.g. if a partition leader has been reassigned)
In production environments, you only want a replica that is in sync to be elected leader. In your configs, make sure to have unclean.leader.election.enable=false.
Zookeeper helps keep consensus within a cluster, meaning all brokers know which broker is the Controller, the brokers are aware of each other, and everyone knows which replica is the Leader for each partition. Kafka requires Zookeeper in order to run. Since Zookeeper helps with Leader Election, we must have an odd number of Zookeeper servers to keep quorum.
In a production setting, you want multiple zookeepers to create an ensemble.
We can use kafka/bin/zookeeper-shell.sh
to run zookeeper specific commands.
To view ephemeral nodes in Zookeeper, run kafka/bin/zookeeper-shell.sh zookeeper1:2181/kafka and then use commands like ls / or get /controller.
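A short session might look like this (same chroot path assumed):
kafka/bin/zookeeper-shell.sh zookeeper1:2181/kafka
ls /brokers/ids
get /controller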
The zookeeper.out log has your zookeeper errors (e.g. say zookeeper is not running).
You can either install kafka and zookeeper using binaries or with containers.
Installing the binary takes more time, but allows you to run your programs as a service
(e.g. sudo service zookeeper start
, sudo service kafka start
) by adding to /etc/init.d/kafka
and /etc/init.d/zookeeper
The below is with containers:
Add Docker to Your Package Repository
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo apt-key add -
sudo add-apt-repository "deb [arch=amd64] https://download.docker.com/linux/ubuntu \
$(lsb_release -cs) \
stable"
Update Packages and Install Docker
sudo apt update
sudo apt install -y docker-ce=18.06.1~ce~3-0~ubuntu
Add Your User to the Docker Group
sudo usermod -a -G docker cloud_user
Install Docker Compose
sudo -i
curl -L https://github.com/docker/compose/releases/download/1.24.0/docker-compose-`uname -s`-`uname -m` -o /usr/local/bin/docker-compose
chmod +x /usr/local/bin/docker-compose
Install Java
sudo apt install -y default-jdk
Get the Kafka Binaries
wget http://mirror.cogentco.com/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz
tar -xvf kafka_2.12-2.2.0.tgz
Create Your First Topic
./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --partitions 3 --replication-factor 1
Describe the Topic
./bin/kafka-topics.sh --zookeeper localhost:2181 --topic test --describe
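You can also list all topics to confirm it was created:
./bin/kafka-topics.sh --zookeeper localhost:2181 --list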
./bin is where the executables are (e.g. ./bin/kafka-topics.sh)
./config is where the configurations are
There are a few Kafka commands, and you can see each one's available options by running it without any additional params or arguments.
For example ./bin/kafka-topics.sh
gives you:
./kafka-topics.sh
Create, delete, describe, or change a topic.
Option Description
------ -----------
--alter Alter the number of partitions,
replica assignment, and/or
configuration for the topic.
--config <String: name=value> A topic configuration override for the
topic being created or altered.The
following is a list of valid
configurations:
cleanup.policy
compression.type
delete.retention.ms
file.delete.delay.ms
flush.messages
flush.ms
follower.replication.throttled.
replicas
index.interval.bytes
leader.replication.throttled.replicas
max.message.bytes
message.format.version
message.timestamp.difference.max.ms
message.timestamp.type
min.cleanable.dirty.ratio
min.compaction.lag.ms
min.insync.replicas
preallocate
retention.bytes
retention.ms
segment.bytes
segment.index.bytes
segment.jitter.ms
segment.ms
unclean.leader.election.enable
See the Kafka documentation for full
details on the topic configs.
--create Create a new topic.
--delete Delete a topic
--delete-config <String: name> A topic configuration override to be
removed for an existing topic (see
the list of configurations under the
--config option).
--describe List details for the given topics.
--disable-rack-aware Disable rack aware replica assignment
--force Suppress console prompts
--help Print usage information.
--if-exists if set when altering or deleting
topics, the action will only execute
if the topic exists
--if-not-exists if set when creating topics, the
action will only execute if the
topic does not already exist
--list List all available topics.
--partitions <Integer: # of partitions> The number of partitions for the topic
being created or altered (WARNING:
If partitions are increased for a
topic that has a key, the partition
logic or ordering of the messages
will be affected
--replica-assignment <String: A list of manual partition-to-broker
broker_id_for_part1_replica1 : assignments for the topic being
broker_id_for_part1_replica2 , created or altered.
broker_id_for_part2_replica1 :
broker_id_for_part2_replica2 , ...>
--replication-factor <Integer: The replication factor for each
replication factor> partition in the topic being created.
--topic <String: topic> The topic to be create, alter or
describe. Can also accept a regular
expression except for --create option
--topics-with-overrides if set when describing topics, only
show topics that have overridden
configs
--unavailable-partitions if set when describing topics, only
show partitions whose leader is not
available
--under-replicated-partitions if set when describing topics, only
show under replicated partitions
--zookeeper <String: hosts> REQUIRED: The connection string for
the zookeeper connection in the form
host:port. Multiple hosts can be
given to allow fail-over.
Things to note:
With --zookeeper, you can specify one or all zookeepers (if all, use a comma separated list)
You can pass producer properties, e.g. --producer-property acks=all
Without --from-beginning, a consumer will only read new messages
Consumer Groups allow multiple consumers to analyze messages in a topic in parallel without overlap.
For example, say you're producing messages to mytopic. You can then run two consumers (consumer1 and consumer2) to read from mytopic and specify the same consumer group myconsumergroup. When a message is produced, the messages are split across the consumers (i.e. some messages go to consumer1 and some messages go to consumer2) without any overlap. We can add and remove consumers as needed.
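A minimal sketch of this (host, topic, and group names as used above) is to run the console consumer in two separate terminals with the same --group:
./bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic mytopic --group myconsumergroup
./bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic mytopic --group myconsumergroup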
You can also create a new consumer group to read messages from a topic (e.g.
bin/kafka-console-consumer.sh --bootstrap-server kafka3:9092 --topic test --group application1 --from-beginning
)
When you describe a consumer group, you'll see columns like TOPIC, PARTITION, CURRENT-OFFSET, LOG-END-OFFSET, LAG, CONSUMER-ID, HOST, and CLIENT-ID.
Use the kafka-consumer-groups.sh command to list, describe, delete, and reset offsets for consumer groups while the cluster is up and running.
Documentation is here: https://kafka.apache.org/documentation/#basic_ops_consumer_group
Commands include:
bin/kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --list
bin/kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --describe --group mygroup
bin/kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --describe --group mygroup --members --verbose
bin/kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --describe --group mygroup --state
bin/kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --delete --group mygroup
bin/kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --reset-offsets --group mygroup --topic mytopic --to-latest. You can reset offsets for specific topics using --topic or for all topics using --all-topics.
--reset-offsets has the following scenarios to choose from (at least one scenario must be selected; see the example after this list):
--to-datetime <String: datetime> : Reset offsets to offsets from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'
--to-earliest : Reset offsets to earliest offset.
--to-latest : Reset offsets to latest offset.
--shift-by <Long: number-of-offsets> : Reset offsets shifting current offset by 'n', where 'n' can be positive or negative.
--from-file : Reset offsets to values defined in CSV file.
--to-current : Resets offsets to current offset.
--by-duration <String: duration> : Reset offsets to offset by duration from current timestamp. Format: 'PnDTnHnMnS'
--to-offset : Reset offsets to a specific offset.
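For example, to shift a group's offsets back by 10 on one topic (names are hypothetical); without --execute the tool only prints the planned offsets, so add --execute to apply them:
bin/kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --reset-offsets --group mygroup --topic mytopic --shift-by -10
bin/kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --reset-offsets --group mygroup --topic mytopic --shift-by -10 --execute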
Replicas are important in Kafka so that we can achieve fault tolerance. If one broker goes down, we still want to ensure there is zero data loss. Replicas exist per topic AND partition. There are two different types of replicas: leader replicas and follower replicas.
The replicas also have offsets.
So what do these replicas do? Follower replicas fetch from the leader to stay in sync, and replicas can be spread across racks for fault tolerance (set the rack in server.properties under broker.rack=rack1, etc).
If a replica is not able to keep up (the check is every 10 seconds), then the replica is labeled as 'out of sync' instead of 'in sync'. In-sync replicas can take over as leader if the current leader goes down. Another way that a replica may become 'out of sync' is if it cannot communicate with Zookeeper every 6 seconds.
There are a few different types of requests, which can come from either a producer or a consumer: produce requests (from producers) and fetch requests (from consumers, and also from follower replicas keeping in sync).
All of the above requests go through this type of process:
So how does this all work?
Say we have a produce request that gets written to the leader broker, but that data hasn’t been replicated to our replicas yet. If we make a fetch request and our replicas are not in-sync yet, even if we send the request to the leader broker, the data is not yet available. The reasoning is that if the leader were to fail, our data has not been replicated across yet so we do not have any data loss.
If acks is set to all, the produce request is held in a buffer called purgatory until the data has been written to all in-sync replicas; only then is it acknowledged and made available to consumer clients.
Kafka is different from other databases in that it uses a zero-copy method to send messages to clients (making it extra efficient): data goes from the file (page cache) straight to the network channel, skipping intermediate buffers (like saving to a local cache first). When making a fetch request from a client, you can specify how much data to return (if you request too much, the client has to hold it all in memory and might crash).
Partitions are part of a topic. Partitions are split out amongst brokers to create resiliency and makes it easier to consume/subscribe to multiple partitions. A partition can only be appended to (can only add, not delete).
Partitions are stored wherever server.properties points via the log.dirs setting (e.g. /data/kafka). In each partition directory, you will see something like:
.log - the data payload being stored (the actual message itself) - see Segments Section
leader-epoch-checkpoint - used to handle leader failures between replicas
.index - represents the segment's base offset and has the same name as the corresponding .log file. See Segments Section
.timeindex - maps timestamps to offsets
.snapshot - internal kafka snapshots
To view logs, you can run ./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --print-data-log --files /data/kafka/events-1/00000000003064504069.log
You will see the actual payload (i.e. what the message said) and its baseOffset
The higher the number of partitions, the higher the end-to-end latency.
We can reassign partitions or change the replication factor for a partition and topics.
With ./bin/kafka-reassign-partitions.sh --zookeeper zookeeper1:2181/kafka --generate --topics-to-move-json-file topics.json --broker-list 3,2,1
With a JSON file:
topics.json file
{"topics":
[{"topic": "test"}],
"version":1
}
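The --generate step prints a proposed reassignment; assuming you save that proposal to a (hypothetical) reassignment.json, you can then apply and verify it:
./bin/kafka-reassign-partitions.sh --zookeeper zookeeper1:2181/kafka --reassignment-json-file reassignment.json --execute
./bin/kafka-reassign-partitions.sh --zookeeper zookeeper1:2181/kafka --reassignment-json-file reassignment.json --verify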
If your broker fails during partition reassignment, you can try deleting the reassign_partitions node OR deleting the controller node in Zookeeper:
bin/zookeeper-shell.sh zookeeper1:2181/kafka
ls / # to view nodes on Zookeeper
[cluster, controller_epoch, controller, brokers, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
ls /admin
[delete_topics]
delete /admin/reassign_partitions
delete /controller
Consumers in a Consumer Group share ownership of the topics and partitions that they subscribe to. When a new consumer is added to the consumer group, it starts consuming messages from partitions previously consumed by other consumers. The same thing happens when a consumer dies; other consumers pick up the work.
Moving partition ownership from one consumer to another is called partition rebalancing. During a rebalance, the entire consumer group will not be consuming messages. The consumer sends a heartbeat when it polls for data and when it commits offsets. If there isn't a heartbeat within a certain timeframe (as specified by your configs), the session will time out and the coordinator will trigger a rebalance.
Kafka has two built-in ways for consumers to get partitions: assign (manual assignment) or subscribe (group subscription). The first consumer to join a consumer group becomes the consumer group leader. The consumer group leader sends partition assignments to the consumer group coordinator, which sends the assignments out to all consumers. Each consumer only has its own list of partition assignments, while the consumer group leader has all partition assignments. This process repeats every time a partition rebalance happens.
partition.assignment.strategy is how you want to assign partitions to consumers. Options include Range, RoundRobin, and Sticky.
To make sure there aren't too many partition rebalances, consider writing a ConsumerRebalanceListener to monitor the number of partition rebalances.
When Kafka writes to a partition, it writes to a segment, specifically the active segment. Once the segment’s size limit has been reached, a new segment is opened and that becomes the new active segment. Segments are named by their base offset. The base offset of a segment is an offset greater than offsets in previous segments and less than or equal to offsets in that segment.
So what is a segment?
On disk, a partition is a directory and each segment is made up of two files: an index file and a
log file. A partition events-1
with segments 00000000003064504069
and 00000000003065011416
looks like:
tree kafka | head -n 6
kafka
├── events-1
│ ├── 00000000003064504069.index
│ ├── 00000000003064504069.log
│ ├── 00000000003065011416.index
│ ├── 00000000003065011416.log
Segment logs are where messages are stored.
Each entry in the segment index has two fields:
1. 4 Bytes: Relative Offset to the Base Offset
2. 4 Bytes: Physical Position of the related Log Message (base offset + relative offset so we can get O(1) lookup time)
The configuration index.interval.bytes (4096 bytes by default) sets an index interval that describes how frequently (after how many bytes) an index entry will be added.
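Like other topic configs, this can be overridden per topic (same hypothetical topic as earlier; 8192 is just an illustrative value):
./bin/kafka-configs.sh --zookeeper zookeeper1:2181/kafka --alter --entity-type topics --entity-name events-1 --add-config index.interval.bytes=8192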
Kafka is really concerned about reliability. Depending on the application, you can accidentally lose messages or duplicate messages, causing issues.
As a Kafka administrator, you can use the following acks tradeoffs between speed and reliability:
acks=0 means no acknowledgement - no guarantees are made and retries are not attempted; messages may be lost
acks=1 means leader acknowledgement - guarantees the leader only; the message is lost if the leader fails before followers replicate it
acks=all means all in-sync replicas acknowledge - the leader will wait until all in-sync replicas acknowledge they have received the message
This can be set in a producer like:
./bin/kafka-console-producer.sh --broker-list kafka1:9092 --topic test --producer-property acks=0
We want secure connections between the producers, consumers, brokers, and zookeepers.
This will give you access so that certain users can change certain topics or brokers. All this is disabled by default. There is minor performance degradation if SSL is activated.
You can use any data type in your Kafka clusters, but you may not know in advance which consumers will subscribe to the messages, so it's common to use a tool like Avro to make sure that the consumers don't miss any data within the messages themselves.
We can setup Avro to manage schemas over the life of the Kafka cluster. The producers serialize the data into Avro and the consumers deserialize the data.
Schema Registry is a component of the Confluent Platform.
schema-registry.properties is the configuration file, found at <path-to_confluent>/etc/schema-registry/schema-registry.properties on a local install.
The idea is to enable client applications to read and write Avro data and check compatibility as schemas evolve. Basically, Schema Registry provides a serving layer for your metadata. It provides a RESTful interface for storing and retrieving Avro schemas. It stores a versioned history of all schemas, provides multiple compatibility settings, and allows evolution of schemas according to the configured compatibility setting. It provides serializers that plug into Kafka clients and handle schema storage and retrieval for Kafka messages that are sent in the Avro format.
https://github.com/confluentinc/schema-registry
Kafka Connect is an API that comes with Kafka. It’s a tool with already built connectors for many different data sources, letting you get data in and out of a cluster quickly. Since Kafka Connect exposes a REST API, this works well with other data sources. Kafka Connect works in standalone mode and in distributed mode.
An example would be if you have a MySQL table, you can stream that data into Kafka using Kafka Connect. You install Kafka Connect plugin on the broker, then use the REST API to connect to the cluster.
You have the following important files:
config/connect-file-sink.properties - used for getting data OUT of the kafka cluster (e.g. a Hadoop sink)
config/connect-file-source.properties - used for getting data INTO the kafka cluster
config/connect-standalone.properties - Kafka Connect in Standalone mode
config/connect-distributed.properties - Kafka Connect in Distributed mode
Important configs in connect-standalone or connect-distributed modes:
bootstrap.servers=
Important configs in file-sink or file-source properties:
connector.class (e.g. FileStreamSink)
Run with: ./bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
Depending on the configuration, your message logs are stored in a directory (for ours, we chose /data/kafka
).
Finding and maintaining these logs is handled by Kafka. As we make more messages, you can see the dir
filling up (du -h
)
By default, messages in a topic are kept for a week. A topic can be created with log compaction, meaning the messages are compacted, keeping only the most recent key (and everything has to be key-value pairs).
The config is --config cleanup.policy=compact
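For example, creating a compacted topic (hypothetical name) looks like:
./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic compacted-topic --partitions 1 --replication-factor 1 --config cleanup.policy=compact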
Streams are a sequence of events. There are no delete or update abilities because streams are immutable. Streams are much different than database tables. Events are ordered, immutable, and replayable. Events can be structured or unstructured. Stream processing is for when response times should be in milliseconds (e.g. is a credit card valid or not) - this is real-time data processing.
Operations are performed within a time window. There are three types of time: event time, log append time, and processing time.
You want events to be on a single time zone (especially for Event Time).
Sometimes you’ll want to capture more than just a single event. This might be because:
The solution is to store the state.
State is the information stored between events. There are two types of state: local (internal) state and external state.
We have the following design patterns that depend on what you’re trying to do with the data and the sequence of events.
We have the following multi-cluster architectures:
One approach is to prefix topic names with the datacenter (e.g. ny.users and uk.users); a consumer can then consume from *.users
Another solution is to use record headers, which will show which cluster a message came from
MirrorMaker is used to replicate data between two datacenters. MirrorMaker is a collection of consumers in a consumer group. MirrorMaker runs a thread for each consumer. MirrorMaker creates a consumer for each topic. Run MirrorMaker on the destination cluster (not the source).
Run with bin/kafka-mirror-maker.sh
MirrorMaker uses a shared producer to send events to the new cluster.
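A minimal sketch of the invocation (property files are hypothetical; the consumer config points at the source cluster and the producer config at the destination):
bin/kafka-mirror-maker.sh --consumer.config source-consumer.properties --producer.config destination-producer.properties --whitelist ".*"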
Since Kafka is used to store data, we need to think about security features. We’ll break things down into Encryption, Client Authentication, and Authorization using ACLs.
A few of the things we want include:
Zookeeper
You can initialize server certificates and trusted client certificates, then specify them in the config here:
zookeeper.ssl.keyStore.location="/path/to/your/keystore"
zookeeper.ssl.keyStore.password="keystore_password"
zookeeper.ssl.trustStore.location="/path/to/your/truststore"
zookeeper.ssl.trustStore.password="truststore_password"
You would also need to set up a secure client port
secureClientPort=2281
Kafka
With Kafka, we want server and client side keystores and truststores. Here’s an example of testing out SSL with a client side configuration that can be used in say a console producer or consumer.
We want to set our Kafka brokers with security.protocol=SSL
and specify a server truststore.
After our Kafka brokers are setup, we then want to switch our client applications to use a client
truststore (e.g. in producers, consumers). In general (with the below example as a client truststore), it might
look like:
ssl.truststore.location=/tmp/kafka.client.truststore.jks
To create this truststore, I'm using a docker kafka image (wurstmeister/kafka:2.11-1.1.1), which has the following file:
/usr/lib/jvm/java-1.8-openjdk/jre/lib/security/cacerts, which is actually just a symlink to /etc/ssl/certs/java/cacerts
Create your truststore with cp /usr/lib/jvm/java-1.8-openjdk/jre/lib/security/cacerts /tmp/kafka.client.truststore.jks
Now set that into your kafka client config:
security.protocol=SSL
ssl.truststore.location=/tmp/kafka.client.truststore.jks
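Assuming those two lines are saved to a (hypothetical) client-ssl.properties and the brokers expose an SSL listener on 9093, you could test with the console producer:
./bin/kafka-console-producer.sh --broker-list kafka1:9093 --topic test --producer.config client-ssl.properties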
We use a keystore and a truststore when an application needs to communicate over SSL/TLS.
Usually these are password-protected files that sit on the same file system as our running application.
For Java, the default keystore format was JKS until Java 8, and has been PKCS12 since Java 9.
A keystore has private keys and the certificates with their corresponding public keys. Keystores hold keys that our application owns that we can use to prove the integrity of a message and the authenticity of the sender. We use a keystore when we are a server and want to use HTTPS. Every Java Runtime Environment (JRE) has its own keystore.
During an SSL handshake, the server looks up the private key from the keystore and presents its corresponding public key and certificate to the client. Also, if the client needs to authenticate itself (aka mutual authentication) then the client also has a keystore and also presents its public key and certificate.
A truststore is like the opposite of a keystore. A keystore holds onto certificates that identify us while a truststore holds onto certificates that identify others. We use a truststore to trust the third party we’re about to communicate with.
So basically, a truststore is used to authenticate peers. If you're the client, the server is the peer. If you're the server, the client is the peer. If you're the server, or if you're the client and the server requests client authentication, then you have to authenticate yourself to the peer (which is why you need your own certificate and private key), which are in the keystore.
cacerts is a truststore where Java stores the public certificates of root CAs. It sits in the $JAVA_HOME/jre/lib/security dir.
TLDR; clients and servers both need their own keystore and truststore for certain authentication methods like mutual authentication.
The summary is: we create a CA (a key pair and certificate) that is used to sign other certificates, and a broker keystore (e.g. kafka.server.keystore.jks) containing the broker's key pair and certificate. We add the generated CA to the clients' truststore (e.g. kafka.client.truststore.jks) and to the brokers' truststore (e.g. kafka.server.truststore.jks), basically telling the clients and brokers to trust this CA. We then sign all certificates in the keystore with the CA that we generated.
https://docs.confluent.io/current/security/security_tutorial.html
Each machine in the cluster has a public-private key pair, and a certificate to identify the machine. We want to sign the certificate for each machine in the cluster using a certificate authority (CA). Think of the CA like a government that issues out passports (signed certificates) that are difficult to forge.
The keystore stores each machine’s own identity. The truststore stores all the certificates that the machine should trust.
For Kafka, this means that you can sign all certificates in the cluster with a single CA, and then have all machines share the same truststore that trusts the CA. This way all machines can authenticate all other machines.
If host name verification is enabled, clients will verify the server’s fully qualified domain name (FQDN) against one of the following two fields, the Common Name (CN) or the Subject Alternative Name (SAN)
To deploy SSL, we want to: generate a key and certificate for each broker, create our own Certificate Authority (CA), sign the broker certificates with the CA, and import the CA certificate and signed certificates into the keystores and truststores.
We’ll need keytool
, a key and certificate management utility. This tool allows users to administer their own
public/private key pairs and associated certificates for use in self-authentication (where the user authenticates
himself/herself to other users/services) or data integrity and authentication services using digital signatures.
Keytool stores the keys and certificates in a keystore (with the default implementation being a file). Example to create a keystore:
keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey -keyalg RSA -ext SAN=DNS:{FQDN}
We use Java’s keytool
utility to generate the key and the certificate for each Kafka broker in the cluster.
We generate the key into a keystore called kafka.server.keystore
so that we can then export and sign it later with
a CA. The keystore file has the private key of the certificate so it needs to be kept safely.
# With user prompts
keytool -keystore kafka.server.keystore.jks -alias localhost -genkey
Enter keystore password:
Re-enter new password:
What is your first and last name?
...
We then have a kafka.server.keystore.jks file. Remember, this is a keystore file containing the private key of the certificate, so keep it safe (unlike kafka.client.truststore.jks and kafka.server.truststore.jks, which only hold certificates to trust).
First let’s generate a CA that is a public-private key pair and certificate. It is intended to sign other certificates.
openssl req -new -x509 -keyout ca-key -out ca-cert -days {validity}
# -keyout file : means the file to write the newly created private key to
# -out file : the DER-encoded output file
# -days arg : the number of days to certify the certificate for
We’ll be prompted to Enter PEM pass phrase
along with:
openssl req -new -x509 -keyout ca-key -out ca-cert -days 33
Generating a 2048 bit RSA private key
...............................+++
.................................................................+++
writing new private key to 'ca-key'
Enter PEM pass phrase:
Verifying - Enter PEM pass phrase:
-----
You are about to be asked to enter information that will be incorporated
into your certificate request.
What you are about to enter is what is called a Distinguished Name or a DN.
There are quite a few fields but you can leave some blank
For some fields there will be a default value,
If you enter '.', the field will be left blank.
-----
Country Name (2 letter code) []:US
...
Out of the above command, we’ll have a key (ca-key
) and a cert (ca-cert
).
We want to add the generated CA to the clients’ truststore so that the clients can trust this CA
keytool -keystore kafka.client.truststore.jks -alias CARoot -import -file ca-cert
This then generates kafka.client.truststore.jks
We want to add the generated CA to the brokers’ truststore so that the brokers can trust this CA
keytool -keystore kafka.server.truststore.jks -alias CARoot -importcert -file ca-cert
This then generates kafka.server.truststore.jks
Now we want to sign all the certificates in the keystore with the CA that we generated.
We export the certificate from the keystore
keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-file
This creates the cert-file
file, which is the certificate from the keystore
We then want to sign the cert with the CA
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days {validity} -CAcreateserial -passin pass:{ca-password}
This creates the cert-signed
file
Import both the certificate of the CA and the signed certificate into the broker keystore
keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.server.keystore.jks -alias localhost -import -file cert-signed
Note: if you need client keystores signed as well, then repeat this for clients as well. Example:
keytool -keystore kafka.client.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.client.keystore.jks -alias localhost -import -file cert-signed
Debugging:
To check if kafka.server.keystore.jks
is setup correctly, run:
openssl s_client -debug -connect localhost:9093 -tls1
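You can also inspect a keystore or truststore directly to confirm the CA and signed certificates were imported:
keytool -list -v -keystore kafka.server.keystore.jks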