字幕列表 影片播放
YURI: Hi, everyone.
My name is Yuri.
And today, I'm going to be talking
to you about tf.data, which is TensorFlow's input pipeline.
As a disclaimer, this presentation
assumes familiarity with basic TensorFlow concepts
such as ops and kernels.
And it contains a lot of code examples
that are not necessarily 100% accurate.
There may be some details that have been removed
because they're either unnecessary or distracting
for the purpose of presentation.
So with that, let's get started.
In this talk, we're going to cover a couple of topics.
We're going to peel the two main layers
of tf.data's implementation one by one,
first focusing on Python view and then on the C++ view
of tf.data.
And then I'm going to cover three areas of tf.data that
might be of interest to the broader audience,
support for non-tensor types, and both static and dynamic
optimizations in tf.data.
So let's get started with the Python view.
Throughout the course of the presentation,
I'm going to be using the following example, which
is a pretty standard example of an input pipeline.
What this input pipeline does, it's
reading files that are in TFRecord formats--
so this contains records--
then shuffling those records, applying a map transformation
that allows you to transform the records and parse them,
pre-process them, and finally, batching the pre-processed data
so that it's amenable to machine learning computation.
And the idiomatic way to iterate through elements
of an input pipeline in TF 2.0 is by a simple for loop.
And that's because in TF 2.0, data sets are Python iterables.
Besides this approach, you can also
use the explicit iter or next keywords.
Now, as the comment at the bottom mentions,
the user-defined function that you
can pass into the map transformation
can be both graph or non-graph computation
where the non-graph computation is enabled by AutoGraph.
And I'll talk a little more about that later on.
Just to contrast the simplifications that happened
in the transition between 1.x and 2.0,
let's take a look at what an input pipeline--
or idiomatic iteration of an input pipeline-- in 1.x
would look like.
And you can see that the definition of the input
pipeline that is the top part of the data set remains the same.
But the iteration is much more verbose.
So hopefully, this kind of illustrates that the simplest
way to iterate through a data set
has been made much more simple in the 2.0 release
of TensorFlow.
So let's talk a bit more about what's actually going on when
you run the Python program.
And what we're going to do is we're
going to go through different lines of the Python program
and talk about what actually happens under the hood in terms
of what types of TensorFlow ops these invocations correspond
to.
And I'm using a diagram to visualize
the different types of ops.
The gray box is the actual op-- so
in this case, TFRecordDataset-- while the yellow boxes
are the different inputs for the op,
while the blue box is the output of the op.
So in the case of the TFRecordDataset,
we have a couple of inputs-- file names, compression types,
buffer sizes.
And an important thing that I want to highlight here
is that this op produces a variant tensor, which
is a representation of the data set object
that can be passed between different ops.
And we will see how that's used right away when we're looking
at the map transformation.
So the MapDataset op, you can see that one of its inputs
is actually a variant, which is the downstream data set that
produces the elements that the map transformation transforms.
Other inputs are-- they're called other arguments.
And these are actually the captured inputs
for the function.
In this particular case, that input
would be empty, because the function doesn't
have any captured inputs, at least not as outlined
in the example.
And the round boxes are not inputs.
They are attributes.
The difference between inputs and attributes
is that the attribute values do not
change with different executions of the op.
They are constant.
And the attributes here are function,
which identifies the function parse, which
is stored separately in TensorFlow runtime--
but it allows the op to look it up when it executes--
as well as the type of the arguments
that the function inputs.
And again, like the TFRecordDataset,
it produces an output variant.
So a little more about the use of support
for user-defined functions in tf.data.
A number of tf.data transformations
are operations that actually allow users
to specify their own functions.
Examples of those are filter, flat_map, interleave, map,
or reduce.
And irrespective of the mode of the execution,
tf.data will convert the user-defined function
into a graph.
And as illustrated on the previous slide,
the function graph is--
a handle to the function graph is passed to the respective op
through an attr.
A little more detail on the tracing implementation-- it
was originally based on framework.function.Defun
and recently switched to the same tracing implementation
that's used for TF functions in 2.0.
This provided a number of benefits,
including control flow version 2,
support for resource variables, TensorArrayV2, and also
the ability for users to specify user-defined functions that
are not necessarily graph-compatible as
long as they're supported by AutoGraph.
And it's marked as work in progress
here because this functionality is actually
temporarily disabled.
And we're working on enabling it back on very soon.
So to tie it together, if we look at the input pipeline
definition, the four lines, this definition of an input pipeline
will roughly correspond to the following ops, and inputs,
and attributes.
Now, up to this point, we've only
talked about how to define the input pipeline.
But naturally, the thing that you
would want to do with the input pipeline
is that you would like to enumerate the elements
inside of it.
And that's where the iterator ops come in play.
Because iterator, it can be thought
of as an instance of a data set that has a state
and allows you to enumerate the elements in a sequential order.
So what are the iterator lifecycle ops?
The op on the left top corner called
Iterator that takes no input and produces a single output called
handle is an op that creates an empty iterator resource, which
is a way to pass the state, iterator state,
between different operations, while the MakeIterator op takes
two different inputs.
It takes iterator resource, which is something
that we've created by the iterator op,
and a data set variant.
And what this MakeIterator op does,
it instantiates the data set-- sorry,
the iterator resource with that particular data set.
So at that point, you have an iterator resource
that has been initialized to start producing
elements for that particular data set as defined by the data
set variant.
Now, the actual iteration happens
by the means of the IteratorGetNext op,
which takes an iterator resource handle
and produces the actual elements, which
can be a tensor, or a nest of tensors, or possibly
also non-tensor types.
And later in the presentation, I'll
talk about what exactly is supported
in tf.data in terms of types.
And finally, there's also a DeleteIterator op
that takes the iterator resource and makes
sure that the iterator state is properly
disposed of when the iterator is no longer needed.
This final op, as you can imagine,
is very important to make sure that iterator resources are not
being left behind.
Because it is not uncommon for the iterator resource
state to amass hundreds of megabytes or gigabytes
of memory.
And leaving these around can result in your computation
running out of memory.
As a side note, when you're looking at the performance
or profiling performance of your program or input pipeline,
you will see IteratorGetNext op in something
like a timeline, or an [INAUDIBLE],, or CPU profile
trace.
And this is the op that indicates the output
latency of your input pipeline.
And so if that op is very small in its runtime--
I would say on the order of microseconds--
it means that your input pipeline is not a bottleneck.
And if it's larger than that, chances
are you are bottlenecked by input, at least to some extent.
So now that we've talked about the different ops, let's
actually see how the execution of the Python program
corresponds or maps to the execution-- or creation
and execution-- of the different ops.
And what I'm going to do is I'm going to contrast the TF 2.0
eager mode style of execution with the TF 1.x graph mode
style of execution to help folks understand what are
the differences between TF--
between the two modes as far as tf.data is concerned.
So let's start with the 2.0 eager mode.
In eager mode, ops are created and executed
as the program runs.
So when the Python line that creates the TFRecodrDataset()
runs, we end up creating-- both creating and executing--
the RFRecordDataset() op.
And similarly, with the next line,
we create and execute the shuffle() data set op,
feeding the output of the previous op inside of it
as part of the input variant.
That way, we tie-- we're starting to build the input
pipeline, trying it to--
connecting the two stages together.
When the .map transformation is executed,
the user-defined function is traced and stored
in the TensorFlow runtime.
And a handle to it is passed, as an attribute,
to the map data set op along with the input variant
representing the input pipeline up to that point.
And finally, the batch() op is created and executed,
creating the final stage of the input pipeline.
Now, when the idiomatic way of iterating through tf.data is
used-- that is, the for loop for element in data set--
what happens under the hood is that an entire method is
called on the data set object.
And that actually triggers the creation and execution
of two ops.
We first create the iterator resource through an op.
It's called Anonymous Iterator.
And I'm going to point out the difference between that
and the iterator as I talk about the graph more execution.
And then we associate the iterator resource
with the input pipeline that we've created
via the MakeIterator op.
And as the Python for loop iterates,
we end up invoking next() on the Python iterator object.
And this translates to the IteratorGetNext op
being created and subsequently executed.
It's only created once, and it's executed as many times
as there's elements in the data set.
And finally, when the Python iterator object goes out
of scope, the DeleteIterator op is invoked,
which makes sure that the iterator state, iterator
resource state, is properly disposed of.
So let's contrast that with how this
would work in 1.x graph mode.
So in graph mode, the execution happens lazily,
which means we create the ops as the Python lines are invoked.
But they're not executed--
the execution is postponed until the particular ops are
run using decision mechanism.
So just stepping through the program,
we see that we are building a graph but not executing it.
There is a particular mechanism for creating the iterator
resource op and the MakeIterator op,
as well as creating the op that is later used for iteration.
And it's only within the run part of your program
that ops are executed.
When the iterator initializer op is executed,
we actually end up executing the entire graph of the input
pipeline, including the iterator op.
Now, the difference between the iterator op
and the anonymous iterator op that was used in the eager mode
is that anonymous iterator op creates a new resource
every time it's executed, while iterator op creates
a resource only the first time it's executed.
And any subsequent execution returns a handle
to that resource.
And the reason for that is when we run the get_next op,
that get_next op will actually execute the iterator op as well
by the nature of the difference between the graph
mode and eager mode executions.
And so that's kind of an artifact of graph mode
execution.
And thus we need different types of resource creation
ops for eager mode and graph mode inside of tf.data.
And there is no explicit delete iterator op.
And that's because the iterator resource
lifetime is tied to the lifetime of the surrounding session.
And so when the session is destroyed,
so is the iterator resource.
So far, so good?
OK.
So let's now, after--
now that we've kind of peeled the Python layer,
and we talked about an op-level view of tf.data, let's dive
a level deeper.
And let's talk about what actually
happens inside of the kernels that implement these ops.
And like with most other TensorFlow op kernels,
these are implemented in C++.
So before we retrace our steps or take a look at the example
program from a C++ point of view,
let's talk about what are the important tf.data C++
abstractions.
So the top-level one is a data set op kernel which
implements the op kernel API.
And this provides a mechanism for implementing different
types of data set ops through a single interface where
the different implementations of the data set op kernel
interface just need to override or implement the MakeDataset()
method.
What the MakeDataset() does, it returns a DatasetBase object.
Now, the purpose of the data set op kernel is to provide
a translation between a graph representation of the op
and a C++ representation of the op.
Now the data set object--
DatasetBase object-- in turn has a method for creating
an iterator for that particular data set as well as a method
called AsGraphDef(), which provides the reverse of what I
just talked about, which allows you to basically go from a C++
representation of a data set back to a graph representation
of a data set, which will come in handy when I talk about
static optimizations of tf.data.
The MakeIterator() method of the DatasetBase returns
an IteratorBase, which is an interface representing
the different types of iterators we have.
And the single most important method in that interface is
GetNext(), which is the actual C++ method used for iterating
through the state of the input pipeline.
And coupled with that is the IteratorResource,
which holds the state.
And so as we will see, the IteratorResource is actually
the entry point into the connected structure
of different C++ iterators through which ops like
IteratorGetNext get receive data.
And the SetIteratorFromDataset() method corresponds
to the MakeIterator op, as we'll shortly see.
Last but not least, tf.data has two C++ abstractions
for representing functions, its CapturedFunction
and InstantiatedCapturedFunction.
The CapturedFunction provides a mechanism
for bundling a function with its captured inputs
and later instantiating it.
And the InstantiateCapturedFunction
provides tf.data with a mechanism to actually run
the user-defined functions.
And so you can perhaps see how there
is a simple relationship between DatasetBase and IteratorBase
and CapturedFunction and InstantiatedCapturedFunction
where the letter is an instance of the former in both
of those cases.
All right, so let's go back to our example.
And now we're going to take a look at what happens when we
execute the different lines of the input pipeline,
but what happens in the C++ world.
And unlike in the previous Python view section,
in this section, the diagram at the bottom will not be graph
objects, but they will be C++ objects.
So in this case, the TFRecord Data set down below
is actually an instance of the TFRecord Data set
that's of type DatasetBase.
And for context, we are, again, in TF 2.0 eager mode.
So when the Python program executes a tf.data TFRecord
Data set with files argument, we end up creating the data set
variant through the following C++ code.
And just for illustration, I'm showing here
how that op kernel fetches the set of file names.
And the set of file names can either
be a single string or a list of strings.
So there is some subsequent string parsing
that's elided here.
But the important bit here is that we're then
storing the TFRecord Data set op data set in the output, where
the data set object itself is a variant, which
allows us to pass it on as an input to another op.
And that another op is the ShuffleDataset, which gets
executed immediately after.
And so here I'm illustrating how the op receives
or kind of extracts the variant tensor input from the op kernel
context and then passes it inside of the ShuffleDatasetOp
so that the ShuffleDatasetOp now understands
what stage is producing elements for it to consume.
Next, it's the map transformation.
What I want to illustrate here is how the CapturedFunction
mechanism works.
We use the CapturedFunction Create factory
that takes the identifier of the function, which
is a list of attributes, as well as
any captured inputs if there were any stored in the op
kernel context.
And similar to the ShuffleDatasetOp,
we end up passing the captured function as well as the input
to the downstream data set inside of the constructor.
And finally, there is not much new to see here
for the BatchDatasetOp.
So I pretty much elided all the details from this slide.
OK, so now for the interesting stuff,
because this is where we are going to start
iterating through the data set.
So the first thing that happens when you call--
or when you write "for element in dataset"--
under the hood, this gets translated
to a Python iter invocation on the data set object.
And the first thing that happens is
that we create the anonymous Iterator Resource.
And here is just an illustration of the actual mechanism that
does this as well as the code that then produces
the handle to the iterator.
And this handle, along with the variant tensor
representing the batch data set is then
passed to the MakeIteratorOp.
So here you can see how we extract both the data set
variant as well as the resource handle and use these two
to pass them into the SetIteratorFromDataset() method
that, as we will shortly see, will, in a cascading fashion,
create a sequence of connected iterator objects.
So let's take a closer look at what SetIteratorFromDataset()
does.
It takes a look at the outermost data set,
because that's the variant that it received.
And it invokes MakeIterator on that particular data set.
And this will prompt a creation of a Batch Iterator using
the MakeIterator() method on the Batch Data set,
but also trigger a recursive MakeIterator() invocation
on the input of Batch Data set, which is Map Data set.
And so in that fashion, the Map-- in a similar fashion,
Map Iterator is created, where the Map Iterator
creator will also instantiate the captured function.
So now we'll see that we have a parse_fn instance in addition
to just parse_fn CapturedFunction object.
And similarly, we create a Shuffle Iterator, and finally,
the TFRecord Iterator.
And because TFRecord Data set has no inputs,
this is where the recursive creation of the input pipeline
state stops.
And the control bubbles up back to IteratorResource, that
returns a resource handle--
actually, MakeIterator() doesn't return a resource handle.
We already have the resource handle.
AUDIENCE: Question.
YURI: Uh-huh?
AUDIENCE: Does the input pipeline have to be a line?
Or can it be a DAG?
YURI: So at the data set level, the input pipeline
can be a DAG.
At the iterator level, it will always
be a tree, if that makes sense.
OK.
So next, let's take a look at next().
So when the IteratorGetNextOp is invoked because we're starting
to iterate through the elements of the input pipeline,
we again look up the resource using to LookupResource()
method and then call to GetNext() method
on the resource.
And as I mentioned earlier, iterator resource
is thus the entry point to the state of the iterator.
And what happens is this recursively calls GetNext()
on the Batch Iterator.
And the Batch Iterator says, well, I need
batch size worth of elements.
So let me get them one by one.
So it calls to Map Iterator to say, please give me an element.
Map Iterator says, well, I need an element
to apply a user-defined function on.
So it's going to ask Shuffle for one.
And Shuffle says, well, I need a buffer size worth of elements
to do reasonable shuffling.
So it's going to call to TFRecord Iterator.
And TFRecord Iterator will say, OK, well, I have these files,
and I'm going to open and start reading elements out of them.
So at this point, we start returning data back
up this round trip between Shuffle and TFRecord Iterators.
It might happen multiple times initially.
And at some point, Shuffle or has filled its buffer
off of elements used for shuffling,
and produces a random element back up to the Map Iterator,
which then applies the user-defined function on it,
and takes its output, and returns it back
to the Batch Iterator.
And this would be repeated batch size number of times.
Then the Batch Iterator would take all those elements
and create one higher-level, higher-dimensional element out
of the individual slices of the batch
and pass it on to Iterator Resource.
And that would get created--
returned out to the Python program.
Now finally, when the Python iterator goes out of scope,
the Iterator Resource is deleted.
And in a cascading fashion, the other iterators
get created because their ref count goes to 0,
or, we actually use smart pointers that
are kind of connected between the different iterator objects.
So far, so good?
Any questions?
OK.
So up to this point, I talked primarily
about input pipeline that wasn't trying to be performance-savvy.
And if it wasn't obvious from the walkthrough
of the stages of the iterator, it
seems like there's a lot of steps
that would need to happen to produce a single batch.
And if all these steps lie on the critical path
of your computation, then you're probably not executing
at the best possible performance,
or at the peak of your performance.
So we have a tf.data performance guideline
that talks about different mechanisms
to make sure that your input pipeline is performant.
And the three main ones are software pipelining,
processing parallelization, and I/O parallelization.
And they all use various performance artifacts,
either buffers or parallelism, to allow
users to specify how their input pipeline should be executed.
And in the context of the parallelism--
and actually, pre-fetching as well--
they all map down to asynchronous threads
being started by the data set of kernels.
And they are running in the background,
disconnected from the GetNext() calls,
generating values into an internal buffer.
And when a GetNext() call arrives,
it just waits until there is something that buffer
and returns it.
So in the ideal case, there is data in the buffer,
and you don't need to wait at all.
But in case your consumer of the data
is faster than your producer, then
you might wait some of the time, but hopefully not all
of the time.
So let's just take a look at how this would change the diagram
that we talked about.
So what I did for the sake of an illustration is I
added the num_parallel_calls argument
to the map transformation as well as
a prefetch transformation at the very end of the input pipeline.
AUDIENCE: So you can prefetch anywhere [INAUDIBLE]??
YURI: You can.
Yes.
Rule of thumb is that the one at the very end
is usually the one that you get the most mileage out
of because it allows you to overlap the computation
the entire pipeline with the computation that
might be happening in the model, either on [INAUDIBLE]..
But yes, in theory--
I have a pertinent quiz later on.
We'll see that prefetching or decoupling the producer
and consumer anywhere throughout you input pipeline
might be a good idea.
So the changes that reflect what happened, or changes
in the pipeline code map to the following changes
in the diagram.
We now have a Prefetch Iterator and Prefetch Data Set
at the end of the pipeline.
And we also have a ParallelMap Iterator and ParallelMap
data set, instead of just regular Map Iterator, Map Data
Set.
It turns out with tf.data, we have a different op kernel
from the one that uses parallelism.
AUDIENCE: Do you support Racket tensors yet?
YURI: No.
But the CL that introduces that support on--
actually a couple of CLs-- but we're very close.
AUDIENCE: Excellent.
And do you see any performance decreases
when using Racket tensors?
YURI: So because we are not supporting them yet,
I don't have a good answer to that.
AUDIENCE: Yeah.
YURI: I think, through the course of the review process
for bringing that support in, we've
been cognizant of making sure that the implementation is
efficient so that it works well out of the gate.
AUDIENCE: Yeah.
But you support sparse tensors?
YURI: We do.
AUDIENCE: OK.
YURI: Yeah.
So the hope is that the programs that
use sparse tensors that also use Racket tensors will see
a performance boost by switching to Racket
tensors once that support is rolled up.
AUDIENCE: Excellent.
Thank you.
YURI: Mhm.
And so the runner threads that are our
illustrated here, these are the background threads that
decouple the producer and consumer in the Prefetch
Iterator and the ParallelMap Iterator respectively.
And they're actually not started until the very first getNext
invocation.
So before you call getNext for the first time,
the iterator is idle.
There's no activity happening.
But the moment you start fetching data out
of the iterator, background threads might be started
or thread pools might be created that might start
performing a background entity.
An interesting, exciting thing, or a consequence of this,
is that a traditional way to look
at the performance of TensorFlow would be a timeline, which
gives you a view into what happens
in the context of a single stack.
And this particular abstraction doesn't match well
with the asynchronous nature of tf.data execution.
In addition to that, you will only
see the iterator get mixed up, which might not necessarily
give you a good view into what's actually
happening at the different stages of the tf.data
into pipeline.
And so at the recent Dev Summit, we
announced that there is going to be an open source
version of a tool that we've had available internally
for some time that provides you with all
these details of information so that you can debug
the performance of your input pipeline
using the same tools that we use internally.
OK.
So that concludes the section where I talked about C++
and what happens in C++.
And there's a little bit of a switch
now, because we're going to go back to Python level
and talk about supporting non-tensor types.
So tf.data supports more than just regular tensors.
The different inputs or outputs of your data
transformations can actually be a number of different things--
sparse tensors, tensor arrays, nests
of any of these optionals, as well as nested data sets.
And here, I just illustrate it.
It's not an exhaustive list, but I just
illustrate some of the transformations in terms
of the types that they support either
as an input or an output.
AUDIENCE: What about NumPy arrays?
YURI: NumPy arrays are supported as well.
They kind of fall into the category of tensors by virtue
being of trivially convertible to tensors.
AUDIENCE: Yeah.
YURI: Similar to NumPy, there's something
called SparseTensorValue, which is really
just a Python namedtuple return type.
And that's kind of the NumPy equivalent for sparse tensors.
And I think Racket tensors have the same.
They have a Racket tensor value.
AUDIENCE: With the caveat in [INAUDIBLE] and in eager,
you don't actually need the value types
because you can have a sparse tensor or a Racket tensor whose
values are eager tensors, which are trivially
convertible to NumPy arrays.
YURI: Yeah.
Yeah.
So the value type is an artifact of graph mode 1.x.
So the mechanism that tf.data uses under the hoods
could provide support for these different types
is the tf.data structure API, which is this interface
and will require any type to be supported in tf.data
to implement this interface.
I'm not going to talk about each of these,
but the list of methods is neither short nor long.
So for example, the support for TensorArray
was introduced less than a month ago.
And it was one day of work.
So I don't think that the overhead of introducing
the support for the type, as long as it's
kind of natural how to implement this method is very large.
Having said that, the support for Racket tensor
has been in the works for some time.
And part of it is because we actually
want that implementation to be very performant.
And so it prompted creation of new C++ kernels to make sure
that the performance is good from the get-go.
Instead of talking about the individual
methods in the interface, what I want to do here
is I want to illustrate how this interface is actually
used to provide the polymorphism at the Python level
for different types of tf.data transformations.
So for instance, if we look at the tf.data data set
from tensors transformation, which is a data source that
just take a memory array and use it as a data set source, what
the implementation at the Python level does is it computes,
or it involves the structure from value methods,
to compute an instance of the structure object and stores
internally in its attribute.
And then it passes the output of structure to tensor arrays,
to the op kernel--
so the tensor data set op kernel.
I forgot to mention earlier, at the C++ level,
tf.data only deals with flat lists of tensors.
And so we need a mechanism to go between that representation
and the Python numTensor nested structure
of also arbitrary types.
And it's the to_tensor_list and from_tensor_list
that provide us with this boxing and unboxing, if you will,
between the two representations.
from_tensor_slices is similar.
The difference between from_tensors
and from_tensor_slices is that instead
of viewing data as a single tensor, we end up slicing it.
We assume that the value has a rank of at least 1.
And we end up slicing it into however many slices
it has in the dimension.
And these are the invocations of the structure API
that would allow us to do this kind of agnostically
to the actual type of the Python value.
I also want to illustrate how the structure
API is used in the context of user defined functions.
In particular, the function that we end up tracing
is actually a different function than just the function the user
process.
We end up wrapping the function that
gets passed in the Python program in invocations
to from_tensor_list and then to_tensor_list.
And this is kind of the glue that I
talked about where we make sure that the Python record expects
a flat list of tensors.
And then we reconstruct the structure and the typing,
using the from_tensor_list invocation,
because that's what user provided function expects.
And then we again deconstruct the structure and box
all the known tensor types and tensors,
because that's what the upstream transformation of tf.data
expects.
All right.
So next up, let's talk about static optimizations,
which illustrates that everything that I talked
about up to this point about how tf.data works
is only part of the truth.
So in tf.data, we have a number of transformations or a number
of optimizations implemented.
Here is a subset of the ones who we have.
And the ones that are in italics are
the ones that enabled by default.
And the ones that are not italic,
then those can be enabled through tf.data options.
This is how you would, for example,
go about enabling a map vectorization transformation
if you wanted to.
The tf.data options has other features.
It's not just for optimizations, it is also, for example,
for specifying threading or statistics collection features.
But in the context of static optimizations,
I'm just illustrating how it's used for those.
So what happens at the Python level when iterator is created
is that the data set Python object has an options
object associated with it.
And we use the information in the options
object to possibly chain additional data set
transformations on the end of the data set.
This is something that the user doesn't see in their code,
doesn't write in their code.
It's something that we do at the Python level
to allow us to insert functionality
that we would like to insert.
And one of the main uses of this is this optimized data set
that's used as an entry point for any static optimizations
that are to be applied to the input pipeline.
So if we take a look at the C++ level,
what's happening inside of the optimized data set kernel is
we'll again get the input of the data set,
and then invoke a data set optimized method
on the optimized data set kernel.
And the code is actually quite long,
so I just summarized it in high level statements here.
This is what happens inside of the optimized method.
We first use the AsGraphDef functionality to go from
the C++ representation of the input object to GraphDef
representation of the input object.
We then use Grappler to apply the subset of tf.data
optimizations that are either enabled by default,
or explicitly enabled by a user, which
will give us a transformed GraphDef representing the data
set.
And we then convert the rewritten GraphDef to a C++
representation using GraphRunner.
And finally, we update the input with the result.
So because map and batch optimization
is one of the optimizations enabled by default,
the map and batch stages that were in our example
would be, in fact, replaced with a single map and batch data
set, which is a more performant version of diffused map
and batch transformation.
And last topic that I want to talk about
are dynamic optimizations.
So I mentioned before that users have
a number of mechanisms to make their input pipelines more
performant.
They can insert prefetching with buffer_size.
They can insert map with num_parallel_calls
or interleave with num_parallel_calls
to apply various performance optimization tools.
But what are good values of these arguments,
like buffer_size and num_parallel_calls?
Well, so in the context of this section,
I'm only going to focus on the parallelism optimization.
And to get you interested, I have a quiz for you.
[LAUGHTER]
So imagine that you have an input
pipeline that looks like this.
It reads from a set of files, and then it
applies two transformations.
And on the right-hand side in the comment,
I mentioned how much time is required
to get a single element, assuming constant processing
time through the particular stage of the input pipeline.
So with this information, how much time do you
think you would need, when you call getNext,
how long would it take to get a single element out?
AUDIENCE: 200 milliseconds.
YURI: 320.
AUDIENCE: 320.
YURI: 320 is the answer, because everything
executes sequentially.
So when you call into the outermost map,
that goes recursively into the innermost
or the inner map, which calls recursively
into the tf record data set.
We spend 20 milliseconds there, then we
spend 100 milliseconds in the inner map executing f, and then
200 milliseconds in the order map executing g.
So it's 320.
I think you kind of jumped one step ahead here.
[LAUGHTER]
And this was supposed to be a trick question,
but you already got the answer.
What happens here if we just add num_parallel_calls
to the map transformations?
Nothing, except something happens.
And the reason for this being 200 milliseconds
is that num_parallel_calls uses a different op kernel which
has a background thread that will be performing activity
independently of the consumer of the data.
So the very first element will actually take 320 milliseconds.
But then over time, there is going
to be the processing done for the three different stages will
be actually overlapped because there is now two background
threads doing everything in parallel.
The parallelism is 1 at each stage,
but that still gives you 200 milliseconds,
in total, in the stable state, assuming--
AUDIENCE: Is that the correct mental model to think
that this implies prefetch?
YURI: Yes.
That a very good mental model.
Parallel map is, in fact, the prefetching
by the virtue of using a background thread.
And so in a way, this is an answer to your question
where prefetching inside of the input
pipeline, not just at the very end, might provide benefits.
AUDIENCE: Question.
So does that imply that the map function has to be traced?
Like, for example, if the map function is just a Python
function, and if you have multi-threading
on Python function, then really [INAUDIBLE]??
YURI: So I think the answer is that you
will get these benefits, irrespective of the cost,
the constant cost of the Python function is--
of the function passed into the map transformation.
If it's implemented as a py_func,
that function itself might be--
oh, I see what you're saying, that multiple functions would
be escaping into Python.
That's a good point.
Possibly.
I would want to convince myself that they actually
are all content for the same [INAUDIBLE]..
AUDIENCE: Either way.
AUDIENCE: If you ever use Python,
you need to make sure your Python is thread safe.
It's very hard for a TensorFlow runtime
to not accidentally run Python code from many threads
at the same time.
YURI: Yeah.
I think the bottom line here is that if you
can avoid using py_func, for instance, use autograph.
So it perhaps might not be surprising
that if you increase the values of num_parallel_calls,
because you know how much time you're
going to spend in each of those stages,
you can get to the optimal output latency of this input
pipeline.
You cannot run any faster than the slowest part of the input
pipeline, which in this case is the sequential TFRecordReader.
There might actually be a way to speed this up even further,
by using interleave, num_parallel_calls
over the different readers.
But instead of exploring that avenue, what I want to ask
is, what do you think happens here?
AUDIENCE: Dismal performance.
YURI: Yeah.
I think the answer is, it depends.
It might actually run well, well enough,
because num_parallel_calls doesn't
mean that you create that many threads, at least
in the case of map transformation anyhow.
It means that you allow to schedule as many ops
into the interop thread pool at the same time.
And because you're allowed to schedule them at the same time,
you need a place to store them.
So if nothing else, the downside of specifying
a very large value of num_parallel_calls
is that you're going to use more memory to store
these intermediate values than you would actually
need for equal performance, which
might hurt your temporal locality or thread locality.
So yes, the performance might actually become worse,
but the reasons for why can be subtle and
environment-specific.
AUDIENCE: You said earlier that ops create their own threads.
Is that actually the case?
Or do they use the shared thread pools in the executor?
YURI: They create their own thread.
Parallel map, prefetch end up creating their own thread.
Parallel interleave creates its own thread pool.
Under the hoods, they end up using this abstraction
of an unbounded thread pool, if you are familiar with that,
which was introduced recently to combat memory fragmentation
issues in the open source memory allocator,
resulting from excessive thread creation.
So the unbounded thread pool that the tf.data uses creates
this illusion of logical threads that are mapped onto a smaller
subset of physical threads.
But they're different from the inter_op thread pools
or any of the core TensorFlow runtime thread pools.
We do rely in tf.data data on the inter_op thread
pool for the execution of the user-defined functions,
by default. But there is also an option to override that.
And we also, by default, take advantage
or inherit the setting of the inter_op_parallelism.
And there is also a way to override that just
for the tf.data ops.
And as a matter of fact, our experience
has been that disabling inter_op_parallelism for--
and this is perhaps not a surprise to you,
but disabling inter_op_parallelism
altogether for CPU content input pipelines
gives you 10%, 20% speed up.
Because you don't need to parallelize the individual ops,
you get the parallelism by running multiple of them
in parallel.
AUDIENCE: Do we have any guides internally about how
you can do tricks with tf.data to get
performance enhancements?
Or not so much?
YURI: Great question.
So instead of having guides, why not
just have tf.data do the hard work for you?
AUDIENCE: Excellent.
YURI: And so this is something close to my heart
because I worked both on tf.data performance in the early days
in terms of exposing these knobs,
and then I worked very hard on making sure
that users don't need to use these knobs because there
is something that does a good enough job automatically.
And you might think, well, this seems
like a good fit for reinforcement learning,
since we're in RMI.
And that's something that I explored as an idea.
I didn't get it to work, but that
might be because of me, not because of reinforcement
learning.
[LAUGHTER]
The issue with reinforcement learning-- so the idea is this.
What if you just pick some values for the different paths
and knobs in your input pipeline,
observed behavior, and then try some different values
and use a smart algorithm that kind of converges,
hopefully, to global optimum.
Well, it turns out that the convergence was very slow.
And if you set abysmal parameters for something
that could be heavily parallelized,
then you would need a very long time
to actually realize that this is slow because
of poor parameters, as opposed to this
is slow because that's how fast the input pipeline runs.
So instead of exploring reinforcement
learning that tries to modify the parameters, or tf.data
has an analytical model that models
the performance of the input pipeline
that's currently instantiated.
And there's a little bit of math in my presentation
because I'm originally a formal verification person.
So here's some math for you.
You can model the output latency of a node
as a function of the output latencies of its inputs.
And the challenge was, how do you
implement these two functions-- the processing time of a node,
as well as the lambda function that captures
what the node itself does?
So the processing time is modeled through a lightweight
instrumentation of the C++ implementation we have.
By lightweight, I mean it imposes
roughly 100 nanoseconds of overhead on the critical path.
And we keep track of node creation, deletion,
as well as the computation, the processing time,
spent within a node and a user-defined function.
And the lambda, which is the transformation-specific
functionality which maps how the output latencies of the inputs
correspond to the output latency of the op,
turns out there is a fairly small number of categories
of node for each of which there is a different type of lambda.
And the autotuning mechanism then
takes advantage of these two implementation artifacts
to tie it all together.
And we start a background thread that periodically performs
the following optimization.
We snapshot the state of the analytical model, which
captures the processing times of the different iterators
that are currently floating around.
And then we perform a very simple hill-climbing algorithm
that allocates threads or cores to parallelism knobs that
provide the greatest benefit.
So it's really algorithm, assuming that this optimization
is actually monotonic.
And if you do that, you can specify
AUTOTUNE for num_parallel_calls in this example.
And as long as you have 16 or more cores,
you get the same output latency that you would get out
of the manually tuned one.
And the key is that it will actually not over-provision
by an order of magnitude.
It might over-provision a little bit, but not 1,024.
And that's it.
[APPLAUSE]