最近由于一些需求,要搞一下python,于是周末搞了搞.要连接服务器,进行一些服务器的操作,于是安装这个Paramiko包,

直接pip install paramiko 结果.报错,最关键的一句是:

1
error: Unable to find vcvarsall.bat

google一圈.最终找到一种最简单地方法.其他的安装vs.安装MinGW都太复杂了.

  1. 安装PyCrypto 第三方版
    因为paramiko依赖PyCrypto,上面那个错就是他报错出来的.安装PyCrypto第三方版
    1
    2
    pip install --use-wheel --no-index --find-links=https://github.com/sfbahr/PyCrypto-Wheels/raw/master/pycrypto-2.6.1-cp35-none-win_amd64.whl pycrypto
    `

安装完成后,再次安装paramiko即可.
2.修改nt.py

安装完上面的步骤,写一个简单的程序测试下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#-*- coding: utf-8 -*-
#!/usr/bin/python
import paramiko
import threading

def ssh2(ip,username,passwd,cmd):
try:
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(ip,22,username,passwd,timeout=5)
for m in cmd:
stdin, stdout, stderr = ssh.exec_command(m)
out = stdout.readlines()
#屏幕输出
for o in out:
print(o)
print('%s\tOK\n'%(ip))
ssh.close()
except :
print('%s\tError\n'%(ip))


if __name__=='__main__':
cmd = ['find /home/admin/logs/ -mtime +3 -name \'*.log.*\' -exec rm -rf {} \;']#你要执行的命令列表
username = "admin" #用户名
passwd = "password" #密码
threads = [] #多线程
ip = "127.0.0.1"
print("Begin......")
a=threading.Thread(target=ssh2,args=(ip,username,passwd,cmd))
a.start()
input()

运行报错.
ImportError: No module named 'winrandom'

so一下..找到一个办法
http://stackoverflow.com/questions/24804829/another-one-about-pycrypto-and-paramiko

找到python3.5的安装目录的
Lib\site-packages\Crypto\Random\OSRNG的nt.py文件将

1
import winrandom

改成

1
from . import winrandom

再次运行ok.非常简单

最近在工作中,遇到一个问题,项目中某处使用了json-lib的2.4-jdk15版本.问题最终简化为

1
2
3
4
5
double amount = 6264583.33;
String jsonString = "{\"pi\":" + amount + "}";
JSONObject jsonObject = JSONObject.fromObject(jsonString);
System.out.println("转换前:" + jsonString);
System.out.println("转换后:" + jsonObject);

这个值输出的将会是6264583.5 这个值.这个问题.先google一下,很快赵到了
http://sourceforge.net/p/json-lib/bugs/116/ 于是,大概问题知道了.是json-lib的一个bug,但是这个bug怎么来的呢.结合这个bug下面的评论和debug代码,先以pi这个例子,很快走到了.

json_create

可以看到json-lib走到了apache common-lang(2.5这个版本) 的NumberUtils.createNumber处,此时String的还是对的.
继续单步,来到这个方法里面

json_mant_dec

可以看到这里小数部分,整数部分也都还是对的.继续向下走.我擦.画风不太对.居然采用了先尝试float,发现没问题.然后就继续尝试double,我擦.直接数据就丢失了呀..

json_float

json_float_fluent

好吧..那么现在问题就便成了更简单的一个问题.

使用NumberUtils.createNumber 的bug.在bug issue里,有人提到.这个bug,apache官方已知.好的.
https://issues.apache.org/jira/browse/LANG-693
然后在这里有官方的一次修复,修复记录在这里.3.2版本已经修复.
http://svn.apache.org/viewvc?view=revision&revision=1484263
可以看到是对小数部分的长度进行了判断.如果小于7位,就用float转换,如果大于7,小于16,就用double,如果还大,就用BigDecimal.

json_common_lang3_fix

1
2
n = org.apache.commons.lang3.math.NumberUtils.createNumber("3.14159265358");
System.out.println("lang3_createNumber_3.14159265358---->" + n + "->精度正常");

于是我继续debug,看common-lang3的修复情况,好像确实是修复了.但是对于我出现的问题1.6264583.33 这个数字,还是出现了精度丢失,因为这里小数部分小于7位,所以尝试使用float转换,直接丢失精度

lang3_float_loss

修复不完善..

于是提个bug :https://issues.apache.org/jira/browse/LANG-1187 等回复.

继续.公司内部一般使用fastjson,那么如果我使用fastjson,有问题吗? 发现没有问题.

1
2
3
4
5
Object o = com.alibaba.fastjson.JSONObject.parse("3.14159265358");
System.out.println("fastjson_createNumber_3.14159265358---->" + o + "->精度正常");

o = com.alibaba.fastjson.JSONObject.parse("6264583.33");
System.out.println("fastjson_createNumber_6264583.33---->" + o + "->精度正常");

fastjson_decimal

可以看到,这里做转换的时候传递了一个是否是bigdecimal的标识.而这个标识默认是开启的.而且即使不开启..

fastjson_first_decimal

最坏的情况也是个double.所以数据不会丢失.

再顺便说一下,double的6264583.33 为什么转换到float会精度丢失,先看一下浮点数在计算机中怎么表示的
double_present
找到一张图,这是double的标识和浮点数的计算.

而浮点数则是32位,1位符号位,8位幂,23位尾数,看测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//double标识测试
double d = 6264583.3d;
long l = Double.doubleToLongBits(d);
System.out.println(Long.toBinaryString(l));

//float想要表示这个数字
float f = 6264583.3f;
int value = Float.floatToIntBits(f);
System.out.println(Integer.toBinaryString(value));

//double表示这个值
d = 6264583.d;
l = Double.doubleToLongBits(d);
System.out.println(Long.toBinaryString(l));

输出结果(做一下分割对齐)

1
2
3
1 00000101010    111111001011100000111010101000111101011100001010010
1 00101010 1111110010111000001111
1 00000101010 111111001011100000111100000000000000000000000000000

注意看,第一行是6264583.33的double表示.而同样想要用float表示这个数字,发现幂,符号位,都是对的.但是因为尾数只有23位,所以四舍五入,将完整double的后几位进位1,变成了这个二进制表示法,这时候已经不准确了,
而这个数字呢.看第三行,会发现实际上是6264583.5的精确值表示.尾数位0都是可以省略的,因为按照公式计算也没啥作用.

如有问题,欢迎评论讨论.

附录:
完整的测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class App {
public static void main(String[] args) {
//http://sourceforge.net/p/json-lib/bugs/116/

//2.4版本有问题
double pi = 3.14159265358;
String jsonString = "{\"pi\":" + pi + "}";
JSONObject jsonObject = JSONObject.fromObject(jsonString);
System.out.println("转换前:" + jsonString);
System.out.println("转换后:" + jsonObject);

double amount = 6264583.33;
jsonString = "{\"pi\":" + amount + "}";
jsonObject = JSONObject.fromObject(jsonString);
System.out.println("转换前:" + jsonString);
System.out.println("转换后:" + jsonObject);

//测试2.4版本引入的lang,这里面
Number n = org.apache.commons.lang.math.NumberUtils.createNumber("3.14159265358");
System.out.println("lang2_createNumber_3.14159265358---->" + n + "->精度丢失");

n = org.apache.commons.lang.math.NumberUtils.createNumber("6264583.33");
System.out.println("lang2_createNumber_6264583.33---->" + n + "->精度丢失");

//测试3.4版本,里面是根据小数部分的长度,选择是否使用float还是double,当小数部分大于7的时候,就会使用double
n = org.apache.commons.lang3.math.NumberUtils.createNumber("3.14159265358");
System.out.println("lang3_createNumber_3.14159265358---->" + n + "->精度正常");

//这种情况就会有问题,虽然小数部分是33,两位,但是实际上是个浮点数.所以还会丢失精度
n = org.apache.commons.lang3.math.NumberUtils.createNumber("6264583.33");
System.out.println("lang3_createNumber_6264583.33---->" + n + "->精度丢失");

//测试fastjson

Object o = com.alibaba.fastjson.JSONObject.parse("3.14159265358");
System.out.println("fastjson_createNumber_3.14159265358---->" + o + "->精度正常");

o = com.alibaba.fastjson.JSONObject.parse("6264583.33");
System.out.println("fastjson_createNumber_6264583.33---->" + o + "->精度正常");

//double标识测试
double d = 6264583.33d;
long l = Double.doubleToLongBits(d);
System.out.println(Long.toBinaryString(l));

//float想要表示这个数字
float f = 6264583.33f;
int value = Float.floatToIntBits(f);
System.out.println(Integer.toBinaryString(value));

//double表示这个值
d = 6264583.5d;
l = Double.doubleToLongBits(d);
System.out.println(Long.toBinaryString(l));
}
}

zookeeper是用来管理分布式环境的系统主要用来服务发现,配置管理,同步.大致原理是zookeeper 自身集群的每个节点都维护这一个目录树,内容相同,每个节点的数据一致性由zookeeper自身的算法来解决.下篇尝试.zookeeper本篇主要说明如果部署zookeeper的分布式环境.

下载

zookeeper由apache在管理,下载地址:http://www.apache.org/dyn/closer.cgi/zookeeper/.下载完成后,随便放个目录好了..

配置

本次创建3个节点.
1 . 存储目录准备
首先给每个伪节点创建一个目录.用来存储每个节点保存的目录信息.真实的分布式环境将对应在不同的机器上.
这里我在D:\zookeeper,创建三个目录,分别是zk1,zk2,zk3.
然后为每个集群编写一个myid文件,标识集群id

2 . 启动配置文件
下载完成后,在conf目录会看到由一个zoo_sample.cfg实例配置文件,我们可以以这个为模板.来为分布式环境的每个zookeeper节点配置一个节点的数据目录,端口.其他节点的信息等.

我们在conf目录例创建三个配置文件,分别为zk1.cfg,zk2.cfg,zk3.cfg;
里面的值
zk1.cfg

1
2
3
4
5
6
7
8
tickTime=2000 
initLimit=10
syncLimit=5
dataDir=D:/zookeeper/zk1
clientPort=2181
server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890

zk2.cfg

1
2
3
4
5
6
7
8
tickTime=2000 
initLimit=10
syncLimit=5
dataDir=D:/zookeeper/zk2
clientPort=2182
server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890

zk1.cfg

1
2
3
4
5
6
7
8
tickTime=2000 
initLimit=10
syncLimit=5
dataDir=D:/zookeeper/zk3
clientPort=2183
server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890

这里的server.1.2.3这就是每个机器对应的myid的值.
server.1=127.0.0.1:2888:3888解释一下这条配置.前面的2888是各个节点用来互相交流.选取leader的端口.后面这个端口,3888是各个节点用来和leader沟通的节点.而clientPort 是开放出去,等待客户端连接的端口.

启动

分别启动三个实例,在zookeeper的安装目录下.进如bin目录,复制三个zkServer.cmd 文件,要是linux就不用这么麻烦了..

分别加上一行

set ZOOCFG=../conf/zk1.cfg

最终这个文件像这样

1
2
3
4
5
6
7
8
9
setlocal
call "%~dp0zkEnv.cmd"

set ZOOMAIN=org.apache.zookeeper.server.quorum.QuorumPeerMain
set ZOOCFG=../conf/zk1.cfg
echo on
java "-Dzookeeper.log.dir=%ZOO_LOG_DIR%" "-Dzookeeper.root.logger=%ZOO_LOG4J_PROP%" -cp "%CLASSPATH%" %ZOOMAIN% "%ZOOCFG%" %*

endlocal

然后直接双击启动zkServer1.cmd,zkServer2.cmd,zkServer3.cmd

刚启动第一个之后,你会看到有报错,是zookeeper进行选举的时候报错的.因为第一个zk节点.从自己的启动配置里,知道还有两个节点,于是尝试连接.但是连接不上,再启动另外两个.都启动后,报错消失

然后在D:\zookeeper中可以看到由数据写入.
zk数据目录

测试

启动bin目录的zkCli.cmd,自动连接本机的2181端口.也可以自己指定
zkCli.cmd –server 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
对客户端来说.连接上了一个列表之后,如果一台挂了,并不会影响.系统依旧可以运行.

然后执行一些简单的操作
zk测试结果

显示根目录下、文件: ls / 使用 ls 命令来查看当前 ZooKeeper 中所包含的内容。
显示根目录下、文件: ls2 / 查看当前节点数据并能看到更新次数等数据。
创建文件,并设置初始内容: create /zktest “test” 创建一个新的 znode节点“ zk ”以及与它关联的字符串。
获取文件内容: get /zktest 确认 znode 是否包含我们所创建的字符串。
修改文件内容: set /zktest “zkbak” 对 zk 所关联的字符串进行设置。
删除文件: delete /zktest 将刚才创建的 znode 删除。
退出客户端: quit
帮助命令: help

可以关掉一个服务器,会发现客户端依然正常.可以执行get set等操作.

前几天被前辈问到这个可重入锁,结果忘掉了.于是抽空整个了解一下

目录

  1. 什么是可重入锁
  2. 为什么要可重入
  3. 如何实现可重入锁
  4. 有不可重入锁吗
  5. demo代码展示
  6. 参考文章

1 . 什么是可重入锁

锁的概念就不用多解释了,当某个线程A已经持有了一个锁,当线程B尝试进入被这个锁保护的代码段的时候.就会被阻塞.而锁的操作粒度是”线程”,而不是调用(至于为什么要这样,下面解释).同一个线程再次进入同步代码的时候.可以使用自己已经获取到的锁,这就是可重入锁
java里面内置锁(synchronize)和Lock(ReentrantLock)都是可重入的

2 . 为什么要可重入

如果线程A继续再次获得这个锁呢?比如一个方法是synchronized,递归调用自己,那么第一次已经获得了锁,第二次调用的时候还能进入吗? 直观上当然需要能进入.这就要求必须是可重入的.可重入锁又叫做递归锁,再举个例子.

1
2
3
4
5
6
7
8
9
10
11
12
public class Widget {
public synchronized void doSomething() {
...
}
}

public class LoggingWidget extends Widget {
public synchronized void doSomething() {
System.out.println(toString() + ": calling doSomething");
super.doSomething();//若内置锁是不可重入的,则发生死锁
}
}

这个例子是java并发编程实战中的例 子.synchronized 是父类Widget的内置锁,当执行子 类的方法的时候,先获取了一次Widget的锁,然后在执行super的时候,就要获取一次,如果不可重入,那么就跪了.

3 . 如何实现可重入锁

为每个锁关联一个获取计数器和一个所有者线程,当计数值为0的时候,这个所就没有被任何线程只有.当线程请求一个未被持有的锁时,JVM将记下锁的持有者,并且将获取计数值置为1,如果同一个线程再次获取这个锁,技术值将递增,退出一次同步代码块,计算值递减,当计数值为0时,这个锁就被释放.
ReentrantLock里面有实现

4 . 有不可重入锁吗

这个还真有.Linux下的pthread_mutex_t锁是默认是非递归的。可以通过设置PTHREAD_MUTEX_RECURSIVE属性,将pthread_mutex_t锁设置为递归锁。如果要自己实现不可重入锁,同可重入锁,这个计数器只能为1.或者0,再次进入的时候,发现已经是1了,就进行阻塞.jdk里面没有默认的实现类.

5 . demo代码展示

5.1 内置锁的可重入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class ReentrantTest {
public void method1() {
synchronized (ReentrantTest.class) {
System.out.println("方法1获得ReentrantTest的内置锁运行了");
method2();
}
}

public void method2() {
synchronized (ReentrantTest.class) {
System.out.println("方法1里面调用的方法2重入内置锁,也正常运行了");
}
}

public static void main(String[] args) {
new ReentrantTest().method1();
}
}

5.2 lock对象的可重入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockTest {
private Lock lock = new ReentrantLock();

public void method1() {
lock.lock();
try {
System.out.println("方法1获得ReentrantLock锁运行了");
method2();
} finally {
lock.unlock();
}
}

public void method2() {
lock.lock();
try {
System.out.println("方法1里面调用的方法2重入ReentrantLock锁,也正常运行了");
} finally {
lock.unlock();
}
}

public static void main(String[] args) {
new ReentrantLockTest().method1();
}
}

5.3 不同线程不可访问同一锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ReentrantTwoThreadTest {
private static Lock lock = new ReentrantLock();

private static class T1 extends Thread {
@Override
public void run() {
System.out.println("线程1启动");
lock.lock();
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
System.out.println("线程2终止");
}
}

private static class T2 extends Thread {
@Override
public void run() {
System.out.println("线程2启动");
lock.lock();
lock.unlock();
System.out.println("线程2终止");
}
}


public static void main(String[] args) {
new T1().start();
Thread.sleep(100);
new T2().start();
}
}

6. 参考文章

  1. 可重入锁测试
  2. 生产者消费者的一个更真实的例子
  3. 浅谈Java中的锁
  4. java并发编程实战

最近有个感受,在实践中学习固然重要,但是实践遇到的问题常常并没有想象的那么多,而且并不能覆盖所有的情况,所以还是需要对理论有一些深入的理解

什么是事务

事务指的是逻辑上的一组操作,这组操作要么全部成功,要么全部失败,不允许出现部分成功的情况.

事务的特性

定义了事务之后,事务四个特性

  1. 原子性
    事务是不可分割的单位,事务中的这组操作要么都发生,要么都不发生.
  2. 一致性
    一致性说是事务执行前后必须要保持一致,不能出现凭空消失的情况,典型的如银行转账的操作,A给B转账,如果刚开始两人总共有100元,转账完成后两人总共还要有100元.
  3. 隔离性
    多个用户并发访问数据库的时候,一个用户的事务不能被其他的用户的事务所干扰.多个并发事务之间数据要相互隔离.比如事务1,C给A转帐,此时事务2,A给B转账.那么两个事务都要修改A账户的余额,一个增加,一个减少,如何保证他们改完之后数据是对的.这是隔离性的要求.
  4. 持久性
    一旦事务被提交,对数据库的改变就是持久性的.即使数据库发生故障也不应该有任何影响.

事务的隔离级别

为什么要有隔离级别呢,因为如果没有隔离级别,当两个事务同时对某条记录进行操作的时候,可能会出现如下几种大家常常听到的情况.

1 脏读
脏读就是指当一个事务正在访问数据,并且对数据进行了修改,而这种修改还没有提交到数据库中,这时,另外一个事务也访问这个数据,然后使用了这个数据。

1
2
3
4
5
6

事务1:更新一条数据

------->事务2:读取事务1更新的记录

事务1:调用commit进行提交

由于事务2使用了事务1还没有提交的记录,如果事务1最后正常提交了还好,但是如果事务1没有提交,而是回滚了.那么事务2的操作就有问题,因为他用的数据是错的.这就是脏读

2 不可重复读
在同一事务中,两次读取同一数据,得到内容不同

1
2
3
4
5
6
7
8

事务1:查询一条记录

————–>事务2:更新事务1查询的记录

————–>事务2:调用commit进行提交

事务1:再次查询上次的记录

事务1要进行两次查询来做一些比如展示或者使用的操作,但是在两次查询事件被事务2更新掉了记录,所以事务1就出现了不可重复读的问题.

3 幻读
同一事务中,用同样的操作读取两次,得到的记录数不相同

1
2
3
4
5
6
7
8

事务1:查询表中所有记录

————–>事务2:插入一条记录

————–>事务2:调用commit进行提交

事务1:再次查询表中所有记录

此时事务1两次查询到的记录是不一样的,称为幻读

幻读的重点是新增或者删除,由于另一个事务对表中进行了新增或者删除,到时当前事务每次看到的都条数不一样,就像发生了幻觉一样,查一次多了一条,再查一次,发现又没了.

为此,对事务引入了隔离级别这个概念,由数据库保证
DEFAULT 使用数据库设置的隔离级别 ( 默认 ) ,由 DBA 默认的设置来决定隔离级别 .
READ_UNCOMMITTED 会出现脏读、不可重复读、幻读 ( 隔离级别最低,并发性能高 )
READ_COMMITTED 会出现不可重复读、幻读问题(锁定正在读取的行)
REPEATABLE_READ 会出幻读(锁定所读取的所有行)
SERIALIZABLE 保证所有的情况不会发生(锁表)
隔离级别
可以看到,这四种从上到下性能越来越差,保障性越来越高.

以解决幻读问题为例,SERIALIZABLE直接进行了锁表,那么印发幻读的对该表的插入和删除都无法操作,只能查询.所以不会有问题了..

事务的传播行为

事务的传播行为主要是为了解决事务嵌套调用的问题,比如A方法里面使用了事务操作,B方法里面也使用了事务操作,当A调用B的时候.这个情况是如何处理的呢

1 REQUIRED 业务方法需要在一个事务中运行,如果方法运行时,已处在一个事务中,那么就加入该事务,否则自己创建一个新的事务.这是spring默认的传播行为.

2 SUPPORTS 如果业务方法在某个事务范围内被调用,则方法成为该事务的一部分,如果业务方法在事务范围外被调用,则方法在没有事务的环境下执行.

3 MANDATORY 只能在一个已存在事务中执行,业务方法不能发起自己的事务,如果业务方法在没有事务的环境下调用,就抛异常

4 REQUIRES_NEW 业务方法总是会为自己发起一个新的事务,如果方法已运行在一个事务中,则原有事务被挂起,新的事务被创建,直到方法结束,新事务才结束,原先的事务才会恢复执行.
5 NOT_SUPPORTED 声明方法需要事务,如果方法没有关联到一个事务,容器不会为它开启事务.如果方法在一个事务中被调用,该事务会被挂起,在方法调用结束后,原先的事务便会恢复执行.
6 NEVER 声明方法绝对不能在事务范围内执行,如果方法在某个事务范围内执行,容器就抛异常.只有没关联到事务,才正常执行.

7 NESTED 如果一个活动的事务存在,则运行在一个嵌套的事务中.如果没有活动的事务,则按REQUIRED属性执行.它使用了一个单独的事务, 这个事务拥有多个可以回滚的保证点.内部事务回滚不会对外部事务造成影响, 它只对DataSourceTransactionManager 事务管理器起效.

总共7个,1,4,7最重要.1就是说A和B会在A的事务里.而4是B会开启一个新的事务,直到完成结束,A的事务才会继续运行.

参考资料

  1. Spring事务管理
  2. Innodb中的事务隔离级别和锁的关系

kafka快速上手,主要是使用kafka提供的测试来做了一下简单测试,实际开发中的使用可能才是我们要关系的.启动zk和kafka,新建topic的过程都不变.

1 新建一个maven工程,引入依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.8.2.1</version>
</dependency>

2 编写配置文件

1
2
3
4
5
6
7
8
9
10
public interface KafkaProperties {
public final static String ZK = "127.0.0.1:2181";
public final static String GROUP_ID = "test_group1";
public final static String TOPIC = "test";
public final static String BROKER_LIST = "127.0.0.1:9092";
public final static String SESSION_TIMEOUT = "20000";
public final static String SYNC_TIMEOUT = "20000";
public final static String INTERVAL = "1000";

}

3 编写生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class KafkaProducer extends Thread {

private Producer<Integer, String> producer;
private String topic;
private Properties props = new Properties();
private final int SLEEP = 1000 * 3;

public KafkaProducer(String topic) {
props.put("serializer.class", "kafka.serializer.StringEncoder");
//生产者直接和broker列表连接
props.put("metadata.broker.list", KafkaProperties.BROKER_LIST);
producer = new Producer<Integer, String>(new ProducerConfig(props));
this.topic = topic;
}

@Override
public void run() {
int offsetNo = 1;
while (true) {
String msg = new String("Message_" + offsetNo);
System.out.println("Send->[" + msg + "]");
producer.send(new KeyedMessage<Integer, String>(topic, msg));
offsetNo++;
try {
sleep(SLEEP);
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}

4 编写消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class KafkaConsumer extends Thread {

private ConsumerConnector consumer;
private String topic;
private final int SLEEP = 1000 * 3;

public KafkaConsumer(String topic) {
consumer = Consumer.createJavaConsumerConnector(this.consumerConfig());
this.topic = topic;
}

private ConsumerConfig consumerConfig() {
Properties props = new Properties();
//消费者使用zk的地址获取连接
props.put("zookeeper.connect", KafkaProperties.ZK);
props.put("group.id", KafkaProperties.GROUP_ID);
props.put("zookeeper.session.timeout.ms", KafkaProperties.SESSION_TIMEOUT);
props.put("zookeeper.sync.time.ms", KafkaProperties.SYNC_TIMEOUT);
props.put("auto.commit.interval.ms", KafkaProperties.INTERVAL);
return new ConsumerConfig(props);
}

@Override
public void run() {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
System.out.println("Receive->[" + new String(it.next().message()) + "]");
try {
sleep(SLEEP);
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}

5 编写启动辅助类

1
2
3
4
5
6
7
8
9
10
11
12
public class KafkaClientApp {
public static void main(String[] args) {

KafkaProducer pro = new KafkaProducer(KafkaProperties.TOPIC);

pro.start();

KafkaConsumer con = new KafkaConsumer(KafkaProperties.TOPIC);

con.start();
}
}

之后启动测试一下就好了.

  1. 参考来源

kafka快速上手,和kafka中的partition和offset中,已经解释了kafka的一些原理,和完成了一个简单的生产消费的实践,如第一篇所说,kafka是一个分布式环境下的消息组件,那么,按照我们前面的简单上手,如果kafka的应用进程被杀或者kafka的机器宕机,那么kafka消息组件就无法使用了,或者zookeeper宕机了,那么kafka也无法使用了.

kafka集群(cluster)

一台机器不够,那就多搞几台,首先,启动zookeeper这个就不多说了.可以参看前文,在启动kafka的时候,我们在单机模拟启动多个kafka应用.
首先在config目录,copy两个server.properties 文件,这里我复制三份,分别起名server1.properties ,server2.properties server3.properties
然后修改这三个配置文件,主要修改broker.id=2,port=9094,log.dir=/tmp/kafka-logs-2这三个值,broker.id是用来标记分布式环境中的broker的,要求唯一,port和log.dir一个端口,一个log目录,如果在真实的分布式环境中是不需要修改.这里单机模拟防止端口冲突.

分别把broker.id改为1,2,3,log.dir则分别改成kafka-logs-1,kafka-logs-2,kafka-logs-3,然后依次启动
kafka-server-start.bat ../../config/server1.properties
kafka-server-start.bat ../../config/server2.properties
kafka-server-start.bat ../../config/server3.properties

如果你启动有报错,一个就是之前说的那个vm参数太大,另一个可能是你的端口没改好.具体错误看下报错就好了.

然后我们注册一个topic,叫做replicationtest
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic replicationtest
这里冗余是3,分区是1,那么最终各个broker都会保留一份,最多允许N-1,也就是2台broker宕机,服务照样运行.
注册之后,这时候
kafka-topics.bat--describe --zookeeper localhost:2181 --topic replicationtest
执行描述命令,看下集群情况
集群描述结果
第一行给出了分区的汇总信息。每个分区行给出分区信息。

“Leader” 节点是2.
“Replicas” 信息,在节点2,3,1上,所有的节点信息.
“Isr” 工作中的复制节点的集合. 也就是活的节点的集合.

其他的就不用解释了.这里选出了2是leader,也就是说2这个节点会给消费者提供服务.

然后我们测试一条信息.
kafka-console-producer.bat --broker-list localhost:7777,localhost:8888,localhost:9999 --topic replicationtest
上面的7777是server1.properties 中设置的.根据个人情况.改改.然后在控制台发发消息.

然后消费一下.
kafka-console-consumer.bat --zookeeper localhost:2181 --topic replicationtest
这里的2181是zookeeper的端口,不用改.
目前运行结果

然后.我们开始关掉一个broker,在3的控制台里CTRL,C.然后是否终止操作,输入Y.
再发一条消息
broker3宕机
一切正常.我们看一下集群信息
broker3宕机集群
发现Isr中存活的机器少了3.因为3挂了.
然后我们关掉broker2.这时候,会触发新的leader选举.期望值1变成leader,再发一条消息
broker2宕机
可以看到生产者发消息过程中,产生了异常,因为和2的连接断开了.但是注意,消息并没有丢,因为触发了新的选举.可以看到,消费者还是接到了正常的消息.集群情况如下
broker2宕机集群

至此,kafka的broker集群测试完毕,那么剩下的问题来了.消费者启动的时候连接的是zookeeper的地址,如果这台zookeeper挂了呢.
那么我们需要zookeeper集群部署.

zookeeper集群

这就包括两部分.

  1. 是broker本来要能知道这些zookeeper集群的地址,当一个宕机的时候,才会切换到另一个zookeeper
  2. 消费者要知道这些zookeeper的地址,理由同上.

因此步骤如下.可以自己试一试,比较简单

  1. 复制3份zookeeper.properties文件,命名为zookeeper1.properties,zookeeper2.properties,zookeeper3.properties,修改文件中的dataDir=/tmp/zookeeper和,clientPort=2181,端口分别设置为2181,2182,2183.然后启动三个zookeeper
  2. 修改kafka启动配置,server1.properties三个文件中的zookeeper.connect=localhost:2181这个配置,逗号隔开.最终为zookeeper.connect=localhost:2181,localhost:2182,localhost:2183,然后启动
  3. 生产者也改下配置中的.metadata.broker.list=localhost:9092,如果使用命令行启动就不用改了.参数指定也可以.
  4. 消费者同理,可以改下配置文件中zookeeper.connect=127.0.0.1:2181,也可以命令行启动的时候修改.
    5.最终就是各种宕机测试了.

kafka快速上手中,留下的问题是关于partition和offset,这篇文章主要解释这个.

Log机制

说到分区,就要说kafka对消息的存储.在官方文档中.
分区读写日志图
首先,kafka是通过log(日志)来记录消息发布的.每当产生一个消息,kafka会记录到本地的log文件中,这个log和我们平时的log有一定的区别.这里可以参考一下The Log,不多解释.

这个log文件默认的位置在config/server.properties中指定的.默认的位置是log.dirs=/tmp/kafka-logs,linux不用说,windows的话就在你对应磁盘的根目录下.我这里是D盘.

#分区partition#
kafka是为分布式环境设计的,因此如果日志文件,其实也可以理解成消息数据库,放在同一个地方,那么必然会带来可用性的下降,一挂全挂,如果全量拷贝到所有的机器上,那么数据又存在过多的冗余,而且由于每台机器的磁盘大小是有限的,所以即使有再多的机器,可处理的消息还是被磁盘所限制,无法超越当前磁盘大小.因此有了partition的概念.

kafka对消息进行一定的计算,通过hash来进行分区.这样,就把一份log文件分成了多份.如上面的分区读写日志图,分成多份以后,在单台broker上,比如快速上手中,如果新建topic的时候,我们选择了--replication-factor 1 --partitions 2,那么在log目录里,我们会看到
test-0目录和test-1目录.就是两个分区了.

你可能会想,这特么没啥区别呀.注意,当有了多个broker之后,这个意义就存在了.这里上一张图,原文在参考链接里有
kafka分布式分区存储
这是一个topic包含4个Partition,2 Replication(拷贝),也就是说全部的消息被放在了4个分区存储,为了高可用,将4个分区做了2份冗余,然后根据分配算法.将总共8份数据,分配到broker集群上.

结果就是每个broker上存储的数据比全量数据要少,但每份数据都有冗余,这样,一旦一台机器宕机,并不影响使用.比如图中的Broker1,宕机了.那么剩下的三台broker依然保留了全量的分区数据.所以还能使用,如果再宕机一台,那么数据不完整了.当然你可以设置更多的冗余,比如设置了冗余是4,那么每台机器就有了0123完整的数据,宕机几台都行.需要在存储占用和高可用之间做衡量.
至于宕机后,zookeeper会选出新的partition leader.来提供服务.这个等下篇文章

#偏移offset#

上一段说了分区,分区就是一个有序的,不可变的消息队列.新来的commit log持续往后面加数据.这些消息被分配了一个下标(或者偏移),就是offset,用来定位这一条消息.

消费者消费到了哪条消息,是保持在消费者这一端的.消息者也可以控制,消费者可以在本地保存最后消息的offset,并间歇性的向zookeeper注册offset.也可以重置offset

#如何通过offset算出分区#

其实partition存储的时候,又分成了多个segment(段),然后通过一个index,索引,来标识第几段.这里先可以去看一下本地log目录的分区文件夹.
在我这里,test-0,这个分区里面,会有一个index文件和一个log文件,
index和log

对于某个指定的分区,假设每5个消息,作为一个段大小,当产生了10条消息的情况想,目前有会得到(只是解释)
0.index (表示这里index是对0-4做的索引)
5.index (表示这里index是对5-9做的索引)
10.index (表示这里index是对10-15做的索引,目前还没满)

0.log
5.log
10.log
,当消费者需要读取offset=8的时候,首先kafka对index文件列表进行二分查找,可以算出.应该是在5.index对应的log文件中,然后对对应的5.log文件,进行顺序查找,5->6->7->8,直到顺序找到8就好了.

具体的算法参看美团的文章好了

更多文档

  1. 官方文档
  2. Kafka文件存储机制那些事
  3. Kafka集群partition replication自动分配分析

简单介绍

kafka是一个分布式消息中间件,在kafka中主要涉及到四个基本名词:
Topic
Kafka将消息种子分门别类, 每一类的消息称之为一个主题(Topic).

Producer
发布消息的对象称之为主题生产者.

Consumer
订阅消息并处理消息的对象称之为主题消费者

Broker
已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器称为一个代理(Broker). 消费者可以订阅一个或多个主题,并从Broker拉数据(注意是拉,不是pull,),从而消费这些已发布的消息。

安装(以windows为例)

安装非常简单,从这里下载,下载完成后解压到一个目录就好了.

简单使用

首先使用kafka的一个流程就是生产者生产消息,发送给kafka集群,然后消费者从kafka集群中获取消息进行消费.
要启动kafka需要先启动zookeeper,因为ZooKeeper是通过冗余服务实现高可用性的,也就是说在分布式环境中,如何保证kafka集群的高可用.zookeeper会来做leader选取,当消费者准备发消息时,会从zookeeper中获取一个可用的消息服务器地址,然后连接进行发送,保证党集群内有服务器宕机并不影响整体的使用.
来自slideshare的一张图

1.启动自带的简易zookeeper.
进行解压目录的bin/windows目录
zookeeper-server-start.bat ../../config/zookeeper.properties

执行命令启动,从zookeeper.properties中会看到.zookeeper会开发一个clientPort=2181,2181的端口给消费者使用,其实也可以给生产者使用,但是在0.8.0版本后,producer不再通过zookeeper连接broker, 而是通过brokerlist(192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092配置,直接和broker连接,只要能和一个broker连接上就能够获取到集群中其他broker上的信息,绕过了zookeeper.

2.启动kafka服务
kafka-server-start.bat ../../config/server.properties 执行启动,另一个命令行窗口,同样的.查看配置问题,会知道kafka的服务会在port=9092 ,9092端口打开.

3.注册一个topic
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
这个命令中,create表示创建.zookeeper 和后面的地址表示kafka使用本机2181端口开放的zookeeper保持高可用.replication-factor表示消息只冗余一份,目前我们只有一个kafka机器,broker,partitions 表示一份分区,分区是kafka的另一个概念,大致是说,同一topic内部的消息按照一定的key和算法被分区(partition)存储在不同的位置上,这个下次写好了.这样已经在kafka注册了一个名为test的消息topic了.

4.使用简易的控制台生产者模拟
kafka-console-producer.bat --broker-list localhost:9092 --topic test
前面说过了.新版本生产者直接通过brokerlist来连接kafka,目前只有一台,所以就一个地址,准备向test这个topic发送消息.

5.使用简易的控制台消费者模拟
kafka-console-consumer.bat --zookeeper localhost:2181 --topic test
这个前面也说过了.消费者使用zookeeper获取可用的broker列表,然后拉去消息,并且还有一些offset同步的问题.和分区,文件存储一起的一个概念,下次写.

6.开始生产和消费消息
至此,已经开了四个控制台窗口了..在producer窗口里,随便打几个字,然后enter,在消费者的窗口里将会显示出来.
实际测试图

其他问题

实际可能不那么顺利,如果你启动kafka或者其他应用的时候,有错误提示,提示无法创建虚拟机vm这样的.那么修改一下对应的bat脚本.就好了
启动错误,vm的heap申请是1G,如果你机器内存不够,改成512M,或者更小的就好了.

更多文档

  1. 官方文档
  2. kafka快速入门