编辑
2024-02-01
Kubernetes
0

目录

Postgres 数据同步一份到 BI 数据库/ElasticSearch
debezium - 把 MySql/Postgres/SQL Server/MongoDB 数据变动推送到 Kafka
debezium - 将各种类型数据库的变动统一消息格式,推送给 Kafka 里面
启动所有组件
Postgres - 设置用户权限新建示例表
pg-connect - 安装 pg-sink-connector
Postgres - 回到 postgres 里面新增一条记录
Kafka - 来到 kafka 控制台查看是否收到了消息
kafka-connect-elasticsearch - 将 Kafka 里面的消息落地到 elasticsearch
es-connect - 安装 es-sink-connector
验证和测试
测试 1 - 新增一个 todo,看看 es 里面是否新增一条记录
测试 2 - 更新一个 todo,看看 es 是否是修改,而不会新增
总结

看完本文会得到什么

  1. 了解 debezium 和 大名鼎鼎的 confluentinc
  2. 实现 Postgres 和 ElasticSearch 的数据同步
  3. 在 Kubernetes 上部署 debezium 和 confluentinc 的 connect

Postgres 数据同步一份到 BI 数据库/ElasticSearch

这种需求一般都是应对读写分离

  1. 复杂的数据分析需要交给专业第三方 BI 数据分析机构,你得把你的业务数据重新映射新的数据结构扔给 BI 数据服务商
  2. 要支持全文检索的功能,第一时间想到的是把关系型数据库里面全都同步一份到 ElasticSearch 里面去,然后根据业务需求编写搜索接口,该接口不应该直接去查询原来的关系型数据库,应该直接调用 ElasticSearch 的 SDK 或者 REST API 去实现全文检索的需求

所以我们可以理解为业务量增长以及也许需求催促下,一定会有人或者社区去写这么一个组件。

debezium - 把 MySql/Postgres/SQL Server/MongoDB 数据变动推送到 Kafka

为了实现数据同步,正常的工程角度都会想到要利用定时任务来实现,每分钟扫描一次,但是如果你是古早的单点服务,一个工厂就用一台服务器,服务器上跑着 20 年前的代码,这个定时器,还真不一定好写,说不定是几千万的数据,单单一个定时器任务很有可能跑到意外数据就挂了,你的容错,你的重试,你的回滚都得好好设计,这个太可怕。 或许你想到了,我可以定时器把数据变动以消息的方式塞到队列里面去,但是第一你这个定时任务的健康检查得做好,不能跑着挂了你不知道,然后重启之后,你得让这个 job 能有“断点续跑”的能力,这个你得踅摸一个框架来帮你实现,想想就脑壳疼。

因此哪怕是队列思想也要区分一个什么是好的思路,什么是不够好,或者说不合理的思路。

这里要拿 MongoDB 来举例,MongoDB 在很早就被社区呼吁了这个功能,因此他在 3.6 之后给了一个强大的 ChangeStream 的功能,传送门 -> https://www.mongodb.com/docs/manual/changeStreams/

另外,还有一篇教程 -> https://www.mongodb.com/basics/change-streams

ChangeStream 提供了一个 Push 的思路

数据被 Create/Update/Delete 这些操作都会产生一条类似于消息的记录,推送给订阅方

实际上,MongoDB 实现了一个内部的消息队列来实现向订阅者推送数据变动。

debezium - 将各种类型数据库的变动统一消息格式,推送给 Kafka 里面

如果你已经动手试过了 MongoDB 的 ChangeStream,接下来 debezium 会给你熟悉但是更易用的体验。

首先打开示例代码 -> https://github.com/strapi-extensions/postgres-sync-elasticsearch

$ git clone git@github.com:strapi-extensions/postgres-sync-elasticsearch.git

或者下载 docker-compose.yml 文件到本地 -> https://raw.githubusercontent.com/strapi-extensions/postgres-sync-elasticsearch/main/docker-compose.yml

yml
services: postgresql: image: docker.io/bitnami/postgresql:16 container_name: postgresql ports: - '5432:5432' volumes: - 'postgresql_data:/bitnami/postgresql' - './postgres:/opt/bitnami/postgresql/conf/conf.d' environment: - POSTGRESQL_POSTGRES_PASSWORD=admin123abc - POSTGRESQL_USERNAME=pg-connector - POSTGRESQL_PASSWORD=pg123abc - POSTGRESQL_DATABASE=demo networks: - infra kafka: image: docker.io/bitnami/kafka:3.6 container_name: kafka ports: - "9092:9092" volumes: - "kafka_data:/bitnami" environment: # KRaft settings - KAFKA_CFG_NODE_ID=0 - KAFKA_CFG_PROCESS_ROLES=controller,broker - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093 # Listeners - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092 - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT networks: - infra kafka-console: image: docker.redpanda.com/redpandadata/console:latest container_name: kafka-console ports: - "8080:8080" environment: - KAFKA_BROKERS=kafka:9092 depends_on: - kafka networks: - infra elasticsearch: image: docker.io/bitnami/elasticsearch:8 container_name: elasticsearch ports: - '9200:9200' - '9300:9300' volumes: - 'elasticsearch_data:/bitnami/elasticsearch/data' networks: - infra pg-connect: image: quay.io/debezium/connect:latest container_name: pg-connect ports: - '8082:8083' environment: BOOTSTRAP_SERVERS: "kafka:9092" GROUP_ID: 1 CONFIG_STORAGE_TOPIC: pg-connect_configs STATUS_STORAGE_TOPIC: pg-connect_statuses OFFSET_STORAGE_TOPIC: pg-connect_offsets depends_on: - kafka - postgresql networks: - infra es-connect: build: dockerfile: ./Dockerfile context: ./confluentinc network: host image: confluentinc-es-connect:latest container_name: es-connect ports: - '8084:8083' environment: CONNECT_BOOTSTRAP_SERVERS: "kafka:9092" CONNECT_REST_PORT: "8083" CONNECT_GROUP_ID: es-connect-group CONNECT_CONFIG_STORAGE_TOPIC: es-connect-configs CONNECT_OFFSET_STORAGE_TOPIC: es-connect-offsets CONNECT_STATUS_STORAGE_TOPIC: es-connect-status CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter" CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter" CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect" CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO" CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR" CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n" CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1" CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1" CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1" CONNECT_PLUGIN_PATH: '/usr/share/java,/connectors,/usr/share/confluent-hub-components/' depends_on: - kafka - elasticsearch networks: - infra volumes: elasticsearch_data: driver: local postgresql_data: driver: local kafka_data: driver: local networks: infra: name: infra driver: bridge

注意,如果想启动 postgres 确保你在本地有一个文件,参考 -> https://github.com/strapi-extensions/postgres-sync-elasticsearch/tree/main/postgres

启动所有组件

sh
docker compose up -d --build

Postgres - 设置用户权限新建示例表

sh
docker ps CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES 9b6a9a89fa34 quay.io/debezium/connect:latest "/docker-entrypoint.…" About an hour ago Up About an hour 8778/tcp, 9092/tcp, 0.0.0.0:8082->8083/tcp, :::8082->8083/tcp pg-connect 2f6b5f9c352b confluentinc-es-connect:latest "/etc/confluent/dock…" About an hour ago Up About an hour (unhealthy) 9092/tcp, 0.0.0.0:8084->8083/tcp, :::8084->8083/tcp es-connect ec9246f8832f docker.redpanda.com/redpandadata/console:latest "./console" About an hour ago Up About an hour 0.0.0.0:8080->8080/tcp, :::8080->8080/tcp kafka-console 1dc12a3d8d45 bitnami/postgresql:16 "/opt/bitnami/script…" About an hour ago Up About an hour 0.0.0.0:5432->5432/tcp, :::5432->5432/tcp postgresql 130c19dda245 bitnami/kafka:3.6 "/opt/bitnami/script…" About an hour ago Up About an hour 0.0.0.0:9092->9092/tcp, :::9092->9092/tcp kafka a9cb8eba8f03 bitnami/elasticsearch:8 "/opt/bitnami/script…" About an hour ago Up About an hour 0.0.0.0:9200->9200/tcp, :::9200->9200/tcp, 0.0.0.0:9300->9300/tcp, :::9300->9300/tcp elasticsearch docker exec -i -t 1dc12a3d8d45 /bin/bash

进入 Postgres 容器之后执行

sh
PGPASSWORD=admin123abc psql -U postgres demo psql (16.1) Type "help" for help. > ALTER ROLE "pg-connector" SUPERUSER CREATEDB NOCREATEROLE INHERIT LOGIN REPLICATION NOBYPASSRLS; > CREATE TABLE public.todos ( id serial4 PRIMARY KEY NOT NULL, title varchar(255) NULL, completed bool NULL, due timestamp(6) NULL );

pg-connect - 安装 pg-sink-connector

通过 debezium-connect 开放的 rest api 可以新增一个 connector,它就会开始从 postgres 里面开始扫描修改,发送消息给 kafka

sh
curl --location 'http://localhost:8082/connectors' \ --header 'Content-Type: application/json' \ --data '{ "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "database.dbname": "demo", "database.hostname": "postgresql", "database.password": "pg123abc", "database.port": "5432", "database.user": "pg-connector", "name": "pg-sink-connector", "table.include.list": "public.todos", "message.key.columns": "my_database.users:id", "schema.include.list": "public", "tasks.max": "1", "topic.prefix": "demo", "plugin.name": "pgoutput", "transforms":"unwrap", "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", "transforms.unwrap.drop.tombstones": "false", "transforms.unwrap.delete.handling.mode": "rewrite", "transforms.unwrap.add.fields": "table,ts_ms" }, "name": "pg-sink-connector" }'

Postgres - 回到 postgres 里面新增一条记录

sql
INSERT INTO public.todos (title, completed, due) VALUES ('Fix 502 bug', false, '2024-02-15 16:00:00.000');

Kafka - 来到 kafka 控制台查看是否收到了消息

浏览器打开 http://localhost:8080/topics/demo.public.todos?p=-1&s=50&o=-1#messages

image

第一步已经完成,数据修改的消息流已经进入 Kafka 了,接下来我们需要完成后一半的工作,让 kafka 的数据落在 elasticsearch 里面。

kafka-connect-elasticsearch - 将 Kafka 里面的消息落地到 elasticsearch

es-connect - 安装 es-sink-connector

confluentinc-es-connect 也提供了一个 rest api 可以添加 es-sink-connector

sh
curl --location 'http://localhost:8084/connectors' \ --header 'Content-Type: application/json' \ --data '{ "name": "es-sink-connector", "config": { "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", "tasks.max": "1", "topics": "demo.public.todos", "connection.url": "http://elasticsearch:9200", "key.ignore": "false", "transforms": "extractKey", "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key", "transforms.extractKey.field": "id" } }'

验证和测试

我们通过一个 chomre 的 elasticsearch 的插件-> https://github.com/cars10/elasticvue.

image

测试 1 - 新增一个 todo,看看 es 里面是否新增一条记录

sql
INSERT INTO public.todos (title, completed, due) VALUES ('Publish new release', false, '2024-02-15 16:00:00.000');

测试 2 - 更新一个 todo,看看 es 是否是修改,而不会新增

sql
UPDATE public.todos SET completed = false, title = 'Create bug issue edited' WHERE id = 2;

我们可以看到,id = 2 的记录的确只是修改了 doc 里面的内容,并没有新增:

image

完成了。

总结

数据源的同步一直是各种复杂系统的刚需,读写分离,读也分类,搜索、查询列表、获取单个分别又不太一样,有时候如果数据源能多样化并且有高性能系统确保数据一致性,会大大增加系统健壮性和拓展性,因此掌握这个技能还是很重要的,另外,本文中只是一个单机案例,真正上到集群+分布式还需要借助其他组件。