/****************************************************************************** * Product: Adempiere ERP & CRM Smart Business Solution * * Copyright (C) 1999-2006 ComPiere, Inc. All Rights Reserved. * * This program is free software; you can redistribute it and/or modify it * * under the terms version 2 of the GNU General Public License as published * * by the Free Software Foundation. This program is distributed in the hope * * that it will be useful, but WITHOUT ANY WARRANTY; without even the implied * * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. * * See the GNU General Public License for more details. * * You should have received a copy of the GNU General Public License along * * with this program; if not, write to the Free Software Foundation, Inc., * * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. * * For the text or an alternative of this public license, you may reach us * * ComPiere, Inc., 2620 Augustine Dr. #245, Santa Clara, CA 95054, USA * * or via info@compiere.org or http://www.compiere.org/license.html * *****************************************************************************/ package org.compiere.util; import java.io.PrintWriter; import java.io.StringWriter; import java.sql.Connection; import java.sql.SQLException; import java.sql.Savepoint; import java.util.Collection; import java.util.Date; import java.util.Map; import java.util.Optional; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import org.adempiere.exceptions.AdempiereException; import org.adempiere.exceptions.DBException; import org.compiere.Adempiere; import org.compiere.db.StatementProxy; import org.compiere.model.MSysConfig; import org.compiere.model.PO; /** *
 *	Transaction Management.
 *	- Create new Transaction by Trx.get(name);
 *	- ..transactions..
 *	- commit();
 *	----	start();
 *	----	commit();
 *	- close();
 *	
* @author Jorg Janke * @author Low Heng Sin *
  • added rollback(boolean) and commit(boolean) [20070105] *
  • remove unnecessary use of savepoint *
  • use UUID for safer transaction name generation * @author Teo Sarca, http://www.arhipac.ro *
  • FR [ 2080217 ] Implement TrxRunnable *
  • BF [ 2876927 ] Oracle JDBC driver problem * https://sourceforge.net/p/adempiere/bugs/2173/ * @author Teo Sarca, teo.sarca@gmail.com *
  • BF [ 2849122 ] PO.AfterSave is not rollback on error - add releaseSavepoint method * https://sourceforge.net/p/adempiere/bugs/2073/ */ public class Trx { /** * Get Transaction * @param trxName trx name * @param createNew if false, null is returned if not found * @return Transaction or null */ public static Trx get (String trxName, boolean createNew) { if (trxName == null || trxName.length() == 0) throw new IllegalArgumentException ("No Transaction Name"); Trx retValue = (Trx)s_cache.get(trxName); if (retValue == null && createNew) { retValue = new Trx (trxName); s_cache.put(trxName, retValue); } return retValue; } // get /** * Get Transaction in a Connection * @param trxName trx name * @param createNew if false, null is returned if not found * @param con Connection * @return Transaction or null */ @Deprecated public static Trx get (String trxName, boolean createNew, Connection con) { if (trxName == null || trxName.length() == 0) throw new IllegalArgumentException ("No Transaction Name"); Trx retValue = (Trx)s_cache.get(trxName); if (retValue == null && createNew) { retValue = new Trx (trxName, con); s_cache.put(trxName, retValue); } return retValue; } // get /** Transaction Cache */ private static final Map s_cache = new ConcurrentHashMap(); /** Transaction timeout monitor */ private static final Trx.TrxMonitor s_monitor = new Trx.TrxMonitor(); /** Transaction event listeners */ private ConcurrentLinkedQueue listeners = new ConcurrentLinkedQueue(); protected Exception trace; private String m_displayName; private boolean m_changesMadeByEventListener = false; /** * Start transaction timeout monitor (run every 5 minutes) */ public static void startTrxMonitor() { Adempiere.getThreadPoolExecutor().scheduleWithFixedDelay(s_monitor, 5, 5, TimeUnit.MINUTES); } /** * Create unique Transaction Name * @param prefix optional prefix * @return unique transaction name */ public static String createTrxName (String prefix) { String displayName = null; if (prefix == null || prefix.length() == 0) { prefix = "Trx"; if (MSysConfig.getBooleanValue(MSysConfig.TRX_AUTOSET_DISPLAY_NAME, false)) { StackWalker walker = StackWalker.getInstance(StackWalker.Option.RETAIN_CLASS_REFERENCE); Optional stackName = walker.walk(frames -> frames.map( stackFrame -> stackFrame.getClassName() + "." + stackFrame.getMethodName() + ":" + stackFrame.getLineNumber()) .filter(f -> ! f.startsWith(Trx.class.getName() + ".")) .findFirst()); displayName = (stackName.orElse(null)); } } prefix += "_" + UUID.randomUUID(); //System.currentTimeMillis(); //create transaction entry Trx trx = Trx.get(prefix, true); if (displayName != null) trx.setDisplayName(displayName); return prefix; } // createTrxName /** * Create unique Transaction Name * @return unique name */ public static String createTrxName () { return createTrxName(null); } // createTrxName /** * Transaction Constructor * @param trxName unique name */ private Trx (String trxName) { setTrxName (trxName); if (trxName.length() < 36) { String msg = "Illegal transaction name format, not prefix+UUID or UUID: " + trxName; log.log(Level.SEVERE, msg, new Exception(msg)); } } // Trx /** * Transaction Constructor * @param trxName unique name * @param con optional connection ( ignore for remote transaction ) */ @Deprecated private Trx (String trxName, Connection con) { this(trxName); setConnection (con); } // Trx /** Logger */ private CLogger log = CLogger.getCLogger(getClass()); private Connection m_connection = null; private String m_trxName = null; private boolean m_active = false; private long m_startTime; /** transaction timeout, in seconds **/ private int m_timeout = 60 * 120; //120 minutes /** * Get connection * @return connection */ public Connection getConnection() { return getConnection(true); } /** * Get Connection * @param createNew if true, create new connection if the trx does not have one created yet * @return connection */ public synchronized Connection getConnection(boolean createNew) { if (log.isLoggable(Level.ALL))log.log(Level.ALL, "Active=" + isActive() + ", Connection=" + m_connection); if (m_connection == null) // get new Connection { if (createNew) { if (!s_cache.containsKey(m_trxName)) { new Exception("Illegal to getConnection for Trx that is not register.").printStackTrace(); return null; } setConnection(DB.createConnection(false, Connection.TRANSACTION_READ_COMMITTED)); } else return null; } if (!isActive()) start(); if (MSysConfig.getBooleanValue(MSysConfig.TRACE_ALL_TRX_CONNECTION_GET, false)) trace = new Exception(); return m_connection; } // getConnection /** * Set Connection * @param conn connection */ private void setConnection (Connection conn) { if (conn == null) return; m_connection = conn; if (log.isLoggable(Level.FINEST)) log.finest("Connection=" + conn); try { m_connection.setAutoCommit(false); } catch (SQLException e) { log.log(Level.SEVERE, "connection", e); } trace = new Exception(); } // setConnection /** * Set Trx Name * @param trxName transaction name */ private void setTrxName (String trxName) { if (trxName == null || trxName.length() == 0) throw new IllegalArgumentException ("No Transaction Name"); m_trxName = trxName; } // setName /** * Get Name * @return transaction name */ public String getTrxName() { return m_trxName; } // getName /** * Start Trx * @return true if trx started */ public boolean start() { if (m_active) { log.warning("Trx in progress " + m_trxName); return false; } m_active = true; m_startTime = System.currentTimeMillis(); m_changesMadeByEventListener = false; return true; } // startTrx /** * @return The start time of this transaction */ public Date getStartTime() { return new Date(m_startTime); } /** * Transaction is Active * @return true if transaction is active */ public boolean isActive() { return m_active; } // isActive /** * Rollback * @param throwException if true, re-throws exception * @return true if success, false if failed or transaction already rollback */ public synchronized boolean rollback(boolean throwException) throws SQLException { //local try { if (m_connection != null) { m_connection.rollback(); log.log(isLocalTrx(m_trxName) ? Level.FINE : Level.INFO, "**** " + m_trxName); m_active = false; fireAfterRollbackEvent(true); return true; } } catch (SQLException e) { log.log(Level.SEVERE, m_trxName, e); if (throwException) { m_active = false; fireAfterRollbackEvent(false); throw e; } } m_active = false; fireAfterRollbackEvent(false); return false; } // rollback /** * Fire after rollback event * @param success */ private void fireAfterRollbackEvent(boolean success) { for(TrxEventListener l : listeners) { l.afterRollback(this, success); } } /** * Rollback * @return true if success, false if failed or transaction already rollback */ public boolean rollback() { try { return rollback(false); } catch (SQLException e) { return false; } } /** * Rollback to save point * @param savepoint * @return true if success, false if failed or transaction already rollback * @throws SQLException */ public boolean rollback(Savepoint savepoint) throws SQLException { //local try { if (m_connection != null) { m_connection.rollback(savepoint); if (log.isLoggable(Level.INFO)) log.info ("**** " + m_trxName); m_changesMadeByEventListener = false; return true; } } catch (SQLException e) { log.log(Level.SEVERE, m_trxName, e); throw e; } return false; } // rollback /** * Commit * @param throwException if true, re-throws exception * @return true if success **/ public synchronized boolean commit(boolean throwException) throws SQLException { //local try { if (m_connection != null) { m_connection.commit(); if (log.isLoggable(Level.INFO)) log.info ("**** " + m_trxName); m_active = false; fireAfterCommitEvent(true); return true; } } catch (SQLException e) { log.log(Level.SEVERE, m_trxName, e); if (throwException) { m_active = false; fireAfterCommitEvent(false); throw e; } else { String msg = DBException.getDefaultDBExceptionMessage(e); log.saveError(msg != null ? msg : e.getLocalizedMessage(), e); } } m_active = false; fireAfterCommitEvent(false); return false; } // commit /** * Fire after commit event * @param success */ private void fireAfterCommitEvent(boolean success) { for(TrxEventListener l : listeners) { l.afterCommit(this, success); } } /** * Commit * @return true if success */ public boolean commit() { try { return commit(false); } catch(SQLException e) { return false; } } /** * Rollback and close transaction.
    * This is means to be called by the timeout monitor and developer usually shouldn't call this directly. * @return true if success */ public boolean rollbackAndCloseOnTimeout() { boolean success = false; try { rollback(true); } catch (SQLException e) { log.log(Level.SEVERE, m_trxName, e); } finally { success = close(); } return success; } /** * End Transaction and Close Connection * @return true if success */ public synchronized boolean close() { s_cache.remove(getTrxName()); //local if (m_connection == null) return true; try { if (isActive() && !m_connection.isReadOnly()) commit(); } catch (SQLException e) { } // Close Connection try { m_connection.setAutoCommit(true); } catch (SQLException e) { } finally { //ensure connection return to pool with readonly=false try { if (m_connection.isReadOnly()) { m_connection.setReadOnly(false); } } catch (SQLException e) { log.log(Level.SEVERE, m_trxName, e); } try { m_connection.close(); } catch (SQLException e) { log.log(Level.SEVERE, m_trxName, e); } } m_connection = null; trace = null; m_active = false; fireAfterCloseEvent(); if (log.isLoggable(Level.CONFIG)) log.config(m_trxName); return true; } // close /** * Fire after close event */ private void fireAfterCloseEvent() { for(TrxEventListener l : listeners) { l.afterClose(this); } } /** * Set transaction save point * @param name optional savepoint name * @return Savepoint * @throws SQLException */ public synchronized Savepoint setSavepoint(String name) throws SQLException { if (m_connection == null) getConnection(); if(m_connection != null) { if (name != null) return m_connection.setSavepoint(name); else return m_connection.setSavepoint(); } else { return null; } } private Savepoint m_lastWFSavepoint = null; /** * Set last workflow save point.
    * For workflow engine use, developer usually shouldn't call this method directly. * @param savepoint */ public synchronized void setLastWFSavepoint(Savepoint savepoint) { m_lastWFSavepoint = savepoint; } /** * Get last set workflow save point. * For workflow engine use, developer usually shouldn't call this method directly. * @return last set workflow save point or null */ public synchronized Savepoint getLastWFSavepoint() { return m_lastWFSavepoint; } /** * Release Savepoint * @param savepoint * @throws SQLException * @see {@link Connection#releaseSavepoint(Savepoint)} */ public synchronized void releaseSavepoint(Savepoint savepoint) throws SQLException { if (DB.isOracle()) { // Note: As of Oracle Database 10g, releaseSavepoint and // oracleReleaseSavepoint are not supported. If you call either // of the methods, then SQLException is thrown with the message // "Unsupported feature". // -- 4-4 Oracle Database JDBC Developer's Guide and Reference return; } if (m_connection == null) { getConnection(); } if(m_connection != null) { m_connection.releaseSavepoint(savepoint); } } /** * String Representation * @return info */ @Override public String toString() { StringBuilder sb = new StringBuilder("Trx["); sb.append(getDisplayName()) .append(",Active=").append(isActive()) .append("]"); return sb.toString(); } // toString /** * Get register transactions * @return array of register transactions */ public static Trx[] getOpenTransactions() { Collection collections = s_cache.values(); Trx[] trxs = new Trx[collections.size()]; collections.toArray(trxs); return trxs; } /** * @return Trx[] * @deprecated - wrong method name fixed with IDEMPIERE-5355 - please use getOpenTransactions */ public static Trx[] getActiveTransactions() { return getOpenTransactions(); } /** * @see #run(String, TrxRunnable) */ public static void run(TrxRunnable r) { run(null, r); } /** * Execute runnable object using provided transaction.
    * If execution fails, database operations will be rolled back. *

    * Example:

    	 * Trx.run(null, new {@link TrxRunnable}() {
    	 *     public void run(String trxName) {
    	 *         // do something using trxName
    	 *     }
    	 * )};
    	 * 
    * * @param trxName transaction name (if null, a new transaction will be created) * @param r runnable object * @throws RuntimeException or {@link AdempiereException} */ public static void run(String trxName, TrxRunnable r) { boolean localTrx = false; if (trxName == null) { trxName = Trx.createTrxName("TrxRun"); localTrx = true; } Trx trx = Trx.get(trxName, true); Savepoint savepoint = null; try { if (!localTrx) savepoint = trx.setSavepoint(null); r.run(trxName); if (localTrx) trx.commit(true); } catch (Throwable e) { // Rollback transaction if (localTrx) { trx.rollback(); } else if (savepoint != null) { try { trx.rollback(savepoint); } catch (SQLException e2) {;} } // Throw exception if (e instanceof RuntimeException) { throw (RuntimeException)e; } else { throw new AdempiereException(e); } } finally { if (localTrx && trx != null) { trx.close(); trx = null; } } } /** * Get transaction timout value * @return trx timoue value in second */ public int getTimeout() { return m_timeout; } /** * set transaction timeout value ( in seconds ) * @param timeout */ public void setTimeout(int timeout) { m_timeout = timeout; } /** * Add transaction event listener * @param listener */ public void addTrxEventListener(TrxEventListener listener) { listeners.add(listener); } /** * Remove transaction event listener * @param listener * @return true if listener is found and remove */ public boolean removeTrxEventListener(TrxEventListener listener) { return listeners.remove(listener); } /** * Get stack trace save * @return stack trace save or empty string */ public String getStrackTrace() { if (trace != null) { StringWriter stringWriter = new StringWriter(); PrintWriter printWriter = new PrintWriter(stringWriter); trace.printStackTrace(printWriter); return stringWriter.getBuffer().toString(); } else { return ""; } } /** * Get transaction display name. Fall back to transaction name if display name is not set. * @return display name or name */ public String getDisplayName() { return m_displayName != null ? m_displayName : m_trxName; } /** * Set transaction display name * @param displayName */ public void setDisplayName(String displayName) { m_displayName = displayName; } /** * Indicate additional DB changes have been made by a transaction event listener * @param changesMade */ public void setChangesMadeByEventListener(boolean changesMade) { m_changesMadeByEventListener = changesMade; } /** * Is there additional changes make by transaction event listener * @return true if event listener(s) has flag that additional db changes have been made */ public boolean hasChangesMadeByEventListener() { return m_changesMadeByEventListener; } /** Transaction timeout monitor class */ public static class TrxMonitor implements Runnable { public void run() { if (!Trx.s_cache.isEmpty()) { Trx[] trxs = Trx.s_cache.values().toArray(new Trx[0]); for(int i = 0; i < trxs.length; i++) { if (trxs[i].m_startTime <= 0) continue; long since = System.currentTimeMillis() - trxs[i].m_startTime; if (since > trxs[i].getTimeout() * 1000) { trxs[i].log.log(Level.WARNING, "Transaction timeout. Name="+trxs[i].getTrxName() + ", timeout(sec)="+(since / 1000)); if (trxs[i].trace != null) { trxs[i].log.log(Level.WARNING, "Transaction timeout. Trace:\n" + trxs[i].getStrackTrace()); } trxs[i].rollbackAndCloseOnTimeout(); } } } } } /** * @param trxName * @return true if trxName is a local transaction */ private boolean isLocalTrx(String trxName) { return trxName == null || trxName.startsWith(PO.LOCAL_TRX_PREFIX) // TODO: hardcoded ; } @Override protected void finalize() throws Throwable { if (m_connection != null && trace != null) { final Trx me = this; Adempiere.getThreadPoolExecutor().schedule(new Runnable() { @Override public void run() { if (me.m_connection != null && me.trace != null) { log.log(Level.WARNING, "Trx Not Close: " + me.getStrackTrace()); me.trace = null; me.close(); } } }, 2, TimeUnit.SECONDS); } } /** * Register a null trx * @return */ public static String registerNullTrx() { String nullTrxName = "NullTrx_" + UUID.randomUUID().toString(); Trx nullTrx = new Trx(nullTrxName); nullTrx.trace = new Exception(); nullTrx.m_startTime = System.currentTimeMillis(); String displayName = null; StackWalker walker = StackWalker.getInstance(StackWalker.Option.RETAIN_CLASS_REFERENCE); Optional stackName = walker.walk(frames -> frames.map( stackFrame -> stackFrame.getClassName() + "." + stackFrame.getMethodName() + ":" + stackFrame.getLineNumber()) .filter(f -> ! (f.startsWith(Trx.class.getName() + ".") || f.startsWith(StatementProxy.class.getName() + ".") || f.startsWith("jdk.proxy") || f.startsWith("org.compiere.util.DB."))) .findFirst()); displayName = (stackName.orElse(null)); if (displayName != null) nullTrx.setDisplayName(displayName); s_cache.put(nullTrxName, nullTrx); return nullTrxName; } /** * Unregister a null trx * @param nullTrxName */ public static void unregisterNullTrx(String nullTrxName) { Trx nullTrx = s_cache.get(nullTrxName); nullTrx.setDisplayName(null); nullTrx.trace = null; s_cache.remove(nullTrxName); } } // Trx