外資就活プロダクトエンジニアリングチームの川原です。今回は大量のデータの同期処理を改善するために行なった取り組みについて紹介します。
背景
MySQLからElasticsearchへのデータ同期のために、PHPのバッチからGoのAPIサーバーへアクセスして実現する仕組みが元々存在していました。
同期対象の大量のデータについては、MySQLに保存されている特定のテーブルの全レコードになります。今回新規で同期させる対象のデータのレコードの数は約120万件程度でした。同期先はElasticsearch(OpenSearch)になります。
PHPバッチの一部例
// 対象データ取得 $items = $this->Item->find('all', [ 'recursive' => -1, 'fields' => [ 'Item.id', 'Item.content', 'Item.created', ], ]); // ElasticsearchへのUpsert処理 foreach ($items as $item) { $itemId = Hash::get($item, 'Item.id'); $itemContent = Hash::get($item, 'Item.content'); $createdAt = Hash::get($item, 'Item.created'); // 内部でGoのAPIサーバーへアクセスしupsert処理を実行している if (!$this->Elasticsearch->updateItem($itemId, $itemContent, $createdAt)) { $this->log("itemId: ${itemId} failed"); } else { $this->log("itemId: " . Hash::get($item, 'Item.id') . " updated"); } }
上記のような実装による既存の仕組みを利用しようとしたところ、次の課題が発見されました。
元の処理の課題
- 実行時間の長さ
- 元のバッチの実装の同期処理で実際のデータ量でテストを行なったところ、8時間以上かかっており以下の観点から許容できなそうな長さでした。
- セキュリティ上の懸念
- PHPのバッチからGoのAPIサーバーへアクセスする既存の仕組みでは、もともとAPIサーバーに設定されているWAFのアクセス制限の設定を一部解除する必要がある等のセキュリティ上の問題がありました。
改修の内容について
上記の課題を解決するために、バッチサーバー内部のみで処理が完結し、レコード数が多い問題に対して並行処理をすることで解決することを試みました。以下はGoの実装一部の例です。対象のテーブルの全レコードを batchSize
分毎に分割し、並行して処理するようにしています。 batchSize
は1000~5000件の間で、実際に動かしてみながら決定しています。
for batch := int64(0); batch < totalBatches; batch++ { start := batch * batchSize // バッチ単位でデータを取得 var batchRecords []model.QuestionAnswer conn.Table("items").Offset(int(start)).Limit(batchSize).Find(&batchRecords) // データの同期処理を並列実行 go func (batchRecords []Item) { defer sem.Release(1) for _, record := range batchRecords { // upsert用のデータ input := UpsertInput{ Index: "item", ID: record.ID, } if _, err := Upsert(input); err != nil { // エラーログを出力 } } }(batchRecords) }
また、セマフォを用いて同時実行数を制御しリソース不足を防ぎます。
結果
PHPの逐次実行からGoの平行処理の移行により処理時間が8時間以上かかっていたものが1時間半へ短縮されました。
また、メモリ使用量とCPU使用率の面でも大きな問題はなく、多くのデータ同期を効率的に処理できるようになりました。
まとめ
Goの並行処理はこれまであまり実装したことがなかったですが、大量のデータを扱う時に特に効果的であることをより理解できました。 データの規模や性質に応じて適切な処理方法を選定、実装できることは重要だと再確認しました。