Files CCore/inc/Packet.h CCore/src/Packet.cpp
Packets are an infrastructure for efficient, flexible, parallel mass request processing. In this infrastructure, processing nodes, also called devices, exchange packets. A packet is a task request. It contains a small stack for request parameters and a return value, and it typically has an attached data buffer for transferring bulk data. A packet may be canceled by its originator. Each packet device has a packet entry gate through which packets arrive for processing. During processing a packet can be stored, transferred to another packet device, or completed. Eventually a packet must be completed, and during completion it is returned to the packet pool from which it was borrowed. Packet cancellation is a means by which the packet originator can ask the current packet device to cancel the packet processing. It is assumed that the packet is then quickly disengaged from processing and completed.
Packet functions, complete or cancel, must be called in a lock-free context.
A number of software entities are required to deal with packets.
using TimeStamp = SecTimer::ValueType ;
using PacketFunction = Function<void (PacketHeader *)> ;
TimeStamp is a type of time-stamp value.
PacketFunction is a packet function type.
const ulen MaxPacketExtLen = Align(256) ;
const ulen MaxPacketCompleteFunctions = 8 ;
const ulen DefaultPacketMaxDataLen = 1600 ;
enum PacketCancelState
{
  Packet_NoCancelFunction,
  Packet_HasCancelFunction,
  Packet_Cancelled
};
enum LenStatus
{
  Len_ok,
  Len_too_short,
  Len_too_long
};
MaxPacketExtLen is a space for the packet extension stack.
MaxPacketCompleteFunctions is a packet complete function stack depth.
DefaultPacketMaxDataLen is a default packet data buffer length.
PacketCancelState is the set of packet cancel state constants.
LenStatus is a length status.
void GuardLenSubTooShort(const char *name,ulen len,ulen delta);
inline ulen LenSub(const char *name,ulen len,ulen delta) // always > 0
{
  if( len<=delta ) GuardLenSubTooShort(name,len,delta);
  return len-delta;
}
inline PacketHeader * GetPacketHeader(PacketHeader *packet) { return packet; }
template <class POD,class ... TT>
PacketHeader * GetPacketHeader(Packet<POD,TT...> packet) { return packet.getPtr(); }
template <class POD,class T>
void DropPacketExt(PacketHeader *packet_)
{
  Packet<POD,T> packet=packet_;
  packet.popExt().complete();
}
LenSub() performs the length subtraction. The result is expected to be positive; otherwise an exception is thrown. GuardLenSubTooShort() throws the exception. name is the function name used in the exception message.
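The checked subtraction can be illustrated with a minimal standalone sketch. It is not the CCore implementation: std::size_t stands in for ulen, std::length_error stands in for the CCore exception, and the names LenSubSketch, GuardLenSubTooShortSketch and LenSubThrows are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for GuardLenSubTooShort(): always throws.
[[noreturn]] inline void GuardLenSubTooShortSketch(const char *name,std::size_t len,std::size_t delta)
{
  throw std::length_error(std::string(name)+" : length "+std::to_string(len)+
                          " is too short to subtract "+std::to_string(delta));
}

// Checked subtraction: the result is always > 0, otherwise an exception is thrown.
inline std::size_t LenSubSketch(const char *name,std::size_t len,std::size_t delta)
{
  if( len<=delta ) GuardLenSubTooShortSketch(name,len,delta);

  return len-delta;
}

// Demonstration helper: reports whether the subtraction is rejected.
inline bool LenSubThrows(std::size_t len,std::size_t delta)
{
  try
    {
      LenSubSketch("LenSubThrows",len,delta);

      return false;
    }
  catch(const std::length_error &)
    {
      return true;
    }
}
```

Note that equal arguments are rejected too, because a zero result is not positive.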
GetPacketHeader() casts a "packet" to the pointer to the PacketHeader.
DropPacketExt() is a simple packet completion function. Use it when you need only to pop some packet extension.
The class PacketBuf is used to handle a packet data buffer. This class keeps a pointer to the data buffer along with its capacity and the length of the data stored in the buffer. From the PacketBuf perspective, data is raw data, i.e. a sequence of bytes.
class PacketBuf : NoCopy
{
void *data;
ulen max_data_len;
ulen data_len;
....
public:
// constructors
PacketBuf() noexcept;
explicit PacketBuf(ulen max_data_len);
~PacketBuf();
// std move
PacketBuf(PacketBuf &&obj) noexcept;
PacketBuf & operator = (PacketBuf &&obj) noexcept;
// methods
bool provide(ulen max_data_len);
void detach();
// data methods
bool noSpace() const { return !max_data_len; }
ulen getMaxRawLen() const { return max_data_len; }
ulen getRawLen() const { return data_len; }
void setRawLen(ulen data_len_) { guardLen(data_len_); data_len=data_len_; }
Place<void> getRaw() { return PlaceAt(data); }
};
Default constructor creates an empty data buffer.
Another constructor creates a data buffer with the given capacity. The data_len is zero. An exception is thrown in case of error.
Destructor releases the buffer.
PacketBuf is std movable. The original object is nullified during the move.
provide() ensures the data buffer has the given capacity. It also clears data_len. If the buffer of a greater capacity was already allocated, the extra memory is returned to the heap. The return value is true, if the operation is successful.
detach() releases the buffer. The object becomes empty.
noSpace() is true, if the buffer is empty.
getMaxRawLen() returns the buffer capacity in bytes.
getRawLen() returns the actual data length in the buffer.
setRawLen() sets the actual data length, the value must not exceed the buffer capacity, otherwise the Abort() is called.
getRaw() returns the "place" of the beginning of the data buffer. You may use this value to put data to the buffer.
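The buffer behavior described above can be modelled by a simplified standalone sketch. It is not the CCore code: it assumes std::realloc() for resizing, returns a plain void pointer instead of Place<void>, and uses assert() where the real class calls Abort(). The names RawBufSketch and BufDemo are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdlib>
#include <cstring>

// Hypothetical, simplified model of a PacketBuf-like raw buffer.
class RawBufSketch
{
  void *data = nullptr;
  std::size_t max_data_len = 0;
  std::size_t data_len = 0;

public:
  RawBufSketch() = default;
  RawBufSketch(const RawBufSketch &) = delete;
  RawBufSketch & operator = (const RawBufSketch &) = delete;
  ~RawBufSketch() { std::free(data); }

  // Ensure the given capacity and clear the data length; false on allocation failure.
  bool provide(std::size_t len)
  {
    data_len=0;
    if( len==max_data_len ) return true;
    if( len==0 ) { detach(); return true; }
    void *ptr=std::realloc(data,len);
    if( !ptr ) return false;
    data=ptr;
    max_data_len=len;
    return true;
  }

  // Release the buffer; the object becomes empty.
  void detach()
  {
    std::free(data);
    data=nullptr;
    max_data_len=0;
    data_len=0;
  }

  bool noSpace() const { return !max_data_len; }
  std::size_t getMaxRawLen() const { return max_data_len; }
  std::size_t getRawLen() const { return data_len; }

  // The length must not exceed the capacity.
  void setRawLen(std::size_t len) { assert( len<=max_data_len ); data_len=len; }

  void * getRaw() { return data; }
};

// Demonstration: store five bytes, then re-provide a smaller buffer.
inline bool BufDemo()
{
  RawBufSketch buf;
  if( !buf.noSpace() ) return false;
  if( !buf.provide(5) ) return false;
  std::memcpy(buf.getRaw(),"hello",5);
  buf.setRawLen(5);
  if( buf.getRawLen()!=5 || buf.getMaxRawLen()!=5 ) return false;
  if( !buf.provide(3) ) return false; // capacity shrinks, data length is cleared
  if( buf.getRawLen()!=0 || buf.getMaxRawLen()!=3 ) return false;
  buf.detach();
  return buf.noSpace();
}
```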
A packet itself consists of the packet header, which is an object of the class PacketHeader, and the packet extension behind the header. All of this is placed in a memory block of a predefined length. The packet extension is organized as a stack of objects of arbitrary types. PacketHeader also contains a stack of completion functions.
class PacketHeader : NoCopy
{
PacketBuf pbuf;
TimeStamp time_stamp;
Atomic cancel_state; // PacketCancelState
PacketFunction cancel_function;
void *cancel_arg;
PacketFunction stack[MaxPacketCompleteFunctions];
ulen stack_len;
ulen ext_len;
DLink<PacketHeader> link;
bool in_list;
public:
// constructors
static constexpr ulen AllocLen() { return Delta()+MaxPacketExtLen; }
explicit PacketHeader(PacketFunction complete_function) noexcept;
void recycle(PacketFunction complete_function);
~PacketHeader() {}
PacketHeader has a number of inner data fields. It also has an extension.
Normally you don't need the PacketHeader constructor, because you get a ready-to-use packet from a packet pool. This constructor creates a clean packet with the given completion function.
The destructor is not trivial: it calls the destructor of the inner PacketBuf. Again, you don't use it directly, because once you have finished with a packet, you call its complete() method and the packet returns to the packet pool from which it was originally taken.
recycle() is used to prepare the packet for reuse. The packet must have an empty extension stack, an empty complete function stack and no cancel function, otherwise the Abort() is called.
AllocLen() is a packet memory block length.
// complete
void pushCompleteFunction(PacketFunction complete_function);
PacketFunction popCompleteFunction();
void complete() { popCompleteFunction()(this); }
A packet has an inner complete function stack. Once packet processing is finished, the method complete() must be called to complete the packet. A packet originator assigns a complete function when preparing a packet for processing. When a packet is taken from a packet pool, it has one complete function in the stack, which returns the packet to the pool.
pushCompleteFunction() pushes a packet function to the complete function stack.
popCompleteFunction() pops a packet function from the complete function stack.
complete() pops a packet function from the complete function stack and applies it to the packet.
In case of stack overflow or underflow the Abort() is called.
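The complete function stack can be sketched in a few lines. This is a standalone illustration under stated assumptions, not the CCore code: plain function pointers stand in for PacketFunction, assert() stands in for Abort(), and HeaderSketch, ReturnToPool, OriginatorDone and CompleteDemo are hypothetical names. The sketch shows the typical chain: the function pushed last runs first and then continues the completion, until the bottom function, which plays the role of the pool's function here, is reached.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

struct HeaderSketch;

using FunctionSketch = void (*)(HeaderSketch *);

const std::size_t MaxCompleteSketch = 8;

struct HeaderSketch
{
  FunctionSketch stack[MaxCompleteSketch] = {};
  std::size_t stack_len = 0;
  std::string log; // demonstration only: records which functions ran

  void pushCompleteFunction(FunctionSketch complete_function)
  {
    assert( stack_len<MaxCompleteSketch ); // overflow
    stack[stack_len++]=complete_function;
  }

  FunctionSketch popCompleteFunction()
  {
    assert( stack_len>0 ); // underflow
    return stack[--stack_len];
  }

  // Pop the top function and apply it to the packet.
  void complete() { popCompleteFunction()(this); }
};

// Bottom function: plays the role of "return the packet to the pool".
inline void ReturnToPool(HeaderSketch *packet) { packet->log+="pool;"; }

// Originator's function: does its own work, then continues the completion.
inline void OriginatorDone(HeaderSketch *packet)
{
  packet->log+="originator;";
  packet->complete();
}

inline std::string CompleteDemo()
{
  HeaderSketch packet;
  packet.pushCompleteFunction(ReturnToPool);   // installed first, runs last
  packet.pushCompleteFunction(OriginatorDone); // installed by the originator
  packet.complete();
  return packet.log;
}
```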
// cancel
PacketCancelState setCancelFunction(PacketFunction cancel_function,void *cancel_arg=0);
PacketCancelState clearCancelFunction();
PacketCancelState getCancelFunction(PacketFunction &ret);
void * getCancelArg() const { return cancel_arg; }
Packet has a cancel state. The state has one of three values: Packet_NoCancelFunction, Packet_HasCancelFunction, Packet_Cancelled.
setCancelFunction() assigns a cancel function with an optional argument. The packet must not have a cancel function assigned. If the packet is already canceled, the function does nothing, otherwise it assigns a cancel function and changes the packet state to the Packet_HasCancelFunction. The previous state is returned.
clearCancelFunction() clears the cancel function. The packet must be in Packet_HasCancelFunction or Packet_Cancelled state. If the packet is already canceled, the function does nothing, otherwise it clears a cancel function and changes the packet state to the Packet_NoCancelFunction. The previous state is returned.
These two functions are called by packet processors. Once a packet is received for processing, a packet processor may assign a cancel function to it, if the packet processing is not immediate. Before the packet is disengaged, a cancel function must be cleared. To learn how to properly use this feature, see the CancelPacketList implementation.
getCancelFunction() cancels the packet. The state is changed to Packet_Cancelled. The previous state is returned, and the cancel function is returned through ret if one has been set. This function is called by the packet originator to cancel the packet processing. If the packet has an assigned cancel function, then it must be called to request the packet processing cancellation. The cancel function, if it exists, completes the packet.
getCancelArg() returns the cancel argument.
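The three-state protocol can be modelled with a small standalone sketch. It is an illustration built on std::atomic, not the CCore implementation: function pointers stand in for PacketFunction, the cancel argument is omitted, assert() stands in for Abort(), and all names (CancelSketch, CountCancel, CancelAfterAssign, AssignAfterCancel) are hypothetical.

```cpp
#include <atomic>
#include <cassert>

enum CancelStateSketch { NoCancelFunction, HasCancelFunction, Cancelled };

struct CancelSketch;

using CancelFn = void (*)(CancelSketch *);

struct CancelSketch
{
  std::atomic<int> state{NoCancelFunction};
  CancelFn cancel_function = nullptr;
  int cancel_calls = 0; // demonstration only

  // Processor side: assign a function unless the packet is already canceled.
  CancelStateSketch setCancelFunction(CancelFn fn)
  {
    assert( state.load()!=HasCancelFunction ); // a second assignment is a usage error
    cancel_function=fn;
    int expected=NoCancelFunction;
    state.compare_exchange_strong(expected,HasCancelFunction);
    return CancelStateSketch(expected); // previous state in both outcomes
  }

  // Processor side: clear the function unless the packet is already canceled.
  CancelStateSketch clearCancelFunction()
  {
    int expected=HasCancelFunction;
    state.compare_exchange_strong(expected,NoCancelFunction);
    assert( expected!=NoCancelFunction ); // nothing to clear is a usage error
    return CancelStateSketch(expected);
  }

  // Originator side: mark the packet canceled and fetch the function, if any.
  CancelStateSketch getCancelFunction(CancelFn &ret)
  {
    int prev=state.exchange(Cancelled);
    if( prev==HasCancelFunction ) ret=cancel_function;
    return CancelStateSketch(prev);
  }
};

inline void CountCancel(CancelSketch *packet) { packet->cancel_calls++; }

// Scenario 1: the processor assigns a function, then the originator cancels.
inline int CancelAfterAssign()
{
  CancelSketch packet;
  packet.setCancelFunction(CountCancel);
  CancelFn fn=nullptr;
  if( packet.getCancelFunction(fn)==HasCancelFunction ) fn(&packet);
  return packet.cancel_calls;
}

// Scenario 2: the originator cancels first, so the later assignment is ignored.
inline CancelStateSketch AssignAfterCancel()
{
  CancelSketch packet;
  CancelFn fn=nullptr;
  packet.getCancelFunction(fn);
  return packet.setCancelFunction(CountCancel);
}
```

In the second scenario the processor learns from the returned Packet_Cancelled-like state that it should disengage the packet itself.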
// stamp
TimeStamp getStamp() const { return time_stamp; }
bool isOld(TimeStamp now,TimeStamp how_old) const { return now-time_stamp>how_old; }
void stamp() { time_stamp=SecTimer::Get(); }
PacketHeader has the inner time-stamp field. The seconds timer is used for the time-stamping.
getStamp() returns the packet time-stamp.
isOld() checks whether the packet is older than the given age. The first argument is the current time and the second is the allowed lifetime.
stamp() sets the current time as a time-stamp.
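The age check is a single subtraction. The following standalone sketch assumes that the time-stamp type is an unsigned 32-bit seconds counter (the actual SecTimer::ValueType is not shown here); with unsigned arithmetic the difference stays correct even if the counter wraps around between stamping and checking. StampSketch and IsOldSketch are hypothetical names.

```cpp
#include <cassert>
#include <cstdint>

using StampSketch = std::uint32_t; // assumption: unsigned seconds counter

// True if more than how_old seconds have passed since time_stamp.
inline bool IsOldSketch(StampSketch time_stamp,StampSketch now,StampSketch how_old)
{
  return StampSketch(now-time_stamp)>how_old;
}
```

For example, a packet stamped at 0xFFFFFFF0 and checked at 0x10 is 32 seconds old, although the counter has wrapped in between.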
// ext
template <class T,class ... SS>
T * pushExt(SS && ... ss);
template <class T>
T * getExt();
template <class T,class ... TT> // T, T1, T2, ... , Ttop
T * getDeepExt();
template <class T>
void popExt();
The next set of methods is used to "extend" the packet.
pushExt() pushes an object of the type T to the extension stack. The object is constructed using the given set of arguments. A pointer to the constructed object is returned. If there is not enough room for the object, the Abort() is called.
getExt() returns a pointer to the top of the extension stack. You must know for certain that the object exists and has the given type.
getDeepExt() is used if you have to look deeper into the extension stack. If the stack contains a sequence of objects of types T, T1, ... Ttop, you may use this method to get a pointer to the deep object of the type T.
popExt() pops and destroys the top object from the extension stack. You must know for certain that the object exists and has the given type.
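A typed stack inside a fixed memory block can be sketched with placement new. This is a standalone illustration, not the CCore code: it assumes every object size is rounded up to the strictest fundamental alignment, supports only types without extended alignment, and uses assert() where the real class calls Abort(). ExtStackSketch, AlignSketch, PairSketch and ExtDemo are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <new>
#include <utility>

const std::size_t MaxExtLenSketch = 256;

// Round a length up to the strictest fundamental alignment.
inline std::size_t AlignSketch(std::size_t len)
{
  const std::size_t a=alignof(std::max_align_t);
  return (len+a-1)/a*a;
}

class ExtStackSketch
{
  alignas(std::max_align_t) unsigned char mem[MaxExtLenSketch];
  std::size_t ext_len = 0;

public:
  std::size_t getExtLen() const { return ext_len; }

  // Construct an object of the type T on top of the stack.
  template <class T,class ... SS>
  T * pushExt(SS && ... ss)
  {
    static_assert( alignof(T)<=alignof(std::max_align_t), "over-aligned types are not supported" );
    assert( AlignSketch(sizeof (T))<=MaxExtLenSketch-ext_len ); // not enough room
    T *ret=new(mem+ext_len) T( std::forward<SS>(ss)... );
    ext_len+=AlignSketch(sizeof (T));
    return ret;
  }

  // The caller must know that the top object exists and has the type T.
  template <class T>
  T * getExt()
  {
    assert( ext_len>=AlignSketch(sizeof (T)) );
    return reinterpret_cast<T *>(mem+ext_len-AlignSketch(sizeof (T)));
  }

  // Destroy the top object and release its space.
  template <class T>
  void popExt()
  {
    T *obj=getExt<T>();
    obj->~T();
    ext_len-=AlignSketch(sizeof (T));
  }
};

struct PairSketch
{
  int a;
  int b;

  PairSketch(int a_,int b_) : a(a_),b(b_) {}
};

// Push two objects, read them back in reverse order and pop them.
inline int ExtDemo()
{
  ExtStackSketch ext;
  ext.pushExt<int>(12345);
  ext.pushExt<PairSketch>(1,2);
  int sum=ext.getExt<PairSketch>()->a+ext.getExt<PairSketch>()->b;
  ext.popExt<PairSketch>();
  sum+=*ext.getExt<int>();
  ext.popExt<int>();
  return ext.getExtLen()==0 ? sum : -1 ;
}
```

The stack stores no type information, which is why the caller must supply the correct type on every access.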
// data attach/detach
bool provide(ulen max_data_len=DefaultPacketMaxDataLen);
void detach();
void attach(PacketBuf &pbuf);
void detach(PacketBuf &pbuf);
void detach(PacketHeader *dst);
PacketHeader has the inner PacketBuf field and a set of methods to deal with it. This buffer can be used to store a bulk of data with a packet.
These methods directly call the corresponding PacketBuf methods. The last three methods move the data buffer. This is not allowed if the packet belongs to a packet list; in that case the Abort() is called. The last method moves the packet buffer to another packet, which also must not belong to a packet list.
// raw data
bool noSpace() const { return pbuf.noSpace(); }
ulen getMaxRawLen() const { return pbuf.getMaxRawLen(); }
ulen getRawLen() const { return pbuf.getRawLen(); }
void setRawLen(ulen data_len) { pbuf.setRawLen(data_len); }
Place<void> getRaw() { return pbuf.getRaw(); }
};
Raw data methods directly call the corresponding PacketBuf methods.
PacketHeader contains a list link and can be inserted into a PacketList. It is not possible to have a PacketHeader in two lists. If a packet belongs to a list, it also cannot be completed.
class PacketList : NoCopy
{
....
public:
// constructors
PacketList() noexcept;
~PacketList();
// std move
PacketList(PacketList &&obj) noexcept;
PacketList & operator = (PacketList &&obj) noexcept;
// props
ulen getCount() const { return count; }
ulen getTotalLen() const { return total_len; }
// put
void put(PacketHeader *packet);
void put_first(PacketHeader *packet);
template <class P>
void put(P packet) { put(GetPacketHeader(packet)); }
template <class P>
void put_first(P packet) { put_first(GetPacketHeader(packet)); }
// get
PacketHeader * get();
PacketHeader * get_last();
// del
void del(PacketHeader *packet);
template <class P>
void del(P packet) { del(GetPacketHeader(packet)); }
// methods
void complete();
void moveOld(TimeStamp how_old,PacketList &dest);
// swap/move object
void objSwap(PacketList &obj);
explicit PacketList(ToMoveCtor<PacketList> obj);
};
PacketList must be empty before the destructor is called, otherwise the Abort() is called.
PacketList is std movable. The original object is nullified during the move.
getCount() is the number of packets in the list.
getTotalLen() is the total buffer space of packets in the list.
put() puts the packet into the list. Packet must not be in a list, otherwise the Abort() is called.
put_first() puts the packet into the list as the first element. Packet must not be in a list, otherwise the Abort() is called.
get() gets the first packet from the list. If the list is empty, a null pointer is returned.
get_last() gets the last packet from the list. If the list is empty, a null pointer is returned.
del() deletes the packet from the list. Packet must belong to this list.
complete() completes all packets from the list. List becomes empty.
moveOld() moves all packets older than how_old to the destination list.
PacketList is swappable and movable.
The class Packet is a PacketHeader pointer wrapper. It behaves like a raw pointer, so use it with caution.
template <class POD,class ... TT> // T1, T2, ... , Ttop
class Packet
{
PacketHeader *packet;
public:
// constructors
Packet() noexcept : packet(0) {}
Packet(NothingType) : Packet() {}
Packet(PacketHeader *packet_) : packet(packet_) {}
The Packet template parameters are as follows. The first is the assumed type of data in the data buffer; it must be a POD type, usually char or uint8. The remaining parameters are the types of the top extension objects.
Packet can be implicitly constructed from a raw pointer. Make sure the pointer is either null or points to a packet with the required properties.
// props
PacketHeader * operator + () const { return packet; }
bool operator ! () const { return packet==0; }
PacketHeader * getPtr() const { return packet; }
Usual "is null", "not null" and object access operations. You may use the GetPacketHeader() function to convert a Packet to the PacketHeader pointer.
// complete
void pushCompleteFunction(PacketFunction complete_function) { packet->pushCompleteFunction(complete_function); }
PacketFunction popCompleteFunction() { return packet->popCompleteFunction(); }
void complete() { Replace_null(packet)->complete(); }
Complete function methods.
The method complete() not only completes the packet, but also nullifies the Packet object: after the call the object becomes null. Remember, once you have called complete(), the packet is no longer available to you. That is why the pointer is nullified in this class.
// cancel
PacketCancelState setCancelFunction(PacketFunction cancel_function,void *cancel_arg=0) { return packet->setCancelFunction(cancel_function,cancel_arg); }
PacketCancelState clearCancelFunction() { return packet->clearCancelFunction(); }
void * getCancelArg() { return packet->getCancelArg(); }
These methods are the packet cancellation methods.
// stamp
TimeStamp getStamp() { return packet->getStamp(); }
bool isOld(TimeStamp now,TimeStamp how_old) { return packet->isOld(now,how_old); }
void stamp() { packet->stamp(); }
These methods are the packet time-stamp methods.
// ext
template <class T,class ... SS>
Packet<POD,TT...,T> pushExt(SS && ... ss)
{
  PacketHeader *ret=Replace_null(packet);
  ret->pushExt<T>( std::forward<SS>(ss)... );
  return ret;
}
Ttop * getExt() { return packet->getExt<typename PacketExtType<POD,TT...>::TopType>(); }
Packet<POD,...> popExt()
{
  PacketHeader *ret=Replace_null(packet);
  ret->popExt<typename PacketExtType<POD,TT...>::TopType>();
  return ret;
}
template<ulen Index>
TIndex * getDeepExt() // 1 -> T1, 2 -> T2, ...
{
  return PacketDeepExt<Index,TT...>::Get(packet);
}
template<ulen Index>
Packet<POD,TIndex+1,...> forgetExt() // 1 -> T2,... , 2 -> T3,... , ...
{
  return Replace_null(packet);
}
Packet extension methods.
pushExt() pushes the object to the packet extension stack. The type of the packet changes, so this method nullifies the current Packet and returns the new Packet of the proper type.
getExt() returns a pointer to the top extension object. The type is derived from the Packet type.
popExt() pops an object from the packet extension stack. The type of the packet changes, so this method nullifies the current Packet and returns the new Packet of the proper type.
getDeepExt() gets a deep object from the packet extension. The type is derived from the Packet type.
forgetExt() "forgets" some packet extension object types. This method nullifies the current Packet and returns the new Packet of the proper type.
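The "nullify and return a new type" pattern can be shown in isolation. The sketch below is a standalone illustration, not the CCore class: the header is reduced to a counter, nothing is actually pushed to an extension stack, and TypedPtrSketch, HeaderStub and TypedDemo are hypothetical names. It only demonstrates how the list of extension types is carried in the wrapper type while ownership moves from the old wrapper to the new one.

```cpp
#include <cassert>
#include <type_traits>

// Hypothetical stand-in for the packet header: only counts extensions.
struct HeaderStub
{
  int ext_count = 0;
};

template <class POD,class ... TT>
class TypedPtrSketch
{
  HeaderStub *packet;

  // Return the pointer and nullify this object.
  HeaderStub * take()
  {
    HeaderStub *ret=packet;
    packet=nullptr;
    return ret;
  }

public:
  TypedPtrSketch() : packet(nullptr) {}
  TypedPtrSketch(HeaderStub *packet_) : packet(packet_) {}

  bool operator ! () const { return packet==nullptr; }
  HeaderStub * getPtr() const { return packet; }

  // The new wrapper type records the pushed extension type T.
  template <class T>
  TypedPtrSketch<POD,TT...,T> pushExt()
  {
    HeaderStub *ret=take();
    ret->ext_count++;
    return ret;
  }
};

inline bool TypedDemo()
{
  HeaderStub header;
  TypedPtrSketch<char> p(&header);
  auto p2=p.pushExt<int>();

  static_assert( std::is_same<decltype(p2),TypedPtrSketch<char,int> >::value, "the type tracks the extension" );

  // The old wrapper is null now, the new one refers to the same header.
  return !p && p2.getPtr()==&header && header.ext_count==1 ;
}
```

Because the old object becomes null, accidentally using it after the type change is easier to detect than with a stale raw pointer.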
// data attach/detach
bool provide(ulen max_data_len) { return packet->provide(max_data_len*sizeof (POD)); }
bool provide() { return packet->provide(); }
void detach() { packet->detach(); }
void attach(PacketBuf &pbuf) { packet->attach(pbuf); }
void detach(PacketBuf &pbuf) { packet->detach(pbuf); }
void detach(PacketHeader *dst) { packet->detach(dst); }
template <class ... SS>
void detach(Packet<POD,SS...> dst) { packet->detach(dst.getPtr()); }
Packet data buffer methods. The method provide() takes the data length in POD units, not in bytes.
// data
bool noSpace() { return packet->noSpace(); }
ulen getMaxDataLen() { return packet->getMaxRawLen()/sizeof (POD); }
ulen getDataLen() { return packet->getRawLen()/sizeof (POD); }
POD * getData() { return packet->getRaw(); }
PtrLen<POD> getMaxRange() { return Range(getData(),getMaxDataLen()); }
PtrLen<POD> getRange() { return Range(getData(),getDataLen()); }
bool checkDataLen(ULenSat data_len) { return data_len<=getMaxDataLen(); }
PtrLen<POD> setDataLen(ULenSat data_len) // assume checkDataLen(data_len)
{
  ulen raw_len=data_len.value*sizeof (POD);
  packet->setRawLen(raw_len);
  return Range(getData(),data_len.value);
}
Packet data methods. All lengths here are in POD units.
checkDataLen() checks the data_len of the ULenSat type to fit into the data buffer capacity.
setDataLen() sets the data length and returns the data range. It is assumed, the data_len fits into the data buffer capacity.
// format
PacketFormat::SubResult getMaxDataLen(PacketFormat format) { return format.sub(getMaxDataLen()); }
PacketFormat::SubResult getDataLen(PacketFormat format) { return format.sub(getDataLen()); }
PacketFormat::CutResult<POD> getMaxRange(PacketFormat format) { return format.cutMax(getMaxRange()); }
bool checkRange(PacketFormat format) { return format.check(getDataLen()); }
PtrLen<POD> getRange(PacketFormat format) // assume checkRange(format)
{
  return format.cut(getRange());
}
PtrLen<POD> getPrefix(PacketFormat format) // assume checkRange(format)
{
  return format.cutPrefix(getRange());
}
PtrLen<POD> getSuffix(PacketFormat format) // assume checkRange(format)
{
  return format.cutSuffix(getRange());
}
bool checkDataLen(PacketFormat format,ULenSat data_len) { return format.checkLen(data_len,getMaxDataLen()); }
PtrLen<POD> setDataLen(PacketFormat format,ULenSat data_len) // assume checkDataLen(format,data_len)
{
  setDataLen(format.add(data_len.value));
  return Range(getData()+format.prefix,data_len.value);
}
};
The set of data methods with PacketFormat. See below for more about packet formats.
getMaxDataLen() returns the maximum working data length with the given packet format.
getDataLen() returns the working data length with the given packet format.
getMaxRange() returns the maximum working data range with the given packet format.
checkRange() returns true, if the data length fits in the given packet format.
getRange() returns the working data range with the given packet format.
getPrefix() returns the data prefix with the given packet format.
getSuffix() returns the data suffix with the given packet format.
checkDataLen() checks if the given working data length can be set.
setDataLen() sets the working data length and returns the working data range.
PacketCanceller is a helper class. It simplifies the process of packet cancellation.
class PacketCanceller : NoCopy
{
PacketHeader *packet;
PacketFunction cancel_function;
public:
template <class P>
explicit PacketCanceller(P packet_) : packet(GetPacketHeader(packet_)) {}
bool getCancelFunction() { return packet->getCancelFunction(cancel_function)==Packet_HasCancelFunction; }
void cancel() { cancel_function(packet); }
};
Constructor records a packet pointer.
getCancelFunction() cancels the recorded packet. The method returns true, if a cancel function has been retrieved.
cancel() calls the cancel function. You must call it if getCancelFunction() returned true.
The process has two phases, because the first call can be done in any context, but a cancel function must be called in a lock-free context.
A packet format consists of three values: the prefix length, the suffix length and the maximum working data length. It is often required to reserve some prefix and suffix in the data range for further processing. For example, when you prepare a UDP packet, the prefix is reserved for the UDP header, the IP header and the Ethernet header. These parts are filled in by the next-layer packet devices. You put data into the working subrange after this prefix.
PacketFormat represents the packet format and has a number of methods to manipulate lengths and data ranges.
struct PacketFormat
{
ulen prefix;
ulen max_data;
ulen suffix;
// prefix + suffix + max_data <= MaxULen
PacketFormat()
{
prefix=0;
max_data=0;
suffix=0;
}
PacketFormat(NothingType) : PacketFormat() {}
ulen getMaxTotalLen() const { return prefix+max_data+suffix; }
PacketFormat addPrefix(ulen prefix_len) const;
PacketFormat addSuffix(ulen suffix_len) const;
PacketFormat addPrefixSuffix(ulen prefix_len,ulen suffix_len) const;
getMaxTotalLen() is the maximum total data length. It is the sum of the prefix length, the suffix length and the maximum working data length.
addPrefix() creates the new PacketFormat with increased prefix, max_data is decreased. Exception is thrown, if there is no space.
addSuffix() creates the new PacketFormat with increased suffix, max_data is decreased. Exception is thrown, if there is no space.
addPrefixSuffix() creates the new PacketFormat with increased prefix and suffix, max_data is decreased. Exception is thrown, if there is no space.
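The effect of these methods is that the reserved parts grow at the expense of max_data while the maximum total length stays the same. The standalone sketch below illustrates this; it is not the CCore code. It assumes that "no space" means the remaining max_data would not be positive (in line with the LenSub() contract) and throws std::length_error instead of the CCore exception. FormatSketch, MakeFormat and AddPrefixThrows are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>

struct FormatSketch
{
  std::size_t prefix = 0;
  std::size_t max_data = 0;
  std::size_t suffix = 0;

  std::size_t getMaxTotalLen() const { return prefix+max_data+suffix; }

  FormatSketch addPrefixSuffix(std::size_t prefix_len,std::size_t suffix_len) const
  {
    // Overflow-safe form of: prefix_len+suffix_len < max_data
    if( prefix_len>=max_data || suffix_len>=max_data-prefix_len )
      throw std::length_error("FormatSketch : no space");

    FormatSketch ret(*this);
    ret.prefix+=prefix_len;
    ret.suffix+=suffix_len;
    ret.max_data-=prefix_len+suffix_len;
    return ret;
  }

  FormatSketch addPrefix(std::size_t prefix_len) const { return addPrefixSuffix(prefix_len,0); }
  FormatSketch addSuffix(std::size_t suffix_len) const { return addPrefixSuffix(0,suffix_len); }
};

// Demonstration helper: a format with no reserved parts.
inline FormatSketch MakeFormat(std::size_t max_data)
{
  FormatSketch ret;
  ret.max_data=max_data;
  return ret;
}

// Demonstration helper: reports whether adding the prefix is rejected.
inline bool AddPrefixThrows(std::size_t max_data,std::size_t prefix_len)
{
  try
    {
      MakeFormat(max_data).addPrefix(prefix_len);

      return false;
    }
  catch(const std::length_error &)
    {
      return true;
    }
}
```

A layered protocol stack can apply such methods repeatedly: each layer derives the format for the upper layer by reserving room for its own header and trailer.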
ulen add(ulen len) const; // return total_len;
struct SubResult
{
  ulen len;
  LenStatus status;
  SubResult(ulen len_,LenStatus status_) : len(len_),status(status_) {}
  bool isTooShort() const { return status==Len_too_short; }
  bool notFitFormat() const { return status!=Len_ok; }
};
SubResult sub(ulen total_len) const; // return len;
bool checkLen(ULenSat len,ulen max_total_len) const;
add() calculates the total data range length from the working range length. No checks are performed.
sub() performs the inverse operation: it calculates the working range length from the total data range length. But it is a checked operation, so it returns a data structure with two fields: len and status. If the total_len is too short, the status is Len_too_short and the len is zero. If the total_len is too large, the status is Len_too_long and the len is max_data. Otherwise status is Len_ok and the len is the working range length.
checkLen() checks whether len can be used as the working data length while the resulting total data length stays within the given limit.
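The length arithmetic can be sketched as follows. This is a standalone illustration of the rules stated above, not the CCore code: std::size_t replaces both ulen and ULenSat, a total length exactly equal to prefix plus suffix is assumed to give a valid zero working length, and all names (FormatSketch, SubResultSketch, LenStatusSketch, UdpLikeFormat) are hypothetical. The example numbers (42 bytes of headers, 4 bytes of trailer) are illustrative only.

```cpp
#include <cassert>
#include <cstddef>

enum LenStatusSketch { LenOk, LenTooShort, LenTooLong };

struct SubResultSketch
{
  std::size_t len;
  LenStatusSketch status;
};

struct FormatSketch
{
  std::size_t prefix;
  std::size_t max_data;
  std::size_t suffix;

  // Working length -> total length, unchecked.
  std::size_t add(std::size_t len) const { return prefix+len+suffix; }

  // Total length -> working length, checked.
  SubResultSketch sub(std::size_t total_len) const
  {
    std::size_t extra=prefix+suffix;

    if( total_len<extra ) return {0,LenTooShort};

    std::size_t len=total_len-extra;

    if( len>max_data ) return {max_data,LenTooLong};

    return {len,LenOk};
  }

  // Can len be a working length, with the total within max_total_len?
  bool checkLen(std::size_t len,std::size_t max_total_len) const
  {
    return len<=max_data && add(len)<=max_total_len ;
  }
};

// Demonstration helper: 42 bytes of headers, 1472 bytes of payload, 4 bytes of trailer.
inline FormatSketch UdpLikeFormat() { return {42,1472,4}; }
```

For a valid working length, sub(add(len)) gives back len with the ok status, which is the sense in which sub() is the inverse of add().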
bool check(ulen total_len) const;
template <class T>
PtrLen<T> cut(PtrLen<T> range) const; // assume check(range.len)
template <class T>
PtrLen<T> cutPrefix(PtrLen<T> range) const; // assume check(range.len)
template <class T>
PtrLen<T> cutSuffix(PtrLen<T> range) const; // assume check(range.len)
template <class T>
struct CutResult
{
  PtrLen<T> range;
  LenStatus status;
  CutResult(PtrLen<T> range_,LenStatus status_) : range(range_),status(status_) {}
  LenStatus operator ! () const { return status==Len_too_short; }
};
template <class T>
CutResult<T> cutMax(PtrLen<T> range) const;
};
check() checks, if the given length can be a total data length.
cut() cuts the working range from the total data range.
cutPrefix() cuts the prefix from the total data range.
cutSuffix() cuts the suffix from the total data range.
cutMax() cuts the working range from the total data range. This method doesn't assume the total range has the proper length. It returns the structure with two fields: range with the resulting range and status with the length status. If the input range is too short, the status is Len_too_short and the range is empty. If the input range has a good length, the status is Len_ok and the range is the working range. Finally, if the input range is too long, the status is Len_too_long and the range has max_data length.
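The cutting rules can be sketched on a simple pointer and length pair. This is a standalone illustration, not the CCore code: SpanSketch stands in for PtrLen, the empty range returned for a too short input is assumed to be a null range, and the names FormatSketch, CutResultSketch, CutDemo and CutMaxStatusDemo are hypothetical.

```cpp
#include <cassert>
#include <cstddef>

enum LenStatusSketch { LenOk, LenTooShort, LenTooLong };

template <class T>
struct SpanSketch
{
  T *ptr;
  std::size_t len;
};

template <class T>
struct CutResultSketch
{
  SpanSketch<T> range;
  LenStatusSketch status;
};

struct FormatSketch
{
  std::size_t prefix;
  std::size_t max_data;
  std::size_t suffix;

  bool check(std::size_t total_len) const
  {
    return total_len>=prefix+suffix && total_len-(prefix+suffix)<=max_data ;
  }

  // The three unchecked cuts assume check(r.len).
  template <class T>
  SpanSketch<T> cut(SpanSketch<T> r) const { return {r.ptr+prefix,r.len-prefix-suffix}; }

  template <class T>
  SpanSketch<T> cutPrefix(SpanSketch<T> r) const { return {r.ptr,prefix}; }

  template <class T>
  SpanSketch<T> cutSuffix(SpanSketch<T> r) const { return {r.ptr+(r.len-suffix),suffix}; }

  // Checked cut: works for any input length.
  template <class T>
  CutResultSketch<T> cutMax(SpanSketch<T> r) const
  {
    if( r.len<prefix+suffix ) return { {nullptr,0}, LenTooShort };

    std::size_t len=r.len-(prefix+suffix);

    if( len>max_data ) return { {r.ptr+prefix,max_data}, LenTooLong };

    return { {r.ptr+prefix,len}, LenOk };
  }
};

// Demonstration: 2 bytes of prefix, 6 bytes of data, 2 bytes of suffix.
inline bool CutDemo()
{
  char buf[16]="HHddddddTT";
  FormatSketch f{2,8,2};
  SpanSketch<char> total{buf,10};

  if( !f.check(total.len) ) return false;

  SpanSketch<char> work=f.cut(total);
  SpanSketch<char> head=f.cutPrefix(total);
  SpanSketch<char> tail=f.cutSuffix(total);

  return work.ptr==buf+2 && work.len==6 &&
         head.ptr==buf && head.len==2 &&
         tail.ptr==buf+8 && tail.len==2 ;
}

// Demonstration: status of the checked cut for a total length up to 16.
inline LenStatusSketch CutMaxStatusDemo(std::size_t total_len)
{
  static char buf[16];
  FormatSketch f{2,8,2};

  return f.cutMax(SpanSketch<char>{buf,total_len}).status;
}
```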
Assume, we have a packet processing device, like this:
class Device
{
  ....
public:
  ....
  void put(Packet<uint8,int> packet);
};
To use this device a packet must be acquired. You may simply borrow it from the default packet pool:
Packet<uint8> packet=AllocPacket<uint8>();
if( !packet ) { .... } // error handling
If the pool is empty, the call blocks until some packet is returned. This differs from memory allocation behavior. We assume that packets will eventually be completed and returned to the pool, so if there are no available packets, processing is delayed.
Or (preferred), you may use the packet set.
PacketSet<uint8> pset;
Packet<uint8> packet=pset.get();
if( !packet ) { .... } // error handling
Packet set takes packets from the same pool, but it limits the number. It also tracks packets, so you may also wait until all packets, taken from this set, are completed, or cancel them.
Then you have to prepare the packet:
Packet<uint8,int> packet2=packet.pushExt<int>(12345); // int = 12345 is pushed
auto range=packet2.setDataLen(100);
.... // fill range
packet2.pushCompleteFunction(DropPacketExt<uint8,int>);
You should push the required packet extension and fill the data buffer. You also have to push a complete function. In our case we push the function to drop the additional packet extension we have pushed to the packet.
When the packet is ready, submit it to the processing.
Device *device=....;
device->put(packet2);
Now the device owns the packet. When it has finished with the packet, the packet will be completed and returned to the packet pool (or packet set). You may prepare and submit several packets. If you need a packet processing result, you may push a complete function that examines the result. Let's assume the device above returns an integer code in the packet extension. Then you may proceed as follows:
class Submitter : public Funchor_nocopy
{
  int code;
  Sem sem;
private:
  void complete(PacketHeader *packet_)
  {
    Packet<uint8,int> packet=packet_;
    code=*packet.getExt();
    sem.give();
    packet.popExt().complete();
  }
  PacketFunction function_complete() { return FunctionOf(this,&Submitter::complete); }
public:
  Submitter();
  ~Submitter();
  int submit(Device *device,Packet<uint8> packet)
  {
    Packet<uint8,int> packet2=packet.pushExt<int>(0);
    packet2.pushCompleteFunction(function_complete());
    device->put(packet2);
    sem.take();
    return code;
  }
};
....
Submitter submitter;
Device *device=....;
Packet<uint8> packet=AllocPacket<uint8>();
if( !packet ) { .... } // error handling
auto range=packet.setDataLen(100);
.... // fill range
int code=submitter.submit(device,packet);
It is recommended to use the PacketSet class as a packet source. To learn more about packet processing see the test3009.PacketSet.cpp.