Команда CONSUME
- Общее описание
- Источники данных
- Регистр сведений исходящих сообщений
- Таблица регистрации изменений плана обмена
- Приёмники данных
- Входящая таблица-очередь базы данных
- Хранимая процедура базы данных
- Топик или очередь RabbitMQ
- Топик Apache Kafka
- Вызов метода Web API
- Пользовательский обработчик на C#
- Дополнительные материалы
Общее описание
Команда CONSUME реализована DaJet Script специально для работы с таблицами СУБД, которые используются как очереди. Это могут быть очереди сообщений, событий или заданий для асинхронной обработки данных, организации обмена данными или решения любых других аналогичных задач. В контексте 1С:Предприятие 8 такими таблицами-очередями могут быть, например, таблицы регистрации изменений планов обмена или регистры сведений исходящих сообщений. Использование команды CONSUME дополняется механизмом управления последовательностью, а всё это, вместе взятое, призвано поддержать применение методики РИБ 2.0 на практике.
Команда CONSUME проектировалась таким образом, чтобы работать по принципу круглосуточной службы, целью которой является максимально эффективная обработка таблицы-очереди. Исходя из этого, хостинг скриптов, использующих команду CONSUME, целесообразно осуществлять при помощи, например, DaJet Script Host.
Синтаксис команды CONSUME выглядит следующим образом:
CONSUME TOP <batch-size> [WITH STRICT ORDER]
<select-column> [,...n]
INTO <object-variable>
FROM <table-queue>
[WHERE [NOT] <expression> [{ AND | OR | [NOT] (<expression>) }...n]]
ORDER BY <order-column> [{ ASC | DESC }] [,...n]
<batch-size> - размер пакета (количество записей), обрабатываемого в одной транзакции.
<select-column> - список полей через запятую, значения которых будут выбраны из таблицы-очереди.
<object-variable> - имя переменной типа object
, которая будет использована DaJet Script для поочерёдной обработки данных одной записи таблицы-очереди.
<table-queue> - имя таблицы-очереди, например, регистра сведений.
<order-column> - имена одного или нескольких полей таблицы-очереди для сортировки выборки в нужном порядке. Каждое поле может дополняться необязательным уточнением направления сортировки (по умолчанию ASC):
- ASC - по возрастанию (1.2.3)
- DESC - по убыванию (3.2.1)
WHERE - данное предложение фильтрации записей таблицы-очереди необязательно. Для построения логических выражений допускается использование ключевых слов AND, OR, NOT и оператора группировки (круглых скобок) аналогично стандартному синтаксису SQL
.
<expression> - логическое выражение вида <column> = <value>
, где
- <column> - имя поля таблицы-очереди, по которому выполняется фильтрация записей.
- <value> - значение фильтра: скалярное значение, имя переменной или обращение к свойству переменной типа
object
.
WITH STRICT ORDER - необязательная опция, которая убирает в генерируемом запросе к СУБД использование хинта READPAST
или SKIP LOCKED
для Microsoft SQL Server или PostgreSQL соответственно. Для PostgreSQL эта опция не оказывает какого-то практически значимого эффекта на поведение команды CONSUME. Использование опции WITH STRICT ORDER
описано в статье Нюанс № 1.
Функциональность команды CONSUME
- Выполнение всех действий только в транзакции СУБД, что гарантирует отсутствие потерь данных (записей таблицы-очереди) в случае возникновения ошибок, недоступности сетевых служб и прочих нештатных ситуаций.
- Обработка данных в определённом пользователем порядке, используя необязательное предложение ORDER BY. Для максимальной производительности рекомендуется использовать сортировку по полям кластерного индекса таблицы-очереди, соответствующую порядку следования этих полей в индексе и направлению их сортировки.
- Комбинированное применение пунктов 1 и 2 позволяет обеспечить гарантию доставки уровня
at least once in order
(минимум один раз в заданном порядке FIFO). - Пакетная обработка записей таблицы-очереди в целях оптимизации сетевого взаимодействия с базой данных и, как вариант, брокерами сообщений. Размер пакета (batch) определяется ключевым словом
TOP
команды CONSUME. Пакетная обработка значительно повышает производительность доставки данных. Рекомендуемый размер пакета, подходящий для большинства случаев - 1000 записей таблицы-очереди. - Удаление записей таблицы-очереди (очистка) только при успешном завершении транзакции обработки пакета, сразу же в момент её фиксации.
- Гибкое подключение разнообразных приёмников данных, используя следующие команды:
- INSERT (входящая таблица-очередь базы данных приёмника)
- PRODUCE (брокер RabbitMQ или Apache Kafka)
- REQUEST (хранимая процедура или вызов web api)
- PROCESS (пользовательский обработчик на C#)
- Возможность фильтрации записей таблицы-очереди при помощи предложения WHERE. Это очень полезная возможность для реализации сценариев, например, многопоточной обработки очереди. В данном случае для улучшения производительности может быть рекомендовано использование дополнительных индексов.
- Возможность высококонкурентной обработки таблицы-очереди параллельно выполняемыми скриптами или при помощи команды FOR … MAXDOP. Чаще всего на практике этого не требуется, однако, если, например, возникает необходимость обработать большую очередь, которая может накопиться в случае очень долгого простоя, то такая возможность будет не лишней.
- Защита от сбоев реализована таким образом, что, в случае возникновения любой нештатной или непредвиденной ситуации, команда CONSUME “засыпает” на
60 секунд
, а затем пытается продолжить свою работу в штатном режиме. Всё это происходит в бесконечном цикле, пока не будет остановлена программа, которая является хостом для скрипта. Например, можно вполне безопасно обновлять конфигурацию информационной базы 1С:Предприятие 8 в монопольном режиме, не останавливая работу хоста DaJet Script. Когда база данных 1С будет снова доступна, команда CONSUME продолжит свою работу как ни в чём не бывало.
Схема выполнения команды CONSUME
Диаграмма выполнения транзакции для одного пакета из 1000 записей
Пример кода DaJet Script для обмена СУБД » СУБД
DECLARE @message object
-- База данных источник
USE 'mssql://sa:sa@localhost:1433/source-database'
CONSUME TOP 10
НомерСообщения, Заголовки
, ТипСообщения, ТелоСообщения
INTO @message
FROM РегистрСведений.ИсходящиеСообщения
ORDER BY НомерСообщения ASC
-- База данных приёмник
USE 'pgsql://postgres:postgres@localhost:5432/target-database'
INSERT РегистрСведений.ВходящиеСообщения
SELECT Отправитель = 'DaJet Script'
, НомерСообщения = VECTOR('so_incoming_queue')
, Заголовки = @message.Заголовки
, ТипСообщения = @message.ТипСообщения
, ТелоСообщения = @message.ТелоСообщения
END -- Контекст базы данных приёмника
END -- Контекст базы данных источника
Пример кода Microsoft SQL Server, который генерирует DaJet Script
WITH queue AS
(SELECT TOP (10)
_Fld135 AS НомерСообщения,
_Fld220 AS Заголовки,
_Fld137 AS ТипСообщения,
_Fld138 AS ТелоСообщения
FROM _InfoRg134 WITH (ROWLOCK, READPAST)
ORDER BY
_Fld135 ASC)
DELETE queue
OUTPUT
deleted.НомерСообщения,
deleted.Заголовки,
deleted.ТипСообщения,
deleted.ТелоСообщения
Пример кода PostgreSQL, который генерирует DaJet Script
WITH filter AS
(SELECT
_fld99,
_fld100
FROM _InfoRg98
ORDER BY
_Fld99 ASC
LIMIT 10
FOR UPDATE SKIP LOCKED)
, queue AS
(DELETE FROM _InfoRg98 AS source USING filter
WHERE (source._fld99 = filter._fld99
AND source._fld100 = filter._fld100)
RETURNING
source._Fld99 AS НомерСообщения,
source._Fld101 AS ТипСообщения,
source._Fld102 AS ТелоСообщения)
SELECT
queue.НомерСообщения,
queue.ТипСообщения,
queue.ТелоСообщения
FROM queue
ORDER BY
queue.НомерСообщения ASC
Взаимодействие с брокерами RabbitMQ и Apache Kafka