티스토리 뷰
# 파이썬 ≥3.5 필수
import sys
assert sys.version_info >= (3, 5)
# 사이킷런 ≥0.20 필수
import sklearn
assert sklearn.__version__ >= "0.20"
# 이 노트북은 텐서플로 ≥2.4이 필요합니다
# 2.x 버전은 대부분 동일한 결과를 만들지만 몇 가지 버그가 있습니다.
import tensorflow as tf
from tensorflow import keras
assert tf.__version__ >= "2.4"
# 공통 모듈 임포트
import numpy as np
import os
# 노트북 실행 결과를 동일하게 유지하기 위해
np.random.seed(42)
tf.random.set_seed(42)
# 깔끔한 그래프 출력을 위해
%matplotlib inline
import matplotlib as mpl
import matplotlib.pyplot as plt
mpl.rc('axes', labelsize=14)
mpl.rc('xtick', labelsize=12)
mpl.rc('ytick', labelsize=12)
plt.style.use('default')
텐서플로를 사용한 사용자 정의 모델과 훈련¶
케라스를 넘어 이제는 텐서플로 그자체이다.
구글이 개발한 수치 계산을 위한 라이브러리이기에 대규모 머신러닝에 잘 맞도로 튜닝되어 있다.
핵심구조는 Numpy와 유사하지만 GPU를 지원하기에 병렬연산이 가능하고 분산 컴퓨팅을 지원한다.
C++ 코드로 구현되어 있고, 많은 연산들이 커널이라고 부르는 여러 구현을 가진다.
텐서플로를 Numpy 처럼 이용하기¶
텐서플로 API는 텐서라는 넘파이의 ndarray와 비슷한 다차원 배열의 자료구조를 순환시킨다.
텐서는 한 연산에서 다른 연산으로 흘러다니고 이 때문에 tensorFlow라고 부른다
텐서와 연산¶
tf.constant() 함수로 텐서를 만들 수 있다. 사실 Numpy랑 엄청 닮았다. (라고 하기엔 numpy속성을 가진다)
t = tf.constant([[1., 2., 3.], [4., 5., 6.]])
print(t, '\n') # 행렬
print(t.shape, '\n') # tf.Tensor는 shape와 dtype을 속성으로 가진다.
print(tf.constant(42)) # 스칼라 값도 가능!
tf.Tensor(
[[1. 2. 3.]
[4. 5. 6.]], shape=(2, 3), dtype=float32)
(2, 3)
tf.Tensor(42, shape=(), dtype=int32)
인덱스 참조나 연산 함수도 Numpy하고 매우 비슷하게 작동한다. (물론 기억이 가물가물)
그런데 종종 다른 것도 있으니 해보고 안되면 API 레퍼런스 찾아보자.
print(t[:, 1:], '\n')
# tf.newaxis는 차원을 추가시켜 준다. 원래대로면 shape가 (2)였겠지만 (2,1)로 만든다.
print(t[:, 1, tf.newaxis], '\n')
print(t + 10, '\n')
print(tf.square(t), '\n')
# @ 기호는 행렬 곱셈을 의미한다 == tf.matmul()
# t.T 와 같이 numpy에서는 가능했지만 텐서플로는 불가능하다;;
print(t @ tf.transpose(t), '\n')
tf.Tensor(
[[2. 3.]
[5. 6.]], shape=(2, 2), dtype=float32)
tf.Tensor(
[[2.]
[5.]], shape=(2, 1), dtype=float32)
tf.Tensor(
[[11. 12. 13.]
[14. 15. 16.]], shape=(2, 3), dtype=float32)
tf.Tensor(
[[ 1. 4. 9.]
[16. 25. 36.]], shape=(2, 3), dtype=float32)
tf.Tensor(
[[14. 32.]
[32. 77.]], shape=(2, 2), dtype=float32)
텐서와 Numpy¶
텐서는 Numpy와 함께 사용하기 좋다. 텐서로 넘파이 배열을 만들 수 있고 그 역도 된다.
텐서에 Numpy연산이 되고 그 역도 된다! 다만, 넘파이는 64bit 정밀도를, 텐서플로는 32bit 정밀도를 기본값으로 이용하기에
넘파이 배열로 텐서를 만들 때 dtype=tf.float32 로 지정해야한다.
a = np.array([2., 4., 5.])
print(tf.constant(a), '\n') # Numpy 배열로 텐서 만들기
print(t, '\n') # 텐서
print(t.numpy()) # Numpy 배열 (np.array(t) 와 같다)
print(tf.square(a),'\n') # 텐서 연산을 넘파이 배열에 적용
print(np.square(t)) # 넘파이 연산을 텐서에 적용
tf.Tensor([2. 4. 5.], shape=(3,), dtype=float64)
tf.Tensor(
[[1. 2. 3.]
[4. 5. 6.]], shape=(2, 3), dtype=float32)
[[1. 2. 3.]
[4. 5. 6.]]
tf.Tensor([ 4. 16. 25.], shape=(3,), dtype=float64)
[[ 1. 4. 9.]
[16. 25. 36.]]
v = tf.Variable([[1., 2., 3.], [4., 5., 6.]])
v
<tf.Variable 'Variable:0' shape=(2, 3) dtype=float32, numpy=
array([[1., 2., 3.],
[4., 5., 6.]], dtype=float32)>
(그렇지 않으면 역전파로 갱신해야 하는 신경망의 가중치를 조작하는 것이 불가능)
tf.Variable 은 tf.Tensor와 비슷하게 동작하고 연산도 비슷하다.
다만 assign() 메서드를 사용하여 변숫값을 변경할 수 있다.
scatter_update(), scatter_nd_update() 메서드를 사용하면 개별원소 수정 및 슬라이스를 할 수 있다.
# 전체 원소에 2를 곱한다.
v.assign(2*v)
<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[ 2., 4., 6.],
[ 8., 10., 12.]], dtype=float32)>
# v[0][1] 위치에 42를 넣어준다.
v[0, 1].assign(42)
<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[ 2., 42., 6.],
[ 8., 10., 12.]], dtype=float32)>
# 2번째 열에 각각 0. 1. 을 넣어준다.
v[:,2].assign([0., 1.])
<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[ 2., 42., 0.],
[ 8., 10., 1.]], dtype=float32)>
# v[0][0]와 v[1][2]에 각각 값을 넣어줌
v.scatter_nd_update(indices=[[0, 0], [1, 2]], updates=[100., 200.])
<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[100., 42., 0.],
[ 8., 10., 200.]], dtype=float32)>
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
housing = fetch_california_housing()
X_train_full, X_test, y_train_full, y_test = train_test_split(
housing.data, housing.target.reshape(-1, 1), random_state=42)
X_train, X_valid, y_train, y_valid = train_test_split(
X_train_full, y_train_full, random_state=42)
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_valid_scaled = scaler.transform(X_valid)
X_test_scaled = scaler.transform(X_test)
# 텐서플로의 장점을 활용하려면 텐서플로 연산만을 이용하자.
def huber_fn(y_true, y_pred):
error = y_true - y_pred
is_small_error = tf.abs(error) < 1
squared_loss = tf.square(error) / 2
linear_loss = tf.abs(error) - 0.5
return tf.where(is_small_error, squared_loss, linear_loss)
input_shape = X_train.shape[1:]
model = keras.models.Sequential([
keras.layers.Dense(30,
activation="selu",
kernel_initializer="lecun_normal",
input_shape=input_shape),
keras.layers.Dense(1),
])
model.compile(loss=huber_fn, optimizer="nadam", metrics=["mae"])
model.fit(X_train_scaled,
y_train,
epochs=2,
validation_data=(X_valid_scaled, y_valid))
Epoch 1/2
363/363 [==============================] - 1s 1ms/step - loss: 0.6235 - mae: 0.9953 - val_loss: 0.2862 - val_mae: 0.5866
Epoch 2/2
363/363 [==============================] - 0s 936us/step - loss: 0.2197 - mae: 0.5177 - val_loss: 0.2382 - val_mae: 0.5281
<keras.callbacks.History at 0x1a03869fbe0>
사용자 정의 요소를 가진 모델을 저장하고 로드하기¶
만약 위의 huber_fn 처럼 사용자 정의 손실함수 처럼 사용자 정의 요소를 사용하더라도 모델의 저장은 잘 이뤄진다.
다만, 모델을 로드할 때, 사용자 정의 요소의 이름과 객체를 매핑한 딕셔너리를 전달해야 한다.
model = keras.models.load_model("my_model_with_a_custom_loss.h5",
custom_objects={"huber_fn": huber_fn})
만약 후버 손실 함수에서 오차의 기준을 바꾸고 싶어서 파라미터를 받을 수 있도록 한다고 하자.
def create_huber(threshold=1.0):
def huber_fn(y_true, y_pred):
error = y_true - y_pred
is_small_error = tf.abs(error) < threshold
squared_loss = tf.square(error) / 2
linear_loss = threshold * tf.abs(error) - threshold**2 / 2
return tf.where(is_small_error, squared_loss, linear_loss)
return huber_fn
그러고 모델을 저장한다면 threshold가 저장되기를 원하지만 실제로는 그렇지 않다.
따라서 다음과 같이 모델을 로드할 때, 딕셔너리에 넘겨줘야 한다.
keras.models.load_model(
"my_model_with_a_custom_loss2.h5",
custom_objects={"huber_fn": create_huber(2.0)}
)
만약 이렇게 매번 지정해주기 싫다면 keras.losses.Loss 클래스를 상속하고 get_config()를 구현하자
class HuberLoss(keras.losses.Loss):
def __init__(self, threshold=1.0, **kwargs):
self.threshold = threshold
super().__init__(**kwargs) # super() : 부모 클래스를 호출해준다.
def call(self, y_true, y_pred):
error = y_true - y_pred
is_small_error = tf.abs(error) < self.threshold
squared_loss = tf.square(error) / 2
linear_loss = self.threshold * tf.abs(error) - self.threshold**2 / 2
return tf.where(is_small_error, squared_loss, linear_loss)
def get_config(self):
base_config = super().get_config() # 부모 클래스에서 get_config()로 정보를 받아온다.
return {**base_config, "threshold": self.threshold}
# **는 딕셔너리 언패킹 연산자로 이를 통해 딕셔너리를 파라미터로 빠르게 넘겨줄 수 있다.
이런식으로 손실함수를 선언해주면 나중에 모델을 컴파일할 때 임계값도 같이 넘어가고 저장된다.
모델을 저장할 때 케라스가 get_config() 메서드를 호출해서 반환된 설정을 HDF5 파일에 JSON형태로 저장해준다.
모델을 로드하면 HuberLoss 클래스의 from_config() 클래스 메서드를 호출하는데
이 메서드는 기본 손실 클래스 (Loss)에 구현되어 있으며
생성자에게 **config 파라미터를 전달해서 클래스의 인스턴스를 만든다.
model.compile(loss=HuberLoss(2.), optimizer="nadam", metrics=["mae"])
model.save("my_model_with_a_custom_loss_class.h5")
model =
keras.models.load_model("my_model_with_a_custom_loss_class.h5",
custom_objects={"HuberLoss": HuberLoss})
활성화 함수, 초기화, 규제, 제한을 커스터마이징하기¶
대부분의 케라스 기능들은 유사한 방법으로 커스터마이징이 가능하다.
대부분의 입력과 출력을 가진 간단한 함수를 작성하면 된다.
def my_softplus(z): # tf.nn.softplus(z) 값을 반환합니다
return tf.math.log(tf.exp(z) + 1.0)
def my_glorot_initializer(shape, dtype=tf.float32):
stddev = tf.sqrt(2. / (shape[0] + shape[1]))
return tf.random.normal(shape, stddev=stddev, dtype=dtype)
def my_l1_regularizer(weights):
return tf.reduce_sum(tf.abs(0.01 * weights))
def my_positive_weights(weights): # tf.nn.relu(weights) 값을 반환합니다
return tf.where(weights < 0., tf.zeros_like(weights), weights)
layer = keras.layers.Dense(1,
activation=my_softplus,
kernel_initializer=my_glorot_initializer,
kernel_regularizer=my_l1_regularizer,
kernel_constraint=my_positive_weights)
함수가 모델과 함께 저장해야 하는 하이퍼파라미터를 가지고 있다면 다음과 같은 클래스를 상속하자.
keras.regularizers.Regularizer
keras.constraints.Constraint
keras.initializers.Initializer
keras.layers.Layer
그 다음부터는 사용자 정의 손실을 만들었던 것처럼 init, call, get_config를 만들어주자.
class MyL1Regularizer(keras.regularizers.Regularizer):
def __init__(self, factor):
self.factor = factor
def __call__(self, weights):
return tf.reduce_sum(tf.abs(self.factor * weights))
def get_config(self):
return {"factor": self.factor}
다만 사용자 정의 지표 (Metrics)는 조금 다르다.
사용자 정의 지표¶
손실과 지표가 아주 다른 것은 아니지만 지표는 미분 불능이어도 되고, 보기좋고 이해하기 편해야한다.
그렇기에 사용자 정의 지표를 사용해서 내가 보기 쉽게 만들어보자.
대부분의 경우 지표를 만드는 방식은 손실 함수를 만드는 것과 동일하다.
당장 후버 손실 함수를 지표로 가져다 써도 잘 작동한다.
model.compile.(loss="mse",
optimizer="nadam",
metrics=[create_huber(2.0)])
훈련하는 동안 각 배치에 대하여 케라스는 지표를 계산하고 에포크마다 시작할 때부터 평균을 새롭게 계산한다.
이 방식이 왠만하면 맞는 편이지만 항상 맞지는 않는다. 당장 정밀도 (precision)을 예시로 들어보자.
우리는 정밀도를 내가 양성이라고 판정내린 것들중에 실제 양성인것의 비율이라고 부른다.
만약 매번 에포크마다 계산을하고 평균을 낸다면 모델 전체의 정밀도를 구할 수 없다. 따라서 점진적으로 전체 정밀도를
업데이트해줘야 하는데 이를 위해 keras.metrics.Precision 클래스를 사용한다.
이 클래스는 배치를 처리하면서 지금까지 전체 정밀도를 계산해준다.
따라서 점진적으로 업데이트되므로 스트리밍 지표 또는 stateful metrics 라고 부른다.
precision = keras.metrics.Precision()
precision([0, 1, 1, 1, 0, 1, 0, 1], [1, 1, 0, 1, 0, 1, 0, 1])
<tf.Tensor: shape=(), dtype=float32, numpy=0.8>
# 지표를 계속해서 업데이트 해준다.
precision([0, 1, 0, 0, 1, 0, 1, 1], [1, 0, 1, 1, 0, 0, 0, 0])
<tf.Tensor: shape=(), dtype=float32, numpy=0.5>
# 중간 결과를 보고싶다면 result() 메서드를 이용하자.
precision.result()
<tf.Tensor: shape=(), dtype=float32, numpy=0.5>
# 중간 true pos 와 false pos를 저장한 variables attribs
precision.variables
[<tf.Variable 'true_positives:0' shape=(1,) dtype=float32, numpy=array([4.], dtype=float32)>,
<tf.Variable 'false_positives:0' shape=(1,) dtype=float32, numpy=array([4.], dtype=float32)>]
precision.reset_states() # 초기화
만약 이런 스트리밍 지표를 만들고 싶다면 keras.metrics.Metric 클래스를 상속한다.
class HuberMetric(keras.metrics.Metric):
def __init__(self, threshold=1.0, **kwargs):
super().__init__(**kwargs) # 기본 매개변수 처리 (예를 들면, dtype)
self.threshold = threshold
self.huber_fn = create_huber(threshold)
self.total = self.add_weight("total", initializer="zeros")
self.count = self.add_weight("count", initializer="zeros")
def update_state(self, y_true, y_pred, sample_weight=None):
metric = self.huber_fn(y_true, y_pred)
self.total.assign_add(tf.reduce_sum(metric))
self.count.assign_add(tf.cast(tf.size(y_true), tf.float32))
def result(self):
return self.total / self.count
def get_config(self):
base_config = super().get_config()
return {**base_config, "threshold": self.threshold}
사용자 정의 층 (layer)¶
텐서플로에 없는 충이나, 반복되는 여러 층을 묶어서 사용자 정의 layer로 만들 수 있다.
먼저, 가중치가 필요 없는 사용자 정의 층을 만들기 위해 keras.layers.Lambda 층으로 감싸자.
exponential_layer = keras.layers.Lambda(lambda x:tf.exp(x))
여담이지만 지수 함수는 이따금 회귀 모델에서 예측값의 스케일이 매우 다를 때 이용할 수 있다.
상태가 있는 층, 즉 가중치를 가진 층을 만들기 위해서는 keras.layers.Layer 를 상속해야 한다.
class MyDense(keras.layers.Layer):
def __init__(self, units, activation=None, **kwargs):
super().__init__(**kwargs)
self.units = units
#문자열로 받아서 활성화 함수 객체를 돌려준다.
self.activation = keras.activations.get(activation)
def build(self, batch_input_shape):
self.kernel = self.add_weight(
name="kernel",
shape=[batch_input_shape[-1], self.units],
initializer="glorot_normal")
self.bias = self.add_weight(name="bias",
shape=[self.units],
initializer="zeros")
# must be at the end, 그래야 케라스가 층이 만들어 졌음을 알 수 있다.
super().build(batch_input_shape)
def call(self, X):
return self.activation(X @ self.kernel + self.bias)
def compute_output_shape(self, batch_input_shape):
# 이 층의 출력 크기를 반환한다. 이 예시에서는 마지막 차원을 제외하고 입력과 크기가 같다.
# 참고로 마지막 차원의 크기는 이 층의 뉴런 개수이다.
# 케라스에서 크기는 tf.TensorShape 객체로 받아올 수 있다.
return tf.TensorShape(batch_input_shape.as_list()[:-1] + [self.units])
def get_config(self):
base_config = super().get_config()
return {
**base_config,
"units": self.units,
# keras.activations.serialize()를 이용하여 활성화 함수의 모든 설정을 저장한다.
"activation": keras.activations.serialize(self.activation)
}
여러가지 입력을 받는 layer (예를 들어 concat layer)를 만들 때는 call() 메서드에
모든 입력이 포함된 튜플을 파라미터로 넘겨줘야 한다.
이와 마찬가지로 compute_output_shape() 를 만들때도 각 입력의 배치 크기를 담은 튜플을 넘겨줘야 한다.
class MyMultiLayer(keras.layers.Layer):
def call(self, X):
X1, X2 = X
print("X1.shape: ", X1.shape, " X2.shape: ", X2.shape) # 사용자 정의 층 디버깅
return X1 + X2, X1 * X2
def compute_output_shape(self, batch_input_shape):
batch_input_shape1, batch_input_shape2 = batch_input_shape
return [batch_input_shape1, batch_input_shape2]
훈련과 테스트에서 다르게 동작하는 드롭아웃이나 배치 정규화층과 같은 layer라면 call() 메서드에
training 파라미터를 추가하여 훈련인지 테스트인지 결정하자.
class AddGaussianNoise(keras.layers.Layer):
def __init__(self, stddev, **kwargs):
super().__init__(**kwargs)
self.stddev = stddev
def call(self, X, training=None):
if training:
noise = tf.random.normal(tf.shape(X), stddev=self.stddev)
return X + noise
else:
return X
def compute_output_shape(self, batch_input_shape):
return batch_input_shape
사용자 정의 모델¶
keras.Model 클래스를 상속하여 생성자에서 층과 변수를 만들자.
스킵연결이 있는 잔차 블록을 3번 반복 시키고 다음 잔차 블록으로 넘기는 모델을 만들어보자.
우선 출력값에 입력값을 더해주는 잔차 블록 (Residual block) 을 만든다.
class ResidualBlock(keras.layers.Layer):
def __init__(self, n_layers, n_neurons, **kwargs):
super().__init__(**kwargs)
self.hidden = [
keras.layers.Dense(n_neurons,
activation="elu",
kernel_initializer="he_normal")
for _ in range(n_layers) # n_layer 만큼 레이어의 개수를 늘려준다.
]
def call(self, inputs):
Z = inputs
for layer in self.hidden:
Z = layer(Z)
return inputs + Z
class ResidualRegressor(keras.models.Model):
def __init__(self, output_dim, **kwargs):
super().__init__(**kwargs)
self.hidden1 = keras.layers.Dense(30, activation="elu",
kernel_initializer="he_normal")
self.block1 = ResidualBlock(2, 30)
self.block2 = ResidualBlock(2, 30)
self.out = keras.layers.Dense(output_dim)
def call(self, inputs):
Z = self.hidden1(inputs)
for _ in range(1 + 3):
Z = self.block1(Z)
Z = self.block2(Z)
return self.out(Z)
save() 메서드와 load_model() 을 사용하고 싶다면 layer와 model 클래스 모두 get_config()를 구현하자.
또한 이 과정에서 weights가 저장되진 않으니 save_weights() 와 load_weights() 를 사용하면 된다.
모델 구성 요소에 기반한 손실과 지표¶
앞선 사용자 손실과 지표 모두 예측값과 실제 레이블을 기반으로 한다.
하지만 때때로 은닉층의 가중치나 활성화 함수와 같이 모델의 구성요소에 기반한 손실을 정의해야 할 때가 있다.
이런 손실은 규제나 모델의 내부 상황을 모니터링할 때 유용하다.
연습을 위하여 맨 위 은닉층에 보조 출력으로 재구성 손실을 구한다. (재구성과 입력 사이의 MSE)
재구성 손실을 주 손실에 더하여 회귀에 도움이 안되도 은닉층을 통과하면서도 가능한 많은 정보를 유지하도록 해보자
(사실 이래야 규제 손실처럼 동작하며 일반화 성능이 강화된다)
class ReconstructingRegressor(keras.Model):
def __init__(self, output_dim, **kwargs):
super().__init__(**kwargs)
self.hidden = [
keras.layers.Dense(30,
activation="selu",
kernel_initializer="lecun_normal")
for _ in range(5)
]
self.out = keras.layers.Dense(output_dim)
def build(self, batch_input_shape):
n_inputs = batch_input_shape[-1]
self.reconstruct = keras.layers.Dense(n_inputs)
# *중요* layer를 상속받고 build 메서드를 작성할 때는 super().build()를 해주는게 맞다.
# 그런데 TF 2.2 에서 발생한 이슈로 model을 상속받고 build 메서드를 작성할 때는 쓰면 안된다.
# super().build(batch_input_shape)
def call(self, inputs):
Z = inputs
for layer in self.hidden:
Z = layer(Z)
reconstruction = self.reconstruct(Z)
recon_loss = tf.reduce_mean(tf.square(reconstruction - inputs))
# 훈련중에 재구성과 입력 사이에 발생한 제곱 오차를 모델의 손실 리스트에 추가한다.
self.add_loss(0.05 * recon_loss)
return self.out(Z)
자동 미분을 사용하여 그레디언트 계산하기¶
신경망은 워낙 커서 도함수를 계산하기가 어렵다. 파라미터 마다 계산하기도 어렵기에 자동 미분을 사용한다.
이를 위해 tf.GradientTape() 를 만들어 이 변수와 관련된 모든 연산을 기록하자.
def f(w1, w2):
return 3 * w1**2 + 2 * w1 * w2
w1, w2 = tf.Variable(5.), tf.Variable(3.)
with tf.GradientTape() as tape:
z = f(w1, w2)
# 이러면 w1, w2에 대한 z의 그레디언트가 저장된다.
# 다만, gradient()를 한번이라도 호출하는 순간 자동으로 테이프가 지워지기에 2번 이상 호출하면 에러가 발생
# 지속 가능한 테이프를 만들고 싶다면 tf.GradientTape(persistent=True)로 선언하자.
gradients = tape.gradient(z,[w1,w2])
gradients
[<tf.Tensor: shape=(), dtype=float32, numpy=36.0>,
<tf.Tensor: shape=(), dtype=float32, numpy=10.0>]
테이프는 변수에 대해서만 연산을 기록해준다. 만약 tf.Constant를 이용한다면 None이 반환된다.
굳이 Constant에 대해서도 보고싶다면 테이프 내부에서 tape.watch(모든 텐서) 로 써서 강제할 수 있다.
입력값 처럼 변수가 아닌 값들에 대해 변동 폭이 큰 활성화 함수에 대한 규제 손실을 구현할 때 유용하다.
c1, c2 = tf.constant(5.), tf.constant(3.)
with tf.GradientTape() as tape:
tape.watch(c1)
tape.watch(c2)
z = f(c1, c2)
gradients = tape.gradient(z, [c1, c2])
gradients
[<tf.Tensor: shape=(), dtype=float32, numpy=36.0>,
<tf.Tensor: shape=(), dtype=float32, numpy=10.0>]
가끔은 그레디언트가 역전파되지 않도록 막을 필요가 있다. 이를 위해 tf.stop_gradient()를 이용하자.
그러면 f(w1, w2)를 계산할 때 (정방향)는 값이 제대로 나오지만 미분값을 계산할 때는 상수항 취급된다.
def f(w1, w2):
return 3 * w1 ** 2 + tf.stop_gradient(2 * w1 * w2)
with tf.GradientTape() as tape:
z = f(w1, w2)
tape.gradient(z, [w1, w2])
[<tf.Tensor: shape=(), dtype=float32, numpy=30.0>, None]
자동 미분을 사용하여 함수의 그레디언트를 계산하는 것은 수치적으로 불안정하다.
부동소수점 정밀도 오차에 의해 무한 나누기 무한을 계산할 수 있다. (NaN반환)
예를 들어 my_softplus() 함수의 그레디언트를 구한다 했을 때 자동미분을 이용하면 NaN이 반환된다.
이를 막기위해 일반 출력과 도함수를 계산하는 함수를 반환해서 안전한 계산을 유도하자
@tf.custom_gradient
def my_better_softplus(z):
exp = tf.exp(z)
def my_softplus_gradients(grad):
# 원래는 exp (1 + 1 / exp) 이다.
return grad / (1 + 1 / exp)
return tf.math.log(exp + 1), my_softplus_gradients
사용자 정의 훈련 반복¶
fit() 마저도 유연하지 않을 수 있다. 예를 들면 두 개의 다른 옵티마이저를 사용한다거나...
이 경우 fit()은 하나의 옵티마이저만 이용하므로 불가능하다.
이를 위해 사용자 정의 훈련을 만들어보자.
# 우선 모델을 정의하자.
l2_reg = keras.regularizers.l2(0.05)
model = keras.models.Sequential([
keras.layers.Dense(30,
activation="elu",
kernel_initializer="he_normal",
kernel_regularizer=l2_reg),
keras.layers.Dense(1, kernel_regularizer=l2_reg)
])
# 샘플 배치를 랜덤하게 추출하는 함수
def random_batch(X, y, batch_size=32):
idx = np.random.randint(len(X), size=batch_size)
return X[idx], y[idx]
# 메트릭스 이름과 결과를 출력한다.
def print_status_bar(iteration, total, loss, metrics=None):
metrics = " - ".join([
"{}: {:.4f}".format(m.name, m.result())
for m in [loss] + (metrics or [])
])
end = "" if iteration < total else "\n"
print("\r{}/{} - ".format(iteration, total) + metrics, end=end)
n_epochs = 5
batch_size = 32
n_steps = len(X_train) // batch_size
optimizer = keras.optimizers.Nadam(learning_rate=0.01)
loss_fn = keras.losses.mean_squared_error
mean_loss = keras.metrics.Mean()
metrics = [keras.metrics.MeanAbsoluteError()]
for epoch in range(1, n_epochs + 1):
print("Epoch {}/{}".format(epoch, n_epochs))
for step in range(1, n_steps + 1):
X_batch, y_batch = random_batch(X_train_scaled, y_train)
# 훈련가능한 모든 변수에 대한 손실함수의 그레디언트를 계산한다.
with tf.GradientTape() as tape:
y_pred = model(X_batch)
# mean_squared_error 가 하나의 샘플에 대한 값을 반환하므로
# reduce_mean 을 이용하여 배치에 대한 평균을 구해주자.
main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
# 층마다 존재하는 규제 손실 (model.losses)를 더해주자.
loss = tf.add_n([main_loss] + model.losses)
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
# 모델에 가중치 제한이 추가되면 변수들에 대하여서 constraint를 적용해주자.
for variable in model.variables:
if variable.constraint is not None:
variable.assign(variable.constraint(variable))
mean_loss(loss) # 스트리밍 지표임 (지금까지의 평균 오차 계산)
for metric in metrics:
metric(y_batch, y_pred)
print_status_bar(step * batch_size, len(y_train), mean_loss, metrics)
print_status_bar(len(y_train), len(y_train), mean_loss, metrics)
for metric in [mean_loss] + metrics:
metric.reset_states()
책에서도 언급했듯이 주의할 점도 많고 복잡하다. 하지만 확실한 것은 완전하게 모델을 제어할 수 있다는 장점이 있다.
텐서플로 함수와 그래프¶
텐서플로에서 그래프는 API의 핵심이므로 피할 수 없었다. 예전이야 복잡했지만 지금은 쉬워졌다.
예시로 파이썬 함수를 만들면 정수나 파이썬 상수, 텐서를 통해 호출 할 수 있다.
텐서플로 함수는 불필요한 노드를 제거하고 표현을 단순화 시켜주는 식으로 계산 그래프를 최적화시킨다.
따라서 원본 파이썬 함수보다 훨씬 빠르게 실행되므로 복잡한 연산을 수행할 때 필수적이다.
그런데 사용자 정의 함수를 작성하고 케라스 모델에 가져다 쓰면 알아서 케라스가 텐서플로 함수로 변환해준다.
def cube(x):
return x ** 3
print(cube(2))
print(cube(tf.constant(2.0)))
8
tf.Tensor(8.0, shape=(), dtype=float32)
이 파이썬 함수를 이제 tf.function() 을 사용하여 텐서플로 함수로 만들면
파이썬 함수로 이용하는 것은 당연하고 동일한 결과를 텐서 형태로 반환해준다.
내부적으로는 파이썬 함수에서 수행되는 계산을 분석하고 동일한 작업을 수행하는 계산 그래프를 생성한다.
다만, 같은 크기의 자료형을 가지는 입력값이 들어오면 그래프를 재활용해주고 새로운 유형이면 새롭게 그래프를 생성한다.
tf_cube = tf.function(cube)
print(tf_cube(2))
print(tf_cube(tf.Variable(2.0)))
tf.Tensor(8, shape=(), dtype=int32)
tf.Tensor(8.0, shape=(), dtype=float32)
다른방법으로는 tf.function 데코레이터가 더 널리 사용된다.
저렇게 @tf.function 쓰고 밑에다가 사용하고 싶은 함수를 작성하면 알아서 텐서플로 함수로 바꿔준다.
@tf.function
def tf_tube(x):
return x ** 3
또한 원본 파이썬 함수는 python_function 속성으로 참조할 수 있다.
tf_cube.python_function
<function __main__.cube(x)>
오토그래프와 트레이싱¶
텐서플로는 파이썬 함수의 소스코드를 분석하여 모든 제어문들을 텐서플로 연산으로 바꿔준다.
이를 오토그래프 라고 부르며 이를 통해 업그레이드된 함수를 작성하게 된다.
그다음, 이 함수를 파라미터 대신 심볼릭 텐서 라는 이름의 값이 없지만 이름, 데이터 타입과 크기를 가진 텐서를 넘겨줘서
호출시킨다. 이 함수는 그래프 모드 로 실행되며 각 텐서플로 연산들은 계산을 수행하는 것 대신에,
텐서를 출력하기 위한 그래프 노드를 추가시켜준다.
텐서플로 함수 주의점¶
- 넘파이나 표준 라이브러리를 포함한 텐서플로 이외의 라이브러리를 사용하면 트레이싱 과정중에 실행된다.
심지어 그래프에 포함도 되지 않기 때문에 텐서플로 구성 요소로 작성하자. - 다른 파이썬 함수나 텐서플로 함수를 호출할 수 있다. 이 경우 해당 함수들에 대해서 데코레이터를 안써줘도 된다.
- 함수에서 텐서플로 변수를 만든다면 처음 호출될 때만 수행되어야 한다.
- 파이썬 함수의 소스코드가 텐서플로에서 이용 가능해야 한다.
- for문에서 텐서나 데이터셋을 순회하는 for문만 써야한다. (for i in range(x) 이용 불가능)
- 성능면에서 반복문보다 벡터화된 구현이 더 좋다.
'AI' 카테고리의 다른 글
[핸즈온 머신러닝] chapter 14. 합성곱 신경망 (0) | 2022.05.18 |
---|---|
[핸즈온 머신러닝] chapter 13. 텐서플로에서의 데이터 적재와 전처리 (0) | 2022.05.04 |
[핸즈온 머신러닝] chapter 11. 심층 신경망 훈련 (0) | 2022.04.22 |
[핸즈온 머신러닝] chapter 10. 케라스를 이용한 인공 신경망 (0) | 2022.04.22 |
[핸즈온 머신러닝] chapter 9. 비지도 학습 (0) | 2022.04.22 |
- Total
- Today
- Yesterday