Subversion Repositories XServices

Compare Revisions

Ignore whitespace Rev 200 → Rev 201

/xservices/trunk/src/main/java/net/brutex/xservices/util/MiscServiceServletContextListener.java
0,0 → 1,192
package net.brutex.xservices.util;
 
 
import lombok.extern.slf4j.Slf4j;
import org.h2.jdbcx.JdbcConnectionPool;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
 
import javax.servlet.ServletContext;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import javax.servlet.annotation.WebListener;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicLong;
 
import static org.quartz.TriggerBuilder.newTrigger;
 
// For Servlet 3.x containers, annotating the listener with @WebListener is sufficient; it does not need to be declared in web.xml.
 
/**
* Handle servlet lifecycle actions for the MiscService servlet, such as
* initializing in-memory database, persist on shutdown etc.
*/
 
 
@WebListener
@Slf4j
public class MiscServiceServletContextListener implements ServletContextListener {
 
/**
* SQL initialization for in-memory database
* INIT=RUNSCRIPT FROM 'classpath:scripts/create.sql'"
*/
private final static String dbinit = "RUNSCRIPT FROM 'classpath:ddl/BRTX_schema.ddl';";
 
private final EventmanagerConfiguration configuration = EventmanagerConfiguration.getInstance().refreshConfig();
private final JdbcConnectionPool mempool = getDbPool(configuration.getJdbc_memdb());
private final JdbcConnectionPool fdbpool = getDbPool(configuration.getJdbc_filedb());
 
/**
* Create DB connection pool and initialize the in-memory database with schema.
*
* @return connection pool
*/
private static JdbcConnectionPool getDbPool(String dbConnectString) {
JdbcConnectionPool p = JdbcConnectionPool.create(dbConnectString, "", "");
p.setMaxConnections(16);
p.setLoginTimeout(5);
try {
Connection c = p.getConnection();
Statement s = c.createStatement();
s.execute(dbinit);
log.trace("Running SQL against database '{}': '{}'", dbConnectString, dbinit);
c.close();
log.debug("Successfully created schema for database 'Brutex' at '{}'.", dbConnectString);
} catch (SQLException e) {
log.error("Error creating the schema for database 'Brutex' using '{}': {}", dbConnectString, e.getMessage());
throw new RuntimeException(e);
}
return p;
}
 
@Override
public void contextDestroyed(ServletContextEvent arg0) {
log.trace("contextDestroyed called.");
try {
Scheduler scheduler = (Scheduler) arg0.getServletContext().getAttribute("scheduler");
log.debug("Active jobs to be terminated: {}", scheduler.getCurrentlyExecutingJobs());
 
JobKey key = JobKey.jobKey("ALFEmitter");
synchronized (scheduler) {
if (!scheduler.isShutdown() && scheduler.checkExists(key) ) {
scheduler.interrupt(key);
scheduler.deleteJob(key);
log.info("Gracefully stopped the ALFEventEmitter job.");
}
if (!scheduler.isShutdown()) {
scheduler.shutdown(true);
}
}
} catch (SchedulerException e) {
log.error("Failed to stop the ALFEmitter job: {}", e.getMessage());
throw new RuntimeException(e);
}
 
log.info("ServletContextListener destroyed. Saving in-memory database to file based database.");
int act_i = mempool.getActiveConnections();
if (act_i > 0) {
log.warn("There are still {} connections to the XServices in-memory database active.", act_i);
}
 
try {
log.info("Create/Re-open file based database to persist memory database.");
Connection con = fdbpool.getConnection();
Statement s = con.createStatement();
 
final String insert = "INSERT INTO brutex.tbl_events SELECT * from LINK UNION SELECT " + "btx_event_type, btx_id, btx_obj_type, btx_obj_id, btx_timestamp, btx_event from LINK2;";
s.execute(insert);
int count = s.getUpdateCount();
log.info("Persisted {} rows in file-based database.", count);
log.info("Shutting down in-memory database. Closing file-based database. Please wait ...");
s.execute("SHUTDOWN;");
con.close();
log.info("Shutting down databases complete.");
} catch (SQLException e) {
log.error("An error occurred during database persistence: {}", e.getMessage());
throw new RuntimeException(e);
}
log.debug("Handled {} egress events.", arg0.getServletContext().getAttribute("egres_counter"));
log.debug("Handled {} ingress events.", arg0.getServletContext().getAttribute("ingres_counter"));
}
 
//Run this before web application is started
@Override
public void contextInitialized(ServletContextEvent arg0) {
log.debug("ServletContextListener started");
ServletContext context = arg0.getServletContext();
readConfiguration(context);
 
context.setAttribute("mdbConnection", mempool);
context.setAttribute("fdbConnection", fdbpool);
context.setAttribute("ingres_counter", 0);
AtomicLong egres = new AtomicLong(0);
context.setAttribute("egres_counter", egres);
context.setAttribute("ingres_counter", new AtomicLong(0));
try {
StdSchedulerFactory fact = new StdSchedulerFactory();
fact.initialize("MiscServicesScheduler-quartz.properties");
Scheduler scheduler = fact.getScheduler();
scheduler.start();
context.setAttribute("scheduler", scheduler);
} catch (SchedulerException e) {
log.error("Error creating scheduler within ServletContext: {}", e.getMessage());
throw new RuntimeException(e);
}
 
//Load events from file based database into in-memory database
try {
log.info("Start recovery of previously unsend alf events. Trying to load them into in-memory database.");
final String link = "CREATE LINKED TABLE IF NOT EXISTS LINK('org.h2.Driver', '"+configuration.getJdbc_memdb()+"', '', '', 'brutex.tbl_events'); " +
"CREATE LINKED TABLE IF NOT EXISTS LINK2('org.h2.Driver', '"+configuration.getJdbc_memdb()+"', '', '', 'brutex.tbl_events_snap');";
final String recoverSQL = "INSERT INTO LINK DIRECT SELECT * FROM brutex.tbl_events;";
final String truncate = "TRUNCATE TABLE brutex.tbl_events;";
int count = 0;
Connection con = fdbpool.getConnection();
con.setAutoCommit(false);
Statement statement = con.createStatement();
statement.execute(link);
con.commit();
ResultSet rs = statement.executeQuery("SELECT COUNT(1) FROM brutex.tbl_events");
if(rs.next()) count = rs.getInt(1);
statement.execute(recoverSQL);
log.info("Recovered {} events and loaded them into in-memory database.", count);
statement.execute(truncate);
con.commit();
con.close();
} catch (SQLException e) {
log.error("Exception during recovery of events from previous runs: {}", e.getMessage());
throw new RuntimeException(e);
}
//Start initial run of the emitter
startEmitterImmediate(egres, (Scheduler) context.getAttribute("scheduler"));
}
 
private synchronized void startEmitterImmediate(AtomicLong egres_counter, Scheduler scheduler) {
try {
if (!scheduler.checkExists(JobKey.jobKey("ALFEmitter"))) {
JobDetail job2 = JobBuilder.newJob(EventEmitter.class).withIdentity("ALFEmitter").build();
job2.getJobDataMap().put("mdbConnection", mempool);
job2.getJobDataMap().put("fdbConnection", fdbpool);
job2.getJobDataMap().put("run_key", Instant.now().toEpochMilli());
job2.getJobDataMap().put("egres_counter", egres_counter);
job2.getJobDataMap().put(EventmanagerConfiguration.KEY, EventmanagerConfiguration.getInstance());
SimpleTrigger t = (SimpleTrigger) newTrigger().withIdentity("ALFEmitter").startNow().build();
scheduler.scheduleJob(job2, t);
}
} catch (SchedulerException ex) {
log.error("Could not start EventEmitter to process existing queue directly after startup: {}", ex.getMessage());
}
}
 
private void readConfiguration(ServletContext ctx) {
/* Configure ServletContext attributes using configuration object*/
EventmanagerConfiguration c = EventmanagerConfiguration.getInstance().refreshConfig();
ctx.setAttribute(EventmanagerConfiguration.KEY, c);
}
 
}