Skip to the content.

Назад

Команда CONSUME

Общее описание

Команда CONSUME реализована DaJet Script специально для работы с таблицами СУБД, которые используются как очереди. Это могут быть очереди сообщений, событий или заданий для асинхронной обработки данных, организации обмена данными или решения любых других аналогичных задач. В контексте 1С:Предприятие 8 такими таблицами-очередями могут быть, например, таблицы регистрации изменений планов обмена или регистры сведений исходящих сообщений. Использование команды CONSUME дополняется механизмом управления последовательностью, а всё это, вместе взятое, призвано поддержать применение методики РИБ 2.0 на практике.

Команда CONSUME проектировалась таким образом, чтобы работать по принципу круглосуточной службы, целью которой является максимально эффективная обработка таблицы-очереди. Исходя из этого, хостинг скриптов, использующих команду CONSUME, целесообразно осуществлять при помощи, например, DaJet Script Host.

Синтаксис команды CONSUME выглядит следующим образом:

CONSUME TOP <batch-size> [WITH STRICT ORDER]
<select-column> [,...n]
INTO <object-variable>
FROM <table-queue>
[WHERE [NOT] <expression> [{ AND | OR | [NOT] (<expression>) }...n]]
ORDER BY <order-column> [{ ASC | DESC }] [,...n]

<batch-size> - размер пакета (количество записей), обрабатываемого в одной транзакции.
<select-column> - список полей через запятую, значения которых будут выбраны из таблицы-очереди.
<object-variable> - имя переменной типа object, которая будет использована DaJet Script для поочерёдной обработки данных одной записи таблицы-очереди.
<table-queue> - имя таблицы-очереди, например, регистра сведений.
<order-column> - имена одного или нескольких полей таблицы-очереди для сортировки выборки в нужном порядке. Каждое поле может дополняться необязательным уточнением направления сортировки (по умолчанию ASC):

WHERE - данное предложение фильтрации записей таблицы-очереди необязательно. Для построения логических выражений допускается использование ключевых слов AND, OR, NOT и оператора группировки (круглых скобок) аналогично стандартному синтаксису SQL.
<expression> - логическое выражение вида <column> = <value>, где

WITH STRICT ORDER - необязательная опция, которая убирает в генерируемом запросе к СУБД использование хинта READPAST или SKIP LOCKED для Microsoft SQL Server или PostgreSQL соответственно. Для PostgreSQL эта опция не оказывает какого-то практически значимого эффекта на поведение команды CONSUME. Использование опции WITH STRICT ORDER описано в статье Нюанс № 1.

Наверх

Функциональность команды CONSUME

  1. Выполнение всех действий только в транзакции СУБД, что гарантирует отсутствие потерь данных (записей таблицы-очереди) в случае возникновения ошибок, недоступности сетевых служб и прочих нештатных ситуаций.
  2. Обработка данных в определённом пользователем порядке, используя необязательное предложение ORDER BY. Для максимальной производительности рекомендуется использовать сортировку по полям кластерного индекса таблицы-очереди, соответствующую порядку следования этих полей в индексе и направлению их сортировки.
  3. Комбинированное применение пунктов 1 и 2 позволяет обеспечить гарантию доставки уровня at least once in order (минимум один раз в заданном порядке FIFO).
  4. Пакетная обработка записей таблицы-очереди в целях оптимизации сетевого взаимодействия с базой данных и, как вариант, брокерами сообщений. Размер пакета (batch) определяется ключевым словом TOP команды CONSUME. Пакетная обработка значительно повышает производительность доставки данных. Рекомендуемый размер пакета, подходящий для большинства случаев - 1000 записей таблицы-очереди.
  5. Удаление записей таблицы-очереди (очистка) только при успешном завершении транзакции обработки пакета, сразу же в момент её фиксации.
  6. Гибкое подключение разнообразных приёмников данных, используя следующие команды:
    • INSERT (входящая таблица-очередь базы данных приёмника)
    • PRODUCE (брокер RabbitMQ или Apache Kafka)
    • REQUEST (хранимая процедура или вызов web api)
    • PROCESS (пользовательский обработчик на C#)
  7. Возможность фильтрации записей таблицы-очереди при помощи предложения WHERE. Это очень полезная возможность для реализации сценариев, например, многопоточной обработки очереди. В данном случае для улучшения производительности может быть рекомендовано использование дополнительных индексов.
  8. Возможность высококонкурентной обработки таблицы-очереди параллельно выполняемыми скриптами или при помощи команды FOR … MAXDOP. Чаще всего на практике этого не требуется, однако, если, например, возникает необходимость обработать большую очередь, которая может накопиться в случае очень долгого простоя, то такая возможность будет не лишней.
  9. Защита от сбоев реализована таким образом, что, в случае возникновения любой нештатной или непредвиденной ситуации, команда CONSUME “засыпает” на 60 секунд, а затем пытается продолжить свою работу в штатном режиме. Всё это происходит в бесконечном цикле, пока не будет остановлена программа, которая является хостом для скрипта. Например, можно вполне безопасно обновлять конфигурацию информационной базы 1С:Предприятие 8 в монопольном режиме, не останавливая работу хоста DaJet Script. Когда база данных 1С будет снова доступна, команда CONSUME продолжит свою работу как ни в чём не бывало.

Наверх

Схема выполнения команды CONSUME

Схема выполнения команды CONSUME

Диаграмма выполнения транзакции для одного пакета из 1000 записей

Диаграмма выполнения транзакции

Наверх

Пример кода DaJet Script для обмена СУБД » СУБД

DECLARE @message object

-- База данных источник
USE 'mssql://sa:sa@localhost:1433/source-database'

   CONSUME TOP 10
           НомерСообщения, Заголовки
         , ТипСообщения, ТелоСообщения
      INTO @message
      FROM РегистрСведений.ИсходящиеСообщения
     ORDER BY НомерСообщения ASC

   -- База данных приёмник
   USE 'pgsql://postgres:postgres@localhost:5432/target-database'

      INSERT РегистрСведений.ВходящиеСообщения
      SELECT Отправитель    = 'DaJet Script'
           , НомерСообщения = VECTOR('so_incoming_queue')
           , Заголовки      = @message.Заголовки
           , ТипСообщения   = @message.ТипСообщения
           , ТелоСообщения  = @message.ТелоСообщения

   END -- Контекст базы данных приёмника

END -- Контекст базы данных источника

Пример кода Microsoft SQL Server, который генерирует DaJet Script

WITH queue AS 
(SELECT TOP (10)
_Fld135 AS НомерСообщения,
_Fld220 AS Заголовки,
_Fld137 AS ТипСообщения,
_Fld138 AS ТелоСообщения
FROM _InfoRg134 WITH (ROWLOCK, READPAST)
ORDER BY
_Fld135 ASC)
DELETE queue
OUTPUT
deleted.НомерСообщения,
deleted.Заголовки,
deleted.ТипСообщения,
deleted.ТелоСообщения

Пример кода PostgreSQL, который генерирует DaJet Script

WITH filter AS 
(SELECT
_fld99,
_fld100
FROM _InfoRg98
ORDER BY
_Fld99 ASC
LIMIT 10
FOR UPDATE SKIP LOCKED)
, queue AS 
(DELETE FROM _InfoRg98 AS source USING filter
WHERE (source._fld99 = filter._fld99
AND source._fld100 = filter._fld100)
RETURNING
source._Fld99 AS НомерСообщения,
source._Fld101 AS ТипСообщения,
source._Fld102 AS ТелоСообщения)
SELECT
queue.НомерСообщения,
queue.ТипСообщения,
queue.ТелоСообщения
FROM queue
ORDER BY
queue.НомерСообщения ASC

Наверх

Взаимодействие с брокерами RabbitMQ и Apache Kafka

Схема выполнения команды CONSUME

Схема выполнения команды CONSUME

Наверх