Kirovets¶
В данном разделе приведены сведения о Kirovets, плагине для СУБД Picodata.
Picodata Enterprise
Функциональность плагина доступна только в коммерческой версии Picodata.
Общие сведения¶
Kirovets — это платформа виртуализации данных, разработанная как расширяемый плагин для СУБД Picodata. Ее основная задача — организовать эффективный сбор, обработку, хранение и публикацию данных в виде HTTP API. Платформа позволяет проектировать собственные витрины данных и обрабатывать входящие данные с использованием пользовательской логики на языке Lua.
Система предназначена для использования в сценариях, где важно централизованное хранение данных из различных источников и возможность публикации витрин, агрегатов и аналитических представлений через API.
Основные термины
Витрина данных — это конфигурируемый срез данных из хранилища, определяемый пользователем через SQL- или Lua-обработчики, привязанные к конечным точкам системы. Витрины обеспечивают гибкий доступ к данным, адаптируясь под конкретные требования пользователей (например, отчеты, аналитические выборки или интеграционные сценарии).
Конечная точка — это уникальный URL-адрес, связанный с определенным обработчиком запросов (SQL или Lua), который предоставляет доступ к витрине данных. Если витрина — это логический срез данных (например, «продажи по регионам»), то конечная точка — это способ доступа к нему через API (например, URL /api/sales с параметром region).
Триггер (Trigger) — это пользовательская Lua-функция, автоматически выполняемая при изменении данных в связанной таблице (добавлении, обновлении или удалении записи). Ключевые свойства:
- запускается системой без явного вызова пользователем, реагируя на события модификации данных (
INSERT
,UPDATE
,DELETE
). - позволяет модифицировать как другие таблицы, так и ту, к которой привязан триггер, но так как выполнение происходит после фактической модификации данных в таблице, то мутация входящих данных выполнена быть не может. По сути триггер — это реакция на уже совершенное изменение данных.
- позволяет реализовать произвольную обработку: обновление связанных таблиц, отправку уведомлений, логирование изменений и т.д.
- каждая таблица может иметь несколько триггеров, которые выполняются последовательно в порядке, указанном в конфигурации.
- внутри функции доступны:
- изменяемые данные (например, значения полей новой записи),
- контекст операции (тип события —
create
,update
,delete
).
Подробнее см. в разделе Пользовательские триггеры
Отличие от витрины данных:
Если витрина данных реагирует на HTTP-запросы и возвращает данные, триггер срабатывает на события БД и может вернуть только ошибку (его цель — побочные эффекты).
Фильтр (Filter) — это пользовательская Lua-функция, выполняемая до попытки изменения данных в таблице (создания, обновления или удаления записи) и определяющая, должно ли изменение быть применено.
Ключевые свойства:
- анализирует данные и контекст операции (например, поля новой записи или условия удаления).
- может отклонить изменение, вернув
false
или ошибку, либо разрешить его (вернувtrue
). - привязывается к конкретной таблице в конфигурации и может быть только один.
- внутри функции доступны:
- новые/изменяемые данные (для
INSERT
/UPDATE
), - старые данные (для
UPDATE
/DELETE
), - тип операции (
"create"
,"update"
,"delete"
).
- новые/изменяемые данные (для
Примеры использования:
- запрет удаления записей с определенным статусом.
- проверка корректности формата данных перед сохранением.
- динамическое ограничение прав доступа ("только автор может редактировать").
Отличия от триггера:
- Триггер срабатывает после изменения и не может его отменить (только реагирует).
- Фильтр выполняется до изменения и имеет право вето.
Обработчик (Handler) — это пользовательская логика, написанная на Lua, которая определяет поведение системы при выполнении определенных операций. В зависимости от контекста, обработчик может выступать в различных ролях:
- витрина данных
- триггер
- фильтр
SQL-витрина — это SQL-запрос, находящийся в файле конфигурации, который может выступать как обработчик витрины данных.
Импортер — компонент, обеспечивающий интеграцию с внешними источниками данных. Импортер может быть пакетным, загружающим весь объем данных из источника, или потоковым, загружающим поток изменений, приходящий из источника.
Сервис — концепция системы плагинов СУБД Picodata, представляющая собой изолированную часть функциональности. Аналогом этой концепции является микросервис. Сервис может быть запущен на одном или нескольких узлах СУБД Picodata. Сервис взаимодействует с другими сервисами и может предоставлять интерфейс взаимодействия пользователю системы, например, HTTP API.
Задача — операция, которая выполняется по расписанию внутри платформы. На данный момент поддерживается только операция запуска импорта пакетного импортера.
Архитектура и компоненты системы¶
Архитектура платформы разделена на 4 ключевых сервиса, каждый из которых выполняет уникальные функции в системе.
1. Router¶
Сервис, отвечающий за взаимодействие с потребителями данных. Основные функции:
- обработка входящих HTTP-запросов от пользователей и выполнение соответствующих витрин данных
- предоставление API для администрирования системы, а также UI
- выполнение запросов к хранилищу для получения данных
2. Storage¶
Основной сервис хранения данных, обеспечивающий:
- хранение данных согласно сконфигурированной схеме
- получение и применение данных, переданных импортерами
- выполнение пользовательских фильтров и триггеров (на Lua)
3. Importer¶
Компонент, обеспечивающий интеграцию с внешними источниками данных:
- чтение данных из потоковых и пакетных источников
- отправку данных в хранилище данных
4. Task Coordinator¶
Служба, отвечающая за запуск задач по расписанию:
- поддерживает cron-расписания, а также однократные и непрерывные задачи
- позволяет настроить автоматический импорт данных
- обеспечивает периодическое выполнение задач в фоновом режиме
- позволяет запускать произвольные задачи, написанные на Lua
Конфигурация системы¶
Конфигурация описывается в нескольких файлах:
config.yml
содержит описание всех компонентов системы: таблиц, импортеров, задач и API.- файлы с расширением lua в директории
datamarts
содержат lua-функции, которые могут быть использованы как для триггеров и фильтров, так и для витрин данных - файлы с расширением sql в директории
datamarts
содержат запросы на языке SQL, которые могут быть использованы как витрины данных
Пример структуры директории с конфигурацией:
.
├── config.yaml
└── datamarts
├── datamarts.lua
├── filters.lua
├── relations.sql
└── triggers.lua
Загрузка конфигурации¶
После того как все конфигурационные файлы подготовлены, их необходимо
загрузить в платформу "Кировец". Это можно сделать двумя основными
способами: через командную строку с использованием curl
или через
графический пользовательский интерфейс.
Подготовка ZIP-архива¶
Перед началом работы все конфигурационные файлы необходимо упаковать в
ZIP-архив. Важно, чтобы файл config.yaml
находился на верхнем уровне
этого архива (не внутри поддиректории).
Создайте такой архив, выполнив следующую команду в корневой директории вашей конфигурации:
zip -r ../config.zip *
Эта команда создаст в родительской директории ZIP-архив config.zip
c
нужным набором файлов и директорий.
Загрузка конфигурации с помощью curl
¶
Для загрузки конфигурации через API выполните POST-запрос к конечной
точке /admin/config
платформы "Кировец". Если у вас включена
авторизация, не забудьте включить соответствующие авторизационные
заголовки в ваш запрос.
Пример команды curl
:
curl -X POST 'kirovets.local/admin/config' --data-binary @config.zip -H "Content-Type: application/zip"
kirovets.local/admin/config
: URL-адрес вашей платформы "Кировец" и конечная точка для загрузки конфигурации. Заменитеkirovets.local
на актуальный адрес вашей системы.--data-binary @config.zip
: указываетcurl
отправить содержимое файлаconfig.zip
в качестве тела запроса.-H "Content-Type: application/zip"
: указывает серверу, что отправляемые данные представляют собой ZIP-архив.
Загрузка конфигурации через пользовательский интерфейс¶
Платформа "Кировец" также предоставляет удобный графический интерфейс для управления конфигурацией. Выполните следующие шаги:
- откройте веб-браузер и перейдите по адресу
/config
относительно базового URL вашей платформы. Например, если ваша платформа доступна поhttp://kirovets.local
, то UI конфигурации будет доступен поhttp://kirovets.local/config
- на странице пользовательского интерфейса вы увидите редактор конфигурации и кнопки управления
- нажмите кнопку
Upload
для выбора ранее созданного ZIP-архива с конфигурацией. После выбора нажмите кнопку подтверждения в окне выбора файла
Таблицы (schema
)¶
Каждая таблица описывается через поля, индексы и источники данных. Поддерживаются следующие опции:
fields
: определяет столбцы таблицы, их типы данных и дополнительные свойстваindexes
: определяют, по каким полям будут созданы индексы для ускорения поиска и фильтрации данныхsources
: список источников (ogg
,parquet
и другие)affinity
: определяет стратегию распределения данных таблицы по узлам кластераtriggers
: Lua-функции, выполняемые после применения измененийfilter
: Lua-функция, определяющая применение изменений
Пример:
schema:
table1:
fields:
- name: id
type: string
is_nullable: false
indexes:
pk:
fields:
- id
affinity:
- id
triggers:
- triggers.table1
filter: filters.table1
sources:
ogg:
table: TABLE1
Определение полей (fields)¶
Раздел fields
описывает структуру каждого столбца (поля) в вашей
таблице. Каждое поле должно иметь имя, тип данных и, при необходимости,
дополнительные свойства. Раздел представляет собой массив с объявлениями
полей. Поля хранятся в рядах таблицы в том порядке, в каком они
объявлены в конфигурации.
Объявление поля состоит из трех обязательных атрибутов:
name
— имя поляtype
— тип данных в полеis_nullable
— признак нулабельности поля. Если выставлено вtrue
, то в поле может быть записано значениеnull
. В противном случае в поле могут быть записаны данные только указанного типа
Список поддерживаемых типов полей:
unsigned
— целые неотрицательные числаstring
— строка любой длиныnumber
— число, в том числе и с дробной частьюinteger
— целые числа со знакомboolean
— булево значениеdecimal
— используется для хранения точных числовых значений с заданной точностью и масштабомuuid
— идентификатор UUIDdatetime
— дата и время в соответствии с RFC 3339
Также в конце каждого ряда существует сервисное поле apply_at
, которое имеет тип integer
.
Определение индексов (indexes)¶
Раздел indexes
— это словарь, где ключом выступает имя
индекса, а значением — его описание. Можно задать один или несколько
индексов. Один из индексов обязательно должен называться pk
. Этот
индекс будет первичным ключом таблицы. Поля, входящие в него, не могут
иметь признак нулабельности.
Объявление состоит из атрибута fields
— массива имен полей, входящих в
индекс. Порядок важен, см ниже.
Все запросы к данным в системе выполняются исключительно по индексам. При этом, допустимы запросы только по части полей,
входящих в индекс, но эти поля должны идти с начала индекса. Таким образом, порядок полей в запросе имеет значение.
Например, если индекс состоит из полей field1
, field2
, field3
, то можно выполнить запрос по field1
, либо по комбинации
field1
и field2
, но нельзя запросить данные только по field2
или
field3
.
Определение локализации по значениям полей (affinity)¶
Платформа "Кировец" обладает распределенным хранилищем данных. Это
значит, что каждый набор реплик хранит только свою часть данных.
Определение набора реплик, на которые попадут данные, производится путем
вычисления хеша от полей, указанных в поле affinity
. Правильное
использование affinity
позволяет оптимизировать производительность за
счет минимизации сетевого трафика при выполнении запросов. Когда вы
определяете affinity
по одному или нескольким полям, "Кировец"
гарантирует, что все записи, имеющие одинаковые значения в этих полях,
будут храниться на одном и том же узле кластера.
Поле представляет собой массив с именами полей.
При использовании affinity
не гарантируется уникальность первичного
ключа, если данные попали на разные узлы хранения.
Определение триггеров (triggers)¶
Параметр triggers
— это массив с именами функций. Каждая функция — в
том порядке, в каком она объявлена в конфигурации — будет выполнена
после изменения данных в таблице, к которой она привязана. Триггер
представляет собой lua-функцию, которая может менять данные в системе.
Имя функции в конфигурации формируется из имени файла в директории
datamarts
, где эта функция объявлена, и самого имени функции,
объединенных через точку. Подробнее о написании функций см ниже.
Определение фильтра (filter)¶
Объект filter
— это строка, содержащая имя Параметр-фильтра, которая
будет применена до изменения данных в таблице и которая позволяет эти
изменения отменить. Как и у триггера, имя функции-фильтра формируется из
имени файла, где она объявлена и имени самой функции, объединенных через
точку. Подробнее о написании функций см ниже.
Определение источников данных для таблицы (sources)¶
Параметр sources
определяет, откуда система будет загружать данные для
таблицы. Это словарь, где:
ключ
— имя импортера (должно соответствовать секцииimporters
),значение
— конфигурация этого импортера для текущей таблицы.
Поддерживаемые типы импортеров:
OGG-импортер
Используется для загрузки изменений данных через Oracle GoldenGate.
Доступные параметры:
table
(обязательный) — имя таблицы-источника в OGG.
Особенности работы:
OGG генерирует два типа файлов:
- DSV-файлы — содержат операции изменения данных (INSERT/UPDATE/DELETE),
- Control-файлы — перечисляют актуальные DSV-файлы (формат:
<TABLE_NAME>control
, гдеTABLE_NAME
— имя таблицы в источнике).
Пример конфигурации:
sources:
ogg:
table: TABLE1 # Имя control-файла будет TABLE1control
- Parquet-импортер
Загружает данные из файлов в формате Parquet.
Доступные параметры (в блоке files):
type
(обязательный) — тип источника:dir
— загрузка всех файлов из указанной директории,files
— загрузка конкретного списка файлов.
Для типа dir
доступен параметр:
dir
— путь к директории с Parquet-файлами.
Для типа files
доступен параметр:
files
— список путей к файлам.
Пример конфигурации:
# Загрузка всех файлов из директории
sources:
parquet:
files:
type: "dir"
dir: "dev/example_data/parquets/table1"
# Загрузка конкретных файлов
sources:
parquet:
files:
type: "files"
files:
- "path/to/file1.parquet"
- "path/to/file2.parquet"
Пользовательские фильтры¶
Пользовательский фильтр — это функция, написанная на языке Lua, которая предоставляет пользователю возможность определить критерии для применения операции модификации (создания, обновления и удаления) к текущим сохраненным данным. Другими словами, при помощи фильтра можно принять решение: применять ли операцию из источника данных или пропустить ее. Фильтр срабатывает только на изменения, пришедшие из источника данных, но не созданные триггером.
Конфигурация фильтра¶
Для использования пользовательских фильтров необходимо создать
Lua-модуль (например, sample.lua
) и разместить его в директории
datamarts
, расположенной в корневом каталоге конфигурации (например,
datamarts/sample.lua
). Модуль должен возвращать таблицу, содержащую
все доступные в нем функции. При этом, один модуль может содержать
функции, используемые как в качестве фильтра, так и в качестве триггера.
Однако, рекомендуется разделять фильтры и триггеры по разным модулям.
Пример модуля с пользовательскими фильтрами:
local function filter1(old, new)
-- Логика фильтрации
return true
end
local function filter2(old, new)
-- Логика фильтрации
return true
end
return {
filter1 = filter1,
filter2 = filter2,
}
Далее, фильтры необходимо указать в конфигурации системы. Для каждой
таблицы можно задать свой фильтр, для этого в секции schema.<имя
таблицы>.filter
укажите его в формате <имя модуля>.<имя фильтра>
.
Например:
schema:
TBL:
filter: sample.filter1
Сигнатура пользовательской функции¶
Пользовательская функция должна иметь следующую сигнатуру:
local function filter(old, new)
-- Логика фильтрации
end
Здесь:
old
— таблица, представляющая старый объект (данные, находящиеся в таблице)new
— таблица, представляющая новый объект (данные, поступающие для обработки)
Функция фильтрации должна возвращать одно из следующих значений:
true
— операция применяется к таблице, и транзакция завершаетсяfalse
— операция пропускается, и транзакция завершаетсяnil, error
— где error — это объект ошибки, который сериализуется в строку: строка или объект библиотекиerrors
В таком случае операция пропускается, а ошибка выводится в журнал
Особенности разработки логики фильтрации:
Логика фильтрации должна учитывать наличие трех видов операций: update
, create
и delete
.
-
Операция
update
Только в этом случае оба объекта (old
иnew
) представлены непустыми таблицами, поскольку обновление предполагает наличие как исходных данных, так и данных после изменения. -
Операция
create
В случае операцииcreate
старый объект (old
) представлен пустой таблицей, так как данные до создания записи отсутствуют. -
Операция
delete
В случае операцииdelete
новый объект (new
) представлен пустой таблицей, так как после удаления записи данных для объекта больше не существует.
Пример пользовательской функции, учитывающей все описанные случаи:
local function filter(old, new)
-- Обработка операции create: old — пустая таблица
if next(old) == nil then
return true -- Принимаем новый объект
end
-- Обработка операции delete: new — пустая таблица
if next(new) == nil then
return true -- Удаляем существующий объект
end
-- Основная логика фильтрации (например, сравнение временных меток)
if old.timestamp and new.timestamp and old.timestamp >= new.timestamp then
return false -- Запрещаем обновление объекта в базе
end
return true -- Разрешаем обновление объекта в базе
end
В данном случае фильтрация фактически применяется только к операции
update
, для других случаев операция всегда применяется.
Пользовательские триггеры¶
Пользовательский триггер — это Lua-функция, позволяющуая реагировать на изменения данных в связанной таблице и выполнять произвольные действия в ответ на эти изменения. Триггер срабатывает только на данные, которые попали в таблицу из источника данных, но не на записанные другим триггером. Триггер срабатывает после изменения и не может его отменить (только реагирует).
Конфигурация триггеров¶
Для использования пользовательских триггеров необходимо создать
Lua-модуль (например, sample.lua
) и разместить его в директории
datamarts
, расположенной в корне конфигурации (например,
datamarts/sample.lua
). Модуль должен возвращать таблицу, содержащую
все функции, доступные в модуле. При этом, один модуль может содержать
функции используемые как в качестве фильтра, так и в качестве триггера.
Однако, рекомендуется разделять фильтры и триггеры по разным модулям.
Пример модуля с пользовательскими триггерами:
local function trigger1(old, new, operation)
-- Логика триггера
return true
end
local function trigger2(old, new, operation)
-- Логика триггера
return true
end
return {
trigger1 = trigger1,
trigger2 = trigger2,
}
Далее, триггеры необходимо указать в конфигурации системы. Для каждой
таблицы можно задать несколько триггеров, для этого в секции
schema.<имя таблицы>.triggers
укажите их в формате <имя модуля>.<имя
фильтра>
:
schema:
TBL:
triggers:
- sample.trigger1
- sample.trigger2
Триггеры будут выполнены в той последовательности, в какой они указаны в конфигурации.
Сигнатура пользовательской функции¶
Пользовательская функция должна иметь следующую сигнатуру:
local function trigger(old, new, operation)
-- Логика фильтрации
end
Здесь:
old
— таблица, представляющая старый объект (данные, находившиеся в таблице до изменения).new
— таблица, представляющая новый объект (данные, находящиеся в таблице после изменения).operation
— текстовое наименование операции. Возможные значения:update
,create
иdelete
.
Функция-триггер может вернуть nil, error
, где error
— это объект,
который можно сериализовать в строку (например, строка или ошибка из
пакета errors
). В таком случае ошибка попадет в журнал, а следующий
триггер в цепочке (если он есть) будет выполнен. Любые другие
возвращаемые значения, кроме nil
, считаются успешным выполнением
триггера.
Особенности разработки логики триггера:
Логика триггера должна учитывать наличие трех видов операций: update
,
create
и delete
.
-
Операция
update
Только в этом случае оба объекта (old
иnew
) представлены непустыми таблицами, поскольку обновление предполагает наличие как исходных данных, так и данных после изменения. -
Операция
create
В случае операцииcreate
старый объект (old
) представлен пустой таблицей, так как данные до создания записи отсутствуют. -
Операция
delete
В случае операцииdelete
новый объект (new
) представлен пустой таблицей, так как после удаления записи данных для объекта больше не существует.
Пример пользовательской функции, учитывающей все описанные случаи:
local function update_destination(old, new, operation)
if operation == 'delete' then --- игнорируем операции delete
return true
end
--- Получаем данные из таблицы по идентификатору из нового объекта
local rv, err = sql.execute('select * from table_with_trigger_destination where user_id=?', {new.user_id});
if err then
--- Возвращаем ошибку в случае если select не удался. Ошибка будет отображена в журнале платформы.
return nil, err
end
--- select всегда возвращает массив данных, даже если там всего один объект
local user = rv[1]
if not user then
log.verbose("Object with id %s doesn't exist", new.user_id)
--- Создаем объект в таблице на основе нового объекта
return sql.execute(
"insert into table_with_trigger_destination (user_id, value, apply_at) values (?, ?, ?)",
{
new.user_id,
new.value,
--- Поле находится в каждой таблице и является обязательным.
--- Оно представляет собой время обновления объекта
math.floor(fiber.time())
}
)
end
log.verbose("Update user with id %s with value %s", new.user_id, new.value)
--- Обновляем существующий объект в таблице на основе нового объекта
return sql.execute("update table_with_trigger_destination set value = ?, apply_at = ? where user_id = ?", {
new.value,
math.floor(fiber.time()),
new.user_id,
})
end
Данная логика обеспечивает синхронизацию между таблицами: при изменении данных в основной таблице автоматически создается или обновляется соответствующий объект в зависимой таблице.
Импортеры (importers
)¶
Импортеры являются элементом платформы "Кировец" для загрузки данных из внешних источников в ваши таблицы. В разделе importers
файла config.yaml
вы определяете, откуда и как данные будут поступать в систему, а также настраиваете правила их обработки.
Каждый импортер имеет уникальное имя и содержит конфигурацию, зависящую от его типа. "Кировец" поддерживает два основных типа импортеров: пакетные импортеры для периодической загрузки файлов и потоковые импортеры для непрерывной обработки изменений из систем типа CDC (Change Data Capture).
- Пакетные (batch) импортеры:
- загружают статичный срез данных на определенный момент времени
- предоставляют полный "слепок" данных
- используются для первоначальной загрузки или полного обновления данных
- Потоковые (stream) импортеры:
- обеспечивают непрерывный поток изменений данных
- передают последовательность модификаций (добавления/обновления/удаления)
- используются для инкрементального обновления данных в реальном времени
Ключевое отличие: пакетные импортеры работают с фиксированным состоянием данных, тогда как потоковые обрабатывают изменения данных во времени.
Существуют следующие пакетные импортеры:
parquet
— импортирует данные из parquet-файлов
Существуют следующие потоковые импортеры:
ogg
: импортирует данные, предоставляемые Oracle Golden Gate через flat files
Секция importers
определяет, откуда платформа будет получать данные. Это словарь, где:
ключ
— имя импортера. Оно используется при настройке таблиц в атрибуте sources и при настройке задач в секции tasks.значение
— конфигурация этого импортера
OGG¶
Импортер потребляющий данные, формируемые Oracle Golden Gate в формате flat files. Импортер ожидает следующий вид операций в dsv-файлах:
- INSERT-операция все OLD-значения должна иметь в NULL (или вообще их не указывать), а для всех ненулабельных полей схемы
иметь ненулевое NEW-значение. Пример корректной строки:
"ROW_ID"|NULL|"NEW_ROW_ID_1"|"CREATED"|NULL|"1988-09-25 17:45:30.00"|"CURR_STG_ID"|NULL|"NEW_STG_ID"|
- UPDATE-операция должна содержать только первичный ключ и обновленные поля. Пример корректной строки:
"ROW_ID"|"ROW_ID_1"|"ROW_ID_1"|"CREATED"|"2000-09-25 17:45:30.00"|"1988-09-25 17:45:30.00"|"CURR_STG_ID"|"PREV_STG_ID"|"NEW_STG_ID"|
- DELETE-операция должна содержать
NULL
во всех NEW-значениях полей(в т.ч. для первичного ключа). Пример корректной строки:"ROW_ID"|"ROW_ID_1"|NULL|"CREATED"||NULL|"NEW_STG_ID"||NULL|
Рекомендации по настройке OGG для взаимодействия с импортером см в Приложении №1
Импортер из OGG поддерживает следующие атрибуты конфигурации:
type
(обязательный) — тип импортера данных. В данном случаеogg
file_dir
(обязательный) — путь до директории с control-файлами.ogg_home
— задает базовый путь, который объединяется с относительными путями из control-файла для формирования абсолютных путей к DSV-файлам. Если не задан, то равенfile_dir
. Используется в случае, если control-файлы и dsv-файлы находятся в разных директориях, при этом control-файл содержит относительные пути до файловmax_batch_size
(обязательный) — задает размер пакета с записями, которые будут отправлены из импортера в слой хранения за один раз
Пример:
importers:
ogg:
type: "ogg"
file_dir: dev/example_data/ogg
max_batch_size: 1000
Parquet¶
Импортер, потребляющий данные из файлов в формате parquet.
Импортер из parquet поддерживает следующие атрибуты конфигурации:
type
(обязательный) — тип импортера данны. В данном случаеparquet
Задачи (tasks
)¶
Раздел tasks
в вашем файле config.yaml
позволяет автоматизировать выполнение различных операций в платформе "Кировец".
Вы можете настроить запуск импортеров, выполнение пользовательских Lua-скриптов (пока не поддерживается) и другие фоновые
процессы по определенному расписанию или в непрерывном режиме.
Каждая задача имеет уникальное имя и содержит конфигурацию, определяющую ее тип, расписание и выполняемые действия. "Кировец" поддерживает три основных типа задач:
- Задачи по расписанию (scheduled): Запускаются в определенные моменты времени, используя Cron-выражения
- Непрерывные задачи (continuous): Запускаются постоянно, выполняясь последовательно с заданной задержкой между выполнениями
- Однократные задачи (once): Выполняются только один раз при загрузке конфигурации
Секция представляет собой словарь, в которой:
ключ
— имя задачи, которое используется при доступе к задаче через APIзначение
— конфигурация задачи
Конфигурация задачи состоит из следующих атрибутов:
exec
— описывает задачу, которую необходимо запускать по расписанию. Состоит из следующих атрибутов:kind
— тип задачи. На данный момент поддерживается только типimport
. Этот тип задачи позволяет запускать потребление данных пакетными импортерамиimporter
— наименование импортера, который должен совершать потребление данных по расписанию
schedule
— настройки расписания задачи. Состоит из следующих атрибутов:type
— тип расписания. Доступныcron
,continuous
иonce
. Для каждого типа доступен свой атрибут, задающие расписания. В зависимости от типа задачи запускаются следующим образом:cron
— в соответствии с расписанием, заданным cron-строкойcontinuous
— друг за другом через указанный промежуток времениonce
— один раз через небольшое время после загрузки конфигурации в платформу. Также может быть запущен через API
cron
— доступен и обязателен при указании типа расписанияcron
. Представляет собой cron-строку, начинающуюся с секунд. Полный формат выглядит следующим образом: секунды, минуты, часы, номер дня в текущем месяце, месяц, день недели, годinterval
— доступен и обязателен при указании типа расписанияcontinuous
. Представляет собой человекочитаемое количество времени, которое пройдет между окончанием предыдущего запуска задачи и началом следующего. Пример:1s
— для одной секунды,1m
— для одной минуты,1h30m
— для 90 минут
Пример:
tasks:
parquet_cron:
exec:
kind: import
importer: parquet1
schedule:
type: cron
schedule: "*/10 * * * * * *"
parquet_continuous:
exec:
kind: import
importer: parquet2
schedule:
type: continuous
interval: 1h40m
parquet_once:
exec:
kind: import
importer: parquet3
schedule:
type: once
Витрины данных (api
)¶
Секция API предназначена для описания витрин данных и конечных точек, по которым витрины данных доступны для пользователей. К каждой конечной точке должен быть задан обработчик запроса, который формирует необходимый пользователю срез данных. Обработчик может быть написан как на языке SQL и по сути представлять собой обычный параметризированный SQL-запрос, так и написанный на языке Lua.
Витрины возвращают 200 OK
и данные в случае успешного выполнения
запроса, и 500
во всех остальных случаях.
Секция представляет собой массив с описаниями витрин данных. Описание витрины данных состоит из следующих атрибутов:
path
(обязательно) — относительный URI конечной точки. Например:/v1/app/mydata
method
(обязательно) — HTTP-метод запроса. Доступны:GET
,POST
,PUT
иDELETE
handler
(обязательно) — настройки обработчика запроса. Представляет собой словарь, состоящую из следующих атрибутов:type
(обязательно) — тип обработчика витрины. Доступныsql
иlua
name
(обязательно) — имя обработчика запроса. Для типаlua
необходимо указать имя LUA-модуля, расположенного в директорииdatamarts
и предоставляющего требуемую функцию и имя самой функции, объединенные через точку. Например: если имя файла LUA-модуляdatamarts.lua
и он предоставляет функциюhandler
, то в атрибутеname
необходимо указатьdatamarts.handler
. Для типаsql
необходимо указать имя SQL-файла, находящегося в директорииdatamarts
. Например для файлаmy_sql_handler.sql
в атрибутеname
необходимо указатьmy_sql_handler
args
— словарь, в которой ключ — имя аргумента запроса, а значение — его описание. Аргументы запроса необходимо указывать для SQL-витрин, содержащих именованные аргументы в своем теле. Описание аргумента состоит из следующих атрибутов:type
(обязательно) — тип аргументаis_nullable
— если выставлено true, то аргумент не обязан присутствовать в запросе к витрине. Все аргументы могут быть необязательными
Пример конфигурации витрин:
api:
- path: /v1/app/lua
method: GET
handler:
type: lua
name: datamarts.get_destination
- path: /v1/app/relations
method: GET
handler:
type: sql
name: relations
args:
id: # имя аргумента
type: string # тип аргумента
is_nullable: false # признак обязательности аргумента. Если выставлено false, то аргумент — обязательный
apply_at:
type: decimal
is_nullable: true
SQL-обработчик для витрины¶
SQL-обработчик для витрины — это SQL-файл в директории datamarts
, который
предоставляет пользователю необходимый срез данных. Запрос может
содержать именованные переменные внутри директивы where
.
Пример файла relations.sql
с SQL-обработчиком витрины:
SELECT
t1.*,
t2.*
FROM
table1 t1
JOIN
table2 t2
ON
t1.id = t2.table1_id
WHERE
t1.id = %arg:id% and t1.apply_at = %arg:apply_at% -- именованные аргументы id и apply_at
Lua-обработчик для витрины¶
Lua-обработчик для витрины — это файл, написанный на Lua, в директории
datamarts
, который выставляет функцию обработчика.
Пример файла с lua-обработчиком витрины:
local errors = require('errors')
local json = require("json")
local RouterError = errors.new_class("RouterError")
return {
---Функция обрабатывает запрос от пользователя и возвращает
---объект с данными, которые будут возвращены пользователю
---@param request table Таблица с аргументами запроса.
---@return nil|table
---@return string|nil
get_destination = function(request)
local id = request["id"]
if not id then
return nil, RouterError:new("Query param id is mandatory")
end
local rv, err = sql.execute('select * from table_with_trigger_destination where user_id=?', {tonumber(id)});
if err then
return nil, RouterError:new("Failed to execute sql query: %s", err)
end
if not rv then
return nil, RouterError:new("Dest with id %s doesn't exist", id)
end
-- MUST RETURN JSON IN DATAMARTS
return json.encode(rv)
end
}
Примеры запросов к витринам данных¶
Запрос с аргументами:
curl http://kirovets.loc/v1/app/relations?id=id_35
HTTP/1.1 200 Ok
Connection: keep-alive
Content-length: 213
Content-type: application/json; charset=utf8
Server: Tarantool http (tarantool v2.11.5-232-g0acc1353f)
[
{
"apply_at": 1752601680,
"created": 1640928856,
"curr_id": 43,
"description": "Description for row 35",
"id": 2375,
"instance_id": 655,
"last_upd": "2024-07-04 15:59:27",
"row_id": "row_35",
"sum": "5802.60",
"table1_id": "id_35"
}
]
Настройки авторизации через внешний SSO (Keycloak)¶
Платформа позволяет включить внешнюю авторизацию для графического интерфейса и для конечных точек доступа к данным.
Cекция auth
конфигурации отвечает за настройки аутентификации через
OpenID Connect с использованием Keycloak в качестве провайдера
идентификации.
Параметры конфигурации:
issuer_url
(обязательный) — URL-адрес сервера аутентификации Keycloak, включающий realm (область). Формат:http(s)://<host>:<port>/realms/<realm_name>/
. Пример:http://localhost:8989/realms/kirovets_dev
. Должен соответствовать реально существующему realm в Keycloakredirect_url
(обязательный) — URL в вашем приложении, на который Keycloak будет перенаправлять пользователя после успешной аутентификации. Формат: полный URL, включая схему (http/https), хост и путь. Пример:http://localhost:8002/auth/user/callback
. Этот URL должен быть зарегистрирован в настройках клиента в Keycloakclient_id
(обязательный) — идентификатор клиента, зарегистрированного в Keycloak. Пример: "kirovets-dev". Должен точно соответствовать Client ID в настройках Keycloakclient_secret
(обязательный для confidential clients) — секретный ключ клиента, выданный Keycloak. Пример:QmM4gvSI4ENfZwBjzCdv6Dr4RxBxcDlM
Пример конфигурации:
auth:
issuer_url: "http://localhost:8989/realms/kirovets_dev"
redirect_url: "http://localhost:8002/auth/user/callback"
client_id: "kirovets-dev"
client_secret: "QmM4gvSI4ENfZwBjzCdv6Dr4RxBxcDlM"
Руководство по настройке Keycloak смотри в соответствующем приложении
Пользовательский интерфейс¶
Пользовательский интерфейс доступен по относительному пути /config
.
Например http://kirovets.loc/config. Он позволяет загрузить, скачать и
редактировать конфигурацию платформы. При открытии пользователем
страницы в редактор подгружается текущая версия конфигурации.
- кнопка
Upload
позволяет загрузить zip-архив с конфигурацией платформы - кнопка
Download
позволяет скачать zip-архив с конфигурацией. При этом скачивается та конфигурация, которая отображена в редакторе конфигурации, а не та, что фактически может быть на сервере в текущий момент - кнопка
Apply
позволяет применить конфигурацию из редактора к платформе
Развертывание платформы (deploy)¶
Деплой приложения kirovets производится из артефакта в виде tgz-архива при помощи плейбука Ansible.
Предварительные требования¶
- ОС Astra Linux 1.7 "Орел"
- ПО Ansible 2.14 установленное на машине, предназначенной для развертывания и администрирования приложения kirovets
Состав поставки¶
.
├── .roles каталог, содержащий роли Ansible, которые применяются при деплое.
│ └── picodata-ansible директория с ролью picodata-ansible: https://git.picodata.io/core/picodata-ansible.git.
│ роль предназначена для развертывания БД Picodata и плагинов для нее.
├── ansible.cfg настройки Ansible для текущего деплоя.
├── hosts.yaml файл с инвентарем Ansible. Его необходимо актуализировать перед деплоем.
├── packages директория, содержащая пакеты с БД Picodata
│ ├── picodata_25.2.2.0-1.7_amd64.deb
│ └── picodata-dbgsym_25.2.2.0-1.7_amd64.deb
├── plugin директория, содержащая плагины к БД Picodata
│ └── kirovets-ng-1.0.0.tar.gz
└── run.yml плейбук Ansible для деплоя
Первоначальное развертывание кластера БД Picodata¶
- В файле
hosts.yaml
в секцияхDC1
иDC2
необходимо указать актуальные адреса серверов - В файле
hosts.yaml
в секцииall.vars.tiers.storage.config.memtx.memory
указать объем памяти, необходимый для хранения данных на слое хранения - Запустить плейбук:
ansible-playbook -i hosts.yaml run.yml
Удаление кластера БД Picodata¶
ansible-playbook -i hosts.yaml run.yml -t remove
Обновление плагина или версии БД¶
- В файле
hosts.yaml
изменить пути до нового пакета с плагином или новых пакетов с БД - Удалить текущую установку:
ansible-playbook -i hosts.yaml run.yml -t remove
- Запустить установку новой версии:
ansible-playbook -i hosts.yaml run.yml
Мониторинг платформы¶
Мониторинг платформы осуществляется при помощи связки системы
визуализации телеметрии Grafana и системы агрегации телеметрии
Prometheus. Для получения телеметрии от платформы необходимо настроить
Prometheus на сбор телеметрии со всех узлов платформы. Каждый узел
предоставляет телеметрию по относительному URL /metrics
.
Для визуализации телеметрии предоставляется дашборд Grafana.
Приложение №1. Настройка OGG для взаимодействия с платформой "Кировец"¶
Настройка¶
-
На первом экстракторе в цепи нужно выставить следующие настройки:
GETUPDATEBEFORES UPDATERECORDFORMAT COMPACT
На pump-экстракторе выставляется:
```ggsci
GETUPDATEBEFORES
```
На OGG-экстракторе настройка должна выглядеть как-то так, важны параметры INCLUDEUPDATEBEFORES И GETUPDATEBEFORES:
```ggsci
EXTRACT ogg_extract
CUSEREXIT /u01/gg/flatfilewriter.so CUSEREXIT PASSTHRU, INCLUDEUPDATEBEFORES, PARAMS '/u01/gg/dirprm/ffwriter.properties'
GETUPDATEBEFORES
```
Эти настройки нужны, чтобы UPDATE-операции передавались в корректном формате(с предыдущими значениями).
-
В
ggsci
на стороне первого экстрактора введите команду следующего вида:ADD SCHEMATRANDATA <YOUR_SCHEMA_NAME>
.Через SQL-интерфейс базы Oracle введите запросы:
alter database add supplemental log data; alter database force logging;
-
В
*.properties
-файл на стороне OGG-импортера внесите следующие настройки:goldengate.flatfilewriter.writers=dsvwriter dsvwriter.mode=DSV dsvwriter.rawchars=false dsvwriter.includebefores=true dsvwriter.includecolnames=true dsvwriter.omitvalues=false dsvwriter.diffsonly=false dsvwriter.omitplaceholders=false dsvwriter.files.onepertable=true dsvwriter.files.data.ext=\_data.dsv dsvwriter.files.data.tmpext=\_data.dsv.temp dsvwriter.dsv.nullindicator.chars=NULL dsvwriter.dsv.fielddelim.chars=| dsvwriter.dsv.linedelim.chars=|\n dsvwriter.files.data.rollover.size=10000 writer.dsv.quotes.chars="
Проверка¶
После генерации DSV-файлов и до их передачи в платформу "Кировец" их стоит проверить на соответствие формату самостоятельно.
- INSERT-операция должна иметь все OLD-значения в NULL (или вообще их не указывать), а для всех ненулабельных полей
схемы иметь ненулевое NEW-значение. Пример корректной строки:
"ROW_ID"|NULL|"NEW_ROW_ID_1"|"CREATED"|NULL|"1988-09-25 17:45:30.00"|"CURR_STG_ID"|NULL|"NEW_STG_ID"|
- UPDATE-операция должна содержать только первичный ключ и обновленные поля. Пример корректной строки:
"ROW_ID"|"ROW_ID_1"|"ROW_ID_1"|"CREATED"|"2000-09-25 17:45:30.00"|"1988-09-25 17:45:30.00"|"CURR_STG_ID"|"PREV_STG_ID"|"NEW_STG_ID"|
- DELETE-операция должна содержать
NULL
во всех NEW-значениях полей (в т.ч. для первичного ключа). Пример корректной строки:"ROW_ID"|"ROW_ID_1"|NULL|"CREATED"||NULL|"NEW_STG_ID"||NULL|
Устранение неполадок¶
Негативным считается случай, когда UPDATE-операции приходят в OGG в виде:
"ROW_ID"|||"CREATED"|||"CREATED_BY"|||"CURR_STG_ID"|"1-00003"|"1-00004"|
Например, есть только обновленные поля и нет первичного ключа. Это в частности означает что не использована команда ADD SCHEMATRANDATA
.
Но может так случиться, что поля теряются где-то в середине цепочки
экспортеров. Это можно отладить через logdump
, см.
руководство
и
референс
к нему. Начиная с первого экспортера в цепи, нужно удостовериться что
trail сгенерирован при проблемном запросе, затем следует изучить его
содержимое через logdump
, включая при этом конфигурацию DETAIL DATA
.
Приложение №2. Настройка Keycloak для авторизации в "Кировце" через SSO¶
План настройки Keycloak:
- Создать реалм
- Создать клиента в реалме для "Кировца" — нужно включить
Standard Flow
, получитьClient Secret
вCredentials
- Создать пользователя (администратора "Кировца")" в реалме
- Задать пользователю пароль для аутентификации в UI (можно и любой другой способ аутентификации)
- Внести конфигурационные параметры в
auth
-секцию "Кировца" [Опционально]
Увеличить таймаутaccess_token
(для машинной аутентификации) иclient_session_max
(для пользовательской аутентификации)
Создание реалма¶
Создание клиента, связанного с инсталляцией "Кировца"¶
На финальном шаге создания клиента нужно указать "Valid redirect URIs" —
если поставить в "*", то будет возможно перенаправление на любые эндпоинты.
Пользователя всегда перенаправляет на redirect_url
, указанный в
конфигурации.
Необходимо получить Client Secret
, который в дальнейшем будет занесен в конфигурацию "Кировца":
Создание пользователя реалма¶
Пользователь — это, например, администратор. Пользователю нужно непосредственно проходить аутентификацию в UI согласно standard flow.
Необходимо задать пароль новому пользователю. Можно задать и иные формы credentials — показан самый тривиальный вариант с постоянным паролем, который задает администратор keycloak:
Внесение параметров в auth-секцию "Кировца"¶
Допустим, что Keycloak находится по адресу http://localhost:8989
, реалм назван kirovets_dev_2
, роутер доступен пользователю по адресу http://localhost:8081
, клиент для "Кировца" назван kirovets_dev_cluster
.
Получение client_secret
показано в финальной части пункта о клиенте.
auth:
issuer_url: "http://localhost:8989/realms/kirovets_dev_2"
redirect_url: "http://localhost:8081/auth/user/callback"
client_id: "kirovets_dev_cluster"
client_secret: "Myr1GE3l8iEdf6k9N0miyJS5Pli1Tdxk"
[опционально]
Увеличение таймаутов сессий¶
Таймауты сессий стоит увеличивать индивидуально для клиента — на вкладке Advanced
.
Access Token Lifespan
влияет на время жизни токена для машинной
аутентификации — чем больше это значение, тем реже необходимы действия
администратора сторонней системы.
Client Session Max
влияет на время жизни пользовательской сессии — чем
больше это значение, тем реже пользователю необходимо проходить
повторную аутентификацию при работе с "Кировцем".
Машинная аутентификация¶
Базовая настройка Keycloak и kirovets такая же, как у user-аутентификации.
Получить access_token
можно через service-аккаунт клиента в ручном режиме.
Пример с помощью httpie — используется
basic-аутентификация с логином client_id
и паролем client_secret
:
http -a kirovets_dev_cluster:Myr1GE3l8iEdf6k9N0miyJS5Pli1Tdxk -f POST http://localhost:8989/realms/kirovets_dev_2/protocol/openid-connect/token grant_type=client_credentials
Пример ответа:
{
"access_token": "eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICI4NzdPWS1LNlNqTHpsR0RqNFdOR0o1YlI1bXM1OVZLdlB6bURsT3EyVk1nIn0.eyJleHAiOjE3MjM3MTMyODIsImlhdCI6MTcyMzcxMjk4MiwianRpIjoiOTI2ZDFiNGItMzQ0YS00N2I5LWI0MzMtODBjOWFhYjI0ZTgwIiwiaXNzIjoiaHR0cDovL2xvY2FsaG9zdDo4OTg5L3JlYWxtcy9raXJvdmV0c19kZXZfMiIsImF1ZCI6ImFjY291bnQiLCJzdWIiOiI1NTUzMzFlNC1iZTI4LTQ5ODQtOTZmMS01OGIzMDQ0MGI2MGUiLCJ0eXAiOiJCZWFyZXIiLCJhenAiOiJraXJvdmV0c19kZXZfY2x1c3RlciIsImFjciI6IjEiLCJyZWFsbV9hY2Nlc3MiOnsicm9sZXMiOlsib2ZmbGluZV9hY2Nlc3MiLCJ1bWFfYXV0aG9yaXphdGlvbiIsImRlZmF1bHQtcm9sZXMta2lyb3ZldHNfZGV2XzIiXX0sInJlc291cmNlX2FjY2VzcyI6eyJhY2NvdW50Ijp7InJvbGVzIjpbIm1hbmFnZS1hY2NvdW50IiwibWFuYWdlLWFjY291bnQtbGlua3MiLCJ2aWV3LXByb2ZpbGUiXX19LCJzY29wZSI6ImVtYWlsIHByb2ZpbGUiLCJlbWFpbF92ZXJpZmllZCI6ZmFsc2UsImNsaWVudEhvc3QiOiIxNzIuMTguMC4xIiwicHJlZmVycmVkX3VzZXJuYW1lIjoic2VydmljZS1hY2NvdW50LWtpcm92ZXRzX2Rldl9jbHVzdGVyIiwiY2xpZW50QWRkcmVzcyI6IjE3Mi4xOC4wLjEiLCJjbGllbnRfaWQiOiJraXJvdmV0c19kZXZfY2x1c3RlciJ9.hGh8t7sP1TX8X0DIuLVtv1_fb_Pj0a5gXl59tjQ-YHWBekh2bcC2O9km_eP8jwl__C7F6g8KpdPpstNLKMmqUBHrTpSEM7btvKCwMv_FXXZZeVtGLF-9k769U0uPPvIjJCQBwY19MeugUNU4R7JPrq2JOJl6v61SpZsoEp-wyU50wgfvMkgJ3AS5fwLO1pJMAUQMM7qUfYnOILdA3mI6mSyf2Y9IMeMPUpy6Qx4Nt3h0dx4RXH8pFBw8Bwmua30hXPw5bh1WOjR4vFwvnd_hP9mnUDObZQuFhCl2JJ9Yylsh0eJXBLw-g6PF1SSJ6bzSbNAIt50i9dFGMSEo4nE1qw",
"expires_in": 300,
"not-before-policy": 0,
"refresh_expires_in": 0,
"scope": "email profile",
"token_type": "Bearer"
}
Этот access_token
необходимо передать в заголовке Authorization
в
форме Bearer <access_token>
при обращении к эндпоинтам "Кировца".
Пример обращения стороннего сервиса к "Кировцу" используя выданный токен:
http http://localhost:8081/v1/application/crud\?x_cdi_id\=5 'Authorization:Bearer ey...qw'
Получение access_token через сервисный аккаунт подробно описано в документации Keycloak.
Приложение №3. Пример конфигурации "Кировца"¶
config.yaml
---
schema: # секция, задающая схему данных. Согласно ей в системе будут созданы таблицы.
table1:
fields: # секция, описывающая поля таблицы
- name: id # имя поля
type: string # тип поля
is_nullable: false # если значение false, то поле является обязательным.
- name: row_id
type: string
is_nullable: false
- name: sum
type: decimal
is_nullable: true
- name: curr_id
type: unsigned
is_nullable: true
- name: instance_id
type: integer
is_nullable: true
- name: last_upd
type: string
is_nullable: true
- name: created
type: unsigned
is_nullable: true
indexes: # секция, содержащая описание индексов
pk: # имя индекса
fields: # список полей таблицы, входящих в индекс
- id
by_org_id:
unique: false # признак уникальности индекса. Если выставлено false, то данные в индексе могут быть неуникальными
fields:
- curr_id
sources: # источники данных для таблицы
ogg: # имя источника. Должно совпадать с объявлением источника в секции importers
table: TABLE1 # из какой таблицы, выгруженной через OGG, будут импортированы данные
parquet:
files: # откуда будут получены parquet-файлы для импорта данных
type: "dir" # все файлы из указанной директории
dir: dev/example_data/parquets/table1 # путь до директории с файлами
affinity: # задает локальность данных. Все данные, значения указанного поля или полей
# которых равны, будут помещены на один и тот же узел хранения.
# Если в качестве поля для локальности не используется первичный индекс, то
# гарантия уникальности для первичного индекса распространяется только на
# один узел хранения.
- id
table2:
fields:
- name: id
type: unsigned
is_nullable: false
- name: table1_id
type: string
is_nullable: false
- name: description
type: string
is_nullable: true
indexes:
pk:
fields:
- id
table1_rel:
fields:
- table1_id
sources:
ogg:
table: TABLE2
parquet:
files:
type: "dir"
dir: dev/example_data/parquets/table2
affinity:
- table1_id
table_with_trigger_source:
fields:
- name: id
type: unsigned
is_nullable: false
- name: user_id
type: unsigned
is_nullable: false
- name: value
type: unsigned
is_nullable: false
indexes:
pk:
fields:
- id
affinity:
- user_id
sources:
ogg:
table: TABLE_WITH_TRIGGER_SOURCE
triggers: # список имен функций, которые будут применены к каждой измененной, добавленной
# или удаленной записи. Указываются в формате <file_name>.<func_name>.
- triggers.update_destination
table_with_trigger_destination:
fields:
- name: user_id
type: unsigned
is_nullable: false
- name: value
type: unsigned
is_nullable: false
indexes:
pk:
fields:
- user_id
affinity:
- user_id
table_with_filter:
fields:
- name: id
type: unsigned
is_nullable: false
- name: value
type: unsigned
is_nullable: false
- name: updated_at
type: unsigned
is_nullable: false
indexes:
pk:
fields:
- id
affinity:
- id
sources:
ogg:
table: TABLE_WITH_FILTER
filter: filters.filter_by_updated_at # имя функции, которая будет применена для каждой записи перед ее изменением,
# удалением или добавлением. Указывается в формате <file_name>.<func_name>.
# При помощи функции можно решить, применять ли изменение данных.
importers: # секция, описывающая импортеры данных
ogg: # наименование импортера. Используется в секциях sources таблиц и в расписании задач
type: "ogg" # тип импортера. На данный момент существует поддержка
# Oracle Golden Gate через Flat Files Adapter и файлов в формате parquet.
file_dir: dev/example_data/ogg # директория, в которой находятся control- и dsv-файлы, выгруженные через OGG
max_batch_size: 1000 # управляет размером пакетов записей, которые отправляются
# из сервиса importer в слой хранения
parquet:
type: "parquet"
tasks: # секция содержит расписание выполнения задач
parquet: # наименование задачи
exec:
kind: import # тип импортера. На данный момент поддерживается только тип import
importer: parquet # наименование импортера. Должно совпадать с наименованием в секции importers
schedule:
type: cron # тип расписания. Поддерживаются три типа:
# cron — расписание в виде cron
# continuous — задача выполняется через указанное количество времени после завершения предыдущей
# once — задача выполняется один раз после применения конфигурации
schedule: "*/10 * * * * * *" # строка в формате cron с расписанием.
# состоит из следующих полей: sec, min, hour, day of month, month, day of week, year
api: # список конечных точек API, доступных пользователям
- path: /v1/app/lua # путь конечной точки. Может быть произвольным за исключением
# нескольких зарезервированных путей.
method: GET # HTTP-метод, которым можно вызвать витрину через конечную точку.
handler:
type: lua # тип выставляемой функции. Доступны lua и sql
name: datamarts.get_destination # имя выставляемой функции. Для lua-функций указывается в формате <file_name>.<func_name>,
# где <file_name> — имя файлв, расположенного в каталоге datamarts и содержащего код выставляемой функции <func_name>.
# (см. пример файоа datamarts.lua ниже)
- path: /v1/app/relations
method: GET
handler:
type: sql
name: relations # имя файла, расположенного в директории datamarts в формате <name>.sql.
# В этом файле должен присутствовать SQL-запрос (см. пример файла relations.sql ниже).
args: # список аргументов. В запросе используется специальное обозначение
# для переменных. Переменные задаются при запросе через конечную точку
# и подставляются в запрос.
id: # имя аргумента
type: string # тип аргумента
is_nullable: false # признак обязательности аргумента. Если выставлено false, то аргумент — обязательный
apply_at: # Имя аргумента
type: decimal
is_nullable: true
# Example AUTH section
#auth:
# issuer_url: "http://localhost:8989/realms/kirovets_dev"
# redirect_url: "http://localhost:8002/auth/user/callback"
# client_id: "kirovets-dev"
# client_secret: "QmM4gvSI4ENfZwBjzCdv6Dr4RxBxcDlM"
...
datamarts/datamarts.lua
local errors = require('errors')
local json = require("json")
local RouterError = errors.new_class("RouterError")
return {
---Функция обрабатывает запрос от пользователя и возвращает
---объект с данными, которые будут возвращены пользователю
---@param request table Таблица с аргументами запроса.
---@return nil|table
---@return string|nil
get_destination = function(request)
local id = request["id"]
if not id then
return nil, RouterError:new("Query param id is mandatory")
end
local rv, err = sql.execute('select * from table_with_trigger_destination where user_id=?', {tonumber(id)});
if err then
return nil, RouterError:new("Failed to execute sql query: %s", err)
end
if not rv then
return nil, RouterError:new("Dest with id %s doesn't exist", id)
end
-- MUST RETURN JSON IN DATAMARTS
return json.encode(rv)
end
}
datamarts/filters.lua
local log = require('log')
return {
---Функция принимает решение о применении операции пришедшей
---из источника данных. Не влияет на операции, происходящие
---через механизм триггеров. Если функция возвращает true,
---то изменения применяются, если false — то нет.
---@param old table Объект с текущими данными. Данные можно получать по именам полей.
---@param new table Объект с данными, которые будут применены.
---@return boolean
filter_by_updated_at = function(old, new)
if old.id == nil then
return true
end
if old.updated_at >= new.updated_at then
log.verbose("Skip update operation for id %s", old.id)
return false
end
return true
end
}
datamarts/relations.sql
SELECT
t1.*,
t2.*
FROM
table1 t1
JOIN
table2 t2
ON
t1.id = t2.table1_id
WHERE
t1.id = %arg:id% and t1.apply_at = %arg:apply_at%
datamarts/triggers.lua
local log = require('log')
local fiber = require('fiber')
return {
---Функция может выполнять любые действия, в том числе и запись
---данных в другие таблицы.
---@param old table Кортеж, содержащий значения полей старого объекта. Будет равняться null при операциях вставки
---@param new table Кортеж, содержащий значения полей нового объекта. Будет равняться null при операциях удаления
---@param operation string Наименование операциии: create/update/delete
---@return boolean|nil error
update_destination = function(old, new, operation)
log.error({old, new, operation})
if operation == 'delete' then
return true
end
local rv, err = sql.execute('select * from table_with_trigger_destination where user_id=?', {new.user_id});
if err then
return nil, err
end
local user = rv[1]
if not user then
log.verbose("Object with id %s doesn't exist", new.user_id)
return sql.execute(
"insert into table_with_trigger_destination (user_id, value, apply_at) values (?, ?, ?)",
{
new.user_id,
new.value,
--- Поле находится в каждой таблице и является обязательным.
--- Оно представляет собой время обновления объекта
math.floor(fiber.time())
}
)
end
log.verbose("Update user with id %s with value %s", new.user_id, new.value)
return sql.execute("update table_with_trigger_destination set value = ?, apply_at = ? where user_id = ?", {
new.value,
math.floor(fiber.time()),
new.user_id,
})
end
}