This article looks at how NIO can be used to optimize an active web-page probing program.
Table creation statements:
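CREATE SEQUENCE seq_probe_id INCREMENT BY 1 START WITH 1 NOMAXVALUE NOCYCLE CACHE 2000;
-- The id, path and tasktime columns are required by the INSERT in PersistenceHandler;
-- their types below are assumptions.
create table probe(
id int primary key,
host varchar(40) not null,
path varchar(500) not null,
state int not null,
tasktime int not null,
type varchar(10) not null
) ;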
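I had wanted to rework this program with NIO for a long time, to squeeze more out of the available resources.
Unfortunately, the NIO-plus-multithreading examples I found online were not very reliable, and learning it on my own was painful, so the work dragged on for more than a year.
The new program works in three stages.
First, a thread pool keeps sending connection requests but does not handle the responses; it only registers each channel for SelectionKey.OP_READ.
Second, a separate single-threaded loop keeps selecting the channels that are ready and hands them to another thread pool, which receives and parses the data (receiving and parsing are merged into one step).
Finally, a single-threaded task keeps flushing results to the database in batches. This is an optimization in its own right: switching from single-row INSERTs to batch inserts saved at least one CPU core's worth of processing.
The persistence and parsing logic largely reuses the original code.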
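As a rough illustration of the batching idea in the last stage, the sketch below drains a queue in batches and hands each batch to a sink. It is not the code used in this program: BatchDrainer, flushOnce and the sink callback are hypothetical names, and in the real program the sink would wrap the JDBC addBatch()/executeBatch()/commit() calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper: groups queued items into batches of at most maxBatch.
class BatchDrainer<T> {
    private final BlockingQueue<T> queue;
    private final int maxBatch;
    private final Consumer<List<T>> sink; // assumed to persist one batch

    BatchDrainer(BlockingQueue<T> queue, int maxBatch, Consumer<List<T>> sink) {
        this.queue = queue;
        this.maxBatch = maxBatch;
        this.sink = sink;
    }

    // Waits up to timeoutMillis for a first item, then takes whatever else is
    // already queued (up to maxBatch in total) and passes the batch to the sink.
    // Returns the number of items flushed, or 0 if the queue stayed empty.
    int flushOnce(long timeoutMillis) throws InterruptedException {
        T first = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        if (first == null) {
            return 0;
        }
        List<T> batch = new ArrayList<>(maxBatch);
        batch.add(first);
        queue.drainTo(batch, maxBatch - 1);
        sink.accept(batch);
        return batch.size();
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 1200; i++) {
            queue.add("row-" + i);
        }
        BatchDrainer<String> drainer =
                new BatchDrainer<>(queue, 500, batch -> System.out.println("flush " + batch.size()));
        while (drainer.flushOnce(100) > 0) {
            // keep flushing until the queue has been empty for 100 ms
        }
    }
}
```

Polling with a timeout means a partial batch is still flushed when the queue runs dry, whereas flushing only on every 500th row leaves the last few rows waiting for the next multiple of 500.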
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class Probe {
private static final int REQUESTTHREADCOUNT = 10;
private static final BlockingQueue CONNECTLIST = new LinkedBlockingQueue();
private static final BlockingQueue PERSISTENCELIST = new LinkedBlockingQueue();
private static ExecutorService REQUESTTHREADPOOL;
private static ExecutorService RESPONSETHREADPOOL;
private static ExecutorService PERSISTENCETHREADPOOL;
private static final List DOMAINLIST = new CopyOnWriteArrayList<>();
private static Selector SELECTOR;
static {
REQUESTTHREADPOOL = Executors.newFixedThreadPool(REQUESTTHREADCOUNT);
RESPONSETHREADPOOL = Executors.newFixedThreadPool(3);
PERSISTENCETHREADPOOL = Executors.newFixedThreadPool(1);
DOMAINLIST.add("news.163.com");
try {
SELECTOR = Selector.open();
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws IOException, InterruptedException {
long start = System.currentTimeMillis();
CONNECTLIST.put(new Task("news.163.com", 80, "/index.html"));
for (int i = 0; i < REQUESTTHREADCOUNT; i++) {
REQUESTTHREADPOOL.submit(new RequestHandler(CONNECTLIST, SELECTOR));
}
RESPONSETHREADPOOL
.submit(new ResponseHandler(SELECTOR, CONNECTLIST, PERSISTENCELIST, DOMAINLIST, RESPONSETHREADPOOL));
PERSISTENCETHREADPOOL.submit(new PersistenceHandler(PERSISTENCELIST));
while (true) {
Thread.sleep(1000);
long end = System.currentTimeMillis();
float interval = (end - start) / 1000f;
int connectTotal = ResponseHandler.GETCOUNT();
int persistenceTotal = PersistenceHandler.GETCOUNT();
int connectps = Math.round(connectTotal / interval);
int persistenceps = Math.round(persistenceTotal / interval);
System.out.print(
"\rConnections: " + connectTotal + " \tper second: " + connectps + "\tconnect queue: " + CONNECTLIST.size() + " \tpersisted: "
+ persistenceTotal + " \tper second: " + persistenceps + "\tpersistence queue: " + PERSISTENCELIST.size());
}
}
}
class RequestHandler implements Runnable {
BlockingQueue connectlist;
Selector selector;
public RequestHandler(BlockingQueue connectlist, Selector selector) {
this.connectlist = connectlist;
this.selector = selector;
}
@Override
public void run() {
while (true) {
try {
Task task = (Task) connectlist.take();
SocketAddress addr = new InetSocketAddress(task.getHost(), task.getPort());
SocketChannel socketChannel = SocketChannel.open(addr);
socketChannel.configureBlocking(false);
ByteBuffer byteBuffer = ByteBuffer.allocate(2400);
byteBuffer.put(("GET " + task.getCurrentPath() + " HTTP/1.0\r\n").getBytes("utf8"));
byteBuffer.put(("HOST:" + task.getHost() + "\r\n").getBytes("utf8"));
byteBuffer.put(("Accept:*/*\r\n").getBytes("utf8"));
byteBuffer.put(("\r\n").getBytes("utf8"));
byteBuffer.flip();
// A non-blocking write may be partial, so loop until the request is fully sent.
while (byteBuffer.hasRemaining()) {
socketChannel.write(byteBuffer);
}
socketChannel.register(selector, SelectionKey.OP_READ, task);
selector.wakeup();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
class ResponseHandler implements Runnable {
Selector selector;
BlockingQueue connectlist;
BlockingQueue persistencelist;
List domainlist;
ExecutorService threadPool;
Charset charset = Charset.forName("utf8");
Charset gbkcharset = Charset.forName("gbk");
public static int GETCOUNT() {
return COUNT.get();
}
private static final AtomicInteger COUNT = new AtomicInteger();
public ResponseHandler(Selector selector, BlockingQueue connectlist, BlockingQueue persistencelist, List domainlist,
ExecutorService threadpool) {
this.selector = selector;
this.connectlist = connectlist;
this.persistencelist = persistencelist;
this.domainlist = domainlist;
this.threadPool = threadpool;
}
@Override
public void run() {
while (true) {
try {
int n = selector.selectNow();
if (n == 0)
continue;
Iterator it = selector.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey key = (SelectionKey) it.next();
if (key.isValid() && key.isReadable()) {
key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));
Runnable r = new Runnable() {
@Override
public void run() {
try {
Task task = (Task) key.attachment();
ByteBuffer byteBuffer = ByteBuffer.allocate(2400);
SocketChannel channel = (SocketChannel) key.channel();
int length;
while ((length = channel.read(byteBuffer)) > 0) {
byteBuffer.flip();
// Pages are assumed to be GBK-encoded; decode straight into a Java String.
task.appendContent(gbkcharset.decode(byteBuffer).toString());
byteBuffer.compact();
}
if (length == -1) {
channel.close();
COUNT.incrementAndGet();
new ParseHandler(task, connectlist, persistencelist, domainlist).handler();
} else {
channel.register(selector, SelectionKey.OP_READ, task);
}
key.selector().wakeup();
} catch (Exception e) {
try {
key.cancel();
key.channel().close();
} catch (IOException e1) {
e1.printStackTrace();
}
e.printStackTrace();
}
}
};
threadPool.submit(r);
}
it.remove();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
class ParseHandler {
private static final Set SET = new HashSet();
private BlockingQueue connectlist;
private BlockingQueue persistencelist;
List domainlist;
Task task;
private interface Filter {
void doFilter(Task fatherTask, Task newTask, String path, Filter chain);
}
private class FilterChain implements Filter {
private List list = new ArrayList();
{
addFilter(new TwoLevel());
addFilter(new OneLevel());
addFilter(new FullPath());
addFilter(new Root());
addFilter(new Default());
}
private void addFilter(Filter filter) {
list.add(filter);
}
private Iterator it = list.iterator();
@Override
public void doFilter(Task fatherTask, Task newTask, String path, Filter chain) {
if (it.hasNext()) {
((Filter) it.next()).doFilter(fatherTask, newTask, path, chain);
}
}
}
private class TwoLevel implements Filter {
@Override
public void doFilter(Task fatherTask, Task newTask, String path, Filter chain) {
if (path.startsWith("../../")) {
String prefix = getPrefix(fatherTask.getCurrentPath(), 3);
newTask.init(fatherTask.getHost(), fatherTask.getPort(), path.replace("../../", prefix.endsWith("/") ? prefix : prefix + "/"));
} else {
chain.doFilter(fatherTask, newTask, path, chain);
}
}
}
private class OneLevel implements Filter {
@Override
public void doFilter(Task fatherTask, Task newTask, String path, Filter chain) {
if (path.startsWith("../")) {
String prefix = getPrefix(fatherTask.getCurrentPath(), 2);
newTask.init(fatherTask.getHost(), fatherTask.getPort(), path.replace("../", prefix.endsWith("/") ? prefix : prefix + "/"));
} else {
chain.doFilter(fatherTask, newTask, path, chain);
}
}
}
private class FullPath implements Filter {
@Override
public void doFilter(Task fatherTask, Task newTask, String path, Filter chain) {
if (path.startsWith("http://")) {
Iterator it = domainlist.iterator();
boolean flag = false;
while (it.hasNext()) {
String domain = (String) it.next();
if (path.startsWith("http://" + domain + "/")) {
newTask.init(domain, fatherTask.getPort(), path.replace("http://" + domain + "/", "/"));
flag = true;
break;
}
}
if (!flag) {
newTask.setValid(false);
}
} else {
chain.doFilter(fatherTask, newTask, path, chain);
}
}
}
private class Root implements Filter {
@Override
public void doFilter(Task fatherTask, Task newTask, String path, Filter chain) {
if (path.startsWith("/")) {
newTask.init(fatherTask.getHost(), fatherTask.getPort(), path);
} else {
chain.doFilter(fatherTask, newTask, path, chain);
}
}
}
private class Default implements Filter {
@Override
public void doFilter(Task fatherTask, Task newTask, String path, Filter chain) {
if (path.contains(":")) {
newTask.setValid(false);
return;
}
String prefix = getPrefix(fatherTask.getCurrentPath(), 1);
newTask.init(fatherTask.getHost(), fatherTask.getPort(), (prefix.endsWith("/") ? prefix : prefix + "/") + path);
}
}
public ParseHandler(Task task, BlockingQueue connectlist, BlockingQueue persistencelist, List domainlist) {
this.connectlist = connectlist;
this.task = task;
this.persistencelist = persistencelist;
this.domainlist = domainlist;
}
private Pattern pattern = Pattern.compile("\"[^\"]+\\.htm[^\"]*\"");
protected void handler() {
try {
parseTaskState(task);
if (200 == task.getState()) {
Matcher matcher = pattern.matcher(task.getContent());
while (matcher.find()) {
String path = matcher.group();
if (!path.contains(" ") && !path.contains("\t") && !path.contains("(") && !path.contains(")")) {
path = path.substring(1, path.length() - 1);
createNewTask(task, path);
}
}
}
task.dropContent();
persistencelist.put(task);
} catch (Exception e) {
e.printStackTrace();
}
}
private void parseTaskState(Task task) {
// The status code sits at the same offsets in "HTTP/1.0 200 ..." and "HTTP/1.1 200 ...".
task.setState(Integer.parseInt(task.getContent().substring(9, 12)));
}
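/**
* Resolves a link found in a page against the task it came from and, if the
* result is valid and has not been seen before, queues it for crawling.
* @param fatherTask the task whose content contained the link
* @param path the raw link text, without the surrounding quotes
* @throws Exception if interrupted while queueing the new task
*/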
private void createNewTask(Task fatherTask, String path) throws Exception {
Task newTask = new Task();
FilterChain filterchain = new FilterChain();
filterchain.doFilter(fatherTask, newTask, path, filterchain);
if (newTask.isValid()) {
synchronized (SET) {
if (SET.contains(newTask.getHost() + newTask.getCurrentPath())) {
return;
}
SET.add(newTask.getHost() + newTask.getCurrentPath());
}
connectlist.put(newTask);
}
}
private String getPrefix(String s, int count) {
String prefix = s;
while (count > 0 && prefix.lastIndexOf("/") >= 0) {
prefix = prefix.substring(0, prefix.lastIndexOf("/"));
count--;
}
return "".equals(prefix) ? "/" : prefix;
}
}
class Task {
public Task() {
}
public void init(String host, int port, String path) {
this.setCurrentPath(path);
this.host = host;
this.port = port;
}
public Task(String host, int port, String path) {
init(host, port, path);
}
private String host;
private int port;
private String currentPath;
private long starttime;
private long endtime;
public long getStarttime() {
return starttime;
}
public void setStarttime(long starttime) {
this.starttime = starttime;
}
public long getEndtime() {
return endtime;
}
public void setEndtime(long endtime) {
this.endtime = endtime;
}
private long taskTime;
private String type;
private StringBuilder content = new StringBuilder(2400);
private int state;
private boolean isValid = true;
public boolean isValid() {
return isValid;
}
public void setValid(boolean isValid) {
this.isValid = isValid;
}
public int getState() {
return state;
}
public void setState(int state) {
this.state = state;
}
public String getCurrentPath() {
return currentPath;
}
public void setCurrentPath(String currentPath) {
this.currentPath = currentPath;
int i = 0;
if (currentPath.indexOf("?") != -1) {
i = currentPath.indexOf("?");
} else {
if (currentPath.indexOf("#") != -1) {
i = currentPath.indexOf("#");
} else {
i = currentPath.length();
}
}
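// Take the extension from the part before any "?" or "#", using the last dot.
String base = currentPath.substring(0, i);
this.type = base.substring(base.lastIndexOf(".") + 1);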
}
public long getTaskTime() {
return getEndtime() - getStarttime();
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getHost() {
return host;
}
public int getPort() {
return port;
}
public String getContent() {
return content.toString();
}
public void dropContent() {
this.content = null;
}
public void appendContent(String content) {
this.content.append(content);
}
}
class PersistenceHandler implements Runnable {
static {
try {
Class.forName("oracle.jdbc.OracleDriver");
} catch (ClassNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public static int GETCOUNT() {
return COUNT.get();
}
private static final AtomicInteger COUNT = new AtomicInteger();
private BlockingQueue persistencelist;
public PersistenceHandler(BlockingQueue persistencelist) {
this.persistencelist = persistencelist;
try {
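conn = DriverManager.getConnection("jdbc:oracle:thin:@127.0.0.1:1521:orcl", "edmond", "edmond");
// Batches are committed explicitly in handler(), so turn auto-commit off.
conn.setAutoCommit(false);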
ps = conn.prepareStatement(
"insert into probe(id,host,path,state,tasktime,type) values(seq_probe_id.nextval,?,?,?,?,?)");
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
private Connection conn;
private PreparedStatement ps;
@Override
public void run() {
while (true) {
this.handler();
COUNT.addAndGet(1);
}
}
private void handler() {
try {
Task task = (Task) persistencelist.take();
ps.setString(1, task.getHost());
ps.setString(2, task.getCurrentPath());
ps.setInt(3, task.getState());
ps.setLong(4, task.getTaskTime());
ps.setString(5, task.getType());
ps.addBatch();
if (GETCOUNT() % 500 == 0) {
ps.executeBatch();
conn.commit();
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
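The relative-link handling in ParseHandler can also be expressed with java.net.URI, which already resolves "../", root-relative and absolute references. The following is a minimal sketch of that alternative, not the filter chain in the listing; LinkResolver and its parameters are hypothetical names, and it assumes plain http links and a whitelist of hosts, as the listing does with DOMAINLIST.

```java
import java.net.URI;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

// Hypothetical helper: resolves a link against the page it was found on.
class LinkResolver {
    // Returns empty if the link is malformed, is not http, or points to a
    // host that is not in allowedHosts.
    static Optional<URI> resolve(String host, String currentPath, String link, Set<String> allowedHosts) {
        try {
            URI base = URI.create("http://" + host + currentPath);
            // URI.resolve handles "../x", "/x", "x" and absolute URLs, and
            // normalizes the resulting path.
            URI target = base.resolve(link);
            String targetHost = target.getHost();
            if (!"http".equals(target.getScheme()) || targetHost == null
                    || !allowedHosts.contains(targetHost)) {
                return Optional.empty();
            }
            return Optional.of(target);
        } catch (IllegalArgumentException e) {
            // Thrown for links that are not valid URIs, e.g. ones containing spaces.
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        Set<String> hosts = new HashSet<>(Arrays.asList("news.163.com"));
        System.out.println(resolve("news.163.com", "/a/b/c.html", "../x.html", hosts));
        System.out.println(resolve("news.163.com", "/a/b/c.html", "http://example.com/y.html", hosts));
    }
}
```

The host and path for a new Task could then be taken from getHost() and getRawPath() on the resolved URI.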
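This version crawls roughly 170-200 pages per second.
That rate is limited by the company's network bandwidth.
The CPU is also essentially fully loaded.
There is still room for optimization, mainly around the blocking and wake-up relationship between the following calls, which I have not fully figured out: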
socketChannel.register(selector, SelectionKey.OP_READ, task);
int n = selector.select();
key.selector().wakeup();
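One common way to arrange these three calls is to let only the selector thread call register(), while other threads enqueue the channel and call wakeup(). On some JDK versions register() can block while another thread is inside select(), which is one reason this pattern is used. The sketch below is an assumption about how it could look here, not a tested replacement for ResponseHandler: SelectorLoop, its register() method and the results queue are hypothetical, it reads on the selector thread for brevity, and it decodes UTF-8 rather than GBK.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;

class SelectorLoop implements Runnable {
    private final Selector selector;
    // Channels handed over by other threads, waiting to be registered.
    private final Queue<SelectableChannel> pending = new ConcurrentLinkedQueue<>();
    // One entry per channel that reached end-of-stream.
    final BlockingQueue<String> results = new LinkedBlockingQueue<>();

    SelectorLoop() throws IOException {
        selector = Selector.open();
    }

    // Safe to call from any thread: it only enqueues and wakes the selector.
    void register(SelectableChannel channel) {
        pending.add(channel);
        selector.wakeup(); // the current (or next) select() returns at once
    }

    @Override
    public void run() {
        ByteBuffer buffer = ByteBuffer.allocate(2400);
        try {
            while (!Thread.currentThread().isInterrupted()) {
                selector.select(); // blocks until a channel is ready or wakeup()
                // Register on the selector thread, so register() never
                // competes with a select() that is in progress.
                SelectableChannel ch;
                while ((ch = pending.poll()) != null) {
                    ch.configureBlocking(false);
                    ch.register(selector, SelectionKey.OP_READ, new ByteArrayOutputStream());
                }
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (key.isValid() && key.isReadable()) {
                        read(key, buffer);
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void read(SelectionKey key, ByteBuffer buffer) throws IOException {
        ReadableByteChannel channel = (ReadableByteChannel) key.channel();
        ByteArrayOutputStream body = (ByteArrayOutputStream) key.attachment();
        int n;
        buffer.clear();
        while ((n = channel.read(buffer)) > 0) {
            body.write(buffer.array(), 0, buffer.position());
            buffer.clear();
        }
        if (n == -1) { // peer closed: the response is complete
            key.cancel();
            channel.close();
            results.add(new String(body.toByteArray(), StandardCharsets.UTF_8));
        }
    }
}
```

With a blocking select() the loop sleeps until a channel is readable or wakeup() is called, instead of spinning on selectNow(). A request thread would call loop.register(socketChannel) after writing the request, in place of registering the channel itself.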