Создание плагина¶
В данном разделе приведено руководство для написания плагинов к Picodata с помощью команд языка SQL.
Возможности плагинов¶
С помощью плагинов разработчик может добавить практически любую дополнительную функциональность к распределенной СУБД Picodata. Условием выступает лишь написание валидного Rust-кода. В свою очередь, Picodata предоставляет API плагинов — фреймворк для создания распределенных приложений, поддерживающих работу во всем кластере СУБД.
Технически, реализация плагина — это написание набора callback'ов, которые представлены трейтом Service. Реализовав этот трейт, разработчик получает "строительные кирпичики" плагина — сервисы. Их стоит рассматривать как классические web-микросервисы, из которых можно построить законченную систему.
Пример разработки тестового плагина¶
Функции тестового плагина¶
В данном руководстве мы попробуем создать плагин, который будет использовать Picodata в роли кэширующей БД, хранящей данные о погоде. Мы предоставим HTTP API, который позволит обрабатывать запросы к публичному сервису OpenWeather, получая от него текущую температуру по географическим координатам. При этом температуру по заданным координатам мы будем кэшировать и сохранять в базе данных и, если к нам придет еще один запрос с такими же координатами, мы не будем совершать еще один запрос к OpenWeather, а отдадим кешированное значение. Для упрощения нашего примера мы не будем инвалидировать кеш.
Разработка компонентов плагина¶
Сервис¶
Основой плагина является сервис, с которого мы и начнем. Сервис для плагина необходимо написать на Rust, поэтому первым делом инициализируем крейт:
mkdir weather_cache && cd weather_cache
cargo init --lib
Обратите внимание на флаг --lib
— мы будем собирать библиотеку вида
*.so
или *.dylib
, а значит и крейт сразу нужно инициализировать
соответствующим образом. Также сразу добавим строки для сборки
*.so
-файла в Cargo.toml
:
[lib]
crate-type = ["lib", "cdylib"]
Далее добавим наш SDK в проект:
cargo add picodata_plugin
Выполняемая сервисом логика будет находиться в файле
src/lib.rs
:
use picodata_plugin::plugin::prelude::*;
struct WeatherService;
impl Service for WeatherService {
type Config = ();
fn on_config_change(
&mut self,
_ctx: &PicoContext,
_new_cfg: Self::Config,
_old_cfg: Self::Config,
) -> CallbackResult<()> {
println!("I got a new config: {_new_cfg:?}");
Ok(())
}
fn on_start(&mut self, _ctx: &PicoContext, _cfg: Self::Config) -> CallbackResult<()> {
println!("I started with config: {_cfg:?}");
Ok(())
}
fn on_stop(&mut self, _ctx: &PicoContext) -> CallbackResult<()> {
println!("I stopped with config");
Ok(())
}
/// Called after replicaset master is changed
fn on_leader_change(&mut self, _ctx: &PicoContext) -> CallbackResult<()> {
println!("Leader has changed!");
Ok(())
}
}
impl WeatherService {
pub fn new() -> Self {
WeatherService {}
}
}
#[service_registrar]
pub fn service_registrar(reg: &mut ServiceRegistry) {
reg.add("weather_service", "0.1.0", WeatherService::new);
}
Посмотрим на этот код подробнее, а именно на реализацию трейта Service
.
Первое на что нужно обратить внимание — это тип Config
. Он позволит
описать конфигурацию сервиса — например, адреса внешних узлов или
значения таймаутов. Мы пока не хотим настраивать кэширование,
поэтому определим тип пустым.
Перейдем к функциям сервиса, которые необходимо реализовать:
-
сервис начинает свою жизнь с вызова
on_start
. Эта функция будет вызвана каждым узлом, где включается сервис, или при вводе нового узла, где он должен быть запущен. При изменении конфигурации плагина будет вызвана функцияon_config_change
, а старая и новые конфигурации будут переданы в качестве параметров. -
о смене лидера в репликасете информирует вызов функции
on_leader_change
, что позволяет реагировать и запускать / останавливать фоновые операции. Корректное завершение работы (gracefull shutdown
) мы можем отработать в функцииon_stop
.
Пока что мы оставим тут заглушки и проверим, что наш минимальный плагин загружается и пишет что-то в журнал.
Манифест¶
Теперь нам необходимо как-то описать для Picodata правила загрузки
плагина. Это делается при помощи манифеста — файла, который
описывает составляющие части плагина и предоставляет Picodata необходимую
для установки и запуска метаинформацию. Можно представить, что это
аналог cargo.toml
в пакетном менеджере Cargo
или package.json
в
npm
.
Создадим файл манифеста:
# Имя плагина
name: weather_cache
# Описание плагина. Это метаданные, которые сейчас не используются Picdata, но могут администратору системы при установке или изучении установленных в кластер плагинов
description: That one is created as an example of Picodata's plugin
# Версия плагина в формате semver. Picodata следит за установленными версиями плагинов и плагины с разными версиями — это разные объекты для Пикодаты
version: 0.1.0
# Список сервисов. Так как наш плагин не слишком сложный, нам хватит одного сервиса для реализации задуманного.
services:
# Имя сервиса
— name: weather_service
# Описание сервиса. Не используется внутри Picodata, но могут помочь администраторам системы
description: This service provides HTTP route for a throughput weather cache
# Конфигурация сервиса по умолчанию
default_configuration:
Пробный запуск¶
Соберем сервис:
cargo build
Теперь нужно правильно организовать размещение файлов плагина и
манифеста. Picodata выполняет поиск плагина в plugin_path с учетом его
имени и версии, используя следующую структуру пути:
<plugin-dir>/<plugin-name>/<plugin-version>
. Для нашего плагина это
будет выглядеть как <plugin-dir>/weather_cache/0.1.0
.
mkdir -p build/weather_cache/0.1.0
cp target/debug/libweather_cache.so build/weather_cache/0.1.0
cp manifest.yaml build/weather_cache/0.1.0
Должна получиться такая структура:
build
└── weather_cache
└── 0.1.0
├── libweather_cache.so
└── manifest.yaml
Теперь запустим Picodata с поддержкой плагинов и затем попробуем запустить наш плагин.
Запуск Picodata:
picodata run -l 127.0.0.1:3301 --advertise 127.0.0.1:3301 --peer 127.0.0.1:3301 --http-listen 127.0.0.1:8081 --data-dir i1 --plugin-dir build
Запуск плагина:
$ picodata admin i1/admin.sock
Connected to admin console by socket path "i1/admin.sock"
type '\help;' for interactive help
picodata> CREATE PLUGIN weather_cache 0.1.0;
1
picodata> ALTER PLUGIN weather_cache 0.1.0 ADD SERVICE weather_service TO TIER default;
1
picodata> ALTER PLUGIN weather_cache 0.1.0 ENABLE;
1
После этого в журнале Picodata появится строка о том, что наш плагин ожил и запустился.
I started with config: ()
Попробуем выключить и удалить плагин:
picodata> ALTER PLUGIN weather_cache 0.1.0 DISABLE;
1
picodata> DROP PLUGIN weather_cache 0.1.0;
1
Добавление миграций¶
Вернемся к задаче кэширования (наша цель — сохранять результаты запросов
к внешнему сервису). Так как Picodata — это, в первую очередь, СУБД, нам
стоит создать для этого таблицу. Система плагинов в Picodata позволяет
создавать необходимые служебные таблицы для каждого плагина при помощи
механизма миграций. Для этого надо написать SQL-команды,
которые необходимо выполнить при установке плагина, а также те, которые
необходимы для удаления этих таблиц (при удалении плагина). У нас
получится файл 0001_weather.sql
:
-- pico.UP
CREATE TABLE "weather" (
id UUID NOT NULL,
latitude NUMBER NOT NULL,
longitude NUMBER NOT NULL,
temperature NUMBER NOT NULL,
PRIMARY KEY (id)
)
USING memtx
DISTRIBUTED BY (latitude, longitude);
-- pico.DOWN
DROP TABLE "weather";
В этом файле есть специальные аннотации — -- pico.UP
и -- pico.DOWN
.
Именно они помечают, какие команды выполнять на установке (UP
) и
удалении (DOWN
). Теперь необходимо положить файл миграций в директорию
с плагином и добавить его в манифест, чтобы Picodata могла знать, откуда
его загрузить:
Примечание
Не забудьте также отредактировать манифест в plugin_path, а не только в репозитории
name: weather_cache
description: That one is created as an example of Picodata's plugin
version: 0.1.0
services:
— name: weather_service
description: This service provides HTTP route for a throughput weather cache
default_configuration:
openweather_timeout: 5
migration:
— 0001_weather.sql
Добавим файл миграций в plugin_path:
cp 0001_weather.sql build/weather_cache/0.1.0
Теперь еще раз установим плагин, но в этот раз также запустим добавленные миграции:
$ picodata admin i1/admin.sock
Connected to admin console by socket path "i1/admin.sock"
type '\help;' for interactive help
picodata> CREATE PLUGIN weather_cache 0.1.0;
1
picodata> ALTER PLUGIN weather_cache 0.1.0 ADD SERVICE weather_service TO TIER default;
1
picodata> ALTER PLUGIN weather_cache MIGRATE TO 0.1.0;
1
picodata> ALTER PLUGIN weather_cache 0.1.0 ENABLE;
1
Убедимся, что была создана таблица weather
:
picodata> SELECT * FROM weather;
+----+----------+-----------+-------------+
| id | latitude | longitude | temperature |
+=========================================+
+----+----------+-----------+-------------+
(0 rows)
Снова очистим кластер, но на этот раз также удалим созданные миграциями таблицы:
picodata> ALTER PLUGIN weather_cache 0.1.0 DISABLE;
1
picodata> DROP PLUGIN weather_cache 0.1.0 WITH DATA;
1
Убедимся в успешности DOWN
-миграции:
picodata> SELECT * FROM weather;
sbroad: table with name "weather" not found
Теперь попробуем поднять HTTP-сервер.
Чтобы не писать код для FFI между Lua и Rust, давайте возьмем
готовое решение — библиотеку shors
.
Это библиотека позволит нам создать HTTP-сервер в плагине,
инкапсулируя связывание двух разных языков внутри:
cargo add shors@0.12.1 --features picodata
И добавим код для инициализации HTTP endpoint, который вернет нам
Hello, World!
в callback on_start
:
fn on_start(&mut self, _ctx: &PicoContext, _cfg: Self::Config) -> CallbackResult<()> {
println!("I started with config: {_cfg:?}");
let endpoint = Builder::new()
.with_method("GET")
.with_path("/hello")
.build(
|_ctx: &mut Context, _: Request| -> Result<_, Box<dyn Error>> {
Ok("Hello, World!".to_string())
},
);
let s = server::Server::new();
s.register(Box::new(endpoint));
Ok(())
}
Далее добавим endpoint, который будет осуществлять запрос к Openweather.
Для этого мы воспользуемся еще одной библиотекой, которая предоставит
нам HTTP клиент — fibreq
. Мы не можем использовать популярные HTTP-клиенты,
например, reqwest
, из-за особенностей однопоточной среды выполнения Picodata —
необходимо использовать библиотеки со специальной реализацией
асинхронного ввода/вывода.
cargo add fibreq@0.1.8 --features picodata
Также нам понадобится serde
и serde-json
для сериализации JSON в HTTP-запросах:
cargo add serde
cargo add serde-json
Добавим запрос к OpenWeather в отдельном файле openweather.rs
:
use std::{error::Error, time::Duration};
use serde::Deserialize;
use serde::Serialize;
#[derive(Serialize, Deserialize, Clone, Debug)]
struct CurrentWeather {
temperature_2m: f64,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub(crate) struct WeatherInfo {
latitude: f64,
longitude: f64,
current: CurrentWeather,
}
static METEO_URL: once_cell::sync::Lazy<String> = once_cell::sync::Lazy::new(|| {
std::env::var("METEO_URL").unwrap_or(String::from("https://api.open-meteo.com"))
});
pub fn weather_request(latitude: f64, longitude: f64, request_timeout: u64) -> Result<WeatherInfo, Box<dyn Error>> {
let http_client = fibreq::ClientBuilder::new().build();
let http_req = http_client
.get(format!(
"{url}/v1/forecast?\
latitude={latitude}&\
longitude={longitude}&\
current=temperature_2m",
url = METEO_URL.as_str(),
latitude = latitude,
longitude = longitude
))?;
let mut http_resp = http_req.request_timeout(Duration::from_secs(request_timeout)).send()?;
let resp_body = http_resp.text()?;
let info = serde_json::from_str(&resp_body)?;
return Ok(info)
}
и изменим код нашего сервиса следующим образом:
fn on_start(&mut self, _ctx: &PicoContext, _cfg: Self::Config) -> CallbackResult<()> {
println!("I started with config: {_cfg:?}");
let hello_endpoint = Builder::new()
.with_method("GET")
.with_path("/hello")
.build(
|_ctx: &mut Context, _: Request| -> Result<_, Box<dyn Error>> {
Ok("Hello, World!".to_string())
},
);
#[derive(Serialize, Deserialize)]
pub struct WeatherReq {
latitude: i8,
longitude: i8,
}
let weather_endpoint = Builder::new()
.with_method("POST")
.with_path("/weather")
.build(
|_ctx: &mut Context, request: Request| -> Result<_, Box<dyn Error>> {
let req: WeatherReq = request.parse()?;
let res = openweather::weather_request(req.latitude, req.longitude, 3)?;
Ok(res)
},
);
let s = server::Server::new();
s.register(Box::new(hello_endpoint));
s.register(Box::new(weather_endpoint));
Ok(())
}
Запустим наш сервис и проверим его работоспособность следующим запросом:
curl --location '127.0.0.1:8081/weather' \
--header 'Content-Type: application/json' \
--data '{
"latitude": 55,
"longitude": 55
}'
Теперь необходимо добавить кеш. Здесь мы воспользуемся Picodata
как СУБД.
Для начала добавим структуру, которая хранится в БД:
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Weather {
latitude: f64,
longitude: f64,
temperature: f64,
}
Теперь напишем запрос извлечения ее из БД:
let SELECT_QUERY: &str = r#"
SELECT * FROM "weather"
WHERE
(latitude < (? + 0.5) AND latitude > (? - 0.5))
AND
(longitude < (? + 0.5) AND longitude > (? - 0.5));
"#;
let cached: Vec<Weather> = picodata_plugin::sql::query(&SELECT_QUERY)
.bind(latitude)
.bind(latitude)
.bind(longitude)
.bind(longitude)
.fetch::<Weather>()
.map_err(|err| format!("failed to retrieve data: {err}"))?;
let select_query: &str = r#"
SELECT * FROM "weather"
WHERE
(latitude < (? + 0.5) AND latitude > (? - 0.5))
AND
(longitude < (? + 0.5) AND longitude > (? - 0.5));
"#;
let res = picoplugin::sql::query(&select_query)
.bind(latitude)
.bind(latitude)
.bind(longitude)
.bind(longitude)
.fetch::<StoredWeatherInfo>()
.unwrap();
Примечание
При запросе мы сравниваем широту и долготу с ограниченной точностью, так как это числа с плавающей точкой
Аналогично напишем запрос на вставку в кэш после получения данных:
let INSERT_QUERY: &str = r#"
INSERT INTO "weather"
VALUES(?, ?, ?)
"#;
let _ = picodata_plugin::sql::query(&INSERT_QUERY)
.bind(resp.latitude)
.bind(resp.longitude)
.bind(resp.temperature)
.execute()
.map_err(|err| format!("failed to retrieve data: {err}"))?;
После этого необходимо добавить только проверку — нашли ли мы необходимые данные
БД или необходимо запросить данные с OpenWeather. Наш on_start
будет выглядеть так:
fn on_start(&mut self, _ctx: &PicoContext, _cfg: Self::Config) -> CallbackResult<()> {
println!("I started with config: {_cfg:?}");
let hello_endpoint = Builder::new().with_method("GET").with_path("/hello").build(
|_ctx: &mut Context, _: Request| -> Result<_, Box<dyn Error>> {
Ok("Hello, World!".to_string())
},
);
#[derive(Serialize, Deserialize)]
pub struct WeatherReq {
latitude: f64,
longitude: f64,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Weather {
latitude: f64,
longitude: f64,
temperature: f64,
}
let weather_endpoint = Builder::new()
.with_method("POST")
.with_path("/weather")
.build(
|_ctx: &mut Context, request: Request| -> Result<_, Box<dyn Error>> {
let req: WeatherReq = request.parse()?;
let latitude = req.latitude;
let longitude = req.longitude;
let cached: Vec<Weather> = picodata_plugin::sql::query(&SELECT_QUERY)
.bind(latitude)
.bind(latitude)
.bind(longitude)
.bind(longitude)
.fetch::<Weather>()
.map_err(|err| format!("failed to retrieve data: {err}"))?;
if !cached.is_empty() {
let resp = cached[0].clone();
return Ok(resp);
}
let openweather_resp =
openweather::weather_request(req.latitude, req.longitude, 3)?;
let resp: Weather = Weather {
latitude: openweather_resp.latitude,
longitude: openweather_resp.longitude,
temperature: openweather_resp.current.temperature_2m,
};
let _ = picodata_plugin::sql::query(&INSERT_QUERY)
.bind(resp.latitude)
.bind(resp.longitude)
.bind(resp.temperature)
.execute()
.map_err(|err| format!("failed to retrieve data: {err}"))?;
Ok(resp)
},
);
let s = server::Server::new();
s.register(Box::new(hello_endpoint));
s.register(Box::new(weather_endpoint));
Ok(())
}
Разработка тестового плагина завершена. Его исходный код можно найти на нашем Gitlab.
См. также: