dshimizu/blog/alpha

とりとめのないITブログ

Kafka の管理コマンドで Topic を作成する処理がどうなっているか調べた

はじめに

Kafka には多数の管理コマンドのシェルスクリプトが用意されている。 この管理コマンドの中の kafka-topic.sh で Topic を作成するときの動きを調べた。

環境

Amazon Managed Kafka の推奨バージョンが v2.6.2 となっていたので、 v2.6 のもので調べた。

kafka-topic.sh

github.com

コメントは書かれているが、中身は実質以下のみ。

exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"

kafka-topic.sh は Topic の作成以外にも色々なことに使うことができるけど、kafka-run-class.sh から、オプションを渡しつつ Scala のコードを呼び出す形になっている模様。

kafka-run-class.sh

github.com

kafka-run-class.sh は jar ファイルなどの読み込みを行なっている模様。これを動かして必要な環境をセットしつつ本体のコードを実行していると思う。

kafka.admin.TopicCommand

呼び出されるのは以下のコードと思われる。

github.com

TopicCommand オブジェクトで、最初に TopicCommandOptions() のクラスが呼び出され、kafka-topic.sh に指定したオプションの解析っぽいことが行われている。

    val opts = new TopicCommandOptions(args)

その後、以下の部分で、 Zookeeper に接続するかどうかによって実行する処理が変わっている。 これは、Kafka に関する処理は、Zookeeper に接続するか、Broker (bootstrap-server) に接続するかに分かれているためかと思う。 トピック作成時は Broker に接続する形になるので、AdminClientTopicService()が呼び出されている。

    val topicService = if (opts.zkConnect.isDefined)
      ZookeeperTopicService(opts.zkConnect)
    else
      AdminClientTopicService(opts.commandConfig, opts.bootstrapServer)

AdminClientTopicService() オブジェクトの apply メソッドが呼び出され、その後 createAdminClient メソッドが呼び出され、そこで create が実行されている感じのようだった。

  object AdminClientTopicService {
    def createAdminClient(commandConfig: Properties, bootstrapServer: Option[String]): Admin = {
      bootstrapServer match {
        case Some(serverList) => commandConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, serverList)
        case None =>
      }
      Admin.create(commandConfig)
    }

    def apply(commandConfig: Properties, bootstrapServer: Option[String]): AdminClientTopicService =
      new AdminClientTopicService(createAdminClient(commandConfig, bootstrapServer))
  }

まとめ

kafka-topics.sh で実行できる機能の中の、トピックの作成部分のコードを読んでみた。 kafka-topics.sh にいろんな機能をまとめる形になっているのか、ちょっと気になった。

参考