Skip to content

TensorFlow Transform tft.beam Module

tensorflow_transform.beam

Module level imports for tensorflow_transform.beam.

Classes

AnalyzeAndTransformDataset

AnalyzeAndTransformDataset(
    preprocessing_fn, output_record_batches=False
)

Bases: PTransform

Combination of AnalyzeDataset and TransformDataset.

transformed, transform_fn = AnalyzeAndTransformDataset(
    preprocessing_fn).expand(dataset)

should be equivalent to

transform_fn = AnalyzeDataset(preprocessing_fn).expand(dataset)
transformed = TransformDataset().expand((dataset, transform_fn))

but may be more efficient since it avoids multiple passes over the data.

Init method.

PARAMETER DESCRIPTION
preprocessing_fn

A function that accepts and returns a dictionary from strings to Tensors, SparseTensors, or RaggedTensors.

output_record_batches

(Optional) A bool. If True, AnalyzeAndTransformDataset outputs pyarrow.RecordBatches; otherwise, outputs instance dicts.

DEFAULT: False

Source code in tensorflow_transform/beam/impl.py
def __init__(self, preprocessing_fn, output_record_batches=False):
  """Stores the preprocessing function and the output-format flag.

  Args:
    preprocessing_fn: A function that accepts and returns a dictionary from
        strings to `Tensor`s, `SparseTensor`s, or `RaggedTensor`s.
    output_record_batches: (Optional) A bool. If `True`,
        `AnalyzeAndTransformDataset` outputs `pyarrow.RecordBatch`es;
        otherwise, outputs instance dicts.
  """
  # The two assignments are independent of each other.
  self._output_record_batches = output_record_batches
  self._preprocessing_fn = preprocessing_fn
Attributes
label property writable
label
pipeline class-attribute instance-attribute
pipeline = None
side_inputs class-attribute instance-attribute
side_inputs = ()
Functions
annotations
annotations() -> dict[str, Union[bytes, str, Message]]
Source code in apache_beam/transforms/ptransform.py
def annotations(self) -> dict[str, Union[bytes, str, message.Message]]:
  """Returns a one-entry dict naming this instance's Python class.

  The 'python_type' entry holds the class's module, a dot, and the class's
  ``__qualname__``.
  """
  module_name = self.__class__.__module__
  qualified_name = self.__class__.__qualname__
  return {'python_type': f'{module_name}.{qualified_name}'}
default_label
default_label()
Source code in apache_beam/transforms/ptransform.py
def default_label(self):
  # type: () -> str
  """Returns the name of this instance's class."""
  own_class = self.__class__
  return own_class.__name__
default_type_hints
default_type_hints()
Source code in apache_beam/transforms/ptransform.py
def default_type_hints(self):
  """Returns the class's type hints, defaulting to hints taken from `expand`.

  Hints are derived from `self.expand` with `IOTypeHints.from_callable` and,
  when present, passed through `strip_pcoll()`. They are then supplied as
  defaults to the hints obtained from `get_type_hints(self.__class__)`.
  """
  expand_hints = IOTypeHints.from_callable(self.expand)
  if expand_hints is not None:
    expand_hints = expand_hints.strip_pcoll()

  class_hints = get_type_hints(self.__class__)
  # Prefer class decorator type hints for backwards compatibility.
  return class_hints.with_defaults(expand_hints)
display_data
display_data()

Returns the display data associated to a pipeline component.

It should be reimplemented in pipeline components that wish to have static display data.

RETURNS DESCRIPTION

Dict[str, Any]: A dictionary containing key:value pairs. The value might be an integer, float or string value; a :class:DisplayDataItem for values that have more data (e.g. short value, label, url); or a :class:HasDisplayData instance that has more display data that should be picked up. For example::

{
  'key1': 'string_value',
  'key2': 1234,
  'key3': 3.14159265,
  'key4': DisplayDataItem('apache.org', url='http://apache.org'),
  'key5': subComponent
}

Source code in apache_beam/transforms/display.py
def display_data(self):
  # type: () -> dict

  """Returns the display data associated to a pipeline component.

  Pipeline components that wish to have static display data should
  reimplement this method. This implementation returns an empty dictionary.

  Returns:
    Dict[str, Any]: A dictionary containing ``key:value`` pairs.
    The value might be an integer, float or string value; a
    :class:`DisplayDataItem` for values that have more data
    (e.g. short value, label, url); or a :class:`HasDisplayData` instance
    that has more display data that should be picked up. For example::

      {
        'key1': 'string_value',
        'key2': 1234,
        'key3': 3.14159265,
        'key4': DisplayDataItem('apache.org', url='http://apache.org'),
        'key5': subComponent
      }
  """
  static_items = {}
  return static_items
expand
expand(dataset)

Transform the dataset by applying the preprocessing_fn.

PARAMETER DESCRIPTION
dataset

A dataset.

RETURNS DESCRIPTION

A (Dataset, TransformFn) pair containing the preprocessed dataset and

the graph that maps the input to the output data.

Source code in tensorflow_transform/beam/impl.py
def expand(self, dataset):
  """Analyzes the dataset, then transforms it with the resulting function.

  Args:
    dataset: A dataset.

  Returns:
    A (Dataset, TransformFn) pair containing the preprocessed dataset and
    the graph that maps the input to the output data.
  """
  # Expand is currently a plain composition of AnalyzeDataset and
  # TransformDataset. Future versions could do something more optimal, e.g.
  # caching the values of expensive computations done in AnalyzeDataset.
  analyzer = AnalyzeDataset(self._preprocessing_fn)
  transform_fn = dataset | 'AnalyzeDataset' >> analyzer

  if Context.get_use_deep_copy_optimization():
    data, metadata = dataset

    # Obviates unnecessary data materialization when the input data source is
    # safe to read more than once.
    logging.info(
        'Deep copying the dataset before applying transformation')
    dataset = (deep_copy.deep_copy(data), metadata)

  transformer = TransformDataset(
      output_record_batches=self._output_record_batches)
  transformed_dataset = (
      (dataset, transform_fn) | 'TransformDataset' >> transformer)
  return transformed_dataset, transform_fn
from_runner_api classmethod
from_runner_api(proto, context)
Source code in apache_beam/transforms/ptransform.py
@classmethod
def from_runner_api(cls,
                    proto,  # type: Optional[beam_runner_api_pb2.PTransform]
                    context  # type: PipelineContext
                   ):
  # type: (...) -> Optional[PTransform]
  """Builds a transform from `proto` using the constructor for its URN.

  Returns None when `proto` is None, its spec is None, or the spec's URN is
  empty. Otherwise the (parameter_type, constructor) pair is looked up in
  `cls._known_urns` by URN, and the constructor is called with the proto,
  the payload parsed as `parameter_type`, and `context`.
  """
  has_urn = (
      proto is not None and proto.spec is not None and proto.spec.urn)
  if not has_urn:
    return None

  parameter_type, constructor = cls._known_urns[proto.spec.urn]
  parsed_payload = proto_utils.parse_Bytes(proto.spec.payload, parameter_type)
  return constructor(proto, parsed_payload, context)
get_resource_hints
get_resource_hints()
Source code in apache_beam/transforms/ptransform.py
def get_resource_hints(self):
  # type: () -> dict[str, bytes]
  """Returns this instance's resource hints dict, creating it if missing."""
  if '_resource_hints' in self.__dict__:
    return self._resource_hints
  # PTransform subclasses don't always call super(), so prefer lazy
  # initialization. By default, transforms don't have any resource hints.
  self._resource_hints = {}  # type: dict[str, bytes]
  return self._resource_hints
get_type_hints
get_type_hints()

Gets and/or initializes type hints for this object.

If type hints have not been set, attempts to initialize type hints in this order:

- Using self.default_type_hints().
- Using self.__class__ type hints.

Source code in apache_beam/typehints/decorators.py
def get_type_hints(self):
  """Gets and/or initializes type hints for this object.

  Hints already set on the object take precedence. Anything missing is
  filled in from these defaults, in order:
  - self.default_type_hints().
  - self.__class__ type hints.
  """
  own_hints = self._get_or_create_type_hints()
  with_instance_defaults = own_hints.with_defaults(self.default_type_hints())
  return with_instance_defaults.with_defaults(get_type_hints(self.__class__))
get_windowing
get_windowing(inputs)

Returns the window function to be associated with transform's output.

By default most transforms just return the windowing function associated with the input PCollection (or the first input if several).

Source code in apache_beam/transforms/ptransform.py
def get_windowing(self, inputs):
  # type: (Any) -> Windowing

  """Returns the window function to be associated with transform's output.

  By default most transforms just return the windowing function associated
  with the input PCollection (or the first input if several). When `inputs`
  is empty, a `Windowing` over `GlobalWindows` is returned instead.
  """
  if not inputs:
    from apache_beam.transforms.core import Windowing
    from apache_beam.transforms.window import GlobalWindows
    # TODO(robertwb): Return something compatible with every windowing?
    return Windowing(GlobalWindows())
  return inputs[0].windowing
infer_output_type
infer_output_type(unused_input_type)
Source code in apache_beam/transforms/ptransform.py
def infer_output_type(self, unused_input_type):
  """Returns the simple output type from the type hints, else `typehints.Any`.

  The input type is not consulted.
  """
  hinted_output = self.get_type_hints().simple_output_type(self.label)
  return hinted_output or typehints.Any
register_urn classmethod
register_urn(urn, parameter_type)
register_urn(urn, parameter_type)
register_urn(urn, parameter_type, constructor)
register_urn(urn, parameter_type, constructor)
register_urn(urn, parameter_type, constructor=None)
Source code in apache_beam/transforms/ptransform.py
@classmethod
def register_urn(cls, urn, parameter_type, constructor=None):
  """Registers a constructor for `urn`, as a statement or as a decorator.

  With `constructor` given, it is registered immediately and None is
  returned. Without it, a decorator is returned that registers whatever it
  is applied to. If the registered object is a class, its
  `from_runner_api_parameter` attribute is registered in its place.
  """
  def register(constructor):
    if not isinstance(constructor, type):
      cls._known_urns[urn] = (parameter_type, constructor)
      return constructor
    # For a class, register its from_runner_api_parameter instead.
    constructor.from_runner_api_parameter = register(
        constructor.from_runner_api_parameter)
    return constructor

  if not constructor:
    # Used as a decorator.
    return register
  # Used as a statement.
  register(constructor)
runner_api_requires_keyed_input
runner_api_requires_keyed_input()
Source code in apache_beam/transforms/ptransform.py
def runner_api_requires_keyed_input(self):
  """Always returns False in this implementation."""
  requires_keyed_input = False
  return requires_keyed_input
to_runner_api
to_runner_api(context, has_parts=False, **extra_kwargs)
Source code in apache_beam/transforms/ptransform.py
def to_runner_api(self, context, has_parts=False, **extra_kwargs):
  # type: (PipelineContext, bool, Any) -> beam_runner_api_pb2.FunctionSpec
  """Builds the `FunctionSpec` (URN plus payload) for this transform.

  The URN and parameter come from `to_runner_api_parameter`. If that yields
  the generic composite URN and `has_parts` is false, the pickled form from
  `to_runner_api_pickled` is used instead. The payload is the serialized
  bytes for a protobuf `Message`, the UTF-8 encoding for a `str`, and the
  parameter itself otherwise.
  """
  from apache_beam.portability.api import beam_runner_api_pb2
  # typing: only ParDo supports extra_kwargs
  urn, typed_param = self.to_runner_api_parameter(context, **extra_kwargs)
  if urn == python_urns.GENERIC_COMPOSITE_TRANSFORM and not has_parts:
    # TODO(https://github.com/apache/beam/issues/18713): Remove this fallback.
    urn, typed_param = self.to_runner_api_pickled(context)

  if isinstance(typed_param, message.Message):
    payload = typed_param.SerializeToString()
  elif isinstance(typed_param, str):
    payload = typed_param.encode('utf-8')
  else:
    payload = typed_param
  return beam_runner_api_pb2.FunctionSpec(urn=urn, payload=payload)
to_runner_api_parameter
to_runner_api_parameter(unused_context)
Source code in apache_beam/transforms/ptransform.py
def to_runner_api_parameter(
    self,
    unused_context  # type: PipelineContext
):
  # type: (...) -> tuple[str, Optional[Union[message.Message, bytes, str]]]
  """Returns the generic composite URN and a debugging payload.

  The payload is the `_fn_api_payload` attribute when the instance has one,
  otherwise `str(self)`. The context argument is not used.
  """
  urn = python_urns.GENERIC_COMPOSITE_TRANSFORM
  # The payload here is just to ease debugging.
  payload = getattr(self, '_fn_api_payload', str(self))
  return urn, payload
to_runner_api_pickled
to_runner_api_pickled(context)
Source code in apache_beam/transforms/ptransform.py
def to_runner_api_pickled(self, context):
  # type: (PipelineContext) -> tuple[str, bytes]
  """Returns the pickled-transform URN paired with this transform, pickled.

  Args:
    context: Supplies `enable_best_effort_deterministic_pickling`, which is
      forwarded to `pickler.dumps` as `enable_best_effort_determinism`.
  """
  urn = python_urns.PICKLED_TRANSFORM
  deterministic = context.enable_best_effort_deterministic_pickling
  pickled_self = pickler.dumps(
      self, enable_best_effort_determinism=deterministic)
  return urn, pickled_self
type_check_inputs
type_check_inputs(pvalueish)
Source code in apache_beam/transforms/ptransform.py
def type_check_inputs(self, pvalueish):
  """Runs `type_check_inputs_or_outputs` on `pvalueish` in 'input' mode."""
  direction = 'input'
  self.type_check_inputs_or_outputs(pvalueish, direction)
type_check_inputs_or_outputs
type_check_inputs_or_outputs(pvalueish, input_or_output)
Source code in apache_beam/transforms/ptransform.py
def type_check_inputs_or_outputs(self, pvalueish, input_or_output):
  """Checks element types found in `pvalueish` against declared type hints.

  Args:
    pvalueish: The value(s) to check, walked together with the hints by
      `_ZipPValues().visit`.
    input_or_output: Selects the hints to use: the attribute named
      `<input_or_output>_types` is read from `self.get_type_hints()`.

  Raises:
    TypeCheckError: If both positional and keyword hints are present, or if
      an element type is not consistent with its hint.
  """
  hints = getattr(self.get_type_hints(), input_or_output + '_types')
  if hints is None or not any(hints):
    # No hints declared for this direction, so there is nothing to check.
    return

  positional_hints, keyword_hints = hints
  if positional_hints and keyword_hints:
    raise TypeCheckError(
        'PTransform cannot have both positional and keyword type hints '
        'without overriding %s._type_check_%s()' %
        (self.__class__, input_or_output))

  if len(positional_hints) == 1:
    root_hint = positional_hints[0]
  else:
    root_hint = positional_hints or keyword_hints

  for context, pvalue_, hint in _ZipPValues().visit(pvalueish, root_hint):
    if isinstance(pvalue_, DoOutputsTuple):
      continue
    if pvalue_.element_type is None:
      # TODO(robertwb): It's a bug that we ever get here. (typecheck)
      continue
    if not hint or typehints.is_consistent_with(pvalue_.element_type, hint):
      continue
    at_context = ''
    if context:
      at_context = ' %s %s' % (input_or_output, context)
    raise TypeCheckError(
        '{type} type hint violation at {label}{context}: expected {hint}, '
        'got {actual_type}'.format(
            type=input_or_output.title(),
            label=self.label,
            context=at_context,
            hint=hint,
            actual_type=pvalue_.element_type))
type_check_outputs
type_check_outputs(pvalueish)
Source code in apache_beam/transforms/ptransform.py
def type_check_outputs(self, pvalueish):
  """Runs `type_check_inputs_or_outputs` on `pvalueish` in 'output' mode."""
  direction = 'output'
  self.type_check_inputs_or_outputs(pvalueish, direction)
with_input_types
with_input_types(input_type_hint)

Annotates the input type of a :class:PTransform with a type-hint.

PARAMETER DESCRIPTION
input_type_hint

An instance of an allowed built-in type, a custom class, or an instance of a :class:~apache_beam.typehints.typehints.TypeConstraint.

TYPE: type

RAISES DESCRIPTION
TypeError

If input_type_hint is not a valid type-hint. See :obj:apache_beam.typehints.typehints.validate_composite_type_param() for further details.

RETURNS DESCRIPTION
PTransform

A reference to the instance of this particular :class:PTransform object. This allows chaining type-hinting related methods.

Source code in apache_beam/transforms/ptransform.py
def with_input_types(self, input_type_hint):
  """Annotates the input type of a :class:`PTransform` with a type-hint.

  The hint is first converted to a Beam type and validated, then handed to
  the parent class's ``with_input_types``.

  Args:
    input_type_hint (type): An instance of an allowed built-in type, a custom
      class, or an instance of a
      :class:`~apache_beam.typehints.typehints.TypeConstraint`.

  Raises:
    TypeError: If **input_type_hint** is not a valid type-hint.
      See
      :obj:`apache_beam.typehints.typehints.validate_composite_type_param()`
      for further details.

  Returns:
    PTransform: A reference to the instance of this particular
    :class:`PTransform` object. This allows chaining type-hinting related
    methods.
  """
  beam_hint = native_type_compatibility.convert_to_beam_type(input_type_hint)
  validate_composite_type_param(beam_hint, 'Type hints for a PTransform')
  return super().with_input_types(beam_hint)
with_output_types
with_output_types(type_hint)

Annotates the output type of a :class:PTransform with a type-hint.

PARAMETER DESCRIPTION
type_hint

An instance of an allowed built-in type, a custom class, or a :class:~apache_beam.typehints.typehints.TypeConstraint.

TYPE: type

RAISES DESCRIPTION
TypeError

If type_hint is not a valid type-hint. See :obj:~apache_beam.typehints.typehints.validate_composite_type_param() for further details.

RETURNS DESCRIPTION
PTransform

A reference to the instance of this particular :class:PTransform object. This allows chaining type-hinting related methods.

Source code in apache_beam/transforms/ptransform.py
def with_output_types(self, type_hint):
  """Annotates the output type of a :class:`PTransform` with a type-hint.

  The hint is first converted to a Beam type and validated, then handed to
  the parent class's ``with_output_types``.

  Args:
    type_hint (type): An instance of an allowed built-in type, a custom class,
      or a :class:`~apache_beam.typehints.typehints.TypeConstraint`.

  Raises:
    TypeError: If **type_hint** is not a valid type-hint. See
      :obj:`~apache_beam.typehints.typehints.validate_composite_type_param()`
      for further details.

  Returns:
    PTransform: A reference to the instance of this particular
    :class:`PTransform` object. This allows chaining type-hinting related
    methods.
  """
  beam_hint = native_type_compatibility.convert_to_beam_type(type_hint)
  validate_composite_type_param(beam_hint, 'Type hints for a PTransform')
  return super().with_output_types(beam_hint)
with_resource_hints
with_resource_hints(**kwargs)

Adds resource hints to the :class:PTransform.

Resource hints allow users to express constraints on the environment where the transform should be executed. Interpretation of the resource hints is defined by Beam Runners. Runners may ignore the unsupported hints.

PARAMETER DESCRIPTION
**kwargs

key-value pairs describing hints and their values.

DEFAULT: {}

RAISES DESCRIPTION
ValueError

if provided hints are unknown to the SDK. See :mod:apache_beam.transforms.resources for a list of known hints.

RETURNS DESCRIPTION
PTransform

A reference to the instance of this particular :class:PTransform object.

Source code in apache_beam/transforms/ptransform.py
def with_resource_hints(self, **kwargs):  # type: (...) -> PTransform
  """Adds resource hints to the :class:`PTransform`.

  Resource hints allow users to express constraints on the environment where
  the transform should be executed.  Interpretation of the resource hints is
  defined by Beam Runners. Runners may ignore the unsupported hints.

  Args:
    **kwargs: key-value pairs describing hints and their values.

  Raises:
    ValueError: if provided hints are unknown to the SDK. See
      :mod:`apache_beam.transforms.resources` for a list of known hints.

  Returns:
    PTransform: A reference to the instance of this particular
    :class:`PTransform` object.
  """
  # Fetch the hints dict before parsing, as the one-line form did.
  current_hints = self.get_resource_hints()
  current_hints.update(resources.parse_resource_hints(kwargs))
  return self

AnalyzeDataset

AnalyzeDataset(preprocessing_fn, pipeline=None)

Bases: _AnalyzeDatasetCommon

Takes a preprocessing_fn and computes the relevant statistics.

AnalyzeDataset accepts a preprocessing_fn in its constructor. When its expand method is called on a dataset, it computes all the relevant statistics required to run the transformation described by the preprocessing_fn, and returns a TransformFn representing the application of the preprocessing_fn.

Init method.

PARAMETER DESCRIPTION
preprocessing_fn

A function that accepts and returns a dictionary from strings to Tensors, SparseTensors, or RaggedTensors.

pipeline

(Optional) a beam Pipeline.

DEFAULT: None

Source code in tensorflow_transform/beam/impl.py
def __init__(self, preprocessing_fn, pipeline=None):
  """Stores the preprocessing function, pipeline and `Context` settings.

  The save options and the TF compat-v1 flag are read from `Context` at
  construction time. A warning helper is invoked when compat-v1 is in use.

  Args:
    preprocessing_fn: A function that accepts and returns a dictionary from
      strings to `Tensor`s, `SparseTensor`s, or `RaggedTensor`s.
    pipeline: (Optional) a beam Pipeline.
  """
  self._preprocessing_fn = preprocessing_fn
  self.pipeline = pipeline
  self._save_options = Context.get_save_options()
  use_compat_v1 = Context.get_use_tf_compat_v1()
  self._use_tf_compat_v1 = use_compat_v1
  if use_compat_v1:
    _warn_about_tf_compat_v1()
Attributes
label property writable
label
pipeline instance-attribute
pipeline = pipeline
side_inputs class-attribute instance-attribute
side_inputs = ()
Functions
annotations
annotations() -> dict[str, Union[bytes, str, Message]]
Source code in apache_beam/transforms/ptransform.py
def annotations(self) -> dict[str, Union[bytes, str, message.Message]]:
  """Returns a one-entry dict naming this instance's Python class.

  The 'python_type' entry holds the class's module, a dot, and the class's
  ``__qualname__``.
  """
  module_name = self.__class__.__module__
  qualified_name = self.__class__.__qualname__
  return {'python_type': f'{module_name}.{qualified_name}'}
default_label
default_label()
Source code in apache_beam/transforms/ptransform.py
def default_label(self):
  # type: () -> str
  """Returns the name of this instance's class."""
  own_class = self.__class__
  return own_class.__name__
default_type_hints
default_type_hints()
Source code in apache_beam/transforms/ptransform.py
def default_type_hints(self):
  """Returns the class's type hints, defaulting to hints taken from `expand`.

  Hints are derived from `self.expand` with `IOTypeHints.from_callable` and,
  when present, passed through `strip_pcoll()`. They are then supplied as
  defaults to the hints obtained from `get_type_hints(self.__class__)`.
  """
  expand_hints = IOTypeHints.from_callable(self.expand)
  if expand_hints is not None:
    expand_hints = expand_hints.strip_pcoll()

  class_hints = get_type_hints(self.__class__)
  # Prefer class decorator type hints for backwards compatibility.
  return class_hints.with_defaults(expand_hints)
display_data
display_data()

Returns the display data associated to a pipeline component.

It should be reimplemented in pipeline components that wish to have static display data.

RETURNS DESCRIPTION

Dict[str, Any]: A dictionary containing key:value pairs. The value might be an integer, float or string value; a :class:DisplayDataItem for values that have more data (e.g. short value, label, url); or a :class:HasDisplayData instance that has more display data that should be picked up. For example::

{
  'key1': 'string_value',
  'key2': 1234,
  'key3': 3.14159265,
  'key4': DisplayDataItem('apache.org', url='http://apache.org'),
  'key5': subComponent
}

Source code in apache_beam/transforms/display.py
def display_data(self):
  # type: () -> dict

  """Returns the display data associated to a pipeline component.

  Pipeline components that wish to have static display data should
  reimplement this method. This implementation returns an empty dictionary.

  Returns:
    Dict[str, Any]: A dictionary containing ``key:value`` pairs.
    The value might be an integer, float or string value; a
    :class:`DisplayDataItem` for values that have more data
    (e.g. short value, label, url); or a :class:`HasDisplayData` instance
    that has more display data that should be picked up. For example::

      {
        'key1': 'string_value',
        'key2': 1234,
        'key3': 3.14159265,
        'key4': DisplayDataItem('apache.org', url='http://apache.org'),
        'key5': subComponent
      }
  """
  static_items = {}
  return static_items
expand
expand(dataset)

Analyze the dataset.

PARAMETER DESCRIPTION
dataset

A dataset.

RETURNS DESCRIPTION

A TransformFn containing the deferred transform function.

RAISES DESCRIPTION
ValueError

If preprocessing_fn has no outputs.

Source code in tensorflow_transform/beam/impl.py
def expand(self, dataset):
  """Analyze the dataset.

  Unpacks the (values, metadata) pair and delegates to the parent class's
  `expand`, passing None for the two middle positions of its 4-tuple input.
  The cache returned alongside the result is asserted to be empty.

  Args:
    dataset: A dataset.

  Returns:
    A TransformFn containing the deferred transform function.

  Raises:
    ValueError: If preprocessing_fn has no outputs.
  """
  values, metadata = dataset
  analysis_input = (values, None, None, metadata)
  transform_fn, cache = super().expand(analysis_input)
  assert not cache
  return transform_fn
from_runner_api classmethod
from_runner_api(proto, context)
Source code in apache_beam/transforms/ptransform.py
@classmethod
def from_runner_api(cls,
                    proto,  # type: Optional[beam_runner_api_pb2.PTransform]
                    context  # type: PipelineContext
                   ):
  # type: (...) -> Optional[PTransform]
  """Builds a transform from `proto` using the constructor for its URN.

  Returns None when `proto` is None, its spec is None, or the spec's URN is
  empty. Otherwise the (parameter_type, constructor) pair is looked up in
  `cls._known_urns` by URN, and the constructor is called with the proto,
  the payload parsed as `parameter_type`, and `context`.
  """
  has_urn = (
      proto is not None and proto.spec is not None and proto.spec.urn)
  if not has_urn:
    return None

  parameter_type, constructor = cls._known_urns[proto.spec.urn]
  parsed_payload = proto_utils.parse_Bytes(proto.spec.payload, parameter_type)
  return constructor(proto, parsed_payload, context)
get_resource_hints
get_resource_hints()
Source code in apache_beam/transforms/ptransform.py
def get_resource_hints(self):
  # type: () -> dict[str, bytes]
  """Returns this instance's resource hints dict, creating it if missing."""
  if '_resource_hints' in self.__dict__:
    return self._resource_hints
  # PTransform subclasses don't always call super(), so prefer lazy
  # initialization. By default, transforms don't have any resource hints.
  self._resource_hints = {}  # type: dict[str, bytes]
  return self._resource_hints
get_type_hints
get_type_hints()

Gets and/or initializes type hints for this object.

If type hints have not been set, attempts to initialize type hints in this order:

- Using self.default_type_hints().
- Using self.__class__ type hints.

Source code in apache_beam/typehints/decorators.py
def get_type_hints(self):
  """Gets and/or initializes type hints for this object.

  Hints already set on the object take precedence. Anything missing is
  filled in from these defaults, in order:
  - self.default_type_hints().
  - self.__class__ type hints.
  """
  own_hints = self._get_or_create_type_hints()
  with_instance_defaults = own_hints.with_defaults(self.default_type_hints())
  return with_instance_defaults.with_defaults(get_type_hints(self.__class__))
get_windowing
get_windowing(inputs)

Returns the window function to be associated with transform's output.

By default most transforms just return the windowing function associated with the input PCollection (or the first input if several).

Source code in apache_beam/transforms/ptransform.py
def get_windowing(self, inputs):
  # type: (Any) -> Windowing

  """Returns the window function to be associated with transform's output.

  By default most transforms just return the windowing function associated
  with the input PCollection (or the first input if several). When `inputs`
  is empty, a `Windowing` over `GlobalWindows` is returned instead.
  """
  if not inputs:
    from apache_beam.transforms.core import Windowing
    from apache_beam.transforms.window import GlobalWindows
    # TODO(robertwb): Return something compatible with every windowing?
    return Windowing(GlobalWindows())
  return inputs[0].windowing
infer_output_type
infer_output_type(unused_input_type)
Source code in apache_beam/transforms/ptransform.py
def infer_output_type(self, unused_input_type):
  """Returns the simple output type from the type hints, else `typehints.Any`.

  The input type is not consulted.
  """
  hinted_output = self.get_type_hints().simple_output_type(self.label)
  return hinted_output or typehints.Any
register_urn classmethod
register_urn(urn, parameter_type)
register_urn(urn, parameter_type)
register_urn(urn, parameter_type, constructor)
register_urn(urn, parameter_type, constructor)
register_urn(urn, parameter_type, constructor=None)
Source code in apache_beam/transforms/ptransform.py
@classmethod
def register_urn(cls, urn, parameter_type, constructor=None):
  """Registers a constructor for `urn`, as a statement or as a decorator.

  With `constructor` given, it is registered immediately and None is
  returned. Without it, a decorator is returned that registers whatever it
  is applied to. If the registered object is a class, its
  `from_runner_api_parameter` attribute is registered in its place.
  """
  def register(constructor):
    if not isinstance(constructor, type):
      cls._known_urns[urn] = (parameter_type, constructor)
      return constructor
    # For a class, register its from_runner_api_parameter instead.
    constructor.from_runner_api_parameter = register(
        constructor.from_runner_api_parameter)
    return constructor

  if not constructor:
    # Used as a decorator.
    return register
  # Used as a statement.
  register(constructor)
runner_api_requires_keyed_input
runner_api_requires_keyed_input()
Source code in apache_beam/transforms/ptransform.py
def runner_api_requires_keyed_input(self):
  """Always returns False in this implementation."""
  requires_keyed_input = False
  return requires_keyed_input
to_runner_api
to_runner_api(context, has_parts=False, **extra_kwargs)
Source code in apache_beam/transforms/ptransform.py
def to_runner_api(self, context, has_parts=False, **extra_kwargs):
  # type: (PipelineContext, bool, Any) -> beam_runner_api_pb2.FunctionSpec
  """Builds the `FunctionSpec` (URN plus payload) for this transform.

  The URN and parameter come from `to_runner_api_parameter`. If that yields
  the generic composite URN and `has_parts` is false, the pickled form from
  `to_runner_api_pickled` is used instead. The payload is the serialized
  bytes for a protobuf `Message`, the UTF-8 encoding for a `str`, and the
  parameter itself otherwise.
  """
  from apache_beam.portability.api import beam_runner_api_pb2
  # typing: only ParDo supports extra_kwargs
  urn, typed_param = self.to_runner_api_parameter(context, **extra_kwargs)
  if urn == python_urns.GENERIC_COMPOSITE_TRANSFORM and not has_parts:
    # TODO(https://github.com/apache/beam/issues/18713): Remove this fallback.
    urn, typed_param = self.to_runner_api_pickled(context)

  if isinstance(typed_param, message.Message):
    payload = typed_param.SerializeToString()
  elif isinstance(typed_param, str):
    payload = typed_param.encode('utf-8')
  else:
    payload = typed_param
  return beam_runner_api_pb2.FunctionSpec(urn=urn, payload=payload)
to_runner_api_parameter
to_runner_api_parameter(unused_context)
Source code in apache_beam/transforms/ptransform.py
def to_runner_api_parameter(
    self,
    unused_context  # type: PipelineContext
):
  # type: (...) -> tuple[str, Optional[Union[message.Message, bytes, str]]]
  """Returns the generic composite URN and a debugging payload.

  The payload is the `_fn_api_payload` attribute when the instance has one,
  otherwise `str(self)`. The context argument is not used.
  """
  urn = python_urns.GENERIC_COMPOSITE_TRANSFORM
  # The payload here is just to ease debugging.
  payload = getattr(self, '_fn_api_payload', str(self))
  return urn, payload
to_runner_api_pickled
to_runner_api_pickled(context)
Source code in apache_beam/transforms/ptransform.py
def to_runner_api_pickled(self, context):
  # type: (PipelineContext) -> tuple[str, bytes]
  """Returns the pickled-transform URN paired with this transform, pickled.

  Args:
    context: Supplies `enable_best_effort_deterministic_pickling`, which is
      forwarded to `pickler.dumps` as `enable_best_effort_determinism`.
  """
  urn = python_urns.PICKLED_TRANSFORM
  deterministic = context.enable_best_effort_deterministic_pickling
  pickled_self = pickler.dumps(
      self, enable_best_effort_determinism=deterministic)
  return urn, pickled_self
type_check_inputs
type_check_inputs(pvalueish)
Source code in apache_beam/transforms/ptransform.py
def type_check_inputs(self, pvalueish):
  """Runs `type_check_inputs_or_outputs` on `pvalueish` in 'input' mode."""
  direction = 'input'
  self.type_check_inputs_or_outputs(pvalueish, direction)
type_check_inputs_or_outputs
type_check_inputs_or_outputs(pvalueish, input_or_output)
Source code in apache_beam/transforms/ptransform.py
def type_check_inputs_or_outputs(self, pvalueish, input_or_output):
  """Checks element types found in `pvalueish` against declared type hints.

  Args:
    pvalueish: The value(s) to check, walked together with the hints by
      `_ZipPValues().visit`.
    input_or_output: Selects the hints to use: the attribute named
      `<input_or_output>_types` is read from `self.get_type_hints()`.

  Raises:
    TypeCheckError: If both positional and keyword hints are present, or if
      an element type is not consistent with its hint.
  """
  hints = getattr(self.get_type_hints(), input_or_output + '_types')
  if hints is None or not any(hints):
    # No hints declared for this direction, so there is nothing to check.
    return

  positional_hints, keyword_hints = hints
  if positional_hints and keyword_hints:
    raise TypeCheckError(
        'PTransform cannot have both positional and keyword type hints '
        'without overriding %s._type_check_%s()' %
        (self.__class__, input_or_output))

  if len(positional_hints) == 1:
    root_hint = positional_hints[0]
  else:
    root_hint = positional_hints or keyword_hints

  for context, pvalue_, hint in _ZipPValues().visit(pvalueish, root_hint):
    if isinstance(pvalue_, DoOutputsTuple):
      continue
    if pvalue_.element_type is None:
      # TODO(robertwb): It's a bug that we ever get here. (typecheck)
      continue
    if not hint or typehints.is_consistent_with(pvalue_.element_type, hint):
      continue
    at_context = ''
    if context:
      at_context = ' %s %s' % (input_or_output, context)
    raise TypeCheckError(
        '{type} type hint violation at {label}{context}: expected {hint}, '
        'got {actual_type}'.format(
            type=input_or_output.title(),
            label=self.label,
            context=at_context,
            hint=hint,
            actual_type=pvalue_.element_type))
type_check_outputs
type_check_outputs(pvalueish)
Source code in apache_beam/transforms/ptransform.py
def type_check_outputs(self, pvalueish):
  """Runs `type_check_inputs_or_outputs` on `pvalueish` in 'output' mode."""
  direction = 'output'
  self.type_check_inputs_or_outputs(pvalueish, direction)
with_input_types
with_input_types(input_type_hint)

Annotates the input type of a :class:PTransform with a type-hint.

PARAMETER DESCRIPTION
input_type_hint

An instance of an allowed built-in type, a custom class, or an instance of a :class:~apache_beam.typehints.typehints.TypeConstraint.

TYPE: type

RAISES DESCRIPTION
TypeError

If input_type_hint is not a valid type-hint. See :obj:apache_beam.typehints.typehints.validate_composite_type_param() for further details.

RETURNS DESCRIPTION
PTransform

A reference to the instance of this particular :class:PTransform object. This allows chaining type-hinting related methods.

Source code in apache_beam/transforms/ptransform.py
def with_input_types(self, input_type_hint):
  """Annotates the input type of a :class:`PTransform` with a type-hint.

  The hint is first converted to a Beam type and validated, then handed to
  the parent class's ``with_input_types``.

  Args:
    input_type_hint (type): An instance of an allowed built-in type, a custom
      class, or an instance of a
      :class:`~apache_beam.typehints.typehints.TypeConstraint`.

  Raises:
    TypeError: If **input_type_hint** is not a valid type-hint.
      See
      :obj:`apache_beam.typehints.typehints.validate_composite_type_param()`
      for further details.

  Returns:
    PTransform: A reference to the instance of this particular
    :class:`PTransform` object. This allows chaining type-hinting related
    methods.
  """
  beam_hint = native_type_compatibility.convert_to_beam_type(input_type_hint)
  validate_composite_type_param(beam_hint, 'Type hints for a PTransform')
  return super().with_input_types(beam_hint)
with_output_types
with_output_types(type_hint)

Annotates the output type of a :class:PTransform with a type-hint.

PARAMETER DESCRIPTION
type_hint

An instance of an allowed built-in type, a custom class, or a :class:~apache_beam.typehints.typehints.TypeConstraint.

TYPE: type

RAISES DESCRIPTION
TypeError

If type_hint is not a valid type-hint. See :obj:~apache_beam.typehints.typehints.validate_composite_type_param() for further details.

RETURNS DESCRIPTION
PTransform

A reference to the instance of this particular :class:PTransform object. This allows chaining type-hinting related methods.

Source code in apache_beam/transforms/ptransform.py
def with_output_types(self, type_hint):
  """Annotates the output type of a :class:`PTransform` with a type-hint.

  The hint is first converted to a Beam type and validated, then handed to
  the parent class's ``with_output_types``.

  Args:
    type_hint (type): An instance of an allowed built-in type, a custom class,
      or a :class:`~apache_beam.typehints.typehints.TypeConstraint`.

  Raises:
    TypeError: If **type_hint** is not a valid type-hint. See
      :obj:`~apache_beam.typehints.typehints.validate_composite_type_param()`
      for further details.

  Returns:
    PTransform: A reference to the instance of this particular
    :class:`PTransform` object. This allows chaining type-hinting related
    methods.
  """
  beam_hint = native_type_compatibility.convert_to_beam_type(type_hint)
  validate_composite_type_param(beam_hint, 'Type hints for a PTransform')
  return super().with_output_types(beam_hint)
with_resource_hints
with_resource_hints(**kwargs)

Adds resource hints to the :class:PTransform.

Resource hints allow users to express constraints on the environment where the transform should be executed. Interpretation of the resource hints is defined by Beam Runners. Runners may ignore the unsupported hints.

PARAMETER DESCRIPTION
**kwargs

key-value pairs describing hints and their values.

DEFAULT: {}

RAISES DESCRIPTION
ValueError

if provided hints are unknown to the SDK. See :mod:apache_beam.transforms.resources for a list of known hints.

RETURNS DESCRIPTION
PTransform

A reference to the instance of this particular :class:PTransform object.

Source code in apache_beam/transforms/ptransform.py
def with_resource_hints(self, **kwargs):  # type: (...) -> PTransform
  """Merge the given resource hints into this :class:`PTransform`.

  Resource hints let users express constraints on the environment in which
  the transform should run. How a hint is interpreted is up to the Beam
  Runner, and Runners may ignore hints they do not support.

  Args:
    **kwargs: key-value pairs describing hints and their values.

  Raises:
    ValueError: if provided hints are unknown to the SDK. See
      :mod:`apache_beam.transforms.resources` for a list of known hints.

  Returns:
    PTransform: This same :class:`PTransform` object.
  """
  current_hints = self.get_resource_hints()
  current_hints.update(resources.parse_resource_hints(kwargs))
  return self

AnalyzeDatasetWithCache

AnalyzeDatasetWithCache(preprocessing_fn, pipeline=None)

Bases: _AnalyzeDatasetCommon

Takes a preprocessing_fn and computes the relevant statistics.

WARNING: This is experimental.

Operates similarly to AnalyzeDataset, by computing the required statistics except this will not re-compute statistics when they are already cached, and will write out cache for statistics that it does compute whenever possible.

Example use:

>>> span_0_key = tft_beam.analyzer_cache.DatasetKey('span-0')
>>> cache_dir = tempfile.mkdtemp()
>>> output_path = os.path.join(tempfile.mkdtemp(), 'result')
>>> def preprocessing_fn(inputs):
...   x = inputs['x']
...   return {'x_mean': tft.mean(x, name='x') + tf.zeros_like(x)}
>>> feature_spec = {'x': tf.io.FixedLenFeature([], tf.float32)}
>>> input_metadata = tft.DatasetMetadata.from_feature_spec(feature_spec)
>>> input_data_dict_0 = {span_0_key: [{'x': x} for x in range(6)]}
>>> input_data_dict_1 = {span_0_key: [{'x': x} for x in range(6, 11)]}
>>> empty_input_cache = {}
>>> with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
...   with beam.Pipeline() as p:
...     # Iteration #0:
...     transform_fn, output_cache = (
...         (input_data_dict_0, empty_input_cache, input_metadata)
...         | tft_beam.AnalyzeDatasetWithCache(preprocessing_fn))
...     output_cache | tft_beam.analyzer_cache.WriteAnalysisCacheToFS(
...         p, cache_dir)
...
...     # Iteration #1:
...     input_cache = p | tft_beam.analyzer_cache.ReadAnalysisCacheFromFS(
...         cache_dir, [span_0_key])
...     transform_fn, output_cache = (
...         (input_data_dict_1, input_cache, input_metadata)
...         | tft_beam.AnalyzeDatasetWithCache(preprocessing_fn))
...     output_cache | tft_beam.analyzer_cache.WriteAnalysisCacheToFS(
...         p, cache_dir)
...
...     # Applying the accumulated transformation:
...     transform_data = p | beam.Create(input_data_dict_0[span_0_key])
...     transformed_dataset = (
...         ((transform_data, input_metadata), transform_fn)
...         | tft_beam.TransformDataset())
...     transformed_data, transformed_metadata = transformed_dataset
...     (transformed_data
...         | beam.combiners.Sample.FixedSizeGlobally(1)
...         | beam.io.WriteToText(output_path, shard_name_template=''))
>>> with open(output_path) as f:
...   f.read()
"[{'x_mean': 5.0}]\n"

Init method.

PARAMETER DESCRIPTION
preprocessing_fn

A function that accepts and returns a dictionary from strings to Tensors, SparseTensors, or RaggedTensors.

pipeline

(Optional) a beam Pipeline.

DEFAULT: None

Source code in tensorflow_transform/beam/impl.py
def __init__(self, preprocessing_fn, pipeline=None):
  """Store the preprocessing function and snapshot the Context settings.

  Args:
    preprocessing_fn: A function that accepts and returns a dictionary from
      strings to `Tensor`s, `SparseTensor`s, or `RaggedTensor`s.
    pipeline: (Optional) a beam Pipeline.
  """
  self._preprocessing_fn = preprocessing_fn
  self.pipeline = pipeline
  self._save_options = Context.get_save_options()
  use_compat_v1 = Context.get_use_tf_compat_v1()
  self._use_tf_compat_v1 = use_compat_v1
  if use_compat_v1:
    _warn_about_tf_compat_v1()
Attributes
label property writable
label
pipeline instance-attribute
pipeline = pipeline
side_inputs class-attribute instance-attribute
side_inputs = ()
Functions
annotations
annotations() -> dict[str, Union[bytes, str, Message]]
Source code in apache_beam/transforms/ptransform.py
def annotations(self) -> dict[str, Union[bytes, str, message.Message]]:
  """Return annotations holding this object's fully qualified class name."""
  klass = self.__class__
  return {'python_type': f'{klass.__module__}.{klass.__qualname__}'}
default_label
default_label()
Source code in apache_beam/transforms/ptransform.py
def default_label(self):
  # type: () -> str
  """Return the name of this object's class, used as the default label."""
  owner_class = self.__class__
  return owner_class.__name__
default_type_hints
default_type_hints()
Source code in apache_beam/transforms/ptransform.py
def default_type_hints(self):
  """Combine class-level type hints with hints derived from ``expand``.

  Hints read from ``self.expand`` (with PCollection wrappers stripped, when
  any hints were found) only fill in what the class-level hints leave unset.
  """
  expand_hints = IOTypeHints.from_callable(self.expand)
  if expand_hints is not None:
    expand_hints = expand_hints.strip_pcoll()

  # Prefer class decorator type hints for backwards compatibility.
  class_hints = get_type_hints(self.__class__)
  return class_hints.with_defaults(expand_hints)
display_data
display_data()

Returns the display data associated to a pipeline component.

It should be reimplemented in pipeline components that wish to have static display data.

RETURNS DESCRIPTION

Dict[str, Any]: A dictionary containing key:value pairs. The value might be an integer, float or string value; a :class:DisplayDataItem for values that have more data (e.g. short value, label, url); or a :class:HasDisplayData instance that has more display data that should be picked up. For example:

{
  'key1': 'string_value',
  'key2': 1234,
  'key3': 3.14159265,
  'key4': DisplayDataItem('apache.org', url='http://apache.org'),
  'key5': subComponent
}

Source code in apache_beam/transforms/display.py
def display_data(self):
  # type: () -> dict

  """Returns the display data associated to a pipeline component.

  It should be reimplemented in pipeline components that wish to have
  static display data. This base implementation returns an empty dict.

  Returns:
    Dict[str, Any]: A dictionary containing ``key:value`` pairs.
    The value might be an integer, float or string value; a
    :class:`DisplayDataItem` for values that have more data
    (e.g. short value, label, url); or a :class:`HasDisplayData` instance
    that has more display data that should be picked up. For example::

      {
        'key1': 'string_value',
        'key2': 1234,
        'key3': 3.14159265,
        'key4': DisplayDataItem('apache.org', url='http://apache.org'),
        'key5': subComponent
      }
  """
  return {}
expand
expand(dataset)

Analyze the dataset.

PARAMETER DESCRIPTION
dataset

A dataset.

RETURNS DESCRIPTION

A TransformFn containing the deferred transform function.

RAISES DESCRIPTION
ValueError

If preprocessing_fn has no outputs.

Source code in tensorflow_transform/beam/impl.py
def expand(self, dataset):
  """Validate the dataset keys, then delegate to the parent ``expand``.

  Args:
    dataset: An indexable whose element at index 1 is a mapping keyed by
      dataset key; a falsy value there is treated as an empty dict.

  Returns:
    Whatever the parent class's ``expand`` returns for
    ``self._make_parent_dataset(dataset)``.
  """
  keyed_inputs = dataset[1] or {}
  analyzer_cache.validate_dataset_keys(keyed_inputs.keys())
  parent_dataset = self._make_parent_dataset(dataset)
  return super().expand(parent_dataset)
from_runner_api classmethod
from_runner_api(proto, context)
Source code in apache_beam/transforms/ptransform.py
@classmethod
def from_runner_api(cls,
                    proto,  # type: Optional[beam_runner_api_pb2.PTransform]
                    context  # type: PipelineContext
                   ):
  # type: (...) -> Optional[PTransform]
  """Rebuild a transform from its runner API proto.

  Returns ``None`` when ``proto`` is ``None``, has no spec, or has an empty
  URN. Otherwise the constructor registered in ``cls._known_urns`` for the
  URN is called with the proto, the parsed payload and ``context``.
  """
  if proto is None:
    return None
  spec = proto.spec
  if spec is None or not spec.urn:
    return None

  parameter_type, constructor = cls._known_urns[spec.urn]
  parsed_payload = proto_utils.parse_Bytes(spec.payload, parameter_type)
  return constructor(proto, parsed_payload, context)
get_resource_hints
get_resource_hints()
Source code in apache_beam/transforms/ptransform.py
def get_resource_hints(self):
  # type: () -> dict[str, bytes]
  """Return this instance's resource hints dict, creating it on first use."""
  already_initialized = '_resource_hints' in self.__dict__
  if not already_initialized:
    # PTransform subclasses don't always call super(), so the dict is created
    # lazily here. By default, transforms don't have any resource hints.
    self._resource_hints = {}  # type: dict[str, bytes]
  return self._resource_hints
get_type_hints
get_type_hints()

Gets and/or initializes type hints for this object.

If type hints have not been set, attempts to initialize type hints in this order:

- Using self.default_type_hints().
- Using self.__class__ type hints.

Source code in apache_beam/typehints/decorators.py
def get_type_hints(self):
  """Gets and/or initializes type hints for this object.

  Hints not already set on the object are filled in from these sources, in
  this order:
  - self.default_type_hints().
  - self.__class__ type hints.
  """
  own_hints = self._get_or_create_type_hints()
  with_instance_defaults = own_hints.with_defaults(self.default_type_hints())
  return with_instance_defaults.with_defaults(get_type_hints(self.__class__))
get_windowing
get_windowing(inputs)

Returns the window function to be associated with transform's output.

By default most transforms just return the windowing function associated with the input PCollection (or the first input if several).

Source code in apache_beam/transforms/ptransform.py
def get_windowing(self, inputs):
  # type: (Any) -> Windowing

  """Returns the window function to be associated with transform's output.

  When ``inputs`` is non-empty, the windowing of the first input is returned.
  Otherwise a ``Windowing`` built on ``GlobalWindows`` is returned.
  """
  if inputs:
    return inputs[0].windowing

  # Imported lazily, only on the no-input path.
  from apache_beam.transforms.core import Windowing
  from apache_beam.transforms.window import GlobalWindows
  # TODO(robertwb): Return something compatible with every windowing?
  return Windowing(GlobalWindows())
infer_output_type
infer_output_type(unused_input_type)
Source code in apache_beam/transforms/ptransform.py
def infer_output_type(self, unused_input_type):
  """Return the declared simple output type, or ``typehints.Any`` if falsy."""
  declared_output = self.get_type_hints().simple_output_type(self.label)
  return declared_output or typehints.Any
register_urn classmethod
register_urn(urn, parameter_type)
register_urn(urn, parameter_type)
register_urn(urn, parameter_type, constructor)
register_urn(urn, parameter_type, constructor)
register_urn(urn, parameter_type, constructor=None)
Source code in apache_beam/transforms/ptransform.py
@classmethod
def register_urn(cls, urn, parameter_type, constructor=None):
  """Register a constructor for ``urn`` in ``cls._known_urns``.

  Works either as a statement (``constructor`` given; returns ``None``) or as
  a decorator (``constructor`` omitted; returns the registering function).
  When the constructor is a class, its ``from_runner_api_parameter``
  attribute is registered instead and the class itself is returned.
  """
  def register(constructor):
    if not isinstance(constructor, type):
      cls._known_urns[urn] = (parameter_type, constructor)
      return constructor
    # A class was given: register its from_runner_api_parameter instead.
    constructor.from_runner_api_parameter = register(
        constructor.from_runner_api_parameter)
    return constructor

  if not constructor:
    # Used as a decorator.
    return register
  # Used as a statement.
  register(constructor)
  return None
runner_api_requires_keyed_input
runner_api_requires_keyed_input()
Source code in apache_beam/transforms/ptransform.py
def runner_api_requires_keyed_input(self):
  """Return ``False``; this implementation never reports a keyed-input need."""
  return False
to_runner_api
to_runner_api(context, has_parts=False, **extra_kwargs)
Source code in apache_beam/transforms/ptransform.py
def to_runner_api(self, context, has_parts=False, **extra_kwargs):
  # type: (PipelineContext, bool, Any) -> beam_runner_api_pb2.FunctionSpec
  """Build the ``FunctionSpec`` proto describing this transform.

  The URN and parameter come from ``to_runner_api_parameter``. A generic
  composite transform without parts falls back to the pickled form. The
  payload is the serialized message, the UTF-8 encoded string, or the
  parameter unchanged, depending on its type.
  """
  from apache_beam.portability.api import beam_runner_api_pb2
  # typing: only ParDo supports extra_kwargs
  urn, typed_param = self.to_runner_api_parameter(context, **extra_kwargs)
  if urn == python_urns.GENERIC_COMPOSITE_TRANSFORM and not has_parts:
    # TODO(https://github.com/apache/beam/issues/18713): Remove this fallback.
    urn, typed_param = self.to_runner_api_pickled(context)

  if isinstance(typed_param, message.Message):
    payload = typed_param.SerializeToString()
  elif isinstance(typed_param, str):
    payload = typed_param.encode('utf-8')
  else:
    payload = typed_param
  return beam_runner_api_pb2.FunctionSpec(urn=urn, payload=payload)
to_runner_api_parameter
to_runner_api_parameter(unused_context)
Source code in apache_beam/transforms/ptransform.py
def to_runner_api_parameter(
    self,
    unused_context  # type: PipelineContext
):
  # type: (...) -> tuple[str, Optional[Union[message.Message, bytes, str]]]
  """Return the generic composite URN paired with a debugging payload.

  The payload is ``self._fn_api_payload`` when that attribute exists, and
  ``str(self)`` otherwise.
  """
  urn = python_urns.GENERIC_COMPOSITE_TRANSFORM
  # The payload here is just to ease debugging.
  fallback_payload = str(self)
  payload = getattr(self, '_fn_api_payload', fallback_payload)
  return (urn, payload)
to_runner_api_pickled
to_runner_api_pickled(context)
Source code in apache_beam/transforms/ptransform.py
def to_runner_api_pickled(self, context):
  # type: (PipelineContext) -> tuple[str, bytes]
  """Return the pickled-transform URN paired with this object, pickled.

  The pickler's ``enable_best_effort_determinism`` option is taken from
  ``context.enable_best_effort_deterministic_pickling``.
  """
  urn = python_urns.PICKLED_TRANSFORM
  deterministic = context.enable_best_effort_deterministic_pickling
  pickled_self = pickler.dumps(
      self, enable_best_effort_determinism=deterministic)
  return (urn, pickled_self)
type_check_inputs
type_check_inputs(pvalueish)
Source code in apache_beam/transforms/ptransform.py
def type_check_inputs(self, pvalueish):
  """Type-check ``pvalueish`` against this transform's ``'input'`` hints."""
  self.type_check_inputs_or_outputs(pvalueish, 'input')
type_check_inputs_or_outputs
type_check_inputs_or_outputs(pvalueish, input_or_output)
Source code in apache_beam/transforms/ptransform.py
def type_check_inputs_or_outputs(self, pvalueish, input_or_output):
  """Check element types found in ``pvalueish`` against the declared hints.

  Args:
    pvalueish: The value(s) to check; walked together with the hints via
      ``_ZipPValues().visit``.
    input_or_output: Selects which hints to read, namely the attribute
      ``<input_or_output>_types`` of this object's type hints (e.g.
      ``'input'`` or ``'output'``). Also used in error messages.

  Raises:
    TypeCheckError: If both positional and keyword hints are present, or if
      a value's ``element_type`` is not consistent with its hint.
  """
  type_hints = self.get_type_hints()
  hints = getattr(type_hints, input_or_output + '_types')
  # Nothing declared (or only empty hint containers): nothing to check.
  if hints is None or not any(hints):
    return
  arg_hints, kwarg_hints = hints
  if arg_hints and kwarg_hints:
    raise TypeCheckError(
        'PTransform cannot have both positional and keyword type hints '
        'without overriding %s._type_check_%s()' %
        (self.__class__, input_or_output))
  # Parsed as: arg_hints[0] if exactly one positional hint, otherwise
  # (arg_hints or kwarg_hints).
  root_hint = (
      arg_hints[0] if len(arg_hints) == 1 else arg_hints or kwarg_hints)
  for context, pvalue_, hint in _ZipPValues().visit(pvalueish, root_hint):
    # DoOutputsTuple values are skipped rather than checked.
    if isinstance(pvalue_, DoOutputsTuple):
      continue
    if pvalue_.element_type is None:
      # TODO(robertwb): It's a bug that we ever get here. (typecheck)
      continue
    # A falsy hint means no constraint for this value.
    if hint and not typehints.is_consistent_with(pvalue_.element_type, hint):
      at_context = ' %s %s' % (input_or_output, context) if context else ''
      raise TypeCheckError(
          '{type} type hint violation at {label}{context}: expected {hint}, '
          'got {actual_type}'.format(
              type=input_or_output.title(),
              label=self.label,
              context=at_context,
              hint=hint,
              actual_type=pvalue_.element_type))
type_check_outputs
type_check_outputs(pvalueish)
Source code in apache_beam/transforms/ptransform.py
def type_check_outputs(self, pvalueish):
  """Type-check ``pvalueish`` against this transform's ``'output'`` hints."""
  self.type_check_inputs_or_outputs(pvalueish, 'output')
with_input_types
with_input_types(input_type_hint)

Annotates the input type of a :class:PTransform with a type-hint.

PARAMETER DESCRIPTION
input_type_hint

An instance of an allowed built-in type, a custom class, or an instance of a :class:~apache_beam.typehints.typehints.TypeConstraint.

TYPE: type

RAISES DESCRIPTION
TypeError

If input_type_hint is not a valid type-hint. See :obj:apache_beam.typehints.typehints.validate_composite_type_param() for further details.

RETURNS DESCRIPTION
PTransform

A reference to the instance of this particular :class:PTransform object. This allows chaining type-hinting related methods.

Source code in apache_beam/transforms/ptransform.py
def with_input_types(self, input_type_hint):
  """Attach an input type-hint to this :class:`PTransform`.

  The hint is converted to its Beam equivalent and validated before being
  forwarded to the parent implementation.

  Args:
    input_type_hint (type): An allowed built-in type, a custom class, or an
      instance of a
      :class:`~apache_beam.typehints.typehints.TypeConstraint`.

  Raises:
    TypeError: If **input_type_hint** is not a valid type-hint.
      See
      :obj:`apache_beam.typehints.typehints.validate_composite_type_param()`
      for further details.

  Returns:
    PTransform: A reference to this :class:`PTransform` instance, which
    allows chaining type-hinting related methods.
  """
  beam_hint = native_type_compatibility.convert_to_beam_type(input_type_hint)
  validate_composite_type_param(beam_hint, 'Type hints for a PTransform')
  return super().with_input_types(beam_hint)
with_output_types
with_output_types(type_hint)

Annotates the output type of a :class:PTransform with a type-hint.

PARAMETER DESCRIPTION
type_hint

An instance of an allowed built-in type, a custom class, or a :class:~apache_beam.typehints.typehints.TypeConstraint.

TYPE: type

RAISES DESCRIPTION
TypeError

If type_hint is not a valid type-hint. See :obj:~apache_beam.typehints.typehints.validate_composite_type_param() for further details.

RETURNS DESCRIPTION
PTransform

A reference to the instance of this particular :class:PTransform object. This allows chaining type-hinting related methods.

Source code in apache_beam/transforms/ptransform.py
def with_output_types(self, type_hint):
  """Attach an output type-hint to this :class:`PTransform`.

  The hint is converted to its Beam equivalent and validated before being
  forwarded to the parent implementation.

  Args:
    type_hint (type): An allowed built-in type, a custom class, or a
      :class:`~apache_beam.typehints.typehints.TypeConstraint`.

  Raises:
    TypeError: If **type_hint** is not a valid type-hint. See
      :obj:`~apache_beam.typehints.typehints.validate_composite_type_param()`
      for further details.

  Returns:
    PTransform: A reference to this :class:`PTransform` instance, which
    allows chaining type-hinting related methods.
  """
  beam_hint = native_type_compatibility.convert_to_beam_type(type_hint)
  validate_composite_type_param(beam_hint, 'Type hints for a PTransform')
  return super().with_output_types(beam_hint)
with_resource_hints
with_resource_hints(**kwargs)

Adds resource hints to the :class:PTransform.

Resource hints allow users to express constraints on the environment where the transform should be executed. Interpretation of the resource hints is defined by Beam Runners. Runners may ignore the unsupported hints.

PARAMETER DESCRIPTION
**kwargs

key-value pairs describing hints and their values.

DEFAULT: {}

RAISES DESCRIPTION
ValueError

if provided hints are unknown to the SDK. See :mod:apache_beam.transforms.resources for a list of known hints.

RETURNS DESCRIPTION
PTransform

A reference to the instance of this particular :class:PTransform object.

Source code in apache_beam/transforms/ptransform.py
def with_resource_hints(self, **kwargs):  # type: (...) -> PTransform
  """Merge the given resource hints into this :class:`PTransform`.

  Resource hints let users express constraints on the environment in which
  the transform should run. How a hint is interpreted is up to the Beam
  Runner, and Runners may ignore hints they do not support.

  Args:
    **kwargs: key-value pairs describing hints and their values.

  Raises:
    ValueError: if provided hints are unknown to the SDK. See
      :mod:`apache_beam.transforms.resources` for a list of known hints.

  Returns:
    PTransform: This same :class:`PTransform` object.
  """
  current_hints = self.get_resource_hints()
  current_hints.update(resources.parse_resource_hints(kwargs))
  return self

Context

Context(
    temp_dir: Optional[str] = None,
    desired_batch_size: Optional[int] = None,
    passthrough_keys: Optional[Iterable[str]] = None,
    use_deep_copy_optimization: Optional[bool] = None,
    force_tf_compat_v1: Optional[bool] = None,
    save_options: Optional[SaveOptions] = None,
)

Context manager for tensorflow-transform.

All the attributes in this context are kept on a thread local state.

ATTRIBUTE DESCRIPTION
temp_dir

(Optional) The temporary directory used within this block.

desired_batch_size

(Optional) A batch size to batch elements by. If not provided, a batch size will be computed automatically.

passthrough_keys

(Optional) A set of strings that are keys to instances that should pass through the pipeline and be hidden from the preprocessing_fn. This should only be used in cases where additional information should be attached to instances in the pipeline which should not be part of the transformation graph, instance keys is one such example.

use_deep_copy_optimization

(Optional) If True, makes deep copies of PCollections that are used in multiple TFT phases.

force_tf_compat_v1

(Optional) If True, TFT's public APIs (e.g. AnalyzeDataset) will use Tensorflow in compat.v1 mode irrespective of installed version of Tensorflow. Defaults to False.

save_options

(Optional) If set, the tf.saved_model.SaveOptions to save the transform_fn with. Only applies for TF2.

Note that the temp dir should be accessible to worker jobs, e.g. if running with the Cloud Dataflow runner, the temp dir should be on GCS and should have permissions that allow both launcher and workers to access it.

Source code in tensorflow_transform/beam/context.py
def __init__(self,
             temp_dir: Optional[str] = None,
             desired_batch_size: Optional[int] = None,
             passthrough_keys: Optional[Iterable[str]] = None,
             use_deep_copy_optimization: Optional[bool] = None,
             force_tf_compat_v1: Optional[bool] = None,
             save_options: Optional[tf.saved_model.SaveOptions] = None):
  """Record the requested settings, creating the state stack if needed.

  If the thread-local storage has no (truthy) ``state`` yet, a new state
  stack is installed and seeded with one frame whose fields are all ``None``.
  The arguments are only stored on the instance here.
  """
  if not getattr(self._thread_local, 'state', None):
    state_stack = self._StateStack()
    self._thread_local.state = state_stack
    field_count = len(dataclasses.fields(self._State))
    state_stack.frames.append(self._State(*((None,) * field_count)))

  self._temp_dir = temp_dir
  self._desired_batch_size = desired_batch_size
  self._passthrough_keys = passthrough_keys
  self._use_deep_copy_optimization = use_deep_copy_optimization
  self._force_tf_compat_v1 = force_tf_compat_v1
  self._save_options = save_options
Functions
create_base_temp_dir classmethod
create_base_temp_dir() -> str

Generate a temporary location.

Source code in tensorflow_transform/beam/context.py
@classmethod
def create_base_temp_dir(cls) -> str:
  """Create and return the base temporary directory.

  The directory is ``cls._TEMP_SUBDIR`` under the ``temp_dir`` of the
  topmost state frame.

  Raises:
    ValueError: If no temp dir has been set.
  """
  temp_root = cls._get_topmost_state_frame().temp_dir
  if not temp_root:
    raise ValueError(
        'A tf.Transform function that required a temp dir was called but no '
        'temp dir was set.  To set a temp dir use the impl.Context context '
        'manager.')

  base_temp_dir = os.path.join(temp_root, cls._TEMP_SUBDIR)
  # TODO(b/35363519): Perhaps use Beam IO eventually?
  tf.io.gfile.makedirs(base_temp_dir)
  return base_temp_dir
get_desired_batch_size classmethod
get_desired_batch_size() -> Optional[int]

Retrieves a user set fixed batch size, None if not set.

Source code in tensorflow_transform/beam/context.py
@classmethod
def get_desired_batch_size(cls) -> Optional[int]:
  """Return the user-set fixed batch size, or ``None`` if it is not set.

  A set value is also logged at info level.
  """
  batch_size = cls._get_topmost_state_frame().desired_batch_size
  if batch_size is None:
    return None
  tf.compat.v1.logging.info('Using fixed batch size: %d', batch_size)
  return batch_size
get_passthrough_keys classmethod
get_passthrough_keys() -> Iterable[str]

Retrieves the user-set passthrough_keys, or an empty set if not set.

Source code in tensorflow_transform/beam/context.py
@classmethod
def get_passthrough_keys(cls) -> Iterable[str]:
  """Retrieves a user set passthrough_keys, None if not set."""
  state = cls._get_topmost_state_frame()
  if state.passthrough_keys is not None:
    return state.passthrough_keys
  return set()
get_save_options classmethod
get_save_options() -> Optional[SaveOptions]

Retrieves a user set save_options, None if not set.

Source code in tensorflow_transform/beam/context.py
@classmethod
def get_save_options(cls) -> Optional[tf.saved_model.SaveOptions]:
  """Retrieves a user set save_options, None if not set."""
  state = cls._get_topmost_state_frame()
  if state.save_options is not None:
    tf.compat.v1.logging.info('Using save_options: %s', state.save_options)
    return state.save_options
  return None
get_use_deep_copy_optimization classmethod
get_use_deep_copy_optimization() -> bool

Retrieves the user-set use_deep_copy_optimization, or False if not set.

Source code in tensorflow_transform/beam/context.py
@classmethod
def get_use_deep_copy_optimization(cls) -> bool:
  """Retrieves a user set use_deep_copy_optimization, None if not set."""
  state = cls._get_topmost_state_frame()
  if state.use_deep_copy_optimization is not None:
    return state.use_deep_copy_optimization
  return False
get_use_tf_compat_v1 classmethod
get_use_tf_compat_v1() -> bool

Computes use_tf_compat_v1 from TF environment and force_tf_compat_v1.

Source code in tensorflow_transform/beam/context.py
@classmethod
def get_use_tf_compat_v1(cls) -> bool:
  """Computes use_tf_compat_v1 from TF environment and force_tf_compat_v1."""
  force_tf_compat_v1 = cls._get_force_tf_compat_v1()
  return tf2_utils.use_tf_compat_v1(force_tf_compat_v1)

EncodeTransformedDataset

EncodeTransformedDataset(label=None)

Bases: PTransform

Encodes transformed data into serialized tf.Examples.

Should operate on the output of TransformDataset, this can operate on either record batch or instance dict data. The expected input is a (transformed_data, transformed_metadata) tuple.

Example use:

>>> def preprocessing_fn(inputs):
...   return {'x_scaled': tft.scale_to_z_score(inputs['x'], name='x')}
>>> raw_data = [dict(x=1), dict(x=2), dict(x=3)]
>>> feature_spec = dict(x=tf.io.FixedLenFeature([], tf.int64))
>>> raw_data_metadata = tft.DatasetMetadata.from_feature_spec(feature_spec)
>>> output_path = os.path.join(tempfile.mkdtemp(), 'result')
>>> with beam.Pipeline() as p:
...   with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
...     data_pcoll = p | beam.Create(raw_data)
...     transformed_dataset, transform_fn = (
...         (data_pcoll, raw_data_metadata)
...         | tft_beam.AnalyzeAndTransformDataset(preprocessing_fn))
...     _ = (
...         transformed_dataset
...         | tft_beam.EncodeTransformedDataset()
...         | beam.io.WriteToTFRecord(output_path, shard_name_template=''))
>>> result_feature_spec = {'x_scaled': tf.io.FixedLenFeature([], tf.float32)}
>>> list(tf.data.TFRecordDataset([output_path])
...      .map(lambda x: tf.io.parse_example(x, result_feature_spec))
...      .as_numpy_iterator())
[{'x_scaled': -1.2247448}, {'x_scaled': 0.0}, {'x_scaled': 1.2247448}]

Source code in apache_beam/transforms/ptransform.py
def __init__(self, label=None):
  # type: (Optional[str]) -> None
  """Initialize the parent class and store the optional label.

  Args:
    label: Value assigned to ``self.label``; defaults to ``None``.
  """
  super().__init__()
  self.label = label  # type: ignore # https://github.com/python/mypy/issues/3004
Attributes
label property writable
label
pipeline class-attribute instance-attribute
pipeline = None
side_inputs class-attribute instance-attribute
side_inputs = ()
Functions
annotations
annotations() -> dict[str, Union[bytes, str, Message]]
Source code in apache_beam/transforms/ptransform.py
def annotations(self) -> dict[str, Union[bytes, str, message.Message]]:
  """Return annotations holding this object's fully qualified class name."""
  klass = self.__class__
  return {'python_type': f'{klass.__module__}.{klass.__qualname__}'}
default_label
default_label()
Source code in apache_beam/transforms/ptransform.py
def default_label(self):
  # type: () -> str
  """Return the name of this object's class, used as the default label."""
  owner_class = self.__class__
  return owner_class.__name__
default_type_hints
default_type_hints()
Source code in apache_beam/transforms/ptransform.py
def default_type_hints(self):
  """Combine class-level type hints with hints derived from ``expand``.

  Hints read from ``self.expand`` (with PCollection wrappers stripped, when
  any hints were found) only fill in what the class-level hints leave unset.
  """
  expand_hints = IOTypeHints.from_callable(self.expand)
  if expand_hints is not None:
    expand_hints = expand_hints.strip_pcoll()

  # Prefer class decorator type hints for backwards compatibility.
  class_hints = get_type_hints(self.__class__)
  return class_hints.with_defaults(expand_hints)
display_data
display_data()

Returns the display data associated to a pipeline component.

It should be reimplemented in pipeline components that wish to have static display data.

RETURNS DESCRIPTION

Dict[str, Any]: A dictionary containing key:value pairs. The value might be an integer, float or string value; a :class:DisplayDataItem for values that have more data (e.g. short value, label, url); or a :class:HasDisplayData instance that has more display data that should be picked up. For example:

{
  'key1': 'string_value',
  'key2': 1234,
  'key3': 3.14159265,
  'key4': DisplayDataItem('apache.org', url='http://apache.org'),
  'key5': subComponent
}

Source code in apache_beam/transforms/display.py
def display_data(self):
  # type: () -> dict

  """Returns the display data associated to a pipeline component.

  It should be reimplemented in pipeline components that wish to have
  static display data. This base implementation returns an empty dict.

  Returns:
    Dict[str, Any]: A dictionary containing ``key:value`` pairs.
    The value might be an integer, float or string value; a
    :class:`DisplayDataItem` for values that have more data
    (e.g. short value, label, url); or a :class:`HasDisplayData` instance
    that has more display data that should be picked up. For example::

      {
        'key1': 'string_value',
        'key2': 1234,
        'key3': 3.14159265,
        'key4': DisplayDataItem('apache.org', url='http://apache.org'),
        'key5': subComponent
      }
  """
  return {}
expand
expand(transformed_data_and_metadata)
Source code in tensorflow_transform/beam/impl.py
def expand(self, transformed_data_and_metadata):
  """Encode transformed data using a coder built from its deferred schema.

  Args:
    transformed_data_and_metadata: A ``(transformed_data,
      transformed_metadata)`` pair. The metadata must expose
      ``deferred_metadata`` and ``dataset_metadata``.

  Returns:
    The result of applying the chosen encoding transform to
    ``transformed_data``.
  """

  transformed_data, transformed_metadata = transformed_data_and_metadata

  # Extract the schema from each deferred metadata element.
  deferred_schema = (
      transformed_metadata.deferred_metadata
      | 'GetDeferredSchema' >> beam.Map(lambda m: m.schema))

  # The flag on the dataset metadata selects record-batch vs. instance
  # encoding.
  if transformed_metadata.dataset_metadata._output_record_batches:  # pylint: disable=protected-access
    transformed_data_coder_pcol = (
        deferred_schema | 'RecordBatchToExamplesEncoder' >> beam.Map(
            example_coder.RecordBatchToExamplesEncoder))
    # FlatMap: each input element may yield several encoded outputs. The
    # coder is supplied as a singleton side input.
    encode_ptransform = 'EncodeRecordBatches' >> beam.FlatMap(
        # Dropping passthrough features.
        lambda elem, coder: coder.encode(elem[0]),
        coder=beam.pvalue.AsSingleton(transformed_data_coder_pcol))
  else:
    transformed_data_coder_pcol = (
        deferred_schema
        | 'ExampleProtoCoder' >> beam.Map(
            example_proto_coder.ExampleProtoCoder))
    # Map: one encoded output per input element, again with the coder as a
    # singleton side input.
    encode_ptransform = 'EncodeInstances' >> beam.Map(
        lambda data, data_coder: data_coder.encode(data),
        data_coder=beam.pvalue.AsSingleton(transformed_data_coder_pcol))

  return transformed_data | encode_ptransform
from_runner_api classmethod
from_runner_api(proto, context)
Source code in apache_beam/transforms/ptransform.py
@classmethod
def from_runner_api(cls,
                    proto,  # type: Optional[beam_runner_api_pb2.PTransform]
                    context  # type: PipelineContext
                   ):
  # type: (...) -> Optional[PTransform]
  """Rebuild a transform from its runner API proto.

  Returns ``None`` when ``proto`` is ``None``, has no spec, or has an empty
  URN. Otherwise the constructor registered in ``cls._known_urns`` for the
  URN is called with the proto, the parsed payload and ``context``.
  """
  if proto is None:
    return None
  spec = proto.spec
  if spec is None or not spec.urn:
    return None

  parameter_type, constructor = cls._known_urns[spec.urn]
  parsed_payload = proto_utils.parse_Bytes(spec.payload, parameter_type)
  return constructor(proto, parsed_payload, context)
get_resource_hints
get_resource_hints()
Source code in apache_beam/transforms/ptransform.py
def get_resource_hints(self):
  # type: () -> dict[str, bytes]
  """Return this instance's resource hints dict, creating it on first use."""
  already_initialized = '_resource_hints' in self.__dict__
  if not already_initialized:
    # PTransform subclasses don't always call super(), so the dict is created
    # lazily here. By default, transforms don't have any resource hints.
    self._resource_hints = {}  # type: dict[str, bytes]
  return self._resource_hints
get_type_hints
get_type_hints()

Gets and/or initializes type hints for this object.

If type hints have not been set, attempts to initialize type hints in this order:

- Using self.default_type_hints().
- Using self.__class__ type hints.

Source code in apache_beam/typehints/decorators.py
def get_type_hints(self):
  """Gets and/or initializes type hints for this object.

  Hints not already set on the object are filled in from these sources, in
  this order:
  - self.default_type_hints().
  - self.__class__ type hints.
  """
  own_hints = self._get_or_create_type_hints()
  with_instance_defaults = own_hints.with_defaults(self.default_type_hints())
  return with_instance_defaults.with_defaults(get_type_hints(self.__class__))
get_windowing
get_windowing(inputs)

Returns the window function to be associated with transform's output.

By default most transforms just return the windowing function associated with the input PCollection (or the first input if several).

Source code in apache_beam/transforms/ptransform.py
def get_windowing(self, inputs):
  # type: (Any) -> Windowing

  """Returns the window function to be associated with transform's output.

  When ``inputs`` is non-empty, the windowing of the first input is returned.
  Otherwise a ``Windowing`` built on ``GlobalWindows`` is returned.
  """
  if inputs:
    return inputs[0].windowing

  # Imported lazily, only on the no-input path.
  from apache_beam.transforms.core import Windowing
  from apache_beam.transforms.window import GlobalWindows
  # TODO(robertwb): Return something compatible with every windowing?
  return Windowing(GlobalWindows())
infer_output_type
infer_output_type(unused_input_type)
Source code in apache_beam/transforms/ptransform.py
def infer_output_type(self, unused_input_type):
  """Returns the declared simple output type, or ``typehints.Any``.

  The input type is ignored. The result comes from this transform's type
  hints, and ``typehints.Any`` is used when they yield a falsy value.
  """
  declared_output = self.get_type_hints().simple_output_type(self.label)
  return declared_output if declared_output else typehints.Any
register_urn classmethod
register_urn(urn, parameter_type)
register_urn(urn, parameter_type)
register_urn(urn, parameter_type, constructor)
register_urn(urn, parameter_type, constructor)
register_urn(urn, parameter_type, constructor=None)
Source code in apache_beam/transforms/ptransform.py
@classmethod
def register_urn(cls, urn, parameter_type, constructor=None):
  """Registers a constructor for ``urn`` in ``cls._known_urns``.

  Usable as a statement (``constructor`` given, returns ``None``) or as a
  decorator (``constructor`` omitted, returns the registering function).

  When the thing being registered is a class, its
  ``from_runner_api_parameter`` attribute is registered instead and
  re-assigned on the class. Otherwise ``(parameter_type, constructor)`` is
  stored under ``urn``.
  """
  def register(constructor):
    if not isinstance(constructor, type):
      cls._known_urns[urn] = parameter_type, constructor
      return constructor
    # A class: register its from_runner_api_parameter hook instead.
    constructor.from_runner_api_parameter = register(
        constructor.from_runner_api_parameter)
    return constructor

  if not constructor:
    # Used as a decorator.
    return register
  # Used as a statement.
  register(constructor)
runner_api_requires_keyed_input
runner_api_requires_keyed_input()
Source code in apache_beam/transforms/ptransform.py
def runner_api_requires_keyed_input(self):
  """Returns False unconditionally.

  NOTE(review): presumably overridden by transforms whose runner API form
  needs keyed input; confirm against subclasses.
  """
  return False
to_runner_api
to_runner_api(context, has_parts=False, **extra_kwargs)
Source code in apache_beam/transforms/ptransform.py
def to_runner_api(self, context, has_parts=False, **extra_kwargs):
  # type: (PipelineContext, bool, Any) -> beam_runner_api_pb2.FunctionSpec
  """Builds the ``FunctionSpec`` proto describing this transform.

  Args:
    context: Passed to ``to_runner_api_parameter`` and, on fallback, to
      ``to_runner_api_pickled``.
    has_parts: When false and the URN is the generic composite URN, the
      transform is pickled instead.
    **extra_kwargs: Forwarded to ``to_runner_api_parameter``.

  Returns:
    A ``FunctionSpec`` whose payload is the serialized message, the UTF-8
    encoding of a ``str`` parameter, or the parameter unchanged.
  """
  from apache_beam.portability.api import beam_runner_api_pb2
  # typing: only ParDo supports extra_kwargs
  urn, typed_param = self.to_runner_api_parameter(context, **extra_kwargs)
  use_pickle_fallback = (
      urn == python_urns.GENERIC_COMPOSITE_TRANSFORM and not has_parts)
  if use_pickle_fallback:
    # TODO(https://github.com/apache/beam/issues/18713): Remove this fallback.
    urn, typed_param = self.to_runner_api_pickled(context)

  if isinstance(typed_param, message.Message):
    payload = typed_param.SerializeToString()
  elif isinstance(typed_param, str):
    payload = typed_param.encode('utf-8')
  else:
    payload = typed_param
  return beam_runner_api_pb2.FunctionSpec(urn=urn, payload=payload)
to_runner_api_parameter
to_runner_api_parameter(unused_context)
Source code in apache_beam/transforms/ptransform.py
def to_runner_api_parameter(
    self,
    unused_context  # type: PipelineContext
):
  # type: (...) -> tuple[str, Optional[Union[message.Message, bytes, str]]]
  """Returns the generic composite URN paired with a debugging payload.

  The payload is ``self._fn_api_payload`` when that attribute exists and
  ``str(self)`` otherwise. ``str(self)`` is evaluated in both cases.
  """
  urn = python_urns.GENERIC_COMPOSITE_TRANSFORM
  # The payload here is just to ease debugging.
  debug_payload = getattr(self, '_fn_api_payload', str(self))
  return urn, debug_payload
to_runner_api_pickled
to_runner_api_pickled(context)
Source code in apache_beam/transforms/ptransform.py
def to_runner_api_pickled(self, context):
  # type: (PipelineContext) -> tuple[str, bytes]
  """Returns the pickled-transform URN paired with this transform pickled.

  Best-effort deterministic pickling is switched on or off according to
  ``context.enable_best_effort_deterministic_pickling``.
  """
  urn = python_urns.PICKLED_TRANSFORM
  pickled_self = pickler.dumps(
      self,
      enable_best_effort_determinism=context.
      enable_best_effort_deterministic_pickling,
  )
  return urn, pickled_self
type_check_inputs
type_check_inputs(pvalueish)
Source code in apache_beam/transforms/ptransform.py
def type_check_inputs(self, pvalueish):
  """Checks ``pvalueish`` against this transform's input type hints.

  Delegates to ``type_check_inputs_or_outputs`` with the ``'input'`` selector.
  """
  self.type_check_inputs_or_outputs(pvalueish, 'input')
type_check_inputs_or_outputs
type_check_inputs_or_outputs(pvalueish, input_or_output)
Source code in apache_beam/transforms/ptransform.py
def type_check_inputs_or_outputs(self, pvalueish, input_or_output):
  """Checks the element types in ``pvalueish`` against declared type hints.

  Args:
    pvalueish: The value(s) to check; walked together with the hints.
    input_or_output: ``'input'`` or ``'output'``. Selects the
      ``input_types`` or ``output_types`` attribute of the type hints.

  Raises:
    TypeCheckError: If both positional and keyword hints are declared, or
      if an element type is not consistent with its hint.
  """
  hints = getattr(self.get_type_hints(), input_or_output + '_types')
  if hints is None or not any(hints):
    return

  positional_hints, keyword_hints = hints
  if positional_hints and keyword_hints:
    raise TypeCheckError(
        'PTransform cannot have both positional and keyword type hints '
        'without overriding %s._type_check_%s()' %
        (self.__class__, input_or_output))

  if len(positional_hints) == 1:
    root_hint = positional_hints[0]
  else:
    root_hint = positional_hints or keyword_hints

  for context, pvalue_, hint in _ZipPValues().visit(pvalueish, root_hint):
    if isinstance(pvalue_, DoOutputsTuple):
      continue
    if pvalue_.element_type is None:
      # TODO(robertwb): It's a bug that we ever get here. (typecheck)
      continue
    if not hint or typehints.is_consistent_with(pvalue_.element_type, hint):
      continue
    if context:
      at_context = ' %s %s' % (input_or_output, context)
    else:
      at_context = ''
    raise TypeCheckError(
        '{type} type hint violation at {label}{context}: expected {hint}, '
        'got {actual_type}'.format(
            type=input_or_output.title(),
            label=self.label,
            context=at_context,
            hint=hint,
            actual_type=pvalue_.element_type))
type_check_outputs
type_check_outputs(pvalueish)
Source code in apache_beam/transforms/ptransform.py
def type_check_outputs(self, pvalueish):
  """Checks ``pvalueish`` against this transform's output type hints.

  Delegates to ``type_check_inputs_or_outputs`` with the ``'output'`` selector.
  """
  self.type_check_inputs_or_outputs(pvalueish, 'output')
with_input_types
with_input_types(input_type_hint)

Annotates the input type of a :class:PTransform with a type-hint.

PARAMETER DESCRIPTION
input_type_hint

An instance of an allowed built-in type, a custom class, or an instance of a :class:~apache_beam.typehints.typehints.TypeConstraint.

TYPE: type

RAISES DESCRIPTION
TypeError

If input_type_hint is not a valid type-hint. See :obj:apache_beam.typehints.typehints.validate_composite_type_param() for further details.

RETURNS DESCRIPTION
PTransform

A reference to the instance of this particular :class:PTransform object. This allows chaining type-hinting related methods.

Source code in apache_beam/transforms/ptransform.py
def with_input_types(self, input_type_hint):
  """Annotates the input type of a :class:`PTransform` with a type-hint.

  The hint is converted to a Beam type and validated before it is passed to
  the parent implementation.

  Args:
    input_type_hint (type): An instance of an allowed built-in type, a custom
      class, or an instance of a
      :class:`~apache_beam.typehints.typehints.TypeConstraint`.

  Raises:
    TypeError: If **input_type_hint** is not a valid type-hint.
      See
      :obj:`apache_beam.typehints.typehints.validate_composite_type_param()`
      for further details.

  Returns:
    PTransform: A reference to the instance of this particular
      :class:`PTransform` object. This allows chaining type-hinting related
      methods.
  """
  beam_type_hint = native_type_compatibility.convert_to_beam_type(
      input_type_hint)
  validate_composite_type_param(beam_type_hint, 'Type hints for a PTransform')
  return super().with_input_types(beam_type_hint)
with_output_types
with_output_types(type_hint)

Annotates the output type of a :class:PTransform with a type-hint.

PARAMETER DESCRIPTION
type_hint

An instance of an allowed built-in type, a custom class, or a :class:~apache_beam.typehints.typehints.TypeConstraint.

TYPE: type

RAISES DESCRIPTION
TypeError

If type_hint is not a valid type-hint. See :obj:~apache_beam.typehints.typehints.validate_composite_type_param() for further details.

RETURNS DESCRIPTION
PTransform

A reference to the instance of this particular :class:PTransform object. This allows chaining type-hinting related methods.

Source code in apache_beam/transforms/ptransform.py
def with_output_types(self, type_hint):
  """Annotates the output type of a :class:`PTransform` with a type-hint.

  The hint is converted to a Beam type and validated before it is passed to
  the parent implementation.

  Args:
    type_hint (type): An instance of an allowed built-in type, a custom class,
      or a :class:`~apache_beam.typehints.typehints.TypeConstraint`.

  Raises:
    TypeError: If **type_hint** is not a valid type-hint. See
      :obj:`~apache_beam.typehints.typehints.validate_composite_type_param()`
      for further details.

  Returns:
    PTransform: A reference to the instance of this particular
      :class:`PTransform` object. This allows chaining type-hinting related
      methods.
  """
  beam_type_hint = native_type_compatibility.convert_to_beam_type(type_hint)
  validate_composite_type_param(beam_type_hint, 'Type hints for a PTransform')
  return super().with_output_types(beam_type_hint)
with_resource_hints
with_resource_hints(**kwargs)

Adds resource hints to the :class:PTransform.

Resource hints allow users to express constraints on the environment where the transform should be executed. Interpretation of the resource hints is defined by Beam Runners. Runners may ignore the unsupported hints.

PARAMETER DESCRIPTION
**kwargs

key-value pairs describing hints and their values.

DEFAULT: {}

RAISES DESCRIPTION
ValueError

if provided hints are unknown to the SDK. See :mod:apache_beam.transforms.resources for a list of known hints.

RETURNS DESCRIPTION
PTransform

A reference to the instance of this particular :class:PTransform object.

Source code in apache_beam/transforms/ptransform.py
def with_resource_hints(self, **kwargs):  # type: (...) -> PTransform
  """Adds resource hints to the :class:`PTransform`.

  Resource hints allow users to express constraints on the environment where
  the transform should be executed.  Interpretation of the resource hints is
  defined by Beam Runners. Runners may ignore the unsupported hints.

  The parsed hints are merged into the dict returned by
  ``get_resource_hints()``.

  Args:
    **kwargs: key-value pairs describing hints and their values.

  Raises:
    ValueError: if provided hints are unknown to the SDK. See
      :mod:`apache_beam.transforms.resources` for a list of known hints.

  Returns:
    PTransform: A reference to the instance of this particular
      :class:`PTransform` object.
  """
  current_hints = self.get_resource_hints()
  current_hints.update(resources.parse_resource_hints(kwargs))
  return self

ReadTransformFn

ReadTransformFn(path)

Bases: PTransform

Reads a TransformFn written by WriteTransformFn.

Source code in tensorflow_transform/beam/tft_beam_io/transform_fn_io.py
def __init__(self, path):
  """Init method.

  Args:
    path: Location to read from, stored as `self._path`. `expand` joins it
      with the transform-fn and transformed-metadata directory names.
  """
  super().__init__()
  self._path = path
Attributes
label property writable
label
pipeline class-attribute instance-attribute
pipeline = None
side_inputs class-attribute instance-attribute
side_inputs = ()
Functions
annotations
annotations() -> dict[str, Union[bytes, str, Message]]
Source code in apache_beam/transforms/ptransform.py
def annotations(self) -> dict[str, Union[bytes, str, message.Message]]:
  """Returns annotations for this transform.

  The only entry is ``'python_type'``: the module and qualified name of the
  instance's class, joined with a dot.
  """
  klass = self.__class__
  return {'python_type': f'{klass.__module__}.{klass.__qualname__}'}
default_label
default_label()
Source code in apache_beam/transforms/ptransform.py
def default_label(self):
  # type: () -> str
  """Returns the name of the instance's class, used as the default label."""
  klass = self.__class__
  return klass.__name__
default_type_hints
default_type_hints()
Source code in apache_beam/transforms/ptransform.py
def default_type_hints(self):
  """Returns default type hints from the class and from ``expand``.

  Hints derived from the ``expand`` callable have ``strip_pcoll()`` applied
  when present. They serve only as defaults for the hints attached to the
  class.
  """
  from_expand = IOTypeHints.from_callable(self.expand)
  expand_hints = None if from_expand is None else from_expand.strip_pcoll()

  # Prefer class decorator type hints for backwards compatibility.
  class_hints = get_type_hints(self.__class__)
  return class_hints.with_defaults(expand_hints)
display_data
display_data()

Returns the display data associated to a pipeline component.

It should be reimplemented in pipeline components that wish to have static display data.

RETURNS DESCRIPTION

Dict[str, Any]: A dictionary containing key:value pairs. The value might be an integer, float or string value; a :class:DisplayDataItem for values that have more data (e.g. short value, label, url); or a :class:HasDisplayData instance that has more display data that should be picked up. For example::

{
  'key1': 'string_value',
  'key2': 1234,
  'key3': 3.14159265,
  'key4': DisplayDataItem('apache.org', url='http://apache.org'),
  'key5': subComponent
}

Source code in apache_beam/transforms/display.py
def display_data(self):
  # type: () -> dict

  """Returns the display data associated to a pipeline component.

  It should be reimplemented in pipeline components that wish to have
  static display data. This default implementation returns an empty dict.

  Returns:
    Dict[str, Any]: A dictionary containing ``key:value`` pairs.
      The value might be an integer, float or string value; a
      :class:`DisplayDataItem` for values that have more data
      (e.g. short value, label, url); or a :class:`HasDisplayData` instance
      that has more display data that should be picked up.

  Example::

      {
        'key1': 'string_value',
        'key2': 1234,
        'key3': 3.14159265,
        'key4': DisplayDataItem('apache.org', url='http://apache.org'),
        'key5': subComponent
      }
  """
  return {}
expand
expand(pvalue)
Source code in tensorflow_transform/beam/tft_beam_io/transform_fn_io.py
def expand(self, pvalue):
  """Builds the transform-fn path PCollection and reads the metadata.

  Args:
    pvalue: Value whose ``pipeline`` the path PCollection is created in.

  Returns:
    A ``(saved_model_dir_pcoll, metadata)`` tuple. The first item is a
    PCollection holding the single transform-fn directory path. The second
    is the metadata read from the transformed-metadata directory.
  """
  output_layout = tft.TFTransformOutput
  transform_fn_path = os.path.join(self._path, output_layout.TRANSFORM_FN_DIR)
  pipeline = pvalue.pipeline
  saved_model_dir_pcoll = pipeline | (
      'CreateTransformFnPath' >> beam.Create([transform_fn_path]))

  metadata_path = os.path.join(
      self._path, tft.TFTransformOutput.TRANSFORMED_METADATA_DIR)
  metadata = metadata_io.read_metadata(metadata_path)

  return saved_model_dir_pcoll, metadata
from_runner_api classmethod
from_runner_api(proto, context)
Source code in apache_beam/transforms/ptransform.py
@classmethod
def from_runner_api(cls,
                    proto,  # type: Optional[beam_runner_api_pb2.PTransform]
                    context  # type: PipelineContext
                   ):
  # type: (...) -> Optional[PTransform]
  """Rebuilds a transform from its proto using the registered constructor.

  Returns ``None`` when ``proto`` is ``None``, its ``spec`` is ``None``, or
  its URN is empty. Otherwise the constructor registered for the URN in
  ``cls._known_urns`` is called with the proto, the parsed payload and the
  context.
  """
  if proto is None or proto.spec is None or not proto.spec.urn:
    return None

  spec = proto.spec
  parameter_type, constructor = cls._known_urns[spec.urn]
  parsed_payload = proto_utils.parse_Bytes(spec.payload, parameter_type)
  return constructor(proto, parsed_payload, context)
get_resource_hints
get_resource_hints()
Source code in apache_beam/transforms/ptransform.py
def get_resource_hints(self):
  # type: () -> dict[str, bytes]
  """Returns the resource hints dict attached to this transform.

  The dict lives on the instance as ``_resource_hints``. It is created empty
  on first access, and the same mutable dict is returned on later calls.
  """
  if '_resource_hints' in self.__dict__:
    return self._resource_hints
  # PTransform subclasses don't always call super(), so prefer lazy
  # initialization. By default, transforms don't have any resource hints.
  self._resource_hints = {}  # type: dict[str, bytes]
  return self._resource_hints
get_type_hints
get_type_hints()

Gets and/or initializes type hints for this object.

If type hints have not been set, attempts to initialize type hints in this order:

- Using self.default_type_hints().
- Using self.__class__ type hints.

Source code in apache_beam/typehints/decorators.py
def get_type_hints(self):
  """Gets and/or initializes type hints for this object.

  Hints already set on the instance take precedence. Anything missing is
  filled in from these defaults, in this order:

  - ``self.default_type_hints()``.
  - The type hints of ``self.__class__``.
  """
  instance_hints = self._get_or_create_type_hints()
  with_instance_defaults = instance_hints.with_defaults(
      self.default_type_hints())
  # This call resolves to the module-level get_type_hints(), not this method.
  return with_instance_defaults.with_defaults(get_type_hints(self.__class__))
get_windowing
get_windowing(inputs)

Returns the window function to be associated with transform's output.

By default most transforms just return the windowing function associated with the input PCollection (or the first input if several).

Source code in apache_beam/transforms/ptransform.py
def get_windowing(self, inputs):
  # type: (Any) -> Windowing

  """Returns the window function to be associated with transform's output.

  With at least one input, the windowing of the first input is returned.
  With no inputs (falsy ``inputs``), a ``Windowing`` built from
  ``GlobalWindows`` is returned.
  """
  if not inputs:
    from apache_beam.transforms.core import Windowing
    from apache_beam.transforms.window import GlobalWindows
    # TODO(robertwb): Return something compatible with every windowing?
    return Windowing(GlobalWindows())
  return inputs[0].windowing
infer_output_type
infer_output_type(unused_input_type)
Source code in apache_beam/transforms/ptransform.py
def infer_output_type(self, unused_input_type):
  """Returns the declared simple output type, or ``typehints.Any``.

  The input type is ignored. The result comes from this transform's type
  hints, and ``typehints.Any`` is used when they yield a falsy value.
  """
  declared_output = self.get_type_hints().simple_output_type(self.label)
  return declared_output if declared_output else typehints.Any
register_urn classmethod
register_urn(urn, parameter_type)
register_urn(urn, parameter_type)
register_urn(urn, parameter_type, constructor)
register_urn(urn, parameter_type, constructor)
register_urn(urn, parameter_type, constructor=None)
Source code in apache_beam/transforms/ptransform.py
@classmethod
def register_urn(cls, urn, parameter_type, constructor=None):
  """Registers a constructor for ``urn`` in ``cls._known_urns``.

  Usable as a statement (``constructor`` given, returns ``None``) or as a
  decorator (``constructor`` omitted, returns the registering function).

  When the thing being registered is a class, its
  ``from_runner_api_parameter`` attribute is registered instead and
  re-assigned on the class. Otherwise ``(parameter_type, constructor)`` is
  stored under ``urn``.
  """
  def register(constructor):
    if not isinstance(constructor, type):
      cls._known_urns[urn] = parameter_type, constructor
      return constructor
    # A class: register its from_runner_api_parameter hook instead.
    constructor.from_runner_api_parameter = register(
        constructor.from_runner_api_parameter)
    return constructor

  if not constructor:
    # Used as a decorator.
    return register
  # Used as a statement.
  register(constructor)
runner_api_requires_keyed_input
runner_api_requires_keyed_input()
Source code in apache_beam/transforms/ptransform.py
def runner_api_requires_keyed_input(self):
  """Returns False unconditionally.

  NOTE(review): presumably overridden by transforms whose runner API form
  needs keyed input; confirm against subclasses.
  """
  return False
to_runner_api
to_runner_api(context, has_parts=False, **extra_kwargs)
Source code in apache_beam/transforms/ptransform.py
def to_runner_api(self, context, has_parts=False, **extra_kwargs):
  # type: (PipelineContext, bool, Any) -> beam_runner_api_pb2.FunctionSpec
  """Builds the ``FunctionSpec`` proto describing this transform.

  Args:
    context: Passed to ``to_runner_api_parameter`` and, on fallback, to
      ``to_runner_api_pickled``.
    has_parts: When false and the URN is the generic composite URN, the
      transform is pickled instead.
    **extra_kwargs: Forwarded to ``to_runner_api_parameter``.

  Returns:
    A ``FunctionSpec`` whose payload is the serialized message, the UTF-8
    encoding of a ``str`` parameter, or the parameter unchanged.
  """
  from apache_beam.portability.api import beam_runner_api_pb2
  # typing: only ParDo supports extra_kwargs
  urn, typed_param = self.to_runner_api_parameter(context, **extra_kwargs)
  use_pickle_fallback = (
      urn == python_urns.GENERIC_COMPOSITE_TRANSFORM and not has_parts)
  if use_pickle_fallback:
    # TODO(https://github.com/apache/beam/issues/18713): Remove this fallback.
    urn, typed_param = self.to_runner_api_pickled(context)

  if isinstance(typed_param, message.Message):
    payload = typed_param.SerializeToString()
  elif isinstance(typed_param, str):
    payload = typed_param.encode('utf-8')
  else:
    payload = typed_param
  return beam_runner_api_pb2.FunctionSpec(urn=urn, payload=payload)
to_runner_api_parameter
to_runner_api_parameter(unused_context)
Source code in apache_beam/transforms/ptransform.py
def to_runner_api_parameter(
    self,
    unused_context  # type: PipelineContext
):
  # type: (...) -> tuple[str, Optional[Union[message.Message, bytes, str]]]
  """Returns the generic composite URN paired with a debugging payload.

  The payload is ``self._fn_api_payload`` when that attribute exists and
  ``str(self)`` otherwise. ``str(self)`` is evaluated in both cases.
  """
  urn = python_urns.GENERIC_COMPOSITE_TRANSFORM
  # The payload here is just to ease debugging.
  debug_payload = getattr(self, '_fn_api_payload', str(self))
  return urn, debug_payload
to_runner_api_pickled
to_runner_api_pickled(context)
Source code in apache_beam/transforms/ptransform.py
def to_runner_api_pickled(self, context):
  # type: (PipelineContext) -> tuple[str, bytes]
  """Returns the pickled-transform URN paired with this transform pickled.

  Best-effort deterministic pickling is switched on or off according to
  ``context.enable_best_effort_deterministic_pickling``.
  """
  urn = python_urns.PICKLED_TRANSFORM
  pickled_self = pickler.dumps(
      self,
      enable_best_effort_determinism=context.
      enable_best_effort_deterministic_pickling,
  )
  return urn, pickled_self
type_check_inputs
type_check_inputs(pvalueish)
Source code in apache_beam/transforms/ptransform.py
def type_check_inputs(self, pvalueish):
  """Checks ``pvalueish`` against this transform's input type hints.

  Delegates to ``type_check_inputs_or_outputs`` with the ``'input'`` selector.
  """
  self.type_check_inputs_or_outputs(pvalueish, 'input')
type_check_inputs_or_outputs
type_check_inputs_or_outputs(pvalueish, input_or_output)
Source code in apache_beam/transforms/ptransform.py
def type_check_inputs_or_outputs(self, pvalueish, input_or_output):
  """Checks the element types in ``pvalueish`` against declared type hints.

  Args:
    pvalueish: The value(s) to check; walked together with the hints.
    input_or_output: ``'input'`` or ``'output'``. Selects the
      ``input_types`` or ``output_types`` attribute of the type hints.

  Raises:
    TypeCheckError: If both positional and keyword hints are declared, or
      if an element type is not consistent with its hint.
  """
  hints = getattr(self.get_type_hints(), input_or_output + '_types')
  if hints is None or not any(hints):
    return

  positional_hints, keyword_hints = hints
  if positional_hints and keyword_hints:
    raise TypeCheckError(
        'PTransform cannot have both positional and keyword type hints '
        'without overriding %s._type_check_%s()' %
        (self.__class__, input_or_output))

  if len(positional_hints) == 1:
    root_hint = positional_hints[0]
  else:
    root_hint = positional_hints or keyword_hints

  for context, pvalue_, hint in _ZipPValues().visit(pvalueish, root_hint):
    if isinstance(pvalue_, DoOutputsTuple):
      continue
    if pvalue_.element_type is None:
      # TODO(robertwb): It's a bug that we ever get here. (typecheck)
      continue
    if not hint or typehints.is_consistent_with(pvalue_.element_type, hint):
      continue
    if context:
      at_context = ' %s %s' % (input_or_output, context)
    else:
      at_context = ''
    raise TypeCheckError(
        '{type} type hint violation at {label}{context}: expected {hint}, '
        'got {actual_type}'.format(
            type=input_or_output.title(),
            label=self.label,
            context=at_context,
            hint=hint,
            actual_type=pvalue_.element_type))
type_check_outputs
type_check_outputs(pvalueish)
Source code in apache_beam/transforms/ptransform.py
def type_check_outputs(self, pvalueish):
  """Checks ``pvalueish`` against this transform's output type hints.

  Delegates to ``type_check_inputs_or_outputs`` with the ``'output'`` selector.
  """
  self.type_check_inputs_or_outputs(pvalueish, 'output')
with_input_types
with_input_types(input_type_hint)

Annotates the input type of a :class:PTransform with a type-hint.

PARAMETER DESCRIPTION
input_type_hint

An instance of an allowed built-in type, a custom class, or an instance of a :class:~apache_beam.typehints.typehints.TypeConstraint.

TYPE: type

RAISES DESCRIPTION
TypeError

If input_type_hint is not a valid type-hint. See :obj:apache_beam.typehints.typehints.validate_composite_type_param() for further details.

RETURNS DESCRIPTION
PTransform

A reference to the instance of this particular :class:PTransform object. This allows chaining type-hinting related methods.

Source code in apache_beam/transforms/ptransform.py
def with_input_types(self, input_type_hint):
  """Annotates the input type of a :class:`PTransform` with a type-hint.

  The hint is converted to a Beam type and validated before it is passed to
  the parent implementation.

  Args:
    input_type_hint (type): An instance of an allowed built-in type, a custom
      class, or an instance of a
      :class:`~apache_beam.typehints.typehints.TypeConstraint`.

  Raises:
    TypeError: If **input_type_hint** is not a valid type-hint.
      See
      :obj:`apache_beam.typehints.typehints.validate_composite_type_param()`
      for further details.

  Returns:
    PTransform: A reference to the instance of this particular
      :class:`PTransform` object. This allows chaining type-hinting related
      methods.
  """
  beam_type_hint = native_type_compatibility.convert_to_beam_type(
      input_type_hint)
  validate_composite_type_param(beam_type_hint, 'Type hints for a PTransform')
  return super().with_input_types(beam_type_hint)
with_output_types
with_output_types(type_hint)

Annotates the output type of a :class:PTransform with a type-hint.

PARAMETER DESCRIPTION
type_hint

An instance of an allowed built-in type, a custom class, or a :class:~apache_beam.typehints.typehints.TypeConstraint.

TYPE: type

RAISES DESCRIPTION
TypeError

If type_hint is not a valid type-hint. See :obj:~apache_beam.typehints.typehints.validate_composite_type_param() for further details.

RETURNS DESCRIPTION
PTransform

A reference to the instance of this particular :class:PTransform object. This allows chaining type-hinting related methods.

Source code in apache_beam/transforms/ptransform.py
def with_output_types(self, type_hint):
  """Annotates the output type of a :class:`PTransform` with a type-hint.

  The hint is converted to a Beam type and validated before it is passed to
  the parent implementation.

  Args:
    type_hint (type): An instance of an allowed built-in type, a custom class,
      or a :class:`~apache_beam.typehints.typehints.TypeConstraint`.

  Raises:
    TypeError: If **type_hint** is not a valid type-hint. See
      :obj:`~apache_beam.typehints.typehints.validate_composite_type_param()`
      for further details.

  Returns:
    PTransform: A reference to the instance of this particular
      :class:`PTransform` object. This allows chaining type-hinting related
      methods.
  """
  beam_type_hint = native_type_compatibility.convert_to_beam_type(type_hint)
  validate_composite_type_param(beam_type_hint, 'Type hints for a PTransform')
  return super().with_output_types(beam_type_hint)
with_resource_hints
with_resource_hints(**kwargs)

Adds resource hints to the :class:PTransform.

Resource hints allow users to express constraints on the environment where the transform should be executed. Interpretation of the resource hints is defined by Beam Runners. Runners may ignore the unsupported hints.

PARAMETER DESCRIPTION
**kwargs

key-value pairs describing hints and their values.

DEFAULT: {}

RAISES DESCRIPTION
ValueError

if provided hints are unknown to the SDK. See :mod:apache_beam.transforms.resources for a list of known hints.

RETURNS DESCRIPTION
PTransform

A reference to the instance of this particular :class:PTransform object.

Source code in apache_beam/transforms/ptransform.py
def with_resource_hints(self, **kwargs):  # type: (...) -> PTransform
  """Adds resource hints to the :class:`PTransform`.

  Resource hints allow users to express constraints on the environment where
  the transform should be executed.  Interpretation of the resource hints is
  defined by Beam Runners. Runners may ignore the unsupported hints.

  The parsed hints are merged into the dict returned by
  ``get_resource_hints()``.

  Args:
    **kwargs: key-value pairs describing hints and their values.

  Raises:
    ValueError: if provided hints are unknown to the SDK. See
      :mod:`apache_beam.transforms.resources` for a list of known hints.

  Returns:
    PTransform: A reference to the instance of this particular
      :class:`PTransform` object.
  """
  current_hints = self.get_resource_hints()
  current_hints.update(resources.parse_resource_hints(kwargs))
  return self

TransformDataset

TransformDataset(
    exclude_outputs=None, output_record_batches=False
)

Bases: PTransform

Applies the transformation computed by transforming a Dataset.

TransformDataset's expand method is called on a (dataset, transform_fn) pair. It applies the transform_fn to each row of the input dataset and returns the resulting dataset.

PARAMETER DESCRIPTION
exclude_outputs

(Optional) Output features that should not be produced.

DEFAULT: None

output_record_batches

(Optional) A bool. If True, TransformDataset outputs pyarrow.RecordBatches; otherwise, outputs instance dicts.

DEFAULT: False

Source code in tensorflow_transform/beam/impl.py
def __init__(self, exclude_outputs=None, output_record_batches=False):
  """Init method.

  Also records whether the `Context` requests TF compat v1 and, if so,
  emits the corresponding warning.

  Args:
    exclude_outputs: (Optional) Output features that should not be produced.
    output_record_batches: (Optional) A bool. If `True`, `TransformDataset`
      outputs `pyarrow.RecordBatch`es; otherwise, outputs instance dicts.
  """
  self._exclude_outputs = exclude_outputs
  self._output_record_batches = output_record_batches
  use_tf_compat_v1 = Context.get_use_tf_compat_v1()
  self._use_tf_compat_v1 = use_tf_compat_v1
  if use_tf_compat_v1:
    _warn_about_tf_compat_v1()
Attributes
label property writable
label
pipeline class-attribute instance-attribute
pipeline = None
side_inputs class-attribute instance-attribute
side_inputs = ()
Functions
annotations
annotations() -> dict[str, Union[bytes, str, Message]]
Source code in apache_beam/transforms/ptransform.py
def annotations(self) -> dict[str, Union[bytes, str, message.Message]]:
  """Returns annotations for this transform.

  The only entry is ``'python_type'``: the module and qualified name of the
  instance's class, joined with a dot.
  """
  klass = self.__class__
  return {'python_type': f'{klass.__module__}.{klass.__qualname__}'}
default_label
default_label()
Source code in apache_beam/transforms/ptransform.py
def default_label(self):
  # type: () -> str
  """Returns the name of the instance's class, used as the default label."""
  klass = self.__class__
  return klass.__name__
default_type_hints
default_type_hints()
Source code in apache_beam/transforms/ptransform.py
def default_type_hints(self):
  """Returns default type hints from the class and from ``expand``.

  Hints derived from the ``expand`` callable have ``strip_pcoll()`` applied
  when present. They serve only as defaults for the hints attached to the
  class.
  """
  from_expand = IOTypeHints.from_callable(self.expand)
  expand_hints = None if from_expand is None else from_expand.strip_pcoll()

  # Prefer class decorator type hints for backwards compatibility.
  class_hints = get_type_hints(self.__class__)
  return class_hints.with_defaults(expand_hints)
display_data
display_data()

Returns the display data associated to a pipeline component.

It should be reimplemented in pipeline components that wish to have static display data.

RETURNS DESCRIPTION

Dict[str, Any]: A dictionary containing key:value pairs. The value might be an integer, float or string value; a :class:DisplayDataItem for values that have more data (e.g. short value, label, url); or a :class:HasDisplayData instance that has more display data that should be picked up. For example::

{
  'key1': 'string_value',
  'key2': 1234,
  'key3': 3.14159265,
  'key4': DisplayDataItem('apache.org', url='http://apache.org'),
  'key5': subComponent
}

Source code in apache_beam/transforms/display.py
def display_data(self):
  # type: () -> dict

  """Returns the display data associated to a pipeline component.

  It should be reimplemented in pipeline components that wish to have
  static display data. This default implementation returns an empty dict.

  Returns:
    Dict[str, Any]: A dictionary containing ``key:value`` pairs.
      The value might be an integer, float or string value; a
      :class:`DisplayDataItem` for values that have more data
      (e.g. short value, label, url); or a :class:`HasDisplayData` instance
      that has more display data that should be picked up.

  Example::

      {
        'key1': 'string_value',
        'key2': 1234,
        'key3': 3.14159265,
        'key4': DisplayDataItem('apache.org', url='http://apache.org'),
        'key5': subComponent
      }
  """
  return {}
expand
expand(dataset_and_transform_fn)

Transforms the dataset using the transform_fn.

PARAMETER DESCRIPTION
dataset_and_transform_fn

A tuple of dataset and preprocessing function.

RETURNS DESCRIPTION

A dataset transformed according to the transform_fn.

Source code in tensorflow_transform/beam/impl.py
def expand(self, dataset_and_transform_fn):
  """Transforms the dataset using the transform_fn.

  Args:
    dataset_and_transform_fn: A tuple of dataset and preprocessing
      function, unpacked as `((input_values, input_metadata),
      (transform_fn, output_metadata))`.

  Returns:
    A `(output_data, output_metadata)` tuple: the dataset transformed
    according to the transform_fn, and the output metadata (with the
    `exclude_outputs` columns removed when that option is set).

  Raises:
    ValueError: If `input_metadata` is a `DatasetMetadata` and passthrough
      keys are set in the `Context`.
  """
  (input_values, input_metadata), (transform_fn, output_metadata) = (
      dataset_and_transform_fn)
  # Instance-dict input described by DatasetMetadata is converted to
  # RecordBatches up front; otherwise input_metadata is used directly as the
  # tensor adapter config.
  if isinstance(input_metadata, dataset_metadata.DatasetMetadata):
    if Context.get_passthrough_keys():
      raise ValueError('passthrough_keys is set to {} but it is not '
                       'supported with instance dicts + DatasetMetadata '
                       'input. Follow the guide to switch to the TFXIO '
                       'format.'.format(Context.get_passthrough_keys()))
    logging.warning(
        'You are passing instance dicts and DatasetMetadata to TFT which '
        'will not provide optimal performance. Consider following the TFT '
        'guide to upgrade to the TFXIO format (Apache Arrow RecordBatch).')
    to_tfxio_ptransform = _InstanceDictInputToTFXIOInput(
        input_metadata.schema, Context.get_desired_batch_size())
    input_tensor_adapter_config = to_tfxio_ptransform.tensor_adapter_config()
    input_values |= 'InstanceDictToRecordBatch' >> to_tfxio_ptransform
  else:
    input_tensor_adapter_config = input_metadata

  # If exclude_outputs is set, update the output metadata.
  if self._exclude_outputs is not None:
    if isinstance(output_metadata, beam_metadata_io.BeamDatasetMetadata):
      new_metadata = _remove_columns_from_metadata(
          output_metadata.dataset_metadata, self._exclude_outputs)
      new_deferred_metadata = (
          output_metadata.deferred_metadata
          | 'RemoveColumns'
          >> beam.Map(_remove_columns_from_metadata, self._exclude_outputs)
      )
      output_metadata = beam_metadata_io.BeamDatasetMetadata(
          new_metadata, new_deferred_metadata, output_metadata.asset_map)
    else:
      output_metadata = _remove_columns_from_metadata(
          output_metadata, self._exclude_outputs)

  # Obtain the output schema as a PCollection, either mapped from the
  # deferred metadata or created from the already-available metadata.
  if isinstance(output_metadata, beam_metadata_io.BeamDatasetMetadata):
    deferred_schema = (
        output_metadata.deferred_metadata
        | 'GetDeferredSchema' >> beam.Map(lambda m: m.schema))
    output_dataset_metadata = output_metadata.dataset_metadata
  else:
    deferred_schema = (
        self.pipeline
        | 'CreateDeferredSchema' >> beam.Create([output_metadata.schema]))
    output_dataset_metadata = output_metadata
  output_dataset_metadata._output_record_batches = self._output_record_batches  # pylint: disable=protected-access

  # Increment input metrics.
  _ = (
      input_values
      | 'InstrumentInputBytes[Transform]' >> telemetry.TrackRecordBatchBytes(
          beam_common.METRICS_NAMESPACE, 'transform_input_bytes'))

  _ = (
      self.pipeline | 'CreateTransformInputTensorRepresentations' >>
      beam.Create([input_tensor_adapter_config.tensor_representations])
      | 'InstrumentTransformInputTensors' >>
      telemetry.TrackTensorRepresentations(
          telemetry_util.AppendToNamespace(beam_common.METRICS_NAMESPACE,
                                           ['transform_input_tensors'])))

  # The TF config is looked up by runner type; .get() yields None for runner
  # types that are not in the mapping.
  tf_config = _DEFAULT_TENSORFLOW_CONFIG_BY_BEAM_RUNNER_TYPE.get(
      type(self.pipeline.runner))
  output_batches = input_values | 'Transform' >> beam.ParDo(
      _RunMetaGraphDoFn(
          tf_config,
          input_tensor_adapter_config=input_tensor_adapter_config,
          use_tf_compat_v1=self._use_tf_compat_v1,
          shared_graph_state_handle=shared.Shared(),
          passthrough_keys=Context.get_passthrough_keys(),
          exclude_outputs=self._exclude_outputs,
      ),
      saved_model_dir=beam.pvalue.AsSingleton(transform_fn),
  )

  # Since we are using a deferred schema, obtain a pcollection containing
  # the converter that will be created from it.
  converter_pcol = deferred_schema | 'MakeTensorToArrowConverter' >> beam.Map(
      impl_helper.make_tensor_to_arrow_converter
  )

  # Increment output data metrics.
  _ = (
      converter_pcol
      | 'MapToTensorRepresentations'
      >> beam.Map(lambda converter: converter.tensor_representations())
      | 'InstrumentTransformOutputTensors'
      >> telemetry.TrackTensorRepresentations(
          telemetry_util.AppendToNamespace(
              beam_common.METRICS_NAMESPACE, ['transform_output_tensors']
          )
      )
  )

  output_data = output_batches | 'ConvertToRecordBatch' >> beam.FlatMap(
      _convert_to_record_batch,
      converter=beam.pvalue.AsSingleton(converter_pcol),
      passthrough_keys=Context.get_passthrough_keys(),
      input_metadata=input_metadata,
      # TODO(b/254822532): Consider always doing the validation.
      validate_varlen_sparse_values=not self._output_record_batches,
  )

  # Instance-dict output is produced by unbatching the RecordBatches.
  if not self._output_record_batches:
    logging.warning(
        'You are outputting instance dicts from `TransformDataset` which '
        'will not provide optimal performance. Consider setting  '
        '`output_record_batches=True` to upgrade to the TFXIO format (Apache '
        'Arrow RecordBatch). Encoding functionality in this module works '
        'with both formats.'
    )
    output_data |= 'ConvertAndUnbatchToInstanceDicts' >> beam.FlatMap(
        _transformed_batch_to_instance_dicts,
        schema=beam.pvalue.AsSingleton(deferred_schema),
    )

  _clear_shared_state_after_barrier(self.pipeline, output_data)

  return (output_data, output_metadata)
from_runner_api classmethod
from_runner_api(proto, context)
Source code in apache_beam/transforms/ptransform.py
@classmethod
def from_runner_api(cls,
                    proto,  # type: Optional[beam_runner_api_pb2.PTransform]
                    context  # type: PipelineContext
                   ):
  # type: (...) -> Optional[PTransform]
  """Builds a transform from its runner API proto.

  Returns None when `proto` is None, has no spec, or has an empty URN.
  Otherwise the URN is looked up in `cls._known_urns` and the registered
  constructor is called with the proto, the parsed payload and `context`.
  """
  spec = None if proto is None else proto.spec
  if spec is None or not spec.urn:
    return None

  parameter_type, constructor = cls._known_urns[spec.urn]
  payload = proto_utils.parse_Bytes(spec.payload, parameter_type)
  return constructor(proto, payload, context)
get_resource_hints
get_resource_hints()
Source code in apache_beam/transforms/ptransform.py
def get_resource_hints(self):
  # type: () -> dict[str, bytes]
  """Returns this object's resource hints, creating them on first access.

  The mapping is stored on the instance as `_resource_hints`.
  """
  if '_resource_hints' in vars(self):
    return self._resource_hints
  # PTransform subclasses don't always call super(), so the attribute is
  # created lazily here. A transform starts out without any resource hints.
  self._resource_hints = {}  # type: dict[str, bytes]
  return self._resource_hints
get_type_hints
get_type_hints()

Gets and/or initializes type hints for this object.

If type hints have not been set, attempts to initialize type hints in this order: (1) using self.default_type_hints(); (2) using self.__class__ type hints.

Source code in apache_beam/typehints/decorators.py
def get_type_hints(self):
  """Gets and/or initializes type hints for this object.

  Starts from `self._get_or_create_type_hints()` and applies, via
  `with_defaults`, in this order:
  - the result of `self.default_type_hints()`;
  - the type hints of `self.__class__`.
  """
  own_hints = self._get_or_create_type_hints()
  with_instance_defaults = own_hints.with_defaults(self.default_type_hints())
  return with_instance_defaults.with_defaults(get_type_hints(self.__class__))
get_windowing
get_windowing(inputs)

Returns the window function to be associated with transform's output.

By default most transforms just return the windowing function associated with the input PCollection (or the first input if several).

Source code in apache_beam/transforms/ptransform.py
def get_windowing(self, inputs):
  # type: (Any) -> Windowing

  """Returns the windowing to associate with this transform's output.

  When `inputs` is non-empty, the windowing of its first element is
  returned. Otherwise a `Windowing` over `GlobalWindows` is created.
  """
  if not inputs:
    from apache_beam.transforms.core import Windowing
    from apache_beam.transforms.window import GlobalWindows
    # TODO(robertwb): Return something compatible with every windowing?
    return Windowing(GlobalWindows())
  return inputs[0].windowing
infer_output_type
infer_output_type(unused_input_type)
Source code in apache_beam/transforms/ptransform.py
def infer_output_type(self, unused_input_type):
  """Returns the simple output type hinted for `self.label`, else `Any`.

  The input type is not consulted.
  """
  hinted_output = self.get_type_hints().simple_output_type(self.label)
  return hinted_output or typehints.Any
register_urn classmethod
register_urn(urn, parameter_type)
register_urn(urn, parameter_type)
register_urn(urn, parameter_type, constructor)
register_urn(urn, parameter_type, constructor)
register_urn(urn, parameter_type, constructor=None)
Source code in apache_beam/transforms/ptransform.py
@classmethod
def register_urn(cls, urn, parameter_type, constructor=None):
  """Associates `urn` with a payload type and a constructor.

  When `constructor` is given, the registration happens immediately and
  nothing is returned. Otherwise the registering function is returned so
  that this call can be used as a decorator.
  """
  def register(constructor):
    # Anything that is not a class is stored directly in the URN table.
    if not isinstance(constructor, type):
      cls._known_urns[urn] = parameter_type, constructor
      return constructor
    # For a class, its `from_runner_api_parameter` attribute is what gets
    # registered; the attribute is then re-assigned on the class.
    constructor.from_runner_api_parameter = register(
        constructor.from_runner_api_parameter)
    return constructor

  if not constructor:
    # Used as a decorator.
    return register
  # Used as a statement.
  register(constructor)
runner_api_requires_keyed_input
runner_api_requires_keyed_input()
Source code in apache_beam/transforms/ptransform.py
def runner_api_requires_keyed_input(self):
  """Returns False: this implementation does not require keyed input."""
  requires_keyed_input = False
  return requires_keyed_input
to_runner_api
to_runner_api(context, has_parts=False, **extra_kwargs)
Source code in apache_beam/transforms/ptransform.py
def to_runner_api(self, context, has_parts=False, **extra_kwargs):
  # type: (PipelineContext, bool, Any) -> beam_runner_api_pb2.FunctionSpec
  """Converts this transform into a runner API `FunctionSpec`.

  The URN and payload come from `to_runner_api_parameter`. If that yields
  the generic composite URN and `has_parts` is false, the pickled form from
  `to_runner_api_pickled` is used instead. Proto messages are serialized and
  `str` payloads are UTF-8 encoded; any other payload is passed through.
  """
  from apache_beam.portability.api import beam_runner_api_pb2
  # typing: only ParDo supports extra_kwargs
  urn, typed_param = self.to_runner_api_parameter(context, **extra_kwargs)
  if urn == python_urns.GENERIC_COMPOSITE_TRANSFORM and not has_parts:
    # TODO(https://github.com/apache/beam/issues/18713): Remove this fallback.
    urn, typed_param = self.to_runner_api_pickled(context)

  if isinstance(typed_param, message.Message):
    payload = typed_param.SerializeToString()
  elif isinstance(typed_param, str):
    payload = typed_param.encode('utf-8')
  else:
    payload = typed_param
  return beam_runner_api_pb2.FunctionSpec(urn=urn, payload=payload)
to_runner_api_parameter
to_runner_api_parameter(unused_context)
Source code in apache_beam/transforms/ptransform.py
def to_runner_api_parameter(
    self,
    unused_context  # type: PipelineContext
):
  # type: (...) -> tuple[str, Optional[Union[message.Message, bytes, str]]]
  """Returns the generic composite URN together with a debugging payload.

  The payload is `self._fn_api_payload` when that attribute exists, and
  `str(self)` otherwise.
  """
  urn = python_urns.GENERIC_COMPOSITE_TRANSFORM
  # The payload here is just to ease debugging.
  debug_payload = getattr(self, '_fn_api_payload', str(self))
  return urn, debug_payload
to_runner_api_pickled
to_runner_api_pickled(context)
Source code in apache_beam/transforms/ptransform.py
def to_runner_api_pickled(self, context):
  # type: (PipelineContext) -> tuple[str, bytes]
  """Returns the pickled-transform URN and this transform pickled.

  Best-effort deterministic pickling is switched on or off according to
  `context.enable_best_effort_deterministic_pickling`.
  """
  urn = python_urns.PICKLED_TRANSFORM
  deterministic = context.enable_best_effort_deterministic_pickling
  pickled_self = pickler.dumps(
      self, enable_best_effort_determinism=deterministic)
  return urn, pickled_self
type_check_inputs
type_check_inputs(pvalueish)
Source code in apache_beam/transforms/ptransform.py
def type_check_inputs(self, pvalueish):
  """Type-checks `pvalueish` as this transform's input."""
  direction = 'input'
  self.type_check_inputs_or_outputs(pvalueish, direction)
type_check_inputs_or_outputs
type_check_inputs_or_outputs(pvalueish, input_or_output)
Source code in apache_beam/transforms/ptransform.py
def type_check_inputs_or_outputs(self, pvalueish, input_or_output):
  """Checks the element types in `pvalueish` against the declared hints.

  Args:
    pvalueish: The value(s) whose element types are checked.
    input_or_output: Either 'input' or 'output'; selects the
      `<input_or_output>_types` attribute of this object's type hints.

  Raises:
    TypeCheckError: If both positional and keyword hints are declared, or if
      an element type is not consistent with its hint.
  """
  hints = getattr(self.get_type_hints(), input_or_output + '_types')
  if hints is None or not any(hints):
    # Nothing was declared, so there is nothing to check.
    return

  arg_hints, kwarg_hints = hints
  if arg_hints and kwarg_hints:
    raise TypeCheckError(
        'PTransform cannot have both positional and keyword type hints '
        'without overriding %s._type_check_%s()' %
        (self.__class__, input_or_output))

  # A single positional hint is unwrapped; otherwise whichever of the two
  # hint collections is non-empty is used as a whole.
  if len(arg_hints) == 1:
    root_hint = arg_hints[0]
  else:
    root_hint = arg_hints or kwarg_hints

  for context, pvalue_, hint in _ZipPValues().visit(pvalueish, root_hint):
    if isinstance(pvalue_, DoOutputsTuple):
      continue
    if pvalue_.element_type is None:
      # TODO(robertwb): It's a bug that we ever get here. (typecheck)
      continue
    if not hint:
      continue
    if typehints.is_consistent_with(pvalue_.element_type, hint):
      continue
    at_context = ' %s %s' % (input_or_output, context) if context else ''
    raise TypeCheckError(
        '{type} type hint violation at {label}{context}: expected {hint}, '
        'got {actual_type}'.format(
            type=input_or_output.title(),
            label=self.label,
            context=at_context,
            hint=hint,
            actual_type=pvalue_.element_type))
type_check_outputs
type_check_outputs(pvalueish)
Source code in apache_beam/transforms/ptransform.py
def type_check_outputs(self, pvalueish):
  """Type-checks `pvalueish` as this transform's output."""
  direction = 'output'
  self.type_check_inputs_or_outputs(pvalueish, direction)
with_input_types
with_input_types(input_type_hint)

Annotates the input type of a :class:PTransform with a type-hint.

PARAMETER DESCRIPTION
input_type_hint

An instance of an allowed built-in type, a custom class, or an instance of a :class:~apache_beam.typehints.typehints.TypeConstraint.

TYPE: type

RAISES DESCRIPTION
TypeError

If input_type_hint is not a valid type-hint. See :obj:apache_beam.typehints.typehints.validate_composite_type_param() for further details.

RETURNS DESCRIPTION
PTransform

A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.

Source code in apache_beam/transforms/ptransform.py
def with_input_types(self, input_type_hint):
  """Attaches an input type-hint to this :class:`PTransform`.

  Args:
    input_type_hint (type): An allowed built-in type, a custom class, or an
      instance of a
      :class:`~apache_beam.typehints.typehints.TypeConstraint`. It is
      converted with ``convert_to_beam_type`` before being validated.

  Raises:
    TypeError: If **input_type_hint** is not a valid type-hint. See
      :obj:`apache_beam.typehints.typehints.validate_composite_type_param()`
      for further details.

  Returns:
    PTransform: A reference to this :class:`PTransform` instance, so that
    type-hinting related methods can be chained.
  """
  beam_type_hint = native_type_compatibility.convert_to_beam_type(
      input_type_hint)
  validate_composite_type_param(beam_type_hint, 'Type hints for a PTransform')
  return super().with_input_types(beam_type_hint)
with_output_types
with_output_types(type_hint)

Annotates the output type of a :class:PTransform with a type-hint.

PARAMETER DESCRIPTION
type_hint

An instance of an allowed built-in type, a custom class, or a :class:~apache_beam.typehints.typehints.TypeConstraint.

TYPE: type

RAISES DESCRIPTION
TypeError

If type_hint is not a valid type-hint. See :obj:~apache_beam.typehints.typehints.validate_composite_type_param() for further details.

RETURNS DESCRIPTION
PTransform

A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.

Source code in apache_beam/transforms/ptransform.py
def with_output_types(self, type_hint):
  """Attaches an output type-hint to this :class:`PTransform`.

  Args:
    type_hint (type): An allowed built-in type, a custom class, or a
      :class:`~apache_beam.typehints.typehints.TypeConstraint`. It is
      converted with ``convert_to_beam_type`` before being validated.

  Raises:
    TypeError: If **type_hint** is not a valid type-hint. See
      :obj:`~apache_beam.typehints.typehints.validate_composite_type_param()`
      for further details.

  Returns:
    PTransform: A reference to this :class:`PTransform` instance, so that
    type-hinting related methods can be chained.
  """
  beam_type_hint = native_type_compatibility.convert_to_beam_type(type_hint)
  validate_composite_type_param(beam_type_hint, 'Type hints for a PTransform')
  return super().with_output_types(beam_type_hint)
with_resource_hints
with_resource_hints(**kwargs)

Adds resource hints to the :class:PTransform.

Resource hints allow users to express constraints on the environment where the transform should be executed. Interpretation of the resource hints is defined by Beam Runners. Runners may ignore the unsupported hints.

PARAMETER DESCRIPTION
**kwargs

key-value pairs describing hints and their values.

DEFAULT: {}

RAISES DESCRIPTION
ValueError

if provided hints are unknown to the SDK. See :mod:apache_beam.transforms.resources for a list of known hints.

RETURNS DESCRIPTION
PTransform

A reference to the instance of this particular PTransform object.

Source code in apache_beam/transforms/ptransform.py
def with_resource_hints(self, **kwargs):  # type: (...) -> PTransform
  """Merges the given resource hints into this :class:`PTransform`.

  Resource hints express constraints on the environment in which the
  transform should be executed. How a hint is interpreted is up to the Beam
  Runner, and Runners may ignore hints they do not support.

  Args:
    **kwargs: key-value pairs describing hints and their values.

  Raises:
    ValueError: if provided hints are unknown to the SDK. See
      :mod:`apache_beam.transforms.resources` for a list of known hints.

  Returns:
    PTransform: A reference to this :class:`PTransform` instance.
  """
  current_hints = self.get_resource_hints()
  current_hints.update(resources.parse_resource_hints(kwargs))
  return self

WriteMetadata

WriteMetadata(
    path, pipeline, write_to_unique_subdirectory=False
)

Bases: PTransform

A PTransform to write Metadata to disk.

Input can either be a DatasetMetadata or a tuple of properties.

Depending on the optional write_to_unique_subdirectory, writes the given metadata to either path or a new unique subdirectory under path.

Returns a singleton with the path to which the metadata was written.

Init method.

PARAMETER DESCRIPTION
path

A str, the default path that the metadata should be written to.

pipeline

A beam Pipeline.

write_to_unique_subdirectory

(Optional) A bool indicating whether to write the metadata out to path or a unique subdirectory under path.

DEFAULT: False

Source code in tensorflow_transform/beam/tft_beam_io/beam_metadata_io.py
def __init__(self, path, pipeline, write_to_unique_subdirectory=False):
  """Init method.

  Args:
    path: A str, the default path that the metadata should be written to.
    pipeline: A beam Pipeline; stored as `self.pipeline`.
    write_to_unique_subdirectory: (Optional) A bool. When true, the metadata
      is written to a unique subdirectory under `path` rather than to `path`
      itself.
  """
  super().__init__()
  self.pipeline = pipeline
  self._path = path
  self._write_to_unique_subdirectory = write_to_unique_subdirectory
Attributes
label property writable
label
pipeline instance-attribute
pipeline = pipeline
side_inputs class-attribute instance-attribute
side_inputs = ()
Functions
annotations
annotations() -> dict[str, Union[bytes, str, Message]]
Source code in apache_beam/transforms/ptransform.py
def annotations(self) -> dict[str, Union[bytes, str, message.Message]]:
  """Returns a dict whose 'python_type' entry is `<module>.<qualname>`."""
  klass = self.__class__
  qualified_name = f'{klass.__module__}.{klass.__qualname__}'
  return {'python_type': qualified_name}
default_label
default_label()
Source code in apache_beam/transforms/ptransform.py
def default_label(self):
  # type: () -> str
  """Returns the name of this object's class."""
  klass = self.__class__
  return klass.__name__
default_type_hints
default_type_hints()
Source code in apache_beam/transforms/ptransform.py
def default_type_hints(self):
  """Returns the class-level type hints, defaulted from `self.expand`.

  Hints are derived from the `expand` callable (with `strip_pcoll()`
  applied when any are found) and supplied as defaults to the hints of
  `self.__class__`.
  """
  expand_hints = IOTypeHints.from_callable(self.expand)
  if expand_hints is not None:
    expand_hints = expand_hints.strip_pcoll()

  # Prefer class decorator type hints for backwards compatibility.
  class_hints = get_type_hints(self.__class__)
  return class_hints.with_defaults(expand_hints)
display_data
display_data()

Returns the display data associated to a pipeline component.

It should be reimplemented in pipeline components that wish to have static display data.

RETURNS DESCRIPTION

Dict[str, Any]: A dictionary containing key:value pairs. The value might be an integer, float or string value; a DisplayDataItem for values that have more data (e.g. short value, label, url); or a HasDisplayData instance that has more display data that should be picked up. For example:

{ 'key1': 'string_value', 'key2': 1234, 'key3': 3.14159265, 'key4': DisplayDataItem('apache.org', url='http://apache.org'), 'key5': subComponent }

Source code in apache_beam/transforms/display.py
def display_data(self):
  # type: () -> dict

  """Returns the static display data of this pipeline component.

  This implementation returns an empty dictionary; pipeline components that
  wish to have static display data should reimplement it.

  Returns:
    Dict[str, Any]: A mapping of ``key: value`` pairs. A value may be an
    integer, float or string; a :class:`DisplayDataItem` when more data
    (e.g. short value, label, url) is attached; or a
    :class:`HasDisplayData` instance whose own display data should be
    picked up as well. For example::

      {
        'key1': 'string_value',
        'key2': 1234,
        'key3': 3.14159265,
        'key4': DisplayDataItem('apache.org', url='http://apache.org'),
        'key5': subComponent
      }
  """
  return dict()
expand
expand(metadata)
Source code in tensorflow_transform/beam/tft_beam_io/beam_metadata_io.py
def expand(self, metadata):
  """Writes `metadata` to disk and returns a PCollection of the output path.

  If `metadata` has a `deferred_metadata` attribute, that PCollection is
  used as the source. Otherwise `metadata` itself is wrapped in a
  single-element PCollection created on `self.pipeline`. If `metadata` has a
  non-empty `asset_map`, it is written as JSON next to the metadata.
  """
  if hasattr(metadata, 'deferred_metadata'):
    metadata_pcoll = metadata.deferred_metadata
  else:
    metadata_pcoll = self.pipeline | beam.Create([metadata])

  asset_map = getattr(metadata, 'asset_map', {})

  def write_metadata_output(metadata):
    # Either write straight to the configured path or to a fresh unique
    # subdirectory underneath it.
    if self._write_to_unique_subdirectory:
      output_path = common.get_unique_temp_path(self._path)
    else:
      output_path = self._path
    metadata_io.write_metadata(metadata, output_path)
    if asset_map:
      asset_map_path = os.path.join(
          output_path, output_wrapper.TFTransformOutput.ASSET_MAP)
      with tf.io.gfile.GFile(asset_map_path, 'w') as f:
        f.write(json.dumps(asset_map))
    return output_path

  return metadata_pcoll | 'WriteMetadata' >> beam.Map(write_metadata_output)
from_runner_api classmethod
from_runner_api(proto, context)
Source code in apache_beam/transforms/ptransform.py
@classmethod
def from_runner_api(cls,
                    proto,  # type: Optional[beam_runner_api_pb2.PTransform]
                    context  # type: PipelineContext
                   ):
  # type: (...) -> Optional[PTransform]
  """Builds a transform from its runner API proto.

  Returns None when `proto` is None, has no spec, or has an empty URN.
  Otherwise the URN is looked up in `cls._known_urns` and the registered
  constructor is called with the proto, the parsed payload and `context`.
  """
  spec = None if proto is None else proto.spec
  if spec is None or not spec.urn:
    return None

  parameter_type, constructor = cls._known_urns[spec.urn]
  payload = proto_utils.parse_Bytes(spec.payload, parameter_type)
  return constructor(proto, payload, context)
get_resource_hints
get_resource_hints()
Source code in apache_beam/transforms/ptransform.py
def get_resource_hints(self):
  # type: () -> dict[str, bytes]
  """Returns this object's resource hints, creating them on first access.

  The mapping is stored on the instance as `_resource_hints`.
  """
  if '_resource_hints' in vars(self):
    return self._resource_hints
  # PTransform subclasses don't always call super(), so the attribute is
  # created lazily here. A transform starts out without any resource hints.
  self._resource_hints = {}  # type: dict[str, bytes]
  return self._resource_hints
get_type_hints
get_type_hints()

Gets and/or initializes type hints for this object.

If type hints have not been set, attempts to initialize type hints in this order: (1) using self.default_type_hints(); (2) using self.__class__ type hints.

Source code in apache_beam/typehints/decorators.py
def get_type_hints(self):
  """Gets and/or initializes type hints for this object.

  Starts from `self._get_or_create_type_hints()` and applies, via
  `with_defaults`, in this order:
  - the result of `self.default_type_hints()`;
  - the type hints of `self.__class__`.
  """
  own_hints = self._get_or_create_type_hints()
  with_instance_defaults = own_hints.with_defaults(self.default_type_hints())
  return with_instance_defaults.with_defaults(get_type_hints(self.__class__))
get_windowing
get_windowing(inputs)

Returns the window function to be associated with transform's output.

By default most transforms just return the windowing function associated with the input PCollection (or the first input if several).

Source code in apache_beam/transforms/ptransform.py
def get_windowing(self, inputs):
  # type: (Any) -> Windowing

  """Returns the windowing to associate with this transform's output.

  When `inputs` is non-empty, the windowing of its first element is
  returned. Otherwise a `Windowing` over `GlobalWindows` is created.
  """
  if not inputs:
    from apache_beam.transforms.core import Windowing
    from apache_beam.transforms.window import GlobalWindows
    # TODO(robertwb): Return something compatible with every windowing?
    return Windowing(GlobalWindows())
  return inputs[0].windowing
infer_output_type
infer_output_type(unused_input_type)
Source code in apache_beam/transforms/ptransform.py
def infer_output_type(self, unused_input_type):
  """Returns the simple output type hinted for `self.label`, else `Any`.

  The input type is not consulted.
  """
  hinted_output = self.get_type_hints().simple_output_type(self.label)
  return hinted_output or typehints.Any
register_urn classmethod
register_urn(urn, parameter_type)
register_urn(urn, parameter_type)
register_urn(urn, parameter_type, constructor)
register_urn(urn, parameter_type, constructor)
register_urn(urn, parameter_type, constructor=None)
Source code in apache_beam/transforms/ptransform.py
@classmethod
def register_urn(cls, urn, parameter_type, constructor=None):
  """Associates `urn` with a payload type and a constructor.

  When `constructor` is given, the registration happens immediately and
  nothing is returned. Otherwise the registering function is returned so
  that this call can be used as a decorator.
  """
  def register(constructor):
    # Anything that is not a class is stored directly in the URN table.
    if not isinstance(constructor, type):
      cls._known_urns[urn] = parameter_type, constructor
      return constructor
    # For a class, its `from_runner_api_parameter` attribute is what gets
    # registered; the attribute is then re-assigned on the class.
    constructor.from_runner_api_parameter = register(
        constructor.from_runner_api_parameter)
    return constructor

  if not constructor:
    # Used as a decorator.
    return register
  # Used as a statement.
  register(constructor)
runner_api_requires_keyed_input
runner_api_requires_keyed_input()
Source code in apache_beam/transforms/ptransform.py
def runner_api_requires_keyed_input(self):
  """Returns False: this implementation does not require keyed input."""
  requires_keyed_input = False
  return requires_keyed_input
to_runner_api
to_runner_api(context, has_parts=False, **extra_kwargs)
Source code in apache_beam/transforms/ptransform.py
def to_runner_api(self, context, has_parts=False, **extra_kwargs):
  # type: (PipelineContext, bool, Any) -> beam_runner_api_pb2.FunctionSpec
  """Converts this transform into a runner API `FunctionSpec`.

  The URN and payload come from `to_runner_api_parameter`. If that yields
  the generic composite URN and `has_parts` is false, the pickled form from
  `to_runner_api_pickled` is used instead. Proto messages are serialized and
  `str` payloads are UTF-8 encoded; any other payload is passed through.
  """
  from apache_beam.portability.api import beam_runner_api_pb2
  # typing: only ParDo supports extra_kwargs
  urn, typed_param = self.to_runner_api_parameter(context, **extra_kwargs)
  if urn == python_urns.GENERIC_COMPOSITE_TRANSFORM and not has_parts:
    # TODO(https://github.com/apache/beam/issues/18713): Remove this fallback.
    urn, typed_param = self.to_runner_api_pickled(context)

  if isinstance(typed_param, message.Message):
    payload = typed_param.SerializeToString()
  elif isinstance(typed_param, str):
    payload = typed_param.encode('utf-8')
  else:
    payload = typed_param
  return beam_runner_api_pb2.FunctionSpec(urn=urn, payload=payload)
to_runner_api_parameter
to_runner_api_parameter(unused_context)
Source code in apache_beam/transforms/ptransform.py
def to_runner_api_parameter(
    self,
    unused_context  # type: PipelineContext
):
  # type: (...) -> tuple[str, Optional[Union[message.Message, bytes, str]]]
  """Returns the generic composite URN together with a debugging payload.

  The payload is `self._fn_api_payload` when that attribute exists, and
  `str(self)` otherwise.
  """
  urn = python_urns.GENERIC_COMPOSITE_TRANSFORM
  # The payload here is just to ease debugging.
  debug_payload = getattr(self, '_fn_api_payload', str(self))
  return urn, debug_payload
to_runner_api_pickled
to_runner_api_pickled(context)
Source code in apache_beam/transforms/ptransform.py
def to_runner_api_pickled(self, context):
  # type: (PipelineContext) -> tuple[str, bytes]
  """Returns the pickled-transform URN and this transform pickled.

  Best-effort deterministic pickling is switched on or off according to
  `context.enable_best_effort_deterministic_pickling`.
  """
  urn = python_urns.PICKLED_TRANSFORM
  deterministic = context.enable_best_effort_deterministic_pickling
  pickled_self = pickler.dumps(
      self, enable_best_effort_determinism=deterministic)
  return urn, pickled_self
type_check_inputs
type_check_inputs(pvalueish)
Source code in apache_beam/transforms/ptransform.py
def type_check_inputs(self, pvalueish):
  """Type-checks `pvalueish` as this transform's input."""
  direction = 'input'
  self.type_check_inputs_or_outputs(pvalueish, direction)
type_check_inputs_or_outputs
type_check_inputs_or_outputs(pvalueish, input_or_output)
Source code in apache_beam/transforms/ptransform.py
def type_check_inputs_or_outputs(self, pvalueish, input_or_output):
  """Checks the element types in `pvalueish` against the declared hints.

  Args:
    pvalueish: The value(s) whose element types are checked.
    input_or_output: Either 'input' or 'output'; selects the
      `<input_or_output>_types` attribute of this object's type hints.

  Raises:
    TypeCheckError: If both positional and keyword hints are declared, or if
      an element type is not consistent with its hint.
  """
  hints = getattr(self.get_type_hints(), input_or_output + '_types')
  if hints is None or not any(hints):
    # Nothing was declared, so there is nothing to check.
    return

  arg_hints, kwarg_hints = hints
  if arg_hints and kwarg_hints:
    raise TypeCheckError(
        'PTransform cannot have both positional and keyword type hints '
        'without overriding %s._type_check_%s()' %
        (self.__class__, input_or_output))

  # A single positional hint is unwrapped; otherwise whichever of the two
  # hint collections is non-empty is used as a whole.
  if len(arg_hints) == 1:
    root_hint = arg_hints[0]
  else:
    root_hint = arg_hints or kwarg_hints

  for context, pvalue_, hint in _ZipPValues().visit(pvalueish, root_hint):
    if isinstance(pvalue_, DoOutputsTuple):
      continue
    if pvalue_.element_type is None:
      # TODO(robertwb): It's a bug that we ever get here. (typecheck)
      continue
    if not hint:
      continue
    if typehints.is_consistent_with(pvalue_.element_type, hint):
      continue
    at_context = ' %s %s' % (input_or_output, context) if context else ''
    raise TypeCheckError(
        '{type} type hint violation at {label}{context}: expected {hint}, '
        'got {actual_type}'.format(
            type=input_or_output.title(),
            label=self.label,
            context=at_context,
            hint=hint,
            actual_type=pvalue_.element_type))
type_check_outputs
type_check_outputs(pvalueish)
Source code in apache_beam/transforms/ptransform.py
def type_check_outputs(self, pvalueish):
  """Type-checks `pvalueish` as this transform's output."""
  direction = 'output'
  self.type_check_inputs_or_outputs(pvalueish, direction)
with_input_types
with_input_types(input_type_hint)

Annotates the input type of a :class:PTransform with a type-hint.

PARAMETER DESCRIPTION
input_type_hint

An instance of an allowed built-in type, a custom class, or an instance of a :class:~apache_beam.typehints.typehints.TypeConstraint.

TYPE: type

RAISES DESCRIPTION
TypeError

If input_type_hint is not a valid type-hint. See :obj:apache_beam.typehints.typehints.validate_composite_type_param() for further details.

RETURNS DESCRIPTION
PTransform

A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.

Source code in apache_beam/transforms/ptransform.py
def with_input_types(self, input_type_hint):
  """Attaches an input type-hint to this :class:`PTransform`.

  Args:
    input_type_hint (type): An allowed built-in type, a custom class, or an
      instance of a
      :class:`~apache_beam.typehints.typehints.TypeConstraint`. It is
      converted with ``convert_to_beam_type`` before being validated.

  Raises:
    TypeError: If **input_type_hint** is not a valid type-hint. See
      :obj:`apache_beam.typehints.typehints.validate_composite_type_param()`
      for further details.

  Returns:
    PTransform: A reference to this :class:`PTransform` instance, so that
    type-hinting related methods can be chained.
  """
  beam_type_hint = native_type_compatibility.convert_to_beam_type(
      input_type_hint)
  validate_composite_type_param(beam_type_hint, 'Type hints for a PTransform')
  return super().with_input_types(beam_type_hint)
with_output_types
with_output_types(type_hint)

Annotates the output type of a :class:PTransform with a type-hint.

PARAMETER DESCRIPTION
type_hint

An instance of an allowed built-in type, a custom class, or a :class:~apache_beam.typehints.typehints.TypeConstraint.

TYPE: type

RAISES DESCRIPTION
TypeError

If type_hint is not a valid type-hint. See :obj:~apache_beam.typehints.typehints.validate_composite_type_param() for further details.

RETURNS DESCRIPTION
PTransform

A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.

Source code in apache_beam/transforms/ptransform.py
def with_output_types(self, type_hint):
  """Attaches an output type-hint to this :class:`PTransform`.

  Args:
    type_hint (type): An allowed built-in type, a custom class, or a
      :class:`~apache_beam.typehints.typehints.TypeConstraint`. It is
      converted with ``convert_to_beam_type`` before being validated.

  Raises:
    TypeError: If **type_hint** is not a valid type-hint. See
      :obj:`~apache_beam.typehints.typehints.validate_composite_type_param()`
      for further details.

  Returns:
    PTransform: A reference to this :class:`PTransform` instance, so that
    type-hinting related methods can be chained.
  """
  beam_type_hint = native_type_compatibility.convert_to_beam_type(type_hint)
  validate_composite_type_param(beam_type_hint, 'Type hints for a PTransform')
  return super().with_output_types(beam_type_hint)
with_resource_hints
with_resource_hints(**kwargs)

Adds resource hints to the :class:PTransform.

Resource hints allow users to express constraints on the environment where the transform should be executed. Interpretation of the resource hints is defined by Beam Runners. Runners may ignore the unsupported hints.

PARAMETER DESCRIPTION
**kwargs

key-value pairs describing hints and their values.

DEFAULT: {}

RAISES DESCRIPTION
ValueError

if provided hints are unknown to the SDK. See :mod:apache_beam.transforms.resources for a list of known hints.

RETURNS DESCRIPTION
PTransform

A reference to the instance of this particular PTransform object.

Source code in apache_beam/transforms/ptransform.py
def with_resource_hints(self, **kwargs):  # type: (...) -> PTransform
  """Merges the given resource hints into this :class:`PTransform`.

  Resource hints express constraints on the environment in which the
  transform should be executed. How a hint is interpreted is up to the Beam
  Runner, and Runners may ignore hints they do not support.

  Args:
    **kwargs: key-value pairs describing hints and their values.

  Raises:
    ValueError: if provided hints are unknown to the SDK. See
      :mod:`apache_beam.transforms.resources` for a list of known hints.

  Returns:
    PTransform: A reference to this :class:`PTransform` instance.
  """
  current_hints = self.get_resource_hints()
  current_hints.update(resources.parse_resource_hints(kwargs))
  return self

WriteTransformFn

WriteTransformFn(path)

Bases: PTransform

Writes a TransformFn to disk.

The internal structure is a directory containing two subdirectories. The first is 'transformed_metadata' and contains metadata of the transformed data. The second is 'transform_fn' and contains a SavedModel representing the transformed data.

Source code in tensorflow_transform/beam/tft_beam_io/transform_fn_io.py
def __init__(self, path):
  """Init method.

  Args:
    path: The output path; stored on the instance as `_path`.
  """
  super().__init__()
  self._path = path
Attributes
label property writable
label
pipeline class-attribute instance-attribute
pipeline = None
side_inputs class-attribute instance-attribute
side_inputs = ()
Functions
annotations
annotations() -> dict[str, Union[bytes, str, Message]]
Source code in apache_beam/transforms/ptransform.py
def annotations(self) -> dict[str, Union[bytes, str, message.Message]]:
  """Returns a dict whose 'python_type' entry is `<module>.<qualname>`."""
  klass = self.__class__
  qualified_name = f'{klass.__module__}.{klass.__qualname__}'
  return {'python_type': qualified_name}
default_label
default_label()
Source code in apache_beam/transforms/ptransform.py
def default_label(self):
  # type: () -> str
  """Returns the name of this object's class."""
  klass = self.__class__
  return klass.__name__
default_type_hints
default_type_hints()
Source code in apache_beam/transforms/ptransform.py
def default_type_hints(self):
  """Returns the class-level type hints, defaulted from `self.expand`.

  Hints are derived from the `expand` callable (with `strip_pcoll()`
  applied when any are found) and supplied as defaults to the hints of
  `self.__class__`.
  """
  expand_hints = IOTypeHints.from_callable(self.expand)
  if expand_hints is not None:
    expand_hints = expand_hints.strip_pcoll()

  # Prefer class decorator type hints for backwards compatibility.
  class_hints = get_type_hints(self.__class__)
  return class_hints.with_defaults(expand_hints)
display_data
display_data()

Returns the display data associated to a pipeline component.

It should be reimplemented in pipeline components that wish to have static display data.

RETURNS DESCRIPTION

Dict[str, Any]: A dictionary containing key:value pairs. The value might be an integer, float or string value; a DisplayDataItem for values that have more data (e.g. short value, label, url); or a HasDisplayData instance that has more display data that should be picked up. For example:

{ 'key1': 'string_value', 'key2': 1234, 'key3': 3.14159265, 'key4': DisplayDataItem('apache.org', url='http://apache.org'), 'key5': subComponent }

Source code in apache_beam/transforms/display.py
def display_data(self):
  # type: () -> dict

  """Returns the static display data of this pipeline component.

  This implementation returns an empty dictionary; pipeline components that
  wish to have static display data should reimplement it.

  Returns:
    Dict[str, Any]: A mapping of ``key: value`` pairs. A value may be an
    integer, float or string; a :class:`DisplayDataItem` when more data
    (e.g. short value, label, url) is attached; or a
    :class:`HasDisplayData` instance whose own display data should be
    picked up as well. For example::

      {
        'key1': 'string_value',
        'key2': 1234,
        'key3': 3.14159265,
        'key4': DisplayDataItem('apache.org', url='http://apache.org'),
        'key5': subComponent
      }
  """
  return dict()
expand
expand(transform_fn)
Source code in tensorflow_transform/beam/tft_beam_io/transform_fn_io.py
def expand(self, transform_fn):
  """Writes the transform fn and its metadata under `self._path`.

  Both parts are first written below `<self._path>/transform_tmp` and are then
  renamed into their final directories by a single publishing step.

  Args:
    transform_fn: A pair that unpacks as `(saved_model_dir, metadata)`.
      `saved_model_dir` must expose a `.pipeline` attribute; `metadata` is
      piped into `beam_metadata_io.WriteMetadata`.

  Returns:
    The result of applying the 'PublishMetadataAndTransformFn' map to a
    single-element (`[None]`) collection created on the pipeline.
  """
  saved_model_dir, metadata = transform_fn
  pipeline = saved_model_dir.pipeline

  # Using a temp dir within `path` ensures that the source and destination
  # paths for the rename below are in the same file system.
  base_temp_dir = os.path.join(self._path, 'transform_tmp')
  temp_metadata_path = (
      metadata
      | 'WriteMetadataToTemp' >> beam_metadata_io.WriteMetadata(
          base_temp_dir, pipeline, write_to_unique_subdirectory=True))

  # NOTE(review): presumably `_copy_tree_to_unique_temp_dir` copies the saved
  # model into a fresh subdirectory of `base_temp_dir` and emits that path --
  # confirm against the helper's definition.
  temp_transform_fn_path = (
      saved_model_dir
      | 'WriteTransformFnToTemp' >> beam.Map(_copy_tree_to_unique_temp_dir,
                                             base_temp_dir))

  # Final locations of the two published directories.
  metadata_path = os.path.join(self._path,
                               tft.TFTransformOutput.TRANSFORMED_METADATA_DIR)
  transform_fn_path = os.path.join(self._path,
                                   tft.TFTransformOutput.TRANSFORM_FN_DIR)

  def publish_outputs(unused_element, metadata_source_path,
                      transform_fn_source_path):
    """Renames the temp outputs into place and removes the temp dir.

    Any existing destination directory is removed before the rename.
    """
    import tensorflow as tf  # pylint: disable=g-import-not-at-top
    if not tf.io.gfile.exists(self._path):
      tf.io.gfile.makedirs(self._path)

    if tf.io.gfile.exists(metadata_path):
      tf.io.gfile.rmtree(metadata_path)
    tf.io.gfile.rename(metadata_source_path, metadata_path, overwrite=True)

    if tf.io.gfile.exists(transform_fn_path):
      tf.io.gfile.rmtree(transform_fn_path)
    tf.io.gfile.rename(
        transform_fn_source_path, transform_fn_path, overwrite=True)

    # TODO(b/211615643): Remove the exists check once importing TFIO in S3
    # addresses NotFoundError.
    if tf.io.gfile.exists(base_temp_dir):
      tf.io.gfile.rmtree(base_temp_dir)

  # The two temp paths are passed as singleton side inputs, so the publish
  # step receives them as plain values for its single `None` element.
  # TODO(KesterTong): Move this "must follows" logic into a tfx_bsl helper
  # function or into Beam.
  return (
      pipeline
      | 'CreateSole' >> beam.Create([None])
      | 'PublishMetadataAndTransformFn' >> beam.Map(
          publish_outputs,
          metadata_source_path=beam.pvalue.AsSingleton(temp_metadata_path),
          transform_fn_source_path=beam.pvalue.AsSingleton(
              temp_transform_fn_path)))
from_runner_api classmethod
from_runner_api(proto, context)
Source code in apache_beam/transforms/ptransform.py
@classmethod
def from_runner_api(cls,
                    proto,  # type: Optional[beam_runner_api_pb2.PTransform]
                    context  # type: PipelineContext
                   ):
  # type: (...) -> Optional[PTransform]
  """Rebuilds a transform from its runner API proto.

  Args:
    proto: The transform proto, or None.
    context: The pipeline context handed on to the registered constructor.

  Returns:
    None when `proto` is None, has no spec, or has an empty URN; otherwise
    whatever the constructor registered for the spec's URN returns.
  """
  if proto is None:
    return None
  if proto.spec is None or not proto.spec.urn:
    return None

  payload_type, build = cls._known_urns[proto.spec.urn]
  payload = proto_utils.parse_Bytes(proto.spec.payload, payload_type)
  return build(proto, payload, context)
get_resource_hints
get_resource_hints()
Source code in apache_beam/transforms/ptransform.py
def get_resource_hints(self):
  # type: () -> dict[str, bytes]
  """Returns this transform's resource hints, creating them on first use.

  The hints dict is created lazily because PTransform subclasses don't always
  call super(). A transform starts out with no resource hints.
  """
  if '_resource_hints' in self.__dict__:
    return self._resource_hints
  self._resource_hints = {}  # type: dict[str, bytes]
  return self._resource_hints
get_type_hints
get_type_hints()

Gets and/or initializes type hints for this object.

If type hints have not been set, attempts to initialize type hints in this order: (1) using self.default_type_hints(); (2) using self.__class__ type hints.

Source code in apache_beam/typehints/decorators.py
def get_type_hints(self):
  """Returns the type hints for this object, initializing them if needed.

  Hints already set on the object take precedence. Anything they leave
  unspecified is filled in, in this order, from:
  - self.default_type_hints()
  - the type hints of self.__class__
  """
  own_hints = self._get_or_create_type_hints()
  with_instance_defaults = own_hints.with_defaults(self.default_type_hints())
  return with_instance_defaults.with_defaults(get_type_hints(self.__class__))
get_windowing
get_windowing(inputs)

Returns the window function to be associated with transform's output.

By default most transforms just return the windowing function associated with the input PCollection (or the first input if several).

Source code in apache_beam/transforms/ptransform.py
def get_windowing(self, inputs):
  # type: (Any) -> Windowing

  """Picks the windowing to associate with this transform's output.

  The default is to reuse the windowing of the first input. When there are no
  inputs, a global-windows windowing is returned.
  """
  if not inputs:
    from apache_beam.transforms.core import Windowing
    from apache_beam.transforms.window import GlobalWindows
    # TODO(robertwb): Return something compatible with every windowing?
    return Windowing(GlobalWindows())
  return inputs[0].windowing
infer_output_type
infer_output_type(unused_input_type)
Source code in apache_beam/transforms/ptransform.py
def infer_output_type(self, unused_input_type):
  """Returns the declared simple output type, or `typehints.Any` if none."""
  declared = self.get_type_hints().simple_output_type(self.label)
  if declared:
    return declared
  return typehints.Any
register_urn classmethod
register_urn(urn, parameter_type)
register_urn(urn, parameter_type)
register_urn(urn, parameter_type, constructor)
register_urn(urn, parameter_type, constructor)
register_urn(urn, parameter_type, constructor=None)
Source code in apache_beam/transforms/ptransform.py
@classmethod
def register_urn(cls, urn, parameter_type, constructor=None):
  """Registers a constructor for `urn` in `cls._known_urns`.

  Works both as a plain call (when `constructor` is given) and as a decorator
  (when it is omitted). When the target is a class, its
  `from_runner_api_parameter` attribute is registered instead of the class.

  Args:
    urn: The URN to register under.
    parameter_type: Stored alongside the constructor for this URN.
    constructor: Optional callable or class to register immediately.

  Returns:
    None when `constructor` is given; otherwise a decorator that registers
    its argument and returns it.
  """
  def _bind(target):
    if not isinstance(target, type):
      cls._known_urns[urn] = parameter_type, target
      return target
    # A class was given: register its factory method and store the result
    # back on the class.
    target.from_runner_api_parameter = _bind(target.from_runner_api_parameter)
    return target

  if not constructor:
    # Used as a decorator.
    return _bind
  # Used as a statement.
  _bind(constructor)
runner_api_requires_keyed_input
runner_api_requires_keyed_input()
Source code in apache_beam/transforms/ptransform.py
def runner_api_requires_keyed_input(self):
  """Returns False: this base implementation never requires keyed input."""
  requires_keyed_input = False
  return requires_keyed_input
to_runner_api
to_runner_api(context, has_parts=False, **extra_kwargs)
Source code in apache_beam/transforms/ptransform.py
def to_runner_api(self, context, has_parts=False, **extra_kwargs):
  # type: (PipelineContext, bool, Any) -> beam_runner_api_pb2.FunctionSpec
  """Serializes this transform into a runner API `FunctionSpec`.

  Args:
    context: The pipeline context passed on to the parameter serializers.
    has_parts: When False and the transform reports the generic composite
      URN, the pickled representation is used instead.
    **extra_kwargs: Forwarded to `to_runner_api_parameter`.

  Returns:
    A `FunctionSpec` holding the URN and the payload as bytes: proto messages
    are serialized, strings are UTF-8 encoded, anything else is passed as is.
  """
  from apache_beam.portability.api import beam_runner_api_pb2
  # typing: only ParDo supports extra_kwargs
  urn, typed_param = self.to_runner_api_parameter(context, **extra_kwargs)
  if urn == python_urns.GENERIC_COMPOSITE_TRANSFORM and not has_parts:
    # TODO(https://github.com/apache/beam/issues/18713): Remove this fallback.
    urn, typed_param = self.to_runner_api_pickled(context)

  if isinstance(typed_param, message.Message):
    payload = typed_param.SerializeToString()
  elif isinstance(typed_param, str):
    payload = typed_param.encode('utf-8')
  else:
    payload = typed_param
  return beam_runner_api_pb2.FunctionSpec(urn=urn, payload=payload)
to_runner_api_parameter
to_runner_api_parameter(unused_context)
Source code in apache_beam/transforms/ptransform.py
def to_runner_api_parameter(
    self,
    unused_context  # type: PipelineContext
):
  # type: (...) -> tuple[str, Optional[Union[message.Message, bytes, str]]]
  """Returns the generic composite URN together with a debugging payload.

  The payload is `self._fn_api_payload` when that attribute exists, and
  `str(self)` otherwise. It is only there to ease debugging.
  """
  urn = python_urns.GENERIC_COMPOSITE_TRANSFORM
  debug_payload = getattr(self, '_fn_api_payload', str(self))
  return urn, debug_payload
to_runner_api_pickled
to_runner_api_pickled(context)
Source code in apache_beam/transforms/ptransform.py
def to_runner_api_pickled(self, context):
  # type: (PipelineContext) -> tuple[str, bytes]
  """Returns the pickled-transform URN and this transform pickled.

  Best-effort deterministic pickling is switched on or off according to
  `context.enable_best_effort_deterministic_pickling`.
  """
  urn = python_urns.PICKLED_TRANSFORM
  pickled_self = pickler.dumps(
      self,
      enable_best_effort_determinism=(
          context.enable_best_effort_deterministic_pickling),
  )
  return urn, pickled_self
type_check_inputs
type_check_inputs(pvalueish)
Source code in apache_beam/transforms/ptransform.py
def type_check_inputs(self, pvalueish):
  """Checks `pvalueish` against this transform's input type hints."""
  direction = 'input'
  self.type_check_inputs_or_outputs(pvalueish, direction)
type_check_inputs_or_outputs
type_check_inputs_or_outputs(pvalueish, input_or_output)
Source code in apache_beam/transforms/ptransform.py
def type_check_inputs_or_outputs(self, pvalueish, input_or_output):
  """Checks the element types in `pvalueish` against the declared hints.

  Args:
    pvalueish: The value(s) whose element types are checked.
    input_or_output: Either 'input' or 'output'; selects which hints
      (`<input_or_output>_types`) are read and is used in error messages.

  Raises:
    TypeCheckError: If both positional and keyword hints are declared, or if
      an element type is not consistent with its hint.
  """
  declared = getattr(self.get_type_hints(), input_or_output + '_types')
  if declared is None or not any(declared):
    # Nothing was declared, so there is nothing to check.
    return

  positional_hints, keyword_hints = declared
  if positional_hints and keyword_hints:
    raise TypeCheckError(
        'PTransform cannot have both positional and keyword type hints '
        'without overriding %s._type_check_%s()' %
        (self.__class__, input_or_output))

  if len(positional_hints) == 1:
    root_hint = positional_hints[0]
  else:
    root_hint = positional_hints or keyword_hints

  for context, pvalue_, hint in _ZipPValues().visit(pvalueish, root_hint):
    # Untyped values are skipped as well.
    # TODO(robertwb): It's a bug that we ever get an element_type of None
    # here. (typecheck)
    if isinstance(pvalue_, DoOutputsTuple) or pvalue_.element_type is None:
      continue
    if not hint or typehints.is_consistent_with(pvalue_.element_type, hint):
      continue
    at_context = ' %s %s' % (input_or_output, context) if context else ''
    raise TypeCheckError(
        '{type} type hint violation at {label}{context}: expected {hint}, '
        'got {actual_type}'.format(
            type=input_or_output.title(),
            label=self.label,
            context=at_context,
            hint=hint,
            actual_type=pvalue_.element_type))
type_check_outputs
type_check_outputs(pvalueish)
Source code in apache_beam/transforms/ptransform.py
def type_check_outputs(self, pvalueish):
  """Checks `pvalueish` against this transform's output type hints."""
  direction = 'output'
  self.type_check_inputs_or_outputs(pvalueish, direction)
with_input_types
with_input_types(input_type_hint)

Annotates the input type of a :class:PTransform with a type-hint.

PARAMETER DESCRIPTION
input_type_hint

An instance of an allowed built-in type, a custom class, or an instance of a :class:~apache_beam.typehints.typehints.TypeConstraint.

TYPE: type

RAISES DESCRIPTION
TypeError

If input_type_hint is not a valid type-hint. See :obj:apache_beam.typehints.typehints.validate_composite_type_param() for further details.

RETURNS DESCRIPTION
PTransform

A reference to the instance of this particular :class:PTransform object. This allows chaining type-hinting related methods.

Source code in apache_beam/transforms/ptransform.py
def with_input_types(self, input_type_hint):
  """Attaches an input type-hint to this :class:`PTransform`.

  Args:
    input_type_hint (type): An allowed built-in type, a custom class, or an
      instance of a :class:`~apache_beam.typehints.typehints.TypeConstraint`.

  Raises:
    TypeError: If **input_type_hint** is not a valid type-hint. See
      :obj:`apache_beam.typehints.typehints.validate_composite_type_param()`
      for further details.

  Returns:
    PTransform: A reference to this :class:`PTransform` instance, so that
    type-hinting related methods can be chained.
  """
  beam_hint = native_type_compatibility.convert_to_beam_type(input_type_hint)
  validate_composite_type_param(beam_hint, 'Type hints for a PTransform')
  return super().with_input_types(beam_hint)
with_output_types
with_output_types(type_hint)

Annotates the output type of a :class:PTransform with a type-hint.

PARAMETER DESCRIPTION
type_hint

An instance of an allowed built-in type, a custom class, or a :class:~apache_beam.typehints.typehints.TypeConstraint.

TYPE: type

RAISES DESCRIPTION
TypeError

If type_hint is not a valid type-hint. See :obj:~apache_beam.typehints.typehints.validate_composite_type_param() for further details.

RETURNS DESCRIPTION
PTransform

A reference to the instance of this particular :class:PTransform object. This allows chaining type-hinting related methods.

Source code in apache_beam/transforms/ptransform.py
def with_output_types(self, type_hint):
  """Attaches an output type-hint to this :class:`PTransform`.

  Args:
    type_hint (type): An allowed built-in type, a custom class, or a
      :class:`~apache_beam.typehints.typehints.TypeConstraint`.

  Raises:
    TypeError: If **type_hint** is not a valid type-hint. See
      :obj:`~apache_beam.typehints.typehints.validate_composite_type_param()`
      for further details.

  Returns:
    PTransform: A reference to this :class:`PTransform` instance, so that
    type-hinting related methods can be chained.
  """
  beam_hint = native_type_compatibility.convert_to_beam_type(type_hint)
  validate_composite_type_param(beam_hint, 'Type hints for a PTransform')
  return super().with_output_types(beam_hint)
with_resource_hints
with_resource_hints(**kwargs)

Adds resource hints to the :class:PTransform.

Resource hints allow users to express constraints on the environment where the transform should be executed. Interpretation of the resource hints is defined by Beam Runners. Runners may ignore the unsupported hints.

PARAMETER DESCRIPTION
**kwargs

key-value pairs describing hints and their values.

DEFAULT: {}

RAISES DESCRIPTION
ValueError

if provided hints are unknown to the SDK. See :mod:apache_beam.transforms.resources for a list of known hints.

RETURNS DESCRIPTION
PTransform

A reference to the instance of this particular :class:PTransform object.

Source code in apache_beam/transforms/ptransform.py
def with_resource_hints(self, **kwargs):  # type: (...) -> PTransform
  """Merges the given resource hints into this :class:`PTransform`.

  Resource hints let users express constraints on the environment in which
  the transform should run. How a hint is interpreted is up to the Beam
  Runner, and runners may ignore hints they do not support.

  Args:
    **kwargs: key-value pairs describing hints and their values.

  Raises:
    ValueError: if provided hints are unknown to the SDK. See
      :mod:`apache_beam.transforms.resources` for a list of known hints.

  Returns:
    PTransform: A reference to this :class:`PTransform` instance.
  """
  current_hints = self.get_resource_hints()
  current_hints.update(resources.parse_resource_hints(kwargs))
  return self