• /
  • EnglishEspañolFrançais日本語한국어Português
  • Se connecterDémarrer

Cette traduction automatique est fournie pour votre commodité.

En cas d'incohérence entre la version anglaise et la version traduite, la version anglaise prévaudra. Veuillez visiter cette page pour plus d'informations.

Créer un problème

Monitorer Kafka auto-hébergé avec OpenTelemetry

Monitorez votre cluster Apache Kafka auto-hébergé en installant le Collector OpenTelemetry directement sur les hôtes Linux. Choisissez entre l'approche de l'agent Java OpenTelemetry et celle de l'exportateur JMX Prometheus pour collecter les métriques JMX de vos brokers Kafka.

Architecture

New Relic prend en charge deux approches pour le monitoring de Kafka auto-hébergé : l’agent Java OpenTelemetry ou l’exportateur JMX Prometheus. Le diagramme suivant illustre le flux de données pour chaque approche.

Self-hosted Kafka monitoring architecture

Étapes d'installation

Suivez ces étapes pour configurer un monitoring complet de Kafka en installant l'agent Java OpenTelemetry sur vos brokers et en déployant un collecteur pour récupérer et envoyer des métriques et des logs à New Relic.

Avant de commencer

Assurez-vous d'avoir :

  • Un compte New Relic avec un
  • Accès réseau du collecteur au port du serveur bootstrap Kafka (généralement 9092)

Créer une configuration de collecteur

Créez la configuration principale du Collector OpenTelemetry à ~/opentelemetry/collector-kafka-config.yaml sur un hôte de monitoring.

receivers:
# OTLP receiver for Kafka and JMX metrics from Java agents and application telemetry
otlp:
protocols:
grpc:
endpoint: "0.0.0.0:4317"
# Kafka metrics receiver for cluster-level metrics
kafkametrics:
brokers: ${env:KAFKA_BOOTSTRAP_BROKER_ADDRESSES}
protocol_version: 2.0.0
scrapers:
- brokers
- topics
- consumers
collection_interval: 30s
# Exclude internal Kafka topics (prefixed with __) at the source
topic_match: "^[^_].*$"
metrics:
kafka.topic.min_insync_replicas:
enabled: true
kafka.topic.replication_factor:
enabled: true
kafka.partition.replicas:
enabled: false
kafka.partition.oldest_offset:
enabled: false
kafka.partition.current_offset:
enabled: false
processors:
batch/aggregation:
send_batch_size: 1024
timeout: 30s
resourcedetection:
detectors: [env, ec2, system]
system:
resource_attributes:
host.name:
enabled: true
host.id:
enabled: true
resource:
attributes:
- action: insert
key: kafka.cluster.name
value: ${env:KAFKA_CLUSTER_NAME}
transform/remove_broker_id:
metric_statements:
# Remove broker.id for cluster-level metrics — these represent the whole cluster,
# not a specific broker. broker.id is retained on broker-level metrics pipelines.
- context: resource
statements:
- delete_key(attributes, "broker.id")
transform/remove_extra_attributes:
metric_statements:
- context: resource
statements:
# Delete all attributes starting with "process."
- delete_matching_keys(attributes, "^process\\..*")
# Delete all attributes starting with "telemetry."
- delete_matching_keys(attributes, "^telemetry\\..*")
- delete_key(attributes, "host.arch")
- delete_key(attributes, "os.description")
- delete_key(attributes, "host.image.id")
- delete_key(attributes, "host.type")
- delete_matching_keys(attributes, "^cloud\\..*")
- delete_key(attributes, "service.instance.id") where IsMatch(attributes["service.name"], "^unknown_service:")
- delete_key(attributes, "service.name") where IsMatch(attributes["service.name"], "^unknown_service:")
# Filter internal Kafka topics as a safety net (kafkametrics topic_match handles the receiver side)
filter/internal_topics:
metrics:
datapoint:
- 'attributes["topic"] != nil and IsMatch(attributes["topic"], "^__.*")'
filter/include_cluster_metrics:
metrics:
include:
match_type: regexp
metric_names:
- "kafka\\.partition\\.offline"
- "kafka\\.(leader|unclean)\\.election\\.rate"
- "kafka\\.partition\\.non_preferred_leader"
- "kafka\\.broker\\.fenced\\.count"
- "kafka\\.cluster\\.partition\\.count"
- "kafka\\.cluster\\.topic\\.count"
filter/exclude_cluster_metrics:
metrics:
exclude:
match_type: regexp
metric_names:
- "kafka\\.partition\\.offline"
- "kafka\\.(leader|unclean)\\.election\\.rate"
- "kafka\\.partition\\.non_preferred_leader"
- "kafka\\.broker\\.fenced\\.count"
- "kafka\\.cluster\\.partition\\.count"
- "kafka\\.cluster\\.topic\\.count"
transform/des_units:
metric_statements:
- context: metric
statements:
- set(description, "") where description != ""
- set(unit, "") where unit != ""
cumulativetodelta:
metricstransform/kafka_topic_sum_aggregation:
transforms:
- include: kafka.partition.replicas_in_sync
action: insert
new_name: kafka.partition.replicas_in_sync.total
operations:
- action: aggregate_labels
label_set: [topic]
aggregation_type: sum
- include: kafka.partition.replicas
action: insert
new_name: kafka.partition.replicas.total
operations:
- action: aggregate_labels
label_set: [topic]
aggregation_type: sum
filter/remove_partition_level_replicas:
metrics:
exclude:
match_type: strict
metric_names:
- kafka.partition.replicas_in_sync
groupbyattrs/cluster:
keys: [kafka.cluster.name]
metricstransform/cluster_max:
transforms:
- include: "kafka\\.partition\\.offline|kafka\\.leader\\.election\\.rate|kafka\\.unclean\\.election\\.rate|kafka\\.partition\\.non_preferred_leader|kafka\\.broker\\.fenced\\.count|kafka\\.cluster\\.partition\\.count|kafka\\.cluster\\.topic\\.count"
match_type: regexp
action: update
operations:
- action: aggregate_labels
aggregation_type: max
label_set: []
exporters:
otlp/newrelic:
endpoint: ${env:NEW_RELIC_OTLP_ENDPOINT}
headers:
api-key: ${env:NEW_RELIC_LICENSE_KEY}
compression: gzip
timeout: 30s
service:
pipelines:
# Broker metrics pipeline (excludes cluster-level metrics)
metrics/broker:
receivers: [otlp, kafkametrics]
processors:
- resourcedetection
- resource
- filter/exclude_cluster_metrics
- filter/internal_topics
- transform/remove_extra_attributes
- transform/des_units
- cumulativetodelta
- metricstransform/kafka_topic_sum_aggregation
- filter/remove_partition_level_replicas
- batch/aggregation
exporters: [otlp/newrelic]
# Cluster metrics pipeline (controller-emitted metrics like offline partitions, topic/partition counts — no broker.id)
metrics/cluster:
receivers: [otlp]
processors:
- resourcedetection
- resource
- filter/include_cluster_metrics
- transform/remove_broker_id
- transform/remove_extra_attributes
- transform/des_units
- cumulativetodelta
- groupbyattrs/cluster
- metricstransform/cluster_max
- batch/aggregation
exporters: [otlp/newrelic]
# APM traces pipeline (producer + consumer spans via OTel Java agent)
traces/apps:
receivers: [otlp]
processors: [resourcedetection, resource, batch/aggregation]
exporters: [otlp/newrelic]
# APM logs pipeline (producer + consumer logs via OTel Java agent)
logs/apps:
receivers: [otlp]
processors: [resourcedetection, resource, batch/aggregation]
exporters: [otlp/newrelic]

Définir les variables d'environnement

Définissez les variables d'environnement requises sur l'hôte de monitoring avant d'installer le collecteur :

bash
$
export NEW_RELIC_LICENSE_KEY="YOUR_LICENSE_KEY"
$
export KAFKA_CLUSTER_NAME="my-kafka-cluster"
$
export KAFKA_BOOTSTRAP_BROKER_ADDRESSES="broker1-host:9092,broker2-host:9092,broker3-host:9092"
$
export NEW_RELIC_OTLP_ENDPOINT="https://otlp.nr-data.net:4317" # US region

Paramètres de configuration

Le tableau suivant décrit les principaux paramètres de configuration :

VariableDescription
NEW_RELIC_LICENSE_KEYVotre clé de licence New Relic, par exemple YOUR_LICENSE_KEY
KAFKA_CLUSTER_NAMEUn nom unique pour votre cluster Kafka, par exemple my-kafka-cluster
KAFKA_BOOTSTRAP_BROKER_ADDRESSESVos adresses de broker bootstrap Kafka, par exemple broker1-host:9092,broker2-host:9092,broker3-host:9092
NEW_RELIC_OTLP_ENDPOINTPoint de terminaison d'ingestion OTLP. Utilisez https://otlp.nr-data.net:4317 pour la région US ou https://otlp.eu01.nr-data.net:4317 pour la région EU. Pour d’autres configurations, consultez Configurer votre point de terminaison OTLP.

Installer et démarrer le collecteur

Installez et exécutez le collecteur sur l'hôte de monitoring. Choisissez entre NRDOT Collector (la distribution de New Relic) ou OpenTelemetry Collector :

Conseil

NRDOT Collector est la distribution New Relic de l'OpenTelemetry Collector avec le support de New Relic pour l'assistance.

Étape 1. Télécharger et installer le binaire

Téléchargez et installez le binaire NRDOT Collector pour votre système d’exploitation hôte. L'exemple ci-dessous concerne l'architecture linux_amd64 :

bash
$
# Set version and architecture
$
NRDOT_VERSION="1.9.0"
$
ARCH="amd64" # or arm64
$
$
# Download and extract
$
curl "https://github.com/newrelic/nrdot-collector-releases/releases/download/${NRDOT_VERSION}/nrdot-collector_${NRDOT_VERSION}_linux_${ARCH}.tar.gz" \
>
--location --output collector.tar.gz
$
tar -xzf collector.tar.gz
$
$
# Move to a location in PATH (optional)
$
sudo mv nrdot-collector /usr/local/bin/
$
$
# Verify installation
$
nrdot-collector --version

Important

Pour les autres systèmes d'exploitation et architectures, consultez les versions de NRDOT Collector et téléchargez le binaire approprié pour votre système.

Étape 2. Démarrer le collecteur

Lancez le collecteur avec votre fichier de configuration pour commencer le monitoring :

bash
$
nrdot-collector --config ~/opentelemetry/collector-kafka-config.yaml

Le collecteur est maintenant en cours d'exécution et prêt à recevoir des données. Terminez les étapes restantes pour attacher l'agent Java à vos brokers Kafka avant que les métriques n'apparaissent dans New Relic.

Étape 1. Télécharger et installer le binaire

Téléchargez et installez le binaire OpenTelemetry Collector Contrib pour votre système d’exploitation hôte. L'exemple ci-dessous concerne l'architecture linux_amd64 :

bash
$
# Set version and architecture
$
# Check https://github.com/open-telemetry/opentelemetry-collector-releases/releases/latest for the latest version
$
OTEL_VERSION="<collector_version>"
$
ARCH="amd64"
$
$
# Download the collector
$
curl -L -o otelcol-contrib.tar.gz \
>
"https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v${OTEL_VERSION}/otelcol-contrib_${OTEL_VERSION}_linux_${ARCH}.tar.gz"
$
$
# Extract the binary
$
tar -xzf otelcol-contrib.tar.gz
$
$
# Move to a location in PATH (optional)
$
sudo mv otelcol-contrib /usr/local/bin/
$
$
# Verify installation
$
otelcol-contrib --version

Pour les autres systèmes d'exploitation, consultez la page des versions du Collecteur OpenTelemetry.

Étape 2. Démarrer le collecteur

Lancez le collecteur avec votre fichier de configuration pour commencer le monitoring :

bash
$
otelcol-contrib --config ~/opentelemetry/collector-kafka-config.yaml

Le collecteur est maintenant en cours d'exécution et prêt à recevoir des données. Terminez les étapes restantes pour attacher l'agent Java à vos brokers Kafka avant que les métriques n'apparaissent dans New Relic.

Télécharger l'agent Java OpenTelemetry

Important

Assurez-vous que votre Collector OpenTelemetry est en cours d'exécution avant de (re)démarrer les brokers Kafka avec l'agent Java attaché. L'agent commence à envoyer des métriques immédiatement au démarrage du broker, le collecteur doit donc être disponible pour les recevoir.

L’ agent Java OpenTelemetry s’exécute en tant qu’agent Java attaché à vos brokers Kafka, collectant les métriques Kafka et JMX et les envoyant via OTLP au collecteur :

bash
$
# Create directory for OpenTelemetry components
$
mkdir -p ~/opentelemetry
$
$
# Download OpenTelemetry Java agent
$
curl -L -o ~/opentelemetry/opentelemetry-javaagent.jar \
>
https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar

Créer une configuration JMX personnalisée

Créez un fichier de configuration JMX pour l'agent Java OpenTelemetry afin de collecter les métriques Kafka à partir des MBeans JMX.

Créez le fichier ~/opentelemetry/kafka-jmx-config.yaml sur chaque hôte de broker avec la configuration suivante :

---
rules:
# Per-topic custom metrics using custom MBean commands
- bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=*
metricAttribute:
topic: param(topic)
mapping:
Count:
metric: kafka.prod.msg.count
type: counter
desc: The number of messages per topic
unit: "{message}"
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=*
metricAttribute:
topic: param(topic)
direction: const(in)
mapping:
Count:
metric: kafka.topic.io
type: counter
desc: The bytes received or sent per topic
unit: By
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=*
metricAttribute:
topic: param(topic)
direction: const(out)
mapping:
Count:
metric: kafka.topic.io
type: counter
desc: The bytes received or sent per topic
unit: By
# Cluster-level metrics using controller-based MBeans
- bean: kafka.controller:type=KafkaController,name=GlobalTopicCount
mapping:
Value:
metric: kafka.cluster.topic.count
type: gauge
desc: The total number of global topics in the cluster
unit: "{topic}"
- bean: kafka.controller:type=KafkaController,name=GlobalPartitionCount
mapping:
Value:
metric: kafka.cluster.partition.count
type: gauge
desc: The total number of global partitions in the cluster
unit: "{partition}"
- bean: kafka.controller:type=KafkaController,name=FencedBrokerCount
mapping:
Value:
metric: kafka.broker.fenced.count
type: gauge
desc: The number of fenced brokers in the cluster
unit: "{broker}"
- bean: kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount
mapping:
Value:
metric: kafka.partition.non_preferred_leader
type: gauge
desc: The count of topic partitions for which the leader is not the preferred leader
unit: "{partition}"
# Broker-level metrics using ReplicaManager MBeans
- bean: kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount
mapping:
Value:
metric: kafka.partition.under_min_isr
type: gauge
desc: The number of partitions where the number of in-sync replicas is less than the minimum
unit: "{partition}"
# Broker uptime metric using JVM Runtime
- bean: java.lang:type=Runtime
mapping:
Uptime:
metric: kafka.broker.uptime
type: gauge
desc: Broker uptime in milliseconds
unit: ms
# Leader count per broker
- bean: kafka.server:type=ReplicaManager,name=LeaderCount
mapping:
Value:
metric: kafka.broker.leader.count
type: gauge
desc: Number of partitions for which this broker is the leader
unit: "{partition}"
# JVM metrics
- bean: java.lang:type=GarbageCollector,name=*
mapping:
CollectionCount:
metric: jvm.gc.collections.count
type: counter
unit: "{collection}"
desc: total number of collections that have occurred
metricAttribute:
name: param(name)
- bean: java.lang:type=Memory
unit: By
prefix: jvm.memory.
dropNegativeValues: true
mapping:
HeapMemoryUsage.max:
metric: heap.max
desc: current heap usage
type: gauge
HeapMemoryUsage.used:
metric: heap.used
desc: current heap usage
type: gauge
- bean: java.lang:type=Threading
mapping:
ThreadCount:
metric: jvm.thread.count
type: gauge
unit: "{thread}"
desc: Total thread count (Kafka typical range 100-300 threads)
- bean: java.lang:type=OperatingSystem
prefix: jvm.
dropNegativeValues: true
mapping:
SystemCpuLoad:
metric: system.cpu.utilization
type: gauge
unit: '1'
desc: Recent CPU utilization for whole system (0.0 to 1.0)
- bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec
mapping:
Count:
metric: kafka.message.count
type: counter
desc: The number of messages received by the broker
unit: "{message}"
- bean: kafka.server:type=BrokerTopicMetrics,name=TotalFetchRequestsPerSec
metricAttribute:
type: const(fetch)
mapping:
Count:
metric: &metric kafka.request.count
type: &type counter
desc: &desc The number of requests received by the broker
unit: &unit "{request}"
- bean: kafka.server:type=BrokerTopicMetrics,name=TotalProduceRequestsPerSec
metricAttribute:
type: const(produce)
mapping:
Count:
metric: *metric
type: *type
desc: *desc
unit: *unit
- bean: kafka.server:type=BrokerTopicMetrics,name=FailedFetchRequestsPerSec
metricAttribute:
type: const(fetch)
mapping:
Count:
metric: &metric kafka.request.failed
type: &type counter
desc: &desc The number of requests to the broker resulting in a failure
unit: &unit "{request}"
- bean: kafka.server:type=BrokerTopicMetrics,name=FailedProduceRequestsPerSec
metricAttribute:
type: const(produce)
mapping:
Count:
metric: *metric
type: *type
desc: *desc
unit: *unit
- beans:
- kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce
- kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer
- kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower
metricAttribute:
type: param(request)
unit: ms
mapping:
99thPercentile:
metric: kafka.request.time.99p
type: gauge
desc: The 99th percentile time the broker has taken to service requests
- bean: kafka.network:type=RequestChannel,name=RequestQueueSize
mapping:
Value:
metric: kafka.request.queue
type: gauge
desc: Size of the request queue
unit: "{request}"
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
metricAttribute:
direction: const(in)
mapping:
Count:
metric: &metric kafka.network.io
type: &type counter
desc: &desc The bytes received or sent by the broker
unit: &unit By
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec
metricAttribute:
direction: const(out)
mapping:
Count:
metric: *metric
type: *type
desc: *desc
unit: *unit
- beans:
- kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Produce
- kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Fetch
metricAttribute:
type: param(delayedOperation)
mapping:
Value:
metric: kafka.purgatory.size
type: gauge
desc: The number of requests waiting in purgatory
unit: "{request}"
- bean: kafka.server:type=ReplicaManager,name=PartitionCount
mapping:
Value:
metric: kafka.partition.count
type: gauge
desc: The number of partitions on the broker
unit: "{partition}"
- bean: kafka.controller:type=KafkaController,name=OfflinePartitionsCount
mapping:
Value:
metric: kafka.partition.offline
type: gauge
desc: The number of partitions offline
unit: "{partition}"
- bean: kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions
mapping:
Value:
metric: kafka.partition.under_replicated
type: gauge
desc: The number of under replicated partitions
unit: "{partition}"
- bean: kafka.server:type=ReplicaManager,name=IsrShrinksPerSec
metricAttribute:
operation: const(shrink)
mapping:
Count:
metric: kafka.isr.operation.count
type: counter
desc: The number of in-sync replica shrink and expand operations
unit: "{operation}"
- bean: kafka.server:type=ReplicaManager,name=IsrExpandsPerSec
metricAttribute:
operation: const(expand)
mapping:
Count:
metric: kafka.isr.operation.count
type: counter
desc: The number of in-sync replica shrink and expand operations
unit: "{operation}"
- bean: kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica
mapping:
Value:
metric: kafka.max.lag
type: gauge
desc: The max lag in messages between follower and leader replicas
unit: "{message}"
- bean: kafka.controller:type=KafkaController,name=ActiveControllerCount
mapping:
Value:
metric: kafka.controller.active.count
type: gauge
desc: For KRaft mode, the number of active controllers in the cluster. For ZooKeeper, indicates whether the broker is the controller broker.
unit: "{controller}"
- bean: kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs
mapping:
Count:
metric: kafka.leader.election.rate
type: counter
desc: The leader election count
unit: "{election}"
- bean: kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec
mapping:
Count:
metric: kafka.unclean.election.rate
type: counter
desc: Unclean leader election count - increasing indicates broker failures
unit: "{election}"
# ── Additional metrics — remove this section to reduce data ingest ───────────
# Request latency: total count, 50th percentile, and average (99p kept above)
- beans:
- kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce
- kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer
- kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower
metricAttribute:
type: param(request)
unit: ms
mapping:
Count:
metric: kafka.request.time.total
type: counter
desc: The total time the broker has taken to service requests
50thPercentile:
metric: kafka.request.time.50p
type: gauge
desc: The 50th percentile time the broker has taken to service requests
Mean:
metric: kafka.request.time.avg
type: gauge
desc: The average time the broker has taken to service requests
- bean: kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs
unit: ms
type: gauge
prefix: kafka.logs.flush.
mapping:
Count:
metric: count
unit: '{flush}'
type: counter
desc: Log flush count
50thPercentile:
metric: time.50p
desc: Log flush time - 50th percentile
99thPercentile:
metric: time.99p
desc: Log flush time - 99th percentile
# GC elapsed time (cumulative collection time in ms)
- bean: java.lang:type=GarbageCollector,name=*
mapping:
CollectionTime:
metric: jvm.gc.collections.elapsed
type: counter
unit: ms
desc: the approximate accumulated collection elapsed time in milliseconds
metricAttribute:
name: param(name)
# JVM class loading
- bean: java.lang:type=ClassLoading
mapping:
LoadedClassCount:
metric: jvm.class.count
type: gauge
unit: "{class}"
desc: Currently loaded class count
# JVM heap committed (in addition to heap.used and heap.max)
- bean: java.lang:type=Memory
unit: By
prefix: jvm.memory.
dropNegativeValues: true
mapping:
HeapMemoryUsage.committed:
metric: heap.committed
desc: Committed heap memory
type: gauge
# Additional JVM CPU and system metrics
- bean: java.lang:type=OperatingSystem
prefix: jvm.
dropNegativeValues: true
mapping:
SystemLoadAverage:
metric: system.cpu.load_1m
type: gauge
unit: "{run_queue_item}"
desc: System load average (1 minute) - alert if > CPU count
AvailableProcessors:
metric: cpu.count
type: gauge
unit: "{cpu}"
desc: Number of processors available
ProcessCpuLoad:
metric: cpu.recent_utilization
type: gauge
unit: '1'
desc: Recent CPU utilization for JVM process (0.0 to 1.0)
OpenFileDescriptorCount:
metric: file_descriptor.count
type: gauge
unit: "{file_descriptor}"
desc: Number of open file descriptors - alert if > 80% of ulimit
# JVM memory pool breakdown (by generation: G1 Old Gen, Eden, Survivor, etc.)
- bean: java.lang:type=MemoryPool,name=*
type: gauge
unit: By
metricAttribute:
name: param(name)
mapping:
Usage.used:
metric: jvm.memory.pool.used
desc: Memory pool usage by generation (G1 Old Gen, Eden, Survivor)
Usage.max:
metric: jvm.memory.pool.max
desc: Maximum memory pool size
CollectionUsage.used:
metric: jvm.memory.pool.used_after_last_gc
desc: Memory used after last GC (shows retained memory baseline)

Configurer le broker Kafka

Attachez l'agent Java OpenTelemetry à votre broker Kafka en définissant la variable d'environnement KAFKA_OPTS avant de démarrer Kafka.

Exemple de broker unique:

bash
$
OTEL_AGENT="$HOME/opentelemetry/opentelemetry-javaagent.jar"
$
JMX_CONFIG="$HOME/opentelemetry/kafka-jmx-config.yaml"
$
$
nohup env KAFKA_OPTS="-javaagent:$OTEL_AGENT \
>
-Dotel.jmx.enabled=true \
>
-Dotel.jmx.config=$JMX_CONFIG \
>
-Dotel.resource.attributes=broker.id=1,kafka.cluster.name=my-kafka-cluster \
>
-Dotel.exporter.otlp.endpoint=http://collector-host-ip:4317 \
>
-Dotel.exporter.otlp.protocol=grpc \
>
-Dotel.metrics.exporter=otlp \
>
-Dotel.logs.exporter=otlp \
>
-Dotel.instrumentation.runtime-telemetry.enabled=false \
>
-Dotel.metric.export.interval=30000" \
>
bin/kafka-server-start.sh config/server.properties &

Important

Clusters multi-brokers: Pour plusieurs brokers, utilisez la même configuration avec des valeurs broker.id uniques (par ex. broker.id=1, broker.id=2, broker.id=3) dans le paramètre -Dotel.resource.attributes pour chaque broker.

Conseil

Les logs du broker sont automatiquement activés avec l'indicateur -Dotel.logs.exporter=otlp ci-dessus. Pour désactiver la collecte des logs du broker, définissez plutôt -Dotel.logs.exporter=none.

Paramètres de configuration

Le tableau suivant décrit les principaux paramètres de configuration :

paramètresDescription
otlp.endpointRemplacez par l'adresse IP ou le nom d’hôte de l'hôte exécutant votre OpenTelemetry Collector, par exemple http://collector-host-ip:4317
broker.idRemplacez 1 par l'ID de broker unique pour chaque broker, par exemple broker.id=1, broker.id=2, broker.id=3
kafka.cluster.nameRemplacez my-kafka-cluster par le nom de votre cluster Kafka. Doit correspondre à la valeur définie dans la configuration du collecteur.
logs.exporterActive la collecte de log du broker lorsque défini sur otlp. Définissez sur none pour désactiver le transfert de logs du broker.

Pour toutes les options de configuration, consultez le guide de configuration de l’agent Java.

(Facultatif) Instrumenter les applications productrices ou consommatrices

Important

Prise en charge des langages: actuellement, seules les applications Java sont prises en charge pour l'instrumentation des clients Kafka à l'aide de l'agent Java OpenTelemetry.

Pour collecter la télémétrie applicative de vos applications productrices et consommatrices Kafka, téléchargez l’agent Java OpenTelemetry à partir de l’étape Télécharger l’agent Java OpenTelemetry ci-dessus.

Démarrez votre application avec l'agent :

bash
$
OTEL_AGENT="$HOME/opentelemetry/opentelemetry-javaagent.jar"
$
$
java \
>
-javaagent:$OTEL_AGENT \
>
-Dotel.service.name="order-process-service" \
>
-Dotel.resource.attributes="kafka.cluster.name=my-kafka-cluster" \
>
-Dotel.exporter.otlp.endpoint=http://collector-host-ip:4317 \
>
-Dotel.exporter.otlp.protocol="grpc" \
>
-Dotel.metrics.exporter="otlp" \
>
-Dotel.traces.exporter="otlp" \
>
-Dotel.logs.exporter="otlp" \
>
-Dotel.instrumentation.kafka.experimental-span-attributes="true" \
>
-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled="true" \
>
-Dotel.instrumentation.kafka.producer-propagation.enabled="true" \
>
-Dotel.instrumentation.kafka.enabled="true" \
>
-Dotel.instrumentation.runtime-telemetry.enabled="false" \
>
-jar your-kafka-application.jar

Paramètres de configuration

Le tableau suivant décrit les principaux paramètres de configuration :

paramètresDescription
service.nameRemplacez par un nom unique pour votre application producteur ou consommateur, par exemple order-process-service
kafka.cluster.nameRemplacez par le même nom de cluster utilisé dans la configuration de votre collecteur, par exemple my-kafka-cluster
otlp.endpointRemplacez par le nom d’hôte ou l'adresse IP de l'hôte exécutant votre OpenTelemetry Collector, par exemple http://collector-host-ip:4317

Conseil

La configuration ci-dessus envoie la télémétrie à un Collector OpenTelemetry s'exécutant sur collector-host-ip:4317. Si vous souhaitez un collecteur distinct dédié à la télémétrie d'application, créez-en un avec la configuration suivante :

L’agent Java fournit l’instrumentation Kafka prête à l’emploi sans aucune modification de code, capturant la latence des requêtes, les métriques de débit, les taux d’erreur et les traces distribuées.

Pour une configuration avancée, consultez la documentation d'instrumentation Kafka.

Suivez ces étapes pour configurer un monitoring complet de Kafka en installant l'exportateur JMX Prometheus sur vos brokers et en déployant un collecteur pour récupérer et envoyer des métriques à New Relic.

Avant de commencer

Assurez-vous d'avoir :

  • Un compte New Relic avec un
  • Accès réseau de l'hôte du collecteur vers chaque broker sur le port 9404
  • Accès réseau du collecteur au port bootstrap Kafka (généralement 9092)

Télécharger l'exportateur Prometheus JMX

Téléchargez le JAR de l'exportateur JMX Prometheus sur chaque hôte de broker Kafka :

bash
$
# Create directory for Prometheus components
$
mkdir -p ~/opentelemetry
$
$
# Download the Prometheus JMX Exporter agent JAR
$
# Version 1.5.0 is the minimum required version. Check https://github.com/prometheus/jmx_exporter/releases/latest for newer releases.
$
JMX_EXPORTER_VERSION="1.5.0"
$
curl -L -o ~/opentelemetry/jmx_prometheus_javaagent.jar \
>
"https://github.com/prometheus/jmx_exporter/releases/download/${JMX_EXPORTER_VERSION}/jmx_prometheus_javaagent-${JMX_EXPORTER_VERSION}.jar"

Créer la configuration des métriques JMX

Créez le fichier de configuration de l'exportateur JMX qui définit les métriques Kafka à collecter. Enregistrez sous ~/opentelemetry/kafka-jmx-config.yaml sur chaque hôte de broker :

startDelaySeconds: 0
lowercaseOutputName: true
lowercaseOutputLabelNames: true
rules:
# Cluster-level controller metrics
- pattern: 'kafka.controller<type=KafkaController, name=GlobalTopicCount><>Value'
name: kafka_cluster_topic_count
type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=GlobalPartitionCount><>Value'
name: kafka_cluster_partition_count
type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=FencedBrokerCount><>Value'
name: kafka_broker_fenced_count
type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=PreferredReplicaImbalanceCount><>Value'
name: kafka_partition_non_preferred_leader
type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=OfflinePartitionsCount><>Value'
name: kafka_partition_offline
type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=ActiveControllerCount><>Value'
name: kafka_controller_active_count
type: GAUGE
# Broker-level replica metrics
- pattern: 'kafka.server<type=ReplicaManager, name=UnderMinIsrPartitionCount><>Value'
name: kafka_partition_under_min_isr
type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=LeaderCount><>Value'
name: kafka_broker_leader_count
type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=PartitionCount><>Value'
name: kafka_partition_count
type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=UnderReplicatedPartitions><>Value'
name: kafka_partition_under_replicated
type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=IsrShrinksPerSec><>Count'
name: kafka_isr_operation_count
type: COUNTER
labels:
operation: "shrink"
- pattern: 'kafka.server<type=ReplicaManager, name=IsrExpandsPerSec><>Count'
name: kafka_isr_operation_count
type: COUNTER
labels:
operation: "expand"
- pattern: 'kafka.server<type=ReplicaFetcherManager, name=MaxLag, clientId=Replica><>Value'
name: kafka_max_lag
type: GAUGE
# Broker topic metrics (totals)
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec><>Count'
name: kafka_message_count
type: COUNTER
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=TotalFetchRequestsPerSec><>Count'
name: kafka_request_count
type: COUNTER
labels:
type: "fetch"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=TotalProduceRequestsPerSec><>Count'
name: kafka_request_count
type: COUNTER
labels:
type: "produce"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=FailedFetchRequestsPerSec><>Count'
name: kafka_request_failed
type: COUNTER
labels:
type: "fetch"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=FailedProduceRequestsPerSec><>Count'
name: kafka_request_failed
type: COUNTER
labels:
type: "produce"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesInPerSec><>Count'
name: kafka_network_io
type: COUNTER
labels:
direction: "in"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesOutPerSec><>Count'
name: kafka_network_io
type: COUNTER
labels:
direction: "out"
# Per-topic metrics (only appear after traffic flows)
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec, topic=(.+)><>Count'
name: kafka_prod_msg_count
type: COUNTER
labels:
topic: "$1"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesInPerSec, topic=(.+)><>Count'
name: kafka_topic_io
type: COUNTER
labels:
topic: "$1"
direction: "in"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesOutPerSec, topic=(.+)><>Count'
name: kafka_topic_io
type: COUNTER
labels:
topic: "$1"
direction: "out"
# Request metrics
- pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>99thPercentile'
name: kafka_request_time_99p
type: GAUGE
labels:
type: "$1"
- pattern: 'kafka.network<type=RequestChannel, name=RequestQueueSize><>Value'
name: kafka_request_queue
type: GAUGE
- pattern: 'kafka.server<type=DelayedOperationPurgatory, name=PurgatorySize, delayedOperation=(.+)><>Value'
name: kafka_purgatory_size
type: GAUGE
labels:
type: "$1"
# Controller stats
- pattern: 'kafka.controller<type=ControllerStats, name=LeaderElectionRateAndTimeMs><>Count'
name: kafka_leader_election_rate
type: COUNTER
- pattern: 'kafka.controller<type=ControllerStats, name=UncleanLeaderElectionsPerSec><>Count'
name: kafka_unclean_election_rate
type: COUNTER
# JVM Garbage Collection
- pattern: 'java.lang<name=(.+), type=GarbageCollector><>CollectionCount'
name: jvm_gc_collections_count
type: COUNTER
labels:
name: "$1"
# JVM Memory
- pattern: 'java.lang<type=Memory><HeapMemoryUsage>max'
name: jvm_memory_heap_max
type: GAUGE
- pattern: 'java.lang<type=Memory><HeapMemoryUsage>used'
name: jvm_memory_heap_used
type: GAUGE
# JVM Threading and System
- pattern: 'java.lang<type=Threading><>ThreadCount'
name: jvm_thread_count
type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>SystemCpuLoad'
name: jvm_system_cpu_utilization
type: GAUGE
# Broker uptime
- pattern: 'java.lang<type=Runtime><>Uptime'
name: kafka_broker_uptime
type: GAUGE
# Additional metrics — remove this section to reduce data ingest
# Request latency: total count, 50th percentile, and average (99p kept above)
- pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>Count'
name: kafka_request_time_total
type: COUNTER
labels:
type: "$1"
- pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>50thPercentile'
name: kafka_request_time_50p
type: GAUGE
labels:
type: "$1"
- pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>Mean'
name: kafka_request_time_avg
type: GAUGE
labels:
type: "$1"
# Log flush metrics
- pattern: 'kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>Count'
name: kafka_logs_flush_count
type: COUNTER
- pattern: 'kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>50thPercentile'
name: kafka_logs_flush_time_50p
type: GAUGE
- pattern: 'kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>99thPercentile'
name: kafka_logs_flush_time_99p
type: GAUGE
# JVM GC elapsed time
- pattern: 'java.lang<name=(.+), type=GarbageCollector><>CollectionTime'
name: jvm_gc_collections_elapsed
type: COUNTER
labels:
name: "$1"
# JVM Memory heap committed
- pattern: 'java.lang<type=Memory><HeapMemoryUsage>committed'
name: jvm_memory_heap_committed
type: GAUGE
# JVM class loading
- pattern: 'java.lang<type=ClassLoading><>LoadedClassCount'
name: jvm_class_count
type: GAUGE
# Additional JVM OS metrics
- pattern: 'java.lang<type=OperatingSystem><>SystemLoadAverage'
name: jvm_system_cpu_load_1m
type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>AvailableProcessors'
name: jvm_cpu_count
type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>ProcessCpuLoad'
name: jvm_cpu_recent_utilization
type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>OpenFileDescriptorCount'
name: jvm_file_descriptor_count
type: GAUGE
# JVM Memory Pool
- pattern: 'java.lang<type=MemoryPool, name=(.+)><Usage>used'
name: jvm_memory_pool_used
type: GAUGE
labels:
name: "$1"
- pattern: 'java.lang<type=MemoryPool, name=(.+)><Usage>max'
name: jvm_memory_pool_max
type: GAUGE
labels:
name: "$1"
- pattern: 'java.lang<type=MemoryPool, name=(.+)><CollectionUsage>used'
name: jvm_memory_pool_used_after_last_gc
type: GAUGE
labels:
name: "$1"

Conseil

Personnaliser les métriques: vous pouvez ajouter ou modifier des motifs en consultant les exemples Prometheus JMX Exporter et la documentation des MBeans Kafka. Consultez la documentation des règles de l'exportateur JMX pour une configuration supplémentaire.

Configurer les brokers Kafka pour utiliser l'exportateur JMX

Attachez l'exportateur JMX Prometheus en tant qu'agent Java à chaque broker Kafka en l'ajoutant à vos options de démarrage Kafka.

Exemple de broker unique:

bash
$
JMX_JAR="$HOME/opentelemetry/jmx_prometheus_javaagent.jar"
$
JMX_CONFIG="$HOME/opentelemetry/kafka-jmx-config.yaml"
$
$
nohup env KAFKA_OPTS="-javaagent:${JMX_JAR}=9404:${JMX_CONFIG}" \
>
bin/kafka-server-start.sh config/server.properties &

Chaque broker exposera désormais des métriques Prometheus sur le port 9404. Vérifier :

bash
$
curl http://localhost:9404/metrics | grep kafka_

Important

Clusters multi-brokers: appliquez la même configuration KAFKA_OPTS à chaque broker. Chaque broker expose des métriques sur le port 9404 depuis l'adresse IP de son propre hôte.

Créer une configuration de collecteur

Créez la configuration du Collector OpenTelemetry à ~/opentelemetry/collector-kafka-config.yaml sur un hôte de monitoring.

Le récepteur Prometheus récupère tous les points de terminaison du broker. Le collecteur écoute sur 0.0.0.0:4317 toutes les données OTLP (traces applicatives, logs) en plus de scraper les points de terminaison Prometheus.

receivers:
# OTLP receiver for application traces, metrics, and logs (listens on port 4317)
otlp:
protocols:
grpc:
endpoint: "0.0.0.0:4317"
# Prometheus receiver scrapes JMX metrics from Kafka brokers
prometheus/kafka-jmx:
config:
scrape_configs:
- job_name: 'kafka-jmx-metrics'
metrics_path: /metrics
scrape_interval: 30s
static_configs:
# TODO: Replace each target with your broker hostname or IP, and set a unique broker.id per broker
- targets: ['broker1-host:9404']
labels:
broker.id: '0'
- targets: ['broker2-host:9404']
labels:
broker.id: '1'
- targets: ['broker3-host:9404']
labels:
broker.id: '2'
# Kafka metrics receiver for cluster-level consumer lag, topic, and partition metrics
kafkametrics/cluster:
brokers: ${env:KAFKA_BOOTSTRAP_BROKER_ADDRESSES}
protocol_version: 2.0.0
scrapers:
- brokers
- topics
- consumers
collection_interval: 30s
# Exclude internal Kafka topics (prefixed with __) at the source
topic_match: "^[^_].*$"
metrics:
kafka.topic.min_insync_replicas:
enabled: true
kafka.topic.replication_factor:
enabled: true
kafka.partition.replicas:
enabled: false
kafka.partition.oldest_offset:
enabled: false
kafka.partition.current_offset:
enabled: false
exporters:
otlp/backend:
endpoint: ${env:NEW_RELIC_OTLP_ENDPOINT}
headers:
api-key: ${env:NEW_RELIC_LICENSE_KEY}
tls:
insecure: false
sending_queue:
num_consumers: 12
queue_size: 5000
retry_on_failure:
enabled: true
processors:
# Batch processor for efficient export
batch/export:
send_batch_size: 1024
timeout: 30s
# Memory limiter to prevent OOM
memory_limiter:
limit_percentage: 80
spike_limit_percentage: 30
check_interval: 1s
# Transform metric naming conventions (underscore to dot, normalize special names)
transform/metric-naming:
metric_statements:
- context: metric
statements:
- replace_pattern(name, "_", ".")
- replace_pattern(name, "\\.load\\.1", ".load_1")
- replace_pattern(name, "\\.recent\\.util", ".recent_util")
- replace_pattern(name, "file\\.descriptor\\.count", "file_descriptor.count")
- replace_pattern(name, "\\.memory\\.pool\\.used\\.bytes$", ".memory.pool.used")
- replace_pattern(name, "\\.memory\\.pool\\.max\\.bytes$", ".memory.pool.max")
- replace_pattern(name, "\\.memory\\.pool\\.collection\\.used\\.bytes$", ".memory.pool.used_after_last_gc")
- replace_pattern(name, "\\.non\\.preferred\\.leader", ".non_preferred_leader")
- replace_pattern(name, "\\.under\\.min\\.isr", ".under_min_isr")
- replace_pattern(name, "\\.under\\.replicated", ".under_replicated")
- replace_pattern(name, "\\.total$", "") where name != "kafka.request.time.total"
- context: datapoint
statements:
- set(attributes["name"], attributes["gc"]) where attributes["gc"] != nil
- delete_key(attributes, "gc") where attributes["gc"] != nil
- set(attributes["name"], attributes["pool"]) where attributes["pool"] != nil
- delete_key(attributes, "pool") where attributes["pool"] != nil
# Add cluster name to all metrics
resource/cluster-name:
attributes:
- key: kafka.cluster.name
# TODO: Replace with your Kafka cluster name
value: ${env:KAFKA_CLUSTER_NAME}
action: upsert
# Remove broker.id for cluster-level metrics
transform/remove_broker_id:
metric_statements:
- context: datapoint
statements:
- delete_key(attributes, "broker.id")
# Filter out scrape overhead metrics
filter/scrape-overhead:
metrics:
exclude:
match_type: regexp
metric_names:
- "^jmx_.*"
- "^process_.*"
- "^jvm_buffer_pool_.*"
- "^jvm_threads_.*"
- "^jvm_classes_.*"
- "^jvm_memory_(heap|non_heap)_(committed|init|max|used)_bytes$"
- "^jvm_compilation_.*"
- "^jvm_(runtime|info).*"
- "^jvm_memory_pool_(allocated_bytes_total|committed_bytes|init_bytes|collection_(committed|init|max)_bytes)$"
# Include only cluster-level metrics for the cluster pipeline
filter/include_cluster_metrics:
metrics:
include:
match_type: regexp
metric_names:
- "^kafka\\.partition\\.offline$"
- "^kafka\\.(leader|unclean)\\.election\\.rate$"
- "^kafka\\.partition\\.non_preferred_leader$"
- "^kafka\\.broker\\.fenced\\.count$"
- "^kafka\\.cluster\\.partition\\.count$"
- "^kafka\\.cluster\\.topic\\.count$"
# Exclude cluster-level metrics from the broker pipeline
filter/exclude_cluster_metrics:
metrics:
exclude:
match_type: regexp
metric_names:
- "^kafka\\.partition\\.offline$"
- "^kafka\\.(leader|unclean)\\.election\\.rate$"
- "^kafka\\.partition\\.non_preferred_leader$"
- "^kafka\\.broker\\.fenced\\.count$"
- "^kafka\\.cluster\\.partition\\.count$"
- "^kafka\\.cluster\\.topic\\.count$"
# Remove unnecessary attributes
transform/remove_attributes:
metric_statements:
- context: metric
statements:
- set(description, "") where description != ""
- set(unit, "") where unit != ""
- context: resource
statements:
- delete_key(attributes, "server.address")
- delete_key(attributes, "server.port")
- delete_key(attributes, "service.instance.id")
- delete_key(attributes, "host.name")
- delete_key(attributes, "url.scheme")
# Aggregate partition metrics to topic level
metricstransform/topic-aggregation:
transforms:
- include: kafka.partition.replicas_in_sync
action: insert
new_name: kafka.partition.replicas_in_sync.total
operations:
- action: aggregate_labels
label_set: [topic]
aggregation_type: sum
- include: kafka.partition.replicas
action: insert
new_name: kafka.partition.replicas.total
operations:
- action: aggregate_labels
label_set: [topic]
aggregation_type: sum
# Filter out original partition replicas metric
filter/exclude_partition_replicas_metric:
metrics:
exclude:
match_type: strict
metric_names:
- kafka.partition.replicas_in_sync
# Filter internal Kafka topics as a safety net
filter/internal_topics:
metrics:
datapoint:
- 'attributes["topic"] != nil and IsMatch(attributes["topic"], "^__.*")'
# Convert cumulative to delta metrics
cumulativetodelta:
groupbyattrs/cluster:
keys: [kafka.cluster.name]
metricstransform/cluster_max:
transforms:
- include: "kafka\\.partition\\.offline|kafka\\.leader\\.election\\.rate|kafka\\.unclean\\.election\\.rate|kafka\\.partition\\.non_preferred_leader|kafka\\.broker\\.fenced\\.count|kafka\\.cluster\\.partition\\.count|kafka\\.cluster\\.topic\\.count"
match_type: regexp
action: update
operations:
- action: aggregate_labels
aggregation_type: max
label_set: []
service:
pipelines:
# Application traces from instrumented Kafka clients and apps
traces:
receivers: [otlp]
processors: [memory_limiter, batch/export]
exporters: [otlp/backend]
# Application metrics from instrumented Kafka clients and apps
metrics:
receivers: [otlp]
processors: [memory_limiter, batch/export]
exporters: [otlp/backend]
# Application logs from instrumented Kafka clients and apps
logs:
receivers: [otlp]
processors: [memory_limiter, batch/export]
exporters: [otlp/backend]
# Broker-level metrics from Prometheus JMX scraping
metrics/broker:
receivers:
- prometheus/kafka-jmx
processors:
- resource/cluster-name
- filter/scrape-overhead
- transform/metric-naming
- transform/remove_attributes
- filter/exclude_cluster_metrics
- memory_limiter
- cumulativetodelta
- batch/export
exporters:
- otlp/backend
# Cluster-level metrics from Prometheus JMX scraping
metrics/cluster/prometheus:
receivers:
- prometheus/kafka-jmx
processors:
- resource/cluster-name
- filter/scrape-overhead
- transform/metric-naming
- transform/remove_attributes
- filter/include_cluster_metrics
- transform/remove_broker_id
- memory_limiter
- cumulativetodelta
- groupbyattrs/cluster
- metricstransform/cluster_max
- batch/export
exporters:
- otlp/backend
# Cluster-level metrics from Kafka metrics receiver (consumer lag, topics, partitions)
metrics/cluster/kafkametrics:
receivers:
- kafkametrics/cluster
processors:
- resource/cluster-name
- filter/internal_topics
- transform/remove_attributes
- metricstransform/topic-aggregation
- filter/exclude_partition_replicas_metric
- memory_limiter
- cumulativetodelta
- batch/export
exporters:
- otlp/backend

Définir les variables d'environnement

Définissez les variables d'environnement requises sur l'hôte de monitoring avant de démarrer le collecteur :

bash
$
export NEW_RELIC_LICENSE_KEY="YOUR_LICENSE_KEY"
$
export KAFKA_CLUSTER_NAME="my-kafka-cluster"
$
export KAFKA_BOOTSTRAP_BROKER_ADDRESSES="broker1-host:9092,broker2-host:9092,broker3-host:9092"
$
export NEW_RELIC_OTLP_ENDPOINT="https://otlp.nr-data.net:4317" # US region
$
# EU region: https://otlp.eu01.nr-data.net:4317

Paramètres de configuration

Le tableau suivant décrit les principaux paramètres de configuration :

VariableDescription
NEW_RELIC_LICENSE_KEYVotre clé de licence New Relic, par exemple YOUR_LICENSE_KEY
KAFKA_CLUSTER_NAMEUn nom unique pour votre cluster Kafka, par exemple my-kafka-cluster
KAFKA_BOOTSTRAP_BROKER_ADDRESSESVos adresses de broker bootstrap Kafka, par exemple broker1-host:9092,broker2-host:9092,broker3-host:9092
NEW_RELIC_OTLP_ENDPOINTPoint de terminaison d'ingestion OTLP. Utilisez https://otlp.nr-data.net:4317 pour la région US ou https://otlp.eu01.nr-data.net:4317 pour la région EU. Pour d’autres configurations, consultez Configurer votre point de terminaison OTLP.

Installer et démarrer le collecteur

Installez et exécutez le collecteur sur l'hôte de monitoring. Choisissez entre NRDOT Collector (la distribution de New Relic) ou OpenTelemetry Collector :

Conseil

NRDOT Collector est la distribution New Relic de l’OpenTelemetry Collector avec le support de New Relic pour l’assistance. Pour plus d’informations, consultez le référentiel GitHub NRDOT Collector.

Étape 1. Télécharger et installer le binaire

bash
$
# Set version and architecture
$
NRDOT_VERSION="1.9.0"
$
ARCH="amd64" # or arm64
$
$
# Download and extract
$
curl "https://github.com/newrelic/nrdot-collector-releases/releases/download/${NRDOT_VERSION}/nrdot-collector_${NRDOT_VERSION}_linux_${ARCH}.tar.gz" \
>
--location --output collector.tar.gz
$
tar -xzf collector.tar.gz
$
$
# Move to a location in PATH (optional)
$
sudo mv nrdot-collector /usr/local/bin/
$
$
# Verify installation
$
nrdot-collector --version

Important

Pour les autres systèmes d'exploitation et architectures, consultez les versions de NRDOT Collector et téléchargez le binaire approprié pour votre système.

Étape 2. Démarrer le collecteur

bash
$
nrdot-collector --config ~/opentelemetry/collector-kafka-config.yaml

Le collecteur commencera à récupérer des métriques Kafka et à les envoyer à New Relic en quelques minutes.

Étape 1. Télécharger et installer le binaire

bash
$
# Check https://github.com/open-telemetry/opentelemetry-collector-releases/releases/latest for the latest version
$
OTEL_VERSION="<collector_version>"
$
ARCH="amd64"
$
$
curl -L -o otelcol-contrib.tar.gz \
>
"https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v${OTEL_VERSION}/otelcol-contrib_${OTEL_VERSION}_linux_${ARCH}.tar.gz"
$
$
tar -xzf otelcol-contrib.tar.gz
$
sudo mv otelcol-contrib /usr/local/bin/
$
otelcol-contrib --version

Pour les autres systèmes d'exploitation, consultez la page des versions du Collecteur OpenTelemetry.

Étape 2. Démarrer le collecteur

bash
$
otelcol-contrib --config ~/opentelemetry/collector-kafka-config.yaml

Le collecteur commencera à récupérer des métriques Kafka et à les envoyer à New Relic en quelques minutes.

(Facultatif) Instrumenter les applications productrices ou consommatrices

Important

Prise en charge des langages: actuellement, seules les applications Java sont prises en charge pour l'instrumentation des clients Kafka à l'aide de l'agent Java OpenTelemetry.

Pour collecter la télémétrie au niveau applicatif de vos applications productrices et consommatrices Kafka, téléchargez l'agent Java OpenTelemetry si ce n'est pas déjà fait :

bash
$
mkdir -p ~/opentelemetry
$
curl -L -o ~/opentelemetry/opentelemetry-javaagent.jar \
>
https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar

Démarrez votre application avec l'agent :

bash
$
OTEL_AGENT="$HOME/opentelemetry/opentelemetry-javaagent.jar"
$
$
java \
>
-javaagent:$OTEL_AGENT \
>
-Dotel.service.name="order-process-service" \
>
-Dotel.resource.attributes="kafka.cluster.name=my-kafka-cluster" \
>
-Dotel.exporter.otlp.endpoint=http://collector-host-ip:4317 \
>
-Dotel.exporter.otlp.protocol="grpc" \
>
-Dotel.metrics.exporter="otlp" \
>
-Dotel.traces.exporter="otlp" \
>
-Dotel.logs.exporter="otlp" \
>
-Dotel.instrumentation.kafka.experimental-span-attributes="true" \
>
-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled="true" \
>
-Dotel.instrumentation.kafka.producer-propagation.enabled="true" \
>
-Dotel.instrumentation.kafka.enabled="true" \
>
-Dotel.instrumentation.runtime-telemetry.enabled="false" \
>
-jar your-kafka-application.jar

Paramètres de configuration

Le tableau suivant décrit les principaux paramètres de configuration :

paramètresDescription
service.nameRemplacez par un nom unique pour votre application producteur ou consommateur, par exemple order-process-service
kafka.cluster.nameRemplacez par le même nom de cluster utilisé dans la configuration de votre collecteur, par exemple my-kafka-cluster
otlp.endpointRemplacez par le nom d’hôte ou l'adresse IP de l'hôte exécutant votre OpenTelemetry Collector, par exemple http://collector-host-ip:4317

Conseil

La configuration ci-dessus envoie la télémétrie à un Collector OpenTelemetry s'exécutant sur collector-host-ip:4317. Si vous souhaitez un collecteur distinct dédié à la télémétrie d'application, créez-en un avec la configuration suivante :

L’agent Java fournit l’instrumentation Kafka prête à l’emploi sans aucune modification de code, capturant la latence des requêtes, les métriques de débit, les taux d’erreur et les traces distribuées. Pour une configuration avancée, consultez la documentation d’instrumentation Kafka.

(Facultatif) Transmettre les logs du broker Kafka

Pour collecter les logs des brokers Kafka et les envoyer à New Relic, configurez le récepteur filelog dans votre OpenTelemetry Collector.

Trouvez vos données

Après quelques minutes, vos données Kafka devraient apparaître dans New Relic. Consultez Trouver vos données pour obtenir des instructions détaillées sur l’exploration de vos données Kafka dans différentes vues de l’UI de New Relic.

Le tableau suivant résume où chaque type de signal est stocké. Remplacez my-kafka-cluster par votre valeur KAFKA_CLUSTER_NAME dans toutes les requêtes ci-dessous :

SignalType d'événementCe qui est inclus
MétriquesMetricMétriques de broker, de topic, de partition, de groupe de consommateurs, et de la JVM
LogsLogLogs des applications de producteur et de consommateur (via l'agent Java OTel) et logs de broker collectés via l'agent Java
TracesSpanSpans de producteur et de consommateur, y compris les opérations publish et receive par message à travers les topics

Métriques

Les métriques de broker, de topic, de partition, de groupe de consommateurs et de la JVM sont stockées dans le type d'événement Metric. Remplacez my-kafka-cluster par votre valeur KAFKA_CLUSTER_NAME :

FROM Metric SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster' SINCE 30 minutes ago

Logs

Les logs des applications producteur et consommateur instrumentées avec l'agent Java OpenTelemetry, et les logs du broker lorsque -Dotel.logs.exporter=otlp est défini, sont stockés dans le type d'événement Log :

FROM Log SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster' SINCE 30 minutes ago

Traces

Les spans de producteur et de consommateur, y compris les opérations publish et receive par message sur les topics, sont stockés dans le type d'événement Span :

FROM Span SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster' SINCE 30 minutes ago

Exemple

Un exemple complet et fonctionnel avec la configuration Docker Compose, la configuration du Collecteur Otel, la configuration de l'agent Java OTel et des exemples d'applications producteur/consommateur est disponible dans le référentiel New Relic OpenTelemetry Examples.

Dépannage

Prochaines étapes

Droits d'auteur © 2026 New Relic Inc.

This site is protected by reCAPTCHA and the Google Privacy Policy and Terms of Service apply.