Subversion Repositories XServices

Rev

Rev 201 | Blame | Compare with Previous | Last modification | View Log | Download | RSS feed

package net.brutex.xservices.util;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import org.h2.jdbcx.JdbcConnectionPool;
import org.quartz.*;

import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.sql.*;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

@Slf4j
@DisallowConcurrentExecution
public class EventLogCleanerJob implements Job, InterruptableJob {

    private final AtomicBoolean isInterrupted = new AtomicBoolean(false);

    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        final Instant d = Instant.now();
        final long ts = d.toEpochMilli();

        log.info("EventLogCleaner is executing now.");
        final EventmanagerConfiguration conf = (EventmanagerConfiguration) context.getMergedJobDataMap()
                .get(EventmanagerConfiguration.KEY);

        final JdbcConnectionPool fpool = (JdbcConnectionPool) context.getMergedJobDataMap().get("fdbConnection");
        final JdbcConnectionPool mpool = (JdbcConnectionPool) context.getMergedJobDataMap().get("mdbConnection");

        final String moveSQL = "INSERT INTO brutex.tbl_events_all DIRECT SELECT * FROM MEM_ALL_EVENTS " +
                "where btx_timestamp < " + (ts-5000) + " ";
        final String deleteTable = "DELETE FROM MEM_ALL_EVENTS where btx_timestamp < " + (ts-5000);
        final String deleteMemTable = "DELETE FROM brutex.tbl_events_all where btx_timestamp < " + (ts-5000);


        /**
         * Move event table data to all events log
         */


        try (Connection fcon = fpool.getConnection();
            Connection mcon = mpool.getConnection()){
            fcon.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
            fcon.setAutoCommit(false);
            Statement stmt = fcon.createStatement();
            stmt.execute(moveSQL);
            int count = stmt.getUpdateCount();
            fcon.commit();
            Statement mstm = mcon.createStatement();


            mstm.execute(deleteMemTable);
            int count2 = mstm.getUpdateCount();
            mcon.commit();

            log.info("EventLogCleaner moved '{}/ deleted {}' events into the persistence space.", count, count2);

        } catch (SQLException e) {
            log.error("Exception in SQL execution: {}", e.getMessage());
            throw new JobExecutionException(e);
        }
    }


        /**
     * <p>
     * Called by the <code>{@link Scheduler}</code> when a user
     * interrupts the <code>Job</code>.
     * </p>
     *
     * @throws UnableToInterruptJobException if there is an exception while interrupting the job.
     */
    @Override
    public synchronized void interrupt() throws UnableToInterruptJobException {
        isInterrupted.set(true);
        log.warn("EventLogCleaner received and interrupt.");
        Thread.currentThread().interrupt();
    }
}