腾讯云
开发者社区
文档
建议反馈
控制台
登录/注册
首页
学习
活动
专区
圈层
工具
MCP广场
文章/答案/技术大牛
搜索
搜索
关闭
发布
文章
问答
(525)
视频
沙龙
1
回答
如
何在
Apache
Beam
中
对
早期
触发
进行
单元测试
(
Python
SDK
)
、
我想为具有
早期
触发
的流水线创建一个
单元测试
。示例管道如下所示: class CalculateTeamScores(
beam
.PTransform): return scores\
beam
.window.FixedWindows(120),(su
浏览 20
提问于2020-07-22
得票数 4
1
回答
窗口和GroupByKey
、
LE: TL;DR;如
何在
Python
中
创建无界数据源?有可能吗?" >>
beam
.ParDo(DummyWindowPrint())我遇到的问题是,GroupByKey之后的操作只有在输入完成后才会启动,但是我希望在WindowInto上使用一个
触发
器,该
触发
器将在窗口中的最后一个条目到达后1秒后
触发
。> DEBUG:
apache
_
beam
.runners.worker.
sdk<
浏览 1
提问于2020-03-12
得票数 2
1
回答
Apache
Beam
有状态ParDo工作令牌无效
、
、
我有一个有状态的DoFn,它基本上
对
即将到来的元素
进行
批处理,当缓冲区达到一定大小时,缓冲区将被清除并将元素插入到BigQuery
中
。我注意到的是,管道有时会引发异常,该异常并不是停止作业运行。File "/usr/local/lib/
python
3.7/site-packages/
apache
_
beam
/runners/worker/
sdk
_worker.py"
浏览 7
提问于2020-12-09
得票数 1
回答已采纳
2
回答
数据流:
Apache
波束警告:带指数退避的
apache
_
beam
.utils.retry:Retry:
、
、
我有一个简单的管道,3周前它可以正常工作,但是我已经返回代码来增强它,当我试图运行代码时,它返回了以下错误:代码如下所示,它用于将作业提交到数据流
中
,而不存在任何问题。import jsonfrom
apache
_
beam</
浏览 6
提问于2020-10-23
得票数 4
2
回答
如果我们的类路径中有2.2.0,那么我的管道使用哪个版本的
SDK
?
beam
-sdks-java-core的最新版本是2.3.0。然而,如果我的流水线代码使用
beam
-sdks-java-core 2.2.0,那么我的流水线在Dataflow
中
运行时会使用2.2.0还是2.3.0?
浏览 1
提问于2018-03-10
得票数 0
2
回答
如
何在
Python
中
创建从发布/订阅到GCS的数据流管道
、
、
、
我的最终目标是创建一个自定义的管道,所以“发布/订阅到云存储”模板
对
我来说是不够的,而且我
对
Java一无所知,这让我开始在
Python
中
进行
调整。这是我目前所得到的(
Apache
Beam
Python
SDK
2.10.0): import
apache
_
beam
as
beam
TOPIC_PATH="projects/<my-projectlocal
浏览 6
提问于2019-02-18
得票数 7
回答已采纳
1
回答
ApacheBeam on FlinkRunner不读卡夫卡
、
、
我正在尝试运行由本地Flink集群支持的
Apache
,以便使用卡夫卡主题,
如
中所述。但是,在将一些消息写入myTopic之后,终端仍然是冻结的,并且我看不到输出文件夹
中
的任何内容我
对
Beam
/Fl
浏览 1
提问于2020-08-30
得票数 0
回答已采纳
0
回答
Apache
光束字段分区
、
、
、
我想通过一个特定的字段来使用
Python
来划分
Apache
Beam
中
的PCollection。我在
Python
SDK
文档中找到了以下代码,其中
对
PCollection
进行
了分区def partition_fn(student, num_partitions):return int(get_percentile(student) * num_partitions / 100) by_decile = stu
浏览 0
提问于2017-11-30
得票数 0
1
回答
带窗口的GroupByKey之后,
Beam
管道不会产生任何输出,我得到了内存错误。
、
、
目的:问题:如果我减少了元素大小(元素计数不会改变),它就能工作!因为实际上,按步骤分组等待所有数据被分组,然后
触发
所有新的窗口数据。逐行扁平(因此它生成大约10,000个)元素 创建数据的键值(使用从1到10的随机整数键
浏览 3
提问于2019-04-12
得票数 4
1
回答
光束Kafka流输入,无打印或文本输出
、
、
如果我将max_num_records =20放在ReadFromKafka
中
,我可以看到打印或输出为文本的结果。如果我尝试使用
beam
.io.WriteToText输出,将会创建一个空的临时文件夹,
如
:
beam
-temp-StatOut-d16768eadec511eb8bd897b012f36e97 终端显示: 2.30.0: Pulling from
apache
/
beam
_java8_
sdk
Digest: sha256:720144b98d9cb2
浏览 40
提问于2021-07-07
得票数 0
1
回答
数据流作业不产生任何输出。
、
、
、
对于下面的代码,我假设它会产生加窗口的输出,在每个窗口之后有效
触发
。| "Parse" >>
beam
.Map(parse_json)
beam
.window.FixedWindows(5*60), trigger=
beam
.trigger.Repeatedly(
beam
.trigger.AfterProcessingTime(5*60)),这
浏览 2
提问于2020-06-11
得票数 0
回答已采纳
1
回答
在
Apache
Beam
Python
SDK
中发布/订阅到数据存储批处理数据流作业可能吗?
、
、
、
我已经阅读了
Apache
Beam
Python
SDK
文档和许多问题,但对一些事情仍然不确定。 是否可以将发布/订阅IO作为非流作业的一部分
进行
读取?然后,同一作业是否可以使用数据存储IO (当前不支持流)
进行
写入?我是否可以假设默认的全局窗口和
触发
器将正确地告诉作业何时停止从发布/订阅读取(当不再写入批量消息时)?或者我需要添加某种类型的
触发
器/窗口方案,
如
最大时间或最大元素数量?该
触发</e
浏览 2
提问于2019-02-21
得票数 1
1
回答
Golang中有
Apache
Beam
+ Cloud Bigtable连接器吗?
、
、
、
有没有一种方法可以访问存储在Cloud Bigtable
中
的数据,作为运行
Apache
Beam
管道的输入源?
浏览 6
提问于2019-05-09
得票数 2
1
回答
apache
_
beam
(
python
SDK
)是否支持.zip压缩类型
、
我正在用
apache
beam
实现一个批处理管道,它可以解压缩json文件,
对
它们
进行
预处理,然后将它们存储回文件系统
中
的给定位置。 可以使用ZIP或GZIP算法压缩文件。解压
对
GZIP文件工作正常,但对ZIP文件解压失败...经过调查,我发现只有GZIP、BZIP2和DEFLATE压缩类型仅在JAVA
SDK
中
受支持,而不存在
python
实现。
浏览 13
提问于2020-12-18
得票数 0
1
回答
数据流
中
的动态bigquery表名
、
我不知道如
何在
bigquery中高效地完成这项工作,所以我在考虑使用数据流。使用dataflow,我们可以首先加载数据,然后为每条记录创建一个键值
对
,键是我们想要拆分表的特定列的所有可能值,然后我们可以按键
对
记录
进行
分组。所以操作应该是:p|
beam
.io.Read(
beam
.io.BigQuerySource()) |
beam
.map(lambda record:(record‘’splitcol‘,record)) |
beam
.GroupBy
浏览 9
提问于2017-07-13
得票数 0
0
回答
带有CoGroupByKey的
Beam
Java
SDK
2.4/2.5 PAssert
、
、
我使用
beam
sdk
2.4和2.5
进行
了测试。:245) at org.
apache
.
beam
.
sdk
.Pipeline.replaceat org.
apache
.
beam
.
sdk
.Pipeline.run(Pipel
浏览 0
提问于2018-07-14
得票数 0
回答已采纳
1
回答
有没有办法将文件从本地机器复制到
python
+
apache
beam
中
的Dataflow线束实例
、
、
我想根据json模式文件验证ParDo函数
中
每个元素的数据。 为此,我需要从本地计算机复制json模式文件,以利用由
Python
Beam
Dataflow
SDK
创建的Dataflow实例。每个单独的元素表示单独的表的数据(这种不同的元素的变化是26,这意味着可以根据表示表名的元素
中
的键字段将元素转储到这26个表
中
的任何一个表
中
)。我希望这个json模式文件只在Dataflow作业开始时复制一次,然后
对
已经存储了json模式的元素
进行</em
浏览 22
提问于2021-02-11
得票数 0
1
回答
执行TFMA: AttributeError:'NoneType‘对象时出现TFX管道错误,对象没有属性'ToBatchTensors’
、
、
、
基本上,我只重用了iris utils和iris pipeline
中
的代码,
对
服务输入做了很小的更改: def _get_serve_tf_examples_fn(model, tf_transform_output/site-packages/
apache
_
beam
/runners/worker/
sdk
_worker.py", line 256, in _execute response = task()File "/
浏览 49
提问于2020-10-26
得票数 0
回答已采纳
1
回答
AttributeError:模块“google.cloud”没有属性“存储”
、
、
我试图运行一个简单的
beam
脚本上的GCP数据流,以便应用一个科学工具包-学习模型的一些数据。数据需要在应用模型之前和之后
进行
处理。
如
您所见,我尝试使用新的安装在一个新的虚拟环境
中
运行。知道怎么修吗?from
apache
_
beam
.options.pipeline_optionsWARNING:root:M
浏览 12
提问于2020-09-13
得票数 0
回答已采纳
2
回答
用编写每个窗口的唯一拼花文件
、
、
、
、
我试图用
apache
将从kafka消费者到google云存储的消息通过30秒的 windows传输到谷歌云存储。使用
beam
_nuggets.io阅读卡夫卡的主题。您可以看到我的代码如下:from
apache
_
beam
.transforms.trigger import AfterAny, AfterCount, AfterProcessingTime, AfterWatermark, Repeatedly f
浏览 3
提问于2021-10-18
得票数 2
点击加载更多
相关
资讯
Java近期新闻: JReleaser 1.2、Spring Batch、PrimeFaces、Quarkus、JobRunr与Apache Beam
迁移工具 Air2phin 宣布开源,2 步迁移 Airflow 至 Dolphinscheduler
Apache DolphinScheduler 社区 3 月月报
Kafka落选!InfoWorld最佳开源数据平台奖公布
Apache Flink 社区发布的 Stateful Functions 2.2.0 是什么?
热门
标签
更多标签
云服务器
ICP备案
云直播
腾讯会议
实时音视频
活动推荐
运营活动
广告
关闭
领券