在 Google App Engine (GAE) 中,如果你希望将数据上传到 Datastore 或 Cloud Datastore,而不使用 Bulkloader
,你可以通过使用 Google Cloud SDK 或 App Engine 的 Python API 来进行数据的上传。这里有一些方法和步骤,帮助你在不使用 Bulkloader
的情况下将数据上传到 GAE。
1、问题背景
用户想上传大量数据到谷歌应用引擎 (GAE),但又不想使用 Bulkloader。在 GAE SDK 1.3.4 之前,可以使用 appcfg.py download_data
和 appcfg.py upload_data
命令来完成这个任务,但从 1.3.4 版本开始,这两个命令已被弃用。因此,需要寻找其他的方法来实现。
2、解决方案
可以使用 Bulkloader API 来实现数据上传。Bulkloader API 是一个用于将数据批量加载到 GAE 的库。它提供了多种方式来加载数据,包括使用命令行工具、Python API 和 Java API。
(1) 使用命令行工具
a. 安装 Bulkloader
pip install google-cloud-datastore-bulkloader
b. 准备数据文件
将数据导出成 CSV 文件或 JSON 文件,并将其保存在本地计算机上。
c. 运行 Bulkloader 命令
bulkloader load --dataset_id=YOUR_DATASET_ID --input_file=YOUR_DATA_FILE
其中:
YOUR_DATASET_ID
是要加载数据到的数据集的 ID。YOUR_DATA_FILE
是要加载的数据文件。(2) 使用 Python API
from google.cloud import datastore_v1
# 创建 Bulkloader 客户端。
bulkloader = datastore_v1.DatastoreAdminClient()
# 创建数据集。
dataset = datastore_v1.types.Dataset(
name=datastore_v1.DatastoreAdminClient.dataset_path(
"YOUR_PROJECT_ID", "YOUR_DATASET_ID"
)
)
bulkloader.create_dataset(request={"dataset": dataset})
# 准备数据文件。
data_file = "path/to/your/data/file.csv"
# 加载数据。
operation = bulkloader.load_data(
request={
"parent": datastore_v1.DatastoreAdminClient.dataset_path(
"YOUR_PROJECT_ID", "YOUR_DATASET_ID"
),
"input_url": f"gs://{bucket_name}/{file_name}",
"name": "YOUR_OPERATION_NAME",
}
)
# 等待加载操作完成。
operation.result(300)
(3) 使用 Java API
import com.google.cloud.datastore.bulkloader.v1.BulkLoaderServiceClient;
import com.google.cloud.datastore.bulkloader.v1.DataImport;
import com.google.cloud.datastore.bulkloader.v1.DataImport.DataFile;
import com.google.cloud.datastore.bulkloader.v1.DataImport.EntityFilter;
import com.google.cloud.datastore.bulkloader.v1.ImportEntitiesRequest;
import com.google.cloud.datastore.bulkloader.v1.OperationMetadata;
import com.google.datastore.v1.DatastoreName;
import com.google.datastore.v1.Key;
import com.google.protobuf.ByteString;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class ImportData {
public static void main(String[] args)
throws IOException, NoSuchAlgorithmException, ExecutionException, InterruptedException, TimeoutException {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String datasetId = "your-dataset-id";
String gcsSourceUri = "gs://source-bucket/source.csv";
importData(projectId, datasetId, gcsSourceUri);
}
// Imports data to the specified dataset.
public static void importData(String projectId, String datasetId, String gcsSourceUri)
throws IOException, NoSuchAlgorithmException, ExecutionException, InterruptedException, TimeoutException {
try (BulkLoaderServiceClient client = BulkLoaderServiceClient.create()) {
String datasetName = DatastoreName.of(projectId, datasetId).toString();
// Data source information.
DataFile dataFile = DataFile.newBuilder().setGcsSource(gcsSourceUri).build();
// Import with an empty filter imports all entities in the file.
EntityFilter entityFilter = EntityFilter.newBuilder().build();
// Compute the hash of the source data for the DataImport.
String hash = computeHash(Paths.get(gcsSourceUri).toFile());
DataImport dataImport =
DataImport.newBuilder()
.setGcsSource(gcsSourceUri)
.setEntityFilter(entityFilter)
.setSourceDataHash(ByteString.copyFromUtf8(hash))
.build();
ImportEntitiesRequest request =
ImportEntitiesRequest.newBuilder()
.setParent(datasetName)
.setDataImport(dataImport)
.build();
// Start an import job.
OperationMetadata operation = client.importEntitiesAsync(request).get(10, TimeUnit.MINUTES);
// Block until the operation completes.
System.out.println("Waiting for operation to complete: " + operation.getName());
operation = client.getOperation(operation.getName());
// Check the status of the import job.
if (!operation.getDone()) {
System.out.println("Import job failed: " + operation.getErrorMessage());
return;
}
System.out.println("Import job completed successfully.");
}
}
// Computes the MD5 hash of the data in the file.
private static String computeHash(java.io.File file) throws IOException, NoSuchAlgorithmException {
MessageDigest md = MessageDigest.getInstance("MD5");
try (FileInputStream fis = new FileInputStream(file)) {
byte[] buffer = new byte[8192];
int length;
while ((length = fis.read(buffer)) != -1) {
md.update(buffer, 0, length);
}
}
return toHex(md.digest());
}
// Converts a byte array to a hex string.
private static String toHex(byte[] bytes) {
StringBuilder sb = new StringBuilder();
for (byte b : bytes) {
sb.append(String.format("%02x", b & 0xFF));
}
return sb.toString();
}
}
无论使用哪种方法,在加载数据之前,都需要先创建一个数据集。如果数据集已经存在,则可以跳过这一步。
使用 Bulkloader API 加载数据时,需要注意以下几点:
__key__
的列,该列的值是实体的键。__property__
的列,该列的值是实体的属性。如果满足以上几点要求,则可以使用 Bulkloader API 将数据加载到 GAE。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。