V1
成都創(chuàng)新互聯(lián)專注于彌渡網(wǎng)站建設(shè)服務(wù)及定制,我們擁有豐富的企業(yè)做網(wǎng)站經(jīng)驗。 熱誠為您提供彌渡營銷型網(wǎng)站建設(shè),彌渡網(wǎng)站制作、彌渡網(wǎng)頁設(shè)計、彌渡網(wǎng)站官網(wǎng)定制、小程序定制開發(fā)服務(wù),打造彌渡網(wǎng)絡(luò)公司原創(chuàng)品牌,更為您提供彌渡網(wǎng)站排名全網(wǎng)營銷落地服務(wù)。
Logstash 2.x版本output-kafka插件只支持kafka-0.8.x版本。但是工作中我們可能用到0.9.x版本的kafka。故而需要升級Logstash-output-kafka插件至3.x版本。
安裝依賴包
yum -y install ruby rubygems ruby-devel gem sources --add https://ruby.taobao.org/ --remove http://rubygems.org/ gem install jar-dependencies -v '0.3.4' gem install ruby-maven -v '3.3.11'
升級output-kafka
/usr/local/logstash/bin/logstash-plugin update logstash-output-kafka
啟動logstash 有如下警告信息
./logstash -f /usr/local/logstash/conf/kafka.conf Settings: Default pipeline workers: 8 log4j:WARN No appenders could be found for logger (org.apache.kafka.clients.producer.ProducerConfig). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. Pipeline main started
解決辦法
參考網(wǎng)站
1.切換到/usr/local/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-kafka-3.0.1/lib/logstash/outputs/目錄下
cd /usr/local/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-kafka-3.0.1/lib/logstash/outputs/
2.備份kafka.rb文件
mv kafka.rb{,.backup}
3.新建kafka.rb文件內(nèi)容如下:
require 'logstash/namespace' require 'logstash/outputs/base' require 'jruby-kafka' # Write events to a Kafka topic. This uses the Kafka Producer API to write messages to a topic on # the broker. # # The only required configuration is the topic name. The default codec is json, # so events will be persisted on the broker in json format. If you select a codec of plain, # Logstash will encode your messages with not only the message but also with a timestamp and # hostname. If you do not want anything but your message passing through, you should make the output # configuration something like: # [source,ruby] # output { # kafka { # codec => plain { # format => "%{message}" # } # } # } # For more information see http://kafka.apache.org/documentation.html#theproducer # # Kafka producer configuration: http://kafka.apache.org/documentation.html#newproducerconfigs class LogStash::Outputs::Kafka < LogStash::Outputs::Base config_name 'kafka' default :codec, 'json' # The topic to produce messages to config :topic_id, :validate => :string, :required => true # This is for bootstrapping and the producer will only use it for getting metadata (topics, # partitions and replicas). The socket connections for sending the actual data will be # established based on the broker information returned in the metadata. The format is # `host1:port1,host2:port2`, and the list can be a subset of brokers or a VIP pointing to a # subset of brokers. config :bootstrap_servers, :validate => :string, :default => 'localhost:9092' # Serializer class for the key of the message config :key_serializer, :validate => :string, :default => 'org.apache.kafka.common.serialization.StringSerializer' # Serializer class for the value of the message config :value_serializer, :validate => :string, :default => 'org.apache.kafka.common.serialization.StringSerializer' # The key that will be included with the record # # If a `message_key` is present, a partition will be chosen using a hash of the key. # If not present, a partition for the message will be assigned in a round-robin fashion. config :message_key, :validate => :string # The number of acknowledgments the producer requires the leader to have received # before considering a request complete. # # acks=0, the producer will not wait for any acknowledgment from the server at all. # acks=1, This will mean the leader will write the record to its local log but # will respond without awaiting full acknowledgement from all followers. # acks=all, This means the leader will wait for the full set of in-sync replicas to acknowledge the record. config :acks, :validate => ["0", "1", "all"], :default => "1" # The total bytes of memory the producer can use to buffer records waiting to be sent to the server. config :buffer_memory, :validate => :number, :default => 33554432 # The compression type for all data generated by the producer. # The default is none (i.e. no compression). Valid values are none, gzip, or snappy. config :compression_type, :validate => ["none", "gzip", "snappy"], :default => "none" # Setting a value greater than zero will cause the client to # resend any record whose send fails with a potentially transient error. config :retries, :validate => :number, :default => 0 # The producer will attempt to batch records together into fewer requests whenever multiple # records are being sent to the same partition. This helps performance on both the client # and the server. This configuration controls the default batch size in bytes. config :batch_size, :validate => :number, :default => 16384 # The id string to pass to the server when making requests. # The purpose of this is to be able to track the source of requests beyond just # ip/port by allowing a logical application name to be included with the request config :client_id, :validate => :string # The producer groups together any records that arrive in between request # transmissions into a single batched request. Normally this occurs only under # load when records arrive faster than they can be sent out. However in some circumstances # the client may want to reduce the number of requests even under moderate load. # This setting accomplishes this by adding a small amount of artificial delay—that is, # rather than immediately sending out a record the producer will wait for up to the given delay # to allow other records to be sent so that the sends can be batched together. config :linger_ms, :validate => :number, :default => 0 # The maximum size of a request config :max_request_size, :validate => :number, :default => 1048576 # The size of the TCP receive buffer to use when reading data config :receive_buffer_bytes, :validate => :number, :default => 32768 # The size of the TCP send buffer to use when sending data. config :send_buffer_bytes, :validate => :number, :default => 131072 # The configuration controls the maximum amount of time the server will wait for acknowledgments # from followers to meet the acknowledgment requirements the producer has specified with the # acks configuration. If the requested number of acknowledgments are not met when the timeout # elapses an error will be returned. This timeout is measured on the server side and does not # include the network latency of the request. config :timeout_ms, :validate => :number, :default => 30000 # When our memory buffer is exhausted we must either stop accepting new # records (block) or throw errors. By default this setting is true and we block, # however in some scenarios blocking is not desirable and it is better to immediately give an error. config :block_on_buffer_full, :validate => :boolean, :default => true # the timeout setting for initial metadata request to fetch topic metadata. config :metadata_fetch_timeout_ms, :validate => :number, :default => 60000 # the max time in milliseconds before a metadata refresh is forced. config :metadata_max_age_ms, :validate => :number, :default => 300000 # The amount of time to wait before attempting to reconnect to a given host when a connection fails. config :reconnect_backoff_ms, :validate => :number, :default => 10 # The amount of time to wait before attempting to retry a failed produce request to a given topic partition. config :retry_backoff_ms, :validate => :number, :default => 100 public def register LogStash::Logger.setup_log4j(@logger) options = { :key_serializer => @key_serializer, :value_serializer => @value_serializer, :bootstrap_servers => @bootstrap_servers, :acks => @acks, :buffer_memory => @buffer_memory, :compression_type => @compression_type, :retries => @retries, :batch_size => @batch_size, :client_id => @client_id, :linger_ms => @linger_ms, :max_request_size => @max_request_size, :receive_buffer_bytes => @receive_buffer_bytes, :send_buffer_bytes => @send_buffer_bytes, :timeout_ms => @timeout_ms, :block_on_buffer_full => @block_on_buffer_full, :metadata_fetch_timeout_ms => @metadata_fetch_timeout_ms, :metadata_max_age_ms => @metadata_max_age_ms, :reconnect_backoff_ms => @reconnect_backoff_ms, :retry_backoff_ms => @retry_backoff_ms } @producer = Kafka::KafkaProducer.new(options) @producer.connect @logger.info('Registering kafka producer', :topic_id => @topic_id, :bootstrap_servers => @bootstrap_servers) @codec.on_event do |event, data| begin key = if @message_key.nil? then nil else event.sprintf(@message_key) end @producer.send_msg(event.sprintf(@topic_id), nil, key, data) rescue LogStash::ShutdownSignal @logger.info('Kafka producer got shutdown signal') rescue => e @logger.warn('kafka producer threw exception, restarting', :exception => e) end end end # def register def receive(event) if event == LogStash::SHUTDOWN return end @codec.encode(event) end def close @producer.close end end #class LogStash::Outputs::Kafka
另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)cdcxhl.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國服務(wù)器、虛擬主機、免備案服務(wù)器”等云主機租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡單易用、服務(wù)可用性高、性價比高”等特點與優(yōu)勢,專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場景需求。
分享名稱:logstash升級kafka插件-創(chuàng)新互聯(lián)
本文地址:http://www.yijiale78.com/article8/dchpip.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站改版、微信小程序、面包屑導(dǎo)航、標(biāo)簽優(yōu)化、做網(wǎng)站、網(wǎng)站收錄
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)