To write Pub/Sub data to Google Cloud Storage (GCS) through a streaming pipeline, you can use Google Cloud Dataflow. The following Python example, built with the Apache Beam SDK, shows how to read messages from Pub/Sub and write them to GCS.
import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions, WorkerOptions
from apache_beam.io import fileio
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.transforms.window import FixedWindows


class ParseMessage(beam.DoFn):
    def process(self, element):
        # Messages are assumed to be UTF-8 encoded JSON.
        record = json.loads(element.decode('utf-8'))
        # Re-serialize so the file sink receives newline-delimited strings.
        yield json.dumps(record)


def run(project_id, pubsub_topic, output_path, service_account_email):
    options = PipelineOptions()
    # Pub/Sub is an unbounded source, so the pipeline must run in streaming mode.
    options.view_as(StandardOptions).streaming = True

    gcp_options = options.view_as(GoogleCloudOptions)
    gcp_options.project = project_id
    gcp_options.region = 'us-central1'  # or the region of your choice
    gcp_options.job_name = 'pubsub-to-gcs'
    # The worker service account's email address (not a key file path).
    gcp_options.service_account_email = service_account_email

    # Workers without public IPs require Private Google Access on the subnetwork.
    options.view_as(WorkerOptions).use_public_ips = False

    with beam.Pipeline(options=options) as p:
        (p
         | 'Read from PubSub' >> ReadFromPubSub(topic=pubsub_topic)
         | 'Parse JSON' >> beam.ParDo(ParseMessage())
         # Unbounded data must be windowed before it can be written to files.
         | 'Window' >> beam.WindowInto(FixedWindows(60))
         | 'Write to GCS' >> fileio.WriteToFiles(
             path=output_path,
             sink=lambda dest: fileio.TextSink()))


if __name__ == '__main__':
    project_id = 'your-project-id'
    pubsub_topic = 'projects/your-project-id/topics/your-topic-name'
    output_path = 'gs://your-bucket-name/output/'
    service_account_email = 'your-service-account@your-project-id.iam.gserviceaccount.com'
    run(project_id, pubsub_topic, output_path, service_account_email)
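The example above never sets a runner, so by default it executes with the local DirectRunner. To submit it to the Dataflow service you would additionally set the runner and the GCS staging locations; a minimal sketch is shown below (the bucket paths are placeholders, not values from the original example):

from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions

options = PipelineOptions()
# Submit the job to the Dataflow service instead of running it locally.
options.view_as(StandardOptions).runner = 'DataflowRunner'
options.view_as(StandardOptions).streaming = True

gcp_options = options.view_as(GoogleCloudOptions)
gcp_options.project = 'your-project-id'
gcp_options.region = 'us-central1'
# Dataflow needs GCS locations for staging the job and for temporary files.
gcp_options.staging_location = 'gs://your-bucket-name/staging/'
gcp_options.temp_location = 'gs://your-bucket-name/temp/'

The equivalent flags can also be passed on the command line (for example --runner=DataflowRunner --temp_location=gs://your-bucket-name/temp/), since PipelineOptions parses sys.argv when no explicit flags are given.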
You can adapt the message parsing logic in the ParseMessage class to match the format of your own messages.
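As a hypothetical illustration of such an adaptation (the field names user_id and event are invented for this sketch and not part of the original example), a variant that keeps only selected fields and silently drops malformed messages could look like this:

import json
import apache_beam as beam

class ParseAndFilterMessage(beam.DoFn):
    def process(self, element):
        try:
            record = json.loads(element.decode('utf-8'))
        except (UnicodeDecodeError, json.JSONDecodeError):
            # Skip messages that are not valid UTF-8 encoded JSON.
            return
        # Keep only the fields of interest (hypothetical field names).
        yield json.dumps({'user_id': record.get('user_id'),
                          'event': record.get('event')})

You would then use beam.ParDo(ParseAndFilterMessage()) in place of the 'Parse JSON' step in the pipeline above.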