はじめに
Kafka には多数の管理コマンドのシェルスクリプトが用意されている。
この管理コマンドの中の kafka-topic.sh
で Topic を作成するときの動きを調べた。
環境
Amazon Managed Kafka の推奨バージョンが v2.6.2 となっていたので、 v2.6 のもので調べた。
kafka-topic.sh
コメントは書かれているが、中身は実質以下のみ。
exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"
kafka-topic.sh
は Topic の作成以外にも色々なことに使うことができるけど、kafka-run-class.sh
から、オプションを渡しつつ Scala のコードを呼び出す形になっている模様。
kafka-run-class.sh
kafka-run-class.sh
は jar ファイルなどの読み込みを行なっている模様。これを動かして必要な環境をセットしつつ本体のコードを実行していると思う。
kafka.admin.TopicCommand
呼び出されるのは以下のコードと思われる。
TopicCommand オブジェクトで、最初に TopicCommandOptions()
のクラスが呼び出され、kafka-topic.sh
に指定したオプションの解析っぽいことが行われている。
val opts = new TopicCommandOptions(args)
その後、以下の部分で、 Zookeeper に接続するかどうかによって実行する処理が変わっている。
これは、Kafka に関する処理は、Zookeeper に接続するか、Broker (bootstrap-server) に接続するかに分かれているためかと思う。
トピック作成時は Broker に接続する形になるので、AdminClientTopicService()
が呼び出されている。
val topicService = if (opts.zkConnect.isDefined) ZookeeperTopicService(opts.zkConnect) else AdminClientTopicService(opts.commandConfig, opts.bootstrapServer)
AdminClientTopicService()
オブジェクトの apply メソッドが呼び出され、その後 createAdminClient
メソッドが呼び出され、そこで create が実行されている感じのようだった。
object AdminClientTopicService { def createAdminClient(commandConfig: Properties, bootstrapServer: Option[String]): Admin = { bootstrapServer match { case Some(serverList) => commandConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, serverList) case None => } Admin.create(commandConfig) } def apply(commandConfig: Properties, bootstrapServer: Option[String]): AdminClientTopicService = new AdminClientTopicService(createAdminClient(commandConfig, bootstrapServer)) }
まとめ
kafka-topics.sh
で実行できる機能の中の、トピックの作成部分のコードを読んでみた。
kafka-topics.sh
にいろんな機能をまとめる形になっているのか、ちょっと気になった。