Hello.
I am having a bit of trouble understanding the current DataPort and why it
does not work on huge tables. It looks very much like Cayenne tries to
read all the rows from the table into memory before it starts inserting,
and setting setPageSize on the select did not fix the problem.
I have attached my current JDBC-based processInsert method for others
to play with. It avoids most of the pitfalls of plain JDBC by using
DbAdapter to bind parameters and to determine whether batch updates
are supported.
Regards,
- Tore.
protected void processInsert(List entities) throws CayenneException {
// Allow delegate to modify the list of entities
// any way it wants. For instance delegate may filter
// or sort the list (though it doesn't have to, and can simply
// pass through the original list).
if (delegate != null) {
entities = delegate.willCleanData(this, entities);
}
if (entities == null || entities.isEmpty()) {
return;
}
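// Open plain JDBC connections to the source and destination nodes.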
Connection sourceConn = null;
Connection destConn = null;
DbAdapter destAdapter = getDestinationNode().getAdapter();
boolean useInsertBatch = destAdapter.supportsBatchUpdates();
try {
sourceConn = getSourceNode().getDataSource().getConnection();
destConn = getDestinationNode().getDataSource().getConnection();
destConn.setAutoCommit(false);
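// Auto-commit is off; the destination is committed manually in chunks below.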
// process ordered list of entities one by one
Iterator it = entities.iterator();
while (it.hasNext()) {
DbEntity entity = (DbEntity) it.next();
// skip derived DbEntities...
if (entity instanceof DerivedDbEntity) {
continue;
}
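// Build a SELECT for the source and a matching INSERT for the destination,
// listing the columns in the same order in both statements.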
StringBuffer selectsql = new StringBuffer();
selectsql.append("SELECT ");
StringBuffer insertsql = new StringBuffer();
insertsql.append("INSERT INTO ");
insertsql.append(entity.getFullyQualifiedName());
insertsql.append(' ');
StringBuffer fields = new StringBuffer();
StringBuffer parameters = new StringBuffer();
// make sure we always are using the same order
List attributes = new ArrayList(entity.getAttributes());
for (Iterator ait = attributes.iterator(); ait.hasNext();) {
DbAttribute attribute = (DbAttribute) ait.next();
fields.append(attribute.getName());
parameters.append('?');
if (ait.hasNext()) {
fields.append(',');
parameters.append(',');
}
}
selectsql.append(fields);
selectsql.append(" FROM ");
selectsql.append(entity.getFullyQualifiedName());
insertsql.append('(');
insertsql.append(fields);
insertsql.append(") VALUES (");
insertsql.append(parameters);
insertsql.append(')');
QueryLogger.log(selectsql.toString());
QueryLogger.log(insertsql.toString());
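// Stream rows from the source instead of fetching everything at once;
// the fetch size is only a hint to the driver.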
Statement selectst = sourceConn.createStatement();
// TODO: configure/determine fetch size.
selectst.setFetchSize(100);
ResultSet rs = selectst.executeQuery(selectsql.toString());
PreparedStatement insertst = destConn.prepareStatement(insertsql.toString());
int size = 0;
long rows = 0;
int rowsInBatch = 0;
long start = System.currentTimeMillis();
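// Copy rows one at a time, binding each column on the insert through the DbAdapter.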
while (rs.next()) {
rows++;
rowsInBatch++;
for (int i = 0; i < attributes.size(); i++) {
DbAttribute attribute = (DbAttribute) attributes.get(i);
int idx = i + 1;
int type = attribute.getType();
int prec = attribute.getPrecision();
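// Keep a rough running total of the data volume so commits can be
// triggered by size rather than by row count.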
switch (type) {
case Types.BLOB:
Blob blob = rs.getBlob(idx);
if (blob != null) {
size = size + (int) blob.length();
}
destAdapter.bindParameter(insertst, blob, idx, type, prec);
break;
case Types.VARBINARY:
case Types.LONGVARBINARY:
byte[] bindata = rs.getBytes(idx);
if (bindata != null) {
size = size + bindata.length;
}
destAdapter.bindParameter(insertst, bindata, idx, type, prec);
break;
case Types.CLOB:
case Types.CHAR:
case Types.VARCHAR:
String string = rs.getString(idx);
if (string != null) {
size = size + string.length();
}
destAdapter.bindParameter(insertst, string, idx, type, prec);
break;
default:
Object o = rs.getObject(idx);
size = size + 1;
destAdapter.bindParameter(insertst, o, idx, type, prec);
break;
}
}
if (useInsertBatch) {
insertst.addBatch();
} else {
insertst.execute();
}
// try to be clever about when to commit
// TODO: configure
if (size > 1000000) {
if (useInsertBatch) {
insertst.executeBatch();
}
int used = (int) ((System.currentTimeMillis() - start) / 1000L);
// avoid division by zero when the chunk took less than a second
if (used == 0) {
used = 1;
}
QueryLogger.log("partial commit " + entity.getName() + " at row " + rows
+ ". " + (rowsInBatch / used) + " rows/s.");
destConn.commit();
size = 0;
rowsInBatch = 0;
start = System.currentTimeMillis();
}
}
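// Flush any remaining batched rows and commit the rest of this entity.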
if (useInsertBatch) {
insertst.executeBatch();
}
QueryLogger.log("commit " + entity.getName() + ". "
+ rows + " rows.");
destConn.commit();
rs.close();
insertst.close();
selectst.close();
}
} catch (SQLException e) {
throw new CayenneException(e);
} catch (Exception e) {
throw new CayenneException(e);
} finally {
if (sourceConn != null) {
try {
sourceConn.close();
} catch (SQLException e) {
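// ignore errors when closing the source connection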
}
}
if (destConn != null) {
try {
destConn.close();
} catch (SQLException e) {
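// ignore errors when closing the destination connection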
}
}
}
}