BERTを使ったMLバッチ処理実サービスのアーキテクチャとMLOpsの取り組み

Tags
BERTを使ったMLバッチ処理実サービスのアーキテクチャとMLOpsの取り組み
Page content

こんにちは、Development部門に所属しているSREの佐藤と申します。

Development部門では複数プロダクト共通の基盤構築や、新技術の検証、インフラ整備などを幅広く担当しています。これまでストックマークではCI/CD基盤の構築やAWS上で構築するインフラのコード化、ニュース収集基盤のアーキテクチャの改善や運用負荷軽減から、製品利用状況のデータ分析基盤構築などに取り組んできました。

今日はAstrategyという製品でのMLOpsの取り組みについて話します。

Astrategyについて

Astrategyは国内外Webメディアを対象として情報を収集・構造化し、調査・報告業務を包括的にサポートする検索プラットフォームです。

図1: 「言葉のAI」自然言語解析を用いたオープンデータ解析ツール

複数の分析画面を提供しており、目的に応じて異なる観点で市場変化や競合動向を可視化できます。

図2: Astrategyの分析画面

人力では約50~100記事(期間:1カ月)の調査が限界ですが、Astrategyを利用することで約5,000記事を俯瞰し構造化することでこれまでにたどりつけなかった情報の調査が可能になります。

図3: Astrategyが提供する価値とは

Astrategyのシステム構成

以下がAstrategyのシステム構成です。

ユーザーがアクセスした時に動作する「オンライン処理」システムと、夜間や早朝にオンライン処理から参照するデータ生成する「バッチ処理」システムで構成されています。

図4 Astrategyのシステム構成

今日私が話すのは機械学習バッチ処理(以降MLバッチ処理)システムのMLOps取り組みについてです。

MLバッチ処理

Astrategyのバッチ処理では、弊社の独自クローラーが約3万メディアから収集した1日約30万件、合計約2000万件のニュース記事を、「業界」、「地域」などの分類、「企業名の抽出」など10種類の機械学習ジョブで推論を行い、記事にラベル付けをします。

図5: バッチ処理概要

ラベル付けされた記事データは検索サーバー (Elasticsearch)のインデックスに登録され、アプリケーションの各分析画面 (図2)から呼び出されます。

MLモデルについて

バッチ処理ではBERTを応用して作成したMLモデルを各タスク用にチューニングしたものを使っています。モデルの開発については本記事のスコープ対象外としますが、興味がある方は弊社森長が大規模ニュースコーパスで事前学習させたモデルを公開しているので、そちらを参照いただければと思います。

MLOpsとは

MLOpsの定義はGoogle社のドキュメントがわかりやすいかと思います。 以下は日本語翻訳記事からの引用です。

MLOpsは、MLシステム開発(Dev)とMLシステムオペレーション(Ops)を統合することを目的としたMLエンジニアリングの文化と実践です。MLOpsを実践するということは、統合、テスト、リリース、デプロイ、インフラストラクチャ管理を含む、MLシステム構築のすべてのステップで自動化とモニタリングを提唱することを意味します。

要はDevOpsのML版といった感じです。MLシステムは構成要素が多く、通常のソフトウェア開発に加えて特有の難しさがあり、技術的負債が蓄積しやすいことなどがHidden Technical Debt in Machine Learning Systemsで説明されています。

図6: MLシステムの構成要素 Hidden Technical Debt in Machine Learning Systemsから転載.

また、What is ML Ops? Best Practices for DevOps for ML (Cloud Next ‘18)では、複雑なMLシステム開発・運用を1人できる人(ML superhero)に任せてしまうスケールしなくなるため、アンチパターンとして紹介されています。

図7: ML superhero What is ML Ops? Best Practices for DevOps for ML (Cloud Next '18)から転載.

弊社のMLエンジニアはインフラやアプリケーションもできるフルスタック技術者が多く、Astrategyのバッチシステムは森長がモデル開発、インフラ管理、アプリケーションのデプロイすべてを初期段階では担当していて、まさにこのアンチパターンに陥っていました。

MLOpsの検討開始

昨年の11月末頃、Development部門のマネジャー兼Astrategyのバックエンド開発者の谷本からミーティングで「Astrategyのリリースが近くなりいくつかMLシステムの運用周りで課題が出てきたので、MLOpsの必要性を感じている。佐藤さんやりたいですか?」という内容でした。

MLシステム構築の経験がなかった私ですが、ストックマーク入社時からMLシステム構築に関わりたいと感じて、Stanford大のMachine Learningや弊社のKaggle masterに勧められたKaggleに登録したら次にやること ~ これだけやれば十分闘える!Titanicの先へ行く入門 10 Kernel ~などを通して、MLパイプラインの勉強をしていたことは伝えていました。自分で開発できるほどではないが、なんとなくMLパイプラインの構成要素を把握している、といった感じでした。

「是非やらせてください」と伝えると「お、いいですね。ではお願いします。」とMLOps大臣に任命されました笑。

ストックマークでは手をあげると大抵やりたいことをやらせてもらえます。

MLOpsキックオフ

1月初頭にMLとDevチームでMLOpsのキックオフミーティングを開きました。その時に、以下の課題が上がりました。

  1. 継続的インテグレーション (CI) 機構がない。 → テストで防げるような不具合が検知できないため、デプロイ後に発覚して手戻りが発生する。
  2. 継続的デプロイ (CD) 機構がない。 → デプロイ運用コストが大きく、かつ手動デプロイによるミスが発生しやすい。
  3. 監視機構がない。 → バッチ処理が無事完了かどうかを毎日手動でElasticsearchとS3に確認しないといけない。

中でも、当時は複数台のEC2やLambdaにコードやモデルを手動でデプロイしていたため、非常にデプロイ負荷が高くなっていました。経験上本番環境への手動デプロイは、どんなに優秀な技術者でもミスが出ます。ミスが出たときのビジネスへの影響も問題ですが、ミスができない緊張感で疲弊し、2回目以降は面白くない仕事(いわゆるToil)になることも問題です。ML技術、研究者の時間がToil沼にどっぷり取られていたので、これはSREとしても価値を提供できるのではないか!という話になりました。

というわけでCI/CD機構の構築デプロイの自動化に取り組む事としました。 

本番環境での構成

CI/CD機構を作るにはまず、システムの構成を把握することから始めました。

初期構成

MLOps立ち上げ時の構成が以下の図です。

  • CloudWatchの定期実行イベントをトリガーにLambdaが起動し、RDSの記事データを抽出し、S3に格納
  • S3更新をトリガーに、別のLambdaが起動し、EC2 (GPU)を起動
  • EC2にて記事データをダウンロードし、ML処理後のデータをS3とElasticsearchに格納
  • 処理後データがS3に更新されるタイミングで別のLambdaが起動し、日次の処理後データと既存の全件データを合わせて類似記事集約処理などの後、S3に格納するEC2インスタンスを起動
  • 各EC2インスタンスはジョブ完了後にシャットダウンする処理を設定

図8: 初期構成

当時は製品環境のコードに更新があるたびに以下の作業を手作業でやっていました。

  • EC2 x 2(CPU, GPUインスタンス)
    • SCPコマンドで開発環境からコードをアップロード
    • SCPコマンドでモデルのアップロード
    • SSHでEC2にログインし、Docker imageをビルド
  • Lambda x 3
    • 各Lambda用にDocker化して必要なライブラリをインストールしたzipファイルを作成
    • コンソールからLambdaを更新

ちなみに、ML実行環に必要なライブラリをDocker化するところもすべて森長がやりました。 この構成だと、10種類あるMLタスクを1つのGPUインスタンスで直列に実行するため

  • 処理時間が長くなる
  • 途中でコケた場合にどのタスクでコケたか、なぜコケたか調べるのが困難

などの問題がありました。

マイクロサービス構成

この対処として、実装されたのが以下のマイクロサービス構成です。

図7: マイクロサービス構成

MLタスクを10のマイクロサービスインスタンスに分けることで、並列処理の実行と各タスクを疎結合化できました。10のインスタンスはすべてがGPUを必要とするわけではないため、リソース要件に合わせてCPU最適化、メモリ最適化なども合わせて活用し、コストパフォーマンスの良い構成になりました。

これによりバッチ処理の実行時間は大幅に短縮され、処理に失敗した時の調査は大分楽になりました。

「機械学習エンジニアはモデルの開発だけで、本番用のコードやインフラ構築・運用をあまり得意としない。」という話を以前他社のMLOps登壇イベントで聞きましたが、弊社のMLエンジニアはインフラに詳しい方も多数います(自分ではこの構成は思いつかなかったと思います。)まさに先述のML superheroです。

図8: ML Superhero

しかし、ご覧の通り管理するインスタンスの数が増えたため、

  • EC2のプロビジョニング作業が大変
  • コード・モデルのデプロイが地獄

になってしまったのです。 EC2でDockerビルドを繰り返すと古いイメージを消し忘れディスクスペース不足で落ちる、なんてこともたまに起きていました。

MLOps第一弾

というわけで、まずは運用負荷の高い業務を自動化することを目的として、デプロイの自動化に取り組むこととしました。

実現したい状態をまとめてみました。

  • GitHubへのPUSHをトリガーにコードが指定のEC2インスタンスへデプロイされる
  • 社内GPUサーバーで開発したMLモデルがS3にアップロードされるとアプリケーションからアクセスできる場所に配置される

EC2からAWS Batch、ECSやEKSなどマネージド型のコンピューティング環境への移行も検討したのですが、改修が大掛かり、工数が大きくなりそうなため、まずはすでに実行環境ができているEC2で動かす前提でのCI/CDパイプラインを構築することにしました。

できた構成がこちらです!

図9: CI/CD機構V1

以下簡単に説明します。

  • CI機構

    • GitHubの指定ブランチにプルリクエスト (PR)を作成するとCodeBuildへWebhookイベントが届き、自動テスト用のワークフローが実行されます。
  • CD機構

    • テストとレビューが通り、PRがマージされるとデプロイ用のワークフローが実行されます。
    • デプロイワークフローでは、CodeBuildからEC2とLambdaへのデプロイ処理が実行されます。
      • EC2
        • Pythonのboto3を使い、ec2とssmクライアントを用いて以下の操作をするようにしました。
          • インスタンスの起動
          • コードのプル
          • Dockerイメージのビルド
          • インスタンスの停止
      • Lambda
    • MLモデルの更新タイミングはコードの更新とは異なるため、GitHubのWebhookイベントは使わないことにし、S3バケットにアップロードしたタイミングで、Lambda & EC2経由でEFS (Elastic File System)に更新することとしました。各バッチ用EC2インスタンスは、EFSにマウントすることで、最新のMLモデルにアクセスできます。
  • 監視機構

    • 監視機構はLambdaで作成し、期待するデータが期待する場所 (S3, Elasticsearc)に入っているかをチェックし、入っていなければSlack通知で担当者をメンションする仕組みを入れました。これで「あれ、今日データが更新されてないな」というようなレベルの問題は自動検知できるようになりました。

図10: 監視機構

苦労したこと

まず、全体の構成を理解するのに苦労しました。上記の構成図にしてみるとシンプルに感じるかもしれませんが、実装するにあたっては既存のバッチ処理のEC2やLambdaのコードを読みながらフローを把握して、不明な点は森長に確認し、という繰り返しでようやく理解できました。 S3は中間データを保持する場所とし、処理を繋ぐのにLambdaを使っていたので、全体のフローが追いにくいと感じました。

EC2の並列デプロイで、はじめは直列にDockerイメージのビルド処理を実行していたのですが、各インスタンスでのビルドに1時間弱(とくにGPUを使用するインスタンスは時間がかかる)ため、10インスタンスへのビルドを直列にやっていると半日を要してしまいます。ここはマルチスレッドにして、並列処理にすることで1時間弱に短縮できました。 キャッシュを使用しての高速化も検討したのですが、Nvidia GPUを使うのに必要なライブラリPytorch & Apexをインストールする処理は毎回走りここで時間がかかっていたことと、まれに必要な変更が反映されない場合もあり、キャッシュは使わない方針で運用することにしました。

EC2へのデプロイはAnsibleなどの構成管理ツールを使うことも検討したのですが、SSHの秘密鍵をCI/CDで使えるようにすることが必要であったり、デプロイ先のインスタンスにagentのインストールが必要などの点を考慮し、不採用にしました。また、将来的にはマネージド環境の移行を検討(後述)していた点を考慮し、あまり作り込まずに、迅速に運用負荷を減らす機構を構築することを優先しました。

効果

MLの森長の運用負荷は激減しました。 Astrategyは日々成長していて、当時バッチ処理のコードやモデルは一週間に2, 3回は更新されていたのですが、その都度10数台のサーバーに各1時間弱かかるデプロイコストが削減できました。

図11: デプロイ状況Slack通知

ML Superheroの森長から、社内Slackで感謝されました。嬉しかったです。

結果、森長は本来のML研究開発など創造的な仕事に集中することができるようになりました。 ちょうどその時期にALBERTの日本語モデルを公開しています。

ちなみに、ここまでの話を社内の全社LTで披露したところ非常に良い反響が得られました。

次の課題

もちろんこれですべててが解決したわけではありません。まだ大きく、以下の課題が残っていました。

  • EC2インスタンスの管理が必要。新しいMLタスクができた時などは専用のEC2を立ててプロビジョニングする必要がある。
  • 各EC2を起動するLambdaの作り込みが必要
  • S3更新を起点で処理が走り、記事数が多い時は複数データファイルに分割して並列処理をするようにしているが、lock制御などをシェルスクリプトで作り込む必要がある。
  • 実行結果監視用機構を作り込む必要がある
  • 全体のフローの把握が困難

などです。これらを解決できる手段はないかと考えました。

MLOps第二弾

上記課題の解決策として以下の施策を考えました。

  • コンピューティング環境は、EC2からAWS Batchに移行する
    • EC2の管理が不要になる
    • Step Functionsから直接ジョブ登録できる
  • ワークフロー制御にはStep Functionsを使う
    • ワークフローを一元管理し、ジョブの依存関係(DAG)表現できる
    • フローの途中で通知処理を加えたい場合などに、アプリケーションロジックと分離して実装できる
    • 並列、配列処理やリトライ、例外処理がフローで実装でき、アプリケーションロジックをシンプルにできる
  • インフラはterraformでコード化する
    • インフラ構成をコードとしてドキュメント化
    • 変更管理をGitでレビュー
    • インフラもCI/CDに組み込める

AWS Batch

AWS Batchはフルマネージド型のバッチ処理実行サービスで、Dockerイメージやジョブに割り当てるリソース(CPU, Memoryなど)、実行コマンドなどを設定することで、最適なコンピューティングリソースの動的プロビジョニングして処理を実行できます。

また、EFSにマウントできたり、Step Functionsから直接呼び出すことが可能だったりと、他のAWSと組み合わせて利用できるため便利です。

Step Functions

Step FunctionsはAWSのマネージドなワークフローエンジンです。 他のメジャーなワークフローエンジンはAirflow, Digdag, Argoなどがあるようですが、今回はなるべくマネージドサービスを使いたいのと、他のAWSリソースとの連携しやすさから、Step Functionsを採用しました。

Step FunctionsはState MachineをJSONで記述します。 たとえば、以下のState Machineは

  • 最初にnotify_startで通知をLambdaとSNSに送り、
  • 次の処理がdb_to_s3
  • エラーの場合はnotify_failureのタスクに分岐することを記述しています。
{
  "Comment": "AWS Step Functions State Machine that automates astrategy batch job.",
  "StartAt": "notify_start",
  "States": {
    "notify_start": {
      "Type": "Task",
      "Resource":"${notification_lambda_arn}",
      "ResultPath": null,
      "Parameters":{
        "Records": [{
          "Sns": {
            "Subject": "Starting Astrategy Batch Job!",
            "TopicArn":"${notification_sns_arn}",
            "Message": "Astrategy Batch処理を開始しました..."
          }
        }
        ]
      },
      "Next": "db_to_s3",
      "Catch": [
        {
          "ErrorEquals": [ "States.ALL" ],
          "Next": "notify_failure"
        }
      ]
    }

上記のJSONのState Machineを定義すると、以下のようなフローチャートが生成され、処理内容を俯瞰できます。

図12: Step Functionsフローチャート

State Machineで指定できるタイプは多数あり、組み合わせにより柔軟にフローを記述できます。Astrategy Batchでは以下のタイプの組み合わせでフローを記述しています。

  • Task → Lambdaやバッチなどのタスクを実行
  • Map → 配列を反復処理
  • Parallel → 並列処理

以下はMLタスクを実行する際に、異なるMLタスクを並列で実行 (Parallel)し、各MLタスクは記事データファイルの配列を動的に反復処理 (Map)するジョブをAWS Batchに登録します。 これまではMLタスクを追加するのにEC2のインスタンスや機動用のLambdaの連携設定が必要でしたが、JSONの配列に処理を追加するだけで実現できるようになりました。

図13: Map, Parallel Task

...
    "ml_map_process": {
      "Type": "Map",
      "ItemsPath": "$.articles_pkl",
      "ResultPath": null,
      "MaxConcurrency": 5,
      "Next": "concat_data_from_ml_jobs",
      "Catch": [
              {
                "ErrorEquals": [ "States.ALL" ],
                "Next": "notify_failure"
              }
            ],
      "Iterator": {
        "StartAt": "ml_process",
        "States": {
          "ml_process": {
            "Type": "Parallel",
            "ResultPath": null,
            "End": true,
            "Branches": [
              {
                "StartAt": "cpu_sentence_structure",
                "States": {
                  "cpu_sentence_structure": {
                    "Type": "Task",
                    "ResultPath": null,
                    "TimeoutSeconds": 3600,
                    "Retry": [ {
                      "ErrorEquals": [ "States.TaskFailed" ],
                      "IntervalSeconds": 10,
                      "MaxAttempts": 3,
                      "BackoffRate": 1.5
                    } ],
                    "Resource": "arn:aws:states:::batch:submitJob.sync",
                    "Parameters": {
                      "JobName": "cpu_sentence_structure",
                      "JobQueue": "arn:aws:batch:ap-northeast-1:${account_id}:job-queue/astrategy_cpu_${env}",
                      "JobDefinition": "arn:aws:batch:ap-northeast-1:${account_id}:job-definition/astrategy_cpu_${env}",
                      "ContainerOverrides": {
                        "Command": ["python", "-m", "apps.quantitative_info_extractor.quantitative_info_extractor"],
                        "Environment": [{
                          "Name": "articles_pkl",
                          "Value.$": "$"
                        }]
                      }
                    },
                    "End": true
                  }
                }
              },
...

でき上がったフローは以下のようになり、Step Functionsを見るだけで、バッチ処理の流れが追えるようになりました。

図14: Step Functions & AWS Batch構成図

実際、State Machineを実行すると、処理の流れが可視化されます。

  • 成功時

図15: 成功時

  • 失敗時

図16: 失敗時

失敗時にワークフローの中のどこで失敗したのかが一目でわかるので、トラブルシューティングが容易になりました。

CI/CD

以下のCI/CDワークフローを定義しました。 各ワークフローは関連するファイルに変更があった時のみ起動するようにしました。

  • Lambdaへserverlssフレームワークでデプロイ
  • Step FunctionsやAWS Batchなど管理用terraformを実行
  • DockerfileやPipfile変更時にDockerイメージをビルドし、ECRにPUSH
  • Unittestを実行 & EFSにコードを同期

図17: CI/CD機構V2

苦労したこと

並列処理にしたことで、共有ファイルシステムに読み書きがあり(想定外)、実行タイミングによってはエラーになることなどがありました。(ジョブ実行中のファイル読み書きは非共有ファイルシステムにするように修正)

効果

  • 全体のフローが可視化できて良い
  • リトライ処理などをアプリケーションに組み込む必要がなくなり、コードが減った
  • 処理の途中経過をSNSに通知することで、他のアプリケーションが連携しやすくなった。
  • トラブルシューティングが容易になった。

まとめ

MLOpsでカバーされる範囲は広大で複雑ですが、MLとOpsが連携してチームとしてシステムを構築、運用する文化、が一番大事なのかなと感じました。

Ops側の人間としてはMLの知識も多少必要だし、ML側もOpsがやっていることを理解する必要があるが、ストックマークのMLメンバーは積極的にインフラを理解しようとする(インフラのメンバーより詳しいかも(焦))し、MLのことで質問すると気さくに答えてくれます。

これから

まだまだMLOpsで課題したい課題は多く残っています。

など、MLのモデル開発基盤整備やKubernetesでの基盤開発にも踏み込んでいきたいです。

採用呼びかけ

ストックマークで働くのめちゃめちゃおもしろいです。興味のある方は気軽に応募してみてください!

ストックマーク株式会社の会社情報 - Wantedly

参考