Subversion Repositories XServices

Rev

Rev 201 | Blame | Compare with Previous | Last modification | View Log | Download | RSS feed

package net.brutex.xservices.util;


import lombok.extern.slf4j.Slf4j;
import org.h2.jdbcx.JdbcConnectionPool;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;

import javax.servlet.ServletContext;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import javax.servlet.annotation.WebListener;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Date;
import java.util.concurrent.atomic.AtomicLong;

import static org.quartz.TriggerBuilder.newTrigger;

//For Servlet container 3.x, you can annotate the listener with @WebListener, no need to declares in web.xml.

/**
 * Handle servlet lifecycle actions for the MiscService servlet, such as
 * initializing in-memory database, persist on shutdown etc.
 */


@WebListener
@Slf4j
public class MiscServiceServletContextListener implements ServletContextListener {

    /**
     * SQL initialization for in-memory database
     * INIT=RUNSCRIPT FROM 'classpath:scripts/create.sql'"
     */
    private final static String dbinit = "RUNSCRIPT FROM 'classpath:ddl/BRTX_schema.ddl';";

    private final EventmanagerConfiguration configuration = EventmanagerConfiguration.getInstance().refreshConfig();
    private final JdbcConnectionPool mempool = getDbPool(configuration.getJdbc_memdb());
    private final JdbcConnectionPool fdbpool = getDbPool(configuration.getJdbc_filedb());

    /**
     * Create DB connection pool and initialize the in-memory database with schema.
     *
     * @return connection pool
     */
    private static JdbcConnectionPool getDbPool(String dbConnectString) {
        JdbcConnectionPool p = JdbcConnectionPool.create(dbConnectString, "", "");
        p.setMaxConnections(256);
        p.setLoginTimeout(20);
        try (Connection c = p.getConnection();){
            Statement s = c.createStatement();
            s.execute(dbinit);
            c.commit();
            log.trace("Running SQL against database '{}': '{}'", dbConnectString, dbinit);
            c.close();
            log.debug("Successfully created schema for database 'Brutex' at '{}'.", dbConnectString);
        } catch (SQLException e) {
            log.error("Error creating the schema for database 'Brutex' using '{}': {}", dbConnectString, e.getMessage());
            throw new RuntimeException(e);
        }
        return p;
    }

    @Override
    public void contextDestroyed(ServletContextEvent arg0) {
        log.trace("contextDestroyed called.");
        try {
            Scheduler scheduler = (Scheduler) arg0.getServletContext().getAttribute("scheduler");
            log.debug("Active jobs to be terminated: {}", scheduler.getCurrentlyExecutingJobs());

            JobKey key = JobKey.jobKey("ALFEmitter");
            JobKey cleanerkey = JobKey.jobKey("EventLogCleaner");
            synchronized (scheduler) {
                if (!scheduler.isShutdown() && scheduler.checkExists(key) ) {
                    scheduler.interrupt(key);
                    scheduler.deleteJob(key);
                    log.info("Gracefully stopped the ALFEventEmitter job.");
                }
                if (!scheduler.isShutdown() && scheduler.checkExists(cleanerkey) ) {
                    scheduler.interrupt(cleanerkey);
                    scheduler.deleteJob(cleanerkey);
                    log.info("Gracefully stopped the ALFEventEmitter job.");
                }
                if (!scheduler.isShutdown()) {
                    scheduler.shutdown(true);
                }
            }
        } catch (SchedulerException e) {
            log.error("Failed to stop the ALFEmitter job: {}", e.getMessage());
            throw new RuntimeException(e);
        }

        log.info("ServletContextListener destroyed. Saving in-memory database to file based database.");
        int act_i = mempool.getActiveConnections();
        if (act_i > 0) {
            log.warn("There are still {} connections to the XServices in-memory database active.", act_i);
        }

        try {
            log.info("Create/Re-open file based database to persist memory database.");
            Connection con = fdbpool.getConnection();
            Statement s = con.createStatement();

            final String insert = "INSERT INTO brutex.tbl_events SELECT * from MEM_INBOUND UNION SELECT " + "btx_event_type, btx_id, btx_obj_type, btx_obj_id, btx_timestamp, btx_event from MEM_OUTBOUND;";
            s.execute(insert);
            int count = s.getUpdateCount();
            log.info("Persisted {} active event rows in file-based database.", count);

            final String save_all = "INSERT INTO brutex.tbl_events_all SELECT * from MEM_ALL_EVENTS;";
            s.execute(save_all);
            count = s.getUpdateCount();
            log.info("Persisted {} event rows from all_events log in file-based database", count);

            log.info("Shutting down in-memory database. Closing file-based database. Please wait ...");
            s.execute("SHUTDOWN;");

            con.close();
            log.info("Shutting down databases complete.");
        } catch (SQLException e) {
            log.error("An error occurred during database persistence: {}", e.getMessage());
            throw new RuntimeException(e);
        }
        log.debug("Handled {} egress events.", arg0.getServletContext().getAttribute("egres_counter"));
        log.debug("Handled {} ingress events.", arg0.getServletContext().getAttribute("ingres_counter"));
    }

    //Run this before web application is started
    @Override
    public void contextInitialized(ServletContextEvent arg0) {
        log.debug("ServletContextListener started");
        ServletContext context = arg0.getServletContext();
        readConfiguration(context);

        context.setAttribute("mdbConnection", mempool);
        context.setAttribute("fdbConnection", fdbpool);
        context.setAttribute("ingres_counter", 0);
        AtomicLong egres = new AtomicLong(0);
        context.setAttribute("egres_counter", egres);
        context.setAttribute("ingres_counter", new AtomicLong(0));
        try {
            StdSchedulerFactory fact = new StdSchedulerFactory();
            fact.initialize("MiscServicesScheduler-quartz.properties");
            Scheduler scheduler = fact.getScheduler();
            scheduler.start();
            context.setAttribute("scheduler", scheduler);
        } catch (SchedulerException e) {
            log.error("Error creating scheduler within ServletContext: {}", e.getMessage());
            throw new RuntimeException(e);
        }

        //Load events from file based database into in-memory database
        try {
            log.info("Start recovery of previously unsend alf events. Trying to load them into in-memory database.");
            final String link = getLinkSQL();
            final String recoverSQL = "INSERT INTO MEM_INBOUND DIRECT SELECT * FROM brutex.tbl_events;";
            final String truncate = "TRUNCATE TABLE brutex.tbl_events;";
            int count = 0;
            Connection con = fdbpool.getConnection();
            con.setAutoCommit(false);
            Statement statement = con.createStatement();
            statement.execute(link);
            con.commit();
            ResultSet rs = statement.executeQuery("SELECT COUNT(1) FROM brutex.tbl_events");
            if(rs.next()) count = rs.getInt(1);
            statement.execute(recoverSQL);
            log.info("Recovered {} events and loaded them into in-memory database.", count);
            statement.execute(truncate);
            con.commit();
            con.close();
        } catch (SQLException e) {
            log.error("Exception during recovery of events from previous runs: {}", e.getMessage());
            throw new RuntimeException(e);
        }
        //Start initial run of the emitter
        if(configuration.isEmitterActive()) {
            startEmitterImmediate(egres, (Scheduler) context.getAttribute("scheduler"));
        }

        //Start mem db log cleaner
        if(configuration.getCleaner_interval()>0) {
            startEventLogCleaner((Scheduler) context.getAttribute("scheduler"));
        }
    }

    private String getLinkSQL() {
        final String dbDriverClass = "org.h2.Driver";
        final String link = "CREATE LINKED TABLE IF NOT EXISTS MEM_INBOUND('"+dbDriverClass+"', '"+configuration.getJdbc_memdb()+"', '', '', 'brutex.tbl_events'); " +
                            "CREATE LINKED TABLE IF NOT EXISTS MEM_OUTBOUND('"+dbDriverClass+"', '"+configuration.getJdbc_memdb()+"', '', '', 'brutex.tbl_events_snap'); " +
                            "CREATE LINKED TABLE IF NOT EXISTS MEM_ALL_EVENTS('"+dbDriverClass+"', '"+configuration.getJdbc_memdb()+"', '', '', 'brutex.tbl_events_all') AUTOCOMMIT OFF; " +
                            "";
        return link;
    }

    private synchronized void startEmitterImmediate(AtomicLong egres, Scheduler scheduler) {
        try {
            if (!scheduler.checkExists(JobKey.jobKey("ALFEmitter"))) {
                JobDetail job2 = JobBuilder.newJob(EventEmitter.class).withIdentity("ALFEmitter").build();
                job2.getJobDataMap().put("mdbConnection", mempool);
                job2.getJobDataMap().put("fdbConnection", fdbpool);
                job2.getJobDataMap().put("run_key", Instant.now().toEpochMilli());
                job2.getJobDataMap().put("egres_counter", egres);
                job2.getJobDataMap().put(EventmanagerConfiguration.KEY, EventmanagerConfiguration.getInstance());
                SimpleTrigger t = (SimpleTrigger) newTrigger().withIdentity("ALFEmitter").startNow().build();
                scheduler.scheduleJob(job2, t);
            }
        } catch (SchedulerException ex) {
            log.error("Could not start EventEmitter to process existing queue directly after startup: {}", ex.getMessage());
        }
    }

    private void startEventLogCleaner(Scheduler scheduler) {
        try {
            if (!scheduler.checkExists(JobKey.jobKey("EventLogCleaner"))) {
                JobDetail job2 = JobBuilder.newJob(EventLogCleanerJob.class).withIdentity("EventLogCleaner").build();
                job2.getJobDataMap().put("mdbConnection", mempool);
                job2.getJobDataMap().put("fdbConnection", fdbpool);
                job2.getJobDataMap().put("run_key", Instant.now().toEpochMilli());
                job2.getJobDataMap().put(EventmanagerConfiguration.KEY, EventmanagerConfiguration.getInstance());
                SimpleTrigger t = (SimpleTrigger) newTrigger().withIdentity("EventLogCleaner")
                        .withSchedule(SimpleScheduleBuilder.simpleSchedule()
                                .withIntervalInMinutes(configuration.getCleaner_interval())
                                .repeatForever())
                        .startAt(Date.from(Instant.now().plus(configuration.getCleaner_interval(), ChronoUnit.MINUTES)))
                        .build();
                scheduler.scheduleJob(job2, t);
            }
        } catch (SchedulerException ex) {
            log.error("Could not start EventLogCleaner to process existing queue directly after startup: {}", ex.getMessage());
        }
    }

    private void readConfiguration(ServletContext ctx) {
                /* Configure ServletContext attributes using configuration object*/
                EventmanagerConfiguration c = EventmanagerConfiguration.getInstance().refreshConfig();
                ctx.setAttribute(EventmanagerConfiguration.KEY, c);
    }

}

Generated by GNU Enscript 1.6.5.90.