Kafka MCP サーバー
Apache Kafka と統合して、LLM および Agentic アプリケーションの公開および消費機能を提供するメッセージ コンテキスト プロトコル (MCP) サーバー。
概要
このプロジェクトは、AIモデルが標準化されたインターフェースを介してKafkaトピックと対話できるようにするサーバーを実装します。サポート対象は以下のとおりです。
Kafkaトピックへのメッセージの公開
Kafkaトピックからのメッセージの消費
Related MCP server: Slack MCP Server
前提条件
Python 3.8以上
Apache Kafka インスタンス
Python の依存関係 (インストールのセクションを参照)
インストール
リポジトリをクローンします。
git clone <repository-url> cd <repository-directory>仮想環境を作成してアクティブ化します。
python -m venv venv source venv/bin/activate # On Windows, use: venv\Scripts\activate必要な依存関係をインストールします。
pip install -r requirements.txtrequirements.txt が存在しない場合は、次のパッケージをインストールします。
pip install aiokafka python-dotenv pydantic-settings mcp-server
構成
次の変数を含む.envファイルをプロジェクト ルートに作成します。
使用法
サーバーの実行
提供されているmain.pyスクリプトを使用してサーバーを実行できます。
利用可能な交通手段:
stdio: 標準入出力(デフォルト)sse: サーバー送信イベント
Claude Desktopとの統合
この Kafka MCP サーバーを Claude Desktop で使用するには、Claude Desktop 構成ファイルに次の構成を追加します。
<PATH TO PROJECTS>プロジェクト ディレクトリへの絶対パスに置き換えます。
プロジェクト構造
main.py: アプリケーションのエントリポイントkafka.py: Kafka コネクタの実装server.py: Kafka とのやり取りのためのツールを備えた MCP サーバーの実装settings.py: Pydantic を使用した構成管理
利用可能なツール
kafka-publish
構成された Kafka トピックに情報を公開します。
kafka-consume
構成された Kafka トピックから情報を消費します。
注意: 一度トピックからメッセージを読み取ると、同じグループIDを使用して再度読み取ることはできません。
トピックの作成
指定されたパラメータを使用して新しい Kafka トピックを作成します。
オプション:
--topic作成するトピックの名前--partitions割り当てるパーティションの数--replication-factorブローカー間のレプリケーション係数--config(オプション) トピックレベルの構成のオーバーライド (例:retention.ms=604800000)
トピックを削除
既存の Kafka トピックを削除します。
オプション:
--topic削除するトピックの名前--timeout(オプション) 削除が完了するまでの待機時間
リストトピック
クラスター内のすべてのトピックを一覧表示します (またはパターンでフィルタリングします)。
オプション:
--bootstrap-serverブローカーアドレス--pattern(オプション) トピック名をフィルタリングする正規表現--exclude-internal(オプション) 内部トピックを除外する (デフォルト: true)
トピック構成
1 つ以上のトピックの構成を表示または変更します。
オプション:
--describeトピックの現在の設定を表示する--alter構成を変更します (例:--add-config retention.ms=86400000,--delete-config cleanup.policy)--topicトピックの名前
トピックメタデータ
トピックまたはクラスターに関するメタデータを取得します。
オプション:
--topic(指定されている場合) このトピックのメタデータのみを取得します--bootstrap-serverブローカーアドレス--include-offline(オプション) オフラインのブローカーまたはパーティションを含める