from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of the destination table.
table_id = "lws-cloocus.ustest.ustable"

job_config = bigquery.QueryJobConfig(destination=table_id)

sql = 'SELECT * FROM `bigquery-public-data.covid19_weathersource_com.postal_code_day_history` LIMIT 34230421'

# Start the query, passing in the extra configuration.
query_job = client.query(sql, job_config=job_config)  # Make an API request.
query_job.result()  # Wait for the job to complete.

print("Query results loaded to the table {}".format(table_id))
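As a quick sanity check, you can confirm that the destination table was actually populated by fetching its metadata. This is not part of the original sample, just a minimal sketch that reuses the client and table_id defined above:

# Minimal sketch: verify the destination table after the query job finishes.
# Assumes `client` and `table_id` from the snippet above.
destination_table = client.get_table(table_id)  # API request for table metadata.
print("Destination table has {} rows.".format(destination_table.num_rows))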
# Source options
project = "lws-cloocus"
dataset_id = "ustest"
table_id = "ustable"

# TODO(developer): Set bucket_name to the GCS bucket that will receive the export.
bucket_name = "your-bucket-name"

# Tables larger than ~1 GB must be exported with a * wildcard in the URI
# so BigQuery can split the result into multiple CSV files.
destination_uri = "gs://{}/{}".format(bucket_name, "result*.csv")

dataset_ref = bigquery.DatasetReference(project, dataset_id)
table_ref = dataset_ref.table(table_id)

extract_job = client.extract_table(
    table_ref,
    destination_uri,
    # Location must match that of the source table.
    location="US",
)  # API request
extract_job.result()  # Waits for job to complete.

print(
    "Exported {}:{}.{} to {}".format(project, dataset_id, table_id, destination_uri)
)
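Because the wildcard export produces several result*.csv shards, it can be handy to list what actually landed in the bucket. The following is a minimal sketch (not from the original post) using the google-cloud-storage client, assuming the same bucket_name as above:

from google.cloud import storage

# Minimal sketch: list the sharded CSV files produced by the wildcard export.
# Assumes `bucket_name` is the bucket used in destination_uri above.
storage_client = storage.Client()
for blob in storage_client.list_blobs(bucket_name, prefix="result"):
    print("{} ({} bytes)".format(blob.name, blob.size))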
# pip install jupyterlab_execute_time
pip install lckr-jupyterlab-variableinspector
jupyter nbextension enable toc2/main
We already ran a quick test above, but let's also try out a few basic PySpark functions.
# Read the CSV files exported to gs://nasa_us/ into a DataFrame, inferring the schema.
df = spark.read.csv("gs://nasa_us/", header=True, inferSchema=True)
df.show(10)   # Display the first 10 rows.
df.count()    # Return the number of rows.

df.summary().show()  # Basic descriptive statistics for each column.
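A couple more exploratory calls can be useful before writing the full ETL job. This is a minimal sketch (not in the original post) that assumes the same df and the column names used in the script below (country, date, avg_temperature_air_2m_f):

# Minimal sketch: additional exploratory PySpark calls.
df.printSchema()  # Show the inferred column names and types.

# Preview the kind of aggregation the ETL job performs:
# average temperature per country and date.
df.groupBy("country", "date") \
  .agg({"avg_temperature_air_2m_f": "avg"}) \
  .show(5)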
As shown above, once the entire ETL preprocessing step has been written as PySpark code, we also need to know how to automate it, in other words how to manage it as a job. The full script is shown below, followed by a sketch of how it could be parameterized for submission.
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

sc = SparkContext('local')
spark = SparkSession(sc)
print(type(spark))

read_path = "gs://nasa_us/"
write_path = 'gs://proc_result/result/'

# Helper for renaming columns
# ------------------------------------------------------------------
def renameCols(df, old_columns, new_columns):
    for old_col, new_col in zip(old_columns, new_columns):
        df = df.withColumnRenamed(old_col, new_col)
    return df

# Old columns
old_columns = ['avg(min_temperature_air_2m_f)',
               'avg(max_temperature_air_2m_f)',
               'avg(avg_temperature_air_2m_f)']

# New columns
new_columns = ['temperature_air_min_avg',
               'temperature_air_max_avg',
               'temperature_air_avg_avg']
# ------------------------------------------------------------------

# Read CSV from GCS
df = spark.read.csv(read_path, header=True, inferSchema=True)

# Data transform: average the temperature columns per country and date
df = df.groupBy('country', 'date').agg({'min_temperature_air_2m_f': 'avg',
                                        'max_temperature_air_2m_f': 'avg',
                                        'avg_temperature_air_2m_f': 'avg'})
df2 = renameCols(df, old_columns, new_columns)

# Write CSV to GCS
df2.coalesce(1).write.option("header", "true").mode("overwrite").csv(write_path)
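To run this script as a managed job rather than with hard-coded paths, one option (a minimal sketch, not from the original post) is to parameterize the input and output locations so a scheduler or a gcloud dataproc jobs submit pyspark invocation can pass them in:

import argparse
from pyspark.sql import SparkSession

def main():
    # Minimal sketch: accept the GCS read/write paths as job arguments
    # instead of hard-coding them, so the same script can be reused
    # when it is submitted as an automated/scheduled job.
    parser = argparse.ArgumentParser()
    parser.add_argument("--read-path", default="gs://nasa_us/")
    parser.add_argument("--write-path", default="gs://proc_result/result/")
    args = parser.parse_args()

    spark = SparkSession.builder.appName("nasa-etl").getOrCreate()
    df = spark.read.csv(args.read_path, header=True, inferSchema=True)
    # ... same groupBy/agg transform and column renaming as above ...
    df.coalesce(1).write.option("header", "true").mode("overwrite").csv(args.write_path)

if __name__ == "__main__":
    main()

A script structured like this could then be submitted with, for example, gcloud dataproc jobs submit pyspark along with the usual --cluster and --region flags, passing the paths after the -- separator as job arguments.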