개발 일지/Kafka

[Kafka] 프로듀서가 정확히 한 번만 전송하는 방식

배발자 2023. 10. 24.
반응형

개요

이전 포스팅을 보면 카프카에서 멱등성 옵션을 이용해 중복 없는 전송을 할 수 있다고 언급하였다. 다만, 여기서 유의해야하는 점은 중복 없는 전송 방식이 정확히 한 번만 전송한다는 의미가 아니다. 두 번을 전송하더라도 중복된 데이터를 삽입하지 않는 로직을 의미한 것이며, 정확히 한 번 전송은 트랜잭션과 같은 전체적인 프로세스 처리를 의미한다. 즉, 이번 포스팅에서는 정확히 한 번만 전송하는 로직은 어떻게 흘러가는지 알아보려고 한다. 
 

이유

전체적인 프로세스를 관리하기 위해 카프카에서는 정확히 한 번 처리하는 별도의 프로세스가 존재하는데 이를 트랙잭션 API라고 부른다. 우리는 이 과정을 자세히 살펴보려고 하는데, 전송을 두 번 하더라도 중복 데이터를 처리하면 되는 것이 아닐까라는 의문이 생기기도 한다. 왜 정확히 한 번만 전송하는 프로세스가 필요로 하는 걸까? 
 
 

 why?

 

 

[메시지를 정확히 한 번 처리해야 하는 경우의 예로 은행 시스템을 가정해보자]

개발자 배씨가 자신의 계좌로부터 돈을 인출해 B의 계좌로 돈을 송금했다. 해당 이벤트는 배씨의 계좌 출금 이벤트, B계좌로의 입금 이벤트로 크게 나뉠수 있다. 배씨 계좌에서 출금 이벤트가 중복되어 두 차례 발생하거나 배씨의 출금 이벤트는 유실되고 B계좌에 입금 이벤트만 처리된다면 은행은 곤란한 상황이 될 것이다. 뭐, 출금 이벤트가 유실되어 출금 계좌의 업데이트가 없다면 배씨는 좋아할거지만.

결국, 은행 시스템처럼 메시지 중복이나 손실이 발생하지 않도록 정확히 한 번 처리하는 로직은 필요한 것이다. 

 

디자인

카프카에서는 서버 측에는 프로듀서에 의해 전송된 메시지를 관리하고, 커밋 또는 중단 등을 표시하는 역할을 하는 트랜잭션 코디네이터라는 것이 존재한다. 카프카는 트랜잭션 로그를 카프카의 내부 토픽인 __transaction_state에 저장하는데 이 토픽은 카프카의 내부 토픽이며, 파티션 수와 리플리케이션 수가 존재하며, 브로커의 설정을 통해 관리자가 설정할 수 있다. 이때 __transaction_state 토픽에 프로듀서가 직접 기록하는 것이 아니라, 프로듀서는 트랜잭션 관련 정보를 트랜잭션 코디네이터에게 알린 후 트랜잭션 코디네이터가 모든 정보의 로그를 직접 기록한다. 
 
정확히 한 번 전송을 이용해 전송된 메시지들이 카프카에 저장되면, 카프카의 메시지를 다루는 클라이언트들은 해당 메시지들이 정상 또는 실패 커밋인지 식별할 수 있어야하는데, 카프카에서 이를 식별하기 위해 컨트롤 메시지라고 불리는 특별한 타입의 메시지가 추가로 사용된다. 이후 해당 메시지에 대해서 언급할 것이니 이런게 있다 정도만 알고 넘어가자.  
 
정확히 한 번 전송을 위해 필수로 설정해야하는 옵션이 있는데 트랜잭션 프로듀서 예제 코드를 보면서 흐름을 이해해보자.

* 해당 코드는 [실전 카프카 개발부터 운영까지] 라는 서적에서 제공해주는 코드이다. 

 

[Java]

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ExactlyOnceProducer {
    public static void main(String[] args) {
        String bootstrapServers = "peter-kafka01.foo.bar:9092";
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // 정확히 한번 전송을 위한 설정
        props.setProperty(ProducerConfig.ACKS_CONFIG, "all"); // 정확히 한번 전송을 위한 설정
        props.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"); // 정확히 한번 전송을 위한 설정
        props.setProperty(ProducerConfig.RETRIES_CONFIG, "5"); // 정확히 한번 전송을 위한 설정
        props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "peter-transaction-01"); // 정확히 한번 전송을 위한 설정

        Producer<String, String> producer = new KafkaProducer<>(props);

        producer.initTransactions(); // 프로듀서 트랜잭션 초기화
        producer.beginTransaction(); // 프로듀서 트랜잭션 시작
        try {
            for (int i = 0; i < 1; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>("peter-test05", "Apache Kafka is a distributed streaming platform - " + i);
                producer.send(record);
                producer.flush();
                System.out.println("Message sent successfully");
            }
        } catch (Exception e){
            producer.abortTransaction(); // 프로듀서 트랜잭션 중단
            e.printStackTrace();
        } finally {
            producer.commitTransaction(); // 프로듀서 트랜잭션 커밋
            producer.close();
        }
    }
}

//https://github.com/onlybooks/kafka2/blob/main/chapter5/%EC%98%88%EC%A0%9C/%EC%98%88%EC%A0%9C%205-3

 
여기서 중요한 것은 중복 없는 전송과 정확히 한 번 전송의 옵션 설정에서 큰 차이점은 TRANSACTIONAL_ID_CONFIG이다. 해당 옵션은 실행하는 프로듀서 프로세스마다 고유한 아이디로 설정해야한다. 다시 말해, n개의 프로듀서가 존재하면 n개의 다른 아이디로 설정해야한다는 뜻.
 
 

단계별 동작

지금부터 로직 흐름을 살펴보려고 하는데 필자도 해당 부분을 이해하는 것이 쉽지만은 않았다. 이 내용은 10번~20번을 정독하면서 이해해보자. 
 

 
먼저, 정확히 한 번만 전송하기 위해서는 트랜잭션 코디네이터를 찾아서 도움을 받아야한다. 그러므로 프로듀서는 브로커에게 FindCoordinatorRequest를 보내서 트랜잭션 코디네이터의 위치를 찾게 된다.
 

*만약 트랜잭션 코디네이터가 존재하지 않는다면 신규 트랜잭션 코디네이터가 생성된다.

 
트랜잭션 코디네이터는 브로커에 위치하며 주역할은 PID(ProducerID), TID(transactional.id)를 매핑하고 해당 트랜잭션 전체를 관리한다. __transaction_state 토픽의 파티션 번호는 TID(trasactional.id)로 해시하여 결정되고 이 파티션의 리더가 있는 브로커가 트랜잭션 코디네이터의 브로커로 최종 결정된다. 
 
여기서 브로커가 두개인데 위의 브로커는 트랜잭션 코디네이터의 브로커이며 아래의 브로커프로듀서가 전송하는 메시지를 받는 브로커이므로 서로 다르다. 
 

 

producer.initTransactions(); // 프로듀서 트랜잭션 초기화

 
프로듀서는 initTransactions() 메소드를 이용해  트랜잭션 전송을 위한 initPidRequest를 트랜잭션 코디네이터로 보낸다. 이때 TID가 설정된 경우에 initPidRequest와 함께 TID를 전송한다. 이때, 트랜잭션 코디네이터는 TID, PID를 매핑하고 트랜잭션 로그에 기록한다. 그 다음에 PID에포크를 한 단계 올리는 동작을 하고 PID에포크가 올라감에 따라 이전의 동일한 PID와 이전 에포크에 대한 쓰기 요청은 무시하게 된다. 이전에 포스팅했던 리더에포크에 대한 개념과 유사하다. 쉽게 말하면, 새로운 트랜잭션과 이전 트랜잭션을 구분하는 용도이다.
 

* 에포크를 활용하는 이유는 신뢰성 있는 메시지 전송을 위함.
 

 

producer.beginTransaction(); // 프로듀서 트랜잭션 시작

 
다음으로 프로듀서는 beginTransaction() 메소드를 이용해 새로운 트랜잭션의 시작을 알린다. 프로듀서는 내부적으로 트랜잭션이 시작됐음을 기록하지만, 트랜잭션 코디네이터 관점에서는 첫 번째 레코드가 전송될 때까지 트랜잭션이 시작되는 것은 아니라는 것을 알아두자. 
 

 
다음으로 트랜잭션 상태 추가 동작이다. 프로듀서는 토픽 파티션 정보를 트랜잭션 코디네이터에게 전달하고, 트랜잭션 코디네이터는 해당 정보를 트랜잭션 로그에 기록한다. 그러니까 프로듀서가 아래 브로커에 존재하는 P0(파티션0)으로 메시지를 보내야하니 트랜잭션 코디네이터에게 미리 기록해달라는 것이다. 위의 그림처럼 TID와 P0(파티션0)의 정보가 트랜잭션 로그에 기록되며, 트랜잭션의 현재 상태를 Ongoing(진행중)으로 표시한다. 만약 트랜잭션 로그에 추가되는 첫 번째 파티션이라면, 트랜잭션 코디네이터는 해당 트랜잭션에 대한 타이머를 시작한다. 기본값으로 1분 동안 트랜잭션 상태에 대한 업데이트가 없다면, 해당 트랜잭션은 실패로 처리된다. 
 

 
이제 프로듀서는 대상 토픽의 파티션으로 메시지를 전송하는데 위의 그림에서는 P0(파티션0)으로 메시지를 전송했고, 해당 메시지에는 PID, 에포크, 시퀀스 번호가 함께 전송된다. 
 

 catch (Exception e){
    producer.abortTransaction(); // 프로듀서 트랜잭션 중단
    e.printStackTrace();
} finally {
    producer.commitTransaction(); // 프로듀서 트랜잭션 커밋
    producer.close();
}

 
메시지 전송을 완료한 프로듀서는 commitTransaction() 메소드 또는 abortTrasaction() 메소드 중 하나를 반드시 호출해야 하며, 해당 메소드의 호출을 통해 트랜잭션이 완료됨을 트랜잭션 코디네이터에게 알린다. 위의 코드에서 전송 과정 중 Exception이 터지면 abortTransction() 메소드를 호출하는 것을 확인할 수 있다.이어서 트랜잭션 코디네이터는 두 단계의 커밋 과정을 시작한다.
 
[첫 번째 단계]는 트랜잭션 로그에 해당 트랜잭션에 대한 PrepareCommit 또는 PrepareAbort를 기록한다. 
 

 
[두 번째 단계]는 트랜잭션 코디네이터가 트랜잭션 로그에 기록된 토픽의 파티션에 트랜잭션 커밋 표시를 기록한다. 여기서 기록하는 메시지가 앞서 언급한 컨트롤 메시지다. 
 
예를 들어, 트랜잭션 프로듀서가 파티션0에 메시지를 전송했고 해당 메시지의 오프셋이 1이라고 가정해보자. 트랜잭션 코디네이터는 파티션0에 트랜잭션 커밋 표시 메시지를 기록하고, 이 추가 메시지(컨트롤 메시지)로 인해 파티션0의 마지막 오프셋은 2로 증가한다. 이 메시지는 해당 PID의 메시지가 제대로 전송됐는지 여부를 컨슈머에게 나타내는 용도로도 사용된다. 따라서 트랜잭션 커밋이 끝나지 않은 메시지는 컨슈머에게 반환하지 않으며, 오프셋의 순서 보장을 위해 트랜잭션 성공 또는 실패를 나타내는 LSO(Last Stable Offset)라는 오프셋을 유지하게 된다. 
 

 
마지막 단계로 트랜잭션 완료이다. 트랜잭션 코디네이터는 완료됨(Committed)이라고 트랜잭션 로그에 기록한다. 그리고 프로듀서에게 해당 트랜잭션이 완료됨을 알린 다음 해당 트랜잭션에 대한 처리는 모두 마무리된다. 트랜잭션을 이용하는 컨슈머는 read_committed 설정을 하면 트랜잭션에 성공한 메시지들만 읽을 수 있게 된다. 
 
 

마치며

동작 흐름이 생각보다 복잡해서 이해하기가 쉽지 않았다. 이 글을 작성하면서 관련 서적을 한 30번을 다시 읽어봤을 정도로 까다로운 내용이였다. 하지만, 신뢰성있는 카프카를 만들기 위해 이러한 로직도 존재하는구나를 알게되었고 특히나 트랜잭션 코디네이터로 각 트랜잭션을 고유하게 식별하고 기록하며, 이로써 동일한 트랜잭션을 두 번 이상 처리하지 않도록 보장한다는 것이 놀라웠다. 다음 포스팅은 프로듀서의 내용에서 벗어나 컨슈머에 대한 내용을 정리해보려고 한다. 
 
* 참고서적 : 실전 카프카 개발부터 운영까지 - 고승범

반응형

댓글