본문 바로가기

DB/PostgreSQL

[ PostgreSQL ] 부하 테스트 하기

반응형

1. 수정 조건 요약

항목 설정 내용

Database mydatabase
Schema loadtest
Table type1
부하 강화 INSERT마다 WAL flush + I/O 집중
추가 부하 - 큰 TEXT 필드 2개- 무작위 UNIQUE 값- 인덱스 추가- 단일 COMMIT

2. 부하 유도 전략 추가

  1. schema 별도 생성 및 사용
  2. 긴 텍스트 필드 2개 (bio, payload) 추가
  3. username, email, payload_hash 에 인덱스
  4. 1건당 INSERT + COMMIT으로 WAL flush 매번 유도

3. 최종 코드

import psycopg2
import random
import string
from datetime import datetime
import time
import hashlib

DB_HOST = 'localhost'
DB_NAME = 'mydatabase'
DB_USER = 'postgres'
DB_PASS = 'your_password'

SCHEMA_NAME = 'loadtest'
TABLE_NAME = 'type1'

def random_text(min_len=2048, max_len=8192):
    length = random.randint(min_len, max_len)
    return ''.join(random.choices(string.ascii_letters + string.digits + ' ', k=length))

def random_email():
    return ''.join(random.choices(string.ascii_lowercase, k=10)) + "@example.com"

def random_username():
    return ''.join(random.choices(string.ascii_lowercase, k=8))

def create_schema_and_table(cur):
    cur.execute(f"""
        CREATE SCHEMA IF NOT EXISTS {SCHEMA_NAME};

        DROP TABLE IF EXISTS {SCHEMA_NAME}.{TABLE_NAME};

        CREATE TABLE {SCHEMA_NAME}.{TABLE_NAME} (
            id SERIAL PRIMARY KEY,
            username TEXT,
            email TEXT,
            bio TEXT,
            payload TEXT,
            payload_hash TEXT,
            created_at TIMESTAMP DEFAULT now()
        );

        CREATE INDEX idx_username ON {SCHEMA_NAME}.{TABLE_NAME}(username);
        CREATE INDEX idx_email ON {SCHEMA_NAME}.{TABLE_NAME}(email);
        CREATE INDEX idx_payload_hash ON {SCHEMA_NAME}.{TABLE_NAME}(payload_hash);
    """)

def insert_one_by_one(conn, cur, count=100_000):
    for i in range(count):
        username = random_username()
        email = random_email()
        bio = random_text()
        payload = random_text()
        payload_hash = hashlib.sha256(payload.encode()).hexdigest()
        now = datetime.now()

        cur.execute(
            f"""INSERT INTO {SCHEMA_NAME}.{TABLE_NAME}
                (username, email, bio, payload, payload_hash, created_at)
                VALUES (%s, %s, %s, %s, %s, %s)""",
            (username, email, bio, payload, payload_hash, now)
        )
        conn.commit()  # 1건당 commit → WAL flush + I/O

        if (i + 1) % 1000 == 0:
            print(f"{i + 1} records inserted...")

def main():
    conn = psycopg2.connect(
        host=DB_HOST,
        dbname=DB_NAME,
        user=DB_USER,
        password=DB_PASS
    )
    cur = conn.cursor()

    print("Creating schema and table...")
    create_schema_and_table(cur)
    conn.commit()

    print("Inserting rows with high I/O cost...")
    start = time.time()
    insert_one_by_one(conn, cur, count=100_000)
    end = time.time()

    print(f"Total time: {end - start:.2f} seconds")

    cur.close()
    conn.close()

if __name__ == '__main__':
    main()

4. 참고 팁

  • payload_hash는 긴 텍스트를 SHA-256으로 해시해 저장 → CPU 사용도 유도
  • 각 INSERT 이후 COMMIT으로 WAL flush 발생
  • 인덱스가 세 개라서 각 INSERT마다 인덱스도 갱신

이대로 실행하면 PostgreSQL에 WAL 쓰기, 디스크 쓰기, 인덱스 정렬, CPU 처리 등 다양한 부하가 동시에 발생합니다.

반응형