Commit 058e29a2 authored by Gerhard Gossen's avatar Gerhard Gossen

Allow splitting of WARC files

parent dd8f6b45
Pipeline #104 skipped
...@@ -30,8 +30,7 @@ public class WARCFileInputFormat extends FileInputFormat<String, Snapshot> { ...@@ -30,8 +30,7 @@ public class WARCFileInputFormat extends FileInputFormat<String, Snapshot> {
@Override @Override
protected boolean isSplitable(JobContext context, Path filename) { protected boolean isSplitable(JobContext context, Path filename) {
// As these are compressed files, they cannot be (sanely) split return true;
return false;
} }
public static String getUrlPattern(Configuration conf, String defaultValue) { public static String getUrlPattern(Configuration conf, String defaultValue) {
......
package de.l3s; package de.l3s;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress;
import java.net.InterfaceAddress;
import java.net.NetworkInterface;
import java.text.ParseException; import java.text.ParseException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
...@@ -50,17 +57,20 @@ public class WARCFileRecordReader extends RecordReader<String, Snapshot> { ...@@ -50,17 +57,20 @@ public class WARCFileRecordReader extends RecordReader<String, Snapshot> {
private static final Logger logger = LoggerFactory.getLogger(WARCFileRecordReader.class); private static final Logger logger = LoggerFactory.getLogger(WARCFileRecordReader.class);
private final static int GZIP_ID1 = 0x1f;
private final static int GZIP_ID2 = 0x8b;
private final static int GZIP_CM_DEFLATE = 0x08;
private Iterator<ArchiveRecord> ar; private Iterator<ArchiveRecord> ar;
private FSDataInputStream fsin; private FSDataInputStream fsin;
private ArchiveReader reader; private ArchiveReader reader;
private final HttpResponseParser responseParser = new HttpResponseParser(); private final HttpResponseParser responseParser = new HttpResponseParser();
private Snapshot value; private Snapshot value;
private long position = 0; private long splitLength;
private long fileSize;
private TaskAttemptContext context; private TaskAttemptContext context;
private Pattern urlPattern; private Pattern urlPattern;
private Pattern mimePattern; private Pattern mimePattern;
private WARCFileInputFormat.StatusSelection statusSelection; private WARCFileInputFormat.StatusSelection statusSelection;
private long start;
public WARCFileRecordReader() { public WARCFileRecordReader() {
// default constructor for use in FileInputFormat // default constructor for use in FileInputFormat
...@@ -69,16 +79,22 @@ public class WARCFileRecordReader extends RecordReader<String, Snapshot> { ...@@ -69,16 +79,22 @@ public class WARCFileRecordReader extends RecordReader<String, Snapshot> {
// Constructor for use by CombineFileInputFormat // Constructor for use by CombineFileInputFormat
public WARCFileRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index) throws IOException { public WARCFileRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index) throws IOException {
logger.info("Processing input file {}", split.getPath(index)); logger.info("Processing input file {}", split.getPath(index));
initialize(context, split.getPath(index)); initialize(context, split.getPath(index), split.getOffset(index), split.getLength(index));
context.setStatus(String.format("Processing %s (%d/%d)", split.getPath(index), index, split.getNumPaths())); context.setStatus(String.format("Processing %s (%d/%d)", split.getPath(index), index, split.getNumPaths()));
} }
private void initialize(TaskAttemptContext context, Path path) throws IOException { private void initialize(TaskAttemptContext context, Path path, long start, long length) throws IOException {
this.context = context; this.context = context;
this.start = start;
this.splitLength = length;
Configuration conf = context.getConfiguration(); Configuration conf = context.getConfiguration();
FileSystem fs = path.getFileSystem(conf); FileSystem fs = path.getFileSystem(conf);
fsin = fs.open(path); fsin = fs.open(path);
fileSize = fs.getFileStatus(path).getLen(); BlockLocation[] locations = fs.getFileBlockLocations(fs.getFileStatus(path), start, length);
logger.info("Using block locations {} on host {}", locations, getExternalAddress().getHostName());
if (start > 0) {
seekToGZIPHeader(fsin);
}
reader = ArchiveReaderFactory.get(path.getName(), fsin, true); reader = ArchiveReaderFactory.get(path.getName(), fsin, true);
ar = reader.iterator(); ar = reader.iterator();
...@@ -90,10 +106,79 @@ public class WARCFileRecordReader extends RecordReader<String, Snapshot> { ...@@ -90,10 +106,79 @@ public class WARCFileRecordReader extends RecordReader<String, Snapshot> {
logger.info("Using URL pattern='{}', MIME pattern='{}', status selection={}", urlRegex, mimeRegex, statusSelection); logger.info("Using URL pattern='{}', MIME pattern='{}', status selection={}", urlRegex, mimeRegex, statusSelection);
} }
private enum State {
START, ID1, ID2
}
private boolean seekToGZIPHeader(FSDataInputStream is) throws IOException {
State state = State.START;
int nextByte;
while ((nextByte = is.read()) != -1) {
switch (state) {
case START:
if (nextByte == GZIP_ID1) {
state = State.ID1;
}
break;
case ID1:
if (nextByte == GZIP_ID2) {
state = State.ID2;
} else {
state = State.START;
}
break;
case ID2:
if (nextByte == GZIP_CM_DEFLATE) {
is.seek(is.getPos() - 3);
return true;
} else {
state = State.START;
}
break;
default:
throw new IllegalStateException();
}
}
return false;
}
private static InetAddress getExternalAddress() throws IOException {
List<InetAddress> globalAddresses = new ArrayList<>();
List<InetAddress> siteLocalAddresses = new ArrayList<>();
Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
while (interfaces.hasMoreElements()) {
NetworkInterface iface = interfaces.nextElement();
for (InterfaceAddress address : iface.getInterfaceAddresses()) {
InetAddress inetAddress = address.getAddress();
if (inetAddress.isLinkLocalAddress() || inetAddress.isLoopbackAddress()) {
continue;
}
if (inetAddress.isSiteLocalAddress()) {
siteLocalAddresses.add(inetAddress);
} else {
globalAddresses.add(inetAddress);
}
}
}
if (!globalAddresses.isEmpty()) {
return globalAddresses.get(0);
} else if (!siteLocalAddresses.isEmpty()) {
return siteLocalAddresses.get(0);
} else {
logger.info("Could not find external InetAddress, returning localhost");
return InetAddress.getLocalHost();
}
}
@Override @Override
public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException { public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException {
if (inputSplit instanceof FileSplit) { if (inputSplit instanceof FileSplit) {
initialize(context, ((FileSplit) inputSplit).getPath()); FileSplit split = (FileSplit) inputSplit;
initialize(context, split.getPath(), split.getStart(), split.getLength());
} else {
throw new IllegalArgumentException("Not a FileSplit: " + inputSplit);
} }
} }
...@@ -115,12 +200,12 @@ public class WARCFileRecordReader extends RecordReader<String, Snapshot> { ...@@ -115,12 +200,12 @@ public class WARCFileRecordReader extends RecordReader<String, Snapshot> {
@Override @Override
public float getProgress() throws IOException, InterruptedException { public float getProgress() throws IOException, InterruptedException {
return (position * 1.0f) / fileSize; return ((fsin.getPos() - start) * 1.0f) / splitLength;
} }
@Override @Override
public boolean nextKeyValue() throws IOException, InterruptedException { public boolean nextKeyValue() throws IOException, InterruptedException {
while (ar.hasNext()) { while (fsin.getPos() < start + splitLength && ar.hasNext()) {
try { try {
ArchiveRecord record = ar.next(); ArchiveRecord record = ar.next();
ArchiveRecordHeader header = record.getHeader(); ArchiveRecordHeader header = record.getHeader();
...@@ -169,7 +254,6 @@ public class WARCFileRecordReader extends RecordReader<String, Snapshot> { ...@@ -169,7 +254,6 @@ public class WARCFileRecordReader extends RecordReader<String, Snapshot> {
timestamp = -1; timestamp = -1;
} }
this.value = new Snapshot(targetURI, contentType, timestamp, response); this.value = new Snapshot(targetURI, contentType, timestamp, response);
this.position = fsin.getPos();
return true; return true;
} catch (RuntimeException e) { } catch (RuntimeException e) {
logger.warn("Could not read archive record, skipping", e); logger.warn("Could not read archive record, skipping", e);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment