본문 바로가기
AI_RSS_트래픽 프로젝트

모르는 상태로 하는 RSS&분석&RAG 프로젝트(7) 데이터 수집(1)

by chol_rang 2025. 11. 27.

진짜 오늘부턴 코드 작성 및 확인 해보았다.

 

간단하게 작성해보려고 했는데 생각보다 엄청 길어지고 django 틀에 맞추고 가독성좋은 코드를 작성하기 위해서 길어지기도 하고 redis를 섞어서 사용하니까 많이 코드가 어지럽고 어려웠다.

 

먼저 RSS로 가져올 뉴스의 모델을 정의하였다.

 

 

class NewsSource(models.Model):
    name = models.CharField(max_length=100, unique=True)
    url = models.URLField(max_length=500)
    source_type = models.CharField(max_length=20, choices=[('rss', 'RSS'), ('api', 'API'), ('scraping', '웹 스크래핑')], default='rss')
    is_active = models.BooleanField(default=True)
    collection_interval = models.IntegerField(default=60)
    last_collected_at = models.DateTimeField(null=True, blank=True)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    
    class Meta:
        verbose_name = '뉴스 소스'
        verbose_name_plural = '뉴스 소스'
        ordering = ['-created_at']


class NewsArticle(models.Model):
    source = models.ForeignKey(NewsSource, on_delete=models.CASCADE, related_name='articles')
    title = models.CharField(max_length=500)
    url = models.URLField(max_length=1000, unique=True, db_index=True)
    description = models.TextField(blank=True, null=True)
    content = models.TextField(blank=True, null=True)
    author = models.CharField(max_length=200, blank=True, null=True)
    category = models.CharField(max_length=100, blank=True, null=True, db_index=True)
    published_at = models.DateTimeField(null=True, blank=True, db_index=True)
    collected_at = models.DateTimeField(auto_now_add=True, db_index=True)
    is_processed = models.BooleanField(default=False)
    
    class Meta:
        verbose_name = '뉴스 기사'
        verbose_name_plural = '뉴스 기사'
        ordering = ['-published_at', '-collected_at']
        indexes = [
            models.Index(fields=['-published_at', '-collected_at']),
            models.Index(fields=['source', '-published_at']),
        ]


class DataCollectionJob(models.Model):
    STATUS_CHOICES = [('pending', '대기 중'), ('running', '실행 중'), ('completed', '완료'), ('failed', '실패')]
    
    source = models.ForeignKey(NewsSource, on_delete=models.SET_NULL, null=True, blank=True, related_name='collection_jobs')
    status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='pending')
    started_at = models.DateTimeField(auto_now_add=True)
    completed_at = models.DateTimeField(null=True, blank=True)
    items_collected = models.IntegerField(default=0)
    error_message = models.TextField(blank=True, null=True)
    
    class Meta:
        verbose_name = '데이터 수집 작업'
        verbose_name_plural = '데이터 수집 작업'
        ordering = ['-started_at']

 

기본적은것들은 정리했고 model에서 중요한것은

- on_delete=models.CASCADE: 소스 삭제 시 기사도 함께 삭제
- on_delete=models.SET_NULL: 소스 삭제 시 작업 로그는 유지 (히스토리 보존)
- Meta 클래스: 모델 메타데이터 설정 (verbose_name, ordering 등)
- db_index=True: 자주 조회하는 필드에 인덱스 추가

 위에 있는 내용으로 실제코드에는 help_text를 많이 넣어서 나중에 헷갈리지않도록 잘 정리해두었지만 블로그에 다 올리기에는 내용이 너무 길어져 빼고 간단하게 넣어보았다.

 

models를 작성했으면 serializers도 작성해야하는데 

 

class NewsSourceSerializer(serializers.ModelSerializer):
    article_count = serializers.IntegerField(read_only=True)
    last_collected_at_display = serializers.DateTimeField(read_only=True, source='last_collected_at', format='%Y-%m-%d %H:%M:%S')
    
    class Meta:
        model = NewsSource
        fields = '__all__'
        read_only_fields = ['id', 'created_at', 'updated_at', 'last_collected_at']
    
    def validate(self, data: Dict[str, Any]) -> Dict[str, Any]:
        if 'name' in data:
            name = data.get('name', '').strip()
            if not name:
                raise serializers.ValidationError({'name': '소스 이름은 필수입니다.'})
            data['name'] = name
        
        if 'url' in data:
            url = data.get('url', '').strip()
            if not url:
                raise serializers.ValidationError({'url': 'URL은 필수입니다.'})
            if not (url.startswith('http://') or url.startswith('https://')):
                raise serializers.ValidationError({'url': 'URL은 http:// 또는 https://로 시작해야 합니다.'})
            data['url'] = url
        
        return data
    
    def to_representation(self, instance):
        representation = super().to_representation(instance)
        representation['article_count'] = instance.articles.count()
        return representation


class NewsArticleSerializer(serializers.ModelSerializer):
    source_detail = NewsSourceSerializer(source='source', read_only=True)
    source_name = serializers.CharField(source='source.name', read_only=True)
    title_short = serializers.SerializerMethodField()
    published_at_display = serializers.DateTimeField(read_only=True, source='published_at', format='%Y-%m-%d %H:%M:%S')
    
    class Meta:
        model = NewsArticle
        fields = '__all__'
        read_only_fields = ['id', 'collected_at', 'url']
    
    def get_title_short(self, obj):
        if obj.title and len(obj.title) > 50:
            return obj.title[:50] + '...'
        return obj.title or ''
    
    def validate(self, data: Dict[str, Any]) -> Dict[str, Any]:
        if 'title' in data:
            title = data.get('title', '').strip()
            if not title:
                raise serializers.ValidationError({'title': '제목은 필수입니다.'})
            data['title'] = title
        
        if 'url' in data:
            url = data.get('url', '').strip()
            if url and not (url.startswith('http://') or url.startswith('https://')):
                raise serializers.ValidationError({'url': 'URL은 http:// 또는 https://로 시작해야 합니다.'})
            data['url'] = url
        
        return data

위에 내용처럼 작성했고 

중요한 내용은 

- SerializerMethodField: 메서드 이름이 get_필드명 형식이면 자동 매칭
- validate(): 개별 validate_필드명() 대신 하나의 메서드에서 모든 검증 처리
- source_detail: 중첩된 객체 표현 (ForeignKey → 전체 객체)
- to_representation(): 직렬화 시 추가 데이터 포함

위의 내용처럼 validate를 통해 걸러지는 내용과 MethodField를 활용하여 코드를 한줄이라도 줄여서 늘어지지않게끔 조정했다. 

또한, 중첩된 객체 표현인 source_detail을 통해 전체내용을 한번 더 가져와 추후 내보낼때 편하도록 작성하였다. 

 

그리고 redis_services에는 redis의 로직을 작성 및 정리했는데 로직을 다 가져오기에는 너무 길어져서 간단요약을 하게되면

기본 클래스인 RedisService를 상속받아 공통 Redis 연결을 하고 

각 클래스마다 고유 키 접두사를 사용하여 데이터를 구분하여 수집 및 정리한다.

1. 중복 데이터 수집 방지 (DuplicatePreventionService)
이미 수집한 기사를 다시 수집하지 않도록 방지

동작 흐름
1. 서비스 초기화: Redis 연결 설정 및 키 접두사와 TTL 설정 (7일)
2. 중복 체크: 기사 URL을 키로 변환하여 Redis에 존재 여부 확인
3. 수집 완료 표시: 수집 완료 후 Redis에 키 저장 (TTL 7일 자동 설정)
4. 자동 정리: 7일 후 TTL 만료로 자동 삭제

 

 2.실시간 통계 집계 (RealtimeStatsService)
수집된 기사 수, 이벤트(redis에 저장된) 발생 시간 등을 실시간으로 추적

동작 흐름
1. 서비스 초기화: Redis 연결 설정 및 키 접두사와 TTL 설정 (24시간)
2. 카운터 증가: 전체 카운터와 시간대별 카운터 동시 증가
3. 이벤트 기록: 이벤트 발생 시각을 리스트에 추가 (최신 순서)
4. 리스트 관리: 최근 1000개만 유지하고 나머지 삭제
5. 자동 정리: 24시간 후 TTL 만료로 자동 삭제

 

3. API Rate Limiting (RateLimitService)
외부 API 호출 시 Rate Limit 관리

동작 흐름
1. 서비스 초기화: Redis 연결 설정 및 기본값 설정 (시간당 100회, 1시간 윈도우)
2. Rate Limit 체크: 현재 요청 수 조회 및 최대치 초과 여부 확인
3. 초과 시: 요청 거부 및 리셋 시간 반환
4. 미만 시: 카운터 증가 (첫 요청은 SETEX, 이후는 INCR)
5. 자동 리셋: TTL이 지나면 키가 자동 삭제되어 리셋

 

이렇게 작성했다. 

 

마지막으로 오늘 코드 작성하면서 멍청해서 질문했던 내용 중 기억에 남는것들을 정리 한다.

 

DUPLICATE_PREFIX는 뭔가요?

A: Redis 키의 접두사(Prefix)입니다. 키를 그룹화하고 관리하기 쉽게 만듭니다.

 
DUPLICATE_PREFIX = "collected:"
# 실제 키: "collected:news:https://example.com/article/123"

 

 

current가 왜 문자열인가요?

A: decode_responses=True 옵션 때문입니다.

 
self.client = redis.Redis(decode_responses=True)

# Redis에 저장: 숫자 1
# Redis에서 조회: 문자열 "1" (decode_responses=True)
current = self.client.get(key)  # "1" (str)
current_count = int(current) if current else 0  # 1 (int)
 
이건 사실 억울했다. 
난 정말.. redis가 어떤식으로 저장되는지 몰랐기에 "1"로 저장되는걸 몰랐다.. 왜냐하면 그전엔 idenfier가 고유url이나, api였기에 int()값이 안나올줄 알아서 고쳐야 하나 싶어서 물어봤던거기때문에 

 

"events"는 뭘 저장하나요?

A: 이벤트 발생 시각(타임스탬프)만 저장합니다. 실제 기사 데이터는 저장하지 않습니다.

 
def record_timestamp(self, event_name: str):
    timestamp = datetime.now().isoformat()  # "2024-01-01T15:00:00"
    self.client.lpush(key, timestamp)
 

Redis에 저장되는 값:
- "2024-01-01T15:00:00"
- "2024-01-01T15:01:00"
- "2024-01-01T15:02:00"

 

SerializerMethodField는 어떻게 동작하나요?

A: 메서드 이름이 get_필드명 형식이면 자동으로 매칭됩니다.

 
# 필드 정의
title_short = serializers.SerializerMethodField()
# 메서드 정의 (자동 매칭)
def get_title_short(self, obj):
    return obj.title[:50] + '...' if len(obj.title) > 50 else obj.title
 

명시적 지정도 가능:
title_short = serializers.SerializerMethodField('get_title_short')

 

오늘 한거 가지고 계속 살펴보면서 다음 코드도 작성해보고 다음엔 services.py랑 tasks.py를 작성하면서 맨땅에 헤딩해보겠다.