Microservice Decomposition Strategy

Domain-Based Service Separation

Current Monolithic Structure

Apps Script (single script)
├── nkey() - Keyword Tool API
├── ntrend() - DataLab trend API
├── nAgeTrend() - DataLab age-group API
├── Authentication logic (HMAC)
├── Data storage (Sheets)
└── Error handling

Target Microservice Structure

API Gateway
├── Authentication Service (authentication/authorization)
├── Data Collection Service (external API calls)
├── Data Processing Service (data transformation/analysis)
├── Cache Service (Redis/Memcached)
├── Notification Service (notifications)
└── Web API Service (serves the REST API)

Detailed Design by Service

1. Authentication Service

Responsibilities: Naver API authentication, user authentication

# auth-service/main.py
import base64
import hashlib
import hmac
from datetime import datetime, timedelta, timezone

import jwt  # PyJWT


class NaverAuthService:
    def __init__(self, access_key, secret_key, customer_id):
        self.access_key = access_key
        self.secret_key = secret_key
        self.customer_id = customer_id

    def generate_signature(self, method, api_url, timestamp):
        """Generate the HMAC-SHA256 signature for a Naver API request."""
        # The signed message is "<timestamp>.<method>.<path>"
        signature_string = f"{timestamp}.{method}.{api_url}"
        signature = base64.b64encode(
            hmac.new(
                self.secret_key.encode('utf-8'),
                signature_string.encode('utf-8'),
                hashlib.sha256
            ).digest()
        ).decode('utf-8')
        return signature

    def get_auth_headers(self, method, api_url):
        """Build the authentication headers."""
        # Timestamp in milliseconds since the Unix epoch
        timestamp = str(int(datetime.now().timestamp() * 1000))
        signature = self.generate_signature(method, api_url, timestamp)

        return {
            'X-Timestamp': timestamp,
            'X-API-KEY': self.access_key,
            'X-Customer': str(self.customer_id),  # header values must be strings
            'X-Signature': signature
        }


class UserAuthService:
    def __init__(self, jwt_secret):
        self.jwt_secret = jwt_secret

    def generate_token(self, user_id, expires_hours=24):
        """Generate a JWT."""
        payload = {
            'user_id': user_id,
            # Timezone-aware UTC time; datetime.utcnow() is deprecated
            'exp': datetime.now(timezone.utc) + timedelta(hours=expires_hours)
        }
        return jwt.encode(payload, self.jwt_secret, algorithm='HS256')

    def validate_token(self, token):
        """Validate a token and return its user_id."""
        try:
            payload = jwt.decode(token, self.jwt_secret, algorithms=['HS256'])
            return payload['user_id']
        except jwt.ExpiredSignatureError as exc:
            raise ValueError('Token expired') from exc
        except jwt.InvalidTokenError as exc:
            raise ValueError('Invalid token') from exc
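
As a quick check of the signing scheme above, the following sketch reproduces the "timestamp.method.api_url" HMAC-SHA256 signature with only the standard library and adds a constant-time comparison. The names sign_request and verify_signature are hypothetical helpers for illustration, and the key and timestamp values in the usage note are made-up placeholders.

```python
import base64
import hashlib
import hmac


def sign_request(secret_key: str, timestamp: str, method: str, api_url: str) -> str:
    """Return the Base64-encoded HMAC-SHA256 of "timestamp.method.api_url"."""
    message = f"{timestamp}.{method}.{api_url}"
    digest = hmac.new(
        secret_key.encode("utf-8"),
        message.encode("utf-8"),
        hashlib.sha256,
    ).digest()
    return base64.b64encode(digest).decode("utf-8")


def verify_signature(secret_key: str, timestamp: str, method: str,
                     api_url: str, signature: str) -> bool:
    """Recompute the signature and compare it in constant time."""
    expected = sign_request(secret_key, timestamp, method, api_url)
    return hmac.compare_digest(expected, signature)
```

For example, sign_request("placeholder-secret", "1700000000000", "GET", "/keywordstool") returns a 44-character Base64 string, and changing any of the three signed fields produces a different signature.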

2. Data Collection Service

Responsibilities: external API calls, raw data collection

# data-collection-service/collectors/naver_keyword.py
import asyncio
import logging
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import List, Optional

import aiohttp

logger = logging.getLogger(__name__)


@dataclass
class KeywordData:
    keyword: str
    pc_search_count: int
    mobile_search_count: int
    pc_click_count: float
    mobile_click_count: float
    pc_ctr: float
    mobile_ctr: float
    competition: str
    ad_count: int


class NaverKeywordCollector:
    def __init__(self, auth_service):
        self.auth_service = auth_service

    async def collect_keyword_data(self, keyword: str) -> Optional[KeywordData]:
        """Collect keyword data; returns None on failure."""
        try:
            # Sign the path only; the query string is sent separately
            headers = self.auth_service.get_auth_headers("GET", "/keywordstool")
            url = "https://api.naver.com/keywordstool"
            # Pass the query via params so the keyword is URL-encoded
            params = {"hintKeywords": keyword, "showDetail": "1"}

            async with aiohttp.ClientSession() as session:
                async with session.get(url, headers=headers, params=params) as response:
                    if response.status == 200:
                        data = await response.json()
                        return self._parse_keyword_data(keyword, data)
                    else:
                        raise Exception(f"API Error: {response.status}")

        except Exception as e:
            # Log the error and give up; retries are not implemented here
            logger.error(f"Failed to collect data for {keyword}: {str(e)}")
            return None

    def _parse_keyword_data(self, keyword: str, raw_data: dict) -> KeywordData:
        """Parse the raw API response."""
        if not raw_data.get('keywordList'):
            raise Exception("No keyword data found")

        # Use the first entry of the returned keyword list
        kw_data = raw_data['keywordList'][0]

        return KeywordData(
            keyword=keyword,
            pc_search_count=kw_data.get('monthlyPcQcCnt', 0),
            mobile_search_count=kw_data.get('monthlyMobileQcCnt', 0),
            pc_click_count=kw_data.get('monthlyAvePcClkCnt', 0),
            mobile_click_count=kw_data.get('monthlyAveMobileClkCnt', 0),
            pc_ctr=kw_data.get('monthlyAvePcCtr', 0),
            mobile_ctr=kw_data.get('monthlyAveMobileCtr', 0),
            competition=kw_data.get('compIdx', '낮음'),  # '낮음' = low
            ad_count=kw_data.get('plAvgDepth', 0)
        )


# Batch collector
class BatchCollector:
    def __init__(self, keyword_collector, datalab_collector):
        self.keyword_collector = keyword_collector
        self.datalab_collector = datalab_collector

    async def collect_batch(self, keywords: List[str]) -> List[dict]:
        """Collect several keywords concurrently."""
        tasks = []

        for keyword in keywords:
            # Collect keyword data and trend data together for each keyword
            tasks.append(self._collect_all_data(keyword))

        results = await asyncio.gather(*tasks, return_exceptions=True)
        # Drop keywords whose collection raised
        return [r for r in results if not isinstance(r, Exception)]

    async def _collect_all_data(self, keyword: str) -> dict:
        """Collect all data for a single keyword."""
        keyword_task = self.keyword_collector.collect_keyword_data(keyword)
        trend_task = self.datalab_collector.collect_trend_data(keyword)

        keyword_data, trend_data = await asyncio.gather(
            keyword_task, trend_task, return_exceptions=True
        )

        return {
            'keyword': keyword,
            'keyword_data': keyword_data if not isinstance(keyword_data, Exception) else None,
            'trend_data': trend_data if not isinstance(trend_data, Exception) else None,
            'collected_at': datetime.now(timezone.utc).isoformat()
        }
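
The collector above logs a failure and returns None without retrying. One way the retry step could look is sketched below as a generic exponential-backoff wrapper. The helper name retry_with_backoff and its parameters are assumptions for illustration, and it assumes the wrapped coroutine raises on failure (unlike collect_keyword_data as written, which swallows the exception).

```python
import asyncio
import random
from typing import Awaitable, Callable, Optional, TypeVar

T = TypeVar("T")


async def retry_with_backoff(
    operation: Callable[[], Awaitable[T]],
    max_attempts: int = 3,
    base_delay: float = 0.5,
) -> Optional[T]:
    """Run operation, retrying on any exception with exponential backoff.

    Returns the operation's result, or None once max_attempts is exhausted.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return await operation()
        except Exception:
            if attempt == max_attempts:
                return None
            # Delay doubles each attempt: base, 2*base, 4*base, ...
            delay = base_delay * 2 ** (attempt - 1)
            # Random jitter spreads out retries from concurrent callers
            await asyncio.sleep(delay + random.uniform(0, delay / 2))
    return None
```

A caller would pass a zero-argument callable, for example lambda: fetch(keyword), where fetch is whatever coroutine function performs the raw HTTP request.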

3. Data Processing Service

Responsibilities: data transformation, analysis, aggregation

# data-processing-service/processors/analytics.py
from dataclasses import dataclass
from typing import List, Dict
import numpy as np
from datetime import datetime, timedelta


@dataclass
class ProcessedKeywordData:
    keyword: str
    search_volume_trend: str  # 'rising', 'falling', 'stable', 'insufficient_data'
    competition_level: str    # 'high', 'medium', 'low'
    opportunity_score: float  # 0-100
    seasonal_pattern: Dict[str, float]
    recommendations: List[str]


class KeywordAnalyzer:
    def __init__(self):
        self.trend_threshold = 0.1  # a 10% change counts as a trend

    def analyze_keyword(self, keyword_data: dict, trend_data: List[dict]) -> ProcessedKeywordData:
        """Run the full keyword analysis."""

        # Trend analysis
        trend_direction = self._analyze_trend(trend_data)

        # Normalize the competition label
        competition = self._normalize_competition(keyword_data.get('competition'))

        # Opportunity score
        opportunity = self._calculate_opportunity_score(keyword_data, trend_data)

        # Seasonality pattern
        seasonal = self._analyze_seasonality(trend_data)

        # Recommendations
        recommendations = self._generate_recommendations(
            keyword_data, trend_direction, competition, opportunity
        )

        return ProcessedKeywordData(
            keyword=keyword_data['keyword'],
            search_volume_trend=trend_direction,
            competition_level=competition,
            opportunity_score=opportunity,
            seasonal_pattern=seasonal,
            recommendations=recommendations
        )

    def _normalize_competition(self, competition) -> str:
        """Map the API's Korean competition labels to 'low'/'medium'/'high'."""
        mapping = {'낮음': 'low', '중간': 'medium', '높음': 'high'}
        # Assumption: unknown or missing values are treated as 'high',
        # matching the default in _calculate_opportunity_score
        return mapping.get(competition, 'high')

    def _analyze_trend(self, trend_data: List[dict]) -> str:
        """Determine the trend direction."""
        # Two 3-point windows are compared, so at least 6 points are needed
        if len(trend_data) < 6:
            return 'insufficient_data'

        # Last 3 periods vs. the 3 before them (3 months each for monthly data)
        recent_avg = np.mean([d['ratio'] for d in trend_data[-3:]])
        previous_avg = np.mean([d['ratio'] for d in trend_data[-6:-3]])

        # Avoid dividing by zero when the earlier window has no volume
        if previous_avg == 0:
            return 'rising' if recent_avg > 0 else 'stable'

        change_rate = (recent_avg - previous_avg) / previous_avg

        if change_rate > self.trend_threshold:
            return 'rising'
        elif change_rate < -self.trend_threshold:
            return 'falling'
        else:
            return 'stable'

    def _calculate_opportunity_score(self, keyword_data: dict, trend_data: List[dict]) -> float:
        """Calculate the opportunity score (0-100 scale; with these weights the minimum is 20)."""

        # Search volume score (higher is better), capped at 40 points
        total_volume = keyword_data.get('pc_search_count', 0) + keyword_data.get('mobile_search_count', 0)
        volume_score = min(total_volume / 10000 * 40, 40)

        # Competition score (lower competition is better)
        # Keys are the raw API labels: 낮음 = low, 중간 = medium, 높음 = high
        competition = keyword_data.get('competition', '높음')
        competition_score = {'낮음': 30, '중간': 20, '높음': 10}.get(competition, 10)

        # Trend score ('insufficient_data' falls back to a neutral 15)
        trend_direction = self._analyze_trend(trend_data)
        trend_score = {'rising': 30, 'stable': 20, 'falling': 10}.get(trend_direction, 15)

        return volume_score + competition_score + trend_score

    def _analyze_seasonality(self, trend_data: List[dict]) -> Dict[str, float]:
        """Analyze the seasonality pattern as per-month averages."""
        monthly_patterns = {}

        for data in trend_data:
            # period format: "2024-01-01"
            month = data['period'].split('-')[1]
            if month not in monthly_patterns:
                monthly_patterns[month] = []
            monthly_patterns[month].append(data['ratio'])

        # Average ratio per calendar month, as plain floats
        monthly_avg = {}
        for month, ratios in monthly_patterns.items():
            monthly_avg[month] = float(np.mean(ratios))

        return monthly_avg

    def _generate_recommendations(self, keyword_data, trend, competition, opportunity) -> List[str]:
        """Generate recommendations."""
        recommendations = []

        if opportunity > 70:
            recommendations.append("High opportunity score - active marketing recommended")

        if trend == 'rising':
            recommendations.append("Rising trend - early entry needed")
        elif trend == 'falling':
            recommendations.append("Falling trend - proceed with caution")

        # competition is the normalized label; raw Korean labels are also accepted
        if competition in ('low', '낮음'):
            recommendations.append("Low competition - low barrier to entry")
        elif competition in ('high', '높음'):
            recommendations.append("High competition - differentiation strategy needed")

        # CTR-based recommendations
        mobile_ctr = keyword_data.get('mobile_ctr', 0)
        pc_ctr = keyword_data.get('pc_ctr', 0)

        if mobile_ctr > pc_ctr * 2:
            recommendations.append("Mobile-optimized ads recommended")
        elif pc_ctr > mobile_ctr * 2:
            recommendations.append("PC-targeted ads recommended")

        return recommendations
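
To make the scoring weights concrete, here is a standalone sketch of the same arithmetic used in _calculate_opportunity_score, without numpy or the analyzer class. The function name opportunity_score and the sample figures in the usage note are illustrative assumptions.

```python
def opportunity_score(pc_count: int, mobile_count: int,
                      competition: str, trend: str) -> float:
    """Volume (max 40) + competition (10-30) + trend (10-30)."""
    # 10,000 combined monthly searches or more earns the full 40 points
    volume_score = min((pc_count + mobile_count) / 10000 * 40, 40)
    # Raw API labels: 낮음 = low, 중간 = medium, 높음 = high
    competition_score = {"낮음": 30, "중간": 20, "높음": 10}.get(competition, 10)
    # Any other trend label (e.g. 'insufficient_data') gets a neutral 15
    trend_score = {"rising": 30, "stable": 20, "falling": 10}.get(trend, 15)
    return volume_score + competition_score + trend_score
```

With 1,200 PC and 3,800 mobile searches, medium competition, and a rising trend, the score is 20 + 20 + 30 = 70. That is not strictly greater than 70, so the "high opportunity score" recommendation would not fire. Because the competition and trend terms each contribute at least 10, the lowest possible score under these weights is 20 rather than 0.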

4. Cache Service

Responsibilities: data caching, performance optimization

# cache-service/cache_manager.py
import fnmatch
import functools
import hashlib
import json
from typing import Any, Optional

import redis.asyncio as redis  # async client; the awaits below depend on it


class CacheManager:
    def __init__(self, redis_client: Optional[redis.Redis] = None):
        # Without a Redis client (local development), use an in-memory dict
        self.redis_client = redis_client
        self.memory_cache = {} if redis_client is None else None

    def _generate_key(self, prefix: str, params: dict) -> str:
        """Build a cache key from a prefix and a hash of the parameters."""
        param_str = json.dumps(params, sort_keys=True)
        # 16 hex characters of SHA-256 keep keys short with a low collision risk
        param_hash = hashlib.sha256(param_str.encode()).hexdigest()[:16]
        return f"{prefix}:{param_hash}"

    async def get(self, prefix: str, params: dict) -> Optional[Any]:
        """Look up a cached value."""
        key = self._generate_key(prefix, params)

        if self.redis_client:
            cached = await self.redis_client.get(key)
            return json.loads(cached) if cached else None
        else:
            # In-memory cache (development)
            return self.memory_cache.get(key)

    async def set(self, prefix: str, params: dict, data: Any, expire_seconds: int = 3600):
        """Store a value; data must be JSON-serializable for Redis."""
        key = self._generate_key(prefix, params)

        if self.redis_client:
            await self.redis_client.setex(key, expire_seconds, json.dumps(data))
        else:
            # In-memory cache; expire_seconds is ignored here
            self.memory_cache[key] = data

    async def invalidate_pattern(self, pattern: str):
        """Invalidate cache entries matching a glob pattern (e.g. "keyword:*")."""
        if self.redis_client:
            # SCAN iterates incrementally instead of blocking the server like KEYS
            keys = [k async for k in self.redis_client.scan_iter(match=pattern)]
            if keys:
                await self.redis_client.delete(*keys)
        else:
            # Apply the same glob semantics to the in-memory keys
            to_delete = [k for k in self.memory_cache if fnmatch.fnmatchcase(k, pattern)]
            for k in to_delete:
                del self.memory_cache[k]


# Cache decorator
def cached(prefix: str, expire_seconds: int = 3600):
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            cache_manager = kwargs.get('cache_manager') or args[0].cache_manager

            # Build the cache-key parameters; the cache_manager object is
            # left out because its repr differs between instances
            key_kwargs = {k: v for k, v in kwargs.items() if k != 'cache_manager'}
            cache_params = {
                'func_name': func.__name__,
                'args': str(args[1:]),  # skip self
                'kwargs': str(sorted(key_kwargs.items()))
            }

            # Check the cache
            cached_result = await cache_manager.get(prefix, cache_params)
            if cached_result is not None:
                return cached_result

            # Cache miss: run the wrapped function
            result = await func(*args, **kwargs)

            # Cache the result
            await cache_manager.set(prefix, cache_params, result, expire_seconds)

            return result
        return wrapper
    return decorator
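
The in-memory fallback in CacheManager stores entries without honoring expire_seconds, so development behavior can drift from Redis. A minimal sketch of a dict-based cache with per-entry expiry follows. TTLMemoryCache is a hypothetical class, not part of the design above, and the injectable clock exists only to make expiry testable.

```python
import time
from typing import Any, Callable, Dict, Optional, Tuple


class TTLMemoryCache:
    """In-memory cache whose entries expire after a per-entry TTL."""

    def __init__(self, clock: Callable[[], float] = time.monotonic):
        self._clock = clock
        # key -> (expiry time on the clock, value)
        self._store: Dict[str, Tuple[float, Any]] = {}

    def set(self, key: str, value: Any, expire_seconds: float = 3600) -> None:
        self._store[key] = (self._clock() + expire_seconds, value)

    def get(self, key: str) -> Optional[Any]:
        entry = self._store.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self._clock() >= expires_at:
            # Evict lazily on read once the TTL has passed
            del self._store[key]
            return None
        return value
```

Expired entries are only removed when they are read, which is usually acceptable for a development cache. A long-running process would also want a periodic sweep or a size bound.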

5. Notification Service

Responsibilities: notifications, event handling

# notification-service/notifiers.py
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List, Dict
import asyncio


@dataclass
class NotificationEvent:
    event_type: str
    user_id: str
    data: Dict
    priority: str = 'normal'  # 'high', 'normal', 'low'


class NotificationChannel(ABC):
    @abstractmethod
    async def send(self, event: NotificationEvent) -> bool:
        pass


class EmailNotifier(NotificationChannel):
    def __init__(self, smtp_config):
        self.smtp_config = smtp_config

    async def send(self, event: NotificationEvent) -> bool:
        """Send an email."""
        # Placeholder; a real implementation would use SendGrid, AWS SES, etc.
        print(f"Email sent: {event.event_type} to {event.user_id}")
        return True


class WebhookNotifier(NotificationChannel):
    def __init__(self, webhook_url):
        self.webhook_url = webhook_url

    async def send(self, event: NotificationEvent) -> bool:
        """Send a webhook."""
        # Placeholder; would post to Slack, Discord, etc.
        print(f"Webhook sent: {event.event_type}")
        return True


class NotificationService:
    def __init__(self):
        self.channels: List[NotificationChannel] = []
        self.event_queue = asyncio.Queue()

    def add_channel(self, channel: NotificationChannel):
        self.channels.append(channel)

    async def notify(self, event: NotificationEvent):
        """Enqueue a notification."""
        await self.event_queue.put(event)

    async def process_notifications(self):
        """Process the notification queue (runs until cancelled)."""
        while True:
            event = await self.event_queue.get()
            try:
                # Handle by priority
                if event.priority == 'high':
                    await self._send_immediately(event)
                else:
                    await self._send_normal(event)

            except Exception as e:
                print(f"Notification error: {e}")

            finally:
                # Mark the item done even if sending failed
                self.event_queue.task_done()

    async def _send_immediately(self, event: NotificationEvent):
        """Send right away on every channel."""
        tasks = [channel.send(event) for channel in self.channels]
        await asyncio.gather(*tasks, return_exceptions=True)

    async def _send_normal(self, event: NotificationEvent):
        """Normal send: delayed by a fixed interval (stand-in for batching)."""
        await asyncio.sleep(1)  # batch interval
        await self._send_immediately(event)
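
In the service above, events are taken from the queue in arrival order, and priority only decides whether a one-second delay is applied. If high-priority events should overtake queued ones, asyncio.PriorityQueue is one option. The sketch below is an assumption-laden illustration: the rank table and the function drain_in_priority_order are hypothetical, and events are reduced to (priority, event_type) pairs.

```python
import asyncio
import itertools
from typing import List, Tuple

# Lower rank is dequeued first
PRIORITY_RANK = {"high": 0, "normal": 1, "low": 2}


async def drain_in_priority_order(events: List[Tuple[str, str]]) -> List[str]:
    """Enqueue (priority, event_type) pairs and return the dequeue order."""
    queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
    # A running counter breaks ties so equal priorities stay first-in, first-out
    counter = itertools.count()
    for priority, event_type in events:
        rank = PRIORITY_RANK.get(priority, PRIORITY_RANK["normal"])
        await queue.put((rank, next(counter), event_type))

    ordered = []
    while not queue.empty():
        _, _, event_type = await queue.get()
        ordered.append(event_type)
        queue.task_done()
    return ordered
```

The tie-breaking counter also keeps the queue from comparing payloads directly, which matters once the third tuple element is a dataclass that does not define ordering.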

Inter-Service Communication Design

1. API Gateway Pattern

# api-gateway-config.yaml
routes:
  - path: /api/v1/keywords/*
    service: data-collection-service
    timeout: 30s
    retry: 3
    
  - path: /api/v1/analytics/*  
    service: data-processing-service
    timeout: 10s
    
  - path: /api/v1/auth/*
    service: authentication-service
    timeout: 5s
 
middleware:
  - rate_limiting:
      requests_per_minute: 100
  - authentication:
      required: true
      exclude_paths: ["/api/v1/auth/login"]
  - logging:
      enabled: true
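
The configuration above does not name a specific gateway product, so the exact matching rules are not defined here. As a rough sketch, assuming the trailing /* means "any path under this prefix" and that exclude_paths are exact matches, route resolution could behave like this (resolve_service and requires_authentication are hypothetical names):

```python
from typing import Optional

# Prefixes taken from the routes in the config, with the trailing * removed
ROUTES = {
    "/api/v1/keywords/": "data-collection-service",
    "/api/v1/analytics/": "data-processing-service",
    "/api/v1/auth/": "authentication-service",
}
AUTH_EXCLUDE_PATHS = {"/api/v1/auth/login"}


def resolve_service(path: str) -> Optional[str]:
    """Return the backing service for a request path, or None if unrouted."""
    matches = [prefix for prefix in ROUTES if path.startswith(prefix)]
    if not matches:
        return None
    # Prefer the most specific (longest) matching prefix
    return ROUTES[max(matches, key=len)]


def requires_authentication(path: str) -> bool:
    """Every path needs a valid token except the explicitly excluded ones."""
    return path not in AUTH_EXCLUDE_PATHS
```

Under these assumptions, /api/v1/keywords/search resolves to data-collection-service, and /api/v1/auth/login is the only path that bypasses the authentication middleware.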

2. Event-Based Communication

# events/event_bus.py
from dataclasses import dataclass
from typing import Any, Callable, List
import asyncio
 
@dataclass
class Event:
    event_type: str
    source_service: str
    data: Any
    timestamp: float
 
class EventBus:
    def __init__(self):
        self.subscribers = {}

    def subscribe(self, event_type: str, handler: Callable):
        """Subscribe an async handler to an event type."""
        if event_type not in self.subscribers:
            self.subscribers[event_type] = []
        self.subscribers[event_type].append(handler)

    async def publish(self, event: Event):
        """Publish an event to all of its subscribers concurrently."""
        if event.event_type in self.subscribers:
            handlers = self.subscribers[event.event_type]
            tasks = [handler(event) for handler in handlers]
            # A failing handler does not stop the others
            await asyncio.gather(*tasks, return_exceptions=True)


# Usage example (data_processing_service, notification_service, and
# NotificationEvent are assumed to come from the services described earlier)
event_bus = EventBus()


# Subscribe to the data-collection-complete event
async def on_data_collected(event):
    """Process the data once collection completes."""
    await data_processing_service.process(event.data)


async def on_analysis_complete(event):
    """Send a notification once analysis completes."""
    await notification_service.notify(NotificationEvent(
        event_type='analysis_complete',
        user_id=event.data['user_id'],
        data=event.data
    ))


event_bus.subscribe('data_collected', on_data_collected)
event_bus.subscribe('analysis_complete', on_analysis_complete)
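
The usage example above depends on services that are not defined in this file. For a version that runs on its own, the sketch below repeats a condensed Event and EventBus and wires in two stub handlers, one of which fails on purpose to show that return_exceptions=True isolates it. The handler names, the demo function, and the sample keyword are made up for illustration.

```python
import asyncio
import time
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Dict, List


@dataclass
class Event:
    event_type: str
    source_service: str
    data: Any
    timestamp: float


class EventBus:
    """Condensed copy of the in-process bus described above."""

    def __init__(self) -> None:
        self.subscribers: Dict[str, List[Callable[[Event], Awaitable[Any]]]] = {}

    def subscribe(self, event_type: str, handler: Callable[[Event], Awaitable[Any]]) -> None:
        self.subscribers.setdefault(event_type, []).append(handler)

    async def publish(self, event: Event) -> List[Any]:
        handlers = self.subscribers.get(event.event_type, [])
        # Exceptions are returned as results instead of propagating
        return await asyncio.gather(*(h(event) for h in handlers), return_exceptions=True)


async def demo() -> List[str]:
    log: List[str] = []

    async def broken_handler(event: Event) -> None:
        raise RuntimeError("handler failed")

    async def recording_handler(event: Event) -> None:
        log.append(f"{event.event_type}:{event.data['keyword']}")

    bus = EventBus()
    bus.subscribe("data_collected", broken_handler)
    bus.subscribe("data_collected", recording_handler)
    await bus.publish(Event("data_collected", "data-collection-service",
                            {"keyword": "laptop"}, time.time()))
    return log
```

Running asyncio.run(demo()) leaves one entry in the log even though the first handler raised. Note that this bus lives inside a single process, so communication between separately deployed services would still need a broker or HTTP calls.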

Deployment and Operations

Docker Compose Local Development Environment

# docker-compose.yml
version: '3.8'
services:
  auth-service:
    build: ./auth-service
    ports:
      - "3001:3000"
    environment:
      - JWT_SECRET=${JWT_SECRET}
      - NAVER_ACCESS_KEY=${NAVER_ACCESS_KEY}
    
  data-collection-service:
    build: ./data-collection-service  
    ports:
      - "3002:3000"
    depends_on:
      - auth-service
    
  data-processing-service:
    build: ./data-processing-service
    ports:
      - "3003:3000"
    
  cache-service:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    
  api-gateway:
    image: nginx:alpine
    ports:
      - "8080:80"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
    depends_on:
      - auth-service
      - data-collection-service  
      - data-processing-service

Kubernetes Deployment Configuration

# k8s/data-collection-service.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: data-collection-service
spec:
  replicas: 2
  selector:
    matchLabels:
      app: data-collection-service
  template:
    metadata:
      labels:
        app: data-collection-service
    spec:
      containers:
      - name: app
        image: gcr.io/project/data-collection-service:latest
        ports:
        - containerPort: 3000
        env:
        - name: AUTH_SERVICE_URL
          value: "http://auth-service:3000"
        resources:
          requests:
            memory: "128Mi"
            cpu: "100m"
          limits:
            memory: "256Mi" 
            cpu: "200m"
---
apiVersion: v1
kind: Service
metadata:
  name: data-collection-service
spec:
  selector:
    app: data-collection-service
  ports:
  - port: 3000
    targetPort: 3000
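
The Deployment above injects AUTH_SERVICE_URL as an environment variable. On the application side, a service could read it at startup along the lines of the sketch below. Settings and load_settings are hypothetical names, and the PORT variable is an assumption that does not appear in the manifest (3000 is used as the default to match containerPort).

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class Settings:
    auth_service_url: str
    port: int


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Read service settings from the environment, failing fast if incomplete."""
    url = env.get("AUTH_SERVICE_URL")
    if not url:
        raise RuntimeError("AUTH_SERVICE_URL is not set")
    return Settings(
        # Strip a trailing slash so paths can be appended consistently
        auth_service_url=url.rstrip("/"),
        # PORT is an assumed variable; default matches the containerPort above
        port=int(env.get("PORT", "3000")),
    )
```

Failing at startup when the variable is missing surfaces a misconfigured Deployment immediately, rather than on the first request that needs the authentication service.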

Updated: 2025-11-08
Status: design complete, awaiting implementation


Written: 2025-11-08