Subversion Repositories XServices

Rev

Go to most recent revision | Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
201 brianR 1
package net.brutex.xservices.util;
2
 
3
import lombok.extern.slf4j.Slf4j;
4
import org.apache.commons.io.IOUtils;
5
import org.h2.jdbcx.JdbcConnectionPool;
6
import org.quartz.*;
7
 
8
import java.io.IOException;
9
import java.io.Reader;
10
import java.io.StringReader;
11
import java.sql.*;
12
import java.time.Instant;
13
import java.util.Date;
14
import java.util.concurrent.atomic.AtomicBoolean;
15
import java.util.concurrent.atomic.AtomicLong;
16
 
17
import static org.quartz.TriggerBuilder.newTrigger;
18
 
19
@Slf4j
20
public class EventEmitter implements Job, InterruptableJob {
21
 
22
    private final AtomicBoolean isInterrupted = new AtomicBoolean(false);
23
 
24
    @Override
25
    public void execute(JobExecutionContext context) throws JobExecutionException {
26
        final Instant d = Instant.now();
27
        final long ts = d.toEpochMilli();
28
        final EventmanagerConfiguration conf = (EventmanagerConfiguration) context.getMergedJobDataMap()
29
                .get(EventmanagerConfiguration.KEY);
30
 
31
        final String url = conf.getTargeturl();
32
 
33
        final JdbcConnectionPool pool = (JdbcConnectionPool) context.getMergedJobDataMap().get("mdbConnection");
34
        final JdbcConnectionPool fpool = (JdbcConnectionPool) context.getMergedJobDataMap().get("fdbConnection");
35
        final long run_key = context.getMergedJobDataMap().getLong("run_key");
36
        final AtomicLong egres_counter = (AtomicLong) context.getMergedJobDataMap().get("egres_counter");
37
 
38
        final String querySQL = "SELECT btx_id, btx_event, btx_obj_id, btx_event_type, btx_obj_type, btx_timestamp FROM brutex.tbl_events_snap ORDER BY btx_timestamp asc FOR UPDATE;";
39
        final String deleteSQL = "DELETE FROM brutex.tbl_events_snap where btx_id=?";
40
        final String deleteTable = "TRUNCATE TABLE brutex.tbl_events;";
41
 
42
        final String moveSQL = "INSERT INTO brutex.tbl_events_snap DIRECT SELECT " +
43
                                " btx_event_type, btx_id, btx_obj_type, btx_obj_id, btx_timestamp, ?, btx_event FROM brutex.tbl_events; ";
44
 
45
 
46
        final String moveErrorSQL = "MERGE INTO brutex.tbl_events_errors " +
47
                "KEY (btx_event_type, btx_obj_type, btx_obj_id) " +
48
                "VALUES (?,?,?,?,?,?,?,?);";
49
 
50
        /**
51
         * Move event table data to snapshot
52
         */
53
 
54
        Connection con = null;
55
        Connection fcon = null;
56
        try {
57
            con = pool.getConnection();
58
            con.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
59
            con.setAutoCommit(false);
60
            Statement stmt = con.createStatement();
61
            PreparedStatement moveprep= con.prepareStatement(moveSQL);
62
            moveprep.setLong(1, run_key);
63
            moveprep.execute();
64
            stmt.execute(deleteTable);
65
            con.commit(); //all events moved from tbl_events to tbl_events_snap at this point
66
 
67
 
68
            fcon = fpool.getConnection();
69
            PreparedStatement errorPrepSql = fcon.prepareStatement(moveErrorSQL);
70
 
71
            PreparedStatement del = con.prepareStatement(deleteSQL);
72
 
73
            ResultSet rs = stmt.executeQuery(querySQL);
74
 
75
 
76
            while(rs.next() && !isInterrupted.get()) {
77
                /* btx_id, btx_event, btx_obj_id, btx_event_type, btx_obj_typ */
78
                String id = rs.getString(1);
79
                Clob c = rs.getClob(2);
80
                String obj_id = rs.getString(3);
81
                String event_type = rs.getString(4);
82
                String obj_type = rs.getString(5);
83
                long event_ts = rs.getLong(6);
84
                boolean bretry = false;
85
 
86
                SimpleSoap ss = new SimpleSoap( url, id, IOUtils.toString(c.getCharacterStream()));
87
                int retry = 0;
88
                Reader response = null;
89
                String rsp = "";
90
                boolean succeeded = false;
91
                while(retry < 3 && !succeeded && !isInterrupted.get()) {
92
                    retry++;
93
                        response = ss.sendSoap(false);
94
                        succeeded = true;
95
                        if(response!=null) {
96
                            rsp = IOUtils.toString(response);
97
                        }
98
 
99
                        if (rsp.contains("<soap:Fault") || rsp.contains("<soapenv:Fault")) { succeeded=false; bretry=false;};
100
                        if (! rsp.contains(":Envelope ")) { succeeded=false; bretry=true;};
101
 
102
                        if (succeeded) {
103
                            // Successfully send
104
                            del.setString(1, id);
105
                            del.execute();
106
                            con.commit();
107
                            egres_counter.incrementAndGet();
108
                            log.debug("Successfully sent event '{}' to target ALF Event Manager.", id);
109
                        } else {
110
                            // Error during sending
111
                            log.warn("Unable to send ALF Event '{}' to event manager. Will retry in 2 seconds. This is the {}. time.", id, retry);
112
                            try {
113
                                Thread.sleep(2000);
114
                            } catch (InterruptedException e) {
115
                                log.error("Interrupted while waiting to retry: {}", e.getMessage());
116
                            }
117
                        }
118
                }
119
 
120
                if(! succeeded) {
121
                    log.error("Failed to send ALF Event '{}' to the event manager. Giving up. " +
122
                            "Moving event back to the queue unless there is a superseding event already queued.", id);
123
 
124
 
125
                    try {
126
                        //this is in file-based db
127
                        errorPrepSql.setString(1, event_type);
128
                        errorPrepSql.setString(2, id);
129
                        errorPrepSql.setString(3, obj_type);
130
                        errorPrepSql.setString(4, obj_id);
131
                        errorPrepSql.setLong(5, event_ts);
132
                        errorPrepSql.setBoolean(6, bretry);
133
                        errorPrepSql.setClob(7, new StringReader(rsp) );
134
                        errorPrepSql.setClob(8, c);
135
                        errorPrepSql.execute();
136
                        fcon.commit();
137
 
138
                        //this is in-memory
139
                        del.setString(1, id);
140
                        del.execute();
141
                        con.commit();
142
                    }  catch (SQLException e) {
143
                        log.error("Exception in SQL execution during writing error events: {}", e.getMessage());
144
                    }
145
                }
146
            }
147
 
148
 
149
        } catch (SQLException e) {
150
            log.error("Exception in SQL execution: {}", e.getMessage());
151
            throw new JobExecutionException(e);
152
        } catch (IOException e) {
153
            log.error("Exception in SQL execution: {}", e.getMessage());
154
            throw new RuntimeException(e);
155
        } finally {
156
            try {
157
                if(fcon!=null) fcon.close();
158
                if(con!=null)  con.close();
159
            } catch (SQLException e) {
160
                    log.error("Error closing the database connections: {}", e.getMessage());
161
                    throw new RuntimeException(e);
162
            }
163
        }
164
    }
165
 
166
 
167
    /**
168
     * <p>
169
     * Called by the <code>{@link Scheduler}</code> when a user
170
     * interrupts the <code>Job</code>.
171
     * </p>
172
     *
173
     * @throws UnableToInterruptJobException if there is an exception while interrupting the job.
174
     */
175
    @Override
176
    public synchronized void interrupt() throws UnableToInterruptJobException {
177
        isInterrupted.set(true);
178
        log.warn("ALFEmitter received and interrupt.");
179
    }
180
}