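Hmily framework features [https://github.com/yu199195/hmily]
- Seamless integration with Spring and the Spring Boot starter.
- Seamless integration with RPC frameworks such as Dubbo, SpringCloud, and Motan.
- Multiple storage options for transaction logs (redis, mongodb, mysql, etc.).
- Multiple log serialization options (Kryo, protostuff, hessian).
- Automatic transaction recovery.
- Support for propagating nested transaction dependencies.
- Zero code intrusion, with simple and flexible configuration.
Why is Hmily so fast?
1. It uses disruptor to read and write transaction logs asynchronously (disruptor is a lock-free concurrency framework that preallocates its ring buffer, so it produces very little garbage for the GC).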
package com.hmily.tcc.core.disruptor.publisher;
import com.hmily.tcc.common.bean.entity.TccTransaction;
import com.hmily.tcc.common.enums.EventTypeEnum;
import com.hmily.tcc.core.concurrent.threadpool.HmilyThreadFactory;
import com.hmily.tcc.core.coordinator.CoordinatorService;
import com.hmily.tcc.core.disruptor.event.HmilyTransactionEvent;
import com.hmily.tcc.core.disruptor.factory.HmilyTransactionEventFactory;
import com.hmily.tcc.core.disruptor.handler.HmilyConsumerDataHandler;
import com.hmily.tcc.core.disruptor.translator.HmilyTransactionEventTranslator;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.IgnoreExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* event publisher.
*
* @author xiaoyu(Myth)
*/
@Component
public class HmilyTransactionEventPublisher implements DisposableBean {
private Disruptor<HmilyTransactionEvent> disruptor;
private final CoordinatorService coordinatorService;
@Autowired
public HmilyTransactionEventPublisher(final CoordinatorService coordinatorService) {
this.coordinatorService = coordinatorService;
}
/**
* disruptor start.
*
* @param bufferSize this is disruptor buffer size.
* @param threadSize this is disruptor consumer thread size.
*/
public void start(final int bufferSize, final int threadSize) {
// keep the counter outside the factory lambda so each thread gets a distinct name
final AtomicInteger index = new AtomicInteger(1);
disruptor = new Disruptor<>(new HmilyTransactionEventFactory(), bufferSize,
r -> new Thread(null, r, "disruptor-thread-" + index.getAndIncrement()),
ProducerType.MULTI, new BlockingWaitStrategy());
final Executor executor = new ThreadPoolExecutor(threadSize, threadSize, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
HmilyThreadFactory.create("hmily-log-disruptor", false),
new ThreadPoolExecutor.AbortPolicy());
HmilyConsumerDataHandler[] consumers = new HmilyConsumerDataHandler[threadSize];
for (int i = 0; i < threadSize; i++) {
consumers[i] = new HmilyConsumerDataHandler(executor, coordinatorService);
}
disruptor.handleEventsWithWorkerPool(consumers);
disruptor.setDefaultExceptionHandler(new IgnoreExceptionHandler());
disruptor.start();
}
/**
* publish disruptor event.
*
* @param tccTransaction {@linkplain com.hmily.tcc.common.bean.entity.TccTransaction }
* @param type {@linkplain EventTypeEnum}
*/
public void publishEvent(final TccTransaction tccTransaction, final int type) {
final RingBuffer<HmilyTransactionEvent> ringBuffer = disruptor.getRingBuffer();
ringBuffer.publishEvent(new HmilyTransactionEventTranslator(type), tccTransaction);
}
@Override
public void destroy() {
disruptor.shutdown();
}
}
Here bufferSize defaults to 4096 * 4 (written as 4096 * 2 * 2 in TccConfig); users can configure it to suit their own workload.
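TccConfig sets bufferSize to 4096 * 2 * 2, which is 16384, a power of two. The LMAX Disruptor requires a power-of-two ring buffer size, so that a sequence number can be mapped to a slot with a bit mask instead of a modulo. The sketch below only illustrates that arithmetic; the class and method names are hypothetical and are not part of Hmily or Disruptor.

```java
// Hypothetical helper for illustration only; not Hmily or Disruptor code.
final class RingIndexSketch {
    private RingIndexSketch() { }

    /** True when n is a positive power of two. */
    static boolean isPowerOfTwo(final int n) {
        return n > 0 && Integer.bitCount(n) == 1;
    }

    /** Maps an ever-growing sequence number onto a slot of a power-of-two sized ring. */
    static int slotFor(final long sequence, final int bufferSize) {
        if (!isPowerOfTwo(bufferSize)) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        // For a power of two, (sequence % bufferSize) equals (sequence & (bufferSize - 1)).
        return (int) (sequence & (bufferSize - 1));
    }

    public static void main(final String[] args) {
        int bufferSize = 4096 * 2 * 2;
        System.out.println(bufferSize);                  // prints 16384
        System.out.println(slotFor(16385L, bufferSize)); // prints 1
    }
}
```

If you override bufferSize, picking another power of two (8192, 32768, and so on) keeps this property.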
HmilyConsumerDataHandler[] consumers = new HmilyConsumerDataHandler[threadSize];
for (int i = 0; i < threadSize; i++) {
    consumers[i] = new HmilyConsumerDataHandler(executor, coordinatorService);
}
disruptor.handleEventsWithWorkerPool(consumers);
Here multiple consumers are used to process the tasks in the queue.
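With handleEventsWithWorkerPool, the handlers compete for events: each published event is processed by exactly one of them, rather than by all of them. The sketch below imitates that behavior with a plain BlockingQueue and a few threads. It is a stand-in built on java.util.concurrent, not the Disruptor API, and every name in it is hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

// Stand-in for the worker-pool pattern; NOT the Disruptor API.
final class WorkerPoolSketch {
    private WorkerPoolSketch() { }

    /**
     * Publishes eventCount events to workerCount competing consumers and
     * returns how many times each event id was handled.
     */
    static Map<Integer, Integer> run(final int workerCount, final int eventCount) {
        final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1024);
        final Map<Integer, Integer> handled = new ConcurrentHashMap<>();
        final CountDownLatch done = new CountDownLatch(eventCount);
        final Thread[] workers = new Thread[workerCount];
        for (int i = 0; i < workerCount; i++) {
            workers[i] = new Thread(() -> {
                try {
                    while (true) {
                        Integer event = queue.take();          // only one worker receives it
                        handled.merge(event, 1, Integer::sum); // count how often it was handled
                        done.countDown();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();        // interrupt is the stop signal
                }
            }, "sketch-worker-" + i);
            workers[i].setDaemon(true);
            workers[i].start();
        }
        try {
            for (int event = 0; event < eventCount; event++) {
                queue.put(event);
            }
            done.await(); // wait until every event has been handled
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while publishing", e);
        } finally {
            for (Thread worker : workers) {
                worker.interrupt();
            }
        }
        return handled;
    }
}
```

Calling `WorkerPoolSketch.run(4, 100)` yields a map with 100 keys, each with a count of 1, which is the "one consumer per event" guarantee that makes it safe to spread log writes over several threads.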
2. The confirm and cancel methods are executed asynchronously.
package com.hmily.tcc.core.service.handler;
import com.hmily.tcc.common.bean.context.TccTransactionContext;
import com.hmily.tcc.common.bean.entity.TccTransaction;
import com.hmily.tcc.common.enums.TccActionEnum;
import com.hmily.tcc.core.concurrent.threadpool.HmilyThreadFactory;
import com.hmily.tcc.core.service.HmilyTransactionHandler;
import com.hmily.tcc.core.service.executor.HmilyTransactionExecutor;
import org.aspectj.lang.ProceedingJoinPoint;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* this is transaction starter.
*
* @author xiaoyu
*/
@Component
public class StarterHmilyTransactionHandler implements HmilyTransactionHandler {
private static final int MAX_THREAD = Runtime.getRuntime().availableProcessors() << 1;
private final HmilyTransactionExecutor hmilyTransactionExecutor;
private final Executor executor = new ThreadPoolExecutor(MAX_THREAD, MAX_THREAD, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
HmilyThreadFactory.create("hmily-execute", false),
new ThreadPoolExecutor.AbortPolicy());
@Autowired
public StarterHmilyTransactionHandler(final HmilyTransactionExecutor hmilyTransactionExecutor) {
this.hmilyTransactionExecutor = hmilyTransactionExecutor;
}
@Override
public Object handler(final ProceedingJoinPoint point, final TccTransactionContext context)
throws Throwable {
Object returnValue;
try {
TccTransaction tccTransaction = hmilyTransactionExecutor.begin(point);
try {
//execute try
returnValue = point.proceed();
tccTransaction.setStatus(TccActionEnum.TRYING.getCode());
hmilyTransactionExecutor.updateStatus(tccTransaction);
} catch (Throwable throwable) {
//if exception ,execute cancel
final TccTransaction currentTransaction = hmilyTransactionExecutor.getCurrentTransaction();
executor.execute(() -> hmilyTransactionExecutor
.cancel(currentTransaction));
throw throwable;
}
//execute confirm
final TccTransaction currentTransaction = hmilyTransactionExecutor.getCurrentTransaction();
executor.execute(() -> hmilyTransactionExecutor.confirm(currentTransaction));
} finally {
hmilyTransactionExecutor.remove();
}
return returnValue;
}
}
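When the AOP aspect around the try method throws an exception, cancel is executed asynchronously on a thread pool; when there is no exception, confirm is executed instead.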
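Stripped of the Hmily types, the control flow of the handler above can be sketched as follows. This is a simplified illustration and not the framework's code: AsyncTccSketch and its parameters are hypothetical names, and the try phase is modeled as a plain Supplier.

```java
import java.util.concurrent.Executor;
import java.util.function.Supplier;

// Hypothetical, simplified illustration of "try inline, confirm/cancel async".
final class AsyncTccSketch {
    private final Executor executor;

    AsyncTccSketch(final Executor executor) {
        this.executor = executor;
    }

    /**
     * Runs the try phase on the caller's thread, then hands confirm (on success)
     * or cancel (on failure) to the executor without waiting for it to finish.
     */
    <T> T execute(final Supplier<T> tryAction, final Runnable confirm, final Runnable cancel) {
        final T result;
        try {
            result = tryAction.get();
        } catch (RuntimeException e) {
            executor.execute(cancel); // try failed: schedule cancel, then rethrow to the caller
            throw e;
        }
        executor.execute(confirm);    // try succeeded: schedule confirm and return immediately
        return result;
    }
}
```

The caller only pays for the try phase; confirm and cancel run in the background, which is where the latency saving comes from.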
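Some may ask: what if the cancel method or the confirm method itself throws an exception?
Answer: first, this is very rare, because try has only just finished executing. Second, if it does happen, the log was already saved during the try phase, and Hmily has a built-in scheduled thread pool that performs recovery, so there is no need to worry.
Others will ask: what if saving the log fails?
Answer: this is again a corner case. First, the log storage parameters must be configured when the framework starts. Second, even if log storage fails at runtime, the framework falls back to the cached copy, so the program still executes correctly. Finally, if log storage fails and the system also goes down at that extreme moment, congratulations, you should go buy a lottery ticket; the best solution is to not solve it.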
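The article does not show the recovery code, so the following is only a sketch of how a log-driven recovery pass could work. It borrows the parameter names recoverDelayTime and retryMax from the configuration class shown later; everything else (RecoverySketch, LogEntry, recoverOnce) is hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a recovery pass over saved transaction logs.
final class RecoverySketch {
    private static final class LogEntry {
        final String transId;
        final long createdAtMillis;
        int retriedCount;

        LogEntry(final String transId, final long createdAtMillis) {
            this.transId = transId;
            this.createdAtMillis = createdAtMillis;
        }
    }

    private final Map<String, LogEntry> logStore = new LinkedHashMap<>();
    private final int retryMax;
    private final long recoverDelayMillis;

    RecoverySketch(final int retryMax, final int recoverDelayTimeSeconds) {
        this.retryMax = retryMax;
        this.recoverDelayMillis = recoverDelayTimeSeconds * 1000L;
    }

    /** Called in the try phase: remember the transaction before anything can fail. */
    synchronized void save(final String transId, final long createdAtMillis) {
        logStore.put(transId, new LogEntry(transId, createdAtMillis));
    }

    /** Called once confirm or cancel has finished successfully. */
    synchronized void remove(final String transId) {
        logStore.remove(transId);
    }

    /** One pass: returns the ids that are old enough and still under the retry limit. */
    synchronized List<String> recoverOnce(final long nowMillis) {
        final List<String> retried = new ArrayList<>();
        for (LogEntry entry : logStore.values()) {
            boolean oldEnough = nowMillis - entry.createdAtMillis >= recoverDelayMillis;
            if (oldEnough && entry.retriedCount < retryMax) {
                entry.retriedCount++;       // a real implementation would re-run confirm/cancel here
                retried.add(entry.transId);
            }
        }
        return retried;
    }
}
```

In a running system a pass like `recoverOnce(System.currentTimeMillis())` would typically be driven by a `ScheduledExecutorService` at a fixed delay, which is presumably the role of scheduledDelay and scheduledThreadMax.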
3. Use of the ThreadLocal cache.
/**
* transaction begin.
*
* @param point cut point.
* @return TccTransaction
*/
public TccTransaction begin(final ProceedingJoinPoint point) {
LogUtil.debug(LOGGER, () -> "......hmily transaction!start....");
//build tccTransaction
final TccTransaction tccTransaction = buildTccTransaction(point, TccRoleEnum.START.getCode(), null);
//save tccTransaction in threadLocal
CURRENT.set(tccTransaction);
//publishEvent
hmilyTransactionEventPublisher.publishEvent(tccTransaction, EventTypeEnum.SAVE.getCode());
//set TccTransactionContext this context transfer remote
TccTransactionContext context = new TccTransactionContext();
//set action is try
context.setAction(TccActionEnum.TRYING.getCode());
context.setTransId(tccTransaction.getTransId());
context.setRole(TccRoleEnum.START.getCode());
TransactionContextLocal.getInstance().set(context);
return tccTransaction;
}
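First, understand that the ThreadLocal holds the transaction information of the initiator's method. This is important; without it the rest gets confusing. The RPC calls form a call chain, and each participant in that chain is saved onto this transaction.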
/**
* add participant.
*
* @param participant {@linkplain Participant}
*/
public void enlistParticipant(final Participant participant) {
if (Objects.isNull(participant)) {
return;
}
Optional.ofNullable(getCurrentTransaction())
.ifPresent(c -> {
c.registerParticipant(participant);
updateParticipant(c);
});
}
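The two snippets above work together: begin stores the transaction in a ThreadLocal, and enlistParticipant later reads it back on the same thread. The sketch below reproduces that pattern with plain Java types and also shows the limitation that matters later, namely that another thread sees nothing. All names here are hypothetical stand-ins, not Hmily classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for the initiator-side ThreadLocal pattern.
final class CurrentTransactionSketch {
    static final class Tx {
        final String transId;
        final List<String> participants = new ArrayList<>();

        Tx(final String transId) {
            this.transId = transId;
        }
    }

    private static final ThreadLocal<Tx> CURRENT = new ThreadLocal<>();

    private CurrentTransactionSketch() { }

    /** Starts a transaction and binds it to the calling thread. */
    static Tx begin(final String transId) {
        Tx tx = new Tx(transId);
        CURRENT.set(tx);
        return tx;
    }

    /** Registers a participant on the calling thread's transaction, if there is one. */
    static void enlistParticipant(final String participant) {
        if (participant == null) {
            return;
        }
        Optional.ofNullable(CURRENT.get()).ifPresent(tx -> tx.participants.add(participant));
    }

    static Tx current() {
        return CURRENT.get();
    }

    /** Must be called when the transaction ends, or pooled threads leak stale state. */
    static void remove() {
        CURRENT.remove();
    }

    /** Checks from a different thread whether the caller's transaction is visible. */
    static boolean visibleFromOtherThread() {
        final boolean[] seen = new boolean[1];
        Thread probe = new Thread(() -> seen[0] = CURRENT.get() != null);
        probe.start();
        try {
            probe.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }
}
```

After `begin("tx-1")`, each RPC call made on the initiator's thread can enlist itself, while `visibleFromOtherThread()` returns false.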
5. Use of GuavaCache
package com.hmily.tcc.core.cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.Weigher;
import com.hmily.tcc.common.bean.entity.TccTransaction;
import com.hmily.tcc.core.coordinator.CoordinatorService;
import com.hmily.tcc.core.helper.SpringBeanUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
/**
* use google guava cache.
* @author xiaoyu
*/
public final class TccTransactionCacheManager {
private static final int MAX_COUNT = 10000;
private static final LoadingCache<String, TccTransaction> LOADING_CACHE =
CacheBuilder.newBuilder().maximumWeight(MAX_COUNT)
.weigher((Weigher<String, TccTransaction>) (string, tccTransaction) -> getSize())
.build(new CacheLoader<String, TccTransaction>() {
@Override
public TccTransaction load(final String key) {
return cacheTccTransaction(key);
}
});
private static CoordinatorService coordinatorService = SpringBeanUtils.getInstance().getBean(CoordinatorService.class);
private static final TccTransactionCacheManager TCC_TRANSACTION_CACHE_MANAGER = new TccTransactionCacheManager();
private TccTransactionCacheManager() {
}
/**
* TccTransactionCacheManager.
*
* @return TccTransactionCacheManager
*/
public static TccTransactionCacheManager getInstance() {
return TCC_TRANSACTION_CACHE_MANAGER;
}
private static int getSize() {
return (int) LOADING_CACHE.size();
}
private static TccTransaction cacheTccTransaction(final String key) {
return Optional.ofNullable(coordinatorService.findByTransId(key)).orElse(new TccTransaction());
}
/**
* cache tccTransaction.
*
* @param tccTransaction {@linkplain TccTransaction}
*/
public void cacheTccTransaction(final TccTransaction tccTransaction) {
LOADING_CACHE.put(tccTransaction.getTransId(), tccTransaction);
}
/**
* acquire TccTransaction.
*
* @param key this guava key.
* @return {@linkplain TccTransaction}
*/
public TccTransaction getTccTransaction(final String key) {
try {
return LOADING_CACHE.get(key);
} catch (ExecutionException e) {
return new TccTransaction();
}
}
/**
* remove guava cache by key.
* @param key guava cache key.
*/
public void removeByKey(final String key) {
if (StringUtils.isNotEmpty(key)) {
LOADING_CACHE.invalidate(key);
}
}
}
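In the initiator we use ThreadLocal, so why not use it in the participant as well?
There are two reasons. First, try and confirm may not run in the same thread, which would make ThreadLocal ineffective. Second, in an RPC cluster the calls may be load-balanced to different machines. There is one detail here: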
private static TccTransaction cacheTccTransaction(final String key) {
return Optional.ofNullable(coordinatorService.findByTransId(key)).orElse(new TccTransaction());
}
When the entry is not in GuavaCache, it is looked up from the log and returned, which is what guarantees support for clustered environments.
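To make the cluster argument concrete, the sketch below models two nodes that share one log store. It uses a ConcurrentHashMap with computeIfAbsent as a stand-in for Guava's LoadingCache (no eviction, no weights) and plain strings in place of TccTransaction. The class and method names are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in: a per-node cache that falls back to a shared log store.
final class ParticipantCacheSketch {
    private static final String EMPTY = "";

    private final Map<String, String> sharedLog;                           // shared by all nodes
    private final Map<String, String> localCache = new ConcurrentHashMap<>(); // private to this node
    private final AtomicInteger logLookups = new AtomicInteger();

    ParticipantCacheSketch(final Map<String, String> sharedLog) {
        this.sharedLog = sharedLog;
    }

    /** Try phase on this node: write the log and warm the local cache. */
    void onTry(final String transId, final String transaction) {
        sharedLog.put(transId, transaction);
        localCache.put(transId, transaction);
    }

    /** Confirm/cancel phase: use the cache, and on a miss load from the shared log. */
    String get(final String transId) {
        return localCache.computeIfAbsent(transId, key -> {
            logLookups.incrementAndGet();
            return sharedLog.getOrDefault(key, EMPTY); // empty value mirrors orElse(new TccTransaction())
        });
    }

    int logLookups() {
        return logLookups.get();
    }
}
```

If node A handles try and node B receives confirm, node B's first `get` misses its cache, reads the shared log once, and serves later calls from memory.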
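The five points above are what make Hmily an asynchronous, high-performance distributed transaction TCC framework.
How do you use Hmily? (https://github.com/yu199195/hmily/tree/master/hmily-tcc-demo)
First, because of an earlier package naming problem, the framework artifacts have not been uploaded to the Maven Central repository, so users need to pull the code themselves, then build and deploy it to their own private repository.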
1. Dubbo users
- Add this to your API interface project:
<dependency>
    <groupId>com.hmily.tcc</groupId>
    <artifactId>hmily-tcc-annotation</artifactId>
    <version>{your version}</version>
</dependency>
- Add this to your service provider project:
<dependency>
    <groupId>com.hmily.tcc</groupId>
    <artifactId>hmily-tcc-dubbo</artifactId>
    <version>{your version}</version>
</dependency>
- Configure the bootstrap bean:
<aop:aspectj-autoproxy expose-proxy="true"/>
<context:component-scan base-package="com.hmily.tcc.*"/>
<bean id="hmilyTransactionBootstrap" class="com.hmily.tcc.core.bootstrap.HmilyTransactionBootstrap">
    <property name="serializer" value="kryo"/>
    <property name="recoverDelayTime" value="120"/>
    <property name="retryMax" value="3"/>
    <property name="scheduledDelay" value="120"/>
    <property name="scheduledThreadMax" value="4"/>
    <property name="repositorySupport" value="db"/>
    <property name="tccDbConfig">
        <bean class="com.hmily.tcc.common.config.TccDbConfig">
            <property name="url"
                      value="jdbc:mysql://192.168.1.98:3306/tcc?useUnicode=true&amp;characterEncoding=utf8"/>
            <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
            <property name="username" value="root"/>
            <property name="password" value="123456"/>
        </bean>
    </property>
</bean>
Of course there are many more configuration properties; only a demo is given here. For details, refer to this class:
package com.hmily.tcc.common.config;
import com.hmily.tcc.common.enums.RepositorySupportEnum;
import lombok.Data;
/**
* hmily config.
*
* @author xiaoyu
*/
@Data
public class TccConfig {
/**
* Resource suffix. Set this to identify where the transaction log is stored.
* For a table-based store it is the table suffix; other stores use it the same way.
* If it is not set, the applicationName of the application is used by default.
*/
private String repositorySuffix;
/**
* log serializer.
* {@linkplain com.hmily.tcc.common.enums.SerializeEnum}
*/
private String serializer = "kryo";
/**
* scheduledPool Thread size.
*/
private int scheduledThreadMax = Runtime.getRuntime().availableProcessors() << 1;
/**
* scheduledPool scheduledDelay unit SECONDS.
*/
private int scheduledDelay = 60;
/**
* retry max.
*/
private int retryMax = 3;
/**
* recoverDelayTime Unit seconds
* (how many seconds must pass after the local transaction is created before recovery is attempted).
*/
private int recoverDelayTime = 60;
/**
* Parameter used when participants perform their own recovery, for example when:
* 1. an RPC call times out
* 2. the starter goes down
*/
private int loadFactor = 2;
/**
* repositorySupport.
* {@linkplain RepositorySupportEnum}
*/
private String repositorySupport = "db";
/**
* disruptor bufferSize.
*/
private int bufferSize = 4096 * 2 * 2;
/**
* this is disruptor consumerThreads.
*/
private int consumerThreads = Runtime.getRuntime().availableProcessors() << 1;
/**
* db config.
*/
private TccDbConfig tccDbConfig;
/**
* mongo config.
*/
private TccMongoConfig tccMongoConfig;
/**
* redis config.
*/
private TccRedisConfig tccRedisConfig;
/**
* zookeeper config.
*/
private TccZookeeperConfig tccZookeeperConfig;
/**
* file config.
*/
private TccFileConfig tccFileConfig;
}
2. SpringCloud users
- Add this dependency:
<dependency>
    <groupId>com.hmily.tcc</groupId>
    <artifactId>hmily-tcc-springcloud</artifactId>
    <version>{your version}</version>
</dependency>
- Configure the bootstrap bean as shown above.
3. Motan users
- Add this dependency:
<dependency>
    <groupId>com.hmily.tcc</groupId>
    <artifactId>hmily-tcc-motan</artifactId>
    <version>{your version}</version>
</dependency>
- Configure the bootstrap bean as shown above.
hmily-spring-boot-starter
- This one is even easier: just pull in a different jar depending on your RPC framework.
- If you are a Dubbo user, add:
<dependency>
    <groupId>com.hmily.tcc</groupId>
    <artifactId>hmily-tcc-spring-boot-starter-dubbo</artifactId>
    <version>${your version}</version>
</dependency>
- If you are a SpringCloud user, add:
<dependency>
    <groupId>com.hmily.tcc</groupId>
    <artifactId>hmily-tcc-spring-boot-starter-springcloud</artifactId>
    <version>${your version}</version>
</dependency>
- If you are a Motan user, add:
<dependency>
    <groupId>com.hmily.tcc</groupId>
    <artifactId>hmily-tcc-spring-boot-starter-motan</artifactId>
    <version>${your version}</version>
</dependency>
- Then configure the following in your yml:
hmily:
  tcc:
    serializer: kryo
    recoverDelayTime: 128
    retryMax: 3
    scheduledDelay: 128
    scheduledThreadMax: 10
    repositorySupport: db
    tccDbConfig:
      driverClassName: com.mysql.jdbc.Driver
      url: jdbc:mysql://192.168.1.98:3306/tcc?useUnicode=true&characterEncoding=utf8
      username: root
      password: 123456
#repositorySupport : redis
#tccRedisConfig:
#masterName: mymaster
#sentinel : true
#sentinelUrl : 192.168.1.91:26379;192.168.1.92:26379;192.168.1.93:26379
#password : foobaredbbexONE123
# repositorySupport : zookeeper
# host : 92.168.1.73:2181
# sessionTimeOut : 100000
# rootPath : /tcc
# repositorySupport : mongodb
# mongoDbUrl : 192.168.1.68:27017
# mongoDbName : happylife
# mongoUserName : xiaoyu
# mongoUserPwd : 123456
# repositorySupport : file
# path : /account
# prefix : account
It is that simple. You can then add the @Tcc annotation to your interface methods and enjoy using it.
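The article does not show what an annotated method looks like, so the sketch below is only an approximation of the idea: an annotation on the try method names the confirm and cancel methods, and the framework resolves them by reflection. To stay self-contained it defines its own stand-in annotation, TccSketch; the attribute names confirmMethod and cancelMethod are assumptions here, so check the hmily-tcc-annotation module for the real @Tcc definition. The service and resolver classes are hypothetical as well.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Stand-in annotation for illustration; the real one lives in hmily-tcc-annotation.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface TccSketch {
    String confirmMethod() default "";
    String cancelMethod() default "";
}

// Hypothetical business service: pay is the try phase.
class PaymentServiceSketch {
    String lastAction = "";

    @TccSketch(confirmMethod = "confirmPay", cancelMethod = "cancelPay")
    public void pay(final String orderId) {
        lastAction = "try:" + orderId;      // reserve resources
    }

    public void confirmPay(final String orderId) {
        lastAction = "confirm:" + orderId;  // make the reservation permanent
    }

    public void cancelPay(final String orderId) {
        lastAction = "cancel:" + orderId;   // release the reservation
    }
}

// Hypothetical resolver: looks up the second-phase method named by the annotation.
final class TccAnnotationResolver {
    private TccAnnotationResolver() { }

    static void invokePhase(final Object target, final String tryMethod,
                            final boolean trySucceeded, final String arg) {
        try {
            Method tryRef = target.getClass().getMethod(tryMethod, String.class);
            TccSketch tcc = tryRef.getAnnotation(TccSketch.class);
            if (tcc == null) {
                throw new IllegalArgumentException(tryMethod + " is not annotated");
            }
            String phase = trySucceeded ? tcc.confirmMethod() : tcc.cancelMethod();
            target.getClass().getMethod(phase, String.class).invoke(target, arg);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The confirm and cancel methods take the same arguments as the try method in this sketch, which keeps the reflective lookup simple.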
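Of course, for reasons of length, many things are described only briefly, especially the logic.
If you are interested, you can star and fork the project on GitHub, or join the WeChat and QQ groups to discuss.
The GitHub address is: https://github.com/yu199195/hmily
Finally, thanks again, everyone. If you are interested, your excellent PRs are very welcome.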