구글 클라우드 데이터 관리 체계 및 데이터 엔지니어링
데이터 관리 개념, 시스템 아키텍처, 서비스 아키텍처 및 구글 클라우드 서비스 활용 가이드
1. 소개
현대 비즈니스에서 데이터는 가장 중요한 자산 중 하나입니다. 대규모 데이터를 효율적으로 저장, 처리, 분석하는 능력은 비즈니스 성공의 핵심 요소가 되었습니다. 구글 클라우드 플랫폼(Google Cloud Platform, GCP)은 기업이 데이터를 최대한 활용할 수 있도록 다양한 데이터 관리 서비스를 제공합니다.
이 교육 자료에서는 구글 클라우드의 데이터 관리 체계, 데이터 파이프라인 구축 방법, 그리고 데이터 엔지니어링 프로세스에 대해 자세히 알아보겠습니다. 구글 클라우드의 주요 데이터 서비스인 Google Cloud Storage, BigQuery, Cloud Composer, Dataproc, Dataflow, Data Fusion 등에 대한 상세한 설명과 활용 예시를 통해 효과적인 데이터 관리 시스템을 구축하는 방법을 배울 수 있습니다.
2. 데이터 관리 개념
2.1 데이터 관리란?
데이터 관리는 조직이 보유한 데이터 자산의 수집, 저장, 처리, 보호 및 활용을 위한 종합적인 프로세스와 정책을 의미합니다. 효과적인 데이터 관리는 데이터의 품질, 접근성, 보안성을 보장하여 비즈니스 인텔리전스와 의사결정에 신뢰할 수 있는 정보를 제공합니다.
🏡 비유: 도서관 시스템
데이터 관리는 거대한 도서관을 운영하는 것과 같습니다. 도서관에서는 책(데이터)을 체계적으로 분류하고, 카탈로그를 만들어 쉽게 찾을 수 있게 하며, 도서 대출 시스템(데이터 접근 정책)을 통해 이용자가 필요한 정보에 접근할 수 있도록 합니다. 또한 귀중한 자료는 특별 보관소(보안 시스템)에 보관하고, 손상된 책은 복원(데이터 복구)하며, 주기적으로 도서 목록을 업데이트(데이터 유지관리)합니다.
2.2 데이터 라이프사이클
데이터 라이프사이클은 데이터가 생성되는 시점부터 보관, 사용, 그리고 최종적으로 보존 또는 폐기되는 전체 과정을 나타냅니다.
1. 생성/수집
데이터가 시스템에 처음 들어오는 단계
2. 저장/관리
데이터를 적절한 형식으로 저장하고 카탈로그화
3. 처리/분석
데이터에서 가치 있는 인사이트 추출
4. 보존/폐기
정책에 따른 데이터 아카이빙 또는 삭제
🌱 비유: 농작물 재배
데이터 라이프사이클은 농작물 재배 과정과 유사합니다. 씨앗 심기(데이터 생성), 물과 비료 주기(데이터 관리), 수확(데이터 처리), 그리고 보관 또는 소비(데이터 활용)의 단계를 거칩니다. 농부가 작물의 상태를 모니터링하듯이, 데이터 엔지니어는 데이터의 품질과 상태를 지속적으로 관리해야 합니다.
2.3 데이터 거버넌스
데이터 거버넌스는 조직 내에서 데이터 관리에 관한 권한, 통제, 정책 수립을 위한 프레임워크입니다. 효과적인 데이터 거버넌스는 데이터의 일관성, 신뢰성, 보안성, 가용성을 보장하는 데 필수적입니다.
⚖️ 비유: 국가 통치 시스템
데이터 거버넌스는 국가의 통치 시스템과 같습니다. 국가에는 법률(데이터 정책), 정부 부처(데이터 소유권), 사법 시스템(규정 준수), 국민의 권리(데이터 접근 권한)가 있습니다. 효과적인 국가 통치가 국민의 안전과 번영을 보장하듯이, 강력한 데이터 거버넌스는 조직의 데이터 자산을 보호하고 최대한 활용할 수 있게 합니다.
2.4 데이터 품질 관리
데이터 품질 관리는 데이터가 비즈니스 요구 사항을 충족하고 목적에 적합한지 확인하는 프로세스입니다. 데이터 품질은 정확성, 완전성, 일관성, 시의성, 유효성, 고유성 등 여러 차원에서 평가됩니다.
정확성
데이터가 실제 값을 올바르게 반영하는 정도
완전성
필요한 모든 데이터 요소가 존재하는지 여부
일관성
여러 시스템 간에 데이터 값이 일치하는 정도
시의성
데이터가 필요한 시점에 사용 가능한지 여부
유효성
데이터가 정의된 규칙과 제약 조건을 충족하는지 여부
고유성
중복 데이터 없이 각 항목이 한 번만 표현되는지 여부
🔍 비유: 보석 감정
데이터 품질 관리는 보석 감정과 유사합니다. 보석 감정사는 보석의 순도(정확성), 크기(완전성), 색상 일관성(일관성), 시장 가치(관련성), 결함 유무(유효성)를 평가합니다. 고품질 보석이 더 높은 가치를 가지듯이, 고품질 데이터는 조직에 더 큰 가치와 인사이트를 제공합니다.
3. 시스템 아키텍처
3.1 클라우드 데이터 아키텍처 개요
클라우드 데이터 아키텍처는 데이터의 수집, 저장, 처리, 분석, 시각화를 위한 클라우드 기반 시스템 구성을 의미합니다. 구글 클라우드는 다양한 데이터 요구 사항을 충족할 수 있는 확장 가능하고 유연한 아키텍처를 구축할 수 있도록 다양한 서비스를 제공합니다.
🏗️ 비유: 현대적인 도시 설계
클라우드 데이터 아키텍처는 현대적인 도시 설계와 유사합니다. 도시에는 물류 센터(데이터 수집), 창고(데이터 저장), 공장(데이터 처리), 연구소(데이터 분석), 전시관(데이터 시각화)이 있습니다. 이들은 효율적인 교통 시스템(데이터 파이프라인)으로 연결되어 있고, 모든 시설은 확장 가능하며 도시의 성장(비즈니스 확장)에 맞춰 조정될 수 있습니다.
3.2 데이터 레이어 구성
효과적인 데이터 아키텍처는 일반적으로 여러 레이어로 구성됩니다. 각 레이어는 특정 기능을 담당하며, 전체 시스템의 성능, 확장성, 유지 관리성을 향상시킵니다.
수집 레이어
다양한 소스에서 데이터를 수집하고 전송
서비스: Pub/Sub, Dataflow, Data Fusion
저장 레이어
수집된 데이터를 적절한 형식과 구조로 저장
서비스: Cloud Storage, BigQuery, Bigtable
처리 레이어
데이터 변환, 정제, 통합 작업 수행
서비스: Dataproc, Dataflow, BigQuery
분석/시각화 레이어
데이터에서 인사이트를 추출하고 시각화
서비스: BigQuery, Looker, Data Studio
🍰 비유: 케이크 층
데이터 레이어 구성은 여러 층으로 이루어진 케이크와 같습니다. 바닥 층(수집 레이어)은 기초가 되어 전체 구조를 지지하고, 중간 층(저장 및 처리 레이어)은 케이크의 맛과 질감을 결정하며, 상단 층(분석 및 시각화 레이어)은 아름다운 장식으로 최종 사용자 경험을 제공합니다. 각 층은 고유한 역할을 하지만, 전체적으로 조화를 이루어야 맛있는 케이크가 됩니다.
3.3 데이터 레이크와 데이터 웨어하우스
현대적인 데이터 아키텍처에서는 데이터 레이크와 데이터 웨어하우스가 핵심 구성 요소로 자리 잡고 있습니다. 두 시스템은 목적과 특성이 다르지만, 상호 보완적으로 작동하여 종합적인 데이터 관리 솔루션을 제공합니다.
특성 | 데이터 레이크 | 데이터 웨어하우스 |
---|---|---|
데이터 구조 | 원시 데이터, 구조화/비구조화/반구조화 데이터 | 구조화된 처리 데이터 |
스키마 | 스키마-온-리드 (읽기 시 스키마 적용) | 스키마-온-라이트 (쓰기 시 스키마 적용) |
데이터 처리 | 처리 전 데이터 (ELT 방식) | 처리 후 데이터 (ETL 방식) |
용도 | 다양한 분석, 머신러닝, 예측 분석 | 보고서, BI, 정형화된 분석 |
사용자 | 데이터 과학자, 데이터 엔지니어 | 비즈니스 분석가, 의사 결정자 |
구글 서비스 | Google Cloud Storage | BigQuery |
💧 비유: 호수와 정수 시설
데이터 레이크는 천연 호수와 같습니다. 다양한 종류의 물(데이터)이 유입되고, 원시 상태로 저장됩니다. 반면, 데이터 웨어하우스는 정수 처리 시설과 같습니다. 호수에서 가져온 물을 여과, 정화하여 특정 용도(비즈니스 분석)에 적합하게 만듭니다. 호수는 모든 물을 저장하는 반면, 정수 시설은 식수 용도로 처리된 물만 보관합니다.
3.4 데이터 파이프라인 기본 구성
데이터 파이프라인은 데이터가 소스에서 목적지까지 이동하는 과정에서 필요한 모든 처리 단계를 자동화하는 시스템입니다. 효과적인 데이터 파이프라인은 데이터의 안정적인 흐름을 보장하고, 필요한 변환을 수행하며, 오류를 처리합니다.
배치 처리 파이프라인
정해진 주기(시간, 일, 주 단위)로 데이터를 처리
사용 사례: 일일 보고서, 정기 백업, 대량 데이터 처리
서비스: Dataproc, BigQuery, Cloud Composer
스트리밍 처리 파이프라인
실시간 또는 준실시간으로 데이터를 지속적으로 처리
사용 사례: 실시간 대시보드, 이상 감지, 실시간 분석
서비스: Pub/Sub, Dataflow, BigQuery
🚿 비유: 수도 시스템
데이터 파이프라인은 도시의 수도 시스템과 같습니다. 물은 수원지(데이터 소스)에서 시작하여 파이프(데이터 흐름)를 통해 이동합니다. 이동 중에 여과 장치와 처리 설비(데이터 변환)를 거쳐 최종적으로 각 가정의 수도꼭지(데이터 소비 지점)에 도달합니다. 밸브와 계측기(모니터링 도구)는 시스템 전체의 흐름을 제어하고 모니터링합니다. 배치 처리는 대형 저수지에서 주기적으로 물을 공급하는 것과 같고, 스트리밍 처리는 지속적으로 흐르는 수도 시스템과 유사합니다.
4. 서비스 아키텍처
4.1 구글 클라우드 데이터 서비스 개요
구글 클라우드는 데이터의 수집, 저장, 처리, 분석, 시각화를 위한 다양한 서비스를 제공합니다. 이러한 서비스들은 개별적으로 사용할 수도 있지만, 통합하여 완전한 데이터 파이프라인을 구축할 수도 있습니다.
데이터 수집 및 통합
- Cloud Pub/Sub: 메시징 서비스
- Data Fusion: 코드 없는 데이터 통합
- Transfer Service: 데이터 마이그레이션
데이터 저장
- Cloud Storage: 객체 스토리지
- BigQuery: 데이터 웨어하우스
- Bigtable: NoSQL 데이터베이스
- Cloud SQL: 관계형 데이터베이스
데이터 처리 및 분석
- Dataflow: 스트림/배치 처리
- Dataproc: 관리형 Hadoop/Spark
- BigQuery: 분석 엔진
- Vertex AI: 머신러닝 플랫폼
오케스트레이션 및 시각화
- Cloud Composer: 워크플로우 관리
- Looker: BI 및 시각화
- Looker Studio: 대시보드 생성
🧩 비유: 레고 블록
구글 클라우드의 데이터 서비스는 레고 블록과 같습니다. 각 서비스는 특정 기능을 가진 고유한 블록이며, 이러한 블록들을 조합하여 비즈니스 요구 사항에 맞는 다양한 데이터 솔루션(레고 모델)을 구축할 수 있습니다. 일부 블록은 범용적이고(Cloud Storage), 일부는 특수 목적(BigQuery)을 가지고 있지만, 모든 블록은 서로 잘 맞물려 작동하도록 설계되어 있습니다.
4.2 서비스 통합 패턴
구글 클라우드 서비스들은 다양한 통합 패턴을 통해 함께 작동할 수 있습니다. 이러한 패턴은 특정 데이터 처리 요구 사항을 충족하기 위한 모범 사례와 아키텍처 지침을 제공합니다.
ETL 패턴 (Extract, Transform, Load)
데이터를 추출하여 변환한 후 데이터 웨어하우스에 로드
예: Cloud Storage → Dataflow → BigQuery
ELT 패턴 (Extract, Load, Transform)
데이터를 추출하여 바로 로드한 후 데이터 웨어하우스 내에서 변환
예: Cloud Storage → BigQuery → BigQuery SQL
실시간 스트리밍 패턴
실시간 데이터 처리 및 분석을 위한 구성
예: Pub/Sub → Dataflow → BigQuery
데이터 레이크 패턴
다양한 형식의 대규모 데이터 저장 및 처리
예: 다양한 소스 → Cloud Storage → Dataproc
🍽️ 비유: 요리 레시피
서비스 통합 패턴은 요리 레시피와 같습니다. 레시피는 특정 요리를 만들기 위해 어떤 재료(서비스)를 사용하고, 어떤 순서로 조리(프로세스)해야 하는지 알려줍니다. ETL은 재료를 손질(변환)한 후 냄비에 넣는 것과 같고, ELT는 재료를 먼저 냄비에 넣고 조리 과정에서 손질하는 것과 유사합니다. 모든 요리사가 같은 재료로도 다른 방식으로 요리할 수 있듯이, 데이터 엔지니어도 같은 서비스로 다양한 패턴을 구현할 수 있습니다.
4.3 참조 아키텍처
구글 클라우드는 다양한 데이터 처리 시나리오에 대한 참조 아키텍처를 제공합니다. 이러한 참조 아키텍처는 일반적인 사용 사례에 대한 검증된 솔루션을 제공하여 아키텍처 설계 프로세스를 가속화합니다.
데이터 웨어하우스 아키텍처
데이터 소스 → 데이터 수집(Pub/Sub, Dataflow) → 저장(Cloud Storage) → 처리(Dataflow) → 웨어하우스(BigQuery) → 시각화(Looker)
기업 데이터 분석 및 보고를 위한 중앙 집중식 데이터 저장소 구축에 적합합니다.
실시간 분석 아키텍처
스트리밍 소스 → Pub/Sub → Dataflow → BigQuery/Bigtable → 실시간 대시보드
지연 시간이 짧은 데이터 처리 및 분석이 필요한 사용 사례에 적합합니다.
데이터 레이크 아키텍처
다양한 소스 → Cloud Storage → Dataproc/BigQuery → 데이터 과학/ML
데이터 과학 및 머신러닝을 위한 대규모 데이터 저장 및 처리에 적합합니다.
하이브리드/멀티 클라우드 아키텍처
온프레미스/다른 클라우드 → Anthos/Transfer Service → 구글 클라우드 서비스
여러 환경에 분산된 데이터를 통합하고 관리해야 하는 기업에 적합합니다.
🏗️ 비유: 건축 청사진
참조 아키텍처는 건축 청사진과 같습니다. 청사진은 건물의 구조, 시스템, 구성 요소를 상세히 보여주어 건축가가 처음부터 설계할 필요 없이 검증된 디자인을 활용할 수 있게 합니다. 참조 아키텍처도 마찬가지로 데이터 시스템의 구성 요소와 상호 작용을 보여주어 데이터 엔지니어가 검증된 설계를 기반으로 자신의 요구 사항에 맞게 조정할 수 있도록 도와줍니다. 청사진이 다양한 건물 유형(주거용, 상업용 등)에 맞게 준비되듯이, 참조 아키텍처도 다양한 데이터 처리 시나리오에 맞게 준비되어 있습니다.
5. 구글 클라우드 서비스
5.1 Google Cloud Storage (GCS)
Google Cloud Storage는 구글 클라우드의 객체 스토리지 서비스로, 무제한에 가까운 확장성과 비용 효율적인 데이터 저장 기능을 제공합니다. 객체 스토리지이기 때문에 데이터를 파일 형태로 저장하며, 이러한 파일들은 버킷이라 불리는 컨테이너에 구성됩니다.
주요 특징
- 99.999999999%(11 9's)의 내구성
- 전 세계적으로 분산된 스토리지
- 강력한 일관성 모델
- 통합된 암호화
- 버전 관리 및 객체 라이프사이클 관리
- 4가지 스토리지 클래스 제공
- Standard: 자주 접근하는 데이터
- Nearline: 월 1회 정도 접근하는 데이터
- Coldline: 분기 1회 정도 접근하는 데이터
- Archive: 연 1회 미만으로 접근하는 데이터
사용 사례
- 데이터 레이크 구축
- 백업 및 아카이브
- 콘텐츠 배포
- 빅데이터 및 분석
- 머신러닝 데이터 세트 저장
- 웹사이트 정적 콘텐츠 호스팅
📦 비유: 창고 시스템
Google Cloud Storage는 현대적인 창고 시스템과 같습니다. 버킷은 대형 창고 건물이고, 객체(파일)는 창고 내의 포장된 상자입니다. 각 상자에는 고유한 라벨(객체 이름)이 있어 쉽게 식별할 수 있습니다. 다양한 스토리지 클래스는 창고 내의 다양한 보관 구역과 같습니다 - 자주 접근하는 물품은 입구 근처(Standard), 거의 사용하지 않는 물품은 창고 깊숙한 곳(Archive)에 보관합니다. 보안 시스템(암호화, 접근 제어)은 창고의 보안 장치에 해당하며, 자동 옮기기 규칙(라이프사이클 정책)은 일정 기간이 지난 상자를 자동으로 다른 구역으로 이동시키는 시스템입니다.
GCS 사용 예시
# Python을 사용한 GCS 버킷 생성 및 파일 업로드
from google.cloud import storage
# 스토리지 클라이언트 생성
storage_client = storage.Client()
# 새 버킷 생성
bucket_name = "my-unique-bucket"
bucket = storage_client.create_bucket(bucket_name)
# 로컬 파일을 버킷에 업로드
source_file_name = "local/path/to/file.txt"
destination_blob_name = "storage-path/file.txt"
blob = bucket.blob(destination_blob_name)
blob.upload_from_filename(source_file_name)
print(f"파일 {source_file_name}이 {destination_blob_name}으로 업로드되었습니다.")
GCS 모범 사례
- 버킷 이름은 글로벌 네임스페이스에서 고유해야 함
- 적절한 스토리지 클래스 선택으로 비용 최적화
- 데이터 접근 패턴에 맞는 라이프사이클 정책 구성
- 버전 관리를 활용하여 실수로 인한 데이터 손실 방지
- IAM(Identity and Access Management)을 사용한 세분화된 접근 제어
- 지리적 위치를 고려한 리전 또는 멀티 리전 선택
5.2 BigQuery
BigQuery는 구글 클라우드의 서버리스, 고확장성 데이터 웨어하우스로, 대규모 데이터셋에 대한 초고속 SQL 쿼리를 가능하게 합니다. 스토리지와 컴퓨팅을 분리한 아키텍처로 설계되어 필요에 따라 독립적으로 확장할 수 있습니다.
주요 특징
- 서버리스 아키텍처 (인프라 관리 불필요)
- 페타바이트 규모로 확장 가능
- 표준 SQL 지원
- 스토리지와 컴퓨팅 분리 (독립적 확장)
- 자동 고가용성 및 재해 복구
- 머신러닝 내장 (BigQuery ML)
- 지리 공간 분석 지원
- 실시간 분석 (스트리밍 데이터)
- 데이터 공유 및 마켓플레이스
사용 사례
- 기업 데이터 웨어하우스
- 고급 분석 및 보고
- 실시간 비즈니스 인텔리전스
- 데이터 기반 의사 결정
- 로그 분석
- 고객 행동 분석
- 예측 모델링
- 대규모 데이터 조인 및 집계
🔍 비유: 초고속 도서관 검색 시스템
BigQuery는 세계 최대 규모의 도서관에 설치된 초고속 검색 시스템과 같습니다. 이 시스템은 수십억 권의 책(테이블)을 보관하고 있으며, 사용자가 질문(쿼리)을 하면 수천 명의 사서(분산 처리 노드)가 동시에 모든 책을 검색하여 순식간에 답을 찾아 제공합니다. 사용자는 검색 작업에 필요한 사서의 수(컴퓨팅 리소스)에 대한 비용만 지불하며, 도서관 건물(인프라)의 유지 관리는 신경 쓸 필요가 없습니다. 또한, 새로운 책이 계속 들어와도(데이터 증가) 도서관은 자동으로 확장되어 모든 책을 수용합니다.
BigQuery 사용 예시
# 기본적인 BigQuery SQL 쿼리 예시
-- 공개 데이터셋에서 뉴욕 택시 데이터 분석
SELECT
pickup_datetime,
dropoff_datetime,
trip_distance,
fare_amount,
tip_amount,
passenger_count
FROM
`bigquery-public-data.new_york.tlc_yellow_trips_2018`
WHERE
trip_distance > 0
AND fare_amount > 0
AND pickup_datetime BETWEEN '2018-01-01' AND '2018-01-31'
ORDER BY
fare_amount DESC
LIMIT 1000;
# Python을 사용한 BigQuery 데이터 로드 및 쿼리
from google.cloud import bigquery
# 클라이언트 생성
client = bigquery.Client()
# 데이터셋 생성
dataset_id = "my_dataset"
dataset = bigquery.Dataset(f"{client.project}.{dataset_id}")
dataset.location = "US"
dataset = client.create_dataset(dataset, timeout=30)
# 테이블 생성 및 CSV 파일에서 데이터 로드
table_id = f"{client.project}.{dataset_id}.my_table"
job_config = bigquery.LoadJobConfig(
source_format=bigquery.SourceFormat.CSV,
skip_leading_rows=1,
autodetect=True,
)
with open("data.csv", "rb") as source_file:
load_job = client.load_table_from_file(
source_file, table_id, job_config=job_config
)
load_job.result() # 작업 완료 대기
# SQL 쿼리 실행
query = f"""
SELECT * FROM `{table_id}`
LIMIT 10
"""
query_job = client.query(query)
results = query_job.result()
# 결과 출력
for row in results:
print(row)
BigQuery 모범 사례
- 파티션과 클러스터링을 사용하여 쿼리 성능 향상 및 비용 최적화
- 적절한 스키마 설계 (정규화 vs 비정규화)
- 쿼리에서 SELECT * 대신 필요한 열만 선택
- WHERE 절에서 파티션 열 필터링
- 대용량 데이터 로드 시 일괄 처리 사용
- 뷰를 활용하여 복잡한 쿼리 단순화 및 데이터 액세스 제어
- 반복적인 집계 데이터는 구체화된 뷰로 미리 계산
- 작업 단위로 데이터 세트 구성 (프로젝트 > 데이터 세트 > 테이블)
5.3 Cloud Composer
Cloud Composer는 구글 클라우드에서 제공하는 완전 관리형 워크플로우 오케스트레이션 서비스로, Apache Airflow를 기반으로 합니다. 데이터 파이프라인이나 ETL 작업과 같은 복잡한 작업 흐름을 작성, 예약, 모니터링할 수 있습니다.
주요 특징
- 오픈소스 Apache Airflow 기반
- Python으로 작성된 워크플로우 정의(DAG)
- 서비스 간 워크플로우 오케스트레이션
- 자동 확장 및 관리
- 내장된 환경 모니터링
- 버전 제어 통합
- 풍부한 운영자 라이브러리
- 하이브리드 및 멀티 클라우드 지원
사용 사례
- ETL/ELT 파이프라인 관리
- 데이터 웨어하우스 로드 자동화
- 머신러닝 워크플로우 오케스트레이션
- 데이터 검증 및 품질 관리
- 크로스 클라우드 데이터 처리
- 정기적인 보고서 생성
- 데이터 마이그레이션
- 타사 API와의 데이터 통합
🎭 비유: 영화 감독과 제작 팀
Cloud Composer는 영화 제작 과정을 관리하는 감독과 제작 팀과 같습니다. 영화 제작(데이터 파이프라인)은 대본 작성, 촬영, 편집, 음향 작업 등 많은 단계(태스크)로 이루어져 있습니다. 감독(Composer)은 이 모든 단계가 올바른 순서로 진행되도록 조율하고, 문제가 발생하면 즉시 대응합니다. 제작 일정표(DAG)는 각 작업, 시작 시간, 종속성을 명확하게 정의합니다. 각 부서(서비스)는 자신의 전문 영역에 집중하면서도 전체 영화 제작 과정의 한 부분으로 함께 작동합니다. 감독은 작업 진행 상황을 지속적으로 모니터링하고, 필요시 일정을 조정하여 최종 영화(데이터 제품)가 성공적으로 완성되도록 합니다.
Cloud Composer 사용 예시
# 기본적인 Airflow DAG 예시 (GCS에서 BigQuery로 데이터 로드)
import datetime
from airflow import models
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateExternalTableOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.utils.dates import days_ago
# DAG 정의
with models.DAG(
dag_id='gcs_to_bigquery_example',
schedule_interval=datetime.timedelta(days=1),
start_date=days_ago(1),
tags=['example'],
) as dag:
# GCS 데이터를 참조하는 BigQuery 외부 테이블 생성
create_external_table = BigQueryCreateExternalTableOperator(
task_id='create_external_table',
table_resource={
'tableReference': {
'projectId': 'my-project',
'datasetId': 'my_dataset',
'tableId': 'external_table',
},
'externalDataConfiguration': {
'sourceFormat': 'CSV',
'sourceUris': ['gs://my-bucket/data/*.csv'],
'schema': {
'fields': [
{'name': 'id', 'type': 'INTEGER'},
{'name': 'name', 'type': 'STRING'},
{'name': 'date', 'type': 'DATE'},
]
}
}
}
)
# 외부 테이블에서 데이터를 쿼리하여 정식 BigQuery 테이블에 저장
create_table = BigQueryInsertJobOperator(
task_id='create_table',
configuration={
'query': {
'query': '''
CREATE OR REPLACE TABLE `my-project.my_dataset.my_table` AS
SELECT *
FROM `my-project.my_dataset.external_table`
WHERE date >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)
''',
'useLegacySql': False
}
}
)
# 태스크 의존성 설정
create_external_table >> create_table
Cloud Composer 모범 사례
- 효율적인 DAG 설계
- 명확한 태스크 분리
- 태스크 간 적절한 의존성 설정
- 태스크 그룹화 및 재사용
- 적절한 스케줄링
- 과도한 태스크 중복 실행 방지
- 데이터 처리 시간을 고려한 일정 설정
- 적절한 환경 크기 조정
- 워크로드에 맞는 노드 유형 선택
- 자동 스케일링 활용
- 효과적인 오류 처리 및 재시도 메커니즘 구현
- 파라미터화된 쿼리 및 변수 사용
- 버전 관리 및 코드 검토 프로세스 확립
- 모니터링 및 알림 설정
5.4 Dataproc
Dataproc은 구글 클라우드의 관리형 Apache Hadoop 및 Apache Spark 서비스입니다. 대규모 데이터 처리, 배치 처리, 쿼리, 스트리밍 및 머신러닝 작업을 실행할 수 있는 빠르고 쉬운 방법을 제공합니다.
주요 특징
- 빠른 클러스터 생성 (90초 이내)
- 자동 클러스터 확장
- 다양한 클러스터 모드 (Standard, Single Node, High Availability)
- 사전 구성된 초기화 작업
- 다양한 OS 이미지 (Debian, Ubuntu, CentOS)
- Spark, Hadoop, Hive, Pig 등 지원
- 작업 단위 결제 (초 단위)
- 클러스터 스케줄링 및 자동 삭제
- 다양한 클라우드 스토리지 통합 (GCS, BigQuery)
사용 사례
- 대규모 데이터 변환 및 ETL
- 배치 처리 작업
- 로그 처리 및 분석
- 머신러닝 모델 학습
- 기존 Hadoop/Spark 워크로드 마이그레이션
- 데이터 과학 탐색
- 임시 분석 작업
- 데이터 레이크 처리
🏭 비유: 첨단 자동화 공장
Dataproc은 첨단 자동화 공장과 같습니다. 이 공장은 단 몇 분 만에 설치(클러스터 생성)할 수 있으며, 다양한 기계(Hadoop, Spark, Hive 등)를 갖추고 있어 다양한 유형의 제품(데이터 처리 작업)을 생산할 수 있습니다. 공장은 주문량(작업량)에 따라 자동으로 생산 라인(노드)을 늘리거나 줄일 수 있으며, 사용한 시간만큼만 비용을 지불합니다. 작업이 완료되면 공장을 쉽게 철거(클러스터 삭제)할 수 있어 유지 비용이 발생하지 않습니다. 또한, 여러 창고(GCS, BigQuery)에서 원자재(데이터)를 가져오고 완성품을 다시 창고로 보낼 수 있는 효율적인 물류 시스템(통합 기능)을 갖추고 있습니다.
Dataproc 사용 예시
# gcloud CLI를 사용한 Dataproc 클러스터 생성 및 작업 제출
# 클러스터 생성
gcloud dataproc clusters create my-cluster \
--region=us-central1 \
--zone=us-central1-a \
--master-machine-type=n1-standard-4 \
--master-boot-disk-size=500 \
--num-workers=2 \
--worker-machine-type=n1-standard-4 \
--worker-boot-disk-size=500 \
--image-version=2.0-debian10
# PySpark 작업 제출
gcloud dataproc jobs submit pyspark gs://my-bucket/wordcount.py \
--region=us-central1 \
--cluster=my-cluster \
-- gs://my-bucket/input/ gs://my-bucket/output/
# Python API를 사용한 Dataproc 클러스터 생성 및 작업 제출
from google.cloud import dataproc_v1
from google.cloud.dataproc_v1.gapic.transports import \
cluster_controller_grpc_transport
from google.cloud.dataproc_v1.gapic.transports import \
job_controller_grpc_transport
# 클라이언트 생성
region = 'us-central1'
project_id = 'my-project-id'
cluster_client = dataproc_v1.ClusterControllerClient(
transport=cluster_controller_grpc_transport.ClusterControllerGrpcTransport(
address=f'{region}-dataproc.googleapis.com:443'
)
)
job_client = dataproc_v1.JobControllerClient(
transport=job_controller_grpc_transport.JobControllerGrpcTransport(
address=f'{region}-dataproc.googleapis.com:443'
)
)
# 클러스터 구성
cluster = {
'project_id': project_id,
'cluster_name': 'my-cluster',
'config': {
'master_config': {
'num_instances': 1,
'machine_type_uri': 'n1-standard-4'
},
'worker_config': {
'num_instances': 2,
'machine_type_uri': 'n1-standard-4'
},
}
}
# 클러스터 생성
operation = cluster_client.create_cluster(
region, project_id, cluster
)
result = operation.result()
# PySpark 작업 제출
job = {
'placement': {
'cluster_name': 'my-cluster'
},
'pyspark_job': {
'main_python_file_uri': 'gs://my-bucket/wordcount.py',
'args': ['gs://my-bucket/input/', 'gs://my-bucket/output/']
}
}
result = job_client.submit_job(region, project_id, job)
PySpark 워드 카운트 예제
# wordcount.py - 간단한 PySpark 워드 카운트 예제
import sys
from pyspark.sql import SparkSession
# SparkSession 생성
spark = SparkSession.builder.appName("WordCount").getOrCreate()
# 입력 및 출력 경로 가져오기
input_path = sys.argv[1]
output_path = sys.argv[2]
# 텍스트 파일 읽기
lines = spark.read.text(input_path).rdd.map(lambda r: r[0])
# 단어 분리 및 카운트
counts = lines.flatMap(lambda x: x.split(' ')) \
.map(lambda x: (x, 1)) \
.reduceByKey(lambda a, b: a + b)
# 결과를 DataFrame으로 변환
df = counts.toDF(['word', 'count'])
# 결과 저장
df.write.csv(output_path)
# SparkSession 종료
spark.stop()
Dataproc 모범 사례
- 작업 특성에 맞는 클러스터 크기 선택
- 메모리 집약적 작업에는 메모리 최적화 인스턴스 사용
- 컴퓨팅 집약적 작업에는 컴퓨팅 최적화 인스턴스 사용
- GCS를 HDFS 대신 사용하여 저장소와 컴퓨팅 분리
- 자동 스케일링 활용하여 비용 최적화
- 클러스터 자동 삭제 기능 활용하여 유휴 비용 방지
- 클러스터 초기화 스크립트를 사용하여 환경 사용자 지정
- 작업 종류에 따른 적절한 클러스터 구성
- 임시 작업: 작업별 클러스터 생성 및 삭제
- 반복적인 작업: 장기 실행 클러스터 + 스케줄링
- SparkSQL을 사용하여 BigQuery와 직접 통합
- Cloud Monitoring을 사용한 클러스터 모니터링
5.5 Dataflow
Dataflow는 구글 클라우드의 완전 관리형 데이터 처리 서비스로, 배치 및 스트리밍 데이터 처리 파이프라인을 개발하고 실행할 수 있습니다. Apache Beam을 기반으로 하며, 사용자는 인프라 관리 없이 데이터 변환과 처리에 집중할 수 있습니다.
주요 특징
- 서버리스 아키텍처
- 배치 및 스트리밍 처리 통합 모델
- 자동 확장 및 병렬 처리
- 지연 및 백 프레셔 처리
- 정확히 한 번 처리 보장
- 자동화된 최적화 및 튜닝
- 풍부한 변환 라이브러리
- 다양한 소스 및 싱크 커넥터
- 데이터 일관성 및 정확성 보장
사용 사례
- 스트리밍 ETL
- 실시간 데이터 처리
- 이벤트 기반 애플리케이션
- 실시간 머신러닝 추론
- 로그 처리 및 분석
- IoT 데이터 처리
- 실시간 사기 탐지
- 실시간 추천 시스템
🚰 비유: 지능형 수처리 시스템
Dataflow는 지능형 수처리 시스템과 같습니다. 이 시스템은 여러 소스(호수, 강, 빗물 등)에서 물(데이터)을 수집하고, 다양한 처리 장치를 통해 정화, 필터링, 화학적 처리(데이터 변환)를 수행합니다. 시스템은 계절적 물 공급량 변화(데이터 볼륨 변동)에 따라 자동으로 처리 용량을 조절하며, 천천히 도착하는 물(배치 처리)과 계속 흐르는 물(스트리밍 처리)을 동일한 정화 시설에서 처리할 수 있습니다. 첨단 측정 장비(모니터링)는 수질(데이터 품질)을 지속적으로 확인하며, 목적지(데이터 싱크)에 도달할 때까지 안전하게 물이 운반되도록 보장합니다. 시설 관리자(개발자)는 파이프와 밸브 조작(파이프라인 코드)에만 집중하고, 펌프와 모터 유지보수(인프라 관리)는 자동화되어 신경 쓸 필요가 없습니다.
Dataflow 사용 예시
# Python Apache Beam을 사용한 간단한 Dataflow 배치 처리 예제
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions
# 파이프라인 옵션 설정
options = PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'my-project'
google_cloud_options.region = 'us-central1'
google_cloud_options.job_name = 'word-count-job'
google_cloud_options.staging_location = 'gs://my-bucket/staging'
google_cloud_options.temp_location = 'gs://my-bucket/temp'
options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DataflowRunner'
# 워드 카운트 파이프라인 정의
def run_wordcount():
with beam.Pipeline(options=options) as p:
lines = p | 'ReadFromGCS' >> beam.io.ReadFromText('gs://my-bucket/input/*.txt')
counts = (
lines
| 'SplitWords' >> beam.FlatMap(lambda x: x.split())
| 'PairWithOne' >> beam.Map(lambda x: (x, 1))
| 'GroupAndSum' >> beam.CombinePerKey(sum)
)
counts | 'WriteToGCS' >> beam.io.WriteToText('gs://my-bucket/output/wordcount')
if __name__ == '__main__':
run_wordcount()
# Python Apache Beam을 사용한 스트리밍 데이터 처리 예제
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions
import json
# 파이프라인 옵션 설정
options = PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'my-project'
google_cloud_options.region = 'us-central1'
google_cloud_options.job_name = 'streaming-data-processing'
google_cloud_options.staging_location = 'gs://my-bucket/staging'
google_cloud_options.temp_location = 'gs://my-bucket/temp'
# 스트리밍 모드 설정
options.view_as(StandardOptions).runner = 'DataflowRunner'
options.view_as(StandardOptions).streaming = True
# JSON 파싱 및 변환 함수
def parse_json(message):
return json.loads(message)
def calculate_total(element):
data = element
data['total'] = data.get('price', 0) * data.get('quantity', 0)
return data
# 스트리밍 파이프라인 정의
def run_streaming_pipeline():
with beam.Pipeline(options=options) as p:
# Pub/Sub에서 메시지 읽기
messages = (
p | 'ReadFromPubSub' >> beam.io.ReadFromPubSub(
topic='projects/my-project/topics/my-topic')
)
# JSON 파싱
parsed = messages | 'ParseJSON' >> beam.Map(parse_json)
# 비즈니스 로직 적용
processed = parsed | 'CalculateTotal' >> beam.Map(calculate_total)
# 결과를 BigQuery에 쓰기
processed | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
'my-project:my_dataset.my_table',
schema='product_id:STRING, price:FLOAT, quantity:INTEGER, total:FLOAT, timestamp:TIMESTAMP',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
if __name__ == '__main__':
run_streaming_pipeline()
Dataflow 모범 사례
- 파이프라인 설계 최적화
- 가능한 경우 병렬 처리 활용
- 데이터 셔플링 최소화
- 중간 결과 캐싱 고려
- 효율적인 스트리밍 처리
- 적절한 윈도우 크기 선택
- 지연 데이터 처리 전략 구현
- 워터마크 설정 최적화
- 비용 관리
- Flex RS(Resource Scheduling) 활용
- 작업 병렬화 수준 최적화
- 데이터 크기에 따른 머신 유형 선택
- 디버깅 및 모니터링
- Dataflow 모니터링 대시보드 활용
- Cloud Logging 연동
- 적절한 로깅 전략 수립
- 테스트
- 로컬 환경에서 DirectRunner로 먼저 테스트
- 단위 테스트 작성
- 점진적 배포 전략 사용
5.6 Data Fusion
Data Fusion은 구글 클라우드의 완전 관리형, 클라우드 기반 데이터 통합 서비스로, 코드 작성 없이(no-code) 데이터 파이프라인을 구축, 관리, 모니터링할 수 있는 시각적 인터페이스를 제공합니다. CDAP(Cask Data Application Platform)을 기반으로 합니다.
주요 특징
- 시각적 파이프라인 디자인
- 코드 없는(No-code) 개발 환경
- 200개 이상의 사전 구축된 커넥터 및 변환
- 하이브리드 및 멀티 클라우드 지원
- 데이터 계보 및 메타데이터 추적
- 다양한 데이터 소스 지원
- 클라우드 스토리지, 데이터베이스
- SaaS 애플리케이션, API
- 온프레미스 시스템
- 실시간 및 배치 처리 모두 지원
- 파이프라인 템플릿 및 재사용 가능한 컴포넌트
사용 사례
- 데이터 통합 및 ETL/ELT
- 데이터 마이그레이션
- 데이터 웨어하우스 로딩
- 데이터 레이크 구축
- 데이터 통합 표준화
- 자체 서비스 데이터 준비
- 데이터 품질 모니터링
- 실시간 데이터 처리
🧩 비유: 레고 블록 조립
Data Fusion은 레고 블록으로 복잡한 구조물을 만드는 것과 같습니다. 다양한 형태와 색상의 블록(사전 구축된 커넥터와 변환)을 선택하여 설계도(시각적 인터페이스)에 따라 조립하면 복잡한 구조물(데이터 파이프라인)이 완성됩니다. 프로그래밍 지식이 없어도 미리 설계된 블록을 연결하는 것만으로 다양한 구조물을 만들 수 있으며, 한 번 만든 부품(파이프라인 템플릿)은 여러 프로젝트에서 재사용할 수 있습니다. 레고는 다양한 테마 세트(다양한 데이터 소스)와 호환되므로, 어떤 환경의 블록이든 함께 사용할 수 있습니다. 또한, 조립 과정에서 어떤 블록이 어디에 사용되었는지 추적(데이터 계보)할 수 있어 나중에 문제가 발생하면 쉽게 수정할 수 있습니다.
Data Fusion 파이프라인 예시
Data Fusion은 주로 시각적 인터페이스를 통해 사용하지만, REST API를 통해 프로그래매틱하게 제어할 수도 있습니다. 아래는 Data Fusion의 시각적 인터페이스에서 구축할 수 있는 파이프라인의 개념적 단계를 설명합니다.
MySQL에서 BigQuery로 데이터 마이그레이션 파이프라인
- 소스 구성: MySQL 소스 커넥터 추가 및 구성
- MySQL 서버 연결 정보 설정
- 추출할 테이블 또는 SQL 쿼리 지정
- 증분 로드 설정 (타임스탬프 필드 기준)
- 변환 추가: 데이터 정제 및 변환
- JavaScript 또는 Python 변환기를 사용한 데이터 처리
- 필터 변환기로 불필요한 레코드 제거
- 데이터 품질 검사 추가
- 데이터 유형 변환 (필요한 경우)
- 싱크 구성: BigQuery 대상 설정
- BigQuery 프로젝트, 데이터셋, 테이블 정보 지정
- 쓰기 옵션 설정 (덮어쓰기 또는 추가)
- 스키마 자동 감지 또는 수동 정의
- 파이프라인 유효성 검사 및 배포: 파이프라인 구성 확인
- 유효성 검사 실행
- 파이프라인 이름 및 설명 추가
- 예약 일정 설정 (필요한 경우)
- 모니터링 및 로깅: 파이프라인 실행 모니터링
- 실행 로그 확인
- 성공/실패 알림 설정
- 실행 지표 모니터링
# Data Fusion REST API를 사용한 파이프라인 실행 예시 (Python)
import requests
import json
import google.auth
import google.auth.transport.requests
# 인증 획득
credentials, project = google.auth.default()
auth_req = google.auth.transport.requests.Request()
credentials.refresh(auth_req)
token = credentials.token
# Data Fusion 인스턴스 정보
project_id = 'my-project'
instance_name = 'my-datafusion'
region = 'us-central1'
namespace = 'default'
pipeline_name = 'mysql-to-bigquery'
# API 엔드포인트 구성
base_url = f'https://{region}-{project_id}.datafusion.googleusercontent.com/api/v3'
headers = {
'Authorization': f'Bearer {token}',
'Content-Type': 'application/json'
}
# 파이프라인 실행
run_url = f'{base_url}/namespaces/{namespace}/apps/{pipeline_name}/runs'
response = requests.post(run_url, headers=headers)
if response.status_code == 200:
run_id = response.json().get('runid')
print(f'파이프라인 실행 시작, 실행 ID: {run_id}')
else:
print(f'파이프라인 실행 실패: {response.text}')
Data Fusion 모범 사례
- 파이프라인 설계
- 모듈식 접근 방식으로 재사용 가능한 파이프라인 구성
- 템플릿 활용하여 표준 파이프라인 패턴 구현
- 파이프라인 매개변수화를 통한 유연성 확보
- 데이터 품질 관리
- 데이터 유효성 검사 단계 포함
- 오류 처리 및 로깅 강화
- 데이터 스키마 유효성 검사
- 성능 최적화
- 인스턴스 크기를 워크로드에 맞게 조정
- 배치 크기 최적화
- 분할 처리 활용
- 보안 및 거버넌스
- 최소 권한 원칙 적용
- 데이터 마스킹 및 민감 정보 보호
- 메타데이터 관리 및 계보 추적
- 운영 관리
- CI/CD 파이프라인 통합
- 알림 및 모니터링 설정
- 정기적인 대기 파이프라인 테스트
6. 데이터 파이프라인 구축
6.1 엔드 투 엔드 파이프라인 설계
데이터 파이프라인은 데이터가 소스에서 목적지까지 이동하는 과정에서 필요한 모든 처리 단계를 자동화하는 시스템입니다. 효과적인 데이터 파이프라인은 일관된 데이터 흐름을 보장하고, 데이터 품질을 유지하며, 비즈니스 요구 사항을 충족합니다.
1. 데이터 수집
다양한 소스에서 데이터 추출
서비스: Cloud Storage, Pub/Sub, Data Fusion
2. 데이터 처리
변환, 정제, 집계, 보강
서비스: Dataflow, Dataproc, BigQuery
3. 데이터 저장
처리된 데이터를 목적지에 저장
서비스: BigQuery, Cloud Storage, Bigtable
4. 데이터 소비
분석, 시각화, 머신러닝
서비스: Looker, BigQuery ML, Vertex AI
🍽️ 비유: 레스토랑 주방
데이터 파이프라인은 고급 레스토랑의 주방 운영과 유사합니다. 식재료 구매(데이터 수집)부터 시작하여 식재료 손질 및 조리(데이터 처리), 요리 완성 및 플레이팅(데이터 저장), 고객에게 서빙(데이터 소비)에 이르는 일련의 과정이 조화롭게 이루어져야 합니다. 주방의 각 스테이션(파이프라인 단계)은 전문 쉐프(서비스)가 담당하며, 전체 과정은 헤드 쉐프(오케스트레이션 도구)의 감독 하에 진행됩니다. 식재료의 품질 관리(데이터 검증)와 요리 시간 조절(작업 스케줄링)은 맛있는 요리(고품질 데이터 제품)를 제공하기 위한 핵심 요소입니다.
6.2 배치 처리 파이프라인 구축
배치 처리 파이프라인은 정해진 주기(시간, 일, 주 단위)로 대량의 데이터를 처리하는 시스템입니다. 이러한 파이프라인은 대규모 데이터 변환, 집계, 정제 작업에 적합하며, 처리 시간보다 처리량이 중요한 경우에 활용됩니다.
(본 게시글은 genspark.ai의 슈퍼 에이전트를 사용하여 작성되었습니다. 크레딧 부족으로 마지막 일부가 짤렸습니다.)
'IT > DB' 카테고리의 다른 글
Supabase (0) | 2024.02.21 |
---|---|
Qdrant - Vector Database (0) | 2023.04.21 |
Milvus open source vector database (0) | 2023.04.17 |
Pinecon - Long-term Memory for AI (0) | 2023.04.17 |
DataHub: The Metadata Platform for the Modern Data Stack (0) | 2023.04.06 |
댓글