欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 新闻 > 焦点 > Elasticsearch:使用 BigQuery 提取数据

Elasticsearch:使用 BigQuery 提取数据

2025/3/9 17:41:43 来源:https://blog.csdn.net/UbuntuTouch/article/details/146110160  浏览:    关键词:Elasticsearch:使用 BigQuery 提取数据

作者:来自 Elastic Jeffrey Rengifo

了解如何使用 Python 在 Elasticsearch 中索引和搜索 Google BigQuery 数据。

BigQuery 是 Google 的一个平台,允许你将来自不同来源和服务的数据集中到一个存储库中。它还支持数据分析,并可使用生成式 AI (GenAI) 和机器学习 (ML) 工具。以下是将数据引入 BigQuery 的方式:

将所有这些来源的数据索引到 Elasticsearch,可帮助你集中数据源,从而获得更好的可观测性体验。

在本文中,你将学习如何使用 Python 将 BigQuery 数据索引到 Elasticsearch,使你能够统一来自不同系统的数据,以便进行搜索和分析。

你可以在此 Google Colab 笔记本中使用本文提供的示例。

步骤

  1. 准备 BigQuery
  2. 配置 BigQuery Python 客户端
  3. 将数据索引到 Elasticsearch
  4. 搜索数据

准备 BigQuery

要使用 BigQuery,你需要访问 Google Cloud 控制台并创建一个项目。完成后,你将被重定向到以下界面:

BigQuery 允许你从 Google Drive 和 Google Cloud Storage 传输数据,并支持上传本地文件。要将数据上传到 BigQuery,你首先需要创建一个数据集。创建一个并命名为 "server-logs",这样我们就可以上传一些文件。

在本文中,我们将上传一个包含不同类型文章的本地数据集。请查看 BigQuery 的官方文档,了解如何上传本地文件。

数据集

我们将上传到 BigQuery 的文件包含服务器日志数据,其中包含 HTTP 响应及其描述,文件格式为 ndjson。该 ndjson 文件包含以下字段:ip_address_timestamphttp_methodendpointstatus_coderesponse_timestatus_code_description

BigQuery 将从该文件中提取数据,然后我们将使用 Python 进行整理,并将其索引到 Elasticsearch。

创建一个名为 logs.ndjson 的文件,并填充以下内容:

{"ip_address": "192.168.1.3", "_timestamp": "2024-12-03T12:00:03Z", "http_method": "GET", "endpoint": "/about", "status_code": "404", "response_time": 89, "status_code_description": "The requested contact page does not exist or was removed."}
{"ip_address": "192.168.1.3", "_timestamp": "2024-12-03T12:00:07Z", "http_method": "GET", "endpoint": "/contact", "status_code": "404", "response_time": 76, "status_code_description": "The requested contact page does not exist or was removed."}
{"ip_address": "192.168.1.1", "_timestamp": "2024-12-03T12:00:01Z", "http_method": "GET", "endpoint": "/home", "status_code": "200", "response_time": 123, "status_code_description": "OK"}
{"ip_address": "192.168.1.1", "_timestamp": "2024-12-03T12:00:04Z", "http_method": "GET", "endpoint": "/products", "status_code": "200", "response_time": 156, "status_code_description": "OK"}
{"ip_address": "192.168.1.2", "_timestamp": "2024-12-03T12:00:05Z", "http_method": "GET", "endpoint": "/home", "status_code": "200", "response_time": 101, "status_code_description": "OK"}
{"ip_address": "192.168.1.2", "_timestamp": "2024-12-03T12:00:08Z", "http_method": "GET", "endpoint": "/home", "status_code": "200", "response_time": 98, "status_code_description": "OK"}
{"ip_address": "192.168.1.6", "_timestamp": "2024-12-03T12:00:10Z", "http_method": "GET", "endpoint": "/home", "status_code": "200", "response_time": 105, "status_code_description": "OK"}
{"ip_address": "192.168.1.2", "_timestamp": "2024-12-03T12:00:02Z", "http_method": "POST", "endpoint": "/login", "status_code": "500", "response_time": 340, "status_code_description": "Internal error while processing the payment gateway."}
{"ip_address": "192.168.1.5", "_timestamp": "2024-12-03T12:00:09Z", "http_method": "POST", "endpoint": "/payment", "status_code": "500", "response_time": 512, "status_code_description": "Internal error while processing the payment gateway."}
{"ip_address": "192.168.1.4", "_timestamp": "2024-12-03T12:00:06Z", "http_method": "POST", "endpoint": "/checkout", "status_code": "503", "response_time": 450, "status_code_description": "Service temporarily unavailable during the checkout process."}

我们将此文件上传到刚刚创建的数据集(显示为 "server_logs"),并使用 "logs" 作为表名(显示为 "table id")。

完成后,你的文件应如下所示:

配置 BigQuery Python 客户端

下面,我们将学习如何使用 BigQuery Python 客户端和 Google Colab 来构建一个应用。

1. 安装依赖

首先,我们需要安装以下依赖项:

!pip install google-cloud-bigquery elasticsearch==8.16

google-cloud-bigquery 依赖项提供了访问 BigQuery 数据所需的工具,elasticsearch 允许连接到 Elastic 并索引数据,而 getpass 可用于输入敏感变量而不在代码中暴露它们。

让我们导入所有必要的依赖项:

from elasticsearch import Elasticsearch, exceptions
from google.cloud import bigquery
from google.colab import auth
from getpass import getpass
from datetime import datetime
import json

我们还需要声明其他变量,并初始化 Elasticsearch 的 Python 客户端:

ELASTICSEARCH_ENDPOINT = getpass("Elasticsearch endpoint: ")
ELASTIC_API_KEY = getpass("Elastic Api Key: ")
# Google Cloud project name and BigQuery dataset name
PROJECT_ID = "elasticsearch-bigquery"
# dataset_id in format <your-project-name>.<your-dataset-name>
DATASET_ID = f'{PROJECT_ID}.server-logs'
# Elasticsearch client
es_client = Elasticsearch(ELASTICSEARCH_ENDPOINT,api_key=ELASTIC_API_KEY,
)

2. 身份验证

为了获取使用 BigQuery 所需的凭证,我们将使用认证。运行下面的命令行,并选择你用于创建 Google Cloud 项目的相同账户:

auth.authenticate_user()

现在,让我们查看 BigQuery 中的数据:

client = bigquery.Client(project=PROJECT_ID)
# Getting tables from dataset
tables = client.list_tables(DATASET_ID)
data = {}
for table in tables:# Table id must be in format <dataset_name>.<table_name>table_id = f"{DATASET_ID}.{table.table_id}"print(f"Processing table: {table.table_id}")# Query to retrieve BigQuery tables dataquery = f"""SELECT *FROM `{table_id}`"""query_job = client.query(query)results = query_job.result()print(f"Results for table: {table.table_id}:")data[table.table_id] = []for row in results:# Saving data with key=table_iddata[table.table_id].append(dict(row))print(row)# variable with data
logs_data = data['logs']

这应该是你看到的结果:

Processing table: logs
Results for table: logs:
Row(('The requested contact page does not exist or was removed.', 404, 'GET', '/about', datetime.datetime(2024, 12, 3, 12, 0, 3, tzinfo=datetime.timezone.utc), 89, '192.168.1.3'), {'status_code_description': 0, 'status_code': 1, 'http_method': 2, 'endpoint': 3, '_timestamp': 4, 'response_time': 5, 'ip_address': 6})
Row(('The requested contact page does not exist or was removed.', 404, 'GET', '/contact', datetime.datetime(2024, 12, 3, 12, 0, 7, tzinfo=datetime.timezone.utc), 76, '192.168.1.3'), {'status_code_description': 0, 'status_code': 1, 'http_method': 2, 'endpoint': 3, '_timestamp': 4, 'response_time': 5, 'ip_address': 6})
Row(('OK', 200, 'GET', '/home', datetime.datetime(2024, 12, 3, 12, 0, 1, tzinfo=datetime.timezone.utc), 123, '192.168.1.1'), {'status_code_description': 0, 'status_code': 1, 'http_method': 2, 'endpoint': 3, '_timestamp': 4, 'response_time': 5, 'ip_address': 6})
Row(('OK', 200, 'GET', '/products', datetime.datetime(2024, 12, 3, 12, 0, 4, tzinfo=datetime.timezone.utc), 156, '192.168.1.1'), {'status_code_description': 0, 'status_code': 1, 'http_method': 2, 'endpoint': 3, '_timestamp': 4, 'response_time': 5, 'ip_address': 6})
Row(('OK', 200, 'GET', '/home', datetime.datetime(2024, 12, 3, 12, 0, 5, tzinfo=datetime.timezone.utc), 101, '192.168.1.2'), {'status_code_description': 0, 'status_code': 1, 'http_method': 2, 'endpoint': 3, '_timestamp': 4, 'response_time': 5, 'ip_address': 6})
Row(('OK', 200, 'GET', '/home', datetime.datetime(2024, 12, 3, 12, 0, 8, tzinfo=datetime.timezone.utc), 98, '192.168.1.2'), {'status_code_description': 0, 'status_code': 1, 'http_method': 2, 'endpoint': 3, '_timestamp': 4, 'response_time': 5, 'ip_address': 6})
Row(('OK', 200, 'GET', '/home', datetime.datetime(2024, 12, 3, 12, 0, 10, tzinfo=datetime.timezone.utc), 105, '192.168.1.6'), {'status_code_description': 0, 'status_code': 1, 'http_method': 2, 'endpoint': 3, '_timestamp': 4, 'response_time': 5, 'ip_address': 6})
Row(('Internal error while processing the payment gateway.', 500, 'POST', '/login', datetime.datetime(2024, 12, 3, 12, 0, 2, tzinfo=datetime.timezone.utc), 340, '192.168.1.2'), {'status_code_description': 0, 'status_code': 1, 'http_method': 2, 'endpoint': 3, '_timestamp': 4, 'response_time': 5, 'ip_address': 6})
Row(('Internal error while processing the payment gateway.', 500, 'POST', '/payment', datetime.datetime(2024, 12, 3, 12, 0, 9, tzinfo=datetime.timezone.utc), 512, '192.168.1.5'), {'status_code_description': 0, 'status_code': 1, 'http_method': 2, 'endpoint': 3, '_timestamp': 4, 'response_time': 5, 'ip_address': 6})
Row(('Service temporarily unavailable during the checkout process.', 503, 'POST', '/checkout', datetime.datetime(2024, 12, 3, 12, 0, 6, tzinfo=datetime.timezone.utc), 450, '192.168.1.4'), {'status_code_description': 0, 'status_code': 1, 'http_method': 2, 'endpoint': 3, '_timestamp': 4, 'response_time': 5, 'ip_address': 6})

通过这段简单的代码,我们已经从 BigQuery 提取了数据,并将其存储在 logs_data 变量中,现在可以将其与 Elasticsearch 一起使用。

将数据索引到 Elasticsearch

我们将首先在 Kibana Devtools 控制台中定义数据结构:

es_client.indices.create(index="bigquery-logs",body={"mappings": {"properties": {"status_code_description": {"type": "match_only_text"},"status_code": {"type": "keyword"},"@timestamp": {"type": "date"},"ip_address": {"type": "ip"},"http_method": {"type": "keyword"},"endpoint": {"type": "keyword"},"response_time": {"type": "integer"},}}})

match_only_text 字段是 text 字段类型的一种变体,通过不存储用于计算分数的元数据来节省磁盘空间。我们使用它,因为日志通常是时间为中心的,即日期比文本字段中的匹配质量更为重要。使用 textfield 的查询与使用 match_only_text 字段的查询是兼容的。

我们将使用 Elasticsearch _bulk API 来索引这些文件:

 bulk_data = []for log_entry in logs_data:# Convert timestamp to ISO 8601 stringtimestamp_iso8601 = log_entry["_timestamp"].isoformat()# Prepare action metadataaction_metadata = {"index": {"_index": "bigquery-logs","_id": f"{log_entry['ip_address']}-{timestamp_iso8601}"}}# Prepare documentdocument = {"ip_address": log_entry["ip_address"],"status_code": log_entry["status_code"],"@timestamp": timestamp_iso8601,"http_method": log_entry["http_method"],"endpoint": log_entry["endpoint"],"response_time": log_entry["response_time"],"status_code_description": log_entry["status_code_description"]}# Append to bulk databulk_data.append(action_metadata)bulk_data.append(document)print(bulk_data)
# Indexing data
response = es_client.bulk(body=bulk_data)

搜索数据

现在,我们可以使用来自 bigquery-logs 索引的数据运行查询。

在这个例子中,我们将使用来自服务器的错误描述(status_code_description 字段)进行搜索。此外,我们将按日期对结果进行排序,并获取错误的 IP 地址:

es_client.search(index="bigquery-logs",body={"query": {"match": {"status_code_description": "error"}},"sort": [{"@timestamp": {"order": "desc"}}],"aggs": {"by_ip": {"terms": {"field": "ip_address", "size": 10}}},},
)

这是结果:

{..."hits": {..."hits": [{"_index": "bigquery-logs","_id": "192.168.1.5-2024-12-03T12:00:09+00:00","_score": null,"_source": {"ip_address": "192.168.1.5","status_code": 500,"@timestamp": "2024-12-03T12:00:09+00:00","http_method": "POST","endpoint": "/payment","response_time": 512,"status_code_description": "Internal error while processing the payment gateway."},"sort": [1733227209000]},{"_index": "bigquery-logs","_id": "192.168.1.2-2024-12-03T12:00:02+00:00","_score": null,"_source": {"ip_address": "192.168.1.2","status_code": 500,"@timestamp": "2024-12-03T12:00:02+00:00","http_method": "POST","endpoint": "/login","response_time": 340,"status_code_description": "Internal error while processing the payment gateway."},"sort": [1733227202000]}]},"aggregations": {"by_ip": {"doc_count_error_upper_bound": 0,"sum_other_doc_count": 0,"buckets": [{"key": "192.168.1.2","doc_count": 1},{"key": "192.168.1.5","doc_count": 1}]}}
}

结论

像 BigQuery 这样的工具有助于集中信息,对于数据管理非常有用。除了搜索,将 BigQuery 与 Elasticsearch 一起使用,可以利用机器学习和数据分析的强大功能,更简单、更快速地检测或分析问题。

想要获得 Elastic 认证吗?了解下一期 Elasticsearch 工程师培训的时间!

Elasticsearch 提供了许多新功能,帮助你为自己的使用案例构建最佳的搜索解决方案。深入了解我们的示例笔记本,了解更多内容,开始免费的云试用,或者现在就尝试在本地机器上使用 Elastic。

原文:Ingesting data with BigQuery - Elasticsearch Labs

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词