
Commit 4c85a95
Move input reading epoch counter into local variables.
Fixes the issue:
"read_batch_features/file_name_queue/limit_epochs/epochs" not found in checkpoint.
Change: 124300678
ilblackdragon authored and tensorflower-gardener committed Jun 7, 2016
1 parent 15c4b15 commit 4c85a95
Showing 5 changed files with 32 additions and 9 deletions.
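
Why the move helps, in brief: a default tf.train.Saver saves and restores only the global variables collection, so an epoch counter created as a global variable is expected to be present in any checkpoint restored into the graph. Once the counter lives in the local variables collection it stays out of checkpoints and is initialized on its own. A minimal sketch of that behavior (my illustration, not code from this commit, assuming the TF 0.9-era API used throughout this diff):

import tensorflow as tf

# Limiting epochs on an input tensor creates a hidden "epochs" counter.
limited = tf.train.limit_epochs(tf.constant("example"), num_epochs=2)

# With the counter registered as a local variable, a default Saver (which
# checkpoints global variables) no longer looks for ".../limit_epochs/epochs"
# when restoring, which is the error quoted in the commit message. The
# counter is initialized separately:
init_local_op = tf.initialize_local_variables()

with tf.Session() as sess:
  sess.run(init_local_op)
  print(sess.run(limited))  # b'example', at most twice before OutOfRangeError
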
tensorflow/contrib/learn/python/learn/io/graph_io.py (4 changes: 2 additions & 2 deletions)
@@ -154,7 +154,7 @@ def read_batch_features(file_pattern, batch_size, features, reader,
num_epochs: Integer specifying the number of times to read through the
dataset. If None, cycles through the dataset forever. NOTE - If specified,
creates a variable that must be initialized, so call
tf.initialize_all_variables() as shown in the tests.
tf.initialize_local_variables() as shown in the tests.
queue_capacity: Capacity for input queue.
reader_num_threads: The number of threads to read examples.
parser_num_threads: The number of threads to parse examples.
@@ -207,7 +207,7 @@ def read_batch_record_features(file_pattern, batch_size, features,
num_epochs: Integer specifying the number of times to read through the
dataset. If None, cycles through the dataset forever. NOTE - If specified,
creates a variable that must be initialized, so call
tf.initialize_all_variables() as shown in the tests.
tf.initialize_local_variables() as shown in the tests.
queue_capacity: Capacity for input queue.
reader_num_threads: The number of threads to read examples.
parser_num_threads: The number of threads to parse examples.
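
Putting the updated docstring guidance into practice, a usage sketch (assumptions beyond the diff: the function is reachable as tf.contrib.learn.io.read_batch_features, and data/*.tfrecord files with an int64 "age" feature exist):

import tensorflow as tf

features = tf.contrib.learn.io.read_batch_features(
    file_pattern="data/*.tfrecord",
    batch_size=32,
    features={"age": tf.FixedLenFeature([], tf.int64)},
    reader=tf.TFRecordReader,
    num_epochs=1)  # creates the epoch counter, now a local variable

with tf.Session() as sess:
  # With num_epochs set, initialize_all_variables() alone is no longer
  # sufficient; the epoch counter lives in the local variables collection.
  sess.run(tf.initialize_all_variables())
  sess.run(tf.initialize_local_variables())
  coord = tf.train.Coordinator()
  threads = tf.train.start_queue_runners(sess, coord=coord)
  try:
    ages = sess.run(features["age"])  # one batch of 32 values
  finally:
    coord.request_stop()
    coord.join(threads)
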
tensorflow/contrib/learn/python/learn/io/graph_io_test.py (2 changes: 1 addition & 1 deletion)
@@ -207,7 +207,7 @@ def test_read_csv(self):
filename, batch_size,
reader=tf.TextLineReader, randomize_input=False,
num_epochs=1, queue_capacity=queue_capacity, name=name)
session.run(tf.initialize_all_variables())
session.run(tf.initialize_local_variables())

coord = tf.train.Coordinator()
tf.train.start_queue_runners(session, coord=coord)
Third changed file (filename and change count not captured in this view)
@@ -124,7 +124,7 @@ def testOutOfRangeError(self):

key, value = slim.parallel_reader.single_pass_read(
tfrecord_path, reader_class=tf.TFRecordReader)
init_op = tf.initialize_all_variables()
init_op = tf.initialize_local_variables()

with self.test_session() as sess:
sess.run(init_op)
@@ -142,7 +142,7 @@ def testTFRecordReader(self):

key, value = slim.parallel_reader.single_pass_read(
tfrecord_path, reader_class=tf.TFRecordReader)
init_op = tf.initialize_all_variables()
init_op = tf.initialize_local_variables()

with self.test_session() as sess:
sess.run(init_op)
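
The same single-pass pattern outside the test harness, as a sketch (assumptions: data.tfrecord is a placeholder path, and slim resolves to tf.contrib.slim and exposes parallel_reader the way these tests use it):

import tensorflow as tf

slim = tf.contrib.slim

key, value = slim.parallel_reader.single_pass_read(
    "data.tfrecord", reader_class=tf.TFRecordReader)
init_op = tf.initialize_local_variables()  # was initialize_all_variables()

with tf.Session() as sess:
  sess.run(init_op)
  coord = tf.train.Coordinator()
  threads = tf.train.start_queue_runners(sess, coord=coord)
  try:
    while True:
      k, v = sess.run([key, value])  # one record per step
  except tf.errors.OutOfRangeError:
    pass  # the single pass over the input is complete
  finally:
    coord.request_stop()
    coord.join(threads)
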
tensorflow/python/training/input.py (4 changes: 3 additions & 1 deletion)
@@ -78,7 +78,9 @@ def limit_epochs(tensor, num_epochs=None, name=None):
raise ValueError("num_epochs must be > 0 not %d." % num_epochs)
with ops.op_scope([tensor], name, "limit_epochs") as name:
zero64 = constant_op.constant(0, dtype=dtypes.int64)
epochs = variables.Variable(zero64, name="epochs", trainable=False)
epochs = variables.Variable(
zero64, name="epochs", trainable=False,
collections=ops.GraphKeys.LOCAL_VARIABLES)
counter = epochs.count_up_to(num_epochs)
with ops.control_dependencies([counter]):
return array_ops.identity(tensor, name=name)
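
What the new collections argument changes, in a small self-contained sketch (mine, not from the commit; it passes collections as a list, the usual public form): a variable registered under GraphKeys.LOCAL_VARIABLES is returned by tf.local_variables() and initialized by tf.initialize_local_variables(), but it is absent from tf.all_variables(), the set a default tf.train.Saver checkpoints, so the missing-key error no longer occurs.

import tensorflow as tf

# Registering the counter under LOCAL_VARIABLES instead of the default
# (global) variables collection changes who owns its lifecycle.
epochs = tf.Variable(
    0, name="epochs", trainable=False,
    collections=[tf.GraphKeys.LOCAL_VARIABLES])

assert epochs in tf.local_variables()      # initialized locally
assert epochs not in tf.all_variables()    # not checkpointed by a default Saver

with tf.Session() as sess:
  sess.run(tf.initialize_local_variables())
  print(sess.run(epochs))  # 0
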
tensorflow/python/training/input_test.py (27 changes: 24 additions & 3 deletions)
@@ -18,8 +18,9 @@
from __future__ import division
from __future__ import print_function

import os
import itertools
import os

import numpy as np
from six.moves import xrange # pylint: disable=redefined-builtin
import tensorflow as tf
@@ -44,6 +45,7 @@ def test(self):
os.path.join(self.get_temp_dir(), "match_filenames.?"))
one = tf.train.match_filenames_once(additional[1])
tf.initialize_all_variables().run()
tf.initialize_local_variables().run()
self.assertItemsEqual(map(tf.compat.as_bytes, filenames), star.eval())
self.assertItemsEqual(map(tf.compat.as_bytes, additional),
question.eval())
@@ -56,15 +58,16 @@ def testNoLimit(self):
with self.test_session():
seven = tf.constant(7)
seven_forever = tf.train.limit_epochs(seven)
tf.initialize_all_variables().run()
for i in range(100):
tf.initialize_local_variables().run()
for _ in range(100):
self.assertEqual(7, seven_forever.eval())

def testLimit(self):
with self.test_session():
love_me = tf.constant("Love Me")
love_me_two_times = tf.train.limit_epochs(love_me, num_epochs=2)
tf.initialize_all_variables().run()
tf.initialize_local_variables().run()
self.assertEqual(b"Love Me", love_me_two_times.eval())
self.assertEqual(b"Love Me", love_me_two_times.eval())
with self.assertRaises(tf.errors.OutOfRangeError):
@@ -84,6 +87,7 @@ def testNoShuffle(self):
dequeue_many = queue.dequeue_many(len(input_tensor) * num_epochs)
dequeue = queue.dequeue()
tf.initialize_all_variables().run()
tf.initialize_local_variables().run()
threads = tf.train.start_queue_runners()

# No randomness, so just see repeated copies of the input.
@@ -108,6 +112,7 @@ def testNoShapeInference(self):
dequeue_many = queue.dequeue_many(len(input_value) * num_epochs)
dequeue = queue.dequeue()
tf.initialize_all_variables().run()
tf.initialize_local_variables().run()
threads = tf.train.start_queue_runners()

# No randomness, so just see repeated copies of the input.
@@ -136,6 +141,7 @@ def testNoShuffle(self):
dequeue_many = queue.dequeue_many(len(strings) * num_epochs)
dequeue = queue.dequeue()
tf.initialize_all_variables().run()
tf.initialize_local_variables().run()
threads = tf.train.start_queue_runners()

# No randomness, so just see repeated copies of the input.
@@ -157,6 +163,7 @@ def testShuffle(self):
dequeue_many = queue.dequeue_many(len(strings))
dequeue = queue.dequeue()
tf.initialize_all_variables().run()
tf.initialize_local_variables().run()
threads = tf.train.start_queue_runners()

# Validate that we only shuffle the strings within an epoch and
@@ -201,6 +208,7 @@ def testNullString(self):
queue = tf.train.string_input_producer(tf.constant([], dtype=tf.string))
dequeue = queue.dequeue()
tf.initialize_all_variables().run()
tf.initialize_local_variables().run()
threads = tf.train.start_queue_runners(coord=coord)
with self.assertRaises(tf.errors.OutOfRangeError):
dequeue.eval()
@@ -229,6 +237,7 @@ def testNoShuffle(self):
dequeue_many = queue.dequeue_many(range_size * num_epochs)
dequeue = queue.dequeue()
tf.initialize_all_variables().run()
tf.initialize_local_variables().run()
threads = tf.train.start_queue_runners()

# No randomness, so just see repeated copies of the input.
@@ -250,6 +259,7 @@ def testShuffle(self):
dequeue_many = queue.dequeue_many(range_size)
dequeue = queue.dequeue()
tf.initialize_all_variables().run()
tf.initialize_local_variables().run()
threads = tf.train.start_queue_runners()

# Validate that we only shuffle the integers within an epoch and
@@ -299,6 +309,7 @@ def testNoShuffle(self):
slices = tf.train.slice_input_producer(
[source_strings, source_ints], num_epochs=num_epochs, shuffle=False)
tf.initialize_all_variables().run()
tf.initialize_local_variables().run()
threads = tf.train.start_queue_runners()

# No randomness, so just see repeated copies of the input.
@@ -323,6 +334,7 @@ def testShuffle(self):
[source_strings, source_ints], num_epochs=num_epochs, shuffle=True,
seed=161803)
tf.initialize_all_variables().run()
tf.initialize_local_variables().run()
threads = tf.train.start_queue_runners()

# Validate that we only shuffle the integers within an epoch and
@@ -406,6 +418,7 @@ def _testOneThreadHelper(self, use_dict):
[counter, sparse_counter, "string"], batch_size=batch_size)
batched_fetch = batched
tf.initialize_all_variables().run()
tf.initialize_local_variables().run()
threads = tf.train.start_queue_runners()

for i in range(num_batches):
@@ -444,6 +457,7 @@ def testOneThreadDynamicPad(self):
counter = examples.count_up_to(num_batches * batch_size)
string = tf.tile(["string"], tf.to_int32(tf.pack([counter])))
tf.initialize_all_variables().run()
tf.initialize_local_variables().run()
batched = tf.train.batch(
[counter, string], batch_size=batch_size, dynamic_pad=True)
threads = tf.train.start_queue_runners()
@@ -480,6 +494,7 @@ def testOneThreadEnqueueMany(self):
batched = tf.train.batch(pre_batched, enqueue_many=True,
batch_size=batch_size)
tf.initialize_all_variables().run()
tf.initialize_local_variables().run()
threads = tf.train.start_queue_runners()

for i in range(num_batches):
@@ -516,6 +531,7 @@ def testManyThreads(self):
[counter, sparse_counter, "string"],
batch_size=batch_size, num_threads=4)
tf.initialize_all_variables().run()
tf.initialize_local_variables().run()
threads = tf.train.start_queue_runners()

all_counts = []
@@ -632,6 +648,7 @@ def _testTwoThreadsHelper(self, use_dict):
batch_size=batch_size)
batched_fetch = batched
tf.initialize_all_variables().run()
tf.initialize_local_variables().run()
threads = tf.train.start_queue_runners()

# Should see the "a" and "b" threads mixed together.
@@ -706,6 +723,7 @@ def testTwoThreadsDynamicPad(self):
[ninety_nine, b]],
batch_size=batch_size, dynamic_pad=True)
tf.initialize_all_variables().run()
tf.initialize_local_variables().run()
threads = tf.train.start_queue_runners()

# Should see the "a" and "b" threads mixed together.
@@ -800,6 +818,7 @@ def _testTwoThreadsHelper(self, use_dict):
min_after_dequeue=16, seed=141421)
batched_fetch = batched
tf.initialize_all_variables().run()
tf.initialize_local_variables().run()
threads = tf.train.start_queue_runners()

all_counts = []
@@ -847,6 +866,7 @@ def testManyThreads(self):
batch_size=batch_size, capacity=32,
min_after_dequeue=16, seed=173205, num_threads=4)
tf.initialize_all_variables().run()
tf.initialize_local_variables().run()
threads = tf.train.start_queue_runners()

all_counts = []
@@ -932,6 +952,7 @@ def _testTwoThreadsHelper(self, use_dict):
batched_fetch = batched

tf.initialize_all_variables().run()
tf.initialize_local_variables().run()
threads = tf.train.start_queue_runners()

# Should see the "a" and "b" threads mixed together.
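
The recurring pattern in the test updates above, condensed into one sketch (illustrative values, not a test from the file): inputs that limit epochs now need tf.initialize_local_variables() in addition to tf.initialize_all_variables(), with both run before tf.train.start_queue_runners().

import tensorflow as tf

with tf.Session() as sess:
  # Three strings, three epochs: nine dequeues, then OutOfRangeError.
  queue = tf.train.string_input_producer(
      ["a", "b", "c"], num_epochs=3, shuffle=False)
  dequeue = queue.dequeue()

  # Global variables (e.g. explicit counters in the tests) and the epoch
  # counter (now a local variable) are covered by different initializers.
  sess.run(tf.initialize_all_variables())
  sess.run(tf.initialize_local_variables())
  threads = tf.train.start_queue_runners(sess)

  for _ in range(9):
    sess.run(dequeue)
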
