080521 SpringCloud_MSA 공부
kafka Source Connect추가
localhost:8083/connectors (Postman으로 Post로 전송 아래내용을 JSON형식으로)
{
"name" : "my-source-connect",
"config" : {
"connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url":"jdbc:mysql://localhost:3307/mydb",
"connection.user":"root",
"connection.password":"${password}",
"mode": "incrementing",
"incrementing.column.name" : "id",
"table.whitelist":"users",
"topic.prefix" : "my_topic_",
"tasks.max" : "1"
}
}
Kafka Connect 목록 확인(Get)
localhost:8083/connectors
Kafka Connect Topic 상태 확인 (Get)
localhost:8083/connectors/${topic_name}/status
DB에 데이터 추가하면 Topic에 메시지가 들어오는데
Schema, paload 이런 정보들이 JSON Form형식으로 들어있음
kafka Sink Connect추가
localhost:8083/connectors (Postman으로 Post로 전송 아래내용을 JSON형식으로)
{
"name":"my-sink-connect",
"config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:mysql://localhost:3307/mydb",
"connection.user":"root",
"connection.password":"123456",
"auto.create":"true",
"auto.evolve":"true",
"delete.enabled":"false",
"tasks.max":"1",
"topics":"my_topic_users"
}
}
connect를 추가하는 config 내용에 대해서는 추가적으로 학습해야 할 것 같다.
Source System(DB)에서 kafka connect source를 거치지않고
kafka producer에서 메시지를 만들어 Topic에 보내면 kafka connect sink쪽에 DB만 업데이트됨
기존 작업물에 Kafka 적용
Catalog sevice에 kafka 적용하기
Pom.xml에 kafka 관련 디펜던시 추가하고
Kafka Consumer class 를 만들고
ConsumerFactory에서 Configuration 함
ConsumerFactory를 ConsumerkafkaListennerContainerFactory에 등록 후 Return (이름 정말길다)
kafka 메시지를 받는 클래스를 생성한뒤 @KafkaListener(topics = {topic_name}) 어노테이션 추가후
메시지를 받아 매핑시켜서 메시지 안의 내용을 사용.. Mapping은 ObjectMapper를 사용