Optimize network traffic costs of Amazon MSK consumers on Amazon EKS with rack awareness


Are you incurring significant cross-Availability Zone traffic costs when running Apache Kafka clients in containerized environments on Amazon Elastic Kubernetes Service (Amazon EKS) that consume data from Amazon Managed Streaming for Apache Kafka (Amazon MSK) topics?

If you're not familiar with Apache Kafka's rack awareness feature, we strongly recommend starting with the blog post Reduce network traffic costs of your Amazon MSK consumers with rack awareness for an in-depth explanation of the feature and how Amazon MSK supports it.

Although the solution described in that post uses an Amazon Elastic Compute Cloud (Amazon EC2) instance deployed in a single Availability Zone to consume messages from an Amazon MSK topic, modern cloud-native architectures demand more dynamic and scalable approaches. Amazon EKS has emerged as a leading platform for deploying and managing distributed applications. The dynamic nature of Kubernetes introduces unique implementation challenges compared to static client deployments. In this post, we walk you through a solution for implementing rack awareness in consumer applications that are dynamically deployed across multiple Availability Zones using Amazon EKS.

Here's a quick recap of some key Apache Kafka terminology from the referenced blog post. An Apache Kafka consumer client registers to read against a topic. A topic is the logical data structure that Apache Kafka organizes data into. A topic is segmented into one or many partitions. Partitions are the unit of parallelism in Apache Kafka. Amazon MSK provides high availability by replicating each partition of a topic across brokers in different Availability Zones. Because replicas of each partition reside across the different brokers that make up your MSK cluster, Amazon MSK also tracks whether a replica partition is in sync with the most recent data for that partition. This means there is one partition that Amazon MSK recognizes as containing the most up-to-date data, known as the leader partition. The collection of replicated partitions is called the in-sync replicas. This list of in-sync replicas is used internally when the cluster needs to elect a new leader partition if the current leader were to become unavailable.

When consumer applications read from a topic, the Apache Kafka protocol facilitates a network exchange to determine which broker currently hosts the leader partition that the consumer needs to read from. This means the consumer could be directed to read from a broker in a different Availability Zone than its own, leading to cross-zone traffic charges on your AWS account. To help optimize this cost, Amazon MSK supports the rack awareness feature, with which clients can ask an Amazon MSK cluster to provide a replica partition to read from within the same Availability Zone as the client, even if it isn't the current leader partition. The cluster accomplishes this by checking for an in-sync replica on a broker within the same Availability Zone as the consumer.

The challenge with Kafka clients on Amazon EKS

In Amazon EKS, the underlying units of compute are EC2 instances that are abstracted as Kubernetes nodes. The nodes are organized into node groups for ease of administration, scaling, and grouping of applications on certain EC2 instance types. As a best practice for resilience, the nodes in a node group are spread across multiple Availability Zones. Amazon EKS uses the underlying Amazon EC2 metadata about the Availability Zone the instance is located in, and it injects that information into the node's metadata during node configuration. In particular, the Availability Zone ID (AZ ID) is injected into the node metadata.

When an application is deployed in a Kubernetes pod on Amazon EKS, it goes through a process of binding to a node that meets the pod's requirements. As shown in the following diagram, when you deploy client applications on Amazon EKS, the pod for the application can be bound to a node with available capacity in any Availability Zone. Also, the pod doesn't automatically inherit the Availability Zone information from the node it's bound to, a piece of information essential for rack awareness. The following architecture diagram illustrates Kafka consumers running on Amazon EKS without rack awareness.

AWS Cloud architecture showing MSK brokers, EKS pods, and EC2 instances in three Availability Zones

To set the client configuration for rack awareness, the pod needs to know, dynamically, which Availability Zone it's located in as it's bound to a node. During its lifecycle, the same pod can be evicted from the node it was previously bound to and moved to a node in a different Availability Zone, if the matching criteria permit that. Making the pod aware of its Availability Zone dynamically sets the rack awareness parameter client.rack during the initialization of the application container encapsulated in the pod.
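For reference, when rack awareness is wired up this way, the generated consumer configuration ends up containing a line of the following shape (the AZ ID shown is illustrative):

```properties
# Excerpt from the consumer's properties file: client.rack must hold the
# consumer's own Availability Zone ID so fetches prefer a local replica
client.rack=use1-az2
```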

After rack awareness is enabled on the MSK cluster, what happens if the broker in the same Availability Zone as the client (hosted on Amazon EKS or elsewhere) becomes unavailable? The Apache Kafka protocol is designed to support a distributed data storage system. Assuming customers follow the best practice of using a replication factor > 1, Apache Kafka can dynamically reroute the consumer client to the next available in-sync replica on a different broker. This resilience remains intact even after implementing nearest replica fetching, or rack awareness. Enabling rack awareness optimizes the network exchange to prefer a partition within the same Availability Zone, but it doesn't compromise the consumer's ability to operate if the nearest replica is unavailable.

In this post, we walk you through an example of how to use the Kubernetes metadata label topology.k8s.aws/zone-id, assigned to each node by Amazon EKS, together with an open source policy engine, Kyverno, to deploy a policy that mutates pods in the binding state to dynamically inject the node's AZ ID into the pod's metadata as an annotation, as depicted in the following diagram. This annotation, in turn, is used by the container to create an environment variable that's assigned the pod's annotated AZ ID. The environment variable is then used in the container's postStart lifecycle hook to generate the Kafka client configuration file with the rack awareness setting. The following architecture diagram illustrates Kafka consumers running on Amazon EKS with rack awareness.

AWS architecture with MSK, EKS, Kyverno, and EC2 across three Availability Zones, detailing topology

Solution walkthrough

Prerequisites

For this walkthrough, we use AWS CloudShell to run the scripts provided inline as you progress. For a smooth experience, before getting started, make sure kubectl and eksctl are installed and configured in the AWS CloudShell environment, following the installation instructions for Linux (amd64). Helm is also required to be installed on AWS CloudShell, using the instructions for Linux.

Also, check whether the envsubst tool is installed in your CloudShell environment by invoking:

envsubst --help

If the tool isn't installed, you can install it using the command:

sudo dnf -y install gettext-devel

We also assume you already have an MSK cluster deployed in an Amazon Virtual Private Cloud (Amazon VPC) across three Availability Zones with the name MSK-AZ-Aware. In this walkthrough, we use AWS Identity and Access Management (IAM) authentication for client access control to the MSK cluster. If you're using a cluster in your account with a different name, substitute the instances of MSK-AZ-Aware in the instructions.

We follow the same MSK cluster configuration mentioned in the rack awareness blog post referenced previously, with some modifications. (Make sure you've set replica.selector.class = org.apache.kafka.common.replica.RackAwareReplicaSelector for the reasons discussed there.) In our configuration, we add one line: num.partitions = 6. Although not mandatory, this ensures that automatically created topics will have multiple partitions, which supports clearer demonstrations in subsequent sections.
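Under those assumptions, the relevant lines of the MSK cluster configuration would look like the following sketch:

```properties
# Let consumers fetch from an in-sync replica in their own Availability Zone
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector
# Give automatically created topics six partitions
num.partitions=6
```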

Finally, we use the Amazon MSK Data Generator with the following configuration:

{
"name": "msk-data-generator",
    "config": {
    "connector.class": "com.amazonaws.mskdatagen.GeneratorSourceConnector",
    "genkp.MSK-AZ-Aware-Topic.with": "#{Internet.uuid}",
    "genv.MSK-AZ-Aware-Topic.product_id.with": "#{number.number_between '101','200'}",
    "genv.MSK-AZ-Aware-Topic.quantity.with": "#{number.number_between '1','5'}",
    "genv.MSK-AZ-Aware-Topic.customer_id.with": "#{number.number_between '1','5000'}"
    }
}

Running the MSK Data Generator with this configuration will automatically create a six-partition topic named MSK-AZ-Aware-Topic on our cluster and push data to that topic. To follow along with the walkthrough, we recommend and assume that you deploy the MSK Data Generator to create the topic and populate it with simulated data.

Create the EKS cluster

The first step is to install an EKS cluster in the same Amazon VPC subnets as the MSK cluster. You can modify the name of the MSK cluster by changing the environment variable MSK_CLUSTER_NAME if your cluster was created with a different name than suggested. You can also change the Amazon EKS cluster name by changing EKS_CLUSTER_NAME.

The environment variables we define here are used throughout the walkthrough.

The last step is to update the kubeconfig with an entry for the EKS cluster:

AWS_ACCOUNT=$(aws sts get-caller-identity --output text --query Account)
export AWS_ACCOUNT
export AWS_REGION=${AWS_DEFAULT_REGION}
export MSK_CLUSTER_NAME=MSK-AZ-Aware
export EKS_CLUSTER_NAME=EKS-AZ-Aware
export EKS_CLUSTER_SIZE=3
export K8S_VERSION=1.32
export POD_ID_VERSION=1.3.5

MSK_BROKER_SG=$(aws kafka list-clusters \
  --query 'ClusterInfoList[?ClusterName==`'${MSK_CLUSTER_NAME}'`].BrokerNodeGroupInfo.SecurityGroups' \
  --output text | xargs)
export MSK_BROKER_SG

MSK_BROKER_CLIENT_SUBNETS=$(aws kafka list-clusters \
  --query 'ClusterInfoList[?ClusterName==`'${MSK_CLUSTER_NAME}'`].BrokerNodeGroupInfo.ClientSubnets' \
  --output text | xargs)
export MSK_BROKER_CLIENT_SUBNETS

VPC_ID=$(aws ec2 describe-subnets \
  --subnet-ids "$(echo "${MSK_BROKER_CLIENT_SUBNETS}" | cut -d' ' -f1)" \
  --query 'Subnets[0].VpcId' \
  --output text)
export VPC_ID

EKS_SUBNETS=$(echo ${MSK_BROKER_CLIENT_SUBNETS} | sed 's/ \+/,/g')
export EKS_SUBNETS

# Create a minimal config file for encrypted node volumes
cat > eks-config.yaml << EOF
apiVersion: eksctl.io/v1alpha5
kind: ClusterConfig
metadata:
  name: ${EKS_CLUSTER_NAME}
  region: ${AWS_REGION}
  version: "${K8S_VERSION}"
vpc:
  id: "${VPC_ID}"
  subnets:
    public:
$(for subnet in ${MSK_BROKER_CLIENT_SUBNETS}; do
  AZ=$(aws ec2 describe-subnets --subnet-ids "$subnet" --query 'Subnets[0].AvailabilityZone' --output text)
  echo "      $AZ: { id: $subnet }"
done)
nodeGroups:
  - name: ng1
    instanceType: m5.xlarge
    desiredCapacity: ${EKS_CLUSTER_SIZE}
    minSize: ${EKS_CLUSTER_SIZE}
    maxSize: ${EKS_CLUSTER_SIZE}
    securityGroups:
      attachIDs: ["${MSK_BROKER_SG}"]
    volumeSize: 100
    volumeType: gp3
    volumeEncrypted: true
EOF

eksctl create cluster -f eks-config.yaml

aws eks update-kubeconfig \
  --region "${AWS_REGION}" \
  --name ${EKS_CLUSTER_NAME}
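At this point, you can optionally confirm that the worker nodes have joined the cluster and that each one carries the zone ID label this solution relies on:

```shell
# Show each node with its Availability Zone ID label as an extra column
kubectl get nodes -L topology.k8s.aws/zone-id
```

The `-L` flag adds the value of the given label as a column in the output.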

Next, you must create an IAM policy, MSK-AZ-Aware-Policy, to allow access from the Amazon EKS pods to the MSK cluster. Note here that we're using MSK-AZ-Aware as the cluster name.

Create a file, msk-az-aware-policy.json, with the IAM policy template:

cat > msk-az-aware-policy.json << EOF
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:AlterCluster",
                "kafka-cluster:DescribeCluster",
                "kafka-cluster:DescribeClusterDynamicConfiguration",
                "kafka-cluster:AlterClusterDynamicConfiguration"
            ],
            "Resource": [
                "arn:aws:kafka:${AWS_REGION}:${AWS_ACCOUNT}:cluster/${MSK_CLUSTER_NAME}/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:*Topic*",
                "kafka-cluster:WriteData",
                "kafka-cluster:ReadData"
            ],
            "Resource": [
                "arn:aws:kafka:${AWS_REGION}:${AWS_ACCOUNT}:topic/${MSK_CLUSTER_NAME}/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:AlterGroup",
                "kafka-cluster:DescribeGroup"
            ],
            "Resource": [
                "arn:aws:kafka:${AWS_REGION}:${AWS_ACCOUNT}:group/${MSK_CLUSTER_NAME}/*"
            ]
        }
    ]
}
EOF

To create the IAM policy, use the following command. It first replaces the placeholders in the policy file with values from the relevant environment variables and then creates the IAM policy:

envsubst < msk-az-aware-policy.json | \
xargs -0 -I {} aws iam create-policy \
            --policy-name MSK-AZ-Aware-Policy \
            --policy-document {}

Configure EKS Pod Identity

Amazon EKS Pod Identity offers a simplified experience for obtaining IAM permissions for pods on Amazon EKS. This requires installing the Amazon EKS Pod Identity Agent add-on to the EKS cluster:

eksctl create addon \
  --cluster ${EKS_CLUSTER_NAME} \
  --name eks-pod-identity-agent \
  --version ${POD_ID_VERSION}

Confirm that the add-on has been installed, that its status is ACTIVE, and that the status of all pods associated with the add-on is Running.

eksctl get addon \
  --cluster ${EKS_CLUSTER_NAME} \
  --region "${AWS_REGION}" \
  --name eks-pod-identity-agent -o json

kubectl get pods \
  -n kube-system \
  -l app.kubernetes.io/instance=eks-pod-identity-agent

After you've installed the add-on, you must create a pod identity association between a Kubernetes service account and the IAM policy created earlier:

eksctl create podidentityassociation \
  --namespace kafka-ns \
  --service-account-name kafka-sa \
  --role-name EKS-AZ-Aware-Role \
  --permission-policy-arns arn:aws:iam::"${AWS_ACCOUNT}":policy/MSK-AZ-Aware-Policy \
  --cluster ${EKS_CLUSTER_NAME} \
  --region "${AWS_REGION}"
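To verify that the association was recorded, you can list the pod identity associations for the cluster (an optional check):

```shell
# List pod identity associations scoped to the kafka-ns namespace
eksctl get podidentityassociation \
  --cluster ${EKS_CLUSTER_NAME} \
  --namespace kafka-ns
```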

Install Kyverno

Kyverno is an open source policy engine for Kubernetes that allows for validation, mutation, and generation of Kubernetes resources using policies written in YAML, thus simplifying the enforcement of security and compliance requirements. You must install Kyverno to dynamically inject metadata into the Amazon EKS pods as they enter the binding state to inform them of their Availability Zone ID.

In AWS CloudShell, create a file named kyverno-values.yaml. This file defines the Kubernetes RBAC permissions for Kyverno's admission controller to read Amazon EKS node metadata, because the default Kyverno settings (v1.13 onwards) don't allow this:

cat > kyverno-values.yaml << EOF
admissionController:
  rbac:
    clusterRole:
      extraResources:
        - apiGroups:
            - ""
          resources:
            - "nodes"
          verbs:
            - get
            - list
            - watch
EOF

After this file is created, you can install Kyverno using Helm, providing the values file created in the previous step:

helm repo add kyverno https://kyverno.github.io/kyverno/
helm repo update

helm install kyverno kyverno/kyverno \
  -n kyverno \
  --create-namespace \
  --version 3.3.7 \
  -f kyverno-values.yaml

Starting with Kyverno v1.13, the admission controller is configured to ignore AdmissionReview requests for pods in the binding state. This must be changed by editing the Kyverno ConfigMap:

kubectl -n kyverno edit configmap kyverno

The kubectl edit command uses the default editor configured in your environment (in our case, Linux Vim).

This will open the ConfigMap in a text editor.

As highlighted in the following screenshot, [Pod/binding,*,*] should be removed from the resourceFilters field for the Kyverno admission controller to process AdmissionReview requests for pods in the binding state.

Kubernetes YAML configuration detailing Kyverno policy resource filters and cluster roles

If Linux Vim is your default editor, you can delete the entry using the Vim command 18x, meaning delete (or cut) 18 characters from the current cursor position. Save the modified configuration using the Vim command :wq, meaning write (or save) the file and quit.

After deleting, the resourceFilters field should look similar to the following screenshot.

Kubernetes YAML configuration with ReplicaSet resource filter highlighted for Kyverno policy management

If you have a different editor configured in your environment, follow the appropriate steps to achieve the same result.

Configure the Kyverno policy

You must configure the policy that will make the pods rack aware. This policy is adapted from the approach suggested in the Kyverno blog post, Assigning Node Metadata to Pods. Create a new file with the name kyverno-inject-node-az-id.yaml:

cat > kyverno-inject-node-az-id.yaml << EOF
apiVersion: kyverno.io/v2beta1
kind: ClusterPolicy
metadata:
  name: inject-node-az-id
spec:
  background: false
  rules:
    - name: inject-node-az-id
      match:
        any:
        - resources:
            kinds:
            - Pod/binding
      context:
      - name: node
        variable:
          jmesPath: request.object.target.name
          default: ''
      - name: node_az_id
        apiCall:
          urlPath: "/api/v1/nodes/{{node}}"
          jmesPath: "metadata.labels.\"topology.k8s.aws/zone-id\" || 'empty'"
      mutate:
        patchStrategicMerge:
          metadata:
            annotations:
              node_az_id: "{{ node_az_id }}"
EOF

The policy instructs Kyverno to watch for pods in the binding state. After Kyverno receives the AdmissionReview request for a pod, it sets the variable node to the name of the node to which the pod is being bound. It also sets another variable, node_az_id, to the Availability Zone ID by calling the Kubernetes API /api/v1/nodes/{{node}} to get the node metadata label topology.k8s.aws/zone-id. Finally, it defines a mutate rule to inject the obtained AZ ID into the pod's metadata as an annotation named node_az_id.

After you've created the file, apply the policy using the following command:

kubectl apply -f kyverno-inject-node-az-id.yaml
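With the policy active, a pod that binds to a node in, say, use1-az2 carries an annotation like the following (the value is illustrative):

```yaml
metadata:
  annotations:
    node_az_id: use1-az2
```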

Deploy a pod without rack awareness

Now let's visualize the problem statement. To do this, connect to one of the EKS pods and examine how it interacts with the MSK cluster when you run a Kafka consumer from the pod.

First, get the bootstrap string of the MSK cluster. Look up the Amazon Resource Name (ARN) of the MSK cluster:

MSK_CLUSTER_ARN=$(
    aws kafka list-clusters \
      --query 'ClusterInfoList[?ClusterName==`'${MSK_CLUSTER_NAME}'`].ClusterArn' \
      --output text)
export MSK_CLUSTER_ARN

Using the cluster ARN, you can get the bootstrap string with the following command:

BOOTSTRAP_SERVER_LIST=$(
    aws kafka get-bootstrap-brokers \
        --cluster-arn "${MSK_CLUSTER_ARN}" \
        --query 'BootstrapBrokerStringSaslIam' \
        --output text)
export BOOTSTRAP_SERVER_LIST

Create a new file named kafka-no-az.yaml:

cat > kafka-no-az.yaml << EOF
apiVersion: v1
kind: Namespace
metadata:
 name: kafka-ns
---
apiVersion: v1
kind: ServiceAccount
metadata:
 name: kafka-sa
 namespace: kafka-ns
automountServiceAccountToken: false
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-no-az
  namespace: kafka-ns
  labels:
    app: kafka-no-az
  annotations:
    node_az_id: ''
spec:
  replicas: 3
  selector:
    matchLabels:
      app: kafka-no-az
  template:
    metadata:
      labels:
        app: kafka-no-az
    spec:
      serviceAccountName: kafka-sa
      containers:
      - image: bitnami/kafka:3.8.0
        name: kafka-no-az
        command: ["/bin/sh", "-ec", "while :; do echo '.'; sleep 5 ; done"]
        env:
        - name: BootstrapServerString
          value: ${BOOTSTRAP_SERVER_LIST}
        - name: MSK_TOPIC
          value: "MSK-AZ-Aware-Topic"
        - name: KAFKA_HOME
          value: /opt/bitnami/kafka
        - name: KAFKA_BIN
          value: /opt/bitnami/kafka/bin
        - name: KAFKA_CONFIG
          value: /opt/bitnami/kafka/config
        - name: KAFKA_LIBS
          value: /opt/bitnami/kafka/libs
        - name: KAFKA_LOG4J_OPTS
          value: "-Dlog4j.configuration=file:/opt/bitnami/kafka/config/log4j.properties"
        lifecycle:
          postStart:
            exec:
              command:
              - "sh"
              - "-c"
              - |
                export KAFKA_HOME=/opt/bitnami/kafka
                export KAFKA_BIN=\${KAFKA_HOME}/bin
                export KAFKA_CONFIG=\${KAFKA_HOME}/config
                cat > \${KAFKA_CONFIG}/client.properties << EOF1
                security.protocol=SASL_SSL
                sasl.mechanism=AWS_MSK_IAM
                sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
                sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
                EOF1

                cat >> \${KAFKA_CONFIG}/log4j.properties << EOF2
                #
                # Enable logging of the Kafka consumer to stderr
                #
                log4j.rootLogger=WARN, stderr
                log4j.logger.org.apache.kafka.clients.consumer.internals.AbstractFetch=DEBUG
                log4j.appender.stderr=org.apache.log4j.ConsoleAppender
                log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
                log4j.appender.stderr.layout.ConversionPattern=[%d] %p %m (%c)%n
                log4j.appender.stderr.Target=System.err
                EOF2
                cd \${KAFKA_HOME}/libs
                /usr/bin/curl -sS -L https://github.com/aws/aws-msk-iam-auth/releases/download/v2.2.0/aws-msk-iam-auth-2.2.0-all.jar --output \${KAFKA_LIBS}/aws-msk-iam-auth-2.2.0-all.jar
EOF

This pod manifest doesn't make use of the Availability Zone ID injected into the metadata annotation, and hence doesn't add client.rack to the client.properties configuration.

Deploy the pods using the following command:

kubectl apply -f kafka-no-az.yaml

Run the following command to confirm that the pods have been deployed and are in the Running state:

kubectl -n kafka-ns get pods

Select a pod ID from the output of the previous command and connect to it using:

kubectl -n kafka-ns exec -it POD_ID -- sh

Run the Kafka consumer:

"${KAFKA_BIN}"/kafka-console-consumer.sh \
  --bootstrap-server "${BootstrapServerString}" \
  --consumer.config "${KAFKA_CONFIG}"/client.properties \
  --topic "${MSK_TOPIC}" \
  --from-beginning > /tmp/non-rack-aware-consumer.log 2>&1 &

This command will dump all of the resulting logs into the file non-rack-aware-consumer.log. There's a lot of information in these logs, and we encourage you to open them and take a deeper look. Next, examine the EKS pod in action. To do this, run the following command to search the log for fetch requests to the MSK cluster. You'll find a handful of meaningful log entries to review as the consumer accesses various partitions of the Kafka topic:

grep -E "DEBUG.*Added read_uncommitted fetch request for partition MSK-AZ-Aware-Topic-[0-9]+" /tmp/non-rack-aware-consumer.log | tail -5

Observe your log output, which should look similar to the following:

[2025-03-12 23:59:05,308] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-24102] Added read_uncommitted fetch request for partition MSK-AZ-Aware-Topic-3 at position FetchPosition{offset=100, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[b-2.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 2 rack: use1-az6)], epoch=0}} to node b-2.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 2 rack: use1-az6) (org.apache.kafka.clients.consumer.internals.AbstractFetch)
[2025-03-12 23:59:05,308] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-24102] Added read_uncommitted fetch request for partition MSK-AZ-Aware-Topic-0 at position FetchPosition{offset=83, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[b-2.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 2 rack: use1-az6)], epoch=0}} to node b-2.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 2 rack: use1-az6) (org.apache.kafka.clients.consumer.internals.AbstractFetch)
[2025-03-12 23:59:05,542] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-24102] Added read_uncommitted fetch request for partition MSK-AZ-Aware-Topic-5 at position FetchPosition{offset=100, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[b-1.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 1 rack: use1-az4)], epoch=0}} to node b-1.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 1 rack: use1-az4) (org.apache.kafka.clients.consumer.internals.AbstractFetch)
[2025-03-12 23:59:05,542] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-24102] Added read_uncommitted fetch request for partition MSK-AZ-Aware-Topic-2 at position FetchPosition{offset=107, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[b-1.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 1 rack: use1-az4)], epoch=0}} to node b-1.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 1 rack: use1-az4) (org.apache.kafka.clients.consumer.internals.AbstractFetch)
[2025-03-12 23:59:05,720] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-24102] Added read_uncommitted fetch request for partition MSK-AZ-Aware-Topic-4 at position FetchPosition{offset=84, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[b-3.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 3 rack: use1-az2)], epoch=0}} to node b-3.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 3 rack: use1-az2) (org.apache.kafka.clients.consumer.internals.AbstractFetch)
[2025-03-12 23:59:05,720] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-24102] Added read_uncommitted fetch request for partition MSK-AZ-Aware-Topic-1 at position FetchPosition{offset=85, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[b-3.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 3 rack: use1-az2)], epoch=0}} to node b-3.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 3 rack: use1-az2) (org.apache.kafka.clients.consumer.internals.AbstractFetch)
[2025-03-12 23:59:05,811] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-24102] Added read_uncommitted fetch request for partition MSK-AZ-Aware-Topic-3 at position FetchPosition{offset=100, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[b-2.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 2 rack: use1-az6)], epoch=0}} to node b-2.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 2 rack: use1-az6) (org.apache.kafka.clients.consumer.internals.AbstractFetch)
[2025-03-12 23:59:05,811] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-24102] Added read_uncommitted fetch request for partition MSK-AZ-Aware-Topic-0 at position FetchPosition{offset=83, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[b-2.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 2 rack: use1-az6)], epoch=0}} to node b-2.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 2 rack: use1-az6) (org.apache.kafka.clients.consumer.internals.AbstractFetch)

You've now connected to a specific pod in the EKS cluster and run a Kafka consumer to read from the MSK topic without rack awareness. Remember that this pod is running within a single Availability Zone.

Reviewing the log output, you'll notice rack: values of use1-az2, use1-az4, and use1-az6 as the pod makes calls to different partitions of the topic. These rack values represent the Availability Zone IDs that our brokers are running within. This means our EKS pod is creating network connections to brokers across three different Availability Zones, which would be accruing networking costs in our account.
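From outside the pod, you can cross-check this placement with kubectl; this is an optional check, and NODE_NAME below is a placeholder for a node name taken from the first command's output:

```shell
# Show each consumer pod together with the node it is bound to
kubectl -n kafka-ns get pods -o wide

# Look up the zone ID label of that node (dots in the label key are escaped)
kubectl get node NODE_NAME -o jsonpath='{.metadata.labels.topology\.k8s\.aws/zone-id}'
```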

Also notice that, from the consumer logs alone, you can't tell which node, and therefore which Availability Zone, this EKS pod is running in. You can observe in the logs that it's calling MSK brokers in different Availability Zones, but there's no indication of which broker is in the same Availability Zone as the EKS pod you've connected to. Delete the deployment when you're done:

kubectl -n kafka-ns delete -f kafka-no-az.yaml

Deploy a pod with rack awareness

Now that you've experienced the consumer behavior without rack awareness, you can inject the Availability Zone ID to make your pods rack aware.

Create a new file named kafka-az-aware.yaml:

cat > kafka-az-aware.yaml << EOF
apiVersion: v1
kind: Namespace
metadata:
 name: kafka-ns
---
apiVersion: v1
kind: ServiceAccount
metadata:
 name: kafka-sa
 namespace: kafka-ns
automountServiceAccountToken: false
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-az-aware
  namespace: kafka-ns
  labels:
    app: kafka-az-aware
  annotations:
    node_az_id: ''
spec:
  replicas: 3
  selector:
    matchLabels:
      app: kafka-az-aware
  template:
    metadata:
      labels:
        app: kafka-az-aware
    spec:
      serviceAccountName: kafka-sa
      containers:
      - image: bitnami/kafka:3.8.0
        name: kafka-az-aware
        command: ["/bin/sh", "-ec", "while :; do echo '.'; sleep 5 ; done"]
        env:
        - name: BootstrapServerString
          value: ${BOOTSTRAP_SERVER_LIST}
        - name: MSK_TOPIC
          value: "MSK-AZ-Aware-Topic"
        - name: KAFKA_HOME
          value: /opt/bitnami/kafka
        - name: KAFKA_BIN
          value: /opt/bitnami/kafka/bin
        - name: KAFKA_CONFIG
          value: /opt/bitnami/kafka/config
        - name: KAFKA_LIBS
          value: /opt/bitnami/kafka/libs
        - name: KAFKA_LOG4J_OPTS
          value: "-Dlog4j.configuration=file:/opt/bitnami/kafka/config/log4j.properties"
        - name: NODE_AZ_ID
          valueFrom:
            fieldRef:
              fieldPath: metadata.annotations['node_az_id']
        lifecycle:
          postStart:
            exec:
              command:
              - "sh"
              - "-c"
              - |
                export KAFKA_HOME=/opt/bitnami/kafka
                export KAFKA_BIN=\${KAFKA_HOME}/bin
                export KAFKA_CONFIG=\${KAFKA_HOME}/config
                cat > \${KAFKA_CONFIG}/client.properties << EOF1
                security.protocol=SASL_SSL
                sasl.mechanism=AWS_MSK_IAM
                sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
                sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
                EOF1
                if [ -n "\$NODE_AZ_ID" ]
                then
                  echo "client.rack=\$NODE_AZ_ID" >> \${KAFKA_CONFIG}/client.properties
                fi

                cat >> \${KAFKA_CONFIG}/log4j.properties << EOF2
                #
                # Enable logging of the Kafka consumer to stderr
                #
                log4j.rootLogger=WARN, stderr
                log4j.logger.org.apache.kafka.clients.consumer.internals.AbstractFetch=DEBUG
                log4j.appender.stderr=org.apache.log4j.ConsoleAppender
                log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
                log4j.appender.stderr.layout.ConversionPattern=[%d] %p %m (%c)%n
                log4j.appender.stderr.Target=System.err
                EOF2

                /usr/bin/curl -sS -L https://github.com/aws/aws-msk-iam-auth/releases/download/v2.2.0/aws-msk-iam-auth-2.2.0-all.jar --output \${KAFKA_LIBS}/aws-msk-iam-auth-2.2.0-all.jar
EOF

As you can observe, the pod manifest defines an environment variable NODE_AZ_ID, assigning it the value from the pod's own metadata annotation node_az_id that was injected by Kyverno. The manifest then uses the pod's postStart lifecycle script to add client.rack to the client.properties configuration, setting it equal to the value in the environment variable NODE_AZ_ID.
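To see what that conditional logic produces, here's a standalone shell sketch of the postStart hook (an illustration only — no Kafka or Kubernetes needed; the temp file stands in for the client.properties path, and only a subset of the auth settings is reproduced):

```shell
# Stand-in for the annotation value Kyverno injects; unset it to simulate
# a pod where injection did not happen.
NODE_AZ_ID="use1-az6"

# Temp file standing in for ${KAFKA_CONFIG}/client.properties
CLIENT_PROPS=$(mktemp)

# Auth settings are always written...
cat > "$CLIENT_PROPS" << 'EOF1'
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
EOF1

# ...but client.rack is appended only when NODE_AZ_ID is non-empty,
# which is the same guard the postStart hook uses.
if [ "$NODE_AZ_ID" ]
then
  echo "client.rack=$NODE_AZ_ID" >> "$CLIENT_PROPS"
fi

cat "$CLIENT_PROPS"
rm -f "$CLIENT_PROPS"
```

Pods that miss the annotation therefore fall back to a plain (non-rack-aware) consumer configuration rather than failing to start.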

Deploy the pods using the following command:

kubectl apply -f kafka-az-aware.yaml

Run the following command to confirm that the pods have been deployed and are in the Running state:

kubectl -n kafka-ns get pods

Verify that Availability Zone IDs have been injected into the pods:

for pod in $(kubectl -n kafka-ns get pods --field-selector=status.phase==Running -o=name | grep "pod/kafka-az-aware-" | xargs)
do
  kubectl -n kafka-ns get "$pod" -o yaml | grep "node_az_id:"
done

Your output should look similar to the following:

node_az_id: use1-az2
node_az_id: use1-az4
node_az_id: use1-az6

Or:

[Screenshot: AWS CloudShell showing the kafka-ns pods and their node assignments in the Kubernetes cluster]

Select a pod ID from the output of the get pods command and shell into it:

kubectl -n kafka-ns exec -it POD_ID -- sh

The output of the get $pod command matches the order of results from the get pods command. This matching will help you understand which Availability Zone your pod is running in so you can compare it to the log output later.

After you've connected to your pod, run the Kafka consumer:

"${KAFKA_BIN}"/kafka-console-consumer.sh \
  --bootstrap-server "${BootstrapServerString}" \
  --consumer.config "${KAFKA_CONFIG}"/client.properties \
  --topic "${MSK_TOPIC}" \
  --from-beginning > /tmp/rack-aware-consumer.log 2>&1 &

Similar to before, this command will dump all the resulting logs into a file, this time rack-aware-consumer.log. You create a new file so there's no overlap between the Kafka consumers you've run. There's a lot of information in these logs, and we encourage you to open them and take a deeper look. If you want to see the rack awareness of your EKS pod in action, run the following command against the file to view fetch request results to the MSK cluster. You can observe a handful of meaningful log entries here as the consumer accesses the various partitions of the Kafka topic:

grep -E "DEBUG.*Added read_uncommitted fetch request for partition MSK-AZ-Aware-Topic-[0-9]+" /tmp/rack-aware-consumer.log | tail -5

Observe your log output, which should look similar to the following:

[2025-03-13 00:47:51,695] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-86303] Added read_uncommitted fetch request for partition MSK-AZ-Aware-Topic-5 at position FetchPosition{offset=527, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[b-1.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 1 rack: use1-az4)], epoch=0}} to node b-2.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 2 rack: use1-az6) (org.apache.kafka.clients.consumer.internals.AbstractFetch)
[2025-03-13 00:47:51,695] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-86303] Added read_uncommitted fetch request for partition MSK-AZ-Aware-Topic-4 at position FetchPosition{offset=509, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[b-3.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 3 rack: use1-az2)], epoch=0}} to node b-2.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 2 rack: use1-az6) (org.apache.kafka.clients.consumer.internals.AbstractFetch)
[2025-03-13 00:47:51,695] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-86303] Added read_uncommitted fetch request for partition MSK-AZ-Aware-Topic-3 at position FetchPosition{offset=527, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[b-2.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 2 rack: use1-az6)], epoch=0}} to node b-2.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 2 rack: use1-az6) (org.apache.kafka.clients.consumer.internals.AbstractFetch)
[2025-03-13 00:47:51,695] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-86303] Added read_uncommitted fetch request for partition MSK-AZ-Aware-Topic-2 at position FetchPosition{offset=522, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[b-1.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 1 rack: use1-az4)], epoch=0}} to node b-2.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 2 rack: use1-az6) (org.apache.kafka.clients.consumer.internals.AbstractFetch)
[2025-03-13 00:47:51,695] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-86303] Added read_uncommitted fetch request for partition MSK-AZ-Aware-Topic-1 at position FetchPosition{offset=533, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[b-3.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 3 rack: use1-az2)], epoch=0}} to node b-2.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 2 rack: use1-az6) (org.apache.kafka.clients.consumer.internals.AbstractFetch)
[2025-03-13 00:47:51,695] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-86303] Added read_uncommitted fetch request for partition MSK-AZ-Aware-Topic-0 at position FetchPosition{offset=520, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[b-2.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 2 rack: use1-az6)], epoch=0}} to node b-2.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 2 rack: use1-az6) (org.apache.kafka.clients.consumer.internals.AbstractFetch)

For each log line, you can now observe two rack: values. The first rack: value shows the current leader; the second rack: shows the rack that is being used to fetch messages.

For example, look at MSK-AZ-Aware-Topic-5. The leader is identified as rack: use1-az4, but the fetch request is sent to use1-az6, as indicated by to node b-2.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 2 rack: use1-az6) (org.apache.kafka.clients.consumer.internals.AbstractFetch).

You'll notice something similar in all the other log lines. The fetch always goes to the broker in use1-az6, which matches our expectation, given that the pod we connected to was in this Availability Zone.
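If you'd rather not eyeball each line, the leader rack and fetch-target rack can be extracted with a short sed expression. This is only a sketch against a shortened sample line (the broker hostnames here are stand-ins); on the pod you would pipe the grep output from /tmp/rack-aware-consumer.log into the same sed command instead:

```shell
# Shortened sample of one AbstractFetch DEBUG line from the output above
LOG_LINE='currentLeader=LeaderAndEpoch{leader=Optional[b-1.example:9098 (id: 1 rack: use1-az4)], epoch=0}} to node b-2.example:9098 (id: 2 rack: use1-az6)'

# Capture the rack inside "leader=Optional[... rack: X)]" and the rack inside
# "to node ... rack: Y)" and print them side by side for quick comparison.
echo "$LOG_LINE" \
  | sed -E 's/.*leader=Optional\[[^(]*\(id: [0-9]+ rack: ([^)]+)\).*to node [^(]*\(id: [0-9]+ rack: ([^)]+)\).*/leader \1 -> fetch \2/'
# prints: leader use1-az4 -> fetch use1-az6
```

A mismatch between the two values, as here, is exactly the nearest-replica behavior you want: the leader sits in another Availability Zone, but the fetch stays local.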

Congratulations! You're consuming from the closest replica on Amazon EKS.

Clean up

Delete the deployment when finished:

kubectl -n kafka-ns delete -f kafka-az-aware.yaml

To delete the EKS Pod Identity association:

eksctl delete podidentityassociation \
--cluster ${EKS_CLUSTER_NAME} \
--namespace kafka-ns \
--service-account-name kafka-sa

To delete the IAM policy:

aws iam delete-policy \
  --policy-arn arn:aws:iam::"${AWS_ACCOUNT}":policy/MSK-AZ-Aware-Policy

To delete the EKS cluster:

eksctl delete cluster -n ${EKS_CLUSTER_NAME} --disable-nodegroup-eviction

If you followed along with this post using the Amazon MSK Data Generator, remember to delete your deployment so it's not attempting to generate and send data after you delete the rest of your resources.

Cleanup will depend on which deployment option you used. To read more about the deployment options and the resources created for the Amazon MSK Data Generator, refer to Getting Started in the GitHub repository.

Creating an MSK cluster was a prerequisite for this post; if you'd like to clean up the MSK cluster as well, you can use the following command:

aws kafka delete-cluster --cluster-arn "${MSK_CLUSTER_ARN}"

There is no additional cost to using AWS CloudShell, but if you'd like to delete your shell, refer to Delete a shell session home directory in the AWS CloudShell User Guide.

Conclusion

Apache Kafka nearest replica fetching, or rack awareness, is a strategic cost-optimization technique. By implementing it for Amazon MSK consumers on Amazon EKS, you can significantly reduce cross-zone traffic costs while maintaining robust, distributed streaming architectures. Open source tools such as Kyverno can simplify complex configuration challenges and drive meaningful savings. The solution we've demonstrated provides a powerful, repeatable approach to dynamically injecting Availability Zone information into Kubernetes pods, optimizing Kafka consumer routing, and minimizing data transfer costs.

Additional resources

To learn more about rack awareness with Amazon MSK, refer to Reduce network traffic costs of your Amazon MSK consumers with rack awareness.


About the authors

Austin Groeneveld is a Streaming Specialist Solutions Architect at Amazon Web Services (AWS), based in the San Francisco Bay Area. In this role, Austin is passionate about helping customers accelerate insights from their data using the AWS platform. He is particularly fascinated by the growing role that data streaming plays in driving innovation in the data analytics space. Outside of his work at AWS, Austin enjoys watching and playing soccer, traveling, and spending quality time with his family.

Farooq Ashraf is a Senior Solutions Architect at AWS, specializing in SaaS, generative AI, and MLOps. He is passionate about blending multi-tenant SaaS concepts with cloud services to innovate scalable solutions for the digital enterprise, and has several blog posts and workshops to his credit.