Subversion Repositories XServices

Rev

Rev 201 | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
201 brianR 1
package net.brutex.xservices.util;
2
 
3
 
4
import lombok.extern.slf4j.Slf4j;
5
import org.h2.jdbcx.JdbcConnectionPool;
6
import org.quartz.*;
7
import org.quartz.impl.StdSchedulerFactory;
8
 
9
import javax.servlet.ServletContext;
10
import javax.servlet.ServletContextEvent;
11
import javax.servlet.ServletContextListener;
12
import javax.servlet.annotation.WebListener;
13
import java.sql.Connection;
14
import java.sql.ResultSet;
15
import java.sql.SQLException;
16
import java.sql.Statement;
17
import java.time.Instant;
203 brianR 18
import java.time.temporal.ChronoUnit;
19
import java.util.Date;
201 brianR 20
import java.util.concurrent.atomic.AtomicLong;
21
 
22
import static org.quartz.TriggerBuilder.newTrigger;
23
 
24
// For Servlet 3.x containers, the listener can be annotated with @WebListener; there is no need to declare it in web.xml.
25
 
26
/**
27
 * Handle servlet lifecycle actions for the MiscService servlet, such as
28
 * initializing in-memory database, persist on shutdown etc.
29
 */
30
 
31
 
32
@WebListener
33
@Slf4j
34
public class MiscServiceServletContextListener implements ServletContextListener {
35
 
36
    /**
37
     * SQL initialization for in-memory database
38
     * INIT=RUNSCRIPT FROM 'classpath:scripts/create.sql'"
39
     */
40
    private final static String dbinit = "RUNSCRIPT FROM 'classpath:ddl/BRTX_schema.ddl';";
41
 
42
    private final EventmanagerConfiguration configuration = EventmanagerConfiguration.getInstance().refreshConfig();
43
    private final JdbcConnectionPool mempool = getDbPool(configuration.getJdbc_memdb());
44
    private final JdbcConnectionPool fdbpool = getDbPool(configuration.getJdbc_filedb());
45
 
46
    /**
47
     * Create DB connection pool and initialize the in-memory database with schema.
48
     *
49
     * @return connection pool
50
     */
51
    private static JdbcConnectionPool getDbPool(String dbConnectString) {
52
        JdbcConnectionPool p = JdbcConnectionPool.create(dbConnectString, "", "");
203 brianR 53
        p.setMaxConnections(256);
54
        p.setLoginTimeout(20);
55
        try (Connection c = p.getConnection();){
201 brianR 56
            Statement s = c.createStatement();
57
            s.execute(dbinit);
203 brianR 58
            c.commit();
201 brianR 59
            log.trace("Running SQL against database '{}': '{}'", dbConnectString, dbinit);
60
            c.close();
61
            log.debug("Successfully created schema for database 'Brutex' at '{}'.", dbConnectString);
62
        } catch (SQLException e) {
63
            log.error("Error creating the schema for database 'Brutex' using '{}': {}", dbConnectString, e.getMessage());
64
            throw new RuntimeException(e);
65
        }
66
        return p;
67
    }
68
 
69
    @Override
70
    public void contextDestroyed(ServletContextEvent arg0) {
71
        log.trace("contextDestroyed called.");
72
        try {
73
            Scheduler scheduler = (Scheduler) arg0.getServletContext().getAttribute("scheduler");
74
            log.debug("Active jobs to be terminated: {}", scheduler.getCurrentlyExecutingJobs());
75
 
76
            JobKey key = JobKey.jobKey("ALFEmitter");
203 brianR 77
            JobKey cleanerkey = JobKey.jobKey("EventLogCleaner");
201 brianR 78
            synchronized (scheduler) {
79
                if (!scheduler.isShutdown() && scheduler.checkExists(key) ) {
80
                    scheduler.interrupt(key);
81
                    scheduler.deleteJob(key);
82
                    log.info("Gracefully stopped the ALFEventEmitter job.");
83
                }
203 brianR 84
                if (!scheduler.isShutdown() && scheduler.checkExists(cleanerkey) ) {
85
                    scheduler.interrupt(cleanerkey);
86
                    scheduler.deleteJob(cleanerkey);
87
                    log.info("Gracefully stopped the ALFEventEmitter job.");
88
                }
201 brianR 89
                if (!scheduler.isShutdown()) {
90
                    scheduler.shutdown(true);
91
                }
92
            }
93
        } catch (SchedulerException e) {
94
            log.error("Failed to stop the ALFEmitter job: {}", e.getMessage());
95
            throw new RuntimeException(e);
96
        }
97
 
98
        log.info("ServletContextListener destroyed. Saving in-memory database to file based database.");
99
        int act_i = mempool.getActiveConnections();
100
        if (act_i > 0) {
101
            log.warn("There are still {} connections to the XServices in-memory database active.", act_i);
102
        }
103
 
104
        try {
105
            log.info("Create/Re-open file based database to persist memory database.");
106
            Connection con = fdbpool.getConnection();
107
            Statement s = con.createStatement();
108
 
203 brianR 109
            final String insert = "INSERT INTO brutex.tbl_events SELECT * from MEM_INBOUND UNION SELECT " + "btx_event_type, btx_id, btx_obj_type, btx_obj_id, btx_timestamp, btx_event from MEM_OUTBOUND;";
201 brianR 110
            s.execute(insert);
111
            int count = s.getUpdateCount();
203 brianR 112
            log.info("Persisted {} active event rows in file-based database.", count);
113
 
114
            final String save_all = "INSERT INTO brutex.tbl_events_all SELECT * from MEM_ALL_EVENTS;";
115
            s.execute(save_all);
116
            count = s.getUpdateCount();
117
            log.info("Persisted {} event rows from all_events log in file-based database", count);
118
 
201 brianR 119
            log.info("Shutting down in-memory database. Closing file-based database. Please wait ...");
120
            s.execute("SHUTDOWN;");
203 brianR 121
 
201 brianR 122
            con.close();
123
            log.info("Shutting down databases complete.");
124
        } catch (SQLException e) {
125
            log.error("An error occurred during database persistence: {}", e.getMessage());
126
            throw new RuntimeException(e);
127
        }
128
        log.debug("Handled {} egress events.", arg0.getServletContext().getAttribute("egres_counter"));
129
        log.debug("Handled {} ingress events.", arg0.getServletContext().getAttribute("ingres_counter"));
130
    }
131
 
132
    //Run this before web application is started
133
    @Override
134
    public void contextInitialized(ServletContextEvent arg0) {
135
        log.debug("ServletContextListener started");
136
        ServletContext context = arg0.getServletContext();
137
        readConfiguration(context);
138
 
139
        context.setAttribute("mdbConnection", mempool);
140
        context.setAttribute("fdbConnection", fdbpool);
141
        context.setAttribute("ingres_counter", 0);
142
        AtomicLong egres = new AtomicLong(0);
143
        context.setAttribute("egres_counter", egres);
144
        context.setAttribute("ingres_counter", new AtomicLong(0));
145
        try {
146
            StdSchedulerFactory fact = new StdSchedulerFactory();
147
            fact.initialize("MiscServicesScheduler-quartz.properties");
148
            Scheduler scheduler = fact.getScheduler();
149
            scheduler.start();
150
            context.setAttribute("scheduler", scheduler);
151
        } catch (SchedulerException e) {
152
            log.error("Error creating scheduler within ServletContext: {}", e.getMessage());
153
            throw new RuntimeException(e);
154
        }
155
 
156
        //Load events from file based database into in-memory database
157
        try {
158
            log.info("Start recovery of previously unsend alf events. Trying to load them into in-memory database.");
203 brianR 159
            final String link = getLinkSQL();
160
            final String recoverSQL = "INSERT INTO MEM_INBOUND DIRECT SELECT * FROM brutex.tbl_events;";
201 brianR 161
            final String truncate = "TRUNCATE TABLE brutex.tbl_events;";
162
            int count = 0;
163
            Connection con = fdbpool.getConnection();
164
            con.setAutoCommit(false);
165
            Statement statement = con.createStatement();
166
            statement.execute(link);
167
            con.commit();
168
            ResultSet rs = statement.executeQuery("SELECT COUNT(1) FROM brutex.tbl_events");
169
            if(rs.next()) count = rs.getInt(1);
170
            statement.execute(recoverSQL);
171
            log.info("Recovered {} events and loaded them into in-memory database.", count);
172
            statement.execute(truncate);
173
            con.commit();
174
            con.close();
175
        } catch (SQLException e) {
176
            log.error("Exception during recovery of events from previous runs: {}", e.getMessage());
177
            throw new RuntimeException(e);
178
        }
179
        //Start initial run of the emitter
203 brianR 180
        if(configuration.isEmitterActive()) {
181
            startEmitterImmediate(egres, (Scheduler) context.getAttribute("scheduler"));
182
        }
183
 
184
        //Start mem db log cleaner
185
        if(configuration.getCleaner_interval()>0) {
186
            startEventLogCleaner((Scheduler) context.getAttribute("scheduler"));
187
        }
201 brianR 188
    }
189
 
203 brianR 190
    private String getLinkSQL() {
191
        final String dbDriverClass = "org.h2.Driver";
192
        final String link = "CREATE LINKED TABLE IF NOT EXISTS MEM_INBOUND('"+dbDriverClass+"', '"+configuration.getJdbc_memdb()+"', '', '', 'brutex.tbl_events'); " +
193
                            "CREATE LINKED TABLE IF NOT EXISTS MEM_OUTBOUND('"+dbDriverClass+"', '"+configuration.getJdbc_memdb()+"', '', '', 'brutex.tbl_events_snap'); " +
194
                            "CREATE LINKED TABLE IF NOT EXISTS MEM_ALL_EVENTS('"+dbDriverClass+"', '"+configuration.getJdbc_memdb()+"', '', '', 'brutex.tbl_events_all') AUTOCOMMIT OFF; " +
195
                            "";
196
        return link;
197
    }
198
 
199
    private synchronized void startEmitterImmediate(AtomicLong egres, Scheduler scheduler) {
201 brianR 200
        try {
201
            if (!scheduler.checkExists(JobKey.jobKey("ALFEmitter"))) {
202
                JobDetail job2 = JobBuilder.newJob(EventEmitter.class).withIdentity("ALFEmitter").build();
203
                job2.getJobDataMap().put("mdbConnection", mempool);
204
                job2.getJobDataMap().put("fdbConnection", fdbpool);
205
                job2.getJobDataMap().put("run_key", Instant.now().toEpochMilli());
203 brianR 206
                job2.getJobDataMap().put("egres_counter", egres);
201 brianR 207
                job2.getJobDataMap().put(EventmanagerConfiguration.KEY, EventmanagerConfiguration.getInstance());
208
                SimpleTrigger t = (SimpleTrigger) newTrigger().withIdentity("ALFEmitter").startNow().build();
209
                scheduler.scheduleJob(job2, t);
210
            }
211
        } catch (SchedulerException ex) {
212
            log.error("Could not start EventEmitter to process existing queue directly after startup: {}", ex.getMessage());
213
        }
214
    }
215
 
203 brianR 216
    private void startEventLogCleaner(Scheduler scheduler) {
217
        try {
218
            if (!scheduler.checkExists(JobKey.jobKey("EventLogCleaner"))) {
219
                JobDetail job2 = JobBuilder.newJob(EventLogCleanerJob.class).withIdentity("EventLogCleaner").build();
220
                job2.getJobDataMap().put("mdbConnection", mempool);
221
                job2.getJobDataMap().put("fdbConnection", fdbpool);
222
                job2.getJobDataMap().put("run_key", Instant.now().toEpochMilli());
223
                job2.getJobDataMap().put(EventmanagerConfiguration.KEY, EventmanagerConfiguration.getInstance());
224
                SimpleTrigger t = (SimpleTrigger) newTrigger().withIdentity("EventLogCleaner")
225
                        .withSchedule(SimpleScheduleBuilder.simpleSchedule()
226
                                .withIntervalInMinutes(configuration.getCleaner_interval())
227
                                .repeatForever())
228
                        .startAt(Date.from(Instant.now().plus(configuration.getCleaner_interval(), ChronoUnit.MINUTES)))
229
                        .build();
230
                scheduler.scheduleJob(job2, t);
231
            }
232
        } catch (SchedulerException ex) {
233
            log.error("Could not start EventLogCleaner to process existing queue directly after startup: {}", ex.getMessage());
234
        }
235
    }
236
 
201 brianR 237
    private void readConfiguration(ServletContext ctx) {
238
                /* Configure ServletContext attributes using configuration object*/
239
                EventmanagerConfiguration c = EventmanagerConfiguration.getInstance().refreshConfig();
240
                ctx.setAttribute(EventmanagerConfiguration.KEY, c);
241
    }
242
 
243
}