Subversion Repositories XServices

Rev

Rev 201 | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
201 brianR 1
package net.brutex.xservices.util;
2
 
3
import lombok.extern.slf4j.Slf4j;
4
import org.apache.commons.io.IOUtils;
5
import org.h2.jdbcx.JdbcConnectionPool;
6
import org.quartz.*;
7
 
8
import java.io.IOException;
9
import java.io.Reader;
10
import java.io.StringReader;
11
import java.sql.*;
12
import java.time.Instant;
13
import java.util.Date;
14
import java.util.concurrent.atomic.AtomicBoolean;
15
import java.util.concurrent.atomic.AtomicLong;
16
 
17
import static org.quartz.TriggerBuilder.newTrigger;
18
 
19
@Slf4j
20
public class EventEmitter implements Job, InterruptableJob {
21
 
22
    private final AtomicBoolean isInterrupted = new AtomicBoolean(false);
23
 
24
    @Override
25
    public void execute(JobExecutionContext context) throws JobExecutionException {
26
        final Instant d = Instant.now();
27
        final long ts = d.toEpochMilli();
28
        final EventmanagerConfiguration conf = (EventmanagerConfiguration) context.getMergedJobDataMap()
29
                .get(EventmanagerConfiguration.KEY);
30
 
31
        final String url = conf.getTargeturl();
32
 
33
        final JdbcConnectionPool pool = (JdbcConnectionPool) context.getMergedJobDataMap().get("mdbConnection");
34
        final JdbcConnectionPool fpool = (JdbcConnectionPool) context.getMergedJobDataMap().get("fdbConnection");
35
        final long run_key = context.getMergedJobDataMap().getLong("run_key");
36
        final AtomicLong egres_counter = (AtomicLong) context.getMergedJobDataMap().get("egres_counter");
37
 
38
        final String querySQL = "SELECT btx_id, btx_event, btx_obj_id, btx_event_type, btx_obj_type, btx_timestamp FROM brutex.tbl_events_snap ORDER BY btx_timestamp asc FOR UPDATE;";
39
        final String deleteSQL = "DELETE FROM brutex.tbl_events_snap where btx_id=?";
40
        final String deleteTable = "TRUNCATE TABLE brutex.tbl_events;";
41
 
42
        final String moveSQL = "INSERT INTO brutex.tbl_events_snap DIRECT SELECT " +
43
                                " btx_event_type, btx_id, btx_obj_type, btx_obj_id, btx_timestamp, ?, btx_event FROM brutex.tbl_events; ";
44
 
45
 
46
        final String moveErrorSQL = "MERGE INTO brutex.tbl_events_errors " +
47
                "KEY (btx_event_type, btx_obj_type, btx_obj_id) " +
48
                "VALUES (?,?,?,?,?,?,?,?);";
49
 
50
        /**
51
         * Move event table data to snapshot
52
         */
53
 
203 brianR 54
 
55
        try(          Connection  con = pool.getConnection();
56
                      Connection fcon = fpool.getConnection();
57
        ) {
58
 
201 brianR 59
            con.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
60
            con.setAutoCommit(false);
61
            Statement stmt = con.createStatement();
62
            PreparedStatement moveprep= con.prepareStatement(moveSQL);
63
            moveprep.setLong(1, run_key);
64
            moveprep.execute();
65
            stmt.execute(deleteTable);
66
            con.commit(); //all events moved from tbl_events to tbl_events_snap at this point
67
 
68
 
203 brianR 69
 
201 brianR 70
            PreparedStatement errorPrepSql = fcon.prepareStatement(moveErrorSQL);
71
 
72
            PreparedStatement del = con.prepareStatement(deleteSQL);
73
 
74
            ResultSet rs = stmt.executeQuery(querySQL);
75
 
76
 
77
            while(rs.next() && !isInterrupted.get()) {
78
                /* btx_id, btx_event, btx_obj_id, btx_event_type, btx_obj_typ */
79
                String id = rs.getString(1);
80
                Clob c = rs.getClob(2);
81
                String obj_id = rs.getString(3);
82
                String event_type = rs.getString(4);
83
                String obj_type = rs.getString(5);
84
                long event_ts = rs.getLong(6);
85
                boolean bretry = false;
86
 
203 brianR 87
                //SimpleSoap ss = new SimpleSoap( url, id, IOUtils.toString(c.getAsciiStream()c.getCharacterStream()));
88
                SimpleSoap ss = new SimpleSoap(url, id, c.getSubString(1L, (int) c.length()));
201 brianR 89
                int retry = 0;
90
                Reader response = null;
91
                String rsp = "";
92
                boolean succeeded = false;
93
                while(retry < 3 && !succeeded && !isInterrupted.get()) {
94
                    retry++;
95
                        response = ss.sendSoap(false);
96
                        succeeded = true;
97
                        if(response!=null) {
98
                            rsp = IOUtils.toString(response);
99
                        }
100
 
101
                        if (rsp.contains("<soap:Fault") || rsp.contains("<soapenv:Fault")) { succeeded=false; bretry=false;};
102
                        if (! rsp.contains(":Envelope ")) { succeeded=false; bretry=true;};
103
 
104
                        if (succeeded) {
105
                            // Successfully send
106
                            del.setString(1, id);
107
                            del.execute();
108
                            con.commit();
109
                            egres_counter.incrementAndGet();
110
                            log.debug("Successfully sent event '{}' to target ALF Event Manager.", id);
111
                        } else {
112
                            // Error during sending
113
                            log.warn("Unable to send ALF Event '{}' to event manager. Will retry in 2 seconds. This is the {}. time.", id, retry);
114
                            try {
115
                                Thread.sleep(2000);
116
                            } catch (InterruptedException e) {
117
                                log.error("Interrupted while waiting to retry: {}", e.getMessage());
118
                            }
119
                        }
120
                }
121
 
122
                if(! succeeded) {
123
                    log.error("Failed to send ALF Event '{}' to the event manager. Giving up. " +
124
                            "Moving event back to the queue unless there is a superseding event already queued.", id);
125
 
126
 
127
                    try {
128
                        //this is in file-based db
129
                        errorPrepSql.setString(1, event_type);
130
                        errorPrepSql.setString(2, id);
131
                        errorPrepSql.setString(3, obj_type);
132
                        errorPrepSql.setString(4, obj_id);
133
                        errorPrepSql.setLong(5, event_ts);
134
                        errorPrepSql.setBoolean(6, bretry);
135
                        errorPrepSql.setClob(7, new StringReader(rsp) );
136
                        errorPrepSql.setClob(8, c);
137
                        errorPrepSql.execute();
138
                        fcon.commit();
139
 
140
                        //this is in-memory
141
                        del.setString(1, id);
142
                        del.execute();
143
                        con.commit();
144
                    }  catch (SQLException e) {
145
                        log.error("Exception in SQL execution during writing error events: {}", e.getMessage());
146
                    }
147
                }
148
            }
149
 
150
 
151
        } catch (SQLException e) {
152
            log.error("Exception in SQL execution: {}", e.getMessage());
153
            throw new JobExecutionException(e);
154
        } catch (IOException e) {
155
            log.error("Exception in SQL execution: {}", e.getMessage());
156
            throw new RuntimeException(e);
157
        }
158
    }
159
 
160
 
161
    /**
162
     * <p>
163
     * Called by the <code>{@link Scheduler}</code> when a user
164
     * interrupts the <code>Job</code>.
165
     * </p>
166
     *
167
     * @throws UnableToInterruptJobException if there is an exception while interrupting the job.
168
     */
169
    @Override
170
    public synchronized void interrupt() throws UnableToInterruptJobException {
171
        isInterrupted.set(true);
172
        log.warn("ALFEmitter received and interrupt.");
173
    }
174
}