首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Java Flink外部源

Java Flink外部源
EN

Stack Overflow用户
提问于 2019-02-09 08:30:46
回答 2查看 301关注 0票数 1

我希望有一个并行的Flink源,它使用内存中的阻塞队列.我的想法是让应用程序将元素推入这个队列,Flink管道会消耗并处理它们。

这方面最好的模式是什么?我看过一些Flink源代码实现(比如Kafka、RabbitMQ等),它们都在初始化源实例中所需的连接。我不能这样做(即从每个源实例中初始化队列),因为

  • 每个源实例实例都会创建自己的队列。
  • 需要引用Flink外部的队列来将元素推送到它。

目前,我已经想出了以下内容,但是静态队列的使用对我来说并不合适。

1.从每个Flink源实例获取其元素的队列.

代码语言:javascript
复制
public class TheQueue implements Serializable {

    private static final Logger LOGGER = LoggerFactory.getLogger(TheQueue.class);

    private transient static final BlockingQueue<Object> OBJECT_QUEUE = new LinkedBlockingQueue<>();

    public static SerializableSupplier<Object> getObjectConsumer() {
        return () -> {
            return OBJECT_QUEUE.take();
        }
    };
}

2.我的Flink流水线节选.

代码语言:javascript
复制
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setParallelism(10);
env.addSource(TestParallelSourceFunction.getInstance(TheQueue.getObjectConsumer()))

3. Flink源函数.

代码语言:javascript
复制
public class TestParallelSourceFunction<T> extends RichParallelSourceFunction<T>{

    private static final Logger LOGGER = LoggerFactory.getLogger(TestParallelSourceFunction.class);

    private SerializableSupplier<T> supplier;

    // initialisation code

    @Override
    public void run(final SourceContext<T> ctx) throws Exception {

        LOGGER.info("Starting Flink source.");
        isRunning = true;

        while (isRunning) {
            final T t = supplier.get();
            if (t != null) {
                ctx.collect(t);
            }
        }

        LOGGER.info("Stopped Flink source.");
    }
EN

回答 2

Stack Overflow用户

发布于 2019-06-17 21:07:58

我认为,您对像Kafka和RabbitMQ这样的消息队列系统及其在流应用程序中的作用的理解是有缺陷的。它们是存在于Flink之外的独立服务。Flink不启动或配置它们,它只是打开连接从它们读取。

因此,我们的想法是,启动一个Kafka集群,并给出必要的连接细节和主题名称给您的Flink作业和其他应用程序,将元素推入Kafka。将元素推入队列的应用程序通过tcpip与Kafka集群对话,Flink也是如此。

票数 0
EN

Stack Overflow用户

发布于 2019-09-13 11:00:19

问题是(根据我的理解),Flink获取所有操作符并序列化它们,然后发送给一个反序列化它们的"worker“。这就是为什么源通常在其中创建一个连接,而不接收外部连接的原因。

如果在进程中运行Flink管道(本地执行环境),您可以做的是创建一个类,该类扩展RichSource函数,将ID作为可序列化字段,并在ID和阻塞队列之间建立静态映射。它看起来是这样的(在没有IDE的情况下编写它,所以语法可能略有偏离):

代码语言:javascript
复制
public class BlockingQueueSource<T> extends RichSourceFunction<T> {
  private static final Map<String, BlockingQueue<T>> idToQueue;

  private final String id;
  private volatile boolean isRunning;

  public BlockingQueueSource(String id) {
    this.id = id;
    this.isRunning = true;
  }

  @Override
  public void open(...) {
    idToQueue.put(id, new LinkedBlockingQueue<>());
  }

  public void close() {
    isRunning = false;
    idToQueue.remove(id);
  }

  public void run(SourceContext<T> context) {
    BlockingQueue<T> queue = idToQueue.get(id);

    while(isRunning) {
      T item = queue.take();

      context.collect(item);
    }
  }

  public void addItem(T item) {
    idToQueue.get(id).put(item);
  }
}

再次指出,只有当源位于创建所有Flink管道的相同进程中时,这才能工作,这意味着您使用本地执行环境运行它。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/54604545

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档