
Logstash doc_as_upsert cross-index in Elasticsearch to eliminate duplicates

Updated: 2023-02-19 07:57:28



I have a logstash configuration that uses the following in the output block in an attempt to mitigate duplicates.

output {
    if [type] == "usage" {
        elasticsearch {
            hosts => ["elastic4:9204"]
            index => "usage-%{+YYYY-MM-dd-HH}"
            document_id => "%{[@metadata][fingerprint]}"
            action => "update"
            doc_as_upsert => true
        }
    }
}

The fingerprint is calculated from a SHA1 hash of two unique fields.
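
For reference, the fingerprint is produced with Logstash's fingerprint filter; a minimal sketch, assuming the two unique fields are called field_a and field_b (hypothetical names):

filter {
    fingerprint {
        # field_a/field_b are placeholders for the two unique fields
        source => ["field_a", "field_b"]
        concatenate_sources => true
        method => "SHA1"
        target => "[@metadata][fingerprint]"
    }
}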

This works when Logstash sees the same doc destined for the same index, but since the command that generates the input data doesn't produce documents at a reliable rate, Logstash will sometimes insert duplicate docs into a different date-stamped index.

For example, the command that Logstash runs to get the input generally returns the last two hours of data. However, since I can't definitively tell when a doc will appear/disappear, I run the command every fifteen minutes.

This is fine when the duplicates occur within the same hour. However, when the hour or day date stamp rolls over, and the document still appears, elastic/logstash thinks it's a new doc.
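
As I understand it, this happens because each bulk update that Logstash sends names one concrete target index, so the upsert only looks for an existing _id in that index. Schematically (index name, id, and fields are illustrative):

POST /_bulk
{ "update" : { "_index" : "usage-2023-02-19-07", "_id" : "3f2a9c..." } }
{ "doc" : { "type" : "usage", "bytes" : 123 }, "doc_as_upsert" : true }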

Is there a way to make the upsert work across indices? These would all be the same type of doc; they would simply apply to every index that matches "usage-*".

A new index is an entirely new keyspace and there's no way to tell ES to not index two documents with the same ID in two different indices.

However, you could prevent this by adding an elasticsearch filter to your pipeline which would look up the document in all indices and if it finds one, it could drop the event.

Something like this would do (note that usages would be an alias spanning all usage-* indices):

filter {
    elasticsearch {
        hosts => ["elastic4:9204"]
        index => "usages"
        query => "_id:%{[@metadata][fingerprint]}"
        fields => {"_id" => "other_id"}
    }
    # if the document was found, drop this one
    if [other_id] {
        drop {}
    }
}
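
For completeness, the usages alias has to exist for the filter above to query. One way to set it up, sketched here assuming an ES version with the legacy _template API (6.x uses index_patterns; 5.x uses template instead), is to attach the alias to the existing indices once and let an index template add it to every future usage-* index:

# one-off: attach the alias to the indices that already exist
curl -XPOST 'http://elastic4:9204/_aliases' -H 'Content-Type: application/json' -d '
{
  "actions": [
    { "add": { "index": "usage-*", "alias": "usages" } }
  ]
}'

# ongoing: have every newly created usage-* index receive the alias
curl -XPUT 'http://elastic4:9204/_template/usage' -H 'Content-Type: application/json' -d '
{
  "index_patterns": ["usage-*"],
  "aliases": { "usages": {} }
}'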