Download Code: coherence-identity-0.1.zip
One of the things that often gets Coherence newbies is the fact that there is no built-in facility that can be used to generate object identifiers, something similar to Oracle sequences or SQL Server identity columns. Unlike databases, which store data as tuples within the database tables, Coherence stores objects as cache entries consisting of a key (identity) and associated value.
That means that in order to insert an object into a Coherence cache, you need to provide its identity as well, to serve as the key.
If the object you need to insert into the cache has a natural key, your job is easy. For example, if you need to cache Country instances, you can simply use their ISO codes as cache keys. Unfortunately, most objects don't have natural keys, or have a composite one that is too complex, so you need to come up with a way to generate synthetic keys up front.
One option is to use UUID as an identifier, and Coherence provides excellent UUID implementations in Java, .NET and C++, so if that alternative works for you, problem solved.
However, many people dislike using UUIDs as synthetic identifiers for their objects for several reasons:
- They are big. Coherence UUID implementation is 256 bits, which means that each key will be 32 bytes, compared to 4 bytes for an integer or 8 for a long (actually, likely even less than that, as both integers and longs are packed when serialized using POF).
- They are cumbersome to use. While for the most part you don't care about the type of identifier, UUIDs can get unwieldy when they leak to the surface, and more often they not they eventually will. For example, UUID within the URL doesn't look very friendly.
If you are in that group, and are looking for an easy way to assign unique integer-based identifiers to your objects, keep reading.
Sequence Generator Design
Conceptually, generating sequential numbers is quite simple -- all you need to do is to associate sequence name with the last assigned number, and provide a way to increment it. However, there are several issues to consider, especially in a highly concurrent and distributed system such as Coherence:
- Accessibility -- it should be easy for a client application to obtain the next sequence number, which means that the sequences should be stored in a central location accessible to all cluster nodes, as well as remote clients.
- Concurrency -- in order to ensure that no duplicate numbers are issued, you need to synchronize access to a sequence. If many clients try to obtain the next number from a sequence at the same time, this can lead to a bottleneck.
- Performance -- obtaining the next number should be fast. Ideally, there should be no network or database calls involved, especially in a highly concurrent environment.
- Reliability -- you must ensure that sequence numbers are persisted reliably as soon as they are incremented, or you might end up issuing duplicate numbers after system failure.
Coherence itself is an obvious central location, so we will create a sequences cache to store our sequences. The cache will be keyed by sequence name, and the values will be instances of a very simple Sequence class:
public class Sequence implements Serializable, PortableObject {
private long last;
...
}
Basically, the only state our sequence has is a long field that is used to store the last assigned number.
In order to improve performance and reduce contention on individual sequences, we will allow clients to request a block of numbers at once. The size of the block should be configurable on a case by case basis -- for some high-contention sequences it might make sense to allocate few hundred numbers in a single call, while for others you might want to allocate individual numbers by setting block size to 1.
Complete implementation of the SequenceBlock class looks like this:
public class SequenceBlock
implements Serializable, PortableObject
{
// ---- data members ----------------------------------------------------
/**
* The next assignable number within this sequence block.
*/
private AtomicLong m_next;
/**
* The last assignable number within this sequence block.
*/
private long m_last;
// ---- constructors ----------------------------------------------------
/**
* For internal use only.
*/
public SequenceBlock()
{
// deserialization constructor
}
/**
* Construct a new sequence block.
*
* @param first first number in a sequence
* @param last last number in a sequence
*/
public SequenceBlock(long first, long last)
{
m_next = new AtomicLong(first);
m_last = last;
}
// ---- public methods --------------------------------------------------
/**
* Return <tt>true</tt> if there are avialable numbers in this
* sequence block, <tt>false</tt> otherwise.
*
* @return <tt>true</tt> if there are avialable numbers in this
* sequence block, <tt>false</tt> otherwise
*/
public boolean hasNext()
{
return m_next.longValue() <= m_last;
}
/**
* Return the next available number in this sequence block.
*
* @return the next available number in this sequence block
*/
public long next()
{
return m_next.getAndIncrement();
}
// ---- PortableObject implementation -----------------------------------
/**
* Deserialize object from the POF stream.
*
* @param pofReader POF reader to use
*
* @throws IOException if an error occurs
*/
public void readExternal(PofReader pofReader)
throws IOException
{
m_next = new AtomicLong(pofReader.readLong(0));
m_last = pofReader.readLong(1);
}
/**
* Serialize object into the POF stream.
*
* @param pofWriter POF writer to use
*
* @throws IOException if an error occurs
*/
public void writeExternal(PofWriter pofWriter)
throws IOException
{
pofWriter.writeLong(0, m_next.longValue());
pofWriter.writeLong(1, m_last);
}
}
We also need to add a method that will allocate a new sequence block to our Sequence class:
public class Sequence
implements Serializable, PortableObject
{
// ---- data members ----------------------------------------------------
/**
* The last allocated number from this sequence.
*/
private long m_last;
// ---- public methods --------------------------------------------------
/**
* Allocate a block of sequence numbers, starting from the last allocated
* sequence value.
*
* @param blockSize the number of sequences to allocate
*
* @return allocated block of sequential numbers
*/
public SequenceBlock allocateBlock(int blockSize)
{
SequenceBlock block = new SequenceBlock(m_last + 1, m_last + blockSize);
m_last += blockSize;
return block;
}
/**
* Return the last allocated sequence number.
*
* @return the last allocated sequence number
*/
public long peek()
{
return m_last;
}
// ---- PortableObject implementation -----------------------------------
/**
* Deserialize object from the POF stream.
*
* @param pofReader POF reader to use
*
* @throws IOException if an error occurs
*/
public void readExternal(PofReader pofReader)
throws IOException
{
m_last = pofReader.readLong(0);
}
/**
* Serialize object into the POF stream.
*
* @param pofWriter POF writer to use
*
* @throws IOException if an error occurs
*/
public void writeExternal(PofWriter pofWriter)
throws IOException
{
pofWriter.writeLong(0, m_last);
}
}
Now that we have core classes in place, implementing the actual generator is quite trivial:
public class SequenceGenerator
implements IdentityGenerator<Long>
{
// ---- data members ----------------------------------------------------
/**
* Default sequence block size.
*/
private static final int DEFAULT_BLOCK_SIZE = 20;
/**
* Sequences cache.
*/
private static final NamedCache
s_sequenceCache = CacheFactory.getCache("sequences");
/**
* Sequence name.
*/
private String name;
/**
* Sequence block size.
*/
private int blockSize;
/**
* Currently allocated block of sequences.
*/
private SequenceBlock allocatedSequences;
// ---- constructors ----------------------------------------------------
/**
* Construct sequence generator.
*
* @param name a sequence name
*/
public SequenceGenerator(String name)
{
this(name, DEFAULT_BLOCK_SIZE);
}
/**
* Construct sequence generator.
*
* @param name a sequence name
* @param blockSize the size of the sequence block to allocate at once
*/
public SequenceGenerator(String name, int blockSize)
{
this.name = name;
this.blockSize = blockSize;
}
// ---- IdentityGenerator implementation --------------------------------
/**
* Return the next number in the sequence.
*
* @return the next number in the sequence
*/
public synchronized Long generateIdentity()
{
if (allocatedSequences == null || !allocatedSequences.hasNext())
{
allocatedSequences = allocateSequenceBlock();
}
return allocatedSequences.next();
}
// ---- helper methods --------------------------------------------------
/**
* Allocate a new sequence block.
*
* @return block of sequential numbers
*/
protected SequenceBlock allocateSequenceBlock()
{
return (SequenceBlock)
s_sequenceCache.invoke(name,
new SequenceBlockAllocator(blockSize));
}
}
As you can see, generateIdentity method is synchronized, which makes the generator thread-safe within the single JVM. Cluster-wide concurrency is ensured using SequenceBlockAllocator entry processor, which is guaranteed to execute atomically:
public class SequenceBlockAllocator
extends AbstractProcessor
implements PortableObject
{
// ---- data members ----------------------------------------------------
/**
* The size of the sequence block to allocate.
*/
private int m_blockSize;
// ---- constructors ----------------------------------------------------
/**
* For internal use only.
*/
public SequenceBlockAllocator()
{
// deserialization constructor
}
/**
* Construct new sequence block allocator.
*
* @param blockSize the size of the sequence block to allocate
*/
public SequenceBlockAllocator(int blockSize)
{
this.m_blockSize = blockSize;
}
// ---- EntryProcessor implementation -----------------------------------
/**
* Allocates a block of sequences from a target entry.
* <p/>
* If the target entry for the given name does not already exist in a cache,
* it will be created automatically.
*
* @param entry target entry to allocate sequence block from
*
* @return allocated sequence block
*/
public Object process(InvocableMap.Entry entry)
{
Sequence sequence = entry.isPresent()
? (Sequence) entry.getValue()
: new Sequence();
SequenceBlock block = sequence.allocateBlock(m_blockSize);
entry.setValue(sequence);
return block;
}
// ---- PortableObject implementation -----------------------------------
/**
* Deserialize object from the POF stream.
*
* @param pofReader POF reader to use
*
* @throws IOException if an error occurs
*/
public void readExternal(PofReader pofReader)
throws IOException
{
m_blockSize = pofReader.readInt(0);
}
/**
* Serialize object into the POF stream.
*
* @param pofWriter POF writer to use
*
* @throws IOException if an error occurs
*/
public void writeExternal(PofWriter pofWriter)
throws IOException
{
pofWriter.writeInt(0, m_blockSize);
}
}
In order to ensure reliability, we need to store the sequence numbers in a persistent store that is accessible from all Coherence nodes, such as relational database. This can be easily accomplished by configuring sequences cache to use read/write backing map and appropriate cache store implementation. Because of the differences in environments and persistence framework preferences, this is left as an exercise for the reader.
Using Sequence Generator
Once you have everything in place, actual usage of the sequence generator is quite simple -- all you need to do is create an instance of SequenceGenerator class and invoke generateIdentity method whenever you need a new id.
The most logical place to place this code in is the class for which you need to generate the identifier. For example, in order to use sequence generator to create identifiers for new Account instances, you would make a SequenceGenerator a static field of the Account class and use it within the create factory method:
public class Account implements Entity {
// ----- static members -------------------------------
private static final IdentityGenerator<Long> idGenerator
= new SequenceGenerator("account.id");
// ----- data members ---------------------------------
private final long id;
private final long customerId;
private String name;
private BigDecimal balance;
private long lastTransactionNumber;
// ----- dependencies ---------------------------------
private transient CustomerRepository customerRepository;
private transient TransactionRepository transactionRepository;
// ----- constructors and factory methods -------------
public Account(long id, long customerId, String name,
BigDecimal balance, long lastTransactionNumber) {
this.id = id;
this.customerId = customerId;
this.name = name;
this.balance = balance;
this.lastTransactionNumber = lastTransactionNumber;
}
public static Account create(long customerId, String name) {
return new Account(idGenerator.generateIdentity(),
customerId, name, new BigDecimal(0), 0);
}
...
}
Conclusion
While Coherence doesn't have a built-in mechanism for sequential identifier generation, implementing one on top of it is quite simple.
You can download all the code for this article using the link at the top. Enjoy!
Comments