Flink未來將與 Pulsar集成提供大規模的彈性數據處理
未來整合
Pulsar可以以不同的方式與Apache Flink集成。一些潛在的集成包括使用流式連接器為流式工作負載提供支持,并使用批量源連接器支持批量工作負載。Pulsar還提供對schema 的本地支持,可以與Flink集成并提供對數據的結構化訪問,例如使用Flink SQL作為在Pulsar中查詢數據的方式。最后,集成這些技術的另一種方法可能包括使用Pulsar作為Flink的狀態后端。由于Pulsar具有分層架構(Streams和Segmented Streams,由Apache Bookkeeper提供支持),因此將Pulsar用作存儲層并存儲Flink狀態變得很自然。
從體系結構的角度來看,我們可以想象兩個框架之間的集成,它使用Apache Pulsar作為統一的數據層視圖,Apache Flink作為統一的計算和數據處理框架和API。
現有集成
兩個框架之間的集成正在進行中,開發人員已經可以通過多種方式將Pulsar與Flink結合使用。例如,Pulsar可用作Flink DataStream應用程序中的流媒體源和流式接收器。開發人員可以將Pulsar中的數據提取到Flink作業中,該作業可以計算和處理實時數據,然后將數據作為流式接收器發送回Pulsar主題。這樣的例子如下所示:
// create and configure Pulsar consumer
PulsarSourceBuilder<String>builder = PulsarSourceBuilder
.builder(new SimpleStringSchema())
.serviceUrl(serviceUrl)
.topic(inputTopic)
.subscriptionName(subscription);
SourceFunction<String> src = builder.build();
// ingest DataStream with Pulsar consumer
DataStream<String> words = env.addSource(src);
// perform computation on DataStream (here a simple WordCount)
DataStream<WordWithCount> wc = words
.flatMap((FlatMapFunction<String, WordWithCount>) (word, collector) -> {
collector.collect(new WordWithCount(word, 1));
})
.returns(WordWithCount.class)
.keyBy("word")
.timeWindow(Time.seconds(5))
.reduce((ReduceFunction<WordWithCount>) (c1, c2) ->
new WordWithCount(c1.word, c1.count + c2.count));
// emit result via Pulsar producer
wc.addSink(new FlinkPulsarProducer<>(
serviceUrl,
outputTopic,
new AuthenticationDisabled(),
wordWithCount -> wordWithCount.toString().getBytes(UTF_8),
wordWithCount -> wordWithCount.word)
);
開發人員可以利用的兩個框架之間的另一個集成包括將Pulsar用作Flink SQL或Table API查詢的流式源和流式表接收器,如下例所示:
// obtain a DataStream with words
DataStream<String> words = ...
// register DataStream as Table "words" with two attributes ("word", "ts").
// "ts" is an event-time timestamp.
tableEnvironment.registerDataStream("words", words, "word, ts.rowtime");
// create a TableSink that produces to Pulsar
TableSink sink = new PulsarJsonTableSink(
serviceUrl,
outputTopic,
new AuthenticationDisabled(),
ROUTING_KEY);
// register Pulsar TableSink as table "wc"
tableEnvironment.registerTableSink(
"wc",
sink.configure(
new String[]{"word", "cnt"},
new TypeInformation[]{Types.STRING, Types.LONG}));
// count words per 5 seconds and write result to table "wc"
tableEnvironment.sqlUpdate(
"INSERT INTO wc " +
"SELECT word, COUNT(*) AS cnt " +
"FROM words " +
"GROUP BY word, TUMBLE(ts, INTERVAL '5' SECOND)");
最后,Flink將批量工作負載與Pulsar集成為批處理接收器,其中所有結果在Apache Flink完成靜態數據集中的計算后被推送到Pulsar。這樣的例子如下所示:
// obtain DataSet from arbitrary computation
DataSet<WordWithCount> wc = ...
// create PulsarOutputFormat instance
OutputFormat pulsarOutputFormat = new PulsarOutputFormat(
serviceUrl,
topic,
new AuthenticationDisabled(),
wordWithCount -> wordWithCount.toString().getBytes());
// write DataSet to Pulsar
wc.output(pulsarOutputFormat);
結論
Pulsar和Flink都對應用程序的數據和計算級別如何以批量作為特殊情況流“流式傳輸”方式分享了類似的觀點。通過Pulsar的Segmented Streams方法和Flink在一個框架下統一批處理和流處理工作負載的步驟,有許多方法將這兩種技術集成在一起,以提供大規模的彈性數據處理。
請輸入評論內容...
請輸入評論/評論長度6~500個字
最新活動更多
- 1 AI狂歡遇上油價破百,全球股市還能漲多久? | 產聯看全球
- 2 OpenAI深夜王炸!ChatGPT Images 2.0實測:中文穩、細節炸,設計師慌了
- 3 6000億美元估值錨定:字節跳動的“去單一化”突圍與估值重構
- 4 Tesla AI5芯片最新進展總結
- 5 連夜測了一波DeepSeek-V4,我發現它可能只剩“審美”這個短板了
- 6 熱點丨AI“瑜亮之爭”:既生OpenClaw,何生Hermes?
- 7 AI界的殺豬盤:9秒刪庫跑路,全員被封號,還繼續扣錢!
- 8 2026,人形機器人只贏了面子
- 9 DeepSeek降價90%:價格屠夫不是身份,是戰略
- 10 AI Infra產業鏈卡在哪里了?


分享













