5353#include " roaring/roaring.c"
5454#include " roaring/roaring.hh" // for COMPACT_COUNT_DISTINCT
5555
56+ #include < cereal/archives/binary.hpp> // for PERCENTILE
57+ #include < cereal/types/vector.hpp> // for PERCENTILE
58+
5659namespace voltdb {
5760/*
5861 * Type of the hash set used to check for column aggregate distinctness
@@ -564,45 +567,116 @@ class CompactToCardinalityAgg : public CompactCountDistinctAgg {
564567
565568// / Single partition aggregate
566569class PercentileAgg : public Agg {
570+ protected:
567571 double m_percentile;
572+ std::vector<double > m_values;
568573
569- public:
570- PercentileAgg (double percentile) : m_percentile(percentile)
571- {
572- }
574+ // Utility function to get the given NValue as a double. There's a bunch of
575+ // clean, handy looking methods in NValue, but they're all private.
576+ double doubleValue (const NValue& value) {
577+ // Cast the value as a double first, because peekDouble() will only
578+ // give you a value if the NValue is of type VALUE_TYPE_DOUBLE.
579+ return ValuePeeker::peekDouble (value.castAs (VALUE_TYPE_DOUBLE));
580+ }
573581
574- virtual void advance (const NValue& val)
575- {
576- // TODO: do the thing
577- }
582+ public:
583+ PercentileAgg (double percentile) : m_percentile(percentile)
584+ {
585+ // Do some basic checking to ensure that the percentile given is in
586+ // a valid range (0% to 100% exclusive).
587+ assert (percentile > 0.0 );
588+ assert (percentile < 1.0 );
589+ }
578590
579- virtual NValue finalize (ValueType type)
580- {
581- // TODO: do the thing
582- return ValueFactory::getDoubleValue (m_percentile);
591+ virtual void advance (const NValue& val)
592+ {
593+ // We only handle numeric types.
594+ assert (ValuePeeker::peekValueType (val) == VALUE_TYPE_TINYINT
595+ || ValuePeeker::peekValueType (val) != VALUE_TYPE_SMALLINT
596+ || ValuePeeker::peekValueType (val) != VALUE_TYPE_INTEGER
597+ || ValuePeeker::peekValueType (val) != VALUE_TYPE_BIGINT
598+ || ValuePeeker::peekValueType (val) != VALUE_TYPE_DOUBLE
599+ || ValuePeeker::peekValueType (val) != VALUE_TYPE_TIMESTAMP
600+ || ValuePeeker::peekValueType (val) != VALUE_TYPE_DECIMAL);
601+
602+ // We don't track null or nonsensical values.
603+ if (!val.isNull () && !val.isNaN ()) {
604+ m_values.push_back (doubleValue (val));
583605 }
606+ }
584607
585- virtual void resetAgg ()
586- {
587- Agg::resetAgg ();
608+ virtual NValue finalize (ValueType type)
609+ {
610+ // If we didn't see any values, the percentile is null.
611+ if (m_values.empty ()) {
612+ return ValueFactory::getNullValue ();
588613 }
614+
615+ // Sort the values before we index into the list.
616+ std::sort (m_values.begin (), m_values.end ());
617+
618+ // Figure out the rank of the value pertaining to the Nth percentile.
619+ // This implements the "Second variant" of the linear interpolation models
620+ // described in the following Wikipedia article:
621+ // https://web.archive.org/web/20200223144405/https://en.wikipedia.org/wiki/Percentile
622+ // rank = (percentile * (sample count - 1)) + 1
623+ double rank = (m_percentile * (m_values.size () - 1 )) + 1 ;
624+
625+ // The rank potentially has both an integral and a fractional rank.
626+ // Break it into those component pieces via modf.
627+ double integralRank;
628+ double fractionalRank = modf (rank, &integralRank);
629+
630+ // The integral rank is 1-based, but our vector of values is 0-based. Fetch the
631+ // value at the integral rank position (the "start") and the value just past
632+ // that position (the "end"). Mix in some std::min and std::max to ensure we
633+ // don't fall off either end of the vector into undefined space.
634+ double start = m_values.at (std::max (0 , (int ) integralRank - 1 ));
635+ double end = m_values.at (std::min ((int ) integralRank, (int ) m_values.size () - 1 ));
636+
637+ // The interpolated result uses the following formula:
638+ // result = start + (fractional rank * (end - start))
639+ return ValueFactory::getDoubleValue (start + (fractionalRank * (end - start)));
640+ }
641+
642+ virtual void resetAgg ()
643+ {
644+ Agg::resetAgg ();
645+ m_values.clear ();
646+ }
589647};
590648
591649// / Push-down (multi partition) aggregate
592- class ValuesToTDigestAgg : public Agg {
650+ class ValuesToTDigestAgg : public PercentileAgg {
593651public:
594- virtual void advance (const NValue& val)
652+ // The percentile that we pass down to PercentileAgg doesn't matter for the
653+ // push down aggregate. It just needs to be in the bounds that PercentileAgg
654+ // accepts.
655+ ValuesToTDigestAgg () : PercentileAgg(0.5 )
595656 {
596- // TODO: do the thing
597657 }
598658
599659 virtual NValue finalize (ValueType type)
600660 {
601- // TODO: do the thing
661+ // Make sure the caller is asking for a VARBINARY value. This aggregate
662+ // doesn't know how to produce anything else.
602663 assert (type == VALUE_TYPE_VARBINARY);
603- int byteSize = 0 ;
604- char *serializedBytes = new char [byteSize];
605- return ValueFactory::getTempBinaryValue (serializedBytes, byteSize);
664+
665+ // Create a buffer to hold the serialized values.
666+ std::ostringstream oss;
667+ {
668+ // Use Cereal (https://uscilab.github.io/cereal/index.html) to serialize
669+ // the vector of values to the buffer. This code is inside a block to ensure
670+ // that all of the BinaryOutputArchive structures are flushed to the
671+ // ostringstream, which it does when it goes out of scope at the end of
672+ // this block.
673+ cereal::BinaryOutputArchive oarchive (oss);
674+ oarchive (m_values);
675+ }
676+
677+ // Convert the buffer to a VARBINARY value for VoltDB.
678+ return ValueFactory::getTempBinaryValue (oss.str ().c_str (),
679+ static_cast <int32_t >(oss.str ().length ()));
606680 }
607681};
608682
@@ -615,7 +689,30 @@ class TDigestToPercentileAgg : public PercentileAgg {
615689
616690 virtual void advance (const NValue& val)
617691 {
618- // TODO: do the thing
692+ // Ensure that the incoming value is a VARBINARY (we don't know how to work
693+ // with anything else) and that it's not null.
694+ assert (ValuePeeker::peekValueType (val) == VALUE_TYPE_VARBINARY);
695+ assert (!val.isNull ());
696+
697+ // Fetch a pointer to the underlying data. It's being loaned to us by the
698+ // NValue, so we don't need to deallocate it when we're done.
699+ int32_t length;
700+ const char * buf = ValuePeeker::peekObject_withoutNull (val, &length);
701+ assert (length > 0 );
702+
703+ // Wrap the data up in a buffer that Cereal can work with. This is going to
704+ // copy the data, which is not strictly necessary. But I'm not sure we have
705+ // an alternative. Cereal wants an istringstream and there don't appear to
706+ // be any non-copying variants.
707+ std::istringstream iss (std::string (buf, length));
708+
709+ // Use Cereal to deserialize the values into a std::vector.
710+ cereal::BinaryInputArchive iarchive (iss);
711+ std::vector<double > incomingValues;
712+ iarchive (incomingValues);
713+
714+ // Add the deserialized values to any prior values (from other VoltDB dist nodes).
715+ m_values.insert (m_values.end (), incomingValues.begin (), incomingValues.end ());
619716 }
620717};
621718
@@ -662,33 +759,33 @@ inline Agg* getAggInstance(Pool& memoryPool, ExpressionType agg_type, bool isDis
662759 case EXPRESSION_TYPE_AGGREGATE_VALUES_TO_TDIGEST:
663760 return new (memoryPool) ValuesToTDigestAgg ();
664761 case EXPRESSION_TYPE_AGGREGATE_MEDIAN:
665- return new (memoryPool) PercentileAgg (50 );
762+ return new (memoryPool) PercentileAgg (0.5 );
666763 case EXPRESSION_TYPE_AGGREGATE_TDIGEST_TO_MEDIAN:
667- return new (memoryPool) TDigestToPercentileAgg (50 );
764+ return new (memoryPool) TDigestToPercentileAgg (0.5 );
668765 case EXPRESSION_TYPE_AGGREGATE_PERCENTILE_1:
669- return new (memoryPool) PercentileAgg (1 );
766+ return new (memoryPool) PercentileAgg (0.01 );
670767 case EXPRESSION_TYPE_AGGREGATE_TDIGEST_TO_PERCENTILE_1:
671- return new (memoryPool) TDigestToPercentileAgg (1 );
768+ return new (memoryPool) TDigestToPercentileAgg (0.01 );
672769 case EXPRESSION_TYPE_AGGREGATE_PERCENTILE_5:
673- return new (memoryPool) PercentileAgg (5 );
770+ return new (memoryPool) PercentileAgg (0.05 );
674771 case EXPRESSION_TYPE_AGGREGATE_TDIGEST_TO_PERCENTILE_5:
675- return new (memoryPool) TDigestToPercentileAgg (5 );
772+ return new (memoryPool) TDigestToPercentileAgg (0.05 );
676773 case EXPRESSION_TYPE_AGGREGATE_PERCENTILE_25:
677- return new (memoryPool) PercentileAgg (25 );
774+ return new (memoryPool) PercentileAgg (0. 25 );
678775 case EXPRESSION_TYPE_AGGREGATE_TDIGEST_TO_PERCENTILE_25:
679- return new (memoryPool) TDigestToPercentileAgg (25 );
776+ return new (memoryPool) TDigestToPercentileAgg (0. 25 );
680777 case EXPRESSION_TYPE_AGGREGATE_PERCENTILE_75:
681- return new (memoryPool) PercentileAgg (75 );
778+ return new (memoryPool) PercentileAgg (0. 75 );
682779 case EXPRESSION_TYPE_AGGREGATE_TDIGEST_TO_PERCENTILE_75:
683- return new (memoryPool) TDigestToPercentileAgg (75 );
780+ return new (memoryPool) TDigestToPercentileAgg (0. 75 );
684781 case EXPRESSION_TYPE_AGGREGATE_PERCENTILE_95:
685- return new (memoryPool) PercentileAgg (95 );
782+ return new (memoryPool) PercentileAgg (0. 95 );
686783 case EXPRESSION_TYPE_AGGREGATE_TDIGEST_TO_PERCENTILE_95:
687- return new (memoryPool) TDigestToPercentileAgg (95 );
784+ return new (memoryPool) TDigestToPercentileAgg (0. 95 );
688785 case EXPRESSION_TYPE_AGGREGATE_PERCENTILE_99:
689- return new (memoryPool) PercentileAgg (99 );
786+ return new (memoryPool) PercentileAgg (0. 99 );
690787 case EXPRESSION_TYPE_AGGREGATE_TDIGEST_TO_PERCENTILE_99:
691- return new (memoryPool) TDigestToPercentileAgg (99 );
788+ return new (memoryPool) TDigestToPercentileAgg (0. 99 );
692789 default :
693790 {
694791 char message[128 ];
0 commit comments