Skip to content

Commit

Permalink
CouchDB River: Allow to define a javascript that can munge the change…
Browse files Browse the repository at this point in the history
…s stream, closes elastic#431.
  • Loading branch information
kimchy committed Oct 14, 2010
1 parent 81fd17b commit ed9d9aa
Showing 1 changed file with 34 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.client.action.bulk.BulkRequestBuilder;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.common.Base64;
import org.elasticsearch.common.collect.Maps;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
Expand All @@ -36,6 +37,8 @@
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.river.*;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.ScriptService;

import java.io.*;
import java.net.HttpURLConnection;
Expand Down Expand Up @@ -68,13 +71,16 @@ public class CouchdbRiver extends AbstractRiverComponent implements River {
private final int bulkSize;
private final TimeValue bulkTimeout;

private final ExecutableScript script;
private final Map<String, Object> scriptParams = Maps.newHashMap();

private volatile Thread slurperThread;
private volatile Thread indexerThread;
private volatile boolean closed;

private final TransferQueue<String> stream = new LinkedTransferQueue<String>();

@Inject public CouchdbRiver(RiverName riverName, RiverSettings settings, @RiverIndexName String riverIndexName, Client client) {
@Inject public CouchdbRiver(RiverName riverName, RiverSettings settings, @RiverIndexName String riverIndexName, Client client, ScriptService scriptService) {
super(riverName, settings);
this.riverIndexName = riverIndexName;
this.client = client;
Expand Down Expand Up @@ -106,13 +112,20 @@ public class CouchdbRiver extends AbstractRiverComponent implements River {
} else {
basicAuth = null;
}

if (couchSettings.containsKey("script")) {
script = scriptService.executable("js", couchSettings.get("script").toString(), Maps.newHashMap());
} else {
script = null;
}
} else {
couchHost = "localhost";
couchPort = 5984;
couchDb = "db";
couchFilter = null;
couchFilterParamsUrl = null;
basicAuth = null;
script = null;
}

if (settings.settings().containsKey("index")) {
Expand Down Expand Up @@ -166,30 +179,42 @@ public class CouchdbRiver extends AbstractRiverComponent implements River {
}

private String processLine(String s, BulkRequestBuilder bulk) {
Map<String, Object> map = null;
Map<String, Object> ctx;
try {
map = XContentFactory.xContent(XContentType.JSON).createParser(s).mapAndClose();
ctx = XContentFactory.xContent(XContentType.JSON).createParser(s).mapAndClose();
} catch (IOException e) {
logger.warn("failed to parse {}", e, s);
return null;
}
if (map.containsKey("error")) {
if (ctx.containsKey("error")) {
logger.warn("received error {}", s);
return null;
}
String seq = map.get("seq").toString();
String id = map.get("id").toString();
String seq = ctx.get("seq").toString();
String id = ctx.get("id").toString();

// Ignore design documents
if (id.startsWith("_design/")) {
logger.trace("ignoring design document {}", id);
return seq;
}

if (map.containsKey("deleted") && map.get("deleted").equals(Boolean.TRUE)) {
if (script != null) {
scriptParams.put("ctx", ctx);
try {
script.run(scriptParams);
} catch (Exception e) {
logger.warn("failed to script process {}, ignoring", e, ctx);
return seq;
}
}

if (ctx.containsKey("ignore") && ctx.get("ignore").equals(Boolean.TRUE)) {
// ignore dock
} else if (ctx.containsKey("deleted") && ctx.get("deleted").equals(Boolean.TRUE)) {
bulk.add(deleteRequest(indexName).type(typeName).id(id));
} else if (map.containsKey("doc")) {
Map<String, Object> doc = (Map<String, Object>) map.get("doc");
} else if (ctx.containsKey("doc")) {
Map<String, Object> doc = (Map<String, Object>) ctx.get("doc");
bulk.add(indexRequest(indexName).type(typeName).id(id).source(doc));
} else {
logger.warn("ignoring unknown change {}", s);
Expand Down

0 comments on commit ed9d9aa

Please sign in to comment.