
TFX-BSL Public TFXIO

tfx_bsl.public.tfxio

Module level imports for tfx_bsl.public.tfxio.

TFXIO defines a common in-memory data representation shared by all TFX libraries and components, as well as an I/O abstraction layer to produce such representations. See the RFC for details: https://github.com/tensorflow/community/blob/master/rfcs/20191017-tfx-standardized-inputs.md
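To make the abstraction concrete, below is a minimal, hypothetical sketch of the typical TFXIO flow: construct a TFXIO for a data source, then obtain a Beam source that yields Arrow RecordBatches. The file pattern and telemetry descriptors are placeholders.

```python
import apache_beam as beam
from tfx_bsl.public import tfxio

# Placeholder file pattern; TFExampleRecord is documented below on this page.
example_tfxio = tfxio.TFExampleRecord(
    file_pattern="/path/to/examples*",
    telemetry_descriptors=["MyComponent"])

with beam.Pipeline() as p:
  # PCollection[pa.RecordBatch] -- the common in-memory representation.
  record_batches = p | "ReadAndBatch" >> example_tfxio.BeamSource(
      batch_size=1000)
```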

Attributes

TensorRepresentations module-attribute

TensorRepresentations = Dict[str, TensorRepresentation]

Classes

BeamRecordCsvTFXIO

BeamRecordCsvTFXIO(
    physical_format: Text,
    column_names: List[Text],
    delimiter: Optional[Text] = ",",
    skip_blank_lines: bool = True,
    multivalent_columns: Optional[Text] = None,
    secondary_delimiter: Optional[Text] = None,
    schema: Optional[Schema] = None,
    raw_record_column_name: Optional[Text] = None,
    telemetry_descriptors: Optional[List[Text]] = None,
)

Bases: _CsvTFXIOBase

TFXIO implementation for CSV records in pcoll[bytes].

This is a special TFXIO that does not actually do I/O -- it relies on the caller to prepare a PCollection of bytes.
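For example, a minimal sketch in which the caller supplies the PCollection of CSV lines (the column names and rows are placeholders):

```python
import apache_beam as beam
from tfx_bsl.public import tfxio

csv_tfxio = tfxio.BeamRecordCsvTFXIO(
    physical_format="inmem",           # describes the origin, for telemetry
    column_names=["age", "language"],  # placeholder column names
    telemetry_descriptors=["MyComponent"])

with beam.Pipeline() as p:
  # The caller prepares the PCollection[bytes] of CSV records.
  raw_lines = p | beam.Create([b"42,english", b"7,spanish"])
  record_batches = raw_lines | csv_tfxio.BeamSource()
```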

Source code in tfx_bsl/tfxio/csv_tfxio.py
def __init__(self,
             physical_format: Text,
             column_names: List[Text],
             delimiter: Optional[Text] = ",",
             skip_blank_lines: bool = True,
             multivalent_columns: Optional[Text] = None,
             secondary_delimiter: Optional[Text] = None,
             schema: Optional[schema_pb2.Schema] = None,
             raw_record_column_name: Optional[Text] = None,
             telemetry_descriptors: Optional[List[Text]] = None):
  super().__init__(
      telemetry_descriptors=telemetry_descriptors,
      raw_record_column_name=raw_record_column_name,
      logical_format="csv",
      physical_format=physical_format)
  self._schema = schema
  self._column_names = column_names
  self._delimiter = delimiter
  self._skip_blank_lines = skip_blank_lines
  self._multivalent_columns = multivalent_columns
  self._secondary_delimiter = secondary_delimiter
  self._raw_record_column_name = raw_record_column_name
  if schema is not None:
    feature_names = [f.name for f in schema.feature]
    if not set(feature_names).issubset(set(column_names)):
      raise ValueError(
          "Schema features are not a subset of column names: {} vs {}".format(
              column_names, feature_names))
  self._schema_projected = False
Attributes
raw_record_column_name property
raw_record_column_name: Optional[Text]
telemetry_descriptors property
telemetry_descriptors: Optional[List[Text]]
Functions
ArrowSchema
ArrowSchema() -> Schema

Returns the schema of the RecordBatch produced by self.BeamSource().

May raise an error if the TFMD schema was not provided at construction time.

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def ArrowSchema(self) -> pa.Schema:
  schema = self._ArrowSchemaNoRawRecordColumn()
  if self._raw_record_column_name is not None:
    if schema.get_field_index(self._raw_record_column_name) != -1:
      raise ValueError(
          "Raw record column name {} collided with a column in the schema."
          .format(self._raw_record_column_name))
    schema = schema.append(
        pa.field(self._raw_record_column_name,
                 pa.large_list(pa.large_binary())))
  return schema
BeamSource
BeamSource(batch_size: Optional[int] = None) -> PTransform

Returns a beam PTransform that produces PCollection[pa.RecordBatch].

May NOT raise an error if the TFMD schema was not provided at construction time.

If a TFMD schema was provided at construction time, all the pa.RecordBatches in the result PCollection must be of the same schema returned by self.ArrowSchema. If a TFMD schema was not provided, the pa.RecordBatches might not be of the same schema (they may contain different numbers of columns).

PARAMETER DESCRIPTION
batch_size

if not None, the pa.RecordBatch produced will be of the specified size. Otherwise it's automatically tuned by Beam.

TYPE: Optional[int] DEFAULT: None

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def BeamSource(self, batch_size: Optional[int] = None) -> beam.PTransform:

  @beam.typehints.with_input_types(Any)
  @beam.typehints.with_output_types(pa.RecordBatch)
  def _PTransformFn(pcoll_or_pipeline: Any):
    """Converts raw records to RecordBatches."""
    return (
        pcoll_or_pipeline
        | "RawRecordBeamSource" >> self.RawRecordBeamSource()
        | "RawRecordToRecordBatch" >> self.RawRecordToRecordBatch(batch_size))

  return beam.ptransform_fn(_PTransformFn)()
Project
Project(tensor_names: List[Text]) -> TFXIO

Projects the dataset represented by this TFXIO.

A Projected TFXIO:

- Only columns needed for given tensor_names are guaranteed to be produced by self.BeamSource()
- self.TensorAdapterConfig() and self.TensorFlowDataset() are trimmed to contain only those tensors.
- It retains a reference to the very original TFXIO, so its TensorAdapter knows about the specs of the tensors that would be produced by the original TensorAdapter. Also see TensorAdapter.OriginalTensorSpec().

May raise an error if the TFMD schema was not provided at construction time.

PARAMETER DESCRIPTION
tensor_names

a set of tensor names.

TYPE: List[Text]

RETURNS DESCRIPTION
TFXIO

A TFXIO instance that is the same as self except that:

  • Only columns needed for given tensor_names are guaranteed to be produced by self.BeamSource()
  • self.TensorAdapterConfig() and self.TensorFlowDataset() are trimmed to contain only those tensors.
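A hypothetical usage sketch (assumes csv_tfxio was constructed with a TFMD schema that contains features named "age" and "language"):

```python
# Project down to the tensors needed by a model.
projected = csv_tfxio.Project(["age", "language"])

# Only columns backing those tensors are guaranteed in BeamSource() output,
# and the TensorAdapterConfig is trimmed to just those tensors.
config = projected.TensorAdapterConfig()
```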
Source code in tfx_bsl/tfxio/tfxio.py
def Project(self, tensor_names: List[Text]) -> "TFXIO":
  """Projects the dataset represented by this TFXIO.

  A Projected TFXIO:
  - Only columns needed for given tensor_names are guaranteed to be
    produced by `self.BeamSource()`
  - `self.TensorAdapterConfig()` and `self.TensorFlowDataset()` are trimmed
    to contain only those tensors.
  - It retains a reference to the very original TFXIO, so its TensorAdapter
    knows about the specs of the tensors that would be produced by the
    original TensorAdapter. Also see `TensorAdapter.OriginalTensorSpec()`.

  May raise an error if the TFMD schema was not provided at construction time.

  Args:
    tensor_names: a set of tensor names.

  Returns:
    A `TFXIO` instance that is the same as `self` except that:
    - Only columns needed for given tensor_names are guaranteed to be
      produced by `self.BeamSource()`
    - `self.TensorAdapterConfig()` and `self.TensorFlowDataset()` are trimmed
      to contain only those tensors.
  """
  if isinstance(self, _ProjectedTFXIO):
    # pylint: disable=protected-access
    return _ProjectedTFXIO(self.origin,
                           self.projected._ProjectImpl(tensor_names))
  return _ProjectedTFXIO(self, self._ProjectImpl(tensor_names))
RawRecordBeamSource
RawRecordBeamSource() -> PTransform

Returns a PTransform that produces a PCollection[bytes].

Used together with RawRecordToRecordBatch(), it allows getting both the PCollection of the raw records and the PCollection of the RecordBatch from the same source. For example:

record_batch = pipeline | tfxio.BeamSource()
raw_record = pipeline | tfxio.RawRecordBeamSource()

would result in the files being read twice, while the following would only read once:

raw_record = pipeline | tfxio.RawRecordBeamSource()
record_batch = raw_record | tfxio.RawRecordToRecordBatch()

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def RawRecordBeamSource(self) -> beam.PTransform:
  """Returns a PTransform that produces a PCollection[bytes].

  Used together with RawRecordToRecordBatch(), it allows getting both the
  PCollection of the raw records and the PCollection of the RecordBatch from
  the same source. For example:

  record_batch = pipeline | tfxio.BeamSource()
  raw_record = pipeline | tfxio.RawRecordBeamSource()

  would result in the files being read twice, while the following would only
  read once:

  raw_record = pipeline | tfxio.RawRecordBeamSource()
  record_batch = raw_record | tfxio.RawRecordToRecordBatch()
  """

  @beam.typehints.with_input_types(Any)
  @beam.typehints.with_output_types(bytes)
  def _PTransformFn(pcoll_or_pipeline: Any):
    return (pcoll_or_pipeline
            | "ReadRawRecords" >> self._RawRecordBeamSourceInternal()
            | "CollectRawRecordTelemetry" >> telemetry.ProfileRawRecords(
                self._telemetry_descriptors, self._logical_format,
                self._physical_format))

  return beam.ptransform_fn(_PTransformFn)()
RawRecordTensorFlowDataset
RawRecordTensorFlowDataset(
    options: TensorFlowDatasetOptions,
) -> Dataset

Returns a Dataset that contains nested Datasets of raw records.

May not be implemented for some TFXIOs.

This should be used when RawTfRecordTFXIO.TensorFlowDataset does not suffice, namely when there is some logical grouping of files on which we need to perform operations (e.g. shuffle) without applying the operation to each individual group.

The returned Dataset object is a dataset of datasets, where each nested dataset is a dataset of serialized records. When shuffle=False (default), the nested datasets are deterministically ordered. Each nested dataset can represent multiple files. The files are merged into one dataset if the files have the same format. For example:

file_patterns = ['file_1', 'file_2', 'dir_1/*']
file_formats = ['recordio', 'recordio', 'sstable']
tfxio = SomeTFXIO(file_patterns, file_formats)
datasets = tfxio.RawRecordTensorFlowDataset(options)
datasets would then contain two nested datasets, [ds1, ds2], where ds1 iterates over records from 'file_1' and 'file_2', and ds2 iterates over records from files matched by 'dir_1/*'.

Example usage:

tfxio = SomeTFXIO(file_patterns, file_formats)
ds = tfxio.RawRecordTensorFlowDataset(options=options)
ds = ds.flat_map(lambda x: x)
records = list(ds.as_numpy_iterator())
# iterating over `records` yields records from each file in
# `file_patterns`. See `tf.data.Dataset.list_files` for more information
# about the order of files when expanding globs.
Note that we need a flat_map, because RawRecordTensorFlowDataset returns a dataset of datasets.

When shuffle=True, the datasets are not deterministically ordered, but the contents of each nested dataset are deterministically ordered. For example, we may get [ds2, ds1, ds3], where the contents of ds1, ds2, and ds3 are each deterministically ordered.

PARAMETER DESCRIPTION
options

A TensorFlowDatasetOptions object. Not all options will apply.

TYPE: TensorFlowDatasetOptions

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def RawRecordTensorFlowDataset(
    self,
    options: dataset_options.TensorFlowDatasetOptions) -> tf.data.Dataset:
  """Returns a Dataset that contains nested Datasets of raw records.

  May not be implemented for some TFXIOs.

  This should be used when RawTfRecordTFXIO.TensorFlowDataset does not
  suffice, namely when there is some logical grouping of files on which we
  need to perform operations (e.g. shuffle) without applying the operation
  to each individual group.

  The returned Dataset object is a dataset of datasets, where each nested
  dataset is a dataset of serialized records. When shuffle=False (default),
  the nested datasets are deterministically ordered. Each nested dataset can
  represent multiple files. The files are merged into one dataset if the files
  have the same format. For example:

  ```
  file_patterns = ['file_1', 'file_2', 'dir_1/*']
  file_formats = ['recordio', 'recordio', 'sstable']
  tfxio = SomeTFXIO(file_patterns, file_formats)
  datasets = tfxio.RawRecordTensorFlowDataset(options)
  ```
  `datasets` would then contain two nested datasets, `[ds1, ds2]`, where ds1
  iterates over records from 'file_1' and 'file_2', and ds2 iterates over
  records from files matched by 'dir_1/*'.

  Example usage:
  ```
  tfxio = SomeTFXIO(file_patterns, file_formats)
  ds = tfxio.RawRecordTensorFlowDataset(options=options)
  ds = ds.flat_map(lambda x: x)
  records = list(ds.as_numpy_iterator())
  # iterating over `records` yields records from each file in
  # `file_patterns`. See `tf.data.Dataset.list_files` for more information
  # about the order of files when expanding globs.
  ```
  Note that we need a flat_map, because `RawRecordTensorFlowDataset` returns
  a dataset of datasets.

  When shuffle=True, the datasets are not deterministically ordered, but the
  contents of each nested dataset are deterministically ordered. For example,
  we may get [ds2, ds1, ds3], where the contents of ds1, ds2, and ds3 are
  each deterministically ordered.

  Args:
    options: A TensorFlowDatasetOptions object. Not all options will apply.
  """
  raise NotImplementedError
RawRecordToRecordBatch
RawRecordToRecordBatch(
    batch_size: Optional[int] = None,
) -> PTransform

Returns a PTransform that converts raw records to Arrow RecordBatches.

The input PCollection must be from self.RawRecordBeamSource() (also see the documentation for that method).

PARAMETER DESCRIPTION
batch_size

if not None, the pa.RecordBatch produced will be of the specified size. Otherwise it's automatically tuned by Beam.

TYPE: Optional[int] DEFAULT: None

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def RawRecordToRecordBatch(self,
                           batch_size: Optional[int] = None
                          ) -> beam.PTransform:
  """Returns a PTransform that converts raw records to Arrow RecordBatches.

  The input PCollection must be from self.RawRecordBeamSource() (also see
  the documentation for that method).

  Args:
    batch_size: if not None, the `pa.RecordBatch` produced will be of the
      specified size. Otherwise it's automatically tuned by Beam.
  """

  @beam.typehints.with_input_types(bytes)
  @beam.typehints.with_output_types(pa.RecordBatch)
  def _PTransformFn(pcoll: beam.pvalue.PCollection):
    return (pcoll
            | "RawRecordToRecordBatch" >>
            self._RawRecordToRecordBatchInternal(batch_size)
            | "CollectRecordBatchTelemetry" >>
            telemetry.ProfileRecordBatches(self._telemetry_descriptors,
                                           self._logical_format,
                                           self._physical_format))

  return beam.ptransform_fn(_PTransformFn)()
RecordBatches
RecordBatches(options: RecordBatchesOptions)

Returns an iterable of record batches.

This can be used outside of Apache Beam or TensorFlow to access data.

PARAMETER DESCRIPTION
options

An options object for iterating over record batches. Look at dataset_options.RecordBatchesOptions for more details.

TYPE: RecordBatchesOptions

Source code in tfx_bsl/tfxio/csv_tfxio.py
def RecordBatches(self, options: dataset_options.RecordBatchesOptions):
  raise NotImplementedError
SupportAttachingRawRecords
SupportAttachingRawRecords() -> bool
Source code in tfx_bsl/tfxio/csv_tfxio.py
def SupportAttachingRawRecords(self) -> bool:
  return True
TensorAdapter
TensorAdapter() -> TensorAdapter

Returns a TensorAdapter that converts pa.RecordBatch to TF inputs.

May raise an error if the TFMD schema was not provided at construction time.
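A hedged sketch of typical adapter use; ToBatchTensors is the conversion method on tensor_adapter.TensorAdapter (not shown on this page), and `record_batch` is assumed to come from this TFXIO's BeamSource():

```python
# Assumes a TFMD schema was provided at construction time, and that
# `record_batch` is a pa.RecordBatch produced by this TFXIO.
adapter = csv_tfxio.TensorAdapter()
tensors = adapter.ToBatchTensors(record_batch)  # dict of (composite) tensors
```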

Source code in tfx_bsl/tfxio/tfxio.py
def TensorAdapter(self) -> tensor_adapter.TensorAdapter:
  """Returns a TensorAdapter that converts pa.RecordBatch to TF inputs.

  May raise an error if the TFMD schema was not provided at construction time.
  """
  return tensor_adapter.TensorAdapter(self.TensorAdapterConfig())
TensorAdapterConfig
TensorAdapterConfig() -> TensorAdapterConfig

Returns the config to initialize a TensorAdapter.

RETURNS DESCRIPTION
TensorAdapterConfig

a TensorAdapterConfig that is the same as what is used to initialize the TensorAdapter returned by self.TensorAdapter().

Source code in tfx_bsl/tfxio/tfxio.py
def TensorAdapterConfig(self) -> tensor_adapter.TensorAdapterConfig:
  """Returns the config to initialize a `TensorAdapter`.

  Returns:
    a `TensorAdapterConfig` that is the same as what is used to initialize the
    `TensorAdapter` returned by `self.TensorAdapter()`.
  """
  return tensor_adapter.TensorAdapterConfig(
      self.ArrowSchema(), self.TensorRepresentations())
TensorFlowDataset
TensorFlowDataset(options: TensorFlowDatasetOptions)

Returns a tf.data.Dataset of TF inputs.

May raise an error if the TFMD schema was not provided at construction time.

PARAMETER DESCRIPTION
options

an options object for the tf.data.Dataset. Look at dataset_options.TensorFlowDatasetOptions for more details.

TYPE: TensorFlowDatasetOptions

Source code in tfx_bsl/tfxio/csv_tfxio.py
def TensorFlowDataset(self,
                      options: dataset_options.TensorFlowDatasetOptions):
  raise NotImplementedError
TensorRepresentations
TensorRepresentations() -> TensorRepresentations

Returns the TensorRepresentations.

These TensorRepresentations describe the tensors or composite tensors produced by the TensorAdapter created from self.TensorAdapter() or the tf.data.Dataset created from self.TensorFlowDataset().

May raise an error if the TFMD schema was not provided at construction time. May raise an error if the tensor representations are invalid.

Source code in tfx_bsl/tfxio/csv_tfxio.py
def TensorRepresentations(self) -> tensor_adapter.TensorRepresentations:
  return self._TensorRepresentations(not self._schema_projected)

CsvTFXIO

CsvTFXIO(
    file_pattern: Text,
    column_names: List[Text],
    telemetry_descriptors: Optional[List[Text]] = None,
    validate: bool = True,
    delimiter: Optional[Text] = ",",
    skip_blank_lines: Optional[bool] = True,
    multivalent_columns: Optional[Text] = None,
    secondary_delimiter: Optional[Text] = None,
    schema: Optional[Schema] = None,
    raw_record_column_name: Optional[Text] = None,
    skip_header_lines: int = 0,
)

Bases: _CsvTFXIOBase

TFXIO implementation for CSV.
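A minimal construction sketch (the file glob and column names are placeholders):

```python
from tfx_bsl.public import tfxio

file_csv_tfxio = tfxio.CsvTFXIO(
    file_pattern="/path/to/data*.csv",   # placeholder glob
    column_names=["age", "language"],    # must match CSV column order
    skip_header_lines=1,                 # skip one header line per file
    telemetry_descriptors=["MyComponent"])
```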

Initializes a CSV TFXIO.

PARAMETER DESCRIPTION
file_pattern

A file glob pattern to read csv files from.

TYPE: Text

column_names

List of csv column names. Order must match the order in the CSV file.

TYPE: List[Text]

telemetry_descriptors

A set of descriptors that identify the component that is instantiating this TFXIO. These will be used to construct the namespace to contain metrics for profiling and are therefore expected to be identifiers of the component itself and not individual instances of source use.

TYPE: Optional[List[Text]] DEFAULT: None

validate

Boolean flag to verify that the files exist during the pipeline creation time.

TYPE: bool DEFAULT: True

delimiter

A one-character string used to separate fields.

TYPE: Optional[Text] DEFAULT: ','

skip_blank_lines

A boolean to indicate whether to skip over blank lines rather than interpreting them as missing values.

TYPE: Optional[bool] DEFAULT: True

multivalent_columns

Name of column that can contain multiple values. If secondary_delimiter is provided, this must also be provided.

TYPE: Optional[Text] DEFAULT: None

secondary_delimiter

Delimiter used for parsing multivalent columns. If multivalent_columns is provided, this must also be provided.

TYPE: Optional[Text] DEFAULT: None

schema

An optional TFMD Schema describing the dataset. If schema is provided, it will determine the data type of the csv columns. Otherwise, each column's data type will be inferred by the csv decoder. The schema should contain exactly the same features as column_names.

TYPE: Optional[Schema] DEFAULT: None

raw_record_column_name

If not None, the generated Arrow RecordBatches will contain a column of the given name that contains raw csv rows.

TYPE: Optional[Text] DEFAULT: None

skip_header_lines

Number of header lines to skip. Same number is skipped from each file. Must be 0 or higher. Large number of skipped lines might impact performance.

TYPE: int DEFAULT: 0

Source code in tfx_bsl/tfxio/csv_tfxio.py
def __init__(self,
             file_pattern: Text,
             column_names: List[Text],
             telemetry_descriptors: Optional[List[Text]] = None,
             validate: bool = True,
             delimiter: Optional[Text] = ",",
             skip_blank_lines: Optional[bool] = True,
             multivalent_columns: Optional[Text] = None,
             secondary_delimiter: Optional[Text] = None,
             schema: Optional[schema_pb2.Schema] = None,
             raw_record_column_name: Optional[Text] = None,
             skip_header_lines: int = 0):
  """Initializes a CSV TFXIO.

  Args:
    file_pattern: A file glob pattern to read csv files from.
    column_names: List of csv column names. Order must match the order in the
      CSV file.
    telemetry_descriptors: A set of descriptors that identify the component
      that is instantiating this TFXIO. These will be used to construct the
      namespace to contain metrics for profiling and are therefore expected to
      be identifiers of the component itself and not individual instances of
      source use.
    validate: Boolean flag to verify that the files exist during the pipeline
      creation time.
    delimiter: A one-character string used to separate fields.
    skip_blank_lines: A boolean to indicate whether to skip over blank lines
      rather than interpreting them as missing values.
    multivalent_columns: Name of column that can contain multiple values. If
      secondary_delimiter is provided, this must also be provided.
    secondary_delimiter: Delimiter used for parsing multivalent columns. If
      multivalent_columns is provided, this must also be provided.
    schema: An optional TFMD Schema describing the dataset. If schema is
      provided, it will determine the data type of the csv columns. Otherwise,
      each column's data type will be inferred by the csv decoder. The
      schema should contain exactly the same features as column_names.
    raw_record_column_name: If not None, the generated Arrow RecordBatches
      will contain a column of the given name that contains raw csv rows.
    skip_header_lines: Number of header lines to skip. Same number is
      skipped from each file. Must be 0 or higher. Large number of
      skipped lines might impact performance.
  """
  super().__init__(
      column_names=column_names,
      delimiter=delimiter,
      skip_blank_lines=skip_blank_lines,
      multivalent_columns=multivalent_columns,
      secondary_delimiter=secondary_delimiter,
      schema=schema,
      raw_record_column_name=raw_record_column_name,
      telemetry_descriptors=telemetry_descriptors,
      physical_format="text")
  self._file_pattern = file_pattern
  self._validate = validate
  self._skip_header_lines = skip_header_lines
Attributes
raw_record_column_name property
raw_record_column_name: Optional[Text]
telemetry_descriptors property
telemetry_descriptors: Optional[List[Text]]
Functions
ArrowSchema
ArrowSchema() -> Schema

Returns the schema of the RecordBatch produced by self.BeamSource().

May raise an error if the TFMD schema was not provided at construction time.

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def ArrowSchema(self) -> pa.Schema:
  schema = self._ArrowSchemaNoRawRecordColumn()
  if self._raw_record_column_name is not None:
    if schema.get_field_index(self._raw_record_column_name) != -1:
      raise ValueError(
          "Raw record column name {} collided with a column in the schema."
          .format(self._raw_record_column_name))
    schema = schema.append(
        pa.field(self._raw_record_column_name,
                 pa.large_list(pa.large_binary())))
  return schema
BeamSource
BeamSource(batch_size: Optional[int] = None) -> PTransform

Returns a beam PTransform that produces PCollection[pa.RecordBatch].

May NOT raise an error if the TFMD schema was not provided at construction time.

If a TFMD schema was provided at construction time, all the pa.RecordBatches in the result PCollection must be of the same schema returned by self.ArrowSchema. If a TFMD schema was not provided, the pa.RecordBatches might not be of the same schema (they may contain different numbers of columns).

PARAMETER DESCRIPTION
batch_size

if not None, the pa.RecordBatch produced will be of the specified size. Otherwise it's automatically tuned by Beam.

TYPE: Optional[int] DEFAULT: None

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def BeamSource(self, batch_size: Optional[int] = None) -> beam.PTransform:

  @beam.typehints.with_input_types(Any)
  @beam.typehints.with_output_types(pa.RecordBatch)
  def _PTransformFn(pcoll_or_pipeline: Any):
    """Converts raw records to RecordBatches."""
    return (
        pcoll_or_pipeline
        | "RawRecordBeamSource" >> self.RawRecordBeamSource()
        | "RawRecordToRecordBatch" >> self.RawRecordToRecordBatch(batch_size))

  return beam.ptransform_fn(_PTransformFn)()
Project
Project(tensor_names: List[Text]) -> TFXIO

Projects the dataset represented by this TFXIO.

A Projected TFXIO:

- Only columns needed for given tensor_names are guaranteed to be produced by self.BeamSource()
- self.TensorAdapterConfig() and self.TensorFlowDataset() are trimmed to contain only those tensors.
- It retains a reference to the very original TFXIO, so its TensorAdapter knows about the specs of the tensors that would be produced by the original TensorAdapter. Also see TensorAdapter.OriginalTensorSpec().

May raise an error if the TFMD schema was not provided at construction time.

PARAMETER DESCRIPTION
tensor_names

a set of tensor names.

TYPE: List[Text]

RETURNS DESCRIPTION
TFXIO

A TFXIO instance that is the same as self except that:

  • Only columns needed for given tensor_names are guaranteed to be produced by self.BeamSource()
  • self.TensorAdapterConfig() and self.TensorFlowDataset() are trimmed to contain only those tensors.
Source code in tfx_bsl/tfxio/tfxio.py
def Project(self, tensor_names: List[Text]) -> "TFXIO":
  """Projects the dataset represented by this TFXIO.

  A Projected TFXIO:
  - Only columns needed for given tensor_names are guaranteed to be
    produced by `self.BeamSource()`
  - `self.TensorAdapterConfig()` and `self.TensorFlowDataset()` are trimmed
    to contain only those tensors.
  - It retains a reference to the very original TFXIO, so its TensorAdapter
    knows about the specs of the tensors that would be produced by the
    original TensorAdapter. Also see `TensorAdapter.OriginalTensorSpec()`.

  May raise an error if the TFMD schema was not provided at construction time.

  Args:
    tensor_names: a set of tensor names.

  Returns:
    A `TFXIO` instance that is the same as `self` except that:
    - Only columns needed for given tensor_names are guaranteed to be
      produced by `self.BeamSource()`
    - `self.TensorAdapterConfig()` and `self.TensorFlowDataset()` are trimmed
      to contain only those tensors.
  """
  if isinstance(self, _ProjectedTFXIO):
    # pylint: disable=protected-access
    return _ProjectedTFXIO(self.origin,
                           self.projected._ProjectImpl(tensor_names))
  return _ProjectedTFXIO(self, self._ProjectImpl(tensor_names))
RawRecordBeamSource
RawRecordBeamSource() -> PTransform

Returns a PTransform that produces a PCollection[bytes].

Used together with RawRecordToRecordBatch(), it allows getting both the PCollection of the raw records and the PCollection of the RecordBatch from the same source. For example:

record_batch = pipeline | tfxio.BeamSource()
raw_record = pipeline | tfxio.RawRecordBeamSource()

would result in the files being read twice, while the following would only read once:

raw_record = pipeline | tfxio.RawRecordBeamSource()
record_batch = raw_record | tfxio.RawRecordToRecordBatch()

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def RawRecordBeamSource(self) -> beam.PTransform:
  """Returns a PTransform that produces a PCollection[bytes].

  Used together with RawRecordToRecordBatch(), it allows getting both the
  PCollection of the raw records and the PCollection of the RecordBatch from
  the same source. For example:

  record_batch = pipeline | tfxio.BeamSource()
  raw_record = pipeline | tfxio.RawRecordBeamSource()

  would result in the files being read twice, while the following would only
  read once:

  raw_record = pipeline | tfxio.RawRecordBeamSource()
  record_batch = raw_record | tfxio.RawRecordToRecordBatch()
  """

  @beam.typehints.with_input_types(Any)
  @beam.typehints.with_output_types(bytes)
  def _PTransformFn(pcoll_or_pipeline: Any):
    return (pcoll_or_pipeline
            | "ReadRawRecords" >> self._RawRecordBeamSourceInternal()
            | "CollectRawRecordTelemetry" >> telemetry.ProfileRawRecords(
                self._telemetry_descriptors, self._logical_format,
                self._physical_format))

  return beam.ptransform_fn(_PTransformFn)()
RawRecordTensorFlowDataset
RawRecordTensorFlowDataset(
    options: TensorFlowDatasetOptions,
) -> Dataset

Returns a Dataset that contains nested Datasets of raw records.

May not be implemented for some TFXIOs.

This should be used when RawTfRecordTFXIO.TensorFlowDataset does not suffice, namely when there is some logical grouping of files on which we need to perform operations (e.g. shuffle) without applying the operation to each individual group.

The returned Dataset object is a dataset of datasets, where each nested dataset is a dataset of serialized records. When shuffle=False (default), the nested datasets are deterministically ordered. Each nested dataset can represent multiple files. The files are merged into one dataset if the files have the same format. For example:

file_patterns = ['file_1', 'file_2', 'dir_1/*']
file_formats = ['recordio', 'recordio', 'sstable']
tfxio = SomeTFXIO(file_patterns, file_formats)
datasets = tfxio.RawRecordTensorFlowDataset(options)
datasets would then contain two nested datasets, [ds1, ds2], where ds1 iterates over records from 'file_1' and 'file_2', and ds2 iterates over records from files matched by 'dir_1/*'.

Example usage:

tfxio = SomeTFXIO(file_patterns, file_formats)
ds = tfxio.RawRecordTensorFlowDataset(options=options)
ds = ds.flat_map(lambda x: x)
records = list(ds.as_numpy_iterator())
# iterating over `records` yields records from each file in
# `file_patterns`. See `tf.data.Dataset.list_files` for more information
# about the order of files when expanding globs.
Note that we need a flat_map, because RawRecordTensorFlowDataset returns a dataset of datasets.

When shuffle=True, the datasets are not deterministically ordered, but the contents of each nested dataset are deterministically ordered. For example, we may get [ds2, ds1, ds3], where the contents of ds1, ds2, and ds3 are each deterministically ordered.

PARAMETER DESCRIPTION
options

A TensorFlowDatasetOptions object. Not all options will apply.

TYPE: TensorFlowDatasetOptions

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def RawRecordTensorFlowDataset(
    self,
    options: dataset_options.TensorFlowDatasetOptions) -> tf.data.Dataset:
  """Returns a Dataset that contains nested Datasets of raw records.

  May not be implemented for some TFXIOs.

  This should be used when RawTfRecordTFXIO.TensorFlowDataset does not
  suffice, namely when there is some logical grouping of files on which we
  need to perform operations (e.g. shuffle) without applying the operation
  to each individual group.

  The returned Dataset object is a dataset of datasets, where each nested
  dataset is a dataset of serialized records. When shuffle=False (default),
  the nested datasets are deterministically ordered. Each nested dataset can
  represent multiple files. The files are merged into one dataset if the files
  have the same format. For example:

  ```
  file_patterns = ['file_1', 'file_2', 'dir_1/*']
  file_formats = ['recordio', 'recordio', 'sstable']
  tfxio = SomeTFXIO(file_patterns, file_formats)
  datasets = tfxio.RawRecordTensorFlowDataset(options)
  ```
  `datasets` would then contain two nested datasets, `[ds1, ds2]`, where ds1
  iterates over records from 'file_1' and 'file_2', and ds2 iterates over
  records from files matched by 'dir_1/*'.

  Example usage:
  ```
  tfxio = SomeTFXIO(file_patterns, file_formats)
  ds = tfxio.RawRecordTensorFlowDataset(options=options)
  ds = ds.flat_map(lambda x: x)
  records = list(ds.as_numpy_iterator())
  # iterating over `records` yields records from each file in
  # `file_patterns`. See `tf.data.Dataset.list_files` for more information
  # about the order of files when expanding globs.
  ```
  Note that we need a flat_map, because `RawRecordTensorFlowDataset` returns
  a dataset of datasets.

  When shuffle=True, the datasets are not deterministically ordered, but the
  contents of each nested dataset are deterministically ordered. For example,
  we may get [ds2, ds1, ds3], where the contents of ds1, ds2, and ds3 are
  each deterministically ordered.

  Args:
    options: A TensorFlowDatasetOptions object. Not all options will apply.
  """
  raise NotImplementedError
RawRecordToRecordBatch
RawRecordToRecordBatch(
    batch_size: Optional[int] = None,
) -> PTransform

Returns a PTransform that converts raw records to Arrow RecordBatches.

The input PCollection must be from self.RawRecordBeamSource() (also see the documentation for that method).

PARAMETER DESCRIPTION
batch_size

if not None, the pa.RecordBatch produced will be of the specified size. Otherwise it's automatically tuned by Beam.

TYPE: Optional[int] DEFAULT: None

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def RawRecordToRecordBatch(self,
                           batch_size: Optional[int] = None
                          ) -> beam.PTransform:
  """Returns a PTransform that converts raw records to Arrow RecordBatches.

  The input PCollection must be from self.RawRecordBeamSource() (also see
  the documentation for that method).

  Args:
    batch_size: if not None, the `pa.RecordBatch` produced will be of the
      specified size. Otherwise it's automatically tuned by Beam.
  """

  @beam.typehints.with_input_types(bytes)
  @beam.typehints.with_output_types(pa.RecordBatch)
  def _PTransformFn(pcoll: beam.pvalue.PCollection):
    return (pcoll
            | "RawRecordToRecordBatch" >>
            self._RawRecordToRecordBatchInternal(batch_size)
            | "CollectRecordBatchTelemetry" >>
            telemetry.ProfileRecordBatches(self._telemetry_descriptors,
                                           self._logical_format,
                                           self._physical_format))

  return beam.ptransform_fn(_PTransformFn)()
RecordBatches
RecordBatches(options: RecordBatchesOptions)

Returns an iterable of record batches.

This can be used outside of Apache Beam or TensorFlow to access data.

PARAMETER DESCRIPTION
options

An options object for iterating over record batches. Look at dataset_options.RecordBatchesOptions for more details.

TYPE: RecordBatchesOptions

Source code in tfx_bsl/tfxio/csv_tfxio.py
def RecordBatches(self, options: dataset_options.RecordBatchesOptions):
  raise NotImplementedError
SupportAttachingRawRecords
SupportAttachingRawRecords() -> bool
Source code in tfx_bsl/tfxio/csv_tfxio.py
def SupportAttachingRawRecords(self) -> bool:
  return True
TensorAdapter
TensorAdapter() -> TensorAdapter

Returns a TensorAdapter that converts pa.RecordBatch to TF inputs.

May raise an error if the TFMD schema was not provided at construction time.

Source code in tfx_bsl/tfxio/tfxio.py
def TensorAdapter(self) -> tensor_adapter.TensorAdapter:
  """Returns a TensorAdapter that converts pa.RecordBatch to TF inputs.

  May raise an error if the TFMD schema was not provided at construction time.
  """
  return tensor_adapter.TensorAdapter(self.TensorAdapterConfig())
TensorAdapterConfig
TensorAdapterConfig() -> TensorAdapterConfig

Returns the config to initialize a TensorAdapter.

RETURNS DESCRIPTION
TensorAdapterConfig

a TensorAdapterConfig that is the same as what is used to initialize the TensorAdapter returned by self.TensorAdapter().

Source code in tfx_bsl/tfxio/tfxio.py
def TensorAdapterConfig(self) -> tensor_adapter.TensorAdapterConfig:
  """Returns the config to initialize a `TensorAdapter`.

  Returns:
    a `TensorAdapterConfig` that is the same as what is used to initialize the
    `TensorAdapter` returned by `self.TensorAdapter()`.
  """
  return tensor_adapter.TensorAdapterConfig(
      self.ArrowSchema(), self.TensorRepresentations())
TensorFlowDataset
TensorFlowDataset(options: TensorFlowDatasetOptions)

Returns a tf.data.Dataset of TF inputs.

May raise an error if the TFMD schema was not provided at construction time.

PARAMETER DESCRIPTION
options

an options object for the tf.data.Dataset. Look at dataset_options.TensorFlowDatasetOptions for more details.

TYPE: TensorFlowDatasetOptions

Source code in tfx_bsl/tfxio/csv_tfxio.py
def TensorFlowDataset(self,
                      options: dataset_options.TensorFlowDatasetOptions):
  raise NotImplementedError
TensorRepresentations
TensorRepresentations() -> TensorRepresentations

Returns the TensorRepresentations.

These TensorRepresentations describe the tensors or composite tensors produced by the TensorAdapter created from self.TensorAdapter() or the tf.data.Dataset created from self.TensorFlowDataset().

May raise an error if the TFMD schema was not provided at construction time. May raise an error if the tensor representations are invalid.

Source code in tfx_bsl/tfxio/csv_tfxio.py
def TensorRepresentations(self) -> tensor_adapter.TensorRepresentations:
  return self._TensorRepresentations(not self._schema_projected)

RecordBatchToExamplesEncoder

RecordBatchToExamplesEncoder(
    schema: Optional[Schema] = None,
)

Encodes pa.RecordBatch as a list of serialized tf.Examples.

Requires a TFMD schema only if the RecordBatches contain nested lists with depth > 2 that represent TensorFlow's RaggedFeatures.
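A minimal sketch (the feature name and values are placeholders; note the Arrow large_list type, which TFXIO RecordBatches use):

```python
import pyarrow as pa
from tfx_bsl.public.tfxio import RecordBatchToExamplesEncoder

record_batch = pa.RecordBatch.from_arrays(
    [pa.array([[1], [2, 3]], type=pa.large_list(pa.int64()))],
    ["int_feature"])  # placeholder feature name

encoder = RecordBatchToExamplesEncoder()  # schema is optional here
serialized_examples = encoder.encode(record_batch)  # List[bytes]
```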

Source code in tfx_bsl/coders/example_coder.py
def __init__(self, schema: Optional[schema_pb2.Schema] = None):
  self._schema = schema
  self._coder = RecordBatchToExamplesEncoderCpp(
      None if schema is None else schema.SerializeToString()
  )
Functions
encode
encode(record_batch: RecordBatch) -> List[bytes]
Source code in tfx_bsl/coders/example_coder.py
def encode(self, record_batch: pa.RecordBatch) -> List[bytes]:  # pylint: disable=invalid-name
  return self._coder.Encode(record_batch)

RecordBatchesOptions

Bases: NamedTuple('RecordBatchesOptions', [('batch_size', int), ('drop_final_batch', bool), ('num_epochs', Optional[int]), ('shuffle', bool), ('shuffle_buffer_size', int), ('shuffle_seed', Optional[int])])

Options for TFXIO's RecordBatches.

Note: not all of these options may be effective. It depends on the particular TFXIO's implementation.
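For illustration, constructing the options tuple might look like this (the field values are placeholders):

```python
from tfx_bsl.public.tfxio import RecordBatchesOptions

options = RecordBatchesOptions(
    batch_size=1000,          # rows per RecordBatch
    drop_final_batch=False,   # keep the possibly-smaller last batch
    num_epochs=1,             # iterate over the data once
    shuffle=False,
    shuffle_buffer_size=0,
    shuffle_seed=None)
```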

TFExampleBeamRecord

TFExampleBeamRecord(
    physical_format: str,
    telemetry_descriptors: Optional[List[str]] = None,
    schema: Optional[Schema] = None,
    raw_record_column_name: Optional[str] = None,
)

Bases: _TFExampleRecordBase

TFXIO implementation for serialized tf.Examples in pcoll[bytes].

This is a special TFXIO that does not actually do I/O -- it relies on the caller to prepare a PCollection of bytes (serialized tf.Examples).
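A minimal sketch, assuming the caller already has serialized tf.Examples (the byte strings below are placeholders standing in for real serialized protos):

```python
import apache_beam as beam
from tfx_bsl.public import tfxio

beam_record_tfxio = tfxio.TFExampleBeamRecord(
    physical_format="inmem",  # where the bytes come from, for telemetry
    telemetry_descriptors=["MyComponent"])

with beam.Pipeline() as p:
  serialized = p | beam.Create([b"...", b"..."])  # serialized tf.Examples
  record_batches = serialized | beam_record_tfxio.BeamSource()
```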

Initializer.

PARAMETER DESCRIPTION
physical_format

The physical format that describes where the input pcoll[bytes] comes from. Used for telemetry purposes. Examples: "text", "tfrecord".

TYPE: str

telemetry_descriptors

A set of descriptors that identify the component that is instantiating this TFXIO. These will be used to construct the namespace to contain metrics for profiling and are therefore expected to be identifiers of the component itself and not individual instances of source use.

TYPE: Optional[List[str]] DEFAULT: None

schema

A TFMD Schema describing the dataset.

TYPE: Optional[Schema] DEFAULT: None

raw_record_column_name

If not None, the generated Arrow RecordBatches will contain a column of the given name that contains serialized records.

TYPE: Optional[str] DEFAULT: None

Source code in tfx_bsl/tfxio/tf_example_record.py
def __init__(self,
             physical_format: str,
             telemetry_descriptors: Optional[List[str]] = None,
             schema: Optional[schema_pb2.Schema] = None,
             raw_record_column_name: Optional[str] = None):
  """Initializer.

  Args:
    physical_format: The physical format that describes where the input
      pcoll[bytes] comes from. Used for telemetry purposes. Examples: "text",
      "tfrecord".
    telemetry_descriptors: A set of descriptors that identify the component
      that is instantiating this TFXIO. These will be used to construct the
      namespace to contain metrics for profiling and are therefore expected to
      be identifiers of the component itself and not individual instances of
      source use.
    schema: A TFMD Schema describing the dataset.
    raw_record_column_name: If not None, the generated Arrow RecordBatches
      will contain a column of the given name that contains serialized
      records.
  """
  super().__init__(
      schema=schema, raw_record_column_name=raw_record_column_name,
      telemetry_descriptors=telemetry_descriptors,
      physical_format=physical_format)
Attributes
raw_record_column_name property
raw_record_column_name: Optional[Text]
telemetry_descriptors property
telemetry_descriptors: Optional[List[Text]]
Functions
ArrowSchema
ArrowSchema() -> Schema

Returns the schema of the RecordBatch produced by self.BeamSource().

May raise an error if the TFMD schema was not provided at construction time.

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def ArrowSchema(self) -> pa.Schema:
  schema = self._ArrowSchemaNoRawRecordColumn()
  if self._raw_record_column_name is not None:
    if schema.get_field_index(self._raw_record_column_name) != -1:
      raise ValueError(
          "Raw record column name {} collided with a column in the schema."
          .format(self._raw_record_column_name))
    schema = schema.append(
        pa.field(self._raw_record_column_name,
                 pa.large_list(pa.large_binary())))
  return schema
BeamSource
BeamSource(batch_size: Optional[int] = None) -> PTransform

Returns a beam PTransform that produces PCollection[pa.RecordBatch].

May NOT raise an error if the TFMD schema was not provided at construction time.

If a TFMD schema was provided at construction time, all the pa.RecordBatches in the result PCollection must be of the same schema returned by self.ArrowSchema. If a TFMD schema was not provided, the pa.RecordBatches might not be of the same schema (they may contain different numbers of columns).

PARAMETER DESCRIPTION
batch_size

if not None, the pa.RecordBatch produced will be of the specified size. Otherwise it's automatically tuned by Beam.

TYPE: Optional[int] DEFAULT: None

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def BeamSource(self, batch_size: Optional[int] = None) -> beam.PTransform:

  @beam.typehints.with_input_types(Any)
  @beam.typehints.with_output_types(pa.RecordBatch)
  def _PTransformFn(pcoll_or_pipeline: Any):
    """Converts raw records to RecordBatches."""
    return (
        pcoll_or_pipeline
        | "RawRecordBeamSource" >> self.RawRecordBeamSource()
        | "RawRecordToRecordBatch" >> self.RawRecordToRecordBatch(batch_size))

  return beam.ptransform_fn(_PTransformFn)()
Project
Project(tensor_names: List[Text]) -> TFXIO

Projects the dataset represented by this TFXIO.

A Projected TFXIO:

- Only columns needed for given tensor_names are guaranteed to be produced by self.BeamSource()
- self.TensorAdapterConfig() and self.TensorFlowDataset() are trimmed to contain only those tensors.
- It retains a reference to the very original TFXIO, so its TensorAdapter knows about the specs of the tensors that would be produced by the original TensorAdapter. Also see TensorAdapter.OriginalTensorSpec().

May raise an error if the TFMD schema was not provided at construction time.

PARAMETER DESCRIPTION
tensor_names

a set of tensor names.

TYPE: List[Text]

RETURNS DESCRIPTION
TFXIO

A TFXIO instance that is the same as self except that:

  • Only columns needed for given tensor_names are guaranteed to be produced by self.BeamSource()
  • self.TensorAdapterConfig() and self.TensorFlowDataset() are trimmed to contain only those tensors.
Source code in tfx_bsl/tfxio/tfxio.py
def Project(self, tensor_names: List[Text]) -> "TFXIO":
  """Projects the dataset represented by this TFXIO.

  A Projected TFXIO:
  - Only columns needed for given tensor_names are guaranteed to be
    produced by `self.BeamSource()`
  - `self.TensorAdapterConfig()` and `self.TensorFlowDataset()` are trimmed
    to contain only those tensors.
  - It retains a reference to the very original TFXIO, so its TensorAdapter
    knows about the specs of the tensors that would be produced by the
    original TensorAdapter. Also see `TensorAdapter.OriginalTensorSpec()`.

  May raise an error if the TFMD schema was not provided at construction time.

  Args:
    tensor_names: a set of tensor names.

  Returns:
    A `TFXIO` instance that is the same as `self` except that:
    - Only columns needed for given tensor_names are guaranteed to be
      produced by `self.BeamSource()`
    - `self.TensorAdapterConfig()` and `self.TensorFlowDataset()` are trimmed
      to contain only those tensors.
  """
  if isinstance(self, _ProjectedTFXIO):
    # pylint: disable=protected-access
    return _ProjectedTFXIO(self.origin,
                           self.projected._ProjectImpl(tensor_names))
  return _ProjectedTFXIO(self, self._ProjectImpl(tensor_names))
RawRecordBeamSource
RawRecordBeamSource() -> PTransform

Returns a PTransform that produces a PCollection[bytes].

Used together with RawRecordToRecordBatch(), it allows getting both the PCollection of the raw records and the PCollection of the RecordBatch from the same source. For example:

record_batch = pipeline | tfxio.BeamSource()
raw_record = pipeline | tfxio.RawRecordBeamSource()

would result in the files being read twice, while the following would only read once:

raw_record = pipeline | tfxio.RawRecordBeamSource()
record_batch = raw_record | tfxio.RawRecordToRecordBatch()

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def RawRecordBeamSource(self) -> beam.PTransform:
  """Returns a PTransform that produces a PCollection[bytes].

  Used together with RawRecordToRecordBatch(), it allows getting both the
  PCollection of the raw records and the PCollection of the RecordBatch from
  the same source. For example:

  record_batch = pipeline | tfxio.BeamSource()
  raw_record = pipeline | tfxio.RawRecordBeamSource()

  would result in the files being read twice, while the following would only
  read once:

  raw_record = pipeline | tfxio.RawRecordBeamSource()
  record_batch = raw_record | tfxio.RawRecordToRecordBatch()
  """

  @beam.typehints.with_input_types(Any)
  @beam.typehints.with_output_types(bytes)
  def _PTransformFn(pcoll_or_pipeline: Any):
    return (pcoll_or_pipeline
            | "ReadRawRecords" >> self._RawRecordBeamSourceInternal()
            | "CollectRawRecordTelemetry" >> telemetry.ProfileRawRecords(
                self._telemetry_descriptors, self._logical_format,
                self._physical_format))

  return beam.ptransform_fn(_PTransformFn)()
RawRecordTensorFlowDataset
RawRecordTensorFlowDataset(
    options: TensorFlowDatasetOptions,
) -> Dataset

Returns a Dataset that contains nested Datasets of raw records.

May not be implemented for some TFXIOs.

This should be used when RawTfRecordTFXIO.TensorFlowDataset does not suffice, namely when there is some logical grouping of files on which we need to perform operations (e.g. shuffle) without applying the operation to each individual group.

The returned Dataset object is a dataset of datasets, where each nested dataset is a dataset of serialized records. When shuffle=False (default), the nested datasets are deterministically ordered. Each nested dataset can represent multiple files. The files are merged into one dataset if the files have the same format. For example:

file_patterns = ['file_1', 'file_2', 'dir_1/*']
file_formats = ['recordio', 'recordio', 'sstable']
tfxio = SomeTFXIO(file_patterns, file_formats)
datasets = tfxio.RawRecordTensorFlowDataset(options)
datasets would then contain two nested datasets, [ds1, ds2], where ds1 iterates over records from 'file_1' and 'file_2', and ds2 iterates over records from files matched by 'dir_1/*'.

Example usage:

tfxio = SomeTFXIO(file_patterns, file_formats)
ds = tfxio.RawRecordTensorFlowDataset(options=options)
ds = ds.flat_map(lambda x: x)
records = list(ds.as_numpy_iterator())
# iterating over `records` yields records from each file in
# `file_patterns`. See `tf.data.Dataset.list_files` for more information
# about the order of files when expanding globs.
Note that we need a flat_map, because RawRecordTensorFlowDataset returns a dataset of datasets.

When shuffle=True, the datasets are not deterministically ordered, but the contents of each nested dataset are deterministically ordered. For example, we may get [ds2, ds1, ds3], where the contents of ds1, ds2, and ds3 are each deterministically ordered.

PARAMETER DESCRIPTION
options

A TensorFlowDatasetOptions object. Not all options will apply.

TYPE: TensorFlowDatasetOptions

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def RawRecordTensorFlowDataset(
    self,
    options: dataset_options.TensorFlowDatasetOptions) -> tf.data.Dataset:
  """Returns a Dataset that contains nested Datasets of raw records.

  May not be implemented for some TFXIOs.

  This should be used when RawTfRecordTFXIO.TensorFlowDataset does not
  suffice, namely when there is some logical grouping of files on which we
  need to perform operations (e.g. shuffle) without applying the operation
  to each individual group.

  The returned Dataset object is a dataset of datasets, where each nested
  dataset is a dataset of serialized records. When shuffle=False (default),
  the nested datasets are deterministically ordered. Each nested dataset can
  represent multiple files. The files are merged into one dataset if the files
  have the same format. For example:

  ```
  file_patterns = ['file_1', 'file_2', 'dir_1/*']
  file_formats = ['recordio', 'recordio', 'sstable']
  tfxio = SomeTFXIO(file_patterns, file_formats)
  datasets = tfxio.RawRecordTensorFlowDataset(options)
  ```
  `datasets` would then contain two nested datasets, `[ds1, ds2]`, where ds1
  iterates over records from 'file_1' and 'file_2', and ds2 iterates over
  records from files matched by 'dir_1/*'.

  Example usage:
  ```
  tfxio = SomeTFXIO(file_patterns, file_formats)
  ds = tfxio.RawRecordTensorFlowDataset(options=options)
  ds = ds.flat_map(lambda x: x)
  records = list(ds.as_numpy_iterator())
  # iterating over `records` yields records from each file in
  # `file_patterns`. See `tf.data.Dataset.list_files` for more information
  # about the order of files when expanding globs.
  ```
  Note that we need a flat_map, because `RawRecordTensorFlowDataset` returns
  a dataset of datasets.

  When shuffle=True, the datasets are not deterministically ordered, but the
  contents of each nested dataset are deterministically ordered. For example,
  we may get [ds2, ds1, ds3], where the contents of ds1, ds2, and ds3 are
  each deterministically ordered.

  Args:
    options: A TensorFlowDatasetOptions object. Not all options will apply.
  """
  raise NotImplementedError
RawRecordToRecordBatch
RawRecordToRecordBatch(
    batch_size: Optional[int] = None,
) -> PTransform

Returns a PTransform that converts raw records to Arrow RecordBatches.

The input PCollection must be from self.RawRecordBeamSource() (also see the documentation for that method).

PARAMETER DESCRIPTION
batch_size

if not None, the pa.RecordBatch produced will be of the specified size. Otherwise it's automatically tuned by Beam.

TYPE: Optional[int] DEFAULT: None

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def RawRecordToRecordBatch(self,
                           batch_size: Optional[int] = None
                          ) -> beam.PTransform:
  """Returns a PTransform that converts raw records to Arrow RecordBatches.

  The input PCollection must be from self.RawRecordBeamSource() (also see
  the documentation for that method).

  Args:
    batch_size: if not None, the `pa.RecordBatch` produced will be of the
      specified size. Otherwise it's automatically tuned by Beam.
  """

  @beam.typehints.with_input_types(bytes)
  @beam.typehints.with_output_types(pa.RecordBatch)
  def _PTransformFn(pcoll: beam.pvalue.PCollection):
    return (pcoll
            | "RawRecordToRecordBatch" >>
            self._RawRecordToRecordBatchInternal(batch_size)
            | "CollectRecordBatchTelemetry" >>
            telemetry.ProfileRecordBatches(self._telemetry_descriptors,
                                           self._logical_format,
                                           self._physical_format))

  return beam.ptransform_fn(_PTransformFn)()
RecordBatches
RecordBatches(options: RecordBatchesOptions)

Returns an iterable of record batches.

This can be used outside of Apache Beam or TensorFlow to access data.

PARAMETER DESCRIPTION
options

An options object for iterating over record batches. Look at dataset_options.RecordBatchesOptions for more details.

TYPE: RecordBatchesOptions

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def RecordBatches(self, options: dataset_options.RecordBatchesOptions):
  raise NotImplementedError
SupportAttachingRawRecords
SupportAttachingRawRecords() -> bool
Source code in tfx_bsl/tfxio/tf_example_record.py
def SupportAttachingRawRecords(self) -> bool:
  return True
TensorAdapter
TensorAdapter() -> TensorAdapter

Returns a TensorAdapter that converts pa.RecordBatch to TF inputs.

May raise an error if the TFMD schema was not provided at construction time.

Source code in tfx_bsl/tfxio/tfxio.py
def TensorAdapter(self) -> tensor_adapter.TensorAdapter:
  """Returns a TensorAdapter that converts pa.RecordBatch to TF inputs.

  May raise an error if the TFMD schema was not provided at construction time.
  """
  return tensor_adapter.TensorAdapter(self.TensorAdapterConfig())
TensorAdapterConfig
TensorAdapterConfig() -> TensorAdapterConfig

Returns the config to initialize a TensorAdapter.

RETURNS DESCRIPTION
TensorAdapterConfig

a TensorAdapterConfig that is the same as what is used to initialize the TensorAdapter returned by self.TensorAdapter().

Source code in tfx_bsl/tfxio/tfxio.py
def TensorAdapterConfig(self) -> tensor_adapter.TensorAdapterConfig:
  """Returns the config to initialize a `TensorAdapter`.

  Returns:
    a `TensorAdapterConfig` that is the same as what is used to initialize the
    `TensorAdapter` returned by `self.TensorAdapter()`.
  """
  return tensor_adapter.TensorAdapterConfig(
      self.ArrowSchema(), self.TensorRepresentations())
TensorFlowDataset
TensorFlowDataset(options: TensorFlowDatasetOptions)

Returns a tf.data.Dataset of TF inputs.

May raise an error if the TFMD schema was not provided at construction time.

PARAMETER DESCRIPTION
options

an options object for the tf.data.Dataset. Look at dataset_options.TensorFlowDatasetOptions for more details.

TYPE: TensorFlowDatasetOptions

Source code in tfx_bsl/tfxio/tf_example_record.py
def TensorFlowDataset(self,
                      options: dataset_options.TensorFlowDatasetOptions):
  raise NotImplementedError(
      "TFExampleBeamRecord is unable to provide a TensorFlowDataset "
      "because it does not do I/O")
TensorRepresentations
TensorRepresentations() -> TensorRepresentations

Returns the TensorRepresentations.

These TensorRepresentations describe the tensors or composite tensors produced by the TensorAdapter created from self.TensorAdapter() or the tf.data.Dataset created from self.TensorFlowDataset().

May raise an error if the TFMD schema was not provided at construction time. May raise an error if the tensor representations are invalid.

Source code in tfx_bsl/tfxio/tf_example_record.py
def TensorRepresentations(self) -> tensor_adapter.TensorRepresentations:
  return tensor_representation_util.InferTensorRepresentationsFromMixedSchema(
      self._schema)

TFExampleRecord

TFExampleRecord(
    file_pattern: Union[List[str], str],
    validate: bool = True,
    schema: Optional[Schema] = None,
    raw_record_column_name: Optional[str] = None,
    telemetry_descriptors: Optional[List[str]] = None,
)

Bases: _TFExampleRecordBase

TFXIO implementation for tf.Example on TFRecord.
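Because this TFXIO performs real file I/O, it can also feed a tf.data pipeline. A hedged sketch (the file pattern is a placeholder, `my_schema` is a TFMD schema_pb2.Schema defined elsewhere, and TensorFlowDatasetOptions is assumed to be importable from this module):

```python
from tfx_bsl.public import tfxio

tfrecord_tfxio = tfxio.TFExampleRecord(
    file_pattern="/path/to/examples*.tfrecord",  # placeholder
    schema=my_schema,
    telemetry_descriptors=["MyComponent"])

ds = tfrecord_tfxio.TensorFlowDataset(
    tfxio.TensorFlowDatasetOptions(batch_size=1000))
```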

Initializes a TFExampleRecord TFXIO.

PARAMETER DESCRIPTION
file_pattern

A file glob pattern to read TFRecords from.

TYPE: Union[List[str], str]

validate

Not used; do not set (unused since 0.22.1).

TYPE: bool DEFAULT: True

schema

A TFMD Schema describing the dataset.

TYPE: Optional[Schema] DEFAULT: None

raw_record_column_name

If not None, the generated Arrow RecordBatches will contain a column of the given name that contains serialized records.

TYPE: Optional[str] DEFAULT: None

telemetry_descriptors

A set of descriptors that identify the component that is instantiating this TFXIO. These will be used to construct the namespace to contain metrics for profiling and are therefore expected to be identifiers of the component itself and not individual instances of source use.

TYPE: Optional[List[str]] DEFAULT: None

Source code in tfx_bsl/tfxio/tf_example_record.py
def __init__(self,
             file_pattern: Union[List[str], str],
             validate: bool = True,
             schema: Optional[schema_pb2.Schema] = None,
             raw_record_column_name: Optional[str] = None,
             telemetry_descriptors: Optional[List[str]] = None):
  """Initializes a TFExampleRecord TFXIO.

  Args:
    file_pattern: A file glob pattern to read TFRecords from.
    validate: Not used; do not set (unused since 0.22.1).
    schema: A TFMD Schema describing the dataset.
    raw_record_column_name: If not None, the generated Arrow RecordBatches
      will contain a column of the given name that contains serialized
      records.
    telemetry_descriptors: A set of descriptors that identify the component
      that is instantiating this TFXIO. These will be used to construct the
      namespace to contain metrics for profiling and are therefore expected to
      be identifiers of the component itself and not individual instances of
      source use.
  """
  super().__init__(
      schema=schema, raw_record_column_name=raw_record_column_name,
      telemetry_descriptors=telemetry_descriptors,
      physical_format="tfrecords_gzip")
  del validate
  if not isinstance(file_pattern, list):
    file_pattern = [file_pattern]
  assert file_pattern, "Must provide at least one file pattern."
  self._file_pattern = file_pattern
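A minimal usage sketch; the file pattern and telemetry descriptor below are placeholders:

```
import apache_beam as beam
from tfx_bsl.public import tfxio as tfxio_module

# Placeholder file pattern and telemetry descriptor.
example_tfxio = tfxio_module.TFExampleRecord(
    file_pattern="/path/to/examples-*",
    telemetry_descriptors=["MyComponent"])

with beam.Pipeline() as p:
  # Produces a PCollection[pa.RecordBatch]; batch size is tuned by Beam
  # unless specified.
  record_batches = p | "ReadToRecordBatch" >> example_tfxio.BeamSource()
```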
Attributes
raw_record_column_name property
raw_record_column_name: Optional[Text]
telemetry_descriptors property
telemetry_descriptors: Optional[List[Text]]
Functions
ArrowSchema
ArrowSchema() -> Schema

Returns the schema of the RecordBatch produced by self.BeamSource().

May raise an error if the TFMD schema was not provided at construction time.

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def ArrowSchema(self) -> pa.Schema:
  schema = self._ArrowSchemaNoRawRecordColumn()
  if self._raw_record_column_name is not None:
    if schema.get_field_index(self._raw_record_column_name) != -1:
      raise ValueError(
          "Raw record column name {} collided with a column in the schema."
          .format(self._raw_record_column_name))
    schema = schema.append(
        pa.field(self._raw_record_column_name,
                 pa.large_list(pa.large_binary())))
  return schema
BeamSource
BeamSource(batch_size: Optional[int] = None) -> PTransform

Returns a beam PTransform that produces PCollection[pa.RecordBatch].

May NOT raise an error if the TFMD schema was not provided at construction time.

If a TFMD schema was provided at construction time, all the pa.RecordBatches in the result PCollection must be of the same schema returned by self.ArrowSchema. If a TFMD schema was not provided, the pa.RecordBatches might not be of the same schema (they may contain different numbers of columns).

PARAMETER DESCRIPTION
batch_size

if not None, the pa.RecordBatch produced will be of the specified size. Otherwise it's automatically tuned by Beam.

TYPE: Optional[int] DEFAULT: None

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def BeamSource(self, batch_size: Optional[int] = None) -> beam.PTransform:

  @beam.typehints.with_input_types(Any)
  @beam.typehints.with_output_types(pa.RecordBatch)
  def _PTransformFn(pcoll_or_pipeline: Any):
    """Converts raw records to RecordBatches."""
    return (
        pcoll_or_pipeline
        | "RawRecordBeamSource" >> self.RawRecordBeamSource()
        | "RawRecordToRecordBatch" >> self.RawRecordToRecordBatch(batch_size))

  return beam.ptransform_fn(_PTransformFn)()
Project
Project(tensor_names: List[Text]) -> TFXIO

Projects the dataset represented by this TFXIO.

A Projected TFXIO:

- Only columns needed for given tensor_names are guaranteed to be produced by self.BeamSource()
- self.TensorAdapterConfig() and self.TensorFlowDataset() are trimmed to contain only those tensors.
- It retains a reference to the very original TFXIO, so its TensorAdapter knows about the specs of the tensors that would be produced by the original TensorAdapter. Also see TensorAdapter.OriginalTensorSpec().

May raise an error if the TFMD schema was not provided at construction time.

PARAMETER DESCRIPTION
tensor_names

a set of tensor names.

TYPE: List[Text]

RETURNS DESCRIPTION
TFXIO

A TFXIO instance that is the same as self except that:
  • Only columns needed for given tensor_names are guaranteed to be produced by self.BeamSource()
  • self.TensorAdapterConfig() and self.TensorFlowDataset() are trimmed to contain only those tensors.
Source code in tfx_bsl/tfxio/tfxio.py
def Project(self, tensor_names: List[Text]) -> "TFXIO":
  """Projects the dataset represented by this TFXIO.

  A Projected TFXIO:
  - Only columns needed for given tensor_names are guaranteed to be
    produced by `self.BeamSource()`
  - `self.TensorAdapterConfig()` and `self.TensorFlowDataset()` are trimmed
    to contain only those tensors.
  - It retains a reference to the very original TFXIO, so its TensorAdapter
    knows about the specs of the tensors that would be produced by the
    original TensorAdapter. Also see `TensorAdapter.OriginalTensorSpec()`.

  May raise an error if the TFMD schema was not provided at construction time.

  Args:
    tensor_names: a set of tensor names.

  Returns:
    A `TFXIO` instance that is the same as `self` except that:
    - Only columns needed for given tensor_names are guaranteed to be
      produced by `self.BeamSource()`
    - `self.TensorAdapterConfig()` and `self.TensorFlowDataset()` are trimmed
      to contain only those tensors.
  """
  if isinstance(self, _ProjectedTFXIO):
    # pylint: disable=protected-access
    return _ProjectedTFXIO(self.origin,
                           self.projected._ProjectImpl(tensor_names))
  return _ProjectedTFXIO(self, self._ProjectImpl(tensor_names))
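For example, a sketch of projecting onto a subset of tensors. The tensor names are hypothetical and must exist in this TFXIO's TensorRepresentations():

```
# `my_tfxio` is assumed to be a TFXIO constructed with a TFMD schema;
# "feature_a" and "feature_b" are hypothetical tensor names.
projected = my_tfxio.Project(["feature_a", "feature_b"])
# projected.BeamSource() now only guarantees the columns needed for those
# tensors, and projected.TensorAdapterConfig() is trimmed accordingly.
```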
RawRecordBeamSource
RawRecordBeamSource() -> PTransform

Returns a PTransform that produces a PCollection[bytes].

Used together with RawRecordToRecordBatch(), it allows getting both the PCollection of the raw records and the PCollection of the RecordBatch from the same source. For example:

record_batch = pipeline | tfxio.BeamSource()
raw_record = pipeline | tfxio.RawRecordBeamSource()

would result in the files being read twice, while the following would only read once:

raw_record = pipeline | tfxio.RawRecordBeamSource()
record_batch = raw_record | tfxio.RawRecordToRecordBatch()

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def RawRecordBeamSource(self) -> beam.PTransform:
  """Returns a PTransform that produces a PCollection[bytes].

  Used together with RawRecordToRecordBatch(), it allows getting both the
  PCollection of the raw records and the PCollection of the RecordBatch from
  the same source. For example:

  record_batch = pipeline | tfxio.BeamSource()
  raw_record = pipeline | tfxio.RawRecordBeamSource()

  would result in the files being read twice, while the following would only
  read once:

  raw_record = pipeline | tfxio.RawRecordBeamSource()
  record_batch = raw_record | tfxio.RawRecordToRecordBatch()
  """

  @beam.typehints.with_input_types(Any)
  @beam.typehints.with_output_types(bytes)
  def _PTransformFn(pcoll_or_pipeline: Any):
    return (pcoll_or_pipeline
            | "ReadRawRecords" >> self._RawRecordBeamSourceInternal()
            | "CollectRawRecordTelemetry" >> telemetry.ProfileRawRecords(
                self._telemetry_descriptors, self._logical_format,
                self._physical_format))

  return beam.ptransform_fn(_PTransformFn)()
RawRecordTensorFlowDataset
RawRecordTensorFlowDataset(
    options: TensorFlowDatasetOptions,
) -> Dataset

Returns a Dataset that contains nested Datasets of raw records.

May not be implemented for some TFXIOs.

This should be used when RawTfRecordTFXIO.TensorFlowDataset does not suffice; namely, when there is some logical grouping of files that we need to perform operations on without applying the operation within each individual group (e.g. shuffle).

The returned Dataset object is a dataset of datasets, where each nested dataset is a dataset of serialized records. When shuffle=False (default), the nested datasets are deterministically ordered. Each nested dataset can represent multiple files. The files are merged into one dataset if the files have the same format. For example:

file_patterns = ['file_1', 'file_2', 'dir_1/*']
file_formats = ['recordio', 'recordio', 'sstable']
tfxio = SomeTFXIO(file_patterns, file_formats)
datasets = tfxio.RawRecordTensorFlowDataset(options)
datasets would result in the following dataset: [ds1, ds2], where ds1 iterates over records from 'file_1' and 'file_2', and ds2 iterates over records from files matched by 'dir_1/*'.

Example usage:

tfxio = SomeTFXIO(file_patterns, file_formats)
ds = tfxio.RawRecordTensorFlowDataset(options=options)
ds = ds.flat_map(lambda x: x)
records = list(ds.as_numpy_iterator())
# iterating over `records` yields records from each file in
# `file_patterns`. See `tf.data.Dataset.list_files` for more information
# about the order of files when expanding globs.
Note that we need a flat_map, because RawRecordTensorFlowDataset returns a dataset of datasets.

When shuffle=True, the datasets are not deterministically ordered, but the contents of each nested dataset are deterministically ordered. For example, we may potentially have [ds2, ds1, ds3], where the contents of ds1, ds2, and ds3 are all deterministically ordered.

PARAMETER DESCRIPTION
options

A TensorFlowDatasetOptions object. Not all options will apply.

TYPE: TensorFlowDatasetOptions

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def RawRecordTensorFlowDataset(
    self,
    options: dataset_options.TensorFlowDatasetOptions) -> tf.data.Dataset:
  """Returns a Dataset that contains nested Datasets of raw records.

  May not be implemented for some TFXIOs.

  This should be used when RawTfRecordTFXIO.TensorFlowDataset does not
  suffice. Namely, if there is some logical grouping of files which we need
  to perform operations on, without applying the operation to each individual
  group (i.e. shuffle).

  The returned Dataset object is a dataset of datasets, where each nested
  dataset is a dataset of serialized records. When shuffle=False (default),
  the nested datasets are deterministically ordered. Each nested dataset can
  represent multiple files. The files are merged into one dataset if the files
  have the same format. For example:

  ```
  file_patterns = ['file_1', 'file_2', 'dir_1/*']
  file_formats = ['recordio', 'recordio', 'sstable']
  tfxio = SomeTFXIO(file_patterns, file_formats)
  datasets = tfxio.RawRecordTensorFlowDataset(options)
  ```
  `datasets` would result in the following dataset: `[ds1, ds2]`, where ds1
  iterates over records from 'file_1' and 'file_2', and ds2 iterates over
  records from files matched by 'dir_1/*'.

  Example usage:
  ```
  tfxio = SomeTFXIO(file_patterns, file_formats)
  ds = tfxio.RawRecordTensorFlowDataset(options=options)
  ds = ds.flat_map(lambda x: x)
  records = list(ds.as_numpy_iterator())
  # iterating over `records` yields records from each file in
  # `file_patterns`. See `tf.data.Dataset.list_files` for more information
  # about the order of files when expanding globs.
  ```
  Note that we need a flat_map, because `RawRecordTensorFlowDataset` returns
  a dataset of datasets.

  When shuffle=True, the datasets are not deterministically ordered,
  but the contents of each nested dataset are deterministically ordered.
  For example, we may potentially have [ds2, ds1, ds3], where the
  contents of ds1, ds2, and ds3 are all deterministically ordered.

  Args:
    options: A TensorFlowDatasetOptions object. Not all options will apply.
  """
  raise NotImplementedError
RawRecordToRecordBatch
RawRecordToRecordBatch(
    batch_size: Optional[int] = None,
) -> PTransform

Returns a PTransform that converts raw records to Arrow RecordBatches.

The input PCollection must be from self.RawRecordBeamSource() (also see the documentation for that method).

PARAMETER DESCRIPTION
batch_size

if not None, the pa.RecordBatch produced will be of the specified size. Otherwise it's automatically tuned by Beam.

TYPE: Optional[int] DEFAULT: None

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def RawRecordToRecordBatch(self,
                           batch_size: Optional[int] = None
                          ) -> beam.PTransform:
  """Returns a PTransform that converts raw records to Arrow RecordBatches.

  The input PCollection must be from self.RawRecordBeamSource() (also see
  the documentation for that method).

  Args:
    batch_size: if not None, the `pa.RecordBatch` produced will be of the
      specified size. Otherwise it's automatically tuned by Beam.
  """

  @beam.typehints.with_input_types(bytes)
  @beam.typehints.with_output_types(pa.RecordBatch)
  def _PTransformFn(pcoll: beam.pvalue.PCollection):
    return (pcoll
            | "RawRecordToRecordBatch" >>
            self._RawRecordToRecordBatchInternal(batch_size)
            | "CollectRecordBatchTelemetry" >>
            telemetry.ProfileRecordBatches(self._telemetry_descriptors,
                                           self._logical_format,
                                           self._physical_format))

  return beam.ptransform_fn(_PTransformFn)()
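Putting the two transforms together, a sketch of the read-once pattern described under RawRecordBeamSource, reusing the hypothetical example_tfxio from above:

```
import apache_beam as beam

with beam.Pipeline() as p:
  # Files are read once; both PCollections derive from the same source.
  raw_records = p | "ReadRaw" >> example_tfxio.RawRecordBeamSource()
  record_batches = (
      raw_records
      | "ToRecordBatch" >> example_tfxio.RawRecordToRecordBatch(batch_size=1000))
```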
RecordBatches
RecordBatches(
    options: RecordBatchesOptions,
) -> Iterator[RecordBatch]

Returns an iterable of record batches.

This can be used outside of Apache Beam or TensorFlow to access data.

PARAMETER DESCRIPTION
options

An options object for iterating over record batches. Look at dataset_options.RecordBatchesOptions for more details.

TYPE: RecordBatchesOptions

Source code in tfx_bsl/tfxio/tf_example_record.py
def RecordBatches(
    self, options: dataset_options.RecordBatchesOptions
) -> Iterator[pa.RecordBatch]:
  dataset = dataset_util.make_tf_record_dataset(
      self._file_pattern, options.batch_size, options.drop_final_batch,
      options.num_epochs, options.shuffle, options.shuffle_buffer_size,
      options.shuffle_seed)

  decoder = example_coder.ExamplesToRecordBatchDecoder(
      self._schema.SerializeToString() if self._schema is not None else None
  )
  for examples in dataset.as_numpy_iterator():
    decoded = decoder.DecodeBatch(examples)
    if self._raw_record_column_name is None:
      yield decoded
    else:
      yield record_based_tfxio.AppendRawRecordColumn(
          decoded, self._raw_record_column_name, examples.tolist())
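A minimal sketch of iterating record batches outside of Beam or TensorFlow, assuming RecordBatchesOptions is importable from tfx_bsl.public.tfxio; the fields set below are exactly those the implementation above reads:

```
from tfx_bsl.public.tfxio import RecordBatchesOptions

options = RecordBatchesOptions(
    batch_size=1000,
    drop_final_batch=False,
    num_epochs=1,
    shuffle=False,
    shuffle_buffer_size=10000,
    shuffle_seed=None)
for record_batch in example_tfxio.RecordBatches(options):
  # Each element is a pa.RecordBatch decoded from serialized tf.Examples.
  print(record_batch.num_rows)
```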
SupportAttachingRawRecords
SupportAttachingRawRecords() -> bool
Source code in tfx_bsl/tfxio/tf_example_record.py
def SupportAttachingRawRecords(self) -> bool:
  return True
TensorAdapter
TensorAdapter() -> TensorAdapter

Returns a TensorAdapter that converts pa.RecordBatch to TF inputs.

May raise an error if the TFMD schema was not provided at construction time.

Source code in tfx_bsl/tfxio/tfxio.py
def TensorAdapter(self) -> tensor_adapter.TensorAdapter:
  """Returns a TensorAdapter that converts pa.RecordBatch to TF inputs.

  May raise an error if the TFMD schema was not provided at construction time.
  """
  return tensor_adapter.TensorAdapter(self.TensorAdapterConfig())
TensorAdapterConfig
TensorAdapterConfig() -> TensorAdapterConfig

Returns the config to initialize a TensorAdapter.

RETURNS DESCRIPTION
TensorAdapterConfig

a TensorAdapterConfig that is the same as what is used to initialize the TensorAdapter returned by self.TensorAdapter().

Source code in tfx_bsl/tfxio/tfxio.py
def TensorAdapterConfig(self) -> tensor_adapter.TensorAdapterConfig:
  """Returns the config to initialize a `TensorAdapter`.

  Returns:
    a `TensorAdapterConfig` that is the same as what is used to initialize the
    `TensorAdapter` returned by `self.TensorAdapter()`.
  """
  return tensor_adapter.TensorAdapterConfig(
      self.ArrowSchema(), self.TensorRepresentations())
TensorFlowDataset
TensorFlowDataset(
    options: TensorFlowDatasetOptions,
) -> Dataset

Creates a TFRecordDataset that yields Tensors.

The serialized tf.Examples are parsed by tf.io.parse_example to create Tensors.

See base class (tfxio.TFXIO) for more details.

PARAMETER DESCRIPTION
options

an options object for the tf.data.Dataset. See dataset_options.TensorFlowDatasetOptions for more details.

TYPE: TensorFlowDatasetOptions

RETURNS DESCRIPTION
Dataset

A dataset of dict elements (or a tuple of dict elements and label). Each dict maps feature keys to Tensor, SparseTensor, or RaggedTensor objects.

RAISES DESCRIPTION
ValueError

if there is something wrong with the tensor_representation.

Source code in tfx_bsl/tfxio/tf_example_record.py
def TensorFlowDataset(
    self,
    options: dataset_options.TensorFlowDatasetOptions) -> tf.data.Dataset:
  """Creates a TFRecordDataset that yields Tensors.

  The serialized tf.Examples are parsed by `tf.io.parse_example` to create
  Tensors.

  See base class (tfxio.TFXIO) for more details.

  Args:
    options: an options object for the tf.data.Dataset. See
      `dataset_options.TensorFlowDatasetOptions` for more details.

  Returns:
    A dataset of `dict` elements, (or a tuple of `dict` elements and label).
    Each `dict` maps feature keys to `Tensor`, `SparseTensor`, or
    `RaggedTensor` objects.

  Raises:
    ValueError: if there is something wrong with the tensor_representation.
  """
  (tf_example_parser_config,
   feature_name_to_tensor_name) = self._GetTfExampleParserConfig()

  file_pattern = tf.convert_to_tensor(self._file_pattern)
  dataset = dataset_util.make_tf_record_dataset(
      file_pattern,
      batch_size=options.batch_size,
      num_epochs=options.num_epochs,
      shuffle=options.shuffle,
      shuffle_buffer_size=options.shuffle_buffer_size,
      shuffle_seed=options.shuffle_seed,
      reader_num_threads=options.reader_num_threads,
      drop_final_batch=options.drop_final_batch)

  # Parse `Example` tensors to a dictionary of `Feature` tensors.
  dataset = dataset.apply(
      tf.data.experimental.parse_example_dataset(tf_example_parser_config))

  dataset = dataset.map(
      lambda x: self._RenameFeatures(x, feature_name_to_tensor_name))

  label_key = options.label_key
  if label_key is not None:
    dataset = self._PopLabelFeatureFromDataset(dataset, label_key)

  return dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
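A minimal usage sketch, assuming TensorFlowDatasetOptions is importable from tfx_bsl.public.tfxio; "label" is a hypothetical feature name in the schema:

```
from tfx_bsl.public.tfxio import TensorFlowDatasetOptions

options = TensorFlowDatasetOptions(batch_size=32, label_key="label")
ds = example_tfxio.TensorFlowDataset(options)
for features, labels in ds.take(1):
  # `features` maps feature keys to Tensor/SparseTensor/RaggedTensor.
  pass
```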
TensorRepresentations
TensorRepresentations() -> TensorRepresentations

Returns the TensorRepresentations.

These TensorRepresentations describe the tensors or composite tensors produced by the TensorAdapter created from self.TensorAdapter() or the tf.data.Dataset created from self.TensorFlowDataset().

May raise an error if the TFMD schema was not provided at construction time. May raise an error if the tensor representations are invalid.

Source code in tfx_bsl/tfxio/tf_example_record.py
def TensorRepresentations(self) -> tensor_adapter.TensorRepresentations:
  return tensor_representation_util.InferTensorRepresentationsFromMixedSchema(
      self._schema)

TFGraphRecordDecoder

Base class for decoders that turn a list of bytes into (composite) tensors.

Sub-classes must implement decode_record() (see its docstring for requirements).

Decoder instances can be saved as a SavedModel by save_decoder(). The SavedModel can be loaded back by load_decoder(). However, the loaded decoder will always be of the type LoadedDecoder and only have the public interfaces listed in this base class available.
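A minimal sub-class sketch, assuming TFGraphRecordDecoder is importable from tfx_bsl.public.tfxio as documented here; the decoding logic is illustrative only:

```
import tensorflow as tf
from tfx_bsl.public.tfxio import TFGraphRecordDecoder

class CsvLikeDecoder(TFGraphRecordDecoder):
  """Splits each record on commas into a ragged tensor of tokens."""

  def decode_record(self, records: tf.Tensor):
    # TF ops only: this function is traced into a tf.function. The result
    # is batch-aligned with `records` (outer dimension == batch size).
    return {"tokens": tf.strings.split(records, sep=",")}
```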

Attributes
record_index_tensor_name property
record_index_tensor_name: Optional[str]

The name of the tensor indicating which record a slice is from.

The decoded tensors are batch-aligned among themselves, but they don't necessarily have to be batch-aligned with the input records. If they are not, sub-classes should implement this property to tie the batch dimension to the input records.

The record index tensor must be a SparseTensor or a RaggedTensor of integral type, and must be 2-D and must not contain "missing" values.

A record index tensor like the following: [[0], [0], [2]] means that of 3 "rows" in the output "batch", the first two rows came from the first record, and the 3rd row came from the third record.

The name must not be an empty string.

RETURNS DESCRIPTION
Optional[str]

The name of the record index tensor.

Functions
decode_record abstractmethod
decode_record(records: Tensor) -> Dict[str, TensorAlike]

Sub-classes should implement this.

Implementations must use TF ops to derive the result (composite) tensors, as this function will be traced and become a tf.function (thus a TF Graph). Note that autograph is not enabled in such tracing, which means any python control flow / loops will not be converted to TF cond / loops automatically.

The returned tensors must be batch-aligned (i.e. they should be at least of rank 1, and their outer-most dimensions must be of the same size). They do not have to be batch-aligned with the input tensor, but if that's the case, an additional tensor must be provided among the results, to indicate which input record a "row" in the output batch comes from. See record_index_tensor_name for more details.

PARAMETER DESCRIPTION
records

a 1-D string tensor that contains the records to be decoded.

TYPE: Tensor

RETURNS DESCRIPTION
Dict[str, TensorAlike]

A dict of (composite) tensors.

Source code in tfx_bsl/coders/tf_graph_record_decoder.py
@abc.abstractmethod
def decode_record(self, records: tf.Tensor) -> Dict[str, TensorAlike]:
  """Sub-classes should implement this.

  Implementations must use TF ops to derive the result (composite) tensors, as
  this function will be traced and become a tf.function (thus a TF Graph).
  Note that autograph is not enabled in such tracing, which means any python
  control flow / loops will not be converted to TF cond / loops automatically.

  The returned tensors must be batch-aligned (i.e. they should be at least
  of rank 1, and their outer-most dimensions must be of the same size). They
  do not have to be batch-aligned with the input tensor, but if that's the
  case, an additional tensor must be provided among the results, to indicate
  which input record a "row" in the output batch comes from. See
  `record_index_tensor_name` for more details.

  Args:
    records: a 1-D string tensor that contains the records to be decoded.

  Returns:
    A dict of (composite) tensors.
  """
output_type_specs
output_type_specs() -> Dict[str, TypeSpec]

Returns the tf.TypeSpecs of the decoded tensors.

RETURNS DESCRIPTION
Dict[str, TypeSpec]

A dict whose keys are the same as keys of the dict returned by decode_record() and values are the tf.TypeSpec of the corresponding (composite) tensor.

Source code in tfx_bsl/coders/tf_graph_record_decoder.py
def output_type_specs(self) -> Dict[str, tf.TypeSpec]:
  """Returns the tf.TypeSpecs of the decoded tensors.

  Returns:
    A dict whose keys are the same as keys of the dict returned by
    `decode_record()` and values are the tf.TypeSpec of the corresponding
    (composite) tensor.
  """
  return {
      k: tf.type_spec_from_value(v) for k, v in
      self._make_concrete_decode_function().structured_outputs.items()
  }
save
save(path: str) -> None

Saves this TFGraphRecordDecoder to a SavedModel at path.

This functions the same as tf_graph_record_decoder.save_decoder(). This is provided purely for convenience, and should not impact the actual saved model, since only the tf.function from _make_concrete_decode_function is saved.

PARAMETER DESCRIPTION
path

The path to where the saved_model is saved.

TYPE: str

Source code in tfx_bsl/coders/tf_graph_record_decoder.py
def save(self, path: str) -> None:
  """Saves this TFGraphRecordDecoder to a SavedModel at `path`.

  This functions the same as `tf_graph_record_decoder.save_decoder()`. This is
  provided purely for convenience, and should not impact the actual saved
  model, since only the `tf.function` from `_make_concrete_decode_function` is
  saved.

  Args:
    path: The path to where the saved_model is saved.
  """
  save_decoder(self, path)
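A save/load round-trip sketch; the path is a placeholder, and load_decoder is the loader mentioned in the class description:

```
from tfx_bsl.coders import tf_graph_record_decoder

decoder = CsvLikeDecoder()
decoder.save("/tmp/my_decoder")  # placeholder path

# The loaded object is a LoadedDecoder exposing only the public interfaces
# of TFGraphRecordDecoder.
loaded = tf_graph_record_decoder.load_decoder("/tmp/my_decoder")
type_specs = loaded.output_type_specs()
```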

TFSequenceExampleBeamRecord

TFSequenceExampleBeamRecord(
    physical_format: Text,
    telemetry_descriptors: List[Text],
    schema: Optional[Schema] = None,
    raw_record_column_name: Optional[Text] = None,
)

Bases: _TFSequenceExampleRecordBase

TFXIO implementation for serialized tf.SequenceExamples in pcoll[bytes].

This is a special TFXIO that does not actually do I/O -- it relies on the caller to prepare a PCollection of bytes (serialized tf.SequenceExamples).

Initializer.

PARAMETER DESCRIPTION
physical_format

The physical format that describes where the input pcoll[bytes] comes from. Used for telemetry purposes. Examples: "text", "tfrecord".

TYPE: Text

telemetry_descriptors

A set of descriptors that identify the component that is instantiating this TFXIO. These will be used to construct the namespace to contain metrics for profiling and are therefore expected to be identifiers of the component itself and not individual instances of source use.

TYPE: List[Text]

schema

A TFMD Schema describing the dataset.

TYPE: Optional[Schema] DEFAULT: None

raw_record_column_name

If not None, the generated Arrow RecordBatches will contain a column of the given name that contains serialized records.

TYPE: Optional[Text] DEFAULT: None

Source code in tfx_bsl/tfxio/tf_sequence_example_record.py
def __init__(self,
             physical_format: Text,
             telemetry_descriptors: List[Text],
             schema: Optional[schema_pb2.Schema] = None,
             raw_record_column_name: Optional[Text] = None):
  """Initializer.

  Args:
    physical_format: The physical format that describes where the input
      pcoll[bytes] comes from. Used for telemetry purposes. Examples: "text",
      "tfrecord".
    telemetry_descriptors: A set of descriptors that identify the component
      that is instantiating this TFXIO. These will be used to construct the
      namespace to contain metrics for profiling and are therefore expected to
      be identifiers of the component itself and not individual instances of
      source use.
    schema: A TFMD Schema describing the dataset.
    raw_record_column_name: If not None, the generated Arrow RecordBatches
      will contain a column of the given name that contains serialized
      records.
  """
  super().__init__(
      schema=schema, raw_record_column_name=raw_record_column_name,
      telemetry_descriptors=telemetry_descriptors,
      physical_format=physical_format)
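A minimal usage sketch; an empty tf.SequenceExample stands in for real data, and the physical format string is a placeholder:

```
import apache_beam as beam
import tensorflow as tf
from tfx_bsl.public import tfxio as tfxio_module

serialized = tf.train.SequenceExample().SerializeToString()

seq_tfxio = tfxio_module.TFSequenceExampleBeamRecord(
    physical_format="inmem",  # placeholder; used only for telemetry
    telemetry_descriptors=["MyComponent"])

with beam.Pipeline() as p:
  record_batches = (
      p
      | "CreateRecords" >> beam.Create([serialized])
      | "Decode" >> seq_tfxio.BeamSource(batch_size=10))
```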
Attributes
raw_record_column_name property
raw_record_column_name: Optional[Text]
schema property
schema: Schema
telemetry_descriptors property
telemetry_descriptors: Optional[List[Text]]
Functions
ArrowSchema
ArrowSchema() -> Schema

Returns the schema of the RecordBatch produced by self.BeamSource().

May raise an error if the TFMD schema was not provided at construction time.

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def ArrowSchema(self) -> pa.Schema:
  schema = self._ArrowSchemaNoRawRecordColumn()
  if self._raw_record_column_name is not None:
    if schema.get_field_index(self._raw_record_column_name) != -1:
      raise ValueError(
          "Raw record column name {} collided with a column in the schema."
          .format(self._raw_record_column_name))
    schema = schema.append(
        pa.field(self._raw_record_column_name,
                 pa.large_list(pa.large_binary())))
  return schema
BeamSource
BeamSource(batch_size: Optional[int] = None) -> PTransform

Returns a beam PTransform that produces PCollection[pa.RecordBatch].

May NOT raise an error if the TFMD schema was not provided at construction time.

If a TFMD schema was provided at construction time, all the pa.RecordBatches in the result PCollection must be of the same schema returned by self.ArrowSchema. If a TFMD schema was not provided, the pa.RecordBatches might not be of the same schema (they may contain different numbers of columns).

PARAMETER DESCRIPTION
batch_size

if not None, the pa.RecordBatch produced will be of the specified size. Otherwise it's automatically tuned by Beam.

TYPE: Optional[int] DEFAULT: None

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def BeamSource(self, batch_size: Optional[int] = None) -> beam.PTransform:

  @beam.typehints.with_input_types(Any)
  @beam.typehints.with_output_types(pa.RecordBatch)
  def _PTransformFn(pcoll_or_pipeline: Any):
    """Converts raw records to RecordBatches."""
    return (
        pcoll_or_pipeline
        | "RawRecordBeamSource" >> self.RawRecordBeamSource()
        | "RawRecordToRecordBatch" >> self.RawRecordToRecordBatch(batch_size))

  return beam.ptransform_fn(_PTransformFn)()
Project
Project(tensor_names: List[Text]) -> TFXIO

Projects the dataset represented by this TFXIO.

A Projected TFXIO:

- Only columns needed for given tensor_names are guaranteed to be produced by self.BeamSource()
- self.TensorAdapterConfig() and self.TensorFlowDataset() are trimmed to contain only those tensors.
- It retains a reference to the very original TFXIO, so its TensorAdapter knows about the specs of the tensors that would be produced by the original TensorAdapter. Also see TensorAdapter.OriginalTensorSpec().

May raise an error if the TFMD schema was not provided at construction time.

PARAMETER DESCRIPTION
tensor_names

a set of tensor names.

TYPE: List[Text]

RETURNS DESCRIPTION
TFXIO

A TFXIO instance that is the same as self except that:
  • Only columns needed for given tensor_names are guaranteed to be produced by self.BeamSource()
  • self.TensorAdapterConfig() and self.TensorFlowDataset() are trimmed to contain only those tensors.
Source code in tfx_bsl/tfxio/tfxio.py
def Project(self, tensor_names: List[Text]) -> "TFXIO":
  """Projects the dataset represented by this TFXIO.

  A Projected TFXIO:
  - Only columns needed for given tensor_names are guaranteed to be
    produced by `self.BeamSource()`
  - `self.TensorAdapterConfig()` and `self.TensorFlowDataset()` are trimmed
    to contain only those tensors.
  - It retains a reference to the very original TFXIO, so its TensorAdapter
    knows about the specs of the tensors that would be produced by the
    original TensorAdapter. Also see `TensorAdapter.OriginalTensorSpec()`.

  May raise an error if the TFMD schema was not provided at construction time.

  Args:
    tensor_names: a set of tensor names.

  Returns:
    A `TFXIO` instance that is the same as `self` except that:
    - Only columns needed for given tensor_names are guaranteed to be
      produced by `self.BeamSource()`
    - `self.TensorAdapterConfig()` and `self.TensorFlowDataset()` are trimmed
      to contain only those tensors.
  """
  if isinstance(self, _ProjectedTFXIO):
    # pylint: disable=protected-access
    return _ProjectedTFXIO(self.origin,
                           self.projected._ProjectImpl(tensor_names))
  return _ProjectedTFXIO(self, self._ProjectImpl(tensor_names))
RawRecordBeamSource
RawRecordBeamSource() -> PTransform

Returns a PTransform that produces a PCollection[bytes].

Used together with RawRecordToRecordBatch(), it allows getting both the PCollection of the raw records and the PCollection of the RecordBatch from the same source. For example:

record_batch = pipeline | tfxio.BeamSource()
raw_record = pipeline | tfxio.RawRecordBeamSource()

would result in the files being read twice, while the following would only read once:

raw_record = pipeline | tfxio.RawRecordBeamSource()
record_batch = raw_record | tfxio.RawRecordToRecordBatch()

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def RawRecordBeamSource(self) -> beam.PTransform:
  """Returns a PTransform that produces a PCollection[bytes].

  Used together with RawRecordToRecordBatch(), it allows getting both the
  PCollection of the raw records and the PCollection of the RecordBatch from
  the same source. For example:

  record_batch = pipeline | tfxio.BeamSource()
  raw_record = pipeline | tfxio.RawRecordBeamSource()

  would result in the files being read twice, while the following would only
  read once:

  raw_record = pipeline | tfxio.RawRecordBeamSource()
  record_batch = raw_record | tfxio.RawRecordToRecordBatch()
  """

  @beam.typehints.with_input_types(Any)
  @beam.typehints.with_output_types(bytes)
  def _PTransformFn(pcoll_or_pipeline: Any):
    return (pcoll_or_pipeline
            | "ReadRawRecords" >> self._RawRecordBeamSourceInternal()
            | "CollectRawRecordTelemetry" >> telemetry.ProfileRawRecords(
                self._telemetry_descriptors, self._logical_format,
                self._physical_format))

  return beam.ptransform_fn(_PTransformFn)()
RawRecordTensorFlowDataset
RawRecordTensorFlowDataset(
    options: TensorFlowDatasetOptions,
) -> Dataset

Returns a Dataset that contains nested Datasets of raw records.

May not be implemented for some TFXIOs.

This should be used when RawTfRecordTFXIO.TensorFlowDataset does not suffice; namely, when there is some logical grouping of files that we need to perform operations on without applying the operation within each individual group (e.g. shuffle).

The returned Dataset object is a dataset of datasets, where each nested dataset is a dataset of serialized records. When shuffle=False (default), the nested datasets are deterministically ordered. Each nested dataset can represent multiple files. The files are merged into one dataset if the files have the same format. For example:

file_patterns = ['file_1', 'file_2', 'dir_1/*']
file_formats = ['recordio', 'recordio', 'sstable']
tfxio = SomeTFXIO(file_patterns, file_formats)
datasets = tfxio.RawRecordTensorFlowDataset(options)
datasets would result in the following dataset: [ds1, ds2], where ds1 iterates over records from 'file_1' and 'file_2', and ds2 iterates over records from files matched by 'dir_1/*'.

Example usage:

tfxio = SomeTFXIO(file_patterns, file_formats)
ds = tfxio.RawRecordTensorFlowDataset(options=options)
ds = ds.flat_map(lambda x: x)
records = list(ds.as_numpy_iterator())
# iterating over `records` yields records from each file in
# `file_patterns`. See `tf.data.Dataset.list_files` for more information
# about the order of files when expanding globs.
Note that we need a flat_map, because RawRecordTensorFlowDataset returns a dataset of datasets.

When shuffle=True, the datasets are not deterministically ordered, but the contents of each nested dataset are deterministically ordered. For example, we may potentially have [ds2, ds1, ds3], where the contents of ds1, ds2, and ds3 are all deterministically ordered.

PARAMETER DESCRIPTION
options

A TensorFlowDatasetOptions object. Not all options will apply.

TYPE: TensorFlowDatasetOptions

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def RawRecordTensorFlowDataset(
    self,
    options: dataset_options.TensorFlowDatasetOptions) -> tf.data.Dataset:
  """Returns a Dataset that contains nested Datasets of raw records.

  May not be implemented for some TFXIOs.

  This should be used when RawTfRecordTFXIO.TensorFlowDataset does not
  suffice. Namely, if there is some logical grouping of files which we need
  to perform operations on, without applying the operation to each individual
  group (i.e. shuffle).

  The returned Dataset object is a dataset of datasets, where each nested
  dataset is a dataset of serialized records. When shuffle=False (default),
  the nested datasets are deterministically ordered. Each nested dataset can
  represent multiple files. The files are merged into one dataset if the files
  have the same format. For example:

  ```
  file_patterns = ['file_1', 'file_2', 'dir_1/*']
  file_formats = ['recordio', 'recordio', 'sstable']
  tfxio = SomeTFXIO(file_patterns, file_formats)
  datasets = tfxio.RawRecordTensorFlowDataset(options)
  ```
  `datasets` would result in the following dataset: `[ds1, ds2]`, where ds1
  iterates over records from 'file_1' and 'file_2', and ds2 iterates over
  records from files matched by 'dir_1/*'.

  Example usage:
  ```
  tfxio = SomeTFXIO(file_patterns, file_formats)
  ds = tfxio.RawRecordTensorFlowDataset(options=options)
  ds = ds.flat_map(lambda x: x)
  records = list(ds.as_numpy_iterator())
  # iterating over `records` yields records from each file in
  # `file_patterns`. See `tf.data.Dataset.list_files` for more information
  # about the order of files when expanding globs.
  ```
  Note that we need a flat_map, because `RawRecordTensorFlowDataset` returns
  a dataset of datasets.

  When shuffle=True, the datasets are not deterministically ordered,
  but the contents of each nested dataset are deterministically ordered.
  For example, we may potentially have [ds2, ds1, ds3], where the
  contents of ds1, ds2, and ds3 are all deterministically ordered.

  Args:
    options: A TensorFlowDatasetOptions object. Not all options will apply.
  """
  raise NotImplementedError
RawRecordToRecordBatch
RawRecordToRecordBatch(
    batch_size: Optional[int] = None,
) -> PTransform

Returns a PTransform that converts raw records to Arrow RecordBatches.

The input PCollection must be from self.RawRecordBeamSource() (also see the documentation for that method).

PARAMETER DESCRIPTION
batch_size

if not None, the pa.RecordBatch produced will be of the specified size. Otherwise it's automatically tuned by Beam.

TYPE: Optional[int] DEFAULT: None

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def RawRecordToRecordBatch(self,
                           batch_size: Optional[int] = None
                          ) -> beam.PTransform:
  """Returns a PTransform that converts raw records to Arrow RecordBatches.

  The input PCollection must be from self.RawRecordBeamSource() (also see
  the documentation for that method).

  Args:
    batch_size: if not None, the `pa.RecordBatch` produced will be of the
      specified size. Otherwise it's automatically tuned by Beam.
  """

  @beam.typehints.with_input_types(bytes)
  @beam.typehints.with_output_types(pa.RecordBatch)
  def _PTransformFn(pcoll: beam.pvalue.PCollection):
    return (pcoll
            | "RawRecordToRecordBatch" >>
            self._RawRecordToRecordBatchInternal(batch_size)
            | "CollectRecordBatchTelemetry" >>
            telemetry.ProfileRecordBatches(self._telemetry_descriptors,
                                           self._logical_format,
                                           self._physical_format))

  return beam.ptransform_fn(_PTransformFn)()
RecordBatches
RecordBatches(options: RecordBatchesOptions)

Returns an iterable of record batches.

This can be used outside of Apache Beam or TensorFlow to access data.

PARAMETER DESCRIPTION
options

An options object for iterating over record batches. Look at dataset_options.RecordBatchesOptions for more details.

TYPE: RecordBatchesOptions

Source code in tfx_bsl/tfxio/tf_sequence_example_record.py
def RecordBatches(self, options: dataset_options.RecordBatchesOptions):
  raise NotImplementedError
SupportAttachingRawRecords
SupportAttachingRawRecords() -> bool
Source code in tfx_bsl/tfxio/tf_sequence_example_record.py
def SupportAttachingRawRecords(self) -> bool:
  return True
TensorAdapter
TensorAdapter() -> TensorAdapter

Returns a TensorAdapter that converts pa.RecordBatch to TF inputs.

May raise an error if the TFMD schema was not provided at construction time.

Source code in tfx_bsl/tfxio/tfxio.py
def TensorAdapter(self) -> tensor_adapter.TensorAdapter:
  """Returns a TensorAdapter that converts pa.RecordBatch to TF inputs.

  May raise an error if the TFMD schema was not provided at construction time.
  """
  return tensor_adapter.TensorAdapter(self.TensorAdapterConfig())
TensorAdapterConfig
TensorAdapterConfig() -> TensorAdapterConfig

Returns the config to initialize a TensorAdapter.

RETURNS DESCRIPTION
TensorAdapterConfig

a TensorAdapterConfig that is the same as what is used to initialize the TensorAdapter returned by self.TensorAdapter().

Source code in tfx_bsl/tfxio/tfxio.py
def TensorAdapterConfig(self) -> tensor_adapter.TensorAdapterConfig:
  """Returns the config to initialize a `TensorAdapter`.

  Returns:
    a `TensorAdapterConfig` that is the same as what is used to initialize the
    `TensorAdapter` returned by `self.TensorAdapter()`.
  """
  return tensor_adapter.TensorAdapterConfig(
      self.ArrowSchema(), self.TensorRepresentations())
TensorFlowDataset
TensorFlowDataset(options: TensorFlowDatasetOptions)

Returns a tf.data.Dataset of TF inputs.

May raise an error if the TFMD schema was not provided at construction time.

PARAMETER DESCRIPTION
options

an options object for the tf.data.Dataset. Look at dataset_options.TensorFlowDatasetOptions for more details.

TYPE: TensorFlowDatasetOptions

Source code in tfx_bsl/tfxio/tf_sequence_example_record.py
def TensorFlowDataset(self,
                      options: dataset_options.TensorFlowDatasetOptions):
  raise NotImplementedError(
      "TFExampleBeamRecord is unable to provide a TensorFlowDataset "
      "because it does not do I/O")
TensorRepresentations
TensorRepresentations() -> TensorRepresentations

Returns the TensorRepresentations.

These TensorRepresentations describe the tensors or composite tensors produced by the TensorAdapter created from self.TensorAdapter() or the tf.data.Dataset created from self.TensorFlowDataset().

May raise an error if the TFMD schema was not provided at construction time. May raise an error if the tensor representations are invalid.

Source code in tfx_bsl/tfxio/tf_sequence_example_record.py
def TensorRepresentations(self) -> tensor_adapter.TensorRepresentations:
  return tensor_representation_util.InferTensorRepresentationsFromMixedSchema(
      self._schema)

TFSequenceExampleRecord

TFSequenceExampleRecord(
    file_pattern: Union[List[Text], Text],
    telemetry_descriptors: List[Text],
    validate: bool = True,
    schema: Optional[Schema] = None,
    raw_record_column_name: Optional[Text] = None,
)

Bases: _TFSequenceExampleRecordBase

TFXIO implementation for tf.SequenceExample on TFRecord.

Initializes a TFSequenceExampleRecord TFXIO.

PARAMETER DESCRIPTION
file_pattern

One or a list of glob patterns. If a list, must not be empty.

TYPE: Union[List[Text], Text]

telemetry_descriptors

A set of descriptors that identify the component that is instantiating this TFXIO. These will be used to construct the namespace to contain metrics for profiling and are therefore expected to be identifiers of the component itself and not individual instances of source use.

TYPE: List[Text]

validate

Not used; do not set (unused since 0.22.1).

TYPE: bool DEFAULT: True

schema

A TFMD Schema describing the dataset.

TYPE: Optional[Schema] DEFAULT: None

raw_record_column_name

If not None, the generated Arrow RecordBatches will contain a column of the given name that contains serialized records.

TYPE: Optional[Text] DEFAULT: None

Source code in tfx_bsl/tfxio/tf_sequence_example_record.py
def __init__(self,
             file_pattern: Union[List[Text], Text],
             telemetry_descriptors: List[Text],
             validate: bool = True,
             schema: Optional[schema_pb2.Schema] = None,
             raw_record_column_name: Optional[Text] = None):
  """Initializes a TFSequenceExampleRecord TFXIO.

  Args:
    file_pattern: One or a list of glob patterns. If a list, must not be
      empty.
    telemetry_descriptors: A set of descriptors that identify the component
      that is instantiating this TFXIO. These will be used to construct the
      namespace to contain metrics for profiling and are therefore expected to
      be identifiers of the component itself and not individual instances of
      source use.
    validate: Not used; do not set (unused since 0.22.1).
    schema: A TFMD Schema describing the dataset.
    raw_record_column_name: If not None, the generated Arrow RecordBatches
      will contain a column of the given name that contains serialized
      records.
  """
  super().__init__(
      schema=schema, raw_record_column_name=raw_record_column_name,
      telemetry_descriptors=telemetry_descriptors,
      physical_format="tfrecords_gzip")
  del validate
  if not isinstance(file_pattern, list):
    file_pattern = [file_pattern]
  assert file_pattern, "Must provide at least one file pattern."
  self._file_pattern = file_pattern
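A minimal construction sketch; the file pattern and telemetry descriptor are placeholders:

```
from tfx_bsl.public import tfxio as tfxio_module

seq_record_tfxio = tfxio_module.TFSequenceExampleRecord(
    file_pattern="/path/to/sequence_examples-*",
    telemetry_descriptors=["MyComponent"])
# Reads TFRecords of serialized tf.SequenceExamples into RecordBatches.
source = seq_record_tfxio.BeamSource(batch_size=1000)
```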
Attributes
raw_record_column_name property
raw_record_column_name: Optional[Text]
schema property
schema: Schema
telemetry_descriptors property
telemetry_descriptors: Optional[List[Text]]
Functions
ArrowSchema
ArrowSchema() -> Schema

Returns the schema of the RecordBatch produced by self.BeamSource().

May raise an error if the TFMD schema was not provided at construction time.

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def ArrowSchema(self) -> pa.Schema:
  schema = self._ArrowSchemaNoRawRecordColumn()
  if self._raw_record_column_name is not None:
    if schema.get_field_index(self._raw_record_column_name) != -1:
      raise ValueError(
          "Raw record column name {} collided with a column in the schema."
          .format(self._raw_record_column_name))
    schema = schema.append(
        pa.field(self._raw_record_column_name,
                 pa.large_list(pa.large_binary())))
  return schema
BeamSource
BeamSource(batch_size: Optional[int] = None) -> PTransform

Returns a beam PTransform that produces PCollection[pa.RecordBatch].

May NOT raise an error if the TFMD schema was not provided at construction time.

If a TFMD schema was provided at construction time, all the pa.RecordBatches in the result PCollection must be of the same schema returned by self.ArrowSchema. If a TFMD schema was not provided, the pa.RecordBatches might not be of the same schema (they may contain different numbers of columns).

PARAMETER DESCRIPTION
batch_size

if not None, the pa.RecordBatch produced will be of the specified size. Otherwise it's automatically tuned by Beam.

TYPE: Optional[int] DEFAULT: None

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def BeamSource(self, batch_size: Optional[int] = None) -> beam.PTransform:

  @beam.typehints.with_input_types(Any)
  @beam.typehints.with_output_types(pa.RecordBatch)
  def _PTransformFn(pcoll_or_pipeline: Any):
    """Converts raw records to RecordBatches."""
    return (
        pcoll_or_pipeline
        | "RawRecordBeamSource" >> self.RawRecordBeamSource()
        | "RawRecordToRecordBatch" >> self.RawRecordToRecordBatch(batch_size))

  return beam.ptransform_fn(_PTransformFn)()
Project
Project(tensor_names: List[Text]) -> TFXIO

Projects the dataset represented by this TFXIO.

A Projected TFXIO:

- Only columns needed for given tensor_names are guaranteed to be produced by self.BeamSource()
- self.TensorAdapterConfig() and self.TensorFlowDataset() are trimmed to contain only those tensors.
- It retains a reference to the very original TFXIO, so its TensorAdapter knows about the specs of the tensors that would be produced by the original TensorAdapter. Also see TensorAdapter.OriginalTensorSpec().

May raise an error if the TFMD schema was not provided at construction time.

PARAMETER DESCRIPTION
tensor_names

a set of tensor names.

TYPE: List[Text]

RETURNS DESCRIPTION
TFXIO

A TFXIO instance that is the same as self except that:
  • Only columns needed for given tensor_names are guaranteed to be produced by self.BeamSource()
  • self.TensorAdapterConfig() and self.TensorFlowDataset() are trimmed to contain only those tensors.
Source code in tfx_bsl/tfxio/tfxio.py
def Project(self, tensor_names: List[Text]) -> "TFXIO":
  """Projects the dataset represented by this TFXIO.

  A Projected TFXIO:
  - Only columns needed for given tensor_names are guaranteed to be
    produced by `self.BeamSource()`
  - `self.TensorAdapterConfig()` and `self.TensorFlowDataset()` are trimmed
    to contain only those tensors.
  - It retains a reference to the very original TFXIO, so its TensorAdapter
    knows about the specs of the tensors that would be produced by the
    original TensorAdapter. Also see `TensorAdapter.OriginalTensorSpec()`.

  May raise an error if the TFMD schema was not provided at construction time.

  Args:
    tensor_names: a set of tensor names.

  Returns:
    A `TFXIO` instance that is the same as `self` except that:
    - Only columns needed for given tensor_names are guaranteed to be
      produced by `self.BeamSource()`
    - `self.TensorAdapterConfig()` and `self.TensorFlowDataset()` are trimmed
      to contain only those tensors.
  """
  if isinstance(self, _ProjectedTFXIO):
    # pylint: disable=protected-access
    return _ProjectedTFXIO(self.origin,
                           self.projected._ProjectImpl(tensor_names))
  return _ProjectedTFXIO(self, self._ProjectImpl(tensor_names))
RawRecordBeamSource
RawRecordBeamSource() -> PTransform

Returns a PTransform that produces a PCollection[bytes].

Used together with RawRecordToRecordBatch(), it allows getting both the PCollection of the raw records and the PCollection of the RecordBatch from the same source. For example:

record_batch = pipeline | tfxio.BeamSource()
raw_record = pipeline | tfxio.RawRecordBeamSource()

would result in the files being read twice, while the following would only read once:

raw_record = pipeline | tfxio.RawRecordBeamSource()
record_batch = raw_record | tfxio.RawRecordToRecordBatch()

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def RawRecordBeamSource(self) -> beam.PTransform:
  """Returns a PTransform that produces a PCollection[bytes].

  Used together with RawRecordToRecordBatch(), it allows getting both the
  PCollection of the raw records and the PCollection of the RecordBatch from
  the same source. For example:

  record_batch = pipeline | tfxio.BeamSource()
  raw_record = pipeline | tfxio.RawRecordBeamSource()

  would result in the files being read twice, while the following would only
  read once:

  raw_record = pipeline | tfxio.RawRecordBeamSource()
  record_batch = raw_record | tfxio.RawRecordToRecordBatch()
  """

  @beam.typehints.with_input_types(Any)
  @beam.typehints.with_output_types(bytes)
  def _PTransformFn(pcoll_or_pipeline: Any):
    return (pcoll_or_pipeline
            | "ReadRawRecords" >> self._RawRecordBeamSourceInternal()
            | "CollectRawRecordTelemetry" >> telemetry.ProfileRawRecords(
                self._telemetry_descriptors, self._logical_format,
                self._physical_format))

  return beam.ptransform_fn(_PTransformFn)()
RawRecordTensorFlowDataset
RawRecordTensorFlowDataset(
    options: TensorFlowDatasetOptions,
) -> Dataset

Returns a Dataset that contains nested Datasets of raw records.

May not be implemented for some TFXIOs.

This should be used when RawTfRecordTFXIO.TensorFlowDataset does not suffice; namely, when there is some logical grouping of files that we need to perform operations on without applying the operation within each individual group (e.g. shuffle).

The returned Dataset object is a dataset of datasets, where each nested dataset is a dataset of serialized records. When shuffle=False (default), the nested datasets are deterministically ordered. Each nested dataset can represent multiple files. The files are merged into one dataset if the files have the same format. For example:

file_patterns = ['file_1', 'file_2', 'dir_1/*']
file_formats = ['recordio', 'recordio', 'sstable']
tfxio = SomeTFXIO(file_patterns, file_formats)
datasets = tfxio.RawRecordTensorFlowDataset(options)
datasets would result in the following dataset: [ds1, ds2], where ds1 iterates over records from 'file_1' and 'file_2', and ds2 iterates over records from files matched by 'dir_1/*'.

Example usage:

tfxio = SomeTFXIO(file_patterns, file_formats)
ds = tfxio.RawRecordTensorFlowDataset(options=options)
ds = ds.flat_map(lambda x: x)
records = list(ds.as_numpy_iterator())
# iterating over `records` yields records from each file in
# `file_patterns`. See `tf.data.Dataset.list_files` for more information
# about the order of files when expanding globs.
Note that we need a flat_map, because RawRecordTensorFlowDataset returns a dataset of datasets.

When shuffle=True, the datasets are not deterministically ordered, but the contents of each nested dataset are deterministically ordered. For example, we may potentially have [ds2, ds1, ds3], where the contents of ds1, ds2, and ds3 are all deterministically ordered.

PARAMETER DESCRIPTION
options

A TensorFlowDatasetOptions object. Not all options will apply.

TYPE: TensorFlowDatasetOptions

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def RawRecordTensorFlowDataset(
    self,
    options: dataset_options.TensorFlowDatasetOptions) -> tf.data.Dataset:
  """Returns a Dataset that contains nested Datasets of raw records.

  May not be implemented for some TFXIOs.

  This should be used when RawTfRecordTFXIO.TensorFlowDataset does not
  suffice. Namely, if there is some logical grouping of files which we need
  to perform operations on, without applying the operation to each individual
  group (i.e. shuffle).

  The returned Dataset object is a dataset of datasets, where each nested
  dataset is a dataset of serialized records. When shuffle=False (default),
  the nested datasets are deterministically ordered. Each nested dataset can
  represent multiple files. The files are merged into one dataset if the files
  have the same format. For example:

  ```
  file_patterns = ['file_1', 'file_2', 'dir_1/*']
  file_formats = ['recordio', 'recordio', 'sstable']
  tfxio = SomeTFXIO(file_patterns, file_formats)
  datasets = tfxio.RawRecordTensorFlowDataset(options)
  ```
  `datasets` would result in the following dataset: `[ds1, ds2]`, where ds1
  iterates over records from 'file_1' and 'file_2', and ds2 iterates over
  records from files matched by 'dir_1/*'.

  Example usage:
  ```
  tfxio = SomeTFXIO(file_patterns, file_formats)
  ds = tfxio.RawRecordTensorFlowDataset(options=options)
  ds = ds.flat_map(lambda x: x)
  records = list(ds.as_numpy_iterator())
  # iterating over `records` yields records from each file in
  # `file_patterns`. See `tf.data.Dataset.list_files` for more information
  # about the order of files when expanding globs.
  ```
  Note that we need a flat_map, because `RawRecordTensorFlowDataset` returns
  a dataset of datasets.

  When shuffle=True, the datasets are not deterministically ordered,
  but the contents of each nested dataset are deterministically ordered.
  For example, we may potentially have [ds2, ds1, ds3], where the
  contents of ds1, ds2, and ds3 are all deterministically ordered.

  Args:
    options: A TensorFlowDatasetOptions object. Not all options will apply.
  """
  raise NotImplementedError
RawRecordToRecordBatch
RawRecordToRecordBatch(
    batch_size: Optional[int] = None,
) -> PTransform

Returns a PTransform that converts raw records to Arrow RecordBatches.

The input PCollection must be from self.RawRecordBeamSource() (also see the documentation for that method).

PARAMETER DESCRIPTION
batch_size

if not None, the pa.RecordBatch produced will be of the specified size. Otherwise it's automatically tuned by Beam.

TYPE: Optional[int] DEFAULT: None

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def RawRecordToRecordBatch(self,
                           batch_size: Optional[int] = None
                          ) -> beam.PTransform:
  """Returns a PTransform that converts raw records to Arrow RecordBatches.

  The input PCollection must be from self.RawRecordBeamSource() (also see
  the documentation for that method).

  Args:
    batch_size: if not None, the `pa.RecordBatch` produced will be of the
      specified size. Otherwise it's automatically tuned by Beam.
  """

  @beam.typehints.with_input_types(bytes)
  @beam.typehints.with_output_types(pa.RecordBatch)
  def _PTransformFn(pcoll: beam.pvalue.PCollection):
    return (pcoll
            | "RawRecordToRecordBatch" >>
            self._RawRecordToRecordBatchInternal(batch_size)
            | "CollectRecordBatchTelemetry" >>
            telemetry.ProfileRecordBatches(self._telemetry_descriptors,
                                           self._logical_format,
                                           self._physical_format))

  return beam.ptransform_fn(_PTransformFn)()
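A minimal sketch of pairing this transform with `RawRecordBeamSource()` (the file pattern and telemetry descriptors below are hypothetical; any record-based TFXIO works the same way):

```
import apache_beam as beam

from tfx_bsl.public.tfxio import TFExampleRecord

tfxio = TFExampleRecord(
    file_pattern="/path/to/examples.tfrecord*",
    telemetry_descriptors=["demo"])

with beam.Pipeline() as p:
  record_batches = (
      p
      | "ReadRawRecords" >> tfxio.RawRecordBeamSource()
      | "ToRecordBatch" >> tfxio.RawRecordToRecordBatch(batch_size=1000))
```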
RecordBatches
RecordBatches(options: RecordBatchesOptions)

Returns an iterable of record batches.

This can be used outside of Apache Beam or TensorFlow to access data.

PARAMETER DESCRIPTION
options

An options object for iterating over record batches. Look at dataset_options.RecordBatchesOptions for more details.

TYPE: RecordBatchesOptions

Source code in tfx_bsl/tfxio/tf_sequence_example_record.py
def RecordBatches(self, options: dataset_options.RecordBatchesOptions):
  raise NotImplementedError
SupportAttachingRawRecords
SupportAttachingRawRecords() -> bool
Source code in tfx_bsl/tfxio/tf_sequence_example_record.py
def SupportAttachingRawRecords(self) -> bool:
  return True
TensorAdapter
TensorAdapter() -> TensorAdapter

Returns a TensorAdapter that converts pa.RecordBatch to TF inputs.

May raise an error if the TFMD schema was not provided at construction time.

Source code in tfx_bsl/tfxio/tfxio.py
def TensorAdapter(self) -> tensor_adapter.TensorAdapter:
  """Returns a TensorAdapter that converts pa.RecordBatch to TF inputs.

  May raise an error if the TFMD schema was not provided at construction time.
  """
  return tensor_adapter.TensorAdapter(self.TensorAdapterConfig())
TensorAdapterConfig
TensorAdapterConfig() -> TensorAdapterConfig

Returns the config to initialize a TensorAdapter.

RETURNS DESCRIPTION
TensorAdapterConfig

a TensorAdapterConfig that is the same as what is used to initialize the TensorAdapter returned by self.TensorAdapter().

Source code in tfx_bsl/tfxio/tfxio.py
def TensorAdapterConfig(self) -> tensor_adapter.TensorAdapterConfig:
  """Returns the config to initialize a `TensorAdapter`.

  Returns:
    a `TensorAdapterConfig` that is the same as what is used to initialize the
    `TensorAdapter` returned by `self.TensorAdapter()`.
  """
  return tensor_adapter.TensorAdapterConfig(
      self.ArrowSchema(), self.TensorRepresentations())
TensorFlowDataset
TensorFlowDataset(
    options: TensorFlowDatasetOptions,
) -> Dataset

Creates a tf.data.Dataset that yields Tensors.

The serialized tf.SequenceExamples are parsed by tf.io.parse_sequence_example.

See base class (tfxio.TFXIO) for more details.

PARAMETER DESCRIPTION
options

an options object for the tf.data.Dataset. See dataset_options.TensorFlowDatasetOptions for more details.

TYPE: TensorFlowDatasetOptions

RETURNS DESCRIPTION
Dataset

A dataset of dict elements (or a tuple of dict elements and a label). Each dict maps feature keys to Tensor, SparseTensor, or RaggedTensor objects.

RAISES DESCRIPTION
ValueError

if there is something wrong with the provided schema.

Source code in tfx_bsl/tfxio/tf_sequence_example_record.py
def TensorFlowDataset(
    self,
    options: dataset_options.TensorFlowDatasetOptions) -> tf.data.Dataset:
  """Creates a tf.data.Dataset that yields Tensors.

  The serialized tf.SequenceExamples are parsed by
  `tf.io.parse_sequence_example`.

  See base class (tfxio.TFXIO) for more details.

  Args:
    options: an options object for the tf.data.Dataset. See
      `dataset_options.TensorFlowDatasetOptions` for more details.

  Returns:
    A dataset of `dict` elements (or a tuple of `dict` elements and a label).
    Each `dict` maps feature keys to `Tensor`, `SparseTensor`, or
    `RaggedTensor` objects.

  Raises:
    ValueError: if there is something wrong with the provided schema.
  """
  file_pattern = tf.convert_to_tensor(self._file_pattern)
  dataset = dataset_util.make_tf_record_dataset(
      file_pattern,
      batch_size=options.batch_size,
      num_epochs=options.num_epochs,
      shuffle=options.shuffle,
      shuffle_buffer_size=options.shuffle_buffer_size,
      shuffle_seed=options.shuffle_seed,
      reader_num_threads=options.reader_num_threads,
      drop_final_batch=options.drop_final_batch)

  return self._ParseRawRecordTensorFlowDataset(dataset, options.label_key)
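A minimal sketch of creating such a dataset (the file pattern is hypothetical, and `schema` is assumed to be a TFMD `schema_pb2.Schema` describing the serialized tf.SequenceExamples; both imported names are assumed to be available from this module):

```
from tfx_bsl.public.tfxio import (TensorFlowDatasetOptions,
                                  TFSequenceExampleRecord)

tfxio = TFSequenceExampleRecord(
    file_pattern="/path/to/sequence_examples.tfrecord*",
    telemetry_descriptors=["demo"],
    schema=schema)
ds = tfxio.TensorFlowDataset(
    TensorFlowDatasetOptions(batch_size=32, num_epochs=1))
```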
TensorRepresentations
TensorRepresentations() -> TensorRepresentations

Returns the TensorRepresentations.

These TensorRepresentations describe the tensors or composite tensors produced by the TensorAdapter created from self.TensorAdapter() or the tf.data.Dataset created from self.TensorFlowDataset().

May raise an error if the TFMD schema was not provided at construction time. May raise an error if the tensor representations are invalid.

Source code in tfx_bsl/tfxio/tf_sequence_example_record.py
def TensorRepresentations(self) -> tensor_adapter.TensorRepresentations:
  return tensor_representation_util.InferTensorRepresentationsFromMixedSchema(
      self._schema)
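Since the result is a plain `Dict[str, TensorRepresentation]`, the inferred representations can be inspected directly; a quick sketch (assuming `tfxio` was constructed with a schema):

```
for name, representation in tfxio.TensorRepresentations().items():
  print(name, representation)  # TensorRepresentation protos keyed by tensor name
```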

TFXIO

Bases: object

Abstract base class of all TFXIO API implementations.

Functions
ArrowSchema abstractmethod
ArrowSchema() -> Schema

Returns the schema of the RecordBatch produced by self.BeamSource().

May raise an error if the TFMD schema was not provided at construction time.

Source code in tfx_bsl/tfxio/tfxio.py
@abc.abstractmethod
def ArrowSchema(self) -> pa.Schema:
  """Returns the schema of the `RecordBatch` produced by `self.BeamSource()`.

  May raise an error if the TFMD schema was not provided at construction time.
  """
BeamSource abstractmethod
BeamSource(batch_size: Optional[int] = None) -> PTransform

Returns a beam PTransform that produces PCollection[pa.RecordBatch].

May NOT raise an error if the TFMD schema was not provided at construction time.

If a TFMD schema was provided at construction time, all the pa.RecordBatches in the result PCollection must be of the same schema returned by self.ArrowSchema. If a TFMD schema was not provided, the pa.RecordBatches might not be of the same schema (they may contain different numbers of columns).

PARAMETER DESCRIPTION
batch_size

if not None, the pa.RecordBatch produced will be of the specified size. Otherwise it's automatically tuned by Beam.

TYPE: Optional[int] DEFAULT: None

Source code in tfx_bsl/tfxio/tfxio.py
@abc.abstractmethod
def BeamSource(self, batch_size: Optional[int] = None) -> beam.PTransform:
  """Returns a beam `PTransform` that produces `PCollection[pa.RecordBatch]`.

  May NOT raise an error if the TFMD schema was not provided at construction
  time.

  If a TFMD schema was provided at construction time, all the
  `pa.RecordBatch`es in the result `PCollection` must be of the same schema
  returned by `self.ArrowSchema`. If a TFMD schema was not provided, the
  `pa.RecordBatch`es might not be of the same schema (they may contain
  different numbers of columns).

  Args:
    batch_size: if not None, the `pa.RecordBatch` produced will be of the
      specified size. Otherwise it's automatically tuned by Beam.
  """
Project
Project(tensor_names: List[Text]) -> TFXIO

Projects the dataset represented by this TFXIO.

A Projected TFXIO:

• Only columns needed for given tensor_names are guaranteed to be produced by self.BeamSource().
• self.TensorAdapterConfig() and self.TensorFlowDataset() are trimmed to contain only those tensors.
• It retains a reference to the very original TFXIO, so its TensorAdapter knows about the specs of the tensors that would be produced by the original TensorAdapter. Also see TensorAdapter.OriginalTypeSpecs().

May raise an error if the TFMD schema was not provided at construction time.

PARAMETER DESCRIPTION
tensor_names

a set of tensor names.

TYPE: List[Text]

RETURNS DESCRIPTION
TFXIO

A TFXIO instance that is the same as self except that:

• Only columns needed for given tensor_names are guaranteed to be produced by self.BeamSource().
• self.TensorAdapterConfig() and self.TensorFlowDataset() are trimmed to contain only those tensors.
Source code in tfx_bsl/tfxio/tfxio.py
def Project(self, tensor_names: List[Text]) -> "TFXIO":
  """Projects the dataset represented by this TFXIO.

  A Projected TFXIO:
  - Only columns needed for given tensor_names are guaranteed to be
    produced by `self.BeamSource()`
  - `self.TensorAdapterConfig()` and `self.TensorFlowDataset()` are trimmed
    to contain only those tensors.
  - It retains a reference to the very original TFXIO, so its TensorAdapter
    knows about the specs of the tensors that would be produced by the
    original TensorAdapter. Also see `TensorAdapter.OriginalTypeSpecs()`.

  May raise an error if the TFMD schema was not provided at construction time.

  Args:
    tensor_names: a set of tensor names.

  Returns:
    A `TFXIO` instance that is the same as `self` except that:
    - Only columns needed for given tensor_names are guaranteed to be
      produced by `self.BeamSource()`
    - `self.TensorAdapterConfig()` and `self.TensorFlowDataset()` are trimmed
      to contain only those tensors.
  """
  if isinstance(self, _ProjectedTFXIO):
    # pylint: disable=protected-access
    return _ProjectedTFXIO(self.origin,
                           self.projected._ProjectImpl(tensor_names))
  return _ProjectedTFXIO(self, self._ProjectImpl(tensor_names))
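A minimal sketch (the tensor names are hypothetical; they must be keys of `tfxio.TensorRepresentations()`):

```
projected = tfxio.Project(["feature_a", "feature_b"])
# Trimmed to the projected tensors only:
projected_representations = projected.TensorRepresentations()
# The original, unprojected specs remain reachable through the adapter:
original_specs = projected.TensorAdapter().OriginalTypeSpecs()
```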
RecordBatches abstractmethod
RecordBatches(
    options: RecordBatchesOptions,
) -> Iterator[RecordBatch]

Returns an iterable of record batches.

This can be used outside of Apache Beam or TensorFlow to access data.

PARAMETER DESCRIPTION
options

An options object for iterating over record batches. Look at dataset_options.RecordBatchesOptions for more details.

TYPE: RecordBatchesOptions

Source code in tfx_bsl/tfxio/tfxio.py
@abc.abstractmethod
def RecordBatches(
    self, options: dataset_options.RecordBatchesOptions
) -> Iterator[pa.RecordBatch]:
  """Returns an iterable of record batches.

  This can be used outside of Apache Beam or TensorFlow to access data.

  Args:
    options: An options object for iterating over record batches. Look at
      `dataset_options.RecordBatchesOptions` for more details.
  """
TensorAdapter
TensorAdapter() -> TensorAdapter

Returns a TensorAdapter that converts pa.RecordBatch to TF inputs.

May raise an error if the TFMD schema was not provided at construction time.

Source code in tfx_bsl/tfxio/tfxio.py
def TensorAdapter(self) -> tensor_adapter.TensorAdapter:
  """Returns a TensorAdapter that converts pa.RecordBatch to TF inputs.

  May raise an error if the TFMD schema was not provided at construction time.
  """
  return tensor_adapter.TensorAdapter(self.TensorAdapterConfig())
TensorAdapterConfig
TensorAdapterConfig() -> TensorAdapterConfig

Returns the config to initialize a TensorAdapter.

RETURNS DESCRIPTION
TensorAdapterConfig

a TensorAdapterConfig that is the same as what is used to initialize the TensorAdapter returned by self.TensorAdapter().

Source code in tfx_bsl/tfxio/tfxio.py
def TensorAdapterConfig(self) -> tensor_adapter.TensorAdapterConfig:
  """Returns the config to initialize a `TensorAdapter`.

  Returns:
    a `TensorAdapterConfig` that is the same as what is used to initialize the
    `TensorAdapter` returned by `self.TensorAdapter()`.
  """
  return tensor_adapter.TensorAdapterConfig(
      self.ArrowSchema(), self.TensorRepresentations())
TensorFlowDataset abstractmethod
TensorFlowDataset(
    options: TensorFlowDatasetOptions,
) -> Dataset

Returns a tf.data.Dataset of TF inputs.

May raise an error if the TFMD schema was not provided at construction time.

PARAMETER DESCRIPTION
options

an options object for the tf.data.Dataset. Look at dataset_options.TensorFlowDatasetOptions for more details.

TYPE: TensorFlowDatasetOptions

Source code in tfx_bsl/tfxio/tfxio.py
@abc.abstractmethod
def TensorFlowDataset(
    self,
    options: dataset_options.TensorFlowDatasetOptions) -> tf.data.Dataset:
  """Returns a tf.data.Dataset of TF inputs.

  May raise an error if the TFMD schema was not provided at construction time.

  Args:
    options: an options object for the tf.data.Dataset. Look at
      `dataset_options.TensorFlowDatasetOptions` for more details.
  """
TensorRepresentations abstractmethod
TensorRepresentations() -> TensorRepresentations

Returns the TensorRepresentations.

These TensorRepresentations describe the tensors or composite tensors produced by the TensorAdapter created from self.TensorAdapter() or the tf.data.Dataset created from self.TensorFlowDataset().

May raise an error if the TFMD schema was not provided at construction time. May raise an error if the tensor representations are invalid.

Source code in tfx_bsl/tfxio/tfxio.py
@abc.abstractmethod
def TensorRepresentations(self) -> tensor_adapter.TensorRepresentations:
  """Returns the `TensorRepresentations`.

  These `TensorRepresentation`s describe the tensors or composite tensors
  produced by the `TensorAdapter` created from `self.TensorAdapter()` or
  the tf.data.Dataset created from `self.TensorFlowDataset()`.

  May raise an error if the TFMD schema was not provided at construction time.
  May raise an error if the tensor representations are invalid.
  """

TensorAdapter

TensorAdapter(config: TensorAdapterConfig)

Bases: object

A TensorAdapter converts a RecordBatch to a collection of TF Tensors.

The conversion is determined by both the Arrow schema and the TensorRepresentations, which must be provided at initialization time. Each TensorRepresentation contains the information needed to translate one or more columns in a RecordBatch of the given Arrow schema into a TF Tensor or CompositeTensor. They are contained in a Dict whose keys are the names of the tensors, which will be the keys of the Dict produced by ToBatchTensors().

TypeSpecs() returns the static TypeSpecs of those tensors by their names. Note that if a tensor has a shape, the size of its first (batch) dimension is always unknown (None) because it depends on the size of the RecordBatch passed to ToBatchTensors().

It is guaranteed that, for any tensor_name in the given TensorRepresentations, self.TypeSpecs()[tensor_name].is_compatible_with(self.ToBatchTensors(...)[tensor_name]).

Sliced RecordBatches and LargeListArray columns having null elements backed by non-empty sub-lists are not supported and will yield undefined behaviour.

Source code in tfx_bsl/tfxio/tensor_adapter.py
def __init__(self, config: TensorAdapterConfig):

  self._arrow_schema = config.arrow_schema
  self._type_handlers = _BuildTypeHandlers(config.tensor_representations,
                                           config.arrow_schema)
  self._type_specs = {
      tensor_name: handler.type_spec
      for tensor_name, handler in self._type_handlers
  }

  self._original_type_specs = (
      self._type_specs
      if config.original_type_specs is None else config.original_type_specs)

  for tensor_name, type_spec in self._type_specs.items():
    original_type_spec = self._original_type_specs.get(tensor_name, None)
    if original_type_spec is None or original_type_spec != type_spec:
      raise ValueError(
          "original_type_specs must be a superset of type_specs derived from "
          "TensorRepresentations. But for tensor {}, got {} vs {}".format(
              tensor_name, original_type_spec, type_spec))
Functions
OriginalTypeSpecs
OriginalTypeSpecs() -> Dict[str, TypeSpec]

Returns the origin's type specs.

A TFXIO 'Y' may be the result of projecting another TFXIO 'X', in which case 'X' is the origin of 'Y', and this method returns what X.TensorAdapter().TypeSpecs() would return.

This may equal self.TypeSpecs().

Returns: a mapping from tensor names to tf.TypeSpecs.

Source code in tfx_bsl/tfxio/tensor_adapter.py
def OriginalTypeSpecs(self) -> Dict[str, tf.TypeSpec]:
  """Returns the origin's type specs.

  A TFXIO 'Y' may be the result of projecting another TFXIO 'X', in which
  case 'X' is the origin of 'Y', and this method returns what
  X.TensorAdapter().TypeSpecs() would return.

  This may equal `self.TypeSpecs()`.

  Returns: a mapping from tensor names to `tf.TypeSpec`s.
  """
  return self._original_type_specs
ToBatchTensors
ToBatchTensors(
    record_batch: RecordBatch,
    produce_eager_tensors: Optional[bool] = None,
) -> Dict[str, Any]

Returns a batch of tensors translated from record_batch.

PARAMETER DESCRIPTION
record_batch

input RecordBatch.

TYPE: RecordBatch

produce_eager_tensors

controls whether ToBatchTensors() produces eager tensors or ndarrays (or Tensor value objects). If None, this is determined by whether TF eager mode is enabled.

TYPE: Optional[bool] DEFAULT: None

RAISES DESCRIPTION
RuntimeError

when Eager Tensors are requested but TF is not executing eagerly.

ValueError

when any handler fails to produce a Tensor.

Source code in tfx_bsl/tfxio/tensor_adapter.py
def ToBatchTensors(
    self,
    record_batch: pa.RecordBatch,
    produce_eager_tensors: Optional[bool] = None) -> Dict[str, Any]:
  """Returns a batch of tensors translated from `record_batch`.

  Args:
    record_batch: input RecordBatch.
    produce_eager_tensors: controls whether ToBatchTensors() produces eager
      tensors or ndarrays (or Tensor value objects). If None, this is
      determined by whether TF eager mode is enabled.

  Raises:
    RuntimeError: when Eager Tensors are requested but TF is not executing
      eagerly.
    ValueError: when any handler fails to produce a Tensor.
  """

  tf_executing_eagerly = tf.executing_eagerly()
  if produce_eager_tensors and not tf_executing_eagerly:
    raise RuntimeError(
        "Eager Tensors were requested but eager mode was not enabled.")
  if produce_eager_tensors is None:
    produce_eager_tensors = tf_executing_eagerly

  if not record_batch.schema.equals(self._arrow_schema):
    raise ValueError("Expected same schema.")
  result = {}
  for tensor_name, handler in self._type_handlers:
    try:
      result[tensor_name] = handler.GetTensor(record_batch,
                                              produce_eager_tensors)
    except Exception as e:
      raise ValueError(
          "Error raised when handling tensor '{}'".format(tensor_name)) from e

  return result
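A minimal sketch that exercises the compatibility guarantee stated in the class description (assuming `tfxio` was constructed with a TFMD schema, supports `RecordBatches()`, and `options` is a valid `RecordBatchesOptions`):

```
adapter = tfxio.TensorAdapter()
for record_batch in tfxio.RecordBatches(options):
  tensors = adapter.ToBatchTensors(record_batch)
  for name, tensor in tensors.items():
    # Holds by the TensorAdapter contract.
    assert adapter.TypeSpecs()[name].is_compatible_with(tensor)
```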
TypeSpecs
TypeSpecs() -> Dict[str, TypeSpec]

Returns the TypeSpec for each tensor.

Source code in tfx_bsl/tfxio/tensor_adapter.py
def TypeSpecs(self) -> Dict[str, tf.TypeSpec]:
  """Returns the TypeSpec for each tensor."""
  return self._type_specs

TensorAdapterConfig

TensorAdapterConfig(
    arrow_schema: Schema,
    tensor_representations: TensorRepresentations,
    original_type_specs: Optional[
        Dict[str, TypeSpec]
    ] = None,
)

Bases: object

Config to a TensorAdapter.

Contains all the information needed to create a TensorAdapter.

Source code in tfx_bsl/tfxio/tensor_adapter.py
def __init__(self,
             arrow_schema: pa.Schema,
             tensor_representations: TensorRepresentations,
             original_type_specs: Optional[Dict[str, tf.TypeSpec]] = None):
  self.arrow_schema = arrow_schema
  self.tensor_representations = tensor_representations
  self.original_type_specs = original_type_specs
Attributes
arrow_schema instance-attribute
arrow_schema = arrow_schema
original_type_specs instance-attribute
original_type_specs = original_type_specs
tensor_representations instance-attribute
tensor_representations = tensor_representations
Functions

TensorFlowDatasetOptions

Bases: NamedTuple('TensorFlowDatasetOptions', [('batch_size', int), ('drop_final_batch', bool), ('num_epochs', Optional[int]), ('shuffle', bool), ('shuffle_buffer_size', int), ('shuffle_seed', Optional[int]), ('prefetch_buffer_size', int), ('reader_num_threads', int), ('parser_num_threads', int), ('sloppy_ordering', bool), ('label_key', Optional[str])])

Options for TFXIO's TensorFlowDataset.

Note: not all of these options may take effect; this depends on the particular TFXIO implementation.
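A minimal construction sketch (defaults for the fields not shown are assumptions; `"label"` is a hypothetical feature name and `tfxio` any concrete TFXIO):

```
options = TensorFlowDatasetOptions(
    batch_size=32,
    num_epochs=1,
    shuffle=False,
    label_key="label")
ds = tfxio.TensorFlowDataset(options)
```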