Subversion Repositories XServices

Rev

Go to most recent revision | Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
201 brianR 1
package net.brutex.xservices.util;
2
 
3
 
4
import lombok.extern.slf4j.Slf4j;
5
import org.h2.jdbcx.JdbcConnectionPool;
6
import org.quartz.*;
7
import org.quartz.impl.StdSchedulerFactory;
8
 
9
import javax.servlet.ServletContext;
10
import javax.servlet.ServletContextEvent;
11
import javax.servlet.ServletContextListener;
12
import javax.servlet.annotation.WebListener;
13
import java.sql.Connection;
14
import java.sql.ResultSet;
15
import java.sql.SQLException;
16
import java.sql.Statement;
17
import java.time.Instant;
18
import java.util.concurrent.atomic.AtomicLong;
19
 
20
import static org.quartz.TriggerBuilder.newTrigger;
21
 
22
//For Servlet container 3.x, you can annotate the listener with @WebListener, no need to declares in web.xml.
23
 
24
/**
25
 * Handle servlet lifecycle actions for the MiscService servlet, such as
26
 * initializing in-memory database, persist on shutdown etc.
27
 */
28
 
29
 
30
@WebListener
31
@Slf4j
32
public class MiscServiceServletContextListener implements ServletContextListener {
33
 
34
    /**
35
     * SQL initialization for in-memory database
36
     * INIT=RUNSCRIPT FROM 'classpath:scripts/create.sql'"
37
     */
38
    private final static String dbinit = "RUNSCRIPT FROM 'classpath:ddl/BRTX_schema.ddl';";
39
 
40
    private final EventmanagerConfiguration configuration = EventmanagerConfiguration.getInstance().refreshConfig();
41
    private final JdbcConnectionPool mempool = getDbPool(configuration.getJdbc_memdb());
42
    private final JdbcConnectionPool fdbpool = getDbPool(configuration.getJdbc_filedb());
43
 
44
    /**
45
     * Create DB connection pool and initialize the in-memory database with schema.
46
     *
47
     * @return connection pool
48
     */
49
    private static JdbcConnectionPool getDbPool(String dbConnectString) {
50
        JdbcConnectionPool p = JdbcConnectionPool.create(dbConnectString, "", "");
51
        p.setMaxConnections(16);
52
        p.setLoginTimeout(5);
53
        try {
54
            Connection c = p.getConnection();
55
            Statement s = c.createStatement();
56
            s.execute(dbinit);
57
            log.trace("Running SQL against database '{}': '{}'", dbConnectString, dbinit);
58
            c.close();
59
            log.debug("Successfully created schema for database 'Brutex' at '{}'.", dbConnectString);
60
        } catch (SQLException e) {
61
            log.error("Error creating the schema for database 'Brutex' using '{}': {}", dbConnectString, e.getMessage());
62
            throw new RuntimeException(e);
63
        }
64
        return p;
65
    }
66
 
67
    @Override
68
    public void contextDestroyed(ServletContextEvent arg0) {
69
        log.trace("contextDestroyed called.");
70
        try {
71
            Scheduler scheduler = (Scheduler) arg0.getServletContext().getAttribute("scheduler");
72
            log.debug("Active jobs to be terminated: {}", scheduler.getCurrentlyExecutingJobs());
73
 
74
            JobKey key = JobKey.jobKey("ALFEmitter");
75
            synchronized (scheduler) {
76
                if (!scheduler.isShutdown() && scheduler.checkExists(key) ) {
77
                    scheduler.interrupt(key);
78
                    scheduler.deleteJob(key);
79
                    log.info("Gracefully stopped the ALFEventEmitter job.");
80
                }
81
                if (!scheduler.isShutdown()) {
82
                    scheduler.shutdown(true);
83
                }
84
            }
85
        } catch (SchedulerException e) {
86
            log.error("Failed to stop the ALFEmitter job: {}", e.getMessage());
87
            throw new RuntimeException(e);
88
        }
89
 
90
        log.info("ServletContextListener destroyed. Saving in-memory database to file based database.");
91
        int act_i = mempool.getActiveConnections();
92
        if (act_i > 0) {
93
            log.warn("There are still {} connections to the XServices in-memory database active.", act_i);
94
        }
95
 
96
        try {
97
            log.info("Create/Re-open file based database to persist memory database.");
98
            Connection con = fdbpool.getConnection();
99
            Statement s = con.createStatement();
100
 
101
            final String insert = "INSERT INTO brutex.tbl_events SELECT * from LINK UNION SELECT " + "btx_event_type, btx_id, btx_obj_type, btx_obj_id, btx_timestamp, btx_event from LINK2;";
102
            s.execute(insert);
103
            int count = s.getUpdateCount();
104
            log.info("Persisted {} rows in file-based database.", count);
105
            log.info("Shutting down in-memory database. Closing file-based database. Please wait ...");
106
            s.execute("SHUTDOWN;");
107
            con.close();
108
            log.info("Shutting down databases complete.");
109
        } catch (SQLException e) {
110
            log.error("An error occurred during database persistence: {}", e.getMessage());
111
            throw new RuntimeException(e);
112
        }
113
        log.debug("Handled {} egress events.", arg0.getServletContext().getAttribute("egres_counter"));
114
        log.debug("Handled {} ingress events.", arg0.getServletContext().getAttribute("ingres_counter"));
115
    }
116
 
117
    //Run this before web application is started
118
    @Override
119
    public void contextInitialized(ServletContextEvent arg0) {
120
        log.debug("ServletContextListener started");
121
        ServletContext context = arg0.getServletContext();
122
        readConfiguration(context);
123
 
124
        context.setAttribute("mdbConnection", mempool);
125
        context.setAttribute("fdbConnection", fdbpool);
126
        context.setAttribute("ingres_counter", 0);
127
        AtomicLong egres = new AtomicLong(0);
128
        context.setAttribute("egres_counter", egres);
129
        context.setAttribute("ingres_counter", new AtomicLong(0));
130
        try {
131
            StdSchedulerFactory fact = new StdSchedulerFactory();
132
            fact.initialize("MiscServicesScheduler-quartz.properties");
133
            Scheduler scheduler = fact.getScheduler();
134
            scheduler.start();
135
            context.setAttribute("scheduler", scheduler);
136
        } catch (SchedulerException e) {
137
            log.error("Error creating scheduler within ServletContext: {}", e.getMessage());
138
            throw new RuntimeException(e);
139
        }
140
 
141
        //Load events from file based database into in-memory database
142
        try {
143
            log.info("Start recovery of previously unsend alf events. Trying to load them into in-memory database.");
144
            final String link = "CREATE LINKED TABLE IF NOT EXISTS LINK('org.h2.Driver', '"+configuration.getJdbc_memdb()+"', '', '', 'brutex.tbl_events'); " +
145
                                "CREATE LINKED TABLE IF NOT EXISTS LINK2('org.h2.Driver', '"+configuration.getJdbc_memdb()+"', '', '', 'brutex.tbl_events_snap');";
146
            final String recoverSQL = "INSERT INTO LINK DIRECT SELECT * FROM brutex.tbl_events;";
147
            final String truncate = "TRUNCATE TABLE brutex.tbl_events;";
148
            int count = 0;
149
            Connection con = fdbpool.getConnection();
150
            con.setAutoCommit(false);
151
            Statement statement = con.createStatement();
152
            statement.execute(link);
153
            con.commit();
154
            ResultSet rs = statement.executeQuery("SELECT COUNT(1) FROM brutex.tbl_events");
155
            if(rs.next()) count = rs.getInt(1);
156
            statement.execute(recoverSQL);
157
            log.info("Recovered {} events and loaded them into in-memory database.", count);
158
            statement.execute(truncate);
159
            con.commit();
160
            con.close();
161
        } catch (SQLException e) {
162
            log.error("Exception during recovery of events from previous runs: {}", e.getMessage());
163
            throw new RuntimeException(e);
164
        }
165
        //Start initial run of the emitter
166
        startEmitterImmediate(egres, (Scheduler) context.getAttribute("scheduler"));
167
    }
168
 
169
    private synchronized void startEmitterImmediate(AtomicLong egres_counter, Scheduler scheduler) {
170
        try {
171
            if (!scheduler.checkExists(JobKey.jobKey("ALFEmitter"))) {
172
                JobDetail job2 = JobBuilder.newJob(EventEmitter.class).withIdentity("ALFEmitter").build();
173
                job2.getJobDataMap().put("mdbConnection", mempool);
174
                job2.getJobDataMap().put("fdbConnection", fdbpool);
175
                job2.getJobDataMap().put("run_key", Instant.now().toEpochMilli());
176
                job2.getJobDataMap().put("egres_counter", egres_counter);
177
                job2.getJobDataMap().put(EventmanagerConfiguration.KEY, EventmanagerConfiguration.getInstance());
178
                SimpleTrigger t = (SimpleTrigger) newTrigger().withIdentity("ALFEmitter").startNow().build();
179
                scheduler.scheduleJob(job2, t);
180
            }
181
        } catch (SchedulerException ex) {
182
            log.error("Could not start EventEmitter to process existing queue directly after startup: {}", ex.getMessage());
183
        }
184
    }
185
 
186
    private void readConfiguration(ServletContext ctx) {
187
                /* Configure ServletContext attributes using configuration object*/
188
                EventmanagerConfiguration c = EventmanagerConfiguration.getInstance().refreshConfig();
189
                ctx.setAttribute(EventmanagerConfiguration.KEY, c);
190
    }
191
 
192
}