from elasticsearch import Elasticsearch
from datetime import datetime, timedelta
# Elasticsearch client used by every request in this script.
# NOTE(review): the endpoint and credentials are hard-coded (presumably
# placeholders) and certificate verification is switched off — consider
# loading these from the environment or a secret store before real use.
es_of_prod = Elasticsearch(
    'http://host:port',
    http_auth=("username", "pwd"),
    verify_certs=False,
    timeout=60,
)

# Index that the queries in this script are issued against.
index_of_query = "index-name"
# Reference instant for the run, and the instant exactly one hour before it.
# Both are naive datetimes taken from the local clock.
current_time = datetime.now()
previous_hour_time = current_time - timedelta(hours=1)

# Text fragments matched against ext.time: hour precision and minute precision.
previous_hour_time_formatted = f"{previous_hour_time:%Y-%m-%d %H}"
previous_hourAndMin_time_formatted = f"{previous_hour_time:%Y-%m-%d %H:%M}"

# Both query strings share the same context clause and differ only in how
# precise the ext.time fragment is.
# NOTE(review): "{query String}" is sent literally as written; presumably a
# placeholder to be replaced with the real search text — confirm.
_kql_head = 'context:"*{query String}*" AND ext.time:"*'
_kql_tail = '*"'
kql_query_xxx_prod = "".join((_kql_head, previous_hour_time_formatted, _kql_tail))
kql_query_xxx_prod_hourAndMin = "".join(
    (_kql_head, previous_hourAndMin_time_formatted, _kql_tail)
)
print(f"query condition: {kql_query_xxx_prod_hourAndMin}")
# Count the documents that match the minute-precision query.
#
# zero_today / midnight_24 delimit the current local day; they keep their
# original values in case code further down the file relies on them.
zero_today = current_time.replace(hour=0, minute=0, second=0, microsecond=0)
midnight_24 = zero_today + timedelta(days=1)

# The ext.time clause targets the instant one hour ago. When the script runs
# between 00:00 and 01:00 that instant belongs to yesterday, so a window that
# starts at today's midnight would filter out exactly the documents being
# counted. Start the window at the midnight of the day the previous hour
# falls on; for every other run time this equals zero_today.
count_window_start = previous_hour_time.replace(
    hour=0, minute=0, second=0, microsecond=0
)

# NOTE(review): the bounds are naive local datetimes and the offset is fixed
# at +08:00 — presumably the host clock runs in UTC+8; confirm.
kql_query_xxx_json = {
    "query": {
        "bool": {
            "must": [
                {
                    "query_string": {
                        "query": kql_query_xxx_prod_hourAndMin,
                        "default_field": "*",
                    }
                },
                {
                    "range": {
                        "@timestamp": {
                            "gte": count_window_start,
                            "lt": midnight_24,
                            "time_zone": "+08:00",
                        }
                    }
                },
            ]
        }
    }
}
previous_hour_xxx_count = es_of_prod.count(
    index=index_of_query,
    body=kql_query_xxx_json,
)
print(f"previous_hour_xxx_count: {previous_hour_xxx_count['count']}")
# Fetch the matching documents and collect their trace ids into `traceids`.
query_body = {
    "query_string": {
        "query": kql_query_xxx_prod_hourAndMin,
        "default_field": "*",
        "analyze_wildcard": True,
        "lenient": True,
    }
}


def _trace_id_from_hit(hit):
    """Return the trace id carried by one search hit.

    The literal dotted key ``ext.traceId`` is tried first; if it is absent,
    the value is read from a nested ``ext`` object instead, since ``_source``
    mirrors whichever shape the document was indexed with.

    Raises:
        KeyError: if the hit has neither form of the field.
    """
    source = hit['_source']
    if 'ext.traceId' in source:
        return source['ext.traceId']
    return source['ext']['traceId']


try:
    response = es_of_prod.search(index=index_of_query, query=query_body, size=10000)
    hits_total = response['hits']['total']
    total = hits_total['value']
    print(f"获取到total:{total}")
    traceids = [_trace_id_from_hit(hit) for hit in response['hits']['hits']]
    print(f"获取到{len(traceids)}条traceid记录")
    # At most `size` hits come back in one request, and the reported total may
    # itself be a lower bound (relation "gte"); make truncation visible
    # instead of silently returning a partial list.
    if total > len(traceids) or hits_total.get('relation') == 'gte':
        print(f"警告: 匹配总数可能超过本次返回的{len(traceids)}条, traceid列表可能不完整")
except Exception as e:
    # Best-effort script boundary: report the failure and continue with an
    # empty result rather than aborting.
    print(f"查询异常: {str(e)}")
    traceids = []