Project author: iv-stpn

Project description:
A project displaying examples of MapReduce jobs, using the "Remarkable Trees of Paris" dataset (https://opendata.paris.fr/explore/dataset/arbresremarquablesparis/information/, 2015 version). The HiveQL query equivalents of the MapReduce jobs are described in HIVE.md.
Language: Java
Project address: git://github.com/iv-stpn/hadoop-mapreduce-examples.git
Created: 2020-11-03T14:53:24Z
Project community: https://github.com/iv-stpn/hadoop-mapreduce-examples

Session 4/Lab2 - MapReduce2/Yarn/Java Jobs

1.1. Installing OpenJDK

On Ubuntu Linux, we can install OpenJDK 8 using sudo apt-get install openjdk-8-jdk. Once installed, we can switch the active Java version to Java 8 using the update-alternatives --config java command:

  1. ~$ sudo update-alternatives --config java
  2. There are 2 choices for the alternative java (providing /usr/bin/java).
  3. Selection Path Priority Status
  4. ------------------------------------------------------------
  5. * 0 /usr/lib/jvm/java-14-openjdk-amd64/bin/java 1411 auto mode
  6. 1 /usr/lib/jvm/java-14-openjdk-amd64/bin/java 1411 manual mode
  7. 2 /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java 1081 manual mode
  8. Press <enter> to keep the current choice[*], or type selection number: 2
  9. update-alternatives: using /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java
  10. to provide /usr/bin/java (java) in manual mode

1.2. Installing Git

We can install git with the command sudo apt-get install git. We will install hub as well to simplify the process of creating the repository. To install hub, we can use Homebrew (https://docs.brew.sh/Homebrew-on-Linux) and execute brew install hub.

1.3. Installing a Java IDE

We will code using Eclipse (installed via sudo snap install --classic eclipse).

1.4. Cloning the project

To use hub, one needs to create an SSH key associated with their GitHub account. Once the key is added, we can clone the original repository (git clone https://github.com/makayel/hadoop-examples-mapreduce) and create our own repository derived from it using the hub create command, followed by git push -u origin main (setting the origin of the newly created repository to the “main” branch of the project, i.e. the renamed master branch).

1.5. Importing the project

We import the project using Eclipse’s Maven project importer: File > Import... > Maven > Existing Maven Projects, Browse > Open, select the pom.xml and press Finish.

To generate the JAR, we can launch the Maven install goal from the project’s root directory by running mvn install (it can also be executed using the M2Eclipse plugin). To install Maven, use sudo apt install maven.

1.6. Send the JAR to the edge node

To simplify the process of getting the target JAR onto our edge node, we will clone the repository of our project (https://github.com/iv-stpn/hadoop-examples-mapreduce) directly on the edge node and generate the new target files there with Maven (we could also generate them beforehand and retrieve the target folder with git pull, were it to be included in the repository). To modify the Java source files, we will use Eclipse on our local machine, commit and push the changes, and pull them on the edge node before regenerating the JAR. On Linux, we can connect to the edge node directly via ssh.

1.7. The repository

You can find our repository at https://github.com/iv-stpn/hadoop-examples-mapreduce and browse all the commits. We invite you to clone the repository to test it.

1.8. The Project

We first download the dataset using wget on the edge node, and put it on the HDFS:

  1. -sh-4.2$ wget https://raw.githubusercontent.com/makayel/hadoop-examples-mapreduce/main/src/test/resources/data/trees.csv
  2. --2020-11-10 17:21:36-- https://raw.githubusercontent.com/makayel/hadoop-examples-mapreduce/main/src/test/resources/data/trees.csv
  3. Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.120.133
  4. Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.120.133|:443... connected.
  5. HTTP request sent, awaiting response... 200 OK
  6. Length: 16680 (16K) [text/plain]
  7. Saving to: trees.csv
  8. 100%[======================================>] 16 680 --.-K/s in 0,001s
  9. 2020-11-10 17:22:01 (15,6 MB/s) - trees.csv saved [16680/16680]
  10. -sh-4.2$ hdfs dfs -put trees.csv
  11. -sh-4.2$ hdfs dfs -ls
  12. Found 18 items
  13. ...
  14. -rw-r--r-- 3 istepanian hdfs 121271 2020-11-10 17:22 trees.csv

Using the default job already implemented in the target JAR (wordcount), we can test it on our dataset (we will create an alias, launch_job, to avoid rewriting the full command):

  1. -sh-4.2$ alias launch_job="yarn jar ~/hadoop-examples-mapreduce/target/hadoop-examples-mapreduce-1.0-SNAPSHOT-jar-with-dependencies.jar"
  2. -sh-4.2$ launch_job wordcount trees.csv count
  3. ...
  4. 20/11/10 17:27:55 INFO mapreduce.Job: Running job: job_1603290159664_3340
  5. ...
  6. 20/11/10 17:28:06 INFO mapreduce.Job: map 0% reduce 0%
  7. 20/11/10 17:28:15 INFO mapreduce.Job: map 100% reduce 0%
  8. 20/11/10 17:28:20 INFO mapreduce.Job: map 100% reduce 100%
  9. ...
  10. File Input Format Counters
  11. Bytes Read=121271
  12. File Output Format Counters
  13. Bytes Written=68405
  14. -sh-4.2$ hdfs dfs -cat count/part-r-00000
  15. ...
  16. · 2
  17. à 13
  18. écus;;10;Parc 1
  19. écus;;31;Jardin 1
  20. écus;;46;Parc 1
  21. écus;;64;Bois 1
  22. écus;;84;Bois 1
  23. île 1
  24. ...

Everything works correctly; the job completed successfully.

1.8.1. DistinctDistricts

For this MapReduce job, we create a simple job based on the files of the wordcount job above: a job class DistinctDistricts, a mapper class TreesMapper and a reducer class TreesReducer.

We then add our class to the AppDriver so the program can interpret our new command distinctDistricts:

  1. programDriver.addClass("distinctDistricts", DistinctDistricts.class,
  2. "A map/reduce program that returns the distinct districts with trees in a predefined CSV formatting.");

We can also add a usage message, printed when the command’s arguments are wrong:

  1. if (otherArgs.length < 2) {
  2. System.err.println("Usage: distinctDistricts <in> [<in>...] <out>");
  3. System.exit(2);
  4. }

Then, we set our mapper and reducer classes for the job in DistinctDistricts:

  1. if (otherArgs.length < 2) {
  2. System.err.println("Usage: distinctDistricts <in> [<in>...] <out>");
  3. System.exit(2);
  4. }
  5. Job job = Job.getInstance(conf, "distinctDistricts");
  6. job.setJarByClass(DistinctDistricts.class);
  7. job.setMapperClass(TreesMapper.class);
  8. job.setCombinerClass(TreesReducer.class);
  9. job.setReducerClass(TreesReducer.class);

Because the only information we require is the district number, all we need is a Text key (containing the number of the district) with a null (NullWritable) value. Because of the way the MapReduce programming model works, identical keys are grouped together, so the Reducer receives each distinct key with an Iterable of the aggregated NullWritable instances. We can then simply return the keys, as they are exactly the distinct districts.

In DistinctDistricts.java:

  1. ...
  2. job.setOutputKeyClass(Text.class);
  3. job.setOutputValueClass(NullWritable.class);
  4. ...

In TreesMapper.java:

  1. public class TreesMapper extends Mapper<Object, Text, Text, NullWritable> {
  2. public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
  3. ...
  4. context.write(district, NullWritable.get());
  5. }
  6. }

In TreesReducer.java:

  1. public class TreesReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
  2. public void reduce(Text key, Iterable<NullWritable> values, Context context)
  3. throws IOException, InterruptedException {
  4. ...
  5. context.write(key, NullWritable.get());
  6. }
  7. }

However, just for fun, we will use an IntWritable for our outputs, and count the number of trees each district has (you can just keep the keys of the final output to know the distinct districts themselves).

Now that we know the types of our outputs (note that the Mapper’s output is also the Reducer’s input, and because we set the Combiner class to the Reducer class, the Mapper and the Reducer must share the same output types), we can work on the logic of our MapReduce job.

The Mapper receives the file’s data line by line and may call context.write(k, v) (i.e. output key-value couples) as many times as needed for every input. In our case, we will output one key-value couple per input line, as each line describes exactly one tree and one district, except for the first line, which contains the column names and not data. To ignore it, we can either skip a line when it contains data only found in the header (e.g. the column names themselves, which never appear in the rest of the data), or use a line counter and ignore the first iteration (or simply a boolean, since only the first line needs to be skipped and all later lines are processed). We will use the counter, as it is safer, more universal, and would let a future version of the mapper know which line is currently being processed (which can be useful information).
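As a side note, the first option (skipping the header based on its content rather than by counting lines) could look like the following sketch. This is not part of the project code: the class name HeaderAwareMapper is hypothetical, and it assumes that the header’s second field is a column name rather than a numeric district number (which is the case for every data line of trees.csv):

  1. import java.io.IOException;
  2. import org.apache.hadoop.io.IntWritable;
  3. import org.apache.hadoop.io.Text;
  4. import org.apache.hadoop.mapreduce.Mapper;
  5. // Hypothetical alternative to the line counter: detect the header by its content
  6. public class HeaderAwareMapper extends Mapper<Object, Text, Text, IntWritable> {
  7. public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
  8. String[] fields = value.toString().split(";");
  9. if (fields.length < 2) return; // malformed line: nothing to extract
  10. try {
  11. Integer.parseInt(fields[1]); // data lines carry a numeric district number in the second column
  12. } catch (NumberFormatException ex) {
  13. return; // non-numeric second field: assume this is the header and ignore it
  14. }
  15. context.write(new Text(fields[1]), new IntWritable(1));
  16. }
  17. }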

To retrieve the district number from the lines, we simply need to access the value of the second column the same way a CSV interpreter does: by splitting the line along its separator (in our case, a semicolon). To be able to count the number of trees per district in the Reducer, we associate the value 1 with each key (the district numbers), so that the values can be summed during the aggregation in the Reducer.

TreesMapper.java

  1. public class TreesMapper extends Mapper<Object, Text, Text, IntWritable> {
  2. public int curr_line = 0;
  3. public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
  4. if (curr_line != 0) {
  5. context.write(new Text(value.toString().split(";")[1]), new IntWritable(1));
  6. }
  7. curr_line++;
  8. }
  9. }

For the reducer, we do the same as a wordcount, summing all the values aggregated for the distinct keys and writing (key, sum) to the context.

TreesReducer.java

  1. public class TreesReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  2. public void reduce(Text key, Iterable<IntWritable> values, Context context)
  3. throws IOException, InterruptedException {
  4. int sum = 0;
  5. for (IntWritable val : values) {
  6. sum += val.get();
  7. }
  8. context.write(key, new IntWritable(sum));
  9. }
  10. }

We then commit the modifications, push to the repository, pull back on the edge node, build the JAR and launch the command.
To simplify this process on the edge node, we will use an alias that performs all those operations directly (simply pulling with Git causes issues because of the target folder, so we re-clone for simplicity):

  1. -sh-4.2$ alias refresh_project='cd ~; rm -R -f hadoop-examples-mapreduce/; git clone https://github.com/iv-stpn/hadoop-examples-mapreduce; cd hadoop-examples-mapreduce/; mvn install'

We will also create a new alias to check for the result of a job:

  1. -sh-4.2$ alias result='function _result() { hdfs dfs -cat "$1"/part-r-00000; } ; _result'
  1. -sh-4.2$ launch_job distinctDistricts trees.csv districts
  2. ...
  3. 20/11/11 21:29:17 INFO mapreduce.Job: map 0% reduce 0%
  4. 20/11/11 21:29:26 INFO mapreduce.Job: map 100% reduce 0%
  5. 20/11/11 21:29:32 INFO mapreduce.Job: map 100% reduce 100%
  6. ...
  7. File Input Format Counters
  8. Bytes Read=16680
  9. File Output Format Counters
  10. Bytes Written=80
  11. -sh-4.2$ result districts
  12. 11 1
  13. 12 29
  14. 13 2
  15. 14 3
  16. 15 1
  17. 16 36
  18. 17 1
  19. 18 1
  20. 19 6
  21. 20 3
  22. 3 1
  23. 4 1
  24. 5 2
  25. 6 1
  26. 7 3
  27. 8 5
  28. 9 1

The job works as expected.

1.8.2. & 1.8.3 TreeSpecies / TreeSpeciesCount

This job is extremely similar to the previous one: instead of using the district number obtained from the second column as the key, we use the species of the tree obtained from the fourth column. Just like for the previous job, we will print the number of trees for each species. To recover only the unique species, it is possible to emit NullWritable as the value for each species key and simply return the keys in the Reducer. We will showcase both cases with TreeSpecies and TreeSpeciesCount:

AppDriver.java

  1. ...
  2. programDriver.addClass("treeSpecies", TreeSpecies.class,
  3. "A map/reduce program that returns the distinct tree species in the Remarkable Trees of Paris dataset.");
  4. ...

TreeSpecies.java

  1. ...
  2. if (otherArgs.length < 2) {
  3. System.err.println("Usage: treeSpecies <in> [<in>...] <out>");
  4. System.exit(2);
  5. }
  6. Job job = Job.getInstance(conf, "treeSpecies");
  7. job.setJarByClass(TreeSpecies.class);
  8. job.setMapperClass(SpeciesMapper.class);
  9. job.setCombinerClass(SpeciesReducer.class);
  10. job.setReducerClass(SpeciesReducer.class);
  11. job.setOutputKeyClass(Text.class);
  12. job.setOutputValueClass(NullWritable.class);
  13. ...

SpeciesMapper.java

  1. ...
  2. public class SpeciesMapper extends Mapper<Object, Text, Text, NullWritable> {
  3. public int curr_line = 0;
  4. public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
  5. if (curr_line != 0) {
  6. context.write(new Text(value.toString().split(";")[3]), NullWritable.get());
  7. }
  8. curr_line++;
  9. }
  10. }

SpeciesReducer.java

  1. ...
  2. public class SpeciesReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
  3. public void reduce(Text key, Iterable<NullWritable> values, Context context)
  4. throws IOException, InterruptedException {
  5. context.write(key, NullWritable.get());
  6. }
  7. }
  1. -sh-4.2$ launch_job treeSpecies trees.csv species; printf "\n"; result species
  2. ...
  3. 20/11/12 14:48:54 INFO mapreduce.Job: map 0% reduce 0%
  4. 20/11/12 14:49:03 INFO mapreduce.Job: map 100% reduce 0%
  5. 20/11/12 14:49:13 INFO mapreduce.Job: map 100% reduce 100%
  6. ...
  7. File Input Format Counters
  8. Bytes Read=16680
  9. File Output Format Counters
  10. Bytes Written=451
  11. araucana
  12. atlantica
  13. australis
  14. baccata
  15. bignonioides
  16. biloba
  17. bungeana
  18. cappadocicum
  19. carpinifolia
  20. colurna
  21. coulteri
  22. decurrens
  23. dioicus
  24. distichum
  25. excelsior
  26. fraxinifolia
  27. giganteum
  28. giraldii
  29. glutinosa
  30. grandiflora
  31. hippocastanum
  32. ilex
  33. involucrata
  34. japonicum
  35. kaki
  36. libanii
  37. monspessulanum
  38. nigra
  39. nigra laricio
  40. opalus
  41. orientalis
  42. papyrifera
  43. petraea
  44. pomifera
  45. pseudoacacia
  46. sempervirens
  47. serrata
  48. stenoptera
  49. suber
  50. sylvatica
  51. tomentosa
  52. tulipifera
  53. ulmoides
  54. virginiana
  55. x acerifolia

The job without the count works as expected.

AppDriver.java

  1. ...
  2. programDriver.addClass("treeSpeciesCount", TreeSpeciesCount.class,
  3. "A map/reduce program that returns the distinct tree species (and the number of trees for each one) in the Remarkable Trees of Paris dataset.");
  4. ...

TreeSpeciesCount.java

  1. ...
  2. if (otherArgs.length < 2) {
  3. System.err.println("Usage: treeSpeciesCount <in> [<in>...] <out>");
  4. System.exit(2);
  5. }
  6. Job job = Job.getInstance(conf, "treeSpecies");
  7. job.setJarByClass(TreeSpecies.class);
  8. job.setMapperClass(SpeciesMapper.class);
  9. job.setCombinerClass(SpeciesReducer.class);
  10. job.setReducerClass(SpeciesReducer.class);
  11. job.setOutputKeyClass(Text.class);
  12. job.setOutputValueClass(IntWritable.class);
  13. ...

SpeciesCountMapper.java

  1. ...
  2. public class SpeciesCountMapper extends Mapper<Object, Text, Text, IntWritable> {
  3. public int curr_line = 0;
  4. public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
  5. if (curr_line != 0) {
  6. context.write(new Text(value.toString().split(";")[3]), new IntWritable(1));
  7. }
  8. curr_line++;
  9. }
  10. }

SpeciesCountReducer.java

  1. ...
  2. public class SpeciesCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  3. public void reduce(Text key, Iterable<IntWritable> values, Context context)
  4. throws IOException, InterruptedException {
  5. int sum = 0;
  6. for (IntWritable val : values) {
  7. sum += val.get();
  8. }
  9. context.write(key, new IntWritable(sum));
  10. }
  11. }
  1. -sh-4.2$ launch_job treeSpeciesCount trees.csv species_count; printf "\n"; result species_count;
  2. ...
  3. 20/11/12 14:51:35 INFO mapreduce.Job: map 0% reduce 0%
  4. 20/11/12 14:51:44 INFO mapreduce.Job: map 100% reduce 0%
  5. 20/11/12 14:51:53 INFO mapreduce.Job: map 100% reduce 100%
  6. ...
  7. File Input Format Counters
  8. Bytes Read=16680
  9. File Output Format Counters
  10. Bytes Written=542
  11. araucana 1
  12. atlantica 2
  13. australis 1
  14. baccata 2
  15. bignonioides 1
  16. biloba 5
  17. bungeana 1
  18. cappadocicum 1
  19. carpinifolia 4
  20. colurna 3
  21. coulteri 1
  22. decurrens 1
  23. dioicus 1
  24. distichum 3
  25. excelsior 1
  26. fraxinifolia 2
  27. giganteum 5
  28. giraldii 1
  29. glutinosa 1
  30. grandiflora 1
  31. hippocastanum 3
  32. ilex 1
  33. involucrata 1
  34. japonicum 1
  35. kaki 2
  36. libanii 2
  37. monspessulanum 1
  38. nigra 3
  39. nigra laricio 1
  40. opalus 1
  41. orientalis 8
  42. papyrifera 1
  43. petraea 2
  44. pomifera 1
  45. pseudoacacia 1
  46. sempervirens 1
  47. serrata 1
  48. stenoptera 1
  49. suber 1
  50. sylvatica 8
  51. tomentosa 2
  52. tulipifera 2
  53. ulmoides 1
  54. virginiana 2
  55. x acerifolia 11

The job with the count works as expected.

1.8.4 MaxHeightSpecies

For this job, we need to find the height of the tallest tree of each species, which means we need to aggregate heights per species. We will use the species as the key output (string => Text) and the heights as the value outputs (numerical) of our Mapper and Reducer. If we look at the height column (the seventh one), we can see that the values are floating-point decimals, but that they all end in .0 (i.e. they are all actually integers, which means we could use an IntWritable by doing IntWritable height = new IntWritable((int) Float.parseFloat(value.toString().split(";")[6])) in the mapper). Even though the values all happen to be integers in the current dataset, we will suppose that future values might have floating-point precision, so we will use FloatWritable as the numerical type. We can also notice that, unlike the species and the district, the height is not always known for the trees in the dataset. This makes sense within the logic of the dataset: while the district number is implicit from the location of the tree and the species is obvious from identifying the tree, its height is harder to obtain, as it needs to be measured officially. For this reason, we will add a safety check in the Mapper for the height (unlike for the previous jobs, where it was unnecessary) so that height values that can’t be converted to a float (i.e. empty values) are ignored.

From this point on, we will use the StreamSupport class (from java.util.stream) to perform aggregation/mapping operations on the Iterables received as Reducer inputs.
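As a quick, standalone illustration of the API (not project code; the class name StreamSupportDemo is only for demonstration), StreamSupport.stream(iterable.spliterator(), false) wraps a plain Iterable, such as the values a Reducer receives for one key, into a sequential Stream on which map/max/filter operations can be chained:

  1. import java.util.Arrays;
  2. import java.util.stream.StreamSupport;
  3. import org.apache.hadoop.io.FloatWritable;
  4. public class StreamSupportDemo {
  5. public static void main(String[] args) {
  6. // Mimics the Iterable<FloatWritable> a Reducer would receive for a single species
  7. Iterable<FloatWritable> values = Arrays.asList(
  8. new FloatWritable(12.0f), new FloatWritable(30.0f), new FloatWritable(9.0f));
  9. // spliterator() adapts the Iterable; "false" asks for a sequential (non-parallel) stream
  10. float max = StreamSupport.stream(values.spliterator(), false)
  11. .map(FloatWritable::get)
  12. .max(Float::compare)
  13. .get();
  14. System.out.println(max); // prints 30.0
  15. }
  16. }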

AppDriver.java

  1. ...
  2. programDriver.addClass("maxHeightSpecies", MaxHeightSpecies.class,
  3. "A map/reduce program that returns the highest height of trees per species in the Remarkable Trees of Paris dataset.");
  4. ...

MaxHeightSpecies.java

  1. ...
  2. if (otherArgs.length < 2) {
  3. System.err.println("Usage: maxHeightSpecies <in> [<in>...] <out>");
  4. System.exit(2);
  5. }
  6. Job job = Job.getInstance(conf, "maxHeightSpecies");
  7. job.setJarByClass(MaxHeightSpecies.class);
  8. job.setMapperClass(HeightSpeciesMapper.class);
  9. job.setCombinerClass(HeightSpeciesReducer.class);
  10. job.setReducerClass(HeightSpeciesReducer.class);
  11. job.setOutputKeyClass(Text.class);
  12. job.setOutputValueClass(FloatWritable.class);
  13. ...

HeightSpeciesMapper.java

  1. ...
  2. public class HeightSpeciesMapper extends Mapper<Object, Text, Text, FloatWritable> {
  3. public int curr_line = 0;
  4. public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
  5. if (curr_line != 0) {
  6. try {
  7. Float height = Float.parseFloat(value.toString().split(";")[6]);
  8. context.write(new Text(value.toString().split(";")[3]), new FloatWritable(height));
  9. } catch (NumberFormatException ex) {
  10. // If the value is not a float, skip it by catching the error from the parseFloat() method
  11. }
  12. } curr_line++;
  13. }
  14. }

HeightSpeciesReducer.java

  1. ...
  2. public class HeightSpeciesReducer extends Reducer<Text, FloatWritable, Text, FloatWritable> {
  3. public void reduce(Text key, Iterable<FloatWritable> values, Context context)
  4. throws IOException, InterruptedException {
  5. context.write(key, new FloatWritable(StreamSupport.stream(values.spliterator(), false)
  6. .map((v) -> v.get())
  7. .max(Float::compare).get()));
  8. }
  9. }
  1. -sh-4.2$ launch_job maxHeightSpecies trees.csv max_heights; printf "\n"; result max_heights;
  2. ...
  3. 20/11/12 15:12:35 INFO mapreduce.Job: map 0% reduce 0%
  4. 20/11/12 15:12:44 INFO mapreduce.Job: map 100% reduce 0%
  5. 20/11/12 15:12:54 INFO mapreduce.Job: map 100% reduce 100%
  6. ...
  7. File Input Format Counters
  8. Bytes Read=16821
  9. File Output Format Counters
  10. Bytes Written=675
  11. araucana 9.0
  12. atlantica 25.0
  13. australis 16.0
  14. baccata 13.0
  15. bignonioides 15.0
  16. biloba 33.0
  17. bungeana 10.0
  18. cappadocicum 16.0
  19. carpinifolia 30.0
  20. colurna 20.0
  21. coulteri 14.0
  22. decurrens 20.0
  23. dioicus 10.0
  24. distichum 35.0
  25. excelsior 30.0
  26. fraxinifolia 27.0
  27. giganteum 35.0
  28. giraldii 35.0
  29. glutinosa 16.0
  30. grandiflora 12.0
  31. hippocastanum 30.0
  32. ilex 15.0
  33. involucrata 12.0
  34. japonicum 10.0
  35. kaki 14.0
  36. libanii 30.0
  37. monspessulanum 12.0
  38. nigra 30.0
  39. nigra laricio 30.0
  40. opalus 15.0
  41. orientalis 34.0
  42. papyrifera 12.0
  43. petraea 31.0
  44. pomifera 13.0
  45. pseudoacacia 11.0
  46. sempervirens 30.0
  47. serrata 18.0
  48. stenoptera 30.0
  49. suber 10.0
  50. sylvatica 30.0
  51. tomentosa 20.0
  52. tulipifera 35.0
  53. ulmoides 12.0
  54. virginiana 14.0
  55. x acerifolia 45.0

The job works as expected.

1.8.5 TreesSortedByHeight

For this job, we suppose that we need an identifier to associate with the tree heights, rather than just sorting the heights without indicating which trees they belong to. To accomplish this, we will build a special string in the Mapper, combining the OBJECTID of the tree (twelfth column) with its genus, species and family to carry some extra information about the tree. As the identifiers (Text) are associated with each height (FloatWritable), there is actually no aggregation over the identifiers in the Reducer; nor will we use the Reducer to sort the heights. Instead, we simply rely on the sort over the keys that necessarily takes place between the Mapper and the Reducer in any MapReduce job. To do so, we use the heights as keys to the identifiers, and in the Reducer we return the identifiers associated with each height in order (as multiple trees can have the same height).

Note: in this case the Mapper and the Reducer do not share the same output types (they are swapped), and as such we need to set the Mapper output types separately in the job configuration and remove the Reducer as the combiner class.

AppDriver.java

  1. ...
  2. programDriver.addClass("treesSortedByHeight", TreesSortedByHeight.class,
  3. "A map/reduce program that returns all the trees in the Remarkable Trees of Paris dataset, sorted by height.");
  4. ...

TreesSortedByHeight.java

  1. ...
  2. if (otherArgs.length < 2) {
  3. System.err.println("Usage: treesSortedByHeight <in> [<in>...] <out>");
  4. System.exit(2);
  5. }
  6. Job job = Job.getInstance(conf, "treesSortedByHeight");
  7. job.setJarByClass(TreesSortedByHeight.class);
  8. job.setMapperClass(HeightSortedTreeMapper.class);
  9. //job.setCombinerClass(HeightSortedTreeReducer.class);
  10. // The Mapper and the Reducer have mismatched key-value output types
  11. job.setReducerClass(HeightSortedTreeReducer.class);
  12. job.setMapOutputKeyClass(FloatWritable.class);
  13. job.setMapOutputValueClass(Text.class);
  14. job.setOutputKeyClass(Text.class);
  15. job.setOutputValueClass(FloatWritable.class);
  16. ...

HeightSortedTreeMapper.java

  1. ...
  2. public class HeightSortedTreeMapper extends Mapper<Object, Text, FloatWritable, Text> {
  3. public int curr_line = 0;
  4. public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
  5. if (curr_line != 0) {
  6. try {
  7. String[] line_tokens = value.toString().split(";");
  8. Float height = Float.parseFloat(line_tokens[6]);
  9. context.write(new FloatWritable(height),
  10. new Text(line_tokens[11] + " - " + line_tokens[2] + " " + line_tokens[3] + " (" + line_tokens[4] + ")"));
  11. } catch (NumberFormatException ex) {
  12. // If the value is not a float, skip by catching the error from the parseFloat() method
  13. }
  14. } curr_line++;
  15. }
  16. }

HeightSortedTreeReducer.java

  1. ...
  2. public class HeightSortedTreeReducer extends Reducer<FloatWritable, Text, Text, FloatWritable> {
  3. public void reduce(FloatWritable key, Iterable<Text> values, Context context)
  4. throws IOException, InterruptedException {
  5. for (Text val : values) {
  6. context.write(val, key);
  7. }
  8. }
  9. }
  1. -sh-4.2$ launch_job treesSortedByHeight trees.csv sorted_heights; printf "\n"; result sorted_heights;
  2. ...
  3. 20/11/12 15:28:45 INFO mapreduce.Job: map 0% reduce 0%
  4. 20/11/12 15:28:54 INFO mapreduce.Job: map 100% reduce 0%
  5. 20/11/12 15:29:05 INFO mapreduce.Job: map 100% reduce 100%
  6. ...
  7. File Input Format Counters
  8. Bytes Read=16680
  9. File Output Format Counters
  10. Bytes Written=3994
  11. 3 - Fagus sylvatica (Fagaceae) 2.0
  12. 89 - Taxus baccata (Taxaceae) 5.0
  13. 62 - Cedrus atlantica (Pinaceae) 6.0
  14. 39 - Araucaria araucana (Araucariaceae) 9.0
  15. 44 - Styphnolobium japonicum (Fabaceae) 10.0
  16. 32 - Quercus suber (Fagaceae) 10.0
  17. 95 - Pinus bungeana (Pinaceae) 10.0
  18. 61 - Gymnocladus dioicus (Fabaceae) 10.0
  19. 63 - Fagus sylvatica (Fagaceae) 10.0
  20. 4 - Robinia pseudoacacia (Fabaceae) 11.0
  21. 93 - Diospyros virginiana (Ebenaceae) 12.0
  22. 66 - Magnolia grandiflora (Magnoliaceae) 12.0
  23. 50 - Zelkova carpinifolia (Ulmaceae) 12.0
  24. 7 - Eucommia ulmoides (Eucomiaceae) 12.0
  25. 48 - Acer monspessulanum (Sapindacaees) 12.0
  26. 58 - Diospyros kaki (Ebenaceae) 12.0
  27. 33 - Broussonetia papyrifera (Moraceae) 12.0
  28. 71 - Davidia involucrata (Cornaceae) 12.0
  29. 36 - Taxus baccata (Taxaceae) 13.0
  30. 6 - Maclura pomifera (Moraceae) 13.0
  31. 68 - Diospyros kaki (Ebenaceae) 14.0
  32. 96 - Pinus coulteri (Pinaceae) 14.0
  33. 94 - Diospyros virginiana (Ebenaceae) 14.0
  34. 91 - Acer opalus (Sapindaceae) 15.0
  35. 5 - Catalpa bignonioides (Bignoniaceae) 15.0
  36. 70 - Fagus sylvatica (Fagaceae) 15.0
  37. 2 - Ulmus carpinifolia (Ulmaceae) 15.0
  38. 98 - Quercus ilex (Fagaceae) 15.0
  39. 28 - Alnus glutinosa (Betulaceae) 16.0
  40. 78 - Acer cappadocicum (Sapindaceae) 16.0
  41. 75 - Zelkova carpinifolia (Ulmaceae) 16.0
  42. 16 - Celtis australis (Cannabaceae) 16.0
  43. 64 - Ginkgo biloba (Ginkgoaceae) 18.0
  44. 83 - Zelkova serrata (Ulmaceae) 18.0
  45. 23 - Aesculus hippocastanum (Sapindaceae) 18.0
  46. 60 - Fagus sylvatica (Fagaceae) 18.0
  47. 34 - Corylus colurna (Betulaceae) 20.0
  48. 51 - Platanus x acerifolia (Platanaceae) 20.0
  49. 43 - Tilia tomentosa (Malvaceae) 20.0
  50. 15 - Corylus colurna (Betulaceae) 20.0
  51. 11 - Calocedrus decurrens (Cupressaceae) 20.0
  52. 1 - Corylus colurna (Betulaceae) 20.0
  53. 8 - Platanus orientalis (Platanaceae) 20.0
  54. 20 - Fagus sylvatica (Fagaceae) 20.0
  55. 35 - Paulownia tomentosa (Paulowniaceae) 20.0
  56. 12 - Sequoiadendron giganteum (Taxodiaceae) 20.0
  57. 87 - Taxodium distichum (Taxodiaceae) 20.0
  58. 13 - Platanus orientalis (Platanaceae) 20.0
  59. 10 - Ginkgo biloba (Ginkgoaceae) 22.0
  60. 47 - Aesculus hippocastanum (Sapindaceae) 22.0
  61. 86 - Platanus orientalis (Platanaceae) 22.0
  62. 14 - Pterocarya fraxinifolia (Juglandaceae) 22.0
  63. 88 - Liriodendron tulipifera (Magnoliaceae) 22.0
  64. 18 - Fagus sylvatica (Fagaceae) 23.0
  65. 24 - Cedrus atlantica (Pinaceae) 25.0
  66. 31 - Ginkgo biloba (Ginkgoaceae) 25.0
  67. 92 - Platanus x acerifolia (Platanaceae) 25.0
  68. 49 - Platanus orientalis (Platanaceae) 25.0
  69. 97 - Pinus nigra (Pinaceae) 25.0
  70. 84 - Ginkgo biloba (Ginkgoaceae) 25.0
  71. 73 - Platanus orientalis (Platanaceae) 26.0
  72. 65 - Pterocarya fraxinifolia (Juglandaceae) 27.0
  73. 42 - Platanus orientalis (Platanaceae) 27.0
  74. 85 - Juglans nigra (Juglandaceae) 28.0
  75. 76 - Pinus nigra laricio (Pinaceae) 30.0
  76. 19 - Quercus petraea (Fagaceae) 30.0
  77. 72 - Sequoiadendron giganteum (Taxodiaceae) 30.0
  78. 54 - Pterocarya stenoptera (Juglandaceae) 30.0
  79. 29 - Zelkova carpinifolia (Ulmaceae) 30.0
  80. 27 - Sequoia sempervirens (Taxodiaceae) 30.0
  81. 25 - Fagus sylvatica (Fagaceae) 30.0
  82. 41 - Platanus x acerifolia (Platanaceae) 30.0
  83. 77 - Taxodium distichum (Taxodiaceae) 30.0
  84. 55 - Platanus x acerifolia (Platanaceae) 30.0
  85. 69 - Pinus nigra (Pinaceae) 30.0
  86. 38 - Fagus sylvatica (Fagaceae) 30.0
  87. 59 - Sequoiadendron giganteum (Taxodiaceae) 30.0
  88. 52 - Fraxinus excelsior (Oleaceae) 30.0
  89. 37 - Cedrus libanii (Pinaceae) 30.0
  90. 22 - Cedrus libanii (Pinaceae) 30.0
  91. 30 - Aesculus hippocastanum (Sapindaceae) 30.0
  92. 80 - Quercus petraea (Fagaceae) 31.0
  93. 9 - Platanus orientalis (Platanaceae) 31.0
  94. 82 - Platanus x acerifolia (Platanaceae) 32.0
  95. 46 - Ginkgo biloba (Ginkgoaceae) 33.0
  96. 45 - Platanus orientalis (Platanaceae) 34.0
  97. 56 - Taxodium distichum (Taxodiaceae) 35.0
  98. 81 - Liriodendron tulipifera (Magnoliaceae) 35.0
  99. 17 - Platanus x acerifolia (Platanaceae) 35.0
  100. 53 - Ailanthus giraldii (Simaroubaceae) 35.0
  101. 57 - Sequoiadendron giganteum (Taxodiaceae) 35.0
  102. 26 - Platanus x acerifolia (Platanaceae) 40.0
  103. 74 - Platanus x acerifolia (Platanaceae) 40.0
  104. 40 - Platanus x acerifolia (Platanaceae) 40.0
  105. 90 - Platanus x acerifolia (Platanaceae) 42.0
  106. 21 - Platanus x acerifolia (Platanaceae) 45.0

The job works as expected.

1.8.6 OldestTreeDistrictSort / OldestTreeDistrictReduce

For this job, the guideline states that it is not possible to use the information directly as key-values, and suggests using a single key under which all the (district, year) couples (emitted as values via ArrayWritable or MapWritable) are aggregated in the Reducer; successive comparisons are then applied to the second element of each couple (the year, IntWritable) so that the first elements of the couples with the lowest year (the district numbers, IntWritable) are returned, as there can be multiple districts where trees were planted that same year. However, this statement is untrue: it is once again possible to use the sorting operation that is part of every MapReduce job, so that the first key-value couple received by the Reducer is the one with the smallest year (with all the districts having a tree planted that year aggregated under it), and all other couples are ignored. This works because the operation only requires selecting one data point that can be found directly in the sort result; if the aggregation were more complicated (like a count or an average), we would have to use the first method.

For the sake of completeness, we will showcase both methods: OldestTreeDistrictSort uses the more efficient method relying on the automatic MapReduce sort, while OldestTreeDistrictReduce consolidates all the data into a single key for processing.

Note: for clarity, we will return the year of plantation along with the numbers of the districts where the oldest trees were planted.

In the current dataset, the oldest tree is the one with OBJECTID 4 (line 28 of the trees.csv file), planted in 1601 in the 5th district. For testing purposes, we will add two trees to the dataset: another one planted in district 5 in the same year, and one planted in district 3 in 1601 as well.

  1. -sh-4.2$ echo ";5;Cedrus;atlantica;Pinaceae;1601;;;Cèdre bleu de l'Atlas;Glauca;99;" | hdfs dfs -appendToFile - "trees.csv"
  2. -sh-4.2$ echo ";3;Cedrus;atlantica;Pinaceae;1601;;;Cèdre bleu de l'Atlas;Glauca;100;" | hdfs dfs -appendToFile - "trees.csv"
  3. -sh-4.2$ hdfs dfs -cat trees.csv
  4. ...
  5. ;5;Cedrus;atlantica;Pinaceae;1601;;;Cèdre bleu de l'Atlas;Glauca;99;
  6. ;3;Cedrus;atlantica;Pinaceae;1601;;;Cèdre bleu de l'Atlas;Glauca;100;

AppDriver.java

  1. ...
  2. programDriver.addClass("oldestTreeDistrictSort", OldestTreeDistrictSort.class,
  3. "A map/reduce program that returns the district(s) with the oldest tree(s) in the Remarkable Trees of Paris dataset, using a sort.");
  4. ...

OldestTreeDistrictSort.java

  1. ...
  2. if (otherArgs.length < 2) {
  3. System.err.println("Usage: oldestTreeDistrictSort <in> [<in>...] <out>");
  4. System.exit(2);
  5. }
  6. Job job = Job.getInstance(conf, "oldestTreeDistrictSort");
  7. job.setJarByClass(OldestTreeDistrictSort.class);
  8. job.setMapperClass(OldestDistrictSortMapper.class);
  9. job.setCombinerClass(OldestDistrictSortReducer.class);
  10. job.setReducerClass(OldestDistrictSortReducer.class);
  11. job.setOutputKeyClass(IntWritable.class);
  12. job.setOutputValueClass(IntWritable.class);
  13. // The Mapper & the Reducer have the same output key-values
  14. ...

OldestDistrictSortMapper.java

  1. ...
  2. public class OldestDistrictSortMapper extends Mapper<Object, Text, IntWritable, IntWritable> {
  3. public int curr_line = 0;
  4. public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
  5. if (curr_line != 0) {
  6. try {
  7. Integer year = Integer.parseInt(value.toString().split(";")[5]);
  8. context.write(new IntWritable(year), new IntWritable(Integer.parseInt(value.toString().split(";")[1])));
  9. } catch (NumberFormatException ex) {
  10. // If the year is not an integer, skip by catching the error from the parseInt() method
  11. }
  12. } curr_line++;
  13. }
  14. }

OldestDistrictSortReducer.java

  1. ...
  2. public class OldestDistrictSortReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
  3. public boolean first = true;
  4. public void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
  5. throws IOException, InterruptedException {
  6. if (first) {
  7. StreamSupport.stream(values.spliterator(), false).distinct().forEach(v -> {
  8. try {
  9. context.write(key, v);
  10. } catch (IOException | InterruptedException e) {
  11. e.printStackTrace();
  12. }
  13. });
  14. }
  15. first = false;
  16. }
  17. }
  1. -sh-4.2$ launch_job oldestTreeDistrictSort trees.csv oldest_sort; printf "\n"; result oldest_sort;
  2. ...
  3. 20/11/12 20:54:21 INFO mapreduce.Job: map 0% reduce 0%
  4. 20/11/12 20:54:30 INFO mapreduce.Job: map 100% reduce 0%
  5. 20/11/12 20:54:40 INFO mapreduce.Job: map 100% reduce 100%
  6. ...
  7. File Input Format Counters
  8. Bytes Read=16821
  9. File Output Format Counters
  10. Bytes Written=14
  11. 1601 5
  12. 1601 3

The job worked as expected: it only shows the distinct districts with trees planted at the oldest date.

AppDriver.java

  1. ...
  2. programDriver.addClass("oldestTreeDistrictReduce", OldestTreeDistrictReduce.class,
  3. "A map/reduce program that returns the district(s) with the oldest tree(s) in the Remarkable Trees of Paris dataset, checking through all the data.");
  4. ...

OldestTreeDistrictReduce.java

  1. ...
  2. if (otherArgs.length < 2) {
  3. System.err.println("Usage: oldestTreeDistrictReduce <in> [<in>...] <out>");
  4. System.exit(2);
  5. }
  6. Job job = Job.getInstance(conf, "oldestTreeDistrictReduce");
  7. job.setJarByClass(OldestTreeDistrictReduce.class);
  8. job.setMapperClass(OldestDistrictReduceMapper.class);
  9. //job.setCombinerClass(OldestDistrictReduceReducer.class);
  10. // The Mapper and Reducer have mismatched key-value output types
  11. job.setReducerClass(OldestDistrictReduceReducer.class);
  12. job.setMapOutputKeyClass(NullWritable.class);
  13. job.setMapOutputValueClass(MapWritable.class);
  14. job.setOutputKeyClass(IntWritable.class);
  15. job.setOutputValueClass(IntWritable.class);
  16. ...

OldestDistrictReduceMapper.java

  1. ...
  2. public class OldestDistrictReduceMapper extends Mapper<Object, Text, NullWritable, MapWritable> {
  3. public int curr_line = 0;
  4. public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
  5. if (curr_line != 0) {
  6. try {
  7. Integer year = Integer.parseInt(value.toString().split(";")[5]);
  8. MapWritable map = new MapWritable();
  9. map.put(new IntWritable(Integer.parseInt(value.toString().split(";")[1])), new IntWritable(year));
  10. context.write(NullWritable.get(), map);
  11. } catch (NumberFormatException ex) {
  12. // If the year is not an integer, skip by catching the error from the parseInt() method
  13. }
  14. } curr_line++;
  15. }
  16. }

OldestDistrictReduceReducer.java

  1. ...
  2. public class OldestDistrictReduceReducer extends Reducer<NullWritable, MapWritable, IntWritable, IntWritable> {
  3. public void reduce(NullWritable key, Iterable<MapWritable> values, Context context)
  4. throws IOException, InterruptedException {
  5. ArrayList<Integer[]> district_years = (ArrayList<Integer[]>) StreamSupport.stream(values.spliterator(), false)
  6. .map( mw -> (new Integer[] { ((IntWritable) mw.keySet().toArray()[0]).get(), ((IntWritable) mw.get(mw.keySet().toArray()[0])).get() }))
  7. .collect(Collectors.toList());
  8. // Copies the iterable to an arraylist so multiple operations can be done on the iterable
  9. int min_year = district_years.stream().map((arr) -> arr[1]).min(Integer::compare).get();
  10. district_years.stream().filter(arr -> arr[1] == min_year).map(arr -> arr[0]).distinct().forEach((district) -> { try {
  11. context.write(new IntWritable(min_year), new IntWritable(district));
  12. } catch (IOException | InterruptedException e) {
  13. e.printStackTrace();
  14. } });
  15. }
  16. }
  1. -sh-4.2$ launch_job oldestTreeDistrictReduce trees.csv oldest_reduce; printf "\n"; result oldest_reduce;
  2. ...
  3. 20/11/12 20:56:09 INFO mapreduce.Job: map 0% reduce 0%
  4. 20/11/12 20:56:18 INFO mapreduce.Job: map 100% reduce 0%
  5. 20/11/12 20:56:27 INFO mapreduce.Job: map 100% reduce 100%
  6. ...
  7. File Input Format Counters
  8. Bytes Read=16821
  9. File Output Format Counters
  10. Bytes Written=14
  11. 1601 5
  12. 1601 3

The job worked as expected.

1.8.7 MaxTreesDistrict / MaxTreesDistrict2

As we saw in the last job, a Reducer that aggregates all the data in a single iteration over one key (with which every element emitted by the Mapper is associated) is very inefficient. For this last job, which involves finding the district(s) with the maximum number of trees, we will design a job that uses multiple key-value outputs in the Mapper with a proper aggregation in the Reducer. There are two methods to do so. The first one, mentioned in the guideline, creates a second layer of MapReduce by chaining two jobs together: we reuse our previous DistinctDistricts job as the first job (it gives us the number of trees associated with each district, cf. 1.8.1), and for the second job we use a mapper similar to OldestDistrictReduceMapper (sending every key-value couple to the Reducer under a single key for consolidation) and a simple maximum Reducer that compares the tree counts and returns the corresponding key-value couple(s). The other method, more efficient and requiring fewer classes and fewer operations, is to add a cleanup() method to the Reducer of the DistinctDistricts job, so that once all the (district, tree count) couples have been collected, it returns the district(s) with the most trees.

Once again, we will showcase both methods, with MaxTreesDistrict corresponding to the more efficient method and MaxTreesDistrict2 corresponding to the method involving two chained MapReduce jobs.

AppDriver.java

  1. ...
  2. programDriver.addClass("maxTreesDistrict", MaxTreesDistrict.class,
  3. "A map/reduce program that returns the district(s) with the most trees in the Remarkable Trees of Paris dataset, checking through all the data, using the Reducer's cleanup.");
  4. ...

MaxTreesDistrict.java

  1. ...
  2. if (otherArgs.length < 2) {
  3. System.err.println("Usage: maxTreesDistrict <in> [<in>...] <out>");
  4. System.exit(2);
  5. }
  6. Job job = Job.getInstance(conf, "maxTreesDistrict");
  7. job.setJarByClass(MaxTreesDistrict.class);
  8. job.setMapperClass(TreesMapper.class);
  9. job.setCombinerClass(MaxTreesDistrictReducer.class);
  10. job.setReducerClass(MaxTreesDistrictReducer.class);
  11. job.setOutputKeyClass(IntWritable.class);
  12. job.setOutputValueClass(IntWritable.class);
  13. ...

MaxTreesDistrictReducer.java

  1. ...
  2. public class MaxTreesDistrictReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
  3. public ArrayList<Integer[]> sum_districts = new ArrayList<Integer[]>();
  4. public void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
  5. throws IOException, InterruptedException {
  6. int sum = 0;
  7. for (IntWritable val : values) {
  8. sum += val.get();
  9. }
  10. sum_districts.add(new Integer[] {key.get(), sum });
  11. }
  12. public void cleanup(Context context) {
  13. int max = sum_districts.stream().map(arr -> arr[1]).max(Integer::compare).get();
  14. sum_districts.stream().filter(arr -> arr[1] == max)
  15. .forEach(arr -> {
  16. try {
  17. context.write(new IntWritable(arr[0]), new IntWritable(max));
  18. } catch (IOException | InterruptedException e) {
  19. e.printStackTrace();
  20. }
  21. });
  22. }
  23. }
  1. -sh-4.2$ launch_job maxTreesDistrict trees.csv max_trees_district; printf "\n"; result max_trees_district;
  2. ...
  3. 20/11/12 22:41:11 INFO mapreduce.Job: map 0% reduce 0%
  4. 20/11/12 22:41:20 INFO mapreduce.Job: map 100% reduce 0%
  5. 20/11/12 22:41:24 INFO mapreduce.Job: map 100% reduce 100%
  6. ...
  7. File Input Format Counters
  8. Bytes Read=16821
  9. File Output Format Counters
  10. Bytes Written=6
  11. 16 36

The job worked as expected: district number 16 is indeed the one with the most trees (with a total of 36).

AppDriver.java

  1. ...
  2. programDriver.addClass("maxTreesDistrict", MaxTreesDistrict.class,
  3. "A map/reduce program that returns the district(s) with the most trees in the Remarkable Trees of Paris dataset, checking through all the data, using the Reducer's cleanup.");
  4. ...

MaxTreesDistrict2.java

  1. ...
  2. public class MaxTreesDistrict2 {
  3. public static void main(String[] args) throws Exception {
  4. Configuration conf = new Configuration();
  5. String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  6. if (otherArgs.length < 2) {
  7. System.err.println("Usage: maxTreesDistrict2 <in> [<in>...] <out>");
  8. System.exit(2);
  9. }
  10. Job job = Job.getInstance(conf, "maxTreesDistrict2");
  11. job.setJarByClass(MaxTreesDistrict2.class);
  12. job.setMapperClass(TreesMapper.class);
  13. job.setCombinerClass(TreesReducer.class);
  14. job.setReducerClass(TreesReducer.class);
  15. job.setOutputKeyClass(IntWritable.class);
  16. job.setOutputValueClass(IntWritable.class);
  17. Path temp_file = new Path("##.temp");
  18. for (int i = 0; i < otherArgs.length - 1; ++i) {
  19. FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
  20. }
  21. FileOutputFormat.setOutputPath(job,
  22. temp_file);
  23. job.waitForCompletion(true);
  24. Configuration conf_max = new Configuration();
  25. Job job_max = Job.getInstance(conf_max, "max");
  26. job_max.setJarByClass(MaxTreesDistrict2.class);
  27. job_max.setMapperClass(ConsolidateInputMapper.class);
  28. //job_max.setCombinerClass(MaxTreesDistrictReducer2.class);
  29. job_max.setReducerClass(MaxTreesDistrictReducer2.class);
  30. job_max.setMapOutputKeyClass(NullWritable.class);
  31. job_max.setMapOutputValueClass(MapWritable.class);
  32. job_max.setOutputKeyClass(IntWritable.class);
  33. job_max.setOutputValueClass(IntWritable.class);
  34. FileInputFormat.addInputPath(job_max, temp_file);
  35. FileOutputFormat.setOutputPath(job_max,
  36. new Path(otherArgs[otherArgs.length-1]));
  37. boolean finished = job_max.waitForCompletion(true);
  38. FileUtils.deleteDirectory(new File(temp_file.toString()));
  39. System.exit(finished ? 0 : 1);
  40. }
  41. }

ConsolidateInputMapper.java

  1. public class ConsolidateInputMapper extends Mapper<LongWritable, Text, NullWritable, MapWritable> {
  2. public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  3. // The first job's output lines have the form "<district>\t<count>" (default TextOutputFormat separator)
  4. String[] fields = value.toString().split("\t");
  5. MapWritable map = new MapWritable();
  6. map.put(new IntWritable(Integer.parseInt(fields[0])), new IntWritable(Integer.parseInt(fields[1])));
  7. context.write(NullWritable.get(), map);
  8. }
  9. }

MaxTreesDistrictReducer2.java

  1. ...
  2. public class MaxTreesDistrictReducer2 extends Reducer<NullWritable, MapWritable, IntWritable, IntWritable> {
  3. public void reduce(NullWritable key, Iterable<MapWritable> values, Context context)
  4. throws IOException, InterruptedException {
  5. ArrayList<Integer[]> district_trees = (ArrayList<Integer[]>) StreamSupport.stream(values.spliterator(), false)
  6. .map( mw -> (new Integer[] { ((IntWritable) mw.keySet().toArray()[0]).get(), ((IntWritable) mw.get(mw.keySet().toArray()[0])).get() }))
  7. .collect(Collectors.toList());
  8. // Copies the iterable to an arraylist so multiple operations can be done on the iterable
  9. int max_trees = district_trees.stream().map((arr) -> arr[1]).max(Integer::compare).get();
  10. district_trees.stream().filter(arr -> arr[1] == max_trees).map(arr -> arr[0]).distinct().forEach((district) -> { try {
  11. context.write(new IntWritable(max_trees), new IntWritable(district));
  12. } catch (IOException | InterruptedException e) {
  13. e.printStackTrace();
  14. } });
  15. }
  16. }
  1. -sh-4.2$ launch_job maxTreesDistrict2 trees.csv max_trees_district2; printf "\n"; result max_trees_district2;
  2. ...
  3. 20/11/12 23:17:41 INFO mapreduce.Job: map 0% reduce 0%
  4. 20/11/12 23:17:50 INFO mapreduce.Job: map 100% reduce 0%
  5. 20/11/12 23:17:59 INFO mapreduce.Job: map 100% reduce 100%
  6. ...
  7. File Input Format Counters
  8. Bytes Read=16821
  9. File Output Format Counters
  10. Bytes Written=80
  11. ...
  12. 20/11/12 23:18:11 INFO mapreduce.Job: map 0% reduce 0%
  13. 20/11/12 23:18:20 INFO mapreduce.Job: map 100% reduce 0%
  14. 20/11/12 23:18:29 INFO mapreduce.Job: map 100% reduce 100%
  15. ...
  16. File Input Format Counters
  17. Bytes Read=80
  18. File Output Format Counters
  19. Bytes Written=6
  20. 16 36

The job worked as expected.

N.B. We tried adding JUnit tests to our project but could not proceed because of an error with the library that we could not solve:

  1. Wanted but not invoked:
  2. context.write(pomifera, (null));
  3. -> at com.opstty.mapper.SpeciesMapperTest.testMap(SpeciesMapperTest.java:34)
  4. Actually, there were zero interactions with this mock.

We managed to finish the lab without using the unit tests, thanks to effective debugging.
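For reference, such a test would typically look like the sketch below (this is not the project's actual test file; the "Wanted but not invoked" message above is Mockito's, and the CSV line used here is made up following the dataset's semicolon layout). One plausible explanation for the "zero interactions with this mock" message is that SpeciesMapper skips its very first call (curr_line starts at 0), so a test that feeds only a single data line never reaches context.write():

  1. import static org.mockito.Mockito.mock;
  2. import static org.mockito.Mockito.verify;
  3. import org.apache.hadoop.io.LongWritable;
  4. import org.apache.hadoop.io.NullWritable;
  5. import org.apache.hadoop.io.Text;
  6. import org.apache.hadoop.mapreduce.Mapper;
  7. import org.junit.Test;
  8. public class SpeciesMapperTest {
  9. @Test
  10. @SuppressWarnings("unchecked")
  11. public void testMap() throws Exception {
  12. SpeciesMapper mapper = new SpeciesMapper();
  13. Mapper<Object, Text, Text, NullWritable>.Context context = mock(Mapper.Context.class);
  14. // First call is treated as the CSV header by the mapper and ignored (curr_line == 0)
  15. mapper.map(new LongWritable(0), new Text("header line"), context);
  16. // Second call carries a made-up data line; the fourth column holds the species
  17. mapper.map(new LongWritable(1), new Text(";5;Maclura;pomifera;Moraceae;;13.0;;;;6;"), context);
  18. verify(context).write(new Text("pomifera"), NullWritable.get());
  19. }
  20. }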