TensorFlow Transform tft.experimental Module

tensorflow_transform.experimental

Module level imports for tensorflow_transform.experimental.

Attributes

PTransformAnalyzerCacheCoder module-attribute

PTransformAnalyzerCacheCoder = CacheCoder

Sequence module-attribute

Sequence = _alias(Sequence, 1)

SimpleJsonPTransformAnalyzerCacheCoder module-attribute

SimpleJsonPTransformAnalyzerCacheCoder = JsonNumpyCacheCoder

Classes

CacheablePTransformAnalyzer

Bases: TypedNamedTuple('PTransformCachedAnalyzer', [('make_accumulators_ptransform', _BeamPTransform), ('merge_accumulators_ptransform', _BeamPTransform), ('extract_output_ptransform', _BeamPTransform), ('cache_coder', PTransformAnalyzerCacheCoder)])

A PTransformAnalyzer which enables analyzer cache.

WARNING: This should only be used if the analyzer can correctly be separated into make_accumulators, merge_accumulators and extract_output stages.

1. make_accumulators_ptransform: this is a beam.PTransform which maps data to a more compact mergeable representation (accumulator). Mergeable here means that it is possible to combine multiple representations produced from a partition of the dataset into a representation of the entire dataset.
2. merge_accumulators_ptransform: this is a beam.PTransform which operates on a collection of accumulators, i.e. the results of both the make_accumulators_ptransform and merge_accumulators_ptransform stages, and produces a single reduced accumulator. This operation must be associative and commutative in order to have reliably reproducible results.
3. extract_output: this is a beam.PTransform which operates on the result of the merge_accumulators_ptransform stage, and produces the outputs of the analyzer. These outputs must be consistent with the output_dtypes and output_shapes provided to ptransform_analyzer.

This container also holds a cache_coder (PTransformAnalyzerCacheCoder) which can encode outputs and decode the inputs of the merge_accumulators_ptransform stage. In many cases, SimpleJsonPTransformAnalyzerCacheCoder would be sufficient.

To ensure the correctness of this analyzer, the following must hold: merge(make({D1, ..., Dn})) == merge({make(D1), ..., make(Dn)})
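
The snippet below is a minimal, illustrative sketch (not from the TFT source) of wiring the three stages and a cache coder together for a simple global-sum analyzer and handing it to tft.experimental.ptransform_analyzer. The three stage PTransforms, the feature name 'x', and the no-argument cache coder construction are assumptions for illustration; only CacheablePTransformAnalyzer, SimpleJsonPTransformAnalyzerCacheCoder and ptransform_analyzer come from this module.

import apache_beam as beam
import numpy as np
import tensorflow as tf
import tensorflow_transform as tft


class _MakeSumAccumulators(beam.PTransform):
  # Maps each input batch (a tuple with one ndarray per analyzer input) to a
  # compact, mergeable accumulator: its partial sum.
  def expand(self, pcoll):
    return pcoll | 'PartialSum' >> beam.Map(lambda batch: float(np.sum(batch[0])))


class _MergeSumAccumulators(beam.PTransform):
  # Reduces a collection of accumulators (partial sums) into a single one.
  # Summation is associative and commutative, as required above.
  def expand(self, pcoll):
    return pcoll | 'MergeSums' >> beam.CombineGlobally(sum)


class _ExtractSumOutput(beam.PTransform):
  # Turns the merged accumulator into the analyzer output, consistent with the
  # output_dtypes/output_shapes passed to ptransform_analyzer below.
  def expand(self, pcoll):
    return pcoll | 'ToOutput' >> beam.Map(np.float32)


cacheable_sum = tft.experimental.CacheablePTransformAnalyzer(
    make_accumulators_ptransform=_MakeSumAccumulators(),
    merge_accumulators_ptransform=_MergeSumAccumulators(),
    extract_output_ptransform=_ExtractSumOutput(),
    cache_coder=tft.experimental.SimpleJsonPTransformAnalyzerCacheCoder())


def preprocessing_fn(inputs):
  # The cacheable analyzer is used wherever a plain PTransform would be.
  total = tft.experimental.ptransform_analyzer(
      inputs=[inputs['x']], ptransform=cacheable_sum,
      output_dtypes=[tf.float32], output_shapes=[[]])[0]
  return {'x_scaled_by_total': inputs['x'] / total}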

Functions

Any

Any(self, parameters)

Special type indicating an unconstrained type.

  • Any is compatible with every type.
  • Any assumed to have all methods.
  • All values assumed to be instances of Any.

Note that all the above statements are true from the point of view of static type checkers. At runtime, Any should not be used with instance or class checks.

Source code in python3.9/typing.py
@_SpecialForm
def Any(self, parameters):
    """Special type indicating an unconstrained type.

    - Any is compatible with every type.
    - Any assumed to have all methods.
    - All values assumed to be instances of Any.

    Note that all the above statements are true from the point of view of
    static type checkers. At runtime, Any should not be used with instance
    or class checks.
    """
    raise TypeError(f"{self} is not subscriptable")

Optional

Optional(self, parameters)

Optional type.

Optional[X] is equivalent to Union[X, None].

Source code in python3.9/typing.py
@_SpecialForm
def Optional(self, parameters):
    """Optional type.

    Optional[X] is equivalent to Union[X, None].
    """
    arg = _type_check(parameters, f"{self} requires a single type.")
    return Union[arg, type(None)]

Union

Union(self, parameters)

Union type; Union[X, Y] means either X or Y.

To define a union, use e.g. Union[int, str]. Details:

  • The arguments must be types and there must be at least one.

  • None as an argument is a special case and is replaced by type(None).

  • Unions of unions are flattened, e.g.:

    Union[Union[int, str], float] == Union[int, str, float]

  • Unions of a single argument vanish, e.g.:

    Union[int] == int  # The constructor actually returns int

  • Redundant arguments are skipped, e.g.:

    Union[int, str, int] == Union[int, str]

  • When comparing unions, the argument order is ignored, e.g.:

    Union[int, str] == Union[str, int]

  • You cannot subclass or instantiate a union.

  • You can use Optional[X] as a shorthand for Union[X, None].

Source code in python3.9/typing.py
@_SpecialForm
def Union(self, parameters):
    """Union type; Union[X, Y] means either X or Y.

    To define a union, use e.g. Union[int, str].  Details:
    - The arguments must be types and there must be at least one.
    - None as an argument is a special case and is replaced by
      type(None).
    - Unions of unions are flattened, e.g.::

        Union[Union[int, str], float] == Union[int, str, float]

    - Unions of a single argument vanish, e.g.::

        Union[int] == int  # The constructor actually returns int

    - Redundant arguments are skipped, e.g.::

        Union[int, str, int] == Union[int, str]

    - When comparing unions, the argument order is ignored, e.g.::

        Union[int, str] == Union[str, int]

    - You cannot subclass or instantiate a union.
    - You can use Optional[X] as a shorthand for Union[X, None].
    """
    if parameters == ():
        raise TypeError("Cannot take a Union of no types.")
    if not isinstance(parameters, tuple):
        parameters = (parameters,)
    msg = "Union[arg, ...]: each arg must be a type."
    parameters = tuple(_type_check(p, msg) for p in parameters)
    parameters = _remove_dups_flatten(parameters)
    if len(parameters) == 1:
        return parameters[0]
    return _UnionGenericAlias(self, parameters)

annotate_sparse_output_shape

annotate_sparse_output_shape(
    tensor: SparseTensor,
    shape: Union[Sequence[int], Tensor],
)

Annotates a sparse output to have a given dense_shape.

PARAMETER DESCRIPTION
tensor

A SparseTensor to be annotated.

TYPE: SparseTensor

shape

A dense_shape to annotate tensor with. Note that this shape does not include batch_size.

TYPE: Union[Sequence[int], Tensor]
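
For orientation, here is a minimal usage sketch; the feature name 'sparse_feature' and the per-example shape [10] are made up for illustration.

import tensorflow_transform as tft


def preprocessing_fn(inputs):
  sparse_x = inputs['sparse_feature']  # assumed to be a tf.SparseTensor
  # Declare that each example's output has dense_shape [10]; the batch
  # dimension is not part of the annotated shape.
  tft.experimental.annotate_sparse_output_shape(sparse_x, [10])
  return {'sparse_feature_out': sparse_x}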

Source code in tensorflow_transform/experimental/annotators.py
def annotate_sparse_output_shape(
    tensor: tf.SparseTensor, shape: Union[Sequence[int], tf.Tensor]):
  """Annotates a sparse output to have a given dense_shape.

  Args:
    tensor: An `SparseTensor` to be annotated.
    shape: A dense_shape to annotate `tensor` with. Note that this shape does
      not include batch_size.
  """
  if not isinstance(shape, tf.Tensor):
    if (tensor.shape.rank > 1 and tensor.shape.rank - 1 != len(shape)) or (
        tensor.shape.rank == 1 and len(shape) != 1):
      raise ValueError(
          f'Annotated shape {shape} was expected to have rank'
          f' {tensor.shape.rank - 1}')
    if not all(a is None or a <= b for a, b in zip(tensor.shape[1:], shape)):
      raise ValueError(
          f'Shape {shape} cannot contain annotated tensor {tensor}')
    shape = tf.convert_to_tensor(shape, dtype=tf.int64)
  elif shape.shape.rank > 1 or (
      shape.shape.rank == 1 and shape.shape[0] != tensor.shape.rank - 1):
    raise ValueError(
        f'Annotation shape has rank {shape.shape.rank} but expected to have'
        f' rank {tensor.shape.rank - 1}')
  if shape.shape.rank < 1:
    shape = tf.expand_dims(shape, -1)
  # There's currently no way to override SparseTensor.dense_shape directly,
  # unless composing and returning a new SparseTensor.
  tensor._dense_shape = tf.concat(  # pylint: disable=protected-access
      [tf.expand_dims(tensor.dense_shape[0], -1), tf.cast(shape, tf.int64)],
      axis=0)
  schema_inference.annotate_sparse_output_shape(tensor, shape)

annotate_true_sparse_output

annotate_true_sparse_output(tensor: SparseTensor)

Annotates a sparse output to be truly sparse and not varlen.
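
A usage sketch, assuming a hypothetical SparseTensor output named 'sparse_feature':

import tensorflow_transform as tft


def preprocessing_fn(inputs):
  sparse_x = inputs['sparse_feature']  # assumed to be a tf.SparseTensor
  # Mark the output as truly sparse so it is not inferred as a VarLen feature.
  tft.experimental.annotate_true_sparse_output(sparse_x)
  return {'sparse_feature_out': sparse_x}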

Source code in tensorflow_transform/experimental/annotators.py
def annotate_true_sparse_output(tensor: tf.SparseTensor):
  """Annotates a sparse output to be truely sparse and not varlen."""
  schema_inference.annotate_true_sparse_output(tensor)

approximate_vocabulary

approximate_vocabulary(
    x: TensorType,
    top_k: int,
    *,
    vocab_filename: Optional[str] = None,
    store_frequency: bool = False,
    reserved_tokens: Optional[
        Union[Sequence[str], Tensor]
    ] = None,
    weights: Optional[Tensor] = None,
    file_format: VocabularyFileFormatType = DEFAULT_VOCABULARY_FILE_FORMAT,
    name: Optional[str] = None,
) -> TemporaryAnalyzerOutputType

Computes the unique values of a Tensor over the whole dataset.

Approximately computes the unique values taken by x, which can be a Tensor, SparseTensor, or RaggedTensor of any size. The unique values will be aggregated over all dimensions of x and all instances.

This analyzer provides an approximate alternative to tft.vocabulary that can be more efficient with smaller top_k and/or smaller number of unique elements in x. As a rule of thumb, approximate_vocabulary becomes more efficient than tft.vocabulary if top_k or the number of unique elements in x is smaller than 2*10^5. Moreover, this analyzer is subject to combiner packing optimization that does not apply to tft.vocabulary. Caching is also more efficient with the approximate implementation since the filtration happens before writing out cache. Output artifact of approximate_vocabulary is consistent with tft.vocabulary and can be used in tft.apply_vocabulary mapper.

Implementation of this analyzer is based on the Misra-Gries algorithm [1]. It stores at most top_k elements with lower bound frequency estimates at a time. The algorithm keeps track of the approximation error delta such that for any item x with true frequency X:

        frequency[x] <= X <= frequency[x] + delta,
        delta <= (m - m') / (top_k + 1),

where m is the total frequency of the items in the dataset and m' is the sum of the lower bound estimates in frequency [2]. For datasets that are Zipfian distributed with parameter a, the algorithm provides an expected value of delta = m / (top_k ^ a) [3].

[1] https://www.cs.utexas.edu/users/misra/scannedPdf.dir/FindRepeatedElements.pdf
[2] http://www.cohenwang.com/edith/bigdataclass2013/lectures/lecture1.pdf
[3] http://dimacs.rutgers.edu/~graham/pubs/papers/countersj.pdf
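
As a quick numeric illustration of the bound above (the numbers are made up, not from the TFT source):

# Hypothetical numbers, purely to illustrate the error bound above.
m = 1_000_000       # total frequency of all items in the dataset
m_prime = 900_000   # sum of the lower-bound estimates kept by the sketch
top_k = 9_999       # number of tracked elements
delta_bound = (m - m_prime) / (top_k + 1)
assert delta_bound == 10.0  # every reported frequency is within 10 of the truth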

If file_format is 'text' and a token is empty or contains the '\n' or '\r' characters, it will be discarded.

If an integer Tensor is provided, its semantic type should be categorical, not continuous/numeric, since computing a vocabulary over a continuous feature is not appropriate.

The unique values are sorted by decreasing frequency and then reverse lexicographical order (e.g. [('a', 5), ('c', 3), ('b', 3)]). This is true even if x is numerical dtype (e.g. [('3', 5), ('2', 3), ('111', 3)]).
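
A minimal usage sketch in the style of the other examples on this page; the feature name 'text' and the top_k value are made up.

import tensorflow_transform as tft


def preprocessing_fn(inputs):
  # Write the (approximately) 10000 most frequent tokens to 'text_vocab'.
  deferred_vocab_path = tft.experimental.approximate_vocabulary(
      inputs['text'], top_k=10000, vocab_filename='text_vocab')
  # The resulting artifact is compatible with tft.apply_vocabulary.
  return {
      'text_ids': tft.apply_vocabulary(inputs['text'], deferred_vocab_path),
  }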

PARAMETER DESCRIPTION
x

A categorical/discrete input Tensor, SparseTensor, or RaggedTensor with dtype tf.string or tf.int[8|16|32|64].

TYPE: TensorType

top_k

Limit the generated vocabulary to the first top_k elements. Note that if top_k is larger than the number of unique elements in x, then the result will be exact.

TYPE: int

vocab_filename

The file name for the vocabulary file. If None, a file name will be chosen based on the current scope. If not None, it should be unique within a given preprocessing function. NOTE: To make your pipelines resilient to implementation details, set vocab_filename explicitly whenever the vocabulary file is consumed by a downstream component.

TYPE: Optional[str] DEFAULT: None

store_frequency

If True, frequency of the words is stored in the vocabulary file. Each line in the file will be of the form 'frequency word'. NOTE: if this is True then the computed vocabulary cannot be used with tft.apply_vocabulary directly, since frequencies are added to the beginning of each row of the vocabulary, which the mapper will not ignore.

TYPE: bool DEFAULT: False

reserved_tokens

(Optional) A list of tokens that should appear in the vocabulary regardless of their appearance in the input. These tokens would maintain their order, and have a reserved spot at the beginning of the vocabulary. Note: this field has no effect on cache.

TYPE: Optional[Union[Sequence[str], Tensor]] DEFAULT: None

weights

(Optional) Weights Tensor for the vocabulary. It must have the same shape as x.

TYPE: Optional[Tensor] DEFAULT: None

file_format

(Optional) A str. The format of the resulting vocabulary file. Accepted formats are: 'tfrecord_gzip', 'text'. 'tfrecord_gzip' requires tensorflow>=2.4. The default value is 'text'.

TYPE: VocabularyFileFormatType DEFAULT: DEFAULT_VOCABULARY_FILE_FORMAT

name

(Optional) A name for this operation.

TYPE: Optional[str] DEFAULT: None

RETURNS DESCRIPTION
TemporaryAnalyzerOutputType

The path name for the vocabulary file containing the unique values of x.

RAISES DESCRIPTION
ValueError

If top_k is negative. If file_format is not in the list of allowed formats. If x.dtype is not string or integral.

Source code in tensorflow_transform/experimental/analyzers.py
@common.log_api_use(common.ANALYZER_COLLECTION)
def approximate_vocabulary(
    x: common_types.TensorType,
    top_k: int,
    *,  # Force passing optional parameters by keys.
    vocab_filename: Optional[str] = None,
    store_frequency: bool = False,
    reserved_tokens: Optional[Union[Sequence[str], tf.Tensor]] = None,
    weights: Optional[tf.Tensor] = None,
    file_format: common_types.VocabularyFileFormatType = analyzers.DEFAULT_VOCABULARY_FILE_FORMAT,
    name: Optional[str] = None
) -> common_types.TemporaryAnalyzerOutputType:
  r"""Computes the unique values of a `Tensor` over the whole dataset.

  Approximately computes the unique values taken by `x`, which can be a
  `Tensor`, `SparseTensor`, or `RaggedTensor` of any size.  The unique values
  will be aggregated over all dimensions of `x` and all instances.

  This analyzer provides an approximate alternative to `tft.vocabulary` that can
  be more efficient with smaller `top_k` and/or smaller number of unique
  elements in `x`. As a rule of thumb, `approximate_vocabulary` becomes more
  efficient than `tft.vocabulary` if `top_k` or the number of unique elements in
  `x` is smaller than 2*10^5. Moreover, this analyzer is subject to combiner
  packing optimization that does not apply to `tft.vocabulary`. Caching is also
  more efficient with the approximate implementation since the filtration
  happens before writing out cache. Output artifact of `approximate_vocabulary`
  is consistent with `tft.vocabulary` and can be used in `tft.apply_vocabulary`
  mapper.

  Implementation of this analyzer is based on the Misra-Gries algorithm [1]. It
  stores at most `top_k` elements with lower bound frequency estimates at a
  time. The algorithm keeps track of the approximation error `delta` such that
  for any item x with true frequency X:

              frequency[x] <= X <= frequency[x] + delta,
              delta <= (m - m') / (top_k + 1),

  where m is the total frequency of the items in the dataset and m' is the sum
  of the lower bound estimates in `frequency` [2]. For datasets that are Zipfian
  distributed with parameter `a`, the algorithm provides an expected value of
  delta = m / (top_k ^ a) [3].

  [1]
  https://www.cs.utexas.edu/users/misra/scannedPdf.dir/FindRepeatedElements.pdf
  [2] http://www.cohenwang.com/edith/bigdataclass2013/lectures/lecture1.pdf
  [3] http://dimacs.rutgers.edu/~graham/pubs/papers/countersj.pdf

  In case `file_format` is 'text' and one of the tokens contains the '\n' or
  '\r' characters or is empty it will be discarded.

  If an integer `Tensor` is provided, its semantic type should be categorical
  not a continuous/numeric, since computing a vocabulary over a continuous
  feature is not appropriate.

  The unique values are sorted by decreasing frequency and then reverse
  lexicographical order (e.g. [('a', 5), ('c', 3), ('b', 3)]). This is true even
  if `x` is numerical dtype (e.g. [('3', 5), ('2', 3), ('111', 3)]).

  Args:
    x: A categorical/discrete input `Tensor`, `SparseTensor`, or `RaggedTensor`
      with dtype tf.string or tf.int[8|16|32|64].
    top_k: Limit the generated vocabulary to the first `top_k` elements. Note
      that if `top_k` is larger than the number of unique elements in `x`, then
      the result will be exact.
    vocab_filename: The file name for the vocabulary file. If None, a file name
      will be chosen based on the current scope. If not None, should be unique
      within a given preprocessing function. NOTE: To make your pipelines
      resilient to implementation details please set `vocab_filename` when you
      are using the vocab_filename on a downstream component.
    store_frequency: If True, frequency of the words is stored in the vocabulary
      file. Each line in the file will be of the form 'frequency word'. NOTE: if
      this is True then the computed vocabulary cannot be used with
      `tft.apply_vocabulary` directly, since frequencies are added to the
      beginning of each row of the vocabulary, which the mapper will not ignore.
    reserved_tokens: (Optional) A list of tokens that should appear in the
      vocabulary regardless of their appearance in the input. These tokens would
      maintain their order, and have a reserved spot at the beginning of the
      vocabulary. Note: this field has no affect on cache.
    weights: (Optional) Weights `Tensor` for the vocabulary. It must have the
      same shape as x.
    file_format: (Optional) A str. The format of the resulting vocabulary file.
      Accepted formats are: 'tfrecord_gzip', 'text'. 'tfrecord_gzip' requires
      tensorflow>=2.4. The default value is 'text'.
    name: (Optional) A name for this operation.

  Returns:
    The path name for the vocabulary file containing the unique values of `x`.

  Raises:
    ValueError: If `top_k` is negative.
      If `file_format` is not in the list of allowed formats.
      If x.dtype is not string or integral.
  """

  if top_k <= 0:
    raise ValueError('top_k must be positive, but got: %r' % top_k)
  elif top_k > analyzers.LARGE_VOCAB_TOP_K:
    raise ValueError('Provided top_k threshold is too large for the '
                     'approximate calculation: if the expected number of '
                     'unique elements is larger than top_k, tft.vocabulary may '
                     'be more efficient. Maximum allowed top_k is {}'.format(
                         analyzers.LARGE_VOCAB_TOP_K))

  if file_format not in analyzers.ALLOWED_VOCABULARY_FILE_FORMATS:
    raise ValueError(
        '"{}" is not an accepted file_format. It should be one of: {}'.format(
            file_format, analyzers.ALLOWED_VOCABULARY_FILE_FORMATS))

  if x.dtype != tf.string and not x.dtype.is_integer:
    raise ValueError('expected tf.string or integer but got %r' % x.dtype)

  with tf.compat.v1.name_scope(name, 'approximate_vocabulary'):
    vocabulary_key = vocab_filename
    vocab_filename = _get_approx_vocab_filename(vocab_filename, store_frequency)
    analyzer_inputs = _get_approximate_vocabulary_analyzer_inputs(
        x=x, file_format=file_format, weights=weights)
    return _approximate_vocabulary_analyzer_nodes(
        analyzer_inputs=analyzer_inputs,
        input_dtype=x.dtype.name,
        vocab_filename=vocab_filename,
        top_k=top_k,
        store_frequency=store_frequency,
        reserved_tokens=reserved_tokens,
        file_format=file_format,
        vocabulary_key=vocabulary_key,
    )

compute_and_apply_approximate_vocabulary

compute_and_apply_approximate_vocabulary(
    x: ConsistentTensorType,
    *,
    default_value: Any = -1,
    top_k: Optional[int] = None,
    num_oov_buckets: int = 0,
    vocab_filename: Optional[str] = None,
    weights: Optional[Tensor] = None,
    file_format: VocabularyFileFormatType = DEFAULT_VOCABULARY_FILE_FORMAT,
    store_frequency: Optional[bool] = False,
    reserved_tokens: Optional[
        Union[Sequence[str], Tensor]
    ] = None,
    name: Optional[str] = None,
) -> ConsistentTensorType

Generates an approximate vocabulary for x and maps it to an integer.
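
A minimal usage sketch (the feature name 'text' and the parameter values are made up):

import tensorflow_transform as tft


def preprocessing_fn(inputs):
  # Map each token to an integer id from an approximate top-10000 vocabulary;
  # out-of-vocabulary tokens are hashed into one of 5 extra buckets.
  return {
      'text_ids': tft.experimental.compute_and_apply_approximate_vocabulary(
          inputs['text'], top_k=10000, num_oov_buckets=5),
  }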

PARAMETER DESCRIPTION
x

A Tensor, SparseTensor, or RaggedTensor of type tf.string or tf.int[8|16|32|64].

TYPE: ConsistentTensorType

default_value

The value to use for out-of-vocabulary values, unless 'num_oov_buckets' is greater than zero.

TYPE: Any DEFAULT: -1

top_k

Limit the generated vocabulary to the first top_k elements. If set to None, the full vocabulary is generated.

TYPE: Optional[int] DEFAULT: None

num_oov_buckets

Any lookup of an out-of-vocabulary token will return a bucket ID based on its hash if num_oov_buckets is greater than zero. Otherwise it is assigned the default_value.

TYPE: int DEFAULT: 0

vocab_filename

The file name for the vocabulary file. If None, a name based on the scope name in the context of this graph will be used as the file name. If not None, it should be unique within a given preprocessing function. NOTE: To make your pipelines resilient to implementation details, set vocab_filename explicitly whenever the vocabulary file is consumed by a downstream component.

TYPE: Optional[str] DEFAULT: None

weights

(Optional) Weights Tensor for the vocabulary. It must have the same shape as x.

TYPE: Optional[Tensor] DEFAULT: None

file_format

(Optional) A str. The format of the resulting vocabulary file. Accepted formats are: 'tfrecord_gzip', 'text'. 'tfrecord_gzip' requires tensorflow>=2.4. The default value is 'text'.

TYPE: VocabularyFileFormatType DEFAULT: DEFAULT_VOCABULARY_FILE_FORMAT

store_frequency

If True, frequency of the words is stored in the vocabulary file. In the case labels are provided, the mutual information is stored in the file instead. Each line in the file will be of the form 'frequency word'. NOTE: if True and file_format is 'text' then spaces will be replaced to avoid information loss.

TYPE: Optional[bool] DEFAULT: False

reserved_tokens

(Optional) A list of tokens that should appear in the vocabulary regardless of their appearance in the input. These tokens would maintain their order, and have a reserved spot at the beginning of the vocabulary. Note: this field has no effect on cache.

TYPE: Optional[Union[Sequence[str], Tensor]] DEFAULT: None

name

(Optional) A name for this operation.

TYPE: Optional[str] DEFAULT: None

RETURNS DESCRIPTION
ConsistentTensorType

A Tensor, SparseTensor, or RaggedTensor where each string value is mapped to an integer. Each unique string value that appears in the vocabulary is mapped to a different integer and integers are consecutive starting from zero. String value not in the vocabulary is assigned default_value. Alternatively, if num_oov_buckets is specified, out of vocabulary strings are hashed to values in [vocab_size, vocab_size + num_oov_buckets) for an overall range of [0, vocab_size + num_oov_buckets).

RAISES DESCRIPTION
ValueError

If top_k is negative. If file_format is not in the list of allowed formats. If x.dtype is not string or integral.

Source code in tensorflow_transform/experimental/mappers.py
@common.log_api_use(common.MAPPER_COLLECTION)
def compute_and_apply_approximate_vocabulary(
    x: common_types.ConsistentTensorType,
    *,  # Force passing optional parameters by keys.
    default_value: Any = -1,
    top_k: Optional[int] = None,
    num_oov_buckets: int = 0,
    vocab_filename: Optional[str] = None,
    weights: Optional[tf.Tensor] = None,
    file_format: common_types.VocabularyFileFormatType = analyzers.DEFAULT_VOCABULARY_FILE_FORMAT,
    store_frequency: Optional[bool] = False,
    reserved_tokens: Optional[Union[Sequence[str], tf.Tensor]] = None,
    name: Optional[str] = None,
) -> common_types.ConsistentTensorType:
  """Generates an approximate vocabulary for `x` and maps it to an integer.

  Args:
    x: A `Tensor`, `SparseTensor`, or `RaggedTensor` of type tf.string or
      tf.int[8|16|32|64].
    default_value: The value to use for out-of-vocabulary values, unless
      'num_oov_buckets' is greater than zero.
    top_k: Limit the generated vocabulary to the first `top_k` elements. If set
      to None, the full vocabulary is generated.
    num_oov_buckets:  Any lookup of an out-of-vocabulary token will return a
      bucket ID based on its hash if `num_oov_buckets` is greater than zero.
      Otherwise it is assigned the `default_value`.
    vocab_filename: The file name for the vocabulary file. If None, a name based
      on the scope name in the context of this graph will be used as the file
      name. If not None, should be unique within a given preprocessing function.
      NOTE in order to make your pipelines resilient to implementation details
      please set `vocab_filename` when you are using the vocab_filename on a
      downstream component.
    weights: (Optional) Weights `Tensor` for the vocabulary. It must have the
      same shape as x.
    file_format: (Optional) A str. The format of the resulting vocabulary file.
      Accepted formats are: 'tfrecord_gzip', 'text'. 'tfrecord_gzip' requires
      tensorflow>=2.4. The default value is 'text'.
    store_frequency: If True, frequency of the words is stored in the vocabulary
      file. In the case labels are provided, the mutual information is stored in
      the file instead. Each line in the file will be of the form 'frequency
      word'. NOTE: if True and text_format is 'text' then spaces will be
      replaced to avoid information loss.
    reserved_tokens: (Optional) A list of tokens that should appear in the
      vocabulary regardless of their appearance in the input. These tokens would
      maintain their order, and have a reserved spot at the beginning of the
      vocabulary. Note: this field has no affect on cache.
    name: (Optional) A name for this operation.

  Returns:
    A `Tensor`, `SparseTensor`, or `RaggedTensor` where each string value is
    mapped to an integer. Each unique string value that appears in the
    vocabulary is mapped to a different integer and integers are consecutive
    starting from zero. String value not in the vocabulary is assigned
    `default_value`. Alternatively, if `num_oov_buckets` is specified, out of
    vocabulary strings are hashed to values in
    [vocab_size, vocab_size + num_oov_buckets) for an overall range of
    [0, vocab_size + num_oov_buckets).

  Raises:
    ValueError: If `top_k` is negative.
      If `file_format` is not in the list of allowed formats.
      If x.dtype is not string or integral.
  """
  with tf.compat.v1.name_scope(name,
                               'compute_and_apply_approximate_vocabulary'):
    if store_frequency and file_format == 'text':
      x = tf_utils.maybe_format_vocabulary_input(x)
    deferred_vocab_and_filename = experimental_analyzers.approximate_vocabulary(
        x=x,
        top_k=top_k,
        vocab_filename=vocab_filename,
        weights=weights,
        file_format=file_format,
        store_frequency=store_frequency,
        reserved_tokens=reserved_tokens,
        name=name,
    )
    return mappers._apply_vocabulary_internal(  # pylint: disable=protected-access
        x,
        deferred_vocab_and_filename,
        default_value,
        num_oov_buckets,
        lookup_fn=None,
        file_format=file_format,
        store_frequency=store_frequency,
        name=None,
    )

document_frequency

document_frequency(
    x: SparseTensor,
    vocab_size: int,
    name: Optional[str] = None,
) -> SparseTensor

Maps the terms in x to their document frequency in the same order.

The document frequency of a term is the number of documents that contain the term in the entire dataset. Each unique vocab term has a unique document frequency.

Example usage:

>>> def preprocessing_fn(inputs):
...   integerized = tft.compute_and_apply_vocabulary(inputs['x'])
...   vocab_size = tft.get_num_buckets_for_transformed_feature(integerized)
...   return {
...      'df': tft.experimental.document_frequency(integerized, vocab_size),
...      'integerized': integerized,
...   }
>>> raw_data = [dict(x=["I", "like", "pie", "pie", "pie"]),
...             dict(x=["yum", "yum", "pie"])]
>>> feature_spec = dict(x=tf.io.VarLenFeature(tf.string))
>>> raw_data_metadata = tft.DatasetMetadata.from_feature_spec(feature_spec)
>>> with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
...   transformed_dataset, transform_fn = (
...       (raw_data, raw_data_metadata)
...       | tft_beam.AnalyzeAndTransformDataset(preprocessing_fn))
>>> transformed_data, transformed_metadata = transformed_dataset
>>> transformed_data
[{'df': array([1, 1, 2, 2, 2]), 'integerized': array([3, 2, 0, 0, 0])},
 {'df': array([1, 1, 2]), 'integerized': array([1, 1, 0])}]

example strings: [["I", "like", "pie", "pie", "pie"], ["yum", "yum", "pie"]]
in: SparseTensor(indices=[[0, 0], [0, 1], [0, 2], [0, 3], [0, 4],
                          [1, 0], [1, 1], [1, 2]],
                 values=[1, 2, 0, 0, 0, 3, 3, 0])
out: SparseTensor(indices=[[0, 0], [0, 1], [0, 2], [0, 3], [0, 4],
                          [1, 0], [1, 1], [1, 2]],
                 values=[1, 1, 2, 2, 2, 1, 1, 2])
PARAMETER DESCRIPTION
x

A 2D SparseTensor representing int64 values (most likely the result of calling compute_and_apply_vocabulary on a tokenized string).

TYPE: SparseTensor

vocab_size

An int - the count of vocab used to turn the string into int64s including any OOV buckets.

TYPE: int

name

(Optional) A name for this operation.

TYPE: Optional[str] DEFAULT: None

RETURNS DESCRIPTION
SparseTensor

SparseTensors with indices [index_in_batch, index_in_local_sequence] and values document_frequency. Same shape as the input x.

Source code in tensorflow_transform/experimental/mappers.py
@common.log_api_use(common.MAPPER_COLLECTION)
def document_frequency(x: tf.SparseTensor,
                       vocab_size: int,
                       name: Optional[str] = None) -> tf.SparseTensor:
  """Maps the terms in x to their document frequency in the same order.

  The document frequency of a term is the number of documents that contain the
  term in the entire dataset. Each unique vocab term has a unique document
  frequency.

  Example usage:

  >>> def preprocessing_fn(inputs):
  ...   integerized = tft.compute_and_apply_vocabulary(inputs['x'])
  ...   vocab_size = tft.get_num_buckets_for_transformed_feature(integerized)
  ...   return {
  ...      'df': tft.experimental.document_frequency(integerized, vocab_size),
  ...      'integerized': integerized,
  ...   }
  >>> raw_data = [dict(x=["I", "like", "pie", "pie", "pie"]),
  ...             dict(x=["yum", "yum", "pie"])]
  >>> feature_spec = dict(x=tf.io.VarLenFeature(tf.string))
  >>> raw_data_metadata = tft.DatasetMetadata.from_feature_spec(feature_spec)
  >>> with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
  ...   transformed_dataset, transform_fn = (
  ...       (raw_data, raw_data_metadata)
  ...       | tft_beam.AnalyzeAndTransformDataset(preprocessing_fn))
  >>> transformed_data, transformed_metadata = transformed_dataset
  >>> transformed_data
  [{'df': array([1, 1, 2, 2, 2]), 'integerized': array([3, 2, 0, 0, 0])},
   {'df': array([1, 1, 2]), 'integerized': array([1, 1, 0])}]

    ```
    example strings: [["I", "like", "pie", "pie", "pie"], ["yum", "yum", "pie]]
    in: SparseTensor(indices=[[0, 0], [0, 1], [0, 2], [0, 3], [0, 4],
                              [1, 0], [1, 1], [1, 2]],
                     values=[1, 2, 0, 0, 0, 3, 3, 0])
    out: SparseTensor(indices=[[0, 0], [0, 1], [0, 2], [0, 3], [0, 4],
                              [1, 0], [1, 1], [1, 2]],
                     values=[1, 1, 2, 2, 2, 1, 1, 2])
    ```

  Args:
    x: A 2D `SparseTensor` representing int64 values (most likely that are the
      result of calling `compute_and_apply_vocabulary` on a tokenized string).
    vocab_size: An int - the count of vocab used to turn the string into int64s
      including any OOV buckets.
    name: (Optional) A name for this operation.

  Returns:
    `SparseTensor`s with indices [index_in_batch, index_in_local_sequence] and
    values document_frequency. Same shape as the input `x`.

  Raises:
    ValueError if `x` does not have 2 dimensions.
  """
  if x.get_shape().ndims != 2:
    raise ValueError('tft.tfidf requires a 2D SparseTensor input. '
                     'Input had {} dimensions.'.format(x.get_shape().ndims))

  with tf.compat.v1.name_scope(name, 'df'):
    cleaned_input = tf_utils.to_vocab_range(x, vocab_size)

    # all_df is a (1, vocab_size)-shaped sparse tensor storing number of docs
    # containing each term in the entire dataset.
    all_df = _to_global_document_frequency(cleaned_input, vocab_size)

    # df_values is a batch_size * sequence_size sparse tensor storing the
    # document frequency of each term, following the same order as the terms
    # within each document.
    df_values = tf.gather(tf.squeeze(all_df), cleaned_input.values)

    return tf.SparseTensor(
        indices=cleaned_input.indices,
        values=df_values,
        dense_shape=cleaned_input.dense_shape)

get_vocabulary_size_by_name

get_vocabulary_size_by_name(vocab_filename: str) -> Tensor

Gets the size of a vocabulary created using tft.vocabulary.

This is the number of keys in the output vocab_filename and does not include the number of OOV buckets.

PARAMETER DESCRIPTION
vocab_filename

The name of the vocabulary file whose size is to be retrieved.

TYPE: str

Example:

>>> def preprocessing_fn(inputs):
...   num_oov_buckets = 1
...   x_int = tft.compute_and_apply_vocabulary(
...     inputs['x'], vocab_filename='my_vocab',
...     num_oov_buckets=num_oov_buckets)
...   depth = (
...     tft.experimental.get_vocabulary_size_by_name('my_vocab') +
...     num_oov_buckets)
...   x_encoded = tf.one_hot(
...     x_int, depth=tf.cast(depth, tf.int32), dtype=tf.int64)
...   return {'x_encoded': x_encoded}
>>> raw_data = [dict(x='foo'), dict(x='foo'), dict(x='bar')]
>>> feature_spec = dict(x=tf.io.FixedLenFeature([], tf.string))
>>> raw_data_metadata = tft.DatasetMetadata.from_feature_spec(feature_spec)
>>> with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
...   transformed_dataset, transform_fn = (
...       (raw_data, raw_data_metadata)
...       | tft_beam.AnalyzeAndTransformDataset(preprocessing_fn))
>>> transformed_data, transformed_metadata = transformed_dataset
>>> transformed_data
[{'x_encoded': array([1, 0, 0])}, {'x_encoded': array([1, 0, 0])},
{'x_encoded': array([0, 1, 0])}]

RETURNS DESCRIPTION
Tensor

An integer tensor containing the size of the requested vocabulary.

RAISES DESCRIPTION
ValueError

if no vocabulary size found for the given vocab_filename.

Source code in tensorflow_transform/experimental/annotators.py
def get_vocabulary_size_by_name(vocab_filename: str) -> tf.Tensor:
  # pyformat: disable
  """Gets the size of a vocabulary created using `tft.vocabulary`.

  This is the number of keys in the output `vocab_filename` and does not include
  number of OOV buckets.

  Args:
    vocab_filename: The name of the vocabulary file whose size is to be
      retrieved.

  Example:

  >>> def preprocessing_fn(inputs):
  ...   num_oov_buckets = 1
  ...   x_int = tft.compute_and_apply_vocabulary(
  ...     inputs['x'], vocab_filename='my_vocab',
  ...     num_oov_buckets=num_oov_buckets)
  ...   depth = (
  ...     tft.experimental.get_vocabulary_size_by_name('my_vocab') +
  ...     num_oov_buckets)
  ...   x_encoded = tf.one_hot(
  ...     x_int, depth=tf.cast(depth, tf.int32), dtype=tf.int64)
  ...   return {'x_encoded': x_encoded}
  >>> raw_data = [dict(x='foo'), dict(x='foo'), dict(x='bar')]
  >>> feature_spec = dict(x=tf.io.FixedLenFeature([], tf.string))
  >>> raw_data_metadata = tft.DatasetMetadata.from_feature_spec(feature_spec)
  >>> with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
  ...   transformed_dataset, transform_fn = (
  ...       (raw_data, raw_data_metadata)
  ...       | tft_beam.AnalyzeAndTransformDataset(preprocessing_fn))
  >>> transformed_data, transformed_metadata = transformed_dataset
  >>> transformed_data
  [{'x_encoded': array([1, 0, 0])}, {'x_encoded': array([1, 0, 0])},
  {'x_encoded': array([0, 1, 0])}]

  Returns:
    An integer tensor containing the size of the requested vocabulary.

  Raises:
    ValueError: if no vocabulary size found for the given `vocab_filename`.

  """
  # pyformat: enable
  vocabulary_sizes_coll = ops.get_default_graph().get_collection(
      annotators.VOCABULARY_SIZE_BY_NAME_COLLECTION)

  result = dict(vocabulary_sizes_coll).get(vocab_filename, None)

  if result is None:
    raise ValueError(
        f'Vocabulary size not found for {vocab_filename}. If this vocabulary '
        'was created using `tft.vocabulary`, this should be the same as the '
        '`vocab_filename` argument passed to it.')

  return result

idf

idf(
    x: SparseTensor,
    vocab_size: int,
    smooth: bool = True,
    add_baseline: bool = True,
    name: Optional[str] = None,
) -> SparseTensor

Maps the terms in x to their inverse document frequency in the same order.

The inverse document frequency of a term, by default, is calculated as 1 + log ((corpus size + 1) / (count of documents containing term + 1)).

Example usage:

>>> def preprocessing_fn(inputs):
...   integerized = tft.compute_and_apply_vocabulary(inputs['x'])
...   vocab_size = tft.get_num_buckets_for_transformed_feature(integerized)
...   idf_weights = tft.experimental.idf(integerized, vocab_size)
...   return {
...      'idf': idf_weights,
...      'integerized': integerized,
...   }
>>> raw_data = [dict(x=["I", "like", "pie", "pie", "pie"]),
...             dict(x=["yum", "yum", "pie"])]
>>> feature_spec = dict(x=tf.io.VarLenFeature(tf.string))
>>> raw_data_metadata = tft.DatasetMetadata.from_feature_spec(feature_spec)
>>> with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
...   transformed_dataset, transform_fn = (
...       (raw_data, raw_data_metadata)
...       | tft_beam.AnalyzeAndTransformDataset(preprocessing_fn))
>>> transformed_data, transformed_metadata = transformed_dataset
>>> # 1 + log(3/2) = 1.4054651
>>> transformed_data
[{'idf': array([1.4054651, 1.4054651, 1., 1., 1.], dtype=float32),
  'integerized': array([3, 2, 0, 0, 0])},
 {'idf': array([1.4054651, 1.4054651, 1.], dtype=float32),
  'integerized': array([1, 1, 0])}]

example strings: [["I", "like", "pie", "pie", "pie"], ["yum", "yum", "pie"]]
in: SparseTensor(indices=[[0, 0], [0, 1], [0, 2], [0, 3], [0, 4],
                          [1, 0], [1, 1], [1, 2]],
                 values=[1, 2, 0, 0, 0, 3, 3, 0])
out: SparseTensor(indices=[[0, 0], [0, 1], [0, 2], [0, 3], [0, 4],
                          [1, 0], [1, 1], [1, 2]],
                 values=[1 + log(3/2), 1 + log(3/2), 1, 1, 1,
                         1 + log(3/2), 1 + log(3/2), 1])
PARAMETER DESCRIPTION
x

A 2D SparseTensor representing int64 values (most likely the result of calling compute_and_apply_vocabulary on a tokenized string).

TYPE: SparseTensor

vocab_size

An int - the count of vocab used to turn the string into int64s including any OOV buckets.

TYPE: int

smooth

A bool indicating if the inverse document frequency should be smoothed. If True, which is the default, then the idf is calculated as 1 + log((corpus size + 1) / (document frequency of term + 1)). Otherwise, the idf is 1 + log((corpus size) / (document frequency of term)), which could result in a division by zero error.

TYPE: bool DEFAULT: True

add_baseline

A bool indicating if the inverse document frequency should be added with a constant baseline 1.0. If True, which is the default, then the idf is calculated as 1 + log(*). Otherwise, the idf is log(*) without the constant 1 baseline. Keeping the baseline reduces the discrepancy in idf between commonly seen terms and rare terms.

TYPE: bool DEFAULT: True

name

(Optional) A name for this operation.

TYPE: Optional[str] DEFAULT: None

RETURNS DESCRIPTION
SparseTensor

SparseTensors with indices [index_in_batch, index_in_local_sequence] and values inverse document frequency. Same shape as the input x.

Source code in tensorflow_transform/experimental/mappers.py
@common.log_api_use(common.MAPPER_COLLECTION)
def idf(x: tf.SparseTensor,
        vocab_size: int,
        smooth: bool = True,
        add_baseline: bool = True,
        name: Optional[str] = None) -> tf.SparseTensor:
  """Maps the terms in x to their inverse document frequency in the same order.

  The inverse document frequency of a term, by default, is calculated as
  1 + log ((corpus size + 1) / (count of documents containing term + 1)).

  Example usage:

  >>> def preprocessing_fn(inputs):
  ...   integerized = tft.compute_and_apply_vocabulary(inputs['x'])
  ...   vocab_size = tft.get_num_buckets_for_transformed_feature(integerized)
  ...   idf_weights = tft.experimental.idf(integerized, vocab_size)
  ...   return {
  ...      'idf': idf_weights,
  ...      'integerized': integerized,
  ...   }
  >>> raw_data = [dict(x=["I", "like", "pie", "pie", "pie"]),
  ...             dict(x=["yum", "yum", "pie"])]
  >>> feature_spec = dict(x=tf.io.VarLenFeature(tf.string))
  >>> raw_data_metadata = tft.DatasetMetadata.from_feature_spec(feature_spec)
  >>> with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
  ...   transformed_dataset, transform_fn = (
  ...       (raw_data, raw_data_metadata)
  ...       | tft_beam.AnalyzeAndTransformDataset(preprocessing_fn))
  >>> transformed_data, transformed_metadata = transformed_dataset
  >>> # 1 + log(3/2) = 1.4054651
  >>> transformed_data
  [{'idf': array([1.4054651, 1.4054651, 1., 1., 1.], dtype=float32),
    'integerized': array([3, 2, 0, 0, 0])},
   {'idf': array([1.4054651, 1.4054651, 1.], dtype=float32),
    'integerized': array([1, 1, 0])}]

    ```
    example strings: [["I", "like", "pie", "pie", "pie"], ["yum", "yum", "pie]]
    in: SparseTensor(indices=[[0, 0], [0, 1], [0, 2], [0, 3], [0, 4],
                              [1, 0], [1, 1], [1, 2]],
                     values=[1, 2, 0, 0, 0, 3, 3, 0])
    out: SparseTensor(indices=[[0, 0], [0, 1], [0, 2], [0, 3], [0, 4],
                              [1, 0], [1, 1], [1, 2]],
                     values=[1 + log(3/2), 1 + log(3/2), 1, 1, 1,
                             1 + log(3/2), 1 + log(3/2), 1])
    ```

  Args:
    x: A 2D `SparseTensor` representing int64 values (most likely that are the
      result of calling `compute_and_apply_vocabulary` on a tokenized string).
    vocab_size: An int - the count of vocab used to turn the string into int64s
      including any OOV buckets.
    smooth: A bool indicating if the inverse document frequency should be
      smoothed. If True, which is the default, then the idf is calculated as 1 +
      log((corpus size + 1) / (document frequency of term + 1)). Otherwise, the
      idf is 1 + log((corpus size) / (document frequency of term)), which could
      result in a division by zero error.
    add_baseline: A bool indicating if the inverse document frequency should be
      added with a constant baseline 1.0. If True, which is the default, then
      the idf is calculated as 1 + log(*). Otherwise, the idf is log(*) without
      the constant 1 baseline. Keeping the baseline reduces the discrepancy in
      idf between commonly seen terms and rare terms.
    name: (Optional) A name for this operation.

  Returns:
    `SparseTensor`s with indices [index_in_batch, index_in_local_sequence] and
    values inverse document frequency. Same shape as the input `x`.

  Raises:
    ValueError if `x` does not have 2 dimensions.
  """
  if x.get_shape().ndims != 2:
    raise ValueError('tft.tfidf requires a 2D SparseTensor input. '
                     'Input had {} dimensions.'.format(x.get_shape().ndims))

  with tf.compat.v1.name_scope(name, 'idf'):
    cleaned_input = tf_utils.to_vocab_range(x, vocab_size)

    batch_sizes = tf.expand_dims(tf.shape(input=cleaned_input)[0], 0)

    # all_df is a (1, vocab_size)-shaped tensor storing number of documents
    # containing each term in the entire dataset.
    all_df = _to_global_document_frequency(cleaned_input, vocab_size)

    # all_idf is a (1, vocab_size)-shaped tensor storing the inverse document
    # frequency of each term in the entire dataset.
    all_idf = tf_utils.document_frequency_to_idf(
        all_df,
        analyzers.sum(batch_sizes),
        smooth=smooth,
        add_baseline=add_baseline)

    # idf_values is a batch_size * sequence_size sparse tensor storing the
    # inverse document frequency of each term, following the same order as the
    # terms within each document.
    idf_values = tf.gather(
        tf.reshape(all_idf, [-1]), tf.cast(cleaned_input.values, dtype=tf.int64)
    )

    return tf.SparseTensor(
        indices=cleaned_input.indices,
        values=idf_values,
        dense_shape=cleaned_input.dense_shape)

ptransform_analyzer

ptransform_analyzer(
    inputs: Collection[Tensor],
    ptransform: Union[
        _BeamPTransform, CacheablePTransformAnalyzer
    ],
    output_dtypes: Collection[DType],
    output_shapes: Collection[List[int]],
    output_asset_default_values: Optional[
        Collection[Optional[bytes]]
    ] = None,
    name: Optional[str] = None,
)

Applies a user-provided PTransform over the whole dataset.

WARNING: This is experimental.

Note that in order to have asset files copied correctly, any outputs that represent asset filenames must be added to the tf.GraphKeys.ASSET_FILEPATHS collection by the caller if using Transform's APIs in compat v1 mode.

Example:

>>> class MeanPerKey(beam.PTransform):
...   def expand(self, pcoll: beam.PCollection[Tuple[np.ndarray, np.ndarray]]) -> Tuple[beam.PCollection[np.ndarray], beam.PCollection[np.ndarray]]:
...     def extract_output(key_value_pairs):
...       keys, values = zip(*key_value_pairs)
...       return [beam.TaggedOutput('keys', keys),
...               beam.TaggedOutput('values', values)]
...     return tuple(
...         pcoll
...         | 'ZipAndFlatten' >> beam.FlatMap(lambda batches: list(zip(*batches)))
...         | 'MeanPerKey' >> beam.CombinePerKey(beam.combiners.MeanCombineFn())
...         | 'ToList' >> beam.combiners.ToList()
...         | 'Extract' >> beam.FlatMap(extract_output).with_outputs(
...             'keys', 'values'))
>>> def preprocessing_fn(inputs):
...   outputs = tft.experimental.ptransform_analyzer(
...       inputs=[inputs['s'], inputs['x']],
...       ptransform=MeanPerKey(),
...       output_dtypes=[tf.string, tf.float32],
...       output_shapes=[[2], [2]])
...   (keys, means) = outputs
...   mean_a = tf.reshape(tf.gather(means, tf.where(keys == 'a')), [])
...   return { 'x/mean_a': inputs['x'] / mean_a }
>>> raw_data = [dict(x=1, s='a'), dict(x=8, s='b'), dict(x=3, s='a')]
>>> feature_spec = dict(
...     x=tf.io.FixedLenFeature([], tf.float32),
...     s=tf.io.FixedLenFeature([], tf.string))
>>> raw_data_metadata = tft.DatasetMetadata.from_feature_spec(feature_spec)
>>> with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
...   transformed_dataset, transform_fn = (
...       (raw_data, raw_data_metadata)
...       | tft_beam.AnalyzeAndTransformDataset(preprocessing_fn))
>>> transformed_data, transformed_metadata = transformed_dataset
>>> transformed_data
[{'x/mean_a': 0.5}, {'x/mean_a': 4.0}, {'x/mean_a': 1.5}]

PARAMETER DESCRIPTION
inputs

An ordered collection of input Tensors.

TYPE: Collection[Tensor]

ptransform

A Beam PTransform that accepts a Beam PCollection where each element is a tuple of ndarrays. Each element in the tuple contains a batch of values for the corresponding input tensor of the analyzer and maintains its shape and dtype. It returns a PCollection, or a tuple of PCollections, each containing a single element which is an ndarray or a list of primitive types. The contents of these output PCollections must be consistent with the given values of output_dtypes and output_shapes. It may inherit from tft_beam.experimental.PTransformAnalyzer if access to a temp base directory is needed. Alternatively, it could be an instance of tft.experimental.CacheablePTransformAnalyzer in order to enable cache for this analyzer, when analyzer cache is enabled for this pipeline.

TYPE: Union[_BeamPTransform, CacheablePTransformAnalyzer]

output_dtypes

An ordered collection of TensorFlow dtypes of the output of the analyzer.

TYPE: Collection[DType]

output_shapes

An ordered collection of shapes of the output of the analyzer. Must have the same length as output_dtypes.

TYPE: Collection[List[int]]

output_asset_default_values

(Optional) An ordered collection of optional bytes aligned with output_dtypes/output_shapes. Every item in this collection which is not None indicates that the output is a TF asset path, and its value would be used as the default value of this asset file prior to analysis.

TYPE: Optional[Collection[Optional[bytes]]] DEFAULT: None

name

(Optional) Similar to a TF op name. Used to define a unique scope for this analyzer, which can be used for debugging info.

TYPE: Optional[str] DEFAULT: None

RETURNS DESCRIPTION

A list of output Tensors. These will have dtype and shape as specified by output_dtypes and output_shapes.

RAISES DESCRIPTION
ValueError

If output_dtypes and output_shapes have different lengths.

Source code in tensorflow_transform/experimental/analyzers.py
@common.log_api_use(common.ANALYZER_COLLECTION)
def ptransform_analyzer(
    inputs: Collection[tf.Tensor],
    ptransform: Union[_BeamPTransform, CacheablePTransformAnalyzer],
    output_dtypes: Collection[tf.dtypes.DType],
    output_shapes: Collection[List[int]],
    output_asset_default_values: Optional[Collection[Optional[bytes]]] = None,
    name: Optional[str] = None):
  # pylint: disable=line-too-long
  """Applies a user-provided PTransform over the whole dataset.

  WARNING: This is experimental.

  Note that in order to have asset files copied correctly, any outputs that
  represent asset filenames must be added to the `tf.GraphKeys.ASSET_FILEPATHS`
  collection by the caller if using Transform's APIs in compat v1 mode.

  Example:

  >>> class MeanPerKey(beam.PTransform):
  ...   def expand(self, pcoll: beam.PCollection[Tuple[np.ndarray, np.ndarray]]) -> Tuple[beam.PCollection[np.ndarray], beam.PCollection[np.ndarray]]:
  ...     def extract_output(key_value_pairs):
  ...       keys, values = zip(*key_value_pairs)
  ...       return [beam.TaggedOutput('keys', keys),
  ...               beam.TaggedOutput('values', values)]
  ...     return tuple(
  ...         pcoll
  ...         | 'ZipAndFlatten' >> beam.FlatMap(lambda batches: list(zip(*batches)))
  ...         | 'MeanPerKey' >> beam.CombinePerKey(beam.combiners.MeanCombineFn())
  ...         | 'ToList' >> beam.combiners.ToList()
  ...         | 'Extract' >> beam.FlatMap(extract_output).with_outputs(
  ...             'keys', 'values'))
  >>> def preprocessing_fn(inputs):
  ...   outputs = tft.experimental.ptransform_analyzer(
  ...       inputs=[inputs['s'], inputs['x']],
  ...       ptransform=MeanPerKey(),
  ...       output_dtypes=[tf.string, tf.float32],
  ...       output_shapes=[[2], [2]])
  ...   (keys, means) = outputs
  ...   mean_a = tf.reshape(tf.gather(means, tf.where(keys == 'a')), [])
  ...   return { 'x/mean_a': inputs['x'] / mean_a }
  >>> raw_data = [dict(x=1, s='a'), dict(x=8, s='b'), dict(x=3, s='a')]
  >>> feature_spec = dict(
  ...     x=tf.io.FixedLenFeature([], tf.float32),
  ...     s=tf.io.FixedLenFeature([], tf.string))
  >>> raw_data_metadata = tft.DatasetMetadata.from_feature_spec(feature_spec)
  >>> with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
  ...   transformed_dataset, transform_fn = (
  ...       (raw_data, raw_data_metadata)
  ...       | tft_beam.AnalyzeAndTransformDataset(preprocessing_fn))
  >>> transformed_data, transformed_metadata = transformed_dataset
  >>> transformed_data
  [{'x/mean_a': 0.5}, {'x/mean_a': 4.0}, {'x/mean_a': 1.5}]

  Args:
    inputs: An ordered collection of input `Tensor`s.
    ptransform: A Beam PTransform that accepts a Beam PCollection where each
      element is a tuple of `ndarray`s.  Each element in the tuple contains a
      batch of values for the corresponding input tensor of the analyzer and
      maintain their shapes and dtypes.
      It returns a `PCollection`, or a tuple of `PCollections`, each containing
      a single element which is an `ndarray` or a list of primitive types. The
      contents of these output `PCollection`s must be consistent with the given
      values of `output_dtypes` and `output_shapes`.
      It may inherit from `tft_beam.experimental.PTransformAnalyzer` if access
      to a temp base directory is needed.
      Alternatively, it could be an instance of
      `tft.experimental.CacheablePTransformAnalyzer` in order to enable cache
      for this analyzer, when analyzer cache is enabled for this pipeline.
    output_dtypes: An ordered collection of TensorFlow dtypes of the output of
      the analyzer.
    output_shapes: An ordered collection of shapes of the output of the
      analyzer. Must have the same length as output_dtypes.
    output_asset_default_values: (Optional) An ordered collection of optional
      `bytes` aligned with output_dtypes/output_shapes. Every item in this
      collection which is not `None` indicates that the output is a TF asset
      path, and its value would be used as the default value of this asset file
      prior to analysis.
    name: (Optional) Similar to a TF op name.  Used to define a unique scope for
      this analyzer, which can be used for debugging info.

  Returns:
    A list of output `Tensor`s.  These will have `dtype` and `shape` as
      specified by `output_dtypes` and `output_shapes`.

  Raises:
    ValueError: If output_dtypes and output_shapes have different lengths.
  """
  # pylint: enable=line-too-long
  if len(output_dtypes) != len(output_shapes):
    raise ValueError('output_dtypes ({}) and output_shapes ({}) had different'
                     ' lengths'.format(output_dtypes, output_shapes))
  if output_asset_default_values is not None:
    if len(output_asset_default_values) != len(output_dtypes):
      raise ValueError(
          'output_dtypes ({}) and output_asset_default_values ({}) had '
          'different lengths'.format(output_dtypes,
                                     output_asset_default_values))
    output_asset_default_values = [
        analyzer_nodes.TemporaryAssetInfo(value, 'text')
        for value in output_asset_default_values
    ]
  else:
    output_asset_default_values = [None] * len(output_dtypes)
  with tf.compat.v1.name_scope(name, 'ptransform'):
    output_tensor_infos = [
        analyzer_nodes.TensorInfo(dtype, shape, default_asset_content)
        for dtype, shape, default_asset_content in zip(
            output_dtypes, output_shapes, output_asset_default_values)
    ]
    return _apply_analyzer(
        ptransform, *inputs, output_tensor_info_list=output_tensor_infos)