scikit-learnを用いた機械学習パイプラインの作成

パイプラインとは何か

パイプライン処理とは、ある処理プログラムの出力が次の処理プログラムの入力となるようにした複数の処理プログラムを直列に連結したもの。

機械学習パイプラインでは、複数の変換器と予測器を直列に繋げ、一連の処理(前処理1 → 前処理2 → ‥ → 学習と予測)をまとめて実行できる様にする。

  • 変換器:特徴量の生成や選択を行う(transformor)
    例)StandardScaler(標準化)
  • 予測器(推定器):予測モデルを行う(Estimator)
    例)Logistic Regression(ロジスティック回帰)

といった具合で、複数の処理を逐次的に進めることが一般的であり、この一連の処理をまとめて実行できるようにする仕組みをパイプラインと呼ぶ。

改行
例)
「標準化(前処理1) → 主成分分析(前処理2) → サポートベクタマシン」の一連の処理を、最もシンプルな、「学習データ = 推論データ」で実行する場合、以下のようなフローの様になる。

引用|機械学習で「メリットが分かりづらい」と思われがちなパイプラインを分かりやすく解説

パイプラインを使うメリット

一連の処理をパイプラインとしてまとめておくと、複数のデータに対して同様の処理をする場合や、訓練データとテストデータのそれぞれを用いて精度検証を行う際のプログラムがシンプルに記述できる

例えば、Scikit-Learnなどのライブラリでは、パイプライン用クラスは学習メソッドと推論メソッドから構成されており、

  1. 学習用メソッドを用いて複数の前処理を実施した訓練データで学習を行う(学習メソッド|fit)
  2. 学習済みモデルの入ったパイプラインを用いて、テストデータの前処理と予測を行う(推論メソッド|predict)

パイプラインを構築しておくと、1,2の一連の処理を、通常の機械学習のメソッドの様に fitpredictの二つのコードを記述するだけで実行できる。

特にクロスバリデーションを絡めた実装の際には、前処理を含めた精度検証が出来たり、複数のモデルを横断したモデルの最適化(grid searchなど)が可能になるため、スクラッチで多大な手間をかける必要がなくなるため、パイプラインを使うメリットが非常に大きい。

引用|機械学習で「メリットが分かりづらい」と思われがちなパイプラインを分かりやすく解説

scikit-learnのpipelineモジュールとは

変換器(transformer)と推定器(estimator)を組み合わせて、機械学習パイプラインを構築するためのAPI

推定器・変換器の要件(参考)

pipelineのstepsの最後(final_estimator)か、それ以外(transform)で変わる

  • transformer: fit及びtransformメソッドを持っている、もしくはfit_transformメソッドを持っていること
  • final_estimator: fitメソッドを持っていること

pipelineモジュールに組み込めるのは、fit, transformメソッドなど持つ、sklean準拠の変換器(transformer)や推定器(Estimator)である。
そのため、独自の処理や予測手法を用いる場合には、これらを自作する必要がある。
(sklean準拠の変換器(transformer)や推定器(Estimator)の作成方法はこちらの記事参照

pipelineのメソッド一覧

decision_function
fitFit the model.
fit_predictTransform the data, and apply fit_predict with the final estimator.
fit_transformFit the model and transform with the final estimator.
get_feature_names_outGet output feature names for transformation.
get_paramsGet parameters for this estimator.
inverse_transformApply inverse_transform for each step in a reverse order.
predictTransform the data, and apply predict with the final estimator.
predict_log_probaTransform the data, and apply predict_log_proba with the final estimator.
predict_probaTransform the data, and apply predict_proba with the final estimator.
scoreTransform the data, and apply score with the final estimator.
score_samplesTransform the data, and apply score_samples with the final estimator.
set_paramsSet the parameters of this estimator.
transformTransform the data, and apply transform with the final estimator.

(引用)

(各メソッドで実行される処理についてはこちらの記事参照)

その他メモ

目的変数(y)には transformerによる前処理は適用されない模様。
例えばpipelineにstandardScalerセットしても、標準化されるのはXのみとなる。
(yの値にもtransformerによる処理などを実施したい場合にはTransformedTargetRegresorなどを使うといいとのこと|参考)

サンプルコード

基本的な使い方

scikit-learnに標準で入っている、bostonの住宅価格のデータセットを使って、回帰分析のパイプラインを作成してみる。
処理の流れは以下

  • 標準化(Scikit-learnのStandardScaler使用)
  • Lasso回帰(scikit-learnのLasso使用)

前準備

モジュールインストール、データの読み込み、訓練・テストデータへの分割

from sklearn.pipeline import Pipeline
from sklearn.linear_model import Lasso
from sklearn.datasets import load_boston
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

import pandas as pd
import numpy as np

# サンプルデータの用意
boston = load_boston()
df_X = pd.DataFrame(data=boston.data, columns=boston.feature_names)
df_y =  pd.Series(boston.target, name="MEDV")

train_X, test_X, train_y, test_y = train_test_split(df_X,df_y)

Pipelineの作成

Pipelineのstep引数に、変換器(transformer)、推定器(estimator)の名前とオブジェクトの入ったタプルのリストを渡す。
make_pipelineを使うことで、各ステップに名前を付けずにパイプラインを構築することもできる)

また、set_configの設定を変えることで、ダイアグラムでパイプラインの中身を確認することができる。

pipeline = Pipeline(steps=[
                        ('standard_scaler', StandardScaler()),
                        ('regressor', Lasso())
                        ])

# make_pipelineを使うことで、各ステップに名前を付けずにパイプラインを構築することも可能です。
# from sklearn.pipeline import make_pipeline
# pipeline = make_pipeline(scaler,lasso)

# パイプラインの中身の確認
from sklearn import set_config
set_config(display='diagram')
pipeline

前処理は全ての列に実行されるため、特定の列にのみ前処理をかけたい場合は、ColumnsTransformerを使用する。

from sklearn.compose import ColumnTransformer
### パイプライン定義

transform_features = df_X.columns[:5]
transformer = Pipeline(steps=[
    ("scaler", StandardScaler()) #特徴量を標準化
])
preprocessor = ColumnTransformer(transformers=[
    ("transform", transformer, features)
])

pipeline = Pipeline(steps=[
                        ('preprocessor', preprocessor),
                        ('regressor', Lasso())
                        ])

前処理と推論

作成したパイプラインのfitpredictやメソッドを実行することで、パイプラインに登録した前処理が行われた後、推定器(estimator)に該当するメソッドが実行される。

テストデータに対する予測は、学習済みのpipelineのpredictメソッドで実行できる(かなり楽)

# (変換器と)推定器の学習
model = pipeline.fit(train_X, train_y)

# 訓練・テストデータの予測
train_y_pred = model.predict(train_X)
test_y_pred = model.predict(test_X)

# 予測精度確認
from sklearn.metrics import r2_score
print('r2(train):',round(r2_score(train_y, train_y_pred),3))
print('r2(train):',round(r2_score(test_y, test_y_pred),3))
"""output
0.616
0.74
"""

クロスバリデーションへの組み込み

cross_val_scoreにpipelineをそのまま渡すことで、クロスバリデーション内の分割された各データの学習・予測の際にも、逐一前処理が実行される。
(この処理をスクラッチで書く場合かなり長いコードになってしまうので、pipelineが非常に便利)

from sklearn.model_selection import cross_val_score
score = cross_val_score(pipe, 
                         train_X, 
                         train_y, 
                         cv=5,
                         scoring='r2')
print(score)
"""output
array([0.69871477, 0.61571631, 0.62491997, 0.54113371, 0.55547713])
"""

各ステップの取り出し

named_stepsメソッドにてpipelineの各処理の名前をkeyとして各処理のオブジェクト?を取り出すことも可能


# standard_scalerを取り出して変換した特徴量名を調査
standard_scaler_fitted = model.named_steps['standard_scaler']
print(standard_scaler_fitted.get_feature_names_out())
"""output
array(['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 'DIS', 'RAD',
       'TAX', 'PTRATIO', 'B', 'LSTAT'], dtype=object)
"""

# 予測器を取り出して、回帰係数を調べる
regressor_fitted = model.named_steps['regressor']
print(regressor_fitted.coef_)
"""output
[-0.          0.         -0.          0.05812895 -0.          2.1649965
 -0.         -0.         -0.         -0.         -1.38356969  0.
 -3.63722932]
"""

pipelineの保存・再利用

一応、pipelineは、丸ごとpickleで保存することが可能。

import pickle

# 保存
with open("pipeline.pickle", mode="wb") as f:
    pickle.dump(model,f)

# 読み込み
with open("pipeline.pickle", mode="rb") as f:
    loaded_model = pickle.load(f)

# 確認(ステップと、特徴量名とその数を調べてみる)
print(loaded_model.named_steps)
print(loaded_model.feature_names_in_)
print(loaded_model.n_features_in_)
"""output
{'standard_scaler': StandardScaler(), 'regressor': Lasso()}
['CRIM' 'ZN' 'INDUS' 'CHAS' 'NOX' 'RM' 'AGE' 'DIS' 'RAD' 'TAX' 'PTRATIO'
'B' 'LSTAT']
13
"""

パイプライン処理でのグリッドサーチ

グリッドサーチ(やランダムサーチ)と組み合わせると、ハイパーパラメータに加えて、前処理と予測モデルの組み合わせ種類も含めた、中から最適な組み合わせを自動的に探索できる。
パラメータは、処理名__パラメータ名という形式で指定する。

例として、以下の組み合わせで最適化を実行してみる。

  • 前処理:StandardScaler, MinMaxScaler, RobustScaler
  • 予測モデル:PLS, Lasso回帰(とそれらのハイパーパラメータ)
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import RobustScaler

from sklearn.cross_decomposition import PLSRegression

from sklearn.model_selection import GridSearchCV

# 探索空間を定義
param_grid = [
    {
        "preprocesser": [StandardScaler(), MinMaxScaler(), RobustScaler()],   
         "regressor": [Lasso()],
        "regressor__alpha": np.logspace(-5, -1, 10),       
    },
        {
        "preprocesser": [StandardScaler(), MinMaxScaler(), RobustScaler()],   
         "regressor": [PLSRegression()],
        "regressor__n_components": [1,2,3,4,5],       
    }]

# 仮でpipelineインスタンスを作成しておく
pipeline = Pipeline(steps=[("preprocesser",StandardScaler()),
                          ("regressor",Lasso())])

grid_search = GridSearchCV(pipeline, param_grid, cv=5, verbose=1,n_jobs=-1)

# 探索
grid_search.fit(train_X,train_y)

# 最もスコアの良かった組み合わせ
print(grid_search.best_params_)
"""ouput
{'preprocesser': MinMaxScaler(), 'regressor': Lasso(alpha=0.012915496650148827), 'regressor__alpha': 0.012915496650148827}
"""

参考

参考HP

関連書籍






関連Udemy