By Jeffrey Rengifo (Elastic)
Learn how to index and search Google BigQuery data in Elasticsearch using Python.
BigQuery is Google's platform for centralizing data from different sources and services into a single repository. It also supports data analysis and the use of generative AI (GenAI) and machine learning (ML) tools. Data can be ingested into BigQuery from a range of sources, from Google services to local file uploads.
Indexing data from all of these sources into Elasticsearch helps you centralize your data sources for a better observability experience.
In this article, you will learn how to index BigQuery data into Elasticsearch using Python, allowing you to unify data from different systems for search and analysis.
You can follow along with the examples from this article in this Google Colab notebook.
Steps
- Prepare BigQuery
- Configure the BigQuery Python client
- Index data into Elasticsearch
- Search the data
Prepare BigQuery
To use BigQuery, you need to go to the Google Cloud console and create a project. Once that is done, you will be redirected to this screen:
BigQuery lets you transfer data in from Google Drive and Google Cloud Storage, and also supports uploading local files. To upload data to BigQuery, you first need to create a dataset. Create one and name it "server-logs" so we can upload some files.
In this article, we will upload a local dataset of server logs. Check BigQuery's official documentation to learn how to upload local files.
The dataset
The file we will upload to BigQuery contains server log data with HTTP responses and their descriptions, in ndjson format. The ndjson file includes the following fields: ip_address, _timestamp, http_method, endpoint, status_code, response_time, and status_code_description.
BigQuery will extract the data from this file; we will then organize it with Python and index it into Elasticsearch.
Create a file called logs.ndjson and populate it with the following:
{"ip_address": "192.168.1.3", "_timestamp": "2024-12-03T12:00:03Z", "http_method": "GET", "endpoint": "/about", "status_code": "404", "response_time": 89, "status_code_description": "The requested contact page does not exist or was removed."}
{"ip_address": "192.168.1.3", "_timestamp": "2024-12-03T12:00:07Z", "http_method": "GET", "endpoint": "/contact", "status_code": "404", "response_time": 76, "status_code_description": "The requested contact page does not exist or was removed."}
{"ip_address": "192.168.1.1", "_timestamp": "2024-12-03T12:00:01Z", "http_method": "GET", "endpoint": "/home", "status_code": "200", "response_time": 123, "status_code_description": "OK"}
{"ip_address": "192.168.1.1", "_timestamp": "2024-12-03T12:00:04Z", "http_method": "GET", "endpoint": "/products", "status_code": "200", "response_time": 156, "status_code_description": "OK"}
{"ip_address": "192.168.1.2", "_timestamp": "2024-12-03T12:00:05Z", "http_method": "GET", "endpoint": "/home", "status_code": "200", "response_time": 101, "status_code_description": "OK"}
{"ip_address": "192.168.1.2", "_timestamp": "2024-12-03T12:00:08Z", "http_method": "GET", "endpoint": "/home", "status_code": "200", "response_time": 98, "status_code_description": "OK"}
{"ip_address": "192.168.1.6", "_timestamp": "2024-12-03T12:00:10Z", "http_method": "GET", "endpoint": "/home", "status_code": "200", "response_time": 105, "status_code_description": "OK"}
{"ip_address": "192.168.1.2", "_timestamp": "2024-12-03T12:00:02Z", "http_method": "POST", "endpoint": "/login", "status_code": "500", "response_time": 340, "status_code_description": "Internal error while processing the payment gateway."}
{"ip_address": "192.168.1.5", "_timestamp": "2024-12-03T12:00:09Z", "http_method": "POST", "endpoint": "/payment", "status_code": "500", "response_time": 512, "status_code_description": "Internal error while processing the payment gateway."}
{"ip_address": "192.168.1.4", "_timestamp": "2024-12-03T12:00:06Z", "http_method": "POST", "endpoint": "/checkout", "status_code": "503", "response_time": 450, "status_code_description": "Service temporarily unavailable during the checkout process."}
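Before uploading, it can help to check that every line of the file parses as standalone JSON, since a single malformed line can make the BigQuery load job fail. A minimal sketch (the helper name validate_ndjson is ours, not part of any library):

```python
import json

def validate_ndjson(lines):
    """Parse each non-empty line as standalone JSON; fail on the first bad line."""
    records = []
    for n, line in enumerate(lines, start=1):
        line = line.strip()
        if not line:
            continue
        try:
            records.append(json.loads(line))
        except json.JSONDecodeError as exc:
            raise ValueError(f"line {n} is not valid JSON: {exc}") from exc
    return records

# Run it over the real file with: validate_ndjson(open("logs.ndjson"))
records = validate_ndjson([
    '{"ip_address": "192.168.1.1", "status_code": "200"}',
    '{"ip_address": "192.168.1.2", "status_code": "500"}',
])
print(len(records))  # 2
```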
Upload this file to the dataset we just created (shown as "server_logs"), using "logs" as the table name (shown as "table id").
Once you are done, your file should look like this:
Configure the BigQuery Python client
Below, we will build a small app using the BigQuery Python client and Google Colab.
1. Install dependencies
First, we need to install the following dependencies:
!pip install google-cloud-bigquery elasticsearch==8.16
The google-cloud-bigquery dependency provides the tools we need to access the data in BigQuery, elasticsearch lets us connect to Elastic and index the data, and getpass lets us enter sensitive variables without exposing them in the code.
Let's import all the necessary dependencies:
from elasticsearch import Elasticsearch, exceptions
from google.cloud import bigquery
from google.colab import auth
from getpass import getpass
from datetime import datetime
import json
We also need to declare a few other variables and initialize the Elasticsearch Python client:
ELASTICSEARCH_ENDPOINT = getpass("Elasticsearch endpoint: ")
ELASTIC_API_KEY = getpass("Elastic Api Key: ")
# Google Cloud project name and BigQuery dataset name
PROJECT_ID = "elasticsearch-bigquery"
# dataset_id in format <your-project-name>.<your-dataset-name>
DATASET_ID = f'{PROJECT_ID}.server-logs'
# Elasticsearch client
es_client = Elasticsearch(
    ELASTICSEARCH_ENDPOINT,
    api_key=ELASTIC_API_KEY,
)
2. Authentication
To get the credentials needed to use BigQuery, we authenticate with Google. Run the line below and choose the same account you used to create the Google Cloud project:
auth.authenticate_user()
Now, let's look at the data in BigQuery:
client = bigquery.Client(project=PROJECT_ID)
# Getting tables from dataset
tables = client.list_tables(DATASET_ID)
data = {}
for table in tables:
    # Table id must be in format <dataset_name>.<table_name>
    table_id = f"{DATASET_ID}.{table.table_id}"
    print(f"Processing table: {table.table_id}")

    # Query to retrieve BigQuery tables data
    query = f"""
        SELECT *
        FROM `{table_id}`
    """
    query_job = client.query(query)
    results = query_job.result()

    print(f"Results for table: {table.table_id}:")
    data[table.table_id] = []
    for row in results:
        # Saving data with key=table_id
        data[table.table_id].append(dict(row))
        print(row)

# variable with data
logs_data = data['logs']
This is what you should see:
Processing table: logs
Results for table: logs:
Row(('The requested contact page does not exist or was removed.', 404, 'GET', '/about', datetime.datetime(2024, 12, 3, 12, 0, 3, tzinfo=datetime.timezone.utc), 89, '192.168.1.3'), {'status_code_description': 0, 'status_code': 1, 'http_method': 2, 'endpoint': 3, '_timestamp': 4, 'response_time': 5, 'ip_address': 6})
Row(('The requested contact page does not exist or was removed.', 404, 'GET', '/contact', datetime.datetime(2024, 12, 3, 12, 0, 7, tzinfo=datetime.timezone.utc), 76, '192.168.1.3'), {'status_code_description': 0, 'status_code': 1, 'http_method': 2, 'endpoint': 3, '_timestamp': 4, 'response_time': 5, 'ip_address': 6})
Row(('OK', 200, 'GET', '/home', datetime.datetime(2024, 12, 3, 12, 0, 1, tzinfo=datetime.timezone.utc), 123, '192.168.1.1'), {'status_code_description': 0, 'status_code': 1, 'http_method': 2, 'endpoint': 3, '_timestamp': 4, 'response_time': 5, 'ip_address': 6})
Row(('OK', 200, 'GET', '/products', datetime.datetime(2024, 12, 3, 12, 0, 4, tzinfo=datetime.timezone.utc), 156, '192.168.1.1'), {'status_code_description': 0, 'status_code': 1, 'http_method': 2, 'endpoint': 3, '_timestamp': 4, 'response_time': 5, 'ip_address': 6})
Row(('OK', 200, 'GET', '/home', datetime.datetime(2024, 12, 3, 12, 0, 5, tzinfo=datetime.timezone.utc), 101, '192.168.1.2'), {'status_code_description': 0, 'status_code': 1, 'http_method': 2, 'endpoint': 3, '_timestamp': 4, 'response_time': 5, 'ip_address': 6})
Row(('OK', 200, 'GET', '/home', datetime.datetime(2024, 12, 3, 12, 0, 8, tzinfo=datetime.timezone.utc), 98, '192.168.1.2'), {'status_code_description': 0, 'status_code': 1, 'http_method': 2, 'endpoint': 3, '_timestamp': 4, 'response_time': 5, 'ip_address': 6})
Row(('OK', 200, 'GET', '/home', datetime.datetime(2024, 12, 3, 12, 0, 10, tzinfo=datetime.timezone.utc), 105, '192.168.1.6'), {'status_code_description': 0, 'status_code': 1, 'http_method': 2, 'endpoint': 3, '_timestamp': 4, 'response_time': 5, 'ip_address': 6})
Row(('Internal error while processing the payment gateway.', 500, 'POST', '/login', datetime.datetime(2024, 12, 3, 12, 0, 2, tzinfo=datetime.timezone.utc), 340, '192.168.1.2'), {'status_code_description': 0, 'status_code': 1, 'http_method': 2, 'endpoint': 3, '_timestamp': 4, 'response_time': 5, 'ip_address': 6})
Row(('Internal error while processing the payment gateway.', 500, 'POST', '/payment', datetime.datetime(2024, 12, 3, 12, 0, 9, tzinfo=datetime.timezone.utc), 512, '192.168.1.5'), {'status_code_description': 0, 'status_code': 1, 'http_method': 2, 'endpoint': 3, '_timestamp': 4, 'response_time': 5, 'ip_address': 6})
Row(('Service temporarily unavailable during the checkout process.', 503, 'POST', '/checkout', datetime.datetime(2024, 12, 3, 12, 0, 6, tzinfo=datetime.timezone.utc), 450, '192.168.1.4'), {'status_code_description': 0, 'status_code': 1, 'http_method': 2, 'endpoint': 3, '_timestamp': 4, 'response_time': 5, 'ip_address': 6})
With this short piece of code, we have extracted the data from BigQuery and stored it in the logs_data variable, ready to use with Elasticsearch.
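One thing to note in the output above: the _timestamp values come back as Python datetime objects, not strings. A small sketch of a helper (our own, not part of the BigQuery client) that turns one row dict into a JSON-serializable document, renaming _timestamp to the @timestamp field used in the next section:

```python
from datetime import datetime, timezone

def to_document(row):
    """Convert one BigQuery row dict into a JSON-serializable Elasticsearch doc."""
    doc = dict(row)
    ts = doc.pop("_timestamp")
    doc["@timestamp"] = ts.isoformat()  # datetime -> ISO 8601 string
    return doc

row = {
    "ip_address": "192.168.1.1",
    "_timestamp": datetime(2024, 12, 3, 12, 0, 1, tzinfo=timezone.utc),
    "status_code": "200",
}
print(to_document(row)["@timestamp"])  # 2024-12-03T12:00:01+00:00
```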
Index data into Elasticsearch
We start by defining the structure of the data (the index mappings):
es_client.indices.create(
    index="bigquery-logs",
    body={
        "mappings": {
            "properties": {
                "status_code_description": {"type": "match_only_text"},
                "status_code": {"type": "keyword"},
                "@timestamp": {"type": "date"},
                "ip_address": {"type": "ip"},
                "http_method": {"type": "keyword"},
                "endpoint": {"type": "keyword"},
                "response_time": {"type": "integer"},
            }
        }
    },
)
match_only_text is a variant of the text field type that saves disk space by not storing the metadata needed to compute relevance scores. We use it because logs tend to be time-centric, that is, the date matters more than the quality of matches in text fields. Queries that use a text field are compatible with queries that use a match_only_text field.
We will use the Elasticsearch _bulk API to index the documents:
bulk_data = []
for log_entry in logs_data:
    # Convert timestamp to ISO 8601 string
    timestamp_iso8601 = log_entry["_timestamp"].isoformat()

    # Prepare action metadata
    action_metadata = {
        "index": {
            "_index": "bigquery-logs",
            "_id": f"{log_entry['ip_address']}-{timestamp_iso8601}"
        }
    }

    # Prepare document
    document = {
        "ip_address": log_entry["ip_address"],
        "status_code": log_entry["status_code"],
        "@timestamp": timestamp_iso8601,
        "http_method": log_entry["http_method"],
        "endpoint": log_entry["endpoint"],
        "response_time": log_entry["response_time"],
        "status_code_description": log_entry["status_code_description"]
    }

    # Append to bulk data
    bulk_data.append(action_metadata)
    bulk_data.append(document)
print(bulk_data)
# Indexing data
response = es_client.bulk(body=bulk_data)
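The bulk call returns successfully even when individual documents fail; failures are reported per item in the response body. A hedged sketch of a helper to surface them (bulk_errors is our own name; the dict shape follows the standard _bulk response format):

```python
def bulk_errors(response):
    """Collect (doc id, error) pairs for failed items in a _bulk response."""
    if not response.get("errors"):
        return []
    failures = []
    for item in response.get("items", []):
        # Each item is keyed by its action type, e.g. {"index": {...}}.
        for _action, result in item.items():
            if result.get("status", 200) >= 300:
                failures.append((result.get("_id"), result.get("error")))
    return failures

# Synthetic example (not real output from the call above):
sample_response = {
    "errors": True,
    "items": [
        {"index": {"_id": "a", "status": 201}},
        {"index": {"_id": "b", "status": 400,
                   "error": {"type": "mapper_parsing_exception"}}},
    ],
}
print(bulk_errors(sample_response))  # [('b', {'type': 'mapper_parsing_exception'})]
```

In the script above, you could call bulk_errors(response) right after the bulk request and raise if the list is non-empty.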
Search the data
We can now run queries against the data in the bigquery-logs index.
In this example, we search the error descriptions from the server (the status_code_description field). In addition, we sort the results by date and aggregate the IP addresses that produced the errors:
es_client.search(
    index="bigquery-logs",
    body={
        "query": {"match": {"status_code_description": "error"}},
        "sort": [{"@timestamp": {"order": "desc"}}],
        "aggs": {"by_ip": {"terms": {"field": "ip_address", "size": 10}}},
    },
)
This is the result:
{
  ...
  "hits": {
    ...
    "hits": [
      {
        "_index": "bigquery-logs",
        "_id": "192.168.1.5-2024-12-03T12:00:09+00:00",
        "_score": null,
        "_source": {
          "ip_address": "192.168.1.5",
          "status_code": 500,
          "@timestamp": "2024-12-03T12:00:09+00:00",
          "http_method": "POST",
          "endpoint": "/payment",
          "response_time": 512,
          "status_code_description": "Internal error while processing the payment gateway."
        },
        "sort": [1733227209000]
      },
      {
        "_index": "bigquery-logs",
        "_id": "192.168.1.2-2024-12-03T12:00:02+00:00",
        "_score": null,
        "_source": {
          "ip_address": "192.168.1.2",
          "status_code": 500,
          "@timestamp": "2024-12-03T12:00:02+00:00",
          "http_method": "POST",
          "endpoint": "/login",
          "response_time": 340,
          "status_code_description": "Internal error while processing the payment gateway."
        },
        "sort": [1733227202000]
      }
    ]
  },
  "aggregations": {
    "by_ip": {
      "doc_count_error_upper_bound": 0,
      "sum_other_doc_count": 0,
      "buckets": [
        {"key": "192.168.1.2", "doc_count": 1},
        {"key": "192.168.1.5", "doc_count": 1}
      ]
    }
  }
}
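In application code, you usually pull out only the pieces you need from a response like this. A minimal sketch (summarize_errors is a hypothetical helper; the field names match the mapping and aggregation defined above):

```python
def summarize_errors(response):
    """Return (failing endpoints, {ip: error count}) from a search response."""
    endpoints = [hit["_source"]["endpoint"] for hit in response["hits"]["hits"]]
    ip_counts = {
        bucket["key"]: bucket["doc_count"]
        for bucket in response["aggregations"]["by_ip"]["buckets"]
    }
    return endpoints, ip_counts

# Trimmed-down response in the shape shown above:
sample = {
    "hits": {"hits": [
        {"_source": {"endpoint": "/payment", "ip_address": "192.168.1.5"}},
        {"_source": {"endpoint": "/login", "ip_address": "192.168.1.2"}},
    ]},
    "aggregations": {"by_ip": {"buckets": [
        {"key": "192.168.1.2", "doc_count": 1},
        {"key": "192.168.1.5", "doc_count": 1},
    ]}},
}
print(summarize_errors(sample))
```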
Conclusion
Tools like BigQuery help centralize information and are very useful for data management. Beyond search, combining BigQuery with Elasticsearch lets you apply the power of machine learning and data analytics to detect or analyze problems more simply and quickly.
Want to get Elastic certified? Find out when the next Elasticsearch Engineer training runs!
Elasticsearch is packed with new features to help you build the best search solution for your use case. Dive into our sample notebooks to learn more, start a free cloud trial, or try Elastic on your local machine now.
Original article: Ingesting data with BigQuery - Elasticsearch Labs