HomeAbout
[Data Engineering] GCP DataFlow로 구현하는 BigQuery ETL: PySpark 활용 실전 가이드
Data Engineering
[Data Engineering] GCP DataFlow로 구현하는 BigQuery ETL: PySpark 활용 실전 가이드
NASA1515
NASA1515
September 02, 2021
1 min

목차

01
✔ DataFlow에 대해서..
02
✌ DataFlow 사용을 위한 환경 구성
03
👍 python code
04
👀 이제 코드에 대한 설명을 이어서 하겠습니다.
05
🙌 실행 결과

✔ DataFlow에 대해서..

  • DataFlow는 GCP에서 DataPipeline(ETL, MR 등)을 Apache Beam 기반으로 동작하도록 만든 Runtime Service 입니다.
    간단하게 Streming이나 Batch 처리를 PaaS로 사용 가능합니다. 단 Apache beam에 종속되어 있어서 beam SDK를 봐야하는 불편한 부분은 있습니다.

일단 이번 포스트에서 간단하게 구성하려고 하는 아키텍쳐는 다음과 같습니다.

캡처

  • Batch 형태의 Data [CSV]를 GCS에 Upload하면 해당 File Parsing 후 DW에 적재.

✌ DataFlow 사용을 위한 환경 구성

  • 저는 Local의 VScode에서 Code를 사용 할 것이기 때문에 DataFlow 사용하기 위한 환경을 구성하는 것부터 진행하도록 하겠습니다.

DataFlow를 사용하기 위해서는 아래 API들이 필요합니다. [추가해줍니다.]

설치 API 목록

  • Cloud Dataflow
  • Stackdriver
  • Cloud Storage
  • Cloud Storage JSON
  • BigQuery
  • Cloud Pub/Sub
  • Cloud Datastore
  • Cloud Resource Manager APIs

이후 새로운 Service Account를 생성합니다.

  • 권한 : 소유자
  • KeyFile : Json
    • KeyFilw을 Local로 받아놔야 합니다!

이제 GCS(Google Cloud Storage)를 생성합니다.

  • Storage Class : Standard
  • Single Region
    • 가급적이면 비용적으로라도 BigQuery를 사용 할 Region과 맞춰주세요!

최종적으로 Data를 적재 할 Bigquery DataSet을 생성합니다.

캡처2

  • BigQuery Project : lws-cloocus
  • BigQuery DataSet : Test

자 여기까지 간단한 환경설정은 완료되었고 Code를 설명하면서 추가적으로 보겠습니다.


👍 python code


최종적인 DataPipeline의 Python Code 입니다.

from __future__ import absolute_import
import argparse
import logging
import re
import apache_beam as beam
import json
import os

from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.metrics import Metrics
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.pipeline import PipelineOptions
from google.cloud import storage
from google.oauth2 import service_account
import pandas as pd

# GCP Service Account Key env 윈도우에서는 환경변수로 설정가능
storage_client = storage.Client.from_service_account_json('C:\GCP\lws-cloocus-d4fde98375c7.json')
        
# for linux "service account key" 
#GOOGLE_APPLICATION_CREDENTIALS('/home/nasa1515/dataflow/lwskey.json')
# word length code

class WordExtractingDoFn(beam.DoFn):
def process(self, element):
    
    splited = element.split(',')
    writestring = ({'id': splited[0], 'price': splited[1], 'manufacturer': splited[2], 'condition': splited[3]})
    #writestring = {'splited[0], splited[1], splited[2], splited[3]'}
    return [writestring]

# parser option code
def run(argv=None, save_main_session=True):
 
parser = argparse.ArgumentParser()
parser.add_argument(
    '--input',dest='input',required=False,help='default'
    ,default='gs://storage_nasa1515/data/batch2.csv')
parser.add_argument(
    '--output',dest='output',required=False,help='default'
    ,default='lws-cloocus:nasa1515.batchtest')
 
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args)

# pipline option

google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'lws-cloocus'
google_cloud_options.job_name = 'test-to-big'
google_cloud_options.staging_location = 'gs://storage_nasa1515/staging'
google_cloud_options.temp_location = 'gs://storage_nasa1515/temp'
google_cloud_options.region = 'asia-northeast3'
pipeline_options.view_as(StandardOptions).runner = 'DataflowRunner'

# # test1

p = beam.Pipeline(options = PipelineOptions(pipeline_args))

with beam.Pipeline(options=pipeline_options) as p:
 
    table_schema = {
        'fields': [
            {"name": "id", "type": "STRING", "mode": "NULLABLE"}, 
            {"name": "price", "type": "INTEGER", "mode": "NULLABLE"},
            {"name": "manufacturer", "type": "STRING", "mode": "NULLABLE"},
            {"name": "condition", "type": "STRING", "mode": "NULLABLE"}
        ]
    }
    
    (p 
        | 'Read Data' >> ReadFromText(known_args.input)
        | beam.ParDo(WordExtractingDoFn(WordExtractingDoFn))
        | 'write to BigQuery' >> beam.io.WriteToBigQuery(
            known_args.output,
            schema = table_schema,
            method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
            create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition = beam.io.BigQueryDisposition.WRITE_TRUNCATE
        )
    )

result = p.run()
result.wait_until_finish()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

기본적으로 GCP DataFlow에서 beam SDK를 사용하기 위해선 아래 SDK의 설치가 필요합니다.

pip install apache-beam[gcp]

SDK 선언 코드

from __future__ import absolute_import
import argparse
import logging
import re
import apache_beam as beam
import json
import os
import pandas as pd

from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.metrics import Metrics
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.pipeline import PipelineOptions
from google.cloud import storage
from google.oauth2 import service_account

사용 권한을 얻기 위한 코드

  • 아까 Service Account를 생성하며 발급 받았던 Key의 위치를 선언해줍니다.
    Window/Linux 모두 환경변수로 GOOGLE_APPLICATION_CREDENTIALS 설정하면 됩니다.
# GCP Service Account Key env
storage_client = storage.Client.from_service_account_json('C:\GCP\lws-cloocus-d4fde98375c7.json')
        
# for linux/window "service account key" 

#GOOGLE_APPLICATION_CREDENTIALS('/home/nasa1515/dataflow/lwskey.json')

파이프라인 설정 코드

## ,으로 구분된 CSV File을 Bigquery가 인식하는 ,으로 구분된 Json File로 변환하는 코드

class WordExtractingDoFn(beam.DoFn):
def process(self, element):
    
    splited = element.split(',')
    writestring = ({'id': splited[0], 'price': splited[1], 'manufacturer': splited[2], 'condition': splited[3]})
    #writestring = {'splited[0], splited[1], splited[2], splited[3]'}
    return [writestring]

## pasing 할 데이터를 가져오고 write 할 Bigquery에 대한 정보들을 입력

# parser option code
def run(argv=None, save_main_session=True):
 
parser = argparse.ArgumentParser()
parser.add_argument(
    '--input',dest='input',required=False,help='default'
    ,default='gs://storage_nasa1515/data/batch2.csv')
parser.add_argument(
    '--output',dest='output',required=False,help='default'
    ,default='lws-cloocus:nasa1515.batchtest')

known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args)

# pipline option

google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'lws-cloocus'
google_cloud_options.job_name = 'test-to-big'
google_cloud_options.staging_location = 'gs://storage_nasa1515/staging'
google_cloud_options.temp_location = 'gs://storage_nasa1515/temp'
google_cloud_options.region = 'asia-northeast3'
pipeline_options.view_as(StandardOptions).runner = 'DataflowRunner'
  • 이부분에서 주요하게 봐야 할 점을 Bigquery가 받아 들일 수 있는 파일의 형식입니다.

제가 Pasing 하려고 하는 Batch 성 데이터는 다음과 같습니다.


그러나 Bigquery에서는 미리 정의한 Schema 형태의 ,JSON으로만 Data를 Load 할 수 있습니다.


만약 해당 Data 형태로 Parsing이 되지 않으면 아래와 같은 Error가 발생하며 Job이 멈춥니다..

  • 때문에 위의 자료형대로 형식을 맞춰주는 건 매우 중요합니다..[여기서 뻘짓을 너무 많이했어요..]

또한 google_cloud_options에 GCS의 경로들은 전부 미리 생성되어있어야 합니다.

  • staging
  • temp

👀 이제 코드에 대한 설명을 이어서 하겠습니다.


파이프라인 실행 코드

p = beam.Pipeline(options = PipelineOptions(pipeline_args))

with beam.Pipeline(options=pipeline_options) as p:
 
    table_schema = {
        'fields': [
            {"name": "id", "type": "STRING", "mode": "NULLABLE"}, 
            {"name": "price", "type": "INTEGER", "mode": "NULLABLE"},
            {"name": "manufacturer", "type": "STRING", "mode": "NULLABLE"},
            {"name": "condition", "type": "STRING", "mode": "NULLABLE"}
        ]
    }
    
    (p 
        | 'Read Data' >> ReadFromText(known_args.input)
        | beam.ParDo(WordExtractingDoFn(WordExtractingDoFn))
        | 'write to BigQuery' >> beam.io.WriteToBigQuery(
            known_args.output,
            schema = table_schema,
            method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
            create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition = beam.io.BigQueryDisposition.WRITE_TRUNCATE
        )
    )

result = p.run()
result.wait_until_finish()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
  • 저는 위처럼 table_schema로 JSON 형태의 Schema를 미리 정의해서 사용했습니다.
  • create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED
    • 해당 구문은 BigQuery DataSet에 Table이 없으면 만들어 주는 옵션입니다.

🙌 실행 결과

  • 코드가 모두 짜여졌으니 코드를 실행시켜보죠.

실행하면 다음과 같이 DataFlow Tab에서 결과를 확인 할 수 있습니다.

  • 항목마다 어떤 부분을 성공했는지 자세하게 볼 수 있습니다.

DataFlow Job이 성공이니 BigQuery Table을 확인 해 볼까요?

  • 다음과 같이 간단한 쿼리문을 BigQuery에 날려보니 Data가 제대로 들어갔네요!

Tags

#CLOUD#GCP#Data Engineering
NASA1515

NASA1515

Data Engineer

Hello I'M Wonseok aka NASA1515

Expertise

Public Cloud
k8s/Docker
Python

Social Media

instagramwebsitelinkedingithub

Related Posts

[STREMING PIPELINE] 실시간 스트리밍 데이터 파이프라인 (Real-Time) 구축: Confluent와 CDC를 활용한 실시간 데이터 처리 완벽 가이드
[STREMING PIPELINE] 실시간 스트리밍 데이터 파이프라인 (Real-Time) 구축: Confluent와 CDC를 활용한 실시간 데이터 처리 완벽 가이드
2024-02-10
7 min

Topics

CloudDevelop

Social Media