您的位置:68399皇家赌场 > 域名注册 > Kafka入门杰出教程

Kafka入门杰出教程

发布时间:2019-08-24 14:12编辑:域名注册浏览(155)

    卡夫卡 Log4j达成日志聚集处理

    笔录怎么着利用卡夫卡 Log4j达成聚集国和东瀛志管理的经过。

    引言

    前方写的《Spring Log4j ActiveMQ达成远程记录日志——实战 深入分析》得到了相当的多同桌的承认,在确认的同不经常间,也许有同学提议能够行使卡夫卡来聚焦管理日志,于是前几天就来上学一下。

    特地表明,由于网络上有关Kafka Log4j的欧洲经济共同体例子并没有多少,作者也是单向学习一边选拔,因而一旦有表明得不好可能失实的地点,迎接批评指正,假如您有好的主见,也招待留言钻探。
     
    第3局地 搭建卡夫卡景况

    安装Kafka

    下载:

    tar zxf kafka-<VERSION>.tgz
    cd kafka-<VERSION>

    启动Zookeeper

    开发银行Zookeeper前必要配置一下config/zookeeper.properties:

    68399皇家赌场 1

    接下去运维Zookeeper

    bin/zookeeper-server-start.sh config/zookeeper.properties

    启动Kafka Server

    开发银行卡夫卡Server前须求安插一下config/server.properties。主要布置以下几项,内容就背着了,注释里都很详细:

    68399皇家赌场 2

    下一场启动卡夫卡 Server:

    bin/kafka-server-start.sh config/server.properties

    创建Topic

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

    查阅创设的Topic

    >bin/kafka-topics.sh --list --zookeeper localhost:2181

    运营调控台Producer,向卡夫卡发送信息

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    This is a message
    This is another message
    ^C

    起步调节台Consumer,费用刚刚发送的新闻

    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
    This is a message
    This is another message

    删除Topic

    bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test

    注:独有当delete.topic.enable=true时,该操作才使得

    陈设卡夫卡集群(单台机器上)

    率先拷贝server.properties文件为多份(这里演示4个节点的卡夫卡集群,因而还索要拷贝3份配置文件):

    cp config/server.properties config/server1.properties
    cp config/server.properties config/server2.properties
    cp config/server.properties config/server3.properties

    修改server1.properties的以下内容:

    broker.id=1
    port=9093
    log.dir=/tmp/kafka-logs-1

    同理修改server2.properties和server3.properties的这一个内容,并保持全部配置文件的zookeeper.connect属性都指向运营在本机的zookeeper地址localhost:2181。注意,由于那个卡夫卡节点都将运转在同等台机械上,由此须要保险那多少个值差别,这里以增进的点子处理。举例在server2.properties上:

    broker.id=2
    port=9094
    log.dir=/tmp/kafka-logs-2

    把server3.properties也安插好之后,依次运行这一个节点:

    bin/kafka-server-start.sh config/server1.properties &
    bin/kafka-server-start.sh config/server2.properties &
    bin/kafka-server-start.sh config/server3.properties &

    Topic & Partition

    Topic在逻辑上能够被感觉是一个queue,每条消费都不可能不钦点它的Topic,能够简轻巧单领悟为必得指明把那条音讯放进哪个queue里。为了使得卡夫卡的吞吐率可以线性升高,物理上把Topic分成一个或三个Partition,各个Partition在物理上相应三个文本夹,该文件夹下存款和储蓄这么些Partition的具备新闻和目录文件。

    现行反革命在卡夫卡集群上开创备份因子为3,分区数为4的Topic:

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 4 --topic kafka

    评释:备份因子replication-factor越大,则证实集群容错性越强,就是当集群down掉后,数据恢复生机的可能性越大。全部的分区数里的内容共同构成了一份数据,分区数partions越大,则该topic的音讯就越分散,集群中的音讯传布就越均匀。

    然后使用kafka-topics.sh的--describe参数查看一下Topic为kafka的详细的情况:

    68399皇家赌场 3

    出口的率先行是兼备分区的轮廓,接下去的每一行是多少个分区的描述。能够看出Topic为kafka的音讯,PartionCount=4,ReplicationFactor=3便是大家创立时钦定的分区数和备份因子。

    其它:Leader是指担当这几个分区全数读写的节点;Replicas是指那几个分区所在的兼具节点(不论它是否活着);IS哈弗是Replicas的子集,代表存有其一分区新闻同期方今活着的节点。

    拿partition:0那一个分区来讲,该分区的Leader是server0,遍及在id为0,1,2那多少个节点上,並且那八个节点都活着。

    再来看下卡夫卡集群的日志:

    68399皇家赌场 4

    其间kafka-logs-0意味server0的日记,kafka-logs-1代表server1的日记,依此类推。

    从上面包车型大巴布署可见,id为0,1,2,3的节点分别对应server0, server1, server2, server3。而上例中的partition:0遍布在id为0, 1, 2那多个节点上,因而得以在server0, server1, server2那四个节点上收看有kafka-0以此文件夹。那个kafka-0就意味着Topic为kafka的partion0。
     
    第二片段 卡夫卡 Log4j项目结合

    先来看下Maven项目布局图:

    68399皇家赌场 5

    作为德姆o,文件十分的少。先看看pom.xml引入了何等jar包:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.9.2</artifactId>
        <version>0.8.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.8.2.1</version>
    </dependency>
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>18.0</version>
    </dependency>

    珍视的从头到尾的经过是log4j.properties:

    log4j.rootLogger=INFO,console
     
    # for package com.demo.kafka, log would be sent to kafka appender.
    log4j.logger.com.demo.kafka=DEBUG,kafka
     
    # appender kafka
    log4j.appender.kafka=kafka.producer.KafkaLog4jAppender
    log4j.appender.kafka.topic=kafka
    # multiple brokers are separated by comma ",".
    log4j.appender.kafka.brokerList=localhost:9092, localhost:9093, localhost:9094, localhost:9095
    log4j.appender.kafka.compressionType=none
    log4j.appender.kafka.syncSend=true
    log4j.appender.kafka.layout=org.apache.log4j.PatternLayout
    log4j.appender.kafka.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n
     
    # appender console
    log4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.target=System.out
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n  

    App.java里面就很粗大略啦,首假若透过log4j输出日志:

    package com.demo.kafka;
    import org.apache.log4j.Logger;
    public class App {
        private static final Logger LOGGER = Logger.getLogger(App.class);
        public static void main(String[] args) throws InterruptedException {
            for (int i = 0; i < 20; i ) {
                LOGGER.info("Info [" i "]");
                Thread.sleep(1000);
            }
        }
    }

    MyConsumer.java用于花费kafka中的消息:

    package com.demo.kafka;
     
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import com.google.common.collect.ImmutableMap;
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.message.MessageAndMetadata;
     
    public class MyConsumer {
        private static final String ZOOKEEPER = "localhost:2181";
        //groupName能够Infiniti制给,因为对于kafka里的每条音讯,每一个group都会完全的拍卖三次
        private static final String GROUP_NAME = "test_group";
        private static final String TOPIC_NAME = "kafka";
        private static final int CONSUMER_NUM = 4;
        private static final int PARTITION_NUM = 4;
     
        public static void main(String[] args) {
            // specify some consumer properties
            Properties props = new Properties();
            props.put("zookeeper.connect", ZOOKEEPER);
            props.put("zookeeper.connectiontimeout.ms", "1000000");
            props.put("group.id", GROUP_NAME);
     
            // Create the connection to the cluster
            ConsumerConfig consumerConfig = new ConsumerConfig(props);
            ConsumerConnector consumerConnector =
                Consumer.createJavaConsumerConnector(consumerConfig);
     
            // create 4 partitions of the stream for topic “test”, to allow 4
            // threads to consume
            Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams =
                consumerConnector.createMessageStreams(
                    ImmutableMap.of(TOPIC_NAME, PARTITION_NUM));
            List<KafkaStream<byte[], byte[]>> streams = topicMessageStreams.get(TOPIC_NAME);
     
            // create list of 4 threads to consume from each of the partitions
            ExecutorService executor = Executors.newFixedThreadPool(CONSUMER_NUM);
     
            // consume the messages in the threads
            for (final KafkaStream<byte[], byte[]> stream : streams) {
                executor.submit(new Runnable() {
                    public void run() {
                        for (MessageAndMetadata<byte[], byte[]> msgAndMetadata : stream) {
                            // process message (msgAndMetadata.message())
                            System.out.println(new String(msgAndMetadata.message()));
                        }
                    }
                });
            }
        }
    }

    MyProducer.java用于向卡夫卡发送音讯,但不通过log4j的appender发送。此案例中得以绝不。不过自身也许放在这里:

    package com.demo.kafka;
     
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
     
    public class MyProducer {
        private static final String TOPIC = "kafka";
        private static final String CONTENT = "This is a single message";
        private static final String BROKER_LIST = "localhost:9092";
        private static final String SERIALIZER_CLASS = "kafka.serializer.StringEncoder";
       
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("serializer.class", SERIALIZER_CLASS);
            props.put("metadata.broker.list", BROKER_LIST);
           
            ProducerConfig config = new ProducerConfig(props);
            Producer<String, String> producer = new Producer<String, String>(config);
     
            //Send one message.
            KeyedMessage<String, String> message =
                new KeyedMessage<String, String>(TOPIC, CONTENT);
            producer.send(message);
           
            //Send multiple messages.
            List<KeyedMessage<String,String>> messages =
                new ArrayList<KeyedMessage<String, String>>();
            for (int i = 0; i < 5; i ) {
                messages.add(new KeyedMessage<String, String>
                    (TOPIC, "Multiple message at a time. " i));
            }
            producer.send(messages);
        }
    }

    到此地,代码就终止了。
     
    其三局地 运维与认证

    先运维MyConsumer,使其处于监听状态。同不时候,仍是可以够运转卡夫卡自带的ConsoleConsumer来证实是不是跟MyConsumer的结果一致。最终运维App.java。

    先来探问MyConsumer的出口:

    68399皇家赌场 6

    再来看看ConsoleConsumer的出口:

    68399皇家赌场 7

    能够看来,即使发往卡夫卡的音讯去往了差别的地点,可是内容是同样的,并且一条也十分的多。最后再来看看卡夫卡的日记。

    我们领略,Topic为kafka的音讯有4个partion,以前边的截图可见那4个partion均匀遍布在4个kafka节点上,于是笔者对每二个partion随机挑选三个节点查看了日志内容。

    上海教室藕中湖蓝选中部分每一种代表在server0上查看partion0,在server1上查看partion1,以此类推。

    而金红部分是日记内容,由于在开立Topic时希图将20条日志分成4个区存储,能够很精通的见到,那20条日志确实是很均匀的积累在了多少个partion上。

    摘一点Infoq上的话:每一种日志文件都以三个log entrie系列,每一个log entrie包蕴二个4字节整型数值(值为N 5),1个字节的"magic value",4个字节的CRC校验码,其后跟N个字节的音信体。每条新闻皆有二个当下Partition下独一的64字节的offset,它指明了那条新闻的胚胎地点。磁盘上囤积的音信格式如下:

    message length : 4 bytes (value: 1 4 n)
    "magic" value : 1 byte
    crc : 4 bytes
    payload : n bytes

    此间大家看到的日志文件的每一行,正是一个log entrie,每一行前边不可能出示的字符(深灰蓝选中部分),正是(message length magic value crc)了。而log entrie的后有的,则是音讯体的剧情了。

    问题:

    1. 若是要使用此种格局,有一种现象是提取某天大概某小时的日记,那么哪些设计Topic呢?是或不是要在Topic上带走日期恐怕时辰数?还会有越来越好的解决方案吗?

    2. 假诺按每小时设计Topic,那么怎么样在运用诸如logger.info()那样的秘诀时,自动依照时间去改变Topic呢?有接近的例子吗?

    ----款待调换,共同进步。

    样例下载:

    ------------------------------------------分割线------------------------------------------

    无偿下载地址在

    顾客名与密码都是www.bkjia.com

    具体下载目录在 /二零一六年龄资历料/7月/二二十四日/卡夫卡 Log4j完成日志集中管理

    下载格局见

    ------------------------------------------分割线------------------------------------------

    参照页面:

    布满式公布订阅音信系统 卡夫卡 架构划虚拟计

    Apache 卡夫卡 代码实例

    Apache 卡夫卡 教程笔记

    Apache kafka原理与特点(0.8V) 

    卡夫卡铺排与代码实例 

    卡夫卡介绍和集群情状搭建 

    卡夫卡 的详细介绍:请点这里
    卡夫卡 的下载地址:请点这里

    本文长久更新链接地址:

    记录如何采纳卡夫卡 Log4j完毕聚集国和扶桑志管理的进度。 引言 前边写的《Spring Log4j ActiveMQ达成长途记录日志实战 深入分析...

    1. import java.util.HashMap;  
    2. import java.util.List;  
    3. import java.util.Map;  
    4. import java.util.Properties;  
    5.   
    6. import kafka.consumer.Consumer;  
    7. import kafka.consumer.ConsumerConfig;  
    8. import kafka.consumer.ConsumerIterator;  
    9. import kafka.consumer.KafkaStream;  
    10. import kafka.javaapi.consumer.ConsumerConnector;  
    11.   
    12.   
    13.   
    14.   
    15. /** 
    16.  * 接收数据 
    17.  * 接收到: message: 10 
    18. 接收到: message: 11 
    19. 接收到: message: 12 
    20. 接收到: message: 13 
    21. 接收到: message: 14 
    22.  * @author zm 
    23.  * 
    24.  */  
    25. public class kafkaConsumer extends Thread{  
    26.   
    27.     private String topic;  
    28.       
    29.     public kafkaConsumer(String topic){  
    30.         super();  
    31.         this.topic = topic;  
    32.     }  
    33.       
    34.       
    35.     @Override  
    36.     public void run() {  
    37.         ConsumerConnector consumer = createConsumer();  
    38.         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();  
    39.         topicCountMap.put(topic, 1); // 一回从主旨中获取四个数目  
    40.          Map<String, List<KafkaStream<byte[], byte[]>>>  messageStreams = consumer.createMessageStreams(topicCountMap);  
    41.          KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);// 获取每一次接到到的这么些数量  
    42.          ConsumerIterator<byte[], byte[]> iterator =  stream.iterator();  
    43.          while(iterator.hasNext()){  
    44.              String message = new String(iterator.next().message());  
    45.              System.out.println("接收到: "   message);  
    46.          }  
    47.     }  
    48.   
    49.     private ConsumerConnector createConsumer() {  
    50.         Properties properties = new Properties();  
    51.         properties.put("zookeeper.connect", "192.168.1.110:2181,192.168.1.111:2181,192.168.1.112:2181");//声明zk  
    52.         properties.put("group.id", "group1");// 必须要动用其他组名称, 若是生产者和花费者都在长久以来组,则不能够访问同一组内的topic数据  
    53.         return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));  
    54.      }  
    55.       
    56.       
    57.     public static void main(String[] args) {  
    58.         new kafkaConsumer("test").start();// 使用kafka集群中开创好的宗旨 test   
    59.           
    60.     }  
    61.        
    62. }  

    68399皇家赌场 8

     

    复制代码
    config/server-2.properties:
    broker.id=2
    port=9094
    log.dir=/tmp/kafka-logs-2

     

    复制代码
    Step 3: 创建 topic成立一个名叫“test”的topic,它唯有二个分区,四个别本。> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

    Java代码  68399皇家赌场 9

    68399皇家赌场 10

    • kafka.rar (8.6 KB)
    • 下载次数: 888

    分区函数有四个参数:key和可用的分区数量,从分区列表中甄选四个分区并回到id。暗中同意的分区战略是hash(key)%numPartitions.假如key是null,就随意的挑选二个。能够通过参数partitioner.class定制分区函数。

    代码见附属类小部件:

    复制代码
    ctrl c可以脱离发送。Step 5: 启动consumer卡夫卡 also has a command line consumer that will dump out messages to standard output.卡夫卡也许有叁个限令行consumer能够读取音信并出口到标准输出:> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
    This is a message
    This is another message

    2.1 java端生产数据, kafka集群花费数量:

    复制代码
    您在多个巅峰中运作consumer命令行,另三个极限中运转producer命令行,就足以在一个极端输入消息,另三个终极读取新闻。那四个指令都有自身的可选参数,能够在运维的时候不加任何参数能够看看协助新闻。Step 6: 搭建八个四个broker的集群刚刚只是运转了单个broker,未来起步有3个broker组成的集群,这么些broker节点也都以在本机上的:首先为种种节点编写配置文件:> cp config/server.properties config/server-1.properties

    Java代码  68399皇家赌场 11

    复制代码
    删除日记管理器允许定制删除战略。近日的国策是剔除修改时间在N天事先的日记(按期间删除),也得以应用其他几个战略:保留最终的N GB数据的国策(按大小删除)。为了幸免在剔除时打断读操作,采纳了copy-on-write格局的达成,删除操作实行时,读取操作的二分查找功效实在是在四个静态的快速照相别本上进行的,这看似于Java的CopyOnWriteArrayList。可信赖性保障日记文件有多少个可配置的参数M,缓存超越这一个数额的音讯将被强行刷新到硬盘。二个日志考订线程将循环检查最新的日志文件中的消息确认各类音信都是法定的。合法的正规为:全数文件的深浅的和最大的offset小于日志文件的轻重,並且消息的CRC32校验码与仓库储存在音信实体中的校验码一致。借使在有些offset开掘不合规的音信,从这几个offset到下一个法定的offset之间的剧情将被移除。有三种情状必得思考:1,当发生崩溃时有一点点数据块未能写入。2,写入了一部分空白数据块。第三种情景的来头是,对于每一个文件,操作系统都有三个inode(inode是指在非常多“类Unix文件系统”中的一种数据结构。各种inode保存了文件系统中的一个文件系统对象,包涵文件、目录、大小、设备文件、socket、管道, 等等),但无法确定保证更新inode和写入数据的次第,当inode保存的尺寸音讯被更新了,但写入数据时爆发了崩溃,就发出了空荡荡数据块。CRC校验码能够检查这个块并移除,当然因为崩溃而未写入的数量块也就不见了。

    Java代码  68399皇家赌场 12

    复制代码
    ...
    my test message 1my test message 2^C

     

    复制代码
    低档其余API是尖端别API完结的底子,也是为着局地对保持消费情形有独特殊需求要的境况,举个例子Hadoop consumer那样的离线consumer。高端其余API/* 创造连接 /
    ConsumerConnector connector = Consumer.create(consumerConfig);
    interface ConsumerConnector {
    /
    *

     3 kafka 使用Java写成本者,那样 先运维kafkaProducer ,在运维kafkaConsumer,就可以获取生产者的数目:

    绝大相当多音讯系统宣称可以落成“准确的二次”,但是稳重翻阅它们的的文档能够见见里面存在误导,比方未有认证当consumer或producer失利时怎么样,或然当有七个consumer并行时怎么,或写入硬盘的数码错过时又会如何。kafka的做法要更升高一些。当宣布音信时,卡夫卡有一个“committed”的概念,一旦音信被交给了,只要音讯被写入的分区的三街六巷的别本broker是移动的,数据就不会甩掉。关于别本的位移的定义,下节文档会商量。以后只要broker是不会down的。假若producer公布新闻时发生了网络错误,但又不明确实在付出以前发生的还是提交今后发出的,这种情况就算不普遍,不过必需思索进来,以后卡夫卡版本还尚未化解那个难题,以往的本子正在着力尝试消除。并不是具有的情景都亟待“正确的三遍”那样高的品级,卡夫卡允许producer灵活的内定品级。举例producer可以钦定必得等待消息被提交的公告,可能完全的异步发送新闻而不等待其余通告,大概只有等候leader注脚它得到了消息(followers没有要求)。未来从consumer的下边思索这么些难题,全体的别本都有同等的日记文件和均等的offset,consumer维护本人开支的新闻的offset,要是consumer不会崩溃当然能够在内部存储器中保存那么些值,当然什么人也不能担保那一点。借使consumer崩溃了,会有其它二个consumer接着开销音信,它必要从一个妥贴的offset继续管理。这种情景下能够有以下选拔:consumer能够先读取新闻,然后将offset写入日志文件中,然后再管理新闻。那存在一种或者就是在存款和储蓄offset后还没管理信息就crash了,新的consumer继续从这么些offset管理,那么就能够稍为音讯恒久不会被拍卖,那便是上面说的“最多一回”。
    consumer能够先读取音讯,处理音信,最终记录offset,当然假使在记录offset从前就crash了,新的consumer会重复的花费一些新闻,那就是地点说的“最少一遍”。
    “正确二次”能够透过将送交分为四个阶段来缓慢解决:保存了offset后提交贰次,新闻管理成功以后再交付贰遍。不过还应该有个更简约的做法:将新闻的offset和消息被拍卖后的结果保存在协同。比方用Hadoop ETL管理音信时,将拍卖后的结果和offset同期保留在HDFS中,那样就能够担保新闻和offser同期被拍卖了。

     

    由四个机械组成的集群具备4个分区 (P0-P3) 2个consumer组. A组有四个consumerB组有4个对照古板的音讯系统,卡夫卡能够很好的保证有序性。
    古板的行列在服务器上保留有序的新闻,假如四个consumers同不时间从那些服务器花费消息,服务器就能够以音信存款和储蓄的一一贯consumer分发新闻。固然服务器按顺序公布新闻,可是新闻是被异步的分发到各consumer上,所以当音讯达到时大概已经失去了原本的次第,那表示并发开支将促成顺序错乱。为了幸免故障,那样的信息系统经常接纳“专项使用consumer”的定义,其实便是只同意三个主顾开支音讯,当然那就代表失去了并发性。在那上头卡夫卡做的更加好,通过分区的概念,卡夫卡能够在八个consumer组并发的状态下提供较好的有序性和负载均衡。将各个分区分只分发给二个consumer组,那样一个分区就只被那些组的贰个consumer花费,就足以顺序的花费那些分区的音信。因为有多少个分区,依然得以在四个consumer组之间进行负荷均衡。注意consumer组的多寡不能够多于分区的多寡,相当于有多少分区就同意多少并发成本。卡夫卡只可以保险多少个分区之内音讯的有序性,在分歧的分区之间是不可能的,那早已足以满意大多数行使的急需。借使急需topic中具有音讯的有序性,那就只好让这几个topic独有多少个分区,当然也就唯有二个consumer组消费它。###########################################二、情状搭建
    Step 1: 下载Kafka点击下载最新的版本并解压.> tar -xzf kafka_2.9.2-0.8.1.1.tgz

     

    复制代码
    费用这几个音讯:> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic

     

    bin/kafka-server-start.sh config/server-2.properties &
    ...

     

    68399皇家赌场 13

    1. 1.zookeeper集群  搭建在110, 111,112  
    2.   
    3. 2.kafka使用3个节点110, 111,112  
    4. 修改配置文件config/server.properties  
    5. broker.id=110  
    6. host.name=192.168.1.110  
    7. log.dirs=/usr/local/kafka_2.10-0.8.2.0/logs  
    8. 复制到其余五个节点,然后修改对应节点上的config/server.pro   
    9.   
    10. 3.起动,在七个节点分别试行  
    11. bin/kafka-server-start.sh  config/server.properties >/dev/null 2>&1 &  
    12.   
    13. 4 创立主旨  
    14. bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test  
    15.   
    16. 5 查看主旨详细  
    17. bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test  
    18.  --topic test  
    19. Topic:test      PartitionCount:3        ReplicationFactor:3     Configs:  
    20.         Topic: test     Partition: 0    Leader: 110     Replicas: 110,111,112  Isr: 110,111,112  
    21.         Topic: test     Partition: 1    Leader: 111     Replicas: 111,112,110  Isr: 111,112,110  
    22.         Topic: test     Partition: 2    Leader: 112     Replicas: 112,110,111  Isr: 112,110,111  
    23.   
    24.   
    25. 6 去zk上看kafka集群  
    26. [zk: localhost:2181(CONNECTED) 5] ls /  
    27. [admin, zookeeper, consumers, config, controller, zk-fifo, storm, brokers, controller_epoch]  
    28. [zk: localhost:2181(CONNECTED) 6] ls /brokers   ----> 查看注册在zk内的kafka  
    29. [topics, ids]  
    30. [zk: localhost:2181(CONNECTED) 7] ls /brokers/ids  
    31. [112, 110, 111]  
    32. [zk: localhost:2181(CONNECTED) 8] ls /brokers/ids/112  
    33. []  
    34. [zk: localhost:2181(CONNECTED) 9] ls /brokers/topics   
    35. [test]  
    36. [zk: localhost:2181(CONNECTED) 10] ls /brokers/topics/test   
    37. [partitions]  
    38. [zk: localhost:2181(CONNECTED) 11] ls /brokers/topics/test/partitions  
    39. [2, 1, 0]  
    40. [zk: localhost:2181(CONNECTED) 12]   
    • @author leicui bourne_cui@163.com
      */
      public class KafkaConsumer extends Thread
      {
      private final ConsumerConnector consumer;
      private final String topic;
      public KafkaConsumer(String topic)
      {
      consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
      createConsumerConfig());
      this.topic = topic;
      }
      private static ConsumerConfig createConsumerConfig()
      {
      Properties props = new Properties();
      props.put("zookeeper.connect", KafkaProperties.zkConnect);
      props.put("group.id", KafkaProperties.groupId);
      props.put("zookeeper.session.timeout.ms", "40000");
      props.put("zookeeper.sync.time.ms", "200");
      props.put("auto.commit.interval.ms", "1000");
      return new ConsumerConfig(props);
      }
      @Override
      public void run() {
      Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
      topicCountMap.put(topic, new Integer(1));
      Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
      KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
      ConsumerIterator<byte[], byte[]> it = stream.iterator();
      while (it.hasNext()) {
      System.out.println("receive:" new String(it.next().message()));
      try {
      sleep(3000);
      } catch (InterruptedException e) {
      e.printStackTrace();
      }
      }
      }
      }

    2  kafka java调用:

    种种分区都由一名目许多有序的、不可变的音信构成,那么些音讯被接连的增加到分区中。分区中的各个音讯都有一个一连的队列号叫做offset,用来在分区中独一的标记那么些音信。在八个可布置的年月段内,卡夫卡集群保留全体发表的消息,不管那一个新闻有未有被花费。举例,假若音信的保存战略被安装为2天,那么在三个新闻被揭露的两日时间内,它都以能够被花费的。之后它将被撤除以自由空间。卡夫卡的属性是和数据量毫无干系的常量级的,所以保留太多的数码而不成难点。实际上每种consumer独一需求体贴的多少是消息在日记中的位置,也正是offset.那一个offset有consumer来保卫安全:一般意况下随着consumer不断的读取新闻,那offset的值持续追加,但事实上consumer能够以随机的逐一读取新闻,举个例子它能够将offset设置成为二个旧的值来重读在此之前的音讯。以上特点的整合,使卡夫卡consumers非常的轻量级:它们得以在窘迫集群和别的consumer产生影响的景色下读取新闻。你能够行职分令行来"tail"音讯而不会对其余正在消费音信的consumer产生影响。将日志分区能够达到规定的标准以下指标:首先那使得各种日志的数码不会太大,能够在单个服务上保留。别的每一种分区可以独立发布和花费,为出现操作topic提供了一种或许。布满式
    各类分区在卡夫卡集群的若干服务中都有别本,那样这一个具备副本的劳务能够共同处理数量和呼吁,别本数量是足以安插的。别本使卡夫卡具有了容错技巧。各种分区都由多少个服务器作为“leader”,零或若干服务器作为“followers”,leader肩负处理新闻的读和写,followers则去复制leader.假设leader down了,followers中的一台则会自行成为leader。集群中的每一个服务都会同一时间扮演七个剧中人物:作为它所享有的一片段分区的leader,同期作为任何分区的followers,那样集群就能占领较好的载重均衡。68399皇家赌场,Producers
    Producer将音信公布到它内定的topic中,并负责调控宣布到哪些分区。平日简单的由负载均衡机制随机挑选分区,但也能够经过一定的分区函数选用分区。使用的越多的是第三种。Consumers
    颁发新闻平日有二种情势:队列方式(queuing)和发布-订阅形式(publish-subscribe)。队列情势中,consumers能够同不平日候从服务端读取新闻,每种新闻只被内部二个consumer读到;发表-订阅方式中消息被广播到具有的consumer中。Consumers能够投入二个consumer 组,共同竞争贰个topic,topic中的新闻将被分发到组中的二个成员中。同一组中的consumer可以在差别的顺序中,也得以在不一样的机器上。如若持有的consumer都在二个组中,那就改为了观念的行列格局,在各consumer中实现负载均衡。倘使全部的consumer都不在差别的组中,那就改成了发表-订阅方式,全部的新闻都被分发到具备的consumer中。更加宽泛的是,各样topic都有多少数量的consumer组,每一个组都以二个逻辑上的“订阅者”,为了容错和更加好的和睦,每一个组由若干consumer组成。那其实正是三个揭露-订阅格局,只然则订阅者是个组实际不是单个consumer。

     

    那是二个在可用性和延续性之间的权衡。若是等待IS锐界中的节点苏醒,一旦ISPRADO中的节点起不起来照旧数额都以了,那集群就恒久复苏持续了。假若等待ISMurano意外的节点苏醒,那一个节点的数码就能够被看做线上数据,有极大恐怕和真正的多少颇具出入,因为有个别数据它或者还没一块到。Kafka这段时间甄选了第二种政策,在现在的本子中校使那几个布署的接纳可配备,能够根据气象灵活的抉择。这种困境不只卡夫卡会越过,大概全部的遍及式数据系统都会蒙受。别本管理如上只是以一个topic八个分区为例子进行了商讨,但实则一个卡夫卡将会管理数不尽的topic分区.Kafka尽量的使具备分区均匀的遍布到集群具有的节点上并非汇总在某个节点上,别的主从关系也尽量均衡这样种种几点都会担当一定比例的分区的leader.优化leader的取舍进程也是相当的重视的,它调整了系统产生故障时的空窗期有多久。卡夫卡选取一个节点作为“controller”,当发现成节点down掉的时候它承担在游泳分区的具有节点中甄选新的leader,那使得卡夫卡能够批量的快速的治本全部分区节点的主从关系。假诺controller down掉了,活着的节点中的多少个会备切换为新的controller.###################################################九、客户端API
    Kafka Producer APIsProcuder API有三种:kafka.producer.SyncProducer和kafka.producer.async.AsyncProducer.它们都落实了同二个接口:class Producer {
    /* 将音讯发送到内定分区 /
    publicvoid send(kafka.javaapi.producer.ProducerData<K,V> producerData);
    /
    批量发送一群新闻 /
    publicvoid send(java.util.List<kafka.javaapi.producer.ProducerData<K,V>> producerData);
    /
    关闭producer */
    publicvoid close();
    }

     

    total length : 4 bytes
    error code : 2 bytes
    message 1 : x bytes
    ...
    message n : x bytes
    MultiMessageSetSend (multiFetch result)

    1. 1 创制maven工程,pom.xml中加进如下:  
    2.  <dependency>  
    3.         <groupId>org.apache.kafka</groupId>  
    4.         <artifactId>kafka_2.10</artifactId>  
    5.         <version>0.8.2.0</version>  
    6.     </dependency>  
    7.   
    8.   
    9. 2 java代码:  向宗旨test内写入数据  
    10.   
    11. import java.util.Properties;  
    12. import java.util.concurrent.TimeUnit;  
    13.   
    14. import kafka.javaapi.producer.Producer;  
    15. import kafka.producer.KeyedMessage;  
    16. import kafka.producer.ProducerConfig;  
    17. import kafka.serializer.StringEncoder;  
    18.   
    19.   
    20.   
    21.   
    22. public class kafkaProducer extends Thread{  
    23.   
    24.     private String topic;  
    25.       
    26.     public kafkaProducer(String topic){  
    27.         super();  
    28.         this.topic = topic;  
    29.     }  
    30.       
    31.       
    32.     @Override  
    33.     public void run() {  
    34.         Producer producer = createProducer();  
    35.         int i=0;  
    36.         while(true){  
    37.             producer.send(new KeyedMessage<Integer, String>(topic, "message: "   i ));  
    38.             try {  
    39.                 TimeUnit.SECONDS.sleep(1);  
    40.             } catch (InterruptedException e) {  
    41.                 e.printStackTrace();  
    42.             }  
    43.         }  
    44.     }  
    45.   
    46.     private Producer createProducer() {  
    47.         Properties properties = new Properties();  
    48.         properties.put("zookeeper.connect", "192.168.1.110:2181,192.168.1.111:2181,192.168.1.112:2181");//声明zk  
    49.         properties.put("serializer.class", StringEncoder.class.getName());  
    50.         properties.put("metadata.broker.list", "192.168.1.110:9092,192.168.1.111:9093,192.168.1.112:9094");// 声明kafka broker  
    51.         return new Producer<Integer, String>(new ProducerConfig(properties));  
    52.      }  
    53.       
    54.       
    55.     public static void main(String[] args) {  
    56.         new kafkaProducer("test").start();// 使用kafka集群中创制好的宗旨 test   
    57.           
    58.     }  
    59.        
    60. }  
    61.   
    62.   
    63.   
    64.   
    65. 3  kafka集群中成本宗旨test的数额:  
    66. [root@h2master kafka]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginnin  
    67.   
    68. 4   运行java代码,然后在看集群开销的数据如下:  
    69.   
    70. message: 0  
    71. message: 1  
    72. message: 2  
    73. message: 3  
    74. message: 4  
    75. message: 5  
    76. message: 6  
    77. message: 7  
    78. message: 8  
    79. message: 9  
    80. message: 10  
    81. message: 11  
    82. message: 12  
    83. message: 13  
    84. message: 14  
    85. message: 15  
    86. message: 16  
    87. message: 17  
    88. message: 18  
    89. message: 19  
    90. message: 20  
    91. message: 21  

    复制代码
    今昔大家搭建了一个集群,怎么知道各类节点的新闻吗?运转“"describe topics”命令就足以了:> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

    cp config/server.properties config/server-2.properties

    复制代码
    在拷贝出的新文件中拉长以下参数:config/server-1.properties:
    broker.id=1
    port=9093
    log.dir=/tmp/kafka-logs-1

    复制代码
    即使开始时期担当续写音信的leader down掉了,但在此之前的音信仍是可以够费用的:> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
    ...
    my test message 1
    my test message 2

    本文由68399皇家赌场发布于域名注册,转载请注明出处:Kafka入门杰出教程

    关键词: 68399皇家赌场 kafka 大数据开发 e_Kafka

上一篇:68399皇家赌场:行使wget递归镜像网址

下一篇:没有了