In many tutorials, a key-value pair consists of two single scalar values, for example `("Apple", 7)`. But key-value is a general concept: both the key and the value often consist of multiple fields, and neither the key nor the value needs to be unique.

Consider a typical SQL statement:

```sql
SELECT store, product, SUM(amount), MIN(amount), MAX(amount), SUM(units)
FROM sales
GROUP BY store, product
```

Columns `store` and `product` can be considered the key, and columns `amount` and `units` the values.

Let’s implement this SQL statement in Spark. First, we define a sample data set:

```scala
// In spark-shell, `sc` is the predefined SparkContext.
// Columns: (store, product, amount, units)
val sales = sc.parallelize(List(
  ("West",  "Apple",  2.0, 10),
  ("West",  "Apple",  3.0, 15),
  ("West",  "Orange", 5.0, 15),
  ("South", "Orange", 3.0, 9),
  ("South", "Orange", 6.0, 18),
  ("East",  "Milk",   5.0, 5)))
```

The Spark/Scala code equivalent to the SQL statement is as follows:

```scala
// Key: (store, prod). Value: (SUM(amount), MIN(amount), MAX(amount), SUM(units)),
// with every field seeded from the current row.
sales.map { case (store, prod, amt, units) => ((store, prod), (amt, amt, amt, units)) }.
  // Merge two values that share a key, field by field.
  reduceByKey((x, y) =>
    (x._1 + y._1, math.min(x._2, y._2), math.max(x._3, y._3), x._4 + y._4)).collect
```

The result of the execution (formatted):

```
res: Array[((String, String), (Double, Double, Double, Int))] = Array(
((West, Orange), (5.0, 5.0, 5.0, 15)),
((West, Apple), (5.0, 2.0, 3.0, 25)),
((East, Milk), (5.0, 5.0, 5.0, 5)),
((South, Orange), (9.0, 3.0, 6.0, 27)))
```
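For an input this small, the Spark result can be cross-checked without a cluster. The sketch below is plain Scala with no Spark dependency: it computes the same aggregates with the standard collections `groupBy`. The names `salesLocal` and `localResult` are hypothetical, chosen so they do not clash with the `sales` RDD. Neither Spark's `collect` nor a Scala `Map` guarantees the order of the groups, so compare by key rather than by position.

```scala
// Plain Scala collections only (no Spark): the same GROUP BY computed locally.
val salesLocal = List(
  ("West",  "Apple",  2.0, 10),
  ("West",  "Apple",  3.0, 15),
  ("West",  "Orange", 5.0, 15),
  ("South", "Orange", 3.0, 9),
  ("South", "Orange", 6.0, 18),
  ("East",  "Milk",   5.0, 5))

// Group rows by the composite key (store, product), then compute
// SUM(amount), MIN(amount), MAX(amount) and SUM(units) for each group.
val localResult: Map[(String, String), (Double, Double, Double, Int)] =
  salesLocal
    .groupBy { case (store, prod, _, _) => (store, prod) }
    .map { case (key, rows) =>
      val amounts = rows.map(_._3)
      (key, (amounts.sum, amounts.min, amounts.max, rows.map(_._4).sum))
    }
```

Looking up `localResult(("South", "Orange"))` gives the same `(9.0, 3.0, 6.0, 27)` as the Spark output. This approach materializes every group in memory, which is fine for a sanity check but is exactly what `reduceByKey` avoids on large data.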

#### How It Works

We have an input RDD `sales` containing 6 rows and 4 columns (String, String, Double, Int). The first step is to define which columns belong to the key and which to the value. You can use the `map` function on the RDD as follows:

```scala
sales.map { case (store, prod, amt, units) => ((store, prod), (amt, amt, amt, units)) }
```

We defined a key-value tuple where the key is itself a tuple, `(store, prod)`, and the value is a tuple holding the starting values of the results we are going to calculate, `(amt, amt, amt, units)`.

Note that we initialize SUM, MIN and MAX with `amt`, so if a group contains only one row, its SUM, MIN and MAX values are all equal to `amt`.
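To see what the map step produces, here is the same pattern match applied to a plain Scala `List` instead of an RDD (a local sketch; `rows` and `keyed` are hypothetical names). `("East", "Milk")` occurs only once in the data, so its seeded tuple is never merged with anything and passes through the reduce step unchanged, which is why the output above shows `(5.0, 5.0, 5.0, 5)` for it.

```scala
// Two sample rows: (store, product, amount, units)
val rows = List(
  ("East", "Milk",  5.0, 5),
  ("West", "Apple", 2.0, 10))

// Same map step as above: key = (store, prod),
// value = (sum, min, max, units), all amount fields seeded with amt.
val keyed = rows.map { case (store, prod, amt, units) =>
  ((store, prod), (amt, amt, amt, units))
}
```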

The next step is to reduce the values by key with `reduceByKey((x, y) => ...)`.

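In this function, `x` is the result of reducing the values seen so far for the key (at the start, simply the first value), and `y` is the next value. Remember that both `x` and `y` are tuples of the form `(amt, amt, amt, units)`, and the function must return a tuple of the same form.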
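To make the roles of `x` and `y` concrete, the sketch below pulls the reduce function out as a standalone Scala function and applies it to the two `("South", "Orange")` values produced by the map step. The names `Agg` and `combine` are hypothetical and exist only for this illustration.

```scala
// Value layout after the map step: (sum, min, max, units)
type Agg = (Double, Double, Double, Int)

// The function passed to reduceByKey, written as a plain Scala function.
val combine: (Agg, Agg) => Agg = (x, y) =>
  (x._1 + y._1, math.min(x._2, y._2), math.max(x._3, y._3), x._4 + y._4)

// The two (South, Orange) values after the map step.
val first: Agg  = (3.0, 3.0, 3.0, 9)
val second: Agg = (6.0, 6.0, 6.0, 18)

// x = first, y = second
val merged = combine(first, second)
```

`merged` equals `(9.0, 3.0, 6.0, 27)`, the same value reported for `(South, Orange)` in the Spark output.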

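Reduce works much like evaluating 2 + 4 + 3 + 5 + 6 + …: you add the first two values, then add the third value to their sum, and so on. Spark, however, applies the function within each partition first and then merges the per-partition results, so `x` and `y` can both be partial results and the order of merging is not fixed. That is why the function passed to `reduceByKey` must be associative and commutative.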
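A small local sketch of both points. `reduceLeft` on a Scala `List` folds strictly from the left, like the addition chain above. The checks below then confirm, on three sample values (hypothetical, taken from the map step), that regrouping or swapping the arguments of the tuple-merging function does not change its result, which is the property Spark relies on.

```scala
// Left-to-right folding: ((((2 + 4) + 3) + 5) + 6)
val total = List(2, 4, 3, 5, 6).reduceLeft(_ + _)

// (sum, min, max, units), merged field by field as in reduceByKey
type Agg = (Double, Double, Double, Int)
val combine: (Agg, Agg) => Agg = (x, y) =>
  (x._1 + y._1, math.min(x._2, y._2), math.max(x._3, y._3), x._4 + y._4)

// Sample values, as produced by the map step for single rows
val a: Agg = (2.0, 2.0, 2.0, 10)
val b: Agg = (3.0, 3.0, 3.0, 15)
val c: Agg = (5.0, 5.0, 5.0, 15)

// Regrouping (associativity) and swapping (commutativity) give the same result
val associative = combine(combine(a, b), c) == combine(a, combine(b, c))
val commutative = combine(a, b) == combine(b, a)
```

`total` is 20, and both checks hold for these values. MIN, MAX and integer addition are exactly associative and commutative; addition of `Double` values is only approximately so because of floating-point rounding, so a SUM over doubles may differ in the last digits between runs with different partitioning.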

Now it is easier to understand the meaning of:

```scala
reduceByKey((x, y) =>
  (x._1 + y._1, math.min(x._2, y._2), math.max(x._3, y._3), x._4 + y._4)).collect
```

Sum of the previous and current amounts (`_1` refers to the first element of a tuple):

```scala
x._1 + y._1
```

Selecting the MIN amount:

```scala
math.min(x._2, y._2)
```

Selecting the MAX amount:

```scala
math.max(x._3, y._3)
```

Sum of the previous and current units:

```scala
x._4 + y._4
```
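Positional accessors such as `_2` and `_3` are easy to mix up as the number of aggregates grows. One possible variation, shown here as a hypothetical sketch rather than part of the original approach, replaces the 4-tuple with a case class whose field names mirror the SQL aggregates. The merge logic is unchanged; the example runs it locally on the two `("West", "Apple")` rows.

```scala
// Hypothetical named alternative to the (sum, min, max, units) tuple.
final case class SalesAgg(sum: Double, min: Double, max: Double, units: Int) {
  // Same logic as the tuple-based reduce function: add the sums and units,
  // keep the smaller minimum and the larger maximum.
  def merge(that: SalesAgg): SalesAgg =
    SalesAgg(sum + that.sum, math.min(min, that.min), math.max(max, that.max), units + that.units)
}

// The two (West, Apple) rows as (amount, units), each seeded like the map step,
// then merged pairwise.
val westApple = List((2.0, 10), (3.0, 15))
  .map { case (amt, units) => SalesAgg(amt, amt, amt, units) }
  .reduce(_ merge _)
```

`westApple` is `SalesAgg(5.0, 2.0, 3.0, 25)`, matching the `(West, Apple)` row above. In Spark, the same class could be plugged in as `sales.map { case (store, prod, amt, units) => ((store, prod), SalesAgg(amt, amt, amt, units)) }.reduceByKey(_ merge _)`; case classes are serializable by default, which Spark needs in order to ship the values between executors.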