Understanding Kafka message format

🔹 Kafka Request–Response Communication Model

  • Kafka clients (Producers, Consumers, Admin) communicate with Kafka Brokers using a binary protocol over TCP.
  • The client sends a request → the broker processes it → the broker sends back a corresponding response.
  • Nearly every request gets a response. The one exception is a Produce request sent with acks=0 (“fire and forget”): the broker still receives and writes the data, but sends no response.
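
To make the pairing of requests and responses concrete, here is a minimal Python sketch of how a client might track in-flight requests. The `InFlightTracker` class and its methods are hypothetical, not part of any Kafka client library; the sketch only assumes that each request carries a correlation ID and that a Produce request with acks=0 gets no reply.

```python
import itertools


class InFlightTracker:
    """Hypothetical client-side helper that pairs responses with requests."""

    def __init__(self) -> None:
        self._ids = itertools.count(1)          # correlation_id generator
        self._pending: dict[int, str] = {}      # correlation_id -> API name

    def send(self, api_name: str, expects_response: bool = True) -> int:
        """Allocate a correlation_id; remember it only if a reply will arrive."""
        correlation_id = next(self._ids)
        if expects_response:
            self._pending[correlation_id] = api_name
        return correlation_id

    def on_response(self, correlation_id: int) -> str:
        """Return the API name of the matching request and stop tracking it."""
        if correlation_id not in self._pending:
            raise ValueError(f"unexpected correlation_id {correlation_id}")
        return self._pending.pop(correlation_id)

    def pending(self) -> int:
        return len(self._pending)


tracker = InFlightTracker()
produce_id = tracker.send("Produce")                               # acks=1 or all: reply expected
fire_and_forget = tracker.send("Produce", expects_response=False)  # acks=0: no reply
metadata_id = tracker.send("Metadata")
print(tracker.pending())                  # 2
print(tracker.on_response(metadata_id))   # Metadata
```

Because a broker processes the requests on a single connection in order and returns the responses in that same order, real clients can also treat the correlation ID as a consistency check rather than purely as a lookup key.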

🔹 High-Level Message Format

1. Request Message

A Kafka Request has:

Field             Description
Size (int32)      Length in bytes of the rest of the request (header + body); the 4-byte size field itself is not counted
Request Header    Fields that identify the request: API key, API version, correlation ID, client ID
Request Body      The actual payload, which depends on the API (Produce, Fetch, Metadata, etc.)

Request Header Format:

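  • api_key → Identifies the API (e.g., Produce=0, Fetch=1, Metadata=3).
  • api_version → Version of that API's request/response schema; lets clients and brokers from different Kafka releases agree on a format both understand.
  • correlation_id → Integer chosen by the client and echoed back by the broker, so the client can match each response to its request.
  • client_id → Optional string identifying the client application (it shows up in broker logs and metrics, and can be used for quotas).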
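
A minimal Python sketch of the size prefix and request header described above, using only the standard `struct` module (all integers are big-endian). It assumes request header version 1, which non-flexible API versions use; newer "flexible" versions use header v2, which appends tagged fields that this sketch leaves out. The function names are illustrative, not from a Kafka library.

```python
import struct
from typing import Optional


def encode_nullable_string(value: Optional[str]) -> bytes:
    """NULLABLE_STRING: int16 length (-1 means null) followed by UTF-8 bytes."""
    if value is None:
        return struct.pack(">h", -1)
    data = value.encode("utf-8")
    return struct.pack(">h", len(data)) + data


def encode_request_header_v1(api_key: int, api_version: int,
                             correlation_id: int, client_id: Optional[str]) -> bytes:
    """api_key and api_version are int16, correlation_id is int32."""
    return (struct.pack(">hhi", api_key, api_version, correlation_id)
            + encode_nullable_string(client_id))


def frame(header: bytes, body: bytes) -> bytes:
    """Prefix header + body with an int32 size that does not count itself."""
    payload = header + body
    return struct.pack(">i", len(payload)) + payload


# ApiVersions (api_key 18) version 0 has an empty request body.
header = encode_request_header_v1(api_key=18, api_version=0,
                                  correlation_id=1, client_id="producer-1")
request = frame(header, body=b"")
print(len(header))          # 20  (2 + 2 + 4 + 2 + 10)
print(request[:4].hex())    # 00000014
```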

2. Response Message

A Kafka Response has:

Field             Description
Size (int32)      Length in bytes of the rest of the response (header + body); the size field itself is not counted
Correlation ID    The response header; must match the ID from the request (for tracking)
Response Body     Payload, which depends on the API: acknowledgments, data, metadata, errors, etc.
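
Reading a response mirrors writing a request: read the int32 size, then exactly that many bytes, then take the correlation ID off the front. This Python sketch assumes response header v0 (only the correlation ID; flexible API versions add tagged fields after it), and `read_response` is a hypothetical helper name. The body is returned as raw bytes, because decoding it requires knowing which API and version the original request used.

```python
import struct


def read_response(buffer: bytes) -> tuple[int, bytes, bytes]:
    """Split one size-prefixed response off the front of `buffer`.

    Returns (correlation_id, body, remaining_bytes). Assumes response header
    v0, i.e. the header is only the int32 correlation_id.
    """
    if len(buffer) < 4:
        raise ValueError("incomplete size prefix")
    (size,) = struct.unpack_from(">i", buffer, 0)
    if size < 4:
        raise ValueError("response too short to hold a correlation_id")
    end = 4 + size
    if len(buffer) < end:
        raise ValueError("incomplete response")
    (correlation_id,) = struct.unpack_from(">i", buffer, 4)
    return correlation_id, buffer[8:end], buffer[end:]


# Two responses arriving back to back on one connection.
wire = struct.pack(">ii", 7, 101) + b"abc" + struct.pack(">ii", 4, 202)
first_id, first_body, rest = read_response(wire)
second_id, second_body, rest = read_response(rest)
print(first_id, first_body)     # 101 b'abc'
print(second_id, second_body)   # 202 b''
```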

🔹 Example: Produce Request/Response

Produce Request (Client → Broker)

  • Sent by producer when publishing messages to a topic.

Request Body fields include:

  • transactional_id (if using transactions)
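  • acks (0, 1, or all; sent as the int16 values 0, 1, and -1)
  • timeout_ms (how long the broker may wait for the required acknowledgments)
  • topic_data → array of (topic, partition, record batch)
  • Record Batch = the actual messages, each with a key, value, timestamp, and headers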
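
The leading fields of the Produce body can be sketched in Python as follows. This assumes the non-flexible Produce versions 3 through 8 (version 3 is where transactional_id was added) and stops before topic_data, because encoding a real record batch also involves varints and a CRC-32C checksum. `encode_produce_prefix` and `ACKS_ON_WIRE` are illustrative names.

```python
import struct
from typing import Optional

# Producer "acks" setting -> int16 value carried in the request.
ACKS_ON_WIRE = {"0": 0, "1": 1, "all": -1}


def encode_produce_prefix(transactional_id: Optional[str], acks: str,
                          timeout_ms: int) -> bytes:
    """Encode the leading fields of a Produce request body (versions 3 to 8).

    The topic_data array (topics, partitions, record batches) would follow
    these bytes; it is omitted here.
    """
    if transactional_id is None:
        tid = struct.pack(">h", -1)                 # null NULLABLE_STRING
    else:
        raw = transactional_id.encode("utf-8")
        tid = struct.pack(">h", len(raw)) + raw
    return tid + struct.pack(">hi", ACKS_ON_WIRE[acks], timeout_ms)


prefix = encode_produce_prefix(None, "all", 30000)
print(prefix.hex())   # ffffffff00007530
```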

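Produce Response (Broker → Client)

  • Sent by the broker after the messages are written (with acks=all, only once all in-sync replicas have them). No response is sent when acks=0.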

Response Body fields include:

  • responses → (topic, partition, base_offset, log_append_time, error_code)
  • throttle_time_ms (how long the request was throttled because of a quota, or 0)

👉 This tells the producer where the messages were written and whether there were errors.
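
Because each partition reports its own error_code, a producer has to inspect every entry in responses. A hedged Python sketch: the table holds a small, non-exhaustive subset of the protocol's error codes, and the retry policy (retry transient leadership and timeout errors, give up on everything else) is an illustrative assumption, not the exact behaviour of any particular client.

```python
# A few Kafka protocol error codes: code -> (name, retriable). Not exhaustive.
ERROR_CODES = {
    0: ("NONE", False),
    3: ("UNKNOWN_TOPIC_OR_PARTITION", True),
    5: ("LEADER_NOT_AVAILABLE", True),
    6: ("NOT_LEADER_OR_FOLLOWER", True),
    7: ("REQUEST_TIMED_OUT", True),
    10: ("MESSAGE_TOO_LARGE", False),
}


def classify(partition_response: dict) -> str:
    """Return 'ok', 'retry' or 'fail' for one partition entry of a Produce response."""
    code = partition_response["error_code"]
    if code == 0:
        return "ok"
    # Unknown codes are treated as fatal: a conservative, hypothetical policy.
    _name, retriable = ERROR_CODES.get(code, ("UNKNOWN", False))
    return "retry" if retriable else "fail"


# Made-up partition entries, for illustration only.
partition_responses = [
    {"partition": 0, "error_code": 0, "base_offset": 5001},
    {"partition": 1, "error_code": 6, "base_offset": -1},
    {"partition": 2, "error_code": 10, "base_offset": -1},
]
print([classify(r) for r in partition_responses])   # ['ok', 'retry', 'fail']
```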


🔹 Example: Fetch Request/Response

Fetch Request (Consumer → Broker)

  • Sent by consumer to fetch messages.

Request Body fields include:

  • replica_id (usually -1 for normal consumers)
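  • max_wait_ms (longest time the broker may wait before answering if min_bytes of data is not yet available)
  • min_bytes (minimum amount of data the broker should accumulate before answering)
  • topics → array of (topic, partitions), where each partition entry gives the partition, the fetch_offset to start reading from, and a per-partition max_bytes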
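
The interplay between min_bytes and max_wait_ms is easiest to see as a predicate. The Python function below is a hypothetical, simplified model of the broker's decision; real brokers also answer early on errors and apply other limits such as max_bytes.

```python
def broker_should_respond(available_bytes: int, waited_ms: int,
                          min_bytes: int, max_wait_ms: int) -> bool:
    """Simplified model: answer once min_bytes are available or max_wait_ms has passed."""
    return available_bytes >= min_bytes or waited_ms >= max_wait_ms


# Same settings as the Fetch example later in this post: min_bytes=1, max_wait_ms=500.
print(broker_should_respond(0, 100, min_bytes=1, max_wait_ms=500))    # False: keep waiting
print(broker_should_respond(120, 100, min_bytes=1, max_wait_ms=500))  # True: data arrived
print(broker_should_respond(0, 500, min_bytes=1, max_wait_ms=500))    # True: timed out, empty reply
```

Raising min_bytes trades latency for fewer, larger responses; with min_bytes=1 the broker answers as soon as any data exists.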

Fetch Response (Broker → Consumer)

  • Broker returns messages.

Response Body fields include:

  • throttle_time_ms
  • error_code
  • topics → (topic, partition, high_watermark, record batch)
  • Record Batch = a batch of Kafka records (offset, timestamp, key, value, headers), optionally compressed.
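
Whether a batch is compressed, and with which codec, is recorded once in the batch header rather than per record. A Python sketch decoding the attributes bit field of the v2 record batch format (used since Kafka 0.11); only the bits listed here are handled, and `decode_batch_attributes` is an illustrative name.

```python
# Compression codec stored in the low three bits of a v2 record batch's attributes.
COMPRESSION = {0: "none", 1: "gzip", 2: "snappy", 3: "lz4", 4: "zstd"}


def decode_batch_attributes(attributes: int) -> dict:
    """Decode the int16 attributes field of a v2 record batch header."""
    return {
        "compression": COMPRESSION.get(attributes & 0x07, "unknown"),
        "timestamp_type": "LogAppendTime" if attributes & 0x08 else "CreateTime",
        "transactional": bool(attributes & 0x10),
        "control": bool(attributes & 0x20),
    }


print(decode_batch_attributes(0x0004)["compression"])      # zstd
print(decode_batch_attributes(0x0018)["timestamp_type"])   # LogAppendTime
```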

🔹 Metadata Request/Response Example

Metadata Request

  • Client asks broker for info about topics, partitions, leaders.

Metadata Response

  • Returns list of brokers, topic partitions, and leader assignments.
  • Used by producers/consumers to know where to send/fetch messages.
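
This is the lookup a client builds from a Metadata response before it knows where to send a Produce or Fetch request. The Python sketch uses the simplified field names from the JSON examples in this post (the real wire names differ slightly, e.g. partition_index and leader_id), and partition 1 with leader -1 is made-up data showing the case where no leader is currently available.

```python
from typing import Optional


def leader_routes(metadata: dict) -> dict[tuple[str, int], Optional[str]]:
    """Map (topic, partition) to the leader's "host:port", or None if unknown."""
    brokers = {b["node_id"]: f'{b["host"]}:{b["port"]}' for b in metadata["brokers"]}
    routes: dict[tuple[str, int], Optional[str]] = {}
    for topic in metadata["topic_metadata"]:
        for p in topic["partitions"]:
            # leader is -1 while no leader is available, so .get() yields None.
            routes[(topic["topic"], p["partition"])] = brokers.get(p["leader"])
    return routes


metadata = {
    "brokers": [{"node_id": 1, "host": "broker1", "port": 9092},
                {"node_id": 2, "host": "broker2", "port": 9092}],
    "topic_metadata": [{"topic": "orders", "partitions": [
        {"partition": 0, "leader": 1},
        {"partition": 1, "leader": -1},
    ]}],
}
print(leader_routes(metadata))
# {('orders', 0): 'broker1:9092', ('orders', 1): None}
```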

🔹 In Short

  • Kafka Request/Response format is a binary protocol (size + header + body).
  • Request Header = API key, version, correlation ID, client ID.
  • Response Header = correlation ID; the response body follows it.
  • Common APIs: Produce, Fetch, Metadata, OffsetCommit, JoinGroup, etc.
  • Message payload (records) = batches of key-value pairs with offsets, timestamps, headers.

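🔹 1. Produce Example (Producer → Broker → Response)

The examples below render each binary message as simplified JSON for readability: the // comments are annotations rather than part of the message, and some wire-level fields are left out.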

Produce Request

{
  "api_key": 0,               // ProduceRequest
  "api_version": 7,
  "correlation_id": 101,
  "client_id": "producer-1",
  "acks": -1,                 // -1 = "all"
  "timeout_ms": 30000,
  "topic_data": [
    {
      "topic": "orders",
      "partitions": [
        {
          "partition": 0,
          "records": [
            { "key": "order-101", "value": "{ 'item': 'Phone', 'qty': 1 }" },
            { "key": "order-102", "value": "{ 'item': 'Laptop', 'qty': 2 }" }
          ]
        }
      ]
    }
  ]
}

Produce Response

{
  "correlation_id": 101,
  "responses": [
    {
      "topic": "orders",
      "partition_responses": [
        {
          "partition": 0,
          "error_code": 0,
          "base_offset": 5001,
          "log_append_time": 1694592000000
        }
      ]
    }
  ]
}

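👉 The broker confirms that both records were appended to partition 0 starting at offset 5001 (so they occupy offsets 5001 and 5002). log_append_time is only filled in when the topic uses LogAppendTime timestamps; otherwise the broker returns -1.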
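
The arithmetic behind that confirmation is simple: records appended in one batch land at consecutive offsets, starting from base_offset. A short Python sketch using the two records from the example; `assign_offsets` is an illustrative name.

```python
def assign_offsets(base_offset: int, records: list[dict]) -> list[tuple[int, str]]:
    """Records appended in one batch get consecutive offsets from base_offset."""
    return [(base_offset + i, record["key"]) for i, record in enumerate(records)]


records = [{"key": "order-101"}, {"key": "order-102"}]
print(assign_offsets(5001, records))   # [(5001, 'order-101'), (5002, 'order-102')]
```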


🔹 2. Fetch Example (Consumer → Broker → Response)

Fetch Request

{
  "api_key": 1,               // FetchRequest
  "api_version": 11,
  "correlation_id": 202,
  "client_id": "consumer-1",
  "max_wait_ms": 500,
  "min_bytes": 1,
  "topics": [
    {
      "topic": "orders",
      "partitions": [
        { "partition": 0, "fetch_offset": 5001, "max_bytes": 1048576 }
      ]
    }
  ]
}

Fetch Response

{
  "correlation_id": 202,
  "responses": [
    {
      "topic": "orders",
      "partition_responses": [
        {
          "partition": 0,
          "high_watermark": 5003,
          "records": [
            { "offset": 5001, "key": "order-101", "value": "{ 'item': 'Phone', 'qty': 1 }" },
            { "offset": 5002, "key": "order-102", "value": "{ 'item': 'Laptop', 'qty': 2 }" }
          ]
        }
      ]
    }
  ]
}

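👉 The consumer receives the two records at offsets 5001 and 5002. The high_watermark of 5003 is one past the last committed offset, so the consumer has read everything currently available in the partition.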
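
After each Fetch, the consumer moves its position to one past the last record it received, and the gap to high_watermark is its lag. A Python sketch under those assumptions: `advance` is a hypothetical helper, and it drops records below fetch_offset because a broker can return a whole batch that starts before the requested offset.

```python
def advance(partition_response: dict, fetch_offset: int) -> tuple[int, int]:
    """Return (next_fetch_offset, lag) after consuming one Fetch partition response."""
    records = [r for r in partition_response["records"] if r["offset"] >= fetch_offset]
    next_offset = records[-1]["offset"] + 1 if records else fetch_offset
    lag = partition_response["high_watermark"] - next_offset
    return next_offset, lag


response = {
    "partition": 0,
    "high_watermark": 5003,
    "records": [{"offset": 5001, "key": "order-101"},
                {"offset": 5002, "key": "order-102"}],
}
print(advance(response, fetch_offset=5001))   # (5003, 0)
```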


🔹 3. Metadata Example (Client → Broker → Response)

Metadata Request

{
  "api_key": 3,               // MetadataRequest
  "api_version": 9,
  "correlation_id": 303,
  "client_id": "producer-1",
  "topics": ["orders"]
}

Metadata Response

{
  "correlation_id": 303,
  "brokers": [
    { "node_id": 1, "host": "broker1", "port": 9092 },
    { "node_id": 2, "host": "broker2", "port": 9092 }
  ],
  "topic_metadata": [
    {
      "topic": "orders",
      "partitions": [
        { "partition": 0, "leader": 1, "replicas": [1,2], "isr": [1,2] }
      ]
    }
  ]
}

👉 This tells the client that partition 0 of the orders topic is led by broker 1, is replicated on brokers 1 and 2, and that both replicas are in sync (listed in the ISR).
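
The replicas and isr lists also let a client spot partitions where a follower has fallen behind. A hedged Python sketch: partition 0 is the one from the response above, while partition 1 is invented to show a follower (broker 1) that has dropped out of the ISR. `under_replicated` is an illustrative name.

```python
def under_replicated(metadata: dict) -> list[tuple[str, int]]:
    """List (topic, partition) pairs whose ISR is smaller than the replica set."""
    lagging = []
    for topic in metadata["topic_metadata"]:
        for p in topic["partitions"]:
            if set(p["isr"]) != set(p["replicas"]):
                lagging.append((topic["topic"], p["partition"]))
    return lagging


metadata = {"topic_metadata": [{"topic": "orders", "partitions": [
    {"partition": 0, "leader": 1, "replicas": [1, 2], "isr": [1, 2]},
    {"partition": 1, "leader": 2, "replicas": [1, 2], "isr": [2]},   # hypothetical
]}]}
print(under_replicated(metadata))   # [('orders', 1)]
```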


✅ These examples show how Kafka’s request/response protocol works:

  • Producer → ProduceRequest → Broker → ProduceResponse
  • Consumer → FetchRequest → Broker → FetchResponse
  • Any client → MetadataRequest → Broker → MetadataResponse