Bulk Loading Large Volumes of Data into ElasticSearch

Source: reposted


I recently started on a task that involves handling very large volumes of data.

The current situation: a data-collection program handles the storage and retrieval of a large volume of data, and later on we may also need to run statistics over it.

Data distribution

13 collection points periodically write their results into 4 files (a new batch of small files is generated every 5 minutes).

 

Name                               Size (bytes)
gather_1_2014-02-27-14-50-0.txt    568497
gather_1_2014-02-27-14-50-1.txt    568665
gather_1_2014-02-27-14-50-2.txt    568172
gather_1_2014-02-27-14-50-3.txt    568275

 

 

At the same time, a shell script loads the four files into a single Sybase IQ table, tab_tmp_2014_2_27.

The daily volume is roughly 300 million rows, so the small files add up to about 3 GB per day. With this many small files and such a large single table, a composite-primary-key query that used to return in about 2 seconds now takes 5 to 10 minutes.

Given this, the current storage structure needs to be optimized.

We first looked at a related system: catior stores this kind of data in a round-robin database, which makes it easy to generate MRTG graphs but is poorly suited to statistics. We therefore brought in Elasticsearch to speed up retrieval over the large dataset.

Test platform

 

CPU: AMD Opteron(tm) Processor 6136, 64-bit, 2.4 GHz * 32 cores
Memory: 64 GB
Disk: 1.5 TB
OS: Red Hat Enterprise Linux Server release 6.4 (Santiago)

Directory layout of the files to be read:

 

[[email protected] data]$ ls
0  1  2  3

Simple test code:

 

 

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;
import org.apache.log4j.Logger;

public class FileReader {

    private File file;
    private String splitCharactor;
    private Map<String, Class<?>> colNames;

    private static final Logger LOG = Logger.getLogger(FileReader.class);

    /**
     * @param file           file to read
     * @param splitCharactor split character (passed to String.split, so treated as a regex)
     * @param colNames       column names mapped to their Java types
     */
    public FileReader(File file, String splitCharactor, Map<String, Class<?>> colNames) {
        this.file = file;
        this.splitCharactor = splitCharactor;
        this.colNames = colNames;
    }

    /**
     * Read the file and return one map per valid line.
     */
    public List<Map<String, Object>> readFile() throws Exception {
        List<Map<String, Object>> list = new ArrayList<Map<String, Object>>();
        if (!file.isFile()) {
            throw new Exception("File not exists: " + file.getName());
        }
        LineIterator lineIterator = null;
        try {
            lineIterator = FileUtils.lineIterator(file, "UTF-8");
            while (lineIterator.hasNext()) {
                String line = lineIterator.next();
                String[] values = line.split(splitCharactor);
                // Skip lines whose column count does not match the expected layout.
                if (colNames.size() != values.length) {
                    continue;
                }
                Map<String, Object> map = new HashMap<String, Object>();
                Iterator<Entry<String, Class<?>>> iterator = colNames.entrySet().iterator();
                int count = 0;
                while (iterator.hasNext()) {
                    Entry<String, Class<?>> entry = iterator.next();
                    Object value = values[count];
                    // Convert non-String columns via the type's static valueOf(String) method.
                    if (!String.class.equals(entry.getValue())) {
                        value = entry.getValue().getMethod("valueOf", String.class)
                                .invoke(null, value);
                    }
                    map.put(entry.getKey(), value);
                    count++;
                }
                list.add(map);
            }
        } catch (IOException e) {
            LOG.error("File reading line error." + e.toString(), e);
        } finally {
            LineIterator.closeQuietly(lineIterator);
        }
        return list;
    }
}
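For reference, a minimal usage sketch of the class above. The file path and column layout here are illustrative only, and the split character is handed straight to String.split, so it is interpreted as a regular expression:

import java.io.File;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative usage of FileReader; path and columns are hypothetical.
public class FileReaderDemo {
    public static void main(String[] args) throws Exception {
        // Column order must match the column order in the file.
        LinkedHashMap<String, Class<?>> colNames = new LinkedHashMap<String, Class<?>>();
        colNames.put("aa", Long.class);    // converted via Long.valueOf
        colNames.put("bb", String.class);  // kept as-is
        colNames.put("dd", Integer.class); // converted via Integer.valueOf

        FileReader reader = new FileReader(
                new File("/export/home/es/data/0/sample.txt"), "//$", colNames);
        List<Map<String, Object>> rows = reader.readFile();
        System.out.println("parsed rows: " + rows.size());
    }
}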

 

import java.io.File;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

import com.alibaba.fastjson.JSON;

public class StreamIntoEs {

    public static class ChildThread extends Thread {

        int number;

        public ChildThread(int number) {
            this.number = number;
        }

        @Override
        public void run() {
            Settings settings = ImmutableSettings.settingsBuilder()
                    .put("client.transport.sniff", true)
                    .put("client.transport.ping_timeout", 100)
                    .put("cluster.name", "elasticsearch").build();
            TransportClient client = new TransportClient(settings)
                    .addTransportAddress(new InetSocketTransportAddress("192.168.32.228", 9300));

            // Each thread works on its own sub-directory (0, 1, 2, 3).
            File dir = new File("/export/home/es/data/" + number);

            // Column layout of the gather files, in file order.
            LinkedHashMap<String, Class<?>> colNames = new LinkedHashMap<String, Class<?>>();
            colNames.put("aa", Long.class);
            colNames.put("bb", String.class);
            colNames.put("cc", String.class);
            colNames.put("dd", Integer.class);
            colNames.put("ee", Long.class);
            colNames.put("ff", Long.class);
            colNames.put("hh", Long.class);

            int count = 0;
            long startTime = System.currentTimeMillis();
            for (File file : dir.listFiles()) {
                int currentCount = 0;
                long startCurrentTime = System.currentTimeMillis();
                FileReader reader = new FileReader(file, "//$", colNames);
                BulkResponse resp = null;
                // One bulk request per file.
                BulkRequestBuilder bulkRequest = client.prepareBulk();
                try {
                    List<Map<String, Object>> results = reader.readFile();
                    for (Map<String, Object> col : results) {
                        // The document id is a composite of the key columns.
                        bulkRequest.add(client.prepareIndex("flux", "fluxdata")
                                .setSource(JSON.toJSONString(col))
                                .setId(col.get("getway") + "##" + col.get("port_info") + "##"
                                        + col.get("device_id") + "##" + col.get("collecttime")));
                        count++;
                        currentCount++;
                    }
                    resp = bulkRequest.execute().actionGet();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                long endCurrentTime = System.currentTimeMillis();
                System.out.println("[thread-" + number + "-]per count:" + currentCount);
                System.out.println("[thread-" + number + "-]per time:" + (endCurrentTime - startCurrentTime));
                System.out.println("[thread-" + number + "-]per count/s:"
                        + (float) currentCount / (endCurrentTime - startCurrentTime) * 1000);
                System.out.println("[thread-" + number + "-]bulk response:" + resp);
            }
            long endTime = System.currentTimeMillis();
            System.out.println("[thread-" + number + "-]total count:" + count);
            System.out.println("[thread-" + number + "-]total time:" + (endTime - startTime));
            System.out.println("[thread-" + number + "-]total count/s:"
                    + (float) count / (endTime - startTime) * 1000);
            client.close();
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 4; i++) {
            ChildThread childThread = new ChildThread(i);
            childThread.start();
        }
    }
}

Four threads are started for loading, and each thread issues one bulk request after it finishes parsing a file (a chunked variant is sketched below).
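Sending one bulk request per file means each request carries every row of that file. A common variant is to flush the bulk every few thousand documents instead; the sketch below assumes the same client, index name, and FileReader as the code above, and the chunk size of 5,000 is only an illustrative value, not something measured in this post:

// Hedged variant of the per-file loop: flush the bulk in fixed-size chunks.
static void indexFileInChunks(TransportClient client, FileReader reader) throws Exception {
    final int BULK_SIZE = 5000; // illustrative chunk size
    BulkRequestBuilder bulkRequest = client.prepareBulk();
    for (Map<String, Object> col : reader.readFile()) {
        bulkRequest.add(client.prepareIndex("flux", "fluxdata")
                .setSource(JSON.toJSONString(col)));
        if (bulkRequest.numberOfActions() >= BULK_SIZE) {
            BulkResponse resp = bulkRequest.execute().actionGet();
            if (resp.hasFailures()) {
                System.out.println(resp.buildFailureMessage());
            }
            bulkRequest = client.prepareBulk(); // start a new batch
        }
    }
    if (bulkRequest.numberOfActions() > 0) { // flush the remainder
        bulkRequest.execute().actionGet();
    }
}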

 

Initialization scripts:

 

curl -XDELETE 'http://192.168.32.228:9200/twitter/'

 

curl -XPUT 'http://192.168.32.228:9200/twitter/' -d '{
    "index" : {
        "number_of_shards" : 5,
        "number_of_replicas" : 0,
        "index.refresh_interval" : "-1",
        "index.translog.flush_threshold_ops" : "100000"
    }
}'

 

curl -XPUT 'http://192.168.32.228:9200/twiter/twiterdata/_mapping' -d '{
    "twiterdata" : {
        "properties" : {
            "aa" : {"type" : "long",    "index" : "not_analyzed"},
            "bb" : {"type" : "string",  "index" : "not_analyzed"},
            "cc" : {"type" : "string",  "index" : "not_analyzed"},
            "dd" : {"type" : "integer", "index" : "not_analyzed"},
            "ee" : {"type" : "long",    "index" : "no"},
            "ff" : {"type" : "long",    "index" : "no"},
            "gg" : {"type" : "long",    "index" : "no"},
            "hh" : {"type" : "long",    "index" : "no"},
            "ii" : {"type" : "long",    "index" : "no"},
            "jj" : {"type" : "long",    "index" : "no"},
            "kk" : {"type" : "long",    "index" : "no"}
        }
    }
}'
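For completeness, the index settings above can also be applied from the Java client used in StreamIntoEs. A minimal sketch, assuming the 1.x-era API, that "client" is the TransportClient from the code above, and an index name that follows the curl example:

// Sketch: create the index with the same settings as the curl call above.
client.admin().indices().prepareCreate("twitter")
        .setSettings(ImmutableSettings.settingsBuilder()
                .put("number_of_shards", 5)
                .put("number_of_replicas", 0)
                .put("index.refresh_interval", "-1")
                .put("index.translog.flush_threshold_ops", "100000")
                .build())
        .execute().actionGet();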

Execution performance for reference:

 

Without the refresh_interval setting:
[[email protected] bin]$ more StreamIntoEs.out | grep total
[thread-2-]total count:1199411
[thread-2-]total time:1223718
[thread-2-]total count/s:980.1368
[thread-1-]total count:1447214
[thread-1-]total time:1393528
[thread-1-]total count/s:1038.5253
[thread-0-]total count:1508043
[thread-0-]total time:1430167
[thread-0-]total count/s:1054.4524
[thread-3-]total count:1650576
[thread-3-]total time:1471103
[thread-3-]total count/s:1121.9989
Combined count/s: 4195.1134

With the refresh_interval setting:
[[email protected] bin]$ more StreamIntoEs.out | grep total
[thread-2-]total count:1199411
[thread-2-]total time:996111
[thread-2-]total count/s:1204.0938
[thread-1-]total count:1447214
[thread-1-]total time:1163207
[thread-1-]total count/s:1244.1586
[thread-0-]total count:1508043
[thread-0-]total time:1202682
[thread-0-]total count/s:1253.9
[thread-3-]total count:1650576
[thread-3-]total time:1236239
[thread-3-]total count/s:1335.1593
Combined count/s: 5037.3117

With the refresh_interval setting and field type conversion:
[[email protected] bin]$ more StreamIntoEs.out | grep total
[thread-2-]total count:1199411
[thread-2-]total time:1065229
[thread-2-]total count/s:1125.9653
[thread-1-]total count:1447214
[thread-1-]total time:1218342
[thread-1-]total count/s:1187.8552
[thread-0-]total count:1508043
[thread-0-]total time:1230474
[thread-0-]total count/s:1225.5789
[thread-3-]total count:1650576
[thread-3-]total time:1274027
[thread-3-]total count/s:1295.5581
Combined count/s: 4834.9575

With the refresh_interval setting, field type conversion, and explicit document ids:
[thread-2-]total count:1199411
[thread-2-]total time:912251
[thread-2-]total count/s:1314.7817
[thread-1-]total count:1447214
[thread-1-]total time:1067117
[thread-1-]total count/s:1356.1906
[thread-0-]total count:1508043
[thread-0-]total time:1090577
[thread-0-]total count/s:1382.7937
[thread-3-]total count:1650576
[thread-3-]total time:1128490
[thread-3-]total count/s:1462.6412
Combined count/s: 5516.4072

On average, 580 MB of data takes about 20 minutes to load, and the resulting index is about 1.76 GB.
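Since the index was created with refresh_interval set to -1, the newly loaded documents will not show up in searches until refreshing is turned back on. A minimal sketch of restoring it once the load has finished, assuming the 1.x-era Java API and the flux index used by the indexing code:

// Re-enable refreshing after the bulk load (it was disabled with refresh_interval = -1).
TransportClient client = new TransportClient(ImmutableSettings.settingsBuilder()
        .put("cluster.name", "elasticsearch").build())
        .addTransportAddress(new InetSocketTransportAddress("192.168.32.228", 9300));
client.admin().indices().prepareUpdateSettings("flux")
        .setSettings(ImmutableSettings.settingsBuilder()
                .put("index.refresh_interval", "1s")
                .build())
        .execute().actionGet();
client.close();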

 

Related test results can be found here:

Elasticsearch performance tests

 

 


