Subversion Repositories XServices

Rev

Rev 201 | Only display areas with differences | Ignore whitespace | Details | Blame | Last modification | View Log | RSS feed

Rev 201 Rev 203
1
package net.brutex.xservices.util;
1
package net.brutex.xservices.util;
2
 
2
 
3
import lombok.extern.slf4j.Slf4j;
3
import lombok.extern.slf4j.Slf4j;
4
import org.apache.commons.io.IOUtils;
4
import org.apache.commons.io.IOUtils;
5
import org.h2.jdbcx.JdbcConnectionPool;
5
import org.h2.jdbcx.JdbcConnectionPool;
6
import org.quartz.*;
6
import org.quartz.*;
7
 
7
 
8
import java.io.IOException;
8
import java.io.IOException;
9
import java.io.Reader;
9
import java.io.Reader;
10
import java.io.StringReader;
10
import java.io.StringReader;
11
import java.sql.*;
11
import java.sql.*;
12
import java.time.Instant;
12
import java.time.Instant;
13
import java.util.Date;
-
 
14
import java.util.concurrent.atomic.AtomicBoolean;
13
import java.util.concurrent.atomic.AtomicBoolean;
15
import java.util.concurrent.atomic.AtomicLong;
14
import java.util.concurrent.atomic.AtomicLong;
16
 
-
 
17
import static org.quartz.TriggerBuilder.newTrigger;
-
 
18
 
15
 
-
 
16
@Slf4j
19
@Slf4j
17
@DisallowConcurrentExecution
20
public class EventEmitter implements Job, InterruptableJob {
18
public class EventLogCleanerJob implements Job, InterruptableJob {
21
 
19
 
22
    private final AtomicBoolean isInterrupted = new AtomicBoolean(false);
20
    private final AtomicBoolean isInterrupted = new AtomicBoolean(false);
23
 
21
 
24
    @Override
22
    @Override
25
    public void execute(JobExecutionContext context) throws JobExecutionException {
23
    public void execute(JobExecutionContext context) throws JobExecutionException {
26
        final Instant d = Instant.now();
24
        final Instant d = Instant.now();
27
        final long ts = d.toEpochMilli();
25
        final long ts = d.toEpochMilli();
-
 
26
 
-
 
27
        log.info("EventLogCleaner is executing now.");
28
        final EventmanagerConfiguration conf = (EventmanagerConfiguration) context.getMergedJobDataMap()
28
        final EventmanagerConfiguration conf = (EventmanagerConfiguration) context.getMergedJobDataMap()
29
                .get(EventmanagerConfiguration.KEY);
29
                .get(EventmanagerConfiguration.KEY);
30
 
-
 
31
        final String url = conf.getTargeturl();
-
 
32
 
-
 
33
        final JdbcConnectionPool pool = (JdbcConnectionPool) context.getMergedJobDataMap().get("mdbConnection");
30
 
34
        final JdbcConnectionPool fpool = (JdbcConnectionPool) context.getMergedJobDataMap().get("fdbConnection");
-
 
35
        final long run_key = context.getMergedJobDataMap().getLong("run_key");
31
        final JdbcConnectionPool fpool = (JdbcConnectionPool) context.getMergedJobDataMap().get("fdbConnection");
36
        final AtomicLong egres_counter = (AtomicLong) context.getMergedJobDataMap().get("egres_counter");
-
 
37
 
-
 
38
        final String querySQL = "SELECT btx_id, btx_event, btx_obj_id, btx_event_type, btx_obj_type, btx_timestamp FROM brutex.tbl_events_snap ORDER BY btx_timestamp asc FOR UPDATE;";
-
 
39
        final String deleteSQL = "DELETE FROM brutex.tbl_events_snap where btx_id=?";
-
 
40
        final String deleteTable = "TRUNCATE TABLE brutex.tbl_events;";
32
        final JdbcConnectionPool mpool = (JdbcConnectionPool) context.getMergedJobDataMap().get("mdbConnection");
-
 
33
 
-
 
34
        final String moveSQL = "INSERT INTO brutex.tbl_events_all DIRECT SELECT * FROM MEM_ALL_EVENTS " +
41
 
35
                "where btx_timestamp < " + (ts-5000) + " ";
42
        final String moveSQL = "INSERT INTO brutex.tbl_events_snap DIRECT SELECT " +
-
 
43
                                " btx_event_type, btx_id, btx_obj_type, btx_obj_id, btx_timestamp, ?, btx_event FROM brutex.tbl_events; ";
-
 
44
 
-
 
45
 
-
 
46
        final String moveErrorSQL = "MERGE INTO brutex.tbl_events_errors " +
36
        final String deleteTable = "DELETE FROM MEM_ALL_EVENTS where btx_timestamp < " + (ts-5000);
47
                "KEY (btx_event_type, btx_obj_type, btx_obj_id) " +
37
        final String deleteMemTable = "DELETE FROM brutex.tbl_events_all where btx_timestamp < " + (ts-5000);
48
                "VALUES (?,?,?,?,?,?,?,?);";
38
 
49
 
-
 
50
        /**
-
 
51
         * Move event table data to snapshot
-
 
52
         */
-
 
53
 
-
 
54
        Connection con = null;
-
 
55
        Connection fcon = null;
-
 
56
        try {
-
 
57
            con = pool.getConnection();
-
 
58
            con.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
-
 
59
            con.setAutoCommit(false);
-
 
60
            Statement stmt = con.createStatement();
-
 
61
            PreparedStatement moveprep= con.prepareStatement(moveSQL);
-
 
62
            moveprep.setLong(1, run_key);
-
 
63
            moveprep.execute();
-
 
64
            stmt.execute(deleteTable);
-
 
65
            con.commit(); //all events moved from tbl_events to tbl_events_snap at this point
-
 
66
 
-
 
67
 
-
 
68
            fcon = fpool.getConnection();
-
 
69
            PreparedStatement errorPrepSql = fcon.prepareStatement(moveErrorSQL);
-
 
70
 
-
 
71
            PreparedStatement del = con.prepareStatement(deleteSQL);
-
 
72
 
-
 
73
            ResultSet rs = stmt.executeQuery(querySQL);
-
 
74
 
-
 
75
 
-
 
76
            while(rs.next() && !isInterrupted.get()) {
-
 
77
                /* btx_id, btx_event, btx_obj_id, btx_event_type, btx_obj_type, btx_timestamp */
-
 
78
                String id = rs.getString(1);
-
 
79
                Clob c = rs.getClob(2);
-
 
80
                String obj_id = rs.getString(3);
-
 
81
                String event_type = rs.getString(4);
-
 
82
                String obj_type = rs.getString(5);
-
 
83
                long event_ts = rs.getLong(6);
-
 
84
                boolean bretry = false;
-
 
85
 
-
 
86
                SimpleSoap ss = new SimpleSoap( url, id, IOUtils.toString(c.getCharacterStream()));
-
 
87
                int retry = 0;
-
 
88
                Reader response = null;
-
 
89
                String rsp = "";
-
 
90
                boolean succeeded = false;
-
 
91
                while(retry < 3 && !succeeded && !isInterrupted.get()) {
-
 
92
                    retry++;
-
 
93
                        response = ss.sendSoap(false);
-
 
94
                        succeeded = true;
-
 
95
                        if(response!=null) {
-
 
96
                            rsp = IOUtils.toString(response);
-
 
97
                        }
-
 
98
 
-
 
99
                        if (rsp.contains("<soap:Fault") || rsp.contains("<soapenv:Fault")) { succeeded=false; bretry=false;};
-
 
100
                        if (! rsp.contains(":Envelope ")) { succeeded=false; bretry=true;};
-
 
101
 
-
 
102
                        if (succeeded) {
-
 
103
                            // Successfully sent
-
 
104
                            del.setString(1, id);
-
 
105
                            del.execute();
-
 
106
                            con.commit();
-
 
107
                            egres_counter.incrementAndGet();
-
 
108
                            log.debug("Successfully sent event '{}' to target ALF Event Manager.", id);
-
 
109
                        } else {
-
 
110
                            // Error during sending
-
 
111
                            log.warn("Unable to send ALF Event '{}' to event manager. Will retry in 2 seconds. This is the {}. time.", id, retry);
-
 
112
                            try {
-
 
113
                                Thread.sleep(2000);
-
 
114
                            } catch (InterruptedException e) {
-
 
115
                                log.error("Interrupted while waiting to retry: {}", e.getMessage());
-
 
116
                            }
-
 
117
                        }
-
 
118
                }
-
 
119
 
-
 
120
                if(! succeeded) {
-
 
121
                    log.error("Failed to send ALF Event '{}' to the event manager. Giving up. " +
-
 
122
                            "Moving event back to the queue unless there is a superseding event already queued.", id);
-
 
123
 
-
 
124
 
-
 
125
                    try {
-
 
126
                        // the error record is written to the file-based db
-
 
127
                        errorPrepSql.setString(1, event_type);
-
 
128
                        errorPrepSql.setString(2, id);
-
 
129
                        errorPrepSql.setString(3, obj_type);
-
 
130
                        errorPrepSql.setString(4, obj_id);
-
 
131
                        errorPrepSql.setLong(5, event_ts);
-
 
132
                        errorPrepSql.setBoolean(6, bretry);
-
 
133
                        errorPrepSql.setClob(7, new StringReader(rsp) );
-
 
134
                        errorPrepSql.setClob(8, c);
-
 
135
                        errorPrepSql.execute();
-
 
136
                        fcon.commit();
-
 
137
 
-
 
138
                        //this is in-memory
-
 
139
                        del.setString(1, id);
-
 
140
                        del.execute();
-
 
141
                        con.commit();
-
 
-
 
39
 
-
 
40
        /**
-
 
41
         * Move event table data to all events log
-
 
42
         */
-
 
43
 
-
 
44
 
-
 
45
        try (Connection fcon = fpool.getConnection();
-
 
46
            Connection mcon = mpool.getConnection()){
-
 
47
            fcon.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
-
 
48
            fcon.setAutoCommit(false);
-
 
49
            Statement stmt = fcon.createStatement();
-
 
50
            stmt.execute(moveSQL);
-
 
51
            int count = stmt.getUpdateCount();
-
 
52
            fcon.commit();
-
 
53
            Statement mstm = mcon.createStatement();
-
 
54
 
142
                    }  catch (SQLException e) {
55
 
143
                        log.error("Exception in SQL execution during writing error events: {}", e.getMessage());
56
            mstm.execute(deleteMemTable);
144
                    }
57
            int count2 = mstm.getUpdateCount();
145
                }
58
            mcon.commit();
146
            }
59
 
147
 
60
            log.info("EventLogCleaner moved '{}/ deleted {}' events into the persistence space.", count, count2);
148
 
61
 
149
        } catch (SQLException e) {
62
        } catch (SQLException e) {
150
            log.error("Exception in SQL execution: {}", e.getMessage());
63
            log.error("Exception in SQL execution: {}", e.getMessage());
151
            throw new JobExecutionException(e);
64
            throw new JobExecutionException(e);
152
        } catch (IOException e) {
-
 
153
            log.error("Exception in SQL execution: {}", e.getMessage());
-
 
154
            throw new RuntimeException(e);
-
 
155
        } finally {
-
 
156
            try {
-
 
157
                if(fcon!=null) fcon.close();
-
 
158
                if(con!=null)  con.close();
-
 
159
            } catch (SQLException e) {
-
 
160
                    log.error("Error closing the database connections: {}", e.getMessage());
-
 
161
                    throw new RuntimeException(e);
-
 
162
            }
-
 
163
        }
65
        }
164
    }
66
    }
165
 
67
 
166
 
68
 
167
    /**
69
        /**
168
     * <p>
70
     * <p>
169
     * Called by the <code>{@link Scheduler}</code> when a user
71
     * Called by the <code>{@link Scheduler}</code> when a user
170
     * interrupts the <code>Job</code>.
72
     * interrupts the <code>Job</code>.
171
     * </p>
73
     * </p>
172
     *
74
     *
173
     * @throws UnableToInterruptJobException if there is an exception while interrupting the job.
75
     * @throws UnableToInterruptJobException if there is an exception while interrupting the job.
174
     */
76
     */
175
    @Override
77
    @Override
176
    public synchronized void interrupt() throws UnableToInterruptJobException {
78
    public synchronized void interrupt() throws UnableToInterruptJobException {
177
        isInterrupted.set(true);
79
        isInterrupted.set(true);
178
        log.warn("ALFEmitter received and interrupt.");
80
        log.warn("EventLogCleaner received and interrupt.");
-
 
81
        Thread.currentThread().interrupt();
179
    }
82
    }
180
}
83
}