Multi-Column Key and Value – Reduce a Tuple in Spark

In many tutorials a key-value pair is two single scalar values, for example ('Apple', 7). But key-value is a general concept: both the key and the value often consist of multiple fields, and both can be non-unique.
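
For instance, a pair whose key and value both consist of multiple fields might look like this in Scala (the field meanings here simply anticipate the sales example below):

// A composite key (store, product) paired with a composite value (amount, units)
val pair = (("West", "Apple"), (2.0, 10))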

Consider a typical SQL statement:

SELECT store, product, SUM(amount), MIN(amount), MAX(amount), SUM(units)
FROM sales
GROUP BY store, product

Columns store and product can be considered the key, and columns amount and units the values.

Let’s implement this SQL statement in Spark. First, we define a sample data set:

val sales = sc.parallelize(List(
   ("West",  "Apple",  2.0, 10),
   ("West",  "Apple",  3.0, 15),
   ("West",  "Orange", 5.0, 15),
   ("South", "Orange", 3.0, 9),
   ("South", "Orange", 6.0, 18),
   ("East",  "Milk",   5.0, 5)))

The Spark/Scala code equivalent to the SQL statement is as follows:

sales.map{ case (store, prod, amt, units) => ((store, prod), (amt, amt, amt, units)) }.
  reduceByKey((x, y) =>
   (x._1 + y._1, math.min(x._2, y._2), math.max(x._3, y._3), x._4 + y._4)).collect

The result of the execution (formatted):

res: Array[((String, String), (Double, Double, Double, Int))] = Array(
  ((West, Orange), (5.0, 5.0, 5.0, 15)), 
  ((West, Apple),  (5.0, 2.0, 3.0, 25)), 
  ((East, Milk),   (5.0, 5.0, 5.0, 5)), 
  ((South, Orange),(9.0, 3.0, 6.0, 27)))

How It Works

We have an input RDD sales containing 6 rows and 4 columns (String, String, Double, Int). The first step is to define which columns belong to the key and which to the value. You can use the map function on the RDD as follows:
sales.map{ case (store, prod, amt, units) => ((store, prod), (amt, amt, amt, units)) }

We defined a key-value tuple where the key is itself a tuple containing (store, prod) and the value is a tuple holding the aggregates we are going to calculate: (amt, amt, amt, units).

Note that we initialized SUM, MIN and MAX with amt, so if there is only one row in a group, the SUM, MIN and MAX values will all be equal to amt (as with the (East, Milk) group in the result above).
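
As an illustration, collecting the mapped RDD built from the sample data gives pairs like the following (a sketch of the expected output; the element order may differ because the data is distributed):

sales.map{ case (store, prod, amt, units) => ((store, prod), (amt, amt, amt, units)) }.collect
// Array(((West,Apple),  (2.0,2.0,2.0,10)), ((West,Apple),   (3.0,3.0,3.0,15)),
//       ((West,Orange), (5.0,5.0,5.0,15)), ((South,Orange), (3.0,3.0,3.0,9)),
//       ((South,Orange),(6.0,6.0,6.0,18)), ((East,Milk),    (5.0,5.0,5.0,5)))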

The next step is to reduce values by key:

reduceByKey((x, y) =>

In this function x is the accumulated result of reducing the previous values, and y is the current value. Remember that both x and y are tuples of the form (amt, amt, amt, units).

Reduce is an associative operation and works much like adding 2 + 4 + 3 + 5 + 6 + …: you take the first two values, add them, then add the third value, and so on.
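
The same idea on a plain Scala collection, as a minimal sketch:

// Reduce combines elements pairwise: (((2 + 4) + 3) + 5) + 6
List(2, 4, 3, 5, 6).reduce((x, y) => x + y)   // res: Int = 20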

Now it is easier to understand the meaning of:

reduceByKey((x, y) =>
  (x._1 + y._1, math.min(x._2, y._2), math.max(x._3, y._3), x._4 + y._4)).collect
Sum of the previous and current amounts (_1 means the first item of a tuple):
x._1 + y._1
Selecting MIN amount:
math.min(x._2, y._2)
Selecting MAX amount:
math.max(x._3, y._3)
Sum of the previous and current units:
x._4 + y._4
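
To make this concrete, here is the same combining logic applied by hand to the two (West, Apple) rows of the sample data (a worked example, not part of the original job):

val x = (2.0, 2.0, 2.0, 10)   // from ("West", "Apple", 2.0, 10) after the map step
val y = (3.0, 3.0, 3.0, 15)   // from ("West", "Apple", 3.0, 15) after the map step
(x._1 + y._1, math.min(x._2, y._2), math.max(x._3, y._3), x._4 + y._4)
// res: (Double, Double, Double, Int) = (5.0,2.0,3.0,25)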

Spark Introduction

An interesting introduction to Spark (http://spark.apache.org/):

Apache Spark provides programmers with an application programming interface centered on a data structure called the resilient distributed dataset (RDD), a read-only multiset of data items distributed over a cluster of machines, that is maintained in a fault-tolerant way. It was developed in response to limitations in the MapReduce cluster computing paradigm, which forces a particular linear dataflow structure on distributed programs: MapReduce programs read input data from disk, map a function across the data, reduce the results of the map, and store reduction results on disk. Spark’s RDDs function as a working set for distributed programs that offers a (deliberately) restricted form of distributed shared memory.