Re: raw dataport

From: Tore Halset (halse..vv.ntnu.no)
Date: Fri Jun 16 2006 - 10:00:38 EDT

    Hello.

    I have a bit of trouble understanding the current DataPort and why it
    does not work on huge tables. It looks very much like Cayenne tries to
    load all of the rows from the table into memory before it starts
    inserting. Setting setPageSize on the select did not fix the problem.
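
    For reference, the kind of paging I mean looks roughly like this (just
    a sketch, not the actual DataPort code; the entity name is a
    placeholder and I assume a DataContext named "context"):

        SelectQuery query = new SelectQuery("SomeEntity");
        query.setPageSize(100); // objects should be resolved one page at a time
        List objects = context.performQuery(query);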

    I have attached my current JDBC-based processInsert method for others
    to play with. It avoids some of the usual problems with plain JDBC by
    using DbAdapter to bind parameters and to determine whether batch
    updates are supported.

    Regards,
      - Tore.

         protected void processInsert(List entities) throws CayenneException {
             // Allow delegate to modify the list of entities
             // any way it wants. For instance delegate may filter
             // or sort the list (though it doesn't have to, and can simply
             // pass through the original list).
             if (delegate != null) {
                 entities = delegate.willCleanData(this, entities);
             }

             if (entities == null || entities.isEmpty()) {
                 return;
             }

             Connection sourceConn = null;
             Connection destConn = null;

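             // the destination adapter provides driver-specific parameter
             // binding and reports whether batched inserts are supported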
             DbAdapter destAdapter = getDestinationNode().getAdapter();
             boolean useInsertBatch = destAdapter.supportsBatchUpdates();

             try {
                 sourceConn = getSourceNode().getDataSource().getConnection();
                 destConn = getDestinationNode().getDataSource().getConnection();
                 destConn.setAutoCommit(false);

                 // process ordered list of entities one by one
                 Iterator it = entities.iterator();
                 while (it.hasNext()) {
                     DbEntity entity = (DbEntity) it.next();

                     // skip derived DbEntities...
                     if (entity instanceof DerivedDbEntity) {
                         continue;
                     }

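                     // build a SELECT over the source table and a matching
                     // parameterized INSERT for the destination, using the
                     // same attribute order for both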
                     StringBuffer selectsql = new StringBuffer();
                     selectsql.append("SELECT ");

                     StringBuffer insertsql = new StringBuffer();
                     insertsql.append("INSERT INTO ");
                     insertsql.append(entity.getFullyQualifiedName());
                     insertsql.append(' ');

                     StringBuffer fields = new StringBuffer();
                     StringBuffer parameters = new StringBuffer();

                     // make sure we always are using the same order
                     List attributes = new ArrayList(entity.getAttributes());

                     for (Iterator ait = attributes.iterator(); ait.hasNext();) {
                         DbAttribute attribute = (DbAttribute) ait.next();
                         fields.append(attribute.getName());
                         parameters.append('?');
                         if (ait.hasNext()) {
                             fields.append(',');
                             parameters.append(',');
                         }
                     }

                     selectsql.append(fields);
                     selectsql.append(" FROM ");
                     selectsql.append(entity.getFullyQualifiedName());

                     insertsql.append('(');
                     insertsql.append(fields);
                     insertsql.append(") VALUES (");
                     insertsql.append(parameters);
                     insertsql.append(')');

                     QueryLogger.log(selectsql.toString());
                     QueryLogger.log(insertsql.toString());

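                     // read the source table through a streaming cursor; the
                     // fetch size is a hint so the driver does not buffer the
                     // whole table in memory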
                     Statement selectst = sourceConn.createStatement();
                     // TODO: configure/determine fetch size.
                     selectst.setFetchSize(100);
                     ResultSet rs = selectst.executeQuery(selectsql.toString());

                     PreparedStatement insertst = destConn.prepareStatement(insertsql.toString());

                     int size = 0;
                     long rows = 0;
                     int rowsInBatch = 0;
                     long start = System.currentTimeMillis();

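                     // copy rows one at a time, binding each column value
                     // through the destination adapter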
                     while (rs.next()) {
                         rows++;
                         rowsInBatch++;
                         for (int i = 0; i < attributes.size(); i++) {
                             DbAttribute attribute = (DbAttribute) attributes.get(i);

                             int idx = i + 1;
                             int type = attribute.getType();
                             int prec = attribute.getPrecision();

                             switch (type) {
                             case Types.BLOB:
                                 Blob blob = rs.getBlob(idx);
                                 if (blob != null) {
                                     size = size + (int) blob.length();
                                 }
                                 destAdapter.bindParameter(insertst, blob, idx, type, prec);
                                 break;
                             case Types.VARBINARY:
                             case Types.LONGVARBINARY:
                                 byte[] bindata = rs.getBytes(idx);
                                 if (bindata != null) {
                                     size = size + bindata.length;
                                 }
                                 destAdapter.bindParameter(insertst, bindata, idx, type, prec);
                                 break;
                             case Types.CLOB:
                             case Types.CHAR:
                             case Types.VARCHAR:
                                 String string = rs.getString(idx);
                                 if (string != null) {
                                     size = size + string.length();
                                 }
                                 destAdapter.bindParameter(insertst, string, idx, type, prec);
                                 break;
                             default:
                                 Object o = rs.getObject(idx);
                                 size = size + 1;
                                 destAdapter.bindParameter(insertst, o, idx, type, prec);
                                 break;
                             }

                         }

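                         // queue the row in a batch if the adapter supports
                         // batching, otherwise execute the insert right away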
                         if (useInsertBatch) {
                             insertst.addBatch();
                         } else {
                             insertst.execute();
                         }

                         // try to be clever about when to commit
                         // TODO: configure
                         if (size > 1000000) {
                             if (useInsertBatch) {
                                 insertst.executeBatch();
                             }
                             // avoid division by zero when less than a second has passed
                             int used = Math.max(1, (int) ((System.currentTimeMillis() - start) / 1000L));
                             QueryLogger.log("partial commit " + entity.getName() + " at row " + rows
                                     + ". " + (rowsInBatch / used) + " rows/s.");
                             destConn.commit();
                             size = 0;
                             rowsInBatch = 0;
                             start = System.currentTimeMillis();
                         }
                     }

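                     // flush any remaining batched rows and commit the tail
                     // of this table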
                     if (useInsertBatch) {
                         insertst.executeBatch();
                     }
                     QueryLogger.log("commit " + entity.getName() + ". "
    + rows + " rows.");
                     destConn.commit();

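                     // close the cursor and statements before moving on to
                     // the next entity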
                     rs.close();
                     insertst.close();
                     selectst.close();

                 }

             } catch (SQLException e) {
                 throw new CayenneException(e);
             } catch (Exception e) {
                 throw new CayenneException(e);
             } finally {
                 if (sourceConn != null) {
                     try {
                         sourceConn.close();
                     } catch (SQLException e) {
                         // ignore errors while closing the source connection
                     }
                 }
                 if (destConn != null) {
                     try {
                         destConn.close();
                     } catch (SQLException e) {
                         // ignore errors while closing the destination connection
                     }
                 }
             }
         }


