點選上方“Java技術驛站”,選擇“置頂公眾號”。
有內涵、有價值的文章第一時間送達!
EventBus來自於google-guava包中。原始碼註釋如下:
Dispatches events to listeners, and provides ways for listeners to register themselves. The EventBus allows publish-subscribe-style communication between components without requiring the components to explicitly register with one another (and thus be aware of each other). It is designed exclusively to replace traditional Java in-process event distribution using explicit registration. It is not a general-purpose publish-subscribe system, nor is it intended for interprocess communication.
翻譯:將事件分派給監聽器,併為監聽器提供註冊自己的方法。EventBus允許元件之間的釋出 – 訂閱式通訊,而不需要元件彼此明確註冊(並且因此彼此意識到)。 它專門用於使用顯式註冊替換傳統的Java行程內事件分發。 它不是一個通用的釋出 – 訂閱系統,也不是用於行程間通訊。
使用參考
關於EventBus的用例程式碼提取自sharding-jdbc原始碼,並結合lombok最大限度的簡化:
-
EventBusInstance–用於獲取EventBus實體(餓漢式單例樣式)
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class EventBusInstance {
private static final EventBus INSTANCE = new EventBus();
public static EventBus getInstance() {
return INSTANCE;
}
}
-
DMLExecutionEvent--釋出訂閱事件模型
@Getter
@Setter
public class DMLExecutionEvent {
private String id;
private String dataSource;
private Date sendTime;
}
-
DMLExecutionEventListener--事件監聽器
public final class DMLExecutionEventListener {
@Subscribe
@AllowConcurrentEvents
public void listener(final DMLExecutionEvent event) {
System.out.println("監聽的DML執行事件: " + JSON.toJSONString(event));
// do something
}
}
-- Main--主方法:註冊訂閱者監聽事件,以及釋出事件。
/**
* @author wangzhenfei9
* @version 1.0.0
* @since 2018年04月24日
*/
public class Main {
static{
System.out.println("register listener...");
EventBusInstance.getInstance().register(new DMLExecutionEventListener());
}
public static void main(String[] args) throws InterruptedException {
for (int i=0; i<10; i++) {
pub();
Thread.sleep(3000);
}
}
private static void pub(){
DMLExecutionEvent event = new DMLExecutionEvent();
event.setId(String.valueOf(new Random().nextInt(1000)));
event.setDataSource("sj_db_1");
event.setSendTime(new Date());
System.out.println("釋出的DML執行事件: " + JSON.toJSONString(event));
EventBusInstance.getInstance().post(event);
}
}
核心方法
EventBus一些重要方法解釋如下:
-
post(Object):Posts an event to all registered subscribers. This method will return successfully after the event has been posted to all subscribers, and regardless of any exceptions thrown by subscribers.
-
register(Object): Registers all subscriber methods on object to receive events.Subscriber methods are selected and classified using this EventBus's SubscriberFindingStrategy; the default strategy is the AnnotatedSubscriberFinder.
-
unregister(Object):Unregisters all subscriber methods on a registered object.
原始碼分析
主要分析釋出事件以及訂閱的核心原始碼;
釋出原始碼分析
public void post(Object event) {
// 得到所有該類已經它的所有父類(因為有些註冊監聽器是監聽其父類)
Set<Class>> dispatchTypes = flattenHierarchy(event.getClass());
boolean dispatched = false;
// 遍歷類本身以及所有父類
for (Class> eventType : dispatchTypes) {
// 重入讀鎖先鎖住
subscribersByTypeLock.readLock().lock();
try {
// 得到類的所有訂閱者,例如DMLExecutionEvent的訂閱者就是DMLExecutionEventListener(EventSubscriber有兩個屬性:重要的屬性target和method,target就是監聽器即DMLExecutionEventListener,method就是監聽器方法即listener;從而知道DMLExecutionEvent這個事件由哪個類的哪個方法監聽處理)
Set<EventSubscriber> wrappers = subscribersByType.get(eventType);
if (!wrappers.isEmpty()) {
// 如果有時間訂閱者,那麼dispatched = true,表示該事件可以分發
dispatched = true;
// 遍歷所有的時間訂閱者,每個訂閱者的佇列都增加該事件
for (EventSubscriber wrapper : wrappers) {
enqueueEvent(event, wrapper);
}
}
} finally {
subscribersByTypeLock.readLock().unlock();
}
}
if (!dispatched && !(event instanceof DeadEvent)) {
post(new DeadEvent(this, event));
}
// 分發進入佇列的事件
dispatchQueuedEvents();
}
/**
* queues of events for the current thread to dispatch;
* 核心資料結構為LinkedList,儲存的是EventBus.EventWithSubscriber型別資料
*/
private final ThreadLocal<Queue<EventBus.EventWithSubscriber>> eventsToDispatch =
new ThreadLocal<Queue<EventBus.EventWithSubscriber>>() {
@Override protected Queue<EventBus.EventWithSubscriber> initialValue() {
return new LinkedList<EventBus.EventWithSubscriber>();
}
};
void enqueueEvent(Object event, EventSubscriber subscriber) {
// 資料結構為new LinkedList
(),EventWithSubscriber就是對event和subscriber的封裝,LinkedList資料結構保證進入佇列和消費佇列順序一致
eventsToDispatch.get().offer(new EventBus.EventWithSubscriber(event, subscriber));
}
/**
* Drain the queue of events to be dispatched. As the queue is being drained,
* new events may be posted to the end of the queue.
* 排乾要被分發的事件佇列,正在排乾的過程中,可能有新的事件被追加到佇列尾部
*/
void dispatchQueuedEvents() {
// don't dispatch if we're already dispatching, that would allow reentrancy
// and out-of-order events. Instead, leave the events to be dispatched
// after the in-progress dispatch is complete.
// 如果正在排乾佇列,則不分發
if (isDispatching.get()) {
return;
}
// ThreadLocal設定正在分發即isDispatching為true
isDispatching.set(true);
try {
Queue<EventBus.EventWithSubscriber> events = eventsToDispatch.get();
EventBus.EventWithSubscriber eventWithSubscriber;
while ((eventWithSubscriber = events.poll()) != null) {
// 呼叫訂閱者處理事件(method.invoke(target, new Object[] { event });,method和target來自訂閱者)
dispatch(eventWithSubscriber.event, eventWithSubscriber.subscriber);
}
} finally {
// ThreadLocal可能記憶體洩漏,用完需要remove
isDispatching.remove();
// 佇列中的事件任務處理完,清空佇列,即所謂的排乾(Drain)
eventsToDispatch.remove();
}
}
訂閱原始碼分析
/**
* Registers all subscriber methods on {@code object} to receive events.
* 註冊object上所有訂閱方法,用來接收事件,上面的使用參考,DMLExecutionEventListener就是這裡的object
*/
public void register(Object object) {
// Multimap是guava自定義資料結構,類似Map
>,key就是事件型別,例如DMLExecutionEvent,value就是EventSubscriber即事件訂閱者集合(說明,這個的訂閱者集合是指object裡符合訂閱者的所有方法,例如DMLExecutionEventListener.listener(),DMLExecutionEventListener中可以有多個訂閱者,註解@Subscribe即可),
Multimap<Class>, EventSubscriber> methodsInListener =
finder.findAllSubscribers(object);
// 重入寫鎖保證執行緒安全
subscribersByTypeLock.writeLock().lock();
try {
// 把訂閱者資訊放到map中快取起來(釋出事件post()時就會用到)
subscribersByType.putAll(methodsInListener);
} finally {
subscribersByTypeLock.writeLock().unlock();
}
}
END