Module pipeline_dp.private_spark

Source code
from pyspark import RDD
from typing import Callable

import pipeline_dp
from pipeline_dp import aggregate_params, budget_accounting


class PrivateRDD:
    """A Spark RDD counterpart.

    PrivateRDD guarantees that only anonymized data
    within the specified privacy budget can be extracted from it through its API.

    PrivateRDD keeps `privacy_id` for each element
    in order to guarantees correct DP computations.
    """

    def __init__(self, rdd, budget_accountant, privacy_id_extractor=None):
        if privacy_id_extractor:
            self._rdd = rdd.map(lambda x: (privacy_id_extractor(x), x))
        else:
            # It's assumed that rdd is already in the format (privacy_id, value).
            self._rdd = rdd
        self._budget_accountant = budget_accountant

    def map(self, fn: Callable) -> 'PrivateRDD':
        """A Spark map equivalent.

        Keeps track of privacy_id for each element.
        The output PrivateRDD has the same BudgetAccountant as self.
        """
        # Assumes that `self._rdd` consists of tuples `(privacy_id, element)`
        # and transforms each `element` according to the supplied function `fn`.
        rdd = self._rdd.mapValues(fn)
        return make_private(rdd, self._budget_accountant, None)

    def flat_map(self, fn: Callable) -> 'PrivateRDD':
        """A Spark flatMap equivalent.

        Keeps track of privacy_id for each element.
        The output PrivateRDD has the same BudgetAccountant as self.
        """
        # Assumes that `self._rdd` consists of tuples `(privacy_id, element)`
        # and transforms each `element` according to the supplied function `fn`.
        rdd = self._rdd.flatMapValues(fn)
        return make_private(rdd, self._budget_accountant, None)

    def sum(self, sum_params: aggregate_params.SumParams) -> RDD:
        """Computes DP sum.

        Args:
            sum_params: parameters for the calculation.
        """

        backend = pipeline_dp.SparkRDDBackend()
        dp_engine = pipeline_dp.DPEngine(self._budget_accountant, backend)

        params = pipeline_dp.AggregateParams(
            noise_kind=sum_params.noise_kind,
            metrics=[pipeline_dp.Metrics.SUM],
            max_partitions_contributed=sum_params.max_partitions_contributed,
            max_contributions_per_partition=(
                sum_params.max_contributions_per_partition),
            min_value=sum_params.min_value,
            max_value=sum_params.max_value,
            public_partitions=sum_params.public_partitions,
            budget_weight=sum_params.budget_weight)

        data_extractors = pipeline_dp.DataExtractors(
            partition_extractor=lambda x: sum_params.partition_extractor(x[1]),
            privacy_id_extractor=lambda x: x[0],
            value_extractor=lambda x: sum_params.value_extractor(x[1]))

        dp_result = dp_engine.aggregate(self._rdd, params, data_extractors)
        # dp_result : (partition_key, [dp_sum])

        # aggregate() returns a list of metrics for each partition key.
        # There is only one metric here (sum), so unwrap it from the list.
        dp_result = backend.map_values(dp_result, lambda v: v[0], "Unnest list")
        # dp_result : (partition_key, dp_sum)

        return dp_result

    def count(self, count_params: aggregate_params.CountParams) -> RDD:
        """Computes DP count.

        Args:
            count_params: parameters for the calculation.
        """

        backend = pipeline_dp.SparkRDDBackend()
        dp_engine = pipeline_dp.DPEngine(self._budget_accountant, backend)

        params = pipeline_dp.AggregateParams(
            noise_kind=count_params.noise_kind,
            metrics=[pipeline_dp.Metrics.COUNT],
            max_partitions_contributed=count_params.max_partitions_contributed,
            max_contributions_per_partition=(
                count_params.max_contributions_per_partition),
            public_partitions=count_params.public_partitions,
            budget_weight=count_params.budget_weight)

        data_extractors = pipeline_dp.DataExtractors(
            partition_extractor=lambda x: count_params.partition_extractor(
                x[1]),
            privacy_id_extractor=lambda x: x[0],
            value_extractor=lambda x: count_params.value_extractor(x[1]))

        dp_result = dp_engine.aggregate(self._rdd, params, data_extractors)
        # dp_result : (partition_key, [dp_count])

        # aggregate() returns a list of metrics for each partition key.
        # There is only one metric here (count), so unwrap it from the list.
        dp_result = backend.map_values(dp_result, lambda v: v[0], "Unnest list")
        # dp_result : (partition_key, dp_count)

        return dp_result

    def privacy_id_count(
            self, privacy_id_count_params: aggregate_params.PrivacyIdCountParams
    ) -> RDD:
        """Computes DP Privacy ID count.

        Args:
            privacy_id_count_params: parameters for the calculation.
        """

        backend = pipeline_dp.SparkRDDBackend()
        dp_engine = pipeline_dp.DPEngine(self._budget_accountant, backend)

        params = pipeline_dp.AggregateParams(
            noise_kind=privacy_id_count_params.noise_kind,
            metrics=[pipeline_dp.Metrics.PRIVACY_ID_COUNT],
            max_partitions_contributed=(
                privacy_id_count_params.max_partitions_contributed),
            max_contributions_per_partition=1,
            public_partitions=privacy_id_count_params.public_partitions)

        data_extractors = pipeline_dp.DataExtractors(
            partition_extractor=lambda x: (
                privacy_id_count_params.partition_extractor(x[1])),
            privacy_id_extractor=lambda x: x[0],
            # PrivacyIdCount ignores values.
            value_extractor=lambda x: None)

        dp_result = dp_engine.aggregate(self._rdd, params, data_extractors)
        # dp_result : (partition_key, [dp_privacy_id_count])

        # aggregate() returns a list of metrics for each partition key.
        # There is only one metric here (privacy_id_count), so unwrap it from
        # the list.
        dp_result = backend.map_values(dp_result, lambda v: v[0], "Unnest list")
        # dp_result : (partition_key, dp_privacy_id_count)

        return dp_result


def make_private(rdd: RDD,
                 budget_accountant: budget_accounting.BudgetAccountant,
                 privacy_id_extractor: Callable) -> PrivateRDD:
    """A factory method for PrivateRDD instance creation."""
    prdd = PrivateRDD(rdd, budget_accountant, privacy_id_extractor)
    return prdd

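A minimal end-to-end sketch of how this module is typically used. Everything below is illustrative: the SparkContext sc, the (user_id, movie_id) view records, and the concrete parameter values are assumptions based on the field names referenced in the source above.

import pipeline_dp
from pipeline_dp import aggregate_params
from pipeline_dp.private_spark import make_private

# Hypothetical input: one (user_id, movie_id) record per movie view.
views = sc.parallelize([("alice", "m1"), ("alice", "m2"), ("bob", "m1")])

# One accountant manages the privacy budget for all DP aggregations.
budget_accountant = pipeline_dp.NaiveBudgetAccountant(total_epsilon=1.0,
                                                      total_delta=1e-6)

private_rdd = make_private(views, budget_accountant,
                           privacy_id_extractor=lambda row: row[0])

# DP count of views per movie. The extractors receive the raw record, i.e.
# the second component of the internal (privacy_id, record) tuples.
dp_view_counts = private_rdd.count(
    aggregate_params.CountParams(
        noise_kind=pipeline_dp.NoiseKind.LAPLACE,
        max_partitions_contributed=2,
        max_contributions_per_partition=1,
        budget_weight=1,
        public_partitions=None,
        partition_extractor=lambda row: row[1],
        value_extractor=lambda row: row[1]))

# Budgets must be finalized before the (lazy) DP result is materialized.
budget_accountant.compute_budgets()
print(dp_view_counts.collect())  # [(movie_id, dp_count), ...]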
Functions

def make_private(rdd: pyspark.rdd.RDD, budget_accountant: pipeline_dp.budget_accounting.BudgetAccountant, privacy_id_extractor: Callable) -> PrivateRDD

A factory method for creating PrivateRDD instances.

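A short sketch of the two input formats make_private accepts; raw_rdd, budget_accountant, and the record layout are assumed names.

# Raw records: the extractor derives the privacy id from each element.
prdd = make_private(raw_rdd, budget_accountant,
                    privacy_id_extractor=lambda row: row[0])

# Pre-keyed records: pass None, and the RDD must already be in the
# (privacy_id, value) format; it is then used as-is.
keyed_rdd = raw_rdd.map(lambda row: (row[0], row))
prdd = make_private(keyed_rdd, budget_accountant, None)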

Classes

class PrivateRDD (rdd, budget_accountant, privacy_id_extractor=None)

A privacy-preserving counterpart of a Spark RDD.

PrivateRDD guarantees that only anonymized data within the specified privacy budget can be extracted from it through its API.

PrivateRDD keeps a privacy_id for each element in order to guarantee correct DP computations.

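All aggregations on a PrivateRDD draw from the single BudgetAccountant passed at construction; budget_weight (a field of the parameter classes used below) sets each aggregation's relative share of the total budget. A sketch, reusing names from the method examples further down:

dp_counts = private_rdd.count(count_params)  # e.g. budget_weight=1
dp_sums = private_rdd.sum(sum_params)        # e.g. budget_weight=3
budget_accountant.compute_budgets()          # splits the total budget 1:3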

Methods

def count(self, count_params: CountParams) -> pyspark.rdd.RDD

Computes DP count.

Args

count_params: parameters for the calculation.
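A sketch with public partitions, reusing the hypothetical movie-view layout from the module-level example: when the set of partition keys is publicly known, passing it avoids spending privacy budget on private partition selection.

dp_view_counts = private_rdd.count(
    aggregate_params.CountParams(
        noise_kind=pipeline_dp.NoiseKind.GAUSSIAN,
        max_partitions_contributed=2,
        max_contributions_per_partition=1,
        budget_weight=1,
        public_partitions=["m1", "m2", "m3"],  # known, non-private keys
        partition_extractor=lambda row: row[1],
        value_extractor=lambda row: row[1]))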
def flat_map(self, fn: Callable) -> PrivateRDD

A Spark flatMap equivalent.

Keeps track of privacy_id for each element. The output PrivateRDD has the same BudgetAccountant as self.

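A sketch with a hypothetical record layout: fn must return an iterable, and flatMapValues emits one output element per item, each still paired with the contributing user's privacy id.

# Assumed element layout: (movie_id, [tag, ...]); the output contains one
# element per tag, with the privacy id preserved.
per_tag = private_rdd.flat_map(lambda view: view[1])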
def map(self, fn: Callable) -> PrivateRDD

A Spark map equivalent.

Keeps track of privacy_id for each element. The output PrivateRDD has the same BudgetAccountant as self.

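A sketch with a hypothetical record layout: fn receives only the element, never the privacy id, since mapValues leaves the key untouched.

# Assumed element layout: (movie_id, seconds_watched); each element is
# replaced by a derived value while the (privacy_id, element) pairing survives.
durations = private_rdd.map(lambda view: view[1])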
def privacy_id_count(self, privacy_id_count_params: PrivacyIdCountParams) -> pyspark.rdd.RDD

Computes DP Privacy ID count.

Args

privacy_id_count_params: parameters for the calculation.
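A sketch, reusing the hypothetical movie-view layout: the result is a DP count of distinct privacy ids (users) per partition. No value extractor is needed because values are ignored, and each privacy id contributes at most once per partition.

dp_unique_users = private_rdd.privacy_id_count(
    aggregate_params.PrivacyIdCountParams(
        noise_kind=pipeline_dp.NoiseKind.LAPLACE,
        max_partitions_contributed=2,
        public_partitions=None,
        partition_extractor=lambda row: row[1]))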
def sum(self, sum_params: SumParams) -> pyspark.rdd.RDD

Computes DP sum.

Args

sum_params: parameters for the calculation.
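A sketch with a hypothetical (user_id, movie_id, minutes_watched) record layout: each per-record contribution is clamped to [min_value, max_value] before the noisy summation, which bounds the sensitivity of the sum.

dp_watch_time = private_rdd.sum(
    aggregate_params.SumParams(
        noise_kind=pipeline_dp.NoiseKind.LAPLACE,
        max_partitions_contributed=2,
        max_contributions_per_partition=1,
        min_value=0.0,
        max_value=60.0,  # contributions are clamped into [0.0, 60.0]
        budget_weight=1,
        public_partitions=None,
        partition_extractor=lambda row: row[1],
        value_extractor=lambda row: row[2]))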