Subversion Repositories XServices

Rev

Rev 201 | Only display areas with differences | Ignore whitespace | Details | Blame | Last modification | View Log | RSS feed

Rev 201 Rev 203
1
package net.brutex.xservices.util;
1
package net.brutex.xservices.util;
2
 
2
 
3
 
3
 
4
import lombok.extern.slf4j.Slf4j;
4
import lombok.extern.slf4j.Slf4j;
5
import org.h2.jdbcx.JdbcConnectionPool;
5
import org.h2.jdbcx.JdbcConnectionPool;
6
import org.quartz.*;
6
import org.quartz.*;
7
import org.quartz.impl.StdSchedulerFactory;
7
import org.quartz.impl.StdSchedulerFactory;
8
 
8
 
9
import javax.servlet.ServletContext;
9
import javax.servlet.ServletContext;
10
import javax.servlet.ServletContextEvent;
10
import javax.servlet.ServletContextEvent;
11
import javax.servlet.ServletContextListener;
11
import javax.servlet.ServletContextListener;
12
import javax.servlet.annotation.WebListener;
12
import javax.servlet.annotation.WebListener;
13
import java.sql.Connection;
13
import java.sql.Connection;
14
import java.sql.ResultSet;
14
import java.sql.ResultSet;
15
import java.sql.SQLException;
15
import java.sql.SQLException;
16
import java.sql.Statement;
16
import java.sql.Statement;
17
import java.time.Instant;
17
import java.time.Instant;
-
 
18
import java.time.temporal.ChronoUnit;
-
 
19
import java.util.Date;
18
import java.util.concurrent.atomic.AtomicLong;
20
import java.util.concurrent.atomic.AtomicLong;
19
 
21
 
20
import static org.quartz.TriggerBuilder.newTrigger;
22
import static org.quartz.TriggerBuilder.newTrigger;
21
 
23
 
22
//For Servlet container 3.x, you can annotate the listener with @WebListener, no need to declares in web.xml.
24
//For Servlet container 3.x, you can annotate the listener with @WebListener, no need to declares in web.xml.
23
 
25
 
24
/**
26
/**
25
 * Handle servlet lifecycle actions for the MiscService servlet, such as
27
 * Handle servlet lifecycle actions for the MiscService servlet, such as
26
 * initializing in-memory database, persist on shutdown etc.
28
 * initializing in-memory database, persist on shutdown etc.
27
 */
29
 */
28
 
30
 
29
 
31
 
30
@WebListener
32
@WebListener
31
@Slf4j
33
@Slf4j
32
public class MiscServiceServletContextListener implements ServletContextListener {
34
public class MiscServiceServletContextListener implements ServletContextListener {
33
 
35
 
34
    /**
36
    /**
35
     * SQL initialization for in-memory database
37
     * SQL initialization for in-memory database
36
     * INIT=RUNSCRIPT FROM 'classpath:scripts/create.sql'"
38
     * INIT=RUNSCRIPT FROM 'classpath:scripts/create.sql'"
37
     */
39
     */
38
    private final static String dbinit = "RUNSCRIPT FROM 'classpath:ddl/BRTX_schema.ddl';";
40
    private final static String dbinit = "RUNSCRIPT FROM 'classpath:ddl/BRTX_schema.ddl';";
39
 
41
 
40
    private final EventmanagerConfiguration configuration = EventmanagerConfiguration.getInstance().refreshConfig();
42
    private final EventmanagerConfiguration configuration = EventmanagerConfiguration.getInstance().refreshConfig();
41
    private final JdbcConnectionPool mempool = getDbPool(configuration.getJdbc_memdb());
43
    private final JdbcConnectionPool mempool = getDbPool(configuration.getJdbc_memdb());
42
    private final JdbcConnectionPool fdbpool = getDbPool(configuration.getJdbc_filedb());
44
    private final JdbcConnectionPool fdbpool = getDbPool(configuration.getJdbc_filedb());
43
 
45
 
44
    /**
46
    /**
45
     * Create DB connection pool and initialize the in-memory database with schema.
47
     * Create DB connection pool and initialize the in-memory database with schema.
46
     *
48
     *
47
     * @return connection pool
49
     * @return connection pool
48
     */
50
     */
49
    private static JdbcConnectionPool getDbPool(String dbConnectString) {
51
    private static JdbcConnectionPool getDbPool(String dbConnectString) {
50
        JdbcConnectionPool p = JdbcConnectionPool.create(dbConnectString, "", "");
52
        JdbcConnectionPool p = JdbcConnectionPool.create(dbConnectString, "", "");
51
        p.setMaxConnections(16);
53
        p.setMaxConnections(256);
52
        p.setLoginTimeout(5);
54
        p.setLoginTimeout(20);
53
        try {
-
 
54
            Connection c = p.getConnection();
55
        try (Connection c = p.getConnection();){
55
            Statement s = c.createStatement();
56
            Statement s = c.createStatement();
56
            s.execute(dbinit);
57
            s.execute(dbinit);
-
 
58
            c.commit();
57
            log.trace("Running SQL against database '{}': '{}'", dbConnectString, dbinit);
59
            log.trace("Running SQL against database '{}': '{}'", dbConnectString, dbinit);
58
            c.close();
60
            c.close();
59
            log.debug("Successfully created schema for database 'Brutex' at '{}'.", dbConnectString);
61
            log.debug("Successfully created schema for database 'Brutex' at '{}'.", dbConnectString);
60
        } catch (SQLException e) {
62
        } catch (SQLException e) {
61
            log.error("Error creating the schema for database 'Brutex' using '{}': {}", dbConnectString, e.getMessage());
63
            log.error("Error creating the schema for database 'Brutex' using '{}': {}", dbConnectString, e.getMessage());
62
            throw new RuntimeException(e);
64
            throw new RuntimeException(e);
63
        }
65
        }
64
        return p;
66
        return p;
65
    }
67
    }
66
 
68
 
67
    @Override
69
    @Override
68
    public void contextDestroyed(ServletContextEvent arg0) {
70
    public void contextDestroyed(ServletContextEvent arg0) {
69
        log.trace("contextDestroyed called.");
71
        log.trace("contextDestroyed called.");
70
        try {
72
        try {
71
            Scheduler scheduler = (Scheduler) arg0.getServletContext().getAttribute("scheduler");
73
            Scheduler scheduler = (Scheduler) arg0.getServletContext().getAttribute("scheduler");
72
            log.debug("Active jobs to be terminated: {}", scheduler.getCurrentlyExecutingJobs());
74
            log.debug("Active jobs to be terminated: {}", scheduler.getCurrentlyExecutingJobs());
73
 
75
 
74
            JobKey key = JobKey.jobKey("ALFEmitter");
76
            JobKey key = JobKey.jobKey("ALFEmitter");
-
 
77
            JobKey cleanerkey = JobKey.jobKey("EventLogCleaner");
75
            synchronized (scheduler) {
78
            synchronized (scheduler) {
76
                if (!scheduler.isShutdown() && scheduler.checkExists(key) ) {
79
                if (!scheduler.isShutdown() && scheduler.checkExists(key) ) {
77
                    scheduler.interrupt(key);
80
                    scheduler.interrupt(key);
78
                    scheduler.deleteJob(key);
81
                    scheduler.deleteJob(key);
79
                    log.info("Gracefully stopped the ALFEventEmitter job.");
82
                    log.info("Gracefully stopped the ALFEventEmitter job.");
80
                }
83
                }
-
 
84
                if (!scheduler.isShutdown() && scheduler.checkExists(cleanerkey) ) {
-
 
85
                    scheduler.interrupt(cleanerkey);
-
 
86
                    scheduler.deleteJob(cleanerkey);
-
 
87
                    log.info("Gracefully stopped the ALFEventEmitter job.");
-
 
88
                }
81
                if (!scheduler.isShutdown()) {
89
                if (!scheduler.isShutdown()) {
82
                    scheduler.shutdown(true);
90
                    scheduler.shutdown(true);
83
                }
91
                }
84
            }
92
            }
85
        } catch (SchedulerException e) {
93
        } catch (SchedulerException e) {
86
            log.error("Failed to stop the ALFEmitter job: {}", e.getMessage());
94
            log.error("Failed to stop the ALFEmitter job: {}", e.getMessage());
87
            throw new RuntimeException(e);
95
            throw new RuntimeException(e);
88
        }
96
        }
89
 
97
 
90
        log.info("ServletContextListener destroyed. Saving in-memory database to file based database.");
98
        log.info("ServletContextListener destroyed. Saving in-memory database to file based database.");
91
        int act_i = mempool.getActiveConnections();
99
        int act_i = mempool.getActiveConnections();
92
        if (act_i > 0) {
100
        if (act_i > 0) {
93
            log.warn("There are still {} connections to the XServices in-memory database active.", act_i);
101
            log.warn("There are still {} connections to the XServices in-memory database active.", act_i);
94
        }
102
        }
95
 
103
 
96
        try {
104
        try {
97
            log.info("Create/Re-open file based database to persist memory database.");
105
            log.info("Create/Re-open file based database to persist memory database.");
98
            Connection con = fdbpool.getConnection();
106
            Connection con = fdbpool.getConnection();
99
            Statement s = con.createStatement();
107
            Statement s = con.createStatement();
100
 
108
 
101
            final String insert = "INSERT INTO brutex.tbl_events SELECT * from LINK UNION SELECT " + "btx_event_type, btx_id, btx_obj_type, btx_obj_id, btx_timestamp, btx_event from LINK2;";
109
            final String insert = "INSERT INTO brutex.tbl_events SELECT * from MEM_INBOUND UNION SELECT " + "btx_event_type, btx_id, btx_obj_type, btx_obj_id, btx_timestamp, btx_event from MEM_OUTBOUND;";
102
            s.execute(insert);
110
            s.execute(insert);
103
            int count = s.getUpdateCount();
111
            int count = s.getUpdateCount();
-
 
112
            log.info("Persisted {} active event rows in file-based database.", count);
-
 
113
 
-
 
114
            final String save_all = "INSERT INTO brutex.tbl_events_all SELECT * from MEM_ALL_EVENTS;";
-
 
115
            s.execute(save_all);
-
 
116
            count = s.getUpdateCount();
-
 
117
            log.info("Persisted {} event rows from all_events log in file-based database", count);
104
            log.info("Persisted {} rows in file-based database.", count);
118
 
105
            log.info("Shutting down in-memory database. Closing file-based database. Please wait ...");
119
            log.info("Shutting down in-memory database. Closing file-based database. Please wait ...");
-
 
120
            s.execute("SHUTDOWN;");
106
            s.execute("SHUTDOWN;");
121
 
107
            con.close();
122
            con.close();
108
            log.info("Shutting down databases complete.");
123
            log.info("Shutting down databases complete.");
109
        } catch (SQLException e) {
124
        } catch (SQLException e) {
110
            log.error("An error occurred during database persistence: {}", e.getMessage());
125
            log.error("An error occurred during database persistence: {}", e.getMessage());
111
            throw new RuntimeException(e);
126
            throw new RuntimeException(e);
112
        }
127
        }
113
        log.debug("Handled {} egress events.", arg0.getServletContext().getAttribute("egres_counter"));
128
        log.debug("Handled {} egress events.", arg0.getServletContext().getAttribute("egres_counter"));
114
        log.debug("Handled {} ingress events.", arg0.getServletContext().getAttribute("ingres_counter"));
129
        log.debug("Handled {} ingress events.", arg0.getServletContext().getAttribute("ingres_counter"));
115
    }
130
    }
116
 
131
 
117
    //Run this before web application is started
132
    //Run this before web application is started
118
    @Override
133
    @Override
119
    public void contextInitialized(ServletContextEvent arg0) {
134
    public void contextInitialized(ServletContextEvent arg0) {
120
        log.debug("ServletContextListener started");
135
        log.debug("ServletContextListener started");
121
        ServletContext context = arg0.getServletContext();
136
        ServletContext context = arg0.getServletContext();
122
        readConfiguration(context);
137
        readConfiguration(context);
123
 
138
 
124
        context.setAttribute("mdbConnection", mempool);
139
        context.setAttribute("mdbConnection", mempool);
125
        context.setAttribute("fdbConnection", fdbpool);
140
        context.setAttribute("fdbConnection", fdbpool);
126
        context.setAttribute("ingres_counter", 0);
141
        context.setAttribute("ingres_counter", 0);
127
        AtomicLong egres = new AtomicLong(0);
142
        AtomicLong egres = new AtomicLong(0);
128
        context.setAttribute("egres_counter", egres);
143
        context.setAttribute("egres_counter", egres);
129
        context.setAttribute("ingres_counter", new AtomicLong(0));
144
        context.setAttribute("ingres_counter", new AtomicLong(0));
130
        try {
145
        try {
131
            StdSchedulerFactory fact = new StdSchedulerFactory();
146
            StdSchedulerFactory fact = new StdSchedulerFactory();
132
            fact.initialize("MiscServicesScheduler-quartz.properties");
147
            fact.initialize("MiscServicesScheduler-quartz.properties");
133
            Scheduler scheduler = fact.getScheduler();
148
            Scheduler scheduler = fact.getScheduler();
134
            scheduler.start();
149
            scheduler.start();
135
            context.setAttribute("scheduler", scheduler);
150
            context.setAttribute("scheduler", scheduler);
136
        } catch (SchedulerException e) {
151
        } catch (SchedulerException e) {
137
            log.error("Error creating scheduler within ServletContext: {}", e.getMessage());
152
            log.error("Error creating scheduler within ServletContext: {}", e.getMessage());
138
            throw new RuntimeException(e);
153
            throw new RuntimeException(e);
139
        }
154
        }
140
 
155
 
141
        //Load events from file based database into in-memory database
156
        //Load events from file based database into in-memory database
142
        try {
157
        try {
143
            log.info("Start recovery of previously unsend alf events. Trying to load them into in-memory database.");
158
            log.info("Start recovery of previously unsend alf events. Trying to load them into in-memory database.");
144
            final String link = "CREATE LINKED TABLE IF NOT EXISTS LINK('org.h2.Driver', '"+configuration.getJdbc_memdb()+"', '', '', 'brutex.tbl_events'); " +
159
            final String link = getLinkSQL();
145
                                "CREATE LINKED TABLE IF NOT EXISTS LINK2('org.h2.Driver', '"+configuration.getJdbc_memdb()+"', '', '', 'brutex.tbl_events_snap');";
-
 
146
            final String recoverSQL = "INSERT INTO LINK DIRECT SELECT * FROM brutex.tbl_events;";
160
            final String recoverSQL = "INSERT INTO MEM_INBOUND DIRECT SELECT * FROM brutex.tbl_events;";
147
            final String truncate = "TRUNCATE TABLE brutex.tbl_events;";
161
            final String truncate = "TRUNCATE TABLE brutex.tbl_events;";
148
            int count = 0;
162
            int count = 0;
149
            Connection con = fdbpool.getConnection();
163
            Connection con = fdbpool.getConnection();
150
            con.setAutoCommit(false);
164
            con.setAutoCommit(false);
151
            Statement statement = con.createStatement();
165
            Statement statement = con.createStatement();
152
            statement.execute(link);
166
            statement.execute(link);
153
            con.commit();
167
            con.commit();
154
            ResultSet rs = statement.executeQuery("SELECT COUNT(1) FROM brutex.tbl_events");
168
            ResultSet rs = statement.executeQuery("SELECT COUNT(1) FROM brutex.tbl_events");
155
            if(rs.next()) count = rs.getInt(1);
169
            if(rs.next()) count = rs.getInt(1);
156
            statement.execute(recoverSQL);
170
            statement.execute(recoverSQL);
157
            log.info("Recovered {} events and loaded them into in-memory database.", count);
171
            log.info("Recovered {} events and loaded them into in-memory database.", count);
158
            statement.execute(truncate);
172
            statement.execute(truncate);
159
            con.commit();
173
            con.commit();
160
            con.close();
174
            con.close();
161
        } catch (SQLException e) {
175
        } catch (SQLException e) {
162
            log.error("Exception during recovery of events from previous runs: {}", e.getMessage());
176
            log.error("Exception during recovery of events from previous runs: {}", e.getMessage());
163
            throw new RuntimeException(e);
177
            throw new RuntimeException(e);
164
        }
178
        }
165
        //Start initial run of the emitter
179
        //Start initial run of the emitter
-
 
180
        if(configuration.isEmitterActive()) {
166
        startEmitterImmediate(egres, (Scheduler) context.getAttribute("scheduler"));
181
            startEmitterImmediate(egres, (Scheduler) context.getAttribute("scheduler"));
-
 
182
        }
-
 
183
 
-
 
184
        //Start mem db log cleaner
-
 
185
        if(configuration.getCleaner_interval()>0) {
-
 
186
            startEventLogCleaner((Scheduler) context.getAttribute("scheduler"));
-
 
187
        }
-
 
188
    }
-
 
189
 
-
 
190
    private String getLinkSQL() {
-
 
191
        final String dbDriverClass = "org.h2.Driver";
-
 
192
        final String link = "CREATE LINKED TABLE IF NOT EXISTS MEM_INBOUND('"+dbDriverClass+"', '"+configuration.getJdbc_memdb()+"', '', '', 'brutex.tbl_events'); " +
-
 
193
                            "CREATE LINKED TABLE IF NOT EXISTS MEM_OUTBOUND('"+dbDriverClass+"', '"+configuration.getJdbc_memdb()+"', '', '', 'brutex.tbl_events_snap'); " +
-
 
194
                            "CREATE LINKED TABLE IF NOT EXISTS MEM_ALL_EVENTS('"+dbDriverClass+"', '"+configuration.getJdbc_memdb()+"', '', '', 'brutex.tbl_events_all') AUTOCOMMIT OFF; " +
-
 
195
                            "";
-
 
196
        return link;
167
    }
197
    }
168
 
198
 
169
    private synchronized void startEmitterImmediate(AtomicLong egres_counter, Scheduler scheduler) {
199
    private synchronized void startEmitterImmediate(AtomicLong egres, Scheduler scheduler) {
170
        try {
200
        try {
171
            if (!scheduler.checkExists(JobKey.jobKey("ALFEmitter"))) {
201
            if (!scheduler.checkExists(JobKey.jobKey("ALFEmitter"))) {
172
                JobDetail job2 = JobBuilder.newJob(EventEmitter.class).withIdentity("ALFEmitter").build();
202
                JobDetail job2 = JobBuilder.newJob(EventEmitter.class).withIdentity("ALFEmitter").build();
173
                job2.getJobDataMap().put("mdbConnection", mempool);
203
                job2.getJobDataMap().put("mdbConnection", mempool);
174
                job2.getJobDataMap().put("fdbConnection", fdbpool);
204
                job2.getJobDataMap().put("fdbConnection", fdbpool);
175
                job2.getJobDataMap().put("run_key", Instant.now().toEpochMilli());
205
                job2.getJobDataMap().put("run_key", Instant.now().toEpochMilli());
176
                job2.getJobDataMap().put("egres_counter", egres_counter);
206
                job2.getJobDataMap().put("egres_counter", egres);
177
                job2.getJobDataMap().put(EventmanagerConfiguration.KEY, EventmanagerConfiguration.getInstance());
207
                job2.getJobDataMap().put(EventmanagerConfiguration.KEY, EventmanagerConfiguration.getInstance());
178
                SimpleTrigger t = (SimpleTrigger) newTrigger().withIdentity("ALFEmitter").startNow().build();
208
                SimpleTrigger t = (SimpleTrigger) newTrigger().withIdentity("ALFEmitter").startNow().build();
179
                scheduler.scheduleJob(job2, t);
209
                scheduler.scheduleJob(job2, t);
180
            }
210
            }
181
        } catch (SchedulerException ex) {
211
        } catch (SchedulerException ex) {
182
            log.error("Could not start EventEmitter to process existing queue directly after startup: {}", ex.getMessage());
212
            log.error("Could not start EventEmitter to process existing queue directly after startup: {}", ex.getMessage());
183
        }
213
        }
-
 
214
    }
-
 
215
 
-
 
216
    private void startEventLogCleaner(Scheduler scheduler) {
-
 
217
        try {
-
 
218
            if (!scheduler.checkExists(JobKey.jobKey("EventLogCleaner"))) {
-
 
219
                JobDetail job2 = JobBuilder.newJob(EventLogCleanerJob.class).withIdentity("EventLogCleaner").build();
-
 
220
                job2.getJobDataMap().put("mdbConnection", mempool);
-
 
221
                job2.getJobDataMap().put("fdbConnection", fdbpool);
-
 
222
                job2.getJobDataMap().put("run_key", Instant.now().toEpochMilli());
-
 
223
                job2.getJobDataMap().put(EventmanagerConfiguration.KEY, EventmanagerConfiguration.getInstance());
-
 
224
                SimpleTrigger t = (SimpleTrigger) newTrigger().withIdentity("EventLogCleaner")
-
 
225
                        .withSchedule(SimpleScheduleBuilder.simpleSchedule()
-
 
226
                                .withIntervalInMinutes(configuration.getCleaner_interval())
-
 
227
                                .repeatForever())
-
 
228
                        .startAt(Date.from(Instant.now().plus(configuration.getCleaner_interval(), ChronoUnit.MINUTES)))
-
 
229
                        .build();
-
 
230
                scheduler.scheduleJob(job2, t);
-
 
231
            }
-
 
232
        } catch (SchedulerException ex) {
-
 
233
            log.error("Could not start EventLogCleaner to process existing queue directly after startup: {}", ex.getMessage());
-
 
234
        }
184
    }
235
    }
185
 
236
 
186
    private void readConfiguration(ServletContext ctx) {
237
    private void readConfiguration(ServletContext ctx) {
187
                /* Configure ServletContext attributes using configuration object*/
238
                /* Configure ServletContext attributes using configuration object*/
188
                EventmanagerConfiguration c = EventmanagerConfiguration.getInstance().refreshConfig();
239
                EventmanagerConfiguration c = EventmanagerConfiguration.getInstance().refreshConfig();
189
                ctx.setAttribute(EventmanagerConfiguration.KEY, c);
240
                ctx.setAttribute(EventmanagerConfiguration.KEY, c);
190
    }
241
    }
191
 
242
 
192
}
243
}
193
 
244
 
194
Generated by GNU Enscript 1.6.5.90.
245
Generated by GNU Enscript 1.6.5.90.
195
 
246
 
196
 
247