之前学了Zeppelin的使用,今天开始结合Airflow串任务。
Apache Airflow和Apache Zeppelin是两个不同的工具,各自用于不同的目的。Airflow用于编排和调度工作流,而Zeppelin是一个交互式数据分析和可视化的笔记本工具。虽然它们有不同的主要用途,但可以结合使用以满足一些复杂的数据处理和分析需求。
下面是一些结合使用Airflow和Zeppelin的方式:
Airflow调度Zeppelin Notebooks:
数据流管道:
参数传递:
日志和监控:
整合数据存储:
结合使用Airflow和Zeppelin能够充分发挥它们各自的优势,实现更全面、可控和可视化的数据处理和分析工作流。
安装参考:
https://airflow.apache.org/docs/apache-airflow/stable/start.html
CentOS 7.9安装后启动会报错,还需要配置下sqlite,参考:https://airflow.apache.org/docs/apache-airflow/2.8.0/howto/set-up-database.html#setting-up-a-sqlite-database
[root@slas bin]# airflow standalone
Traceback (most recent call last):
File "/root/.pyenv/versions/3.9.10/bin/airflow", line 5, in <module>
from airflow.__main__ import main
File "/root/.pyenv/versions/3.9.10/lib/python3.9/site-packages/airflow/__init__.py", line 52, in <module>
from airflow import configuration, settings
File "/root/.pyenv/versions/3.9.10/lib/python3.9/site-packages/airflow/configuration.py", line 2326, in <module>
conf.validate()
File "/root/.pyenv/versions/3.9.10/lib/python3.9/site-packages/airflow/configuration.py", line 718, in validate
self._validate_sqlite3_version()
File "/root/.pyenv/versions/3.9.10/lib/python3.9/site-packages/airflow/configuration.py", line 824, in _validate_sqlite3_version
raise AirflowConfigException(
airflow.exceptions.AirflowConfigException: error: SQLite C library too old (< 3.15.0). See https://airflow.apache.org/docs/apache-airflow/2.8.0/howto/set-up-database.html#setting-up-a-sqlite-database
我想做个简单的demo,包括两个节点,实现如图所示功能,读取csv,去重:
csv文件输入在airflow上实现,去重在zeppelin上实现。
先实现extract_data_script.py,做个简单的读取csv指定列数据写入新的csv文件。
import argparse
import pandas as pd
def extract_and_write_data(date, input_csv, output_csv, columns_to_extract):
# 读取指定列的数据
csv_file_path = f"/home/works/datasets/data_{date}.csv"
df = pd.read_csv(csv_file_path, usecols=columns_to_extract)
# 将数据写入新的 CSV 文件
df.to_csv(output_csv, index=False)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--date", type=str, required=True, help="Date parameter passed by Airflow")
args = parser.parse_args()
# 输出 CSV 文件路径(替换为实际的路径)
output_csv_path = "/home/works/output/extracted_data.csv"
# 指定要提取的列
columns_to_extract = ['column1', 'column2', 'column3']
# 调用函数进行数据提取和写入
extract_and_write_data(args.date, input_csv_path, output_csv_path, columns_to_extract)
然后在 Zeppelin 中创建一个 Python 笔记本(Notebook),其中包含被 Airflow DAG 调用的代码。加载先前从 output/extracted_data.csv 文件中提取的数据:
%python
# 导入必要的库
import pandas as pd
# 加载先前从 CSV 文件中提取的数据
csv_file_path = "/home/works/output/extracted_data.csv"
# 读取 CSV 文件
df = pd.read_csv(csv_file_path)
# 过滤掉 column1 为空的行
df = df[df['column1'].notnull()]
# 去重,以 column2、column3 字段为联合去重依据
deduplicated_df = df.drop_duplicates(subset=["column2", "column3"])
# 保存去重后的结果到新的 CSV 文件
deduplicated_df.to_csv("/home/works/output/dd_data.csv", index=False)
将这个 Zeppelin 笔记本保存,并记住笔记本的 ID, Airflow DAG 需要使用这个 ID 来调用 Zeppelin 笔记本。
接下来,用VSCode编写zeppelin_integration.py代码如下,上传到$AIRFLOW_HOME/dags目录下:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2024, 1, 1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'zeppelin_integration',
default_args=default_args,
schedule_interval=timedelta(days=1),
)
extract_data_task = BashOperator(
task_id='extract_data',
bash_command='python /home/works/z/extract_data_script.py --date {{ ds }}',
dag=dag,
)
run_zeppelin_notebook_task = BashOperator(
task_id='run_zeppelin_notebook',
bash_command='curl -X POST http://zeppelin-server:zeppelin-port/api/notebook/job/notebook-id',
dag=dag,
)
# 设置关联度
extract_data_task >> run_zeppelin_notebook_task
如下命令:
[root@slas dags]# airflow dags trigger zeppelin_integration
[2024-01-19T11:23:54.331+0800] {__init__.py:42} INFO - Loaded API auth backend: airflow.api.auth.backend.session
conf | dag_id | dag_run_id | data_interval_start | data_interval_end | end_date | external_trigger | last_scheduling_decision | logical_date | run_type | start_date | state
=====+======================+===================================+===========================+===========================+==========+==================+==========================+===========================+==========+============+=======
{} | zeppelin_integration | manual__2024-01-19T03:23:54+00:00 | 2024-01-18T03:23:54+00:00 | 2024-01-19T03:23:54+00:00 | None | True | None | 2024-01-19T03:23:54+00:00 | manual | None | queued
页面上会增加一个DAG,如图:
在Actions里点击执行。
以上就是今天要讲的内容,总体来说集成两个工具还是很方便的,期待后面更多的应用。