Home [BoostCamp AI Tech / Final] Day88 - User 수집, Cloud storage
Post
Cancel

[BoostCamp AI Tech / Final] Day88 - User 수집, Cloud storage

Final Project : User 수집, Cloud storage


목차

  • User producer & consumer 설계 변경
  • Cloud Storage 설정 및 연결 코드 개발

User producer & Consumer 설계 변경

User data의 확보

지난번에 활용한 producer-consumer 코드로 repository 수집이 완료되었습니다. 이 repository를 활용해서 학습에 사용할 user 데이터를 구축해야 합니다. 여기서 문제는 단순히 아무 유저 정보를 가져올 수도 없을 뿐더러 저희가 갖고 있는 아이템에서 선택한 값이 없다면 기반 모델로 선정한 RecVAE를 학습할 수 없는 문제가 있습니다.
이런 문제를 해결하고자 학습 데이터에 한해서는 역으로 데이터를 추출하는 방식을 활용하기로 했습니다.

일반적인 방식도 user-item matrix를 생성하긴 합니다.

일반적인 방식은 아이템이 많고 서비스 자체에서 user 데이터를 확보한 상태에서 학습을 진행합니다. 하지만 저희의 서비스는 난잡하게 있는 repository 중 awesome에서 선정한 repository를 위주로 선정했기 때문에 random user의 아이템 hit ratio가 굉장히 낮아질 위험이 있었습니다. 최악의 경우 user-item matrix의 모든 값이 0일 수도 있다는 것입니다. 이를 방지하고자 반드시 최소 1개는 hit가 되는 user-item matrix를 구축하기 위해 갖고 있는 repository에서 역으로 user를 추출하는 방식을 사용하기로 했습니다.
저희가 hit를 하는 기준은 repository star이므로 repository의 star user list를 가져와서 학습용 user군을 확보하기로 했습니다. 각 repository별로 최대 100명씩 확보하기로 했고 최종적으로 중복된 유저를 제외하고 약 35만명의 user군이 확보될 것으로 확인했습니다.

User producer & Repository 정보 update

위의 방법을 사용하기 위해 아래 사진과 같은 방식으로 데이터를 수집하게 설계했습니다.

여전히 Github oAuth token이 4개라는 제한이 있기 때문에 안정적인 수집을 위해서는 이전에 사용한 ThreadPoolExecutor로 API request를 병렬처리하기에 약간의 리스크가 있었습니다. 따라서 기존 repository에 저장한 star_pages 필드를 활용해서 star_pages를 10으로 제한하고 각 페이지당 100명으로 값을 가져오게 했습니다.

API 호출의 효율성을 높이고 user-item matrix를 만들기 위해 필요한 repository star user list를 확보하고자 동시에 update 정보를 consumer로 같이 보내줘야 했습니다. 따라서 producer는 consumer에게 다음과 같은 방식으로 데이터를 전송하게 설정했습니다.

1
2
3
4
5
6
7
8
9
10
{
    "insert": {
        "uid": user unique id,
        "login": user github id
    },
    "update": {
        "rid": repository unique id,
        "uid": star user id
    }
}

이렇게 보내면 consumer는 데이터의 조건 분기로 update 정보와 insert 정보를 동시에 처리할 수 있다는 장점이 있고 consumer의 확장성이 늘어날 수 있습니다.

Consumer 설계 변경

기존의 consumer는 단순 insert만 처리하게 만들었습니다. insert 함수가 사용자가 설정한 주기에 맞춰 수행되어 batch 단위로 DB에 저장되는 방식이었습니다. 이 consumer의 문제점은 update가 발생할 경우 처리하기 어렵다는 문제가 있었습니다. 따라서 update 정보에 대한 확장이 필요했습니다.

1
2
3
4
5
6
7
8
9
10
11
12
# 기존의 consumer 데이터 처리 부분
while True:
        schedule.run_pending()
        msg = q.get(isBlocking=False)

        if msg is not None:
            msg = json.loads(msg)

            print(msg)
            print()

            batch_list.append(msg)

이를 해결하고자 producer에서 insert와 update를 구분해서 정보를 보내고 해당 정보의 유무에 따라 consumer는 데이터를 처리하게 됩니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 수정된 consumer 데이터 처리 부분
while True:
        schedule.run_pending()
        msg = q.get(isBlocking=False)

        if msg is not None:
            msg = json.loads(msg)
            insert = msg["insert"]

            if "update" in msg:
                update_data = msg["update"]
                rid = update_data["rid"]
                uid = update_data["uid"]
                conn_repo.update_one({"rid": rid}, {"$push": {"star_user_list": uid}})

            if insert:
                batch_list.append(insert)

            print(msg)
            print()

아직 수정할 부분들이 있습니다.

  • 어차피 데이터는 insert와 update를 모두 갖고 있으므로 update 처리도 insert 처럼 dictionary의 비어있음 여부로 처리하는 것이 안정적
  • update를 특정 데이터에 대해서 진행하는 것이 아닌 들어오는 데이터에 맞춰 알아서 처리하는 방식으로 확장성을 높이면 좋을 듯
    • condition을 json에 추가로 작성해도 좋을까?

Cloud Storage 설정 및 연결 코드 개발

모델의 성능을 저장하고 보존하기 위해 model.pt 파일 정보를 저장할 필요가 있었습니다. 빠른 inference와 동시에 높은 성능을 확보하려면 batch 단위로 학습이 진행되는 기록들을 저장하고 inference에서는 높은 성능의 모델만 가져오면 되는 것입니다.
이를 해결하고자 다음과 같은 두가지 방식을 고안했습니다.

  1. MongoDB에 model.pt의 값을 dictionary로 저장
  2. GCP의 Cloud Storage에 저장하고 load하는 방식

이때, 1번 과정은 시도를 해봤을 때, model.pt 내부의 n차원 torch.tensor 를 dictionary로 변경하는 과정에서 문제가 발생하였고 2번 방식을 사용하기로 채택했습니다.

Cloud Storage 설정

Cloud Storage 설정 자체는 어렵지 않았습니다. 블로그를 참고하여 설정했고 이를 사용하는 코드를 데이터 사이언티스트 분들이 원활하게 사용하는 코드를 개발하는 것에 좀 더 집중했습니다.

저장 및 로드 코드 개발

google cloud에서 제공하는 google-cloud-storage 라이브러리를 사용하면 원활하게 저장과 로드가 가능합니다. 하지만 사용과정에서 문제가 발생하는 것을 막고 MongoDB와의 저장, 로드도 병행해야하므로 코드를 atomic하게 작성할 필요가 있었습니다.
따라서 file_to_storage 함수와 download_file 함수를 만들어서 팀원들과 공유했습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from pymongo import MongoClient
from google.cloud import storage
import tarfile
from datetime import datetime

def file_to_storage(file_path, model_name, score, tag="", db_conn=None, file_name=None, tar_zip=False):
    if file_name is None:
        file_name = file_path.split("/")[-1]

    if tar_zip is True:
        tar_name = file_name.split(".")[0] + ".tar"
        with tarfile.open(file_name.split(".")[0] + ".tar") as f:
            f.add(file_path)
        file_name = tar_name

    storage_client = storage.Client()
    bucket = storage_client.bucket("model-save")
    blob = bucket.blob(file_name)

    blob.upload_from_filename(file_path)

    if db_conn is not None:
        db_conn.insert_one({
            "name": model_name,
            "bucket_name": "model-save",
            "file_name": file_name,
            "time": datetime.now(),
            "score": score,
            "tag": tag
        })

    print("model save complete!")

file_to_storage 함수의 기본 로직

  1. file_name을 지정하지 않으면 원본 파일명으로 저장
  2. tar 압축을 요청하는 영우 tar 파일로 압축
  3. cloud storage 연결, bucket, blob 세팅
  4. cloud storage에 파일 업로드
  5. 저장한 model의 이름과 태그, 버킷명, 저장 파일 이름등 모델의 메타데이터를 MongoDB에 저장
1
2
3
4
5
6
7
8
9
10
11
12
13
14
def download_file(model_name, db_conn, tag="", latest=True):
    
    if latest is True:
        ret = list(db_conn.find({"name": model_name, "tag":tag}).sort("time", -1).limit(1))[0]
    else:
        ret = list(db_conn.find_one({"name":model_name, "tag":tag}))[0]
    
    source_blob_name = ret["file_name"]
    storage_client = storage.Client()
    bucket = storage_client.bucket(ret["bucket_name"])
    blob = bucket.blob(source_blob_name)
    
    blob.download_to_filename(source_blob_name)
    print(f"file download complete: {source_blob_name}")

download_file 함수의 기본 로직

  1. model_nametag를 활용해서 MongoDB에 저장된 내용을 탐색
    1. 이때, tag는 설정하지 않으면 자동적으로 공백 탐색
  2. 이후 cloud storage dusruf, bucket, blob 세팅
  3. 파일 다운로드

보완할 점

이번 개발 일지에서 보완할 점은 다음과 같습니다.

  • 좀 더 확장성 높은 consumer 개발
  • tar 파일 압축했는데 사이즈가 그대로,….???
  • 지난번 redis producer-consumer에서 발생하는 문제로 너무 빠른 속도로 데이터가 전달되는 경우 데이터 유실이 발생하는 것으로 보이는데, 이게 진짜 유실인지 아니면 저장 과정에서 시간차가 있는건지 확인해 볼 필요가 있음

Reference

This post is licensed under CC BY 4.0 by the author.

[BoostCamp AI Tech / Final] Day85 - Redis Producer Consumer

[BoostCamp AI Tech / Final] Day91 - Airflow setting 및 배치 파이프라인 설계

Comments powered by Disqus.