Maîtrisez Kafka pour le streaming de données en temps réel. Commandes CLI, configuration et gestion de clusters Kafka pour applications modernes.
kafka-topics.sh
Gère les topics Kafka (création, liste, suppression, description)
--bootstrap-server--command-config--create--delete--list--describe--alter--topic--partitions--replication-factor--config--if-not-exists--if-exists--under-replicated-partitions--unavailable-partitions--at-min-isr-partitions--exclude-internal--help
kafka-topics.sh --bootstrap-server localhost:9092 --list
Liste tous les topics
--exclude-internal
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic mon-topic --partitions 3 --replication-factor 1
Crée un topic avec 3 partitions et réplication 1
--if-not-exists--config
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic mon-topic --config retention.ms=86400000
Crée un topic avec rétention de 24h
--config
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic compact-topic --config cleanup.policy=compact
Crée un topic avec politique de compactage
--config
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic mon-topic
Décrit un topic (partitions, leaders, replicas)
--under-replicated-partitions--unavailable-partitions--at-min-isr-partitions
kafka-topics.sh --bootstrap-server localhost:9092 --describe
Décrit tous les topics
--exclude-internal
kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic mon-topic --partitions 6
Augmente le nombre de partitions (ne peut pas diminuer)
--partitions
kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic mon-topic --config retention.ms=172800000
Modifie la configuration d'un topic
--config--delete-config
kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic mon-topic --delete-config retention.ms
Supprime une configuration de topic
--delete-config
kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic mon-topic
Supprime un topic
--if-exists
kafka-configs.sh
Gère les configurations des topics, brokers, clients
--bootstrap-server--command-config--entity-type--entity-name--describe--alter--add-config--delete-config--all--help
kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name mon-topic --describe
Affiche la configuration d'un topic
--all
kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 1 --describe
Affiche la configuration d'un broker
--all
kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name mon-topic --alter --add-config retention.ms=86400000
Ajoute/modifie une configuration de topic
--add-config
kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name mon-topic --alter --delete-config retention.ms
Supprime une configuration de topic
--delete-config
kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 1 --alter --add-config log.retention.ms=86400000
Modifie la configuration d'un broker
--add-config
kafka-configs.sh --bootstrap-server localhost:9092 --entity-type users --entity-name alice --describe
Affiche les quotas d'un utilisateur
--all
kafka-configs.sh --bootstrap-server localhost:9092 --entity-type users --entity-name alice --alter --add-config producer_byte_rate=1048576
Définit un quota de production (1 Mo/s)
--add-config
kafka-configs.sh --bootstrap-server localhost:9092 --entity-type clients --entity-name clientA --alter --add-config consumer_byte_rate=2097152
Définit un quota de consommation (2 Mo/s)
--add-config
kafka-console-producer.sh
Producteur en ligne de commande
--bootstrap-server--topic--producer-property--producer.config--property--sync--compression-codec--batch-size--message-send-max-retries--retry-backoff-ms--request-required-acks--request-timeout-ms--max-block-ms--max-memory-bytes--max-partition-memory-bytes--help
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic mon-topic
Lance un producteur interactif
--producer-property
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic mon-topic --producer-property acks=all
Producteur avec accusé de réception 'all'
--producer-property
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic mon-topic --property parse.key=true --property key.separator=:
Producteur avec clé (format clé:valeur)
--property
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic mon-topic < fichier.txt
Produit le contenu d'un fichier
kafka-console-consumer.sh
Consommateur en ligne de commande
--bootstrap-server--topic--whitelist--blacklist--consumer-property--consumer.config--property--partition--offset--group--from-beginning--max-messages--skip-message-on-error--isolation-level--formatter--timeout-ms--enable-systest-events--help
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mon-topic --from-beginning
Consomme tous les messages depuis le début
--max-messages--timeout-ms
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mon-topic --group mon-groupe
Consomme avec un groupe de consommateurs
--from-beginning
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mon-topic --partition 0 --offset 10 --max-messages 5
Consomme depuis une partition et offset spécifiques
--partition--offset--max-messages
kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist "topic-.*"
Consomme tous les topics correspondant au pattern
--whitelist--blacklist
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mon-topic --property print.key=true --property key.separator=:
Affiche les clés des messages
--property
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mon-topic --property print.timestamp=true
Affiche les timestamps des messages
--property
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mon-topic --property print.partition=true
Affiche les partitions des messages
--property
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mon-topic --property print.offset=true
Affiche les offsets des messages
--property
kafka-consumer-groups.sh
Gère les groupes de consommateurs
--bootstrap-server--command-config--list--describe--delete--delete-offsets--reset-offsets--group--all-groups--all-topics--topic--partition--shift-by--to-earliest--to-latest--to-current--to-datetime--to-offset--by-duration--dry-run--execute--export--members--members-verbose--offsets--state--verbose-v--help
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
Liste tous les groupes de consommateurs
--state
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group mon-groupe
Décrit un groupe (lag, offset courant, etc.)
--members--members-verbose--offsets--state
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --all-groups
Décrit tous les groupes
--verbose
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group mon-groupe
Supprime un groupe de consommateurs
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group mon-groupe --topic mon-topic --reset-offsets --to-earliest --execute
Réinitialise les offsets au début
--to-earliest--to-latest--to-datetime--to-offset--shift-by--by-duration--dry-run
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group mon-groupe --topic mon-topic --reset-offsets --to-latest --execute
Réinitialise les offsets à la fin
--to-latest--dry-run
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group mon-groupe --topic mon-topic --reset-offsets --to-datetime 2024-01-01T00:00:00.000 --execute
Réinitialise les offsets à une date spécifique
--to-datetime--dry-run
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group mon-groupe --topic mon-topic:0 --reset-offsets --to-offset 100 --execute
Réinitialise l'offset d'une partition spécifique
--to-offset--dry-run
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group mon-groupe --topic mon-topic --reset-offsets --shift-by -10 --execute
Décale les offsets de -10
--shift-by--dry-run
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group mon-groupe --topic mon-topic --reset-offsets --by-duration PT1H --execute
Réinitialise les offsets à il y a 1 heure
--by-duration--dry-run
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete-offsets --group mon-groupe --topic mon-topic
Supprime les offsets d'un groupe pour un topic
kafka-producer-perf-test.sh
Test de performance du producteur
--topic--num-records--record-size--throughput--producer-props--producer.config--print-metrics--payload-file--transactional-id--transaction-duration-ms--help
kafka-producer-perf-test.sh --topic mon-topic --num-records 100000 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=localhost:9092
Test de performance (100k messages de 1KB)
--throughput
kafka-producer-perf-test.sh --topic mon-topic --num-records 50000 --record-size 100 --throughput 1000 --producer-props bootstrap.servers=localhost:9092 acks=all
Test avec débit limité à 1000 msg/s
--throughput--producer-props
kafka-consumer-perf-test.sh
Test de performance du consommateur
--bootstrap-server--topic--messages--threads--group--fetch-size--socket-buffer-size--show-detailed-stats--reporting-interval--date-format--print-metrics--help
kafka-consumer-perf-test.sh --bootstrap-server localhost:9092 --topic mon-topic --messages 100000 --threads 1
Test de consommation (100k messages)
--group--fetch-size--show-detailed-stats
kafka-reassign-partitions.sh
Réassigne les partitions entre brokers
--bootstrap-server--command-config--zookeeper--reassignment-json-file--topics-to-move-json-file--broker-list--generate--execute--verify--cancel--throttle--replica-alter-log-dirs-throttle--timeout--preserve-throttles--help
kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topics-to-move-json-file topics.json --broker-list "0,1,2" --generate
Génère un plan de réassignation
--generate
kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file plan.json --execute
Exécute un plan de réassignation
--execute--throttle
kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file plan.json --verify
Vérifie l'état de la réassignation
--verify--preserve-throttles
kafka-preferred-replica-election.sh
Déclenche l'élection du replica préféré
--bootstrap-server--command-config--zookeeper--path-to-json-file--help
kafka-preferred-replica-election.sh --bootstrap-server localhost:9092
Lance l'élection pour toutes les partitions
--path-to-json-file
kafka-leader-election.sh
Déclenche l'élection de leader
--bootstrap-server--command-config--election-type--topic--partition--path-to-json-file--all-topic-partitions--help
kafka-leader-election.sh --bootstrap-server localhost:9092 --election-type preferred --all-topic-partitions
Élection preferred pour toutes les partitions
--election-type--all-topic-partitions
kafka-leader-election.sh --bootstrap-server localhost:9092 --election-type unclean --topic mon-topic --partition 0
Élection unclean pour une partition
--election-type--topic--partition
kafka-log-dirs.sh
Affiche les informations des répertoires de logs
--bootstrap-server--command-config--describe--broker-list--topic-list--help
kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe --broker-list 0,1,2
Décrit les répertoires de logs des brokers
--describe--broker-list--topic-list
kafka-dump-log.sh
Affiche le contenu des fichiers de log
--files--max-message-size--print-data-log--print-offset-commit--print-txn-log--deep-iteration--value-deserializer--key-deserializer--offsets-decoder--transaction-log-decoder--help
kafka-dump-log.sh --files /var/lib/kafka/data/mon-topic-0/00000000000000000000.log --print-data-log
Affiche les messages d'un fichier de log
--print-data-log--deep-iteration
kafka-dump-log.sh --files /var/lib/kafka/data/__consumer_offsets-0/00000000000000000000.log --print-offset-commit
Affiche les offsets des consommateurs
--print-offset-commit
kafka-delete-records.sh
Supprime des enregistrements avant un offset
--bootstrap-server--command-config--offset-json-file--help
kafka-delete-records.sh --bootstrap-server localhost:9092 --offset-json-file delete.json
Supprime les enregistrements selon le fichier JSON
kafka-broker-api-versions.sh
Affiche les versions d'API supportées par les brokers
--bootstrap-server--command-config--help
kafka-broker-api-versions.sh --bootstrap-server localhost:9092
Liste les versions d'API des brokers
kafka-metadata-quorum.sh
Gère le quorum de métadonnées (KRaft)
--bootstrap-server--command-config--status--replicas--describe--snapshot--human-readable--help
kafka-metadata-quorum.sh --bootstrap-server localhost:9092 --status
Affiche le statut du quorum KRaft
--human-readable
kafka-metadata-quorum.sh --bootstrap-server localhost:9092 --describe --replicas
Décrit les replicas du quorum
--describe--replicas--human-readable
kafka-storage.sh
Gère le stockage pour KRaft
random-uuidinfoformat--config--cluster-id--ignore-formatted--help
kafka-storage.sh random-uuid
Génère un UUID aléatoire pour le cluster
kafka-storage.sh format --config server.properties --cluster-id $(kafka-storage.sh random-uuid)
Formate le stockage pour KRaft
--config--cluster-id--ignore-formatted
kafka-storage.sh info --config server.properties
Affiche les informations du stockage
--config
kafka-acls.sh
Gère les ACLs (Access Control Lists)
--bootstrap-server--command-config--add--remove--list--authorizer-properties--authorizer--principal--allow-principal--deny-principal--operation--topic--group--cluster--transactional-id--delegation-token--resource-pattern-type--host--force--help
kafka-acls.sh --bootstrap-server localhost:9092 --list
Liste toutes les ACLs
--topic--group--cluster--principal
kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:alice --operation Read --topic mon-topic
Ajoute une ACL en lecture pour Alice
--add--allow-principal--deny-principal--operation--topic--group--cluster--host
kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:alice --operation Write --topic mon-topic
Ajoute une ACL en écriture
--add--operation
kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:alice --operation All --topic mon-topic
Ajoute tous les droits sur un topic
--add--operation
kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:bob --operation Read --group mon-groupe
Ajoute une ACL sur un groupe
--add--group
kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:admin --operation Create --cluster
Ajoute une ACL sur le cluster
--add--cluster
kafka-acls.sh --bootstrap-server localhost:9092 --remove --allow-principal User:alice --operation Read --topic mon-topic
Supprime une ACL
--remove--force
kafka-delegation-tokens.sh
Gère les tokens de délégation
--bootstrap-server--command-config--create--renew--expire--describe--owner-principal--renewer-principal--max-life-time-period--help
kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --create --owner-principal User:alice --renewer-principal User:admin
Crée un token de délégation
--create--owner-principal--renewer-principal--max-life-time-period
kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --describe
Liste les tokens de délégation
--describe--owner-principal
kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --renew --hmac <token>
Renouvelle un token
--renew--renew-time-period
kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --expire --hmac <token>
Expire un token
--expire--expiry-time-period
kafka-server-start.sh
Démarre un broker Kafka
-daemon--override
kafka-server-start.sh config/server.properties
Démarre un broker avec configuration
-daemon
kafka-server-start.sh -daemon config/server.properties
Démarre un broker en arrière-plan
-daemon
kafka-server-stop.sh
Arrête un broker Kafka
zookeeper-server-start.sh
Démarre ZooKeeper
-daemon
zookeeper-server-start.sh config/zookeeper.properties
Démarre ZooKeeper avec configuration
-daemon
zookeeper-server-stop.sh
Arrête ZooKeeper
zookeeper-shell.sh
Shell interactif ZooKeeper
localhost:2181
zookeeper-shell.sh localhost:2181 ls /brokers/ids
Liste les brokers enregistrés dans ZooKeeper
zookeeper-shell.sh localhost:2181 get /brokers/ids/0
Affiche les informations d'un broker
kafka-verifiable-producer.sh
Producteur vérifiable (pour tests)
--bootstrap-server--topic--max-messages--throughput--producer.config--value-prefix--acks--message-create-time--help
kafka-verifiable-consumer.sh
Consommateur vérifiable (pour tests)
--bootstrap-server--topic--group-id--max-messages--session-timeout--enable-autocommit--reset-policy--assignment-strategy--consumer.config--verbose--help
kafka-mirror-maker.sh
Réplication entre clusters Kafka (déprécié, utiliser MirrorMaker 2)
--consumer.config--producer.config--num.streams--whitelist--blacklist--help
connect-standalone.sh
Lance Kafka Connect en mode standalone
config/connect-standalone.propertiesconfig/connect-file-source.propertiesconfig/connect-file-sink.properties
connect-distributed.sh
Lance Kafka Connect en mode distribué
config/connect-distributed.properties
kafka-run-class.sh
Exécute une classe Java Kafka
<class_name>--help
kafka-run-class.sh kafka.tools.GetOffsetShell
Outil pour obtenir les offsets
--bootstrap-server--topic--partitions--time--help
kafka-run-class.sh kafka.tools.GetOffsetShell --bootstrap-server localhost:9092 --topic mon-topic --time -1
Obtient les derniers offsets (-1 = latest, -2 = earliest)
--time--partitions
kafka-run-class.sh kafka.tools.JmxTool
Collecte les métriques JMX
--object-name--attributes--help
kafka-streams-application-reset.sh
Réinitialise une application Kafka Streams
--bootstrap-server--application-id--input-topics--intermediate-topics--force--dry-run--help
kafka-streams-application-reset.sh --bootstrap-server localhost:9092 --application-id mon-app --input-topics mon-topic
Réinitialise une application Streams
--input-topics--intermediate-topics--force--dry-run
kafka-transactions.sh
Gère les transactions
--bootstrap-server--command-config--list--describe--describe-producers--describe-transactions--describe-consumers--topic--help
kafka-transactions.sh --bootstrap-server localhost:9092 --list
Liste les transactions actives
kafka-transactions.sh --bootstrap-server localhost:9092 --describe-producers
Décrit les producteurs transactionnels
--topic
kafka-transactions.sh --bootstrap-server localhost:9092 --describe-transactions --transactional-id <id>
Décrit une transaction spécifique
--describe-transactions
kafka-features.sh
Gère les features du cluster
--bootstrap-server--command-config--describe--upgrade--downgrade--feature--help
kafka-features.sh --bootstrap-server localhost:9092 --describe
Affiche les features supportées
kafka-features.sh --bootstrap-server localhost:9092 --upgrade --feature metadata.version=3.6
Met à niveau une feature
--upgrade--feature
kafka-client-metrics.sh
Gère les métriques client
--bootstrap-server--command-config--alter--describe--delete--list--subscription--entity-type--entity-name--help
kafka-cluster.sh
Gère le cluster Kafka
cluster-idunregister--bootstrap-server--command-config--help
kafka-cluster.sh cluster-id --bootstrap-server localhost:9092
Affiche l'ID du cluster
kafka-get-offsets.sh
Obtient les offsets pour un groupe/topic
--bootstrap-server--command-config--topic--group--help
kafka-get-offsets.sh --bootstrap-server localhost:9092 --topic mon-topic --group mon-groupe
Affiche les offsets d'un groupe