Skip to the content.

STREAM

Многопоточный перенос данных

В данном примере используется команда параллельного выполнения кода FOR ... IN ... MAXDOP с выделением 4-х потоков операционной системы. Выборка элементов справочника “Номенклатура” делится также на 4 порции по 3 элемента каждая. Для добавления нового элемента или обновления уже существующего в базе-приёмнике используется конструкция TRY...CATCH. Это не самое лучшее решение, однако вполне рабочее. Такая техника называется “just do it” (просто сделай это). В данном случае она используется в целях упрощения кода для понимания. Кроме этого, обратите внимание, что предложение ORDER BY команды STREAM дополнено необязательными опциями OFFSET и FETCH NEXT. Коротко: они позволяют получать порции записей сортированной выборки по смещению от её начала. Более подробно можно прочитать в соответствующей документации по стандартному SQL.

DECLARE @record object
DECLARE @task object
DECLARE @tasks array
DECLARE @counter number = 0

PRINT 'Начало скрипта'

USE 'mssql://server/database'

   -- Формируем данные для параллельных заданий
   -- Всего 4 задания, каждое по 3 записи
   -- Деление на порции выполняется при помощи смещения от начала выборки
   SELECT Задание    = 'Поток 1' -- Имя задания (потока выполнения)
        , Смещение   = 0         -- Отступ (смещение) от начала выборки
        , Количество = 3         -- Количество записей после отступа
     INTO @tasks
    UNION ALL SELECT 'Поток 2', 3, 3
    UNION ALL SELECT 'Поток 3', 6, 3
    UNION ALL SELECT 'Поток 4', 9, 3

   -- Выделяем 4 потока для выполнения всех заданий
   FOR @task IN @tasks MAXDOP 4

      SET @counter = 0

      STREAM Ссылка, Код, Наименование
        INTO @record
        FROM Справочник.Номенклатура
       ORDER BY Код ASC
      OFFSET @task.Смещение ROWS
       FETCH NEXT @task.Количество ROWS ONLY

      -- Логируем значения для проверки
      PRINT @task.Задание
          + ' (' + @task.Смещение + ',' + @task.Количество
          + '): ' +  JSON(@record)

      -- Контекст базы данных приёмника
      USE 'pgsql://postgres:postgres@localhost:5432/database'
         TRY
            INSERT Справочник.Номенклатура
            SELECT Ссылка       = @record.Ссылка
                 , Код          = @record.Код
                 , Наименование = @record.Наименование
         CATCH
            UPDATE Справочник.Номенклатура
             WHERE Ссылка       = @record.Ссылка
               SET Код          = @record.Код
                 , Наименование = @record.Наименование
         END
      END -- Контекст базы данных приёмника

      SET @counter = @counter + 1

      -- Логируем выполнение для проверки
      IF @counter = @task.Количество THEN
         PRINT @task.Задание
             + ' (' + @task.Смещение + ',' + @task.Количество
             + '): Обработано ' + @counter + ' записи'
      END

   END -- Конец параллельного выполнения

END -- Контекст базы данных источника

PRINT 'Конец скрипта'

-- Результат выполнения скрипта
[2024-11-09 19:50:39] Начало скрипта
[2024-11-09 19:50:39] Поток 4 (9,3): {"Ссылка":"{36:08ec109d-a06b-a1b1-11ee-ca472bff0a16}","Код":"00000010","Наименование":"Товар 10"}
[2024-11-09 19:50:39] Поток 2 (3,3): {"Ссылка":"{36:08ec109d-a06b-a1b1-11ee-ca472bff0a10}","Код":"00000004","Наименование":"Товар 4"}
[2024-11-09 19:50:39] Поток 1 (0,3): {"Ссылка":"{36:08ec109d-a06b-a1b1-11ee-ca472bff0a0d}","Код":"00000001","Наименование":"Товар 1"}
[2024-11-09 19:50:39] Поток 3 (6,3): {"Ссылка":"{36:08ec109d-a06b-a1b1-11ee-ca472bff0a13}","Код":"00000007","Наименование":"Товар 7"}
[2024-11-09 19:50:40] Поток 1 (0,3): {"Ссылка":"{36:08ec109d-a06b-a1b1-11ee-ca472bff0a0e}","Код":"00000002","Наименование":"Товар 2"}
[2024-11-09 19:50:40] Поток 4 (9,3): {"Ссылка":"{36:08ec109d-a06b-a1b1-11ee-ca472bff0a17}","Код":"00000011","Наименование":"Товар 11"}
[2024-11-09 19:50:40] Поток 3 (6,3): {"Ссылка":"{36:08ec109d-a06b-a1b1-11ee-ca472bff0a14}","Код":"00000008","Наименование":"Товар 8"}
[2024-11-09 19:50:40] Поток 2 (3,3): {"Ссылка":"{36:08ec109d-a06b-a1b1-11ee-ca472bff0a11}","Код":"00000005","Наименование":"Товар 5"}
[2024-11-09 19:50:40] Поток 1 (0,3): {"Ссылка":"{36:08ec109d-a06b-a1b1-11ee-ca472bff0a0f}","Код":"00000003","Наименование":"Товар 3"}
[2024-11-09 19:50:40] Поток 4 (9,3): {"Ссылка":"{36:08ec109d-a06b-a1b1-11ee-ca472bff0a18}","Код":"00000012","Наименование":"Товар 12"}
[2024-11-09 19:50:41] Поток 3 (6,3): {"Ссылка":"{36:08ec109d-a06b-a1b1-11ee-ca472bff0a15}","Код":"00000009","Наименование":"Товар 9"}
[2024-11-09 19:50:41] Поток 2 (3,3): {"Ссылка":"{36:08ec109d-a06b-a1b1-11ee-ca472bff0a12}","Код":"00000006","Наименование":"Товар 6"}
[2024-11-09 19:50:41] Поток 1 (0,3): Обработано 3 записи
[2024-11-09 19:50:41] Поток 4 (9,3): Обработано 3 записи
[2024-11-09 19:50:41] Поток 3 (6,3): Обработано 3 записи
[2024-11-09 19:50:41] Поток 2 (3,3): Обработано 3 записи
[2024-11-09 19:50:41] Конец скрипта

Наверх