您好,登錄后才能下訂單哦!
sharding中怎么執行jdbc,很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細講解,有這方面需求的人可以來學習下,希望你能有所收獲。
內存限制模式:使用此模式的前提是,ShardingSphere對一次操作所耗費的數據庫連接數量不做限制。如果實際執行的SQL需要對某數據庫實例中的200張表做操作,則對每張表創建一個新的數據庫連接,并通過多線程的方式并發處理,以達成執行效率最大化。并且在SQL滿足條件情況下,優先選擇流式歸并,以防止出現內存溢出或避免頻繁垃圾回收情況
連接限制模式:使用此模式的前提是,ShardingSphere嚴格控制對一次操作所耗費的數據庫連接數量。如果實際執行的SQL需要對某數據庫實例中的200張表做操作,那么只會創建唯一的數據庫連接,并對其200張表串行處理。如果一次操作中的分片散落在不同的數據庫,仍然采用多線程處理對不同庫的操作,但每個庫的每次操作仍然只創建一個唯一的數據庫連接。這樣即可以防止對一次請求對數據庫連接占用過多所帶來的問題。該模式始終選擇內存歸并
case: 本文主要以SELECT i.* FROM t_order o, t_order_item i WHERE o.order_id = i.order_id and o.order_id = 2 and o.user_id = 2一個簡單查詢語句,來分析ss大致如何來執行sql,根據分片改寫后的sql,應該是demo_ds_slave_0:SELECT * FROM t_order_0 i, t_order_item_0 o WHERE o.order_id = i.order_id and o.order_id = 2 and o.user_id = 2 來執行
1.初始化PreparedStatementExecutor#init,封裝Statement執行單元
public final class PreparedStatementExecutor extends AbstractStatementExecutor { @Getter private final boolean returnGeneratedKeys; public PreparedStatementExecutor( final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final boolean returnGeneratedKeys, final ShardingConnection shardingConnection) { super(resultSetType, resultSetConcurrency, resultSetHoldability, shardingConnection); this.returnGeneratedKeys = returnGeneratedKeys; } /** * Initialize executor. * * @param routeResult route result * @throws SQLException SQL exception */ public void init(final SQLRouteResult routeResult) throws SQLException { setSqlStatement(routeResult.getOptimizedStatement().getSQLStatement()); //添加路由單元,即數據源對應的sql單元 getExecuteGroups().addAll(obtainExecuteGroups(routeResult.getRouteUnits())); //緩存statement、參數 cacheStatements(); } private Collection<ShardingExecuteGroup<StatementExecuteUnit>> obtainExecuteGroups(final Collection<RouteUnit> routeUnits) throws SQLException { //執行封裝Statement執行單元 return getSqlExecutePrepareTemplate().getExecuteUnitGroups(routeUnits, new SQLExecutePrepareCallback() { @Override public List<Connection> getConnections(final ConnectionMode connectionMode, final String dataSourceName, final int connectionSize) throws SQLException { return PreparedStatementExecutor.super.getConnection().getConnections(connectionMode, dataSourceName, connectionSize); } @Override public StatementExecuteUnit createStatementExecuteUnit(final Connection connection, final RouteUnit routeUnit, final ConnectionMode connectionMode) throws SQLException { return new StatementExecuteUnit(routeUnit, createPreparedStatement(connection, routeUnit.getSqlUnit().getSql()), connectionMode); } }); } @SuppressWarnings("MagicConstant") private PreparedStatement createPreparedStatement(final Connection connection, final String sql) throws SQLException { return returnGeneratedKeys ? connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS) : connection.prepareStatement(sql, getResultSetType(), getResultSetConcurrency(), getResultSetHoldability()); } ... ... }
2.執行封裝Statement執行單元getSqlExecutePrepareTemplate().getExecuteUnitGroups
@RequiredArgsConstructor public final class SQLExecutePrepareTemplate { private final int maxConnectionsSizePerQuery; /** * Get execute unit groups. * * @param routeUnits route units * @param callback SQL execute prepare callback * @return statement execute unit groups * @throws SQLException SQL exception */ public Collection<ShardingExecuteGroup<StatementExecuteUnit>> getExecuteUnitGroups(final Collection<RouteUnit> routeUnits, final SQLExecutePrepareCallback callback) throws SQLException { return getSynchronizedExecuteUnitGroups(routeUnits, callback); } private Collection<ShardingExecuteGroup<StatementExecuteUnit>> getSynchronizedExecuteUnitGroups( final Collection<RouteUnit> routeUnits, final SQLExecutePrepareCallback callback) throws SQLException { //數據源對應sql單元集合,即demo_ds_0:[select i.* from t_order_0 i, t_order_item_0 o where i.order_id = o.order_id and i.order_id = ?] Map<String, List<SQLUnit>> sqlUnitGroups = getSQLUnitGroups(routeUnits); Collection<ShardingExecuteGroup<StatementExecuteUnit>> result = new LinkedList<>(); for (Entry<String, List<SQLUnit>> entry : sqlUnitGroups.entrySet()) { //添加分片執行組 result.addAll(getSQLExecuteGroups(entry.getKey(), entry.getValue(), callback)); } return result; } private Map<String, List<SQLUnit>> getSQLUnitGroups(final Collection<RouteUnit> routeUnits) { Map<String, List<SQLUnit>> result = new LinkedHashMap<>(routeUnits.size(), 1); for (RouteUnit each : routeUnits) { if (!result.containsKey(each.getDataSourceName())) { result.put(each.getDataSourceName(), new LinkedList<SQLUnit>()); } result.get(each.getDataSourceName()).add(each.getSqlUnit()); } return result; } private List<ShardingExecuteGroup<StatementExecuteUnit>> getSQLExecuteGroups( final String dataSourceName, final List<SQLUnit> sqlUnits, final SQLExecutePrepareCallback callback) throws SQLException { List<ShardingExecuteGroup<StatementExecuteUnit>> result = new LinkedList<>(); //在maxConnectionSizePerQuery允許的范圍內,當一個連接需要執行的請求數量大于1時,意味著當前的數據庫連接無法持有相應的數據結果集,則必須采用內存歸并; //反之,當一個連接需要執行的請求數量等于1時,意味著當前的數據庫連接可以持有相應的數據結果集,則可以采用流式歸并 //TODO 場景:在不分庫只分表的情況下,會存在一個數據源對應多個sql單元的情況 //計算所需要的分區大小 int desiredPartitionSize = Math.max(0 == sqlUnits.size() % maxConnectionsSizePerQuery ? sqlUnits.size() / maxConnectionsSizePerQuery : sqlUnits.size() / maxConnectionsSizePerQuery + 1, 1); //按照分區大小進行分區 //事例: //sqlUnits = [1, 2, 3, 4, 5] //desiredPartitionSize = 2 //則結果為:[[1, 2], [3,4], [5]] List<List<SQLUnit>> sqlUnitPartitions = Lists.partition(sqlUnits, desiredPartitionSize); //maxConnectionsSizePerQuery該參數表示一次查詢時每個數據庫所允許使用的最大連接數 //根據maxConnectionsSizePerQuery來判斷使用連接模式 //CONNECTION_STRICTLY 連接限制模式 //MEMORY_STRICTLY 內存限制模式 ConnectionMode connectionMode = maxConnectionsSizePerQuery < sqlUnits.size() ? ConnectionMode.CONNECTION_STRICTLY : ConnectionMode.MEMORY_STRICTLY; //獲取分區大小的連接 List<Connection> connections = callback.getConnections(connectionMode, dataSourceName, sqlUnitPartitions.size()); int count = 0; //遍歷分區,將分區好的sql單元放到指定連接執行 for (List<SQLUnit> each : sqlUnitPartitions) { result.add(getSQLExecuteGroup(connectionMode, connections.get(count++), dataSourceName, each, callback)); } return result; } private ShardingExecuteGroup<StatementExecuteUnit> getSQLExecuteGroup(final ConnectionMode connectionMode, final Connection connection, final String dataSourceName, final List<SQLUnit> sqlUnitGroup, final SQLExecutePrepareCallback callback) throws SQLException { List<StatementExecuteUnit> result = new LinkedList<>(); //遍歷sql單元 for (SQLUnit each : sqlUnitGroup) { //回調,創建statement執行單元 result.add(callback.createStatementExecuteUnit(connection, new RouteUnit(dataSourceName, each), connectionMode)); } //封裝成分片執行組 return new ShardingExecuteGroup<>(result); } }
1.執行查詢sql
public final class PreparedStatementExecutor extends AbstractStatementExecutor { ... ... /** * Execute query. * * @return result set list * @throws SQLException SQL exception */ public List<QueryResult> executeQuery() throws SQLException { //獲取當前是否異常值 final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown(); //創建回調實例 //執行SQLExecuteCallback的execute方法 SQLExecuteCallback<QueryResult> executeCallback = new SQLExecuteCallback<QueryResult>(getDatabaseType(), isExceptionThrown) { @Override protected QueryResult executeSQL(final RouteUnit routeUnit, final Statement statement, final ConnectionMode connectionMode) throws SQLException { return getQueryResult(statement, connectionMode); } }; return executeCallback(executeCallback); } ... ... protected final <T> List<T> executeCallback(final SQLExecuteCallback<T> executeCallback) throws SQLException { List<T> result = sqlExecuteTemplate.executeGroup((Collection) executeGroups, executeCallback); //執行完后刷新分片元數據,比如創建表、修改表etc. refreshShardingMetaDataIfNeeded(connection.getShardingContext(), sqlStatement); return result; } ... ... }
2.通過線程池分組執行,并回調callback
@RequiredArgsConstructor public abstract class SQLExecuteCallback<T> implements ShardingGroupExecuteCallback<StatementExecuteUnit, T> { //數據庫類型 private final DatabaseType databaseType; //是否異常 private final boolean isExceptionThrown; @Override public final Collection<T> execute(final Collection<StatementExecuteUnit> statementExecuteUnits, final boolean isTrunkThread, final Map<String, Object> shardingExecuteDataMap) throws SQLException { Collection<T> result = new LinkedList<>(); //遍歷statement執行單元 for (StatementExecuteUnit each : statementExecuteUnits) { //執行添加返回結果T result.add(execute0(each, isTrunkThread, shardingExecuteDataMap)); } return result; } private T execute0(final StatementExecuteUnit statementExecuteUnit, final boolean isTrunkThread, final Map<String, Object> shardingExecuteDataMap) throws SQLException { //設置當前線程是否異常 ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown); //根據url獲取數據源元數據 DataSourceMetaData dataSourceMetaData = databaseType.getDataSourceMetaData(statementExecuteUnit.getStatement().getConnection().getMetaData().getURL()); //sql執行鉤子 SQLExecutionHook sqlExecutionHook = new SPISQLExecutionHook(); try { sqlExecutionHook.start(statementExecuteUnit.getRouteUnit(), dataSourceMetaData, isTrunkThread, shardingExecuteDataMap); //執行sql T result = executeSQL(statementExecuteUnit.getRouteUnit(), statementExecuteUnit.getStatement(), statementExecuteUnit.getConnectionMode()); sqlExecutionHook.finishSuccess(); return result; } catch (final SQLException ex) { sqlExecutionHook.finishFailure(ex); ExecutorExceptionHandler.handleException(ex); return null; } } protected abstract T executeSQL(RouteUnit routeUnit, Statement statement, ConnectionMode connectionMode) throws SQLException; }
3.執行executeSQL,調用第三步的callback中的executeSQL,封裝ResultSet
public final class PreparedStatementExecutor extends AbstractStatementExecutor { ... ... private QueryResult getQueryResult(final Statement statement, final ConnectionMode connectionMode) throws SQLException { PreparedStatement preparedStatement = (PreparedStatement) statement; ResultSet resultSet = preparedStatement.executeQuery(); ShardingRule shardingRule = getConnection().getShardingContext().getShardingRule(); //緩存resultSet getResultSets().add(resultSet); //判斷ConnectionMode //如果是MEMORY_STRICTLY,使用流式StreamQueryResult;否則使用內存MemoryQueryResult return ConnectionMode.MEMORY_STRICTLY == connectionMode ? new StreamQueryResult(resultSet, shardingRule) : new MemoryQueryResult(resultSet, shardingRule); } ... ... }
看完上述內容是否對您有幫助呢?如果還想對相關知識有進一步的了解或閱讀更多相關文章,請關注億速云行業資訊頻道,感謝您對億速云的支持。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。