В настоящее время работаю над одной клиент-серверной системой, использующей для поддержки персистентности широко известный Hibernate (а в качестве бэкэнда - СУБД PostgreSQL). И не далее как на прошлой неделе столкнулся с одной забавной проблемкой.
Понадобилось прикрутить к системе Quartz Scheduler, дабы была возможность запускать отдельные задачи "по расписанию". Для того, чтобы зареганные таски загадочно не исчезали после падения внеплановых остановов серверов ;), старина Quartz умеет сохранять данные о них в БД, стоит только поплясать над его конфигом. В целях максимальной переносимости Quartz использует JDBC, а вместе с дистрибутивом идет ряд SQL-скриптов, создающих все необходимые Quartz'у объекты в БД. Меня такое положение дел мягко сказать не удовлетворило и, думаю, многие в этом плане будут со мной полностью солидарны... Кому нужны дополнительные приплясывания, связанные с подготовкой структуры базы при развертывании системы? Пускай эта "нелегкая" задача ложится на плечи современных ORM-технологий.
Однако здесь то и возникла основная проблема. Все пользовательские данные задачи Quartz хранит в HashMap, а в базе - в бинарном виде. PostgreSQL предоставляет два способа хранения бинарных данных - поля BYTEA и OID (Large Object). Quartz работает с BYTEA, а Hibernate мэппит поля, помеченные JPA-аннотацией @Lob в OID. Т.е., при мэппинге вместо:
JOB_DATA BYTEA NULL
получаем:
JOB_DATA OID
И, как следствие, полный лог счастья...
Научить Hibernate работать с BYTEA - означает отказаться от одного из главных преимуществ его использования - переносимости. Уж лучше научить Quartz работать с OID. Благо такая возможность существует. Все средства работы с базой, не зависимые от конкретной СУБД, в Quartz'е сосредоточены в классе org.quartz.impl.jdbcjobstore.StdJDBCDelegate. Далее есть ряд классов-делегатов, расширяющих StdJDBCDelegate и предназначенных для работы с конкретными СУБД. Среди них можно найти и класс org.quartz.impl.jdbcjobstore.PostgreSQLDelegate, но мы о нем забудем, работа с BYTEA нам не нужна. Напишем свою реализацию делегата.
Собственно, все что нам нужно - это переопределить 3 метода класса StdJDBCDelegate:
Object getObjectFromBlob(ResultSet rs, String columnName)
throws ClassNotFoundException, IOException, SQLException;
Object getJobDetailFromBlob(ResultSet rs, String columnName)
throws ClassNotFoundException, IOException, SQLException;
void setBytes(PreparedStatement ps, int index, ByteArrayOutputStream baos)
throws SQLException;
Работа с сохранением и извлечением бинарных данных подробно расписана в документации на сайте PostgreSQL. Далее приводится код моего делегата.
package com.tuneit.wf.scheduled.quartz;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.commons.logging.Log;
import org.postgresql.PGConnection;
import org.postgresql.largeobject.LargeObject;
import org.postgresql.largeobject.LargeObjectManager;
import org.quartz.impl.jdbcjobstore.StdJDBCDelegate;
/**
* <p>
* This is a driver delegate for the PostgreSQL JDBC driver.
* </p>
*
* @author Ruslan F. Isbarov
* @version 1.0
*/
public class PostgreSQLDelegate extends StdJDBCDelegate {
/**
* <p>
* Create new PostgreSQLDelegate instance.
* </p>
*
* @param log the logger to use during execution
* @param tablePrefix the prefix of all table names
*/
public PostgreSQLDelegate(Log log, String tablePrefix,
String instanceId) {
super(log, tablePrefix, instanceId);
}
/**
* <p>
* Create new PostgreSQLDelegate instance.
* </p>
*
* @param log the logger to use during execution
* @param tablePrefix the prefix of all table names
* @param useProperties use java.util.Properties for storage
*/
public PostgreSQLDelegate(Log log, String tablePrefix,
String instanceId, Boolean useProperties) {
super(log, tablePrefix, instanceId, useProperties);
}
/**
* <p>
* This method should be overridden by any delegate subclasses that need
* special handling for BLOBs. The default implementation uses standard
* JDBC <code>java.sql.Blob</code> operations.
* </p>
*
* @param rs the result set, already queued to the correct row
* @param columnName the column name for the BLOB
* @return the deserialized Object from the ResultSet BLOB
* @throws ClassNotFoundException
* if a class found during deserialization cannot be found
* @throws IOException if deserialization causes an error
*/
@Override
protected Object getObjectFromBlob(ResultSet rs, String columnName)
throws ClassNotFoundException, IOException, SQLException {
try {
// Get bytes from the ResultSet BLOB.
byte[] buffer = getBytes(rs, columnName);
// Get the object from input stream.
Object object = null;
if (buffer != null && buffer.length != 0) {
ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
ObjectInputStream ois = new ObjectInputStream(bais);
try {
object = ois.readObject();
} finally {
ois.close();
}
}
return object;
} catch (SQLException e) {
logger.error(e.getMessage(), e);
throw e;
}
}
/**
* <p>
* This method should be overridden by any delegate subclasses that need
* special handling for BLOBs for job details. The default implementation
* uses standard JDBC <code>java.sql.Blob</code> operations.
* </p>
*
* @param rs the result set, already queued to the correct row
* @param columnName the column name for the BLOB
* @return the deserialized Object from the ResultSet BLOB
* @throws ClassNotFoundException
* if a class found during deserialization cannot be found
* @throws IOException if deserialization causes an error
*/
@Override
protected Object getJobDetailFromBlob(ResultSet rs, String columnName)
throws ClassNotFoundException, IOException, SQLException {
if (canUseProperties()) {
try {
// Get bytes from the ResultSet BLOB.
byte[] buffer = getBytes(rs, columnName);
if (buffer != null && buffer.length != 0) {
return new ByteArrayInputStream(buffer);
}
return null;
} catch (SQLException e) {
logger.error(e.getMessage(), e);
throw e;
}
}
return getObjectFromBlob(rs, columnName);
}
/**
* Returns a byte array from BLOB
*
* @param rs the result set, already queued to the correct row
* @param columnName the column name for the BLOB
* @return byte array from the ResultSet BLOB
*/
protected byte[] getBytes(ResultSet rs, String columnName) throws SQLException {
Connection connection = null;
try {
// Get DB connection.
connection = rs.getStatement().getConnection();
// Store previous auto-commit state.
boolean autoCommitState = connection.getAutoCommit();
// All LargeObject API calls must be within a transaction block.
connection.setAutoCommit(false);
// Get the Large Object Manager to perform operations with.
LargeObjectManager largeObjectManager =
((PGConnection) connection).getLargeObjectAPI();
// Get OID.
long oid = rs.getLong(columnName);
// Load LO.
LargeObject largeObject =
largeObjectManager.open(oid, LargeObjectManager.READ);
// Read the data.
byte buffer[] = new byte[largeObject.size()];
largeObject.read(buffer, 0, largeObject.size());
// Close the object.
largeObject.close();
// Finally, commit the transaction.
connection.commit();
// Restore auto-commit state.
connection.setAutoCommit(autoCommitState);
return buffer;
} catch (SQLException e) {
logger.error(e.getMessage(), e);
if (connection != null) {
try {
connection.rollback();
} catch (Exception ignored) {
}
}
throw e;
}
}
/**
* Sets the designated parameter to the byte array of the given
* <code>ByteArrayOutputStream</code>. Will set parameter value to null if the
* <code>ByteArrayOutputStream</code> is null.
* This just wraps <code>{@link PreparedStatement#setBytes(int, byte[])}</code>
* by default, but it can be overloaded by subclass delegates for databases that
* don't explicitly support storing bytes in this way.
*/
@Override
protected void setBytes(PreparedStatement ps, int index,
ByteArrayOutputStream baos) throws SQLException {
Connection connection = null;
try {
// Get DB connection.
connection = ps.getConnection();
// Store auto-commit state.
boolean autoCommitState = connection.getAutoCommit();
// All LargeObject API calls must be within a transaction block.
connection.setAutoCommit(false);
// Get the Large Object Manager to perform operations with.
LargeObjectManager largeObjectManager =
((PGConnection) connection).getLargeObjectAPI();
// Create a new large object.
long oid = largeObjectManager.createLO(
LargeObjectManager.READ | LargeObjectManager.WRITE
);
// Open the large object for writing.
LargeObject largeObject =
largeObjectManager.open(oid, LargeObjectManager.WRITE);
// Copy the data from stream to the large object
largeObject.write((baos == null) ? new byte[0] : baos.toByteArray());
// Close the large object
largeObject.close();
ps.setLong(index, oid);
// Finally, commit the transaction.
connection.commit();
// Restore auto-commit state.
connection.setAutoCommit(autoCommitState);
} catch (SQLException e) {
logger.error(e.getMessage(), e);
if (connection != null) {
try {
connection.rollback();
} catch (Exception ignored) {
}
}
throw e;
}
}
}
Все! Собираем, пакуем, кладем куда нам вздумается (главное, чтобы наше творение попало в CLASSPATH). И напоследок прописываем наш делегат в конфиге quartz.properties:
#==============================
# Configure JobStore
#==============================
org.quartz.jobStore.misfireThreshold = 60000
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass
= com.tuneit.wf.scheduled.quartz.PostgreSQLDelegate
org.quartz.jobStore.dataSource = WF2Pool
org.quartz.jobStore.tablePrefix = QRTZ_
org.quartz.jobStore.isClustered = false
Enjoy!