Въздушен поток: Как да използвате за планиране на хиляди задачи в един кадър

В тази публикация ще обсъдя как можем да планираме хиляди задачи в рамките на един даг. Няма да се съсредоточа върху това какво е Airflow и как можете да го инсталирате, но вместо това ще обсъдя как можем да планираме голям брой задачи в рамките на един даг.

По принцип Airflow е проектиран да има множество DAG и вътре в този DAG може да има стотици или хиляда задачи. И така, какво се случва, когато искаме да планираме голям брой задачи, да кажем около 60000 или повече от това? Това обясних в този блог.

Работя с Airflow, за да автоматизирам работния си процес. Но в моята компания имаме наистина голям обем данни и опитах, използвайки различни версии на Airflow и поради наистина огромни данни, имам почти 70000 задачи вътре в един DAG. Опитах различни версии на Airflow и най-новата версия може да планира 5000 задачи, но ако искаме да насрочим повече от това, планировникът остава в състояние на работа, без да планира задачите. Открих всеки проблем и проверих как да го реша каква е действителната причина и накрая въпреки това да напиша този блог.

Това е един от случаите на използване в Airflow, когато имате хиляди задачи в един DAG. За начало трябва да използваме версия на Airflow 1.10.3, след което да не се фокусираме върху голям брой задачи, така че трябва да използваме Airflow версия 1.10.3. За да инсталирате тази версия, следвайте стъпките по-долу:

  • Първо трябва да създадем нова среда и да я активираме, като използваме следната команда:
conda създаване -n въздушен поток_3
conda активирайте въздушния поток_3
  • За да инсталирате Airflow с конкретна версия 1.10.3 използвайте следната команда:
conda install -c conda-forge airflow == 1.10.3
  • Трябва да се уверите в някои специфични изисквания, че тази версия не работи с колба≥1.0.9, така че ако имате колба по-голяма от тази версия, използвайте следната команда:
pip инсталирате колба == 1.0.4
pip install funcsigs == 1.0.0 (Това е друго изискване, което трябва да бъде инсталирано)
  • И се препоръчва да се използва Celery Executor, когато работим с толкова много голям брой задачи, тъй като трябва да паралелизираме тези задачи и това може да се постигне с помощта на Celery Executor. За да инсталирате Celery използвайте следната команда:
пип инсталирайте целина
  • Трябва да използвате работници и да настроите брокер, който да използва изпълнител на целина, аз използвам RabbitMQ като брокер. TO настройка на брокера URL може да използва следната структура:
broker_url = amqp: // „потребителско име“: „парола“ @ „име на хост“: „порт“ /

например

broker_url = amqp: // гост: гост @ localhost: 5672 /
  • За да видите потребителския интерфейс на Celery Executor, можем да използваме Flower, за да го инсталираме, използвайки следната команда:
conda install -c conda-forge цвете
  • След това трябва да променим някаква конфигурация, за да изпълняваме хиляди задачи паралелно и да планираме хиляди задачи в един кадър.
[Ядро]
executor = Паралелизъм на CeleryExecutor = 200000 non_pooled_task_slot_count = 100000 dag_concurrency = 100000 max_active_runs_per_dag = 2
[Scheduler]
max_thread = 10 (Може да използва нишки според вашата програма, като го увеличава или намалява)

Това са основните настройки, когато искате да планирате хиляди задачи в един кадър. Трябва да го коригирате според това колко максимални DAG-та искате да изпълните паралелно и колко задачи имате в един DAG.

Основният параметър е „Non_pooled_task_slot_count“, който беше премахнат от Airflow версия 1.10.4, така че аз използвам 1.10.3, тъй като този параметър играе много важна роля при планирането на задачите.

Основната разлика след премахването на „Non_pooled_task_slot_count“ е, че той използва default_pool, който е зададен на 128 по подразбиране (може да го увеличи според изискването). Основната работа на „Non_pooled_task_slot_count“ е да планира задачите и тя не е свързана към default_pool или друг номер на връзка от базата данни, така че можем да увеличим този брой колкото искате, но ако увеличите броя на слотовете в „default_pool“ тогава той също е свързан с връзките към базата данни, които имате и не можете да имате 100000 връзки към база данни в момент, който работи паралелно. По принцип „Non_pooled_task_slot_count“ се премахна в полза на „default_pool“.

Тази публикация съдържа отговора на въпроса, че защо планировчикът се затруднява, заседна, не планира голям брой задачи или работи цял ден, без да прави нищо. Всички отговори имат един отговор за използване на Airflow версия 1.10.3.

Когато използвате Airflow 1.10.3, трябва да определим кой пул трябва да се използва от DAG, тъй като той не използва „default_pool“ по подразбиране, така че докато създаваме задачи, трябва да преминем para mater pool = 'defautl_pool'. Можете да създадете 'default_pool' с помощта на потребителски интерфейс (Администратор -> пулове) или можете да направите чрез командния ред:

airflow pool -s default_pool 128 'default pool'.

Ето примера на извадката DAG:

import os от datetime import datetime, timedelta импортиране на въздушен поток от въздушен поток import DAG от airflow.operators.dummy_operator import DummyOperator
default_args = {'owner': 'Airflow', 'зависи_on_past': False, 'start_date': airflow.utils.dates.days_ago (2), 'retries': 1, 'retry_delay': timedelta (минути = 1),}
dag = DAG ('dummy_try1', default_args = default_args, raspored_interval = Няма)
за i in range (50000): задачи = DummyOperator (task_id = '{}'. формат (i), dag = dag, pool = 'default_pool)

Можете да проверите разликата между всички версии на линка по-долу:

  • https://github.com/apache/airflow/blob/master/UPDATING.md#airflow-1104