Subversion Repositories XServices

Rev

Rev 201 | Only display areas with differences | Ignore whitespace | Details | Blame | Last modification | View Log | RSS feed

Rev 201 Rev 203
1
package net.brutex.xservices.util;
1
package net.brutex.xservices.util;
2
 
2
 
3
import lombok.extern.slf4j.Slf4j;
3
import lombok.extern.slf4j.Slf4j;
4
import org.apache.commons.io.IOUtils;
4
import org.apache.commons.io.IOUtils;
5
import org.h2.jdbcx.JdbcConnectionPool;
5
import org.h2.jdbcx.JdbcConnectionPool;
6
import org.quartz.*;
6
import org.quartz.*;
7
 
7
 
8
import java.io.IOException;
8
import java.io.IOException;
9
import java.io.Reader;
9
import java.io.Reader;
10
import java.io.StringReader;
10
import java.io.StringReader;
11
import java.sql.*;
11
import java.sql.*;
12
import java.time.Instant;
12
import java.time.Instant;
13
import java.util.Date;
13
import java.util.Date;
14
import java.util.concurrent.atomic.AtomicBoolean;
14
import java.util.concurrent.atomic.AtomicBoolean;
15
import java.util.concurrent.atomic.AtomicLong;
15
import java.util.concurrent.atomic.AtomicLong;
16
 
16
 
17
import static org.quartz.TriggerBuilder.newTrigger;
17
import static org.quartz.TriggerBuilder.newTrigger;
18
 
18
 
19
@Slf4j
19
@Slf4j
20
public class EventEmitter implements Job, InterruptableJob {
20
public class EventEmitter implements Job, InterruptableJob {
21
 
21
 
22
    private final AtomicBoolean isInterrupted = new AtomicBoolean(false);
22
    private final AtomicBoolean isInterrupted = new AtomicBoolean(false);
23
 
23
 
24
    @Override
24
    @Override
25
    public void execute(JobExecutionContext context) throws JobExecutionException {
25
    public void execute(JobExecutionContext context) throws JobExecutionException {
26
        final Instant d = Instant.now();
26
        final Instant d = Instant.now();
27
        final long ts = d.toEpochMilli();
27
        final long ts = d.toEpochMilli();
28
        final EventmanagerConfiguration conf = (EventmanagerConfiguration) context.getMergedJobDataMap()
28
        final EventmanagerConfiguration conf = (EventmanagerConfiguration) context.getMergedJobDataMap()
29
                .get(EventmanagerConfiguration.KEY);
29
                .get(EventmanagerConfiguration.KEY);
30
 
30
 
31
        final String url = conf.getTargeturl();
31
        final String url = conf.getTargeturl();
32
 
32
 
33
        final JdbcConnectionPool pool = (JdbcConnectionPool) context.getMergedJobDataMap().get("mdbConnection");
33
        final JdbcConnectionPool pool = (JdbcConnectionPool) context.getMergedJobDataMap().get("mdbConnection");
34
        final JdbcConnectionPool fpool = (JdbcConnectionPool) context.getMergedJobDataMap().get("fdbConnection");
34
        final JdbcConnectionPool fpool = (JdbcConnectionPool) context.getMergedJobDataMap().get("fdbConnection");
35
        final long run_key = context.getMergedJobDataMap().getLong("run_key");
35
        final long run_key = context.getMergedJobDataMap().getLong("run_key");
36
        final AtomicLong egres_counter = (AtomicLong) context.getMergedJobDataMap().get("egres_counter");
36
        final AtomicLong egres_counter = (AtomicLong) context.getMergedJobDataMap().get("egres_counter");
37
 
37
 
38
        final String querySQL = "SELECT btx_id, btx_event, btx_obj_id, btx_event_type, btx_obj_type, btx_timestamp FROM brutex.tbl_events_snap ORDER BY btx_timestamp asc FOR UPDATE;";
38
        final String querySQL = "SELECT btx_id, btx_event, btx_obj_id, btx_event_type, btx_obj_type, btx_timestamp FROM brutex.tbl_events_snap ORDER BY btx_timestamp asc FOR UPDATE;";
39
        final String deleteSQL = "DELETE FROM brutex.tbl_events_snap where btx_id=?";
39
        final String deleteSQL = "DELETE FROM brutex.tbl_events_snap where btx_id=?";
40
        final String deleteTable = "TRUNCATE TABLE brutex.tbl_events;";
40
        final String deleteTable = "TRUNCATE TABLE brutex.tbl_events;";
41
 
41
 
42
        final String moveSQL = "INSERT INTO brutex.tbl_events_snap DIRECT SELECT " +
42
        final String moveSQL = "INSERT INTO brutex.tbl_events_snap DIRECT SELECT " +
43
                                " btx_event_type, btx_id, btx_obj_type, btx_obj_id, btx_timestamp, ?, btx_event FROM brutex.tbl_events; ";
43
                                " btx_event_type, btx_id, btx_obj_type, btx_obj_id, btx_timestamp, ?, btx_event FROM brutex.tbl_events; ";
44
 
44
 
45
 
45
 
46
        final String moveErrorSQL = "MERGE INTO brutex.tbl_events_errors " +
46
        final String moveErrorSQL = "MERGE INTO brutex.tbl_events_errors " +
47
                "KEY (btx_event_type, btx_obj_type, btx_obj_id) " +
47
                "KEY (btx_event_type, btx_obj_type, btx_obj_id) " +
48
                "VALUES (?,?,?,?,?,?,?,?);";
48
                "VALUES (?,?,?,?,?,?,?,?);";
49
 
49
 
50
        /**
50
        /**
51
         * Move event table data to snapshot
51
         * Move event table data to snapshot
52
         */
52
         */
-
 
53
 
53
 
54
 
54
        Connection con = null;
55
        try(          Connection  con = pool.getConnection();
55
        Connection fcon = null;
56
                      Connection fcon = fpool.getConnection();
56
        try {
-
 
-
 
57
        ) {
57
            con = pool.getConnection();
58
 
58
            con.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
59
            con.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
59
            con.setAutoCommit(false);
60
            con.setAutoCommit(false);
60
            Statement stmt = con.createStatement();
61
            Statement stmt = con.createStatement();
61
            PreparedStatement moveprep= con.prepareStatement(moveSQL);
62
            PreparedStatement moveprep= con.prepareStatement(moveSQL);
62
            moveprep.setLong(1, run_key);
63
            moveprep.setLong(1, run_key);
63
            moveprep.execute();
64
            moveprep.execute();
64
            stmt.execute(deleteTable);
65
            stmt.execute(deleteTable);
65
            con.commit(); //all events moved from tbl_events to tbl_events_snap at this point
66
            con.commit(); //all events moved from tbl_events to tbl_events_snap at this point
66
 
67
 
67
 
-
 
-
 
68
 
68
            fcon = fpool.getConnection();
69
 
69
            PreparedStatement errorPrepSql = fcon.prepareStatement(moveErrorSQL);
70
            PreparedStatement errorPrepSql = fcon.prepareStatement(moveErrorSQL);
70
 
71
 
71
            PreparedStatement del = con.prepareStatement(deleteSQL);
72
            PreparedStatement del = con.prepareStatement(deleteSQL);
72
 
73
 
73
            ResultSet rs = stmt.executeQuery(querySQL);
74
            ResultSet rs = stmt.executeQuery(querySQL);
74
 
75
 
75
 
76
 
76
            while(rs.next() && !isInterrupted.get()) {
77
            while(rs.next() && !isInterrupted.get()) {
77
                /* btx_id, btx_event, btx_obj_id, btx_event_type, btx_obj_typ */
78
                /* btx_id, btx_event, btx_obj_id, btx_event_type, btx_obj_typ */
78
                String id = rs.getString(1);
79
                String id = rs.getString(1);
79
                Clob c = rs.getClob(2);
80
                Clob c = rs.getClob(2);
80
                String obj_id = rs.getString(3);
81
                String obj_id = rs.getString(3);
81
                String event_type = rs.getString(4);
82
                String event_type = rs.getString(4);
82
                String obj_type = rs.getString(5);
83
                String obj_type = rs.getString(5);
83
                long event_ts = rs.getLong(6);
84
                long event_ts = rs.getLong(6);
84
                boolean bretry = false;
85
                boolean bretry = false;
85
 
86
 
-
 
87
                //SimpleSoap ss = new SimpleSoap( url, id, IOUtils.toString(c.getAsciiStream()c.getCharacterStream()));
86
                SimpleSoap ss = new SimpleSoap( url, id, IOUtils.toString(c.getCharacterStream()));
88
                SimpleSoap ss = new SimpleSoap(url, id, c.getSubString(1L, (int) c.length()));
87
                int retry = 0;
89
                int retry = 0;
88
                Reader response = null;
90
                Reader response = null;
89
                String rsp = "";
91
                String rsp = "";
90
                boolean succeeded = false;
92
                boolean succeeded = false;
91
                while(retry < 3 && !succeeded && !isInterrupted.get()) {
93
                while(retry < 3 && !succeeded && !isInterrupted.get()) {
92
                    retry++;
94
                    retry++;
93
                        response = ss.sendSoap(false);
95
                        response = ss.sendSoap(false);
94
                        succeeded = true;
96
                        succeeded = true;
95
                        if(response!=null) {
97
                        if(response!=null) {
96
                            rsp = IOUtils.toString(response);
98
                            rsp = IOUtils.toString(response);
97
                        }
99
                        }
98
 
100
 
99
                        if (rsp.contains("<soap:Fault") || rsp.contains("<soapenv:Fault")) { succeeded=false; bretry=false;};
101
                        if (rsp.contains("<soap:Fault") || rsp.contains("<soapenv:Fault")) { succeeded=false; bretry=false;};
100
                        if (! rsp.contains(":Envelope ")) { succeeded=false; bretry=true;};
102
                        if (! rsp.contains(":Envelope ")) { succeeded=false; bretry=true;};
101
 
103
 
102
                        if (succeeded) {
104
                        if (succeeded) {
103
                            // Successfully send
105
                            // Successfully send
104
                            del.setString(1, id);
106
                            del.setString(1, id);
105
                            del.execute();
107
                            del.execute();
106
                            con.commit();
108
                            con.commit();
107
                            egres_counter.incrementAndGet();
109
                            egres_counter.incrementAndGet();
108
                            log.debug("Successfully sent event '{}' to target ALF Event Manager.", id);
110
                            log.debug("Successfully sent event '{}' to target ALF Event Manager.", id);
109
                        } else {
111
                        } else {
110
                            // Error during sending
112
                            // Error during sending
111
                            log.warn("Unable to send ALF Event '{}' to event manager. Will retry in 2 seconds. This is the {}. time.", id, retry);
113
                            log.warn("Unable to send ALF Event '{}' to event manager. Will retry in 2 seconds. This is the {}. time.", id, retry);
112
                            try {
114
                            try {
113
                                Thread.sleep(2000);
115
                                Thread.sleep(2000);
114
                            } catch (InterruptedException e) {
116
                            } catch (InterruptedException e) {
115
                                log.error("Interrupted while waiting to retry: {}", e.getMessage());
117
                                log.error("Interrupted while waiting to retry: {}", e.getMessage());
116
                            }
118
                            }
117
                        }
119
                        }
118
                }
120
                }
119
 
121
 
120
                if(! succeeded) {
122
                if(! succeeded) {
121
                    log.error("Failed to send ALF Event '{}' to the event manager. Giving up. " +
123
                    log.error("Failed to send ALF Event '{}' to the event manager. Giving up. " +
122
                            "Moving event back to the queue unless there is a superseding event already queued.", id);
124
                            "Moving event back to the queue unless there is a superseding event already queued.", id);
123
 
125
 
124
 
126
 
125
                    try {
127
                    try {
126
                        //this is in file-based db
128
                        //this is in file-based db
127
                        errorPrepSql.setString(1, event_type);
129
                        errorPrepSql.setString(1, event_type);
128
                        errorPrepSql.setString(2, id);
130
                        errorPrepSql.setString(2, id);
129
                        errorPrepSql.setString(3, obj_type);
131
                        errorPrepSql.setString(3, obj_type);
130
                        errorPrepSql.setString(4, obj_id);
132
                        errorPrepSql.setString(4, obj_id);
131
                        errorPrepSql.setLong(5, event_ts);
133
                        errorPrepSql.setLong(5, event_ts);
132
                        errorPrepSql.setBoolean(6, bretry);
134
                        errorPrepSql.setBoolean(6, bretry);
133
                        errorPrepSql.setClob(7, new StringReader(rsp) );
135
                        errorPrepSql.setClob(7, new StringReader(rsp) );
134
                        errorPrepSql.setClob(8, c);
136
                        errorPrepSql.setClob(8, c);
135
                        errorPrepSql.execute();
137
                        errorPrepSql.execute();
136
                        fcon.commit();
138
                        fcon.commit();
137
 
139
 
138
                        //this is in-memory
140
                        //this is in-memory
139
                        del.setString(1, id);
141
                        del.setString(1, id);
140
                        del.execute();
142
                        del.execute();
141
                        con.commit();
143
                        con.commit();
142
                    }  catch (SQLException e) {
144
                    }  catch (SQLException e) {
143
                        log.error("Exception in SQL execution during writing error events: {}", e.getMessage());
145
                        log.error("Exception in SQL execution during writing error events: {}", e.getMessage());
144
                    }
146
                    }
145
                }
147
                }
146
            }
148
            }
147
 
149
 
148
 
150
 
149
        } catch (SQLException e) {
151
        } catch (SQLException e) {
150
            log.error("Exception in SQL execution: {}", e.getMessage());
152
            log.error("Exception in SQL execution: {}", e.getMessage());
151
            throw new JobExecutionException(e);
153
            throw new JobExecutionException(e);
152
        } catch (IOException e) {
154
        } catch (IOException e) {
153
            log.error("Exception in SQL execution: {}", e.getMessage());
155
            log.error("Exception in SQL execution: {}", e.getMessage());
154
            throw new RuntimeException(e);
156
            throw new RuntimeException(e);
155
        } finally {
-
 
156
            try {
-
 
157
                if(fcon!=null) fcon.close();
-
 
158
                if(con!=null)  con.close();
-
 
159
            } catch (SQLException e) {
-
 
160
                    log.error("Error closing the database connections: {}", e.getMessage());
-
 
161
                    throw new RuntimeException(e);
-
 
162
            }
-
 
163
        }
157
        }
164
    }
158
    }
165
 
159
 
166
 
160
 
167
    /**
161
    /**
168
     * <p>
162
     * <p>
169
     * Called by the <code>{@link Scheduler}</code> when a user
163
     * Called by the <code>{@link Scheduler}</code> when a user
170
     * interrupts the <code>Job</code>.
164
     * interrupts the <code>Job</code>.
171
     * </p>
165
     * </p>
172
     *
166
     *
173
     * @throws UnableToInterruptJobException if there is an exception while interrupting the job.
167
     * @throws UnableToInterruptJobException if there is an exception while interrupting the job.
174
     */
168
     */
175
    @Override
169
    @Override
176
    public synchronized void interrupt() throws UnableToInterruptJobException {
170
    public synchronized void interrupt() throws UnableToInterruptJobException {
177
        isInterrupted.set(true);
171
        isInterrupted.set(true);
178
        log.warn("ALFEmitter received and interrupt.");
172
        log.warn("ALFEmitter received and interrupt.");
179
    }
173
    }
180
}
174
}