歡迎光臨
每天分享高質量文章

【死磕Sharding-jdbc】—路由&執行

點選上方“Java技術驛站”,選擇“置頂公眾號”。

有內涵、有價值的文章第一時間送達!

繼續以 sharding-jdbc-example-jdbc模組中的 com.dangdang.ddframe.rdb.sharding.example.jdbc.Main為基礎,剖析分庫分表簡單查詢SQL實現– printSimpleSelect(dataSource);,即如何執行簡單的查詢SQL,接下來的分析以執行SQL陳述句 "SELECT o.* FROM t_order o where o.user_id=? AND o.order_id=?"為例;

單表查詢

MainprintSimpleSelect()方法呼叫 preparedStatement.executeQuery(),即呼叫ShardingPreparedStatement中的 executeQuery()方法,核心原始碼如下:

  1. @Override

  2. public ResultSet executeQuery() throws SQLException {

  3.    ResultSet result;

  4.    try {

  5.        // 核心方法route(),即解析SQL如何路由執行

  6.        Collection<PreparedStatementUnit> preparedStatementUnits = route();

  7.        // 根據路由資訊執行SQL

  8.        List<ResultSet> resultSets = new PreparedStatementExecutor(

  9.                getConnection().getShardingContext().getExecutorEngine(), routeResult.getSqlStatement().getType(), preparedStatementUnits, getParameters()).executeQuery();

  10.        // 對傳回的結果進行merge合併

  11.        result = new ShardingResultSet(resultSets, new MergeEngine(resultSets, (SelectStatement) routeResult.getSqlStatement()).merge());

  12.    } finally {

  13.        clearBatch();

  14.    }

  15.    currentResultSet = result;

  16.    return result;

  17. }

透過上面的原始碼可知,SQL查詢兩個核心:路由和結果合併,接下來一一分析sharding-jdbc如何實現;

單表查詢之路由

接下來分析下麵這段程式碼是如何取得路由資訊的:

  1. Collection<PreparedStatementUnit> preparedStatementUnits = route();

route()核心原始碼如下:

  1. private Collection<PreparedStatementUnit> route() throws SQLException {

  2.    Collection<PreparedStatementUnit> result = new LinkedList<>();

  3.    // 呼叫PreparedStatementRoutingEngine中的route()方法,route()方法呼叫sqlRouter.route(logicSQL, parameters, sqlStatement)

  4.    routeResult = routingEngine.route(getParameters());

  5.    for (SQLExecutionUnit each : routeResult.getExecutionUnits()) {

  6.        SQLType sqlType = routeResult.getSqlStatement().getType();

  7.        Collection<PreparedStatement> preparedStatements;

  8.        if (SQLType.DDL == sqlType) {

  9.            preparedStatements = generatePreparedStatementForDDL(each);

  10.        } else {

  11.            preparedStatements = Collections.singletonList(generatePreparedStatement(each));

  12.        }

  13.        routedStatements.addAll(preparedStatements);

  14.        for (PreparedStatement preparedStatement : preparedStatements) {

  15.            replaySetParameter(preparedStatement);

  16.            result.add(new PreparedStatementUnit(each, preparedStatement));

  17.        }

  18.    }

  19.    return result;

  20. }

SQLRouter介面有兩個實現類:DatabaseHintSQLRouter和ParsingSQLRouter,由於這裡沒有用hint語法強制執行某個庫,所以呼叫ParsingSQLRouter中的route()方法:

  1. private RoutingResult route(final List<Object> parameters, final SQLStatement sqlStatement) {

  2.    Collection<String> tableNames = sqlStatement.getTables().getTableNames();

  3.    RoutingEngine routingEngine;

  4.    // 如果sql中只有一個表名,或者多個表名之間是系結表關係,或者所有表都在預設資料源指定的資料庫中(即不參與分庫分表的表),那麼用SimpleRoutingEngine作為路由判斷引擎;

  5.    if (1 == tableNames.size() || shardingRule.isAllBindingTables(tableNames) || shardingRule.isAllInDefaultDataSource(tableNames)) {

  6.        routingEngine = new SimpleRoutingEngine(shardingRule, parameters, tableNames.iterator().next(), sqlStatement);

  7.    } else {

  8.        routingEngine = new ComplexRoutingEngine(shardingRule, parameters, tableNames, sqlStatement);

  9.    }

  10.    return routingEngine.route();

  11. }

接下來分析一下SimpleRoutingEngine和ComplexRoutingEngine;

SimpleRoutingEngine

執行SQL: "SELECT o.* FROM t_order o where o.user_id=? AND o.order_id=?"時,由於SQL中只有一個表(1 == tableNames.size()),所以路由引擎是SimpleRoutingEngineSimpleRoutingEngine.route()原始碼如下:

  1. @Override

  2. public RoutingResult route() {

  3.    // 根據邏輯表得到tableRule,邏輯表為t_order;表規則的配置為:.actualTables(Arrays.asList("t_order_0", "t_order_1")),所以有兩個實際表;

  4.    TableRule tableRule = shardingRule.getTableRule(logicTableName);

  5.    // 根據規則先路由資料源:即根據user_id取模路由

  6.    Collection<String> routedDataSources = routeDataSources(tableRule);

  7.    // routedMap儲存路由到的標的資料源和表的結果:key為資料源,value為該資料源下路由到的標的表集合

  8.    Map<String, Collection<String>> routedMap = new LinkedHashMap<>(routedDataSources.size());

  9.    // 遍歷路由到的標的資料源

  10.    for (String each : routedDataSources) {

  11.        // 再根據規則路由表:即根據order_id取模路由

  12.        routedMap.put(each, routeTables(tableRule, each));

  13.    }

  14.    // 將得到的路由資料源和表資訊封裝到RoutingResult中,RoutingResult中有個TableUnits型別屬性,TableUnits類中有個List tableUnits屬性,TableUnit包含三個屬性:dataSourceName--資料源名稱,logicTableName--邏輯表名稱,actualTableName--實際表名稱,例如:TableUnit:{dataSourceName:ds_jdbc_1, logicTableName:t_order, actualTableName: t_order_1}

  15.    return generateRoutingResult(tableRule, routedMap);

  16. }

資料源路由詳細解讀:由於資料源的sharding策略為 databaseShardingStrategy(newDatabaseShardingStrategy("user_id",newModuloDatabaseShardingAlgorithm()));且where條件為 whereo.user_id=?AND o.order_id=?,即where條件中有userid,根據取模路由策略,當userid為奇數時,資料源為dsjdbc1;當userid為偶數時,資料源為dsjdbc0; 表路由詳細解讀:表的sharding策略為 tableShardingStrategy(newTableShardingStrategy("order_id",newModuloTableShardingAlgorithm())),即where條件中有orderid,根據取模路由策略,當orderid為奇數時,表為torder1;當orderid為偶數時,表為torder0; 綜上所述:最終需要執行的表數量為路由到的資料源個數路由到的實際表個數*;

實體1where o.orderid=1001 AND o.userid=10,userid=10所以路由得到資料源為dsjdbc0; orderid=1001,路由得到實際表為torder1;那麼最終只需在dsjdbc0這個資料源中的torder1表中執行即可; 實體2where o.orderid=1000,userid沒有值所以路由得到所有資料源dsjdbc0和dsjdbc1; orderid=1000,路由得到實際表為torder0;那麼最終需在dsjdbc0和dsjdbc1兩個資料源中的torder0表中執行即可; 實體3where o.userid=11,userid=11所以路由得到資料源為dsjdbc1; orderid沒有值所以路由得到實際表為torder0和torder1;那麼最終只需在dsjdbc1這個資料源中的torder0和torder1表中執行即可;

ComplexRoutingEngine

待定… …

單表查詢之執行

路由完成後就決定了SQL需要在哪些資料源的哪些實際表中執行,接下來以執行 SELECT o.*FROM t_order o whereo.user_id=10為例分析下麵這段Java程式碼sharding-jdbc是如何執行的:

根據前面的路由分析可知,這條SQL會路由到dsjdbc0這個資料源中,且在所有實際表([torder0, torder1])中執行這個SQL;

  1. List<ResultSet> resultSets = new PreparedStatementExecutor(

  2.                getConnection().getShardingContext().getExecutorEngine(), routeResult.getSqlStatement().getType(), preparedStatementUnits, getParameters()).executeQuery();

執行的核心程式碼在ExecutorEngine中,核心原始碼如下:

  1. public <T> List<T> executePreparedStatement(

  2.        final SQLType sqlType, final Collection<PreparedStatementUnit> preparedStatementUnits, final List<Object> parameters, final ExecuteCallback<T> executeCallback) {

  3.    // preparedStatementUnits就是前面路由分析結果:執行SQL select o.* from t_order o where o.user_id=10時,只需在ds_jdbc_0這個資料源中的t_order_0和t_order_1兩個實際表中執行即可;

  4.    return execute(sqlType, preparedStatementUnits, Collections.singletonList(parameters), executeCallback);

  5. }

  6. private  <T> List<T> execute(

  7.        final SQLType sqlType, final Collection extends BaseStatementUnit> baseStatementUnits, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback) {

  8.    if (baseStatementUnits.isEmpty()) {

  9.        return Collections.emptyList();

  10.    }

  11.    Iterator extends BaseStatementUnit> iterator = baseStatementUnits.iterator();

  12.    // 第一個任務分離出來

  13.    BaseStatementUnit firstInput = iterator.next();

  14.    // 除第一個任務之外的任務非同步執行

  15.    ListenableFuture<List<T>> restFutures = asyncExecute(sqlType, Lists.newArrayList(iterator), parameterSets, executeCallback);

  16.    T firstOutput;

  17.    List<T> restOutputs;

  18.    try {

  19.        // 第一個任務同步執行[猜測是不是考慮到分庫分表後只需路由到一個資料源中的一個表的SQL執行效能問題,最佳化這種SQL執行為同步執行?分庫分表後,面向使用者的API佔用了99%的請求量,而這些API對應的SQL 99%只需要在一個資料源上的一個實際表執行即可,例如根據訂單表根據user_id分庫分表後,查詢使用者的訂單資訊這種場景]

  20.        firstOutput = syncExecute(sqlType, firstInput, parameterSets, executeCallback);

  21.        // 取得其他非同步執行任務的結果

  22.        restOutputs = restFutures.get();

  23.        //CHECKSTYLE:OFF

  24.    } catch (final Exception ex) {

  25.        //CHECKSTYLE:ON

  26.        ExecutorExceptionHandler.handleException(ex);

  27.        return null;

  28.    }

  29.    List<T> result = Lists.newLinkedList(restOutputs);

  30.    // 將第一個任務同步執行結果與其他任務非同步執行結果合併就是最終的結果

  31.    result.add(0, firstOutput);

  32.    return result;

  33. }

非同步執行核心程式碼:

  1. private final ListeningExecutorService executorService;

  2. public ExecutorEngine(final int executorSize) {

  3.    // 非同步執行的執行緒池是透過google-guava封裝的執行緒池,設定了執行緒名稱為增加了ShardingJDBC-***,增加了shutdown hook--應用關閉時最多等待60秒直到所有任務完成,從而實現優雅停機

  4.    executorService = MoreExecutors.listeningDecorator(new ThreadPoolExecutor(

  5.            executorSize, executorSize, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ShardingJDBC-%d").build()));

  6.    MoreExecutors.addDelayedShutdownHook(executorService, 60, TimeUnit.SECONDS);

  7. }

  8. private <T> ListenableFuture<List<T>> asyncExecute(

  9.        final SQLType sqlType, final Collection<BaseStatementUnit> baseStatementUnits, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback) {

  10.    // 構造一個存放非同步執行後的結果的list

  11.    List<ListenableFuture<T>> result = new ArrayList<>(baseStatementUnits.size());

  12.    final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();

  13.    final Map<String, Object> dataMap = ExecutorDataMap.getDataMap();

  14.    for (final BaseStatementUnit each : baseStatementUnits) {

  15.        // 執行緒池方式非同步執行所有SQL,執行緒池在ExecutorEngine的構造方法中初始化;

  16.        result.add(executorService.submit(new Callable<T>() {

  17.            @Override

  18.            public T call() throws Exception {

  19.                return executeInternal(sqlType, each, parameterSets, executeCallback, isExceptionThrown, dataMap);

  20.            }

  21.        }));

  22.    }

  23.    // google-guava的方法--將所有非同步執行結果轉為list型別

  24.    return Futures.allAsList(result);

  25. }

同步執行核心程式碼:

  1. private <T> T syncExecute(final SQLType sqlType, final BaseStatementUnit baseStatementUnit, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback) throws Exception {

  2.        return executeInternal(sqlType, baseStatementUnit, parameterSets, executeCallback, ExecutorExceptionHandler.isExceptionThrown(), ExecutorDataMap.getDataMap());

  3.    }

由同步執行核心程式碼和非同步執行核心程式碼可知,最終都是呼叫 executeInternal(),跟讀這個方法的原始碼可知:最終就是在標的資料庫表上執行 PreparedStatementexecute***()方法;且在執行前會利用google-guava的EventBus釋出BEFOREEXECUTE的事件(執行完成後,如果執行成功還會釋出EXECUTESUCCESS事件,如果執行失敗釋出EXECUTE_FAILURE事件),部分核心原始碼如下:

  1. // 釋出事件

  2. List<AbstractExecutionEvent> events = new LinkedList<>();

  3. if (parameterSets.isEmpty()) {

  4.    // 構造無參SQL的事件(事件型別為BEFORE_EXECUTE)

  5.    events.add(getExecutionEvent(sqlType, baseStatementUnit, Collections.emptyList()));

  6. }

  7. for (List<Object> each : parameterSets) {

  8.    // 構造有參SQL的事件(事件型別為BEFORE_EXECUTE)

  9.    events.add(getExecutionEvent(sqlType, baseStatementUnit, each));

  10. }

  11. // 呼叫google-guava的EventBus.post()提交事件

  12. for (AbstractExecutionEvent event : events) {

  13.    EventBusInstance.getInstance().post(event);

  14. }

  15. try {

  16.    // 執行SQL

  17.    result = executeCallback.execute(baseStatementUnit);

  18. } catch (final SQLException ex) {

  19.    // 如果執行過程中丟擲SQLException,即執行SQL失敗,那麼post一個EXECUTE_FAILURE型別的事件

  20.    for (AbstractExecutionEvent each : events) {

  21.        each.setEventExecutionType(EventExecutionType.EXECUTE_FAILURE);

  22.        each.setException(Optional.of(ex));

  23.        EventBusInstance.getInstance().post(each);

  24.        ExecutorExceptionHandler.handleException(ex);

  25.    }

  26.    return null;

  27. }

  28. for (AbstractExecutionEvent each : events) {

  29.    // // 如果執行成功,那麼post一個EXECUTE_SUCCESS型別的事件

  30.    each.setEventExecutionType(EventExecutionType.EXECUTE_SUCCESS);

  31.    EventBusInstance.getInstance().post(each);

  32. }

接下來需要對並行執行後得到的結果集進行merge,下麵的sharding-jdbc原始碼分析系列文章繼續對其進行分析;

EventBus

說明:EventBus是google-guava提供的訊息釋出-訂閱類庫; google-guava的EventBus正確開啟姿勢:

  1. 釋出事務:呼叫EventBus的post()--sharding-jdbc中釋出事務:EventBusInstance.getInstance().post(each);

  2. 訂閱事務:呼叫EventBus的register()--sharding-jdbc中註冊事務:EventBusInstance.getInstance().register(new BestEffortsDeliveryListener());

EventBusInstance原始碼如下--EventBus全類名為 com.google.common.eventbus.EventBus

  1. @NoArgsConstructor(access = AccessLevel.PRIVATE)

  2. public final class EventBusInstance {

  3.    private static final EventBus INSTANCE = new EventBus();

  4.    /**

  5.     * Get event bus instance.

  6.     *

  7.     * @return event bus instance

  8.     */

  9.    public static EventBus getInstance() {

  10.        return INSTANCE;

  11.    }

  12. }

END

贊(0)

分享創造快樂

© 2024 知識星球   網站地圖