CONTEXT
I want to share a problem that we had in our project. We were building a real-time “Profit and Loss” (P&L) server. The server sends stock updates to all the users subscribed to those stocks, much like Google Finance or Yahoo Finance.
SIMPLE IMPLEMENTATION
I will use a basic approach (no aggregation and no optimization) to explain the problem that we had with serialization and Hessian.
The server always keeps the last value of each stock, because when a user asks for a quote we want to send back the
last value (from the cache) as soon as possible.
We kept the stock prices (BID and ASK) in a simple object: SymbolSerializable.
When an update is received for a stock, we update the values in its SymbolSerializable and send the updated values to all
clients subscribed to this stock.
...
symbol.setBid(update.getBid());
symbol.setAsk(update.getAsk());
...
for(Client client : clientList){
client.sendUpdate(symbol);
}
....
It can’t get simpler than that. Each client should receive every update on its stocks.
The problem that we had was surprising: the clients were always receiving the same price for a stock!
WHY?
Here is an example:
stock : ABC
Updates :
1 : bid=10.25$, ask=10.50$
2 : bid=11.50$, ask=11.75$
3 : bid=12.00$, ask=12.15$
and the clients received :
1 : bid=10.25$, ask=10.50$
2 : bid=10.25$, ask=10.50$
3 : bid=10.25$, ask=10.50$
To debug our server we printed the values sent to the client, and we saw in the server’s logs that the values were correct.
value sent to client :
bid=10.25$, ask=10.50$
value sent to client :
bid=11.50$, ask=11.75$
value sent to client :
bid=12.00$, ask=12.15$
Everything looks fine, so why doesn’t it work on the client side?
INVESTIGATION/SOLUTION
The only thing that could cause the problem was our serialization on the server.
I’ll show you four different test cases. See the implementations below.
- #1 : Reference implementation using Serializable (failed)
- #2 : Changed Serializable for Externalizable interface (failed)
- #3 : Serializable using new (passed)
- #4 : Serializable using reset (passed)
Here is the Serializable POJO:
package ca.sebastiendionne.demo.model;
import java.io.Serializable;
public class SymbolSerializable implements Serializable {
private static final long serialVersionUID = 7853892880704628717L;
Double bid = null;
Double ask = null;
public SymbolSerializable(){
}
public Double getBid() {
return bid;
}
public void setBid(Double bid) {
this.bid = bid;
}
public Double getAsk() {
return ask;
}
public void setAsk(Double ask) {
this.ask = ask;
}
@Override
public String toString() {
return "bid = " + bid + " ask=" + ask;
}
}
Here is the Externalizable POJO:
package ca.sebastiendionne.demo.model;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
public class SymbolExternalizable implements Externalizable {
private static final long serialVersionUID = 1853892880704628717L;
Double bid = null;
Double ask = null;
public SymbolExternalizable(){
}
public Double getBid() {
return bid;
}
public void setBid(Double bid) {
this.bid = bid;
}
public Double getAsk() {
return ask;
}
public void setAsk(Double ask) {
this.ask = ask;
}
@Override
public String toString() {
return "bid = " + bid + " ask=" + ask;
}
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
bid = (Double)in.readObject();
ask = (Double)in.readObject();
}
@Override
public void writeExternal(ObjectOutput out) throws IOException {
out.writeObject(bid);
out.writeObject(ask);
}
}
For the test cases I use a for loop that updates a Symbol and serializes it to a file on the hard drive, then I deserialize the objects back from that file.
Here is the code for test case #1 (used as reference):
package ca.sebastiendionne.demo;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import ca.sebastiendionne.demo.model.SymbolSerializable;
public class SerializationFailed {
/**
* @param args
*/
public static void main(String[] args) throws Exception {
ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(new File("test.ser")));
SymbolSerializable symbol = new SymbolSerializable();
// dummy values
symbol.setAsk(new Double(-1));
symbol.setBid(new Double(-1));
for(int i=0;i<10;i++){
// update values
symbol.setAsk(new Double(i));
symbol.setBid(new Double(i));
oos.writeObject(symbol);
oos.flush();
}
oos.flush();
oos.close();
// read
ObjectInputStream ois = new ObjectInputStream(new FileInputStream(new File("test.ser")));
try {
Object obj = null;
while((obj = ois.readObject())!=null){
System.out.println((SymbolSerializable)obj);
}
} catch (EOFException e) {
System.out.println("All items read");
} catch(Exception e){
e.printStackTrace();
} finally {
ois.close();
}
}
}
Here is the code for test case #2:
package ca.sebastiendionne.demo;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import ca.sebastiendionne.demo.model.SymbolExternalizable;
public class SerializationFailed2 {
/**
* @param args
*/
public static void main(String[] args) throws Exception {
ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(new File("test.ser")));
SymbolExternalizable symbol = new SymbolExternalizable();
// dummy values
symbol.setAsk(new Double(-1));
symbol.setBid(new Double(-1));
for(int i=0;i<10;i++){
// update values
symbol.setAsk(new Double(i));
symbol.setBid(new Double(i));
oos.writeObject(symbol);
oos.flush();
}
oos.flush();
oos.close();
// read
ObjectInputStream ois = new ObjectInputStream(new FileInputStream(new File("test.ser")));
try {
Object obj = null;
while((obj = ois.readObject())!=null){
System.out.println((SymbolExternalizable)obj);
}
} catch (EOFException e) {
System.out.println("All items read");
} catch(Exception e){
e.printStackTrace();
} finally {
ois.close();
}
}
}
Here is the code for test case #3:
package ca.sebastiendionne.demo;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import ca.sebastiendionne.demo.model.SymbolSerializable;
public class SerializationWithNew {
/**
* @param args
*/
public static void main(String[] args) throws Exception {
ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(new File("test.ser")));
SymbolSerializable symbol = new SymbolSerializable();
// dummy values
symbol.setAsk(new Double(-1));
symbol.setBid(new Double(-1));
for(int i=0;i<10;i++){
// update values
symbol.setAsk(new Double(i));
symbol.setBid(new Double(i));
SymbolSerializable symbol2 = new SymbolSerializable();
symbol2.setAsk(symbol.getAsk());
symbol2.setBid(symbol.getBid());
oos.writeObject(symbol2);
oos.flush();
}
oos.flush();
oos.close();
// read
ObjectInputStream ois = new ObjectInputStream(new FileInputStream(new File("test.ser")));
try {
Object obj = null;
while((obj = ois.readObject())!=null){
System.out.println((SymbolSerializable)obj);
}
} catch (EOFException e) {
System.out.println("All items read");
} catch(Exception e){
e.printStackTrace();
} finally {
ois.close();
}
}
}
Here is the code for test case #4:
package ca.sebastiendionne.demo;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import ca.sebastiendionne.demo.model.SymbolSerializable;
public class SerializationWithReset {
/**
* @param args
*/
public static void main(String[] args) throws Exception {
ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(new File("test.ser")));
SymbolSerializable symbol = new SymbolSerializable();
// dummy values
symbol.setAsk(new Double(-1));
symbol.setBid(new Double(-1));
for(int i=0;i<10;i++){
// update values
symbol.setAsk(new Double(i));
symbol.setBid(new Double(i));
oos.writeObject(symbol);
oos.flush();
oos.reset(); // magic line: the stream forgets the objects already written, so the next write sends the full state again
}
oos.flush();
oos.close();
// read
ObjectInputStream ois = new ObjectInputStream(new FileInputStream(new File("test.ser")));
try {
Object obj = null;
while((obj = ois.readObject())!=null){
System.out.println((SymbolSerializable)obj);
}
} catch (EOFException e) {
System.out.println("All items read");
} catch(Exception e){
e.printStackTrace();
} finally {
ois.close();
}
}
}
I can’t say what the performance impact is if we use reset() on each update sent. Is it better to use new or reset?
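One part of that trade-off that is easy to measure is the size of the stream. The sketch below is not taken from the real server: `Quote` is a hypothetical stand-in for SymbolSerializable with primitive fields, and everything is written to an in-memory buffer. It compares the bytes produced by approach #3 (a new object per write) with approach #4 (reset() after each write). Since reset() also discards the class descriptors already written, the reset variant should come out larger; it says nothing about CPU time or memory.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class StreamSizeComparison {

    // Hypothetical stand-in for SymbolSerializable, using primitive fields.
    static class Quote implements Serializable {
        private static final long serialVersionUID = 1L;
        double bid;
        double ask;
    }

    // Approach #3: copy the state into a fresh object before every write.
    static int bytesWithNew(int updates) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            for (int i = 0; i < updates; i++) {
                Quote copy = new Quote();
                copy.bid = i;
                copy.ask = i;
                out.writeObject(copy);
            }
        }
        return bytes.size();
    }

    // Approach #4: reuse one object and reset the stream after every write.
    static int bytesWithReset(int updates) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            Quote quote = new Quote();
            for (int i = 0; i < updates; i++) {
                quote.bid = i;
                quote.ask = i;
                out.writeObject(quote);
                out.reset(); // also forgets the class descriptor
            }
        }
        return bytes.size();
    }

    public static void main(String[] args) throws IOException {
        System.out.println("new:   " + bytesWithNew(10) + " bytes");
        System.out.println("reset: " + bytesWithReset(10) + " bytes");
    }
}
```

Stream size is only one factor; allocation cost and the memory held by the stream would need their own measurements in the real application.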
You can follow me on Twitter : http://twitter.com/survivant
What happens if you call ObjectOutputStream.writeUnshared(Object obj) instead of writeObject? I think that you sent the object (1), changed its state, and sent the object again (2). Since it was the same object, serialization at (2) just saved a reference to what it wrote at (1).
See http://download.oracle.com/javase/1.4.2/docs/api/java/io/ObjectOutputStream.html#writeUnshared(java.lang.Object)
It looks like using
oos.writeUnshared(symbol);
works fine in my example. Too bad that I can’t test it with the real application; I have left that job.
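For readers who want to try the writeUnshared variant in isolation, here is a minimal sketch. It assumes a hypothetical `Quote` class with primitive fields in place of SymbolSerializable and an in-memory buffer in place of test.ser; it is not the code of the real application.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class WriteUnsharedDemo {

    // Hypothetical stand-in for SymbolSerializable, using primitive fields.
    static class Quote implements Serializable {
        private static final long serialVersionUID = 1L;
        double bid;
        double ask;
    }

    // Writes the same mutable object several times with writeUnshared
    // and returns the bid values read back from the stream.
    static List<Double> roundTripBids(int updates) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            Quote quote = new Quote();
            for (int i = 0; i < updates; i++) {
                quote.bid = i;
                quote.ask = i + 0.25;
                out.writeUnshared(quote); // always written as a new object
            }
        }
        List<Double> bids = new ArrayList<>();
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            for (int i = 0; i < updates; i++) {
                bids.add(((Quote) in.readObject()).bid);
            }
        }
        return bids;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTripBids(3)); // prints [0.0, 1.0, 2.0]
    }
}
```

One caveat from the Javadoc: writeUnshared only applies to the top-level object. Objects it references are still written as shared, so a mutable object held in a field could show the same stale-state problem.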
We read the Javadoc and googled our problem, but didn’t find an answer, so we experimented.
I hope this will help other people.
The Java serialization mechanism ensures that if we send two references (A and B) and A == B on the sender side, then A == B on the receiver side as well. This is done automatically by the ObjectOutputStream, which remembers every object it has sent. This has several advantages, but it also introduces a few problems.
One is the one you found: if a mutable object is sent more than once, the receiver only ever sees its first state.
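The behaviour described above can be reproduced in a few lines. This is a minimal sketch, assuming a hypothetical one-field `Quote` class and an in-memory buffer rather than a socket or a file:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class BackReferenceDemo {

    // Hypothetical stand-in for SymbolSerializable, reduced to one field.
    static class Quote implements Serializable {
        private static final long serialVersionUID = 1L;
        double bid;
    }

    // Writes the same mutable object twice, changing it in between,
    // and returns the two objects read back from the stream.
    static Quote[] writeTwiceAndRead() throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            Quote quote = new Quote();
            quote.bid = 10.25;
            out.writeObject(quote); // full object state is written
            quote.bid = 11.50;
            out.writeObject(quote); // only a back-reference is written
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            Quote first = (Quote) in.readObject();
            Quote second = (Quote) in.readObject();
            return new Quote[] { first, second };
        }
    }

    public static void main(String[] args) throws Exception {
        Quote[] read = writeTwiceAndRead();
        System.out.println("same instance: " + (read[0] == read[1])); // true
        System.out.println("second bid: " + read[1].bid);             // 10.25
    }
}
```

Both reads return the same instance on the receiving side, which is why the update to 11.50 never arrives.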
Another drawback is memory consumption. The ObjectOutputStream keeps strong references to all the objects you send (a regular table, not something like a WeakHashMap), so it prevents the garbage collector from freeing them. If you keep sending new objects (e.g. out.writeObject(new SymbolSerializable());) for a very long time, as a server application does, you will soon get an out-of-memory error.
For this reason, I usually group related writes in “transactions”, and I perform a reset once a transaction is completed.
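A minimal sketch of that “reset per transaction” idea follows. The `Quote` class, the `writeTransaction` helper and the in-memory buffer are all hypothetical names chosen for illustration; where a transaction begins and ends depends on the application.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ResetPerTransactionDemo {

    // Hypothetical quote object; not the SymbolSerializable from the post.
    static class Quote implements Serializable {
        private static final long serialVersionUID = 1L;
        final String symbol;
        double bid;

        Quote(String symbol, double bid) {
            this.symbol = symbol;
            this.bid = bid;
        }
    }

    // Writes all related objects, then flushes and resets exactly once,
    // so the stream stops holding references to them.
    static void writeTransaction(ObjectOutputStream out, List<Quote> quotes) throws IOException {
        for (Quote quote : quotes) {
            out.writeObject(quote);
        }
        out.flush();
        out.reset();
    }

    // Sends two transactions that reuse the same mutable objects
    // and returns what the receiving side reads.
    static List<String> roundTrip() throws IOException, ClassNotFoundException {
        Quote abc = new Quote("ABC", 10.25);
        Quote xyz = new Quote("XYZ", 20.00);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            writeTransaction(out, Arrays.asList(abc, xyz));
            abc.bid = 11.50;
            xyz.bid = 21.00;
            writeTransaction(out, Arrays.asList(abc, xyz));
        }
        List<String> received = new ArrayList<>();
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            for (int i = 0; i < 4; i++) {
                Quote quote = (Quote) in.readObject();
                received.add(quote.symbol + "=" + quote.bid);
            }
        }
        return received;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip());
    }
}
```

Within one transaction, repeated references to the same object are still written only once; across transactions the updated state is sent in full.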