ES-MongoDB学习2_mongodb-river-elasticsearch源码解析

来源:转载


MongoDBRiverPlugin类是插件注册类,它继承自AbstractPlugin,其功能是

1.      在RiverModule中注册一个MongoDBRiver

2.      在RestModule中注册一个RestMongoDBRiverAction

[java] view plaincopy
  1. package org.elasticsearch.plugin.river.mongodb;  
  2. import org.elasticsearch.plugins.AbstractPlugin;  
  3. import org.elasticsearch.rest.RestModule;  
  4. import org.elasticsearch.rest.action.mongodb.RestMongoDBRiverAction;  
  5. import org.elasticsearch.river.RiversModule;  
  6. import org.elasticsearch.river.mongodb.MongoDBRiver;  
  7. import org.elasticsearch.river.mongodb.MongoDBRiverModule;  
  8. /** 
  9. * @author flaper87 (Flavio Percoco Premoli) 
  10. * @author aparo (Alberto Paro) 
  11. * @author kryptt (Rodolfo Hansen) 
  12. */  
  13. public class MongoDBRiverPlugin extends AbstractPlugin {  
  14. @Override  
  15. public String name() {  
  16. return MongoDBRiver.NAME;  
  17. }  
  18. @Override  
  19. public String description() {  
  20. return MongoDBRiver.DESCRIPTION;  
  21. }  
  22. /** 
  23. * Register the MongoDB river to Elasticsearch node 
  24. * @param module 
  25. */  
  26. public void onModule(RiversModule module) {  
  27. module.registerRiver(MongoDBRiver.TYPE, MongoDBRiverModule.class);  
  28. }  
  29. /** 
  30. * Register the REST move to Elasticsearch node 
  31. * @param module 
  32. */  
  33. public void onModule(RestModule module) {  
  34. module.addRestAction(RestMongoDBRiverAction.class);  
  35. }  
  36. }  


MongoDBRiver

首先看river部分 org.elasticsearch.river.mongodb.MongoDBRiver是核心类,构造函数中都是都是elasticsearch 的配置信息和服务

参数类型
参数名称含义取值RiverNameriverName名称 RiverSettingssettings设置信息 StringriverIndexName索引名 Clientclient客户端 ScriptServicescriptService 脚本服务 MongoDBRiverDefinitiondefinition解析后的定义

MongoDBRiverDefinition.parseSettings(riverName.name()

,riverIndexName, settings, scriptService);

还有一个参数stream表示操作流,用来存储需要放在mongo oplog中的数据队列

[java] view plaincopy
  1. BlockingQueue<QueueEntry> stream =   
  2. definition.getThrottleSize() == -1 ?   
  3. new LinkedTransferQueue<QueueEntry>()   
  4. : new ArrayBlockingQueue<QueueEntry>(definition.getThrottleSize());  

可以看到,如果definition中设定的阈值大小没有设定的话,使用一个链表数据结构作为队列,否则使用一个数组队列。不过两种情况使用的数据结构都是多线程使用的数据结构BlockingQueue阻塞队列。阻塞队列是用在“生产者-消费者”模式的主要数据结构,其作用是如果队列空,则消费者阻塞;如果队列满,则生产者阻塞。而且队列支持多个生产者和消费者线程。其中QueueEntry定义如下,其中Operation是一个枚举,包含了各种mongodb操作:INSERT,UPDATE, DELETE, DROP_COLLECTION, DROP_DATABASE, COMMAND, UNKNOWN;

[java] view plaincopy
  1. protected static class QueueEntry {  
  2. private final DBObject data;  
  3. private final Operation operation;  
  4. private final Timestamp<?> oplogTimestamp;  
  5. private final String collection;  
  6. public QueueEntry(DBObject data, String collection) {  
  7. this(null, Operation.INSERT, data, collection);  
  8. }  
  9. public QueueEntry(Timestamp<?> oplogTimestamp, Operation oplogOperation, DBObject data, String collection) {  
  10. this.data = data;  
  11. this.operation = oplogOperation;  
  12. this.oplogTimestamp = oplogTimestamp;  
  13. this.collection = collection;  
  14. }  
  15. public boolean isOplogEntry() {  
  16. return oplogTimestamp != null;  
  17. }  
  18. public boolean isAttachment() {  
  19. return (data instanceof GridFSDBFile);  
  20. }  
  21. public DBObject getData() {  
  22. return data;  
  23. }  
  24. public Operation getOperation() {  
  25. return operation;  
  26. }  
  27. public Timestamp<?> getOplogTimestamp() {  
  28. return oplogTimestamp;  
  29. }  
  30. public String getCollection() {  
  31. return collection;  
  32. }  
  33. }  
  34. }  

最后MongoDBRiver构造函数里面还有一个全局参数SharedContext context,这个参数包含了这个队列的引用,并且包含了整体运行状态的一个上下文状态:UNKNOWN, START_FAILED, RUNNING, STOPPED, IMPORT_FAILED,INITIAL_IMPORT_FAILED, SCRIPT_IMPORT_FAILED, RIVER_STALE;

[java] view plaincopy
  1. this.context = new SharedContext(stream, Status.STOPPED);  
初始化之后就可以,elasticsearch将通过start方法启动这个插件,启动逻辑如下:

* 首先是各种状态的检查:

     1、 用client获取elastic的状态,转成Status

[java] view plaincopy
  1. client.prepareGet("_river", "mongodb-river", "_riverstatus").get()  
  2. XContentMapValues.extractValue("mongodb.status")  
    2、如果状态是IMPORT_FAILED、INITIAL_IMPORT_FAILED、SCRIPT_IMPORT_FAILED Status.START_FAILED 或者 STOPPED;表示有问题,直接打印日志并返回
    

    3、 如果没有问题,则使用方法设定river为启动状态:

[java] view plaincopy
  1. MongoDBRiverHelper.setRiverStatus(client, riverName.getName(), Status.RUNNING);  
  2. context.setStatus(Status.RUNNING);  

    4、如果不存在索引则建立之

[java] view plaincopy
  1. // Create the index if it does not exist  
  2. client.admin().indices().prepareCreate(definition.getIndexName()).get();  

   5、 如果是GridFS要做一些额外的索引工作

[java] view plaincopy
  1. client.admin().indices().preparePutMapping(definition.getIndexName()).setType(definition.getTypeName()).setSource(getGridFSMapping()).get();  
    6、 然后我们开始启动相关的线程:

如果是mongos,就启动多个OpLog处理线程,否则使用一个线程,创建方式如下:

[java] view plaincopy
  1. EsExecutors.daemonThreadFactory(settings.globalSettings(), "mongodb_river_slurper").newThread(  
  2.     new Slurper(definition.getMongoServers(), definition, context, client));  
   7、启动之后再启动Indexer进程[java] view plaincopy
  1. EsExecutors.daemonThreadFactory(settings.globalSettings(),"mongodb_river_indexer").newThread(new Indexer(this, definition, context, client, scriptService));  
   8、最后再启动一个状态监测进程:

[java] view plaincopy
  1. EsExecutors.daemonThreadFactory(settings.globalSettings(), "mongodb_river_status").newThread(new StatusChecker(this, definition, context));  

*  所以代码的核心就是三个线程:

收割 new Slurper(definition.getMongoServers(), definition, context,client)

索引处理 new Indexer(this, definition, context, client, scriptService)

状态检查 new StatusChecker(this, definition, context)

可以看到共同的参数都是:一个definition包含所有的配置,context包含了操作队列和状态


Slurper收割线程

其逻辑是:

1、  如果driver的状态是Running,则查找OpLog的信息并放入stream队列中

 

2、 如果无法获取oplogCollection队列,则退出线程failed to assign oplogCollection orslurpedCollection

3、  增量处理是按照上次注入时间点为查询条件的

[java] view plaincopy
  1. cursor = oplogCursor(startTimestamp);  
  2. if (cursor == null) {  
  3.     cursor = processFullOplog();  
  4. }  

查询条件是

[java] view plaincopy
  1. filter.put(MongoDBRiver.OPLOG_TIMESTAMP,new BasicDBObject(QueryOperators.GTE, time));  
  2.   
  3. ts > time  

4、获得数据库指针之后,处理每一个OpLog的数据

[java] view plaincopy
  1. while (cursor.hasNext()) {  
  2.     DBObject item = cursor.next();  
  3.     startTimestamp = processOplogEntry(item, startTimestamp);  
  4. }  

处理这些数据最后就是调用 addToStream 或 addInsertToStream 加入stream中

 

初始化导入

上面的过程只适合于从当前时间开始的数据,如果需要把原来的数据导入的话,还需要做一个initialimport

当程序配置满足一下条件的时候,才会在第一次运行该线程的时候进行初始化导入:

[java] view plaincopy
  1. SkipInitialImport == false  
  2. InitialTimestamp == null // initial timestamp 为空  
  3. MongoDBRiver.getIndexCount(client, definition) == 0 // 没有index过  
  4. MongoDBRiver.getLastTimestamp(client, definition) == null;  
  5. Get the latest timestamp for a given namespace.  

满足这些条件之后才会进行数据的初始化导入:初始化导入会查看一下设置,如果是ImportAllCollections,则检查每一个collection并注入否则,找出设定的collection并注入

核心代码是这样的:

[java] view plaincopy
  1. if (!definition.isSkipInitialImport()) {  
  2.                     if (!riverHasIndexedFromOplog() && definition.getInitialTimestamp() == null) {  
  3.                         if (!isIndexEmpty()) {  
  4.                             MongoDBRiverHelper.setRiverStatus(client, definition.getRiverName(), Status.INITIAL_IMPORT_FAILED);  
  5.                             break;  
  6.                         }  
  7.                         if (definition.isImportAllCollections()) {  
  8.                             for (String name : slurpedDb.getCollectionNames()) {  
  9.                                 DBCollection collection = slurpedDb.getCollection(name);  
  10.                                 startTimestamp = doInitialImport(collection);  
  11.                             }  
  12.                         } else {  
  13.                             DBCollection collection = slurpedDb.getCollection(definition.getMongoCollection());  
  14.                             startTimestamp = doInitialImport(collection);  
  15.                         }  
  16.                     }  
  17.                 } else {  
  18.                     logger.info("Skip initial import from collection {}", definition.getMongoCollection());  
  19.                 }  

[java] view plaincopy
  1.  /** 
  2. * Does an initial sync the same way MongoDB does. 
  3. * https://groups.google.com/ 
  4. * forum/?fromgroups=#!topic/mongodb-user/sOKlhD_E2ns 
  5. * @return the last oplog timestamp before the import began 
  6. * @throws InterruptedException 
  7. * if the blocking queue stream is interrupted while waiting 
  8. */  
  9. protected Timestamp<?> doInitialImport(DBCollection collection) throws InterruptedException {  
  10. // TODO: ensure the index type is empty  
  11. // DBCollection slurpedCollection =  
  12. // slurpedDb.getCollection(definition.getMongoCollection());  
  13. logger.info("MongoDBRiver is beginning initial import of " + collection.getFullName());  
  14. Timestamp<?> startTimestamp = getCurrentOplogTimestamp();  
  15. boolean inProgress = true;  
  16. String lastId = null;  
  17. while (inProgress) {  
  18. DBCursor cursor = null;  
  19. try {  
  20. if (definition.isDisableIndexRefresh()) {  
  21. updateIndexRefresh(definition.getIndexName(), -1L);  
  22. }  
  23. if (!definition.isMongoGridFS()) {  
  24. logger.info("Collection {} - count: {}", collection.getName(), collection.count());  
  25. long count = 0;  
  26. cursor = collection.find(getFilterForInitialImport(definition.getMongoCollectionFilter(), lastId));  
  27. while (cursor.hasNext()) {  
  28. DBObject object = cursor.next();  
  29. count++;  
  30. if (cursor.hasNext()) {  
  31. lastId = addInsertToStream(null, applyFieldFilter(object), collection.getName());  
  32. } else {  
  33. logger.debug("Last entry for initial import - add timestamp: {}", startTimestamp);  
  34. lastId = addInsertToStream(startTimestamp, applyFieldFilter(object), collection.getName());  
  35. }  
  36. }  
  37. inProgress = false;  
  38. logger.info("Number documents indexed: {}", count);  
  39. } else {  
  40. // TODO: To be optimized.  
  41. // https://github.com/mongodb/mongo-java-driver/pull/48#issuecomment-25241988  
  42. // possible option: Get the object id list from .fs  
  43. // collection  
  44. // then call GriDFS.findOne  
  45. GridFS grid = new GridFS(mongo.getDB(definition.getMongoDb()), definition.getMongoCollection());  
  46. cursor = grid.getFileList();  
  47. while (cursor.hasNext()) {  
  48. DBObject object = cursor.next();  
  49. if (object instanceof GridFSDBFile) {  
  50. GridFSDBFile file = grid.findOne(new ObjectId(object.get(MongoDBRiver.MONGODB_ID_FIELD).toString()));  
  51. if (cursor.hasNext()) {  
  52. lastId = addInsertToStream(null, file);  
  53. } else {  
  54. logger.debug("Last entry for initial import - add timestamp: {}", startTimestamp);  
  55. lastId = addInsertToStream(startTimestamp, file);  
  56. }  
  57. }  
  58. }  
  59. inProgress = false;  
  60. }  
  61. } catch (MongoException.CursorNotFound e) {  
  62. logger.info("Initial import - Cursor {} has been closed. About to open a new cusor.", cursor.getCursorId());  
  63. logger.debug("Total document inserted [{}]", totalDocuments.get());  
  64. } finally {  
  65. if (cursor != null) {  
  66. logger.trace("Closing initial import cursor");  
  67. cursor.close();  
  68. }  
  69. if (definition.isDisableIndexRefresh()) {  
  70. updateIndexRefresh(definition.getIndexName(), TimeValue.timeValueSeconds(1));  
  71. }  
  72. }  
  73. }  
  74. return startTimestamp;  
  75. }  
  76. private BasicDBObject getFilterForInitialImport(BasicDBObject filter, String id) {  
  77. if (id == null) {  
  78. return filter;  
  79. } else {  
  80. BasicDBObject filterId = new BasicDBObject(MongoDBRiver.MONGODB_ID_FIELD, new BasicBSONObject(QueryOperators.GT, id));  
  81. if (filter == null) {  
  82. return filterId;  
  83. } else {  
  84. List<BasicDBObject> values = ImmutableList.of(filter, filterId);  
  85. return new BasicDBObject(QueryOperators.AND, values);  
  86.   
  87. }  
  88. }  

 Indexer线程

其逻辑是:

1、如果driver的状态是Running,则从stream队列中获取信息并放入Index中

在构造函数初始化的时候会做一些MongoDBRiverBulkProcessor的创建 build:

[java] view plaincopy
  1. SimpleEntry<String, String> entry = new SimpleEntry<String, String>(index, type);  
  2.       if (!processors.containsKey(entry)) {  
  3.           processors.put(new SimpleEntry<String, String>(index, type), new MongoDBRiverBulkProcessor.Builder(river, definition, client,  
  4.                   index, type).build());  
  5.       }  
  6.       return processors.get(entry);  

然后在业务逻辑中读取entry,并processBlockingQueue processBlockingQueue就是根据不同的业务的内容做不同的处理,就是对不同的操作用相关的es api加以处理。

[java] view plaincopy
  1. // 1. Attempt to fill as much of the bulk request as possible  
  2.                QueueEntry entry = context.getStream().take();  
  3.                lastTimestamp = processBlockingQueue(entry);  
  4.                while ((entry = context.getStream().poll(definition.getBulk().getFlushInterval().millis(), MILLISECONDS)) != null) {  
  5.                    lastTimestamp = processBlockingQueue(entry);  
  6.                }  
  7.   
  8.                // 2. Update the timestamp  
  9.                if (lastTimestamp != null) {  
  10.                    MongoDBRiver.setLastTimestamp(definition, lastTimestamp,  
  11.                            getBulkProcessor(definition.getIndexName(), definition.getTypeName()).getBulkProcessor());  
  12.                }  

StatusChecker

状态检查就是更具用户的命令进行开/关

 

就是检查elastic中的最新状态【用户设定的状态】:MongoDBRiverHelper.getRiverStatus(client, riverName);

如果状态和当前状态不一致,就进行driver的start或stop

 

用一个流程图来解释这几个线程之间的关系就是这样的:


RestModule

注册这个模块的作用是在原来es支持的rest api基础上,增加针对mongodb的新的api类型,具体实现可以参考一下这篇文章,这里不再赘述了:

http://elasticsearchserverbook.com/creating-custom-elasticsearch-rest-action/


原文来自:http://blog.csdn.net/luoluowushengmimi/article/details/38727097



分享给朋友:
您可能感兴趣的文章:
随机阅读: