본문 바로가기
Big Data/Data Engineering

kafka Basic

by 룸구 2024. 10. 26.

빅데이터에 대한 정의

빅데이터 정의

  • 5v (volume variety velocity veracity value)
  • 기존 db에서 효율적으로 혹은 전혀 처리할 수 없는 데이터
  • 하나의 서버에서 처리하기에 많은 양

빅데이터 발생 배경

  • disk 등의 하드웨어 비용 감소
    • 실시간 logging 비용 감소
  • 데이터 생성 기기 증가
    • 스마트 기기의 종류와 갯수 증가
  • 데이터 provider의 다양화
    • Ott, Youtube, 개인방송 등
  • 오픈소스의 발전
    • Hadoop의 등장 - 대량의 데이터를 저장하거나 분산병렬처리 이용한 집계 가능
    • 오픈소스의 등장으로 sw 비용절감

결론

  • 저렴한 하드웨어와 오픈소스 소프트웨어의 합작으로 데이터 처리 효율 급상승
  • 데이터를 통한 비즈니스 수익성 증명 ex) 추천시스템을 통한 매출증가

데이터 처리량의 변화

빅데이터 처리 기술의 다양화

빅데이터 처리 기술 ecosystem

  • kafka 위치 : 실시간 빅데이터 처리의 표준

kafka란

어원

  • writing에 최적화하여 만든 오픈소스라서 좋아하는 작가 이름 붙임
    • 프란츠 카프카 : 대표작) 변신 - 바퀴벌레로 변하면?

kafka 정의

  • 오픈소스 분산 이벤트 스트리밍 플랫폼
오픈소스 contributor들의 지속적인 update
- 
https://github.com/apache/kafka
분산 확장성 및 장애회복성을 바탕으로, 데이터 파티셔닝으로 분산하여 다중 broker 이용한 병렬처리 가능
이벤트
스트리밍
무한한 연속적인 event 처리 가능
플랫폼 다양한 모듈로 구성
- producer/ consumer / broker / connect / streams / schema registry

 

탄생배경

  • 기존의 데이터 분석에서 데이터 수집 및 정합성 정리 등에 들어가는 공수가 큼
  • 데이터 처리방식의 니즈 변경(배치 -> 실시간 처리)
  • rest api 활용의 한계성 : 회복탄력성, scalable 부족
  • 기존에 사용하던 oracle의 이기종과 호환성 부족 ..?
  • 기존에 사용하던 실시간 처리기술인 MQ(대표: rabbit MQ)의 성능상 이슈 존재 (낮은 처리량)

kafka 특징

  • 순서 보장 
  • 전송방식 : at least once
  • 파티셔닝 전략

kafka 장점 - 비교대상 : MQ, Rest API

  • 높은 처리량 : throughoutput
  • 실시간성 : source와 sink 사이에 kafka 위치해도 latency 무의미한 수준
  • 이기종과 호환성
  • 무중단성: 하드웨어(server, broker 등) 장애 등에 대한 강력한 회복탄력성
  • 동적 스케일 조절 : broker 갯수조절을 통한 scalable한 대응 가능
  • 유지보수 및 하드웨어 비용 절감

kafka 구조

kafka 구조

  • producer - kafka cluster (broker + zookeeper) - consumer
  • pub / sub model
    • pub/sub 디커플링
    • rest API와 비교
      • client나 server가 추가될때마다 발생하는 복잡성 증가 대처 가능 (확장성)
      • 특정 server에서 이슈 발생 시 발생하는 연쇄작용 차단 및 장애회복 후 연속성있게 data consume 가능 (고가용성)

 

kafka 구성

  • producer
    • 데이터 소스에서 발생한 이벤트나 로그를 Kafka의 topic으로 전송하는 connector역할
      • 다중 토픽 데이터 전송 가능
      • 어떤 파티션에 데이터를 보낼지 결정 가능 (ex, 라운드-로빈 방식, 키 기반 지정방식)
      • 비동기 전송으로 높은 처리량
from kafka import KafkaProducer
import json

# Kafka 프로듀서 설정
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# 메시지 전송
topic_name = 'my_topic'
message = {'key': 'value'}

producer.send(topic_name, message)
producer.flush()  # 모든 메시지가 전송될 때까지 대기

print(f"Message sent to topic {topic_name}: {message}")

 

  • kafka cluster : broker (default 3ea)
    • 메시지를 디스크에 저장하고 관리하는 서버 (영속성)
    • producer의 data push와 consumer의 data pull 요청 처리
    • 특정 partition에 대해 리더 broker 역할, 팔로워 broker 역할 가능 (고가용성)
      • 리더 broker : 해당 partition에 대한 모든 요청을 처리
      • 팔로워 broker : 리더 broker를 미러링하여 sync 유지
    • controller broker가 각 partition에 대한 리더, 팔로워 broker 결정
      • broker 갯수 동적 조절가능 (효율성, 확장성)
  • kafka cluster : zookeeper (default 3ea)
    • kafka cluster의 metadata를 관리
    • coordinator app 역할
    • hadoop 프로젝트 일환, hadoop eco system의 서비스들이 동물이름이기 때문에 사육사(zookeeper)로 이름
    • zookeeper 없이 kafka 동작 불가, 최근 release에서 kraft controller라는 내장 coordinator가 포함되어 deprecate 예정
  • kafka cluster : topic / broker / segment / offset
    • topic
      • publisher가 write하고 subscriber가 read하는 특정 유형의 메시지를 구분하는 논리적 이름 (폴더)
    • partition
      • 실제 메시지가 저장되는 물리적 단위로 Append-Only 방식으로 기록되는 하나의 로그 파일묶음
      • 더보기
        ※ Append-Only : partition을 늘린 후 줄일 수 없게 만든 이유는?
        1. io 성능 증가
            -> time retention 설정으로 오랜된 기록순으로 자동삭제하는 방법 사용
        2. offset 이용한 메세시 신뢰성 관리 방식 유지
    • segment
      • partition 내에서 물리적으로 메시지를 저장하는 파일 단위, 실제 data가 저장되는 최소단위(최대1GB) 
    • offset
      • partition 내 unique한 위치나타내는 정수
      • incremental하며 FIFO이므로 offset 작을수록 과거 데이터임이 명확 (순서보장)
      • data write/read 시 기준점, data consume 끊기다가 진행되도 순서보장 가능

  • consumer
    • Kafka의 topic에서 데이터를 읽고 처리하며 data sink connector역할
      • consumer group을 이용한 scalability 증가
      • topic offset 관리 통한 순서보장 및 장애회복시 신뢰성
      • 데이터에 대한 다양한 ETL 처리 가능 (ex, 다른 kafka topic의 producer 역할)
from kafka import KafkaConsumer  
import json

# Kafka 소비자 설정

consumer = KafkaConsumer(  
'my\_topic',  
bootstrap\_servers='localhost:9092',  
value\_deserializer=lambda v: json.loads(v.decode('utf-8')),  
auto\_offset\_reset='earliest', # 가장 처음부터 메시지를 읽음  
enable\_auto\_commit=True  
)

# 메시지 소비

for message in consumer:  
	print(f"Received message: {message.value}")

'Big Data > Data Engineering' 카테고리의 다른 글

Spark 기초  (0) 2022.02.04
ELK 스택  (0) 2022.02.04