在Kafka 0.10.1.0中使用Flink,可以通过以下步骤进行:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
请注意,${flink.version}
应该替换为你使用的Flink版本。
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import java.util.Properties;
public class KafkaFlinkExample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "flink-consumer-group");
FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), properties);
env.addSource(consumer)
.print();
env.execute("Kafka Flink Example");
}
}
在上面的示例中,我们创建了一个Kafka消费者,并将其添加到Flink的数据源中。然后,我们使用print()
方法将接收到的数据打印出来。
请注意,你需要根据你的Kafka集群配置更新bootstrap.servers
和group.id
属性,并将my-topic
替换为你要消费的Kafka主题。
mvn clean package
然后,使用以下命令提交作业到Flink集群:
./bin/flink run -c com.example.KafkaFlinkExample path/to/your/jar-file.jar
请注意,com.example.KafkaFlinkExample
应该替换为你的主类名,path/to/your/jar-file.jar
应该替换为你的JAR文件路径。
领取专属 10元无门槛券
手把手带您无忧上云