Setup
# Create a dedicated namespace that holds every resource of this tutorial.
kubectl create namespace kafka-connect-tutorial
# Optional: make that namespace the default for the current kubectl context.
kubectl config set-context --current --namespace=kafka-connect-tutorial
Kafka
# Install Kafka as Helm release "kafka" into the tutorial namespace.
# NOTE(review): assumes the "apphub" chart repository is already added to Helm and
# that a StorageClass named "fast" exists in the cluster — confirm before running.
helm install kafka apphub/kafka \
  --namespace kafka-connect-tutorial \
  --set external.enabled=true \
  --set global.storageClass=fast \
  --set persistence.size=1Gi
Kafka client pod (CLI tools used to prepare and inspect Kafka Connect topics)
# Write the manifest of a long-running pod that carries the Kafka CLI tools.
# The quoted 'EOF' delimiter stops the shell from expanding anything inside the body.
cat > kafka-client-deploy.yaml <<'EOF'
# kafka-client-deploy.yaml
apiVersion: v1
kind: Pod
metadata:
  name: kafka-client
spec:
  containers:
    - name: kafka-client
      image: confluentinc/cp-kafka:5.0.1
      # Keep the container alive so commands can be run in it with `kubectl exec`.
      command:
        - sh
        - -c
        - "exec tail -f /dev/null"
EOF
kubectl create -f kafka-client-deploy.yaml -n kafka-connect-tutorial
# Block until the pod is Ready: `kubectl exec` fails against a pod that is still starting.
kubectl wait --for=condition=Ready pod/kafka-client -n kafka-connect-tutorial --timeout=120s
# Pre-create the three internal topics used by Kafka Connect (offsets, configs, status).
# Kafka Connect requires these topics to be log-compacted, so cleanup.policy=compact
# is set explicitly instead of inheriting the broker default (normally "delete").
# --if-not-exists makes the loop safe to re-run.
for topic in connect-offsets connect-configs connect-status; do
  kubectl -n kafka-connect-tutorial exec kafka-client -- \
    kafka-topics --zookeeper kafka-zookeeper:2181 \
      --create --if-not-exists --topic "$topic" \
      --partitions 1 --replication-factor 1 \
      --config cleanup.policy=compact
done
Kafka Connect
# kafka-connect-deploy.yaml
# Single-replica Kafka Connect worker running the Debezium Connect image.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafkaconnect-deploy
  labels:
    app: kafkaconnect
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafkaconnect
  template:
    metadata:
      labels:
        app: kafkaconnect
    spec:
      containers:
        - name: kafkaconnect-container
          # NOTE(review): ":latest" is a moving tag; consider pinning a version
          # so the tutorial stays reproducible.
          image: debezium/connect:latest
          # Both probes issue an HTTP GET against "/" on the REST port 8083.
          readinessProbe:
            httpGet:
              path: /
              port: 8083
          livenessProbe:
            httpGet:
              path: /
              port: 8083
          env:
            # Kafka is installed into the same namespace as this Deployment, so the
            # bare Service name is used; the former "kafka-headless.kafka" pointed
            # at a namespace called "kafka" that this tutorial never creates.
            - name: BOOTSTRAP_SERVERS
              value: kafka-headless:9092
            # Quoted so the value stays a string rather than a YAML integer.
            - name: GROUP_ID
              value: "1"
            # The three storage topics match the topics created with kafka-topics.
            - name: OFFSET_STORAGE_TOPIC
              value: connect-offsets
            - name: CONFIG_STORAGE_TOPIC
              value: connect-configs
            - name: STATUS_STORAGE_TOPIC
              value: connect-status
          ports:
            - containerPort: 8083
---
# NodePort Service exposing port 8083 of the Kafka Connect pods on node port 30500.
apiVersion: v1
kind: Service
metadata:
  name: kafkaconnect-service
  labels:
    app: kafkaconnect-service
spec:
  type: NodePort
  ports:
    - name: kafkaconnect
      protocol: TCP
      port: 8083
      nodePort: 30500
  # Selects pods labelled app=kafkaconnect (the label set by kafkaconnect-deploy).
  selector:
    app: kafkaconnect
# Create or update the resources defined in kafka-connect-deploy.yaml.
kubectl --namespace kafka-connect-tutorial apply --filename kafka-connect-deploy.yaml
PostgreSQL
# extended.conf
# Extra PostgreSQL server settings; this file is packaged into the
# "postgresql-config" ConfigMap and handed to the chart via extendedConfConfigMap.
# Logical WAL level: needed for logical decoding, which the pgoutput plugin
# configured on the connector relies on.
wal_level = logical
# Upper limits on concurrent WAL sender processes and replication slots.
# NOTE(review): 1 presumably suffices for a single connector; raise both if more
# replication clients are added — confirm.
max_wal_senders = 1
max_replication_slots = 1
# Publish extended.conf as a ConfigMap so the PostgreSQL chart can load it.
kubectl create configmap postgresql-config \
  --namespace kafka-connect-tutorial \
  --from-file=extended.conf
# Install PostgreSQL as Helm release "postgres", exposed on node port 30600 and
# configured with the ConfigMap created above.
helm install postgres apphub/postgresql \
  --namespace kafka-connect-tutorial \
  --set extendedConfConfigMap=postgresql-config \
  --set service.type=NodePort \
  --set service.nodePort=30600 \
  --set postgresqlPassword=passw0rd \
  --set global.storageClass=fast \
  --set persistence.size=1Gi
# Open an interactive shell inside the PostgreSQL pod.
kubectl --namespace kafka-connect-tutorial exec -it postgres-postgresql-0 -- /bin/sh
# Inside that shell, start psql as the "postgres" user.
psql --username postgres
# => import the test data here
# Register the Debezium PostgreSQL connector through the Kafka Connect REST API.
# 30500 is the nodePort of kafkaconnect-service; 192.168.1.61 is presumably the IP
# of a cluster node — replace it with one of your own nodes.
# database.hostname uses the bare Service name because PostgreSQL is installed in
# the same namespace as Kafka Connect (there is no "postgres" namespace), and
# database.password matches the postgresqlPassword given to `helm install postgres`.
curl -X POST \
  http://192.168.1.61:30500/connectors \
  -H 'Content-Type: application/json' \
  -d '{
    "name": "containers-connector",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "plugin.name": "pgoutput",
      "database.hostname": "postgres-postgresql-headless",
      "database.port": "5432",
      "database.user": "postgres",
      "database.password": "passw0rd",
      "database.dbname": "mmtip-production",
      "database.server.name": "postgres"
    }
  }'
# List every topic known to the cluster.
kubectl --namespace kafka-connect-tutorial exec kafka-client -- \
  kafka-topics --list --zookeeper kafka-zookeeper:2181
# Print the records of the postgres.public.containers topic from its first offset.
# The consumer keeps running until interrupted (Ctrl+C).
kubectl --namespace kafka-connect-tutorial exec kafka-client -- \
  kafka-console-consumer --bootstrap-server kafka:9092 \
    --topic postgres.public.containers --from-beginning