Команда CONSUME
- Таблица свойств потребляемого сообщения RabbitMQ
- Пример кода DaJet Script
- Обработка пользовательских заголовков
Поведение команды CONSUME, реализующей потребление сообщений RabbitMQ, с точки зрения концепции потоковой обработки данных DaJet Script аналогично поведению команды CONSUME для баз данных.
CONSUME 'amqp://<username>:<password>@<server>:<port>/<virtual-host>'
WITH <options>
INTO <variable>
<username> - имя пользователя
<password> - пароль пользователя
<server> - адрес сервера RabbitMQ
<port> - порт сервера RabbitMQ
<virtual-host> - виртуальный хост сервера RabbitMQ
<options> - параметры команды CONSUME (смотри ниже)
Строка подключения к брокеру RabbitMQ указывается в формате URL, следовательно, все специфические символы, например в пароле пользователя, должны быть указаны в URL-кодировке.
Таблица параметров команды CONSUME
Свойство | Тип данных | Описание |
---|---|---|
QueueName | string | Наименование очереди для получения сообщений |
Heartbeat | number | Период проверки в секундах наличия подключения к RabbitMQ. В случае его потери - автоматическое восстановление. Кроме этого данное значение используется для периодического вывода количества обработанных (успешно полученных) сообщений в журнал утилиты dajet. |
PrefetchSize | number | Размер клиентского буфера в байтах. Значение по умолчанию: неограниченно. |
PrefetchCount | number | Количество сообщений, которые могут быть отправлены сервером без подтверждения. Значение по умолчанию: 1. Документация RabbitMQ Значение 1 выбрано DaJet Script для обеспечения гарантий доставки at-least-once-in-order как наиболее простой и достаточный в большинстве случаев вариант, в том числе с точки зрения производительности. |
На заметку: хорошая статья, объясняющая основные принципы оптимизации потребления сообщений при помощи параметра PrefetchCount.
Таблица свойств потребляемого сообщения RabbitMQ
Свойство | Тип данных | Описание |
---|---|---|
AppId | string | Наименование отправителя |
MessageId | string | Идентификатор сообщения |
Type | string | Тип сообщения |
Body | string | Тело сообщения: DaJet Script ориентируется на текстовые сообщения в кодировке UTF-8. |
ContentType | string | Тип содержимого тела сообщения. Значение по умолчанию: application/json |
ContentEncoding | string | Формат (кодировка) тела сообщения. Значение по умолчанию: UTF-8 |
ReplyTo | string | Адресат для обратной связи, определяемый логикой приложения. |
CorrelationId | string | Идентификатор корреляции сообщений между собой, определяемый логикой приложения. |
Headers | object | Пользовательские заголовки (смотри документацию ниже) |
Пример кода DaJet Script
DECLARE @message object -- Сообщение RabbitMQ
CONSUME 'amqp://guest:guest@localhost:5672/dajet'
WITH QueueName = 'test-queue', Heartbeat = 10
INTO @message
USE 'mssql://server/database'
INSERT РегистрСведений.ВходящиеСообщения
SELECT НомерСообщения = VECTOR('so_incoming_queue')
, Отправитель = @message.AppId
, ТипСообщения = @message.Type
, ТелоСообщения = @message.Body
END
Обработка пользовательских заголовков
Команда CONSUME получает пользовательские заголовки сообщения RabbitMQ в виде значения типа object
. В ниже следующем примере заголовки сообщения сериализуются в формат JSON и сохраняются в ресурс “Заголовки” регистра сведений “ВходящиеСообщения”.
DECLARE @message object -- Сообщение RabbitMQ
DECLARE @headers string -- Заголовки в формате JSON
CONSUME 'amqp://guest:guest@localhost:5672/dajet'
WITH QueueName = 'test-queue', Heartbeat = 10
INTO @message
IF @message.Headers = NULL
THEN SET @headers = 'нет заголовков'
ELSE SET @headers = JSON(@message.Headers)
END
USE 'mssql://server/database'
INSERT РегистрСведений.ВходящиеСообщения
SELECT НомерСообщения = VECTOR('so_incoming_queue')
, Отправитель = @message.AppId
, Заголовки = @headers
, ТипСообщения = @message.Type
, ТелоСообщения = @message.Body
END