Google's BigQuery Storage API can read from temporary tables created by basic queries involving only SELECT, FROM, and WHERE.
What I am seeing is that when you retrieve an ordered set of rows using an ORDER BY clause, the temporary table created by the query is not readable by the BigQuery Storage API.
Consider the following code example.
Take this query:
sql = """SELECT name FROM `bigquery-public-data.usa_names.usa_1910_current` LIMIT 1000"""
If you run it with the following BigQuery Python code:
from google.cloud import bigquery

bq_client = bigquery.Client("myproject")  # << Change to your project
query_job = bq_client.query(
    sql,
    location='US')
project_id = query_job.destination.project
dataset_id = query_job.destination.dataset_id
table_id = query_job.destination.table_id
print("Destination table: " + project_id + "." + dataset_id + "." + table_id)
…then you get the destination table.
From here, you can pass this destination table to the BigQuery Storage API to fetch the results via RPC:
client = bigquery_storage_v1beta1.BigQueryStorageClient()
table_ref = bigquery_storage_v1beta1.types.TableReference()
table_ref.project_id = project_id
table_ref.dataset_id = dataset_id
table_ref.table_id = table_id
read_options = bigquery_storage_v1beta1.types.TableReadOptions()
read_options.selected_fields.append("name")
parent = "projects/{}".format(project_id)
session = client.create_read_session(
    table_ref, parent, read_options=read_options
)  # API request.
reader = client.read_rows(
    bigquery_storage_v1beta1.types.StreamPosition(stream=session.streams[0])
)
rows = reader.rows(session)
This works fine.
Now change the query in sql = <yourquery> to
sql = """SELECT name FROM `bigquery-public-data.usa_names.usa_1910_current` ORDER BY name ASC LIMIT 1000"""
and you will get the following error from the BigQuery Storage API portion of the code:
Table 'myproject:mydataset.temptable' has a storage format that is not supported.
This implies that the ORDER BY clause adds some form of complexity to the query that makes the temporary table unreadable by the Storage API.
Questions: 1) Any ideas on how to work around this, or is this a genuine limitation of the Storage API at present? 2) If ORDER BY causes the problem, what is the full range of queries that create temporary tables unreadable by the Storage API?
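One possible workaround is to write the query results to an explicit destination table instead of the anonymous temporary table, so the Storage API reads a regular table. This is a sketch under assumptions: the `run_ordered_query` helper and all table names here are hypothetical, not from the question.

```python
def table_path(project_id, dataset_id, table_id):
    # Fully-qualified resource name in the form the Storage API expects.
    return "projects/{}/datasets/{}/tables/{}".format(
        project_id, dataset_id, table_id
    )

def run_ordered_query(project_id, dataset_id, table_id, sql):
    # Requires google-cloud-bigquery; imported lazily so table_path()
    # stays usable without the library installed.
    from google.cloud import bigquery

    # Direct the results to an explicit table rather than letting
    # BigQuery cache them in an anonymous temporary table.
    job_config = bigquery.QueryJobConfig(
        destination="{}.{}.{}".format(project_id, dataset_id, table_id)
    )
    client = bigquery.Client(project=project_id)
    client.query(sql, job_config=job_config).result()  # wait for completion
    return table_path(project_id, dataset_id, table_id)
```

The returned path can then be fed to the Storage API read-session code above in place of the temporary table.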
Posted on 2021-07-16 10:48:57
We can use bigquery_storage.BigQueryReadClient to read from temporary tables created by queries with ORDER BY, JOIN, and so on. Below is working code; I created the temporary table using a JOIN.
from google.cloud import bigquery
from google.cloud.bigquery_storage import BigQueryReadClient
from google.cloud.bigquery_storage import types

# `credentials` and `your_project_id` are assumed to be defined earlier.
bqclient = bigquery.Client(credentials=credentials, project=your_project_id)
client = BigQueryReadClient(credentials=credentials)
try:
    import fastavro
except ImportError:
    fastavro = None
sql = """SELECT s.id, s.name, d.dept FROM `sbx-test.EMP.emp01` s JOIN `sbx-test.EMP.dept` d
ON s.id = d.id"""
query_job = bqclient.query(sql)
project_id = query_job.destination.project
dataset_id = query_job.destination.dataset_id
table_id = query_job.destination.table_id
table = "projects/{}/datasets/{}/tables/{}".format(
    project_id, dataset_id, table_id
)
requested_session = types.ReadSession()
requested_session.table = table
requested_session.data_format = types.DataFormat.AVRO
requested_session.read_options.selected_fields = ["name", "dept"]
parent = "projects/{}".format(project_id)
session = client.create_read_session(
    parent=parent,
    read_session=requested_session,
    max_stream_count=1,
)
reader = client.read_rows(session.streams[0].name)
rows = reader.rows(session)
names = set()
depts = set()
for row in rows:
    names.add(row["name"])
    depts.add(row["dept"])
print("Got unique employees {} and departments {}".format(names, depts))

https://stackoverflow.com/questions/56199879