エンジニアの今井です。
さて、システム開発をしているときに、処理を分散させたいがデータの整合性を保ちたいという要求があると思います。
このような時、すぐに思いつくものとして複数のシステムへイベントを送るものがありますが、この手法はデータの整合性の維持が難しいという問題があります。この問題を回避するためには、DBにイベント用のテーブルを用意したり、更新履歴を別システムのトリガーにするOutboxと呼ばれる手法があります。
今回はこのOutboxを実現するためのOSSであるDebeziumを利用してみました。
Debeziumとは?
DBの変更履歴を監視し、変更があったらそれを分散処理システムであるKafkaに通知する仕組みです。メジャーなDBには対応しており、今回はSQL Serverの監視を行おうと思います。
SQL Server自体の変更履歴の追跡は以下の二つのやり方があります。
・変更データ キャプチャ (CDC)
・変更の追跡(CT)
DebeziumではSQL ServerのCDCの監視によって、Outbocパターンを実現しています。
やり方以下のチュートリアル通りにやればできます。https://github.com/debezium/debezium-examples/tree/main/tutorial#using-sql-server
動作環境
・windows10
・tuotorialフォルダ以下で実施
・事前にDockerをインストール済み
実施
1.
まずは実行に必要なファイルを上記Githubから引っ張ってきます。
register-sqlserver.json
docker-compose-sqlserver.yaml
debezium-sqlserver-init/inventory.sql
2.
次にDockerを起動します。
$ENV:DEBEZIUM_VERSION=1.7
docker-compose -f docker-compose-sqlserver.yaml up
3.
SQL Serverにテーブルとテストデータを投入します。
cat inventory.sql | docker exec -i tutorial_sqlserver_1 bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD'
このテストデータはproducts、orders、customersの三つのテーブルがあり、それぞれにいくつかのサンプル用レコードを挿入するものです。
4.
SQL ServerのConnectorを設定します。
curl.exe -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-sqlserver.json
ただし、私のやり方だとなかなかうまくいかなかったため、以下のようにregister-sqlserver.jsonを書き換えました。
{
"name": "inventory-connector",
"config": {
"connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector",
"tasks.max" : "1",
"database.server.name" : "server1",
"database.hostname" : "sqlserver",
"database.port" : "1433",
"database.user" : "sa",
"database.password" : "Password!",
"database.dbname" : "testDB",
"table.whitelist": "dbo.customers",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "dbhistory.server1"
}
}
このJSONではtable.whitelistによって、ユーザーのテーブル変更を監視するようにしました。
また、より詳しい情報は下記に記されています。
ちゃんと使いこなすにはここを理解しないといけないですね…
https://debezium.io/documentation/reference/connectors/sqlserver.html#sqlserver-example-configuration
5.
コネクタを設定します。
docker-compose -f docker-compose-sqlserver.yaml exec kafka /kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --from-beginning -property print.key=true --topic server1.dbo.customers
6.
環境は整ったので、DBに変更を加え、実際に変更を検知できるか確認します。
docker-compose -f docker-compose-sqlserver.yaml exec sqlserver bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD -d testDB'
このコマンドを打った後に、以下のようにクエリを書くと変更を検知出来ます。
UPDATE customers SET first_name = 'foo' WHERE first_name = 'SALLY'
GO
さて、この状態では以下のように商品情報を書き換えても検知されません。
UPDATE products SET name = 'sushi' WHERE name = 'scooter'
商品を見れるようにするには以下を実施します。
7.
コネクタを削除
curl.exe -X DELETE http://localhost:8083/connectors/inventory-connector
商品を見るように変更
{
"name": "inventory-connector",
"config": {
"connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector",
"tasks.max" : "1",
"database.server.name" : "server1",
"database.hostname" : "sqlserver",
"database.port" : "1433",
"database.user" : "sa",
"database.password" : "Password!",
"database.dbname" : "testDB",
"table.whitelist": "dbo.orders",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "dbhistory.server1"
}
}
これで商品の変更を見れるようになります
まとめ
今回、CDCを監視することで変更検知出来ました。これにより整合性を保ちながらイベント駆動ができるようになります。
一方で、Kafkaを使うのは結構重たい印象もあり、もっと簡単に行うにはCTを使う方が良いかもしれないと感じました。
コメント