ELK生态:Logstash增量读取txt文件数据,导入到Elasticsearch

释放双眼,带上耳机,听听看~!

简介
1. ELK生态之Logstash通过sql导入地理坐标GeoPoint数据到Elasticserch;

2. 数据源:mysql数据表,数据表含地理坐标——lon(经度),lat(纬度);

3. Elasticsearch和Logstash版本:5.6.1;

4. GeoPoint地理位置信息可以使用三种数据格式表示:

(1) 以半角逗号分割的字符串形式表示——”lat,lon”,纬度在前,经度在后;

(2) 明确以 lat 和 lon 作为属性的对象,自定义前后;

(3) 以数组形式表示——[lon,lat],经度在前,纬度在后;

5. 这里logstash导入es地理坐标的数据格式为:对象格式;

6. 其余格式参考:

数组格式:ELK生态:Logstash通过sql导入地理坐标到ES,数据格式为数组;

字符串格式:ELK生态:Logstash通过sql导入地理坐标到ES,数据格式为字符串;

实践
location.sql文件内容:
SELECT
id,
addr,
lat,
lon,
postTime

FROM
location_tab
WHERE
postTime>:sql_last_value
logstash.conf配置:
#1.连接数据库,数据输入阶段
input {
stdin {
}
jdbc {
jdbc_driver_library => “../lib/mysql-connector-java-5.1.44.jar”
jdbc_driver_class => “com.mysql.jdbc.Driver”
jdbc_connection_string=>”jdbc:mysql://127.0.0.1:3306/master”
jdbc_user => “root”
jdbc_password => “root”
jdbc_paging_enabled => “false”
jdbc_page_size => “1000”
lowercase_column_names => “false”
statement_filepath => “/data/es/logstash-5.6.1/config/sql/location.sql”
schedule => “* * * * *”
clean_run => false
record_last_run => false
last_run_metadata_path => “/data/es/logstash-5.6.1/data/location.lastrun”
use_column_value => true
tracking_column => postTime
type=>”location”
}

}

#2.过滤格式化数据阶段
filter {
json {
source => “message”
remove_field => [“message”]
}

#将sql里的两个坐标字段指定数据格式
mutate {
convert => { “lon” => “float” }
convert => { “lat” => “float” }
}

#将两个坐标字段合并成一个字段
mutate {
rename => {
“lon” => “[location][lon]”
“lat” => “[location][lat]”
}
}
}

#3.数据输出到ES阶段
output {
#控制台输出
stdout {
codec => json_lines
}

#插入到es
if[type] == “location”{
elasticsearch {
hosts => “127.0.0.1:9200”
index => “location_index”
document_type => “location_index”
#document_id => “%{id}”
#flush_size => 1000
#idle_flush_time => 15

#覆盖模板,不需要可注释掉,通用模板下载:https://download.csdn.net/download/alan_liuyue/11241484
#template_overwrite=>true
#template=>”/data/es/logstash-5.6.1/template/logstash.json”
}
}

}
注意事项
1. 导入地理坐标数据需要指定字段location数据格式为geo_point,指定的方法有多种,这里讲两种,有兴趣的程序员可继续深入挖掘;

(1)利用template模板指定location字段为地理坐标类型(geo_point);

(2)直接在kibana控制台指定location坐标为地理坐标类型(geo_point);

PUT location_index/location_index/_mapping
{
“properties”: {
“location”:{
“type”: “geo_point”
}
}
2. 在java中使用SpringData接入Elasticsearch,创建ElasticsearchRepository操作数据时,需要将实体类的地理坐标字段设置成以下格式:

/**
* 坐标位置
*/
@GeoPointField
private String location;

public String getLocation() {
return location;
}

public void setLocation(String location) {
this.location = location;
}
3. logstash.conf文件配置的数据输入阶段input,连接jdbc数据库,增量读取数据表数据的操作详情,可参考博主另外一篇博客,这里不再详述:ELK生态:linux系统安装和配置logstash数据导入工具

 

总结
实践是检验认识真理性的唯一标准,自己动手,丰衣足食~

文章技巧

中国疫情地图和全球疫情地图组件,一句话调用

2020-3-30 20:14:33

文章技巧

coding 文件网盘:无限空间高速直链下载

2020-4-6 11:45:15

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
有新消息 消息中心
搜索