Franz¶
В данном разделе приведены сведения о Franz, плагине для СУБД Picodata.
Picodata Enterprise
Функциональность плагина доступна только в коммерческой версии Picodata.
Общие сведения¶
Franz — плагин, который позволяет импортировать данные из топиков Apache Kafka в СУБД Picodata.
Плагин поддерживает форматы сообщений JSON, Avro Object Container Files, и Avro с поддержкой Schema Registry. Для корректной работы требуется соответствие между схемой сообщений и структурой таблицы в Picodata.
Состав плагина¶
Внутри архива с плагином находится структура вложенных директорий, включающая имя и версию плагина, а также его файлы:
└── franz
└── 0.3.6
├── assets
├── libfranz_plugin.so
└── manifest.yaml
Основная логика плагина обеспечивается разделяемой библиотекой
franz-lib (libfranz_plugin.so). Исходная конфигурация плагина
задается в файле манифеста
(manifest.yaml).
Разделяемая библиотека franz-lib реализует сервис franz-service—
вместе они могут быть использованы как в составе плагина franz-plugin,
так и как самостоятельные компоненты для интеграции в сторонние решения.
Подключение плагина¶
- Запустите инстанс Picodata с поддержкой плагинов (параметр
--share-dir) - Поместите файлы плагина в директорию, указанную на предыдущем шаге
- Подключитесь к административной консоли инстанса и выполните команды, указанные ниже.
Далее последовательно выполните следующие SQL-команды в административной консоли Picodata.
CREATE PLUGIN franz 0.3.6;
ALTER PLUGIN franz 0.3.6 ADD SERVICE "kafka-consumer" TO TIER default;
ALTER PLUGIN franz 0.3.6 ADD SERVICE saver TO TIER default;
ALTER PLUGIN franz 0.3.6 ENABLE OPTION(TIMEOUT=30);
Архитектура¶
В зависимости от потребностей конечного решения, функциональность Franz может быть представлена в трех вариантах:
-
Плагин (franz-plugin) Основной сценарий использования: законченное решение для получения данных из Kafka в кластер Picodata. Позволяет получить "из коробки" интеграцию Kafka → Picodata со стандартным механизмом конфигурации плагинов.
-
Сервис (franz-service) Основной сценарий использования: интеграция полного набора задач по чтению из Kafka в собственный плагин. Основная функциональность библиотеки реализуется в виде сервиса, который подходит для интеграции в любой плагин для Picodata.
-
Библиотека (franz-lib) Основной сценарий использования: импорт и переиспользование отдельных компонентов для реализации собственной интеграции с Kafka. Позволяет использовать компоненты Franz напрямую и максимально гибко.
Компоненты Franz разделены на два сервиса:
- KafkaConsumer (
kafka-consumer) — основной процесс обработки данных. Сервис читает данные из Apache Kafka и отправляет подготовленные к записи данные сервису Saver - Saver (
saver) — вспомогательный компонент для сохранения результата обработки. Сервис принимает RPC-запросы от KafkaConsumer и записывает локально полученные данные
Концептуальная схема обработки данных при использовании Franz:
Обработка сообщения из Kafka происходит в сервисе kafka-consumer и состоит из следующих стадий:
- Получение сообщения Franz вычитывает сообщения из топика Kafka небольшими порциями для улучшения пропускной способности
- Десериализация Данные из сообщения преобразуются в объект, используя JSON или Apache Avro
- Трансформация Исходный объект из сообщения превращается в объект, готовый к записи в Picodata
- Сохранение
Обработанная порция сообщений распределяется сервисом
saverпо протоколу RPC для сохранения на конечных инстансах. - Отправка в DLQ Сообщения, которые были признаны "сломанными" на стадиях десериализации или трансформации (например, из-за неправильной структуры), отправляются в отдельный топик Kafka.
Схема работы воркера:
Сценарии использования¶
- Реализации сложных ETL-процессов
- Миграцию данных из любого источника данных в Picodata через Kafka
- Построение CQRS-системы
Конфигурация плагина¶
Исходная конфигурация Franz задается в файле manifest.yaml:
name: franz
description: A plugin for consuming messages from Kafka and saving them to configured space
version: 0.3.6
services:
- name: kafka-consumer
description: Consumes messages from Kafka
default_configuration:
tasks: []
- name: saver
description: Saves messages on behalf of kafka-consumer
default_configuration:
migration:
Описания пайплайнов обработки сообщений помещаются в секцию tasks сервиса kafka-consumer. Элементы этой секции-списка имеют следующие свойства:
target_space(обязательный) — строка Таблица в Picodata для записи данных. Должна быть создана в кластере заранее.target_tier(обязательный) — строка Целевой тир. Должен быть настроен в кластере заранее.-
consumer(обязательный) — конфигурация потребителя сообщений:broker(обязательный) — строка Адрес брокера Kafka в формате "host:port" (поддерживается указание нескольких брокеров через запятую)topic(обязательный) — строка Название топика Kafka для подпискиconsumer_group(обязательный) — строка Идентификатор группы потребителей (group.id)batch_size(по умолчанию: 100) — целое положительное число Количество сообщений которые будут обработаны пачкой за одну итерацию обработкиbuffer_size(по умолчанию: 5 000 000) — целое положительное число Максимальный размер временного буфера сообщений, который используется для предварительного сохранения полученных из Kafka сообщенийopts(опциональный) — маппинг (строка) -> (строка)
Дополнительные параметры для драйвера Kafka cм. в документации librdkafka
-
decoder(по умолчанию: JSON) — конфигурация десериализации сообщений. Имеет обязательное свойствоtype, а также может иметь дополнительные свойства — в зависимости от выбранного декодера:-
JSON— не имеет дополнительных параметровtype: "json"
-
Avro— десериализация с использованием схем Apache Avrotype: "avro"remote_registry_url(обязательный) — строка Адрес Schema Registry, который предоставит схемы для десериализации сообщений.
-
Avro OCF— десериализация сообщений из Apache Avro Object Container Filestype: "avro_ocf"
-
-
flow_control(обязательный) — конфигурация потока обработкиstep_delay_ms(обязательный) — целое положительное число
Задержка между итерациями получения и обработки пачки сообщений. Чем меньше значение, тем чаще сервис будет опрашивать Kafka на появление новых сообщений.
step_hard_duration_ms(обязательный) — целое положительное число
Максимальная продолжительность одной итерации обработки пачки сообщений. Защищает от зависания стадий обработки (например декодирования или трансформации). Если время обработки закончится, то текущая итерация прервётся и повторится после
step_delay_ms
Примечание
Конфигурация DLQ и трансформаций сообщения не
поддерживаются через конфигурацию плагина и требуют использования
библиотеки franz-lib.
Мониторинг¶
Сообщения плагина попадают в общий журнал кластера Picodata.
Помимо записи в журнал, плагин экспортирует набор метрик в формате Prometheus, которые позволяют следить за состоянием системы, производительностью и использованием ресурсов.
Метрики можно получить стандартным для Picodata способом
Основные метрики сервиса kafka-consumer для мониторинга чтения
сообщений из источника:
- потребленные сервисом сообщения Kafka
- записанные оффсеты Kafka
- показатель отставания сервиса (см. consumer lag)
- смещение репликации сообщений Kafka (high/low watermark)
Примечание
Для сбора метрик сервисом kafka-consumer необходимо указать
периодичность сбора статистики в конфигурации потребителя сообщений
через параметр consumer.opts(см.
statistics.interval.ms)
Полный список метрик, поддерживаемый librdkafka, содержится в файле STATISTICS.md
Основные метрики аллокатора Rust для мониторинга использования памяти, выделенной через jemalloc или системный аллокатор процессу:
- количество аллокаций
- объем выделенной памяти
- объем деаллоцированной памяти
Рекомендации¶
- Используйте разные
consumer_groupдля разных процессов/кластеров
Приложение¶
Конвертируемость типов Apache Avro ↔ Picodata¶
Типы, поддерживаемые Picodata, могут быть получены из полей сообщения в формате Apache Avro следующим образом:
| Целевой тип данных Picodata | Исходные типы данных Apache Avro |
|---|---|
| BOOLEAN | boolean |
| DECIMAL | string, decimal, float, double |
| DOUBLE | int, long, float, double |
| INTEGER | int, long |
| JSON | not supported |
| TEXT, VARCHAR | string, int, long, boolean, float, double (1) |
| UUID | string, uuid |
| DATETIME | int, long, string, date, timestamp-micros |
| UNSIGNED | int, long (2) |
(1)Числа будут отформатированы в строку с использованием основания 10, булевы значения преобразуются в строку как"true"и"false"соответственно.(2)Требуются положительные значения полей с этими типами, иначе возникнет ошибка при сохранении обработанных записей.