Commit 314afebc authored by Xiaofei Zhu

add anchorcount.jar

/**
* Usage:
* export YARN_OPTS=-Xmx30G
* export HADOOP_CLIENT_OPTS="-Xmx10g"
*
* Input format: src_URL \t timestamp \t tar_URL \t placeholder \t anchor_text
* line[0] - src_URL
* line[2] - tar_URL
* line[4] - anchor_text
*
* Output format:
* tar_URL \t anchor_text1 \t freq1 \t anchor_text2 \t freq2 \t ...
* Anchor texts are sorted by descending frequency, and each frequency
* counts distinct src_URLs: repeated (src, tar, anchor) triples are
* deduplicated before counting.
*
* Test data: /home/zhu/AnchorCount/test.dat
* - Input:
* s1 \t 0 \t t1 \t 0 \t a1
* s1 \t 0 \t t1 \t 0 \t a1
* s1 \t 0 \t t1 \t 0 \t a2
* s1 \t 0 \t t1 \t 0 \t a3
* s2 \t 0 \t t1 \t 0 \t a1
* s3 \t 0 \t t1 \t 0 \t a1
* s3 \t 0 \t t1 \t 0 \t a2
* s1 \t 0 \t t2 \t 0 \t a1
* s1 \t 0 \t t2 \t 0 \t a1
* s1 \t 0 \t t2 \t 0 \t a2
* s2 \t 0 \t t2 \t 0 \t a3
*
* - Output:
* t1 a1 3 a2 2 a3 1
* t2 a3 1 a2 1 a1 1
*/
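// Example invocation (a sketch; the jar name comes from the commit message,
// the input/output paths are hypothetical):
//   export HADOOP_CLIENT_OPTS="-Xmx10g"
//   hadoop jar anchorcount.jar de.l3s.webarchive.AnchorCount \
//       /user/zhu/anchors /user/zhu/anchorcount-out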
package de.l3s.webarchive;
import java.io.IOException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
class ValueComparator implements Comparator<String> {
    Map<String, Integer> map;

    public ValueComparator(Map<String, Integer> base) {
        this.map = base;
    }

    // Orders keys by descending value. Deliberately never returns 0:
    // a TreeMap merges keys that compare equal, which would drop anchors
    // that share a frequency. The price is that the TreeMap's own lookups
    // are unreliable, so frequencies must be read from the backing map
    // (see AnchorCountReducer).
    @Override
    public int compare(String a, String b) {
        if (map.get(a) >= map.get(b)) {
            return -1;
        } else {
            return 1;
        }
    }
}
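// Illustration of the resulting order (values from the test data in the
// header): a TreeMap built over {"a1"=3, "a2"=2, "a3"=1} with this
// comparator iterates a1, a2, a3, i.e. anchors in descending frequency.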
// Mapper<input key type, input value type, output key type, output value type>.
// The LongWritable key (the line's byte offset) is unused; only the value,
// one tab-separated line of the input file, is parsed.
class AnchorCountMapper extends Mapper<LongWritable, Text, Text, Text> {
    private static final Text One = new Text("1");

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] line = StringUtils.splitPreserveAllTokens(value.toString(), "\t");
        if (line.length != 5) {
            return; // skip malformed lines
        }
        String src_URL = line[0];
        String tar_URL = line[2];
        String anchor_text = line[4];
        // Composite key: identical (tar, src, anchor) triples collide on the
        // same key, which lets the combiner deduplicate them.
        String skey = tar_URL + "\t" + src_URL + "\t" + anchor_text;
        context.write(new Text(skey), One);
    }
}
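// Example (test data from the header): the line "s1\t0\tt1\t0\ta1" appears
// twice in the input, and both copies map to the same record:
// key "t1\ts1\ta1", value "1".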
// Combiner used for deduplication rather than aggregation: each unique
// (tar, src, anchor) key collapses to one (tar_URL, anchor_text) record,
// so the reducer sees at most one record per distinct (src, anchor) pair.
// Caveat: Hadoop may run a combiner zero or more times, so duplicates
// split across spills can still reach the reducer.
class AnchorCountCombiner extends Reducer<Text, Text, Text, Text> {
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String[] line = StringUtils.splitPreserveAllTokens(key.toString(), "\t");
        String tar_URL = line[0];
        String anchor_text = line[2];
        context.write(new Text(tar_URL), new Text(anchor_text));
    }
}
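// Example: the two identical mapper records for key "t1\ts1\ta1" are
// combined into a single ("t1", "a1") record, which is why each source
// contributes at most once to an anchor's frequency.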
class AnchorCountReducer extends Reducer<Text, Text, Text, Text> {

    // Returns a TreeMap whose iteration order is descending by frequency.
    // Because ValueComparator never returns 0, the TreeMap's get() is
    // unreliable; frequencies must be read from the original map.
    public static TreeMap<String, Integer> SortByValue(HashMap<String, Integer> map) {
        ValueComparator vc = new ValueComparator(map);
        TreeMap<String, Integer> sortedMap = new TreeMap<String, Integer>(vc);
        sortedMap.putAll(map);
        return sortedMap;
    }

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Count how many records (one per distinct src_URL, thanks to the
        // combiner) carry each anchor text for this tar_URL.
        HashMap<String, Integer> manchor = new HashMap<String, Integer>();
        for (Text val : values) {
            // Decode &nbsp; before trimming so whitespace-only anchors
            // are skipped as well.
            String anchor_text = val.toString().replace("&nbsp;", " ");
            anchor_text = StringUtils.trim(anchor_text);
            if (anchor_text.length() == 0) continue;
            if (!manchor.containsKey(anchor_text)) {
                manchor.put(anchor_text, 1);
            } else {
                manchor.put(anchor_text, manchor.get(anchor_text) + 1);
            }
        }
        StringBuilder strbd = new StringBuilder();
        TreeMap<String, Integer> sortedMap = SortByValue(manchor);
        for (String anchor_text : sortedMap.keySet()) {
            Integer freq = manchor.get(anchor_text); // NOTE: look up in manchor, NOT sortedMap
            strbd.append("\t" + anchor_text + "\t" + freq.toString());
        }
        context.write(key, new Text(strbd.toString()));
    }
}
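// Example final line for t1 (test data from the header):
// "t1\ta1\t3\ta2\t2\ta3\t1"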
public class AnchorCount extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new AnchorCount(), args);
        System.exit(exitCode);
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        // 10 GB for the MapReduce ApplicationMaster.
        conf.setInt("yarn.app.mapreduce.am.resource.mb", 10_000);
        FileSystem fs = FileSystem.get(conf);
        // Remove any previous output directory so the job can be rerun.
        fs.delete(new Path(args[1]), true);
        Job job = Job.getInstance(conf, "Anchor Count");
        job.setJarByClass(AnchorCount.class);
        job.setMapperClass(AnchorCountMapper.class);
        job.setCombinerClass(AnchorCountCombiner.class);
        job.setReducerClass(AnchorCountReducer.class);
        // A single reducer is required: records are partitioned on the
        // mapper's composite key, so with multiple reducers one tar_URL's
        // records could be split across partitions.
        job.setNumReduceTasks(1);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }
}