본문 바로가기
AI_RSS_트래픽 프로젝트

모르는 상태로 하는 RSS&분석&RAG 프로젝트(19) 트렌드 분석

by chol_rang 2026. 1. 14.

마지막으로 기능구현한 트렌드 상세 분석한 내용을 작성하면서 나도 이해해보려고 한다.

 

글이 많이 길 예정이니 주의....

 

1. 급상승 키워드 탐지 detect_surge_keywords():

시간대 별로 키워드 빈도를 비교하여 급격히 증가한 키워드 찾기

  • 1단계: 데이터 준비 및 키워드 추출
    - 최근 N일간의 뉴스 기사와 SNS 게시물을 가져옵니다
    - 각 게시물에서 형태소 분석을 통해 키워드를 추출합니다
    - 키워드와 게시물의 발행 시간을 함께 저장합니다
for article in news_queryset:
    text = extract_text_from_news_article(article)
    if text:
        keywords = analyzer.extract_keywords(text, min_length=2, exclude_stopwords=True)
        news_keywords_map[article.id] = {
            'keywords': keywords,
            'published_at': article.published_at
        }

각 뉴스 기사에서 텍스트를 추출후, 형태소 분석을 통해 키워드를 뽑아냄. 게시물 ID를 키로하고 키워드 리스트와 발행 시간을 값으로 저장

 

  • 2단계: 시간대별 버킷 분류
    - 게시물의 발행 시간을 기준으로 시간대별로 그룹화합니다
    - 예를 들어 interval_hours가 6이면, 0-5시, 6-11시, 12-17시, 18-23시로 나눕니다
    - 각 시간대 버킷에 속한 키워드들의 빈도를 카운트합니다
time_buckets = defaultdict(lambda: defaultdict(int))
for article_id, data in news_keywords_map.items():
    if data['published_at']:
        bucket_key = data['published_at'].replace(
            minute=0, second=0, microsecond=0
        )
        hours_offset = bucket_key.hour % interval_hours
        bucket_key = bucket_key - timedelta(hours=hours_offset)
        
        for keyword in data['keywords']:
            time_buckets[bucket_key][keyword] += 1

게시물의 발행 시간에 분과 초를 제거하고, interval_hours로 나눈 나머지를 빼서 시간 버킷의 시작 시간을 만든다. 

각 키워드의 빈도를 카운트하는 로직.

 

  • 3단계: 급상승 키워드 탐지
    - 마지막 시간대와 그 이전 시간대를 비교합니다
    - 각 키워드의 빈도를 전체 키워드 수로 나눠서 정규화합니다 (비율로 변환)
    - 현재 시간대의 비율을 이전 시간대의 비율로 나눠서 증가율을 계산합니다
    - 증가율이 surge_threshold (기본값 2.0, 즉 2배) 이상인 키워드를 급상승 키워드로 판단합니다
sorted_buckets = sorted(time_buckets.keys())

if len(sorted_buckets) >= 2:
    current_bucket = sorted_buckets[-1]
    previous_bucket = sorted_buckets[-2]
    
    current_freq = time_buckets[current_bucket]
    previous_freq = time_buckets[previous_bucket]
    
    current_total = sum(current_freq.values())
    previous_total = sum(previous_freq.values())
    
    if current_total > 0 and previous_total > 0:
        surge_keywords = []
        for keyword, count in current_freq.items():
            current_norm = count / current_total if current_total > 0 else 0
            previous_norm = previous_freq.get(keyword, 0) / previous_total if previous_total > 0 else 0
            
            if current_norm >= min_frequency and previous_norm > 0:
                growth_ratio = current_norm / previous_norm if previous_norm > 0 else 0
                
                if growth_ratio >= surge_threshold:
                    surge_keywords.append({
                        'keyword': keyword,
                        'current_frequency': current_norm,
                        'previous_frequency': previous_norm,
                        'growth_ratio': growth_ratio,
                        'current_count': count,
                        'previous_count': previous_freq.get(keyword, 0),
                        'time_bucket': current_bucket.isoformat()
                    })

시간 버킷을 시간순으로 정렬한 후, 마지막 버킷과 그 이전 버킷을 비교합니다. 각 키워드의 빈도를 전체 키워드 수로 나눠서 정규화하고, 현재 비율을 이전 비율로 나눠서 증가율을 계산합니다. 증가율이 임계값 이상이면 급상승 키워드로 판단합니다.

 

  • 4단계: 결과 정리
    - 증가율이 높은 순서대로 정렬합니다
    - top_n이 지정되어 있으면 상위 N개만 반환합니다
    - 각 키워드의 현재 빈도, 이전 빈도, 증가율 등의 정보를 포함합니다
surge_keywords.sort(key=lambda x: x['growth_ratio'], reverse=True)

if top_n and top_n > 0:
    surge_keywords = surge_keywords[:top_n]

result['news_surge_keywords'] = surge_keywords

증가율이 높은 순서대로 정렬하고, top_n이 지정되어 있으면 상위 N개만 선택합니다.

  • 반환 데이터:
    - news_surge_keywords: 뉴스에서 급상승한 키워드 리스트
    - sns_surge_keywords: SNS에서 급상승한 키워드 리스트
    - summary: 분석 요약 정보 (플랫폼, 기간, 급상승 키워드 개수 등)

 

2. 트렌드 동기화 분석 analyze_trend_synchronization():

뉴스와 SNS에서 같은 키워드가 시간대별로 얼마나 비슷한 패턴으로 나타나는지 측정합니다.

 

  • 1단계: 형태소 분석 및 키워드 추출
    - 뉴스 기사와 SNS 게시물에서 각각 키워드를 추출합니다
news_keywords_map = {}
for article in news_queryset:
    text = extract_text_from_news_article(article)
    if text:
        keywords = analyzer.extract_keywords(text, min_length=2, exclude_stopwords=True)
        news_keywords_map[article.id] = {
            'keywords': keywords,
            'published_at': article.published_at
        }

sns_keywords_map = {}
for post in sns_queryset:
    text = extract_text_from_sns_post(post)
    if text:
        keywords = analyzer.extract_keywords(text, min_length=2, exclude_stopwords=True)
        sns_keywords_map[post.id] = {
            'keywords': keywords,
            'published_at': post.published_at
        }

뉴스와 SNS 각각에서 키워드를 추출합니다. 형태소 분석은 각 게시물당 한 번만 수행하고, 결과를 재사용하여 성능을 최적화합니다.

 

  • 2단계: 시간대별 빈도 계산
    - 각 플랫폼의 키워드를 시간대별로 그룹화합니다
    - interval_hours 간격으로 시간 버킷을 만듭니다
    - 각 시간 버킷에서 키워드별 빈도를 카운트합니다
news_time_buckets = defaultdict(lambda: defaultdict(int))
for _, data in news_keywords_map.items():
    if data['published_at']:
        bucket_key = data['published_at'].replace(
            minute=0, second=0, microsecond=0
        )
        hours_offset = bucket_key.hour % interval_hours
        bucket_key = bucket_key - timedelta(hours=hours_offset)
        
        for keyword in data['keywords']:
            news_time_buckets[bucket_key][keyword] += 1

sns_time_buckets = defaultdict(lambda: defaultdict(int))
for _, data in sns_keywords_map.items():
    if data['published_at']:
        bucket_key = data['published_at'].replace(
            minute=0, second=0, microsecond=0
        )
        hours_offset = bucket_key.hour % interval_hours
        bucket_key = bucket_key - timedelta(hours=hours_offset)
        
        for keyword in data['keywords']:
            sns_time_buckets[bucket_key][keyword] += 1

뉴스와 SNS 각각에 대해 시간대별 키워드 빈도를 계산합니다. defaultdict를 사용하여 존재하지 않는 키에 접근해도 자동으로 초기화됩니다.

defaultdict: 존재하지 않는 키에 접근해도 자동으로 기본값을 생성하는 딕셔너리입니다. 예: defaultdict(int)는 키가 없으면 0을 반환합니다.

 

  • 3단계: 공통 키워드 찾기
    - 뉴스와 SNS 양쪽에 모두 나타나는 키워드만 분석 대상으로 합니다
    - 양쪽에 모두 있어야 동기화를 비교할 수 있기 때문입니다
all_news_keywords = set()
for bucket in news_time_buckets.values():
    all_news_keywords.update(bucket.keys())

all_sns_keywords = set()
for bucket in sns_time_buckets.values():
    all_sns_keywords.update(bucket.keys())

common_keywords = all_news_keywords & all_sns_keywords

뉴스와 SNS 각각의 모든 키워드를 수집한 후, 교집합 연산자(&)를 사용하여 양쪽에 모두 나타나는 키워드만 선택합니다.

 

  • 4단계: 시간대별 빈도 시퀀스 생성
    - 각 공통 키워드에 대해 모든 시간 버킷을 순회합니다
    - 각 시간 버킷에서 해당 키워드의 빈도를 전체 키워드 수로 나눠서 정규화합니다
    - 키워드가 없는 시간 버킷은 0으로 처리합니다
    - 이렇게 뉴스와 SNS 각각에 대해 시간대별 빈도 리스트를 만듭니다
for keyword in common_keywords:
    news_sequence = []
    sns_sequence = []
    
    for bucket in all_buckets:
        news_count = news_time_buckets[bucket].get(keyword, 0)
        sns_count = sns_time_buckets[bucket].get(keyword, 0)
        
        news_total = sum(news_time_buckets[bucket].values())
        sns_total = sum(sns_time_buckets[bucket].values())
        
        news_norm = news_count / news_total if news_total > 0 else 0
        sns_norm = sns_count / sns_total if sns_total > 0 else 0
        
        if news_norm >= min_frequency or sns_norm >= min_frequency:
            news_sequence.append(news_norm)
            sns_sequence.append(sns_norm)

각 공통 키워드에 대해 모든 시간 버킷을 순회하면서, 해당 키워드의 빈도를 전체 키워드 수로 나눠서 정규화합니다. 키워드가 없는 버킷은 .get(keyword, 0)으로 0을 반환합니다. min_frequency(2글자) 이상인 경우만 시퀀스에 추가합니다.

 

  • 5단계: 상관관계 계산 (피어슨 상관계수)
    - 뉴스의 시간대별 빈도 리스트와 SNS의 시간대별 빈도 리스트를 비교합니다
    - 두 리스트가 얼마나 비슷한 패턴을 보이는지 수치로 계산합니다
    - 상관계수는 -1부터 1까지의 값을 가지며, 1에 가까울수록 동기화되어 있다는 의미입니다
if len(news_sequence) >= 2 and len(sns_sequence) >= 2:
    n = len(news_sequence)
    news_mean = sum(news_sequence) / n
    sns_mean = sum(sns_sequence) / n
    
    numerator = sum((news_sequence[i] - news_mean) * (sns_sequence[i] - sns_mean) 
                  for i in range(n))
    
    news_var = sum((x - news_mean) ** 2 for x in news_sequence)
    sns_var = sum((x - sns_mean) ** 2 for x in sns_sequence)
    
    denominator = (news_var * sns_var) ** 0.5
    
    if denominator > 0:
        correlation = numerator / denominator
    else:
        correlation = 0.0

피어슨 상관계수 공식을 사용하여 두 시퀀스의 상관관계를 계산합니다. numerator는 공분산, denominator는 각 시퀀스의 표준편차를 곱한 값입니다. 상관계수가 1에 가까우면 두 패턴이 비슷하게 움직인다는 의미입니다.

 

  • 6단계: 동기화/비동기화 분류
    - 상관계수가 0.7 이상이면 동기화된 키워드로 분류합니다
    - 상관계수가 0.3 미만이면 비동기화된 키워드로 분류합니다
    - 0.3과 0.7 사이는 중간 상태로 간주합니다
correlation_scores.sort(key=lambda x: abs(x['correlation']), reverse=True)

synchronized_keywords = [
    item for item in correlation_scores 
    if item['correlation'] >= 0.7
]
desynchronized_keywords = [
    item for item in correlation_scores 
    if item['correlation'] < 0.3
]

상관계수의 절댓값 순으로 정렬한 후, 0.7 이상은 동기화, 0.3 미만은 비동기화로 분류합니다.

 

  • 반환 데이터:
    - synchronized_keywords: 동기화된 키워드 리스트 (상관계수 높음)
    - desynchronized_keywords: 비동기화된 키워드 리스트 (상관계수 낮음)
    - correlation_scores: 모든 키워드의 상관계수 정보
    - summary: 분석 요약 정보 (동기화/비동기화 키워드 개수, 평균 상관계수 등)

 

3. 시간대별 트렌드 분석 (analyze_hourly_trends)

하루 24시간 중 각 시간대별로 어떤 키워드가 인기인지 분석합니다.

 

  • 1단계: 데이터 준비 및 키워드 추출
    - 최근 N일간의 뉴스 기사와 SNS 게시물을 가져옵니다
    - 각 게시물에서 형태소 분석을 통해 키워드를 추출합니다
    - 게시물의 발행 시간에서 시간(hour) 정보를 추출합니다 (0시부터 23시까지)
hourly_keywords = defaultdict(lambda: defaultdict(int))
news_keywords_map = {}

for article in news_queryset:
    text = extract_text_from_news_article(article)
    if text:
        keywords = analyzer.extract_keywords(text, min_length=2, exclude_stopwords=True)
        news_keywords_map[article.id] = keywords
        
        if article.published_at:
            hour = article.published_at.hour
            for keyword in keywords:
                hourly_keywords[hour][keyword] += 1

각 게시물에서 키워드를 추출하고, 발행 시간에서 시간(hour) 정보를 가져옵니다. 0시부터 23시까지 각 시간대별로 키워드 빈도를 카운트합니다.

 

  • 2단계: 시간대별 키워드 빈도 계산
    - 각 게시물의 발행 시간을 기준으로 0시, 1시, 2시... 23시로 분류합니다
    - 각 시간대에 속한 게시물들의 키워드 빈도를 카운트합니다
    - 예를 들어 12시에 발행된 게시물들의 키워드를 모두 모아서 빈도를 계산합니다
  • 3단계: 정규화 및 필터링
    - 각 시간대의 키워드 빈도를 전체 키워드 수로 나눠서 비율로 변환합니다
    - min_frequency보다 작은 비율의 키워드는 제외합니다 (노이즈 제거)
    - 이렇게 하면 각 시간대에서 실제로 의미 있는 키워드만 남습니다
for hour in range(24):
    if hour in hourly_keywords:
        hour_freq = hourly_keywords[hour]
        total = sum(hour_freq.values())
        
        if total > 0:
            normalized = {
                k: v / total for k, v in hour_freq.items()
                if (v / total) >= min_frequency
            }

 각 시간대(0시~23시)를 순회하면서, 해당 시간대의 키워드 빈도를 전체 키워드 수로 나눠서 정규화합니다. 딕셔너리 컴프리헨션을 사용하여 min_frequency 이상인 키워드만 남깁니다.

 

  • 4단계: 상위 키워드 추출
    - 각 시간대의 키워드를 빈도 순으로 정렬합니다
    - top_n이 지정되어 있으면 상위 N개만 선택합니다
    - 각 키워드의 빈도 정보를 포함합니다
sorted_keywords = sorted(
    normalized.items(),
    key=lambda x: x[1],
    reverse=True
)

if top_n and top_n > 0:
    sorted_keywords = sorted_keywords[:top_n]

result['news_hourly_trends'][hour] = {
    'keywords': [
        {'keyword': k, 'frequency': v}
        for k, v in sorted_keywords
    ],
    'total_keywords': len(normalized),
    'total_count': total
}

정규화된 키워드를 빈도 순으로 정렬하고, top_n이 지정되어 있으면 상위 N개만 선택합니다. 각 키워드와 빈도를 딕셔너리 형태로 저장합니다.

  • 반환 데이터:
    - news_hourly_trends: 뉴스의 시간대별 트렌드 (0시~23시)
    - sns_hourly_trends: SNS의 시간대별 트렌드 (0시~23시)
    - summary: 분석 요약 정보 (플랫폼, 기간, 분석된 시간대 개수 등)
  • 예시:
    - 0시: 달러(15%), 환율(10%), 코인(5%)
    - 12시: 점심(20%), 맛집(15%), 배달(10%)
    - 18시: 저녁(18%), 퇴근(12%), 교통(8%)
    - 각 시간대마다 다른 키워드가 인기인 것을 확인할 수 있습니다

 

4. 참여도 기반 인기 키워드 분석 (analyze_engagement_keywords)

SNS 게시물의 참여도(조회수, 댓글수, 좋아요수, 공유수)를 활용하여 실제 반응이 높은 키워드를 찾습니다.

 

  • 1단계: 데이터 준비
    - 최근 N일간의 SNS 게시물을 가져옵니다
    - 각 게시물의 참여도 메트릭을 확인합니다 (조회수, 댓글수, 좋아요수, 공유수)

    2단계: 참여도 점수 계산
    - 각 메트릭에 가중치를 적용하여 종합 참여도 점수를 계산합니다
    - 기본 가중치: 조회수 0.1, 댓글수 0.3, 좋아요수 0.4, 공유수 0.2
    - 예를 들어 조회수 1000, 댓글수 50, 좋아요수 100, 공유수 20이면:
      참여도 점수 = 1000×0.1 + 50×0.3 + 100×0.4 + 20×0.2 = 100 + 15 + 40 + 4 = 159점
if engagement_weights is None:
    engagement_weights = {
        'views': 0.1,
        'comments': 0.3,
        'likes': 0.4,
        'shares': 0.2
    }

for post in sns_queryset:
    text = extract_text_from_sns_post(post)
    if not text:
        continue
    
    keywords = analyzer.extract_keywords(text, min_length=2, exclude_stopwords=True)
    
    views = post.views_count or 0
    comments = post.comments_count or 0
    likes = post.likes_count or 0
    shares = post.shares_count or 0
    
    engagement_score = (
        views * engagement_weights['views'] +
        comments * engagement_weights['comments'] +
        likes * engagement_weights['likes'] +
        shares * engagement_weights['shares']
    )

각 게시물의 참여도 메트릭에 가중치를 곱해서 종합 참여도 점수를 계산합니다. 좋아요수에 가장 높은 가중치(0.4)를 부여하여 실제 반응을 더 중요하게 봅니다.

 

  • 3단계: 키워드별 참여도 집계
    - 각 게시물에서 키워드를 추출합니다
    - 게시물의 참여도 점수를 해당 게시물에 포함된 모든 키워드에 분배합니다
    - 각 키워드별로 총 참여도 점수, 게시물 개수, 각 메트릭의 합계를 누적합니다
keyword_engagement = defaultdict(lambda: {
    'total_engagement': 0.0,
    'post_count': 0,
    'total_views': 0,
    'total_comments': 0,
    'total_likes': 0,
    'total_shares': 0,
    'posts': []
})

for post in sns_queryset:
    # ... 키워드 추출 및 참여도 점수 계산 ...
    
    for keyword in keywords:
        keyword_engagement[keyword]['total_engagement'] += engagement_score
        keyword_engagement[keyword]['post_count'] += 1
        keyword_engagement[keyword]['total_views'] += views
        keyword_engagement[keyword]['total_comments'] += comments
        keyword_engagement[keyword]['total_likes'] += likes
        keyword_engagement[keyword]['total_shares'] += shares
        keyword_engagement[keyword]['posts'].append({
            'post_id': post.id,
            'engagement_score': engagement_score,
            'views': views,
            'comments': comments,
            'likes': likes,
            'shares': shares
        })

각 키워드에 대해 참여도 점수와 각 메트릭을 누적합니다. 게시물 정보도 함께 저장하여 나중에 바이럴 키워드 탐지에 사용합니다.

 

  • 4단계: 평균 참여도 계산
    - 각 키워드의 총 참여도 점수를 게시물 개수로 나눠서 평균 참여도를 계산합니다
    - 각 메트릭(조회수, 댓글수 등)의 평균도 계산합니다
    - 키워드의 빈도(전체 게시물 중 몇 퍼센트에 나타나는지)도 계산합니다
for keyword, data in keyword_engagement.items():
    post_count = data['post_count']
    
    frequency = post_count / total_posts if total_posts > 0 else 0
    if frequency < min_frequency:
        continue
    
    avg_engagement = data['total_engagement'] / post_count if post_count > 0 else 0
    avg_views = data['total_views'] / post_count if post_count > 0 else 0
    avg_comments = data['total_comments'] / post_count if post_count > 0 else 0
    avg_likes = data['total_likes'] / post_count if post_count > 0 else 0
    avg_shares = data['total_shares'] / post_count if post_count > 0 else 0
    
    engagement_keywords.append({
        'keyword': keyword,
        'frequency': frequency,
        'post_count': post_count,
        'avg_engagement_score': avg_engagement,
        'total_engagement_score': data['total_engagement'],
        'avg_views': avg_views,
        'avg_comments': avg_comments,
        'avg_likes': avg_likes,
        'avg_shares': avg_shares,
        'total_views': data['total_views'],
        'total_comments': data['total_comments'],
        'total_likes': data['total_likes'],
        'total_shares': data['total_shares']
    })

각 키워드의 총 참여도 점수를 게시물 개수로 나눠서 평균을 계산합니다. min_frequency보다 작은 빈도의 키워드는 제외합니다.

 

 

  • 5단계: 필터링 및 정렬
    - min_frequency보다 작은 빈도의 키워드는 제외합니다
    - 평균 참여도 점수가 높은 순서대로 정렬합니다
    - top_n이 지정되어 있으면 상위 N개만 선택합니다
engagement_keywords.sort(key=lambda x: x['avg_engagement_score'], reverse=True)

if top_n and top_n > 0:
    engagement_keywords = engagement_keywords[:top_n]

평균 참여도 점수 순으로 정렬하고, top_n이 지정되어 있으면 상위 N개만 선택합니다.

lambda를 참 많이 쓴거같다. 쓰지않으면 로직자체가 길어지고 한번쓰고 말 함수이기에 간단하게 해볼생각으로 했지만 

이해하기에는 더욱 복잡해진거같다.

 

  • 6단계: 바이럴 키워드 탐지
    - 상위 20% 키워드 중에서 최근 1일간의 참여도가 평균보다 1.5배 이상 높은 키워드를 찾습니다
    - 이런 키워드는 최근 급상승한 바이럴 키워드로 간주합니다
    - 바이럴 점수도 계산하여 얼마나 급상승했는지 수치화합니다
viral_keywords = []
if len(engagement_keywords) > 0:
    top_20_percent = max(1, int(len(engagement_keywords) * 0.2))
    top_keywords = engagement_keywords[:top_20_percent]
    
    recent_cutoff = timezone.now() - timedelta(days=1)
    for item in top_keywords:
        keyword = item['keyword']
        keyword_data = keyword_engagement[keyword]
        
        recent_posts = [
            p for p in keyword_data['posts']
            if sns_queryset.filter(id=p['post_id'], published_at__gte=recent_cutoff).exists()
        ]
        
        if recent_posts:
            recent_avg_engagement = sum(p['engagement_score'] for p in recent_posts) / len(recent_posts)
            
            if recent_avg_engagement > item['avg_engagement_score'] * 1.5:
                viral_keywords.append({
                    **item,
                    'recent_avg_engagement': recent_avg_engagement,
                    'recent_post_count': len(recent_posts),
                    'viral_score': recent_avg_engagement / item['avg_engagement_score'] if item['avg_engagement_score'] > 0 else 0
                })
    
    viral_keywords.sort(key=lambda x: x.get('viral_score', 0), reverse=True)

상위 20% 키워드 중에서 최근 1일간 게시물의 평균 참여도가 전체 평균보다 1.5배 이상 높은 키워드를 바이럴로 판단합니다. 바이럴 점수는 최근 평균을 전체 평균으로 나눈 값입니다

  • 반환 데이터:
    - engagement_keywords: 참여도 기반 인기 키워드 리스트 (평균 참여도 순)
    - viral_keywords: 바이럴 키워드 리스트 (최근 급상승)
    - summary: 분석 요약 정보 (총 키워드 수, 바이럴 키워드 수, 평균 참여도 점수 등)
  • 예시:
    - 비트코인 키워드가 포함된 게시물 100개
    - 평균 참여도 점수: 500점
    - 최근 1일간 게시물 10개의 평균 참여도 점수: 800점
    - 800 / 500 = 1.6배이므로 바이럴 키워드로 판단

 

위의 함수들을 작성 후 tasks.py에 올려놓는 작업까지 마무리 후 블로그를 작성하다보니 시간이 길어졌고 실제로 코딩을 하는시간이 적어 더 오래걸린거같다.

 

이후에는 Dashboard에 sns,news posts, 분석결과 등을 각각 분산하여 api로 쏴서 한 페이지에 post,분석결과등을 볼 수 있게끔 

api를 만들어볼 생각이다. 

 

물론, 분석결과도 저장해야하기에 그걸 먼저 하고 난뒤에...