CSV(Comma-Separated Values)是一种常见的数据交换格式,每一行代表一条记录,每个字段由逗号分隔。PostgreSQL是一种强大的开源关系型数据库管理系统。Airflow是一个用于创建、调度和监控工作流的开源平台。
pandas
库可以方便地读取CSV文件。psycopg2
库连接PostgreSQL数据库并插入数据。以下是一个简单的Airflow DAG示例,展示如何从CSV文件读取数据并加载到PostgreSQL数据库。
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
import pandas as pd
import psycopg2
# 定义默认参数
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
}
# 定义DAG
dag = DAG('csv_to_postgres', default_args=default_args, schedule_interval='@daily')
def load_csv_to_postgres():
# 读取CSV文件
df = pd.read_csv('/path/to/your/file.csv')
# 连接到PostgreSQL数据库
conn = psycopg2.connect(
host="localhost",
database="your_database",
user="your_user",
password="your_password"
)
# 创建游标
cur = conn.cursor()
# 插入数据
for index, row in df.iterrows():
cur.execute(
"INSERT INTO your_table (column1, column2, column3) VALUES (%s, %s, %s);",
(row['column1'], row['column2'], row['column3'])
)
# 提交事务
conn.commit()
# 关闭连接
cur.close()
conn.close()
# 定义任务
load_task = PythonOperator(
task_id='load_csv_to_postgres',
python_callable=load_csv_to_postgres,
dag=dag,
)
# 设置任务依赖
load_task
if __name__ == "__main__":
dag.cli()
通过以上步骤和示例代码,你可以实现从CSV文件读取数据并加载到PostgreSQL数据库的功能。
领取专属 10元无门槛券
手把手带您无忧上云