Tensorflow学习笔记-输入数据处理框架

引入

TensorFlow提供了一种统一的格式来存储数据,这个格式就是TFRecord。基于这个统一的数据格式,在处理数据的时候有一些通用的框架。这些通用的框架总结为获取文件列表、创建文件队列、图像预处理、合成Batch、设计损失函数、梯度下降算法。如图片总结如下:

获取文件列表、创建文件队列

TFRecord介绍

TFRecord数据文件是一种将图像数据和标签统一存储的二进制文件,能更好的利用内存,在tensorflow中快速的复制,移动,读取,存储等。

写入

通过将数据填入到tf.train.Example类,Example的protocol buffer包含了字段的tf.train.Features,使用数据修改Features, 实现将protocol buffer序列化成一个字符串, 再通过tf.python_io.TFRecordWriter类将序列化的字符串写入到TFRecord中.

读出

使用tf.TFRecordReader读取器, 通过tf.parse_single_example解析器解析,parse_single_example操作可以将Example protocol buffer解析为张量, 然后用解码器tf.decode_raw解码.

代码实现

将数据保存为tfrecord格式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import os
import tensorflowas tf
from PILimport Image
import matplotlib.pyplotas plt

cwd='/home/ruyiwei/Documents/Iris&Contact/'
classes={'iris','contact'}

writer=tf.python_io.TFRecordWriter("iris_contact.tfrecords")

for index,name in enumerate(classes):   #enumerate函数返回的是索引和索引对应的取值
  class_path=cwd+name+'/'
  for img_name in os.listdir(class_path):  #os.listdir() 方法用于返回指定的文件夹包含的文件或文件夹的名字的列表。
    img_path=class_path+img_name
    img=Image.open(img_path)
    img= img.resize((512,80))
    img_raw=img.tobytes()    #将图像矩阵转化成一个字符串进行存储
    #plt.imshow(img) # if you want to check you image,pleasedelete '#'
    #plt.show()
 #将一个样例转化为Example Protocol Buffer,并将所有的信息写入这个数据结构
    example= tf.train.Example(features=tf.train.Features(feature={
      "label":tf.train.Feature(int64_list=tf.train.Int64List(value=[index])),
      'img_raw':tf.train.Feature(bytes_list=tf.train.BytesList(value=[img_raw]))
    }))
 #将一个Example序列化后写入到TFRecord文件
    writer.write(example.SerializeToString())
writer.close()

Tensorflow从TFRecord中读取数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def read_and_decode(filename): # read iris_contact.tfrecords
  filename_queue = tf.train.string_input_producer([filename])# create a queue

  reader = tf.TFRecordReader()
  _, serialized_example = reader.read(filename_queue) #return file_name and file
  features = tf.parse_single_example(serialized_example,
                    features={
                      'label': tf.FixedLenFeature([], tf.int64),
                      'img_raw' : tf.FixedLenFeature([], tf.string),
                    })#return image and label
  #tf.decode_raw可以将字符串解析成图像对应的像素数组
  img = tf.decode_raw(features['img_raw'], tf.uint8)
  img = tf.reshape(img, [512, 80, 3]) #reshape image to 512*80*3
  img = tf.cast(img, tf.float32) * (1. / 255) - 0.5 #throw img tensor
  label = tf.cast(features['label'], tf.int32) #throw label tensor
  return img, label

将TFRecord中的数据保存为图片

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
filename_queue = tf.train.string_input_producer(["iris_contact.tfrecords"]) 
reader = tf.TFRecordReader()
_, serialized_example = reader.read(filename_queue)  #return file and file_name
features = tf.parse_single_example(serialized_example,
                  features={
                    'label': tf.FixedLenFeature([], tf.int64),
                    'img_raw' : tf.FixedLenFeature([], tf.string),
                  })
image = tf.decode_raw(features['img_raw'], tf.uint8)
image = tf.reshape(image, [512, 80, 3])
label = tf.cast(features['label'], tf.int32)
with tf.Session() as sess:
  init_op = tf.initialize_all_variables()
  sess.run(init_op)
  coord=tf.train.Coordinator()
  threads= tf.train.start_queue_runners(coord=coord)
  for i in range(20):
    example, l = sess.run([image,label])#take out image and label
    img=Image.fromarray(example, 'RGB')
    img.save(cwd+str(i)+'_''Label_'+str(l)+'.jpg')#save image
    print(example, l)
  coord.request_stop()
  coord.join(threads)

图像预处理

图像编/解码

图像是按照jpg等格式编码并保存在文件中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#解码
#读取图像的原始数据
image_raw_data=tf.gfile.FastGFile("path","rb").read()
with tf.Session() as sess:
#对jpg格式的图像进行解码,解码的结果是一个张量
img_data=tf.image.decode_jpeg(image_raw_data)
print(img_data.eval()) #输出的是一个三维矩阵
        
plt.imshow(img_data.eval()) #imshow()函数的参数X是一个要绘制的图像或数组。
plt.show()

#编码
encoded_image=tf.image.encode_jpeg(img_data)
with tf.gfile.Gfile("path","wb") as f:
f.write(encoded_image.eval()

图像大小调整

作用:在将图像的像素作为输入提供给神经网络之前,需要将图像的参数大小统一。

1
2
3
4
5
6
7
8
9
10
11
#代码实现一:调整图片大小
#首先将图片数据转化为实数类型,这一步是将0-255中的像素值转化为0.0-1.0范围内的实数
img_data=tf.image.convert_image_dtype(img_data,dtype=tf.float32)
resized=tf.image.resize_images(img_data,[300,300],method=0)
#说明:method表示图片大小调整算法

#代码实现二:对图像进行裁剪或者填充
croped=tf.image.resize_image_with_crop_or_pad(img_data,1000,1000)

#代码实现三:截取或者填充图像的中间部分
central_cropped=tf.image.central_crop(im_data,0.5)

图像翻转

1
2
3
4
5
flipped=tf.image.flip_up_down(img_data)   #上下翻转
flipped=tf.image.flip_left_right(img_data) #左右翻转
transposed=tf.image.transpose_image(img_data) #沿对角线翻转
flipped=tf.image.random_flip_left_right(img_data) #以50%的概率随机上下翻转
flipped=tf.image.random_flip_up_down(img_data) #以50%的概率随机左右翻转

图像色彩调整

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#调整亮度
adjusted=tf.image.adjust_brightness(img_data,-0.5) #亮度减0.5
adjusted=tf.clip_by_value(adjusted,0.0,1.)

#调整对比度
adjusted=tf.image.adjust_contrast(img_data,0.5) #将图像的对比度调整到0.5倍
adjusted=tf.image.adjust_contrast(img_data,lower,upper) #在[lower,upper]的范围内随机调整图的对比度

#调整色相
adjusted=tf.image.adjust_hue(img_data,0.1)
adjust=tf.image.adjust_hue(img_data,max_delta) #在[-max_delta,max_delta]范围内随机调整色相

#调整饱和度
adjust=tf.image.adjust_saturation(img_data,-5) #将图像的饱和度-5
adjusted=tf.image.adjust_saturation(img_data,lower,upper) #将图像在[lower,upper]范围内随机调整饱和度

#图像的标准化
adjusted=tf.image.per_image_standardization(img_data) #将图像中的三维矩阵中的数字均值变为0,方差变为1

处理标注框

1
2
3
4
5
6
7
8
9
#函数tf.image.draw_bounding_boxes()的输入是一个batch的数据
#首先需要将图片中的三维矩阵变为四维数组
boxes=tf.constant([[[0.05,0.05,0.9,0.7],[0.35,0.47,0.5,0.56]]])
begin,size,bbox_for_draw=tf.image.sample_distorted_bounding_box(
tf.shape(img_data),bounding_boxes=boxes,min_object_covered=0.4
)
batched=tf.expand_dims(tf.image.convert_image_dtype(img_data,tf.float32),0)
image_with_box=tf.image.draw_bounding_boxes(batches,bbox_for_draw)
distorted_image=tf.slice(img_data,begin,size)

合成Batch

TensorFlow队列与多线程的应用

实现队列

FIFOQueue():创建一个先入先出(FIFO)的队列

RandomShuffleQueue():创建一个随机出队的队列

enqueue_many():初始化队列中的元素

dequeue():出队

enqueue():入队

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import tensorflow as tf
q = tf.FIFOQueue(3,"int32")
init = q.enqueue_many(([0,1,2],))
x = q.dequeue()
y = x + 1
q_inc = q.enqueue([y])
with tf.Session() as sess:
init.run()
for a in range(5):
v,a = sess.run([x,q_inc])
print(v)
打印结果: 
0 
1 
2 
1 
2

多线程协同

TensorFlow为我们提供了多线程协同操作的类—tf.Coordinator,其函数主要有:

should_stop():确定当前线程是否退出

request_stop():通知其他线程退出

join():等待所有线程终止

假设有五个线程同时在工作,每个线程自身会先判断should_stop()的值,当其返回值为True时,则退出当前线程;如果为Flase,也继续该线程。此时如果线程3发出了request_stop()通知,则其它4个线程的should_stop()将全部变为True,然后线程4自身的should_stop()也将变为True,则退出了所有线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import tensorflow as tf
import numpy as np
import time
import threading

def MyLoop(coord,worker_id):
while not coord.should_stop():
if np.random.rand()<0.09:
print('stoping from id:',worker_id)
coord.request_stop()
else:
print('working from id:',worker_id)
time.sleep(1)

coord = tf.train.Coordinator()
#声明5个线程
threads=[threading.Thread(target=MyLoop,args=(coord,i,)) for i in range(5)]
#遍历五个线程
for t in threads:
t.start()
coord.join(threads)
打印结果:
working from id: 0
working from id: 1
working from id: 2
working from id: 3
working from id: 4
stoping from id: 0

在第一轮遍历过程中,所有进程的should_stop()都为Flase,且随机数都大于等于0.09,所以依次打印了workingfrom id:0-5,再重新回到进程0时,出现了小于0.09的随机数,即进程0发出了request_stop()请求,进程1-4的should_stop()返回值全部为True(进程退出),也就无法进入while,进程0的should_stop()返回值也将为True(退出),五个进程全部退出。

多线程操作队列

前面说到了队列的操作,多线程协同的操作,在多线程协同的代码中让每一个线程打印自己的id编号,下面我们说下如何用多线程操作一个队列。

TensorFlow提供了队列tf.QueueRunner类处理多个线程操作同一队列,启动的线程由上面提到的tf.Coordinator类统一管理,常用的操作有:

QueueRunner():启动线程,第一个参数为线程需要操作的队列,第二个参数为对队列的操作,如enqueue_op,此时的enqueue_op= queue.enqueue()

add_queue_runner():在图中的一个集合中加‘QueueRunner’,如果没有指定的合集的话,会被添加到tf.GraphKeys.QUEUE_RUNNERS合集

start_queue_runners():启动所有被添加到图中的线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import tensorflow as tf

#创建队列
queue = tf.FIFOQueue(100,'float')
#入队
enqueue_op = queue.enqueue(tf.random_normal([1]))
#启动5个线程,执行enqueue_op
qr = tf.train.QueueRunner( queue,[enqueue_op] * 5)
#添加线程到图
tf.train.add_queue_runner(qr)
#出队
out_tensor = queue.dequeue()

with tf.Session() as sess:
coord = tf.train.Coordinator()
threads=tf.train.start_queue_runners(sess=sess,coord=coord)
for i in range(6):
print(sess.run(out_tensor)[0])
coord.request_stop()
coord.join(threads)
打印结果: 
-0.543751 
-0.712543 
1.32066 
0.2471 
0.313005 
-2.16349

组合训练数据

Tensorflow读出TFRecord中的数据,然后在经过预处理操作,此时需要注意:数据还是单个,而网络的输入一般以Batch为单位,因此我们需要将单个的数据组合成一个Batch,做为神经网络的输入。

Tensorflow提供组合训练数据的函数有四个:tf.train.batch(),tf.train.shuffle_batch()与tf.train.batch_join、tf.train.shuffle_batch_join,这里为什么要用与呢?其实他们是针对两种情况。tf.train.batch和tf.train.batch_join的区别,一般来说,单一文件多线程,选用tf.train.batch(需要打乱样本,有对应的tf.train.shuffle_batch);而对于多线程多文件的情况,一般选用tf.train.batch_join来获取样本(打乱样本同样也有对应的tf.train.shuffle_batch_join使用)。下面会通过具体的例子来说明。tf.train.batch(),tf.train.shuffle_batch()这两个函数都会生成一个队列,队列的入队操作是生成单个样例的方法,也就是经过预处理之后的图像。

我们首先看看一下这两个函数的定义:

1
2
3
4
5
6
def batch(tensors, batch_size, num_threads=1, capacity=32,
enqueue_many=False, shapes=None, dynamic_pad=False,
allow_smaller_final_batch=False, shared_name=None, name=None):
def shuffle_batch(tensors, batch_size, capacity, min_after_dequeue,
num_threads=1, seed=None, enqueue_many=False, shapes=None,
allow_smaller_final_batch=False, shared_name=None, name=None):

这两个函数的主要参数为:

  • tensors入队队列,预处理后的数据和对应的标签。
  • batch_size:batch的大小。如果太大,则需要占用较多的内存资源,如果太小,那么出队操作可能会因为没有数据而被阻塞,从而导致训练效率降低。
  • capacity:队列的最大容量,当队列的长度等于容量时,Tensorflow将暂停入队操作,而只是等待元素出队。当队列个数小于容量时,Tensorflow将自动启动入队操作。
  • num_threads:启动多少个线程读取文件和预处理。
  • allow_smaller_final_batch:如果设置True,则会允许最后一个Batch的大小比较小,当没有足够的数据输入时。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# -*- coding: utf-8 -*-
import tensorflow as tf
import os
# 生成整数型的属性
def _int64_feature(value):
return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

num_shards = 2
instance_per_shard = 10
for i in range(num_shards):
filename = 'model/data.tfrecord-%.5d-of%.5d' %(i,num_shards)
writer = tf.python_io.TFRecordWriter(filename)
for j in range(instance_per_shard):
example = tf.train.Example(features=tf.train.Features(feature={
'i':_int64_feature(i),
'j':_int64_feature(j)
}))
writer.write(example.SerializeToString())
writer.close()

tf_record_pattern = os.path.join( 'model/', 'data.tfrecord-*' )
data_files = tf.gfile.Glob( tf_record_pattern )
filename_quene = tf.train.string_input_producer(data_files,shuffle=False)

reader = tf.TFRecordReader()
_, serialized_example = reader.read(filename_quene)

features = tf.parse_single_example(serialized_example,features={
'i': tf.FixedLenFeature([],tf.int64),
'j': tf.FixedLenFeature( [], tf.int64),
})

with tf.Session() as sess:
tf.global_variables_initializer().run()
# print(sess.run(filename))
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess = sess, coord = coord)
for i in range(15):
print(sess.run([features['i'],features['j']]))

coord.request_stop()
coord.join(threads)
# 输出结果为:
[0, 0]
[0, 1]
[0, 2]
[0, 3]
[0, 4]
[0, 5]
[0, 6]
[0, 7]
[0, 8]
[0, 9]
[1, 0]
[1, 1]
[1, 2]
[1, 3]
[1, 4]

设计损失函数、梯度下降算法

经典的损失函数

交叉熵损失函数

$\mathrm{H}(\mathrm{p}, \mathrm{q})=-\sum_{x} p(x) \log q(x)$,p是真实的分布,q是模型给出的分布

刻画了两个概率分布之间的距离,一般需要加一个softmax层将输出层变为概率分布,交叉熵越小,越说明两个分布之间的距离就越小。

1
tf.reduce.mean(y_*tf.log(tf.clip_by_value(y,1e-10,1.0)))

均方误差

$\operatorname{MSE}\left(\mathrm{y}, y^{\prime}\right)=\frac{\sum_{i=1}^{n}\left(y_{i}-y_{i}\right)}{n}$,常用于回归问题

1
mse=tf.reduce_mean(tf.square(y_-y))

交叉熵和softmax回归

1
2
cross_entrop=tf.nn.softmax_cross_entrop_with_logits(labels=y_,logits=y)
loss=tf.reduce_mean(cross_entrop)

说明:labels表示训练数据的正确答案,只需要传入正确答案的数字就可以,常用tf.argmax(y_,1),logits表示训练的结果

作用:在只有一个正确答案的分类问题中,使用这个函数可以进一步加速计算过程,因为相当于只有一个数来乘以一个向量了,而不是两个向量相乘

神经网络训练过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
batch_size=n
x=tf.placeholder(tf.float32.shape=(batch_size,2),name="x-input")
y_=tf.placeholder(tf.float32.shape=(batch_size,1),name="y-input")

loss=…
train_op=tf.train.AdamOptimizer(0.001).minimize(loss)

with tf.Session() as sess:
init_op=tf.global_variables_initializer()
sess.run(init_op)

for I in range(STEPS):
current_x,current_y=…
sess.run(train_op,feed_dict={x:current_x,y_:current_y})

代码总结

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# 创建文件列表,并通过文件列表来创建文件队列。在调用输入数据处理流程前,需要统一
# 所有的原始数据格式,并将它们存储到TFRecord文件中
# match_filenames_once 获取符合正则表达式的所有文件
files = tf.train.match_filenames_once('path/to/file-*-*')
# 将文件列表生成文件队列
filename_queue = tf.train.string_input_producer(files,shuffle=True)

reader = tf.TFRecordReader()
_, serialized_example = reader.read(filename_queue)
# image:存储图像中的原始数据
# label该样本所对应的标签
# width,height,channel
features = tf.parse_single_example(serialized_example,features={
'image' : tf.FixedLenFeature([],tf.string),
'label': tf.FixedLenFeature([], tf.int64),
'width': tf.FixedLenFeature([], tf.int64),
'heigth': tf.FixedLenFeature([], tf.int64),
'channel': tf.FixedLenFeature([], tf.int64)
})

image, label = features['image'], features['label']
width, height = features['width'], features['height']
channel = features['channel']
# 将原始图像数据解析出像素矩阵,并根据图像尺寸还原糖图像。
decode_image = tf.decode_raw(image)
decode_image.set_shape([width,height,channel])
# 神经网络的输入大小
image_size = 299
# 对图像进行预处理操作,比对亮度、对比度、随机裁剪等操作
distorted_image = propocess_train(decode_image,image_size,None)

# shuffle_batch中的参数
min_after_dequeue = 1000
batch_size = 100
capacity = min_after_dequeue + 3*batch_size
image_batch,label_batch = tf.train.shuffle_batch([distorted_image,label],
batch_size=batch_size,capacity=capacity,
min_after_dequeue=min_after_dequeue)

logit = inference(image_batch)
loss = cal_loss(logit,label_batch)
train_step = tf.train.GradientDescentOptimizer(learning_rate).minimize(loss)

with tf.Session() as sess:
# 变量初始化
tf.global_variables_initializer().run()
# 线程初始化和启动
coord = tf.train.Coordinator()
theads = tf.train.start_queue_runners(sess=sess,coord=coord)

for i in range(STEPS):
sess.run(train_step)
# 停止所有线程
coord.request_stop()
coord.join(threads)

参考资料:

《Tensorflow:实战Google深度学习框架》

Tensorflow学习笔记-输入数据处理框架