Monitorez votre cluster Apache Kafka autogéré fonctionnant sur Kubernetes en déployant l'OpenTelemetry Collector pour recueillir et transférer des métriques à New Relic.
Architecture
New Relic prend en charge deux approches pour le monitoring de Kubernetes Kafka autogéré : l’agent Java OpenTelemetry ou l’exportateur JMX Prometheus. Les diagrammes suivants illustrent le flux de données pour chaque approche.

Étapes d'installation
Suivez ces étapes pour configurer un monitoring complet de Kafka en installant l'agent Java OpenTelemetry sur vos brokers et en déployant un collecteur pour récupérer et envoyer des métriques et des logs à New Relic.
Avant de commencer
Assurez-vous d'avoir :
- Un compte New Relic avec un
- Cluster Kubernetes avec accès
kubectl - Kafka déployé en tant que StatefulSet
- Capacité à modifier et redéployer le StatefulSet Kafka
Déployer le Collecteur OpenTelemetry
Déployez le collecteur OpenTelemetry dans votre cluster. Cette étape crée également le ConfigMap kafka-jmx-config qui définit quelles métriques JMX l'agent Java collecte à partir de chaque pod de broker. Le collecteur doit être en cours d'exécution avant que vous ne redémarriez les brokers Kafka à l'étape suivante.
Étape 1. Créer un secret d'identifiants New Relic
Conseil
Pour d'autres configurations de point de terminaison, consultez Configurer votre point de terminaison OTLP.
Étape 2. Créer values.yaml avec la configuration du collecteur
Les collecteurs NRDOT et OpenTelemetry utilisent une configuration identique. Choisissez votre image de collecteur préférée :
Pour des options de configuration avancées, voir :
Documentation du récepteur de métriques Kafka
Étape 3. Installer OpenTelemetry Collector avec Helm
bash$helm repo add open-telemetry https://open-telemetry.github.io/opentelemetry-helm-charts$helm upgrade kafka-monitoring open-telemetry/opentelemetry-collector \>--install \>--namespace newrelic \>--create-namespace \>-f values.yamlÉtape 4. Vérifier le déploiement
bash$# Check pod status$kubectl get pods -n newrelic -l app.kubernetes.io/name=opentelemetry-collector$$# View logs to verify metrics are being received from broker pods$kubectl logs -n newrelic -l app.kubernetes.io/name=opentelemetry-collector --tail=50
Étape 1. Créer un secret d'identifiants New Relic
Conseil
Pour d'autres configurations de point de terminaison, consultez Configurer votre point de terminaison OTLP.
Étape 2. Créer des fichiers manifestes
Les collecteurs NRDOT et OpenTelemetry utilisent une configuration identique. Seule l'image de conteneur diffère. Les deux nécessitent également le ConfigMap kafka-jmx-config appliqué à votre espace de nommage Kafka.
Créer kafka-jmx-config.yaml - Configuration des métriques JMX pour l'agent Java (à appliquer à votre espace de nommage Kafka) :
apiVersion: v1kind: ConfigMapmetadata: name: kafka-jmx-config namespace: kafka # TODO: Replace with your Kafka namespacedata: kafka-jmx-config.yaml: | --- rules: # Per-topic custom metrics - bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=* metricAttribute: topic: param(topic) mapping: Count: metric: kafka.prod.msg.count type: counter desc: The number of messages per topic unit: "{message}"
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=* metricAttribute: topic: param(topic) direction: const(in) mapping: Count: metric: kafka.topic.io type: counter desc: The bytes received or sent per topic unit: By
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=* metricAttribute: topic: param(topic) direction: const(out) mapping: Count: metric: kafka.topic.io type: counter desc: The bytes received or sent per topic unit: By
# Cluster-level metrics - bean: kafka.controller:type=KafkaController,name=GlobalTopicCount mapping: Value: metric: kafka.cluster.topic.count type: gauge desc: The total number of global topics in the cluster unit: "{topic}"
- bean: kafka.controller:type=KafkaController,name=GlobalPartitionCount mapping: Value: metric: kafka.cluster.partition.count type: gauge desc: The total number of global partitions in the cluster unit: "{partition}"
- bean: kafka.controller:type=KafkaController,name=FencedBrokerCount mapping: Value: metric: kafka.broker.fenced.count type: gauge desc: The number of fenced brokers in the cluster unit: "{broker}"
- bean: kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount mapping: Value: metric: kafka.partition.non_preferred_leader type: gauge desc: The count of topic partitions for which the leader is not the preferred leader unit: "{partition}"
# Broker-level metrics - bean: kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount mapping: Value: metric: kafka.partition.under_min_isr type: gauge desc: The number of partitions where the number of in-sync replicas is less than the minimum unit: "{partition}"
- bean: java.lang:type=Runtime mapping: Uptime: metric: kafka.broker.uptime type: gauge desc: Broker uptime in milliseconds unit: ms
- bean: kafka.server:type=ReplicaManager,name=LeaderCount mapping: Value: metric: kafka.broker.leader.count type: gauge desc: Number of partitions for which this broker is the leader unit: "{partition}"
# JVM metrics - bean: java.lang:type=GarbageCollector,name=* mapping: CollectionCount: metric: jvm.gc.collections.count type: counter unit: "{collection}" desc: total number of collections that have occurred metricAttribute: name: param(name)
- bean: java.lang:type=Memory unit: By prefix: jvm.memory. dropNegativeValues: true mapping: HeapMemoryUsage.max: metric: heap.max desc: current heap usage type: gauge HeapMemoryUsage.used: metric: heap.used desc: current heap usage type: gauge
- bean: java.lang:type=Threading mapping: ThreadCount: metric: jvm.thread.count type: gauge unit: "{thread}" desc: Total thread count
- bean: java.lang:type=OperatingSystem prefix: jvm. dropNegativeValues: true mapping: SystemCpuLoad: metric: system.cpu.utilization type: gauge unit: '1' desc: Recent CPU utilization for whole system (0.0 to 1.0)
- bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec mapping: Count: metric: kafka.message.count type: counter desc: The number of messages received by the broker unit: "{message}"
- bean: kafka.server:type=BrokerTopicMetrics,name=TotalFetchRequestsPerSec metricAttribute: type: const(fetch) mapping: Count: metric: &metric kafka.request.count type: &type counter desc: &desc The number of requests received by the broker unit: &unit "{request}"
- bean: kafka.server:type=BrokerTopicMetrics,name=TotalProduceRequestsPerSec metricAttribute: type: const(produce) mapping: Count: metric: *metric type: *type desc: *desc unit: *unit
- bean: kafka.server:type=BrokerTopicMetrics,name=FailedFetchRequestsPerSec metricAttribute: type: const(fetch) mapping: Count: metric: &metric kafka.request.failed type: &type counter desc: &desc The number of requests to the broker resulting in a failure unit: &unit "{request}"
- bean: kafka.server:type=BrokerTopicMetrics,name=FailedProduceRequestsPerSec metricAttribute: type: const(produce) mapping: Count: metric: *metric type: *type desc: *desc unit: *unit
- beans: - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower metricAttribute: type: param(request) unit: ms mapping: 99thPercentile: metric: kafka.request.time.99p type: gauge desc: The 99th percentile time the broker has taken to service requests
- bean: kafka.network:type=RequestChannel,name=RequestQueueSize mapping: Value: metric: kafka.request.queue type: gauge desc: Size of the request queue unit: "{request}"
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec metricAttribute: direction: const(in) mapping: Count: metric: &metric kafka.network.io type: &type counter desc: &desc The bytes received or sent by the broker unit: &unit By
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec metricAttribute: direction: const(out) mapping: Count: metric: *metric type: *type desc: *desc unit: *unit
- beans: - kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Produce - kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Fetch metricAttribute: type: param(delayedOperation) mapping: Value: metric: kafka.purgatory.size type: gauge desc: The number of requests waiting in purgatory unit: "{request}"
- bean: kafka.server:type=ReplicaManager,name=PartitionCount mapping: Value: metric: kafka.partition.count type: gauge desc: The number of partitions on the broker unit: "{partition}"
- bean: kafka.controller:type=KafkaController,name=OfflinePartitionsCount mapping: Value: metric: kafka.partition.offline type: gauge desc: The number of partitions offline unit: "{partition}"
- bean: kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions mapping: Value: metric: kafka.partition.under_replicated type: gauge desc: The number of under replicated partitions unit: "{partition}"
- bean: kafka.server:type=ReplicaManager,name=IsrShrinksPerSec metricAttribute: operation: const(shrink) mapping: Count: metric: kafka.isr.operation.count type: counter desc: The number of in-sync replica shrink and expand operations unit: "{operation}"
- bean: kafka.server:type=ReplicaManager,name=IsrExpandsPerSec metricAttribute: operation: const(expand) mapping: Count: metric: kafka.isr.operation.count type: counter desc: The number of in-sync replica shrink and expand operations unit: "{operation}"
- bean: kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica mapping: Value: metric: kafka.max.lag type: gauge desc: The max lag in messages between follower and leader replicas unit: "{message}"
- bean: kafka.controller:type=KafkaController,name=ActiveControllerCount mapping: Value: metric: kafka.controller.active.count type: gauge desc: Number of active controllers in the cluster unit: "{controller}"
- bean: kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs mapping: Count: metric: kafka.leader.election.rate type: counter desc: The leader election count unit: "{election}"
- bean: kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec mapping: Count: metric: kafka.unclean.election.rate type: counter desc: Unclean leader election count unit: "{election}"
# ── Additional metrics — remove this section to reduce data ingest ───────────
- beans: - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower metricAttribute: type: param(request) unit: ms mapping: Count: metric: kafka.request.time.total type: counter desc: The total time the broker has taken to service requests 50thPercentile: metric: kafka.request.time.50p type: gauge desc: The 50th percentile time the broker has taken to service requests Mean: metric: kafka.request.time.avg type: gauge desc: The average time the broker has taken to service requests
- bean: kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs unit: ms type: gauge prefix: kafka.logs.flush. mapping: Count: metric: count unit: '{flush}' type: counter desc: Log flush count 50thPercentile: metric: time.50p desc: Log flush time - 50th percentile 99thPercentile: metric: time.99p desc: Log flush time - 99th percentile
- bean: java.lang:type=GarbageCollector,name=* mapping: CollectionTime: metric: jvm.gc.collections.elapsed type: counter unit: ms desc: the approximate accumulated collection elapsed time in milliseconds metricAttribute: name: param(name)
- bean: java.lang:type=ClassLoading mapping: LoadedClassCount: metric: jvm.class.count type: gauge unit: "{class}" desc: Currently loaded class count
- bean: java.lang:type=Memory unit: By prefix: jvm.memory. dropNegativeValues: true mapping: HeapMemoryUsage.committed: metric: heap.committed desc: Committed heap memory type: gauge
- bean: java.lang:type=OperatingSystem prefix: jvm. dropNegativeValues: true mapping: SystemLoadAverage: metric: system.cpu.load_1m type: gauge unit: "{run_queue_item}" desc: System load average (1 minute) AvailableProcessors: metric: cpu.count type: gauge unit: "{cpu}" desc: Number of processors available ProcessCpuLoad: metric: cpu.recent_utilization type: gauge unit: '1' desc: Recent CPU utilization for JVM process (0.0 to 1.0) OpenFileDescriptorCount: metric: file_descriptor.count type: gauge unit: "{file_descriptor}" desc: Number of open file descriptors
- bean: java.lang:type=MemoryPool,name=* type: gauge unit: By metricAttribute: name: param(name) mapping: Usage.used: metric: jvm.memory.pool.used desc: Memory pool usage by generation Usage.max: metric: jvm.memory.pool.max desc: Maximum memory pool size CollectionUsage.used: metric: jvm.memory.pool.used_after_last_gc desc: Memory used after last GCÉtape 3. Déployer les manifestes
$# Create namespace if it doesn't exist$kubectl create namespace newrelic --dry-run=client -o yaml | kubectl apply -f -$
$# Apply JMX ConfigMap to the Kafka namespace$kubectl apply -f kafka-jmx-config.yaml$
$# Apply collector ConfigMap$kubectl apply -f collector-configmap.yaml$
$# Apply Deployment and Service$kubectl apply -f collector-deployment.yamlÉtape 4. Vérifier le déploiement
$# Check pod status$kubectl get pods -n newrelic -l app=otel-collector$
$# View logs to verify metrics are being received from broker pods$kubectl logs -n newrelic -l app=otel-collector --tail=50Configurer le StatefulSet Kafka pour l'agent Java
Maintenant que le collecteur est en cours d'exécution, patchez votre StatefulSet Kafka pour ajouter un conteneur d'initialisation qui télécharge le JAR de l'agent Java OpenTelemetry, puis attachez-le à la JVM du broker Kafka via KAFKA_OPTS.
Ajoutez les sections suivantes à votre manifeste Kafka StatefulSet existant :
spec: template: spec: # 1. Init container: downloads OTel Java agent JAR before Kafka starts initContainers: - name: download-otel-agent image: busybox:latest command: - sh - -c - | wget -O /otel-agent/opentelemetry-javaagent.jar \ https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar volumeMounts: - name: otel-agent mountPath: /otel-agent
containers: - name: kafka # TODO: Replace with your Kafka container name # 2. Attach OTel Java agent to the Kafka broker JVM env: - name: KAFKA_OPTS value: >- -javaagent:/otel-agent/opentelemetry-javaagent.jar -Dotel.jmx.enabled=true -Dotel.jmx.config=/jmx-config/kafka-jmx-config.yaml -Dotel.resource.attributes=kafka.cluster.name=my-kafka-cluster -Dotel.exporter.otlp.endpoint=http://otel-collector.newrelic.svc.cluster.local:4317 -Dotel.exporter.otlp.protocol=grpc -Dotel.metrics.exporter=otlp -Dotel.logs.exporter=otlp -Dotel.instrumentation.runtime-telemetry.enabled=false -Dotel.metric.export.interval=30000 volumeMounts: - name: otel-agent mountPath: /otel-agent - name: jmx-config mountPath: /jmx-config
# 3. Volumes: emptyDir for JAR, ConfigMap for JMX rules volumes: - name: otel-agent emptyDir: {} - name: jmx-config configMap: name: kafka-jmx-config # Deployed with the collector in the previous stepConseil
Le ConfigMap kafka-jmx-config a été déployé avec le collecteur à l'étape précédente. La valeur otel.exporter.otlp.endpoint http://otel-collector.newrelic.svc.cluster.local:4317 suppose que le collecteur est déployé dans l'espace de nommage newrelic avec le nom de service otel-collector. Mettez-le à jour pour qu'il corresponde au DNS réel de votre service de collecteur s'il est différent.
Appliquez votre StatefulSet mis à jour et attendez que les pod soient déployés :
$kubectl apply -f kafka-statefulset.yaml$kubectl rollout status statefulset/kafka -n kafka # TODO: Replace with your StatefulSet name and namespace(Facultatif) Instrumenter les applications productrices ou consommatrices
Important
Prise en charge des langages: actuellement, seules les applications Java sont prises en charge pour l'instrumentation des clients Kafka à l'aide de l'agent Java OpenTelemetry.
Pour collecter la télémétrie au niveau de l’application à partir de vos applications producteur et consommateur Kafka s’exécutant dans Kubernetes, ajoutez l’agent Java OpenTelemetry à ces pods d’application.
Ajoutez un conteneur d'initialisation et des variables d'environnement au déploiement de votre application :
apiVersion: apps/v1kind: Deploymentmetadata: name: kafka-producer-appspec: template: spec: initContainers: - name: download-otel-agent image: busybox:latest command: - sh - -c - wget -O /otel-agent/opentelemetry-javaagent.jar https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar volumeMounts: - name: otel-agent mountPath: /otel-agent
containers: - name: app image: your-kafka-app:latest env: - name: JAVA_TOOL_OPTIONS value: >- -javaagent:/otel-agent/opentelemetry-javaagent.jar -Dotel.service.name=order-process-service -Dotel.resource.attributes=kafka.cluster.name=my-kafka-cluster -Dotel.exporter.otlp.endpoint=http://otel-collector.newrelic.svc.cluster.local:4317 -Dotel.exporter.otlp.protocol=grpc -Dotel.metrics.exporter=otlp -Dotel.traces.exporter=otlp -Dotel.logs.exporter=otlp -Dotel.instrumentation.kafka.experimental-span-attributes=true -Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true -Dotel.instrumentation.kafka.producer-propagation.enabled=true -Dotel.instrumentation.kafka.enabled=true -Dotel.instrumentation.runtime-telemetry.enabled=false volumeMounts: - name: otel-agent mountPath: /otel-agent
volumes: - name: otel-agent emptyDir: {}Paramètres de configuration
Le tableau suivant décrit les principaux paramètres de configuration :
paramètres | Description |
|---|---|
| Remplacez par un nom unique pour votre application producteur ou consommateur |
| Remplacez par le même nom de cluster utilisé dans la configuration de votre broker |
| Remplacez par le nom DNS réel de votre service de collecteur (
) |
L’agent Java fournit l’instrumentation Kafka prête à l’emploi sans aucune modification de code, capturant la latence des requêtes, les métriques de débit, les taux d’erreur et les traces distribuées. Pour une configuration avancée, consultez la documentation d’instrumentation Kafka.
Suivez ces étapes pour configurer un monitoring complet de Kafka en installant l'exportateur JMX Prometheus sur vos pods de broker et en déployant un collecteur pour récupérer et envoyer des métriques à New Relic.
Avant de commencer
Assurez-vous d'avoir :
- Un compte New Relic avec un
- Cluster Kubernetes avec accès
kubectl - Kafka déployé en tant que StatefulSet avec un service headless (pour des noms DNS de pod stables)
- Capacité à modifier et redéployer le StatefulSet Kafka
Créer la ConfigMap des métriques JMX
Créez une ConfigMap contenant la configuration de l'exportateur JMX qui définit les métriques Kafka à collecter. Ce ConfigMap sera monté dans chaque pod de broker Kafka.
Enregistrer sous kafka-jmx-config.yaml. Appliquez-le à l'espace de nommage où Kafka est déployé :
apiVersion: v1kind: ConfigMapmetadata: name: kafka-jmx-metrics namespace: kafka # TODO: Replace with your Kafka namespacedata: kafka-metrics-config.yml: | startDelaySeconds: 0 lowercaseOutputName: true lowercaseOutputLabelNames: true
rules: # Cluster-level controller metrics - pattern: 'kafka.controller<type=KafkaController, name=GlobalTopicCount><>Value' name: kafka_cluster_topic_count type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=GlobalPartitionCount><>Value' name: kafka_cluster_partition_count type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=FencedBrokerCount><>Value' name: kafka_broker_fenced_count type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=PreferredReplicaImbalanceCount><>Value' name: kafka_partition_non_preferred_leader type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=OfflinePartitionsCount><>Value' name: kafka_partition_offline type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=ActiveControllerCount><>Value' name: kafka_controller_active_count type: GAUGE
# Broker-level replica metrics - pattern: 'kafka.server<type=ReplicaManager, name=UnderMinIsrPartitionCount><>Value' name: kafka_partition_under_min_isr type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=LeaderCount><>Value' name: kafka_broker_leader_count type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=PartitionCount><>Value' name: kafka_partition_count type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=UnderReplicatedPartitions><>Value' name: kafka_partition_under_replicated type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=IsrShrinksPerSec><>Count' name: kafka_isr_operation_count type: COUNTER labels: operation: "shrink"
- pattern: 'kafka.server<type=ReplicaManager, name=IsrExpandsPerSec><>Count' name: kafka_isr_operation_count type: COUNTER labels: operation: "expand"
- pattern: 'kafka.server<type=ReplicaFetcherManager, name=MaxLag, clientId=Replica><>Value' name: kafka_max_lag type: GAUGE
# Broker topic metrics (totals) - pattern: 'kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec><>Count' name: kafka_message_count type: COUNTER
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=TotalFetchRequestsPerSec><>Count' name: kafka_request_count type: COUNTER labels: type: "fetch"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=TotalProduceRequestsPerSec><>Count' name: kafka_request_count type: COUNTER labels: type: "produce"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=FailedFetchRequestsPerSec><>Count' name: kafka_request_failed type: COUNTER labels: type: "fetch"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=FailedProduceRequestsPerSec><>Count' name: kafka_request_failed type: COUNTER labels: type: "produce"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesInPerSec><>Count' name: kafka_network_io type: COUNTER labels: direction: "in"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesOutPerSec><>Count' name: kafka_network_io type: COUNTER labels: direction: "out"
# Per-topic metrics (only appear after traffic flows) - pattern: 'kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec, topic=(.+)><>Count' name: kafka_prod_msg_count type: COUNTER labels: topic: "$1"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesInPerSec, topic=(.+)><>Count' name: kafka_topic_io type: COUNTER labels: topic: "$1" direction: "in"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesOutPerSec, topic=(.+)><>Count' name: kafka_topic_io type: COUNTER labels: topic: "$1" direction: "out"
# Request metrics - pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>99thPercentile' name: kafka_request_time_99p type: GAUGE labels: type: "$1"
- pattern: 'kafka.network<type=RequestChannel, name=RequestQueueSize><>Value' name: kafka_request_queue type: GAUGE
- pattern: 'kafka.server<type=DelayedOperationPurgatory, name=PurgatorySize, delayedOperation=(.+)><>Value' name: kafka_purgatory_size type: GAUGE labels: type: "$1"
# Controller stats - pattern: 'kafka.controller<type=ControllerStats, name=LeaderElectionRateAndTimeMs><>Count' name: kafka_leader_election_rate type: COUNTER
- pattern: 'kafka.controller<type=ControllerStats, name=UncleanLeaderElectionsPerSec><>Count' name: kafka_unclean_election_rate type: COUNTER
# JVM Garbage Collection - pattern: 'java.lang<name=(.+), type=GarbageCollector><>CollectionCount' name: jvm_gc_collections_count type: COUNTER labels: name: "$1"
# JVM Memory - pattern: 'java.lang<type=Memory><HeapMemoryUsage>max' name: jvm_memory_heap_max type: GAUGE
- pattern: 'java.lang<type=Memory><HeapMemoryUsage>used' name: jvm_memory_heap_used type: GAUGE
# JVM Threading and System - pattern: 'java.lang<type=Threading><>ThreadCount' name: jvm_thread_count type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>SystemCpuLoad' name: jvm_system_cpu_utilization type: GAUGE
# Broker uptime - pattern: 'java.lang<type=Runtime><>Uptime' name: kafka_broker_uptime type: GAUGE
# Additional metrics — remove this section to reduce data ingest
# Request latency: total count, 50th percentile, and average (99p kept above) - pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>Count' name: kafka_request_time_total type: COUNTER labels: type: "$1"
- pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>50thPercentile' name: kafka_request_time_50p type: GAUGE labels: type: "$1"
- pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>Mean' name: kafka_request_time_avg type: GAUGE labels: type: "$1"
# Log flush metrics - pattern: 'kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>Count' name: kafka_logs_flush_count type: COUNTER
- pattern: 'kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>50thPercentile' name: kafka_logs_flush_time_50p type: GAUGE
- pattern: 'kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>99thPercentile' name: kafka_logs_flush_time_99p type: GAUGE
# JVM GC elapsed time - pattern: 'java.lang<name=(.+), type=GarbageCollector><>CollectionTime' name: jvm_gc_collections_elapsed type: COUNTER labels: name: "$1"
# JVM Memory heap committed - pattern: 'java.lang<type=Memory><HeapMemoryUsage>committed' name: jvm_memory_heap_committed type: GAUGE
# JVM class loading - pattern: 'java.lang<type=ClassLoading><>LoadedClassCount' name: jvm_class_count type: GAUGE
# Additional JVM OS metrics - pattern: 'java.lang<type=OperatingSystem><>SystemLoadAverage' name: jvm_system_cpu_load_1m type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>AvailableProcessors' name: jvm_cpu_count type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>ProcessCpuLoad' name: jvm_cpu_recent_utilization type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>OpenFileDescriptorCount' name: jvm_file_descriptor_count type: GAUGE
# JVM Memory Pool - pattern: 'java.lang<type=MemoryPool, name=(.+)><Usage>used' name: jvm_memory_pool_used type: GAUGE labels: name: "$1"
- pattern: 'java.lang<type=MemoryPool, name=(.+)><Usage>max' name: jvm_memory_pool_max type: GAUGE labels: name: "$1"
- pattern: 'java.lang<type=MemoryPool, name=(.+)><CollectionUsage>used' name: jvm_memory_pool_used_after_last_gc type: GAUGE labels: name: "$1"Conseil
Personnaliser les métriques: vous pouvez ajouter ou modifier des motifs en consultant les exemples Prometheus JMX Exporter et la documentation des MBeans Kafka.
Appliquer la ConfigMap :
$kubectl apply -f kafka-jmx-config.yamlConfigurer le StatefulSet Kafka pour l'exportateur JMX
Patchez votre StatefulSet Kafka pour ajouter un conteneur d'initialisation qui télécharge le JAR de l'exportateur JMX Prometheus, puis attachez-le à la JVM du broker Kafka via KAFKA_OPTS.
Étape 1. Ajoutez les sections suivantes à votre manifeste StatefulSet Kafka existant :
spec: template: spec: # 1. Init container: downloads JMX Exporter JAR before Kafka starts initContainers: - name: download-jmx-exporter image: busybox:latest command: - sh - -c - | # Version 1.5.0 is the minimum required version. Check https://github.com/prometheus/jmx_exporter/releases/latest for newer releases. JMX_EXPORTER_VERSION="1.5.0" wget -O /prometheus-jmx/jmx_prometheus_javaagent.jar \ "https://github.com/prometheus/jmx_exporter/releases/download/${JMX_EXPORTER_VERSION}/jmx_prometheus_javaagent-${JMX_EXPORTER_VERSION}.jar" volumeMounts: - name: prometheus-jmx mountPath: /prometheus-jmx
containers: - name: kafka # TODO: Replace with your Kafka container name # 2. Attach JMX Exporter as Java agent on port 9404 env: - name: KAFKA_OPTS value: "-javaagent:/prometheus-jmx/jmx_prometheus_javaagent.jar=9404:/jmx-config/kafka-metrics-config.yml" # 3. Expose port 9404 for Prometheus scraping ports: - name: jmx-metrics containerPort: 9404 protocol: TCP volumeMounts: - name: prometheus-jmx mountPath: /prometheus-jmx - name: jmx-config mountPath: /jmx-config
# 4. Volumes: emptyDir for JAR, ConfigMap for metrics config volumes: - name: prometheus-jmx emptyDir: {} - name: jmx-config configMap: name: kafka-jmx-metrics # Must match the ConfigMap name from Step 2Étape 2. Appliquez votre StatefulSet mis à jour et attendez que les pods soient redéployés :
$kubectl apply -f kafka-statefulset.yaml$kubectl rollout status statefulset/kafka -n kafka # TODO: Replace with your StatefulSet name and namespaceÉtape 3. Une fois le déploiement terminé, vérifiez que les métriques sont exposées sur chaque pod de broker :
$# Replace kafka-0 and kafka with your pod name and namespace$kubectl exec -n kafka kafka-0 -- curl -s http://localhost:9404/metrics | grep kafka_ | head -20Important
Clusters multi-brokers: le conteneur init et la configuration KAFKA_OPTS s'appliquent automatiquement à tous les pods du StatefulSet. Vérifiez que chaque pod de broker expose des métriques après le déploiement.
Déployer le Collecteur OpenTelemetry
Déployez le Collector OpenTelemetry dans votre cluster. Le collecteur récupère les données des pods de broker Kafka en utilisant des cibles DNS statiques et écoute sur le port 4317 les données OTLP provenant d'applications instrumentées.
La méthode d'installation Helm est l'approche recommandée pour déployer OpenTelemetry Collector dans Kubernetes.
Étape 1. Créer un secret d'identifiants New Relic
Conseil
Pour d'autres configurations de point de terminaison, consultez Configurer votre point de terminaison OTLP.
Étape 2. Créer values.yaml avec la configuration du collecteur
Les collecteurs NRDOT et OpenTelemetry utilisent une configuration identique. Choisissez votre image de collecteur préférée :
Pour des options de configuration avancées, reportez-vous aux pages de documentation de ces récepteurs :
Documentation du récepteur de métriques Kafka
Étape 3. Installer OpenTelemetry Collector avec Helm
bash$helm repo add open-telemetry https://open-telemetry.github.io/opentelemetry-helm-charts$helm upgrade kafka-monitoring open-telemetry/opentelemetry-collector \>--install \>--namespace newrelic \>--create-namespace \>-f values.yamlÉtape 4. Vérifier le déploiement :
bash$# Check pod status$kubectl get pods -n newrelic -l app.kubernetes.io/name=opentelemetry-collector$$# View logs to verify metrics collection$kubectl logs -n newrelic -l app.kubernetes.io/name=opentelemetry-collector --tail=50Vous devriez voir des logs indiquant un scraping réussi des pods de broker Kafka sur le port
9404.
La méthode d'installation par manifeste offre un contrôle direct sur les ressources Kubernetes sans utiliser Helm.
Étape 1. Créer un secret d'identifiants New Relic
Conseil
Pour d'autres configurations de point de terminaison, consultez Configurer votre point de terminaison OTLP.
Étape 2. Créer des fichiers manifestes
Les collecteurs NRDOT et OpenTelemetry utilisent une configuration identique. Seule l'image de conteneur diffère.
Pour des options de configuration avancées, reportez-vous aux pages de documentation de ces récepteurs :
Documentation du récepteur de métriques Kafka
Étape 3. Déployer les manifestes
bash$# Create namespace if it doesn't exist$kubectl create namespace newrelic --dry-run=client -o yaml | kubectl apply -f -$$# Apply ConfigMap$kubectl apply -f collector-configmap.yaml$$# Apply Deployment (includes ServiceAccount)$kubectl apply -f collector-deployment.yamlÉtape 4. Vérifier le déploiement :
bash$# Check pod status$kubectl get pods -n newrelic -l app=otel-collector$$# View logs to verify metrics collection$kubectl logs -n newrelic -l app=otel-collector --tail=50Vous devriez voir des logs indiquant un scraping réussi des pods de broker Kafka sur le port
9404.
(Facultatif) Instrumenter les applications productrices ou consommatrices
Important
Prise en charge des langages: les applications Java prennent en charge l’instrumentation client Kafka prête à l’emploi à l’aide de l’agent Java OpenTelemetry.
Pour collecter la télémétrie au niveau de l'application à partir de vos applications producteur et consommateur Kafka, utilisez l'agent Java OpenTelemetry avec un conteneur init :
apiVersion: apps/v1kind: Deploymentmetadata: name: kafka-producer-appspec: template: spec: initContainers: - name: download-java-agent image: busybox:latest command: - sh - -c - | wget -O /otel-auto-instrumentation/opentelemetry-javaagent.jar \ https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar volumeMounts: - name: otel-auto-instrumentation mountPath: /otel-auto-instrumentation
containers: - name: app image: your-kafka-app:latest env: - name: JAVA_TOOL_OPTIONS value: >- -javaagent:/otel-auto-instrumentation/opentelemetry-javaagent.jar -Dotel.service.name=my-kafka-app -Dotel.resource.attributes=kafka.cluster.name=my-kafka-cluster -Dotel.exporter.otlp.endpoint=http://otel-collector.newrelic.svc.cluster.local:4317 -Dotel.exporter.otlp.protocol=grpc -Dotel.metrics.exporter=otlp -Dotel.traces.exporter=otlp -Dotel.logs.exporter=otlp -Dotel.instrumentation.kafka.experimental-span-attributes=true -Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true -Dotel.instrumentation.kafka.producer-propagation.enabled=true -Dotel.instrumentation.kafka.enabled=true -Dotel.instrumentation.runtime-telemetry.enabled=false volumeMounts: - name: otel-auto-instrumentation mountPath: /otel-auto-instrumentation
volumes: - name: otel-auto-instrumentation emptyDir: {}Paramètres de configuration
Le tableau suivant décrit les principaux paramètres de configuration :
| paramètres | Description |
|---|---|
service.name | Remplacez my-kafka-app par un nom unique pour votre application producteur ou consommateur |
kafka.cluster.name | Remplacez my-kafka-cluster par le même nom de cluster utilisé dans la configuration de votre collecteur. |
otlp.endpoint | Le point de terminaison http://otel-collector.newrelic.svc.cluster.local:4317 suppose que le collecteur est déployé dans l'espace de nommage newrelic en tant que otel-collector |
L’agent Java fournit l’instrumentation Kafka prête à l’emploi sans aucune modification de code, capturant la latence des requêtes, les métriques de débit, les taux d’erreur et les traces distribuées. Pour une configuration avancée, consultez la documentation d’instrumentation Kafka.
(Facultatif) Transmettre les logs du broker Kafka
Pour collecter les logs des brokers Kafka et les envoyer à New Relic, ajoutez un récepteur filelog à la configuration de votre collecteur.
Trouvez vos données
Après quelques minutes, vos données Kafka devraient apparaître dans New Relic. Consultez Trouver vos données pour obtenir des instructions détaillées sur l’exploration de vos données Kafka dans différentes vues de l’UI de New Relic.
Le tableau suivant résume où chaque type de signal est stocké. Remplacez my-kafka-cluster par votre valeur KAFKA_CLUSTER_NAME dans toutes les requêtes ci-dessous :
| Signal | Type d'événement | Ce qui est inclus |
|---|---|---|
| Métriques | Metric | Métriques de broker, de topic, de partition, de groupe de consommateurs, et de la JVM |
| Logs | Log | Logs des applications de producteur et de consommateur (via l'agent Java OTel) et logs de broker collectés via l'agent Java |
| Traces | Span | Spans de producteur et de consommateur, y compris les opérations publish et receive par message à travers les topics |
Métriques
Les métriques de broker, de topic, de partition, de groupe de consommateurs et de la JVM sont stockées dans le type d'événement Metric :
FROM Metric SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster' SINCE 30 minutes agoLogs
Les logs des applications productrices et consommatrices instrumentées avec l'agent Java OpenTelemetry, et les logs de broker collectés via l'agent Java sur le broker, sont stockés dans le type d'événement Log :
FROM Log SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster' SINCE 30 minutes agoTraces
Les spans de producteur et de consommateur, y compris les opérations publish et receive par message sur les topics, sont stockés dans le type d'événement Span :
FROM Span SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster' SINCE 30 minutes agoExemple
Un exemple fonctionnel complet avec les manifestes Kafka StatefulSet, les valeurs Helm, la configuration du Collecteur Otel, et des exemples d'applications producteur/consommateur est disponible dans le référentiel New Relic OpenTelemetry Examples.
Dépannage
Prochaines étapes
- Explorer les métriques Kafka - Consulter la référence complète des métriques
- Créer des dashboards personnalisés - Créez des visualisations pour vos données Kafka
- Configurer des alertes - Monitorer les métriques critiques telles que le retard du consommateur et les partitions sous-répliquées
Ressources connexes
- Kafka auto-hébergé - monitoring Kafka pour les environnements auto-hébergés (non-Kubernetes)
- Kubernetes Strimzi - monitoring de Kafka pour Kafka géré par Strimzi sur Kubernetes
- Agent Java OpenTelemetry - Documentation officielle de l'agent Java OTel
- Exportateur JMX Prometheus – agent Java qui expose les métriques JMX au format Prometheus
- Récepteur Prometheus – récepteur du Collecteur Otel pour scraper les points de terminaison de métriques Prometheus
- Récepteur kafkametrics - Documentation du récepteur de décalage du consommateur et de métriques de topic