c++ - Using boost::thread to start/stop logging data (2nd update) -
i'm trying log real-time data using boost::thread , check box. when check box, logging thread starts. when uncheck, logging thread stops. problem arises when check/uncheck repeatedly , fast (program crashes, files aren't logged, etc.). how can write reliable thread-safe program these problems don't occur when repeatedly , checking/unchecking? don't want use join() since temporarily stops data input coming main thread. in secondary thread, i'm opening log file, reading socket buffer, copying buffer, , writing buffer log file. i'm thinking maybe should use mutex locks reading/writing. if so, specific locks should use? below code snippet:
//main thread if(m_loggingcheckbox->ischecked()) { ... if(m_threadlogdata.initializereadthread(socketinfo))//opens socket. //if socket opened , can read, start thread. m_threadlogdata.startreadthread(); else std::cout << "did not initialize thread\n"; } else if(!m_loggingcheckbox->ischecked()) { m_threadlogdata.stopreadthread(); } void threadlogdata::startreadthread() { //std::cout << "thread started." << std::endl; m_stoplogthread = false; m_threadsenddata = boost::thread(&threadlogdata::logdata,this); } void threadlogdata::stopreadthread() { m_stoplogthread = true; m_readdatasocket.close_socket(); // close socket if(ofstreamlogfile.is_open()) { ofstreamlogfile.flush(); //flush log file before closing it. ofstreamlogfile.close(); // close log file } m_threadsenddata.interrupt(); // interrupt thread //m_threadsenddata.join(); // join thread. commented out since temporarily stops data input. } //secondary thread bool threadlogdata::logdata() { unsigned short int buffer[1024]; bool bufferflag; unsigned int isizeofbuffer = 1024; int isizeofbufferread = 0; int ltimeout = 5; if(!ofstreamlogfile.is_open()) { ofstreamlogfile.open(directory_string().c_str(), ios::out); if(!ofstreamlogfile.is_open()) { return 0; } } while(!m_stoplogthread) { try { int ret = m_readdatasocket.read_sock(&m_msgbuffer.m_buffer [0],isizeofbuffer,ltimeout,&isizeofbufferread); memcpy(&buffer[0],m_msgbuffer.m_buffer,isizeofbufferread); bufferflag = m_buffer.setbuffer(buffer); if(!bufferflag) return false; object = &m_buffer; unsigned int data = object->getdata(); ofstreamlogfile << data << std::endl; boost::this_thread::interruption_point(); } catch (boost::thread_interrupted& interruption) { std::cout << "threadlogdata::logdata(): caught interruption thread." << std::endl; stopreadthread(); } catch (...) { std::cout << "threadlogdata::logdata(): caught something." << std::endl; stopreadthread(); } } // end while() }
i use boost asio async stuff
#include <iostream> #include <fstream> #include <boost/asio.hpp> #include <boost/asio/signal_set.hpp> #include <boost/asio/ip/tcp.hpp> #include <boost/bind.hpp> #include <boost/optional.hpp> #include <thread> using boost::asio::ip::tcp; namespace asio = boost::asio; struct program { asio::io_service _ioservice; asio::deadline_timer _timer; asio::signal_set _signals; std::array<char, 1024> _buffer; tcp::socket _client; tcp::resolver _resolver; std::ofstream _logfile; std::thread _thread; program() : _timer(_ioservice), _signals(_ioservice), _client(_ioservice), _resolver(_ioservice) { do_connect(_resolver.resolve({ "localhost", "6767" })); do_toggle_logging_cycle(); _signals.add(sigint); _signals.async_wait([this](boost::system::error_code ec, int) { if (!ec) close(); }); _thread = std::thread(boost::bind(&asio::io_service::run, boost::ref(_ioservice))); } ~program() { if (_thread.joinable()) _thread.join(); } void close() { _ioservice.post([this]() { _signals.cancel(); _timer.cancel(); _client.close(); }); } private: void do_toggle_logging_cycle(boost::system::error_code ec = {}) { if (ec != boost::asio::error::operation_aborted) { if (_logfile.is_open()) { _logfile.close(); _logfile.clear(); } else { _logfile.open("/tmp/output.log"); } _timer.expires_from_now(boost::posix_time::seconds(2)); _timer.async_wait(boost::bind(&program::do_toggle_logging_cycle, this, boost::asio::placeholders::error())); } else { std::cerr << "\ndone, goobye\n"; } } void do_connect(tcp::resolver::iterator endpoint_iterator) { boost::asio::async_connect( _client, endpoint_iterator, [this](boost::system::error_code ec, tcp::resolver::iterator) { if (!ec) do_read(); else close(); }); } void do_read() { boost::asio::async_read( _client, asio::buffer(_buffer.data(), _buffer.size()), [this](boost::system::error_code ec, std::size_t length) { if (!ec) { if (_logfile.is_open()) { _logfile.write(_buffer.data(), length); } do_read(); } else { close(); } }); } }; int main() { { program p; // socket reading , (optional) logging on separate thread std::cout << "\nmain thread going sleep 15 seconds...\n"; std::this_thread::sleep_for(std::chrono::seconds(15)); p.close(); // if user doesn't press ^c, let's take initiative std::cout << "\ndestruction of program...\n"; } std::cout << "\nmain thread ends\n"; };
the program connects port 6767 of localhost , asynchronously reads data it.
if logging active (_logfile.is_open()
), received data written /tmp/output.log
.
now
- the reading/writing on separate thread, operations serialized using
_ioservice
(see e.g.post
inclose()
) - the user can abort the socket reading loop ctrl+c
- every 2 seconds, logging (de)activated (see
do_toggle_logging_cycle
)
the main thread sleeps 15 seconds before canceling program (similar user pressing ctrl-c).
Comments
Post a Comment