Subversion Repositories XServices

Rev 201 → Rev 203
Line 13... Line 13...
  import java.sql.Connection;
  import java.sql.ResultSet;
  import java.sql.SQLException;
  import java.sql.Statement;
  import java.time.Instant;
+ import java.time.temporal.ChronoUnit;
+ import java.util.Date;
  import java.util.concurrent.atomic.AtomicLong;

  import static org.quartz.TriggerBuilder.newTrigger;
Line 46... Line 48...
       *
       * @return connection pool
       */
      private static JdbcConnectionPool getDbPool(String dbConnectString) {
          JdbcConnectionPool p = JdbcConnectionPool.create(dbConnectString, "", "");
-         p.setMaxConnections(16);
+         p.setMaxConnections(256);
-         p.setLoginTimeout(5);
+         p.setLoginTimeout(20);
-         try {
-             Connection c = p.getConnection();
+         try (Connection c = p.getConnection();){
              Statement s = c.createStatement();
              s.execute(dbinit);
+             c.commit();
              log.trace("Running SQL against database '{}': '{}'", dbConnectString, dbinit);
              c.close();
              log.debug("Successfully created schema for database 'Brutex' at '{}'.", dbConnectString);
          } catch (SQLException e) {
              log.error("Error creating the schema for database 'Brutex' using '{}': {}", dbConnectString, e.getMessage());
Line 70... Line 72...
          try {
              Scheduler scheduler = (Scheduler) arg0.getServletContext().getAttribute("scheduler");
              log.debug("Active jobs to be terminated: {}", scheduler.getCurrentlyExecutingJobs());

              JobKey key = JobKey.jobKey("ALFEmitter");
+             JobKey cleanerkey = JobKey.jobKey("EventLogCleaner");
              synchronized (scheduler) {
                  if (!scheduler.isShutdown() && scheduler.checkExists(key) ) {
                      scheduler.interrupt(key);
                      scheduler.deleteJob(key);
                      log.info("Gracefully stopped the ALFEventEmitter job.");
+                 }
+                 if (!scheduler.isShutdown() && scheduler.checkExists(cleanerkey) ) {
+                     scheduler.interrupt(cleanerkey);
+                     scheduler.deleteJob(cleanerkey);
+                     log.info("Gracefully stopped the ALFEventEmitter job.");
                  }
                  if (!scheduler.isShutdown()) {
                      scheduler.shutdown(true);
                  }
              }
Line 96... Line 104...
          try {
              log.info("Create/Re-open file based database to persist memory database.");
              Connection con = fdbpool.getConnection();
              Statement s = con.createStatement();

-             final String insert = "INSERT INTO brutex.tbl_events SELECT * from LINK UNION SELECT " + "btx_event_type, btx_id, btx_obj_type, btx_obj_id, btx_timestamp, btx_event from LINK2;";
+             final String insert = "INSERT INTO brutex.tbl_events SELECT * from MEM_INBOUND UNION SELECT " + "btx_event_type, btx_id, btx_obj_type, btx_obj_id, btx_timestamp, btx_event from MEM_OUTBOUND;";
              s.execute(insert);
              int count = s.getUpdateCount();
-             log.info("Persisted {} rows in file-based database.", count);
+             log.info("Persisted {} active event rows in file-based database.", count);
+
+             final String save_all = "INSERT INTO brutex.tbl_events_all SELECT * from MEM_ALL_EVENTS;";
+             s.execute(save_all);
+             count = s.getUpdateCount();
+             log.info("Persisted {} event rows from all_events log in file-based database", count);
+
              log.info("Shutting down in-memory database. Closing file-based database. Please wait ...");
              s.execute("SHUTDOWN;");
+
              con.close();
              log.info("Shutting down databases complete.");
          } catch (SQLException e) {
              log.error("An error occurred during database persistence: {}", e.getMessage());
Line 139... Line 154...
          }

          //Load events from file based database into in-memory database
          try {
              log.info("Start recovery of previously unsend alf events. Trying to load them into in-memory database.");
-             final String link = "CREATE LINKED TABLE IF NOT EXISTS LINK('org.h2.Driver', '"+configuration.getJdbc_memdb()+"', '', '', 'brutex.tbl_events'); " +
-                                 "CREATE LINKED TABLE IF NOT EXISTS LINK2('org.h2.Driver', '"+configuration.getJdbc_memdb()+"', '', '', 'brutex.tbl_events_snap');";
+             final String link = getLinkSQL();
-             final String recoverSQL = "INSERT INTO LINK DIRECT SELECT * FROM brutex.tbl_events;";
+             final String recoverSQL = "INSERT INTO MEM_INBOUND DIRECT SELECT * FROM brutex.tbl_events;";
              final String truncate = "TRUNCATE TABLE brutex.tbl_events;";
              int count = 0;
              Connection con = fdbpool.getConnection();
              con.setAutoCommit(false);
Line 161... Line 175...
          } catch (SQLException e) {
              log.error("Exception during recovery of events from previous runs: {}", e.getMessage());
              throw new RuntimeException(e);
          }
          //Start initial run of the emitter
+         if(configuration.isEmitterActive()) {
-         startEmitterImmediate(egres, (Scheduler) context.getAttribute("scheduler"));
+             startEmitterImmediate(egres, (Scheduler) context.getAttribute("scheduler"));
+         }
+
+         //Start mem db log cleaner
+         if(configuration.getCleaner_interval()>0) {
+             startEventLogCleaner((Scheduler) context.getAttribute("scheduler"));
+         }
+     }
+
+     private String getLinkSQL() {
+         final String dbDriverClass = "org.h2.Driver";
+         final String link = "CREATE LINKED TABLE IF NOT EXISTS MEM_INBOUND('"+dbDriverClass+"', '"+configuration.getJdbc_memdb()+"', '', '', 'brutex.tbl_events'); " +
+                             "CREATE LINKED TABLE IF NOT EXISTS MEM_OUTBOUND('"+dbDriverClass+"', '"+configuration.getJdbc_memdb()+"', '', '', 'brutex.tbl_events_snap'); " +
+                             "CREATE LINKED TABLE IF NOT EXISTS MEM_ALL_EVENTS('"+dbDriverClass+"', '"+configuration.getJdbc_memdb()+"', '', '', 'brutex.tbl_events_all') AUTOCOMMIT OFF; " +
+                             "";
+         return link;
      }
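The getLinkSQL() helper introduced above relies on H2 linked tables: a CREATE LINKED TABLE statement executed in the file-based database makes a table of the in-memory database (reached through the JDBC URL from configuration.getJdbc_memdb()) queryable under a local alias such as MEM_INBOUND. Below is a minimal, self-contained sketch of that mechanism; the two database URLs and the demo table are placeholders, not the ones used by this class.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class LinkedTableSketch {
        public static void main(String[] args) throws Exception {
            // Placeholder URLs: a named in-memory database and a file-based database.
            final String memUrl  = "jdbc:h2:mem:sketchmem;DB_CLOSE_DELAY=-1";
            final String fileUrl = "jdbc:h2:./sketchfile";

            try (Connection mem = DriverManager.getConnection(memUrl, "", "");
                 Statement ms = mem.createStatement()) {
                ms.execute("CREATE TABLE demo_events(id INT)");
                ms.execute("INSERT INTO demo_events VALUES (1), (2)");

                try (Connection file = DriverManager.getConnection(fileUrl, "", "");
                     Statement fs = file.createStatement()) {
                    // The linked table is a proxy: statements against MEM_DEMO in the file
                    // database are forwarded to demo_events in the in-memory database.
                    fs.execute("CREATE LINKED TABLE IF NOT EXISTS MEM_DEMO"
                            + "('org.h2.Driver', '" + memUrl + "', '', '', 'demo_events')");
                    try (ResultSet rs = fs.executeQuery("SELECT COUNT(*) FROM MEM_DEMO")) {
                        rs.next();
                        System.out.println("rows visible through the link: " + rs.getLong(1));
                    }
                }
            }
        }
    }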
Line 168... Line 198...

-     private synchronized void startEmitterImmediate(AtomicLong egres_counter, Scheduler scheduler) {
+     private synchronized void startEmitterImmediate(AtomicLong egres, Scheduler scheduler) {
          try {
              if (!scheduler.checkExists(JobKey.jobKey("ALFEmitter"))) {
                  JobDetail job2 = JobBuilder.newJob(EventEmitter.class).withIdentity("ALFEmitter").build();
                  job2.getJobDataMap().put("mdbConnection", mempool);
                  job2.getJobDataMap().put("fdbConnection", fdbpool);
                  job2.getJobDataMap().put("run_key", Instant.now().toEpochMilli());
-                 job2.getJobDataMap().put("egres_counter", egres_counter);
+                 job2.getJobDataMap().put("egres_counter", egres);
                  job2.getJobDataMap().put(EventmanagerConfiguration.KEY, EventmanagerConfiguration.getInstance());
                  SimpleTrigger t = (SimpleTrigger) newTrigger().withIdentity("ALFEmitter").startNow().build();
                  scheduler.scheduleJob(job2, t);
              }
          } catch (SchedulerException ex) {
              log.error("Could not start EventEmitter to process existing queue directly after startup: {}", ex.getMessage());
          }
Line -... Line 214...
+     }
+
+     private void startEventLogCleaner(Scheduler scheduler) {
+         try {
+             if (!scheduler.checkExists(JobKey.jobKey("EventLogCleaner"))) {
+                 JobDetail job2 = JobBuilder.newJob(EventLogCleanerJob.class).withIdentity("EventLogCleaner").build();
+                 job2.getJobDataMap().put("mdbConnection", mempool);
+                 job2.getJobDataMap().put("fdbConnection", fdbpool);
+                 job2.getJobDataMap().put("run_key", Instant.now().toEpochMilli());
+                 job2.getJobDataMap().put(EventmanagerConfiguration.KEY, EventmanagerConfiguration.getInstance());
+                 SimpleTrigger t = (SimpleTrigger) newTrigger().withIdentity("EventLogCleaner")
+                         .withSchedule(SimpleScheduleBuilder.simpleSchedule()
+                                 .withIntervalInMinutes(configuration.getCleaner_interval())
+                                 .repeatForever())
+                         .startAt(Date.from(Instant.now().plus(configuration.getCleaner_interval(), ChronoUnit.MINUTES)))
+                         .build();
+                 scheduler.scheduleJob(job2, t);
+             }
+         } catch (SchedulerException ex) {
+             log.error("Could not start EventLogCleaner to process existing queue directly after startup: {}", ex.getMessage());
+         }
      }

      private void readConfiguration(ServletContext ctx) {
              /* Configure ServletContext attributes using configuration object*/
              EventmanagerConfiguration c = EventmanagerConfiguration.getInstance().refreshConfig();
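The startEventLogCleaner() method added in Rev 203 passes the cleaner its dependencies through the Quartz JobDataMap ("mdbConnection", "fdbConnection", "run_key" and the EventmanagerConfiguration entry). The EventLogCleanerJob class itself is not part of this diff; the sketch below is only a hypothetical illustration of how a Quartz job can read those entries, and it assumes the job implements InterruptableJob so that the scheduler.interrupt(cleanerkey) call in contextDestroyed() can take effect while the job is running. The cleanup SQL is a placeholder, not the project's actual retention rule.

    import java.sql.Connection;
    import java.sql.SQLException;
    import java.sql.Statement;

    import org.h2.jdbcx.JdbcConnectionPool;
    import org.quartz.InterruptableJob;
    import org.quartz.JobDataMap;
    import org.quartz.JobExecutionContext;
    import org.quartz.JobExecutionException;
    import org.quartz.UnableToInterruptJobException;

    // Hypothetical sketch only; the real EventLogCleanerJob is not shown in this diff.
    public class EventLogCleanerJobSketch implements InterruptableJob {

        private volatile boolean interrupted = false;

        @Override
        public void execute(JobExecutionContext ctx) throws JobExecutionException {
            // Entries placed into the JobDataMap by startEventLogCleaner(); the
            // "fdbConnection", "run_key" and configuration entries are read the same way.
            JobDataMap data = ctx.getMergedJobDataMap();
            JdbcConnectionPool mdb = (JdbcConnectionPool) data.get("mdbConnection");

            if (interrupted) {
                return;
            }
            try (Connection c = mdb.getConnection();
                 Statement s = c.createStatement()) {
                // Placeholder retention rule; the actual cleanup criteria are defined elsewhere.
                s.execute("DELETE FROM brutex.tbl_events_all "
                        + "WHERE btx_timestamp < DATEADD('DAY', -1, CURRENT_TIMESTAMP)");
            } catch (SQLException e) {
                throw new JobExecutionException(e);
            }
        }

        @Override
        public void interrupt() throws UnableToInterruptJobException {
            interrupted = true;
        }
    }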