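The previous article, "sharding-jdbc source code: result merging", analyzed OrderByStreamResultSetMerger, LimitDecoratorResultSetMerger and IteratorStreamResultSetMerger. Of the ResultSetMerger implementations in the source tree, only GroupByMemoryResultSetMerger and GroupByStreamResultSetMerger remain to be covered. This article walks through their implementation based on the source code.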
[Figure: ResultSetMerger class relationship diagram]
How the merger is chosen
GROUP BY has two ResultSetMerger implementations: GroupByMemoryResultSetMerger and GroupByStreamResultSetMerger. How is one of them chosen? MergeEngine contains the following code:
private ResultSetMerger build() throws SQLException {
    // If the SQL has GROUP BY items or aggregation items (e.g. SUM, AVG), one of the GroupBy***ResultSetMerger classes is chosen
    if (!selectStatement.getGroupByItems().isEmpty() || !selectStatement.getAggregationSelectItems().isEmpty()) {
        // The source of isSameGroupByAndOrderByItems() follows this method
        if (selectStatement.isSameGroupByAndOrderByItems()) {
            return new GroupByStreamResultSetMerger(columnLabelIndexMap, resultSets, selectStatement);
        } else {
            return new GroupByMemoryResultSetMerger(columnLabelIndexMap, resultSets, selectStatement);
        }
    }
    if (!selectStatement.getOrderByItems().isEmpty()) {
        return new OrderByStreamResultSetMerger(resultSets, selectStatement.getOrderByItems());
    }
    return new IteratorStreamResultSetMerger(resultSets);
}

// With GROUP BY only and no ORDER BY, isSameGroupByAndOrderByItems() is true. For example,
// `SELECT o.* FROM t_order o where o.user_id=? group by o.order_id` is rewritten to
// `SELECT o.* , o.order_id AS GROUP_BY_DERIVED_0 FROM t_order_0 o where o.user_id=? group by o.order_id ORDER BY GROUP_BY_DERIVED_0 ASC`,
// so the GROUP BY and ORDER BY items are identical.
public boolean isSameGroupByAndOrderByItems() {
    return !getGroupByItems().isEmpty() && getGroupByItems().equals(getOrderByItems());
}
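From the source above: if the SQL has only GROUP BY, or GROUP BY and ORDER BY on exactly the same items, GroupByStreamResultSetMerger is chosen. If it has both GROUP BY and ORDER BY but the items differ, or it has aggregation items without any GROUP BY, GroupByMemoryResultSetMerger is chosen.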
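The selection rule can be restated as a small standalone sketch. The class `MergerChoiceSketch` and its `choose` method are hypothetical and not part of sharding-jdbc; GROUP BY and ORDER BY items are modelled as plain strings, and the ORDER BY list is assumed to be the one after SQL rewriting.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of sharding-jdbc: it only mirrors the branch order of MergeEngine.build().
final class MergerChoiceSketch {

    // Items are modelled as strings such as "status ASC"; orderByItems is the list after SQL rewriting.
    static String choose(List<String> groupByItems, List<String> orderByItems, boolean hasAggregation) {
        if (!groupByItems.isEmpty() || hasAggregation) {
            // Same check as isSameGroupByAndOrderByItems()
            boolean sameItems = !groupByItems.isEmpty() && groupByItems.equals(orderByItems);
            return sameItems ? "GroupByStreamResultSetMerger" : "GroupByMemoryResultSetMerger";
        }
        if (!orderByItems.isEmpty()) {
            return "OrderByStreamResultSetMerger";
        }
        return "IteratorStreamResultSetMerger";
    }

    public static void main(String[] args) {
        List<String> none = Collections.emptyList();
        // GROUP BY status only: the rewrite appends a matching ORDER BY, so both lists are equal
        System.out.println(choose(Arrays.asList("status ASC"), Arrays.asList("status ASC"), true));
        // GROUP BY status with ORDER BY on a different column
        System.out.println(choose(Arrays.asList("status ASC"), Arrays.asList("user_id ASC"), true));
        // Aggregation without GROUP BY, e.g. SELECT count(*) FROM t_order
        System.out.println(choose(none, none, true));
        // ORDER BY only
        System.out.println(choose(none, Arrays.asList("order_id ASC"), false));
        // Neither GROUP BY, aggregation nor ORDER BY
        System.out.println(choose(none, none, false));
    }
}
```

The five calls print GroupByStreamResultSetMerger, GroupByMemoryResultSetMerger, GroupByMemoryResultSetMerger, OrderByStreamResultSetMerger and IteratorStreamResultSetMerger, in that order.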
Next, let's look at how GroupByStreamResultSetMerger performs the GROUP BY aggregation on the results. Assume the actual tables t_order_0 and t_order_1 in data source js_jdbc_0 hold the following data:
t_order_0:
order_id | user_id | status |
---|---|---|
1000 | 10 | INIT |
1002 | 10 | INIT |
1004 | 10 | VALID |
1006 | 10 | NEW |
1008 | 10 | INIT |
t_order_1:
order_id | user_id | status |
---|---|---|
1001 | 10 | NEW |
1003 | 10 | NEW |
1005 | 10 | VALID |
1007 | 10 | INIT |
1009 | 10 | INIT |
GroupByStreamResultSetMerger
Taking the SQL SELECT o.status, count(o.user_id) FROM t_order o where o.user_id=10 group by o.status as an example, let's analyze GroupByStreamResultSetMerger. Part of its source is as follows:
public final class GroupByStreamResultSetMerger extends OrderByStreamResultSetMerger {
    ... ...
    public GroupByStreamResultSetMerger(
            final Map<String, Integer> labelAndIndexMap, final List<ResultSet> resultSets, final SelectStatement selectStatement) throws SQLException {
        // The parent class of GroupByStreamResultSetMerger is OrderByStreamResultSetMerger, so super() calls the OrderByStreamResultSetMerger constructor
        super(resultSets, selectStatement.getOrderByItems());
        // Map from column label to column index, e.g. {order_id:1, status:3, user_id:2}
        this.labelAndIndexMap = labelAndIndexMap;
        // The SQL statement being executed
        this.selectStatement = selectStatement;
        currentRow = new ArrayList<>(labelAndIndexMap.size());
        // If the priority queue is not empty (at least one result set has rows), take the group-by values of the first element
        // in the queue as currentGroupByValues, i.e. INIT (ascending order by default, so INIT < NEW < VALID)
        currentGroupByValues = getOrderByValuesQueue().isEmpty() ? Collections.emptyList() : new GroupByValue(getCurrentResultSet(), selectStatement.getGroupByItems()).getGroupValues();
    }
    ...
}
Note: OrderByStreamResultSetMerger was already analyzed in the article "5. sharding-jdbc source code: result merging", so it is not repeated here.
The core source of the next() method is as follows:
@Override
public boolean next() throws SQLException {
    currentRow.clear();
    // If the priority queue is empty there are no results, so return false
    if (getOrderByValuesQueue().isEmpty()) {
        return false;
    }
    if (isFirstNext()) {
        super.next();
    }
    // The core aggregation logic is here
    if (aggregateCurrentGroupByRowAndNext()) {
        currentGroupByValues = new GroupByValue(getCurrentResultSet(), selectStatement.getGroupByItems()).getGroupValues();
    }
    return true;
}
aggregateCurrentGroupByRowAndNext() is implemented as follows:
private boolean aggregateCurrentGroupByRowAndNext() throws SQLException {
    boolean result = false;
    // selectStatement.getAggregationSelectItems() returns all aggregation items in the select list, e.g. for
    // select count(o.user_id) *** the aggregation item is count(o.user_id). They are converted into a map whose key is
    // the aggregation item, count(o.user_id), and whose value is the aggregation unit instance, here an
    // AccumulationAggregationUnit. In other words, COUNT on o.user_id is computed by AccumulationAggregationUnit
    // (AggregationUnitFactory is analyzed below).
    Map<AggregationSelectItem, AggregationUnit> aggregationUnitMap = Maps.toMap(selectStatement.getAggregationSelectItems(), new Function<AggregationSelectItem, AggregationUnit>() {
        @Override
        public AggregationUnit apply(final AggregationSelectItem input) {
            return AggregationUnitFactory.create(input.getType());
        }
    });
    // Now aggregate: rows are merged as long as their group-by values are equal (the SQL may run on several data sources and several actual tables)
    while (currentGroupByValues.equals(new GroupByValue(getCurrentResultSet(), selectStatement.getGroupByItems()).getGroupValues())) {
        // Call aggregate() to merge the current row into the aggregation units
        aggregate(aggregationUnitMap);
        cacheCurrentRow();
        // next() here is OrderByStreamResultSetMerger.next(); currentResultSet then points at the next element
        result = super.next();
        // Keep iterating while there are more rows; stop once there are none
        if (!result) {
            break;
        }
    }
    setAggregationValueToCurrentRow(aggregationUnitMap);
    return result;
}
The source of AggregationUnitFactory is as follows:
public final class AggregationUnitFactory {
    /**
     * Create aggregation unit instance.
     * As this code shows, MAX and MIN in the select list use ComparableAggregationUnit, SUM and COUNT use
     * AccumulationAggregationUnit, and AVG uses AverageAggregationUnit (these are the only aggregations currently supported).
     */
    public static AggregationUnit create(final AggregationType type) {
        switch (type) {
            case MAX:
                return new ComparableAggregationUnit(false);
            case MIN:
                return new ComparableAggregationUnit(true);
            case SUM:
            case COUNT:
                return new AccumulationAggregationUnit();
            case AVG:
                return new AverageAggregationUnit();
            default:
                throw new UnsupportedOperationException(type.name());
        }
    }
}
The source of aggregate() is as follows:
private void aggregate(final Map<AggregationSelectItem, AggregationUnit> aggregationUnitMap) throws SQLException {
    for (Entry<AggregationSelectItem, AggregationUnit> entry : aggregationUnitMap.entrySet()) {
        List<Comparable<?>> values = new ArrayList<>(2);
        if (entry.getKey().getDerivedAggregationSelectItems().isEmpty()) {
            values.add(getAggregationValue(entry.getKey()));
        } else {
            for (AggregationSelectItem each : entry.getKey().getDerivedAggregationSelectItems()) {
                values.add(getAggregationValue(each));
            }
        }
        // The core of aggregate() is calling merge() on the concrete AggregationUnit, here AccumulationAggregationUnit.merge()
        // (each AggregationUnit implementation is analyzed later)
        entry.getValue().merge(values);
    }
}
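Execution walkthrough
This part of the code is somewhat intricate, so the steps below break the execution down to make the way sharding-jdbc runs a GROUP BY clearer.
step1. Execute the SQL. The SQL SELECT o.status, count(o.user_id) FROM t_order o where o.user_id=10 group by o.status is first executed on each of the two actual tables, t_order_0 and t_order_1, which return the following results respectively: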
t_order_0:
status | count(o.user_id) |
---|---|
INIT | 3 |
NEW | 1 |
VALID | 1 |
t_order_1:
status | count(o.user_id) |
---|---|
INIT | 2 |
NEW | 2 |
VALID | 1 |
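step2. Call super(*). GroupByStreamResultSetMerger calls the OrderByStreamResultSetMerger constructor via super(resultSets, selectStatement.getOrderByItems()); which builds the priority queue. As the first diagram in the figure below shows, the queue initially holds two elements, one per result set, whose current rows are [(INIT, 3), (INIT, 2)]:
[Figure: powered by afei.png]
- First, (INIT, 3) and (INIT, 2) are aggregated. The next value, NEW, is not equal to INIT, so the next round of aggregation begins.
- Then (NEW, 1) and (NEW, 2) are aggregated. The next value, VALID, is not equal to NEW, so the next round of aggregation begins.
- Finally (VALID, 1) and (VALID, 1) are aggregated. next() returns false for both result sets, so the aggregation is complete.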
step3. aggregationUnitMap. The conversion yields aggregationUnitMap, whose key is count(user_id) and whose value is the AggregationUnit implementation for COUNT, namely AccumulationAggregationUnit.
Since COUNT(o.user_id) is the only aggregation in the select statement, the map has size 1 and its key is count(user_id). If the SQL were SELECT o.status, count(o.user_id), max(order_id) FROM t_order o where o.user_id=? group by o.status, aggregationUnitMap would have size 2: the first entry has key count(user_id) and value AccumulationAggregationUnit, and the second has key max(order_id) and value ComparableAggregationUnit.
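step4. Loop and merge. The core code is shown below. (INIT, 3) and (INIT, 2) are merged by AccumulationAggregationUnit.merge(), giving (INIT, 5). In the same way, merge() combines (NEW, 1) and (NEW, 2) into (NEW, 3), and (VALID, 1) and (VALID, 1) into (VALID, 2). The final result is therefore [(INIT, 5), (NEW, 3), (VALID, 2)].
while (currentGroupByValues.equals(new GroupByValue(getCurrentResultSet(), selectStatement.getGroupByItems()).getGroupValues())) {
    aggregate(aggregationUnitMap);
    cacheCurrentRow();
    result = super.next();
    if (!result) {
        break;
    }
}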
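The four steps can be reproduced with a simplified, self-contained sketch. It is not the sharding-jdbc implementation: `StreamGroupBySketch`, `Row` and `Cursor` are hypothetical names, each per-table result is assumed to be already sorted by status (as the rewritten ORDER BY guarantees), and only COUNT is accumulated.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch of a streaming GROUP BY merge over per-table results sorted by status.
final class StreamGroupBySketch {

    // One row of a per-table result: the group-by value and its partial COUNT.
    static final class Row {
        final String status;
        final long count;

        Row(String status, long count) {
            this.status = status;
            this.count = count;
        }

        @Override
        public String toString() {
            return "(" + status + ", " + count + ")";
        }
    }

    // A cursor over one sorted per-table result; cursors are ordered by their current row.
    static final class Cursor implements Comparable<Cursor> {
        final Iterator<Row> rows;
        Row current;

        Cursor(List<Row> sortedRows) {
            this.rows = sortedRows.iterator();
        }

        boolean advance() {
            if (!rows.hasNext()) {
                return false;
            }
            current = rows.next();
            return true;
        }

        @Override
        public int compareTo(Cursor other) {
            return current.status.compareTo(other.current.status);
        }
    }

    static List<Row> merge(List<List<Row>> tableResults) {
        // Step 2: one queue entry per non-empty result, smallest group-by value first
        PriorityQueue<Cursor> queue = new PriorityQueue<>();
        for (List<Row> each : tableResults) {
            Cursor cursor = new Cursor(each);
            if (cursor.advance()) {
                queue.offer(cursor);
            }
        }
        List<Row> merged = new ArrayList<>();
        while (!queue.isEmpty()) {
            String group = queue.peek().current.status;
            long total = 0;
            // Step 4: accumulate while the head of the queue still belongs to the current group
            while (!queue.isEmpty() && queue.peek().current.status.equals(group)) {
                Cursor head = queue.poll();
                total += head.current.count;
                if (head.advance()) {
                    queue.offer(head);
                }
            }
            merged.add(new Row(group, total));
        }
        return merged;
    }

    public static void main(String[] args) {
        List<Row> table0 = Arrays.asList(new Row("INIT", 3), new Row("NEW", 1), new Row("VALID", 1));
        List<Row> table1 = Arrays.asList(new Row("INIT", 2), new Row("NEW", 2), new Row("VALID", 1));
        System.out.println(merge(Arrays.asList(table0, table1)));
        // prints [(INIT, 5), (NEW, 3), (VALID, 2)]
    }
}
```

Because every input is sorted on the group-by column, only one group is held in memory at a time, which is what makes the streaming variant possible when the GROUP BY and ORDER BY items match.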
AggregationUnit
AggregationUnit is the aggregation interface. It has three implementations: AccumulationAggregationUnit, ComparableAggregationUnit and AverageAggregationUnit. Each is introduced briefly below.
AccumulationAggregationUnit
The source is as follows. Both SUM and COUNT are computed by this AggregationUnit; the core of the implementation is accumulation:
@Override
public void merge(final List<Comparable<?>> values) {
    if (null == values || null == values.get(0)) {
        return;
    }
    if (null == result) {
        result = new BigDecimal("0");
    }
    // Core logic: accumulate
    result = result.add(new BigDecimal(values.get(0).toString()));
    log.trace("Accumulation result: {}", result.toString());
}
ComparableAggregationUnit
The source is as follows. Both MAX and MIN are computed by this AggregationUnit; the core of the implementation is comparison:
@Override
public void merge(final List<Comparable<?>> values) {
    if (null == values || null == values.get(0)) {
        return;
    }
    if (null == result) {
        result = values.get(0);
        log.trace("Comparable result: {}", result);
        return;
    }
    // Compare the new value with the current result
    int comparedValue = ((Comparable) values.get(0)).compareTo(result);
    // Ascending and descending compare differently (asc is false for MAX and true for MIN):
    // MIN looks for a smaller value (asc && comparedValue < 0), MAX looks for a larger value (!asc && comparedValue > 0)
    if (asc && comparedValue < 0 || !asc && comparedValue > 0) {
        result = values.get(0);
        log.trace("Comparable result: {}", result);
    }
}
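As a rough illustration of the asc flag, the comparison can be isolated in a generic helper. `ComparableMergeSketch` is a hypothetical name, and the inputs are the per-table MAX(order_id) and MIN(order_id) values read off the sample tables above (1008 and 1009, 1000 and 1001).

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the MAX/MIN merge rule; not the sharding-jdbc class itself.
final class ComparableMergeSketch {

    // asc == true keeps the smallest value (MIN); asc == false keeps the largest value (MAX).
    static <T extends Comparable<T>> T merge(List<T> partials, boolean asc) {
        T result = null;
        for (T each : partials) {
            if (each == null) {
                continue; // a null partial result is ignored, as in merge() above
            }
            if (result == null) {
                result = each;
                continue;
            }
            int comparedValue = each.compareTo(result);
            if (asc && comparedValue < 0 || !asc && comparedValue > 0) {
                result = each;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // MAX(order_id) per table: 1008 from t_order_0 and 1009 from t_order_1
        System.out.println(merge(Arrays.asList(1008, 1009), false)); // prints 1009
        // MIN(order_id) per table: 1000 from t_order_0 and 1001 from t_order_1
        System.out.println(merge(Arrays.asList(1000, 1001), true)); // prints 1000
    }
}
```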
AverageAggregationUnit
The source is as follows. AVG is computed by this AggregationUnit. Each AVG is rewritten into a COUNT and a SUM; the partial values are accumulated into a total COUNT and a total SUM, and the total SUM divided by the total COUNT gives the final AVG:
@Override
public void merge(final List<Comparable<?>> values) {
    if (null == values || null == values.get(0) || null == values.get(1)) {
        return;
    }
    if (null == count) {
        count = new BigDecimal("0");
    }
    if (null == sum) {
        sum = new BigDecimal("0");
    }
    // Accumulate COUNT
    count = count.add(new BigDecimal(values.get(0).toString()));
    // Accumulate SUM
    sum = sum.add(new BigDecimal(values.get(1).toString()));
    log.trace("AVG result COUNT: {} SUM: {}", count, sum);
}
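A minimal sketch of why COUNT and SUM are merged separately, using AVG(order_id) for the INIT group of the sample tables: t_order_0 contributes COUNT 3 and SUM 3010, t_order_1 contributes COUNT 2 and SUM 2016. `AverageMergeSketch` is a hypothetical class, and the scale of 4 with HALF_UP rounding in getResult() is an assumption made for this example only.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

// Hypothetical sketch of merging AVG from per-table COUNT and SUM; not the sharding-jdbc class itself.
final class AverageMergeSketch {

    private BigDecimal count = BigDecimal.ZERO;
    private BigDecimal sum = BigDecimal.ZERO;

    // Accumulate one table's partial COUNT and SUM.
    void merge(long partialCount, String partialSum) {
        count = count.add(BigDecimal.valueOf(partialCount));
        sum = sum.add(new BigDecimal(partialSum));
    }

    // Assumption: scale 4 and HALF_UP rounding are chosen only for this sketch.
    BigDecimal getResult() {
        if (count.signum() == 0) {
            return BigDecimal.ZERO;
        }
        return sum.divide(count, 4, RoundingMode.HALF_UP);
    }

    public static void main(String[] args) {
        AverageMergeSketch unit = new AverageMergeSketch();
        unit.merge(3, "3010"); // t_order_0, status INIT: order_id 1000, 1002, 1008
        unit.merge(2, "2016"); // t_order_1, status INIT: order_id 1007, 1009
        System.out.println(unit.getResult()); // prints 1005.2000
    }
}
```

Averaging the two per-table averages (about 1003.3333 and 1008) would give roughly 1005.6667 instead, because the tables contribute different numbers of rows; summing COUNT and SUM first avoids that error.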