Logstash

logstash 欄位名稱中@前綴的意義是什麼?

  • January 11, 2014

以下 logstash 配置用於透過 TCP 連線以 JSON 格式接收 Windows 事件日誌,經過一些過濾後再將結果轉發到 Elasticsearch(來源:https://gist.github.com/robinsmidsrod/4215337):

input {   # Two TCP inputs; each stamps its events with a "type" that the filter/output stanzas select on
   tcp {   # Plain-text syslog lines (parsed later by the grok filter for type "syslog")
       type => "syslog"
       host => "127.0.0.1"
       port => 3514
   }
   tcp {   # Windows Event Log records delivered as JSON
       type   => "eventlog"
       host   => "10.1.1.2"
       port   => 3515
       format => 'json'   # payload is decoded as JSON rather than treated as a raw line
   }
}

# Details at http://cookbook.logstash.net/recipes/syslog-pri/
filter {

# Incoming data from rsyslog (every stanza in this group is restricted to type "syslog")
   grok {   # Split the raw line into syslog_* fields and remember the original timestamp/source host
       type      => "syslog"
       pattern   => [ "<%{POSINT:syslog_pri}>(?:%{SYSLOGTIMESTAMP:syslog_timestamp}|%{TIMESTAMP_ISO8601:syslog_timestamp8601}) %{SYSLOGHOST:syslog_hostname} %{PROG:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" ]
       add_field => [ "received_at", "%{@timestamp}" ]
       add_field => [ "received_from", "%{@source_host}" ]
   }
   syslog_pri {   # presumably decodes the syslog_pri value captured by grok above; see the cookbook link
       type => "syslog"
   }
   date {   # Parse whichever timestamp field grok captured, using the formats listed per field
       type                 => "syslog"
       syslog_timestamp8601 => "ISO8601" # RSYSLOG_ForwardFormat
       syslog_timestamp     => [ "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ] # space-padded single-digit day, or two-digit day
   }
   mutate {   # Skipped for events tagged _grokparsefailure, whose syslog_* fields would be missing
       type         => "syslog"
       exclude_tags => "_grokparsefailure"
       replace      => [ "@source_host", "%{syslog_hostname}" ]
       replace      => [ "@message", "%{syslog_message}" ]
   }
   mutate {   # Drop the intermediate fields now that their values have been copied or parsed
       type   => "syslog"
       remove => [ "syslog_hostname", "syslog_message", "syslog_timestamp", "syslog_timestamp8601" ]
   }

# Incoming Windows Event logs from nxlog (every stanza in this group is restricted to type "eventlog")
   # The EventReceivedTime field must contain only digits, or it is an invalid message
   grep {
       type              => "eventlog"
       EventReceivedTime => "\d+"   # NOTE(review): pattern is not anchored, so it looks like any value containing a digit passes - confirm
   }
   mutate {
       # Lowercase some values that are always in uppercase
       type      => "eventlog"
       lowercase => [ "EventType", "FileName", "Hostname", "Severity" ]
   }
   mutate {
       # Set source to what the message says: the "Hostname" field becomes @source_host
       type   => "eventlog"
       rename => [ "Hostname", "@source_host" ]
   }
   date {
       # Convert timestamp from integer in UTC (EventReceivedTime is parsed with the "UNIX" format)
       type              => "eventlog"
       EventReceivedTime => "UNIX"
   }
   mutate {
       # Rename some fields into something more useful: "Message" becomes @message, the rest get eventlog_/nxlog_ names
       type   => "eventlog"
       rename => [ "Message", "@message" ]
       rename => [ "Severity", "eventlog_severity" ]
       rename => [ "SeverityValue", "eventlog_severity_code" ]
       rename => [ "Channel", "eventlog_channel" ]
       rename => [ "SourceName", "eventlog_program" ]
       rename => [ "SourceModuleName", "nxlog_input" ]
       rename => [ "Category", "eventlog_category" ]
       rename => [ "EventID", "eventlog_id" ]
       rename => [ "RecordNumber", "eventlog_record_number" ]
       rename => [ "ProcessID", "eventlog_pid" ]
   }
   mutate {
       # Remove redundant fields (EventReceivedTime has already been consumed by the date filter above)
       type   => "eventlog"
       remove => [ "SourceModuleType", "EventTimeWritten", "EventTime", "EventReceivedTime", "EventType" ]
   }
}

output {   # All events go to Elasticsearch; each type additionally emits one Graphite metric per event
   elasticsearch {
       embedded => true   # presumably runs an Elasticsearch instance inside the logstash process - confirm against the output's docs
   }
   graphite {
       # Ping the graphite server every time a syslog message is received
       type => "syslog"
       port => 2023     # carbon-aggregator
       metrics => [ "syslog.received.%{@source_host}.count", "1" ]   # metric name is per source host, value is always "1"
   }
   graphite {
       # Ping the graphite server every time an eventlog message is received
       type => "eventlog"
       port => 2023     # carbon-aggregator
       metrics => [ "eventlog.received.%{@source_host}.count", "1" ]   # metric name is per source host, value is always "1"
   }
}

第 58 行和第 68 行某些欄位名稱的 `@` 前綴有什麼意義?即在這些 `mutate` 過濾器中的 `@source_host` 和 `@message`:

mutate {
   # Set source to what the message says: the "Hostname" field becomes @source_host
   type   => "eventlog"
   rename => [ "Hostname", "@source_host" ]
}

mutate {
   # Rename some fields into something more useful: "Message" becomes @message, the rest get eventlog_/nxlog_ names
   type   => "eventlog"
   rename => [ "Message", "@message" ]
   rename => [ "Severity", "eventlog_severity" ]
   rename => [ "SeverityValue", "eventlog_severity_code" ]
   rename => [ "Channel", "eventlog_channel" ]
   rename => [ "SourceName", "eventlog_program" ]
   rename => [ "SourceModuleName", "nxlog_input" ]
   rename => [ "Category", "eventlog_category" ]
   rename => [ "EventID", "eventlog_id" ]
   rename => [ "RecordNumber", "eventlog_record_number" ]
   rename => [ "ProcessID", "eventlog_pid" ]
}

我相信這只是一個避免衝突的命名空間決定。

在較新版本的 logstash 中,這種 `@` 前綴大部分已被移除,只剩下 @timestamp 和 @version。您應該考慮升級 logstash 以及您的日誌傳送端(shipper)。

引用自:https://serverfault.com/questions/566553