// Copyright (C) 2002-2025, Open Design Alliance (the "Alliance"). // All rights reserved. // // This software and its documentation and related materials are owned by // the Alliance. The software may only be incorporated into application // programs owned by members of the Alliance, subject to a signed // Membership Agreement and Supplemental Software License Agreement with the // Alliance. The structure and organization of this software are the valuable // trade secrets of the Alliance and its suppliers. The software is also // protected by copyright law and international treaty provisions. Application // programs incorporating this software must include the following statement // with their copyright notices: // // This application incorporates Open Design Alliance software pursuant to a license // agreement with Open Design Alliance. // Open Design Alliance Copyright (C) 2002-2025 by Open Design Alliance. // All rights reserved. // // By use of this software, its documentation or related materials, you // acknowledge and accept the above terms. /////////////////////////////////////////////////////////////////////////////// /* Main OdVisualizeStreamingServer implementation */ #include "OdaCommon.h" #include "Server.h" #include "TvDatabaseReceiver.h" #include "Core\Include\DbSystemServices.h" #include #include OdTvStreamingServer::~OdTvStreamingServer() { m_commands.clear(); } bool OdTvStreamingServer::start( const char* port ) { m_buffer.resize( SERVER_PACK_SIZE ); if( !initialize( port ) ) return false; m_bDone = false; if( !serverLoop() ) return false; return true; } bool OdTvStreamingServer::serverLoop() { while( !m_bDone ) { if( !onServerLoopIteration() ) return false; if( !hasClient() ) { if( !connectClient() ) return false; } else { int nBytes = 0; if( !readFromClient( m_buffer.asArrayPtr(), (int)m_buffer.size(), nBytes ) ) return false; if( nBytes > 0 ) { m_builder.buildFromBuffer( m_buffer.asArrayPtr(), nBytes, this ); } //Process commands if( !m_commands.empty() ) { for( auto it = m_commands.begin(); it != m_commands.end(); it++ ) { do { if( !processCommand( *it ) ) return false; } while( ( *it )->m_bIncomplete ); } } removeDoneCommands(); std::this_thread::sleep_for( std::chrono::milliseconds( 30 ) ); } } return true; } void OdTvStreamingServer::addCommand( ServerCommandPtr pCmd ) { m_commands.push_back( pCmd ); } void OdTvStreamingServer::removeCommand( OdUInt64 id ) { for( std::list< ServerCommandPtr >::const_iterator it = m_commands.cbegin(); it != m_commands.cend(); ++it ) { switch( (*it)->m_type ) { case ServerCommand::Type::kBinData: case ServerCommand::Type::kFile: { const ServerCommand_BinaryData* pCmd = reinterpret_cast( it->get() ); if( pCmd->pMessage && pCmd->pMessage->id() == id ) { m_commands.erase( it ); return; } } break; } } } bool OdTvStreamingServer::processCommand( ServerCommandPtr pCmd ) { if( pCmd->m_bDone ) { ODA_FAIL(); return true; } pCmd->m_bIncomplete = false; switch( pCmd->m_type ) { case ServerCommand::Type::kIdle: { pCmd->m_bDone = true; return true; } break; case ServerCommand::Type::kExit: { pCmd->m_bDone = true; m_bDone = true; return true; } break; case ServerCommand::Type::kBinData: { return processBinaryCommand( reinterpret_cast( pCmd.get() ) ); } break; case ServerCommand::Type::kFile: { return processFileCommand( reinterpret_cast( pCmd.get() ) ); } break; } ODA_FAIL(); return true; } void OdTvStreamingServer::removeDoneCommands() { if( m_commands.empty() ) return; if( m_commands.size() > 1 ) { bool stop = true; } for( auto it = m_commands.begin(); it != m_commands.end(); ) { if( !( *it )->m_bDone ) { it++; continue; } if( ( *it )->m_type == ServerCommand::Type::kFile ) { ServerCommand_BinaryData* pCmd = reinterpret_cast( (*it).get() ); if( pCmd->pMessage ) { m_logger.logMessage( "File request %llu complete", pCmd->pMessage->id() ); } else { m_logger.logMessage( "File request UNKNOWN complete" ); } if( pCmd->pMessage ) { std::shared_ptr< ServerCommand_FileStatus > pStatCmd = std::make_shared< ServerCommand_FileStatus >(); pStatCmd->m_bDone = false; pStatCmd->m_type = ServerCommand::Type::kBinData; pStatCmd->pMessage = m_builder.buildFileStatusMessage( OdTvMessage::FileStatus::kComplete ); pStatCmd->pMessage->setId( pCmd->pMessage->id() ); addCommand( pStatCmd ); } } it = m_commands.erase( it ); } } bool OdTvStreamingServer::processBinaryCommand( ServerCommand_BinaryData* pCmd ) { if( pCmd->m_bDone ) { ODA_FAIL(); return true; } int nSent = 0; size_t headerSz = 0; //If message is new, header will be added. Otherwise data will not contain header. if( pCmd->pMessage->currentOffset() == 0 ) headerSz = pCmd->pMessage->headerSize(); if( !pCmd->pMessage->writeTo( m_buffer.asArrayPtr(), m_buffer.size() ) ) return false; int nBytes = (int)( headerSz + pCmd->pMessage->currentOffset() ); unsigned oldOffset = pCmd->pMessage->currentOffset(); bool bFull = nBytes == headerSz + pCmd->pMessage->binDataSize(); if( nBytes > 0 ) { if( !writeToClient( m_buffer.asArrayPtr(), nBytes, nSent ) ) return false; if( nBytes == nSent && bFull ) { pCmd->m_bDone = true; } else if( nSent == 0 ) { pCmd->pMessage->resetOffset( oldOffset ); return true; } else if( nBytes != nSent ) { pCmd->pMessage->resetOffset( (unsigned)( nSent - headerSz ) ); pCmd->m_bIncomplete = true; } } else pCmd->m_bDone = true; return true; } bool OdTvStreamingServer::processFileCommand( ServerCommand_File* pCmd ) { if( pCmd->m_bDone ) { ODA_FAIL(); return true; } if( pCmd->pMessage->currentOffset() == pCmd->pMessage->binDataSize() ) { pCmd->syncBuffer( m_builder ); if( pCmd->m_bDone ) return true; } processBinaryCommand( pCmd ); if( pCmd->m_bDone ) //whole buffer sent { pCmd->syncBuffer( m_builder ); if( pCmd->m_bDone ) return true; } else { pCmd->m_bIncomplete = true; } return true; } OdTvStreamingServer::FileAddingResult OdTvStreamingServer::addFile( const OdString& filePath ) { OdTvResult rc = tvOk; OdUInt8 flgs = OdTvDatabaseReceiver::collectStreamingFlags( filePath, &rc ); if( rc == tvCannotOpenFile ) return FileAddingResult::kNotFound; if( !GETBIT( flgs, OdTvDatabaseReceiver::k_VSFXStreaming_CompatibleFlag ) ) return FileAddingResult::kNotStreamingCompatible; FileDescr d; d.filePath = filePath; int sep = filePath.reverseFind( L'\\' ); if( sep > 0 ) { d.clientName = filePath.right( filePath.getLength() - sep - 1 ); } else { d.clientName = filePath; } d.bSupportPartial = GETBIT( flgs, OdTvDatabaseReceiver::k_VSFXStreaming_FriendlyFlag ) && GETBIT( flgs, OdTvDatabaseReceiver::k_VSFXStreaming_HasPartialIndexFlag ); m_files.push_back( d ); return d.bSupportPartial ? FileAddingResult::kPartialStreaming : FileAddingResult::kLinearStreaming; } void OdTvStreamingServer::onMessage( OdTvMessagePtr pMessage ) { if( pMessage->type() == OdTvMessage::Type::kCommand ) { OdTvMessage::Commands cmd = m_builder.extractCommandFromMessage( pMessage ); if( cmd == OdTvMessage::Commands::kGet_FileList_Linear || cmd == OdTvMessage::Commands::kGet_FileList_Partial ) { m_logger.logMessage( "Received FileList command" ); std::shared_ptr< ServerCommand_BinaryData > pCmd = std::make_shared< ServerCommand_BinaryData >(); pCmd->m_type = ServerCommand::Type::kBinData; pCmd->m_bDone = false; OdVector< OdString > files; for( unsigned i = 0; i < m_files.size(); ++i ) { if( cmd == OdTvMessage::Commands::kGet_FileList_Linear ) files.push_back( m_files[ i ].clientName ); else { if( m_files[ i ].bSupportPartial ) files.push_back( m_files[ i ].clientName ); } } pCmd->pMessage = m_builder.buildStringListMessage( files ); pCmd->pMessage->setId( pMessage->id() ); addCommand( pCmd ); } else if( cmd == OdTvMessage::Commands::kClose ) { m_commands.clear(); m_logger.logMessage( "Received Close command" ); std::shared_ptr< ServerCommand > pCmd = std::make_shared< ServerCommand >(); pCmd->m_type = ServerCommand::Type::kExit; pCmd->m_bDone = false; addCommand( pCmd ); } else if( cmd == OdTvMessage::Commands::kCancelRequest ) { OdUInt64 id = pMessage->id(); m_logger.logMessage( "Canceling request %llu", id ); removeCommand( id ); } else { m_logger.logMessage( "Received unknow command" ); } } else if( pMessage->type() == OdTvMessage::Type::kFileRequest ) { m_logger.logMessage( "Received File request %llu", pMessage->id() ); std::shared_ptr< ServerCommand_File > pCmd = std::make_shared< ServerCommand_File >(); pCmd->m_type = ServerCommand::Type::kFile; pCmd->m_bDone = false; pCmd->filename = resolveFile( m_builder.extractFileRequestMessage( pMessage ) ); pCmd->syncBuffer( m_builder ); if( pCmd->pStream.isNull() ) { m_logger.logMessage( "ERROR: requested unavailable file" ); std::shared_ptr< ServerCommand_FileStatus > pStatCmd = std::make_shared< ServerCommand_FileStatus >(); pStatCmd->m_bDone = false; pStatCmd->m_type = ServerCommand::Type::kBinData; pStatCmd->pMessage = m_builder.buildFileStatusMessage( OdTvMessage::FileStatus::kNotFound ); pStatCmd->pMessage->setId( pMessage->id() ); addCommand( pStatCmd ); return; } pCmd->pMessage->setId( pMessage->id() ); if( !pCmd->m_bDone ) { addCommand( pCmd ); } } else if( pMessage->type() == OdTvMessage::Type::kFilePartsRequest ) { m_logger.logMessage( "Received File Parts request %llu", pMessage->id() ); std::shared_ptr< ServerCommand_FileParts > pCmd = std::make_shared< ServerCommand_FileParts >(); pCmd->m_type = ServerCommand::Type::kFile; pCmd->m_bDone = false; OdString file = OdString::kEmpty; if( !m_builder.extractFilePartsFromMessage( pMessage, file, pCmd->parts ) ) { m_logger.logMessage( "ERROR whilke parcing File Parts" ); return; } pCmd->filename = resolveFile( file ); pCmd->syncBuffer( m_builder ); if( pCmd->pStream.isNull() ) { m_logger.logMessage( "ERROR: requested unavailable file" ); std::shared_ptr< ServerCommand_FileStatus > pStatCmd = std::make_shared< ServerCommand_FileStatus >(); pStatCmd->m_bDone = false; pStatCmd->m_type = ServerCommand::Type::kBinData; pStatCmd->pMessage = m_builder.buildFileStatusMessage( OdTvMessage::FileStatus::kNotFound ); pStatCmd->pMessage->setId( pMessage->id() ); addCommand( pStatCmd ); return; } pCmd->pMessage->setId( pMessage->id() ); if( !pCmd->m_bDone ) { addCommand( pCmd ); } } else { m_logger.logMessage( "Received unknow message" ); } } void OdTvStreamingServer::ServerCommand_File::syncBuffer( const OdTvMessageBuilder& builder ) { if( pStream.isNull() ) { pStream = odTvSystemServices()->createFile( filename ); if( pStream.isNull() ) { throw "SystemServices error"; } } if( pStream->isEof() ) { m_bDone = true; return; } m_bDone = false; OdUInt64 id = 0; if( pMessage ) id = pMessage->id(); unsigned bufSize = (unsigned)( SERVER_PACK_SIZE - OdTvMessage::headerSize() ); if( bufSize > pStream->length() - pStream->tell() ) { bufSize = (unsigned)( pStream->length() - pStream->tell() ); } OdVector< char > buf; buf.resize( bufSize ); pStream->getBytes( buf.asArrayPtr(), bufSize ); pMessage = builder.buildBinaryMessage( buf.asArrayPtr(), bufSize ); if( id != 0 ) pMessage->setId( id ); } void OdTvStreamingServer::ServerCommand_FileParts::syncBuffer( const OdTvMessageBuilder& builder ) { if( pStream.isNull() ) { try { pStream = odTvSystemServices()->createFile( filename ); } catch( ... ) { pStream = OdStreamBufPtr(); } if( pStream.isNull() ) { return; } } if( parts.empty() || curPart == parts.size() ) { m_bDone = true; return; } m_bDone = false; OdUInt64 id = 0; if( pMessage ) id = pMessage->id(); unsigned bufSize = (unsigned)( SERVER_PACK_SIZE - OdTvMessage::headerSize() ); OdVector< char > buf; buf.resize( bufSize ); OdUInt64 nBytesMax = (OdUInt64)bufSize; OdUInt64 nBytes = 0; for( ; curPart < parts.size(); ) { pStream->seek( parts[ curPart ].offset + curOffset, OdDb::kSeekFromStart ); OdUInt64 bytesPart = parts[ curPart ].size - curOffset; if( bytesPart < nBytesMax - nBytes ) //can complitely write { pStream->getBytes( buf.asArrayPtr() + nBytes, (OdUInt32)bytesPart ); nBytes += bytesPart; curPart++; curOffset = 0; } else { bytesPart = nBytesMax - nBytes; pStream->getBytes( buf.asArrayPtr() + nBytes, (OdUInt32)bytesPart ); curOffset += bytesPart; nBytes += bytesPart; break; } } pMessage = builder.buildBinaryMessage( buf.asArrayPtr(), nBytes ); if( id != 0 ) pMessage->setId( id ); } OdString OdTvStreamingServer::resolveFile( const OdString& clientName ) const { for( const auto& it : m_files ) { if( it.clientName == clientName ) return it.filePath; } return OdString::kEmpty; } //ConsoleLogger void ConsoleLogger::logMessage( const char* msg, ... ) { if( m_bCanUpdate ) { printf( "\n" ); fflush( nullptr ); m_bCanUpdate = false; } else { printf( "\n" ); } va_list args; va_start( args, msg ); vprintf( msg, args ); va_end( args ); fflush( nullptr ); } void ConsoleLogger::updateMessage( const char* msg, ... ) { if( m_bCanUpdate ) { printf( "\r" ); } else { printf( "\n" ); m_bCanUpdate = true; } fflush( nullptr ); va_list args; va_start( args, msg ); vprintf( msg, args ); va_end( args ); fflush( nullptr ); }