forked from hep-cce2/root_serialization
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathSharedPDSSource.cc
More file actions
126 lines (108 loc) · 4.46 KB
/
SharedPDSSource.cc
File metadata and controls
126 lines (108 loc) · 4.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
#include "SharedPDSSource.h"
#include "Deserializer.h"
#include "UnrolledDeserializer.h"
#include "TClass.h"
using namespace cce::tf;
SharedPDSSource::SharedPDSSource(unsigned int iNLanes, unsigned long long iNEvents, std::string const& iName) :
SharedSourceBase(iNEvents),
file_{iName, std::ios_base::binary},
readTime_{std::chrono::microseconds::zero()}
{
pds::Serialization serialization;
auto productInfo = readFileHeader(file_, compression_, serialization);
laneInfos_.reserve(iNLanes);
for(unsigned int i = 0; i< iNLanes; ++i) {
DeserializeStrategy strategy;
switch(serialization) {
case pds::Serialization::kRoot: {
strategy = DeserializeStrategy::make<DeserializeProxy<Deserializer>>(); break;
}
case pds::Serialization::kRootUnrolled: {
strategy = DeserializeStrategy::make<DeserializeProxy<UnrolledDeserializer>>(); break;
}
}
laneInfos_.emplace_back(productInfo, std::move(strategy));
}
}
SharedPDSSource::LaneInfo::LaneInfo(std::vector<pds::ProductInfo> const& productInfo, DeserializeStrategy deserialize):
deserializers_{std::move(deserialize)},
decompressTime_{std::chrono::microseconds::zero()},
deserializeTime_{std::chrono::microseconds::zero()}
{
dataProducts_.reserve(productInfo.size());
dataBuffers_.resize(productInfo.size(), nullptr);
deserializers_.reserve(productInfo.size());
size_t index =0;
for(auto const& pi : productInfo) {
TClass* cls = TClass::GetClass(pi.className().c_str());
dataBuffers_[index] = cls->New();
assert(cls);
dataProducts_.emplace_back(index,
&dataBuffers_[index],
pi.name(),
cls,
&delayedRetriever_);
deserializers_.emplace_back(cls);
++index;
}
}
SharedPDSSource::LaneInfo::~LaneInfo() {
auto it = dataProducts_.begin();
for( void * b: dataBuffers_) {
it->classType()->Destructor(b);
++it;
}
}
size_t SharedPDSSource::numberOfDataProducts() const {
return laneInfos_[0].dataProducts_.size();
}
std::vector<DataProductRetriever>& SharedPDSSource::dataProducts(unsigned int iLane, long iEventIndex) {
return laneInfos_[iLane].dataProducts_;
}
EventIdentifier SharedPDSSource::eventIdentifier(unsigned int iLane, long iEventIndex) {
return laneInfos_[iLane].eventID_;
}
void SharedPDSSource::readEventAsync(unsigned int iLane, long iEventIndex, OptionalTaskHolder iTask) {
queue_.push(*iTask.group(), [iLane, optTask = std::move(iTask), this]() mutable {
auto start = std::chrono::high_resolution_clock::now();
std::vector<uint32_t> buffer;
if(pds::readCompressedEventBuffer(file_, this->laneInfos_[iLane].eventID_, buffer)) {
auto group = optTask.group();
group->run([this, buffer=std::move(buffer), task = optTask.releaseToTaskHolder(), iLane]() {
auto& laneInfo = this->laneInfos_[iLane];
auto start = std::chrono::high_resolution_clock::now();
std::vector<uint32_t> uBuffer = pds::uncompressEventBuffer(this->compression_, buffer);
laneInfo.decompressTime_ +=
std::chrono::duration_cast<decltype(laneInfo.decompressTime_)>(std::chrono::high_resolution_clock::now() - start);
start = std::chrono::high_resolution_clock::now();
pds::deserializeDataProducts(uBuffer.begin(), uBuffer.end(), laneInfo.dataProducts_, laneInfo.deserializers_);
laneInfo.deserializeTime_ +=
std::chrono::duration_cast<decltype(laneInfo.deserializeTime_)>(std::chrono::high_resolution_clock::now() - start);
});
}
readTime_ +=std::chrono::duration_cast<decltype(readTime_)>(std::chrono::high_resolution_clock::now() - start);
});
}
void SharedPDSSource::printSummary() const {
std::cout <<"\nSource:\n"
" read time: "<<readTime().count()<<"us\n"
" decompress time: "<<decompressTime().count()<<"us\n"
" deserialize time: "<<deserializeTime().count()<<"us\n"<<std::endl;
};
std::chrono::microseconds SharedPDSSource::readTime() const {
return readTime_;
}
std::chrono::microseconds SharedPDSSource::decompressTime() const {
auto time = std::chrono::microseconds::zero();
for(auto const& l : laneInfos_) {
time += l.decompressTime_;
}
return time;
}
std::chrono::microseconds SharedPDSSource::deserializeTime() const {
auto time = std::chrono::microseconds::zero();
for(auto const& l : laneInfos_) {
time += l.deserializeTime_;
}
return time;
}