Subversion Repositories XServices

Rev

Rev 201 | Show entire file | Ignore whitespace | Details | Blame | Last modification | View Log | RSS feed

Rev 201 Rev 203
Line 8... Line 8...
8
import java.io.IOException;
8
import java.io.IOException;
9
import java.io.Reader;
9
import java.io.Reader;
10
import java.io.StringReader;
10
import java.io.StringReader;
11
import java.sql.*;
11
import java.sql.*;
12
import java.time.Instant;
12
import java.time.Instant;
13
import java.util.Date;
-
 
14
import java.util.concurrent.atomic.AtomicBoolean;
13
import java.util.concurrent.atomic.AtomicBoolean;
15
import java.util.concurrent.atomic.AtomicLong;
14
import java.util.concurrent.atomic.AtomicLong;
Line 16... Line -...
16
 
-
 
17
import static org.quartz.TriggerBuilder.newTrigger;
-
 
18
 
15
 
-
 
16
@Slf4j
19
@Slf4j
17
@DisallowConcurrentExecution
Line 20... Line 18...
20
public class EventEmitter implements Job, InterruptableJob {
18
public class EventLogCleanerJob implements Job, InterruptableJob {
Line 21... Line 19...
21
 
19
 
22
    private final AtomicBoolean isInterrupted = new AtomicBoolean(false);
20
    private final AtomicBoolean isInterrupted = new AtomicBoolean(false);
23
 
21
 
24
    @Override
22
    @Override
-
 
23
    public void execute(JobExecutionContext context) throws JobExecutionException {
-
 
24
        final Instant d = Instant.now();
25
    public void execute(JobExecutionContext context) throws JobExecutionException {
25
        final long ts = d.toEpochMilli();
26
        final Instant d = Instant.now();
26
 
Line 27... Line -...
27
        final long ts = d.toEpochMilli();
-
 
28
        final EventmanagerConfiguration conf = (EventmanagerConfiguration) context.getMergedJobDataMap()
-
 
29
                .get(EventmanagerConfiguration.KEY);
-
 
30
 
27
        log.info("EventLogCleaner is executing now.");
31
        final String url = conf.getTargeturl();
-
 
32
 
28
        final EventmanagerConfiguration conf = (EventmanagerConfiguration) context.getMergedJobDataMap()
33
        final JdbcConnectionPool pool = (JdbcConnectionPool) context.getMergedJobDataMap().get("mdbConnection");
-
 
34
        final JdbcConnectionPool fpool = (JdbcConnectionPool) context.getMergedJobDataMap().get("fdbConnection");
-
 
35
        final long run_key = context.getMergedJobDataMap().getLong("run_key");
-
 
36
        final AtomicLong egres_counter = (AtomicLong) context.getMergedJobDataMap().get("egres_counter");
-
 
Line 37... Line 29...
37
 
29
                .get(EventmanagerConfiguration.KEY);
-
 
30
 
-
 
31
        final JdbcConnectionPool fpool = (JdbcConnectionPool) context.getMergedJobDataMap().get("fdbConnection");
38
        final String querySQL = "SELECT btx_id, btx_event, btx_obj_id, btx_event_type, btx_obj_type, btx_timestamp FROM brutex.tbl_events_snap ORDER BY btx_timestamp asc FOR UPDATE;";
32
        final JdbcConnectionPool mpool = (JdbcConnectionPool) context.getMergedJobDataMap().get("mdbConnection");
Line 39... Line -...
39
        final String deleteSQL = "DELETE FROM brutex.tbl_events_snap where btx_id=?";
-
 
40
        final String deleteTable = "TRUNCATE TABLE brutex.tbl_events;";
-
 
41
 
-
 
42
        final String moveSQL = "INSERT INTO brutex.tbl_events_snap DIRECT SELECT " +
-
 
43
                                " btx_event_type, btx_id, btx_obj_type, btx_obj_id, btx_timestamp, ?, btx_event FROM brutex.tbl_events; ";
33
 
44
 
34
        final String moveSQL = "INSERT INTO brutex.tbl_events_all DIRECT SELECT * FROM MEM_ALL_EVENTS " +
45
 
35
                "where btx_timestamp < " + (ts-5000) + " ";
Line 46... Line -...
46
        final String moveErrorSQL = "MERGE INTO brutex.tbl_events_errors " +
-
 
47
                "KEY (btx_event_type, btx_obj_type, btx_obj_id) " +
-
 
48
                "VALUES (?,?,?,?,?,?,?,?);";
-
 
49
 
-
 
50
        /**
-
 
51
         * Move event table data to snapshot
-
 
52
         */
-
 
53
 
-
 
54
        Connection con = null;
-
 
55
        Connection fcon = null;
-
 
56
        try {
-
 
57
            con = pool.getConnection();
-
 
58
            con.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
-
 
59
            con.setAutoCommit(false);
-
 
60
            Statement stmt = con.createStatement();
-
 
61
            PreparedStatement moveprep= con.prepareStatement(moveSQL);
-
 
62
            moveprep.setLong(1, run_key);
-
 
63
            moveprep.execute();
-
 
64
            stmt.execute(deleteTable);
-
 
65
            con.commit(); //all events moved from tbl_events to tbl_events_snap at this point
-
 
66
 
-
 
67
 
-
 
68
            fcon = fpool.getConnection();
-
 
69
            PreparedStatement errorPrepSql = fcon.prepareStatement(moveErrorSQL);
-
 
70
 
-
 
71
            PreparedStatement del = con.prepareStatement(deleteSQL);
-
 
72
 
-
 
73
            ResultSet rs = stmt.executeQuery(querySQL);
-
 
74
 
-
 
75
 
-
 
76
            while(rs.next() && !isInterrupted.get()) {
-
 
77
                /* btx_id, btx_event, btx_obj_id, btx_event_type, btx_obj_typ */
-
 
78
                String id = rs.getString(1);
-
 
79
                Clob c = rs.getClob(2);
-
 
80
                String obj_id = rs.getString(3);
-
 
81
                String event_type = rs.getString(4);
-
 
82
                String obj_type = rs.getString(5);
-
 
83
                long event_ts = rs.getLong(6);
-
 
84
                boolean bretry = false;
-
 
85
 
-
 
86
                SimpleSoap ss = new SimpleSoap( url, id, IOUtils.toString(c.getCharacterStream()));
-
 
87
                int retry = 0;
-
 
88
                Reader response = null;
-
 
89
                String rsp = "";
-
 
90
                boolean succeeded = false;
-
 
91
                while(retry < 3 && !succeeded && !isInterrupted.get()) {
-
 
92
                    retry++;
-
 
93
                        response = ss.sendSoap(false);
-
 
94
                        succeeded = true;
-
 
95
                        if(response!=null) {
-
 
96
                            rsp = IOUtils.toString(response);
-
 
97
                        }
-
 
98
 
-
 
99
                        if (rsp.contains("<soap:Fault") || rsp.contains("<soapenv:Fault")) { succeeded=false; bretry=false;};
-
 
100
                        if (! rsp.contains(":Envelope ")) { succeeded=false; bretry=true;};
-
 
101
 
-
 
102
                        if (succeeded) {
-
 
103
                            // Successfully send
-
 
104
                            del.setString(1, id);
-
 
105
                            del.execute();
-
 
106
                            con.commit();
-
 
107
                            egres_counter.incrementAndGet();
-
 
108
                            log.debug("Successfully sent event '{}' to target ALF Event Manager.", id);
-
 
109
                        } else {
-
 
110
                            // Error during sending
-
 
111
                            log.warn("Unable to send ALF Event '{}' to event manager. Will retry in 2 seconds. This is the {}. time.", id, retry);
-
 
112
                            try {
-
 
113
                                Thread.sleep(2000);
-
 
114
                            } catch (InterruptedException e) {
-
 
115
                                log.error("Interrupted while waiting to retry: {}", e.getMessage());
-
 
116
                            }
-
 
117
                        }
-
 
118
                }
-
 
119
 
-
 
120
                if(! succeeded) {
-
 
121
                    log.error("Failed to send ALF Event '{}' to the event manager. Giving up. " +
-
 
122
                            "Moving event back to the queue unless there is a superseding event already queued.", id);
-
 
123
 
-
 
124
 
-
 
125
                    try {
-
 
126
                        //this is in file-based db
-
 
127
                        errorPrepSql.setString(1, event_type);
-
 
128
                        errorPrepSql.setString(2, id);
-
 
129
                        errorPrepSql.setString(3, obj_type);
-
 
130
                        errorPrepSql.setString(4, obj_id);
-
 
131
                        errorPrepSql.setLong(5, event_ts);
-
 
132
                        errorPrepSql.setBoolean(6, bretry);
-
 
133
                        errorPrepSql.setClob(7, new StringReader(rsp) );
-
 
134
                        errorPrepSql.setClob(8, c);
-
 
135
                        errorPrepSql.execute();
-
 
136
                        fcon.commit();
-
 
137
 
-
 
138
                        //this is in-memory
-
 
Line -... Line 36...
-
 
36
        final String deleteTable = "DELETE FROM MEM_ALL_EVENTS where btx_timestamp < " + (ts-5000);
-
 
37
        final String deleteMemTable = "DELETE FROM brutex.tbl_events_all where btx_timestamp < " + (ts-5000);
-
 
38
 
-
 
39
 
-
 
40
        /**
-
 
41
         * Move event table data to all events log
-
 
42
         */
-
 
43
 
-
 
44
 
-
 
45
        try (Connection fcon = fpool.getConnection();
-
 
46
            Connection mcon = mpool.getConnection()){
-
 
47
            fcon.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
-
 
48
            fcon.setAutoCommit(false);
-
 
49
            Statement stmt = fcon.createStatement();
-
 
50
            stmt.execute(moveSQL);
-
 
51
            int count = stmt.getUpdateCount();
Line 139... Line 52...
139
                        del.setString(1, id);
52
            fcon.commit();
140
                        del.execute();
53
            Statement mstm = mcon.createStatement();
141
                        con.commit();
54
 
142
                    }  catch (SQLException e) {
-
 
143
                        log.error("Exception in SQL execution during writing error events: {}", e.getMessage());
-
 
144
                    }
-
 
145
                }
-
 
146
            }
-
 
147
 
-
 
148
 
-
 
149
        } catch (SQLException e) {
-
 
150
            log.error("Exception in SQL execution: {}", e.getMessage());
-
 
151
            throw new JobExecutionException(e);
-
 
152
        } catch (IOException e) {
-
 
153
            log.error("Exception in SQL execution: {}", e.getMessage());
55
 
154
            throw new RuntimeException(e);
56
            mstm.execute(deleteMemTable);
Line 155... Line 57...
155
        } finally {
57
            int count2 = mstm.getUpdateCount();
156
            try {
58
            mcon.commit();
157
                if(fcon!=null) fcon.close();
59
 
158
                if(con!=null)  con.close();
60
            log.info("EventLogCleaner moved '{}/ deleted {}' events into the persistence space.", count, count2);
159
            } catch (SQLException e) {
61
 
160
                    log.error("Error closing the database connections: {}", e.getMessage());
62
        } catch (SQLException e) {
161
                    throw new RuntimeException(e);
63
            log.error("Exception in SQL execution: {}", e.getMessage());
162
            }
64
            throw new JobExecutionException(e);
163
        }
65
        }
164
    }
66
    }
165
 
67
 
166
 
68
 
-
 
69
        /**
167
    /**
70
     * <p>
168
     * <p>
71
     * Called by the <code>{@link Scheduler}</code> when a user