这篇是上一篇ES-项目实战1-唐诗300首 的姊妹篇,从本地磁盘读取个人博客内容,转化格式后,基于 Java High-Level Restful API
导入到 ES 集群中,并做数据分析和可视化相关的内容。
数据预备阶段 准备测试环境 见上篇测试环境准备 。
数据导入及其准备 实验数据来自于本地个人博客的Markdown文件,文件格式如下:
1 2 3 4 5 6 7 --- title: ElasticSearch Snapshot date: 2020-04-16 16:43:42 tags: [Elasticsearch] --- Content
基于文件内容,我们的设置了如下建模结构:
字段名称
字段类型
备注说明
_id
对应自增id
contents
text & keyword
涉及分词,注意开启:fielddata:true
tags
text & keyword
title
text & keyword
timestamp
date
代表blog创建时间
date
text & keyword
文件创建时间(String)格式
cont_length
long
contents长度, 排序用
创建template 将设计好的Index Mapping 转换为 Templates。 并绑定了默认的 pipeline。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 PUT _template/blog_tpl { "index_patterns": [ "blog*" ], "settings": { "index.default_pipeline": "add_content_length", "number_of_replicas": 1, "refresh_interval": "30s" }, "mappings": { "properties": { "cont_length":{ "type":"long" }, "contents": { "type": "text", "fields": { "field": { "type": "keyword" } }, "analyzer": "ik_smart", "fielddata": true }, "timestamp": { "type": "date" }, "title": { "type": "text", "fields": { "field": { "type": "keyword" } }, "analyzer": "ik_smart" }, "tags": { "type": "text", "fields": { "field": { "type": "keyword" } }, "analyzer": "ik_smart" }, "date" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256 } } } } } }
创建Ingest Pipeline Pipeline 主要实现两个功能
将 String 类型的博客创建日期转为 Date类型
统计博客正文长度,并添加到字段 cont_length 中
由于Blog 的 date 字段,没有在个位数前补0,我们在 timestamp 转化处,同时声明了多个 format,来支持多种不同的日期格式。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 PUT _ingest/pipeline/add_content_length { "description": "Add parameter the length of content" , "processors": [ { "date" : { "field" : "date" , "target_field" : "timestamp" , "formats" : ["yyyy-MM-dd HH:mm:ss" ,"yyyy-MM-dd H:mm:ss" ,"yyyy-M-dd HH:mm:ss" ,"yyyy-M-dd H:mm:ss" ,"yyyy-M-d HH:mm:ss" ,"yyyy-M-d H:mm:ss" ,"yyyy-MM-d HH:mm:ss" ,"yyyy-MM-d H:mm:ss" ] } }, { "script": { "source": "ctx.cont_length = ctx.contents.length();" } } ] }
批量导入数据 代码主要分为以下几个部分:
获取 Blog 文件夹下所有 Markdown 文件的路径
读取 Markdown 文件内容,将其转为 String[]
基于 String 数组,转为 Json并由如下几个字段组成(date,content,title,tags)
将 Json数组通过 BulkAPI 发往 ES
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 package ES;import org.apache.http.HttpHost;import org.elasticsearch.action.bulk.BulkItemResponse;import org.elasticsearch.action.bulk.BulkRequest;import org.elasticsearch.action.bulk.BulkResponse;import org.elasticsearch.action.index.IndexRequest;import org.elasticsearch.client.RequestOptions;import org.elasticsearch.client.RestClient;import org.elasticsearch.client.RestHighLevelClient;import org.elasticsearch.common.xcontent.XContentType;import org.json.JSONObject;import java.io.File;import java.io.FileInputStream;import java.io.FileNotFoundException;import java.io.IOException;import java.util.ArrayList;import java.util.LinkedList;public class CovertMdToJson { public static String newLineChar = "\n" ; public static void main (String[] args) { CovertMdToJson covertMdToJson = new CovertMdToJson (); covertMdToJson.covertMarkdownToJson(); } public void covertMarkdownToJson () { String blogPwd = "/Users/ligaofeng/blog/source/_posts/" ; ArrayList<String> fileList = getAllAvailableFilePath(blogPwd); LinkedList<JSONObject> blogJsonList = new LinkedList <>(); fileList.stream().forEach(i -> { blogJsonList.add(coverStrToJson(readToString(i))); }); bulkIndexDataToES(blogJsonList); System.out.println("Done!" ); } private void bulkIndexDataToES (LinkedList<JSONObject> blogJsonList) { RestHighLevelClient client = new RestHighLevelClient ( RestClient.builder( new HttpHost ("localhost" , 9200 , "http" ))); try { BulkRequest request = new BulkRequest (); blogJsonList.stream().forEach(i->{ request.add(new IndexRequest ("blogs" ) .source(i.toString().trim(),XContentType.JSON)); }); BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT); for (BulkItemResponse bulkItemResponse : bulkResponse) { if (bulkItemResponse.isFailed()) { BulkItemResponse.Failure failure = bulkItemResponse.getFailure(); System.out.println("Bulk Request failed on id " +failure.getCause()); } } }catch (Exception e){ e.printStackTrace(); }finally { try { client.close(); } catch (IOException e) { System.out.println("Error: Can't close es connection" ); e.printStackTrace(); } } } public static ArrayList<String> getAllAvailableFilePath (String blogPwd) { ArrayList<String> filePathList = new ArrayList <String>(); File file = new File (blogPwd); if (file.isDirectory()) { File[] tempList = file.listFiles(); for (int i = 0 ; i < tempList.length; i++) { if (tempList[i].getName().startsWith("." )) { continue ; } if (tempList[i].isFile() && tempList[i].getName().endsWith("md" )) { filePathList.add(tempList[i].toString()); } if (tempList[i].isDirectory()) { filePathList.addAll(getAllAvailableFilePath(blogPwd + "/" + tempList[i].getName())); } } } else if (file.isFile() && file.getName().endsWith("md" )) { filePathList.add(file.toString()); } return filePathList; } public static String[] readToString(String filePath) { File file = new File (filePath); Long fileLength = file.length(); byte [] fileContent = new byte [fileLength.intValue()]; try { FileInputStream in = new FileInputStream (file); in.read(fileContent); in.close(); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } String[] fileContentArr = new String (fileContent).split(newLineChar); return fileContentArr; } public JSONObject coverStrToJson (String[] markdownContent) { JSONObject currBlogCont = new JSONObject (); boolean isHeader = true ; int headerLineCount = 0 ; StringBuffer blogContent = new StringBuffer (); for (int i = 0 ; i < markdownContent.length; i++) { String j = markdownContent[i].trim(); if (j.length()<1 ){ continue ; } if (isHeader){ if (markdownContent[i].equals("---" ) ){ headerLineCount++; if (headerLineCount==2 ){ isHeader=false ; } continue ; } switch (j.substring(0 ,j.indexOf(':' ))){ case "title" : currBlogCont.put("title" , j.substring(j.indexOf(":" )+1 ).trim()); break ; case "tags" : currBlogCont.put("tags" , j.substring(j.indexOf(":" )+1 ).trim()); break ; case "date" : currBlogCont.put("date" , j.substring(j.indexOf(":" )+1 ).trim()); break ; default : continue ; } }else { blogContent.append(j+"\n" ); } } currBlogCont.put("contents" , blogContent.toString()); return currBlogCont; } }
数据可视化阶段 创建 Index-Pattern 以 timestamp
字段作为日期创建 index-pattern.
Note: 在创建 template Mapping环节,我们为 date的 keyword 类型命名为 keyword,而其他字段的 keyword 类型则为 field。因此在Fields 中,存在 date.keyword 和 contents.field这两种不同的 Keyword 类型,本质上是在 Mapping multiple fields 中命名不同产生的。
制作 Visualization Golang 好久没碰了,居然之前写了这么多 Blog。
Mark 果然不少,打了好多 TODO没写。
有几个 Tags 黏在一起了,IK-Smart 分词器没有分开吗?可以基于 analyze API去解决下这个问题。
每月博客数量折线图 18年底-19年初是相当高产的时间段:正处在离职找工作的状态,博客更新很频繁。
19年初-20年中更新频率一般,工作繁忙,比较投入。