Кафка MCP-сервер
Сервер протокола контекста сообщений (MCP), который интегрируется с Apache Kafka для предоставления функций публикации и потребления для приложений LLM и Agentic.
Обзор
Этот проект реализует сервер, который позволяет моделям ИИ взаимодействовать с темами Kafka через стандартизированный интерфейс. Он поддерживает:
Публикация сообщений в темах Kafka
Потребление сообщений из тем Kafka
Related MCP server: Slack MCP Server
Предпосылки
Питон 3.8+
Экземпляр Apache Kafka
Зависимости Python (см. раздел Установка)
Установка
Клонируйте репозиторий:
git clone <repository-url> cd <repository-directory>Создайте виртуальную среду и активируйте ее:
python -m venv venv source venv/bin/activate # On Windows, use: venv\Scripts\activateУстановите необходимые зависимости:
pip install -r requirements.txtЕсли файл requirements.txt отсутствует, установите следующие пакеты:
pip install aiokafka python-dotenv pydantic-settings mcp-server
Конфигурация
Создайте файл .env в корне проекта со следующими переменными:
Использование
Запуск сервера
Вы можете запустить сервер, используя предоставленный скрипт main.py :
Доступные варианты транспорта:
stdio: Стандартный ввод/вывод (по умолчанию)sse: События, отправленные сервером
Интеграция с Claude Desktop
Чтобы использовать этот сервер Kafka MCP с Claude Desktop, добавьте следующую конфигурацию в файл конфигурации Claude Desktop:
Замените <PATH TO PROJECTS> на абсолютный путь к каталогу вашего проекта.
Структура проекта
main.py: Точка входа для приложенияkafka.py: реализация коннектора Kafkaserver.py: реализация сервера MCP с инструментами для взаимодействия с Kafkasettings.py: Управление конфигурацией с помощью Pydantic
Доступные инструменты
kafka-опубликовать
Публикует информацию в настроенной теме Kafka.
кафка-потреблять
использовать информацию из настроенной темы Kafka.
Примечание: после прочтения сообщения из темы его нельзя будет прочитать снова, используя тот же идентификатор группы.
Создать тему
Создает новую тему Kafka с указанными параметрами.
Параметры :
--topicНазвание темы для создания--partitionsКоличество разделов для выделения--replication-factorФактор репликации между брокерами--config(необязательно) Переопределение конфигурации на уровне темы (например,retention.ms=604800000)
Удалить тему
Удаляет существующую тему Kafka.
Параметры :
--topicНазвание темы для удаления--timeout(необязательно) Время ожидания завершения удаления.
Список тем
Перечисляет все темы в кластере (или отфильтровывает по шаблону).
Параметры :
--bootstrap-serverАдрес брокера--pattern(необязательно) Регулярное выражение для фильтрации названий тем--exclude-internal(необязательно) Исключить внутренние темы (по умолчанию: true)
Тема-Конфигурация
Отображает или изменяет конфигурацию для одной или нескольких тем.
Параметры :
--describeПоказать текущие конфигурации для темы--alterИзменить конфигурации (например,--add-config retention.ms=86400000,--delete-config cleanup.policy)--topicНазвание темы
Тема-Метаданные
Извлекает метаданные о теме или кластере.
Параметры :
--topic(если указано) Извлечь метаданные только для этой темы--bootstrap-serverАдрес брокера--include-offline(необязательно) Включить брокеров или разделы, которые находятся в автономном режиме