Subversion Repositories XServices

Compare Revisions

No changes between revisions

Ignore whitespace Rev 199 → Rev 201

/xservices/trunk/src/main/java/net/brutex/xservices/util/EventEmitter.java
0,0 → 1,180
package net.brutex.xservices.util;
 
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import org.h2.jdbcx.JdbcConnectionPool;
import org.quartz.*;
 
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.sql.*;
import java.time.Instant;
import java.util.Date;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
 
import static org.quartz.TriggerBuilder.newTrigger;
 
@Slf4j
public class EventEmitter implements Job, InterruptableJob {
 
private final AtomicBoolean isInterrupted = new AtomicBoolean(false);
 
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
final Instant d = Instant.now();
final long ts = d.toEpochMilli();
final EventmanagerConfiguration conf = (EventmanagerConfiguration) context.getMergedJobDataMap()
.get(EventmanagerConfiguration.KEY);
 
final String url = conf.getTargeturl();
 
final JdbcConnectionPool pool = (JdbcConnectionPool) context.getMergedJobDataMap().get("mdbConnection");
final JdbcConnectionPool fpool = (JdbcConnectionPool) context.getMergedJobDataMap().get("fdbConnection");
final long run_key = context.getMergedJobDataMap().getLong("run_key");
final AtomicLong egres_counter = (AtomicLong) context.getMergedJobDataMap().get("egres_counter");
 
final String querySQL = "SELECT btx_id, btx_event, btx_obj_id, btx_event_type, btx_obj_type, btx_timestamp FROM brutex.tbl_events_snap ORDER BY btx_timestamp asc FOR UPDATE;";
final String deleteSQL = "DELETE FROM brutex.tbl_events_snap where btx_id=?";
final String deleteTable = "TRUNCATE TABLE brutex.tbl_events;";
 
final String moveSQL = "INSERT INTO brutex.tbl_events_snap DIRECT SELECT " +
" btx_event_type, btx_id, btx_obj_type, btx_obj_id, btx_timestamp, ?, btx_event FROM brutex.tbl_events; ";
 
 
final String moveErrorSQL = "MERGE INTO brutex.tbl_events_errors " +
"KEY (btx_event_type, btx_obj_type, btx_obj_id) " +
"VALUES (?,?,?,?,?,?,?,?);";
 
/**
* Move event table data to snapshot
*/
 
Connection con = null;
Connection fcon = null;
try {
con = pool.getConnection();
con.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
con.setAutoCommit(false);
Statement stmt = con.createStatement();
PreparedStatement moveprep= con.prepareStatement(moveSQL);
moveprep.setLong(1, run_key);
moveprep.execute();
stmt.execute(deleteTable);
con.commit(); //all events moved from tbl_events to tbl_events_snap at this point
 
 
fcon = fpool.getConnection();
PreparedStatement errorPrepSql = fcon.prepareStatement(moveErrorSQL);
 
PreparedStatement del = con.prepareStatement(deleteSQL);
 
ResultSet rs = stmt.executeQuery(querySQL);
 
 
while(rs.next() && !isInterrupted.get()) {
/* btx_id, btx_event, btx_obj_id, btx_event_type, btx_obj_typ */
String id = rs.getString(1);
Clob c = rs.getClob(2);
String obj_id = rs.getString(3);
String event_type = rs.getString(4);
String obj_type = rs.getString(5);
long event_ts = rs.getLong(6);
boolean bretry = false;
 
SimpleSoap ss = new SimpleSoap( url, id, IOUtils.toString(c.getCharacterStream()));
int retry = 0;
Reader response = null;
String rsp = "";
boolean succeeded = false;
while(retry < 3 && !succeeded && !isInterrupted.get()) {
retry++;
response = ss.sendSoap(false);
succeeded = true;
if(response!=null) {
rsp = IOUtils.toString(response);
}
 
if (rsp.contains("<soap:Fault") || rsp.contains("<soapenv:Fault")) { succeeded=false; bretry=false;};
if (! rsp.contains(":Envelope ")) { succeeded=false; bretry=true;};
 
if (succeeded) {
// Successfully send
del.setString(1, id);
del.execute();
con.commit();
egres_counter.incrementAndGet();
log.debug("Successfully sent event '{}' to target ALF Event Manager.", id);
} else {
// Error during sending
log.warn("Unable to send ALF Event '{}' to event manager. Will retry in 2 seconds. This is the {}. time.", id, retry);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
log.error("Interrupted while waiting to retry: {}", e.getMessage());
}
}
}
 
if(! succeeded) {
log.error("Failed to send ALF Event '{}' to the event manager. Giving up. " +
"Moving event back to the queue unless there is a superseding event already queued.", id);
 
 
try {
//this is in file-based db
errorPrepSql.setString(1, event_type);
errorPrepSql.setString(2, id);
errorPrepSql.setString(3, obj_type);
errorPrepSql.setString(4, obj_id);
errorPrepSql.setLong(5, event_ts);
errorPrepSql.setBoolean(6, bretry);
errorPrepSql.setClob(7, new StringReader(rsp) );
errorPrepSql.setClob(8, c);
errorPrepSql.execute();
fcon.commit();
 
//this is in-memory
del.setString(1, id);
del.execute();
con.commit();
} catch (SQLException e) {
log.error("Exception in SQL execution during writing error events: {}", e.getMessage());
}
}
}
 
 
} catch (SQLException e) {
log.error("Exception in SQL execution: {}", e.getMessage());
throw new JobExecutionException(e);
} catch (IOException e) {
log.error("Exception in SQL execution: {}", e.getMessage());
throw new RuntimeException(e);
} finally {
try {
if(fcon!=null) fcon.close();
if(con!=null) con.close();
} catch (SQLException e) {
log.error("Error closing the database connections: {}", e.getMessage());
throw new RuntimeException(e);
}
}
}
 
 
/**
* <p>
* Called by the <code>{@link Scheduler}</code> when a user
* interrupts the <code>Job</code>.
* </p>
*
* @throws UnableToInterruptJobException if there is an exception while interrupting the job.
*/
@Override
public synchronized void interrupt() throws UnableToInterruptJobException {
isInterrupted.set(true);
log.warn("ALFEmitter received and interrupt.");
}
}
/xservices/trunk/src/main/java/net/brutex/xservices/util/EventmanagerConfiguration.java
0,0 → 1,68
package net.brutex.xservices.util;
 
import lombok.Data;
import lombok.Singular;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.configuration2.Configuration;
import org.apache.commons.configuration2.FileBasedConfiguration;
import org.apache.commons.configuration2.PropertiesConfiguration;
import org.apache.commons.configuration2.builder.FileBasedConfigurationBuilder;
import org.apache.commons.configuration2.builder.PropertiesBuilderParametersImpl;
import org.apache.commons.configuration2.ex.ConfigurationException;
 
import javax.servlet.ServletContext;
 
/**
* A configuration object for the MiscService -> Eventmanager. Implemented as singleton.
* @author Brian Rosenberger, bru@brutex.de
**/
@Data
@Slf4j
public class EventmanagerConfiguration {
 
public static final String KEY = "net.brutex.xservices.EventmanagerConfiguration";
 
private static class InstanceHolder {
public static final EventmanagerConfiguration instance = new EventmanagerConfiguration();
}
 
private EventmanagerConfiguration() {
refreshConfig();
}
 
public static EventmanagerConfiguration getInstance() {
return InstanceHolder.instance;
}
 
 
private String targeturl;
private int interval;
private String jdbc_memdb;
private String jdbc_filedb;
 
 
public synchronized EventmanagerConfiguration refreshConfig() {
log.trace("Reading EventmanagerConfiguration from file eventmanager.properties.");
FileBasedConfigurationBuilder<FileBasedConfiguration> builder =
new FileBasedConfigurationBuilder<FileBasedConfiguration>(PropertiesConfiguration.class)
.configure(new PropertiesBuilderParametersImpl().setFileName("eventmanager.properties"));
 
try {
Configuration config = builder.getConfiguration();
 
/* Read from eventmanager.properties file */
this.targeturl = config.getString("target.url");
this.interval = config.getInt("interval", 10);
this.jdbc_memdb = config.getString("memdb", "jdbc:h2:mem:lockdb;DB_CLOSE_DELAY=-1;");
this.jdbc_filedb = config.getString("fdb", "jdbc:h2:mem:lockdb;DB_CLOSE_DELAY=-1;");
 
 
} catch (ConfigurationException e) {
log.error("Error loading configuration for event manager in XServices MiscServices: {}", e.getMessage());
throw new RuntimeException(e);
}
return this;
}
 
 
}
/xservices/trunk/src/main/java/net/brutex/xservices/util/MiscServiceServletContextListener.java
0,0 → 1,192
package net.brutex.xservices.util;
 
 
import lombok.extern.slf4j.Slf4j;
import org.h2.jdbcx.JdbcConnectionPool;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
 
import javax.servlet.ServletContext;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import javax.servlet.annotation.WebListener;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicLong;
 
import static org.quartz.TriggerBuilder.newTrigger;
 
//For Servlet container 3.x, you can annotate the listener with @WebListener, no need to declares in web.xml.
 
/**
* Handle servlet lifecycle actions for the MiscService servlet, such as
* initializing in-memory database, persist on shutdown etc.
*/
 
 
@WebListener
@Slf4j
public class MiscServiceServletContextListener implements ServletContextListener {
 
/**
* SQL initialization for in-memory database
* INIT=RUNSCRIPT FROM 'classpath:scripts/create.sql'"
*/
private final static String dbinit = "RUNSCRIPT FROM 'classpath:ddl/BRTX_schema.ddl';";
 
private final EventmanagerConfiguration configuration = EventmanagerConfiguration.getInstance().refreshConfig();
private final JdbcConnectionPool mempool = getDbPool(configuration.getJdbc_memdb());
private final JdbcConnectionPool fdbpool = getDbPool(configuration.getJdbc_filedb());
 
/**
* Create DB connection pool and initialize the in-memory database with schema.
*
* @return connection pool
*/
private static JdbcConnectionPool getDbPool(String dbConnectString) {
JdbcConnectionPool p = JdbcConnectionPool.create(dbConnectString, "", "");
p.setMaxConnections(16);
p.setLoginTimeout(5);
try {
Connection c = p.getConnection();
Statement s = c.createStatement();
s.execute(dbinit);
log.trace("Running SQL against database '{}': '{}'", dbConnectString, dbinit);
c.close();
log.debug("Successfully created schema for database 'Brutex' at '{}'.", dbConnectString);
} catch (SQLException e) {
log.error("Error creating the schema for database 'Brutex' using '{}': {}", dbConnectString, e.getMessage());
throw new RuntimeException(e);
}
return p;
}
 
@Override
public void contextDestroyed(ServletContextEvent arg0) {
log.trace("contextDestroyed called.");
try {
Scheduler scheduler = (Scheduler) arg0.getServletContext().getAttribute("scheduler");
log.debug("Active jobs to be terminated: {}", scheduler.getCurrentlyExecutingJobs());
 
JobKey key = JobKey.jobKey("ALFEmitter");
synchronized (scheduler) {
if (!scheduler.isShutdown() && scheduler.checkExists(key) ) {
scheduler.interrupt(key);
scheduler.deleteJob(key);
log.info("Gracefully stopped the ALFEventEmitter job.");
}
if (!scheduler.isShutdown()) {
scheduler.shutdown(true);
}
}
} catch (SchedulerException e) {
log.error("Failed to stop the ALFEmitter job: {}", e.getMessage());
throw new RuntimeException(e);
}
 
log.info("ServletContextListener destroyed. Saving in-memory database to file based database.");
int act_i = mempool.getActiveConnections();
if (act_i > 0) {
log.warn("There are still {} connections to the XServices in-memory database active.", act_i);
}
 
try {
log.info("Create/Re-open file based database to persist memory database.");
Connection con = fdbpool.getConnection();
Statement s = con.createStatement();
 
final String insert = "INSERT INTO brutex.tbl_events SELECT * from LINK UNION SELECT " + "btx_event_type, btx_id, btx_obj_type, btx_obj_id, btx_timestamp, btx_event from LINK2;";
s.execute(insert);
int count = s.getUpdateCount();
log.info("Persisted {} rows in file-based database.", count);
log.info("Shutting down in-memory database. Closing file-based database. Please wait ...");
s.execute("SHUTDOWN;");
con.close();
log.info("Shutting down databases complete.");
} catch (SQLException e) {
log.error("An error occurred during database persistence: {}", e.getMessage());
throw new RuntimeException(e);
}
log.debug("Handled {} egress events.", arg0.getServletContext().getAttribute("egres_counter"));
log.debug("Handled {} ingress events.", arg0.getServletContext().getAttribute("ingres_counter"));
}
 
//Run this before web application is started
@Override
public void contextInitialized(ServletContextEvent arg0) {
log.debug("ServletContextListener started");
ServletContext context = arg0.getServletContext();
readConfiguration(context);
 
context.setAttribute("mdbConnection", mempool);
context.setAttribute("fdbConnection", fdbpool);
context.setAttribute("ingres_counter", 0);
AtomicLong egres = new AtomicLong(0);
context.setAttribute("egres_counter", egres);
context.setAttribute("ingres_counter", new AtomicLong(0));
try {
StdSchedulerFactory fact = new StdSchedulerFactory();
fact.initialize("MiscServicesScheduler-quartz.properties");
Scheduler scheduler = fact.getScheduler();
scheduler.start();
context.setAttribute("scheduler", scheduler);
} catch (SchedulerException e) {
log.error("Error creating scheduler within ServletContext: {}", e.getMessage());
throw new RuntimeException(e);
}
 
//Load events from file based database into in-memory database
try {
log.info("Start recovery of previously unsend alf events. Trying to load them into in-memory database.");
final String link = "CREATE LINKED TABLE IF NOT EXISTS LINK('org.h2.Driver', '"+configuration.getJdbc_memdb()+"', '', '', 'brutex.tbl_events'); " +
"CREATE LINKED TABLE IF NOT EXISTS LINK2('org.h2.Driver', '"+configuration.getJdbc_memdb()+"', '', '', 'brutex.tbl_events_snap');";
final String recoverSQL = "INSERT INTO LINK DIRECT SELECT * FROM brutex.tbl_events;";
final String truncate = "TRUNCATE TABLE brutex.tbl_events;";
int count = 0;
Connection con = fdbpool.getConnection();
con.setAutoCommit(false);
Statement statement = con.createStatement();
statement.execute(link);
con.commit();
ResultSet rs = statement.executeQuery("SELECT COUNT(1) FROM brutex.tbl_events");
if(rs.next()) count = rs.getInt(1);
statement.execute(recoverSQL);
log.info("Recovered {} events and loaded them into in-memory database.", count);
statement.execute(truncate);
con.commit();
con.close();
} catch (SQLException e) {
log.error("Exception during recovery of events from previous runs: {}", e.getMessage());
throw new RuntimeException(e);
}
//Start initial run of the emitter
startEmitterImmediate(egres, (Scheduler) context.getAttribute("scheduler"));
}
 
private synchronized void startEmitterImmediate(AtomicLong egres_counter, Scheduler scheduler) {
try {
if (!scheduler.checkExists(JobKey.jobKey("ALFEmitter"))) {
JobDetail job2 = JobBuilder.newJob(EventEmitter.class).withIdentity("ALFEmitter").build();
job2.getJobDataMap().put("mdbConnection", mempool);
job2.getJobDataMap().put("fdbConnection", fdbpool);
job2.getJobDataMap().put("run_key", Instant.now().toEpochMilli());
job2.getJobDataMap().put("egres_counter", egres_counter);
job2.getJobDataMap().put(EventmanagerConfiguration.KEY, EventmanagerConfiguration.getInstance());
SimpleTrigger t = (SimpleTrigger) newTrigger().withIdentity("ALFEmitter").startNow().build();
scheduler.scheduleJob(job2, t);
}
} catch (SchedulerException ex) {
log.error("Could not start EventEmitter to process existing queue directly after startup: {}", ex.getMessage());
}
}
 
private void readConfiguration(ServletContext ctx) {
/* Configure ServletContext attributes using configuration object*/
EventmanagerConfiguration c = EventmanagerConfiguration.getInstance().refreshConfig();
ctx.setAttribute(EventmanagerConfiguration.KEY, c);
}
 
}
/xservices/trunk/src/main/java/net/brutex/xservices/util/SimpleSoap.java
0,0 → 1,117
/*
* Copyright 2013 Brian Rosenberger (Brutex Network)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.brutex.xservices.util;
 
 
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpEntity;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.entity.EntityBuilder;
import org.apache.http.client.fluent.Request;
import org.apache.http.client.fluent.Response;
 
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.util.concurrent.atomic.AtomicBoolean;
 
 
/**
* Construct a HTTP POST and send it.
*
* @author Brian Rosenberger, bru(at)brutex.de
* @since 0.1
*/
@Slf4j
public class SimpleSoap {
 
private final String url;
private final String soapBody;
private final String id;
private long duration = 0;
 
 
final AtomicBoolean isInterrupted = new AtomicBoolean(false);
/**
* Instantiates a new simple http event.
*
* @param url the url
* @param soapBody the soap body
*/
public SimpleSoap(String url, String id, String soapBody) {
this.url = url;
this.id = id;
this.soapBody = soapBody;
}
 
/**
* Send soap.
*
* @param isDropResponse show interest in response or not
* @throws ClientProtocolException the client protocol exception
* @throws IOException Signals that an I/O exception has occurred.
*/
public Reader sendSoap(boolean isDropResponse) {
Reader response = null;
long start = System.currentTimeMillis();
EntityBuilder entitybuilder = EntityBuilder.create();
entitybuilder.setContentEncoding("UTF-8");
entitybuilder.setText(soapBody);
HttpEntity entity = entitybuilder.build();
 
log.trace("Sending event '{}' to target ALF Event Manager.", id);
 
if(isInterrupted.get()) return null;
 
try {
Response resp = Request.Post(url)
.addHeader("Accept", "text/xml")
.addHeader("Content-Type", "text/xml; charset=utf-8")
.addHeader("SOAPAction", "")
.body(entity).execute();
 
if (!isDropResponse) {
HttpEntity e = resp.returnResponse().getEntity();
response = new BufferedReader(new InputStreamReader(e.getContent()));
/*
StringBuilder sb = new StringBuilder();
BufferedReader in = new BufferedReader(new InputStreamReader(e.getContent()));
String s;
while ((s = in.readLine()) != null) {
sb.append(s);
}
log.trace("Response: \n {}", sb.toString());
if (sb.toString().contains("<soap:Fault>")) { return false;};
if (! sb.toString().contains(":Envelope ")) { return false;};
 
*/
} else {
log.debug("Response intentionally ignored.");
}
} catch (IOException e) {
log.error("Error sending ALF Event '{}'. Got IOException: {}", id, e.getMessage());
}
 
duration = System.currentTimeMillis() - start;
return response;
}
 
public void interrupt() {
this.isInterrupted.set(true);
}
}
Property changes:
Added: svn:mime-type
+text/plain
\ No newline at end of property