【仮想通貨の自動売買を目指して】GCPで定期的にbitflyerのチャートを取得してデータを格納する

2022.08.27
2024.03.24
仮想通貨
bitFlyerCloud Pub/SubCloud SchedulerGCPPython

本ページはAmazonアフィリエイトのリンクを含みます。

はじめに

GCPで定期的にBitflyerのチャート情報を取得して、データを貯めていくツールを構築していきます。

このツールをベースに、ビットコインの取引機能を実装することで、自動取引ができるようにしたいと思います。

どんなものを構築するのか

構築するツールの特徴は、ざっくり以下のようになります。

  • GCP上に構築
  • 15分ごとにBitflyerの15分足チャートを取得
  • CSV形式でGoogle Cloud Storageに保管

構成

構成は下記のようになります。

Cloud SchedulerとPub/Subを使い、Cloud Functionsを定期実行し、Cryptowatchからチャートを取得して、Cloud Storageのデータと結合させて保存するという形になります。

Cloud Functionsの定期実行に関しては、下記を参考にしてください。

unknown link

料金

料金については、Cloud Functinsのリソースや保存するCSVファイルの大きさにもよりますが、月数十円程度はかかるかと思います。

各種サービスの料金を確認の上、ご利用ください。

料金  |  Cloud Run functions  |  Google Cloud

料金  |  Cloud Run functions  |  Google Cloud

Cloud Run 関数の料金情報を確認する

料金  |  Pub/Sub  |  Google Cloud

料金  |  Pub/Sub  |  Google Cloud

Pub/Sub と Pub/Sub Lite の料金を確認する。

料金  |  Cloud Scheduler  |  Google Cloud

料金  |  Cloud Scheduler  |  Google Cloud

Cloud Scheduler の料金を確認する

Pricing  |  Cloud Storage  |  Google Cloud

Pricing  |  Cloud Storage  |  Google Cloud

実装

まずは、Cloud Functionsにデプロイするアプリケーションの実装をします。

コードは下記のリポジトリにあります。

GitHub - monda00/bitflyer-data-getter-sample: Get data from bitflyer on GCP Cloud Functions.

GitHub - monda00/bitflyer-data-getter-sample: Get data from bitflyer on GCP Cloud Functions.

Get data from bitflyer on GCP Cloud Functions. Contribute to monda00/bitflyer-data-getter-sample development by creating an account on GitHub.

用意するファイルは下記の通りです。一つにファイルをまとめても良いですが、今後取引機能の実装などが追加されるのを考慮して、分割しています。

1.
2├── bitflyer_data_getter.py
3├── const.py
4├── main.py
5└── requirements.txt

定数

まずは、定数の定義(const.py)をしておきます。

1BTC_URL = "https://api.cryptowat.ch/markets/bitflyer/btcfxjpy/ohlc"
2PERIOD = 900
3MARKET = "bitflyer"
4PAIRS = "btcfxjpy"
5PROJECT_ID = "mlbot-test"
6DATA_BUCKET_NAME = "mlbot-test-data"
7CHART_PATH = "chart.csv"

requirements.txt

必要な依存関係は下記の通りです。

バージョンはとりあえず現状(2022/8/27現在)の最新にしています。

1pandas==1.4.3
2requests==2.28.1
3google-cloud-storage==2.5.0

データの取得

Cloud Storageに保管されているCSV形式の既存データと新しいデータの取得をします。(bitflyer_data_getter.py)

既存データはバケットとファイルを指定して、取得します。

1storage_client = gcs.Client(const.PROJECT_ID)
2
3def get_data_from_gcs(bucket_name, filepath):
4    '''
5    GCSからデータを取得
6
7    Parameters
8    -----------
9    bucket_name : string
10        取得先のバケット名
11    filepath : string
12        ファイルのパス
13
14    Returns
15    -----------
16    df : Dataframe
17        GCSから取得したデータのDataframe
18    '''
19    bucket = storage_client.get_bucket(bucket_name)
20    blob = bucket.blob(filepath)
21    df = pd.read_csv(BytesIO(blob.download_as_string()), index_col=0)
22
23    return df

新しいチャートデータはCryptowatchを使って取得します。詳しくは下記を参考にしてください。

unknown link

CloseTimeをJSTに変換して、timestampカラムにしています。また、保存したいカラムはtimestampophiloclvolumeなので、それ以外のカラムは削除しています。

1def get_recent_data():
2    '''
3    cryptowatchから直近のデータを取得
4
5    Returns
6    -----------
7    df : Dataframe
8        直近のデータのDataframe[op, hi, lo, cl, volume]
9    '''
10    now = str(int(time.time()))
11    params = {"periods": str(const.PERIOD), "before": now}
12    columns = ["CloseTime", "op", "hi", "lo", "cl", "volume", "QuoteVolume"]
13
14    res = requests.get(const.BTC_URL, params).json()
15    df = pd.DataFrame(res['result'][str(const.PERIOD)], columns=columns)
16
17    df.index = pd.to_datetime(
18        df['CloseTime'], unit='s', utc=True).dt.tz_convert('Asia/Tokyo')
19
20    df.index.name = 'timestamp'
21    df = df.drop(['CloseTime', 'QuoteVolume'], axis=1)
22
23    return df

データの結合

既存のデータと新しく取得したデータを結合させて、新しく保存するデータを作成します。(bitflyer_data_getter.py)

重複した削除されるようにしています。

1def create_new_data(df_recent):
2    '''
3    新しいデータの作成
4
5    Parameters
6    -----------
7    df_recent : Dataframe
8        直近のデータのDataframe
9
10    Returns
11    -----------
12    df : Dataframe
13        既存データのと新しいデータを合わせたDataframe
14    '''
15    df_gcs = get_data_from_gcs(const.DATA_BUCKET_NAME, const.CHART_PATH)
16    df = pd.concat([df_gcs, df_recent]).drop_duplicates()
17
18    return df

データの保存

Cloud StorageにデータをCSV形式で保存します。(bitflyer_data_getter.py)

1storage_client = gcs.Client(const.PROJECT_ID)
2
3def save_data(df, bucket_name, filepath):
4    '''
5    データの保存
6
7    Parameters
8    -----------
9    df : Dataframe
10        保存するDataframe
11    bucket_name : string
12        保存先のバケット名
13    filepath : string
14        ファイルのパス
15    '''
16    bucket = storage_client.get_bucket(bucket_name)
17    blob = bucket.blob(filepath)
18    blob.upload_from_string(df.to_csv())

main

実装した関数を使って、直近データの取得をし、既存のデータとの結合、データの保存をmainで実行します。(main.py)

1from bitflyer_data_getter import *
2import const
3
4def main(event, context):
5    """Triggered from a message on a Cloud Pub/Sub topic.
6    Args:
7         event (dict): Event payload.
8         context (google.cloud.functions.Context): Metadata for the event.
9    """
10
11    df_recent = get_recent_data()
12    df = create_new_data(df_recent)
13    save_data(df, const.DATA_BUCKET_NAME, const.CHART_PATH)

Cloud Functionで関数の作成

Cloud Functionsで関数を作成していきます。

Pub/Subをトリガーに関数を作成します。

先ほどの実装内容を元にデプロイします。エントリポイントをmainに変更するのを忘れないようにしてください。

今回は、ランタイムをPython3.10にしています。

Cloud Storageの準備

次にCloud Storageにバケットの作成と空のCSVファイルのアップロードをします。

まずはバケットを作成します。

特に可用性は求めていないので、単一リージョンにしています。

下記コマンドでカラムだけのCSVファイルを作成して、Cloud Storageにアップロードします。

1echo "timestamp,op,hi,lo,cl,volume" > chart.csv

Cloud Schedulerでジョブの作成

最後に定期実行するためのCloud Schedulerを作成します。

Cloud Functionsで指定したPub/Subのトピックと同じトピックを選択して、Cloud Schedulerを作成します。

以上で、15分ごとにチャートを取得して、Cloud Storageに保存するツールの構築が完了しました。

毎時00分、15分、30分、45分にCloud Functionsが実行され、チャートを取得・保存してくれます。

参考

Support

\ この記事が役に立ったと思ったら、サポートお願いします! /

buy me a coffee
Share

Profile

author

Masa

都内のIT企業で働くエンジニア
自分が学んだことをブログでわかりやすく発信していきながらスキルアップを目指していきます!

buy me a coffee