WordCount Example with MapReduce

WordCount Example with MapReduce

Hello! Hope you're doing well. In my last post I've explained about internals of Hadoop MapReduce. As promised in that post, we will write and execute a MapReduce program in Java for a simple wordcount example. Let's dive in!

Topics covered in this post

  1. Pre-requisites
  2. Hadoop cluster setup on local machine and on Cloud
  3. Writing a MapReduce program on Eclipse
  4. Create a JAR file for the MapReduce Program and Uploading to HDFS
  5. Executing the MapReduce Program on the Hadoop Cluster
  6. Results

1. Pre-requisites

  1. Admin access to the machine (local preferably)
  2. Hadoop Cluster (Single/Multi node cluster) on local machine or on cloud
  3. Install JDK 1.8 or later on the local machine
  4. Eclipse IDE or any Java IDE installed on the local machine

2. Hadoop cluster setup on local machine and on Cloud

i. Single Node cluster setup

As we already discussed, the DataNodes store and process the data. We need at least a single node Hadoop cluster to run the MapReduce program and process the data.

Setting up a single node Hadoop cluster on a local machine is a bit lengthy process and often could lead us to errors. I'm sharing the guides that I've used to setup the cluster on my local for testing below for both and Windows and Linux.

ii. Multi node Cluster setup

Alternatively, we can use a cloud-based Hadoop cluster like DataProc on Google Cloud platform (GCP) which doesn't require any setup other than selecting the configuration of the NameNode and the DataNodes. The GCP account setup can be referred here. We'll see the setup in the following steps.

Before going any further you should consider two important steps while operating in any cloud environment.

a. Setting up the billing alerts to avoid any unexpected bills.

b. Turn off/delete the resources soon after the work is done

a. Sign up to the Google Cloud and login to your account

Untitled.png

b. Search for "DataProc" and select the option with the same name in the results

Untitled 1.png

c. Select the "Create Cluster" option

Untitled 2.png

d. Provide the following details in the create cluster page under "setup a cluster page"

i. Cluster name - **test-cluster**

ii. Cluster region and Zone - **us-central1**, **us-central1-a**

iii. Cluster Type - **Standard (1 master, N workers)**

Untitled 3.png

iv. Autoscaling Policy - **None**

v. Image type and version - **2.0-debian10 (default)**

vi. **Select Enable Component gateway**

Untitled 4.png

Untitled 5.png

e. Under "Configure nodes" select the following for Master node

i. Machine family - **General-Purpose (default)**

ii. Series - **N1 (default)**

iii. Machine type - **n1-standard-2 (2 vCPU, 7.5 GB memory)**

iv. Primary disk size (min 15 GB) - **100GB**

v. Primary disk type - **Standard Persistent Disk**

vi. Number of local SSDs - **0**

Untitled 6.png

f. Select the following for "Worker Nodes"

i. Machine family - **General-Purpose (default)**

ii. Series - **N1 (default)**

iii. Machine type - **n1-standard-2 (2 vCPU, 7.5 GB memory)**

iv. Number of worker nodes - **2**

v. Primary disk size (min 15 GB) - **100GB**

vi. Primary disk type - **Standard Persistent Disk**

vii. Number of local SSDs **- 0**

Untitled 7.png

g. Leave the rest of the config as is and select on "CREATE"

Untitled 8.png

h. Click on the cluster name and select the "VM Instances" tab in the page

Untitled 9.png

i. Click on "SSH" for the master node and you'll be presented with a new browser window connected to the master node of our HDFS cluster. I've used local terminal to connect to the master node for the rest of the post.

Untitled 10.png

Note: In real world scenarios, we would connect to the Hadoop cluster via a gateway node or edge node. We'll not use the NameNode for connecting to the cluster since it'll be very busy in handling the cluster.

3. Writing a MapReduce program on Eclipse

a. Create a new Java project called "wordcountmapreduce" in Eclipse IDE on your local machine. Here, I'm using a Linux (ubuntu) machine to create the project. The rest of the steps should stay same for Windows machine as well.

Untitled 11.png

b. Create a new Class for Map by right clicking on the project and select "Class". Once you select it, enter the name of the Map class as "WordCountMapper" and hit Finish.

Untitled 12.png

c. Once WordCountMapper class is created, use the following link for the mapper, reducer, partitioner implementation for the wordcount example. Refer the GitHub link for the code.

Untitled 13.png

d. To remove the errors in the IDE, we must mention the Hadoop libraries in project build path. The following are the libraries (only jar files) that should be added to the project:

  • <hadoop_dir>/share/hadoop/mapreduce (<hadoop_dir> is the path where you saved the hadoop distribution. Ex: /home/<username>/hadoop-3.3.1)
  • <hadoop_dir>/share/hadoop/hdfs
  • <hadoop_dir>/share/hadoop/client
  • <hadoop_dir>/share/hadoop/common
  • <hadoop_dir>/share/hadoop/yarn

Untitled 14.png

Click on "Add External JARs" and navigate to the paths mentioned in the above list. After all the required JARs, click "Apply and Close"

Untitled 15.png

e. After adding the jars to the project build path, we can see the errors disappeared in the IDE in the below image. Use the code for the reducer (WordCountReducer.java), partitioner (WordCountPartitioner.java) and the driver (WordCount.java) classes from the GitHub link

Untitled 16.png

f. Once the project setup is done, we will have a look at the "WordCount.java" class. This is a driver class which executes the Map, Reduce, Combiner and the Partitioner classes on the cluster. This class includes config like

i. Job Name - setJobName ii. Driver class - setJarByClass iii. Mapper class - setMapperClass iv. Combiner class - setCombinerClass. Same as Reducer class for wordcount example v. Reducer class - setReducerClass vi. Number of Reducers - setNumReduceTasks vii. Output data types from each class - setOutputKeyClass, setOutputValueClass viii. Input and Output paths - addInputPath, setOutputPath respectively

Untitled 17.png

This is basically the end of the project and code setup required for the wordcount problem in MapReduce.

4. Create a JAR file for the MapReduce Program and Uploading to HDFS

Once the project and the Mapreduce code setup is done, there are two ways we could execute the MapReduce Java program:

  1. Run the Java program within the eclipse. You can find the guide for the same here.
  2. Package the Java program as a JAR file with all the dependencies and execute on the Hadoop cluster. We'll follow this method for this guide

Steps to package the wordcount MapReduce Java program as a JAR file:

a. Right click on the project and select "Export" option

Untitled 18.png

b. Under Java, select "JAR" option and click Next.

Untitled 19.png

c. Select the path for saving the JAR file. Click Next until the final step

Untitled 20.png

d. Select the Main class as "WordCount" using Browse window.

Untitled 21.png

e. Select Finish to create the jar file

Untitled 22.png

f. The jar file will be created as shown below. Once the jar file is created, we'll upload it to the GCP Hadoop cluster and run it.

Untitled 23.png

g. Now, we'll upload this to the master node in the HDFS cluster using SCP. You can configure SSH to connect to HDFS cluster instance on GCP using this link. I've used Windows + Windows Terminal and the same steps mentioned below are followed. To copy the jar file(s) to master node on the cluster, we use the following command:

SCP -i "`<Path/to/SSH/key/ssh-key>`" Path/to/jar/file/wordcountmapperonly.jar  username@`<master-ip>`:/path/on/server

Untitled 24.png

h. Once the jar file is available on the master node instance, we can use the following commands to copy the jar file to the HDFS cluster. Please note master node instance and the HDFS cluster are different.

SSH -i "`<Path/to/SSH/key/>`" username@`<master-ip>`
hadoop fs -put -f Path/to/jar/file/wordcountmapperonly.jar `<hdfs_path>`
hadoop fs -ls `<hdfs_path>`

Untitled 25.png

Here, we are copying the jar files wordcountmapperonly.jar, wordcountmapreduce.jar and wordcountmapreducepartitioner.jar and the input data folder HadoopInputFiles for the Hadoop Directory '/'. The input folder contains 3 text files

5. Executing the MapReduce Program on the Hadoop Cluster

As we've seen already, the MapReduce driver class (WordCount.java) will be configured to execute Mapper, Combiner, Reducer and Partitioner. We'll run the MapReduce program with different configurations using the driver class

i. Only Mapper

ii. Mapper and Reducer

ii. Mapper, Reducer and Partitioner

i. Only Mapper

To run Mapper only, we need to comment out the Combiner, Reducer and Partitioner classes configured in the driver class and package the jar file as shown in the above step. The driver class should look like the below picture. The code for the same is here.

Untitled 26.png

The input files are in "/HadoopInputFiles" and has data as in three files as mentioned below. You can find the input files here.

Untitled 27.png

Now, run the jar file "wordcountmapperonly.jar" on the Hadoop cluster with the following command and above input files. The steps to copy the jar file to HDFS location is shown above section.

hadoop jar `<hdfs_path>`/wordcountmapperonly.jar `<input_file_or_dir_path>` `<output_path>`

The following image show how to run the mapreduce jars on Hadoop cluster. The full output log of the run is here.

Untitled 28.png

The output of the mapper only phase contains all the words with count 1 as shown below

Untitled 29.png

Once we run the MapReduce job, we can see the application is tracked under YARN which is a resource manager for the cluster. Every run gets an entry here. The default YARN URL is <cluster-hostname>:8088. For DataProc cluster though, we need to go to cluster details in the GCP console, select "Web Interfaces" tab under cluster details and select "YARN ResourceManager" to get the YARN web interface.

Untitled 30.png

Untitled 31.png

In case where the output path in the hadoop jar command already exists, the MapReduce framework throws "Output directory already exists" error as shown below. This is to avoid the overwriting of any output data.

Untitled 32.png

Note: -D mapred.reduce.tasks is set to 3 by default and we need only map phase to run. We can force the reducer count to zero using this property.

In the output path, we can see four different files

  1. _SUCCESS - Indicates the job status
  2. part-m-00000 to part-m-00002 - output file corresponding each input files. here 'm' in the output filename indicates 'mapper' phase. Since we don't have a reduce phase configured for this run, we'll get an output file for an input file

Untitled 33.png

As we already know, each mapper produces the key-value pairs <word,1> for all the words in the input sentence as output shown below

Untitled 34.png

ii. Mapper and Reducer

Now, Let's run the 'wordcountmapreduce.jar' with the same input files and a different output path. This has both map and reduce phase configured in the driver class. Logs for the run are here and code for the same is here

Untitled 35.png

The output is generated after the reduce phase into a single output file. Since we have only one reducer by default in the cluster

Untitled 36.png

Untitled 37.png

iii. Mapper, Reducer and Partitioner

Now, Let's run the 'wordcountmapreducepartitioner.jar' with the same input files and a different output path. This has map, partition and reduce phases configured in the driver class. Logs for the run are here and code for the same is here

Untitled 38.png

The output for the MapReduce with partitioner is as follows. As per the partitioner logic here, for each letter at the starting of the word, there will be a different output file created. This means we are creating 26 partitions which will create same number of reducers to process the records Example: all the words starting with letter 'a' will end up in 'part-r-00001' file with the count.

Untitled 39.png

Untitled 40.png

Conclusion

We have seen a practical example of wordcount with MapReduce as promised in my last post. This is an exhaustive guide to capture most known ways to create and execute the MapReduce programs in Java.

MapReduce as a compute has lost its edge to new compute framework like Spark. But do you know that we can use the MapReduce to ingest the data into HDFS from an RDBMS source? or write SQL like queries to execute MapReduce job? We will discuss about those in detail in my next blog posts. Stay tuned!

For now though, I'll delete the cloud resource that I've spun up for the tutorial. If you did the same, please delete the resource you have created else you'll end up with something like this

Resources

  1. Big Data course by Sumit M
  2. github.com/maninekkalapudi/dataengineeringb..

If my conent helped you in anyway and like to contribute to my knowledge quest and sharing, you can contribute to me here

Thanks,

Mani

Did you find this article valuable?

Support mani nekkalapudi by becoming a sponsor. Any amount is appreciated!