Tensorflow——多线程输入数据处理框架(完整代码)

发布时间:2023年12月26日

我们学习使用TensorFlow对图像数据进行预处理的方法。虽然使用这些图像数据预处理的方法可以减少无关因素对图像识别模型效果的影响,但这些复杂的预处理过程也会减慢整个训练过程。为了避免图像预处理成为神经网络模型训练效率的瓶颈,TensorFlow提供了一套多线程处理输入数据的框架。

  下面总结了一个经典的输入数据处理的流程:

?  下面我们首先学习TensorFlow中队列的概念。在TensorFlow中,队列不仅是一种数据结构,它更提供了多线程机制。队列也是TensorFlow多线程输入数据处理框架的基础。然后再学习上面的流程。最后这个流程将处理好的单个训练数据整理成训练数据 batch,这些batch就可以作为神经网络的输入。

准备知识:多线程的简单介绍

  在传统操作系统中,每个进程有一个地址空间,而且默认就有一个控制线程。线程顾名思义,就是一条流水线工作的过程(流水线的工作需要电源,电源就相当于CPU),而一条流水线必须属于一个车间,一个车间就是一个进程,车间负责把资源整合到一起,是一个资源单位,而一个车间内至少有一条流水线。所以,进程只是用来把资源集中到一起(进程只是一个资源单位,或者说资源集合),而线程才是CPU上的执行单位。

  多线程(即多个控制线程)的概念就是:在一个进程中存在多个线程,多个线程共享该进程的地址空间,相当于一个车间内有多条流水线,都共用一个车间的资源。比如成都地铁和西安地铁是不同的进程,而成都地铁3号线是一个线程,成都地铁所有的线程共享成都所有的资源,比如成都所有的乘客可以被所有线拉。

  开启多线程的方式:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

import time

import random

from?threading import Thread

def study(name):

????print("%s is learning"?% name)

????time.sleep(random.randint(1, 3))

????print("%s is playing "?% name)

if?__name__ ==?'__main__':

????t = Thread(target=study, args=('james', ))

????t.start()

????print("主线程开始运行")

'''

结果展示:

james?is?learning

主线程开始运行

james?is?playing

'''

    t.start() 将开启进程的信号发给操作系统后,操作系统要申请内存空间,让好拷贝父进程地址空间到子进程,开销远大于线程。

1,队列与多线程

  在TensorFlow中,队列和变量类似,都是计算图上有状态的节点。其他的计算节点可以修改他们的状态。对于变量,可以通过赋值操作修改变量的取值。对于队列,修改队列状态的操作主要有Enqueue,EnqueueMany和Dequeue。下面程序展示了如何使用这些函数来操作一个队列。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

#_*_coding:utf-8_*_

import tensorflow?as?tf

# 创建一个先进先出的队列,指定队列中最多可以保存两个元素,并指定类型为整数

q = tf.FIFOQueue(2,?'int32')

# 使用enqueue_many 函数来初始化队列中的元素。

# 和变量初始化类似,在使用队列之前需要明确的调用这个初始化过程

init = q.enqueue_many(([0, 10], ))

# 使用Dequeue 函数将队列中的第一个元素出队列。这个元素的值将被存在变量x中

x = q.dequeue()

# 将得到的值加1

y = x + 1

# 将加 1 后的值在重新加入队列

q_inc = q.enqueue([y])

with tf.Session()?as?sess:

????# 运行初始化队列的操作

????init.run()

????for?_?in?range(6):

????????#运行q_inc 将执行数据出队列,出队的元素 +1 ,重新加入队列的整个过程

????????v, _ = sess.run([x, q_inc])

????????# 打印出队元素的取值

????????print('%s'%v)

'''

队列开始有[0, 10] 两个元素,第一个出队的为0, 加1之后为[10, 1]

第二次出队的为10, 加1之后入队的为11, 得到的队列为[1, 11]

以此类推,最后得到的输出为:

0

10

1

11

2

'''

  TensorFlow中提供了FIFOQueue 和 RandomShuffleQueue 两种队列。在上面的程序中,已经展示了如何使用FIFOQueue,它的实现的一个先进先出队列。?RandomShuffleQueue 会将队列中的元素打乱,每次出队操作得到的是从当前队列所有元素中随机选择的一个。在训练审计网络时希望每次使用的训练数据尽量随机。?RandomShuffleQueue 就提供了这样的功能。

  在TensorFlow中,队列不仅仅是一种数据结构,还是异步计算张量取值的一个重要机制。比如多个线程可以同时向一个队列中写元素,或者同时读取一个队列中的元素。在后面我们会学习TensorFlow是如何利用队列来实现多线程输入数据处理的。

  TensorFlow提供了 tf.Coordinator 和 tf.QueueRunner 两个类来完成多线程协同的功能。tf.Coordinator 主要用于协同多个线程一起停止,并提供了 should_stop, request_stop 和 join 三个函数。在启动线程之前,需要先声明一个 tf.Coordinator 类,并将这个类传入每一个创建的线程中。启动的线程需要一直查询 tf.Coordinator 类中提供的 should_stop 函数,当这个函数的返回值为 True时,则当前线程也需要退出。每一个启动的线程都可以通过调用 request_stop 函数来通知其他线程退出。当某一个线程调用 ?request_stop 函数之后, should_stop 函数的返回值将被设置为 TRUE,这样其他的线程就可以同时终止了。下面程序展示了如何使用 tf.Coordinator。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

#_*_coding:utf-8_*_

import tensorflow?as?tf

import numpy?as?np

import threading

import time

# 线程中运行的程序,这个程序每隔1秒判断是否停止并打印自己的ID

def MyLoop(coord, worker_id):

????# 使用 tf.Coordinator 类提供的协同工具判断当前是否需要停止

????while?not coord.should_stop():

????????# 随机停止所有的线程

????????if?np.random.rand() < 0.1:

????????????print("Stopping from id: %d\n"?% worker_id)

????????????# 调用 coord.request_stop() 函数来通知其他线程停止

????????????coord.request_stop()

????????else:

????????????# 打印当前线程的 ID

????????????print("Working on id: %d\n"?% worker_id)

????????# 暂停1 s

????????time.sleep(1)

# 声明一个? tf.train.Coordinator 类来协同多个线程

coord = tf.train.Coordinator()

# 声明创建 5 个线程

threads = [

????threading.Thread(target=MyLoop, args=(coord, i, ))?for?i?in?range(5)

]

# 启动所有的线程

for?t?in?threads:

????t.start()

# 等待所有线程退出

coord.join(threads)

'''

Working?on?id: 0

Working?on?id: 1

Working?on?id: 2

Working?on?id: 3

Working?on?id: 4

Working?on?id: 0

Working?on?id: 1

Working?on?id: 3

Working?on?id: 2

Working?on?id: 4

Working?on?id: 0

Working?on?id: 2

Working?on?id: 1

Working?on?id: 3

Working?on?id: 4

Working?on?id: 2

Working?on?id: 1

Working?on?id: 0

Working?on?id: 3

Working?on?id: 4

Working?on?id: 3

Working?on?id: 0

Working?on?id: 1

Working?on?id: 2

Working?on?id: 4

Working?on?id: 1

Stopping?from?id: 0

'''

  当所有线程启动之后,每个线程会打印各自的ID,于是前面4行打印出了他们的ID。然后在暂停1秒之后,所有的线程又开始第二遍打印ID。在这个时候有一个线程推出的条件达到,于是调用了coord.request_stop 函数来停止所有其他的线程。然而在打印Stoping_from_id:4之后,可以看到有线程仍然在输出。这是因为这些线程已经执行完 coord.should_stop 的判断,于是仍然会继续输出自己的ID。但在下一轮判断是否需要停止时将推出线程。于是在打印一次ID之后就不会再有输出了。

  tf.QueueRunner 主要用于启动多个线程来操作同一个队列,启动的这些线程可以通过上面介绍的 tf.Coordinator 类来统一管理,下面代码展示了如何使用 tf.QueueRunner 和 tf.Coordinator 来管理多线程队列操作。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

#_*_coding:utf-8_*_

import tensorflow?as?tf

# 声明一个先进先出的队列,队列中最多100个元素,类型为实数

queue = tf.FIFOQueue(100,?'float')

# 定义队列的入队操作

enqueue_op = queue.enqueue([tf.random_normal([1])])

# 使用 tf.train.QueueRunner 来创建多个线程运行队列的入队操作

# tf.train.QueueRunner 的第一个参数给出了被操作的队列

# [enqueue_op] * 5 表示了需要启动5个线程,每个线程运行的是equeue_op操作

qr = tf.train.QueueRunner(queue, [enqueue_op]*5)

# 将定义过的 QueueRunner 加入 TensorFlow计算图上指定的集合

# tf.train.add_queue_runner 函数没有指定集合

# 则加入默认集合 tf.GraphKeys.QUEUE_RUNNERS

# 下面的函数就是讲刚刚定义的qr加入默认的tf.GraphKeys.QUEUE_RUNNERS集合

tf.train.add_queue_runner(qr)

# 定义出队操作

out_tensor = queue.dequeue()

with tf.Session()?as?sess:

????# 使用 tf.train.coordinator 来协同启动的线程

????coord = tf.train.Coordinator()

????# 使用tf.train.QueueRunner时,需要明确调用 tf.train.start_queue_runnsers来启动所有线程

????# 否则因为没有线程运行入队操作,当调用出队操作时,程序会一直等待入队操作被运行。

????# tf.train.start_queue_runners 函数会默认启动 tf.GraphKeys.QUEUE_RUNNERS集合

????# 所说的 tf.train.add_queue_runner 函数和 tf.train.start_queue_runners 函数会指定同一个集合

????threads = tf.train.start_queue_runners(sess=sess, coord=coord)

????# 获取队列中的取值

????for?_?in?range(3):

????????print(sess.run(out_tensor)[0])

????# s使用 tf.train.Coordinator 来停止所有的线程

????coord.request_stop()

????coord.join(threads)

'''

-0.88587755

-0.6659831

-2.9722364

'''

  

输入文件队列

  下面将学习如何使用TensorFlow中的队列管理输入文件列表。这里假设所有的输入数据都已经整理成了TFRecord 格式。虽然一个 TFRecord 文件中可以存储多个训练样例,但是当训练数据量较大时,可以将数据分成多个 TFRecord 文件来提高处理效率。 TensorFlow 提供了 tf.train.match_filenames_once 函数来获取符合一个正则表达式的所有文件,得到的文件列表可以通过 tf.train.string_input_producer 函数进行有效的管理。

  tf.train.string_input_producer 函数会使用初始化时提供的文件列表创建一个输入队列,输入对垒中原始的元素为文件列表中的所有文件。如上面的代码所示,创建好的输入队列可以作为文件读取函数的参数。每次调用文件读取函数时,该函数会先判断当前是否已有打开的文件可读,如果没有或者打开的文件以及读完,这个函数会从输入队列中出队一个文件并从这个文件中读取数据。

  通过设置 shuffle 参数,tf.train.string_input_producer 函数支持随机打乱文件列表中文件出队的顺序。当 shuffle 参数为 TRUE时,文件在加入队列之前会被打乱顺序,所以出队的顺序也是随机的。随机打乱文件顺序以及加入输入队列的过程会泡在一个单独的线程上,这样不会影响获取文件的速度。tf.train.string_input_producer 函数生成的输入队列可以同时被多个文件读取线程操作,而且输入队列会将队列中的文件均匀的分给不同的线程,不出现有些文件被处理过多次而有些文件还没有被处理过的情况。

  当一个输入队列中的所有文件都被处理完后,它会将初始化时提供的文件列表中的文件全部重新加入队列。tf.train.string_input_producer 函数可以设置 num_epochs 参数来限制加载初始文件列表的最大轮数。当所有文件都已经被使用了设定的轮数后,如果继续尝试读取新的文件,输入队列会报 OutOfRange 的错误。在测试神经网络模型时,因为所有测试数据只需要使用一次,所以可以将 num_epochs 参数设置为1,这样在计算完一轮之后程序将自动停止。在展示 ?tf.train.match_filenames_once 和 tf.train.string_input_producer 函数的使用方法之前,我们可以先给出一个简单的程序来生成数据。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

#_*_coding:utf-8_*_

import tensorflow?as?tf

# 创建TFReocrd文件的帮助函数

def _int64_feature(value):

????return?tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

# 模拟海量数据情况下降数据写入不同的文件,num_shards 定义了总共写入多少文件

# instances_per_shard 定义了每个文件中有多少个数据

num_shards = 2

instances_per_shard = 2

for?i?in?range(num_shards):

????# 将数据分为多个文件时,可以将不同文件以类似0000n-of-0000m 的后缀区分

????# 其中m表示了数据总共被存在了多少个文件中,n表示当前文件的编号

????# 式样的方式既方便了通过正则表达式获取文件列表,又在文件名中加入了更多的信息

????filename = ('data.tfrecords-%.5d-of-%.5d'?% (i, num_shards))

????writer = tf.python_io.TFRecordWriter(filename)

????# 将数据封装成Example结构并写入 TFRecord 文件

????for?j?in?range(instances_per_shard):

????????# Example 结构仅包含当前样例属于第几个文件以及时当前文件的第几个样本

????????example = tf.train.Example(features=tf.train.Features(

????????????feature={

????????????????'i': _int64_feature(i),

????????????????'j': _int64_feature(j)

????????????}

????????))

????????writer.write(example.SerializeToString())

????writer.close()

  程序运行之后,在指定的目录下生产两个文件,每一个文件中存储了两个样例,在生成了样例数据之后,下面代码展示了 tf.train.match_filenames_once 函数 和 tf.train.string_input_producer 函数的使用方法:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

#_*_coding:utf-8_*_

import tensorflow?as?tf

# 使用tf.train.match_filenames_once 函数获取文件列表

files = tf.train.match_filenames_once('path/data.tfrecords-*')

# print(files)

# 输入队列中的文件列表为 tf.train.match_filenames_once 函数获取的文件列表

# 这里将 shuffle参数设置为FALSE来避免随机打乱读文件的顺序

# 但是一般在解决真实问题,会将shuffle参数设置为TRUE

filename_queue = tf.train.string_input_producer(files, shuffle=False)

# print(filename_queue)

# 读取并解析一个样本

reader = tf.TFRecordReader()

_, serialized_example = reader.read(filename_queue)

features = tf.parse_single_example(

????serialized_example,

????features={

????????'i': tf.FixedLenFeature([], tf.int64),

????????'j': tf.FixedLenFeature([], tf.int64),

????}

)

with tf.Session()?as?sess:

????# 虽然在本段程序中没有声明任何变量

????# 但在使用 tf.train.match_filenames_once 函数时需要初始化一些变量

????# init = tf.global_variables_initializer()

????# init = tf.initialize_all_variables()

????init = tf.local_variables_initializer()

????sess.run(init)

????# sess.run(files)

????# sess.run([tf.global_variables_initializer(), tf.local_variables_initializer()])

????print(sess.run(files))

????# 声明 tf.train.Coordinator 类来协同不同线程,并启动线程

????coord = tf.train.Coordinator()

????threads = tf.train.start_queue_runners(sess=sess, coord=coord)

????# 多次执行获取数据的操作

????for?i?in?range(6):

????????print(sess.run([features['i'], features['j']]))

????coord.request_stop()

????coord.join(threads)

  打印结果如下:

1

2

3

4

5

6

7

8

[b'path\\data.tfrecords-00000-of-00002'

?b'path\\data.tfrecords-00001-of-00002']

[0, 0]

[0, 1]

[1, 0]

[1, 1]

[0, 0]

[0, 1]

  在不打乱文件列表的情况下,会依次独处样例数据中的每一个样例。而且当所有样例都被读完之后,程序会自动从头开始。如果限制 num_epochs=1,那么程序会报错。

组合训练数据(batching)

  在上面,我们已经学习了如何从文件列表中读取单个样例,将这些单个样例通过预处理方法进行处理,就可以得到提高给神经网络输入层的训练数据了。在之前学习过,将多个输入样例组织成一个batch可以提高模型训练的效率。所以在得到单个样例的预处理结果之后,还需要将他们组织成batch,然后再提供给审计网络的输入层。TensorFlow提供了 tf.train.batch 和 tf.train.shuffle_batch 函数来将单个的样例组织成 batch 的形式输出。这两个函数都会生成一个队列,队列的入队操作时生成单个样例的方法,而每次出队得到的时一个batch的样例。他们唯一的区别自安于是否会将数据顺序打乱。下面代码展示了这两个函数的使用方法。

?  下面代码展示了 tf.train.batch函数的用法:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

#_*_coding:utf-8_*_

import tensorflow?as?tf

# 读取解析得到样例,这里假设Example结构中 i表示一个样例的特征向量

# 比如一张图像的像素矩阵,而j表示该样例对应的标签

# 使用tf.train.match_filenames_once 函数获取文件列表

files = tf.train.match_filenames_once('path/data.tfrecords-*')

# 输入队列中的文件列表为 tf.train.match_filenames_once 函数获取的文件列表

# 这里将 shuffle参数设置为FALSE来避免随机打乱读文件的顺序

# 但是一般在解决真实问题,会将shuffle参数设置为TRUE

filename_queue = tf.train.string_input_producer(files, shuffle=False)

# print(filename_queue)

# 读取并解析一个样本

reader = tf.TFRecordReader()

_, serialized_example = reader.read(filename_queue)

features = tf.parse_single_example(

????serialized_example,

????features={

????????'i': tf.FixedLenFeature([], tf.int64),

????????'j': tf.FixedLenFeature([], tf.int64),

????}

)

example, label = features['i'], features['j']

# 一个 batch 中样例的个数

batch_size = 2

# 组合样例的队列中最多可以存储的样例个数。这个队列如果太大,

# 那么需要占用很多内存资源,如果太小,那么出队操作可能会因为

# 没有数据而被阻碍(block),从而导致训练效率降低,一般来说

# 这个队列的大小会和每一个batch的大小相关,下面代码给出了设置

# 队列大小的一种方式。

capacity = 1000 + 3 * batch_size

# 使用 tf.train.batch 函数来组合样例。[example, label] 参数给

# 出了需要组合的元素,一般 example 和 label分别代表训练样本和这个样本

# 对应的正确标签。batch_size 参数给出了每个batch中样例的个数。

# capacity 给出了队列的最大容量。当队列长度等于容量时,TensorFlow将暂停

# 入队操作,而只是等待元素出队。当元素个数小于容量时,TensorFlow将自动重新启动入队操作

example_batch, label_batch = tf.train.batch(

????[example, label], batch_size=batch_size, capacity=capacity

)

with tf.Session()?as?sess:

????tf.global_variables_initializer().run()

????tf.local_variables_initializer().run()

????coord = tf.train.Coordinator()

????threads = tf.train.start_queue_runners(sess=sess, coord=coord)

????# 获取并打印组合之后的样例,在真实问题中,这个输出一般会作为神经网络的输入

????for?i?in?range(3):

????????cur_example_batch, cur_label_batch = sess.run(

????????????[example_batch, label_batch]

????????)

????????print(cur_example_batch, cur_label_batch)

????coord.request_stop()

????coord.join(threads)

'''

运行上面的程式会得到下面的输出:

[0 0] [0 1]

[1 1] [0 1]

[0 0] [0 1]

从这个输出可以看到 tf.train.batch函数可以将单个的数据组织成3个一组的batch

在 example, lable 中读取的数据依次为:

example:0? label:0

example:0? label:1

example:1? label:1

example:0? label:1

example:0? label:0

example:0? label:1

????这是因为 tf.train.batch 函数不会随机打乱顺序,所以在组合之后得到的数据

????组成了上面给出的输出。

'''

  下面代码展示了 tf.train.shuffle_batch 函数的使用方法:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

import tensorflow?as?tf

#_*_coding:utf-8_*_

import tensorflow?as?tf

# 读取解析得到样例,这里假设Example结构中 i表示一个样例的特征向量

# 比如一张图像的像素矩阵,而j表示该样例对应的标签

# 使用tf.train.match_filenames_once 函数获取文件列表

files = tf.train.match_filenames_once('path/data.tfrecords-*')

# 输入队列中的文件列表为 tf.train.match_filenames_once 函数获取的文件列表

# 这里将 shuffle参数设置为FALSE来避免随机打乱读文件的顺序

# 但是一般在解决真实问题,会将shuffle参数设置为TRUE

filename_queue = tf.train.string_input_producer(files, shuffle=False)

# print(filename_queue)

# 读取并解析一个样本

reader = tf.TFRecordReader()

_, serialized_example = reader.read(filename_queue)

features = tf.parse_single_example(

????serialized_example,

????features={

????????'i': tf.FixedLenFeature([], tf.int64),

????????'j': tf.FixedLenFeature([], tf.int64),

????}

)

example, label = features['i'], features['j']

# 一个 batch 中样例的个数

batch_size = 2

# 组合样例的队列中最多可以存储的样例个数。这个队列如果太大,

# 那么需要占用很多内存资源,如果太小,那么出队操作可能会因为

# 没有数据而被阻碍(block),从而导致训练效率降低,一般来说

# 这个队列的大小会和每一个batch的大小相关,下面代码给出了设置

# 队列大小的一种方式。

capacity = 1000 + 3 * batch_size

# 使用 tf.train.shuffle_batch 函数来组合样例。[example, label] 参数给

# 出了需要组合的元素,一般 example 和 label分别代表训练样本和这个样本

# 对应的正确标签。batch_size 参数给出了每个batch中样例的个数。

# capacity 给出了队列的最大容量。min_after_dequeue参数是

# tf.train.shuffle_batch 特有的。min_after_dequeue参数限制了出队时

# 队列中元素的最小个数,当队列中元素太小时,随机打乱样例的顺序作用就不大了

# 所以 tf.train.shuffle_batch 函数提供了限制出队时的最小元素的个数来保证

# 随机打乱顺序的作用。当出队函数被调用但是队列中元素不够时,出队操作将等待更多

# 的元素入队才会完成。如果min_after_dequeue参数被设定,capacity也应该来调整

example_batch, label_batch = tf.train.shuffle_batch(

????[example, label], batch_size=batch_size, capacity=capacity,

????min_after_dequeue=30

)

with tf.Session()?as?sess:

????tf.global_variables_initializer().run()

????tf.local_variables_initializer().run()

????coord = tf.train.Coordinator()

????threads = tf.train.start_queue_runners(sess=sess, coord=coord)

????# 获取并打印组合之后的样例,在真实问题中,这个输出一般会作为神经网络的输入

????for?i?in?range(3):

????????cur_example_batch, cur_label_batch = sess.run(

????????????[example_batch, label_batch]

????????)

????????print(cur_example_batch, cur_label_batch)

????coord.request_stop()

????coord.join(threads)

'''

运行上面的程式会得到下面的输出:

[0 1] [0 0]

[1 0] [0 0]

[1 0] [0 1]

从这个输出可以看到 tf.train.shuffle_batch函数已经将样例顺序打乱了

'''

  tf.train.batch 函数 和 tf.train.shuffle_batch 函数除了将单个训练数据整理成输入 batch,也提供了并行化处理输入数据的方法。tf.train.batch 函数 和 tf.train.shuffle_batch 函数并行化的方式一样,所以我们执行应用更多的 tf.train.shuffle_batch 函数为例。通过设置tf.train.shuffle_batch 函数中的 num_threads参数,可以指定多个线程同时执行入队操作。tf.train.shuffle_batch 函数的入队操作就是数据读取以及预处理的过程。当 num_threads 参数大于1时,多个线程会同时读取一个文件中的不同样例并进行预处理。如果需要多个线程处理不同文件中的样例时,可以使用tf.train.shuffle_batch_join 函数。此函数会从输入文件队列中获取不同的文件分配给不同的线程。一般来说,输入文件队列时通过 tf.train.string_input_producer 函数生成的。这个函数会分均分配文件以保证不同文件中的数据会被尽量平均地使用。

  tf.train.shuffle_batch 函数 和 tf.train.shuffle_batch_join 函数都可以完成多线程并行的方式来进行数据预处理,但是他们各有优劣。对于tf.train.shuffle_batch 函数,不同线程会读取同一个文件。如果一个文件中的样例比较相似(比如都属于同一个类别),那么神经网络的训练效果有可能会受到影响。所以在使用 tf.train.shuffle_batch 函数时,需要尽量将同一个TFRecord 文件中的样例随机打乱。而是用 tf.train.shuffle_batch_join 函数时,不同线程会读取不同文件。如果读取数据的线程数比总文件数还大,那么多个线程可能会读取同一个文件中相近部分的数据。而却多个线程读取多个文件可能导致过多的硬盘寻址,从而使得读取的效率降低。不同的并行化方式各有所长。具体采用哪一种方法需要根据具体情况来确定。

输入数据处理框架

  前面已经学习了开始给出的流程图中的所有步骤,下面将这些步骤串成一个完成的TensorFlow来处理输入数据,下面代码给出了这个步骤:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

#_*_coding:utf-8_*_

import tensorflow?as?tf

# 创建文件队列,并通过文件列表创建输入文件队列

# 需要统一所有原始数据的格式并将他们存储到TFRecord文件中

# 下面给出的文件列表应该包含所有提供训练数据的TFRecord文件

files = tf.train.match_filenames_once('path/output.tfrecords')

filename_queue = tf.train.string_input_producer([files])

# 解析TFRecord文件中的数据,这里假设image中存储的时图像的原始数据

# label为该样例所对应的标签。height,width 和 channels 给出了图片的维度

reader = tf.TFRecordReader()

_, serialized_example = reader.read(filename_queue)

# 用FixedLenFeature 将读入的Example解析成 tensor

features = tf.parse_single_example(

????serialized_example,

????features={

????????'image_raw': tf.FixedLenFeature([], tf.string),

????????'pixels': tf.FixedLenFeature([], tf.int64),

????????'label': tf.FixedLenFeature([], tf.int64)

????}

)

# 从原始图像数据解析出像素矩阵,并根据图像尺寸还原图像

decoded_images = tf.decode_raw(features['image_raw'], tf.uint8)

labels = tf.cast(features['label'], tf.int32)

pixels = tf.cast(features['pixels'], tf.int32)

retyped_images = tf.cast(decoded_images, tf.float32)

images = tf.reshape(retyped_images, [784])

# 将处理后的图像和标签数据通过 tf.train.shuffle_batch 整理成

# 神经网络训练训练时需要的batch

# 将文件以100个为一组打包

min_after_dequeue = 10000

batch_size = 100

capacity = min_after_dequeue + 3 * batch_size

image_batch, label_batch = tf.train.shuffle_batch([images, labels],

??????????????????????????????????????????????????batch_size=batch_size,

??????????????????????????????????????????????????capacity=capacity,

??????????????????????????????????????????????????min_after_dequeue=min_after_dequeue)

# 训练模型 计算审计网络的前向传播结果

def inference(input_tensor, weights1, biases1, weights2, biases2):

????# 引入激活函数让每一层去线性化 tf.nn.relu()

????layer1 = tf.nn.relu(tf.matmul(input_tensor, weights1) + biases1)

????return?tf.matmul(layer1, weights2) + biases2

# 模型相关的参数

INPUT_NODE = 784

OUTPUT_NODE = 10

LAYER1_NODE = 500

REGULARAZTION_RATE = 0.0001

TREINING_STEPS = 5000

# 生成隐藏层的参数

weights1 = tf.Variable(tf.truncated_normal([INPUT_NODE, LAYER1_NODE], stddev=0.1))

biases1 = tf.Variable(tf.constant(0.1, shape=[LAYER1_NODE]))

# 生成输出层的参数

weights2 = tf.Variable(tf.truncated_normal([LAYER1_NODE, OUTPUT_NODE], stddev=0.1))

biases2 = tf.Variable(tf.constant(0.1, shape=[OUTPUT_NODE]))

y = inference(image_batch, weights1, biases1, weights2, biases2)

# 计算交叉熵及其平均值(对于分类问题,通常将交叉熵与softmax回归一起使用

cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=y,

???????????????????????????????????????????????????????????????labels=label_batch)

cross_entropy_mean = tf.reduce_mean(cross_entropy)

# 损失函数的计算

regularizer = tf.contrib.layers.l2_regularizer(REGULARAZTION_RATE)

# 计算模型的正则化损失,一般只计算神经网络边上的权重的正则化损失,而不是用偏置项

regularization = regularizer(weights1) + regularizer(weights2)

# 总损失等于交叉熵损失和正则化损失的和

loss = cross_entropy_mean + regularization

# 优化损失函数

# 一般优化器的目的是优化权重W和偏差 biases,最小化损失函数的结果

train_step = tf.train.GradientDescentOptimizer(0.01).minimize(loss)

# 初始化会话,并开始训练过程

with tf.Session()?as?sess:

????# 由于使用了Coordinator,必须对local 和 global 变量进行初始化

????sess.run(tf.local_variables_initializer())

????sess.run(tf.global_variables_initializer())

????coord = tf.train.Coordinator()

????threads = tf.train.start_queue_runners(sess=sess, coord=coord)

????# 循环的训练神经网络

????for?i?in?range(TREINING_STEPS):

????????if?i %1000 == 0:

????????????print("After %d training step(s), loss is %g "?% (i, sess.run(loss)))

????????sess.run(train_step)

????coord.request_stop()

????coord.join(threads)

  下面代码是生成TFRecord文件的(数据是MNIST数据)代码:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

#_*_coding:utf-8_*_

import tensorflow?as?tf

from?tensorflow.examples.tutorials.mnist import input_data

import numpy?as?np

# 生成整数型的属性

def _int64_feature(value):

????return?tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

# 生成字符串型的属性

def _bytes_feature(value):

????return?tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

mnist = input_data.read_data_sets(

????'data', dtype=tf.uint8, one_hot=True

)

images = mnist.train.images

# 训练数据所对应的正确答案,可以作为一个属性保存在TFRecord中

labels = mnist.train.labels

# 训练数据的图像分辨率,这可以作为Example中的一个属性

pixels = images.shape[1]

num_examples = mnist.train.num_examples

# 输出TFRecord 文件的地址

filename =?'path/output.tfrecords'

# 创建一个writer来写TFRecord 文件

writer = tf.python_io.TFRecordWriter(filename)

for?index?in?range(num_examples):

????# 将图像矩阵转化为一个字符串

????image_raw = images[index].tostring()

????# 将一个样例转化为 Example Protocol Buffer,并将所有的信息写入这个数据结构

????example = tf.train.Example(

????????features=tf.train.Features(

????????????feature={

????????????????'pixels': _int64_feature(pixels),

????????????????'labels': _int64_feature(np.argmax(labels[index])),

????????????????'image_raw': _bytes_feature(image_raw)

????????????}

????????))

????# 将一个Example写入 TFRecord文件

????writer.write(example.SerializeToString())

writer.close()

  上面代码给出了从输入数据处理的整个流程。(但是程序可能会报错,我们这里主要学习思路)。从下图中可以看出,输入数据处理的第一步是为获取存储训练数据的文件列表。下图的文件列表为{A, B, C}.通过 tf.train.string_input_producer 函数可以选择性地将文件列表中文件的顺序打乱,并加入输入队列。因为是否打乱文件的顺序是可选的,所以在图中是虚线的。tf.train.string_input_producer 函数会生成并维护一个输入文件队列,不同线程中的文件读取函数可以共享这个输入文件队列。在读取样例数据之后,需要将图像进行预处理。图像预处理的过程也会通过tf.train.shuffle_batch 提供的机制并行地跑在多个线程中。输入数据处理流程的最后通过 tf.train.shuffle_batch 函数将处理好的单个样例整理成 batch 提供给神经网络的输入层。通过这种方式,可以有效地提高数据预处理的效率,避免数据预处理成为神经网络模型性能过程中的性能瓶颈。

?TensorFlow 数据读取机制主要是两种方法:

  • (1)使用文件队列方法,如使用 slice_input_producer 和 string_input_producer;这种方法既可以将数据转存为 TFRecord数据格式,也可以直接读取文件图片数据,当然转存为 TFRecord 数据格式进行读取会更高效点。而这两者之间的区别就是前者是输入 tensor_list ,因此可以将多个list组合成一个 tensorlist 作为输入;而后者只能是一个 string_tensor了。
  • (2)使用TensorFlow 1.4版本后出现的 tf.data.DataSet 的数据读取机制(pipeline机制),这是TensorFlow强烈推荐的方式,是一种更高效的读取方式。使用 tf.data.Dataset 模块的pipeline机制,可以实现 CPU 多线程处理输入的数据,如读取图片和图片的一些预处理,这样 GPU就可以专注于训练过程,而CPU去准备数据。

  举例如下:

1

2

3

4

5

6

7

8

9

10

image_dir ='path/to/image_dir/*.jpg'

image_list = glob.glob(image_dir)

label_list=...

image_list = tf.convert_to_tensor(image_list, dtype=tf.string)

# 可以将image_list,label_list多个list组合成一个tensor_list

image_que, label_que = tf.train.slice_input_producer([image_list,label_list], num_epochs=1)

# 只能时string_tensor,所以不能组合多个list

image = tf.train.string_input_producer(image_list, num_epochs=1)

  

tf.train.slice_input_produce() 函数的用法

  这个函数的作用就是从输入的 tensor_list 按要求抽取一个 tensor 放入文件名队列,下面学习各个参数:

1

2

tf.slice_input_producer(tensor_list, num_epochs=None, shuffle=True,

?????????????????????????seed=None,capacity=32, shared_name=None, name=None)

  说明:

  • tensor_list 这个就是输入,格式为tensor的列表;一般为[data, label],即由特征和标签组成的数据集
  • num_epochs 这个是你抽取batch的次数,如果没有给定值,那么将会抽取无数次batch(这会导致你训练过程停不下来),如果给定值,那么在到达次数之后就会报OutOfRange的错误
  • shuffle 是否随机打乱,如果为False,batch是按顺序抽取;如果为True,batch是随机抽取
  • seed 随机种子
  • capcity 队列容量的大小,为整数
  • name 名称

  举个例子:我们的数据data的 shape是(4000,10),label的shape是(4000, 2),运行下面这行代码:

1

2

input_queue = tf.train.slice_input_producer([data, label],

???????????????????????????????????num_epochs=1, shuffle=True, capacity=32 )

  结果肯定是返回值包含两组数据的 list,每个list的shape和输入的data和label的shape对应。

batch_size 的设置与影响

1,batch_size 的含义

  batch_size 可以理解为批处理参数,它的极限值为训练集样本总数,当数据量比较小时,可以将batch_size 值设置为全数据集(Full batch cearning)。实际上,在深度学习中所涉及到的数据都是比较多的,一般都采用小批量数据处理原则。

2,关于小批量训练网络的优缺点

小批量训练网络的优点:

  • 相对海量的的数据集和内存容量,小批量处理需要更少的内存就可以训练网络。
  • 通常小批量训练网络速度更快,例如我们将一个大样本分成11小样本(每个样本100个数据),采用小批量训练网络时,每次传播后更新权重,就传播了11批,在每批次后我们均更新了网络的(权重)参数;如果在传播过程中使用了一个大样本,我们只会对训练网络的权重参数进行1次更新。
  • 全数据集确定的方向能够更好地代表样本总体,从而能够更准确地朝着极值所在的方向;但是不同权值的梯度值差别较大,因此选取一个全局的学习率很困难。

小批量训练网络的缺点:

  • 批次越小,梯度的估值就越不准确,在下图中,我们可以看到,与完整批次渐变(蓝色)方向相比,小批量渐变(绿色)的方向波动更大。
  • 极端特例batch_size = 1,也成为在线学习(online learning);线性神经元在均方误差代价函数的错误面是一个抛物面,横截面是椭圆,对于多层神经元、非线性网络,在局部依然近似是抛物面,使用online learning,每次修正方向以各自样本的梯度方向修正,这就造成了波动较大,难以达到收敛效果。
3,为什么需要 batch_size 的参数

  Batch 的选择,首先决定的时下降的方向。如果数据集比较小,完全可以采用全数据集(Full ?Batch Learning)的形式,这样做有如下好处:

  • 全数据集确定的方向能够更好的代表样本总体,从而更准确地朝着极值所在的方向
  • 由于不同权值的梯度差别较大,因此选取一个全局的学习率很困难

  Full ?Batch Learning 可以使用 Rprop 只基于梯度符号并且针对性单独更新各权值。但是对于非常大的数据集,上述两个好处变成了两个坏处:

  • 随着数据集的海量增加和内存限制,一次载入所有数据不现实
  • 以Rprop的方式迭代,会由于各个 batch之间的采样差异性,各次梯度修正值相互抵消,无法修正。这才有了后来的RMSprop的妥协方案。
4,选择适中的 batch_size

  可不可以选择一个适中的Batch_size 值呢?当然可以,就是批梯度下降法(Mini-batches Learning)。因为如果数据集足够充分,那么用一半(甚至少得多)的数据训练算出来的梯度与用全部数据训练出来的梯度是几乎一样的。

在合理的范围内,增大Batch_size 有什么好处?

  1. 内存利用率提高了,大矩阵乘法的并行化效率提高
  2. 跑完一次epoch(全数据集)所需要的迭代次数减少,对于相同数据量的处理速度进一步加快。
  3. 在一定范围内,一般来说Batch_Size 越大,其确定的下降方向越准,引起训练震荡越小。

盲目增大Batch_size 有什么坏处?

内存利用率提高了,但是内存容量可能撑不住了

跑完一次epoch(全数据集)所需要的迭代次数减少,要想达到相同的精度,其所花费的时间大大的增加了,从而对参数的修正也就显得更加缓慢。

Batch_size 增大到一定程度,其确定的下降方向已经基本不再变化。

5,调节Batch_Size 对训练效果影响到底如何?

  这里有一个LeNet 在MNIST 数据集上的效果。MNIST 是一个手写体标准库。

  运行结果如上图所示,其中绝对时间做了标准化处理。运行结果与上文分析相印证:

  1. Batch_Size 太小,算法在200 epochs 内不收敛。
  2. 随着Batch_Size 增大,处理相同数据量的速度越快。
  3. 随着Batch_Size 增大,达到相同精度所需要的epoch 数量越来越多
  4. 由于上述两种因素的矛盾,Batch_Size 增大到某个时候,达到时间上的最优
  5. 由于最终收敛精度会陷入不同的局部极值,因此Batch_Size 增大到某些时候,达到最终收敛精度上的最优
文章来源:https://blog.csdn.net/2301_81887304/article/details/135222358
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。