Oracle Exadata → Hadoop Hive & Google BigQuery 데이터 이관
Oracle Exadata의 대용량 데이터를 Hadoop 기반 Hive와 Google BigQuery로 이관하기 위한 아키텍처를 설계하고, ETL 개발 방법을 살표봅니다. 배치(Batch)와 실시간(Real-time) 파이프라인을 모두 고려하여 매일 새벽 전체 데이터 배치 이관과 낮 시간 10분 주기의 증분 데이터 실시간 이관 요구사항을 충족합니다. 설계에는 성능 확장성, 데이터 변환/정제, 보안, 사용 기술 스택, 데이터 마트 구축 전략, BI/AI 활용 방안, 그리고 예제 코드를 포함합니다.
📌 사용된 기술 스택 목록
1️⃣ 데이터 추출 (ETL - Batch & Streaming)
툴/기술 | 설명 |
---|---|
Apache Sqoop | - Oracle Exadata에서 데이터를 HDFS 및 Hive로 대량 이동하는 배치(ETL) 도구 - JDBC 기반 RDBMS 데이터 추출을 병렬(MapReduce) 방식으로 수행 |
Apache NiFi | - 실시간 데이터 스트리밍 파이프라인 관리 도구 (Drag & Drop GUI 기반) - Oracle DB에서 증분 데이터를 가져와 Hive, BigQuery, Kafka로 전달 |
Apache Kafka | - 대량의 실시간 데이터를 전송하는 메시지 브로커 - Oracle 변경 데이터(CDC)를 Kafka Topic으로 전달 후 여러 시스템(Hive, BigQuery)으로 소비 가능 |
Kafka Connect | - Kafka와 외부 시스템(Oracle, HDFS, BigQuery 등)을 연계하는 커넥터 프레임워크 - Kafka Connect JDBC Source: Oracle → Kafka - Kafka Connect HDFS Sink: Kafka → Hive - Kafka Connect BigQuery Sink: Kafka → BigQuery |
2️⃣ 데이터 저장 및 변환
툴/기술 | 설명 |
---|---|
Apache Hive | - Hadoop 기반 데이터 웨어하우스 역할 - Sqoop 및 Kafka를 통해 Oracle 데이터를 Hive 테이블로 변환/저장 - ORC/Parquet 포맷으로 최적화 |
Google BigQuery | - 클라우드 기반의 서버리스 데이터 웨어하우스 - GCS, Kafka 또는 NiFi를 통해 데이터를 적재하여 BI 및 AI 분석에 활용 - Streaming API 또는 Batch Load 방식 지원 |
Google Cloud Storage (GCS) |
- BigQuery로 데이터를 이동할 때 중간 저장소 역할 - HDFS에서 추출된 데이터를 CSV/Avro/Parquet 포맷으로 GCS에 업로드 후 BigQuery로 로드 |
3️⃣ 데이터 처리 및 오케스트레이션
툴/기술 | 설명 |
---|---|
Apache Airflow | - 배치 워크플로우 스케줄링 및 자동화 도구 - Sqoop 실행, HDFS → BigQuery 로드, NiFi/Kafka 제어 등 ETL 프로세스 관리 |
Google Dataflow (Apache Beam 기반) |
- 실시간 스트리밍 데이터 변환 및 배치 데이터 처리 - Kafka 또는 GCS에서 데이터를 읽어 BigQuery로 적재할 때 사용 가능 |
Apache Spark | - 대용량 데이터 ETL 및 분석을 위한 분산처리 엔진 - Hive 및 BigQuery에 저장된 데이터를 변환/가공 |
4️⃣ 인증 및 보안
툴/기술 | 설명 |
---|---|
Key-Based Authentication (SSH, OAuth2, API Keys) |
- Oracle, Hive, BigQuery 등의 접근을 키 기반 인증으로 설정 |
TLS/SSL Encryption | - Kafka, NiFi, Sqoop, BigQuery API 호출 시 TLS(SSL) 암호화 적용 |
Google IAM (Identity and Access Management) |
- BigQuery 및 GCS 접근 제어를 위해 IAM 정책 및 서비스 계정 인증 사용 |
HashiCorp Vault (선택적 적용 가능) | - API 키, DB 접속 정보 등을 안전하게 저장하고 관리하는 비밀 관리 솔루션 |
5️⃣ 데이터 분석 및 활용 (BI & AI)
툴/기술 | 설명 |
---|---|
Google BigQuery ML | - SQL 기반으로 머신러닝 모델을 BigQuery 내에서 직접 학습 및 예측 |
Apache Superset | - Hive 및 BigQuery를 시각화할 수 있는 BI 대시보드 오픈소스 도구 |
Tableau / Looker / Power BI | - BigQuery 또는 Hive 데이터를 시각화하여 비즈니스 분석(BI) 대시보드 구축 |
TensorFlow / PyTorch (AI 모델링) | - Hive 및 BigQuery의 데이터를 머신러닝/딥러닝 모델 훈련 데이터로 활용 |
✅ 전체 아키텍처에서 사용된 오픈소스 기술 요약
분야 | 사용된 도구/기술 |
---|---|
데이터 추출 (ETL - 배치 & 실시간) | Apache Sqoop, Apache NiFi, Apache Kafka, Kafka Connect |
데이터 저장 및 변환 | Apache Hive, Google BigQuery, Google Cloud Storage |
데이터 처리 및 오케스트레이션 | Apache Airflow, Apache Spark, Google Dataflow (Beam) |
보안 및 인증 | Key-Based Authentication, TLS/SSL Encryption, Google IAM, HashiCorp Vault |
데이터 분석 및 BI/AI | Google BigQuery ML, Apache Superset, Tableau, Looker, Power BI, TensorFlow, PyTorch |
주요 요구사항 정리
- 데이터 적재 주기:
- 배치 적재 – 매일 01:00 ~ 05:00 사이 전체 데이터를 대상 시스템으로 이관 (Full Load).
- 실시간 적재 – 업무 시간 중 10분마다 신규/변경 데이터 증분 적재.
- 성능 및 확장성:
- 배치 이관 작업은 05:00 이전 완료 보장 (4시간 내 완료).
- 실시간 적재 및 배치 적재 처리 속도를 조절할 수 있는 설정 필요 (확장성 및 유연성).
- 데이터 변환 및 정제:
- Hive 및 BigQuery 스키마에 맞도록 데이터 형식 변환 및 정제 적용.
- 원본(Oracle) 스키마를 대상(Hive/BigQuery)로 자동 변환, 컬럼 데이터 타입 매핑 가이드 제공.
- 보안 및 인증:
- 키 기반 인증 적용 (비밀번호 대신 키/토큰을 활용한 인증).
- 데이터 전송 채널 암호화 등 보안 고려.
- 도구 및 기술 스택:
- Apache Sqoop, NiFi, Kafka, Airflow 등 오픈소스 기반 최적 데이터 이관 도구 선정 및 조합.
- Hive 및 BigQuery에 데이터 마트 구축을 위한 테이블 설계 전략.
- BI 및 AI 연계:
- 이관된 데이터를 활용해 분석용 데이터 마트를 구성.
- BI 도구 및 AI/ML 모델과 연계하는 방안.
- 샘플 코드 제공:
- Sqoop, NiFi, Kafka, Airflow 등을 활용한 ETL 구현 예제 코드.
이 가이드는 상기 요구사항을 하나씩 해결하는 방식으로 구성되어 있습니다.
아키텍처 설계 (Architecture Design)
전체 데이터 파이프라인은 배치 ETL 흐름과 실시간 스트리밍 흐름 두 가지로 나뉩니다. 아래에 각 흐름의 아키텍처와 구성 요소를 설명합니다.
1. 배치 ETL 아키텍처 (매일 전체 데이터 이관)
매일 새벽 1시에 Oracle Exadata에서 Hive와 BigQuery로 전체 데이터를 이관하는 배치 작업이 시작됩니다. Apache Airflow 등의 워크플로우 스케줄러를 사용하여 순차 작업을 자동화합니다. 주요 단계:
- 데이터 추출 (Extract): Apache Sqoop을 사용하여 Oracle Exadata에서 대량의 데이터를 Hadoop HDFS로 내보냅니다. Sqoop은 Oracle 등 RDBMS의 전체 테이블이나 스키마를 HDFS로 직접 가져올 수 있고, Hive 테이블을 자동 생성하며 메타데이터까지 적재할 수 있습니다. 예를 들어 Sqoop을 통해 Oracle의 테이블들을 한 번에 내보내는 import-all-tables도 가능합니다. 대용량에 대비해 병렬 매퍼(Task) 개수를 조절하여 가져오며, 분할 키(--split-by)를 지정해 병렬 추출 시 레코드 균등 분배를 합니다.
- 데이터 적재 - Hadoop/Hive: Sqoop --hive-import 옵션을 사용하여 HDFS에 적재된 파일로부터 Hive 테이블을 생성합니다. 이 과정에서 Sqoop이 Oracle 스키마를 분석하여 Hive에 대응하는 데이터 타입으로 매핑하고 테이블을 만들어줍니다. 다만, Sqoop 기본 매핑은 일부 Oracle 데이터 타입을 Hive에서 STRING이나 DOUBLE로 뭉뚱그려 생성할 수 있으므로, 필요한 경우 Hive에서 테이블 스키마를 수정 또는 재생성하여 적절한 타입 (INT, DATE, DECIMAL 등)으로 맞춥니다. 대안으로, Sqoop은 일단 HDFS에 데이터 파일만 적재하고 Hive 외부 테이블을 수동 생성하는 방법도 있습니다. Hive에 적재된 데이터는 분석용 원본 테이블(Staging)로 활용됩니다.
- 데이터 적재 - BigQuery: Oracle에서 추출된 데이터를 Google BigQuery로 로드합니다. 오픈소스만으로 직접 Oracle→BigQuery 쓰기는 제한적이므로, 중간 파일 또는 **GCS(Cloud Storage)**를 활용합니다. 즉, Sqoop이 추출한 HDFS의 파일을 GS bucket으로 옮긴 후 BigQuery에서 외부 데이터 소스로 가져오거나, BigQuery 로드 API를 호출합니다. 이 과정도 Airflow DAG 내에서 bash 또는 Python 작업으로 구현 가능합니다. 예를 들어 Airflow에서 bash_operator를 사용해 hdfs dfs -copyToLocal로 HDFS파일을 가져오고 gsutil cp로 GCS에 업로드한 뒤 bq load 커맨드를 실행하는 식입니다. BigQuery 로드시 AVRO 또는 Parquet 포맷을 사용하면 스키마가 자동 인식되어 편리합니다. Stack Overflow 조언: 한 번에 대량 이전 시 CSV/Avro로 Cloud Storage에 덤프 후 BigQuery로 import하는 스크립트를 작성하고, 오케스트레이션 도구(예: Cloud Composer=Airflow)를 이용하면 효율적입니다.
- 일정 제어 및 모니터링: Airflow는 DAG으로 배치 작업들의 **의존성 및 일정(예: 매일 01시)**을 관리합니다. sqoop_import → bq_load 순서로 태스크를 구성하고, 전체 작업이 05시 전에 끝날 수 있도록 병렬도와 리소스를 조정합니다. 만약 데이터량 증가로 4시간 내 완료가 어려워지면, Sqoop 병렬 mapper 수 증가, Hadoop 클러스터 노드 증설, 또는 데이터 범위 분할(예: 날짜별로 분산 처리) 등의 성능 튜닝을 적용합니다. BigQuery 로드의 경우 대용량도 분산 로딩되므로 비교적 빠르나, 필요 시 테이블 파티셔닝으로 부분 로드/병렬 로드를 고려합니다.
2. 실시간 증분 처리 아키텍처 (10분 주기)
업무 시간 동안 10분 간격으로 발생하는 신규 및 변경 데이터를 소스에서 대상들로 지속 반영합니다. 이를 위해 Change Data Capture(CDC) 또는 마이크로배치 기법을 사용합니다. 주요 구성 요소:
- CDC/스트리밍 소스: Oracle의 변경분을 캡처하기 위해 오픈소스 Kafka Connect의 JDBC 소스나 Apache NiFi의 Query processor를 활용합니다. 예를 들어 NiFi의 QueryDatabaseTable 프로세서는 마지막 실행 이후 추가된 레코드만 추출할 수 있으며, 10분 간격으로 실행 스케줄을 지정할 수 있습니다. Oracle 테이블의 TIMESTAMP 컬럼이나 증가하는 시퀀스 ID를 기준으로 delta 쿼리를 수행하여 증분 데이터 세트를 가져옵니다. NiFi는 GUI에서 손쉽게 DB 접속 설정과 쿼리 주기를 설정할 수 있어 데이터 추출에 용이합니다. 대안으로 Debezium(Kafka Connect CDC) + Apache Kafka를 사용하면 트랜잭션 로그 기반 CDC 스트림을 구축할 수 있지만, Oracle의 경우 XStream 등 추가 설정이 필요합니다. 간단한 구현을 위해 여기서는 폴링(polling) 방식을 추천합니다.
- 실시간 파이프라인: 추출된 증분 데이터는 동시에 두 갈래로 전송됩니다. (a) Hadoop/Hive 쪽으로 전송하여 Hive 테이블에 MERGE 또는 APPEND, (b) BigQuery로 스트리밍 또는 배치 삽입.
- NiFi 기반 흐름: NiFi 내부에서 분기( 분할 Flow)하여 한쪽은 PutHDFS 또는 PutHiveStreaming 프로세서로 HDFS에 파일을 적재하고 Hive 테이블에 반영합니다. 다른 한쪽은 BigQuery에 REST API 호출(InvokeHTTP로 BigQuery streaming insert)하거나, BigQuery JDBC 드라이버를 통해 PutDatabaseRecord로 삽입하는 방안을 쓸 수 있습니다. NiFi는 다양한 시스템 간 실시간 데이터 통합에 적합하며, 시각적 UI로 흐름을 구성해 데이터 처리를 손쉽게 관리할 수 있습니다.
- Kafka 기반 흐름: 변화 데이터를 Kafka 토픽에 전달하여 비동기 처리합니다. Kafka는 높은 처리량과 내구성을 지닌 실시간 분산 스트리밍 플랫폼으로, 실시간 데이터 파이프라인 구축에 널리 사용됩니다. 예를 들어 Oracle DB에서 증분 데이터를 읽어 oracle.cdc 토픽에 생산(Producer)하고, 두 개의 소비자(Consumer) 애플리케이션을 둡니다. 하나는 Hadoop 영역의 Kafka HDFS 커넥터를 통해 Hive 테이블에 데이터 적재, 또 하나는 Kafka → BigQuery Sink를 통해 BigQuery에 지속 삽입합니다. (Confluent Kafka 커넥터를 사용하면 BigQuery에 실시간 Sink가 가능하며, 오픈소스 구현으로는 Kafka Connect + Google Pub/Sub + Dataflow 조합 등이 있습니다.) Kafka를 도입하면 소스와 대상의 분리(Decoupling) 및 버퍼링으로 일시적 속도 차이를 흡수할 수 있고, 다수 구독자가 실시간 데이터를 소비할 수 있습니다.
- 속도 제어와 확장성: 10분마다 실행되는 증분 적재는 부하에 따라 조절 가능해야 합니다. NiFi의 경우 Run Schedule이나 Max Batch Size 등을 조정하여 한번에 처리하는 레코드 수나 주기를 동적으로 변경할 수 있습니다. Kafka 파이프라인의 경우 토픽 파티션 수와 컨슈머 스레드 풀을 확장하여 처리량을 높이거나, 스로틀(throttle) 설정으로 소비 속도를 제한할 수 있습니다. 또한 실시간 파이프라인은 수평 확장이 가능하도록 NiFi Cluster 모드 활용 또는 Kafka 브로커 클러스터를 구축해 처리량 증가에도 대응합니다.
- 데이터 적재: Hive 측에서는 증분 도착 데이터를 기존 Hive 테이블에 병합(MERGE)하거나 파티션 단위로 Append 후 최신 상태를 조회할 수 있습니다. BigQuery는 스트리밍 API로 바로 삽입하면 수초 내 조회 가능하며, 10분 주기 배치라면 데이터 수신량이 크지 않아 부담이 적습니다. BigQuery로의 실시간 삽입은 GCP 서비스 계정 키 인증을 통해 안전하게 호출하며 (이 부분은 아래 보안 섹션 참고), 혹시 일괄 적재로 전환한다면 10분마다 작은 CSV를 GCS에 올리고 bq load ... append를 자동화하는 방법도 고려합니다.
3. 전체 아키텍처 개요
종합하면, Oracle Exadata (온프레미스)의 데이터를 Hadoop/Hive 데이터 레이크와 Google BigQuery 데이터 웨어하우스 두 곳으로 동시 이관하여 이중의 분석 기반을 구축합니다. 배치 파이프라인은 정규화된 전체 데이터를 매일 동기화하고, 실시간 파이프라인은 변경사항을 짧은 지연으로 수시 반영함으로써 두 대상의 데이터가 최신으로 유지됩니다.
- 오케스트레이션: Airflow는 배치 작업 (Sqoop → Hive/BigQuery 로드)을 스케줄링하고 모니터링합니다. 또한 필요시 Airflow가 NiFi나 Kafka 작업의 모니터링/관리 역할을 담당할 수도 있습니다 (예: Airflow Sensor로 NiFi 흐름 완료 여부 체크).
- 데이터 저장 및 활용: Hive는 주로 데이터 레이크 역할로 원본 원장을 저장하거나 정제/가공 테이블(Parquet/ORC 포맷 등)로 유지하고, BigQuery는 쿼리 성능이 우수한 분석 DB로서 BI 리포팅이나 애드혹 질의를 담당합니다. 두 시스템 모두 동일 원본 데이터를 가지므로 상호 검증이나 백업 역할도 기대할 수 있습니다.
- 요약 그림: (텍스트로 설명) Oracle Exadata에서 → (배치: Sqoop via Airflow) → Hadoop HDFS/Hive; Oracle Exadata에서 → (실시간: NiFi/Kafka pipeline) → Hive 및 BigQuery; 또한 Hive와 BigQuery에 쌓인 데이터로부터 → BI Dashboard 및 AI Modeling 시스템에 연계.
데이터 변환 및 정제 가이드 (Data Transformation & Cleansing)
Oracle의 데이터는 Hive와 BigQuery의 스키마 규칙에 맞게 변환합니다. 컬럼 데이터 타입 매핑 및 데이터 정제의 주요 사항은 다음과 같습니다:
- 데이터 타입 매핑 (Oracle→Hive→BigQuery):
Oracle과 Hive/BigQuery 간에 지원 타입이 상이하므로 변환 룰을 정의합니다. 일반적으로 Hive의 데이터 타입 대부분을 BigQuery 타입으로 매핑 가능하며, 일부 예외(MAP, UNION 등 복합 타입)는 BigQuery에 직접 대응 타입이 없어 구조를 변경해야 합니다. - Oracle 원본 타입별 권장 매핑 예:
-
Oracle 타입 Hive 타입 BigQuery 타입 NUMBER(p,s) (정수 범위) INT 또는 BIGINT (자리수에 따라) INT64 NUMBER(p,s) (소수 포함) DECIMAL(p,s) NUMERIC (또는 BIGNUMERIC, 정밀도에 따라) FLOAT / BINARY_FLOAT FLOAT / DOUBLE FLOAT64 VARCHAR2(size) / CHAR STRING STRING CLOB STRING (또는 Hive TEXTFILE) STRING (최대 1MB) RAW/BLOB (이진 데이터) BINARY BYTES DATE (시간 없는 날짜) DATE DATE TIMESTAMP (날짜와 시간) TIMESTAMP DATETIME 또는 TIMESTAMP TIMESTAMP WITH TIME ZONE 지원 제한 (표준화 필요) TIMESTAMP (UTC로 변환 권장) INTERVAL, YEAR TO MONTH 등 텍스트 변환 (Hive에서 지원 제한) 텍스트 또는 분리 컬럼 - 스키마 자동 변환: Sqoop을 통한 Hive import 시 기본적으로 Oracle 스키마를 분석해 Hive DDL을 생성하지만, 앞서 언급했듯 모든 타입을 완벽 매핑하지는 못할 수 있음. 예를 들어 NUMBER 타입이 자리수에 상관없이 DOUBLE로 생성되거나, DATE가 STRING으로 처리되는 사례가 있습니다. 이를 보완하기 위해:
- 사전 스키마 정의: 중요 테이블은 Oracle의 ALL_TAB_COLUMNS 메타데이터를 참고해 Hive/BigQuery용 DDL을 수동 작성합니다. 자동화를 원하면 스크립트를 작성해 Oracle 타입→Hive/BigQuery 타입 매핑 룰을 적용한 DDL 생성기를 만들 수 있습니다.
- Sqoop 매핑 옵션: Sqoop --map-column-hive 또는 --map-column-java 파라미터를 사용하여 특정 컬럼 타입을 수동 지정 가능합니다. 대량 테이블에는 비효율이므로, 가능하면 DDL 생성 접근이 권장됩니다.
- Hive-BigQuery 연결 도구 활용: Google에서는 Hive와 BigQuery를 연결하는 hive-bigquery-connector 등을 제공하여 Hive 쿼리로 BigQuery에 쓰거나 읽는 기능이 있습니다. 이 경우 Hive 테이블을 통해 BigQuery에 데이터를 쓸 때 자동으로 타입 변환이 이뤄질 수 있으나, 해당 커넥터 적용은 추가 검토가 필요합니다.
- 데이터 정제 (Cleansing):
Oracle에서 추출된 데이터가 목적지 포맷 제약을 위반하지 않도록 클렌징을 수행합니다. 예를 들어, Hive의 TEXTFILE 저장시 컬럼 구분자나 줄바꿈 문자가 데이터에 있으면 파싱 문제가 생길 수 있으므로 Sqoop --hive-drop-import-delims 옵션으로 임베디드 개행 등을 제거합니다 (위 옵션은 임베디드 newline/tab을 공백으로 치환). BigQuery의 JSON 로드나 CSV 로드시에도 비정형 문자는 에러를 유발할 수 있으므로, 미리 특수문자를 이스케이프하거나 제거합니다. 또한 널(NUL) 값이나 Oracle에서의 NULL 표기는 Hive와 BigQuery에서 NULL로 일관되게 다룰 수 있도록 Sqoop 옵션 --null-string '\\N' --null-non-string '\\N' 등을 지정하여 처리했습니다. - 포맷 및 파티셔닝: Hive에 적재된 데이터는 최적 성능을 위해 ORC/Parquet 같은 컬럼형 포맷으로 변환하고, 분할(Partitioning) 전략을 적용합니다. 일반적으로 날짜(예: LOAD_DATE 또는 업무일자) 컬럼으로 Hive 파티션을 생성하여 향후 쿼리 성능을 높이고, 매일 증분 적재도 Partition append로 처리하도록 설계합니다. BigQuery에서도 파트션을 활용하는 것이 중요합니다 – 대용량 테이블은 날짜/시간 기반 파티션을 걸어두면 쿼리 시 스캔 범위를 줄여 비용과 시간을 절감합니다. 또한 BigQuery는 클러스터 키를 지정해 자주 필터링되는 컬럼 단위로 Clustering을 적용하면 좋습니다. 예를 들어, Hive와 BigQuery 모두 DATE 컬럼으로 분할/클러스터링하여 서로 유사한 구조로 관리하면 데이터 품질 검증에도 도움이 됩니다.
- 데이터 무결성 검증: 변환 후 Hive와 BigQuery에 적재된 데이터가 원본과 행/컬럼 일치하는지 검증 프로세스도 권장됩니다. 해시값 비교나 샘플링 조회로 Oracle↔Hive↔BigQuery 간 랜덤 레코드를 대조하는 절차를 주기적으로 수행하여 이관 신뢰성을 확보합니다.
보안 및 인증 (Security & Authentication)
데이터 이관 작업 전반에 보안을 적용하여 민감정보 보호와 안전한 전송을 보장합니다:
- 키 기반 인증 (Key-Based Authentication): 비밀번호 대신 공개키/비밀키 쌍을 이용한 인증을 사용합니다. 예를 들어, Airflow나 Sqoop 작업이 Oracle DB에 접속할 때 DB 인증 정보를 평문으로 두지 않고, Oracle Wallet이나 암호화된 키 저장소를 사용해 자격증명을 관리합니다. Hadoop/Hive 클러스터 접근도 SSH 공개키 인증을 활용하여 접근하고, UN*X 계정에 키를 배포함으로써 패스워드 노출을 방지합니다. BigQuery API 접근은 **GCP 서비스 계정 키 파일(JSON)**을 발급받아 활용합니다. Airflow의 Connection에 해당 키 경로를 설정하거나, NiFi의 Google Cloud Processor에 서비스 계정 키를 등록하여 OAuth2 토큰 획득을 자동화합니다. 이렇게 하면 사람의 ID/Password 없이 시스템 간 안전한 인증이 이뤄집니다.
- 암호화: 데이터 전송 시 TLS/SSL 암호화 채널을 사용합니다. Oracle에서 Sqoop으로 JDBC 통신 시 jdbc:oracle:thin:@//host:port/SID URL에 TCPS(SSL) 프로토콜을 쓸 수 있게 DB 측 설정을 하거나, NiFi에서 JDBC 연결 시 SSL Context Service를 구성합니다. Kafka를 사용하는 경우 브로커와 프로듀서/컨슈머 간 통신에 SSL 암호화를 설정하고, SASL_SSL로 인증까지 겸하는 방식을 고려합니다. 또한 Kafka 토픽에 들어가는 데이터는 필요 시 필드 단위 암호화 또는 마스킹을 적용해 민감정보를 보호합니다.
- 접근제어 및 감사: 각 구성요소별로 최소 권한의 원칙을 적용합니다. Oracle에는 데이터 extraction 전용 읽기 계정만 부여하고, Hive에는 ETL 프로세스 전용 계정으로 HDFS/Hive 읽기쓰기 권한을 줍니다. BigQuery도 서비스 계정의 IAM 권한을 최소화하여 (예: 해당 데이터셋에만 BigQuery Data Editor 권한) 운영합니다. Airflow나 NiFi 등의 관리 콘솔 접근 역시 LDAP 연동이나 인증을 걸어 임의 조작을 막습니다. 모든 데이터 이동 활동(Oracle에서 추출한 쿼리, Hive 적재 내역, BigQuery 로드 로그 등)은 로그로 남겨 추적 가능하게 합니다.
- 키 관리: 사용되는 모든 키(SSH key, GCP service account key 등)는 중앙 관리하고 주기적 rotation 정책을 세웁니다. 예를 들어 서비스 계정 키는 90일마다 재발급하고, 이전 키는 폐기합니다. SSH 키도 정기 교체하거나 사용자별로 구분하여 추적성을 높입니다. 중요한 키는 HashiCorp Vault 같은 비밀관리 시스템이나 클라우드 KMS에 저장하고, 운영자는 직접 키 내용을 볼 수 없게 통제합니다.
요약하면, 안전한 인증과 암호화를 통해 데이터가 이관되는 모든 경로에서 보안을 유지하고, 역할별 최소 권한으로 접근을 제한하여 데이터 보호와 규정 준수를 달성합니다.
도구 및 기술 선정 (Tools & Technologies)
이관 솔루션에는 오픈소스 기반 데이터 파이프라인 도구들을 조합하여 활용합니다. 각 도구의 역할과 적용 방안을 정리하면 다음과 같습니다:
- Apache Sqoop – 배치 데이터 이관의 핵심 도구입니다. Sqoop은 SQL-to-Hadoop의 줄임말로, Oracle 등 RDBMS에서 HDFS/Hive로 대용량 데이터 이송에 특화되어 있습니다. JDBC로 Oracle에 연결하여 MapReduce 잡으로 병렬 덤프를 수행하므로 대량 데이터도 효율적으로 처리합니다. 또한 Hive 연동 기능으로 테이블 생성까지 자동화해 주므로 스키마 이관에 유용합니다. 이번 아키텍처에서는 매일 전체 데이터 배치 적재에 Sqoop을 사용하며, 필요 시 증분 모드(--incremental)로 특정 테이블의 변경분만 누적할 수도 있습니다.
- Apache NiFi – 실시간/마이크로 배치 데이터 통합 도구입니다. NiFi는 웹 기반 UI에서 Processor들을 드래그앤드롭으로 연결하여 데이터 흐름을 디자인할 수 있는 강력한 데이터 통합 플랫폼입니다. 데이터 마이그레이션, 처리, 통합, 실시간 스트리밍 등 폭넓은 use case를 지원하며, 조작이 쉬워 개발 생산성이 높습니다. 이관 시나리오에서 NiFi는 Oracle CDC 추출, HDFS/Hive 적재, BigQuery API 호출 등을 하나의 흐름에서 처리하도록 구성할 수 있습니다. 또한 NiFi 자체에 스케줄러 기능이 있어 10분마다 Flow를 트리거하거나, 특정 시간대(예: 01~05시에는 정지) 운영하는 것도 설정 가능합니다. NiFi의 Back Pressure, Buffering 기능으로 일시적인 대상 지연에도 소스데이터를 안전히 큐잉할 수 있어 신뢰성 있는 실시간 이관을 돕습니다.
- Apache Kafka – 분산 실시간 스트리밍 플랫폼으로, 이번 아키텍처에서는 이벤트 버스 역할을 합니다. Kafka는 높은 처리량과 확장성을 지닌 Publish-Subscribe 메시징 시스템으로, 실시간 데이터 파이프라인과 스트리밍 애플리케이션 구축에 널리 사용됩니다. Oracle 변경 이벤트를 Kafka 토픽에 담아두면 Hive 적재, BigQuery 적재, 기타 실시간 소비(예: 알람 시스템) 등 여러 구독자가 비동기로 데이터를 소비할 수 있어 유연성이 커집니다. 특히 기업 내 다른 시스템에도 이 변경 데이터를 전달해야 한다면 Kafka를 중추로 활용하는 것이 효과적입니다. Kafka Connect 프레임워크를 사용하면 별도 코드 작성 없이 JDBC 소스 커넥터로 Oracle에서 토픽으로, HDFS Sink 커넥터로 Hive로, BigQuery Sink 커넥터로 BigQuery에 연결할 수 있어 재사용성과 표준화에 이점이 있습니다. 단, Kafka 인프라 운영에 대한 부담(브로커 관리 등)을 고려해야 하므로, 데이터 발생량이 많고 낮은 지연이 필수적일 때 Kafka 사용을 검토합니다.
- Apache Airflow – 워크플로우 오케스트레이션 & 스케줄러로서 배치 잡을 관리합니다. Airflow는 파이썬 DAG로 복잡한 ETL 흐름을 정의하고, 스케줄링/모니터링 하는 도구입니다. 본 아키텍처에서 Airflow는 주로 Sqoop 배치 작업과 BigQuery 로드 작업을 순서대로 실행하고, 필요시 NiFi나 Kafka 연결부도 제어합니다. 예를 들어 NiFi로 구축한 10분 주기 파이프라인이 있더라도, Airflow에서 데일리 완료 시그널을 받아 NiFi 파이프라인 일시 중지/재개하는 식의 제어가 가능합니다. (NiFi Rest API로 그룹을 stop/start 가능). 하지만 일반적으로는 배치와 스트리밍을 분리 운용하므로, Airflow는 배치에 집중합니다. Airflow의 강점은 의존 관계 관리와 리트라이/얼럿입니다. 만약 Sqoop로 Oracle→Hive 작업 실패 시 자동 재시도하거나, 문제 발생 시 알림을 보내는 로직을 넣어 운영 편의성을 높입니다. GCP 환경이라면 Cloud Composer로 Airflow를 호스팅해 관리부담을 낮출 수도 있지만, 오픈소스 구축도 무난합니다.
- 데이터베이스 및 스토리지: Oracle Exadata(소스), HDFS/Hive(목적지1), BigQuery(목적지2)가 핵심 저장소입니다. Hive는 Hadoop 클러스터 상에 동작하며, DataNode 용량과 Hive 메타스토어를 설계 용량에 맞게 튜닝합니다 (예: 테이블 파티션 수가 많아지면 메타스토어 DB 성능 고려). BigQuery는 완전관리형이므로 테이블 분할, 클러스터링, 클라우드 리소스 (슬롯) 확보 등을 통해 대용량 쿼리에도 성능을 보장하도록 설정합니다. 또한 GCS는 BigQuery 로드시 중간 버퍼로 사용되며, 필요시 영구 Archive로 Oracle 원본 데이터를 CSV/Avro로 보관해둘 수 있습니다.
- 기타 도구: 필요에 따라 데이터 품질이나 에러 처리에 Apache Spark나 HiveQL 배치 쿼리를 활용할 수 있습니다. 예를 들어 Hive에 적재된 원시 데이터를 가공하여 분석마트에 넣는 작업을 Spark로 수행하고, Airflow DAG에 연결합니다. 또한 초기 스키마 마이그레이션 작업에는 Schemacrawler나 Oracle SQL Developer의 DDL export 기능을 활용해 Oracle의 DDL을 추출하고 수정하여 Hive/BigQuery DDL로 변환할 수도 있습니다. (AWS SCT 등의 툴은 Hive->Redshift 등 용이지만 Hive/BigQuery 간도 일부 도움될 수 있음).
각 도구의 장단점을 고려해 배치에는 Sqoop+Airflow, 실시간에는 NiFi(또는 Kafka)+Airflow 조합으로 구현하고, 둘 다 Hive와 BigQuery를 대상으로 데이터를 넣는 구조를 취했습니다. 이런 멀티도구 파이프라인은 초기 구성은 복잡하지만 각 부분이 역할에 최적화되어 전체 성능과 유연성이 높습니다.
Hive 및 BigQuery 데이터마트 설계 (Data Mart Design Strategy)
이관된 데이터를 효과적으로 분석하기 위해 데이터 마트를 구축합니다. Hive와 BigQuery 모두에 비즈니스 분석에 최적화된 테이블 구조를 설계하여 BI/AI 활용을 용이하게 합니다:
- 원시 존(Raw zone) vs 정제 존(Curated zone): 먼저 Hive와 BigQuery 내에 계층별 스키마를 나눕니다. 스테이징 테이블은 Oracle에서 옮겨온 원본 형태를 유지한 테이블들입니다. Hive에서는 External Table로 원본 CSV/Avro/Parquet 파일을 바로 매핑하거나, Internal 테이블에 로드합니다. BigQuery에서는 스테이징 데이터셋(e.g., staging.oracle_tables)을 만들어 Oracle 테이블들과 동일한 구조의 테이블을 둡니다. 그 다음, 정제/통합 테이블을 별도로 생성합니다. 예를 들어, 필요한 경우 정규화된 여러 테이블을 조인해 **데이터 마트용 폭넓은 테이블(Wide Table)**을 만들거나, 스타 스키마(Star Schema) 모델로 팩트(Fact)와 차원(Dimension) 테이블을 구성합니다. 이 정제존이 BI/AI의 주된 데이터 소스로 활용됩니다. Oracle에 존재하지 않던 파티션 컬럼이나 결합 컬럼도 이 단계에서 추가 가능합니다.
- Hive 테이블 설계 전략: Hive에서는 성능을 위해 Partition과 File Format을 신경써서 설계합니다. 일례로, 날짜별 누적되는 Fact 테이블은 업무일자로 파티션을 걸고 ORC 포맷으로 저장하여 Hive 엔진(Tez, Spark SQL 등)의 쿼리속도를 높입니다. 또한 Hive는 인덱스가 제한적이므로 자주 사용하는 조인 키나 필터 컬럼은 파티셔닝 또는 **클러스터링(Clustering / Bucketing)**으로 대비합니다. 데이터 마트의 Hive 테이블을 생성할 때, Partition 컬럼 이외에도 클러스터 키를 지정하면 동일 파티션 내 데이터도 정렬되어 저장되어 Hive on Tez 조인 성능이 향상됩니다. (예: 고객 행동 로그 테이블을 날짜 파티션하고, 고객ID를 클러스터 키로 버킷팅). 또한, Hive 메타스토어에 등록될 테이블/파티션 수가 방대해질 경우, 메타스토어 튜닝(DB connection pool, query cache 등)과 분할 메타DB까지 고려합니다.
- BigQuery 테이블 설계 전략: BigQuery에서는 스키마 설계 철학이 전통 RDBMS나 Hive와 다소 다릅니다. **반정규화(denormalization)**를 많이 활용하는데, 테이블 조인을 줄이고 한 테이블에 필요한 정보를 많이 담는 것이 일반적입니다. 예를 들어 Hive에서는 여러 차원 테이블과 조인해야 얻을 데이터를 BigQuery에서는 **중첩 및 반복 필드 (Nested & Repeated Fields)**로 한 테이블에 넣는 것을 고려합니다. BigQuery는 Nested field를 통해 배열이나 구조체 형태 데이터를 가질 수 있으므로, Oracle/Hive의 상세-상위 테이블 관계를 BigQuery에서는 하나의 레코드 내 구조로 포함시켜 조인 비용을 줄이는 방식입니다. 또한 파트셔닝/클러스터링은 기본으로 설계합니다. 대부분 테이블은 일자 파티션을 걸고, 큰 테이블은 클러스터링 키 2~3개를 선정합니다. 예를 들어, 웹 로그 테이블이라면 DATE로 파티션, 사용자ID와 이벤트타입으로 클러스터를 주어 특정 사용자 혹은 이벤트별 분석 쿼리를 최적화합니다. 테이블 이름/구성은 Oracle/Hive와 반드시 같을 필요는 없습니다. BigQuery의 강점을 살리도록 스키마 리모델링을 하고 필요시 Data Mart 전용 테이블을 추가로 만드는 것이 좋습니다. (예: Hive에는 정규화된 3장 테이블, BigQuery에는 이를 조인하여 생성한 뷰 혹은 마트 테이블).
- 동일 뷰(View) 제공: Hive와 BigQuery의 데이터 마트 설계가 달라질 수 있으므로, 추상화 계층을 통일하는 방안을 고려합니다. 예를 들어, **뷰(View)**나 공통 SQL 인터페이스를 제공합니다. Hive 메타스토어에 정의한 View와 BigQuery View를 만들어, BI에서 “어제 매출 집계”를 질의하면 내부적으로 두 시스템에서 동일한 로직이 수행되도록 맞출 수 있습니다. 또는 BI툴에서 Hive와 BigQuery를Federation하거나, Presto/Trino 같은 SQL 엔진으로 Hive와 BigQuery를 연동해 하나의 SQL로 질의하는 방법도 있습니다.
- 데이터 동기화: 두 시스템의 데이터 마트가 일치되도록 관리합니다. 배치 적재 후 Hive와 BigQuery에 모두 동일 집계 쿼리를 돌려 결과를 비교하는 등을 자동화하여 차이가 생기면 알림을 보내는 품질 관리도 설계에 포함합니다. 향후 한 쪽만 사용하게 되더라도 현재 이관 단계에서는 이중화로서 서로 검증하며 품질을 담보할 수 있습니다.
결론적으로, Hive는 대용량 원시데이터 보관과 Hadoop 에코시스템 연계, BigQuery는 고속 대화형 분석과 손쉬운 확장에 각각 장점을 가지므로, 각 특성에 맞게 스키마와 저장방식을 최적화하여 데이터 마트를 구축합니다.
BI 및 AI 연계 (Integration with BI & AI)
이관된 데이터는 Business Intelligence(BI) 대시보드와 AI/ML 모델링에 활용할 수 있습니다. 이를 위한 연계 방안을 제시하면 다음과 같습니다:
- BI 도구 연결: Hive와 BigQuery 모두 표준 SQL 인터페이스를 제공하므로 대부분의 BI 도구(Tableau, PowerBI, Looker, Qlik 등)에서 연결 가능합니다.
- Hive 연결: Hive에는 HiveServer2를 통해 ODBC/JDBC로 접속할 수 있습니다. BI툴에서 Hive JDBC 드라이버를 사용해 Hive 메타스토어의 테이블에 쿼리할 수 있습니다. 다만 Hive 쿼리는 배치 성격이 강해 대화형 속도가 느릴 수 있으므로, Presto/Trino 같은 빠른 SQL 엔진을 Hive 데이터에 붙여 사용하기도 합니다. Presto는 Hive의 데이터를 메모리/MPP 형태로 질의하여 대화형 응답 속도를 높여주므로, BI 조회용으로 많이 활용됩니다. 또한 Hadoop에 Apache Kylin이나 Druid를 도입해 큐브나 OLAP 인덱스를 만들면 대시보드 쿼리에 대한 응답속도를 향상시킬 수 있습니다. 이러한 추가 기술은 필요시 고려사항이고, 우선은 Hive 자체로도 대량 데이터 집계에는 문제가 없습니다.
- BigQuery 연결: BigQuery는 표준 SQL을 사용하고 ODBC/JDBC 드라이버뿐만 아니라 네이티브 커넥터를 많은 BI 도구가 제공합니다. 예를 들어 Tableau의 경우 BigQuery용 커넥터를 통해 손쉽게 연결하고, 성능을 위해 쿼리 푸시다운을 수행합니다. BigQuery는 완전관리형으로 높은 동시성과 대량 쿼리 처리에 강하므로, 여러 BI 리포트가 동시 조회해도 탄력적으로 리소스를 확장하여 응답합니다. 다만, BigQuery 비용은 스캔 데이터 양에 비례하므로, BI 쿼리가 매우 큰 테이블을 전체 스캔하지 않도록 필터 조건을 걸거나 요약테이블을 만들어두는 것이 좋습니다.
- AI/ML 활용:
- Hive 데이터 활용 AI: Hive에 저장된 데이터는 Hadoop 에코시스템 내 Spark MLlib, Mahout, TensorFlowOnSpark 등으로 바로 활용할 수 있습니다. 예를 들어 Spark를 이용해 Hive 테이블을 DataFrame으로 불러오고 피처 엔지니어링 후 머신러닝 모델을 학습시킬 수 있습니다. Hive 테이블을 피쳐스토어처럼 사용해 훈련 데이터셋을 구성하고 결과 모델을 저장/배포하는 식입니다. 또한 Hive 위에 돌아가는 H2O.ai나 SAS 등의 분석 플랫폼을 연계하면 Hive 데이터를 대화형으로 탐색하고 모델링할 수 있습니다. 실시간 데이터의 경우 Kafka로 스트리밍 처리하여 Spark Streaming이나 Flink 등으로 실시간 예측도 구현 가능할 것입니다.
- BigQuery 데이터 활용 AI: BigQuery 자체적으로 BigQuery ML이라는 SQL 기반 머신러닝 기능을 제공합니다. 이를 통해 데이터를 BigQuery에 둔 채로 회귀, 분류, 시계열 예측 모델을 바로 만들 수 있습니다. 예를 들어 매일 적재되는 판매 데이터에 대해 BigQuery ML로 수요 예측 모델을 학습시키고 결과를 테이블로 저장해 BI 대시보드에 시각화할 수 있습니다. 또는 BigQuery에 저장된 데이터를 AI Platform (Vertex AI) 노트북이나 AutoML로 불러와 모델링에 사용할 수도 있습니다. BigQuery는 Python 클라이언트와 호환되어 TensorFlow나 PyTorch 코드에서 BigQuery를 데이터 소스로 불러오는 것이 가능하므로, 대규모 데이터 학습에 유용합니다.
- Cross-System AI: Hive와 BigQuery 양쪽에 데이터가 있는 경우, Feature Store 관점에서 두 곳 데이터를 결합하여 쓸 수도 있습니다. 예를 들어 Hive에 장기간의 상세 로그가 있고 BigQuery에 최근 1년치 요약데이터가 있다면, ML모델은 두 부분을 합쳐 학습해야 할 수 있습니다. 이때는 데이터 파이프라인으로 Hive→BigQuery 또는 반대로 데이터를 이동해 일관된 데이터셋을 구성합니다. (이미 양방향 데이터 동기화가 구축된 만큼 활용만 하면 됨).
- 실시간 대시보드/알람: 10분 주기로 적재되는 데이터는 준실시간 대시보드에 활용 가능합니다. Kafka로 유입된 스트림을 이용해 실시간 지표를 계산하는 Spark Streaming 애플리케이션을 두고 그 결과를 메모리 캐시나 빠른 Kudu/Redis 등에 넣어 웹 대시보드에 제공할 수도 있습니다. 하지만 요구사항에서 10분 주기이므로, BigQuery에 10분마다 누적된 데이터를 쿼리하여 가까운 실시간 대시보드를 구현할 수도 있습니다. (예: Data Studio에서 10분 단위 Auto-refresh). 알람의 경우 임계치 조건을 Airflow나 Cloud Functions에서 크론 형태로 BigQuery SQL을 돌려 체크하거나, NiFi에서 분기 처리하여 특정 값 이상시 메시지큐로 알림 전송하는 식으로 구현 가능합니다.
- 데이터 카탈로그 & 거버넌스: BI/AI 활용을 높이기 위해 데이터 카탈로그를 구축하여 Hive와 BigQuery의 데이터 자산을 한눈에 찾고 이해할 수 있게 합니다. 컬럼 설명, 혈통(Lineage), 품질 지표 등을 기록하여 분석가들이 쉽게 접근하도록 합니다.
결과적으로, ETL로 적재된 데이터 → 데이터 마트 구성 → BI 시각화/AI 모델링의 선순환이 이뤄지며, 이를 통해 Oracle Exadata의 데이터가 Hadoop과 BigQuery 환경에서 최대한 활용될 수 있게 됩니다.
ETL 구현 예시 (Sample ETL Code Examples)
마지막으로, 앞서 설명한 ETL 파이프라인의 구현을 돕기 위해 예제 코드와 설정 조각을 제공합니다. Sqoop, NiFi, Kafka, Airflow 각각에 대한 간략한 사용 예를 포함합니다 (실제 환경에 맞게 수정 필요).
1. Sqoop을 이용한 Oracle → Hive 배치 적재 (예제)
다음 Sqoop 커맨드는 Oracle의 특정 테이블을 Hive로 가져오는 예시입니다. Oracle JDBC 드라이버가 Sqoop에 등록되어 있어야 하며, 대상 Hive 데이터베이스와 테이블명이 지정됩니다.
# Oracle 테이블 전체를 Hive에 import하는 Sqoop 명령 예시
sqoop import \
--connect jdbc:oracle:thin:@//oracle-server:1521/ORCL \
--username oracle_user --password-file hdfs://user/etl/oracle.pwd \
--table SALES_DATA \
--target-dir /data/staging/sales_data \ # HDFS 임시 저장 경로
--hive-import --hive-database staging_db \ # Hive로 바로 import
--hive-table sales_data_raw \
--hive-drop-import-delims \ # 임베디드 제어문자 제거
--null-string '\\N' --null-non-string '\\N' \ # NULL 처리
--fields-terminated-by '\001' \ # 필드 구분자 (Ctrl-A)
--num-mappers 8 --split-by SALES_ID \ # 병렬 8태스크, 분할 키
--incremental append --check-column LAST_UPDATE_DATE --last-value "2025-02-26"
위 예시는 판매 데이터(SALES_DATA) 테이블을 Hive staging_db.sales_data_raw 테이블로 가져옵니다. 주요 포인트: --num-mappers 8로 8개의 병렬 맵 태스크로 추출하여 속도를 높였고, --split-by SALES_ID로 고유키 기반 분할을 지정했습니다. --incremental append 모드와 --check-column을 지정하면 증분 모드로 작동하여 LAST_UPDATE_DATE가 주어진 값 이후인 행만 가져올 수 있습니다. 처음엔 풀로드를 하고, 이후부터는 --last-value를 갱신해가며 증분도 가능함을 보여줍니다. (증분 모드는 NiFi/Kafka 없이 Sqoop만으로 특정 테이블은 10분마다 잡아올 때 활용 가능). Hive 관련 옵션으로는 --hive-drop-import-delims로 임베디드 구분 문자를 제거했고, 필드 구분자를 ^A (0x01)로 지정하여 CSV에 콤마 등이 있어도 안전하게 구분됩니다.
실제로 Airflow에서 이 Sqoop 명령을 실행하려면 BashOperator를 사용할 수 있습니다:
# Airflow DAG 내 Sqoop 배치 작업 태스크
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
default_args = {
'owner': 'airflow',
'retries': 2,
'retry_delay': timedelta(minutes=5)
}
dag = DAG('daily_oracle_to_hive',
default_args=default_args,
schedule_interval='0 1 * * *', # 매일 01:00 실행
start_date=datetime(2025, 2, 27))
sqoop_import = BashOperator(
task_id='sqoop_sales_data_to_hive',
bash_command="""sqoop import --connect jdbc:oracle:thin:@//oracle:1521/ORCL --username oracle_user --password-file hdfs://... --table SALES_DATA --hive-import --hive-database staging_db --hive-table sales_data_raw --num-mappers 8 --split-by SALES_ID""",
dag=dag
)
(패스워드는 --password-file 옵션으로 HDFS에 암호화되어 저장된 파일을 참조하는 방식으로 넣었습니다.)
2. Apache NiFi를 이용한 실시간 흐름 예시
NiFi에서는 데이터 흐름을 UI에서 설정합니다. 예를 들어 Oracle -> Hive/HDFS + BigQuery 흐름을 구현하는 간단한 예시는 다음과 같습니다:
- GenerateTableFetch (또는 QueryDatabaseTable): Oracle 소스 DB에서 증분 질의를 수행. 구성 예:
- Database Connection Pool: Oracle JDBC 연결 (Service로 별도 설정)
- Table Name: SALES_DATA
- Maximum Value Columns: LAST_UPDATE_DATE (이 컬럼 기준 최대값 저장하여 증분 추출)
- 스케줄: 10분 주기 실행
- ExecuteSQL: 상기 GenerateTableFetch가 생성한 쿼리(FlowFile)에 따라 Oracle에서 데이터를 뽑아옴. 결과가 Avro 레코드 세트로 흐름에 담깁니다.
- ConvertAvroToJSON (선택): Downstream 시스템에 맞게 Avro -> JSON 변환. (BigQuery Streaming API는 JSON 활용 가능)
- SplitRecord (선택): 대량의 레코드를 한 번에 처리하기보다 적당한 청크로 나눔. 예를 들어 1000건씩 JSON array 분할.
- PutHDFS (or PutFile): Hive의 HDFS 경로에 파일로 저장. 예: /data/hive/landing/sales_data/dt=20250227/part-*.json (분할 저장 및 파티션 디렉토리).
- InvokeHTTP (or PutBigQuery): BigQuery REST API로 JSON 레코드 전송. NiFi에 기본 BigQuery 프로세서는 없으나 InvokeHTTP를 사용하여 BigQuery streaming insert API 호출 가능. 요청 본문에 JSON 배열을 담고, 인증은 서비스 계정 OAuth 토큰 헤더로 포함. (이전 단계에서 GetJWT 혹은 Script 활용). 또는, NiFi ExecuteScript (Python/Groovy)에서 BigQuery 파이썬 클라이언트를 호출하여 삽입할 수도 있습니다.
- RouteOnContent/ReplaceText (선택): 에러 처리나 데이터 변환을 위한 프로세서. 예를 들어 null 값을 "\N" -> null 로 치환하거나, 특정 컬럼값 포맷 변경 등.
- PutHiveQL: Hive 테이블에 데이터 존재 시 MERGE 작업 수행 가능. (예를 들어 임시 테이블에 로드 후 MERGE INTO). NiFi에서 HiveQL 프로세서를 통해 Hive에 SQL을 날려 테이블 갱신을 스크립팅할 수 있습니다.
위와 같이 NiFi로 엔드투엔드 실시간 파이프라인을 구축할 수 있습니다. NiFi의 장점은 UI에서 각 단계의 큐 상태, 처리 속도를 모니터링하며 필요시 조정이 쉽다는 점입니다. 또한 특정 프로세서에서 병렬 처리(병렬 task 수)를 조절함으로써 실시간 파이프라인의 속도 제어 요구사항도 충족합니다.
3. Apache Kafka + Kafka Connect 예시
Kafka를 활용한 접근에서는 코드보다는 설정이 중심이 됩니다. Kafka Connect를 사용하면 소스→싱크의 데이터 이동을 설정 파일로 정의할 수 있습니다. 예를 들어 Kafka Connect JDBC Source로 Oracle을 10분마다 폴링하는 설정 (Debezium 커넥터 사용 시 실시간 CDC도 가능):
// Kafka Connect JDBC Source config (Oracle -> Kafka)
{
"name": "oracle-source-connector",
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "1",
"connection.url": "jdbc:oracle:thin:@oracle-server:1521/ORCL",
"connection.user": "oracle_user",
"connection.password": "oracle_password",
"table.whitelist": "SALES_DATA",
"mode": "timestamp",
"timestamp.column.name": "LAST_UPDATE_DATE",
"poll.interval.ms": "600000", // 10 minutes
"topic.prefix": "oracle."
}
위 설정은 Oracle의 SALES_DATA 테이블을 LAST_UPDATE_DATE 컬럼 기준으로 timestamp 모드 증분조회하여, Kafka 토픽 oracle.SALES_DATA에 계속 적재합니다. 10분 주기로 폴링하도록 설정되었습니다.
Kafka에 데이터가 들어왔으면, Kafka Connect HDFS Sink와 Kafka Connect BigQuery Sink를 설정합니다. (BigQuery Sink는 Confluent 제공 커넥터로 가정):
// Kafka Connect HDFS Sink config (Kafka -> HDFS for Hive)
{
"name": "hdfs-sink-connector",
"connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
"tasks.max": "2",
"topics": "oracle.SALES_DATA",
"hdfs.url": "hdfs://namenode:8020",
"hadoop.conf.dir": "/etc/hadoop/conf",
"flush.size": "1000",
"rotate.interval.ms": "600000",
"topics.dir": "/data/hive/landing",
"locale": "en_US",
"topics.dir": "/data/hive/landing",
"logs.dir": "/connect-hdfs/logs"
}
위 HDFS Sink는 토픽 데이터를 HDFS 경로 /data/hive/landing/oracle.SALES_DATA/year=YYYY/month=MM/day=DD/ 이하에 파일로 저장합니다 (커넥터 기본 설정에 따라 날짜별 디렉토리 분리). flush.size나 rotate.interval.ms에 따라 파일 롤링이 이루어집니다. 이렇게 적재된 파일은 Hive 외부 테이블로 매핑해 두면 최신 데이터까지 Hive에서 조회 가능하게 됩니다.
// Kafka Connect BigQuery Sink config (Kafka -> BigQuery)
{
"name": "bigquery-sink-connector",
"connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
"tasks.max": "2",
"topics": "oracle.SALES_DATA",
"project.id": "my-gcp-project",
"datasets": "oracle_dataset",
"sanitizeTopics": "true",
"autoCreateTables": "true",
"autoUpdateSchemas": "true",
"bufferSize": "1000",
"maxWriteSize": "10000",
"tableWriteWait": "1000"
}
(참고: BigQuerySinkConnector는 오픈소스로 Wepay에서 개발된 커넥터를 예시로 들었습니다.) 이 설정은 Oracle.SALES_DATA 토픽의 메시지를 받아 BigQuery의 oracle_dataset.SALES_DATA 테이블에 지속 삽입합니다. autoCreateTables 및 autoUpdateSchemas 옵션으로 스키마에 따라 BigQuery 테이블을 자동 생성/수정하게 할 수 있습니다. buffer와 batch write 관련 설정도 조절하여 실시간 속도를 제어합니다.
Kafka Connect를 쓰지 않을 경우, 직접 소비자 코드를 작성할 수도 있습니다. 예를 들어 Python으로 Kafka→BigQuery consumer를 작성한다면:
from google.cloud import bigquery
from confluent_kafka import Consumer
# BigQuery client 설정
bq_client = bigquery.Client(project="my-gcp-project")
dataset_id = "oracle_dataset"
table_id = "SALES_DATA"
# Kafka Consumer 설정
consumer = Consumer({
'bootstrap.servers': 'kafka:9092',
'group.id': 'bq-writer-group',
'auto.offset.reset': 'earliest'
})
consumer.subscribe(['oracle.SALES_DATA'])
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
print(f"Consumer error: {msg.error()}")
continue
record = msg.value().decode('utf-8') # JSON string for example
# BigQuery에 기록
errors = bq_client.insert_rows_json(f"{dataset_id}.{table_id}", [json.loads(record)])
if errors:
print(f"Error inserting to BQ: {errors}")
위 코드는 지속적으로 Kafka 토픽을 폴링하여 메시지를 BigQuery에 JSON으로 스트리밍 삽입합니다. 서비스 계정 인증은 애플리케이션 기본 인증 또는 환경변수 GOOGLE_APPLICATION_CREDENTIALS로 JSON 키 경로를 지정하여 이뤄집니다.
4. Apache Airflow를 이용한 워크플로우 관리 예시
Airflow DAG을 통해 전체 ETL 프로세스를 관리할 수 있습니다. 예를 들어 배치 파이프라인 DAG과 실시간 파이프라인 DAG/데몬 두 가지를 운영할 수 있습니다.
- 배치 DAG (oracle_to_hive_bq_batch): 매일 01시 시작, Sqoop으로 Oracle→HDFS/Hive 적재 후, GCS 업로드 + BigQuery 로드.
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import subprocess
default_args = {'owner': 'airflow', 'depends_on_past': False, 'retries': 1, 'retry_delay': timedelta(minutes=10)}
dag = DAG('oracle_to_hive_bq_batch', default_args=default_args, schedule_interval='0 1 * * *', start_date=datetime(2025,2,27))
# 1) Sqoop to Hive
sqoop_to_hive = BashOperator(
task_id='sqoop_to_hive',
bash_command='sqoop import --connect jdbc:oracle:thin:@... --username ... --password ... --table ALL_TABLES --hive-import --hive-database staging_db',
dag=dag
)
# 2) Export HDFS to local (for GCS upload)
export_hdfs = BashOperator(
task_id='export_hdfs_data',
bash_command='hdfs dfs -getmerge /data/staging /tmp/exadata_dump.csv',
dag=dag
)
# 3) Upload to GCS
upload_gcs = BashOperator(
task_id='upload_to_gcs',
bash_command='gsutil cp /tmp/exadata_dump.csv gs://my-bucket/exadata/full_dump.csv',
dag=dag
)
# 4) BigQuery Load
load_bigquery = BashOperator(
task_id='load_bigquery',
bash_command='bq load --source_format=CSV mydataset.full_table gs://my-bucket/exadata/full_dump.csv schema.json',
dag=dag
)
sqoop_to_hive >> export_hdfs >> upload_gcs >> load_bigquery
위 DAG는 간략화된 예시이며, 실제로는 테이블별로 Sqoop 작업을 병렬로 수행하거나, Hive에서 변환 작업을 PythonOperator로 수행하는 등 세분화될 수 있습니다. 또한 중간 파일보다는 Avro+Direct BigQuery 로드로 개선 가능합니다.
- 실시간 DAG (start_stop_nifi_streaming): NiFi 자체에 스케줄이 있으므로 Airflow에서 항상 개입할 필요는 없지만, 업무 시간에만 NiFi 흐름을 가동하려면 Airflow로 제어 가능합니다. 예를 들어 평일 09:00에 NiFi 실행, 18:00에 NiFi 정지하는 DAG:
from airflow.operators.http_operator import SimpleHttpOperator
start_nifi = SimpleHttpOperator(
task_id='start_nifi_flow',
method='PUT',
http_conn_id='nifi_conn', # NiFi API URL pre-configured
endpoint='/nifi-api/flow/process-groups/<PG_ID>', # PG_ID NiFi에서 확인
data=json.dumps({"id": "<PG_ID>", "state": "RUNNING"}),
headers={"Content-Type": "application/json"},
dag=dag
)
stop_nifi = SimpleHttpOperator(
task_id='stop_nifi_flow',
method='PUT',
http_conn_id='nifi_conn',
endpoint='/nifi-api/flow/process-groups/<PG_ID>',
data=json.dumps({"id": "<PG_ID>", "state": "STOPPED"}),
headers={"Content-Type": "application/json"},
dag=dag
)
이처럼 Airflow에서 NiFi REST API를 호출하여 데이터 흐름의 ON/OFF를 제어할 수 있습니다. Kafka의 경우는 항상 데몬으로 돌기 때문에 일반적으로 Airflow 개입이 필요없지만, 모니터링 DAG을 만들어 Kafka 컨슈머 라그(latency)를 체크하고 일정 threshold 이상이면 알람을 주거나, Kubernetes 기반이라면 컨슈머 파드를 스케일아웃하는 KubernetesPodOperator를 실행하는 방법도 생각할 수 있습니다.
- 에러 처리 및 알림: Airflow DAG 내 각 Task에 대해 실패 시 Slack이나 이메일 알림을 걸어둘 수 있습니다. 예: on_failure_callback 설정 사용. 또한 태스크 간 의존성에 BranchOperator를 써서, 특정 조건 (예: 전일자 데이터 없음) 시 흐름을 스킵하거나 별도 경로로 분岐하는 유연한 운영 로직도 코드로 작성 가능합니다.
전체 ETL 과정은 다양한 오픈소스 도구의 조합으로 구현되며, 각 구성 요소의 역할과 연계를 적절히 설정함으로써 안정적이고 확장 가능한 Oracle→Hive→BigQuery 데이터 파이프라인을 구축할 수 있습니다. 이 가이드의 아키텍처와 개발 원칙을 토대로 실제 환경에 최적화된 구현을 진행하시기 바랍니다.
본 게시글은 ChatGPT를 활용하여 작성되었습니다.