除非你觉得你的时间不是很宝贵,否则不要看这篇流水账式的博文,这只是篇个人的工作的学习一个总结而已,没有包含任何的技术细节
阅读全文 »

除非你觉得你的时间不是很宝贵,否则不要看这篇流水账式的博文,这只是篇个人的工作的学习一个总结而已,没有包含任何的技术细节
阅读全文 »

除非你觉得你的时间不是很宝贵,否则不要看这篇流水账式的博文,这只是篇个人的工作的学习一个总结而已,没有包含任何的技术细节
阅读全文 »

除非你觉得你的时间不是很宝贵,否则不要看这篇流水账式的博文,这只是篇个人的工作的学习一个总结而已,没有包含任何的技术细节
阅读全文 »

除非你觉得你的时间不是很宝贵,否则不要看这篇流水账式的博文,这只是篇个人的工作的学习一个总结而已,没有包含任何的技术细节
阅读全文 »

除非你觉得你的时间不是很宝贵,否则不要看这篇流水账式的博文,这只是篇个人的工作的学习一个总结而已,没有包含任何的技术细节
阅读全文 »

0. 背景

公司从 naocs 的注册中心到 zookeeper 注册中心转变,自己也好久没看过 zookeeper 相关了,上次应该看的是《从Paxos到Zookeeper》一书,这次正好趁年假把另一本经典的书《Zookeeper:分布式过程协同技术详解》看下。

1. 总览

一共十章,八章偏入门实战,敲代码和实战理论为主,在 github 上也有相应的源码。第九章是原理讲解,最后一章是配置详解。
前八章应该属于入门实战,通过介绍四大节点类型和监听,来实现一个简单的分布式任务协调中心。
第九章通过对源码讲解,对关键的“顺序一致性”、“事务”做代码上的了解。
最后一章应该属于参数调优,适合运维和部署者,属于优化点。

2. 入门实战

入门实战分布式任务调度中心书中源码地址:https://github.com/fpj/zookeeper-book-example

关键点:

  1. 临时节点
  2. 临时有序节点
  3. 永久节点
  4. 永久临时节点
  5. watch 监听
  6. 异步调用
  7. ACL 权限控制
  8. watch 羊群效应的避免
  9. watch 事件类型业务处理
  10. curator 开源客户端的使用

观察者的横向扩展用于读性能提高。而在选举时的跟随者以及领导者时的 zxid 的 zab 广播协议过程是通过延长领导者选举时间、超过半数来避免脑裂的产生。

3. 源码分析

关键步骤:

  1. git clone https://github.com/apache/zookeeper.git
  2. ant eclipse 编译
  3. idea 导入: file - new - Project From Existing Sources
  4. zookeeper-jute 项目需要编译得到生成之后的代码: maven clean install
  5. 复制 zookeeper-jute 项目的 target/classed/org 到 zookeeper-server 下
  6. 在 zookeeper-server 项目的 org.apache.zookeeper.version 包新建 Info 类:

    1
    2
    3
    4
    5
    6
    7
    8
    public interface Info {
    int MAJOR=3;
    int MINOR=5;
    int MICRO=6;
    String QUALIFIER=null;
    String REVISION_HASH="c11b7e26bc554b8523dc929761dd28808913f091";
    String BUILD_DATE="01/19/2020 10:13 GMT";
    }
  7. 启动 server:执行 org.apache.zookeeper.server.quorum.QuorumPeerMain 类并带 JVM 参数:-Dlog4j.configuration=file:/Users/liwenguang/SourceCode/middle/zookeeper/conf/log4j.properties 以及启动参数:conf/zoo_sample.cfg

-Dzookeeper.4lw.commands.whitelist=* 用于 telnet 命令查询。

  1. 至此,server 启动完毕
  2. 启动 client:执行 org.apache.zookeeper.ZooKeeperMain,启动参数:-server 127.0.0.1:2181 ls /zookeeper
  • ls /zookeeper 为命令
  1. 至此 client 启动测试完毕

4. 最后

翻译的好烂!错别字不说,句子翻译的都不通畅!但是真适合入门(第九章除外),第九章适合看了《从Paxos到Zookeeper》之后再看。

除非你觉得你的时间不是很宝贵,否则不要看这篇流水账式的博文,这只是篇个人的工作的学习一个总结而已,没有包含任何的技术细节
阅读全文 »

0. 背景

一直在用 RPC,但是对非常重要的序列化原理一直模糊不懂,尤其是其中的 serialVersionUID 也模糊不清,虽然 Dubbo 已经帮我们使用了经过优化过的 hession2,但是还是想一探究竟,为什么原生的 JDK 序列化就不行呢?原生 JDK 序列化到底是怎么样的。

主要参考《分布式Java应用》的4.3序列化/反序列化章节

1. 序列化之 writeObject

1.1 获取类信息

序列化时,需要获取到当前序列化对象的类信息,获取类信息主要是调用的 ObjectStreamClass.lookup(cl, true),ObjectStreamClass 会通过 ConcurrentMap 缓存类信息优化序列化速度:static final ConcurrentMap<WeakClassKey,Reference<?>> localDescs,其中 key 使用了弱引用(在 ThreadLocal 也有使用弱引用)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
private void writeObject0(Object obj, boolean unshared)
throws IOException
{
// ...
for (;;) {
// REMIND: skip this check for strings/arrays?
Class<?> repCl;
// 获取类信息
desc = ObjectStreamClass.lookup(cl, true);
if (!desc.hasWriteReplaceMethod() ||
(obj = desc.invokeWriteReplace(obj)) == null ||
(repCl = obj.getClass()) == cl)
{
break;
}
cl = repCl;
}
if (enableReplace) {
Object rep = replaceObject(obj);
if (rep != obj && rep != null) {
cl = rep.getClass();
desc = ObjectStreamClass.lookup(cl, true);
}
obj = rep;
}

// if object replaced, run through original checks a second time
if (obj != orig) {
subs.assign(orig, obj);
if (obj == null) {
writeNull();
return;
} else if (!unshared && (h = handles.lookup(obj)) != -1) {
writeHandle(h);
return;
} else if (obj instanceof Class) {
writeClass((Class) obj, unshared);
return;
} else if (obj instanceof ObjectStreamClass) {
writeClassDesc((ObjectStreamClass) obj, unshared);
return;
}
}

// remaining cases
if (obj instanceof String) {
writeString((String) obj, unshared);
} else if (cl.isArray()) {
writeArray(obj, desc, unshared);
} else if (obj instanceof Enum) {
writeEnum((Enum<?>) obj, desc, unshared);
} else if (obj instanceof Serializable) {
writeOrdinaryObject(obj, desc, unshared);
} else {
if (extendedDebugInfo) {
throw new NotSerializableException(
cl.getName() + "\n" + debugInfoStack.toString());
} else {
throw new NotSerializableException(cl.getName());
}
}
} finally {
depth--;
bout.setBlockDataMode(oldMode);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
private static class Caches {
/** cache mapping local classes -> descriptors */
static final ConcurrentMap<ObjectStreamClass.WeakClassKey,Reference<?>> localDescs =
new ConcurrentHashMap<>();

/** cache mapping field group/local desc pairs -> field reflectors */
static final ConcurrentMap<ObjectStreamClass.FieldReflectorKey,Reference<?>> reflectors =
new ConcurrentHashMap<>();

/** queue for WeakReferences to local classes */
private static final ReferenceQueue<Class<?>> localDescsQueue =
new ReferenceQueue<>();
/** queue for WeakReferences to field reflectors keys */
private static final ReferenceQueue<Class<?>> reflectorsQueue =
new ReferenceQueue<>();
}

static ObjectStreamClass lookup(Class<?> cl, boolean all) {
if (!(all || Serializable.class.isAssignableFrom(cl))) {
return null;
}
processQueue(ObjectStreamClass.Caches.localDescsQueue, ObjectStreamClass.Caches.localDescs);
ObjectStreamClass.WeakClassKey key = new ObjectStreamClass.WeakClassKey(cl, ObjectStreamClass.Caches.localDescsQueue);
Reference<?> ref = ObjectStreamClass.Caches.localDescs.get(key);
Object entry = null;
if (ref != null) {
entry = ref.get();
}
ObjectStreamClass.EntryFuture future = null;
if (entry == null) {
ObjectStreamClass.EntryFuture newEntry = new ObjectStreamClass.EntryFuture();
Reference<?> newRef = new SoftReference<>(newEntry);
do {
if (ref != null) {
ObjectStreamClass.Caches.localDescs.remove(key, ref);
}
ref = ObjectStreamClass.Caches.localDescs.putIfAbsent(key, newRef);
if (ref != null) {
entry = ref.get();
}
} while (ref != null && entry == null);
if (entry == null) {
future = newEntry;
}
}

if (entry instanceof ObjectStreamClass) { // check common case first
return (ObjectStreamClass) entry;
}
if (entry instanceof ObjectStreamClass.EntryFuture) {
future = (ObjectStreamClass.EntryFuture) entry;
if (future.getOwner() == Thread.currentThread()) {
/*
* Handle nested call situation described by 4803747: waiting
* for future value to be set by a lookup() call further up the
* stack will result in deadlock, so calculate and set the
* future value here instead.
*/
entry = null;
} else {
entry = future.get();
}
}
if (entry == null) {
try {
entry = new ObjectStreamClass(cl);
} catch (Throwable th) {
entry = th;
}
if (future.set(entry)) {
ObjectStreamClass.Caches.localDescs.put(key, new SoftReference<Object>(entry));
} else {
// nested lookup call already set future
entry = future.get();
}
}

if (entry instanceof ObjectStreamClass) {
return (ObjectStreamClass) entry;
} else if (entry instanceof RuntimeException) {
throw (RuntimeException) entry;
} else if (entry instanceof Error) {
throw (Error) entry;
} else {
throw new InternalError("unexpected entry: " + entry);
}
}

1.2 初始化 ObjectStreamClass

此时从缓存获取的为 null,因此需要初始化:new ObjectStreamClass(cl);,该初始化方法会有一系列的判断:

  1. 是否是代理类(JDK动态代理都会继承 Proxy)
  2. 是否为 Enum
  3. 是否为 serializable(通过 native)
  4. 是否为 externalizable
  5. 获取父类信息(父类如果没有实现 Serializable 则必须有空构造方法)

当为 serializable 类型,继续如下步骤:

  1. 如果为 enum 则生成一个值为 0 的 suid,并将 fields 设置为空的 ObjectStreamFiled 数组。
  2. 如果为 Array 类型,将 fields 设置为空的 ObjectStreamField 数组。
  3. 如果非以上两种类型则获取定义的 serialVersionUID 的值
  4. 最后写入类信息若发现没有 suid,则根据类签名信息(computeDefaultSUID)来生成
  5. 如果类实现了 Externalizable 接口则调用 writeExternal
  6. 写入流时,直接采用全类名写入。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
private ObjectStreamClass(final Class<?> cl) {
this.cl = cl;
name = cl.getName();
isProxy = Proxy.isProxyClass(cl);
isEnum = Enum.class.isAssignableFrom(cl);
serializable = Serializable.class.isAssignableFrom(cl);
externalizable = Externalizable.class.isAssignableFrom(cl);
// 获取父类信息
Class<?> superCl = cl.getSuperclass();
superDesc = (superCl != null) ? lookup(superCl, false) : null;
localDesc = this;

if (serializable) {
AccessController.doPrivileged(new PrivilegedAction<Void>() {
public Void run() {
if (isEnum) {
suid = Long.valueOf(0);
fields = NO_FIELDS;
return null;
}
if (cl.isArray()) {
fields = NO_FIELDS;
return null;
}
// 生成 suid
suid = getDeclaredSUID(cl);
try {
fields = getSerialFields(cl);
// 计算Filed空间,引用属性(非基本类型)个数
computeFieldOffsets();
} catch (InvalidClassException e) {
serializeEx = deserializeEx =
new ObjectStreamClass.ExceptionInfo(e.classname, e.getMessage());
fields = NO_FIELDS;
}

if (externalizable) {
cons = getExternalizableConstructor(cl);
} else {
cons = getSerializableConstructor(cl);
writeObjectMethod = getPrivateMethod(cl, "writeObject",
new Class<?>[] { ObjectOutputStream.class },
Void.TYPE);
readObjectMethod = getPrivateMethod(cl, "readObject",
new Class<?>[] { ObjectInputStream.class },
Void.TYPE);
readObjectNoDataMethod = getPrivateMethod(
cl, "readObjectNoData", null, Void.TYPE);
hasWriteObjectData = (writeObjectMethod != null);
}
domains = getProtectionDomains(cons, cl);
writeReplaceMethod = getInheritableMethod(
cl, "writeReplace", null, Object.class);
readResolveMethod = getInheritableMethod(
cl, "readResolve", null, Object.class);
return null;
}
});
} else {
suid = Long.valueOf(0);
fields = NO_FIELDS;
}

try {
fieldRefl = getReflector(fields, this);
} catch (InvalidClassException ex) {
// field mismatches impossible when matching local fields vs. self
throw new InternalError(ex);
}

if (deserializeEx == null) {
if (isEnum) {
deserializeEx = new ObjectStreamClass.ExceptionInfo(name, "enum type");
} else if (cons == null) {
deserializeEx = new ObjectStreamClass.ExceptionInfo(name, "no valid constructor");
}
}
for (int i = 0; i < fields.length; i++) {
if (fields[i].getField() == null) {
defaultSerializeEx = new ObjectStreamClass.ExceptionInfo(
name, "unmatched serializable field(s) declared");
}
}
initialized = true;
}

2. 反序列化之 readObject

在最开始校验:

1
2
3
4
5
6
final static short STREAM_MAGIC = (short)0xaced;

/**
* Version number that is written to the stream header.
*/
final static short STREAM_VERSION = 5;

  1. 读取流中类名、suid,是否有 writeObject、是否是 enum 类型,Filed 个数属性等。