Синхронизация потока сообщений
В некоторых случаях может потребоваться обрабатывать сигнал синхронизации, который генерирует команда CONSUME перед фиксацией транзакции обработки пакета своих сообщений (записей СУБД). Количество сообщений в этом пакете указывается в предложении TOP команды CONSUME. Такой сигнал генерируют также команды STREAM и потоковый UPDATE после полного своего выполнения, то есть выдачи последнего сообщения.
Для обработки сигнала синхронизации можно использовать опцию SYNC команды EXECUTE. Таким образом выполнение команды происходит только в случае получения сигнала. Функциональность полностью аналогична поведению синхронной версии команды EXECUTE.
EXECUTE SYNC 'file://<script>'
[DEFAULT 'file://<default-script>']
[WITH <parameters>]
[INTO <variable>]
Важно! Использование предложения INTO имеет смысл только в том случае, когда далее по конвейеру обработки сообщений есть команды, которые могут воспользоваться возвращаемым из команды EXECUTE SYNC значением. Дело в том, что не все команды реагируют на сигнал синхронизации. Большинство из них просто пропускают такой сигнал сквозь себя, то есть являются “прозрачными” по отношению к нему.
Данная версия SYNC команды EXECUTE может быть полезна, например, в тех случаях, когда нужно выполнить тот или иной алгоритм не для каждого сообщения в потоке, а только один раз для всего пакета.
Пример обработки сообщений по одному и пакетно
Ниже следующий набор скриптов демонстрирует обработку потока сообщений обычным способом - по одному за один раз, а также организацию пакетной обработки сообщений при помощи команды EXECUTE SYNC. Главный скрипт main.djs
использует для одиночной обработки сообщений скрипт message.djs
, а для пакетной - batch.djs
.
-- ************
-- * main.djs *
-- ************
DECLARE @index number
DECLARE @message object
DECLARE @Пакет array
DECLARE @Сообщение object
PRINT '[MAIN] НАЧАЛО'
-- Создаём буфер для формирования пакетов сообщений
SET @Пакет = ARRAY_CREATE()
USE 'mssql://server/database'
-- Создаём поток сообщений из базы данных
STREAM ТипСообщения, ТелоСообщения
INTO @message
FROM РегистрСведений.ИсходящиеСообщения
ORDER BY НомерСообщения ASC
-- Выполняем обычную потоковую обработку сообщения
EXECUTE 'file://message.djs'
WITH Сообщение = @message
-- Копируем сообщение из потока для формирования пакета
SET @Сообщение = SELECT Тип = @message.ТипСообщения
, Тело = @message.ТелоСообщения
-- Добавляем копию сообщения в новый пакет
SET @index = ARRAY_APPEND(@Пакет, @Сообщение)
-- Обрабатываем сигнал синхронизации: отправляем буфер на обработку
EXECUTE SYNC 'file://batch.djs'
WITH ПакетСообщений = @Пакет -- Сформированный пакет сообщений
END
PRINT '[MAIN] КОНЕЦ [' + ARRAY_COUNT(@Пакет) + ']'
-- ***************
-- * message.djs *
-- ***************
-- Входящий параметр: сообщение обмена
DECLARE @Сообщение object
-- Обрабатываем сообщение
PRINT '[FLOW] ' + JSON(@Сообщение)
-- *************
-- * batch.djs *
-- *************
-- Входящий параметр: буфер пакета сообщений
DECLARE @ПакетСообщений array
-- Вспомогательная переменная
DECLARE @index number
PRINT '[SYNC] НАЧАЛО [' + ARRAY_COUNT(@ПакетСообщений) + ']'
-- Выполняем обработку пакета сообщений
PRINT '[SYNC] ' + JSON(@ПакетСообщений)
-- ВАЖНО!
-- Очищаем переданный буфер для формирования следующего пакета
SET @index = ARRAY_CLEAR(@ПакетСообщений)
PRINT '[SYNC] КОНЕЦ [' + ARRAY_COUNT(@ПакетСообщений) + ']'
Результат выполнения скрипта main.djs
[2025-02-15 19:24:28] [MAIN] НАЧАЛО
[2025-02-15 19:24:28] [FLOW] {"ТипСообщения":"type 1","ТелоСообщения":"body 1"}
[2025-02-15 19:24:28] [FLOW] {"ТипСообщения":"type 2","ТелоСообщения":"body 2"}
[2025-02-15 19:24:28] [FLOW] {"ТипСообщения":"type 3","ТелоСообщения":"body 3"}
[2025-02-15 19:24:28] [SYNC] НАЧАЛО [3]
[2025-02-15 19:24:28] [SYNC] [{"Тип":"type 1","Тело":"body 1"},{"Тип":"type 2","Тело":"body 2"},{"Тип":"type 3","Тело":"body 3"}]
[2025-02-15 19:24:28] [SYNC] КОНЕЦ [0]
[2025-02-15 19:24:28] [MAIN] КОНЕЦ [0]