개발 일지/Kafka

[Kafka] 고가용성과 리플리케이션 & 스프링 부트 테스트

배발자 2023. 8. 9.
반응형

개요

프로젝트를 진행하면서 ELK-Stack을 활용하여 로그 수집을 진행하였고 추가적으로 kafka를 도입하면서 안정화된 시스템을 구축하였다. 하지만, 프로젝트의 제한된 날짜 때문에 카프카의 큰 장점을 못살렸던 것이 너무 아쉽게 느껴졌다. 예를들면, 필자가 구성한 카프카는 단일 브로커로 구성하였기 때문에 카프카의 고가용성이라는 장점을 못살린 아키텍처가 된 것이다. 

 

프로젝트 시즌이 끝나고 좋아하는 코딩 문제만 풀다보니 아쉽게 마무리했던 과거 프로젝트의 기억이 주마등처럼 지나가면서 '아차!' 하는 느낌을 받았다. 이전에 작업하던 카프카 관련 내용들은 이미 사라진 서버에서의 고대 작업물이 되었고, 개인 노션 저장소에 백업을 해두긴 하였지만 각 서버마다 복잡하게 얽혀있는 환경 설정 등으로 같은 세팅으로 로컬에 띄우는 것은 주제와 어긋난 오류를 많이 접할 것이라고 생각했다. 그래서 로컬에서 새로운 세팅을 통해 카프카 서버를 구축하여 고가용성에 대한 의미를 확실히 하기위한 테스트를 진행하려고 한다. 

 

즉, 이번 포스팅에서는 카프카의 고가용성과 리플리케이션이 어떻게 돌아가는지 분석해보며, 과거 아쉽게 끝났던 프로젝트를 홀로남아 재구성하는 시간을 가지려고 한다. 

 

 

도커 kafka 서버 구축

과거 프로젝트를 진행했을 때 카프카를 필자가 구축해서 띄우진 않았다. 그때 당시 스토리 관련 기능을 담당하는 팀원이 카프카를 구축하였기 때문에 필자는 해당 팀원에게 'log' 토픽을 설정해달라고 하였다. 이미 카프카 서버는 세팅이 되어있었고 나의 담당이였던 ELK-Stack을 활용하기 위해 Logstash에서 'log' 토픽을 Consume만 해오면 됐기때문이다. 

 

아무튼, 그때 당시 kafka 관련 컴포즈 파일을 열어보았을 때 단일 브로커로 한대로만 구성되어있었다. 이러한 설정은 테스트 환경에서는 괜찮을진 몰라도 커스터마이징 한 실제 배포 환경에서는 가용성에 대한 대비책이나 데이터의 일관성을 유지하지 못할 수 있다. 그러므로 로컬 환경에서 3대의 카프카 브로커를 띄우고 'bae'라는 공통 토픽을 설정하여 리플리케이션 환경 구축이 이번 포스팅의 주제이다. 

 

 

만약 카프카에 대한 개념을 모른다면 아래의 글을 읽어보자

 

카프카의 도입 이유와 ELK-Stack과 결합한 시스템 설계

프로젝트를 진행하면서 서비스의 API 기능을 구현하는 것뿐만이 아니라 '성능 개선', '트래픽 처리'를 주제로 어떤 기술을 도입을 할지 고민을 하는 시점이 있었다. 그때 당시 ELK-Stack을 활용한 로

baebalja.tistory.com

 

 

파티션 구성

 

위에서 언급했듯이 브로커는 총 3대로 구성하고 3개의 파티션과 원본 파티션을 포함하여 3개의 Replica로 구성한다. 즉, 카프카를 서버에 띄우게 된다면 위의 그림처럼 되어있다. (Replica는 그림에서 생략)

 

먼저 파티션에 대해서 조금 더 자세히 알아보자. bae 토픽에 전송되는 메시지를 보관하는 물리적인 메세지 그룹이다. 여러 개의 브로커에 나눠서 Parition을 구별할 수 있으며 Partiion 마다 message를 관리하기 때문에 meaage 병렬 처리도 가능하다. 즉, 보관도 분산해서 하기 때문에 고가용성을 유지하는데 좋다.  

 

파티션이 메시지 보관하는 법

 

 

'bae'라는 토픽에 3개의 파티션을 구성했다고 하면 partition은 0부터 2까지 3개로 위의 그림처럼 구성된다. publisher가 'bae'라는 토픽에 메세지를 넣게되면 각 브로커는 상태에 맞게 partition 별로 offset 0부터 차례로 쌓이게 된다. partition별 offset 단위로 메시지를 보관하기 때문에 메시지를 consume 할 때 partition 별로 처음부터 끝까지 읽기, 특정 파티션만 설정해서 (ex partition 0,1만) 메세지 가져오기 등이 가능하게 되어 메세지 관리와 고도화가 편리하다. 

 

그렇다면 왜 파티션이 필요할까? 

'bae'라는 토픽에 partition 1개가 있다고 가정했을 때 3개의 Producer가 1초에 10개의 메세지를 보낸다면 1초에 Broker가 받는 메세지의 개수는 총 30개이다. 만약 Broker가 1개의 파티션이 존재하고 이것을 1초에 10개의 메세지만 받을 수 있다면 1초에 메세지 보관 처리하는 메세지 총 개수는 10개이다. 즉, Producer가 보내는 메세지와 Brokder가 처리하는 메세지 수가 20개의 간극이 벌어진다. 

 

즉, 병목 현상이 발생한다. 

 

병목현상이 일어나는 이유는 message의 순서가 보장이 되어야한다. Broker가 처리할 수 있는 양보다 더 많은 Message가 들어온다면 순서를 보장하는 데까지의 waiting이 생긴다. 따라서 Broker의 처리량을 잘 파악해서 Partition 처리를 해줘야한다.

 

그렇다면 어떻게 해야할까? 

 

파티션 수를 늘리는 방법이 있다. 파티션 수를 늘리게되면 Producer가 보내주는 Message를 병렬적으로 처리하기 때문에 병목 현상을 없앨 수 있다. 

 

 

파티션을 무작정 늘려야하는가? 

Kafka에서 고가용성을 제공하기 위해 Broker에 대한 Message replication factor을 제공한다. Cluster를 구성하고 Partition을 Topic별로 구성했다면 복제 옵션을 줘서 Partition에 들어온 Message를 복제할 수 있다. 

 

필자가 경험한 프로젝트는 단일 브로커로 진행하였기 때문에 해당 브로커에 장애가 발생한다면 메세지 데이터들이 한순간에 다 사라지고 SPOF(단일 장애지점)으로 분류될 것이다. SPOF 문제점을 피하기 위해 가용성을 중심으로 브로커 3대를 두어 아래와 같이 구성한다. 

 

 

위의 그림에서 하나의 브로커가 장애가 발생한다고 치면 다른 브로커에 복제본을 들고 있기 때문에 장애 복구가 가능한것이다. 즉, 고가용성이라는 장점을 지니고 있다. 하지만 엄청나게 많은 Partition이 존재한다면 다른 곳으로 복제하는데 시간이 많이 걸린다는 단점이 있다. 

 

그래서 Partition을 효율적으로 사용하기 위해서는 적당한 Partition을 유지하는 것이 중요하다. 

 

[Partition Rplica 기준]

용도 복제본 개수
개발 1
운영(장애 대응 필수x) 2
운영(장애 대응 필수) 3

 

 

 

Kafka 설정

[docker-compose.yml] 

version: '2'
services:
  zookeeper-1:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
    ports:
      - "22181:2181"


  kafka-1:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper-1

    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1

  kafka-2:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper-1

    ports:
      - "39092:39092"
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:9092,PLAINTEXT_HOST://localhost:39092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1

  kafka-3:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper-1

    ports:
      - "49092:49092"
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-3:9092,PLAINTEXT_HOST://localhost:49092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1

 

위의 도커 컴포즈를 활용하여 컨테이너를 띄우게 되면 아래와 같이 생성된다.

 

 

docker-compose exec kafka-1 kafka-topics --create --topic bae --bootstrap-server kafka-1:9092 --replication-factor 3 --partitions 3

[출력]
Created topic bae.

 

토픽은 bae라고 정하고 위의 명령문을 작성하여 kafak-1 브로커에 토픽을 생성하였다. 

 

 

docker-compose exec kafka-1 kafka-topics --describe --topic bae --bootstrap-server kafka-1:9092

[출력]
Topic: bae      TopicId: 8vGExAYpRUmKGzh5JXlN1A PartitionCount: 3       ReplicationFactor: 3    Configs:
        Topic: bae      Partition: 0    Leader: 1       Replicas: 1,3,2 Isr: 1,3,2
        Topic: bae      Partition: 1    Leader: 2       Replicas: 2,1,3 Isr: 2,1,3
        Topic: bae      Partition: 2    Leader: 3       Replicas: 3,2,1 Isr: 3,2,1

 

위의 명령문을 작성하여 현재 kafak-1 브로커의 토픽 관련 정보를 불러온다. 하나하나 살펴보면 다음과 같다. 

 

Topic 'bae'라는 토픽
TopicId 8vGExAYpRUmKGzh5JXlN1A   'bae' 토픽의 고유 ID
PartitionCount 해당 토픽은 3개의 파티션으로 나누어짐.  
ReplicationFactor 각 파티션은 3개의 복제본으로 구성(원본 포함)
Configs  Partition: 파티션 번호
Leader: 해당 파티션의 리더로 할당된 브로커 ID (모든 읽기 및 쓰기 작업을 책임지는 브로커)
Replicas: 파티션의 복제본이 할당된 브로커 ID
Isr (In-Sync Replicas): 데이터가 동기화되어 있는 복제본이 할당된 브로커 ID

 

 

결론적으로, 이 구성은 'bae' 토픽이 3개의 파티션으로 구성되고, 각 파티션에는 3개의 복제본이 있으며, 파티션의 리더와 복제 노드 설정을 보여주는 것이다. 

 

도커 환경에서 카프카 서버를 띄웠으니 스프링 부트를 활용하여 Producer와 Consumer 서버를 생성하여 정상적으로 메세지가 수신되는지 확인해보자. 

 

Producer 설정

[controller]

package com.example.demo;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;


@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {

    private final KafkaProducer producer;

    @Autowired
    KafkaController(KafkaProducer producer) {
        this.producer = producer;
    }

    @PostMapping
    public String sendMessage(@RequestParam("message") String message) {
        this.producer.sendMessage(message);
        return "success";
    }
}

 

[service] 

package com.example.demo;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;


@Service
public class KafkaProducer {

    private static final String TOPIC = "bae";
    private final KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    public KafkaProducer(KafkaTemplate kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String message) {
        System.out.println(String.format("Produce message : %s", message));
        this.kafkaTemplate.send(TOPIC, message);
    }

}

 

Producer는 'bae'라는 토픽으로 메세지를 발송하는 서버라고 생각하면 된다. 

위의 예제는 매우 간단하게 작성하였지만, 큰 규모의 프로젝트에서는 Producer를 하는 여러 대의 서버로 구성되어 있고 일련의 하위 작업들을 Kafka에 비동기적으로 메세지를 던지고 작업 프로세스(Consumer)가 바라보고 있는 토픽에 대한 메세지를 받고 작업을 처리하게 되는 것이다. 

 

Consumer 설정 

[생성 2개]

package com.example.demo;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import java.io.IOException;

@Service
public class KafkaConsumer {

    @KafkaListener(topics = "bae", groupId = "consumer")
    public void consume(ConsumerRecord<String, String> record) throws IOException {
        String message = record.value();
        int partition = record.partition();
        System.out.println(String.format("Consumed message : %s, from partition: %d", message, partition));
    }

}

 

Consumer 서버 또한 두개의 스프링 부트로 작업을 하였고 Local에서 진행하기 때문에 각각 다른 포트로 서버를 실행시켰다. 실행시켰을 때 다음과 같은 로그가 생성되었다. 

 

 

 

consumer(left): partitions assigned: [bae-2]
consumer(right): partitions assigned: [bae-0, bae-1]

왼쪽 컨슈머에 파티셔닝 2 할당, 오른쪽 컨슈머에 파티셔닝 0, 1 할당

 

 

Message push

 

 

PostMan을 활용하여 POST 방식으로 메세지를 Proucer 서버에 전달하면, Producer는 해당 메세지를 카프카에 전송하고 컨슈머는 파티션에서 데이터를 꺼내와 작업을 수행한다.  

 

 

 

위의 사진은 Consumer 서버에서 출력된 로그이며, Postman에서 Send 버튼을 계속해서 클릭했을 때 번갈아가면서 Consumer가 작업하고 있다.

 

 

Kafka 브로커 1번 노드를 중간에 중지시켜도 해당 토픽은 지속적으로 사용가능했다. 

즉, 카프카는 고가용성을 달성하기 위해, 리더 브로커가 다운되면 자동으로 팔로워 브로커 중 하나에 리더를 양도한다. 

 

 

반응형

댓글