大致流程

EventBus 是一种用于 Android 的事件发布-订阅总线, 是一个可以实现组件间通信的一个第三方框架, 使用起来非常的简单, 一共也就是 4 个步骤, 注册订阅, 发送事件, 解除订阅, 大致的流程如下

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class MainActivity extends Activity {

    @Override
    protected void onCreate(@Nullable Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(initLayout());
        // 第一步
        EventBus.getDefault().register(this); // 注册一个订阅者
        
        // 第四步
        EventBus.getDefault().post(""); // 发送一个事件
    }
    
    // 第二步
    @Subscribe(threadMode = ThreadMode.MAIN)
    public void receiverEvent(String/* 事件类型 */ str) {
    
    }
    
    @Override
    protected void onDestroy() {
        super.onDestroy();
        // 第三步
        EventBus.getDefault().unregister(this); // 解除注册
    }
}

下面,就让我们看看源码中是如何实现的, 其具体的实现原理是什么

注册订阅事件

首先先看一下大致的 EventBus 一些类的成员信息

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class EventBus {
    static volatile EventBus defaultInstance;
    
     //DCL 单利设计模式
     public static EventBus getDefault() {
        if (defaultInstance == null) {
            synchronized (EventBus.class) {
                if (defaultInstance == null) {
                    defaultInstance = new EventBus();
                }
            }
        }
        return defaultInstance;
    }
    
    // Key 对应的是事件类型, Value 对应的是关于事件类型的信息集合
    private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
    // Key 对应的是订阅者, Value 对应的是事件类型集合
    private final Map<Object, List<Class<?>>> typesBySubscriber;
    // 存储粘性事件的集合, Key 对应的是事件类型类, Value 存储的是事件类型
    private final Map<Class<?>, Object> stickyEvents;
}

注册

具体在源码中的注册流程如下, 看这部分的代码, 其实也就是3个部分

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
public void register(Object subscriber) {
        // 获取订阅者的 Class 类
        Class<?> subscriberClass = subscriber.getClass();
        // 找到订阅者的所有订阅方法, 进行封装
        List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
        synchronized (this) {
            for (SubscriberMethod subscriberMethod : subscriberMethods) {
                // 订阅
                subscribe(subscriber, subscriberMethod);
            }
        }
    }

通过 register 注册订阅者, 然后再通过订阅者找到订阅者的相关的方法的信息, 封装成一个集合, 然后再开始订阅, 如果根据上面的小示例的话, 传入的是 MainActivity.class 类, 然后找到这个类被添加了注解 Subscribe 的订阅的所有方法, 封装成一个集合, 再订阅相关操作, 具体再看源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
        // 检测缓存中是否有订阅者的信息, 如果有直接返回
        // 反射需要消耗性能, 所以这里做了一个缓存处理
        List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
        if (subscriberMethods != null) {
            return subscriberMethods;
        }

        // 这里默认为false, 
        if (ignoreGeneratedIndex) {
            // 使用反射的方式
            subscriberMethods = findUsingReflection(subscriberClass);
        } else {
            // 这里是使用编译时注解的方式
            subscriberMethods = findUsingInfo(subscriberClass);
        }
        
        // 为空的话,直接抛了个异常, 也就是说, 如果你订阅了, 必须要有方法
        if (subscriberMethods.isEmpty()) {
            throw new EventBusException("Subscriber " + subscriberClass
                    + " and its super classes have no public methods with the @Subscribe annotation");
        } else {
            // 将对应的 class 订阅者, 和方法保存起来
            METHOD_CACHE.put(subscriberClass, subscriberMethods);
            return subscriberMethods;
        }
    }
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
  private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
        // 创建了 FindState 对象, 这里使用了一个享元设计模式, 也就是对象的复用
        FindState findState = prepareFindState();
        // 初始化值
        findState.initForSubscriber(subscriberClass);
        while (findState.clazz != null) {
            // 寻找订阅者中的所有方法
            findUsingReflectionInSingleClass(findState);
            // 检查父类是否有订阅了的方法
            // 如果父类没有订阅了的方法, 那么就退出了这个循环 
            findState.moveToSuperclass();
        }
        return getMethodsAndRelease(findState);
    }
    
    
    
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
private void findUsingReflectionInSingleClass(FindState findState) {
        Method[] methods;
        try {
            // This is faster than getMethods, especially when subscribers are fat classes like Activities
            // 通过反射找到订阅者的所有方法
            methods = findState.clazz.getDeclaredMethods();
        } catch (Throwable th) {
            // Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
            methods = findState.clazz.getMethods();
            // 是否跳过父类的查找, 默认为false
            findState.skipSuperClasses = true;
        }
        // 遍历所有的方法
        for (Method method : methods) {
            // 获取方法的权限修饰符
            int modifiers = method.getModifiers();
            // 必须要为 public 的
            if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
                // 获取方法的参数
                Class<?>[] parameterTypes = method.getParameterTypes();
                // 方法的参数的长度必须是 1
                if (parameterTypes.length == 1) {
                    // 获取方法注解上面的信息
                    Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                    if (subscribeAnnotation != null) {
                        Class<?> eventType = parameterTypes[0];
                        if (findState.checkAdd(method, eventType)) {
                            // 线程模型
                            ThreadMode threadMode = subscribeAnnotation.threadMode();
                            // 将解析到的所有属性,进行封装,添加到集合中
                            findState.subscriberMethods.add(new
                            SubscriberMethod(method, eventType, threadMode,
                                    subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
                        }
                    }
                } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                    String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                    throw new EventBusException("@Subscribe method " + methodName +
                            "must have exactly 1 parameter but has " + parameterTypes.length);
                }
            } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                throw new EventBusException(methodName +
                        " is a illegal @Subscribe method: must be public, non-static, and non-abstract");
            }
        }
    }

订阅

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
        // 获取方法参数的 calss
        Class<?> eventType = subscriberMethod.eventType;
        // 创建 newSubscription , 将对应的订阅者, 和订阅者的方法封装在一个对象中
        Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
        CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
        if (subscriptions == null) {
            // 创建线程安全的 ArrayList
            subscriptions = new CopyOnWriteArrayList<>();
            // 存储到集合中
            subscriptionsByEventType.put(eventType, subscriptions);
        } else {
            if (subscriptions.contains(newSubscription)) {
                throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                        + eventType);
            }
        }
    
        // 获取方法的个数, 主要这里处理的是优先级
        int size = subscriptions.size();
        for (int i = 0; i <= size; i++) {
            if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
                subscriptions.add(i, newSubscription);
                break;
            }
        }
        
        List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
        if (subscribedEvents == null) {
            subscribedEvents = new ArrayList<>();
            typesBySubscriber.put(subscriber, subscribedEvents);
        }
        // 将事件类,添加到订阅者集合中
        subscribedEvents.add(eventType);

        // 处理粘性事件
        if (subscriberMethod.sticky) {
            if (eventInheritance) { // 默认为 true
                Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
                for (Map.Entry<Class<?>, Object> entry : entries) {
                    Class<?> candidateEventType = entry.getKey();
                    // 判断类类型是否相同
                    if (eventType.isAssignableFrom(candidateEventType)) {
                        Object stickyEvent = entry.getValue();
                        // 发布处理事件
                        checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                    }
                }
            } else {
                Object stickyEvent = stickyEvents.get(eventType);
                checkPostStickyEventToSubscription(newSubscription, stickyEvent);
            }
        }
    }

这里的操作, 其实没有做什么, 只是将订阅者类的信息方法检测和封装, 分别是方法必须是为 public 的, 事件类型参数只能为一个, 然后对线程模型, 优先级, 是否为粘性事件

这里还有一个特别重要的信息, 那就是我们都知道粘性事件, 和普通发送的 post 事件不同, post 事件是先订阅后发布, 粘性事件是可以先发布后订阅, 在上面的源码中, subscriberMethod.sticky 在 for 循环处理的时候会检查是否是粘性事件, 如果是粘性事件, 其实在我们发送一个粘性事件的时候, 会往集合中存入事件, 那我们到了另外一个界面, 订阅的时候如果检查到有粘性事件就直接给他处理了, 这也就是为什么粘性事件可以先发布后订阅的原因

发布 POST

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public void post(Object event) {
        // 这里使用的 ThreadLocal 对象维护的一个某个线程下唯一的对象 (和Handler 源码中的 ThreadLocal 也是一样的)
        PostingThreadState postingState = currentPostingThreadState.get();
        List<Object> eventQueue = postingState.eventQueue;
        eventQueue.add(event);

        if (!postingState.isPosting) {
            // 判断是否是 主线程
            postingState.isMainThread = isMainThread();
            postingState.isPosting = true;
            if (postingState.canceled) {
                throw new EventBusException("Internal error. Abort state was not reset");
            }
            try {
                while (!eventQueue.isEmpty()) {
                    // 主要的方法, 发送事件
                    postSingleEvent(eventQueue.remove(0), postingState);
                }
            } finally {
                postingState.isPosting = false;
                postingState.isMainThread = false;
            }
        }
    }
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
 private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
        // 获取事件的 class
        Class<?> eventClass = event.getClass();
        boolean subscriptionFound = false;
        if (eventInheritance) {
            // 查找 eventClass 的所有的父类和接口
            List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
            int countTypes = eventTypes.size();
            for (int h = 0; h < countTypes; h++) {
                Class<?> clazz = eventTypes.get(h);
                // for循环, 依次发送事件
                subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
            }
        } else {
            // 发送事件
            subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
        }
        if (!subscriptionFound) {
            if (logNoSubscriberMessages) {
                logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
            }
            if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                    eventClass != SubscriberExceptionEvent.class) {
                post(new NoSubscriberEvent(this, event));
            }
        }
    }
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
        CopyOnWriteArrayList<Subscription> subscriptions;
        synchronized (this) {
            // 在缓存中得到 Subscription 列表
            subscriptions = subscriptionsByEventType.get(eventClass);
        }
        if (subscriptions != null && !subscriptions.isEmpty()) {
            // 遍历集合中的所有 subscription
            for (Subscription subscription : subscriptions) {
                postingState.event = event;
                postingState.subscription = subscription;
                boolean aborted = false;
                try {
                    // 发送事件
                    postToSubscription(subscription, event, postingState.isMainThread);
                    // 事件是否取消了
                    aborted = postingState.canceled;
                } finally {
                    postingState.event = null;
                    postingState.subscription = null;
                    postingState.canceled = false;
                }
                // 如果取消了, 退出循环
                if (aborted) {
                    break;
                }
            }
            return true;
        }
        return false;
    }
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
 private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
        switch (subscription.subscriberMethod.threadMode) {
            case POSTING:
                // 直接执行,  发送的事件在什么线程,就是什么线程
                invokeSubscriber(subscription, event);
                break;
            case MAIN:
                // 在主线程中执行
                if (isMainThread) {
                    invokeSubscriber(subscription, event);
                } else {
                    // 如果发送的线程不是主线程, 那么使用 Handler 切换回主线程来
                    mainThreadPoster.enqueue(subscription, event);
                }
                break;
                // 在子线程中执行
            case BACKGROUND:
                if (isMainThread) {
                    // 如果发布的线程是主线程, 切换回子线程中执行, 这里使用的是线程池 Executors.newCachedThreadPool()
                    backgroundPoster.enqueue(subscription, event);
                } else {
                    // 直接执行
                    invokeSubscriber(subscription, event);
                }
                break;
            case ASYNC:
                // 和发送事件处于不同的线程
                asyncPoster.enqueue(subscription, event);
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
        }
    }

粘性事件

1
2
3
4
5
6
7
8
9
 public void postSticky(Object event) {
        synchronized (stickyEvents) {
            // 存入到集合中
            stickyEvents.put(event.getClass(), event);
        }
        // Should be posted after it is putted, in case the subscriber wants to remove immediately
        // 直接执行
        post(event); 
    }

将订阅信息存入到集合中, 这个主要是为了, 先发布后订阅的时候可以执行, 也就是上面源码分析的地方, 订阅的时候, 判断集合中是否有粘性事件, 如果存在粘性事件, 并且事件类型相同的话, 就直接执行了。 这里可以发现, 还有一个 post(event) 方法, 也就是说, 这里也可以先订阅后发布的那种方式处理事件

取消订阅

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
  public synchronized void unregister(Object subscriber) {
        // 通过订阅者, 获取所有订阅事件类集合, Key 对应的是订阅者, Value 对应的是事件类型集合
        // 概括为移除订阅者对应订阅者信息(相关方法和注解上面的信息等)
        List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
        if (subscribedTypes != null) {
            for (Class<?> eventType : subscribedTypes) {
                // 将订阅者的订阅信息移除
                unsubscribeByEventType(subscriber, eventType);
            }
            // 集合中移除订阅者
            typesBySubscriber.remove(subscriber);
        } else {
            logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
        }
    }
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {       
        // 获取事件类的所有订阅信息列表,将订阅信息从订阅信息集合中移除,同时将订阅信息中的active属性置为FALSE
        List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
        if (subscriptions != null) {
            int size = subscriptions.size();
            for (int i = 0; i < size; i++) {
                Subscription subscription = subscriptions.get(i);
                if (subscription.subscriber == subscriber) {
                    // 置为false
                    subscription.active = false;
                    // 移除
                    subscriptions.remove(i);
                    i--;
                    size--;
                }
            }
        }
    }

总结

好了, 到了这里大致的执行流程的源码就分析完成了, 这里其实有点像观察者设计模式, 可以说是,也可以说不是,或者说运用了观察者设计模式的思想,本质上内部核心是使用的反射, 主要是靠几个集合类, 然后通过反射的方式进行执行的, 也就是其核心原理了

  • 注册的时候, 通过查找订阅者类信息上面的注解信息, 封装成 SubscriberMethod 存入到集合中 (subscriptionsByEventType Key 对应的是事件类型, Value 对应的是关于事件类型的信息集合), 并缓存起来, 然后再对粘性事件进行处理
  • 发送的时候, 通过订阅者查找所有订阅者中对应的事件类型, 再反射执行相应的所有的事件类型
  • 解除注册的时候通过订阅者查找 typesBySubscriber (Key 对应的是订阅者, Value 对应的是事件类型集合) 移除相应的事件类型集合, 然后再移除订阅者信息