Understanding Kafka message format
🔹 Kafka Request-Response Communication Model
- Kafka clients (Producers, Consumers, Admin) communicate with Kafka Brokers using a binary protocol over TCP.
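- Each client request → the broker processes it → the broker sends a corresponding response.
- Communication is request/response, and the client always initiates it. The one case with no response is a "fire and forget" producer (acks=0): the broker still receives and processes the request but sends nothing back.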
🔹 High-Level Message Format
1. Request Message
A Kafka Request has:
Field | Description |
---|---|
Size (int32) | Size in bytes of the rest of the request (header + body), not counting this 4-byte field |
Request Header | Metadata (API key, version, correlation ID, client ID) |
Request Body | The actual payload (depends on API type: Produce, Fetch, Metadata, etc.) |
Request Header Format:
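- api_key → Identifies the API (e.g., Produce=0, Fetch=1, Metadata=3).
- api_version → Selects the schema version of that API, which allows backward/forward compatibility between clients and brokers.
- correlation_id → ID chosen by the client; the broker echoes it back so the client can match each response to its request.
- client_id → String identifying the client.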
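As a rough illustration of the header fields above, the following Python sketch packs a request header and prepends the size field. It assumes the older, non-flexible header layout (two int16 values, one int32, and a length-prefixed `client_id`, all big-endian); newer "flexible" API versions append tagged fields that are not shown here. The function names `encode_request_header` and `frame_request` are made up for this example.

```python
import struct
from typing import Optional


def encode_request_header(api_key: int, api_version: int,
                          correlation_id: int, client_id: Optional[str]) -> bytes:
    """Pack the four header fields (assumed non-flexible layout)."""
    # api_key: int16, api_version: int16, correlation_id: int32, big-endian.
    fixed = struct.pack(">hhi", api_key, api_version, correlation_id)
    if client_id is None:
        # A null string is written as length -1 with no bytes after it.
        return fixed + struct.pack(">h", -1)
    raw = client_id.encode("utf-8")
    return fixed + struct.pack(">h", len(raw)) + raw


def frame_request(header: bytes, body: bytes) -> bytes:
    """Prefix header + body with the int32 size field."""
    payload = header + body
    return struct.pack(">i", len(payload)) + payload


header = encode_request_header(api_key=0, api_version=7,
                               correlation_id=101, client_id="producer-1")
message = frame_request(header, b"")  # request body left empty in this sketch
print(len(header), message[:4].hex())
```

With the client ID `producer-1` the header is 20 bytes: 2 + 2 + 4 for the fixed fields, 2 for the string length, and 10 for the string itself.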
2. Response Message
A Kafka Response has:
Field | Description |
---|---|
Size (int32) | Size in bytes of the rest of the response (correlation ID + body), not counting this 4-byte field |
Correlation ID | Must match the one from the request (for tracking) |
Response Body | Payload (depends on API type: acknowledgment, data, metadata, errors, etc.) |
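The response framing can be read back in a similar way. This is a minimal sketch, assuming the basic response header that holds only the correlation ID (flexible API versions add tagged fields after it). `decode_response` and the `in_flight` dictionary are hypothetical names used for illustration, and the 4-byte body is a placeholder rather than a real API payload.

```python
import struct
from typing import Dict, Tuple


def decode_response(frame: bytes) -> Tuple[int, bytes]:
    """Split one framed response into (correlation_id, body)."""
    # First 4 bytes: big-endian int32 size of everything that follows.
    (size,) = struct.unpack(">i", frame[:4])
    payload = frame[4:4 + size]
    if len(payload) != size:
        raise ValueError("incomplete frame")
    # Response header: the correlation ID echoed from the request.
    (correlation_id,) = struct.unpack(">i", payload[:4])
    return correlation_id, payload[4:]


# Requests the client has sent and is still waiting on, keyed by correlation ID.
in_flight: Dict[int, str] = {101: "Produce", 202: "Fetch"}

# A fabricated frame: size=8, correlation_id=202, then 4 placeholder body bytes.
frame = struct.pack(">ii", 8, 202) + b"\x00\x00\x00\x00"
correlation_id, body = decode_response(frame)
api_name = in_flight.pop(correlation_id)
print(api_name, len(body))
```

Keeping a map of pending requests is one simple way to use the correlation ID; the client knows which API the body belongs to only because it remembers what it sent.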
🔹 Example: Produce Request/Response
Produce Request (Client → Broker)
- Sent by the producer when publishing messages to a topic.
- Request Body fields include:
  - `transactional_id` (if using transactions)
  - `acks` (0, 1, or all; `all` is sent as -1 on the wire)
  - `timeout_ms`
  - `topic_data` → array of (topic, partition, record batch)
    - Record Batch = the actual messages with key, value, timestamp, headers
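To make the leading Produce body fields concrete, here is a small sketch that encodes `transactional_id`, `acks`, and `timeout_ms` in that order. It assumes the non-flexible Produce layout (a nullable string with an int16 length, then an int16 and an int32, all big-endian) and stops before `topic_data`, since record batches have their own, more involved encoding. `ACKS` and `encode_produce_prefix` are illustrative names, not part of any Kafka client library.

```python
import struct
from typing import Optional

# Wire values for the acks setting; "all" is represented as -1.
ACKS = {"0": 0, "1": 1, "all": -1}


def encode_nullable_string(value: Optional[str]) -> bytes:
    """int16 length followed by UTF-8 bytes; null is length -1."""
    if value is None:
        return struct.pack(">h", -1)
    raw = value.encode("utf-8")
    return struct.pack(">h", len(raw)) + raw


def encode_produce_prefix(transactional_id: Optional[str],
                          acks: str, timeout_ms: int) -> bytes:
    """Encode the fields that precede topic_data (assumed layout)."""
    return (encode_nullable_string(transactional_id)
            + struct.pack(">hi", ACKS[acks], timeout_ms))


prefix = encode_produce_prefix(None, "all", 30000)
print(prefix.hex())
```

The string form `all` becomes -1 on the wire, and a missing `transactional_id` is written as a length of -1.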
Produce Response (Broker → Client)
- Sent by the broker after writing the messages.
- Response Body fields include:
  - `responses` → (topic, partition, base_offset, log_append_time, error_code)
  - `throttle_time_ms`

👉 This tells the producer where the messages were written and whether any errors occurred.
🔹 Example: Fetch Request/Response
Fetch Request (Consumer → Broker)
- Sent by the consumer to fetch messages.
- Request Body fields include:
  - `replica_id` (usually -1 for normal consumers)
  - `max_wait_ms` (how long to wait for data)
  - `min_bytes` (minimum data to return)
  - `topics` → array of (topic, partitions, offset, max_bytes)
Fetch Response (Broker → Consumer)
- Sent by the broker in reply, carrying the requested messages.
- Response Body fields include:
  - `throttle_time_ms`
  - `error_code`
  - `topics` → (topic, partition, high_watermark, record batch)
    - Record Batch = batch of Kafka records (key, value, headers, timestamp, offset), optionally compressed
🔹 Metadata Request/Response Example
Metadata Request
- Client asks broker for info about topics, partitions, leaders.
Metadata Response
- Returns list of brokers, topic partitions, and leader assignments.
- Used by producers/consumers to know where to send/fetch messages.
🔹 In Short
- Kafka Request/Response format is a binary protocol (size + header + body).
- Request Header = API key, version, correlation ID, client ID.
- Response Header = correlation ID; the response body follows it.
- Common APIs: Produce, Fetch, Metadata, OffsetCommit, JoinGroup, etc.
- Message payload (records) = batches of key-value pairs with offsets, timestamps, headers.
🔹 1. Produce Example (Producer → Broker → Response)
Produce Request
{
"api_key": 0, // ProduceRequest
"api_version": 7,
"correlation_id": 101,
"client_id": "producer-1",
"acks": "all",
"timeout_ms": 30000,
"topic_data": [
{
"topic": "orders",
"partitions": [
{
"partition": 0,
"records": [
{ "key": "order-101", "value": "{ 'item': 'Phone', 'qty': 1 }" },
{ "key": "order-102", "value": "{ 'item': 'Laptop', 'qty': 2 }" }
]
}
]
}
]
}
Produce Response
{
"correlation_id": 101,
"responses": [
{
"topic": "orders",
"partition_responses": [
{
"partition": 0,
"error_code": 0,
"base_offset": 5001,
"log_append_time": 1694592000000
}
]
}
]
}
👉 The broker confirms that the messages were written to partition 0 starting at offset 5001.
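The per-record offsets follow from `base_offset`. The sketch below works on the JSON-style response shown above rather than on real wire bytes, and assumes the records of one batch receive consecutive offsets in the order they were sent. `offsets_by_key` and `sent_keys` are hypothetical names for this example.

```python
from typing import Dict, List, Tuple


def offsets_by_key(response: dict,
                   sent_keys: Dict[Tuple[str, int], List[str]]) -> Dict[str, int]:
    """Map each sent record key to the offset it was assigned."""
    assigned: Dict[str, int] = {}
    for topic in response["responses"]:
        for part in topic["partition_responses"]:
            # error_code 0 means the write to this partition succeeded.
            if part["error_code"] != 0:
                raise RuntimeError(f"produce failed with error_code {part['error_code']}")
            keys = sent_keys[(topic["topic"], part["partition"])]
            for index, key in enumerate(keys):
                assigned[key] = part["base_offset"] + index
    return assigned


produce_response = {
    "correlation_id": 101,
    "responses": [{
        "topic": "orders",
        "partition_responses": [
            {"partition": 0, "error_code": 0, "base_offset": 5001,
             "log_append_time": 1694592000000},
        ],
    }],
}
# Keys in the order they were placed in the request for (topic, partition).
sent_keys = {("orders", 0): ["order-101", "order-102"]}

assigned = offsets_by_key(produce_response, sent_keys)
print(assigned)
```

Here `order-101` lands at offset 5001 and `order-102` at 5002, which matches the offsets that appear in the Fetch example.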
🔹 2. Fetch Example (Consumer → Broker → Response)
Fetch Request
{
"api_key": 1, // FetchRequest
"api_version": 11,
"correlation_id": 202,
"client_id": "consumer-1",
"max_wait_ms": 500,
"min_bytes": 1,
"topics": [
{
"topic": "orders",
"partitions": [
{ "partition": 0, "fetch_offset": 5001, "max_bytes": 1048576 }
]
}
]
}
Fetch Response
{
"correlation_id": 202,
"responses": [
{
"topic": "orders",
"partition_responses": [
{
"partition": 0,
"high_watermark": 5003,
"records": [
{ "offset": 5001, "key": "order-101", "value": "{ 'item': 'Phone', 'qty': 1 }" },
{ "offset": 5002, "key": "order-102", "value": "{ 'item': 'Laptop', 'qty': 2 }" }
]
}
]
}
]
}
👉 The consumer fetches two records starting from offset 5001.
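A consumer typically uses the returned offsets to decide where to fetch next. The following sketch again operates on the JSON-style view of the response, and assumes the high watermark is the offset one past the last record consumers can read. `next_fetch_offset` is a hypothetical helper, not a client-library function.

```python
def next_fetch_offset(records: list, current_offset: int) -> int:
    """Offset to request next: one past the last record received."""
    if not records:
        # Nothing was returned, so ask again from the same position.
        return current_offset
    return records[-1]["offset"] + 1


partition_response = {
    "partition": 0,
    "high_watermark": 5003,
    "records": [
        {"offset": 5001, "key": "order-101"},
        {"offset": 5002, "key": "order-102"},
    ],
}

next_offset = next_fetch_offset(partition_response["records"], current_offset=5001)
# Records still available beyond what this response delivered.
remaining = partition_response["high_watermark"] - next_offset
print(next_offset, remaining)
```

With the values above, the next request would use `fetch_offset` 5003, and nothing further is waiting in the partition yet.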
🔹 3. Metadata Example (Client → Broker → Response)
Metadata Request
{
"api_key": 3, // MetadataRequest
"api_version": 9,
"correlation_id": 303,
"client_id": "producer-1",
"topics": ["orders"]
}
Metadata Response
{
"correlation_id": 303,
"brokers": [
{ "node_id": 1, "host": "broker1", "port": 9092 },
{ "node_id": 2, "host": "broker2", "port": 9092 }
],
"topic_metadata": [
{
"topic": "orders",
"partitions": [
{ "partition": 0, "leader": 1, "replicas": [1,2], "isr": [1,2] }
]
}
]
}
👉 This tells the client that partition 0 of the `orders` topic is led by broker 1 and replicated on brokers 1 and 2, both of which are in sync (in the ISR).
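One way a client might use this response is to build a lookup from each partition to the address of its leader. The sketch below assumes the JSON-style shape shown above; `leader_addresses` is an illustrative name and does not correspond to a real client API.

```python
from typing import Dict, Tuple


def leader_addresses(metadata: dict) -> Dict[Tuple[str, int], Tuple[str, int]]:
    """Map (topic, partition) to the (host, port) of its leader broker."""
    brokers = {b["node_id"]: (b["host"], b["port"]) for b in metadata["brokers"]}
    leaders: Dict[Tuple[str, int], Tuple[str, int]] = {}
    for topic in metadata["topic_metadata"]:
        for part in topic["partitions"]:
            leaders[(topic["topic"], part["partition"])] = brokers[part["leader"]]
    return leaders


metadata_response = {
    "correlation_id": 303,
    "brokers": [
        {"node_id": 1, "host": "broker1", "port": 9092},
        {"node_id": 2, "host": "broker2", "port": 9092},
    ],
    "topic_metadata": [{
        "topic": "orders",
        "partitions": [
            {"partition": 0, "leader": 1, "replicas": [1, 2], "isr": [1, 2]},
        ],
    }],
}

leaders = leader_addresses(metadata_response)
print(leaders[("orders", 0)])
```

A producer or consumer would then send its Produce or Fetch requests for `orders` partition 0 to `broker1:9092`.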
✅ These examples show how Kafka's request/response protocol works (shown as JSON for readability; on the wire the messages are binary):
- Producer → `ProduceRequest` → Broker → `ProduceResponse`
- Consumer → `FetchRequest` → Broker → `FetchResponse`
- Any client → `MetadataRequest` → Broker → `MetadataResponse`