데이터 동기화를 위한 Apache Kafka 활용
포스트
취소

데이터 동기화를 위한 Apache Kafka 활용

Apache Kafka

정의

  • 분산 스트리밍 플랫폼
  • 메시지를 토픽 단위로 지속적으로 기록하고, 여러 소비자가 독립적으로 처리할 수 있게 설계된 시스템
  • 로그 저장 + 메시지 브로커 + 스트리밍 처리

특징

  • 높은 처리량 (High Throughput)
    • 대량의 데이터를 실시간으로 처리할 수 있다.
    • 메시지를 배치로 묶어 전송하고, 디스크에 순차적으로 저장하여 I/O 성능을 최적화한다.
  • 확장성 (Scalability)
    • 필요에 따라 브로커를 추가하여 수평적으로 확장할 수 있다.
    • 토픽을 파티션으로 분할하여 데이터를 분산 저장하고 병렬 처리할 수 있다.
  • 내결함성 (Fault Tolerance)
    • 데이터를 여러 브로커에 복제하여 일부 브로커에 장애가 발생하더라도 데이터 손실 없이 서비스를 유지할 수 있다.
  • 데이터 지속성 (Durability)
    • 메시지를 디스크에 저장하여 데이터의 영속성을 보장한다.
    • 설정에 따라 데이터 보존 기간을 조절할 수 있다.
  • 실시간 스트리밍 (Real-time Streaming)
    • 데이터 생성과 동시에 즉시 처리할 수 있는 실시간 데이터 스트리밍을 지원한다.
  • 다양한 API 지원
    • Producer API, Consumer API, Streams API, Connect API 등 다양한 API를 제공하여 유연한 데이터 처리 및 통합을 지원한다.
  • Pub/Sub 모델
    • Publisher Subscriber 모델은 데이터 큐를 중간에 두고 서로 간 독립적으로 데이터를 생산하고 소비한다.

장점

  • 높은 처리량 및 낮은 지연 시간
    • 대용량 데이터 스트림을 실시간으로 처리하는 데 최적화되어 있다.
  • 확장성 및 유연성
    • 클러스터 확장을 통해 변화하는 데이터 처리 요구사항에 유연하게 대응할 수 있다.
  • 내결함성 및 데이터 안정성
    • 데이터 복제 및 분산 저장으로 데이터 손실 위험을 최소화하고 안정적인 서비스 운영을 보장한다.
  • 다양한 시스템과의 통합 용이성
    • Kafka Connect API를 통해 다양한 데이터 소스 및 싱크와 쉽게 통합될 수 있다.
  • 실시간 데이터 처리
    • 실시간 데이터 분석 및 모니터링에 유용하게 활용될 수 있다.

단점

  • 복잡한 설정 및 관리
    • 클러스터 구성, 토픽 설정, 파티션 관리 등 설정 및 관리 작업이 복잡할 수 있다.
  • 높은 리소스 요구량
    • 고성능을 유지하기 위해 상당한 시스템 리소스를 필요로 할 수 있다.
    • 시스템 리소스에는 디스크 I/O, 네트워크 대역폭 등이 해당한다.
  • ZooKeeper 의존성
    • 과거 버전에서는 Kafka 클러스터 운영을 위해 ZooKeeper를 별도로 관리해야 하는 부담이 있었다.
    • 최근에는 KRaft 모드를 통해 이러한 의존성을 줄이는 추세다.
  • 모니터링 도구 부족
    • 모니터링 및 관리 도구가 부족할 수 있다.
    • 메시지 조정이 필요한 경우 성능 저하가 발생할 수 있다.
  • Message tweaking 이슈
    • 카프카는 byte를 받고 보내기만 한다.
    • 그런데 메시지가 수정이 필요하다면 카프카의 퍼포먼스는 급격히 감소한다.

관련 용어

  • 프로듀서 (Producer)
    • 데이터를 생성하여 Kafka 토픽에 전송하는 애플리케이션
  • 컨슈머 (Consumer)
    • Kafka 토픽에서 데이터를 읽어와 처리하는 애플리케이션
  • 브로커 (Broker)
    • Kafka 서버의 노드
    • 데이터를 저장하고 관리한다.
    • 여러 브로커가 클러스터를 구성한다.
  • 토픽 (Topic)
    • 메시지를 카테고리별로 구분하는 단위
    • 프로듀서와 컨슈머가 데이터를 주고받는 채널 역할을 한다.
  • 파티션 (Partition)
    • 토픽을 분할한 단위
    • 데이터의 병렬 처리 및 확장성을 가능하게 한다.
    • 각 파티션은 순서대로 정렬된 메시지 시퀀스를 가진다.
  • 주키퍼 (ZooKeeper)
    • Kafka 클러스터의 메타데이터 관리 및 브로커 조정을 담당하는 분산 코디네이션 시스템
    • 최근에는 KRaft로 대체되는 추세다.
  • 오프셋 (Offset)
    • 파티션 내에서 메시지의 고유한 위치를 나타내는 정수 값
    • 컨슈머가 데이터를 어디까지 읽었는지 추적하는 데 사용된다.
  • 프로듀서 오프셋 (Producer Offset)
    • 프로듀서가 가장 최근에 토픽의 특정 파티션에 기록한 데이터의 오프셋
  • 컨슈머 오프셋 (Consumer Offset)
    • 컨슈머가 해당 파티션에서 마지막으로 읽은 데이터의 오프셋
  • 컨슈머 랙 (Consumer Lag)
    • 프로듀서 오프셋에서 컨슈머 오프셋을 뺀 값
    • 컨슈머 랙이 클 수록 처리해야할 데이터가 많이 쌓여있다는 것을 의미한다.

Kafka 도입 시 고려사항

  • 데이터 정합성
    • Consumer가 메시지 처리에 실패할 경우 데이터 손실 또는 중복 처리가 발생할 수 있다.
    • 멱등성을 고려한 Consumer 구현이 필요하다.
  • Consumer Lag
    • 프로듀서의 데이터 생산 속도가 컨슈머의 소비 속도보다 빠른 경우 Consumer Lag이 발생할 수 있다.
    • Consumer Lag의 발생은 서비스 지연으로 이어질 수 있으므로 주의해야 한다.
  • 키 설정
    • 파티션 키 설정을 잘못하면 특정 파티션에 트래픽이 집중되어 성능 병목이 발생할 수 있다.
    • 분산 처리를 고려하여 적절한 키를 설정해야 한다.

Apache Kafka 설치 (Windows OS 기준, 직접 설치)

다운로드 및 사전 설정

  1. 카프카 다운로드 페이지로 이동한다.
  2. 상황에 맞는 파일을 다운로드 받는다.
    • PC에 직접 설치 시에는 Binary download를 클릭하면 된다.
    • Supported releases말고 Archived releases 쪽에 있는 것을 받자.
  3. 다운로드한 압축 파일을 원하는 장소로 옮긴다.
    • 폴더명이 너무 길면 문제가 생길 수도 있다.
    • C 드라이브 같은 곳에 옮기는 것이 권장된다.
  4. 압축 해제한다.
  5. 편의성을 위해 폴더명을 kafka로 변경하자.
    • 선택사항
  6. kafka 폴더 내부의 config 폴더로 이동하자.
  7. server.properties에서 log.dirs를 수정해주자.
    • kafka 폴더 안에 kafka-logs라는 폴더를 만들자. (네이밍은 알아서)
    • log.dirs=C:/kafka/kafka-logs라고 작성하면 된다.
  8. 이번엔 같은 폴더에 있는 zookeeper.properties에서 dataDir을 수정해주자.
    • kafka 폴더 안에 zookeeper-data라는 폴더를 만들자. (네이밍은 알아서)
    • dataDir=C:/kafka/zookeeper-data라고 작성하면 된다.

Kafka 실행하기

  1. CMD 창을 연다. (만약에 powershell이 열렸다면 cmd 명령어를 입력하면 된다.)
  2. cd 명령어를 통해 kafka가 설치된 폴더로 이동한다.
    • cd /kafka
  3. .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties 실행하기
    • Zookeeper 서버를 실행하는 명령어다.
    • 별도의 CMD 창을 열어서 netstat -na | findstr "2181"를 실행했을 때 결과가 나오면 서버가 실행된 것이다.
  4. 새로운 CMD 창을 열기
  5. cd 명령어를 통해 kafka가 설치된 폴더로 이동하기
  6. .\bin\windows\kafka-server-start.bat .\config\server.properties 실행하기
    • Kafka 서버를 실행하는 명령어다.
    • 별도의 CMD 창을 열어서 netstat -na | findstr "9092"를 실행했을 때 결과가 나오면 서버가 실행된 것이다.

설치 테스트

  1. 새로운 CMD 창 열기
  2. cd 명령어를 통해 kafka가 설치된 폴더로 이동한다.
  3. .\bin\windows\kafka-topics.bat --create --topic quickstart --bootstrap-server localhost:9092 --partitions 1 실행하기
    • quickstart라는 토픽을 생성하는 명령어다.
  4. .\bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --list 실행하기
    • 토픽 목록을 확인하는 명령어다.
  5. .\bin\windows\kafka-topics.bat --describe --topic quickstart --bootstrap-server localhost:9092
    • 토픽 정보를 확인하는 명령어다.

Kafka 종료하기

  1. 새로운 CMD 창 열기
  2. cd 명령어를 통해 kafka가 설치된 폴더로 이동한다.
  3. .\bin\windows\zookeeper-server-stop.bat .\config\zookeeper.properties 실행하기
    • ZooKeeper 서버를 종료하는 명령어다.
  4. .\bin\windows\kafka-server-stop.bat .\config\server.properties 실행하기
    • Kafka 서버를 종료하는 명령어다.

Apache Kafka 사용 - Producer/Consumer

실제로 메시지를 발송하고 수신하는 과정을 살펴보자.

  1. 아까의 과정을 통해 ZooKeeper 서버와 Kafka 서버를 실행하자.
  2. 추가로 2개의 CMD 창을 띄운다.
  3. 2개의 CMD 창 모두 cd 명령어를 통해 kafka가 설치된 폴더로 이동한다.
  4. 첫번째 CMD 창에서 quickstart 토픽에 대한 프로듀서를 생성한다.
    • .\bin\windows\kafka-console-producer.bat --bootstrap-server localhost:9092 --topic quickstart를 실행한다.
  5. 두번째 CMD 창에서 quickstart 토픽에 대한 컨슈머를 생성한다.
    • .\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic quickstart --from-beginning을 실행한다.
  6. 이제 첫번째 창에서 Hello를 입력해보자.
  7. 두번째 창으로 이동해보면 Hello라는 메시지가 온 것을 확인할 수 있다.

Kafka Connect

사전 준비 (Windows OS + MySQL 기준)

미리 MySQL을 설치한 다음에 mydb1이라는 DB를 만들어두자.
그 다음에 아래의 쿼리를 실행하자.

create table users(
    id int auto_increment primary key,
    user_id varchar(20),
    pwd varchar(20),
    name varchar(20),
    created_at datetime default NOW()
);

설치하기 (Windows OS + MySQL 기준)

  1. CMD 창을 연다. (만약에 powershell이 열렸다면 cmd 명령어를 입력하면 된다.)
  2. curl -O https://packages.confluent.io/archive/7.2/confluent-community-7.2.1.tar.gz 실행하기
  3. tar xvf confluent-community-7.2.1.tar.gz 실행하기
  4. 압축 해제된 폴더를 C 드라이브로 이동하기
  5. 폴더명을 kafka-connect로 바꾸기
  6. kafka-connect 설치 경로의 .\bin\windows\kafka-run-class.bat 파일에서
    rem Classpath addition for core 위쪽에 아래 코드 삽입하기
rem classpath addition for LSB style path
if exist %BASE_DIR%\share\java\kafka\* (
	call:concat %BASE_DIR%\share\java\kafka\*
)
  1. Kafka JDBC Connector 사이트로 이동해서 Self-Hosted에 있는 파일 다운 받기
  2. 다운받은 jdbc.zip 파일 압축 해제하기
  3. 압축 해제한 파일을 kafka-connect 폴더 아래로 이동하기
  4. jdbc 드라이버 파일들을 kafka-connect 폴더 밑의 .\share\java\kafka폴더 에 복사하기
  5. kafka-connect 폴더 밑의 .\etc\kafka\connect-distributed.properties 파일의 plugin.path 수정하기
    • plugin.path=<JDBC 커넥터 폴더>\lib
    • plugin.path=C:\kafka-connect\confluentinc-kafka-connect-jdbc-10.8.3\lib처럼 수정하면 된다.

참고로 MySQL 드라이버 파일은 없어서 별도로 build.gradle을 통해 받은 다음에,
사용자\.m2 폴더에서 해당 파일을 옮겨줘야 한다.

실행하기

  1. CMD 창을 연다. (만약에 powershell이 열렸다면 cmd 명령어를 입력하면 된다.)
  2. CMD 창에서 kafka-connect 폴더로 이동하기
  3. .\bin\windows\connect-distributed.bat .\etc\kafka\connect-distributed.properties 실행하기

Kafka JDBC Connector 확인하기

GET으로 http://localhost:8083/connector-plugins를 호출하면 된다.
JdbcSinkConnector와 JdbcSourceConnector가 나오면 성공이다.

Kafka Source Connector 생성

POST로 ` http://localhost:8083/connectors`를 아래 데이터와 함께 호출하면 된다.

{
   "name":"my-source-connect",
   "config":{
      "connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
      "connection.url":"jdbc:mysql://localhost:3306/mydb1",
      "connection.user":"root",
      "connection.password":"비밀번호",
      "mode":"incrementing",
      "incremental.column.name":"id",
      "table.whitelist":"users",
      "topic.prefix":"my_topic_",
      "tasks.max":"1"
   }
}

Kafka Source Connector 확인

GET으로 http://localhost:8083/connectors/my-source-connect/status를 호출하면 된다.
connector.state에 RUNNING이라고 나오면 성공이다.

log4j:ERROR Could not read configuration file from URL XXX

  1. 파일 탐색기에서 kafka-connect\bin 폴더로 이동
  2. connect-distributed.bat을 메모장으로 열기
  3. file:%BASE_DIR%/config/connect-log4j.properties
    file:%BASE_DIR%/etc/kafka/connect-log4j.properties로 변경 후 저장

테스트 해보기

HeidiSQL같은 GUI 툴에서 아래 쿼리를 실행해보자.

INSERT users (user_id, pwd, `name`) VALUES ('1', '2', '3')

이제 토픽 목록을 확인해보자.
그러면 my_topic_users라는 토픽이 자동으로 생겨난 것을 확인할 수 있다.

그 다음에 해당 토픽이 수신받은 메시지 목록을 확인해보면
아래와 같은 메시지를 받은 것을 확인할 수 있다.

{
   "schema":{
      "type":"struct",
      "fields":[
         {
            "type":"int32",
            "optional":false,
            "field":"id"
         },
         {
            "type":"string",
            "optional":true,
            "field":"user_id"
         },
         {
            "type":"string",
            "optional":true,
            "field":"pwd"
         },
         {
            "type":"string",
            "optional":true,
            "field":"name"
         },
         {
            "type":"int64",
            "optional":true,
            "name":"org.apache.kafka.connect.data.Timestamp",
            "version":1,
            "field":"created_at"
         }
      ],
      "optional":false,
      "name":"users"
   },
   "payload":{
      "id":1,
      "user_id":"1",
      "pwd":"2",
      "name":"3",
      "created_at":1744762992000
   }
}

Kafka Sync Connector 생성

POST로 ` http://localhost:8083/connectors`를 아래 데이터와 함께 호출하면 된다.

{
   "name":"my-sink-connect",
   "config":{
      "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
      "connection.url":"jdbc:mysql://localhost:3306/mydb",
      "connection.user":"root",
      "connection.password":"비밀번호",
      "auto.create": "true",
      "auto.evolve": "true",
      "delete.enabled":"false",
      "tasks.max":"1",
      "topics":"my_topic_users"
   }
}

토픽의 이름이랑 같은 테이블을 생성한다.

my_topic_users 토픽에 메시지를 밀어넣는다고 실제 테이블에 데이터가 쌓이지는 않는다.
그저 my_topic_users 테이블에 쌓일 뿐이다.
하지만 users 테이블에 데이터를 추가하면 그건 my_topic_users 토픽과 my_topic_users 테이블에 쌓인다.

실제로 사용하게 된다면?

주문 서비스에서 주문을 생성한다고 가정해보자.
주문을 생성하게 되면 주문 데이터가 생성될 것이고,
또한 상품 서비스에서 상품의 개수를 주문 데이터에 명시된 개수만큼 줄여야 한다.

그러면 주문 정보 생성 => 상품 정보 변경라는 관계가 생성된다.
메시지가 전달되는 흐름은 이렇게 파악하면 된다.

이 때 메시지가 전달되는 흐름대로 생산자(Producer)와 소비자(Consumer)를 판단하면 된다.
시작점이 주문 서비스니 주문 서비스가 생산자가 된다.
종료점이 상품 서비스니 상품 서비스가 소비자가 된다.

그리고 여기서 메시지가 흐르는 통로의 흐름이 토픽(Topic)이 된다고 이해하면 된다.
즉, 토픽의 이름은 연결된 서비스끼리 메시지가 흐르는 통로의 이름이다.

여기서는 임시로 example-order-topic라는 토픽을, DB는 임베디드 H2를 사용해보자.

주문 서비스의 경우 (Producer)

build.gradle

kafka와 연동하기 위해 build.gradle에 아래와 같이 의존성을 추가하자.

// Kafka
implementation 'org.springframework.kafka:spring-kafka'
testImplementation 'org.springframework.kafka:spring-kafka-test'

환경설정

서비스에서 생산자에 대한 환경설정을 할 때는 아래와 같이 작성하면 된다.

/**
 * 생산자(Producer)에 대한 환경설정
 */
@EnableKafka
@Configuration
public class KafkaProducerConfig {
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); // bootstrap.servers
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // key.serializer
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // value.serializer
        return new DefaultKafkaProducerFactory<>(properties);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

우선 ProducerFactory를 통해 생산자가 kafka에 대해 연동하기 위한 정보를 작성한다.
bootstrap.servers는 kafka 서버의 IP와 포트 번호를 작성하면 된다.
key.serializer는 key를 직렬화하기 위한 클래스를 작성하면 된다.
value.serializer는 value를 직렬화하기 위한 클래스를 작성하면 된다.

그 다음에는 ProducerFactory를 인자로 넣어서 KafkaTemplate 인스턴스를 반환하면 된다.
KafkaTemplate은 Kafka에 메시지를 발송하기 위한 매개체다.
RestTemplate같은 개념이라고 생각하면 된다.

생산자 구현

이제 실제로 메시지를 보내는 기능을 구현해보자.

@Service
@Slf4j
public class KafkaProducer {
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * 메시지 발송
     * @param topic 토픽
     * @param orderDto 주문 객체
     * @return
     */
    public OrderDto send(String topic, OrderDto orderDto) {
        ObjectMapper mapper = new ObjectMapper();
        String jsonInString = "";
        try {
            jsonInString = mapper.writeValueAsString(orderDto);
        } catch(JsonProcessingException e) {
            log.error("[KafkaProducer > send] {}", e.getMessage());
        }

        kafkaTemplate.send(topic, jsonInString); // 메시지 발송

        return orderDto;
    }
}

원리 자체는 간단하다.
ObjectMapper를 통해 메시지로 보낼 데이터를 문자열로 바꾸고,
KafkaTemplate의 send() 메소드를 통해 메시지를 발송하면 된다.

실제 사용

메시지를 발송해야 할 곳에서 아까 정의한 메소드를 호출하자.

kafkaProducer.send("example-order-topic", orderDto);

상품 서비스의 경우 (Consumer)

build.gradle

kafka와 연동하기 위해 build.gradle에 아래와 같이 의존성을 추가하자.

// Kafka
implementation 'org.springframework.kafka:spring-kafka'
testImplementation 'org.springframework.kafka:spring-kafka-test'

환경설정

서비스에서 소비자에 대한 환경설정을 할 때는 아래와 같이 작성하면 된다.

/**
 * 소비자(Consumer)에 대한 환경설정
 */
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); // bootstrap.servers
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId"); // group.id
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // key.deserializer
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // value.deserializer
        return new DefaultKafkaConsumerFactory<>(properties);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String>
                kafkaListenerContainerFactory = new ConcurrentKafkaListenerContainerFactory<>();
        kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
        return kafkaListenerContainerFactory;
    }
}

우선 ConsumerFactory를 통해 소비자가 kafka에 대해 연동하기 위한 정보를 작성한다.
bootstrap.servers는 kafka 서버의 IP와 포트 번호를 작성하면 된다.
group.id는 그룹에 대한 고유한 아이디를 작성하면 된다.
key.deserializer는 key를 역직렬화하기 위한 클래스를 작성하면 된다.
value.deserializer는 value를 역직렬화하기 위한 클래스를 작성하면 된다.

그 다음에는 ConcurrentKafkaListenerContainerFactory 인스턴스를 생성한 다음에,
setConsumerFactory() 메소드를 통해서 ConsumerFactory를 담는다.
그리고 ConcurrentKafkaListenerContainerFactory 인스턴스를 반환하면 된다.

소비자 구현

이제 실제로 메시지를 수신했을 때 동작할 기능을 구현해보자.

@Service
@Slf4j
public class KafkaConsumer {
    CatalogRepository repository;

    @Autowired
    public KafkaConsumer(CatalogRepository repository) {
        this.repository = repository;
    }

    @KafkaListener(topics = "example-order-topic")
    public void updateQty(String kafkaMessage) {
        log.info("Kafka Message: ->" + kafkaMessage);

        Map<Object, Object> map = new HashMap<>();
        ObjectMapper mapper = new ObjectMapper();
        try {
            map = mapper.readValue(kafkaMessage, new TypeReference<>() {});
        } catch (JsonProcessingException e) {
            log.error("[KafkaConsumer > updateQty] {}", e.getMessage());
        }

        // 상품 번호를 통해 상품 엔티티 조회
        CatalogEntity entity = repository.findByProductId((String)map.get("productId"));
        if (entity != null) {
            // 수량 변경
            entity.setStock(entity.getStock() - (Integer)map.get("qty"));
            repository.save(entity);
        }
    }
}

@KafkaListener을 적용하면 명시한 토픽에 대해서 메시지를 받으면
해당 메소드에게 처리하게 된다.

그러면 ObjectMapper를 통해 메시지를 역직렬화하고,
그 다음에 메시지에서 추출한 데이터에서
필요한 데이터를 뽑아다가 원하는 작업을 진행하면 된다.

테스트해보기

실제로 테스트를 진행해보자.

기존에 아래와 같은 상품 정보가 있다고 가정해보자.

그리고 이러한 데이터를 주문 서비스에 보내서 주문을 생성하자.

{
    "productId": "CATALOG-0002",
    "qty": 10,
    "unitPrice": 1200
}

그러면 아래와 같이 실제로 주문이 생성되고, 수량이 변경된 것을 확인할 수 있다.

이벤트 기반 비동기 시스템 설계 원칙

애플리케이션 동작 자체에 대해서라면
각 서비스 인스턴스가 메시지를 받았을 때
서비스에서 직접 데이터 작업을 하게 해도 문제는 없다.

결국 중요한 것은 특정한 이벤트가 발생했을 때
해당하는 데이터를 저장하는 것이기 때문이다.

하지만 kafka를 사용하는 목적과는 방향성이 다르다.
kafka는 이벤트 기반 비동기 시스템을 위해 사용한다.
이벤트 기반 비동기 시스템은 이벤트가 발생했을 때,
그 이벤트를 비동기적으로 처리하는 시스템이다.
그래서 원본 트랜잭션과 수신자의 트랜잭션이 분리되어 있어야 한다.

그런데 서비스 인스턴스에서 JPA 등을 통해 데이터를 직접 저장하게 되면
이는 일반적으로 동기적 트랜잭션 기반이다.
보통 @KafkaListener → service.save() → repository.save(entity)의 구조로 동작한다.

그렇다면 어떻게 해야지 “이벤트 기반 비동기 시스템”에 맞게
동작하게 할 수 있을까?

해결방법

위와 같은 문제를 해결하려면
서비스에서 데이터를 DB에 바로 저장하는 것이 아니라
Kafka Sink Connect를 통해 저장하면 된다.

데이터를 DB에 바로 저장하지 않고
토픽에 발송하게 되면
해당 토픽에 설정된 Kafka Sink Connect를 사용해
DB에 저장할 수 있다.

주문 서비스에 Kafka Sink Connect 연동하기

주문 서비스에 Kafka Sink Connect를 연동해보자.

server.port 변경

우선 서버의 포트 번호를 0번으로 바꿔서 랜덤한 번호를 부여받도록 바꾸자.

관련 프로젝트 실행

이번 테스트는 게이트웨이를 통해
2개의 인스턴스를 실행해볼 것이다.

게이트웨이 프로젝트와 디스커버리 프로젝트를 실행해주자.

Kafka Sink Connect 생성

POST로 ` http://localhost:8083/connectors`를 아래 데이터와 함께 호출하면 된다.

{
   "name":"my-sink-connect",
   "config":{
      "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
      "connection.url":"jdbc:mysql://localhost:3306/mydb",
      "connection.user":"root",
      "connection.password":"비밀번호",
      "auto.create": "true",
      "auto.evolve": "true",
      "delete.enabled":"false",
      "tasks.max":"1",
      "topics":"orders"
   }
}

토픽명을 실제로 사용할 테이블과 맞춰주자.

테스트 진행

아까 사용했던 데이터를 게이트웨이를 통해 주문 서비스를 호출해서 주문을 생성해보자.

{
    "productId": "CATALOG-0002",
    "qty": 10,
    "unitPrice": 1200
}

호출해보면 인스턴스가 2개라 데이터가 2건 쌓일 것 같은데
실제로 쌓인 것은 1건뿐인 것을 알 수 있다.

Kafka Sync Connect 사용 시 주의사항

Kafka Sync Connect를 사용하면 데이터 저장을 비동기 이벤트로
처리할 수 있다는 장점이 있다.
하지만 장점이 있으면 단점도 있고 주의사항도 있다.

장점

  • 별도의 개발 없이 자동으로 데이터 저장 가능
    • 코딩 없이 설정 파일(yaml/properties)만으로 Kafka → DB 전송 가능
    • 운영자/데이터 엔지니어 중심의 구성 가능
  • 확장성과 유연성
    • Kafka의 분산 처리 특성과 맞물려 Sink Connector도 수평 확장 가능
    • 토픽 파티션 수에 따라 처리량 확장 가능 (scale out)
  • 여러 타겟 지원
    • 다양한 DB, 데이터 웨어하우스, NoSQL, 파일 시스템(S3 등)에 저장 가능
    • 대표적인 커넥터
      • JDBC Sink Connector
      • Elasticsearch Sink
      • MongoDB Sink
      • BigQuery Sink
  • Schema Registry와 연동하면 스키마 기반으로 안정된 적재 가능
    • Avro/JSON Schema 기반의 메시지를 안정적으로 테이블 컬럼과 매핑
  • 운영/모니터링 도구와의 통합 용이
    • Confluent Control Center, REST API 등을 통해 상태/오류 모니터링 가능

단점

  • 복잡한 비즈니스 로직 처리 불가
    • 단순한 “받은 데이터를 저장” 정도만 가능하다.
    • 조건 분기, 연관 데이터 조회, 트랜잭션 처리 등은 불가능
  • 에러 처리 로직이 제한적
    • 재시도는 가능하지만, 사용자 정의 핸들링이 어렵다
    • 예시 : DB 커넥션 오류, 스키마 mismatch, 중복 키 충돌 등 발생 시
    • 잘못 처리된 메시지를 어디로 보낼지(Dead Letter Queue 설정 등) 따로 설정해줘야 한다.
  • 운영 난이도
    • connector가 별도 인스턴스로 동작한다.
    • 설정, 배포, 장애 감지, scaling 등에서 운영 노하우가 필요하다.
      • 특히 분산 환경에서의 commit, offset 관리에 이슈가 많이 발생한다.
  • 스키마 변화 대응이 어려움
    • 테이블 구조가 변경되면 동기화가 깨지거나 오류 발생
    • 스키마 레지스트리를 쓰더라도 완벽하게 대응되진 않음

주의사항

  • Offset과 정확성(Exactly-once)
    • Sink Connector는 offset 기반으로 Kafka 메시지를 읽음
    • DB 커밋 실패 시 메시지를 중복 처리할 수 있다.
    • Exactly-once 보장하려면 트랜잭션 또는 idempotent 처리 필요
  • Primary Key & Upsert 처리
    • 테이블에 PK가 있어야 INSERT/UPDATE 구분 가능
    • 없으면 중복 insert가 발생할 수 있다.
  • Dead Letter Queue 설정
    • 오류난 메시지를 따로 저장하도록 설정하지 않으면 유실될 가능성이 있다.
  • 데이터 유실/중복 가능성 고려
    • 네트워크 이슈, 커넥터 다운 등으로 메시지가 누락될 가능성이 있다.
    • 중요한 데이터일수록 모니터링 + fallback 전략이 필요하다.
  • 스키마와 테이블 구조 일치 확인
    • Kafka 메시지 필드와 DB 컬럼 불일치 시 오류가 발생한다.
    • 자동 생성 옵션(schema evolution 등)을 쓰면 의도치 않은 테이블 구조 생길 수 있다.

Kafka Sink Connector를 써도 괜찮은 경우

  • 단순한 데이터 동기화가 목적일 때
    • 메시지를 가공하지 않고, DB에 그대로 저장하는 파이프라인으로 쓸 때
    • 예시 : Kafka에 적재된 이벤트 로그를 그대로 DB에 적재만 하면 됨
  • 비즈니스 로직이 거의 필요 없는 경우
    • 저장할 때 추가 연산, 조건 판단, 트랜잭션 처리 등이 필요 없다면
    • 예시 : Kafka로 전송된 로그를 바로 DB에 기록
  • Schema Registry와 함께 구조화된 이벤트를 사용 중일 때 Avro/JSON Schema 기반의 메시지를 Kafka에 넣고
    Sink Connector가 이를 기반으로 테이블에 매핑할 수 있는 경우
  • 대량의 데이터를 빠르게 밀어넣는 ETL/ELT 파이프라인이 필요할 때
    • 실시간 데이터 적재 또는 배치 성향의 시스템

Sink Connector를 쓰면 안 되는 경우

  • 비즈니스 로직 처리가 필요한 경우
    • 메시지를 저장하기 전의 검증, 조건 분기, 연관 데이터 조회, 트랜잭션 처리 등
    • 예시
      • 주문 메시지를 받은 후 사용자의 포인트 차감
      • 이벤트 저장 전에 유저 상태 확인 등
        • 이 경우는 애플리케이션 레벨에서 직접 처리해야 한다.
        • @KafkaListener + JPA/Service 로직
  • 멀티 테이블에 걸친 복잡한 저장/조인 로직이 필요한 경우
    • Sink Connector는 보통 단일 테이블 매핑에 적합하다.
    • 조인이 필요하거나 다른 테이블까지 처리해야 한다면 적합하지 않다.
  • 정합성과 트랜잭션이 중요한 경우
    • Kafka 메시지 처리와 DB 저장이 트랜잭션 경계 내에서 관리돼야 하는 경우
    • Sink Connector는 DB 트랜잭션을 정교하게 다루는 데 한계가 있다.
  • 데이터 전처리/포맷 변환이 필요한 경우
    • 예시 : 날짜 포맷 변경, 특정 필드 암호화, enum 값 변환 등

출처

이 기사는 저작권자의 CC BY 4.0 라이센스를 따릅니다.

마이크로서비스 간 통신

장애 처리와 마이크로서비스 분산 추적