HomeAbout
[Data Engineering] 데이터 엔지니어를 위한 GCP Dataproc 활용법: PySpark로 BigQuery 데이터 변환 및 적재
Data Engineering
[Data Engineering] 데이터 엔지니어를 위한 GCP Dataproc 활용법: PySpark로 BigQuery 데이터 변환 및 적재
NASA1515
NASA1515
September 08, 2021
2 min

목차

01
✌ DataProc Cluster 생성
02
🙌 Data 준비하기
03
👍 Jupyter Notebook 연결
04
👏 Pyspark Test
05
👏 DataProc JOB을 사용해서 Pyspark Code 관리
06
👍 Python Script
07
👌 DataProc Job 생성
08
🐱‍🏍 결과 확인

✌ DataProc Cluster 생성


  • GCP 탐색 메뉴 > Dataproc > 클러스터 선택 > 클러스터 만들기


  • 클러스터 필드 설정 (이름을 제외한 나머지 부분은 기본값)


  • 프로비저닝 과정을 3분정도 거치고 다음과 같이 생성이 완료됩니다.


🙌 Data 준비하기

  • 이번 포스트에서는 BigQuery에서 제공하는 Public DataSet을 이용합니다.
    Dataproc Cluster는 GCS Connector를 기본으로 제공하여 다른 설정없이 GCS에 있는 데이터에 바로 액세스가 가능합니다.
    BigQuery의 DataSet의 특정 테이블을 Cloud Storage로 Export하여 데이터에 바로 접근하여 퍼포먼스 테스트를 해보겠습니다.

데이터 정보

  • Table ID : bigquery-public-data:covid19_weathersource_com.postal_code_day_history
  • Table 크기 : 약 300

데이터 형식 (데이터의 내용은 나라 별 COVID-19의 기상상태 데이터입니다.)


Cloude Storage 생성 (GCS) - 쿼리 결과 데이터(CSV)를 저장하는 위치

  • Region을 Bigquery와 맞춰주어야 합니다.

BigQuery DataSet, Table 생성 (쿼리 결과 데이터를 쌓는 곳)



저는 Python으로 간단한 코드를 작성해서 다음과 같이 데이터를 분류했습니다.


공개 DataSet에서 쿼리 결과를 다른 테이블에 저장하는 코드

from google.cloud import bigquery

 # Construct a BigQuery client object.
 client = bigquery.Client()

# TODO(developer): Set table_id to the ID of the destination table.
table_id = "lws-cloocus.ustest.ustable"
job_config = bigquery.QueryJobConfig(destination=table_id)
sql = 'SELECT * FROM `bigquery-public-data.covid19_weathersource_com.postal_code_day_history` LIMIT 34230421'

# Start the query, passing in the extra configuration.
query_job = client.query(sql, job_config=job_config)  # Make an API request.
query_job.result()  # Wait for the job to complete.

print("Query results loaded to the table {}".format(table_id))
  • 아셔야 하는 건 Table을 복사하는 것과 데이터만(쿼리결과)복사하는 것은 다릅니다. Table을 그대로 복사하게되면 Table의 정보까지 저장됩니다.


해당 Code를 실행시키게 되면 다음과 같이 특정 Table에 쿼리결과가 저장됩니다.



쿼리결과가 저장되어있는 Table의 데이터를 csv로 변환해서 GCS로 저장

# Source option
project = "lws-cloocus"
dataset_id = "ustest"
table_id = "ustable"

# 용량 많은 Table (1G이상)은 * 정규표현식으로 Table 읽어서 csv화 시켜야 함.
destination_uri = "gs://{}/{}".format(bucket_name, "result*.csv")
dataset_ref = bigquery.DatasetReference(project, dataset_id)
table_ref = dataset_ref.table(table_id)

extract_job = client.extract_table(
    table_ref,
    destination_uri,
    # Location must match that of the source table.
    location="US",
)  # API request
extract_job.result()  # Waits for job to complete.

print(
    "Exported {}:{}.{} to {}".format(project, dataset_id, table_id, destination_uri)
)
  • BigQeury에서 Data를 export 할 때 한번에 1GB 단위까지 밖에 지원되지 않습니다. 때문에 * 와일드카드를 사용해서 CSV File을 분리해줘야 합니다.


해당 코드를 실행시키면 다음과 같이 GCS에 Data가 저장됩니다.

  • 다음과 같이 용량이 일정하게 나눠서 저장됩니다.


저장된 CSV File을 Local로 다운받아서 NotePad로 확인해보죠

  • 다음과 같이 맨 윗줄은 헤더 정보, 나머지는 데이터 값만 저장됩니다.

👍 Jupyter Notebook 연결

  • 이제 간단한 쿼리 테스트를 진행하기 위해 Jupyter Notebook을 연결 하려고 했는데… Cluster를 잘못 생성했네요 아래 구성요소 GW를 사용하는 옵션을 체크해야합니다.

구성요소 게이트웨이

  • 게이트웨이 옵션 체크
  • 구성요소에서 Jupyter Notebook 체크


클러스터가 새롭게 만들어졌다면 클러스터 정보-> 웹 인터페이스로 접속합니다.

  • 그럼 다음과 같이 JupyterLab GW link가 생기고 접속합니다.


이제 DataProc Cluster와 연결된 Jupyter Page에 접속이 가능합니다.



  • 이 후에 GCS에 저장된 csv를 읽는 것도 가능합니다.



저는 쿼리 결과 시간이나, table 형태로 결과를 보고 싶어서 extension을 추가 설치 했습니다.


Jupyter에서 Terminal 창을 연 뒤 아래 명령어로 설치합니다.

# pip install jupyterlab_execute_time

설치가 완료 된 뒤 연결된 WEB을 새로고침 하면 execure-time 이 설치되어 있습니다.


그 후 Settings - Advanced Settings editor - Notebook에 아래 코드를 추가합니다.


그럼 다음과 같이 쿼리 실행 시간이 출력됩니다.!

3213121



저는 필요해보이는 추가 Extention을 더 설치해줬습니다.

  • variableinspector
pip install lckr-jupyterlab-variableinspector
  • TOC
jupyter nbextension enable toc2/main

👏 Pyspark Test

위에서도 간단한 Test를 해봤지만 그래도 몇가지 pyspark 함수를 사용해보겠습니다.

CSV File 읽어오기 (GCS에 있는)

df = spark.read.csv("gs://nasa_us/", header=True, inferSchema=True)
  • gs://nasa_us/ 경로에 있는 모든 파일을 읽습니다.
  • read.csv 옵션인 header, inferSchema를 사용했습니다.

2221313

약 45.96s 가 소요 되었습니다.



해당 DataFrame의 Row 반환

df.show(10)    -- Row 10개 반환
df.count()     -- Row 갯수 반환

11111

  • show에는 546ms 가 소요 되었습니다.
  • count에는 13.81s 가 소요 되었습니다.


해당 DataFrame의 Summary 값 반환

df.summary().show()

222222

Summary에는 4m 16.06s 가 소요 되었습니다.




👏 DataProc JOB을 사용해서 Pyspark Code 관리

위와 같이, ETL 전처리 작업을 모두 작성한 Pyspark Code를 자동화, 즉 관리하는 방법에 대해서도 알아야 합니다.

✔ Data

Data의 경우에는 Covid-19의 기상 데이터를 기반으로 진행합니다.

12312312

  • 용량 : 약 51GB
  • 행 : 542,304,210


데이터 형식 및 포맷 요약

2222


👍 Python Script

위의 DataSet 에서 특정 그룹(나라, 날짜) 별로 MAX,MIN,AVG 값들의 평균 값을 구하는 Python Code

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext('local')
spark = SparkSession(sc)
print(type(spark))

read_path = "gs://nasa_us/"
write_path = 'gs://proc_result/result/'

# def for columns cheange

# ------------------------------------------------------------------
def renameCols(df, old_columns, new_columns):
    for old_col,new_col in zip(old_columns,new_columns):
        df = df.withColumnRenamed(old_col,new_col)
    return df

# Old_columns
old_columns = ['avg(min_temperature_air_2m_f)',
                'avg(max_temperature_air_2m_f)',
                'avg(avg_temperature_air_2m_f)'
                ]

# New_columns
new_columns = ['temperature_air_min_avg',
                'temperature_air_max_avg',
                'temperature_air_avg_avg'
                ]
# --------------------------------------------
# ----------------------

# Read CSV from GCS
df = spark.read.csv(read_path, header=True, inferSchema=True)

# data transform
df = df.groupBy('country', 'date').agg({'min_temperature_air_2m_f' : 'avg', 'max_temperature_air_2m_f' : 'avg', 'avg_temperature_air_2m_f' : 'avg'})

df2 = renameCols(df, old_columns, new_columns)

# Write CSV to GCS
df2.coalesce(1).write.option("header", "true").mode("overwrite").csv(write_path)
  • 간단 설명 : GCS에서 CSV Format의 Data를 읽고 ETL 작업 후 결과를 GCS에 저장
  • Bigquery Table Data를 csv화 시키고 GCS에 저장하는 방법은 이전 포스트를 확인하세요


👌 DataProc Job 생성



DataProc - JOB -> 작업 제출로 JOB을 생성합니다.

333


Cluster는 실행 할 Cluster를 지정합니다.

  • 작업 유형은 Pyspark를 선택합니다.
  • Python File의 경우 미리 GCS에 올려놓고 지정하면 됩니다.


해당 작업이 생성되면서 실행되게 되고 JOB의 완료 된 후에는 결과 및 로그가 출력되게 됩니다.

캡처55555



🐱‍🏍 결과 확인


Script 실행대로 GCS에 ETL 결과 파일이 다음과 같이 저장되었습니다.



그럼 해당 CSV 파일을 기반으로 BigQuery에 Table을 만들어 보겠습니다.



데이터를 확인해보면 Script에서 실행 된 ETL 결과만 남아있는 것을 확인 가능합니다.

  • 스키마 데이터

    77777


  • 결과 데이터


Tags

#Cloud#GCP#Data Engineering
NASA1515

NASA1515

Data Engineer

Hello I'M Wonseok aka NASA1515

Expertise

Public Cloud
k8s/Docker
Python

Social Media

instagramwebsitelinkedingithub

Related Posts

[STREMING PIPELINE] 실시간 스트리밍 데이터 파이프라인 (Real-Time) 구축: Confluent와 CDC를 활용한 실시간 데이터 처리 완벽 가이드
[STREMING PIPELINE] 실시간 스트리밍 데이터 파이프라인 (Real-Time) 구축: Confluent와 CDC를 활용한 실시간 데이터 처리 완벽 가이드
2024-02-10
7 min

Topics

CloudDevelop

Social Media