Skip to the content.

Начало

Синхронизация потока сообщений

В некоторых случаях может потребоваться обрабатывать сигнал синхронизации, который генерирует команда CONSUME перед фиксацией транзакции обработки пакета своих сообщений (записей СУБД). Количество сообщений в этом пакете указывается в предложении TOP команды CONSUME. Такой сигнал генерируют также команды STREAM и потоковый UPDATE после полного своего выполнения, то есть выдачи последнего сообщения.

Для обработки сигнала синхронизации можно использовать опцию SYNC команды EXECUTE. Таким образом выполнение команды происходит только в случае получения сигнала. Функциональность полностью аналогична поведению синхронной версии команды EXECUTE.

EXECUTE SYNC 'file://<script>'
[DEFAULT 'file://<default-script>']
[WITH <parameters>]
[INTO <variable>]

Важно! Использование предложения INTO имеет смысл только в том случае, когда далее по конвейеру обработки сообщений есть команды, которые могут воспользоваться возвращаемым из команды EXECUTE SYNC значением. Дело в том, что не все команды реагируют на сигнал синхронизации. Большинство из них просто пропускают такой сигнал сквозь себя, то есть являются “прозрачными” по отношению к нему.

Данная версия SYNC команды EXECUTE может быть полезна, например, в тех случаях, когда нужно выполнить тот или иной алгоритм не для каждого сообщения в потоке, а только один раз для всего пакета.

Пример обработки сообщений по одному и пакетно

Ниже следующий набор скриптов демонстрирует обработку потока сообщений обычным способом - по одному за один раз, а также организацию пакетной обработки сообщений при помощи команды EXECUTE SYNC. Главный скрипт main.djs использует для одиночной обработки сообщений скрипт message.djs, а для пакетной - batch.djs.

-- ************
-- * main.djs *
-- ************

DECLARE @index number
DECLARE @message object
DECLARE @Пакет array
DECLARE @Сообщение object

PRINT '[MAIN] НАЧАЛО'

-- Создаём буфер для формирования пакетов сообщений
SET @Пакет = ARRAY_CREATE()

USE 'mssql://server/database'

   -- Создаём поток сообщений из базы данных
   STREAM ТипСообщения, ТелоСообщения
     INTO @message
     FROM РегистрСведений.ИсходящиеСообщения
    ORDER BY НомерСообщения ASC

   -- Выполняем обычную потоковую обработку сообщения
   EXECUTE 'file://message.djs'
      WITH Сообщение = @message

   -- Копируем сообщение из потока для формирования пакета
   SET @Сообщение = SELECT Тип = @message.ТипСообщения
                         , Тело = @message.ТелоСообщения
   
   -- Добавляем копию сообщения в новый пакет
   SET @index = ARRAY_APPEND(@Пакет, @Сообщение)

   -- Обрабатываем сигнал синхронизации: отправляем буфер на обработку
   EXECUTE SYNC 'file://batch.djs'
      WITH ПакетСообщений = @Пакет -- Сформированный пакет сообщений
END

PRINT '[MAIN] КОНЕЦ [' + ARRAY_COUNT(@Пакет) + ']'
-- ***************
-- * message.djs *
-- ***************

-- Входящий параметр: сообщение обмена
DECLARE @Сообщение object

-- Обрабатываем сообщение
PRINT '[FLOW] ' + JSON(@Сообщение)
-- *************
-- * batch.djs *
-- *************

-- Входящий параметр: буфер пакета сообщений
DECLARE @ПакетСообщений array

-- Вспомогательная переменная
DECLARE @index number

PRINT '[SYNC] НАЧАЛО [' + ARRAY_COUNT(@ПакетСообщений) + ']'

-- Выполняем обработку пакета сообщений
PRINT '[SYNC] ' + JSON(@ПакетСообщений)

-- ВАЖНО!
-- Очищаем переданный буфер для формирования следующего пакета
SET @index = ARRAY_CLEAR(@ПакетСообщений)

PRINT '[SYNC] КОНЕЦ [' + ARRAY_COUNT(@ПакетСообщений) + ']'

Результат выполнения скрипта main.djs

[2025-02-15 19:24:28] [MAIN] НАЧАЛО
[2025-02-15 19:24:28] [FLOW] {"ТипСообщения":"type 1","ТелоСообщения":"body 1"}
[2025-02-15 19:24:28] [FLOW] {"ТипСообщения":"type 2","ТелоСообщения":"body 2"}
[2025-02-15 19:24:28] [FLOW] {"ТипСообщения":"type 3","ТелоСообщения":"body 3"}
[2025-02-15 19:24:28] [SYNC] НАЧАЛО [3]
[2025-02-15 19:24:28] [SYNC] [{"Тип":"type 1","Тело":"body 1"},{"Тип":"type 2","Тело":"body 2"},{"Тип":"type 3","Тело":"body 3"}]
[2025-02-15 19:24:28] [SYNC] КОНЕЦ [0]
[2025-02-15 19:24:28] [MAIN] КОНЕЦ [0]

Наверх