新潟県最大級のポータイルサイトを情報技術を駆使して盛り上げたい方を募集しています。

Cloud FunctionsでBigQueryからStorageにCSVをエクスポートする方法(Python3.7編)

はじめに

株式会社ニューズ・ラインのバックエンドエンジニア(インフラもほんのりやってます)のemuraです。

さて、前回の説明した

Cloud FunctionsでFirestoreからStorageにCSVをエクスポートする方法(Python3.7編)

に続き、今回はCloud Functionsを使ってBigQueryのデータをStorage(Google Cloud Storage 以降Storageとする)にCSVでエクスポートする方法を説明します。

事前に必要なこと

  • GCPのプロジェクトに対する課金(Blazeプラン)が有効になっていること
  • BigQueryにデータがあること
  • StorageにCSVをエクスポートするバケットの用意

BigQueryからStorageへCSVをエクスポートする

Cloud Functionsの管理画面を開き、新規で関数を作成していきます。(関数の基本設定は省く)

ランタイムの選択

「Pytthon 3.7」を選択
※プログラム言語を指します。

エントリポイントの設定

今回は「main」を設定
※プログラムの開始メソッドの名前です。

requirements.txtの設定

下記のように書きます。

# Function dependencies, for example:
# package>=version
google-cloud-bigquery==1.26.1
google-cloud-storage==1.23.0

main.pyの設定

下記のように書きます。
【】の部分は自身のGCPの環境の設定値に変更してください。
※例ではBigQueryからデータを取得するSELECT文はid、url項目のデータをurlの正規表現によって抽出しています。
※ソースコード中のextract_job.result()以降のソースはPub/Subと連動する為のソースコードの為、Pub/Subを使わない方は不要なソースです。

import base64
from google.cloud import bigquery
from google.cloud import storage

def main(event, context):

     # BigQueryに接続
     client = bigquery.Client()
     # バケット名を設定
     bucket_name = "【CSVをエクスポートするStorageのバケット名】"
     # プロジェクト名を設定
     project = "【GCPのプロジェクト名】"
     # データセットIDを設定
     dataset_id = "【BigQueryのデータセット名】"
     # テーブル名を設定
     table_src = "【BigQueryのテーブル名】"
     # エクスポート用のtempテーブル名を設定
     table_dst = "{}_temp".format(table_src)
     # CSVファイル名を設定
     destination_uri = "gs://{}/export_{}.csv".format(bucket_name, table_dst)

     # SQL分の作成
     sql = """
     SELECT id, url
     FROM {}.{}.{}
     WHERE REGEXP_CONTAINS(url, r"^https://www.XXX.net/XXX/XXX")
     ORDER BY id;
     """.format(project, dataset_id, table_src)
     # tempテーブル作成準備
     job_config = bigquery.QueryJobConfig(destination="{}.{}.{}".format(project, dataset_id, table_dst))
     # テーブル作成モードは削除して上書きのモードを設定
     job_config.write_disposition = 'WRITE_TRUNCATE'
     # tempテーブル作成
     query_job = client.query(sql, job_config=job_config)
     query_job.result()
     # データセットに接続
     dataset_ref = bigquery.DatasetReference(project, dataset_id)
     # tempテーブル情報を取得
     table_ref = dataset_ref.table(table_dst)
     # tempテーブルデータをCSVファイルでエクスポート
     extract_job = client.extract_table(
     table_ref,
     destination_uri,
     )
     extract_job.result()

     """Triggered from a message on a Cloud Pub/Sub topic.
     Args:
          event (dict): Event payload.
          context (google.cloud.functions.Context): Metadata for the event.
     """
     pubsub_message = base64.b64decode(event['data']).decode('utf-8')
     print(pubsub_message)

関数のデプロイ

作成した関数をデプロイし、エクスポート先に設定したStorageのバケット配下にBigQueryのデータのCSVファイルがエクスポートされていればOKです。

今回苦労した点

下記、BigQuery独自の制限のようなものがあり、調査と実装にかなり時間がかかりました。

  • 毎度ですが、requirements.txtの書き方が分からなかった(公式含め、情報が少ない)
  • SELECT文の書き方がMySQLやSQL Serverなどと勝手が違う(LIKE句がないなど)
  • 直テーブルからエクスポートができないので、今回はSELECTした結果をtempテーブルに書き込んでからエクスポートした(直テーブルからのエクスポートの方法もあるかもしれないが分からなかったし、情報が少なかったた・・・)
  • 2回目以降にこの処理を行う場合、tempテーブルの更新ができなかったので、テーブルを削除して新規作成という流れでしかできなかった。その際の上書きモードなどのオプションの存在を知るのに時間がかかった。(他に方法があるのかもしれないが、分からなかった。)

おわりに

Cloud Functionsを少しずつ触り初めて分かってきましたが、やはりGCP内の各サービス間の処理であれば、わずか数行のソースコードで実装をすることができるので、さすがGoogleといった感じです。情報は少なめだが、勝手を理解すれば、便利なサービスだと感じました。次回はCloud Functionを使って、CSVファイルをBigQueryに読み込む(インポート)方法についても触れていきたいと思います。