import java.io.*;
import java.util.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.mapreduce.lib.input.*;   // for FileInputFormat and FileSplit
import org.apache.hadoop.mapreduce.lib.output.*;

public class mapcounter extends Mapper {

    int tid;      // task id within the job - used as part of the output key
    int filenum;  // number of the file this mapper is reading from

    // Override the setup function to read additional configuration:
    protected void setup(Context context) throws IOException, InterruptedException {
        Configuration config = context.getConfiguration();
        String taskid = config.get("mapred.task.partition");   // get the task id
        tid = Integer.parseInt(taskid);

        // Another possibility is to get the current file name, but
        // config.get("map.input.file") is not working (Hadoop 0.20.2 bug).
        // Instead, one has to call:
        String filename = ((FileSplit) context.getInputSplit()).getPath().getName();
        filenum = Integer.parseInt(filename.substring(3, 5));   // two-digit file number embedded in the name

        super.setup(context);   // default setup
    } // setup

    public static long countcats(String line) {
        long count = 0;   // local counter
        int i = 0;        // position counter
        while (i