首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

谷歌数据流作业从Pubsub读取并写入GCS的速度非常慢,(WriteFiles/WriteShardedBundlesToTempFiles/GroupIntoShards)耗时太长

谷歌数据流作业从Pubsub读取并写入GCS的速度非常慢,主要是由于以下几个原因导致的:

  1. 网络延迟:数据流作业需要从Pubsub读取数据,并将其写入GCS,这涉及到网络通信。如果网络延迟较高,数据传输速度就会变慢。
  2. 数据量过大:如果数据量非常大,写入GCS的过程可能会变得缓慢。这可能是由于数据流作业在将数据写入GCS时,需要将数据分片并写入多个文件中,这个过程可能会耗费较长的时间。
  3. 数据处理逻辑复杂:数据流作业可能需要对数据进行一些处理,例如分组、排序、聚合等操作。如果数据处理逻辑较为复杂,耗时就会增加。

针对这个问题,可以采取以下措施来改善速度:

  1. 优化网络连接:确保网络连接稳定,并且减少网络延迟。可以选择就近的数据中心进行数据传输,或者使用专用的网络加速服务。
  2. 数据分片优化:对于大数据量的情况,可以考虑优化数据分片的策略,使得每个分片的数据量适中,避免某些分片过大导致写入速度变慢。
  3. 简化数据处理逻辑:如果可能的话,可以尝试简化数据处理逻辑,减少不必要的计算操作,从而提高处理速度。
  4. 调整作业配置:根据实际情况,可以调整数据流作业的配置参数,例如并行度、资源分配等,以优化作业的执行效率。

对于谷歌云平台的相关产品和服务,可以考虑使用以下产品来改善数据流作业的速度:

  1. Pubsub:谷歌云平台提供的消息传递服务,可以实现高可靠、可扩展的消息传递。可以使用Pubsub来优化数据的读取过程。
  2. GCS:谷歌云存储服务,提供了高可靠性、高可扩展性的对象存储。可以使用GCS来优化数据的写入过程。
  3. 数据流:谷歌云平台提供的大数据处理服务,可以实现实时数据处理和批量数据处理。可以使用数据流来处理数据,并将结果写入GCS。

具体的产品介绍和使用方法,请参考谷歌云平台官方文档:谷歌云产品文档

页面内容是否对你有帮助?
有帮助
没帮助

相关·内容

弃用 Lambda,Twitter 启用 Kafka 和数据流新架构

谷歌云上,我们使用流数据流作业,对重复数据进行处理,然后进行实时聚合并将数据汇入 BigTable。...事件处理器处理向 Pubsub 事件表示法转换,生成由 UUID 和其他与处理背景相关元信息组成事件背景。UUID 被下游数据流工作器用来进行重复数据删除。...在新 Pubsub 代表事件被创建后,事件处理器会将事件发送到谷歌 Pubsub 主题。 在谷歌云上,我们使用一个建立在谷歌 Dataflow 上 Twitter 内部框架进行实时聚合。...我们通过同时将数据写入 BigQuery 连续查询重复百分比,结果表明了高重复数据删除准确性,如下所述。最后,向 Bigtable 中写入包含查询键聚合计数。...第一步,我们创建了一个单独数据流管道,将重复数据删除前原始事件直接 Pubsub 导出到 BigQuery。然后,我们创建了用于连续时间查询计数预定查询。

1.7K20

云端迁移 - Evernote 基于Google 云平台架构设计和技术转型(上)

Reco 服务(UDP -> PubSub) 当用户向Evernote添加附件或者参考资料时候,如果是PDF 或者图片的话,GCP会尝试读取器中文本信息。...因此,我们将应用程序重新设计为具有不同通信体系结构。 我们重新构建了应用程序,删除了跟踪作业必要性,通过附件来广播NoteStores状态以识别。...每个Reco服务器通过简单地订阅特定PubSub队列确认他们何时完成资源上识别作业方式处理新添加到队列上内容。...考虑到我们需要复制数据量很大,我们立即在后台启动这个海量数据复制工作。 该服务目前(2月14日)仍在读取写入现有的WebDav服务器场,而我们在后台将资源复制到他们新家。...将应用升级迁移至GCS 最后,我们需要考虑如何更新我们应用程序代码,以使用GCS读取写入资源,而不是WebDav。 我们决定添加多个开关,允许打开和关闭特定GCS读/写功能。

2.5K110
  • 作业帮基于 Delta Lake 湖仓一体实践

    问题 & 痛点 作业帮离线数仓基于 Hive 提供 ODS 层到 ADS 层数据构建能力,当 ADS 表生成后,会通过数据集成写入 OLAP 系统面向管理人员提供 BI 服务;此外,DWD、DWS...当 Spark 读取某一个 batch 数据后,根据上述表元数据使用数据中 event time 生成对应 dt 值,如数据流中 event time 值均属于 T+1,则会触发生成数据版本 T...即写入 Delta Lake spark 某个 topic 读取到逻辑表数据是 partition 粒度有序。...通过上述方案,我们将 binlog 数据流写入 Delta Lake 中,且表分区就绪时间延迟<10mins。...查询速度提升:我们重点提升分析师即席查询效率,通过将分析师常用数仓表迁移到 Delta Lake 之后,利用 Zorder 实现了查询加速,查询速度过去数十分钟降低到~3mins。

    73330

    使用NiFi每秒处理十亿个事件

    由于GCS Bucket不提供排队机制,因此NiFi负责使数据集群友好。为此,我们仅在单个节点(主节点)上执行列表。然后,我们将该列表分布在整个集群中,允许集群中所有节点同时GCS中提取。...因此,我们将单个1 TB卷用于内容存储库,以确保最佳性能(写入速度为400 MB /秒,读取速度为1,200 MB /秒)。...在这里,我们看到随着读取记录数减少,写入记录数增加,反之亦然。因此,我们确保在观察统计信息时,仅考虑同时处理小消息和大消息时间段。为此,我们选择时间窗口,其中“记录读取数”达到最高点和最低点。...内容存储库是1 TB持久性SSD(写入400 MB /秒,读取1200 MB /秒)。 可扩展性 尽管了解系统性能特征很重要,但是在某个点上,数据速率太高,单个节点无法跟上。...这是关于您改变行为以抓住新机会速度。这就是为什么我们努力提供如此丰富用户体验来构建这些数据流原因。实际上,该数据流仅花费了大约15分钟即可构建,并且可以随时动态更改。

    3K30

    基于Apache Parquet™更细粒度加密方法

    摄取元存储具有所有元数据,包括摄取管道作业中所需标记信息。当作业从上游摄取数据集时,相关元数据会摄取元存储中提取到作业中。 数据集被写入文件存储系统。...实际上,还有其他几个变量: 文件读取写入时间并不是影响用户查询或 ETL 作业持续时间唯一因素,因此就每个用户查询或 ETL 作业开销而言,博客中数字与真实用户场景相差甚远。...开销被评估为“增加时间”与 Spark 作业总持续时间,我们认为这是更接近真实用户场景评估。 基准测试工作一个挑战是读取写入文件存储延迟不固定。...读取写入平均开销计算如下: 写入开销: image.png 读取开销 image.png 在我们评估中,我们选择了 Java 8 和 CTR 模式加密了 60% 列。...需要指出两点:1) 60% 加密列通常超过实际需要加密百分比,2) 真实用户查询或 ETL 除了读取写入文件之外还有很多其他任务 (例如,表连接、数据混洗)更耗时

    1.9K30

    Hadoop 数据压缩简介

    文件压缩带来两大好处:它减少了存储文件所需空间,加速了数据在网络或者磁盘上传输速度。在处理大量数据时,这两项节省可能非常重要,因此需要仔细考虑如何在 Hadoop 中使用压缩。 1....1.1 压缩输入文件 如果输入文件是压缩,那么HDFS读入字节就会减少,这意味着读取数据时间会减少。对于提升作业执行性能是有帮助。...1.3 压缩Map输出 即使你 MapReduce 应用程序读取写入未压缩数据,它也可能从压缩 Map 阶段中间输出中受益。...由于 Map 输出被写入磁盘通过网络传输到 Reducer 节点,所以通过使用 LZO 或 Snappy 等快速压缩器,由于减少了传输数据量从而获得性能提升。 2. 常见压缩格式 ?...然而,无法为每个块创建 InputSplit,因为不能从 gzip 数据流任意位置开始读取,因此 Map 任务不可能独立于其他 Map 任务而只读取一个 InputSplit 中数据。

    1.6K20

    基于Alluxio系统Spark DataFrame高效存储管理技术

    Alluxio和Spark缓存 用户使用Alluxio存储Spark DataFrame非常简单:通过Spark DataFrame write API将DataFrame作为一个文件写入Alluxio...然而,随着DataFrame数据规模增长,Alluxio中读取DataFrame性能更好,因为Alluxio中读取DataFrame耗时几乎始终随着数据规模线性增长。...由于使用Alluxio存储DataFrame读写性能具有较好线性可扩展性,上层应用可以稳定地以内存速度处理更大规模数据。...当一个DataFrame文件被写入Alluxio后,它可以被不同作业、SparkContext、甚至不同计算框架共享。...由于共有云存储系统网络访问性能不可预测性,最慢Spark作业执行时间超过1700秒, 比平均2倍。然而,当使用Alluxio时,最慢Spark作业执行时间大约比平均时间只6秒。

    1K100

    【流计算 Oceanus】巧用 Flink 实现高性能 ClickHouse 实时数仓

    Development-Slow.png 如果不加约束,大家都从原始数据源来读取数据分析,一方面对原始数据源压力非常大(同时承担着各类业务写请求、读请求),另一方面分析链路难以复用,最终会形成重复开发...第三个因素是数据源繁杂,组件和格式众多,接入起来耗时长: [数据源多样 接入] 例如一个业务可能用 Kafka 来承接 SDK 中上报各类点击流数据,又使用 HBase 等 KV 系统来存储维表信息...如果宽表字段设计合理,内容足够丰富的话,可以大大缓解开发问题。此外,还可以导出数据接口,以供其他业务部门对接。 汇总层数据是明细层和维度层关联而来。...= '' GROUP BY Phone, Model ORDER BY u DESC LIMIT 10; 下图是 1 亿条数据量级下,ClickHouse 与多种常见数据处理系统查询速度对比图(数字越小代表耗时越短...对于实时链路而言,它可以在一定程度上代替 Kafka 等传统流式数据管道;对于需要读取中间层数据等特殊需求,又可以使用常见批处理分析工具来直接分析 Iceberg 数据文件,非常便捷。

    4.9K92

    基于Alluxio系统Spark DataFrame高效存储管理技术

    Alluxio和Spark缓存 用户使用Alluxio存储Spark DataFrame非常简单:通过Spark DataFrame write API将DataFrame作为一个文件写入Alluxio...然而,随着DataFrame数据规模增长,Alluxio中读取DataFrame性能更好,因为Alluxio中读取DataFrame耗时几乎始终随着数据规模线性增长。...由于使用Alluxio存储DataFrame读写性能具有较好线性可扩展性,上层应用可以稳定地以内存速度处理更大规模数据。...当一个DataFrame文件被写入Alluxio后,它可以被不同作业、SparkContext、甚至不同计算框架共享。...由于共有云存储系统网络访问性能不可预测性,最慢Spark作业执行时间超过1700秒, 比平均2倍。然而,当使用Alluxio时,最慢Spark作业执行时间大约比平均时间只6秒。

    1.1K50

    腾讯云 Oceanus 在 MySQL CDC Connector 核心优化

    引言Apache Flink 作为流计算引擎,需要持续从上游接收数据流,并向下游输出最新计算结果。...Connector 起到承上启下作用:Source 负责与上游 MQ、数据库等源表对接,Sink 则写入各类数据库、数仓、数据湖等目的表。...监控数据上来看,这段时间完全没有任何数据输出,但是 Flink 作业运行一切正常,让用户非常困惑。...既然瓶颈在这里,我们也对其算法做了优化,通过利用局部有序性原理,采用二分方式查找边界,将时间复杂度 O(N) 优化到 O(logN),后续观察到,该阶段耗时减少了 80%.增量数据同步性能优化问题背景当...MySQL CDC Source 进入纯增量阶段后,仍然可能会遇到性能瓶颈:由于 Binlog 读取是单线程,如果遇到大表消费场景,并不能简单通过扩容并行度来解决。

    1.1K40

    流计算 Oceanus | 巧用 Flink 构建高性能 ClickHouse 实时数仓

    场景多 开发 如果不加约束,大家都从原始数据源来读取数据分析,一方面对原始数据源压力非常大(同时承担着各类业务写请求、读请求),另一方面分析链路难以复用,最终会形成重复开发、各自为政 “烟囱模式...第三个因素是数据源繁杂,组件和格式众多,接入起来耗时长: 数据源多样 接入 例如一个业务可能用 Kafka 来承接 SDK 中上报各类点击流数据,又使用 HBase 等 KV 系统来存储维表信息...如果宽表字段设计合理,内容足够丰富的话,可以大大缓解开发问题。此外,还可以导出数据接口,以供其他业务部门对接。 汇总层数据是明细层和维度层关联而来。...= '' GROUP BY Phone, Model ORDER BY u DESC LIMIT 10; 下图是 1 亿条数据量级下,ClickHouse 与多种常见数据处理系统查询速度对比图(数字越小代表耗时越短...对于实时链路而言,它可以在一定程度上代替 Kafka 等传统流式数据管道;对于需要读取中间层数据等特殊需求,又可以使用常见批处理分析工具来直接分析 Iceberg 数据文件,非常便捷。

    88830

    流计算Oceanus | 巧用Flink构建高性能ClickHouse实时数仓

    场景多 开发 如果不加约束,大家都从原始数据源来读取数据分析,一方面对原始数据源压力非常大(同时承担着各类业务写请求、读请求),另一方面分析链路难以复用,最终会形成重复开发、各自为政“烟囱模式...第三个因素是数据源繁杂,组件和格式众多,接入起来耗时长: 数据源多样 接入 例如一个业务可能用Kafka来承接SDK中上报各类点击流数据,又使用HBase等KV系统来存储维表信息,还使用传统MySQL...如果宽表字段设计合理,内容足够丰富的话,可以大大缓解开发问题。此外,还可以导出数据接口,以供其他业务部门对接。 汇总层数据是明细层和维度层关联而来。...= ''GROUP BY Phone, ModelORDER BY u DESCLIMIT 10; 下图是1亿条数据量级下,ClickHouse与多种常见数据处理系统查询速度对比图(数字越小代表耗时越短...对于实时链路而言,它可以在一定程度上代替Kafka等传统流式数据管道;对于需要读取中间层数据等特殊需求,又可以使用常见批处理分析工具来直接分析Iceberg数据文件,非常便捷。

    74430

    Firestorm - 腾讯自研Remote Shuffle Service在Spark云原生场景实践

    Shuffle失败导致任务陷入重试,严重拖作业。...量)作业非常难以顺利跑过,这里面的问题有: shuffle数据非常容易将磁盘写满。...百度内部MR作业已经改造接入DCE shuffle使用多年,现在Spark批处理作业也已经改造使用DCE shuffle做为其shuffle引擎。...PartitionId)获取存储路径,将Shuffle数据写入Index文件和Data文件中 Task写入完成后,告知Shuffle Server任务已完成获取当前所有任务完成数,假如任务完成数小于预期值...,耗时非常长,甚至达到了2分钟,而Remote Shuffle Service由于读取时降低了网络开销,且读取是整块Shuffle数据,所以耗时短且较为稳定。

    3.1K30

    腾讯云大数据流计算 Oceanus 在 MySQL CDC Connector 核心优化

    引言:Apache Flink 作为流计算引擎,需要持续从上游接收数据流,并向下游输出最新计算结果。...Connector 起到承上启下作用:Source 负责与上游 MQ、数据库等源表对接,Sink 则写入各类数据库、数仓、数据湖等目的表。...监控数据上来看,这段时间完全没有任何数据输出,但是 Flink 作业运行一切正常,让用户非常困惑。...既然瓶颈在这里,我们也对其算法做了优化,通过利用局部有序性原理,采用二分方式查找边界,将时间复杂度 O(N) 优化到 O(logN),后续观察到,该阶段耗时减少了 80%....增量数据同步性能优化 问题背景 当 MySQL CDC Source 进入纯增量阶段后,仍然可能会遇到性能瓶颈:由于 Binlog 读取是单线程,如果遇到大表消费场景,并不能简单通过扩容并行度来解决

    1K40

    业内首个基于Iceberg“云端仓转湖”生产实践探索

    如果可以通过类似Hive sql构建准实时数据,业务可以更容易获得高新鲜度数据进行分析。 业务查数速度,分析效率低。...: 数据流转批:数据湖仓共存,以减少对上层业务使用影响。...Iceberg表格式迁移工具不支持分区目录子目录结构,导致在生成manifest元数据时会丢失大量数据文件记录。 作业Hive表存量数据非常大,甚至有PB级别。...社区版迁移工具文件处理能力有限,针对这种超大规模表迁移耗时非常长。 针对这些云上特定场景,腾讯云EMR对Iceberg迁移工具进行了兼容适配和优化: 兼容归档存储类型。...、可靠性不易实现、开发维护成本高,离线数据增量计算支持弱、查询速度、数据时效性低,离线实时对数难等问题。

    88110

    大规模运行 Apache Airflow 经验和教训

    在 Shopify 中,我们利用谷歌云存储(Google Cloud Storage,GCS)来存储 DAG。...然而,由于我们允许用户自己项目中部署工作负载(甚至在部署时动态生成作业),这就变得更加困难。...为了创建一些基本“护栏”,我们采用了一个 DAG 策略,它从之前提到 Airflow 清单中读取配置,通过引发 AirflowClusterPolicyViolation 来拒绝那些不符合其命名空间约束...下面是一个简化例子,演示如何创建一个 DAG 策略,该策略读取先前共享清单文件,实现上述前三项控制: airflow_local_settings.py:...一个集中元数据存储库可以用来跟踪 DAG 来源和所有权。 DAG 策略对于执行作业标准和限制是非常。 标准化计划生成可以减少或消除流量激增。

    2.7K20

    PySpark SQL 相关知识介绍

    每时每刻都在收集大量数据。这意味着数据速度在增加。一个系统如何处理这个速度?当必须实时分析大量流入数据时,问题就变得复杂了。许多系统正在开发,以处理这种巨大数据流入。...在每个Hadoop作业结束时,MapReduce将数据保存到HDFS并为下一个作业再次读取数据。我们知道,将数据读入和写入文件是代价高昂活动。...除了执行HiveQL查询,您还可以直接Hive读取数据到PySpark SQL并将结果写入Hive 相关链接: https://cwiki.apache.org/confluence/display...Kafka Broker不会将消息推送给Consumer;相反,ConsumerKafka Broker中提取数据。Consumer订阅Kafka Broker上一个或多个主题,读取消息。...使用PySpark SQL,我们可以MongoDB读取数据执行分析。我们也可以写出结果。

    3.9K40

    通过优化 S3 读取来提高效率和减少运行时间

    结果非常令人鼓舞。单独基准测试显示,S3 读取吞吐量提高了 12 倍( 21MB/s 提高到 269MB/s)。吞吐量提高可以缩短生产作业运行时间。...这样速度要比 aws s3 cp 这类命令吞吐量几个数量级,后者速度达到 200+MB/s 都很常见(在 EC2 c5.4xlarge 实例上观测结果)。...如果我们可以提高作业读取数据速度,那么作业就可以更快完成,为我们节省相当多处理时间和金钱。鉴于处理成本很高,节省时间和金钱可以迅速增加到一个可观数量。...单独基准测试 图 2:S3A 和 S3E 吞吐量对比 * 在每种情况下,我们都是顺序读取一个 3.5GB S3 文件,并将其写入本地一个临时文件。...后半部分是为了模拟 mapper 操作期间发生 IO 重叠。基准测试是在 EC2 c5.9xlarge 实例上进行。我们测量了读取文件总时间,计算每种方法有效吞吐量。

    59930

    【Go】使用压缩文件优化io (一)

    先想一下原始需求,读取原始文件 -> 上传数据。但是直接上传原始文件,文件比较大,网络传输,而且存储费用也比较高,怎么办呢?...如果我们压缩文件数据流,在 读取原始文件 -> 上传数据 流程中对上传数据流进行实时压缩,把压缩内容给上传了,实现边读边压缩,对数据流进行处理,像是一个中间件,这样就不用写 lzo 文件了,那么 w_await...优化后 根据之前分析看一下优化之后备份文件需要哪些过程: 读取原始日志 在内存中压缩数据流 http 发送压缩后内容 这个流程节省了两个步骤,写入 lzo 文件和 读取 lzo 文件,不仅没有 w_await...发现这个库实现了 io.Reader 和 io.Writer 接口,io.Reader 读取压缩文件流,输出解压缩数据,io.Writer 实现输入原始数据,写入到输入 io.Writer。...实现原理当 http 输入 io.Reader (实际就是我们上面封装 lzo 库), 读取数据时,这个库检查压缩缓冲是否为空,为空情况会文件读取 256k 数据压缩输入到压缩缓冲中,然后压缩缓冲读取数据给

    1.2K50

    工作还是游戏?程序员:我选择边玩游戏边工作!

    本文将介绍电竞数据平台FunData架构演进中设计思路及相关技术,包括大数据流处理方案、结构化存储转非结构化存储方案和数据API服务设计等。...(全球一天DOTA2比赛场次在120万场,录像分析相对耗时,串行处理会导致任务堆积严重); 数据分布式存储; 系统解耦,各节点可优雅重启与更新。...联赛数据分析模块负责录像文件拉取(Salt、Meta文件与Replay文件获取)与比赛基本数据分析; 联赛录像分析模块负责比赛录像解析并将分析后数据推送至PubSub; 分析/挖掘数据DB代理负责接收录像分析数据批量写入...图7 一致性hash构建RowKey 时间戳使用方便我们在聚合数据时对同一个RowKey和Column数据重复写入,HBase/Bigtable内部有自定GC策略,对于过期时间戳数据会做清理,读取时取最新时间节点数据即可...依赖云平台可用区特性,灰度系统也能方便地实现后端API服务跨可用区,做到物理架构上容灾。 另外,为保证内部跨洋访问链路稳定性,我们在阿里云北美机房搭建数据代理层,利用海外专线来提升访问速度

    70121
    领券