Module pipeline_dp.private_beam
Expand source code
from apache_beam.transforms import ptransform
from abc import abstractmethod
from typing import Callable, Optional
from apache_beam import pvalue
import apache_beam as beam
import pipeline_dp
from pipeline_dp import aggregate_params, budget_accounting
class PrivatePTransform(ptransform.PTransform):
    """Base class for transforms that are applied to a PrivatePCollection.

    Every instance carries two fields:
      * _return_anonymized: the flag passed to the constructor.
      * _budget_accountant: None until set_additional_parameters() is called.
    """

    def __init__(self, return_anonymized: bool, label: Optional[str] = None):
        super().__init__(label)
        self._budget_accountant = None
        self._return_anonymized = return_anonymized

    def set_additional_parameters(
            self, budget_accountant: budget_accounting.BudgetAccountant):
        """Stores the budget accountant on the transform for later use."""
        self._budget_accountant = budget_accountant

    @abstractmethod
    def expand(self, pcol: pvalue.PCollection) -> pvalue.PCollection:
        """Implemented by subclasses; this base version has an empty body."""
class PrivatePCollection:
    """Private counterpart for PCollection.

    PrivatePCollection guarantees that only anonymized data within the specified
    privacy budget can be extracted from it through PrivatePTransforms.
    """

    def __init__(self, pcol: pvalue.PCollection,
                 budget_accountant: budget_accounting.BudgetAccountant):
        """Wraps pcol together with the budget accountant used for it."""
        self._pcol = pcol
        self._budget_accountant = budget_accountant

    def __or__(self, private_transform: PrivatePTransform):
        """Applies private_transform to the wrapped PCollection.

        Args:
          private_transform: the transform to apply; it receives this
            collection's budget accountant before being applied.

        Returns:
          The output of the transform as is when the transform has
          _return_anonymized set, otherwise that output wrapped into a new
          PrivatePCollection with the same budget accountant.

        Raises:
          TypeError: if private_transform is not a PrivatePTransform.
        """
        if not isinstance(private_transform, PrivatePTransform):
            # Format the message here: exception constructors do not
            # interpolate extra positional arguments into a "%s" placeholder.
            raise TypeError(
                "private_transform should be of type PrivatePTransform but is "
                "%s" % type(private_transform).__name__)

        private_transform.set_additional_parameters(
            budget_accountant=self._budget_accountant)
        transformed = self._pcol.pipeline.apply(private_transform, self._pcol)

        if private_transform._return_anonymized:
            return transformed
        return PrivatePCollection(transformed, self._budget_accountant)
class MakePrivate(PrivatePTransform):
    """Transform that turns a PCollection into a PrivatePCollection."""

    def __init__(self,
                 budget_accountant: budget_accounting.BudgetAccountant,
                 privacy_id_extractor: Callable,
                 label: Optional[str] = None):
        super().__init__(return_anonymized=False, label=label)
        self._privacy_id_extractor = privacy_id_extractor
        self._budget_accountant = budget_accountant

    def expand(self, pcol: pvalue.PCollection):
        """Pairs each element with its privacy id and wraps the result."""
        # Each element x becomes (privacy_id_extractor(x), x).
        keyed = pcol | "Extract privacy id" >> beam.Map(
            lambda element: (self._privacy_id_extractor(element), element))
        return PrivatePCollection(keyed, self._budget_accountant)
class Sum(PrivatePTransform):
    """Transform that computes a DP sum over a PrivatePCollection."""

    def __init__(self,
                 sum_params: aggregate_params.SumParams,
                 label: Optional[str] = None):
        super().__init__(return_anonymized=True, label=label)
        self._sum_params = sum_params

    def expand(self, pcol: pvalue.PCollection) -> pvalue.PCollection:
        """Runs a SUM aggregation and returns (partition_key, dp_sum) pairs.

        The input rows are indexed as (privacy_id, element): position 0 is
        used as the privacy id, position 1 is handed to the user extractors.
        """
        cfg = self._sum_params
        beam_backend = pipeline_dp.BeamBackend()
        engine = pipeline_dp.DPEngine(self._budget_accountant, beam_backend)

        aggregate_spec = pipeline_dp.AggregateParams(
            noise_kind=cfg.noise_kind,
            metrics=[pipeline_dp.Metrics.SUM],
            max_partitions_contributed=cfg.max_partitions_contributed,
            max_contributions_per_partition=cfg.
            max_contributions_per_partition,
            min_value=cfg.min_value,
            max_value=cfg.max_value,
            public_partitions=cfg.public_partitions)

        extractors = pipeline_dp.DataExtractors(
            partition_extractor=lambda row: self._sum_params.
            partition_extractor(row[1]),
            privacy_id_extractor=lambda row: row[0],
            value_extractor=lambda row: self._sum_params.value_extractor(
                row[1]))

        # aggregate() yields (partition_key, [dp_sum]): one list entry per
        # requested metric, and only SUM was requested.
        per_partition = engine.aggregate(pcol, aggregate_spec, extractors)

        # Drop the single-element list: (partition_key, dp_sum).
        return beam_backend.map_values(per_partition,
                                       lambda metrics: metrics[0],
                                       "Unnest list")
class Count(PrivatePTransform):
    """Transform that computes a DP count over a PrivatePCollection."""

    def __init__(self,
                 count_params: aggregate_params.CountParams,
                 label: Optional[str] = None):
        super().__init__(return_anonymized=True, label=label)
        self._count_params = count_params

    def expand(self, pcol: pvalue.PCollection) -> pvalue.PCollection:
        """Runs a COUNT aggregation and returns (partition_key, dp_count).

        The input rows are indexed as (privacy_id, element): position 0 is
        used as the privacy id, position 1 is handed to the partition
        extractor.
        """
        cfg = self._count_params
        beam_backend = pipeline_dp.BeamBackend()
        engine = pipeline_dp.DPEngine(self._budget_accountant, beam_backend)

        aggregate_spec = pipeline_dp.AggregateParams(
            noise_kind=cfg.noise_kind,
            metrics=[pipeline_dp.Metrics.COUNT],
            max_partitions_contributed=cfg.max_partitions_contributed,
            max_contributions_per_partition=cfg.
            max_contributions_per_partition,
            public_partitions=cfg.public_partitions)

        extractors = pipeline_dp.DataExtractors(
            partition_extractor=lambda row: self._count_params.
            partition_extractor(row[1]),
            privacy_id_extractor=lambda row: row[0],
            # Counting elements per partition key needs no value, so the
            # value extractor always yields None.
            value_extractor=lambda row: None)

        # aggregate() yields (partition_key, [dp_count]): one list entry per
        # requested metric, and only COUNT was requested.
        per_partition = engine.aggregate(pcol, aggregate_spec, extractors)

        # Drop the single-element list: (partition_key, dp_count).
        return beam_backend.map_values(per_partition,
                                       lambda metrics: metrics[0],
                                       "Unnest list")
class PrivacyIdCount(PrivatePTransform):
    """Transform that computes a DP privacy id count over a PrivatePCollection."""

    def __init__(self,
                 privacy_id_count_params: aggregate_params.PrivacyIdCountParams,
                 label: Optional[str] = None):
        super().__init__(return_anonymized=True, label=label)
        self._privacy_id_count_params = privacy_id_count_params

    def expand(self, pcol: pvalue.PCollection) -> pvalue.PCollection:
        """Runs a PRIVACY_ID_COUNT aggregation.

        Returns (partition_key, dp_privacy_id_count) pairs. The input rows
        are indexed as (privacy_id, element): position 0 is used as the
        privacy id, position 1 is handed to the partition extractor.
        """
        cfg = self._privacy_id_count_params
        beam_backend = pipeline_dp.BeamBackend()
        engine = pipeline_dp.DPEngine(self._budget_accountant, beam_backend)

        aggregate_spec = pipeline_dp.AggregateParams(
            noise_kind=cfg.noise_kind,
            metrics=[pipeline_dp.Metrics.PRIVACY_ID_COUNT],
            max_partitions_contributed=cfg.max_partitions_contributed,
            # Hard-coded to 1 - presumably because a privacy id is counted
            # at most once per partition; confirm against DPEngine.
            max_contributions_per_partition=1,
            public_partitions=cfg.public_partitions)

        extractors = pipeline_dp.DataExtractors(
            partition_extractor=lambda row: self._privacy_id_count_params.
            partition_extractor(row[1]),
            privacy_id_extractor=lambda row: row[0],
            # Values are ignored for this metric.
            value_extractor=lambda row: None)

        # aggregate() yields (partition_key, [dp_privacy_id_count]): one list
        # entry per requested metric, and only one metric was requested.
        per_partition = engine.aggregate(pcol, aggregate_spec, extractors)

        # Drop the single-element list: (partition_key, dp_privacy_id_count).
        return beam_backend.map_values(per_partition,
                                       lambda metrics: metrics[0],
                                       "Unnest list")
class Map(PrivatePTransform):
    """Transform that maps the values of a PrivatePCollection."""

    def __init__(self, fn: Callable, label: Optional[str] = None):
        super().__init__(return_anonymized=False, label=label)
        self._fn = fn

    def expand(self, pcol: pvalue.PCollection):
        """Applies fn to position 1 of every row, keeping position 0 as is."""
        value_mapper = beam.Map(lambda row: (row[0], self._fn(row[1])))
        return pcol | "map values" >> value_mapper
class FlatMap(PrivatePTransform):
    """Transform that flat-maps the values of a PrivatePCollection."""

    class _FlattenValues(beam.DoFn):
        """DoFn that fans one keyed row out into one row per produced value.

        With an identity map_fn, (1, (2, 3, 4)) becomes (1, 2), (1, 3), (1, 4).
        """

        def __init__(self, map_fn: Callable):
            self._map_fn = map_fn

        def process(self, row):
            """Yields (row[0], value) for every value in map_fn(row[1])."""
            key, element = row[0], row[1]
            for produced in self._map_fn(element):
                yield key, produced

    def __init__(self, fn: Callable, label: Optional[str] = None):
        super().__init__(return_anonymized=False, label=label)
        self._fn = fn

    def expand(self, pcol: pvalue.PCollection):
        """Runs _FlattenValues with the user function over pcol."""
        flatten = beam.ParDo(FlatMap._FlattenValues(map_fn=self._fn))
        return pcol | "flatten values" >> flatten
Classes
class Count (count_params: CountParams, label: Optional[str] = None)
-
Transform class for performing DP Count on PrivatePCollection.
Expand source code
class Count(PrivatePTransform):
    """Transform that computes a DP count over a PrivatePCollection."""

    def __init__(self,
                 count_params: aggregate_params.CountParams,
                 label: Optional[str] = None):
        super().__init__(return_anonymized=True, label=label)
        self._count_params = count_params

    def expand(self, pcol: pvalue.PCollection) -> pvalue.PCollection:
        """Runs a COUNT aggregation and returns (partition_key, dp_count).

        The input rows are indexed as (privacy_id, element): position 0 is
        used as the privacy id, position 1 is handed to the partition
        extractor.
        """
        cfg = self._count_params
        beam_backend = pipeline_dp.BeamBackend()
        engine = pipeline_dp.DPEngine(self._budget_accountant, beam_backend)

        aggregate_spec = pipeline_dp.AggregateParams(
            noise_kind=cfg.noise_kind,
            metrics=[pipeline_dp.Metrics.COUNT],
            max_partitions_contributed=cfg.max_partitions_contributed,
            max_contributions_per_partition=cfg.
            max_contributions_per_partition,
            public_partitions=cfg.public_partitions)

        extractors = pipeline_dp.DataExtractors(
            partition_extractor=lambda row: self._count_params.
            partition_extractor(row[1]),
            privacy_id_extractor=lambda row: row[0],
            # Counting elements per partition key needs no value, so the
            # value extractor always yields None.
            value_extractor=lambda row: None)

        # aggregate() yields (partition_key, [dp_count]): one list entry per
        # requested metric, and only COUNT was requested.
        per_partition = engine.aggregate(pcol, aggregate_spec, extractors)

        # Drop the single-element list: (partition_key, dp_count).
        return beam_backend.map_values(per_partition,
                                       lambda metrics: metrics[0],
                                       "Unnest list")
Ancestors
- PrivatePTransform
- apache_beam.transforms.ptransform.PTransform
- apache_beam.typehints.decorators.WithTypeHints
- apache_beam.transforms.display.HasDisplayData
Methods
def expand(self, pcol: apache_beam.pvalue.PCollection) ‑> apache_beam.pvalue.PCollection
-
Expand source code
def expand(self, pcol: pvalue.PCollection) -> pvalue.PCollection:
    """Runs a COUNT aggregation and returns (partition_key, dp_count).

    The input rows are indexed as (privacy_id, element): position 0 is used
    as the privacy id, position 1 is handed to the partition extractor.
    """
    cfg = self._count_params
    beam_backend = pipeline_dp.BeamBackend()
    engine = pipeline_dp.DPEngine(self._budget_accountant, beam_backend)

    aggregate_spec = pipeline_dp.AggregateParams(
        noise_kind=cfg.noise_kind,
        metrics=[pipeline_dp.Metrics.COUNT],
        max_partitions_contributed=cfg.max_partitions_contributed,
        max_contributions_per_partition=cfg.max_contributions_per_partition,
        public_partitions=cfg.public_partitions)

    extractors = pipeline_dp.DataExtractors(
        partition_extractor=lambda row: self._count_params.
        partition_extractor(row[1]),
        privacy_id_extractor=lambda row: row[0],
        # Counting elements per partition key needs no value, so the value
        # extractor always yields None.
        value_extractor=lambda row: None)

    # aggregate() yields (partition_key, [dp_count]): one list entry per
    # requested metric, and only COUNT was requested.
    per_partition = engine.aggregate(pcol, aggregate_spec, extractors)

    # Drop the single-element list: (partition_key, dp_count).
    return beam_backend.map_values(per_partition, lambda metrics: metrics[0],
                                   "Unnest list")
Inherited members
class FlatMap (fn: Callable, label: Optional[str] = None)
-
Transform class for performing FlatMap on PrivatePCollection.
Expand source code
class FlatMap(PrivatePTransform):
    """Transform that flat-maps the values of a PrivatePCollection."""

    class _FlattenValues(beam.DoFn):
        """DoFn that fans one keyed row out into one row per produced value.

        With an identity map_fn, (1, (2, 3, 4)) becomes (1, 2), (1, 3), (1, 4).
        """

        def __init__(self, map_fn: Callable):
            self._map_fn = map_fn

        def process(self, row):
            """Yields (row[0], value) for every value in map_fn(row[1])."""
            key, element = row[0], row[1]
            for produced in self._map_fn(element):
                yield key, produced

    def __init__(self, fn: Callable, label: Optional[str] = None):
        super().__init__(return_anonymized=False, label=label)
        self._fn = fn

    def expand(self, pcol: pvalue.PCollection):
        """Runs _FlattenValues with the user function over pcol."""
        flatten = beam.ParDo(FlatMap._FlattenValues(map_fn=self._fn))
        return pcol | "flatten values" >> flatten
Ancestors
- PrivatePTransform
- apache_beam.transforms.ptransform.PTransform
- apache_beam.typehints.decorators.WithTypeHints
- apache_beam.transforms.display.HasDisplayData
Methods
def expand(self, pcol: apache_beam.pvalue.PCollection)
-
Expand source code
def expand(self, pcol: pvalue.PCollection):
    """Runs _FlattenValues with the user function over pcol."""
    flatten = beam.ParDo(FlatMap._FlattenValues(map_fn=self._fn))
    return pcol | "flatten values" >> flatten
Inherited members
class MakePrivate (budget_accountant: pipeline_dp.budget_accounting.BudgetAccountant, privacy_id_extractor: Callable, label: Optional[str] = None)
-
Transform class for creating a PrivatePCollection.
Expand source code
class MakePrivate(PrivatePTransform):
    """Transform that turns a PCollection into a PrivatePCollection."""

    def __init__(self,
                 budget_accountant: budget_accounting.BudgetAccountant,
                 privacy_id_extractor: Callable,
                 label: Optional[str] = None):
        super().__init__(return_anonymized=False, label=label)
        self._privacy_id_extractor = privacy_id_extractor
        self._budget_accountant = budget_accountant

    def expand(self, pcol: pvalue.PCollection):
        """Pairs each element with its privacy id and wraps the result."""
        # Each element x becomes (privacy_id_extractor(x), x).
        keyed = pcol | "Extract privacy id" >> beam.Map(
            lambda element: (self._privacy_id_extractor(element), element))
        return PrivatePCollection(keyed, self._budget_accountant)
Ancestors
- PrivatePTransform
- apache_beam.transforms.ptransform.PTransform
- apache_beam.typehints.decorators.WithTypeHints
- apache_beam.transforms.display.HasDisplayData
Methods
def expand(self, pcol: apache_beam.pvalue.PCollection)
-
Expand source code
def expand(self, pcol: pvalue.PCollection):
    """Pairs each element with its privacy id and wraps the result."""
    # Each element x becomes (privacy_id_extractor(x), x).
    keyed = pcol | "Extract privacy id" >> beam.Map(
        lambda element: (self._privacy_id_extractor(element), element))
    return PrivatePCollection(keyed, self._budget_accountant)
Inherited members
class Map (fn: Callable, label: Optional[str] = None)
-
Transform class for performing Map on PrivatePCollection.
Expand source code
class Map(PrivatePTransform):
    """Transform that maps the values of a PrivatePCollection."""

    def __init__(self, fn: Callable, label: Optional[str] = None):
        super().__init__(return_anonymized=False, label=label)
        self._fn = fn

    def expand(self, pcol: pvalue.PCollection):
        """Applies fn to position 1 of every row, keeping position 0 as is."""
        value_mapper = beam.Map(lambda row: (row[0], self._fn(row[1])))
        return pcol | "map values" >> value_mapper
Ancestors
- PrivatePTransform
- apache_beam.transforms.ptransform.PTransform
- apache_beam.typehints.decorators.WithTypeHints
- apache_beam.transforms.display.HasDisplayData
Methods
def expand(self, pcol: apache_beam.pvalue.PCollection)
-
Expand source code
def expand(self, pcol: pvalue.PCollection):
    """Applies fn to position 1 of every row, keeping position 0 as is."""
    value_mapper = beam.Map(lambda row: (row[0], self._fn(row[1])))
    return pcol | "map values" >> value_mapper
Inherited members
class PrivacyIdCount (privacy_id_count_params: PrivacyIdCountParams, label: Optional[str] = None)
-
Transform class for performing DP Privacy ID Count on PrivatePCollection.
Expand source code
class PrivacyIdCount(PrivatePTransform):
    """Transform that computes a DP privacy id count over a PrivatePCollection."""

    def __init__(self,
                 privacy_id_count_params: aggregate_params.PrivacyIdCountParams,
                 label: Optional[str] = None):
        super().__init__(return_anonymized=True, label=label)
        self._privacy_id_count_params = privacy_id_count_params

    def expand(self, pcol: pvalue.PCollection) -> pvalue.PCollection:
        """Runs a PRIVACY_ID_COUNT aggregation.

        Returns (partition_key, dp_privacy_id_count) pairs. The input rows
        are indexed as (privacy_id, element): position 0 is used as the
        privacy id, position 1 is handed to the partition extractor.
        """
        cfg = self._privacy_id_count_params
        beam_backend = pipeline_dp.BeamBackend()
        engine = pipeline_dp.DPEngine(self._budget_accountant, beam_backend)

        aggregate_spec = pipeline_dp.AggregateParams(
            noise_kind=cfg.noise_kind,
            metrics=[pipeline_dp.Metrics.PRIVACY_ID_COUNT],
            max_partitions_contributed=cfg.max_partitions_contributed,
            # Hard-coded to 1 - presumably because a privacy id is counted
            # at most once per partition; confirm against DPEngine.
            max_contributions_per_partition=1,
            public_partitions=cfg.public_partitions)

        extractors = pipeline_dp.DataExtractors(
            partition_extractor=lambda row: self._privacy_id_count_params.
            partition_extractor(row[1]),
            privacy_id_extractor=lambda row: row[0],
            # Values are ignored for this metric.
            value_extractor=lambda row: None)

        # aggregate() yields (partition_key, [dp_privacy_id_count]): one list
        # entry per requested metric, and only one metric was requested.
        per_partition = engine.aggregate(pcol, aggregate_spec, extractors)

        # Drop the single-element list: (partition_key, dp_privacy_id_count).
        return beam_backend.map_values(per_partition,
                                       lambda metrics: metrics[0],
                                       "Unnest list")
Ancestors
- PrivatePTransform
- apache_beam.transforms.ptransform.PTransform
- apache_beam.typehints.decorators.WithTypeHints
- apache_beam.transforms.display.HasDisplayData
Methods
def expand(self, pcol: apache_beam.pvalue.PCollection) ‑> apache_beam.pvalue.PCollection
-
Expand source code
def expand(self, pcol: pvalue.PCollection) -> pvalue.PCollection:
    """Runs a PRIVACY_ID_COUNT aggregation.

    Returns (partition_key, dp_privacy_id_count) pairs. The input rows are
    indexed as (privacy_id, element): position 0 is used as the privacy id,
    position 1 is handed to the partition extractor.
    """
    cfg = self._privacy_id_count_params
    beam_backend = pipeline_dp.BeamBackend()
    engine = pipeline_dp.DPEngine(self._budget_accountant, beam_backend)

    aggregate_spec = pipeline_dp.AggregateParams(
        noise_kind=cfg.noise_kind,
        metrics=[pipeline_dp.Metrics.PRIVACY_ID_COUNT],
        max_partitions_contributed=cfg.max_partitions_contributed,
        # Hard-coded to 1 - presumably because a privacy id is counted at
        # most once per partition; confirm against DPEngine.
        max_contributions_per_partition=1,
        public_partitions=cfg.public_partitions)

    extractors = pipeline_dp.DataExtractors(
        partition_extractor=lambda row: self._privacy_id_count_params.
        partition_extractor(row[1]),
        privacy_id_extractor=lambda row: row[0],
        # Values are ignored for this metric.
        value_extractor=lambda row: None)

    # aggregate() yields (partition_key, [dp_privacy_id_count]): one list
    # entry per requested metric, and only one metric was requested.
    per_partition = engine.aggregate(pcol, aggregate_spec, extractors)

    # Drop the single-element list: (partition_key, dp_privacy_id_count).
    return beam_backend.map_values(per_partition, lambda metrics: metrics[0],
                                   "Unnest list")
Inherited members
class PrivatePCollection (pcol: apache_beam.pvalue.PCollection, budget_accountant: pipeline_dp.budget_accounting.BudgetAccountant)
-
Private counterpart for PCollection.
PrivatePCollection guarantees that only anonymized data within the specified privacy budget can be extracted from it through PrivatePTransforms.
Expand source code
class PrivatePCollection:
    """Private counterpart for PCollection.

    PrivatePCollection guarantees that only anonymized data within the specified
    privacy budget can be extracted from it through PrivatePTransforms.
    """

    def __init__(self, pcol: pvalue.PCollection,
                 budget_accountant: budget_accounting.BudgetAccountant):
        """Wraps pcol together with the budget accountant used for it."""
        self._pcol = pcol
        self._budget_accountant = budget_accountant

    def __or__(self, private_transform: PrivatePTransform):
        """Applies private_transform to the wrapped PCollection.

        Args:
          private_transform: the transform to apply; it receives this
            collection's budget accountant before being applied.

        Returns:
          The output of the transform as is when the transform has
          _return_anonymized set, otherwise that output wrapped into a new
          PrivatePCollection with the same budget accountant.

        Raises:
          TypeError: if private_transform is not a PrivatePTransform.
        """
        if not isinstance(private_transform, PrivatePTransform):
            # Format the message here: exception constructors do not
            # interpolate extra positional arguments into a "%s" placeholder.
            raise TypeError(
                "private_transform should be of type PrivatePTransform but is "
                "%s" % type(private_transform).__name__)

        private_transform.set_additional_parameters(
            budget_accountant=self._budget_accountant)
        transformed = self._pcol.pipeline.apply(private_transform, self._pcol)

        if private_transform._return_anonymized:
            return transformed
        return PrivatePCollection(transformed, self._budget_accountant)
class PrivatePTransform (return_anonymized: bool, label: Optional[str] = None)
-
Abstract class for PrivatePTransforms.
Expand source code
class PrivatePTransform(ptransform.PTransform):
    """Base class for transforms that are applied to a PrivatePCollection.

    Every instance carries two fields:
      * _return_anonymized: the flag passed to the constructor.
      * _budget_accountant: None until set_additional_parameters() is called.
    """

    def __init__(self, return_anonymized: bool, label: Optional[str] = None):
        super().__init__(label)
        self._budget_accountant = None
        self._return_anonymized = return_anonymized

    def set_additional_parameters(
            self, budget_accountant: budget_accounting.BudgetAccountant):
        """Stores the budget accountant on the transform for later use."""
        self._budget_accountant = budget_accountant

    @abstractmethod
    def expand(self, pcol: pvalue.PCollection) -> pvalue.PCollection:
        """Implemented by subclasses; this base version has an empty body."""
Ancestors
- apache_beam.transforms.ptransform.PTransform
- apache_beam.typehints.decorators.WithTypeHints
- apache_beam.transforms.display.HasDisplayData
Subclasses
Methods
def expand(self, pcol: apache_beam.pvalue.PCollection) ‑> apache_beam.pvalue.PCollection
-
Expand source code
@abstractmethod
def expand(self, pcol: pvalue.PCollection) -> pvalue.PCollection:
    """Implemented by subclasses; this base version has an empty body."""
def set_additional_parameters(self, budget_accountant: pipeline_dp.budget_accounting.BudgetAccountant)
-
Sets the additional parameters needed for the private transform.
Expand source code
def set_additional_parameters(
        self, budget_accountant: budget_accounting.BudgetAccountant):
    """Stores the budget accountant on the transform for later use."""
    self._budget_accountant = budget_accountant
class Sum (sum_params: SumParams, label: Optional[str] = None)
-
Transform class for performing DP Sum on PrivatePCollection.
Expand source code
class Sum(PrivatePTransform):
    """Transform that computes a DP sum over a PrivatePCollection."""

    def __init__(self,
                 sum_params: aggregate_params.SumParams,
                 label: Optional[str] = None):
        super().__init__(return_anonymized=True, label=label)
        self._sum_params = sum_params

    def expand(self, pcol: pvalue.PCollection) -> pvalue.PCollection:
        """Runs a SUM aggregation and returns (partition_key, dp_sum) pairs.

        The input rows are indexed as (privacy_id, element): position 0 is
        used as the privacy id, position 1 is handed to the user extractors.
        """
        cfg = self._sum_params
        beam_backend = pipeline_dp.BeamBackend()
        engine = pipeline_dp.DPEngine(self._budget_accountant, beam_backend)

        aggregate_spec = pipeline_dp.AggregateParams(
            noise_kind=cfg.noise_kind,
            metrics=[pipeline_dp.Metrics.SUM],
            max_partitions_contributed=cfg.max_partitions_contributed,
            max_contributions_per_partition=cfg.
            max_contributions_per_partition,
            min_value=cfg.min_value,
            max_value=cfg.max_value,
            public_partitions=cfg.public_partitions)

        extractors = pipeline_dp.DataExtractors(
            partition_extractor=lambda row: self._sum_params.
            partition_extractor(row[1]),
            privacy_id_extractor=lambda row: row[0],
            value_extractor=lambda row: self._sum_params.value_extractor(
                row[1]))

        # aggregate() yields (partition_key, [dp_sum]): one list entry per
        # requested metric, and only SUM was requested.
        per_partition = engine.aggregate(pcol, aggregate_spec, extractors)

        # Drop the single-element list: (partition_key, dp_sum).
        return beam_backend.map_values(per_partition,
                                       lambda metrics: metrics[0],
                                       "Unnest list")
Ancestors
- PrivatePTransform
- apache_beam.transforms.ptransform.PTransform
- apache_beam.typehints.decorators.WithTypeHints
- apache_beam.transforms.display.HasDisplayData
Methods
def expand(self, pcol: apache_beam.pvalue.PCollection) ‑> apache_beam.pvalue.PCollection
-
Expand source code
def expand(self, pcol: pvalue.PCollection) -> pvalue.PCollection:
    """Runs a SUM aggregation and returns (partition_key, dp_sum) pairs.

    The input rows are indexed as (privacy_id, element): position 0 is used
    as the privacy id, position 1 is handed to the user extractors.
    """
    cfg = self._sum_params
    beam_backend = pipeline_dp.BeamBackend()
    engine = pipeline_dp.DPEngine(self._budget_accountant, beam_backend)

    aggregate_spec = pipeline_dp.AggregateParams(
        noise_kind=cfg.noise_kind,
        metrics=[pipeline_dp.Metrics.SUM],
        max_partitions_contributed=cfg.max_partitions_contributed,
        max_contributions_per_partition=cfg.max_contributions_per_partition,
        min_value=cfg.min_value,
        max_value=cfg.max_value,
        public_partitions=cfg.public_partitions)

    extractors = pipeline_dp.DataExtractors(
        partition_extractor=lambda row: self._sum_params.partition_extractor(
            row[1]),
        privacy_id_extractor=lambda row: row[0],
        value_extractor=lambda row: self._sum_params.value_extractor(row[1]))

    # aggregate() yields (partition_key, [dp_sum]): one list entry per
    # requested metric, and only SUM was requested.
    per_partition = engine.aggregate(pcol, aggregate_spec, extractors)

    # Drop the single-element list: (partition_key, dp_sum).
    return beam_backend.map_values(per_partition, lambda metrics: metrics[0],
                                   "Unnest list")
Inherited members