본문 바로가기
AI_RSS_트래픽 프로젝트

모르는 상태로 하는 RSS&분석&RAG 프로젝트(12) 데이터 수집(6)

by chol_rang 2025. 12. 16.

오늘은 tasks.py, views.py뿐만 아니라 테스트를 진행하면서 나온 에러 수정까지 진행했다. 

 

저번에 service(DCinside, Reddit의 post들을 수집)를 작성 이후 

 

비동기 방식으로 큐에 올려놓을수 있는 tasks.py를 작성했다. 

 

간단하게 

 

    try:
        # Reddit RSS 수집 서비스 인스턴스 생성
        collector = RedditRSSCollectorService()
       
        # 수집 실행
        result = collector.collect_from_source(source_id=source_id)
       
        logger.info(
            f"Reddit RSS 수집 태스크 완료: {result.get('source', 'Unknown')} - "
            f"상태: {result.get('status')}"
        )

result에 이전에 만든 service를 실행하는것으로 마무리 하고 

통계를 볼 수 있도록 업데이트하는 로직을 추가했다. 

 

Reddit뿐만 아니라 DCinside도 동일한 방식으로 작성하고 

def collect_all_social_media_task():

도 만들어

 

    session = CollectionSession.objects.create(
        status='running',
        started_at=timezone.now()
    )
   
    # 활성화된 모든 소셜 미디어 소스 조회
    social_media_sources = SocialMediaSource.objects.filter(
        is_active=True
    )
   
    session.total_sources = social_media_sources.count()
    session.save()
   
    started_tasks = []
    failed_sources = []
   
    for source in social_media_sources:
        try:
            # 플랫폼별로 적절한 태스크 선택
            if source.platform == 'dcinside':
                result = collect_dcinside_task.delay(
                    source_id=source.id,
                    session_id=session.id
                )
            elif source.platform == 'reddit':
                result = collect_reddit_rss_task.delay(
                    source_id=source.id,
                    session_id=session.id
                )
            else:
                logger.warning(
                    f"지원하지 않는 플랫폼: {source.platform} "
                    f"(소스 ID: {source.id})"
                )
                failed_sources.append({
                    'source_id': source.id,
                    'source_name': str(source),
                    'error': f'지원하지 않는 플랫폼: {source.platform}'
                })
                session.failed_sources += 1
                session.save()
                continue
           
            started_tasks.append({
                'source_id': source.id,
                'source_name': str(source),
                'platform': source.platform,
                'task_id': result.id
            })
            logger.info(
                f"소셜 미디어 수집 태스크 시작: {str(source)} "
                f"(플랫폼: {source.platform}, "
                f"Task ID: {result.id}, Session ID: {session.id})"
            )
        except Exception as e:
            failed_sources.append({
                'source_id': source.id,
                'source_name': str(source),
                'error': str(e)
            })
            logger.error(
                f"소셜 미디어 수집 태스크 시작 실패: {str(source)} - {str(e)}",
                exc_info=True
            )
            # 실패한 소스 카운트 증가
            session.failed_sources += 1
            session.save()
   
    logger.info(
        f"소셜 미디어 수집 세션 시작: Session ID={session.id}, "
        f"총 {session.total_sources}개 소스"
    )
   
    # 세션 완료 체크 및 리포트 생성 태스크 예약
    check_session_completion.delay(session.id)
   
    return {
        'status': 'started',
        'session_id': session.id,
        'sources_count': social_media_sources.count(),
        'started_tasks': started_tasks,
        'failed_sources': failed_sources,
        'message': (
            f'{len(started_tasks)}개 소셜 미디어 소스에 대한 '
            f'수집 태스크를 시작했습니다.'
        )
    }

플랫폼 별로 태스크를 선택하고 tasks에 DB에 있는 source_id,name,platform 등을 넣을 수 있도록 만들었다. 

 

 

views.py

 

view에는 SocialMediaCollectionViewSet이라는 이름과 AllCollectionViewSet이라는 이름으로 

sns만 수집할 수 있는 로직와 sns와 news를 합쳐서 한번에 수집할 수 있도록 하는 로직을 따로 만들었다. 

 

최종적으로는 자동화를 시킬 계획이지만 일단 세부적으로 나눠놓고 테스트를 하기 편하게 만들었다. 

 

여기서 중요한 내용은 중복 실행 방지 이다.

테스트를 진행했었을때 tasks가 자동적으로 2초안에 4~5번정도가 재시작되었는데 이로인해 reports수가 한번 api를 했음에도4~5개가 생기는 불상사가 생겼다. 

시간적인 효율과 자동화를 했을때도 해당문제가 발생한다면 속도저하, tasks 과다 사용등 문제가 발생할것으로 보여져 

5분 이내에 동일한 세션이 있을경우 실행하지않는다는 조건을 넣었다. 

 

class SocialMediaCollectionViewSet(viewsets.ViewSet):
    """소셜 미디어 수집 작업 트리거 ViewSet"""

    def list(self, request):
        """최근 소셜 미디어 수집 작업 목록 조회"""
        recent_sources = (
            SocialMediaSource.objects
            .filter(is_active=True)
            .order_by('-last_collected_at')[:10]
        )
        return Response({
            'message': '활성화된 소셜 미디어 소스 목록',
            'count': len(recent_sources),
            'sources': SocialMediaSourceSerializer(
                recent_sources, many=True
            ).data
        })

    def create(self, request):
        """
        모든 활성화된 소셜 미디어 소스에서 수집 작업 시작
        중복 실행 방지: 최근 5분 이내에 실행 중인 세션이 있으면 새로 시작하지 않습니다.
        """
        # 중복 실행 방지: 최근 5분 이내에 실행 중인 Social Media 세션이 있는지 확인
        recent_time = timezone.now() - timedelta(minutes=5)
        running_sessions = CollectionSession.objects.filter(
            status='running',
            started_at__gte=recent_time
        )
        
        if running_sessions.exists():
            recent_session = running_sessions.order_by('-started_at').first()
            return Response({
                'status': 'already_running',
                'message': (
                    f'이미 소셜 미디어 수집 작업이 실행 중입니다. '
                    f'세션 ID: {recent_session.id}, '
                    f'시작 시간: {recent_session.started_at.strftime("%Y-%m-%d %H:%M:%S")}'
                ),
                'session_id': recent_session.id
            }, status=status.HTTP_409_CONFLICT)
        
        try:
            # 전체 소셜 미디어 소스 수집
            task_result = collect_all_social_media_task.delay()
            logger.info(
                f"전체 소셜 미디어 소스 수집 태스크 시작 "
                f"(Task ID: {task_result.id})"
            )
            return Response({
                'status': 'started',
                'task_id': task_result.id,
                'message': (
                    '모든 활성화된 소셜 미디어 소스에서 '
                    '수집 작업이 시작되었습니다.'
                )
            }, status=status.HTTP_202_ACCEPTED)

        except Exception as e:
            logger.error(
                f"소셜 미디어 수집 작업 트리거 실패: {str(e)}",
                exc_info=True
            )
            return Response({
                'status': 'error',
                'message': f'수집 작업 시작 실패: {str(e)}'
            }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

 

이정도로 마무리 했고  AllCoectionViewSet도 비슷하게 돌아간다. 

 

이렇게 마무리를 하고 테스트를 진행해보니 1가지 문제가 더 발생했다.

그 뒤에 내용은 트러블슈팅으로 새글로 작성하겠다.