Skip to content

TensorFlow Transform tft.beam.experimental Module

tensorflow_transform.beam.experimental

Module level imports for tensorflow_transform.beam.experimental.

Classes

PTransformAnalyzer

PTransformAnalyzer()

Bases: PTransform

A PTransform analyzer's base class which provides a temp dir if needed.

Source code in tensorflow_transform/beam/experimental/analyzer_impls.py
def __init__(self):
  """Creates the analyzer with no base temp dir assigned yet."""
  # `None` is the initial value of the backing field `_base_temp_dir`.
  self._base_temp_dir = None
Attributes
base_temp_dir property writable
base_temp_dir
label property writable
label
pipeline class-attribute instance-attribute
pipeline = None
side_inputs class-attribute instance-attribute
side_inputs = ()
Functions
annotations
annotations() -> dict[str, Union[bytes, str, Message]]
Source code in apache_beam/transforms/ptransform.py
def annotations(self) -> dict[str, Union[bytes, str, message.Message]]:
  """Returns annotations identifying the Python class of this transform.

  Returns:
    A single-entry dict whose ``'python_type'`` key maps to the module name
    and qualified name of ``self.__class__`` joined by a dot.
  """
  cls = self.__class__
  return {'python_type': f'{cls.__module__}.{cls.__qualname__}'}
default_label
default_label()
Source code in apache_beam/transforms/ptransform.py
def default_label(self):
  # type: () -> str
  """Returns the unqualified class name of this object as its label."""
  cls = self.__class__
  return cls.__name__
default_type_hints
default_type_hints()
Source code in apache_beam/transforms/ptransform.py
def default_type_hints(self):
  """Returns class-level type hints, defaulted from those of ``expand``.

  Hints are read from ``self.expand`` via ``IOTypeHints.from_callable`` and,
  when present, passed through ``strip_pcoll()``. They only act as defaults
  for the hints obtained from ``get_type_hints(self.__class__)``.
  """
  expand_hints = IOTypeHints.from_callable(self.expand)
  expand_defaults = (
      expand_hints if expand_hints is None else expand_hints.strip_pcoll())

  # Prefer class decorator type hints for backwards compatibility.
  class_hints = get_type_hints(self.__class__)
  return class_hints.with_defaults(expand_defaults)
display_data
display_data()

Returns the display data associated to a pipeline component.

It should be reimplemented in pipeline components that wish to have static display data.

RETURNS DESCRIPTION

Dict[str, Any]: A dictionary containing key:value pairs. The value might be an integer, float or string value; a :class:DisplayDataItem for values that have more data (e.g. short value, label, url); or a :class:HasDisplayData instance that has more display data that should be picked up. For example::

{ 'key1': 'string_value', 'key2': 1234, 'key3': 3.14159265, 'key4': DisplayDataItem('apache.org', url='http://apache.org'), 'key5': subComponent }

Source code in apache_beam/transforms/display.py
def display_data(self):
  # type: () -> dict

  """Returns the static display data of this pipeline component.

  This base implementation reports nothing. Components that want to expose
  static display data should reimplement it.

  Returns:
    Dict[str, Any]: A mapping of ``key:value`` pairs. A value may be an
    integer, float or string; a :class:`DisplayDataItem` when more data is
    attached (e.g. short value, label, url); or a :class:`HasDisplayData`
    instance whose own display data should be picked up too. For example::

      {
        'key1': 'string_value',
        'key2': 1234,
        'key3': 3.14159265,
        'key4': DisplayDataItem('apache.org', url='http://apache.org'),
        'key5': subComponent
      }
  """
  # A fresh, empty dict on every call: nothing to display by default.
  return {}
expand
expand(input_or_inputs: InputT) -> OutputT
Source code in apache_beam/transforms/ptransform.py
def expand(self, input_or_inputs: InputT) -> OutputT:
  """Placeholder for the transform body; this implementation always raises.

  Raises:
    NotImplementedError: Unconditionally.
  """
  raise NotImplementedError
from_runner_api classmethod
from_runner_api(proto, context)
Source code in apache_beam/transforms/ptransform.py
@classmethod
def from_runner_api(
    cls,
    proto,  # type: Optional[beam_runner_api_pb2.PTransform]
    context  # type: PipelineContext
):
  # type: (...) -> Optional[PTransform]
  """Rebuilds a transform from its proto using the registered constructor.

  Args:
    proto: The transform proto, or None.
    context: Passed through unchanged to the registered constructor.

  Returns:
    None when ``proto`` is None, its ``spec`` is None, or the spec's ``urn``
    is falsy. Otherwise the result of calling the constructor stored in
    ``cls._known_urns`` for that urn with the proto, the payload parsed as
    the registered parameter type, and ``context``.
  """
  if proto is None:
    return None
  spec = proto.spec
  if spec is None or not spec.urn:
    return None

  parameter_type, constructor = cls._known_urns[spec.urn]
  payload = proto_utils.parse_Bytes(spec.payload, parameter_type)
  return constructor(proto, payload, context)
get_resource_hints
get_resource_hints()
Source code in apache_beam/transforms/ptransform.py
def get_resource_hints(self):
  # type: () -> dict[str, bytes]
  """Returns this object's resource hints dict, creating it on first use.

  The same dict is returned on later calls, so callers may mutate it in
  place.
  """
  already_initialized = '_resource_hints' in self.__dict__
  if not already_initialized:
    # PTransform subclasses don't always call super(), so prefer lazy
    # initialization. By default, transforms don't have any resource hints.
    self._resource_hints = {}  # type: dict[str, bytes]
  return self._resource_hints
get_type_hints
get_type_hints()

Gets and/or initializes type hints for this object.

If type hints have not been set, attempts to initialize type hints in this order: (1) using self.default_type_hints(); (2) using self.__class__ type hints.

Source code in apache_beam/typehints/decorators.py
def get_type_hints(self):
  """Gets and/or initializes type hints for this object.

  Starts from ``self._get_or_create_type_hints()`` and fills in defaults, in
  this order:
  - from self.default_type_hints();
  - from the type hints of self.__class__.
  """
  own_hints = self._get_or_create_type_hints()
  with_instance_defaults = own_hints.with_defaults(self.default_type_hints())
  return with_instance_defaults.with_defaults(get_type_hints(self.__class__))
get_windowing
get_windowing(inputs)

Returns the window function to be associated with transform's output.

By default most transforms just return the windowing function associated with the input PCollection (or the first input if several).

Source code in apache_beam/transforms/ptransform.py
def get_windowing(self, inputs):
  # type: (Any) -> Windowing

  """Chooses the windowing to associate with this transform's output.

  When ``inputs`` is truthy, the ``windowing`` of its first element is
  returned. Otherwise a ``Windowing`` built over ``GlobalWindows()`` is
  returned.
  """
  if not inputs:
    # Imported only on this branch, as in the original layout.
    from apache_beam.transforms.core import Windowing
    from apache_beam.transforms.window import GlobalWindows
    # TODO(robertwb): Return something compatible with every windowing?
    return Windowing(GlobalWindows())
  return inputs[0].windowing
infer_output_type
infer_output_type(unused_input_type)
Source code in apache_beam/transforms/ptransform.py
def infer_output_type(self, unused_input_type):
  """Returns the hinted simple output type, or ``typehints.Any`` if falsy.

  Args:
    unused_input_type: Ignored by this implementation.
  """
  hinted = self.get_type_hints().simple_output_type(self.label)
  if hinted:
    return hinted
  return typehints.Any
register_urn classmethod
register_urn(urn, parameter_type)
register_urn(urn, parameter_type)
register_urn(urn, parameter_type, constructor)
register_urn(urn, parameter_type, constructor)
register_urn(urn, parameter_type, constructor=None)
Source code in apache_beam/transforms/ptransform.py
@classmethod
def register_urn(cls, urn, parameter_type, constructor=None):
  """Registers a constructor for ``urn`` in ``cls._known_urns``.

  Usable as a plain call (pass ``constructor``) or as a decorator (omit it).
  When the registered object is a class, its ``from_runner_api_parameter``
  attribute is registered instead and reassigned on the class.

  Args:
    urn: Key under which the entry is stored.
    parameter_type: Stored alongside the constructor in the registry entry.
    constructor: Optional callable or class to register immediately.

  Returns:
    None when ``constructor`` is given; otherwise a decorator that registers
    its argument and returns it.
  """
  def register(constructor):
    if not isinstance(constructor, type):
      cls._known_urns[urn] = parameter_type, constructor
      return constructor
    # For a class, register its factory attribute rather than the class.
    constructor.from_runner_api_parameter = register(
        constructor.from_runner_api_parameter)
    return constructor

  if not constructor:
    # Used as a decorator.
    return register
  # Used as a statement.
  register(constructor)
runner_api_requires_keyed_input
runner_api_requires_keyed_input()
Source code in apache_beam/transforms/ptransform.py
def runner_api_requires_keyed_input(self):
  """Reports whether keyed input is required; always False here."""
  return False
to_runner_api
to_runner_api(context, has_parts=False, **extra_kwargs)
Source code in apache_beam/transforms/ptransform.py
def to_runner_api(self, context, has_parts=False, **extra_kwargs):
  # type: (PipelineContext, bool, Any) -> beam_runner_api_pb2.FunctionSpec
  """Builds the ``FunctionSpec`` proto describing this transform.

  Args:
    context: Forwarded to ``to_runner_api_parameter`` and, on the fallback
      path, to ``to_runner_api_pickled``.
    has_parts: When false and the urn is the generic composite urn, the
      pickled representation is used instead.
    **extra_kwargs: Forwarded to ``to_runner_api_parameter``.

  Returns:
    A ``FunctionSpec`` whose payload is the serialized message, the UTF-8
    encoded string, or the parameter itself, depending on its type.
  """
  from apache_beam.portability.api import beam_runner_api_pb2
  # typing: only ParDo supports extra_kwargs
  urn, typed_param = self.to_runner_api_parameter(context, **extra_kwargs)
  if urn == python_urns.GENERIC_COMPOSITE_TRANSFORM and not has_parts:
    # TODO(https://github.com/apache/beam/issues/18713): Remove this fallback.
    urn, typed_param = self.to_runner_api_pickled(context)

  # Normalise the payload: messages are serialized, text is encoded, and
  # anything else is passed through untouched.
  if isinstance(typed_param, message.Message):
    payload = typed_param.SerializeToString()
  elif isinstance(typed_param, str):
    payload = typed_param.encode('utf-8')
  else:
    payload = typed_param
  return beam_runner_api_pb2.FunctionSpec(urn=urn, payload=payload)
to_runner_api_parameter
to_runner_api_parameter(unused_context)
Source code in apache_beam/transforms/ptransform.py
def to_runner_api_parameter(
    self,
    unused_context  # type: PipelineContext
):
  # type: (...) -> tuple[str, Optional[Union[message.Message, bytes, str]]]
  """Returns the generic composite urn paired with a descriptive payload.

  The payload is the ``_fn_api_payload`` attribute when this object has one,
  and ``str(self)`` otherwise.
  """
  urn = python_urns.GENERIC_COMPOSITE_TRANSFORM
  # The payload here is just to ease debugging.
  payload = getattr(self, '_fn_api_payload', str(self))
  return (urn, payload)
to_runner_api_pickled
to_runner_api_pickled(context)
Source code in apache_beam/transforms/ptransform.py
def to_runner_api_pickled(self, context):
  # type: (PipelineContext) -> tuple[str, bytes]
  """Returns the pickled-transform urn paired with this object pickled.

  Args:
    context: Supplies ``enable_best_effort_deterministic_pickling``, which
      is forwarded to ``pickler.dumps`` as
      ``enable_best_effort_determinism``.
  """
  urn = python_urns.PICKLED_TRANSFORM
  deterministic = context.enable_best_effort_deterministic_pickling
  pickled = pickler.dumps(self, enable_best_effort_determinism=deterministic)
  return (urn, pickled)
type_check_inputs
type_check_inputs(pvalueish)
Source code in apache_beam/transforms/ptransform.py
def type_check_inputs(self, pvalueish):
  """Type-checks ``pvalueish`` against this transform's input hints."""
  direction = 'input'
  self.type_check_inputs_or_outputs(pvalueish, direction)
type_check_inputs_or_outputs
type_check_inputs_or_outputs(pvalueish, input_or_output)
Source code in apache_beam/transforms/ptransform.py
def type_check_inputs_or_outputs(self, pvalueish, input_or_output):
  """Checks the element types in ``pvalueish`` against declared type hints.

  Args:
    pvalueish: The value(s) walked alongside the hints by
      ``_ZipPValues().visit``.
    input_or_output: Prefix selecting the ``<prefix>_types`` attribute of
      the type hints; also used in error messages.

  Raises:
    TypeCheckError: If both positional and keyword hints are declared, or if
      a visited value's element type is not consistent with its hint.
  """
  hints = getattr(self.get_type_hints(), input_or_output + '_types')
  if hints is None or not any(hints):
    # Nothing declared for this direction, so nothing to verify.
    return

  arg_hints, kwarg_hints = hints
  if arg_hints and kwarg_hints:
    raise TypeCheckError(
        'PTransform cannot have both positional and keyword type hints '
        'without overriding %s._type_check_%s()' %
        (self.__class__, input_or_output))

  # A lone positional hint is unwrapped; otherwise whichever of the two
  # collections is non-empty drives the walk.
  if len(arg_hints) == 1:
    root_hint = arg_hints[0]
  else:
    root_hint = arg_hints or kwarg_hints

  for context, pvalue_, hint in _ZipPValues().visit(pvalueish, root_hint):
    # TODO(robertwb): It's a bug that we ever get here with a None
    # element_type. (typecheck)
    if isinstance(pvalue_, DoOutputsTuple) or pvalue_.element_type is None:
      continue
    if not hint or typehints.is_consistent_with(pvalue_.element_type, hint):
      continue
    at_context = ' %s %s' % (input_or_output, context) if context else ''
    raise TypeCheckError(
        '{type} type hint violation at {label}{context}: expected {hint}, '
        'got {actual_type}'.format(
            type=input_or_output.title(),
            label=self.label,
            context=at_context,
            hint=hint,
            actual_type=pvalue_.element_type))
type_check_outputs
type_check_outputs(pvalueish)
Source code in apache_beam/transforms/ptransform.py
def type_check_outputs(self, pvalueish):
  """Type-checks ``pvalueish`` against this transform's output hints."""
  direction = 'output'
  self.type_check_inputs_or_outputs(pvalueish, direction)
with_input_types
with_input_types(input_type_hint)

Annotates the input type of a :class:PTransform with a type-hint.

PARAMETER DESCRIPTION
input_type_hint

An instance of an allowed built-in type, a custom class, or an instance of a :class:~apache_beam.typehints.typehints.TypeConstraint.

TYPE: type

RAISES DESCRIPTION
TypeError

If input_type_hint is not a valid type-hint. See :obj:apache_beam.typehints.typehints.validate_composite_type_param() for further details.

RETURNS DESCRIPTION
PTransform

A reference to the instance of this particular :class:PTransform object. This allows chaining type-hinting related methods.

Source code in apache_beam/transforms/ptransform.py
def with_input_types(self, input_type_hint):
  """Annotates the input type of a :class:`PTransform` with a type-hint.

  The hint is converted with
  ``native_type_compatibility.convert_to_beam_type`` and validated before
  being handed to the parent implementation.

  Args:
    input_type_hint (type): An instance of an allowed built-in type, a custom
      class, or an instance of a
      :class:`~apache_beam.typehints.typehints.TypeConstraint`.

  Raises:
    TypeError: If **input_type_hint** is not a valid type-hint.
      See
      :obj:`apache_beam.typehints.typehints.validate_composite_type_param()`
      for further details.

  Returns:
    PTransform: A reference to the instance of this particular
    :class:`PTransform` object. This allows chaining type-hinting related
    methods.
  """
  beam_hint = native_type_compatibility.convert_to_beam_type(input_type_hint)
  validate_composite_type_param(beam_hint, 'Type hints for a PTransform')
  return super().with_input_types(beam_hint)
with_output_types
with_output_types(type_hint)

Annotates the output type of a :class:PTransform with a type-hint.

PARAMETER DESCRIPTION
type_hint

An instance of an allowed built-in type, a custom class, or a :class:~apache_beam.typehints.typehints.TypeConstraint.

TYPE: type

RAISES DESCRIPTION
TypeError

If type_hint is not a valid type-hint. See :obj:~apache_beam.typehints.typehints.validate_composite_type_param() for further details.

RETURNS DESCRIPTION
PTransform

A reference to the instance of this particular :class:PTransform object. This allows chaining type-hinting related methods.

Source code in apache_beam/transforms/ptransform.py
def with_output_types(self, type_hint):
  """Annotates the output type of a :class:`PTransform` with a type-hint.

  The hint is converted with
  ``native_type_compatibility.convert_to_beam_type`` and validated before
  being handed to the parent implementation.

  Args:
    type_hint (type): An instance of an allowed built-in type, a custom class,
      or a :class:`~apache_beam.typehints.typehints.TypeConstraint`.

  Raises:
    TypeError: If **type_hint** is not a valid type-hint. See
      :obj:`~apache_beam.typehints.typehints.validate_composite_type_param()`
      for further details.

  Returns:
    PTransform: A reference to the instance of this particular
    :class:`PTransform` object. This allows chaining type-hinting related
    methods.
  """
  beam_hint = native_type_compatibility.convert_to_beam_type(type_hint)
  validate_composite_type_param(beam_hint, 'Type hints for a PTransform')
  return super().with_output_types(beam_hint)
with_resource_hints
with_resource_hints(**kwargs)

Adds resource hints to the :class:PTransform.

Resource hints allow users to express constraints on the environment where the transform should be executed. Interpretation of the resource hints is defined by Beam Runners. Runners may ignore the unsupported hints.

PARAMETER DESCRIPTION
**kwargs

key-value pairs describing hints and their values.

DEFAULT: {}

RAISES DESCRIPTION
ValueError

if provided hints are unknown to the SDK. See :mod:apache_beam.transforms.resources for a list of known hints.

RETURNS DESCRIPTION
PTransform

A reference to the instance of this particular :class:PTransform object.

Source code in apache_beam/transforms/ptransform.py
def with_resource_hints(self, **kwargs):  # type: (...) -> PTransform
  """Adds resource hints to the :class:`PTransform`.

  Resource hints let users express constraints on the environment in which
  the transform should be executed. How they are interpreted is defined by
  Beam Runners, which may ignore unsupported hints.

  Args:
    **kwargs: key-value pairs describing hints and their values.

  Raises:
    ValueError: if provided hints are unknown to the SDK. See
      :mod:`apache_beam.transforms.resources` for a list of known hints.

  Returns:
    PTransform: A reference to the instance of this particular
    :class:`PTransform` object.
  """
  # Fetch the hint dict before parsing so evaluation order is unchanged.
  current_hints = self.get_resource_hints()
  current_hints.update(resources.parse_resource_hints(kwargs))
  return self