Author: 3axap4eHko

Description:
CLI Kafka client
Language: TypeScript
Repository: git://github.com/3axap4eHko/kafka-console.git
Created: 2020-03-14T20:14:43Z
Project page: https://github.com/3axap4eHko/kafka-console


Kafka CLI tool

A command-line tool for working with Kafka efficiently and easily

Features

  • Producer
  • Consumer groups with seek and timeout
  • Built-in message encoders/decoders with types: json, js, raw
  • Custom message encoders/decoders as a js module
  • Message headers
  • GZIP compression
  • Plain, SSL and SASL_SSL implementations
  • Admin client
  • TypeScript support

Installing

    npm install -g kafka-console

Examples

Common options

    -b, --brokers <brokers>                  bootstrap server host (default: "localhost:9092")
    -l, --log-level <logLevel>               log level
    -t, --timeout <timeout>                  set a timeout of operation (default: "0")
    -p, --pretty                             pretty print (default: false)
    --ssl                                    enable ssl (default: false)
    --mechanism <mechanism>                  sasl mechanism
    --username <username>                    sasl username
    --password <password>                    sasl password
    --auth-id <authId>                       sasl aws authorization identity
    --access-key-id <accessKeyId>            sasl aws access key id
    --secret-access-key <secretAccessKey>    sasl aws secret access key
    --session-token <sessionToken>           sasl aws session token
    --oauth-bearer <oauthBearer>             sasl oauth bearer token
    -V, --version                            output the version number
    -h, --help                               display help for command

Commands

    consume [options] <topic>            Consume kafka topic events
    produce [options] <topic>            Produce kafka topic events
    metadata                             Displays kafka server metadata
    list|ls [options]                    Lists kafka topics
    config [options]                     Describes config for specific resource
    topic:create <topic>                 Creates kafka topic
    topic:delete <topic>                 Deletes kafka topic
    topic:offsets <topic> [timestamp]    Shows kafka topic offsets
    help [command]                       display help for command

Consumer

npx kafka-console consume [options] <topic>

Options

    -g, --group <group>                consumer group name (default: "kafka-console-consumer-TIMESTAMP")
    -d, --data-format <data-format>    messages data-format: json, js, raw (default: "json")
    -o, --output <filename>            write output to specified filename
    -f, --from <from>                  read messages from the specific timestamp in milliseconds or ISO 8601 format. Set 0 to read from the beginning
    -c, --count <count>                a number of messages to read (default: null)
    -s, --skip <skip>                  a number of messages to skip (default: 0)
    -h, --help                         display help for command
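
The `--from` option accepts either a millisecond timestamp or an ISO 8601 string. The sketch below shows one way such a value could be normalized to epoch milliseconds. The helper `parseFrom` is hypothetical and is not taken from the kafka-console source; the tool's own parsing may differ.

```typescript
// Hypothetical helper (an assumption, not kafka-console code):
// normalizes a --from value to epoch milliseconds.
function parseFrom(value: string): number {
  const trimmed = value.trim();
  // Digits only: treat as a timestamp in milliseconds ("0" means the beginning).
  if (/^\d+$/.test(trimmed)) {
    return Number(trimmed);
  }
  // Otherwise try to parse the value as an ISO 8601 date string.
  const parsed = Date.parse(trimmed);
  if (Number.isNaN(parsed)) {
    throw new Error(`Invalid --from value: ${value}`);
  }
  return parsed;
}

console.log(parseFrom("0"));
console.log(parseFrom("2020-03-14T20:14:43Z"));
```

Either form can then be passed on the command line, for example `--from 0` or `--from 2020-03-14T20:14:43Z`.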

General usage with authentication

    npx kafka-console --brokers $KAFKA_BROKERS --ssl --mechanism plain --username $KAFKA_USERNAME --password $KAFKA_PASSWORD consume $KAFKA_TOPIC --group $KAFKA_TOPIC_GROUP

Reading from a timestamp and piping stdout to jq

    npx kafka-console consume $KAFKA_TOPIC --from 0 | jq .value

Custom data formatter example

    npx kafka-console consume $KAFKA_TOPIC --data-format ./formatter/avro.js | jq

Producer

npx kafka-console produce [options] <topic>

Options

    -d, --data-format <data-format>    messages data-format: json, js, raw (default: "json")
    -i, --input <filename>             input filename
    -w, --wait <wait>                  wait the time in ms after sending a message (default: 0)
    -h, --header <header>              set a static header (default: [])
    --help                             display help for command

General usage

    npx kafka-console produce $KAFKA_TOPIC -b $KAFKA_BROKERS --ssl --mechanism plain --username $KAFKA_USERNAME --password $KAFKA_PASSWORD

Produce JSON data from stdin with a custom formatter

    cat payload.txt | npx kafka-console produce $KAFKA_TOPIC --data-format ./formatter/avro.js

Produce JSON data from stdin

    node payloadGenerator.js | npx kafka-console produce $KAFKA_TOPIC

Produce JSON array data from stdin

    cat payload.json | jq -r -c '.[]' | npx kafka-console produce $KAFKA_TOPIC

Input interface for a single message payload

    interface Payload {
      key?: string; // kafka message key
      value: any;
      headers?: { [key: string]: string };
    }
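
A script such as `payloadGenerator.js` in the stdin example could be written along the following lines. This is a minimal sketch that assumes the producer reads one compact JSON document per line, which is what the `jq -r -c` example suggests. The functions `generatePayloads` and `toLines` and the sample field values are hypothetical.

```typescript
// Shape follows the Payload interface above; header values are assumed to be strings.
interface Payload {
  key?: string;
  value: unknown;
  headers?: { [key: string]: string };
}

// Hypothetical generator: builds `count` sample messages.
function generatePayloads(count: number): Payload[] {
  const payloads: Payload[] = [];
  for (let i = 0; i < count; i++) {
    payloads.push({
      key: `key-${i}`,
      value: { id: i, note: "sample message" },
      headers: { source: "payload-generator" },
    });
  }
  return payloads;
}

// Serializes each payload as one compact JSON document per line
// (assumed input framing, not confirmed by the README).
function toLines(payloads: Payload[]): string {
  return payloads.map((payload) => JSON.stringify(payload)).join("\n");
}

console.log(toLines(generatePayloads(3)));
```

Once compiled to JavaScript, the output can be piped into `npx kafka-console produce $KAFKA_TOPIC` in the same way as the stdin examples.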

Formatters

    export interface Encoder<T> {
      (value: T): Promise<string | Buffer> | string | Buffer;
    }
    export interface Decoder<T> {
      (value: Buffer): Promise<T> | T;
    }
    export interface Formatter<T> {
      encode: Encoder<T>;
      decode: Decoder<T>;
    }
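
As an illustration of these interfaces, the sketch below implements a formatter that stores values as base64-encoded JSON. The base64 scheme and the names `encodeBase64Json`, `decodeBase64Json`, and `base64JsonFormatter` are hypothetical. How a custom formatter module must export its `encode` and `decode` functions is not stated here, so check the project repository before passing a file to `--data-format`.

```typescript
// Interfaces reproduced from the Formatters section so the sketch is self-contained.
interface Encoder<T> {
  (value: T): Promise<string | Buffer> | string | Buffer;
}
interface Decoder<T> {
  (value: Buffer): Promise<T> | T;
}
interface Formatter<T> {
  encode: Encoder<T>;
  decode: Decoder<T>;
}

// Hypothetical encoder: JSON-serializes the value, then base64-encodes it.
function encodeBase64Json(value: unknown): string {
  return Buffer.from(JSON.stringify(value), "utf8").toString("base64");
}

// Hypothetical decoder: reverses encodeBase64Json on the raw message bytes.
function decodeBase64Json(value: Buffer): unknown {
  const json = Buffer.from(value.toString("utf8"), "base64").toString("utf8");
  return JSON.parse(json);
}

// The pair satisfies the Formatter interface.
const base64JsonFormatter: Formatter<unknown> = {
  encode: encodeBase64Json,
  decode: decodeBase64Json,
};

const encoded = encodeBase64Json({ id: 1, name: "example" });
console.log(encoded);
console.log(decodeBase64Json(Buffer.from(encoded, "utf8")));
```

Both functions here are synchronous, but the interfaces also allow returning a `Promise`, which suits formatters that need to load a schema first.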

Supported Environment Variables

  • KAFKA_BROKERS
  • KAFKA_TIMEOUT
  • KAFKA_MECHANISM
  • KAFKA_USERNAME
  • KAFKA_PASSWORD
  • KAFKA_AUTH_ID
  • KAFKA_ACCESS_KEY_ID
  • KAFKA_SECRET_ACCESS_KEY
  • KAFKA_SESSION_TOKEN
  • KAFKA_OAUTH_BEARER
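
Each variable name above lines up with one of the common options once the `KAFKA_` prefix is dropped and underscores become hyphens. The sketch below makes that correspondence explicit. It assumes this naming relationship is how the variables map to options. The helpers `envToFlag` and `flagsFromEnv` are hypothetical and do not describe how the tool reads its environment.

```typescript
// Variable names copied from the list above.
const SUPPORTED_VARIABLES = [
  "KAFKA_BROKERS",
  "KAFKA_TIMEOUT",
  "KAFKA_MECHANISM",
  "KAFKA_USERNAME",
  "KAFKA_PASSWORD",
  "KAFKA_AUTH_ID",
  "KAFKA_ACCESS_KEY_ID",
  "KAFKA_SECRET_ACCESS_KEY",
  "KAFKA_SESSION_TOKEN",
  "KAFKA_OAUTH_BEARER",
] as const;

// Hypothetical helper: KAFKA_SECRET_ACCESS_KEY becomes --secret-access-key.
function envToFlag(name: string): string {
  return "--" + name.replace(/^KAFKA_/, "").toLowerCase().replace(/_/g, "-");
}

// Hypothetical helper: collects flag/value pairs for every variable that is set.
function flagsFromEnv(env: Record<string, string | undefined>): string[] {
  const args: string[] = [];
  for (const name of SUPPORTED_VARIABLES) {
    const value = env[name];
    if (value !== undefined && value !== "") {
      args.push(envToFlag(name), value);
    }
  }
  return args;
}

console.log(flagsFromEnv({ KAFKA_BROKERS: "localhost:9092", KAFKA_SESSION_TOKEN: "token" }));
```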

License

The MIT License
Copyright (c) 2024 Ivan Zakharchanka