TensorFlow Transform tft.coders Module

tensorflow_transform.coders

Module-level imports for tensorflow_transform.coders.

Classes

CsvCoder

CsvCoder(
    column_names,
    schema,
    delimiter=",",
    secondary_delimiter=None,
    multivalent_columns=None,
)

A coder to encode CSV-formatted data.

Initializes CsvCoder.

PARAMETERS

column_names: Tuple of strings. Order must match the order in the file.

schema: A Schema proto.

delimiter: A one-character string used to separate fields. DEFAULT: ','

secondary_delimiter: A one-character string used to separate values within the same field. DEFAULT: None

multivalent_columns: A list of names for multivalent columns that need to be split based on the secondary delimiter. DEFAULT: None

RAISES

ValueError: If schema is invalid.

Source code in tensorflow_transform/coders/csv_coder.py
def __init__(self,
             column_names,
             schema,
             delimiter=',',
             secondary_delimiter=None,
             multivalent_columns=None):
  """Initializes CsvCoder.

  Args:
    column_names: Tuple of strings. Order must match the order in the file.
    schema: A `Schema` proto.
    delimiter: A one-character string used to separate fields.
    secondary_delimiter: A one-character string used to separate values within
      the same field.
    multivalent_columns: A list of names for multivalent columns that need to
      be split based on secondary delimiter.

  Raises:
    ValueError: If `schema` is invalid.
  """
  self._column_names = column_names
  self._schema = schema
  self._delimiter = delimiter
  self._secondary_delimiter = secondary_delimiter
  self._encoder = self._WriterWrapper(delimiter)

  if multivalent_columns is None:
    multivalent_columns = []
  self._multivalent_columns = multivalent_columns

  if secondary_delimiter:
    secondary_encoder = self._WriterWrapper(secondary_delimiter)
  elif multivalent_columns:
    raise ValueError(
        'secondary_delimiter unspecified for multivalent columns "{}"'.format(
            multivalent_columns))
  secondary_encoder_by_name = {
      name: secondary_encoder for name in multivalent_columns
  }
  indices_by_name = {
      name: index for index, name in enumerate(self._column_names)
  }

  def index(name):
    index = indices_by_name.get(name)
    if index is None:
      raise ValueError('Column not found: "{}"'.format(name))
    else:
      return index

  self._feature_handlers = []
  for name, feature_spec in schema_utils.schema_as_feature_spec(
      schema).feature_spec.items():
    if isinstance(feature_spec, tf.io.FixedLenFeature):
      self._feature_handlers.append(
          _FixedLenFeatureHandler(name, feature_spec, index(name),
                                  secondary_encoder_by_name.get(name)))
    elif isinstance(feature_spec, tf.io.VarLenFeature):
      self._feature_handlers.append(
          _VarLenFeatureHandler(name, feature_spec.dtype, index(name),
                                secondary_encoder_by_name.get(name)))
    elif isinstance(feature_spec, tf.io.SparseFeature):
      index_keys = (
          feature_spec.index_key if isinstance(feature_spec.index_key, list)
          else [feature_spec.index_key])
      for key in index_keys:
        self._feature_handlers.append(
            _VarLenFeatureHandler(key, tf.int64, index(key),
                                  secondary_encoder_by_name.get(name)))
      self._feature_handlers.append(
          _VarLenFeatureHandler(feature_spec.value_key, feature_spec.dtype,
                                index(feature_spec.value_key),
                                secondary_encoder_by_name.get(name)))
    else:
      raise ValueError(
          'feature_spec should be one of tf.FixedLenFeature, '
          'tf.VarLenFeature or tf.SparseFeature: {!r} was {!r}'.format(
              name, type(feature_spec)))
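
A minimal construction sketch (not taken from the library docs; the feature spec, column names, and delimiters below are hypothetical) showing how a schema built with schema_utils can be paired with CsvCoder, including a multivalent column split on a secondary delimiter:

import tensorflow as tf
from tensorflow_transform import coders
from tensorflow_transform.tf_metadata import schema_utils

# Hypothetical feature spec; schema_from_feature_spec builds the Schema proto
# that CsvCoder expects.
schema = schema_utils.schema_from_feature_spec({
    'age': tf.io.FixedLenFeature([], tf.int64),
    'name': tf.io.FixedLenFeature([], tf.string),
    'tags': tf.io.VarLenFeature(tf.string),
})

csv_coder = coders.CsvCoder(
    column_names=('age', 'name', 'tags'),
    schema=schema,
    delimiter=',',
    secondary_delimiter='|',          # splits values inside a single field
    multivalent_columns=['tags'],     # 'tags' holds multiple values per row
)
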
Functions
encode
encode(instance)

Encode a tf.transform encoded dict to a csv-formatted string.

PARAMETERS

instance: A Python dictionary where the keys are the column names and the values are fixed-length or variable-length encoded features.

RETURNS

A csv-formatted string. The order of the columns is given by column_names.

Source code in tensorflow_transform/coders/csv_coder.py
def encode(self, instance):
  """Encode a tf.transform encoded dict to a csv-formatted string.

  Args:
    instance: A python dictionary where the keys are the column names and the
      values are fixed len or var len encoded features.

  Returns:
    A csv-formatted string. The order of the columns is given by column_names.
  """
  string_list = [None] * len(self._column_names)
  for feature_handler in self._feature_handlers:
    try:
      feature_handler.encode_value(string_list,
                                   instance[feature_handler.name])
    except TypeError as e:
      raise TypeError('{} while encoding feature "{}"'.format(
          e, feature_handler.name))
  return self._encoder.encode_record(string_list)
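
Continuing the sketch above (with hypothetical instance values), encode turns one tf.Transform-style dict into a single CSV row, joining the multivalent 'tags' column with the secondary delimiter:

# Scalar values for the FixedLenFeature([], ...) columns, a list for the
# multivalent VarLenFeature column.
instance = {'age': 29, 'name': 'zoe', 'tags': ['ml', 'tfx']}
row = csv_coder.encode(instance)
# row is a csv-formatted string in column_names order, roughly '29,zoe,ml|tfx'.
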

ExampleProtoCoder

ExampleProtoCoder(schema, serialized=True)

A coder between maybe-serialized TF Examples and tf.Transform datasets.

Build an ExampleProtoCoder.

PARAMETERS

schema: A Schema proto.

serialized: Whether to encode serialized Example protos (as opposed to in-memory Example protos). DEFAULT: True

RAISES

ValueError: If schema is invalid.

Source code in tensorflow_transform/coders/example_proto_coder.py
def __init__(self, schema, serialized=True):
  """Build an ExampleProtoCoder.

  Args:
    schema: A `Schema` proto.
    serialized: Whether to encode serialized Example protos (as opposed to
      in-memory Example protos).

  Raises:
    ValueError: If `schema` is invalid.
  """
  self._schema = schema
  self._serialized = serialized

  # Using pre-allocated tf.train.Example and FeatureHandler objects for
  # performance reasons.
  #
  # Since the output of "encode" is deep as opposed to shallow
  # transformations, and since the schema always fully defines the Example's
  # FeatureMap (ie all fields are always cleared/assigned or copied), the
  # optimization and implementation are correct and thread-compatible.
  self._encode_example_cache = tf.train.Example()
  self._feature_handlers = []
  for name, feature_spec in schema_utils.schema_as_feature_spec(
      schema).feature_spec.items():
    if isinstance(feature_spec, tf.io.FixedLenFeature):
      self._feature_handlers.append(
          _FixedLenFeatureHandler(name, feature_spec))
    elif isinstance(feature_spec, tf.io.VarLenFeature):
      self._feature_handlers.append(
          _VarLenFeatureHandler(name, feature_spec.dtype))
    elif isinstance(feature_spec, tf.io.SparseFeature):
      index_keys = (
          feature_spec.index_key if isinstance(feature_spec.index_key, list)
          else [feature_spec.index_key])
      for index_key in index_keys:
        self._feature_handlers.append(
            _VarLenFeatureHandler(index_key, tf.int64))
      self._feature_handlers.append(
          _VarLenFeatureHandler(feature_spec.value_key, feature_spec.dtype))
    elif isinstance(feature_spec, tf.io.RaggedFeature):
      uniform_partition = False
      for partition in feature_spec.partitions:
        if isinstance(partition, tf.io.RaggedFeature.RowLengths):
          if uniform_partition:
            raise ValueError(
                'Encountered ragged dimension after uniform for feature '
                '"{}": only inner dimensions can be uniform. Feature spec '
                'is {}'.format(name, feature_spec))
          self._feature_handlers.append(
              _VarLenFeatureHandler(partition.key, tf.int64))
        elif isinstance(partition, tf.io.RaggedFeature.UniformRowLength):
          # We don't encode uniform partitions since they can be recovered
          # from the shape information.
          uniform_partition = True
        else:
          raise ValueError(
              'Only `RowLengths` and `UniformRowLength` partitions of ragged '
              'features are supported, got {}'.format(type(partition)))
      self._feature_handlers.append(
          _VarLenFeatureHandler(feature_spec.value_key, feature_spec.dtype))
    else:
      raise ValueError('feature_spec should be one of tf.io.FixedLenFeature, '
                       'tf.io.VarLenFeature, tf.io.SparseFeature or '
                       'tf.io.RaggedFeature: "{}" was {}'.format(
                           name, type(feature_spec)))

  for feature_handler in self._feature_handlers:
    feature_handler.initialize_encode_cache(self._encode_example_cache)
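
A minimal construction sketch (hypothetical feature spec, reusing the schema_utils helper shown for CsvCoder above):

import tensorflow as tf
from tensorflow_transform import coders
from tensorflow_transform.tf_metadata import schema_utils

schema = schema_utils.schema_from_feature_spec({
    'age': tf.io.FixedLenFeature([], tf.int64),
    'name': tf.io.FixedLenFeature([], tf.string),
    'tags': tf.io.VarLenFeature(tf.string),
})

# serialized=True (the default) makes encode() return serialized Example
# protos rather than in-memory tf.train.Example objects.
proto_coder = coders.ExampleProtoCoder(schema, serialized=True)
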
Functions
encode
encode(instance)

Encode a tf.transform encoded dict as tf.Example.

Source code in tensorflow_transform/coders/example_proto_coder.py
def encode(self, instance):
  """Encode a tf.transform encoded dict as tf.Example."""
  # The feature handles encode using the self._encode_example_cache.
  for feature_handler in self._feature_handlers:
    value = instance[feature_handler.name]
    try:
      feature_handler.encode_value(value)
    except TypeError as e:
      raise TypeError('%s while encoding feature "%s"' %
                      (e, feature_handler.name))

  if self._serialized:
    return self._encode_example_cache.SerializeToString()

  result = tf.train.Example()
  result.CopyFrom(self._encode_example_cache)
  return result
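
Continuing the sketch above (with hypothetical instance values), encode produces a serialized tf.train.Example when serialized=True, which can be parsed back for inspection:

# String features are passed as bytes; the VarLenFeature takes a list.
instance = {'age': 29, 'name': b'zoe', 'tags': [b'ml', b'tfx']}
serialized = proto_coder.encode(instance)

# Parse the serialized proto back into a tf.train.Example to inspect it.
example = tf.train.Example.FromString(serialized)
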