返回

google cloud数据流-Apache beam在无边界输入端被阻止

发布时间:2022-05-09 14:30:28 259
# 数据库# apache# 数据

我的问题与另一篇帖子非常相似:ApacheBeam Cloud数据流卡在输入端。一、 然而,在那里尝试了这个解决方案(将GlobalWindows()应用于side输入),它似乎并没有解决我的问题。

我在Python SDK中有一个数据流管道(但我使用DirectRunner进行调试),其中主要输入是来自PubSub的日志,次要输入是来自基本不变的数据库的关联数据。我想加入这两种方法,以便每个日志与来自相同近似时间的侧输入数据配对。可以删除没有相关日志的多余侧输入。

我看到的行为是,管道似乎作为单个线程运行。它首先处理所有的输入元素,然后开始处理主要的输入元素。如果边输入是有界的(非流式),这就可以了,管道可以合并输入并运行到完成。但是,如果侧输入是无限制的(流式),则在等待侧输入处理完成时,主输入将被无限期地阻止。

为了说明这种行为,我在下面制作了简化的测试用例。

  start = timestamp.Timestamp.now()

  # Bounded side inputs work OK.
  stop = start + 20

  # Unbounded side inputs appear to block execution of main input
  # processing.
  #stop = timestamp.MAX_TIMESTAMP

  side_interval = 5
  main_interval = 1

  side_input = (
      pipeline
      | PeriodicImpulse(
          start_timestamp=start,
          stop_timestamp=stop,
          fire_interval=side_interval,
          apply_windowing=True)
      | apache_beam.Map(lambda x: ('side', x))
      | apache_beam.ParDo(Logger('side_input'))
  )
  main_input = (
      pipeline
      | PeriodicImpulse(
          start_timestamp=start, stop_timestamp=stop,
          fire_interval=main_interval, apply_windowing=True)
      | apache_beam.Map(lambda x: ('main', x))
      | apache_beam.ParDo(Logger('main_input'))
      | 'CrossJoin' >> apache_beam.FlatMap(
          cross_join, rights=apache_beam.pvalue.AsIter(side_input))
      | 'CrossJoinLogger' >> apache_beam.ParDo(Logger('cross_join_output'))
  )
  pipeline.run()

我遗漏了一些东西,阻止主输入与侧输入并行处理?

谢谢

特别声明:以上内容(图片及文字)均为互联网收集或者用户上传发布,本站仅提供信息存储服务!如有侵权或有涉及法律问题请联系我们。
举报
评论区(0)
按点赞数排序
用户头像