일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | ||
6 | 7 | 8 | 9 | 10 | 11 | 12 |
13 | 14 | 15 | 16 | 17 | 18 | 19 |
20 | 21 | 22 | 23 | 24 | 25 | 26 |
27 | 28 | 29 | 30 | 31 |
- Set
- 후기
- 사이킷런
- 자연어처리
- 딕셔너리
- 데이터사이언스 스쿨
- 머신러닝
- 스크랩
- 재귀함수
- 아이펠
- NLP
- 카카오
- 클래스
- 추천시스템
- TensorFlow
- numpy
- 데이터사이언티스트
- AI
- 파이썬코딩도장
- 함수
- Python
- 속성
- 제어문
- 딥러닝
- AIFFEL
- 제로베이스 데이터사이언스
- 파이썬
- 코딩도장
- 데이터분석
- 기사
- Today
- Total
뮤트 개발일지
AIFFEL 아이펠 25일차 본문
폐렴 환자 구분하기
필요한 모듈 임포트
import os, re
import random, math
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings(action='ignore')
변수 설정하기
# 데이터 로드할 때 빠르게 로드할 수 있도록하는 설정 변수
AUTOTUNE = tf.data.experimental.AUTOTUNE
# X-RAY 이미지 사이즈 변수
IMAGE_SIZE = [180, 180]
# 데이터 경로 변수
ROOT_PATH = os.path.join(os.getenv('HOME'), 'aiffel')
TRAIN_PATH = ROOT_PATH + '/chest_xray/data/train/*/*' # *은 모든 디렉토리와 파일을 의미합니다.
VAL_PATH = ROOT_PATH + '/chest_xray/data/val/*/*'
TEST_PATH = ROOT_PATH + '/chest_xray/data/test/*/*'
# 프로젝트를 진행할 때 필요한 변수
BATCH_SIZE = 16
EPOCHS = 25
데이터 가져오기
train_filenames = tf.io.gfile.glob(TRAIN_PATH)
test_filenames = tf.io.gfile.glob(TEST_PATH)
val_filenames = tf.io.gfile.glob(VAL_PATH)
print(len(train_filenames))
print(len(test_filenames))
print(len(val_filenames))
>>>
5216
624
16
val 데이터가 너무 적어 train에서 더 가져오도록 한다.
# train 데이터와 validation 데이터를 모두 filenames에 담는다
filenames = tf.io.gfile.glob(TRAIN_PATH)
filenames.extend(tf.io.gfile.glob(VAL_PATH))
# 모아진 filenames를 8:2로 나눈다
train_size = math.floor(len(filenames)*0.8)
random.seed(8)
random.shuffle(filenames)
train_filenames = filenames[:train_size]
val_filenames = filenames[train_size:]
print(len(train_filenames))
print(len(val_filenames))
>>>
4185
1047
정상 이미지와 폐렴 이미지 확인하기 (경로를 통해 확인할 수 있다.)(경로에 정상과 폐렴 따로 구분해놓았었음)
COUNT_NORMAL = len([filename for filename in train_filenames if "NORMAL" in filename])
print(f"Normal images count in training set: {COUNT_NORMAL}")
COUNT_PNEUMONIA = len([filename for filename in train_filenames if "PNEUMONIA" in filename])
print(f"Pneumonia images count in training set: {COUNT_PNEUMONIA}")
>>>
Normal images count in training set: 1072
Pneumonia images count in training set: 3113
tf.data 인스턴스 만들기
train_list_ds = tf.data.Dataset.from_tensor_slices(train_filenames)
val_list_ds = tf.data.Dataset.from_tensor_slices(val_filenames)
train, val data 개수 확인(왜 하는 거지.. 위에서 len으로 확인했는데..)
TRAIN_IMG_COUNT = tf.data.experimental.cardinality(train_list_ds).numpy()
print(f"Training images count: {TRAIN_IMG_COUNT}")
VAL_IMG_COUNT = tf.data.experimental.cardinality(val_list_ds).numpy()
print(f"Validating images count: {VAL_IMG_COUNT}")
>>>
Training images count: 4185
Validating images count: 1047
라벨 만들기
# 파일 경로의 끝에서 두번째 부분을 확인하면 양성과 음성을 구분할 수 있습니다
def get_label(file_path):
parts = tf.strings.split(file_path, os.path.sep)
return parts[-2] == "PNEUMONIA" # 폐렴이면 양성(True), 노말이면 음성(False)
제각각인 이미지 사이즈 통일하기
# 이미지를 알맞은 형식으로 바꿉니다.
def decode_img(img):
img = tf.image.decode_jpeg(img, channels=3) # 이미지를 uint8 tensor로 수정
img = tf.image.convert_image_dtype(img, tf.float32) # float32 타입으로 수정
img = tf.image.resize(img, IMAGE_SIZE) # 이미지 사이즈를 IMAGE_SIZE로 수정
return img
# 이미지 파일의 경로를 입력하면 이미지와 라벨을 읽어옵니다.
def process_path(file_path):
label = get_label(file_path) # 라벨 검출
img = tf.io.read_file(file_path) # 이미지 읽기
img = decode_img(img) # 이미지를 알맞은 형식으로 수정
return img, label
train, validation 데이터셋 만들기
train_ds = train_list_ds.map(process_path, num_parallel_calls=AUTOTUNE)
val_ds = val_list_ds.map(process_path, num_parallel_calls=AUTOTUNE)
이미지가 잘 리사이즈 되었는지 확인하기
for image, label in train_ds.take(1):
print("Image shape: ", image.numpy().shape)
print("Label: ", label.numpy())
>>>
Image shape: (180, 180, 3)
Label: False
test 데이터셋 만들기
test_list_ds = tf.data.Dataset.list_files(TEST_PATH)
TEST_IMAGE_COUNT = tf.data.experimental.cardinality(test_list_ds).numpy()
test_ds = test_list_ds.map(process_path, num_parallel_calls=AUTOTUNE)
test_ds = test_ds.batch(BATCH_SIZE)
print(TEST_IMAGE_COUNT)
>>> 624
def prepare_for_training(ds, shuffle_buffer_size=1000):
ds = ds.shuffle(buffer_size=shuffle_buffer_size)
ds = ds.repeat()
ds = ds.batch(BATCH_SIZE)
ds = ds.prefetch(buffer_size=AUTOTUNE)
return ds
train_ds = prepare_for_training(train_ds)
val_ds = prepare_for_training(val_ds)
shuffle(): 고정 크기 버퍼를 유지하고 해당 버퍼에서 무작위로 균일하게 다음 요소를 선택한다.
repeat(): epoch를 진행하면서 여러 번 데이터셋을 불러오는데, 이 때 repeat()을 사용한 데이터셋의 경우 여러번 데이터셋을 사용할 수 있게 해준다. 예를 들어, 100개의 데이터를 10번 반복하면 1000개의 데이터가 필요한데, repeat()을 사용하면 자동으로 데이터를 맞춘다.
batch(): BATCH_SIZE에서 정한 만큼의 배치로 주어진다. 예를 들어, 100개의 데이터를 10개의 배치로 나누게 되면 각 배치에는 10개의 데이터로 나뉘게 된다.
prefectch(): 학습 데이터를 나눠서 읽어온다. 첫 번째 데이터를 GPU에서 학습하는 동안 두 번째 데이터를 CPU에서 준비할 수 있어 리소스의 유휴 상태를 줄일 수 있다.
데이터 시각화
# 이미지 배치를 입력하면 여러장의 이미지를 보여줍니다.
def show_batch(image_batch, label_batch):
plt.figure(figsize=(10,10))
for n in range(BATCH_SIZE):
ax = plt.subplot(4,math.ceil(BATCH_SIZE/4),n+1)
plt.imshow(image_batch[n])
if label_batch[n]:
plt.title("PNEUMONIA")
else:
plt.title("NORMAL")
plt.axis("off")
image_batch, label_batch = next(iter(train_ds))
show_batch(image_batch.numpy(), label_batch.numpy())
CNN 모델링
def conv_block(filters):
block = tf.keras.Sequential([
tf.keras.layers.SeparableConv2D(filters, 3, activation='relu', padding='same'),
tf.keras.layers.SeparableConv2D(filters, 3, activation='relu', padding='same'),
tf.keras.layers.BatchNormalization(),
tf.keras.layers.MaxPool2D()
])
return block
def dense_block(units, dropout_rate):
block = tf.keras.Sequential([
tf.keras.layers.Dense(units, activation='relu'),
tf.keras.layers.BatchNormalization(),
tf.keras.layers.Dropout(dropout_rate)
])
return block
def build_model():
model = tf.keras.Sequential([
tf.keras.Input(shape=(IMAGE_SIZE[0], IMAGE_SIZE[1], 3)),
tf.keras.layers.Conv2D(16, 3, activation='relu', padding='same'),
tf.keras.layers.Conv2D(16, 3, activation='relu', padding='same'),
tf.keras.layers.MaxPool2D(),
conv_block(32),
conv_block(64),
conv_block(128),
tf.keras.layers.Dropout(0.2),
conv_block(256),
tf.keras.layers.Dropout(0.2),
tf.keras.layers.Flatten(),
dense_block(512, 0.7),
dense_block(128, 0.5),
dense_block(64, 0.3),
tf.keras.layers.Dense(1, activation='sigmoid')
])
return model
데이터 imbalance 처리
imbalance를 해결하는 방법으로 Weight balancing을 사용한다. Weight balancing은 training set의 각 데이터에서 loss를 계산할 때 특정 클래스의 데이터에서 더 큰 loss 값을 갖도록 가중치를 부여하는 방법이다. keras의 model.fit()을 호출할 때 파라미터로 넘기는 class_weight에 이러한 클래스별 가중치를 세팅할 수 있도록 지원하고 있다.
아래 코드에서 weight_for_0은 정상 이미지에 사용할 가중치를, weight_for_1은 폐렴 이미지에 사용할 가중치를 세팅한다. 이 가중치들은 정상과 폐렴 전체 데이터 건수에 반비례하도록 설정된다.
weight_for_0 = (1 / COUNT_NORMAL)*(TRAIN_IMG_COUNT)/2.0
weight_for_1 = (1 / COUNT_PNEUMONIA)*(TRAIN_IMG_COUNT)/2.0
class_weight = {0: weight_for_0, 1: weight_for_1}
print('Weight for NORMAL: {:.2f}'.format(weight_for_0))
print('Weight for PNEUMONIA: {:.2f}'.format(weight_for_1))
>>>
Weight for NORMAL: 1.95
Weight for PNEUMONIA: 0.67
모델 훈련
with tf.device('/GPU:0'):
model = build_model()
METRICS = [
'accuracy',
tf.keras.metrics.Precision(name='precision'),
tf.keras.metrics.Recall(name='recall')
]
model.compile(
optimizer='adam',
loss='binary_crossentropy',
metrics=METRICS
)
with tf.device('/GPU:0'):
history = model.fit(
train_ds,
steps_per_epoch=TRAIN_IMG_COUNT // BATCH_SIZE,
epochs=EPOCHS,
validation_data=val_ds,
validation_steps=VAL_IMG_COUNT // BATCH_SIZE,
class_weight=class_weight,
)
결과 확인
fig, ax = plt.subplots(1, 4, figsize=(20, 3))
ax = ax.ravel()
for i, met in enumerate(['precision', 'recall', 'accuracy', 'loss']):
ax[i].plot(history.history[met])
ax[i].plot(history.history['val_' + met])
ax[i].set_title('Model {}'.format(met))
ax[i].set_xlabel('epochs')
ax[i].set_ylabel(met)
ax[i].legend(['train', 'val'])
loss, accuracy, precision, recall = model.evaluate(test_ds)
print(f'Loss: {loss},\nAccuracy: {accuracy},\nPrecision: {precision},\nRecall: {recall}')
'AIFFEL' 카테고리의 다른 글
AIFFEL 아이펠 27일차 (0) | 2022.02.10 |
---|---|
AIFFEL 아이펠 26일차 (0) | 2022.02.04 |
AIFFEL 아이펠 24일차 (0) | 2022.02.03 |
AIFFEL 아이펠 23일차 (0) | 2022.02.03 |
AIFFEL 아이펠 22일차 (0) | 2022.01.26 |