Introduction

Personalized recommender systems are widely used to offer the right products or content to the right users. Examples include YouTube's video recommendations (“What to Watch Next”), Google Play Store app recommendations, and similar services offered by other app stores and content platforms. In essence, a recommendation system filters products or content from a large number of options and offers them to users. Read on to learn how to use the TPUEmbedding API to accelerate the training of recommendation systems, particularly models with large embedding tables.

The embedding lookup operation is a critical component of large-scale recommendation systems (e.g., Wide and Deep, DLRM or Deep & Cross Network) and can easily become a performance bottleneck, particularly for large tables distributed across multiple accelerators. The TPUEmbedding API addresses this bottleneck: it handles very large tables efficiently by automatically sharding (partitioning) them across all available TPU cores. Combined with the TPU's high-speed chip-to-chip interconnect, it scales from the smallest TPU configuration (8 TPU devices) to a TPU pod slice (32 or more TPU devices). This lets embedding models grow from roughly 200 GB on a single TPU v4-8 to multiple TB on a TPU pod slice.
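A quick back-of-envelope calculation shows why sharding matters. The plain-Python sketch below (the helper names are hypothetical) estimates the memory of one embedding table and its per-core share when rows are split evenly across cores. It assumes float32 values and optimizer slot variables with the same shape as the table, and it ignores padding and other runtime overhead, so the result is a rough lower bound rather than what the TPUEmbedding API actually allocates.

```python
# Back-of-envelope sizing for a row-sharded embedding table (illustrative only).
# Assumptions: 4-byte float32 values, rows split evenly across cores, and
# optimizer slot variables (e.g. an Adagrad accumulator) shaped like the table.


def table_bytes(vocabulary_size: int, dim: int,
                bytes_per_value: int = 4, num_slot_variables: int = 0) -> int:
    """Bytes for the table plus any same-shaped optimizer slot variables."""
    return vocabulary_size * dim * bytes_per_value * (1 + num_slot_variables)


def bytes_per_core(vocabulary_size: int, dim: int, num_cores: int,
                   bytes_per_value: int = 4, num_slot_variables: int = 0) -> int:
    """Per-core footprint when rows are spread as evenly as possible."""
    rows_per_core = -(-vocabulary_size // num_cores)  # ceiling division
    return table_bytes(rows_per_core, dim, bytes_per_value, num_slot_variables)


if __name__ == "__main__":
    # Hypothetical table: 100M rows x 128 dims, trained with plain SGD (no slots).
    total = table_bytes(100_000_000, 128)
    per_core = bytes_per_core(100_000_000, 128, num_cores=8)
    print(f"total: {total / 1e9:.1f} GB, per core on 8 cores: {per_core / 1e9:.1f} GB")
```

For this hypothetical table the sketch prints 51.2 GB in total and 6.4 GB per core on 8 cores; an optimizer with one slot variable, such as Adagrad's accumulator, would double both figures.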

Using the TPUEmbedding API and other optimizations, Snap achieved roughly 3x higher throughput at 30% lower cost on a TPU v3-32 system compared with a 4xA100 system. With the dedicated high-speed chip-to-chip interconnect and the optimized TPU software stack, we hope you too can train your recommendation models faster while reducing training cost. To that end, this post presents an overview of the TPUEmbedding API along with several performance optimization techniques.

Simplified Recommendation Pipeline

TPUEmbedding API can be used to accelerate training of both retrieval models (such as the two-tower model) and ranking models (such as DLRM).

As a rule of thumb, you can expect the TPUEmbedding API to provide a performance benefit for tables with more than 100K rows.

TPUEmbedding APIs

To use the TPUEmbedding API, first define one TableConfig (of type tf.tpu.experimental.embedding.TableConfig) for each embedding table in your model:

tf.tpu.experimental.embedding.TableConfig(
    vocabulary_size: int,
    dim: int,
    initializer: Optional[Callable[[Any], None]] = None,
    optimizer: Optional[Optimizer] = None,
    combiner: Text = 'mean',
    name: Optional[Text] = None
)

Note that: 

  • vocabulary_size: the size of the table's vocabulary (number of rows).
  • dim: the embedding dimension (width) of the table.
  • optimizer: a per-table optimizer. If set, it overrides the global optimizer passed to the embedding layer.
  • combiner: the aggregator applied to multi-hot embedding lookups ('sum', 'mean' or 'sqrtn'). For SparseTensor and RaggedTensor inputs it is applied over the last dimension. It is ignored for one-hot or dense embedding lookups.
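To make the combiner semantics concrete, here is a plain-Python sketch of a single multi-hot lookup reduced with each combiner. It assumes every id has weight 1 (the TPU API also accepts per-id weights) and uses a toy table; combine is a hypothetical helper, not a TensorFlow function.

```python
import math
from typing import List, Sequence


def combine(table: Sequence[Sequence[float]], ids: Sequence[int],
            combiner: str = "mean") -> List[float]:
    """Reduce the embedding rows for `ids` with the given combiner.

    Unweighted sketch: every id contributes with weight 1.
    """
    if not ids:
        raise ValueError("this sketch expects at least one id")
    dim = len(table[0])
    total = [0.0] * dim
    for i in ids:  # gather and sum the selected rows
        for d in range(dim):
            total[d] += table[i][d]
    n = len(ids)
    if combiner == "sum":
        return total
    if combiner == "mean":
        return [v / n for v in total]
    if combiner == "sqrtn":  # sum divided by sqrt(number of ids)
        return [v / math.sqrt(n) for v in total]
    raise ValueError(f"unknown combiner: {combiner!r}")


if __name__ == "__main__":
    table = [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0], [7.0, 8.0]]
    for name in ("sum", "mean", "sqrtn"):
        print(name, combine(table, [1, 3], name))
```

With ids [1, 3] the three combiners give [10.0, 12.0], [5.0, 6.0] and roughly [7.07, 8.49]. With a single id all three return that row unchanged, so the choice of combiner only matters when a row holds several ids.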

Next, define a FeatureConfig (of type tf.tpu.experimental.embedding.FeatureConfig) for each embedding feature in the model:

tf.tpu.experimental.embedding.FeatureConfig(
    table: TableConfig,
    max_sequence_length: int = 0,
    validate_weights_and_indices: bool = True,
    output_shape: Optional[tf.TensorShape] = None,
    name: Optional[Text] = None
)

Note that:

  • max_sequence_length: only used for sequence features, i.e. when max_sequence_length > 0. Sequences longer than this are truncated.
  • output_shape: optional; configures the output shape of the feature activation. If it is not provided, the shape can either be passed to TPUEmbedding.build or detected automatically at runtime.
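For a sequence feature, each example yields one embedding per position instead of a single combined vector. The sketch below shows the per-example result for a given max_sequence_length. The truncation comes from the text above; the zero-padding of shorter sequences is our reading of the FeatureConfig reference and should be treated as an assumption. sequence_activation is a hypothetical helper.

```python
from typing import List, Sequence


def sequence_activation(table: Sequence[Sequence[float]], ids: Sequence[int],
                        max_sequence_length: int) -> List[List[float]]:
    """Per-example activation of shape [max_sequence_length, dim].

    Longer sequences are truncated; shorter ones are padded with zero vectors
    (assumed padding behaviour, see the lead-in).
    """
    if max_sequence_length <= 0:
        raise ValueError("sequence features need max_sequence_length > 0")
    dim = len(table[0])
    kept = list(ids)[:max_sequence_length]             # truncate
    rows = [list(table[i]) for i in kept]              # one row per position
    padding = max_sequence_length - len(rows)
    rows.extend([0.0] * dim for _ in range(padding))   # zero-pad
    return rows


if __name__ == "__main__":
    table = [[1.0, 1.0], [2.0, 2.0], [3.0, 3.0]]
    print(sequence_activation(table, [2, 0, 1], max_sequence_length=2))
    print(sequence_activation(table, [1], max_sequence_length=3))
```

Under the same assumptions, stacking these per-example results over a batch gives a [batch, max_sequence_length, dim] activation.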

Each feature is assigned to an embedding table through the table argument, and multiple features can use the same TableConfig instance. When two or more features share a TableConfig instance, a single shared table is created for their lookups.

The output_shape argument can be set when the layer is unable to determine the input shape at compile time. It should be set to the desired output shape without the embedding dimension. For example, for dense inputs it can be set to the dense input shape, since the combiner is not applied to dense inputs. For sparse inputs (SparseTensor or RaggedTensor), it can be set to the expected input shape without the last dimension, since that is the dimension the combiner reduces.
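The shape rule described above can be written down directly. activation_shape below is a hypothetical plain-Python helper (not part of TensorFlow) for non-sequence features: dense ids keep one embedding per id, while sparse or ragged ids lose their last dimension to the combiner.

```python
from typing import Optional, Sequence, Tuple

Shape = Tuple[Optional[int], ...]


def activation_shape(input_shape: Sequence[Optional[int]], dim: int,
                     is_dense: bool) -> Shape:
    """Expected activation shape of a non-sequence feature (sketch)."""
    if is_dense:
        # No combiner: every id keeps its own embedding vector.
        return tuple(input_shape) + (dim,)
    # SparseTensor / RaggedTensor: the combiner reduces the last dimension.
    return tuple(input_shape[:-1]) + (dim,)


if __name__ == "__main__":
    print(activation_shape((1024,), 10, is_dense=True))        # one id per example
    print(activation_shape((1024, 1), 10, is_dense=True))      # unit dimension kept
    print(activation_shape((1024, None), 10, is_dense=False))  # ragged multi-hot
```

The second case is a common source of shape errors: a dense input of shape (1024, 1) produces a rank-3 activation, which cannot be concatenated along axis 1 with the rank-2 activation of a ragged feature.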

import tensorflow as tf

table1 = tf.tpu.experimental.embedding.TableConfig(
    vocabulary_size=200, dim=10, initializer=tf.initializers.Ones(),
    combiner='sum', name='table1')
table2 = tf.tpu.experimental.embedding.TableConfig(
    vocabulary_size=100, dim=4, initializer=tf.initializers.Ones(),
    combiner='sum', name='table2')
feature_config = {
    'feature1': tf.tpu.experimental.embedding.FeatureConfig(table=table1),
    'feature2': tf.tpu.experimental.embedding.FeatureConfig(table=table1),
    'feature3': tf.tpu.experimental.embedding.FeatureConfig(table=table2),
}

The above configuration has two tables and three features. The first two features are looked up in the first table, and the third feature is looked up in the second table.
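Because sharing is decided by the TableConfig instance, it can help to count what a configuration like this actually allocates. The sketch mirrors it with hypothetical plain-Python stand-ins (Table, Feature and unique_tables are not TensorFlow APIs) and de-duplicates tables by object identity, which is how we read the sharing rule described above.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass(eq=False)  # eq=False keeps identity-based equality and hashing
class Table:
    name: str
    vocabulary_size: int
    dim: int


@dataclass(eq=False)
class Feature:
    table: Table


def unique_tables(features: Dict[str, Feature]) -> List[Table]:
    """Tables referenced by the features, each instance counted once."""
    seen: Dict[int, Table] = {}
    for feature in features.values():
        seen.setdefault(id(feature.table), feature.table)
    return list(seen.values())


table1 = Table("table1", vocabulary_size=200, dim=10)
table2 = Table("table2", vocabulary_size=100, dim=4)
features = {
    "feature1": Feature(table1),
    "feature2": Feature(table1),  # shares table1 with feature1
    "feature3": Feature(table2),
}

tables = unique_tables(features)
num_weights = sum(t.vocabulary_size * t.dim for t in tables)
print([t.name for t in tables], num_weights)
```

This reports two tables and 2,400 weights (200 × 10 + 100 × 4). If feature2 pointed at a separate Table instance instead, the sketch would count three tables and 4,400 weights.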

Next, define the TF2 Keras layer that performs the embedding lookups on the TPU:

tfrs.layers.embedding.TPUEmbedding(
    feature_config,
    optimizer,
    pipeline_execution_with_tensor_core=False,
    batch_size=None
)

This layer must be applied to the categorical inputs before any dense layers. Setting pipeline_execution_with_tensor_core=True improves training performance by overlapping the TPU embedding lookups with the dense computations. This can cause some lookups to read slightly stale weights, but in practice the impact on correctness is small.

The optimizer argument serves as the global optimizer for every embedding table that does not have its own table-level optimizer. For most use cases a single optimizer is enough for all embedding tables, but some advanced use cases may require a table-level optimizer or learning rate.
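The override rule amounts to a simple fallback. The sketch below uses hypothetical stand-in classes (Optim and TableSpec are not TensorFlow types) to show which optimizer each table ends up with when only one table sets its own.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass(frozen=True)
class Optim:
    """Hypothetical stand-in for an embedding optimizer."""
    kind: str
    learning_rate: float


@dataclass
class TableSpec:
    name: str
    optimizer: Optional[Optim] = None  # None means "use the global optimizer"


def effective_optimizers(tables: List[TableSpec],
                         global_optimizer: Optim) -> Dict[str, Optim]:
    """A table-level optimizer, if set, overrides the global one."""
    return {t.name: t.optimizer if t.optimizer is not None else global_optimizer
            for t in tables}


if __name__ == "__main__":
    global_opt = Optim("sgd", 0.1)
    tables = [TableSpec("table1"), TableSpec("table2", Optim("adagrad", 0.05))]
    for name, opt in effective_optimizers(tables, global_opt).items():
        print(name, opt)
```

In the real API the same effect comes from passing optimizer= to an individual TableConfig, while the layer's optimizer argument covers every other table.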

The following code snippet creates a functional-style Keras model:

import tensorflow_recommenders as tfrs
import tensorflow as tf

tpu = tf.distribute.cluster_resolver.TPUClusterResolver.connect()
strategy = tf.distribute.TPUStrategy(tpu)
feature_config = …  # Defined in the previous snippet.

with strategy.scope():
    # Dense (one-hot) inputs use shape=() so their activations are [batch, dim];
    # the ragged (multi-hot) input is reduced by its table's combiner to
    # [batch, dim] as well, so all three activations can be concatenated.
    embedding_inputs = {
        'feature1': tf.keras.Input(batch_size=1024, shape=(),
                                   dtype=tf.int32),
        'feature2': tf.keras.Input(batch_size=1024, shape=(None,),
                                   dtype=tf.int32, ragged=True),
        'feature3': tf.keras.Input(batch_size=1024, shape=(),
                                   dtype=tf.int32)}
    # embedding, feature_config and embedding_inputs all have the same nested
    # structure.
    embedding = tfrs.layers.embedding.TPUEmbedding(
        feature_config=feature_config,
        optimizer=tf.tpu.experimental.embedding.SGD(0.1))(embedding_inputs)
    logits = tf.keras.layers.Dense(1)(
        tf.concat(tf.nest.flatten(embedding), axis=1)
    )
    model = tf.keras.Model(embedding_inputs, logits)

Note: You can create only one TPUEmbedding layer under a TPUStrategy, and it can be called only once per training function. For a two-tower network, both towers must use the same TPUEmbedding call.

How to Optimize Performance?

Pipelining embedding lookups with dense computations

The TPUEmbedding API implementation allows embedding lookups to run in parallel with dense computations, which can improve performance. Setting pipeline_execution_with_tensor_core=True in the layer's constructor runs the embedding lookups for step n+1 in parallel with the dense computations for step n. In particular, the lookup for step n+1 happens before the embedding tables are updated for step n. Although this is not mathematically equivalent to sequential training in general, we have found it safe to enable for most models, since the ids used in steps n and n+1 have little overlap.
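The overlap argument can be checked on your own id streams. The plain-Python sketch below (stale_lookup_fraction is a hypothetical helper) counts how many lookups in step n+1 hit an id that was updated in step n; with pipelining, those are the lookups that read a one-step-old row. The usage example draws ids uniformly at random, which is an assumption: real id distributions are usually skewed toward popular items, so measure on real batches before relying on the number.

```python
import random
from typing import Sequence


def stale_lookup_fraction(steps: Sequence[Sequence[int]]) -> float:
    """Fraction of lookups in steps 1..N-1 that hit an id updated one step earlier."""
    stale = total = 0
    for prev, cur in zip(steps, steps[1:]):
        updated = set(prev)  # rows whose step-n update is still pending
        total += len(cur)
        stale += sum(1 for i in cur if i in updated)
    return stale / total if total else 0.0


if __name__ == "__main__":
    rng = random.Random(0)
    # Hypothetical stream: 20 steps of 1024 ids drawn uniformly from 1M rows.
    steps = [[rng.randrange(1_000_000) for _ in range(1024)] for _ in range(20)]
    print(f"{stale_lookup_fraction(steps):.4%} of lookups read a stale row")
```

With uniform ids the stale fraction is on the order of batch size divided by vocabulary size (about 0.1% here); heavy-hitter ids raise it.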

Choosing which tables to shard based on their sizes

Sharding small embedding tables (fewer than 10,000 rows) across TPU cores can be suboptimal: it increases network communication between TPU cores without saving much HBM on each core. The PartialTPUEmbedding API shards large tables across TPU cores via the normal TPUEmbedding API, while keeping small tables mirrored on each TPU core.

tfrs.experimental.layers.embedding.PartialTPUEmbedding(
    feature_config,
    optimizer: tf.keras.optimizers.Optimizer,
    pipeline_execution_with_tensor_core: bool = False,
    batch_size: Optional[int] = None,
    size_threshold: Optional[int] = 10000
)

The PartialTPUEmbedding API is very similar to the tfrs.layers.embedding.TPUEmbedding API, with one extra argument, size_threshold. Tables with vocabulary sizes below size_threshold are not sharded (they are replicated on every TPU core), while larger tables are sharded.
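The size_threshold rule can be sketched in plain Python to see how a given set of tables would be split and what each core then holds. split_tables, rows_per_core and the table names are hypothetical; the sketch treats a table exactly at the threshold as sharded and assumes sharded rows are spread evenly across cores, neither of which is taken from the PartialTPUEmbedding source.

```python
from typing import Dict, List, Tuple


def split_tables(vocab_sizes: Dict[str, int],
                 size_threshold: int = 10_000) -> Tuple[List[str], List[str]]:
    """Return (mirrored, sharded) table names under a size threshold."""
    mirrored = sorted(n for n, v in vocab_sizes.items() if v < size_threshold)
    sharded = sorted(n for n, v in vocab_sizes.items() if v >= size_threshold)
    return mirrored, sharded


def rows_per_core(vocab_sizes: Dict[str, int], num_cores: int,
                  size_threshold: int = 10_000) -> int:
    """Rows on each core: full copies of mirrored tables plus a shard of the rest."""
    mirrored, sharded = split_tables(vocab_sizes, size_threshold)
    full_copies = sum(vocab_sizes[n] for n in mirrored)
    shards = sum(-(-vocab_sizes[n] // num_cores) for n in sharded)  # ceiling division
    return full_copies + shards


if __name__ == "__main__":
    sizes = {"country": 250, "app_id": 2_000_000, "user_id": 50_000_000}
    print(split_tables(sizes))
    print(rows_per_core(sizes, num_cores=8))
```

Here only the 250-row country table is mirrored, so each of 8 cores holds 6,500,250 rows; sharding that small table too would save each core only about 220 rows while adding cross-core communication to every lookup of it.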

Further performance improvement by input pipeline optimization

Now that embedding table lookup on the TPU with the TPUEmbedding API is much faster, the next bottleneck might be the input pipeline. Please refer to the "Better performance with the tf.data API" guide and the "Analyze tf.data performance with the TF Profiler" guide to learn more about optimizing input data pipelines.

Exporting models for serving

There are multiple ways to export models for serving. The easiest way is to start from an already trained checkpoint:

import tensorflow_recommenders as tfrs
import tensorflow as tf

class EmbeddingModel(tf.keras.Model):

    def __init__(self):
        super().__init__()
        self.embedding = tfrs.layers.embedding.TPUEmbedding(
            feature_config=feature_config,
            optimizer=tf.tpu.experimental.embedding.SGD(0.1))
        self.dense = tf.keras.layers.Dense(1)

    def call(self, inputs, serving_config=None):
        # Forward serving_config so the layer can avoid TPU-specific ops
        # when the model is exported for serving.
        embedding_activations = self.embedding(
            inputs, serving_config=serving_config)
        concatenated_embeddings = tf.concat(
            tf.nest.flatten(embedding_activations), axis=1)
        return self.dense(concatenated_embeddings)

tpu = tf.distribute.cluster_resolver.TPUClusterResolver.connect()
strategy = tf.distribute.TPUStrategy(tpu)

with strategy.scope():
    tpu_model = EmbeddingModel()
    # Compile the model as usual before calling fit().

tpu_model.fit(…)

tpu_checkpoint = tf.train.Checkpoint(model=tpu_model)
tpu_checkpoint.save(…)

# Create a new copy of the model under the default scope.
cpu_model = EmbeddingModel()

cpu_checkpoint = tf.train.Checkpoint(model=cpu_model)
cpu_checkpoint.restore(…)

@tf.function(input_signature=[{
    'examples':
        tf.TensorSpec(
            shape=[None], dtype=tf.string, name='examples')}])
def serve_examples(examples):
    input_data = …  # parse the examples tensor to produce input tensors.
    return cpu_model(input_data)

tf.saved_model.save(cpu_model,
                    export_dir=…,
                    signatures={'serving': serve_examples})

Note that the TPUEmbedding layer supports serving a subset of the tables, which is useful when exporting part of a co-trained model:

serving_config = {
    # When using serving config, it is important that the table config objects used
    # are (a subset of) the table objects passed to the layer initialization.
    'feature1': tf.tpu.experimental.embedding.FeatureConfig(table=table1),
}

@tf.function(input_signature=[
    tf.TensorSpec(
        shape=[None], dtype=tf.string, name='examples')])
def serve_examples(examples):
    input_data = …  # parse the examples tensor to produce input tensors.
    return cpu_model(input_data, serving_config=serving_config)

tf.saved_model.save(cpu_model,
                    export_dir=…,
                    signatures={'serving': serve_examples})
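Because the serving configuration must reuse (a subset of) the table objects the layer was built with, a cheap identity check before export can catch a freshly constructed table that merely looks the same. check_serving_subset below is a hypothetical plain-Python helper; it is not part of TFRS, and we do not assume the layer performs the same check itself.

```python
from typing import Iterable, Mapping


def check_serving_subset(serving_tables: Mapping[str, object],
                         trained_tables: Iterable[object]) -> None:
    """Raise if a serving feature uses a table instance the layer was not built with.

    serving_tables maps feature name -> table object; comparison is by identity.
    """
    trained_ids = {id(t) for t in trained_tables}
    unknown = sorted(name for name, table in serving_tables.items()
                     if id(table) not in trained_ids)
    if unknown:
        raise ValueError(f"features use tables not seen at training: {unknown}")


if __name__ == "__main__":
    table1, table2 = object(), object()  # stand-ins for TableConfig instances
    check_serving_subset({"feature1": table1}, [table1, table2])  # passes
    try:
        check_serving_subset({"feature1": object()}, [table1, table2])
    except ValueError as err:
        print("rejected:", err)
```

With the real objects, you would pass a mapping from each serving feature name to its FeatureConfig's table, together with the TableConfig instances used to build the layer.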

When creating a SavedModel with the method above, you need a single VM with enough capacity to load and save the entire model. Since the TPUEmbedding layer supports multi-TB embedding tables, this may be difficult. In that situation, you can use the following method instead:

tpu = tf.distribute.cluster_resolver.TPUClusterResolver.connect()
strategy = tf.distribute.TPUStrategy(tpu)

with strategy.scope():
    tpu_model = EmbeddingModel()
    # Compile the model as usual before calling fit().

tpu_model.fit(…)

@tf.function(input_signature=[
    tf.TensorSpec(
        shape=[None], dtype=tf.string, name='examples')])
def serve_examples(examples):
    input_data = …  # parse the examples tensor to produce input tensors.
    # It is important that serving_config is passed here, even if it is the same as
    # the configuration used to initialize the layer. This prevents the layer from
    # using TPU specific ops, even though it was created under a TPUStrategy.
    return tpu_model(input_data, serving_config=feature_config)

tf.saved_model.save(tpu_model,
                    export_dir=…,
                    signatures={'serving': serve_examples})