Subversion Repositories XServices

Compare Revisions

Ignore whitespace Rev 202 → Rev 203

/xservices/trunk/src/main/java/net/brutex/xservices/util/MiscServiceServletContextListener.java
15,6 → 15,8
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Date;
import java.util.concurrent.atomic.AtomicLong;
 
import static org.quartz.TriggerBuilder.newTrigger;
48,12 → 50,12
*/
private static JdbcConnectionPool getDbPool(String dbConnectString) {
JdbcConnectionPool p = JdbcConnectionPool.create(dbConnectString, "", "");
p.setMaxConnections(16);
p.setLoginTimeout(5);
try {
Connection c = p.getConnection();
p.setMaxConnections(256);
p.setLoginTimeout(20);
try (Connection c = p.getConnection();){
Statement s = c.createStatement();
s.execute(dbinit);
c.commit();
log.trace("Running SQL against database '{}': '{}'", dbConnectString, dbinit);
c.close();
log.debug("Successfully created schema for database 'Brutex' at '{}'.", dbConnectString);
72,6 → 74,7
log.debug("Active jobs to be terminated: {}", scheduler.getCurrentlyExecutingJobs());
 
JobKey key = JobKey.jobKey("ALFEmitter");
JobKey cleanerkey = JobKey.jobKey("EventLogCleaner");
synchronized (scheduler) {
if (!scheduler.isShutdown() && scheduler.checkExists(key) ) {
scheduler.interrupt(key);
78,6 → 81,11
scheduler.deleteJob(key);
log.info("Gracefully stopped the ALFEventEmitter job.");
}
if (!scheduler.isShutdown() && scheduler.checkExists(cleanerkey) ) {
scheduler.interrupt(cleanerkey);
scheduler.deleteJob(cleanerkey);
log.info("Gracefully stopped the ALFEventEmitter job.");
}
if (!scheduler.isShutdown()) {
scheduler.shutdown(true);
}
98,12 → 106,19
Connection con = fdbpool.getConnection();
Statement s = con.createStatement();
 
final String insert = "INSERT INTO brutex.tbl_events SELECT * from LINK UNION SELECT " + "btx_event_type, btx_id, btx_obj_type, btx_obj_id, btx_timestamp, btx_event from LINK2;";
final String insert = "INSERT INTO brutex.tbl_events SELECT * from MEM_INBOUND UNION SELECT " + "btx_event_type, btx_id, btx_obj_type, btx_obj_id, btx_timestamp, btx_event from MEM_OUTBOUND;";
s.execute(insert);
int count = s.getUpdateCount();
log.info("Persisted {} rows in file-based database.", count);
log.info("Persisted {} active event rows in file-based database.", count);
 
final String save_all = "INSERT INTO brutex.tbl_events_all SELECT * from MEM_ALL_EVENTS;";
s.execute(save_all);
count = s.getUpdateCount();
log.info("Persisted {} event rows from all_events log in file-based database", count);
 
log.info("Shutting down in-memory database. Closing file-based database. Please wait ...");
s.execute("SHUTDOWN;");
 
con.close();
log.info("Shutting down databases complete.");
} catch (SQLException e) {
141,9 → 156,8
//Load events from file based database into in-memory database
try {
log.info("Start recovery of previously unsend alf events. Trying to load them into in-memory database.");
final String link = "CREATE LINKED TABLE IF NOT EXISTS LINK('org.h2.Driver', '"+configuration.getJdbc_memdb()+"', '', '', 'brutex.tbl_events'); " +
"CREATE LINKED TABLE IF NOT EXISTS LINK2('org.h2.Driver', '"+configuration.getJdbc_memdb()+"', '', '', 'brutex.tbl_events_snap');";
final String recoverSQL = "INSERT INTO LINK DIRECT SELECT * FROM brutex.tbl_events;";
final String link = getLinkSQL();
final String recoverSQL = "INSERT INTO MEM_INBOUND DIRECT SELECT * FROM brutex.tbl_events;";
final String truncate = "TRUNCATE TABLE brutex.tbl_events;";
int count = 0;
Connection con = fdbpool.getConnection();
163,10 → 177,26
throw new RuntimeException(e);
}
//Start initial run of the emitter
startEmitterImmediate(egres, (Scheduler) context.getAttribute("scheduler"));
if(configuration.isEmitterActive()) {
startEmitterImmediate(egres, (Scheduler) context.getAttribute("scheduler"));
}
 
//Start mem db log cleaner
if(configuration.getCleaner_interval()>0) {
startEventLogCleaner((Scheduler) context.getAttribute("scheduler"));
}
}
 
private synchronized void startEmitterImmediate(AtomicLong egres_counter, Scheduler scheduler) {
private String getLinkSQL() {
final String dbDriverClass = "org.h2.Driver";
final String link = "CREATE LINKED TABLE IF NOT EXISTS MEM_INBOUND('"+dbDriverClass+"', '"+configuration.getJdbc_memdb()+"', '', '', 'brutex.tbl_events'); " +
"CREATE LINKED TABLE IF NOT EXISTS MEM_OUTBOUND('"+dbDriverClass+"', '"+configuration.getJdbc_memdb()+"', '', '', 'brutex.tbl_events_snap'); " +
"CREATE LINKED TABLE IF NOT EXISTS MEM_ALL_EVENTS('"+dbDriverClass+"', '"+configuration.getJdbc_memdb()+"', '', '', 'brutex.tbl_events_all') AUTOCOMMIT OFF; " +
"";
return link;
}
 
private synchronized void startEmitterImmediate(AtomicLong egres, Scheduler scheduler) {
try {
if (!scheduler.checkExists(JobKey.jobKey("ALFEmitter"))) {
JobDetail job2 = JobBuilder.newJob(EventEmitter.class).withIdentity("ALFEmitter").build();
173,7 → 203,7
job2.getJobDataMap().put("mdbConnection", mempool);
job2.getJobDataMap().put("fdbConnection", fdbpool);
job2.getJobDataMap().put("run_key", Instant.now().toEpochMilli());
job2.getJobDataMap().put("egres_counter", egres_counter);
job2.getJobDataMap().put("egres_counter", egres);
job2.getJobDataMap().put(EventmanagerConfiguration.KEY, EventmanagerConfiguration.getInstance());
SimpleTrigger t = (SimpleTrigger) newTrigger().withIdentity("ALFEmitter").startNow().build();
scheduler.scheduleJob(job2, t);
183,6 → 213,27
}
}
 
private void startEventLogCleaner(Scheduler scheduler) {
try {
if (!scheduler.checkExists(JobKey.jobKey("EventLogCleaner"))) {
JobDetail job2 = JobBuilder.newJob(EventLogCleanerJob.class).withIdentity("EventLogCleaner").build();
job2.getJobDataMap().put("mdbConnection", mempool);
job2.getJobDataMap().put("fdbConnection", fdbpool);
job2.getJobDataMap().put("run_key", Instant.now().toEpochMilli());
job2.getJobDataMap().put(EventmanagerConfiguration.KEY, EventmanagerConfiguration.getInstance());
SimpleTrigger t = (SimpleTrigger) newTrigger().withIdentity("EventLogCleaner")
.withSchedule(SimpleScheduleBuilder.simpleSchedule()
.withIntervalInMinutes(configuration.getCleaner_interval())
.repeatForever())
.startAt(Date.from(Instant.now().plus(configuration.getCleaner_interval(), ChronoUnit.MINUTES)))
.build();
scheduler.scheduleJob(job2, t);
}
} catch (SchedulerException ex) {
log.error("Could not start EventLogCleaner to process existing queue directly after startup: {}", ex.getMessage());
}
}
 
private void readConfiguration(ServletContext ctx) {
/* Configure ServletContext attributes using configuration object*/
EventmanagerConfiguration c = EventmanagerConfiguration.getInstance().refreshConfig();