はじめに
GCPで定期的にBitflyerのチャート情報を取得して、データを貯めていくツールを構築していきます。
このツールをベースに、ビットコインの取引機能を実装することで、自動取引ができるようにしたいと思います。
どんなものを構築するのか
構築するツールの特徴は、ざっくり以下のようになります。
- GCP上に構築
- 15分ごとにBitflyerの15分足チャートを取得
- CSV形式でGoogle Cloud Storageに保管
構成
構成は下記のようになります。
Cloud SchedulerとPub/Subを使い、Cloud Functionsを定期実行し、Cryptowatchからチャートを取得して、Cloud Storageのデータと結合させて保存するという形になります。
Cloud Functionsの定期実行に関しては、下記を参考にしてください。

料金
料金については、Cloud Functinsのリソースや保存するCSVファイルの大きさにもよりますが、月数十円程度はかかるかと思います。
各種サービスの料金を確認の上、ご利用ください。




実装
まずは、Cloud Functionsにデプロイするアプリケーションの実装をします。
コードは下記のリポジトリにあります。
用意するファイルは下記の通りです。一つにファイルをまとめても良いですが、今後取引機能の実装などが追加されるのを考慮して、分割しています。
.
├── bitflyer_data_getter.py
├── const.py
├── main.py
└── requirements.txt
定数
まずは、定数の定義(const.py
)をしておきます。
BTC_URL = "https://api.cryptowat.ch/markets/bitflyer/btcfxjpy/ohlc"
PERIOD = 900
MARKET = "bitflyer"
PAIRS = "btcfxjpy"
PROJECT_ID = "mlbot-test"
DATA_BUCKET_NAME = "mlbot-test-data"
CHART_PATH = "chart.csv"
requirements.txt
必要な依存関係は下記の通りです。
バージョンはとりあえず現状(2022/8/27現在)の最新にしています。
pandas==1.4.3
requests==2.28.1
google-cloud-storage==2.5.0
データの取得
Cloud Storageに保管されているCSV形式の既存データと新しいデータの取得をします。(bitflyer_data_getter.py
)
既存データはバケットとファイルを指定して、取得します。
storage_client = gcs.Client(const.PROJECT_ID)
def get_data_from_gcs(bucket_name, filepath):
'''
GCSからデータを取得
Parameters
-----------
bucket_name : string
取得先のバケット名
filepath : string
ファイルのパス
Returns
-----------
df : Dataframe
GCSから取得したデータのDataframe
'''
bucket = storage_client.get_bucket(bucket_name)
blob = bucket.blob(filepath)
df = pd.read_csv(BytesIO(blob.download_as_string()), index_col=0)
return df
新しいチャートデータはCryptowatchを使って取得します。詳しくは下記を参考にしてください。

CloseTime
をJSTに変換して、timestamp
カラムにしています。また、保存したいカラムはtimestamp
、op
、hi
、lo
、cl
、volume
なので、それ以外のカラムは削除しています。
def get_recent_data():
'''
cryptowatchから直近のデータを取得
Returns
-----------
df : Dataframe
直近のデータのDataframe[op, hi, lo, cl, volume]
'''
now = str(int(time.time()))
params = {"periods": str(const.PERIOD), "before": now}
columns = ["CloseTime", "op", "hi", "lo", "cl", "volume", "QuoteVolume"]
res = requests.get(const.BTC_URL, params).json()
df = pd.DataFrame(res['result'][str(const.PERIOD)], columns=columns)
df.index = pd.to_datetime(
df['CloseTime'], unit='s', utc=True).dt.tz_convert('Asia/Tokyo')
df.index.name = 'timestamp'
df = df.drop(['CloseTime', 'QuoteVolume'], axis=1)
return df
データの結合
既存のデータと新しく取得したデータを結合させて、新しく保存するデータを作成します。(bitflyer_data_getter.py
)
重複した削除されるようにしています。
def create_new_data(df_recent):
'''
新しいデータの作成
Parameters
-----------
df_recent : Dataframe
直近のデータのDataframe
Returns
-----------
df : Dataframe
既存データのと新しいデータを合わせたDataframe
'''
df_gcs = get_data_from_gcs(const.DATA_BUCKET_NAME, const.CHART_PATH)
df = pd.concat([df_gcs, df_recent]).drop_duplicates()
return df
データの保存
Cloud StorageにデータをCSV形式で保存します。(bitflyer_data_getter.py
)
storage_client = gcs.Client(const.PROJECT_ID)
def save_data(df, bucket_name, filepath):
'''
データの保存
Parameters
-----------
df : Dataframe
保存するDataframe
bucket_name : string
保存先のバケット名
filepath : string
ファイルのパス
'''
bucket = storage_client.get_bucket(bucket_name)
blob = bucket.blob(filepath)
blob.upload_from_string(df.to_csv())
main
実装した関数を使って、直近データの取得をし、既存のデータとの結合、データの保存をmainで実行します。(main.py
)
from bitflyer_data_getter import *
import const
def main(event, context):
"""Triggered from a message on a Cloud Pub/Sub topic.
Args:
event (dict): Event payload.
context (google.cloud.functions.Context): Metadata for the event.
"""
df_recent = get_recent_data()
df = create_new_data(df_recent)
save_data(df, const.DATA_BUCKET_NAME, const.CHART_PATH)
Cloud Functionで関数の作成
Cloud Functionsで関数を作成していきます。
Pub/Subをトリガーに関数を作成します。
先ほどの実装内容を元にデプロイします。エントリポイントをmain
に変更するのを忘れないようにしてください。
今回は、ランタイムをPython3.10にしています。
Cloud Storageの準備
次にCloud Storageにバケットの作成と空のCSVファイルのアップロードをします。
まずはバケットを作成します。
特に可用性は求めていないので、単一リージョンにしています。
下記コマンドでカラムだけのCSVファイルを作成して、Cloud Storageにアップロードします。
echo "timestamp,op,hi,lo,cl,volume" > chart.csv
Cloud Schedulerでジョブの作成
最後に定期実行するためのCloud Schedulerを作成します。
Cloud Functionsで指定したPub/Subのトピックと同じトピックを選択して、Cloud Schedulerを作成します。
以上で、15分ごとにチャートを取得して、Cloud Storageに保存するツールの構築が完了しました。
毎時00分、15分、30分、45分にCloud Functionsが実行され、チャートを取得・保存してくれます。