json-lib反序列化精度丢失问题

最近在工作中,遇到一个问题,项目中某处使用了json-lib的2.4-jdk15版本.问题最终简化为 double amount = 6264583.33; String jsonString = "{\"pi\":" + amount + "}"; JSONObject jsonObject = JSONObject.fromObject(jsonString); System.out.println("转换前:" + jsonString); System.out.println("转换后:" + jsonObject); 这个值输出的将会是6264583.5 这个值.这个问题.先google一下,很快赵到了 http://sourceforge.net/p/json-lib/bugs/116/ 于是,大概问题知道了.是json-lib的一个bug,但是这个bug怎么来的呢.结合这个bug下面的评论和debug代码,先以pi这个例子,很快走到了. 可以看到json-lib走到了apache common-lang(2.5这个版本) 的NumberUtils.createNumber处,此时String的还是对的. 继续单步,来到这个方法里面 可以看到这里小数部分,整数部分也都还是对的.继续向下走.我擦.画风不太对.居然采用了先尝试float,发现没问题.然后就继续尝试double,我擦.直接数据就丢失了呀.. 好吧..那么现在问题就便成了更简单的一个问题. 使用NumberUtils.createNumber 的bug.在bug issue里,有人提到.这个bug,apache官方已知.好的. https://issues.apache.org/jira/browse/LANG-693 然后在这里有官方的一次修复,修复记录在这里.3.2版本已经修复. http://svn.apache.org/viewvc?view=revision&revision=1484263 可以看到是对小数部分的长度进行了判断.如果小于7位,就用float转换,如果大于7,小于16,就用double,如果还大,就用BigDecimal. n = org.apache.commons.lang3.math.NumberUtils.createNumber("3.14159265358"); System.out.println("lang3_createNumber_3.14159265358---->" + n + "->精度正常"); 于是我继续debug,看common-lang3的修复情况,好像确实是修复了.但是对于我出现的问题1.6264583.33 这个数字,还是出现了精度丢失,因为这里小数部分小于7位,所以尝试使用float转换,直接丢失精度 修复不完善.. 于是提个bug :https://issues.apache.org/jira/browse/LANG-1187 等回复. 继续.公司内部一般使用fastjson,那么如果我使用fastjson,有问题吗? 发现没有问题. Object o = com.alibaba.fastjson.JSONObject.parse("3.14159265358"); System.out.println("fastjson_createNumber_3.14159265358---->" + o + "->精度正常"); o = com.alibaba.fastjson.JSONObject.parse("6264583.33"); System.out.println("fastjson_createNumber_6264583.33---->" + o + "->精度正常"); 可以看到,这里做转换的时候传递了一个是否是bigdecimal的标识.而这个标识默认是开启的.而且即使不开启.. 最坏的情况也是个double.所以数据不会丢失. 再顺便说一下,double的6264583.33 为什么转换到float会精度丢失,先看一下浮点数在计算机中怎么表示的 找到一张图,这是double的标识和浮点数的计算. 而浮点数则是32位,1位符号位,8位幂,23位尾数,看测试代码 //double标识测试 double d = 6264583.33d; long l = Double.doubleToLongBits(d); System.out.println(Long.toBinaryString(l)); //float想要表示这个数字 float f = 6264583.33f; int value = Float.floatToIntBits(f); System.out.println(Integer.toBinaryString(value)); //double表示这个值 d = 6264583.5d; l = Double.doubleToLongBits(d); System.out.println(Long.toBinaryString(l)); 输出结果(做一下分割对齐) 1 00000101010 111111001011100000111010101000111101011100001010010 1 00101010 1111110010111000001111 1 00000101010 111111001011100000111100000000000000000000000000000 注意看,第一行是6264583.33的double表示.而同样想要用float表示这个数字,发现幂,符号位,都是对的.但是因为尾数只有23位,所以四舍五入,将完整double的后几位进位1,变成了这个二进制表示法,这时候已经不准确了, 而这个数字呢.看第三行,会发现实际上是6264583.5的精确值表示.尾数位0都是可以省略的,因为按照公式计算也没啥作用. 如有问题,欢迎评论讨论. 附录: 完整的测试代码 public class App { public static void main(String[] args) { //http://sourceforge.net/p/json-lib/bugs/116/ //2.4版本有问题 double pi = 3.14159265358; String jsonString = "{\"pi\":" + pi + "}"; JSONObject jsonObject = JSONObject.

zookeeper伪集群部署

zookeeper是用来管理分布式环境的系统主要用来服务发现,配置管理,同步.大致原理是zookeeper 自身集群的每个节点都维护这一个目录树,内容相同,每个节点的数据一致性由zookeeper自身的算法来解决.下篇尝试.zookeeper本篇主要说明如果部署zookeeper的分布式环境. 下载 zookeeper由apache在管理,下载地址:http://www.apache.org/dyn/closer.cgi/zookeeper/.下载完成后,随便放个目录好了.. 配置 本次创建3个节点. 1 . 存储目录准备 首先给每个伪节点创建一个目录.用来存储每个节点保存的目录信息.真实的分布式环境将对应在不同的机器上. 这里我在D:\zookeeper,创建三个目录,分别是zk1,zk2,zk3. 然后为每个集群编写一个myid文件,标识集群id 2 . 启动配置文件 下载完成后,在conf目录会看到由一个zoo_sample.cfg实例配置文件,我们可以以这个为模板.来为分布式环境的每个zookeeper节点配置一个节点的数据目录,端口.其他节点的信息等. 我们在conf目录例创建三个配置文件,分别为zk1.cfg,zk2.cfg,zk3.cfg; 里面的值 zk1.cfg tickTime=2000 initLimit=10 syncLimit=5 dataDir=D:/zookeeper/zk1 clientPort=2181 server.1=127.0.0.1:2888:3888 server.2=127.0.0.1:2889:3889 server.3=127.0.0.1:2890:3890 zk2.cfg tickTime=2000 initLimit=10 syncLimit=5 dataDir=D:/zookeeper/zk2 clientPort=2182 server.1=127.0.0.1:2888:3888 server.2=127.0.0.1:2889:3889 server.3=127.0.0.1:2890:3890 zk1.cfg tickTime=2000 initLimit=10 syncLimit=5 dataDir=D:/zookeeper/zk3 clientPort=2183 server.1=127.0.0.1:2888:3888 server.2=127.0.0.1:2889:3889 server.3=127.0.0.1:2890:3890 这里的server.1.2.3这就是每个机器对应的myid的值. server.1=127.0.0.1:2888:3888解释一下这条配置.前面的2888是各个节点用来互相交流.选取leader的端口.后面这个端口,3888是各个节点用来和leader沟通的节点.而clientPort 是开放出去,等待客户端连接的端口. 启动 分别启动三个实例,在zookeeper的安装目录下.进如bin目录,复制三个zkServer.cmd 文件,要是linux就不用这么麻烦了.. 分别加上一行 set ZOOCFG=../conf/zk1.cfg 最终这个文件像这样 setlocal call "%~dp0zkEnv.cmd" set ZOOMAIN=org.apache.zookeeper.server.quorum.QuorumPeerMain set ZOOCFG=../conf/zk1.cfg echo on java "-Dzookeeper.log.dir=%ZOO_LOG_DIR%" "-Dzookeeper.root.logger=%ZOO_LOG4J_PROP%" -cp "%CLASSPATH%" %ZOOMAIN% "%ZOOCFG%" %* endlocal 然后直接双击启动zkServer1.cmd,zkServer2.cmd,zkServer3.cmd 刚启动第一个之后,你会看到有报错,是zookeeper进行选举的时候报错的.因为第一个zk节点.从自己的启动配置里,知道还有两个节点,于是尝试连接.但是连接不上,再启动另外两个.都启动后,报错消失 然后在D:\zookeeper中可以看到由数据写入. 测试 启动bin目录的zkCli.cmd,自动连接本机的2181端口.也可以自己指定 zkCli.cmd –server 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183 对客户端来说.连接上了一个列表之后,如果一台挂了,并不会影响.系统依旧可以运行. 然后执行一些简单的操作 显示根目录下、文件: ls / 使用 ls 命令来查看当前 ZooKeeper 中所包含的内容。 显示根目录下、文件: ls2 / 查看当前节点数据并能看到更新次数等数据。 创建文件,并设置初始内容: create /zktest “test” 创建一个新的 znode节点“ zk ”以及与它关联的字符串。 获取文件内容: get /zktest 确认 znode 是否包含我们所创建的字符串。 修改文件内容: set /zktest “zkbak” 对 zk 所关联的字符串进行设置。 删除文件: delete /zktest 将刚才创建的 znode 删除。 退出客户端: quit 帮助命令: help 可以关掉一个服务器,会发现客户端依然正常.可以执行get set等操作.

Java可重入锁学习笔记

前几天被前辈问到这个可重入锁,结果忘掉了.于是抽空整个了解一下 目录 什么是可重入锁 为什么要可重入 如何实现可重入锁 有不可重入锁吗 demo代码展示 参考文章 1 . 什么是可重入锁 锁的概念就不用多解释了,当某个线程A已经持有了一个锁,当线程B尝试进入被这个锁保护的代码段的时候.就会被阻塞.而锁的操作粒度是"线程”,而不是调用(至于为什么要这样,下面解释).同一个线程再次进入同步代码的时候.可以使用自己已经获取到的锁,这就是可重入锁 java里面内置锁(synchronize)和Lock(ReentrantLock)都是可重入的 2 . 为什么要可重入 如果线程A继续再次获得这个锁呢?比如一个方法是synchronized,递归调用自己,那么第一次已经获得了锁,第二次调用的时候还能进入吗? 直观上当然需要能进入.这就要求必须是可重入的.可重入锁又叫做递归锁,再举个例子. public class Widget { public synchronized void doSomething() { ... } } public class LoggingWidget extends Widget { public synchronized void doSomething() { System.out.println(toString() + ": calling doSomething"); super.doSomething();//若内置锁是不可重入的,则发生死锁 } } 这个例子是java并发编程实战中的例 子.synchronized 是父类Widget的内置锁,当执行子 类的方法的时候,先获取了一次Widget的锁,然后在执行super的时候,就要获取一次,如果不可重入,那么就跪了. 3 . 如何实现可重入锁 为每个锁关联一个获取计数器和一个所有者线程,当计数值为0的时候,这个所就没有被任何线程只有.当线程请求一个未被持有的锁时,JVM将记下锁的持有者,并且将获取计数值置为1,如果同一个线程再次获取这个锁,技术值将递增,退出一次同步代码块,计算值递减,当计数值为0时,这个锁就被释放. ReentrantLock里面有实现 4 . 有不可重入锁吗 这个还真有.Linux下的pthread_mutex_t锁是默认是非递归的。可以通过设置PTHREAD_MUTEX_RECURSIVE属性,将pthread_mutex_t锁设置为递归锁。如果要自己实现不可重入锁,同可重入锁,这个计数器只能为1.或者0,再次进入的时候,发现已经是1了,就进行阻塞.jdk里面没有默认的实现类. 5 . demo代码展示 5.1 内置锁的可重入 public class ReentrantTest { public void method1() { synchronized (ReentrantTest.class) { System.out.println("方法1获得ReentrantTest的内置锁运行了"); method2(); } } public void method2() { synchronized (ReentrantTest.class) { System.out.println("方法1里面调用的方法2重入内置锁,也正常运行了"); } } public static void main(String[] args) { new ReentrantTest().method1(); } } 5.2 lock对象的可重入 import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ReentrantLockTest { private Lock lock = new ReentrantLock(); public void method1() { lock.lock(); try { System.out.println("方法1获得ReentrantLock锁运行了"); method2(); } finally { lock.unlock(); } } public void method2() { lock.lock(); try { System.

事务学习笔记

最近有个感受,在实践中学习固然重要,但是实践遇到的问题常常并没有想象的那么多,而且并不能覆盖所有的情况,所以还是需要对理论有一些深入的理解 什么是事务 事务指的是逻辑上的一组操作,这组操作要么全部成功,要么全部失败,不允许出现部分成功的情况. 事务的特性 定义了事务之后,事务四个特性 原子性 事务是不可分割的单位,事务中的这组操作要么都发生,要么都不发生. 一致性 一致性说是事务执行前后必须要保持一致,不能出现凭空消失的情况,典型的如银行转账的操作,A给B转账,如果刚开始两人总共有100元,转账完成后两人总共还要有100元. 隔离性 多个用户并发访问数据库的时候,一个用户的事务不能被其他的用户的事务所干扰.多个并发事务之间数据要相互隔离.比如事务1,C给A转帐,此时事务2,A给B转账.那么两个事务都要修改A账户的余额,一个增加,一个减少,如何保证他们改完之后数据是对的.这是隔离性的要求. 持久性 一旦事务被提交,对数据库的改变就是持久性的.即使数据库发生故障也不应该有任何影响. 事务的隔离级别 为什么要有隔离级别呢,因为如果没有隔离级别,当两个事务同时对某条记录进行操作的时候,可能会出现如下几种大家常常听到的情况. 1 脏读 脏读就是指当一个事务正在访问数据,并且对数据进行了修改,而这种修改还没有提交到数据库中,这时,另外一个事务也访问这个数据,然后使用了这个数据。 事务1:更新一条数据 ------->事务2:读取事务1更新的记录 事务1:调用commit进行提交 由于事务2使用了事务1还没有提交的记录,如果事务1最后正常提交了还好,但是如果事务1没有提交,而是回滚了.那么事务2的操作就有问题,因为他用的数据是错的.这就是脏读 2 不可重复读 在同一事务中,两次读取同一数据,得到内容不同 事务1:查询一条记录 ————–>事务2:更新事务1查询的记录 ————–>事务2:调用commit进行提交 事务1:再次查询上次的记录 事务1要进行两次查询来做一些比如展示或者使用的操作,但是在两次查询事件被事务2更新掉了记录,所以事务1就出现了不可重复读的问题. 3 幻读 同一事务中,用同样的操作读取两次,得到的记录数不相同 事务1:查询表中所有记录 ————–>事务2:插入一条记录 ————–>事务2:调用commit进行提交 事务1:再次查询表中所有记录 此时事务1两次查询到的记录是不一样的,称为幻读 幻读的重点是新增或者删除,由于另一个事务对表中进行了新增或者删除,到时当前事务每次看到的都条数不一样,就像发生了幻觉一样,查一次多了一条,再查一次,发现又没了. 为此,对事务引入了隔离级别这个概念,由数据库保证 DEFAULT 使用数据库设置的隔离级别 ( 默认 ) ,由 DBA 默认的设置来决定隔离级别 . READ_UNCOMMITTED 会出现脏读、不可重复读、幻读 ( 隔离级别最低,并发性能高 ) READ_COMMITTED 会出现不可重复读、幻读问题(锁定正在读取的行) REPEATABLE_READ 会出幻读(锁定所读取的所有行) SERIALIZABLE 保证所有的情况不会发生(锁表) 可以看到,这四种从上到下性能越来越差,保障性越来越高. 以解决幻读问题为例,SERIALIZABLE直接进行了锁表,那么印发幻读的对该表的插入和删除都无法操作,只能查询.所以不会有问题了.. 事务的传播行为 事务的传播行为主要是为了解决事务嵌套调用的问题,比如A方法里面使用了事务操作,B方法里面也使用了事务操作,当A调用B的时候.这个情况是如何处理的呢 1 REQUIRED 业务方法需要在一个事务中运行,如果方法运行时,已处在一个事务中,那么就加入该事务,否则自己创建一个新的事务.这是spring默认的传播行为. 2 SUPPORTS 如果业务方法在某个事务范围内被调用,则方法成为该事务的一部分,如果业务方法在事务范围外被调用,则方法在没有事务的环境下执行. 3 MANDATORY 只能在一个已存在事务中执行,业务方法不能发起自己的事务,如果业务方法在没有事务的环境下调用,就抛异常 4 REQUIRES_NEW 业务方法总是会为自己发起一个新的事务,如果方法已运行在一个事务中,则原有事务被挂起,新的事务被创建,直到方法结束,新事务才结束,原先的事务才会恢复执行. 5 NOT_SUPPORTED 声明方法需要事务,如果方法没有关联到一个事务,容器不会为它开启事务.如果方法在一个事务中被调用,该事务会被挂起,在方法调用结束后,原先的事务便会恢复执行. 6 NEVER 声明方法绝对不能在事务范围内执行,如果方法在某个事务范围内执行,容器就抛异常.只有没关联到事务,才正常执行. 7 NESTED 如果一个活动的事务存在,则运行在一个嵌套的事务中.如果没有活动的事务,则按REQUIRED属性执行.它使用了一个单独的事务, 这个事务拥有多个可以回滚的保证点.内部事务回滚不会对外部事务造成影响, 它只对DataSourceTransactionManager 事务管理器起效. 总共7个,1,4,7最重要.1就是说A和B会在A的事务里.而4是B会开启一个新的事务,直到完成结束,A的事务才会继续运行. 参考资料 Spring事务管理 Innodb中的事务隔离级别和锁的关系

kafka快速开发demo

在kafka快速上手,主要是使用kafka提供的测试来做了一下简单测试,实际开发中的使用可能才是我们要关系的.启动zk和kafka,新建topic的过程都不变. 1 新建一个maven工程,引入依赖 <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>0.8.2.1</version> </dependency> 2 编写配置文件 public interface KafkaProperties { public final static String ZK = "127.0.0.1:2181"; public final static String GROUP_ID = "test_group1"; public final static String TOPIC = "test"; public final static String BROKER_LIST = "127.0.0.1:9092"; public final static String SESSION_TIMEOUT = "20000"; public final static String SYNC_TIMEOUT = "20000"; public final static String INTERVAL = "1000"; } 3 编写生产者 public class KafkaProducer extends Thread { private Producer<Integer, String> producer; private String topic; private Properties props = new Properties(); private final int SLEEP = 1000 * 3; public KafkaProducer(String topic) { props.put("serializer.class", "kafka.serializer.StringEncoder"); //生产者直接和broker列表连接 props.put("metadata.broker.list", KafkaProperties.BROKER_LIST); producer = new Producer<Integer, String>(new ProducerConfig(props)); this.topic = topic; } @Override public void run() { int offsetNo = 1; while (true) { String msg = new String("Message_" + offsetNo); System.out.println("Send->[" + msg + "]"); producer.send(new KeyedMessage<Integer, String>(topic, msg)); offsetNo++; try { sleep(SLEEP); } catch (Exception ex) { ex.

kafka文章推荐

本文主要分享看到的好的关于kafka的文章.后续看到持续更新 Kafka剖析(一):Kafka背景及架构介绍 Kafka设计解析(二):Kafka High Availability (上) Kafka设计解析(三):Kafka High Availability (下) Kafka设计解析(四):Kafka Consumer解析

kafka分布式部署与验证

在kafka快速上手,和kafka中的partition和offset中,已经解释了kafka的一些原理,和完成了一个简单的生产消费的实践,如第一篇所说,kafka是一个分布式环境下的消息组件,那么,按照我们前面的简单上手,如果kafka的应用进程被杀或者kafka的机器宕机,那么kafka消息组件就无法使用了,或者zookeeper宕机了,那么kafka也无法使用了. kafka集群(cluster) 一台机器不够,那就多搞几台,首先,启动zookeeper这个就不多说了.可以参看前文,在启动kafka的时候,我们在单机模拟启动多个kafka应用. 首先在config目录,copy两个server.properties 文件,这里我复制三份,分别起名server1.properties ,server2.properties server3.properties 然后修改这三个配置文件,主要修改broker.id=2,port=9094,log.dir=/tmp/kafka-logs-2这三个值,broker.id是用来标记分布式环境中的broker的,要求唯一,port和log.dir一个端口,一个log目录,如果在真实的分布式环境中是不需要修改.这里单机模拟防止端口冲突. 分别把broker.id改为1,2,3,log.dir则分别改成kafka-logs-1,kafka-logs-2,kafka-logs-3,然后依次启动 kafka-server-start.bat ../../config/server1.properties kafka-server-start.bat ../../config/server2.properties kafka-server-start.bat ../../config/server3.properties 如果你启动有报错,一个就是之前说的那个vm参数太大,另一个可能是你的端口没改好.具体错误看下报错就好了. 然后我们注册一个topic,叫做replicationtest kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic replicationtest 这里冗余是3,分区是1,那么最终各个broker都会保留一份,最多允许N-1,也就是2台broker宕机,服务照样运行. 注册之后,这时候 kafka-topics.bat--describe --zookeeper localhost:2181 --topic replicationtest 执行描述命令,看下集群情况 第一行给出了分区的汇总信息。每个分区行给出分区信息。 “Leader” 节点是2. “Replicas” 信息,在节点2,3,1上,所有的节点信息. “Isr” 工作中的复制节点的集合. 也就是活的节点的集合. 其他的就不用解释了.这里选出了2是leader,也就是说2这个节点会给消费者提供服务. 然后我们测试一条信息. kafka-console-producer.bat --broker-list localhost:7777,localhost:8888,localhost:9999 --topic replicationtest 上面的7777是server1.properties 中设置的.根据个人情况.改改.然后在控制台发发消息. 然后消费一下. kafka-console-consumer.bat --zookeeper localhost:2181 --topic replicationtest 这里的2181是zookeeper的端口,不用改. 然后.我们开始关掉一个broker,在3的控制台里CTRL,C.然后是否终止操作,输入Y. 再发一条消息 一切正常.我们看一下集群信息 发现Isr中存活的机器少了3.因为3挂了. 然后我们关掉broker2.这时候,会触发新的leader选举.期望值1变成leader,再发一条消息 可以看到生产者发消息过程中,产生了异常,因为和2的连接断开了.但是注意,消息并没有丢,因为触发了新的选举.可以看到,消费者还是接到了正常的消息.集群情况如下 至此,kafka的broker集群测试完毕,那么剩下的问题来了.消费者启动的时候连接的是zookeeper的地址,如果这台zookeeper挂了呢. 那么我们需要zookeeper集群部署. zookeeper集群 这就包括两部分. 是broker本来要能知道这些zookeeper集群的地址,当一个宕机的时候,才会切换到另一个zookeeper 消费者要知道这些zookeeper的地址,理由同上. 因此步骤如下.可以自己试一试,比较简单 复制3份zookeeper.properties文件,命名为zookeeper1.properties,zookeeper2.properties,zookeeper3.properties,修改文件中的dataDir=/tmp/zookeeper和,clientPort=2181,端口分别设置为2181,2182,2183.然后启动三个zookeeper 修改kafka启动配置,server1.properties三个文件中的zookeeper.connect=localhost:2181这个配置,逗号隔开.最终为zookeeper.connect=localhost:2181,localhost:2182,localhost:2183,然后启动 生产者也改下配置中的.metadata.broker.list=localhost:9092,如果使用命令行启动就不用改了.参数指定也可以. 消费者同理,可以改下配置文件中zookeeper.connect=127.0.0.1:2181,也可以命令行启动的时候修改. 5.最终就是各种宕机测试了.

kafka中的partition和offset

在kafka快速上手中,留下的问题是关于partition和offset,这篇文章主要解释这个. Log机制 说到分区,就要说kafka对消息的存储.在官方文档中. 首先,kafka是通过log(日志)来记录消息发布的.每当产生一个消息,kafka会记录到本地的log文件中,这个log和我们平时的log有一定的区别.这里可以参考一下The Log,不多解释. 这个log文件默认的位置在config/server.properties中指定的.默认的位置是log.dirs=/tmp/kafka-logs,linux不用说,windows的话就在你对应磁盘的根目录下.我这里是D盘. #分区partition# kafka是为分布式环境设计的,因此如果日志文件,其实也可以理解成消息数据库,放在同一个地方,那么必然会带来可用性的下降,一挂全挂,如果全量拷贝到所有的机器上,那么数据又存在过多的冗余,而且由于每台机器的磁盘大小是有限的,所以即使有再多的机器,可处理的消息还是被磁盘所限制,无法超越当前磁盘大小.因此有了partition的概念. kafka对消息进行一定的计算,通过hash来进行分区.这样,就把一份log文件分成了多份.如上面的分区读写日志图,分成多份以后,在单台broker上,比如快速上手中,如果新建topic的时候,我们选择了--replication-factor 1 --partitions 2,那么在log目录里,我们会看到 test-0目录和test-1目录.就是两个分区了. 你可能会想,这特么没啥区别呀.注意,当有了多个broker之后,这个意义就存在了.这里上一张图,原文在参考链接里有 这是一个topic包含4个Partition,2 Replication(拷贝),也就是说全部的消息被放在了4个分区存储,为了高可用,将4个分区做了2份冗余,然后根据分配算法.将总共8份数据,分配到broker集群上. 结果就是每个broker上存储的数据比全量数据要少,但每份数据都有冗余,这样,一旦一台机器宕机,并不影响使用.比如图中的Broker1,宕机了.那么剩下的三台broker依然保留了全量的分区数据.所以还能使用,如果再宕机一台,那么数据不完整了.当然你可以设置更多的冗余,比如设置了冗余是4,那么每台机器就有了0123完整的数据,宕机几台都行.需要在存储占用和高可用之间做衡量. 至于宕机后,zookeeper会选出新的partition leader.来提供服务.这个等下篇文章 #偏移offset# 上一段说了分区,分区就是一个有序的,不可变的消息队列.新来的commit log持续往后面加数据.这些消息被分配了一个下标(或者偏移),就是offset,用来定位这一条消息. 消费者消费到了哪条消息,是保持在消费者这一端的.消息者也可以控制,消费者可以在本地保存最后消息的offset,并间歇性的向zookeeper注册offset.也可以重置offset #如何通过offset算出分区# 其实partition存储的时候,又分成了多个segment(段),然后通过一个index,索引,来标识第几段.这里先可以去看一下本地log目录的分区文件夹. 在我这里,test-0,这个分区里面,会有一个index文件和一个log文件, 对于某个指定的分区,假设每5个消息,作为一个段大小,当产生了10条消息的情况想,目前有会得到(只是解释) 0.index (表示这里index是对0-4做的索引) 5.index (表示这里index是对5-9做的索引) 10.index (表示这里index是对10-15做的索引,目前还没满) 和 0.log 5.log 10.log ,当消费者需要读取offset=8的时候,首先kafka对index文件列表进行二分查找,可以算出.应该是在5.index对应的log文件中,然后对对应的5.log文件,进行顺序查找,5->6->7->8,直到顺序找到8就好了. 具体的算法参看美团的文章好了 更多文档 官方文档 Kafka文件存储机制那些事 Kafka集群partition replication自动分配分析

kafka快速上手

简单介绍 kafka是一个分布式消息中间件,在kafka中主要涉及到四个基本名词: Topic Kafka将消息种子分门别类, 每一类的消息称之为一个主题(Topic). Producer 发布消息的对象称之为主题生产者. Consumer 订阅消息并处理消息的对象称之为主题消费者 Broker 已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器称为一个代理(Broker). 消费者可以订阅一个或多个主题,并从Broker拉数据(注意是拉,不是pull,),从而消费这些已发布的消息。 安装(以windows为例) 安装非常简单,从这里下载,下载完成后解压到一个目录就好了. 简单使用 首先使用kafka的一个流程就是生产者生产消息,发送给kafka集群,然后消费者从kafka集群中获取消息进行消费. 要启动kafka需要先启动zookeeper,因为ZooKeeper是通过冗余服务实现高可用性的,也就是说在分布式环境中,如何保证kafka集群的高可用.zookeeper会来做leader选取,当消费者准备发消息时,会从zookeeper中获取一个可用的消息服务器地址,然后连接进行发送,保证党集群内有服务器宕机并不影响整体的使用. 1.启动自带的简易zookeeper. 进行解压目录的bin/windows目录 zookeeper-server-start.bat ../../config/zookeeper.properties 执行命令启动,从zookeeper.properties中会看到.zookeeper会开发一个clientPort=2181,2181的端口给消费者使用,其实也可以给生产者使用,但是在0.8.0版本后,producer不再通过zookeeper连接broker, 而是通过brokerlist(192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092配置,直接和broker连接,只要能和一个broker连接上就能够获取到集群中其他broker上的信息,绕过了zookeeper. 2.启动kafka服务 kafka-server-start.bat ../../config/server.properties 执行启动,另一个命令行窗口,同样的.查看配置问题,会知道kafka的服务会在port=9092 ,9092端口打开. 3.注册一个topic kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test 这个命令中,create表示创建.zookeeper 和后面的地址表示kafka使用本机2181端口开放的zookeeper保持高可用.replication-factor表示消息只冗余一份,目前我们只有一个kafka机器,broker,partitions 表示一份分区,分区是kafka的另一个概念,大致是说,同一topic内部的消息按照一定的key和算法被分区(partition)存储在不同的位置上,这个下次写好了.这样已经在kafka注册了一个名为test的消息topic了. 4.使用简易的控制台生产者模拟 kafka-console-producer.bat --broker-list localhost:9092 --topic test 前面说过了.新版本生产者直接通过brokerlist来连接kafka,目前只有一台,所以就一个地址,准备向test这个topic发送消息. 5.使用简易的控制台消费者模拟 kafka-console-consumer.bat --zookeeper localhost:2181 --topic test 这个前面也说过了.消费者使用zookeeper获取可用的broker列表,然后拉去消息,并且还有一些offset同步的问题.和分区,文件存储一起的一个概念,下次写. 6.开始生产和消费消息 至此,已经开了四个控制台窗口了..在producer窗口里,随便打几个字,然后enter,在消费者的窗口里将会显示出来. 其他问题 实际可能不那么顺利,如果你启动kafka或者其他应用的时候,有错误提示,提示无法创建虚拟机vm这样的.那么修改一下对应的bat脚本.就好了 ,vm的heap申请是1G,如果你机器内存不够,改成512M,或者更小的就好了. 更多文档 官方文档 kafka快速入门

AngularJS 学习资源

一直对AngularJS很好奇,之前看过国内某在线网站的一套AngularJS教程,可惜没看懂.也没理解为了解决什么问题,只是知道是个前端JS框架.这周看到个Udemy - Learn and Understand AngularJS 这个教程.去看了一下,是收费的.强烈推荐看.无字幕,但是不难,已经看完了.讲的非常非常好.网上有别人分享的.我给个磁力链接吧. 01 Getting Started 02 Model, View, Whatever___ 03 Services and Dependency Injection 04 Data Binding and Directives 05 Single Page Applications 06 Custom Services 07 Custom Directives 08 Lets Build an App in record time 09 BONUS Lectures 10 Getting Ready for AngularJS 2.0 in 2016 11 Conclusion 非常的介绍.看完之后,你会理解AngularJS解决了什么问题,他的数据绑定怎么使用,模块怎么弄,怎么写服务,怎么定义指令等等.强烈推荐. 最好有一点js和css的基础会比较好理解.如果懂后端开发的就更好了.依赖注入这些有点基础会比较好理解.