티스토리 뷰
기술적인 측면이 많아서 애먹었던 챕터ㅠ
지금 당장은 판다스와 사이킷런으로도 가능하지만 대규모 데이터셋을 다루게 되면 꼭 써야하기에 노력중..
# 파이썬 ≥3.5 필수
import sys
assert sys.version_info >= (3, 5)
# 사이킷런 ≥0.20 필수
import sklearn
assert sklearn.__version__ >= "0.20"
# 텐서플로 ≥2.0 필수
import tensorflow as tf
from tensorflow import keras
assert tf.__version__ >= "2.0"
# 공통 모듈 임포트
import numpy as np
import os
%matplotlib inline
import matplotlib as mpl
import matplotlib.pyplot as plt
mpl.rc('axes', labelsize=14)
mpl.rc('xtick', labelsize=12)
mpl.rc('ytick', labelsize=12)
plt.style.use('default')
# 노트북 실행 결과를 동일하게 유지하기 위해
np.random.seed(42)
X = tf.range(10)
# 주어진 텐서에서 데이터셋을 형성한다.
dataset = tf.data.Dataset.from_tensor_slices(X)
dataset
<TensorSliceDataset element_spec=TensorSpec(shape=(), dtype=tf.int32, name=None)>
from_tensor_slices() 는 텐서를 받아 X의 각 원소가 하나의 아이템으로 표현되는 데이터셋을 만들어준다.
이러면 데이터셋에 있는 아이템을 순회하는 것이 가능해진다.
for item in dataset:
print(item)
tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(1, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(3, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)
tf.Tensor(5, shape=(), dtype=int32)
tf.Tensor(6, shape=(), dtype=int32)
tf.Tensor(7, shape=(), dtype=int32)
tf.Tensor(8, shape=(), dtype=int32)
tf.Tensor(9, shape=(), dtype=int32)
연쇄 변환¶
데이터셋이 준비되면 변환 메서드를 호출하여 여러 종류의 변환을 수행할 수 있다.
각 메서드는 새로운 데이터셋을 반환하며 이 메서드들을 연결해줄 수 있다.
# 전체 데이터셋을 3번 반복하고 크기가 7인 배치를 생성한다.
dataset1 = dataset.repeat(3).batch(7)
for item in dataset1:
print(item)
tf.Tensor([0 1 2 3 4 5 6], shape=(7,), dtype=int32)
tf.Tensor([7 8 9 0 1 2 3], shape=(7,), dtype=int32)
tf.Tensor([4 5 6 7 8 9 0], shape=(7,), dtype=int32)
tf.Tensor([1 2 3 4 5 6 7], shape=(7,), dtype=int32)
tf.Tensor([8 9], shape=(2,), dtype=int32)
# 이러면 3번 반복하다가 길이가 모자란 마지막 배치를 버려준다.
dataset2 = dataset.repeat(3).batch(7, drop_remainder=True)
for item in dataset2:
print(item)
tf.Tensor([0 1 2 3 4 5 6], shape=(7,), dtype=int32)
tf.Tensor([7 8 9 0 1 2 3], shape=(7,), dtype=int32)
tf.Tensor([4 5 6 7 8 9 0], shape=(7,), dtype=int32)
tf.Tensor([1 2 3 4 5 6 7], shape=(7,), dtype=int32)
# map() 메서드를 호출하면 아이템을 변환시켜준다.
# 다만 map()에 들어가는 함수는 tf.function으로 변환 가능해야 한다.
dataset3 = dataset.map(lambda x: x * 2)
for item in dataset3:
print(item)
tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)
tf.Tensor(6, shape=(), dtype=int32)
tf.Tensor(8, shape=(), dtype=int32)
tf.Tensor(10, shape=(), dtype=int32)
tf.Tensor(12, shape=(), dtype=int32)
tf.Tensor(14, shape=(), dtype=int32)
tf.Tensor(16, shape=(), dtype=int32)
tf.Tensor(18, shape=(), dtype=int32)
# 값을 필터링 하는것도 가능
dataset4 = dataset.filter(lambda x: x < 5)
for item in dataset4:
print(item)
tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(1, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(3, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)
# 앞에서부터 n개만 가져올 때
for item in dataset.take(3):
print(item)
tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(1, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
데이터 셔플링¶
경사 하강법은 훈련셋에 있는 샘플이 독립적이고 동일한 분포일 때 최고 성능을 발휘한다.
이를 위해 shuffle() 메서드를 사용하여 샘플을 섞어주자.
먼저 원본 데이터셋의 아이템을 차례대로 buffer_size만큼 추출하여 버퍼에 채운다.(셔플링)
그다음 새로운 아이템이 요청되면 이 버퍼에서 랜덤하게 하나를 꺼내 반환한다.
그리고 추출한 만큼 원본 셋에서 새로운 아이템을 추출하여 비워진 버퍼를 매꾼다.
버퍼의 크기가 충분해야 셔플링의 효과를 극대화할 수 있다 (다만 메모리 크기를 넘기진 말자)
dataset = tf.data.Dataset.range(10).repeat(3) # 0~9 까지 3번 반복된 dataset을 생성한다.
dataset = dataset.shuffle(buffer_size=5, seed=42).batch(7)
for item in dataset:
print(item)
tf.Tensor([0 2 3 6 7 9 4], shape=(7,), dtype=int64)
tf.Tensor([5 0 1 1 8 6 5], shape=(7,), dtype=int64)
tf.Tensor([4 8 7 1 2 3 0], shape=(7,), dtype=int64)
tf.Tensor([5 4 2 7 8 9 9], shape=(7,), dtype=int64)
tf.Tensor([3 6], shape=(2,), dtype=int64)
다만 메모리 용량보다 큰 대규모 데이터셋은 버퍼가 데이터셋보다 작아 간단한 셔플링으로는 버겁다.
이를 위해 원본 데이터 자체를 미리 섞어두는 것이 권장된다. 그래야 원본 데이터가 섞인 상태에서 에포크마다
한번 더 섞기때문에 동일한 순서가 반복되지 않아 편향이 추가되지 않는다.
추가적으로 원본 데이터를 여러 파일로 나눈 다음에 무작위로 읽는 방식이 있다.
그래도 동일한 파일에 있는 샘플들은 여전히 함께 처리되기에 파일 여러 개를 무작위로 선택하고 파일에서
동시에 읽은 레코드를 돌아가면서 반환하고 shuffle() 메서드를 추가하는 방식이 있다.
우선 데이터셋을 받아오고 각 셋을 여러 개로 나눈다.
def save_to_multiple_csv_files(data, name_prefix, header=None, n_parts=10):
housing_dir = os.path.join("datasets", "housing")
os.makedirs(housing_dir, exist_ok=True)
path_format = os.path.join(housing_dir, "my_{}_{:02d}.csv")
filepaths = []
m = len(data)
for file_idx, row_indices in enumerate(
np.array_split(np.arange(m), n_parts)):
part_csv = path_format.format(name_prefix, file_idx)
filepaths.append(part_csv)
# 현재 filepath에 대하여 data를 나누어 작성한다.
with open(part_csv, "wt", encoding="utf-8") as f:
if header is not None:
f.write(header)
f.write("\n")
for row_idx in row_indices:
#repr로 하면 문자열로 된 객체가 반환되지만 eval()에 넣으면 다시 원본 객체가 튀어나온다.
f.write(",".join([repr(col) for col in data[row_idx]]))
f.write("\n")
return filepaths
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
housing = fetch_california_housing()
X_train_full, X_test, y_train_full, y_test = train_test_split(
housing.data, housing.target.reshape(-1, 1), random_state=42)
X_train, X_valid, y_train, y_valid = train_test_split(
X_train_full, y_train_full, random_state=42)
train_data = np.c_[X_train, y_train]
valid_data = np.c_[X_valid, y_valid]
test_data = np.c_[X_test, y_test]
header_cols = housing.feature_names + ["MedianHouseValue"]
header = ",".join(header_cols)
train_filepaths = save_to_multiple_csv_files(train_data, "train", header, n_parts=20)
valid_filepaths = save_to_multiple_csv_files(valid_data, "valid", header, n_parts=10)
test_filepaths = save_to_multiple_csv_files(test_data, "test", header, n_parts=10)
이걸 실행하면 train_filepaths에 훈련 파일 경로를 담은 리스트가 생성되고
train_filepaths = ['datasets/housing/my_train_00.csv', ...]
이제 이런 파일 경로가 담긴 데이터셋을 형성한다.
(참고로 파일 경로가 shuffle 된 채로 나온다. 싫다면 shuffle=False 로 두자)
filepath_dataset=tf.data.Dataset.list_files(train_filepaths, seed=42)
그다음 interleave() 메서드를 호출하여 한 번에 다섯 개의 파일을 한 줄씩 번갈아 읽는다.
interleave() 메서드는 각 TextLineDataset에서 각 파일을 읽는 방식으로 진행되고
이는 모든 경로가 소진될 때 까지 진행된다. (가장 긴 파일의 끝은 무시될 수 있다)
n_readers = 5
#interleave() 메서드가 dataset을 따라 lambda 함수를 적용시킨다.
dataset = filepath_dataset.interleave(
lambda filepath: tf.data.TextLineDataset(filepath).skip(1),
cycle_length=n_readers)
기본적으로 interleave() 메서드가 병렬화를 사용하지 않아서 num_parallel_calls 파라미터에 스레드 개수를 지정한다.
이 파라미터에 tf.data.experimental.AUTOTUNE 으로 지정하면 텐서플로가 가능한 CPU를 기반으로 알아서 고른다.
filepath_dataset = tf.data.Dataset.list_files(train_filepaths, seed=42)
n_readers = 5
dataset = filepath_dataset.interleave(
lambda filepath: tf.data.TextLineDataset(filepath).skip(1),
cycle_length=n_readers)
저장된 데이터셋을 살펴보면 header를 제외한 값들이 담겨 있으며 바이트 스트링 형태 임을 알 수 있다.
바이트 객체 설명
https://dojang.io/mod/page/view.php?id=2462
for line in dataset.take(5):
print(line.numpy())
b'4.2083,44.0,5.323204419889502,0.9171270718232044,846.0,2.3370165745856353,37.47,-122.2,2.782'
b'4.1812,52.0,5.701388888888889,0.9965277777777778,692.0,2.4027777777777777,33.73,-118.31,3.215'
b'3.6875,44.0,4.524475524475524,0.993006993006993,457.0,3.195804195804196,34.04,-118.15,1.625'
b'3.3456,37.0,4.514084507042254,0.9084507042253521,458.0,3.2253521126760565,36.67,-121.7,2.526'
b'3.5214,15.0,3.0499445061043287,1.106548279689234,1447.0,1.6059933407325193,37.63,-122.43,1.442'
데이터 전처리¶
tf.io.decode_csv 를 이용하여 csv파일의 한줄을 읽고 해독할 뿐만 아니라 이를 통해 전처리가 가능하다.
위 메서드를 이용하면 스칼라 텐서의 리스트를 반환해주는데 이를 tf.stack()을 이용하면 1D 텐서 배열로 변환된다.
scaler = StandardScaler()
scaler.fit(X_train)
X_mean = scaler.mean_
X_std = scaler.scale_
# 특성의 개수, X_train.shape[-1]
n_inputs = 8
@tf.function
def preprocess(line):
# default(defs)값으로 앞에서 부터 8개(특성)을 0. 으로, 마지막 열을 빈 배열을 넘겨준다.
defs = [0.] * n_inputs + [tf.constant([], dtype=tf.float32)]
fields = tf.io.decode_csv(line, record_defaults=defs)
# 이렇게 해서 얻어낸 csv 파일의 한 줄에대한 해독 결과를 tf.stack()을 통해 1D 배열로 만든다.
x = tf.stack(fields[:-1])
y = tf.stack(fields[-1:])
return (x - X_mean) / X_std, y
preprocess(b'4.2083,44.0,5.3232,0.9171,846.0,2.3370,37.47,-122.2,2.782')
(<tf.Tensor: shape=(8,), dtype=float32, numpy=
array([ 0.16579157, 1.216324 , -0.05204565, -0.39215982, -0.5277444 ,
-0.2633488 , 0.8543046 , -1.3072058 ], dtype=float32)>,
<tf.Tensor: shape=(1,), dtype=float32, numpy=array([2.782], dtype=float32)>)
데이터 적재와 전처리를 합치기¶
재사용 가능한 코드를 만들기 위하여 지금까지 언급한 모든 것을 하나의 함수로 만들자.
def csv_reader_dataset(filepaths, repeat=1, n_readers=5,
n_read_threads=None, shuffle_buffer_size=10000,
n_parse_threads=5, batch_size=32):
dataset = tf.data.Dataset.list_files(filepaths).repeat(repeat)
dataset = dataset.interleave(
lambda filepath: tf.data.TextLineDataset(filepath).skip(1),
cycle_length=n_readers, num_parallel_calls=n_read_threads)
dataset = dataset.shuffle(shuffle_buffer_size)
dataset = dataset.map(preprocess, num_parallel_calls=n_parse_threads)
dataset = dataset.batch(batch_size)
return dataset.prefetch(1)
프리페치¶
뭔가 저 위 코드의 마지막줄에 이상한게 달려있다.
prefetch(1)을 호출하면 데이터셋은 항상 배치 데이터 하나가 준비가 되도록 내가 다른 셋에서 준비하는 동안
디스크에서 가져오려한다. 왠만하면 훈련스텝이 진행되는 시간보다 준비시간이 짧아서 GPU를 다 쓸수 있다.
만약 데이터셋이 메모리에 다 들어갈 수 있다면 cache() 메서드를 사용하여 훈련속도를 높일 수 있다.
이 때, 데이터를 적재하고 전처리한 후에 셔플링 되기 전에 캐싱이 수행되므로
캐싱된 데이터마다 셔플 결과가 달라지므로, 매 에포크마다 다르게 셔플링된다. (전처리는 단 한번)
tf.keras 와 데이터셋 사용¶
이제 데이터셋을 반환해주는 함수를 만들어 냈으니 tf.keras에서 이용을 해보자
train_set = csv_reader_dataset(train_filepaths)
valid_set = csv_reader_dataset(valid_filepaths)
test_set = csv_reader_dataset(test_filepaths)
여기서 케라스 모델을 만들고 데이터셋으로 훈련이 가능하다.
model = keras.models.Sequential([...])
model.compile([...])
model.fit(train_set, epochs=10, validation_data=valid_set)
evaluate()와 predict() 메서드에도 데이터셋을 전달할 수 있다.
model.evaluate(test_set)
# preprocess에서 X, y로 나눠준다
new_set = test_set.take(3).map(lambda X, y: X)
model.predict(new_set)
TFRecord 포맷¶
대용량 데이터를 저장하고 읽기 위하여 텐서플로가 사용하는 TFRecord 포매은 이진 레코드를 저장하는 단순한 이진 포맷이다.
각 레코드는 레코드 길이, 길이를 체크하는 CRC 체크섬, 실제 데이터, 데이터를 위한 CRC 체크섬으로 구성된다)
tf.io.TFRcordWriter 클래스를 사용하면 TFRecord를 손쉽게 만들 수 있다.
with tf.io.TFRecordWriter("my_data.tfrecord") as f:
f.write(b"데이터를 작성할 때는 바이트 객체로 작성하자")
f.write(b"그리고 이건 두번째 레코드가 된다")
그리고 tf.data.TFRecordDataset을 사용하여 하나 이상의 TFRecord를 읽을 수 있다.
filepaths = ["my_data.tfrecord"]
dataset = tf.data.TFRecordDataset(filepaths)
for item in dataset:
print(item)
압축된 TFRecord 파일¶
가끔 TFRecord을 압축해야 할 때가 있다. 네트워크를 통해 읽어야 하는 경우인데 option 객체를 이용하자.
options = tf.io.TFRecordOptions(compression_type = "GZIP")
with tf.io.TFRecordWriter("my_compressed.tfrecord", options) as f:
[...]
만약 이렇게 압축된 TFRecord 파일을 읽으려면 압축 형식을 지정해주면 된다.
dataset = tf.data.TFRecordDataset(["my_compressed.tfrecord"],
compression_type="GZIP")
프로토콜 버퍼¶
TFRecord가 지원하는 직렬화된 프로토콜 버퍼는 이식성과 확장성이 좋아 널리 사용된다.
특히 구글의 원격 프로시저 호출 시스템임 gRPC에 사용된다.
아래 예시는 프로토콜 버퍼 객체인 메시지(직렬화 되어 전송되므로)인 Person을 정의한 것이다.
프로토콜 버퍼 포맷의 버전 3을 사용하여 각각 필드를 정의해 놓은 것을 알 수 있다.
syntax = "proto3"
message Person{
string name = 1;
int32 id = 2;
repeated string email = 3;
}
%%writefile person.proto
syntax = "proto3";
message Person {
string name = 1;
int32 id = 2;
repeated string email = 3;
}
Overwriting person.proto
!protoc person.proto --python_out=. --descriptor_set_out=person.desc --include_imports
from person_pb2 import Person
person = Person(name="Al", id=123, email=["a@b.com"]) # Person 생성
print(person) # Person 출력
name: "Al"
id: 123
email: "a@b.com"
s = person.SerializeToString() # s는 바이트 문자열로 직렬화된 person 객체이다.
print(s)
person2 = Person() # person2는 새로운 Person 객체이다.
person2.ParseFromString(s) # ParseFromString에서 값을 파싱하여 가져온다.
person2 # 그러면 person과 동일한 person2를 얻을 수 있다.
b'\n\x02Al\x10{\x1a\x07a@b.com'
name: "Al"
id: 123
email: "a@b.com"
텐서플로 프로토콜 버퍼¶
TFRecord 파일에서 사용하는 프로토콜 버퍼는 데이터셋에 있는 하나의 샘플을 표현하는 Example 프로토콜 버퍼다.
이름을 가진 특성의 리스트를 가지고 있는데 각 특성은 바이트스트링, 정수, 실수 리스트중 하나이다.
syntax = "proto3";
message BytesList { repeated bytes value = 1; }
message FloatList { repeated float value = 1 [packed = true]; }
message Int64List { repeated int64 value = 1 [packed = true]; }
message Feature {
oneof kind {
BytesList bytes_list = 1;
FloatList float_list = 2;
Int64List int64_list = 3;
}
};
message Features { map<string, Feature> feature = 1; };
message Example { Features features = 1; };
Feature는 바이트, 실수, 정수 리스트중 하나를 가지고 있다.
또한 Features는 string과 대응되는 Feature가 있는 딕셔너리를 가진다.
마지막으로 Example은 Features 객체 하나를 가진다. 따라서 아래와 같은 코드를 제작할 수 있다.
from tensorflow.train import BytesList, FloatList, Int64List
from tensorflow.train import Feature, Features, Example
person_example = Example(
# Example은 features라는 이름의 Features객체를 가지고 있으며 이는 딕셔너리 리스트이다.
features=Features(
# 또다시 Features는 feature라는 이름의 Feature객체를 가지고 이는 딕셔너리파일 이다.
feature={
# 그리고 Feature는 정수, 실수, 문자열 리스트중 하나를 가지고 있다.
"name": Feature(bytes_list=BytesList(value=[b"Alice"])),
"id": Feature(int64_list=Int64List(value=[123])),
"emails": Feature(bytes_list=BytesList(value=[b"a@b.com", b"c@d.com"]))
}))
# 이제 Example 프로토콜 버퍼가 생겼으므로 SerializeToString()을 통하여 직렬화 하고 TFRecord에 저장한다.
with tf.io.TFRecordWriter("my_contacts.tfrecord") as f:
f.write(person_example.SerializeToString())
보통 하나 이상의 Example이 만들어지므로 샘플마다 하나의 Example 프로토콜 버퍼를 만들고,
직렬화를 하면 하나의 스트링이 되므로 다음 프로세스에서 셔플링을 진행하여 TFRecord 파일 여러개의 저장하면 된다.
Example 프로토콜 버퍼를 읽고 파싱하기¶
직렬화된 Example 프로토콜 버퍼를 읽으려면 tf.data.TFRecordDataset을 다시 한번 사용하고
tf.io.parse_single_example()을 사용하여 각 Example을 파싱하자. parse_single_example()에는 두가지 파라미터가 필요한데 직렬화된 데이터와 각 특성에 대한 설명이다.
이 설명은 각 특성 이름에 대해 특성의 크기, 타입, 기본값을 표현한 tf.io.FixedLenFeature이나
특성 타입만 표현된 tf.io.VarLenFeature에 매핑한 딕셔너리 이다.
feature_description = {
"name": tf.io.FixedLenFeature([], tf.string, default_value=""),
"id": tf.io.FixedLenFeature([], tf.int64, default_value=0),
"emails": tf.io.VarLenFeature(tf.string),
}
for serialized_example in tf.data.TFRecordDataset(["my_contacts.tfrecord"]):
# 결과는 딕셔너리 객체로 나온다.
parsed_example = tf.io.parse_single_example(serialized_example,
feature_description)
FixedLenFeature는 보통의 텐서로 파싱되어 나오지만 VarLenFeature는 희소 텐서로 파싱된다.
sparse.to_dense()를 통해 밀집 텐서로 변환할 수 있지만 여기에서는 희소 텐서의 값을 바로 참조하는게 편하다.
BtyesList는 직렬화된 객체를 포함하여 모든 이진 데이터를 포함할 수 있다.
특히 이미지 데이터를 tf.io.encode_jpeg()를 사용하여 jpeg 포맷 이미지를 인코딩하고 이를 ByteList에
넣어서 나중에 Example을 파싱하고 다시 여기서 데이터를 파싱한 뒤, 이 데이터를 tf.io.decode_jpeg()에 넘겨서
원본 이미지를 얻을 수 있다.
from sklearn.datasets import load_sample_images
img = load_sample_images()["images"][0]
plt.imshow(img)
plt.axis("off")
plt.title("Original Image")
plt.show()
data = tf.io.encode_jpeg(img)
example_with_image = Example(features=Features(
feature={"image": Feature(bytes_list=BytesList(value=[data.numpy()]))}))
serialized_example = example_with_image.SerializeToString()
with tf.io.TFRecordWriter("my_img.tfrecord") as f:
f.write(serialized_example)
dataset = tf.data.TFRecordDataset(["my_img.tfrecord"])
for item in dataset:
serialized_example2 = item
# then save to TFRecord
feature_description = {"image": tf.io.VarLenFeature(tf.string)}
example_with_image = tf.io.parse_single_example(serialized_example2,
feature_description)
decoded_img = tf.io.decode_jpeg(example_with_image["image"].values[0])
# 이것도 가능
decoded_img = tf.io.decode_image(example_with_image["image"].values[0])
plt.imshow(decoded_img)
plt.title("Decoded Image")
plt.axis("off")
plt.show()
tf.io.serialize_tensor()을 통해서 어떤 텐서라도 직렬화 하여 바이트 스트링으로 마들 수 있다.
이를 파싱할 때는 tf.io.parse_tensor()를 사용하면 이 데이터를 파싱할 수 있다.
또한 tf.io.parse_example()를 사용하면 배치 단위로 파싱이 가능하다.
dataset = tf.data.TFRecordDataset(["my_contacts.tfrecord"]).batch(10)
for serialized_examples in dataset:
parsed_examples = tf.io.parse_example(serialized_examples,
feature_description)
사실 Example 프로토콜 버퍼로 왠만하면 충분하지만 사용하기가 좀 어렵다.
샘플마다 하나의 Example 프로토콜 버퍼를 만들고 직렬화하고 셔플링하고 이를 TFRecord 파일 여러개에 저장하고...
진짜로 필요할 때 적용하자ㅠ
입력 특성 전처리¶
지금까지는 넘파이, 판다스, 사이킷런으로 데이터 파일을 전처리하곤 했다. 데이터 API로 데이터를 적재할 때
동적으로 전처리 하는 것이 가능하다 (map 메서드를 사용) 아니면 전처리 layer를 모델에 포함시키는 방식도 있다
means = np.mean(X_train, axis=0,keepdims=True)
stds = np.std(X_train, axis=0, keepdims=True)
eps = keras.backend.epsilon()
model = keras.models.Sequential([
keras.layer.Lambda(lambda inputs : (inputs-means)/(stds+eps)),
[...]
])
아니라면 사용자 정의 layer를 구성하여 짤수 있다.
class Standardization(keras.layers.Layer):
def adapt(self, data_sample):
self.means_ = np.mean(data_sample, axis=0, keepdims=True)
self.stds_ = np.std(data_sample, axis=0, keepdims=True)
def call(self, inputs):
return (inputs - self.means_) /
(self.stds_ + keras.backend.epsilon())
# std_layer를 추가하기 전에 adapt 메서드를 실행시키자.
std_layer = Standardization()
std_layer.adapt(data_sample)
원-핫 벡터를 사용해 범주형 특성 인코딩하기¶
2장에서 사용했던 ocean_proximity 특성은 범주형 특성으로 5개의 값이 존재했다.
이를 인코딩 해줘야하는데 범주 개수가 작으니 원-핫 인코딩이 가능하다.
# 어휘 사전(vocab)과 각 해당하는 인덱스의 텐서를 만든다.
vocab = ['<1H OCEAN', 'INLAND', 'ISLAND', 'NEAR BAY', 'NEAR OCEAN']
indices = tf.range(len(vocab), dtype=tf.int64)
# 카테고리 리스트와 해당 인덱스를 전달해서 룩업 테이블을 위한 초기화 개체를 만든다.
# 만약 카테고리가 텍스트 파일에 한줄 씩 있다면 TextFileInitializer를 사용하자
table_init = tf.lookup.KeyValueTensorInitializer(vocab, indices)
# 만약 사전에 없는 단어가 나온다면 oov 버킷 중 하나에 지정한다 (oov 버킷의 인덱스는 알려진 이후부터 할당)
# 버킷이 충분하지 않다면 다른 카테고리가 동일한 버킷에 할당될 수 있다.
num_oov_buckets = 2
table = tf.lookup.StaticVocabularyTable(table_init, num_oov_buckets)
categories = tf.constant(["NEAR BAY","DESERT","INLAND","INLAND"])
cat_indices = table.lookup(categories)
# 각각 해당하는 인덱스를 찾아갔고 DESERT는 없었기 때문에 oov 버킷에 할당된다(5)
cat_indices
<tf.Tensor: shape=(4,), dtype=int64, numpy=array([3, 5, 1, 1], dtype=int64)>
cat_one_hot = tf.one_hot(cat_indices, depth=len(vocab)+num_oov_buckets)
cat_one_hot
<tf.Tensor: shape=(4, 7), dtype=float32, numpy=
array([[0., 0., 0., 1., 0., 0., 0.],
[0., 0., 0., 0., 0., 1., 0.],
[0., 1., 0., 0., 0., 0., 0.],
[0., 1., 0., 0., 0., 0., 0.]], dtype=float32)>
임베딩을 사용하여 범주형 특성 인코딩하기¶
임베딩은 카테고리를 표현하는 훈련 가능한 밀집 벡터이다. 처음에는 랜덤하게 초기화 되어있지만
시간이 지날수록 비슷한 카테고리들은 가까워지고 아니라면 멀어진다. (표현 학습)
먼저 각 카테고리의 임베딩을 담음 임베딩 행렬을 만들고 랜덤하게 초기화시킨다.
embedding_dim = 2
embed_init = tf.random.uniform([len(vocab)+ num_oov_buckets, embedding_dim])
embedding_matrix = tf.Variable(embed_init)
# 각 카테고리와 oov에 대한 임베딩이 랜덤하게 지정되었음을 확인할 수 있다.
embedding_matrix
<tf.Variable 'Variable:0' shape=(7, 2) dtype=float32, numpy=
array([[0.6860026 , 0.00978911],
[0.55632424, 0.23093927],
[0.16382265, 0.70073605],
[0.45057046, 0.17368972],
[0.01148427, 0.600549 ],
[0.6090733 , 0.2723205 ],
[0.43012857, 0.02130008]], dtype=float32)>
# 임베딩 행렬에서 주어진 인덱스에 해당하는 행을 찾아준다.
tf.nn.embedding_lookup(embedding_matrix, cat_indices)
<tf.Tensor: shape=(4, 2), dtype=float32, numpy=
array([[0.45057046, 0.17368972],
[0.6090733 , 0.2723205 ],
[0.55632424, 0.23093927],
[0.55632424, 0.23093927]], dtype=float32)>
케라스에는 이미 임베딩 행렬을 처리해주는 keras.layers.Embedding 층이 있다.
층을 생성할 때 행렬을 랜덤하게 초기화하고 카테고리 인덱스로 호출할 때 해당하는 인덱스의 행을 반환한다.
embedding = keras.layers.Embedding(input_dim=len(vocab)+num_oov_buckets,
output_dim=embedding_dim)
embedding(cat_indices)
<tf.Tensor: shape=(4, 2), dtype=float32, numpy=
array([[ 0.04570037, 0.01323093],
[-0.00908439, 0.03023663],
[ 0.01441665, -0.01148947],
[ 0.01441665, -0.01148947]], dtype=float32)>
케라스 전처리 layer¶
텐서플로 팀이 표준 케라스 전처리 layer를 제공하기 위해 노력중이다.
케라스 전처리 layer를 사용하면 전처리 과정을 훨씬 편하게 진행할 수 있다.
# [0,1] 사이의 값으로 스케일을 조정해주는 정규화 layer
normalization = keras.layers.Normalization()
# 연속적인 데이터를 몇 개의 구간(bin)으로 나누고 각 구간을 원-핫 벡터로 인코딩해준다.
discretization = keras.layers.Discretization([...])
# 여러 전처리 layer를 연결할 수 있는 pipeline을 만들어준다.
pipeline = keras.layers.PreprocessingStage(
[normalization, discretization])
# 해당 파이프라인을 샘플 데이터에 적응시킨 다음에 일반적인 층처럼 모델에 이용할 수 있다.
pipeline.adapt(data_sample)
그리고 TextVectoization layer를 이용하여 각 단어를 정수 인덱스와 매칭시켜준다.
(option 에 따라서 단어의 출현 횟수를 카운팅 할 수 있다)
text_dataset = tf.data.Dataset.from_tensor_slices(["foo", "bar", "baz"])
vectorize_layer = tf.keras.layers.TextVectorization(output_mode='int')
vectorize_layer.adapt(text_dataset)
print(vectorize_layer.get_vocabulary())
vectorize_layer("foo baz foo")
['', '[UNK]', 'foo', 'baz', 'bar']
<tf.Tensor: shape=(3,), dtype=int64, numpy=array([2, 3, 2], dtype=int64)>
# TextVectorization layer에는 단어 인덱스 대신, 단어가 총 몇번 나왔는지 계산해주는 옵션이 있다.
text_dataset = tf.data.Dataset.from_tensor_slices(["foo", "bar", "baz"])
vectorize_layer = tf.keras.layers.TextVectorization(output_mode='count')
vectorize_layer.adapt(text_dataset)
print(vectorize_layer.get_vocabulary())
vectorize_layer("foo baz foo")
['[UNK]', 'foo', 'baz', 'bar']
<tf.Tensor: shape=(4,), dtype=float32, numpy=array([0., 2., 1., 0.], dtype=float32)>
그런데 자주 등장하는 단어보다 적게 등장하는 단어가 중요한 단어인 경우가 많다.
따라서 단어의 카운트는 자주 등장하는 단어의 중요도를 줄이는 방향으로 진행될 필요가 있다.
이를 TF-IDF (Term Frequency- Inverse Document Frequency) 라고 부른다.
\begin{equation} \textrm{단어 카운트}\;\bullet\;\log{(1+\frac{\textrm{전체 샘플 수}}{\textrm{1 + 단어가 등장하는 샘플 수}})} \end{equation}
TF 변환¶
전처리는 계산 비용이 크기 때문에 매번 처리하기 보다는 사전에 한번만 처리하는 것이 효율적이다.
특히 RAM에 들어갈 정도로 데이터셋이 작다면 cache() 메서드를 사용할 수 있다.
만약 데이터가 너무 크다면 아파치 빔이나 스파크를 이용하는데 문제는 모델을 배포할 때 발생한다.
훈련시에야 괜찮다지만 온라인 서빙시에는 매번 들어오는 데이터에 대해 전처리를 진행해야한다.
하나의 모델에 대하여 여러 플렛폼에다가 배포한다고 한다면 각각의 플렛폼마다 전처리를 위한 코드를 추가해야한다.
그런와중에 전처리 과정이 바뀌면? 모든 플렛폼 각각에 대하여 전처리 코드를 수정해야하고 이는 시간과 에러를 일으킨다.
게다가 훈련 전에 수행한 전처리와 플렛폼 환경에서 시행하는 전처리 과정에는 차이가 있을 수 있고
이런 훈련 / 서빙 왜곡 은 버그와 성능감소로 이어진다.
이를 막기위하여 아파치 빔이나 스파크와 같은 코드로 작성된 모델을 들고와서 각 플렛폼에 배포하기 전에
전처리를 담당하는 layer를 추가하는 것이 훨씬 낫다. 이러면 모델 코드와 전처리 layer의 코드로만 나뉘기에 관리가 편하다.
그런데 이마저도 싫어서 전처리 연산을 딱 한번만 정의해서 모든 플렛폼에서 사용하고 싶다면? TF 변환 을 이용하자
정리글 : https://datacrew.tech/tfx-preprocessing/
import tensorflow_transform as tft
def preprocess(inputs): # inputs is a batch of input features
median_age = inputs["housing_median_age"]
ocean_proximity = inputs["ocean_proximity"]
# 이렇게 통계를 계산하는 컴포넌트를 analyzer 라고 부른다.
standardized_age = tft.scale_to_z_score(median_age)
ocean_proximity_id = tft.compute_and_apply_vocabulary(ocean_proximity)
return {
"standardized_median_age": standardized_age,
"ocean_proximity_id": ocean_proximity_id
}
---------------------------------------------------------------------------
ModuleNotFoundError Traceback (most recent call last)
Input In [61], in <cell line: 1>()
----> 1 import tensorflow_transform as tft
3 def preprocess(inputs): # inputs is a batch of input features
4 median_age = inputs["housing_median_age"]
ModuleNotFoundError: No module named 'tensorflow_transform'
텐서플로 데이터셋(TFDS)¶
널리사용되는 데이터셋을 손쉽게 다운받아보자.
간단하게 pip로 설치하고 tfds.load() 를 호출하면 끝이다.
보통 데이터셋의 딕셔너리로 데이터를 반환해준다.
import tensorflow_datasets as tfds
datasets = tfds.load(name="mnist")
mnist_train, mnist_test = datasets["train"], datasets["test"]
plt.figure(figsize=(6, 3))
mnist_train_done = mnist_train.repeat(5).shuffle(1000).batch(32).prefetch(1)
for item in mnist_train_done:
images = item["image"]
labels = item["label"]
for index in range(5):
plt.subplot(1, 5, index + 1)
image = images[index, ..., 0]
label = labels[index].numpy()
plt.imshow(image, cmap="binary")
plt.title(label)
plt.axis("off")
break # just showing part of the first batch
다만 케라스는 특성과 레이블이 담긴 튜플 아이템을 기대하므로 map 메서드로 변환해주자.
mnist_train_div = mnist_train_done.map(lambda items:
(items["image"], items["label"]))
mnist_train_div = mnist_train_div.prefetch(1)
for images, labels in mnist_train_div.take(1):
print(images.shape)
print(labels.numpy())
(32, 28, 28, 1)
[7 0 6 4 7 3 5 1 9 4 0 2 4 7 9 1 3 8 1 4 4 1 3 9 8 7 1 5 7 5 3 4]
as_supervised 옵션을 True로 지정해주면 load() 함수를 호출하는 것이 간단하다.
배치 크기도 지정할 수 있으며 이를 바로 tf.keras 모델에 넘겨줄 수 있다.
datasets = tfds.load(name="mnist",
batch_size=32,
as_supervised=True)
mnist_train = datasets["train"].repeat().prefetch(1)
model = keras.models.Sequential([...])
model.compile(...)
model.fit(mnist_train, epochs=5)
'AI' 카테고리의 다른 글
[핸즈온 머신러닝] chapter 15. RNN (0) | 2022.06.01 |
---|---|
[핸즈온 머신러닝] chapter 14. 합성곱 신경망 (0) | 2022.05.18 |
[핸즈온 머신러닝] chapter 12. 텐서플로에서의 사용자 정의 모델과 훈련 (0) | 2022.04.28 |
[핸즈온 머신러닝] chapter 11. 심층 신경망 훈련 (0) | 2022.04.22 |
[핸즈온 머신러닝] chapter 10. 케라스를 이용한 인공 신경망 (0) | 2022.04.22 |
- Total
- Today
- Yesterday