データ パイプライン  

データ パイプラインとは、一連の連続した処理ステップによってデータを処理するアプリケーションで、システム間のデータ転送、ETL と呼ばれる抽出 (Extract)、変換 (Transform)、読み込み (Load) 処理、データ拡充やリアルタイムのデータ分析処理等に適用されるコンセプトです。

データパイプラインにはバッチ処理とストリーミング処理があります。バッチ処理の例としては、一定の期間(1 日、1 ヶ月など)にわたって収集されたデータの集計処理などがあります。ストリーミング処理では、リアルタイムにアプリケーションから送信されるイベントデータの処理や、IoT デバイス等から大量に送信される時系列データの処理、他には OLTP データベースの更新データの反映などの例があげられます。

Google Cloud は、データ パイプラインを構築するマネージド サービスとして、Cloud DataflowCloud Data Fusion を提供しています。Cloud Dataflowは、OSS の Apache Beam SDK を利用して開発したパイプラインを実行するフル マネージドのデータ処理サービスです。Cloud Data Fusion は、OSS の CDAP のマネージド サービスで、組み込みのコネクタや GUI を用いてデータ パイプラインを構築することができます。  

エンタープライズ DWH として BigQuery を利用する際、データソースとなる基幹システム等のデータベースから未加工のデータを収集、蓄積することになります。本稿では、データソースからのデータ収集方法として、Dataflow を用いた ETL と、Cloud Data Fusion Replicator を利用した CDC によるデータ収集を紹介します

バッチによる RDBMS から BigQuery へのデータ連携 (Cloud Dataflow template)

解決する課題・使い所

BigQuery は、高度化するデータ分析環境への要求に応えられる、スケーラブルで柔軟性があり、コスト効率に優れたデータ プラットフォームです。セキュリティと耐久性、スケーラビリティのあるデータ ウェアハウスに対して、複数のデータソースからのデータを集めることで、分析環境の運用、管理にコストをかけずにデータを活用しビジネスの知見を得ることができます。

そういったデータソースの中には、OLTP 用途で利用されるリレーショナル データベースがあります。企業における業務トランザクション データやマスターデータ、e コマースサイトや各種アプリケーションのデータは、多くの場合 OLTP データベース によって管理されています。分析の第一歩として、これらのデータソースから BigQuery へ収集することが必要になります。

Cloud Dataflow は、Google Cloud のフルマネージドなデータ処理サービスです。Apache Beam SDK の実行環境として、ストリーミング処理とバッチ処理を統合して提供しています。Apache Beam SDK は、Google Cloud 以外にも様々なデータソースとデータシンク (Kafka、S3 等) に対応したコネクタが用意されており、バッチ、ストリーミング双方の処理に対応しています。また、JDBC ドライバ経由でリレーショナル データベースをインプットとしたデータ処理パイプラインを作成することが可能です。Beam SDK を利用し、Java や Python 等の言語で様々なデータ加工や処理を記述し、フルマネージドなパイプライン実行環境でスケーラブルに実行することができます。

Cloud Dataflow には、Dataflow テンプレートという、あらかじめテンプレート化されたデータ処理パイプラインを実行する機能があります。ユーザーは Cloud Console、 gcloud コマンドライン ツール、または REST API を使用して、Dataflow サービスでパイプラインを簡単に実行することができます。Dataflow テンプレートはユーザー自身が作成することも可能ですが、Google Cloud はオープンソースの Dataflow テンプレートを提供しており、Cloud Storage (GCS) から BigQuery への書き込みや Pub/Sub から GCS への書き込みなどのバッチ、ストリーミング双方に対応したテンプレートをいくつも用意しています。

このデザイン パターンでは、OLTP データベースから Java Database Connectivity (JDBC) to BigQuery テンプレートを用いて、バッチ処理にてデータを収集する方法を説明します。


アーキテクチャ 

Dataflow テンプレートを用いてバッチデータ パイプラインを実行すると、Dataflow サービスはパイプラインの処理ロジックを自動的に並列化し、ジョブを実行するために割り当てたワーカーに分散します。オンプレミス環境のデータベース サーバーへアクセスする場合には、Dataflow が実行されているサブネットからアクセス可能である必要性があります。

Dataflow テンプレートを用いてバッチデータ パイプラインを実行すると、Dataflow サービスはパイプラインの処理ロジックを自動的に並列化し、ジョブを実行するために割り当てたワーカーに分散します。オンプレミス環境のデータベース サーバーへアクセスする場合には、Dataflow が実行されているサブネットからアクセス可能である必要性があります

Cloud Console、 gcloud コマンドライン ツール、REST API から必要なパラメーターを指定して実行します。以下が Cloud Console からの実行方法です。

Dataflow のメニューから、「テンプレートからジョブを作成」を選択すると、テンプレート選択画面が表示されます。

Jdbc to BigQuery を選択すると、パラメーター入力画面が表示されます。

JDBC 接続設定やソーステーブルに発行するクエリ、ターゲットの BigQuery の指定等を入力します。前述したネットワーク、サブネットワークの指定は「オプション パラメータを表示」を展開して設定します。 「ジョブを実行」をクリックすると、ジョブが作成され、実行が開始されます


利点

Dataflow テンプレートを利用することにより、パイプライン実行環境の構築、運用の手間もなく、また複雑なコードを記述せずにリレーショナル データベースから BigQuery へデータを取り込むパイプラインを作成することができます。


関係するデザインパターン 


参照文献

Cloud Data Fusion Replicator による リレーショナル データベース から BigQuery への変更データ キャプチャ(CDC)データ連携

解決する課題・使い所

リレーショナル データベース に蓄えられたデータを用いて Google Cloud でデータを分析する場合、BigQuery にデータを取り込む方法として、「エクスポートしたデータを Cloud Storage にアップロードし、そこから BigQuery へロードする」「ETL ツールを利用して BigQuery へデータをロードする」など、複数の方法があります。


一方、より迅速な意思決定のために、リアルタイムに近い形でデータソースに蓄えられたデータを、データ ウェアハウス(DWH)に取り込みたいケースもあります。例えば「ソース データベースにあるマスターデータを、ほぼリアルタイムに BigQuery にロードして、レポートやダッシュボードを更新したい」というケースや、「最新のトランザクション データを用いて分析したい」といったケースです。このような要件に対しては、CDC を用いて、ソース データベースへの変更点を継続的に BigQuery へ反映させることで対応できます。また、ソース データベース側の処理に、負荷をかけずにデータを移動させることで、BigQuery 側で大量データ処理を実行させることも可能です

アーキテクチャ

Cloud Data Fusion は、バージョン 6.3.0 以上 で、MySQL(MySQL Binary Log を利用)、および SQL Server(SQL Server CDC を利用)とのレプリケーションに対応しています。Google Cloud 内のデータ ソースに加えて、ネットワーク要件を満たすことで、オンプレミス データセンターの RDBMS から、BigQuery に対して CDC を構成することが可能です

レプリケーションに必要なネットワーク接続と、セキュリティ メカニズムは、こちらのドキュメントをご参照ください。 

また、Cloud Data Fusion で、テナント プロジェクトのサービスやユーザー プロジェクト内の Dataproc などのサービスからデータ ソースに接続する方法については、こちらのドキュメントをご参照ください


Cloud Data Fusion レプリケーション機能の有効化

Cloud Data Fusion で CDC を利用するためには、Replication アクセラレータを有効にする必要があります。以下の図は、インスタンス作成時にアクセラレータを追加する例です

また、既存のインスタンスでレプリケーションを有効にするためには、こちらの手順を参考にします


ソース データベース システムの設定

MySQL をソースとするレプリケーションを構築する際には、こちらのガイドに従い MySQL の設定を行います。 

SQL Server をソースとする場合には、Microsoft 社のガイドに従い Change Data Capture を有効にします


JDBC ドライバのアップロード

Cloud Data Fusion の web UI でレプリケーション ジョブを構成するために、取得元データベースに対応する JDBC ドライバを取得し、Cloud Data Fusion UI でアップロードします


パイプラインの作成

Cloud Data Fusion UI で、メニューから Replication を選択します。Replication jobs 画面で、「Create a replication job」を選択し、レプリケーション パイプラインを作成します。Data Fusion のテナントプロジェクトからソース データベースに対して、ネットワーク経由でログインできるよう構成する必要があります。

ユーザー プロジェクトに構築した Compute Engine インスタンス上にインストールされている MySQL サーバーから、テナント プロジェクトである Cloud Data Fusion UI のパブリック IP を用いて接続し、BigQuery に複製するレプリケーション ジョブを構成するケースを例に説明します。なお、オンプレミス環境のデータ ソースに対してレプリケーションを作成する場合は、Cloud Data Fusion のネットワーク構成に関するドキュメントを参考にして Cloud Data Fusion インスタンスを構成します

[Configure source] ページでは、データ ソースへの接続方法、データベース名などを設定します。

[Advanced] セクションにある、Replicate Existing Data タブを切り替えることで、レプリケーション開始時点で、既存データの初期転送を実施するかどうかを選択します。

[Select tables] ページでは、ソース データベースから、レプリケーション対象のテーブル、列を選択し、Insert、Update、Delete のイベントのうち、対象とするイベントを選択します

レプリケーション対象テーブルの [Columns to replicate] 列を選択することで、レプリケーション対象列を選択することができます

[Configure target] ページでは、レプリケーション対象となる BigQuery のプロジェクト名、データセット名、テーブル名、ロードインターバルなどを設定します。BigQuery へのレプリケーションでは、ステージング テーブルに、ソーステーブルから取得した変更イベントが蓄積され、設定したインターバルごとに、merge ステートメントによって変更が反映されます。

[Configure advanced properties] ページでタスク数の設定を行います。1 時間あたりの処理データ量によってタスク数を設定することもできます。

[Review assessment] ページでいずれかのテーブルの横にある [View mappings] をクリックすると、スキーマの問題や欠損している機能、接続性に関する複製中に発生する可能性のある問題の評価を取得できます。問題が発生した場合は、処理を進める前に問題を解決する必要があります。

[View Summary] ページで DEPLOY REPLICATION JOB ボタンを押すと、レプリケーション パイプラインがデプロイされます。

パイプラインの開始

パイプライン ページで Start ボタンを押すと、パイプラインが開始されます。ダッシュボード上では,、Status が Provisioning、 Starting、 Running と遷移し、レプリケーション ジョブが実行されている様子を確認できます。この間、処理用の Dataproc クラスタがユーザー プロジェクトに作成され、レプリケーション ジョブが実行されます。Stop ボタンを押すとレプリケーション ジョブは終了し、Dataproc クラスタも削除されます。

パイプラインのモニタリング

パイプラインの実行ページでは、レプリケートされたデータ量やイベント数、スループット、レイテンシーや挿入、更新、削除された行数などのメトリクスを確認することができます


アーキテクチャ

Cloud Data Fusion によるレプリケーションには、以下の利点があります


注意事項 

レプリケーションを実行すると、BigQuery 内にステージング テーブルが作成され、変更差分が書き込まれます。その後、ステージング テーブルをターゲット テーブルにマージする処理が実行され、merge 文が発行されます。この際、BigQuery の処理料金が発生するため、公式ドキュメントでは BigQuery の定額料金を利用することを推奨しています。


参照文献

アプリケーションおよびウェブデータを データ ウェアハウス に収集するパターン( Firebase 向け Google アナリティクス )

解決する課題・使い所

スマホアプリのやウェブサイト上でのユーザの行動を分析、モニタリングする場合、一般的には Firebase 向け Google アナリティクス や Google アナリティクスを使用しますが、一方で以下のよう複雑な要件を満たすためにはそれらの機能のみを使用して実現することが難しい場合があります。

そのような場合、Google Analytics for Firebase やGoogle アナリティクスの生データを BigQuery にエクスポートして、

ことにより、要件を満たすことができます。


アーキテクチャ


利点

各種のデータをノンコーディングで BigQuery にエクスポートできます。


注意事項  


このパターンで作成された事例


参照文献