Apache Kafka
DaJet Script реализует работу с топиками Apache Kafka при помощи двух команд: CONSUME и PRODUCE. Концепция их работы аналогична одноимённым командам RabbitMQ.
Внимание! Специальной строки подключения не используется! Указывается только идентификатор адаптера: kafka.
Реализация DaJet Script для работы с Apache Kafka имеет самый простой из возможных вариантов. Предполагается работа только с текстовыми значениями в кодировке UTF-8
. Работа с protobuf
или иными бинарными форматами не реализована по причине малого спроса (обращений) со стороны пользователей DaJet.
Как обычно гарантируется доставка сообщений на уровне at-least-once-in-order (минимум один раз строго по порядку). Дублирование (повторную отправку) сообщений DaJet Script не контролирует. Для этого следует использовать, либо логику приложения, либо возможности самого брокера Apache Kafka.
Ниже следующие скрипты DaJet Script демонстрируют основные сценарии использования адаптера Apache Kafka при помощи соответствующих команд CONSUME и PRODUCE, используя возможные, но не обязательные для всех случаев, варианты настройки.
Настройку адаптера для конкретного сценария следует выполнять согласно официальной документации:
Таблица свойств записей (сообщений) Apache Kafka
Свойство | Тип данных | Описание |
---|---|---|
Key | string | Ключ записи |
Value | string | Тело записи |
Topic | string | Наименование топика |
Важно! Предполагается использование текстовых значений в кодировке
UTF-8
.
Отправка сообщений в топик Apache Kafka
DECLARE @changes object -- Запись таблицы регистрации изменений
DECLARE @message object -- Данные элемента справочника "Номенклатура"
-- ********************************************************************************
-- * Источник сообщений, таблица регистрации изменений справочника "Номенклатура" *
-- ********************************************************************************
USE 'mssql://server/database'
DECLARE @Получатель entity = SELECT Ссылка
FROM ПланОбмена.ПланОбменаРИБ
WHERE Код = 'KAFKA'
AND ПометкаУдаления = false
CONSUME TOP 1000 Ссылка INTO @changes
FROM Справочник.Номенклатура.Изменения -- Таблица регистрации изменений
WHERE УзелОбмена = @Получатель -- плана обмена 1С:Предприятие 8
ORDER BY Ссылка ASC
SELECT Ссылка = UUIDOF(Ссылка)
, Код = LTRIM(RTRIM(Данные.Код))
, Наименование = Данные.Наименование
INTO @message
FROM Справочник.Номенклатура
WHERE Ссылка = @changes.Ссылка
-- *******************************************************
-- * Приёмник сообщений Apache Kafka, топик "test-topic" *
-- *******************************************************
PRODUCE 'kafka'
WITH acks = 'all'
, client.id = 'dajet'
, max.in.flight = 1
, bootstrap.servers = '192.168.238.182:9092' -- (csv)
, message.timeout.ms = 30000
, enable.idempotence = false
SELECT Key = 'Справочник.Номенклатура' -- Ключ записи
, Value = JSON(@message) -- Тело сообщения
, Topic = 'test-topic' -- Топик Apache Kafka
END -- Контекст базы данных источника
Получение сообщений из топика Apache Kafka
DECLARE @message object -- Запись (сообщение) Apache Kafka
-- *****************************************************
-- * Источник сообщений Apache Kafka, топик test-topic *
-- *****************************************************
CONSUME 'kafka'
WITH topic = 'test-topic'
, group.id = 'dajet'
, client.id = 'dajet'
, bootstrap.servers = '192.168.239.177:9092' -- csv
, enable.auto.commit = false
, auto.offset.reset = 'earliest'
, session.timeout.ms = 60000
, heartbeat.interval.ms = 20000
INTO @message
-- ************************************************************************
-- * Приёмник сообщений SQL Server - регистр сведений "ВходящиеСообщения" *
-- ************************************************************************
USE 'mssql://server/database'
INSERT РегистрСведений.ВходящиеСообщения
SELECT НомерСообщения = VECTOR('so_incoming_queue')
, ОтметкаВремени = NOW()
, Отправитель = @message.Key
, ТипСообщения = @message.Topic
, ТелоСообщения = @message.Value
END -- Контекст базы данных приёмника