【仮想通貨の自動売買を目指して】GCPで定期的にbitflyerのチャートを取得してデータを格納する

スポンサーリンク

はじめに

GCPで定期的にBitflyerのチャート情報を取得して、データを貯めていくツールを構築していきます。

このツールをベースに、ビットコインの取引機能を実装することで、自動取引ができるようにしたいと思います。

どんなものを構築するのか

構築するツールの特徴は、ざっくり以下のようになります。

  • GCP上に構築
  • 15分ごとにBitflyerの15分足チャートを取得
  • CSV形式でGoogle Cloud Storageに保管

構成

構成は下記のようになります。

Cloud SchedulerとPub/Subを使い、Cloud Functionsを定期実行し、Cryptowatchからチャートを取得して、Cloud Storageのデータと結合させて保存するという形になります。

Cloud Functionsの定期実行に関しては、下記を参考にしてください。

【GCP】Cloud Functionsを定期実行したい
はじめにGCPのCloud Functionsを定期実行する方法を解説していきます。Cloud Functionsの基本的な使い方として、Pythonの関数を実行する方法は下記で紹介しています。Cloud Functionsを定期実...

料金

料金については、Cloud Functinsのリソースや保存するCSVファイルの大きさにもよりますが、月数十円程度はかかるかと思います。

各種サービスの料金を確認の上、ご利用ください。

料金  |  Cloud Functions  |  Google Cloud
Cloud Functions の料金情報を確認する
料金  |  Cloud Pub/Sub  |  Google Cloud
Pub/Sub と Pub/Sub Lite の料金を確認する。
料金  |  Cloud Scheduler  |  Google Cloud
Cloud Scheduler の料金を確認する
Pricing  |  Cloud Storage  |  Google Cloud

実装

まずは、Cloud Functionsにデプロイするアプリケーションの実装をします。

コードは下記のリポジトリにあります。

GitHub - monda00/bitflyer-data-getter-sample: Get data from bitflyer on GCP Cloud Functions.
Get data from bitflyer on GCP Cloud Functions. Contribute to monda00/bitflyer-data-getter-sample development by creating an account on GitHub.

用意するファイルは下記の通りです。一つにファイルをまとめても良いですが、今後取引機能の実装などが追加されるのを考慮して、分割しています。

.
├── bitflyer_data_getter.py
├── const.py
├── main.py
└── requirements.txt

定数

まずは、定数の定義(const.py)をしておきます。

BTC_URL = "https://api.cryptowat.ch/markets/bitflyer/btcfxjpy/ohlc"
PERIOD = 900
MARKET = "bitflyer"
PAIRS = "btcfxjpy"
PROJECT_ID = "mlbot-test"
DATA_BUCKET_NAME = "mlbot-test-data"
CHART_PATH = "chart.csv"

requirements.txt

必要な依存関係は下記の通りです。

バージョンはとりあえず現状(2022/8/27現在)の最新にしています。

pandas==1.4.3
requests==2.28.1
google-cloud-storage==2.5.0

データの取得

Cloud Storageに保管されているCSV形式の既存データと新しいデータの取得をします。(bitflyer_data_getter.py)

既存データはバケットとファイルを指定して、取得します。

storage_client = gcs.Client(const.PROJECT_ID)

def get_data_from_gcs(bucket_name, filepath):
    '''
    GCSからデータを取得

    Parameters
    -----------
    bucket_name : string
        取得先のバケット名
    filepath : string
        ファイルのパス

    Returns
    -----------
    df : Dataframe
        GCSから取得したデータのDataframe
    '''
    bucket = storage_client.get_bucket(bucket_name)
    blob = bucket.blob(filepath)
    df = pd.read_csv(BytesIO(blob.download_as_string()), index_col=0)

    return df

新しいチャートデータはCryptowatchを使って取得します。詳しくは下記を参考にしてください。

【仮想通貨の自動売買を目指して】過去チャート取得
はじめに仮想通貨の自動売買に向けて、まずはとにかくデータを集めます。今回は過去のチャートを取得し、過去データでのシミュレーションや機械学習モデルの作成などで活用できるようにしたいと思います。CryptowatchとはCryptowa...

CloseTimeをJSTに変換して、timestampカラムにしています。また、保存したいカラムはtimestampophiloclvolumeなので、それ以外のカラムは削除しています。

def get_recent_data():
    '''
    cryptowatchから直近のデータを取得

    Returns
    -----------
    df : Dataframe
        直近のデータのDataframe[op, hi, lo, cl, volume]
    '''
    now = str(int(time.time()))
    params = {"periods": str(const.PERIOD), "before": now}
    columns = ["CloseTime", "op", "hi", "lo", "cl", "volume", "QuoteVolume"]

    res = requests.get(const.BTC_URL, params).json()
    df = pd.DataFrame(res['result'][str(const.PERIOD)], columns=columns)

    df.index = pd.to_datetime(
        df['CloseTime'], unit='s', utc=True).dt.tz_convert('Asia/Tokyo')

    df.index.name = 'timestamp'
    df = df.drop(['CloseTime', 'QuoteVolume'], axis=1)

    return df

データの結合

既存のデータと新しく取得したデータを結合させて、新しく保存するデータを作成します。(bitflyer_data_getter.py)

重複した削除されるようにしています。

def create_new_data(df_recent):
    '''
    新しいデータの作成

    Parameters
    -----------
    df_recent : Dataframe
        直近のデータのDataframe

    Returns
    -----------
    df : Dataframe
        既存データのと新しいデータを合わせたDataframe
    '''
    df_gcs = get_data_from_gcs(const.DATA_BUCKET_NAME, const.CHART_PATH)
    df = pd.concat([df_gcs, df_recent]).drop_duplicates()

    return df

データの保存

Cloud StorageにデータをCSV形式で保存します。(bitflyer_data_getter.py)

storage_client = gcs.Client(const.PROJECT_ID)

def save_data(df, bucket_name, filepath):
    '''
    データの保存

    Parameters
    -----------
    df : Dataframe
        保存するDataframe
    bucket_name : string
        保存先のバケット名
    filepath : string
        ファイルのパス
    '''
    bucket = storage_client.get_bucket(bucket_name)
    blob = bucket.blob(filepath)
    blob.upload_from_string(df.to_csv())

main

実装した関数を使って、直近データの取得をし、既存のデータとの結合、データの保存をmainで実行します。(main.py)

from bitflyer_data_getter import *
import const

def main(event, context):
    """Triggered from a message on a Cloud Pub/Sub topic.
    Args:
         event (dict): Event payload.
         context (google.cloud.functions.Context): Metadata for the event.
    """

    df_recent = get_recent_data()
    df = create_new_data(df_recent)
    save_data(df, const.DATA_BUCKET_NAME, const.CHART_PATH)

Cloud Functionで関数の作成

Cloud Functionsで関数を作成していきます。

Pub/Subをトリガーに関数を作成します。

先ほどの実装内容を元にデプロイします。エントリポイントをmainに変更するのを忘れないようにしてください。

今回は、ランタイムをPython3.10にしています。

Cloud Storageの準備

次にCloud Storageにバケットの作成と空のCSVファイルのアップロードをします。

まずはバケットを作成します。

特に可用性は求めていないので、単一リージョンにしています。

下記コマンドでカラムだけのCSVファイルを作成して、Cloud Storageにアップロードします。

echo "timestamp,op,hi,lo,cl,volume" > chart.csv

Cloud Schedulerでジョブの作成

最後に定期実行するためのCloud Schedulerを作成します。

Cloud Functionsで指定したPub/Subのトピックと同じトピックを選択して、Cloud Schedulerを作成します。

以上で、15分ごとにチャートを取得して、Cloud Storageに保存するツールの構築が完了しました。

毎時00分、15分、30分、45分にCloud Functionsが実行され、チャートを取得・保存してくれます。

参考

タイトルとURLをコピーしました