About adding fields and specifying dates in aggregation queries

Asked 2 years ago, Updated 2 years ago, 76 views

I have a question about adding fields and specifying dates in Elasticsearch aggregation queries.
The query below sums a specified field in an aggregation, extracts the results by date range, and displays the top entries.

First question
1. The aggregated fields appear as expected, but what should I do if I want to display additional data alongside them?

"key": "122.xxx.xxx.xxx",
    "doc_count": 2013,
    "sample_sum": {
      "value": 7730929.0
      Data I would like to add (it doesn't matter where it appears):
      ----------------------------
      "bgp_ipv4_next_hop": 1 or 0
      ----------------------------
    },

"sample_range": {
      "buckets": [{
        "key": "2016-02-24T09:14:00.000Z-2016-02-24T09:29:18.517Z",
        "from" : 1.45630524E12,
        "from_as_string": "2016-02-24T09:14:00.000Z",
        "to" : 1.456306158517E12,
        "to_as_string": "2016-02-24T09:29:18.517Z",
        "doc_count": 295,
        "sample_calculate": {
          "value": 1043050.0
        }

For example, how can I add bgp_ipv4_next_hop data per IP to the aggregation results of the above query? The full query is listed at the bottom.
I have tried various ways of nesting sub-aggregations, but none of them have worked.
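One possible approach (a sketch I am adding, not part of the original thread): since bgp_ipv4_next_hop is a separate field on each document, you can surface its value per IP bucket by nesting a second terms sub-aggregation alongside sample_sum. The sub-aggregation name "next_hop" and the size values here are assumptions:

```json
"aggs": {
  "sample_query": {
    "terms": {
      "field": "ipv4_src_addr",
      "size": 15
    },
    "aggs": {
      "sample_sum": {
        "sum": { "field": "in_bytes" }
      },
      "next_hop": {
        "terms": {
          "field": "bgp_ipv4_next_hop",
          "size": 1
        }
      }
    }
  }
}
```

Each ipv4_src_addr bucket would then contain a next_hop.buckets array whose key is that IP's next-hop value, next to sample_sum in the response.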

2. Currently we extract data every hour using the following settings.
If I want to run the query at 00:00 every day instead, can I specify a fixed time?
If so, how do you run it at a fixed time each day?
(To be honest, I still don't understand how to write the time expressions...)

"gte": "now-1h/h",
  "lte": "now/d",
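For reference (my addition, not from the original post): Elasticsearch date math rounds with /d, /h, etc. relative to the moment the query runs, so a window covering yesterday 00:00 up to (but not including) today 00:00 could be written as:

```json
"range": {
  "@timestamp": {
    "gte": "now-1d/d",
    "lt": "now/d"
  }
}
```

The range itself is always relative to execution time, so running the query at an absolute wall-clock time (e.g. 00:00) has to be handled by whatever schedules the query, not by the range clause.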

Here is the full query I am using:

{
  "size": 0,
  "query": {
    "filtered": {
      "query": {
        "query_string": {
          "query": "*",
          "analyze_wildcard": true
        }
      },
      "filter": {
        "bool": {
          "must": [
            {
              "range": {
                "@timestamp":{
                  "gte": "now-1h/h",
                  "lte": "now/d",
                  "format": "epoch_millis"
                }
              }
            }
          ],
          "must_not": [ ]
        }
      }
    }
  },
  "aggs": {
    "sample_query": {
      "terms": {
        "field": "ipv4_src_addr",
        "size":15,
        "order": {
          "sample_sum": "desc"
        }
      },
      "aggs": {
        "sample_sum": {
          "sum": {
            "field": "in_bytes"
          }
        },
        "sample_range": {
          "date_range": {
            "field": "@timestamp",
            "ranges": [
              {
                "from": "now-15m/m",
                "to": "now"
              }
            ]
          },
          "aggs": {
            "sample_calculate": {
              "sum": {
                "field": "in_bytes"
              }
            }
          }
        }
      }
    }
  }
}

Thank you for your cooperation.

The following is the netflow mapping (data structure) in use:

"mappings":{
     "netflow": {
     "properties": {
      "@timestamp":{
        "type": "date",
        "format": "strict_date_optional_time||epoch_millis"
      },
      "bgp_ipv4_next_hop": {
        "type": "string"
      },
      "bgp_ipv6_next_hop": {
        "type": "string"
      },
      "direction": {
        "type": "long"
      },
      "dst_as": {
        "type": "long"
      },
      "dst_mask": {
        "type": "long"
      },
      "egress_vrf_id": {
        "type": "long"
      },
      "first_switched": {
        "type": "date",
        "format": "strict_date_optional_time||epoch_millis"
      },
      "flow_sampler_id": {
        "type": "long"
      },
      "flow_sampler_mode": {
        "type": "long"
      },
      "flow_sampler_random_interval": {
        "type": "long"
      },
      "flow_seq_num": {
        "type": "long"
      },
      "flowset_id": {
        "type": "long"
      },
      "forwarding_status": {
        "type": "long"
      },
      "geoip_city": {
        "type": "string"
      },
      "geoip_country": {
        "type": "string"
      },
      "geoip_lat": {
        "type": "double"
      },
      "geoip_lon": {
        "type": "double"
      },
      "geoip_pin": {
        "type": "string"
      },
      "host": {
        "type": "string"
      },
      "in_bytes": {
        "type": "long"
      },
      "in_pkts": {
        "type": "long"
      },
      "ingress_vrf_id": {
        "type": "long"
      },
      "input_snmp": {
        "type": "long"
      },
      "ipv4_dst_addr": {
        "type": "string"
      },
      "ipv4_src_addr": {
        "type": "string"
      },
      "ipv6_dst_addr": {
        "type": "string"
      },
      "ipv6_dst_mask": {
        "type": "long"
      },
      "ipv6_flow_label": {
        "type": "long"
      },
      "ipv6_option_headers": {
        "type": "long"
      },
      "ipv6_src_addr": {
        "type": "string"
      },
      "ipv6_src_mask": {
        "type": "long"
      },
      "l4_dst_port": {
        "type": "long"
      },
      "l4_src_port": {
        "type": "long"
      },
      "last_switched": {
        "type": "date",
        "format": "strict_date_optional_time||epoch_millis"
      },
      "output_snmp": {
        "type": "long"
      },
      "protocol": {
        "type": "long"
      },
      "sampler_name": {
        "type": "string"
      },
      "src_as": {
        "type": "long"
      },
      "src_mask": {
        "type": "long"
      },
      "src_tos": {
        "type": "long"
      },
      "system": {
        "type": "string"
      },
      "tcp_flags": {
        "type": "long"
      },
      "version": {
        "type": "long"
          }
        }
      }
    }
  }
}

The following is sample data:

@timestamp          March 3rd 2016, 14:36:00.000
    _id                 AVM6-heNCCEYIL1vsnDa
    _index              flow-20160303
    _score              1
    _type               netflow
    bgp_ipv6_next_hop   ::
    direction           0
    dst_as              0
    egress_vrf_id       1,610,612,736
    first_switched      March 3rd 2016, 14:35:45.999
    flow_sampler_id     1
    flow_seq_num        150,022,732
    flowset_id          266
    forwarding_status   64
    host                133.130.xxx.xxx
    in_bytes            1,500
    in_pkts             1
    ingress_vrf_id      1,610,612,736
    input_snmp          84
    ipv6_dst_addr       2400:8500:1301:2519:157:7:xx:xx
    ipv6_dst_mask       64
    ipv6_flow_label     0
    ipv6_option_headers 0
    ipv6_src_addr       2001:da8:208:849d:c86d:769a:xxxx:xxxx
    ipv6_src_mask       32
    l4_dst_port         25,565
    l4_src_port         6,944
    last_switched       March 3rd 2016, 14:35:45.999
    output_snmp         10
    protocol            6
    src_as              23,910
    src_tos             0
    tcp_flags           16
    version             9

elasticsearch

2022-09-30 21:15

1 Answer

For the first question, it would be easier to answer if you could write down the structure of your data and the kind of output you would like; having sample data makes it much simpler.

For the second question: running queries on a schedule is not a built-in function of Elasticsearch itself. You would either trigger the query from outside (for example with cron) or run it periodically with the commercial plugin called Watcher.
Elasticsearch evaluates date-math expressions such as 'now-1h/h' relative to the time the query is executed.
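As a sketch of the cron approach mentioned above (the host, index pattern, and file paths are all assumptions, not from the thread), you could save the query to a JSON file and have cron POST it with curl at 00:00 every day:

```shell
# crontab entry: run the saved aggregation query daily at 00:00
# (assumed paths and host; adjust to your environment)
0 0 * * * curl -s -XPOST 'http://localhost:9200/flow-*/_search' -d @/etc/netflow/aggregation_query.json > /var/log/netflow_agg.json
```

Because 'now-1h/h' and similar expressions are resolved at execution time, scheduling the query at a fixed wall-clock time like this is what pins the window to 00:00.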


2022-09-30 21:15



© 2024 OneMinuteCode. All rights reserved.