사용 예제

Atio의 다양한 사용 사례와 고급 기능을 살펴보세요.

기본 예제

간단한 데이터 저장

import atio
import pandas as pd

# 샘플 데이터 생성
data = {
    "id": [1, 2, 3, 4, 5],
    "name": ["Alice", "Bob", "Charlie", "Diana", "Eve"],
    "age": [25, 30, 35, 28, 32],
    "city": ["Seoul", "Busan", "Incheon", "Daegu", "Daejeon"],
    "salary": [50000, 60000, 70000, 55000, 65000]
}

df = pd.DataFrame(data)

# 다양한 형식으로 저장
atio.write(df, "employees.parquet", format="parquet")
atio.write(df, "employees.csv", format="csv", index=False)
atio.write(df, "employees.xlsx", format="excel", sheet_name="Employees")

대용량 데이터 처리

진행률 표시와 성능 모니터링

import numpy as np
import pandas as pd
import atio

# 대용량 데이터 생성 (100만 행)
large_data = {
    "id": range(1, 1000001),
    "value": np.random.randn(1000000),
    "category": np.random.choice(["A", "B", "C"], 1000000),
    "timestamp": pd.date_range("2024-01-01", periods=1000000, freq="S")
}

large_df = pd.DataFrame(large_data)

# 진행률 표시와 성능 모니터링 활성화
atio.write(
    large_df,
    "large_dataset.parquet",
    format="parquet",
    show_progress=True,
    verbose=True,
    compression='snappy'
)

데이터베이스 연동

PostgreSQL 데이터베이스 저장

import atio
import pandas as pd
from sqlalchemy import create_engine

# 데이터베이스 연결
engine = create_engine('postgresql://username:password@localhost:5432/mydb')

# 샘플 데이터
sales_data = {
    "order_id": [1001, 1002, 1003, 1004, 1005],
    "product_name": ["Laptop", "Mouse", "Keyboard", "Monitor", "Headphones"],
    "quantity": [1, 2, 1, 1, 3],
    "price": [1200, 25, 75, 300, 150],
    "order_date": pd.date_range("2024-01-01", periods=5)
}

sales_df = pd.DataFrame(sales_data)

# 데이터베이스에 안전하게 저장
atio.write(
    sales_df,
    format="sql",
    name="sales_orders",
    con=engine,
    if_exists="replace",
    index=False
)

스냅샷 기반 버전 관리

데이터 버전 관리

import atio
import pandas as pd
from datetime import datetime, timedelta

# 초기 데이터
initial_data = {
    "user_id": [1, 2, 3],
    "name": ["Alice", "Bob", "Charlie"],
    "status": ["active", "active", "inactive"]
}

df = pd.DataFrame(initial_data)

# 초기 스냅샷 생성
snapshot_id_1 = atio.write_snapshot(df, "users", format="parquet")
print(f"초기 스냅샷 생성: {snapshot_id_1}")

# 데이터 업데이트
df.loc[df['user_id'] == 3, 'status'] = 'active'
df = df.append({"user_id": 4, "name": "Diana", "status": "active"}, ignore_index=True)

# 업데이트된 스냅샷 생성
snapshot_id_2 = atio.write_snapshot(df, "users", format="parquet")
print(f"업데이트 스냅샷 생성: {snapshot_id_2}")

# 최신 데이터 읽기
latest_df = atio.read_table("users", format="parquet")
print("최신 데이터:")
print(latest_df)

# 특정 스냅샷 읽기
initial_df = atio.read_table("users", snapshot_id=snapshot_id_1, format="parquet")
print("초기 데이터:")
print(initial_df)

# 오래된 스냅샷 정리 (7일 이상)
deleted_count = atio.expire_snapshots("users", days=7, format="parquet")
print(f"삭제된 스냅샷 수: {deleted_count}")

Polars DataFrame 활용

고성능 데이터 처리

import atio
import polars as pl
import numpy as np

# Polars DataFrame 생성
polars_data = {
    "id": range(1, 10001),
    "value": np.random.randn(10000),
    "category": np.random.choice(["A", "B", "C", "D"], 10000),
    "score": np.random.uniform(0, 100, 10000)
}

polars_df = pl.DataFrame(polars_data)

# Polars DataFrame 저장
atio.write(
    polars_df,
    "polars_data.parquet",
    format="parquet",
    compression='snappy',
    show_progress=True
)

# 데이터 변환 후 저장
filtered_df = polars_df.filter(pl.col("score") > 50)
aggregated_df = filtered_df.group_by("category").agg(
    pl.col("value").mean().alias("avg_value"),
    pl.col("score").mean().alias("avg_score")
)

atio.write(aggregated_df, "aggregated_data.parquet", format="parquet")

에러 처리 및 복구

안전한 데이터 처리

import atio
import pandas as pd
import os

def safe_data_processing():
    try:
        # 원본 파일이 있는지 확인
        if os.path.exists("important_data.parquet"):
            print("원본 파일이 존재합니다.")

        # 데이터 처리 및 저장
        df = pd.DataFrame({
            "id": [1, 2, 3],
            "data": ["important", "data", "here"]
        })

        atio.write(df, "important_data.parquet", format="parquet")
        print("데이터가 안전하게 저장되었습니다.")

        # SUCCESS 파일 확인
        if os.path.exists("important_data.parquet_SUCCESS"):
            print("저장 완료 플래그가 생성되었습니다.")

    except Exception as e:
        print(f"오류 발생: {e}")
        print("원본 파일이 보존되었습니다.")

        # 임시 파일 정리
        temp_files = [f for f in os.listdir(".") if f.startswith("important_data.parquet.tmp")]
        for temp_file in temp_files:
            try:
                os.remove(temp_file)
                print(f"임시 파일 정리: {temp_file}")
            except:
                pass

# 안전한 데이터 처리 실행
safe_data_processing()

배치 처리

여러 파일 동시 처리

import atio
import pandas as pd
import os
from pathlib import Path

def process_multiple_files():
    # 처리할 파일 목록
    files_to_process = [
        {"name": "users", "data": pd.DataFrame({"id": [1, 2], "name": ["Alice", "Bob"]})},
        {"name": "products", "data": pd.DataFrame({"id": [1, 2], "product": ["Laptop", "Mouse"]})},
        {"name": "orders", "data": pd.DataFrame({"id": [1, 2], "amount": [100, 200]})}
    ]

    # 각 파일을 안전하게 처리
    for file_info in files_to_process:
        try:
            file_path = f"{file_info['name']}.parquet"
            atio.write(
                file_info['data'],
                file_path,
                format="parquet",
                show_progress=True
            )
            print(f"{file_info['name']} 파일 처리 완료")

        except Exception as e:
            print(f"{file_info['name']} 파일 처리 실패: {e}")
            continue

# 배치 처리 실행
process_multiple_files()

성능 최적화

압축 및 최적화 설정

import atio
import pandas as pd
import numpy as np

# 대용량 데이터 생성
large_df = pd.DataFrame({
    "id": range(1, 100001),
    "value": np.random.randn(100000),
    "text": ["sample text"] * 100000
})

# 다양한 압축 설정으로 성능 비교
compression_settings = [
    ("snappy", "fast_compression.parquet"),
    ("gzip", "balanced_compression.parquet"),
    ("brotli", "high_compression.parquet")
]

for compression, filename in compression_settings:
    print(f"\n{compression} 압축으로 저장 중...")
    atio.write(
        large_df,
        filename,
        format="parquet",
        compression=compression,
        show_progress=True,
        verbose=True
    )

    # 파일 크기 확인
    file_size = os.path.getsize(filename) / (1024 * 1024)  # MB
    print(f"파일 크기: {file_size:.2f} MB")

실제 사용 사례

데이터 파이프라인

import atio
import pandas as pd
from datetime import datetime, timedelta

class DataPipeline:
    def __init__(self, base_path="data"):
        self.base_path = Path(base_path)
        self.base_path.mkdir(exist_ok=True)

    def extract_data(self):
        """데이터 추출 (시뮬레이션)"""
        # 실제로는 API나 데이터베이스에서 데이터를 가져옴
        data = {
            "timestamp": pd.date_range(datetime.now(), periods=1000, freq="H"),
            "value": np.random.randn(1000),
            "source": ["api"] * 1000
        }
        return pd.DataFrame(data)

    def transform_data(self, df):
        """데이터 변환"""
        # 시간별 집계
        df['hour'] = df['timestamp'].dt.hour
        df['day'] = df['timestamp'].dt.date

        # 일별 통계
        daily_stats = df.groupby('day').agg({
            'value': ['mean', 'std', 'min', 'max']
        }).round(2)

        return daily_stats

    def load_data(self, df, table_name):
        """데이터 로드"""
        # 스냅샷 생성
        snapshot_id = atio.write_snapshot(
            df,
            table_name,
            format="parquet"
        )

        # 최신 데이터도 별도 저장
        latest_path = self.base_path / f"{table_name}_latest.parquet"
        atio.write(df, str(latest_path), format="parquet")

        return snapshot_id

    def run_pipeline(self):
        """파이프라인 실행"""
        print("데이터 파이프라인 시작...")

        # 1. 데이터 추출
        raw_data = self.extract_data()
        print(f"추출된 데이터: {len(raw_data)} 행")

        # 2. 데이터 변환
        processed_data = self.transform_data(raw_data)
        print(f"처리된 데이터: {len(processed_data)} 행")

        # 3. 데이터 로드
        snapshot_id = self.load_data(processed_data, "daily_stats")
        print(f"스냅샷 생성 완료: {snapshot_id}")

        # 4. 오래된 스냅샷 정리
        deleted_count = atio.expire_snapshots("daily_stats", days=30)
        print(f"정리된 스냅샷: {deleted_count}개")

        print("파이프라인 완료!")

# 파이프라인 실행
pipeline = DataPipeline()
pipeline.run_pipeline()