要将Apache Flink连接到RabbitMQ,您需要执行以下步骤:
在您的pom.xml
文件中添加RabbitMQ客户端和Flink-connector-rabbitmq依赖项:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-rabbitmq_2.11</artifactId>
<version>1.11.2</version>
</dependency>
创建一个Flink RabbitMQ Source用于从RabbitMQ消费消息:
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
public class RabbitMQConsumer {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
.setHost("localhost")
.setPort(5672)
.setVirtualHost("/")
.setUserName("guest")
.setPassword("guest")
.build();
DataStream<String> messageStream = env
.addSource(new RMQSource<>(
connectionConfig,
"my-queue", // RabbitMQ queue name
true, // use correlation ids
new SimpleStringSchema() // deserialization schema
)).name("RabbitMQ Source");
messageStream.print();
env.execute("RabbitMQ Consumer");
}
}
创建一个Flink RabbitMQ Sink用于向RabbitMQ发送消息:
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache催化flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;
import org.apache.flink.streaming.connectors.rabbit区和mq.common.RMQConnectionConfig;
public class RabbitMQProducer {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionState.getExecutionEnvironment();
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
.setHost("localhost")
.setPort(5672)
.setVirtualHost("/")
.setUserName("guest")
.setPassword("guest")
.build();
DataStream<String> messageStream = env.fromElements(
"Hello",
"RabbitMQ"
);
messageStream.addSink(new RMQSink<>(
connectionConfig,
"my-queue", // RabbitMQ queue name
new SimpleStringSchema() // serialization schema
)).name("RabbitMQ Sink");
env.execute("RabbitMQ Producer");
}
}
现在,Flink已经成功连接到RabbitMQ,并可以使用上面创建的Source和Sink来消费和发送消息。确保RabbitMQ服务器正在运行,且已创建所需的队列(在本示例中为my-queue
)。
领取专属 10元无门槛券
手把手带您无忧上云