google cloud数据流-Apache beam在无边界输入端被阻止
发布时间:2022-05-09 14:30:28 259
相关标签: # 数据库# apache# 数据
我的问题与另一篇帖子非常相似:ApacheBeam Cloud数据流卡在输入端。一、 然而,在那里尝试了这个解决方案(将GlobalWindows()应用于side输入),它似乎并没有解决我的问题。
我在Python SDK中有一个数据流管道(但我使用DirectRunner进行调试),其中主要输入是来自PubSub的日志,次要输入是来自基本不变的数据库的关联数据。我想加入这两种方法,以便每个日志与来自相同近似时间的侧输入数据配对。可以删除没有相关日志的多余侧输入。
我看到的行为是,管道似乎作为单个线程运行。它首先处理所有的输入元素,然后开始处理主要的输入元素。如果边输入是有界的(非流式),这就可以了,管道可以合并输入并运行到完成。但是,如果侧输入是无限制的(流式),则在等待侧输入处理完成时,主输入将被无限期地阻止。
为了说明这种行为,我在下面制作了简化的测试用例。
start = timestamp.Timestamp.now()
# Bounded side inputs work OK.
stop = start + 20
# Unbounded side inputs appear to block execution of main input
# processing.
#stop = timestamp.MAX_TIMESTAMP
side_interval = 5
main_interval = 1
side_input = (
pipeline
| PeriodicImpulse(
start_timestamp=start,
stop_timestamp=stop,
fire_interval=side_interval,
apply_windowing=True)
| apache_beam.Map(lambda x: ('side', x))
| apache_beam.ParDo(Logger('side_input'))
)
main_input = (
pipeline
| PeriodicImpulse(
start_timestamp=start, stop_timestamp=stop,
fire_interval=main_interval, apply_windowing=True)
| apache_beam.Map(lambda x: ('main', x))
| apache_beam.ParDo(Logger('main_input'))
| 'CrossJoin' >> apache_beam.FlatMap(
cross_join, rights=apache_beam.pvalue.AsIter(side_input))
| 'CrossJoinLogger' >> apache_beam.ParDo(Logger('cross_join_output'))
)
pipeline.run()
我遗漏了一些东西,阻止主输入与侧输入并行处理?
谢谢
特别声明:以上内容(图片及文字)均为互联网收集或者用户上传发布,本站仅提供信息存储服务!如有侵权或有涉及法律问题请联系我们。
举报