View Javadoc

1   /**
2    *
3    * Copyright 2004 The Apache Software Foundation
4    *
5    *  Licensed under the Apache License, Version 2.0 (the "License");
6    *  you may not use this file except in compliance with the License.
7    *  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   *  Unless required by applicable law or agreed to in writing, software
12   *  distributed under the License is distributed on an "AS IS" BASIS,
13   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   *  See the License for the specific language governing permissions and
15   *  limitations under the License.
16   */
17  
18  package org.apache.geronimo.timer.jdbc;
19  
20  import java.sql.Connection;
21  import java.sql.PreparedStatement;
22  import java.sql.ResultSet;
23  import java.sql.SQLException;
24  import java.sql.Types;
25  import java.util.ArrayList;
26  import java.util.Collection;
27  import java.util.Date;
28  import javax.sql.DataSource;
29  
30  import com.thoughtworks.xstream.XStream;
31  import org.apache.geronimo.timer.PersistenceException;
32  import org.apache.geronimo.timer.Playback;
33  import org.apache.geronimo.timer.WorkInfo;
34  import org.apache.geronimo.timer.WorkerPersistence;
35  
36  /**
37   * TODO use an insert returning or stored procedure to insert.
38   *
39   * @version $Rev: 355877 $ $Date: 2005-12-10 18:48:27 -0800 (Sat, 10 Dec 2005) $
40   */
41  public class JDBCWorkerPersistence implements WorkerPersistence {
42  
43      private static final String createSequenceSQL = "create sequence timertasks_seq";
44      private static final String createTableSQLWithSequence = "create table timertasks (id long primary key, serverid varchar(256) not null, timerkey varchar(256) not null, userid varchar(4096), userinfo varchar(4096), firsttime long not null, period long, atfixedrate boolean not null)";
45      private static final String createTableSQLWithIdentity = "create table timertasks (id INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), serverid varchar(256) not null, timerkey varchar(256) not null, userid varchar(4096), userinfo varchar(4096), firsttime NUMERIC(18,0) not null, period NUMERIC(18, 0), atfixedrate CHAR(1))";
46      private static final String sequenceSQL = "select timertasks_seq.nextval";
47      private static final String identitySQL = "values IDENTITY_VAL_LOCAL()";
48      private static final String insertSQLWithSequence = "insert into timertasks (id, serverid, timerkey, userid, userinfo, firsttime, period, atfixedrate) values (?, ?, ?, ?, ?, ?, ?, ?)";
49      private static final String insertSQLWithIdentity = "insert into timertasks (serverid, timerkey, userid, userinfo, firsttime, period, atfixedrate) values (?, ?, ?, ?, ?, ?, ?)";
50      private static final String deleteSQL = "delete from timertasks where id=?";
51      private static final String selectSQL = "select id, userid, userinfo, firsttime, period, atfixedrate from timertasks where serverid = ? and timerkey=?";
52      private static final String fixedRateUpdateSQL = "update timertasks set firsttime = firsttime + period where id = ?";
53      private static final String intervalUpdateSQL = "update timertasks set firsttime = ? where id = ?";
54      private static final String selectByKeySQL = "select id from timertasks where serverid = ? and timerkey = ? and (userid = ? or ? is null)";
55  
56      private final String serverUniqueId;
57      private final DataSource dataSource;
58      private boolean useSequence = false;
59  
60      protected JDBCWorkerPersistence(String serverUniqueId, DataSource datasource, boolean useSequence) throws SQLException {
61          this.serverUniqueId = serverUniqueId;
62          this.dataSource = datasource;
63          this.useSequence = useSequence;
64          if (this.useSequence) {
65              execSQL(createSequenceSQL);
66              execSQL(createTableSQLWithSequence);
67          } else {
68              execSQL(createTableSQLWithIdentity);
69          }
70      }
71  
72  
73      public void save(WorkInfo workInfo) throws PersistenceException {
74          try {
75              Connection c = dataSource.getConnection();
76              try {
77                  if (useSequence) {
78                      long id;
79                      PreparedStatement seqStatement = c.prepareStatement(sequenceSQL);
80                      try {
81                          ResultSet seqRS = seqStatement.executeQuery();
82                          try {
83                              seqRS.next();
84                              id = seqRS.getLong(1);
85                          } finally {
86                              seqRS.close();
87                          }
88                      } finally {
89                          seqStatement.close();
90                      }
91                      workInfo.setId(id);
92                      PreparedStatement insertStatement = c.prepareStatement(insertSQLWithSequence);
93                      try {
94                          String serializedUserId = serialize(workInfo.getUserId());
95                          String serializedUserKey = serialize(workInfo.getUserInfo());
96                          insertStatement.setLong(1, id);
97                          insertStatement.setString(2, serverUniqueId);
98                          insertStatement.setString(3, workInfo.getKey());
99                          insertStatement.setString(4, serializedUserId);
100                         insertStatement.setString(5, serializedUserKey);
101                         insertStatement.setLong(6, workInfo.getTime().getTime());
102                         if (workInfo.getPeriod() == null) {
103                             insertStatement.setNull(7, Types.NUMERIC);
104                         } else {
105                             insertStatement.setLong(7, workInfo.getPeriod().longValue());
106                         }
107                         insertStatement.setBoolean(8, workInfo.getAtFixedRate());
108                         int result = insertStatement.executeUpdate();
109                         if (result != 1) {
110                             throw new PersistenceException("Could not insert!");
111                         }
112                     } finally {
113                         insertStatement.close();
114                     }
115                 } else {
116                     PreparedStatement insertStatement = c.prepareStatement(insertSQLWithIdentity);
117                     try {
118                         String serializedUserId = serialize(workInfo.getUserId());
119                         String serializedUserKey = serialize(workInfo.getUserInfo());
120                         insertStatement.setString(1, serverUniqueId);
121                         insertStatement.setString(2, workInfo.getKey());
122                         insertStatement.setString(3, serializedUserId);
123                         insertStatement.setString(4, serializedUserKey);
124                         insertStatement.setLong(5, workInfo.getTime().getTime());
125                         if (workInfo.getPeriod() == null) {
126                             insertStatement.setNull(6, Types.NUMERIC);
127                         } else {
128                             insertStatement.setLong(6, workInfo.getPeriod().longValue());
129                         }
130                         insertStatement.setBoolean(7, workInfo.getAtFixedRate());
131                         int result = insertStatement.executeUpdate();
132                         if (result != 1) {
133                             throw new PersistenceException("Could not insert!");
134                         }
135                     } finally {
136                         insertStatement.close();
137                     }
138                     long id;
139                     PreparedStatement identityStatement = c.prepareStatement(identitySQL);
140                     try {
141                         ResultSet seqRS = identityStatement.executeQuery();
142                         try {
143                             seqRS.next();
144                             id = seqRS.getLong(1);
145                         } finally {
146                             seqRS.close();
147                         }
148                     } finally {
149                         identityStatement.close();
150                     }
151                     workInfo.setId(id);
152                 }
153             } finally {
154                 c.close();
155             }
156         } catch (SQLException e) {
157             throw new PersistenceException(e);
158         }
159     }
160 
161     public void cancel(long id) throws PersistenceException {
162         try {
163             Connection c = dataSource.getConnection();
164             try {
165                 PreparedStatement deleteStatement = c.prepareStatement(deleteSQL);
166                 try {
167                     deleteStatement.setLong(1, id);
168                     deleteStatement.execute();
169                 } finally {
170                     deleteStatement.close();
171                 }
172             } finally {
173                 c.close();
174             }
175         } catch (SQLException e) {
176             throw new PersistenceException(e);
177         }
178 
179     }
180 
181     public void playback(String key, Playback playback) throws PersistenceException {
182         try {
183             Connection c = dataSource.getConnection();
184             try {
185                 PreparedStatement selectStatement = c.prepareStatement(selectSQL);
186                 selectStatement.setString(1, serverUniqueId);
187                 selectStatement.setString(2, key);
188                 try {
189                     ResultSet taskRS = selectStatement.executeQuery();
190                     try {
191                         while (taskRS.next()) {
192                             long id = taskRS.getLong(1);
193                             String serizalizedUserId = taskRS.getString(2);
194                             Object userId = deserialize(serizalizedUserId);
195                             String serializedUserInfo = taskRS.getString(3);
196                             Object userInfo = deserialize(serializedUserInfo);
197                             long timeMillis = taskRS.getLong(4);
198                             Date time = new Date(timeMillis);
199                             Long period = null;
200                             period = new Long(taskRS.getLong(5));
201                             if (!taskRS.wasNull()) {
202                                 period = null;
203                             }
204                             boolean atFixedRate = taskRS.getBoolean(6);
205                             //TODO make sure the reference to this is ok, meaning we can't use a handle to this WorkerPersistence.
206                             WorkInfo workInfo = new WorkInfo(key, userId, userInfo, time, period, atFixedRate);
207                             workInfo.setId(id);
208                             playback.schedule(workInfo);
209                         }
210                     } finally {
211                         taskRS.close();
212                     }
213                 } finally {
214                     selectStatement.close();
215                 }
216             } finally {
217                 c.close();
218             }
219         } catch (SQLException e) {
220             throw new PersistenceException(e);
221         }
222     }
223 
224     public void fixedRateWorkPerformed(long id) throws PersistenceException {
225         try {
226             Connection c = dataSource.getConnection();
227             try {
228                 PreparedStatement updateStatement = c.prepareStatement(fixedRateUpdateSQL);
229                 try {
230                     updateStatement.setLong(1, id);
231                     updateStatement.execute();
232                 } finally {
233                     updateStatement.close();
234                 }
235             } finally {
236                 c.close();
237             }
238         } catch (SQLException e) {
239             throw new PersistenceException(e);
240         }
241     }
242 
243     public void intervalWorkPerformed(long id, long period) throws PersistenceException {
244         long next = System.currentTimeMillis() + period;
245         try {
246             Connection c = dataSource.getConnection();
247             try {
248                 PreparedStatement updateStatement = c.prepareStatement(intervalUpdateSQL);
249                 try {
250                     updateStatement.setLong(1, next);
251                     updateStatement.setLong(2, id);
252                     updateStatement.execute();
253                 } finally {
254                     updateStatement.close();
255                 }
256             } finally {
257                 c.close();
258             }
259         } catch (SQLException e) {
260             throw new PersistenceException(e);
261         }
262     }
263 
264     public Collection getIdsByKey(String key, Object userId) throws PersistenceException {
265         Collection ids = new ArrayList();
266         try {
267             Connection c = dataSource.getConnection();
268             try {
269                 PreparedStatement selectStatement = c.prepareStatement(selectByKeySQL);
270                 selectStatement.setString(1, serverUniqueId);
271                 selectStatement.setString(2, key);
272                 if (userId == null) {
273                     selectStatement.setNull(3, Types.VARCHAR);
274                     selectStatement.setNull(4, Types.VARCHAR);
275                 } else {
276                     String userIdString = serialize(userId);
277                     selectStatement.setString(3, userIdString);
278                     selectStatement.setString(4, userIdString);
279                 }
280                 try {
281                     ResultSet taskRS = selectStatement.executeQuery();
282                     try {
283                         while (taskRS.next()) {
284                             long id = taskRS.getLong(1);
285                             ids.add(new Long(id));
286                         }
287                     } finally {
288                         taskRS.close();
289                     }
290                 } finally {
291                     selectStatement.close();
292                 }
293             } finally {
294                 c.close();
295             }
296         } catch (SQLException e) {
297             throw new PersistenceException(e);
298         }
299         return ids;
300     }
301 
302     private String serialize(Object task) {
303         XStream xStream = new XStream();
304         return xStream.toXML(task);
305     }
306 
307     private Object deserialize(String serializedRunnable) {
308         XStream xStream = new XStream();
309         return xStream.fromXML(serializedRunnable);
310     }
311 
312     private void execSQL(String sql) throws SQLException {
313         Connection c = dataSource.getConnection();
314         try {
315             PreparedStatement updateStatement = c.prepareStatement(sql);
316             try {
317                 updateStatement.execute();
318             } catch (SQLException e) {
319                 //ignore... table already exists.
320             } finally {
321                 updateStatement.close();
322             }
323         } finally {
324             c.close();
325         }
326     }
327 
328 }