且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

大XML文件解析入库的一个方法

更新时间:2022-09-16 13:46:19

本来想用ORACLE的外部表导入XML文件的数据,但是太麻烦,而且XMLTYPE对于大数据
量似乎性能很差。在Google上找到一个分段XML入库的例子
(http://forums.oracle.com/forums/thread.jspa?threadID=461009&tstart=0),
即把XML文件分解成N个DOM树写入数据库,作者思路很好(就是不太爱写注释)。
我在他的基础上写了一个用SAX解析XML文件、把数据(而不是DOM树)入库的方法,500M的数据在我的PC上大约需要10多分钟。
    整个思路是这样:一个线程负责解析XML文件,当然采用SAX方式,DOM方式会让内存吃不消。
另外一个线程池负责向数据库写入。解析线程把解析出来的数据暂存在workQueue里,写入线程不断读取,
并写入数据库。当workQueue队列里的数据超过一定阈值,则解析线程等待,这是因为如果全部解析出来,封装成的
对象也很大,可能导致内存不足。基本上类似于生产者与消费者模式。
 刚写完,测试了几例,还算稳定,欢迎大家补充完善。
   OK,先贴代码。
SourceProcessor.java:
/*XML文件解析类    我用的是ORACLE的解析包xmlparserv2.jar 
* SAX在解析的过程中,在一个元素结尾与另一个元素开始处,解析器会把他当成一个文本结点。 
* characters方法会多出很多空格,最后用一个笨的方法解决了他,还请批评指正。 
*/
 
public class SourceProcessor extends Thread implements ContentHandler { 

  private List<List<String>> cacheList = new ArrayList<List<String>>(); 
  private SaxProcessor saxProcessor; 
  private String targetFilename = null
  private boolean recordStart = false
  private boolean useable = false

  private List<String> curDatas = new ArrayList<String>(); 
  private StringBuffer curData = new StringBuffer(); 

  public boolean isRecordStart() { 
    return recordStart; 
  } 

  public void setRecordStart(boolean recordStart) { 
    this.recordStart = recordStart; 
  } 

  public SourceProcessor(String threadName) { 
    super(threadName); 
  } 

  public String getTargetFilename() { 
    return targetFilename; 
  } 

  public void setTargetFilename(String targetFilename) { 
    this.targetFilename = targetFilename; 
  } 

  public void characters(char[] ch, int start, int length) 
      throws SAXException { 

    if (this.useable == true) { 
      curData.append(new String(ch, start, length)); 
    } 
  } 

  public void endDocument() throws SAXException { 
    if (cacheList.size() > 0) { 
      this.saxProcessor.addToQueue(cacheList); 
    } 
    this.saxProcessor.setParsingComplete(); 
    System.out.println("over"); 
  } 

  public void endElement(String uri, String localName, String name) 
      throws SAXException { 
    /* 一条记录完成 */ 
    if (localName.equals(this.saxProcessor.getSetting("Element", "Record"))) { 
      cacheList.add(curDatas); 
      this.curDatas = new ArrayList<String>(); 

      if (cacheList.size() >= 100) { 
        this.saxProcessor.addToQueue(cacheList); 
        cacheList = new ArrayList<List<String>>(); 
      } 
    } else if (this.recordStart == true) { 
      curDatas.add(curData.toString().trim()); 
      curData = new StringBuffer(); 
      this.useable = false
    } 
  } 

  public void endPrefixMapping(String prefix) throws SAXException { 

  } 

  public void ignorableWhitespace(char[] ch, int start, int length) 
      throws SAXException { 
  } 

  public void processingInstruction(String target, String data) 
      throws SAXException { 

  } 

  public void setDocumentLocator(Locator locator) { 

  } 

  public void skippedEntity(String name) throws SAXException { 
    // TODO Auto-generated method stub 
  } 

  public void startDocument() throws SAXException { 

  } 

  public void startElement(String uri, String localName, String name, 
      Attributes atts) throws SAXException { 
    if (localName.equals(this.saxProcessor.getSetting("Element", "Record"))) { 
      this.recordStart = true
    } else if (recordStart == true) { 
      /* 可以收集 */ 
      this.useable = true
    } 

  } 

  public void startPrefixMapping(String prefix, String uri) 
      throws SAXException { 

  } 

  public SaxProcessor getSaxProcessor() { 
    return saxProcessor; 
  } 

  public void setSaxProcessor(SaxProcessor saxProcessor) { 
    this.saxProcessor = saxProcessor; 
  } 

  public void run() { 
    try { 
      SAXParser parser = new SAXParser(); 
      parser.setAttribute(SAXParser.STANDALONE, Boolean.valueOf(true)); 
      parser.setValidationMode(SAXParser.NONVALIDATING); 
      parser.setContentHandler(this); 
      this.saxProcessor.setParserActive(); 
      parser.parse(new FileInputStream(this.targetFilename)); 

    } catch (ProcessingCompleteException pce) { 
      pce.printStackTrace(); 
    } catch (Exception e) { 
      e.printStackTrace(); 
    } 
  } 

  public boolean isUseable() { 
    return useable; 
  } 

  public void setUseable(boolean useable) { 
    this.useable = useable; 
  } 

}
DatabaseWriter.java:
/*数据库写入线程类,记录数达到commitCharge条则提交,最后提交剩余记录*/ 
public class DatabaseWriter extends Thread { 
  private Connection connection; 
  private SaxProcessor processor; 
  private String threadName; 
  private int commitCharge; 
  private int recordCount = 0; 

  public Connection getConnection() { 
    return connection; 
  } 

  public void setConnection(Connection connection) { 
    this.connection = connection; 
  } 

  public SaxProcessor getProcessor() { 
    return processor; 
  } 

  public void setProcessor(SaxProcessor processor) { 
    this.processor = processor; 
  } 

  DatabaseWriter(SaxProcessor processor, String threadName, 
      Connection connection) { 
    this.connection = connection; 
    this.processor = processor; 
    this.threadName = threadName; 
  } 

  public void setParameters(int commitCharge) { 
    this.commitCharge = commitCharge; 
  } 

  public void run() { 
    PreparedStatement stat = null
    try { 
      connection.setAutoCommit(false); 
      stat = connection 
          .prepareStatement("insert /*+ append*/ into testMT nologging values(?,?,?,?)"); 

    } catch (SQLException e1) { 
      e1.printStackTrace(); 
    } 

    while (!this.processor.processingComplete()) { 
      List datas = this.processor.getNextData(this.threadName); 
      System.out.println(this.threadName + " run!!"); 

      if (datas != null && stat != null) { 
        for (int i = 0; i < datas.size(); i++) { 
          try { 
            List record = (List) datas.get(i); 
            stat.setString(1, (String) record.get(0)); 
            stat.setString(2, (String) record.get(1)); 
            stat.setString(3, (String) record.get(2)); 
            stat.setString(4, (String) record.get(3)); 
            stat.execute(); 
            this.recordCount++; 

          } catch (SQLException e) { 
            e.printStackTrace(); 
          } 
        } 
      } 
      if (recordCount >= commitCharge) { 
        try { 
          connection.commit();
          this.recordCount = 0;
        } catch (SQLException e) { 
          e.printStackTrace(); 
        } 
      } 

    } 
    try { 
      if (!connection.isClosed()) { 
        connection.commit(); 
      } 
    } catch (SQLException e) { 
      e.printStackTrace(); 
    } 
  } 
}
SaxProcessor.java
/* 
* 解析控制程序 
*/
 
public class SaxProcessor extends ConnectionProvider { 
  /*工作队列,暂存解析的数据*/ 
  private Vector workQueue = new Vector(); 

  /*线程池,源作者用Hashtable实现的线程池,我觉得JDK1.5已经有现成的了,何必再造***呢*/ 
  BlockingDeque<Runnable> queue = new LinkedBlockingDeque<Runnable>(); 
  ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 1, 
      TimeUnit.MINUTES, queue); 
    
  private int threadCount; 
  Thread saxReader; 
  private boolean parserActive = false

  private void setWriterCount(int count) { 
    this.threadCount = count; 
  } 

  private int getWriterCount() { 
    return this.threadCount; 
  } 

    

  protected synchronized void setParserActive() { 
    this.parserActive = true
  } 

  protected synchronized void setParsingComplete() { 
    this.parserActive = false
    notifyAll(); 
  } 

  public synchronized boolean parsingComplete() { 
    return !this.parserActive; 
  } 

  public synchronized boolean processingComplete() { 

    boolean result = (parsingComplete()) && (this.workQueue.size() == 0); 
    return result; 
  } 

  private boolean listQueueFull() { 
    return (this.workQueue.size() >= (2 * getWriterCount())); 
  } 

  /* 向工作队列添加一个任务,并通知所有受阻线程 */ 
  protected synchronized void addToQueue(List data) throws SAXException { 
    if (listQueueFull()) { 
      try { 
        wait(); 
      } catch (InterruptedException ie) { 
        ie.printStackTrace(); 
      } 
    } 
     
    this.workQueue.addElement(data); 
    notifyAll(); 
     
  } 

  public synchronized List getNextData(String thread) { 
    List data = null
    while (!parsingComplete() && (this.workQueue.size() == 0)) { 
      try { 
        wait(); 
      } catch (InterruptedException ioe) { 
      } 
    } 
    if (this.workQueue.size() > 0) { 
      data = (List) this.workQueue.remove(0); 
      notifyAll(); 
    } 
    return data; 
  } 

  public SaxProcessor() throws SQLException, IOException, SAXException { 
    super(); 
  } 

  public void doSomething(String[] args) { 
    try { 
      setWriterCount(Integer.parseInt(getSetting("ThreadCount","4"))); 
      this.saxReader = createSourceProcessor(); 
      this.setParserActive(); 
      this.saxReader.start(); 
      createDatabaseWriters(); 
      waitForCompletion(); 
    } catch (Exception e) { 
      e.printStackTrace(); 
      this.setParsingComplete(); 
    } 
  } 

  private synchronized void waitForCompletion() { 
    while (!parsingComplete()) { 
      try { 
        wait(); 
      } catch (InterruptedException ioe) { 
      } 
    } 
    this.executor.shutdown(); 
  } 

  private void createDatabaseWriters() throws SQLException { 
    DecimalFormat df = (DecimalFormat) DecimalFormat.getInstance(); 
    df.applyPattern("000000"); 
    int commitCharge = Integer.parseInt(getSetting("CommitCharge""50")); 
    for (int i = 0; i < getWriterCount(); i++) { 
      System.out.println(getWriterCount()); 
      String threadName = "Writer_" + df.format(i + 1); 
      Connection conn = getNewConnection(); 
      conn.setAutoCommit(false); 
      DatabaseWriter writer = new DatabaseWriter(this, threadName, conn); 
      writer.setParameters(commitCharge); 
        
      this.executor.execute(writer); 
    } 
  } 

  private Thread createSourceProcessor() throws SQLException { 
    String threadName = "SaxReader"
    SourceProcessor saxReader = new SourceProcessor(threadName); 
    saxReader.setSaxProcessor(this); 
     saxReader.setTargetFilename(getSetting("SourceXML","DIR")); 
    return saxReader; 
  } 

  protected synchronized void printXML(Document xml, PrintWriter pw) 
      throws IOException { 
    ((XMLDocument) xml).print(pw); 
  } 

        /*主函数*/ 
  public static void main(String[] args) { 
    try { 
      SaxProcessor app = new SaxProcessor(); 
      app.initializeConnection(); 
      app.doSomething(args); 
    } catch (Exception e) { 
      e.printStackTrace(); 
    } 
  } 
  public Vector getWorkQueue() { 
    return workQueue; 
  } 

  public void setWorkQueue(Vector workQueue) { 
    this.workQueue = workQueue; 
  } 

ConnectionProvider.java
/*配置文件解析类,很简单,不说了*/ 
public class ConnectionProvider extends Object { 
  public static final boolean DEBUG = true

  protected OracleConnection connection; 

  protected XMLDocument connectionDefinition; 

  public static final String CONNECTION = "Connection"
  public static final String DRIVER = "Driver"
  public static final String HOSTNAME = "Hostname"
  public static final String PORT = "Port"
  public static final String SID = "SID"
  public static final String SERVICENAME = "ServiceName"
  public static final String SERVERMODE = "Server"
  public static final String SCHEMA = "Schema"
  public static final String PASSWORD = "Password"
  public static final String POOL = "Pool"
  public static final String THIN_DRIVER = "thin"
  // public static final String OCI_DRIVER = "oci8"; 

  public static final String DEFAULT_CONNECTION_DEFINITION = "c:\\temp\\connection.xml"
  public static final String DEFAULT_DRIVER = THIN_DRIVER; 
  public static final String DEFAULT_HOSTNAME = "localhost"
  public static final String DEFAULT_PORT = "1521"
  public static final String DEFAULT_SERVERMODE = "DEDICATED"

  public static final String TARGET_DIRECTORY = "targetDirectory"

  protected PrintStream log; 

  public ConnectionProvider() { 

  } 

  public void initializeConnection() throws SAXException, IOException, 
      SQLException { 
    this.initializeConnection(System.out); 
  } 

  public void initializeConnection(PrintStream log) throws SAXException, 
      IOException, SQLException { 
    DriverManager.registerDriver(new oracle.jdbc.driver.OracleDriver()); 
    this.log = log; 
    loadConnectionSettings(); 
    this.connection = openConnection(); 
  } 

  public ConnectionProvider getConnectionProvider() { 
    return this
  } 

  public void initalizeConnection(String connectionLocation, PrintStream log) 
      throws SAXException, IOException, SQLException { 
    DriverManager.registerDriver(new oracle.jdbc.driver.OracleDriver()); 
    this.log = log; 
    loadConnectionSettings(connectionLocation); 
    this.connection = openConnection(); 
  } 

  public void setLogger(PrintStream log) { 
    this.log = log; 
  } 

  private void setConnectionSettings(XMLDocument doc) { 
    this.connectionDefinition = doc; 
  } 

  private void dumpConnectionSettings() throws IOException { 
    StringWriter sw = new StringWriter(); 
    PrintWriter pw = new PrintWriter(sw); 
    this.connectionDefinition.print(pw); 
    pw.close(); 
    sw.close(); 
  } 

  public OracleConnection getConnection() throws SQLException { 
    return this.connection; 
  } 

  public void closeConnection(Connection conn) throws Exception { 
    if (isPooled()) { 
      conn.close(); 
    } 
  } 

  public Connection getConnection(String schema, String passwd) 
      throws Exception { 
    if (isPooled()) { 
      return (OracleOCIConnection) this.getConnection(schema, passwd); 
    } else { 
      return this.connection; 
    } 
  } 

  public String getSetting(String nodeName) { 
    return getSetting(nodeName, null); 
  } 

  public String getSetting(String nodeName, String defaultValue) { 
    XMLElement root = (XMLElement) this.connectionDefinition 
        .getDocumentElement(); 
    NodeList children = root.getChildrenByTagName(nodeName); 
    if (children.getLength() != 0) { 
      Element element = (Element) children.item(0); 
      Text text = (Text) element.getFirstChild(); 
      if (text != null) { 
        return text.getData(); 
      } 
    } 
    return defaultValue; 
  } 

  protected String getDriver() { 
    return getSetting(DRIVER, DEFAULT_DRIVER); 
  } 

  protected String getHostname() { 
    return getSetting(HOSTNAME, DEFAULT_HOSTNAME); 
  } 

  protected String getPort() { 
    return getSetting(PORT, DEFAULT_PORT); 
  } 

  protected String getServerMode() { 
    return getSetting(SERVERMODE, DEFAULT_SERVERMODE); 
  } 

  protected String getServiceName() { 
    return getSetting(SERVICENAME); 
  } 

  protected String getSID() { 
    return getSetting(SID); 
  } 

  protected boolean isPooled() { 
    String usePool = getSetting(POOL, Boolean.FALSE.toString()); 
    return !usePool.equalsIgnoreCase(Boolean.FALSE.toString()); 
  } 

  protected String getSchema() { 
    return getSetting(SCHEMA); 
  } 

  protected String getPassword() { 
    return getSetting(PASSWORD); 
  } 

  public void loadConnectionSettings() throws IOException, SAXException { 
    String filename = System.getProperty( 
        "com.oracle.st.xmldb.pm.ConnectionParameters"
        this.DEFAULT_CONNECTION_DEFINITION); 
    loadConnectionSettings(filename); 
  } 

  public void loadConnectionSettings(String filename) throws IOException, 
      SAXException { 
    if (DEBUG) { 
      System.out 
          .println("Using connection Parameters from : " + filename); 
    } 
    Reader reader = new FileReader(new File(filename)); 
    DOMParser parser = new DOMParser(); 
    parser.parse(reader); 
    XMLDocument doc = parser.getDocument(); 
    setConnectionSettings(doc); 
    if (DEBUG) { 
      dumpConnectionSettings(); 
    } 
  } 

  protected String getDatabaseURL() { 
    if (getDriver() != null) { 
      if (getDriver().equalsIgnoreCase(THIN_DRIVER)) { 
        return "jdbc:oracle:thin:@" + getHostname() + ":" + getPort() 
            + ":" + getSID(); 
      } else { 
        return "jdbc:oracle:oci8:@(description=(address=(host=" 
            + getHostname() + ")(protocol=tcp)(port=" + getPort() 
            + "))(connect_data=(service_name=" + getServiceName() 
            + ")(server=" + getServerMode() + ")))"
      } 
    } else { 
      return null
    } 
  } 

  private OracleConnection openConnection() throws SQLException { 
    String user = getSchema(); 
    String password = getPassword(); 
    String connectionString = user + "/" + password + "@" 
        + getDatabaseURL(); 
    OracleConnection conn = null
    if (DEBUG) { 
      this.log 
          .println("ConnectionProvider.establishConnection(): Connecting as " 
              + connectionString); 
    } 
    try { 
      conn = (OracleConnection) DriverManager.getConnection( 
          getDatabaseURL(), user, password); 
      if (DEBUG) { 
        this.log 
            .println("ConnectionProvider.establishConnection(): Database Connection Established"); 
      } 
    } catch (SQLException sqle) { 
      int err = sqle.getErrorCode(); 
      this.log 
          .println("ConnectionProvider.establishConnection(): Failed to connect using " 
              + connectionString); 
      sqle.printStackTrace(this.log); 
      throw sqle; 
    } 
    return conn; 
  } 

  public OracleConnection getNewConnection() throws SQLException { 
    return openConnection(); 
  } 

  public XMLDocument getConnectionSettings() { 
    return this.connectionDefinition; 
  } 
}
ProcessingCompleteException.java
/**
 * SAX exception whose detail message is always "Processing Complete".
 */
public class ProcessingCompleteException extends SAXException {
  /** Fixed detail message carried by every instance. */
  private static final String MESSAGE = "Processing Complete";

  /** Creates the exception with the fixed detail message. */
  public ProcessingCompleteException() {
    super(MESSAGE);
  }
}
配置文件例子(connection.xml):  
<?xml version="1.0" encoding="UTF-8"?> 
<Connection> 
    <Driver>Thin</Driver> 
    <Hostname>10.1.199.250</Hostname> 
    <Port>1521</Port> 
    <ServiceName>orcl.xp.mark.drake.oracle.com</ServiceName> 
    <SID>idm</SID> 
    <ServerMode>DEDICATED</ServerMode> 
    <Schema>dcaudit</Schema> 
    <Password>dcaudit</Password> 
    <SourceXML>C:\temp\AuditDemo2.xml</SourceXML> 
    <Element>Record</Element> 
    <Table>RDF_DOCUMENT_TABLE</Table> 
    <ErrorTable>RDF_ERROR_TABLE</ErrorTable> 
    <schemaInstancePrefix>xsi</schemaInstancePrefix> 
    <schemaLocation/> 
    <noNamespaceSchemaLocation/> 
    <CommitCharge>10</CommitCharge> 
    <ThreadCount>2</ThreadCount> 
</Connection>

下班了,不多说了,88 。


本文转自 anranran 51CTO博客,原文链接:http://blog.51cto.com/guojuanjun/322318