Commit a9856157 authored by Gerhard Gossen

Initial import from archive-crawler

parents
Pipeline #46 skipped
/target/
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Maven build descriptor for the crawler-queue artifact. -->
<modelVersion>4.0.0</modelVersion>
<groupId>de.l3s.gossen</groupId>
<artifactId>crawler-queue</artifactId>
<version>0.0.1-SNAPSHOT</version>
<properties>
<!-- Compile for Java 8 (the sources use java.util.Optional). -->
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<!-- NOTE(review): "19.0-rc2" looks like a release-candidate version; confirm whether a final release should be used. -->
<guava.version>19.0-rc2</guava.version>
</properties>
<dependencies>
<!-- Dropwizard Metrics. The sources import com.codahale.metrics.*; presumably resolved transitively through this artifact (verify). -->
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-jvm</artifactId>
<version>3.1.0</version>
</dependency>
<!-- Logging facade only; no binding is declared here. -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.13</version>
</dependency>
<!-- Guava; version is controlled by the guava.version property above. -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<!-- Test-scoped dependencies. -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
<version>1.3</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package de.l3s.gossen.crawler;
import java.util.Objects;
import com.google.common.base.Preconditions;
/**
 * Immutable description of a URL scheduled for crawling: the URL itself, the
 * path of hops that led to it and its priority in {@code [0, 1]}.
 *
 * <p>Identity ({@link #equals(Object)} / {@link #hashCode()}) is defined by
 * the URL alone; path and priority are ignored.
 */
public class CrawlUrl {

    /** A single hop type; each type is encoded as one character of the path string. */
    public enum Path {
        SEED('S'),
        LINK('L');

        private final char name;

        private Path(char name) {
            this.name = name;
        }

        /** @return the character representing this hop type in a path string */
        public char getName() {
            return name;
        }
    }

    private final String url;
    private final String path;
    private final float priority;

    /**
     * @param url
     *            the URL, must not be {@code null}
     * @param path
     *            non-empty sequence of {@link Path} characters
     * @param priority
     *            value in {@code [0, 1]}
     * @throws NullPointerException
     *             if {@code url} is {@code null}
     * @throws IllegalArgumentException
     *             if {@code path} is {@code null} or empty, or if
     *             {@code priority} is outside {@code [0, 1]} (including NaN)
     */
    public CrawlUrl(String url, String path, float priority) {
        this.url = Preconditions.checkNotNull(url);

        boolean pathMissing = path == null || path.isEmpty();
        Preconditions.checkArgument(!pathMissing, "Invalid path: '%s'", path);
        this.path = path;

        // Written as two positive comparisons so that NaN is rejected.
        boolean priorityInRange = priority >= 0 && priority <= 1;
        Preconditions.checkArgument(priorityInRange, "Invalid priority, %s not in [0,1]", priority);
        this.priority = priority;
    }

    /** Creates an entry whose path consists of the single {@link Path#SEED} hop. */
    public static CrawlUrl fromSeed(String url, float priority) {
        String seedPath = String.valueOf(Path.SEED.getName());
        return new CrawlUrl(url, seedPath, priority);
    }

    /** Creates an entry for a link found on this URL: this path extended by one {@link Path#LINK} hop. */
    public CrawlUrl outlink(String url, float priority) {
        String extendedPath = path + Path.LINK.getName();
        return new CrawlUrl(url, extendedPath, priority);
    }

    public String getUrl() {
        return url;
    }

    public String getPath() {
        return path;
    }

    public float getPriority() {
        return priority;
    }

    @Override
    public String toString() {
        return url + " (" + path + ")";
    }

    @Override
    public int hashCode() {
        // Same value as Objects.hash(url).
        return 31 + url.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
        return obj == this
                || (obj instanceof CrawlUrl && url.equals(((CrawlUrl) obj).url));
    }

    /**
     * Merge with other instance.
     *
     * @param mergee
     *            a different instance (non-null)
     * @return {@code mergee} if it has the same URL and a strictly shorter
     *         path, {@code this} in every other case
     */
    public CrawlUrl merge(CrawlUrl mergee) {
        boolean sameUrl = url.equals(mergee.url);
        if (sameUrl && mergee.path.length() < path.length()) {
            return mergee;
        }
        return this;
    }
}
package de.l3s.gossen.crawler.frontier;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.Optional;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.RatioGauge;
import com.google.common.hash.BloomFilter;
import static com.codahale.metrics.MetricRegistry.name;
import static com.google.common.hash.Funnels.stringFunnel;
import static java.nio.charset.StandardCharsets.US_ASCII;
import de.l3s.gossen.crawler.CrawlUrl;
/**
 * Common base for {@link Frontier} implementations. It drops URLs that were
 * (probably) pushed before, maintains throughput metrics and serializes all
 * queue access on {@link #lock}.
 *
 * <p>Subclasses supply the actual storage through
 * {@link #pushInternal(CrawlUrl)} and {@link #popInternal()}; both are only
 * called while {@link #lock} is held.
 *
 * <p>All metrics are registered under names prefixed with the runtime class
 * ({@code getClass()}) and are removed again by {@link #close()}.
 */
public abstract class BaseFrontier implements Frontier, Closeable {
    /**
     * Approximate set of the URLs accepted so far, sized for 1,000,000
     * insertions. URLs are hashed as UTF-8: an ASCII encoder would replace
     * every non-ASCII character by {@code '?'}, so distinct internationalized
     * URLs would collide and be rejected as already seen.
     */
    protected final BloomFilter<CharSequence> seenUrls =
            BloomFilter.create(stringFunnel(java.nio.charset.StandardCharsets.UTF_8), 1_000_000);
    private final MetricRegistry metrics;
    /** Marks every URL that passed the seen-filter and was stored. */
    protected final Meter incoming;
    /** Marks every URL handed out by {@link #pop()}. */
    protected final Meter outgoing;
    /** Number of URLs currently stored (pushed minus popped). */
    private final Counter size;
    /** Marks every {@link #pop()} call that found nothing to return. */
    protected Meter emptyQueue;
    /** Number of URLs offered to {@link #push(Collection)}, seen or not; guarded by {@link #lock}. */
    protected long totalIncoming = 0;
    /** Monitor guarding the queue, {@link #seenUrls} and {@link #totalIncoming}. */
    protected final Object lock = new Object();

    /**
     * @param metrics
     *            registry that receives this frontier's metrics
     * @throws IllegalArgumentException
     *             propagated from {@link MetricRegistry#register} if a gauge
     *             with the same name is already registered
     */
    public BaseFrontier(MetricRegistry metrics) {
        this.metrics = metrics;
        incoming = metrics.meter(name(getClass(), "incomingUrls"));
        outgoing = metrics.meter(name(getClass(), "outgoingUrls"));
        emptyQueue = metrics.meter(name(getClass(), "emptyQueue"));
        size = metrics.counter(name(getClass(), "size"));
        metrics.register(name(getClass(), "seenUrlsFpp"), new Gauge<Double>() {
            @Override
            public Double getValue() {
                return seenUrls.expectedFpp();
            }
        });
        metrics.register(name(getClass(), "unseenRate"), new RatioGauge() {
            @Override
            protected Ratio getRatio() {
                // totalIncoming is only written under the lock, so read it
                // under the lock as well to get a consistent, visible value.
                synchronized (lock) {
                    return Ratio.of(incoming.getCount(), totalIncoming);
                }
            }
        });
    }

    /**
     * Stores every URL of {@code urls} that the seen-filter does not
     * recognize. URLs that the Bloom filter reports as possibly seen
     * (including false positives) are silently skipped.
     */
    @Override
    public void push(Collection<CrawlUrl> urls) {
        synchronized (lock) {
            for (CrawlUrl url : urls) {
                totalIncoming++;
                if (seenUrls.mightContain(url.getUrl())) {
                    continue;
                }
                // Store first and account afterwards: if the subclass fails,
                // neither the metrics nor the seen-filter claim a URL that
                // never reached the queue.
                pushInternal(url);
                seenUrls.put(url.getUrl());
                incoming.mark();
                size.inc();
            }
        }
    }

    /**
     * Removes and returns the next URL as chosen by {@link #popInternal()}.
     *
     * @return the next URL, or an empty {@link Optional} if the subclass had
     *         nothing to return
     */
    @Override
    public Optional<CrawlUrl> pop() {
        synchronized (lock) {
            Optional<CrawlUrl> next = popInternal();
            if (next.isPresent()) {
                outgoing.mark();
                size.dec();
            } else {
                emptyQueue.mark();
            }
            return next;
        }
    }

    /**
     * Removes the next URL from the underlying storage; called with
     * {@link #lock} held.
     *
     * @return the removed URL, or an empty {@link Optional} if none is available
     */
    protected abstract Optional<CrawlUrl> popInternal();

    /**
     * Adds a URL that passed the seen-filter to the underlying storage;
     * called with {@link #lock} held.
     */
    protected abstract void pushInternal(CrawlUrl url);

    /** Removes the metrics registered for this frontier's runtime class. */
    @Override
    public void close() throws IOException {
        metrics.removeMatching(new ClassMetricFilter(getClass()));
    }
}
package de.l3s.gossen.crawler.frontier;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricFilter;
/**
 * Selects the metrics that belong to one class, i.e. whose name is the
 * class name itself or the class name followed by {@code '.'} and further
 * name parts (the layout produced by {@code MetricRegistry.name(Class, ...)}).
 */
final class ClassMetricFilter implements MetricFilter {
    private final String className;
    private final String classPrefix;

    /**
     * @param clazz
     *            class whose metrics should match
     */
    ClassMetricFilter(Class<?> clazz) {
        this.className = clazz.getName();
        // Include the separator: a bare startsWith(className) would also
        // match metrics of classes whose name merely extends this one
        // (e.g. "FooFrontier" vs. "FooFrontierV2" or nested "Foo$Bar").
        this.classPrefix = className + ".";
    }

    @Override
    public boolean matches(String name, Metric metric) {
        return name.equals(className) || name.startsWith(classPrefix);
    }
}
\ No newline at end of file
package de.l3s.gossen.crawler.frontier;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.RandomAccessFile;
import java.util.Locale;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.codahale.metrics.MetricRegistry;
import com.google.common.base.Throwables;
import de.l3s.gossen.crawler.CrawlUrl;
/**
 * {@link Frontier} that keeps its URLs in {@code numQueues} files inside one
 * directory, one file per priority interval.
 *
 * <p>Queue {@code i} receives the URLs whose priority falls into
 * {@code [i * intervalSize, (i + 1) * intervalSize)}, with priority 1.0 going
 * to the last queue. Each record is written as URL ({@code writeUTF}), path
 * ({@code writeUTF}) and priority ({@code writeFloat}). Files are only
 * appended to; a read and a write offset per queue mark the unread part.
 *
 * <p>{@link #pop()} picks among the non-empty queues with a
 * {@link WeightedRandomSelector} of base 2, so queues with a higher index are
 * chosen more often.
 *
 * <p>With {@code persist == true} the offsets are written to a file named
 * {@code positions} on {@link #close()} and read back by the constructor;
 * otherwise the queue directory is deleted on close. The seen-URL filter of
 * {@link BaseFrontier} is not part of the persisted state.
 */
public class FileBasedFrontier extends BaseFrontier implements Frontier {
    private static final Logger logger = LoggerFactory.getLogger(FileBasedFrontier.class);

    final RandomAccessFile[] files;
    /** Offset of the next unread record, per queue. */
    private final long[] readIndices;
    /** Offset at which the next record is appended, per queue. */
    private final long[] writeIndices;
    private final File queueDirectory;
    /** Width of the priority interval covered by a single queue. */
    private final double intervalSize;
    private final WeightedRandomSelector selector;
    private final boolean persist;

    /**
     * Opens (creating if necessary) the queue files in {@code queueDirectory}.
     *
     * @param queueDirectory
     *            directory holding the queue files; created if missing
     * @param metrics
     *            registry for the frontier metrics
     * @param numQueues
     *            number of priority intervals / queue files
     * @param persist
     *            if {@code true}, resume from a {@code positions} file when
     *            one exists and write one on close; if {@code false}, start
     *            empty and delete the directory on close
     * @throws IOException
     *             if a queue file cannot be opened or the positions file
     *             cannot be read
     */
    public FileBasedFrontier(File queueDirectory, MetricRegistry metrics, int numQueues, boolean persist)
            throws IOException {
        super(metrics);
        this.queueDirectory = queueDirectory;
        this.persist = persist;
        intervalSize = 1.0 / numQueues;
        files = new RandomAccessFile[numQueues];
        // Allocate unconditionally: the resume branch below fills these arrays.
        readIndices = new long[numQueues];
        writeIndices = new long[numQueues];
        selector = new WeightedRandomSelector(numQueues, 2);

        int width = Math.max((int) Math.log10(numQueues), 1);
        queueDirectory.mkdirs();
        for (int i = 0; i < numQueues; i++) {
            files[i] = new RandomAccessFile(queueFile(queueDirectory, width, i + 1), "rw");
        }

        File positionsFile = positionsFile(queueDirectory);
        if (persist && positionsFile.exists()) {
            logger.info("Continuing queue from positions in {}", positionsFile.getAbsoluteFile());
            try (ObjectInputStream is = new ObjectInputStream(new FileInputStream(positionsFile))) {
                for (int i = 0; i < numQueues; i++) {
                    readIndices[i] = is.readLong();
                    writeIndices[i] = is.readLong();
                    // The selector starts with every queue disabled; without
                    // this, restored entries could not be popped until a new
                    // URL happened to be pushed into the same queue.
                    if (!isQueueEmpty(i)) {
                        selector.enable(i);
                    }
                }
            }
        }
    }

    /**
     * Names the file of a queue after the upper bound of its priority
     * interval ({@code priority * intervalSize}, formatted with
     * {@code "%<width>f"} in the root locale).
     */
    private File queueFile(File queueDirectory, int width, int priority) {
        return new File(queueDirectory, String.format(Locale.ROOT, "%" + width + "f", priority * intervalSize));
    }

    private File positionsFile(File queueDirectory) {
        return new File(queueDirectory, "positions");
    }

    /**
     * Appends {@code url} to the queue file matching its priority and marks
     * that queue as selectable.
     *
     * @throws RuntimeException
     *             wrapping the {@link IOException} if writing fails
     */
    @Override
    protected void pushInternal(CrawlUrl url) {
        try {
            // Priority 1.0 (or a rounding artifact just above the last
            // boundary) would index one past the end; put it in the last queue.
            int queueIndex = Math.min((int) (url.getPriority() / intervalSize), files.length - 1);
            RandomAccessFile file = files[queueIndex];
            file.seek(writeIndices[queueIndex]);
            file.writeUTF(url.getUrl());
            file.writeUTF(url.getPath());
            file.writeFloat(url.getPriority());
            writeIndices[queueIndex] = file.getFilePointer();
            selector.enable(queueIndex);
        } catch (IOException e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Reads the next record from a randomly selected non-empty queue.
     *
     * @return the stored URL, or an empty {@link Optional} if all queues are
     *         empty
     * @throws RuntimeException
     *             wrapping the {@link IOException} if reading fails
     */
    @Override
    protected Optional<CrawlUrl> popInternal() {
        if (allQueuesEmpty()) {
            return Optional.empty();
        }
        try {
            int queueIndex = selector.next();
            if (queueIndex < 0) {
                return Optional.empty();
            }
            RandomAccessFile file = files[queueIndex];
            file.seek(readIndices[queueIndex]);
            String url = file.readUTF();
            String path = file.readUTF();
            float priority = file.readFloat();
            readIndices[queueIndex] = file.getFilePointer();
            if (isQueueEmpty(queueIndex)) {
                selector.disable(queueIndex);
            }
            return Optional.of(new CrawlUrl(url, path, priority));
        } catch (IOException e) {
            throw Throwables.propagate(e);
        }
    }

    private boolean allQueuesEmpty() {
        for (int i = 0; i < files.length; i++) {
            if (!isQueueEmpty(i)) {
                return false;
            }
        }
        return true;
    }

    /** A queue is empty once everything written to it has been read. */
    private boolean isQueueEmpty(int queueIndex) {
        return readIndices[queueIndex] >= writeIndices[queueIndex];
    }

    /**
     * Unregisters the metrics, closes all queue files and then either writes
     * the current offsets ({@code persist}) or deletes the queue directory.
     */
    @Override
    public void close() throws IOException {
        super.close();
        synchronized (lock) {
            for (RandomAccessFile file : files) {
                file.close();
            }
            if (persist) {
                File positionsFile = positionsFile(queueDirectory);
                try (ObjectOutputStream os = new ObjectOutputStream(new FileOutputStream(positionsFile))) {
                    for (int i = 0; i < files.length; i++) {
                        os.writeLong(readIndices[i]);
                        os.writeLong(writeIndices[i]);
                    }
                }
                logger.info("Wrote current positions to {}", positionsFile.getAbsoluteFile());
            } else {
                // listFiles() returns null if the directory vanished or
                // cannot be read; treat that like an empty directory.
                File[] leftovers = queueDirectory.listFiles();
                if (leftovers != null) {
                    for (File file : leftovers) {
                        if (!file.delete()) {
                            logger.warn("Could not delete {}", file.getAbsoluteFile());
                        }
                    }
                }
                if (queueDirectory.delete()) {
                    logger.info("Removed queue in {}", queueDirectory.getAbsoluteFile());
                } else {
                    logger.warn("Could not remove queue directory {}", queueDirectory.getAbsoluteFile());
                }
            }
        }
    }
}
package de.l3s.gossen.crawler.frontier;
import java.io.Closeable;
import java.util.Collection;
import java.util.Optional;
import de.l3s.gossen.crawler.CrawlUrl;
/**
 * A queue of URLs that are waiting to be crawled. Closing a frontier releases
 * whatever resources the implementation holds.
 */
public interface Frontier extends Closeable {
/**
 * Offers URLs to the frontier. Implementations may decline individual URLs;
 * the implementations built on BaseFrontier skip URLs they have probably seen
 * before.
 *
 * @param urls
 *            the URLs to add
 */
void push(Collection<CrawlUrl> urls);
/**
 * Removes and returns one URL from the frontier.
 *
 * @return the removed URL, or an empty Optional if none is available
 */
Optional<CrawlUrl> pop();
}
package de.l3s.gossen.crawler.frontier;
import java.util.LinkedList;
import java.util.Optional;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import static com.codahale.metrics.MetricRegistry.name;
import de.l3s.gossen.crawler.CrawlUrl;
/**
 * {@link Frontier} that keeps all queued URLs in a first-in, first-out list in
 * memory.
 */
public class InMemoryFrontier extends BaseFrontier implements Frontier {
    private final LinkedList<CrawlUrl> queue = new LinkedList<>();

    /**
     * @param metrics
     *            registry for the frontier metrics
     */
    public InMemoryFrontier(MetricRegistry metrics) {
        super(metrics);
        // BaseFrontier has already registered a counter named
        // "<class>.size". MetricRegistry.register rejects a second metric
        // under an existing name with an IllegalArgumentException, so the
        // gauge must use a name of its own.
        metrics.register(name(getClass(), "queueLength"), new Gauge<Integer>() {
            @Override
            public Integer getValue() {
                return queue.size();
            }
        });
    }

    /** Appends {@code url} to the end of the list. */
    @Override
    protected void pushInternal(CrawlUrl url) {
        queue.offer(url);
    }

    /** Removes the head of the list, if there is one. */
    @Override
    protected Optional<CrawlUrl> popInternal() {
        return Optional.ofNullable(queue.poll());
    }
}
package de.l3s.gossen.crawler.frontier;
import java.util.Arrays;
import java.util.BitSet;
import java.util.concurrent.ThreadLocalRandom;
import com.google.common.annotations.VisibleForTesting;
/**
 * Pick a number in [0, n) such that higher numbers are picked more frequently.
 *
 * The value i is picked with a probability proportional to x^i, where x is the
 * specified base. Only values that have been {@link #enable(int) enabled} take
 * part in the selection; initially none is enabled.
 *
 * This class performs no synchronization of its own.
 */
public class WeightedRandomSelector {
    /**
     * Normalized weights, stored in reverse order: entry {@code k} belongs to
     * the public value {@code n - 1 - k}, so entry 0 carries the largest weight.
     */
    private final double[] distribution;
    /** Enabled entries, indexed like {@link #distribution} (i.e. flipped). */
    private final BitSet enabled;
    /** Cumulative weights of the enabled entries; {@code null} when stale. */
    private double[] activeValues;

    /**
     * @param choices
     *            number of values n
     * @param base
     *            weight ratio x between consecutive values
     */
    public WeightedRandomSelector(int choices, int base) {
        distribution = createWeightedDistribution(choices, base);
        enabled = new BitSet(choices);
    }

    /**
     * Create the probability mass function of an exponentially decreasing
     * distribution: entry {@code i} is proportional to
     * {@code base^(choices - i)} and all entries sum to 1.
     *
     * @param choices
     *            number of values
     * @param base
     *            exponent base
     *
     * @return an array of length {@code choices}
     */
    private static double[] createWeightedDistribution(int choices, int base) {
        double[] weights = new double[choices];
        // Sum the weights directly: no closed form to get wrong and no
        // special case for base == 1.
        double total = 0;
        for (int i = 0; i < choices; i++) {
            weights[i] = Math.pow(base, choices - i);
            total += weights[i];
        }
        for (int i = 0; i < choices; i++) {
            weights[i] /= total;
        }
        return weights;
    }

    /**
     * Draws one of the enabled values at random according to their weights.
     *
     * @return the selected value, or -1 if no value is enabled
     */
    public int next() {
        if (activeValues == null) {
            activeValues = fillActiveValues();
        }
        if (activeValues.length == 0) {
            return -1;
        }
        // The last cumulative value is the total weight of all enabled entries.
        double random = ThreadLocalRandom.current().nextDouble(activeValues[activeValues.length - 1]);
        int selected = findPosition(activeValues, random);
        // Translate from the flipped internal index back to the public value.
        return distribution.length - originalIndex(selected) - 1;
    }

    /**
     * Locates {@code needle} in the sorted array {@code haystack}.
     *
     * @return the index of {@code needle} if it is contained, otherwise the
     *         index at which it would have to be inserted
     */
    @VisibleForTesting
    static int findPosition(double[] haystack, double needle) {
        int pos = Arrays.binarySearch(haystack, needle);
        return pos >= 0 ? pos : -(pos + 1);
    }

    /**
     * Maps a position among the enabled entries to its index in
     * {@link #distribution}.
     *
     * @return the index of the {@code idx}-th set bit, or -1 if fewer bits are set
     */
    private int originalIndex(int idx) {
        for (int i = enabled.nextSetBit(0), pos = 0; i >= 0; i = enabled.nextSetBit(i + 1)) {
            if (pos++ == idx) {
                return i;
            }
        }
        return -1;
    }

    /** Builds the running sums of the weights of all enabled entries, in index order. */
    private double[] fillActiveValues() {
        int active = enabled.cardinality();
        double[] values = new double[active];
        int valIdx = 0;
        for (int i = enabled.nextSetBit(0); i >= 0; i = enabled.nextSetBit(i + 1)) {
            values[valIdx] = distribution[i];
            if (valIdx >= 1) {
                values[valIdx] += values[valIdx - 1];
            }
            valIdx++;
        }
        return values;
    }

    /**
     * Lets value {@code idx} take part in the selection.
     *
     * @throws IndexOutOfBoundsException
     *             if {@code idx} is not in [0, n)
     */
    public void enable(int idx) {
        set(idx, true);
    }

    /**
     * Excludes value {@code idx} from the selection.
     *
     * @throws IndexOutOfBoundsException
     *             if {@code idx} is not in [0, n)
     */
    public void disable(int idx) {
        set(idx, false);
    }

    private void set(int idx, boolean value) {
        // Reject bad indices here; a negative idx would otherwise set a bit
        // beyond the distribution and only fail later inside next().
        if (idx < 0 || idx >= distribution.length) {
            throw new IndexOutOfBoundsException("Index " + idx + " not in [0, " + distribution.length + ")");
        }
        int flippedIndex = distribution.length - idx - 1;
        if (enabled.get(flippedIndex) != value) {
            enabled.flip(flippedIndex);
            // Cached cumulative weights no longer match the enabled set.
            activeValues = null;
        }
    }
}
/**
* Crawler queue / frontier management
*/
package de.l3s.gossen.crawler.frontier;
\ No newline at end of file
package de.l3s.gossen.crawler;
import java.util.Optional;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeMatcher;
/** Hamcrest helpers shared by the tests; not instantiable. */
public class TestUtils {

    private TestUtils() {
        throw new UnsupportedOperationException();
    }

    /**
     * @return a matcher that accepts an {@link Optional} holding a value and
     *         describes itself as "present value"
     */
    public static Matcher<Optional<?>> present() {
        return new PresentMatcher();
    }

    /** Matches any {@link Optional} for which {@code isPresent()} is true. */
    private static final class PresentMatcher extends TypeSafeMatcher<Optional<?>> {
        @Override
        protected boolean matchesSafely(Optional<?> candidate) {
            return candidate.isPresent();
        }

        @Override
        public void describeTo(Description description) {
            description.appendText("present value");
        }
    }
}
package de.l3s.gossen.crawler.frontier;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Collections;
import java.util.Optional;
import org.junit.Test;
import com.codahale.metrics.MetricRegistry;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import de.l3s.gossen.crawler.CrawlUrl;
import de.l3s.gossen.crawler.TestUtils;
public class FileBasedFrontierTest {
@Test
public void testRoundtrip() throws IOException {
String URL = "http://example.org/";
File queueDirectory = Files.createTempDirectory("queue-").toFile();
try (FileBasedFrontier frontier = new FileBasedFrontier(queueDirectory, new MetricRegistry(), 2, false)) {
frontier.push(Collections.singleton(CrawlUrl.fromSeed(URL, 0.0f)));