91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

spark2.x由淺入深深到底系列六之RDD java api用JdbcRDD讀取關系型數據庫

發布時間:2020-07-22 00:30:35 來源:網絡 閱讀:2629 作者:tangweiqun 欄目:大數據

學習任何的spark技術之前,請先正確理解spark,可以參考:正確理解spark



以下是用spark RDD java api實現從關系型數據庫中讀取數據,這里使用的是derby本地數據庫,當然可以是mysql或者oracle等關系型數據庫:

package com.twq.javaapi.java7;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.JdbcRDD;

import java.io.Serializable;
import java.sql.*;

public class JavaJdbcRDDSuite implements Serializable {

    public static void prepareData() throws ClassNotFoundException, SQLException {
        //使用本地數據庫derby,當然可以使用mysql等關系型數據庫
        Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
        Connection connection =
                DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb;create=true");

        try {
            //創建一張表FOO,ID是一個自增的主鍵,DATA是一個INTEGER列
            Statement create = connection.createStatement();
            create.execute(
                    "CREATE TABLE FOO(" +
                            "ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1)," +
                            "DATA INTEGER)");
            create.close();

            //插入數據
            PreparedStatement insert = connection.prepareStatement("INSERT INTO FOO(DATA) VALUES(?)");
            for (int i = 1; i <= 5; i++) {
                insert.setInt(1, i * 2);
                insert.executeUpdate();
            }
            insert.close();
        } catch (SQLException e) {
            // If table doesn't exist...
            if (e.getSQLState().compareTo("X0Y32") != 0) {
                throw e;
            }
        } finally {
            connection.close();
        }
    }

    public static void shutdownDB() throws SQLException {
        try {
            DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb;shutdown=true");
        } catch (SQLException e) {
            // Throw if not normal single database shutdown
            // https://db.apache.org/derby/docs/10.2/ref/rrefexcept71493.html
            if (e.getSQLState().compareTo("08006") != 0) {
                throw e;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        JavaSparkContext sc = new JavaSparkContext("local", "JavaAPISuite");

        //準備數據
        prepareData();

        //構建JdbcRDD
        JavaRDD<Integer> rdd = JdbcRDD.create(
                sc,
                new JdbcRDD.ConnectionFactory() {
                    @Override
                    public Connection getConnection() throws SQLException {
                        return DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb");
                    }
                },
                "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?",
                1, 5, 1,
                new Function<ResultSet, Integer>() {
                    @Override
                    public Integer call(ResultSet r) throws Exception {
                        return r.getInt(1);
                    }
                }
        );
        //結果: [2, 4, 6, 8, 10]
        System.out.println(rdd.collect());

        shutdownDB();

        sc.stop();
    }
}


詳細了解RDD的api的話,可以參考: spark core RDD api原理詳解

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

海兴县| 寿宁县| 江山市| 满城县| 松滋市| 上蔡县| 清流县| 博兴县| 开化县| 泾阳县| 景德镇市| 宁城县| 宣威市| 靖江市| 邮箱| 湖州市| 漾濞| 凤凰县| 玉门市| 通榆县| 盈江县| 和静县| 明水县| 海宁市| 新丰县| 修水县| 屏南县| 崇仁县| 上栗县| 乐平市| 南皮县| 忻州市| 常熟市| 克什克腾旗| 友谊县| 霍城县| 利津县| 泽普县| 林甸县| 临颍县| 阳城县|