// Copyright (C) 2002-2025, Open Design Alliance (the "Alliance"). // All rights reserved. // // This software and its documentation and related materials are owned by // the Alliance. The software may only be incorporated into application // programs owned by members of the Alliance, subject to a signed // Membership Agreement and Supplemental Software License Agreement with the // Alliance. The structure and organization of this software are the valuable // trade secrets of the Alliance and its suppliers. The software is also // protected by copyright law and international treaty provisions. Application // programs incorporating this software must include the following statement // with their copyright notices: // // This application incorporates Open Design Alliance software pursuant to a license // agreement with Open Design Alliance. // Open Design Alliance Copyright (C) 2002-2025 by Open Design Alliance. // All rights reserved. // // By use of this software, its documentation or related materials, you // acknowledge and accept the above terms. /////////////////////////////////////////////////////////////////////////////// /* Main OdVisualizeStreamingClient implementation */ #include "OdaCommon.h" #include "Client.h" bool OdTvStreamingClient::startClient( const char* ip, const char* port ) { m_buffer.resize( CLIENT_PACK_SIZE ); setClientStatus( Status::kNotConnected ); if( !initialize( ip, port ) ) { setClientStatus( Status::kError ); return false; } if( !clientLoop() ) { setClientStatus( Status::kError ); return false; } if( !shutdownClient() ) { setClientStatus( Status::kError ); return false; } setClientStatus( Status::kNotConnected ); return true; } bool OdTvStreamingClient::clientLoop() { int nTotal = 0; bool bRequest = true; for( ;;) { if( !onClientLoopIteration() ) return false; if( !hasConnection() ) { if( !connectServer() ) return false; } else { setClientStatus( Status::kOk ); int nBytes = 0; if( !readFromServer( m_buffer.asArrayPtr(), (int)m_buffer.size(), nBytes ) ) return false; if( nBytes > 0 ) { nTotal += nBytes; printf( "Received %d bytes, total %d\n", nBytes, nTotal ); processIncoming( nBytes ); } OdTvMessagePtr pMessage = getTopCommand(); if( pMessage ) { if( !sendMessage( pMessage ) ) return false; if( pMessage->currentOffset() == pMessage->binDataSize() ) { //Message complitely sent if( isClosing() ) { break; } removeTopCommand(); onMessageSent( pMessage ); } } } } return true; } void OdTvStreamingClient::processIncoming( int nBytes ) { m_messageBuilder.buildFromBuffer( m_buffer.asArrayPtr(), nBytes, this ); } bool OdTvStreamingClient::sendMessage( OdTvMessagePtr pMsg ) { if( !pMsg ) return false; int nSent = 0; size_t headerSz = 0; //If message is new, header will be added. Otherwise data will not contain header. if( pMsg->currentOffset() == 0 ) headerSz = pMsg->headerSize(); if( !pMsg->writeTo( m_buffer.asArrayPtr(), m_buffer.size() ) ) return false; int nBytes = (int)( headerSz + pMsg->currentOffset() ); bool bFull = nBytes == headerSz + pMsg->binDataSize(); if( nBytes > 0 ) { if( !writeToServer( m_buffer.asArrayPtr(), nBytes, nSent ) ) return false; if( nBytes == nSent && bFull ) { printf( "Full message sent\n" ); } else if( nBytes != nSent ) { pMsg->resetOffset( (unsigned)( nSent - headerSz ) ); printf( "Incomplete message sendt\n" ); } } else { printf( "Nothing to sent\n" ); } return true; } void OdTvStreamingClient::onMessage( OdTvMessagePtr pMessage ) { printf( "Received message\n" ); unsigned nResponse = 0; { std::lock_guard guard( m_awaitsMutex ); for( ; nResponse < m_awaitsResponses.size(); nResponse++ ) { if( pMessage->id() == m_awaitsResponses[ nResponse ].id ) break; } if( nResponse == m_awaitsResponses.size() ) { printf( "Unknown responce id\n" ); return; } if( !isCorrectResponseType( pMessage, m_awaitsResponses[ nResponse ] ) ) { printf( "Invalid responce type\n" ); return; } if( m_awaitsResponses[ nResponse ].isSingle() ) m_awaitsResponses.removeAt( nResponse ); } if( pMessage->type() == OdTvMessage::Type::kStringList ) { OdVector< OdString > files; if( !m_messageBuilder.extractStringListFromMessage( pMessage, files ) ) { printf( "Can not parse string list\n" ); } else { std::shared_ptr< OdTvStreamingClientStringListData > pData = std::make_shared< OdTvStreamingClientStringListData >(); printf( "Received string list\n" ); for( unsigned i = 0; i < files.size(); ++i ) { pData->stringList.push_back( files[ i ] ); } std::lock_guard guard( m_responseMutex ); m_responses[ pMessage->id() ] = pData; } } else if( pMessage->type() == OdTvMessage::Type::kBinData ) { printf( "Received binary data\n" ); std::lock_guard guard( m_responseMutex ); auto pIter = m_responses.find( pMessage->id() ); OdTvStreamingClientBinaryData* pData = nullptr; if( pIter != m_responses.end() ) { pData = dynamic_cast( pIter->second.get() ); } else { std::shared_ptr< OdTvStreamingClientBinaryData > pD = std::make_shared< OdTvStreamingClientBinaryData >(); m_responses[ pMessage->id() ] = pD; pData = pD.get(); } if( !pData ) { printf( "Can not store response for binary data\n" ); return; } if( pData->buffer.empty() ) { pData->set( pMessage->binData(), pMessage->binDataSize() ); } else { pData->add( pMessage->binData(), pMessage->binDataSize() ); } } else if( pMessage->type() == OdTvMessage::Type::kFileStatus ) { printf( "Received file status\n" ); OdTvMessage::FileStatus status = m_messageBuilder.extractStatusFromMessage( pMessage ); OdTvStreamingClientData::Status st = OdTvStreamingClientData::Status::kInProgress; if( status == OdTvMessage::FileStatus::kNotFound ) { st = OdTvStreamingClientData::Status::kError; } else if( status == OdTvMessage::FileStatus::kComplete ) { st = OdTvStreamingClientData::Status::kComplete; } std::lock_guard guard( m_responseMutex ); auto pIter = m_responses.find( pMessage->id() ); OdTvStreamingClientBinaryData* pData = nullptr; if( pIter != m_responses.end() ) { pData = dynamic_cast( pIter->second.get() ); } else { std::shared_ptr< OdTvStreamingClientBinaryData > pD = std::make_shared< OdTvStreamingClientBinaryData >(); m_responses[ pMessage->id() ] = pD; pData = pD.get(); } if( !pData ) { printf( "Can not store response for binary data\n" ); return; } pData->currentStatus = st; } else { printf( "Unsupported message\n" ); } } bool OdTvStreamingClient::isCorrectResponseType( OdTvMessagePtr pMsg, const AwaitsResponse& resp ) { if( resp.m_command == OdTvMessage::Commands::kGet_FileList_Linear || resp.m_command == OdTvMessage::Commands::kGet_FileList_Partial ) { return pMsg->type() == OdTvMessage::Type::kStringList; } if( resp.m_command == OdTvMessage::Commands::kUnknown ) { return pMsg->type() == OdTvMessage::Type::kBinData || pMsg->type() == OdTvMessage::Type::kFileStatus; } return false; } void OdTvStreamingClient::onMessageSent( OdTvMessagePtr pMessage ) { if( pMessage->type() == OdTvMessage::Type::kCommand ) { OdTvMessage::Commands cmd = m_messageBuilder.extractCommandFromMessage( pMessage ); m_awaitsResponses.push_back( std::move( AwaitsResponse{ pMessage->id(), cmd } ) ); } else if( pMessage->type() == OdTvMessage::Type::kFileRequest || pMessage->type() == OdTvMessage::Type::kFilePartsRequest ) { m_awaitsResponses.push_back( std::move( AwaitsResponse{ pMessage->id(), OdTvMessage::Commands::kUnknown } ) ); } }