Commit 0d628527 authored by Tibo's avatar Tibo

modify groupByKey to comply with spark definition + add tests

parent 72daff47
Pipeline #2145 failed with stage
in 15 seconds
......@@ -52,10 +52,10 @@ class Dataset
$groups = $this->groupByKey();
$reduced = [];
foreach ($groups->collect() as $key => $values) {
foreach ($groups->collect() as $group_tuple) {
$reduced[] = new Tuple(
$key,
array_reduce($values, $function, $initial)
$group_tuple->key,
array_reduce($group_tuple->value, $function, $initial)
);
}
......@@ -93,7 +93,7 @@ class Dataset
}
/**
*
* From a dataset of (K, V) tuples, returns a dataset of (K, [V]) tuples.
* @return \Cylab\Spark\Dataset
*/
public function groupByKey()
......@@ -102,7 +102,12 @@ class Dataset
foreach ($this->data as $tuple) {
$groups[$tuple->key][] = $tuple->value;
}
return new Dataset($groups);
$tuples = [];
foreach ($groups as $key => $values) {
$tuples[] = new Tuple($key, $values);
}
return new Dataset($tuples);
}
/**
......
......@@ -61,4 +61,22 @@ class DatasetTest extends TestCase
$data = new Dataset([1, 2, 1, 3, 4, 4]);
$this->assertEquals(4, count($data->distinct()->collect()));
}
public function testGroupByKey()
{
$data = new Dataset([new Tuple(1, "a"), new Tuple(1, "b"), new Tuple(2, "c")]);
$groups = $data->groupByKey();
$this->assertEquals(1, $groups->first()->key);
$this->assertEquals(["a", "b"], $groups->first()->value);
}
public function testReduceByKey()
{
$data = new Dataset([new Tuple(1, "a"), new Tuple(1, "b"), new Tuple(2, "c")]);
$groups = $data->reduceByKey(function($value, $agg) {
return $value . $agg;
}, "");
$this->assertEquals("ab", $groups->first()->value);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment