Skip to the content.

Назад

Команда CONSUME

Поведение команды CONSUME, реализующей потребление сообщений RabbitMQ, с точки зрения концепции потоковой обработки данных DaJet Script аналогично поведению команды CONSUME для баз данных.

CONSUME 'amqp://<username>:<password>@<server>:<port>/<virtual-host>'
   WITH <options>
   INTO <variable>

<username> - имя пользователя
<password> - пароль пользователя
<server> - адрес сервера RabbitMQ
<port> - порт сервера RabbitMQ
<virtual-host> - виртуальный хост сервера RabbitMQ
<options> - параметры команды CONSUME (смотри ниже)

Строка подключения к брокеру RabbitMQ указывается в формате URL, следовательно, все специфические символы, например в пароле пользователя, должны быть указаны в URL-кодировке.

Таблица параметров команды CONSUME

Свойство Тип данных Описание
QueueName string Наименование очереди для получения сообщений
Heartbeat number Период проверки в секундах наличия подключения к RabbitMQ. В случае его потери - автоматическое восстановление. Кроме этого данное значение используется для периодического вывода количества обработанных (успешно полученных) сообщений в журнал утилиты dajet.
PrefetchSize number Размер клиентского буфера в байтах.
Значение по умолчанию: неограниченно.
PrefetchCount number Количество сообщений, которые могут быть отправлены сервером без подтверждения.
Значение по умолчанию: 1. Документация RabbitMQ
Значение 1 выбрано DaJet Script для обеспечения гарантий доставки at-least-once-in-order как наиболее простой и достаточный в большинстве случаев вариант, в том числе с точки зрения производительности.

На заметку: хорошая статья, объясняющая основные принципы оптимизации потребления сообщений при помощи параметра PrefetchCount.

Наверх

Таблица свойств потребляемого сообщения RabbitMQ

Свойство Тип данных Описание
AppId string Наименование отправителя
MessageId string Идентификатор сообщения
Type string Тип сообщения
Body string Тело сообщения: DaJet Script ориентируется на текстовые сообщения в кодировке UTF-8.
ContentType string Тип содержимого тела сообщения.
Значение по умолчанию: application/json
ContentEncoding string Формат (кодировка) тела сообщения.
Значение по умолчанию: UTF-8
ReplyTo string Адресат для обратной связи, определяемый логикой приложения.
CorrelationId string Идентификатор корреляции сообщений между собой, определяемый логикой приложения.
Headers object Пользовательские заголовки (смотри документацию ниже)

Наверх

Пример кода DaJet Script

DECLARE @message object -- Сообщение RabbitMQ

CONSUME 'amqp://guest:guest@localhost:5672/dajet'
   WITH QueueName = 'test-queue', Heartbeat = 10
   INTO @message

USE 'mssql://server/database'

   INSERT РегистрСведений.ВходящиеСообщения
   SELECT НомерСообщения = VECTOR('so_incoming_queue')
        , Отправитель    = @message.AppId
        , ТипСообщения   = @message.Type
        , ТелоСообщения  = @message.Body

END

Наверх

Обработка пользовательских заголовков

Команда CONSUME получает пользовательские заголовки сообщения RabbitMQ в виде значения типа object. В ниже следующем примере заголовки сообщения сериализуются в формат JSON и сохраняются в ресурс “Заголовки” регистра сведений “ВходящиеСообщения”.

outgoing-queue-data

DECLARE @message object -- Сообщение RabbitMQ
DECLARE @headers string -- Заголовки в формате JSON

CONSUME 'amqp://guest:guest@localhost:5672/dajet'
   WITH QueueName = 'test-queue', Heartbeat = 10
   INTO @message

IF @message.Headers = NULL
   THEN SET @headers = 'нет заголовков'
   ELSE SET @headers = JSON(@message.Headers)
END

USE 'mssql://server/database'

   INSERT РегистрСведений.ВходящиеСообщения
   SELECT НомерСообщения = VECTOR('so_incoming_queue')
        , Отправитель    = @message.AppId
        , Заголовки      = @headers
        , ТипСообщения   = @message.Type
        , ТелоСообщения  = @message.Body

END

consume-message-headers

Наверх