【仮想通貨の自動売買を目指して】GCPで定期的にbitflyerのチャートを取得してデータを格納する
はじめに
GCPで定期的にBitflyerのチャート情報を取得して、データを貯めていくツールを構築していきます。
このツールをベースに、ビットコインの取引機能を実装することで、自動取引ができるようにしたいと思います。
どんなものを構築するのか
構築するツールの特徴は、ざっくり以下のようになります。
- GCP上に構築
- 15分ごとにBitflyerの15分足チャートを取得
- CSV形式でGoogle Cloud Storageに保管
構成
構成は下記のようになります。
Cloud SchedulerとPub/Subを使い、Cloud Functionsを定期実行し、Cryptowatchからチャートを取得して、Cloud Storageのデータと結合させて保存するという形になります。
Cloud Functionsの定期実行に関しては、下記を参考にしてください。
unknown link料金
料金については、Cloud Functinsのリソースや保存するCSVファイルの大きさにもよりますが、月数十円程度はかかるかと思います。
各種サービスの料金を確認の上、ご利用ください。
料金 | Cloud Run functions | Google Cloud
Cloud Run 関数の料金情報を確認する
料金 | Pub/Sub | Google Cloud
Pub/Sub と Pub/Sub Lite の料金を確認する。
料金 | Cloud Scheduler | Google Cloud
Cloud Scheduler の料金を確認する
Pricing | Cloud Storage | Google Cloud
実装
まずは、Cloud Functionsにデプロイするアプリケーションの実装をします。
コードは下記のリポジトリにあります。
GitHub - monda00/bitflyer-data-getter-sample: Get data from bitflyer on GCP Cloud Functions.
Get data from bitflyer on GCP Cloud Functions. Contribute to monda00/bitflyer-data-getter-sample development by creating an account on GitHub.
用意するファイルは下記の通りです。一つにファイルをまとめても良いですが、今後取引機能の実装などが追加されるのを考慮して、分割しています。
1.
2├── bitflyer_data_getter.py
3├── const.py
4├── main.py
5└── requirements.txt
定数
まずは、定数の定義(const.py
)をしておきます。
1BTC_URL = "https://api.cryptowat.ch/markets/bitflyer/btcfxjpy/ohlc"
2PERIOD = 900
3MARKET = "bitflyer"
4PAIRS = "btcfxjpy"
5PROJECT_ID = "mlbot-test"
6DATA_BUCKET_NAME = "mlbot-test-data"
7CHART_PATH = "chart.csv"
requirements.txt
必要な依存関係は下記の通りです。
バージョンはとりあえず現状(2022/8/27現在)の最新にしています。
1pandas==1.4.3
2requests==2.28.1
3google-cloud-storage==2.5.0
データの取得
Cloud Storageに保管されているCSV形式の既存データと新しいデータの取得をします。(bitflyer_data_getter.py
)
既存データはバケットとファイルを指定して、取得します。
1storage_client = gcs.Client(const.PROJECT_ID)
2
3def get_data_from_gcs(bucket_name, filepath):
4 '''
5 GCSからデータを取得
6
7 Parameters
8 -----------
9 bucket_name : string
10 取得先のバケット名
11 filepath : string
12 ファイルのパス
13
14 Returns
15 -----------
16 df : Dataframe
17 GCSから取得したデータのDataframe
18 '''
19 bucket = storage_client.get_bucket(bucket_name)
20 blob = bucket.blob(filepath)
21 df = pd.read_csv(BytesIO(blob.download_as_string()), index_col=0)
22
23 return df
新しいチャートデータはCryptowatchを使って取得します。詳しくは下記を参考にしてください。
unknown linkCloseTime
をJSTに変換して、timestamp
カラムにしています。また、保存したいカラムはtimestamp
、op
、hi
、lo
、cl
、volume
なので、それ以外のカラムは削除しています。
1def get_recent_data():
2 '''
3 cryptowatchから直近のデータを取得
4
5 Returns
6 -----------
7 df : Dataframe
8 直近のデータのDataframe[op, hi, lo, cl, volume]
9 '''
10 now = str(int(time.time()))
11 params = {"periods": str(const.PERIOD), "before": now}
12 columns = ["CloseTime", "op", "hi", "lo", "cl", "volume", "QuoteVolume"]
13
14 res = requests.get(const.BTC_URL, params).json()
15 df = pd.DataFrame(res['result'][str(const.PERIOD)], columns=columns)
16
17 df.index = pd.to_datetime(
18 df['CloseTime'], unit='s', utc=True).dt.tz_convert('Asia/Tokyo')
19
20 df.index.name = 'timestamp'
21 df = df.drop(['CloseTime', 'QuoteVolume'], axis=1)
22
23 return df
データの結合
既存のデータと新しく取得したデータを結合させて、新しく保存するデータを作成します。(bitflyer_data_getter.py
)
重複した削除されるようにしています。
1def create_new_data(df_recent):
2 '''
3 新しいデータの作成
4
5 Parameters
6 -----------
7 df_recent : Dataframe
8 直近のデータのDataframe
9
10 Returns
11 -----------
12 df : Dataframe
13 既存データのと新しいデータを合わせたDataframe
14 '''
15 df_gcs = get_data_from_gcs(const.DATA_BUCKET_NAME, const.CHART_PATH)
16 df = pd.concat([df_gcs, df_recent]).drop_duplicates()
17
18 return df
データの保存
Cloud StorageにデータをCSV形式で保存します。(bitflyer_data_getter.py
)
1storage_client = gcs.Client(const.PROJECT_ID)
2
3def save_data(df, bucket_name, filepath):
4 '''
5 データの保存
6
7 Parameters
8 -----------
9 df : Dataframe
10 保存するDataframe
11 bucket_name : string
12 保存先のバケット名
13 filepath : string
14 ファイルのパス
15 '''
16 bucket = storage_client.get_bucket(bucket_name)
17 blob = bucket.blob(filepath)
18 blob.upload_from_string(df.to_csv())
main
実装した関数を使って、直近データの取得をし、既存のデータとの結合、データの保存をmainで実行します。(main.py
)
1from bitflyer_data_getter import *
2import const
3
4def main(event, context):
5 """Triggered from a message on a Cloud Pub/Sub topic.
6 Args:
7 event (dict): Event payload.
8 context (google.cloud.functions.Context): Metadata for the event.
9 """
10
11 df_recent = get_recent_data()
12 df = create_new_data(df_recent)
13 save_data(df, const.DATA_BUCKET_NAME, const.CHART_PATH)
Cloud Functionで関数の作成
Cloud Functionsで関数を作成していきます。
Pub/Subをトリガーに関数を作成します。
先ほどの実装内容を元にデプロイします。エントリポイントをmain
に変更するのを忘れないようにしてください。
今回は、ランタイムをPython3.10にしています。
Cloud Storageの準備
次にCloud Storageにバケットの作成と空のCSVファイルのアップロードをします。
まずはバケットを作成します。
特に可用性は求めていないので、単一リージョンにしています。
下記コマンドでカラムだけのCSVファイルを作成して、Cloud Storageにアップロードします。
1echo "timestamp,op,hi,lo,cl,volume" > chart.csv
Cloud Schedulerでジョブの作成
最後に定期実行するためのCloud Schedulerを作成します。
Cloud Functionsで指定したPub/Subのトピックと同じトピックを選択して、Cloud Schedulerを作成します。
以上で、15分ごとにチャートを取得して、Cloud Storageに保存するツールの構築が完了しました。
毎時00分、15分、30分、45分にCloud Functionsが実行され、チャートを取得・保存してくれます。