這篇文章將為大家詳細講解有關Kafka Connect及FileConnector的示例分析,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。
站在用戶的角度思考問題,與客戶深入溝通,找到北屯網站設計與北屯網站推廣的解決方案,憑借多年的經驗,讓設計與互聯網技術結合,創造個性化、用戶體驗好的作品,建站類型包括:成都做網站、成都網站建設、企業官網、英文網站、手機端網站、網站推廣、空間域名、網頁空間、企業郵箱。業務覆蓋北屯地區。
Kafka是一個使用越來越廣的消息系統,尤其是在大數據開發中(實時數據處理和分析)。為何集成其他系統和解耦應用,經常使用Producer來發送消息到Broker,并使用Consumer來消費Broker中的消息。Kafka Connect是到0.9版本才提供的并極大的簡化了其他系統與Kafka的集成。Kafka Connect運用用戶快速定義并實現各種Connector(File,Jdbc,Hdfs等),這些功能讓大批量數據導入/導出Kafka很方便。
如圖中所示,左側的Sources負責從其他異構系統中讀取數據并導入到Kafka中;右側的Sinks是把Kafka中的數據寫入到其他的系統中。
Kafka Connector很多,包括開源和商業版本的。如下列表中是常用的開源的Connector
Connectors | References |
---|---|
Jdbc | Source, Sink |
Elastic Search | Sink1, Sink2, Sink3 |
Cassandra | Source1, Source 2, Sink1, Sink2 |
MongoDB | Source |
HBase | Sink |
Syslog | Source |
MQTT (Source) | Source |
Twitter (Source) | Source, Sink |
S3 | Sink1, Sink2 |
商業版的可以通過Confluent.io獲得
本例演示如何使用Kafka Connect把Source(test.txt)轉為流數據再寫入到Destination(test.sink.txt)中。如下圖所示:
本例使用到了兩個Connector:
FileStreamSource:從test.txt中讀取并發布到Broker中
FileStreamSink:從Broker中讀取數據并寫入到test.sink.txt文件中 其中的Source使用到的配置文件是${KAFKA_HOME}/config/connect-file-source.properties
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
其中的Sink使用到的配置文件是${KAFKA_HOME}/config/connect-file-sink.properties
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test
Broker使用到的配置文件是${KAFKA_HOME}/config/connect-standalone.properties
bootstrap.servers=localhost:9092key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=trueinternal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
需要熟悉Kafka的一些命令行,參考本系列之前的文章Apache Kafka系列(二) 命令行工具(CLI)
[root@localhost bin]# cd /opt/kafka_2.11-0.11.0.0/
[root@localhost kafka_2.11-0.11.0.0]# ls
bin config libs LICENSE logs NOTICE site-docs
[root@localhost kafka_2.11-0.11.0.0]# ./bin/zookeeper-server-start.sh ./config/zookeeper.properties &
[root@localhost kafka_2.11-0.11.0.0]# ./bin/kafka-server-start.sh ./config/server.properties &
[root@localhost kafka_2.11-0.11.0.0]# ./bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
3.3.3 打開console-consumer
./kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic connect-test
[root@Server4 kafka_2.12-0.11.0.0]# echo 'firest line' >> test.txt
[root@Server4 kafka_2.12-0.11.0.0]# echo 'second line' >> test.txt
{"schema":{"type":"string","optional":false},"payload":"firest line"}
{"schema":{"type":"string","optional":false},"payload":"second line"}
[root@Server4 kafka_2.12-0.11.0.0]# cat test.sink.txt
firest line
second line
關于Kafka Connect及FileConnector的示例分析就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
當前標題:KafkaConnect及FileConnector的示例分析
當前地址:http://www.yijiale78.com/article26/pcescg.html
成都網站建設公司_創新互聯,為您提供手機網站建設、虛擬主機、App設計、網站改版、云服務器、網站制作
聲明:本網站發布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網站立場,如需處理請聯系客服。電話:028-86922220;郵箱:631063699@qq.com。內容未經允許不得轉載,或轉載時需注明來源: 創新互聯