• /
  • EnglishEspañolFrançais日本語한국어Português
  • Se connecterDémarrer

Cette traduction automatique est fournie pour votre commodité.

En cas d'incohérence entre la version anglaise et la version traduite, la version anglaise prévaudra. Veuillez visiter cette page pour plus d'informations.

Créer un problème

Monitorer Kafka autogéré sur Kubernetes avec OpenTelemetry

Monitorez votre cluster Apache Kafka autogéré fonctionnant sur Kubernetes en déployant l'OpenTelemetry Collector pour recueillir et transférer des métriques à New Relic.

Architecture

New Relic prend en charge deux approches pour le monitoring de Kubernetes Kafka autogéré : l’agent Java OpenTelemetry ou l’exportateur JMX Prometheus. Les diagrammes suivants illustrent le flux de données pour chaque approche.

Kubernetes self-managed Kafka monitoring architecture

Étapes d'installation

Suivez ces étapes pour configurer un monitoring complet de Kafka en installant l'agent Java OpenTelemetry sur vos brokers et en déployant un collecteur pour récupérer et envoyer des métriques et des logs à New Relic.

Avant de commencer

Assurez-vous d'avoir :

  • Un compte New Relic avec un
  • Cluster Kubernetes avec accès kubectl
  • Kafka déployé en tant que StatefulSet
  • Capacité à modifier et redéployer le StatefulSet Kafka

Déployer le Collecteur OpenTelemetry

Déployez le collecteur OpenTelemetry dans votre cluster. Cette étape crée également le ConfigMap kafka-jmx-config qui définit quelles métriques JMX l'agent Java collecte à partir de chaque pod de broker. Le collecteur doit être en cours d'exécution avant que vous ne redémarriez les brokers Kafka à l'étape suivante.

Étape 1. Créer un secret d'identifiants New Relic

Conseil

Pour d'autres configurations de point de terminaison, consultez Configurer votre point de terminaison OTLP.

Étape 2. Créer values.yaml avec la configuration du collecteur

Les collecteurs NRDOT et OpenTelemetry utilisent une configuration identique. Choisissez votre image de collecteur préférée :

Pour des options de configuration avancées, voir :

  • Documentation du récepteur OTLP

  • Documentation du récepteur de métriques Kafka

    Étape 3. Installer OpenTelemetry Collector avec Helm

    bash
    $
    helm repo add open-telemetry https://open-telemetry.github.io/opentelemetry-helm-charts
    $
    helm upgrade kafka-monitoring open-telemetry/opentelemetry-collector \
    >
    --install \
    >
    --namespace newrelic \
    >
    --create-namespace \
    >
    -f values.yaml

    Étape 4. Vérifier le déploiement

    bash
    $
    # Check pod status
    $
    kubectl get pods -n newrelic -l app.kubernetes.io/name=opentelemetry-collector
    $
    $
    # View logs to verify metrics are being received from broker pods
    $
    kubectl logs -n newrelic -l app.kubernetes.io/name=opentelemetry-collector --tail=50

Étape 1. Créer un secret d'identifiants New Relic

Conseil

Pour d'autres configurations de point de terminaison, consultez Configurer votre point de terminaison OTLP.

Étape 2. Créer des fichiers manifestes

Les collecteurs NRDOT et OpenTelemetry utilisent une configuration identique. Seule l'image de conteneur diffère. Les deux nécessitent également le ConfigMap kafka-jmx-config appliqué à votre espace de nommage Kafka.

Créer kafka-jmx-config.yaml - Configuration des métriques JMX pour l'agent Java (à appliquer à votre espace de nommage Kafka) :

apiVersion: v1
kind: ConfigMap
metadata:
name: kafka-jmx-config
namespace: kafka # TODO: Replace with your Kafka namespace
data:
kafka-jmx-config.yaml: |
---
rules:
# Per-topic custom metrics
- bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=*
metricAttribute:
topic: param(topic)
mapping:
Count:
metric: kafka.prod.msg.count
type: counter
desc: The number of messages per topic
unit: "{message}"
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=*
metricAttribute:
topic: param(topic)
direction: const(in)
mapping:
Count:
metric: kafka.topic.io
type: counter
desc: The bytes received or sent per topic
unit: By
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=*
metricAttribute:
topic: param(topic)
direction: const(out)
mapping:
Count:
metric: kafka.topic.io
type: counter
desc: The bytes received or sent per topic
unit: By
# Cluster-level metrics
- bean: kafka.controller:type=KafkaController,name=GlobalTopicCount
mapping:
Value:
metric: kafka.cluster.topic.count
type: gauge
desc: The total number of global topics in the cluster
unit: "{topic}"
- bean: kafka.controller:type=KafkaController,name=GlobalPartitionCount
mapping:
Value:
metric: kafka.cluster.partition.count
type: gauge
desc: The total number of global partitions in the cluster
unit: "{partition}"
- bean: kafka.controller:type=KafkaController,name=FencedBrokerCount
mapping:
Value:
metric: kafka.broker.fenced.count
type: gauge
desc: The number of fenced brokers in the cluster
unit: "{broker}"
- bean: kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount
mapping:
Value:
metric: kafka.partition.non_preferred_leader
type: gauge
desc: The count of topic partitions for which the leader is not the preferred leader
unit: "{partition}"
# Broker-level metrics
- bean: kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount
mapping:
Value:
metric: kafka.partition.under_min_isr
type: gauge
desc: The number of partitions where the number of in-sync replicas is less than the minimum
unit: "{partition}"
- bean: java.lang:type=Runtime
mapping:
Uptime:
metric: kafka.broker.uptime
type: gauge
desc: Broker uptime in milliseconds
unit: ms
- bean: kafka.server:type=ReplicaManager,name=LeaderCount
mapping:
Value:
metric: kafka.broker.leader.count
type: gauge
desc: Number of partitions for which this broker is the leader
unit: "{partition}"
# JVM metrics
- bean: java.lang:type=GarbageCollector,name=*
mapping:
CollectionCount:
metric: jvm.gc.collections.count
type: counter
unit: "{collection}"
desc: total number of collections that have occurred
metricAttribute:
name: param(name)
- bean: java.lang:type=Memory
unit: By
prefix: jvm.memory.
dropNegativeValues: true
mapping:
HeapMemoryUsage.max:
metric: heap.max
desc: current heap usage
type: gauge
HeapMemoryUsage.used:
metric: heap.used
desc: current heap usage
type: gauge
- bean: java.lang:type=Threading
mapping:
ThreadCount:
metric: jvm.thread.count
type: gauge
unit: "{thread}"
desc: Total thread count
- bean: java.lang:type=OperatingSystem
prefix: jvm.
dropNegativeValues: true
mapping:
SystemCpuLoad:
metric: system.cpu.utilization
type: gauge
unit: '1'
desc: Recent CPU utilization for whole system (0.0 to 1.0)
- bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec
mapping:
Count:
metric: kafka.message.count
type: counter
desc: The number of messages received by the broker
unit: "{message}"
- bean: kafka.server:type=BrokerTopicMetrics,name=TotalFetchRequestsPerSec
metricAttribute:
type: const(fetch)
mapping:
Count:
metric: &metric kafka.request.count
type: &type counter
desc: &desc The number of requests received by the broker
unit: &unit "{request}"
- bean: kafka.server:type=BrokerTopicMetrics,name=TotalProduceRequestsPerSec
metricAttribute:
type: const(produce)
mapping:
Count:
metric: *metric
type: *type
desc: *desc
unit: *unit
- bean: kafka.server:type=BrokerTopicMetrics,name=FailedFetchRequestsPerSec
metricAttribute:
type: const(fetch)
mapping:
Count:
metric: &metric kafka.request.failed
type: &type counter
desc: &desc The number of requests to the broker resulting in a failure
unit: &unit "{request}"
- bean: kafka.server:type=BrokerTopicMetrics,name=FailedProduceRequestsPerSec
metricAttribute:
type: const(produce)
mapping:
Count:
metric: *metric
type: *type
desc: *desc
unit: *unit
- beans:
- kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce
- kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer
- kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower
metricAttribute:
type: param(request)
unit: ms
mapping:
99thPercentile:
metric: kafka.request.time.99p
type: gauge
desc: The 99th percentile time the broker has taken to service requests
- bean: kafka.network:type=RequestChannel,name=RequestQueueSize
mapping:
Value:
metric: kafka.request.queue
type: gauge
desc: Size of the request queue
unit: "{request}"
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
metricAttribute:
direction: const(in)
mapping:
Count:
metric: &metric kafka.network.io
type: &type counter
desc: &desc The bytes received or sent by the broker
unit: &unit By
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec
metricAttribute:
direction: const(out)
mapping:
Count:
metric: *metric
type: *type
desc: *desc
unit: *unit
- beans:
- kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Produce
- kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Fetch
metricAttribute:
type: param(delayedOperation)
mapping:
Value:
metric: kafka.purgatory.size
type: gauge
desc: The number of requests waiting in purgatory
unit: "{request}"
- bean: kafka.server:type=ReplicaManager,name=PartitionCount
mapping:
Value:
metric: kafka.partition.count
type: gauge
desc: The number of partitions on the broker
unit: "{partition}"
- bean: kafka.controller:type=KafkaController,name=OfflinePartitionsCount
mapping:
Value:
metric: kafka.partition.offline
type: gauge
desc: The number of partitions offline
unit: "{partition}"
- bean: kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions
mapping:
Value:
metric: kafka.partition.under_replicated
type: gauge
desc: The number of under replicated partitions
unit: "{partition}"
- bean: kafka.server:type=ReplicaManager,name=IsrShrinksPerSec
metricAttribute:
operation: const(shrink)
mapping:
Count:
metric: kafka.isr.operation.count
type: counter
desc: The number of in-sync replica shrink and expand operations
unit: "{operation}"
- bean: kafka.server:type=ReplicaManager,name=IsrExpandsPerSec
metricAttribute:
operation: const(expand)
mapping:
Count:
metric: kafka.isr.operation.count
type: counter
desc: The number of in-sync replica shrink and expand operations
unit: "{operation}"
- bean: kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica
mapping:
Value:
metric: kafka.max.lag
type: gauge
desc: The max lag in messages between follower and leader replicas
unit: "{message}"
- bean: kafka.controller:type=KafkaController,name=ActiveControllerCount
mapping:
Value:
metric: kafka.controller.active.count
type: gauge
desc: Number of active controllers in the cluster
unit: "{controller}"
- bean: kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs
mapping:
Count:
metric: kafka.leader.election.rate
type: counter
desc: The leader election count
unit: "{election}"
- bean: kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec
mapping:
Count:
metric: kafka.unclean.election.rate
type: counter
desc: Unclean leader election count
unit: "{election}"
# ── Additional metrics — remove this section to reduce data ingest ───────────
- beans:
- kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce
- kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer
- kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower
metricAttribute:
type: param(request)
unit: ms
mapping:
Count:
metric: kafka.request.time.total
type: counter
desc: The total time the broker has taken to service requests
50thPercentile:
metric: kafka.request.time.50p
type: gauge
desc: The 50th percentile time the broker has taken to service requests
Mean:
metric: kafka.request.time.avg
type: gauge
desc: The average time the broker has taken to service requests
- bean: kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs
unit: ms
type: gauge
prefix: kafka.logs.flush.
mapping:
Count:
metric: count
unit: '{flush}'
type: counter
desc: Log flush count
50thPercentile:
metric: time.50p
desc: Log flush time - 50th percentile
99thPercentile:
metric: time.99p
desc: Log flush time - 99th percentile
- bean: java.lang:type=GarbageCollector,name=*
mapping:
CollectionTime:
metric: jvm.gc.collections.elapsed
type: counter
unit: ms
desc: the approximate accumulated collection elapsed time in milliseconds
metricAttribute:
name: param(name)
- bean: java.lang:type=ClassLoading
mapping:
LoadedClassCount:
metric: jvm.class.count
type: gauge
unit: "{class}"
desc: Currently loaded class count
- bean: java.lang:type=Memory
unit: By
prefix: jvm.memory.
dropNegativeValues: true
mapping:
HeapMemoryUsage.committed:
metric: heap.committed
desc: Committed heap memory
type: gauge
- bean: java.lang:type=OperatingSystem
prefix: jvm.
dropNegativeValues: true
mapping:
SystemLoadAverage:
metric: system.cpu.load_1m
type: gauge
unit: "{run_queue_item}"
desc: System load average (1 minute)
AvailableProcessors:
metric: cpu.count
type: gauge
unit: "{cpu}"
desc: Number of processors available
ProcessCpuLoad:
metric: cpu.recent_utilization
type: gauge
unit: '1'
desc: Recent CPU utilization for JVM process (0.0 to 1.0)
OpenFileDescriptorCount:
metric: file_descriptor.count
type: gauge
unit: "{file_descriptor}"
desc: Number of open file descriptors
- bean: java.lang:type=MemoryPool,name=*
type: gauge
unit: By
metricAttribute:
name: param(name)
mapping:
Usage.used:
metric: jvm.memory.pool.used
desc: Memory pool usage by generation
Usage.max:
metric: jvm.memory.pool.max
desc: Maximum memory pool size
CollectionUsage.used:
metric: jvm.memory.pool.used_after_last_gc
desc: Memory used after last GC

Étape 3. Déployer les manifestes

bash
$
# Create namespace if it doesn't exist
$
kubectl create namespace newrelic --dry-run=client -o yaml | kubectl apply -f -
$
$
# Apply JMX ConfigMap to the Kafka namespace
$
kubectl apply -f kafka-jmx-config.yaml
$
$
# Apply collector ConfigMap
$
kubectl apply -f collector-configmap.yaml
$
$
# Apply Deployment and Service
$
kubectl apply -f collector-deployment.yaml

Étape 4. Vérifier le déploiement

bash
$
# Check pod status
$
kubectl get pods -n newrelic -l app=otel-collector
$
$
# View logs to verify metrics are being received from broker pods
$
kubectl logs -n newrelic -l app=otel-collector --tail=50

Configurer le StatefulSet Kafka pour l'agent Java

Maintenant que le collecteur est en cours d'exécution, patchez votre StatefulSet Kafka pour ajouter un conteneur d'initialisation qui télécharge le JAR de l'agent Java OpenTelemetry, puis attachez-le à la JVM du broker Kafka via KAFKA_OPTS.

Ajoutez les sections suivantes à votre manifeste Kafka StatefulSet existant :

spec:
template:
spec:
# 1. Init container: downloads OTel Java agent JAR before Kafka starts
initContainers:
- name: download-otel-agent
image: busybox:latest
command:
- sh
- -c
- |
wget -O /otel-agent/opentelemetry-javaagent.jar \
https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar
volumeMounts:
- name: otel-agent
mountPath: /otel-agent
containers:
- name: kafka # TODO: Replace with your Kafka container name
# 2. Attach OTel Java agent to the Kafka broker JVM
env:
- name: KAFKA_OPTS
value: >-
-javaagent:/otel-agent/opentelemetry-javaagent.jar
-Dotel.jmx.enabled=true
-Dotel.jmx.config=/jmx-config/kafka-jmx-config.yaml
-Dotel.resource.attributes=kafka.cluster.name=my-kafka-cluster
-Dotel.exporter.otlp.endpoint=http://otel-collector.newrelic.svc.cluster.local:4317
-Dotel.exporter.otlp.protocol=grpc
-Dotel.metrics.exporter=otlp
-Dotel.logs.exporter=otlp
-Dotel.instrumentation.runtime-telemetry.enabled=false
-Dotel.metric.export.interval=30000
volumeMounts:
- name: otel-agent
mountPath: /otel-agent
- name: jmx-config
mountPath: /jmx-config
# 3. Volumes: emptyDir for JAR, ConfigMap for JMX rules
volumes:
- name: otel-agent
emptyDir: {}
- name: jmx-config
configMap:
name: kafka-jmx-config # Deployed with the collector in the previous step

Conseil

Le ConfigMap kafka-jmx-config a été déployé avec le collecteur à l'étape précédente. La valeur otel.exporter.otlp.endpoint http://otel-collector.newrelic.svc.cluster.local:4317 suppose que le collecteur est déployé dans l'espace de nommage newrelic avec le nom de service otel-collector. Mettez-le à jour pour qu'il corresponde au DNS réel de votre service de collecteur s'il est différent.

Appliquez votre StatefulSet mis à jour et attendez que les pod soient déployés :

bash
$
kubectl apply -f kafka-statefulset.yaml
$
kubectl rollout status statefulset/kafka -n kafka # TODO: Replace with your StatefulSet name and namespace

(Facultatif) Instrumenter les applications productrices ou consommatrices

Important

Prise en charge des langages: actuellement, seules les applications Java sont prises en charge pour l'instrumentation des clients Kafka à l'aide de l'agent Java OpenTelemetry.

Pour collecter la télémétrie au niveau de l’application à partir de vos applications producteur et consommateur Kafka s’exécutant dans Kubernetes, ajoutez l’agent Java OpenTelemetry à ces pods d’application.

Ajoutez un conteneur d'initialisation et des variables d'environnement au déploiement de votre application :

apiVersion: apps/v1
kind: Deployment
metadata:
name: kafka-producer-app
spec:
template:
spec:
initContainers:
- name: download-otel-agent
image: busybox:latest
command:
- sh
- -c
- wget -O /otel-agent/opentelemetry-javaagent.jar https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar
volumeMounts:
- name: otel-agent
mountPath: /otel-agent
containers:
- name: app
image: your-kafka-app:latest
env:
- name: JAVA_TOOL_OPTIONS
value: >-
-javaagent:/otel-agent/opentelemetry-javaagent.jar
-Dotel.service.name=order-process-service
-Dotel.resource.attributes=kafka.cluster.name=my-kafka-cluster
-Dotel.exporter.otlp.endpoint=http://otel-collector.newrelic.svc.cluster.local:4317
-Dotel.exporter.otlp.protocol=grpc
-Dotel.metrics.exporter=otlp
-Dotel.traces.exporter=otlp
-Dotel.logs.exporter=otlp
-Dotel.instrumentation.kafka.experimental-span-attributes=true
-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true
-Dotel.instrumentation.kafka.producer-propagation.enabled=true
-Dotel.instrumentation.kafka.enabled=true
-Dotel.instrumentation.runtime-telemetry.enabled=false
volumeMounts:
- name: otel-agent
mountPath: /otel-agent
volumes:
- name: otel-agent
emptyDir: {}

Paramètres de configuration

Le tableau suivant décrit les principaux paramètres de configuration :

paramètres

Description

order-process-service

Remplacez par un nom unique pour votre application producteur ou consommateur

my-kafka-cluster

Remplacez par le même nom de cluster utilisé dans la configuration de votre broker

otel-collector.newrelic.svc.cluster.local

Remplacez par le nom DNS réel de votre service de collecteur (

<service-name>.<namespace>.svc.cluster.local

)

L’agent Java fournit l’instrumentation Kafka prête à l’emploi sans aucune modification de code, capturant la latence des requêtes, les métriques de débit, les taux d’erreur et les traces distribuées. Pour une configuration avancée, consultez la documentation d’instrumentation Kafka.

Suivez ces étapes pour configurer un monitoring complet de Kafka en installant l'exportateur JMX Prometheus sur vos pods de broker et en déployant un collecteur pour récupérer et envoyer des métriques à New Relic.

Avant de commencer

Assurez-vous d'avoir :

  • Un compte New Relic avec un
  • Cluster Kubernetes avec accès kubectl
  • Kafka déployé en tant que StatefulSet avec un service headless (pour des noms DNS de pod stables)
  • Capacité à modifier et redéployer le StatefulSet Kafka

Créer la ConfigMap des métriques JMX

Créez une ConfigMap contenant la configuration de l'exportateur JMX qui définit les métriques Kafka à collecter. Ce ConfigMap sera monté dans chaque pod de broker Kafka.

Enregistrer sous kafka-jmx-config.yaml. Appliquez-le à l'espace de nommage où Kafka est déployé :

apiVersion: v1
kind: ConfigMap
metadata:
name: kafka-jmx-metrics
namespace: kafka # TODO: Replace with your Kafka namespace
data:
kafka-metrics-config.yml: |
startDelaySeconds: 0
lowercaseOutputName: true
lowercaseOutputLabelNames: true
rules:
# Cluster-level controller metrics
- pattern: 'kafka.controller<type=KafkaController, name=GlobalTopicCount><>Value'
name: kafka_cluster_topic_count
type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=GlobalPartitionCount><>Value'
name: kafka_cluster_partition_count
type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=FencedBrokerCount><>Value'
name: kafka_broker_fenced_count
type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=PreferredReplicaImbalanceCount><>Value'
name: kafka_partition_non_preferred_leader
type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=OfflinePartitionsCount><>Value'
name: kafka_partition_offline
type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=ActiveControllerCount><>Value'
name: kafka_controller_active_count
type: GAUGE
# Broker-level replica metrics
- pattern: 'kafka.server<type=ReplicaManager, name=UnderMinIsrPartitionCount><>Value'
name: kafka_partition_under_min_isr
type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=LeaderCount><>Value'
name: kafka_broker_leader_count
type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=PartitionCount><>Value'
name: kafka_partition_count
type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=UnderReplicatedPartitions><>Value'
name: kafka_partition_under_replicated
type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=IsrShrinksPerSec><>Count'
name: kafka_isr_operation_count
type: COUNTER
labels:
operation: "shrink"
- pattern: 'kafka.server<type=ReplicaManager, name=IsrExpandsPerSec><>Count'
name: kafka_isr_operation_count
type: COUNTER
labels:
operation: "expand"
- pattern: 'kafka.server<type=ReplicaFetcherManager, name=MaxLag, clientId=Replica><>Value'
name: kafka_max_lag
type: GAUGE
# Broker topic metrics (totals)
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec><>Count'
name: kafka_message_count
type: COUNTER
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=TotalFetchRequestsPerSec><>Count'
name: kafka_request_count
type: COUNTER
labels:
type: "fetch"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=TotalProduceRequestsPerSec><>Count'
name: kafka_request_count
type: COUNTER
labels:
type: "produce"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=FailedFetchRequestsPerSec><>Count'
name: kafka_request_failed
type: COUNTER
labels:
type: "fetch"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=FailedProduceRequestsPerSec><>Count'
name: kafka_request_failed
type: COUNTER
labels:
type: "produce"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesInPerSec><>Count'
name: kafka_network_io
type: COUNTER
labels:
direction: "in"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesOutPerSec><>Count'
name: kafka_network_io
type: COUNTER
labels:
direction: "out"
# Per-topic metrics (only appear after traffic flows)
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec, topic=(.+)><>Count'
name: kafka_prod_msg_count
type: COUNTER
labels:
topic: "$1"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesInPerSec, topic=(.+)><>Count'
name: kafka_topic_io
type: COUNTER
labels:
topic: "$1"
direction: "in"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesOutPerSec, topic=(.+)><>Count'
name: kafka_topic_io
type: COUNTER
labels:
topic: "$1"
direction: "out"
# Request metrics
- pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>99thPercentile'
name: kafka_request_time_99p
type: GAUGE
labels:
type: "$1"
- pattern: 'kafka.network<type=RequestChannel, name=RequestQueueSize><>Value'
name: kafka_request_queue
type: GAUGE
- pattern: 'kafka.server<type=DelayedOperationPurgatory, name=PurgatorySize, delayedOperation=(.+)><>Value'
name: kafka_purgatory_size
type: GAUGE
labels:
type: "$1"
# Controller stats
- pattern: 'kafka.controller<type=ControllerStats, name=LeaderElectionRateAndTimeMs><>Count'
name: kafka_leader_election_rate
type: COUNTER
- pattern: 'kafka.controller<type=ControllerStats, name=UncleanLeaderElectionsPerSec><>Count'
name: kafka_unclean_election_rate
type: COUNTER
# JVM Garbage Collection
- pattern: 'java.lang<name=(.+), type=GarbageCollector><>CollectionCount'
name: jvm_gc_collections_count
type: COUNTER
labels:
name: "$1"
# JVM Memory
- pattern: 'java.lang<type=Memory><HeapMemoryUsage>max'
name: jvm_memory_heap_max
type: GAUGE
- pattern: 'java.lang<type=Memory><HeapMemoryUsage>used'
name: jvm_memory_heap_used
type: GAUGE
# JVM Threading and System
- pattern: 'java.lang<type=Threading><>ThreadCount'
name: jvm_thread_count
type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>SystemCpuLoad'
name: jvm_system_cpu_utilization
type: GAUGE
# Broker uptime
- pattern: 'java.lang<type=Runtime><>Uptime'
name: kafka_broker_uptime
type: GAUGE
# Additional metrics — remove this section to reduce data ingest
# Request latency: total count, 50th percentile, and average (99p kept above)
- pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>Count'
name: kafka_request_time_total
type: COUNTER
labels:
type: "$1"
- pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>50thPercentile'
name: kafka_request_time_50p
type: GAUGE
labels:
type: "$1"
- pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>Mean'
name: kafka_request_time_avg
type: GAUGE
labels:
type: "$1"
# Log flush metrics
- pattern: 'kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>Count'
name: kafka_logs_flush_count
type: COUNTER
- pattern: 'kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>50thPercentile'
name: kafka_logs_flush_time_50p
type: GAUGE
- pattern: 'kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>99thPercentile'
name: kafka_logs_flush_time_99p
type: GAUGE
# JVM GC elapsed time
- pattern: 'java.lang<name=(.+), type=GarbageCollector><>CollectionTime'
name: jvm_gc_collections_elapsed
type: COUNTER
labels:
name: "$1"
# JVM Memory heap committed
- pattern: 'java.lang<type=Memory><HeapMemoryUsage>committed'
name: jvm_memory_heap_committed
type: GAUGE
# JVM class loading
- pattern: 'java.lang<type=ClassLoading><>LoadedClassCount'
name: jvm_class_count
type: GAUGE
# Additional JVM OS metrics
- pattern: 'java.lang<type=OperatingSystem><>SystemLoadAverage'
name: jvm_system_cpu_load_1m
type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>AvailableProcessors'
name: jvm_cpu_count
type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>ProcessCpuLoad'
name: jvm_cpu_recent_utilization
type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>OpenFileDescriptorCount'
name: jvm_file_descriptor_count
type: GAUGE
# JVM Memory Pool
- pattern: 'java.lang<type=MemoryPool, name=(.+)><Usage>used'
name: jvm_memory_pool_used
type: GAUGE
labels:
name: "$1"
- pattern: 'java.lang<type=MemoryPool, name=(.+)><Usage>max'
name: jvm_memory_pool_max
type: GAUGE
labels:
name: "$1"
- pattern: 'java.lang<type=MemoryPool, name=(.+)><CollectionUsage>used'
name: jvm_memory_pool_used_after_last_gc
type: GAUGE
labels:
name: "$1"

Conseil

Personnaliser les métriques: vous pouvez ajouter ou modifier des motifs en consultant les exemples Prometheus JMX Exporter et la documentation des MBeans Kafka.

Appliquer la ConfigMap :

bash
$
kubectl apply -f kafka-jmx-config.yaml

Configurer le StatefulSet Kafka pour l'exportateur JMX

Patchez votre StatefulSet Kafka pour ajouter un conteneur d'initialisation qui télécharge le JAR de l'exportateur JMX Prometheus, puis attachez-le à la JVM du broker Kafka via KAFKA_OPTS.

Étape 1. Ajoutez les sections suivantes à votre manifeste StatefulSet Kafka existant :

spec:
template:
spec:
# 1. Init container: downloads JMX Exporter JAR before Kafka starts
initContainers:
- name: download-jmx-exporter
image: busybox:latest
command:
- sh
- -c
- |
# Version 1.5.0 is the minimum required version. Check https://github.com/prometheus/jmx_exporter/releases/latest for newer releases.
JMX_EXPORTER_VERSION="1.5.0"
wget -O /prometheus-jmx/jmx_prometheus_javaagent.jar \
"https://github.com/prometheus/jmx_exporter/releases/download/${JMX_EXPORTER_VERSION}/jmx_prometheus_javaagent-${JMX_EXPORTER_VERSION}.jar"
volumeMounts:
- name: prometheus-jmx
mountPath: /prometheus-jmx
containers:
- name: kafka # TODO: Replace with your Kafka container name
# 2. Attach JMX Exporter as Java agent on port 9404
env:
- name: KAFKA_OPTS
value: "-javaagent:/prometheus-jmx/jmx_prometheus_javaagent.jar=9404:/jmx-config/kafka-metrics-config.yml"
# 3. Expose port 9404 for Prometheus scraping
ports:
- name: jmx-metrics
containerPort: 9404
protocol: TCP
volumeMounts:
- name: prometheus-jmx
mountPath: /prometheus-jmx
- name: jmx-config
mountPath: /jmx-config
# 4. Volumes: emptyDir for JAR, ConfigMap for metrics config
volumes:
- name: prometheus-jmx
emptyDir: {}
- name: jmx-config
configMap:
name: kafka-jmx-metrics # Must match the ConfigMap name from Step 2

Étape 2. Appliquez votre StatefulSet mis à jour et attendez que les pods soient redéployés :

bash
$
kubectl apply -f kafka-statefulset.yaml
$
kubectl rollout status statefulset/kafka -n kafka # TODO: Replace with your StatefulSet name and namespace

Étape 3. Une fois le déploiement terminé, vérifiez que les métriques sont exposées sur chaque pod de broker :

bash
$
# Replace kafka-0 and kafka with your pod name and namespace
$
kubectl exec -n kafka kafka-0 -- curl -s http://localhost:9404/metrics | grep kafka_ | head -20

Important

Clusters multi-brokers: le conteneur init et la configuration KAFKA_OPTS s'appliquent automatiquement à tous les pods du StatefulSet. Vérifiez que chaque pod de broker expose des métriques après le déploiement.

Déployer le Collecteur OpenTelemetry

Déployez le Collector OpenTelemetry dans votre cluster. Le collecteur récupère les données des pods de broker Kafka en utilisant des cibles DNS statiques et écoute sur le port 4317 les données OTLP provenant d'applications instrumentées.

La méthode d'installation Helm est l'approche recommandée pour déployer OpenTelemetry Collector dans Kubernetes.

Étape 1. Créer un secret d'identifiants New Relic

Conseil

Pour d'autres configurations de point de terminaison, consultez Configurer votre point de terminaison OTLP.

Étape 2. Créer values.yaml avec la configuration du collecteur

Les collecteurs NRDOT et OpenTelemetry utilisent une configuration identique. Choisissez votre image de collecteur préférée :

Pour des options de configuration avancées, reportez-vous aux pages de documentation de ces récepteurs :

  • Documentation du récepteur Prometheus

  • Documentation du récepteur de métriques Kafka

    Étape 3. Installer OpenTelemetry Collector avec Helm

    bash
    $
    helm repo add open-telemetry https://open-telemetry.github.io/opentelemetry-helm-charts
    $
    helm upgrade kafka-monitoring open-telemetry/opentelemetry-collector \
    >
    --install \
    >
    --namespace newrelic \
    >
    --create-namespace \
    >
    -f values.yaml

    Étape 4. Vérifier le déploiement :

    bash
    $
    # Check pod status
    $
    kubectl get pods -n newrelic -l app.kubernetes.io/name=opentelemetry-collector
    $
    $
    # View logs to verify metrics collection
    $
    kubectl logs -n newrelic -l app.kubernetes.io/name=opentelemetry-collector --tail=50

    Vous devriez voir des logs indiquant un scraping réussi des pods de broker Kafka sur le port 9404.

La méthode d'installation par manifeste offre un contrôle direct sur les ressources Kubernetes sans utiliser Helm.

Étape 1. Créer un secret d'identifiants New Relic

Conseil

Pour d'autres configurations de point de terminaison, consultez Configurer votre point de terminaison OTLP.

Étape 2. Créer des fichiers manifestes

Les collecteurs NRDOT et OpenTelemetry utilisent une configuration identique. Seule l'image de conteneur diffère.

Pour des options de configuration avancées, reportez-vous aux pages de documentation de ces récepteurs :

  • Documentation du récepteur Prometheus

  • Documentation du récepteur de métriques Kafka

    Étape 3. Déployer les manifestes

    bash
    $
    # Create namespace if it doesn't exist
    $
    kubectl create namespace newrelic --dry-run=client -o yaml | kubectl apply -f -
    $
    $
    # Apply ConfigMap
    $
    kubectl apply -f collector-configmap.yaml
    $
    $
    # Apply Deployment (includes ServiceAccount)
    $
    kubectl apply -f collector-deployment.yaml

    Étape 4. Vérifier le déploiement :

    bash
    $
    # Check pod status
    $
    kubectl get pods -n newrelic -l app=otel-collector
    $
    $
    # View logs to verify metrics collection
    $
    kubectl logs -n newrelic -l app=otel-collector --tail=50

    Vous devriez voir des logs indiquant un scraping réussi des pods de broker Kafka sur le port 9404.

(Facultatif) Instrumenter les applications productrices ou consommatrices

Important

Prise en charge des langages: les applications Java prennent en charge l’instrumentation client Kafka prête à l’emploi à l’aide de l’agent Java OpenTelemetry.

Pour collecter la télémétrie au niveau de l'application à partir de vos applications producteur et consommateur Kafka, utilisez l'agent Java OpenTelemetry avec un conteneur init :

apiVersion: apps/v1
kind: Deployment
metadata:
name: kafka-producer-app
spec:
template:
spec:
initContainers:
- name: download-java-agent
image: busybox:latest
command:
- sh
- -c
- |
wget -O /otel-auto-instrumentation/opentelemetry-javaagent.jar \
https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar
volumeMounts:
- name: otel-auto-instrumentation
mountPath: /otel-auto-instrumentation
containers:
- name: app
image: your-kafka-app:latest
env:
- name: JAVA_TOOL_OPTIONS
value: >-
-javaagent:/otel-auto-instrumentation/opentelemetry-javaagent.jar
-Dotel.service.name=my-kafka-app
-Dotel.resource.attributes=kafka.cluster.name=my-kafka-cluster
-Dotel.exporter.otlp.endpoint=http://otel-collector.newrelic.svc.cluster.local:4317
-Dotel.exporter.otlp.protocol=grpc
-Dotel.metrics.exporter=otlp
-Dotel.traces.exporter=otlp
-Dotel.logs.exporter=otlp
-Dotel.instrumentation.kafka.experimental-span-attributes=true
-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true
-Dotel.instrumentation.kafka.producer-propagation.enabled=true
-Dotel.instrumentation.kafka.enabled=true
-Dotel.instrumentation.runtime-telemetry.enabled=false
volumeMounts:
- name: otel-auto-instrumentation
mountPath: /otel-auto-instrumentation
volumes:
- name: otel-auto-instrumentation
emptyDir: {}

Paramètres de configuration

Le tableau suivant décrit les principaux paramètres de configuration :

paramètresDescription
service.nameRemplacez my-kafka-app par un nom unique pour votre application producteur ou consommateur
kafka.cluster.nameRemplacez my-kafka-cluster par le même nom de cluster utilisé dans la configuration de votre collecteur.
otlp.endpointLe point de terminaison http://otel-collector.newrelic.svc.cluster.local:4317 suppose que le collecteur est déployé dans l'espace de nommage newrelic en tant que otel-collector

L’agent Java fournit l’instrumentation Kafka prête à l’emploi sans aucune modification de code, capturant la latence des requêtes, les métriques de débit, les taux d’erreur et les traces distribuées. Pour une configuration avancée, consultez la documentation d’instrumentation Kafka.

(Facultatif) Transmettre les logs du broker Kafka

Pour collecter les logs des brokers Kafka et les envoyer à New Relic, ajoutez un récepteur filelog à la configuration de votre collecteur.

Trouvez vos données

Après quelques minutes, vos données Kafka devraient apparaître dans New Relic. Consultez Trouver vos données pour obtenir des instructions détaillées sur l’exploration de vos données Kafka dans différentes vues de l’UI de New Relic.

Le tableau suivant résume où chaque type de signal est stocké. Remplacez my-kafka-cluster par votre valeur KAFKA_CLUSTER_NAME dans toutes les requêtes ci-dessous :

SignalType d'événementCe qui est inclus
MétriquesMetricMétriques de broker, de topic, de partition, de groupe de consommateurs, et de la JVM
LogsLogLogs des applications de producteur et de consommateur (via l'agent Java OTel) et logs de broker collectés via l'agent Java
TracesSpanSpans de producteur et de consommateur, y compris les opérations publish et receive par message à travers les topics

Métriques

Les métriques de broker, de topic, de partition, de groupe de consommateurs et de la JVM sont stockées dans le type d'événement Metric :

FROM Metric SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster' SINCE 30 minutes ago

Logs

Les logs des applications productrices et consommatrices instrumentées avec l'agent Java OpenTelemetry, et les logs de broker collectés via l'agent Java sur le broker, sont stockés dans le type d'événement Log :

FROM Log SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster' SINCE 30 minutes ago

Traces

Les spans de producteur et de consommateur, y compris les opérations publish et receive par message sur les topics, sont stockés dans le type d'événement Span :

FROM Span SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster' SINCE 30 minutes ago

Exemple

Un exemple fonctionnel complet avec les manifestes Kafka StatefulSet, les valeurs Helm, la configuration du Collecteur Otel, et des exemples d'applications producteur/consommateur est disponible dans le référentiel New Relic OpenTelemetry Examples.

Dépannage

Prochaines étapes

Droits d'auteur © 2026 New Relic Inc.

This site is protected by reCAPTCHA and the Google Privacy Policy and Terms of Service apply.