您好,登錄后才能下訂單哦!
以前都是使用Sqoop將生成的HDFS數據抽取(導出)至Oracle數據庫中,Sqoop導出語句如下:
# Previous approach: export the HDFS files into an Oracle table with Sqoop.
# --fields-terminated-by "\001": fields in the HDFS files are separated by the \001 (Ctrl-A) character.
# -m 10: run the export with 10 parallel map tasks.
# NOTE(review): for `sqoop export` the Sqoop user guide documents --input-null-string /
#   --input-null-non-string as the options describing how NULL is written in the input files;
#   --null-string / --null-non-string are the import-side options - confirm they take effect here.
# NOTE(review): an Oracle table is normally qualified by its schema (owner), not by the SID - confirm `sid.表名`.
sqoop export --connect "jdbc:oracle:thin:@ip:port:sid" --username 用戶名 --password 密碼 --table sid.表名 --export-dir hdfs://nameservice1/user/XXX(hdfs地址) --fields-terminated-by "\001" --null-non-string '' --null-string '' -m 10;
由於項目需求,我們現在要在代碼中生成所需字段之後直接回寫到Oracle中。因為每天的數據量都很大,用實例對象或List存儲有很大的局限性,可能會出現內存異常(如OOM)等不可預料的問題,所以我通過緩存器機制來存儲數據,然後將生成的結果直接回寫到臨時表中(後面做的HBase接口封裝批量提交也比較類似)。
廢話不多說直接上代碼:
1、建立緩存實體
package usi.java.oracle;
/**
 * A single cache entry: a value stored under a key, together with its expiry information.
 *
 * <p>{@code timeOut} is an absolute deadline. {@code CacheManager.putContent} stores
 * {@code now + ttl} (epoch milliseconds) there, and {@code CacheManager} treats a negative
 * deadline as "never expires". The {@code expired} flag is only a marker: this class never
 * updates it by itself; whoever inspects the entry sets it.
 *
 * <p>This class performs no synchronization of its own.
 *
 * @date 2011-2-15 06:45:57 PM
 */
public class Cache {
    private String key;
    private Object value;
    private long timeOut;
    private boolean expired;

    /** Creates an empty entry (null key/value, deadline 0, not expired); fill it via the setters. */
    public Cache() {
    }

    /**
     * Creates a fully initialised entry.
     *
     * @param key      cache key
     * @param value    cached value; any object, consistent with {@link #setValue(Object)}
     * @param timeOut  absolute deadline as interpreted by the owner of this entry
     * @param expired  initial value of the expired marker
     */
    public Cache(String key, Object value, long timeOut, boolean expired) {
        this.key = key;
        this.value = value;
        this.timeOut = timeOut;
        this.expired = expired;
    }

    /** @return the cache key */
    public String getKey() {
        return key;
    }

    /** @param string the new cache key */
    public void setKey(String string) {
        key = string;
    }

    /** @return the cached value, possibly {@code null} */
    public Object getValue() {
        return value;
    }

    /** @param object the new cached value */
    public void setValue(Object object) {
        value = object;
    }

    /** @return the absolute deadline stored for this entry */
    public long getTimeOut() {
        return timeOut;
    }

    /** @param l the new absolute deadline */
    public void setTimeOut(long l) {
        timeOut = l;
    }

    /** @return whether this entry has been marked expired */
    public boolean isExpired() {
        return expired;
    }

    /** @param b the new value of the expired marker */
    public void setExpired(boolean b) {
        expired = b;
    }
}
2、建立緩存控制器
package usi.java.oracle;
import java.util.Date;
import java.util.HashMap;
/**
@date 2011-2-15 下午09:40:00
*/
public class CacheManager {
private static HashMap cacheMap = new HashMap();
/**
This class is singleton so private constructor is used.
*/
private CacheManager() {
super();
}
/**
@return Cache
*/
private synchronized static Cache getCache(String key) {
return (Cache)cacheMap.get(key);
}
/**
@return Cache
*/
private synchronized static boolean hasCache(String key) {
return cacheMap.containsKey(key);
}
/**
Invalidates all cache
*/
public synchronized static void invalidateAll() {
cacheMap.clear();
}
/**
@param key
*/
public synchronized static void invalidate(String key) {
cacheMap.remove(key);
}
/**
@return Cache
*/
private synchronized static void putCache(String key, Cache object) {
cacheMap.put(key, object);
}
/**
@return
*/
public static Cache getContent(String key) {
if (hasCache(key)) {
Cache cache = getCache(key);
if (cacheExpired(cache)) {
cache.setExpired(true);
}
return cache;
} else {
return null;
}
}
/**
@param ttl
*/
public static void putContent(String key, Object content, long ttl) {
Cache cache = new Cache();
cache.setKey(key);
cache.setValue(content);
cache.setTimeOut(ttl + new Date().getTime());
cache.setExpired(false);
putCache(key, cache);
}
/* @modelguid {172828D6-3AB2-46C4-96E2-E72B34264031} /
private static boolean cacheExpired(Cache cache) {
if (cache == null) {
return false;
}
long milisNow = new Date().getTime();
long milisExpire = cache.getTimeOut();
if (milisExpire < 0) { // Cache never expires
return false;
} else if (milisNow >= milisExpire) {
return true;
} else {
return false;
}
}
}
3、建立需要導出數據對象
package usi.java.oracle;
/**
 * Plain mutable bean holding the five string columns of one task row:
 * mme_eid, mme_editor, entitytype_eid, project_eid and resource_eid.
 *
 * <p>Every property starts out {@code null}; the accessors store and return values unchanged,
 * without any validation.
 */
public class TaskAll {
    private String mmeEid;
    private String mmeEditor;
    private String entityTypeEid;
    private String projectEid;
    private String resourceEid;

    /** @return the mme_eid column value */
    public String getMme_eid() { return this.mmeEid; }

    /** @param mmeEid the mme_eid column value */
    public void setMme_eid(String mmeEid) { this.mmeEid = mmeEid; }

    /** @return the mme_editor column value */
    public String getMme_editor() { return this.mmeEditor; }

    /** @param mmeEditor the mme_editor column value */
    public void setMme_editor(String mmeEditor) { this.mmeEditor = mmeEditor; }

    /** @return the entitytype_eid column value */
    public String getEntitytype_eid() { return this.entityTypeEid; }

    /** @param entityTypeEid the entitytype_eid column value */
    public void setEntitytype_eid(String entityTypeEid) { this.entityTypeEid = entityTypeEid; }

    /** @return the project_eid column value */
    public String getProject_eid() { return this.projectEid; }

    /** @param projectEid the project_eid column value */
    public void setProject_eid(String projectEid) { this.projectEid = projectEid; }

    /** @return the resource_eid column value */
    public String getResource_eid() { return this.resourceEid; }

    /** @param resourceEid the resource_eid column value */
    public void setResource_eid(String resourceEid) { this.resourceEid = resourceEid; }
}
4、執行邏輯主體:回寫數據,批量提交
package usi.java.oracle;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
//import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.Objects;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.hive.HiveContext;
/**
 * Spark driver that copies rows from the Hive table {@code usi_odso.c_taskall} into the Oracle
 * table {@code c_taskall_test}.
 *
 * <p>Flow:
 * <ol>
 *   <li>The Hive query result is collected to the driver.</li>
 *   <li>Each row is staged in {@code CacheManager} as a {@code TaskAll} under the keys
 *   {@code "0"}, {@code "1"}, and so on.</li>
 *   <li>The staged rows are inserted through one {@link PreparedStatement} in JDBC batches of
 *   {@link #BATCH_SIZE}, with a commit after each batch.</li>
 * </ol>
 *
 * <p>{@code collect()} materialises the whole query result in driver memory; the
 * {@code limit 150000} in the query is what keeps that bounded.
 */
public class redict_to_171ora {

    /** Rows per executeBatch()/commit() round trip; other sizes (50, 100, 500, 1000, ...) work too. */
    private static final int BATCH_SIZE = 20000;

    /** Time to live of each staged row in the cache, in milliseconds (about 8.3 hours). */
    private static final long CACHE_TTL_MILLIS = 30000000L;

    /** Hive query; the five selected columns map, in order, onto the TaskAll properties. */
    private static final String HIVE_SQL =
        "select t.mme_eid,t.mme_editor,t.entitytype_eid,t.project_eid,t.resource_eid from usi_odso.c_taskall t limit 150000";

    /** Parameterised Oracle insert; the "?" placeholders follow the column list order. */
    private static final String INSERT_SQL =
        "insert into c_taskall_test(mme_eid,mme_editor,entitytype_eid,project_eid,resource_eid) values(?,?,?,?,?)";

    /**
     * Entry point. Stages the Hive rows in the cache, writes them to Oracle and stops Spark.
     *
     * @param args unused
     * @throws IllegalStateException if the Oracle driver cannot be loaded or a JDBC call fails;
     *     the batch in flight is rolled back, but batches committed earlier remain in the table
     */
    public static void main(String[] args) {
        SparkConf sc = new SparkConf().setAppName("redict_to_171ora");
        SparkContext jsc = new SparkContext(sc);
        try {
            HiveContext hc = new HiveContext(jsc);
            int rowCount = stageRowsInCache(hc);
            System.out.println(rowCount);
            writeToOracle(rowCount);
        } finally {
            // Release the Spark context even when the export fails.
            jsc.stop();
        }
    }

    /**
     * Runs the Hive query, collects it to the driver and stores each row as a TaskAll in the
     * cache, keyed by its 0-based position.
     *
     * @return the number of rows staged
     */
    private static int stageRowsInCache(HiveContext hc) {
        DataFrame rows = hc.sql(HIVE_SQL);
        // collect() pulls the entire result onto the driver.
        List<Row> collected = rows.javaRDD().collect();
        int count = 0;
        for (Row row : collected) {
            TaskAll task = new TaskAll();
            task.setMme_eid(row.getString(0));
            task.setMme_editor(row.getString(1));
            task.setEntitytype_eid(row.getString(2));
            task.setProject_eid(row.getString(3));
            task.setResource_eid(row.getString(4));
            CacheManager.putContent(String.valueOf(count), task, CACHE_TTL_MILLIS);
            count++;
        }
        return count;
    }

    /**
     * Opens the Oracle connection and inserts the staged rows in committed batches.
     *
     * @throws IllegalStateException wrapping any driver-loading or JDBC failure
     */
    private static void writeToOracle(int rowCount) {
        // Placeholder settings: replace "ip" and "sid" with the real host and SID.
        // NOTE(review): credentials are hard-coded; consider supplying them via configuration.
        String url = "jdbc:oracle:" + "thin:@ip:1521:sid";
        String user = "user";
        String password = "password";
        try {
            Class.forName("oracle.jdbc.driver.OracleDriver");
            System.out.println("開始嘗試連接數據庫!");
            try (Connection con = DriverManager.getConnection(url, user, password)) {
                System.out.println("連接成功!");
                // Commit explicitly once per batch. With auto-commit on (the JDBC default) the
                // driver commits by itself, and recent Oracle drivers (12c+) throw on commit().
                con.setAutoCommit(false);
                try {
                    insertInBatches(con, rowCount);
                } catch (SQLException | RuntimeException e) {
                    rollbackQuietly(con, e);
                    throw e;
                }
            }
        } catch (ClassNotFoundException | SQLException e) {
            throw new IllegalStateException("Export to c_taskall_test failed", e);
        }
    }

    /**
     * Inserts staged rows 0..rowCount-1. The batch is executed and committed every BATCH_SIZE
     * rows, and once more for any remainder.
     */
    private static void insertInBatches(Connection con, int rowCount) throws SQLException {
        try (PreparedStatement pre = con.prepareStatement(INSERT_SQL)) {
            int pending = 0;
            for (int i = 0; i < rowCount; i++) {
                TaskAll task = cachedTask(i);
                // Parameter indexes are 1-based and refer to the "?" placeholders, not table columns.
                pre.setString(1, task.getMme_eid());
                pre.setString(2, task.getMme_editor());
                pre.setString(3, task.getEntitytype_eid());
                pre.setString(4, task.getProject_eid());
                pre.setString(5, task.getResource_eid());
                pre.addBatch();
                // Flush only once a full batch is pending.
                if (++pending == BATCH_SIZE) {
                    flushBatch(con, pre);
                    pending = 0;
                }
            }
            if (pending > 0) {
                flushBatch(con, pre);
            }
        }
    }

    /** Sends the pending batch to the database and commits it. */
    private static void flushBatch(Connection con, PreparedStatement pre) throws SQLException {
        pre.executeBatch();
        con.commit();
        pre.clearBatch();
    }

    /** Returns the TaskAll staged under {@code index}; fails fast if the entry is missing. */
    private static TaskAll cachedTask(int index) {
        String key = String.valueOf(index);
        return (TaskAll) Objects.requireNonNull(
                CacheManager.getContent(key), "row " + key + " is missing from CacheManager")
            .getValue();
    }

    /** Rolls back the open transaction, attaching any rollback error to {@code failure}. */
    private static void rollbackQuietly(Connection con, Exception failure) {
        try {
            con.rollback();
        } catch (SQLException rollbackError) {
            failure.addSuppressed(rollbackError);
        }
    }
}
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。