null

Quartz + Hibernate + PostgreSQL = Проблема с BYTEA и OID

В настоящее время работаю над одной клиент-серверной системой, использующей для поддержки персистентности широко известный Hibernate (а в качестве бэкэнда - СУБД PostgreSQL). И не далее как на прошлой неделе столкнулся с одной забавной проблемкой.

Понадобилось прикрутить к системе Quartz Scheduler, дабы была возможность запускать отдельные задачи "по расписанию". Для того, чтобы зареганные таски загадочно не исчезали после падения внеплановых остановов серверов ;), старина Quartz умеет сохранять данные о них в БД, стоит только поплясать над его конфигом. В целях максимальной переносимости Quartz использует JDBC, а вместе с дистрибутивом идет ряд SQL-скриптов, создающих все необходимые Quartz'у объекты в БД. Меня такое положение дел мягко сказать не удовлетворило и, думаю, многие в этом плане будут со мной полностью солидарны... Кому нужны дополнительные приплясывания, связанные с подготовкой структуры базы при развертывании системы? Пускай эта "нелегкая" задача ложится на плечи современных ORM-технологий.

Однако здесь то и возникла основная проблема. Все пользовательские данные задачи Quartz хранит в HashMap, а в базе - в бинарном виде. PostgreSQL предоставляет два способа хранения бинарных данных - поля BYTEA и OID (Large Object). Quartz работает с BYTEA, а Hibernate мэппит поля, помеченные JPA-аннотацией @Lob в OID. Т.е., при мэппинге вместо:

JOB_DATA BYTEA NULL

получаем:

JOB_DATA OID

И, как следствие, полный лог счастья...

Научить Hibernate работать с BYTEA - означает отказаться от одного из главных преимуществ его использования - переносимости. Уж лучше научить Quartz работать с OID. Благо такая возможность существует. Все средства работы с базой, не зависимые от конкретной СУБД, в Quartz'е сосредоточены в классе org.quartz.impl.jdbcjobstore.StdJDBCDelegate. Далее есть ряд классов-делегатов, расширяющих StdJDBCDelegate и предназначенных для работы с конкретными СУБД. Среди них можно найти и класс org.quartz.impl.jdbcjobstore.PostgreSQLDelegate, но мы о нем забудем, работа с BYTEA нам не нужна. Напишем свою реализацию делегата.

Собственно, все что нам нужно - это переопределить 3 метода класса StdJDBCDelegate:

Object getObjectFromBlob(ResultSet rs, String columnName) 
       throws ClassNotFoundException, IOException, SQLException;
       
Object getJobDetailFromBlob(ResultSet rs, String columnName) 
       throws ClassNotFoundException, IOException, SQLException;
       
void setBytes(PreparedStatement ps, int index, ByteArrayOutputStream baos) 
     throws SQLException;

Работа с сохранением и извлечением бинарных данных подробно расписана в документации на сайте PostgreSQL. Далее приводится код моего делегата.

package com.tuneit.wf.scheduled.quartz;
     
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
     
import org.apache.commons.logging.Log;
import org.postgresql.PGConnection;
import org.postgresql.largeobject.LargeObject;
import org.postgresql.largeobject.LargeObjectManager;
import org.quartz.impl.jdbcjobstore.StdJDBCDelegate;
     
/**
 * <p>
 * This is a driver delegate for the PostgreSQL JDBC driver.
 * </p>
 *
 * @author Ruslan F. Isbarov
 * @version 1.0
 */
public class PostgreSQLDelegate extends StdJDBCDelegate {
    /**
     * <p>
     * Create new PostgreSQLDelegate instance.
     * </p>
     *
     * @param log the logger to use during execution
     * @param tablePrefix the prefix of all table names
     */
    public PostgreSQLDelegate(Log log, String tablePrefix,
        String instanceId) {
        super(log, tablePrefix, instanceId);
    }
     
    /**
     * <p>
     * Create new PostgreSQLDelegate instance.
     * </p>
     *
     * @param log the logger to use during execution
     * @param tablePrefix the prefix of all table names
     * @param useProperties use java.util.Properties for storage
     */
    public PostgreSQLDelegate(Log log, String tablePrefix,
        String instanceId, Boolean useProperties) {
        
        super(log, tablePrefix, instanceId, useProperties);
    }
     
    /**
     * <p>
     * This method should be overridden by any delegate subclasses that need
     * special handling for BLOBs. The default implementation uses standard
     * JDBC <code>java.sql.Blob</code> operations.
     * </p>
     *
     * @param rs the result set, already queued to the correct row
     * @param columnName the column name for the BLOB
     * @return the deserialized Object from the ResultSet BLOB
     * @throws ClassNotFoundException 
     *         if a class found during deserialization cannot be found
     * @throws IOException if deserialization causes an error
     */
    @Override
    protected Object getObjectFromBlob(ResultSet rs, String columnName) 
                     throws ClassNotFoundException, IOException, SQLException {
            
        try {
            // Get bytes from the ResultSet BLOB.
            byte[] buffer = getBytes(rs, columnName);
            
            // Get the object from input stream.
            Object object = null;
            
            if (buffer != null && buffer.length != 0) {
                ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
                
                ObjectInputStream ois = new ObjectInputStream(bais);
                try {
                    object = ois.readObject();
                } finally {
                    ois.close();
                }
            }
            
            return object;
        } catch (SQLException e) {
            logger.error(e.getMessage(), e);
            
            throw e;
        }
    }
     
    /**
     * <p>
     * This method should be overridden by any delegate subclasses that need
     * special handling for BLOBs for job details. The default implementation
     * uses standard JDBC <code>java.sql.Blob</code> operations.
     * </p>
     *
     * @param rs the result set, already queued to the correct row
     * @param columnName the column name for the BLOB
     * @return the deserialized Object from the ResultSet BLOB
     * @throws ClassNotFoundException
     *         if a class found during deserialization cannot be found
     * @throws IOException if deserialization causes an error
     */
    @Override
    protected Object getJobDetailFromBlob(ResultSet rs, String columnName) 
                     throws ClassNotFoundException, IOException, SQLException {
         
        if (canUseProperties()) {
            try {
                // Get bytes from the ResultSet BLOB.
                byte[] buffer = getBytes(rs, columnName);
                
                if (buffer != null && buffer.length != 0) {
                    return new ByteArrayInputStream(buffer);
                }
                
                return null;
            } catch (SQLException e) {
                logger.error(e.getMessage(), e);
                
                throw e;
            }
        }
        
        return getObjectFromBlob(rs, columnName);
    }
     
    /**
     * Returns a byte array from BLOB
     * 
     * @param rs the result set, already queued to the correct row
     * @param columnName the column name for the BLOB
     * @return byte array from the ResultSet BLOB
     */
    protected byte[] getBytes(ResultSet rs, String columnName) throws SQLException {
        Connection connection = null;
        
        try {
            // Get DB connection.
            connection = rs.getStatement().getConnection();
            
            // Store previous auto-commit state.
            boolean autoCommitState = connection.getAutoCommit();
            
            // All LargeObject API calls must be within a transaction block.
            connection.setAutoCommit(false);
            
            // Get the Large Object Manager to perform operations with.
            LargeObjectManager largeObjectManager = 
                ((PGConnection) connection).getLargeObjectAPI();
            
            // Get OID.
            long oid = rs.getLong(columnName);
            
            // Load LO.
            LargeObject largeObject = 
                largeObjectManager.open(oid, LargeObjectManager.READ);
            
            // Read the data.
            byte buffer[] = new byte[largeObject.size()];
            
            largeObject.read(buffer, 0, largeObject.size());
            
            // Close the object.
            largeObject.close();
            
            // Finally, commit the transaction.
            connection.commit();
            
            // Restore auto-commit state.
            connection.setAutoCommit(autoCommitState);
            
            return buffer;
        } catch (SQLException e) {
            logger.error(e.getMessage(), e);
            
            if (connection != null) {
                try {
                    connection.rollback();
                } catch (Exception ignored) {
                }
            }
            
            throw e;
        }
    }
          
    /**
     * Sets the designated parameter to the byte array of the given
     * <code>ByteArrayOutputStream</code>.  Will set parameter value to null if the
     * <code>ByteArrayOutputStream</code> is null.
     * This just wraps <code>{@link PreparedStatement#setBytes(int, byte[])}</code>
     * by default, but it can be overloaded by subclass delegates for databases that
     * don't explicitly support storing bytes in this way.
     */
    @Override
    protected void setBytes(PreparedStatement ps, int index, 
        ByteArrayOutputStream baos) throws SQLException {
     
        Connection connection = null;
      
        try {
            // Get DB connection.
            connection = ps.getConnection();
     
            // Store auto-commit state.
            boolean autoCommitState = connection.getAutoCommit();
     
            // All LargeObject API calls must be within a transaction block.
            connection.setAutoCommit(false);
    
            // Get the Large Object Manager to perform operations with.
            LargeObjectManager largeObjectManager = 
                ((PGConnection) connection).getLargeObjectAPI();
    
            // Create a new large object.
            long oid = largeObjectManager.createLO(
                LargeObjectManager.READ | LargeObjectManager.WRITE
            );
    
            // Open the large object for writing.
            LargeObject largeObject = 
                largeObjectManager.open(oid, LargeObjectManager.WRITE);
     
            // Copy the data from stream to the large object
            largeObject.write((baos == null) ? new byte[0] : baos.toByteArray());
     
            // Close the large object
            largeObject.close();
     
            ps.setLong(index, oid);
     
            // Finally, commit the transaction.
            connection.commit();
            
            // Restore auto-commit state.
            connection.setAutoCommit(autoCommitState);
        } catch (SQLException e) {
            logger.error(e.getMessage(), e);
     
            if (connection != null) {
                try {
                    connection.rollback();
                } catch (Exception ignored) {
                }
            }
     
            throw e;
        }
    }
}

Все! Собираем, пакуем, кладем куда нам вздумается (главное, чтобы наше творение попало в CLASSPATH). И напоследок прописываем наш делегат в конфиге quartz.properties:

#==============================
# Configure JobStore
#==============================

org.quartz.jobStore.misfireThreshold = 60000
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass 
    = com.tuneit.wf.scheduled.quartz.PostgreSQLDelegate
org.quartz.jobStore.dataSource = WF2Pool
org.quartz.jobStore.tablePrefix = QRTZ_
org.quartz.jobStore.isClustered = false

Enjoy!

Назад Вперед

 

Область моих профессиональных интересов:

  • языки программирования Java, C/C++, C#;
  • базы данных;
  • криптография;
  • разработка корпоративных информационных систем с использованием различных технологий;
  • разработка приложений для устройств с ограниченными возможностями;
  • проектирование программных систем;
  • управление проектами;
  • и многое другое...