我希望有一个并行的Flink源,它使用内存中的阻塞队列.我的想法是让应用程序将元素推入这个队列,Flink管道会消耗并处理它们。
这方面最好的模式是什么?我看过一些Flink源代码实现(比如Kafka、RabbitMQ等),它们都在初始化源实例中所需的连接。我不能这样做(即从每个源实例中初始化队列),因为
目前,我已经想出了以下内容,但是静态队列的使用对我来说并不合适。
1.从每个Flink源实例获取其元素的队列.
public class TheQueue implements Serializable {
private static final Logger LOGGER = LoggerFactory.getLogger(TheQueue.class);
private transient static final BlockingQueue<Object> OBJECT_QUEUE = new LinkedBlockingQueue<>();
public static SerializableSupplier<Object> getObjectConsumer() {
return () -> {
return OBJECT_QUEUE.take();
}
};
}2.我的Flink流水线节选.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setParallelism(10);
env.addSource(TestParallelSourceFunction.getInstance(TheQueue.getObjectConsumer()))3. Flink源函数.
public class TestParallelSourceFunction<T> extends RichParallelSourceFunction<T>{
private static final Logger LOGGER = LoggerFactory.getLogger(TestParallelSourceFunction.class);
private SerializableSupplier<T> supplier;
// initialisation code
@Override
public void run(final SourceContext<T> ctx) throws Exception {
LOGGER.info("Starting Flink source.");
isRunning = true;
while (isRunning) {
final T t = supplier.get();
if (t != null) {
ctx.collect(t);
}
}
LOGGER.info("Stopped Flink source.");
}发布于 2019-06-17 21:07:58
我认为,您对像Kafka和RabbitMQ这样的消息队列系统及其在流应用程序中的作用的理解是有缺陷的。它们是存在于Flink之外的独立服务。Flink不启动或配置它们,它只是打开连接从它们读取。
因此,我们的想法是,启动一个Kafka集群,并给出必要的连接细节和主题名称给您的Flink作业和其他应用程序,将元素推入Kafka。将元素推入队列的应用程序通过tcpip与Kafka集群对话,Flink也是如此。
发布于 2019-09-13 11:00:19
问题是(根据我的理解),Flink获取所有操作符并序列化它们,然后发送给一个反序列化它们的"worker“。这就是为什么源通常在其中创建一个连接,而不接收外部连接的原因。
如果在进程中运行Flink管道(本地执行环境),您可以做的是创建一个类,该类扩展RichSource函数,将ID作为可序列化字段,并在ID和阻塞队列之间建立静态映射。它看起来是这样的(在没有IDE的情况下编写它,所以语法可能略有偏离):
public class BlockingQueueSource<T> extends RichSourceFunction<T> {
private static final Map<String, BlockingQueue<T>> idToQueue;
private final String id;
private volatile boolean isRunning;
public BlockingQueueSource(String id) {
this.id = id;
this.isRunning = true;
}
@Override
public void open(...) {
idToQueue.put(id, new LinkedBlockingQueue<>());
}
public void close() {
isRunning = false;
idToQueue.remove(id);
}
public void run(SourceContext<T> context) {
BlockingQueue<T> queue = idToQueue.get(id);
while(isRunning) {
T item = queue.take();
context.collect(item);
}
}
public void addItem(T item) {
idToQueue.get(id).put(item);
}
}再次指出,只有当源位于创建所有Flink管道的相同进程中时,这才能工作,这意味着您使用本地执行环境运行它。
https://stackoverflow.com/questions/54604545
复制相似问题