1
justest123 2023-10-10 13:53:15 +08:00
如果决定使用 kafka -> logstash -> elasticsearch 的方案,结合我以前的经验,大概率是可以在 logstash 这一环节补充账号对应的部门信息的(最近几年没怎么实际用过 logstash 了,不敢打保票)。
先回答你的两种方式: 第一种,多个 input 同时读取,这种是不可行的,对多个 input 来说,它们采集到的数据是相互独立的,没有办法结合。 第二种,es 应该要新增部门字段,但这个字段比较难在写入文档的时候从账号关联到部门,印象里 es 有个 script 脚本功能,但好像都是用在更新、查询的时候,能不能用在文档写入阶段就不懂了(→_→ 有没有大佬有实际应用的案例能长长见识。 最后,关于怎么实现账号找部门:logstash 的插件分三类,input filter output ,可以尝试 logstash-filter-ruby 这个 filter 插件来写 ruby 代码。 1. 如果 input 插件读取到的日志信息是 json 格式的,可以用一下 logstash-filter-json 插件,将内容先解析出来。 2. logstash-filter-ruby 插件中拿到账号,如果可以将账号和部门信息存在文件里,就可以写 ruby 代码读取本地文件,找到部门,将部门字段同时写进 logstash 的 event 对象里。 3. filter 结束,output 环节照常,es 中新增一个部门字段,写入即可。 |
2
baozhibo 2023-10-10 14:23:15 +08:00
这个我们用的 flink 解决,从 kafka 读日志,flink 日志泛化异步查询 redis 部门员工信息,输出到 es 去。这样 es 里展示的日志就是都有部门信息了。
|
3
iian OP @justest123 #1 logstash-filter-ruby 插件的方式我再看看说明如何来实现
|
4
iian OP @baozhibo #2 flink 的方式我查查资料,是否使用 logstash 倒是无所谓,只要能实现从 kafka 读,中间过程匹配出部门信息,最后写到 es 中就行。
|
5
justest123 2023-10-10 17:06:46 +08:00
@iian 简单写了个,测试了下可以用,但只能读本地文件确实没直接实时读 redis 方便。
``` input { file { path => "D:/logstash-test/input.txt" } } filter { ruby { init => ' # 引入 json ,方便操作 require "json" # 从本地文件中读取,解析后初始化一个 hash ,key 为 userId ,value 为部门 id @@userDepMap = Hash.new File.open("D:/logstash-test/filter.txt", "r").each_line do |line| userDepArray = line.split(",") @@userDepMap[userDepArray[0]] = userDepArray[1] end ' code => ' # 从 message 中拿到消息本身,转 json msg = event.get("message") msgJson = JSON.parse(msg) # 从消息中拿到 userId ,从 hash 中找到对应的部门 user = msgJson["user"].to_s dep = @@userDepMap[user] # 将部门保存到消息 json 中 msgJson["dep"] = dep # 将最新的 json 转字符串,重新设置回 event 中 event.set("message", JSON.generate(msgJson)) ' } } output { stdout { } file { path => "D:/logstash-test/output.txt" } } ``` |
6
iian OP @justest123 感谢🙏,我测试看看。
|