Microservice Decomposition Strategy
Domain-Based Service Separation
Current Monolithic Structure
Apps Script (single script)
├── nkey() - Keyword Tool API
├── ntrend() - DataLab trend API
├── nAgeTrend() - DataLab age-group trend API
├── Authentication logic (HMAC)
├── Data storage (Sheets)
└── Error handling
Target Microservice Structure
API Gateway
├── Authentication Service (authentication/authorization)
├── Data Collection Service (external API calls)
├── Data Processing Service (data transformation/analysis)
├── Cache Service (Redis/Memcached)
├── Notification Service (notifications)
└── Web API Service (exposes the REST API)
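Of the services above, only the Web API Service has no dedicated section below. A minimal sketch of the surface it might expose, assuming FastAPI and the collector/analyzer interfaces defined in the sections that follow; the route and wiring are hypothetical, not part of the design itself.

# web-api-service/main.py (hypothetical sketch, assuming FastAPI)
from dataclasses import asdict
from fastapi import FastAPI, HTTPException

app = FastAPI(title="Keyword Analytics API")

@app.get("/api/v1/keywords/{keyword}")
async def get_keyword(keyword: str):
    # collector / analyzer are assumed to be injected clients for the
    # Data Collection and Data Processing services described below
    raw = await collector.collect_keyword_data(keyword)
    if raw is None:
        raise HTTPException(status_code=502, detail="collection failed")
    return analyzer.analyze_keyword(asdict(raw), trend_data=[])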
Detailed Design per Service
1. Authentication Service
Responsibility: Naver API authentication, user authentication
# auth-service/main.py
import jwt
import hmac
import hashlib
import base64
from datetime import datetime, timedelta


class NaverAuthService:
    def __init__(self, access_key, secret_key, customer_id):
        self.access_key = access_key
        self.secret_key = secret_key
        self.customer_id = customer_id

    def generate_signature(self, method, api_url, timestamp):
        """Generate the HMAC signature for the Naver API."""
        signature_string = f"{timestamp}.{method}.{api_url}"
        signature = base64.b64encode(
            hmac.new(
                self.secret_key.encode('utf-8'),
                signature_string.encode('utf-8'),
                hashlib.sha256
            ).digest()
        ).decode('utf-8')
        return signature

    def get_auth_headers(self, method, api_url):
        """Build the authentication headers."""
        timestamp = str(int(datetime.now().timestamp() * 1000))
        signature = self.generate_signature(method, api_url, timestamp)
        return {
            'X-Timestamp': timestamp,
            'X-API-KEY': self.access_key,
            'X-Customer': self.customer_id,
            'X-Signature': signature
        }


class UserAuthService:
    def __init__(self, jwt_secret):
        self.jwt_secret = jwt_secret

    def generate_token(self, user_id, expires_hours=24):
        """Issue a JWT."""
        payload = {
            'user_id': user_id,
            'exp': datetime.utcnow() + timedelta(hours=expires_hours)
        }
        return jwt.encode(payload, self.jwt_secret, algorithm='HS256')

    def validate_token(self, token):
        """Validate a token and return its user_id."""
        try:
            payload = jwt.decode(token, self.jwt_secret, algorithms=['HS256'])
            return payload['user_id']
        except jwt.ExpiredSignatureError:
            raise Exception('Token expired')
        except jwt.InvalidTokenError:
            raise Exception('Invalid token')
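A minimal usage sketch of the two classes above. The credential values are placeholders; how secrets are actually injected is covered in the deployment section later.

# Hypothetical wiring example (placeholder credentials)
naver_auth = NaverAuthService(
    access_key="YOUR_ACCESS_KEY",
    secret_key="YOUR_SECRET_KEY",
    customer_id="1234567",
)
headers = naver_auth.get_auth_headers("GET", "/keywordstool")

user_auth = UserAuthService(jwt_secret="change-me")
token = user_auth.generate_token(user_id="user-1")
assert user_auth.validate_token(token) == "user-1"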
2. Data Collection Service
Responsibility: external API calls, raw data collection
# data-collection-service/collectors/naver_keyword.py
import asyncio
import logging
import aiohttp
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional

logger = logging.getLogger(__name__)


@dataclass
class KeywordData:
    keyword: str
    pc_search_count: int
    mobile_search_count: int
    pc_click_count: float
    mobile_click_count: float
    pc_ctr: float
    mobile_ctr: float
    competition: str
    ad_count: int


class NaverKeywordCollector:
    def __init__(self, auth_service):
        self.auth_service = auth_service

    async def collect_keyword_data(self, keyword: str) -> Optional[KeywordData]:
        """Collect keyword metrics for a single keyword."""
        try:
            headers = self.auth_service.get_auth_headers("GET", "/keywordstool")
            url = f"https://api.naver.com/keywordstool?hintKeywords={keyword}&showDetail=1"
            async with aiohttp.ClientSession() as session:
                async with session.get(url, headers=headers) as response:
                    if response.status == 200:
                        data = await response.json()
                        return self._parse_keyword_data(keyword, data)
                    else:
                        raise Exception(f"API Error: {response.status}")
        except Exception as e:
            # Log the error; retry logic hooks in here
            logger.error(f"Failed to collect data for {keyword}: {str(e)}")
            return None

    def _parse_keyword_data(self, keyword: str, raw_data: dict) -> KeywordData:
        """Parse the raw API response."""
        if not raw_data.get('keywordList'):
            raise Exception("No keyword data found")
        kw_data = raw_data['keywordList'][0]
        return KeywordData(
            keyword=keyword,
            pc_search_count=kw_data.get('monthlyPcQcCnt', 0),
            mobile_search_count=kw_data.get('monthlyMobileQcCnt', 0),
            pc_click_count=kw_data.get('monthlyAvePcClkCnt', 0),
            mobile_click_count=kw_data.get('monthlyAveMobileClkCnt', 0),
            pc_ctr=kw_data.get('monthlyAvePcCtr', 0),
            mobile_ctr=kw_data.get('monthlyAveMobileCtr', 0),
            competition=kw_data.get('compIdx', '낮음'),  # compIdx is returned in Korean (낮음/중간/높음)
            ad_count=kw_data.get('plAvgDepth', 0)
        )


# Batch collector
class BatchCollector:
    def __init__(self, keyword_collector, datalab_collector):
        self.keyword_collector = keyword_collector
        self.datalab_collector = datalab_collector

    async def collect_batch(self, keywords: List[str]) -> List[dict]:
        """Collect multiple keywords concurrently."""
        tasks = []
        for keyword in keywords:
            # Fetch keyword metrics and trend data together for each keyword
            tasks.append(self._collect_all_data(keyword))
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return [r for r in results if not isinstance(r, Exception)]

    async def _collect_all_data(self, keyword: str) -> dict:
        """Collect all data for a single keyword."""
        keyword_task = self.keyword_collector.collect_keyword_data(keyword)
        trend_task = self.datalab_collector.collect_trend_data(keyword)
        keyword_data, trend_data = await asyncio.gather(
            keyword_task, trend_task, return_exceptions=True
        )
        return {
            'keyword': keyword,
            'keyword_data': keyword_data if not isinstance(keyword_data, Exception) else None,
            'trend_data': trend_data if not isinstance(trend_data, Exception) else None,
            'collected_at': datetime.utcnow().isoformat()
        }
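A sketch of how the batch collector might be driven end to end. DataLabCollector is assumed here and not defined in this document; it only needs a collect_trend_data(keyword) coroutine mirroring the keyword collector.

# Hypothetical driver (DataLabCollector is an assumed interface)
async def run_batch(auth_service, keywords: List[str]) -> List[dict]:
    keyword_collector = NaverKeywordCollector(auth_service)
    datalab_collector = DataLabCollector(auth_service)  # assumed: implements collect_trend_data()
    batch = BatchCollector(keyword_collector, datalab_collector)
    return await batch.collect_batch(keywords)

# results = asyncio.run(run_batch(naver_auth, ["노트북", "무선 이어폰"]))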
3. Data Processing Service
Responsibility: data transformation, analysis, and aggregation
# data-processing-service/processors/analytics.py
from dataclasses import dataclass
from typing import List, Dict
import numpy as np
from datetime import datetime, timedelta


@dataclass
class ProcessedKeywordData:
    keyword: str
    search_volume_trend: str   # 'rising', 'falling', 'stable'
    competition_level: str     # 'high', 'medium', 'low'
    opportunity_score: float   # 0-100
    seasonal_pattern: Dict[str, float]
    recommendations: List[str]


class KeywordAnalyzer:
    def __init__(self):
        self.trend_threshold = 0.1  # a 10% change counts as a trend shift

    def analyze_keyword(self, keyword_data: dict, trend_data: List[dict]) -> ProcessedKeywordData:
        """Run the full analysis for one keyword."""
        # Trend direction
        trend_direction = self._analyze_trend(trend_data)
        # Normalize the competition level
        competition = self._normalize_competition(keyword_data.get('competition'))
        # Opportunity score
        opportunity = self._calculate_opportunity_score(keyword_data, trend_data)
        # Seasonality pattern
        seasonal = self._analyze_seasonality(trend_data)
        # Recommendations
        recommendations = self._generate_recommendations(
            keyword_data, trend_direction, competition, opportunity
        )
        return ProcessedKeywordData(
            keyword=keyword_data['keyword'],
            search_volume_trend=trend_direction,
            competition_level=competition,
            opportunity_score=opportunity,
            seasonal_pattern=seasonal,
            recommendations=recommendations
        )

    def _normalize_competition(self, competition: str) -> str:
        """Map the Naver compIdx values (낮음/중간/높음) to 'low'/'medium'/'high'."""
        return {'낮음': 'low', '중간': 'medium', '높음': 'high'}.get(competition, 'medium')

    def _analyze_trend(self, trend_data: List[dict]) -> str:
        """Determine the trend direction."""
        if len(trend_data) < 6:
            return 'insufficient_data'
        # Compare the most recent 3 months with the 3 months before them
        recent_avg = np.mean([d['ratio'] for d in trend_data[-3:]])
        previous_avg = np.mean([d['ratio'] for d in trend_data[-6:-3]])
        if previous_avg == 0:
            return 'insufficient_data'
        change_rate = (recent_avg - previous_avg) / previous_avg
        if change_rate > self.trend_threshold:
            return 'rising'
        elif change_rate < -self.trend_threshold:
            return 'falling'
        else:
            return 'stable'

    def _calculate_opportunity_score(self, keyword_data: dict, trend_data: List[dict]) -> float:
        """Compute the opportunity score (0-100)."""
        # Search-volume score (higher is better)
        total_volume = keyword_data.get('pc_search_count', 0) + keyword_data.get('mobile_search_count', 0)
        volume_score = min(total_volume / 10000 * 40, 40)  # up to 40 points
        # Competition score (lower competition is better); raw compIdx values are Korean
        competition = keyword_data.get('competition', '높음')
        competition_score = {'낮음': 30, '중간': 20, '높음': 10}.get(competition, 10)
        # Trend score
        if len(trend_data) >= 3:
            trend_direction = self._analyze_trend(trend_data)
            trend_score = {'rising': 30, 'stable': 20, 'falling': 10}.get(trend_direction, 15)
        else:
            trend_score = 15
        return volume_score + competition_score + trend_score

    def _analyze_seasonality(self, trend_data: List[dict]) -> Dict[str, float]:
        """Analyze the monthly seasonality pattern."""
        monthly_patterns = {}
        for data in trend_data:
            # period format: "2024-01-01"
            month = data['period'].split('-')[1]
            if month not in monthly_patterns:
                monthly_patterns[month] = []
            monthly_patterns[month].append(data['ratio'])
        # Average per month
        monthly_avg = {}
        for month, ratios in monthly_patterns.items():
            monthly_avg[month] = np.mean(ratios)
        return monthly_avg

    def _generate_recommendations(self, keyword_data, trend, competition, opportunity) -> List[str]:
        """Generate recommendations."""
        recommendations = []
        if opportunity > 70:
            recommendations.append("High opportunity score - aggressive marketing recommended")
        if trend == 'rising':
            recommendations.append("Rising trend - enter quickly")
        elif trend == 'falling':
            recommendations.append("Falling trend - approach with caution")
        if competition == 'low':
            recommendations.append("Low competition - low barrier to entry")
        elif competition == 'high':
            recommendations.append("High competition - differentiation strategy needed")
        # CTR-based recommendations
        mobile_ctr = keyword_data.get('mobile_ctr', 0)
        pc_ctr = keyword_data.get('pc_ctr', 0)
        if mobile_ctr > pc_ctr * 2:
            recommendations.append("Mobile-optimized ads recommended")
        elif pc_ctr > mobile_ctr * 2:
            recommendations.append("PC-targeted ads recommended")
        return recommendations
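A sketch of how the processing service might consume a batch produced by the collection service. dataclasses.asdict is assumed as the bridge from KeywordData to the plain dict the analyzer expects.

# Hypothetical glue between the collection and processing services
from dataclasses import asdict

analyzer = KeywordAnalyzer()

def process_batch(batch_results: List[dict]) -> List[ProcessedKeywordData]:
    processed = []
    for item in batch_results:
        if item['keyword_data'] is None:
            continue  # collection failed for this keyword
        keyword_dict = asdict(item['keyword_data'])
        trend_data = item['trend_data'] or []
        processed.append(analyzer.analyze_keyword(keyword_dict, trend_data))
    return processed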
4. Cache Service
Responsibility: data caching and performance optimization
# cache-service/cache_manager.py
import redis.asyncio as redis  # async Redis client (redis-py >= 4.2)
import json
import hashlib
import functools
from datetime import timedelta
from typing import Optional, Any


class CacheManager:
    def __init__(self, redis_client=None):
        # Fall back to an in-memory cache for local development
        self.redis_client = redis_client
        self.memory_cache = {} if redis_client is None else None

    def _generate_key(self, prefix: str, params: dict) -> str:
        """Build a cache key."""
        param_str = json.dumps(params, sort_keys=True)
        param_hash = hashlib.md5(param_str.encode()).hexdigest()[:8]
        return f"{prefix}:{param_hash}"

    async def get(self, prefix: str, params: dict) -> Optional[Any]:
        """Read from the cache."""
        key = self._generate_key(prefix, params)
        if self.redis_client:
            cached = await self.redis_client.get(key)
            return json.loads(cached) if cached else None
        else:
            # In-memory cache (development)
            return self.memory_cache.get(key)

    async def set(self, prefix: str, params: dict, data: Any, expire_seconds: int = 3600):
        """Write to the cache."""
        key = self._generate_key(prefix, params)
        if self.redis_client:
            await self.redis_client.setex(key, expire_seconds, json.dumps(data))
        else:
            # In-memory cache
            self.memory_cache[key] = data

    async def invalidate_pattern(self, pattern: str):
        """Invalidate cache entries matching a pattern."""
        if self.redis_client:
            # KEYS is O(N); SCAN is preferable for large keyspaces
            keys = await self.redis_client.keys(pattern)
            if keys:
                await self.redis_client.delete(*keys)
        else:
            # Delete matching keys from the in-memory cache
            to_delete = [k for k in self.memory_cache.keys() if pattern in k]
            for k in to_delete:
                del self.memory_cache[k]


# Cache decorator
def cached(prefix: str, expire_seconds: int = 3600):
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            cache_manager = kwargs.get('cache_manager') or args[0].cache_manager
            # Build the cache-key parameters
            cache_params = {
                'func_name': func.__name__,
                'args': str(args[1:]),  # exclude self
                'kwargs': str(sorted(kwargs.items()))
            }
            # Check the cache first
            cached_result = await cache_manager.get(prefix, cache_params)
            if cached_result is not None:
                return cached_result
            # Cache miss: call the wrapped function
            result = await func(*args, **kwargs)
            # Cache the result
            await cache_manager.set(prefix, cache_params, result, expire_seconds)
            return result
        return wrapper
    return decorator
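A sketch of how the decorator could wrap a collector call inside the Data Collection Service. The class shape is assumed: the wrapper looks up cache_manager on the instance, so the instance must expose that attribute.

# Hypothetical use of @cached on a collector method
from dataclasses import asdict

class CachedKeywordCollector:
    def __init__(self, collector, cache_manager: CacheManager):
        self.collector = collector
        self.cache_manager = cache_manager  # read by the wrapper via args[0].cache_manager

    @cached(prefix="keyword", expire_seconds=1800)
    async def get_keyword_data(self, keyword: str):
        data = await self.collector.collect_keyword_data(keyword)
        return asdict(data) if data else None  # cache only JSON-serializable values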
5. Notification Service
Responsibility: notifications and event handling
# notification-service/notifiers.py
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List, Dict
import asyncio


@dataclass
class NotificationEvent:
    event_type: str
    user_id: str
    data: Dict
    priority: str = 'normal'  # 'high', 'normal', 'low'


class NotificationChannel(ABC):
    @abstractmethod
    async def send(self, event: NotificationEvent) -> bool:
        pass


class EmailNotifier(NotificationChannel):
    def __init__(self, smtp_config):
        self.smtp_config = smtp_config

    async def send(self, event: NotificationEvent) -> bool:
        """Send an email."""
        # In a real implementation, use SendGrid, AWS SES, etc.
        print(f"Email sent: {event.event_type} to {event.user_id}")
        return True


class WebhookNotifier(NotificationChannel):
    def __init__(self, webhook_url):
        self.webhook_url = webhook_url

    async def send(self, event: NotificationEvent) -> bool:
        """Send a webhook."""
        # Notify Slack, Discord, etc.
        print(f"Webhook sent: {event.event_type}")
        return True


class NotificationService:
    def __init__(self):
        self.channels: List[NotificationChannel] = []
        self.event_queue = asyncio.Queue()

    def add_channel(self, channel: NotificationChannel):
        self.channels.append(channel)

    async def notify(self, event: NotificationEvent):
        """Enqueue a notification."""
        await self.event_queue.put(event)

    async def process_notifications(self):
        """Process the notification queue."""
        while True:
            try:
                event = await self.event_queue.get()
                # Handle by priority
                if event.priority == 'high':
                    await self._send_immediately(event)
                else:
                    await self._send_normal(event)
                self.event_queue.task_done()
            except Exception as e:
                print(f"Notification error: {e}")

    async def _send_immediately(self, event: NotificationEvent):
        """Send right away."""
        tasks = [channel.send(event) for channel in self.channels]
        await asyncio.gather(*tasks, return_exceptions=True)

    async def _send_normal(self, event: NotificationEvent):
        """Normal send (batched)."""
        await asyncio.sleep(1)  # batching interval
        await self._send_immediately(event)
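A sketch of how the service could be wired and exercised; the SMTP settings and webhook URL are placeholders.

# Hypothetical wiring (placeholder configuration values)
async def run_notification_service():
    service = NotificationService()
    service.add_channel(EmailNotifier(smtp_config={"host": "smtp.example.com"}))
    service.add_channel(WebhookNotifier(webhook_url="https://hooks.example.com/keyword-alerts"))

    worker = asyncio.create_task(service.process_notifications())
    await service.notify(NotificationEvent(
        event_type="analysis_complete",
        user_id="user-1",
        data={"keyword": "노트북"},
        priority="high",
    ))
    await service.event_queue.join()  # wait until the queue is drained
    worker.cancel()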
Inter-Service Communication Design
1. API Gateway Pattern
# api-gateway-config.yaml
routes:
  - path: /api/v1/keywords/*
    service: data-collection-service
    timeout: 30s
    retry: 3
  - path: /api/v1/analytics/*
    service: data-processing-service
    timeout: 10s
  - path: /api/v1/auth/*
    service: authentication-service
    timeout: 5s
middleware:
  - rate_limiting:
      requests_per_minute: 100
  - authentication:
      required: true
      exclude_paths: ["/api/v1/auth/login"]
  - logging:
      enabled: true
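The authentication middleware above is only declared in configuration. A minimal, framework-agnostic sketch of what it might do at the gateway, assuming the gateway shares the JWT secret with the Authentication Service; the function name and signature are hypothetical.

# Hypothetical gateway-side check for the `authentication` middleware
import jwt

EXCLUDE_PATHS = {"/api/v1/auth/login"}

def authenticate_request(path: str, headers: dict, jwt_secret: str):
    """Return the user_id for a valid token, or None to reject the request."""
    if path in EXCLUDE_PATHS:
        return "anonymous"
    auth_header = headers.get("Authorization", "")
    if not auth_header.startswith("Bearer "):
        return None
    token = auth_header[len("Bearer "):]
    try:
        payload = jwt.decode(token, jwt_secret, algorithms=["HS256"])
        return payload["user_id"]
    except jwt.InvalidTokenError:  # also covers ExpiredSignatureError
        return None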
2. Event-Driven Communication
# events/event_bus.py
from dataclasses import dataclass
from typing import Any, Callable, List
import asyncio


@dataclass
class Event:
    event_type: str
    source_service: str
    data: Any
    timestamp: float


class EventBus:
    def __init__(self):
        self.subscribers = {}

    def subscribe(self, event_type: str, handler: Callable):
        """Subscribe a handler to an event type."""
        if event_type not in self.subscribers:
            self.subscribers[event_type] = []
        self.subscribers[event_type].append(handler)

    async def publish(self, event: Event):
        """Publish an event to all subscribed handlers."""
        if event.event_type in self.subscribers:
            handlers = self.subscribers[event.event_type]
            tasks = [handler(event) for handler in handlers]
            await asyncio.gather(*tasks, return_exceptions=True)


# Usage example (data_processing_service and notification_service are
# assumed to be module-level instances of the services defined earlier)
event_bus = EventBus()

# Subscribe to the data-collection-complete event
async def on_data_collected(event):
    """Hand collected data to the processing service."""
    await data_processing_service.process(event.data)

async def on_analysis_complete(event):
    """Notify the user when analysis completes."""
    await notification_service.notify(NotificationEvent(
        event_type='analysis_complete',
        user_id=event.data['user_id'],
        data=event.data
    ))

event_bus.subscribe('data_collected', on_data_collected)
event_bus.subscribe('analysis_complete', on_analysis_complete)
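The subscriptions above only show the consuming side. A sketch of the producing side, assuming the Data Collection Service holds a reference to the shared event_bus:

# Hypothetical producer side of the event bus
import time

async def publish_collection_result(batch_results: List[dict]):
    await event_bus.publish(Event(
        event_type='data_collected',
        source_service='data-collection-service',
        data=batch_results,
        timestamp=time.time(),
    ))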
Deployment and Operations
Docker Compose Local Development Environment
# docker-compose.yml
version: '3.8'
services:
  auth-service:
    build: ./auth-service
    ports:
      - "3001:3000"
    environment:
      - JWT_SECRET=${JWT_SECRET}
      - NAVER_ACCESS_KEY=${NAVER_ACCESS_KEY}
  data-collection-service:
    build: ./data-collection-service
    ports:
      - "3002:3000"
    depends_on:
      - auth-service
  data-processing-service:
    build: ./data-processing-service
    ports:
      - "3003:3000"
  cache-service:
    image: redis:7-alpine
    ports:
      - "6379:6379"
  api-gateway:
    image: nginx:alpine
    ports:
      - "8080:80"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
    depends_on:
      - auth-service
      - data-collection-service
      - data-processing-service
Kubernetes Deployment Configuration
# k8s/data-collection-service.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: data-collection-service
spec:
  replicas: 2
  selector:
    matchLabels:
      app: data-collection-service
  template:
    metadata:
      labels:
        app: data-collection-service
    spec:
      containers:
        - name: app
          image: gcr.io/project/data-collection-service:latest
          ports:
            - containerPort: 3000
          env:
            - name: AUTH_SERVICE_URL
              value: "http://auth-service:3000"
          resources:
            requests:
              memory: "128Mi"
              cpu: "100m"
            limits:
              memory: "256Mi"
              cpu: "200m"
---
apiVersion: v1
kind: Service
metadata:
  name: data-collection-service
spec:
  selector:
    app: data-collection-service
  ports:
    - port: 3000
      targetPort: 3000

Updated: 2025-11-08
Status: design complete, implementation pending
Date written: 2025-11-08