Камунда.РФ 8 с Kafka Exporter

Назначение Kafka Exporter

Kafka Exporter используется для публикации событий из Камунда.РФ 8 в Kafka (или Kafka-compatible брокерах, например Redpanda).

Экспортер позволяет:

  • получать события жизненного цикла процессов

  • обрабатывать события job’ов

  • сохранять историю процессов во внешних системах

  • строить аналитику и мониторинг

Предварительные требования

  • Запущенная Камунда.РФ 8

  • Kafka или Kafka-compatible брокер (например Redpanda)

  • Docker / Docker Compose

  • JAR-файл Kafka Exporter (его можно скачать из закрытого nexus-репозитория)

Структура каталогов

Рекомендуемая структура проекта:

.
├── docker-compose.yml
├── agent/
│   └── zeebe-kafka-exporter.jar
└── config/
    └── broker.yaml

Каталог agent/ используется для хранения расширений Zeebe

Подключение Exporter к контейнеру Zeebe

Kafka Exporter должен быть смонтирован внутрь контейнера Zeebe.

Пример конфигурации в docker-compose.yml:

zeebe:
  image: <zeebe-image>
  volumes:
    - ./agent/zeebe-kafka-exporter.jar:/tmp/zeebe-kafka-exporter.jar

В результате JAR-файл будет доступен в контейнере по пути:

/tmp/zeebe-kafka-exporter.jar

Конфигурация broker.yaml

Экспортер подключается через конфигурацию Zeebe Broker.

Пример минимальной настройки Kafka Exporter:

zeebe:
  broker:
    exporters:
      kafka:
        className: rf.kamunda.exporters.kafka.KafkaExporter
        jarPath: /tmp/zeebe-kafka-exporter.jar
        args:
          producer:
            servers: "redpanda:9092"
            config: |
              clusterName: "clusterName"

Описание параметров

Параметр Описание

className

Полное имя класса экспортера

jarPath

Путь к JAR-файлу внутри контейнера

args.bootstrapServers

Адрес Kafka брокера

args.config.clusterName

Название вашего кластера

Проверка загрузки Exporter

После запуска Zeebe в логах должно появиться сообщение о загрузке экспортера.

Проверка логов:

docker compose logs -f zeebe

Ожидаемые признаки:

  • отсутствие ошибок загрузки JAR

  • сообщение об инициализации Kafka Exporter

  • успешное подключение к Kafka

Конфигурация экспортера

zeebe:
  broker:
    exporters:
      kafka:
        className: rf.kamunda.exporters.kafka.KafkaExporter
        # Указывает расположение JAR-файла экспортера
        jarPath: /path/to/zeebe-kafka-exporter.jar
        args:
          # Управляет количеством записей, буферизуемых в одном пакете,
          # перед принудительной отправкой (flush).
          # Значение по умолчанию — 100
          maxBatchSize: 100

          # Максимальное время блокировки (в миллисекундах), если пакет заполнен.
          # Если пакет заполнен и приходит новая запись, экспортёр будет ждать,
          # пока не освободится место в пакете, либо пока не истечёт
          maxBlockingTimeoutMs: 1000

          # Как часто ожидающие пакеты должны отправляться (flush)
          # в Kafka-брокер.
          flushIntervalMs: 1000

          # Конфигурация, специфичная для Kafka-продюсера
          producer:
            # Список адресов подключения к Kafka-брокерам.
            # Формат должен соответствовать: "host:port"
            servers: "redpanda:9092"

            # Определяет, сколько времени продюсер будет ждать подтверждения
            # от Kafka-брокера перед повторной попыткой отправки запроса
            requestTimeoutMs: 5000

            # Период корректного завершения работы продюсера
            # при остановке (в миллисекундах)
            closeTimeoutMs: 5000

            # Идентификатор Kafka-продюсера
            clientId: zeebe

            # Любые параметры в этом разделе будут переданы напрямую
            # в ProducerConfig. Это можно использовать для настройки
            # аутентификации, сжатия и т.п.
            config: |
               linger.ms=5
              buffer.memory=8388608
              batch.size=32768
              max.block.ms=5000
              clusterName: "myCluster"
        records:
          # Если тип значения записи не указан в конфигурационном файле,
          # будет использовано значение, заданное в defaults
          defaults: { type: "event", topic: zeebe }

          # Для записей со значением типа DEPLOYMENT
          deployment: { topic: zeebe-deployment }

          # Для записей со значением типа DEPLOYMENT_DISTRIBUTION
          deploymentDistribution: { topic: zeebe-deployment-distribution }

          # Для записей со значением типа ERROR
          error: { topic: zeebe-error }

          # Для записей со значением типа INCIDENT
          incident: { topic: zeebe-incident }

          # Для записей со значением типа JOB_BATCH
          jobBatch: { topic: zeebe-job-batch }

          # Для записей со значением типа JOB
          job: { topic: zeebe-job }

          # Для записей со значением типа MESSAGE
          message: { topic: zeebe-message }

          # Для записей со значением типа MESSAGE_SUBSCRIPTION
          messageSubscription: { topic: zeebe-message-subscription }

          # Для записей со значением типа MESSAGE_START_EVENT_SUBSCRIPTION
          messageStartEventSubscription: { topic: zeebe-message-subscription-start-event }

          # Для записей со значением типа PROCESS
          process: { topic: zeebe-process }

          # Для записей со значением типа PROCESS_EVENT
          processEvent: { topic: zeebe-process-event }

          # Для записей со значением типа PROCESS_INSTANCE
          processInstance: { topic: zeebe-process-instance }

          # Для записей со значением типа PROCESS_INSTANCE_RESULT
          processInstanceResult: { topic: zeebe-process-instance-result }

          # Для записей со значением типа PROCESS_MESSAGE_SUBSCRIPTION
          processMessageSubscription: { topic: zeebe-process-message-subscription }

          # Для записей со значением типа TIMER
          timer: { topic: zeebe-timer }

          # Для записей со значением типа VARIABLE
          variable: { topic: zeebe-variable }

Пример docker-compose файла

Важно иметь следующую структуру каталогов:

.
├── docker-compose.yml
├── agent/
│   └── zeebe-kafka-exporter.jar
└── config/
    └── broker.yaml

docker-compose.yml

name: KamundaRF
networks:
  net:
    driver: bridge
volumes:
  redpanda: null

services:
  redpanda:
    command:
      - redpanda
      - start
      - --kafka-addr internal://0.0.0.0:9092,external://0.0.0.0:19092
      - --advertise-kafka-addr internal://redpanda:9092,external://localhost:19092
      - --pandaproxy-addr internal://0.0.0.0:8082,external://0.0.0.0:18082
      - --advertise-pandaproxy-addr internal://redpanda:8082,external://localhost:18082
      - --schema-registry-addr internal://0.0.0.0:8081,external://0.0.0.0:18081
      - --rpc-addr redpanda:33145
      - --advertise-rpc-addr redpanda:33145
      - --mode dev-container
      - --smp 1
      - --default-log-level=info
    image: docker.redpanda.com/redpandadata/redpanda:v25.1.9
    container_name: redpanda
    volumes:
      - redpanda:/var/lib/redpanda/data
    networks:
      - net
    ports:
      - 18081:18081
      - 18082:18082
      - 19092:19092
      - 19644:9644
  console:
    container_name: redpanda-console
    image: docker.redpanda.com/redpandadata/console:v3.1.3
    networks:
      - net
    entrypoint: /bin/sh
    command: -c 'echo "$$CONSOLE_CONFIG_FILE" > /tmp/config.yml; /app/console'
    environment:
      CONFIG_FILEPATH: /tmp/config.yml
      CONSOLE_CONFIG_FILE: |
        kafka:
          brokers: ["redpanda:9092"]
        schemaRegistry:
          enabled: true
          urls: ["http://redpanda:8081"]
        redpanda:
          adminApi:
            enabled: true
            urls: ["http://redpanda:9644"]
    ports:
      - 8080:8080
    depends_on:
      - redpanda
  zeebe:
    image: harbor.boos.solutions/kamundarf/kamundarf8:1.0.0-SNAPSHOT
    depends_on:
      - redpanda
    volumes:
      - ./agent/zeebe-kafka-exporter.jar:/tmp/zeebe-kafka-exporter.jar
      - ./config/broker.yaml:/usr/local/zeebe/config/application.yaml
    environment:
      - ZEEBE_BROKER_NETWORK_HOST=0.0.0.0
      - ZEEBE_BROKER_GATEWAY_CLUSTER_HOST=0.0.0.0
      - ZEEBE_BROKER_GATEWAY_ENABLE=true
      - ZEEBE_RESTORE=false
      - ROCKSDB_MUSL_LIBC=false
    extra_hosts:
      host.testcontainers.internal: host-gateway
    ports:
      - 26500:26500
      - 9600:9600
    networks:
      - net
  simple-monitor:
    image: ghcr.io/camunda-community-hub/zeebe-simple-monitor:2.6.0
    container_name: zeebe-simple-monitor
    depends_on:
      - zeebe
      - redpanda
    networks:
      - net
    environment:
      SPRING_PROFILES_ACTIVE: kafka

      ZEEBE_CLIENT_BROKER_GATEWAYADDRESS: zeebe:26500
      ZEEBE_CLIENT_SECURITY_PLAINTEXT: "true"

      SPRING_KAFKA_BOOTSTRAP_SERVERS: redpanda:9092
    ports:
      - 8082:8082

broker.yml

zeebe:
  broker:
    exporters:
      kafka:
        className: rf.kamunda.exporters.kafka.KafkaExporter
        jarPath: /tmp/zeebe-kafka-exporter.jar
        args:
          producer:
            servers: "redpanda:9092"
            config: |
              linger.ms=5
              buffer.memory=8388608
              batch.size=32768
              max.block.ms=5000
              clusterName: "cluster"

Локальный запуск:

docker compose up

В результате вы получите:

Сервис Адрес

Zeebe Gateway

localhost:26500

Redpanda Console

http://localhost:8080

Zeebe Simple Monitor

http://localhost:8082

Здесь используется Kafka-compatible брокер - Redpanda. Отправленные сообщения можно посмотреть в Redpanda Console. Отслеживать и запускать процессы можно через Zeebe Simple Monitor.

Итог

Kafka Exporter является ключевым механизмом интеграции Камунда.РФ 8 с внешними системами хранения.

Ниже прикреплен пример в котором содержится:

  • docker-compose файл

  • Конфигурация для брокера

  • Инструкция по скачиванию экспортера

  • spring-boot приложение которое инициализирует несколько воркеров для исполнения задач и деплоит простой процесс в Камунда.РФ 8, которым можно управлять через Web-интерфейс (localhost:8082)