![[Data Engineering] Using GCP Dataproc as a Data Engineer: Transforming and Loading BigQuery Data with PySpark](/static/d1fab369e5e68fc3d9da90f2051c59bc/c482c/bigquery.png)
from google.cloud import bigquery
# Construct a BigQuery client object.
client = bigquery.Client()
# TODO(developer): Set table_id to the ID of the destination table.
table_id = "lws-cloocus.ustest.ustable"
job_config = bigquery.QueryJobConfig(destination=table_id)
sql = 'SELECT * FROM `bigquery-public-data.covid19_weathersource_com.postal_code_day_history` LIMIT 34230421'
# Start the query, passing in the extra configuration.
query_job = client.query(sql, job_config=job_config) # Make an API request.
query_job.result() # Wait for the job to complete.
print("Query results loaded to the table {}".format(table_id))
# Source table options
project = "lws-cloocus"
dataset_id = "ustest"
table_id = "ustable"
# TODO(developer): Set bucket_name to the GCS bucket that will receive the export.
bucket_name = "your-bucket-name"
# Large tables (over ~1 GB) must be exported with a * wildcard in the destination URI,
# so BigQuery can shard the output into multiple CSV files.
destination_uri = "gs://{}/{}".format(bucket_name, "result*.csv")
dataset_ref = bigquery.DatasetReference(project, dataset_id)
table_ref = dataset_ref.table(table_id)
extract_job = client.extract_table(
    table_ref,
    destination_uri,
    # Location must match that of the source table.
    location="US",
)  # API request
extract_job.result()  # Waits for job to complete.
print(
    "Exported {}:{}.{} to {}".format(project, dataset_id, table_id, destination_uri)
)
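Because the wildcard export shards the table into multiple CSV files, it is useful to list what actually landed in the bucket. A minimal sketch with the google-cloud-storage client, assuming the same bucket_name as above:
from google.cloud import storage

storage_client = storage.Client()
# List the CSV shards produced by the wildcard export.
for blob in storage_client.list_blobs(bucket_name, prefix="result"):
    print(blob.name, blob.size)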
# Optional Jupyter extensions for the Dataproc notebook environment
# pip install jupyterlab_execute_time
pip install lckr-jupyterlab-variableinspector
jupyter nbextension enable toc2/main
We already ran a simple test above, but let's still try out a few PySpark functions.
df = spark.read.csv("gs://nasa_us/", header=True, inferSchema=True)
df.show(10)   # return the first 10 rows
df.count()    # return the number of rows
df.summary().show()
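A few more DataFrame operations that come up constantly during exploration, shown as a small sketch against the same df (the 'US' filter value is only a placeholder):
df.printSchema()  # column names and inferred types
# Select a subset of columns and filter rows.
subset = df.select('country', 'date').where(df['country'] == 'US')
subset.show(5)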
Beyond that, you also need to know how to automate, in other words manage, the PySpark code that implements the full ETL preprocessing. The complete script is below, and a job-submission sketch follows it.
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext('local')
spark = SparkSession(sc)
print(type(spark))
read_path = "gs://nasa_us/"
write_path = 'gs://proc_result/result/'
# Helper to rename DataFrame columns
# ------------------------------------------------------------------
def renameCols(df, old_columns, new_columns):
    for old_col, new_col in zip(old_columns, new_columns):
        df = df.withColumnRenamed(old_col, new_col)
    return df
# Old_columns
old_columns = [
    'avg(min_temperature_air_2m_f)',
    'avg(max_temperature_air_2m_f)',
    'avg(avg_temperature_air_2m_f)',
]
# New_columns
new_columns = [
    'temperature_air_min_avg',
    'temperature_air_max_avg',
    'temperature_air_avg_avg',
]
# ------------------------------------------------------------------
# Read CSV from GCS
df = spark.read.csv(read_path, header=True, inferSchema=True)
# Data transform: average the min/max/avg temperature columns per country and date
df = df.groupBy('country', 'date').agg({
    'min_temperature_air_2m_f': 'avg',
    'max_temperature_air_2m_f': 'avg',
    'avg_temperature_air_2m_f': 'avg',
})
df2 = renameCols(df, old_columns, new_columns)
# Write CSV to GCS
df2.coalesce(1).write.option("header", "true").mode("overwrite").csv(write_path)
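To automate the script above (the management point raised earlier), one option is to submit it to the Dataproc cluster programmatically with the google-cloud-dataproc client. The sketch below assumes the script has already been uploaded to GCS; the region, cluster name, and script path are placeholders you would replace:
from google.cloud import dataproc_v1 as dataproc

region = "us-central1"                        # placeholder region
cluster_name = "my-cluster"                   # placeholder Dataproc cluster
main_py_uri = "gs://your-bucket/etl_job.py"   # the script above, uploaded to GCS

job_client = dataproc.JobControllerClient(
    client_options={"api_endpoint": "{}-dataproc.googleapis.com:443".format(region)}
)
job = {
    "placement": {"cluster_name": cluster_name},
    "pyspark_job": {"main_python_file_uri": main_py_uri},
}
# Submit the PySpark job and block until it finishes.
operation = job_client.submit_job_as_operation(
    request={"project_id": "lws-cloocus", "region": region, "job": job}
)
response = operation.result()
print("Job finished with state: {}".format(response.status.state.name))
From there, scheduling is typically layered on top with something like Cloud Composer or Dataproc Workflow Templates, but this submission call is the building block either way.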