Subversion Repositories XServices

Compare Revisions

Ignore whitespace Rev 200 → Rev 201

/xservices/trunk/src/main/java/net/brutex/xservices/util/EventEmitter.java
0,0 → 1,180
package net.brutex.xservices.util;
 
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import org.h2.jdbcx.JdbcConnectionPool;
import org.quartz.*;
 
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.sql.*;
import java.time.Instant;
import java.util.Date;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
 
import static org.quartz.TriggerBuilder.newTrigger;
 
@Slf4j
public class EventEmitter implements Job, InterruptableJob {
 
private final AtomicBoolean isInterrupted = new AtomicBoolean(false);
 
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
final Instant d = Instant.now();
final long ts = d.toEpochMilli();
final EventmanagerConfiguration conf = (EventmanagerConfiguration) context.getMergedJobDataMap()
.get(EventmanagerConfiguration.KEY);
 
final String url = conf.getTargeturl();
 
final JdbcConnectionPool pool = (JdbcConnectionPool) context.getMergedJobDataMap().get("mdbConnection");
final JdbcConnectionPool fpool = (JdbcConnectionPool) context.getMergedJobDataMap().get("fdbConnection");
final long run_key = context.getMergedJobDataMap().getLong("run_key");
final AtomicLong egres_counter = (AtomicLong) context.getMergedJobDataMap().get("egres_counter");
 
final String querySQL = "SELECT btx_id, btx_event, btx_obj_id, btx_event_type, btx_obj_type, btx_timestamp FROM brutex.tbl_events_snap ORDER BY btx_timestamp asc FOR UPDATE;";
final String deleteSQL = "DELETE FROM brutex.tbl_events_snap where btx_id=?";
final String deleteTable = "TRUNCATE TABLE brutex.tbl_events;";
 
final String moveSQL = "INSERT INTO brutex.tbl_events_snap DIRECT SELECT " +
" btx_event_type, btx_id, btx_obj_type, btx_obj_id, btx_timestamp, ?, btx_event FROM brutex.tbl_events; ";
 
 
final String moveErrorSQL = "MERGE INTO brutex.tbl_events_errors " +
"KEY (btx_event_type, btx_obj_type, btx_obj_id) " +
"VALUES (?,?,?,?,?,?,?,?);";
 
/**
* Move event table data to snapshot
*/
 
Connection con = null;
Connection fcon = null;
try {
con = pool.getConnection();
con.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
con.setAutoCommit(false);
Statement stmt = con.createStatement();
PreparedStatement moveprep= con.prepareStatement(moveSQL);
moveprep.setLong(1, run_key);
moveprep.execute();
stmt.execute(deleteTable);
con.commit(); //all events moved from tbl_events to tbl_events_snap at this point
 
 
fcon = fpool.getConnection();
PreparedStatement errorPrepSql = fcon.prepareStatement(moveErrorSQL);
 
PreparedStatement del = con.prepareStatement(deleteSQL);
 
ResultSet rs = stmt.executeQuery(querySQL);
 
 
while(rs.next() && !isInterrupted.get()) {
/* btx_id, btx_event, btx_obj_id, btx_event_type, btx_obj_typ */
String id = rs.getString(1);
Clob c = rs.getClob(2);
String obj_id = rs.getString(3);
String event_type = rs.getString(4);
String obj_type = rs.getString(5);
long event_ts = rs.getLong(6);
boolean bretry = false;
 
SimpleSoap ss = new SimpleSoap( url, id, IOUtils.toString(c.getCharacterStream()));
int retry = 0;
Reader response = null;
String rsp = "";
boolean succeeded = false;
while(retry < 3 && !succeeded && !isInterrupted.get()) {
retry++;
response = ss.sendSoap(false);
succeeded = true;
if(response!=null) {
rsp = IOUtils.toString(response);
}
 
if (rsp.contains("<soap:Fault") || rsp.contains("<soapenv:Fault")) { succeeded=false; bretry=false;};
if (! rsp.contains(":Envelope ")) { succeeded=false; bretry=true;};
 
if (succeeded) {
// Successfully send
del.setString(1, id);
del.execute();
con.commit();
egres_counter.incrementAndGet();
log.debug("Successfully sent event '{}' to target ALF Event Manager.", id);
} else {
// Error during sending
log.warn("Unable to send ALF Event '{}' to event manager. Will retry in 2 seconds. This is the {}. time.", id, retry);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
log.error("Interrupted while waiting to retry: {}", e.getMessage());
}
}
}
 
if(! succeeded) {
log.error("Failed to send ALF Event '{}' to the event manager. Giving up. " +
"Moving event back to the queue unless there is a superseding event already queued.", id);
 
 
try {
//this is in file-based db
errorPrepSql.setString(1, event_type);
errorPrepSql.setString(2, id);
errorPrepSql.setString(3, obj_type);
errorPrepSql.setString(4, obj_id);
errorPrepSql.setLong(5, event_ts);
errorPrepSql.setBoolean(6, bretry);
errorPrepSql.setClob(7, new StringReader(rsp) );
errorPrepSql.setClob(8, c);
errorPrepSql.execute();
fcon.commit();
 
//this is in-memory
del.setString(1, id);
del.execute();
con.commit();
} catch (SQLException e) {
log.error("Exception in SQL execution during writing error events: {}", e.getMessage());
}
}
}
 
 
} catch (SQLException e) {
log.error("Exception in SQL execution: {}", e.getMessage());
throw new JobExecutionException(e);
} catch (IOException e) {
log.error("Exception in SQL execution: {}", e.getMessage());
throw new RuntimeException(e);
} finally {
try {
if(fcon!=null) fcon.close();
if(con!=null) con.close();
} catch (SQLException e) {
log.error("Error closing the database connections: {}", e.getMessage());
throw new RuntimeException(e);
}
}
}
 
 
/**
* <p>
* Called by the <code>{@link Scheduler}</code> when a user
* interrupts the <code>Job</code>.
* </p>
*
* @throws UnableToInterruptJobException if there is an exception while interrupting the job.
*/
@Override
public synchronized void interrupt() throws UnableToInterruptJobException {
isInterrupted.set(true);
log.warn("ALFEmitter received and interrupt.");
}
}