您的位置:68399皇家赌场 > 域名注册 > 怎么在 Apache 卡夫卡 中经过 KSQL 深入分析 推特

怎么在 Apache 卡夫卡 中经过 KSQL 深入分析 推特

发布时间:2019-08-03 14:08编辑:域名注册浏览(143)

    借助Apache Kafka和Kafka Streams框架能够更加好地搭建以流为大旨的架会谈支付布满式流处理应用程序。Confluent的首席营业官杰伊Kreps在上周举行的贰零壹伍响应式高峰会议上为大家带来了关于流管理和微服务的演讲。

    Confluent联合创办人兼COO 杰伊Kreps发布了一篇博文,给出了卡夫卡的着实定位——它不光是个音信系统,它照旧个存款和储蓄系统,而它的终极目的是要让流式处理成为当代集团的主流开荒范式。以下内容翻译自小编的博文,查看原来的文章It’s Okay To Store Data In Apache Kafka。

    前言

    近日在使用Spark streamingKafka创设三个实时的数据分析系统,对书籍阅读数据开始展览辨析,抓好时推荐。斯ParkerStreaming 模块是对于 斯Parker Core 的四个恢宏,指标是为着以高吞吐量,何况容错的措施管理持续性的数据流。近些日子斯Parker Streaming 辅助的外表数据源有 Flume、 卡夫卡、推特(TWTR.US)、ZeroMQ、TCP Socket 等。Apache Kafka是贰个遍及式的音信公布-订阅系统,卡夫卡能够用作流计算种类的数据源,本例中Spark Streaming将从Kafka中开销数据。

    图片 1

    杰伊说,人们早就在数据库本领上做了好多斟酌,但在新闻队列上做得并非常少。在根据微服务的系统架构里,新闻传递能够视作服务的强硬后盾。

    民众三番五次问是还是不是足以把卡夫卡作为长时间的数目存款和储蓄来行使,很显著,要是把数据保存战术设置为“永远”大概启用核心的日志压缩功用,那么数量就能够被永远保存下来。但自个儿感到大家实际确实想领会的是,那样做是还是不是很疯狂。

    系统景况

    介绍

    KSQL 是 Apache Kafka中的开源的流式 SQL 引擎。它能够令你在 卡夫卡宗旨topic上,使用三个简易的还借使交互式的 SQL 接口,很轻松地做一些复杂的流管理。在这些短文中,大家将见到怎么样轻巧地布局并运维在一个沙箱中去查究它,并应用大家都欣赏的言传身教数据库源: Facebook。大家将从推文的原始流中得到,通过选择 KSQL 中的条件去过滤它,来创设多少个群集,如总计各种用户每小时的推文数量。

     

    她涉及了二种编程范式:诉求/响应、批处理和流管理,以及那二种范式之间的分别。流管理同一时间适用于在线和批管理二种情景。流管理而不是更加快的MapReduce,而是管理和分析数据的另一种范式。杰伊介绍了卡夫卡的三种关于流管理的核心API:Producer、Consumer、Connector和Streams。

    简单的讲,那样做不算疯狂。实际上,大家一向都在如此做,何况Kafka的规划意图之一就是要将它看成数据存款和储蓄系统。可是难点是,为何大家要把Kafka作为数据存款和储蓄吗?

    软件版本

    1
    2
    3
    Spark: 1.4.1
    Kafka: 0.8.1.1
    zookeeper: 3.4.6

    Confluent

    图片 2

    首先, 赢得贰个 Confluent 平台的别本。笔者使用的是 RPM 包,可是,要是您须求的话,你也得以采取 tar、 zip 等等 。启动 Confluent 系统:

    1. $ confluent start

    (假设您感兴趣,这里有二个 Confluent 命令行的快捷教程)

    我们将选择 卡夫卡 Connect 从 推特(TWTR.US) 上拉取数据。 那个 推特连接器能够在 GitHub 上找到。要安装它,像上边那样操作:

    1. #Clone the git repo
    2. cd/home/rmoff
    3. gitclone https://github.com/jcustenborder/kafka-connect-twitter.git
    1. #Compile the code
    2. cd kafka-connect-twitter
    3. mvn clean package

    要让 卡夫卡 Connect 去选拔大家创设的连接器, 你要去修改配置文件。因为大家使用 Confluent 命令行,真实的布局文件是在 etc/schema-registry/connect-avro-distributed.properties,因而去修改它并扩展如下内容:

    1. plugin.path=/home/rmoff/kafka-connect-twitter/target/kafka-connect-twitter-0.2-SNAPSHOT.tar.gz

    重启动 Kafka Connect:

    1. confluent stop connect
    2. confluent start connect

    万一你安装好插件,你能够很轻易地去布置它。你能够直接行使 卡夫卡 Connect 的 REST API ,恐怕创建你的布置文件,那正是笔者要在这里做的。若是你须求全部的点子,请首先拜会 推文(Tweet)来获得你的 API 密钥。

    1. {
    2. "name":"twitter_source_json_01",
    3. "config":{
    4. "connector.class":"com.github.jcustenborder.kafka.connect.twitter.TwitterSourceConnector",
    5. "twitter.oauth.accessToken":"xxxx",
    6. "twitter.oauth.consumerSecret":"xxxxx",
    7. "twitter.oauth.consumerKey":"xxxx",
    8. "twitter.oauth.accessTokenSecret":"xxxxx",
    9. "kafka.delete.topic":"twitter_deletes_json_01",
    10. "value.converter":"org.apache.kafka.connect.json.JsonConverter",
    11. "key.converter":"org.apache.kafka.connect.json.JsonConverter",
    12. "value.converter.schemas.enable":false,
    13. "key.converter.schemas.enable":false,
    14. "kafka.status.topic":"twitter_json_01",
    15. "process.deletes":true,
    16. "filter.keywords":"rickastley,kafka,ksql,rmoff"
    17. }
    18. }

    设若你写这个到 /home/rmoff/twitter-source.json,你可在此以前日运作:

    1. $ confluent load twitter_source -d /home/rmoff/twitter-source.json

    下一场推文就从豪门都爱好的网络歌唱家 [rick] 滚滚而来……

    1. $ kafka-console-consumer --bootstrap-server localhost:9092--from-beginning --topic twitter_json_01|jq '.Text'
    2. {
    3. "string":"RT @rickastley: 30 years ago today I said I was Never Gonna Give You Up. I am a man of my word - Rick x https://t.co/VmbMQA6tQB"
    4. }
    5. {
    6. "string":"RT @mariteg10: @rickastley @Carfestevent Wonderful Rick!!nDo not forget Chile!!nWe hope you get back someday!!nHappy weekend for you!!n❤…"
    7. }

     

    卡夫卡Streams是一个Java类库,能够用来创设具有容错技能的遍及式流管理应用程序。它帮衬map、filter、aggregate(count、sum)和join这一个主意。

    1. 你或然在创设二个依照事件源自的应用程序,须要二个数据存款和储蓄来保存改换日志。理论上,你能够应用另外一种存款和储蓄系统。卡夫卡已经减轻了不可变(immutable)日志和基于那些日志生成“物化视图”的主题素材,既然这样,为啥不直接行使Kafka呢?纽约时报已经在他们的CMS系统里采取卡夫卡来保存他们的稿子。
    2. 你恐怕在应用程序里采纳了缓存,并从卡夫卡上获取数据来更新缓存。你能够将卡夫卡的核心设置为压缩型日志,应用程序每趟在重启时就足以从零偏移量地点再次刷新缓存。
    3. 您的流式作业数据流来自卡夫卡,在流式作业的逻辑发生转移后,要求重新计算结果。最简易的办法正是将偏移量重新恢复设置为零,让新代码重新总结结果。
    4. 卡夫卡常常被用于捕获和分发数据库的变动事件(平日被称之为CDC,Change Data Capture)。应用程序也许只须求最新的数据库改造,但却要管理一体化的多少快速照相,而那是极度耗费时间的操作。要是启用主题的日记压缩作用,就足以让应用程序直接从零偏移量地方再一次加载数据。

    集群节点

    一齐有四台主机,主机名分别为nn0001, dn0001, dn0002, dn0003。

    1
    2
    3
    4
    192.168.186.12   nn0001
    192.168.186.13   dn0001
    192.168.186.14   dn0002
    192.168.186.15   dn0003

     

    KSQL

    近期我们从 KSQL 开首 ! 马上去下载并创设它:

    1. cd/home/rmoff
    2. gitclone https://github.com/confluentinc/ksql.git
    3. cd/home/rmoff/ksql
    4. mvn clean compile install -DskipTests

    塑造产生后,让大家来运行它:

    1. ./bin/ksql-cli local--bootstrap-server localhost:9092
    1. ======================================
    2. = _ __ _____ ____ _ =
    3. =||/ // ____|/ __ | |=
    4. =|' /| (___ | | | | | =
    5. = | < ___ | | | | | =
    6. = | . ____) | |__| | |____ =
    7. = |_|______/ __________| =
    8. = =
    9. = Streaming SQL Engine for Kafka =
    10. Copyright 2017 Confluent Inc.
    11. CLI v0.1, Server v0.1 located at http://localhost:9098
    12. Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
    13. ksql>

    利用 KSQL, 我们能够让大家的多寡保存在 Kafka主旨上并能够查询它。首先,我们须求去报告 KSQL 大旨上的数目格局schema是怎样,一个 twitter 音讯其实是三个卓殊巨大的 JSON 对象, 可是,为了简洁,我们只选出在那之中几行:

    1. ksql> CREATE STREAM twitter_raw (CreatedAt BIGINT,Id BIGINT,Text VARCHAR) WITH (KAFKA_TOPIC='twitter_json_01', VALUE_FORMAT='JSON');
    2. Message
    3. ----------------
    4. Stream created

    在概念的形式中,大家得以查询这一个流。要让 KSQL 从该大旨的启幕展现数据(并不是暗中认可的当下时间点),运维如下命令:

    1. ksql> SET 'auto.offset.reset'='earliest';
    2. Successfully changed localproperty'auto.offset.reset'from'null' to 'earliest'

    近年来,让大家看看那一个多少,大家将应用 LIMIT 从句仅检索一行:

    1. ksql> SELECT text FROM twitter_raw LIMIT 1;
    2. RT @rickastley:30 years ago today I said I was NeverGonnaGiveYouUp. I am a man of my word -Rick x https://t.co/VmbMQA6tQB
    3. LIMIT reached for the partition.
    4. Query terminated
    5. ksql>

    今日,让大家接纳刚刚定义和可用的推文内容的漫天数量再一次定义该流:

    1. ksql> DROP stream twitter_raw;
    2. Message
    3. --------------------------------
    4. Source TWITTER_RAW was dropped
    5. ksql> CREATE STREAM twitter_raw (CreatedAt bigint,Id bigint,Text VARCHAR, SOURCE VARCHAR,Truncated VARCHAR,InReplyToStatusId VARCHAR,InReplyToUserId VARCHAR,InReplyToScreenName VARCHAR,GeoLocation VARCHAR,Place VARCHAR,Favorited VARCHAR,Retweeted VARCHAR,FavoriteCount VARCHAR,User VARCHAR,Retweet VARCHAR,Contributors VARCHAR,RetweetCount VARCHAR,RetweetedByMe VARCHAR,CurrentUserRetweetId VARCHAR,PossiblySensitive VARCHAR,Lang VARCHAR,WithheldInCountries VARCHAR,HashtagEntities VARCHAR,UserMentionEntities VARCHAR,MediaEntities VARCHAR,SymbolEntities VARCHAR,URLEntities VARCHAR) WITH (KAFKA_TOPIC='twitter_json_01',VALUE_FORMAT='JSON');
    6. Message
    7. ----------------
    8. Stream created
    9. ksql>

    今后,大家得以操作和检讨越多的近年的数码,使用相似的 SQL 查询:

    1. ksql> SELECT TIMESTAMPTOSTRING(CreatedAt,'yyyy-MM-dd HH:mm:ss.SSS') AS CreatedAt,
    2. EXTRACTJSONFIELD(user,'$.ScreenName')asScreenName,Text
    3. FROM twitter_raw
    4. WHERE LCASE(hashtagentities) LIKE '%oow%' OR
    5. LCASE(hashtagentities) LIKE '%ksql%';
    6. 2017-09-2913:59:58.000| rmoff |Looking forward to talking all about @apachekafka&@confluentinc’s #KSQL at #OOW17 on Sunday13:45 https://t.co/XbM4eIuzeG

    在意这里未有 LIMIT 从句,因而,你就要显示器上见到  “continuous query” 的结果。不像关系型数据表中回到贰个鲜明数量结果的询问,二个不休查询会运营在最棒的流式数据上, 由此,它连接或然回到更加多的笔录。点击 Ctrl-C 去中断然后回来到 KSQL 提醒符。在以上的询问中大家做了一部分政工:

    • TIMESTAMPTOSTRING 将时间戳从 epoch 格式调换成人类可读格式。(LCTT 译注: epoch 指的是贰个一定的时刻 1968-01-01 00:00:00 UTC)
    • EXTRACTJSONFIELD 来体现数据源中嵌套的用户域中的八个字段,它看起来像:

      1. {
      2. "CreatedAt":1506570308000,
      3. "Text":"RT @gwenshap: This is the best thing since partitioned bread :) https://t.co/1wbv3KwRM6",
      4. [...]
      5. "User":{
      6. "Id":82564066,
      7. "Name":"Robin Moffatt uD83CuDF7BuD83CuDFC3uD83EuDD53",
      8. "ScreenName":"rmoff",
      9. [...]
    • 动用断言去显得内容,对 #(hashtag)使用情势匹配, 使用 LCASE 去强制小写字母。(LCTT 译注:hashtag 是twitter 中用来标记线索核心的竹签)

    有关补助的函数列表,请查看 KSQL 文档。

    大家能够成立一个从那几个数目中获取的流:

    1. ksql> CREATE STREAM twitter AS
    2. SELECT TIMESTAMPTOSTRING(CreatedAt,'yyyy-MM-dd HH:mm:ss.SSS') AS CreatedAt,
    3. EXTRACTJSONFIELD(user,'$.Name') AS user_Name,
    4. EXTRACTJSONFIELD(user,'$.ScreenName') AS user_ScreenName,
    5. EXTRACTJSONFIELD(user,'$.Location') AS user_Location,
    6. EXTRACTJSONFIELD(user,'$.Description') AS user_Description,
    7. Text,hashtagentities,lang
    8. FROM twitter_raw ;
    9. Message
    10. ----------------------------
    11. Stream created and running
    12. ksql> DESCRIBE twitter;
    13. Field|Type
    14. ------------------------------------
    15. ROWTIME | BIGINT
    16. ROWKEY | VARCHAR(STRING)
    17. CREATEDAT | VARCHAR(STRING)
    18. USER_NAME | VARCHAR(STRING)
    19. USER_SCREENNAME | VARCHAR(STRING)
    20. USER_LOCATION | VARCHAR(STRING)
    21. USER_DESCRIPTION | VARCHAR(STRING)
    22. TEXT | VARCHAR(STRING)
    23. HASHTAGENTITIES | VARCHAR(STRING)
    24. LANG | VARCHAR(STRING)
    25. ksql>

    况且询问那些获得的流:

    1. ksql> SELECT CREATEDAT, USER_NAME, TEXT
    2. FROM TWITTER
    3. WHERE TEXT LIKE '%KSQL%';
    4. 2017-10-0323:39:37.000|NicolaFerraro| RT @flashdba:Again, I'm really taken with the possibilities opened up by @confluentinc's KSQL engine #Kafka https://t.co/aljnScgvvs

     

    在大会的另三个演讲里,来自UC Santa Cruz的Peter Alvaro聊起怎么着为常见分布式容错系统做活动故障测量检验。他关系了流传驱动故障注入(Lineage-Drive Fault Injection,LDFI)方法,这种格局运用日志追踪音信来鉴定分别冗余计算,有利于测验的实行。

    像这么在卡夫卡里积攒数据而不是怎样疯狂事,Kafka本来正是规划用来积攒数据的。数据经过校验后被长久化在磁盘上,并透过复制别本升高容错本事。再多的多寡都不会拖慢Kafka,在生育条件中,有个别卡夫卡集群以致早已保存超越1 TB的数量。

    zookeeper安装

    kafka使用zookeeper来保管,存款和储蓄一些meta音讯,并动用了zookeeper watch机制来开掘meta新闻的退换并作出相应的动作(比方consumer失效,触发负载均衡等)。
    Zookeeper的配置在机械1上形成后散发到别的三台机械就能够。

    1
    2
    3
    4
    5
    6
    [bigdata@nn0001 ~]$ wget http://archive.apache.org/dist/zookeeper/stable/zookeeper-3.4.6.tar.gz
    [bigdata@nn0001 ~]$ tar -zxvf zookeeper-3.4.6.tar.gz
    [bigdata@nn0001 ~]$cd zookeeper-3.4.6/conf
    [bigdata@nn0001 conf]$ pwd
    /home/bigdata/bigprosoft/zookeeper-3.4.6/conf
    [bigdata@nn0001 conf]$ cp zoo_sample.cfg zoo.cfg

     

    修改配置文件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    [bigdata@nn0001 conf]$ vi zoo.cfg 
    tickTime=2000
    dataDir=/home/bigdata/bigprosoft/zookeeper/data
    clientPort=2181
    initLimit=10
    syncLimit=5
    server.1=nn0001:2888:3888
    server.2=dn0001:2888:3888
    server.3=dn0002:2888:3888
    server.4=dn0003:2888:3888

     

    在dataDir目录下开创myid文件,nn0001机器的剧情为1,dn0001机器的剧情为2,越多由此及彼。

    1
    2
    3
    [bigdata@nn0001 data]$ echo 1 > myid
    [bigdata@nn0001 data]$ cat myid
    1

     

    起步测验

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    [bigdata@nn0001 bin]$ ./zkServer.sh start
    [bigdata@nn0001 bin]$ jps
    10805 QuorumPeerMain   #已经启动成功了
    15494 Master
    11816 NameNode
    20958 Jps
    17539 Worker
    12084 ResourceManager
    12945 RunJar
    12944 RunJar

     

    停止

    1
    [bigdata@nn0001 bin]$ ./zkServer.sh stop

     

    其他机器同样操作,scp过去就可以。

    聚合

    在我们甘休从前,让我们去看一下怎么去做一些集合。

    1. ksql> SELECT user_screenname, COUNT(*)
    2. FROM twitter WINDOW TUMBLING (SIZE 1 HOUR)
    3. GROUP BY user_screenname HAVING COUNT(*)>1;
    4. Oracleace |2
    5. rojulman |2
    6. smokeinpublic |2
    7. ArtFlowMe|2
    8. [...]

    您将恐怕得到满显示屏的结果;那是因为 KSQL 在历次给定的年月窗口更新时实际发生聚合值。因为大家设置 KSQL 去读取在宗旨上的满贯音信(SET 'auto.offset.reset' = 'earliest';),它是一回性读取那一个富有的音信并计算聚合更新。这里有叁个奇妙之处值得去深刻钻探。我们的入站推文流正好便是一个流。不过,现存它不能够创立聚合,大家实际上是成立了多个表。多少个表是在加以时间点的给定键的值的一个快速照相。 KSQL 聚合数据依赖新闻的事件时间,而且只要它立异了,通过轻松的相干窗口重申去操作前边达到的数码。困惑了呢? 笔者期待未有,但是,让大家看一下,尽管大家能够用那一个例子去印证。 我们将注明大家的成团作为二个真实的表:

    1. ksql> CREATE TABLE user_tweet_count AS
    2. SELECT user_screenname, count(*) AS tweet_count
    3. FROM twitter WINDOW TUMBLING (SIZE 1 HOUR)
    4. GROUP BY user_screenname ;
    5. Message
    6. ---------------------------
    7. Table created and running

    看表中的列,这里除了大家要求的外,还会有八个隐含列:

    1. ksql> DESCRIBE user_tweet_count;
    2. Field|Type
    3. -----------------------------------
    4. ROWTIME | BIGINT
    5. ROWKEY | VARCHAR(STRING)
    6. USER_SCREENNAME | VARCHAR(STRING)
    7. TWEET_COUNT | BIGINT
    8. ksql>

    大家看一下这一个是如何:

    1. ksql> SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss.SSS'),
    2. ROWKEY, USER_SCREENNAME, TWEET_COUNT
    3. FROM user_tweet_count
    4. WHERE USER_SCREENNAME='rmoff';
    5. 2017-09-2911:00:00.000| rmoff :Window{start=1506708000000end=-}| rmoff |2
    6. 2017-09-2912:00:00.000| rmoff :Window{start=1506711600000end=-}| rmoff |4
    7. 2017-09-2822:00:00.000| rmoff :Window{start=1506661200000end=-}| rmoff |2
    8. 2017-09-2909:00:00.000| rmoff :Window{start=1506700800000end=-}| rmoff |4
    9. 2017-09-2915:00:00.000| rmoff :Window{start=1506722400000end=-}| rmoff |2
    10. 2017-09-2913:00:00.000| rmoff :Window{start=1506715200000end=-}| rmoff |6

    ROWTIME 是窗口开头时间,  ROWKEY 是 GROUP BYUSER_SCREENNAME)加上窗口的咬合。因而,我们得以因而成立别的二个衍生的表来整理一下:

    1. ksql> CREATE TABLE USER_TWEET_COUNT_DISPLAY AS
    2. SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss.SSS') AS WINDOW_START ,
    3. USER_SCREENNAME, TWEET_COUNT
    4. FROM user_tweet_count;
    5. Message
    6. ---------------------------
    7. Table created and running

    今后它更易于查询和查看大家感兴趣的多少:

    1. ksql> SELECT WINDOW_START , USER_SCREENNAME, TWEET_COUNT
    2. FROM USER_TWEET_COUNT_DISPLAY WHERE TWEET_COUNT>20;
    3. 2017-09-2912:00:00.000|VikasAatOracle|22
    4. 2017-09-2814:00:00.000|Throne_ie|50
    5. 2017-09-2814:00:00.000| pikipiki_net |22
    6. 2017-09-2909:00:00.000| johanlouwers |22
    7. 2017-09-2809:00:00.000| yvrk1973 |24
    8. 2017-09-2813:00:00.000| cmosoares |22
    9. 2017-09-2911:00:00.000| ypoirier |24
    10. 2017-09-2814:00:00.000| pikisec |22
    11. 2017-09-2907:00:00.000|Throne_ie|22
    12. 2017-09-2909:00:00.000|ChrisVoyance|24
    13. 2017-09-2811:00:00.000|ChrisVoyance|28

     

    Peter总括了系统故障测量检验需求有所的多少个标准:

    那么大家为什么会对利用卡夫卡来积攒数据心存疑问呢?

    kafka安装

    卡夫卡的broker、producer、consumer、topic等概念以及原理能够查看官方文档
    此番试验应用的多节点多broker集群形式,为每一台机器分配八个broker id

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    [bigdata@nn0001 ~]$ wget http://mirror.bit.edu.cn/apache/kafka/0.8.1.1/kafka_2.10-0.8.1.1.tgz
    [bigdata@nn0001 ~]$ tar zxf kafka_2.10-0.8.1.1.tgz
    [bigdata@nn0001 ~]$ cd kafka_2.10-0.8.1.1
    [bigdata@nn0001 kafka_2.10-0.8.1.1]$ cd conf
    [bigdata@nn0001 conf]$ vi server.properties
    broker.id=1  #其它机器的id依次递增即可
    port=9092
    host.name=192.168.186.12
    advertised.host.name=192.168.186.12
    zookeeper.connect=192.168.186.12:2181,192.168.186.13:2181,192.168.186.14:2181,192.168.186.15:2181

     

    修改完毕后散发到别的三台机械上。

    起首测验

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    [bigdata@nn0001 bin]$ nohup ./kafka-server-start.sh ../config/server.properties &
    [bigdata@nn0001 conf]$ jps
    10805 QuorumPeerMain
    21282 Jps
    15494 Master
    21209 Kafka
    11816 NameNode
    17539 Worker
    12084 ResourceManager
    12945 RunJar
    12944 RunJar

     

    依次运行机器

    结论

    所以大家有了它! 大家得以从 卡夫卡 中获取数据, 何况很轻便选择 KSQL 去索求它。 而不光是去浏览和调换数据,大家得以很轻巧地利用 KSQL 从流和表中国建筑工程总公司立流处理。

    图片 3

    一经你对 KSQL 能够做什么样感兴趣,去查看:

    • KSQL 公告
    • 我们近来的 KSQL 在线研究研究会 和 卡夫卡高峰会议演讲
    • clickstream 演示,它是 KSQL 的 GitHub 仓库 的一有的
    • 本人近来做的发言 体现了 KSQL 怎么着去支撑基于流的 ETL 平台

    铭记,KSQL 今后正处在开采者预览阶段。 迎接在 KSQL 的 GitHub 旅舍上提议任何难题, 恐怕去我们的 community Slack group 的 #KSQL 频道。

    下边关于Kafka的小说您也只怕喜欢,无妨参照他事他说加以考察下:

    CentOS 7.2部署Elasticsearch Kibana Zookeeper Kafka  http://www.linuxidc.com/Linux/2016-11/137636.htm

    CentOS 7下安装Logstash ELK Stack 日志处理体系  http://www.linuxidc.com/Linux/2016-08/134165.htm

    卡夫卡集群安顿与布局手册 http://www.linuxidc.com/Linux/2017-02/141037.htm

    CentOS 7下卡夫卡集群安装  http://www.linuxidc.com/Linux/2017-01/139734.htm

    Apache 卡夫卡 教程笔记 http://www.linuxidc.com/Linux/2014-01/94682.htm

    CentOS 7下安装Kafka单机版  http://www.linuxidc.com/Linux/2017-01/139732.htm

    Apache kafka原理与特色(0.8V)  http://www.linuxidc.com/Linux/2014-09/107388.htm

    卡夫卡陈设与代码实例  http://www.linuxidc.com/Linux/2014-09/107387.htm

    卡夫卡介绍及条件搭建  http://www.linuxidc.com/Linux/2016-12/138724.htm

    卡夫卡介绍和集群情形搭建  http://www.linuxidc.com/Linux/2014-09/107382.htm

    CentOS7.0设置配备卡夫卡集群  http://www.linuxidc.com/Linux/2017-06/144951.htm

    Kafka的详尽介绍:请点这里
    卡夫卡的下载地址:请点这里


    via:

    作者:Robin Moffatt 译者:qhwdw 校对:wxy

    本文由 LCTT 原创编写翻译,Linux中国 荣誉推出

    本文恒久更新链接地址:http://www.linuxidc.com/Linux/2017-11/148455.htm

    图片 4

    • 实在的主题素材
    • 真正的系统
    • 想想时间
    • 故障自由

    本身想,大家更加的多的是把卡夫卡当成了音讯队列系统。新闻队列有一点不成文的平整,举个例子“不要在音信队列里保存消息”。守旧的新闻系统就此不能用来保存音信,是因为:

    kafka使用测量试验

    创建topic

    1
    [bigdata@nn0001 bin]$ ./kafka-topics.sh --create --zookeeper nn0001:2181 --replication-factor 3 --partitions 1 --topic test

     

    查看topic

    1
    2
    3
    4
    5
    6
    [bigdata@nn0001 bin]$ ./kafka-topics.sh --describe --zookeeper nn0001:2181
    Topic:mytest    PartitionCount:2        ReplicationFactor:2     Configs:
            Topic: mytest   Partition: 0    Leader: 2       Replicas: 3,2   Isr: 2
            Topic: mytest   Partition: 1    Leader: -1      Replicas: 4,3   Isr: 
    Topic:test      PartitionCount:1        ReplicationFactor:3     Configs:
            Topic: test     Partition: 0    Leader: 2       Replicas: 2,3,4 Isr: 2

     

    producer测试

    1
    2
    3
    [bigdata@nn0001 bin]$ ./kafka-console-producer.sh --broker-list 192.168.186.12:9092 --topic test
    gsdggfgfgfd
    gdfgdfgdf

     

    conumer测试

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    [bigdata@nn0001 bin]$ ./kafka-console-consumer.sh --zookeeper  192.168.186.12:2181 --from-beginning --topic test
    
    
    abfsfsdfsdfs
    ffsdfs
    gsdggfgfgfd
    gdfgdfgdf
    ^C[2015-08-28 17:48:40,991] WARN Reconnect due to socket error: null (kafka.consumer.SimpleConsumer)
    Consumed 7 messages
    `

     

    测量检验高可用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    [bigdata@nn0001 bin]$ ./kafka-topics.sh --describe --zookeeper 192.168.186.12:2181,192.168.186.13:2181,192.168.186.14:2181,192.168.186.15:2181 --from-beginning --topic test
    Topic:test      PartitionCount:1        ReplicationFactor:3     Configs:
            Topic: test     Partition: 0    Leader: 2       Replicas: 2,3,4 Isr: 2,4
    #可以看到leader是2,是dn0001机器,把此机器上的kafka进程杀掉,再查看topic的leader
    
    [bigdata@dn0002 bin]$ ./kafka-topics.sh --describe --zookeeper 192.168.186.12:2181,192.168.186.13:2181,192.168.186.14:2181,192.168.186.15:2181 --topic test
    Topic:test      PartitionCount:1        ReplicationFactor:3     Configs:
            Topic: test     Partition: 0    Leader: 4       Replicas: 2,3,4 Isr: 4
    #此时leader变成了4,对应的机器是dn0003.
    
    [bigdata@nn0001 bin]$ ./kafka-console-consumer.sh --zookeeper 192.168.186.12:2181,192.168.186.13:2181,192.168.186.14:2181,192.168.186.15:2181 --from-beginning --topic test
    
    
    abfsfsdfsdfs
    ffsdfs
    gsdggfgfgfd
    gdfgdfgdf
    q
    
    ^C[2015-08-31 10:14:50,964] WARN Reconnect due to socket error: null (kafka.consumer.SimpleConsumer)
    Consumed 7 messages
    #消费者消费信息测试

     

    ok,搭建进程就做到,下边用python/java/scala进行付出实例就能够。

    有关这些话题的越多消息,能够在Netflix博客上看到。

    • 音讯被读取后就能被剔除
    • 紧缩性差
    • 缺乏健全的复制机制(若是broker崩溃,数据也就甩掉了)

    排错

    大会第二天的运动内容包涵Jan Machacek带来的“从单体到微服务”以及Anil Gursel和Akara Sucharitakul带来的“使用Akka Streams和卡夫卡举办回压”。

    古板的音信系统在安排上设有重重相差。从根本上讲,任何二个异步音信系统都会保留新闻,只是时间极短,不经常候唯有几分钟,直到音信被费用完结。借使有一个劳务向消息队列发送音信,并愿意有一种机制得以保险其余服务还可以这一个音信,那么消息就供给被保留在有些地点,直到其余服务读取它。假如消息系统不擅长存款和储蓄音讯,也就谈不上给音讯“排队”了。你也许感到无所谓,因为您并不准备长日子地保存新闻。但无论怎样,如果音信系统相连地管理负荷,总会有一对未被花费的消息供给保留下去。一旦新闻系统爆发崩溃,若无实用的容错存款和储蓄机制,数据就能舍弃。消息存款和储蓄是音讯系统的根底,但人们一连忽略这点。

    问题1描述

    1
    2
    3
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

    赶尽杀绝办法

    1
    2
    3
    [bigdata@nn0001 ~]$ wget http://www.slf4j.org/dist/slf4j-1.7.12.tar.gz
    [bigdata@nn0001 ~]$ cd slf4j-1.7.12
    [bigdata@nn0001 ~]$ cp slf4j-nop-1.7.12.jar ~/bigprosoft/kafka/libs/

     

    本文由68399皇家赌场发布于域名注册,转载请注明出处:怎么在 Apache 卡夫卡 中经过 KSQL 深入分析 推特

    关键词: 68399皇家赌场

上一篇:CentOS 7.2装置Zabbix 3.2图像和文字详解

下一篇:没有了